线程池处理异步任务队列
线程池处理异步任务队列
/// <author>cxg 2020-9-3</author> /// 支持d7以上版本,更低版本没有测试,支持跨OS unit tasks; interface uses {$IFDEF mswindows} Windows, {$ENDIF} {$IFDEF posix} posix.Unistd, posix.Semaphore, {$ENDIF} Contnrs, SyncObjs, Classes, SysUtils; type TCallBack = procedure(task: Pointer) of object; type TThreadConf = class private fCallBack: TCallBack; fThreadNum: Integer; fWorkers: array of TThread; procedure freeThreads; procedure newThreads; public constructor Create(const threadNum: Integer = 0); destructor Destroy; override; procedure startThreads; procedure stopThreads; procedure allotTask(task: Pointer); property onCallback: TCallBack read fCallBack write fCallBack; end; type TWorkThread = class(TThread) private fConf: TThreadConf; fQueue: TQueue; public constructor Create(conf: TThreadConf); destructor Destroy; override; procedure Execute; override; procedure enqueue(task: Pointer); end; function cpuNum: Integer; implementation var {$IFDEF mswindows} hsem: THandle; //信号量 {$ELSE} hsem: sem_t; {$ENDIF} gIndex: Integer; function cpuNum: Integer; {$IFDEF MSWINDOWS} var si: SYSTEM_INFO; {$ENDIF} begin {$IFDEF MSWINDOWS} GetSystemInfo(si); Result := si.dwNumberOfProcessors; {$ELSE} Result := sysconf(_SC_NPROCESSORS_ONLN); {$ENDIF} end; { TThreadConf } procedure TThreadConf.allotTask(task: Pointer); var i: integer; thread: TWorkThread; begin i := AtomicIncrement(gIndex) mod fThreadNum; thread := fWorkers[i] as TWorkThread; thread.enqueue(task); end; constructor TThreadConf.Create(const threadNum: Integer = 0); begin fThreadNum := threadNum; if fThreadNum = 0 then fThreadNum := cpuNum; SetLength(fWorkers, fThreadNum); {$IFDEF mswindows} hsem := CreateSemaphore(nil, 0, threadNum , nil); {$ELSE} sem_init(hsem, 0, 0); {$ENDIF} newThreads; end; destructor TThreadConf.Destroy; begin freeThreads; {$IFDEF mswindows} CloseHandle(hsem); {$ELSE} sem_destroy(hsem); {$ENDIF} inherited; end; procedure TThreadConf.freeThreads; var i: Integer; begin for i := 0 to fThreadNum - 1 do begin fWorkers[i].Terminate; fWorkers[i].WaitFor; FreeAndNil(fWorkers[i]); end; end; procedure TThreadConf.newThreads; var i: Integer; begin for i := 0 to fThreadNum - 1 do fWorkers[i] := TWorkThread.Create(Self); end; procedure TThreadConf.startThreads; var i: Integer; begin for i := 0 to fThreadNum - 1 do {$IFDEF unicode} fWorkers[i].Start; {$ELSE} fWorkers[i].Resume; {$ENDIF} end; procedure TThreadConf.stopThreads; var i: Integer; begin for i := 0 to fThreadNum - 1 do fWorkers[i].Suspend; end; { TWorkThread } constructor TWorkThread.Create(conf: TThreadConf); begin inherited Create(True); FreeOnTerminate := true; fConf := conf; fQueue := TQueue.Create; end; destructor TWorkThread.Destroy; begin FreeAndNil(fQueue); inherited; end; procedure TWorkThread.enqueue(task: Pointer); begin fQueue.Push(task); {$IFDEF mswindows} ReleaseSemaphore(hsem, 1, nil); {$ELSE} sem_post(hsem); {$ENDIF} end; procedure TWorkThread.Execute; var task: Pointer; procedure run; begin task := fQueue.Pop; if task <> nil then if Assigned(fConf.fCallBack) then fConf.fCallBack(task); end; begin {$IFDEF mswindows} if WaitForSingleObject(hSem, INFINITE) = WAIT_OBJECT_0 then begin run; ReleaseSemaphore(hsem, 1, nil); end; {$ELSE} if sem_wait(hsem) > 0 then begin run; sem_post(hsem); end; {$ENDIF} end; end.