#define _GNU_SOURCE
#include <stdlib.h>
#include <assert.h>
#include <string.h>
#include "internal.h"
#include "lib/log.h"
#include "lib/error.h"
#include "lib/misc.h"
/*
* A couple function prototypes
*/
static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn);
/*
* Actually execute the given query.
*
* The backend should be able to accept the query at this time.
*
* You should assume that if trying to execute a query fails, then the connection should also be considred as failed.
*/
static int _evsql_query_exec (struct evsql_conn *conn, struct evsql_query *query, const char *command) {
int err;
DEBUG("evsql.%p: exec query=%p on trans=%p on conn=%p:", conn->evsql, query, conn->trans, conn);
switch (conn->evsql->type) {
case EVSQL_EVPQ:
// got params?
if (query->params.count) {
err = evpq_query_params(conn->engine.evpq, command,
query->params.count,
query->params.types,
query->params.values,
query->params.lengths,
query->params.formats,
query->params.result_format
);
} else {
// plain 'ole query
err = evpq_query(conn->engine.evpq, command);
}
if (err) {
if (PQstatus(evpq_pgconn(conn->engine.evpq)) != CONNECTION_OK)
WARNING("conn failed");
else
WARNING("query failed, dropping conn as well");
}
break;
default:
FATAL("evsql->type");
}
if (!err)
// assign the query
conn->query = query;
return err;
}
void _evsql_query_free (struct evsql_query *query) {
if (!query)
return;
assert(query->command == NULL);
// free params if present
free(query->params.types);
free(query->params.values);
free(query->params.lengths);
free(query->params.formats);
// free the query itself
free(query);
}
/*
* Execute the callback if res is given, and free the query.
*
* The query has been aborted, it will simply be freed
*/
static void _evsql_query_done (struct evsql_query *query, struct evsql_result *res) {
if (res) {
if (query->cb_fn) {
// call the callback
query->cb_fn(res, query->cb_arg);
} else {
WARNING("supressing cb_fn because query was aborted");
// free the results
evsql_result_free(res);
}
}
// free
_evsql_query_free(query);
}
/*
* XXX:
* /
static void _evsql_destroy (struct evsql *evsql, const struct evsql_result *res) {
struct evsql_query *query;
// clear the queue
while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) {
_evsql_query_done(query, res);
TAILQ_REMOVE(&evsql->query_queue, query, entry);
}
// free
free(evsql);
}
*/
/*
* Free the transaction, it should already be deassociated from the query and conn.
*/
static void _evsql_trans_free (struct evsql_trans *trans) {
// ensure we don't leak anything
assert(trans->query == NULL);
assert(trans->conn == NULL);
// free
free(trans);
}
/*
* Release a connection. It should already be deassociated from the trans and query.
*
* Releases the engine, removes from the conn_list and frees this.
*/
static void _evsql_conn_release (struct evsql_conn *conn) {
// ensure we don't leak anything
assert(conn->trans == NULL);
assert(conn->query == NULL);
// release the engine
switch (conn->evsql->type) {
case EVSQL_EVPQ:
evpq_release(conn->engine.evpq);
break;
default:
FATAL("evsql->type");
}
// remove from list
LIST_REMOVE(conn, entry);
// catch deadlocks
// XXX: still just assert?
assert(!LIST_EMPTY(&conn->evsql->conn_list) || TAILQ_EMPTY(&conn->evsql->query_queue));
// free
free(conn);
}
/*
* Release a transaction, it should already be deassociated from the query.
*
* Perform a two-way-deassociation with the conn, and then free the trans.
*/
static void _evsql_trans_release (struct evsql_trans *trans) {
assert(trans->query == NULL);
assert(trans->conn != NULL);
// deassociate the conn
trans->conn->trans = NULL; trans->conn = NULL;
// free the trans
_evsql_trans_free(trans);
}
/*
* Fail a single query, this will trigger the callback and free it.
*
* NOTE: Only for *TRANSACTIONLESS* queries.
*/
static void _evsql_query_fail (struct evsql* evsql, struct evsql_query *query) {
struct evsql_result res; ZINIT(res);
// set up the result_info
res.evsql = evsql;
res.error = 1;
// finish off the query
_evsql_query_done(query, &res);
}
/*
* Fail a transaction, this will silently drop any query, trigger the error callback, two-way-deassociate/release the
* conn, and then free the trans.
*/
static void _evsql_trans_fail (struct evsql_trans *trans) {
if (trans->query) {
// free the query silently
_evsql_query_free(trans->query); trans->query = NULL;
// also deassociate it from the conn!
trans->conn->query = NULL;
}
// tell the user
// XXX: trans is in a bad state during this call
if (trans->error_fn)
trans->error_fn(trans, trans->cb_arg);
else
WARNING("supressing error because error_fn was NULL");
// deassociate and release the conn
trans->conn->trans = NULL; _evsql_conn_release(trans->conn); trans->conn = NULL;
// pump the queue for requests that were waiting for this connection
_evsql_pump(trans->evsql, NULL);
// free the trans
_evsql_trans_free(trans);
}
/*
* Fail a connection. If the connection is transactional, this will just call _evsql_trans_fail, but otherwise it will
* fail any ongoing query, and then release the connection.
*/
static void _evsql_conn_fail (struct evsql_conn *conn) {
if (conn->trans) {
// let transactions handle their connection failures
_evsql_trans_fail(conn->trans);
} else {
if (conn->query) {
// fail the in-progress query
_evsql_query_fail(conn->evsql, conn->query); conn->query = NULL;
}
// finish off the whole connection
_evsql_conn_release(conn);
}
}
/*
* Processes enqueued non-transactional queries until the queue is empty, or we managed to exec a query.
*
* If execing a query on a connection fails, both the query and the connection are failed (in that order).
*
* Any further queries will then also be failed, because there's no reconnection/retry logic yet.
*
* This means that if conn is NULL, all queries are failed.
*/
static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn) {
struct evsql_query *query;
int err;
// look for waiting queries
while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) {
// zero err
err = 0;
// dequeue
TAILQ_REMOVE(&evsql->query_queue, query, entry);
if (conn) {
// try and execute it
err = _evsql_query_exec(conn, query, query->command);
}
// free the command buf
free(query->command); query->command = NULL;
if (err || !conn) {
if (!conn) {
// warn when dropping queries
WARNING("failing query becuse there are no conns");
}
// fail the query
_evsql_query_fail(evsql, query);
if (conn) {
// fail the connection
WARNING("failing the connection because a query-exec failed");
_evsql_conn_fail(conn); conn = NULL;
}
} else {
// we have succesfully enqueued a query, and we can wait for this connection to complete
break;
}
// handle the rest of the queue
}
// ok
return;
}
/*
* Callback for a trans's 'BEGIN' query, which means the transaction is now ready for use.
*/
static void _evsql_trans_ready (struct evsql_result *res, void *arg) {
struct evsql_trans *trans = arg;
assert(trans != NULL);
// check for errors
if (res->error)
ERROR("transaction 'BEGIN' failed: %s", evsql_result_error(res));
// transaction is now ready for use
trans->ready_fn(trans, trans->cb_arg);
// good
return;
error:
_evsql_trans_fail(trans);
}
/*
* The transaction's connection is ready, send the 'BEGIN' query.
*
* If anything fails, calls _evsql_trans_fail and returns nonzero, zero on success
*/
static int _evsql_trans_conn_ready (struct evsql *evsql, struct evsql_trans *trans) {
char trans_sql[EVSQL_QUERY_BEGIN_BUF];
const char *isolation_level;
int ret;
// determine the isolation_level to use
switch (trans->type) {
case EVSQL_TRANS_DEFAULT:
isolation_level = NULL; break;
case EVSQL_TRANS_SERIALIZABLE:
isolation_level = "SERIALIZABLE"; break;
case EVSQL_TRANS_REPEATABLE_READ:
isolation_level = "REPEATABLE READ"; break;
case EVSQL_TRANS_READ_COMMITTED:
isolation_level = "READ COMMITTED"; break;
case EVSQL_TRANS_READ_UNCOMMITTED:
isolation_level = "READ UNCOMMITTED"; break;
default:
FATAL("trans->type: %d", trans->type);
}
// build the trans_sql
if (isolation_level)
ret = snprintf(trans_sql, EVSQL_QUERY_BEGIN_BUF, "BEGIN TRANSACTION ISOLATION LEVEL %s", isolation_level);
else
ret = snprintf(trans_sql, EVSQL_QUERY_BEGIN_BUF, "BEGIN TRANSACTION");
// make sure it wasn't truncated
if (ret >= EVSQL_QUERY_BEGIN_BUF)
ERROR("trans_sql overflow: %d >= %d", ret, EVSQL_QUERY_BEGIN_BUF);
// execute the query
if (evsql_query(evsql, trans, trans_sql, _evsql_trans_ready, trans) == NULL)
ERROR("evsql_query");
// success
return 0;
error:
// fail the transaction
_evsql_trans_fail(trans);
return -1;
}
/*
* The evpq connection was succesfully established.
*/
static void _evsql_evpq_connected (struct evpq_conn *_conn, void *arg) {
struct evsql_conn *conn = arg;
if (conn->trans)
// notify the transaction
// don't care about errors
(void) _evsql_trans_conn_ready(conn->evsql, conn->trans);
else
// pump any waiting transactionless queries
_evsql_pump(conn->evsql, conn);
}
/*
* Got one result on this evpq connection.
*/
static void _evsql_evpq_result (struct evpq_conn *_conn, PGresult *result, void *arg) {
struct evsql_conn *conn = arg;
struct evsql_query *query = conn->query;
assert(query != NULL);
// if we get multiple results, only return the first one
if (query->result.pq) {
WARNING("[evsql] evpq query returned multiple results, discarding previous one");
PQclear(query->result.pq); query->result.pq = NULL;
}
// remember the result
query->result.pq = result;
}
/*
* No more results for this query.
*/
static void _evsql_evpq_done (struct evpq_conn *_conn, void *arg) {
struct evsql_conn *conn = arg;
struct evsql_query *query = conn->query;
struct evsql_result res; ZINIT(res);
assert(query != NULL);
// set up the result_info
res.evsql = conn->evsql;
res.result = query->result;
if (query->result.pq == NULL) {
// if a query didn't return any results (bug?), warn and fail the query
WARNING("[evsql] evpq query didn't return any results");
res.error = 1;
} else if (strcmp(PQresultErrorMessage(query->result.pq), "") != 0) {
// the query failed with some error
res.error = 1;
} else {
// the query succeeded \o/
res.error = 0;
}
// de-associate the query from the connection
conn->query = NULL;
// how we handle query completion depends on if we're a transaction or not
if (conn->trans) {
// we can deassign the trans's query
conn->trans->query = NULL;
// was an abort?
if (!query->cb_fn)
// notify the user that the transaction query has been aborted
conn->trans->ready_fn(conn->trans, conn->trans->cb_arg);
// then hand the query to the user
_evsql_query_done(query, &res);
} else {
// a transactionless query, so just finish it off and pump any other waiting ones
_evsql_query_done(query, &res);
// pump the next one
_evsql_pump(conn->evsql, conn);
}
}
/*
* The connection failed.
*/
static void _evsql_evpq_failure (struct evpq_conn *_conn, void *arg) {
struct evsql_conn *conn = arg;
// just fail the conn
_evsql_conn_fail(conn);
}
/*
* Our evpq behaviour
*/
static struct evpq_callback_info _evsql_evpq_cb_info = {
.fn_connected = _evsql_evpq_connected,
.fn_result = _evsql_evpq_result,
.fn_done = _evsql_evpq_done,
.fn_failure = _evsql_evpq_failure,
};
/*
* Allocate the generic evsql context.
*/
static struct evsql *_evsql_new_base (struct event_base *ev_base, evsql_error_cb error_fn, void *cb_arg) {
struct evsql *evsql = NULL;
// allocate it
if ((evsql = calloc(1, sizeof(*evsql))) == NULL)
ERROR("calloc");
// store
evsql->ev_base = ev_base;
evsql->error_fn = error_fn;
evsql->cb_arg = cb_arg;
// init
LIST_INIT(&evsql->conn_list);
TAILQ_INIT(&evsql->query_queue);
// done
return evsql;
error:
return NULL;
}
/*
* Start a new connection and add it to the list, it won't be ready until _evsql_evpq_connected is called
*/
static struct evsql_conn *_evsql_conn_new (struct evsql *evsql) {
struct evsql_conn *conn = NULL;
// allocate
if ((conn = calloc(1, sizeof(*conn))) == NULL)
ERROR("calloc");
// init
conn->evsql = evsql;
// connect the engine
switch (evsql->type) {
case EVSQL_EVPQ:
if ((conn->engine.evpq = evpq_connect(evsql->ev_base, evsql->engine_conf.evpq, _evsql_evpq_cb_info, conn)) == NULL)
goto error;
break;
default:
FATAL("evsql->type");
}
// add it to the list
LIST_INSERT_HEAD(&evsql->conn_list, conn, entry);
// success
return conn;
error:
free(conn);
return NULL;
}
struct evsql *evsql_new_pq (struct event_base *ev_base, const char *pq_conninfo, evsql_error_cb error_fn, void *cb_arg) {
struct evsql *evsql = NULL;
// base init
if ((evsql = _evsql_new_base (ev_base, error_fn, cb_arg)) == NULL)
goto error;
// store conf
evsql->engine_conf.evpq = pq_conninfo;
// pre-create one connection
if (_evsql_conn_new(evsql) == NULL)
goto error;
// done
return evsql;
error:
// there's no other state yet
free(evsql);
return NULL;
}
/*
* Checks if the connection is already allocated for some other trans/query.
*
* Returns:
* 0 connection idle, can be allocated
* >1 connection busy
*/
static int _evsql_conn_busy (struct evsql_conn *conn) {
// transactions get the connection to themselves
if (conn->trans)
return 1;
// if it has a query assigned, it's busy
if (conn->query)
return 1;
// otherwise, it's all idle
return 0;
}
/*
* Checks if the connection is ready for use (i.e. _evsql_evpq_connected was called).
*
* The connection should not already have a query running.
*
* Returns
* <0 the connection is not valid (failed, query in progress)
* 0 the connection is still pending, and will become ready at some point
* >0 it's ready
*/
static int _evsql_conn_ready (struct evsql_conn *conn) {
switch (conn->evsql->type) {
case EVSQL_EVPQ: {
enum evpq_state state = evpq_state(conn->engine.evpq);
switch (state) {
case EVPQ_CONNECT:
return 0;
case EVPQ_CONNECTED:
return 1;
case EVPQ_QUERY:
case EVPQ_INIT:
case EVPQ_FAILURE:
return -1;
default:
FATAL("evpq_state: %d", state);
}
}
default:
FATAL("evsql->type: %d", conn->evsql->type);
}
}
/*
* Allocate a connection for use and return it via *conn_ptr, or if may_queue is nonzero and the connection pool is
* getting full, return NULL (query should be queued).
*
* Note that the returned connection might not be ready for use yet (if we created a new one, see _evsql_conn_ready).
*
* Returns zero if a connection was found or the request should be queued, or nonzero if something failed and the
* request should be dropped.
*/
static int _evsql_conn_get (struct evsql *evsql, struct evsql_conn **conn_ptr, int may_queue) {
int have_nontrans = 0;
*conn_ptr = NULL;
// find a connection that isn't busy and is ready (unless the query queue is empty).
LIST_FOREACH(*conn_ptr, &evsql->conn_list, entry) {
// we can only have a query enqueue itself if there is a non-trans conn it can later use
if (!(*conn_ptr)->trans)
have_nontrans = 1;
// skip busy conns always
if (_evsql_conn_busy(*conn_ptr))
continue;
// accept pending conns as long as there are NO enqueued queries (might cause deadlock otherwise)
if (_evsql_conn_ready(*conn_ptr) == 0 && TAILQ_EMPTY(&evsql->query_queue))
break;
// accept conns that are in a fully ready state
if (_evsql_conn_ready(*conn_ptr) > 0)
break;
}
// if we found an idle connection, we can just return that right away
if (*conn_ptr)
return 0;
// return NULL if may_queue and we have a non-trans conn that we can, at some point, use
if (may_queue && have_nontrans)
return 0;
// we need to open a new connection
if ((*conn_ptr = _evsql_conn_new(evsql)) == NULL)
goto error;
// good
return 0;
error:
return -1;
}
struct evsql_trans *evsql_trans (struct evsql *evsql, enum evsql_trans_type type, evsql_trans_error_cb error_fn, evsql_trans_ready_cb ready_fn, evsql_trans_done_cb done_fn, void *cb_arg) {
struct evsql_trans *trans = NULL;
// allocate it
if ((trans = calloc(1, sizeof(*trans))) == NULL)
ERROR("calloc");
// store
trans->evsql = evsql;
trans->ready_fn = ready_fn;
trans->done_fn = done_fn;
trans->cb_arg = cb_arg;
trans->type = type;
// find a connection
if (_evsql_conn_get(evsql, &trans->conn, 0))
ERROR("_evsql_conn_get");
// associate the conn
trans->conn->trans = trans;
// is it already ready?
if (_evsql_conn_ready(trans->conn) > 0) {
// call _evsql_trans_conn_ready directly, it will handle cleanup (silently, !error_fn)
if (_evsql_trans_conn_ready(evsql, trans)) {
// return NULL directly
return NULL;
}
} else {
// otherwise, wait for the conn to be ready
}
// and let it pass errors to the user
trans->error_fn = error_fn;
// ok
return trans;
error:
free(trans);
return NULL;
}
/*
* Internal query functions
*/
struct evsql_query *_evsql_query_new (struct evsql *evsql, struct evsql_trans *trans, evsql_query_cb query_fn, void *cb_arg) {
struct evsql_query *query = NULL;
// if it's part of a trans, then make sure the trans is idle
if (trans && trans->query)
ERROR("transaction is busy");
// allocate it
if ((query = calloc(1, sizeof(*query))) == NULL)
ERROR("calloc");
// store
query->cb_fn = query_fn;
query->cb_arg = cb_arg;
// success
return query;
error:
return NULL;
}
int _evsql_query_enqueue (struct evsql *evsql, struct evsql_trans *trans, struct evsql_query *query, const char *command) {
// transaction queries are handled differently
if (trans) {
// it's an in-transaction query
assert(trans->query == NULL);
// assign the query
trans->query = query;
// execute directly
if (_evsql_query_exec(trans->conn, query, command)) {
// ack, fail the transaction
_evsql_trans_fail(trans);
// caller frees query
goto error;
}
} else {
struct evsql_conn *conn;
// find an idle connection
if ((_evsql_conn_get(evsql, &conn, 1)))
ERROR("couldn't allocate a connection for the query");
// we must enqueue if no idle conn or the conn is not yet ready
if (conn && _evsql_conn_ready(conn) > 0) {
// execute directly
if (_evsql_query_exec(conn, query, command)) {
// ack, fail the connection
_evsql_conn_fail(conn);
// make sure we don't deadlock any queries, but if this query got a conn directly, then we shouldn't
// have any queries enqueued anyways
assert(TAILQ_EMPTY(&evsql->query_queue));
// caller frees query
goto error;
}
} else {
// copy the command for later execution
if ((query->command = strdup(command)) == NULL)
ERROR("strdup");
// enqueue until some connection pumps the queue
TAILQ_INSERT_TAIL(&evsql->query_queue, query, entry);
}
}
// ok, good
return 0;
error:
return -1;
}
void _evsql_trans_commit_res (struct evsql_result *res, void *arg) {
struct evsql_trans *trans = arg;
// check for errors
if (res->error)
ERROR("transaction 'COMMIT' failed: %s", evsql_result_error(res));
// transaction is now done
trans->done_fn(trans, trans->cb_arg);
// release it
_evsql_trans_release(trans);
// success
return;
error:
_evsql_trans_fail(trans);
}
int evsql_trans_commit (struct evsql_trans *trans) {
static const char *sql = "COMMIT TRANSACTION";
if (trans->query)
ERROR("cannot COMMIT because transaction is still busy");
// query
if (evsql_query(trans->evsql, trans, sql, _evsql_trans_commit_res, trans) == NULL)
goto error;
// mark it as commited in case someone wants to abort it
trans->has_commit = 1;
// success
return 0;
error:
return -1;
}
void _evsql_trans_rollback_res (struct evsql_result *res, void *arg) {
struct evsql_trans *trans = arg;
// fail the connection on errors
if (res->error)
ERROR("transaction 'ROLLBACK' failed: %s", evsql_result_error(res));
// release it
_evsql_trans_release(trans);
// success
return;
error:
// fail the connection too, errors are supressed
_evsql_trans_fail(trans);
}
/*
* Used as the ready_fn callback in case of abort, otherwise directly
*/
void _evsql_trans_rollback (struct evsql_trans *trans, void *arg) {
static const char *sql = "ROLLBACK TRANSACTION";
(void) arg;
// query
if (evsql_query(trans->evsql, trans, sql, _evsql_trans_rollback_res, trans) == NULL) {
// fail the transaction/connection, errors are supressed
_evsql_trans_fail(trans);
}
}
void evsql_trans_abort (struct evsql_trans *trans) {
// supress errors
trans->error_fn = NULL;
if (trans->has_commit) {
// abort after commit doesn't make sense
FATAL("transaction was already commited");
}
if (trans->query) {
// gah, some query is running
WARNING("aborting pending query");
// prepare to rollback once complete by hijacking ready_fn
trans->ready_fn = _evsql_trans_rollback;
// abort
evsql_query_abort(trans, trans->query);
} else {
// just rollback directly
_evsql_trans_rollback(trans, NULL);
}
}
void evsql_destroy (struct evsql *evsql) {
struct evsql_query *query;
struct evsql_conn *conn;
// kill off all queued queries
while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) {
// just free it, command first
free(query->command); query->command = NULL;
_evsql_query_free(query);
}
// kill off all connections
while ((conn = LIST_FIRST(&evsql->conn_list)) != NULL) {
// kill off the query
if (conn->query) {
free(query->command); query->command = NULL;
_evsql_query_free(query);
conn->query = NULL;
}
// kill off the transaction
if (conn->trans) {
conn->trans->query = NULL;
_evsql_trans_release(conn->trans);
}
// kill it off
_evsql_conn_release(conn);
}
// then free the evsql itself
free(evsql);
}
void _evsql_destroy_handler (int fd, short what, void *arg)
{
struct evsql *evsql = arg;
evsql_destroy(evsql);
}
evsql_err_t evsql_destroy_next (struct evsql *evsql)
{
struct timeval tv = {0, 0};
// schedule a one-time event
return event_base_once(evsql->ev_base, 0, EV_TIMEOUT, &_evsql_destroy_handler, evsql, &tv);
}