43#if LASS_PLATFORM_TYPE == LASS_PLATFORM_TYPE_WIN32
45# pragma warning(disable: 4267)
46# pragma warning(disable: 4996)
64template <
typename T,
typename C,
typename IP,
template <
typename,
typename,
typename>
class PP>
65ThreadPool<T, C, IP, PP>::ThreadPool(
66 size_t numberOfThreads,
67 size_t maximumNumberOfTasksInQueue,
68 const TConsumer& consumerPrototype,
70 TParticipationPolicy(consumerPrototype),
73 numThreads_(numberOfThreads == autoNumberOfThreads ?
numberOfProcessors() : numberOfThreads),
74 maxWaitingTasks_(maximumNumberOfTasksInQueue),
80 LASS_ENFORCE(numThreads_ > 0);
81 startThreads(consumerPrototype, name);
86template <
typename T,
typename C,
typename IP,
template <
typename,
typename,
typename>
class PP>
94 stopThreads(this->numDynamicThreads(numThreads_));
103template <
typename T,
typename C,
typename IP,
template <
typename,
typename,
typename>
class PP>
108 if (maxWaitingTasks_ == unlimitedNumberOfTasks || numWaitingTasks_ < maxWaitingTasks_)
112 waitingTasks_.push(task);
113 this->wakeConsumer();
116 this->sleepProducer();
117 this->rethrowError();
126template <
typename T,
typename C,
typename IP,
template <
typename,
typename,
typename>
class PP>
129 while (numWaitingTasks_ > 0 || numRunningTasks_ > 0)
131 if (this->participate(waitingTasks_))
135 this->sleepProducer();
136 this->rethrowError();
145template <
typename T,
typename C,
typename IP,
template <
typename,
typename,
typename>
class PP>
149 while (waitingTasks_.pop(dummy))
157template <
typename T,
typename C,
typename IP,
template <
typename,
typename,
typename>
class PP>
172template <
typename T,
typename C,
typename IP,
template <
typename,
typename,
typename>
class PP>
175 LASS_ASSERT(numThreads_ > 0);
176 const size_t dynThreads = this->numDynamicThreads(numThreads_);
183 const size_t bufferSize = 10;
184 char nameBuffer[bufferSize] =
"consumers";
187 strncpy(nameBuffer, name, bufferSize);
189 nameBuffer[bufferSize - 1] =
'\0';
190 const size_t length = strlen(nameBuffer);
191 const size_t indexLength = 2;
192 char* indexBuffer = nameBuffer + std::min(length, bufferSize - indexLength - 1);
194 threads_ =
static_cast<ConsumerThread*
>(malloc(dynThreads *
sizeof(ConsumerThread)));
197 throw std::bad_alloc();
201 size_t nextProcessor = numThreads_ - dynThreads;
204 for (i = 0; i < dynThreads; ++i)
207#if LASS_COMPILER_TYPE == LASS_COMPILER_TYPE_MSVC
212 indexBuffer, indexLength + 1,
"%02X",
int(i & 0xff));
213 indexBuffer[indexLength] =
'\0';
215 new (&threads_[i]) ConsumerThread(consumerPrototype, *
this, nameBuffer);
219 nextProcessor = threads_[i].bindToNextAvailable(nextProcessor);
223 threads_[i].~ConsumerThread();
240template <
typename T,
typename C,
typename IP,
template <
typename,
typename,
typename>
class PP>
246 this->wakeAllConsumers();
248 LASS_CATCH_TO_WARNING
250 LASS_ASSERT(
static_cast<int>(numAllocatedThreads) >= 0);
251 for (
size_t n = numAllocatedThreads; n > 0; --n)
255 threads_[n - 1].join();
257 LASS_CATCH_TO_WARNING
258 threads_[n - 1].~ConsumerThread();
266template <
typename T,
typename C,
typename IP,
template <
typename,
typename,
typename>
class PP>
269 std::lock_guard<std::mutex> lock(errorMutex_);
275 std::rethrow_exception(error);
283template <
typename T,
typename C,
typename IP,
template <
typename,
typename,
typename>
class PP>
285 const TConsumer& consumer, TSelf& pool,
const char* name):
294template <
typename T,
typename C,
typename IP,
template <
typename,
typename,
typename>
class PP>
298 const size_t lastBeforeError = nextProcessor + n;
303 this->bind(nextProcessor++ % n);
304 return nextProcessor % n;
308 if (nextProcessor >= lastBeforeError)
314 LASS_ASSERT_UNREACHABLE;
319#define LASS_UTIL_THREAD_POOL_CATCH_AND_WRAP(exception_type)\
320catch (const exception_type& error)\
322 pool_.error_.reset(new RemoteExceptionWrapper<exception_type>(error));\
326template <
typename T,
typename C,
typename IP,
template <
typename,
typename,
typename>
class PP>
330 while (!pool_.abort_)
332 if (pool_.waitingTasks_.pop(task))
334 ++pool_.numRunningTasks_;
335 --pool_.numWaitingTasks_;
336 pool_.wakeProducer();
343 std::lock_guard<std::mutex> lock(pool_.errorMutex_);
346 pool_.error_ = std::current_exception();
350 --pool_.numRunningTasks_;
352 else if (pool_.shutDown_)
358 pool_.sleepConsumer();
365template <
typename T>
inline
366void DefaultConsumer<T>::operator()(
typename util::CallTraits<T>::TParam task)
377#if LASS_PLATFORM_TYPE == LASS_PLATFORM_TYPE_WIN32
Production-Consumer concurrency pattern.
size_t numberOfProcessors()
Return highest id of processor + 1, in this machine.
@ threadJoinable
joinable thread, can be waited for
general utility, debug facilities, ...
Library for Assembled Shared Sources.