1 |
1 |
2 #include <stdlib.h> |
2 #include <stdlib.h> |
3 #include <unistd.h> |
3 #include <unistd.h> |
4 #include <string.h> |
4 #include <string.h> |
|
5 #include <errno.h> |
5 #include <assert.h> |
6 #include <assert.h> |
6 |
7 |
7 #include "connection.h" |
8 #include "connection.h" |
|
9 #include "command.h" |
|
10 #include "request.h" |
8 #include "../socket.h" |
11 #include "../socket.h" |
9 #include "../common.h" |
12 #include "../common.h" |
10 |
13 |
11 static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg); |
14 static void _memcache_conn_ev_connect (evutil_socket_t fd, short what, void *arg); |
|
15 static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg); |
|
16 static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg); |
|
17 static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg); |
|
18 static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg); |
12 |
19 |
13 struct memcache_conn *memcache_conn_open (struct memcache_server *server) { |
20 struct memcache_conn *memcache_conn_open (struct memcache_server *server) { |
14 struct memcache_conn *conn = NULL; |
21 struct memcache_conn *conn = NULL; |
15 |
22 |
16 if ((conn = calloc(1, sizeof(*conn))) == NULL) |
23 if ((conn = calloc(1, sizeof(*conn))) == NULL) |
17 ERROR("calloc"); |
24 ERROR("calloc"); |
18 |
25 |
19 // remember the server |
26 // remember the server |
20 conn->server = server; |
27 conn->server = server; |
21 |
28 |
22 // attempt connect |
29 // attempt connect |
23 if (memcache_conn_connect(conn)) |
30 if (memcache_conn_connect(conn)) |
24 ERROR("failed to connect to server"); |
31 ERROR("failed to connect to server"); |
25 |
32 |
26 // success |
33 // success |
72 assert(conn->req == NULL); |
79 assert(conn->req == NULL); |
73 |
80 |
74 // store the req |
81 // store the req |
75 conn->req = req; |
82 conn->req = req; |
76 |
83 |
77 // XXX: transmit it |
84 // write the request header into our bufferevent's output buffer |
|
85 if (memcache_cmd_format_header(bufferevent_get_output(conn->bev), req->cmd_type, &req->key, &req->obj)) |
|
86 ERROR("failed to init the cmd"); |
|
87 |
|
88 // tell our bufferevent to send it |
|
89 if (bufferevent_enable(conn->bev, EV_WRITE)) |
|
90 PERROR("bufferevent_enable"); |
|
91 |
|
92 // wait for that to complete |
|
93 return 0; |
78 |
94 |
79 error: |
95 error: |
80 return -1; |
96 return -1; |
|
97 } |
|
98 |
|
99 /* |
|
100 * Start writing out the request data |
|
101 */ |
|
102 void memcache_conn_send_req_data (struct memcache_conn *conn) { |
|
103 // just fake a call to the event handler |
|
104 _memcache_conn_ev_write(conn->fd, EV_WRITE, conn); |
|
105 } |
|
106 |
|
107 /* |
|
108 * Start reading a reply from the connection |
|
109 */ |
|
110 void memcache_conn_handle_reply (struct memcache_conn *conn) { |
|
111 // ensure that we either didn't have a command, or it has been sent |
|
112 assert(conn->req->buf.data == NULL || conn->req->buf.offset == conn->req->buf.len); |
|
113 |
|
114 // start reading on the bufferevent |
|
115 if (bufferevent_enable(conn->bev, EV_READ)) |
|
116 PERROR("bufferevent_enable"); |
|
117 |
|
118 // ok, wait for the reply |
|
119 return; |
|
120 |
|
121 error: |
|
122 // XXX: error handling |
|
123 assert(0); |
81 } |
124 } |
82 |
125 |
83 /* |
126 /* |
84 * The connect() has finished |
127 * The connect() has finished |
85 */ |
128 */ |
91 goto error; |
134 goto error; |
92 |
135 |
93 if (error) |
136 if (error) |
94 ERROR("connect failed: %s", strerror(error)); |
137 ERROR("connect failed: %s", strerror(error)); |
95 |
138 |
|
139 // set up the bufferevent |
|
140 if ((conn->bev = bufferevent_new(fd, |
|
141 &_memcache_conn_bev_read, |
|
142 &_memcache_conn_bev_write, |
|
143 &_memcache_conn_bev_error, |
|
144 conn |
|
145 )) == NULL) |
|
146 ERROR("bufferevent_new"); |
|
147 |
96 // mark us as succesfully connected |
148 // mark us as succesfully connected |
97 conn->is_connected = 1; |
149 conn->is_connected = 1; |
98 |
150 |
99 // notify the server |
151 // notify the server |
100 memcache_server_conn_ready(conn->server, conn); |
152 memcache_server_conn_ready(conn->server, conn); |
103 return; |
155 return; |
104 |
156 |
105 error: |
157 error: |
106 // notify the server |
158 // notify the server |
107 memcache_server_conn_dead(conn->server, conn); |
159 memcache_server_conn_dead(conn->server, conn); |
|
160 } |
|
161 |
|
162 /* |
|
163 * The write buffer is empty, which means that we have written out a command header |
|
164 */ |
|
165 static void _memcache_conn_bev_write (struct bufferevent *bev, void *arg) { |
|
166 struct memcache_conn *conn = arg; |
|
167 |
|
168 // the command header has been sent |
|
169 assert(evbuffer_get_length(bufferevent_get_output(bev)) == 0); |
|
170 |
|
171 // does this request have some data to be included in the request? |
|
172 if (conn->req->buf.data > 0) { |
|
173 // we need to send the request data next |
|
174 memcache_conn_send_req_data(conn); |
|
175 |
|
176 } else { |
|
177 // wait for a reply |
|
178 memcache_conn_handle_reply(conn); |
|
179 } |
|
180 } |
|
181 |
|
182 /* |
|
183 * We have received some reply data, which should include the complete reply line at some point |
|
184 */ |
|
185 static void _memcache_conn_bev_read (struct bufferevent *bev, void *arg) { |
|
186 struct memcache_conn *conn = arg; |
|
187 struct evbuffer *in_buf = bufferevent_get_input(bev); |
|
188 char *header_data; |
|
189 enum memcache_reply reply_type; |
|
190 int has_data; |
|
191 |
|
192 // ensure that we do indeed have some data |
|
193 assert(evbuffer_get_length(in_buf) > 0); |
|
194 |
|
195 // attempt to parse the response header |
|
196 if (memcache_cmd_parse_header(in_buf, &header_data, &reply_type, &conn->req->key, &conn->req->obj, &has_data)) |
|
197 ERROR("memcache_cmd_parse_header"); |
|
198 |
|
199 // XXX: read reply data |
|
200 |
|
201 error: |
|
202 // XXX: error handling |
|
203 return; |
|
204 } |
|
205 |
|
206 |
|
/*
 * Bufferevent error callback.
 *
 * Not implemented yet: any error reported by the bufferevent trips an
 * assertion. None of the arguments are inspected.
 */
static void _memcache_conn_bev_error (struct bufferevent *bev, short what, void *arg) {
    (void) bev;
    (void) what;
    (void) arg;

    // XXX: error handling
    assert(0);
}
|
211 |
|
212 static void _memcache_conn_ev_write (evutil_socket_t fd, short event, void *arg) { |
|
213 struct memcache_conn *conn = arg; |
|
214 struct memcache_buf *buf = &conn->req->buf; |
|
215 int ret; |
|
216 |
|
217 // correct event |
|
218 assert(event == EV_WRITE); |
|
219 |
|
220 // we do indeed have data to send |
|
221 assert(buf->len > 0 && buf->data != NULL && buf->offset < buf->len); |
|
222 |
|
223 // do the actual write() |
|
224 if ((ret = write(fd, buf->data + buf->offset, buf->len - buf->offset)) == -1 && errno != EAGAIN) |
|
225 PERROR("write"); |
|
226 |
|
227 // should never be the case... ? |
|
228 if (ret == 0) |
|
229 ERROR("write returned 0 !?!"); |
|
230 |
|
231 // did we manage to write some data? |
|
232 if (ret > 0) { |
|
233 // update offset |
|
234 buf->offset += ret; |
|
235 } |
|
236 |
|
237 // data left to write? |
|
238 if (buf->offset < buf->len) { |
|
239 // reschedule |
|
240 if (event_add(&conn->ev, NULL)) |
|
241 PERROR("event_add"); |
|
242 |
|
243 } else { |
|
244 // done! We can handle the reply now |
|
245 memcache_conn_handle_reply(conn); |
|
246 } |
|
247 |
|
248 // success |
|
249 return; |
|
250 |
|
251 error: |
|
252 // XXX: error handling |
|
253 assert(0); |
108 } |
254 } |
109 |
255 |
110 void memcache_conn_free (struct memcache_conn *conn) { |
256 void memcache_conn_free (struct memcache_conn *conn) { |
111 // ensure that the connection is not considered to be connected anymore |
257 // ensure that the connection is not considered to be connected anymore |
112 assert(!conn->is_connected); |
258 assert(!conn->is_connected); |