12 throw CL_Error("malloc failed"); |
12 throw CL_Error("malloc failed"); |
13 |
13 |
14 // remember size |
14 // remember size |
15 size = size_hint; |
15 size = size_hint; |
16 } |
16 } |
17 |
17 |
18 void NetworkBuffer::resize (size_t new_size) { |
18 ~NetworkBuffer::NetworkBuffer (void) { |
19 // realloc buffer |
19 free(buf); |
20 if ((buf = (char *) realloc((void *) buf, new_size)) == NULL) |
20 } |
21 throw CL_Error("realloc failed"); |
21 |
22 |
22 void NetworkBuffer::resize (size_t suffix_size) { |
23 // update size |
23 size_t new_size = size; |
24 size = new_size; |
24 |
25 } |
25 // grow new_size until item_size fits |
26 |
26 while (offset + item_size > new_size) |
27 void NetworkBuffer::write (const char *buf_ptr, size_t buf_size) { |
27 new_size *= 2; |
|
28 |
|
29 // grow if needed |
|
30 if (new_size != size) { |
|
31 // realloc buffer |
|
32 if ((buf = (char *) realloc((void *) buf, new_size)) == NULL) |
|
33 throw CL_Error("realloc failed"); |
|
34 |
|
35 // update size |
|
36 size = new_size; |
|
37 |
|
38 } else if (new_size > (offset + item_size) * 4) { |
|
39 // XXX: shrink? |
|
40 } |
|
41 } |
|
42 |
|
43 void NetworkBuffer::trim (size_t prefix_size) { |
|
44 // update offset |
|
45 offset -= prefix; |
|
46 |
|
47 // shift the buffer forwards from (buf + prefix) -> (buf), copying (old_offset - prefix) bytes |
|
48 memmove(buf, buf + ret, offset); |
|
49 } |
|
50 |
|
51 bool try_read (size_t item_size) { |
|
52 int ret; |
|
53 size_t to_read = item_size; |
|
54 |
|
55 // keept reads at at least NETWORK_CHUNK_SIZE bytes |
|
56 if (to_read < NETWORK_TCP_CHUNK_SIZE) |
|
57 to_read = NETWORK_TCP_CHUNK_SIZE; |
|
58 |
|
59 // resize buffer if needed |
|
60 resize(to_read); |
|
61 |
|
62 // read once |
|
63 try { |
|
64 ret = socket.recv(buf + offset, to_read); |
|
65 |
|
66 } catch (CL_Error &e) { |
|
67 if (errno == EAGAIN) |
|
68 return false; |
|
69 else |
|
70 throw; |
|
71 } |
|
72 |
|
73 // update offset |
|
74 offset += ret; |
|
75 |
|
76 // did we get enough? |
|
77 if (ret < item_size) |
|
78 return false; |
|
79 else |
|
80 return true; |
|
81 } |
|
82 |
|
83 bool peek_prefix (uint16_t &ref) { |
|
84 if (offset < sizeof(uint16_t)) |
|
85 return false; |
|
86 |
|
87 ret = ntohs(*((uint16_t *) (buf))); |
|
88 |
|
89 return true; |
|
90 } |
|
91 |
|
92 bool peek_prefix (uint32_t &ref) { |
|
93 if (offset < sizeof(uint32_t)) |
|
94 return false; |
|
95 |
|
96 ret = ntohl(*((uint32_t *) (buf))); |
|
97 |
|
98 return true; |
|
99 } |
|
100 |
|
101 template <typename PrefixType> PrefixType read_prefix (char *buf_ptr, size_t buf_max) { |
|
102 PrefixType prefix = 0; |
|
103 size_t missing = 0; |
|
104 |
|
105 do { |
|
106 // do we have the prefix? |
|
107 if (peek_prefix(prefix)) { |
|
108 // do we already have the payload? |
|
109 if (offset >= sizeof(PrefixType) + prefix) { |
|
110 break; |
|
111 |
|
112 } else { |
|
113 missing = (sizeof(PrefixType) + prefix) - offset; |
|
114 } |
|
115 |
|
116 } else { |
|
117 missing = sizeof(PrefixType); |
|
118 } |
|
119 |
|
120 // sanity-check |
|
121 assert(missing); |
|
122 |
|
123 // try and read the missing data |
|
124 if (try_read(missing) == false) { |
|
125 // if unable to read what we need, return zero. |
|
126 return 0; |
|
127 } |
|
128 |
|
129 // assess the situation again |
|
130 } while (true); |
|
131 |
|
132 // copy the data over, unless it's too large |
|
133 if (prefix <= buf_max) { |
|
134 memcpy(buf_ptr, buf, prefix); |
|
135 |
|
136 // trim the bytes out |
|
137 trim(prefix); |
|
138 |
|
139 // return |
|
140 return prefix; |
|
141 |
|
142 } else { |
|
143 // trim the bytes out |
|
144 trim(prefix); |
|
145 |
|
146 throw CL_Error("recv prefix overflow"); |
|
147 } |
|
148 } |
|
149 |
|
150 void NetworkBuffer::push_write (char *buf_ptr, size_t buf_size) { |
28 int ret; |
151 int ret; |
29 |
152 |
30 // try and short-circuit writes unless we have already buffered data |
153 // try and short-circuit writes unless we have already buffered data |
31 if (offset == 0) { |
154 if (offset == 0) { |
32 try { |
155 try { |
53 if (buf_size == 0) |
176 if (buf_size == 0) |
54 return; |
177 return; |
55 } |
178 } |
56 } |
179 } |
57 |
180 |
58 size_t new_size = size; |
181 // resize to fit buf_size more bytes |
59 |
182 resize(buf_size); |
60 // calcluate new buffer size |
183 |
61 while (offset + buf_size > new_size) |
|
62 new_size *= 2; |
|
63 |
|
64 // grow internal buffer if needed |
|
65 if (new_size != size) |
|
66 resize(new_size); |
|
67 |
|
68 // copy into our internal buffer |
184 // copy into our internal buffer |
69 memcpy(buf + offset, buf_ptr, buf_size); |
185 memcpy(buf + offset, buf_ptr, buf_size); |
70 } |
186 } |
71 |
187 |
72 void NetworkBuffer::flush_write (void) { |
188 void NetworkBuffer::flush_write (void) { |
87 |
203 |
88 else |
204 else |
89 throw; |
205 throw; |
90 } |
206 } |
91 |
207 |
92 // update offset |
208 // trim the buffer |
93 offset -= ret; |
209 trim(ret); |
94 |
210 } |
95 // shift the buffer forwards from (buf + ret) -> (buf), copying (old_offset - ret) bytes |
211 |
96 memmove(buf, buf + ret, offset); |
212 template <typename PrefixType> void NetworkBuffer::write_prefix (char *buf, PrefixType prefix) { |
97 } |
213 push_write(&prefix, sizeof(PrefixType)); |
|
214 push_write(buf, prefix); |
|
215 } |
|
216 |
98 |
217 |
99 NetworkTCPTransport::NetworkTCPTransport (CL_Socket socket) : |
218 NetworkTCPTransport::NetworkTCPTransport (CL_Socket socket) : |
100 socket(socket), in(NETWORK_TCP_INITIAL_IN_BUF), out(NETWORK_TCP_INITIAL_OUT_BUF) { |
219 socket(socket), in(NETWORK_TCP_INITIAL_IN_BUF), out(NETWORK_TCP_INITIAL_OUT_BUF) { |
101 |
220 |
102 // use nonblocking sockets |
221 // use nonblocking sockets |
108 slots.connect(socket.sig_disconnected_triggered(), this, &NetworkTCPTransport::on_disconnected); |
227 slots.connect(socket.sig_disconnected_triggered(), this, &NetworkTCPTransport::on_disconnected); |
109 } |
228 } |
110 |
229 |
111 |
230 |
112 void NetworkTCPTransport::on_read (void) { |
231 void NetworkTCPTransport::on_read (void) { |
|
232 uint16_t prefix; |
113 NetworkPacket packet; |
233 NetworkPacket packet; |
114 |
234 |
115 do { |
235 // let the in stream read length-prefixed packets and pass them on to handle_packet |
116 size_t to_read = 0; |
236 while ((prefix = in.read_prefix(packet.get_buf(), packet.get_buf_size)) > 0) { |
117 |
237 packet.set_data_size(prefix); |
118 // guess how much data to receive based on either the given length prefix or our minimim chunk size |
238 sig_packet(packet); |
119 if (in.read_prefix<uint16_t>(to_read) == false || to_read < NETWORK_TCP_CHUNK_SIZE) |
239 } |
120 to_read = NETWORK_TCP_CHUNK_SIZE |
|
121 |
|
122 // do the recv |
|
123 if (in.recv(socket, to_read) == -1) |
|
124 |
|
125 // read out any packets |
|
126 while (in.read_prefix_packet<uint16_t>(packet)) { |
|
127 handle_packet(packet); |
|
128 } |
|
129 } while (...); |
|
130 } |
240 } |
131 |
241 |
132 void NetworkTCPTransport::on_write (void) { |
242 void NetworkTCPTransport::on_write (void) { |
133 // just flush the output buffer |
243 // just flush the output buffer |
134 out.flush_write(); |
244 out.flush_write(); |
135 } |
245 } |
136 |
246 |
137 void NetworkTCPTransport::on_disconnected (void) { |
247 void NetworkTCPTransport::on_disconnected (void) { |
138 |
248 // pass right through |
|
249 sig_disconnect(); |
139 } |
250 } |
140 |
251 |
141 void NetworkTCPTransport::write_packet (const NetworkPacket &packet) { |
252 void NetworkTCPTransport::write_packet (const NetworkPacket &packet) { |
|
253 uint16_t prefix = packet.get_data_size(); |
|
254 |
|
255 if (prefix != packet.get_data_size()) |
|
256 throw CL_Error("send prefix overflow"); |
|
257 |
142 // just write to the output buffer |
258 // just write to the output buffer |
143 out.write(packet.get_buf(), packet.get_size()); |
259 out.write_prefix(packet.get_buf(), prefix); |
144 } |
260 } |
145 |
261 |
146 NetworkTCPServer::NetworkTCPServer (const NetworkAddress &listen_addr) : |
262 NetworkTCPServer::NetworkTCPServer (const NetworkAddress &listen_addr) : |
147 socket(tcp, ipv4) { |
263 socket(tcp, ipv4) { |
148 |
264 |
163 |
279 |
164 // create a new NetworkTCPTransport |
280 // create a new NetworkTCPTransport |
165 NetworkTCPTransport *client = new NetworkTCPTransport(client_sock); |
281 NetworkTCPTransport *client = new NetworkTCPTransport(client_sock); |
166 |
282 |
167 // let our user handle it |
283 // let our user handle it |
168 handle_client(client); |
284 sig_client(client); |
169 } |
285 } |
170 |
286 |
171 NetworkTCPClient::NetworkTCPClient (const NetworkAddress &connect_addr) : |
287 NetworkTCPClient::NetworkTCPClient (const NetworkAddress &connect_addr) : |
172 NetworkTCPTransport(NetworkSocket(tcp, ipv4)) { |
288 NetworkTCPTransport(NetworkSocket(tcp, ipv4)) { |
173 |
289 |