--- a/src/evsql.c Sun Oct 12 20:10:47 2008 +0300
+++ b/src/evsql.c Sun Oct 12 21:59:52 2008 +0300
@@ -10,6 +10,10 @@
#include "lib/error.h"
#include "lib/misc.h"
+/*
+ * A couple function prototypes
+ */
+static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn);
/*
* Actually execute the given query.
@@ -141,12 +145,31 @@
// remove from list
LIST_REMOVE(conn, entry);
+
+ // catch deadlocks
+ assert(!LIST_EMPTY(&conn->evsql->conn_list) || TAILQ_EMPTY(&conn->evsql->query_queue));
// free
free(conn);
}
/*
+ * Release a transaction, it should already be deassociated from the query.
+ *
+ * Perform a two-way-deassociation with the conn, and then free the trans.
+ */
+static void _evsql_trans_release (struct evsql_trans *trans) {
+ assert(trans->query == NULL);
+ assert(trans->conn != NULL);
+
+ // deassociate the conn
+ trans->conn->trans = NULL; trans->conn = NULL;
+
+ // free the trans
+ _evsql_trans_free(trans);
+}
+
+/*
* Fail a single query, this will trigger the callback and free it.
*/
static void _evsql_query_fail (struct evsql* evsql, struct evsql_query *query) {
@@ -164,7 +187,7 @@
* Fail a transaction, this will silently drop any query, trigger the error callback, two-way-deassociate/release the
* conn, and then free the trans.
*/
-static void _evsql_trans_fail (struct evsql_trans *trans) {
+static void _evsql_trans_fail (struct evsql_trans *trans, int silent) {
if (trans->query) {
// free the query silently
_evsql_query_free(trans->query); trans->query = NULL;
@@ -172,10 +195,14 @@
// tell the user
// XXX: trans is in a bad state during this call
- trans->error_fn(trans, trans->cb_arg);
+ if (!silent)
+ trans->error_fn(trans, trans->cb_arg);
+
+ // deassociate and release the conn
+ trans->conn->trans = NULL; _evsql_conn_release(trans->conn); trans->conn = NULL;
- // fail the conn
- trans->conn->trans = NULL; _evsql_conn_release(trans->conn); trans->conn = NULL;
+ // pump the queue for requests that were waiting for this connection
+ _evsql_pump(trans->evsql, NULL);
// free the trans
_evsql_trans_free(trans);
@@ -188,7 +215,7 @@
static void _evsql_conn_fail (struct evsql_conn *conn) {
if (conn->trans) {
// let transactions handle their connection failures
- _evsql_trans_fail(conn->trans);
+ _evsql_trans_fail(conn->trans, 0);
} else {
if (conn->query) {
@@ -207,6 +234,8 @@
* If execing a query on a connection fails, both the query and the connection are failed (in that order).
*
* Any further queries will then also be failed, because there's no reconnection/retry logic yet.
+ *
+ * This means that if conn is NULL, all queries are failed.
*/
static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn) {
struct evsql_query *query;
@@ -268,9 +297,12 @@
// transaction is now ready for use
res->trans->ready_fn(res->trans, res->trans->cb_arg);
+
+ // good
+ return;
error:
- _evsql_trans_fail(res->trans);
+ _evsql_trans_fail(res->trans, 0);
}
/*
@@ -321,7 +353,7 @@
error:
// fail the transaction
- _evsql_trans_fail(trans);
+ _evsql_trans_fail(trans, 0);
}
/*
@@ -619,11 +651,52 @@
return -1;
}
+struct evsql_trans *evsql_trans (struct evsql *evsql, enum evsql_trans_type type, evsql_trans_error_cb error_fn, evsql_trans_ready_cb ready_fn, evsql_trans_done_cb done_fn, void *cb_arg) {
+ struct evsql_trans *trans = NULL;
+
+ // allocate it
+ if ((trans = calloc(1, sizeof(*trans))) == NULL)
+ ERROR("calloc");
+
+ // store
+ trans->evsql = evsql;
+ trans->error_fn = error_fn;
+ trans->ready_fn = ready_fn;
+ trans->done_fn = done_fn;
+ trans->cb_arg = cb_arg;
+ trans->type = type;
+
+ // find a connection
+ if (_evsql_conn_get(evsql, &trans->conn, 0))
+ ERROR("_evsql_conn_get");
+
+ // associate the conn
+ trans->conn->trans = trans;
+
+ // is it already ready?
+ if (_evsql_conn_ready(trans->conn) > 0) {
+ // call _evsql_trans_conn_ready directly
+ _evsql_trans_conn_ready(evsql, trans);
+
+ } else {
+ // otherwise, wait for the conn to be ready
+
+ }
+
+ // ok
+ return trans;
+
+error:
+ free(trans);
+
+ return NULL;
+}
+
/*
* Validate and allocate the basic stuff for a new query.
*/
static struct evsql_query *_evsql_query_new (struct evsql *evsql, struct evsql_trans *trans, evsql_query_cb query_fn, void *cb_arg) {
- struct evsql_query *query;
+ struct evsql_query *query = NULL;
// if it's part of a trans, then make sure the trans is idle
if (trans && trans->query)
@@ -660,8 +733,13 @@
trans->query = query;
// execute directly
- if (_evsql_query_exec(trans->conn, query, command))
+ if (_evsql_query_exec(trans->conn, query, command)) {
+ // ack, fail the connection
+ _evsql_conn_fail(trans->conn);
+
+ // caller frees query
goto error;
+ }
} else {
struct evsql_conn *conn;
@@ -673,9 +751,17 @@
// we must enqueue if no idle conn or the conn is not yet ready
if (conn && _evsql_conn_ready(conn) > 0) {
// execute directly
- if (_evsql_query_exec(conn, query, command))
+ if (_evsql_query_exec(conn, query, command)) {
+ // ack, fail the connection
+ _evsql_conn_fail(conn);
+
+ // make sure we don't deadlock any queries, but if this query got a conn directly, then we shouldn't
+ // have any queries enqueued anyways
+ assert(TAILQ_EMPTY(&evsql->query_queue));
+
+ // caller frees query
goto error;
-
+ }
} else {
// copy the command for later execution
@@ -777,3 +863,59 @@
return NULL;
}
+void _evsql_trans_commit_res (const struct evsql_result_info *res, void *arg) {
+ (void) arg;
+
+ assert(res->trans);
+
+ // check for errors
+ if (res->error)
+ ERROR("transaction 'COMMIT' failed: %s", evsql_result_error(res));
+
+ // transaction is now done
+ res->trans->done_fn(res->trans, res->trans->cb_arg);
+
+ // release it
+ _evsql_trans_release(res->trans);
+
+ // success
+ return;
+
+error:
+ _evsql_trans_fail(res->trans, 0);
+}
+
+int evsql_trans_commit (struct evsql_trans *trans) {
+ static const char *sql = "COMMIT TRANSACTION";
+
+ // query
+ return (evsql_query(trans->evsql, trans, sql, _evsql_trans_commit_res, NULL) != NULL);
+}
+
+void _evsql_trans_rollback_res (const struct evsql_result_info *res, void *arg) {
+ (void) arg;
+
+ assert(res->trans);
+
+ // fail the connection on errors
+ if (res->error)
+ ERROR("transaction 'ROLLBACK' failed: %s", evsql_result_error(res));
+
+ // release it
+ _evsql_trans_release(res->trans);
+
+ // success
+ return;
+
+error:
+ // but do it silently
+ _evsql_trans_fail(res->trans, 1);
+}
+
+int evsql_trans_abort (struct evsql_trans *trans) {
+ static const char *sql = "ROLLBACK TRANSACTION";
+
+ // query
+ return (evsql_query(trans->evsql, trans, sql, _evsql_trans_rollback_res, NULL) != NULL);
+}
+