1 #define _GNU_SOURCE |
|
2 #include <stdlib.h> |
|
3 #include <assert.h> |
|
4 #include <string.h> |
|
5 |
|
6 #include "evsql.h" |
|
7 #include "evsql_internal.h" |
|
8 #include "evpq.h" |
|
9 #include "lib/log.h" |
|
10 #include "lib/error.h" |
|
11 #include "lib/misc.h" |
|
12 |
|
13 /* |
|
14 * A couple function prototypes |
|
15 */ |
|
16 static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn); |
|
17 |
|
18 /* |
|
19 * Actually execute the given query. |
|
20 * |
|
21 * The backend should be able to accept the query at this time. |
|
22 * |
|
23 * You should assume that if trying to execute a query fails, then the connection should also be considred as failed. |
|
24 */ |
|
25 static int _evsql_query_exec (struct evsql_conn *conn, struct evsql_query *query, const char *command) { |
|
26 int err; |
|
27 |
|
28 switch (conn->evsql->type) { |
|
29 case EVSQL_EVPQ: |
|
30 // got params? |
|
31 if (query->params.count) { |
|
32 err = evpq_query_params(conn->engine.evpq, command, |
|
33 query->params.count, |
|
34 query->params.types, |
|
35 query->params.values, |
|
36 query->params.lengths, |
|
37 query->params.formats, |
|
38 query->params.result_format |
|
39 ); |
|
40 |
|
41 } else { |
|
42 // plain 'ole query |
|
43 err = evpq_query(conn->engine.evpq, command); |
|
44 } |
|
45 |
|
46 if (err) { |
|
47 if (PQstatus(evpq_pgconn(conn->engine.evpq)) != CONNECTION_OK) |
|
48 WARNING("conn failed"); |
|
49 else |
|
50 WARNING("query failed, dropping conn as well"); |
|
51 } |
|
52 |
|
53 break; |
|
54 |
|
55 default: |
|
56 FATAL("evsql->type"); |
|
57 } |
|
58 |
|
59 if (!err) |
|
60 // assign the query |
|
61 conn->query = query; |
|
62 |
|
63 return err; |
|
64 } |
|
65 |
|
66 /* |
|
67 * Free the query and related resources, doesn't trigger any callbacks or remove from any queues. |
|
68 * |
|
69 * The command should already be taken care of (NULL). |
|
70 */ |
|
71 static void _evsql_query_free (struct evsql_query *query) { |
|
72 if (!query) |
|
73 return; |
|
74 |
|
75 assert(query->command == NULL); |
|
76 |
|
77 // free params if present |
|
78 free(query->params.types); |
|
79 free(query->params.values); |
|
80 free(query->params.lengths); |
|
81 free(query->params.formats); |
|
82 |
|
83 // free the query itself |
|
84 free(query); |
|
85 } |
|
86 |
|
87 /* |
|
88 * Execute the callback if res is given, and free the query. |
|
89 * |
|
90 * The query has been aborted, it will simply be freed |
|
91 */ |
|
92 static void _evsql_query_done (struct evsql_query *query, const struct evsql_result_info *res) { |
|
93 if (res) { |
|
94 if (query->cb_fn) |
|
95 // call the callback |
|
96 query->cb_fn(res, query->cb_arg); |
|
97 else |
|
98 WARNING("supressing cb_fn because query was aborted"); |
|
99 } |
|
100 |
|
101 // free |
|
102 _evsql_query_free(query); |
|
103 } |
|
104 |
|
105 /* |
|
106 * XXX: |
|
107 * / |
|
108 static void _evsql_destroy (struct evsql *evsql, const struct evsql_result_info *res) { |
|
109 struct evsql_query *query; |
|
110 |
|
111 // clear the queue |
|
112 while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) { |
|
113 _evsql_query_done(query, res); |
|
114 |
|
115 TAILQ_REMOVE(&evsql->query_queue, query, entry); |
|
116 } |
|
117 |
|
118 // free |
|
119 free(evsql); |
|
120 } |
|
121 */ |
|
122 |
|
123 /* |
|
124 * Free the transaction, it should already be deassociated from the query and conn. |
|
125 */ |
|
126 static void _evsql_trans_free (struct evsql_trans *trans) { |
|
127 // ensure we don't leak anything |
|
128 assert(trans->query == NULL); |
|
129 assert(trans->conn == NULL); |
|
130 |
|
131 // free |
|
132 free(trans); |
|
133 } |
|
134 |
|
135 /* |
|
136 * Release a connection. It should already be deassociated from the trans and query. |
|
137 * |
|
138 * Releases the engine, removes from the conn_list and frees this. |
|
139 */ |
|
140 static void _evsql_conn_release (struct evsql_conn *conn) { |
|
141 // ensure we don't leak anything |
|
142 assert(conn->trans == NULL); |
|
143 assert(conn->query == NULL); |
|
144 |
|
145 // release the engine |
|
146 switch (conn->evsql->type) { |
|
147 case EVSQL_EVPQ: |
|
148 evpq_release(conn->engine.evpq); |
|
149 break; |
|
150 |
|
151 default: |
|
152 FATAL("evsql->type"); |
|
153 } |
|
154 |
|
155 // remove from list |
|
156 LIST_REMOVE(conn, entry); |
|
157 |
|
158 // catch deadlocks |
|
159 assert(!LIST_EMPTY(&conn->evsql->conn_list) || TAILQ_EMPTY(&conn->evsql->query_queue)); |
|
160 |
|
161 // free |
|
162 free(conn); |
|
163 } |
|
164 |
|
165 /* |
|
166 * Release a transaction, it should already be deassociated from the query. |
|
167 * |
|
168 * Perform a two-way-deassociation with the conn, and then free the trans. |
|
169 */ |
|
170 static void _evsql_trans_release (struct evsql_trans *trans) { |
|
171 assert(trans->query == NULL); |
|
172 assert(trans->conn != NULL); |
|
173 |
|
174 // deassociate the conn |
|
175 trans->conn->trans = NULL; trans->conn = NULL; |
|
176 |
|
177 // free the trans |
|
178 _evsql_trans_free(trans); |
|
179 } |
|
180 |
|
181 /* |
|
182 * Fail a single query, this will trigger the callback and free it. |
|
183 * |
|
184 * NOTE: Only for *TRANSACTIONLESS* queries. |
|
185 */ |
|
186 static void _evsql_query_fail (struct evsql* evsql, struct evsql_query *query) { |
|
187 struct evsql_result_info res; ZINIT(res); |
|
188 |
|
189 // set up the result_info |
|
190 res.evsql = evsql; |
|
191 res.trans = NULL; |
|
192 res.error = 1; |
|
193 |
|
194 // finish off the query |
|
195 _evsql_query_done(query, &res); |
|
196 } |
|
197 |
|
198 /* |
|
199 * Fail a transaction, this will silently drop any query, trigger the error callback, two-way-deassociate/release the |
|
200 * conn, and then free the trans. |
|
201 */ |
|
202 static void _evsql_trans_fail (struct evsql_trans *trans) { |
|
203 if (trans->query) { |
|
204 // free the query silently |
|
205 _evsql_query_free(trans->query); trans->query = NULL; |
|
206 |
|
207 // also deassociate it from the conn! |
|
208 trans->conn->query = NULL; |
|
209 } |
|
210 |
|
211 // tell the user |
|
212 // XXX: trans is in a bad state during this call |
|
213 if (trans->error_fn) |
|
214 trans->error_fn(trans, trans->cb_arg); |
|
215 else |
|
216 WARNING("supressing error because error_fn was NULL"); |
|
217 |
|
218 // deassociate and release the conn |
|
219 trans->conn->trans = NULL; _evsql_conn_release(trans->conn); trans->conn = NULL; |
|
220 |
|
221 // pump the queue for requests that were waiting for this connection |
|
222 _evsql_pump(trans->evsql, NULL); |
|
223 |
|
224 // free the trans |
|
225 _evsql_trans_free(trans); |
|
226 } |
|
227 |
|
228 /* |
|
229 * Fail a connection. If the connection is transactional, this will just call _evsql_trans_fail, but otherwise it will |
|
230 * fail any ongoing query, and then release the connection. |
|
231 */ |
|
232 static void _evsql_conn_fail (struct evsql_conn *conn) { |
|
233 if (conn->trans) { |
|
234 // let transactions handle their connection failures |
|
235 _evsql_trans_fail(conn->trans); |
|
236 |
|
237 } else { |
|
238 if (conn->query) { |
|
239 // fail the in-progress query |
|
240 _evsql_query_fail(conn->evsql, conn->query); conn->query = NULL; |
|
241 } |
|
242 |
|
243 // finish off the whole connection |
|
244 _evsql_conn_release(conn); |
|
245 } |
|
246 } |
|
247 |
|
248 /* |
|
249 * Processes enqueued non-transactional queries until the queue is empty, or we managed to exec a query. |
|
250 * |
|
251 * If execing a query on a connection fails, both the query and the connection are failed (in that order). |
|
252 * |
|
253 * Any further queries will then also be failed, because there's no reconnection/retry logic yet. |
|
254 * |
|
255 * This means that if conn is NULL, all queries are failed. |
|
256 */ |
|
257 static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn) { |
|
258 struct evsql_query *query; |
|
259 int err; |
|
260 |
|
261 // look for waiting queries |
|
262 while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) { |
|
263 // dequeue |
|
264 TAILQ_REMOVE(&evsql->query_queue, query, entry); |
|
265 |
|
266 if (conn) { |
|
267 // try and execute it |
|
268 err = _evsql_query_exec(conn, query, query->command); |
|
269 } |
|
270 |
|
271 // free the command buf |
|
272 free(query->command); query->command = NULL; |
|
273 |
|
274 if (err || !conn) { |
|
275 if (!conn) { |
|
276 // warn when dropping queries |
|
277 WARNING("failing query becuse there are no conns"); |
|
278 } |
|
279 |
|
280 // fail the query |
|
281 _evsql_query_fail(evsql, query); |
|
282 |
|
283 if (conn) { |
|
284 // fail the connection |
|
285 WARNING("failing the connection because a query-exec failed"); |
|
286 |
|
287 _evsql_conn_fail(conn); conn = NULL; |
|
288 } |
|
289 |
|
290 } else { |
|
291 // we have succesfully enqueued a query, and we can wait for this connection to complete |
|
292 break; |
|
293 |
|
294 } |
|
295 |
|
296 // handle the rest of the queue |
|
297 } |
|
298 |
|
299 // ok |
|
300 return; |
|
301 } |
|
302 |
|
303 /* |
|
304 * Callback for a trans's 'BEGIN' query, which means the transaction is now ready for use. |
|
305 */ |
|
306 static void _evsql_trans_ready (const struct evsql_result_info *res, void *arg) { |
|
307 (void) arg; |
|
308 |
|
309 assert(res->trans); |
|
310 |
|
311 // check for errors |
|
312 if (res->error) |
|
313 ERROR("transaction 'BEGIN' failed: %s", evsql_result_error(res)); |
|
314 |
|
315 // transaction is now ready for use |
|
316 res->trans->ready_fn(res->trans, res->trans->cb_arg); |
|
317 |
|
318 // good |
|
319 return; |
|
320 |
|
321 error: |
|
322 _evsql_trans_fail(res->trans); |
|
323 } |
|
324 |
|
325 /* |
|
326 * The transaction's connection is ready, send the 'BEGIN' query. |
|
327 * |
|
328 * If anything fails, calls _evsql_trans_fail and returns nonzero, zero on success |
|
329 */ |
|
330 static int _evsql_trans_conn_ready (struct evsql *evsql, struct evsql_trans *trans) { |
|
331 char trans_sql[EVSQL_QUERY_BEGIN_BUF]; |
|
332 const char *isolation_level; |
|
333 int ret; |
|
334 |
|
335 // determine the isolation_level to use |
|
336 switch (trans->type) { |
|
337 case EVSQL_TRANS_DEFAULT: |
|
338 isolation_level = NULL; break; |
|
339 |
|
340 case EVSQL_TRANS_SERIALIZABLE: |
|
341 isolation_level = "SERIALIZABLE"; break; |
|
342 |
|
343 case EVSQL_TRANS_REPEATABLE_READ: |
|
344 isolation_level = "REPEATABLE READ"; break; |
|
345 |
|
346 case EVSQL_TRANS_READ_COMMITTED: |
|
347 isolation_level = "READ COMMITTED"; break; |
|
348 |
|
349 case EVSQL_TRANS_READ_UNCOMMITTED: |
|
350 isolation_level = "READ UNCOMMITTED"; break; |
|
351 |
|
352 default: |
|
353 FATAL("trans->type: %d", trans->type); |
|
354 } |
|
355 |
|
356 // build the trans_sql |
|
357 if (isolation_level) |
|
358 ret = snprintf(trans_sql, EVSQL_QUERY_BEGIN_BUF, "BEGIN TRANSACTION ISOLATION LEVEL %s", isolation_level); |
|
359 else |
|
360 ret = snprintf(trans_sql, EVSQL_QUERY_BEGIN_BUF, "BEGIN TRANSACTION"); |
|
361 |
|
362 // make sure it wasn't truncated |
|
363 if (ret >= EVSQL_QUERY_BEGIN_BUF) |
|
364 ERROR("trans_sql overflow: %d >= %d", ret, EVSQL_QUERY_BEGIN_BUF); |
|
365 |
|
366 // execute the query |
|
367 if (evsql_query(evsql, trans, trans_sql, _evsql_trans_ready, NULL) == NULL) |
|
368 ERROR("evsql_query"); |
|
369 |
|
370 // success |
|
371 return 0; |
|
372 |
|
373 error: |
|
374 // fail the transaction |
|
375 _evsql_trans_fail(trans); |
|
376 |
|
377 return -1; |
|
378 } |
|
379 |
|
380 /* |
|
381 * The evpq connection was succesfully established. |
|
382 */ |
|
383 static void _evsql_evpq_connected (struct evpq_conn *_conn, void *arg) { |
|
384 struct evsql_conn *conn = arg; |
|
385 |
|
386 if (conn->trans) |
|
387 // notify the transaction |
|
388 // don't care about errors |
|
389 (void) _evsql_trans_conn_ready(conn->evsql, conn->trans); |
|
390 |
|
391 else |
|
392 // pump any waiting transactionless queries |
|
393 _evsql_pump(conn->evsql, conn); |
|
394 } |
|
395 |
|
396 /* |
|
397 * Got one result on this evpq connection. |
|
398 */ |
|
399 static void _evsql_evpq_result (struct evpq_conn *_conn, PGresult *result, void *arg) { |
|
400 struct evsql_conn *conn = arg; |
|
401 struct evsql_query *query = conn->query; |
|
402 |
|
403 assert(query != NULL); |
|
404 |
|
405 // if we get multiple results, only return the first one |
|
406 if (query->result.evpq) { |
|
407 WARNING("[evsql] evpq query returned multiple results, discarding previous one"); |
|
408 |
|
409 PQclear(query->result.evpq); query->result.evpq = NULL; |
|
410 } |
|
411 |
|
412 // remember the result |
|
413 query->result.evpq = result; |
|
414 } |
|
415 |
|
416 /* |
|
417 * No more results for this query. |
|
418 */ |
|
419 static void _evsql_evpq_done (struct evpq_conn *_conn, void *arg) { |
|
420 struct evsql_conn *conn = arg; |
|
421 struct evsql_query *query = conn->query; |
|
422 struct evsql_result_info res; ZINIT(res); |
|
423 |
|
424 assert(query != NULL); |
|
425 |
|
426 // set up the result_info |
|
427 res.evsql = conn->evsql; |
|
428 res.trans = conn->trans; |
|
429 |
|
430 if (query->result.evpq == NULL) { |
|
431 // if a query didn't return any results (bug?), warn and fail the query |
|
432 WARNING("[evsql] evpq query didn't return any results"); |
|
433 |
|
434 res.error = 1; |
|
435 |
|
436 } else if (strcmp(PQresultErrorMessage(query->result.evpq), "") != 0) { |
|
437 // the query failed with some error |
|
438 res.error = 1; |
|
439 res.result.pq = query->result.evpq; |
|
440 |
|
441 } else { |
|
442 res.error = 0; |
|
443 res.result.pq = query->result.evpq; |
|
444 |
|
445 } |
|
446 |
|
447 // de-associate the query from the connection |
|
448 conn->query = NULL; |
|
449 |
|
450 // how we handle query completion depends on if we're a transaction or not |
|
451 if (conn->trans) { |
|
452 // we can deassign the trans's query |
|
453 conn->trans->query = NULL; |
|
454 |
|
455 // was an abort? |
|
456 if (!query->cb_fn) |
|
457 // notify the user that the transaction query has been aborted |
|
458 conn->trans->ready_fn(conn->trans, conn->trans->cb_arg); |
|
459 |
|
460 // then hand the query to the user |
|
461 _evsql_query_done(query, &res); |
|
462 |
|
463 } else { |
|
464 // a transactionless query, so just finish it off and pump any other waiting ones |
|
465 _evsql_query_done(query, &res); |
|
466 |
|
467 // pump the next one |
|
468 _evsql_pump(conn->evsql, conn); |
|
469 } |
|
470 } |
|
471 |
|
472 /* |
|
473 * The connection failed. |
|
474 */ |
|
475 static void _evsql_evpq_failure (struct evpq_conn *_conn, void *arg) { |
|
476 struct evsql_conn *conn = arg; |
|
477 |
|
478 // just fail the conn |
|
479 _evsql_conn_fail(conn); |
|
480 } |
|
481 |
|
482 /* |
|
483 * Our evpq behaviour |
|
484 */ |
|
485 static struct evpq_callback_info _evsql_evpq_cb_info = { |
|
486 .fn_connected = _evsql_evpq_connected, |
|
487 .fn_result = _evsql_evpq_result, |
|
488 .fn_done = _evsql_evpq_done, |
|
489 .fn_failure = _evsql_evpq_failure, |
|
490 }; |
|
491 |
|
492 /* |
|
493 * Allocate the generic evsql context. |
|
494 */ |
|
495 static struct evsql *_evsql_new_base (struct event_base *ev_base, evsql_error_cb error_fn, void *cb_arg) { |
|
496 struct evsql *evsql = NULL; |
|
497 |
|
498 // allocate it |
|
499 if ((evsql = calloc(1, sizeof(*evsql))) == NULL) |
|
500 ERROR("calloc"); |
|
501 |
|
502 // store |
|
503 evsql->ev_base = ev_base; |
|
504 evsql->error_fn = error_fn; |
|
505 evsql->cb_arg = cb_arg; |
|
506 |
|
507 // init |
|
508 LIST_INIT(&evsql->conn_list); |
|
509 TAILQ_INIT(&evsql->query_queue); |
|
510 |
|
511 // done |
|
512 return evsql; |
|
513 |
|
514 error: |
|
515 return NULL; |
|
516 } |
|
517 |
|
518 /* |
|
519 * Start a new connection and add it to the list, it won't be ready until _evsql_evpq_connected is called |
|
520 */ |
|
521 static struct evsql_conn *_evsql_conn_new (struct evsql *evsql) { |
|
522 struct evsql_conn *conn = NULL; |
|
523 |
|
524 // allocate |
|
525 if ((conn = calloc(1, sizeof(*conn))) == NULL) |
|
526 ERROR("calloc"); |
|
527 |
|
528 // init |
|
529 conn->evsql = evsql; |
|
530 |
|
531 // connect the engine |
|
532 switch (evsql->type) { |
|
533 case EVSQL_EVPQ: |
|
534 if ((conn->engine.evpq = evpq_connect(evsql->ev_base, evsql->engine_conf.evpq, _evsql_evpq_cb_info, conn)) == NULL) |
|
535 goto error; |
|
536 |
|
537 break; |
|
538 |
|
539 default: |
|
540 FATAL("evsql->type"); |
|
541 } |
|
542 |
|
543 // add it to the list |
|
544 LIST_INSERT_HEAD(&evsql->conn_list, conn, entry); |
|
545 |
|
546 // success |
|
547 return conn; |
|
548 |
|
549 error: |
|
550 free(conn); |
|
551 |
|
552 return NULL; |
|
553 } |
|
554 |
|
555 struct evsql *evsql_new_pq (struct event_base *ev_base, const char *pq_conninfo, evsql_error_cb error_fn, void *cb_arg) { |
|
556 struct evsql *evsql = NULL; |
|
557 |
|
558 // base init |
|
559 if ((evsql = _evsql_new_base (ev_base, error_fn, cb_arg)) == NULL) |
|
560 goto error; |
|
561 |
|
562 // store conf |
|
563 evsql->engine_conf.evpq = pq_conninfo; |
|
564 |
|
565 // pre-create one connection |
|
566 if (_evsql_conn_new(evsql) == NULL) |
|
567 goto error; |
|
568 |
|
569 // done |
|
570 return evsql; |
|
571 |
|
572 error: |
|
573 // XXX: more complicated than this? |
|
574 free(evsql); |
|
575 |
|
576 return NULL; |
|
577 } |
|
578 |
|
579 /* |
|
580 * Checks if the connection is already allocated for some other trans/query. |
|
581 * |
|
582 * Returns: |
|
583 * 0 connection idle, can be allocated |
|
584 * >1 connection busy |
|
585 */ |
|
586 static int _evsql_conn_busy (struct evsql_conn *conn) { |
|
587 // transactions get the connection to themselves |
|
588 if (conn->trans) |
|
589 return 1; |
|
590 |
|
591 // if it has a query assigned, it's busy |
|
592 if (conn->query) |
|
593 return 1; |
|
594 |
|
595 // otherwise, it's all idle |
|
596 return 0; |
|
597 } |
|
598 |
|
599 /* |
|
600 * Checks if the connection is ready for use (i.e. _evsql_evpq_connected was called). |
|
601 * |
|
602 * The connection should not already have a query running. |
|
603 * |
|
604 * Returns |
|
605 * <0 the connection is not valid (failed, query in progress) |
|
606 * 0 the connection is still pending, and will become ready at some point |
|
607 * >0 it's ready |
|
608 */ |
|
609 static int _evsql_conn_ready (struct evsql_conn *conn) { |
|
610 switch (conn->evsql->type) { |
|
611 case EVSQL_EVPQ: { |
|
612 enum evpq_state state = evpq_state(conn->engine.evpq); |
|
613 |
|
614 switch (state) { |
|
615 case EVPQ_CONNECT: |
|
616 return 0; |
|
617 |
|
618 case EVPQ_CONNECTED: |
|
619 return 1; |
|
620 |
|
621 case EVPQ_QUERY: |
|
622 case EVPQ_INIT: |
|
623 case EVPQ_FAILURE: |
|
624 return -1; |
|
625 |
|
626 default: |
|
627 FATAL("evpq_state: %d", state); |
|
628 } |
|
629 |
|
630 } |
|
631 |
|
632 default: |
|
633 FATAL("evsql->type: %d", conn->evsql->type); |
|
634 } |
|
635 } |
|
636 |
|
637 /* |
|
638 * Allocate a connection for use and return it via *conn_ptr, or if may_queue is nonzero and the connection pool is |
|
639 * getting full, return NULL (query should be queued). |
|
640 * |
|
641 * Note that the returned connection might not be ready for use yet (if we created a new one, see _evsql_conn_ready). |
|
642 * |
|
643 * Returns zero if a connection was found or the request should be queued, or nonzero if something failed and the |
|
644 * request should be dropped. |
|
645 */ |
|
646 static int _evsql_conn_get (struct evsql *evsql, struct evsql_conn **conn_ptr, int may_queue) { |
|
647 int have_nontrans = 0; |
|
648 *conn_ptr = NULL; |
|
649 |
|
650 // find a connection that isn't busy and is ready (unless the query queue is empty). |
|
651 LIST_FOREACH(*conn_ptr, &evsql->conn_list, entry) { |
|
652 // we can only have a query enqueue itself if there is a non-trans conn it can later use |
|
653 if (!(*conn_ptr)->trans) |
|
654 have_nontrans = 1; |
|
655 |
|
656 // skip busy conns always |
|
657 if (_evsql_conn_busy(*conn_ptr)) |
|
658 continue; |
|
659 |
|
660 // accept pending conns as long as there are NO enqueued queries (might cause deadlock otherwise) |
|
661 if (_evsql_conn_ready(*conn_ptr) == 0 && TAILQ_EMPTY(&evsql->query_queue)) |
|
662 break; |
|
663 |
|
664 // accept conns that are in a fully ready state |
|
665 if (_evsql_conn_ready(*conn_ptr) > 0) |
|
666 break; |
|
667 } |
|
668 |
|
669 // if we found an idle connection, we can just return that right away |
|
670 if (*conn_ptr) |
|
671 return 0; |
|
672 |
|
673 // return NULL if may_queue and we have a non-trans conn that we can, at some point, use |
|
674 if (may_queue && have_nontrans) |
|
675 return 0; |
|
676 |
|
677 // we need to open a new connection |
|
678 if ((*conn_ptr = _evsql_conn_new(evsql)) == NULL) |
|
679 goto error; |
|
680 |
|
681 // good |
|
682 return 0; |
|
683 error: |
|
684 return -1; |
|
685 } |
|
686 |
|
687 struct evsql_trans *evsql_trans (struct evsql *evsql, enum evsql_trans_type type, evsql_trans_error_cb error_fn, evsql_trans_ready_cb ready_fn, evsql_trans_done_cb done_fn, void *cb_arg) { |
|
688 struct evsql_trans *trans = NULL; |
|
689 |
|
690 // allocate it |
|
691 if ((trans = calloc(1, sizeof(*trans))) == NULL) |
|
692 ERROR("calloc"); |
|
693 |
|
694 // store |
|
695 trans->evsql = evsql; |
|
696 trans->ready_fn = ready_fn; |
|
697 trans->done_fn = done_fn; |
|
698 trans->cb_arg = cb_arg; |
|
699 trans->type = type; |
|
700 |
|
701 // find a connection |
|
702 if (_evsql_conn_get(evsql, &trans->conn, 0)) |
|
703 ERROR("_evsql_conn_get"); |
|
704 |
|
705 // associate the conn |
|
706 trans->conn->trans = trans; |
|
707 |
|
708 // is it already ready? |
|
709 if (_evsql_conn_ready(trans->conn) > 0) { |
|
710 // call _evsql_trans_conn_ready directly, it will handle cleanup (silently, !error_fn) |
|
711 if (_evsql_trans_conn_ready(evsql, trans)) { |
|
712 // return NULL directly |
|
713 return NULL; |
|
714 } |
|
715 |
|
716 } else { |
|
717 // otherwise, wait for the conn to be ready |
|
718 |
|
719 } |
|
720 |
|
721 // and let it pass errors to the user |
|
722 trans->error_fn = error_fn; |
|
723 |
|
724 // ok |
|
725 return trans; |
|
726 |
|
727 error: |
|
728 free(trans); |
|
729 |
|
730 return NULL; |
|
731 } |
|
732 |
|
733 /* |
|
734 * Validate and allocate the basic stuff for a new query. |
|
735 */ |
|
736 static struct evsql_query *_evsql_query_new (struct evsql *evsql, struct evsql_trans *trans, evsql_query_cb query_fn, void *cb_arg) { |
|
737 struct evsql_query *query = NULL; |
|
738 |
|
739 // if it's part of a trans, then make sure the trans is idle |
|
740 if (trans && trans->query) |
|
741 ERROR("transaction is busy"); |
|
742 |
|
743 // allocate it |
|
744 if ((query = calloc(1, sizeof(*query))) == NULL) |
|
745 ERROR("calloc"); |
|
746 |
|
747 // store |
|
748 query->cb_fn = query_fn; |
|
749 query->cb_arg = cb_arg; |
|
750 |
|
751 // success |
|
752 return query; |
|
753 |
|
754 error: |
|
755 return NULL; |
|
756 } |
|
757 |
|
758 /* |
|
759 * Handle a new query. |
|
760 * |
|
761 * For transactions this will associate the query and then execute it, otherwise this will either find an idle |
|
762 * connection and send the query, or enqueue it. |
|
763 */ |
|
764 static int _evsql_query_enqueue (struct evsql *evsql, struct evsql_trans *trans, struct evsql_query *query, const char *command) { |
|
765 // transaction queries are handled differently |
|
766 if (trans) { |
|
767 // it's an in-transaction query |
|
768 assert(trans->query == NULL); |
|
769 |
|
770 // assign the query |
|
771 trans->query = query; |
|
772 |
|
773 // execute directly |
|
774 if (_evsql_query_exec(trans->conn, query, command)) { |
|
775 // ack, fail the transaction |
|
776 _evsql_trans_fail(trans); |
|
777 |
|
778 // caller frees query |
|
779 goto error; |
|
780 } |
|
781 |
|
782 } else { |
|
783 struct evsql_conn *conn; |
|
784 |
|
785 // find an idle connection |
|
786 if ((_evsql_conn_get(evsql, &conn, 1))) |
|
787 ERROR("couldn't allocate a connection for the query"); |
|
788 |
|
789 // we must enqueue if no idle conn or the conn is not yet ready |
|
790 if (conn && _evsql_conn_ready(conn) > 0) { |
|
791 // execute directly |
|
792 if (_evsql_query_exec(conn, query, command)) { |
|
793 // ack, fail the connection |
|
794 _evsql_conn_fail(conn); |
|
795 |
|
796 // make sure we don't deadlock any queries, but if this query got a conn directly, then we shouldn't |
|
797 // have any queries enqueued anyways |
|
798 assert(TAILQ_EMPTY(&evsql->query_queue)); |
|
799 |
|
800 // caller frees query |
|
801 goto error; |
|
802 } |
|
803 |
|
804 } else { |
|
805 // copy the command for later execution |
|
806 if ((query->command = strdup(command)) == NULL) |
|
807 ERROR("strdup"); |
|
808 |
|
809 // enqueue until some connection pumps the queue |
|
810 TAILQ_INSERT_TAIL(&evsql->query_queue, query, entry); |
|
811 } |
|
812 } |
|
813 |
|
814 // ok, good |
|
815 return 0; |
|
816 |
|
817 error: |
|
818 return -1; |
|
819 } |
|
820 |
|
821 struct evsql_query *evsql_query (struct evsql *evsql, struct evsql_trans *trans, const char *command, evsql_query_cb query_fn, void *cb_arg) { |
|
822 struct evsql_query *query = NULL; |
|
823 |
|
824 // alloc new query |
|
825 if ((query = _evsql_query_new(evsql, trans, query_fn, cb_arg)) == NULL) |
|
826 goto error; |
|
827 |
|
828 // just execute the command string directly |
|
829 if (_evsql_query_enqueue(evsql, trans, query, command)) |
|
830 goto error; |
|
831 |
|
832 // ok |
|
833 return query; |
|
834 |
|
835 error: |
|
836 _evsql_query_free(query); |
|
837 |
|
838 return NULL; |
|
839 } |
|
840 |
|
841 struct evsql_query *evsql_query_params (struct evsql *evsql, struct evsql_trans *trans, const char *command, const struct evsql_query_params *params, evsql_query_cb query_fn, void *cb_arg) { |
|
842 struct evsql_query *query = NULL; |
|
843 const struct evsql_query_param *param; |
|
844 int idx; |
|
845 |
|
846 // alloc new query |
|
847 if ((query = _evsql_query_new(evsql, trans, query_fn, cb_arg)) == NULL) |
|
848 goto error; |
|
849 |
|
850 // count the params |
|
851 for (param = params->list; param->type; param++) |
|
852 query->params.count++; |
|
853 |
|
854 // allocate the vertical storage for the parameters |
|
855 if (0 |
|
856 |
|
857 // !(query->params.types = calloc(query->params.count, sizeof(Oid))) |
|
858 || !(query->params.values = calloc(query->params.count, sizeof(char *))) |
|
859 || !(query->params.lengths = calloc(query->params.count, sizeof(int))) |
|
860 || !(query->params.formats = calloc(query->params.count, sizeof(int))) |
|
861 ) |
|
862 ERROR("calloc"); |
|
863 |
|
864 // transform |
|
865 for (param = params->list, idx = 0; param->type; param++, idx++) { |
|
866 // `types` stays NULL |
|
867 // query->params.types[idx] = 0; |
|
868 |
|
869 // values |
|
870 query->params.values[idx] = param->data_raw; |
|
871 |
|
872 // lengths |
|
873 query->params.lengths[idx] = param->length; |
|
874 |
|
875 // formats, binary if length is nonzero |
|
876 query->params.formats[idx] = param->length ? 1 : 0; |
|
877 } |
|
878 |
|
879 // result format |
|
880 switch (params->result_fmt) { |
|
881 case EVSQL_FMT_TEXT: |
|
882 query->params.result_format = 0; break; |
|
883 |
|
884 case EVSQL_FMT_BINARY: |
|
885 query->params.result_format = 1; break; |
|
886 |
|
887 default: |
|
888 FATAL("params.result_fmt: %d", params->result_fmt); |
|
889 } |
|
890 |
|
891 // execute it |
|
892 if (_evsql_query_enqueue(evsql, trans, query, command)) |
|
893 goto error; |
|
894 |
|
895 // ok |
|
896 return query; |
|
897 |
|
898 error: |
|
899 _evsql_query_free(query); |
|
900 |
|
901 return NULL; |
|
902 } |
|
903 |
|
904 void evsql_query_abort (struct evsql_trans *trans, struct evsql_query *query) { |
|
905 assert(query); |
|
906 |
|
907 if (trans) { |
|
908 // must be the right query |
|
909 assert(trans->query == query); |
|
910 } |
|
911 |
|
912 // just strip the callback and wait for it to complete as normal |
|
913 query->cb_fn = NULL; |
|
914 } |
|
915 |
|
916 void _evsql_trans_commit_res (const struct evsql_result_info *res, void *arg) { |
|
917 (void) arg; |
|
918 |
|
919 assert(res->trans); |
|
920 |
|
921 // check for errors |
|
922 if (res->error) |
|
923 ERROR("transaction 'COMMIT' failed: %s", evsql_result_error(res)); |
|
924 |
|
925 // transaction is now done |
|
926 res->trans->done_fn(res->trans, res->trans->cb_arg); |
|
927 |
|
928 // release it |
|
929 _evsql_trans_release(res->trans); |
|
930 |
|
931 // success |
|
932 return; |
|
933 |
|
934 error: |
|
935 _evsql_trans_fail(res->trans); |
|
936 } |
|
937 |
|
938 int evsql_trans_commit (struct evsql_trans *trans) { |
|
939 static const char *sql = "COMMIT TRANSACTION"; |
|
940 |
|
941 if (trans->query) |
|
942 ERROR("cannot COMMIT because transaction is still busy"); |
|
943 |
|
944 // query |
|
945 if (evsql_query(trans->evsql, trans, sql, _evsql_trans_commit_res, NULL) == NULL) |
|
946 goto error; |
|
947 |
|
948 // mark it as commited in case someone wants to abort it |
|
949 trans->has_commit = 1; |
|
950 |
|
951 // success |
|
952 return 0; |
|
953 |
|
954 error: |
|
955 return -1; |
|
956 } |
|
957 |
|
958 void _evsql_trans_rollback_res (const struct evsql_result_info *res, void *arg) { |
|
959 (void) arg; |
|
960 |
|
961 assert(res->trans); |
|
962 |
|
963 // fail the connection on errors |
|
964 if (res->error) |
|
965 ERROR("transaction 'ROLLBACK' failed: %s", evsql_result_error(res)); |
|
966 |
|
967 // release it |
|
968 _evsql_trans_release(res->trans); |
|
969 |
|
970 // success |
|
971 return; |
|
972 |
|
973 error: |
|
974 // fail the connection too, errors are supressed |
|
975 _evsql_trans_fail(res->trans); |
|
976 } |
|
977 |
|
978 /* |
|
979 * Used as the ready_fn callback in case of abort, otherwise directly |
|
980 */ |
|
981 void _evsql_trans_rollback (struct evsql_trans *trans, void *unused) { |
|
982 static const char *sql = "ROLLBACK TRANSACTION"; |
|
983 |
|
984 (void) unused; |
|
985 |
|
986 // query |
|
987 if (evsql_query(trans->evsql, trans, sql, _evsql_trans_rollback_res, NULL) == NULL) { |
|
988 // fail the transaction/connection |
|
989 _evsql_trans_fail(trans); |
|
990 } |
|
991 |
|
992 } |
|
993 |
|
994 void evsql_trans_abort (struct evsql_trans *trans) { |
|
995 // supress errors |
|
996 trans->error_fn = NULL; |
|
997 |
|
998 if (trans->has_commit) { |
|
999 // abort after commit doesn't make sense |
|
1000 FATAL("transaction was already commited"); |
|
1001 } |
|
1002 |
|
1003 if (trans->query) { |
|
1004 // gah, some query is running |
|
1005 WARNING("aborting pending query"); |
|
1006 |
|
1007 // prepare to rollback once complete |
|
1008 trans->ready_fn = _evsql_trans_rollback; |
|
1009 |
|
1010 // abort |
|
1011 evsql_query_abort(trans, trans->query); |
|
1012 |
|
1013 } else { |
|
1014 // just rollback directly |
|
1015 _evsql_trans_rollback(trans, NULL); |
|
1016 |
|
1017 } |
|
1018 } |
|
1019 |
|