src/evsql/evsql.c
branchnew-evsql
changeset 45 424ce5ab82fd
parent 44 9e76ee9729b6
child 47 8c6c459eacb7
equal deleted inserted replaced
44:9e76ee9729b6 45:424ce5ab82fd
    61         conn->query = query;
    61         conn->query = query;
    62 
    62 
    63     return err;
    63     return err;
    64 }
    64 }
    65 
    65 
    66 /*
    66 void _evsql_query_free (struct evsql_query *query) {
    67  * Free the query and related resources, doesn't trigger any callbacks or remove from any queues.
       
    68  *
       
    69  * The command should already be taken care of (NULL).
       
    70  */
       
    71 static void _evsql_query_free (struct evsql_query *query) {
       
    72     if (!query)
    67     if (!query)
    73         return;
    68         return;
    74         
    69         
    75     assert(query->command == NULL);
    70     assert(query->command == NULL);
    76     
    71     
    87 /*
    82 /*
    88  * Execute the callback if res is given, and free the query.
    83  * Execute the callback if res is given, and free the query.
    89  *
    84  *
    90  * The query has been aborted, it will simply be freed
    85  * The query has been aborted, it will simply be freed
    91  */
    86  */
    92 static void _evsql_query_done (struct evsql_query *query, const struct evsql_result_info *res) {
    87 static void _evsql_query_done (struct evsql_query *query, struct evsql_result *res) {
    93     if (res) {
    88     if (res) {
    94         if (query->cb_fn)
    89         if (query->cb_fn)
    95             // call the callback
    90             // call the callback
    96             query->cb_fn(res, query->cb_arg);
    91             query->cb_fn(res, query->cb_arg);
    97         else
    92         else {
    98             WARNING("supressing cb_fn because query was aborted");
    93             WARNING("supressing cb_fn because query was aborted");
       
    94             
       
    95             // free the results
       
    96             evsql_result_free(res);
       
    97         }
    99     }
    98     }
   100 
    99 
   101     // free
   100     // free
   102     _evsql_query_free(query);
   101     _evsql_query_free(query);
   103 }
   102 }
   104 
   103 
   105 /*
   104 /*
   106  * XXX:
   105  * XXX:
   107  * /
   106  * /
   108 static void _evsql_destroy (struct evsql *evsql, const struct evsql_result_info *res) {
   107 static void _evsql_destroy (struct evsql *evsql, const struct evsql_result *res) {
   109     struct evsql_query *query;
   108     struct evsql_query *query;
   110     
   109     
   111     // clear the queue
   110     // clear the queue
   112     while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) {
   111     while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) {
   113         _evsql_query_done(query, res);
   112         _evsql_query_done(query, res);
   182  * Fail a single query, this will trigger the callback and free it.
   181  * Fail a single query, this will trigger the callback and free it.
   183  *
   182  *
   184  * NOTE: Only for *TRANSACTIONLESS* queries.
   183  * NOTE: Only for *TRANSACTIONLESS* queries.
   185  */
   184  */
   186 static void _evsql_query_fail (struct evsql* evsql, struct evsql_query *query) {
   185 static void _evsql_query_fail (struct evsql* evsql, struct evsql_query *query) {
   187     struct evsql_result_info res; ZINIT(res);
   186     struct evsql_result res; ZINIT(res);
   188     
   187     
   189     // set up the result_info
   188     // set up the result_info
   190     res.evsql = evsql;
   189     res.evsql = evsql;
   191     res.trans = NULL;
       
   192     res.error = 1;
   190     res.error = 1;
   193     
   191     
   194     // finish off the query
   192     // finish off the query
   195     _evsql_query_done(query, &res);
   193     _evsql_query_done(query, &res);
   196 }
   194 }
   301 }
   299 }
   302 
   300 
   303 /*
   301 /*
   304  * Callback for a trans's 'BEGIN' query, which means the transaction is now ready for use.
   302  * Callback for a trans's 'BEGIN' query, which means the transaction is now ready for use.
   305  */
   303  */
   306 static void _evsql_trans_ready (const struct evsql_result_info *res, void *arg) {
   304 static void _evsql_trans_ready (struct evsql_result *res, void *arg) {
   307     (void) arg;
   305     struct evsql_trans *trans = arg;
   308 
   306 
   309     assert(res->trans);
   307     assert(trans != NULL);
   310 
   308 
   311     // check for errors
   309     // check for errors
   312     if (res->error)
   310     if (res->error)
   313         ERROR("transaction 'BEGIN' failed: %s", evsql_result_error(res));
   311         ERROR("transaction 'BEGIN' failed: %s", evsql_result_error(res));
   314     
   312     
   315     // transaction is now ready for use
   313     // transaction is now ready for use
   316     res->trans->ready_fn(res->trans, res->trans->cb_arg);
   314     trans->ready_fn(trans, trans->cb_arg);
   317     
   315     
   318     // good
   316     // good
   319     return;
   317     return;
   320 
   318 
   321 error:
   319 error:
   322     _evsql_trans_fail(res->trans);
   320     _evsql_trans_fail(trans);
   323 }
   321 }
   324 
   322 
   325 /*
   323 /*
   326  * The transaction's connection is ready, send the 'BEGIN' query.
   324  * The transaction's connection is ready, send the 'BEGIN' query.
   327  *
   325  *
   362     // make sure it wasn't truncated
   360     // make sure it wasn't truncated
   363     if (ret >= EVSQL_QUERY_BEGIN_BUF)
   361     if (ret >= EVSQL_QUERY_BEGIN_BUF)
   364         ERROR("trans_sql overflow: %d >= %d", ret, EVSQL_QUERY_BEGIN_BUF);
   362         ERROR("trans_sql overflow: %d >= %d", ret, EVSQL_QUERY_BEGIN_BUF);
   365     
   363     
   366     // execute the query
   364     // execute the query
   367     if (evsql_query(evsql, trans, trans_sql, _evsql_trans_ready, NULL) == NULL)
   365     if (evsql_query(evsql, trans, trans_sql, _evsql_trans_ready, trans) == NULL)
   368         ERROR("evsql_query");
   366         ERROR("evsql_query");
   369     
   367     
   370     // success
   368     // success
   371     return 0;
   369     return 0;
   372 
   370 
   401     struct evsql_query *query = conn->query;
   399     struct evsql_query *query = conn->query;
   402 
   400 
   403     assert(query != NULL);
   401     assert(query != NULL);
   404 
   402 
   405     // if we get multiple results, only return the first one
   403     // if we get multiple results, only return the first one
   406     if (query->result.evpq) {
   404     if (query->result.pq) {
   407         WARNING("[evsql] evpq query returned multiple results, discarding previous one");
   405         WARNING("[evsql] evpq query returned multiple results, discarding previous one");
   408         
   406         
   409         PQclear(query->result.evpq); query->result.evpq = NULL;
   407         PQclear(query->result.pq); query->result.pq = NULL;
   410     }
   408     }
   411     
   409     
   412     // remember the result
   410     // remember the result
   413     query->result.evpq = result;
   411     query->result.pq = result;
   414 }
   412 }
   415 
   413 
   416 /*
   414 /*
   417  * No more results for this query.
   415  * No more results for this query.
   418  */
   416  */
   419 static void _evsql_evpq_done (struct evpq_conn *_conn, void *arg) {
   417 static void _evsql_evpq_done (struct evpq_conn *_conn, void *arg) {
   420     struct evsql_conn *conn = arg;
   418     struct evsql_conn *conn = arg;
   421     struct evsql_query *query = conn->query;
   419     struct evsql_query *query = conn->query;
   422     struct evsql_result_info res; ZINIT(res);
   420     struct evsql_result res; ZINIT(res);
   423     
   421     
   424     assert(query != NULL);
   422     assert(query != NULL);
   425     
   423     
   426     // set up the result_info
   424     // set up the result_info
   427     res.evsql = conn->evsql;
   425     res.evsql = conn->evsql;
   428     res.trans = conn->trans;
   426     res.result = query->result;
   429     
   427     
   430     if (query->result.evpq == NULL) {
   428     if (query->result.pq == NULL) {
   431         // if a query didn't return any results (bug?), warn and fail the query
   429         // if a query didn't return any results (bug?), warn and fail the query
   432         WARNING("[evsql] evpq query didn't return any results");
   430         WARNING("[evsql] evpq query didn't return any results");
   433 
   431 
   434         res.error = 1;
   432         res.error = 1;
   435     
   433     
   436     } else if (strcmp(PQresultErrorMessage(query->result.evpq), "") != 0) {
   434     } else if (strcmp(PQresultErrorMessage(query->result.pq), "") != 0) {
   437         // the query failed with some error
   435         // the query failed with some error
   438         res.error = 1;
   436         res.error = 1;
   439         res.result.pq = query->result.evpq;
       
   440 
   437 
   441     } else {
   438     } else {
       
   439         // the query succeeded \o/
   442         res.error = 0;
   440         res.error = 0;
   443         res.result.pq = query->result.evpq;
       
   444 
   441 
   445     }
   442     }
   446 
   443 
   447     // de-associate the query from the connection
   444     // de-associate the query from the connection
   448     conn->query = NULL;
   445     conn->query = NULL;
   731 }
   728 }
   732 
   729 
   733 /*
   730 /*
   734  * Internal query functions
   731  * Internal query functions
   735  */
   732  */
   736 static struct evsql_query *_evsql_query_new (struct evsql *evsql, struct evsql_trans *trans, evsql_query_cb query_fn, void *cb_arg) {
   733 struct evsql_query *_evsql_query_new (struct evsql *evsql, struct evsql_trans *trans, evsql_query_cb query_fn, void *cb_arg) {
   737     struct evsql_query *query = NULL;
   734     struct evsql_query *query = NULL;
   738     
   735     
   739     // if it's part of a trans, then make sure the trans is idle
   736     // if it's part of a trans, then make sure the trans is idle
   740     if (trans && trans->query)
   737     if (trans && trans->query)
   741         ERROR("transaction is busy");
   738         ERROR("transaction is busy");
   753 
   750 
   754 error:
   751 error:
   755     return NULL;
   752     return NULL;
   756 }
   753 }
   757 
   754 
   758 static int _evsql_query_enqueue (struct evsql *evsql, struct evsql_trans *trans, struct evsql_query *query, const char *command) {
   755 int _evsql_query_enqueue (struct evsql *evsql, struct evsql_trans *trans, struct evsql_query *query, const char *command) {
   759     // transaction queries are handled differently
   756     // transaction queries are handled differently
   760     if (trans) {
   757     if (trans) {
   761         // it's an in-transaction query
   758         // it's an in-transaction query
   762         assert(trans->query == NULL);
   759         assert(trans->query == NULL);
   763         
   760         
   811 error:
   808 error:
   812     return -1;
   809     return -1;
   813 }
   810 }
   814 
   811 
   815 
   812 
   816 void _evsql_trans_commit_res (const struct evsql_result_info *res, void *arg) {
   813 void _evsql_trans_commit_res (struct evsql_result *res, void *arg) {
   817     (void) arg;
   814     struct evsql_trans *trans = arg;
   818 
       
   819     assert(res->trans);
       
   820 
   815 
   821     // check for errors
   816     // check for errors
   822     if (res->error)
   817     if (res->error)
   823         ERROR("transaction 'COMMIT' failed: %s", evsql_result_error(res));
   818         ERROR("transaction 'COMMIT' failed: %s", evsql_result_error(res));
   824     
   819     
   825     // transaction is now done
   820     // transaction is now done
   826     res->trans->done_fn(res->trans, res->trans->cb_arg);
   821     trans->done_fn(trans, trans->cb_arg);
   827     
   822     
   828     // release it
   823     // release it
   829     _evsql_trans_release(res->trans);
   824     _evsql_trans_release(trans);
   830 
   825 
   831     // success
   826     // success
   832     return;
   827     return;
   833 
   828 
   834 error:
   829 error:
   835     _evsql_trans_fail(res->trans);
   830     _evsql_trans_fail(trans);
   836 }
   831 }
   837 
   832 
   838 int evsql_trans_commit (struct evsql_trans *trans) {
   833 int evsql_trans_commit (struct evsql_trans *trans) {
   839     static const char *sql = "COMMIT TRANSACTION";
   834     static const char *sql = "COMMIT TRANSACTION";
   840 
   835 
   853 
   848 
   854 error:
   849 error:
   855     return -1;
   850     return -1;
   856 }
   851 }
   857 
   852 
   858 void _evsql_trans_rollback_res (const struct evsql_result_info *res, void *arg) {
   853 void _evsql_trans_rollback_res (struct evsql_result *res, void *arg) {
   859     (void) arg;
   854     struct evsql_trans *trans = arg;
   860 
       
   861     assert(res->trans);
       
   862 
   855 
   863     // fail the connection on errors
   856     // fail the connection on errors
   864     if (res->error)
   857     if (res->error)
   865         ERROR("transaction 'ROLLBACK' failed: %s", evsql_result_error(res));
   858         ERROR("transaction 'ROLLBACK' failed: %s", evsql_result_error(res));
   866 
   859 
   867     // release it
   860     // release it
   868     _evsql_trans_release(res->trans);
   861     _evsql_trans_release(trans);
   869 
   862 
   870     // success
   863     // success
   871     return;
   864     return;
   872 
   865 
   873 error:
   866 error:
   874     // fail the connection too, errors are supressed
   867     // fail the connection too, errors are supressed
   875     _evsql_trans_fail(res->trans);
   868     _evsql_trans_fail(trans);
   876 }
   869 }
   877 
   870 
   878 /*
   871 /*
   879  * Used as the ready_fn callback in case of abort, otherwise directly
   872  * Used as the ready_fn callback in case of abort, otherwise directly
   880  */
   873  */
   881 void _evsql_trans_rollback (struct evsql_trans *trans, void *unused) {
   874 void _evsql_trans_rollback (struct evsql_trans *trans, void *arg) {
   882     static const char *sql = "ROLLBACK TRANSACTION";
   875     static const char *sql = "ROLLBACK TRANSACTION";
   883 
   876 
   884     (void) unused;
   877     (void) arg;
   885 
   878 
   886     // query
   879     // query
   887     if (evsql_query(trans->evsql, trans, sql, _evsql_trans_rollback_res, NULL) == NULL) {
   880     if (evsql_query(trans->evsql, trans, sql, _evsql_trans_rollback_res, trans) == NULL) {
   888         // fail the transaction/connection
   881         // fail the transaction/connection, errors are supressed
   889         _evsql_trans_fail(trans);
   882         _evsql_trans_fail(trans);
   890     }
   883     }
   891 
   884 
   892 }
   885 }
   893 
   886 
   902 
   895 
   903     if (trans->query) {
   896     if (trans->query) {
   904         // gah, some query is running
   897         // gah, some query is running
   905         WARNING("aborting pending query");
   898         WARNING("aborting pending query");
   906         
   899         
   907         // prepare to rollback once complete
   900         // prepare to rollback once complete by hijacking ready_fn
   908         trans->ready_fn = _evsql_trans_rollback;
   901         trans->ready_fn = _evsql_trans_rollback;
   909         
   902         
   910         // abort
   903         // abort
   911         evsql_query_abort(trans, trans->query);
   904         evsql_query_abort(trans, trans->query);
   912 
   905