src/evsql.c
changeset 26 61668c57f4bb
parent 25 99a41f48e29b
child 27 461be4cd34a3
--- a/src/evsql.c	Sun Oct 12 20:10:47 2008 +0300
+++ b/src/evsql.c	Sun Oct 12 21:59:52 2008 +0300
@@ -10,6 +10,10 @@
 #include "lib/error.h"
 #include "lib/misc.h"
 
+/*
+ * A couple function prototypes
+ */ 
+static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn);
 
 /*
  * Actually execute the given query.
@@ -141,12 +145,31 @@
     
     // remove from list
     LIST_REMOVE(conn, entry);
+    
+    // catch deadlocks
+    assert(!LIST_EMPTY(&conn->evsql->conn_list) || TAILQ_EMPTY(&conn->evsql->query_queue));
 
     // free
     free(conn);
 }
 
 /*
+ * Release a transaction, it should already be deassociated from the query.
+ *
+ * Perform a two-way-deassociation with the conn, and then free the trans.
+ */
+static void _evsql_trans_release (struct evsql_trans *trans) {
+    assert(trans->query == NULL);
+    assert(trans->conn != NULL);
+
+    // deassociate the conn
+    trans->conn->trans = NULL; trans->conn = NULL;
+
+    // free the trans
+    _evsql_trans_free(trans);
+}
+
+/*
  * Fail a single query, this will trigger the callback and free it.
  */
 static void _evsql_query_fail (struct evsql* evsql, struct evsql_query *query) {
@@ -164,7 +187,7 @@
  * Fail a transaction, this will silently drop any query, trigger the error callback, two-way-deassociate/release the
  * conn, and then free the trans.
  */ 
-static void _evsql_trans_fail (struct evsql_trans *trans) {
+static void _evsql_trans_fail (struct evsql_trans *trans, int silent) {
     if (trans->query) {
         // free the query silently
         _evsql_query_free(trans->query); trans->query = NULL;
@@ -172,10 +195,14 @@
 
     // tell the user
     // XXX: trans is in a bad state during this call
-    trans->error_fn(trans, trans->cb_arg);
+    if (!silent)
+        trans->error_fn(trans, trans->cb_arg);
+ 
+    // deassociate and release the conn
+    trans->conn->trans = NULL; _evsql_conn_release(trans->conn); trans->conn = NULL;
 
-    // fail the conn
-    trans->conn->trans = NULL; _evsql_conn_release(trans->conn); trans->conn = NULL;
+    // pump the queue for requests that were waiting for this connection
+    _evsql_pump(trans->evsql, NULL);
 
     // free the trans
     _evsql_trans_free(trans);
@@ -188,7 +215,7 @@
 static void _evsql_conn_fail (struct evsql_conn *conn) {
     if (conn->trans) {
         // let transactions handle their connection failures
-        _evsql_trans_fail(conn->trans);
+        _evsql_trans_fail(conn->trans, 0);
 
     } else {
         if (conn->query) {
@@ -207,6 +234,8 @@
  * If execing a query on a connection fails, both the query and the connection are failed (in that order).
  *
  * Any further queries will then also be failed, because there's no reconnection/retry logic yet.
+ *
+ * This means that if conn is NULL, all queries are failed.
  */
 static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn) {
     struct evsql_query *query;
@@ -268,9 +297,12 @@
     
     // transaction is now ready for use
     res->trans->ready_fn(res->trans, res->trans->cb_arg);
+    
+    // good
+    return;
 
 error:
-    _evsql_trans_fail(res->trans);
+    _evsql_trans_fail(res->trans, 0);
 }
 
 /*
@@ -321,7 +353,7 @@
 
 error:
     // fail the transaction
-    _evsql_trans_fail(trans);
+    _evsql_trans_fail(trans, 0);
 }
 
 /*
@@ -619,11 +651,52 @@
     return -1;
 }
 
+struct evsql_trans *evsql_trans (struct evsql *evsql, enum evsql_trans_type type, evsql_trans_error_cb error_fn, evsql_trans_ready_cb ready_fn, evsql_trans_done_cb done_fn, void *cb_arg) {
+    struct evsql_trans *trans = NULL;
+
+    // allocate it
+    if ((trans = calloc(1, sizeof(*trans))) == NULL)
+        ERROR("calloc");
+
+    // store
+    trans->evsql = evsql;
+    trans->error_fn = error_fn;
+    trans->ready_fn = ready_fn;
+    trans->done_fn = done_fn;
+    trans->cb_arg = cb_arg;
+    trans->type = type;
+
+    // find a connection
+    if (_evsql_conn_get(evsql, &trans->conn, 0))
+        ERROR("_evsql_conn_get");
+
+    // associate the conn
+    trans->conn->trans = trans;
+
+    // is it already ready?
+    if (_evsql_conn_ready(trans->conn) > 0) {
+        // call _evsql_trans_conn_ready directly
+        _evsql_trans_conn_ready(evsql, trans);
+
+    } else {
+        // otherwise, wait for the conn to be ready
+         
+    }
+
+    // ok
+    return trans;
+
+error:
+    free(trans);
+
+    return NULL;
+}
+
 /*
  * Validate and allocate the basic stuff for a new query.
  */
 static struct evsql_query *_evsql_query_new (struct evsql *evsql, struct evsql_trans *trans, evsql_query_cb query_fn, void *cb_arg) {
-    struct evsql_query *query;
+    struct evsql_query *query = NULL;
     
     // if it's part of a trans, then make sure the trans is idle
     if (trans && trans->query)
@@ -660,8 +733,13 @@
         trans->query = query;
 
         // execute directly
-        if (_evsql_query_exec(trans->conn, query, command))
+        if (_evsql_query_exec(trans->conn, query, command)) {
+            // ack, fail the connection
+            _evsql_conn_fail(trans->conn);
+            
+            // caller frees query
             goto error;
+        }
 
     } else {
         struct evsql_conn *conn;
@@ -673,9 +751,17 @@
         // we must enqueue if no idle conn or the conn is not yet ready
         if (conn && _evsql_conn_ready(conn) > 0) {
             // execute directly
-            if (_evsql_query_exec(conn, query, command))
+            if (_evsql_query_exec(conn, query, command)) {
+                // ack, fail the connection
+                _evsql_conn_fail(conn);
+                
+                // make sure we don't deadlock any queries, but if this query got a conn directly, then we shouldn't
+                // have any queries enqueued anyways
+                assert(TAILQ_EMPTY(&evsql->query_queue));
+                
+                // caller frees query
                 goto error;
-
+            }
 
         } else {
             // copy the command for later execution
@@ -777,3 +863,59 @@
     return NULL;
 }
 
+void _evsql_trans_commit_res (const struct evsql_result_info *res, void *arg) {
+    (void) arg;
+
+    assert(res->trans);
+
+    // check for errors
+    if (res->error)
+        ERROR("transaction 'COMMIT' failed: %s", evsql_result_error(res));
+    
+    // transaction is now done
+    res->trans->done_fn(res->trans, res->trans->cb_arg);
+    
+    // release it
+    _evsql_trans_release(res->trans);
+
+    // success
+    return;
+
+error:
+    _evsql_trans_fail(res->trans, 0);
+}
+
+int evsql_trans_commit (struct evsql_trans *trans) {
+    static const char *sql = "COMMIT TRANSACTION";
+    
+    // query
+    return (evsql_query(trans->evsql, trans, sql, _evsql_trans_commit_res, NULL) != NULL);
+}
+
+void _evsql_trans_rollback_res (const struct evsql_result_info *res, void *arg) {
+    (void) arg;
+
+    assert(res->trans);
+
+    // fail the connection on errors
+    if (res->error)
+        ERROR("transaction 'ROLLBACK' failed: %s", evsql_result_error(res));
+
+    // release it
+    _evsql_trans_release(res->trans);
+
+    // success
+    return;
+
+error:
+    // but do it silently
+    _evsql_trans_fail(res->trans, 1);
+}
+
+int evsql_trans_abort (struct evsql_trans *trans) {
+    static const char *sql = "ROLLBACK TRANSACTION";
+
+    // query
+    return (evsql_query(trans->evsql, trans, sql, _evsql_trans_rollback_res, NULL) != NULL);
+}
+