|
1 #include "transport_fd.h" |
|
2 |
|
3 #include "log.h" |
|
4 |
|
5 #include <fcntl.h> |
|
6 #include <unistd.h> |
|
7 #include <assert.h> |
|
8 |
|
9 /** |
|
10 * Our libevent callback |
|
11 */ |
|
12 static void transport_fd_on_event (evutil_socket_t _fd, short ev_what, void *arg) |
|
13 { |
|
14 struct transport_fd *fd = arg; |
|
15 |
|
16 (void) _fd; |
|
17 |
|
18 short what = 0; |
|
19 |
|
20 // build flags |
|
21 if (ev_what & EV_READ) |
|
22 what |= TRANSPORT_READ; |
|
23 |
|
24 if (ev_what & EV_WRITE) |
|
25 what |= TRANSPORT_WRITE; |
|
26 |
|
27 // invoke user callback |
|
28 fd->cb_func(fd, what, fd->cb_arg); |
|
29 } |
|
30 |
|
31 /** |
|
32 * Our transport_methods implementations |
|
33 */ |
|
34 err_t transport_fd__read (transport_t *transport, void *buf, size_t *len, error_t *err) |
|
35 { |
|
36 struct transport_fd *fd = transport_check(transport, &transport_fd_type); |
|
37 int ret; |
|
38 |
|
39 error_reset(err); |
|
40 |
|
41 // read(), and detect non-EAGAIN or EOF |
|
42 if ((ret = read(fd->fd, buf, *len)) < 0 && errno != EAGAIN) |
|
43 // unexpected error |
|
44 return SET_ERROR_ERRNO(err, &libc_errors, ERR_READ); |
|
45 |
|
46 else if (ret == 0) |
|
47 // EOF |
|
48 return SET_ERROR(err, &transport_errors, ERR_TRANSPORT_EOF); |
|
49 |
|
50 |
|
51 if (ret < 0) { |
|
52 // EAGAIN -> zero bytes |
|
53 *len = 0; |
|
54 |
|
55 } else { |
|
56 // normal -> bytes read |
|
57 *len = ret; |
|
58 } |
|
59 |
|
60 // ok |
|
61 return SUCCESS; |
|
62 } |
|
63 |
|
64 err_t transport_fd__write (transport_t *transport, const void *buf, size_t *len, error_t *err) |
|
65 { |
|
66 struct transport_fd *fd = transport_check(transport, &transport_fd_type); |
|
67 int ret; |
|
68 |
|
69 error_reset(err); |
|
70 |
|
71 // write(), and detect non-EAGAIN or EOF |
|
72 if ((ret = write(fd->fd, buf, *len)) < 0 && errno != EAGAIN) |
|
73 // unexpected error |
|
74 return SET_ERROR_ERRNO(err, &libc_errors, ERR_WRITE); |
|
75 |
|
76 else if (ret == 0) |
|
77 // EOF |
|
78 return SET_ERROR(err, &libc_errors, ERR_WRITE_EOF); |
|
79 |
|
80 |
|
81 if (ret < 0) { |
|
82 // EAGAIN -> zero bytes |
|
83 *len = 0; |
|
84 |
|
85 if (transport->info.ev_mask & TRANSPORT_WRITE) |
|
86 // enable the write event |
|
87 if ((ERROR_CODE(err) = transport_fd_enable(fd, TRANSPORT_WRITE))) |
|
88 return ERROR_CODE(err); |
|
89 |
|
90 } else { |
|
91 // normal -> bytes read |
|
92 *len = ret; |
|
93 } |
|
94 |
|
95 return SUCCESS; |
|
96 } |
|
97 |
|
98 err_t transport_fd__events (transport_t *transport, short ev_mask, error_t *err) |
|
99 { |
|
100 struct transport_fd *fd = transport_check(transport, &transport_fd_type); |
|
101 |
|
102 short mask = 0; |
|
103 |
|
104 // enable read as requested |
|
105 if (ev_mask & TRANSPORT_READ) |
|
106 mask |= TRANSPORT_READ; |
|
107 |
|
108 // enable write if requested and it's currently enabled |
|
109 if ((ev_mask & TRANSPORT_WRITE) && event_pending(fd->ev_write, EV_WRITE, NULL)) |
|
110 mask |= TRANSPORT_WRITE; |
|
111 |
|
112 // set |
|
113 return (ERROR_CODE(err) = transport_fd_events(fd, mask)); |
|
114 } |
|
115 |
|
116 void transport_fd__deinit (transport_t *transport) |
|
117 { |
|
118 struct transport_fd *fd = transport_check(transport, &transport_fd_type); |
|
119 |
|
120 transport_fd_deinit(fd); |
|
121 } |
|
122 |
|
123 const struct transport_type transport_fd_type = { |
|
124 .base_type = { |
|
125 .parent = &transport_type_type, |
|
126 }, |
|
127 .methods = { |
|
128 .read = transport_fd__read, |
|
129 .write = transport_fd__write, |
|
130 .events = transport_fd__events, |
|
131 .deinit = transport_fd__deinit |
|
132 } |
|
133 }; |
|
134 |
|
135 /** |
|
136 * Dummy callbacks |
|
137 */ |
|
138 void transport_fd_callback_user (struct transport_fd *fd, short what, void *arg) |
|
139 { |
|
140 (void) arg; |
|
141 |
|
142 // proxy |
|
143 transport_invoke(TRANSPORT_FD_BASE(fd), what); |
|
144 } |
|
145 |
|
146 /** |
|
147 * Function implementations |
|
148 */ |
|
149 void transport_fd_init (struct transport_fd *fd, struct event_base *ev_base, int _fd) |
|
150 { |
|
151 // sanity-check |
|
152 assert(!fd->fd); |
|
153 assert(!fd->ev_read && !fd->ev_write); |
|
154 assert(_fd == TRANSPORT_FD_INVALID || _fd >= 0); |
|
155 |
|
156 // initialize |
|
157 fd->ev_base = ev_base; |
|
158 fd->fd = _fd; |
|
159 fd->cb_func = fd->cb_arg = NULL; |
|
160 } |
|
161 |
|
162 err_t transport_fd_nonblock (struct transport_fd *fd, bool nonblock) |
|
163 { |
|
164 assert(fd->fd != TRANSPORT_FD_INVALID); |
|
165 |
|
166 // XXX: maintain old flags? |
|
167 |
|
168 |
|
169 // set new flags |
|
170 if (fcntl(fd->fd, F_SETFL, nonblock ? O_NONBLOCK : 0) < 0) |
|
171 return ERR_FCNTL; |
|
172 |
|
173 return SUCCESS; |
|
174 } |
|
175 |
|
176 /** |
|
177 * Install our internal event handler. |
|
178 * |
|
179 * The events should not already be set up. |
|
180 * |
|
181 * Cleans up partial events on errors |
|
182 */ |
|
183 err_t transport_fd_install (struct transport_fd *fd) |
|
184 { |
|
185 assert(fd->fd != TRANSPORT_FD_INVALID); |
|
186 assert(!fd->ev_read && !fd->ev_write); |
|
187 |
|
188 // create new events |
|
189 if ((fd->ev_read = event_new(fd->ev_base, fd->fd, EV_READ | EV_PERSIST, transport_fd_on_event, fd)) == NULL) |
|
190 goto err_event_add; |
|
191 |
|
192 if ((fd->ev_write = event_new(fd->ev_base, fd->fd, EV_WRITE, transport_fd_on_event, fd)) == NULL) |
|
193 goto err_event_add; |
|
194 |
|
195 // ok |
|
196 return SUCCESS; |
|
197 |
|
198 err_event_add: |
|
199 // remove partial events |
|
200 transport_fd_clear(fd); |
|
201 |
|
202 return ERR_EVENT_NEW; |
|
203 } |
|
204 |
|
205 err_t transport_fd_setup (struct transport_fd *fd, transport_fd_callback_func cb_func, void *cb_arg) |
|
206 { |
|
207 // requires a valid fd |
|
208 assert(fd->fd != TRANSPORT_FD_INVALID); |
|
209 |
|
210 // store |
|
211 fd->cb_func = cb_func; |
|
212 fd->cb_arg = cb_arg; |
|
213 |
|
214 // install the event handlers? |
|
215 if (!fd->ev_read || !fd->ev_write) |
|
216 return transport_fd_install(fd); |
|
217 else |
|
218 return SUCCESS; |
|
219 } |
|
220 |
|
221 err_t transport_fd_enable (struct transport_fd *fd, short mask) |
|
222 { |
|
223 // just add the appropriate events |
|
224 if (mask & TRANSPORT_READ && event_add(fd->ev_read, NULL)) |
|
225 return ERR_EVENT_ADD; |
|
226 |
|
227 if (mask & TRANSPORT_WRITE && event_add(fd->ev_write, NULL)) |
|
228 return ERR_EVENT_ADD; |
|
229 |
|
230 |
|
231 return SUCCESS; |
|
232 } |
|
233 |
|
234 err_t transport_fd_disable (struct transport_fd *fd, short mask) |
|
235 { |
|
236 if (mask & TRANSPORT_READ && event_del(fd->ev_read)) |
|
237 return ERR_EVENT_DEL; |
|
238 |
|
239 if (mask & TRANSPORT_WRITE && event_del(fd->ev_write)) |
|
240 return ERR_EVENT_DEL; |
|
241 |
|
242 |
|
243 return SUCCESS; |
|
244 } |
|
245 |
|
246 err_t transport_fd_events (struct transport_fd *fd, short mask) |
|
247 { |
|
248 err_t err; |
|
249 |
|
250 // enable/disable read |
|
251 if (mask & TRANSPORT_READ) |
|
252 err = event_add(fd->ev_read, NULL); |
|
253 else |
|
254 err = event_del(fd->ev_read); |
|
255 |
|
256 if (err) |
|
257 return err; |
|
258 |
|
259 // enable/disable write |
|
260 if (mask & TRANSPORT_WRITE) |
|
261 err = event_add(fd->ev_write, NULL); |
|
262 else |
|
263 err = event_del(fd->ev_write); |
|
264 |
|
265 if (err) |
|
266 return err; |
|
267 |
|
268 // ok |
|
269 return SUCCESS; |
|
270 } |
|
271 |
|
272 /** |
|
273 * Remove our current ev_* events, but leave the cb_* intact. |
|
274 */ |
|
275 static void transport_fd_remove (struct transport_fd *fd) |
|
276 { |
|
277 if (fd->ev_read) |
|
278 event_free(fd->ev_read); |
|
279 |
|
280 if (fd->ev_write) |
|
281 event_free(fd->ev_write); |
|
282 |
|
283 fd->ev_read = NULL; |
|
284 fd->ev_write = NULL; |
|
285 } |
|
286 |
|
287 void transport_fd_clear (struct transport_fd *fd) |
|
288 { |
|
289 // remove the events |
|
290 transport_fd_remove(fd); |
|
291 |
|
292 // clear the callbacks |
|
293 fd->cb_func = fd->cb_arg = NULL; |
|
294 } |
|
295 |
|
296 err_t transport_fd_defaults (struct transport_fd *fd) |
|
297 { |
|
298 error_t err; |
|
299 |
|
300 // install the transport_invoke callback handler |
|
301 if ((ERROR_CODE(&err) = transport_fd_setup(fd, transport_fd_callback_user, NULL))) |
|
302 goto error; |
|
303 |
|
304 // enable read unless masked out |
|
305 if (TRANSPORT_FD_BASE(fd)->info.ev_mask & TRANSPORT_READ) { |
|
306 if ((ERROR_CODE(&err) = transport_fd_enable(fd, TRANSPORT_READ))) |
|
307 goto error; |
|
308 } |
|
309 |
|
310 // ok |
|
311 return SUCCESS; |
|
312 |
|
313 error: |
|
314 return ERROR_CODE(&err); |
|
315 } |
|
316 |
|
317 err_t transport_fd_set (struct transport_fd *fd, int _fd, error_t *err) |
|
318 { |
|
319 assert(_fd == TRANSPORT_FD_INVALID || _fd >= 0); |
|
320 |
|
321 // close the old stuff |
|
322 if (transport_fd_close(fd, err)) |
|
323 log_warn_error(err, "close"); |
|
324 |
|
325 // set the new one |
|
326 fd->fd = _fd; |
|
327 |
|
328 // do we have callbacks that we need to setup? |
|
329 if (fd->cb_func) |
|
330 return transport_fd_install(fd); |
|
331 |
|
332 else |
|
333 return SUCCESS; |
|
334 } |
|
335 |
|
336 void transport_fd_invoke (struct transport_fd *fd, short what) |
|
337 { |
|
338 // invoke |
|
339 transport_invoke(TRANSPORT_FD_BASE(fd), what); |
|
340 } |
|
341 |
|
342 err_t transport_fd_close (struct transport_fd *fd, error_t *err) |
|
343 { |
|
344 int _fd = fd->fd; |
|
345 |
|
346 // remove any installed events |
|
347 transport_fd_remove(fd); |
|
348 |
|
349 // invalidate fd |
|
350 fd->fd = TRANSPORT_FD_INVALID; |
|
351 |
|
352 // close the fd |
|
353 if (_fd != TRANSPORT_FD_INVALID && close(_fd)) |
|
354 return SET_ERROR_ERRNO(err, &libc_errors, ERR_CLOSE); |
|
355 |
|
356 return SUCCESS; |
|
357 } |
|
358 |
|
359 void transport_fd_deinit (struct transport_fd *fd) |
|
360 { |
|
361 error_t err; |
|
362 |
|
363 // XXX: this might block |
|
364 if (transport_fd_close(fd, &err)) |
|
365 log_warn_error(&err, "close"); |
|
366 |
|
367 } |
|
368 |