library of assembled shared sources

http://lass.cocamware.com

thread_pool.h

Go to the documentation of this file.
00001 /** @file
00002  *  @author Bram de Greve (bramz@users.sourceforge.net)
00003  *  @author Tom De Muer (tomdemuer@users.sourceforge.net)
00004  *
00005  *  *** BEGIN LICENSE INFORMATION ***
00006  *  
00007  *  The contents of this file are subject to the Common Public Attribution License 
00008  *  Version 1.0 (the "License"); you may not use this file except in compliance with 
00009  *  the License. You may obtain a copy of the License at 
00010  *  http://lass.sourceforge.net/cpal-license. The License is based on the 
00011  *  Mozilla Public License Version 1.1 but Sections 14 and 15 have been added to cover 
00012  *  use of software over a computer network and provide for limited attribution for 
00013  *  the Original Developer. In addition, Exhibit A has been modified to be consistent 
00014  *  with Exhibit B.
00015  *  
00016  *  Software distributed under the License is distributed on an "AS IS" basis, WITHOUT 
00017  *  WARRANTY OF ANY KIND, either express or implied. See the License for the specific 
00018  *  language governing rights and limitations under the License.
00019  *  
00020  *  The Original Code is LASS - Library of Assembled Shared Sources.
00021  *  
00022  *  The Initial Developer of the Original Code is Bram de Greve and Tom De Muer.
00023  *  The Original Developer is the Initial Developer.
00024  *  
00025  *  All portions of the code written by the Initial Developer are:
00026  *  Copyright (C) 2004-2007 the Initial Developer.
00027  *  All Rights Reserved.
00028  *  
00029  *  Contributor(s):
00030  *
00031  *  Alternatively, the contents of this file may be used under the terms of the 
00032  *  GNU General Public License Version 2 or later (the GPL), in which case the 
00033  *  provisions of GPL are applicable instead of those above.  If you wish to allow use
00034  *  of your version of this file only under the terms of the GPL and not to allow 
00035  *  others to use your version of this file under the CPAL, indicate your decision by 
00036  *  deleting the provisions above and replace them with the notice and other 
00037  *  provisions required by the GPL License. If you do not delete the provisions above,
00038  *  a recipient may use your version of this file under either the CPAL or the GPL.
00039  *  
00040  *  *** END LICENSE INFORMATION ***
00041  */
00042 
00043 /** @class ThreadPool
00044  *  @ingroup Threading
00045  *  @brief Production-Consumer concurrency pattern
00046  *  @author Bramz
00047  *
00048  *  @code
00049  *  #include <lass/util/thread_pool.h>
00050  *  #include <lass/util/bind.h>
00051  *  using namespace lass::util;
00052  *
00053  *  void parallelWork(int begin, int end);
00054  *  const int n = 1000000;
00055  *  const int step = 1000;
00056  *
00057  *  util::ThreadPool<> pool;
00058  *  for (int i = 0; i < n; i += step)
00059  *  {
00060  *      pool.addTask(bind(parallelWork, i, std::min(i + step, n)));
00061  *  }
00062  *  pool.completeAllTasks();
00063  *  @endcode
00064  * 
00065  *  @section Good-Practice
00066  *
00067  *  Using the policies and constructor parameters, the ThreadPool can be operated in many
00068  *  different configurations.  However, there are two that are particular interesting:
00069  *
00070  *  @subsection 1 fast synchronized thread pool
00071  *
00072  *  Best used when short bursts of a bounded number of tasks are followed by completeAllTasks().
00073  *  Example: A single task is splitted in a parallel subtasks, all feeded to the pool at once,
00074  *      and completeAllTasks() is called to finish the grand tasks. 
00075  
00076  *  @arg IdlePolicy: Spinning, for a fast reaction time
00077  *  @arg ParticipationPolicy: SelfParticipating, so that during completeAllTasks() the cpu cycles 
00078  *      of the control thread are not wasted by spinning.
00079  *  @arg numberOfThreads: autoNumberOfThreads
00080  *  @arg maxNumberOfTasksInQueue: unlimitedNumberOfTasks, because the number of tasks in the queue
00081  *      can be (and should be) bounded by the application itself, and we want the control thread
00082  *      to go in completeAllTasks() ASAP, so it can donate cpu cycles to completing the tasks
00083  *      instead of waisting them spinning until another task can be added to the queue.
00084  *
00085  *  @subsection 2 slower continues thread pool
00086  *
00087  *  Best used when a continues (unbounded) stream of tasks reaches a server.
00088  *
00089  *  @arg IdlePolicy: Signaled, to avoid needing an extra cpu for the control thread.
00090  *  @arg ParticipationPolicy: NotParticipating, because you'll never call completeAllTasks()
00091  *  @arg numberOfThreads: autoNumberOfThreads
00092  *  @arg maxNumberOfTasksInQueue: <A LIMITED NUMBER> to avoid an excessive queue size
00093  *
00094  *  @section Policies
00095  *
00096  *  @subsection IdlePolicy
00097  *
00098  *  @code
00099  *  class IdlePolicy
00100  *  {
00101  *  protected:
00102  *      void sleepProducer();
00103  *      void wakeProducer();
00104  *      void sleepConsumer();
00105  *      void wakeConsumer();
00106  *      void wakeAllConsumers();
00107  *  };
00108  *  @endcode
00109  *
00110  *  @subsection ParticipationPolicy
00111  *
00112  *  @code
00113  *  template <typename TaskType, typename ConsumerType, typename IdlePolicy>
00114  *  class ParticipationPolicy: public IdlePolicy
00115  *  {
00116  *  protected:
00117  *      NotParticipating(const ConsumerType& prototype);
00118  *      const unsigned numDynamicThreads(unsigned numThreads) const;
00119  *      template <typename Queue> bool participate(Queue&);
00120  *  };
00121  *  @endcode
00122  *
00123  */
00124 
00125 #ifndef LASS_GUARDIAN_OF_INCLUSION_UTIL_THREAD_POOL_H
00126 #define LASS_GUARDIAN_OF_INCLUSION_UTIL_THREAD_POOL_H
00127 
00128 #include "util_common.h"
00129 #include "thread.h"
00130 #include "callback_0.h"
00131 #include "../stde/lock_free_queue.h"
00132 #include "future.h"
00133 
00134 namespace lass
00135 {
00136 namespace util
00137 {
00138 
00139 /** Default consumer calls operator() of task.
00140  *  @ingroup Threading
00141  *  @sa ThreadPool
00142  */
00143 template <typename TaskType>
00144 class DefaultConsumer
00145 {
00146 public:
00147     void operator()(typename util::CallTraits<TaskType>::TParam task);
00148 };
00149 
00150 /** implementation of ThreadPool's IdlePolicy
00151  *  @ingroup Threading
00152  *  @sa ThreadPool
00153  *
00154  *  Idle threads don't go to sleep, but spin until a condition is met.
00155  *  This results in a faster response but it should only be used if each thread 
00156  *  (including the control thread) can have it's own cpu or core.
00157  */
00158 class Spinning
00159 {
00160 protected:
00161     Spinning() {}
00162     ~Spinning() {}
00163     void sleepProducer() {}
00164     void wakeProducer() {}
00165     void sleepConsumer() {}
00166     void wakeConsumer() {}
00167     void wakeAllConsumers() {}
00168 };
00169 
00170 /** implementation of ThreadPool's IdlePolicy
00171  *  @ingroup Threading
00172  *  @sa ThreadPool
00173  *
00174  *  Idle threads are put to sleep and signaled to awake when a condition is met.
00175  *  It results in a slower response of the threads, but it avoids wasting cpu cycles
00176  *  to idle threads.
00177  */
00178 class Signaled
00179 {
00180 protected:
00181     Signaled() {}
00182     ~Signaled() {}
00183     void sleepProducer() { producer_.wait(mSecsToSleep_); }
00184     void wakeProducer() { producer_.signal(); }
00185     void sleepConsumer() { consumer_.wait(mSecsToSleep_); }
00186     void wakeConsumer() { consumer_.signal(); }
00187     void wakeAllConsumers() { consumer_.broadcast(); }
00188 private:
00189     enum { mSecsToSleep_ = 50 };
00190     Condition producer_;
00191     Condition consumer_;
00192 };
00193 
00194 /** implementation of ThreadPool's ParticipationPolicy
00195  *  @ingroup Threading
00196  *  @sa ThreadPool
00197  *
00198  *  The control thread participates as producer while waiting to complete all tasks.
00199  *  Is best used when short burst of adding a limited number of tasks to an 
00200  *  unlimited queue, followed by a completeAllTasks().
00201  */
00202 template <typename TaskType, typename ConsumerType, typename IdlePolicy>
00203 class SelfParticipating: public IdlePolicy
00204 {
00205 protected:
00206     SelfParticipating(const ConsumerType& prototype): IdlePolicy(), consumer_(prototype) { Thread::bindCurrent(0); }
00207     ~SelfParticipating() { Thread::bindCurrent(Thread::anyProcessor); }
00208     const size_t numDynamicThreads(size_t numThreads) const { return numThreads - 1; }
00209     template <typename Queue> bool participate(Queue& queue)
00210     {
00211         TaskType task;
00212         if (queue.pop(task))
00213         {
00214             consumer_(task);
00215             return true;
00216         }
00217         return false;
00218     }       
00219 private:
00220     ConsumerType consumer_;
00221 };
00222 
00223 /** implementation of ThreadPool's ParticipationPolicy
00224  *  @ingroup Threading
00225  *  @sa ThreadPool
00226  *
00227  *  The control thread will not participate as producer thread at any time.
00228  *  Should be used when there's a continues stream of added tasks too a limited queue,
00229  *  and when completeAllTasks() is never called.
00230  */
00231 template <typename TaskType, typename ConsumerType, typename IdlePolicy>
00232 class NotParticipating: public IdlePolicy
00233 {
00234 protected:
00235     NotParticipating(const ConsumerType& prototype): IdlePolicy() {}
00236     ~NotParticipating() {}
00237     const size_t numDynamicThreads(size_t numThreads) const { return numThreads; }
00238     template <typename Queue> bool participate(Queue&) { return false; }
00239 };
00240 
00241 
00242 
00243 
00244 template 
00245 <
00246     typename TaskType = Callback0, 
00247     typename ConsumerType = DefaultConsumer<TaskType>,
00248     typename IdlePolicy = Signaled,
00249     template <typename, typename, typename> class ParticipationPolicy = NotParticipating
00250 >
00251 class ThreadPool: public ParticipationPolicy<TaskType, ConsumerType, IdlePolicy>
00252 {
00253 public:
00254     
00255     typedef TaskType TTask;
00256     typedef ConsumerType TConsumer;
00257     typedef IdlePolicy TIdlePolicy;
00258     typedef ParticipationPolicy<TaskType, ConsumerType, IdlePolicy> TParticipationPolicy;
00259     typedef ThreadPool<TaskType, ConsumerType, IdlePolicy, ParticipationPolicy> TSelf;
00260 
00261     enum 
00262     { 
00263         autoNumberOfThreads = 0,
00264         unlimitedNumberOfTasks = 0
00265     };
00266 
00267     ThreadPool(size_t numberOfThreads = autoNumberOfThreads, 
00268         size_t maximumNumberOfTasksInQueue = unlimitedNumberOfTasks, 
00269         const TConsumer& consumerPrototype = TConsumer(),
00270         const char* name = 0);
00271     ~ThreadPool();
00272 
00273     void addTask(typename util::CallTraits<TTask>::TParam task);
00274     void completeAllTasks();
00275     void clearQueue();
00276     const size_t numberOfThreads() const;
00277 
00278 private:
00279 
00280     typedef stde::lock_free_queue<TTask> TTaskQueue;
00281 
00282     friend class ConsumerThread;
00283 
00284     class ConsumerThread: public Thread
00285     {
00286     public:
00287         ConsumerThread(const TConsumer& consumer, TSelf& pool, const char* name);
00288     private:
00289         void doRun();
00290         TConsumer consumer_;
00291         TSelf& pool_;
00292     };
00293 
00294     void startThreads(const TConsumer& consumerPrototype, const char* name);
00295     void stopThreads(size_t numAllocatedThreads);
00296 
00297     TTaskQueue waitingTasks_;
00298     std::auto_ptr<experimental::RemoteExceptionBase> error_;
00299     ConsumerThread* threads_;
00300     unsigned long mSecsToSleep_;
00301     size_t numThreads_;
00302     size_t maxWaitingTasks_;
00303     volatile size_t numWaitingTasks_;
00304     volatile size_t numRunningTasks_;
00305     volatile bool shutDown_;
00306     volatile bool abort_;
00307 };
00308 
00309 
00310 }
00311 
00312 }
00313 
00314 #include "thread_pool.inl"
00315 
00316 #endif
00317 
00318 // EOF

Generated on Mon Nov 10 14:21:40 2008 for Library of Assembled Shared Sources by doxygen 1.5.7.1
SourceForge.net Logo