terom@29: #define _GNU_SOURCE terom@29: #include terom@29: #include terom@29: #include terom@29: terom@56: #include "internal.h" terom@56: #include "lib/log.h" terom@56: #include "lib/error.h" terom@56: #include "lib/misc.h" terom@29: terom@29: /* terom@29: * A couple function prototypes terom@29: */ terom@29: static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn); terom@29: terom@29: /* terom@29: * Actually execute the given query. terom@29: * terom@29: * The backend should be able to accept the query at this time. terom@29: * terom@29: * You should assume that if trying to execute a query fails, then the connection should also be considred as failed. terom@29: */ terom@29: static int _evsql_query_exec (struct evsql_conn *conn, struct evsql_query *query, const char *command) { terom@29: int err; terom@29: terom@36: DEBUG("evsql.%p: exec query=%p on trans=%p on conn=%p:", conn->evsql, query, conn->trans, conn); terom@36: terom@29: switch (conn->evsql->type) { terom@29: case EVSQL_EVPQ: terom@29: // got params? terom@29: if (query->params.count) { terom@29: err = evpq_query_params(conn->engine.evpq, command, terom@29: query->params.count, terom@29: query->params.types, terom@29: query->params.values, terom@29: query->params.lengths, terom@29: query->params.formats, terom@29: query->params.result_format terom@29: ); terom@29: terom@29: } else { terom@29: // plain 'ole query terom@29: err = evpq_query(conn->engine.evpq, command); terom@29: } terom@29: terom@29: if (err) { terom@29: if (PQstatus(evpq_pgconn(conn->engine.evpq)) != CONNECTION_OK) terom@29: WARNING("conn failed"); terom@29: else terom@29: WARNING("query failed, dropping conn as well"); terom@29: } terom@29: terom@29: break; terom@29: terom@29: default: terom@29: FATAL("evsql->type"); terom@29: } terom@29: terom@29: if (!err) terom@29: // assign the query terom@29: conn->query = query; terom@29: terom@29: return err; terom@29: } terom@29: terom@45: void _evsql_query_free (struct evsql_query *query) { terom@29: if (!query) terom@29: return; terom@29: terom@29: assert(query->command == NULL); terom@29: terom@29: // free params if present terom@29: free(query->params.types); terom@29: free(query->params.values); terom@29: free(query->params.lengths); terom@29: free(query->params.formats); terom@29: terom@29: // free the query itself terom@29: free(query); terom@29: } terom@29: terom@29: /* terom@29: * Execute the callback if res is given, and free the query. terom@29: * terom@29: * The query has been aborted, it will simply be freed terom@29: */ terom@45: static void _evsql_query_done (struct evsql_query *query, struct evsql_result *res) { terom@29: if (res) { terom@29: if (query->cb_fn) terom@29: // call the callback terom@29: query->cb_fn(res, query->cb_arg); terom@45: else { terom@29: WARNING("supressing cb_fn because query was aborted"); terom@45: terom@45: // free the results terom@45: evsql_result_free(res); terom@45: } terom@29: } terom@29: terom@29: // free terom@29: _evsql_query_free(query); terom@29: } terom@29: terom@29: /* terom@29: * XXX: terom@29: * / terom@45: static void _evsql_destroy (struct evsql *evsql, const struct evsql_result *res) { terom@29: struct evsql_query *query; terom@29: terom@29: // clear the queue terom@29: while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) { terom@29: _evsql_query_done(query, res); terom@29: terom@29: TAILQ_REMOVE(&evsql->query_queue, query, entry); terom@29: } terom@29: terom@29: // free terom@29: free(evsql); terom@29: } terom@29: */ terom@29: terom@29: /* terom@29: * Free the transaction, it should already be deassociated from the query and conn. terom@29: */ terom@29: static void _evsql_trans_free (struct evsql_trans *trans) { terom@29: // ensure we don't leak anything terom@29: assert(trans->query == NULL); terom@29: assert(trans->conn == NULL); terom@29: terom@29: // free terom@29: free(trans); terom@29: } terom@29: terom@29: /* terom@29: * Release a connection. It should already be deassociated from the trans and query. terom@29: * terom@29: * Releases the engine, removes from the conn_list and frees this. terom@29: */ terom@29: static void _evsql_conn_release (struct evsql_conn *conn) { terom@29: // ensure we don't leak anything terom@29: assert(conn->trans == NULL); terom@29: assert(conn->query == NULL); terom@29: terom@29: // release the engine terom@29: switch (conn->evsql->type) { terom@29: case EVSQL_EVPQ: terom@29: evpq_release(conn->engine.evpq); terom@29: break; terom@29: terom@29: default: terom@29: FATAL("evsql->type"); terom@29: } terom@29: terom@29: // remove from list terom@29: LIST_REMOVE(conn, entry); terom@29: terom@29: // catch deadlocks terom@29: assert(!LIST_EMPTY(&conn->evsql->conn_list) || TAILQ_EMPTY(&conn->evsql->query_queue)); terom@29: terom@29: // free terom@29: free(conn); terom@29: } terom@29: terom@29: /* terom@29: * Release a transaction, it should already be deassociated from the query. terom@29: * terom@29: * Perform a two-way-deassociation with the conn, and then free the trans. terom@29: */ terom@29: static void _evsql_trans_release (struct evsql_trans *trans) { terom@29: assert(trans->query == NULL); terom@29: assert(trans->conn != NULL); terom@29: terom@29: // deassociate the conn terom@29: trans->conn->trans = NULL; trans->conn = NULL; terom@29: terom@29: // free the trans terom@29: _evsql_trans_free(trans); terom@29: } terom@29: terom@29: /* terom@29: * Fail a single query, this will trigger the callback and free it. terom@29: * terom@29: * NOTE: Only for *TRANSACTIONLESS* queries. terom@29: */ terom@29: static void _evsql_query_fail (struct evsql* evsql, struct evsql_query *query) { terom@45: struct evsql_result res; ZINIT(res); terom@29: terom@29: // set up the result_info terom@29: res.evsql = evsql; terom@29: res.error = 1; terom@29: terom@29: // finish off the query terom@29: _evsql_query_done(query, &res); terom@29: } terom@29: terom@29: /* terom@29: * Fail a transaction, this will silently drop any query, trigger the error callback, two-way-deassociate/release the terom@29: * conn, and then free the trans. terom@29: */ terom@29: static void _evsql_trans_fail (struct evsql_trans *trans) { terom@29: if (trans->query) { terom@29: // free the query silently terom@29: _evsql_query_free(trans->query); trans->query = NULL; terom@29: terom@29: // also deassociate it from the conn! terom@29: trans->conn->query = NULL; terom@29: } terom@29: terom@29: // tell the user terom@29: // XXX: trans is in a bad state during this call terom@29: if (trans->error_fn) terom@29: trans->error_fn(trans, trans->cb_arg); terom@29: else terom@29: WARNING("supressing error because error_fn was NULL"); terom@29: terom@29: // deassociate and release the conn terom@29: trans->conn->trans = NULL; _evsql_conn_release(trans->conn); trans->conn = NULL; terom@29: terom@29: // pump the queue for requests that were waiting for this connection terom@29: _evsql_pump(trans->evsql, NULL); terom@29: terom@29: // free the trans terom@29: _evsql_trans_free(trans); terom@29: } terom@29: terom@29: /* terom@29: * Fail a connection. If the connection is transactional, this will just call _evsql_trans_fail, but otherwise it will terom@29: * fail any ongoing query, and then release the connection. terom@29: */ terom@29: static void _evsql_conn_fail (struct evsql_conn *conn) { terom@29: if (conn->trans) { terom@29: // let transactions handle their connection failures terom@29: _evsql_trans_fail(conn->trans); terom@29: terom@29: } else { terom@29: if (conn->query) { terom@29: // fail the in-progress query terom@29: _evsql_query_fail(conn->evsql, conn->query); conn->query = NULL; terom@29: } terom@29: terom@29: // finish off the whole connection terom@29: _evsql_conn_release(conn); terom@29: } terom@29: } terom@29: terom@29: /* terom@29: * Processes enqueued non-transactional queries until the queue is empty, or we managed to exec a query. terom@29: * terom@29: * If execing a query on a connection fails, both the query and the connection are failed (in that order). terom@29: * terom@29: * Any further queries will then also be failed, because there's no reconnection/retry logic yet. terom@29: * terom@29: * This means that if conn is NULL, all queries are failed. terom@29: */ terom@29: static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn) { terom@29: struct evsql_query *query; terom@29: int err; terom@29: terom@29: // look for waiting queries terom@29: while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) { terom@52: // zero err terom@52: err = 0; terom@52: terom@29: // dequeue terom@29: TAILQ_REMOVE(&evsql->query_queue, query, entry); terom@29: terom@29: if (conn) { terom@29: // try and execute it terom@29: err = _evsql_query_exec(conn, query, query->command); terom@29: } terom@29: terom@29: // free the command buf terom@29: free(query->command); query->command = NULL; terom@29: terom@29: if (err || !conn) { terom@29: if (!conn) { terom@29: // warn when dropping queries terom@29: WARNING("failing query becuse there are no conns"); terom@29: } terom@29: terom@29: // fail the query terom@29: _evsql_query_fail(evsql, query); terom@29: terom@29: if (conn) { terom@29: // fail the connection terom@29: WARNING("failing the connection because a query-exec failed"); terom@29: terom@29: _evsql_conn_fail(conn); conn = NULL; terom@29: } terom@29: terom@29: } else { terom@29: // we have succesfully enqueued a query, and we can wait for this connection to complete terom@29: break; terom@29: terom@29: } terom@29: terom@29: // handle the rest of the queue terom@29: } terom@29: terom@29: // ok terom@29: return; terom@29: } terom@29: terom@29: /* terom@29: * Callback for a trans's 'BEGIN' query, which means the transaction is now ready for use. terom@29: */ terom@45: static void _evsql_trans_ready (struct evsql_result *res, void *arg) { terom@45: struct evsql_trans *trans = arg; terom@29: terom@45: assert(trans != NULL); terom@29: terom@29: // check for errors terom@29: if (res->error) terom@29: ERROR("transaction 'BEGIN' failed: %s", evsql_result_error(res)); terom@29: terom@29: // transaction is now ready for use terom@45: trans->ready_fn(trans, trans->cb_arg); terom@29: terom@29: // good terom@29: return; terom@29: terom@29: error: terom@45: _evsql_trans_fail(trans); terom@29: } terom@29: terom@29: /* terom@29: * The transaction's connection is ready, send the 'BEGIN' query. terom@29: * terom@29: * If anything fails, calls _evsql_trans_fail and returns nonzero, zero on success terom@29: */ terom@29: static int _evsql_trans_conn_ready (struct evsql *evsql, struct evsql_trans *trans) { terom@29: char trans_sql[EVSQL_QUERY_BEGIN_BUF]; terom@29: const char *isolation_level; terom@29: int ret; terom@29: terom@29: // determine the isolation_level to use terom@29: switch (trans->type) { terom@29: case EVSQL_TRANS_DEFAULT: terom@29: isolation_level = NULL; break; terom@29: terom@29: case EVSQL_TRANS_SERIALIZABLE: terom@29: isolation_level = "SERIALIZABLE"; break; terom@29: terom@29: case EVSQL_TRANS_REPEATABLE_READ: terom@29: isolation_level = "REPEATABLE READ"; break; terom@29: terom@29: case EVSQL_TRANS_READ_COMMITTED: terom@29: isolation_level = "READ COMMITTED"; break; terom@29: terom@29: case EVSQL_TRANS_READ_UNCOMMITTED: terom@29: isolation_level = "READ UNCOMMITTED"; break; terom@29: terom@29: default: terom@29: FATAL("trans->type: %d", trans->type); terom@29: } terom@29: terom@29: // build the trans_sql terom@29: if (isolation_level) terom@29: ret = snprintf(trans_sql, EVSQL_QUERY_BEGIN_BUF, "BEGIN TRANSACTION ISOLATION LEVEL %s", isolation_level); terom@29: else terom@29: ret = snprintf(trans_sql, EVSQL_QUERY_BEGIN_BUF, "BEGIN TRANSACTION"); terom@29: terom@29: // make sure it wasn't truncated terom@29: if (ret >= EVSQL_QUERY_BEGIN_BUF) terom@29: ERROR("trans_sql overflow: %d >= %d", ret, EVSQL_QUERY_BEGIN_BUF); terom@29: terom@29: // execute the query terom@45: if (evsql_query(evsql, trans, trans_sql, _evsql_trans_ready, trans) == NULL) terom@29: ERROR("evsql_query"); terom@29: terom@29: // success terom@29: return 0; terom@29: terom@29: error: terom@29: // fail the transaction terom@29: _evsql_trans_fail(trans); terom@29: terom@29: return -1; terom@29: } terom@29: terom@29: /* terom@29: * The evpq connection was succesfully established. terom@29: */ terom@29: static void _evsql_evpq_connected (struct evpq_conn *_conn, void *arg) { terom@29: struct evsql_conn *conn = arg; terom@29: terom@29: if (conn->trans) terom@29: // notify the transaction terom@29: // don't care about errors terom@29: (void) _evsql_trans_conn_ready(conn->evsql, conn->trans); terom@29: terom@29: else terom@29: // pump any waiting transactionless queries terom@29: _evsql_pump(conn->evsql, conn); terom@29: } terom@29: terom@29: /* terom@29: * Got one result on this evpq connection. terom@29: */ terom@29: static void _evsql_evpq_result (struct evpq_conn *_conn, PGresult *result, void *arg) { terom@29: struct evsql_conn *conn = arg; terom@29: struct evsql_query *query = conn->query; terom@29: terom@29: assert(query != NULL); terom@29: terom@29: // if we get multiple results, only return the first one terom@45: if (query->result.pq) { terom@29: WARNING("[evsql] evpq query returned multiple results, discarding previous one"); terom@29: terom@45: PQclear(query->result.pq); query->result.pq = NULL; terom@29: } terom@29: terom@29: // remember the result terom@45: query->result.pq = result; terom@29: } terom@29: terom@29: /* terom@29: * No more results for this query. terom@29: */ terom@29: static void _evsql_evpq_done (struct evpq_conn *_conn, void *arg) { terom@29: struct evsql_conn *conn = arg; terom@29: struct evsql_query *query = conn->query; terom@45: struct evsql_result res; ZINIT(res); terom@29: terom@29: assert(query != NULL); terom@29: terom@29: // set up the result_info terom@29: res.evsql = conn->evsql; terom@45: res.result = query->result; terom@29: terom@45: if (query->result.pq == NULL) { terom@29: // if a query didn't return any results (bug?), warn and fail the query terom@29: WARNING("[evsql] evpq query didn't return any results"); terom@29: terom@29: res.error = 1; terom@29: terom@45: } else if (strcmp(PQresultErrorMessage(query->result.pq), "") != 0) { terom@29: // the query failed with some error terom@29: res.error = 1; terom@29: terom@29: } else { terom@45: // the query succeeded \o/ terom@29: res.error = 0; terom@29: terom@29: } terom@29: terom@29: // de-associate the query from the connection terom@29: conn->query = NULL; terom@29: terom@29: // how we handle query completion depends on if we're a transaction or not terom@29: if (conn->trans) { terom@29: // we can deassign the trans's query terom@29: conn->trans->query = NULL; terom@29: terom@29: // was an abort? terom@29: if (!query->cb_fn) terom@29: // notify the user that the transaction query has been aborted terom@29: conn->trans->ready_fn(conn->trans, conn->trans->cb_arg); terom@29: terom@29: // then hand the query to the user terom@29: _evsql_query_done(query, &res); terom@29: terom@29: } else { terom@29: // a transactionless query, so just finish it off and pump any other waiting ones terom@29: _evsql_query_done(query, &res); terom@29: terom@29: // pump the next one terom@29: _evsql_pump(conn->evsql, conn); terom@29: } terom@29: } terom@29: terom@29: /* terom@29: * The connection failed. terom@29: */ terom@29: static void _evsql_evpq_failure (struct evpq_conn *_conn, void *arg) { terom@29: struct evsql_conn *conn = arg; terom@29: terom@29: // just fail the conn terom@29: _evsql_conn_fail(conn); terom@29: } terom@29: terom@29: /* terom@29: * Our evpq behaviour terom@29: */ terom@29: static struct evpq_callback_info _evsql_evpq_cb_info = { terom@29: .fn_connected = _evsql_evpq_connected, terom@29: .fn_result = _evsql_evpq_result, terom@29: .fn_done = _evsql_evpq_done, terom@29: .fn_failure = _evsql_evpq_failure, terom@29: }; terom@29: terom@29: /* terom@29: * Allocate the generic evsql context. terom@29: */ terom@29: static struct evsql *_evsql_new_base (struct event_base *ev_base, evsql_error_cb error_fn, void *cb_arg) { terom@29: struct evsql *evsql = NULL; terom@29: terom@29: // allocate it terom@29: if ((evsql = calloc(1, sizeof(*evsql))) == NULL) terom@29: ERROR("calloc"); terom@29: terom@29: // store terom@29: evsql->ev_base = ev_base; terom@29: evsql->error_fn = error_fn; terom@29: evsql->cb_arg = cb_arg; terom@29: terom@29: // init terom@29: LIST_INIT(&evsql->conn_list); terom@29: TAILQ_INIT(&evsql->query_queue); terom@29: terom@29: // done terom@29: return evsql; terom@29: terom@29: error: terom@29: return NULL; terom@29: } terom@29: terom@29: /* terom@29: * Start a new connection and add it to the list, it won't be ready until _evsql_evpq_connected is called terom@29: */ terom@29: static struct evsql_conn *_evsql_conn_new (struct evsql *evsql) { terom@29: struct evsql_conn *conn = NULL; terom@29: terom@29: // allocate terom@29: if ((conn = calloc(1, sizeof(*conn))) == NULL) terom@29: ERROR("calloc"); terom@29: terom@29: // init terom@29: conn->evsql = evsql; terom@29: terom@29: // connect the engine terom@29: switch (evsql->type) { terom@29: case EVSQL_EVPQ: terom@29: if ((conn->engine.evpq = evpq_connect(evsql->ev_base, evsql->engine_conf.evpq, _evsql_evpq_cb_info, conn)) == NULL) terom@29: goto error; terom@29: terom@29: break; terom@29: terom@29: default: terom@29: FATAL("evsql->type"); terom@29: } terom@29: terom@29: // add it to the list terom@29: LIST_INSERT_HEAD(&evsql->conn_list, conn, entry); terom@29: terom@29: // success terom@29: return conn; terom@29: terom@29: error: terom@29: free(conn); terom@29: terom@29: return NULL; terom@29: } terom@29: terom@29: struct evsql *evsql_new_pq (struct event_base *ev_base, const char *pq_conninfo, evsql_error_cb error_fn, void *cb_arg) { terom@29: struct evsql *evsql = NULL; terom@29: terom@29: // base init terom@29: if ((evsql = _evsql_new_base (ev_base, error_fn, cb_arg)) == NULL) terom@29: goto error; terom@29: terom@29: // store conf terom@29: evsql->engine_conf.evpq = pq_conninfo; terom@29: terom@29: // pre-create one connection terom@29: if (_evsql_conn_new(evsql) == NULL) terom@29: goto error; terom@29: terom@29: // done terom@29: return evsql; terom@29: terom@29: error: terom@29: // XXX: more complicated than this? terom@29: free(evsql); terom@29: terom@29: return NULL; terom@29: } terom@29: terom@29: /* terom@29: * Checks if the connection is already allocated for some other trans/query. terom@29: * terom@29: * Returns: terom@29: * 0 connection idle, can be allocated terom@29: * >1 connection busy terom@29: */ terom@29: static int _evsql_conn_busy (struct evsql_conn *conn) { terom@29: // transactions get the connection to themselves terom@29: if (conn->trans) terom@29: return 1; terom@29: terom@29: // if it has a query assigned, it's busy terom@29: if (conn->query) terom@29: return 1; terom@29: terom@29: // otherwise, it's all idle terom@29: return 0; terom@29: } terom@29: terom@29: /* terom@29: * Checks if the connection is ready for use (i.e. _evsql_evpq_connected was called). terom@29: * terom@29: * The connection should not already have a query running. terom@29: * terom@29: * Returns terom@29: * <0 the connection is not valid (failed, query in progress) terom@29: * 0 the connection is still pending, and will become ready at some point terom@29: * >0 it's ready terom@29: */ terom@29: static int _evsql_conn_ready (struct evsql_conn *conn) { terom@29: switch (conn->evsql->type) { terom@29: case EVSQL_EVPQ: { terom@29: enum evpq_state state = evpq_state(conn->engine.evpq); terom@29: terom@29: switch (state) { terom@29: case EVPQ_CONNECT: terom@29: return 0; terom@29: terom@29: case EVPQ_CONNECTED: terom@29: return 1; terom@29: terom@29: case EVPQ_QUERY: terom@29: case EVPQ_INIT: terom@29: case EVPQ_FAILURE: terom@29: return -1; terom@29: terom@29: default: terom@29: FATAL("evpq_state: %d", state); terom@29: } terom@29: terom@29: } terom@29: terom@29: default: terom@29: FATAL("evsql->type: %d", conn->evsql->type); terom@29: } terom@29: } terom@29: terom@29: /* terom@29: * Allocate a connection for use and return it via *conn_ptr, or if may_queue is nonzero and the connection pool is terom@29: * getting full, return NULL (query should be queued). terom@29: * terom@29: * Note that the returned connection might not be ready for use yet (if we created a new one, see _evsql_conn_ready). terom@29: * terom@29: * Returns zero if a connection was found or the request should be queued, or nonzero if something failed and the terom@29: * request should be dropped. terom@29: */ terom@29: static int _evsql_conn_get (struct evsql *evsql, struct evsql_conn **conn_ptr, int may_queue) { terom@29: int have_nontrans = 0; terom@29: *conn_ptr = NULL; terom@29: terom@29: // find a connection that isn't busy and is ready (unless the query queue is empty). terom@29: LIST_FOREACH(*conn_ptr, &evsql->conn_list, entry) { terom@29: // we can only have a query enqueue itself if there is a non-trans conn it can later use terom@29: if (!(*conn_ptr)->trans) terom@29: have_nontrans = 1; terom@29: terom@29: // skip busy conns always terom@29: if (_evsql_conn_busy(*conn_ptr)) terom@29: continue; terom@29: terom@29: // accept pending conns as long as there are NO enqueued queries (might cause deadlock otherwise) terom@29: if (_evsql_conn_ready(*conn_ptr) == 0 && TAILQ_EMPTY(&evsql->query_queue)) terom@29: break; terom@29: terom@29: // accept conns that are in a fully ready state terom@29: if (_evsql_conn_ready(*conn_ptr) > 0) terom@29: break; terom@29: } terom@29: terom@29: // if we found an idle connection, we can just return that right away terom@29: if (*conn_ptr) terom@29: return 0; terom@29: terom@29: // return NULL if may_queue and we have a non-trans conn that we can, at some point, use terom@29: if (may_queue && have_nontrans) terom@29: return 0; terom@29: terom@29: // we need to open a new connection terom@29: if ((*conn_ptr = _evsql_conn_new(evsql)) == NULL) terom@29: goto error; terom@29: terom@29: // good terom@29: return 0; terom@29: error: terom@29: return -1; terom@29: } terom@29: terom@29: struct evsql_trans *evsql_trans (struct evsql *evsql, enum evsql_trans_type type, evsql_trans_error_cb error_fn, evsql_trans_ready_cb ready_fn, evsql_trans_done_cb done_fn, void *cb_arg) { terom@29: struct evsql_trans *trans = NULL; terom@29: terom@29: // allocate it terom@29: if ((trans = calloc(1, sizeof(*trans))) == NULL) terom@29: ERROR("calloc"); terom@29: terom@29: // store terom@29: trans->evsql = evsql; terom@29: trans->ready_fn = ready_fn; terom@29: trans->done_fn = done_fn; terom@29: trans->cb_arg = cb_arg; terom@29: trans->type = type; terom@29: terom@29: // find a connection terom@29: if (_evsql_conn_get(evsql, &trans->conn, 0)) terom@29: ERROR("_evsql_conn_get"); terom@29: terom@29: // associate the conn terom@29: trans->conn->trans = trans; terom@29: terom@29: // is it already ready? terom@29: if (_evsql_conn_ready(trans->conn) > 0) { terom@29: // call _evsql_trans_conn_ready directly, it will handle cleanup (silently, !error_fn) terom@29: if (_evsql_trans_conn_ready(evsql, trans)) { terom@29: // return NULL directly terom@29: return NULL; terom@29: } terom@29: terom@29: } else { terom@29: // otherwise, wait for the conn to be ready terom@29: terom@29: } terom@29: terom@29: // and let it pass errors to the user terom@29: trans->error_fn = error_fn; terom@29: terom@29: // ok terom@29: return trans; terom@29: terom@29: error: terom@29: free(trans); terom@29: terom@29: return NULL; terom@29: } terom@29: terom@29: /* terom@44: * Internal query functions terom@29: */ terom@45: struct evsql_query *_evsql_query_new (struct evsql *evsql, struct evsql_trans *trans, evsql_query_cb query_fn, void *cb_arg) { terom@29: struct evsql_query *query = NULL; terom@29: terom@29: // if it's part of a trans, then make sure the trans is idle terom@29: if (trans && trans->query) terom@29: ERROR("transaction is busy"); terom@29: terom@29: // allocate it terom@29: if ((query = calloc(1, sizeof(*query))) == NULL) terom@29: ERROR("calloc"); terom@29: terom@29: // store terom@29: query->cb_fn = query_fn; terom@29: query->cb_arg = cb_arg; terom@29: terom@29: // success terom@29: return query; terom@29: terom@29: error: terom@29: return NULL; terom@29: } terom@29: terom@45: int _evsql_query_enqueue (struct evsql *evsql, struct evsql_trans *trans, struct evsql_query *query, const char *command) { terom@29: // transaction queries are handled differently terom@29: if (trans) { terom@29: // it's an in-transaction query terom@29: assert(trans->query == NULL); terom@29: terom@29: // assign the query terom@29: trans->query = query; terom@29: terom@29: // execute directly terom@29: if (_evsql_query_exec(trans->conn, query, command)) { terom@29: // ack, fail the transaction terom@29: _evsql_trans_fail(trans); terom@29: terom@29: // caller frees query terom@29: goto error; terom@29: } terom@29: terom@29: } else { terom@29: struct evsql_conn *conn; terom@29: terom@29: // find an idle connection terom@29: if ((_evsql_conn_get(evsql, &conn, 1))) terom@29: ERROR("couldn't allocate a connection for the query"); terom@29: terom@29: // we must enqueue if no idle conn or the conn is not yet ready terom@29: if (conn && _evsql_conn_ready(conn) > 0) { terom@29: // execute directly terom@29: if (_evsql_query_exec(conn, query, command)) { terom@29: // ack, fail the connection terom@29: _evsql_conn_fail(conn); terom@29: terom@29: // make sure we don't deadlock any queries, but if this query got a conn directly, then we shouldn't terom@29: // have any queries enqueued anyways terom@29: assert(TAILQ_EMPTY(&evsql->query_queue)); terom@29: terom@29: // caller frees query terom@29: goto error; terom@29: } terom@29: terom@29: } else { terom@29: // copy the command for later execution terom@29: if ((query->command = strdup(command)) == NULL) terom@29: ERROR("strdup"); terom@29: terom@29: // enqueue until some connection pumps the queue terom@29: TAILQ_INSERT_TAIL(&evsql->query_queue, query, entry); terom@29: } terom@29: } terom@29: terom@29: // ok, good terom@29: return 0; terom@29: terom@29: error: terom@29: return -1; terom@29: } terom@29: terom@29: terom@45: void _evsql_trans_commit_res (struct evsql_result *res, void *arg) { terom@45: struct evsql_trans *trans = arg; terom@29: terom@29: // check for errors terom@29: if (res->error) terom@29: ERROR("transaction 'COMMIT' failed: %s", evsql_result_error(res)); terom@29: terom@29: // transaction is now done terom@45: trans->done_fn(trans, trans->cb_arg); terom@29: terom@29: // release it terom@45: _evsql_trans_release(trans); terom@29: terom@29: // success terom@29: return; terom@29: terom@29: error: terom@45: _evsql_trans_fail(trans); terom@29: } terom@29: terom@29: int evsql_trans_commit (struct evsql_trans *trans) { terom@29: static const char *sql = "COMMIT TRANSACTION"; terom@29: terom@29: if (trans->query) terom@29: ERROR("cannot COMMIT because transaction is still busy"); terom@29: terom@29: // query terom@47: if (evsql_query(trans->evsql, trans, sql, _evsql_trans_commit_res, trans) == NULL) terom@29: goto error; terom@29: terom@29: // mark it as commited in case someone wants to abort it terom@29: trans->has_commit = 1; terom@29: terom@29: // success terom@29: return 0; terom@29: terom@29: error: terom@29: return -1; terom@29: } terom@29: terom@45: void _evsql_trans_rollback_res (struct evsql_result *res, void *arg) { terom@45: struct evsql_trans *trans = arg; terom@29: terom@29: // fail the connection on errors terom@29: if (res->error) terom@29: ERROR("transaction 'ROLLBACK' failed: %s", evsql_result_error(res)); terom@29: terom@29: // release it terom@45: _evsql_trans_release(trans); terom@29: terom@29: // success terom@29: return; terom@29: terom@29: error: terom@29: // fail the connection too, errors are supressed terom@45: _evsql_trans_fail(trans); terom@29: } terom@29: terom@29: /* terom@29: * Used as the ready_fn callback in case of abort, otherwise directly terom@29: */ terom@45: void _evsql_trans_rollback (struct evsql_trans *trans, void *arg) { terom@29: static const char *sql = "ROLLBACK TRANSACTION"; terom@29: terom@45: (void) arg; terom@29: terom@29: // query terom@45: if (evsql_query(trans->evsql, trans, sql, _evsql_trans_rollback_res, trans) == NULL) { terom@45: // fail the transaction/connection, errors are supressed terom@29: _evsql_trans_fail(trans); terom@29: } terom@29: terom@29: } terom@29: terom@29: void evsql_trans_abort (struct evsql_trans *trans) { terom@29: // supress errors terom@29: trans->error_fn = NULL; terom@29: terom@29: if (trans->has_commit) { terom@29: // abort after commit doesn't make sense terom@29: FATAL("transaction was already commited"); terom@29: } terom@29: terom@29: if (trans->query) { terom@29: // gah, some query is running terom@29: WARNING("aborting pending query"); terom@29: terom@45: // prepare to rollback once complete by hijacking ready_fn terom@29: trans->ready_fn = _evsql_trans_rollback; terom@29: terom@29: // abort terom@29: evsql_query_abort(trans, trans->query); terom@29: terom@29: } else { terom@29: // just rollback directly terom@29: _evsql_trans_rollback(trans, NULL); terom@29: terom@29: } terom@29: } terom@29: