blocxx
PosixUnnamedPipe.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2 * Copyright (C) 2005, Vintela, Inc. All rights reserved.
3 * Copyright (C) 2006, Novell, Inc. All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of
14 * Vintela, Inc.,
15 * nor Novell, Inc.,
16 * nor the names of its contributors or employees may be used to
17 * endorse or promote products derived from this software without
18 * specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 * POSSIBILITY OF SUCH DAMAGE.
31 *******************************************************************************/
32 
33 
39 #include "blocxx/BLOCXX_config.h"
40 
41 #if !defined(BLOCXX_WIN32)
42 
44 #include "blocxx/AutoPtr.hpp"
45 #include "blocxx/IOException.hpp"
46 #include "blocxx/Format.hpp"
47 #include "blocxx/SocketUtils.hpp"
48 #include "blocxx/Assertion.hpp"
50 #include "blocxx/SignalScope.hpp"
51 #include "blocxx/Logger.hpp"
52 #include "blocxx/GlobalString.hpp"
53 
54 
55 #include "blocxx/Thread.hpp"
56 #ifdef BLOCXX_HAVE_UNISTD_H
57  #include <unistd.h>
58 #endif
59 #include <sys/socket.h>
60 #include <sys/types.h>
61 
62 #include <fcntl.h>
63 #include <errno.h>
64 #include <cstring>
65 
66 #if defined(BLOCXX_DARWIN)
67 // Necessary for detecting the kernel version in order to activate the descriptor passing workaround.
68 #include "blocxx/ThreadOnce.hpp"
69 #include "blocxx/PosixRegEx.hpp"
70 #include <sys/utsname.h>
71 #endif
72 
73 
74 namespace BLOCXX_NAMESPACE
75 {
76 
77 namespace
78 {
79  int upclose(int fd)
80  {
81  int rc;
82  do
83  {
84  rc = ::close(fd);
85  } while (rc < 0 && errno == EINTR);
86  if (rc == -1)
87  {
88  int lerrno = errno;
89  Logger lgr("blocxx");
90  BLOCXX_LOG_ERROR(lgr, Format("Closing pipe handle %1 failed: %2", fd, lerrno));
91  }
92  return rc;
93  }
94 
95  ::ssize_t upread(int fd, void * buf, std::size_t count)
96  {
97  ::ssize_t rv;
98  do
99  {
101  rv = ::read(fd, buf, count);
102  } while (rv < 0 && errno == EINTR);
103  return rv;
104  }
105 
106  ::ssize_t upwrite(int fd, void const * buf, std::size_t count)
107  {
108  ::ssize_t rv;
109  // block SIGPIPE so we don't kill the process if the pipe is closed.
110  SignalScope ss(SIGPIPE, SIG_IGN);
111  do
112  {
114  rv = ::write(fd, buf, count);
115  } while (rv < 0 && errno == EINTR);
116  return rv;
117  }
118 
119  int upaccept(int s, struct sockaddr * addr, socklen_t * addrlen)
120  {
121  int rv;
122  do
123  {
124  rv = ::accept(s, addr, addrlen);
125  } while (rv < 0 && errno == EINTR);
126  return rv;
127  }
128  enum EDirection
129  {
130  E_WRITE_PIPE, E_READ_PIPE
131  };
132 
133  // bufsz MUST be an int, and not some other integral type (address taken)
134  //
135  void setKernelBufferSize(Descriptor sockfd, int bufsz, EDirection edir)
136  {
137  if (sockfd == BLOCXX_INVALID_HANDLE)
138  {
139  return;
140  }
141 
142  int optname = (edir == E_WRITE_PIPE ? SO_SNDBUF : SO_RCVBUF);
143 
144  int getbufsz;
145  socklen_t getbufsz_len = sizeof(getbufsz);
146 
147 #ifdef BLOCXX_NCR
148  int errc = ::getsockopt(sockfd, SOL_SOCKET, optname, (char*)&getbufsz, &getbufsz_len);
149 #else
150  int errc = ::getsockopt(sockfd, SOL_SOCKET, optname, &getbufsz, &getbufsz_len);
151 #endif
152  if (errc == 0 && getbufsz < bufsz)
153  {
154 #ifdef BLOCXX_NCR
155  ::setsockopt(sockfd, SOL_SOCKET, optname, (char*)&bufsz, sizeof(bufsz));
156 #else
157  ::setsockopt(sockfd, SOL_SOCKET, optname, &bufsz, sizeof(bufsz));
158 #endif
159  }
160  }
161 
162  void setDefaultKernelBufsz(Descriptor sockfd_read, Descriptor sockfd_write)
163  {
164  int const BUFSZ = 64 * 1024;
165  setKernelBufferSize(sockfd_read, BUFSZ, E_READ_PIPE);
166  setKernelBufferSize(sockfd_write, BUFSZ, E_WRITE_PIPE);
167  }
168 
169  GlobalString COMPONENT_NAME = BLOCXX_GLOBAL_STRING_INIT("blocxx.PosixUnnamedPipe");
170 
171 #if defined(BLOCXX_DARWIN)
172  // Mac OS X < 10.5 has a kernel bug related to passing descriptors. As a workaround, descriptors are passed synchronously.
173  // This variable determines whether the workaround will be used. It will be set to false by detectDescriptorPassingBug()
174  bool needDescriptorPassingWorkaround = true;
175 
176  // This is the control to ensure that detectDescriptorPassingBug() is only called once.
177  OnceFlag detectDescriptorPassingBugFlag = BLOCXX_ONCE_INIT;
178 
179  // This function can not have logging statements or they will be sent over the pipe before sending the ACK.
180  void detectDescriptorPassingBug()
181  {
182  // until OS X 10.5 is actually released, assume it will be broken (even though Apple said it is fixed)
183  needDescriptorPassingWorkaround = true;
184  return;
185 #if 0
186  // if uname() reports the version as < 9.0.0 then we'll need the workaround.
187  struct utsname unamerv;
188  if (::uname(&unamerv) == -1)
189  {
190  needDescriptorPassingWorkaround = true; // unknown, so just assume it's necessary.
191  return;
192  }
193  String release(unamerv.release);
194  PosixRegEx re("([^.]*)\\..*");
195  StringArray releaseCapture = re.capture(release);
196  if (releaseCapture.size() < 2)
197  {
198  needDescriptorPassingWorkaround = true; // unknown, so just assume it's necessary.
199  return;
200  }
201  String majorRelease = releaseCapture[1];
202  try
203  {
204  needDescriptorPassingWorkaround = (majorRelease.toInt32() < 9);
205  }
206  catch (StringConversionException& e)
207  {
208  needDescriptorPassingWorkaround = true; // unknown, so just assume it's necessary.
209  return;
210  }
211 #endif
212  }
213 #endif
214 
215 }
216 
217 #ifdef BLOCXX_NETWARE
218 namespace
219 {
220 class AcceptThread
221 {
222 public:
223  AcceptThread(int serversock)
224  : m_serversock(serversock)
225  , m_serverconn(-1)
226  {
227  }
228 
229  void acceptConnection();
230  int getConnectFD() { return m_serverconn; }
231 private:
232  int m_serversock;
233  int m_serverconn;
234 };
235 
236 void
237 AcceptThread::acceptConnection()
238 {
239  struct sockaddr_in sin;
240  size_t val;
241  int tmp = 1;
242 
243  tmp = 1;
244  ::setsockopt(m_serversock, IPPROTO_TCP, 1, // #define TCP_NODELAY 1
245  (char*) &tmp, sizeof(int));
246 
247  val = sizeof(struct sockaddr_in);
248  if ((m_serverconn = upaccept(m_serversock, (struct sockaddr*)&sin, &val))
249  == -1)
250  {
251  return;
252  }
253  tmp = 1;
254  ::setsockopt(m_serverconn, IPPROTO_TCP, 1, // #define TCP_NODELAY 1
255  (char *) &tmp, sizeof(int));
256  tmp = 0;
257  ::setsockopt(m_serverconn, SOL_SOCKET, SO_KEEPALIVE,
258  (char*) &tmp, sizeof(int));
259 }
260 
261 void*
262 runConnClass(void* arg)
263 {
264  AcceptThread* acceptThread = (AcceptThread*)(arg);
265  acceptThread->acceptConnection();
266  ::pthread_exit(NULL);
267  return 0;
268 }
269 
270 int
271 _pipe(int *fds)
272 {
273  int svrfd, lerrno, connectfd;
274  size_t val;
275  struct sockaddr_in sin;
276 
277  svrfd = socket( AF_INET, SOCK_STREAM, 0 );
278  sin.sin_family = AF_INET;
279  sin.sin_addr.s_addr = htonl( 0x7f000001 ); // loopback
280  sin.sin_port = 0;
281  memset(sin.sin_zero, 0, 8 );
282  if (bind(svrfd, (struct sockaddr * )&sin, sizeof( struct sockaddr_in ) ) == -1)
283  {
284  int lerrno = errno;
285  upclose(svrfd);
286  fprintf(stderr, "CreateSocket(): Failed to bind on socket" );
287  return -1;
288  }
289  if (listen(svrfd, 1) == -1)
290  {
291  int lerrno = errno;
292  upclose(svrfd);
293  return -1;
294  }
295  val = sizeof(struct sockaddr_in);
296  if (getsockname(svrfd, ( struct sockaddr * )&sin, &val ) == -1)
297  {
298  int lerrno = errno;
299  fprintf(stderr, "CreateSocket(): Failed to obtain socket name" );
300  upclose(svrfd);
301  return -1;
302  }
303 
304  AcceptThread* pat = new AcceptThread(svrfd);
305  pthread_t athread;
306  // Start thread that will accept connection on svrfd.
307  // Once a connection is made the thread will exit.
308  pthread_create(&athread, NULL, runConnClass, pat);
309 
310  int clientfd = socket(AF_INET, SOCK_STREAM, 0);
311  if (clientfd == -1)
312  {
313  delete pat;
314  return -1;
315  }
316 
317  // Connect to server
318  struct sockaddr_in csin;
319  csin.sin_family = AF_INET;
320  csin.sin_addr.s_addr = htonl(0x7f000001); // loopback
321  csin.sin_port = sin.sin_port;
322  if (::connect(clientfd, (struct sockaddr*)&csin, sizeof(csin)) == -1)
323  {
324  delete pat;
325  return -1;
326  }
327 
328 #define TCP_NODELAY 1
329  int tmp = 1;
330  //
331  // Set for Non-blocking writes and disable keepalive
332  //
333  ::setsockopt(clientfd, IPPROTO_TCP, TCP_NODELAY, (char*)&tmp, sizeof(int));
334  tmp = 0;
335  ::setsockopt(clientfd, SOL_SOCKET, SO_KEEPALIVE, (char*)&tmp, sizeof(int));
336 
337  void* threadResult;
338  // Wait for accept thread to terminate
339  ::pthread_join(athread, &threadResult);
340 
341  upclose(svrfd);
342  fds[0] = pat->getConnectFD();
343  fds[1] = clientfd;
344  delete pat;
345  return 0;
346 }
347 }
348 #endif // BLOCXX_NETWARE
349 
352 {
354  if (doOpen)
355  {
356  open();
357  }
358  setTimeouts(Timeout::relative(60 * 10)); // 10 minutes. This helps break deadlocks when using safePopen()
359  setBlocking(E_BLOCKING); // necessary to set the pipes up right.
360 }
361 
364 {
365  m_fds[0] = inputfd.get();
366  m_fds[1] = outputfd.get();
367  setTimeouts(Timeout::relative(60 * 10)); // 10 minutes. This helps break deadlocks when using safePopen()
369  setDefaultKernelBufsz(m_fds[0], m_fds[1]);
370  inputfd.release();
371  outputfd.release();
372 }
373 
376 {
377  close();
378 }
380 namespace
381 {
382  typedef UnnamedPipe::EBlockingMode EBlockingMode;
383 
384  void set_desc_blocking(
385  int d, EBlockingMode & bmflag, EBlockingMode blocking_mode)
386  {
388  int fdflags = fcntl(d, F_GETFL, 0);
389  if (fdflags == -1)
390  {
391  BLOCXX_THROW_ERRNO_MSG(IOException, "Failed to set pipe blocking mode");
392  }
393  if (blocking_mode == UnnamedPipe::E_BLOCKING)
394  {
395  fdflags &= ~O_NONBLOCK;
396  }
397  else
398  {
399  fdflags |= O_NONBLOCK;
400  }
401  if (fcntl(d, F_SETFL, fdflags) == -1)
402  {
403  BLOCXX_THROW_ERRNO_MSG(IOException, "Failed to set pipe blocking mode");
404  }
405  bmflag = blocking_mode;
406  }
407 }
409 void
410 PosixUnnamedPipe::setBlocking(EBlockingMode blocking_mode)
411 {
413 
414  for (size_t i = 0; i < 2; ++i)
415  {
416  if (m_fds[i] != -1)
417  {
418  set_desc_blocking(m_fds[i], m_blocking[i], blocking_mode);
419  }
420  }
421 }
423 void
424 PosixUnnamedPipe::setWriteBlocking(EBlockingMode blocking_mode)
425 {
426  set_desc_blocking(m_fds[1], m_blocking[1], blocking_mode);
427 }
429 void
430 PosixUnnamedPipe::setReadBlocking(EBlockingMode blocking_mode)
431 {
432  set_desc_blocking(m_fds[0], m_blocking[0], blocking_mode);
433 }
435 void
437 {
438  if (m_fds[0] != BLOCXX_INVALID_HANDLE)
439  {
440  close();
441  }
442 #if defined(BLOCXX_NETWARE)
443  if (_pipe(m_fds) == BLOCXX_INVALID_HANDLE)
444  {
446  BLOCXX_THROW_ERRNO_MSG(UnnamedPipeException, "PosixUnamedPipe::open(): soketpair()");
447  }
448 
449 #else
450  if (::socketpair(AF_UNIX, SOCK_STREAM, 0, m_fds) == -1)
451  {
452  m_fds[0] = m_fds[1] = -1;
453  BLOCXX_THROW_ERRNO_MSG(UnnamedPipeException, "PosixUnamedPipe::open(): soketpair()");
454  }
455  ::shutdown(m_fds[0], SHUT_WR);
456  ::shutdown(m_fds[1], SHUT_RD);
457  setDefaultKernelBufsz(m_fds[0], m_fds[1]);
458 #endif
459 }
461 int
463 {
464  int rc = -1;
465 
466  // handle the case where both input and output are the same descriptor. It can't be closed twice.
467  if (m_fds[0] == m_fds[1])
468  {
470  }
471 
472  if (m_fds[0] != BLOCXX_INVALID_HANDLE)
473  {
474 
475  rc = upclose(m_fds[0]);
477  }
478 
479  if (m_fds[1] != BLOCXX_INVALID_HANDLE)
480  {
481  rc = upclose(m_fds[1]);
483  }
484 
485  return rc;
486 }
488 bool
490 {
491  return (m_fds[0] != BLOCXX_INVALID_HANDLE) || (m_fds[1] != BLOCXX_INVALID_HANDLE);
492 }
493 
495 int
497 {
498  int rc = -1;
499  if (m_fds[0] != BLOCXX_INVALID_HANDLE)
500  {
501  if (m_fds[0] != m_fds[1])
502  {
503  rc = upclose(m_fds[0]);
504  }
506  }
507  return rc;
508 }
510 int
512 {
513  int rc = -1;
514  if (m_fds[1] != BLOCXX_INVALID_HANDLE)
515  {
516  if (m_fds[0] != m_fds[1])
517  {
518  rc = upclose(m_fds[1]);
519  }
521  }
522  return rc;
523 }
525 int
526 PosixUnnamedPipe::write(const void* data, int dataLen, ErrorAction errorAsException)
527 {
528  int rc = -1;
529  if (m_fds[1] != BLOCXX_INVALID_HANDLE)
530  {
531  if (m_blocking[1] == E_BLOCKING)
532  {
534  if (rc != 0)
535  {
536  if (rc == ETIMEDOUT)
537  {
538  errno = ETIMEDOUT;
539  }
540  if (errorAsException == E_THROW_ON_ERROR)
541  {
542  BLOCXX_THROW_ERRNO_MSG(IOException, "SocketUtils::waitForIO failed.");
543  }
544  else
545  {
546  return -1;
547  }
548  }
549  }
550  rc = upwrite(m_fds[1], data, dataLen);
551  }
552  if (errorAsException == E_THROW_ON_ERROR && rc == -1)
553  {
554  if (m_fds[1] == BLOCXX_INVALID_HANDLE)
555  {
556  BLOCXX_THROW(IOException, "pipe write failed because pipe is closed");
557  }
558  else
559  {
560  BLOCXX_THROW_ERRNO_MSG(IOException, "pipe write failed");
561  }
562  }
563  return rc;
564 }
566 int
567 PosixUnnamedPipe::read(void* buffer, int bufferLen, ErrorAction errorAsException)
568 {
569  int rc = -1;
570  if (m_fds[0] != BLOCXX_INVALID_HANDLE)
571  {
572  if (m_blocking[0] == E_BLOCKING)
573  {
575  if (rc != 0)
576  {
577  if (rc == ETIMEDOUT)
578  {
579  errno = ETIMEDOUT;
580  }
581  if (errorAsException == E_THROW_ON_ERROR)
582  {
583  BLOCXX_THROW_ERRNO_MSG(IOException, "SocketUtils::waitForIO failed.");
584  }
585  else
586  {
587  return -1;
588  }
589  }
590  }
591  rc = upread(m_fds[0], buffer, bufferLen);
592  }
593 
594  if (rc == 0)
595  {
597  }
598 
599  if (errorAsException == E_THROW_ON_ERROR && rc == -1)
600  {
601  if (m_fds[0] == BLOCXX_INVALID_HANDLE)
602  {
603  BLOCXX_THROW(IOException, "pipe read failed because pipe is closed");
604  }
605  else
606  {
607  BLOCXX_THROW_ERRNO_MSG(IOException, "pipe read failed");
608  }
609  }
610  return rc;
611 }
613 Select_t
615 {
616  return m_fds[0];
617 }
618 
620 Select_t
622 {
623  return m_fds[1];
624 }
625 
627 void
628 PosixUnnamedPipe::passDescriptor(Descriptor descriptor, const UnnamedPipeRef& ackPipe, const ProcessRef& targetProcess)
629 {
630  int rc = -1;
631  if (m_fds[1] != BLOCXX_INVALID_HANDLE)
632  {
633  if (m_blocking[1] == E_BLOCKING)
634  {
636 
637  if (rc != 0)
638  {
639  if (rc == ETIMEDOUT)
640  {
641  errno = ETIMEDOUT;
642  }
643  BLOCXX_THROW_ERRNO_MSG(IOException, "SocketUtils::waitForIO failed.");
644  }
645  }
646 
647  rc = blocxx::passDescriptor(m_fds[1], descriptor);
648  if (rc == -1)
649  {
650  BLOCXX_THROW_ERRNO_MSG(IOException, "sendDescriptor() failed: passDescriptor()");
651  }
652 
653 #if defined(BLOCXX_DARWIN)
654  callOnce(detectDescriptorPassingBugFlag, detectDescriptorPassingBug);
655  if (rc != -1 && needDescriptorPassingWorkaround)
656  {
657  // This ignores the blocking and timeouts, because this ACK shouldn't timeout.
659  if (rc != -1)
660  {
661  char ack = 'Z';
662  rc = ackPipe->read(&ack, sizeof(ack), E_RETURN_ON_ERROR);
663  if (rc == -1)
664  {
665  BLOCXX_THROW_ERRNO_MSG(IOException, "sendDescriptor() failed: ackPipe->read()");
666  }
667  if (ack != 'A')
668  {
669  BLOCXX_THROW(IOException, Format("sendDescriptor() failed: ackPipe->read() didn't get 'A', got %1", static_cast<int>(ack)).c_str());
670  }
671  }
672  else
673  {
674  BLOCXX_THROW_ERRNO_MSG(IOException, "sendDescriptor() failed: waitForIO()");
675  }
676  }
677 #endif
678  }
679  if (rc == -1)
680  {
681  if (m_fds[1] == BLOCXX_INVALID_HANDLE)
682  {
683  BLOCXX_THROW(IOException, "sendDescriptor() failed because pipe is closed");
684  }
685  else
686  {
687  BLOCXX_THROW_ERRNO_MSG(IOException, "sendDescriptor() failed");
688  }
689  }
690 }
691 
695 {
696  int rc = -1;
697  AutoDescriptor descriptor;
698  if (m_fds[0] != BLOCXX_INVALID_HANDLE)
699  {
700  if (m_blocking[0] == E_BLOCKING)
701  {
703 
704  if (rc != 0)
705  {
706  if (rc == ETIMEDOUT)
707  {
708  errno = ETIMEDOUT;
709  }
710  BLOCXX_THROW_ERRNO_MSG(IOException, "SocketUtils::waitForIO failed.");
711  }
712  }
713  descriptor = blocxx::receiveDescriptor(m_fds[0]);
714 
715 #if defined(BLOCXX_DARWIN)
716  callOnce(detectDescriptorPassingBugFlag, detectDescriptorPassingBug);
717  if (needDescriptorPassingWorkaround)
718  {
719  // This ignores the blocking and timeouts, because this ACK shouldn't timeout.
721  if (rc != -1)
722  {
723  char ack = 'A';
724  ackPipe->write(&ack, sizeof(ack), E_THROW_ON_ERROR);
725  }
726  }
727 #endif
728  }
729  else
730  {
731  BLOCXX_THROW(IOException, "receiveDescriptor() failed because pipe is closed");
732  }
733  return descriptor;
734 }
735 
739 {
740  return m_fds[0];
741 }
742 
746 {
747  return m_fds[1];
748 }
749 
751 EBlockingMode
753 {
754  return m_blocking[0];
755 }
756 
758 EBlockingMode
760 {
761  return m_blocking[1];
762 }
763 
764 } // end namespace BLOCXX_NAMESPACE
765 
766 #endif
#define BLOCXX_INVALID_HANDLE
Definition: Types.hpp:136
virtual int write(const void *data, int dataLen, ErrorAction errorAsException=E_RETURN_ON_ERROR)
Write a specified number of bytes to the device that is exposing the IOIFC interface.
void setTimeouts(const Timeout &timeout)
Sets the read & write timeout values.
virtual void setWriteBlocking(EBlockingMode isBlocking=E_BLOCKING)
virtual bool isOpen() const
Is the pipe open or closed?
#define BLOCXX_LOG_ERROR(logger, message)
Log message to logger with the Error level.
Definition: Logger.hpp:433
virtual void setReadBlocking(EBlockingMode isBlocking=E_BLOCKING)
Timeout getWriteTimeout()
Gets the write timeout value.
#define BLOCXX_ASSERT(CON)
BLOCXX_ASSERT works similar to the assert() macro, but instead of calling abort(), it throws an AssertionException.
Definition: Assertion.hpp:57
virtual int close()
Close the pipe.
BLOCXX_COMMON_API int close(const FileHandle &hdl)
Close file handle.
virtual int write(const void *dataOut, int dataOutLen, ErrorAction errorAsException=E_RETURN_ON_ERROR)=0
Write a specified number of bytes to the device that is exposing the IOIFC interface.
virtual EBlockingMode getReadBlocking() const
Get the current blocking mode for reading from pipe.
virtual Select_t getWriteSelectObj() const
Get the write select object.
virtual void passDescriptor(Descriptor h, const UnnamedPipeRef &ackPipe=0, const ProcessRef &targetProcess=0)
Sends a Descriptor to the peer.
LazyGlobal< String, char const *const > GlobalString
void read(std::streambuf &istrm, void *dataIn, size_t dataInLen)
handle_type get() const
Return handle of resource, retaining ownership.
virtual int read(void *dataIn, int dataInLen, ErrorAction errorAsException=E_RETURN_ON_ERROR)=0
Read a specified number of bytes from the device that is exposing the IOIFC interface.
virtual void setBlocking(EBlockingMode outputIsBlocking=E_BLOCKING)
handle_type release()
Relinquish ownership of resource and return its handle.
virtual AutoDescriptor receiveDescriptor(const UnnamedPipeRef &ackPipe)
Gets a Descriptor from the peer.
Array< String > StringArray
Definition: CommonFwd.hpp:72
virtual Descriptor getInputDescriptor() const =0
Get the underlying input descriptor.
static Timeout relative(float seconds)
Definition: Timeout.cpp:58
virtual Descriptor getInputDescriptor() const
Get the underlying input descriptor.
virtual void open()
Open the pipe.
virtual EBlockingMode getWriteBlocking() const
Get the current blocking mode for writing from pipe.
virtual Descriptor getOutputDescriptor() const
Get the underlying output descriptor.
PURPOSE: The AutoResource class template is an analog of std::auto_ptr for managing arbitrary resourc...
int passDescriptor(Descriptor streamPipe, Descriptor descriptor, ProcId targetProcessId)
Sends a Descriptor to the peer.
unsigned socklen_t
#define BLOCXX_GLOBAL_STRING_INIT(str)
static Timeout infinite
Definition: Timeout.hpp:62
virtual Select_t getReadSelectObj() const
Get the read select object.
Timeout getReadTimeout()
Gets the read timeout value.
virtual Descriptor getOutputDescriptor() const =0
Get the underlying output descriptor.
void BLOCXX_COMMON_API callOnce(OnceFlag &flag, FuncT F)
The first time callOnce is called with a given onceFlag argument, it calls func with no argument and ...
class BLOCXX_COMMON_API Logger
Definition: CommonFwd.hpp:63
virtual int read(void *buffer, int bufferLen, ErrorAction errorAsException=E_RETURN_ON_ERROR)
Read a specified number of bytes from the device that is exposing the IOIFC interface.
#define BLOCXX_THROW(exType, msg)
Throw an exception using FILE and LINE.
Definition: Exception.hpp:263
int waitForIO(SocketHandle_t fd, int timeOutSecs, SocketFlags::EWaitDirectionFlag waitFlag)
Wait for input or output on a socket.
static void testCancel()
Test if this thread has been cancelled.
Definition: Thread.cpp:432
void write(std::streambuf &ostrm, void const *dataOut, size_t dataOutLen)
#define BLOCXX_THROW_ERRNO_MSG(exType, msg)
Throw an exception using FILE, LINE, errno and strerror(errno)
Definition: Exception.hpp:312
#define ETIMEDOUT
Definition: SocketUtils.hpp:47
AutoDescriptor receiveDescriptor(Descriptor streamPipe)
Gets a Descriptor from the peer.