src/evsql.c
changeset 26 61668c57f4bb
parent 25 99a41f48e29b
child 27 461be4cd34a3
equal deleted inserted replaced
25:99a41f48e29b 26:61668c57f4bb
     8 #include "evpq.h"
     8 #include "evpq.h"
     9 #include "lib/log.h"
     9 #include "lib/log.h"
    10 #include "lib/error.h"
    10 #include "lib/error.h"
    11 #include "lib/misc.h"
    11 #include "lib/misc.h"
    12 
    12 
       
    13 /*
       
    14  * A couple function prototypes
       
    15  */ 
       
    16 static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn);
    13 
    17 
    14 /*
    18 /*
    15  * Actually execute the given query.
    19  * Actually execute the given query.
    16  *
    20  *
    17  * The backend should be able to accept the query at this time.
    21  * The backend should be able to accept the query at this time.
   139             FATAL("evsql->type");
   143             FATAL("evsql->type");
   140     }
   144     }
   141     
   145     
   142     // remove from list
   146     // remove from list
   143     LIST_REMOVE(conn, entry);
   147     LIST_REMOVE(conn, entry);
       
   148     
       
   149     // catch deadlocks
       
   150     assert(!LIST_EMPTY(&conn->evsql->conn_list) || TAILQ_EMPTY(&conn->evsql->query_queue));
   144 
   151 
   145     // free
   152     // free
   146     free(conn);
   153     free(conn);
       
   154 }
       
   155 
       
   156 /*
       
   157  * Release a transaction, it should already be deassociated from the query.
       
   158  *
       
   159  * Perform a two-way-deassociation with the conn, and then free the trans.
       
   160  */
       
   161 static void _evsql_trans_release (struct evsql_trans *trans) {
       
   162     assert(trans->query == NULL);
       
   163     assert(trans->conn != NULL);
       
   164 
       
   165     // deassociate the conn
       
   166     trans->conn->trans = NULL; trans->conn = NULL;
       
   167 
       
   168     // free the trans
       
   169     _evsql_trans_free(trans);
   147 }
   170 }
   148 
   171 
   149 /*
   172 /*
   150  * Fail a single query, this will trigger the callback and free it.
   173  * Fail a single query, this will trigger the callback and free it.
   151  */
   174  */
   162 
   185 
   163 /*
   186 /*
   164  * Fail a transaction, this will silently drop any query, trigger the error callback, two-way-deassociate/release the
   187  * Fail a transaction, this will silently drop any query, trigger the error callback, two-way-deassociate/release the
   165  * conn, and then free the trans.
   188  * conn, and then free the trans.
   166  */ 
   189  */ 
   167 static void _evsql_trans_fail (struct evsql_trans *trans) {
   190 static void _evsql_trans_fail (struct evsql_trans *trans, int silent) {
   168     if (trans->query) {
   191     if (trans->query) {
   169         // free the query silently
   192         // free the query silently
   170         _evsql_query_free(trans->query); trans->query = NULL;
   193         _evsql_query_free(trans->query); trans->query = NULL;
   171     }
   194     }
   172 
   195 
   173     // tell the user
   196     // tell the user
   174     // XXX: trans is in a bad state during this call
   197     // XXX: trans is in a bad state during this call
   175     trans->error_fn(trans, trans->cb_arg);
   198     if (!silent)
   176 
   199         trans->error_fn(trans, trans->cb_arg);
   177     // fail the conn
   200  
       
   201     // deassociate and release the conn
   178     trans->conn->trans = NULL; _evsql_conn_release(trans->conn); trans->conn = NULL;
   202     trans->conn->trans = NULL; _evsql_conn_release(trans->conn); trans->conn = NULL;
       
   203 
       
   204     // pump the queue for requests that were waiting for this connection
       
   205     _evsql_pump(trans->evsql, NULL);
   179 
   206 
   180     // free the trans
   207     // free the trans
   181     _evsql_trans_free(trans);
   208     _evsql_trans_free(trans);
   182 }
   209 }
   183 
   210 
   186  * fail any ongoing query, and then release the connection.
   213  * fail any ongoing query, and then release the connection.
   187  */
   214  */
   188 static void _evsql_conn_fail (struct evsql_conn *conn) {
   215 static void _evsql_conn_fail (struct evsql_conn *conn) {
   189     if (conn->trans) {
   216     if (conn->trans) {
   190         // let transactions handle their connection failures
   217         // let transactions handle their connection failures
   191         _evsql_trans_fail(conn->trans);
   218         _evsql_trans_fail(conn->trans, 0);
   192 
   219 
   193     } else {
   220     } else {
   194         if (conn->query) {
   221         if (conn->query) {
   195             // fail the in-progress query
   222             // fail the in-progress query
   196             _evsql_query_fail(conn->evsql, conn->query); conn->query = NULL;
   223             _evsql_query_fail(conn->evsql, conn->query); conn->query = NULL;
   205  * Processes enqueued non-transactional queries until the queue is empty, or we managed to exec a query.
   232  * Processes enqueued non-transactional queries until the queue is empty, or we managed to exec a query.
   206  *
   233  *
   207  * If execing a query on a connection fails, both the query and the connection are failed (in that order).
   234  * If execing a query on a connection fails, both the query and the connection are failed (in that order).
   208  *
   235  *
   209  * Any further queries will then also be failed, because there's no reconnection/retry logic yet.
   236  * Any further queries will then also be failed, because there's no reconnection/retry logic yet.
       
   237  *
       
   238  * This means that if conn is NULL, all queries are failed.
   210  */
   239  */
   211 static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn) {
   240 static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn) {
   212     struct evsql_query *query;
   241     struct evsql_query *query;
   213     int err;
   242     int err;
   214     
   243     
   266     if (res->error)
   295     if (res->error)
   267         ERROR("transaction 'BEGIN' failed: %s", evsql_result_error(res));
   296         ERROR("transaction 'BEGIN' failed: %s", evsql_result_error(res));
   268     
   297     
   269     // transaction is now ready for use
   298     // transaction is now ready for use
   270     res->trans->ready_fn(res->trans, res->trans->cb_arg);
   299     res->trans->ready_fn(res->trans, res->trans->cb_arg);
   271 
   300     
   272 error:
   301     // good
   273     _evsql_trans_fail(res->trans);
   302     return;
       
   303 
       
   304 error:
       
   305     _evsql_trans_fail(res->trans, 0);
   274 }
   306 }
   275 
   307 
   276 /*
   308 /*
   277  * The transaction's connection is ready, send the 'BEGIN' query.
   309  * The transaction's connection is ready, send the 'BEGIN' query.
   278  */
   310  */
   319     // success
   351     // success
   320     return;
   352     return;
   321 
   353 
   322 error:
   354 error:
   323     // fail the transaction
   355     // fail the transaction
   324     _evsql_trans_fail(trans);
   356     _evsql_trans_fail(trans, 0);
   325 }
   357 }
   326 
   358 
   327 /*
   359 /*
    327  * The evpq connection was successfully established.
    359  * The evpq connection was successfully established.
   329  */ 
   361  */ 
   617     return 0;
   649     return 0;
   618 error:
   650 error:
   619     return -1;
   651     return -1;
   620 }
   652 }
   621 
   653 
       
   654 struct evsql_trans *evsql_trans (struct evsql *evsql, enum evsql_trans_type type, evsql_trans_error_cb error_fn, evsql_trans_ready_cb ready_fn, evsql_trans_done_cb done_fn, void *cb_arg) {
       
   655     struct evsql_trans *trans = NULL;
       
   656 
       
   657     // allocate it
       
   658     if ((trans = calloc(1, sizeof(*trans))) == NULL)
       
   659         ERROR("calloc");
       
   660 
       
   661     // store
       
   662     trans->evsql = evsql;
       
   663     trans->error_fn = error_fn;
       
   664     trans->ready_fn = ready_fn;
       
   665     trans->done_fn = done_fn;
       
   666     trans->cb_arg = cb_arg;
       
   667     trans->type = type;
       
   668 
       
   669     // find a connection
       
   670     if (_evsql_conn_get(evsql, &trans->conn, 0))
       
   671         ERROR("_evsql_conn_get");
       
   672 
       
   673     // associate the conn
       
   674     trans->conn->trans = trans;
       
   675 
       
   676     // is it already ready?
       
   677     if (_evsql_conn_ready(trans->conn) > 0) {
       
   678         // call _evsql_trans_conn_ready directly
       
   679         _evsql_trans_conn_ready(evsql, trans);
       
   680 
       
   681     } else {
       
   682         // otherwise, wait for the conn to be ready
       
   683          
       
   684     }
       
   685 
       
   686     // ok
       
   687     return trans;
       
   688 
       
   689 error:
       
   690     free(trans);
       
   691 
       
   692     return NULL;
       
   693 }
       
   694 
   622 /*
   695 /*
   623  * Validate and allocate the basic stuff for a new query.
   696  * Validate and allocate the basic stuff for a new query.
   624  */
   697  */
   625 static struct evsql_query *_evsql_query_new (struct evsql *evsql, struct evsql_trans *trans, evsql_query_cb query_fn, void *cb_arg) {
   698 static struct evsql_query *_evsql_query_new (struct evsql *evsql, struct evsql_trans *trans, evsql_query_cb query_fn, void *cb_arg) {
   626     struct evsql_query *query;
   699     struct evsql_query *query = NULL;
   627     
   700     
   628     // if it's part of a trans, then make sure the trans is idle
   701     // if it's part of a trans, then make sure the trans is idle
   629     if (trans && trans->query)
   702     if (trans && trans->query)
   630         ERROR("transaction is busy");
   703         ERROR("transaction is busy");
   631 
   704 
   658         
   731         
   659         // assign the query
   732         // assign the query
   660         trans->query = query;
   733         trans->query = query;
   661 
   734 
   662         // execute directly
   735         // execute directly
   663         if (_evsql_query_exec(trans->conn, query, command))
   736         if (_evsql_query_exec(trans->conn, query, command)) {
       
   737             // ack, fail the connection
       
   738             _evsql_conn_fail(trans->conn);
       
   739             
       
   740             // caller frees query
   664             goto error;
   741             goto error;
       
   742         }
   665 
   743 
   666     } else {
   744     } else {
   667         struct evsql_conn *conn;
   745         struct evsql_conn *conn;
   668         
   746         
   669         // find an idle connection
   747         // find an idle connection
   671             ERROR("couldn't allocate a connection for the query");
   749             ERROR("couldn't allocate a connection for the query");
   672 
   750 
   673         // we must enqueue if no idle conn or the conn is not yet ready
   751         // we must enqueue if no idle conn or the conn is not yet ready
   674         if (conn && _evsql_conn_ready(conn) > 0) {
   752         if (conn && _evsql_conn_ready(conn) > 0) {
   675             // execute directly
   753             // execute directly
   676             if (_evsql_query_exec(conn, query, command))
   754             if (_evsql_query_exec(conn, query, command)) {
       
   755                 // ack, fail the connection
       
   756                 _evsql_conn_fail(conn);
       
   757                 
       
   758                 // make sure we don't deadlock any queries, but if this query got a conn directly, then we shouldn't
       
   759                 // have any queries enqueued anyways
       
   760                 assert(TAILQ_EMPTY(&evsql->query_queue));
       
   761                 
       
   762                 // caller frees query
   677                 goto error;
   763                 goto error;
   678 
   764             }
   679 
   765 
   680         } else {
   766         } else {
   681             // copy the command for later execution
   767             // copy the command for later execution
   682             if ((query->command = strdup(command)) == NULL)
   768             if ((query->command = strdup(command)) == NULL)
   683                 ERROR("strdup");
   769                 ERROR("strdup");
   775     _evsql_query_free(query);
   861     _evsql_query_free(query);
   776     
   862     
   777     return NULL;
   863     return NULL;
   778 }
   864 }
   779 
   865 
       
   866 void _evsql_trans_commit_res (const struct evsql_result_info *res, void *arg) {
       
   867     (void) arg;
       
   868 
       
   869     assert(res->trans);
       
   870 
       
   871     // check for errors
       
   872     if (res->error)
       
   873         ERROR("transaction 'COMMIT' failed: %s", evsql_result_error(res));
       
   874     
       
   875     // transaction is now done
       
   876     res->trans->done_fn(res->trans, res->trans->cb_arg);
       
   877     
       
   878     // release it
       
   879     _evsql_trans_release(res->trans);
       
   880 
       
   881     // success
       
   882     return;
       
   883 
       
   884 error:
       
   885     _evsql_trans_fail(res->trans, 0);
       
   886 }
       
   887 
       
   888 int evsql_trans_commit (struct evsql_trans *trans) {
       
   889     static const char *sql = "COMMIT TRANSACTION";
       
   890     
       
   891     // query
       
   892     return (evsql_query(trans->evsql, trans, sql, _evsql_trans_commit_res, NULL) != NULL);
       
   893 }
       
   894 
       
   895 void _evsql_trans_rollback_res (const struct evsql_result_info *res, void *arg) {
       
   896     (void) arg;
       
   897 
       
   898     assert(res->trans);
       
   899 
       
   900     // fail the connection on errors
       
   901     if (res->error)
       
   902         ERROR("transaction 'ROLLBACK' failed: %s", evsql_result_error(res));
       
   903 
       
   904     // release it
       
   905     _evsql_trans_release(res->trans);
       
   906 
       
   907     // success
       
   908     return;
       
   909 
       
   910 error:
       
   911     // but do it silently
       
   912     _evsql_trans_fail(res->trans, 1);
       
   913 }
       
   914 
       
   915 int evsql_trans_abort (struct evsql_trans *trans) {
       
   916     static const char *sql = "ROLLBACK TRANSACTION";
       
   917 
       
   918     // query
       
   919     return (evsql_query(trans->evsql, trans, sql, _evsql_trans_rollback_res, NULL) != NULL);
       
   920 }
       
   921