condition.hpp 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295
  1. //////////////////////////////////////////////////////////////////////////////
  2. //
  3. // (C) Copyright Ion Gaztanaga 2005-2012. Distributed under the Boost
  4. // Software License, Version 1.0. (See accompanying file
  5. // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. // See http://www.boost.org/libs/interprocess for documentation.
  8. //
  9. //////////////////////////////////////////////////////////////////////////////
  10. #ifndef BOOST_INTERPROCESS_DETAIL_SPIN_CONDITION_HPP
  11. #define BOOST_INTERPROCESS_DETAIL_SPIN_CONDITION_HPP
  12. #include <boost/interprocess/detail/config_begin.hpp>
  13. #include <boost/interprocess/detail/workaround.hpp>
  14. #include <boost/interprocess/sync/spin/mutex.hpp>
  15. #include <boost/interprocess/detail/posix_time_types_wrk.hpp>
  16. #include <boost/interprocess/detail/atomic.hpp>
  17. #include <boost/interprocess/sync/scoped_lock.hpp>
  18. #include <boost/interprocess/exceptions.hpp>
  19. #include <boost/interprocess/detail/os_thread_functions.hpp>
  20. #include <boost/move/move.hpp>
  21. #include <boost/cstdint.hpp>
  22. namespace boost {
  23. namespace interprocess {
  24. namespace ipcdetail {
  25. class spin_condition
  26. {
  27. spin_condition(const spin_condition &);
  28. spin_condition &operator=(const spin_condition &);
  29. public:
  30. spin_condition();
  31. ~spin_condition();
  32. void notify_one();
  33. void notify_all();
  34. template <typename L>
  35. bool timed_wait(L& lock, const boost::posix_time::ptime &abs_time)
  36. {
  37. if(abs_time == boost::posix_time::pos_infin){
  38. this->wait(lock);
  39. return true;
  40. }
  41. if (!lock)
  42. throw lock_exception();
  43. return this->do_timed_wait(abs_time, *lock.mutex());
  44. }
  45. template <typename L, typename Pr>
  46. bool timed_wait(L& lock, const boost::posix_time::ptime &abs_time, Pr pred)
  47. {
  48. if(abs_time == boost::posix_time::pos_infin){
  49. this->wait(lock, pred);
  50. return true;
  51. }
  52. if (!lock)
  53. throw lock_exception();
  54. while (!pred()){
  55. if (!this->do_timed_wait(abs_time, *lock.mutex()))
  56. return pred();
  57. }
  58. return true;
  59. }
  60. template <typename L>
  61. void wait(L& lock)
  62. {
  63. if (!lock)
  64. throw lock_exception();
  65. do_wait(*lock.mutex());
  66. }
  67. template <typename L, typename Pr>
  68. void wait(L& lock, Pr pred)
  69. {
  70. if (!lock)
  71. throw lock_exception();
  72. while (!pred())
  73. do_wait(*lock.mutex());
  74. }
  75. template<class InterprocessMutex>
  76. void do_wait(InterprocessMutex &mut);
  77. template<class InterprocessMutex>
  78. bool do_timed_wait(const boost::posix_time::ptime &abs_time, InterprocessMutex &mut);
  79. private:
  80. template<class InterprocessMutex>
  81. bool do_timed_wait(bool tout_enabled, const boost::posix_time::ptime &abs_time, InterprocessMutex &mut);
  82. enum { SLEEP = 0, NOTIFY_ONE, NOTIFY_ALL };
  83. spin_mutex m_enter_mut;
  84. volatile boost::uint32_t m_command;
  85. volatile boost::uint32_t m_num_waiters;
  86. void notify(boost::uint32_t command);
  87. };
  88. inline spin_condition::spin_condition()
  89. {
  90. //Note that this class is initialized to zero.
  91. //So zeroed memory can be interpreted as an initialized
  92. //condition variable
  93. m_command = SLEEP;
  94. m_num_waiters = 0;
  95. }
  96. inline spin_condition::~spin_condition()
  97. {
  98. //Trivial destructor
  99. }
  100. inline void spin_condition::notify_one()
  101. {
  102. this->notify(NOTIFY_ONE);
  103. }
  104. inline void spin_condition::notify_all()
  105. {
  106. this->notify(NOTIFY_ALL);
  107. }
  108. inline void spin_condition::notify(boost::uint32_t command)
  109. {
  110. //This mutex guarantees that no other thread can enter to the
  111. //do_timed_wait method logic, so that thread count will be
  112. //constant until the function writes a NOTIFY_ALL command.
  113. //It also guarantees that no other notification can be signaled
  114. //on this spin_condition before this one ends
  115. m_enter_mut.lock();
  116. //Return if there are no waiters
  117. if(!atomic_read32(&m_num_waiters)) {
  118. m_enter_mut.unlock();
  119. return;
  120. }
  121. //Notify that all threads should execute wait logic
  122. while(SLEEP != atomic_cas32(const_cast<boost::uint32_t*>(&m_command), command, SLEEP)){
  123. thread_yield();
  124. }
  125. /*
  126. //Wait until the threads are woken
  127. while(SLEEP != atomic_cas32(const_cast<boost::uint32_t*>(&m_command), 0)){
  128. thread_yield();
  129. }
  130. */
  131. //The enter mutex will rest locked until the last waiting thread unlocks it
  132. }
  133. template<class InterprocessMutex>
  134. inline void spin_condition::do_wait(InterprocessMutex &mut)
  135. {
  136. this->do_timed_wait(false, boost::posix_time::ptime(), mut);
  137. }
  138. template<class InterprocessMutex>
  139. inline bool spin_condition::do_timed_wait
  140. (const boost::posix_time::ptime &abs_time, InterprocessMutex &mut)
  141. {
  142. return this->do_timed_wait(true, abs_time, mut);
  143. }
  144. template<class InterprocessMutex>
  145. inline bool spin_condition::do_timed_wait(bool tout_enabled,
  146. const boost::posix_time::ptime &abs_time,
  147. InterprocessMutex &mut)
  148. {
  149. boost::posix_time::ptime now = microsec_clock::universal_time();
  150. if(tout_enabled){
  151. if(now >= abs_time) return false;
  152. }
  153. typedef boost::interprocess::scoped_lock<spin_mutex> InternalLock;
  154. //The enter mutex guarantees that while executing a notification,
  155. //no other thread can execute the do_timed_wait method.
  156. {
  157. //---------------------------------------------------------------
  158. InternalLock lock;
  159. if(tout_enabled){
  160. InternalLock dummy(m_enter_mut, abs_time);
  161. lock = boost::move(dummy);
  162. }
  163. else{
  164. InternalLock dummy(m_enter_mut);
  165. lock = boost::move(dummy);
  166. }
  167. if(!lock)
  168. return false;
  169. //---------------------------------------------------------------
  170. //We increment the waiting thread count protected so that it will be
  171. //always constant when another thread enters the notification logic.
  172. //The increment marks this thread as "waiting on spin_condition"
  173. atomic_inc32(const_cast<boost::uint32_t*>(&m_num_waiters));
  174. //We unlock the external mutex atomically with the increment
  175. mut.unlock();
  176. }
  177. //By default, we suppose that no timeout has happened
  178. bool timed_out = false, unlock_enter_mut= false;
  179. //Loop until a notification indicates that the thread should
  180. //exit or timeout occurs
  181. while(1){
  182. //The thread sleeps/spins until a spin_condition commands a notification
  183. //Notification occurred, we will lock the checking mutex so that
  184. while(atomic_read32(&m_command) == SLEEP){
  185. thread_yield();
  186. //Check for timeout
  187. if(tout_enabled){
  188. now = microsec_clock::universal_time();
  189. if(now >= abs_time){
  190. //If we can lock the mutex it means that no notification
  191. //is being executed in this spin_condition variable
  192. timed_out = m_enter_mut.try_lock();
  193. //If locking fails, indicates that another thread is executing
  194. //notification, so we play the notification game
  195. if(!timed_out){
  196. //There is an ongoing notification, we will try again later
  197. continue;
  198. }
  199. //No notification in execution, since enter mutex is locked.
  200. //We will execute time-out logic, so we will decrement count,
  201. //release the enter mutex and return false.
  202. break;
  203. }
  204. }
  205. }
  206. //If a timeout occurred, the mutex will not execute checking logic
  207. if(tout_enabled && timed_out){
  208. //Decrement wait count
  209. atomic_dec32(const_cast<boost::uint32_t*>(&m_num_waiters));
  210. unlock_enter_mut = true;
  211. break;
  212. }
  213. else{
  214. boost::uint32_t result = atomic_cas32
  215. (const_cast<boost::uint32_t*>(&m_command), SLEEP, NOTIFY_ONE);
  216. if(result == SLEEP){
  217. //Other thread has been notified and since it was a NOTIFY one
  218. //command, this thread must sleep again
  219. continue;
  220. }
  221. else if(result == NOTIFY_ONE){
  222. //If it was a NOTIFY_ONE command, only this thread should
  223. //exit. This thread has atomically marked command as sleep before
  224. //so no other thread will exit.
  225. //Decrement wait count.
  226. unlock_enter_mut = true;
  227. atomic_dec32(const_cast<boost::uint32_t*>(&m_num_waiters));
  228. break;
  229. }
  230. else{
  231. //If it is a NOTIFY_ALL command, all threads should return
  232. //from do_timed_wait function. Decrement wait count.
  233. unlock_enter_mut = 1 == atomic_dec32(const_cast<boost::uint32_t*>(&m_num_waiters));
  234. //Check if this is the last thread of notify_all waiters
  235. //Only the last thread will release the mutex
  236. if(unlock_enter_mut){
  237. atomic_cas32(const_cast<boost::uint32_t*>(&m_command), SLEEP, NOTIFY_ALL);
  238. }
  239. break;
  240. }
  241. }
  242. }
  243. //Unlock the enter mutex if it is a single notification, if this is
  244. //the last notified thread in a notify_all or a timeout has occurred
  245. if(unlock_enter_mut){
  246. m_enter_mut.unlock();
  247. }
  248. //Lock external again before returning from the method
  249. mut.lock();
  250. return !timed_out;
  251. }
  252. } //namespace ipcdetail
  253. } //namespace interprocess
  254. } //namespace boost
  255. #include <boost/interprocess/detail/config_end.hpp>
  256. #endif //BOOST_INTERPROCESS_DETAIL_SPIN_CONDITION_HPP