Library of Assembled Shared Sources
thread_posix.inl
Go to the documentation of this file.
1/** @file
2 * @author Bram de Greve (bram@cocamware.com)
3 * @author Tom De Muer (tom@cocamware.com)
4 *
5 * *** BEGIN LICENSE INFORMATION ***
6 *
7 * The contents of this file are subject to the Common Public Attribution License
8 * Version 1.0 (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
10 * http://lass.sourceforge.net/cpal-license. The License is based on the
11 * Mozilla Public License Version 1.1 but Sections 14 and 15 have been added to cover
12 * use of software over a computer network and provide for limited attribution for
13 * the Original Developer. In addition, Exhibit A has been modified to be consistent
14 * with Exhibit B.
15 *
16 * Software distributed under the License is distributed on an "AS IS" basis, WITHOUT
17 * WARRANTY OF ANY KIND, either express or implied. See the License for the specific
18 * language governing rights and limitations under the License.
19 *
20 * The Original Code is LASS - Library of Assembled Shared Sources.
21 *
22 * The Initial Developer of the Original Code is Bram de Greve and Tom De Muer.
23 * The Original Developer is the Initial Developer.
24 *
25 * All portions of the code written by the Initial Developer are:
26 * Copyright (C) 2004-2011 the Initial Developer.
27 * All Rights Reserved.
28 *
29 * Contributor(s):
30 *
31 * Alternatively, the contents of this file may be used under the terms of the
32 * GNU General Public License Version 2 or later (the GPL), in which case the
33 * provisions of GPL are applicable instead of those above. If you wish to allow use
34 * of your version of this file only under the terms of the GPL and not to allow
35 * others to use your version of this file under the CPAL, indicate your decision by
36 * deleting the provisions above and replace them with the notice and other
37 * provisions required by the GPL License. If you do not delete the provisions above,
38 * a recipient may use your version of this file under either the CPAL or the GPL.
39 *
40 * *** END LICENSE INFORMATION ***
41 */
42
43#ifndef LASS_GUARDIAN_OF_INCLUSION_UTIL_IMPL_THREAD_POSIX_INL
44#define LASS_GUARDIAN_OF_INCLUSION_UTIL_IMPL_THREAD_POSIX_INL
45
46#include "../util_common.h"
47#include "../thread.h"
49#include "lass_errno.h"
50#include <errno.h>
51#ifdef LASS_HAVE_PTHREAD_H
52# include <pthread.h>
53#endif
54#ifdef LASS_HAVE_PTHREAD_NP_H
55# include <pthread_np.h>
56#endif
57#if LASS_HAVE_SCHED_H
58# include <sched.h>
59#endif
60#if LASS_HAVE_UNISTD_H
61# include <unistd.h>
62#endif
63#if LASS_HAVE_SYS_PROCESSOR_H
64# include <sys/processor.h>
65#endif
66#if LASS_HAVE_SYS_SYSCALL_H
67# include <sys/syscall.h>
68#endif
69#if LASS_HAVE_SYSCTL_H_CTL_HW && LASS_HAVE_SYSCTL_H_HW_NCPU
70# include <sys/sysctl.h>
71#endif
72#if LASS_HAVE_SYS_TIME_H
73# include <sys/time.h>
74#endif
75#if LASS_HAVE_SYS_CPUSET_H
76# include <sys/param.h> // also required
77# include <sys/cpuset.h>
78#endif
79#if LASS_HAVE_MACH_THREAD_POLICY_H
80# include <mach/thread_policy.h>
81#endif
82
83#if LASS_HAVE_SCHED_H_CPU_SET_T || LASS_HAVE_SYS_CPUSET_H
84# define LASS_HAVE_CPU_SET_T 1
85#endif
86
87namespace lass
88{
89namespace util
90{
91namespace impl
92{
93
#if LASS_HAVE_SYS_CPUSET_H
// <sys/cpuset.h> names the type cpuset_t; alias it so the code below can use one spelling.
typedef cpuset_t cpu_set_t;
#endif

#if LASS_HAVE_CPU_SET_T
/** @internal
 *  Return the CPU affinity set of this process as it was on the first call.
 *
 *  The set is queried once and cached in a function-local static; every later
 *  call returns a copy of that cached value.
 */
cpu_set_t originalCpuSet()
{
	static bool isCached = false;
	static cpu_set_t cachedSet;
	if (isCached)
	{
		return cachedSet;
	}

	CPU_ZERO(&cachedSet);
#	if LASS_HAVE_SCHED_H_CPU_SET_T
	LASS_ENFORCE_CLIB(sched_getaffinity(0, sizeof(cpu_set_t), &cachedSet));
#	elif LASS_HAVE_SYS_CPUSET_H
	LASS_ENFORCE_CLIB(cpuset_getaffinity(CPU_LEVEL_WHICH, CPU_WHICH_PID, -1, sizeof(cpu_set_t), &cachedSet));
#	else
#		error missing implementation
#	endif
	isCached = true;
	return cachedSet;
}

// Dynamic initialisation of this global triggers the first (caching) call
// during static initialisation.
cpu_set_t forceCalculationBeforeMain = originalCpuSet();
#endif
119
120TCpuSet availableProcessors()
121{
122#if LASS_HAVE_CPU_SET_T
123 cpu_set_t cpu_set = originalCpuSet();
124#endif
125
126 // determine number of processors (highest set bit of systemAffinityMask)
127#if LASS_HAVE_UNISTD_H_SC_NPROCESSORS_CONF
128 const size_t n = static_cast<size_t>(sysconf(_SC_NPROCESSORS_CONF));
129#elif LASS_HAVE_CPU_SET_T
130 size_t n = 0;
131 for (int i = 0; i < CPU_SETSIZE; ++i)
132 {
133 if (CPU_ISSET(i, &cpu_set))
134 {
135 n = static_cast<size_t>(i) + 1;
136 }
137 }
138#elif LASS_HAVE_SYSCTL_H_CTL_HW && LASS_HAVE_SYSCTL_H_HW_NCPU
139 int mid[2] = { CTL_HW, HW_NCPU };
140 int ncpus = 1;
141 size_t len = sizeof(ncpus);
142 sysctl(mib, 2, &ncpus, &len, 0, 0);
143 const size_t n = static_cast<size_t>(ncpus);
144#else
145# error missing implementation
146#endif
147
148 // determine what processors are available to this process
149 TCpuSet cpuSet(n, false);
150 for (size_t i = 0; i < n; ++i)
151 {
152#if LASS_HAVE_CPU_SET_T
153 cpuSet[i] = CPU_ISSET(static_cast<int>(i), &cpu_set);
154#elif LASS_HAVE_SYS_PROCESSOR_H
155 const int r = LASS_ENFORCE_CLIB(p_online(static_cast<processorid_t>(i), P_STATUS))("i=")(i);
156 cpuSet[i] = r == P_ONLINE || r == P_NOINTR;
157#else
158# warning [LASS BUILD MSG] do not know how to figure out what processors in the CPU set are available. Will assuming they all are.
159 cpuSet[i] = true;
160#endif
161 }
162
163 return cpuSet;
164}
165
166
/** @internal
 *  Return the kernel thread id of the calling thread.
 *
 *  When the gettid system call was not detected at configuration time,
 *  -1 is returned instead.
 */
inline pid_t lass_gettid()
{
#if !LASS_HAVE_SYS_SYSCALL_H_GETTID
	return -1;
#else
	const long rawTid = syscall(__NR_gettid);
	return static_cast<pid_t>(rawTid);
#endif
}
175
176
177
178/** @internal
179 * @ingroup Threading
180 */
181class MutexInternal: NonCopyable
182{
183public:
184 MutexInternal():
185 lockCount_(0)
186 {
187 pthread_mutexattr_t mutexattr;
188 LASS_ENFORCE_CLIB_RC(pthread_mutexattr_init(&mutexattr));
189 LASS_ENFORCE_CLIB_RC(pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE));
190 LASS_ENFORCE_CLIB_RC(pthread_mutex_init(&mutex_,&mutexattr));
191 LASS_ENFORCE_CLIB_RC(pthread_mutexattr_destroy(&mutexattr));
192 }
193 ~MutexInternal()
194 {
195 LASS_ASSERT(lockCount_ == 0);
196 LASS_WARN_CLIB_RC(pthread_mutex_destroy(&mutex_));
197 }
198 void lock()
199 {
200 LASS_ENFORCE_CLIB_RC(pthread_mutex_lock(&mutex_));
201 ++lockCount_;
202 }
203 LockResult tryLock()
204 {
205 const int ret = pthread_mutex_trylock(&mutex_);
206 if (ret == 0)
207 {
208 ++lockCount_;
209 return lockSuccess;
210 }
211 else if (ret == EBUSY)
212 {
213 return lockBusy;
214 }
215 LASS_THROW("pthread_mutex_trylock failed: ("
216 << ret << ") " << impl::lass_strerror(ret));
217 }
218 void unlock()
219 {
220 LASS_ASSERT(lockCount_ > 0);
221 if (lockCount_ == 0)
222 {
223 LASS_THROW("attempting to unlock an unlocked mutex");
224 }
225 --lockCount_;
226 LASS_ENFORCE_CLIB_RC(pthread_mutex_unlock(&mutex_));
227 }
228 unsigned lockCount() const
229 {
230 return lockCount_;
231 }
232public:
233 pthread_mutex_t mutex_;
234 unsigned lockCount_;
235};
236
237/** @internal
238 * @ingroup Threading
239 */
240typedef MutexInternal CriticalSectionInternal;
241
242
243
244/** @internal
245 * @ingroup Threading
246 */
247class ConditionInternal: NonCopyable
248{
249public:
250 ConditionInternal():
251 threadsWaiting_(0),
252 signalFlag_(false),
253 broadcastFlag_(false)
254 {
255 LASS_ENFORCE_CLIB_RC(pthread_cond_init(&condition_, NULL));
256 LASS_ENFORCE_CLIB_RC(pthread_mutex_init(&mutex_, NULL));
257 }
258 ~ConditionInternal()
259 {
260 LASS_WARN_CLIB_RC(pthread_mutex_destroy(&mutex_));
261 LASS_WARN_CLIB_RC(pthread_cond_destroy(&condition_));
262 }
263 void wait()
264 {
265 LASS_ENFORCE_CLIB_RC(pthread_mutex_lock(&mutex_));
266
267 int retWait = 0;
268 ++threadsWaiting_;
269 while (retWait == 0 && !(signalFlag_ || broadcastFlag_))
270 {
271 retWait = pthread_cond_wait(&condition_, &mutex_);
272 LASS_ASSERT(retWait == 0);
273 }
274 --threadsWaiting_;
275 signalFlag_ = false;
276 broadcastFlag_ &= (threadsWaiting_ > 0);
277
278 const int retUnlock = pthread_mutex_unlock(&mutex_);
279 LASS_ASSERT(retUnlock == 0);
280 if (retWait != 0)
281 {
282 LASS_THROW("pthread_cond_wait failed: ("
283 << retWait << ") " << impl::lass_strerror(retWait));
284 }
285 if (retUnlock != 0)
286 {
287 LASS_THROW("pthread_mutex_unlock failed: ("
288 << retUnlock << ") " << impl::lass_strerror(retUnlock));
289 }
290 }
291 WaitResult wait(unsigned long milliSeconds)
292 {
293 enum
294 {
295 msec_per_sec = 1000,
296 nsec_per_usec = 1000,
297 nsec_per_msec = 1000 * 1000,
298 nsec_per_sec = nsec_per_msec * msec_per_sec
299 };
300
301 const unsigned long seconds = milliSeconds / msec_per_sec;
302 milliSeconds %= msec_per_sec;
303
304 LASS_ENFORCE_CLIB_RC(pthread_mutex_lock(&mutex_));
305
306 struct timespec timeToWaitTo;
307#if LASS_HAVE_CLOCK_GETTIME
308 clock_gettime(CLOCK_REALTIME, &timeToWaitTo);
309#else
310 ::timeval now;
311 ::gettimeofday(&now, 0);
312 timeToWaitTo.tv_sec = now.tv_sec;
313 timeToWaitTo.tv_nsec = now.tv_usec * nsec_per_usec;
314#endif
315 timeToWaitTo.tv_nsec += milliSeconds * nsec_per_msec;
316 timeToWaitTo.tv_sec += seconds;
317 if (timeToWaitTo.tv_nsec >= nsec_per_sec)
318 {
319 timeToWaitTo.tv_sec += timeToWaitTo.tv_nsec / nsec_per_sec;
320 timeToWaitTo.tv_nsec %= nsec_per_sec;
321 }
322
323 ++threadsWaiting_;
324 int retWait = 0;
325 while (retWait == 0 && !(signalFlag_ || broadcastFlag_))
326 {
327 retWait = pthread_cond_timedwait(&condition_,&mutex_,&timeToWaitTo);
328 LASS_ASSERT(retWait == 0 || retWait == ETIMEDOUT);
329 }
330
331 --threadsWaiting_;
332 signalFlag_ = false;
333 broadcastFlag_ &= (threadsWaiting_ > 0);
334
335 const int retUnlock = pthread_mutex_unlock(&mutex_);
336 LASS_ASSERT(retUnlock == 0);
337 if (retWait != 0 && retWait != ETIMEDOUT)
338 {
339 LASS_THROW("pthread_cond_timedwait failed: ("
340 << retWait << ") " << impl::lass_strerror(retWait));
341 }
342 if (retUnlock != 0)
343 {
344 LASS_THROW("pthread_mutex_unlock failed: ("
345 << retUnlock << ") " << impl::lass_strerror(retUnlock));
346 }
347
348 LASS_ASSERT(retWait == 0 || retWait == ETIMEDOUT);
349 return retWait == 0 ? waitSuccess : waitTimeout;
350 }
351 void signal()
352 {
353 LASS_ENFORCE_CLIB_RC(pthread_mutex_lock(&mutex_));
354 signalFlag_ = true;
355 pthread_cond_signal(&condition_);
356 LASS_ENFORCE_CLIB_RC(pthread_mutex_unlock(&mutex_));
357 }
358 void broadcast()
359 {
360 LASS_ENFORCE_CLIB_RC(pthread_mutex_lock(&mutex_));
361 signalFlag_ = true;
362 pthread_cond_broadcast(&condition_);
363 LASS_ENFORCE_CLIB_RC(pthread_mutex_unlock(&mutex_));
364 }
365private:
366 pthread_mutex_t mutex_;
367 pthread_cond_t condition_;
368 unsigned threadsWaiting_;
369 bool signalFlag_;
370 bool broadcastFlag_;
371};
372
373
374
375/** @internal
376 * @ingroup Threading
377 */
378void bindThread(pthread_t LASS_UNUSED(handle), pid_t LASS_UNUSED(tid), size_t LASS_UNUSED(processor))
379{
380#if LASS_HAVE_CPU_SET_T
381 cpu_set_t mask;
382 if (processor == Thread::anyProcessor)
383 {
384 mask = originalCpuSet();
385 }
386 else
387 {
388 LASS_ASSERT(static_cast<int>(processor) >= 0);
389 CPU_ZERO(&mask);
390 CPU_SET(static_cast<int>(processor), &mask);
391 }
392# if LASS_HAVE_PTHREAD_SETAFFINITY_NP
393 LASS_ENFORCE_CLIB(pthread_setaffinity_np(handle, sizeof(cpu_set_t), &mask))("handle=")(handle);
394# else
395 LASS_ENFORCE_CLIB(sched_setaffinity(tid, sizeof(cpu_set_t), &mask))("tid=")(tid);
396# endif
397#elif LASS_HAVE_SYS_PROCESSOR_H
398 const processorid_t cpu_id = processor == Thread::anyProcessor ? PBIND_NONE : static_cast<processorid_t>(processor);
399 LASS_ENFORCE_CLIB(processor_bind(P_LWPID, handle, cpu_id, 0))("handle=")(handle);
400#else
401# warning [LASS BUILD MSG] no implementation for util::Thread::bind: setting thread affinity will be a no-op.
402#endif
403}
404
405
406
407#define LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(exception_type)\
408 catch (const exception_type& error)\
409 {\
410 pimpl->error_.reset(new RemoteExceptionWrapper<exception_type>(error));\
411 }
412
413/** @internal
414 * @ingroup Threading
415 */
416class ThreadInternal: NonCopyable
417{
418public:
419
420 ThreadInternal(Thread& iThread, ThreadKind iKind, const char* /*name*/):
421 thread_(iThread),
422 isJoinable_(iKind == threadJoinable),
423 isCreated_(false)
424 {
425
426 }
427
428 ~ThreadInternal()
429 {
430 }
431
432 /** run thread.
433 */
434 void run()
435 {
436 if (isCreated_)
437 {
438 LASS_THROW("You can run a thread only once");
439 }
440 LASS_ENFORCE_CLIB_RC(pthread_create(
441 &handle_, 0, &ThreadInternal::staticThreadStart, this));
442 while (!isCreated_)
443 {
444 runCondition_.wait(100);
445 }
446 }
447
448 bool isJoinable() const
449 {
450 return isJoinable_ && isCreated_;
451 }
452
453 void join()
454 {
455 if (!isJoinable())
456 {
457 LASS_THROW("Can not wait for uncreated or detached threads");
458 }
459
460 pthread_join(handle_, 0);
461 isJoinable_ = false;
462 if (error_.get())
463 {
464 error_->throwSelf();
465 }
466 }
467
468 void bind(size_t processor)
469 {
470 bindThread(handle_, tid_, processor);
471 }
472
473 const TCpuSet affinity() const
474 {
475 const size_t n = numberOfProcessors();
476 TCpuSet result(n, false);
477#if LASS_HAVE_CPU_SET_T
478 cpu_set_t mask;
479# if LASS_HAVE_PTHREAD_SETAFFINITY_NP
480 LASS_ENFORCE_CLIB(pthread_getaffinity_np(handle_, sizeof(cpu_set_t), &mask))("handle=")(handle_);
481# else
482 LASS_ENFORCE_CLIB(sched_getaffinity(tid_, sizeof(cpu_set_t), &mask))("tid=")(tid_);
483# endif
484 for (size_t i = 0; i < n; ++i)
485 {
486 result[i] = CPU_ISSET(static_cast<int>(i), &mask);
487 }
488#elif LASS_HAVE_SYS_PROCESSOR_H
489 processorid_t cpu_id;
490 LASS_ENFORCE_CLIB(processor_bind(P_LWPID, handle_, PBIND_QUERY, &cpu_id))("handle=")(handle_);
491 if (cpu_id == PBIND_NONE)
492 {
493 return availableProcessors();
494 }
495 result[static_cast<size_t>(cpu_id)] = true;
496#else
497# warning [LASS BUILD MSG] no implementation for util::Thread::affinity: will return set with all available processors
498 return availableProcessors();
499#endif
500 return result;
501 }
502
503 static void sleep(unsigned long milliSeconds)
504 {
505 enum
506 {
507 msec_per_sec = 1000,
508 nsec_per_msec = 1000 * 1000,
509 nsec_per_sec = nsec_per_msec * msec_per_sec
510 };
511
512 timespec timeOut;
513 if (milliSeconds < msec_per_sec)
514 {
515 timeOut.tv_sec = 0;
516 timeOut.tv_nsec = static_cast<long>(milliSeconds * nsec_per_msec);
517 }
518 else
519 {
520 timeOut.tv_sec = milliSeconds / msec_per_sec;
521 timeOut.tv_nsec = (milliSeconds % msec_per_sec) * nsec_per_msec;
522 }
523
524 // nanosleep may return earlier than expected if there's a signal
525 // that should be handled by the calling thread. If it happens,
526 // sleep again. [Bramz]
527 //
528 timespec timeRemaining;
529 while (true)
530 {
531 const int ret = nanosleep(&timeOut, &timeRemaining);
532 if (ret == 0)
533 {
534 // we're done =)
535 return;
536 }
537 const int errnum = impl::lass_errno();
538 if (errnum != EINTR)
539 {
540 // there was an error :(
541 LASS_THROW("nanosleep failed: (" << errnum
542 << ") " << impl::lass_strerror(errnum));
543 }
544 // if we're here, there was only an sleep interruption
545 // go back to sleep.
546 timeOut.tv_sec = timeRemaining.tv_sec;
547 timeOut.tv_nsec = timeRemaining.tv_nsec;
548 }
549 LASS_ASSERT(timeRemaining.tv_sec == 0 && timeRemaining.tv_nsec == 0);
550 }
551
552 static void yield()
553 {
554 LASS_ENFORCE_CLIB(sched_yield());
555 }
556
557 static void bindCurrent(size_t processor)
558 {
559 bindThread(pthread_self(), lass_gettid(), processor);
560 }
561
562 // thread function
563 static void* staticThreadStart(void* iPimpl)
564 {
565 LASS_ASSERT(iPimpl);
566 ThreadInternal* pimpl = static_cast<ThreadInternal*>(iPimpl);
567 pimpl->tid_ = lass_gettid();
568 pimpl->isCreated_ = true;
569 if (pimpl->isJoinable_)
570 {
571 try
572 {
573 pimpl->runCondition_.signal();
574 pimpl->thread_.doRun();
575 }
576 catch (const RemoteExceptionBase& error)
577 {
578 pimpl->error_ = error.clone();
579 }
580 LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::domain_error)
581 LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::invalid_argument)
582 LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::length_error)
583 LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::out_of_range)
584 LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::range_error)
585 LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::overflow_error)
586 LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::underflow_error)
587 LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::runtime_error)
588 LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::logic_error)
589 LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::exception)
590 }
591 else
592 {
593 pimpl->runCondition_.signal();
594 pimpl->thread_.doRun();
595 delete &pimpl->thread_;
596 }
597 return 0;
598 }
599
600private:
601
602 Thread& thread_;
603 pthread_t handle_; // handle of the thread
604 Condition runCondition_;
605 TRemoteExceptionBasePtr error_;
606
607 pid_t tid_;
608 volatile bool isJoinable_;
609 volatile bool isCreated_;
610};
611
612
613
614/** @internal
615 * @ingroup Threading
616 */
617class ThreadLocalStorageInternal: NonCopyable
618{
619public:
620 ThreadLocalStorageInternal(void (*destructor)(void*))
621 {
622 LASS_ENFORCE_CLIB_RC(pthread_key_create(&key_, destructor));
623 }
624 ~ThreadLocalStorageInternal()
625 {
626 LASS_WARN_CLIB_RC(pthread_key_delete(key_));
627 }
628 void* get() const
629 {
630 return pthread_getspecific(key_);
631 }
632 void set(const void* value)
633 {
634 LASS_ENFORCE_CLIB_RC(pthread_setspecific(key_, value));
635 }
636private:
637 pthread_key_t key_;
638};
639
640}
641}
642}
643
644#endif
645
646// EOF
static constexpr size_t anyProcessor
argument for Thread::bind to unbind the thread so it runs on any processor
Definition thread.h:210
const std::string lass_strerror(int errnum)
returns message associated to an CLIB error code
int lass_errno()
returns CLIB errno
ThreadKind
ThreadKind.
Definition thread.h:109
LockResult
Return code for lock functions.
Definition thread.h:89
size_t numberOfProcessors()
Returns the highest processor id + 1 on this machine.
Definition thread.cpp:74
WaitResult
Return code for wait functions.
Definition thread.h:99
@ threadJoinable
joinable thread, can be waited for
Definition thread.h:111
@ lockSuccess
Mutex/CriticalSection is successfully locked by this thread.
Definition thread.h:90
@ lockBusy
Mutex/CriticalSection is locked by another thread.
Definition thread.h:91
@ waitSuccess
Wait is successfully terminated.
Definition thread.h:100
@ waitTimeout
Wait failed because of a timeout.
Definition thread.h:101
general utility, debug facilities, ...
Library for Assembled Shared Sources.
Definition config.h:53