39 #include "blocxx/BLOCXX_config.h"
41 #if !defined(BLOCXX_WIN32)
56 #ifdef BLOCXX_HAVE_UNISTD_H
59 #include <sys/socket.h>
60 #include <sys/types.h>
66 #if defined(BLOCXX_DARWIN)
70 #include <sys/utsname.h>
74 namespace BLOCXX_NAMESPACE
85 }
while (rc < 0 && errno == EINTR);
90 BLOCXX_LOG_ERROR(lgr, Format(
"Closing pipe handle %1 failed: %2", fd, lerrno));
95 ::ssize_t upread(
int fd,
void * buf, std::size_t count)
101 rv =
::read(fd, buf, count);
102 }
while (rv < 0 && errno == EINTR);
106 ::ssize_t upwrite(
int fd,
void const * buf, std::size_t count)
110 SignalScope ss(SIGPIPE, SIG_IGN);
115 }
while (rv < 0 && errno == EINTR);
119 int upaccept(
int s,
struct sockaddr * addr,
socklen_t * addrlen)
124 rv = ::accept(s, addr, addrlen);
125 }
while (rv < 0 && errno == EINTR);
130 E_WRITE_PIPE, E_READ_PIPE
135 void setKernelBufferSize(
Descriptor sockfd,
int bufsz, EDirection edir)
142 int optname = (edir == E_WRITE_PIPE ? SO_SNDBUF : SO_RCVBUF);
145 socklen_t getbufsz_len =
sizeof(getbufsz);
148 int errc = ::getsockopt(sockfd, SOL_SOCKET, optname, (
char*)&getbufsz, &getbufsz_len);
150 int errc = ::getsockopt(sockfd, SOL_SOCKET, optname, &getbufsz, &getbufsz_len);
152 if (errc == 0 && getbufsz < bufsz)
155 ::setsockopt(sockfd, SOL_SOCKET, optname, (
char*)&bufsz,
sizeof(bufsz));
157 ::setsockopt(sockfd, SOL_SOCKET, optname, &bufsz,
sizeof(bufsz));
164 int const BUFSZ = 64 * 1024;
165 setKernelBufferSize(sockfd_read, BUFSZ, E_READ_PIPE);
166 setKernelBufferSize(sockfd_write, BUFSZ, E_WRITE_PIPE);
171 #if defined(BLOCXX_DARWIN)
174 bool needDescriptorPassingWorkaround =
true;
177 OnceFlag detectDescriptorPassingBugFlag = BLOCXX_ONCE_INIT;
180 void detectDescriptorPassingBug()
183 needDescriptorPassingWorkaround =
true;
187 struct utsname unamerv;
188 if (::uname(&unamerv) == -1)
190 needDescriptorPassingWorkaround =
true;
193 String release(unamerv.release);
194 PosixRegEx re(
"([^.]*)\\..*");
196 if (releaseCapture.size() < 2)
198 needDescriptorPassingWorkaround =
true;
201 String majorRelease = releaseCapture[1];
204 needDescriptorPassingWorkaround = (majorRelease.toInt32() < 9);
206 catch (StringConversionException& e)
208 needDescriptorPassingWorkaround =
true;
217 #ifdef BLOCXX_NETWARE
223 AcceptThread(
int serversock)
224 : m_serversock(serversock)
229 void acceptConnection();
230 int getConnectFD() {
return m_serverconn; }
237 AcceptThread::acceptConnection()
239 struct sockaddr_in sin;
244 ::setsockopt(m_serversock, IPPROTO_TCP, 1,
245 (
char*) &tmp,
sizeof(
int));
247 val =
sizeof(
struct sockaddr_in);
248 if ((m_serverconn = upaccept(m_serversock, (
struct sockaddr*)&sin, &val))
254 ::setsockopt(m_serverconn, IPPROTO_TCP, 1,
255 (
char *) &tmp,
sizeof(
int));
257 ::setsockopt(m_serverconn, SOL_SOCKET, SO_KEEPALIVE,
258 (
char*) &tmp,
sizeof(
int));
262 runConnClass(
void* arg)
264 AcceptThread* acceptThread = (AcceptThread*)(arg);
265 acceptThread->acceptConnection();
266 ::pthread_exit(NULL);
273 int svrfd, lerrno, connectfd;
275 struct sockaddr_in sin;
277 svrfd = socket( AF_INET, SOCK_STREAM, 0 );
278 sin.sin_family = AF_INET;
279 sin.sin_addr.s_addr = htonl( 0x7f000001 );
281 memset(sin.sin_zero, 0, 8 );
282 if (bind(svrfd, (
struct sockaddr * )&sin,
sizeof(
struct sockaddr_in ) ) == -1)
286 fprintf(stderr,
"CreateSocket(): Failed to bind on socket" );
289 if (listen(svrfd, 1) == -1)
295 val =
sizeof(
struct sockaddr_in);
296 if (getsockname(svrfd, (
struct sockaddr * )&sin, &val ) == -1)
299 fprintf(stderr,
"CreateSocket(): Failed to obtain socket name" );
304 AcceptThread* pat =
new AcceptThread(svrfd);
308 pthread_create(&athread, NULL, runConnClass, pat);
310 int clientfd = socket(AF_INET, SOCK_STREAM, 0);
318 struct sockaddr_in csin;
319 csin.sin_family = AF_INET;
320 csin.sin_addr.s_addr = htonl(0x7f000001);
321 csin.sin_port = sin.sin_port;
322 if (::connect(clientfd, (
struct sockaddr*)&csin,
sizeof(csin)) == -1)
328 #define TCP_NODELAY 1
333 ::setsockopt(clientfd, IPPROTO_TCP, TCP_NODELAY, (
char*)&tmp,
sizeof(
int));
335 ::setsockopt(clientfd, SOL_SOCKET, SO_KEEPALIVE, (
char*)&tmp,
sizeof(
int));
339 ::pthread_join(athread, &threadResult);
342 fds[0] = pat->getConnectFD();
348 #endif // BLOCXX_NETWARE
384 void set_desc_blocking(
385 int d, EBlockingMode & bmflag, EBlockingMode blocking_mode)
388 int fdflags = fcntl(d, F_GETFL, 0);
395 fdflags &= ~O_NONBLOCK;
399 fdflags |= O_NONBLOCK;
401 if (fcntl(d, F_SETFL, fdflags) == -1)
405 bmflag = blocking_mode;
414 for (
size_t i = 0;
i < 2; ++
i)
442 #if defined(BLOCXX_NETWARE)
450 if (::socketpair(AF_UNIX, SOCK_STREAM, 0,
m_fds) == -1)
455 ::shutdown(
m_fds[0], SHUT_WR);
456 ::shutdown(
m_fds[1], SHUT_RD);
475 rc = upclose(
m_fds[0]);
481 rc = upclose(
m_fds[1]);
503 rc = upclose(
m_fds[0]);
518 rc = upclose(
m_fds[1]);
550 rc = upwrite(
m_fds[1], data, dataLen);
591 rc = upread(
m_fds[0], buffer, bufferLen);
653 #if defined(BLOCXX_DARWIN)
654 callOnce(detectDescriptorPassingBugFlag, detectDescriptorPassingBug);
655 if (rc != -1 && needDescriptorPassingWorkaround)
715 #if defined(BLOCXX_DARWIN)
716 callOnce(detectDescriptorPassingBugFlag, detectDescriptorPassingBug);
717 if (needDescriptorPassingWorkaround)
#define BLOCXX_INVALID_HANDLE
virtual int write(const void *data, int dataLen, ErrorAction errorAsException=E_RETURN_ON_ERROR)
Write a specified number of bytes to the device that is exposing the IOIFC interface.
void setTimeouts(const Timeout &timeout)
Sets the read & write timeout values.
virtual void setWriteBlocking(EBlockingMode isBlocking=E_BLOCKING)
EBlockingMode m_blocking[2]
virtual bool isOpen() const
Is the pipe open or closed?
#define BLOCXX_LOG_ERROR(logger, message)
Log message to logger with the Error level.
virtual void setReadBlocking(EBlockingMode isBlocking=E_BLOCKING)
Timeout getWriteTimeout()
Gets the write timeout value.
#define BLOCXX_ASSERT(CON)
BLOCXX_ASSERT works similar to the assert() macro, but instead of calling abort(), it throws an AssertionException.
virtual int close()
Close the pipe.
BLOCXX_COMMON_API int close(const FileHandle &hdl)
Close file handle.
virtual int write(const void *dataOut, int dataOutLen, ErrorAction errorAsException=E_RETURN_ON_ERROR)=0
Write a specified number of bytes to the device that is exposing the IOIFC interface.
virtual EBlockingMode getReadBlocking() const
Get the current blocking mode for reading from pipe.
virtual Select_t getWriteSelectObj() const
Get the write select object.
virtual void passDescriptor(Descriptor h, const UnnamedPipeRef &ackPipe=0, const ProcessRef &targetProcess=0)
Sends a Descriptor to the peer.
LazyGlobal< String, char const *const > GlobalString
void read(std::streambuf &istrm, void *dataIn, size_t dataInLen)
handle_type get() const
Return handle of resource, retaining ownership.
virtual int read(void *dataIn, int dataInLen, ErrorAction errorAsException=E_RETURN_ON_ERROR)=0
Read a specified number of bytes from the device that is exposing the IOIFC interface.
virtual void setBlocking(EBlockingMode outputIsBlocking=E_BLOCKING)
handle_type release()
Relinquish ownership of resource and return its handle.
virtual AutoDescriptor receiveDescriptor(const UnnamedPipeRef &ackPipe)
Gets a Descriptor from the peer.
PosixUnnamedPipe(EOpen doOpen=E_OPEN)
Array< String > StringArray
virtual Descriptor getInputDescriptor() const =0
Get the underlying input descriptor.
static Timeout relative(float seconds)
virtual Descriptor getInputDescriptor() const
Get the underlying input descriptor.
GlobalString COMPONENT_NAME
virtual void open()
Open the pipe.
virtual int closeInputHandle()
virtual EBlockingMode getWriteBlocking() const
Get the current blocking mode for writing from pipe.
virtual Descriptor getOutputDescriptor() const
Get the underlying output descriptor.
PURPOSE: The AutoResource class template is an analog of std::auto_ptr for managing arbitrary resourc...
virtual int closeOutputHandle()
int passDescriptor(Descriptor streamPipe, Descriptor descriptor, ProcId targetProcessId)
Sends a Descriptor to the peer.
#define BLOCXX_GLOBAL_STRING_INIT(str)
virtual Select_t getReadSelectObj() const
Get the read select object.
Timeout getReadTimeout()
Gets the read timeout value.
virtual Descriptor getOutputDescriptor() const =0
Get the underlying output descriptor.
void BLOCXX_COMMON_API callOnce(OnceFlag &flag, FuncT F)
The first time callOnce is called with a given onceFlag argument, it calls func with no argument and ...
class BLOCXX_COMMON_API Logger
virtual int read(void *buffer, int bufferLen, ErrorAction errorAsException=E_RETURN_ON_ERROR)
Read a specified number of bytes from the device that is exposing the IOIFC interface.
#define BLOCXX_THROW(exType, msg)
Throw an exception using FILE and LINE.
int waitForIO(SocketHandle_t fd, int timeOutSecs, SocketFlags::EWaitDirectionFlag waitFlag)
Wait for input or output on a socket.
static void testCancel()
Test if this thread has been cancelled.
void write(std::streambuf &ostrm, void const *dataOut, size_t dataOutLen)
#define BLOCXX_THROW_ERRNO_MSG(exType, msg)
Throw an exception using FILE, LINE, errno and strerror(errno)
virtual ~PosixUnnamedPipe()
AutoDescriptor receiveDescriptor(Descriptor streamPipe)
Gets a Descriptor from the peer.