--- a/src/evsql.c Sun Oct 12 14:57:06 2008 +0300
+++ b/src/evsql.c Sun Oct 12 20:10:47 2008 +0300
@@ -1,82 +1,31 @@
#define _GNU_SOURCE
#include <stdlib.h>
-#include <sys/queue.h>
#include <assert.h>
#include <string.h>
#include "evsql.h"
+#include "evsql_internal.h"
#include "evpq.h"
#include "lib/log.h"
#include "lib/error.h"
#include "lib/misc.h"
-enum evsql_type {
- EVSQL_EVPQ,
-};
-
-struct evsql {
- // callbacks
- evsql_error_cb error_fn;
- void *cb_arg;
-
- // backend engine
- enum evsql_type type;
-
- union {
- struct evpq_conn *evpq;
- } engine;
-
- // list of queries running or waiting to run
- TAILQ_HEAD(evsql_queue, evsql_query) queue;
-};
-
-struct evsql_query {
- // the evsql we are querying
- struct evsql *evsql;
-
- // the actual SQL query, this may or may not be ours, see _evsql_query_exec
- char *command;
-
- // possible query params
- struct evsql_query_param_info {
- int count;
-
- Oid *types;
- const char **values;
- int *lengths;
- int *formats;
-
- int result_format;
- } params;
-
- // our callback
- evsql_query_cb cb_fn;
- void *cb_arg;
-
- // our position in the query list
- TAILQ_ENTRY(evsql_query) entry;
-
- // the result
- union {
- PGresult *evpq;
- } result;
-};
-
/*
* Actually execute the given query.
*
* The backend should be able to accept the query at this time.
*
- * query->command must be valid during the execution of this function, but once it returns, the command is not needed
- * anymore, and should be set to NULL.
+ * You should assume that if trying to execute a query fails, then the connection should also be considred as failed.
*/
-static int _evsql_query_exec (struct evsql *evsql, struct evsql_query *query, const char *command) {
- switch (evsql->type) {
+static int _evsql_query_exec (struct evsql_conn *conn, struct evsql_query *query, const char *command) {
+ int err;
+
+ switch (conn->evsql->type) {
case EVSQL_EVPQ:
// got params?
if (query->params.count) {
- return evpq_query_params(evsql->engine.evpq, command,
+ err = evpq_query_params(conn->engine.evpq, command,
query->params.count,
query->params.types,
query->params.values,
@@ -87,16 +36,33 @@
} else {
// plain 'ole query
- return evpq_query(evsql->engine.evpq, command);
+ err = evpq_query(conn->engine.evpq, command);
}
+
+ if (err) {
+ if (PQstatus(evpq_pgconn(conn->engine.evpq)) != CONNECTION_OK)
+ WARNING("conn failed");
+ else
+ WARNING("query failed, dropping conn as well");
+ }
+
+ break;
default:
FATAL("evsql->type");
}
+
+ if (!err)
+ // assign the query
+ conn->query = query;
+
+ return err;
}
/*
- * Free the query and related resources, doesn't trigger any callbacks or remove from any queues
+ * Free the query and related resources, doesn't trigger any callbacks or remove from any queues.
+ *
+ * The command should already be taken care of (NULL).
*/
static void _evsql_query_free (struct evsql_query *query) {
assert(query->command == NULL);
@@ -112,12 +78,9 @@
}
/*
- * Dequeue the query, execute the callback, and free it.
+ * Execute the callback if res is given, and free the query.
*/
static void _evsql_query_done (struct evsql_query *query, const struct evsql_result_info *res) {
- // dequeue
- TAILQ_REMOVE(&query->evsql->queue, query, entry);
-
if (res)
// call the callback
query->cb_fn(res, query->cb_arg);
@@ -127,77 +90,263 @@
}
/*
- * A query has failed, notify the user and remove it.
- */
-static void _evsql_query_failure (struct evsql *evsql, struct evsql_query *query) {
- struct evsql_result_info res; ZINIT(res);
-
- // set up the result_info
- res.evsql = evsql;
- res.error = 1;
-
- // finish it off
- _evsql_query_done(query, &res);
-}
-
-/*
- * Clear every enqueued query and then free the evsql.
- *
- * If result_info is given, each query will also recieve it via their callback, and the error_fn will be called.
- */
+ * XXX:
+ * /
static void _evsql_destroy (struct evsql *evsql, const struct evsql_result_info *res) {
struct evsql_query *query;
// clear the queue
- while ((query = TAILQ_FIRST(&evsql->queue)) != NULL) {
+ while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) {
_evsql_query_done(query, res);
- TAILQ_REMOVE(&evsql->queue, query, entry);
+ TAILQ_REMOVE(&evsql->query_queue, query, entry);
}
- // do the error callback if required
- if (res)
- evsql->error_fn(evsql, evsql->cb_arg);
-
// free
free(evsql);
}
-
+*/
/*
- * Sends the next query if there are more enqueued
+ * Free the transaction, it should already be deassociated from the query and conn.
*/
-static void _evsql_pump (struct evsql *evsql) {
- struct evsql_query *query;
+static void _evsql_trans_free (struct evsql_trans *trans) {
+ // ensure we don't leak anything
+ assert(trans->query == NULL);
+ assert(trans->conn == NULL);
- // look for the next query
- if ((query = TAILQ_FIRST(&evsql->queue)) != NULL) {
- // try and execute it
- if (_evsql_query_exec(evsql, query, query->command)) {
- // the query failed
- _evsql_query_failure(evsql, query);
+ // free
+ free(trans);
+}
+
+/*
+ * Release a connection. It should already be deassociated from the trans and query.
+ *
+ * Releases the engine, removes from the conn_list and frees this.
+ */
+static void _evsql_conn_release (struct evsql_conn *conn) {
+ // ensure we don't leak anything
+ assert(conn->trans == NULL);
+ assert(conn->query == NULL);
+
+ // release the engine
+ switch (conn->evsql->type) {
+ case EVSQL_EVPQ:
+ evpq_release(conn->engine.evpq);
+ break;
+
+ default:
+ FATAL("evsql->type");
+ }
+
+ // remove from list
+ LIST_REMOVE(conn, entry);
+
+ // free
+ free(conn);
+}
+
+/*
+ * Fail a single query, this will trigger the callback and free it.
+ */
+static void _evsql_query_fail (struct evsql* evsql, struct evsql_query *query) {
+ struct evsql_result_info res; ZINIT(res);
+
+ // set up the result_info
+ res.evsql = evsql;
+ res.error = 1;
+
+ // finish off the query
+ _evsql_query_done(query, &res);
+}
+
+/*
+ * Fail a transaction, this will silently drop any query, trigger the error callback, two-way-deassociate/release the
+ * conn, and then free the trans.
+ */
+static void _evsql_trans_fail (struct evsql_trans *trans) {
+ if (trans->query) {
+ // free the query silently
+ _evsql_query_free(trans->query); trans->query = NULL;
+ }
+
+ // tell the user
+ // XXX: trans is in a bad state during this call
+ trans->error_fn(trans, trans->cb_arg);
+
+ // fail the conn
+ trans->conn->trans = NULL; _evsql_conn_release(trans->conn); trans->conn = NULL;
+
+ // free the trans
+ _evsql_trans_free(trans);
+}
+
+/*
+ * Fail a connection. If the connection is transactional, this will just call _evsql_trans_fail, but otherwise it will
+ * fail any ongoing query, and then release the connection.
+ */
+static void _evsql_conn_fail (struct evsql_conn *conn) {
+ if (conn->trans) {
+ // let transactions handle their connection failures
+ _evsql_trans_fail(conn->trans);
+
+ } else {
+ if (conn->query) {
+ // fail the in-progress query
+ _evsql_query_fail(conn->evsql, conn->query); conn->query = NULL;
}
- // free the command
- free(query->command); query->command = NULL;
-
- // ok, then we just wait
+ // finish off the whole connection
+ _evsql_conn_release(conn);
}
}
+/*
+ * Processes enqueued non-transactional queries until the queue is empty, or we managed to exec a query.
+ *
+ * If execing a query on a connection fails, both the query and the connection are failed (in that order).
+ *
+ * Any further queries will then also be failed, because there's no reconnection/retry logic yet.
+ */
+static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn) {
+ struct evsql_query *query;
+ int err;
+
+ // look for waiting queries
+ while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) {
+ // dequeue
+ TAILQ_REMOVE(&evsql->query_queue, query, entry);
+
+ if (conn) {
+ // try and execute it
+ err = _evsql_query_exec(conn, query, query->command);
+ }
-static void _evsql_evpq_connected (struct evpq_conn *conn, void *arg) {
- struct evsql *evsql = arg;
+ // free the command buf
+ free(query->command); query->command = NULL;
- // no state to update, just pump any waiting queries
- _evsql_pump(evsql);
+ if (err || !conn) {
+ if (!conn) {
+ // warn when dropping queries
+ WARNING("failing query becuse there are no conns");
+ }
+
+ // fail the query
+ _evsql_query_fail(evsql, query);
+
+ if (conn) {
+ // fail the connection
+ WARNING("failing the connection because a query-exec failed");
+
+ _evsql_conn_fail(conn); conn = NULL;
+ }
+
+ } else {
+ // we have succesfully enqueued a query, and we can wait for this connection to complete
+ break;
+
+ }
+
+ // handle the rest of the queue
+ }
+
+ // ok
+ return;
}
-static void _evsql_evpq_result (struct evpq_conn *conn, PGresult *result, void *arg) {
- struct evsql *evsql = arg;
- struct evsql_query *query;
+/*
+ * Callback for a trans's 'BEGIN' query, which means the transaction is now ready for use.
+ */
+static void _evsql_trans_ready (const struct evsql_result_info *res, void *arg) {
+ (void) arg;
- assert((query = TAILQ_FIRST(&evsql->queue)) != NULL);
+ assert(res->trans);
+
+ // check for errors
+ if (res->error)
+ ERROR("transaction 'BEGIN' failed: %s", evsql_result_error(res));
+
+ // transaction is now ready for use
+ res->trans->ready_fn(res->trans, res->trans->cb_arg);
+
+error:
+ _evsql_trans_fail(res->trans);
+}
+
+/*
+ * The transaction's connection is ready, send the 'BEGIN' query.
+ */
+static void _evsql_trans_conn_ready (struct evsql *evsql, struct evsql_trans *trans) {
+ char trans_sql[EVSQL_QUERY_BEGIN_BUF];
+ const char *isolation_level;
+ int ret;
+
+ // determine the isolation_level to use
+ switch (trans->type) {
+ case EVSQL_TRANS_DEFAULT:
+ isolation_level = NULL; break;
+
+ case EVSQL_TRANS_SERIALIZABLE:
+ isolation_level = "SERIALIZABLE"; break;
+
+ case EVSQL_TRANS_REPEATABLE_READ:
+ isolation_level = "REPEATABLE READ"; break;
+
+ case EVSQL_TRANS_READ_COMMITTED:
+ isolation_level = "READ COMMITTED"; break;
+
+ case EVSQL_TRANS_READ_UNCOMMITTED:
+ isolation_level = "READ UNCOMMITTED"; break;
+
+ default:
+ FATAL("trans->type: %d", trans->type);
+ }
+
+ // build the trans_sql
+ if (isolation_level)
+ ret = snprintf(trans_sql, EVSQL_QUERY_BEGIN_BUF, "BEGIN TRANSACTION ISOLATION LEVEL %s", isolation_level);
+ else
+ ret = snprintf(trans_sql, EVSQL_QUERY_BEGIN_BUF, "BEGIN TRANSACTION");
+
+ // make sure it wasn't truncated
+ if (ret >= EVSQL_QUERY_BEGIN_BUF)
+ ERROR("trans_sql overflow: %d >= %d", ret, EVSQL_QUERY_BEGIN_BUF);
+
+ // execute the query
+ if (evsql_query(evsql, trans, trans_sql, _evsql_trans_ready, NULL))
+ ERROR("evsql_query");
+
+ // success
+ return;
+
+error:
+ // fail the transaction
+ _evsql_trans_fail(trans);
+}
+
+/*
+ * The evpq connection was succesfully established.
+ */
+static void _evsql_evpq_connected (struct evpq_conn *_conn, void *arg) {
+ struct evsql_conn *conn = arg;
+
+ if (conn->trans)
+ // notify the transaction
+ _evsql_trans_conn_ready(conn->evsql, conn->trans);
+
+ else
+ // pump any waiting transactionless queries
+ _evsql_pump(conn->evsql, conn);
+}
+
+/*
+ * Got one result on this evpq connection.
+ */
+static void _evsql_evpq_result (struct evpq_conn *_conn, PGresult *result, void *arg) {
+ struct evsql_conn *conn = arg;
+ struct evsql_query *query = conn->query;
+
+ assert(query != NULL);
// if we get multiple results, only return the first one
if (query->result.evpq) {
@@ -210,15 +359,18 @@
query->result.evpq = result;
}
-static void _evsql_evpq_done (struct evpq_conn *conn, void *arg) {
- struct evsql *evsql = arg;
- struct evsql_query *query;
+/*
+ * No more results for this query.
+ */
+static void _evsql_evpq_done (struct evpq_conn *_conn, void *arg) {
+ struct evsql_conn *conn = arg;
+ struct evsql_query *query = conn->query;
struct evsql_result_info res; ZINIT(res);
-
- assert((query = TAILQ_FIRST(&evsql->queue)) != NULL);
+
+ assert(query != NULL);
// set up the result_info
- res.evsql = evsql;
+ res.evsql = conn->evsql;
if (query->result.evpq == NULL) {
// if a query didn't return any results (bug?), warn and fail the query
@@ -237,27 +389,39 @@
}
- // finish it off
- _evsql_query_done(query, &res);
+ // de-associate the query from the connection
+ conn->query = NULL;
+
+ // how we handle query completion depends on if we're a transaction or not
+ if (conn->trans) {
+ // we can deassign the trans's query
+ conn->trans->query = NULL;
- // pump the next one
- _evsql_pump(evsql);
+ // then hand the query to the user
+ _evsql_query_done(query, &res);
+
+ } else {
+ // a transactionless query, so just finish it off and pump any other waiting ones
+ _evsql_query_done(query, &res);
+
+ // pump the next one
+ _evsql_pump(conn->evsql, conn);
+ }
}
-static void _evsql_evpq_failure (struct evpq_conn *conn, void *arg) {
- struct evsql *evsql = arg;
- struct evsql_result_info result; ZINIT(result);
-
- // OH SHI...
+/*
+ * The connection failed.
+ */
+static void _evsql_evpq_failure (struct evpq_conn *_conn, void *arg) {
+ struct evsql_conn *conn = arg;
- // set up the result_info
- result.evsql = evsql;
- result.error = 1;
-
- // finish off the whole connection
- _evsql_destroy(evsql, &result);
+ // just fail the conn
+ _evsql_conn_fail(conn);
}
+/*
+ * Our evpq behaviour
+ */
static struct evpq_callback_info _evsql_evpq_cb_info = {
.fn_connected = _evsql_evpq_connected,
.fn_result = _evsql_evpq_result,
@@ -265,7 +429,10 @@
.fn_failure = _evsql_evpq_failure,
};
-static struct evsql *_evsql_new_base (evsql_error_cb error_fn, void *cb_arg) {
+/*
+ * Allocate the generic evsql context.
+ */
+static struct evsql *_evsql_new_base (struct event_base *ev_base, evsql_error_cb error_fn, void *cb_arg) {
struct evsql *evsql = NULL;
// allocate it
@@ -273,11 +440,13 @@
ERROR("calloc");
// store
+ evsql->ev_base = ev_base;
evsql->error_fn = error_fn;
evsql->cb_arg = cb_arg;
// init
- TAILQ_INIT(&evsql->queue);
+ LIST_INIT(&evsql->conn_list);
+ TAILQ_INIT(&evsql->query_queue);
// done
return evsql;
@@ -286,15 +455,55 @@
return NULL;
}
+/*
+ * Start a new connection and add it to the list, it won't be ready until _evsql_evpq_connected is called
+ */
+static struct evsql_conn *_evsql_conn_new (struct evsql *evsql) {
+ struct evsql_conn *conn = NULL;
+
+ // allocate
+ if ((conn = calloc(1, sizeof(*conn))) == NULL)
+ ERROR("calloc");
+
+ // init
+ conn->evsql = evsql;
+
+ // connect the engine
+ switch (evsql->type) {
+ case EVSQL_EVPQ:
+ if ((conn->engine.evpq = evpq_connect(evsql->ev_base, evsql->engine_conf.evpq, _evsql_evpq_cb_info, conn)) == NULL)
+ goto error;
+
+ break;
+
+ default:
+ FATAL("evsql->type");
+ }
+
+ // add it to the list
+ LIST_INSERT_HEAD(&evsql->conn_list, conn, entry);
+
+ // success
+ return conn;
+
+error:
+ free(conn);
+
+ return NULL;
+}
+
struct evsql *evsql_new_pq (struct event_base *ev_base, const char *pq_conninfo, evsql_error_cb error_fn, void *cb_arg) {
struct evsql *evsql = NULL;
// base init
- if ((evsql = _evsql_new_base (error_fn, cb_arg)) == NULL)
+ if ((evsql = _evsql_new_base (ev_base, error_fn, cb_arg)) == NULL)
goto error;
- // connect the engine
- if ((evsql->engine.evpq = evpq_connect(ev_base, pq_conninfo, _evsql_evpq_cb_info, evsql)) == NULL)
+ // store conf
+ evsql->engine_conf.evpq = pq_conninfo;
+
+ // pre-create one connection
+ if (_evsql_conn_new(evsql) == NULL)
goto error;
// done
@@ -308,50 +517,123 @@
}
/*
- * Checks what the state of the connection is in regards to executing a query.
+ * Checks if the connection is already allocated for some other trans/query.
*
* Returns:
- * <0 connection failure, query not possible
- * 0 connection idle, can query immediately
- * 1 connection busy, must queue query
+ * 0 connection idle, can be allocated
+ * >1 connection busy
*/
-static int _evsql_query_busy (struct evsql *evsql) {
- switch (evsql->type) {
+static int _evsql_conn_busy (struct evsql_conn *conn) {
+ // transactions get the connection to themselves
+ if (conn->trans)
+ return 1;
+
+ // if it has a query assigned, it's busy
+ if (conn->query)
+ return 1;
+
+ // otherwise, it's all idle
+ return 0;
+}
+
+/*
+ * Checks if the connection is ready for use (i.e. _evsql_evpq_connected was called).
+ *
+ * The connection should not already have a query running.
+ *
+ * Returns
+ * <0 the connection is not valid (failed, query in progress)
+ * 0 the connection is still pending, and will become ready at some point
+ * >0 it's ready
+ */
+static int _evsql_conn_ready (struct evsql_conn *conn) {
+ switch (conn->evsql->type) {
case EVSQL_EVPQ: {
- enum evpq_state state = evpq_state(evsql->engine.evpq);
+ enum evpq_state state = evpq_state(conn->engine.evpq);
switch (state) {
case EVPQ_CONNECT:
- case EVPQ_QUERY:
- return 1;
+ return 0;
case EVPQ_CONNECTED:
- return 0;
+ return 1;
+ case EVPQ_QUERY:
case EVPQ_INIT:
case EVPQ_FAILURE:
return -1;
default:
- FATAL("evpq_state");
+ FATAL("evpq_state: %d", state);
}
}
default:
- FATAL("evsql->type");
+ FATAL("evsql->type: %d", conn->evsql->type);
}
}
-static struct evsql_query *_evsql_query_new (struct evsql *evsql, evsql_query_cb query_fn, void *cb_arg) {
+/*
+ * Allocate a connection for use and return it via *conn_ptr, or if may_queue is nonzero and the connection pool is
+ * getting full, return NULL (query should be queued).
+ *
+ * Note that the returned connection might not be ready for use yet (if we created a new one, see _evsql_conn_ready).
+ *
+ * Returns zero if a connection was found or the request should be queued, or nonzero if something failed and the
+ * request should be dropped.
+ */
+static int _evsql_conn_get (struct evsql *evsql, struct evsql_conn **conn_ptr, int may_queue) {
+ *conn_ptr = NULL;
+
+ // find a connection that isn't busy and is ready (unless the query queue is empty).
+ LIST_FOREACH(*conn_ptr, &evsql->conn_list, entry) {
+ // skip busy conns always
+ if (_evsql_conn_busy(*conn_ptr))
+ continue;
+
+ // accept pending conns as long as there are NO enqueued queries (might cause deadlock otherwise)
+ if (_evsql_conn_ready(*conn_ptr) == 0 && TAILQ_EMPTY(&evsql->query_queue))
+ break;
+
+ // accept conns that are in a fully ready state
+ if (_evsql_conn_ready(*conn_ptr) > 0)
+ break;
+ }
+
+ // if we found an idle connection, we can just return that right away
+ if (*conn_ptr)
+ return 0;
+
+ // return NULL if may_queue and the conn list is not empty
+ if (may_queue && !LIST_EMPTY(&evsql->conn_list))
+ return 0;
+
+ // we need to open a new connection
+ if ((*conn_ptr = _evsql_conn_new(evsql)) == NULL)
+ goto error;
+
+ // good
+ return 0;
+error:
+ return -1;
+}
+
+/*
+ * Validate and allocate the basic stuff for a new query.
+ */
+static struct evsql_query *_evsql_query_new (struct evsql *evsql, struct evsql_trans *trans, evsql_query_cb query_fn, void *cb_arg) {
struct evsql_query *query;
+ // if it's part of a trans, then make sure the trans is idle
+ if (trans && trans->query)
+ ERROR("transaction is busy");
+
// allocate it
if ((query = calloc(1, sizeof(*query))) == NULL)
ERROR("calloc");
// store
- query->evsql = evsql;
query->cb_fn = query_fn;
query->cb_arg = cb_arg;
@@ -362,29 +644,48 @@
return NULL;
}
-static int _evsql_query_enqueue (struct evsql *evsql, struct evsql_query *query, const char *command) {
- int busy;
-
- // check state
- if ((busy = _evsql_query_busy(evsql)) < 0)
- ERROR("connection is not valid");
-
- if (busy) {
- // copy the command for later execution
- if ((query->command = strdup(command)) == NULL)
- ERROR("strdup");
+/*
+ * Handle a new query.
+ *
+ * For transactions this will associate the query and then execute it, otherwise this will either find an idle
+ * connection and send the query, or enqueue it.
+ */
+static int _evsql_query_enqueue (struct evsql *evsql, struct evsql_trans *trans, struct evsql_query *query, const char *command) {
+ // transaction queries are handled differently
+ if (trans) {
+ // it's an in-transaction query
+ assert(trans->query == NULL);
+
+ // assign the query
+ trans->query = query;
+
+ // execute directly
+ if (_evsql_query_exec(trans->conn, query, command))
+ goto error;
} else {
- assert(TAILQ_EMPTY(&evsql->queue));
+ struct evsql_conn *conn;
+
+ // find an idle connection
+ if ((_evsql_conn_get(evsql, &conn, 1)))
+ ERROR("couldn't allocate a connection for the query");
- // execute directly
- if (_evsql_query_exec(evsql, query, command))
- goto error;
+ // we must enqueue if no idle conn or the conn is not yet ready
+ if (conn && _evsql_conn_ready(conn) > 0) {
+ // execute directly
+ if (_evsql_query_exec(conn, query, command))
+ goto error;
+
+ } else {
+ // copy the command for later execution
+ if ((query->command = strdup(command)) == NULL)
+ ERROR("strdup");
+
+ // enqueue until some connection pumps the queue
+ TAILQ_INSERT_TAIL(&evsql->query_queue, query, entry);
+ }
}
-
- // store it on the list
- TAILQ_INSERT_TAIL(&evsql->queue, query, entry);
// ok, good
return 0;
@@ -393,15 +694,15 @@
return -1;
}
-struct evsql_query *evsql_query (struct evsql *evsql, const char *command, evsql_query_cb query_fn, void *cb_arg) {
+struct evsql_query *evsql_query (struct evsql *evsql, struct evsql_trans *trans, const char *command, evsql_query_cb query_fn, void *cb_arg) {
struct evsql_query *query = NULL;
// alloc new query
- if ((query = _evsql_query_new(evsql, query_fn, cb_arg)) == NULL)
+ if ((query = _evsql_query_new(evsql, trans, query_fn, cb_arg)) == NULL)
goto error;
// just execute the command string directly
- if (_evsql_query_enqueue(evsql, query, command))
+ if (_evsql_query_enqueue(evsql, trans, query, command))
goto error;
// ok
@@ -413,13 +714,13 @@
return NULL;
}
-struct evsql_query *evsql_query_params (struct evsql *evsql, const char *command, const struct evsql_query_params *params, evsql_query_cb query_fn, void *cb_arg) {
+struct evsql_query *evsql_query_params (struct evsql *evsql, struct evsql_trans *trans, const char *command, const struct evsql_query_params *params, evsql_query_cb query_fn, void *cb_arg) {
struct evsql_query *query = NULL;
const struct evsql_query_param *param;
int idx;
// alloc new query
- if ((query = _evsql_query_new(evsql, query_fn, cb_arg)) == NULL)
+ if ((query = _evsql_query_new(evsql, trans, query_fn, cb_arg)) == NULL)
goto error;
// count the params
@@ -464,7 +765,7 @@
}
// execute it
- if (_evsql_query_enqueue(evsql, query, command))
+ if (_evsql_query_enqueue(evsql, trans, query, command))
goto error;
// ok
@@ -476,166 +777,3 @@
return NULL;
}
-int evsql_param_string (struct evsql_query_params *params, size_t param, const char *ptr) {
- struct evsql_query_param *p = ¶ms->list[param];
-
- assert(p->type == EVSQL_PARAM_STRING);
-
- p->data_raw = ptr;
- p->length = 0;
-
- return 0;
-}
-
-int evsql_param_uint32 (struct evsql_query_params *params, size_t param, uint32_t uval) {
- struct evsql_query_param *p = ¶ms->list[param];
-
- assert(p->type == EVSQL_PARAM_UINT32);
-
- p->data.uint32 = htonl(uval);
- p->data_raw = (const char *) &p->data.uint32;
- p->length = sizeof(uval);
-
- return 0;
-}
-
-const char *evsql_result_error (const struct evsql_result_info *res) {
- if (!res->error)
- return "No error";
-
- switch (res->evsql->type) {
- case EVSQL_EVPQ:
- if (!res->result.pq)
- return "unknown error (no result)";
-
- return PQresultErrorMessage(res->result.pq);
-
- default:
- FATAL("res->evsql->type");
- }
-
-}
-
-size_t evsql_result_rows (const struct evsql_result_info *res) {
- switch (res->evsql->type) {
- case EVSQL_EVPQ:
- return PQntuples(res->result.pq);
-
- default:
- FATAL("res->evsql->type");
- }
-}
-
-size_t evsql_result_cols (const struct evsql_result_info *res) {
- switch (res->evsql->type) {
- case EVSQL_EVPQ:
- return PQnfields(res->result.pq);
-
- default:
- FATAL("res->evsql->type");
- }
-}
-
-int evsql_result_binary (const struct evsql_result_info *res, size_t row, size_t col, const char **ptr, size_t size, int nullok) {
- *ptr = NULL;
-
- switch (res->evsql->type) {
- case EVSQL_EVPQ:
- if (PQgetisnull(res->result.pq, row, col)) {
- if (nullok)
- return 0;
- else
- ERROR("[%zu:%zu] field is null", row, col);
- }
-
- if (PQfformat(res->result.pq, col) != 1)
- ERROR("[%zu:%zu] PQfformat is not binary: %d", row, col, PQfformat(res->result.pq, col));
-
- if (size && PQgetlength(res->result.pq, row, col) != size)
- ERROR("[%zu:%zu] field size mismatch: %zu -> %d", row, col, size, PQgetlength(res->result.pq, row, col));
-
- *ptr = PQgetvalue(res->result.pq, row, col);
-
- return 0;
-
- default:
- FATAL("res->evsql->type");
- }
-
-error:
- return -1;
-}
-
-int evsql_result_string (const struct evsql_result_info *res, size_t row, size_t col, const char **ptr, int nullok) {
- return evsql_result_binary(res, row, col, ptr, 0, nullok);
-}
-
-int evsql_result_uint16 (const struct evsql_result_info *res, size_t row, size_t col, uint16_t *uval, int nullok) {
- const char *data;
- int16_t sval;
-
- if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok))
- goto error;
-
- sval = ntohs(*((int16_t *) data));
-
- if (sval < 0)
- ERROR("negative value for unsigned: %d", sval);
-
- *uval = sval;
-
- return 0;
-
-error:
- return nullok ? 0 : -1;
-}
-
-int evsql_result_uint32 (const struct evsql_result_info *res, size_t row, size_t col, uint32_t *uval, int nullok) {
- const char *data;
- int32_t sval;
-
- if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok))
- goto error;
-
- sval = ntohl(*(int32_t *) data);
-
- if (sval < 0)
- ERROR("negative value for unsigned: %d", sval);
-
- *uval = sval;
-
- return 0;
-
-error:
- return nullok ? 0 : -1;
-}
-
-int evsql_result_uint64 (const struct evsql_result_info *res, size_t row, size_t col, uint64_t *uval, int nullok) {
- const char *data;
- int64_t sval;
-
- if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok))
- goto error;
-
- sval = ntohq(*(int64_t *) data);
-
- if (sval < 0)
- ERROR("negative value for unsigned: %ld", sval);
-
- *uval = sval;
-
- return 0;
-
-error:
- return nullok ? 0 : -1;
-}
-
-void evsql_result_free (const struct evsql_result_info *res) {
- switch (res->evsql->type) {
- case EVSQL_EVPQ:
- return PQclear(res->result.pq);
-
- default:
- FATAL("res->evsql->type");
- }
-}