39 #include "blocxx/BLOCXX_config.h" 41 #if !defined(BLOCXX_WIN32) 56 #ifdef BLOCXX_HAVE_UNISTD_H 59 #include <sys/socket.h> 60 #include <sys/types.h> 66 #if defined(BLOCXX_DARWIN) 70 #include <sys/utsname.h> 85 }
while (rc < 0 && errno == EINTR);
90 BLOCXX_LOG_ERROR(lgr, Format(
"Closing pipe handle %1 failed: %2", fd, lerrno));
95 ::ssize_t upread(
int fd,
void * buf, std::size_t count)
101 rv =
::read(fd, buf, count);
102 }
while (rv < 0 && errno == EINTR);
106 ::ssize_t upwrite(
int fd,
void const * buf, std::size_t count)
110 SignalScope ss(SIGPIPE, SIG_IGN);
115 }
while (rv < 0 && errno == EINTR);
119 int upaccept(
int s,
struct sockaddr * addr,
socklen_t * addrlen)
124 rv = ::accept(s, addr, addrlen);
125 }
while (rv < 0 && errno == EINTR);
130 E_WRITE_PIPE, E_READ_PIPE
142 int optname = (edir == E_WRITE_PIPE ? SO_SNDBUF : SO_RCVBUF);
145 socklen_t getbufsz_len =
sizeof(getbufsz);
148 int errc = ::getsockopt(sockfd, SOL_SOCKET, optname, (
char*)&getbufsz, &getbufsz_len);
150 int errc = ::getsockopt(sockfd, SOL_SOCKET, optname, &getbufsz, &getbufsz_len);
152 if (errc == 0 && getbufsz < bufsz)
155 ::setsockopt(sockfd, SOL_SOCKET, optname, (
char*)&bufsz,
sizeof(bufsz));
157 ::setsockopt(sockfd, SOL_SOCKET, optname, &bufsz,
sizeof(bufsz));
164 int const BUFSZ = 64 * 1024;
165 setKernelBufferSize(sockfd_read, BUFSZ, E_READ_PIPE);
166 setKernelBufferSize(sockfd_write, BUFSZ, E_WRITE_PIPE);
171 #if defined(BLOCXX_DARWIN) 174 bool needDescriptorPassingWorkaround =
true;
177 OnceFlag detectDescriptorPassingBugFlag = BLOCXX_ONCE_INIT;
180 void detectDescriptorPassingBug()
183 needDescriptorPassingWorkaround =
true;
187 struct utsname unamerv;
188 if (::uname(&unamerv) == -1)
190 needDescriptorPassingWorkaround =
true;
193 String release(unamerv.release);
194 PosixRegEx re(
"([^.]*)\\..*");
196 if (releaseCapture.
size() < 2)
198 needDescriptorPassingWorkaround =
true;
201 String majorRelease = releaseCapture[1];
204 needDescriptorPassingWorkaround = (majorRelease.
toInt32() < 9);
208 needDescriptorPassingWorkaround =
true;
217 #ifdef BLOCXX_NETWARE 223 AcceptThread(
int serversock)
224 : m_serversock(serversock)
229 void acceptConnection();
230 int getConnectFD() {
return m_serverconn; }
237 AcceptThread::acceptConnection()
239 struct sockaddr_in sin;
244 ::setsockopt(m_serversock, IPPROTO_TCP, 1,
245 (
char*) &tmp,
sizeof(
int));
247 val =
sizeof(
struct sockaddr_in);
248 if ((m_serverconn = upaccept(m_serversock, (
struct sockaddr*)&sin, &val))
254 ::setsockopt(m_serverconn, IPPROTO_TCP, 1,
255 (
char *) &tmp,
sizeof(
int));
257 ::setsockopt(m_serverconn, SOL_SOCKET, SO_KEEPALIVE,
258 (
char*) &tmp,
sizeof(
int));
262 runConnClass(
void* arg)
264 AcceptThread* acceptThread = (AcceptThread*)(arg);
265 acceptThread->acceptConnection();
266 ::pthread_exit(NULL);
273 int svrfd, lerrno, connectfd;
275 struct sockaddr_in sin;
277 svrfd = socket( AF_INET, SOCK_STREAM, 0 );
278 sin.sin_family = AF_INET;
279 sin.sin_addr.s_addr = htonl( 0x7f000001 );
281 memset(sin.sin_zero, 0, 8 );
282 if (bind(svrfd, (
struct sockaddr * )&sin,
sizeof(
struct sockaddr_in ) ) == -1)
286 fprintf(stderr,
"CreateSocket(): Failed to bind on socket" );
289 if (listen(svrfd, 1) == -1)
295 val =
sizeof(
struct sockaddr_in);
296 if (getsockname(svrfd, (
struct sockaddr * )&sin, &val ) == -1)
299 fprintf(stderr,
"CreateSocket(): Failed to obtain socket name" );
304 AcceptThread* pat =
new AcceptThread(svrfd);
308 pthread_create(&athread, NULL, runConnClass, pat);
310 int clientfd = socket(AF_INET, SOCK_STREAM, 0);
318 struct sockaddr_in csin;
319 csin.sin_family = AF_INET;
320 csin.sin_addr.s_addr = htonl(0x7f000001);
321 csin.sin_port = sin.sin_port;
322 if (::connect(clientfd, (
struct sockaddr*)&csin,
sizeof(csin)) == -1)
328 #define TCP_NODELAY 1 333 ::setsockopt(clientfd, IPPROTO_TCP, TCP_NODELAY, (
char*)&tmp,
sizeof(
int));
335 ::setsockopt(clientfd, SOL_SOCKET, SO_KEEPALIVE, (
char*)&tmp,
sizeof(
int));
339 ::pthread_join(athread, &threadResult);
342 fds[0] = pat->getConnectFD();
348 #endif // BLOCXX_NETWARE 359 setBlocking(E_BLOCKING);
365 m_fds[0] = inputfd.
get();
366 m_fds[1] = outputfd.
get();
368 setBlocking(E_BLOCKING);
369 setDefaultKernelBufsz(m_fds[0], m_fds[1]);
384 void set_desc_blocking(
385 int d, EBlockingMode & bmflag, EBlockingMode blocking_mode)
388 int fdflags = fcntl(d, F_GETFL, 0);
395 fdflags &= ~O_NONBLOCK;
399 fdflags |= O_NONBLOCK;
401 if (fcntl(d, F_SETFL, fdflags) == -1)
405 bmflag = blocking_mode;
414 for (
size_t i = 0;
i < 2; ++
i)
418 set_desc_blocking(m_fds[
i], m_blocking[i], blocking_mode);
426 set_desc_blocking(m_fds[1], m_blocking[1], blocking_mode);
432 set_desc_blocking(m_fds[0], m_blocking[0], blocking_mode);
442 #if defined(BLOCXX_NETWARE) 450 if (::socketpair(AF_UNIX, SOCK_STREAM, 0, m_fds) == -1)
452 m_fds[0] = m_fds[1] = -1;
455 ::shutdown(m_fds[0], SHUT_WR);
456 ::shutdown(m_fds[1], SHUT_RD);
457 setDefaultKernelBufsz(m_fds[0], m_fds[1]);
467 if (m_fds[0] == m_fds[1])
475 rc = upclose(m_fds[0]);
481 rc = upclose(m_fds[1]);
501 if (m_fds[0] != m_fds[1])
503 rc = upclose(m_fds[0]);
516 if (m_fds[0] != m_fds[1])
518 rc = upclose(m_fds[1]);
531 if (m_blocking[1] == E_BLOCKING)
540 if (errorAsException == E_THROW_ON_ERROR)
550 rc = upwrite(m_fds[1], data, dataLen);
552 if (errorAsException == E_THROW_ON_ERROR && rc == -1)
572 if (m_blocking[0] == E_BLOCKING)
581 if (errorAsException == E_THROW_ON_ERROR)
591 rc = upread(m_fds[0], buffer, bufferLen);
599 if (errorAsException == E_THROW_ON_ERROR && rc == -1)
633 if (m_blocking[1] == E_BLOCKING)
653 #if defined(BLOCXX_DARWIN) 654 callOnce(detectDescriptorPassingBugFlag, detectDescriptorPassingBug);
655 if (rc != -1 && needDescriptorPassingWorkaround)
662 rc = ackPipe->
read(&ack,
sizeof(ack), E_RETURN_ON_ERROR);
700 if (m_blocking[0] == E_BLOCKING)
715 #if defined(BLOCXX_DARWIN) 716 callOnce(detectDescriptorPassingBugFlag, detectDescriptorPassingBug);
717 if (needDescriptorPassingWorkaround)
724 ackPipe->
write(&ack,
sizeof(ack), E_THROW_ON_ERROR);
754 return m_blocking[0];
761 return m_blocking[1];
#define BLOCXX_INVALID_HANDLE
virtual Select_t getReadSelectObj() const
Get the read select object.
virtual bool isOpen() const
Is the pipe open or closed?
virtual int write(const void *data, int dataLen, ErrorAction errorAsException=E_RETURN_ON_ERROR)
Write a specified number of bytes to the device that is exposing the IOIFC interface.
virtual void setWriteBlocking(EBlockingMode isBlocking=E_BLOCKING)
#define BLOCXX_LOG_ERROR(logger, message)
Log message to logger with the Error level.
virtual void setReadBlocking(EBlockingMode isBlocking=E_BLOCKING)
#define BLOCXX_ASSERT(CON)
BLOCXX_ASSERT works similar to the assert() macro, but instead of calling abort(), it throws an AssertionException.
virtual int close()
Close the pipe.
BLOCXX_COMMON_API int close(const FileHandle &hdl)
Close file handle.
virtual int write(const void *dataOut, int dataOutLen, ErrorAction errorAsException=E_RETURN_ON_ERROR)=0
Write a specified number of bytes to the device that is exposing the IOIFC interface.
This String class is an abstract data type that represents as NULL terminated string of characters...
virtual void passDescriptor(Descriptor h, const UnnamedPipeRef &ackPipe=0, const ProcessRef &targetProcess=0)
Sends a Descriptor to the peer.
void read(std::streambuf &istrm, void *dataIn, size_t dataInLen)
virtual int read(void *dataIn, int dataInLen, ErrorAction errorAsException=E_RETURN_ON_ERROR)=0
Read a specified number of bytes from the device that is exposing the IOIFC interface.
virtual void setBlocking(EBlockingMode outputIsBlocking=E_BLOCKING)
handle_type release()
Relinquish ownership of resource and return its handle.
virtual Descriptor getOutputDescriptor() const
Get the underlying output descriptor.
virtual AutoDescriptor receiveDescriptor(const UnnamedPipeRef &ackPipe)
Gets a Descriptor from the peer.
PosixUnnamedPipe(EOpen doOpen=E_OPEN)
virtual Descriptor getInputDescriptor() const =0
Get the underlying input descriptor.
static Timeout relative(float seconds)
virtual EBlockingMode getWriteBlocking() const
Get the current blocking mode for writing from pipe.
GlobalString COMPONENT_NAME
virtual void open()
Open the pipe.
virtual int closeInputHandle()
Int32 toInt32(int base=10) const
virtual EBlockingMode getReadBlocking() const
Get the current blocking mode for reading from pipe.
PURPOSE: The AutoResource class template is an analog of std::auto_ptr for managing arbitrary resourc...
virtual int closeOutputHandle()
int passDescriptor(Descriptor streamPipe, Descriptor descriptor, ProcId targetProcessId)
Sends a Descriptor to the peer.
#define BLOCXX_GLOBAL_STRING_INIT(str)
virtual Descriptor getInputDescriptor() const
Get the underlying input descriptor.
virtual Descriptor getOutputDescriptor() const =0
Get the underlying output descriptor.
void BLOCXX_COMMON_API callOnce(OnceFlag &flag, FuncT F)
The first time callOnce is called with a given onceFlag argument, it calls func with no argument and ...
class BLOCXX_COMMON_API Logger
virtual int read(void *buffer, int bufferLen, ErrorAction errorAsException=E_RETURN_ON_ERROR)
Read a specified number of bytes from the device that is exposing the IOIFC interface.
#define BLOCXX_THROW(exType, msg)
Throw an exception using FILE and LINE.
handle_type get() const
Return handle of resource, retaining ownership.
virtual Select_t getWriteSelectObj() const
Get the write select object.
int waitForIO(SocketHandle_t fd, int timeOutSecs, SocketFlags::EWaitDirectionFlag waitFlag)
Wait for input or output on a socket.
This class can be used to store a global variable that is lazily initialized in a thread safe manner...
static void testCancel()
Test if this thread has been cancelled.
void write(std::streambuf &ostrm, void const *dataOut, size_t dataOutLen)
#define BLOCXX_THROW_ERRNO_MSG(exType, msg)
Throw an exception using FILE, LINE, errno and strerror(errno)
virtual ~PosixUnnamedPipe()
AutoDescriptor receiveDescriptor(Descriptor streamPipe)
Gets a Descriptor from the peer.