blocxx
Select.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2 * Copyright (C) 2005, Vintela, Inc. All rights reserved.
3 * Copyright (C) 2006, Novell, Inc. All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of
14 * Vintela, Inc.,
15 * nor Novell, Inc.,
16 * nor the names of its contributors or employees may be used to
17 * endorse or promote products derived from this software without
18 * specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 * POSSIBILITY OF SUCH DAMAGE.
31 *******************************************************************************/
32 
33 
39 #include "blocxx/BLOCXX_config.h"
40 #include "blocxx/Select.hpp"
41 #include "blocxx/AutoPtr.hpp"
42 #include "blocxx/Assertion.hpp"
43 #include "blocxx/Thread.hpp" // for testCancel()
44 #include "blocxx/TimeoutTimer.hpp"
46 
47 #if defined(BLOCXX_WIN32)
48 #include <cassert>
49 #endif
50 
51 extern "C"
52 {
53 
54 #ifndef BLOCXX_WIN32
55  #ifdef BLOCXX_HAVE_SYS_EPOLL_H
56  #include <sys/epoll.h>
57  #endif
58  #if defined (BLOCXX_HAVE_SYS_POLL_H)
59  #include <sys/poll.h>
60  #endif
61  #if defined (BLOCXX_HAVE_SYS_SELECT_H)
62  #include <sys/select.h>
63  #endif
64 #endif
65 
66 #ifdef BLOCXX_HAVE_SYS_TIME_H
67  #include <sys/time.h>
68 #endif
69 
70 #include <sys/types.h>
71 
72 #ifdef BLOCXX_HAVE_UNISTD_H
73  #include <unistd.h>
74 #endif
75 
76 #include <errno.h>
77 }
78 
79 namespace BLOCXX_NAMESPACE
80 {
81 
82 namespace Select
83 {
84 
namespace
{
    // Value handed to the TimeoutTimer as the "maxWaitSec" cap for each
    // individual epoll_wait()/poll()/select() call in the loops below.
    const float LOOP_TIMEOUT = 10.0f;
} // end anonymous namespace
90 // deprecated in 4.0.0
91 int
92 selectRW(SelectObjectArray& selarray, UInt32 ms)
93 {
94  return selectRW(selarray, Timeout::relative(static_cast<float>(ms) * 1000));
95 }
96 
97 #if defined(BLOCXX_WIN32)
98 int
100 selectRW(SelectObjectArray& selarray, const Timeout& timeout)
101 {
102  int rc;
103  size_t hcount = static_cast<DWORD>(selarray.size());
104  AutoPtrVec<HANDLE> hdls(new HANDLE[hcount]);
105 
106  size_t handleidx = 0;
107  for (size_t i = 0; i < selarray.size(); i++, handleidx++)
108  {
109  if(selarray[i].s.isSocket && selarray[i].s.networkevents)
110  {
111  ::WSAEventSelect(selarray[i].s.sockfd,
112  selarray[i].s.event, selarray[i].s.networkevents);
113  }
114 
115  hdls[handleidx] = selarray[i].s.event;
116  }
117 
118  TimeoutTimer timer(timeout);
119  timer.start();
120  DWORD cc = ::WaitForMultipleObjects(hcount, hdls.get(), FALSE, timer.asDWORDMs());
121 
122  assert(cc != WAIT_ABANDONED);
123 
124  switch (cc)
125  {
126  case WAIT_FAILED:
128  break;
129  case WAIT_TIMEOUT:
131  break;
132  default:
133  rc = cc - WAIT_OBJECT_0;
134 
135  // If this is a socket, set it back to
136  // blocking mode
137  if(selarray[rc].s.isSocket)
138  {
139  if(selarray[rc].s.networkevents
140  && selarray[rc].s.doreset == false)
141  {
142  ::WSAEventSelect(selarray[rc].s.sockfd,
143  selarray[rc].s.event, selarray[rc].s.networkevents);
144  }
145  else
146  {
147  // Set socket back to blocking
148  ::WSAEventSelect(selarray[rc].s.sockfd,
149  selarray[rc].s.event, 0);
150  u_long ioctlarg = 0;
151  ::ioctlsocket(selarray[rc].s.sockfd, FIONBIO, &ioctlarg);
152  }
153  }
154  break;
155  }
156 
157  if( rc < 0 )
158  return rc;
159 
160  int availableCount = 0;
161  for (size_t i = 0; i < selarray.size(); i++)
162  {
163  if( WaitForSingleObject(selarray[i].s.event, 0) == WAIT_OBJECT_0 )
164  {
165  if( selarray[i].waitForRead )
166  selarray[i].readAvailable = true;
167  if( selarray[i].waitForWrite )
168  selarray[i].writeAvailable = true;
169  ++availableCount;
170  }
171  else
172  {
173  selarray[i].readAvailable = false;
174  selarray[i].writeAvailable = false;
175  }
176  }
177  return availableCount;
178 }
179 
180 
181 #else
182 
184 // epoll version
185 int
186 selectRWEpoll(SelectObjectArray& selarray, const Timeout& timeout)
187 {
188 #ifdef BLOCXX_HAVE_SYS_EPOLL_H
189  int ecc = 0;
190  AutoPtrVec<epoll_event> events(new epoll_event[selarray.size()]);
191  AutoDescriptor epfd(epoll_create(selarray.size()));
192  if(epfd.get() == -1)
193  {
194  if (errno == ENOSYS) // kernel doesn't support it
195  {
196  return SELECT_NOT_IMPLEMENTED;
197  }
198  // Need to return something else?
199  return Select::SELECT_ERROR;
200  }
201 
202  UInt32 const read_events = EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP;
203  UInt32 const write_events = EPOLLOUT | EPOLLERR | EPOLLHUP;
204  for (size_t i = 0; i < selarray.size(); i++)
205  {
206  BLOCXX_ASSERT(selarray[i].s >= 0);
207  selarray[i].readAvailable = false;
208  selarray[i].writeAvailable = false;
209  selarray[i].wasError = false;
210  events[i].data = epoll_data_t(); // zero-init to make valgrind happy
211  events[i].data.u32 = i;
212  events[i].events = 0;
213  if(selarray[i].waitForRead)
214  {
215  events[i].events |= read_events;
216  }
217  if(selarray[i].waitForWrite)
218  {
219  events[i].events |= write_events;
220  }
221 
222  if(epoll_ctl(epfd.get(), EPOLL_CTL_ADD, selarray[i].s, &events[i]) != 0)
223  {
224  return errno == EPERM ? SELECT_NOT_IMPLEMENTED : SELECT_ERROR;
225  }
226  }
227 
228  // here we spin checking for thread cancellation every so often.
229 
230  TimeoutTimer timer(timeout);
231  timer.start();
232  int savedErrno;
233  do
234  {
236  const float maxWaitSec = LOOP_TIMEOUT;
237  ecc = epoll_wait(epfd.get(), events.get(), selarray.size(), timer.asIntMs(maxWaitSec));
238  savedErrno = errno;
239  if (ecc < 0 && errno == EINTR)
240  {
241  ecc = 0;
242  errno = 0;
244  }
245  timer.loop();
246  } while ((ecc == 0) && !timer.expired());
247 
248  if (ecc < 0)
249  {
250  errno = savedErrno;
251  return Select::SELECT_ERROR;
252  }
253  if (ecc == 0)
254  {
255  return Select::SELECT_TIMEOUT;
256  }
257 
258  for(int i = 0; i < ecc; i++)
259  {
260  SelectObject & so = selarray[events[i].data.u32];
261  so.readAvailable = so.waitForRead && (events[i].events & read_events);
262  so.writeAvailable = so.waitForWrite && (events[i].events & write_events);
263  }
264 
265  return ecc;
266 #else
267  return SELECT_NOT_IMPLEMENTED;
268 #endif
269 }
270 
272 // poll() version
273 int
274 selectRWPoll(SelectObjectArray& selarray, const Timeout& timeout)
275 {
276 #if defined (BLOCXX_HAVE_SYS_POLL_H)
277  int rc = 0;
278 
279  AutoPtrVec<pollfd> pfds(new pollfd[selarray.size()]);
280 
281  // here we spin checking for thread cancellation every so often.
282  TimeoutTimer timer(timeout);
283  timer.start();
284 
285  int savedErrno;
286  do
287  {
288  for (size_t i = 0; i < selarray.size(); i++)
289  {
290  BLOCXX_ASSERT(selarray[i].s >= 0);
291  selarray[i].readAvailable = false;
292  selarray[i].writeAvailable = false;
293  selarray[i].wasError = false;
294  pfds[i].revents = 0;
295  pfds[i].fd = selarray[i].s;
296  pfds[i].events = selarray[i].waitForRead ? (POLLIN | POLLPRI) : 0;
297  if(selarray[i].waitForWrite)
298  pfds[i].events |= POLLOUT;
299  }
300 
302  const float maxWaitSec = LOOP_TIMEOUT;
303  rc = ::poll(pfds.get(), selarray.size(), timer.asIntMs(maxWaitSec));
304  savedErrno = errno;
305  if (rc < 0 && errno == EINTR)
306  {
307  rc = 0;
308  errno = 0;
310 #ifdef BLOCXX_NETWARE
311  // When the NetWare server is shutting down, select will
312  // set errno to EINTR on return. If this thread does not
313  // yield control (cooperative multitasking) then we end
314  // up in a very tight loop and get a CPUHog server abbend.
315  pthread_yield();
316 #endif
317  }
318 
319  timer.loop();
320  } while ((rc == 0) && !timer.expired());
321 
322  if (rc < 0)
323  {
324  errno = savedErrno;
325  return Select::SELECT_ERROR;
326  }
327  if (rc == 0)
328  {
329  return Select::SELECT_TIMEOUT;
330  }
331  for (size_t i = 0; i < selarray.size(); i++)
332  {
333  if (pfds[i].revents & (POLLERR | POLLNVAL))
334  {
335  selarray[i].wasError = true;
336  }
337 
338  if(selarray[i].waitForRead)
339  {
340  selarray[i].readAvailable = (pfds[i].revents &
341  (POLLIN | POLLPRI | POLLHUP));
342  }
343 
344  if(selarray[i].waitForWrite)
345  {
346  selarray[i].writeAvailable = (pfds[i].revents &
347  (POLLOUT | POLLHUP));
348  }
349  }
350 
351  return rc;
352 #else
353  return SELECT_NOT_IMPLEMENTED;
354 #endif
355 }
357 // ::select() version
358 int
359 selectRWSelect(SelectObjectArray& selarray, const Timeout& timeout)
360 {
361 #if defined (BLOCXX_HAVE_SYS_SELECT_H)
362  int rc = 0;
363  fd_set ifds;
364  fd_set ofds;
365 
366  // here we spin checking for thread cancellation every so often.
367  TimeoutTimer timer(timeout);
368  timer.start();
369 
370  int savedErrno;
371  do
372  {
373  int maxfd = 0;
374  FD_ZERO(&ifds);
375  FD_ZERO(&ofds);
376  for (size_t i = 0; i < selarray.size(); ++i)
377  {
378  int fd = selarray[i].s;
379  BLOCXX_ASSERT(fd >= 0);
380  if (maxfd < fd)
381  {
382  maxfd = fd;
383  }
384  if (fd < 0 || fd >= FD_SETSIZE)
385  {
386  errno = EINVAL;
387  return Select::SELECT_ERROR;
388  }
389  if (selarray[i].waitForRead)
390  {
391  FD_SET(fd, &ifds);
392  }
393  if (selarray[i].waitForWrite)
394  {
395  FD_SET(fd, &ofds);
396  }
397  }
398 
400  struct timeval tv;
401  const float maxWaitSec = LOOP_TIMEOUT;
402  rc = ::select(maxfd+1, &ifds, &ofds, NULL, timer.asTimeval(tv, maxWaitSec));
403  savedErrno = errno;
404  if (rc < 0 && errno == EINTR)
405  {
406  rc = 0;
407  errno = 0;
409 #ifdef BLOCXX_NETWARE
410  // When the NetWare server is shutting down, select will
411  // set errno to EINTR on return. If this thread does not
412  // yield control (cooperative multitasking) then we end
413  // up in a very tight loop and get a CPUHog server abbend.
414  pthread_yield();
415 #endif
416  }
417 
418  timer.loop();
419  } while ((rc == 0) && !timer.expired());
420 
421  if (rc < 0)
422  {
423  errno = savedErrno;
424  return Select::SELECT_ERROR;
425  }
426  if (rc == 0)
427  {
428  return Select::SELECT_TIMEOUT;
429  }
430  int availableCount = 0;
431  int cval;
432  for (size_t i = 0; i < selarray.size(); i++)
433  {
434  selarray[i].wasError = false;
435  cval = 0;
436  if (FD_ISSET(selarray[i].s, &ifds))
437  {
438  selarray[i].readAvailable = true;
439  cval = 1;
440  }
441  else
442  {
443  selarray[i].readAvailable = false;
444  }
445 
446  if (FD_ISSET(selarray[i].s, &ofds))
447  {
448  selarray[i].writeAvailable = true;
449  cval = 1;
450  }
451  else
452  {
453  selarray[i].writeAvailable = false;
454  }
455 
456  availableCount += cval;
457 
458  }
459 
460  return availableCount;
461 #else
462  return SELECT_NOT_IMPLEMENTED;
463 #endif
464 }
465 
466 int
467 selectRW(SelectObjectArray& selarray, const Timeout& timeout)
468 {
469  int rv = selectRWEpoll(selarray, timeout);
470  if (rv != SELECT_NOT_IMPLEMENTED)
471  {
472  return rv;
473  }
474 
475  rv = selectRWPoll(selarray, timeout);
476  if (rv != SELECT_NOT_IMPLEMENTED)
477  {
478  return rv;
479  }
480 
481  rv = selectRWSelect(selarray, timeout);
483  return rv;
484 }
485 
487 #endif // #else BLOCXX_WIN32
488 
489 int
490 select(const SelectTypeArray& selarray, UInt32 ms)
491 {
492  return select(selarray, Timeout::relative(static_cast<float>(ms) * 1000.0));
493 }
494 
496 int
497 select(const SelectTypeArray& selarray, const Timeout& timeout)
498 {
499  SelectObjectArray soa;
500  soa.reserve(selarray.size());
501  for (size_t i = 0; i < selarray.size(); ++i)
502  {
503  SelectObject curObj(selarray[i]);
504  curObj.waitForRead = true;
505  soa.push_back(curObj);
506  }
507  int rv = selectRW(soa, timeout);
508  if (rv < 0)
509  {
510  return rv;
511  }
512 
513  // find the first selected object
514  for (size_t i = 0; i < soa.size(); ++i)
515  {
516  if (soa[i].readAvailable)
517  {
518  return i;
519  }
520  }
521  errno = 0;
522  return SELECT_ERROR;
523 }
524 
525 } // end namespace Select
526 
527 } // end namespace BLOCXX_NAMESPACE
528 
Array<> wraps std::vector<> in COWReference<> adding ref counting and copy on write capability...
Definition: Array.hpp:65
Taken from RFC 1321.
Array< SelectObject > SelectObjectArray
Definition: Select.hpp:113
A TimeoutTimer is used by an algorithm to determine when a timeout has expired.
int selectRWPoll(SelectObjectArray &selarray, const Timeout &timeout)
Definition: Select.cpp:274
void loop()
Meant to be called by timeout functions which loop, but don't want to reset the interval.
bool waitForWrite
Input parameter. Set it to true to indicate that waiting for write availability on s is desired...
Definition: Select.hpp:105
#define BLOCXX_ASSERT(CON)
BLOCXX_ASSERT works similar to the assert() macro, but instead of calling abort(), it throws an AssertionException.
Definition: Assertion.hpp:57
int select(const SelectTypeArray &selarray, UInt32 ms)
Select returns as soon as input is available on any of Select_t objects that are in given array...
Definition: Select.cpp:490
bool expired() const
Indicates whether the last loop time has exceeded the timeout.
void reserve(size_type n)
Ensure the capacity is at least the size of a given value.
Definition: ArrayImpl.hpp:216
static Timeout relative(float seconds)
Definition: Timeout.cpp:58
const int SELECT_ERROR
The value returned from select when any error occurs other than timeout.
Definition: Select.hpp:63
int selectRWSelect(SelectObjectArray &selarray, const Timeout &timeout)
Definition: Select.cpp:359
const int SELECT_TIMEOUT
The value returned from select when the timeout value has expired.
Definition: Select.hpp:59
A timeout can be absolute, which means that it will happen at the specified DateTime.
Definition: Timeout.hpp:55
PURPOSE: The AutoResource class template is an analog of std::auto_ptr for managing arbitrary resourc...
bool writeAvailable
Output parameter. Will be set to true to indicate that s has become available for writing.
Definition: Select.hpp:109
void start()
Meant to be called by timeout functions which loop.
bool waitForRead
Input parameter. Set it to true to indicate that waiting for read availability on s is desired...
Definition: Select.hpp:103
bool readAvailable
Output parameter. Will be set to true to indicate that s has become available for reading.
Definition: Select.hpp:107
void push_back(const T &x)
Append an element to the end of the Array.
Definition: ArrayImpl.hpp:251
The AutoPtrVec class provides a simple class for smart pointers to a dynamically allocated array of o...
Definition: AutoPtr.hpp:184
int selectRW(SelectObjectArray &selarray, UInt32 ms)
Definition: Select.cpp:92
static void testCancel()
Test if this thread has been cancelled.
Definition: Thread.cpp:432
int selectRWEpoll(SelectObjectArray &selarray, const Timeout &timeout)
Definition: Select.cpp:186
size_type size() const
Definition: ArrayImpl.hpp:160
const int SELECT_NOT_IMPLEMENTED
Used internally, but listed here to prevent conflicts.
Definition: Select.hpp:67