线程池处理异步任务队列


	线程池处理异步任务队列
[编程语言教程]

线程池处理异步任务队列

/// <author>cxg 2020-9-3</author>
/// Supports Delphi 7 and later (earlier versions untested); cross-OS (Windows and POSIX).

unit tasks;

interface

uses
  {$IFDEF mswindows}
  Windows,
  {$ENDIF}
  {$IFDEF posix}
  posix.Unistd, posix.Semaphore,
  {$ENDIF}
  Contnrs, SyncObjs, Classes, SysUtils;

type
  // Method pointer invoked by a worker thread for each dequeued task.
  // `task` is the opaque pointer that was handed to TThreadConf.allotTask.
  TCallBack = procedure(task: Pointer) of object;

type
  // Owner of a fixed-size set of worker threads; distributes opaque task
  // pointers across them and exposes the callback the workers invoke.
  TThreadConf = class
  private
    fCallBack: TCallBack;        // handler called by workers for each task
    fThreadNum: Integer;         // number of worker threads
    fWorkers: array of TThread;  // worker thread instances (TWorkThread)
    procedure freeThreads;       // terminates, waits for and frees all workers
    procedure newThreads;        // creates the worker thread objects
  public
    // threadNum = 0 (the default) means "use cpuNum".
    constructor Create(const threadNum: Integer = 0);
    destructor Destroy; override;
    // Starts (resumes) every worker thread.
    procedure startThreads;
    // Suspends every worker thread.
    procedure stopThreads;
    // Hands `task` to one worker, selected via the unit-level counter gIndex
    // modulo the number of threads.
    procedure allotTask(task: Pointer);
    property onCallback: TCallBack read fCallBack write fCallBack;
  end;

type
  // Worker thread with its own task queue; invokes the owning
  // TThreadConf's callback for tasks taken from that queue.
  TWorkThread = class(TThread)
  private
    fConf: TThreadConf;  // owning pool; source of the callback
    fQueue: TQueue;      // pending task pointers for this worker
  public
    // Creates the thread suspended and bound to `conf`.
    constructor Create(conf: TThreadConf);
    destructor Destroy; override;
    // Thread body: waits on the unit-level semaphore and runs queued tasks.
    procedure Execute; override;
    // Appends `task` to this worker's queue and posts the semaphore.
    procedure enqueue(task: Pointer);
  end;

// Returns the number of processors reported by the operating system.
function cpuNum: Integer;

implementation

var
  // Unit-level semaphore: posted once per enqueued task and waited on by the
  // worker threads. NOTE(review): being unit-global, it is shared by every
  // TThreadConf instance created in the process.
  {$IFDEF mswindows}
  hsem: THandle;   // semaphore
  {$ELSE}
  hsem: sem_t;
  {$ENDIF}
  // Counter incremented for each allotted task; taken modulo the thread
  // count to choose the receiving worker.
  gIndex: Integer;

// Returns the number of processors reported by the operating system,
// never less than 1.
//
// Windows: SYSTEM_INFO.dwNumberOfProcessors.
// Other:   sysconf(_SC_NPROCESSORS_ONLN), which returns -1 on failure.
//
// The result sizes the worker array and is used as a modulus, so a
// non-positive value is clamped to 1 instead of being passed on.
function cpuNum: Integer;
{$IFDEF MSWINDOWS}
var
  si: SYSTEM_INFO;
{$ENDIF}
begin
  {$IFDEF MSWINDOWS}
  GetSystemInfo(si);
  Result := si.dwNumberOfProcessors;
  {$ELSE}
  Result := sysconf(_SC_NPROCESSORS_ONLN);
  {$ENDIF}
  // Guard against an error return (-1) or a bogus 0.
  if Result < 1 then
    Result := 1;
end;

{ TThreadConf }

// Hands `task` to one worker thread, chosen round-robin.
//
// The unit-level counter gIndex is incremented atomically and reduced modulo
// the number of workers. The arithmetic is done on Cardinal: once the signed
// counter wraps past MaxInt it becomes negative, and a negative `mod` result
// would index outside fWorkers.
procedure TThreadConf.allotTask(task: Pointer);
var
  slot: Integer;
  worker: TWorkThread;
begin
  // No workers -> nowhere to put the task (also avoids division by zero).
  if fThreadNum <= 0 then
    Exit;
  slot := Integer(Cardinal(AtomicIncrement(gIndex)) mod Cardinal(fThreadNum));
  worker := fWorkers[slot] as TWorkThread;
  worker.enqueue(task);
end;

// Creates the pool: decides the worker count, initialises the unit-level
// semaphore and creates the (still suspended) worker threads.
//
// threadNum: number of workers; 0 (default) or a negative value selects
//            cpuNum.
//
// NOTE(review): hsem is a unit-level variable, so constructing a second
// TThreadConf overwrites the semaphore used by the first.
constructor TThreadConf.Create(const threadNum: Integer = 0);
begin
  inherited Create;
  fThreadNum := threadNum;
  if fThreadNum <= 0 then
    fThreadNum := cpuNum;
  SetLength(fWorkers, fThreadNum);
  {$IFDEF mswindows}
  // The semaphore counts pending tasks (one release per enqueue), so its
  // maximum must not be tied to the thread count. The raw `threadNum`
  // parameter is 0 by default, which makes CreateSemaphore fail outright,
  // and any small cap makes ReleaseSemaphore fail once more tasks are queued.
  hsem := CreateSemaphore(nil, 0, MaxInt, nil);
  {$ELSE}
  sem_init(hsem, 0, 0);
  {$ENDIF}
  newThreads;
end;

// Tears the pool down: releases the worker threads first, then disposes of
// the unit-level semaphore they wait on.
destructor TThreadConf.Destroy;
begin
  // Workers use hsem, so they have to go before the semaphore does.
  freeThreads;

  {$IFDEF MSWINDOWS}
  CloseHandle(hsem);
  {$ELSE}
  sem_destroy(hsem);
  {$ENDIF}

  inherited Destroy;
end;

// Shuts down and frees every worker thread.
//
// Done in three passes so WaitFor cannot block forever:
//   1. flag every worker as terminated and resume any that is suspended
//      (created but never started, or suspended by stopThreads) - a
//      suspended thread can never finish;
//   2. post the semaphore once per worker so threads blocked waiting on it
//      wake up and can see the Terminated flag;
//   3. wait for each thread to finish, then free it.
// Empty slots (nil) are skipped.
procedure TThreadConf.freeThreads;
var
  i: Integer;
begin
  // Pass 1: request termination and make sure every thread can run.
  for i := 0 to High(fWorkers) do
    if fWorkers[i] <> nil then
    begin
      fWorkers[i].Terminate;
      if fWorkers[i].Suspended then
        fWorkers[i].Suspended := False;
    end;

  // Pass 2: one wake-up token per worker.
  for i := 0 to High(fWorkers) do
  begin
    {$IFDEF mswindows}
    ReleaseSemaphore(hsem, 1, nil);
    {$ELSE}
    sem_post(hsem);
    {$ENDIF}
  end;

  // Pass 3: join and release.
  for i := 0 to High(fWorkers) do
    if fWorkers[i] <> nil then
    begin
      fWorkers[i].WaitFor;
      FreeAndNil(fWorkers[i]);
    end;
end;

// Fills every slot of fWorkers with a new TWorkThread bound to this pool.
procedure TThreadConf.newThreads;
var
  slot: Integer;
begin
  slot := 0;
  while slot < fThreadNum do
  begin
    fWorkers[slot] := TWorkThread.Create(Self);
    Inc(slot);
  end;
end;

// Sets every worker thread running. Compilers that define UNICODE use
// TThread.Start; others fall back to TThread.Resume.
procedure TThreadConf.startThreads;
var
  slot: Integer;
begin
  slot := 0;
  while slot < fThreadNum do
  begin
    {$IFDEF unicode}
    fWorkers[slot].Start;
    {$ELSE}
    fWorkers[slot].Resume;
    {$ENDIF}
    Inc(slot);
  end;
end;

// Suspends every worker thread by calling TThread.Suspend on each one.
// NOTE(review): a thread is suspended wherever it happens to be executing;
// confirm callers do not rely on tasks reaching a safe point first.
procedure TThreadConf.stopThreads;
var
  slot: Integer;
begin
  slot := 0;
  while slot < fThreadNum do
  begin
    fWorkers[slot].Suspend;
    Inc(slot);
  end;
end;

{ TWorkThread }

// Creates the worker in the suspended state, bound to the pool `conf`, with
// an empty task queue.
//
// conf: owning TThreadConf; its callback is invoked for each task.
constructor TWorkThread.Create(conf: TThreadConf);
begin
  inherited Create(True);
  // The owning TThreadConf keeps a reference to this thread and calls
  // WaitFor and FreeAndNil on it during shutdown. A self-freeing thread
  // would turn those calls into use-after-free and double free, so the
  // lifetime stays with the owner.
  FreeOnTerminate := False;
  fConf := conf;
  fQueue := TQueue.Create;
end;

// Releases the task queue, then runs the inherited TThread destructor.
destructor TWorkThread.Destroy;
var
  doomed: TQueue;
begin
  // Clear the field before freeing, as FreeAndNil does.
  doomed := fQueue;
  fQueue := nil;
  doomed.Free;
  inherited Destroy;
end;

// Appends `task` to this worker's queue and posts the unit-level semaphore
// once so that a waiting worker wakes up.
//
// Called from the producer's thread while Execute pops from the worker
// thread; TQueue is not synchronised, so the push is done under the queue's
// monitor lock (System.TMonitor, Delphi 2009+).
procedure TWorkThread.enqueue(task: Pointer);
begin
  System.TMonitor.Enter(fQueue);
  try
    fQueue.Push(task);
  finally
    System.TMonitor.Exit(fQueue);
  end;
  {$IFDEF mswindows}
  ReleaseSemaphore(hsem, 1, nil);
  {$ELSE}
  sem_post(hsem);
  {$ENDIF}
end;

// Thread body: keeps taking tasks from this worker's queue and passing them
// to the pool's callback until the thread is terminated.
//
// Changes from the one-shot original:
//   * loops until Terminated instead of handling at most one task;
//   * POSIX: sem_wait returns 0 on success, so success is tested with "= 0"
//     (the former "> 0" test never ran a task);
//   * the queue is popped under the same monitor lock enqueue uses, and only
//     when it is non-empty (TQueue.Pop on an empty queue raises);
//   * a token is posted back only when this worker's queue was empty. The
//     semaphore is shared by all workers while queues are per worker, so the
//     token then belongs to another worker's task. A token that matched a
//     task here is consumed, keeping the count equal to the pending tasks.
//
// On Windows the wait uses a short time slice so Terminated is polled even
// when nothing posts the semaphore. On POSIX the wait blocks until a post.
procedure TWorkThread.Execute;
{$IFDEF mswindows}
const
  WAIT_SLICE_MS = 100;
{$ENDIF}
var
  task: Pointer;
  gotToken, gotTask: Boolean;
  {$IFDEF mswindows}
  waitRes: DWORD;
  {$ENDIF}
begin
  while not Terminated do
  begin
    {$IFDEF mswindows}
    waitRes := WaitForSingleObject(hsem, WAIT_SLICE_MS);
    gotToken := waitRes = WAIT_OBJECT_0;
    // An invalid handle fails immediately; do not spin at full speed.
    if waitRes = WAIT_FAILED then
      Sleep(WAIT_SLICE_MS);
    {$ELSE}
    gotToken := sem_wait(hsem) = 0;
    {$ENDIF}

    if Terminated then
      Break;
    if not gotToken then
      Continue;

    // Take one task, if this worker has any.
    task := nil;
    System.TMonitor.Enter(fQueue);
    try
      gotTask := fQueue.Count > 0;
      if gotTask then
        task := fQueue.Pop;
    finally
      System.TMonitor.Exit(fQueue);
    end;

    if not gotTask then
    begin
      // The token was for a task in another worker's queue: give it back
      // and yield briefly so the owner has a chance to take it.
      {$IFDEF mswindows}
      ReleaseSemaphore(hsem, 1, nil);
      {$ELSE}
      sem_post(hsem);
      {$ENDIF}
      Sleep(1);
      Continue;
    end;

    if (task <> nil) and Assigned(fConf.fCallBack) then
      fConf.fCallBack(task);
  end;
end;

end.
hmoban主题是根据ripro二开的主题,极致后台体验,无插件,集成会员系统
自学咖网 » 线程池处理异步任务队列