48#if LASS_PLATFORM_TYPE == LASS_PLATFORM_TYPE_WIN32
50# define WIN32_LEAN_AND_MEAN
53# if LASS_HAVE_SYS_SOCKET_H
54# include <sys/socket.h>
56# if LASS_HAVE_LINUX_UN_H
58# elif LASS_HAVE_SYS_UN_H
67# if LASS_HAVE_UNISTD_H
89 static constexpr size_t infinite = MessagePipe::infinite;
91 MessagePipeImpl(
size_t bufferSize):
92 pipe_(INVALID_HANDLE_VALUE),
93 bufferSize_(static_cast<DWORD>(bufferSize))
107 const unsigned long pid = GetCurrentProcessId();
108 stde::safe_sprintf(name_,
"\\\\.\\pipe\\LassIOMessagePipe_%lx_%i", pid, pipeId_++);
109 name_[maxNameLength] = 0;
111 if (!initOverlapped())
116 pipe_ = CreateNamedPipe(
118 PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE | FILE_FLAG_OVERLAPPED,
119 PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT,
126 return pipe_ != INVALID_HANDLE_VALUE;
129 bool connect(
const char* pipeName,
size_t msecTimeout)
133 stde::safe_strcpy(name_, pipeName);
134 name_[maxNameLength] = 0;
136 if (!initOverlapped())
142 const size_t attempts = msecTimeout == infinite ? 1 : std::max<size_t>(msecTimeout / 1000, 1);
143 const DWORD msecTimeoutPerAttemt = dwordTimeout(msecTimeout / attempts);
144 for (
size_t k = 0; k <= attempts; ++k)
148 FILE_READ_DATA | FILE_WRITE_DATA | FILE_WRITE_ATTRIBUTES,
152 SECURITY_IDENTIFICATION | SECURITY_SQOS_PRESENT | FILE_FLAG_OVERLAPPED,
154 if (pipe_ != INVALID_HANDLE_VALUE)
156 DWORD mode = PIPE_READMODE_MESSAGE;
157 if (!SetNamedPipeHandleState(pipe_, &mode, 0, 0))
164 if (GetLastError() != ERROR_PIPE_BUSY || k == attempts)
168 if (!WaitNamedPipe(pipeName, msecTimeoutPerAttemt))
176 bool accept(
size_t msecTimeout)
178 if (ConnectNamedPipe(pipe_, &overlapped_ ))
182 switch ( GetLastError() )
184 case ERROR_PIPE_CONNECTED:
186 case ERROR_IO_PENDING:
188 if (WaitForSingleObject( overlapped_.hEvent, dwordTimeout(msecTimeout)) != WAIT_OBJECT_0)
194 return GetOverlappedResult(pipe_, &overlapped_, &dummy, FALSE) != 0;
203 if (pipe_ != INVALID_HANDLE_VALUE)
206 pipe_ = INVALID_HANDLE_VALUE;
210 const char* name()
const
215 bool operator!()
const
217 return pipe_ == INVALID_HANDLE_VALUE;
220 bool send(
const void* out,
size_t size,
size_t msecTimeout)
const
222 assert(size <= bufferSize_);
223 if (!WriteFile(pipe_, out,
static_cast<DWORD
>(size), 0, &overlapped_))
225 if ( GetLastError() != ERROR_IO_PENDING )
229 if ( WaitForSingleObject( overlapped_.hEvent, dwordTimeout(msecTimeout)) != WAIT_OBJECT_0 )
235 DWORD bytesWritten = 0;
236 if ( !GetOverlappedResult( pipe_, &overlapped_, &bytesWritten, FALSE ) )
240 return bytesWritten == size;
243 bool receive(
void* in,
size_t size,
size_t msecTimeout)
const
245 assert(size <= bufferSize_);
246 if ( !ReadFile(pipe_, in,
static_cast<DWORD
>(size),0, &overlapped_) )
248 if ( GetLastError() != ERROR_IO_PENDING )
252 if ( WaitForSingleObject( overlapped_.hEvent, dwordTimeout(msecTimeout)) != WAIT_OBJECT_0 )
259 if ( !GetOverlappedResult( pipe_, &overlapped_, &bytesRead, FALSE ) )
263 return bytesRead == size;
266 bool transact(
const void* out,
size_t sizeOut,
void* in,
size_t sizeIn,
size_t msecTimeout=INFINITE)
const
268 assert(sizeOut <= bufferSize_ && sizeIn <= bufferSize_);
269 if ( !TransactNamedPipe(pipe_,
const_cast<void*
>(out),
static_cast<DWORD
>(sizeOut), in,
static_cast<DWORD
>(sizeIn), 0, &overlapped_) )
271 if ( GetLastError() != ERROR_IO_PENDING )
275 if ( WaitForSingleObject( overlapped_.hEvent, dwordTimeout(msecTimeout)) != WAIT_OBJECT_0 )
282 if ( !GetOverlappedResult( pipe_, &overlapped_, &bytesRead, FALSE ) )
286 return bytesRead == sizeIn;
290 static DWORD dwordTimeout(
size_t msecTimeout)
292 return msecTimeout == MessagePipe::infinite ? INFINITE : num::numCast<DWORD>(msecTimeout);
295 bool initOverlapped()
297 memset(&overlapped_, 0,
sizeof(OVERLAPPED));
298 overlapped_.hEvent = CreateEvent(0, TRUE, FALSE, 0);
299 return overlapped_.hEvent != 0;
308 mutable OVERLAPPED overlapped_;
309 char name_[maxNameLength + 1];
311 static unsigned pipeId_;
314unsigned MessagePipeImpl::pipeId_ = 0;
321# define UNIX_PATH_MAX sizeof(((sockaddr_un*)0)->sun_path)
324#if LASS_PLATFORM_TYPE == LASS_PLATFORM_TYPE_LINUX
325# define LASS_HAVE_ABSTRACT_NAMES 1
332 static constexpr size_t infinite = MessagePipe::infinite;
334 MessagePipeImpl(
size_t ):
348 socket_ = socket(AF_UNIX, SOCK_SEQPACKET, 0);
356#if LASS_PLATFORM_TYPE == LASS_PLATFORM_TYPE_LINUX
357 const int passcred = 1;
358 if (setsockopt(socket_, SOL_SOCKET, SO_PASSCRED, &passcred,
sizeof(passcred)) != 0)
366 if (fcntl(socket_, F_SETFL, O_NONBLOCK) != 0)
373 memset(&addr, 0,
sizeof(sockaddr_un));
374 addr.sun_family = AF_UNIX;
375#if LASS_HAVE_ABSTRACT_NAMES
377 socklen_t addr_len =
sizeof(sa_family_t);
381 stde::safe_strcpy(name_,
"/tmp/lass-sock-XXXXXX");
382 fd_ = mkstemp(name_);
388 stde::safe_strcpy(addr.sun_path, name_);
389 socklen_t addr_len =
sizeof(sa_family_t) +
static_cast<socklen_t
>(::strlen(addr.sun_path));
391 if (bind(socket_,
reinterpret_cast<sockaddr*
>(&addr), addr_len) != 0)
397#if LASS_HAVE_ABSTRACT_NAMES
399 addr_len =
sizeof(sockaddr_un);
400 if(getsockname(socket_,
reinterpret_cast<sockaddr*
>(&addr), &addr_len) != 0)
405 if (addr_len <=
sizeof(sa_family_t))
409 assert(addr.sun_path[0] == 0);
410 size_t name_len = addr_len -
sizeof(sa_family_t) - 1;
411 assert(name_len <
sizeof(name_));
412 memcpy(name_, &addr.sun_path[1], name_len);
418 if (listen(socket_, 1) != 0)
428 bool connect(
const char* pipeName,
size_t msecTimeout)
433 socket_ = socket(AF_UNIX, SOCK_SEQPACKET, 0);
440 if (fcntl(socket_, F_SETFL, O_NONBLOCK) != 0)
447 assert(strlen(pipeName) < UNIX_PATH_MAX);
448 const size_t name_len = std::min<size_t>(strlen(pipeName), UNIX_PATH_MAX - 1);
449 memcpy(name_, pipeName, name_len);
452 addr.sun_family = AF_UNIX;
453#if LASS_HAVE_ABSTRACT_NAMES
454 memset(addr.sun_path, 0, UNIX_PATH_MAX);
455 memcpy(&addr.sun_path[1], pipeName, name_len);
456 socklen_t addr_len =
static_cast<socklen_t
>(
sizeof(sa_family_t) + name_len + 1);
458 memcpy(&addr.sun_path[0], pipeName, name_len);
459 socklen_t addr_len =
static_cast<socklen_t
>(
sizeof(sa_family_t) + name_len);
462 if ( ::connect(socket_,
reinterpret_cast<sockaddr*
>(&addr), addr_len) == -1 )
475 if ( !poll(socket_, POLLOUT, msecTimeout) )
482 socklen_t length =
sizeof(so_error);
483 if ( ::getsockopt(socket_, SOL_SOCKET, SO_ERROR, &so_error, &length) == -1 )
501 bool accept(
size_t msecTimeout)
513 if (!poll(socket_, POLLIN, msecTimeout))
518 pipe_ = ::accept(socket_, 0, 0);
520 if (fcntl(pipe_, F_SETFL, O_NONBLOCK) != 0)
531 if (isServer_ && pipe_ >= 0)
549 const char* name()
const
554 bool operator!()
const
559 bool send(
const void* out,
size_t size,
size_t msecTimeout)
const
561 if (!poll(pipe_, POLLOUT, msecTimeout))
567 memset(&hdr, 0,
sizeof(msghdr));
570 msg.iov_base =
const_cast<void*
>(
out);
574 hdr.msg_flags = MSG_EOR;
576 return sendmsg(pipe_, &hdr, 0) >= 0;
578 bool receive(
void* in,
size_t size,
size_t msecTimeout)
const
580 if (!poll(pipe_, POLLIN, msecTimeout))
586 memset(&hdr, 0,
sizeof(msghdr));
594 if (recvmsg(pipe_, &hdr, 0) <= 0)
600 return !(hdr.msg_flags & MSG_TRUNC);
602 bool transact(
const void* out,
size_t sizeOut,
void* in,
size_t sizeIn,
size_t msecTimeout)
const
604 return send(out, sizeOut, msecTimeout) && receive(in, sizeIn, msecTimeout);
609 bool poll(
int fd,
short events,
size_t msecTimeout)
const
617 num::Tint64 msecDeadline = 0;
618 if (msecTimeout != infinite)
620 timeout = num::numCast<int>(msecTimeout);
621 msecDeadline = msecTime() + timeout;
624 int rc = ::poll(&pfd, 1, timeout);
638 LASS_ASSERT( rc == 1 );
639 return pfd.revents & events;
642 if (msecTimeout != infinite)
644 const num::Tint64 timeleft = msecDeadline - msecTime();
647 timeout =
static_cast<int>(timeleft);
650 rc = ::poll(&pfd, 1, timeout);
654 num::Tint64 msecTime()
const
657 LASS_ENFORCE_CLIB(clock_gettime(CLOCK_MONOTONIC, &tp));
658 return static_cast<num::Tint64
>(tp.tv_sec) * 1000 +
static_cast<num::Tint64
>(tp.tv_nsec) / 1000000;
665 char name_[UNIX_PATH_MAX];
674MessagePipe::MessagePipe(
size_t bufferSize):
675 pimpl_(new impl::MessagePipeImpl(bufferSize))
680MessagePipe::~MessagePipe()
686bool MessagePipe::create()
688 return pimpl_->create();
692bool MessagePipe::connect(
const char* pipeName,
size_t msecTimeout)
694 return pimpl_->connect(pipeName, msecTimeout);
698bool MessagePipe::accept(
size_t msecTimeout)
700 return pimpl_->accept(msecTimeout);
704void MessagePipe::close()
710const char* MessagePipe::name()
const
712 return pimpl_->name();
716bool MessagePipe::operator!()
const
722bool MessagePipe::send(
const void* out,
size_t size,
size_t msecTimeout)
const
724 return pimpl_->send(out, size, msecTimeout);
728bool MessagePipe::receive(
void* in,
size_t size,
size_t msecTimeout)
const
730 return pimpl_->receive(in, size, msecTimeout);
734bool MessagePipe::transact(
const void* out,
size_t outSize,
void* in,
size_t inSize,
size_t msecTimeout)
const
736 return pimpl_->transact(out, outSize, in, inSize, msecTimeout);
streams, binary streams, vrmlstreams, ...
ColorRGBA out(const ColorRGBA &a, const ColorRGBA &b)
a held out by b, part of a outside b.
ColorRGBA in(const ColorRGBA &a, const ColorRGBA &b)
part of a inside b.
Library for Assembled Shared Sources.