# HG changeset patch # User Tero Marttila # Date 1224183893 -10800 # Node ID 5de62ca9a5aa162537722e9c27674992c7a94c97 # Parent e944453ca9248260afb74460016d8eac7dc17203 reorganize evsql into a separate dir, rename dbfs slightly, and split dbfs_op out from dirop (in prep for fileop) diff -r e944453ca924 -r 5de62ca9a5aa Makefile --- a/Makefile Wed Oct 15 01:14:22 2008 +0300 +++ b/Makefile Thu Oct 16 22:04:53 2008 +0300 @@ -20,7 +20,7 @@ # complex modules EVSQL_OBJS = obj/evsql.o obj/evsql_util.o obj/evpq.o -DBFS_OBJS = obj/dbfs/dbfs.o obj/dbfs/common.o obj/dbfs/core.o obj/dbfs/dirop.o obj/dirbuf.o +DBFS_OBJS = obj/dbfs/dbfs.o obj/dbfs/common.o obj/dbfs/core.o obj/dbfs/op_base.o obj/dbfs/dirop.o obj/dirbuf.o # first target all: ${BIN_PATHS} @@ -44,7 +44,11 @@ # other targets clean : - -rm obj/* bin/* + -rm obj/* bin/* build/deps/* + +clean-deps: + -rm build/deps/*/*.d + -rm build/deps/*.d #obj-dirs: # python build/make_obj_dirs.py $(BIN_PATHS) diff -r e944453ca924 -r 5de62ca9a5aa src/dbfs/common.c --- a/src/dbfs/common.c Wed Oct 15 01:14:22 2008 +0300 +++ b/src/dbfs/common.c Thu Oct 16 22:04:53 2008 +0300 @@ -1,7 +1,8 @@ #include -#include "common.h" +#include "dbfs.h" +#include "../lib/log.h" mode_t _dbfs_mode (const char *type) { if (!strcmp(type, "DIR")) diff -r e944453ca924 -r 5de62ca9a5aa src/dbfs/common.h --- a/src/dbfs/common.h Wed Oct 15 01:14:22 2008 +0300 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,62 +0,0 @@ -#ifndef DBFS_COMMON_H -#define DBFS_COMMON_H - -#include -#include - -#include - -#include "../evfuse.h" -#include "../evsql.h" -#include "../lib/log.h" -#include "../lib/misc.h" - -/* - * Structs and functions shared between all dbfs components - */ - -#define SERROR(val) do { (val); goto error; } while(0) - -struct dbfs { - struct event_base *ev_base; - - const char *db_conninfo; - struct evsql *db; - - struct evfuse *ev_fuse; -}; - -// XXX: not sure how this should work -#define CACHE_TIMEOUT 1.0 - -/* - * Convert the CHAR(4) inodes.type from SQL into a mode_t. - * - * Returns zero for unknown types. - */ -mode_t _dbfs_mode (const char *type); - -/* - * Check that the number of rows and columns in the result set matches what we expect. - * - * If rows is nonzero, there must be exactly that many rows (mostly useful for rows=1). - * The number of columns must always be given, and match. - * - * Returns; - * -1 if the query failed, the columns/rows do not match - * 0 the results match - * 1 there were no results (zero rows) - */ -int _dbfs_check_res (const struct evsql_result_info *res, size_t rows, size_t cols); - -/* - * Fill a `struct state` with info retrieved from a SQL query. - * - * The result must contain four columns, starting at the given offset: - * inodes.type, inodes.mode, inodes.size, count(*) AS nlink - * - * Note that this does not fill the st_ino field - */ -int _dbfs_stat_info (struct stat *st, const struct evsql_result_info *res, size_t row, size_t col_offset); - -#endif /* DBFS_COMMON_H */ diff -r e944453ca924 -r 5de62ca9a5aa src/dbfs/core.c --- a/src/dbfs/core.c Wed Oct 15 01:14:22 2008 +0300 +++ b/src/dbfs/core.c Thu Oct 16 22:04:53 2008 +0300 @@ -1,6 +1,8 @@ -#include "common.h" -#include "ops.h" +#include "../lib/log.h" +#include "../lib/misc.h" + +#include "dbfs.h" /* * Core fs functionality like lookup, getattr diff -r e944453ca924 -r 5de62ca9a5aa src/dbfs/dbfs.c --- a/src/dbfs/dbfs.c Wed Oct 15 01:14:22 2008 +0300 +++ b/src/dbfs/dbfs.c Thu Oct 16 22:04:53 2008 +0300 @@ -1,9 +1,10 @@ #include +#include "dbfs.h" #include "../dbfs.h" -#include "common.h" -#include "ops.h" +#include "../lib/log.h" +#include "../lib/misc.h" static struct fuse_lowlevel_ops dbfs_llops = { diff -r e944453ca924 -r 5de62ca9a5aa src/dbfs/dbfs.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/dbfs/dbfs.h Thu Oct 16 22:04:53 2008 +0300 @@ -0,0 +1,61 @@ +#ifndef DBFS_DBFS_H +#define DBFS_DBFS_H + +#include +#include + +#include + +#include "ops.h" +#include "../evfuse.h" +#include "../evsql.h" + +/* + * Structs and functions shared between all dbfs components + */ + +#define SERROR(val) do { (val); goto error; } while(0) + +struct dbfs { + struct event_base *ev_base; + + const char *db_conninfo; + struct evsql *db; + + struct evfuse *ev_fuse; +}; + +// XXX: not sure how this should work +#define CACHE_TIMEOUT 1.0 + +/* + * Convert the CHAR(4) inodes.type from SQL into a mode_t. + * + * Returns zero for unknown types. + */ +mode_t _dbfs_mode (const char *type); + +/* + * Check that the number of rows and columns in the result set matches what we expect. + * + * If rows is nonzero, there must be exactly that many rows (mostly useful for rows=1). + * The number of columns must always be given, and match. + * + * Returns; + * -1 if the query failed, the columns/rows do not match + * 0 the results match + * 1 there were no results (zero rows) + */ +int _dbfs_check_res (const struct evsql_result_info *res, size_t rows, size_t cols); + +/* + * Fill a `struct state` with info retrieved from a SQL query. + * + * The result must contain four columns, starting at the given offset: + * inodes.type, inodes.mode, inodes.size, count(*) AS nlink + * + * Note that this does not fill the st_ino field + */ +int _dbfs_stat_info (struct stat *st, const struct evsql_result_info *res, size_t row, size_t col_offset); + +#endif /* DBFS_DBFS_H */ diff -r e944453ca924 -r 5de62ca9a5aa src/dbfs/dirop.c --- a/src/dbfs/dirop.c Wed Oct 15 01:14:22 2008 +0300 +++ b/src/dbfs/dirop.c Thu Oct 16 22:04:53 2008 +0300 @@ -2,100 +2,44 @@ #include #include -#include "common.h" -#include "ops.h" +#include "dbfs.h" +#include "op_base.h" #include "../dirbuf.h" +#include "../lib/log.h" /* * Directory related functionality like opendir, readdir, releasedir */ - struct dbfs_dirop { - struct fuse_file_info fi; - struct fuse_req *req; + struct dbfs_op base; - struct evsql_trans *trans; + // parent dir inodes + uint32_t parent; - // dir/parent dir inodes - uint32_t ino, parent; - - // opendir has returned and releasedir hasn't been called yet - int open; - // for readdir struct dirbuf dirbuf; }; /* - * Free the dirop, aborting any in-progress transaction. - * - * The dirop must any oustanding request responded to first, must not be open, and must not have a transaction. - * - * The dirbuf will be released, and the dirop free'd. + * Release the dirbuf. */ -static void _dbfs_dirop_free (struct dbfs_dirop *dirop) { - assert(dirop); - assert(!dirop->open); - assert(!dirop->req); - assert(!dirop->trans); - +static void _dbfs_dirop_free (struct dbfs_op *op_base) { + struct dbfs_dirop *dirop = (struct dbfs_dirop *) op_base; + // just release the dirbuf dirbuf_release(&dirop->dirbuf); - - // and then free the dirop - free(dirop); -} - -/* - * This will handle backend failures during requests. - * - * 1) if we have a trans, abort it - * 2) fail the req (mandatory) - * - * If the dirop is open, then we don't release it, but if it's not open, then the dirop will be free'd completely. - * - */ -static void _dbfs_dirop_fail (struct dbfs_dirop *dirop) { - int err; - - assert(dirop->req); - - if (dirop->trans) { - // abort the trans - evsql_trans_abort(dirop->trans); - - dirop->trans = NULL; - } - - // send an error reply - if ((err = fuse_reply_err(dirop->req, err))) - // XXX: handle these failures /somehow/, or requests will hang and interrupts might handle invalid dirops - EFATAL(err, "dbfs.fail %p:%p dirop_fail: reply with fuse_reply_err", dirop, dirop->req); - - // drop the req - dirop->req = NULL; - - // is it open? - if (!dirop->open) { - // no, we can free it now and then forget about the whole thing - _dbfs_dirop_free(dirop); - - } else { - // we need to wait for releasedir - - } } /* * Handle the results for the initial attribute lookup for the dir itself during opendir ops. */ -static void dbfs_opendir_info_res (const struct evsql_result_info *res, void *arg) { +static void dbfs_opendir_res (const struct evsql_result_info *res, void *arg) { struct dbfs_dirop *dirop = arg; int err; - assert(dirop->trans); - assert(dirop->req); - assert(!dirop->open); + assert(dirop->base.req); + assert(dirop->base.trans); // query callbacks don't get called if the trans fails + assert(!dirop->base.open); // check the results if ((err = _dbfs_check_res(res, 1, 2))) @@ -114,17 +58,11 @@ if (_dbfs_mode(type) != S_IFDIR) EERROR(err = ENOTDIR, "wrong type: %s", type); - INFO("[dbfs.opendir %p:%p] -> ino=%lu, parent=%lu, type=%s", dirop, dirop->req, (unsigned long int) dirop->ino, (unsigned long int) dirop->parent, type); + INFO("[dbfs.opendir %p:%p] -> ino=%lu, parent=%lu, type=%s", dirop, dirop->base.req, (unsigned long int) dirop->base.ino, (unsigned long int) dirop->parent, type); - // send the openddir reply - if ((err = fuse_reply_open(dirop->req, &dirop->fi))) - EERROR(err, "fuse_reply_open"); - - // req is done - dirop->req = NULL; - - // dirop is now open - dirop->open = 1; + // open_fn done, do the open_reply + if ((err = dbfs_op_open_reply(&dirop->base))) + goto error; // success, fallthrough for evsql_result_free err = 0; @@ -132,7 +70,7 @@ error: if (err) // fail it - _dbfs_dirop_fail(dirop); + dbfs_op_fail(&dirop->base, err); // free evsql_result_free(res); @@ -141,20 +79,16 @@ /* * The opendir transaction is ready for use. Query for the given dir's info */ -static void dbfs_dirop_ready (struct evsql_trans *trans, void *arg) { - struct dbfs_dirop *dirop = arg; - struct dbfs *ctx = fuse_req_userdata(dirop->req); +static void dbfs_dirop_open (struct dbfs_op *op_base) { + struct dbfs_dirop *dirop = (struct dbfs_dirop *) op_base; + struct dbfs *ctx = fuse_req_userdata(dirop->base.req); int err; - // XXX: unless we abort queries - assert(trans == dirop->trans); - assert(dirop->req); - assert(!dirop->open); + assert(dirop->base.trans); + assert(dirop->base.req); + assert(!dirop->base.open); - INFO("[dbfs.opendir %p:%p] -> trans=%p", dirop, dirop->req, trans); - - // remember the transaction - dirop->trans = trans; + INFO("[dbfs.opendir %p:%p] -> trans=%p", dirop, dirop->base.req, dirop->base.trans); // first fetch info about the dir itself const char *sql = @@ -171,12 +105,12 @@ // build params if (0 - || evsql_param_uint32(¶ms, 0, dirop->ino) + || evsql_param_uint32(¶ms, 0, dirop->base.ino) ) SERROR(err = EIO); // query - if (evsql_query_params(ctx->db, dirop->trans, sql, ¶ms, dbfs_opendir_info_res, dirop) == NULL) + if (evsql_query_params(ctx->db, dirop->base.trans, sql, ¶ms, dbfs_opendir_res, dirop) == NULL) SERROR(err = EIO); // ok, wait for the info results @@ -184,67 +118,11 @@ error: // fail it - _dbfs_dirop_fail(dirop); + dbfs_op_fail(&dirop->base, err); } /* - * The dirop trans was committed, i.e. releasedir has completed - */ -static void dbfs_dirop_done (struct evsql_trans *trans, void *arg) { - struct dbfs_dirop *dirop = arg; - int err; - - assert(dirop->trans); - assert(dirop->req); - assert(!dirop->open); // should not be considered as open anymore at this point, as errors should release - - INFO("[dbfs.releasedir %p:%p] -> OK", dirop, dirop->req); - - // forget trans - dirop->trans = NULL; - - // just reply - if ((err = fuse_reply_err(dirop->req, 0))) - // XXX: handle these failures /somehow/, or requests will hang and interrupts might handle invalid dirops - EFATAL(err, "[dbfs.releasedir %p:%p] dirop_done: reply with fuse_reply_err", dirop, dirop->req); - - // req is done - dirop->req = NULL; - - // then we can just free dirop - _dbfs_dirop_free(dirop); -} - -/* - * The dirop trans has failed, somehow, at some point, some where. - * - * This might happend during the opendir evsql_trans, during a readdir evsql_query, during the releasedir - * evsql_trans_commit, or at any point in between. - * - * 1) loose the transaction - * 2) if dirop has a req, we handle failing it - */ -static void dbfs_dirop_error (struct evsql_trans *trans, void *arg) { - struct dbfs_dirop *dirop = arg; - - INFO("[dbfs:dirop %p:%p] evsql transaction error: %s", dirop, dirop->req, evsql_trans_error(trans)); - - // deassociate the trans - dirop->trans = NULL; - - // if we were answering a req, error it out, and if the dirop isn't open, release it - // if we didn't have a req outstanding, the dirop must be open, so we wouldn't free it in any case, and must wait - // for the next readdir/releasedir to detect this and return an error reply - if (dirop->req) - _dbfs_dirop_fail(dirop); - else - assert(dirop->open); -} - -/* - * Handle opendir(), this means starting a new transaction, dbfs_dirop_ready/error will continue on from there. - * - * The contents of fi will be copied into the dirop, and will be used as the basis for the fuse_reply_open reply. + * Handle opendir(), this means starting a new op. */ void dbfs_opendir (struct fuse_req *req, fuse_ino_t ino, struct fuse_file_info *fi) { struct dbfs *ctx = fuse_req_userdata(req); @@ -255,28 +133,19 @@ if ((dirop = calloc(1, sizeof(*dirop))) == NULL && (err = EIO)) ERROR("calloc"); + // do the op_open + if ((err = dbfs_op_open(ctx, &dirop->base, req, ino, fi, _dbfs_dirop_free, dbfs_dirop_open))) + ERROR("dbfs_op_open"); + INFO("[dbfs.opendir %p:%p] ino=%lu, fi=%p", dirop, req, ino, fi); - // store the dirop - // copy *fi since it's on the stack - dirop->fi = *fi; - dirop->fi.fh = (uint64_t) dirop; - dirop->req = req; - dirop->ino = ino; - - // start a new transaction - if ((dirop->trans = evsql_trans(ctx->db, EVSQL_TRANS_SERIALIZABLE, dbfs_dirop_error, dbfs_dirop_ready, dbfs_dirop_done, dirop)) == NULL) - SERROR(err = EIO); - - // XXX: handle interrupts - // wait return; error: if (dirop) { // we can fail normally - _dbfs_dirop_fail(dirop); + dbfs_op_fail(&dirop->base, err); } else { // must error out manually as we couldn't alloc the context @@ -291,20 +160,20 @@ * Fill up the dirbuf, and then send the reply. * */ -static void dbfs_readdir_files_res (const struct evsql_result_info *res, void *arg) { +static void dbfs_readdir_res (const struct evsql_result_info *res, void *arg) { struct dbfs_dirop *dirop = arg; int err; size_t row; - assert(dirop->req); - assert(dirop->trans); - assert(dirop->open); + assert(dirop->base.req); + assert(dirop->base.trans); // query callbacks don't get called if the trans fails + assert(dirop->base.open); // check the results if ((err = _dbfs_check_res(res, 0, 4)) < 0) SERROR(err = EIO); - INFO("[dbfs.readdir %p:%p] -> files: res_rows=%zu", dirop, dirop->req, evsql_result_rows(res)); + INFO("[dbfs.readdir %p:%p] -> files: res_rows=%zu", dirop, dirop->base.req, evsql_result_rows(res)); // iterate over the rows for (row = 0; row < evsql_result_rows(res); row++) { @@ -324,7 +193,7 @@ // add to the dirbuf // offsets are just offset + 2 - if ((err = dirbuf_add(dirop->req, &dirop->dirbuf, off + 2, off + 3, name, ino, _dbfs_mode(type))) < 0 && (err = EIO)) + if ((err = dirbuf_add(dirop->base.req, &dirop->dirbuf, off + 2, off + 3, name, ino, _dbfs_mode(type))) < 0 && (err = EIO)) ERROR("failed to add dirent for inode=%lu", (long unsigned int) ino); // stop if it's full @@ -333,18 +202,19 @@ } // send it - if ((err = dirbuf_done(dirop->req, &dirop->dirbuf))) + if ((err = dirbuf_done(dirop->base.req, &dirop->dirbuf))) EERROR(err, "failed to send buf"); + + // handled the req + if ((err = dbfs_op_req_done(&dirop->base))) + goto error; - // req is done - dirop->req = NULL; - // good, fallthrough err = 0; error: if (err) - _dbfs_dirop_fail(dirop); + dbfs_op_fail(&dirop->base, err); // free evsql_result_free(res); @@ -352,7 +222,7 @@ /* * Handle a readdir request. This will execute a SQL query inside the transaction to get the files at the given offset, - * and _dbfs_readdir_res will handle the results. + * and dbfs_readdir_res will handle the results. * * If trans failed earlier, detect that and return an error. */ @@ -360,20 +230,12 @@ struct dbfs *ctx = fuse_req_userdata(req); struct dbfs_dirop *dirop = (struct dbfs_dirop *) fi->fh; int err; - - assert(dirop); - assert(!dirop->req); - assert(dirop->open); - assert(dirop->ino == ino); + + // get the op + if ((dirop = (struct dbfs_dirop *) dbfs_op_req(req, ino, fi)) == NULL) + SERROR(err = EIO); - // store the new req - dirop->req = req; - - // detect earlier failures - if (!dirop->trans && (err = EIO)) - ERROR("dirop trans has failed"); - - INFO("[dbfs.readdir %p:%p] ino=%lu, size=%zu, off=%zu, fi=%p : trans=%p", dirop, req, ino, size, off, fi, dirop->trans); + INFO("[dbfs.readdir %p:%p] ino=%lu, size=%zu, off=%zu, fi=%p : trans=%p", dirop, req, ino, size, off, fi, dirop->base.trans); // create the dirbuf if (dirbuf_init(&dirop->dirbuf, size, off)) @@ -383,9 +245,9 @@ // we set the next offset to 2, because all dirent offsets will be larger than that // assume that these two should *always* fit if ((err = (0 - || dirbuf_add(req, &dirop->dirbuf, 0, 1, ".", dirop->ino, S_IFDIR ) + || dirbuf_add(req, &dirop->dirbuf, 0, 1, ".", dirop->base.ino, S_IFDIR ) || dirbuf_add(req, &dirop->dirbuf, 1, 2, "..", - dirop->parent ? dirop->parent : dirop->ino, S_IFDIR ) + dirop->parent ? dirop->parent : dirop->base.ino, S_IFDIR ) )) && (err = EIO)) ERROR("failed to add . and .. dirents"); @@ -411,21 +273,21 @@ // build params if (0 - || evsql_param_uint32(¶ms, 0, dirop->ino) + || evsql_param_uint32(¶ms, 0, ino) || evsql_param_uint32(¶ms, 1, off) || evsql_param_uint32(¶ms, 2, dirbuf_estimate(&dirop->dirbuf, 0)) ) SERROR(err = EIO); // query - if (evsql_query_params(ctx->db, dirop->trans, sql, ¶ms, dbfs_readdir_files_res, dirop) == NULL) + if (evsql_query_params(ctx->db, dirop->base.trans, sql, ¶ms, dbfs_readdir_res, dirop) == NULL) SERROR(err = EIO); // good, wait return; error: - _dbfs_dirop_fail(dirop); + dbfs_op_fail(&dirop->base, err); } /* @@ -434,63 +296,7 @@ * The dirop may be in a failed state. */ void dbfs_releasedir (struct fuse_req *req, fuse_ino_t ino, struct fuse_file_info *fi) { - struct dbfs *ctx = fuse_req_userdata(req); - struct dbfs_dirop *dirop = (struct dbfs_dirop *) fi->fh; - int err; - - (void) ctx; - - assert(dirop); - assert(!dirop->req); - assert(dirop->ino == ino); - - // update to this req - dirop->req = req; - - // fi is irrelevant, we don't touch the flags anyways - (void) fi; - - // handle failed trans - if (!dirop->trans) - ERROR("trans has failed"); - - // log - INFO("[dbfs.releasedir %p:%p] ino=%lu, fi=%p : trans=%p", dirop, req, ino, fi, dirop->trans); - - // we must commit the transaction (although it was jut SELECTs, no changes). - // Note that this might cause dbfs_dirop_error to be called, we can tell if that happaned by looking at dirop->req - // or dirop->trans this means that we need to keep the dirop open when calling trans_commit, so that dirop_error - // doesn't free it out from underneath us. - if (evsql_trans_commit(dirop->trans)) - SERROR(err = EIO); - - // fall-through to cleanup - err = 0; - -error: - // the dirop is not open anymore and can be free'd: - // a) if we already caught an error - // b) if we get+send an error later on - // c) if we get+send the done/no-error later on - dirop->open = 0; - - // did the commit/pre-commit-checks fail? - if (err) { - // a) the trans failed earlier (readdir), so we have a req but no trans - // b) the trans commit failed, dirop_error got called -> no req and no trans - // c) the trans commit failed, dirop_error did not get called -> have req and trans - // we either have a req (may or may not have trans), or we don't have a trans either - // i.e. there is no situation where we don't have a req but do have a trans - - if (dirop->req) - _dbfs_dirop_fail(dirop); - else - assert(!dirop->trans); - - } else { - // shouldn't slip by, dirop_done should not get called directly. Once it does, it will handle both. - assert(dirop->req); - assert(dirop->trans); - } + // just passthrough to dbfs_op + dbfs_op_release(req, ino, fi); } diff -r e944453ca924 -r 5de62ca9a5aa src/dbfs/op_base.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/dbfs/op_base.c Thu Oct 16 22:04:53 2008 +0300 @@ -0,0 +1,285 @@ +#include +#include + +#include "op_base.h" +#include "../lib/log.h" + +/* + * Free the op. + * + * The op must any oustanding request responded to first, must not be open, and must not have a transaction. + * + * The op will be free'd. + */ +static void dbfs_op_free (struct dbfs_op *op) { + assert(op); + assert(!op->open); + assert(!op->req); + assert(!op->trans); + + // free_fn + if (op->free_fn) + op->free_fn(op); + + // and then free the op + free(op); +} + +void dbfs_op_fail (struct dbfs_op *op, int err) { + assert(op->req); + + if (op->trans) { + // abort the trans + evsql_trans_abort(op->trans); + + op->trans = NULL; + } + + // send an error reply + if ((err = fuse_reply_err(op->req, err))) + // XXX: handle these failures /somehow/, or requests will hang and interrupts might handle invalid ops + EFATAL(err, "\tdbfs_op.fail %p:%p -> reply with fuse_reply_err", op, op->req); + + // drop the req + op->req = NULL; + + // is it open? + if (!op->open) { + // no, we can free it now and then forget about the whole thing + dbfs_op_free(op); + + } else { + // we need to wait for release + + } +} + +/* + * The op_open transaction is ready for use. + */ +static void dbfs_op_ready (struct evsql_trans *trans, void *arg) { + struct dbfs_op *op = arg; + + assert(trans == op->trans); + assert(op->req); + assert(!op->open); + + INFO("\tdbfs_op.ready %p:%p -> trans=%p", op, op->req, trans); + + // remember the transaction + op->trans = trans; + + // ready + op->open_fn(op); + + // good + return; +} + +/* + * The op trans was committed, i.e. release has completed + */ +static void dbfs_op_done (struct evsql_trans *trans, void *arg) { + struct dbfs_op *op = arg; + int err; + + assert(trans == op->trans); + assert(op->req); + assert(!op->open); // should not be considered as open anymore at this point, as errors should release + + INFO("\tdbfs_op.done %p:%p -> OK", op, op->req); + + // forget trans + op->trans = NULL; + + // just reply + if ((err = fuse_reply_err(op->req, 0))) + // XXX: handle these failures /somehow/, or requests will hang and interrupts might handle invalid ops + EFATAL(err, "dbfs_op.done %p:%p -> reply with fuse_reply_err", op, op->req); + + // req is done + op->req = NULL; + + // then we can just free op + dbfs_op_free(op); +} + +/* + * The op trans has failed, somehow, at some point, some where. + * + * This might happend during the open evsql_trans, during a read evsql_query, during the release + * evsql_trans_commit, or at any point in between. + * + * 1) loose the transaction + * 2) if op has a req, we handle failing it + */ +static void dbfs_op_error (struct evsql_trans *trans, void *arg) { + struct dbfs_op *op = arg; + + // unless we fail + assert(trans == op->trans); + + INFO("\tdbfs_op.error %p:%p -> evsql transaction error: %s", op, op->req, evsql_trans_error(trans)); + + // deassociate the trans + op->trans = NULL; + + // if we were answering a req, error it out, and if the op isn't open, free + // if we didn't have a req outstanding, the op must be open, so we wouldn't free it in any case, and must wait + // for the next read/release to detect this and return an error reply + if (op->req) + dbfs_op_fail(op, EIO); + else + assert(op->open); +} + +int dbfs_op_open (struct dbfs *ctx, struct dbfs_op *op, struct fuse_req *req, fuse_ino_t ino, struct fuse_file_info *fi, dbfs_op_free_cb free_fn, dbfs_op_open_cb open_fn) { + int err; + + assert(op && req && ino && fi); + assert(!(op->req || op->ino)); + + // initialize the op + op->req = req; + op->ino = ino; + // copy *fi since it's on the stack + op->fi = *fi; + op->fi.fh = (uint64_t) op; + op->free_fn = free_fn; + op->open_fn = open_fn; + + // start a new transaction + if ((op->trans = evsql_trans(ctx->db, EVSQL_TRANS_SERIALIZABLE, dbfs_op_error, dbfs_op_ready, dbfs_op_done, op)) == NULL) + SERROR(err = EIO); + + // XXX: handle interrupts + + // good + return 0; + +error: + // nothing of ours to cleanup + return err; +} + +int dbfs_op_open_reply (struct dbfs_op *op) { + int err; + + // detect earlier failures + if (!op->trans && (err = EIO)) + ERROR("op trans has failed"); + + // send the openddir reply + if ((err = fuse_reply_open(op->req, &op->fi))) + EERROR(err, "fuse_reply_open"); + + // req is done + op->req = NULL; + + // op is now open + op->open = 1; + + // good + return 0; + +error: + return err; +} + +struct dbfs_op *dbfs_op_req (struct fuse_req *req, fuse_ino_t ino, struct fuse_file_info *fi) { + struct dbfs_op *op = (struct dbfs_op *) fi->fh; + int err; + + // validate + assert(op); + assert(!op->req); + assert(op->open); + assert(op->ino == ino); + + // store the new req + op->req = req; + + // detect earlier failures + if (!op->trans && (err = EIO)) + ERROR("op trans has failed"); + + // good + return op; + +error: + dbfs_op_fail(op, err); + + return NULL; +} + +int dbfs_op_req_done (struct dbfs_op *op) { + // validate + assert(op); + assert(op->req); + assert(op->open); + + // unassign the req + op->req = NULL; + + // k + return 0; +} + +void dbfs_op_release (struct fuse_req *req, fuse_ino_t ino, struct fuse_file_info *fi) { + struct dbfs_op *op = (struct dbfs_op *) fi->fh; + int err; + + assert(op); + assert(!op->req); + assert(op->ino == ino); + + // update to this req + op->req = req; + + // fi is irrelevant, we don't touch the flags anyways + (void) fi; + + // handle failed trans + if (!op->trans && (err = EIO)) + ERROR("trans has failed"); + + // log + INFO("\tdbfs_op.release %p:%p : ino=%lu, fi=%p : trans=%p", op, req, ino, fi, op->trans); + + // we must commit the transaction. + // Note that this might cause dbfs_op_error to be called, we can tell if that happaned by looking at op->req + // or op->trans - this means that we need to keep the op open when calling trans_commit, so that op_error + // doesn't free it out from underneath us. + if (evsql_trans_commit(op->trans)) + SERROR(err = EIO); + + // fall-through to cleanup + err = 0; + +error: + // the op is not open anymore and can be free'd next, because we either: + // a) already caught an error + // b) we get+send an error later on + // c) we get+send the done/no-error later on + op->open = 0; + + // did the commit/pre-commit-checks fail? + if (err) { + // a) the trans failed earlier (read), so we have a req but no trans + // b) the trans commit failed, op_error got called -> no req and no trans + // c) the trans commit failed, op_error did not get called -> have req and trans + // we either have a req (may or may not have trans), or we don't have a trans either + // i.e. there is no situation where we don't have a req but do have a trans + + if (op->req) + dbfs_op_fail(op, err); + else + assert(!op->trans); + + } else { + // shouldn't slip by, op_done should not get called directly. Once it does, it will handle both. + assert(op->req); + assert(op->trans); + } +} + diff -r e944453ca924 -r 5de62ca9a5aa src/dbfs/op_base.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/dbfs/op_base.h Thu Oct 16 22:04:53 2008 +0300 @@ -0,0 +1,94 @@ +#ifndef DBFS_OP_BASE_H +#define DBFS_OP_BASE_H + +#include "dbfs.h" + +// forward-declaration for callbacks +struct dbfs_op; + +/* + * Called by dbfs_op_free to release any resources when the op is free'd (i.e. not open anymore). + */ +typedef void (*dbfs_op_free_cb) (struct dbfs_op *op_base); + +/* + * Called after the transaction has been opened, and before reply_open. + * + * You can do any at-open initialization here. + */ +typedef void (*dbfs_op_open_cb) (struct dbfs_op *op_base); + +// the base op state +struct dbfs_op { + struct fuse_file_info fi; + struct fuse_req *req; + + struct evsql_trans *trans; + + // op target inode + uint32_t ino; + + // open has returned and release hasn't been called yet + int open; + + // callbacks + dbfs_op_free_cb free_fn; + dbfs_op_open_cb open_fn; +}; + +/* + * This will handle failures during requests. + * + * 1) if we have a trans, abort it + * 2) fail the req (mandatory) with the given err + * + * If the op is open, then we don't release it, but if it's not open, then the op will be free'd completely. + * + */ +void dbfs_op_fail (struct dbfs_op *op, int err); + +/* + * Open the op, that is, store all the initial state, and open a new transaction. + * + * The op must be pre-allocated and zero-initialized. + * + * This will always set op->req, so op is safe for dbfs_op_fail after this. + * + * This does not fail the dirop, handle error replies yourself. + * + * Returns zero on success, err on failure. + */ +int dbfs_op_open (struct dbfs *ctx, struct dbfs_op *op, struct fuse_req *req, fuse_ino_t ino, struct fuse_file_info *fi, dbfs_op_free_cb free_fn, dbfs_op_open_cb ready_fn); + +/* + * Should be called from open_fn to send the fuse_reply_open with fi and mark the op as open. + * + * If the op has failed earlier or fuse_reply_open fails, this will return nonzero. Fail the op yourself. + */ +int dbfs_op_open_reply (struct dbfs_op *op); + +/* + * Start handling a normal op requests. + * + * Lookup the op for the given fi, validate params, and assign the new req. + * + * In case the op failed previously, this will error the req and return NULL, indicating that the req has been handled. + */ +struct dbfs_op *dbfs_op_req (struct fuse_req *req, fuse_ino_t ino, struct fuse_file_info *fi); + +/* + * Done handling a request, adjust state accordingly. + * + * req *must* have been replied to. + */ +int dbfs_op_req_done (struct dbfs_op *op); + +/* + * Handle the op release. + * + * This will take care of committing the transaction, sending any reply/error, closing the op and freeing it. + */ +void dbfs_op_release (struct fuse_req *req, fuse_ino_t, struct fuse_file_info *fi); + + +#endif /* DBFS_OP_BASE_H */ diff -r e944453ca924 -r 5de62ca9a5aa src/evfuse.c --- a/src/evfuse.c Wed Oct 15 01:14:22 2008 +0300 +++ b/src/evfuse.c Thu Oct 16 22:04:53 2008 +0300 @@ -47,7 +47,7 @@ ERROR("fuse_chan_recv failed: %s", strerror(-res)); if (res > 0) { - INFO("[evfuse] got %d bytes from /dev/fuse", res); + DEBUG("got %d bytes from /dev/fuse", res); // received a fuse_req, so process it fuse_session_process(ctx->session, ctx->recv_buf, res, ch); diff -r e944453ca924 -r 5de62ca9a5aa src/evsql.c --- a/src/evsql.c Wed Oct 15 01:14:22 2008 +0300 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,1019 +0,0 @@ -#define _GNU_SOURCE -#include -#include -#include - -#include "evsql.h" -#include "evsql_internal.h" -#include "evpq.h" -#include "lib/log.h" -#include "lib/error.h" -#include "lib/misc.h" - -/* - * A couple function prototypes - */ -static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn); - -/* - * Actually execute the given query. - * - * The backend should be able to accept the query at this time. - * - * You should assume that if trying to execute a query fails, then the connection should also be considred as failed. - */ -static int _evsql_query_exec (struct evsql_conn *conn, struct evsql_query *query, const char *command) { - int err; - - switch (conn->evsql->type) { - case EVSQL_EVPQ: - // got params? - if (query->params.count) { - err = evpq_query_params(conn->engine.evpq, command, - query->params.count, - query->params.types, - query->params.values, - query->params.lengths, - query->params.formats, - query->params.result_format - ); - - } else { - // plain 'ole query - err = evpq_query(conn->engine.evpq, command); - } - - if (err) { - if (PQstatus(evpq_pgconn(conn->engine.evpq)) != CONNECTION_OK) - WARNING("conn failed"); - else - WARNING("query failed, dropping conn as well"); - } - - break; - - default: - FATAL("evsql->type"); - } - - if (!err) - // assign the query - conn->query = query; - - return err; -} - -/* - * Free the query and related resources, doesn't trigger any callbacks or remove from any queues. - * - * The command should already be taken care of (NULL). - */ -static void _evsql_query_free (struct evsql_query *query) { - if (!query) - return; - - assert(query->command == NULL); - - // free params if present - free(query->params.types); - free(query->params.values); - free(query->params.lengths); - free(query->params.formats); - - // free the query itself - free(query); -} - -/* - * Execute the callback if res is given, and free the query. - * - * The query has been aborted, it will simply be freed - */ -static void _evsql_query_done (struct evsql_query *query, const struct evsql_result_info *res) { - if (res) { - if (query->cb_fn) - // call the callback - query->cb_fn(res, query->cb_arg); - else - WARNING("supressing cb_fn because query was aborted"); - } - - // free - _evsql_query_free(query); -} - -/* - * XXX: - * / -static void _evsql_destroy (struct evsql *evsql, const struct evsql_result_info *res) { - struct evsql_query *query; - - // clear the queue - while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) { - _evsql_query_done(query, res); - - TAILQ_REMOVE(&evsql->query_queue, query, entry); - } - - // free - free(evsql); -} -*/ - -/* - * Free the transaction, it should already be deassociated from the query and conn. - */ -static void _evsql_trans_free (struct evsql_trans *trans) { - // ensure we don't leak anything - assert(trans->query == NULL); - assert(trans->conn == NULL); - - // free - free(trans); -} - -/* - * Release a connection. It should already be deassociated from the trans and query. - * - * Releases the engine, removes from the conn_list and frees this. - */ -static void _evsql_conn_release (struct evsql_conn *conn) { - // ensure we don't leak anything - assert(conn->trans == NULL); - assert(conn->query == NULL); - - // release the engine - switch (conn->evsql->type) { - case EVSQL_EVPQ: - evpq_release(conn->engine.evpq); - break; - - default: - FATAL("evsql->type"); - } - - // remove from list - LIST_REMOVE(conn, entry); - - // catch deadlocks - assert(!LIST_EMPTY(&conn->evsql->conn_list) || TAILQ_EMPTY(&conn->evsql->query_queue)); - - // free - free(conn); -} - -/* - * Release a transaction, it should already be deassociated from the query. - * - * Perform a two-way-deassociation with the conn, and then free the trans. - */ -static void _evsql_trans_release (struct evsql_trans *trans) { - assert(trans->query == NULL); - assert(trans->conn != NULL); - - // deassociate the conn - trans->conn->trans = NULL; trans->conn = NULL; - - // free the trans - _evsql_trans_free(trans); -} - -/* - * Fail a single query, this will trigger the callback and free it. - * - * NOTE: Only for *TRANSACTIONLESS* queries. - */ -static void _evsql_query_fail (struct evsql* evsql, struct evsql_query *query) { - struct evsql_result_info res; ZINIT(res); - - // set up the result_info - res.evsql = evsql; - res.trans = NULL; - res.error = 1; - - // finish off the query - _evsql_query_done(query, &res); -} - -/* - * Fail a transaction, this will silently drop any query, trigger the error callback, two-way-deassociate/release the - * conn, and then free the trans. - */ -static void _evsql_trans_fail (struct evsql_trans *trans) { - if (trans->query) { - // free the query silently - _evsql_query_free(trans->query); trans->query = NULL; - - // also deassociate it from the conn! - trans->conn->query = NULL; - } - - // tell the user - // XXX: trans is in a bad state during this call - if (trans->error_fn) - trans->error_fn(trans, trans->cb_arg); - else - WARNING("supressing error because error_fn was NULL"); - - // deassociate and release the conn - trans->conn->trans = NULL; _evsql_conn_release(trans->conn); trans->conn = NULL; - - // pump the queue for requests that were waiting for this connection - _evsql_pump(trans->evsql, NULL); - - // free the trans - _evsql_trans_free(trans); -} - -/* - * Fail a connection. If the connection is transactional, this will just call _evsql_trans_fail, but otherwise it will - * fail any ongoing query, and then release the connection. - */ -static void _evsql_conn_fail (struct evsql_conn *conn) { - if (conn->trans) { - // let transactions handle their connection failures - _evsql_trans_fail(conn->trans); - - } else { - if (conn->query) { - // fail the in-progress query - _evsql_query_fail(conn->evsql, conn->query); conn->query = NULL; - } - - // finish off the whole connection - _evsql_conn_release(conn); - } -} - -/* - * Processes enqueued non-transactional queries until the queue is empty, or we managed to exec a query. - * - * If execing a query on a connection fails, both the query and the connection are failed (in that order). - * - * Any further queries will then also be failed, because there's no reconnection/retry logic yet. - * - * This means that if conn is NULL, all queries are failed. - */ -static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn) { - struct evsql_query *query; - int err; - - // look for waiting queries - while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) { - // dequeue - TAILQ_REMOVE(&evsql->query_queue, query, entry); - - if (conn) { - // try and execute it - err = _evsql_query_exec(conn, query, query->command); - } - - // free the command buf - free(query->command); query->command = NULL; - - if (err || !conn) { - if (!conn) { - // warn when dropping queries - WARNING("failing query becuse there are no conns"); - } - - // fail the query - _evsql_query_fail(evsql, query); - - if (conn) { - // fail the connection - WARNING("failing the connection because a query-exec failed"); - - _evsql_conn_fail(conn); conn = NULL; - } - - } else { - // we have succesfully enqueued a query, and we can wait for this connection to complete - break; - - } - - // handle the rest of the queue - } - - // ok - return; -} - -/* - * Callback for a trans's 'BEGIN' query, which means the transaction is now ready for use. - */ -static void _evsql_trans_ready (const struct evsql_result_info *res, void *arg) { - (void) arg; - - assert(res->trans); - - // check for errors - if (res->error) - ERROR("transaction 'BEGIN' failed: %s", evsql_result_error(res)); - - // transaction is now ready for use - res->trans->ready_fn(res->trans, res->trans->cb_arg); - - // good - return; - -error: - _evsql_trans_fail(res->trans); -} - -/* - * The transaction's connection is ready, send the 'BEGIN' query. - * - * If anything fails, calls _evsql_trans_fail and returns nonzero, zero on success - */ -static int _evsql_trans_conn_ready (struct evsql *evsql, struct evsql_trans *trans) { - char trans_sql[EVSQL_QUERY_BEGIN_BUF]; - const char *isolation_level; - int ret; - - // determine the isolation_level to use - switch (trans->type) { - case EVSQL_TRANS_DEFAULT: - isolation_level = NULL; break; - - case EVSQL_TRANS_SERIALIZABLE: - isolation_level = "SERIALIZABLE"; break; - - case EVSQL_TRANS_REPEATABLE_READ: - isolation_level = "REPEATABLE READ"; break; - - case EVSQL_TRANS_READ_COMMITTED: - isolation_level = "READ COMMITTED"; break; - - case EVSQL_TRANS_READ_UNCOMMITTED: - isolation_level = "READ UNCOMMITTED"; break; - - default: - FATAL("trans->type: %d", trans->type); - } - - // build the trans_sql - if (isolation_level) - ret = snprintf(trans_sql, EVSQL_QUERY_BEGIN_BUF, "BEGIN TRANSACTION ISOLATION LEVEL %s", isolation_level); - else - ret = snprintf(trans_sql, EVSQL_QUERY_BEGIN_BUF, "BEGIN TRANSACTION"); - - // make sure it wasn't truncated - if (ret >= EVSQL_QUERY_BEGIN_BUF) - ERROR("trans_sql overflow: %d >= %d", ret, EVSQL_QUERY_BEGIN_BUF); - - // execute the query - if (evsql_query(evsql, trans, trans_sql, _evsql_trans_ready, NULL) == NULL) - ERROR("evsql_query"); - - // success - return 0; - -error: - // fail the transaction - _evsql_trans_fail(trans); - - return -1; -} - -/* - * The evpq connection was succesfully established. - */ -static void _evsql_evpq_connected (struct evpq_conn *_conn, void *arg) { - struct evsql_conn *conn = arg; - - if (conn->trans) - // notify the transaction - // don't care about errors - (void) _evsql_trans_conn_ready(conn->evsql, conn->trans); - - else - // pump any waiting transactionless queries - _evsql_pump(conn->evsql, conn); -} - -/* - * Got one result on this evpq connection. - */ -static void _evsql_evpq_result (struct evpq_conn *_conn, PGresult *result, void *arg) { - struct evsql_conn *conn = arg; - struct evsql_query *query = conn->query; - - assert(query != NULL); - - // if we get multiple results, only return the first one - if (query->result.evpq) { - WARNING("[evsql] evpq query returned multiple results, discarding previous one"); - - PQclear(query->result.evpq); query->result.evpq = NULL; - } - - // remember the result - query->result.evpq = result; -} - -/* - * No more results for this query. - */ -static void _evsql_evpq_done (struct evpq_conn *_conn, void *arg) { - struct evsql_conn *conn = arg; - struct evsql_query *query = conn->query; - struct evsql_result_info res; ZINIT(res); - - assert(query != NULL); - - // set up the result_info - res.evsql = conn->evsql; - res.trans = conn->trans; - - if (query->result.evpq == NULL) { - // if a query didn't return any results (bug?), warn and fail the query - WARNING("[evsql] evpq query didn't return any results"); - - res.error = 1; - - } else if (strcmp(PQresultErrorMessage(query->result.evpq), "") != 0) { - // the query failed with some error - res.error = 1; - res.result.pq = query->result.evpq; - - } else { - res.error = 0; - res.result.pq = query->result.evpq; - - } - - // de-associate the query from the connection - conn->query = NULL; - - // how we handle query completion depends on if we're a transaction or not - if (conn->trans) { - // we can deassign the trans's query - conn->trans->query = NULL; - - // was an abort? - if (!query->cb_fn) - // notify the user that the transaction query has been aborted - conn->trans->ready_fn(conn->trans, conn->trans->cb_arg); - - // then hand the query to the user - _evsql_query_done(query, &res); - - } else { - // a transactionless query, so just finish it off and pump any other waiting ones - _evsql_query_done(query, &res); - - // pump the next one - _evsql_pump(conn->evsql, conn); - } -} - -/* - * The connection failed. - */ -static void _evsql_evpq_failure (struct evpq_conn *_conn, void *arg) { - struct evsql_conn *conn = arg; - - // just fail the conn - _evsql_conn_fail(conn); -} - -/* - * Our evpq behaviour - */ -static struct evpq_callback_info _evsql_evpq_cb_info = { - .fn_connected = _evsql_evpq_connected, - .fn_result = _evsql_evpq_result, - .fn_done = _evsql_evpq_done, - .fn_failure = _evsql_evpq_failure, -}; - -/* - * Allocate the generic evsql context. - */ -static struct evsql *_evsql_new_base (struct event_base *ev_base, evsql_error_cb error_fn, void *cb_arg) { - struct evsql *evsql = NULL; - - // allocate it - if ((evsql = calloc(1, sizeof(*evsql))) == NULL) - ERROR("calloc"); - - // store - evsql->ev_base = ev_base; - evsql->error_fn = error_fn; - evsql->cb_arg = cb_arg; - - // init - LIST_INIT(&evsql->conn_list); - TAILQ_INIT(&evsql->query_queue); - - // done - return evsql; - -error: - return NULL; -} - -/* - * Start a new connection and add it to the list, it won't be ready until _evsql_evpq_connected is called - */ -static struct evsql_conn *_evsql_conn_new (struct evsql *evsql) { - struct evsql_conn *conn = NULL; - - // allocate - if ((conn = calloc(1, sizeof(*conn))) == NULL) - ERROR("calloc"); - - // init - conn->evsql = evsql; - - // connect the engine - switch (evsql->type) { - case EVSQL_EVPQ: - if ((conn->engine.evpq = evpq_connect(evsql->ev_base, evsql->engine_conf.evpq, _evsql_evpq_cb_info, conn)) == NULL) - goto error; - - break; - - default: - FATAL("evsql->type"); - } - - // add it to the list - LIST_INSERT_HEAD(&evsql->conn_list, conn, entry); - - // success - return conn; - -error: - free(conn); - - return NULL; -} - -struct evsql *evsql_new_pq (struct event_base *ev_base, const char *pq_conninfo, evsql_error_cb error_fn, void *cb_arg) { - struct evsql *evsql = NULL; - - // base init - if ((evsql = _evsql_new_base (ev_base, error_fn, cb_arg)) == NULL) - goto error; - - // store conf - evsql->engine_conf.evpq = pq_conninfo; - - // pre-create one connection - if (_evsql_conn_new(evsql) == NULL) - goto error; - - // done - return evsql; - -error: - // XXX: more complicated than this? - free(evsql); - - return NULL; -} - -/* - * Checks if the connection is already allocated for some other trans/query. - * - * Returns: - * 0 connection idle, can be allocated - * >1 connection busy - */ -static int _evsql_conn_busy (struct evsql_conn *conn) { - // transactions get the connection to themselves - if (conn->trans) - return 1; - - // if it has a query assigned, it's busy - if (conn->query) - return 1; - - // otherwise, it's all idle - return 0; -} - -/* - * Checks if the connection is ready for use (i.e. _evsql_evpq_connected was called). - * - * The connection should not already have a query running. - * - * Returns - * <0 the connection is not valid (failed, query in progress) - * 0 the connection is still pending, and will become ready at some point - * >0 it's ready - */ -static int _evsql_conn_ready (struct evsql_conn *conn) { - switch (conn->evsql->type) { - case EVSQL_EVPQ: { - enum evpq_state state = evpq_state(conn->engine.evpq); - - switch (state) { - case EVPQ_CONNECT: - return 0; - - case EVPQ_CONNECTED: - return 1; - - case EVPQ_QUERY: - case EVPQ_INIT: - case EVPQ_FAILURE: - return -1; - - default: - FATAL("evpq_state: %d", state); - } - - } - - default: - FATAL("evsql->type: %d", conn->evsql->type); - } -} - -/* - * Allocate a connection for use and return it via *conn_ptr, or if may_queue is nonzero and the connection pool is - * getting full, return NULL (query should be queued). - * - * Note that the returned connection might not be ready for use yet (if we created a new one, see _evsql_conn_ready). - * - * Returns zero if a connection was found or the request should be queued, or nonzero if something failed and the - * request should be dropped. - */ -static int _evsql_conn_get (struct evsql *evsql, struct evsql_conn **conn_ptr, int may_queue) { - int have_nontrans = 0; - *conn_ptr = NULL; - - // find a connection that isn't busy and is ready (unless the query queue is empty). - LIST_FOREACH(*conn_ptr, &evsql->conn_list, entry) { - // we can only have a query enqueue itself if there is a non-trans conn it can later use - if (!(*conn_ptr)->trans) - have_nontrans = 1; - - // skip busy conns always - if (_evsql_conn_busy(*conn_ptr)) - continue; - - // accept pending conns as long as there are NO enqueued queries (might cause deadlock otherwise) - if (_evsql_conn_ready(*conn_ptr) == 0 && TAILQ_EMPTY(&evsql->query_queue)) - break; - - // accept conns that are in a fully ready state - if (_evsql_conn_ready(*conn_ptr) > 0) - break; - } - - // if we found an idle connection, we can just return that right away - if (*conn_ptr) - return 0; - - // return NULL if may_queue and we have a non-trans conn that we can, at some point, use - if (may_queue && have_nontrans) - return 0; - - // we need to open a new connection - if ((*conn_ptr = _evsql_conn_new(evsql)) == NULL) - goto error; - - // good - return 0; -error: - return -1; -} - -struct evsql_trans *evsql_trans (struct evsql *evsql, enum evsql_trans_type type, evsql_trans_error_cb error_fn, evsql_trans_ready_cb ready_fn, evsql_trans_done_cb done_fn, void *cb_arg) { - struct evsql_trans *trans = NULL; - - // allocate it - if ((trans = calloc(1, sizeof(*trans))) == NULL) - ERROR("calloc"); - - // store - trans->evsql = evsql; - trans->ready_fn = ready_fn; - trans->done_fn = done_fn; - trans->cb_arg = cb_arg; - trans->type = type; - - // find a connection - if (_evsql_conn_get(evsql, &trans->conn, 0)) - ERROR("_evsql_conn_get"); - - // associate the conn - trans->conn->trans = trans; - - // is it already ready? - if (_evsql_conn_ready(trans->conn) > 0) { - // call _evsql_trans_conn_ready directly, it will handle cleanup (silently, !error_fn) - if (_evsql_trans_conn_ready(evsql, trans)) { - // return NULL directly - return NULL; - } - - } else { - // otherwise, wait for the conn to be ready - - } - - // and let it pass errors to the user - trans->error_fn = error_fn; - - // ok - return trans; - -error: - free(trans); - - return NULL; -} - -/* - * Validate and allocate the basic stuff for a new query. - */ -static struct evsql_query *_evsql_query_new (struct evsql *evsql, struct evsql_trans *trans, evsql_query_cb query_fn, void *cb_arg) { - struct evsql_query *query = NULL; - - // if it's part of a trans, then make sure the trans is idle - if (trans && trans->query) - ERROR("transaction is busy"); - - // allocate it - if ((query = calloc(1, sizeof(*query))) == NULL) - ERROR("calloc"); - - // store - query->cb_fn = query_fn; - query->cb_arg = cb_arg; - - // success - return query; - -error: - return NULL; -} - -/* - * Handle a new query. - * - * For transactions this will associate the query and then execute it, otherwise this will either find an idle - * connection and send the query, or enqueue it. - */ -static int _evsql_query_enqueue (struct evsql *evsql, struct evsql_trans *trans, struct evsql_query *query, const char *command) { - // transaction queries are handled differently - if (trans) { - // it's an in-transaction query - assert(trans->query == NULL); - - // assign the query - trans->query = query; - - // execute directly - if (_evsql_query_exec(trans->conn, query, command)) { - // ack, fail the transaction - _evsql_trans_fail(trans); - - // caller frees query - goto error; - } - - } else { - struct evsql_conn *conn; - - // find an idle connection - if ((_evsql_conn_get(evsql, &conn, 1))) - ERROR("couldn't allocate a connection for the query"); - - // we must enqueue if no idle conn or the conn is not yet ready - if (conn && _evsql_conn_ready(conn) > 0) { - // execute directly - if (_evsql_query_exec(conn, query, command)) { - // ack, fail the connection - _evsql_conn_fail(conn); - - // make sure we don't deadlock any queries, but if this query got a conn directly, then we shouldn't - // have any queries enqueued anyways - assert(TAILQ_EMPTY(&evsql->query_queue)); - - // caller frees query - goto error; - } - - } else { - // copy the command for later execution - if ((query->command = strdup(command)) == NULL) - ERROR("strdup"); - - // enqueue until some connection pumps the queue - TAILQ_INSERT_TAIL(&evsql->query_queue, query, entry); - } - } - - // ok, good - return 0; - -error: - return -1; -} - -struct evsql_query *evsql_query (struct evsql *evsql, struct evsql_trans *trans, const char *command, evsql_query_cb query_fn, void *cb_arg) { - struct evsql_query *query = NULL; - - // alloc new query - if ((query = _evsql_query_new(evsql, trans, query_fn, cb_arg)) == NULL) - goto error; - - // just execute the command string directly - if (_evsql_query_enqueue(evsql, trans, query, command)) - goto error; - - // ok - return query; - -error: - _evsql_query_free(query); - - return NULL; -} - -struct evsql_query *evsql_query_params (struct evsql *evsql, struct evsql_trans *trans, const char *command, const struct evsql_query_params *params, evsql_query_cb query_fn, void *cb_arg) { - struct evsql_query *query = NULL; - const struct evsql_query_param *param; - int idx; - - // alloc new query - if ((query = _evsql_query_new(evsql, trans, query_fn, cb_arg)) == NULL) - goto error; - - // count the params - for (param = params->list; param->type; param++) - query->params.count++; - - // allocate the vertical storage for the parameters - if (0 - -// !(query->params.types = calloc(query->params.count, sizeof(Oid))) - || !(query->params.values = calloc(query->params.count, sizeof(char *))) - || !(query->params.lengths = calloc(query->params.count, sizeof(int))) - || !(query->params.formats = calloc(query->params.count, sizeof(int))) - ) - ERROR("calloc"); - - // transform - for (param = params->list, idx = 0; param->type; param++, idx++) { - // `types` stays NULL - // query->params.types[idx] = 0; - - // values - query->params.values[idx] = param->data_raw; - - // lengths - query->params.lengths[idx] = param->length; - - // formats, binary if length is nonzero - query->params.formats[idx] = param->length ? 1 : 0; - } - - // result format - switch (params->result_fmt) { - case EVSQL_FMT_TEXT: - query->params.result_format = 0; break; - - case EVSQL_FMT_BINARY: - query->params.result_format = 1; break; - - default: - FATAL("params.result_fmt: %d", params->result_fmt); - } - - // execute it - if (_evsql_query_enqueue(evsql, trans, query, command)) - goto error; - - // ok - return query; - -error: - _evsql_query_free(query); - - return NULL; -} - -void evsql_query_abort (struct evsql_trans *trans, struct evsql_query *query) { - assert(query); - - if (trans) { - // must be the right query - assert(trans->query == query); - } - - // just strip the callback and wait for it to complete as normal - query->cb_fn = NULL; -} - -void _evsql_trans_commit_res (const struct evsql_result_info *res, void *arg) { - (void) arg; - - assert(res->trans); - - // check for errors - if (res->error) - ERROR("transaction 'COMMIT' failed: %s", evsql_result_error(res)); - - // transaction is now done - res->trans->done_fn(res->trans, res->trans->cb_arg); - - // release it - _evsql_trans_release(res->trans); - - // success - return; - -error: - _evsql_trans_fail(res->trans); -} - -int evsql_trans_commit (struct evsql_trans *trans) { - static const char *sql = "COMMIT TRANSACTION"; - - if (trans->query) - ERROR("cannot COMMIT because transaction is still busy"); - - // query - if (evsql_query(trans->evsql, trans, sql, _evsql_trans_commit_res, NULL) == NULL) - goto error; - - // mark it as commited in case someone wants to abort it - trans->has_commit = 1; - - // success - return 0; - -error: - return -1; -} - -void _evsql_trans_rollback_res (const struct evsql_result_info *res, void *arg) { - (void) arg; - - assert(res->trans); - - // fail the connection on errors - if (res->error) - ERROR("transaction 'ROLLBACK' failed: %s", evsql_result_error(res)); - - // release it - _evsql_trans_release(res->trans); - - // success - return; - -error: - // fail the connection too, errors are supressed - _evsql_trans_fail(res->trans); -} - -/* - * Used as the ready_fn callback in case of abort, otherwise directly - */ -void _evsql_trans_rollback (struct evsql_trans *trans, void *unused) { - static const char *sql = "ROLLBACK TRANSACTION"; - - (void) unused; - - // query - if (evsql_query(trans->evsql, trans, sql, _evsql_trans_rollback_res, NULL) == NULL) { - // fail the transaction/connection - _evsql_trans_fail(trans); - } - -} - -void evsql_trans_abort (struct evsql_trans *trans) { - // supress errors - trans->error_fn = NULL; - - if (trans->has_commit) { - // abort after commit doesn't make sense - FATAL("transaction was already commited"); - } - - if (trans->query) { - // gah, some query is running - WARNING("aborting pending query"); - - // prepare to rollback once complete - trans->ready_fn = _evsql_trans_rollback; - - // abort - evsql_query_abort(trans, trans->query); - - } else { - // just rollback directly - _evsql_trans_rollback(trans, NULL); - - } -} - diff -r e944453ca924 -r 5de62ca9a5aa src/evsql/evsql.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/evsql/evsql.c Thu Oct 16 22:04:53 2008 +0300 @@ -0,0 +1,1017 @@ +#define _GNU_SOURCE +#include +#include +#include + +#include "evsql.h" +#include "../lib/log.h" +#include "../lib/error.h" +#include "../lib/misc.h" + +/* + * A couple function prototypes + */ +static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn); + +/* + * Actually execute the given query. + * + * The backend should be able to accept the query at this time. + * + * You should assume that if trying to execute a query fails, then the connection should also be considred as failed. + */ +static int _evsql_query_exec (struct evsql_conn *conn, struct evsql_query *query, const char *command) { + int err; + + switch (conn->evsql->type) { + case EVSQL_EVPQ: + // got params? + if (query->params.count) { + err = evpq_query_params(conn->engine.evpq, command, + query->params.count, + query->params.types, + query->params.values, + query->params.lengths, + query->params.formats, + query->params.result_format + ); + + } else { + // plain 'ole query + err = evpq_query(conn->engine.evpq, command); + } + + if (err) { + if (PQstatus(evpq_pgconn(conn->engine.evpq)) != CONNECTION_OK) + WARNING("conn failed"); + else + WARNING("query failed, dropping conn as well"); + } + + break; + + default: + FATAL("evsql->type"); + } + + if (!err) + // assign the query + conn->query = query; + + return err; +} + +/* + * Free the query and related resources, doesn't trigger any callbacks or remove from any queues. + * + * The command should already be taken care of (NULL). + */ +static void _evsql_query_free (struct evsql_query *query) { + if (!query) + return; + + assert(query->command == NULL); + + // free params if present + free(query->params.types); + free(query->params.values); + free(query->params.lengths); + free(query->params.formats); + + // free the query itself + free(query); +} + +/* + * Execute the callback if res is given, and free the query. + * + * The query has been aborted, it will simply be freed + */ +static void _evsql_query_done (struct evsql_query *query, const struct evsql_result_info *res) { + if (res) { + if (query->cb_fn) + // call the callback + query->cb_fn(res, query->cb_arg); + else + WARNING("supressing cb_fn because query was aborted"); + } + + // free + _evsql_query_free(query); +} + +/* + * XXX: + * / +static void _evsql_destroy (struct evsql *evsql, const struct evsql_result_info *res) { + struct evsql_query *query; + + // clear the queue + while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) { + _evsql_query_done(query, res); + + TAILQ_REMOVE(&evsql->query_queue, query, entry); + } + + // free + free(evsql); +} +*/ + +/* + * Free the transaction, it should already be deassociated from the query and conn. + */ +static void _evsql_trans_free (struct evsql_trans *trans) { + // ensure we don't leak anything + assert(trans->query == NULL); + assert(trans->conn == NULL); + + // free + free(trans); +} + +/* + * Release a connection. It should already be deassociated from the trans and query. + * + * Releases the engine, removes from the conn_list and frees this. + */ +static void _evsql_conn_release (struct evsql_conn *conn) { + // ensure we don't leak anything + assert(conn->trans == NULL); + assert(conn->query == NULL); + + // release the engine + switch (conn->evsql->type) { + case EVSQL_EVPQ: + evpq_release(conn->engine.evpq); + break; + + default: + FATAL("evsql->type"); + } + + // remove from list + LIST_REMOVE(conn, entry); + + // catch deadlocks + assert(!LIST_EMPTY(&conn->evsql->conn_list) || TAILQ_EMPTY(&conn->evsql->query_queue)); + + // free + free(conn); +} + +/* + * Release a transaction, it should already be deassociated from the query. + * + * Perform a two-way-deassociation with the conn, and then free the trans. + */ +static void _evsql_trans_release (struct evsql_trans *trans) { + assert(trans->query == NULL); + assert(trans->conn != NULL); + + // deassociate the conn + trans->conn->trans = NULL; trans->conn = NULL; + + // free the trans + _evsql_trans_free(trans); +} + +/* + * Fail a single query, this will trigger the callback and free it. + * + * NOTE: Only for *TRANSACTIONLESS* queries. + */ +static void _evsql_query_fail (struct evsql* evsql, struct evsql_query *query) { + struct evsql_result_info res; ZINIT(res); + + // set up the result_info + res.evsql = evsql; + res.trans = NULL; + res.error = 1; + + // finish off the query + _evsql_query_done(query, &res); +} + +/* + * Fail a transaction, this will silently drop any query, trigger the error callback, two-way-deassociate/release the + * conn, and then free the trans. + */ +static void _evsql_trans_fail (struct evsql_trans *trans) { + if (trans->query) { + // free the query silently + _evsql_query_free(trans->query); trans->query = NULL; + + // also deassociate it from the conn! + trans->conn->query = NULL; + } + + // tell the user + // XXX: trans is in a bad state during this call + if (trans->error_fn) + trans->error_fn(trans, trans->cb_arg); + else + WARNING("supressing error because error_fn was NULL"); + + // deassociate and release the conn + trans->conn->trans = NULL; _evsql_conn_release(trans->conn); trans->conn = NULL; + + // pump the queue for requests that were waiting for this connection + _evsql_pump(trans->evsql, NULL); + + // free the trans + _evsql_trans_free(trans); +} + +/* + * Fail a connection. If the connection is transactional, this will just call _evsql_trans_fail, but otherwise it will + * fail any ongoing query, and then release the connection. + */ +static void _evsql_conn_fail (struct evsql_conn *conn) { + if (conn->trans) { + // let transactions handle their connection failures + _evsql_trans_fail(conn->trans); + + } else { + if (conn->query) { + // fail the in-progress query + _evsql_query_fail(conn->evsql, conn->query); conn->query = NULL; + } + + // finish off the whole connection + _evsql_conn_release(conn); + } +} + +/* + * Processes enqueued non-transactional queries until the queue is empty, or we managed to exec a query. + * + * If execing a query on a connection fails, both the query and the connection are failed (in that order). + * + * Any further queries will then also be failed, because there's no reconnection/retry logic yet. + * + * This means that if conn is NULL, all queries are failed. + */ +static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn) { + struct evsql_query *query; + int err; + + // look for waiting queries + while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) { + // dequeue + TAILQ_REMOVE(&evsql->query_queue, query, entry); + + if (conn) { + // try and execute it + err = _evsql_query_exec(conn, query, query->command); + } + + // free the command buf + free(query->command); query->command = NULL; + + if (err || !conn) { + if (!conn) { + // warn when dropping queries + WARNING("failing query becuse there are no conns"); + } + + // fail the query + _evsql_query_fail(evsql, query); + + if (conn) { + // fail the connection + WARNING("failing the connection because a query-exec failed"); + + _evsql_conn_fail(conn); conn = NULL; + } + + } else { + // we have succesfully enqueued a query, and we can wait for this connection to complete + break; + + } + + // handle the rest of the queue + } + + // ok + return; +} + +/* + * Callback for a trans's 'BEGIN' query, which means the transaction is now ready for use. + */ +static void _evsql_trans_ready (const struct evsql_result_info *res, void *arg) { + (void) arg; + + assert(res->trans); + + // check for errors + if (res->error) + ERROR("transaction 'BEGIN' failed: %s", evsql_result_error(res)); + + // transaction is now ready for use + res->trans->ready_fn(res->trans, res->trans->cb_arg); + + // good + return; + +error: + _evsql_trans_fail(res->trans); +} + +/* + * The transaction's connection is ready, send the 'BEGIN' query. + * + * If anything fails, calls _evsql_trans_fail and returns nonzero, zero on success + */ +static int _evsql_trans_conn_ready (struct evsql *evsql, struct evsql_trans *trans) { + char trans_sql[EVSQL_QUERY_BEGIN_BUF]; + const char *isolation_level; + int ret; + + // determine the isolation_level to use + switch (trans->type) { + case EVSQL_TRANS_DEFAULT: + isolation_level = NULL; break; + + case EVSQL_TRANS_SERIALIZABLE: + isolation_level = "SERIALIZABLE"; break; + + case EVSQL_TRANS_REPEATABLE_READ: + isolation_level = "REPEATABLE READ"; break; + + case EVSQL_TRANS_READ_COMMITTED: + isolation_level = "READ COMMITTED"; break; + + case EVSQL_TRANS_READ_UNCOMMITTED: + isolation_level = "READ UNCOMMITTED"; break; + + default: + FATAL("trans->type: %d", trans->type); + } + + // build the trans_sql + if (isolation_level) + ret = snprintf(trans_sql, EVSQL_QUERY_BEGIN_BUF, "BEGIN TRANSACTION ISOLATION LEVEL %s", isolation_level); + else + ret = snprintf(trans_sql, EVSQL_QUERY_BEGIN_BUF, "BEGIN TRANSACTION"); + + // make sure it wasn't truncated + if (ret >= EVSQL_QUERY_BEGIN_BUF) + ERROR("trans_sql overflow: %d >= %d", ret, EVSQL_QUERY_BEGIN_BUF); + + // execute the query + if (evsql_query(evsql, trans, trans_sql, _evsql_trans_ready, NULL) == NULL) + ERROR("evsql_query"); + + // success + return 0; + +error: + // fail the transaction + _evsql_trans_fail(trans); + + return -1; +} + +/* + * The evpq connection was succesfully established. + */ +static void _evsql_evpq_connected (struct evpq_conn *_conn, void *arg) { + struct evsql_conn *conn = arg; + + if (conn->trans) + // notify the transaction + // don't care about errors + (void) _evsql_trans_conn_ready(conn->evsql, conn->trans); + + else + // pump any waiting transactionless queries + _evsql_pump(conn->evsql, conn); +} + +/* + * Got one result on this evpq connection. + */ +static void _evsql_evpq_result (struct evpq_conn *_conn, PGresult *result, void *arg) { + struct evsql_conn *conn = arg; + struct evsql_query *query = conn->query; + + assert(query != NULL); + + // if we get multiple results, only return the first one + if (query->result.evpq) { + WARNING("[evsql] evpq query returned multiple results, discarding previous one"); + + PQclear(query->result.evpq); query->result.evpq = NULL; + } + + // remember the result + query->result.evpq = result; +} + +/* + * No more results for this query. + */ +static void _evsql_evpq_done (struct evpq_conn *_conn, void *arg) { + struct evsql_conn *conn = arg; + struct evsql_query *query = conn->query; + struct evsql_result_info res; ZINIT(res); + + assert(query != NULL); + + // set up the result_info + res.evsql = conn->evsql; + res.trans = conn->trans; + + if (query->result.evpq == NULL) { + // if a query didn't return any results (bug?), warn and fail the query + WARNING("[evsql] evpq query didn't return any results"); + + res.error = 1; + + } else if (strcmp(PQresultErrorMessage(query->result.evpq), "") != 0) { + // the query failed with some error + res.error = 1; + res.result.pq = query->result.evpq; + + } else { + res.error = 0; + res.result.pq = query->result.evpq; + + } + + // de-associate the query from the connection + conn->query = NULL; + + // how we handle query completion depends on if we're a transaction or not + if (conn->trans) { + // we can deassign the trans's query + conn->trans->query = NULL; + + // was an abort? + if (!query->cb_fn) + // notify the user that the transaction query has been aborted + conn->trans->ready_fn(conn->trans, conn->trans->cb_arg); + + // then hand the query to the user + _evsql_query_done(query, &res); + + } else { + // a transactionless query, so just finish it off and pump any other waiting ones + _evsql_query_done(query, &res); + + // pump the next one + _evsql_pump(conn->evsql, conn); + } +} + +/* + * The connection failed. + */ +static void _evsql_evpq_failure (struct evpq_conn *_conn, void *arg) { + struct evsql_conn *conn = arg; + + // just fail the conn + _evsql_conn_fail(conn); +} + +/* + * Our evpq behaviour + */ +static struct evpq_callback_info _evsql_evpq_cb_info = { + .fn_connected = _evsql_evpq_connected, + .fn_result = _evsql_evpq_result, + .fn_done = _evsql_evpq_done, + .fn_failure = _evsql_evpq_failure, +}; + +/* + * Allocate the generic evsql context. + */ +static struct evsql *_evsql_new_base (struct event_base *ev_base, evsql_error_cb error_fn, void *cb_arg) { + struct evsql *evsql = NULL; + + // allocate it + if ((evsql = calloc(1, sizeof(*evsql))) == NULL) + ERROR("calloc"); + + // store + evsql->ev_base = ev_base; + evsql->error_fn = error_fn; + evsql->cb_arg = cb_arg; + + // init + LIST_INIT(&evsql->conn_list); + TAILQ_INIT(&evsql->query_queue); + + // done + return evsql; + +error: + return NULL; +} + +/* + * Start a new connection and add it to the list, it won't be ready until _evsql_evpq_connected is called + */ +static struct evsql_conn *_evsql_conn_new (struct evsql *evsql) { + struct evsql_conn *conn = NULL; + + // allocate + if ((conn = calloc(1, sizeof(*conn))) == NULL) + ERROR("calloc"); + + // init + conn->evsql = evsql; + + // connect the engine + switch (evsql->type) { + case EVSQL_EVPQ: + if ((conn->engine.evpq = evpq_connect(evsql->ev_base, evsql->engine_conf.evpq, _evsql_evpq_cb_info, conn)) == NULL) + goto error; + + break; + + default: + FATAL("evsql->type"); + } + + // add it to the list + LIST_INSERT_HEAD(&evsql->conn_list, conn, entry); + + // success + return conn; + +error: + free(conn); + + return NULL; +} + +struct evsql *evsql_new_pq (struct event_base *ev_base, const char *pq_conninfo, evsql_error_cb error_fn, void *cb_arg) { + struct evsql *evsql = NULL; + + // base init + if ((evsql = _evsql_new_base (ev_base, error_fn, cb_arg)) == NULL) + goto error; + + // store conf + evsql->engine_conf.evpq = pq_conninfo; + + // pre-create one connection + if (_evsql_conn_new(evsql) == NULL) + goto error; + + // done + return evsql; + +error: + // XXX: more complicated than this? + free(evsql); + + return NULL; +} + +/* + * Checks if the connection is already allocated for some other trans/query. + * + * Returns: + * 0 connection idle, can be allocated + * >1 connection busy + */ +static int _evsql_conn_busy (struct evsql_conn *conn) { + // transactions get the connection to themselves + if (conn->trans) + return 1; + + // if it has a query assigned, it's busy + if (conn->query) + return 1; + + // otherwise, it's all idle + return 0; +} + +/* + * Checks if the connection is ready for use (i.e. _evsql_evpq_connected was called). + * + * The connection should not already have a query running. + * + * Returns + * <0 the connection is not valid (failed, query in progress) + * 0 the connection is still pending, and will become ready at some point + * >0 it's ready + */ +static int _evsql_conn_ready (struct evsql_conn *conn) { + switch (conn->evsql->type) { + case EVSQL_EVPQ: { + enum evpq_state state = evpq_state(conn->engine.evpq); + + switch (state) { + case EVPQ_CONNECT: + return 0; + + case EVPQ_CONNECTED: + return 1; + + case EVPQ_QUERY: + case EVPQ_INIT: + case EVPQ_FAILURE: + return -1; + + default: + FATAL("evpq_state: %d", state); + } + + } + + default: + FATAL("evsql->type: %d", conn->evsql->type); + } +} + +/* + * Allocate a connection for use and return it via *conn_ptr, or if may_queue is nonzero and the connection pool is + * getting full, return NULL (query should be queued). + * + * Note that the returned connection might not be ready for use yet (if we created a new one, see _evsql_conn_ready). + * + * Returns zero if a connection was found or the request should be queued, or nonzero if something failed and the + * request should be dropped. + */ +static int _evsql_conn_get (struct evsql *evsql, struct evsql_conn **conn_ptr, int may_queue) { + int have_nontrans = 0; + *conn_ptr = NULL; + + // find a connection that isn't busy and is ready (unless the query queue is empty). + LIST_FOREACH(*conn_ptr, &evsql->conn_list, entry) { + // we can only have a query enqueue itself if there is a non-trans conn it can later use + if (!(*conn_ptr)->trans) + have_nontrans = 1; + + // skip busy conns always + if (_evsql_conn_busy(*conn_ptr)) + continue; + + // accept pending conns as long as there are NO enqueued queries (might cause deadlock otherwise) + if (_evsql_conn_ready(*conn_ptr) == 0 && TAILQ_EMPTY(&evsql->query_queue)) + break; + + // accept conns that are in a fully ready state + if (_evsql_conn_ready(*conn_ptr) > 0) + break; + } + + // if we found an idle connection, we can just return that right away + if (*conn_ptr) + return 0; + + // return NULL if may_queue and we have a non-trans conn that we can, at some point, use + if (may_queue && have_nontrans) + return 0; + + // we need to open a new connection + if ((*conn_ptr = _evsql_conn_new(evsql)) == NULL) + goto error; + + // good + return 0; +error: + return -1; +} + +struct evsql_trans *evsql_trans (struct evsql *evsql, enum evsql_trans_type type, evsql_trans_error_cb error_fn, evsql_trans_ready_cb ready_fn, evsql_trans_done_cb done_fn, void *cb_arg) { + struct evsql_trans *trans = NULL; + + // allocate it + if ((trans = calloc(1, sizeof(*trans))) == NULL) + ERROR("calloc"); + + // store + trans->evsql = evsql; + trans->ready_fn = ready_fn; + trans->done_fn = done_fn; + trans->cb_arg = cb_arg; + trans->type = type; + + // find a connection + if (_evsql_conn_get(evsql, &trans->conn, 0)) + ERROR("_evsql_conn_get"); + + // associate the conn + trans->conn->trans = trans; + + // is it already ready? + if (_evsql_conn_ready(trans->conn) > 0) { + // call _evsql_trans_conn_ready directly, it will handle cleanup (silently, !error_fn) + if (_evsql_trans_conn_ready(evsql, trans)) { + // return NULL directly + return NULL; + } + + } else { + // otherwise, wait for the conn to be ready + + } + + // and let it pass errors to the user + trans->error_fn = error_fn; + + // ok + return trans; + +error: + free(trans); + + return NULL; +} + +/* + * Validate and allocate the basic stuff for a new query. + */ +static struct evsql_query *_evsql_query_new (struct evsql *evsql, struct evsql_trans *trans, evsql_query_cb query_fn, void *cb_arg) { + struct evsql_query *query = NULL; + + // if it's part of a trans, then make sure the trans is idle + if (trans && trans->query) + ERROR("transaction is busy"); + + // allocate it + if ((query = calloc(1, sizeof(*query))) == NULL) + ERROR("calloc"); + + // store + query->cb_fn = query_fn; + query->cb_arg = cb_arg; + + // success + return query; + +error: + return NULL; +} + +/* + * Handle a new query. + * + * For transactions this will associate the query and then execute it, otherwise this will either find an idle + * connection and send the query, or enqueue it. + */ +static int _evsql_query_enqueue (struct evsql *evsql, struct evsql_trans *trans, struct evsql_query *query, const char *command) { + // transaction queries are handled differently + if (trans) { + // it's an in-transaction query + assert(trans->query == NULL); + + // assign the query + trans->query = query; + + // execute directly + if (_evsql_query_exec(trans->conn, query, command)) { + // ack, fail the transaction + _evsql_trans_fail(trans); + + // caller frees query + goto error; + } + + } else { + struct evsql_conn *conn; + + // find an idle connection + if ((_evsql_conn_get(evsql, &conn, 1))) + ERROR("couldn't allocate a connection for the query"); + + // we must enqueue if no idle conn or the conn is not yet ready + if (conn && _evsql_conn_ready(conn) > 0) { + // execute directly + if (_evsql_query_exec(conn, query, command)) { + // ack, fail the connection + _evsql_conn_fail(conn); + + // make sure we don't deadlock any queries, but if this query got a conn directly, then we shouldn't + // have any queries enqueued anyways + assert(TAILQ_EMPTY(&evsql->query_queue)); + + // caller frees query + goto error; + } + + } else { + // copy the command for later execution + if ((query->command = strdup(command)) == NULL) + ERROR("strdup"); + + // enqueue until some connection pumps the queue + TAILQ_INSERT_TAIL(&evsql->query_queue, query, entry); + } + } + + // ok, good + return 0; + +error: + return -1; +} + +struct evsql_query *evsql_query (struct evsql *evsql, struct evsql_trans *trans, const char *command, evsql_query_cb query_fn, void *cb_arg) { + struct evsql_query *query = NULL; + + // alloc new query + if ((query = _evsql_query_new(evsql, trans, query_fn, cb_arg)) == NULL) + goto error; + + // just execute the command string directly + if (_evsql_query_enqueue(evsql, trans, query, command)) + goto error; + + // ok + return query; + +error: + _evsql_query_free(query); + + return NULL; +} + +struct evsql_query *evsql_query_params (struct evsql *evsql, struct evsql_trans *trans, const char *command, const struct evsql_query_params *params, evsql_query_cb query_fn, void *cb_arg) { + struct evsql_query *query = NULL; + const struct evsql_query_param *param; + int idx; + + // alloc new query + if ((query = _evsql_query_new(evsql, trans, query_fn, cb_arg)) == NULL) + goto error; + + // count the params + for (param = params->list; param->type; param++) + query->params.count++; + + // allocate the vertical storage for the parameters + if (0 + +// !(query->params.types = calloc(query->params.count, sizeof(Oid))) + || !(query->params.values = calloc(query->params.count, sizeof(char *))) + || !(query->params.lengths = calloc(query->params.count, sizeof(int))) + || !(query->params.formats = calloc(query->params.count, sizeof(int))) + ) + ERROR("calloc"); + + // transform + for (param = params->list, idx = 0; param->type; param++, idx++) { + // `types` stays NULL + // query->params.types[idx] = 0; + + // values + query->params.values[idx] = param->data_raw; + + // lengths + query->params.lengths[idx] = param->length; + + // formats, binary if length is nonzero + query->params.formats[idx] = param->length ? 1 : 0; + } + + // result format + switch (params->result_fmt) { + case EVSQL_FMT_TEXT: + query->params.result_format = 0; break; + + case EVSQL_FMT_BINARY: + query->params.result_format = 1; break; + + default: + FATAL("params.result_fmt: %d", params->result_fmt); + } + + // execute it + if (_evsql_query_enqueue(evsql, trans, query, command)) + goto error; + + // ok + return query; + +error: + _evsql_query_free(query); + + return NULL; +} + +void evsql_query_abort (struct evsql_trans *trans, struct evsql_query *query) { + assert(query); + + if (trans) { + // must be the right query + assert(trans->query == query); + } + + // just strip the callback and wait for it to complete as normal + query->cb_fn = NULL; +} + +void _evsql_trans_commit_res (const struct evsql_result_info *res, void *arg) { + (void) arg; + + assert(res->trans); + + // check for errors + if (res->error) + ERROR("transaction 'COMMIT' failed: %s", evsql_result_error(res)); + + // transaction is now done + res->trans->done_fn(res->trans, res->trans->cb_arg); + + // release it + _evsql_trans_release(res->trans); + + // success + return; + +error: + _evsql_trans_fail(res->trans); +} + +int evsql_trans_commit (struct evsql_trans *trans) { + static const char *sql = "COMMIT TRANSACTION"; + + if (trans->query) + ERROR("cannot COMMIT because transaction is still busy"); + + // query + if (evsql_query(trans->evsql, trans, sql, _evsql_trans_commit_res, NULL) == NULL) + goto error; + + // mark it as commited in case someone wants to abort it + trans->has_commit = 1; + + // success + return 0; + +error: + return -1; +} + +void _evsql_trans_rollback_res (const struct evsql_result_info *res, void *arg) { + (void) arg; + + assert(res->trans); + + // fail the connection on errors + if (res->error) + ERROR("transaction 'ROLLBACK' failed: %s", evsql_result_error(res)); + + // release it + _evsql_trans_release(res->trans); + + // success + return; + +error: + // fail the connection too, errors are supressed + _evsql_trans_fail(res->trans); +} + +/* + * Used as the ready_fn callback in case of abort, otherwise directly + */ +void _evsql_trans_rollback (struct evsql_trans *trans, void *unused) { + static const char *sql = "ROLLBACK TRANSACTION"; + + (void) unused; + + // query + if (evsql_query(trans->evsql, trans, sql, _evsql_trans_rollback_res, NULL) == NULL) { + // fail the transaction/connection + _evsql_trans_fail(trans); + } + +} + +void evsql_trans_abort (struct evsql_trans *trans) { + // supress errors + trans->error_fn = NULL; + + if (trans->has_commit) { + // abort after commit doesn't make sense + FATAL("transaction was already commited"); + } + + if (trans->query) { + // gah, some query is running + WARNING("aborting pending query"); + + // prepare to rollback once complete + trans->ready_fn = _evsql_trans_rollback; + + // abort + evsql_query_abort(trans, trans->query); + + } else { + // just rollback directly + _evsql_trans_rollback(trans, NULL); + + } +} + diff -r e944453ca924 -r 5de62ca9a5aa src/evsql/evsql.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/evsql/evsql.h Thu Oct 16 22:04:53 2008 +0300 @@ -0,0 +1,136 @@ +#ifndef EVSQL_INTERNAL_H +#define EVSQL_INTERNAL_H + +/* + * Internal interfaces + */ + +#include + +#include + +#include "../evsql.h" +#include "../evpq.h" + +/* + * The engine type + */ +enum evsql_type { + EVSQL_EVPQ, // evpq +}; + +/* + * Contains the type, engine configuration, list of connections and waiting query queue. + */ +struct evsql { + // what event_base to use + struct event_base *ev_base; + + // what engine we use + enum evsql_type type; + + // callbacks + evsql_error_cb error_fn; + void *cb_arg; + + // engine-specific connection configuration + union { + const char *evpq; + } engine_conf; + + // list of connections that are open + LIST_HEAD(evsql_conn_list, evsql_conn) conn_list; + + // list of queries running or waiting to run + TAILQ_HEAD(evsql_query_queue, evsql_query) query_queue; +}; + +/* + * A single connection to the server. + * + * Contains the engine connection, may have a transaction associated, and may have a query associated. + */ +struct evsql_conn { + // evsql we belong to + struct evsql *evsql; + + // engine-specific connection info + union { + struct evpq_conn *evpq; + } engine; + + // our position in the conn list + LIST_ENTRY(evsql_conn) entry; + + // are we running a transaction? + struct evsql_trans *trans; + + // are we running a transactionless query? + struct evsql_query *query; +}; + +/* + * A single transaction. + * + * Has a connection associated and possibly a query (which will also be associated with the connection) + */ +struct evsql_trans { + // our evsql_conn/evsql + struct evsql *evsql; + struct evsql_conn *conn; + + // callbacks + evsql_trans_error_cb error_fn; + evsql_trans_ready_cb ready_fn; + evsql_trans_done_cb done_fn; + void *cb_arg; + + // the transaction type + enum evsql_trans_type type; + + // has evsql_trans_commit be called? + int has_commit : 1; + + // our current query + struct evsql_query *query; + +}; + +/* + * A single query. + * + * Has the info needed to exec the query (as these may be queued), and the callback/result info. + */ +struct evsql_query { + // the actual SQL query, this may or may not be ours, see _evsql_query_exec + char *command; + + // possible query params + struct evsql_query_param_info { + int count; + + Oid *types; + const char **values; + int *lengths; + int *formats; + + int result_format; + } params; + + // our callback + evsql_query_cb cb_fn; + void *cb_arg; + + // our position in the query list + TAILQ_ENTRY(evsql_query) entry; + + // the result + union { + PGresult *evpq; + } result; +}; + +// maximum length for a 'BEGIN TRANSACTION ...' query +#define EVSQL_QUERY_BEGIN_BUF 512 + +#endif /* EVSQL_INTERNAL_H */ diff -r e944453ca924 -r 5de62ca9a5aa src/evsql/util.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/evsql/util.c Thu Oct 16 22:04:53 2008 +0300 @@ -0,0 +1,199 @@ +#include + +#include "evsql.h" +#include "../lib/error.h" +#include "../lib/misc.h" + +int evsql_param_string (struct evsql_query_params *params, size_t param, const char *ptr) { + struct evsql_query_param *p = ¶ms->list[param]; + + assert(p->type == EVSQL_PARAM_STRING); + + p->data_raw = ptr; + p->length = 0; + + return 0; +} + +int evsql_param_uint32 (struct evsql_query_params *params, size_t param, uint32_t uval) { + struct evsql_query_param *p = ¶ms->list[param]; + + assert(p->type == EVSQL_PARAM_UINT32); + + p->data.uint32 = htonl(uval); + p->data_raw = (const char *) &p->data.uint32; + p->length = sizeof(uval); + + return 0; +} + +const char *evsql_result_error (const struct evsql_result_info *res) { + if (!res->error) + return "No error"; + + switch (res->evsql->type) { + case EVSQL_EVPQ: + if (!res->result.pq) + return "unknown error (no result)"; + + return PQresultErrorMessage(res->result.pq); + + default: + FATAL("res->evsql->type"); + } + +} + +size_t evsql_result_rows (const struct evsql_result_info *res) { + switch (res->evsql->type) { + case EVSQL_EVPQ: + return PQntuples(res->result.pq); + + default: + FATAL("res->evsql->type"); + } +} + +size_t evsql_result_cols (const struct evsql_result_info *res) { + switch (res->evsql->type) { + case EVSQL_EVPQ: + return PQnfields(res->result.pq); + + default: + FATAL("res->evsql->type"); + } +} + +int evsql_result_binary (const struct evsql_result_info *res, size_t row, size_t col, const char **ptr, size_t size, int nullok) { + *ptr = NULL; + + switch (res->evsql->type) { + case EVSQL_EVPQ: + if (PQgetisnull(res->result.pq, row, col)) { + if (nullok) + return 0; + else + ERROR("[%zu:%zu] field is null", row, col); + } + + if (PQfformat(res->result.pq, col) != 1) + ERROR("[%zu:%zu] PQfformat is not binary: %d", row, col, PQfformat(res->result.pq, col)); + + if (size && PQgetlength(res->result.pq, row, col) != size) + ERROR("[%zu:%zu] field size mismatch: %zu -> %d", row, col, size, PQgetlength(res->result.pq, row, col)); + + *ptr = PQgetvalue(res->result.pq, row, col); + + return 0; + + default: + FATAL("res->evsql->type"); + } + +error: + return -1; +} + +int evsql_result_string (const struct evsql_result_info *res, size_t row, size_t col, const char **ptr, int nullok) { + return evsql_result_binary(res, row, col, ptr, 0, nullok); +} + +int evsql_result_uint16 (const struct evsql_result_info *res, size_t row, size_t col, uint16_t *uval, int nullok) { + const char *data; + int16_t sval; + + if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok)) + goto error; + + if (!data) + return 0; + + sval = ntohs(*((int16_t *) data)); + + if (sval < 0) + ERROR("negative value for unsigned: %d", sval); + + *uval = sval; + + return 0; + +error: + return nullok ? 0 : -1; +} + +int evsql_result_uint32 (const struct evsql_result_info *res, size_t row, size_t col, uint32_t *uval, int nullok) { + const char *data; + int32_t sval; + + if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok)) + goto error; + + if (!data) + return 0; + + sval = ntohl(*(int32_t *) data); + + if (sval < 0) + ERROR("negative value for unsigned: %d", sval); + + *uval = sval; + + return 0; + +error: + return nullok ? 0 : -1; +} + +int evsql_result_uint64 (const struct evsql_result_info *res, size_t row, size_t col, uint64_t *uval, int nullok) { + const char *data; + int64_t sval; + + if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok)) + goto error; + + if (!data) + return 0; + + sval = ntohq(*(int64_t *) data); + + if (sval < 0) + ERROR("negative value for unsigned: %ld", sval); + + *uval = sval; + + return 0; + +error: + return nullok ? 0 : -1; +} + +void evsql_result_free (const struct evsql_result_info *res) { + switch (res->evsql->type) { + case EVSQL_EVPQ: + return PQclear(res->result.pq); + + default: + FATAL("res->evsql->type"); + } +} + +const char *evsql_conn_error (struct evsql_conn *conn) { + switch (conn->evsql->type) { + case EVSQL_EVPQ: + if (!conn->engine.evpq) + return "unknown error (no conn)"; + + return evpq_error_message(conn->engine.evpq); + + default: + FATAL("res->evsql->type"); + } +} + +const char *evsql_trans_error (struct evsql_trans *trans) { + if (trans->conn == NULL) + return "unknown error (no trans conn)"; + + return evsql_conn_error(trans->conn); +} + diff -r e944453ca924 -r 5de62ca9a5aa src/evsql_internal.h --- a/src/evsql_internal.h Wed Oct 15 01:14:22 2008 +0300 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,129 +0,0 @@ -#ifndef EVSQL_INTERNAL_H -#define EVSQL_INTERNAL_H - -#include - -#include "evsql.h" - -/* - * The engine type - */ -enum evsql_type { - EVSQL_EVPQ, // evpq -}; - -/* - * Contains the type, engine configuration, list of connections and waiting query queue. - */ -struct evsql { - // what event_base to use - struct event_base *ev_base; - - // what engine we use - enum evsql_type type; - - // callbacks - evsql_error_cb error_fn; - void *cb_arg; - - // engine-specific connection configuration - union { - const char *evpq; - } engine_conf; - - // list of connections that are open - LIST_HEAD(evsql_conn_list, evsql_conn) conn_list; - - // list of queries running or waiting to run - TAILQ_HEAD(evsql_query_queue, evsql_query) query_queue; -}; - -/* - * A single connection to the server. - * - * Contains the engine connection, may have a transaction associated, and may have a query associated. - */ -struct evsql_conn { - // evsql we belong to - struct evsql *evsql; - - // engine-specific connection info - union { - struct evpq_conn *evpq; - } engine; - - // our position in the conn list - LIST_ENTRY(evsql_conn) entry; - - // are we running a transaction? - struct evsql_trans *trans; - - // are we running a transactionless query? - struct evsql_query *query; -}; - -/* - * A single transaction. - * - * Has a connection associated and possibly a query (which will also be associated with the connection) - */ -struct evsql_trans { - // our evsql_conn/evsql - struct evsql *evsql; - struct evsql_conn *conn; - - // callbacks - evsql_trans_error_cb error_fn; - evsql_trans_ready_cb ready_fn; - evsql_trans_done_cb done_fn; - void *cb_arg; - - // the transaction type - enum evsql_trans_type type; - - // has evsql_trans_commit be called? - int has_commit : 1; - - // our current query - struct evsql_query *query; - -}; - -/* - * A single query. - * - * Has the info needed to exec the query (as these may be queued), and the callback/result info. - */ -struct evsql_query { - // the actual SQL query, this may or may not be ours, see _evsql_query_exec - char *command; - - // possible query params - struct evsql_query_param_info { - int count; - - Oid *types; - const char **values; - int *lengths; - int *formats; - - int result_format; - } params; - - // our callback - evsql_query_cb cb_fn; - void *cb_arg; - - // our position in the query list - TAILQ_ENTRY(evsql_query) entry; - - // the result - union { - PGresult *evpq; - } result; -}; - -// maximum length for a 'BEGIN TRANSACTION ...' query -#define EVSQL_QUERY_BEGIN_BUF 512 - -#endif /* EVSQL_INTERNAL_H */ diff -r e944453ca924 -r 5de62ca9a5aa src/evsql_util.c --- a/src/evsql_util.c Wed Oct 15 01:14:22 2008 +0300 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,201 +0,0 @@ -#include - -#include "evsql.h" -#include "evsql_internal.h" -#include "evpq.h" -#include "lib/error.h" -#include "lib/misc.h" - -int evsql_param_string (struct evsql_query_params *params, size_t param, const char *ptr) { - struct evsql_query_param *p = ¶ms->list[param]; - - assert(p->type == EVSQL_PARAM_STRING); - - p->data_raw = ptr; - p->length = 0; - - return 0; -} - -int evsql_param_uint32 (struct evsql_query_params *params, size_t param, uint32_t uval) { - struct evsql_query_param *p = ¶ms->list[param]; - - assert(p->type == EVSQL_PARAM_UINT32); - - p->data.uint32 = htonl(uval); - p->data_raw = (const char *) &p->data.uint32; - p->length = sizeof(uval); - - return 0; -} - -const char *evsql_result_error (const struct evsql_result_info *res) { - if (!res->error) - return "No error"; - - switch (res->evsql->type) { - case EVSQL_EVPQ: - if (!res->result.pq) - return "unknown error (no result)"; - - return PQresultErrorMessage(res->result.pq); - - default: - FATAL("res->evsql->type"); - } - -} - -size_t evsql_result_rows (const struct evsql_result_info *res) { - switch (res->evsql->type) { - case EVSQL_EVPQ: - return PQntuples(res->result.pq); - - default: - FATAL("res->evsql->type"); - } -} - -size_t evsql_result_cols (const struct evsql_result_info *res) { - switch (res->evsql->type) { - case EVSQL_EVPQ: - return PQnfields(res->result.pq); - - default: - FATAL("res->evsql->type"); - } -} - -int evsql_result_binary (const struct evsql_result_info *res, size_t row, size_t col, const char **ptr, size_t size, int nullok) { - *ptr = NULL; - - switch (res->evsql->type) { - case EVSQL_EVPQ: - if (PQgetisnull(res->result.pq, row, col)) { - if (nullok) - return 0; - else - ERROR("[%zu:%zu] field is null", row, col); - } - - if (PQfformat(res->result.pq, col) != 1) - ERROR("[%zu:%zu] PQfformat is not binary: %d", row, col, PQfformat(res->result.pq, col)); - - if (size && PQgetlength(res->result.pq, row, col) != size) - ERROR("[%zu:%zu] field size mismatch: %zu -> %d", row, col, size, PQgetlength(res->result.pq, row, col)); - - *ptr = PQgetvalue(res->result.pq, row, col); - - return 0; - - default: - FATAL("res->evsql->type"); - } - -error: - return -1; -} - -int evsql_result_string (const struct evsql_result_info *res, size_t row, size_t col, const char **ptr, int nullok) { - return evsql_result_binary(res, row, col, ptr, 0, nullok); -} - -int evsql_result_uint16 (const struct evsql_result_info *res, size_t row, size_t col, uint16_t *uval, int nullok) { - const char *data; - int16_t sval; - - if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok)) - goto error; - - if (!data) - return 0; - - sval = ntohs(*((int16_t *) data)); - - if (sval < 0) - ERROR("negative value for unsigned: %d", sval); - - *uval = sval; - - return 0; - -error: - return nullok ? 0 : -1; -} - -int evsql_result_uint32 (const struct evsql_result_info *res, size_t row, size_t col, uint32_t *uval, int nullok) { - const char *data; - int32_t sval; - - if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok)) - goto error; - - if (!data) - return 0; - - sval = ntohl(*(int32_t *) data); - - if (sval < 0) - ERROR("negative value for unsigned: %d", sval); - - *uval = sval; - - return 0; - -error: - return nullok ? 0 : -1; -} - -int evsql_result_uint64 (const struct evsql_result_info *res, size_t row, size_t col, uint64_t *uval, int nullok) { - const char *data; - int64_t sval; - - if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok)) - goto error; - - if (!data) - return 0; - - sval = ntohq(*(int64_t *) data); - - if (sval < 0) - ERROR("negative value for unsigned: %ld", sval); - - *uval = sval; - - return 0; - -error: - return nullok ? 0 : -1; -} - -void evsql_result_free (const struct evsql_result_info *res) { - switch (res->evsql->type) { - case EVSQL_EVPQ: - return PQclear(res->result.pq); - - default: - FATAL("res->evsql->type"); - } -} - -const char *evsql_conn_error (struct evsql_conn *conn) { - switch (conn->evsql->type) { - case EVSQL_EVPQ: - if (!conn->engine.evpq) - return "unknown error (no conn)"; - - return evpq_error_message(conn->engine.evpq); - - default: - FATAL("res->evsql->type"); - } -} - -const char *evsql_trans_error (struct evsql_trans *trans) { - if (trans->conn == NULL) - return "unknown error (no trans conn)"; - - return evsql_conn_error(trans->conn); -} -