src/core.c
changeset 58 02e539965ef4
parent 52 f5037572c326
parent 56 9dfc861273e5
child 64 83d53afa2551
equal deleted inserted replaced
57:527d23bf6441 58:02e539965ef4
    61         conn->query = query;
    61         conn->query = query;
    62 
    62 
    63     return err;
    63     return err;
    64 }
    64 }
    65 
    65 
    66 /*
    66 void _evsql_query_free (struct evsql_query *query) {
    67  * Free the query and related resources, doesn't trigger any callbacks or remove from any queues.
       
    68  *
       
    69  * The command should already be taken care of (NULL).
       
    70  */
       
    71 static void _evsql_query_free (struct evsql_query *query) {
       
    72     if (!query)
    67     if (!query)
    73         return;
    68         return;
    74         
    69         
    75     assert(query->command == NULL);
    70     assert(query->command == NULL);
    76     
    71     
    87 /*
    82 /*
    88  * Execute the callback if res is given, and free the query.
    83  * Execute the callback if res is given, and free the query.
    89  *
    84  *
    90  * The query has been aborted, it will simply be freed
    85  * The query has been aborted, it will simply be freed
    91  */
    86  */
    92 static void _evsql_query_done (struct evsql_query *query, const struct evsql_result_info *res) {
    87 static void _evsql_query_done (struct evsql_query *query, struct evsql_result *res) {
    93     if (res) {
    88     if (res) {
    94         if (query->cb_fn)
    89         if (query->cb_fn)
    95             // call the callback
    90             // call the callback
    96             query->cb_fn(res, query->cb_arg);
    91             query->cb_fn(res, query->cb_arg);
    97         else
    92         else {
    98             WARNING("supressing cb_fn because query was aborted");
    93             WARNING("supressing cb_fn because query was aborted");
       
    94             
       
    95             // free the results
       
    96             evsql_result_free(res);
       
    97         }
    99     }
    98     }
   100 
    99 
   101     // free
   100     // free
   102     _evsql_query_free(query);
   101     _evsql_query_free(query);
   103 }
   102 }
   104 
   103 
   105 /*
   104 /*
   106  * XXX:
   105  * XXX:
   107  * /
   106  * /
   108 static void _evsql_destroy (struct evsql *evsql, const struct evsql_result_info *res) {
   107 static void _evsql_destroy (struct evsql *evsql, const struct evsql_result *res) {
   109     struct evsql_query *query;
   108     struct evsql_query *query;
   110     
   109     
   111     // clear the queue
   110     // clear the queue
   112     while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) {
   111     while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) {
   113         _evsql_query_done(query, res);
   112         _evsql_query_done(query, res);
   182  * Fail a single query, this will trigger the callback and free it.
   181  * Fail a single query, this will trigger the callback and free it.
   183  *
   182  *
   184  * NOTE: Only for *TRANSACTIONLESS* queries.
   183  * NOTE: Only for *TRANSACTIONLESS* queries.
   185  */
   184  */
   186 static void _evsql_query_fail (struct evsql* evsql, struct evsql_query *query) {
   185 static void _evsql_query_fail (struct evsql* evsql, struct evsql_query *query) {
   187     struct evsql_result_info res; ZINIT(res);
   186     struct evsql_result res; ZINIT(res);
   188     
   187     
   189     // set up the result_info
   188     // set up the result_info
   190     res.evsql = evsql;
   189     res.evsql = evsql;
   191     res.trans = NULL;
       
   192     res.error = 1;
   190     res.error = 1;
   193     
   191     
   194     // finish off the query
   192     // finish off the query
   195     _evsql_query_done(query, &res);
   193     _evsql_query_done(query, &res);
   196 }
   194 }
   258     struct evsql_query *query;
   256     struct evsql_query *query;
   259     int err;
   257     int err;
   260     
   258     
   261     // look for waiting queries
   259     // look for waiting queries
   262     while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) {
   260     while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) {
       
   261         // zero err
       
   262         err = 0;
       
   263 
   263         // dequeue
   264         // dequeue
   264         TAILQ_REMOVE(&evsql->query_queue, query, entry);
   265         TAILQ_REMOVE(&evsql->query_queue, query, entry);
   265         
   266         
   266         if (conn) {
   267         if (conn) {
   267             // try and execute it
   268             // try and execute it
   301 }
   302 }
   302 
   303 
   303 /*
   304 /*
   304  * Callback for a trans's 'BEGIN' query, which means the transaction is now ready for use.
   305  * Callback for a trans's 'BEGIN' query, which means the transaction is now ready for use.
   305  */
   306  */
   306 static void _evsql_trans_ready (const struct evsql_result_info *res, void *arg) {
   307 static void _evsql_trans_ready (struct evsql_result *res, void *arg) {
   307     (void) arg;
   308     struct evsql_trans *trans = arg;
   308 
   309 
   309     assert(res->trans);
   310     assert(trans != NULL);
   310 
   311 
   311     // check for errors
   312     // check for errors
   312     if (res->error)
   313     if (res->error)
   313         ERROR("transaction 'BEGIN' failed: %s", evsql_result_error(res));
   314         ERROR("transaction 'BEGIN' failed: %s", evsql_result_error(res));
   314     
   315     
   315     // transaction is now ready for use
   316     // transaction is now ready for use
   316     res->trans->ready_fn(res->trans, res->trans->cb_arg);
   317     trans->ready_fn(trans, trans->cb_arg);
   317     
   318     
   318     // good
   319     // good
   319     return;
   320     return;
   320 
   321 
   321 error:
   322 error:
   322     _evsql_trans_fail(res->trans);
   323     _evsql_trans_fail(trans);
   323 }
   324 }
   324 
   325 
   325 /*
   326 /*
   326  * The transaction's connection is ready, send the 'BEGIN' query.
   327  * The transaction's connection is ready, send the 'BEGIN' query.
   327  *
   328  *
   362     // make sure it wasn't truncated
   363     // make sure it wasn't truncated
   363     if (ret >= EVSQL_QUERY_BEGIN_BUF)
   364     if (ret >= EVSQL_QUERY_BEGIN_BUF)
   364         ERROR("trans_sql overflow: %d >= %d", ret, EVSQL_QUERY_BEGIN_BUF);
   365         ERROR("trans_sql overflow: %d >= %d", ret, EVSQL_QUERY_BEGIN_BUF);
   365     
   366     
   366     // execute the query
   367     // execute the query
   367     if (evsql_query(evsql, trans, trans_sql, _evsql_trans_ready, NULL) == NULL)
   368     if (evsql_query(evsql, trans, trans_sql, _evsql_trans_ready, trans) == NULL)
   368         ERROR("evsql_query");
   369         ERROR("evsql_query");
   369     
   370     
   370     // success
   371     // success
   371     return 0;
   372     return 0;
   372 
   373 
   401     struct evsql_query *query = conn->query;
   402     struct evsql_query *query = conn->query;
   402 
   403 
   403     assert(query != NULL);
   404     assert(query != NULL);
   404 
   405 
   405     // if we get multiple results, only return the first one
   406     // if we get multiple results, only return the first one
   406     if (query->result.evpq) {
   407     if (query->result.pq) {
   407         WARNING("[evsql] evpq query returned multiple results, discarding previous one");
   408         WARNING("[evsql] evpq query returned multiple results, discarding previous one");
   408         
   409         
   409         PQclear(query->result.evpq); query->result.evpq = NULL;
   410         PQclear(query->result.pq); query->result.pq = NULL;
   410     }
   411     }
   411     
   412     
   412     // remember the result
   413     // remember the result
   413     query->result.evpq = result;
   414     query->result.pq = result;
   414 }
   415 }
   415 
   416 
   416 /*
   417 /*
   417  * No more results for this query.
   418  * No more results for this query.
   418  */
   419  */
   419 static void _evsql_evpq_done (struct evpq_conn *_conn, void *arg) {
   420 static void _evsql_evpq_done (struct evpq_conn *_conn, void *arg) {
   420     struct evsql_conn *conn = arg;
   421     struct evsql_conn *conn = arg;
   421     struct evsql_query *query = conn->query;
   422     struct evsql_query *query = conn->query;
   422     struct evsql_result_info res; ZINIT(res);
   423     struct evsql_result res; ZINIT(res);
   423     
   424     
   424     assert(query != NULL);
   425     assert(query != NULL);
   425     
   426     
   426     // set up the result_info
   427     // set up the result_info
   427     res.evsql = conn->evsql;
   428     res.evsql = conn->evsql;
   428     res.trans = conn->trans;
   429     res.result = query->result;
   429     
   430     
   430     if (query->result.evpq == NULL) {
   431     if (query->result.pq == NULL) {
   431         // if a query didn't return any results (bug?), warn and fail the query
   432         // if a query didn't return any results (bug?), warn and fail the query
   432         WARNING("[evsql] evpq query didn't return any results");
   433         WARNING("[evsql] evpq query didn't return any results");
   433 
   434 
   434         res.error = 1;
   435         res.error = 1;
   435     
   436     
   436     } else if (strcmp(PQresultErrorMessage(query->result.evpq), "") != 0) {
   437     } else if (strcmp(PQresultErrorMessage(query->result.pq), "") != 0) {
   437         // the query failed with some error
   438         // the query failed with some error
   438         res.error = 1;
   439         res.error = 1;
   439         res.result.pq = query->result.evpq;
       
   440 
   440 
   441     } else {
   441     } else {
       
   442         // the query succeeded \o/
   442         res.error = 0;
   443         res.error = 0;
   443         res.result.pq = query->result.evpq;
       
   444 
   444 
   445     }
   445     }
   446 
   446 
   447     // de-associate the query from the connection
   447     // de-associate the query from the connection
   448     conn->query = NULL;
   448     conn->query = NULL;
   729 
   729 
   730     return NULL;
   730     return NULL;
   731 }
   731 }
   732 
   732 
   733 /*
   733 /*
   734  * Validate and allocate the basic stuff for a new query.
   734  * Internal query functions
   735  */
   735  */
   736 static struct evsql_query *_evsql_query_new (struct evsql *evsql, struct evsql_trans *trans, evsql_query_cb query_fn, void *cb_arg) {
   736 struct evsql_query *_evsql_query_new (struct evsql *evsql, struct evsql_trans *trans, evsql_query_cb query_fn, void *cb_arg) {
   737     struct evsql_query *query = NULL;
   737     struct evsql_query *query = NULL;
   738     
   738     
   739     // if it's part of a trans, then make sure the trans is idle
   739     // if it's part of a trans, then make sure the trans is idle
   740     if (trans && trans->query)
   740     if (trans && trans->query)
   741         ERROR("transaction is busy");
   741         ERROR("transaction is busy");
   753 
   753 
   754 error:
   754 error:
   755     return NULL;
   755     return NULL;
   756 }
   756 }
   757 
   757 
   758 /*
   758 int _evsql_query_enqueue (struct evsql *evsql, struct evsql_trans *trans, struct evsql_query *query, const char *command) {
   759  * Handle a new query.
       
   760  *
       
   761  * For transactions this will associate the query and then execute it, otherwise this will either find an idle
       
   762  * connection and send the query, or enqueue it.
       
   763  */
       
   764 static int _evsql_query_enqueue (struct evsql *evsql, struct evsql_trans *trans, struct evsql_query *query, const char *command) {
       
   765     // transaction queries are handled differently
   759     // transaction queries are handled differently
   766     if (trans) {
   760     if (trans) {
   767         // it's an in-transaction query
   761         // it's an in-transaction query
   768         assert(trans->query == NULL);
   762         assert(trans->query == NULL);
   769         
   763         
   816 
   810 
   817 error:
   811 error:
   818     return -1;
   812     return -1;
   819 }
   813 }
   820 
   814 
   821 struct evsql_query *evsql_query (struct evsql *evsql, struct evsql_trans *trans, const char *command, evsql_query_cb query_fn, void *cb_arg) {
   815 
   822     struct evsql_query *query = NULL;
   816 void _evsql_trans_commit_res (struct evsql_result *res, void *arg) {
   823     
   817     struct evsql_trans *trans = arg;
   824     // alloc new query
       
   825     if ((query = _evsql_query_new(evsql, trans, query_fn, cb_arg)) == NULL)
       
   826         goto error;
       
   827     
       
   828     // just execute the command string directly
       
   829     if (_evsql_query_enqueue(evsql, trans, query, command))
       
   830         goto error;
       
   831 
       
   832     // ok
       
   833     return query;
       
   834 
       
   835 error:
       
   836     _evsql_query_free(query);
       
   837 
       
   838     return NULL;
       
   839 }
       
   840 
       
   841 struct evsql_query *evsql_query_params (struct evsql *evsql, struct evsql_trans *trans, const char *command, const struct evsql_query_params *params, evsql_query_cb query_fn, void *cb_arg) {
       
   842     struct evsql_query *query = NULL;
       
   843     const struct evsql_query_param *param;
       
   844     int idx;
       
   845     
       
   846     // alloc new query
       
   847     if ((query = _evsql_query_new(evsql, trans, query_fn, cb_arg)) == NULL)
       
   848         goto error;
       
   849 
       
   850     // count the params
       
   851     for (param = params->list; param->type; param++) 
       
   852         query->params.count++;
       
   853 
       
   854     // allocate the vertical storage for the parameters
       
   855     if (0
       
   856         
       
   857         ||  !(query->params.types    = calloc(query->params.count, sizeof(Oid)))
       
   858         ||  !(query->params.values   = calloc(query->params.count, sizeof(char *)))
       
   859         ||  !(query->params.lengths  = calloc(query->params.count, sizeof(int)))
       
   860         ||  !(query->params.formats  = calloc(query->params.count, sizeof(int)))
       
   861     )
       
   862         ERROR("calloc");
       
   863 
       
   864     // transform
       
   865     for (param = params->list, idx = 0; param->type; param++, idx++) {
       
   866         // `set for NULLs, otherwise not
       
   867         query->params.types[idx] = param->data_raw ? 0 : EVSQL_PQ_ARBITRARY_TYPE_OID;
       
   868         
       
   869         // values
       
   870         query->params.values[idx] = param->data_raw;
       
   871 
       
   872         // lengths
       
   873         query->params.lengths[idx] = param->length;
       
   874 
       
   875         // formats, binary if length is nonzero, but text for NULLs
       
   876         query->params.formats[idx] = param->length && param->data_raw ? 1 : 0;
       
   877     }
       
   878 
       
   879     // result format
       
   880     switch (params->result_fmt) {
       
   881         case EVSQL_FMT_TEXT:
       
   882             query->params.result_format = 0; break;
       
   883 
       
   884         case EVSQL_FMT_BINARY:
       
   885             query->params.result_format = 1; break;
       
   886 
       
   887         default:
       
   888             FATAL("params.result_fmt: %d", params->result_fmt);
       
   889     }
       
   890 
       
   891     // execute it
       
   892     if (_evsql_query_enqueue(evsql, trans, query, command))
       
   893         goto error;
       
   894 
       
   895 #ifdef DEBUG_ENABLED
       
   896     // debug it?
       
   897     DEBUG("evsql.%p: enqueued query=%p on trans=%p", evsql, query, trans);
       
   898     evsql_query_debug(command, params);
       
   899 #endif /* DEBUG_ENABLED */
       
   900 
       
   901     // ok
       
   902     return query;
       
   903 
       
   904 error:
       
   905     _evsql_query_free(query);
       
   906     
       
   907     return NULL;
       
   908 }
       
   909 
       
   910 void evsql_query_abort (struct evsql_trans *trans, struct evsql_query *query) {
       
   911     assert(query);
       
   912 
       
   913     if (trans) {
       
   914         // must be the right query
       
   915         assert(trans->query == query);
       
   916     }
       
   917 
       
   918     // just strip the callback and wait for it to complete as normal
       
   919     query->cb_fn = NULL;
       
   920 }
       
   921 
       
   922 void _evsql_trans_commit_res (const struct evsql_result_info *res, void *arg) {
       
   923     (void) arg;
       
   924 
       
   925     assert(res->trans);
       
   926 
   818 
   927     // check for errors
   819     // check for errors
   928     if (res->error)
   820     if (res->error)
   929         ERROR("transaction 'COMMIT' failed: %s", evsql_result_error(res));
   821         ERROR("transaction 'COMMIT' failed: %s", evsql_result_error(res));
   930     
   822     
   931     // transaction is now done
   823     // transaction is now done
   932     res->trans->done_fn(res->trans, res->trans->cb_arg);
   824     trans->done_fn(trans, trans->cb_arg);
   933     
   825     
   934     // release it
   826     // release it
   935     _evsql_trans_release(res->trans);
   827     _evsql_trans_release(trans);
   936 
   828 
   937     // success
   829     // success
   938     return;
   830     return;
   939 
   831 
   940 error:
   832 error:
   941     _evsql_trans_fail(res->trans);
   833     _evsql_trans_fail(trans);
   942 }
   834 }
   943 
   835 
   944 int evsql_trans_commit (struct evsql_trans *trans) {
   836 int evsql_trans_commit (struct evsql_trans *trans) {
   945     static const char *sql = "COMMIT TRANSACTION";
   837     static const char *sql = "COMMIT TRANSACTION";
   946 
   838 
   947     if (trans->query)
   839     if (trans->query)
   948         ERROR("cannot COMMIT because transaction is still busy");
   840         ERROR("cannot COMMIT because transaction is still busy");
   949     
   841     
   950     // query
   842     // query
   951     if (evsql_query(trans->evsql, trans, sql, _evsql_trans_commit_res, NULL) == NULL)
   843     if (evsql_query(trans->evsql, trans, sql, _evsql_trans_commit_res, trans) == NULL)
   952         goto error;
   844         goto error;
   953     
   845     
   954     // mark it as commited in case someone wants to abort it
   846     // mark it as commited in case someone wants to abort it
   955     trans->has_commit = 1;
   847     trans->has_commit = 1;
   956 
   848 
   959 
   851 
   960 error:
   852 error:
   961     return -1;
   853     return -1;
   962 }
   854 }
   963 
   855 
   964 void _evsql_trans_rollback_res (const struct evsql_result_info *res, void *arg) {
   856 void _evsql_trans_rollback_res (struct evsql_result *res, void *arg) {
   965     (void) arg;
   857     struct evsql_trans *trans = arg;
   966 
       
   967     assert(res->trans);
       
   968 
   858 
   969     // fail the connection on errors
   859     // fail the connection on errors
   970     if (res->error)
   860     if (res->error)
   971         ERROR("transaction 'ROLLBACK' failed: %s", evsql_result_error(res));
   861         ERROR("transaction 'ROLLBACK' failed: %s", evsql_result_error(res));
   972 
   862 
   973     // release it
   863     // release it
   974     _evsql_trans_release(res->trans);
   864     _evsql_trans_release(trans);
   975 
   865 
   976     // success
   866     // success
   977     return;
   867     return;
   978 
   868 
   979 error:
   869 error:
   980     // fail the connection too, errors are supressed
   870     // fail the connection too, errors are supressed
   981     _evsql_trans_fail(res->trans);
   871     _evsql_trans_fail(trans);
   982 }
   872 }
   983 
   873 
   984 /*
   874 /*
   985  * Used as the ready_fn callback in case of abort, otherwise directly
   875  * Used as the ready_fn callback in case of abort, otherwise directly
   986  */
   876  */
   987 void _evsql_trans_rollback (struct evsql_trans *trans, void *unused) {
   877 void _evsql_trans_rollback (struct evsql_trans *trans, void *arg) {
   988     static const char *sql = "ROLLBACK TRANSACTION";
   878     static const char *sql = "ROLLBACK TRANSACTION";
   989 
   879 
   990     (void) unused;
   880     (void) arg;
   991 
   881 
   992     // query
   882     // query
   993     if (evsql_query(trans->evsql, trans, sql, _evsql_trans_rollback_res, NULL) == NULL) {
   883     if (evsql_query(trans->evsql, trans, sql, _evsql_trans_rollback_res, trans) == NULL) {
   994         // fail the transaction/connection
   884         // fail the transaction/connection, errors are supressed
   995         _evsql_trans_fail(trans);
   885         _evsql_trans_fail(trans);
   996     }
   886     }
   997 
   887 
   998 }
   888 }
   999 
   889 
  1008 
   898 
  1009     if (trans->query) {
   899     if (trans->query) {
  1010         // gah, some query is running
   900         // gah, some query is running
  1011         WARNING("aborting pending query");
   901         WARNING("aborting pending query");
  1012         
   902         
  1013         // prepare to rollback once complete
   903         // prepare to rollback once complete by hijacking ready_fn
  1014         trans->ready_fn = _evsql_trans_rollback;
   904         trans->ready_fn = _evsql_trans_rollback;
  1015         
   905         
  1016         // abort
   906         // abort
  1017         evsql_query_abort(trans, trans->query);
   907         evsql_query_abort(trans, trans->query);
  1018 
   908