31 double y2; |
36 double y2; |
32 } render_cmd; |
37 } render_cmd; |
33 |
38 |
34 #pragma pack(pop) |
39 #pragma pack(pop) |
35 |
40 |
36 // has cb_done/cb_fail already been called? |
41 // have we sent the command yet? |
|
42 int cmd_sent; |
|
43 |
|
44 // have we received the EOF? |
|
45 int have_eof; |
|
46 |
|
47 // has cb_done/cb_fail/cancel already been called? |
37 int alive; |
48 int alive; |
38 |
49 |
39 void (*cb_sent)(void *arg); |
50 void (*cb_sent)(void *arg); |
40 void (*cb_data)(struct evbuffer *buf, void *arg); |
51 void (*cb_data)(struct evbuffer *buf, void *arg); |
41 void (*cb_done)(void *arg); |
52 void (*cb_done)(void *arg); |
42 void (*cb_fail)(void *arg); |
53 void (*cb_fail)(void *arg); |
43 |
54 |
44 void *cb_arg; |
55 void *cb_arg; |
45 }; |
56 }; |
46 |
57 |
47 static void _remote_render_free (struct remote_render_ctx *ctx) { |
58 // internal prototypes |
48 // free the data_bev |
59 static void _render_remote_free (struct render_remote *ctx); |
49 if (ctx->data_bev) { |
60 static void _render_remote_do_data (struct render_remote *ctx); |
50 bufferevent_free(ctx->data_bev); |
61 static void _render_remote_do_done (struct render_remote *ctx); |
51 ctx->data_bev = NULL; |
62 static void _render_remote_do_fail (struct render_remote *ctx); |
52 } |
63 |
53 |
64 static void _render_remote_free (struct render_remote *ctx) { |
54 // close the socket (ctx->ev_conn remains valid even after we're done with it...) |
65 // free the bev_data |
55 close(event_get_fd(ctx->ev_conn)); |
66 if (ctx->bev_data) |
56 |
67 bufferevent_free(ctx->bev_data); |
57 // and the event |
68 |
58 event_free(ctx->ev_conn); |
69 // and the events |
|
70 if (event_pending(&ctx->ev_connect, EV_WRITE, NULL)) |
|
71 event_del(&ctx->ev_connect); |
|
72 |
|
73 if (event_pending(&ctx->ev_data, EV_READ, NULL)) |
|
74 event_del(&ctx->ev_data); |
|
75 |
|
76 // close the socket (ctx->ev_connect remains valid even after we're done with it...) |
|
77 if (ctx->sock) |
|
78 close(ctx->sock); |
59 |
79 |
60 // free the context structure |
80 // free the context structure |
61 free(ctx); |
81 free(ctx); |
62 } |
82 } |
63 |
83 |
64 static void _remote_render_done (struct remote_render_ctx *ctx) { |
84 static void _render_remote_do_data (struct render_remote *ctx) { |
|
85 // if there's data in the buffer, call cb_data |
|
86 if (evbuffer_get_length(bufferevent_get_input(ctx->bev_data))) { |
|
87 ctx->cb_data(EVBUFFER_INPUT(ctx->bev_data), ctx->cb_arg); |
|
88 } |
|
89 |
|
90 // if we got EOF on the connection and there's no data left in the buffer, call cb_done |
|
91 if (ctx->have_eof && evbuffer_get_length(bufferevent_get_input(ctx->bev_data)) == 0) { |
|
92 _render_remote_do_done(ctx); |
|
93 } |
|
94 } |
|
95 |
|
96 static void _render_remote_do_done (struct render_remote *ctx) { |
65 assert(ctx->alive); |
97 assert(ctx->alive); |
66 |
98 |
67 ctx->alive = 0; |
99 ctx->alive = 0; |
68 |
100 |
69 ctx->cb_done(ctx->cb_arg); |
101 ctx->cb_done(ctx->cb_arg); |
70 |
102 } |
71 _remote_render_free(ctx); |
103 |
72 } |
104 static void _render_remote_do_fail (struct render_remote *ctx) { |
73 |
|
74 static void _remote_render_fail (struct remote_render_ctx *ctx) { |
|
75 assert(ctx->alive); |
105 assert(ctx->alive); |
76 |
106 |
77 ctx->alive = 0; |
107 ctx->alive = 0; |
78 |
108 |
79 ctx->cb_fail(ctx->cb_arg); |
109 ctx->cb_fail(ctx->cb_arg); |
80 |
|
81 _remote_render_free(ctx); |
|
82 } |
110 } |
83 |
111 |
84 static void _remote_write (struct bufferevent *bev, void *arg) { |
112 static void _remote_write (struct bufferevent *bev, void *arg) { |
85 struct remote_render_ctx *ctx = arg; |
113 struct render_remote *ctx = arg; |
86 |
114 |
87 // the write buffer was drained, so the render command was sent |
115 if (!ctx->cmd_sent) { |
88 ctx->cb_sent(ctx->cb_arg); |
116 // write the render command |
89 |
117 if (bufferevent_write(ctx->bev_data, &ctx->render_cmd, sizeof(ctx->render_cmd))) |
90 // we don't care about EV_WRITE anymore |
118 ERROR("bufferevent_write"); |
91 if (bufferevent_disable(ctx->data_bev, EV_WRITE)) |
119 |
92 ERROR("bufferevent_disable"); |
120 // wait for it to be written out (we get called a second time) |
93 |
121 ctx->cmd_sent = 1; |
94 // start receiving data |
122 |
95 if (bufferevent_enable(ctx->data_bev, EV_READ)) |
123 } else { |
96 ERROR("bufferevent_enable"); |
124 // the write buffer was drained, so the render command was write():n |
97 |
125 assert(ctx->cb_sent); |
|
126 ctx->cb_sent(ctx->cb_arg); |
|
127 ctx->cb_sent = NULL; |
|
128 |
|
129 // we don't care about EV_WRITE anymore |
|
130 if (bufferevent_disable(ctx->bev_data, EV_WRITE)) |
|
131 ERROR("bufferevent_disable"); |
|
132 |
|
133 // are we buffered or raw? |
|
134 if (ctx->cb_data) { |
|
135 // start receiving data into our buffer |
|
136 if (bufferevent_enable(ctx->bev_data, EV_READ)) |
|
137 ERROR("bufferevent_enable"); |
|
138 |
|
139 } else { |
|
140 assert(event_initialized(&ctx->ev_data)); |
|
141 |
|
142 // enable the raw read event |
|
143 if (event_add(&ctx->ev_data, NULL)) |
|
144 ERROR("event_add"); |
|
145 } |
|
146 } |
|
147 |
98 return; |
148 return; |
99 error: |
149 |
100 _remote_render_fail(ctx); |
150 error: |
|
151 _render_remote_do_fail(ctx); |
101 } |
152 } |
102 |
153 |
103 static void _remote_read (struct bufferevent *bev, void *arg) { |
154 static void _remote_read (struct bufferevent *bev, void *arg) { |
104 struct remote_render_ctx *ctx = arg; |
155 struct render_remote *ctx = arg; |
105 |
156 |
106 // pass the bufferevent's input buffer to our callback - libevent doesn't provide any function to access this, but hopefully this works correctly |
157 _render_remote_do_data(ctx); |
107 ctx->cb_data(EVBUFFER_INPUT(bev), ctx->cb_arg); |
|
108 } |
158 } |
109 |
159 |
110 static void _remote_error (struct bufferevent *bev, short what, void *arg) { |
160 static void _remote_error (struct bufferevent *bev, short what, void *arg) { |
111 struct remote_render_ctx *ctx = arg; |
161 struct render_remote *ctx = arg; |
112 |
162 |
113 // OH NOES; WHAT DO WE DO!? |
163 // OH NOES; WHAT DO WE DO!? |
114 |
164 |
115 if (what & EVBUFFER_EOF) { |
165 if (what & EVBUFFER_EOF) { |
116 // great! |
166 // great! |
117 |
167 ctx->have_eof = 1; |
118 // send any remaining-chunk data |
168 |
119 if (EVBUFFER_LENGTH(EVBUFFER_INPUT(bev)) > 0) |
169 // flush any remaining data/call cb_send as needed |
120 ctx->cb_data(EVBUFFER_INPUT(bev), ctx->cb_arg); |
170 _render_remote_do_data(ctx); |
121 |
|
122 // signal completion |
|
123 _remote_render_done(ctx); |
|
124 |
171 |
125 return; |
172 return; |
126 |
173 |
127 } else if (what & EVBUFFER_ERROR) { |
174 } else if (what & EVBUFFER_ERROR) { |
128 // crap. |
175 // crap. |
135 } else { |
182 } else { |
136 FATAL("weird bufferevent error code: 0x%02X", what); |
183 FATAL("weird bufferevent error code: 0x%02X", what); |
137 } |
184 } |
138 |
185 |
139 // cb_fail + free |
186 // cb_fail + free |
140 _remote_render_fail(ctx); |
187 _render_remote_do_fail(ctx); |
141 } |
188 } |
142 |
189 |
143 static void _remote_connected (int fd, short event, void *arg) { |
190 /* |
144 struct remote_render_ctx *ctx = arg; |
191 * Do the initial IO-agnostic work to initialize the rendering process |
145 |
192 */ |
146 // set up the read/write bufferevent |
193 static struct render_remote *_render_remote_init (struct render *render, struct remote_node *remote_node) { |
147 if ((ctx->data_bev = bufferevent_new(fd, &_remote_read, &_remote_write, &_remote_error, ctx)) == NULL) |
194 struct render_remote *ctx; |
148 ERROR("bufferevent_new"); |
195 |
149 |
196 printf("remote_node render load: %d/%d\n", remote_node->current_load, remote_node->parallel_renders); |
150 // write the render command |
197 |
151 if (bufferevent_write(ctx->data_bev, &ctx->render_cmd, sizeof(ctx->render_cmd))) |
198 // alloc the remote render ctx |
152 ERROR("bufferevent_write"); |
199 if (!(ctx = calloc(1, sizeof(struct render_remote)))) |
153 |
200 ERROR("calloc"); |
154 // wait for it to be written out |
201 |
155 if (bufferevent_enable(ctx->data_bev, EV_WRITE)) |
202 // copy the relevant stuff from the render_ctx |
156 ERROR("bufferevent_enable"); |
|
157 |
|
158 return; |
|
159 |
|
160 error: |
|
161 _remote_render_fail(ctx); |
|
162 } |
|
163 |
|
164 static void render_cmd_build (struct render *render, struct remote_render_ctx *ctx) { |
|
165 // just copy over the render params to the render_cmd |
|
166 ctx->render_cmd.mode = render->mode; |
203 ctx->render_cmd.mode = render->mode; |
167 ctx->render_cmd.img_w = htonl(render->img_w); |
204 ctx->render_cmd.img_w = htonl(render->img_w); |
168 ctx->render_cmd.img_h = htonl(render->img_h); |
205 ctx->render_cmd.img_h = htonl(render->img_h); |
169 ctx->render_cmd.x1 = render->x1; |
206 ctx->render_cmd.x1 = render->x1; |
170 ctx->render_cmd.y1 = render->y1; |
207 ctx->render_cmd.y1 = render->y1; |
171 ctx->render_cmd.x2 = render->x2; |
208 ctx->render_cmd.x2 = render->x2; |
172 ctx->render_cmd.y2 = render->y2; |
209 ctx->render_cmd.y2 = render->y2; |
173 } |
210 |
174 |
211 // create the socket |
175 struct remote_render_ctx *render_remote ( |
212 if ((ctx->sock = socket(remote_node->addr.ss_family, SOCK_STREAM, 0)) < 0) |
|
213 PERROR("socket"); |
|
214 |
|
215 // mark it as nonblocking |
|
216 if (fcntl(ctx->sock, F_SETFL, O_NONBLOCK) == -1) |
|
217 PERROR("fcntl"); |
|
218 |
|
219 // initiate the connect |
|
220 int err = connect(ctx->sock, (struct sockaddr *) &remote_node->addr, sizeof(remote_node->addr)); |
|
221 |
|
222 if (err != -1 || errno != EINPROGRESS) |
|
223 PERROR("connect"); |
|
224 |
|
225 // return the raw ctx |
|
226 return ctx; |
|
227 |
|
228 error: |
|
229 _render_remote_free(ctx); |
|
230 return NULL; |
|
231 } |
|
232 |
|
233 /* |
|
234 * Raw unbuffered I/O mode |
|
235 */ |
|
236 struct render_remote *render_remote_rawio ( |
|
237 struct render *render, |
|
238 struct remote_node *remote_node, |
|
239 void (*cb_sent)(void *arg), |
|
240 void (*cb_fail)(void *arg), |
|
241 void (*cb_io_data)(evutil_socket_t, short, void*), |
|
242 void *cb_arg |
|
243 ) { |
|
244 struct render_remote *ctx; |
|
245 |
|
246 // short-circuit error handling |
|
247 if (!(ctx = _render_remote_init(render, remote_node))) |
|
248 return NULL; |
|
249 |
|
250 // store the provided callback functions |
|
251 ctx->cb_sent = cb_sent; |
|
252 ctx->cb_fail = cb_fail; |
|
253 ctx->cb_arg = cb_arg; |
|
254 |
|
255 // set up the write bufferevent |
|
256 if ((ctx->bev_data = bufferevent_new(ctx->sock, NULL, &_remote_write, &_remote_error, ctx)) == NULL) |
|
257 ERROR("bufferevent_new"); |
|
258 |
|
259 // wait for it to connect |
|
260 if (bufferevent_enable(ctx->bev_data, EV_WRITE)) |
|
261 ERROR("bufferevent_enable"); |
|
262 |
|
263 // set up the custom EV_READ callback |
|
264 event_set(&ctx->ev_data, ctx->sock, EV_READ, cb_io_data, cb_arg); |
|
265 |
|
266 // we are now alive |
|
267 ctx->alive = 1; |
|
268 |
|
269 // success |
|
270 return ctx; |
|
271 |
|
272 error: |
|
273 _render_remote_free(ctx); |
|
274 return NULL; |
|
275 } |
|
276 |
|
277 /* |
|
278 * Old buffered mode |
|
279 */ |
|
280 struct render_remote *render_remote ( |
176 struct render *render, |
281 struct render *render, |
177 struct remote_node *remote_node, |
282 struct remote_node *remote_node, |
178 void (*cb_sent)(void *arg), |
283 void (*cb_sent)(void *arg), |
179 void (*cb_data)(struct evbuffer *buf, void *arg), |
284 void (*cb_data)(struct evbuffer *buf, void *arg), |
180 void (*cb_done)(void *arg), |
285 void (*cb_done)(void *arg), |
181 void (*cb_fail)(void *arg), |
286 void (*cb_fail)(void *arg), |
182 void *cb_arg |
287 void *cb_arg |
183 ) { |
288 ) { |
184 struct remote_render_ctx *ctx; |
289 struct render_remote *ctx; |
185 int sock; |
290 |
186 |
291 // short-circuit error handling |
187 printf("remote_node render load: %d/%d\n", remote_node->current_load, remote_node->parallel_renders); |
292 if (!(ctx = _render_remote_init(render, remote_node))) |
188 |
293 return NULL; |
189 // alloc the remote render ctx |
|
190 if (!(ctx = calloc(1, sizeof(struct remote_render_ctx)))) |
|
191 ERROR("calloc"); |
|
192 |
294 |
193 // store the provided callback functions |
295 // store the provided callback functions |
194 ctx->cb_sent = cb_sent; |
296 ctx->cb_sent = cb_sent; |
195 ctx->cb_data = cb_data; |
297 ctx->cb_data = cb_data; |
196 ctx->cb_done = cb_done; |
298 ctx->cb_done = cb_done; |
197 ctx->cb_fail = cb_fail; |
299 ctx->cb_fail = cb_fail; |
198 ctx->cb_arg = cb_arg; |
300 ctx->cb_arg = cb_arg; |
199 |
301 |
200 // copy the relevant stuff from the render_ctx |
302 // set up the read/write bufferevent |
201 render_cmd_build(render, ctx); |
303 if ((ctx->bev_data = bufferevent_new(ctx->sock, &_remote_read, &_remote_write, &_remote_error, ctx)) == NULL) |
202 |
304 ERROR("bufferevent_new"); |
203 // create the socket |
305 |
204 if ((sock = socket(remote_node->addr.ss_family, SOCK_STREAM, 0)) < 0) |
306 // wait for it to connect |
205 PERROR("socket"); |
307 if (bufferevent_enable(ctx->bev_data, EV_WRITE)) |
206 |
308 ERROR("bufferevent_enable"); |
207 // mark it as nonblocking |
309 |
208 if (fcntl(sock, F_SETFL, O_NONBLOCK) == -1) |
|
209 PERROR("fcntl"); |
|
210 |
|
211 // initiate the connect |
|
212 int err = connect(sock, (struct sockaddr *) &remote_node->addr, sizeof(remote_node->addr)); |
|
213 |
|
214 if (err != -1 || errno != EINPROGRESS) |
|
215 PERROR("connect"); |
|
216 |
|
217 // do the libevent dance |
|
218 if (!(ctx->ev_conn = event_new(NULL, sock, EV_WRITE, &_remote_connected, ctx))) |
|
219 ERROR("event_new"); |
|
220 |
|
221 if (event_add(ctx->ev_conn, NULL)) |
|
222 ERROR("event_add"); |
|
223 |
|
224 // we are now alive |
310 // we are now alive |
225 ctx->alive = 1; |
311 ctx->alive = 1; |
226 |
312 |
227 // success |
313 // success |
228 return ctx; |
314 return ctx; |
229 |
315 |
230 error: |
316 error: |
231 free(ctx); |
317 _render_remote_free(ctx); |
232 |
|
233 if (sock > 0) |
|
234 close(sock); |
|
235 |
|
236 return NULL; |
318 return NULL; |
237 } |
319 } |
238 |
320 |
239 int render_remote_set_recv (struct remote_render_ctx *ctx, size_t recv_threshold, size_t unread_buffer) { |
321 void render_remote_set_recv (struct render_remote *ctx, size_t recv_threshold, size_t unread_buffer) { |
240 if (ctx->data_bev == NULL) |
322 assert(ctx->bev_data); |
241 return -1; |
323 |
242 |
324 bufferevent_setwatermark(ctx->bev_data, EV_READ, recv_threshold, recv_threshold + unread_buffer); |
243 bufferevent_setwatermark(ctx->data_bev, EV_READ, recv_threshold, recv_threshold + unread_buffer); |
325 } |
244 |
326 |
|
327 void render_remote_flush (struct render_remote *ctx) { |
|
328 assert(ctx->bev_data); |
|
329 |
|
330 // call cb_data/cb_done as appropriate |
|
331 _render_remote_do_data(ctx); |
|
332 } |
|
333 |
|
334 int render_remote_reschedule (struct render_remote *ctx) { |
|
335 assert(event_initialized(&ctx->ev_data)); |
|
336 |
|
337 // just reschedule it |
|
338 if (event_add(&ctx->ev_data, NULL)) |
|
339 ERROR("event_add"); |
|
340 |
|
341 // ok |
245 return 0; |
342 return 0; |
246 } |
343 |
247 |
344 error: |
248 int render_remote_shake (struct remote_render_ctx *ctx) { |
345 return -1; |
249 if (ctx->data_bev == NULL) |
346 } |
250 return -1; |
347 |
251 |
348 void render_remote_cancel (struct render_remote *ctx) { |
252 ctx->cb_data(EVBUFFER_INPUT(ctx->data_bev), ctx->cb_arg); |
|
253 |
|
254 return 0; |
|
255 } |
|
256 |
|
257 void render_remote_cancel (struct remote_render_ctx *ctx) { |
|
258 // we must be alive for this.. |
349 // we must be alive for this.. |
259 assert(ctx->alive); |
350 assert(ctx->alive); |
260 |
351 |
261 // if it's still just connecting, cancel that |
352 _render_remote_free(ctx); |
262 if (event_pending(ctx->ev_conn, EV_WRITE, NULL)) |
353 } |
263 event_del(ctx->ev_conn); |
354 |
264 |
355 void render_remote_free (struct render_remote *ctx) { |
265 // this takes care of the rest |
356 // XXX: add some sanity checks |
266 _remote_render_free (ctx); |
357 |
267 } |
358 _render_remote_free(ctx); |
268 |
359 } |
|
360 |