library of assembled shared sources

http://lass.cocamware.com

thread_pool.inl

Go to the documentation of this file.
00001 /** @file
00002  *  @author Bram de Greve (bramz@users.sourceforge.net)
00003  *  @author Tom De Muer (tomdemuer@users.sourceforge.net)
00004  *
00005  *  *** BEGIN LICENSE INFORMATION ***
00006  *  
00007  *  The contents of this file are subject to the Common Public Attribution License 
00008  *  Version 1.0 (the "License"); you may not use this file except in compliance with 
00009  *  the License. You may obtain a copy of the License at 
00010  *  http://lass.sourceforge.net/cpal-license. The License is based on the 
00011  *  Mozilla Public License Version 1.1 but Sections 14 and 15 have been added to cover 
00012  *  use of software over a computer network and provide for limited attribution for 
00013  *  the Original Developer. In addition, Exhibit A has been modified to be consistent 
00014  *  with Exhibit B.
00015  *  
00016  *  Software distributed under the License is distributed on an "AS IS" basis, WITHOUT 
00017  *  WARRANTY OF ANY KIND, either express or implied. See the License for the specific 
00018  *  language governing rights and limitations under the License.
00019  *  
00020  *  The Original Code is LASS - Library of Assembled Shared Sources.
00021  *  
00022  *  The Initial Developer of the Original Code is Bram de Greve and Tom De Muer.
00023  *  The Original Developer is the Initial Developer.
00024  *  
00025  *  All portions of the code written by the Initial Developer are:
00026  *  Copyright (C) 2004-2007 the Initial Developer.
00027  *  All Rights Reserved.
00028  *  
00029  *  Contributor(s):
00030  *
00031  *  Alternatively, the contents of this file may be used under the terms of the 
00032  *  GNU General Public License Version 2 or later (the GPL), in which case the 
00033  *  provisions of GPL are applicable instead of those above.  If you wish to allow use
00034  *  of your version of this file only under the terms of the GPL and not to allow 
00035  *  others to use your version of this file under the CPAL, indicate your decision by 
00036  *  deleting the provisions above and replace them with the notice and other 
00037  *  provisions required by the GPL License. If you do not delete the provisions above,
00038  *  a recipient may use your version of this file under either the CPAL or the GPL.
00039  *  
00040  *  *** END LICENSE INFORMATION ***
00041  */
00042 
00043 #if LASS_COMPILER_TYPE == LASS_COMPILER_TYPE_MSVC
00044 #   pragma warning(push)
00045 #   pragma warning(disable: 4267) // conversion from 'size_t' to 'lass::num::Tuint32', possible loss of data
00046 #   pragma warning(disable: 4996) // this function or variable may be unsafe
00047 #endif
00048 
00049 namespace lass
00050 {
00051 namespace util
00052 {
00053 
00054 // --- public --------------------------------------------------------------------------------------
00055 
00056 /** @param iNumberOfThreads specify number of producer threads.  Specify @a autoNumberOfThreads
00057  *      to automatically use as many threads as processors.
00058  *  @param mNumberOfTasksInQueue specifiy the maximum number of tasks that may be waiting in the
00059  *      queue.  Specify @a unlimitedNumberOfTasks to have an unlimited queue.
00060  */
00061 template <typename T, typename C, typename IP, template <typename, typename, typename> class PP>
00062 ThreadPool<T, C, IP, PP>::ThreadPool(
00063         size_t numberOfThreads, 
00064         size_t maximumNumberOfTasksInQueue,
00065         const TConsumer& consumerPrototype,
00066         const char* name):
00067     TParticipationPolicy(consumerPrototype),
00068     threads_(0),
00069     numThreads_(numberOfThreads == autoNumberOfThreads ? numberOfProcessors : numberOfThreads),
00070     maxWaitingTasks_(maximumNumberOfTasksInQueue),
00071     numWaitingTasks_(0),
00072     numRunningTasks_(0),
00073     shutDown_(false),
00074     abort_(false)
00075 {
00076     LASS_ENFORCE(numThreads_ > 0);
00077     startThreads(consumerPrototype, name);
00078 }
00079 
00080 
00081 
00082 template <typename T, typename C, typename IP, template <typename, typename, typename> class PP>
00083 ThreadPool<T, C, IP, PP>::~ThreadPool()
00084 {
00085     try
00086     {
00087         completeAllTasks();
00088     }
00089     LASS_CATCH_TO_WARNING
00090     stopThreads(this->numDynamicThreads(numThreads_));
00091 }
00092 
00093 
00094 
00095 /** submit a task to the pool, and block if queue is full.
00096  *  Function waits until tasks can be added to queue without participating as producer 
00097  *      (in case of SelfParticipating).
00098  */
00099 template <typename T, typename C, typename IP, template <typename, typename, typename> class PP>
00100 void ThreadPool<T, C, IP, PP>::addTask(typename util::CallTraits<TTask>::TParam task)
00101 {
00102     while (true)
00103     {
00104         size_t numWaitingTasks = numWaitingTasks_;
00105         if (maxWaitingTasks_ == unlimitedNumberOfTasks || numWaitingTasks < maxWaitingTasks_)
00106         {
00107             if (util::atomicCompareAndSwap(
00108                 numWaitingTasks_, numWaitingTasks, numWaitingTasks + 1))
00109             {
00110                 waitingTasks_.push(task);
00111                 this->wakeConsumer();
00112                 return;
00113             }
00114         }
00115         this->sleepProducer();
00116         if (error_.get())
00117         {
00118             abort_ = true;
00119             error_->throwSelf();
00120         }
00121     }
00122 }
00123 
00124 
00125 
00126 /** blocks until all tasks in the queue are completed
00127  *  control thread participates as consumer if policy allows for it.
00128  */
00129 template <typename T, typename C, typename IP, template <typename, typename, typename> class PP>
00130 void ThreadPool<T, C, IP, PP>::completeAllTasks()
00131 {
00132     while (numWaitingTasks_ > 0 || numRunningTasks_ > 0)
00133     {
00134         if (this->participate(waitingTasks_))
00135         {
00136             atomicDecrement(numWaitingTasks_);
00137         }
00138         this->sleepProducer();
00139         if (error_.get())
00140         {
00141             abort_ = true;
00142             error_->throwSelf();
00143         }
00144     }
00145 }
00146 
00147 
00148 
00149 /** clear queue without completing tasks.
00150  *  All waiting tasks in the queue are simply thrown away without ever being completed.
00151  */
00152 template <typename T, typename C, typename IP, template <typename, typename, typename> class PP>
00153 void ThreadPool<T, C, IP, PP>::clearQueue()
00154 {
00155     TTask dummy;
00156     while (waitingTasks_.pop(dummy))
00157     {
00158     }
00159 }
00160 
00161 
00162 
00163 template <typename T, typename C, typename IP, template <typename, typename, typename> class PP>
00164 const size_t ThreadPool<T, C, IP, PP>::numberOfThreads() const
00165 {
00166     return numThreads_;
00167 }
00168 
00169 
00170 
00171 // --- private -------------------------------------------------------------------------------------
00172 
00173 /** Allocate a bunch of threads and run them.
00174  *  We can't simply use a std::vector or an C-array, because the threads are neither
00175  *  copy-constructable or default constructable.  So we need to do our own housekeeping.
00176  *  Don't worry folks, I know what I'm doing ;) [Bramz]
00177  */
00178 template <typename T, typename C, typename IP, template <typename, typename, typename> class PP>
00179 void ThreadPool<T, C, IP, PP>::startThreads(const TConsumer& consumerPrototype, const char* name)
00180 {
00181     LASS_ASSERT(numThreads_ > 0);
00182     const size_t dynThreads = this->numDynamicThreads(numThreads_);
00183     const size_t bindOffset = numThreads_ - dynThreads; // bind dynamic threads to "upper bits"
00184     if (dynThreads == 0)
00185     {
00186         threads_ = 0;
00187         return;
00188     }
00189 
00190     const size_t bufferSize = 10;
00191     char nameBuffer[bufferSize] = "consumers";
00192     if (name)
00193     {
00194         strncpy(nameBuffer, name, bufferSize);
00195     }
00196     nameBuffer[bufferSize - 1] = '\0';
00197     const size_t length = strlen(nameBuffer);
00198     const size_t indexLength = 2;
00199     char* indexBuffer = nameBuffer + std::min(length, bufferSize - indexLength - 1);    
00200 
00201     threads_ = static_cast<ConsumerThread*>(malloc(dynThreads * sizeof(ConsumerThread)));
00202     if (!threads_)
00203     {
00204         throw std::bad_alloc();
00205     }
00206 
00207     size_t i;
00208     try
00209     {
00210         for (i = 0; i < dynThreads; ++i)
00211         {
00212 
00213 #if LASS_COMPILER_TYPE == LASS_COMPILER_TYPE_MSVC
00214             _snprintf(
00215 #else
00216             snprintf(
00217 #endif
00218                 indexBuffer, indexLength + 1, "%02X", int(i & 0xff));
00219             indexBuffer[indexLength] = '\0';
00220 
00221             new (&threads_[i]) ConsumerThread(consumerPrototype, *this, nameBuffer);
00222             try
00223             {
00224                 threads_[i].run();
00225                 threads_[i].bind((bindOffset + i) % numberOfProcessors);
00226             }
00227             catch (...)
00228             {
00229                 threads_[i].~ConsumerThread();
00230                 throw;
00231             }
00232         }
00233     }
00234     catch (...)
00235     {
00236         stopThreads(i); // i == number of threads already started.
00237         throw;
00238     }
00239 }
00240 
00241 
00242 
00243 /** Deallocate the threads and free memory.
00244  *  @throw no exceptions should be throw, nor bubble up from this function ... ever!
00245  */
00246 template <typename T, typename C, typename IP, template <typename, typename, typename> class PP>
00247 void ThreadPool<T, C, IP, PP>::stopThreads(size_t numAllocatedThreads)
00248 {
00249     shutDown_ = true;
00250     try
00251     {
00252         this->wakeAllConsumers();
00253     }
00254     LASS_CATCH_TO_WARNING
00255 
00256     LASS_ASSERT(static_cast<int>(numAllocatedThreads) >= 0);
00257     for (size_t n = numAllocatedThreads; n > 0; --n)
00258     {
00259         try
00260         {
00261             threads_[n - 1].join();
00262         }
00263         LASS_CATCH_TO_WARNING
00264         threads_[n - 1].~ConsumerThread(); // shouldn't throw anyway ...
00265     }
00266 
00267     free(threads_); // shouldn't throw
00268 }
00269 
00270 
00271 
00272 // --- ConsumerThread ------------------------------------------------------------------------------
00273 
00274 template <typename T, typename C, typename IP, template <typename, typename, typename> class PP>
00275 ThreadPool<T, C, IP, PP>::ConsumerThread::ConsumerThread(
00276         const TConsumer& consumer, TSelf& pool, const char* name):
00277     Thread(threadJoinable, name), 
00278     consumer_(consumer),
00279     pool_(pool) 
00280 {
00281 }
00282 
00283 
00284 
// Expands to a catch handler for t_exception: a copy of the caught exception is stored in
// the pool's error_, wrapped in a RemoteExceptionWrapper, and the enclosing function returns.
#define LASS_UTIL_THREAD_POOL_CATCH_AND_WRAP(t_exception)\
catch (const t_exception& caught)\
{\
    pool_.error_.reset(new experimental::RemoteExceptionWrapper<t_exception>(caught));\
    return;\
}
00291 
00292 template <typename T, typename C, typename IP, template <typename, typename, typename> class PP>
00293 void ThreadPool<T, C, IP, PP>::ConsumerThread::doRun()
00294 {
00295     TTask task;
00296     while (!pool_.abort_)
00297     {
00298         if (pool_.waitingTasks_.pop(task))
00299         {
00300             util::atomicIncrement(pool_.numRunningTasks_);
00301             util::atomicDecrement(pool_.numWaitingTasks_);
00302             pool_.wakeProducer();
00303             try
00304             {
00305                 consumer_(task);
00306             }
00307             catch (const experimental::RemoteExceptionBase& error)
00308             {
00309                 // two phase copy assignment to avoid warning on VC7.1 [Bramz]
00310                 std::auto_ptr<experimental::RemoteExceptionBase> temp = error.clone();
00311                 pool_.error_ = temp;
00312                 return;
00313             }
00314             LASS_UTIL_THREAD_POOL_CATCH_AND_WRAP(::std::domain_error)
00315             LASS_UTIL_THREAD_POOL_CATCH_AND_WRAP(::std::invalid_argument)
00316             LASS_UTIL_THREAD_POOL_CATCH_AND_WRAP(::std::length_error)
00317             LASS_UTIL_THREAD_POOL_CATCH_AND_WRAP(::std::out_of_range)
00318             LASS_UTIL_THREAD_POOL_CATCH_AND_WRAP(::std::range_error)
00319             LASS_UTIL_THREAD_POOL_CATCH_AND_WRAP(::std::overflow_error)
00320             LASS_UTIL_THREAD_POOL_CATCH_AND_WRAP(::std::underflow_error)
00321             LASS_UTIL_THREAD_POOL_CATCH_AND_WRAP(::std::runtime_error)
00322             LASS_UTIL_THREAD_POOL_CATCH_AND_WRAP(::std::logic_error)
00323             LASS_UTIL_THREAD_POOL_CATCH_AND_WRAP(::std::exception)
00324             util::atomicDecrement(pool_.numRunningTasks_);
00325         }
00326         else if (pool_.shutDown_)
00327         {
00328             return;
00329         }
00330         else
00331         {
00332             pool_.sleepConsumer();
00333         }
00334     }
00335 }
00336 
00337 // --- DefaultConsumer -----------------------------------------------------------------------------
00338 
00339 template <typename T> inline
00340 void DefaultConsumer<T>::operator()(typename util::CallTraits<T>::TParam task)
00341 {
00342     task();
00343 }
00344 
00345 
00346 
00347 }
00348 
00349 }
00350 
00351 #if LASS_COMPILER_TYPE == LASS_COMPILER_TYPE_MSVC
00352 #   pragma warning(pop)
00353 #endif
00354 
00355 // EOF

Generated on Mon Nov 10 14:21:40 2008 for Library of Assembled Shared Sources by doxygen 1.5.7.1
SourceForge.net Logo