1 #define _GNU_SOURCE |
1 #define _GNU_SOURCE |
2 #include <stdlib.h> |
2 #include <stdlib.h> |
3 #include <sys/queue.h> |
|
4 #include <assert.h> |
3 #include <assert.h> |
5 #include <string.h> |
4 #include <string.h> |
6 |
5 |
7 #include "evsql.h" |
6 #include "evsql.h" |
|
7 #include "evsql_internal.h" |
8 #include "evpq.h" |
8 #include "evpq.h" |
9 #include "lib/log.h" |
9 #include "lib/log.h" |
10 #include "lib/error.h" |
10 #include "lib/error.h" |
11 #include "lib/misc.h" |
11 #include "lib/misc.h" |
12 |
12 |
13 enum evsql_type { |
|
14 EVSQL_EVPQ, |
|
15 }; |
|
16 |
|
17 struct evsql { |
|
18 // callbacks |
|
19 evsql_error_cb error_fn; |
|
20 void *cb_arg; |
|
21 |
|
22 // backend engine |
|
23 enum evsql_type type; |
|
24 |
|
25 union { |
|
26 struct evpq_conn *evpq; |
|
27 } engine; |
|
28 |
|
29 // list of queries running or waiting to run |
|
30 TAILQ_HEAD(evsql_queue, evsql_query) queue; |
|
31 }; |
|
32 |
|
33 struct evsql_query { |
|
34 // the evsql we are querying |
|
35 struct evsql *evsql; |
|
36 |
|
37 // the actual SQL query, this may or may not be ours, see _evsql_query_exec |
|
38 char *command; |
|
39 |
|
40 // possible query params |
|
41 struct evsql_query_param_info { |
|
42 int count; |
|
43 |
|
44 Oid *types; |
|
45 const char **values; |
|
46 int *lengths; |
|
47 int *formats; |
|
48 |
|
49 int result_format; |
|
50 } params; |
|
51 |
|
52 // our callback |
|
53 evsql_query_cb cb_fn; |
|
54 void *cb_arg; |
|
55 |
|
56 // our position in the query list |
|
57 TAILQ_ENTRY(evsql_query) entry; |
|
58 |
|
59 // the result |
|
60 union { |
|
61 PGresult *evpq; |
|
62 } result; |
|
63 }; |
|
64 |
|
65 |
13 |
66 /* |
14 /* |
67 * Actually execute the given query. |
15 * Actually execute the given query. |
68 * |
16 * |
69 * The backend should be able to accept the query at this time. |
17 * The backend should be able to accept the query at this time. |
70 * |
18 * |
71 * query->command must be valid during the execution of this function, but once it returns, the command is not needed |
19 * You should assume that if trying to execute a query fails, then the connection should also be considred as failed. |
72 * anymore, and should be set to NULL. |
20 */ |
73 */ |
21 static int _evsql_query_exec (struct evsql_conn *conn, struct evsql_query *query, const char *command) { |
74 static int _evsql_query_exec (struct evsql *evsql, struct evsql_query *query, const char *command) { |
22 int err; |
75 switch (evsql->type) { |
23 |
|
24 switch (conn->evsql->type) { |
76 case EVSQL_EVPQ: |
25 case EVSQL_EVPQ: |
77 // got params? |
26 // got params? |
78 if (query->params.count) { |
27 if (query->params.count) { |
79 return evpq_query_params(evsql->engine.evpq, command, |
28 err = evpq_query_params(conn->engine.evpq, command, |
80 query->params.count, |
29 query->params.count, |
81 query->params.types, |
30 query->params.types, |
82 query->params.values, |
31 query->params.values, |
83 query->params.lengths, |
32 query->params.lengths, |
84 query->params.formats, |
33 query->params.formats, |
85 query->params.result_format |
34 query->params.result_format |
86 ); |
35 ); |
87 |
36 |
88 } else { |
37 } else { |
89 // plain 'ole query |
38 // plain 'ole query |
90 return evpq_query(evsql->engine.evpq, command); |
39 err = evpq_query(conn->engine.evpq, command); |
91 } |
40 } |
|
41 |
|
42 if (err) { |
|
43 if (PQstatus(evpq_pgconn(conn->engine.evpq)) != CONNECTION_OK) |
|
44 WARNING("conn failed"); |
|
45 else |
|
46 WARNING("query failed, dropping conn as well"); |
|
47 } |
|
48 |
|
49 break; |
92 |
50 |
93 default: |
51 default: |
94 FATAL("evsql->type"); |
52 FATAL("evsql->type"); |
95 } |
53 } |
96 } |
54 |
97 |
55 if (!err) |
98 /* |
56 // assign the query |
99 * Free the query and related resources, doesn't trigger any callbacks or remove from any queues |
57 conn->query = query; |
|
58 |
|
59 return err; |
|
60 } |
|
61 |
|
62 /* |
|
63 * Free the query and related resources, doesn't trigger any callbacks or remove from any queues. |
|
64 * |
|
65 * The command should already be taken care of (NULL). |
100 */ |
66 */ |
101 static void _evsql_query_free (struct evsql_query *query) { |
67 static void _evsql_query_free (struct evsql_query *query) { |
102 assert(query->command == NULL); |
68 assert(query->command == NULL); |
103 |
69 |
104 // free params if present |
70 // free params if present |
110 // free the query itself |
76 // free the query itself |
111 free(query); |
77 free(query); |
112 } |
78 } |
113 |
79 |
114 /* |
80 /* |
115 * Dequeue the query, execute the callback, and free it. |
81 * Execute the callback if res is given, and free the query. |
116 */ |
82 */ |
117 static void _evsql_query_done (struct evsql_query *query, const struct evsql_result_info *res) { |
83 static void _evsql_query_done (struct evsql_query *query, const struct evsql_result_info *res) { |
118 // dequeue |
|
119 TAILQ_REMOVE(&query->evsql->queue, query, entry); |
|
120 |
|
121 if (res) |
84 if (res) |
122 // call the callback |
85 // call the callback |
123 query->cb_fn(res, query->cb_arg); |
86 query->cb_fn(res, query->cb_arg); |
124 |
87 |
125 // free |
88 // free |
126 _evsql_query_free(query); |
89 _evsql_query_free(query); |
127 } |
90 } |
128 |
91 |
129 /* |
92 /* |
130 * A query has failed, notify the user and remove it. |
93 * XXX: |
131 */ |
94 * / |
132 static void _evsql_query_failure (struct evsql *evsql, struct evsql_query *query) { |
95 static void _evsql_destroy (struct evsql *evsql, const struct evsql_result_info *res) { |
|
96 struct evsql_query *query; |
|
97 |
|
98 // clear the queue |
|
99 while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) { |
|
100 _evsql_query_done(query, res); |
|
101 |
|
102 TAILQ_REMOVE(&evsql->query_queue, query, entry); |
|
103 } |
|
104 |
|
105 // free |
|
106 free(evsql); |
|
107 } |
|
108 */ |
|
109 |
|
110 /* |
|
111 * Free the transaction, it should already be deassociated from the query and conn. |
|
112 */ |
|
113 static void _evsql_trans_free (struct evsql_trans *trans) { |
|
114 // ensure we don't leak anything |
|
115 assert(trans->query == NULL); |
|
116 assert(trans->conn == NULL); |
|
117 |
|
118 // free |
|
119 free(trans); |
|
120 } |
|
121 |
|
122 /* |
|
123 * Release a connection. It should already be deassociated from the trans and query. |
|
124 * |
|
125 * Releases the engine, removes from the conn_list and frees this. |
|
126 */ |
|
127 static void _evsql_conn_release (struct evsql_conn *conn) { |
|
128 // ensure we don't leak anything |
|
129 assert(conn->trans == NULL); |
|
130 assert(conn->query == NULL); |
|
131 |
|
132 // release the engine |
|
133 switch (conn->evsql->type) { |
|
134 case EVSQL_EVPQ: |
|
135 evpq_release(conn->engine.evpq); |
|
136 break; |
|
137 |
|
138 default: |
|
139 FATAL("evsql->type"); |
|
140 } |
|
141 |
|
142 // remove from list |
|
143 LIST_REMOVE(conn, entry); |
|
144 |
|
145 // free |
|
146 free(conn); |
|
147 } |
|
148 |
|
149 /* |
|
150 * Fail a single query, this will trigger the callback and free it. |
|
151 */ |
|
152 static void _evsql_query_fail (struct evsql* evsql, struct evsql_query *query) { |
133 struct evsql_result_info res; ZINIT(res); |
153 struct evsql_result_info res; ZINIT(res); |
134 |
154 |
135 // set up the result_info |
155 // set up the result_info |
136 res.evsql = evsql; |
156 res.evsql = evsql; |
137 res.error = 1; |
157 res.error = 1; |
138 |
158 |
139 // finish it off |
159 // finish off the query |
140 _evsql_query_done(query, &res); |
160 _evsql_query_done(query, &res); |
141 } |
161 } |
142 |
162 |
143 /* |
163 /* |
144 * Clear every enqueued query and then free the evsql. |
164 * Fail a transaction, this will silently drop any query, trigger the error callback, two-way-deassociate/release the |
145 * |
165 * conn, and then free the trans. |
146 * If result_info is given, each query will also recieve it via their callback, and the error_fn will be called. |
166 */ |
147 */ |
167 static void _evsql_trans_fail (struct evsql_trans *trans) { |
148 static void _evsql_destroy (struct evsql *evsql, const struct evsql_result_info *res) { |
168 if (trans->query) { |
|
169 // free the query silently |
|
170 _evsql_query_free(trans->query); trans->query = NULL; |
|
171 } |
|
172 |
|
173 // tell the user |
|
174 // XXX: trans is in a bad state during this call |
|
175 trans->error_fn(trans, trans->cb_arg); |
|
176 |
|
177 // fail the conn |
|
178 trans->conn->trans = NULL; _evsql_conn_release(trans->conn); trans->conn = NULL; |
|
179 |
|
180 // free the trans |
|
181 _evsql_trans_free(trans); |
|
182 } |
|
183 |
|
184 /* |
|
185 * Fail a connection. If the connection is transactional, this will just call _evsql_trans_fail, but otherwise it will |
|
186 * fail any ongoing query, and then release the connection. |
|
187 */ |
|
188 static void _evsql_conn_fail (struct evsql_conn *conn) { |
|
189 if (conn->trans) { |
|
190 // let transactions handle their connection failures |
|
191 _evsql_trans_fail(conn->trans); |
|
192 |
|
193 } else { |
|
194 if (conn->query) { |
|
195 // fail the in-progress query |
|
196 _evsql_query_fail(conn->evsql, conn->query); conn->query = NULL; |
|
197 } |
|
198 |
|
199 // finish off the whole connection |
|
200 _evsql_conn_release(conn); |
|
201 } |
|
202 } |
|
203 |
|
204 /* |
|
205 * Processes enqueued non-transactional queries until the queue is empty, or we managed to exec a query. |
|
206 * |
|
207 * If execing a query on a connection fails, both the query and the connection are failed (in that order). |
|
208 * |
|
209 * Any further queries will then also be failed, because there's no reconnection/retry logic yet. |
|
210 */ |
|
211 static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn) { |
149 struct evsql_query *query; |
212 struct evsql_query *query; |
150 |
213 int err; |
151 // clear the queue |
214 |
152 while ((query = TAILQ_FIRST(&evsql->queue)) != NULL) { |
215 // look for waiting queries |
153 _evsql_query_done(query, res); |
216 while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) { |
154 |
217 // dequeue |
155 TAILQ_REMOVE(&evsql->queue, query, entry); |
218 TAILQ_REMOVE(&evsql->query_queue, query, entry); |
156 } |
219 |
157 |
220 if (conn) { |
158 // do the error callback if required |
221 // try and execute it |
159 if (res) |
222 err = _evsql_query_exec(conn, query, query->command); |
160 evsql->error_fn(evsql, evsql->cb_arg); |
|
161 |
|
162 // free |
|
163 free(evsql); |
|
164 } |
|
165 |
|
166 |
|
167 /* |
|
168 * Sends the next query if there are more enqueued |
|
169 */ |
|
170 static void _evsql_pump (struct evsql *evsql) { |
|
171 struct evsql_query *query; |
|
172 |
|
173 // look for the next query |
|
174 if ((query = TAILQ_FIRST(&evsql->queue)) != NULL) { |
|
175 // try and execute it |
|
176 if (_evsql_query_exec(evsql, query, query->command)) { |
|
177 // the query failed |
|
178 _evsql_query_failure(evsql, query); |
|
179 } |
223 } |
180 |
224 |
181 // free the command |
225 // free the command buf |
182 free(query->command); query->command = NULL; |
226 free(query->command); query->command = NULL; |
183 |
227 |
184 // ok, then we just wait |
228 if (err || !conn) { |
185 } |
229 if (!conn) { |
186 } |
230 // warn when dropping queries |
187 |
231 WARNING("failing query becuse there are no conns"); |
188 |
232 } |
189 static void _evsql_evpq_connected (struct evpq_conn *conn, void *arg) { |
233 |
190 struct evsql *evsql = arg; |
234 // fail the query |
191 |
235 _evsql_query_fail(evsql, query); |
192 // no state to update, just pump any waiting queries |
236 |
193 _evsql_pump(evsql); |
237 if (conn) { |
194 } |
238 // fail the connection |
195 |
239 WARNING("failing the connection because a query-exec failed"); |
196 static void _evsql_evpq_result (struct evpq_conn *conn, PGresult *result, void *arg) { |
240 |
197 struct evsql *evsql = arg; |
241 _evsql_conn_fail(conn); conn = NULL; |
198 struct evsql_query *query; |
242 } |
199 |
243 |
200 assert((query = TAILQ_FIRST(&evsql->queue)) != NULL); |
244 } else { |
|
245 // we have succesfully enqueued a query, and we can wait for this connection to complete |
|
246 break; |
|
247 |
|
248 } |
|
249 |
|
250 // handle the rest of the queue |
|
251 } |
|
252 |
|
253 // ok |
|
254 return; |
|
255 } |
|
256 |
|
257 /* |
|
258 * Callback for a trans's 'BEGIN' query, which means the transaction is now ready for use. |
|
259 */ |
|
260 static void _evsql_trans_ready (const struct evsql_result_info *res, void *arg) { |
|
261 (void) arg; |
|
262 |
|
263 assert(res->trans); |
|
264 |
|
265 // check for errors |
|
266 if (res->error) |
|
267 ERROR("transaction 'BEGIN' failed: %s", evsql_result_error(res)); |
|
268 |
|
269 // transaction is now ready for use |
|
270 res->trans->ready_fn(res->trans, res->trans->cb_arg); |
|
271 |
|
272 error: |
|
273 _evsql_trans_fail(res->trans); |
|
274 } |
|
275 |
|
276 /* |
|
277 * The transaction's connection is ready, send the 'BEGIN' query. |
|
278 */ |
|
279 static void _evsql_trans_conn_ready (struct evsql *evsql, struct evsql_trans *trans) { |
|
280 char trans_sql[EVSQL_QUERY_BEGIN_BUF]; |
|
281 const char *isolation_level; |
|
282 int ret; |
|
283 |
|
284 // determine the isolation_level to use |
|
285 switch (trans->type) { |
|
286 case EVSQL_TRANS_DEFAULT: |
|
287 isolation_level = NULL; break; |
|
288 |
|
289 case EVSQL_TRANS_SERIALIZABLE: |
|
290 isolation_level = "SERIALIZABLE"; break; |
|
291 |
|
292 case EVSQL_TRANS_REPEATABLE_READ: |
|
293 isolation_level = "REPEATABLE READ"; break; |
|
294 |
|
295 case EVSQL_TRANS_READ_COMMITTED: |
|
296 isolation_level = "READ COMMITTED"; break; |
|
297 |
|
298 case EVSQL_TRANS_READ_UNCOMMITTED: |
|
299 isolation_level = "READ UNCOMMITTED"; break; |
|
300 |
|
301 default: |
|
302 FATAL("trans->type: %d", trans->type); |
|
303 } |
|
304 |
|
305 // build the trans_sql |
|
306 if (isolation_level) |
|
307 ret = snprintf(trans_sql, EVSQL_QUERY_BEGIN_BUF, "BEGIN TRANSACTION ISOLATION LEVEL %s", isolation_level); |
|
308 else |
|
309 ret = snprintf(trans_sql, EVSQL_QUERY_BEGIN_BUF, "BEGIN TRANSACTION"); |
|
310 |
|
311 // make sure it wasn't truncated |
|
312 if (ret >= EVSQL_QUERY_BEGIN_BUF) |
|
313 ERROR("trans_sql overflow: %d >= %d", ret, EVSQL_QUERY_BEGIN_BUF); |
|
314 |
|
315 // execute the query |
|
316 if (evsql_query(evsql, trans, trans_sql, _evsql_trans_ready, NULL)) |
|
317 ERROR("evsql_query"); |
|
318 |
|
319 // success |
|
320 return; |
|
321 |
|
322 error: |
|
323 // fail the transaction |
|
324 _evsql_trans_fail(trans); |
|
325 } |
|
326 |
|
327 /* |
|
328 * The evpq connection was succesfully established. |
|
329 */ |
|
330 static void _evsql_evpq_connected (struct evpq_conn *_conn, void *arg) { |
|
331 struct evsql_conn *conn = arg; |
|
332 |
|
333 if (conn->trans) |
|
334 // notify the transaction |
|
335 _evsql_trans_conn_ready(conn->evsql, conn->trans); |
|
336 |
|
337 else |
|
338 // pump any waiting transactionless queries |
|
339 _evsql_pump(conn->evsql, conn); |
|
340 } |
|
341 |
|
342 /* |
|
343 * Got one result on this evpq connection. |
|
344 */ |
|
345 static void _evsql_evpq_result (struct evpq_conn *_conn, PGresult *result, void *arg) { |
|
346 struct evsql_conn *conn = arg; |
|
347 struct evsql_query *query = conn->query; |
|
348 |
|
349 assert(query != NULL); |
201 |
350 |
202 // if we get multiple results, only return the first one |
351 // if we get multiple results, only return the first one |
203 if (query->result.evpq) { |
352 if (query->result.evpq) { |
204 WARNING("[evsql] evpq query returned multiple results, discarding previous one"); |
353 WARNING("[evsql] evpq query returned multiple results, discarding previous one"); |
205 |
354 |
235 res.error = 0; |
387 res.error = 0; |
236 res.result.pq = query->result.evpq; |
388 res.result.pq = query->result.evpq; |
237 |
389 |
238 } |
390 } |
239 |
391 |
240 // finish it off |
392 // de-associate the query from the connection |
241 _evsql_query_done(query, &res); |
393 conn->query = NULL; |
242 |
394 |
243 // pump the next one |
395 // how we handle query completion depends on if we're a transaction or not |
244 _evsql_pump(evsql); |
396 if (conn->trans) { |
245 } |
397 // we can deassign the trans's query |
246 |
398 conn->trans->query = NULL; |
247 static void _evsql_evpq_failure (struct evpq_conn *conn, void *arg) { |
399 |
248 struct evsql *evsql = arg; |
400 // then hand the query to the user |
249 struct evsql_result_info result; ZINIT(result); |
401 _evsql_query_done(query, &res); |
250 |
402 |
251 // OH SHI... |
403 } else { |
252 |
404 // a transactionless query, so just finish it off and pump any other waiting ones |
253 // set up the result_info |
405 _evsql_query_done(query, &res); |
254 result.evsql = evsql; |
406 |
255 result.error = 1; |
407 // pump the next one |
256 |
408 _evsql_pump(conn->evsql, conn); |
257 // finish off the whole connection |
409 } |
258 _evsql_destroy(evsql, &result); |
410 } |
259 } |
411 |
260 |
412 /* |
|
413 * The connection failed. |
|
414 */ |
|
415 static void _evsql_evpq_failure (struct evpq_conn *_conn, void *arg) { |
|
416 struct evsql_conn *conn = arg; |
|
417 |
|
418 // just fail the conn |
|
419 _evsql_conn_fail(conn); |
|
420 } |
|
421 |
|
422 /* |
|
423 * Our evpq behaviour |
|
424 */ |
261 static struct evpq_callback_info _evsql_evpq_cb_info = { |
425 static struct evpq_callback_info _evsql_evpq_cb_info = { |
262 .fn_connected = _evsql_evpq_connected, |
426 .fn_connected = _evsql_evpq_connected, |
263 .fn_result = _evsql_evpq_result, |
427 .fn_result = _evsql_evpq_result, |
264 .fn_done = _evsql_evpq_done, |
428 .fn_done = _evsql_evpq_done, |
265 .fn_failure = _evsql_evpq_failure, |
429 .fn_failure = _evsql_evpq_failure, |
266 }; |
430 }; |
267 |
431 |
268 static struct evsql *_evsql_new_base (evsql_error_cb error_fn, void *cb_arg) { |
432 /* |
|
433 * Allocate the generic evsql context. |
|
434 */ |
|
435 static struct evsql *_evsql_new_base (struct event_base *ev_base, evsql_error_cb error_fn, void *cb_arg) { |
269 struct evsql *evsql = NULL; |
436 struct evsql *evsql = NULL; |
270 |
437 |
271 // allocate it |
438 // allocate it |
272 if ((evsql = calloc(1, sizeof(*evsql))) == NULL) |
439 if ((evsql = calloc(1, sizeof(*evsql))) == NULL) |
273 ERROR("calloc"); |
440 ERROR("calloc"); |
274 |
441 |
275 // store |
442 // store |
|
443 evsql->ev_base = ev_base; |
276 evsql->error_fn = error_fn; |
444 evsql->error_fn = error_fn; |
277 evsql->cb_arg = cb_arg; |
445 evsql->cb_arg = cb_arg; |
278 |
446 |
279 // init |
447 // init |
280 TAILQ_INIT(&evsql->queue); |
448 LIST_INIT(&evsql->conn_list); |
|
449 TAILQ_INIT(&evsql->query_queue); |
281 |
450 |
282 // done |
451 // done |
283 return evsql; |
452 return evsql; |
284 |
453 |
285 error: |
454 error: |
286 return NULL; |
455 return NULL; |
287 } |
456 } |
288 |
457 |
|
458 /* |
|
459 * Start a new connection and add it to the list, it won't be ready until _evsql_evpq_connected is called |
|
460 */ |
|
461 static struct evsql_conn *_evsql_conn_new (struct evsql *evsql) { |
|
462 struct evsql_conn *conn = NULL; |
|
463 |
|
464 // allocate |
|
465 if ((conn = calloc(1, sizeof(*conn))) == NULL) |
|
466 ERROR("calloc"); |
|
467 |
|
468 // init |
|
469 conn->evsql = evsql; |
|
470 |
|
471 // connect the engine |
|
472 switch (evsql->type) { |
|
473 case EVSQL_EVPQ: |
|
474 if ((conn->engine.evpq = evpq_connect(evsql->ev_base, evsql->engine_conf.evpq, _evsql_evpq_cb_info, conn)) == NULL) |
|
475 goto error; |
|
476 |
|
477 break; |
|
478 |
|
479 default: |
|
480 FATAL("evsql->type"); |
|
481 } |
|
482 |
|
483 // add it to the list |
|
484 LIST_INSERT_HEAD(&evsql->conn_list, conn, entry); |
|
485 |
|
486 // success |
|
487 return conn; |
|
488 |
|
489 error: |
|
490 free(conn); |
|
491 |
|
492 return NULL; |
|
493 } |
|
494 |
289 struct evsql *evsql_new_pq (struct event_base *ev_base, const char *pq_conninfo, evsql_error_cb error_fn, void *cb_arg) { |
495 struct evsql *evsql_new_pq (struct event_base *ev_base, const char *pq_conninfo, evsql_error_cb error_fn, void *cb_arg) { |
290 struct evsql *evsql = NULL; |
496 struct evsql *evsql = NULL; |
291 |
497 |
292 // base init |
498 // base init |
293 if ((evsql = _evsql_new_base (error_fn, cb_arg)) == NULL) |
499 if ((evsql = _evsql_new_base (ev_base, error_fn, cb_arg)) == NULL) |
294 goto error; |
500 goto error; |
295 |
501 |
296 // connect the engine |
502 // store conf |
297 if ((evsql->engine.evpq = evpq_connect(ev_base, pq_conninfo, _evsql_evpq_cb_info, evsql)) == NULL) |
503 evsql->engine_conf.evpq = pq_conninfo; |
|
504 |
|
505 // pre-create one connection |
|
506 if (_evsql_conn_new(evsql) == NULL) |
298 goto error; |
507 goto error; |
299 |
508 |
300 // done |
509 // done |
301 return evsql; |
510 return evsql; |
302 |
511 |
306 |
515 |
307 return NULL; |
516 return NULL; |
308 } |
517 } |
309 |
518 |
310 /* |
519 /* |
311 * Checks what the state of the connection is in regards to executing a query. |
520 * Checks if the connection is already allocated for some other trans/query. |
312 * |
521 * |
313 * Returns: |
522 * Returns: |
314 * <0 connection failure, query not possible |
523 * 0 connection idle, can be allocated |
315 * 0 connection idle, can query immediately |
524 * >1 connection busy |
316 * 1 connection busy, must queue query |
525 */ |
317 */ |
526 static int _evsql_conn_busy (struct evsql_conn *conn) { |
318 static int _evsql_query_busy (struct evsql *evsql) { |
527 // transactions get the connection to themselves |
319 switch (evsql->type) { |
528 if (conn->trans) |
|
529 return 1; |
|
530 |
|
531 // if it has a query assigned, it's busy |
|
532 if (conn->query) |
|
533 return 1; |
|
534 |
|
535 // otherwise, it's all idle |
|
536 return 0; |
|
537 } |
|
538 |
|
539 /* |
|
540 * Checks if the connection is ready for use (i.e. _evsql_evpq_connected was called). |
|
541 * |
|
542 * The connection should not already have a query running. |
|
543 * |
|
544 * Returns |
|
545 * <0 the connection is not valid (failed, query in progress) |
|
546 * 0 the connection is still pending, and will become ready at some point |
|
547 * >0 it's ready |
|
548 */ |
|
549 static int _evsql_conn_ready (struct evsql_conn *conn) { |
|
550 switch (conn->evsql->type) { |
320 case EVSQL_EVPQ: { |
551 case EVSQL_EVPQ: { |
321 enum evpq_state state = evpq_state(evsql->engine.evpq); |
552 enum evpq_state state = evpq_state(conn->engine.evpq); |
322 |
553 |
323 switch (state) { |
554 switch (state) { |
324 case EVPQ_CONNECT: |
555 case EVPQ_CONNECT: |
325 case EVPQ_QUERY: |
556 return 0; |
326 return 1; |
|
327 |
557 |
328 case EVPQ_CONNECTED: |
558 case EVPQ_CONNECTED: |
329 return 0; |
559 return 1; |
330 |
560 |
|
561 case EVPQ_QUERY: |
331 case EVPQ_INIT: |
562 case EVPQ_INIT: |
332 case EVPQ_FAILURE: |
563 case EVPQ_FAILURE: |
333 return -1; |
564 return -1; |
334 |
565 |
335 default: |
566 default: |
336 FATAL("evpq_state"); |
567 FATAL("evpq_state: %d", state); |
337 } |
568 } |
338 |
569 |
339 } |
570 } |
340 |
571 |
341 default: |
572 default: |
342 FATAL("evsql->type"); |
573 FATAL("evsql->type: %d", conn->evsql->type); |
343 } |
574 } |
344 } |
575 } |
345 |
576 |
346 static struct evsql_query *_evsql_query_new (struct evsql *evsql, evsql_query_cb query_fn, void *cb_arg) { |
577 /* |
|
578 * Allocate a connection for use and return it via *conn_ptr, or if may_queue is nonzero and the connection pool is |
|
579 * getting full, return NULL (query should be queued). |
|
580 * |
|
581 * Note that the returned connection might not be ready for use yet (if we created a new one, see _evsql_conn_ready). |
|
582 * |
|
583 * Returns zero if a connection was found or the request should be queued, or nonzero if something failed and the |
|
584 * request should be dropped. |
|
585 */ |
|
586 static int _evsql_conn_get (struct evsql *evsql, struct evsql_conn **conn_ptr, int may_queue) { |
|
587 *conn_ptr = NULL; |
|
588 |
|
589 // find a connection that isn't busy and is ready (unless the query queue is empty). |
|
590 LIST_FOREACH(*conn_ptr, &evsql->conn_list, entry) { |
|
591 // skip busy conns always |
|
592 if (_evsql_conn_busy(*conn_ptr)) |
|
593 continue; |
|
594 |
|
595 // accept pending conns as long as there are NO enqueued queries (might cause deadlock otherwise) |
|
596 if (_evsql_conn_ready(*conn_ptr) == 0 && TAILQ_EMPTY(&evsql->query_queue)) |
|
597 break; |
|
598 |
|
599 // accept conns that are in a fully ready state |
|
600 if (_evsql_conn_ready(*conn_ptr) > 0) |
|
601 break; |
|
602 } |
|
603 |
|
604 // if we found an idle connection, we can just return that right away |
|
605 if (*conn_ptr) |
|
606 return 0; |
|
607 |
|
608 // return NULL if may_queue and the conn list is not empty |
|
609 if (may_queue && !LIST_EMPTY(&evsql->conn_list)) |
|
610 return 0; |
|
611 |
|
612 // we need to open a new connection |
|
613 if ((*conn_ptr = _evsql_conn_new(evsql)) == NULL) |
|
614 goto error; |
|
615 |
|
616 // good |
|
617 return 0; |
|
618 error: |
|
619 return -1; |
|
620 } |
|
621 |
|
622 /* |
|
623 * Validate and allocate the basic stuff for a new query. |
|
624 */ |
|
625 static struct evsql_query *_evsql_query_new (struct evsql *evsql, struct evsql_trans *trans, evsql_query_cb query_fn, void *cb_arg) { |
347 struct evsql_query *query; |
626 struct evsql_query *query; |
348 |
627 |
|
628 // if it's part of a trans, then make sure the trans is idle |
|
629 if (trans && trans->query) |
|
630 ERROR("transaction is busy"); |
|
631 |
349 // allocate it |
632 // allocate it |
350 if ((query = calloc(1, sizeof(*query))) == NULL) |
633 if ((query = calloc(1, sizeof(*query))) == NULL) |
351 ERROR("calloc"); |
634 ERROR("calloc"); |
352 |
635 |
353 // store |
636 // store |
354 query->evsql = evsql; |
|
355 query->cb_fn = query_fn; |
637 query->cb_fn = query_fn; |
356 query->cb_arg = cb_arg; |
638 query->cb_arg = cb_arg; |
357 |
639 |
358 // success |
640 // success |
359 return query; |
641 return query; |
360 |
642 |
361 error: |
643 error: |
362 return NULL; |
644 return NULL; |
363 } |
645 } |
364 |
646 |
365 static int _evsql_query_enqueue (struct evsql *evsql, struct evsql_query *query, const char *command) { |
647 /* |
366 int busy; |
648 * Handle a new query. |
367 |
649 * |
368 // check state |
650 * For transactions this will associate the query and then execute it, otherwise this will either find an idle |
369 if ((busy = _evsql_query_busy(evsql)) < 0) |
651 * connection and send the query, or enqueue it. |
370 ERROR("connection is not valid"); |
652 */ |
371 |
653 static int _evsql_query_enqueue (struct evsql *evsql, struct evsql_trans *trans, struct evsql_query *query, const char *command) { |
372 if (busy) { |
654 // transaction queries are handled differently |
373 // copy the command for later execution |
655 if (trans) { |
374 if ((query->command = strdup(command)) == NULL) |
656 // it's an in-transaction query |
375 ERROR("strdup"); |
657 assert(trans->query == NULL); |
|
658 |
|
659 // assign the query |
|
660 trans->query = query; |
|
661 |
|
662 // execute directly |
|
663 if (_evsql_query_exec(trans->conn, query, command)) |
|
664 goto error; |
376 |
665 |
377 } else { |
666 } else { |
378 assert(TAILQ_EMPTY(&evsql->queue)); |
667 struct evsql_conn *conn; |
379 |
668 |
380 // execute directly |
669 // find an idle connection |
381 if (_evsql_query_exec(evsql, query, command)) |
670 if ((_evsql_conn_get(evsql, &conn, 1))) |
382 goto error; |
671 ERROR("couldn't allocate a connection for the query"); |
383 |
672 |
384 } |
673 // we must enqueue if no idle conn or the conn is not yet ready |
385 |
674 if (conn && _evsql_conn_ready(conn) > 0) { |
386 // store it on the list |
675 // execute directly |
387 TAILQ_INSERT_TAIL(&evsql->queue, query, entry); |
676 if (_evsql_query_exec(conn, query, command)) |
|
677 goto error; |
|
678 |
|
679 |
|
680 } else { |
|
681 // copy the command for later execution |
|
682 if ((query->command = strdup(command)) == NULL) |
|
683 ERROR("strdup"); |
|
684 |
|
685 // enqueue until some connection pumps the queue |
|
686 TAILQ_INSERT_TAIL(&evsql->query_queue, query, entry); |
|
687 } |
|
688 } |
388 |
689 |
389 // ok, good |
690 // ok, good |
390 return 0; |
691 return 0; |
391 |
692 |
392 error: |
693 error: |
393 return -1; |
694 return -1; |
394 } |
695 } |
395 |
696 |
396 struct evsql_query *evsql_query (struct evsql *evsql, const char *command, evsql_query_cb query_fn, void *cb_arg) { |
697 struct evsql_query *evsql_query (struct evsql *evsql, struct evsql_trans *trans, const char *command, evsql_query_cb query_fn, void *cb_arg) { |
397 struct evsql_query *query = NULL; |
698 struct evsql_query *query = NULL; |
398 |
699 |
399 // alloc new query |
700 // alloc new query |
400 if ((query = _evsql_query_new(evsql, query_fn, cb_arg)) == NULL) |
701 if ((query = _evsql_query_new(evsql, trans, query_fn, cb_arg)) == NULL) |
401 goto error; |
702 goto error; |
402 |
703 |
403 // just execute the command string directly |
704 // just execute the command string directly |
404 if (_evsql_query_enqueue(evsql, query, command)) |
705 if (_evsql_query_enqueue(evsql, trans, query, command)) |
405 goto error; |
706 goto error; |
406 |
707 |
407 // ok |
708 // ok |
408 return query; |
709 return query; |
409 |
710 |
474 _evsql_query_free(query); |
775 _evsql_query_free(query); |
475 |
776 |
476 return NULL; |
777 return NULL; |
477 } |
778 } |
478 |
779 |
479 int evsql_param_string (struct evsql_query_params *params, size_t param, const char *ptr) { |
|
480 struct evsql_query_param *p = ¶ms->list[param]; |
|
481 |
|
482 assert(p->type == EVSQL_PARAM_STRING); |
|
483 |
|
484 p->data_raw = ptr; |
|
485 p->length = 0; |
|
486 |
|
487 return 0; |
|
488 } |
|
489 |
|
490 int evsql_param_uint32 (struct evsql_query_params *params, size_t param, uint32_t uval) { |
|
491 struct evsql_query_param *p = ¶ms->list[param]; |
|
492 |
|
493 assert(p->type == EVSQL_PARAM_UINT32); |
|
494 |
|
495 p->data.uint32 = htonl(uval); |
|
496 p->data_raw = (const char *) &p->data.uint32; |
|
497 p->length = sizeof(uval); |
|
498 |
|
499 return 0; |
|
500 } |
|
501 |
|
502 const char *evsql_result_error (const struct evsql_result_info *res) { |
|
503 if (!res->error) |
|
504 return "No error"; |
|
505 |
|
506 switch (res->evsql->type) { |
|
507 case EVSQL_EVPQ: |
|
508 if (!res->result.pq) |
|
509 return "unknown error (no result)"; |
|
510 |
|
511 return PQresultErrorMessage(res->result.pq); |
|
512 |
|
513 default: |
|
514 FATAL("res->evsql->type"); |
|
515 } |
|
516 |
|
517 } |
|
518 |
|
519 size_t evsql_result_rows (const struct evsql_result_info *res) { |
|
520 switch (res->evsql->type) { |
|
521 case EVSQL_EVPQ: |
|
522 return PQntuples(res->result.pq); |
|
523 |
|
524 default: |
|
525 FATAL("res->evsql->type"); |
|
526 } |
|
527 } |
|
528 |
|
529 size_t evsql_result_cols (const struct evsql_result_info *res) { |
|
530 switch (res->evsql->type) { |
|
531 case EVSQL_EVPQ: |
|
532 return PQnfields(res->result.pq); |
|
533 |
|
534 default: |
|
535 FATAL("res->evsql->type"); |
|
536 } |
|
537 } |
|
538 |
|
539 int evsql_result_binary (const struct evsql_result_info *res, size_t row, size_t col, const char **ptr, size_t size, int nullok) { |
|
540 *ptr = NULL; |
|
541 |
|
542 switch (res->evsql->type) { |
|
543 case EVSQL_EVPQ: |
|
544 if (PQgetisnull(res->result.pq, row, col)) { |
|
545 if (nullok) |
|
546 return 0; |
|
547 else |
|
548 ERROR("[%zu:%zu] field is null", row, col); |
|
549 } |
|
550 |
|
551 if (PQfformat(res->result.pq, col) != 1) |
|
552 ERROR("[%zu:%zu] PQfformat is not binary: %d", row, col, PQfformat(res->result.pq, col)); |
|
553 |
|
554 if (size && PQgetlength(res->result.pq, row, col) != size) |
|
555 ERROR("[%zu:%zu] field size mismatch: %zu -> %d", row, col, size, PQgetlength(res->result.pq, row, col)); |
|
556 |
|
557 *ptr = PQgetvalue(res->result.pq, row, col); |
|
558 |
|
559 return 0; |
|
560 |
|
561 default: |
|
562 FATAL("res->evsql->type"); |
|
563 } |
|
564 |
|
565 error: |
|
566 return -1; |
|
567 } |
|
568 |
|
569 int evsql_result_string (const struct evsql_result_info *res, size_t row, size_t col, const char **ptr, int nullok) { |
|
570 return evsql_result_binary(res, row, col, ptr, 0, nullok); |
|
571 } |
|
572 |
|
573 int evsql_result_uint16 (const struct evsql_result_info *res, size_t row, size_t col, uint16_t *uval, int nullok) { |
|
574 const char *data; |
|
575 int16_t sval; |
|
576 |
|
577 if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok)) |
|
578 goto error; |
|
579 |
|
580 sval = ntohs(*((int16_t *) data)); |
|
581 |
|
582 if (sval < 0) |
|
583 ERROR("negative value for unsigned: %d", sval); |
|
584 |
|
585 *uval = sval; |
|
586 |
|
587 return 0; |
|
588 |
|
589 error: |
|
590 return nullok ? 0 : -1; |
|
591 } |
|
592 |
|
593 int evsql_result_uint32 (const struct evsql_result_info *res, size_t row, size_t col, uint32_t *uval, int nullok) { |
|
594 const char *data; |
|
595 int32_t sval; |
|
596 |
|
597 if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok)) |
|
598 goto error; |
|
599 |
|
600 sval = ntohl(*(int32_t *) data); |
|
601 |
|
602 if (sval < 0) |
|
603 ERROR("negative value for unsigned: %d", sval); |
|
604 |
|
605 *uval = sval; |
|
606 |
|
607 return 0; |
|
608 |
|
609 error: |
|
610 return nullok ? 0 : -1; |
|
611 } |
|
612 |
|
613 int evsql_result_uint64 (const struct evsql_result_info *res, size_t row, size_t col, uint64_t *uval, int nullok) { |
|
614 const char *data; |
|
615 int64_t sval; |
|
616 |
|
617 if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok)) |
|
618 goto error; |
|
619 |
|
620 sval = ntohq(*(int64_t *) data); |
|
621 |
|
622 if (sval < 0) |
|
623 ERROR("negative value for unsigned: %ld", sval); |
|
624 |
|
625 *uval = sval; |
|
626 |
|
627 return 0; |
|
628 |
|
629 error: |
|
630 return nullok ? 0 : -1; |
|
631 } |
|
632 |
|
633 void evsql_result_free (const struct evsql_result_info *res) { |
|
634 switch (res->evsql->type) { |
|
635 case EVSQL_EVPQ: |
|
636 return PQclear(res->result.pq); |
|
637 |
|
638 default: |
|
639 FATAL("res->evsql->type"); |
|
640 } |
|
641 } |
|