src/evsql/evsql.c
changeset 29 5de62ca9a5aa
child 31 7804cd7b5cd5
equal deleted inserted replaced
28:e944453ca924 29:5de62ca9a5aa
       
     1 #define _GNU_SOURCE
       
     2 #include <stdlib.h>
       
     3 #include <assert.h>
       
     4 #include <string.h>
       
     5 
       
     6 #include "evsql.h"
       
     7 #include "../lib/log.h"
       
     8 #include "../lib/error.h"
       
     9 #include "../lib/misc.h"
       
    10 
       
    11 /*
       
    12  * A couple function prototypes
       
    13  */ 
       
    14 static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn);
       
    15 
       
    16 /*
       
    17  * Actually execute the given query.
       
    18  *
       
    19  * The backend should be able to accept the query at this time.
       
    20  *
       
    21  * You should assume that if trying to execute a query fails, then the connection should also be considred as failed.
       
    22  */
       
    23 static int _evsql_query_exec (struct evsql_conn *conn, struct evsql_query *query, const char *command) {
       
    24     int err;
       
    25 
       
    26     switch (conn->evsql->type) {
       
    27         case EVSQL_EVPQ:
       
    28             // got params?
       
    29             if (query->params.count) {
       
    30                 err = evpq_query_params(conn->engine.evpq, command,
       
    31                     query->params.count, 
       
    32                     query->params.types, 
       
    33                     query->params.values, 
       
    34                     query->params.lengths, 
       
    35                     query->params.formats, 
       
    36                     query->params.result_format
       
    37                 );
       
    38 
       
    39             } else {
       
    40                 // plain 'ole query
       
    41                 err = evpq_query(conn->engine.evpq, command);
       
    42             }
       
    43 
       
    44             if (err) {
       
    45                 if (PQstatus(evpq_pgconn(conn->engine.evpq)) != CONNECTION_OK)
       
    46                     WARNING("conn failed");
       
    47                 else
       
    48                     WARNING("query failed, dropping conn as well");
       
    49             }
       
    50 
       
    51             break;
       
    52         
       
    53         default:
       
    54             FATAL("evsql->type");
       
    55     }
       
    56 
       
    57     if (!err)
       
    58         // assign the query
       
    59         conn->query = query;
       
    60 
       
    61     return err;
       
    62 }
       
    63 
       
    64 /*
       
    65  * Free the query and related resources, doesn't trigger any callbacks or remove from any queues.
       
    66  *
       
    67  * The command should already be taken care of (NULL).
       
    68  */
       
    69 static void _evsql_query_free (struct evsql_query *query) {
       
    70     if (!query)
       
    71         return;
       
    72         
       
    73     assert(query->command == NULL);
       
    74     
       
    75     // free params if present
       
    76     free(query->params.types);
       
    77     free(query->params.values);
       
    78     free(query->params.lengths);
       
    79     free(query->params.formats);
       
    80 
       
    81     // free the query itself
       
    82     free(query);
       
    83 }
       
    84 
       
    85 /*
       
    86  * Execute the callback if res is given, and free the query.
       
    87  *
       
    88  * The query has been aborted, it will simply be freed
       
    89  */
       
    90 static void _evsql_query_done (struct evsql_query *query, const struct evsql_result_info *res) {
       
    91     if (res) {
       
    92         if (query->cb_fn)
       
    93             // call the callback
       
    94             query->cb_fn(res, query->cb_arg);
       
    95         else
       
    96             WARNING("supressing cb_fn because query was aborted");
       
    97     }
       
    98 
       
    99     // free
       
   100     _evsql_query_free(query);
       
   101 }
       
   102 
       
   103 /*
       
   104  * XXX:
       
   105  * /
       
   106 static void _evsql_destroy (struct evsql *evsql, const struct evsql_result_info *res) {
       
   107     struct evsql_query *query;
       
   108     
       
   109     // clear the queue
       
   110     while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) {
       
   111         _evsql_query_done(query, res);
       
   112         
       
   113         TAILQ_REMOVE(&evsql->query_queue, query, entry);
       
   114     }
       
   115     
       
   116     // free
       
   117     free(evsql);
       
   118 }
       
   119 */
       
   120 
       
   121 /*
       
   122  * Free the transaction, it should already be deassociated from the query and conn.
       
   123  */
       
   124 static void _evsql_trans_free (struct evsql_trans *trans) {
       
   125     // ensure we don't leak anything
       
   126     assert(trans->query == NULL);
       
   127     assert(trans->conn == NULL);
       
   128     
       
   129     // free
       
   130     free(trans);
       
   131 }
       
   132 
       
   133 /*
       
   134  * Release a connection. It should already be deassociated from the trans and query.
       
   135  *
       
   136  * Releases the engine, removes from the conn_list and frees this.
       
   137  */
       
   138 static void _evsql_conn_release (struct evsql_conn *conn) {
       
   139     // ensure we don't leak anything
       
   140     assert(conn->trans == NULL);
       
   141     assert(conn->query == NULL);
       
   142 
       
   143     // release the engine
       
   144     switch (conn->evsql->type) {
       
   145         case EVSQL_EVPQ:
       
   146             evpq_release(conn->engine.evpq);
       
   147             break;
       
   148         
       
   149         default:
       
   150             FATAL("evsql->type");
       
   151     }
       
   152     
       
   153     // remove from list
       
   154     LIST_REMOVE(conn, entry);
       
   155     
       
   156     // catch deadlocks
       
   157     assert(!LIST_EMPTY(&conn->evsql->conn_list) || TAILQ_EMPTY(&conn->evsql->query_queue));
       
   158 
       
   159     // free
       
   160     free(conn);
       
   161 }
       
   162 
       
   163 /*
       
   164  * Release a transaction, it should already be deassociated from the query.
       
   165  *
       
   166  * Perform a two-way-deassociation with the conn, and then free the trans.
       
   167  */
       
   168 static void _evsql_trans_release (struct evsql_trans *trans) {
       
   169     assert(trans->query == NULL);
       
   170     assert(trans->conn != NULL);
       
   171 
       
   172     // deassociate the conn
       
   173     trans->conn->trans = NULL; trans->conn = NULL;
       
   174 
       
   175     // free the trans
       
   176     _evsql_trans_free(trans);
       
   177 }
       
   178 
       
   179 /*
       
   180  * Fail a single query, this will trigger the callback and free it.
       
   181  *
       
   182  * NOTE: Only for *TRANSACTIONLESS* queries.
       
   183  */
       
   184 static void _evsql_query_fail (struct evsql* evsql, struct evsql_query *query) {
       
   185     struct evsql_result_info res; ZINIT(res);
       
   186     
       
   187     // set up the result_info
       
   188     res.evsql = evsql;
       
   189     res.trans = NULL;
       
   190     res.error = 1;
       
   191     
       
   192     // finish off the query
       
   193     _evsql_query_done(query, &res);
       
   194 }
       
   195 
       
   196 /*
       
   197  * Fail a transaction, this will silently drop any query, trigger the error callback, two-way-deassociate/release the
       
   198  * conn, and then free the trans.
       
   199  */ 
       
   200 static void _evsql_trans_fail (struct evsql_trans *trans) {
       
   201     if (trans->query) {
       
   202         // free the query silently
       
   203         _evsql_query_free(trans->query); trans->query = NULL;
       
   204 
       
   205         // also deassociate it from the conn!
       
   206         trans->conn->query = NULL;
       
   207     }
       
   208 
       
   209     // tell the user
       
   210     // XXX: trans is in a bad state during this call
       
   211     if (trans->error_fn)
       
   212         trans->error_fn(trans, trans->cb_arg);
       
   213     else
       
   214         WARNING("supressing error because error_fn was NULL");
       
   215  
       
   216     // deassociate and release the conn
       
   217     trans->conn->trans = NULL; _evsql_conn_release(trans->conn); trans->conn = NULL;
       
   218 
       
   219     // pump the queue for requests that were waiting for this connection
       
   220     _evsql_pump(trans->evsql, NULL);
       
   221 
       
   222     // free the trans
       
   223     _evsql_trans_free(trans);
       
   224 }
       
   225 
       
   226 /*
       
   227  * Fail a connection. If the connection is transactional, this will just call _evsql_trans_fail, but otherwise it will
       
   228  * fail any ongoing query, and then release the connection.
       
   229  */
       
   230 static void _evsql_conn_fail (struct evsql_conn *conn) {
       
   231     if (conn->trans) {
       
   232         // let transactions handle their connection failures
       
   233         _evsql_trans_fail(conn->trans);
       
   234 
       
   235     } else {
       
   236         if (conn->query) {
       
   237             // fail the in-progress query
       
   238             _evsql_query_fail(conn->evsql, conn->query); conn->query = NULL;
       
   239         }
       
   240 
       
   241         // finish off the whole connection
       
   242         _evsql_conn_release(conn);
       
   243     }
       
   244 }
       
   245 
       
   246 /*
       
   247  * Processes enqueued non-transactional queries until the queue is empty, or we managed to exec a query.
       
   248  *
       
   249  * If execing a query on a connection fails, both the query and the connection are failed (in that order).
       
   250  *
       
   251  * Any further queries will then also be failed, because there's no reconnection/retry logic yet.
       
   252  *
       
   253  * This means that if conn is NULL, all queries are failed.
       
   254  */
       
   255 static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn) {
       
   256     struct evsql_query *query;
       
   257     int err;
       
   258     
       
   259     // look for waiting queries
       
   260     while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) {
       
   261         // dequeue
       
   262         TAILQ_REMOVE(&evsql->query_queue, query, entry);
       
   263         
       
   264         if (conn) {
       
   265             // try and execute it
       
   266             err = _evsql_query_exec(conn, query, query->command);
       
   267         }
       
   268 
       
   269         // free the command buf
       
   270         free(query->command); query->command = NULL;
       
   271 
       
   272         if (err || !conn) {
       
   273             if (!conn) {
       
   274                 // warn when dropping queries
       
   275                 WARNING("failing query becuse there are no conns");
       
   276             }
       
   277 
       
   278             // fail the query
       
   279             _evsql_query_fail(evsql, query);
       
   280             
       
   281             if (conn) {
       
   282                 // fail the connection
       
   283                 WARNING("failing the connection because a query-exec failed");
       
   284 
       
   285                 _evsql_conn_fail(conn); conn = NULL;
       
   286             }
       
   287 
       
   288         } else {
       
   289             // we have succesfully enqueued a query, and we can wait for this connection to complete
       
   290             break;
       
   291 
       
   292         }
       
   293 
       
   294         // handle the rest of the queue
       
   295     }
       
   296     
       
   297     // ok
       
   298     return;
       
   299 }
       
   300 
       
   301 /*
       
   302  * Callback for a trans's 'BEGIN' query, which means the transaction is now ready for use.
       
   303  */
       
   304 static void _evsql_trans_ready (const struct evsql_result_info *res, void *arg) {
       
   305     (void) arg;
       
   306 
       
   307     assert(res->trans);
       
   308 
       
   309     // check for errors
       
   310     if (res->error)
       
   311         ERROR("transaction 'BEGIN' failed: %s", evsql_result_error(res));
       
   312     
       
   313     // transaction is now ready for use
       
   314     res->trans->ready_fn(res->trans, res->trans->cb_arg);
       
   315     
       
   316     // good
       
   317     return;
       
   318 
       
   319 error:
       
   320     _evsql_trans_fail(res->trans);
       
   321 }
       
   322 
       
   323 /*
       
   324  * The transaction's connection is ready, send the 'BEGIN' query.
       
   325  *
       
   326  * If anything fails, calls _evsql_trans_fail and returns nonzero, zero on success
       
   327  */
       
   328 static int _evsql_trans_conn_ready (struct evsql *evsql, struct evsql_trans *trans) {
       
   329     char trans_sql[EVSQL_QUERY_BEGIN_BUF];
       
   330     const char *isolation_level;
       
   331     int ret;
       
   332     
       
   333     // determine the isolation_level to use
       
   334     switch (trans->type) {
       
   335         case EVSQL_TRANS_DEFAULT:
       
   336             isolation_level = NULL; break;
       
   337 
       
   338         case EVSQL_TRANS_SERIALIZABLE:
       
   339             isolation_level = "SERIALIZABLE"; break;
       
   340 
       
   341         case EVSQL_TRANS_REPEATABLE_READ:
       
   342             isolation_level = "REPEATABLE READ"; break;
       
   343 
       
   344         case EVSQL_TRANS_READ_COMMITTED:
       
   345             isolation_level = "READ COMMITTED"; break;
       
   346 
       
   347         case EVSQL_TRANS_READ_UNCOMMITTED:
       
   348             isolation_level = "READ UNCOMMITTED"; break;
       
   349 
       
   350         default:
       
   351             FATAL("trans->type: %d", trans->type);
       
   352     }
       
   353     
       
   354     // build the trans_sql
       
   355     if (isolation_level)
       
   356         ret = snprintf(trans_sql, EVSQL_QUERY_BEGIN_BUF, "BEGIN TRANSACTION ISOLATION LEVEL %s", isolation_level);
       
   357     else
       
   358         ret = snprintf(trans_sql, EVSQL_QUERY_BEGIN_BUF, "BEGIN TRANSACTION");
       
   359     
       
   360     // make sure it wasn't truncated
       
   361     if (ret >= EVSQL_QUERY_BEGIN_BUF)
       
   362         ERROR("trans_sql overflow: %d >= %d", ret, EVSQL_QUERY_BEGIN_BUF);
       
   363     
       
   364     // execute the query
       
   365     if (evsql_query(evsql, trans, trans_sql, _evsql_trans_ready, NULL) == NULL)
       
   366         ERROR("evsql_query");
       
   367     
       
   368     // success
       
   369     return 0;
       
   370 
       
   371 error:
       
   372     // fail the transaction
       
   373     _evsql_trans_fail(trans);
       
   374 
       
   375     return -1;
       
   376 }
       
   377 
       
   378 /*
       
   379  * The evpq connection was succesfully established.
       
   380  */ 
       
   381 static void _evsql_evpq_connected (struct evpq_conn *_conn, void *arg) {
       
   382     struct evsql_conn *conn = arg;
       
   383 
       
   384     if (conn->trans)
       
   385         // notify the transaction
       
   386         // don't care about errors
       
   387         (void) _evsql_trans_conn_ready(conn->evsql, conn->trans);
       
   388     
       
   389     else
       
   390         // pump any waiting transactionless queries
       
   391         _evsql_pump(conn->evsql, conn);
       
   392 }
       
   393 
       
   394 /*
       
   395  * Got one result on this evpq connection.
       
   396  */
       
   397 static void _evsql_evpq_result (struct evpq_conn *_conn, PGresult *result, void *arg) {
       
   398     struct evsql_conn *conn = arg;
       
   399     struct evsql_query *query = conn->query;
       
   400 
       
   401     assert(query != NULL);
       
   402 
       
   403     // if we get multiple results, only return the first one
       
   404     if (query->result.evpq) {
       
   405         WARNING("[evsql] evpq query returned multiple results, discarding previous one");
       
   406         
       
   407         PQclear(query->result.evpq); query->result.evpq = NULL;
       
   408     }
       
   409     
       
   410     // remember the result
       
   411     query->result.evpq = result;
       
   412 }
       
   413 
       
   414 /*
       
   415  * No more results for this query.
       
   416  */
       
   417 static void _evsql_evpq_done (struct evpq_conn *_conn, void *arg) {
       
   418     struct evsql_conn *conn = arg;
       
   419     struct evsql_query *query = conn->query;
       
   420     struct evsql_result_info res; ZINIT(res);
       
   421     
       
   422     assert(query != NULL);
       
   423     
       
   424     // set up the result_info
       
   425     res.evsql = conn->evsql;
       
   426     res.trans = conn->trans;
       
   427     
       
   428     if (query->result.evpq == NULL) {
       
   429         // if a query didn't return any results (bug?), warn and fail the query
       
   430         WARNING("[evsql] evpq query didn't return any results");
       
   431 
       
   432         res.error = 1;
       
   433     
       
   434     } else if (strcmp(PQresultErrorMessage(query->result.evpq), "") != 0) {
       
   435         // the query failed with some error
       
   436         res.error = 1;
       
   437         res.result.pq = query->result.evpq;
       
   438 
       
   439     } else {
       
   440         res.error = 0;
       
   441         res.result.pq = query->result.evpq;
       
   442 
       
   443     }
       
   444 
       
   445     // de-associate the query from the connection
       
   446     conn->query = NULL;
       
   447     
       
   448     // how we handle query completion depends on if we're a transaction or not
       
   449     if (conn->trans) {
       
   450         // we can deassign the trans's query
       
   451         conn->trans->query = NULL;
       
   452 
       
   453         // was an abort?
       
   454         if (!query->cb_fn)
       
   455             // notify the user that the transaction query has been aborted
       
   456             conn->trans->ready_fn(conn->trans, conn->trans->cb_arg);
       
   457 
       
   458         // then hand the query to the user
       
   459         _evsql_query_done(query, &res);
       
   460         
       
   461     } else {
       
   462         // a transactionless query, so just finish it off and pump any other waiting ones
       
   463         _evsql_query_done(query, &res);
       
   464 
       
   465         // pump the next one
       
   466         _evsql_pump(conn->evsql, conn);
       
   467     }
       
   468 }
       
   469 
       
   470 /*
       
   471  * The connection failed.
       
   472  */
       
   473 static void _evsql_evpq_failure (struct evpq_conn *_conn, void *arg) {
       
   474     struct evsql_conn *conn = arg;
       
   475     
       
   476     // just fail the conn
       
   477     _evsql_conn_fail(conn);
       
   478 }
       
   479 
       
   480 /*
       
   481  * Our evpq behaviour
       
   482  */
       
   483 static struct evpq_callback_info _evsql_evpq_cb_info = {
       
   484     .fn_connected       = _evsql_evpq_connected,
       
   485     .fn_result          = _evsql_evpq_result,
       
   486     .fn_done            = _evsql_evpq_done,
       
   487     .fn_failure         = _evsql_evpq_failure,
       
   488 };
       
   489 
       
   490 /*
       
   491  * Allocate the generic evsql context.
       
   492  */
       
   493 static struct evsql *_evsql_new_base (struct event_base *ev_base, evsql_error_cb error_fn, void *cb_arg) {
       
   494     struct evsql *evsql = NULL;
       
   495     
       
   496     // allocate it
       
   497     if ((evsql = calloc(1, sizeof(*evsql))) == NULL)
       
   498         ERROR("calloc");
       
   499 
       
   500     // store
       
   501     evsql->ev_base = ev_base;
       
   502     evsql->error_fn = error_fn;
       
   503     evsql->cb_arg = cb_arg;
       
   504 
       
   505     // init
       
   506     LIST_INIT(&evsql->conn_list);
       
   507     TAILQ_INIT(&evsql->query_queue);
       
   508 
       
   509     // done
       
   510     return evsql;
       
   511 
       
   512 error:
       
   513     return NULL;
       
   514 }
       
   515 
       
   516 /*
       
   517  * Start a new connection and add it to the list, it won't be ready until _evsql_evpq_connected is called
       
   518  */
       
   519 static struct evsql_conn *_evsql_conn_new (struct evsql *evsql) {
       
   520     struct evsql_conn *conn = NULL;
       
   521     
       
   522     // allocate
       
   523     if ((conn = calloc(1, sizeof(*conn))) == NULL)
       
   524         ERROR("calloc");
       
   525 
       
   526     // init
       
   527     conn->evsql = evsql;
       
   528     
       
   529     // connect the engine
       
   530     switch (evsql->type) {
       
   531         case EVSQL_EVPQ:
       
   532             if ((conn->engine.evpq = evpq_connect(evsql->ev_base, evsql->engine_conf.evpq, _evsql_evpq_cb_info, conn)) == NULL)
       
   533                 goto error;
       
   534             
       
   535             break;
       
   536             
       
   537         default:
       
   538             FATAL("evsql->type");
       
   539     }
       
   540 
       
   541     // add it to the list
       
   542     LIST_INSERT_HEAD(&evsql->conn_list, conn, entry);
       
   543 
       
   544     // success
       
   545     return conn;
       
   546 
       
   547 error:
       
   548     free(conn);
       
   549 
       
   550     return NULL;
       
   551 }
       
   552 
       
   553 struct evsql *evsql_new_pq (struct event_base *ev_base, const char *pq_conninfo, evsql_error_cb error_fn, void *cb_arg) {
       
   554     struct evsql *evsql = NULL;
       
   555     
       
   556     // base init
       
   557     if ((evsql = _evsql_new_base (ev_base, error_fn, cb_arg)) == NULL)
       
   558         goto error;
       
   559 
       
   560     // store conf
       
   561     evsql->engine_conf.evpq = pq_conninfo;
       
   562 
       
   563     // pre-create one connection
       
   564     if (_evsql_conn_new(evsql) == NULL)
       
   565         goto error;
       
   566 
       
   567     // done
       
   568     return evsql;
       
   569 
       
   570 error:
       
   571     // XXX: more complicated than this?
       
   572     free(evsql); 
       
   573 
       
   574     return NULL;
       
   575 }
       
   576 
       
   577 /*
       
   578  * Checks if the connection is already allocated for some other trans/query.
       
   579  *
       
   580  * Returns:
       
   581  *      0       connection idle, can be allocated
       
   582  *      >1      connection busy
       
   583  */
       
   584 static int _evsql_conn_busy (struct evsql_conn *conn) {
       
   585     // transactions get the connection to themselves
       
   586     if (conn->trans)
       
   587         return 1;
       
   588     
       
   589     // if it has a query assigned, it's busy
       
   590     if (conn->query)
       
   591         return 1;
       
   592 
       
   593     // otherwise, it's all idle
       
   594     return 0;
       
   595 }
       
   596 
       
   597 /*
       
   598  * Checks if the connection is ready for use (i.e. _evsql_evpq_connected was called).
       
   599  *
       
   600  * The connection should not already have a query running.
       
   601  *
       
   602  * Returns 
       
   603  *  <0  the connection is not valid (failed, query in progress)
       
   604  *  0   the connection is still pending, and will become ready at some point
       
   605  *  >0  it's ready
       
   606  */
       
   607 static int _evsql_conn_ready (struct evsql_conn *conn) {
       
   608     switch (conn->evsql->type) {
       
   609         case EVSQL_EVPQ: {
       
   610             enum evpq_state state = evpq_state(conn->engine.evpq);
       
   611             
       
   612             switch (state) {
       
   613                 case EVPQ_CONNECT:
       
   614                     return 0;
       
   615                 
       
   616                 case EVPQ_CONNECTED:
       
   617                     return 1;
       
   618 
       
   619                 case EVPQ_QUERY:
       
   620                 case EVPQ_INIT:
       
   621                 case EVPQ_FAILURE:
       
   622                     return -1;
       
   623                 
       
   624                 default:
       
   625                     FATAL("evpq_state: %d", state);
       
   626             }
       
   627 
       
   628         }
       
   629         
       
   630         default:
       
   631             FATAL("evsql->type: %d", conn->evsql->type);
       
   632     }
       
   633 }
       
   634 
       
   635 /*
       
   636  * Allocate a connection for use and return it via *conn_ptr, or if may_queue is nonzero and the connection pool is
       
   637  * getting full, return NULL (query should be queued).
       
   638  *
       
   639  * Note that the returned connection might not be ready for use yet (if we created a new one, see _evsql_conn_ready).
       
   640  *
       
   641  * Returns zero if a connection was found or the request should be queued, or nonzero if something failed and the
       
   642  * request should be dropped.
       
   643  */
       
   644 static int _evsql_conn_get (struct evsql *evsql, struct evsql_conn **conn_ptr, int may_queue) {
       
   645     int have_nontrans = 0;
       
   646     *conn_ptr = NULL;
       
   647     
       
   648     // find a connection that isn't busy and is ready (unless the query queue is empty).
       
   649     LIST_FOREACH(*conn_ptr, &evsql->conn_list, entry) {
       
   650         // we can only have a query enqueue itself if there is a non-trans conn it can later use
       
   651         if (!(*conn_ptr)->trans)
       
   652             have_nontrans = 1;
       
   653 
       
   654         // skip busy conns always
       
   655         if (_evsql_conn_busy(*conn_ptr))
       
   656             continue;
       
   657         
       
   658         // accept pending conns as long as there are NO enqueued queries (might cause deadlock otherwise)
       
   659         if (_evsql_conn_ready(*conn_ptr) == 0 && TAILQ_EMPTY(&evsql->query_queue))
       
   660             break;
       
   661 
       
   662         // accept conns that are in a fully ready state
       
   663         if (_evsql_conn_ready(*conn_ptr) > 0)
       
   664             break;
       
   665     }
       
   666     
       
   667     // if we found an idle connection, we can just return that right away
       
   668     if (*conn_ptr)
       
   669         return 0;
       
   670 
       
   671     // return NULL if may_queue and we have a non-trans conn that we can, at some point, use
       
   672     if (may_queue && have_nontrans)
       
   673         return 0;
       
   674     
       
   675     // we need to open a new connection
       
   676     if ((*conn_ptr = _evsql_conn_new(evsql)) == NULL)
       
   677         goto error;
       
   678 
       
   679     // good
       
   680     return 0;
       
   681 error:
       
   682     return -1;
       
   683 }
       
   684 
       
   685 struct evsql_trans *evsql_trans (struct evsql *evsql, enum evsql_trans_type type, evsql_trans_error_cb error_fn, evsql_trans_ready_cb ready_fn, evsql_trans_done_cb done_fn, void *cb_arg) {
       
   686     struct evsql_trans *trans = NULL;
       
   687 
       
   688     // allocate it
       
   689     if ((trans = calloc(1, sizeof(*trans))) == NULL)
       
   690         ERROR("calloc");
       
   691 
       
   692     // store
       
   693     trans->evsql = evsql;
       
   694     trans->ready_fn = ready_fn;
       
   695     trans->done_fn = done_fn;
       
   696     trans->cb_arg = cb_arg;
       
   697     trans->type = type;
       
   698 
       
   699     // find a connection
       
   700     if (_evsql_conn_get(evsql, &trans->conn, 0))
       
   701         ERROR("_evsql_conn_get");
       
   702 
       
   703     // associate the conn
       
   704     trans->conn->trans = trans;
       
   705 
       
   706     // is it already ready?
       
   707     if (_evsql_conn_ready(trans->conn) > 0) {
       
   708         // call _evsql_trans_conn_ready directly, it will handle cleanup (silently, !error_fn)
       
   709         if (_evsql_trans_conn_ready(evsql, trans)) {
       
   710             // return NULL directly
       
   711             return NULL;
       
   712         }
       
   713 
       
   714     } else {
       
   715         // otherwise, wait for the conn to be ready
       
   716          
       
   717     }
       
   718     
       
   719     // and let it pass errors to the user
       
   720     trans->error_fn = error_fn;
       
   721 
       
   722     // ok
       
   723     return trans;
       
   724 
       
   725 error:
       
   726     free(trans);
       
   727 
       
   728     return NULL;
       
   729 }
       
   730 
       
   731 /*
       
   732  * Validate and allocate the basic stuff for a new query.
       
   733  */
       
   734 static struct evsql_query *_evsql_query_new (struct evsql *evsql, struct evsql_trans *trans, evsql_query_cb query_fn, void *cb_arg) {
       
   735     struct evsql_query *query = NULL;
       
   736     
       
   737     // if it's part of a trans, then make sure the trans is idle
       
   738     if (trans && trans->query)
       
   739         ERROR("transaction is busy");
       
   740 
       
   741     // allocate it
       
   742     if ((query = calloc(1, sizeof(*query))) == NULL)
       
   743         ERROR("calloc");
       
   744 
       
   745     // store
       
   746     query->cb_fn = query_fn;
       
   747     query->cb_arg = cb_arg;
       
   748 
       
   749     // success
       
   750     return query;
       
   751 
       
   752 error:
       
   753     return NULL;
       
   754 }
       
   755 
       
   756 /*
       
   757  * Handle a new query.
       
   758  *
       
   759  * For transactions this will associate the query and then execute it, otherwise this will either find an idle
       
   760  * connection and send the query, or enqueue it.
       
   761  */
       
   762 static int _evsql_query_enqueue (struct evsql *evsql, struct evsql_trans *trans, struct evsql_query *query, const char *command) {
       
   763     // transaction queries are handled differently
       
   764     if (trans) {
       
   765         // it's an in-transaction query
       
   766         assert(trans->query == NULL);
       
   767         
       
   768         // assign the query
       
   769         trans->query = query;
       
   770 
       
   771         // execute directly
       
   772         if (_evsql_query_exec(trans->conn, query, command)) {
       
   773             // ack, fail the transaction
       
   774             _evsql_trans_fail(trans);
       
   775             
       
   776             // caller frees query
       
   777             goto error;
       
   778         }
       
   779 
       
   780     } else {
       
   781         struct evsql_conn *conn;
       
   782         
       
   783         // find an idle connection
       
   784         if ((_evsql_conn_get(evsql, &conn, 1)))
       
   785             ERROR("couldn't allocate a connection for the query");
       
   786 
       
   787         // we must enqueue if no idle conn or the conn is not yet ready
       
   788         if (conn && _evsql_conn_ready(conn) > 0) {
       
   789             // execute directly
       
   790             if (_evsql_query_exec(conn, query, command)) {
       
   791                 // ack, fail the connection
       
   792                 _evsql_conn_fail(conn);
       
   793                 
       
   794                 // make sure we don't deadlock any queries, but if this query got a conn directly, then we shouldn't
       
   795                 // have any queries enqueued anyways
       
   796                 assert(TAILQ_EMPTY(&evsql->query_queue));
       
   797                 
       
   798                 // caller frees query
       
   799                 goto error;
       
   800             }
       
   801 
       
   802         } else {
       
   803             // copy the command for later execution
       
   804             if ((query->command = strdup(command)) == NULL)
       
   805                 ERROR("strdup");
       
   806             
       
   807             // enqueue until some connection pumps the queue
       
   808             TAILQ_INSERT_TAIL(&evsql->query_queue, query, entry);
       
   809         }
       
   810     }
       
   811 
       
   812     // ok, good
       
   813     return 0;
       
   814 
       
   815 error:
       
   816     return -1;
       
   817 }
       
   818 
       
   819 struct evsql_query *evsql_query (struct evsql *evsql, struct evsql_trans *trans, const char *command, evsql_query_cb query_fn, void *cb_arg) {
       
   820     struct evsql_query *query = NULL;
       
   821     
       
   822     // alloc new query
       
   823     if ((query = _evsql_query_new(evsql, trans, query_fn, cb_arg)) == NULL)
       
   824         goto error;
       
   825     
       
   826     // just execute the command string directly
       
   827     if (_evsql_query_enqueue(evsql, trans, query, command))
       
   828         goto error;
       
   829 
       
   830     // ok
       
   831     return query;
       
   832 
       
   833 error:
       
   834     _evsql_query_free(query);
       
   835 
       
   836     return NULL;
       
   837 }
       
   838 
       
   839 struct evsql_query *evsql_query_params (struct evsql *evsql, struct evsql_trans *trans, const char *command, const struct evsql_query_params *params, evsql_query_cb query_fn, void *cb_arg) {
       
   840     struct evsql_query *query = NULL;
       
   841     const struct evsql_query_param *param;
       
   842     int idx;
       
   843     
       
   844     // alloc new query
       
   845     if ((query = _evsql_query_new(evsql, trans, query_fn, cb_arg)) == NULL)
       
   846         goto error;
       
   847 
       
   848     // count the params
       
   849     for (param = params->list; param->type; param++) 
       
   850         query->params.count++;
       
   851 
       
   852     // allocate the vertical storage for the parameters
       
   853     if (0
       
   854         
       
   855 //            !(query->params.types    = calloc(query->params.count, sizeof(Oid)))
       
   856         ||  !(query->params.values   = calloc(query->params.count, sizeof(char *)))
       
   857         ||  !(query->params.lengths  = calloc(query->params.count, sizeof(int)))
       
   858         ||  !(query->params.formats  = calloc(query->params.count, sizeof(int)))
       
   859     )
       
   860         ERROR("calloc");
       
   861 
       
   862     // transform
       
   863     for (param = params->list, idx = 0; param->type; param++, idx++) {
       
   864         // `types` stays NULL
       
   865         // query->params.types[idx] = 0;
       
   866         
       
   867         // values
       
   868         query->params.values[idx] = param->data_raw;
       
   869 
       
   870         // lengths
       
   871         query->params.lengths[idx] = param->length;
       
   872 
       
   873         // formats, binary if length is nonzero
       
   874         query->params.formats[idx] = param->length ? 1 : 0;
       
   875     }
       
   876 
       
   877     // result format
       
   878     switch (params->result_fmt) {
       
   879         case EVSQL_FMT_TEXT:
       
   880             query->params.result_format = 0; break;
       
   881 
       
   882         case EVSQL_FMT_BINARY:
       
   883             query->params.result_format = 1; break;
       
   884 
       
   885         default:
       
   886             FATAL("params.result_fmt: %d", params->result_fmt);
       
   887     }
       
   888 
       
   889     // execute it
       
   890     if (_evsql_query_enqueue(evsql, trans, query, command))
       
   891         goto error;
       
   892 
       
   893     // ok
       
   894     return query;
       
   895 
       
   896 error:
       
   897     _evsql_query_free(query);
       
   898     
       
   899     return NULL;
       
   900 }
       
   901 
       
   902 void evsql_query_abort (struct evsql_trans *trans, struct evsql_query *query) {
       
   903     assert(query);
       
   904 
       
   905     if (trans) {
       
   906         // must be the right query
       
   907         assert(trans->query == query);
       
   908     }
       
   909 
       
   910     // just strip the callback and wait for it to complete as normal
       
   911     query->cb_fn = NULL;
       
   912 }
       
   913 
       
   914 void _evsql_trans_commit_res (const struct evsql_result_info *res, void *arg) {
       
   915     (void) arg;
       
   916 
       
   917     assert(res->trans);
       
   918 
       
   919     // check for errors
       
   920     if (res->error)
       
   921         ERROR("transaction 'COMMIT' failed: %s", evsql_result_error(res));
       
   922     
       
   923     // transaction is now done
       
   924     res->trans->done_fn(res->trans, res->trans->cb_arg);
       
   925     
       
   926     // release it
       
   927     _evsql_trans_release(res->trans);
       
   928 
       
   929     // success
       
   930     return;
       
   931 
       
   932 error:
       
   933     _evsql_trans_fail(res->trans);
       
   934 }
       
   935 
       
   936 int evsql_trans_commit (struct evsql_trans *trans) {
       
   937     static const char *sql = "COMMIT TRANSACTION";
       
   938 
       
   939     if (trans->query)
       
   940         ERROR("cannot COMMIT because transaction is still busy");
       
   941     
       
   942     // query
       
   943     if (evsql_query(trans->evsql, trans, sql, _evsql_trans_commit_res, NULL) == NULL)
       
   944         goto error;
       
   945     
       
   946     // mark it as commited in case someone wants to abort it
       
   947     trans->has_commit = 1;
       
   948 
       
   949     // success
       
   950     return 0;
       
   951 
       
   952 error:
       
   953     return -1;
       
   954 }
       
   955 
       
   956 void _evsql_trans_rollback_res (const struct evsql_result_info *res, void *arg) {
       
   957     (void) arg;
       
   958 
       
   959     assert(res->trans);
       
   960 
       
   961     // fail the connection on errors
       
   962     if (res->error)
       
   963         ERROR("transaction 'ROLLBACK' failed: %s", evsql_result_error(res));
       
   964 
       
   965     // release it
       
   966     _evsql_trans_release(res->trans);
       
   967 
       
   968     // success
       
   969     return;
       
   970 
       
   971 error:
       
   972     // fail the connection too, errors are supressed
       
   973     _evsql_trans_fail(res->trans);
       
   974 }
       
   975 
       
   976 /*
       
   977  * Used as the ready_fn callback in case of abort, otherwise directly
       
   978  */
       
   979 void _evsql_trans_rollback (struct evsql_trans *trans, void *unused) {
       
   980     static const char *sql = "ROLLBACK TRANSACTION";
       
   981 
       
   982     (void) unused;
       
   983 
       
   984     // query
       
   985     if (evsql_query(trans->evsql, trans, sql, _evsql_trans_rollback_res, NULL) == NULL) {
       
   986         // fail the transaction/connection
       
   987         _evsql_trans_fail(trans);
       
   988     }
       
   989 
       
   990 }
       
   991 
       
   992 void evsql_trans_abort (struct evsql_trans *trans) {
       
   993     // supress errors
       
   994     trans->error_fn = NULL;
       
   995     
       
   996     if (trans->has_commit) {
       
   997         // abort after commit doesn't make sense
       
   998         FATAL("transaction was already commited");
       
   999     }
       
  1000 
       
  1001     if (trans->query) {
       
  1002         // gah, some query is running
       
  1003         WARNING("aborting pending query");
       
  1004         
       
  1005         // prepare to rollback once complete
       
  1006         trans->ready_fn = _evsql_trans_rollback;
       
  1007         
       
  1008         // abort
       
  1009         evsql_query_abort(trans, trans->query);
       
  1010 
       
  1011     } else {
       
  1012         // just rollback directly
       
  1013         _evsql_trans_rollback(trans, NULL);
       
  1014 
       
  1015     }
       
  1016 }
       
  1017