|
1 #define _GNU_SOURCE |
|
2 #include <stdlib.h> |
|
3 #include <assert.h> |
|
4 #include <string.h> |
|
5 |
|
6 #include "evsql.h" |
|
7 #include "../lib/log.h" |
|
8 #include "../lib/error.h" |
|
9 #include "../lib/misc.h" |
|
10 |
|
11 /* |
|
12 * A couple function prototypes |
|
13 */ |
|
14 static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn); |
|
15 |
|
16 /* |
|
17 * Actually execute the given query. |
|
18 * |
|
19 * The backend should be able to accept the query at this time. |
|
20 * |
|
21 * You should assume that if trying to execute a query fails, then the connection should also be considred as failed. |
|
22 */ |
|
23 static int _evsql_query_exec (struct evsql_conn *conn, struct evsql_query *query, const char *command) { |
|
24 int err; |
|
25 |
|
26 switch (conn->evsql->type) { |
|
27 case EVSQL_EVPQ: |
|
28 // got params? |
|
29 if (query->params.count) { |
|
30 err = evpq_query_params(conn->engine.evpq, command, |
|
31 query->params.count, |
|
32 query->params.types, |
|
33 query->params.values, |
|
34 query->params.lengths, |
|
35 query->params.formats, |
|
36 query->params.result_format |
|
37 ); |
|
38 |
|
39 } else { |
|
40 // plain 'ole query |
|
41 err = evpq_query(conn->engine.evpq, command); |
|
42 } |
|
43 |
|
44 if (err) { |
|
45 if (PQstatus(evpq_pgconn(conn->engine.evpq)) != CONNECTION_OK) |
|
46 WARNING("conn failed"); |
|
47 else |
|
48 WARNING("query failed, dropping conn as well"); |
|
49 } |
|
50 |
|
51 break; |
|
52 |
|
53 default: |
|
54 FATAL("evsql->type"); |
|
55 } |
|
56 |
|
57 if (!err) |
|
58 // assign the query |
|
59 conn->query = query; |
|
60 |
|
61 return err; |
|
62 } |
|
63 |
|
64 /* |
|
65 * Free the query and related resources, doesn't trigger any callbacks or remove from any queues. |
|
66 * |
|
67 * The command should already be taken care of (NULL). |
|
68 */ |
|
69 static void _evsql_query_free (struct evsql_query *query) { |
|
70 if (!query) |
|
71 return; |
|
72 |
|
73 assert(query->command == NULL); |
|
74 |
|
75 // free params if present |
|
76 free(query->params.types); |
|
77 free(query->params.values); |
|
78 free(query->params.lengths); |
|
79 free(query->params.formats); |
|
80 |
|
81 // free the query itself |
|
82 free(query); |
|
83 } |
|
84 |
|
85 /* |
|
86 * Execute the callback if res is given, and free the query. |
|
87 * |
|
88 * The query has been aborted, it will simply be freed |
|
89 */ |
|
90 static void _evsql_query_done (struct evsql_query *query, const struct evsql_result_info *res) { |
|
91 if (res) { |
|
92 if (query->cb_fn) |
|
93 // call the callback |
|
94 query->cb_fn(res, query->cb_arg); |
|
95 else |
|
96 WARNING("supressing cb_fn because query was aborted"); |
|
97 } |
|
98 |
|
99 // free |
|
100 _evsql_query_free(query); |
|
101 } |
|
102 |
|
103 /* |
|
104 * XXX: |
|
105 * / |
|
106 static void _evsql_destroy (struct evsql *evsql, const struct evsql_result_info *res) { |
|
107 struct evsql_query *query; |
|
108 |
|
109 // clear the queue |
|
110 while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) { |
|
111 _evsql_query_done(query, res); |
|
112 |
|
113 TAILQ_REMOVE(&evsql->query_queue, query, entry); |
|
114 } |
|
115 |
|
116 // free |
|
117 free(evsql); |
|
118 } |
|
119 */ |
|
120 |
|
121 /* |
|
122 * Free the transaction, it should already be deassociated from the query and conn. |
|
123 */ |
|
124 static void _evsql_trans_free (struct evsql_trans *trans) { |
|
125 // ensure we don't leak anything |
|
126 assert(trans->query == NULL); |
|
127 assert(trans->conn == NULL); |
|
128 |
|
129 // free |
|
130 free(trans); |
|
131 } |
|
132 |
|
133 /* |
|
134 * Release a connection. It should already be deassociated from the trans and query. |
|
135 * |
|
136 * Releases the engine, removes from the conn_list and frees this. |
|
137 */ |
|
138 static void _evsql_conn_release (struct evsql_conn *conn) { |
|
139 // ensure we don't leak anything |
|
140 assert(conn->trans == NULL); |
|
141 assert(conn->query == NULL); |
|
142 |
|
143 // release the engine |
|
144 switch (conn->evsql->type) { |
|
145 case EVSQL_EVPQ: |
|
146 evpq_release(conn->engine.evpq); |
|
147 break; |
|
148 |
|
149 default: |
|
150 FATAL("evsql->type"); |
|
151 } |
|
152 |
|
153 // remove from list |
|
154 LIST_REMOVE(conn, entry); |
|
155 |
|
156 // catch deadlocks |
|
157 assert(!LIST_EMPTY(&conn->evsql->conn_list) || TAILQ_EMPTY(&conn->evsql->query_queue)); |
|
158 |
|
159 // free |
|
160 free(conn); |
|
161 } |
|
162 |
|
163 /* |
|
164 * Release a transaction, it should already be deassociated from the query. |
|
165 * |
|
166 * Perform a two-way-deassociation with the conn, and then free the trans. |
|
167 */ |
|
168 static void _evsql_trans_release (struct evsql_trans *trans) { |
|
169 assert(trans->query == NULL); |
|
170 assert(trans->conn != NULL); |
|
171 |
|
172 // deassociate the conn |
|
173 trans->conn->trans = NULL; trans->conn = NULL; |
|
174 |
|
175 // free the trans |
|
176 _evsql_trans_free(trans); |
|
177 } |
|
178 |
|
179 /* |
|
180 * Fail a single query, this will trigger the callback and free it. |
|
181 * |
|
182 * NOTE: Only for *TRANSACTIONLESS* queries. |
|
183 */ |
|
184 static void _evsql_query_fail (struct evsql* evsql, struct evsql_query *query) { |
|
185 struct evsql_result_info res; ZINIT(res); |
|
186 |
|
187 // set up the result_info |
|
188 res.evsql = evsql; |
|
189 res.trans = NULL; |
|
190 res.error = 1; |
|
191 |
|
192 // finish off the query |
|
193 _evsql_query_done(query, &res); |
|
194 } |
|
195 |
|
196 /* |
|
197 * Fail a transaction, this will silently drop any query, trigger the error callback, two-way-deassociate/release the |
|
198 * conn, and then free the trans. |
|
199 */ |
|
200 static void _evsql_trans_fail (struct evsql_trans *trans) { |
|
201 if (trans->query) { |
|
202 // free the query silently |
|
203 _evsql_query_free(trans->query); trans->query = NULL; |
|
204 |
|
205 // also deassociate it from the conn! |
|
206 trans->conn->query = NULL; |
|
207 } |
|
208 |
|
209 // tell the user |
|
210 // XXX: trans is in a bad state during this call |
|
211 if (trans->error_fn) |
|
212 trans->error_fn(trans, trans->cb_arg); |
|
213 else |
|
214 WARNING("supressing error because error_fn was NULL"); |
|
215 |
|
216 // deassociate and release the conn |
|
217 trans->conn->trans = NULL; _evsql_conn_release(trans->conn); trans->conn = NULL; |
|
218 |
|
219 // pump the queue for requests that were waiting for this connection |
|
220 _evsql_pump(trans->evsql, NULL); |
|
221 |
|
222 // free the trans |
|
223 _evsql_trans_free(trans); |
|
224 } |
|
225 |
|
226 /* |
|
227 * Fail a connection. If the connection is transactional, this will just call _evsql_trans_fail, but otherwise it will |
|
228 * fail any ongoing query, and then release the connection. |
|
229 */ |
|
230 static void _evsql_conn_fail (struct evsql_conn *conn) { |
|
231 if (conn->trans) { |
|
232 // let transactions handle their connection failures |
|
233 _evsql_trans_fail(conn->trans); |
|
234 |
|
235 } else { |
|
236 if (conn->query) { |
|
237 // fail the in-progress query |
|
238 _evsql_query_fail(conn->evsql, conn->query); conn->query = NULL; |
|
239 } |
|
240 |
|
241 // finish off the whole connection |
|
242 _evsql_conn_release(conn); |
|
243 } |
|
244 } |
|
245 |
|
246 /* |
|
247 * Processes enqueued non-transactional queries until the queue is empty, or we managed to exec a query. |
|
248 * |
|
249 * If execing a query on a connection fails, both the query and the connection are failed (in that order). |
|
250 * |
|
251 * Any further queries will then also be failed, because there's no reconnection/retry logic yet. |
|
252 * |
|
253 * This means that if conn is NULL, all queries are failed. |
|
254 */ |
|
255 static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn) { |
|
256 struct evsql_query *query; |
|
257 int err; |
|
258 |
|
259 // look for waiting queries |
|
260 while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) { |
|
261 // dequeue |
|
262 TAILQ_REMOVE(&evsql->query_queue, query, entry); |
|
263 |
|
264 if (conn) { |
|
265 // try and execute it |
|
266 err = _evsql_query_exec(conn, query, query->command); |
|
267 } |
|
268 |
|
269 // free the command buf |
|
270 free(query->command); query->command = NULL; |
|
271 |
|
272 if (err || !conn) { |
|
273 if (!conn) { |
|
274 // warn when dropping queries |
|
275 WARNING("failing query becuse there are no conns"); |
|
276 } |
|
277 |
|
278 // fail the query |
|
279 _evsql_query_fail(evsql, query); |
|
280 |
|
281 if (conn) { |
|
282 // fail the connection |
|
283 WARNING("failing the connection because a query-exec failed"); |
|
284 |
|
285 _evsql_conn_fail(conn); conn = NULL; |
|
286 } |
|
287 |
|
288 } else { |
|
289 // we have succesfully enqueued a query, and we can wait for this connection to complete |
|
290 break; |
|
291 |
|
292 } |
|
293 |
|
294 // handle the rest of the queue |
|
295 } |
|
296 |
|
297 // ok |
|
298 return; |
|
299 } |
|
300 |
|
301 /* |
|
302 * Callback for a trans's 'BEGIN' query, which means the transaction is now ready for use. |
|
303 */ |
|
304 static void _evsql_trans_ready (const struct evsql_result_info *res, void *arg) { |
|
305 (void) arg; |
|
306 |
|
307 assert(res->trans); |
|
308 |
|
309 // check for errors |
|
310 if (res->error) |
|
311 ERROR("transaction 'BEGIN' failed: %s", evsql_result_error(res)); |
|
312 |
|
313 // transaction is now ready for use |
|
314 res->trans->ready_fn(res->trans, res->trans->cb_arg); |
|
315 |
|
316 // good |
|
317 return; |
|
318 |
|
319 error: |
|
320 _evsql_trans_fail(res->trans); |
|
321 } |
|
322 |
|
323 /* |
|
324 * The transaction's connection is ready, send the 'BEGIN' query. |
|
325 * |
|
326 * If anything fails, calls _evsql_trans_fail and returns nonzero, zero on success |
|
327 */ |
|
328 static int _evsql_trans_conn_ready (struct evsql *evsql, struct evsql_trans *trans) { |
|
329 char trans_sql[EVSQL_QUERY_BEGIN_BUF]; |
|
330 const char *isolation_level; |
|
331 int ret; |
|
332 |
|
333 // determine the isolation_level to use |
|
334 switch (trans->type) { |
|
335 case EVSQL_TRANS_DEFAULT: |
|
336 isolation_level = NULL; break; |
|
337 |
|
338 case EVSQL_TRANS_SERIALIZABLE: |
|
339 isolation_level = "SERIALIZABLE"; break; |
|
340 |
|
341 case EVSQL_TRANS_REPEATABLE_READ: |
|
342 isolation_level = "REPEATABLE READ"; break; |
|
343 |
|
344 case EVSQL_TRANS_READ_COMMITTED: |
|
345 isolation_level = "READ COMMITTED"; break; |
|
346 |
|
347 case EVSQL_TRANS_READ_UNCOMMITTED: |
|
348 isolation_level = "READ UNCOMMITTED"; break; |
|
349 |
|
350 default: |
|
351 FATAL("trans->type: %d", trans->type); |
|
352 } |
|
353 |
|
354 // build the trans_sql |
|
355 if (isolation_level) |
|
356 ret = snprintf(trans_sql, EVSQL_QUERY_BEGIN_BUF, "BEGIN TRANSACTION ISOLATION LEVEL %s", isolation_level); |
|
357 else |
|
358 ret = snprintf(trans_sql, EVSQL_QUERY_BEGIN_BUF, "BEGIN TRANSACTION"); |
|
359 |
|
360 // make sure it wasn't truncated |
|
361 if (ret >= EVSQL_QUERY_BEGIN_BUF) |
|
362 ERROR("trans_sql overflow: %d >= %d", ret, EVSQL_QUERY_BEGIN_BUF); |
|
363 |
|
364 // execute the query |
|
365 if (evsql_query(evsql, trans, trans_sql, _evsql_trans_ready, NULL) == NULL) |
|
366 ERROR("evsql_query"); |
|
367 |
|
368 // success |
|
369 return 0; |
|
370 |
|
371 error: |
|
372 // fail the transaction |
|
373 _evsql_trans_fail(trans); |
|
374 |
|
375 return -1; |
|
376 } |
|
377 |
|
378 /* |
|
379 * The evpq connection was succesfully established. |
|
380 */ |
|
381 static void _evsql_evpq_connected (struct evpq_conn *_conn, void *arg) { |
|
382 struct evsql_conn *conn = arg; |
|
383 |
|
384 if (conn->trans) |
|
385 // notify the transaction |
|
386 // don't care about errors |
|
387 (void) _evsql_trans_conn_ready(conn->evsql, conn->trans); |
|
388 |
|
389 else |
|
390 // pump any waiting transactionless queries |
|
391 _evsql_pump(conn->evsql, conn); |
|
392 } |
|
393 |
|
394 /* |
|
395 * Got one result on this evpq connection. |
|
396 */ |
|
397 static void _evsql_evpq_result (struct evpq_conn *_conn, PGresult *result, void *arg) { |
|
398 struct evsql_conn *conn = arg; |
|
399 struct evsql_query *query = conn->query; |
|
400 |
|
401 assert(query != NULL); |
|
402 |
|
403 // if we get multiple results, only return the first one |
|
404 if (query->result.evpq) { |
|
405 WARNING("[evsql] evpq query returned multiple results, discarding previous one"); |
|
406 |
|
407 PQclear(query->result.evpq); query->result.evpq = NULL; |
|
408 } |
|
409 |
|
410 // remember the result |
|
411 query->result.evpq = result; |
|
412 } |
|
413 |
|
414 /* |
|
415 * No more results for this query. |
|
416 */ |
|
417 static void _evsql_evpq_done (struct evpq_conn *_conn, void *arg) { |
|
418 struct evsql_conn *conn = arg; |
|
419 struct evsql_query *query = conn->query; |
|
420 struct evsql_result_info res; ZINIT(res); |
|
421 |
|
422 assert(query != NULL); |
|
423 |
|
424 // set up the result_info |
|
425 res.evsql = conn->evsql; |
|
426 res.trans = conn->trans; |
|
427 |
|
428 if (query->result.evpq == NULL) { |
|
429 // if a query didn't return any results (bug?), warn and fail the query |
|
430 WARNING("[evsql] evpq query didn't return any results"); |
|
431 |
|
432 res.error = 1; |
|
433 |
|
434 } else if (strcmp(PQresultErrorMessage(query->result.evpq), "") != 0) { |
|
435 // the query failed with some error |
|
436 res.error = 1; |
|
437 res.result.pq = query->result.evpq; |
|
438 |
|
439 } else { |
|
440 res.error = 0; |
|
441 res.result.pq = query->result.evpq; |
|
442 |
|
443 } |
|
444 |
|
445 // de-associate the query from the connection |
|
446 conn->query = NULL; |
|
447 |
|
448 // how we handle query completion depends on if we're a transaction or not |
|
449 if (conn->trans) { |
|
450 // we can deassign the trans's query |
|
451 conn->trans->query = NULL; |
|
452 |
|
453 // was an abort? |
|
454 if (!query->cb_fn) |
|
455 // notify the user that the transaction query has been aborted |
|
456 conn->trans->ready_fn(conn->trans, conn->trans->cb_arg); |
|
457 |
|
458 // then hand the query to the user |
|
459 _evsql_query_done(query, &res); |
|
460 |
|
461 } else { |
|
462 // a transactionless query, so just finish it off and pump any other waiting ones |
|
463 _evsql_query_done(query, &res); |
|
464 |
|
465 // pump the next one |
|
466 _evsql_pump(conn->evsql, conn); |
|
467 } |
|
468 } |
|
469 |
|
470 /* |
|
471 * The connection failed. |
|
472 */ |
|
473 static void _evsql_evpq_failure (struct evpq_conn *_conn, void *arg) { |
|
474 struct evsql_conn *conn = arg; |
|
475 |
|
476 // just fail the conn |
|
477 _evsql_conn_fail(conn); |
|
478 } |
|
479 |
|
480 /* |
|
481 * Our evpq behaviour |
|
482 */ |
|
483 static struct evpq_callback_info _evsql_evpq_cb_info = { |
|
484 .fn_connected = _evsql_evpq_connected, |
|
485 .fn_result = _evsql_evpq_result, |
|
486 .fn_done = _evsql_evpq_done, |
|
487 .fn_failure = _evsql_evpq_failure, |
|
488 }; |
|
489 |
|
490 /* |
|
491 * Allocate the generic evsql context. |
|
492 */ |
|
493 static struct evsql *_evsql_new_base (struct event_base *ev_base, evsql_error_cb error_fn, void *cb_arg) { |
|
494 struct evsql *evsql = NULL; |
|
495 |
|
496 // allocate it |
|
497 if ((evsql = calloc(1, sizeof(*evsql))) == NULL) |
|
498 ERROR("calloc"); |
|
499 |
|
500 // store |
|
501 evsql->ev_base = ev_base; |
|
502 evsql->error_fn = error_fn; |
|
503 evsql->cb_arg = cb_arg; |
|
504 |
|
505 // init |
|
506 LIST_INIT(&evsql->conn_list); |
|
507 TAILQ_INIT(&evsql->query_queue); |
|
508 |
|
509 // done |
|
510 return evsql; |
|
511 |
|
512 error: |
|
513 return NULL; |
|
514 } |
|
515 |
|
516 /* |
|
517 * Start a new connection and add it to the list, it won't be ready until _evsql_evpq_connected is called |
|
518 */ |
|
519 static struct evsql_conn *_evsql_conn_new (struct evsql *evsql) { |
|
520 struct evsql_conn *conn = NULL; |
|
521 |
|
522 // allocate |
|
523 if ((conn = calloc(1, sizeof(*conn))) == NULL) |
|
524 ERROR("calloc"); |
|
525 |
|
526 // init |
|
527 conn->evsql = evsql; |
|
528 |
|
529 // connect the engine |
|
530 switch (evsql->type) { |
|
531 case EVSQL_EVPQ: |
|
532 if ((conn->engine.evpq = evpq_connect(evsql->ev_base, evsql->engine_conf.evpq, _evsql_evpq_cb_info, conn)) == NULL) |
|
533 goto error; |
|
534 |
|
535 break; |
|
536 |
|
537 default: |
|
538 FATAL("evsql->type"); |
|
539 } |
|
540 |
|
541 // add it to the list |
|
542 LIST_INSERT_HEAD(&evsql->conn_list, conn, entry); |
|
543 |
|
544 // success |
|
545 return conn; |
|
546 |
|
547 error: |
|
548 free(conn); |
|
549 |
|
550 return NULL; |
|
551 } |
|
552 |
|
553 struct evsql *evsql_new_pq (struct event_base *ev_base, const char *pq_conninfo, evsql_error_cb error_fn, void *cb_arg) { |
|
554 struct evsql *evsql = NULL; |
|
555 |
|
556 // base init |
|
557 if ((evsql = _evsql_new_base (ev_base, error_fn, cb_arg)) == NULL) |
|
558 goto error; |
|
559 |
|
560 // store conf |
|
561 evsql->engine_conf.evpq = pq_conninfo; |
|
562 |
|
563 // pre-create one connection |
|
564 if (_evsql_conn_new(evsql) == NULL) |
|
565 goto error; |
|
566 |
|
567 // done |
|
568 return evsql; |
|
569 |
|
570 error: |
|
571 // XXX: more complicated than this? |
|
572 free(evsql); |
|
573 |
|
574 return NULL; |
|
575 } |
|
576 |
|
577 /* |
|
578 * Checks if the connection is already allocated for some other trans/query. |
|
579 * |
|
580 * Returns: |
|
581 * 0 connection idle, can be allocated |
|
582 * >1 connection busy |
|
583 */ |
|
584 static int _evsql_conn_busy (struct evsql_conn *conn) { |
|
585 // transactions get the connection to themselves |
|
586 if (conn->trans) |
|
587 return 1; |
|
588 |
|
589 // if it has a query assigned, it's busy |
|
590 if (conn->query) |
|
591 return 1; |
|
592 |
|
593 // otherwise, it's all idle |
|
594 return 0; |
|
595 } |
|
596 |
|
597 /* |
|
598 * Checks if the connection is ready for use (i.e. _evsql_evpq_connected was called). |
|
599 * |
|
600 * The connection should not already have a query running. |
|
601 * |
|
602 * Returns |
|
603 * <0 the connection is not valid (failed, query in progress) |
|
604 * 0 the connection is still pending, and will become ready at some point |
|
605 * >0 it's ready |
|
606 */ |
|
607 static int _evsql_conn_ready (struct evsql_conn *conn) { |
|
608 switch (conn->evsql->type) { |
|
609 case EVSQL_EVPQ: { |
|
610 enum evpq_state state = evpq_state(conn->engine.evpq); |
|
611 |
|
612 switch (state) { |
|
613 case EVPQ_CONNECT: |
|
614 return 0; |
|
615 |
|
616 case EVPQ_CONNECTED: |
|
617 return 1; |
|
618 |
|
619 case EVPQ_QUERY: |
|
620 case EVPQ_INIT: |
|
621 case EVPQ_FAILURE: |
|
622 return -1; |
|
623 |
|
624 default: |
|
625 FATAL("evpq_state: %d", state); |
|
626 } |
|
627 |
|
628 } |
|
629 |
|
630 default: |
|
631 FATAL("evsql->type: %d", conn->evsql->type); |
|
632 } |
|
633 } |
|
634 |
|
635 /* |
|
636 * Allocate a connection for use and return it via *conn_ptr, or if may_queue is nonzero and the connection pool is |
|
637 * getting full, return NULL (query should be queued). |
|
638 * |
|
639 * Note that the returned connection might not be ready for use yet (if we created a new one, see _evsql_conn_ready). |
|
640 * |
|
641 * Returns zero if a connection was found or the request should be queued, or nonzero if something failed and the |
|
642 * request should be dropped. |
|
643 */ |
|
644 static int _evsql_conn_get (struct evsql *evsql, struct evsql_conn **conn_ptr, int may_queue) { |
|
645 int have_nontrans = 0; |
|
646 *conn_ptr = NULL; |
|
647 |
|
648 // find a connection that isn't busy and is ready (unless the query queue is empty). |
|
649 LIST_FOREACH(*conn_ptr, &evsql->conn_list, entry) { |
|
650 // we can only have a query enqueue itself if there is a non-trans conn it can later use |
|
651 if (!(*conn_ptr)->trans) |
|
652 have_nontrans = 1; |
|
653 |
|
654 // skip busy conns always |
|
655 if (_evsql_conn_busy(*conn_ptr)) |
|
656 continue; |
|
657 |
|
658 // accept pending conns as long as there are NO enqueued queries (might cause deadlock otherwise) |
|
659 if (_evsql_conn_ready(*conn_ptr) == 0 && TAILQ_EMPTY(&evsql->query_queue)) |
|
660 break; |
|
661 |
|
662 // accept conns that are in a fully ready state |
|
663 if (_evsql_conn_ready(*conn_ptr) > 0) |
|
664 break; |
|
665 } |
|
666 |
|
667 // if we found an idle connection, we can just return that right away |
|
668 if (*conn_ptr) |
|
669 return 0; |
|
670 |
|
671 // return NULL if may_queue and we have a non-trans conn that we can, at some point, use |
|
672 if (may_queue && have_nontrans) |
|
673 return 0; |
|
674 |
|
675 // we need to open a new connection |
|
676 if ((*conn_ptr = _evsql_conn_new(evsql)) == NULL) |
|
677 goto error; |
|
678 |
|
679 // good |
|
680 return 0; |
|
681 error: |
|
682 return -1; |
|
683 } |
|
684 |
|
685 struct evsql_trans *evsql_trans (struct evsql *evsql, enum evsql_trans_type type, evsql_trans_error_cb error_fn, evsql_trans_ready_cb ready_fn, evsql_trans_done_cb done_fn, void *cb_arg) { |
|
686 struct evsql_trans *trans = NULL; |
|
687 |
|
688 // allocate it |
|
689 if ((trans = calloc(1, sizeof(*trans))) == NULL) |
|
690 ERROR("calloc"); |
|
691 |
|
692 // store |
|
693 trans->evsql = evsql; |
|
694 trans->ready_fn = ready_fn; |
|
695 trans->done_fn = done_fn; |
|
696 trans->cb_arg = cb_arg; |
|
697 trans->type = type; |
|
698 |
|
699 // find a connection |
|
700 if (_evsql_conn_get(evsql, &trans->conn, 0)) |
|
701 ERROR("_evsql_conn_get"); |
|
702 |
|
703 // associate the conn |
|
704 trans->conn->trans = trans; |
|
705 |
|
706 // is it already ready? |
|
707 if (_evsql_conn_ready(trans->conn) > 0) { |
|
708 // call _evsql_trans_conn_ready directly, it will handle cleanup (silently, !error_fn) |
|
709 if (_evsql_trans_conn_ready(evsql, trans)) { |
|
710 // return NULL directly |
|
711 return NULL; |
|
712 } |
|
713 |
|
714 } else { |
|
715 // otherwise, wait for the conn to be ready |
|
716 |
|
717 } |
|
718 |
|
719 // and let it pass errors to the user |
|
720 trans->error_fn = error_fn; |
|
721 |
|
722 // ok |
|
723 return trans; |
|
724 |
|
725 error: |
|
726 free(trans); |
|
727 |
|
728 return NULL; |
|
729 } |
|
730 |
|
731 /* |
|
732 * Validate and allocate the basic stuff for a new query. |
|
733 */ |
|
734 static struct evsql_query *_evsql_query_new (struct evsql *evsql, struct evsql_trans *trans, evsql_query_cb query_fn, void *cb_arg) { |
|
735 struct evsql_query *query = NULL; |
|
736 |
|
737 // if it's part of a trans, then make sure the trans is idle |
|
738 if (trans && trans->query) |
|
739 ERROR("transaction is busy"); |
|
740 |
|
741 // allocate it |
|
742 if ((query = calloc(1, sizeof(*query))) == NULL) |
|
743 ERROR("calloc"); |
|
744 |
|
745 // store |
|
746 query->cb_fn = query_fn; |
|
747 query->cb_arg = cb_arg; |
|
748 |
|
749 // success |
|
750 return query; |
|
751 |
|
752 error: |
|
753 return NULL; |
|
754 } |
|
755 |
|
756 /* |
|
757 * Handle a new query. |
|
758 * |
|
759 * For transactions this will associate the query and then execute it, otherwise this will either find an idle |
|
760 * connection and send the query, or enqueue it. |
|
761 */ |
|
762 static int _evsql_query_enqueue (struct evsql *evsql, struct evsql_trans *trans, struct evsql_query *query, const char *command) { |
|
763 // transaction queries are handled differently |
|
764 if (trans) { |
|
765 // it's an in-transaction query |
|
766 assert(trans->query == NULL); |
|
767 |
|
768 // assign the query |
|
769 trans->query = query; |
|
770 |
|
771 // execute directly |
|
772 if (_evsql_query_exec(trans->conn, query, command)) { |
|
773 // ack, fail the transaction |
|
774 _evsql_trans_fail(trans); |
|
775 |
|
776 // caller frees query |
|
777 goto error; |
|
778 } |
|
779 |
|
780 } else { |
|
781 struct evsql_conn *conn; |
|
782 |
|
783 // find an idle connection |
|
784 if ((_evsql_conn_get(evsql, &conn, 1))) |
|
785 ERROR("couldn't allocate a connection for the query"); |
|
786 |
|
787 // we must enqueue if no idle conn or the conn is not yet ready |
|
788 if (conn && _evsql_conn_ready(conn) > 0) { |
|
789 // execute directly |
|
790 if (_evsql_query_exec(conn, query, command)) { |
|
791 // ack, fail the connection |
|
792 _evsql_conn_fail(conn); |
|
793 |
|
794 // make sure we don't deadlock any queries, but if this query got a conn directly, then we shouldn't |
|
795 // have any queries enqueued anyways |
|
796 assert(TAILQ_EMPTY(&evsql->query_queue)); |
|
797 |
|
798 // caller frees query |
|
799 goto error; |
|
800 } |
|
801 |
|
802 } else { |
|
803 // copy the command for later execution |
|
804 if ((query->command = strdup(command)) == NULL) |
|
805 ERROR("strdup"); |
|
806 |
|
807 // enqueue until some connection pumps the queue |
|
808 TAILQ_INSERT_TAIL(&evsql->query_queue, query, entry); |
|
809 } |
|
810 } |
|
811 |
|
812 // ok, good |
|
813 return 0; |
|
814 |
|
815 error: |
|
816 return -1; |
|
817 } |
|
818 |
|
819 struct evsql_query *evsql_query (struct evsql *evsql, struct evsql_trans *trans, const char *command, evsql_query_cb query_fn, void *cb_arg) { |
|
820 struct evsql_query *query = NULL; |
|
821 |
|
822 // alloc new query |
|
823 if ((query = _evsql_query_new(evsql, trans, query_fn, cb_arg)) == NULL) |
|
824 goto error; |
|
825 |
|
826 // just execute the command string directly |
|
827 if (_evsql_query_enqueue(evsql, trans, query, command)) |
|
828 goto error; |
|
829 |
|
830 // ok |
|
831 return query; |
|
832 |
|
833 error: |
|
834 _evsql_query_free(query); |
|
835 |
|
836 return NULL; |
|
837 } |
|
838 |
|
839 struct evsql_query *evsql_query_params (struct evsql *evsql, struct evsql_trans *trans, const char *command, const struct evsql_query_params *params, evsql_query_cb query_fn, void *cb_arg) { |
|
840 struct evsql_query *query = NULL; |
|
841 const struct evsql_query_param *param; |
|
842 int idx; |
|
843 |
|
844 // alloc new query |
|
845 if ((query = _evsql_query_new(evsql, trans, query_fn, cb_arg)) == NULL) |
|
846 goto error; |
|
847 |
|
848 // count the params |
|
849 for (param = params->list; param->type; param++) |
|
850 query->params.count++; |
|
851 |
|
852 // allocate the vertical storage for the parameters |
|
853 if (0 |
|
854 |
|
855 // !(query->params.types = calloc(query->params.count, sizeof(Oid))) |
|
856 || !(query->params.values = calloc(query->params.count, sizeof(char *))) |
|
857 || !(query->params.lengths = calloc(query->params.count, sizeof(int))) |
|
858 || !(query->params.formats = calloc(query->params.count, sizeof(int))) |
|
859 ) |
|
860 ERROR("calloc"); |
|
861 |
|
862 // transform |
|
863 for (param = params->list, idx = 0; param->type; param++, idx++) { |
|
864 // `types` stays NULL |
|
865 // query->params.types[idx] = 0; |
|
866 |
|
867 // values |
|
868 query->params.values[idx] = param->data_raw; |
|
869 |
|
870 // lengths |
|
871 query->params.lengths[idx] = param->length; |
|
872 |
|
873 // formats, binary if length is nonzero |
|
874 query->params.formats[idx] = param->length ? 1 : 0; |
|
875 } |
|
876 |
|
877 // result format |
|
878 switch (params->result_fmt) { |
|
879 case EVSQL_FMT_TEXT: |
|
880 query->params.result_format = 0; break; |
|
881 |
|
882 case EVSQL_FMT_BINARY: |
|
883 query->params.result_format = 1; break; |
|
884 |
|
885 default: |
|
886 FATAL("params.result_fmt: %d", params->result_fmt); |
|
887 } |
|
888 |
|
889 // execute it |
|
890 if (_evsql_query_enqueue(evsql, trans, query, command)) |
|
891 goto error; |
|
892 |
|
893 // ok |
|
894 return query; |
|
895 |
|
896 error: |
|
897 _evsql_query_free(query); |
|
898 |
|
899 return NULL; |
|
900 } |
|
901 |
|
902 void evsql_query_abort (struct evsql_trans *trans, struct evsql_query *query) { |
|
903 assert(query); |
|
904 |
|
905 if (trans) { |
|
906 // must be the right query |
|
907 assert(trans->query == query); |
|
908 } |
|
909 |
|
910 // just strip the callback and wait for it to complete as normal |
|
911 query->cb_fn = NULL; |
|
912 } |
|
913 |
|
914 void _evsql_trans_commit_res (const struct evsql_result_info *res, void *arg) { |
|
915 (void) arg; |
|
916 |
|
917 assert(res->trans); |
|
918 |
|
919 // check for errors |
|
920 if (res->error) |
|
921 ERROR("transaction 'COMMIT' failed: %s", evsql_result_error(res)); |
|
922 |
|
923 // transaction is now done |
|
924 res->trans->done_fn(res->trans, res->trans->cb_arg); |
|
925 |
|
926 // release it |
|
927 _evsql_trans_release(res->trans); |
|
928 |
|
929 // success |
|
930 return; |
|
931 |
|
932 error: |
|
933 _evsql_trans_fail(res->trans); |
|
934 } |
|
935 |
|
936 int evsql_trans_commit (struct evsql_trans *trans) { |
|
937 static const char *sql = "COMMIT TRANSACTION"; |
|
938 |
|
939 if (trans->query) |
|
940 ERROR("cannot COMMIT because transaction is still busy"); |
|
941 |
|
942 // query |
|
943 if (evsql_query(trans->evsql, trans, sql, _evsql_trans_commit_res, NULL) == NULL) |
|
944 goto error; |
|
945 |
|
946 // mark it as commited in case someone wants to abort it |
|
947 trans->has_commit = 1; |
|
948 |
|
949 // success |
|
950 return 0; |
|
951 |
|
952 error: |
|
953 return -1; |
|
954 } |
|
955 |
|
956 void _evsql_trans_rollback_res (const struct evsql_result_info *res, void *arg) { |
|
957 (void) arg; |
|
958 |
|
959 assert(res->trans); |
|
960 |
|
961 // fail the connection on errors |
|
962 if (res->error) |
|
963 ERROR("transaction 'ROLLBACK' failed: %s", evsql_result_error(res)); |
|
964 |
|
965 // release it |
|
966 _evsql_trans_release(res->trans); |
|
967 |
|
968 // success |
|
969 return; |
|
970 |
|
971 error: |
|
972 // fail the connection too, errors are supressed |
|
973 _evsql_trans_fail(res->trans); |
|
974 } |
|
975 |
|
976 /* |
|
977 * Used as the ready_fn callback in case of abort, otherwise directly |
|
978 */ |
|
979 void _evsql_trans_rollback (struct evsql_trans *trans, void *unused) { |
|
980 static const char *sql = "ROLLBACK TRANSACTION"; |
|
981 |
|
982 (void) unused; |
|
983 |
|
984 // query |
|
985 if (evsql_query(trans->evsql, trans, sql, _evsql_trans_rollback_res, NULL) == NULL) { |
|
986 // fail the transaction/connection |
|
987 _evsql_trans_fail(trans); |
|
988 } |
|
989 |
|
990 } |
|
991 |
|
992 void evsql_trans_abort (struct evsql_trans *trans) { |
|
993 // supress errors |
|
994 trans->error_fn = NULL; |
|
995 |
|
996 if (trans->has_commit) { |
|
997 // abort after commit doesn't make sense |
|
998 FATAL("transaction was already commited"); |
|
999 } |
|
1000 |
|
1001 if (trans->query) { |
|
1002 // gah, some query is running |
|
1003 WARNING("aborting pending query"); |
|
1004 |
|
1005 // prepare to rollback once complete |
|
1006 trans->ready_fn = _evsql_trans_rollback; |
|
1007 |
|
1008 // abort |
|
1009 evsql_query_abort(trans, trans->query); |
|
1010 |
|
1011 } else { |
|
1012 // just rollback directly |
|
1013 _evsql_trans_rollback(trans, NULL); |
|
1014 |
|
1015 } |
|
1016 } |
|
1017 |