src/evsql.c
changeset 25 99a41f48e29b
parent 24 82cfdb6680d1
child 26 61668c57f4bb
equal deleted inserted replaced
24:82cfdb6680d1 25:99a41f48e29b
     1 #define _GNU_SOURCE
     1 #define _GNU_SOURCE
     2 #include <stdlib.h>
     2 #include <stdlib.h>
     3 #include <sys/queue.h>
       
     4 #include <assert.h>
     3 #include <assert.h>
     5 #include <string.h>
     4 #include <string.h>
     6 
     5 
     7 #include "evsql.h"
     6 #include "evsql.h"
       
     7 #include "evsql_internal.h"
     8 #include "evpq.h"
     8 #include "evpq.h"
     9 #include "lib/log.h"
     9 #include "lib/log.h"
    10 #include "lib/error.h"
    10 #include "lib/error.h"
    11 #include "lib/misc.h"
    11 #include "lib/misc.h"
    12 
    12 
    13 enum evsql_type {
       
    14     EVSQL_EVPQ,
       
    15 };
       
    16 
       
    17 struct evsql {
       
    18     // callbacks
       
    19     evsql_error_cb error_fn;
       
    20     void *cb_arg;
       
    21 
       
    22     // backend engine
       
    23     enum evsql_type type;
       
    24 
       
    25     union {
       
    26         struct evpq_conn *evpq;
       
    27     } engine;
       
    28     
       
    29     // list of queries running or waiting to run
       
    30     TAILQ_HEAD(evsql_queue, evsql_query) queue;
       
    31 };
       
    32 
       
    33 struct evsql_query {
       
    34     // the evsql we are querying
       
    35     struct evsql *evsql;
       
    36 
       
    37     // the actual SQL query, this may or may not be ours, see _evsql_query_exec
       
    38     char *command;
       
    39     
       
    40     // possible query params
       
    41     struct evsql_query_param_info {
       
    42         int count;
       
    43 
       
    44         Oid *types;
       
    45         const char **values;
       
    46         int *lengths;
       
    47         int *formats;
       
    48 
       
    49         int result_format;
       
    50     } params;
       
    51 
       
    52     // our callback
       
    53     evsql_query_cb cb_fn;
       
    54     void *cb_arg;
       
    55 
       
    56     // our position in the query list
       
    57     TAILQ_ENTRY(evsql_query) entry;
       
    58 
       
    59     // the result
       
    60     union {
       
    61         PGresult *evpq;
       
    62     } result;
       
    63 };
       
    64 
       
    65 
    13 
    66 /*
    14 /*
    67  * Actually execute the given query.
    15  * Actually execute the given query.
    68  *
    16  *
    69  * The backend should be able to accept the query at this time.
    17  * The backend should be able to accept the query at this time.
    70  *
    18  *
    71  * query->command must be valid during the execution of this function, but once it returns, the command is not needed
    19  * You should assume that if trying to execute a query fails, then the connection should also be considred as failed.
    72  * anymore, and should be set to NULL.
    20  */
    73  */
    21 static int _evsql_query_exec (struct evsql_conn *conn, struct evsql_query *query, const char *command) {
    74 static int _evsql_query_exec (struct evsql *evsql, struct evsql_query *query, const char *command) {
    22     int err;
    75     switch (evsql->type) {
    23 
       
    24     switch (conn->evsql->type) {
    76         case EVSQL_EVPQ:
    25         case EVSQL_EVPQ:
    77             // got params?
    26             // got params?
    78             if (query->params.count) {
    27             if (query->params.count) {
    79                 return evpq_query_params(evsql->engine.evpq, command,
    28                 err = evpq_query_params(conn->engine.evpq, command,
    80                     query->params.count, 
    29                     query->params.count, 
    81                     query->params.types, 
    30                     query->params.types, 
    82                     query->params.values, 
    31                     query->params.values, 
    83                     query->params.lengths, 
    32                     query->params.lengths, 
    84                     query->params.formats, 
    33                     query->params.formats, 
    85                     query->params.result_format
    34                     query->params.result_format
    86                 );
    35                 );
    87 
    36 
    88             } else {
    37             } else {
    89                 // plain 'ole query
    38                 // plain 'ole query
    90                 return evpq_query(evsql->engine.evpq, command);
    39                 err = evpq_query(conn->engine.evpq, command);
    91             }
    40             }
       
    41 
       
    42             if (err) {
       
    43                 if (PQstatus(evpq_pgconn(conn->engine.evpq)) != CONNECTION_OK)
       
    44                     WARNING("conn failed");
       
    45                 else
       
    46                     WARNING("query failed, dropping conn as well");
       
    47             }
       
    48 
       
    49             break;
    92         
    50         
    93         default:
    51         default:
    94             FATAL("evsql->type");
    52             FATAL("evsql->type");
    95     }
    53     }
    96 }
    54 
    97 
    55     if (!err)
    98 /*
    56         // assign the query
    99  * Free the query and related resources, doesn't trigger any callbacks or remove from any queues
    57         conn->query = query;
       
    58 
       
    59     return err;
       
    60 }
       
    61 
       
    62 /*
       
    63  * Free the query and related resources, doesn't trigger any callbacks or remove from any queues.
       
    64  *
       
    65  * The command should already be taken care of (NULL).
   100  */
    66  */
   101 static void _evsql_query_free (struct evsql_query *query) {
    67 static void _evsql_query_free (struct evsql_query *query) {
   102     assert(query->command == NULL);
    68     assert(query->command == NULL);
   103     
    69     
   104     // free params if present
    70     // free params if present
   110     // free the query itself
    76     // free the query itself
   111     free(query);
    77     free(query);
   112 }
    78 }
   113 
    79 
   114 /*
    80 /*
   115  * Dequeue the query, execute the callback, and free it.
    81  * Execute the callback if res is given, and free the query.
   116  */
    82  */
   117 static void _evsql_query_done (struct evsql_query *query, const struct evsql_result_info *res) {
    83 static void _evsql_query_done (struct evsql_query *query, const struct evsql_result_info *res) {
   118     // dequeue
       
   119     TAILQ_REMOVE(&query->evsql->queue, query, entry);
       
   120     
       
   121     if (res) 
    84     if (res) 
   122         // call the callback
    85         // call the callback
   123         query->cb_fn(res, query->cb_arg);
    86         query->cb_fn(res, query->cb_arg);
   124     
    87     
   125     // free
    88     // free
   126     _evsql_query_free(query);
    89     _evsql_query_free(query);
   127 }
    90 }
   128 
    91 
   129 /*
    92 /*
   130  * A query has failed, notify the user and remove it.
    93  * XXX:
   131  */
    94  * /
   132 static void _evsql_query_failure (struct evsql *evsql, struct evsql_query *query) {
    95 static void _evsql_destroy (struct evsql *evsql, const struct evsql_result_info *res) {
       
    96     struct evsql_query *query;
       
    97     
       
    98     // clear the queue
       
    99     while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) {
       
   100         _evsql_query_done(query, res);
       
   101         
       
   102         TAILQ_REMOVE(&evsql->query_queue, query, entry);
       
   103     }
       
   104     
       
   105     // free
       
   106     free(evsql);
       
   107 }
       
   108 */
       
   109 
       
   110 /*
       
   111  * Free the transaction, it should already be deassociated from the query and conn.
       
   112  */
       
   113 static void _evsql_trans_free (struct evsql_trans *trans) {
       
   114     // ensure we don't leak anything
       
   115     assert(trans->query == NULL);
       
   116     assert(trans->conn == NULL);
       
   117     
       
   118     // free
       
   119     free(trans);
       
   120 }
       
   121 
       
   122 /*
       
   123  * Release a connection. It should already be deassociated from the trans and query.
       
   124  *
       
   125  * Releases the engine, removes from the conn_list and frees this.
       
   126  */
       
   127 static void _evsql_conn_release (struct evsql_conn *conn) {
       
   128     // ensure we don't leak anything
       
   129     assert(conn->trans == NULL);
       
   130     assert(conn->query == NULL);
       
   131 
       
   132     // release the engine
       
   133     switch (conn->evsql->type) {
       
   134         case EVSQL_EVPQ:
       
   135             evpq_release(conn->engine.evpq);
       
   136             break;
       
   137         
       
   138         default:
       
   139             FATAL("evsql->type");
       
   140     }
       
   141     
       
   142     // remove from list
       
   143     LIST_REMOVE(conn, entry);
       
   144 
       
   145     // free
       
   146     free(conn);
       
   147 }
       
   148 
       
   149 /*
       
   150  * Fail a single query, this will trigger the callback and free it.
       
   151  */
       
   152 static void _evsql_query_fail (struct evsql* evsql, struct evsql_query *query) {
   133     struct evsql_result_info res; ZINIT(res);
   153     struct evsql_result_info res; ZINIT(res);
   134 
   154     
   135     // set up the result_info
   155     // set up the result_info
   136     res.evsql = evsql;
   156     res.evsql = evsql;
   137     res.error = 1;
   157     res.error = 1;
   138 
   158     
   139     // finish it off
   159     // finish off the query
   140     _evsql_query_done(query, &res);
   160     _evsql_query_done(query, &res);
   141 }
   161 }
   142 
   162 
   143 /*
   163 /*
   144  * Clear every enqueued query and then free the evsql.
   164  * Fail a transaction, this will silently drop any query, trigger the error callback, two-way-deassociate/release the
   145  *
   165  * conn, and then free the trans.
   146  * If result_info is given, each query will also recieve it via their callback, and the error_fn will be called.
   166  */ 
   147  */
   167 static void _evsql_trans_fail (struct evsql_trans *trans) {
   148 static void _evsql_destroy (struct evsql *evsql, const struct evsql_result_info *res) {
   168     if (trans->query) {
       
   169         // free the query silently
       
   170         _evsql_query_free(trans->query); trans->query = NULL;
       
   171     }
       
   172 
       
   173     // tell the user
       
   174     // XXX: trans is in a bad state during this call
       
   175     trans->error_fn(trans, trans->cb_arg);
       
   176 
       
   177     // fail the conn
       
   178     trans->conn->trans = NULL; _evsql_conn_release(trans->conn); trans->conn = NULL;
       
   179 
       
   180     // free the trans
       
   181     _evsql_trans_free(trans);
       
   182 }
       
   183 
       
   184 /*
       
   185  * Fail a connection. If the connection is transactional, this will just call _evsql_trans_fail, but otherwise it will
       
   186  * fail any ongoing query, and then release the connection.
       
   187  */
       
   188 static void _evsql_conn_fail (struct evsql_conn *conn) {
       
   189     if (conn->trans) {
       
   190         // let transactions handle their connection failures
       
   191         _evsql_trans_fail(conn->trans);
       
   192 
       
   193     } else {
       
   194         if (conn->query) {
       
   195             // fail the in-progress query
       
   196             _evsql_query_fail(conn->evsql, conn->query); conn->query = NULL;
       
   197         }
       
   198 
       
   199         // finish off the whole connection
       
   200         _evsql_conn_release(conn);
       
   201     }
       
   202 }
       
   203 
       
   204 /*
       
   205  * Processes enqueued non-transactional queries until the queue is empty, or we managed to exec a query.
       
   206  *
       
   207  * If execing a query on a connection fails, both the query and the connection are failed (in that order).
       
   208  *
       
   209  * Any further queries will then also be failed, because there's no reconnection/retry logic yet.
       
   210  */
       
   211 static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn) {
   149     struct evsql_query *query;
   212     struct evsql_query *query;
   150     
   213     int err;
   151     // clear the queue
   214     
   152     while ((query = TAILQ_FIRST(&evsql->queue)) != NULL) {
   215     // look for waiting queries
   153         _evsql_query_done(query, res);
   216     while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) {
   154         
   217         // dequeue
   155         TAILQ_REMOVE(&evsql->queue, query, entry);
   218         TAILQ_REMOVE(&evsql->query_queue, query, entry);
   156     }
   219         
   157     
   220         if (conn) {
   158     // do the error callback if required
   221             // try and execute it
   159     if (res)
   222             err = _evsql_query_exec(conn, query, query->command);
   160         evsql->error_fn(evsql, evsql->cb_arg);
       
   161     
       
   162     // free
       
   163     free(evsql);
       
   164 }
       
   165 
       
   166 
       
   167 /*
       
   168  * Sends the next query if there are more enqueued
       
   169  */
       
   170 static void _evsql_pump (struct evsql *evsql) {
       
   171     struct evsql_query *query;
       
   172     
       
   173     // look for the next query
       
   174     if ((query = TAILQ_FIRST(&evsql->queue)) != NULL) {
       
   175         // try and execute it
       
   176         if (_evsql_query_exec(evsql, query, query->command)) {
       
   177             // the query failed
       
   178             _evsql_query_failure(evsql, query);
       
   179         }
   223         }
   180 
   224 
   181         // free the command
   225         // free the command buf
   182         free(query->command); query->command = NULL;
   226         free(query->command); query->command = NULL;
   183 
   227 
   184         // ok, then we just wait
   228         if (err || !conn) {
   185     }
   229             if (!conn) {
   186 }
   230                 // warn when dropping queries
   187 
   231                 WARNING("failing query becuse there are no conns");
   188 
   232             }
   189 static void _evsql_evpq_connected (struct evpq_conn *conn, void *arg) {
   233 
   190     struct evsql *evsql = arg;
   234             // fail the query
   191 
   235             _evsql_query_fail(evsql, query);
   192     // no state to update, just pump any waiting queries
   236             
   193     _evsql_pump(evsql);
   237             if (conn) {
   194 }
   238                 // fail the connection
   195 
   239                 WARNING("failing the connection because a query-exec failed");
   196 static void _evsql_evpq_result (struct evpq_conn *conn, PGresult *result, void *arg) {
   240 
   197     struct evsql *evsql = arg;
   241                 _evsql_conn_fail(conn); conn = NULL;
   198     struct evsql_query *query;
   242             }
   199 
   243 
   200     assert((query = TAILQ_FIRST(&evsql->queue)) != NULL);
   244         } else {
       
   245             // we have succesfully enqueued a query, and we can wait for this connection to complete
       
   246             break;
       
   247 
       
   248         }
       
   249 
       
   250         // handle the rest of the queue
       
   251     }
       
   252     
       
   253     // ok
       
   254     return;
       
   255 }
       
   256 
       
   257 /*
       
   258  * Callback for a trans's 'BEGIN' query, which means the transaction is now ready for use.
       
   259  */
       
   260 static void _evsql_trans_ready (const struct evsql_result_info *res, void *arg) {
       
   261     (void) arg;
       
   262 
       
   263     assert(res->trans);
       
   264 
       
   265     // check for errors
       
   266     if (res->error)
       
   267         ERROR("transaction 'BEGIN' failed: %s", evsql_result_error(res));
       
   268     
       
   269     // transaction is now ready for use
       
   270     res->trans->ready_fn(res->trans, res->trans->cb_arg);
       
   271 
       
   272 error:
       
   273     _evsql_trans_fail(res->trans);
       
   274 }
       
   275 
       
   276 /*
       
   277  * The transaction's connection is ready, send the 'BEGIN' query.
       
   278  */
       
   279 static void _evsql_trans_conn_ready (struct evsql *evsql, struct evsql_trans *trans) {
       
   280     char trans_sql[EVSQL_QUERY_BEGIN_BUF];
       
   281     const char *isolation_level;
       
   282     int ret;
       
   283     
       
   284     // determine the isolation_level to use
       
   285     switch (trans->type) {
       
   286         case EVSQL_TRANS_DEFAULT:
       
   287             isolation_level = NULL; break;
       
   288 
       
   289         case EVSQL_TRANS_SERIALIZABLE:
       
   290             isolation_level = "SERIALIZABLE"; break;
       
   291 
       
   292         case EVSQL_TRANS_REPEATABLE_READ:
       
   293             isolation_level = "REPEATABLE READ"; break;
       
   294 
       
   295         case EVSQL_TRANS_READ_COMMITTED:
       
   296             isolation_level = "READ COMMITTED"; break;
       
   297 
       
   298         case EVSQL_TRANS_READ_UNCOMMITTED:
       
   299             isolation_level = "READ UNCOMMITTED"; break;
       
   300 
       
   301         default:
       
   302             FATAL("trans->type: %d", trans->type);
       
   303     }
       
   304     
       
   305     // build the trans_sql
       
   306     if (isolation_level)
       
   307         ret = snprintf(trans_sql, EVSQL_QUERY_BEGIN_BUF, "BEGIN TRANSACTION ISOLATION LEVEL %s", isolation_level);
       
   308     else
       
   309         ret = snprintf(trans_sql, EVSQL_QUERY_BEGIN_BUF, "BEGIN TRANSACTION");
       
   310     
       
   311     // make sure it wasn't truncated
       
   312     if (ret >= EVSQL_QUERY_BEGIN_BUF)
       
   313         ERROR("trans_sql overflow: %d >= %d", ret, EVSQL_QUERY_BEGIN_BUF);
       
   314     
       
   315     // execute the query
       
   316     if (evsql_query(evsql, trans, trans_sql, _evsql_trans_ready, NULL))
       
   317         ERROR("evsql_query");
       
   318     
       
   319     // success
       
   320     return;
       
   321 
       
   322 error:
       
   323     // fail the transaction
       
   324     _evsql_trans_fail(trans);
       
   325 }
       
   326 
       
   327 /*
       
   328  * The evpq connection was succesfully established.
       
   329  */ 
       
   330 static void _evsql_evpq_connected (struct evpq_conn *_conn, void *arg) {
       
   331     struct evsql_conn *conn = arg;
       
   332 
       
   333     if (conn->trans)
       
   334         // notify the transaction
       
   335         _evsql_trans_conn_ready(conn->evsql, conn->trans);
       
   336     
       
   337     else
       
   338         // pump any waiting transactionless queries
       
   339         _evsql_pump(conn->evsql, conn);
       
   340 }
       
   341 
       
   342 /*
       
   343  * Got one result on this evpq connection.
       
   344  */
       
   345 static void _evsql_evpq_result (struct evpq_conn *_conn, PGresult *result, void *arg) {
       
   346     struct evsql_conn *conn = arg;
       
   347     struct evsql_query *query = conn->query;
       
   348 
       
   349     assert(query != NULL);
   201 
   350 
   202     // if we get multiple results, only return the first one
   351     // if we get multiple results, only return the first one
   203     if (query->result.evpq) {
   352     if (query->result.evpq) {
   204         WARNING("[evsql] evpq query returned multiple results, discarding previous one");
   353         WARNING("[evsql] evpq query returned multiple results, discarding previous one");
   205         
   354         
   208     
   357     
   209     // remember the result
   358     // remember the result
   210     query->result.evpq = result;
   359     query->result.evpq = result;
   211 }
   360 }
   212 
   361 
   213 static void _evsql_evpq_done (struct evpq_conn *conn, void *arg) {
   362 /*
   214     struct evsql *evsql = arg;
   363  * No more results for this query.
   215     struct evsql_query *query;
   364  */
       
   365 static void _evsql_evpq_done (struct evpq_conn *_conn, void *arg) {
       
   366     struct evsql_conn *conn = arg;
       
   367     struct evsql_query *query = conn->query;
   216     struct evsql_result_info res; ZINIT(res);
   368     struct evsql_result_info res; ZINIT(res);
   217 
   369     
   218     assert((query = TAILQ_FIRST(&evsql->queue)) != NULL);
   370     assert(query != NULL);
   219     
   371     
   220     // set up the result_info
   372     // set up the result_info
   221     res.evsql = evsql;
   373     res.evsql = conn->evsql;
   222     
   374     
   223     if (query->result.evpq == NULL) {
   375     if (query->result.evpq == NULL) {
   224         // if a query didn't return any results (bug?), warn and fail the query
   376         // if a query didn't return any results (bug?), warn and fail the query
   225         WARNING("[evsql] evpq query didn't return any results");
   377         WARNING("[evsql] evpq query didn't return any results");
   226 
   378 
   235         res.error = 0;
   387         res.error = 0;
   236         res.result.pq = query->result.evpq;
   388         res.result.pq = query->result.evpq;
   237 
   389 
   238     }
   390     }
   239 
   391 
   240     // finish it off
   392     // de-associate the query from the connection
   241     _evsql_query_done(query, &res);
   393     conn->query = NULL;
   242 
   394     
   243     // pump the next one
   395     // how we handle query completion depends on if we're a transaction or not
   244     _evsql_pump(evsql);
   396     if (conn->trans) {
   245 }
   397         // we can deassign the trans's query
   246 
   398         conn->trans->query = NULL;
   247 static void _evsql_evpq_failure (struct evpq_conn *conn, void *arg) {
   399 
   248     struct evsql *evsql = arg;
   400         // then hand the query to the user
   249     struct evsql_result_info result; ZINIT(result);
   401         _evsql_query_done(query, &res);
   250     
   402         
   251     // OH SHI...
   403     } else {
   252     
   404         // a transactionless query, so just finish it off and pump any other waiting ones
   253     // set up the result_info
   405         _evsql_query_done(query, &res);
   254     result.evsql = evsql;
   406 
   255     result.error = 1;
   407         // pump the next one
   256 
   408         _evsql_pump(conn->evsql, conn);
   257     // finish off the whole connection
   409     }
   258     _evsql_destroy(evsql, &result);
   410 }
   259 }
   411 
   260 
   412 /*
       
   413  * The connection failed.
       
   414  */
       
   415 static void _evsql_evpq_failure (struct evpq_conn *_conn, void *arg) {
       
   416     struct evsql_conn *conn = arg;
       
   417     
       
   418     // just fail the conn
       
   419     _evsql_conn_fail(conn);
       
   420 }
       
   421 
       
   422 /*
       
   423  * Our evpq behaviour
       
   424  */
   261 static struct evpq_callback_info _evsql_evpq_cb_info = {
   425 static struct evpq_callback_info _evsql_evpq_cb_info = {
   262     .fn_connected       = _evsql_evpq_connected,
   426     .fn_connected       = _evsql_evpq_connected,
   263     .fn_result          = _evsql_evpq_result,
   427     .fn_result          = _evsql_evpq_result,
   264     .fn_done            = _evsql_evpq_done,
   428     .fn_done            = _evsql_evpq_done,
   265     .fn_failure         = _evsql_evpq_failure,
   429     .fn_failure         = _evsql_evpq_failure,
   266 };
   430 };
   267 
   431 
   268 static struct evsql *_evsql_new_base (evsql_error_cb error_fn, void *cb_arg) {
   432 /*
       
   433  * Allocate the generic evsql context.
       
   434  */
       
   435 static struct evsql *_evsql_new_base (struct event_base *ev_base, evsql_error_cb error_fn, void *cb_arg) {
   269     struct evsql *evsql = NULL;
   436     struct evsql *evsql = NULL;
   270     
   437     
   271     // allocate it
   438     // allocate it
   272     if ((evsql = calloc(1, sizeof(*evsql))) == NULL)
   439     if ((evsql = calloc(1, sizeof(*evsql))) == NULL)
   273         ERROR("calloc");
   440         ERROR("calloc");
   274 
   441 
   275     // store
   442     // store
       
   443     evsql->ev_base = ev_base;
   276     evsql->error_fn = error_fn;
   444     evsql->error_fn = error_fn;
   277     evsql->cb_arg = cb_arg;
   445     evsql->cb_arg = cb_arg;
   278 
   446 
   279     // init
   447     // init
   280     TAILQ_INIT(&evsql->queue);
   448     LIST_INIT(&evsql->conn_list);
       
   449     TAILQ_INIT(&evsql->query_queue);
   281 
   450 
   282     // done
   451     // done
   283     return evsql;
   452     return evsql;
   284 
   453 
   285 error:
   454 error:
   286     return NULL;
   455     return NULL;
   287 }
   456 }
   288 
   457 
       
   458 /*
       
   459  * Start a new connection and add it to the list, it won't be ready until _evsql_evpq_connected is called
       
   460  */
       
   461 static struct evsql_conn *_evsql_conn_new (struct evsql *evsql) {
       
   462     struct evsql_conn *conn = NULL;
       
   463     
       
   464     // allocate
       
   465     if ((conn = calloc(1, sizeof(*conn))) == NULL)
       
   466         ERROR("calloc");
       
   467 
       
   468     // init
       
   469     conn->evsql = evsql;
       
   470     
       
   471     // connect the engine
       
   472     switch (evsql->type) {
       
   473         case EVSQL_EVPQ:
       
   474             if ((conn->engine.evpq = evpq_connect(evsql->ev_base, evsql->engine_conf.evpq, _evsql_evpq_cb_info, conn)) == NULL)
       
   475                 goto error;
       
   476             
       
   477             break;
       
   478             
       
   479         default:
       
   480             FATAL("evsql->type");
       
   481     }
       
   482 
       
   483     // add it to the list
       
   484     LIST_INSERT_HEAD(&evsql->conn_list, conn, entry);
       
   485 
       
   486     // success
       
   487     return conn;
       
   488 
       
   489 error:
       
   490     free(conn);
       
   491 
       
   492     return NULL;
       
   493 }
       
   494 
   289 struct evsql *evsql_new_pq (struct event_base *ev_base, const char *pq_conninfo, evsql_error_cb error_fn, void *cb_arg) {
   495 struct evsql *evsql_new_pq (struct event_base *ev_base, const char *pq_conninfo, evsql_error_cb error_fn, void *cb_arg) {
   290     struct evsql *evsql = NULL;
   496     struct evsql *evsql = NULL;
   291     
   497     
   292     // base init
   498     // base init
   293     if ((evsql = _evsql_new_base (error_fn, cb_arg)) == NULL)
   499     if ((evsql = _evsql_new_base (ev_base, error_fn, cb_arg)) == NULL)
   294         goto error;
   500         goto error;
   295 
   501 
   296     // connect the engine
   502     // store conf
   297     if ((evsql->engine.evpq = evpq_connect(ev_base, pq_conninfo, _evsql_evpq_cb_info, evsql)) == NULL)
   503     evsql->engine_conf.evpq = pq_conninfo;
       
   504 
       
   505     // pre-create one connection
       
   506     if (_evsql_conn_new(evsql) == NULL)
   298         goto error;
   507         goto error;
   299 
   508 
   300     // done
   509     // done
   301     return evsql;
   510     return evsql;
   302 
   511 
   306 
   515 
   307     return NULL;
   516     return NULL;
   308 }
   517 }
   309 
   518 
   310 /*
   519 /*
   311  * Checks what the state of the connection is in regards to executing a query.
   520  * Checks if the connection is already allocated for some other trans/query.
   312  *
   521  *
   313  * Returns:
   522  * Returns:
   314  *      <0      connection failure, query not possible
   523  *      0       connection idle, can be allocated
   315  *      0       connection idle, can query immediately
   524  *      >1      connection busy
   316  *      1       connection busy, must queue query
   525  */
   317  */
   526 static int _evsql_conn_busy (struct evsql_conn *conn) {
   318 static int _evsql_query_busy (struct evsql *evsql) {
   527     // transactions get the connection to themselves
   319     switch (evsql->type) {
   528     if (conn->trans)
       
   529         return 1;
       
   530     
       
   531     // if it has a query assigned, it's busy
       
   532     if (conn->query)
       
   533         return 1;
       
   534 
       
   535     // otherwise, it's all idle
       
   536     return 0;
       
   537 }
       
   538 
       
   539 /*
       
   540  * Checks if the connection is ready for use (i.e. _evsql_evpq_connected was called).
       
   541  *
       
   542  * The connection should not already have a query running.
       
   543  *
       
   544  * Returns 
       
   545  *  <0  the connection is not valid (failed, query in progress)
       
   546  *  0   the connection is still pending, and will become ready at some point
       
   547  *  >0  it's ready
       
   548  */
       
   549 static int _evsql_conn_ready (struct evsql_conn *conn) {
       
   550     switch (conn->evsql->type) {
   320         case EVSQL_EVPQ: {
   551         case EVSQL_EVPQ: {
   321             enum evpq_state state = evpq_state(evsql->engine.evpq);
   552             enum evpq_state state = evpq_state(conn->engine.evpq);
   322             
   553             
   323             switch (state) {
   554             switch (state) {
   324                 case EVPQ_CONNECT:
   555                 case EVPQ_CONNECT:
   325                 case EVPQ_QUERY:
   556                     return 0;
   326                     return 1;
       
   327                 
   557                 
   328                 case EVPQ_CONNECTED:
   558                 case EVPQ_CONNECTED:
   329                     return 0;
   559                     return 1;
   330 
   560 
       
   561                 case EVPQ_QUERY:
   331                 case EVPQ_INIT:
   562                 case EVPQ_INIT:
   332                 case EVPQ_FAILURE:
   563                 case EVPQ_FAILURE:
   333                     return -1;
   564                     return -1;
   334                 
   565                 
   335                 default:
   566                 default:
   336                     FATAL("evpq_state");
   567                     FATAL("evpq_state: %d", state);
   337             }
   568             }
   338 
   569 
   339         }
   570         }
   340         
   571         
   341         default:
   572         default:
   342             FATAL("evsql->type");
   573             FATAL("evsql->type: %d", conn->evsql->type);
   343     }
   574     }
   344 }
   575 }
   345 
   576 
   346 static struct evsql_query *_evsql_query_new (struct evsql *evsql, evsql_query_cb query_fn, void *cb_arg) {
   577 /*
       
   578  * Allocate a connection for use and return it via *conn_ptr, or if may_queue is nonzero and the connection pool is
       
   579  * getting full, return NULL (query should be queued).
       
   580  *
       
   581  * Note that the returned connection might not be ready for use yet (if we created a new one, see _evsql_conn_ready).
       
   582  *
       
   583  * Returns zero if a connection was found or the request should be queued, or nonzero if something failed and the
       
   584  * request should be dropped.
       
   585  */
       
   586 static int _evsql_conn_get (struct evsql *evsql, struct evsql_conn **conn_ptr, int may_queue) {
       
   587     *conn_ptr = NULL;
       
   588     
       
   589     // find a connection that isn't busy and is ready (unless the query queue is empty).
       
   590     LIST_FOREACH(*conn_ptr, &evsql->conn_list, entry) {
       
   591         // skip busy conns always
       
   592         if (_evsql_conn_busy(*conn_ptr))
       
   593             continue;
       
   594         
       
   595         // accept pending conns as long as there are NO enqueued queries (might cause deadlock otherwise)
       
   596         if (_evsql_conn_ready(*conn_ptr) == 0 && TAILQ_EMPTY(&evsql->query_queue))
       
   597             break;
       
   598 
       
   599         // accept conns that are in a fully ready state
       
   600         if (_evsql_conn_ready(*conn_ptr) > 0)
       
   601             break;
       
   602     }
       
   603     
       
   604     // if we found an idle connection, we can just return that right away
       
   605     if (*conn_ptr)
       
   606         return 0;
       
   607 
       
   608     // return NULL if may_queue and the conn list is not empty
       
   609     if (may_queue && !LIST_EMPTY(&evsql->conn_list))
       
   610         return 0;
       
   611     
       
   612     // we need to open a new connection
       
   613     if ((*conn_ptr = _evsql_conn_new(evsql)) == NULL)
       
   614         goto error;
       
   615 
       
   616     // good
       
   617     return 0;
       
   618 error:
       
   619     return -1;
       
   620 }
       
   621 
       
   622 /*
       
   623  * Validate and allocate the basic stuff for a new query.
       
   624  */
       
   625 static struct evsql_query *_evsql_query_new (struct evsql *evsql, struct evsql_trans *trans, evsql_query_cb query_fn, void *cb_arg) {
   347     struct evsql_query *query;
   626     struct evsql_query *query;
   348     
   627     
       
   628     // if it's part of a trans, then make sure the trans is idle
       
   629     if (trans && trans->query)
       
   630         ERROR("transaction is busy");
       
   631 
   349     // allocate it
   632     // allocate it
   350     if ((query = calloc(1, sizeof(*query))) == NULL)
   633     if ((query = calloc(1, sizeof(*query))) == NULL)
   351         ERROR("calloc");
   634         ERROR("calloc");
   352 
   635 
   353     // store
   636     // store
   354     query->evsql = evsql;
       
   355     query->cb_fn = query_fn;
   637     query->cb_fn = query_fn;
   356     query->cb_arg = cb_arg;
   638     query->cb_arg = cb_arg;
   357 
   639 
   358     // success
   640     // success
   359     return query;
   641     return query;
   360 
   642 
   361 error:
   643 error:
   362     return NULL;
   644     return NULL;
   363 }
   645 }
   364 
   646 
   365 static int _evsql_query_enqueue (struct evsql *evsql, struct evsql_query *query, const char *command) {
   647 /*
   366     int busy;
   648  * Handle a new query.
   367     
   649  *
   368     // check state
   650  * For transactions this will associate the query and then execute it, otherwise this will either find an idle
   369     if ((busy = _evsql_query_busy(evsql)) < 0)
   651  * connection and send the query, or enqueue it.
   370         ERROR("connection is not valid");
   652  */
   371     
   653 static int _evsql_query_enqueue (struct evsql *evsql, struct evsql_trans *trans, struct evsql_query *query, const char *command) {
   372     if (busy) {
   654     // transaction queries are handled differently
   373         // copy the command for later execution
   655     if (trans) {
   374         if ((query->command = strdup(command)) == NULL)
   656         // it's an in-transaction query
   375             ERROR("strdup");
   657         assert(trans->query == NULL);
       
   658         
       
   659         // assign the query
       
   660         trans->query = query;
       
   661 
       
   662         // execute directly
       
   663         if (_evsql_query_exec(trans->conn, query, command))
       
   664             goto error;
   376 
   665 
   377     } else {
   666     } else {
   378         assert(TAILQ_EMPTY(&evsql->queue));
   667         struct evsql_conn *conn;
   379 
   668         
   380         // execute directly
   669         // find an idle connection
   381         if (_evsql_query_exec(evsql, query, command))
   670         if ((_evsql_conn_get(evsql, &conn, 1)))
   382             goto error;
   671             ERROR("couldn't allocate a connection for the query");
   383 
   672 
   384     }
   673         // we must enqueue if no idle conn or the conn is not yet ready
   385     
   674         if (conn && _evsql_conn_ready(conn) > 0) {
   386     // store it on the list
   675             // execute directly
   387     TAILQ_INSERT_TAIL(&evsql->queue, query, entry);
   676             if (_evsql_query_exec(conn, query, command))
       
   677                 goto error;
       
   678 
       
   679 
       
   680         } else {
       
   681             // copy the command for later execution
       
   682             if ((query->command = strdup(command)) == NULL)
       
   683                 ERROR("strdup");
       
   684             
       
   685             // enqueue until some connection pumps the queue
       
   686             TAILQ_INSERT_TAIL(&evsql->query_queue, query, entry);
       
   687         }
       
   688     }
   388 
   689 
   389     // ok, good
   690     // ok, good
   390     return 0;
   691     return 0;
   391 
   692 
   392 error:
   693 error:
   393     return -1;
   694     return -1;
   394 }
   695 }
   395 
   696 
   396 struct evsql_query *evsql_query (struct evsql *evsql, const char *command, evsql_query_cb query_fn, void *cb_arg) {
   697 struct evsql_query *evsql_query (struct evsql *evsql, struct evsql_trans *trans, const char *command, evsql_query_cb query_fn, void *cb_arg) {
   397     struct evsql_query *query = NULL;
   698     struct evsql_query *query = NULL;
   398     
   699     
   399     // alloc new query
   700     // alloc new query
   400     if ((query = _evsql_query_new(evsql, query_fn, cb_arg)) == NULL)
   701     if ((query = _evsql_query_new(evsql, trans, query_fn, cb_arg)) == NULL)
   401         goto error;
   702         goto error;
   402     
   703     
   403     // just execute the command string directly
   704     // just execute the command string directly
   404     if (_evsql_query_enqueue(evsql, query, command))
   705     if (_evsql_query_enqueue(evsql, trans, query, command))
   405         goto error;
   706         goto error;
   406 
   707 
   407     // ok
   708     // ok
   408     return query;
   709     return query;
   409 
   710 
   411     _evsql_query_free(query);
   712     _evsql_query_free(query);
   412 
   713 
   413     return NULL;
   714     return NULL;
   414 }
   715 }
   415 
   716 
   416 struct evsql_query *evsql_query_params (struct evsql *evsql, const char *command, const struct evsql_query_params *params, evsql_query_cb query_fn, void *cb_arg) {
   717 struct evsql_query *evsql_query_params (struct evsql *evsql, struct evsql_trans *trans, const char *command, const struct evsql_query_params *params, evsql_query_cb query_fn, void *cb_arg) {
   417     struct evsql_query *query = NULL;
   718     struct evsql_query *query = NULL;
   418     const struct evsql_query_param *param;
   719     const struct evsql_query_param *param;
   419     int idx;
   720     int idx;
   420     
   721     
   421     // alloc new query
   722     // alloc new query
   422     if ((query = _evsql_query_new(evsql, query_fn, cb_arg)) == NULL)
   723     if ((query = _evsql_query_new(evsql, trans, query_fn, cb_arg)) == NULL)
   423         goto error;
   724         goto error;
   424 
   725 
   425     // count the params
   726     // count the params
   426     for (param = params->list; param->type; param++) 
   727     for (param = params->list; param->type; param++) 
   427         query->params.count++;
   728         query->params.count++;
   462         default:
   763         default:
   463             FATAL("params.result_fmt: %d", params->result_fmt);
   764             FATAL("params.result_fmt: %d", params->result_fmt);
   464     }
   765     }
   465 
   766 
   466     // execute it
   767     // execute it
   467     if (_evsql_query_enqueue(evsql, query, command))
   768     if (_evsql_query_enqueue(evsql, trans, query, command))
   468         goto error;
   769         goto error;
   469 
   770 
   470     // ok
   771     // ok
   471     return query;
   772     return query;
   472 
   773 
   474     _evsql_query_free(query);
   775     _evsql_query_free(query);
   475     
   776     
   476     return NULL;
   777     return NULL;
   477 }
   778 }
   478 
   779 
   479 int evsql_param_string (struct evsql_query_params *params, size_t param, const char *ptr) {
       
   480     struct evsql_query_param *p = &params->list[param];
       
   481     
       
   482     assert(p->type == EVSQL_PARAM_STRING);
       
   483 
       
   484     p->data_raw = ptr;
       
   485     p->length = 0;
       
   486 
       
   487     return 0;
       
   488 }
       
   489 
       
   490 int evsql_param_uint32 (struct evsql_query_params *params, size_t param, uint32_t uval) {
       
   491     struct evsql_query_param *p = &params->list[param];
       
   492     
       
   493     assert(p->type == EVSQL_PARAM_UINT32);
       
   494 
       
   495     p->data.uint32 = htonl(uval);
       
   496     p->data_raw = (const char *) &p->data.uint32;
       
   497     p->length = sizeof(uval);
       
   498 
       
   499     return 0;
       
   500 }
       
   501 
       
   502 const char *evsql_result_error (const struct evsql_result_info *res) {
       
   503     if (!res->error)
       
   504         return "No error";
       
   505 
       
   506     switch (res->evsql->type) {
       
   507         case EVSQL_EVPQ:
       
   508             if (!res->result.pq)
       
   509                 return "unknown error (no result)";
       
   510             
       
   511             return PQresultErrorMessage(res->result.pq);
       
   512 
       
   513         default:
       
   514             FATAL("res->evsql->type");
       
   515     }
       
   516 
       
   517 }
       
   518 
       
   519 size_t evsql_result_rows (const struct evsql_result_info *res) {
       
   520     switch (res->evsql->type) {
       
   521         case EVSQL_EVPQ:
       
   522             return PQntuples(res->result.pq);
       
   523 
       
   524         default:
       
   525             FATAL("res->evsql->type");
       
   526     }
       
   527 }
       
   528 
       
   529 size_t evsql_result_cols (const struct evsql_result_info *res) {
       
   530     switch (res->evsql->type) {
       
   531         case EVSQL_EVPQ:
       
   532             return PQnfields(res->result.pq);
       
   533 
       
   534         default:
       
   535             FATAL("res->evsql->type");
       
   536     }
       
   537 }
       
   538 
       
   539 int evsql_result_binary (const struct evsql_result_info *res, size_t row, size_t col, const char **ptr, size_t size, int nullok) {
       
   540     *ptr = NULL;
       
   541 
       
   542     switch (res->evsql->type) {
       
   543         case EVSQL_EVPQ:
       
   544             if (PQgetisnull(res->result.pq, row, col)) {
       
   545                 if (nullok)
       
   546                     return 0;
       
   547                 else
       
   548                     ERROR("[%zu:%zu] field is null", row, col);
       
   549             }
       
   550 
       
   551             if (PQfformat(res->result.pq, col) != 1)
       
   552                 ERROR("[%zu:%zu] PQfformat is not binary: %d", row, col, PQfformat(res->result.pq, col));
       
   553     
       
   554             if (size && PQgetlength(res->result.pq, row, col) != size)
       
   555                 ERROR("[%zu:%zu] field size mismatch: %zu -> %d", row, col, size, PQgetlength(res->result.pq, row, col));
       
   556 
       
   557             *ptr = PQgetvalue(res->result.pq, row, col);
       
   558 
       
   559             return 0;
       
   560 
       
   561         default:
       
   562             FATAL("res->evsql->type");
       
   563     }
       
   564 
       
   565 error:
       
   566     return -1;
       
   567 }
       
   568 
       
   569 int evsql_result_string (const struct evsql_result_info *res, size_t row, size_t col, const char **ptr, int nullok) {
       
   570     return evsql_result_binary(res, row, col, ptr, 0, nullok);
       
   571 }
       
   572 
       
   573 int evsql_result_uint16 (const struct evsql_result_info *res, size_t row, size_t col, uint16_t *uval, int nullok) {
       
   574     const char *data;
       
   575     int16_t sval;
       
   576 
       
   577     if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok))
       
   578         goto error;
       
   579 
       
   580     sval = ntohs(*((int16_t *) data));
       
   581 
       
   582     if (sval < 0)
       
   583         ERROR("negative value for unsigned: %d", sval);
       
   584 
       
   585     *uval = sval;
       
   586     
       
   587     return 0;
       
   588 
       
   589 error:
       
   590     return nullok ? 0 : -1;
       
   591 }
       
   592 
       
   593 int evsql_result_uint32 (const struct evsql_result_info *res, size_t row, size_t col, uint32_t *uval, int nullok) {
       
   594     const char *data;
       
   595     int32_t sval;
       
   596 
       
   597     if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok))
       
   598         goto error;
       
   599 
       
   600     sval = ntohl(*(int32_t *) data);
       
   601 
       
   602     if (sval < 0)
       
   603         ERROR("negative value for unsigned: %d", sval);
       
   604 
       
   605     *uval = sval;
       
   606     
       
   607     return 0;
       
   608 
       
   609 error:
       
   610     return nullok ? 0 : -1;
       
   611 }
       
   612 
       
   613 int evsql_result_uint64 (const struct evsql_result_info *res, size_t row, size_t col, uint64_t *uval, int nullok) {
       
   614     const char *data;
       
   615     int64_t sval;
       
   616 
       
   617     if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok))
       
   618         goto error;
       
   619 
       
   620     sval = ntohq(*(int64_t *) data);
       
   621 
       
   622     if (sval < 0)
       
   623         ERROR("negative value for unsigned: %ld", sval);
       
   624 
       
   625     *uval = sval;
       
   626     
       
   627     return 0;
       
   628 
       
   629 error:
       
   630     return nullok ? 0 : -1;
       
   631 }
       
   632 
       
   633 void evsql_result_free (const struct evsql_result_info *res) {
       
   634     switch (res->evsql->type) {
       
   635         case EVSQL_EVPQ:
       
   636             return PQclear(res->result.pq);
       
   637 
       
   638         default:
       
   639             FATAL("res->evsql->type");
       
   640     }
       
   641 }