library of assembled shared sources |
http://lass.cocamware.com |
00001 /** @file 00002 * @author Bram de Greve (bramz@users.sourceforge.net) 00003 * @author Tom De Muer (tomdemuer@users.sourceforge.net) 00004 * 00005 * *** BEGIN LICENSE INFORMATION *** 00006 * 00007 * The contents of this file are subject to the Common Public Attribution License 00008 * Version 1.0 (the "License"); you may not use this file except in compliance with 00009 * the License. You may obtain a copy of the License at 00010 * http://lass.sourceforge.net/cpal-license. The License is based on the 00011 * Mozilla Public License Version 1.1 but Sections 14 and 15 have been added to cover 00012 * use of software over a computer network and provide for limited attribution for 00013 * the Original Developer. In addition, Exhibit A has been modified to be consistent 00014 * with Exhibit B. 00015 * 00016 * Software distributed under the License is distributed on an "AS IS" basis, WITHOUT 00017 * WARRANTY OF ANY KIND, either express or implied. See the License for the specific 00018 * language governing rights and limitations under the License. 00019 * 00020 * The Original Code is LASS - Library of Assembled Shared Sources. 00021 * 00022 * The Initial Developer of the Original Code is Bram de Greve and Tom De Muer. 00023 * The Original Developer is the Initial Developer. 00024 * 00025 * All portions of the code written by the Initial Developer are: 00026 * Copyright (C) 2004-2007 the Initial Developer. 00027 * All Rights Reserved. 00028 * 00029 * Contributor(s): 00030 * 00031 * Alternatively, the contents of this file may be used under the terms of the 00032 * GNU General Public License Version 2 or later (the GPL), in which case the 00033 * provisions of GPL are applicable instead of those above. If you wish to allow use 00034 * of your version of this file only under the terms of the GPL and not to allow 00035 * others to use your version of this file under the CPAL, indicate your decision by 00036 * deleting the provisions above and replace them with the notice and other 00037 * provisions required by the GPL License. If you do not delete the provisions above, 00038 * a recipient may use your version of this file under either the CPAL or the GPL. 00039 * 00040 * *** END LICENSE INFORMATION *** 00041 */ 00042 00043 /** @class ThreadPool 00044 * @ingroup Threading 00045 * @brief Production-Consumer concurrency pattern 00046 * @author Bramz 00047 * 00048 * @code 00049 * #include <lass/util/thread_pool.h> 00050 * #include <lass/util/bind.h> 00051 * using namespace lass::util; 00052 * 00053 * void parallelWork(int begin, int end); 00054 * const int n = 1000000; 00055 * const int step = 1000; 00056 * 00057 * util::ThreadPool<> pool; 00058 * for (int i = 0; i < n; i += step) 00059 * { 00060 * pool.addTask(bind(parallelWork, i, std::min(i + step, n))); 00061 * } 00062 * pool.completeAllTasks(); 00063 * @endcode 00064 * 00065 * @section Good-Practice 00066 * 00067 * Using the policies and constructor parameters, the ThreadPool can be operated in many 00068 * different configurations. However, there are two that are particular interesting: 00069 * 00070 * @subsection 1 fast synchronized thread pool 00071 * 00072 * Best used when short bursts of a bounded number of tasks are followed by completeAllTasks(). 00073 * Example: A single task is splitted in a parallel subtasks, all feeded to the pool at once, 00074 * and completeAllTasks() is called to finish the grand tasks. 00075 00076 * @arg IdlePolicy: Spinning, for a fast reaction time 00077 * @arg ParticipationPolicy: SelfParticipating, so that during completeAllTasks() the cpu cycles 00078 * of the control thread are not wasted by spinning. 00079 * @arg numberOfThreads: autoNumberOfThreads 00080 * @arg maxNumberOfTasksInQueue: unlimitedNumberOfTasks, because the number of tasks in the queue 00081 * can be (and should be) bounded by the application itself, and we want the control thread 00082 * to go in completeAllTasks() ASAP, so it can donate cpu cycles to completing the tasks 00083 * instead of waisting them spinning until another task can be added to the queue. 00084 * 00085 * @subsection 2 slower continues thread pool 00086 * 00087 * Best used when a continues (unbounded) stream of tasks reaches a server. 00088 * 00089 * @arg IdlePolicy: Signaled, to avoid needing an extra cpu for the control thread. 00090 * @arg ParticipationPolicy: NotParticipating, because you'll never call completeAllTasks() 00091 * @arg numberOfThreads: autoNumberOfThreads 00092 * @arg maxNumberOfTasksInQueue: <A LIMITED NUMBER> to avoid an excessive queue size 00093 * 00094 * @section Policies 00095 * 00096 * @subsection IdlePolicy 00097 * 00098 * @code 00099 * class IdlePolicy 00100 * { 00101 * protected: 00102 * void sleepProducer(); 00103 * void wakeProducer(); 00104 * void sleepConsumer(); 00105 * void wakeConsumer(); 00106 * void wakeAllConsumers(); 00107 * }; 00108 * @endcode 00109 * 00110 * @subsection ParticipationPolicy 00111 * 00112 * @code 00113 * template <typename TaskType, typename ConsumerType, typename IdlePolicy> 00114 * class ParticipationPolicy: public IdlePolicy 00115 * { 00116 * protected: 00117 * NotParticipating(const ConsumerType& prototype); 00118 * const unsigned numDynamicThreads(unsigned numThreads) const; 00119 * template <typename Queue> bool participate(Queue&); 00120 * }; 00121 * @endcode 00122 * 00123 */ 00124 00125 #ifndef LASS_GUARDIAN_OF_INCLUSION_UTIL_THREAD_POOL_H 00126 #define LASS_GUARDIAN_OF_INCLUSION_UTIL_THREAD_POOL_H 00127 00128 #include "util_common.h" 00129 #include "thread.h" 00130 #include "callback_0.h" 00131 #include "../stde/lock_free_queue.h" 00132 #include "future.h" 00133 00134 namespace lass 00135 { 00136 namespace util 00137 { 00138 00139 /** Default consumer calls operator() of task. 00140 * @ingroup Threading 00141 * @sa ThreadPool 00142 */ 00143 template <typename TaskType> 00144 class DefaultConsumer 00145 { 00146 public: 00147 void operator()(typename util::CallTraits<TaskType>::TParam task); 00148 }; 00149 00150 /** implementation of ThreadPool's IdlePolicy 00151 * @ingroup Threading 00152 * @sa ThreadPool 00153 * 00154 * Idle threads don't go to sleep, but spin until a condition is met. 00155 * This results in a faster response but it should only be used if each thread 00156 * (including the control thread) can have it's own cpu or core. 00157 */ 00158 class Spinning 00159 { 00160 protected: 00161 Spinning() {} 00162 ~Spinning() {} 00163 void sleepProducer() {} 00164 void wakeProducer() {} 00165 void sleepConsumer() {} 00166 void wakeConsumer() {} 00167 void wakeAllConsumers() {} 00168 }; 00169 00170 /** implementation of ThreadPool's IdlePolicy 00171 * @ingroup Threading 00172 * @sa ThreadPool 00173 * 00174 * Idle threads are put to sleep and signaled to awake when a condition is met. 00175 * It results in a slower response of the threads, but it avoids wasting cpu cycles 00176 * to idle threads. 00177 */ 00178 class Signaled 00179 { 00180 protected: 00181 Signaled() {} 00182 ~Signaled() {} 00183 void sleepProducer() { producer_.wait(mSecsToSleep_); } 00184 void wakeProducer() { producer_.signal(); } 00185 void sleepConsumer() { consumer_.wait(mSecsToSleep_); } 00186 void wakeConsumer() { consumer_.signal(); } 00187 void wakeAllConsumers() { consumer_.broadcast(); } 00188 private: 00189 enum { mSecsToSleep_ = 50 }; 00190 Condition producer_; 00191 Condition consumer_; 00192 }; 00193 00194 /** implementation of ThreadPool's ParticipationPolicy 00195 * @ingroup Threading 00196 * @sa ThreadPool 00197 * 00198 * The control thread participates as producer while waiting to complete all tasks. 00199 * Is best used when short burst of adding a limited number of tasks to an 00200 * unlimited queue, followed by a completeAllTasks(). 00201 */ 00202 template <typename TaskType, typename ConsumerType, typename IdlePolicy> 00203 class SelfParticipating: public IdlePolicy 00204 { 00205 protected: 00206 SelfParticipating(const ConsumerType& prototype): IdlePolicy(), consumer_(prototype) { Thread::bindCurrent(0); } 00207 ~SelfParticipating() { Thread::bindCurrent(Thread::anyProcessor); } 00208 const size_t numDynamicThreads(size_t numThreads) const { return numThreads - 1; } 00209 template <typename Queue> bool participate(Queue& queue) 00210 { 00211 TaskType task; 00212 if (queue.pop(task)) 00213 { 00214 consumer_(task); 00215 return true; 00216 } 00217 return false; 00218 } 00219 private: 00220 ConsumerType consumer_; 00221 }; 00222 00223 /** implementation of ThreadPool's ParticipationPolicy 00224 * @ingroup Threading 00225 * @sa ThreadPool 00226 * 00227 * The control thread will not participate as producer thread at any time. 00228 * Should be used when there's a continues stream of added tasks too a limited queue, 00229 * and when completeAllTasks() is never called. 00230 */ 00231 template <typename TaskType, typename ConsumerType, typename IdlePolicy> 00232 class NotParticipating: public IdlePolicy 00233 { 00234 protected: 00235 NotParticipating(const ConsumerType& prototype): IdlePolicy() {} 00236 ~NotParticipating() {} 00237 const size_t numDynamicThreads(size_t numThreads) const { return numThreads; } 00238 template <typename Queue> bool participate(Queue&) { return false; } 00239 }; 00240 00241 00242 00243 00244 template 00245 < 00246 typename TaskType = Callback0, 00247 typename ConsumerType = DefaultConsumer<TaskType>, 00248 typename IdlePolicy = Signaled, 00249 template <typename, typename, typename> class ParticipationPolicy = NotParticipating 00250 > 00251 class ThreadPool: public ParticipationPolicy<TaskType, ConsumerType, IdlePolicy> 00252 { 00253 public: 00254 00255 typedef TaskType TTask; 00256 typedef ConsumerType TConsumer; 00257 typedef IdlePolicy TIdlePolicy; 00258 typedef ParticipationPolicy<TaskType, ConsumerType, IdlePolicy> TParticipationPolicy; 00259 typedef ThreadPool<TaskType, ConsumerType, IdlePolicy, ParticipationPolicy> TSelf; 00260 00261 enum 00262 { 00263 autoNumberOfThreads = 0, 00264 unlimitedNumberOfTasks = 0 00265 }; 00266 00267 ThreadPool(size_t numberOfThreads = autoNumberOfThreads, 00268 size_t maximumNumberOfTasksInQueue = unlimitedNumberOfTasks, 00269 const TConsumer& consumerPrototype = TConsumer(), 00270 const char* name = 0); 00271 ~ThreadPool(); 00272 00273 void addTask(typename util::CallTraits<TTask>::TParam task); 00274 void completeAllTasks(); 00275 void clearQueue(); 00276 const size_t numberOfThreads() const; 00277 00278 private: 00279 00280 typedef stde::lock_free_queue<TTask> TTaskQueue; 00281 00282 friend class ConsumerThread; 00283 00284 class ConsumerThread: public Thread 00285 { 00286 public: 00287 ConsumerThread(const TConsumer& consumer, TSelf& pool, const char* name); 00288 private: 00289 void doRun(); 00290 TConsumer consumer_; 00291 TSelf& pool_; 00292 }; 00293 00294 void startThreads(const TConsumer& consumerPrototype, const char* name); 00295 void stopThreads(size_t numAllocatedThreads); 00296 00297 TTaskQueue waitingTasks_; 00298 std::auto_ptr<experimental::RemoteExceptionBase> error_; 00299 ConsumerThread* threads_; 00300 unsigned long mSecsToSleep_; 00301 size_t numThreads_; 00302 size_t maxWaitingTasks_; 00303 volatile size_t numWaitingTasks_; 00304 volatile size_t numRunningTasks_; 00305 volatile bool shutDown_; 00306 volatile bool abort_; 00307 }; 00308 00309 00310 } 00311 00312 } 00313 00314 #include "thread_pool.inl" 00315 00316 #endif 00317 00318 // EOF
Generated on Mon Nov 10 14:21:40 2008 for Library of Assembled Shared Sources by 1.5.7.1 |