# HG changeset patch # User Tero Marttila # Date 1223831447 -10800 # Node ID 99a41f48e29bb63340756a4b7f3ee78eefe9114c # Parent 82cfdb6680d12df4c06f3996f9c2ebfd3b8378a8 evsql transactions, it compiles... diff -r 82cfdb6680d1 -r 99a41f48e29b Makefile --- a/Makefile Sun Oct 12 14:57:06 2008 +0300 +++ b/Makefile Sun Oct 12 20:10:47 2008 +0300 @@ -21,7 +21,7 @@ bin/simple_hello: obj/evfuse.o obj/dirbuf.o obj/lib/log.o obj/lib/signals.o obj/simple.o bin/evpq_test: obj/evpq.o obj/lib/log.o bin/url_test: obj/lib/url.o obj/lib/lex.o obj/lib/log.o -bin/dbfs: obj/evsql.o obj/evpq.o obj/evfuse.o obj/dirbuf.o obj/lib/log.o obj/lib/signals.o +bin/dbfs: obj/evsql.o obj/evsql_util.o obj/evpq.o obj/evfuse.o obj/dirbuf.o obj/lib/log.o obj/lib/signals.o # computed LDFLAGS = ${LIBRARY_PATHS} ${LIBRARY_LIST} diff -r 82cfdb6680d1 -r 99a41f48e29b src/dbfs.c --- a/src/dbfs.c Sun Oct 12 14:57:06 2008 +0300 +++ b/src/dbfs.c Sun Oct 12 20:10:47 2008 +0300 @@ -56,51 +56,98 @@ } +/* + * Check the result set. + * + * Returns; + * -1 if the query failed, the columns do not match, or there are too many/few rows + * 0 the results match + * 1 there were no results + */ +int _dbfs_check_res (const struct evsql_result_info *res, size_t rows, size_t cols) { + int err = 0; + + // check if it failed + if (res->error) + NERROR(evsql_result_error(res)); + + // not found? + if (evsql_result_rows(res) == 0) + SERROR(err = 1); + + // duplicate rows? + if (evsql_result_rows(res) != rows) + ERROR("multiple rows returned"); + + // correct number of columns + if (evsql_result_cols(res) != 5) + ERROR("wrong number of columns: %zu", evsql_result_cols(res)); + + // good + return 0; + +error: + if (!err) + err = -1; + + return err; +} + +int _dbfs_stat_info (struct stat *st, const struct evsql_result_info *res, size_t row, size_t col_offset) { + int err = 0; + + uint16_t mode; + uint64_t size, nlink; + const char *type; + + // extract the data + if (0 + || evsql_result_string(res, row, col_offset + 0, &type, 0 ) // inodes.type + || evsql_result_uint16(res, row, col_offset + 1, &mode, 0 ) // inodes.mode + || evsql_result_uint64(res, row, col_offset + 2, &size, 0 ) // inodes.size + || evsql_result_uint64(res, row, col_offset + 3, &nlink, 0 ) // count(*) + ) + EERROR(err = EIO, "invalid db data"); + + INFO("\tst_mode=S_IF%s | %ho, st_nlink=%llu, st_size=%llu", type, mode, (long long unsigned int) nlink, (long long unsigned int) size); + + // convert and store + st->st_mode = _dbfs_mode(type) | mode; + st->st_nlink = nlink; + st->st_size = size; + + // good + return 0; + +error: + return -1; +} + void _dbfs_lookup_result (const struct evsql_result_info *res, void *arg) { struct fuse_req *req = arg; struct fuse_entry_param e; ZINIT(e); int err = 0; - uint16_t mode; uint32_t ino; - uint64_t size, nlink; - const char *type; - // check if it failed - if (res->error && (err = EIO)) - NERROR(evsql_result_error(res)); - - // duplicate rows? - if (evsql_result_rows(res) > 1) - EERROR(err = EIO, "multiple rows returned"); - - // not found? - if (evsql_result_rows(res) == 0) - SERROR(err = ENOENT); - - // correct number of columns - if (evsql_result_cols(res) != 5) - EERROR(err = EIO, "wrong number of columns: %zu", evsql_result_cols(res)); + // check the results + if ((err = _dbfs_check_res(res, 1, 5))) + SERROR(err = (err == 1 ? ENOENT : EIO)); // get the data if (0 || evsql_result_uint32(res, 0, 0, &ino, 0 ) // inodes.ino - || evsql_result_string(res, 0, 1, &type, 0 ) // inodes.type - || evsql_result_uint16(res, 0, 2, &mode, 0 ) // inodes.mode - || evsql_result_uint64(res, 0, 3, &size, 0 ) // inodes.size - || evsql_result_uint64(res, 0, 4, &nlink, 0 ) // count(*) ) EERROR(err = EIO, "invalid db data"); + + INFO("[dbfs.lookup] -> ion=%u", ino); - INFO("[dbfs.look] -> ino=%u, st_mode=S_IF%s | %ho, st_nlink=%llu, st_size=%llu", ino, type, mode, (long long unsigned int) nlink, (long long unsigned int) size); + // stat attrs + if (_dbfs_stat_info(&e.attr, res, 0, 1)) + goto error; - // convert and store + // other attrs e.ino = e.attr.st_ino = ino; - e.attr.st_mode = _dbfs_mode(type) | mode; - e.attr.st_nlink = nlink; - e.attr.st_size = size; - - // XXX: timeouts e.attr_timeout = CACHE_TIMEOUT; e.entry_timeout = CACHE_TIMEOUT; @@ -127,7 +174,7 @@ "SELECT" " inodes.ino, inodes.type, inodes.mode, inodes.size, count(*)" " FROM file_tree INNER JOIN inodes ON (file_tree.inode = inodes.ino)" - " WHERE file_tree.parent = $1::int AND file_tree.name = $2::varchar" + " WHERE file_tree.parent = $1::int4 AND file_tree.name = $2::varchar" " GROUP BY inodes.ino, inodes.type, inodes.mode, inodes.size"; static struct evsql_query_params params = EVSQL_PARAMS(EVSQL_FMT_BINARY) { @@ -145,7 +192,7 @@ EERROR(err = EIO, "evsql_param_*"); // query - if (evsql_query_params(ctx->db, sql, ¶ms, _dbfs_lookup_result, req) == NULL) + if (evsql_query_params(ctx->db, NULL, sql, ¶ms, _dbfs_lookup_result, req) == NULL) EERROR(err = EIO, "evsql_query_params"); // XXX: handle interrupts @@ -158,11 +205,85 @@ EWARNING(err, "fuse_reply_err"); } +void _dbfs_getattr_result (const struct evsql_result_info *res, void *arg) { + struct fuse_req *req = arg; + struct stat st; ZINIT(st); + int err = 0; + + // check the results + if ((err = _dbfs_check_res(res, 1, 4))) + SERROR(err = (err == 1 ? ENOENT : EIO)); + + INFO("[dbfs.getattr %p] -> (stat follows)", req); + + // stat attrs + if (_dbfs_stat_info(&st, res, 0, 0)) + goto error; + + // XXX: we don't have the ino + st.st_ino = 0; + + // reply + if ((err = fuse_reply_attr(req, &st, CACHE_TIMEOUT))) + EERROR(err, "fuse_reply_entry"); + +error: + if (err && (err = fuse_reply_err(req, err))) + EWARNING(err, "fuse_reply_err"); + + // free + evsql_result_free(res); +} + +static void dbfs_getattr (struct fuse_req *req, fuse_ino_t ino, struct fuse_file_info *fi) { + struct dbfs *ctx = fuse_req_userdata(req); + int err; + + (void) fi; + + INFO("[dbfs.getattr %p] ino=%lu", req, ino); + + const char *sql = + "SELECT" + " inodes.type, inodes.mode, inodes.size, count(*)" + " FROM inodes" + " WHERE inodes.ino = ‰1::int4" + " GROUP BY inodes.type, inodes.mode, inodes.size"; + + static struct evsql_query_params params = EVSQL_PARAMS(EVSQL_FMT_BINARY) { + EVSQL_PARAM ( UINT32 ), + + EVSQL_PARAMS_END + }; + + // build params + if (0 + || evsql_param_uint32(¶ms, 0, ino) + ) + SERROR(err = EIO); + + // query + if (evsql_query_params(ctx->db, NULL, sql, ¶ms, _dbfs_getattr_result, req) == NULL) + SERROR(err = EIO); + + // XXX: handle interrupts + + // wait + return; + +error: + if ((err = fuse_reply_err(req, err))) + EWARNING(err, "fuse_reply_err"); +} + struct fuse_lowlevel_ops dbfs_llops = { + .init = dbfs_init, .destroy = dbfs_destroy, .lookup = dbfs_lookup, + + .getattr = dbfs_getattr, }; void dbfs_sql_error (struct evsql *evsql, void *arg) { diff -r 82cfdb6680d1 -r 99a41f48e29b src/evpq.c --- a/src/evpq.c Sun Oct 12 14:57:06 2008 +0300 +++ b/src/evpq.c Sun Oct 12 20:10:47 2008 +0300 @@ -239,12 +239,8 @@ return conn; error: - if (conn) { - if (conn->pg_conn) - PQfinish(conn->pg_conn); - - free(conn); - } + if (conn) + evpq_release(conn); return NULL; } @@ -319,6 +315,16 @@ } +void evpq_release (struct evpq_conn *conn) { + if (conn->ev) + event_free(conn->ev); + + if (conn->pg_conn) + PQfinish(conn->pg_conn); + + free(conn); +} + enum evpq_state evpq_state (struct evpq_conn *conn) { return conn->state; } diff -r 82cfdb6680d1 -r 99a41f48e29b src/evpq.h --- a/src/evpq.h Sun Oct 12 14:57:06 2008 +0300 +++ b/src/evpq.h Sun Oct 12 20:10:47 2008 +0300 @@ -101,5 +101,11 @@ // convenience wrappers #define evpq_error_message(conn) PQerrorMessage(evpq_pgconn(conn)) +/* + * Release the evpq_conn, closing all connections and freeing all resources. + * + * You must call this yourself in all cases after evpq_connect returns an evpq_conn. + */ +void evpq_release (struct evpq_conn *conn); #endif /* EVPQ_H */ diff -r 82cfdb6680d1 -r 99a41f48e29b src/evsql.c --- a/src/evsql.c Sun Oct 12 14:57:06 2008 +0300 +++ b/src/evsql.c Sun Oct 12 20:10:47 2008 +0300 @@ -1,82 +1,31 @@ #define _GNU_SOURCE #include -#include #include #include #include "evsql.h" +#include "evsql_internal.h" #include "evpq.h" #include "lib/log.h" #include "lib/error.h" #include "lib/misc.h" -enum evsql_type { - EVSQL_EVPQ, -}; - -struct evsql { - // callbacks - evsql_error_cb error_fn; - void *cb_arg; - - // backend engine - enum evsql_type type; - - union { - struct evpq_conn *evpq; - } engine; - - // list of queries running or waiting to run - TAILQ_HEAD(evsql_queue, evsql_query) queue; -}; - -struct evsql_query { - // the evsql we are querying - struct evsql *evsql; - - // the actual SQL query, this may or may not be ours, see _evsql_query_exec - char *command; - - // possible query params - struct evsql_query_param_info { - int count; - - Oid *types; - const char **values; - int *lengths; - int *formats; - - int result_format; - } params; - - // our callback - evsql_query_cb cb_fn; - void *cb_arg; - - // our position in the query list - TAILQ_ENTRY(evsql_query) entry; - - // the result - union { - PGresult *evpq; - } result; -}; - /* * Actually execute the given query. * * The backend should be able to accept the query at this time. * - * query->command must be valid during the execution of this function, but once it returns, the command is not needed - * anymore, and should be set to NULL. + * You should assume that if trying to execute a query fails, then the connection should also be considred as failed. */ -static int _evsql_query_exec (struct evsql *evsql, struct evsql_query *query, const char *command) { - switch (evsql->type) { +static int _evsql_query_exec (struct evsql_conn *conn, struct evsql_query *query, const char *command) { + int err; + + switch (conn->evsql->type) { case EVSQL_EVPQ: // got params? if (query->params.count) { - return evpq_query_params(evsql->engine.evpq, command, + err = evpq_query_params(conn->engine.evpq, command, query->params.count, query->params.types, query->params.values, @@ -87,16 +36,33 @@ } else { // plain 'ole query - return evpq_query(evsql->engine.evpq, command); + err = evpq_query(conn->engine.evpq, command); } + + if (err) { + if (PQstatus(evpq_pgconn(conn->engine.evpq)) != CONNECTION_OK) + WARNING("conn failed"); + else + WARNING("query failed, dropping conn as well"); + } + + break; default: FATAL("evsql->type"); } + + if (!err) + // assign the query + conn->query = query; + + return err; } /* - * Free the query and related resources, doesn't trigger any callbacks or remove from any queues + * Free the query and related resources, doesn't trigger any callbacks or remove from any queues. + * + * The command should already be taken care of (NULL). */ static void _evsql_query_free (struct evsql_query *query) { assert(query->command == NULL); @@ -112,12 +78,9 @@ } /* - * Dequeue the query, execute the callback, and free it. + * Execute the callback if res is given, and free the query. */ static void _evsql_query_done (struct evsql_query *query, const struct evsql_result_info *res) { - // dequeue - TAILQ_REMOVE(&query->evsql->queue, query, entry); - if (res) // call the callback query->cb_fn(res, query->cb_arg); @@ -127,77 +90,263 @@ } /* - * A query has failed, notify the user and remove it. - */ -static void _evsql_query_failure (struct evsql *evsql, struct evsql_query *query) { - struct evsql_result_info res; ZINIT(res); - - // set up the result_info - res.evsql = evsql; - res.error = 1; - - // finish it off - _evsql_query_done(query, &res); -} - -/* - * Clear every enqueued query and then free the evsql. - * - * If result_info is given, each query will also recieve it via their callback, and the error_fn will be called. - */ + * XXX: + * / static void _evsql_destroy (struct evsql *evsql, const struct evsql_result_info *res) { struct evsql_query *query; // clear the queue - while ((query = TAILQ_FIRST(&evsql->queue)) != NULL) { + while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) { _evsql_query_done(query, res); - TAILQ_REMOVE(&evsql->queue, query, entry); + TAILQ_REMOVE(&evsql->query_queue, query, entry); } - // do the error callback if required - if (res) - evsql->error_fn(evsql, evsql->cb_arg); - // free free(evsql); } - +*/ /* - * Sends the next query if there are more enqueued + * Free the transaction, it should already be deassociated from the query and conn. */ -static void _evsql_pump (struct evsql *evsql) { - struct evsql_query *query; +static void _evsql_trans_free (struct evsql_trans *trans) { + // ensure we don't leak anything + assert(trans->query == NULL); + assert(trans->conn == NULL); - // look for the next query - if ((query = TAILQ_FIRST(&evsql->queue)) != NULL) { - // try and execute it - if (_evsql_query_exec(evsql, query, query->command)) { - // the query failed - _evsql_query_failure(evsql, query); + // free + free(trans); +} + +/* + * Release a connection. It should already be deassociated from the trans and query. + * + * Releases the engine, removes from the conn_list and frees this. + */ +static void _evsql_conn_release (struct evsql_conn *conn) { + // ensure we don't leak anything + assert(conn->trans == NULL); + assert(conn->query == NULL); + + // release the engine + switch (conn->evsql->type) { + case EVSQL_EVPQ: + evpq_release(conn->engine.evpq); + break; + + default: + FATAL("evsql->type"); + } + + // remove from list + LIST_REMOVE(conn, entry); + + // free + free(conn); +} + +/* + * Fail a single query, this will trigger the callback and free it. + */ +static void _evsql_query_fail (struct evsql* evsql, struct evsql_query *query) { + struct evsql_result_info res; ZINIT(res); + + // set up the result_info + res.evsql = evsql; + res.error = 1; + + // finish off the query + _evsql_query_done(query, &res); +} + +/* + * Fail a transaction, this will silently drop any query, trigger the error callback, two-way-deassociate/release the + * conn, and then free the trans. + */ +static void _evsql_trans_fail (struct evsql_trans *trans) { + if (trans->query) { + // free the query silently + _evsql_query_free(trans->query); trans->query = NULL; + } + + // tell the user + // XXX: trans is in a bad state during this call + trans->error_fn(trans, trans->cb_arg); + + // fail the conn + trans->conn->trans = NULL; _evsql_conn_release(trans->conn); trans->conn = NULL; + + // free the trans + _evsql_trans_free(trans); +} + +/* + * Fail a connection. If the connection is transactional, this will just call _evsql_trans_fail, but otherwise it will + * fail any ongoing query, and then release the connection. + */ +static void _evsql_conn_fail (struct evsql_conn *conn) { + if (conn->trans) { + // let transactions handle their connection failures + _evsql_trans_fail(conn->trans); + + } else { + if (conn->query) { + // fail the in-progress query + _evsql_query_fail(conn->evsql, conn->query); conn->query = NULL; } - // free the command - free(query->command); query->command = NULL; - - // ok, then we just wait + // finish off the whole connection + _evsql_conn_release(conn); } } +/* + * Processes enqueued non-transactional queries until the queue is empty, or we managed to exec a query. + * + * If execing a query on a connection fails, both the query and the connection are failed (in that order). + * + * Any further queries will then also be failed, because there's no reconnection/retry logic yet. + */ +static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn) { + struct evsql_query *query; + int err; + + // look for waiting queries + while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) { + // dequeue + TAILQ_REMOVE(&evsql->query_queue, query, entry); + + if (conn) { + // try and execute it + err = _evsql_query_exec(conn, query, query->command); + } -static void _evsql_evpq_connected (struct evpq_conn *conn, void *arg) { - struct evsql *evsql = arg; + // free the command buf + free(query->command); query->command = NULL; - // no state to update, just pump any waiting queries - _evsql_pump(evsql); + if (err || !conn) { + if (!conn) { + // warn when dropping queries + WARNING("failing query becuse there are no conns"); + } + + // fail the query + _evsql_query_fail(evsql, query); + + if (conn) { + // fail the connection + WARNING("failing the connection because a query-exec failed"); + + _evsql_conn_fail(conn); conn = NULL; + } + + } else { + // we have succesfully enqueued a query, and we can wait for this connection to complete + break; + + } + + // handle the rest of the queue + } + + // ok + return; } -static void _evsql_evpq_result (struct evpq_conn *conn, PGresult *result, void *arg) { - struct evsql *evsql = arg; - struct evsql_query *query; +/* + * Callback for a trans's 'BEGIN' query, which means the transaction is now ready for use. + */ +static void _evsql_trans_ready (const struct evsql_result_info *res, void *arg) { + (void) arg; - assert((query = TAILQ_FIRST(&evsql->queue)) != NULL); + assert(res->trans); + + // check for errors + if (res->error) + ERROR("transaction 'BEGIN' failed: %s", evsql_result_error(res)); + + // transaction is now ready for use + res->trans->ready_fn(res->trans, res->trans->cb_arg); + +error: + _evsql_trans_fail(res->trans); +} + +/* + * The transaction's connection is ready, send the 'BEGIN' query. + */ +static void _evsql_trans_conn_ready (struct evsql *evsql, struct evsql_trans *trans) { + char trans_sql[EVSQL_QUERY_BEGIN_BUF]; + const char *isolation_level; + int ret; + + // determine the isolation_level to use + switch (trans->type) { + case EVSQL_TRANS_DEFAULT: + isolation_level = NULL; break; + + case EVSQL_TRANS_SERIALIZABLE: + isolation_level = "SERIALIZABLE"; break; + + case EVSQL_TRANS_REPEATABLE_READ: + isolation_level = "REPEATABLE READ"; break; + + case EVSQL_TRANS_READ_COMMITTED: + isolation_level = "READ COMMITTED"; break; + + case EVSQL_TRANS_READ_UNCOMMITTED: + isolation_level = "READ UNCOMMITTED"; break; + + default: + FATAL("trans->type: %d", trans->type); + } + + // build the trans_sql + if (isolation_level) + ret = snprintf(trans_sql, EVSQL_QUERY_BEGIN_BUF, "BEGIN TRANSACTION ISOLATION LEVEL %s", isolation_level); + else + ret = snprintf(trans_sql, EVSQL_QUERY_BEGIN_BUF, "BEGIN TRANSACTION"); + + // make sure it wasn't truncated + if (ret >= EVSQL_QUERY_BEGIN_BUF) + ERROR("trans_sql overflow: %d >= %d", ret, EVSQL_QUERY_BEGIN_BUF); + + // execute the query + if (evsql_query(evsql, trans, trans_sql, _evsql_trans_ready, NULL)) + ERROR("evsql_query"); + + // success + return; + +error: + // fail the transaction + _evsql_trans_fail(trans); +} + +/* + * The evpq connection was succesfully established. + */ +static void _evsql_evpq_connected (struct evpq_conn *_conn, void *arg) { + struct evsql_conn *conn = arg; + + if (conn->trans) + // notify the transaction + _evsql_trans_conn_ready(conn->evsql, conn->trans); + + else + // pump any waiting transactionless queries + _evsql_pump(conn->evsql, conn); +} + +/* + * Got one result on this evpq connection. + */ +static void _evsql_evpq_result (struct evpq_conn *_conn, PGresult *result, void *arg) { + struct evsql_conn *conn = arg; + struct evsql_query *query = conn->query; + + assert(query != NULL); // if we get multiple results, only return the first one if (query->result.evpq) { @@ -210,15 +359,18 @@ query->result.evpq = result; } -static void _evsql_evpq_done (struct evpq_conn *conn, void *arg) { - struct evsql *evsql = arg; - struct evsql_query *query; +/* + * No more results for this query. + */ +static void _evsql_evpq_done (struct evpq_conn *_conn, void *arg) { + struct evsql_conn *conn = arg; + struct evsql_query *query = conn->query; struct evsql_result_info res; ZINIT(res); - - assert((query = TAILQ_FIRST(&evsql->queue)) != NULL); + + assert(query != NULL); // set up the result_info - res.evsql = evsql; + res.evsql = conn->evsql; if (query->result.evpq == NULL) { // if a query didn't return any results (bug?), warn and fail the query @@ -237,27 +389,39 @@ } - // finish it off - _evsql_query_done(query, &res); + // de-associate the query from the connection + conn->query = NULL; + + // how we handle query completion depends on if we're a transaction or not + if (conn->trans) { + // we can deassign the trans's query + conn->trans->query = NULL; - // pump the next one - _evsql_pump(evsql); + // then hand the query to the user + _evsql_query_done(query, &res); + + } else { + // a transactionless query, so just finish it off and pump any other waiting ones + _evsql_query_done(query, &res); + + // pump the next one + _evsql_pump(conn->evsql, conn); + } } -static void _evsql_evpq_failure (struct evpq_conn *conn, void *arg) { - struct evsql *evsql = arg; - struct evsql_result_info result; ZINIT(result); - - // OH SHI... +/* + * The connection failed. + */ +static void _evsql_evpq_failure (struct evpq_conn *_conn, void *arg) { + struct evsql_conn *conn = arg; - // set up the result_info - result.evsql = evsql; - result.error = 1; - - // finish off the whole connection - _evsql_destroy(evsql, &result); + // just fail the conn + _evsql_conn_fail(conn); } +/* + * Our evpq behaviour + */ static struct evpq_callback_info _evsql_evpq_cb_info = { .fn_connected = _evsql_evpq_connected, .fn_result = _evsql_evpq_result, @@ -265,7 +429,10 @@ .fn_failure = _evsql_evpq_failure, }; -static struct evsql *_evsql_new_base (evsql_error_cb error_fn, void *cb_arg) { +/* + * Allocate the generic evsql context. + */ +static struct evsql *_evsql_new_base (struct event_base *ev_base, evsql_error_cb error_fn, void *cb_arg) { struct evsql *evsql = NULL; // allocate it @@ -273,11 +440,13 @@ ERROR("calloc"); // store + evsql->ev_base = ev_base; evsql->error_fn = error_fn; evsql->cb_arg = cb_arg; // init - TAILQ_INIT(&evsql->queue); + LIST_INIT(&evsql->conn_list); + TAILQ_INIT(&evsql->query_queue); // done return evsql; @@ -286,15 +455,55 @@ return NULL; } +/* + * Start a new connection and add it to the list, it won't be ready until _evsql_evpq_connected is called + */ +static struct evsql_conn *_evsql_conn_new (struct evsql *evsql) { + struct evsql_conn *conn = NULL; + + // allocate + if ((conn = calloc(1, sizeof(*conn))) == NULL) + ERROR("calloc"); + + // init + conn->evsql = evsql; + + // connect the engine + switch (evsql->type) { + case EVSQL_EVPQ: + if ((conn->engine.evpq = evpq_connect(evsql->ev_base, evsql->engine_conf.evpq, _evsql_evpq_cb_info, conn)) == NULL) + goto error; + + break; + + default: + FATAL("evsql->type"); + } + + // add it to the list + LIST_INSERT_HEAD(&evsql->conn_list, conn, entry); + + // success + return conn; + +error: + free(conn); + + return NULL; +} + struct evsql *evsql_new_pq (struct event_base *ev_base, const char *pq_conninfo, evsql_error_cb error_fn, void *cb_arg) { struct evsql *evsql = NULL; // base init - if ((evsql = _evsql_new_base (error_fn, cb_arg)) == NULL) + if ((evsql = _evsql_new_base (ev_base, error_fn, cb_arg)) == NULL) goto error; - // connect the engine - if ((evsql->engine.evpq = evpq_connect(ev_base, pq_conninfo, _evsql_evpq_cb_info, evsql)) == NULL) + // store conf + evsql->engine_conf.evpq = pq_conninfo; + + // pre-create one connection + if (_evsql_conn_new(evsql) == NULL) goto error; // done @@ -308,50 +517,123 @@ } /* - * Checks what the state of the connection is in regards to executing a query. + * Checks if the connection is already allocated for some other trans/query. * * Returns: - * <0 connection failure, query not possible - * 0 connection idle, can query immediately - * 1 connection busy, must queue query + * 0 connection idle, can be allocated + * >1 connection busy */ -static int _evsql_query_busy (struct evsql *evsql) { - switch (evsql->type) { +static int _evsql_conn_busy (struct evsql_conn *conn) { + // transactions get the connection to themselves + if (conn->trans) + return 1; + + // if it has a query assigned, it's busy + if (conn->query) + return 1; + + // otherwise, it's all idle + return 0; +} + +/* + * Checks if the connection is ready for use (i.e. _evsql_evpq_connected was called). + * + * The connection should not already have a query running. + * + * Returns + * <0 the connection is not valid (failed, query in progress) + * 0 the connection is still pending, and will become ready at some point + * >0 it's ready + */ +static int _evsql_conn_ready (struct evsql_conn *conn) { + switch (conn->evsql->type) { case EVSQL_EVPQ: { - enum evpq_state state = evpq_state(evsql->engine.evpq); + enum evpq_state state = evpq_state(conn->engine.evpq); switch (state) { case EVPQ_CONNECT: - case EVPQ_QUERY: - return 1; + return 0; case EVPQ_CONNECTED: - return 0; + return 1; + case EVPQ_QUERY: case EVPQ_INIT: case EVPQ_FAILURE: return -1; default: - FATAL("evpq_state"); + FATAL("evpq_state: %d", state); } } default: - FATAL("evsql->type"); + FATAL("evsql->type: %d", conn->evsql->type); } } -static struct evsql_query *_evsql_query_new (struct evsql *evsql, evsql_query_cb query_fn, void *cb_arg) { +/* + * Allocate a connection for use and return it via *conn_ptr, or if may_queue is nonzero and the connection pool is + * getting full, return NULL (query should be queued). + * + * Note that the returned connection might not be ready for use yet (if we created a new one, see _evsql_conn_ready). + * + * Returns zero if a connection was found or the request should be queued, or nonzero if something failed and the + * request should be dropped. + */ +static int _evsql_conn_get (struct evsql *evsql, struct evsql_conn **conn_ptr, int may_queue) { + *conn_ptr = NULL; + + // find a connection that isn't busy and is ready (unless the query queue is empty). + LIST_FOREACH(*conn_ptr, &evsql->conn_list, entry) { + // skip busy conns always + if (_evsql_conn_busy(*conn_ptr)) + continue; + + // accept pending conns as long as there are NO enqueued queries (might cause deadlock otherwise) + if (_evsql_conn_ready(*conn_ptr) == 0 && TAILQ_EMPTY(&evsql->query_queue)) + break; + + // accept conns that are in a fully ready state + if (_evsql_conn_ready(*conn_ptr) > 0) + break; + } + + // if we found an idle connection, we can just return that right away + if (*conn_ptr) + return 0; + + // return NULL if may_queue and the conn list is not empty + if (may_queue && !LIST_EMPTY(&evsql->conn_list)) + return 0; + + // we need to open a new connection + if ((*conn_ptr = _evsql_conn_new(evsql)) == NULL) + goto error; + + // good + return 0; +error: + return -1; +} + +/* + * Validate and allocate the basic stuff for a new query. + */ +static struct evsql_query *_evsql_query_new (struct evsql *evsql, struct evsql_trans *trans, evsql_query_cb query_fn, void *cb_arg) { struct evsql_query *query; + // if it's part of a trans, then make sure the trans is idle + if (trans && trans->query) + ERROR("transaction is busy"); + // allocate it if ((query = calloc(1, sizeof(*query))) == NULL) ERROR("calloc"); // store - query->evsql = evsql; query->cb_fn = query_fn; query->cb_arg = cb_arg; @@ -362,29 +644,48 @@ return NULL; } -static int _evsql_query_enqueue (struct evsql *evsql, struct evsql_query *query, const char *command) { - int busy; - - // check state - if ((busy = _evsql_query_busy(evsql)) < 0) - ERROR("connection is not valid"); - - if (busy) { - // copy the command for later execution - if ((query->command = strdup(command)) == NULL) - ERROR("strdup"); +/* + * Handle a new query. + * + * For transactions this will associate the query and then execute it, otherwise this will either find an idle + * connection and send the query, or enqueue it. + */ +static int _evsql_query_enqueue (struct evsql *evsql, struct evsql_trans *trans, struct evsql_query *query, const char *command) { + // transaction queries are handled differently + if (trans) { + // it's an in-transaction query + assert(trans->query == NULL); + + // assign the query + trans->query = query; + + // execute directly + if (_evsql_query_exec(trans->conn, query, command)) + goto error; } else { - assert(TAILQ_EMPTY(&evsql->queue)); + struct evsql_conn *conn; + + // find an idle connection + if ((_evsql_conn_get(evsql, &conn, 1))) + ERROR("couldn't allocate a connection for the query"); - // execute directly - if (_evsql_query_exec(evsql, query, command)) - goto error; + // we must enqueue if no idle conn or the conn is not yet ready + if (conn && _evsql_conn_ready(conn) > 0) { + // execute directly + if (_evsql_query_exec(conn, query, command)) + goto error; + + } else { + // copy the command for later execution + if ((query->command = strdup(command)) == NULL) + ERROR("strdup"); + + // enqueue until some connection pumps the queue + TAILQ_INSERT_TAIL(&evsql->query_queue, query, entry); + } } - - // store it on the list - TAILQ_INSERT_TAIL(&evsql->queue, query, entry); // ok, good return 0; @@ -393,15 +694,15 @@ return -1; } -struct evsql_query *evsql_query (struct evsql *evsql, const char *command, evsql_query_cb query_fn, void *cb_arg) { +struct evsql_query *evsql_query (struct evsql *evsql, struct evsql_trans *trans, const char *command, evsql_query_cb query_fn, void *cb_arg) { struct evsql_query *query = NULL; // alloc new query - if ((query = _evsql_query_new(evsql, query_fn, cb_arg)) == NULL) + if ((query = _evsql_query_new(evsql, trans, query_fn, cb_arg)) == NULL) goto error; // just execute the command string directly - if (_evsql_query_enqueue(evsql, query, command)) + if (_evsql_query_enqueue(evsql, trans, query, command)) goto error; // ok @@ -413,13 +714,13 @@ return NULL; } -struct evsql_query *evsql_query_params (struct evsql *evsql, const char *command, const struct evsql_query_params *params, evsql_query_cb query_fn, void *cb_arg) { +struct evsql_query *evsql_query_params (struct evsql *evsql, struct evsql_trans *trans, const char *command, const struct evsql_query_params *params, evsql_query_cb query_fn, void *cb_arg) { struct evsql_query *query = NULL; const struct evsql_query_param *param; int idx; // alloc new query - if ((query = _evsql_query_new(evsql, query_fn, cb_arg)) == NULL) + if ((query = _evsql_query_new(evsql, trans, query_fn, cb_arg)) == NULL) goto error; // count the params @@ -464,7 +765,7 @@ } // execute it - if (_evsql_query_enqueue(evsql, query, command)) + if (_evsql_query_enqueue(evsql, trans, query, command)) goto error; // ok @@ -476,166 +777,3 @@ return NULL; } -int evsql_param_string (struct evsql_query_params *params, size_t param, const char *ptr) { - struct evsql_query_param *p = ¶ms->list[param]; - - assert(p->type == EVSQL_PARAM_STRING); - - p->data_raw = ptr; - p->length = 0; - - return 0; -} - -int evsql_param_uint32 (struct evsql_query_params *params, size_t param, uint32_t uval) { - struct evsql_query_param *p = ¶ms->list[param]; - - assert(p->type == EVSQL_PARAM_UINT32); - - p->data.uint32 = htonl(uval); - p->data_raw = (const char *) &p->data.uint32; - p->length = sizeof(uval); - - return 0; -} - -const char *evsql_result_error (const struct evsql_result_info *res) { - if (!res->error) - return "No error"; - - switch (res->evsql->type) { - case EVSQL_EVPQ: - if (!res->result.pq) - return "unknown error (no result)"; - - return PQresultErrorMessage(res->result.pq); - - default: - FATAL("res->evsql->type"); - } - -} - -size_t evsql_result_rows (const struct evsql_result_info *res) { - switch (res->evsql->type) { - case EVSQL_EVPQ: - return PQntuples(res->result.pq); - - default: - FATAL("res->evsql->type"); - } -} - -size_t evsql_result_cols (const struct evsql_result_info *res) { - switch (res->evsql->type) { - case EVSQL_EVPQ: - return PQnfields(res->result.pq); - - default: - FATAL("res->evsql->type"); - } -} - -int evsql_result_binary (const struct evsql_result_info *res, size_t row, size_t col, const char **ptr, size_t size, int nullok) { - *ptr = NULL; - - switch (res->evsql->type) { - case EVSQL_EVPQ: - if (PQgetisnull(res->result.pq, row, col)) { - if (nullok) - return 0; - else - ERROR("[%zu:%zu] field is null", row, col); - } - - if (PQfformat(res->result.pq, col) != 1) - ERROR("[%zu:%zu] PQfformat is not binary: %d", row, col, PQfformat(res->result.pq, col)); - - if (size && PQgetlength(res->result.pq, row, col) != size) - ERROR("[%zu:%zu] field size mismatch: %zu -> %d", row, col, size, PQgetlength(res->result.pq, row, col)); - - *ptr = PQgetvalue(res->result.pq, row, col); - - return 0; - - default: - FATAL("res->evsql->type"); - } - -error: - return -1; -} - -int evsql_result_string (const struct evsql_result_info *res, size_t row, size_t col, const char **ptr, int nullok) { - return evsql_result_binary(res, row, col, ptr, 0, nullok); -} - -int evsql_result_uint16 (const struct evsql_result_info *res, size_t row, size_t col, uint16_t *uval, int nullok) { - const char *data; - int16_t sval; - - if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok)) - goto error; - - sval = ntohs(*((int16_t *) data)); - - if (sval < 0) - ERROR("negative value for unsigned: %d", sval); - - *uval = sval; - - return 0; - -error: - return nullok ? 0 : -1; -} - -int evsql_result_uint32 (const struct evsql_result_info *res, size_t row, size_t col, uint32_t *uval, int nullok) { - const char *data; - int32_t sval; - - if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok)) - goto error; - - sval = ntohl(*(int32_t *) data); - - if (sval < 0) - ERROR("negative value for unsigned: %d", sval); - - *uval = sval; - - return 0; - -error: - return nullok ? 0 : -1; -} - -int evsql_result_uint64 (const struct evsql_result_info *res, size_t row, size_t col, uint64_t *uval, int nullok) { - const char *data; - int64_t sval; - - if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok)) - goto error; - - sval = ntohq(*(int64_t *) data); - - if (sval < 0) - ERROR("negative value for unsigned: %ld", sval); - - *uval = sval; - - return 0; - -error: - return nullok ? 0 : -1; -} - -void evsql_result_free (const struct evsql_result_info *res) { - switch (res->evsql->type) { - case EVSQL_EVPQ: - return PQclear(res->result.pq); - - default: - FATAL("res->evsql->type"); - } -} diff -r 82cfdb6680d1 -r 99a41f48e29b src/evsql.h --- a/src/evsql.h Sun Oct 12 14:57:06 2008 +0300 +++ b/src/evsql.h Sun Oct 12 20:10:47 2008 +0300 @@ -20,6 +20,22 @@ struct evsql_query; /* + * Transaction handle + */ +struct evsql_trans; + +/* + * Transaction type + */ +enum evsql_trans_type { + EVSQL_TRANS_DEFAULT, + EVSQL_TRANS_SERIALIZABLE, + EVSQL_TRANS_REPEATABLE_READ, + EVSQL_TRANS_READ_COMMITTED, + EVSQL_TRANS_READ_UNCOMMITTED, +}; + +/* * Parameter type */ enum evsql_param_format { @@ -77,48 +93,79 @@ /* * Result type */ -union evsql_result { - // libpq - PGresult *pq; -}; - struct evsql_result_info { struct evsql *evsql; + struct evsql_trans *trans; int error; - union evsql_result result; + union evsql_result { + // libpq + PGresult *pq; + } result; }; /* - * Callback for handling query-level errors. + * Callback for handling query results. * - * The query has completed, either succesfully or unsuccesfully (look at info.error). - * info.result contains the result à la the evsql's type. + * The query has completed, either succesfully or unsuccesfully (nonzero .error). + * + * Use the evsql_result_* functions to manipulate the results. */ typedef void (*evsql_query_cb)(const struct evsql_result_info *res, void *arg); /* - * Callback for handling connection-level errors. + * Callback for handling global-level errors. * - * The SQL context/connection suffered an error. It is not valid anymore, and may not be used. + * The evsql is not useable anymore. + * + * XXX: this is not actually called yet, no retry logic implemented. */ typedef void (*evsql_error_cb)(struct evsql *evsql, void *arg); /* + * Callback for handling transaction-level errors. + * + * The transaction is not useable anymore. + */ +typedef void (*evsql_trans_error_cb)(struct evsql_trans *trans, void *arg); + +/* + * The transaction is ready for use. + */ +typedef void (*evsql_trans_ready_cb)(struct evsql_trans *trans, void *arg); + +/* * Create a new PostgreSQL/libpq(evpq) -based evsql using the given conninfo. + * + * The given conninfo must stay valid for the duration of the evsql's lifetime. */ struct evsql *evsql_new_pq (struct event_base *ev_base, const char *pq_conninfo, evsql_error_cb error_fn, void *cb_arg); /* - * Queue the given query for execution. + * Create a new transaction. + * + * Transactions are separate connections that provide transaction-isolation. + * + * Once the transaction is ready for use, ready_fn will be called. If the transaction fails, any pending query will be forgotten, and error_fn called. */ -struct evsql_query *evsql_query (struct evsql *evsql, const char *command, evsql_query_cb query_fn, void *cb_arg); +struct evsql_trans *evsql_trans (struct evsql *evsql, enum evsql_trans_type type, evsql_trans_error_cb error_fn, evsql_trans_ready_cb ready_fn, void *cb_arg); /* - * Same, but uses the SQL-level support for binding parameters. + * Queue the given query for execution. + * + * If trans is specified (not NULL), then the transaction must be idle, and the query will be executed in that + * transaction's context. Otherwise, the query will be executed without a transaction, andmay be executed immediately, + * or if other similar queries are running, it will be queued for later execution. + * + * Once the query is complete (got a result, got an error, the connection failed), then the query_cb will be triggered. */ -struct evsql_query *evsql_query_params (struct evsql *evsql, const char *command, const struct evsql_query_params *params, evsql_query_cb query_fn, void *cb_arg); +struct evsql_query *evsql_query (struct evsql *evsql, struct evsql_trans *trans, const char *command, evsql_query_cb query_fn, void *cb_arg); + +/* + * Same as evsql_query, but uses the SQL-level support for binding parameters. + */ +struct evsql_query *evsql_query_params (struct evsql *evsql, struct evsql_trans *trans, const char *command, const struct evsql_query_params *params, evsql_query_cb query_fn, void *cb_arg); /* * Param-building functions diff -r 82cfdb6680d1 -r 99a41f48e29b src/evsql_internal.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/evsql_internal.h Sun Oct 12 20:10:47 2008 +0300 @@ -0,0 +1,125 @@ +#ifndef EVSQL_INTERNAL_H +#define EVSQL_INTERNAL_H + +#include + +#include "evsql.h" + +/* + * The engine type + */ +enum evsql_type { + EVSQL_EVPQ, // evpq +}; + +/* + * Contains the type, engine configuration, list of connections and waiting query queue. + */ +struct evsql { + // what event_base to use + struct event_base *ev_base; + + // what engine we use + enum evsql_type type; + + // callbacks + evsql_error_cb error_fn; + void *cb_arg; + + // engine-specific connection configuration + union { + const char *evpq; + } engine_conf; + + // list of connections that are open + LIST_HEAD(evsql_conn_list, evsql_conn) conn_list; + + // list of queries running or waiting to run + TAILQ_HEAD(evsql_query_queue, evsql_query) query_queue; +}; + +/* + * A single connection to the server. + * + * Contains the engine connection, may have a transaction associated, and may have a query associated. + */ +struct evsql_conn { + // evsql we belong to + struct evsql *evsql; + + // engine-specific connection info + union { + struct evpq_conn *evpq; + } engine; + + // our position in the conn list + LIST_ENTRY(evsql_conn) entry; + + // are we running a transaction? + struct evsql_trans *trans; + + // are we running a transactionless query? + struct evsql_query *query; +}; + +/* + * A single transaction. + * + * Has a connection associated and possibly a query (which will also be associated with the connection) + */ +struct evsql_trans { + // our evsql_conn/evsql + //struct evsql *evsql; + struct evsql_conn *conn; + + // callbacks + evsql_trans_error_cb error_fn; + evsql_trans_ready_cb ready_fn; + void *cb_arg; + + // the transaction type + enum evsql_trans_type type; + + // our current query + struct evsql_query *query; + +}; + +/* + * A single query. + * + * Has the info needed to exec the query (as these may be queued), and the callback/result info. + */ +struct evsql_query { + // the actual SQL query, this may or may not be ours, see _evsql_query_exec + char *command; + + // possible query params + struct evsql_query_param_info { + int count; + + Oid *types; + const char **values; + int *lengths; + int *formats; + + int result_format; + } params; + + // our callback + evsql_query_cb cb_fn; + void *cb_arg; + + // our position in the query list + TAILQ_ENTRY(evsql_query) entry; + + // the result + union { + PGresult *evpq; + } result; +}; + +// maximum length for a 'BEGIN TRANSACTION ...' query +#define EVSQL_QUERY_BEGIN_BUF 512 + +#endif /* EVSQL_INTERNAL_H */ diff -r 82cfdb6680d1 -r 99a41f48e29b src/evsql_util.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/evsql_util.c Sun Oct 12 20:10:47 2008 +0300 @@ -0,0 +1,170 @@ +#include + +#include "evsql.h" +#include "evsql_internal.h" +#include "lib/error.h" +#include "lib/misc.h" + +int evsql_param_string (struct evsql_query_params *params, size_t param, const char *ptr) { + struct evsql_query_param *p = ¶ms->list[param]; + + assert(p->type == EVSQL_PARAM_STRING); + + p->data_raw = ptr; + p->length = 0; + + return 0; +} + +int evsql_param_uint32 (struct evsql_query_params *params, size_t param, uint32_t uval) { + struct evsql_query_param *p = ¶ms->list[param]; + + assert(p->type == EVSQL_PARAM_UINT32); + + p->data.uint32 = htonl(uval); + p->data_raw = (const char *) &p->data.uint32; + p->length = sizeof(uval); + + return 0; +} + +const char *evsql_result_error (const struct evsql_result_info *res) { + if (!res->error) + return "No error"; + + switch (res->evsql->type) { + case EVSQL_EVPQ: + if (!res->result.pq) + return "unknown error (no result)"; + + return PQresultErrorMessage(res->result.pq); + + default: + FATAL("res->evsql->type"); + } + +} + +size_t evsql_result_rows (const struct evsql_result_info *res) { + switch (res->evsql->type) { + case EVSQL_EVPQ: + return PQntuples(res->result.pq); + + default: + FATAL("res->evsql->type"); + } +} + +size_t evsql_result_cols (const struct evsql_result_info *res) { + switch (res->evsql->type) { + case EVSQL_EVPQ: + return PQnfields(res->result.pq); + + default: + FATAL("res->evsql->type"); + } +} + +int evsql_result_binary (const struct evsql_result_info *res, size_t row, size_t col, const char **ptr, size_t size, int nullok) { + *ptr = NULL; + + switch (res->evsql->type) { + case EVSQL_EVPQ: + if (PQgetisnull(res->result.pq, row, col)) { + if (nullok) + return 0; + else + ERROR("[%zu:%zu] field is null", row, col); + } + + if (PQfformat(res->result.pq, col) != 1) + ERROR("[%zu:%zu] PQfformat is not binary: %d", row, col, PQfformat(res->result.pq, col)); + + if (size && PQgetlength(res->result.pq, row, col) != size) + ERROR("[%zu:%zu] field size mismatch: %zu -> %d", row, col, size, PQgetlength(res->result.pq, row, col)); + + *ptr = PQgetvalue(res->result.pq, row, col); + + return 0; + + default: + FATAL("res->evsql->type"); + } + +error: + return -1; +} + +int evsql_result_string (const struct evsql_result_info *res, size_t row, size_t col, const char **ptr, int nullok) { + return evsql_result_binary(res, row, col, ptr, 0, nullok); +} + +int evsql_result_uint16 (const struct evsql_result_info *res, size_t row, size_t col, uint16_t *uval, int nullok) { + const char *data; + int16_t sval; + + if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok)) + goto error; + + sval = ntohs(*((int16_t *) data)); + + if (sval < 0) + ERROR("negative value for unsigned: %d", sval); + + *uval = sval; + + return 0; + +error: + return nullok ? 0 : -1; +} + +int evsql_result_uint32 (const struct evsql_result_info *res, size_t row, size_t col, uint32_t *uval, int nullok) { + const char *data; + int32_t sval; + + if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok)) + goto error; + + sval = ntohl(*(int32_t *) data); + + if (sval < 0) + ERROR("negative value for unsigned: %d", sval); + + *uval = sval; + + return 0; + +error: + return nullok ? 0 : -1; +} + +int evsql_result_uint64 (const struct evsql_result_info *res, size_t row, size_t col, uint64_t *uval, int nullok) { + const char *data; + int64_t sval; + + if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok)) + goto error; + + sval = ntohq(*(int64_t *) data); + + if (sval < 0) + ERROR("negative value for unsigned: %ld", sval); + + *uval = sval; + + return 0; + +error: + return nullok ? 0 : -1; +} + +void evsql_result_free (const struct evsql_result_info *res) { + switch (res->evsql->type) { + case EVSQL_EVPQ: + return PQclear(res->result.pq); + + default: + FATAL("res->evsql->type"); + } +}