src/evsql.c
changeset 21 e5da1d428e3e
child 23 1dee73ae4ad0
equal deleted inserted replaced
20:f0ef6d8880b4 21:e5da1d428e3e
       
     1 #define _GNU_SOURCE
       
     2 #include <stdlib.h>
       
     3 #include <sys/queue.h>
       
     4 #include <assert.h>
       
     5 #include <string.h>
       
     6 
       
     7 #include "evsql.h"
       
     8 #include "evpq.h"
       
     9 #include "lib/log.h"
       
    10 #include "lib/error.h"
       
    11 #include "lib/misc.h"
       
    12 
       
    13 enum evsql_type {
       
    14     EVSQL_EVPQ,
       
    15 };
       
    16 
       
    17 struct evsql {
       
    18     // callbacks
       
    19     evsql_error_cb error_fn;
       
    20     void *cb_arg;
       
    21 
       
    22     // backend engine
       
    23     enum evsql_type type;
       
    24 
       
    25     union {
       
    26         struct evpq_conn *evpq;
       
    27     } engine;
       
    28     
       
    29     // list of queries running or waiting to run
       
    30     TAILQ_HEAD(evsql_queue, evsql_query) queue;
       
    31 };
       
    32 
       
    33 struct evsql_query {
       
    34     // the evsql we are querying
       
    35     struct evsql *evsql;
       
    36 
       
    37     // the actual SQL query, this may or may not be ours, see _evsql_query_exec
       
    38     char *command;
       
    39 
       
    40     // our callback
       
    41     evsql_query_cb cb_fn;
       
    42     void *cb_arg;
       
    43 
       
    44     // our position in the query list
       
    45     TAILQ_ENTRY(evsql_query) entry;
       
    46 
       
    47     // the result
       
    48     union {
       
    49         PGresult *evpq;
       
    50     } result;
       
    51 };
       
    52 
       
    53 /*
       
    54  * Actually execute the given query.
       
    55  *
       
    56  * The backend should be able to accept the query at this time.
       
    57  *
       
    58  * query->command must be valid during the execution of this function, but once it returns, the command is not needed
       
    59  * anymore, and should be set to NULL.
       
    60  */
       
    61 static int _evsql_query_exec (struct evsql *evsql, struct evsql_query *query, const char *command) {
       
    62     switch (evsql->type) {
       
    63         case EVSQL_EVPQ:
       
    64             // just pass it through
       
    65             return evpq_query(evsql->engine.evpq, command);
       
    66         
       
    67         default:
       
    68             FATAL("evsql->type");
       
    69     }
       
    70 }
       
    71 
       
    72 /*
       
    73  * Dequeue the query, execute the callback, and free it.
       
    74  */
       
    75 static void _evsql_query_done (struct evsql_query *query, const struct evsql_result_info *result_info) {
       
    76     assert(query->command == NULL);
       
    77 
       
    78     // dequeue
       
    79     TAILQ_REMOVE(&query->evsql->queue, query, entry);
       
    80     
       
    81     if (result_info) 
       
    82         // call the callback
       
    83         query->cb_fn(*result_info, query->cb_arg);
       
    84     
       
    85     // free
       
    86     free(query);
       
    87 }
       
    88 
       
    89 /*
       
    90  * A query has failed, notify the user and remove it.
       
    91  */
       
    92 static void _evsql_query_failure (struct evsql *evsql, struct evsql_query *query) {
       
    93     struct evsql_result_info result; ZINIT(result);
       
    94 
       
    95     // set up the result_info
       
    96     result.evsql = evsql;
       
    97     result.error = 1;
       
    98 
       
    99     // finish it off
       
   100     _evsql_query_done(query, &result);
       
   101 }
       
   102 
       
   103 /*
       
   104  * Clear every enqueued query and then free the evsql.
       
   105  *
       
   106  * If result_info is given, each query will also recieve it via their callback, and the error_fn will be called.
       
   107  */
       
   108 static void _evsql_destroy (struct evsql *evsql, const struct evsql_result_info *result_info) {
       
   109     struct evsql_query *query;
       
   110     
       
   111     // clear the queue
       
   112     while ((query = TAILQ_FIRST(&evsql->queue)) != NULL) {
       
   113         _evsql_query_done(query, result_info);
       
   114         
       
   115         TAILQ_REMOVE(&evsql->queue, query, entry);
       
   116     }
       
   117     
       
   118     // do the error callback if required
       
   119     if (result_info)
       
   120         evsql->error_fn(evsql, evsql->cb_arg);
       
   121     
       
   122     // free
       
   123     free(evsql);
       
   124 }
       
   125 
       
   126 
       
   127 /*
       
   128  * Sends the next query if there are more enqueued
       
   129  */
       
   130 static void _evsql_pump (struct evsql *evsql) {
       
   131     struct evsql_query *query;
       
   132     
       
   133     // look for the next query
       
   134     if ((query = TAILQ_FIRST(&evsql->queue)) != NULL) {
       
   135         // try and execute it
       
   136         if (_evsql_query_exec(evsql, query, query->command)) {
       
   137             // the query failed
       
   138             _evsql_query_failure(evsql, query);
       
   139         }
       
   140 
       
   141         // free the command
       
   142         free(query->command); query->command = NULL;
       
   143 
       
   144         // ok, then we just wait
       
   145     }
       
   146 }
       
   147 
       
   148 
       
   149 static void _evsql_evpq_connected (struct evpq_conn *conn, void *arg) {
       
   150     struct evsql *evsql = arg;
       
   151 
       
   152     // no state to update, just pump any waiting queries
       
   153     _evsql_pump(evsql);
       
   154 }
       
   155 
       
   156 static void _evsql_evpq_result (struct evpq_conn *conn, PGresult *result, void *arg) {
       
   157     struct evsql *evsql = arg;
       
   158     struct evsql_query *query;
       
   159 
       
   160     assert((query = TAILQ_FIRST(&evsql->queue)) != NULL);
       
   161 
       
   162     // if we get multiple results, only return the first one
       
   163     if (query->result.evpq) {
       
   164         WARNING("[evsql] evpq query returned multiple results, discarding previous one");
       
   165         
       
   166         PQclear(query->result.evpq); query->result.evpq = NULL;
       
   167     }
       
   168     
       
   169     // remember the result
       
   170     query->result.evpq = result;
       
   171 }
       
   172 
       
   173 static void _evsql_evpq_done (struct evpq_conn *conn, void *arg) {
       
   174     struct evsql *evsql = arg;
       
   175     struct evsql_query *query;
       
   176     struct evsql_result_info result; ZINIT(result);
       
   177 
       
   178     assert((query = TAILQ_FIRST(&evsql->queue)) != NULL);
       
   179     
       
   180     // set up the result_info
       
   181     result.evsql = evsql;
       
   182     
       
   183     if (query->result.evpq == NULL) {
       
   184         // if a query didn't return any results (bug?), warn and fail the query
       
   185         WARNING("[evsql] evpq query didn't return any results");
       
   186 
       
   187         result.error = 1;
       
   188 
       
   189     } else {
       
   190         result.error = 0;
       
   191         result.result.pq = query->result.evpq;
       
   192 
       
   193     }
       
   194 
       
   195     // finish it off
       
   196     _evsql_query_done(query, &result);
       
   197 
       
   198     // pump the next one
       
   199     _evsql_pump(evsql);
       
   200 }
       
   201 
       
   202 static void _evsql_evpq_failure (struct evpq_conn *conn, void *arg) {
       
   203     struct evsql *evsql = arg;
       
   204     struct evsql_result_info result; ZINIT(result);
       
   205     
       
   206     // OH SHI...
       
   207     
       
   208     // set up the result_info
       
   209     result.evsql = evsql;
       
   210     result.error = 1;
       
   211 
       
   212     // finish off the whole connection
       
   213     _evsql_destroy(evsql, &result);
       
   214 }
       
   215 
       
   216 static struct evpq_callback_info _evsql_evpq_cb_info = {
       
   217     .fn_connected       = _evsql_evpq_connected,
       
   218     .fn_result          = _evsql_evpq_result,
       
   219     .fn_done            = _evsql_evpq_done,
       
   220     .fn_failure         = _evsql_evpq_failure,
       
   221 };
       
   222 
       
   223 static struct evsql *_evsql_new_base (evsql_error_cb error_fn, void *cb_arg) {
       
   224     struct evsql *evsql = NULL;
       
   225     
       
   226     // allocate it
       
   227     if ((evsql = calloc(1, sizeof(*evsql))) == NULL)
       
   228         ERROR("calloc");
       
   229 
       
   230     // store
       
   231     evsql->error_fn = error_fn;
       
   232     evsql->cb_arg = cb_arg;
       
   233 
       
   234     // init
       
   235     TAILQ_INIT(&evsql->queue);
       
   236 
       
   237     // done
       
   238     return evsql;
       
   239 
       
   240 error:
       
   241     return NULL;
       
   242 }
       
   243 
       
   244 struct evsql *evsql_new_pq (struct event_base *ev_base, const char *pq_conninfo, evsql_error_cb error_fn, void *cb_arg) {
       
   245     struct evsql *evsql = NULL;
       
   246     
       
   247     // base init
       
   248     if ((evsql = _evsql_new_base (error_fn, cb_arg)) == NULL)
       
   249         goto error;
       
   250 
       
   251     // connect the engine
       
   252     if ((evsql->engine.evpq = evpq_connect(ev_base, pq_conninfo, _evsql_evpq_cb_info, evsql)) == NULL)
       
   253         goto error;
       
   254 
       
   255     // done
       
   256     return evsql;
       
   257 
       
   258 error:
       
   259     // XXX: more complicated than this?
       
   260     free(evsql); 
       
   261 
       
   262     return NULL;
       
   263 }
       
   264 
       
   265 /*
       
   266  * Checks what the state of the connection is in regards to executing a query.
       
   267  *
       
   268  * Returns:
       
   269  *      <0      connection failure, query not possible
       
   270  *      0       connection idle, can query immediately
       
   271  *      1       connection busy, must queue query
       
   272  */
       
   273 static int _evsql_query_idle (struct evsql *evsql) {
       
   274     switch (evsql->type) {
       
   275         case EVSQL_EVPQ: {
       
   276             enum evpq_state state = evpq_state(evsql->engine.evpq);
       
   277             
       
   278             switch (state) {
       
   279                 case EVPQ_CONNECT:
       
   280                 case EVPQ_QUERY:
       
   281                     return 1;
       
   282                 
       
   283                 case EVPQ_CONNECTED:
       
   284                     return 0;
       
   285 
       
   286                 case EVPQ_INIT:
       
   287                 case EVPQ_FAILURE:
       
   288                     return -1;
       
   289                 
       
   290                 default:
       
   291                     FATAL("evpq_state");
       
   292             }
       
   293 
       
   294         }
       
   295         
       
   296         default:
       
   297             FATAL("evsql->type");
       
   298     }
       
   299 }
       
   300 
       
   301 
       
   302 struct evsql_query *evsql_query (struct evsql *evsql, const char *command, evsql_query_cb query_fn, void *cb_arg) {
       
   303     struct evsql_query *query;
       
   304     int idle;
       
   305 
       
   306     // allocate it
       
   307     if ((query = calloc(1, sizeof(*query))) == NULL)
       
   308         ERROR("calloc");
       
   309 
       
   310     // store
       
   311     query->evsql = evsql;
       
   312     query->cb_fn = query_fn;
       
   313     query->cb_arg = cb_arg;
       
   314     
       
   315     // check state
       
   316     if ((idle = _evsql_query_idle(evsql)) < 0)
       
   317         ERROR("connection is not valid");
       
   318     
       
   319     if (idle) {
       
   320         assert(TAILQ_EMPTY(&evsql->queue));
       
   321 
       
   322         // execute directly
       
   323         if (_evsql_query_exec(evsql, query, command))
       
   324             goto error;
       
   325 
       
   326     } else {
       
   327         // copy the command for later execution
       
   328         if ((query->command = strdup(command)) == NULL)
       
   329             ERROR("strdup");
       
   330     }
       
   331     
       
   332     // store it on the list
       
   333     TAILQ_INSERT_TAIL(&evsql->queue, query, entry);
       
   334     
       
   335     // success
       
   336     return query;
       
   337 
       
   338 error:
       
   339     // do *NOT* free query->command, ever
       
   340     free(query);
       
   341 
       
   342     return NULL;
       
   343 }
       
   344