Library of Assembled Shared Sources
thread_pool.inl
Go to the documentation of this file.
1/** @file
2 * @author Bram de Greve (bram@cocamware.com)
3 * @author Tom De Muer (tom@cocamware.com)
4 *
5 * *** BEGIN LICENSE INFORMATION ***
6 *
7 * The contents of this file are subject to the Common Public Attribution License
8 * Version 1.0 (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
10 * http://lass.sourceforge.net/cpal-license. The License is based on the
11 * Mozilla Public License Version 1.1 but Sections 14 and 15 have been added to cover
12 * use of software over a computer network and provide for limited attribution for
13 * the Original Developer. In addition, Exhibit A has been modified to be consistent
14 * with Exhibit B.
15 *
16 * Software distributed under the License is distributed on an "AS IS" basis, WITHOUT
17 * WARRANTY OF ANY KIND, either express or implied. See the License for the specific
18 * language governing rights and limitations under the License.
19 *
20 * The Original Code is LASS - Library of Assembled Shared Sources.
21 *
22 * The Initial Developer of the Original Code is Bram de Greve and Tom De Muer.
23 * The Original Developer is the Initial Developer.
24 *
25 * All portions of the code written by the Initial Developer are:
26 * Copyright (C) 2004-2024 the Initial Developer.
27 * All Rights Reserved.
28 *
29 * Contributor(s):
30 *
31 * Alternatively, the contents of this file may be used under the terms of the
32 * GNU General Public License Version 2 or later (the GPL), in which case the
33 * provisions of GPL are applicable instead of those above. If you wish to allow use
34 * of your version of this file only under the terms of the GPL and not to allow
35 * others to use your version of this file under the CPAL, indicate your decision by
36 * deleting the provisions above and replace them with the notice and other
37 * provisions required by the GPL License. If you do not delete the provisions above,
38 * a recipient may use your version of this file under either the CPAL or the GPL.
39 *
40 * *** END LICENSE INFORMATION ***
41 */
42
43#if LASS_PLATFORM_TYPE == LASS_PLATFORM_TYPE_WIN32
44# pragma warning(push)
45# pragma warning(disable: 4267) // conversion from 'size_t' to 'lass::num::Tuint32', possible loss of data
46# pragma warning(disable: 4996) // this function or variable may be unsafe
47#endif
48
49#include <string.h>
50#include <stdio.h>
51
52namespace lass
53{
54namespace util
55{
56
57// --- public --------------------------------------------------------------------------------------
58
59/** @param iNumberOfThreads specify number of producer threads. Specify @a autoNumberOfThreads
60 * to automatically use as many threads as processors.
61 * @param mNumberOfTasksInQueue specifiy the maximum number of tasks that may be waiting in the
62 * queue. Specify @a unlimitedNumberOfTasks to have an unlimited queue.
63 */
64template <typename T, typename C, typename IP, template <typename, typename, typename> class PP>
65ThreadPool<T, C, IP, PP>::ThreadPool(
66 size_t numberOfThreads,
67 size_t maximumNumberOfTasksInQueue,
68 const TConsumer& consumerPrototype,
69 const char* name):
70 TParticipationPolicy(consumerPrototype),
71 error_(nullptr),
72 threads_(0),
73 numThreads_(numberOfThreads == autoNumberOfThreads ? numberOfProcessors() : numberOfThreads),
74 maxWaitingTasks_(maximumNumberOfTasksInQueue),
75 numWaitingTasks_(0),
76 numRunningTasks_(0),
77 shutDown_(false),
78 abort_(false)
79{
80 LASS_ENFORCE(numThreads_ > 0);
81 startThreads(consumerPrototype, name);
82}
83
84
85
86template <typename T, typename C, typename IP, template <typename, typename, typename> class PP>
88{
89 try
90 {
91 completeAllTasks();
92 }
93 LASS_CATCH_TO_WARNING
94 stopThreads(this->numDynamicThreads(numThreads_));
95}
96
97
98
99/** submit a task to the pool, and block if queue is full.
100 * Function waits until tasks can be added to queue without participating as producer
101 * (in case of SelfParticipating).
102 */
103template <typename T, typename C, typename IP, template <typename, typename, typename> class PP>
104void ThreadPool<T, C, IP, PP>::addTask(typename util::CallTraits<TTask>::TParam task)
105{
106 while (true)
107 {
108 if (maxWaitingTasks_ == unlimitedNumberOfTasks || numWaitingTasks_ < maxWaitingTasks_)
109 {
110 // because a single producer is assumed, we're certain we can push the task now
111 ++numWaitingTasks_;
112 waitingTasks_.push(task);
113 this->wakeConsumer();
114 return;
115 }
116 this->sleepProducer();
117 this->rethrowError();
118 }
119}
120
121
122
123/** blocks until all tasks in the queue are completed
124 * control thread participates as consumer if policy allows for it.
125 */
126template <typename T, typename C, typename IP, template <typename, typename, typename> class PP>
128{
129 while (numWaitingTasks_ > 0 || numRunningTasks_ > 0)
130 {
131 if (this->participate(waitingTasks_))
132 {
133 --numWaitingTasks_;
134 }
135 this->sleepProducer();
136 this->rethrowError();
137 }
138}
139
140
141
142/** clear queue without completing tasks.
143 * All waiting tasks in the queue are simply thrown away without ever being completed.
144 */
145template <typename T, typename C, typename IP, template <typename, typename, typename> class PP>
147{
148 TTask dummy;
149 while (waitingTasks_.pop(dummy))
150 {
151 --numWaitingTasks_;
152 }
153}
154
155
156
157template <typename T, typename C, typename IP, template <typename, typename, typename> class PP>
159{
160 return numThreads_;
161}
162
163
164
165// --- private -------------------------------------------------------------------------------------
166
167/** Allocate a bunch of threads and run them.
168 * We can't simply use a std::vector or an C-array, because the threads are neither
169 * copy-constructable or default constructable. So we need to do our own housekeeping.
170 * Don't worry folks, I know what I'm doing ;) [Bramz]
171 */
172template <typename T, typename C, typename IP, template <typename, typename, typename> class PP>
173void ThreadPool<T, C, IP, PP>::startThreads(const TConsumer& consumerPrototype, const char* name)
174{
175 LASS_ASSERT(numThreads_ > 0);
176 const size_t dynThreads = this->numDynamicThreads(numThreads_);
177 if (dynThreads == 0)
178 {
179 threads_ = 0;
180 return;
181 }
182
183 const size_t bufferSize = 10;
184 char nameBuffer[bufferSize] = "consumers";
185 if (name)
186 {
187 strncpy(nameBuffer, name, bufferSize);
188 }
189 nameBuffer[bufferSize - 1] = '\0';
190 const size_t length = strlen(nameBuffer);
191 const size_t indexLength = 2;
192 char* indexBuffer = nameBuffer + std::min(length, bufferSize - indexLength - 1);
193
194 threads_ = static_cast<ConsumerThread*>(malloc(dynThreads * sizeof(ConsumerThread)));
195 if (!threads_)
196 {
197 throw std::bad_alloc();
198 }
199
200 size_t i;
201 size_t nextProcessor = numThreads_ - dynThreads; // bind dynamic threads to "upper bits"
202 try
203 {
204 for (i = 0; i < dynThreads; ++i)
205 {
206
207#if LASS_COMPILER_TYPE == LASS_COMPILER_TYPE_MSVC
208 ::_snprintf(
209#else
210 ::snprintf(
211#endif
212 indexBuffer, indexLength + 1, "%02X", int(i & 0xff));
213 indexBuffer[indexLength] = '\0';
214
215 new (&threads_[i]) ConsumerThread(consumerPrototype, *this, nameBuffer);
216 try
217 {
218 threads_[i].run();
219 nextProcessor = threads_[i].bindToNextAvailable(nextProcessor);
220 }
221 catch (...)
222 {
223 threads_[i].~ConsumerThread();
224 throw;
225 }
226 }
227 }
228 catch (...)
229 {
230 stopThreads(i); // i == number of threads already started.
231 throw;
232 }
233}
234
235
236
237/** Deallocate the threads and free memory.
238 * @throw no exceptions should be throw, nor bubble up from this function ... ever!
239 */
240template <typename T, typename C, typename IP, template <typename, typename, typename> class PP>
241void ThreadPool<T, C, IP, PP>::stopThreads(size_t numAllocatedThreads)
242{
243 shutDown_ = true;
244 try
245 {
246 this->wakeAllConsumers();
247 }
248 LASS_CATCH_TO_WARNING
249
250 LASS_ASSERT(static_cast<int>(numAllocatedThreads) >= 0);
251 for (size_t n = numAllocatedThreads; n > 0; --n)
252 {
253 try
254 {
255 threads_[n - 1].join();
256 }
257 LASS_CATCH_TO_WARNING
258 threads_[n - 1].~ConsumerThread(); // shouldn't throw anyway ...
259 }
260
261 free(threads_); // shouldn't throw
262}
263
264
265
266template <typename T, typename C, typename IP, template <typename, typename, typename> class PP>
268{
269 std::lock_guard<std::mutex> lock(errorMutex_);
270 if (error_)
271 {
272 auto error = error_;
273 error_ = nullptr;
274 abort_ = true;
275 std::rethrow_exception(error);
276 }
277}
278
279
280
281// --- ConsumerThread ------------------------------------------------------------------------------
282
283template <typename T, typename C, typename IP, template <typename, typename, typename> class PP>
285 const TConsumer& consumer, TSelf& pool, const char* name):
286 Thread(threadJoinable, name),
287 consumer_(consumer),
288 pool_(pool)
289{
290}
291
292
293
294template <typename T, typename C, typename IP, template <typename, typename, typename> class PP>
296{
297 const size_t n = numberOfProcessors();
298 const size_t lastBeforeError = nextProcessor + n;
299 while (true)
300 {
301 try
302 {
303 this->bind(nextProcessor++ % n);
304 return nextProcessor % n;
305 }
306 catch (...)
307 {
308 if (nextProcessor >= lastBeforeError)
309 {
310 throw;
311 }
312 }
313 }
314 LASS_ASSERT_UNREACHABLE;
315}
316
317
318
319#define LASS_UTIL_THREAD_POOL_CATCH_AND_WRAP(exception_type)\
320catch (const exception_type& error)\
321{\
322 pool_.error_.reset(new RemoteExceptionWrapper<exception_type>(error));\
323 return;\
324}
325
326template <typename T, typename C, typename IP, template <typename, typename, typename> class PP>
328{
329 TTask task;
330 while (!pool_.abort_)
331 {
332 if (pool_.waitingTasks_.pop(task))
333 {
334 ++pool_.numRunningTasks_;
335 --pool_.numWaitingTasks_;
336 pool_.wakeProducer();
337 try
338 {
339 consumer_(task);
340 }
341 catch (...)
342 {
343 std::lock_guard<std::mutex> lock(pool_.errorMutex_);
344 if (!pool_.error_)
345 {
346 pool_.error_ = std::current_exception();
347 }
348 return;
349 }
350 --pool_.numRunningTasks_;
351 }
352 else if (pool_.shutDown_)
353 {
354 return;
355 }
356 else
357 {
358 pool_.sleepConsumer();
359 }
360 }
361}
362
363// --- DefaultConsumer -----------------------------------------------------------------------------
364
365template <typename T> inline
366void DefaultConsumer<T>::operator()(typename util::CallTraits<T>::TParam task)
367{
368 task();
369}
370
371
372
373}
374
375}
376
377#if LASS_PLATFORM_TYPE == LASS_PLATFORM_TYPE_WIN32
378# pragma warning(pop)
379#endif
380
381// EOF
Production-Consumer concurrency pattern.
size_t numberOfProcessors()
Return highest id of processor + 1, in this machine.
Definition thread.cpp:74
@ threadJoinable
joinable thread, can be waited for
Definition thread.h:111
general utility, debug facilities, ...
Library for Assembled Shared Sources.
Definition config.h:53