Library of Assembled Shared Sources
thread_pool.h
Go to the documentation of this file.
1/** @file
2 * @author Bram de Greve (bram@cocamware.com)
3 * @author Tom De Muer (tom@cocamware.com)
4 *
5 * *** BEGIN LICENSE INFORMATION ***
6 *
7 * The contents of this file are subject to the Common Public Attribution License
8 * Version 1.0 (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
10 * http://lass.sourceforge.net/cpal-license. The License is based on the
11 * Mozilla Public License Version 1.1 but Sections 14 and 15 have been added to cover
12 * use of software over a computer network and provide for limited attribution for
13 * the Original Developer. In addition, Exhibit A has been modified to be consistent
14 * with Exhibit B.
15 *
16 * Software distributed under the License is distributed on an "AS IS" basis, WITHOUT
17 * WARRANTY OF ANY KIND, either express or implied. See the License for the specific
18 * language governing rights and limitations under the License.
19 *
20 * The Original Code is LASS - Library of Assembled Shared Sources.
21 *
22 * The Initial Developer of the Original Code is Bram de Greve and Tom De Muer.
23 * The Original Developer is the Initial Developer.
24 *
25 * All portions of the code written by the Initial Developer are:
26 * Copyright (C) 2004-2023 the Initial Developer.
27 * All Rights Reserved.
28 *
29 * Contributor(s):
30 *
31 * Alternatively, the contents of this file may be used under the terms of the
32 * GNU General Public License Version 2 or later (the GPL), in which case the
33 * provisions of GPL are applicable instead of those above. If you wish to allow use
34 * of your version of this file only under the terms of the GPL and not to allow
35 * others to use your version of this file under the CPAL, indicate your decision by
36 * deleting the provisions above and replace them with the notice and other
37 * provisions required by the GPL License. If you do not delete the provisions above,
38 * a recipient may use your version of this file under either the CPAL or the GPL.
39 *
40 * *** END LICENSE INFORMATION ***
41 */
42
43/** @class ThreadPool
44 * @ingroup Threading
45 * @brief Production-Consumer concurrency pattern
46 * @author Bramz
47 *
48 * @code
49 * #include <lass/util/thread_pool.h>
50 * #include <lass/util/bind.h>
51 * using namespace lass::util;
52 *
53 * void parallelWork(int begin, int end);
54 * const int n = 1000000;
55 * const int step = 1000;
56 *
57 * util::ThreadPool<> pool;
58 * for (int i = 0; i < n; i += step)
59 * {
60 * pool.addTask(bind(parallelWork, i, std::min(i + step, n)));
61 * }
62 * pool.completeAllTasks();
63 * @endcode
64 *
65 * @section Good-Practice
66 *
67 * Using the policies and constructor parameters, the ThreadPool can be operated in many
68 * different configurations. However, there are two that are particular interesting:
69 *
70 * @subsection 1 fast synchronized thread pool
71 *
72 * Best used when short bursts of a bounded number of tasks are followed by completeAllTasks().
73 * Example: A single task is splitted in a parallel subtasks, all feeded to the pool at once,
74 * and completeAllTasks() is called to finish the grand tasks.
75
76 * @arg IdlePolicy: Spinning, for a fast reaction time
77 * @arg ParticipationPolicy: SelfParticipating, so that during completeAllTasks() the cpu cycles
78 * of the control thread are not wasted by spinning.
79 * @arg numberOfThreads: autoNumberOfThreads
80 * @arg maxNumberOfTasksInQueue: unlimitedNumberOfTasks, because the number of tasks in the queue
81 * can be (and should be) bounded by the application itself, and we want the control thread
82 * to go in completeAllTasks() ASAP, so it can donate cpu cycles to completing the tasks
83 * instead of waisting them spinning until another task can be added to the queue.
84 *
85 * @subsection 2 slower continues thread pool
86 *
87 * Best used when a continues (unbounded) stream of tasks reaches a server.
88 *
89 * @arg IdlePolicy: Signaled, to avoid needing an extra cpu for the control thread.
90 * @arg ParticipationPolicy: NotParticipating, because you'll never call completeAllTasks()
91 * @arg numberOfThreads: autoNumberOfThreads
92 * @arg maxNumberOfTasksInQueue: <A LIMITED NUMBER> to avoid an excessive queue size
93 *
94 * @section Policies
95 *
96 * @subsection IdlePolicy
97 *
98 * @code
99 * class IdlePolicy
100 * {
101 * protected:
102 * void sleepProducer();
103 * void wakeProducer();
104 * void sleepConsumer();
105 * void wakeConsumer();
106 * void wakeAllConsumers();
107 * };
108 * @endcode
109 *
110 * @subsection ParticipationPolicy
111 *
112 * @code
113 * template <typename TaskType, typename ConsumerType, typename IdlePolicy>
114 * class ParticipationPolicy: public IdlePolicy
115 * {
116 * protected:
117 * NotParticipating(const ConsumerType& prototype);
118 * const unsigned numDynamicThreads(unsigned numThreads) const;
119 * template <typename Queue> bool participate(Queue&);
120 * };
121 * @endcode
122 *
123 */
124
125#ifndef LASS_GUARDIAN_OF_INCLUSION_UTIL_THREAD_POOL_H
126#define LASS_GUARDIAN_OF_INCLUSION_UTIL_THREAD_POOL_H
127
128#include "util_common.h"
129#include "thread.h"
130#include "callback_0.h"
132#include "future.h"
133#include <cstring>
134#include <mutex>
135
136namespace lass
137{
138namespace util
139{
140
141/** Default consumer calls operator() of task.
142 * @ingroup Threading
143 * @sa ThreadPool
144 */
145template <typename TaskType>
147{
148public:
149 void operator()(typename util::CallTraits<TaskType>::TParam task);
150};
151
152/** implementation of ThreadPool's IdlePolicy
153 * @ingroup Threading
154 * @sa ThreadPool
155 *
156 * Idle threads don't go to sleep, but spin until a condition is met.
157 * This results in a faster response but it should only be used if each thread
158 * (including the control thread) can have it's own cpu or core.
159 */
160class Spinning
161{
162protected:
163 Spinning() {}
164 ~Spinning() {}
165 void sleepProducer() { LASS_SPIN_PAUSE; }
166 void wakeProducer() {}
167 void sleepConsumer() { LASS_SPIN_PAUSE; }
168 void wakeConsumer() {}
169 void wakeAllConsumers() {}
170};
171
172/** implementation of ThreadPool's IdlePolicy
173 * @ingroup Threading
174 * @sa ThreadPool
175 *
176 * Idle threads are put to sleep and signaled to awake when a condition is met.
177 * It results in a slower response of the threads, but it avoids wasting cpu cycles
178 * to idle threads.
179 */
180class Signaled
181{
182protected:
183 Signaled() {}
184 ~Signaled() {}
185 void sleepProducer() { producer_.wait(mSecsToSleep_); }
186 void wakeProducer() { producer_.signal(); }
187 void sleepConsumer() { consumer_.wait(mSecsToSleep_); }
188 void wakeConsumer() { consumer_.signal(); }
189 void wakeAllConsumers() { consumer_.broadcast(); }
190private:
191 enum { mSecsToSleep_ = 50 };
192 Condition producer_;
193 Condition consumer_;
194};
195
196/** implementation of ThreadPool's ParticipationPolicy
197 * @ingroup Threading
198 * @sa ThreadPool
199 *
200 * The control thread participates as producer while waiting to complete all tasks.
201 * Is best used when short burst of adding a limited number of tasks to an
202 * unlimited queue, followed by a completeAllTasks().
203 */
204template <typename TaskType, typename ConsumerType, typename IdlePolicy>
205class SelfParticipating: public IdlePolicy
206{
207protected:
208 SelfParticipating(const ConsumerType& prototype): IdlePolicy(), consumer_(prototype) { Thread::bindCurrent(0); }
209 ~SelfParticipating() { Thread::bindCurrent(Thread::anyProcessor); }
210 size_t numDynamicThreads(size_t numThreads) const { return numThreads - 1; }
211 template <typename Queue> bool participate(Queue& queue)
212 {
213 TaskType task;
214 if (queue.pop(task))
215 {
216 consumer_(task);
217 return true;
218 }
219 return false;
220 }
221private:
222 ConsumerType consumer_;
223};
224
225/** implementation of ThreadPool's ParticipationPolicy
226 * @ingroup Threading
227 * @sa ThreadPool
228 *
229 * The control thread will not participate as producer thread at any time.
230 * Should be used when there's a continues stream of added tasks too a limited queue,
231 * and when completeAllTasks() is never called.
232 */
233template <typename TaskType, typename ConsumerType, typename IdlePolicy>
234class NotParticipating: public IdlePolicy
235{
236protected:
237 NotParticipating(const ConsumerType&): IdlePolicy() {}
238 ~NotParticipating() {}
239 size_t numDynamicThreads(size_t numThreads) const { return numThreads; }
240 template <typename Queue> bool participate(Queue&) { return false; }
241};
242
243
244
245
246template
247<
248 typename TaskType = Callback0,
249 typename ConsumerType = DefaultConsumer<TaskType>,
250 typename IdlePolicy = Signaled,
251 template <typename, typename, typename> class ParticipationPolicy = NotParticipating
252>
253class ThreadPool: public ParticipationPolicy<TaskType, ConsumerType, IdlePolicy>
254{
255public:
256
257 typedef TaskType TTask;
258 typedef ConsumerType TConsumer;
259 typedef IdlePolicy TIdlePolicy;
260 typedef ParticipationPolicy<TaskType, ConsumerType, IdlePolicy> TParticipationPolicy;
261 typedef ThreadPool<TaskType, ConsumerType, IdlePolicy, ParticipationPolicy> TSelf;
262
263 enum
264 {
265 autoNumberOfThreads = 0,
266 unlimitedNumberOfTasks = 0
267 };
268
269 ThreadPool(size_t numberOfThreads = autoNumberOfThreads,
270 size_t maximumNumberOfTasksInQueue = unlimitedNumberOfTasks,
271 const TConsumer& consumerPrototype = TConsumer(),
272 const char* name = 0);
273 ~ThreadPool();
274
275 void addTask(typename util::CallTraits<TTask>::TParam task);
276 void completeAllTasks();
277 void clearQueue();
278 size_t numberOfThreads() const;
279
280private:
281
282 typedef stde::lock_free_queue<TTask> TTaskQueue;
283
284 friend class ConsumerThread;
285
286 class ConsumerThread: public Thread
287 {
288 public:
289 ConsumerThread(const TConsumer& consumer, TSelf& pool, const char* name);
290 size_t bindToNextAvailable(size_t processor);
291 private:
292 void doRun() override;
293 TConsumer consumer_;
294 TSelf& pool_;
295 };
296
297 void startThreads(const TConsumer& consumerPrototype, const char* name);
298 void stopThreads(size_t numAllocatedThreads);
299 void rethrowError();
300
301 TTaskQueue waitingTasks_;
302 std::exception_ptr error_;
303 std::mutex errorMutex_;
304 ConsumerThread* threads_;
305 unsigned long mSecsToSleep_;
306 size_t numThreads_;
307 size_t maxWaitingTasks_;
308 std::atomic<size_t> numWaitingTasks_;
309 std::atomic<size_t> numRunningTasks_;
310 std::atomic<bool> shutDown_;
311 std::atomic<bool> abort_;
312};
313
314
315}
316
317}
318
319#include "thread_pool.inl"
320
321#endif
322
323// EOF
Production-Consumer concurrency pattern.
Non-blocking, lock-free FIFO data structure.
callback with 0 parameter(s) and without returnvalue.
Definition callback_0.h:84
Default consumer calls operator() of task.
implementation of ThreadPool's ParticipationPolicy
implementation of ThreadPool's IdlePolicy
static void bindCurrent(size_t processor)
bind current thread to a processor (current as in callee's context)
Definition thread.cpp:287
static constexpr size_t anyProcessor
argument for Thread::bind to unbind the thread so it runs on any processor
Definition thread.h:210
Library for Assembled Shared Sources.
Definition config.h:53