|
1 |
|
2 #include <event2/event.h> |
|
3 #include <assert.h> |
|
4 #include <stdlib.h> |
|
5 |
|
6 #include "evpq.h" |
|
7 #include "lib/error.h" |
|
8 |
|
9 struct evpq_conn { |
|
10 struct event_base *ev_base; |
|
11 struct evpq_callback_info user_cb; |
|
12 |
|
13 PGconn *pg_conn; |
|
14 |
|
15 struct event *ev; |
|
16 |
|
17 enum evpq_state state; |
|
18 }; |
|
19 |
|
20 /* |
|
21 * This evpq_conn has experienced a GENERAL FAILURE. |
|
22 */ |
|
23 static void _evpq_failure (struct evpq_conn *conn) { |
|
24 // update state |
|
25 conn->state = EVPQ_FAILURE; |
|
26 |
|
27 // notify |
|
28 conn->user_cb.fn_failure(conn, conn->user_cb.cb_arg); |
|
29 } |
|
30 |
|
31 /* |
|
32 * Initial connect was succesfull |
|
33 */ |
|
34 static void _evpq_connect_ok (struct evpq_conn *conn) { |
|
35 // update state |
|
36 conn->state = EVPQ_CONNECTED; |
|
37 |
|
38 // notify |
|
39 conn->user_cb.fn_connected(conn, conn->user_cb.cb_arg); |
|
40 } |
|
41 |
|
/*
 * The initial connect failed.
 *
 * There is no dedicated "connect failed" notification; this is reported exactly like any other failure,
 * via _evpq_failure().
 */
static void _evpq_connect_fail (struct evpq_conn *conn) {
    _evpq_failure(conn);
}
|
49 |
|
50 /* |
|
51 * Receive a result and gives it to the user. If there was no more results, update state and tell the user. |
|
52 * |
|
53 * Returns zero if we got a result, 1 if there were/are no more results to handle. |
|
54 */ |
|
55 static int _evpq_query_result (struct evpq_conn *conn) { |
|
56 PGresult *result; |
|
57 |
|
58 // get the result |
|
59 if ((result = PQgetResult(conn->pg_conn)) == NULL) { |
|
60 // no more results, update state |
|
61 conn->state = EVPQ_CONNECTED; |
|
62 |
|
63 // tell the user the query is done |
|
64 conn->user_cb.fn_done(conn, conn->user_cb.cb_arg); |
|
65 |
|
66 // stop waiting for more results |
|
67 return 1; |
|
68 |
|
69 } else { |
|
70 // got a result, give it to the user |
|
71 conn->user_cb.fn_result(conn, result, conn->user_cb.cb_arg); |
|
72 |
|
73 // great |
|
74 return 0; |
|
75 } |
|
76 } |
|
77 |
|
78 /* |
|
79 * Schedule a new _evpq_event for this connection. |
|
80 */ |
|
81 static int _evpq_schedule (struct evpq_conn *conn, short what, void (*handler)(evutil_socket_t, short, void *)) { |
|
82 assert(conn->pg_conn != NULL); |
|
83 |
|
84 // ensure we have a valid socket, this should be the case after the PQstatus check... |
|
85 if (PQsocket(conn->pg_conn) < 0) |
|
86 FATAL("PQsocket gave invalid socket"); |
|
87 |
|
88 // reschedule with a new event |
|
89 if (conn->ev) { |
|
90 event_assign(conn->ev, conn->ev_base, PQsocket(conn->pg_conn), what, handler, conn); |
|
91 |
|
92 } else { |
|
93 if ((conn->ev = event_new(conn->ev_base, PQsocket(conn->pg_conn), what, handler, conn)) == NULL) |
|
94 PERROR("event_new"); |
|
95 |
|
96 } |
|
97 |
|
98 // add it |
|
99 // XXX: timeouts? |
|
100 if (event_add(conn->ev, NULL)) |
|
101 PERROR("event_add"); |
|
102 |
|
103 // success |
|
104 return 0; |
|
105 |
|
106 error: |
|
107 return -1; |
|
108 } |
|
109 |
|
110 /* |
|
111 * Handle events on the PQ socket while connecting |
|
112 */ |
|
113 static void _evpq_connect_event (evutil_socket_t fd, short what, void *arg) { |
|
114 struct evpq_conn *conn = arg; |
|
115 PostgresPollingStatusType poll_status; |
|
116 |
|
117 // this is only for connect events |
|
118 assert(conn->state == EVPQ_CONNECT); |
|
119 |
|
120 // XXX: timeouts? |
|
121 |
|
122 // ask PQ what to do |
|
123 switch ((poll_status = PQconnectPoll(conn->pg_conn))) { |
|
124 case PGRES_POLLING_READING: |
|
125 // poll for read |
|
126 what = EV_READ; |
|
127 |
|
128 // reschedule |
|
129 break; |
|
130 |
|
131 case PGRES_POLLING_WRITING: |
|
132 // poll for write |
|
133 what = EV_WRITE; |
|
134 |
|
135 // reschedule |
|
136 break; |
|
137 |
|
138 case PGRES_POLLING_OK: |
|
139 // connected |
|
140 _evpq_connect_ok(conn); |
|
141 |
|
142 // done |
|
143 return; |
|
144 |
|
145 case PGRES_POLLING_FAILED: |
|
146 // faaaaail! |
|
147 _evpq_connect_fail(conn); |
|
148 |
|
149 // done |
|
150 return; |
|
151 |
|
152 default: |
|
153 FATAL("PQconnectPoll gave a weird value: %d", poll_status); |
|
154 } |
|
155 |
|
156 // reschedule |
|
157 if (_evpq_schedule(conn, what, _evpq_connect_event)) |
|
158 goto error; |
|
159 |
|
160 // done, wait for the next event |
|
161 return; |
|
162 |
|
163 error: |
|
164 // XXX: reset? |
|
165 _evpq_failure(conn); |
|
166 } |
|
167 |
|
168 static void _evpq_query_event (evutil_socket_t fd, short what, void *arg) { |
|
169 struct evpq_conn *conn = arg; |
|
170 |
|
171 // this is only for query events |
|
172 assert(conn->state == EVPQ_QUERY); |
|
173 |
|
174 // XXX: PQflush, timeouts |
|
175 assert(what == EV_READ); |
|
176 |
|
177 // we're going to assume that all queries will *require* data for their results |
|
178 // this would break otherwise (PQconsumeInput might block?) |
|
179 assert(PQisBusy(conn->pg_conn) != 0); |
|
180 |
|
181 // handle input |
|
182 if (PQconsumeInput(conn->pg_conn) == 0) |
|
183 ERROR("PQconsumeInput: %s", PQerrorMessage(conn->pg_conn)); |
|
184 |
|
185 // handle results |
|
186 while (PQisBusy(conn->pg_conn) == 0) { |
|
187 // handle the result |
|
188 if (_evpq_query_result(conn) == 1) { |
|
189 // no need to wait for anything anymore |
|
190 return; |
|
191 } |
|
192 |
|
193 // loop to handle the next result |
|
194 } |
|
195 |
|
196 // still need to wait for a result, so reschedule |
|
197 if (_evpq_schedule(conn, EV_READ, _evpq_query_event)) |
|
198 goto error; |
|
199 |
|
200 // done, wait for the next event |
|
201 return; |
|
202 |
|
203 error: |
|
204 // XXX: reset? |
|
205 _evpq_failure(conn); |
|
206 |
|
207 } |
|
208 |
|
209 struct evpq_conn *evpq_connect (struct event_base *ev_base, const char *conninfo, const struct evpq_callback_info cb_info) { |
|
210 struct evpq_conn *conn = NULL; |
|
211 |
|
212 // alloc our context |
|
213 if ((conn = calloc(1, sizeof(*conn))) == NULL) |
|
214 ERROR("calloc"); |
|
215 |
|
216 // initial state |
|
217 conn->ev_base = ev_base; |
|
218 conn->user_cb = cb_info; |
|
219 conn->state = EVPQ_INIT; |
|
220 |
|
221 // create our PGconn |
|
222 if ((conn->pg_conn = PQconnectStart(conninfo)) == NULL) |
|
223 PERROR("PQconnectStart"); |
|
224 |
|
225 // check for immediate failure |
|
226 if (PQstatus(conn->pg_conn) == CONNECTION_BAD) |
|
227 ERROR("PQstatus indicates CONNECTION_BAD after PQconnectStart"); |
|
228 |
|
229 // assume PGRES_POLLING_WRITING |
|
230 if (_evpq_schedule(conn, EV_WRITE, _evpq_connect_event)) |
|
231 goto error; |
|
232 |
|
233 // connecting |
|
234 conn->state = EVPQ_CONNECT; |
|
235 |
|
236 // success, wait for the connection to be established |
|
237 return conn; |
|
238 |
|
239 error: |
|
240 if (conn) { |
|
241 if (conn->pg_conn) |
|
242 PQfinish(conn->pg_conn); |
|
243 |
|
244 free(conn); |
|
245 } |
|
246 |
|
247 return NULL; |
|
248 } |
|
249 |
|
250 int evpq_query (struct evpq_conn *conn, const char *command) { |
|
251 // check state |
|
252 if (conn->state != EVPQ_CONNECTED) |
|
253 ERROR("invalid evpq state: %d", conn->state); |
|
254 |
|
255 // do the query |
|
256 if (PQsendQuery(conn->pg_conn, command) == 0) |
|
257 ERROR("PQsendQuery: %s", PQerrorMessage(conn->pg_conn)); |
|
258 |
|
259 // update state |
|
260 conn->state = EVPQ_QUERY; |
|
261 |
|
262 // XXX: PQflush |
|
263 |
|
264 // poll for read |
|
265 if (_evpq_schedule(conn, EV_READ, _evpq_query_event)) |
|
266 goto error; |
|
267 |
|
268 // and then we wait |
|
269 return 0; |
|
270 |
|
271 error: |
|
272 return -1; |
|
273 } |
|
274 |
|
275 const PGconn *evpq_pgconn (struct evpq_conn *conn) { |
|
276 return conn->pg_conn; |
|
277 } |