池
池是一种以空间换时间的思想。对于内存,线程,连接这类资源,每次使用时都需要申请,申请完后都要释放。一旦内存块/线程/连接数变多,这之中的资源消耗就会变多,因此我们可以先申请一定数量的资源,等待需要使用时,可以直接在高层分配,而不需要向操作系统申请,在使用完后,也不直接销毁,而是清空,等待下一次循环使用,这就是池。
线程池
通过先在池中生成一定数量的线程,先令其挂起,等待有新的线程任务时再执行。结束后回归池,实现资源复用。
具体结构
- 池本身(逻辑结构,并不实际存在):需要实现创建池,销毁池,和外部添加任务。
- 作业队列:外部添加的任务全部会按顺序加入这个队列,等待工作线程获取并执行。
- 工作线程:以一个循环包裹的线程,循环中无限等待作业队列是否有新任务并处理。
作业队列需要在任务添加时立即通知所有的工作线程,让工作线程可以尝试获取新任务。
工作线程获取新任务时,要确保其它工作线程不能获取同一个任务。
具体逻辑
内存池类:
- 创建池:创建n个工作线程和一个作业队列,初始化上面需要的锁getwork和条件变量ispushing。
- 作业队列:一个队列,每个元素是作业结构,其中有函数CallBack和传参args。
- 添加任务的公开函数:传入一个CallBack生成一个作业结构push到队列末尾。并用WakeAllConditionVariable唤醒所有等待ispushing的工作线程。
- 工作线程:处于循环,先EnterCriticalSection getwork,如果没有任务,等待ispushing,否则取任务,并执行。
- 销毁线程池:设置一个flag标志,如果工作线程检测到flag,同时没有任务,立即return。然后销毁作业队列和线程池本身。
实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
| #pragma once #include <queue> #include <stdexcept> #include <Windows.h> class ThreadPool { typedef DWORD(*pCallBack)(PVOID args); struct Job { pCallBack _callBack; PVOID _args; };
private: std::queue<Job> jobQueue; std::vector<HANDLE> threads; CRITICAL_SECTION cs; CONDITION_VARIABLE cv; int maxJob; int threadNum; bool stop;
private: static DWORD WINAPI ThreadFunc(LPVOID lpParam) { ThreadPool* pool = static_cast<ThreadPool*>(lpParam); while (true) { EnterCriticalSection(&pool->cs); while (pool->jobQueue.empty() && !pool->stop) { SleepConditionVariableCS(&pool->cv, &pool->cs, INFINITE); } if (pool->jobQueue.empty() && pool->stop) { LeaveCriticalSection(&pool->cs); break; } Job job = pool->jobQueue.front(); pool->jobQueue.pop(); LeaveCriticalSection(&pool->cs); if (job._callBack) { job._callBack(job._args); } } return 0; } bool allThreadExitIfStop() {
WakeAllConditionVariable(&cv); for (auto ithread : threads) { WaitForSingleObject(ithread, INFINITE); CloseHandle(ithread); } threads.clear(); return true; }
public: ThreadPool(int threadNum = 4, int maxJob = 100) : maxJob(maxJob), threadNum(threadNum), stop(false) { if (maxJob <= 0 || threadNum <= 0) { throw std::invalid_argument("maxJob and threadNum must be greater than 0."); } InitializeCriticalSection(&cs); InitializeConditionVariable(&cv);
for (int i = 0; i < threadNum; ++i) { HANDLE thread = CreateThread(NULL, 0, ThreadFunc, this, 0, NULL); if (thread == NULL) { stop = true; allThreadExitIfStop();
DeleteCriticalSection(&cs); throw std::runtime_error("CreateThread failed."); } threads.push_back(thread);
} } ~ThreadPool() { stop = true; allThreadExitIfStop(); DeleteCriticalSection(&cs); } bool addWork(pCallBack callBack, PVOID args) { EnterCriticalSection(&cs); if (jobQueue.size() >= maxJob) { LeaveCriticalSection(&cs); return false; } jobQueue.push({ callBack, args }); WakeConditionVariable(&cv); LeaveCriticalSection(&cs); return true; }
};
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| #include <chrono> #include <windows.h> #include <iostream> #include"ThreadPool.hpp"
DWORD myWork(PVOID i) { using namespace std; cout << "Thread ID: " << GetCurrentThreadId() << " is working on task " << *(PDWORD)i << endl; Sleep(500); return 0; }
int main(int argc, char* argv[]) { auto startTime = std::chrono::high_resolution_clock::now(); ThreadPool pool = ThreadPool(10, 100); for (int i = 0; i < 100; ++i) { DWORD* arg = new DWORD(i); pool.addWork(myWork, arg);
} return 0; }
|