134 // copy the data over, unless it's too large |
140 // copy the data over, unless it's too large |
135 if (prefix <= buf_max) { |
141 if (prefix <= buf_max) { |
136 memcpy(buf_ptr, buf, prefix); |
142 memcpy(buf_ptr, buf, prefix); |
137 |
143 |
138 // trim the bytes out |
144 // trim the bytes out |
139 trim(prefix); |
145 trim(sizeof(PrefixType) + prefix); |
140 |
146 |
141 // return |
147 // return |
142 return prefix; |
148 return prefix; |
143 |
149 |
144 } else { |
150 } else { |
145 // trim the bytes out |
151 // trim the bytes out |
146 trim(prefix); |
152 trim(sizeof(PrefixType) + prefix); |
147 |
153 |
148 throw NetworkBufferError("recv prefix overflow"); |
154 throw NetworkBufferError("recv prefix overflow"); |
149 } |
155 } |
150 } |
156 } |
151 |
157 |
159 ret = socket.send(buf_ptr, buf_size); |
165 ret = socket.send(buf_ptr, buf_size); |
160 |
166 |
161 } catch (CL_Error &e) { |
167 } catch (CL_Error &e) { |
162 // ignore EAGAIN, detect this by setting ret to -1 |
168 // ignore EAGAIN, detect this by setting ret to -1 |
163 if (errno != EAGAIN) |
169 if (errno != EAGAIN) |
164 throw; |
170 throw NetworkSocketOSError(socket, "send"); |
165 |
171 |
166 ret = -1; |
172 ret = -1; |
167 } |
173 } |
168 |
174 |
169 // if we managed to send something, adjust buf/size and buffer |
175 // if we managed to send something, adjust buf/size and buffer |
202 // ignore EAGAIN and just return |
208 // ignore EAGAIN and just return |
203 if (errno == EAGAIN) |
209 if (errno == EAGAIN) |
204 return; |
210 return; |
205 |
211 |
206 else |
212 else |
207 throw; |
213 throw NetworkSocketOSError(socket, "send"); |
208 } |
214 } |
209 |
215 |
210 // trim the buffer |
216 // trim the buffer |
211 trim(ret); |
217 trim(ret); |
212 } |
218 } |
213 |
219 |
214 template <typename PrefixType> void NetworkBuffer::write_prefix (char *buf, PrefixType prefix) { |
220 void NetworkBuffer::write_prefix (char *buf, uint16_t prefix) { |
215 push_write((char*) &prefix, sizeof(PrefixType)); |
221 uint16_t nval = htons(prefix); |
|
222 |
|
223 push_write((char*) &nval, sizeof(uint16_t)); |
216 push_write(buf, prefix); |
224 push_write(buf, prefix); |
217 } |
225 } |
218 |
226 |
|
227 void NetworkBuffer::write_prefix (char *buf, uint32_t prefix) { |
|
228 uint32_t nval = htonl(prefix); |
|
229 |
|
230 push_write((char*) &nval, sizeof(uint32_t)); |
|
231 push_write(buf, prefix); |
|
232 } |
219 |
233 |
220 NetworkTCPTransport::NetworkTCPTransport (NetworkSocket socket) : |
234 NetworkTCPTransport::NetworkTCPTransport (NetworkSocket socket) : |
221 socket(socket), in(socket, NETWORK_TCP_INITIAL_IN_BUF), out(socket, NETWORK_TCP_INITIAL_OUT_BUF) { |
235 socket(socket), in(socket, NETWORK_TCP_INITIAL_IN_BUF), out(socket, NETWORK_TCP_INITIAL_OUT_BUF) { |
222 |
236 |
223 // use nonblocking sockets |
|
224 socket.set_nonblocking(true); |
|
225 |
|
226 // connect signals |
237 // connect signals |
227 slots.connect(socket.sig_read_triggered(), this, &NetworkTCPTransport::on_read); |
238 slots.connect(socket.sig_read_triggered(), this, &NetworkTCPTransport::on_read); |
228 slots.connect(socket.sig_write_triggered(), this, &NetworkTCPTransport::on_write); |
239 slots.connect(socket.sig_write_triggered(), this, &NetworkTCPTransport::on_write); |
229 slots.connect(socket.sig_disconnected(), this, &NetworkTCPTransport::on_disconnected); |
240 slots.connect(socket.sig_disconnected(), this, &NetworkTCPTransport::on_disconnected); |
230 } |
241 } |
254 void NetworkTCPTransport::write_packet (const NetworkPacket &packet) { |
265 void NetworkTCPTransport::write_packet (const NetworkPacket &packet) { |
255 uint16_t prefix = packet.get_data_size(); |
266 uint16_t prefix = packet.get_data_size(); |
256 |
267 |
257 if (prefix != packet.get_data_size()) |
268 if (prefix != packet.get_data_size()) |
258 throw CL_Error("send prefix overflow"); |
269 throw CL_Error("send prefix overflow"); |
259 |
270 |
260 // just write to the output buffer |
271 try { |
261 out.write_prefix<uint16_t>((char *) packet.get_buf(), prefix); |
272 // just write to the output buffer |
|
273 out.write_prefix((char *) packet.get_buf(), prefix); |
|
274 |
|
275 } catch (Error &e) { |
|
276 const char *err = e.what(); |
|
277 |
|
278 Engine::log(ERROR, "tcp.write_packet") << err; |
|
279 |
|
280 throw; |
|
281 } |
262 } |
282 } |
263 |
283 |
264 NetworkTCPServer::NetworkTCPServer (const NetworkAddress &listen_addr) : |
284 NetworkTCPServer::NetworkTCPServer (const NetworkAddress &listen_addr) : |
265 socket(CL_Socket::tcp, CL_Socket::ipv4) { |
285 socket(CL_Socket::tcp, CL_Socket::ipv4) { |
266 |
286 |
270 // assign slots |
290 // assign slots |
271 slots.connect(socket.sig_read_triggered(), this, &NetworkTCPServer::on_accept); |
291 slots.connect(socket.sig_read_triggered(), this, &NetworkTCPServer::on_accept); |
272 |
292 |
273 // listen |
293 // listen |
274 socket.listen(NETWORK_LISTEN_BACKLOG); |
294 socket.listen(NETWORK_LISTEN_BACKLOG); |
|
295 |
|
296 // use nonblocking sockets |
|
297 socket.set_nonblocking(true); |
275 } |
298 } |
276 |
299 |
277 |
300 |
278 void NetworkTCPServer::on_accept (void) { |
301 void NetworkTCPServer::on_accept (void) { |
279 // accept a new socket |
302 // accept a new socket |
293 NetworkTCPClient::NetworkTCPClient (const NetworkAddress &connect_addr) : |
316 NetworkTCPClient::NetworkTCPClient (const NetworkAddress &connect_addr) : |
294 NetworkTCPTransport(NetworkSocket(CL_Socket::tcp, CL_Socket::ipv4)) { |
317 NetworkTCPTransport(NetworkSocket(CL_Socket::tcp, CL_Socket::ipv4)) { |
295 |
318 |
296 // connect |
319 // connect |
297 socket.connect(connect_addr); |
320 socket.connect(connect_addr); |
298 |
321 |
299 } |
322 // use nonblocking sockets |
|
323 socket.set_nonblocking(true); |
|
324 } |