43#ifndef LASS_GUARDIAN_OF_INCLUSION_UTIL_IMPL_THREAD_POSIX_INL
44#define LASS_GUARDIAN_OF_INCLUSION_UTIL_IMPL_THREAD_POSIX_INL
51#ifdef LASS_HAVE_PTHREAD_H
54#ifdef LASS_HAVE_PTHREAD_NP_H
55# include <pthread_np.h>
63#if LASS_HAVE_SYS_PROCESSOR_H
64# include <sys/processor.h>
66#if LASS_HAVE_SYS_SYSCALL_H
67# include <sys/syscall.h>
69#if LASS_HAVE_SYSCTL_H_CTL_HW && LASS_HAVE_SYSCTL_H_HW_NCPU
70# include <sys/sysctl.h>
72#if LASS_HAVE_SYS_TIME_H
75#if LASS_HAVE_SYS_CPUSET_H
76# include <sys/param.h>
77# include <sys/cpuset.h>
79#if LASS_HAVE_MACH_THREAD_POLICY_H
80# include <mach/thread_policy.h>
83#if LASS_HAVE_SCHED_H_CPU_SET_T || LASS_HAVE_SYS_CPUSET_H
84# define LASS_HAVE_CPU_SET_T 1
94#if LASS_HAVE_SYS_CPUSET_H
95typedef cpuset_t cpu_set_t;
98#if LASS_HAVE_CPU_SET_T
99cpu_set_t originalCpuSet()
101 static cpu_set_t cpu_set;
102 static bool isInitialized =
false;
106#if LASS_HAVE_SCHED_H_CPU_SET_T
107 LASS_ENFORCE_CLIB(sched_getaffinity(0,
sizeof(cpu_set_t), &cpu_set));
108#elif LASS_HAVE_SYS_CPUSET_H
109 LASS_ENFORCE_CLIB(cpuset_getaffinity(CPU_LEVEL_WHICH, CPU_WHICH_PID, -1,
sizeof(cpu_set_t), &cpu_set));
111# error missing implementation
113 isInitialized =
true;
117cpu_set_t forceCalculationBeforeMain = originalCpuSet();
120TCpuSet availableProcessors()
122#if LASS_HAVE_CPU_SET_T
123 cpu_set_t cpu_set = originalCpuSet();
127#if LASS_HAVE_UNISTD_H_SC_NPROCESSORS_CONF
128 const size_t n =
static_cast<size_t>(sysconf(_SC_NPROCESSORS_CONF));
129#elif LASS_HAVE_CPU_SET_T
131 for (
int i = 0; i < CPU_SETSIZE; ++i)
133 if (CPU_ISSET(i, &cpu_set))
135 n =
static_cast<size_t>(i) + 1;
138#elif LASS_HAVE_SYSCTL_H_CTL_HW && LASS_HAVE_SYSCTL_H_HW_NCPU
139 int mid[2] = { CTL_HW, HW_NCPU };
141 size_t len =
sizeof(ncpus);
142 sysctl(mib, 2, &ncpus, &len, 0, 0);
143 const size_t n =
static_cast<size_t>(ncpus);
145# error missing implementation
149 TCpuSet cpuSet(n,
false);
150 for (
size_t i = 0; i < n; ++i)
152#if LASS_HAVE_CPU_SET_T
153 cpuSet[i] = CPU_ISSET(
static_cast<int>(i), &cpu_set);
154#elif LASS_HAVE_SYS_PROCESSOR_H
155 const int r = LASS_ENFORCE_CLIB(p_online(
static_cast<processorid_t
>(i), P_STATUS))(
"i=")(i);
156 cpuSet[i] = r == P_ONLINE || r == P_NOINTR;
158# warning [LASS BUILD MSG] do not know how to figure out what processors in the CPU set are available. Will assuming they all are.
167inline pid_t lass_gettid()
169#if LASS_HAVE_SYS_SYSCALL_H_GETTID
170 return static_cast<pid_t
>(syscall(__NR_gettid));
181class MutexInternal: NonCopyable
187 pthread_mutexattr_t mutexattr;
188 LASS_ENFORCE_CLIB_RC(pthread_mutexattr_init(&mutexattr));
189 LASS_ENFORCE_CLIB_RC(pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE));
190 LASS_ENFORCE_CLIB_RC(pthread_mutex_init(&mutex_,&mutexattr));
191 LASS_ENFORCE_CLIB_RC(pthread_mutexattr_destroy(&mutexattr));
195 LASS_ASSERT(lockCount_ == 0);
196 LASS_WARN_CLIB_RC(pthread_mutex_destroy(&mutex_));
200 LASS_ENFORCE_CLIB_RC(pthread_mutex_lock(&mutex_));
205 const int ret = pthread_mutex_trylock(&mutex_);
211 else if (ret == EBUSY)
215 LASS_THROW(
"pthread_mutex_trylock failed: ("
220 LASS_ASSERT(lockCount_ > 0);
223 LASS_THROW(
"attempting to unlock an unlocked mutex");
226 LASS_ENFORCE_CLIB_RC(pthread_mutex_unlock(&mutex_));
228 unsigned lockCount()
const
233 pthread_mutex_t mutex_;
240typedef MutexInternal CriticalSectionInternal;
247class ConditionInternal: NonCopyable
253 broadcastFlag_(false)
255 LASS_ENFORCE_CLIB_RC(pthread_cond_init(&condition_, NULL));
256 LASS_ENFORCE_CLIB_RC(pthread_mutex_init(&mutex_, NULL));
260 LASS_WARN_CLIB_RC(pthread_mutex_destroy(&mutex_));
261 LASS_WARN_CLIB_RC(pthread_cond_destroy(&condition_));
265 LASS_ENFORCE_CLIB_RC(pthread_mutex_lock(&mutex_));
269 while (retWait == 0 && !(signalFlag_ || broadcastFlag_))
271 retWait = pthread_cond_wait(&condition_, &mutex_);
272 LASS_ASSERT(retWait == 0);
276 broadcastFlag_ &= (threadsWaiting_ > 0);
278 const int retUnlock = pthread_mutex_unlock(&mutex_);
279 LASS_ASSERT(retUnlock == 0);
282 LASS_THROW(
"pthread_cond_wait failed: ("
287 LASS_THROW(
"pthread_mutex_unlock failed: ("
296 nsec_per_usec = 1000,
297 nsec_per_msec = 1000 * 1000,
298 nsec_per_sec = nsec_per_msec * msec_per_sec
301 const unsigned long seconds = milliSeconds / msec_per_sec;
302 milliSeconds %= msec_per_sec;
304 LASS_ENFORCE_CLIB_RC(pthread_mutex_lock(&mutex_));
306 struct timespec timeToWaitTo;
307#if LASS_HAVE_CLOCK_GETTIME
308 clock_gettime(CLOCK_REALTIME, &timeToWaitTo);
311 ::gettimeofday(&now, 0);
312 timeToWaitTo.tv_sec = now.tv_sec;
313 timeToWaitTo.tv_nsec = now.tv_usec * nsec_per_usec;
315 timeToWaitTo.tv_nsec += milliSeconds * nsec_per_msec;
316 timeToWaitTo.tv_sec += seconds;
317 if (timeToWaitTo.tv_nsec >= nsec_per_sec)
319 timeToWaitTo.tv_sec += timeToWaitTo.tv_nsec / nsec_per_sec;
320 timeToWaitTo.tv_nsec %= nsec_per_sec;
325 while (retWait == 0 && !(signalFlag_ || broadcastFlag_))
327 retWait = pthread_cond_timedwait(&condition_,&mutex_,&timeToWaitTo);
328 LASS_ASSERT(retWait == 0 || retWait == ETIMEDOUT);
333 broadcastFlag_ &= (threadsWaiting_ > 0);
335 const int retUnlock = pthread_mutex_unlock(&mutex_);
336 LASS_ASSERT(retUnlock == 0);
337 if (retWait != 0 && retWait != ETIMEDOUT)
339 LASS_THROW(
"pthread_cond_timedwait failed: ("
344 LASS_THROW(
"pthread_mutex_unlock failed: ("
348 LASS_ASSERT(retWait == 0 || retWait == ETIMEDOUT);
353 LASS_ENFORCE_CLIB_RC(pthread_mutex_lock(&mutex_));
355 pthread_cond_signal(&condition_);
356 LASS_ENFORCE_CLIB_RC(pthread_mutex_unlock(&mutex_));
360 LASS_ENFORCE_CLIB_RC(pthread_mutex_lock(&mutex_));
362 pthread_cond_broadcast(&condition_);
363 LASS_ENFORCE_CLIB_RC(pthread_mutex_unlock(&mutex_));
366 pthread_mutex_t mutex_;
367 pthread_cond_t condition_;
368 unsigned threadsWaiting_;
378void bindThread(pthread_t LASS_UNUSED(handle), pid_t LASS_UNUSED(tid),
size_t LASS_UNUSED(processor))
380#if LASS_HAVE_CPU_SET_T
384 mask = originalCpuSet();
388 LASS_ASSERT(
static_cast<int>(processor) >= 0);
390 CPU_SET(
static_cast<int>(processor), &mask);
392# if LASS_HAVE_PTHREAD_SETAFFINITY_NP
393 LASS_ENFORCE_CLIB(pthread_setaffinity_np(handle,
sizeof(cpu_set_t), &mask))(
"handle=")(handle);
395 LASS_ENFORCE_CLIB(sched_setaffinity(tid,
sizeof(cpu_set_t), &mask))(
"tid=")(tid);
397#elif LASS_HAVE_SYS_PROCESSOR_H
398 const processorid_t cpu_id = processor ==
Thread::anyProcessor ? PBIND_NONE :
static_cast<processorid_t
>(processor);
399 LASS_ENFORCE_CLIB(processor_bind(P_LWPID, handle, cpu_id, 0))(
"handle=")(handle);
401# warning [LASS BUILD MSG] no implementation for util::Thread::bind: setting thread affinity will be a no-op.
407#define LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(exception_type)\
408 catch (const exception_type& error)\
410 pimpl->error_.reset(new RemoteExceptionWrapper<exception_type>(error));\
416class ThreadInternal: NonCopyable
420 ThreadInternal(Thread& iThread,
ThreadKind iKind,
const char* ):
438 LASS_THROW(
"You can run a thread only once");
440 LASS_ENFORCE_CLIB_RC(pthread_create(
441 &handle_, 0, &ThreadInternal::staticThreadStart,
this));
444 runCondition_.wait(100);
448 bool isJoinable()
const
450 return isJoinable_ && isCreated_;
457 LASS_THROW(
"Can not wait for uncreated or detached threads");
460 pthread_join(handle_, 0);
468 void bind(
size_t processor)
470 bindThread(handle_, tid_, processor);
473 const TCpuSet affinity()
const
476 TCpuSet result(n,
false);
477#if LASS_HAVE_CPU_SET_T
479# if LASS_HAVE_PTHREAD_SETAFFINITY_NP
480 LASS_ENFORCE_CLIB(pthread_getaffinity_np(handle_,
sizeof(cpu_set_t), &mask))(
"handle=")(handle_);
482 LASS_ENFORCE_CLIB(sched_getaffinity(tid_,
sizeof(cpu_set_t), &mask))(
"tid=")(tid_);
484 for (
size_t i = 0; i < n; ++i)
486 result[i] = CPU_ISSET(
static_cast<int>(i), &mask);
488#elif LASS_HAVE_SYS_PROCESSOR_H
489 processorid_t cpu_id;
490 LASS_ENFORCE_CLIB(processor_bind(P_LWPID, handle_, PBIND_QUERY, &cpu_id))(
"handle=")(handle_);
491 if (cpu_id == PBIND_NONE)
493 return availableProcessors();
495 result[
static_cast<size_t>(cpu_id)] =
true;
497# warning [LASS BUILD MSG] no implementation for util::Thread::affinity: will return set with all available processors
498 return availableProcessors();
503 static void sleep(
unsigned long milliSeconds)
508 nsec_per_msec = 1000 * 1000,
509 nsec_per_sec = nsec_per_msec * msec_per_sec
513 if (milliSeconds < msec_per_sec)
516 timeOut.tv_nsec =
static_cast<long>(milliSeconds * nsec_per_msec);
520 timeOut.tv_sec = milliSeconds / msec_per_sec;
521 timeOut.tv_nsec = (milliSeconds % msec_per_sec) * nsec_per_msec;
528 timespec timeRemaining;
531 const int ret = nanosleep(&timeOut, &timeRemaining);
541 LASS_THROW(
"nanosleep failed: (" << errnum
546 timeOut.tv_sec = timeRemaining.tv_sec;
547 timeOut.tv_nsec = timeRemaining.tv_nsec;
549 LASS_ASSERT(timeRemaining.tv_sec == 0 && timeRemaining.tv_nsec == 0);
554 LASS_ENFORCE_CLIB(sched_yield());
557 static void bindCurrent(
size_t processor)
559 bindThread(pthread_self(), lass_gettid(), processor);
563 static void* staticThreadStart(
void* iPimpl)
566 ThreadInternal* pimpl =
static_cast<ThreadInternal*
>(iPimpl);
567 pimpl->tid_ = lass_gettid();
568 pimpl->isCreated_ =
true;
569 if (pimpl->isJoinable_)
573 pimpl->runCondition_.signal();
574 pimpl->thread_.doRun();
576 catch (
const RemoteExceptionBase& error)
578 pimpl->error_ = error.clone();
580 LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::domain_error)
581 LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::invalid_argument)
582 LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::length_error)
583 LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::out_of_range)
584 LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::range_error)
585 LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::overflow_error)
586 LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::underflow_error)
587 LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::runtime_error)
588 LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::logic_error)
589 LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::exception)
593 pimpl->runCondition_.signal();
594 pimpl->thread_.doRun();
595 delete &pimpl->thread_;
604 Condition runCondition_;
605 TRemoteExceptionBasePtr error_;
608 volatile bool isJoinable_;
609 volatile bool isCreated_;
617class ThreadLocalStorageInternal: NonCopyable
620 ThreadLocalStorageInternal(
void (*destructor)(
void*))
622 LASS_ENFORCE_CLIB_RC(pthread_key_create(&key_, destructor));
624 ~ThreadLocalStorageInternal()
626 LASS_WARN_CLIB_RC(pthread_key_delete(key_));
630 return pthread_getspecific(key_);
632 void set(
const void* value)
634 LASS_ENFORCE_CLIB_RC(pthread_setspecific(key_, value));
static constexpr size_t anyProcessor
argument for Thread::bind to unbind the thread so it runs on any processor
const std::string lass_strerror(int errnum)
returns message associated to an CLIB error code
int lass_errno()
returns CLIB errno
LockResult
Return code for lock functions.
size_t numberOfProcessors()
Return highest id of processor + 1, in this machine.
WaitResult
Return code for wait functions.
@ threadJoinable
joinable thread, can be waited for
@ lockSuccess
Mutex/CriticalSection is succesfully locked by this thread.
@ lockBusy
Mutex/CriticalSection is locked by another thread.
@ waitSuccess
Wait is successfully terminated.
@ waitTimeout
Wait failed because of a timeout.
general utility, debug facilities, ...
Library for Assembled Shared Sources.