thread_posix.inl
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043 #ifndef LASS_GUARDIAN_OF_INCLUSION_UTIL_IMPL_THREAD_POSIX_INL
00044 #define LASS_GUARDIAN_OF_INCLUSION_UTIL_IMPL_THREAD_POSIX_INL
00045
00046 #include "../util_common.h"
00047 #include "../thread.h"
00048 #include "../../stde/extended_string.h"
00049 #include "lass_errno.h"
00050 #include <errno.h>
00051 #include <pthread.h>
00052 #include <sched.h>
00053
00054 namespace lass
00055 {
00056 namespace util
00057 {
00058 namespace impl
00059 {
00060
00061 unsigned numberOfProcessors()
00062 {
00063
00064
00065 cpu_set_t mask;
00066 CPU_ZERO(&mask);
00067 LASS_ENFORCE_CLIB(sched_getaffinity(0, sizeof(cpu_set_t), &mask));
00068
00069 unsigned count = 0;
00070 int i = 0;
00071 while (CPU_ISSET(i++, &mask))
00072 {
00073 ++count;
00074 }
00075
00076
00077
00078
00079
00080 while (i < CPU_SETSIZE)
00081 {
00082 if (CPU_ISSET(i++, &mask))
00083 {
00084 std::cerr << "[LASS RUN MSG] UNDEFINED BEHAVIOUR: "
00085 "numberOfProcessors' assumption is wrong!\n" << std::endl;
00086 }
00087 }
00088
00089 return count;
00090 }
00091
00092
00093
00094
00095
00096
00097 class MutexInternal: NonCopyable
00098 {
00099 public:
00100 MutexInternal():
00101 lockCount_(0)
00102 {
00103 pthread_mutexattr_t mutexattr;
00104 LASS_ENFORCE_CLIB_RC(pthread_mutexattr_init(&mutexattr));
00105 LASS_ENFORCE_CLIB_RC(pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE));
00106 LASS_ENFORCE_CLIB_RC(pthread_mutex_init(&mutex_,&mutexattr));
00107 LASS_ENFORCE_CLIB_RC(pthread_mutexattr_destroy(&mutexattr));
00108 }
00109 ~MutexInternal()
00110 {
00111 LASS_ASSERT(lockCount_ == 0);
00112 LASS_WARN_CLIB_RC(pthread_mutex_destroy(&mutex_));
00113 }
00114 void lock()
00115 {
00116 LASS_ENFORCE_CLIB_RC(pthread_mutex_lock(&mutex_));
00117 ++lockCount_;
00118 }
00119 const LockResult tryLock()
00120 {
00121 const int ret = pthread_mutex_trylock(&mutex_);
00122 if (ret == 0)
00123 {
00124 ++lockCount_;
00125 return lockSuccess;
00126 }
00127 else if (ret == EBUSY)
00128 {
00129 return lockBusy;
00130 }
00131 LASS_THROW("pthread_mutex_trylock failed: ("
00132 << ret << ") " << impl::lass_strerror(ret));
00133 }
00134 void unlock()
00135 {
00136 LASS_ASSERT(lockCount_ > 0);
00137 if (lockCount_ == 0)
00138 {
00139 LASS_THROW("attempting to unlock an unlocked mutex");
00140 }
00141 --lockCount_;
00142 LASS_ENFORCE_CLIB_RC(pthread_mutex_unlock(&mutex_));
00143 }
00144 const unsigned lockCount() const
00145 {
00146 return lockCount_;
00147 }
00148 public:
00149 pthread_mutex_t mutex_;
00150 unsigned lockCount_;
00151 };
00152
00153
00154
00155
00156 typedef MutexInternal CriticalSectionInternal;
00157
00158
00159
00160
00161
00162
00163 class ConditionInternal: NonCopyable
00164 {
00165 public:
00166 ConditionInternal():
00167 threadsWaiting_(0),
00168 signalFlag_(false),
00169 broadcastFlag_(false)
00170 {
00171 LASS_ENFORCE_CLIB_RC(pthread_cond_init(&condition_, NULL));
00172 LASS_ENFORCE_CLIB_RC(pthread_mutex_init(&mutex_, NULL));
00173 }
00174 ~ConditionInternal()
00175 {
00176 LASS_WARN_CLIB_RC(pthread_mutex_destroy(&mutex_));
00177 LASS_WARN_CLIB_RC(pthread_cond_destroy(&condition_));
00178 }
00179 void wait()
00180 {
00181 LASS_ENFORCE_CLIB_RC(pthread_mutex_lock(&mutex_));
00182
00183 int retWait = 0;
00184 ++threadsWaiting_;
00185 while (retWait == 0 && !(signalFlag_ || broadcastFlag_))
00186 {
00187 retWait = pthread_cond_wait(&condition_, &mutex_);
00188 LASS_ASSERT(retWait == 0);
00189 }
00190 --threadsWaiting_;
00191 signalFlag_ = false;
00192 broadcastFlag_ &= (threadsWaiting_ > 0);
00193
00194 const int retUnlock = pthread_mutex_unlock(&mutex_);
00195 LASS_ASSERT(retUnlock == 0);
00196 if (retWait != 0)
00197 {
00198 LASS_THROW("pthread_cond_wait failed: ("
00199 << retWait << ") " << impl::lass_strerror(retWait));
00200 }
00201 if (retUnlock != 0)
00202 {
00203 LASS_THROW("pthread_mutex_unlock failed: ("
00204 << retUnlock << ") " << impl::lass_strerror(retUnlock));
00205 }
00206 }
00207 const WaitResult wait(unsigned long iMilliSeconds)
00208 {
00209 const long million = 1000000;
00210 const long trillion = 1000000000;
00211
00212 LASS_ENFORCE_CLIB_RC(pthread_mutex_lock(&mutex_));
00213
00214 struct timespec timeToWaitTo;
00215 clock_gettime(CLOCK_REALTIME,&timeToWaitTo);
00216 timeToWaitTo.tv_nsec += iMilliSeconds * million;
00217 if (timeToWaitTo.tv_nsec >= trillion)
00218 {
00219 timeToWaitTo.tv_sec += timeToWaitTo.tv_nsec / trillion;
00220 timeToWaitTo.tv_nsec %= trillion;
00221 }
00222
00223 ++threadsWaiting_;
00224 int retWait = 0;
00225 while (retWait == 0 && !(signalFlag_ || broadcastFlag_))
00226 {
00227 retWait = pthread_cond_timedwait(&condition_,&mutex_,&timeToWaitTo);
00228 LASS_ASSERT(retWait == 0 || retWait == ETIMEDOUT);
00229 }
00230
00231 --threadsWaiting_;
00232 signalFlag_ = false;
00233 broadcastFlag_ &= (threadsWaiting_ > 0);
00234
00235 const int retUnlock = pthread_mutex_unlock(&mutex_);
00236 LASS_ASSERT(retUnlock == 0);
00237 if (retWait != 0 && retWait != ETIMEDOUT)
00238 {
00239 LASS_THROW("pthread_cond_timedwait failed: ("
00240 << retWait << ") " << impl::lass_strerror(retWait));
00241 }
00242 if (retUnlock != 0)
00243 {
00244 LASS_THROW("pthread_mutex_unlock failed: ("
00245 << retUnlock << ") " << impl::lass_strerror(retUnlock));
00246 }
00247
00248 LASS_ASSERT(retWait == 0 || retWait == ETIMEDOUT);
00249 return retWait == 0 ? waitSuccess : waitTimeout;
00250 }
00251 void signal()
00252 {
00253 LASS_ENFORCE_CLIB_RC(pthread_mutex_lock(&mutex_));
00254 signalFlag_ = true;
00255 pthread_cond_signal(&condition_);
00256 LASS_ENFORCE_CLIB_RC(pthread_mutex_unlock(&mutex_));
00257 }
00258 void broadcast()
00259 {
00260 LASS_ENFORCE_CLIB_RC(pthread_mutex_lock(&mutex_));
00261 signalFlag_ = true;
00262 pthread_cond_broadcast(&condition_);
00263 LASS_ENFORCE_CLIB_RC(pthread_mutex_unlock(&mutex_));
00264 }
00265 private:
00266 pthread_mutex_t mutex_;
00267 pthread_cond_t condition_;
00268 unsigned threadsWaiting_;
00269 bool signalFlag_;
00270 bool broadcastFlag_;
00271 };
00272
00273
00274
00275
00276
00277
00278 void bindThread(pid_t pid, unsigned processor)
00279 {
00280 cpu_set_t mask;
00281 CPU_ZERO(&mask);
00282
00283 if (processor == Thread::anyProcessor)
00284 {
00285 const int n = static_cast<int>(util::numberOfProcessors);
00286 LASS_ASSERT(n > 0);
00287 for (int i = 0; i < n; ++i)
00288 {
00289 CPU_SET(i, &mask);
00290 }
00291 }
00292 else
00293 {
00294 if (processor >= util::numberOfProcessors)
00295 {
00296 LASS_THROW("'" << processor << "' is an invalid processor index. "
00297 << "Valid range is [0, " << util::numberOfProcessors << ").");
00298 }
00299 LASS_ASSERT(static_cast<int>(processor) >= 0);
00300 CPU_SET(static_cast<int>(processor), &mask);
00301 }
00302
00303 LASS_ENFORCE_CLIB(sched_setaffinity(pid, sizeof(cpu_set_t), &mask));
00304 }
00305
00306
00307
00308 #define LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(exception_type)\
00309 catch (const exception_type& error)\
00310 {\
00311 pimpl->error_.reset(new experimental::RemoteExceptionWrapper<exception_type>(error));\
00312 }
00313
00314
00315
00316
00317 class ThreadInternal: NonCopyable
00318 {
00319 public:
00320
00321 ThreadInternal(Thread& iThread, ThreadKind iKind, const char* ):
00322 thread_(iThread),
00323 isJoinable_(iKind == threadJoinable),
00324 isCreated_(false)
00325 {
00326
00327 }
00328
00329 ~ThreadInternal()
00330 {
00331 }
00332
00333
00334
00335 void run()
00336 {
00337 if (isCreated_)
00338 {
00339 LASS_THROW("You can run a thread only once");
00340 }
00341 LASS_ENFORCE_CLIB_RC(pthread_create(
00342 &handle_, 0, &ThreadInternal::staticThreadStart, this));
00343 while (!isCreated_)
00344 {
00345 runCondition_.wait(100);
00346 }
00347 }
00348
00349 void join()
00350 {
00351 if (!(isJoinable_ && isCreated_))
00352 {
00353 LASS_THROW("Can not wait for uncreated or detached threads");
00354 }
00355 else
00356 {
00357 pthread_join(handle_, 0);
00358 isJoinable_ = false;
00359 if (error_.get())
00360 {
00361 error_->throwSelf();
00362 }
00363 }
00364 }
00365
00366 void bind(size_t processor)
00367 {
00368 bindThread(tid_, processor);
00369 }
00370
00371 static void sleep(unsigned long iMilliSeconds)
00372 {
00373 timespec timeOut;
00374 if (iMilliSeconds < 1000)
00375 {
00376 timeOut.tv_sec = 0;
00377 timeOut.tv_nsec = iMilliSeconds * 1000000;
00378 }
00379 else
00380 {
00381 timeOut.tv_sec = iMilliSeconds / 1000;
00382 timeOut.tv_nsec = (iMilliSeconds % 1000) * 1000000;
00383 }
00384
00385
00386
00387
00388
00389 timespec timeRemaining;
00390 while (true)
00391 {
00392 const int ret = nanosleep(&timeOut, &timeRemaining);
00393 if (ret == 0)
00394 {
00395
00396 return;
00397 }
00398 const int errnum = impl::lass_errno();
00399 if (errnum != EINTR)
00400 {
00401
00402 LASS_THROW("nanosleep failed: (" << errnum
00403 << ") " << impl::lass_strerror(errnum));
00404 }
00405
00406
00407 timeOut.tv_sec = timeRemaining.tv_sec;
00408 timeOut.tv_nsec = timeRemaining.tv_nsec;
00409 }
00410 LASS_ASSERT(timeRemaining.tv_sec == 0 && timeRemaining.tv_nsec == 0);
00411 }
00412
00413 static void yield()
00414 {
00415 LASS_ENFORCE_CLIB(sched_yield());
00416 }
00417
00418 static void bindCurrent(size_t processor)
00419 {
00420 bindThread(0, processor);
00421 }
00422
00423
00424 static void* staticThreadStart(void* iPimpl)
00425 {
00426 LASS_ASSERT(iPimpl);
00427 ThreadInternal* pimpl = static_cast<ThreadInternal*>(iPimpl);
00428 pimpl->tid_ = getpid();
00429 pimpl->isCreated_ = true;
00430 if (pimpl->isJoinable_)
00431 {
00432 try
00433 {
00434 pimpl->runCondition_.signal();
00435 pimpl->thread_.doRun();
00436 }
00437 catch (const experimental::RemoteExceptionBase& error)
00438 {
00439 pimpl->error_ = error.clone();
00440 }
00441 LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::domain_error)
00442 LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::invalid_argument)
00443 LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::length_error)
00444 LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::out_of_range)
00445 LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::range_error)
00446 LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::overflow_error)
00447 LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::underflow_error)
00448 LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::runtime_error)
00449 LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::logic_error)
00450 LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::exception)
00451 }
00452 else
00453 {
00454 pimpl->runCondition_.signal();
00455 pimpl->thread_.doRun();
00456 delete &pimpl->thread_;
00457 }
00458 return 0;
00459 }
00460
00461 private:
00462
00463 Thread& thread_;
00464 pthread_t handle_;
00465 Condition runCondition_;
00466 std::auto_ptr<experimental::RemoteExceptionBase> error_;
00467 pid_t tid_;
00468 volatile bool isJoinable_;
00469 volatile bool isCreated_;
00470 };
00471
00472
00473
00474
00475
00476
00477 class ThreadLocalStorageInternal: NonCopyable
00478 {
00479 public:
00480 ThreadLocalStorageInternal(void (*destructor)(void*))
00481 {
00482 LASS_ENFORCE_CLIB_RC(pthread_key_create(&key_, destructor));
00483 }
00484 ~ThreadLocalStorageInternal()
00485 {
00486 LASS_WARN_CLIB_RC(pthread_key_delete(key_));
00487 }
00488 void* const get() const
00489 {
00490 return pthread_getspecific(key_);
00491 }
00492 void set(const void* value)
00493 {
00494 LASS_ENFORCE_CLIB_RC(pthread_setspecific(key_, value));
00495 }
00496 private:
00497 pthread_key_t key_;
00498 };
00499
00500 }
00501 }
00502 }
00503
00504 #endif
00505
00506