00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043 #if LASS_COMPILER_TYPE == LASS_COMPILER_TYPE_MSVC
00044 # pragma warning(push)
00045 # pragma warning(disable: 4267) // conversion from 'size_t' to 'lass::num::Tuint32', possible loss of data
00046 # pragma warning(disable: 4996) // this function or variable may be unsafe
00047 #endif
00048
00049 namespace lass
00050 {
00051 namespace util
00052 {
00053
00054
00055
00056
00057
00058
00059
00060
00061 template <typename T, typename C, typename IP, template <typename, typename, typename> class PP>
00062 ThreadPool<T, C, IP, PP>::ThreadPool(
00063 size_t numberOfThreads,
00064 size_t maximumNumberOfTasksInQueue,
00065 const TConsumer& consumerPrototype,
00066 const char* name):
00067 TParticipationPolicy(consumerPrototype),
00068 threads_(0),
00069 numThreads_(numberOfThreads == autoNumberOfThreads ? numberOfProcessors : numberOfThreads),
00070 maxWaitingTasks_(maximumNumberOfTasksInQueue),
00071 numWaitingTasks_(0),
00072 numRunningTasks_(0),
00073 shutDown_(false),
00074 abort_(false)
00075 {
00076 LASS_ENFORCE(numThreads_ > 0);
00077 startThreads(consumerPrototype, name);
00078 }
00079
00080
00081
00082 template <typename T, typename C, typename IP, template <typename, typename, typename> class PP>
00083 ThreadPool<T, C, IP, PP>::~ThreadPool()
00084 {
00085 try
00086 {
00087 completeAllTasks();
00088 }
00089 LASS_CATCH_TO_WARNING
00090 stopThreads(this->numDynamicThreads(numThreads_));
00091 }
00092
00093
00094
00095
00096
00097
00098
00099 template <typename T, typename C, typename IP, template <typename, typename, typename> class PP>
00100 void ThreadPool<T, C, IP, PP>::addTask(typename util::CallTraits<TTask>::TParam task)
00101 {
00102 while (true)
00103 {
00104 size_t numWaitingTasks = numWaitingTasks_;
00105 if (maxWaitingTasks_ == unlimitedNumberOfTasks || numWaitingTasks < maxWaitingTasks_)
00106 {
00107 if (util::atomicCompareAndSwap(
00108 numWaitingTasks_, numWaitingTasks, numWaitingTasks + 1))
00109 {
00110 waitingTasks_.push(task);
00111 this->wakeConsumer();
00112 return;
00113 }
00114 }
00115 this->sleepProducer();
00116 if (error_.get())
00117 {
00118 abort_ = true;
00119 error_->throwSelf();
00120 }
00121 }
00122 }
00123
00124
00125
00126
00127
00128
00129 template <typename T, typename C, typename IP, template <typename, typename, typename> class PP>
00130 void ThreadPool<T, C, IP, PP>::completeAllTasks()
00131 {
00132 while (numWaitingTasks_ > 0 || numRunningTasks_ > 0)
00133 {
00134 if (this->participate(waitingTasks_))
00135 {
00136 atomicDecrement(numWaitingTasks_);
00137 }
00138 this->sleepProducer();
00139 if (error_.get())
00140 {
00141 abort_ = true;
00142 error_->throwSelf();
00143 }
00144 }
00145 }
00146
00147
00148
00149
00150
00151
00152 template <typename T, typename C, typename IP, template <typename, typename, typename> class PP>
00153 void ThreadPool<T, C, IP, PP>::clearQueue()
00154 {
00155 TTask dummy;
00156 while (waitingTasks_.pop(dummy))
00157 {
00158 }
00159 }
00160
00161
00162
00163 template <typename T, typename C, typename IP, template <typename, typename, typename> class PP>
00164 const size_t ThreadPool<T, C, IP, PP>::numberOfThreads() const
00165 {
00166 return numThreads_;
00167 }
00168
00169
00170
00171
00172
00173
00174
00175
00176
00177
00178 template <typename T, typename C, typename IP, template <typename, typename, typename> class PP>
00179 void ThreadPool<T, C, IP, PP>::startThreads(const TConsumer& consumerPrototype, const char* name)
00180 {
00181 LASS_ASSERT(numThreads_ > 0);
00182 const size_t dynThreads = this->numDynamicThreads(numThreads_);
00183 const size_t bindOffset = numThreads_ - dynThreads;
00184 if (dynThreads == 0)
00185 {
00186 threads_ = 0;
00187 return;
00188 }
00189
00190 const size_t bufferSize = 10;
00191 char nameBuffer[bufferSize] = "consumers";
00192 if (name)
00193 {
00194 strncpy(nameBuffer, name, bufferSize);
00195 }
00196 nameBuffer[bufferSize - 1] = '\0';
00197 const size_t length = strlen(nameBuffer);
00198 const size_t indexLength = 2;
00199 char* indexBuffer = nameBuffer + std::min(length, bufferSize - indexLength - 1);
00200
00201 threads_ = static_cast<ConsumerThread*>(malloc(dynThreads * sizeof(ConsumerThread)));
00202 if (!threads_)
00203 {
00204 throw std::bad_alloc();
00205 }
00206
00207 size_t i;
00208 try
00209 {
00210 for (i = 0; i < dynThreads; ++i)
00211 {
00212
00213 #if LASS_COMPILER_TYPE == LASS_COMPILER_TYPE_MSVC
00214 _snprintf(
00215 #else
00216 snprintf(
00217 #endif
00218 indexBuffer, indexLength + 1, "%02X", int(i & 0xff));
00219 indexBuffer[indexLength] = '\0';
00220
00221 new (&threads_[i]) ConsumerThread(consumerPrototype, *this, nameBuffer);
00222 try
00223 {
00224 threads_[i].run();
00225 threads_[i].bind((bindOffset + i) % numberOfProcessors);
00226 }
00227 catch (...)
00228 {
00229 threads_[i].~ConsumerThread();
00230 throw;
00231 }
00232 }
00233 }
00234 catch (...)
00235 {
00236 stopThreads(i);
00237 throw;
00238 }
00239 }
00240
00241
00242
00243
00244
00245
00246 template <typename T, typename C, typename IP, template <typename, typename, typename> class PP>
00247 void ThreadPool<T, C, IP, PP>::stopThreads(size_t numAllocatedThreads)
00248 {
00249 shutDown_ = true;
00250 try
00251 {
00252 this->wakeAllConsumers();
00253 }
00254 LASS_CATCH_TO_WARNING
00255
00256 LASS_ASSERT(static_cast<int>(numAllocatedThreads) >= 0);
00257 for (size_t n = numAllocatedThreads; n > 0; --n)
00258 {
00259 try
00260 {
00261 threads_[n - 1].join();
00262 }
00263 LASS_CATCH_TO_WARNING
00264 threads_[n - 1].~ConsumerThread();
00265 }
00266
00267 free(threads_);
00268 }
00269
00270
00271
00272
00273
00274 template <typename T, typename C, typename IP, template <typename, typename, typename> class PP>
00275 ThreadPool<T, C, IP, PP>::ConsumerThread::ConsumerThread(
00276 const TConsumer& consumer, TSelf& pool, const char* name):
00277 Thread(threadJoinable, name),
00278 consumer_(consumer),
00279 pool_(pool)
00280 {
00281 }
00282
00283
00284
// Expands to a catch clause for exception_type: a copy of the caught exception
// is wrapped in a RemoteExceptionWrapper and stored in pool_.error_, after which
// the enclosing function returns.  Meant to be used inside ConsumerThread::doRun().
#define LASS_UTIL_THREAD_POOL_CATCH_AND_WRAP(exception_type)\
    catch (const exception_type& caught)\
    {\
        pool_.error_.reset(new experimental::RemoteExceptionWrapper<exception_type>(caught));\
        return;\
    }
00291
00292 template <typename T, typename C, typename IP, template <typename, typename, typename> class PP>
00293 void ThreadPool<T, C, IP, PP>::ConsumerThread::doRun()
00294 {
00295 TTask task;
00296 while (!pool_.abort_)
00297 {
00298 if (pool_.waitingTasks_.pop(task))
00299 {
00300 util::atomicIncrement(pool_.numRunningTasks_);
00301 util::atomicDecrement(pool_.numWaitingTasks_);
00302 pool_.wakeProducer();
00303 try
00304 {
00305 consumer_(task);
00306 }
00307 catch (const experimental::RemoteExceptionBase& error)
00308 {
00309
00310 std::auto_ptr<experimental::RemoteExceptionBase> temp = error.clone();
00311 pool_.error_ = temp;
00312 return;
00313 }
00314 LASS_UTIL_THREAD_POOL_CATCH_AND_WRAP(::std::domain_error)
00315 LASS_UTIL_THREAD_POOL_CATCH_AND_WRAP(::std::invalid_argument)
00316 LASS_UTIL_THREAD_POOL_CATCH_AND_WRAP(::std::length_error)
00317 LASS_UTIL_THREAD_POOL_CATCH_AND_WRAP(::std::out_of_range)
00318 LASS_UTIL_THREAD_POOL_CATCH_AND_WRAP(::std::range_error)
00319 LASS_UTIL_THREAD_POOL_CATCH_AND_WRAP(::std::overflow_error)
00320 LASS_UTIL_THREAD_POOL_CATCH_AND_WRAP(::std::underflow_error)
00321 LASS_UTIL_THREAD_POOL_CATCH_AND_WRAP(::std::runtime_error)
00322 LASS_UTIL_THREAD_POOL_CATCH_AND_WRAP(::std::logic_error)
00323 LASS_UTIL_THREAD_POOL_CATCH_AND_WRAP(::std::exception)
00324 util::atomicDecrement(pool_.numRunningTasks_);
00325 }
00326 else if (pool_.shutDown_)
00327 {
00328 return;
00329 }
00330 else
00331 {
00332 pool_.sleepConsumer();
00333 }
00334 }
00335 }
00336
00337
00338
00339 template <typename T> inline
00340 void DefaultConsumer<T>::operator()(typename util::CallTraits<T>::TParam task)
00341 {
00342 task();
00343 }
00344
00345
00346
00347 }
00348
00349 }
00350
00351 #if LASS_COMPILER_TYPE == LASS_COMPILER_TYPE_MSVC
00352 # pragma warning(pop)
00353 #endif
00354
00355