library of assembled shared sources

http://lass.cocamware.com

thread_posix.inl

Go to the documentation of this file.
00001 /** @file
00002  *  @author Bram de Greve (bramz@users.sourceforge.net)
00003  *  @author Tom De Muer (tomdemuer@users.sourceforge.net)
00004  *
00005  *  *** BEGIN LICENSE INFORMATION ***
00006  *  
00007  *  The contents of this file are subject to the Common Public Attribution License 
00008  *  Version 1.0 (the "License"); you may not use this file except in compliance with 
00009  *  the License. You may obtain a copy of the License at 
00010  *  http://lass.sourceforge.net/cpal-license. The License is based on the 
00011  *  Mozilla Public License Version 1.1 but Sections 14 and 15 have been added to cover 
00012  *  use of software over a computer network and provide for limited attribution for 
00013  *  the Original Developer. In addition, Exhibit A has been modified to be consistent 
00014  *  with Exhibit B.
00015  *  
00016  *  Software distributed under the License is distributed on an "AS IS" basis, WITHOUT 
00017  *  WARRANTY OF ANY KIND, either express or implied. See the License for the specific 
00018  *  language governing rights and limitations under the License.
00019  *  
00020  *  The Original Code is LASS - Library of Assembled Shared Sources.
00021  *  
00022  *  The Initial Developer of the Original Code is Bram de Greve and Tom De Muer.
00023  *  The Original Developer is the Initial Developer.
00024  *  
00025  *  All portions of the code written by the Initial Developer are:
00026  *  Copyright (C) 2004-2007 the Initial Developer.
00027  *  All Rights Reserved.
00028  *  
00029  *  Contributor(s):
00030  *
00031  *  Alternatively, the contents of this file may be used under the terms of the 
00032  *  GNU General Public License Version 2 or later (the GPL), in which case the 
00033  *  provisions of GPL are applicable instead of those above.  If you wish to allow use
00034  *  of your version of this file only under the terms of the GPL and not to allow 
00035  *  others to use your version of this file under the CPAL, indicate your decision by 
00036  *  deleting the provisions above and replace them with the notice and other 
00037  *  provisions required by the GPL License. If you do not delete the provisions above,
00038  *  a recipient may use your version of this file under either the CPAL or the GPL.
00039  *  
00040  *  *** END LICENSE INFORMATION ***
00041  */
00042 
00043 #ifndef LASS_GUARDIAN_OF_INCLUSION_UTIL_IMPL_THREAD_POSIX_INL
00044 #define LASS_GUARDIAN_OF_INCLUSION_UTIL_IMPL_THREAD_POSIX_INL
00045 
#include "../util_common.h"
#include "../thread.h"
#include "../../stde/extended_string.h"
#include "lass_errno.h"
#include <errno.h>
#include <pthread.h>
#include <sched.h>
#include <sys/syscall.h>
#include <unistd.h>
00053 
00054 namespace lass
00055 {
00056 namespace util
00057 {
00058 namespace impl
00059 {
00060 
00061 unsigned numberOfProcessors()
00062 {
00063     // we'll need to cache this if we want this to ever work ...
00064     
00065     cpu_set_t mask;
00066     CPU_ZERO(&mask);    
00067     LASS_ENFORCE_CLIB(sched_getaffinity(0, sizeof(cpu_set_t), &mask));
00068     
00069     unsigned count = 0;
00070     int i = 0;
00071     while (CPU_ISSET(i++, &mask))
00072     {
00073         ++count;
00074     }
00075     
00076     // we're doing an assumption here ... We think, we hope, that the mask
00077     // is a continuous series of bits starting from the LSB.  We'll test for this
00078     // until we are sure that our assumption is correct. [Bramz]
00079     //
00080     while (i < CPU_SETSIZE)
00081     {
00082         if (CPU_ISSET(i++, &mask))
00083         {
00084             std::cerr << "[LASS RUN MSG] UNDEFINED BEHAVIOUR: "
00085                 "numberOfProcessors' assumption is wrong!\n" << std::endl;
00086         }
00087     }
00088     
00089     return count;
00090 }
00091 
00092 
00093 
00094 /** @internal
00095  *  @ingroup Threading
00096  */
00097 class MutexInternal: NonCopyable
00098 {
00099 public:
00100     MutexInternal():
00101         lockCount_(0)
00102     {
00103         pthread_mutexattr_t mutexattr;
00104         LASS_ENFORCE_CLIB_RC(pthread_mutexattr_init(&mutexattr));
00105         LASS_ENFORCE_CLIB_RC(pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE));
00106         LASS_ENFORCE_CLIB_RC(pthread_mutex_init(&mutex_,&mutexattr));
00107         LASS_ENFORCE_CLIB_RC(pthread_mutexattr_destroy(&mutexattr));
00108     }
00109     ~MutexInternal() 
00110     {
00111         LASS_ASSERT(lockCount_ == 0);
00112         LASS_WARN_CLIB_RC(pthread_mutex_destroy(&mutex_));
00113     }
00114     void lock()
00115     {
00116         LASS_ENFORCE_CLIB_RC(pthread_mutex_lock(&mutex_));
00117         ++lockCount_;
00118     }
00119     const LockResult tryLock()
00120     {
00121         const int ret = pthread_mutex_trylock(&mutex_);
00122         if (ret == 0)
00123         {
00124             ++lockCount_;
00125             return lockSuccess;
00126         }
00127         else if (ret == EBUSY)
00128         {
00129             return lockBusy;
00130         }
00131         LASS_THROW("pthread_mutex_trylock failed: (" 
00132             << ret << ") " << impl::lass_strerror(ret));
00133     }
00134     void unlock()
00135     {
00136         LASS_ASSERT(lockCount_ > 0);
00137         if (lockCount_ == 0)
00138         {
00139             LASS_THROW("attempting to unlock an unlocked mutex");
00140         }
00141         --lockCount_;
00142         LASS_ENFORCE_CLIB_RC(pthread_mutex_unlock(&mutex_));
00143     }
00144     const unsigned lockCount() const 
00145     {
00146         return lockCount_;
00147     }
00148 public:
00149     pthread_mutex_t mutex_;
00150     unsigned lockCount_;
00151 };
00152 
00153 /** @internal
00154  *  @ingroup Threading
00155  */
00156 typedef MutexInternal CriticalSectionInternal;
00157 
00158 
00159 
00160 /** @internal
00161  *  @ingroup Threading
00162  */
00163 class ConditionInternal: NonCopyable
00164 {
00165 public:
00166     ConditionInternal():
00167         threadsWaiting_(0),
00168         signalFlag_(false),
00169         broadcastFlag_(false)
00170     {
00171         LASS_ENFORCE_CLIB_RC(pthread_cond_init(&condition_, NULL));
00172         LASS_ENFORCE_CLIB_RC(pthread_mutex_init(&mutex_, NULL));
00173     }
00174     ~ConditionInternal()
00175     {
00176         LASS_WARN_CLIB_RC(pthread_mutex_destroy(&mutex_));
00177         LASS_WARN_CLIB_RC(pthread_cond_destroy(&condition_));
00178     }       
00179     void wait()
00180     {
00181         LASS_ENFORCE_CLIB_RC(pthread_mutex_lock(&mutex_));
00182         
00183         int retWait = 0;
00184         ++threadsWaiting_;
00185         while (retWait == 0 && !(signalFlag_ || broadcastFlag_))
00186         {
00187             retWait = pthread_cond_wait(&condition_, &mutex_);
00188             LASS_ASSERT(retWait == 0);
00189         }
00190         --threadsWaiting_;
00191         signalFlag_ = false;
00192         broadcastFlag_ &= (threadsWaiting_ > 0);        
00193         
00194         const int retUnlock = pthread_mutex_unlock(&mutex_);
00195         LASS_ASSERT(retUnlock == 0);
00196         if (retWait != 0)
00197         {
00198             LASS_THROW("pthread_cond_wait failed: (" 
00199                 << retWait << ") " << impl::lass_strerror(retWait));
00200         }
00201         if (retUnlock != 0)
00202         {
00203             LASS_THROW("pthread_mutex_unlock failed: (" 
00204                 << retUnlock << ") " << impl::lass_strerror(retUnlock));
00205         }
00206     }   
00207     const WaitResult wait(unsigned long iMilliSeconds)
00208     {
00209         const long million =  1000000;
00210         const long trillion = 1000000000;
00211         
00212         LASS_ENFORCE_CLIB_RC(pthread_mutex_lock(&mutex_));
00213         
00214         struct timespec timeToWaitTo;
00215         clock_gettime(CLOCK_REALTIME,&timeToWaitTo);
00216         timeToWaitTo.tv_nsec += iMilliSeconds * million;
00217         if (timeToWaitTo.tv_nsec >= trillion)
00218         {
00219             timeToWaitTo.tv_sec += timeToWaitTo.tv_nsec / trillion;
00220             timeToWaitTo.tv_nsec %= trillion;
00221         }
00222         
00223         ++threadsWaiting_;
00224         int retWait = 0;
00225         while (retWait == 0 && !(signalFlag_ || broadcastFlag_))
00226         {
00227             retWait = pthread_cond_timedwait(&condition_,&mutex_,&timeToWaitTo);
00228             LASS_ASSERT(retWait == 0 || retWait == ETIMEDOUT);
00229         }
00230         
00231         --threadsWaiting_;
00232         signalFlag_ = false;
00233         broadcastFlag_ &= (threadsWaiting_ > 0);        
00234 
00235         const int retUnlock = pthread_mutex_unlock(&mutex_);
00236         LASS_ASSERT(retUnlock == 0);
00237         if (retWait != 0 && retWait != ETIMEDOUT)
00238         {
00239             LASS_THROW("pthread_cond_timedwait failed: (" 
00240                 << retWait << ") " << impl::lass_strerror(retWait));
00241         }
00242         if (retUnlock != 0)
00243         {
00244             LASS_THROW("pthread_mutex_unlock failed: (" 
00245                 << retUnlock << ") " << impl::lass_strerror(retUnlock));
00246         }
00247 
00248         LASS_ASSERT(retWait == 0 || retWait == ETIMEDOUT);
00249         return retWait == 0 ? waitSuccess : waitTimeout;
00250     }
00251     void signal()
00252     {
00253         LASS_ENFORCE_CLIB_RC(pthread_mutex_lock(&mutex_));
00254         signalFlag_ = true;
00255         pthread_cond_signal(&condition_);
00256         LASS_ENFORCE_CLIB_RC(pthread_mutex_unlock(&mutex_));
00257     }
00258     void broadcast()
00259     {
00260         LASS_ENFORCE_CLIB_RC(pthread_mutex_lock(&mutex_));
00261         signalFlag_ = true;
00262         pthread_cond_broadcast(&condition_);
00263         LASS_ENFORCE_CLIB_RC(pthread_mutex_unlock(&mutex_));
00264     }
00265 private:
00266     pthread_mutex_t mutex_;
00267     pthread_cond_t condition_;
00268     unsigned threadsWaiting_;
00269     bool signalFlag_;
00270     bool broadcastFlag_;
00271 };
00272 
00273 
00274 
00275 /** @internal
00276  *  @ingroup Threading
00277  */
00278 void bindThread(pid_t pid, unsigned processor)
00279 {
00280     cpu_set_t mask;
00281     CPU_ZERO(&mask);
00282     
00283     if (processor == Thread::anyProcessor)
00284     {
00285         const int n = static_cast<int>(util::numberOfProcessors);
00286         LASS_ASSERT(n > 0);
00287         for (int i = 0; i < n; ++i)
00288         {
00289             CPU_SET(i, &mask);
00290         }
00291     }
00292     else
00293     {
00294         if (processor >= util::numberOfProcessors)
00295         {
00296             LASS_THROW("'" << processor << "' is an invalid processor index. "
00297                 << "Valid range is [0, " << util::numberOfProcessors << ").");
00298         }
00299         LASS_ASSERT(static_cast<int>(processor) >= 0);
00300         CPU_SET(static_cast<int>(processor), &mask);
00301     }
00302     
00303     LASS_ENFORCE_CLIB(sched_setaffinity(pid, sizeof(cpu_set_t), &mask));    
00304 }
00305 
00306 
00307 
00308 #define LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(exception_type)\
00309     catch (const exception_type& error)\
00310     {\
00311         pimpl->error_.reset(new experimental::RemoteExceptionWrapper<exception_type>(error));\
00312     }
00313 
00314 /** @internal
00315  *  @ingroup Threading
00316  */
00317 class ThreadInternal: NonCopyable
00318 {
00319 public:
00320 
00321     ThreadInternal(Thread& iThread, ThreadKind iKind, const char* /*name*/):
00322         thread_(iThread),
00323         isJoinable_(iKind == threadJoinable),
00324         isCreated_(false)
00325     {
00326 
00327     }
00328     
00329     ~ThreadInternal()
00330     {
00331     }
00332     
00333     /** run thread.
00334         */
00335     void run()
00336     {
00337         if (isCreated_)
00338         {
00339             LASS_THROW("You can run a thread only once");
00340         }
00341         LASS_ENFORCE_CLIB_RC(pthread_create(
00342             &handle_, 0, &ThreadInternal::staticThreadStart, this));
00343         while (!isCreated_)
00344         {
00345             runCondition_.wait(100);
00346         }
00347     }
00348     
00349     void join()
00350     {           
00351         if (!(isJoinable_ && isCreated_))
00352         {
00353             LASS_THROW("Can not wait for uncreated or detached threads");
00354         }
00355         else
00356         {
00357             pthread_join(handle_, 0);
00358             isJoinable_ = false;
00359             if (error_.get())
00360             {
00361                 error_->throwSelf();
00362             }
00363         }
00364     }
00365 
00366     void bind(size_t processor)
00367     {
00368         bindThread(tid_, processor);
00369     }
00370     
00371     static void sleep(unsigned long iMilliSeconds)
00372     {
00373         timespec timeOut;
00374         if (iMilliSeconds < 1000)
00375         {
00376             timeOut.tv_sec = 0;
00377             timeOut.tv_nsec = iMilliSeconds * 1000000;
00378         }
00379         else
00380         {
00381             timeOut.tv_sec = iMilliSeconds / 1000;
00382             timeOut.tv_nsec = (iMilliSeconds % 1000) * 1000000;
00383         }
00384         
00385         // nanosleep may return earlier than expected if there's a signal
00386         // that should be handled by the calling thread.  If it happens,
00387         // sleep again. [Bramz]
00388         //
00389         timespec timeRemaining;
00390         while (true)
00391         {
00392             const int ret = nanosleep(&timeOut, &timeRemaining);
00393             if (ret == 0)
00394             {
00395                 // we're done =)
00396                 return; 
00397             }
00398             const int errnum = impl::lass_errno();
00399             if (errnum != EINTR)
00400             {
00401                 // there was an error :(
00402                 LASS_THROW("nanosleep failed: (" << errnum
00403                     << ") " << impl::lass_strerror(errnum));
00404             }
00405             // if we're here, there was only an sleep interruption
00406             // go back to sleep.
00407             timeOut.tv_sec = timeRemaining.tv_sec;
00408             timeOut.tv_nsec = timeRemaining.tv_nsec;
00409         }
00410         LASS_ASSERT(timeRemaining.tv_sec == 0 && timeRemaining.tv_nsec == 0);
00411     }
00412     
00413     static void yield()
00414     {
00415         LASS_ENFORCE_CLIB(sched_yield());
00416     }
00417 
00418     static void bindCurrent(size_t processor)
00419     {
00420         bindThread(0, processor);
00421     }
00422 
00423     // thread function
00424     static void* staticThreadStart(void* iPimpl)
00425     {
00426         LASS_ASSERT(iPimpl);
00427         ThreadInternal* pimpl = static_cast<ThreadInternal*>(iPimpl);
00428         pimpl->tid_ = getpid();
00429         pimpl->isCreated_ = true;
00430         if (pimpl->isJoinable_)
00431         {
00432             try
00433             {
00434                 pimpl->runCondition_.signal();
00435                 pimpl->thread_.doRun();
00436             }
00437             catch (const experimental::RemoteExceptionBase& error)
00438             {
00439                 pimpl->error_ = error.clone();
00440             }
00441             LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::domain_error)
00442             LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::invalid_argument)
00443             LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::length_error)
00444             LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::out_of_range)
00445             LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::range_error)
00446             LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::overflow_error)
00447             LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::underflow_error)
00448             LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::runtime_error)
00449             LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::logic_error)
00450             LASS_UTIL_THREAD_POSIX_CATCH_AND_WRAP(::std::exception)
00451         }
00452         else
00453         {
00454             pimpl->runCondition_.signal();
00455             pimpl->thread_.doRun();
00456             delete &pimpl->thread_;
00457         }
00458         return 0;
00459     }
00460 
00461 private:
00462 
00463     Thread& thread_;
00464     pthread_t handle_;   // handle of the thread
00465     Condition runCondition_;
00466     std::auto_ptr<experimental::RemoteExceptionBase> error_;
00467     pid_t tid_;
00468     volatile bool isJoinable_;
00469     volatile bool isCreated_;
00470 };
00471 
00472 
00473 
00474 /** @internal
00475  *  @ingroup Threading
00476  */
00477 class ThreadLocalStorageInternal: NonCopyable
00478 {
00479 public:
00480     ThreadLocalStorageInternal(void (*destructor)(void*))
00481     {
00482         LASS_ENFORCE_CLIB_RC(pthread_key_create(&key_, destructor));
00483     }
00484     ~ThreadLocalStorageInternal()
00485     {
00486         LASS_WARN_CLIB_RC(pthread_key_delete(key_));
00487     }
00488     void* const get() const
00489     {
00490         return pthread_getspecific(key_);
00491     }
00492     void set(const void* value)
00493     {
00494         LASS_ENFORCE_CLIB_RC(pthread_setspecific(key_, value));
00495     }
00496 private:
00497     pthread_key_t key_;
00498 };
00499 
00500 }
00501 }
00502 }
00503 
00504 #endif
00505  
00506 // EOF

Generated on Mon Nov 10 14:21:40 2008 for Library of Assembled Shared Sources by doxygen 1.5.7.1
SourceForge.net Logo