diff --git a/src/engine/helper/HL_ThreadPool.cpp b/src/engine/helper/HL_ThreadPool.cpp new file mode 100644 index 00000000..742a2541 --- /dev/null +++ b/src/engine/helper/HL_ThreadPool.cpp @@ -0,0 +1,51 @@ +#include "HL_ThreadPool.h" + +thread_pool::thread_pool(size_t num_of_threads, const std::string& name) +{ + this->name = name; + if (!num_of_threads) + num_of_threads = std::thread::hardware_concurrency(); + + for(size_t idx = 0; idx < num_of_threads; idx++) + this->workers.push_back(std::thread(&thread_pool::run_worker, this)); +} + +// Add new work item to the pool +void thread_pool::enqueue(const thread_pool::task& t) +{ + { + std::unique_lock lock(this->queue_mutex); + this->tasks.push(t); + } + this->condition.notify_one(); +} + + +// the destructor joins all threads +thread_pool::~thread_pool() +{ + { + std::unique_lock lock(this->queue_mutex); + stop = true; + } + this->condition.notify_all(); + for(std::thread &worker: workers) + worker.join(); +} + +void thread_pool::run_worker() +{ + ThreadHelper::setName(this->name); + task t; + while(!this->stop) + { + { + std::unique_lock lock(this->queue_mutex); + + this->condition.wait(lock, [this]{return !this->tasks.empty() || this->stop;}); + t = this->tasks.front(); + this->tasks.pop(); + } + t(); // function type + } +} diff --git a/src/engine/helper/HL_ThreadPool.h b/src/engine/helper/HL_ThreadPool.h index d82a7e9e..9f2ce013 100644 --- a/src/engine/helper/HL_ThreadPool.h +++ b/src/engine/helper/HL_ThreadPool.h @@ -13,99 +13,31 @@ #include #include "HL_Sync.h" -class ThreadPool +class thread_pool { public: - ThreadPool(size_t, const std::string&); - template - auto enqueue(F&& f, Args&&... args) - -> std::future::type>; - ~ThreadPool(); - void setPause(bool v) { this->pause = v;} + typedef std::function task; + + thread_pool(size_t num_of_threads, const std::string&); + ~thread_pool(); + + void enqueue(const task& task); + private: // need to keep track of threads so we can join them std::vector< std::thread > workers; + // the task queue - std::queue< std::function > tasks; + std::queue< task > tasks; // synchronization std::mutex queue_mutex; std::condition_variable condition; - bool stop, pause; + bool stop = false; + + // thread name prefix for worker threads std::string name; + + void run_worker(); }; - -// the constructor just launches some amount of workers -inline ThreadPool::ThreadPool(size_t threads, const std::string& name) - : stop(false), pause(false), name(name) -{ - for(size_t i = 0;iname + std::to_string(i)); - for(;;) - { - std::function task; bool task_assigned = false; - - { - std::unique_lock lock(this->queue_mutex); - this->condition.wait(lock, - [this]{ return this->stop || !this->pause || !this->tasks.empty(); }); - if (this->tasks.empty()) - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - if(this->stop && this->tasks.empty()) - return; - if(this->pause) - continue; - if (this->tasks.size()) - { - task = std::move(this->tasks.front()); task_assigned = true; - this->tasks.pop(); - } - } - if (task_assigned) - task(); - } - } - ); -} - -// add new work item to the pool -template -auto ThreadPool::enqueue(F&& f, Args&&... args) - -> std::future::type> -{ - using return_type = typename std::result_of::type; - - auto task = std::make_shared< std::packaged_task >( - std::bind(std::forward(f), std::forward(args)...) - ); - - std::future res = task->get_future(); - { - std::unique_lock lock(queue_mutex); - - // don't allow enqueueing after stopping the pool - if(stop) - throw std::runtime_error("enqueue on stopped ThreadPool"); - - tasks.emplace([task](){ (*task)(); }); - } - condition.notify_one(); - return res; -} - -// the destructor joins all threads -inline ThreadPool::~ThreadPool() -{ - { - std::unique_lock lock(queue_mutex); - stop = true; - } - condition.notify_all(); - for(std::thread &worker: workers) - worker.join(); -} - #endif