src/evsql.c
changeset 29 5de62ca9a5aa
parent 28 e944453ca924
child 30 d8fabd347a8e
equal deleted inserted replaced
28:e944453ca924 29:5de62ca9a5aa
     1 #define _GNU_SOURCE
       
     2 #include <stdlib.h>
       
     3 #include <assert.h>
       
     4 #include <string.h>
       
     5 
       
     6 #include "evsql.h"
       
     7 #include "evsql_internal.h"
       
     8 #include "evpq.h"
       
     9 #include "lib/log.h"
       
    10 #include "lib/error.h"
       
    11 #include "lib/misc.h"
       
    12 
       
    13 /*
       
    14  * A couple function prototypes
       
    15  */ 
       
    16 static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn);
       
    17 
       
    18 /*
       
    19  * Actually execute the given query.
       
    20  *
       
    21  * The backend should be able to accept the query at this time.
       
    22  *
       
    23  * You should assume that if trying to execute a query fails, then the connection should also be considred as failed.
       
    24  */
       
    25 static int _evsql_query_exec (struct evsql_conn *conn, struct evsql_query *query, const char *command) {
       
    26     int err;
       
    27 
       
    28     switch (conn->evsql->type) {
       
    29         case EVSQL_EVPQ:
       
    30             // got params?
       
    31             if (query->params.count) {
       
    32                 err = evpq_query_params(conn->engine.evpq, command,
       
    33                     query->params.count, 
       
    34                     query->params.types, 
       
    35                     query->params.values, 
       
    36                     query->params.lengths, 
       
    37                     query->params.formats, 
       
    38                     query->params.result_format
       
    39                 );
       
    40 
       
    41             } else {
       
    42                 // plain 'ole query
       
    43                 err = evpq_query(conn->engine.evpq, command);
       
    44             }
       
    45 
       
    46             if (err) {
       
    47                 if (PQstatus(evpq_pgconn(conn->engine.evpq)) != CONNECTION_OK)
       
    48                     WARNING("conn failed");
       
    49                 else
       
    50                     WARNING("query failed, dropping conn as well");
       
    51             }
       
    52 
       
    53             break;
       
    54         
       
    55         default:
       
    56             FATAL("evsql->type");
       
    57     }
       
    58 
       
    59     if (!err)
       
    60         // assign the query
       
    61         conn->query = query;
       
    62 
       
    63     return err;
       
    64 }
       
    65 
       
    66 /*
       
    67  * Free the query and related resources, doesn't trigger any callbacks or remove from any queues.
       
    68  *
       
    69  * The command should already be taken care of (NULL).
       
    70  */
       
    71 static void _evsql_query_free (struct evsql_query *query) {
       
    72     if (!query)
       
    73         return;
       
    74         
       
    75     assert(query->command == NULL);
       
    76     
       
    77     // free params if present
       
    78     free(query->params.types);
       
    79     free(query->params.values);
       
    80     free(query->params.lengths);
       
    81     free(query->params.formats);
       
    82 
       
    83     // free the query itself
       
    84     free(query);
       
    85 }
       
    86 
       
    87 /*
       
    88  * Execute the callback if res is given, and free the query.
       
    89  *
       
    90  * The query has been aborted, it will simply be freed
       
    91  */
       
    92 static void _evsql_query_done (struct evsql_query *query, const struct evsql_result_info *res) {
       
    93     if (res) {
       
    94         if (query->cb_fn)
       
    95             // call the callback
       
    96             query->cb_fn(res, query->cb_arg);
       
    97         else
       
    98             WARNING("supressing cb_fn because query was aborted");
       
    99     }
       
   100 
       
   101     // free
       
   102     _evsql_query_free(query);
       
   103 }
       
   104 
       
   105 /*
       
   106  * XXX:
       
   107  * /
       
   108 static void _evsql_destroy (struct evsql *evsql, const struct evsql_result_info *res) {
       
   109     struct evsql_query *query;
       
   110     
       
   111     // clear the queue
       
   112     while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) {
       
   113         _evsql_query_done(query, res);
       
   114         
       
   115         TAILQ_REMOVE(&evsql->query_queue, query, entry);
       
   116     }
       
   117     
       
   118     // free
       
   119     free(evsql);
       
   120 }
       
   121 */
       
   122 
       
   123 /*
       
   124  * Free the transaction, it should already be deassociated from the query and conn.
       
   125  */
       
   126 static void _evsql_trans_free (struct evsql_trans *trans) {
       
   127     // ensure we don't leak anything
       
   128     assert(trans->query == NULL);
       
   129     assert(trans->conn == NULL);
       
   130     
       
   131     // free
       
   132     free(trans);
       
   133 }
       
   134 
       
   135 /*
       
   136  * Release a connection. It should already be deassociated from the trans and query.
       
   137  *
       
   138  * Releases the engine, removes from the conn_list and frees this.
       
   139  */
       
   140 static void _evsql_conn_release (struct evsql_conn *conn) {
       
   141     // ensure we don't leak anything
       
   142     assert(conn->trans == NULL);
       
   143     assert(conn->query == NULL);
       
   144 
       
   145     // release the engine
       
   146     switch (conn->evsql->type) {
       
   147         case EVSQL_EVPQ:
       
   148             evpq_release(conn->engine.evpq);
       
   149             break;
       
   150         
       
   151         default:
       
   152             FATAL("evsql->type");
       
   153     }
       
   154     
       
   155     // remove from list
       
   156     LIST_REMOVE(conn, entry);
       
   157     
       
   158     // catch deadlocks
       
   159     assert(!LIST_EMPTY(&conn->evsql->conn_list) || TAILQ_EMPTY(&conn->evsql->query_queue));
       
   160 
       
   161     // free
       
   162     free(conn);
       
   163 }
       
   164 
       
   165 /*
       
   166  * Release a transaction, it should already be deassociated from the query.
       
   167  *
       
   168  * Perform a two-way-deassociation with the conn, and then free the trans.
       
   169  */
       
   170 static void _evsql_trans_release (struct evsql_trans *trans) {
       
   171     assert(trans->query == NULL);
       
   172     assert(trans->conn != NULL);
       
   173 
       
   174     // deassociate the conn
       
   175     trans->conn->trans = NULL; trans->conn = NULL;
       
   176 
       
   177     // free the trans
       
   178     _evsql_trans_free(trans);
       
   179 }
       
   180 
       
   181 /*
       
   182  * Fail a single query, this will trigger the callback and free it.
       
   183  *
       
   184  * NOTE: Only for *TRANSACTIONLESS* queries.
       
   185  */
       
   186 static void _evsql_query_fail (struct evsql* evsql, struct evsql_query *query) {
       
   187     struct evsql_result_info res; ZINIT(res);
       
   188     
       
   189     // set up the result_info
       
   190     res.evsql = evsql;
       
   191     res.trans = NULL;
       
   192     res.error = 1;
       
   193     
       
   194     // finish off the query
       
   195     _evsql_query_done(query, &res);
       
   196 }
       
   197 
       
   198 /*
       
   199  * Fail a transaction, this will silently drop any query, trigger the error callback, two-way-deassociate/release the
       
   200  * conn, and then free the trans.
       
   201  */ 
       
   202 static void _evsql_trans_fail (struct evsql_trans *trans) {
       
   203     if (trans->query) {
       
   204         // free the query silently
       
   205         _evsql_query_free(trans->query); trans->query = NULL;
       
   206 
       
   207         // also deassociate it from the conn!
       
   208         trans->conn->query = NULL;
       
   209     }
       
   210 
       
   211     // tell the user
       
   212     // XXX: trans is in a bad state during this call
       
   213     if (trans->error_fn)
       
   214         trans->error_fn(trans, trans->cb_arg);
       
   215     else
       
   216         WARNING("supressing error because error_fn was NULL");
       
   217  
       
   218     // deassociate and release the conn
       
   219     trans->conn->trans = NULL; _evsql_conn_release(trans->conn); trans->conn = NULL;
       
   220 
       
   221     // pump the queue for requests that were waiting for this connection
       
   222     _evsql_pump(trans->evsql, NULL);
       
   223 
       
   224     // free the trans
       
   225     _evsql_trans_free(trans);
       
   226 }
       
   227 
       
   228 /*
       
   229  * Fail a connection. If the connection is transactional, this will just call _evsql_trans_fail, but otherwise it will
       
   230  * fail any ongoing query, and then release the connection.
       
   231  */
       
   232 static void _evsql_conn_fail (struct evsql_conn *conn) {
       
   233     if (conn->trans) {
       
   234         // let transactions handle their connection failures
       
   235         _evsql_trans_fail(conn->trans);
       
   236 
       
   237     } else {
       
   238         if (conn->query) {
       
   239             // fail the in-progress query
       
   240             _evsql_query_fail(conn->evsql, conn->query); conn->query = NULL;
       
   241         }
       
   242 
       
   243         // finish off the whole connection
       
   244         _evsql_conn_release(conn);
       
   245     }
       
   246 }
       
   247 
       
   248 /*
       
   249  * Processes enqueued non-transactional queries until the queue is empty, or we managed to exec a query.
       
   250  *
       
   251  * If execing a query on a connection fails, both the query and the connection are failed (in that order).
       
   252  *
       
   253  * Any further queries will then also be failed, because there's no reconnection/retry logic yet.
       
   254  *
       
   255  * This means that if conn is NULL, all queries are failed.
       
   256  */
       
   257 static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn) {
       
   258     struct evsql_query *query;
       
   259     int err;
       
   260     
       
   261     // look for waiting queries
       
   262     while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) {
       
   263         // dequeue
       
   264         TAILQ_REMOVE(&evsql->query_queue, query, entry);
       
   265         
       
   266         if (conn) {
       
   267             // try and execute it
       
   268             err = _evsql_query_exec(conn, query, query->command);
       
   269         }
       
   270 
       
   271         // free the command buf
       
   272         free(query->command); query->command = NULL;
       
   273 
       
   274         if (err || !conn) {
       
   275             if (!conn) {
       
   276                 // warn when dropping queries
       
   277                 WARNING("failing query becuse there are no conns");
       
   278             }
       
   279 
       
   280             // fail the query
       
   281             _evsql_query_fail(evsql, query);
       
   282             
       
   283             if (conn) {
       
   284                 // fail the connection
       
   285                 WARNING("failing the connection because a query-exec failed");
       
   286 
       
   287                 _evsql_conn_fail(conn); conn = NULL;
       
   288             }
       
   289 
       
   290         } else {
       
   291             // we have succesfully enqueued a query, and we can wait for this connection to complete
       
   292             break;
       
   293 
       
   294         }
       
   295 
       
   296         // handle the rest of the queue
       
   297     }
       
   298     
       
   299     // ok
       
   300     return;
       
   301 }
       
   302 
       
   303 /*
       
   304  * Callback for a trans's 'BEGIN' query, which means the transaction is now ready for use.
       
   305  */
       
   306 static void _evsql_trans_ready (const struct evsql_result_info *res, void *arg) {
       
   307     (void) arg;
       
   308 
       
   309     assert(res->trans);
       
   310 
       
   311     // check for errors
       
   312     if (res->error)
       
   313         ERROR("transaction 'BEGIN' failed: %s", evsql_result_error(res));
       
   314     
       
   315     // transaction is now ready for use
       
   316     res->trans->ready_fn(res->trans, res->trans->cb_arg);
       
   317     
       
   318     // good
       
   319     return;
       
   320 
       
   321 error:
       
   322     _evsql_trans_fail(res->trans);
       
   323 }
       
   324 
       
   325 /*
       
   326  * The transaction's connection is ready, send the 'BEGIN' query.
       
   327  *
       
   328  * If anything fails, calls _evsql_trans_fail and returns nonzero, zero on success
       
   329  */
       
   330 static int _evsql_trans_conn_ready (struct evsql *evsql, struct evsql_trans *trans) {
       
   331     char trans_sql[EVSQL_QUERY_BEGIN_BUF];
       
   332     const char *isolation_level;
       
   333     int ret;
       
   334     
       
   335     // determine the isolation_level to use
       
   336     switch (trans->type) {
       
   337         case EVSQL_TRANS_DEFAULT:
       
   338             isolation_level = NULL; break;
       
   339 
       
   340         case EVSQL_TRANS_SERIALIZABLE:
       
   341             isolation_level = "SERIALIZABLE"; break;
       
   342 
       
   343         case EVSQL_TRANS_REPEATABLE_READ:
       
   344             isolation_level = "REPEATABLE READ"; break;
       
   345 
       
   346         case EVSQL_TRANS_READ_COMMITTED:
       
   347             isolation_level = "READ COMMITTED"; break;
       
   348 
       
   349         case EVSQL_TRANS_READ_UNCOMMITTED:
       
   350             isolation_level = "READ UNCOMMITTED"; break;
       
   351 
       
   352         default:
       
   353             FATAL("trans->type: %d", trans->type);
       
   354     }
       
   355     
       
   356     // build the trans_sql
       
   357     if (isolation_level)
       
   358         ret = snprintf(trans_sql, EVSQL_QUERY_BEGIN_BUF, "BEGIN TRANSACTION ISOLATION LEVEL %s", isolation_level);
       
   359     else
       
   360         ret = snprintf(trans_sql, EVSQL_QUERY_BEGIN_BUF, "BEGIN TRANSACTION");
       
   361     
       
   362     // make sure it wasn't truncated
       
   363     if (ret >= EVSQL_QUERY_BEGIN_BUF)
       
   364         ERROR("trans_sql overflow: %d >= %d", ret, EVSQL_QUERY_BEGIN_BUF);
       
   365     
       
   366     // execute the query
       
   367     if (evsql_query(evsql, trans, trans_sql, _evsql_trans_ready, NULL) == NULL)
       
   368         ERROR("evsql_query");
       
   369     
       
   370     // success
       
   371     return 0;
       
   372 
       
   373 error:
       
   374     // fail the transaction
       
   375     _evsql_trans_fail(trans);
       
   376 
       
   377     return -1;
       
   378 }
       
   379 
       
   380 /*
       
   381  * The evpq connection was succesfully established.
       
   382  */ 
       
   383 static void _evsql_evpq_connected (struct evpq_conn *_conn, void *arg) {
       
   384     struct evsql_conn *conn = arg;
       
   385 
       
   386     if (conn->trans)
       
   387         // notify the transaction
       
   388         // don't care about errors
       
   389         (void) _evsql_trans_conn_ready(conn->evsql, conn->trans);
       
   390     
       
   391     else
       
   392         // pump any waiting transactionless queries
       
   393         _evsql_pump(conn->evsql, conn);
       
   394 }
       
   395 
       
   396 /*
       
   397  * Got one result on this evpq connection.
       
   398  */
       
   399 static void _evsql_evpq_result (struct evpq_conn *_conn, PGresult *result, void *arg) {
       
   400     struct evsql_conn *conn = arg;
       
   401     struct evsql_query *query = conn->query;
       
   402 
       
   403     assert(query != NULL);
       
   404 
       
   405     // if we get multiple results, only return the first one
       
   406     if (query->result.evpq) {
       
   407         WARNING("[evsql] evpq query returned multiple results, discarding previous one");
       
   408         
       
   409         PQclear(query->result.evpq); query->result.evpq = NULL;
       
   410     }
       
   411     
       
   412     // remember the result
       
   413     query->result.evpq = result;
       
   414 }
       
   415 
       
   416 /*
       
   417  * No more results for this query.
       
   418  */
       
   419 static void _evsql_evpq_done (struct evpq_conn *_conn, void *arg) {
       
   420     struct evsql_conn *conn = arg;
       
   421     struct evsql_query *query = conn->query;
       
   422     struct evsql_result_info res; ZINIT(res);
       
   423     
       
   424     assert(query != NULL);
       
   425     
       
   426     // set up the result_info
       
   427     res.evsql = conn->evsql;
       
   428     res.trans = conn->trans;
       
   429     
       
   430     if (query->result.evpq == NULL) {
       
   431         // if a query didn't return any results (bug?), warn and fail the query
       
   432         WARNING("[evsql] evpq query didn't return any results");
       
   433 
       
   434         res.error = 1;
       
   435     
       
   436     } else if (strcmp(PQresultErrorMessage(query->result.evpq), "") != 0) {
       
   437         // the query failed with some error
       
   438         res.error = 1;
       
   439         res.result.pq = query->result.evpq;
       
   440 
       
   441     } else {
       
   442         res.error = 0;
       
   443         res.result.pq = query->result.evpq;
       
   444 
       
   445     }
       
   446 
       
   447     // de-associate the query from the connection
       
   448     conn->query = NULL;
       
   449     
       
   450     // how we handle query completion depends on if we're a transaction or not
       
   451     if (conn->trans) {
       
   452         // we can deassign the trans's query
       
   453         conn->trans->query = NULL;
       
   454 
       
   455         // was an abort?
       
   456         if (!query->cb_fn)
       
   457             // notify the user that the transaction query has been aborted
       
   458             conn->trans->ready_fn(conn->trans, conn->trans->cb_arg);
       
   459 
       
   460         // then hand the query to the user
       
   461         _evsql_query_done(query, &res);
       
   462         
       
   463     } else {
       
   464         // a transactionless query, so just finish it off and pump any other waiting ones
       
   465         _evsql_query_done(query, &res);
       
   466 
       
   467         // pump the next one
       
   468         _evsql_pump(conn->evsql, conn);
       
   469     }
       
   470 }
       
   471 
       
   472 /*
       
   473  * The connection failed.
       
   474  */
       
   475 static void _evsql_evpq_failure (struct evpq_conn *_conn, void *arg) {
       
   476     struct evsql_conn *conn = arg;
       
   477     
       
   478     // just fail the conn
       
   479     _evsql_conn_fail(conn);
       
   480 }
       
   481 
       
   482 /*
       
   483  * Our evpq behaviour
       
   484  */
       
   485 static struct evpq_callback_info _evsql_evpq_cb_info = {
       
   486     .fn_connected       = _evsql_evpq_connected,
       
   487     .fn_result          = _evsql_evpq_result,
       
   488     .fn_done            = _evsql_evpq_done,
       
   489     .fn_failure         = _evsql_evpq_failure,
       
   490 };
       
   491 
       
   492 /*
       
   493  * Allocate the generic evsql context.
       
   494  */
       
   495 static struct evsql *_evsql_new_base (struct event_base *ev_base, evsql_error_cb error_fn, void *cb_arg) {
       
   496     struct evsql *evsql = NULL;
       
   497     
       
   498     // allocate it
       
   499     if ((evsql = calloc(1, sizeof(*evsql))) == NULL)
       
   500         ERROR("calloc");
       
   501 
       
   502     // store
       
   503     evsql->ev_base = ev_base;
       
   504     evsql->error_fn = error_fn;
       
   505     evsql->cb_arg = cb_arg;
       
   506 
       
   507     // init
       
   508     LIST_INIT(&evsql->conn_list);
       
   509     TAILQ_INIT(&evsql->query_queue);
       
   510 
       
   511     // done
       
   512     return evsql;
       
   513 
       
   514 error:
       
   515     return NULL;
       
   516 }
       
   517 
       
   518 /*
       
   519  * Start a new connection and add it to the list, it won't be ready until _evsql_evpq_connected is called
       
   520  */
       
   521 static struct evsql_conn *_evsql_conn_new (struct evsql *evsql) {
       
   522     struct evsql_conn *conn = NULL;
       
   523     
       
   524     // allocate
       
   525     if ((conn = calloc(1, sizeof(*conn))) == NULL)
       
   526         ERROR("calloc");
       
   527 
       
   528     // init
       
   529     conn->evsql = evsql;
       
   530     
       
   531     // connect the engine
       
   532     switch (evsql->type) {
       
   533         case EVSQL_EVPQ:
       
   534             if ((conn->engine.evpq = evpq_connect(evsql->ev_base, evsql->engine_conf.evpq, _evsql_evpq_cb_info, conn)) == NULL)
       
   535                 goto error;
       
   536             
       
   537             break;
       
   538             
       
   539         default:
       
   540             FATAL("evsql->type");
       
   541     }
       
   542 
       
   543     // add it to the list
       
   544     LIST_INSERT_HEAD(&evsql->conn_list, conn, entry);
       
   545 
       
   546     // success
       
   547     return conn;
       
   548 
       
   549 error:
       
   550     free(conn);
       
   551 
       
   552     return NULL;
       
   553 }
       
   554 
       
   555 struct evsql *evsql_new_pq (struct event_base *ev_base, const char *pq_conninfo, evsql_error_cb error_fn, void *cb_arg) {
       
   556     struct evsql *evsql = NULL;
       
   557     
       
   558     // base init
       
   559     if ((evsql = _evsql_new_base (ev_base, error_fn, cb_arg)) == NULL)
       
   560         goto error;
       
   561 
       
   562     // store conf
       
   563     evsql->engine_conf.evpq = pq_conninfo;
       
   564 
       
   565     // pre-create one connection
       
   566     if (_evsql_conn_new(evsql) == NULL)
       
   567         goto error;
       
   568 
       
   569     // done
       
   570     return evsql;
       
   571 
       
   572 error:
       
   573     // XXX: more complicated than this?
       
   574     free(evsql); 
       
   575 
       
   576     return NULL;
       
   577 }
       
   578 
       
   579 /*
       
   580  * Checks if the connection is already allocated for some other trans/query.
       
   581  *
       
   582  * Returns:
       
   583  *      0       connection idle, can be allocated
       
   584  *      >1      connection busy
       
   585  */
       
   586 static int _evsql_conn_busy (struct evsql_conn *conn) {
       
   587     // transactions get the connection to themselves
       
   588     if (conn->trans)
       
   589         return 1;
       
   590     
       
   591     // if it has a query assigned, it's busy
       
   592     if (conn->query)
       
   593         return 1;
       
   594 
       
   595     // otherwise, it's all idle
       
   596     return 0;
       
   597 }
       
   598 
       
   599 /*
       
   600  * Checks if the connection is ready for use (i.e. _evsql_evpq_connected was called).
       
   601  *
       
   602  * The connection should not already have a query running.
       
   603  *
       
   604  * Returns 
       
   605  *  <0  the connection is not valid (failed, query in progress)
       
   606  *  0   the connection is still pending, and will become ready at some point
       
   607  *  >0  it's ready
       
   608  */
       
   609 static int _evsql_conn_ready (struct evsql_conn *conn) {
       
   610     switch (conn->evsql->type) {
       
   611         case EVSQL_EVPQ: {
       
   612             enum evpq_state state = evpq_state(conn->engine.evpq);
       
   613             
       
   614             switch (state) {
       
   615                 case EVPQ_CONNECT:
       
   616                     return 0;
       
   617                 
       
   618                 case EVPQ_CONNECTED:
       
   619                     return 1;
       
   620 
       
   621                 case EVPQ_QUERY:
       
   622                 case EVPQ_INIT:
       
   623                 case EVPQ_FAILURE:
       
   624                     return -1;
       
   625                 
       
   626                 default:
       
   627                     FATAL("evpq_state: %d", state);
       
   628             }
       
   629 
       
   630         }
       
   631         
       
   632         default:
       
   633             FATAL("evsql->type: %d", conn->evsql->type);
       
   634     }
       
   635 }
       
   636 
       
   637 /*
       
   638  * Allocate a connection for use and return it via *conn_ptr, or if may_queue is nonzero and the connection pool is
       
   639  * getting full, return NULL (query should be queued).
       
   640  *
       
   641  * Note that the returned connection might not be ready for use yet (if we created a new one, see _evsql_conn_ready).
       
   642  *
       
   643  * Returns zero if a connection was found or the request should be queued, or nonzero if something failed and the
       
   644  * request should be dropped.
       
   645  */
       
   646 static int _evsql_conn_get (struct evsql *evsql, struct evsql_conn **conn_ptr, int may_queue) {
       
   647     int have_nontrans = 0;
       
   648     *conn_ptr = NULL;
       
   649     
       
   650     // find a connection that isn't busy and is ready (unless the query queue is empty).
       
   651     LIST_FOREACH(*conn_ptr, &evsql->conn_list, entry) {
       
   652         // we can only have a query enqueue itself if there is a non-trans conn it can later use
       
   653         if (!(*conn_ptr)->trans)
       
   654             have_nontrans = 1;
       
   655 
       
   656         // skip busy conns always
       
   657         if (_evsql_conn_busy(*conn_ptr))
       
   658             continue;
       
   659         
       
   660         // accept pending conns as long as there are NO enqueued queries (might cause deadlock otherwise)
       
   661         if (_evsql_conn_ready(*conn_ptr) == 0 && TAILQ_EMPTY(&evsql->query_queue))
       
   662             break;
       
   663 
       
   664         // accept conns that are in a fully ready state
       
   665         if (_evsql_conn_ready(*conn_ptr) > 0)
       
   666             break;
       
   667     }
       
   668     
       
   669     // if we found an idle connection, we can just return that right away
       
   670     if (*conn_ptr)
       
   671         return 0;
       
   672 
       
   673     // return NULL if may_queue and we have a non-trans conn that we can, at some point, use
       
   674     if (may_queue && have_nontrans)
       
   675         return 0;
       
   676     
       
   677     // we need to open a new connection
       
   678     if ((*conn_ptr = _evsql_conn_new(evsql)) == NULL)
       
   679         goto error;
       
   680 
       
   681     // good
       
   682     return 0;
       
   683 error:
       
   684     return -1;
       
   685 }
       
   686 
       
   687 struct evsql_trans *evsql_trans (struct evsql *evsql, enum evsql_trans_type type, evsql_trans_error_cb error_fn, evsql_trans_ready_cb ready_fn, evsql_trans_done_cb done_fn, void *cb_arg) {
       
   688     struct evsql_trans *trans = NULL;
       
   689 
       
   690     // allocate it
       
   691     if ((trans = calloc(1, sizeof(*trans))) == NULL)
       
   692         ERROR("calloc");
       
   693 
       
   694     // store
       
   695     trans->evsql = evsql;
       
   696     trans->ready_fn = ready_fn;
       
   697     trans->done_fn = done_fn;
       
   698     trans->cb_arg = cb_arg;
       
   699     trans->type = type;
       
   700 
       
   701     // find a connection
       
   702     if (_evsql_conn_get(evsql, &trans->conn, 0))
       
   703         ERROR("_evsql_conn_get");
       
   704 
       
   705     // associate the conn
       
   706     trans->conn->trans = trans;
       
   707 
       
   708     // is it already ready?
       
   709     if (_evsql_conn_ready(trans->conn) > 0) {
       
   710         // call _evsql_trans_conn_ready directly, it will handle cleanup (silently, !error_fn)
       
   711         if (_evsql_trans_conn_ready(evsql, trans)) {
       
   712             // return NULL directly
       
   713             return NULL;
       
   714         }
       
   715 
       
   716     } else {
       
   717         // otherwise, wait for the conn to be ready
       
   718          
       
   719     }
       
   720     
       
   721     // and let it pass errors to the user
       
   722     trans->error_fn = error_fn;
       
   723 
       
   724     // ok
       
   725     return trans;
       
   726 
       
   727 error:
       
   728     free(trans);
       
   729 
       
   730     return NULL;
       
   731 }
       
   732 
       
   733 /*
       
   734  * Validate and allocate the basic stuff for a new query.
       
   735  */
       
   736 static struct evsql_query *_evsql_query_new (struct evsql *evsql, struct evsql_trans *trans, evsql_query_cb query_fn, void *cb_arg) {
       
   737     struct evsql_query *query = NULL;
       
   738     
       
   739     // if it's part of a trans, then make sure the trans is idle
       
   740     if (trans && trans->query)
       
   741         ERROR("transaction is busy");
       
   742 
       
   743     // allocate it
       
   744     if ((query = calloc(1, sizeof(*query))) == NULL)
       
   745         ERROR("calloc");
       
   746 
       
   747     // store
       
   748     query->cb_fn = query_fn;
       
   749     query->cb_arg = cb_arg;
       
   750 
       
   751     // success
       
   752     return query;
       
   753 
       
   754 error:
       
   755     return NULL;
       
   756 }
       
   757 
       
   758 /*
       
   759  * Handle a new query.
       
   760  *
       
   761  * For transactions this will associate the query and then execute it, otherwise this will either find an idle
       
   762  * connection and send the query, or enqueue it.
       
   763  */
       
   764 static int _evsql_query_enqueue (struct evsql *evsql, struct evsql_trans *trans, struct evsql_query *query, const char *command) {
       
   765     // transaction queries are handled differently
       
   766     if (trans) {
       
   767         // it's an in-transaction query
       
   768         assert(trans->query == NULL);
       
   769         
       
   770         // assign the query
       
   771         trans->query = query;
       
   772 
       
   773         // execute directly
       
   774         if (_evsql_query_exec(trans->conn, query, command)) {
       
   775             // ack, fail the transaction
       
   776             _evsql_trans_fail(trans);
       
   777             
       
   778             // caller frees query
       
   779             goto error;
       
   780         }
       
   781 
       
   782     } else {
       
   783         struct evsql_conn *conn;
       
   784         
       
   785         // find an idle connection
       
   786         if ((_evsql_conn_get(evsql, &conn, 1)))
       
   787             ERROR("couldn't allocate a connection for the query");
       
   788 
       
   789         // we must enqueue if no idle conn or the conn is not yet ready
       
   790         if (conn && _evsql_conn_ready(conn) > 0) {
       
   791             // execute directly
       
   792             if (_evsql_query_exec(conn, query, command)) {
       
   793                 // ack, fail the connection
       
   794                 _evsql_conn_fail(conn);
       
   795                 
       
   796                 // make sure we don't deadlock any queries, but if this query got a conn directly, then we shouldn't
       
   797                 // have any queries enqueued anyways
       
   798                 assert(TAILQ_EMPTY(&evsql->query_queue));
       
   799                 
       
   800                 // caller frees query
       
   801                 goto error;
       
   802             }
       
   803 
       
   804         } else {
       
   805             // copy the command for later execution
       
   806             if ((query->command = strdup(command)) == NULL)
       
   807                 ERROR("strdup");
       
   808             
       
   809             // enqueue until some connection pumps the queue
       
   810             TAILQ_INSERT_TAIL(&evsql->query_queue, query, entry);
       
   811         }
       
   812     }
       
   813 
       
   814     // ok, good
       
   815     return 0;
       
   816 
       
   817 error:
       
   818     return -1;
       
   819 }
       
   820 
       
   821 struct evsql_query *evsql_query (struct evsql *evsql, struct evsql_trans *trans, const char *command, evsql_query_cb query_fn, void *cb_arg) {
       
   822     struct evsql_query *query = NULL;
       
   823     
       
   824     // alloc new query
       
   825     if ((query = _evsql_query_new(evsql, trans, query_fn, cb_arg)) == NULL)
       
   826         goto error;
       
   827     
       
   828     // just execute the command string directly
       
   829     if (_evsql_query_enqueue(evsql, trans, query, command))
       
   830         goto error;
       
   831 
       
   832     // ok
       
   833     return query;
       
   834 
       
   835 error:
       
   836     _evsql_query_free(query);
       
   837 
       
   838     return NULL;
       
   839 }
       
   840 
       
   841 struct evsql_query *evsql_query_params (struct evsql *evsql, struct evsql_trans *trans, const char *command, const struct evsql_query_params *params, evsql_query_cb query_fn, void *cb_arg) {
       
   842     struct evsql_query *query = NULL;
       
   843     const struct evsql_query_param *param;
       
   844     int idx;
       
   845     
       
   846     // alloc new query
       
   847     if ((query = _evsql_query_new(evsql, trans, query_fn, cb_arg)) == NULL)
       
   848         goto error;
       
   849 
       
   850     // count the params
       
   851     for (param = params->list; param->type; param++) 
       
   852         query->params.count++;
       
   853 
       
   854     // allocate the vertical storage for the parameters
       
   855     if (0
       
   856         
       
   857 //            !(query->params.types    = calloc(query->params.count, sizeof(Oid)))
       
   858         ||  !(query->params.values   = calloc(query->params.count, sizeof(char *)))
       
   859         ||  !(query->params.lengths  = calloc(query->params.count, sizeof(int)))
       
   860         ||  !(query->params.formats  = calloc(query->params.count, sizeof(int)))
       
   861     )
       
   862         ERROR("calloc");
       
   863 
       
   864     // transform
       
   865     for (param = params->list, idx = 0; param->type; param++, idx++) {
       
   866         // `types` stays NULL
       
   867         // query->params.types[idx] = 0;
       
   868         
       
   869         // values
       
   870         query->params.values[idx] = param->data_raw;
       
   871 
       
   872         // lengths
       
   873         query->params.lengths[idx] = param->length;
       
   874 
       
   875         // formats, binary if length is nonzero
       
   876         query->params.formats[idx] = param->length ? 1 : 0;
       
   877     }
       
   878 
       
   879     // result format
       
   880     switch (params->result_fmt) {
       
   881         case EVSQL_FMT_TEXT:
       
   882             query->params.result_format = 0; break;
       
   883 
       
   884         case EVSQL_FMT_BINARY:
       
   885             query->params.result_format = 1; break;
       
   886 
       
   887         default:
       
   888             FATAL("params.result_fmt: %d", params->result_fmt);
       
   889     }
       
   890 
       
   891     // execute it
       
   892     if (_evsql_query_enqueue(evsql, trans, query, command))
       
   893         goto error;
       
   894 
       
   895     // ok
       
   896     return query;
       
   897 
       
   898 error:
       
   899     _evsql_query_free(query);
       
   900     
       
   901     return NULL;
       
   902 }
       
   903 
       
   904 void evsql_query_abort (struct evsql_trans *trans, struct evsql_query *query) {
       
   905     assert(query);
       
   906 
       
   907     if (trans) {
       
   908         // must be the right query
       
   909         assert(trans->query == query);
       
   910     }
       
   911 
       
   912     // just strip the callback and wait for it to complete as normal
       
   913     query->cb_fn = NULL;
       
   914 }
       
   915 
       
   916 void _evsql_trans_commit_res (const struct evsql_result_info *res, void *arg) {
       
   917     (void) arg;
       
   918 
       
   919     assert(res->trans);
       
   920 
       
   921     // check for errors
       
   922     if (res->error)
       
   923         ERROR("transaction 'COMMIT' failed: %s", evsql_result_error(res));
       
   924     
       
   925     // transaction is now done
       
   926     res->trans->done_fn(res->trans, res->trans->cb_arg);
       
   927     
       
   928     // release it
       
   929     _evsql_trans_release(res->trans);
       
   930 
       
   931     // success
       
   932     return;
       
   933 
       
   934 error:
       
   935     _evsql_trans_fail(res->trans);
       
   936 }
       
   937 
       
   938 int evsql_trans_commit (struct evsql_trans *trans) {
       
   939     static const char *sql = "COMMIT TRANSACTION";
       
   940 
       
   941     if (trans->query)
       
   942         ERROR("cannot COMMIT because transaction is still busy");
       
   943     
       
   944     // query
       
   945     if (evsql_query(trans->evsql, trans, sql, _evsql_trans_commit_res, NULL) == NULL)
       
   946         goto error;
       
   947     
       
   948     // mark it as commited in case someone wants to abort it
       
   949     trans->has_commit = 1;
       
   950 
       
   951     // success
       
   952     return 0;
       
   953 
       
   954 error:
       
   955     return -1;
       
   956 }
       
   957 
       
   958 void _evsql_trans_rollback_res (const struct evsql_result_info *res, void *arg) {
       
   959     (void) arg;
       
   960 
       
   961     assert(res->trans);
       
   962 
       
   963     // fail the connection on errors
       
   964     if (res->error)
       
   965         ERROR("transaction 'ROLLBACK' failed: %s", evsql_result_error(res));
       
   966 
       
   967     // release it
       
   968     _evsql_trans_release(res->trans);
       
   969 
       
   970     // success
       
   971     return;
       
   972 
       
   973 error:
       
   974     // fail the connection too, errors are supressed
       
   975     _evsql_trans_fail(res->trans);
       
   976 }
       
   977 
       
   978 /*
       
   979  * Used as the ready_fn callback in case of abort, otherwise directly
       
   980  */
       
   981 void _evsql_trans_rollback (struct evsql_trans *trans, void *unused) {
       
   982     static const char *sql = "ROLLBACK TRANSACTION";
       
   983 
       
   984     (void) unused;
       
   985 
       
   986     // query
       
   987     if (evsql_query(trans->evsql, trans, sql, _evsql_trans_rollback_res, NULL) == NULL) {
       
   988         // fail the transaction/connection
       
   989         _evsql_trans_fail(trans);
       
   990     }
       
   991 
       
   992 }
       
   993 
       
   994 void evsql_trans_abort (struct evsql_trans *trans) {
       
   995     // supress errors
       
   996     trans->error_fn = NULL;
       
   997     
       
   998     if (trans->has_commit) {
       
   999         // abort after commit doesn't make sense
       
  1000         FATAL("transaction was already commited");
       
  1001     }
       
  1002 
       
  1003     if (trans->query) {
       
  1004         // gah, some query is running
       
  1005         WARNING("aborting pending query");
       
  1006         
       
  1007         // prepare to rollback once complete
       
  1008         trans->ready_fn = _evsql_trans_rollback;
       
  1009         
       
  1010         // abort
       
  1011         evsql_query_abort(trans, trans->query);
       
  1012 
       
  1013     } else {
       
  1014         // just rollback directly
       
  1015         _evsql_trans_rollback(trans, NULL);
       
  1016 
       
  1017     }
       
  1018 }
       
  1019