87 /* |
82 /* |
88 * Execute the callback if res is given, and free the query. |
83 * Execute the callback if res is given, and free the query. |
89 * |
84 * |
90 * If the query has been aborted, it will simply be freed. |
85 * If the query has been aborted, it will simply be freed. |
91 */ |
86 */ |
92 static void _evsql_query_done (struct evsql_query *query, const struct evsql_result_info *res) { |
87 static void _evsql_query_done (struct evsql_query *query, struct evsql_result *res) { |
93 if (res) { |
88 if (res) { |
94 if (query->cb_fn) |
89 if (query->cb_fn) |
95 // call the callback |
90 // call the callback |
96 query->cb_fn(res, query->cb_arg); |
91 query->cb_fn(res, query->cb_arg); |
97 else |
92 else { |
98 WARNING("supressing cb_fn because query was aborted"); |
93 WARNING("supressing cb_fn because query was aborted"); |
|
94 |
|
95 // free the results |
|
96 evsql_result_free(res); |
|
97 } |
99 } |
98 } |
100 |
99 |
101 // free |
100 // free |
102 _evsql_query_free(query); |
101 _evsql_query_free(query); |
103 } |
102 } |
104 |
103 |
105 /* |
104 /* |
106 * XXX: |
105 * XXX: |
107 * / |
106 * / |
108 static void _evsql_destroy (struct evsql *evsql, const struct evsql_result_info *res) { |
107 static void _evsql_destroy (struct evsql *evsql, const struct evsql_result *res) { |
109 struct evsql_query *query; |
108 struct evsql_query *query; |
110 |
109 |
111 // clear the queue |
110 // clear the queue |
112 while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) { |
111 while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) { |
113 _evsql_query_done(query, res); |
112 _evsql_query_done(query, res); |
301 } |
302 } |
302 |
303 |
303 /* |
304 /* |
304 * Callback for a trans's 'BEGIN' query, which means the transaction is now ready for use. |
305 * Callback for a trans's 'BEGIN' query, which means the transaction is now ready for use. |
305 */ |
306 */ |
306 static void _evsql_trans_ready (const struct evsql_result_info *res, void *arg) { |
307 static void _evsql_trans_ready (struct evsql_result *res, void *arg) { |
307 (void) arg; |
308 struct evsql_trans *trans = arg; |
308 |
309 |
309 assert(res->trans); |
310 assert(trans != NULL); |
310 |
311 |
311 // check for errors |
312 // check for errors |
312 if (res->error) |
313 if (res->error) |
313 ERROR("transaction 'BEGIN' failed: %s", evsql_result_error(res)); |
314 ERROR("transaction 'BEGIN' failed: %s", evsql_result_error(res)); |
314 |
315 |
315 // transaction is now ready for use |
316 // transaction is now ready for use |
316 res->trans->ready_fn(res->trans, res->trans->cb_arg); |
317 trans->ready_fn(trans, trans->cb_arg); |
317 |
318 |
318 // good |
319 // good |
319 return; |
320 return; |
320 |
321 |
321 error: |
322 error: |
322 _evsql_trans_fail(res->trans); |
323 _evsql_trans_fail(trans); |
323 } |
324 } |
324 |
325 |
325 /* |
326 /* |
326 * The transaction's connection is ready, send the 'BEGIN' query. |
327 * The transaction's connection is ready, send the 'BEGIN' query. |
327 * |
328 * |
401 struct evsql_query *query = conn->query; |
402 struct evsql_query *query = conn->query; |
402 |
403 |
403 assert(query != NULL); |
404 assert(query != NULL); |
404 |
405 |
405 // if we get multiple results, only return the first one |
406 // if we get multiple results, only return the first one |
406 if (query->result.evpq) { |
407 if (query->result.pq) { |
407 WARNING("[evsql] evpq query returned multiple results, discarding previous one"); |
408 WARNING("[evsql] evpq query returned multiple results, discarding previous one"); |
408 |
409 |
409 PQclear(query->result.evpq); query->result.evpq = NULL; |
410 PQclear(query->result.pq); query->result.pq = NULL; |
410 } |
411 } |
411 |
412 |
412 // remember the result |
413 // remember the result |
413 query->result.evpq = result; |
414 query->result.pq = result; |
414 } |
415 } |
415 |
416 |
416 /* |
417 /* |
417 * No more results for this query. |
418 * No more results for this query. |
418 */ |
419 */ |
419 static void _evsql_evpq_done (struct evpq_conn *_conn, void *arg) { |
420 static void _evsql_evpq_done (struct evpq_conn *_conn, void *arg) { |
420 struct evsql_conn *conn = arg; |
421 struct evsql_conn *conn = arg; |
421 struct evsql_query *query = conn->query; |
422 struct evsql_query *query = conn->query; |
422 struct evsql_result_info res; ZINIT(res); |
423 struct evsql_result res; ZINIT(res); |
423 |
424 |
424 assert(query != NULL); |
425 assert(query != NULL); |
425 |
426 |
426 // set up the result_info |
427 // set up the result_info |
427 res.evsql = conn->evsql; |
428 res.evsql = conn->evsql; |
428 res.trans = conn->trans; |
429 res.result = query->result; |
429 |
430 |
430 if (query->result.evpq == NULL) { |
431 if (query->result.pq == NULL) { |
431 // if a query didn't return any results (bug?), warn and fail the query |
432 // if a query didn't return any results (bug?), warn and fail the query |
432 WARNING("[evsql] evpq query didn't return any results"); |
433 WARNING("[evsql] evpq query didn't return any results"); |
433 |
434 |
434 res.error = 1; |
435 res.error = 1; |
435 |
436 |
436 } else if (strcmp(PQresultErrorMessage(query->result.evpq), "") != 0) { |
437 } else if (strcmp(PQresultErrorMessage(query->result.pq), "") != 0) { |
437 // the query failed with some error |
438 // the query failed with some error |
438 res.error = 1; |
439 res.error = 1; |
439 res.result.pq = query->result.evpq; |
|
440 |
440 |
441 } else { |
441 } else { |
|
442 // the query succeeded \o/ |
442 res.error = 0; |
443 res.error = 0; |
443 res.result.pq = query->result.evpq; |
|
444 |
444 |
445 } |
445 } |
446 |
446 |
447 // de-associate the query from the connection |
447 // de-associate the query from the connection |
448 conn->query = NULL; |
448 conn->query = NULL; |
816 |
810 |
817 error: |
811 error: |
818 return -1; |
812 return -1; |
819 } |
813 } |
820 |
814 |
821 struct evsql_query *evsql_query (struct evsql *evsql, struct evsql_trans *trans, const char *command, evsql_query_cb query_fn, void *cb_arg) { |
815 |
822 struct evsql_query *query = NULL; |
816 void _evsql_trans_commit_res (struct evsql_result *res, void *arg) { |
823 |
817 struct evsql_trans *trans = arg; |
824 // alloc new query |
|
825 if ((query = _evsql_query_new(evsql, trans, query_fn, cb_arg)) == NULL) |
|
826 goto error; |
|
827 |
|
828 // just execute the command string directly |
|
829 if (_evsql_query_enqueue(evsql, trans, query, command)) |
|
830 goto error; |
|
831 |
|
832 // ok |
|
833 return query; |
|
834 |
|
835 error: |
|
836 _evsql_query_free(query); |
|
837 |
|
838 return NULL; |
|
839 } |
|
840 |
|
841 struct evsql_query *evsql_query_params (struct evsql *evsql, struct evsql_trans *trans, const char *command, const struct evsql_query_params *params, evsql_query_cb query_fn, void *cb_arg) { |
|
842 struct evsql_query *query = NULL; |
|
843 const struct evsql_query_param *param; |
|
844 int idx; |
|
845 |
|
846 // alloc new query |
|
847 if ((query = _evsql_query_new(evsql, trans, query_fn, cb_arg)) == NULL) |
|
848 goto error; |
|
849 |
|
850 // count the params |
|
851 for (param = params->list; param->type; param++) |
|
852 query->params.count++; |
|
853 |
|
854 // allocate the vertical storage for the parameters |
|
855 if (0 |
|
856 |
|
857 || !(query->params.types = calloc(query->params.count, sizeof(Oid))) |
|
858 || !(query->params.values = calloc(query->params.count, sizeof(char *))) |
|
859 || !(query->params.lengths = calloc(query->params.count, sizeof(int))) |
|
860 || !(query->params.formats = calloc(query->params.count, sizeof(int))) |
|
861 ) |
|
862 ERROR("calloc"); |
|
863 |
|
864 // transform |
|
865 for (param = params->list, idx = 0; param->type; param++, idx++) { |
|
866 // types, set for NULLs, otherwise not |
|
867 query->params.types[idx] = param->data_raw ? 0 : EVSQL_PQ_ARBITRARY_TYPE_OID; |
|
868 |
|
869 // values |
|
870 query->params.values[idx] = param->data_raw; |
|
871 |
|
872 // lengths |
|
873 query->params.lengths[idx] = param->length; |
|
874 |
|
875 // formats, binary if length is nonzero, but text for NULLs |
|
876 query->params.formats[idx] = param->length && param->data_raw ? 1 : 0; |
|
877 } |
|
878 |
|
879 // result format |
|
880 switch (params->result_fmt) { |
|
881 case EVSQL_FMT_TEXT: |
|
882 query->params.result_format = 0; break; |
|
883 |
|
884 case EVSQL_FMT_BINARY: |
|
885 query->params.result_format = 1; break; |
|
886 |
|
887 default: |
|
888 FATAL("params.result_fmt: %d", params->result_fmt); |
|
889 } |
|
890 |
|
891 // execute it |
|
892 if (_evsql_query_enqueue(evsql, trans, query, command)) |
|
893 goto error; |
|
894 |
|
895 #ifdef DEBUG_ENABLED |
|
896 // debug it? |
|
897 DEBUG("evsql.%p: enqueued query=%p on trans=%p", evsql, query, trans); |
|
898 evsql_query_debug(command, params); |
|
899 #endif /* DEBUG_ENABLED */ |
|
900 |
|
901 // ok |
|
902 return query; |
|
903 |
|
904 error: |
|
905 _evsql_query_free(query); |
|
906 |
|
907 return NULL; |
|
908 } |
|
909 |
|
910 void evsql_query_abort (struct evsql_trans *trans, struct evsql_query *query) { |
|
911 assert(query); |
|
912 |
|
913 if (trans) { |
|
914 // must be the right query |
|
915 assert(trans->query == query); |
|
916 } |
|
917 |
|
918 // just strip the callback and wait for it to complete as normal |
|
919 query->cb_fn = NULL; |
|
920 } |
|
921 |
|
922 void _evsql_trans_commit_res (const struct evsql_result_info *res, void *arg) { |
|
923 (void) arg; |
|
924 |
|
925 assert(res->trans); |
|
926 |
818 |
927 // check for errors |
819 // check for errors |
928 if (res->error) |
820 if (res->error) |
929 ERROR("transaction 'COMMIT' failed: %s", evsql_result_error(res)); |
821 ERROR("transaction 'COMMIT' failed: %s", evsql_result_error(res)); |
930 |
822 |
931 // transaction is now done |
823 // transaction is now done |
932 res->trans->done_fn(res->trans, res->trans->cb_arg); |
824 trans->done_fn(trans, trans->cb_arg); |
933 |
825 |
934 // release it |
826 // release it |
935 _evsql_trans_release(res->trans); |
827 _evsql_trans_release(trans); |
936 |
828 |
937 // success |
829 // success |
938 return; |
830 return; |
939 |
831 |
940 error: |
832 error: |
941 _evsql_trans_fail(res->trans); |
833 _evsql_trans_fail(trans); |
942 } |
834 } |
943 |
835 |
944 int evsql_trans_commit (struct evsql_trans *trans) { |
836 int evsql_trans_commit (struct evsql_trans *trans) { |
945 static const char *sql = "COMMIT TRANSACTION"; |
837 static const char *sql = "COMMIT TRANSACTION"; |
946 |
838 |
947 if (trans->query) |
839 if (trans->query) |
948 ERROR("cannot COMMIT because transaction is still busy"); |
840 ERROR("cannot COMMIT because transaction is still busy"); |
949 |
841 |
950 // query |
842 // query |
951 if (evsql_query(trans->evsql, trans, sql, _evsql_trans_commit_res, NULL) == NULL) |
843 if (evsql_query(trans->evsql, trans, sql, _evsql_trans_commit_res, trans) == NULL) |
952 goto error; |
844 goto error; |
953 |
845 |
954 // mark it as committed in case someone wants to abort it |
846 // mark it as committed in case someone wants to abort it |
955 trans->has_commit = 1; |
847 trans->has_commit = 1; |
956 |
848 |
959 |
851 |
960 error: |
852 error: |
961 return -1; |
853 return -1; |
962 } |
854 } |
963 |
855 |
964 void _evsql_trans_rollback_res (const struct evsql_result_info *res, void *arg) { |
856 void _evsql_trans_rollback_res (struct evsql_result *res, void *arg) { |
965 (void) arg; |
857 struct evsql_trans *trans = arg; |
966 |
|
967 assert(res->trans); |
|
968 |
858 |
969 // fail the connection on errors |
859 // fail the connection on errors |
970 if (res->error) |
860 if (res->error) |
971 ERROR("transaction 'ROLLBACK' failed: %s", evsql_result_error(res)); |
861 ERROR("transaction 'ROLLBACK' failed: %s", evsql_result_error(res)); |
972 |
862 |
973 // release it |
863 // release it |
974 _evsql_trans_release(res->trans); |
864 _evsql_trans_release(trans); |
975 |
865 |
976 // success |
866 // success |
977 return; |
867 return; |
978 |
868 |
979 error: |
869 error: |
980 // fail the connection too, errors are suppressed |
870 // fail the connection too, errors are suppressed |
981 _evsql_trans_fail(res->trans); |
871 _evsql_trans_fail(trans); |
982 } |
872 } |
983 |
873 |
984 /* |
874 /* |
985 * Used as the ready_fn callback in case of abort, otherwise called directly. |
875 * Used as the ready_fn callback in case of abort, otherwise called directly. |
986 */ |
876 */ |
987 void _evsql_trans_rollback (struct evsql_trans *trans, void *unused) { |
877 void _evsql_trans_rollback (struct evsql_trans *trans, void *arg) { |
988 static const char *sql = "ROLLBACK TRANSACTION"; |
878 static const char *sql = "ROLLBACK TRANSACTION"; |
989 |
879 |
990 (void) unused; |
880 (void) arg; |
991 |
881 |
992 // query |
882 // query |
993 if (evsql_query(trans->evsql, trans, sql, _evsql_trans_rollback_res, NULL) == NULL) { |
883 if (evsql_query(trans->evsql, trans, sql, _evsql_trans_rollback_res, trans) == NULL) { |
994 // fail the transaction/connection |
884 // fail the transaction/connection, errors are suppressed |
995 _evsql_trans_fail(trans); |
885 _evsql_trans_fail(trans); |
996 } |
886 } |
997 |
887 |
998 } |
888 } |
999 |
889 |