diff -r f0ef6d8880b4 -r e5da1d428e3e src/evsql.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/evsql.c Sun Oct 12 00:17:09 2008 +0300 @@ -0,0 +1,344 @@ +#define _GNU_SOURCE +#include +#include +#include +#include + +#include "evsql.h" +#include "evpq.h" +#include "lib/log.h" +#include "lib/error.h" +#include "lib/misc.h" + +enum evsql_type { + EVSQL_EVPQ, +}; + +struct evsql { + // callbacks + evsql_error_cb error_fn; + void *cb_arg; + + // backend engine + enum evsql_type type; + + union { + struct evpq_conn *evpq; + } engine; + + // list of queries running or waiting to run + TAILQ_HEAD(evsql_queue, evsql_query) queue; +}; + +struct evsql_query { + // the evsql we are querying + struct evsql *evsql; + + // the actual SQL query, this may or may not be ours, see _evsql_query_exec + char *command; + + // our callback + evsql_query_cb cb_fn; + void *cb_arg; + + // our position in the query list + TAILQ_ENTRY(evsql_query) entry; + + // the result + union { + PGresult *evpq; + } result; +}; + +/* + * Actually execute the given query. + * + * The backend should be able to accept the query at this time. + * + * query->command must be valid during the execution of this function, but once it returns, the command is not needed + * anymore, and should be set to NULL. + */ +static int _evsql_query_exec (struct evsql *evsql, struct evsql_query *query, const char *command) { + switch (evsql->type) { + case EVSQL_EVPQ: + // just pass it through + return evpq_query(evsql->engine.evpq, command); + + default: + FATAL("evsql->type"); + } +} + +/* + * Dequeue the query, execute the callback, and free it. + */ +static void _evsql_query_done (struct evsql_query *query, const struct evsql_result_info *result_info) { + assert(query->command == NULL); + + // dequeue + TAILQ_REMOVE(&query->evsql->queue, query, entry); + + if (result_info) + // call the callback + query->cb_fn(*result_info, query->cb_arg); + + // free + free(query); +} + +/* + * A query has failed, notify the user and remove it. + */ +static void _evsql_query_failure (struct evsql *evsql, struct evsql_query *query) { + struct evsql_result_info result; ZINIT(result); + + // set up the result_info + result.evsql = evsql; + result.error = 1; + + // finish it off + _evsql_query_done(query, &result); +} + +/* + * Clear every enqueued query and then free the evsql. + * + * If result_info is given, each query will also recieve it via their callback, and the error_fn will be called. + */ +static void _evsql_destroy (struct evsql *evsql, const struct evsql_result_info *result_info) { + struct evsql_query *query; + + // clear the queue + while ((query = TAILQ_FIRST(&evsql->queue)) != NULL) { + _evsql_query_done(query, result_info); + + TAILQ_REMOVE(&evsql->queue, query, entry); + } + + // do the error callback if required + if (result_info) + evsql->error_fn(evsql, evsql->cb_arg); + + // free + free(evsql); +} + + +/* + * Sends the next query if there are more enqueued + */ +static void _evsql_pump (struct evsql *evsql) { + struct evsql_query *query; + + // look for the next query + if ((query = TAILQ_FIRST(&evsql->queue)) != NULL) { + // try and execute it + if (_evsql_query_exec(evsql, query, query->command)) { + // the query failed + _evsql_query_failure(evsql, query); + } + + // free the command + free(query->command); query->command = NULL; + + // ok, then we just wait + } +} + + +static void _evsql_evpq_connected (struct evpq_conn *conn, void *arg) { + struct evsql *evsql = arg; + + // no state to update, just pump any waiting queries + _evsql_pump(evsql); +} + +static void _evsql_evpq_result (struct evpq_conn *conn, PGresult *result, void *arg) { + struct evsql *evsql = arg; + struct evsql_query *query; + + assert((query = TAILQ_FIRST(&evsql->queue)) != NULL); + + // if we get multiple results, only return the first one + if (query->result.evpq) { + WARNING("[evsql] evpq query returned multiple results, discarding previous one"); + + PQclear(query->result.evpq); query->result.evpq = NULL; + } + + // remember the result + query->result.evpq = result; +} + +static void _evsql_evpq_done (struct evpq_conn *conn, void *arg) { + struct evsql *evsql = arg; + struct evsql_query *query; + struct evsql_result_info result; ZINIT(result); + + assert((query = TAILQ_FIRST(&evsql->queue)) != NULL); + + // set up the result_info + result.evsql = evsql; + + if (query->result.evpq == NULL) { + // if a query didn't return any results (bug?), warn and fail the query + WARNING("[evsql] evpq query didn't return any results"); + + result.error = 1; + + } else { + result.error = 0; + result.result.pq = query->result.evpq; + + } + + // finish it off + _evsql_query_done(query, &result); + + // pump the next one + _evsql_pump(evsql); +} + +static void _evsql_evpq_failure (struct evpq_conn *conn, void *arg) { + struct evsql *evsql = arg; + struct evsql_result_info result; ZINIT(result); + + // OH SHI... + + // set up the result_info + result.evsql = evsql; + result.error = 1; + + // finish off the whole connection + _evsql_destroy(evsql, &result); +} + +static struct evpq_callback_info _evsql_evpq_cb_info = { + .fn_connected = _evsql_evpq_connected, + .fn_result = _evsql_evpq_result, + .fn_done = _evsql_evpq_done, + .fn_failure = _evsql_evpq_failure, +}; + +static struct evsql *_evsql_new_base (evsql_error_cb error_fn, void *cb_arg) { + struct evsql *evsql = NULL; + + // allocate it + if ((evsql = calloc(1, sizeof(*evsql))) == NULL) + ERROR("calloc"); + + // store + evsql->error_fn = error_fn; + evsql->cb_arg = cb_arg; + + // init + TAILQ_INIT(&evsql->queue); + + // done + return evsql; + +error: + return NULL; +} + +struct evsql *evsql_new_pq (struct event_base *ev_base, const char *pq_conninfo, evsql_error_cb error_fn, void *cb_arg) { + struct evsql *evsql = NULL; + + // base init + if ((evsql = _evsql_new_base (error_fn, cb_arg)) == NULL) + goto error; + + // connect the engine + if ((evsql->engine.evpq = evpq_connect(ev_base, pq_conninfo, _evsql_evpq_cb_info, evsql)) == NULL) + goto error; + + // done + return evsql; + +error: + // XXX: more complicated than this? + free(evsql); + + return NULL; +} + +/* + * Checks what the state of the connection is in regards to executing a query. + * + * Returns: + * <0 connection failure, query not possible + * 0 connection idle, can query immediately + * 1 connection busy, must queue query + */ +static int _evsql_query_idle (struct evsql *evsql) { + switch (evsql->type) { + case EVSQL_EVPQ: { + enum evpq_state state = evpq_state(evsql->engine.evpq); + + switch (state) { + case EVPQ_CONNECT: + case EVPQ_QUERY: + return 1; + + case EVPQ_CONNECTED: + return 0; + + case EVPQ_INIT: + case EVPQ_FAILURE: + return -1; + + default: + FATAL("evpq_state"); + } + + } + + default: + FATAL("evsql->type"); + } +} + + +struct evsql_query *evsql_query (struct evsql *evsql, const char *command, evsql_query_cb query_fn, void *cb_arg) { + struct evsql_query *query; + int idle; + + // allocate it + if ((query = calloc(1, sizeof(*query))) == NULL) + ERROR("calloc"); + + // store + query->evsql = evsql; + query->cb_fn = query_fn; + query->cb_arg = cb_arg; + + // check state + if ((idle = _evsql_query_idle(evsql)) < 0) + ERROR("connection is not valid"); + + if (idle) { + assert(TAILQ_EMPTY(&evsql->queue)); + + // execute directly + if (_evsql_query_exec(evsql, query, command)) + goto error; + + } else { + // copy the command for later execution + if ((query->command = strdup(command)) == NULL) + ERROR("strdup"); + } + + // store it on the list + TAILQ_INSERT_TAIL(&evsql->queue, query, entry); + + // success + return query; + +error: + // do *NOT* free query->command, ever + free(query); + + return NULL; +} +