Library of Assembled Shared Sources
binary_o_socket.cpp
Go to the documentation of this file.
1/** @file
2 * @author Bram de Greve (bram@cocamware.com)
3 * @author Tom De Muer (tom@cocamware.com)
4 *
5 * *** BEGIN LICENSE INFORMATION ***
6 *
7 * The contents of this file are subject to the Common Public Attribution License
8 * Version 1.0 (the "License"); you may not use this file except in compliance with
9 * the License. You may obtain a copy of the License at
10 * http://lass.sourceforge.net/cpal-license. The License is based on the
11 * Mozilla Public License Version 1.1 but Sections 14 and 15 have been added to cover
12 * use of software over a computer network and provide for limited attribution for
13 * the Original Developer. In addition, Exhibit A has been modified to be consistent
14 * with Exhibit B.
15 *
16 * Software distributed under the License is distributed on an "AS IS" basis, WITHOUT
17 * WARRANTY OF ANY KIND, either express or implied. See the License for the specific
18 * language governing rights and limitations under the License.
19 *
20 * The Original Code is LASS - Library of Assembled Shared Sources.
21 *
22 * The Initial Developer of the Original Code is Bram de Greve and Tom De Muer.
23 * The Original Developer is the Initial Developer.
24 *
25 * All portions of the code written by the Initial Developer are:
26 * Copyright (C) 2004-2011 the Initial Developer.
27 * All Rights Reserved.
28 *
29 * Contributor(s):
30 *
31 * Alternatively, the contents of this file may be used under the terms of the
32 * GNU General Public License Version 2 or later (the GPL), in which case the
33 * provisions of GPL are applicable instead of those above. If you wish to allow use
34 * of your version of this file only under the terms of the GPL and not to allow
35 * others to use your version of this file under the CPAL, indicate your decision by
36 * deleting the provisions above and replace them with the notice and other
37 * provisions required by the GPL License. If you do not delete the provisions above,
38 * a recipient may use your version of this file under either the CPAL or the GPL.
39 *
40 * *** END LICENSE INFORMATION ***
41 */
42
43#include "lass_common.h"
44#include "binary_o_socket.h"
45#include "socket.h"
46#include "../util/callback_0.h"
47#include "../util/thread_fun.h"
48#include "../num/num_cast.h"
49#include <string.h>
50
51namespace lass
52{
53namespace io
54{
55
56// --- public --------------------------------------------------------------------------------------
57
58BinaryOSocket::BinaryOSocket(size_t bufferSize, unsigned long flushPeriod):
60 socket_(0),
61 requestedBufferSize_(bufferSize),
62 current_(0),
63 flushPeriod_(flushPeriod),
64 stopFlushThread_(false),
65 skipABeat_(false)
66{
67 init();
68}
69
70
71
72/**
73 * @param socket [in] BinaryISocket does _not_ take ownership of socket, and it must be alive as long as BinaryISocket needs it
74 * (until BinaryISocket goes out of scope, or another socket is installed).
75 */
76BinaryOSocket::BinaryOSocket(Socket* socket, size_t bufferSize, unsigned long flushPeriod):
77 BinaryOStream(),
78 socket_(socket),
79 requestedBufferSize_(bufferSize),
80 current_(0),
81 flushPeriod_(flushPeriod),
82 stopFlushThread_(false),
83 skipABeat_(false)
84{
85 init();
86}
87
88
89
90BinaryOSocket::~BinaryOSocket()
91{
92 stopFlushThread_ = true;
93 skipABeat_ = false;
94 try
95 {
96 LASS_LOCK(bufferLock_)
97 {
98 flushImpl();
99 }
100 flushCondition_.signal();
101 flushThread_->join();
102 }
103 catch (std::exception& error)
104 {
105 std::cerr << "[LASS RUN MSG] UNDEFINED BEHAVIOUR WARNING: exception thrown in ~BinaryOSocket(): " << error.what() << std::endl;
106 }
107 catch (...)
108 {
109 std::cerr << "[LASS RUN MSG] UNDEFINED BEHAVIOUR WARNING: unknown exception thrown in ~BinaryOSocket()" << std::endl;
110 }
111}
112
113
114
115Socket* BinaryOSocket::socket() const
116{
117 return socket_;
118}
119
120
121
122/**
123 * @param socket [in] BinaryISocket does _not_ take ownership of socket, and it must be alive as long as BinaryISocket needs it
124 * (until BinaryISocket goes out of scope, or another socket is installed).
125 */
127{
128 LASS_LOCK(bufferLock_)
129 {
130 flushImpl();
131 socket_ = socket;
132 init();
133 }
134}
135
136
137
138// --- private -------------------------------------------------------------------------------------
139
140BinaryOSocket::pos_type BinaryOSocket::doTellp() const
141{
142 LASS_THROW("no position in network streams");
143}
144
145
146
147void BinaryOSocket::doSeekp(pos_type)
148{
149 LASS_THROW("no seeking in network streams");
150}
151
152
153
154void BinaryOSocket::doSeekp(off_type, std::ios_base::seekdir)
155{
156 LASS_THROW("no seeking in network streams");
157}
158
159
160
161void BinaryOSocket::doFlush()
162{
163 LASS_LOCK(bufferLock_)
164 {
165 flushImpl();
166 }
167}
168
169
170
171/** write a buffer of bytes to the stream
172 * @par begin pointer to buffer.
173 * @par numberOfBytes length of buffer in bytes.
174 */
175size_t BinaryOSocket::doWrite(const void* begin, size_t numberOfBytes)
176{
177 LASS_LOCK(bufferLock_)
178 {
179 if (!socket_)
180 {
181 setstate(std::ios_base::badbit); // we won't be able to flush this data as there's nothing to flush it to ...
182 return 0;
183 }
184 const char* first = static_cast<const char*>(begin);
185 size_t bytesToWrite = numberOfBytes;
186 while (bytesToWrite > 0)
187 {
188 if (current_ == buffer_.size())
189 {
190 flushImpl();
191 }
192
193 if (!good())
194 {
195 return 0;
196 }
197
198 LASS_ASSERT(current_ < buffer_.size());
199 const size_t freeSize = buffer_.size() - current_;
200 const size_t writeSize = std::min(bytesToWrite, freeSize);
201
202 ::memcpy(&buffer_[current_], first, writeSize);
203 current_ += writeSize;
204 first += writeSize;
205 bytesToWrite -= writeSize;
206 }
207 skipABeat_ = true;
208 }
209
210 return numberOfBytes;
211}
212
213
214
215void BinaryOSocket::init()
216{
217 size_t size = requestedBufferSize_;
218 if (socket_)
219 {
220 const size_t maxSize = num::numCast<size_t>(socket_->sizeSendBuffer());
221 if (size)
222 {
223 size = std::min(size, maxSize);
224 }
225 else
226 {
227 size = maxSize;
228 }
229 }
230 buffer_.resize(size);
231 current_ = 0;
232
233 if (!flushThread_)
234 {
235 flushThread_.reset(util::threadMemFun(this, &BinaryOSocket::flusher, util::threadJoinable));
236 flushThread_->run();
237 }
238}
239
240
241
242void BinaryOSocket::flusher()
243{
244 while (!stopFlushThread_)
245 {
246 LASS_TRY_LOCK( bufferLock_ )
247 {
248 if (!skipABeat_)
249 {
250 flushImpl();
251 }
252 skipABeat_ = false;
253 }
254 flushCondition_.wait(flushPeriod_);
255 }
256}
257
258
259void BinaryOSocket::flushImpl()
260{
261 if (!good() || !socket_ || buffer_.empty())
262 {
263 return;
264 }
265
266 const char* begin = &buffer_[0];
267 const size_t nMax = static_cast<size_t>(num::NumTraits<int>::max);
268
269 while (current_ > 0)
270 {
271 const int n = static_cast<int>(current_ > nMax ? nMax : current_);
272 LASS_ASSERT(n > 0 && static_cast<size_t>(n) <= current_);
273 try
274 {
275 const int sent = socket_->send(begin, n);
276 LASS_ASSERT(sent >= 0 && sent <= n);
277 begin += sent;
278 current_ -= static_cast<size_t>(sent);
279 }
280 catch (const util::Exception&)
281 {
282 setstate(std::ios_base::badbit);
283 return;
284 }
285 }
286}
287
288
289}
290
291}
292
293// EOF
void setSocket(Socket *socket)
base class of binary output streams.
TCP/IP socket.
Definition socket.h:76
#define LASS_LOCK(iLock)
Locks a iLock and starts a scope block in which it remains locked.
Definition thread.h:619
@ threadJoinable
joinable thread, can be waited for
Definition thread.h:111
streams, binary streams, vrmlstreams, ...
Library for Assembled Shared Sources.
Definition config.h:53