Library of Assembled Shared Sources
message_pipe.cpp
Go to the documentation of this file.
1/** @file
2 * @author Bram de Greve (bram@cocamware.com)
3 * @author Tom De Muer (tom@cocamware.com)
4 *
5 * *** BEGIN LICENSE INFORMATION ***
6 *
7 * The contents of this file are subject to the Common Public Attribution License
8 * Version 1.0 (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
10 * http://lass.sourceforge.net/cpal-license. The License is based on the
11 * Mozilla Public License Version 1.1 but Sections 14 and 15 have been added to cover
12 * use of software over a computer network and provide for limited attribution for
13 * the Original Developer. In addition, Exhibit A has been modified to be consistent
14 * with Exhibit B.
15 *
16 * Software distributed under the License is distributed on an "AS IS" basis, WITHOUT
17 * WARRANTY OF ANY KIND, either express or implied. See the License for the specific
18 * language governing rights and limitations under the License.
19 *
20 * The Original Code is LASS - Library of Assembled Shared Sources.
21 *
22 * The Initial Developer of the Original Code is Bram de Greve and Tom De Muer.
23 * The Original Developer is the Initial Developer.
24 *
25 * All portions of the code written by the Initial Developer are:
26 * Copyright (C) 2004-2024 the Initial Developer.
27 * All Rights Reserved.
28 *
29 * Contributor(s):
30 *
31 * Alternatively, the contents of this file may be used under the terms of the
32 * GNU General Public License Version 2 or later (the GPL), in which case the
33 * provisions of GPL are applicable instead of those above. If you wish to allow use
34 * of your version of this file only under the terms of the GPL and not to allow
35 * others to use your version of this file under the CPAL, indicate your decision by
36 * deleting the provisions above and replace them with the notice and other
37 * provisions required by the GPL License. If you do not delete the provisions above,
38 * a recipient may use your version of this file under either the CPAL or the GPL.
39 *
40 * *** END LICENSE INFORMATION ***
41 */
42
43#include "lass_common.h"
44#include "message_pipe.h"
46#include "../num/num_cast.h"
47
48#if LASS_PLATFORM_TYPE == LASS_PLATFORM_TYPE_WIN32
49# define NOMINMAX
50# define WIN32_LEAN_AND_MEAN
51# include <Windows.h>
52#else
53# if LASS_HAVE_SYS_SOCKET_H
54# include <sys/socket.h>
55# endif
56# if LASS_HAVE_LINUX_UN_H
57# include <linux/un.h>
58# elif LASS_HAVE_SYS_UN_H
59# include <sys/un.h>
60# endif
61# if LASS_HAVE_FCNTL_H
62# include <fcntl.h>
63# endif
64# if LASS_HAVE_POLL_H
65# include <poll.h>
66# endif
67# if LASS_HAVE_UNISTD_H
68# include <unistd.h>
69# endif
70# include <errno.h>
71#endif
72
73#include <assert.h>
74#include <string.h>
75
76namespace lass
77{
78namespace io
79{
80
81namespace impl
82{
83
84#ifdef _WIN32
85
86class MessagePipeImpl
87{
88public:
89 static constexpr size_t infinite = MessagePipe::infinite;
90
91 MessagePipeImpl(size_t bufferSize):
92 pipe_(INVALID_HANDLE_VALUE),
93 bufferSize_(static_cast<DWORD>(bufferSize))
94 {
95 name_[0] = 0;
96 }
97
98 ~MessagePipeImpl()
99 {
100 close();
101 }
102
103 bool create()
104 {
105 close();
106
107 const unsigned long pid = GetCurrentProcessId();
108 stde::safe_sprintf(name_, "\\\\.\\pipe\\LassIOMessagePipe_%lx_%i", pid, pipeId_++);
109 name_[maxNameLength] = 0;
110
111 if (!initOverlapped())
112 {
113 return false;
114 }
115
116 pipe_ = CreateNamedPipe(
117 name_,
118 PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE | FILE_FLAG_OVERLAPPED,
119 PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT,
120 1,
121 bufferSize_,
122 bufferSize_,
123 0,
124 0);
125
126 return pipe_ != INVALID_HANDLE_VALUE;
127 }
128
129 bool connect(const char* pipeName, size_t msecTimeout)
130 {
131 close();
132
133 stde::safe_strcpy(name_, pipeName);
134 name_[maxNameLength] = 0;
135
136 if (!initOverlapped())
137 {
138 return false;
139 }
140
141 // attempt to connect
142 const size_t attempts = msecTimeout == infinite ? 1 : std::max<size_t>(msecTimeout / 1000, 1);
143 const DWORD msecTimeoutPerAttemt = dwordTimeout(msecTimeout / attempts);
144 for (size_t k = 0; k <= attempts; ++k) // always try once more
145 {
146 pipe_ = CreateFile(
147 pipeName,
148 FILE_READ_DATA | FILE_WRITE_DATA | FILE_WRITE_ATTRIBUTES,
149 0,
150 0,
151 OPEN_EXISTING,
152 SECURITY_IDENTIFICATION | SECURITY_SQOS_PRESENT | FILE_FLAG_OVERLAPPED,
153 0);
154 if (pipe_ != INVALID_HANDLE_VALUE)
155 {
156 DWORD mode = PIPE_READMODE_MESSAGE;
157 if (!SetNamedPipeHandleState(pipe_, &mode, 0, 0))
158 {
159 close();
160 return false;
161 }
162 return true;
163 }
164 if (GetLastError() != ERROR_PIPE_BUSY || k == attempts)
165 {
166 return false;
167 }
168 if (!WaitNamedPipe(pipeName, msecTimeoutPerAttemt))
169 {
170 return false;
171 }
172 }
173 return false;
174 }
175
176 bool accept(size_t msecTimeout)
177 {
178 if (ConnectNamedPipe(pipe_, &overlapped_ ))
179 {
180 return true; // w00t! connected.
181 }
182 switch ( GetLastError() )
183 {
184 case ERROR_PIPE_CONNECTED:
185 return true; // was connected before the call. that's fine too.
186 case ERROR_IO_PENDING:
187 {
188 if (WaitForSingleObject( overlapped_.hEvent, dwordTimeout(msecTimeout)) != WAIT_OBJECT_0)
189 {
190 CancelIo( pipe_ );
191 return false;
192 }
193 DWORD dummy;
194 return GetOverlappedResult(pipe_, &overlapped_, &dummy, FALSE) != 0;
195 }
196 default:
197 return false;
198 }
199 }
200
201 void close()
202 {
203 if (pipe_ != INVALID_HANDLE_VALUE)
204 {
205 CloseHandle(pipe_);
206 pipe_ = INVALID_HANDLE_VALUE;
207 }
208 }
209
210 const char* name() const
211 {
212 return name_;
213 }
214
215 bool operator!() const
216 {
217 return pipe_ == INVALID_HANDLE_VALUE;
218 }
219
220 bool send(const void* out, size_t size, size_t msecTimeout) const
221 {
222 assert(size <= bufferSize_);
223 if (!WriteFile(pipe_, out, static_cast<DWORD>(size), 0, &overlapped_))
224 {
225 if ( GetLastError() != ERROR_IO_PENDING )
226 {
227 return false;
228 }
229 if ( WaitForSingleObject( overlapped_.hEvent, dwordTimeout(msecTimeout)) != WAIT_OBJECT_0 )
230 {
231 CancelIo( pipe_ );
232 return false;
233 }
234 }
235 DWORD bytesWritten = 0;
236 if ( !GetOverlappedResult( pipe_, &overlapped_, &bytesWritten, FALSE ) )
237 {
238 return false;
239 }
240 return bytesWritten == size;
241 }
242
243 bool receive(void* in, size_t size, size_t msecTimeout) const
244 {
245 assert(size <= bufferSize_);
246 if ( !ReadFile(pipe_, in, static_cast<DWORD>(size),0, &overlapped_) )
247 {
248 if ( GetLastError() != ERROR_IO_PENDING )
249 {
250 return false;
251 }
252 if ( WaitForSingleObject( overlapped_.hEvent, dwordTimeout(msecTimeout)) != WAIT_OBJECT_0 )
253 {
254 CancelIo( pipe_ );
255 return false;
256 }
257 }
258 DWORD bytesRead = 0;
259 if ( !GetOverlappedResult( pipe_, &overlapped_, &bytesRead, FALSE ) )
260 {
261 return false;
262 }
263 return bytesRead == size;
264 }
265
266 bool transact(const void* out, size_t sizeOut, void* in, size_t sizeIn, size_t msecTimeout=INFINITE) const
267 {
268 assert(sizeOut <= bufferSize_ && sizeIn <= bufferSize_);
269 if ( !TransactNamedPipe(pipe_, const_cast<void*>(out), static_cast<DWORD>(sizeOut), in, static_cast<DWORD>(sizeIn), 0, &overlapped_) )
270 {
271 if ( GetLastError() != ERROR_IO_PENDING )
272 {
273 return false;
274 }
275 if ( WaitForSingleObject( overlapped_.hEvent, dwordTimeout(msecTimeout)) != WAIT_OBJECT_0 )
276 {
277 CancelIo( pipe_ );
278 return false;
279 }
280 }
281 DWORD bytesRead = 0;
282 if ( !GetOverlappedResult( pipe_, &overlapped_, &bytesRead, FALSE ) )
283 {
284 return false;
285 }
286 return bytesRead == sizeIn;
287 }
288
289private:
290 static DWORD dwordTimeout(size_t msecTimeout)
291 {
292 return msecTimeout == MessagePipe::infinite ? INFINITE : num::numCast<DWORD>(msecTimeout);
293 }
294
295 bool initOverlapped()
296 {
297 memset(&overlapped_, 0, sizeof(OVERLAPPED));
298 overlapped_.hEvent = CreateEvent(0, TRUE, FALSE, 0);
299 return overlapped_.hEvent != 0;
300 }
301
302 enum
303 {
304 maxNameLength = 256 // according to MSDN docs on named pipes.
305 };
306
307 HANDLE pipe_;
308 mutable OVERLAPPED overlapped_;
309 char name_[maxNameLength + 1];
310 DWORD bufferSize_;
311 static unsigned pipeId_;
312};
313
314unsigned MessagePipeImpl::pipeId_ = 0;
315
316#else
317
#ifndef UNIX_PATH_MAX
	// FreeBSD's sys/un.h doesn't define UNIX_PATH_MAX.
	// Linux's sys/un.h doesn't either, but there we include linux/un.h instead.
	// Fall back to the actual capacity of sockaddr_un::sun_path.
#	define UNIX_PATH_MAX sizeof(sockaddr_un::sun_path)
#endif

// Abstract socket names (sun_path starting with a null byte) are only used on Linux.
#if LASS_PLATFORM_TYPE == LASS_PLATFORM_TYPE_LINUX
#	define LASS_HAVE_ABSTRACT_NAMES 1
#endif
327
328class MessagePipeImpl
329{
330public:
331
332 static constexpr size_t infinite = MessagePipe::infinite;
333
334 MessagePipeImpl(size_t /*bufferSize*/):
335 socket_(-1),
336 pipe_(-1),
337 fd_(-1),
338 isServer_(false)
339 {
340 name_[0] = 0;
341 }
342
343 bool create()
344 {
345 close();
346
347 // open a socket
348 socket_ = socket(AF_UNIX, SOCK_SEQPACKET, 0);
349 if (socket_ < 0)
350 {
351 return false;
352 }
353
354 // Not sure if this is actually necessary? Autobind should work without.
355 // In any case, it's only available on Linux
356#if LASS_PLATFORM_TYPE == LASS_PLATFORM_TYPE_LINUX
357 const int passcred = 1;
358 if (setsockopt(socket_, SOL_SOCKET, SO_PASSCRED, &passcred, sizeof(passcred)) != 0)
359 {
360 close();
361 return false;
362 }
363#endif
364
365 // make it non-blocking
366 if (fcntl(socket_, F_SETFL, O_NONBLOCK) != 0)
367 {
368 close();
369 return false;
370 }
371
372 sockaddr_un addr;
373 memset(&addr, 0, sizeof(sockaddr_un));
374 addr.sun_family = AF_UNIX;
375#if LASS_HAVE_ABSTRACT_NAMES
376 // autobind: don't set the name, the length just includes the family
377 socklen_t addr_len = sizeof(sa_family_t);
378#else
379 // BSD doesn't have the concept of abstract names,
380 // so create a temporary file in /tmp.
381 stde::safe_strcpy(name_, "/tmp/lass-sock-XXXXXX");
382 fd_ = mkstemp(name_);
383 if (fd_ == -1)
384 {
385 close();
386 return false;
387 }
388 stde::safe_strcpy(addr.sun_path, name_);
389 socklen_t addr_len = sizeof(sa_family_t) + static_cast<socklen_t>(::strlen(addr.sun_path));
390#endif
391 if (bind(socket_, reinterpret_cast<sockaddr*>(&addr), addr_len) != 0)
392 {
393 close();
394 return false;
395 }
396
397#if LASS_HAVE_ABSTRACT_NAMES
398 // let's retrieve the actual socket name.
399 addr_len = sizeof(sockaddr_un);
400 if(getsockname(socket_, reinterpret_cast<sockaddr*>(&addr), &addr_len) != 0)
401 {
402 close();
403 return false;
404 }
405 if (addr_len <= sizeof(sa_family_t))
406 {
407 return false; // err, no autobind?
408 }
409 assert(addr.sun_path[0] == 0); // abstract names start with null, and don't have a terminating null
410 size_t name_len = addr_len - sizeof(sa_family_t) - 1; // without leading null
411 assert(name_len < sizeof(name_));
412 memcpy(name_, &addr.sun_path[1], name_len); // don't copy leading null
413 name_[name_len] = 0; // set a terminating null
414#endif
415
416 // already start listening, so that child can already connect,
417 // even if we didnt' connect yet.
418 if (listen(socket_, 1) != 0)
419 {
420 close();
421 return false;
422 }
423
424 isServer_ = true;
425 return true;
426 }
427
428 bool connect(const char* pipeName, size_t msecTimeout)
429 {
430 close();
431
432 // open a socket
433 socket_ = socket(AF_UNIX, SOCK_SEQPACKET, 0);
434 if (socket_ < 0)
435 {
436 return false;
437 }
438
439 // make it non-blocking
440 if (fcntl(socket_, F_SETFL, O_NONBLOCK) != 0)
441 {
442 close();
443 return false;
444 }
445
446 // set its name
447 assert(strlen(pipeName) < UNIX_PATH_MAX);
448 const size_t name_len = std::min<size_t>(strlen(pipeName), UNIX_PATH_MAX - 1);
449 memcpy(name_, pipeName, name_len);
450 name_[name_len] = 0;
451 sockaddr_un addr;
452 addr.sun_family = AF_UNIX;
453#if LASS_HAVE_ABSTRACT_NAMES
454 memset(addr.sun_path, 0, UNIX_PATH_MAX); // fill with zeroes.
455 memcpy(&addr.sun_path[1], pipeName, name_len);
456 socklen_t addr_len = static_cast<socklen_t>(sizeof(sa_family_t) + name_len + 1);
457#else
458 memcpy(&addr.sun_path[0], pipeName, name_len);
459 socklen_t addr_len = static_cast<socklen_t>(sizeof(sa_family_t) + name_len);
460#endif
461
462 if ( ::connect(socket_, reinterpret_cast<sockaddr*>(&addr), addr_len) == -1 )
463 {
464 switch (errno)
465 {
466 case EINPROGRESS:
467 case EALREADY:
468 case EINTR:
469 break;
470 default:
471 close();
472 return false;
473 }
474
475 if ( !poll(socket_, POLLOUT, msecTimeout) )
476 {
477 close();
478 return false;
479 }
480
481 int so_error;
482 socklen_t length = sizeof(so_error);
483 if ( ::getsockopt(socket_, SOL_SOCKET, SO_ERROR, &so_error, &length) == -1 )
484 {
485 close();
486 return false;
487 }
488 if ( so_error != 0 )
489 {
490 close();
491 return false;
492 }
493 }
494
495 // we're good!
496 pipe_ = socket_; // is same thing
497 isServer_ = false;
498 return true;
499 }
500
501 bool accept(size_t msecTimeout)
502 {
503 assert(isServer_);
504 if (socket_ < 0)
505 {
506 return false;
507 }
508 if (pipe_ >= 0)
509 {
510 ::close(pipe_);
511 }
512
513 if (!poll(socket_, POLLIN, msecTimeout))
514 {
515 return false;
516 }
517
518 pipe_ = ::accept(socket_, 0, 0);
519
520 if (fcntl(pipe_, F_SETFL, O_NONBLOCK) != 0)
521 {
522 close();
523 return false;
524 }
525
526 return pipe_ >= 0;
527 }
528
529 void close()
530 {
531 if (isServer_ && pipe_ >= 0)
532 {
533 ::close(pipe_);
534 pipe_ = -1;
535 }
536 if (socket_ >= 0)
537 {
538 ::close(socket_);
539 socket_ = -1;
540 }
541 if (fd_ >= 0)
542 {
543 ::close(fd_);
544 fd_ = -1;
545 ::unlink(name_);
546 }
547 }
548
549 const char* name() const
550 {
551 return name_;
552 }
553
554 bool operator!() const
555 {
556 return socket_ < 0;
557 }
558
559 bool send(const void* out, size_t size, size_t msecTimeout) const
560 {
561 if (!poll(pipe_, POLLOUT, msecTimeout))
562 {
563 return false;
564 }
565
566 msghdr hdr;
567 memset(&hdr, 0, sizeof(msghdr));
568
569 iovec msg;
570 msg.iov_base = const_cast<void*>(out);
571 msg.iov_len = size;
572 hdr.msg_iov = &msg;
573 hdr.msg_iovlen = 1;
574 hdr.msg_flags = MSG_EOR;
575
576 return sendmsg(pipe_, &hdr, 0) >= 0;
577 }
578 bool receive(void* in, size_t size, size_t msecTimeout) const
579 {
580 if (!poll(pipe_, POLLIN, msecTimeout))
581 {
582 return false;
583 }
584
585 msghdr hdr;
586 memset(&hdr, 0, sizeof(msghdr));
587
588 iovec msg;
589 msg.iov_base = in;
590 msg.iov_len = size;
591 hdr.msg_iov = &msg;
592 hdr.msg_iovlen = 1;
593
594 if (recvmsg(pipe_, &hdr, 0) <= 0) // -1 is error, 0 is orderly shutdown
595 {
596 return false;
597 }
598
599 // normally, msg_flags should have MSG_EOR set, but linux doesn't care.
600 return !(hdr.msg_flags & MSG_TRUNC);
601 }
602 bool transact(const void* out, size_t sizeOut, void* in, size_t sizeIn, size_t msecTimeout) const
603 {
604 return send(out, sizeOut, msecTimeout) && receive(in, sizeIn, msecTimeout);
605 }
606
607private:
608
609 bool poll(int fd, short events, size_t msecTimeout) const
610 {
611 pollfd pfd;
612 pfd.fd = fd;
613 pfd.events = events;
614 pfd.revents = 0;
615
616 int timeout = -1;
617 num::Tint64 msecDeadline = 0;
618 if (msecTimeout != infinite)
619 {
620 timeout = num::numCast<int>(msecTimeout);
621 msecDeadline = msecTime() + timeout;
622 }
623
624 int rc = ::poll(&pfd, 1, timeout);
625 while (true)
626 {
627 switch(rc)
628 {
629 case -1:
630 if (errno != EINTR)
631 {
632 return false;
633 }
634 break;
635 case 0:
636 return false;
637 default:
638 LASS_ASSERT( rc == 1 );
639 return pfd.revents & events;
640 }
641
642 if (msecTimeout != infinite)
643 {
644 const num::Tint64 timeleft = msecDeadline - msecTime();
645 if (timeleft < 0)
646 return false;
647 timeout = static_cast<int>(timeleft);
648 }
649
650 rc = ::poll(&pfd, 1, timeout);
651 }
652 }
653
654 num::Tint64 msecTime() const
655 {
656 struct timespec tp;
657 LASS_ENFORCE_CLIB(clock_gettime(CLOCK_MONOTONIC, &tp));
658 return static_cast<num::Tint64>(tp.tv_sec) * 1000 + static_cast<num::Tint64>(tp.tv_nsec) / 1000000;
659 }
660
661 int socket_;
662 int pipe_;
663 int fd_;
664 bool isServer_;
665 char name_[UNIX_PATH_MAX];
666};
667
668#endif
669
670}
671
672// --- MessagePipe ---------------------------------------------------------------------------------
673
674MessagePipe::MessagePipe(size_t bufferSize):
675 pimpl_(new impl::MessagePipeImpl(bufferSize))
676{
677}
678
679
680MessagePipe::~MessagePipe()
681{
682 delete pimpl_;
683}
684
685
686bool MessagePipe::create()
687{
688 return pimpl_->create();
689}
690
691
692bool MessagePipe::connect(const char* pipeName, size_t msecTimeout)
693{
694 return pimpl_->connect(pipeName, msecTimeout);
695}
696
697
698bool MessagePipe::accept(size_t msecTimeout)
699{
700 return pimpl_->accept(msecTimeout);
701}
702
703
704void MessagePipe::close()
705{
706 pimpl_->close();
707}
708
709
710const char* MessagePipe::name() const
711{
712 return pimpl_->name();
713}
714
715
716bool MessagePipe::operator!() const
717{
718 return !(*pimpl_);
719}
720
721
722bool MessagePipe::send(const void* out, size_t size, size_t msecTimeout) const
723{
724 return pimpl_->send(out, size, msecTimeout);
725}
726
727
728bool MessagePipe::receive(void* in, size_t size, size_t msecTimeout) const
729{
730 return pimpl_->receive(in, size, msecTimeout);
731}
732
733
734bool MessagePipe::transact(const void* out, size_t outSize, void* in, size_t inSize, size_t msecTimeout) const
735{
736 return pimpl_->transact(out, outSize, in, inSize, msecTimeout);
737}
738
739
740}
741}
742
743// EOF
streams, binary streams, vrmlstreams, ...
ColorRGBA out(const ColorRGBA &a, const ColorRGBA &b)
a held out by b, part of a outside b.
ColorRGBA in(const ColorRGBA &a, const ColorRGBA &b)
part of a inside b.
Library of Assembled Shared Sources.
Definition config.h:53