125#ifndef LASS_GUARDIAN_OF_INCLUSION_UTIL_THREAD_POOL_H
126#define LASS_GUARDIAN_OF_INCLUSION_UTIL_THREAD_POOL_H
145template <
typename TaskType>
149 void operator()(
typename util::CallTraits<TaskType>::TParam task);
165 void sleepProducer() { LASS_SPIN_PAUSE; }
166 void wakeProducer() {}
167 void sleepConsumer() { LASS_SPIN_PAUSE; }
168 void wakeConsumer() {}
169 void wakeAllConsumers() {}
185 void sleepProducer() { producer_.wait(mSecsToSleep_); }
186 void wakeProducer() { producer_.signal(); }
187 void sleepConsumer() { consumer_.wait(mSecsToSleep_); }
188 void wakeConsumer() { consumer_.signal(); }
189 void wakeAllConsumers() { consumer_.broadcast(); }
191 enum { mSecsToSleep_ = 50 };
204template <
typename TaskType,
typename ConsumerType,
typename IdlePolicy>
205class SelfParticipating:
public IdlePolicy
208 SelfParticipating(
const ConsumerType& prototype): IdlePolicy(), consumer_(prototype) {
Thread::bindCurrent(0); }
210 size_t numDynamicThreads(
size_t numThreads)
const {
return numThreads - 1; }
211 template <
typename Queue>
bool participate(Queue& queue)
222 ConsumerType consumer_;
233template <
typename TaskType,
typename ConsumerType,
typename IdlePolicy>
234class NotParticipating:
public IdlePolicy
237 NotParticipating(
const ConsumerType&): IdlePolicy() {}
238 ~NotParticipating() {}
239 size_t numDynamicThreads(
size_t numThreads)
const {
return numThreads; }
240 template <
typename Queue>
bool participate(Queue&) {
return false; }
251 template <
typename,
typename,
typename>
class ParticipationPolicy =
NotParticipating
253class ThreadPool:
public ParticipationPolicy<TaskType, ConsumerType, IdlePolicy>
257 typedef TaskType TTask;
258 typedef ConsumerType TConsumer;
259 typedef IdlePolicy TIdlePolicy;
260 typedef ParticipationPolicy<TaskType, ConsumerType, IdlePolicy> TParticipationPolicy;
261 typedef ThreadPool<TaskType, ConsumerType, IdlePolicy, ParticipationPolicy> TSelf;
265 autoNumberOfThreads = 0,
266 unlimitedNumberOfTasks = 0
269 ThreadPool(
size_t numberOfThreads = autoNumberOfThreads,
270 size_t maximumNumberOfTasksInQueue = unlimitedNumberOfTasks,
271 const TConsumer& consumerPrototype = TConsumer(),
272 const char* name = 0);
275 void addTask(
typename util::CallTraits<TTask>::TParam task);
276 void completeAllTasks();
278 size_t numberOfThreads()
const;
284 friend class ConsumerThread;
286 class ConsumerThread:
public Thread
289 ConsumerThread(
const TConsumer& consumer, TSelf& pool,
const char* name);
290 size_t bindToNextAvailable(
size_t processor);
292 void doRun()
override;
297 void startThreads(
const TConsumer& consumerPrototype,
const char* name);
298 void stopThreads(
size_t numAllocatedThreads);
301 TTaskQueue waitingTasks_;
302 std::exception_ptr error_;
303 std::mutex errorMutex_;
304 ConsumerThread* threads_;
305 unsigned long mSecsToSleep_;
307 size_t maxWaitingTasks_;
308 std::atomic<size_t> numWaitingTasks_;
309 std::atomic<size_t> numRunningTasks_;
310 std::atomic<bool> shutDown_;
311 std::atomic<bool> abort_;
Production-Consumer concurrency pattern.
Non-blocking, lock-free FIFO data structure.
callback with 0 parameter(s) and without returnvalue.
Default consumer calls operator() of task.
implementation of ThreadPool's ParticipationPolicy
implementation of ThreadPool's IdlePolicy
static void bindCurrent(size_t processor)
bind current thread to a processor (current as in callee's context)
static constexpr size_t anyProcessor
argument for Thread::bind to unbind the thread so it runs on any processor
Library for Assembled Shared Sources.