39 #include "blocxx/BLOCXX_config.h" 41 #if !defined(BLOCXX_WIN32) 56 #ifdef BLOCXX_HAVE_UNISTD_H 59 #include <sys/socket.h> 60 #include <sys/types.h> 66 #if defined(BLOCXX_DARWIN) 70 #include <sys/utsname.h> 85 }
while (rc < 0 && errno == EINTR);
90 BLOCXX_LOG_ERROR(lgr, Format(
"Closing pipe handle %1 failed: %2", fd, lerrno));
95 ::ssize_t upread(
int fd,
void * buf, std::size_t count)
101 rv =
::read(fd, buf, count);
102 }
while (rv < 0 && errno == EINTR);
106 ::ssize_t upwrite(
int fd,
void const * buf, std::size_t count)
110 SignalScope ss(SIGPIPE, SIG_IGN);
115 }
while (rv < 0 && errno == EINTR);
119 int upaccept(
int s,
struct sockaddr * addr,
socklen_t * addrlen)
124 rv = ::accept(s, addr, addrlen);
125 }
while (rv < 0 && errno == EINTR);
130 E_WRITE_PIPE, E_READ_PIPE
135 void setKernelBufferSize(
Descriptor sockfd,
int bufsz, EDirection edir)
142 int optname = (edir == E_WRITE_PIPE ? SO_SNDBUF : SO_RCVBUF);
145 socklen_t getbufsz_len =
sizeof(getbufsz);
148 int errc = ::getsockopt(sockfd, SOL_SOCKET, optname, (
char*)&getbufsz, &getbufsz_len);
150 int errc = ::getsockopt(sockfd, SOL_SOCKET, optname, &getbufsz, &getbufsz_len);
152 if (errc == 0 && getbufsz < bufsz)
155 ::setsockopt(sockfd, SOL_SOCKET, optname, (
char*)&bufsz,
sizeof(bufsz));
157 ::setsockopt(sockfd, SOL_SOCKET, optname, &bufsz,
sizeof(bufsz));
164 int const BUFSZ = 64 * 1024;
165 setKernelBufferSize(sockfd_read,
BUFSZ, E_READ_PIPE);
166 setKernelBufferSize(sockfd_write,
BUFSZ, E_WRITE_PIPE);
171 #if defined(BLOCXX_DARWIN) 174 bool needDescriptorPassingWorkaround =
true;
177 OnceFlag detectDescriptorPassingBugFlag = BLOCXX_ONCE_INIT;
180 void detectDescriptorPassingBug()
183 needDescriptorPassingWorkaround =
true;
187 struct utsname unamerv;
188 if (::uname(&unamerv) == -1)
190 needDescriptorPassingWorkaround =
true;
193 String release(unamerv.release);
194 PosixRegEx re(
"([^.]*)\\..*");
196 if (releaseCapture.size() < 2)
198 needDescriptorPassingWorkaround =
true;
201 String majorRelease = releaseCapture[1];
204 needDescriptorPassingWorkaround = (majorRelease.toInt32() < 9);
206 catch (StringConversionException& e)
208 needDescriptorPassingWorkaround =
true;
217 #ifdef BLOCXX_NETWARE 223 AcceptThread(
int serversock)
224 : m_serversock(serversock)
229 void acceptConnection();
230 int getConnectFD() {
return m_serverconn; }
237 AcceptThread::acceptConnection()
239 struct sockaddr_in sin;
244 ::setsockopt(m_serversock, IPPROTO_TCP, 1,
245 (
char*) &tmp,
sizeof(
int));
247 val =
sizeof(
struct sockaddr_in);
248 if ((m_serverconn = upaccept(m_serversock, (
struct sockaddr*)&sin, &val))
254 ::setsockopt(m_serverconn, IPPROTO_TCP, 1,
255 (
char *) &tmp,
sizeof(
int));
257 ::setsockopt(m_serverconn, SOL_SOCKET, SO_KEEPALIVE,
258 (
char*) &tmp,
sizeof(
int));
262 runConnClass(
void* arg)
264 AcceptThread* acceptThread = (AcceptThread*)(arg);
265 acceptThread->acceptConnection();
266 ::pthread_exit(NULL);
273 int svrfd, lerrno, connectfd;
275 struct sockaddr_in sin;
277 svrfd = socket( AF_INET, SOCK_STREAM, 0 );
278 sin.sin_family = AF_INET;
279 sin.sin_addr.s_addr = htonl( 0x7f000001 );
281 memset(sin.sin_zero, 0, 8 );
282 if (bind(svrfd, (
struct sockaddr * )&sin,
sizeof(
struct sockaddr_in ) ) == -1)
286 fprintf(stderr,
"CreateSocket(): Failed to bind on socket" );
289 if (listen(svrfd, 1) == -1)
295 val =
sizeof(
struct sockaddr_in);
296 if (getsockname(svrfd, (
struct sockaddr * )&sin, &val ) == -1)
299 fprintf(stderr,
"CreateSocket(): Failed to obtain socket name" );
304 AcceptThread* pat =
new AcceptThread(svrfd);
308 pthread_create(&athread, NULL, runConnClass, pat);
310 int clientfd = socket(AF_INET, SOCK_STREAM, 0);
318 struct sockaddr_in csin;
319 csin.sin_family = AF_INET;
320 csin.sin_addr.s_addr = htonl(0x7f000001);
321 csin.sin_port = sin.sin_port;
322 if (::connect(clientfd, (
struct sockaddr*)&csin,
sizeof(csin)) == -1)
328 #define TCP_NODELAY 1 333 ::setsockopt(clientfd, IPPROTO_TCP, TCP_NODELAY, (
char*)&tmp,
sizeof(
int));
335 ::setsockopt(clientfd, SOL_SOCKET, SO_KEEPALIVE, (
char*)&tmp,
sizeof(
int));
339 ::pthread_join(athread, &threadResult);
342 fds[0] = pat->getConnectFD();
348 #endif // BLOCXX_NETWARE 384 void set_desc_blocking(
385 int d, EBlockingMode & bmflag, EBlockingMode blocking_mode)
388 int fdflags = fcntl(d, F_GETFL, 0);
395 fdflags &= ~O_NONBLOCK;
399 fdflags |= O_NONBLOCK;
401 if (fcntl(d, F_SETFL, fdflags) == -1)
405 bmflag = blocking_mode;
414 for (
size_t i = 0;
i < 2; ++
i)
442 #if defined(BLOCXX_NETWARE) 450 if (::socketpair(AF_UNIX, SOCK_STREAM, 0,
m_fds) == -1)
455 ::shutdown(
m_fds[0], SHUT_WR);
456 ::shutdown(
m_fds[1], SHUT_RD);
475 rc = upclose(
m_fds[0]);
481 rc = upclose(
m_fds[1]);
503 rc = upclose(
m_fds[0]);
518 rc = upclose(
m_fds[1]);
550 rc = upwrite(
m_fds[1], data, dataLen);
591 rc = upread(
m_fds[0], buffer, bufferLen);
653 #if defined(BLOCXX_DARWIN) 654 callOnce(detectDescriptorPassingBugFlag, detectDescriptorPassingBug);
655 if (rc != -1 && needDescriptorPassingWorkaround)
715 #if defined(BLOCXX_DARWIN) 716 callOnce(detectDescriptorPassingBugFlag, detectDescriptorPassingBug);
717 if (needDescriptorPassingWorkaround)
#define BLOCXX_INVALID_HANDLE
virtual Select_t getReadSelectObj() const
Get the read select object.
virtual bool isOpen() const
Is the pipe open or closed?
virtual int write(const void *data, int dataLen, ErrorAction errorAsException=E_RETURN_ON_ERROR)
Write a specified number of bytes to the device that is exposing the IOIFC interface.
void setTimeouts(const Timeout &timeout)
Sets the read & write timeout values.
virtual void setWriteBlocking(EBlockingMode isBlocking=E_BLOCKING)
EBlockingMode m_blocking[2]
#define BLOCXX_LOG_ERROR(logger, message)
Log message to logger with the Error level.
virtual void setReadBlocking(EBlockingMode isBlocking=E_BLOCKING)
Timeout getWriteTimeout()
Gets the write timeout value.
#define BLOCXX_ASSERT(CON)
BLOCXX_ASSERT works similar to the assert() macro, but instead of calling abort(),...
virtual int close()
Close the pipe.
BLOCXX_COMMON_API int close(const FileHandle &hdl)
Close file handle.
virtual int write(const void *dataOut, int dataOutLen, ErrorAction errorAsException=E_RETURN_ON_ERROR)=0
Write a specified number of bytes to the device that is exposing the IOIFC interface.
virtual void passDescriptor(Descriptor h, const UnnamedPipeRef &ackPipe=0, const ProcessRef &targetProcess=0)
Sends a Descriptor to the peer.
LazyGlobal< String, char const *const > GlobalString
void read(std::streambuf &istrm, void *dataIn, size_t dataInLen)
virtual int read(void *dataIn, int dataInLen, ErrorAction errorAsException=E_RETURN_ON_ERROR)=0
Read a specified number of bytes from the device that is exposing the IOIFC interface.
virtual void setBlocking(EBlockingMode outputIsBlocking=E_BLOCKING)
handle_type release()
Relinquish ownership of resource and return its handle.
virtual Descriptor getOutputDescriptor() const
Get the underlying output descriptor.
virtual AutoDescriptor receiveDescriptor(const UnnamedPipeRef &ackPipe)
Gets a Descriptor from the peer.
PosixUnnamedPipe(EOpen doOpen=E_OPEN)
Array< String > StringArray
virtual Descriptor getInputDescriptor() const =0
Get the underlying input descriptor.
static Timeout relative(float seconds)
virtual EBlockingMode getWriteBlocking() const
Get the current blocking mode for writing from pipe.
GlobalString COMPONENT_NAME
virtual void open()
Open the pipe.
virtual int closeInputHandle()
virtual EBlockingMode getReadBlocking() const
Get the current blocking mode for reading from pipe.
PURPOSE: The AutoResource class template is an analog of std::auto_ptr for managing arbitrary resourc...
virtual int closeOutputHandle()
int passDescriptor(Descriptor streamPipe, Descriptor descriptor, ProcId targetProcessId)
Sends a Descriptor to the peer.
#define BLOCXX_GLOBAL_STRING_INIT(str)
virtual Descriptor getInputDescriptor() const
Get the underlying input descriptor.
Timeout getReadTimeout()
Gets the read timeout value.
virtual Descriptor getOutputDescriptor() const =0
Get the underlying output descriptor.
void BLOCXX_COMMON_API callOnce(OnceFlag &flag, FuncT F)
The first time callOnce is called with a given onceFlag argument, it calls func with no argument and ...
class BLOCXX_COMMON_API Logger
virtual int read(void *buffer, int bufferLen, ErrorAction errorAsException=E_RETURN_ON_ERROR)
Read a specified number of bytes from the device that is exposing the IOIFC interface.
#define BLOCXX_THROW(exType, msg)
Throw an exception using FILE and LINE.
handle_type get() const
Return handle of resource, retaining ownership.
virtual Select_t getWriteSelectObj() const
Get the write select object.
int waitForIO(SocketHandle_t fd, int timeOutSecs, SocketFlags::EWaitDirectionFlag waitFlag)
Wait for input or output on a socket.
static void testCancel()
Test if this thread has been cancelled.
void write(std::streambuf &ostrm, void const *dataOut, size_t dataOutLen)
#define BLOCXX_THROW_ERRNO_MSG(exType, msg)
Throw an exception using FILE, LINE, errno and strerror(errno)
virtual ~PosixUnnamedPipe()
AutoDescriptor receiveDescriptor(Descriptor streamPipe)
Gets a Descriptor from the peer.