blocxx
Condition.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2 * Copyright (C) 2005, Vintela, Inc. All rights reserved.
3 * Copyright (C) 2006, Novell, Inc. All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of
14 * Vintela, Inc.,
15 * nor Novell, Inc.,
16 * nor the names of its contributors or employees may be used to
17 * endorse or promote products derived from this software without
18 * specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 * POSSIBILITY OF SUCH DAMAGE.
31 *******************************************************************************/
32 
33 
37 #include "blocxx/BLOCXX_config.h"
38 #include "blocxx/Condition.hpp"
40 #include "blocxx/ExceptionIds.hpp"
41 #include "blocxx/Timeout.hpp"
42 #include "blocxx/TimeoutTimer.hpp"
43 #include "blocxx/ThreadImpl.hpp"
44 
45 #include <cassert>
46 #include <cerrno>
47 #include <limits>
48 #ifdef BLOCXX_HAVE_SYS_TIME_H
49 #include <sys/time.h>
50 #endif
51 
52 namespace BLOCXX_NAMESPACE
53 {
54 
57 #if defined(BLOCXX_USE_PTHREAD)
60 {
61  int res = pthread_cond_init(&m_condition, PTHREAD_COND_ATTR_DEFAULT);
62  if (res != 0)
63  {
64  BLOCXX_THROW(ConditionResourceException, "Failed initializing condition variable");
65  }
66 }
69 {
70  int res = pthread_cond_destroy(&m_condition);
71  assert(res == 0);
72 }
74 void
76 {
77  int res = pthread_cond_signal(&m_condition);
78  assert(res == 0);
79 }
81 void
83 {
84  int res = pthread_cond_broadcast(&m_condition);
85  assert(res == 0);
86 }
88 void
90 {
92  int res;
93  NonRecursiveMutexLockState state;
94  mutex.conditionPreWait(state);
95  res = pthread_cond_wait(&m_condition, state.pmutex);
96  mutex.conditionPostWait(state);
97  assert(res == 0 || res == EINTR);
98  if (res == EINTR)
99  {
101  }
102 }
104 namespace
105 {
/// Strict "earlier than" ordering for two timespec values.
///
/// The seconds fields are compared first; the nanoseconds fields are only
/// consulted when the seconds are equal.
///
/// @param x left-hand operand
/// @param y right-hand operand
/// @return true if x orders strictly before y, false otherwise (including
///         when both are equal).
inline
bool timespec_less(struct timespec const & x, struct timespec const & y)
{
	if (x.tv_sec != y.tv_sec)
	{
		// Whole seconds differ, so they alone decide the ordering.
		return x.tv_sec < y.tv_sec;
	}
	// Same second: fall back to the sub-second part.
	return x.tv_nsec < y.tv_nsec;
}
112 
113  int check_timedwait(
114  int rc, pthread_cond_t * cond, pthread_mutex_t * mtx,
115  struct timespec const * abstime
116  )
117  {
118 #ifdef BLOCXX_NCR
119  if (rc == -1 && errno == EAGAIN)
120  {
121  return ETIMEDOUT;
122  }
123 #endif
124  if (rc != EINVAL)
125  {
126  return rc;
127  }
128  // Solaris won't let you wait more than 10 ** 8 seconds.
129  time_t const max_future = 99999999;
130  time_t const max_time = std::numeric_limits<time_t>::max();
131  time_t now_sec = DateTime::getCurrent().get();
132  struct timespec new_abstime;
133  new_abstime.tv_sec = (
134  now_sec <= max_time - max_future
135  ? now_sec + max_future
136  : max_time
137  );
138  new_abstime.tv_nsec = 0;
139  bool early = timespec_less(new_abstime, *abstime);
140  if (!early)
141  {
142  new_abstime = *abstime;
143  }
144  int newrc = pthread_cond_timedwait(cond, mtx, &new_abstime);
145  return (newrc == ETIMEDOUT && early ? EINTR : newrc);
146  }
147 }
148 
// POSIX-threads implementation of the timed wait.
//
// Waits on m_condition using the raw mutex exposed through the lock state,
// with a deadline derived from `timeout`.
//
// @param mutex   mutex associated with this wait; its conditionPreWait() /
//                conditionPostWait() hooks bracket the wait.
// @param timeout timeout used to construct the TimeoutTimer that supplies
//                the deadline.
// @return false if the final result code is ETIMEDOUT, true for any other
//         result (0 or EINTR, per the assert below).
//
// NOTE(review): the line numbering embedded in this listing skips a line
// right after the opening brace and inside the `if (res == EINTR)` body, so
// statements may have been lost in extraction — confirm against the
// original file before relying on this text.
149 bool
150 Condition::doTimedWait(NonRecursiveMutex& mutex, const Timeout& timeout)
151 {
153  int res;
154  NonRecursiveMutexLockState state;
    // Populate `state`; state.pmutex is what gets handed to pthread below.
155  mutex.conditionPreWait(state);
156  bool ret = false;
157 
158  timespec ts;
159  TimeoutTimer timer(timeout);
160 
    // presumably asTimespec() fills `ts` and returns a pointer to it, since
    // &ts is passed as the deadline on the next line — TODO confirm.
161  res = pthread_cond_timedwait(&m_condition, state.pmutex, timer.asTimespec(ts));
    // Translate/retry the result code (check_timedwait retries on EINVAL).
162  res = check_timedwait(res, &m_condition, state.pmutex, &ts);
163  mutex.conditionPostWait(state);
164  assert(res == 0 || res == ETIMEDOUT || res == EINTR);
165  if (res == EINTR)
166  {
    // NOTE(review): empty in this listing; see the note in the header.
168  }
    // Anything other than a timeout counts as a successful wake-up.
169  ret = res != ETIMEDOUT;
170  return ret;
171 }
172 #elif defined (BLOCXX_WIN32)
175  : m_condition(new ConditionInfo_t)
176 {
177  m_condition->waitersCount = 0;
178  m_condition->wasBroadcast = false;
179  m_condition->queue = ::CreateSemaphore(
180  NULL, // No security
181  0, // initially 0
182  0x7fffffff, // max count
183  NULL); // Unnamed
184  ::InitializeCriticalSection(&m_condition->waitersCountLock);
185  m_condition->waitersDone = ::CreateEvent(
186  NULL, // No security
187  false, // auto-reset
188  false, // non-signaled initially
189  NULL); // Unnamed
190 }
193 {
194  ::CloseHandle(m_condition->queue);
195  ::DeleteCriticalSection(&m_condition->waitersCountLock);
196  ::CloseHandle(m_condition->waitersDone);
197  delete m_condition;
198 }
200 void
202 {
203  ::EnterCriticalSection(&m_condition->waitersCountLock);
204  bool haveWaiters = m_condition->waitersCount > 0;
205  ::LeaveCriticalSection(&m_condition->waitersCountLock);
206 
207  // If no threads waiting, then this is a no-op
208  if (haveWaiters)
209  {
210  ::ReleaseSemaphore(m_condition->queue, 1, 0);
211  }
212 }
214 void
216 {
217  ::EnterCriticalSection(&m_condition->waitersCountLock);
218  bool haveWaiters = false;
219  if (m_condition->waitersCount > 0)
220  {
221  // It's gonna be a broadcast, even if there's only one waiting thread.
222  haveWaiters = m_condition->wasBroadcast = true;
223  }
224 
225  if (haveWaiters)
226  {
227  // Wake up all the waiting threads atomically
228  ::ReleaseSemaphore(m_condition->queue, m_condition->waitersCount, 0);
229  ::LeaveCriticalSection(&m_condition->waitersCountLock);
230 
231  // Wait for all the threads to acquire the counting semaphore
232  ::WaitForSingleObject(m_condition->waitersDone, INFINITE);
233  m_condition->wasBroadcast = false;
234  }
235  else
236  {
237  ::LeaveCriticalSection(&m_condition->waitersCountLock);
238  }
239 }
241 void
243 {
244  doTimedWait(mutex, Timeout::infinite);
245 }
// Win32 implementation of the timed wait.
//
// Registers the caller as a waiter, releases the external mutex while
// waiting on the semaphore m_condition->queue, then re-acquires the external
// mutex before returning.
//
// @param mutex   external mutex; mutex.m_mutex is released for the duration
//                of the wait and re-acquired afterwards.
// @param timeout timeout used to construct the TimeoutTimer whose
//                asDWORDMs() value bounds the wait.
// @return false only when the wait on the queue reports WAIT_TIMEOUT; every
//         other result of SignalObjectAndWait yields true.
//
// NOTE(review): the line numbering embedded in this listing skips a line
// right after the opening brace, so a statement may have been lost in
// extraction — confirm against the original file.
247 bool
248 //Condition::doTimedWait(NonRecursiveMutex& mutex, UInt32 sTimeout, UInt32 usTimeout)
249 Condition::doTimedWait(NonRecursiveMutex& mutex, const Timeout& timeout)
250 {
252  bool cc = true;
253  NonRecursiveMutexLockState state;
254  mutex.conditionPreWait(state);
255 
    // Register as a waiter; the counter is guarded by waitersCountLock.
256  ::EnterCriticalSection(&m_condition->waitersCountLock);
257  m_condition->waitersCount++;
258  ::LeaveCriticalSection(&m_condition->waitersCountLock);
259 
260  TimeoutTimer timer(timeout);
261  // Atomically release the mutex and wait on the
262  // queue until signal/broadcast.
263  if (::SignalObjectAndWait(mutex.m_mutex, m_condition->queue, timer.asDWORDMs(),
264  false) == WAIT_TIMEOUT)
265  {
266  cc = false;
267  }
268 
    // Deregister as a waiter, again under waitersCountLock.
269  ::EnterCriticalSection(&m_condition->waitersCountLock);
270  m_condition->waitersCount--;
271 
272  // Check to see if we're the last waiter after the broadcast
    // NOTE(review): a waiter that timed out (cc == false) is never treated
    // as the last waiter and so never signals waitersDone, even if it is the
    // one that brings waitersCount to 0 while wasBroadcast is set. Whether
    // the thread blocked on waitersDone can then wait indefinitely depends
    // on timing — confirm.
273  bool isLastWaiter = (m_condition->wasBroadcast && m_condition->waitersCount == 0
274  && cc == true);
275 
276  ::LeaveCriticalSection(&m_condition->waitersCountLock);
277 
278  // If this is the last thread waiting for this broadcast, then let all the
279  // other threads proceed.
280  if (isLastWaiter)
281  {
282  // Atomically signal the waitersDone event and wait to acquire
283  // the external mutex. Ensures fairness
284  ::SignalObjectAndWait(m_condition->waitersDone, mutex.m_mutex,
285  INFINITE, false);
286  }
287  else
288  {
289  // Re-gain ownership of the external mutex
290  ::WaitForSingleObject(mutex.m_mutex, INFINITE);
291  }
292  mutex.conditionPostWait(state);
293  return cc;
294 }
295 #else
296 #error "port me!"
297 #endif
298 void
301 {
302  if (!lock.isLocked())
303  {
304  BLOCXX_THROW(ConditionLockException, "Lock must be locked");
305  }
306  doWait(*(lock.m_mutex));
307 }
309 bool
310 Condition::timedWait(NonRecursiveMutexLock& lock, UInt32 sTimeout, UInt32 usTimeout)
311 {
312  return timedWait(lock, Timeout::relative(sTimeout + static_cast<float>(usTimeout) / 1000000.0));
313 }
314 
316 bool
318 {
319  if (!lock.isLocked())
320  {
321  BLOCXX_THROW(ConditionLockException, "Lock must be locked");
322  }
323  return doTimedWait(*(lock.m_mutex), timeout);
324 }
325 
326 } // end namespace BLOCXX_NAMESPACE
327 
Taken from RFC 1321.
A TimeoutTimer is used by an algorithm to determine when a timeout has expired.
void conditionPreWait(NonRecursiveMutexLockState &state)
void wait(NonRecursiveMutexLock &lock)
Atomically unlock a given mutex and wait for this Condition object to get signalled.
Definition: Condition.cpp:300
~Condition()
Destroy this Condition object.
void notifyAll()
Signal all threads that are currently waiting on the Condition object.
#define BLOCXX_DEFINE_EXCEPTION_WITH_ID(NAME)
Define a new exception class named <NAME>Exception that derives from Exception.
Definition: Exception.hpp:449
void conditionPostWait(NonRecursiveMutexLockState &state)
bool doTimedWait(NonRecursiveMutex &mutex, const Timeout &timeout)
BLOCXX_COMMON_API void testCancel()
Test if this thread has been cancelled.
static Timeout relative(float seconds)
Definition: Timeout.cpp:58
void doWait(NonRecursiveMutex &mutex)
A timeout can be absolute, which means that it will happen at the specified DateTime.
Definition: Timeout.hpp:55
bool timedWait(NonRecursiveMutexLock &lock, const Timeout &timeout)
Atomically unlock a given mutex and wait for a given amount of time for this Condition object to get ...
Definition: Condition.cpp:317
static Timeout infinite
Definition: Timeout.hpp:62
EParserState state
ConditionVar_t m_condition
Definition: Condition.hpp:150
Condition()
Construct a new Condition object.
#define BLOCXX_THROW(exType, msg)
Throw an exception using FILE and LINE.
Definition: Exception.hpp:263
static DateTime getCurrent()
Gets a DateTime instance set to the current system time.
Definition: DateTime.cpp:1329
Note that descriptions of what exceptions may be thrown assumes that object is used correctly...
Note that descriptions of what exceptions may be thrown assumes that object is used correctly...
#define ETIMEDOUT
Definition: SocketUtils.hpp:47
void notifyOne()
Signal one thread that is currently waiting on the Condition object through the wait or timedWait met...