reorganize evsql into a separate dir, rename dbfs slightly, and split dbfs_op out from dirop (in prep for fileop)
--- a/Makefile Wed Oct 15 01:14:22 2008 +0300
+++ b/Makefile Thu Oct 16 22:04:53 2008 +0300
@@ -20,7 +20,7 @@
# complex modules
EVSQL_OBJS = obj/evsql.o obj/evsql_util.o obj/evpq.o
-DBFS_OBJS = obj/dbfs/dbfs.o obj/dbfs/common.o obj/dbfs/core.o obj/dbfs/dirop.o obj/dirbuf.o
+DBFS_OBJS = obj/dbfs/dbfs.o obj/dbfs/common.o obj/dbfs/core.o obj/dbfs/op_base.o obj/dbfs/dirop.o obj/dirbuf.o
# first target
all: ${BIN_PATHS}
@@ -44,7 +44,11 @@
# other targets
clean :
- -rm obj/* bin/*
+ -rm obj/* bin/* build/deps/*
+
+clean-deps:
+ -rm build/deps/*/*.d
+ -rm build/deps/*.d
#obj-dirs:
# python build/make_obj_dirs.py $(BIN_PATHS)
--- a/src/dbfs/common.c Wed Oct 15 01:14:22 2008 +0300
+++ b/src/dbfs/common.c Thu Oct 16 22:04:53 2008 +0300
@@ -1,7 +1,8 @@
#include <string.h>
-#include "common.h"
+#include "dbfs.h"
+#include "../lib/log.h"
mode_t _dbfs_mode (const char *type) {
if (!strcmp(type, "DIR"))
--- a/src/dbfs/common.h Wed Oct 15 01:14:22 2008 +0300
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,62 +0,0 @@
-#ifndef DBFS_COMMON_H
-#define DBFS_COMMON_H
-
-#include <sys/stat.h>
-#include <errno.h>
-
-#include <event2/event.h>
-
-#include "../evfuse.h"
-#include "../evsql.h"
-#include "../lib/log.h"
-#include "../lib/misc.h"
-
-/*
- * Structs and functions shared between all dbfs components
- */
-
-#define SERROR(val) do { (val); goto error; } while(0)
-
-struct dbfs {
- struct event_base *ev_base;
-
- const char *db_conninfo;
- struct evsql *db;
-
- struct evfuse *ev_fuse;
-};
-
-// XXX: not sure how this should work
-#define CACHE_TIMEOUT 1.0
-
-/*
- * Convert the CHAR(4) inodes.type from SQL into a mode_t.
- *
- * Returns zero for unknown types.
- */
-mode_t _dbfs_mode (const char *type);
-
-/*
- * Check that the number of rows and columns in the result set matches what we expect.
- *
- * If rows is nonzero, there must be exactly that many rows (mostly useful for rows=1).
- * The number of columns must always be given, and match.
- *
- * Returns;
- * -1 if the query failed, the columns/rows do not match
- * 0 the results match
- * 1 there were no results (zero rows)
- */
-int _dbfs_check_res (const struct evsql_result_info *res, size_t rows, size_t cols);
-
-/*
- * Fill a `struct state` with info retrieved from a SQL query.
- *
- * The result must contain four columns, starting at the given offset:
- * inodes.type, inodes.mode, inodes.size, count(*) AS nlink
- *
- * Note that this does not fill the st_ino field
- */
-int _dbfs_stat_info (struct stat *st, const struct evsql_result_info *res, size_t row, size_t col_offset);
-
-#endif /* DBFS_COMMON_H */
--- a/src/dbfs/core.c Wed Oct 15 01:14:22 2008 +0300
+++ b/src/dbfs/core.c Thu Oct 16 22:04:53 2008 +0300
@@ -1,6 +1,8 @@
-#include "common.h"
-#include "ops.h"
+#include "../lib/log.h"
+#include "../lib/misc.h"
+
+#include "dbfs.h"
/*
* Core fs functionality like lookup, getattr
--- a/src/dbfs/dbfs.c Wed Oct 15 01:14:22 2008 +0300
+++ b/src/dbfs/dbfs.c Thu Oct 16 22:04:53 2008 +0300
@@ -1,9 +1,10 @@
#include <stdlib.h>
+#include "dbfs.h"
#include "../dbfs.h"
-#include "common.h"
-#include "ops.h"
+#include "../lib/log.h"
+#include "../lib/misc.h"
static struct fuse_lowlevel_ops dbfs_llops = {
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/dbfs/dbfs.h Thu Oct 16 22:04:53 2008 +0300
@@ -0,0 +1,61 @@
+#ifndef DBFS_DBFS_H
+#define DBFS_DBFS_H
+
+#include <sys/stat.h>
+#include <errno.h>
+
+#include <event2/event.h>
+
+#include "ops.h"
+#include "../evfuse.h"
+#include "../evsql.h"
+
+/*
+ * Structs and functions shared between all dbfs components
+ */
+
+#define SERROR(val) do { (val); goto error; } while(0)
+
+struct dbfs {
+ struct event_base *ev_base;
+
+ const char *db_conninfo;
+ struct evsql *db;
+
+ struct evfuse *ev_fuse;
+};
+
+// XXX: not sure how this should work
+#define CACHE_TIMEOUT 1.0
+
+/*
+ * Convert the CHAR(4) inodes.type from SQL into a mode_t.
+ *
+ * Returns zero for unknown types.
+ */
+mode_t _dbfs_mode (const char *type);
+
+/*
+ * Check that the number of rows and columns in the result set matches what we expect.
+ *
+ * If rows is nonzero, there must be exactly that many rows (mostly useful for rows=1).
+ * The number of columns must always be given, and match.
+ *
+ * Returns;
+ * -1 if the query failed, the columns/rows do not match
+ * 0 the results match
+ * 1 there were no results (zero rows)
+ */
+int _dbfs_check_res (const struct evsql_result_info *res, size_t rows, size_t cols);
+
+/*
+ * Fill a `struct state` with info retrieved from a SQL query.
+ *
+ * The result must contain four columns, starting at the given offset:
+ * inodes.type, inodes.mode, inodes.size, count(*) AS nlink
+ *
+ * Note that this does not fill the st_ino field
+ */
+int _dbfs_stat_info (struct stat *st, const struct evsql_result_info *res, size_t row, size_t col_offset);
+
+#endif /* DBFS_DBFS_H */
--- a/src/dbfs/dirop.c Wed Oct 15 01:14:22 2008 +0300
+++ b/src/dbfs/dirop.c Thu Oct 16 22:04:53 2008 +0300
@@ -2,100 +2,44 @@
#include <stdlib.h>
#include <assert.h>
-#include "common.h"
-#include "ops.h"
+#include "dbfs.h"
+#include "op_base.h"
#include "../dirbuf.h"
+#include "../lib/log.h"
/*
* Directory related functionality like opendir, readdir, releasedir
*/
-
struct dbfs_dirop {
- struct fuse_file_info fi;
- struct fuse_req *req;
+ struct dbfs_op base;
- struct evsql_trans *trans;
+ // parent dir inodes
+ uint32_t parent;
- // dir/parent dir inodes
- uint32_t ino, parent;
-
- // opendir has returned and releasedir hasn't been called yet
- int open;
-
// for readdir
struct dirbuf dirbuf;
};
/*
- * Free the dirop, aborting any in-progress transaction.
- *
- * The dirop must any oustanding request responded to first, must not be open, and must not have a transaction.
- *
- * The dirbuf will be released, and the dirop free'd.
+ * Release the dirbuf.
*/
-static void _dbfs_dirop_free (struct dbfs_dirop *dirop) {
- assert(dirop);
- assert(!dirop->open);
- assert(!dirop->req);
- assert(!dirop->trans);
-
+static void _dbfs_dirop_free (struct dbfs_op *op_base) {
+ struct dbfs_dirop *dirop = (struct dbfs_dirop *) op_base;
+
// just release the dirbuf
dirbuf_release(&dirop->dirbuf);
-
- // and then free the dirop
- free(dirop);
-}
-
-/*
- * This will handle backend failures during requests.
- *
- * 1) if we have a trans, abort it
- * 2) fail the req (mandatory)
- *
- * If the dirop is open, then we don't release it, but if it's not open, then the dirop will be free'd completely.
- *
- */
-static void _dbfs_dirop_fail (struct dbfs_dirop *dirop) {
- int err;
-
- assert(dirop->req);
-
- if (dirop->trans) {
- // abort the trans
- evsql_trans_abort(dirop->trans);
-
- dirop->trans = NULL;
- }
-
- // send an error reply
- if ((err = fuse_reply_err(dirop->req, err)))
- // XXX: handle these failures /somehow/, or requests will hang and interrupts might handle invalid dirops
- EFATAL(err, "dbfs.fail %p:%p dirop_fail: reply with fuse_reply_err", dirop, dirop->req);
-
- // drop the req
- dirop->req = NULL;
-
- // is it open?
- if (!dirop->open) {
- // no, we can free it now and then forget about the whole thing
- _dbfs_dirop_free(dirop);
-
- } else {
- // we need to wait for releasedir
-
- }
}
/*
* Handle the results for the initial attribute lookup for the dir itself during opendir ops.
*/
-static void dbfs_opendir_info_res (const struct evsql_result_info *res, void *arg) {
+static void dbfs_opendir_res (const struct evsql_result_info *res, void *arg) {
struct dbfs_dirop *dirop = arg;
int err;
- assert(dirop->trans);
- assert(dirop->req);
- assert(!dirop->open);
+ assert(dirop->base.req);
+ assert(dirop->base.trans); // query callbacks don't get called if the trans fails
+ assert(!dirop->base.open);
// check the results
if ((err = _dbfs_check_res(res, 1, 2)))
@@ -114,17 +58,11 @@
if (_dbfs_mode(type) != S_IFDIR)
EERROR(err = ENOTDIR, "wrong type: %s", type);
- INFO("[dbfs.opendir %p:%p] -> ino=%lu, parent=%lu, type=%s", dirop, dirop->req, (unsigned long int) dirop->ino, (unsigned long int) dirop->parent, type);
+ INFO("[dbfs.opendir %p:%p] -> ino=%lu, parent=%lu, type=%s", dirop, dirop->base.req, (unsigned long int) dirop->base.ino, (unsigned long int) dirop->parent, type);
- // send the openddir reply
- if ((err = fuse_reply_open(dirop->req, &dirop->fi)))
- EERROR(err, "fuse_reply_open");
-
- // req is done
- dirop->req = NULL;
-
- // dirop is now open
- dirop->open = 1;
+ // open_fn done, do the open_reply
+ if ((err = dbfs_op_open_reply(&dirop->base)))
+ goto error;
// success, fallthrough for evsql_result_free
err = 0;
@@ -132,7 +70,7 @@
error:
if (err)
// fail it
- _dbfs_dirop_fail(dirop);
+ dbfs_op_fail(&dirop->base, err);
// free
evsql_result_free(res);
@@ -141,20 +79,16 @@
/*
* The opendir transaction is ready for use. Query for the given dir's info
*/
-static void dbfs_dirop_ready (struct evsql_trans *trans, void *arg) {
- struct dbfs_dirop *dirop = arg;
- struct dbfs *ctx = fuse_req_userdata(dirop->req);
+static void dbfs_dirop_open (struct dbfs_op *op_base) {
+ struct dbfs_dirop *dirop = (struct dbfs_dirop *) op_base;
+ struct dbfs *ctx = fuse_req_userdata(dirop->base.req);
int err;
- // XXX: unless we abort queries
- assert(trans == dirop->trans);
- assert(dirop->req);
- assert(!dirop->open);
+ assert(dirop->base.trans);
+ assert(dirop->base.req);
+ assert(!dirop->base.open);
- INFO("[dbfs.opendir %p:%p] -> trans=%p", dirop, dirop->req, trans);
-
- // remember the transaction
- dirop->trans = trans;
+ INFO("[dbfs.opendir %p:%p] -> trans=%p", dirop, dirop->base.req, dirop->base.trans);
// first fetch info about the dir itself
const char *sql =
@@ -171,12 +105,12 @@
// build params
if (0
- || evsql_param_uint32(¶ms, 0, dirop->ino)
+ || evsql_param_uint32(¶ms, 0, dirop->base.ino)
)
SERROR(err = EIO);
// query
- if (evsql_query_params(ctx->db, dirop->trans, sql, ¶ms, dbfs_opendir_info_res, dirop) == NULL)
+ if (evsql_query_params(ctx->db, dirop->base.trans, sql, ¶ms, dbfs_opendir_res, dirop) == NULL)
SERROR(err = EIO);
// ok, wait for the info results
@@ -184,67 +118,11 @@
error:
// fail it
- _dbfs_dirop_fail(dirop);
+ dbfs_op_fail(&dirop->base, err);
}
/*
- * The dirop trans was committed, i.e. releasedir has completed
- */
-static void dbfs_dirop_done (struct evsql_trans *trans, void *arg) {
- struct dbfs_dirop *dirop = arg;
- int err;
-
- assert(dirop->trans);
- assert(dirop->req);
- assert(!dirop->open); // should not be considered as open anymore at this point, as errors should release
-
- INFO("[dbfs.releasedir %p:%p] -> OK", dirop, dirop->req);
-
- // forget trans
- dirop->trans = NULL;
-
- // just reply
- if ((err = fuse_reply_err(dirop->req, 0)))
- // XXX: handle these failures /somehow/, or requests will hang and interrupts might handle invalid dirops
- EFATAL(err, "[dbfs.releasedir %p:%p] dirop_done: reply with fuse_reply_err", dirop, dirop->req);
-
- // req is done
- dirop->req = NULL;
-
- // then we can just free dirop
- _dbfs_dirop_free(dirop);
-}
-
-/*
- * The dirop trans has failed, somehow, at some point, some where.
- *
- * This might happend during the opendir evsql_trans, during a readdir evsql_query, during the releasedir
- * evsql_trans_commit, or at any point in between.
- *
- * 1) loose the transaction
- * 2) if dirop has a req, we handle failing it
- */
-static void dbfs_dirop_error (struct evsql_trans *trans, void *arg) {
- struct dbfs_dirop *dirop = arg;
-
- INFO("[dbfs:dirop %p:%p] evsql transaction error: %s", dirop, dirop->req, evsql_trans_error(trans));
-
- // deassociate the trans
- dirop->trans = NULL;
-
- // if we were answering a req, error it out, and if the dirop isn't open, release it
- // if we didn't have a req outstanding, the dirop must be open, so we wouldn't free it in any case, and must wait
- // for the next readdir/releasedir to detect this and return an error reply
- if (dirop->req)
- _dbfs_dirop_fail(dirop);
- else
- assert(dirop->open);
-}
-
-/*
- * Handle opendir(), this means starting a new transaction, dbfs_dirop_ready/error will continue on from there.
- *
- * The contents of fi will be copied into the dirop, and will be used as the basis for the fuse_reply_open reply.
+ * Handle opendir(), this means starting a new op.
*/
void dbfs_opendir (struct fuse_req *req, fuse_ino_t ino, struct fuse_file_info *fi) {
struct dbfs *ctx = fuse_req_userdata(req);
@@ -255,28 +133,19 @@
if ((dirop = calloc(1, sizeof(*dirop))) == NULL && (err = EIO))
ERROR("calloc");
+ // do the op_open
+ if ((err = dbfs_op_open(ctx, &dirop->base, req, ino, fi, _dbfs_dirop_free, dbfs_dirop_open)))
+ ERROR("dbfs_op_open");
+
INFO("[dbfs.opendir %p:%p] ino=%lu, fi=%p", dirop, req, ino, fi);
- // store the dirop
- // copy *fi since it's on the stack
- dirop->fi = *fi;
- dirop->fi.fh = (uint64_t) dirop;
- dirop->req = req;
- dirop->ino = ino;
-
- // start a new transaction
- if ((dirop->trans = evsql_trans(ctx->db, EVSQL_TRANS_SERIALIZABLE, dbfs_dirop_error, dbfs_dirop_ready, dbfs_dirop_done, dirop)) == NULL)
- SERROR(err = EIO);
-
- // XXX: handle interrupts
-
// wait
return;
error:
if (dirop) {
// we can fail normally
- _dbfs_dirop_fail(dirop);
+ dbfs_op_fail(&dirop->base, err);
} else {
// must error out manually as we couldn't alloc the context
@@ -291,20 +160,20 @@
* Fill up the dirbuf, and then send the reply.
*
*/
-static void dbfs_readdir_files_res (const struct evsql_result_info *res, void *arg) {
+static void dbfs_readdir_res (const struct evsql_result_info *res, void *arg) {
struct dbfs_dirop *dirop = arg;
int err;
size_t row;
- assert(dirop->req);
- assert(dirop->trans);
- assert(dirop->open);
+ assert(dirop->base.req);
+ assert(dirop->base.trans); // query callbacks don't get called if the trans fails
+ assert(dirop->base.open);
// check the results
if ((err = _dbfs_check_res(res, 0, 4)) < 0)
SERROR(err = EIO);
- INFO("[dbfs.readdir %p:%p] -> files: res_rows=%zu", dirop, dirop->req, evsql_result_rows(res));
+ INFO("[dbfs.readdir %p:%p] -> files: res_rows=%zu", dirop, dirop->base.req, evsql_result_rows(res));
// iterate over the rows
for (row = 0; row < evsql_result_rows(res); row++) {
@@ -324,7 +193,7 @@
// add to the dirbuf
// offsets are just offset + 2
- if ((err = dirbuf_add(dirop->req, &dirop->dirbuf, off + 2, off + 3, name, ino, _dbfs_mode(type))) < 0 && (err = EIO))
+ if ((err = dirbuf_add(dirop->base.req, &dirop->dirbuf, off + 2, off + 3, name, ino, _dbfs_mode(type))) < 0 && (err = EIO))
ERROR("failed to add dirent for inode=%lu", (long unsigned int) ino);
// stop if it's full
@@ -333,18 +202,19 @@
}
// send it
- if ((err = dirbuf_done(dirop->req, &dirop->dirbuf)))
+ if ((err = dirbuf_done(dirop->base.req, &dirop->dirbuf)))
EERROR(err, "failed to send buf");
+
+ // handled the req
+ if ((err = dbfs_op_req_done(&dirop->base)))
+ goto error;
- // req is done
- dirop->req = NULL;
-
// good, fallthrough
err = 0;
error:
if (err)
- _dbfs_dirop_fail(dirop);
+ dbfs_op_fail(&dirop->base, err);
// free
evsql_result_free(res);
@@ -352,7 +222,7 @@
/*
* Handle a readdir request. This will execute a SQL query inside the transaction to get the files at the given offset,
- * and _dbfs_readdir_res will handle the results.
+ * and dbfs_readdir_res will handle the results.
*
* If trans failed earlier, detect that and return an error.
*/
@@ -360,20 +230,12 @@
struct dbfs *ctx = fuse_req_userdata(req);
struct dbfs_dirop *dirop = (struct dbfs_dirop *) fi->fh;
int err;
-
- assert(dirop);
- assert(!dirop->req);
- assert(dirop->open);
- assert(dirop->ino == ino);
+
+ // get the op
+ if ((dirop = (struct dbfs_dirop *) dbfs_op_req(req, ino, fi)) == NULL)
+ SERROR(err = EIO);
- // store the new req
- dirop->req = req;
-
- // detect earlier failures
- if (!dirop->trans && (err = EIO))
- ERROR("dirop trans has failed");
-
- INFO("[dbfs.readdir %p:%p] ino=%lu, size=%zu, off=%zu, fi=%p : trans=%p", dirop, req, ino, size, off, fi, dirop->trans);
+ INFO("[dbfs.readdir %p:%p] ino=%lu, size=%zu, off=%zu, fi=%p : trans=%p", dirop, req, ino, size, off, fi, dirop->base.trans);
// create the dirbuf
if (dirbuf_init(&dirop->dirbuf, size, off))
@@ -383,9 +245,9 @@
// we set the next offset to 2, because all dirent offsets will be larger than that
// assume that these two should *always* fit
if ((err = (0
- || dirbuf_add(req, &dirop->dirbuf, 0, 1, ".", dirop->ino, S_IFDIR )
+ || dirbuf_add(req, &dirop->dirbuf, 0, 1, ".", dirop->base.ino, S_IFDIR )
|| dirbuf_add(req, &dirop->dirbuf, 1, 2, "..",
- dirop->parent ? dirop->parent : dirop->ino, S_IFDIR )
+ dirop->parent ? dirop->parent : dirop->base.ino, S_IFDIR )
)) && (err = EIO))
ERROR("failed to add . and .. dirents");
@@ -411,21 +273,21 @@
// build params
if (0
- || evsql_param_uint32(¶ms, 0, dirop->ino)
+ || evsql_param_uint32(¶ms, 0, ino)
|| evsql_param_uint32(¶ms, 1, off)
|| evsql_param_uint32(¶ms, 2, dirbuf_estimate(&dirop->dirbuf, 0))
)
SERROR(err = EIO);
// query
- if (evsql_query_params(ctx->db, dirop->trans, sql, ¶ms, dbfs_readdir_files_res, dirop) == NULL)
+ if (evsql_query_params(ctx->db, dirop->base.trans, sql, ¶ms, dbfs_readdir_res, dirop) == NULL)
SERROR(err = EIO);
// good, wait
return;
error:
- _dbfs_dirop_fail(dirop);
+ dbfs_op_fail(&dirop->base, err);
}
/*
@@ -434,63 +296,7 @@
* The dirop may be in a failed state.
*/
void dbfs_releasedir (struct fuse_req *req, fuse_ino_t ino, struct fuse_file_info *fi) {
- struct dbfs *ctx = fuse_req_userdata(req);
- struct dbfs_dirop *dirop = (struct dbfs_dirop *) fi->fh;
- int err;
-
- (void) ctx;
-
- assert(dirop);
- assert(!dirop->req);
- assert(dirop->ino == ino);
-
- // update to this req
- dirop->req = req;
-
- // fi is irrelevant, we don't touch the flags anyways
- (void) fi;
-
- // handle failed trans
- if (!dirop->trans)
- ERROR("trans has failed");
-
- // log
- INFO("[dbfs.releasedir %p:%p] ino=%lu, fi=%p : trans=%p", dirop, req, ino, fi, dirop->trans);
-
- // we must commit the transaction (although it was jut SELECTs, no changes).
- // Note that this might cause dbfs_dirop_error to be called, we can tell if that happaned by looking at dirop->req
- // or dirop->trans this means that we need to keep the dirop open when calling trans_commit, so that dirop_error
- // doesn't free it out from underneath us.
- if (evsql_trans_commit(dirop->trans))
- SERROR(err = EIO);
-
- // fall-through to cleanup
- err = 0;
-
-error:
- // the dirop is not open anymore and can be free'd:
- // a) if we already caught an error
- // b) if we get+send an error later on
- // c) if we get+send the done/no-error later on
- dirop->open = 0;
-
- // did the commit/pre-commit-checks fail?
- if (err) {
- // a) the trans failed earlier (readdir), so we have a req but no trans
- // b) the trans commit failed, dirop_error got called -> no req and no trans
- // c) the trans commit failed, dirop_error did not get called -> have req and trans
- // we either have a req (may or may not have trans), or we don't have a trans either
- // i.e. there is no situation where we don't have a req but do have a trans
-
- if (dirop->req)
- _dbfs_dirop_fail(dirop);
- else
- assert(!dirop->trans);
-
- } else {
- // shouldn't slip by, dirop_done should not get called directly. Once it does, it will handle both.
- assert(dirop->req);
- assert(dirop->trans);
- }
+ // just passthrough to dbfs_op
+ dbfs_op_release(req, ino, fi);
}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/dbfs/op_base.c Thu Oct 16 22:04:53 2008 +0300
@@ -0,0 +1,285 @@
+#include <stdlib.h>
+#include <assert.h>
+
+#include "op_base.h"
+#include "../lib/log.h"
+
+/*
+ * Free the op.
+ *
+ * The op must any oustanding request responded to first, must not be open, and must not have a transaction.
+ *
+ * The op will be free'd.
+ */
+static void dbfs_op_free (struct dbfs_op *op) {
+ assert(op);
+ assert(!op->open);
+ assert(!op->req);
+ assert(!op->trans);
+
+ // free_fn
+ if (op->free_fn)
+ op->free_fn(op);
+
+ // and then free the op
+ free(op);
+}
+
+void dbfs_op_fail (struct dbfs_op *op, int err) {
+ assert(op->req);
+
+ if (op->trans) {
+ // abort the trans
+ evsql_trans_abort(op->trans);
+
+ op->trans = NULL;
+ }
+
+ // send an error reply
+ if ((err = fuse_reply_err(op->req, err)))
+ // XXX: handle these failures /somehow/, or requests will hang and interrupts might handle invalid ops
+ EFATAL(err, "\tdbfs_op.fail %p:%p -> reply with fuse_reply_err", op, op->req);
+
+ // drop the req
+ op->req = NULL;
+
+ // is it open?
+ if (!op->open) {
+ // no, we can free it now and then forget about the whole thing
+ dbfs_op_free(op);
+
+ } else {
+ // we need to wait for release
+
+ }
+}
+
+/*
+ * The op_open transaction is ready for use.
+ */
+static void dbfs_op_ready (struct evsql_trans *trans, void *arg) {
+ struct dbfs_op *op = arg;
+
+ assert(trans == op->trans);
+ assert(op->req);
+ assert(!op->open);
+
+ INFO("\tdbfs_op.ready %p:%p -> trans=%p", op, op->req, trans);
+
+ // remember the transaction
+ op->trans = trans;
+
+ // ready
+ op->open_fn(op);
+
+ // good
+ return;
+}
+
+/*
+ * The op trans was committed, i.e. release has completed
+ */
+static void dbfs_op_done (struct evsql_trans *trans, void *arg) {
+ struct dbfs_op *op = arg;
+ int err;
+
+ assert(trans == op->trans);
+ assert(op->req);
+ assert(!op->open); // should not be considered as open anymore at this point, as errors should release
+
+ INFO("\tdbfs_op.done %p:%p -> OK", op, op->req);
+
+ // forget trans
+ op->trans = NULL;
+
+ // just reply
+ if ((err = fuse_reply_err(op->req, 0)))
+ // XXX: handle these failures /somehow/, or requests will hang and interrupts might handle invalid ops
+ EFATAL(err, "dbfs_op.done %p:%p -> reply with fuse_reply_err", op, op->req);
+
+ // req is done
+ op->req = NULL;
+
+ // then we can just free op
+ dbfs_op_free(op);
+}
+
+/*
+ * The op trans has failed, somehow, at some point, some where.
+ *
+ * This might happend during the open evsql_trans, during a read evsql_query, during the release
+ * evsql_trans_commit, or at any point in between.
+ *
+ * 1) loose the transaction
+ * 2) if op has a req, we handle failing it
+ */
+static void dbfs_op_error (struct evsql_trans *trans, void *arg) {
+ struct dbfs_op *op = arg;
+
+ // unless we fail
+ assert(trans == op->trans);
+
+ INFO("\tdbfs_op.error %p:%p -> evsql transaction error: %s", op, op->req, evsql_trans_error(trans));
+
+ // deassociate the trans
+ op->trans = NULL;
+
+ // if we were answering a req, error it out, and if the op isn't open, free
+ // if we didn't have a req outstanding, the op must be open, so we wouldn't free it in any case, and must wait
+ // for the next read/release to detect this and return an error reply
+ if (op->req)
+ dbfs_op_fail(op, EIO);
+ else
+ assert(op->open);
+}
+
+int dbfs_op_open (struct dbfs *ctx, struct dbfs_op *op, struct fuse_req *req, fuse_ino_t ino, struct fuse_file_info *fi, dbfs_op_free_cb free_fn, dbfs_op_open_cb open_fn) {
+ int err;
+
+ assert(op && req && ino && fi);
+ assert(!(op->req || op->ino));
+
+ // initialize the op
+ op->req = req;
+ op->ino = ino;
+ // copy *fi since it's on the stack
+ op->fi = *fi;
+ op->fi.fh = (uint64_t) op;
+ op->free_fn = free_fn;
+ op->open_fn = open_fn;
+
+ // start a new transaction
+ if ((op->trans = evsql_trans(ctx->db, EVSQL_TRANS_SERIALIZABLE, dbfs_op_error, dbfs_op_ready, dbfs_op_done, op)) == NULL)
+ SERROR(err = EIO);
+
+ // XXX: handle interrupts
+
+ // good
+ return 0;
+
+error:
+ // nothing of ours to cleanup
+ return err;
+}
+
+int dbfs_op_open_reply (struct dbfs_op *op) {
+ int err;
+
+ // detect earlier failures
+ if (!op->trans && (err = EIO))
+ ERROR("op trans has failed");
+
+ // send the openddir reply
+ if ((err = fuse_reply_open(op->req, &op->fi)))
+ EERROR(err, "fuse_reply_open");
+
+ // req is done
+ op->req = NULL;
+
+ // op is now open
+ op->open = 1;
+
+ // good
+ return 0;
+
+error:
+ return err;
+}
+
+struct dbfs_op *dbfs_op_req (struct fuse_req *req, fuse_ino_t ino, struct fuse_file_info *fi) {
+ struct dbfs_op *op = (struct dbfs_op *) fi->fh;
+ int err;
+
+ // validate
+ assert(op);
+ assert(!op->req);
+ assert(op->open);
+ assert(op->ino == ino);
+
+ // store the new req
+ op->req = req;
+
+ // detect earlier failures
+ if (!op->trans && (err = EIO))
+ ERROR("op trans has failed");
+
+ // good
+ return op;
+
+error:
+ dbfs_op_fail(op, err);
+
+ return NULL;
+}
+
+int dbfs_op_req_done (struct dbfs_op *op) {
+ // validate
+ assert(op);
+ assert(op->req);
+ assert(op->open);
+
+ // unassign the req
+ op->req = NULL;
+
+ // k
+ return 0;
+}
+
+void dbfs_op_release (struct fuse_req *req, fuse_ino_t ino, struct fuse_file_info *fi) {
+ struct dbfs_op *op = (struct dbfs_op *) fi->fh;
+ int err;
+
+ assert(op);
+ assert(!op->req);
+ assert(op->ino == ino);
+
+ // update to this req
+ op->req = req;
+
+ // fi is irrelevant, we don't touch the flags anyways
+ (void) fi;
+
+ // handle failed trans
+ if (!op->trans && (err = EIO))
+ ERROR("trans has failed");
+
+ // log
+ INFO("\tdbfs_op.release %p:%p : ino=%lu, fi=%p : trans=%p", op, req, ino, fi, op->trans);
+
+ // we must commit the transaction.
+ // Note that this might cause dbfs_op_error to be called, we can tell if that happaned by looking at op->req
+ // or op->trans - this means that we need to keep the op open when calling trans_commit, so that op_error
+ // doesn't free it out from underneath us.
+ if (evsql_trans_commit(op->trans))
+ SERROR(err = EIO);
+
+ // fall-through to cleanup
+ err = 0;
+
+error:
+ // the op is not open anymore and can be free'd next, because we either:
+ // a) already caught an error
+ // b) we get+send an error later on
+ // c) we get+send the done/no-error later on
+ op->open = 0;
+
+ // did the commit/pre-commit-checks fail?
+ if (err) {
+ // a) the trans failed earlier (read), so we have a req but no trans
+ // b) the trans commit failed, op_error got called -> no req and no trans
+ // c) the trans commit failed, op_error did not get called -> have req and trans
+ // we either have a req (may or may not have trans), or we don't have a trans either
+ // i.e. there is no situation where we don't have a req but do have a trans
+
+ if (op->req)
+ dbfs_op_fail(op, err);
+ else
+ assert(!op->trans);
+
+ } else {
+ // shouldn't slip by, op_done should not get called directly. Once it does, it will handle both.
+ assert(op->req);
+ assert(op->trans);
+ }
+}
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/dbfs/op_base.h Thu Oct 16 22:04:53 2008 +0300
@@ -0,0 +1,94 @@
+#ifndef DBFS_OP_BASE_H
+#define DBFS_OP_BASE_H
+
+#include "dbfs.h"
+
+// forward-declaration for callbacks
+struct dbfs_op;
+
+/*
+ * Called by dbfs_op_free to release any resources when the op is free'd (i.e. not open anymore).
+ */
+typedef void (*dbfs_op_free_cb) (struct dbfs_op *op_base);
+
+/*
+ * Called after the transaction has been opened, and before reply_open.
+ *
+ * You can do any at-open initialization here.
+ */
+typedef void (*dbfs_op_open_cb) (struct dbfs_op *op_base);
+
+// the base op state
+struct dbfs_op {
+ struct fuse_file_info fi;
+ struct fuse_req *req;
+
+ struct evsql_trans *trans;
+
+ // op target inode
+ uint32_t ino;
+
+ // open has returned and release hasn't been called yet
+ int open;
+
+ // callbacks
+ dbfs_op_free_cb free_fn;
+ dbfs_op_open_cb open_fn;
+};
+
+/*
+ * This will handle failures during requests.
+ *
+ * 1) if we have a trans, abort it
+ * 2) fail the req (mandatory) with the given err
+ *
+ * If the op is open, then we don't release it, but if it's not open, then the op will be free'd completely.
+ *
+ */
+void dbfs_op_fail (struct dbfs_op *op, int err);
+
+/*
+ * Open the op, that is, store all the initial state, and open a new transaction.
+ *
+ * The op must be pre-allocated and zero-initialized.
+ *
+ * This will always set op->req, so op is safe for dbfs_op_fail after this.
+ *
+ * This does not fail the dirop, handle error replies yourself.
+ *
+ * Returns zero on success, err on failure.
+ */
+int dbfs_op_open (struct dbfs *ctx, struct dbfs_op *op, struct fuse_req *req, fuse_ino_t ino, struct fuse_file_info *fi, dbfs_op_free_cb free_fn, dbfs_op_open_cb ready_fn);
+
+/*
+ * Should be called from open_fn to send the fuse_reply_open with fi and mark the op as open.
+ *
+ * If the op has failed earlier or fuse_reply_open fails, this will return nonzero. Fail the op yourself.
+ */
+int dbfs_op_open_reply (struct dbfs_op *op);
+
+/*
+ * Start handling a normal op requests.
+ *
+ * Lookup the op for the given fi, validate params, and assign the new req.
+ *
+ * In case the op failed previously, this will error the req and return NULL, indicating that the req has been handled.
+ */
+struct dbfs_op *dbfs_op_req (struct fuse_req *req, fuse_ino_t ino, struct fuse_file_info *fi);
+
+/*
+ * Done handling a request, adjust state accordingly.
+ *
+ * req *must* have been replied to.
+ */
+int dbfs_op_req_done (struct dbfs_op *op);
+
+/*
+ * Handle the op release.
+ *
+ * This will take care of committing the transaction, sending any reply/error, closing the op and freeing it.
+ */
+void dbfs_op_release (struct fuse_req *req, fuse_ino_t, struct fuse_file_info *fi);
+
+
+#endif /* DBFS_OP_BASE_H */
--- a/src/evfuse.c Wed Oct 15 01:14:22 2008 +0300
+++ b/src/evfuse.c Thu Oct 16 22:04:53 2008 +0300
@@ -47,7 +47,7 @@
ERROR("fuse_chan_recv failed: %s", strerror(-res));
if (res > 0) {
- INFO("[evfuse] got %d bytes from /dev/fuse", res);
+ DEBUG("got %d bytes from /dev/fuse", res);
// received a fuse_req, so process it
fuse_session_process(ctx->session, ctx->recv_buf, res, ch);
--- a/src/evsql.c Wed Oct 15 01:14:22 2008 +0300
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,1019 +0,0 @@
-#define _GNU_SOURCE
-#include <stdlib.h>
-#include <assert.h>
-#include <string.h>
-
-#include "evsql.h"
-#include "evsql_internal.h"
-#include "evpq.h"
-#include "lib/log.h"
-#include "lib/error.h"
-#include "lib/misc.h"
-
-/*
- * A couple function prototypes
- */
-static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn);
-
-/*
- * Actually execute the given query.
- *
- * The backend should be able to accept the query at this time.
- *
- * You should assume that if trying to execute a query fails, then the connection should also be considred as failed.
- */
-static int _evsql_query_exec (struct evsql_conn *conn, struct evsql_query *query, const char *command) {
- int err;
-
- switch (conn->evsql->type) {
- case EVSQL_EVPQ:
- // got params?
- if (query->params.count) {
- err = evpq_query_params(conn->engine.evpq, command,
- query->params.count,
- query->params.types,
- query->params.values,
- query->params.lengths,
- query->params.formats,
- query->params.result_format
- );
-
- } else {
- // plain 'ole query
- err = evpq_query(conn->engine.evpq, command);
- }
-
- if (err) {
- if (PQstatus(evpq_pgconn(conn->engine.evpq)) != CONNECTION_OK)
- WARNING("conn failed");
- else
- WARNING("query failed, dropping conn as well");
- }
-
- break;
-
- default:
- FATAL("evsql->type");
- }
-
- if (!err)
- // assign the query
- conn->query = query;
-
- return err;
-}
-
-/*
- * Free the query and related resources, doesn't trigger any callbacks or remove from any queues.
- *
- * The command should already be taken care of (NULL).
- */
-static void _evsql_query_free (struct evsql_query *query) {
- if (!query)
- return;
-
- assert(query->command == NULL);
-
- // free params if present
- free(query->params.types);
- free(query->params.values);
- free(query->params.lengths);
- free(query->params.formats);
-
- // free the query itself
- free(query);
-}
-
-/*
- * Execute the callback if res is given, and free the query.
- *
- * The query has been aborted, it will simply be freed
- */
-static void _evsql_query_done (struct evsql_query *query, const struct evsql_result_info *res) {
- if (res) {
- if (query->cb_fn)
- // call the callback
- query->cb_fn(res, query->cb_arg);
- else
- WARNING("supressing cb_fn because query was aborted");
- }
-
- // free
- _evsql_query_free(query);
-}
-
-/*
- * XXX:
- * /
-static void _evsql_destroy (struct evsql *evsql, const struct evsql_result_info *res) {
- struct evsql_query *query;
-
- // clear the queue
- while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) {
- _evsql_query_done(query, res);
-
- TAILQ_REMOVE(&evsql->query_queue, query, entry);
- }
-
- // free
- free(evsql);
-}
-*/
-
-/*
- * Free the transaction, it should already be deassociated from the query and conn.
- */
-static void _evsql_trans_free (struct evsql_trans *trans) {
- // ensure we don't leak anything
- assert(trans->query == NULL);
- assert(trans->conn == NULL);
-
- // free
- free(trans);
-}
-
-/*
- * Release a connection. It should already be deassociated from the trans and query.
- *
- * Releases the engine, removes from the conn_list and frees this.
- */
-static void _evsql_conn_release (struct evsql_conn *conn) {
- // ensure we don't leak anything
- assert(conn->trans == NULL);
- assert(conn->query == NULL);
-
- // release the engine
- switch (conn->evsql->type) {
- case EVSQL_EVPQ:
- evpq_release(conn->engine.evpq);
- break;
-
- default:
- FATAL("evsql->type");
- }
-
- // remove from list
- LIST_REMOVE(conn, entry);
-
- // catch deadlocks
- assert(!LIST_EMPTY(&conn->evsql->conn_list) || TAILQ_EMPTY(&conn->evsql->query_queue));
-
- // free
- free(conn);
-}
-
-/*
- * Release a transaction, it should already be deassociated from the query.
- *
- * Perform a two-way-deassociation with the conn, and then free the trans.
- */
-static void _evsql_trans_release (struct evsql_trans *trans) {
- assert(trans->query == NULL);
- assert(trans->conn != NULL);
-
- // deassociate the conn
- trans->conn->trans = NULL; trans->conn = NULL;
-
- // free the trans
- _evsql_trans_free(trans);
-}
-
-/*
- * Fail a single query, this will trigger the callback and free it.
- *
- * NOTE: Only for *TRANSACTIONLESS* queries.
- */
-static void _evsql_query_fail (struct evsql* evsql, struct evsql_query *query) {
- struct evsql_result_info res; ZINIT(res);
-
- // set up the result_info
- res.evsql = evsql;
- res.trans = NULL;
- res.error = 1;
-
- // finish off the query
- _evsql_query_done(query, &res);
-}
-
-/*
- * Fail a transaction, this will silently drop any query, trigger the error callback, two-way-deassociate/release the
- * conn, and then free the trans.
- */
-static void _evsql_trans_fail (struct evsql_trans *trans) {
- if (trans->query) {
- // free the query silently
- _evsql_query_free(trans->query); trans->query = NULL;
-
- // also deassociate it from the conn!
- trans->conn->query = NULL;
- }
-
- // tell the user
- // XXX: trans is in a bad state during this call
- if (trans->error_fn)
- trans->error_fn(trans, trans->cb_arg);
- else
- WARNING("supressing error because error_fn was NULL");
-
- // deassociate and release the conn
- trans->conn->trans = NULL; _evsql_conn_release(trans->conn); trans->conn = NULL;
-
- // pump the queue for requests that were waiting for this connection
- _evsql_pump(trans->evsql, NULL);
-
- // free the trans
- _evsql_trans_free(trans);
-}
-
-/*
- * Fail a connection. If the connection is transactional, this will just call _evsql_trans_fail, but otherwise it will
- * fail any ongoing query, and then release the connection.
- */
-static void _evsql_conn_fail (struct evsql_conn *conn) {
- if (conn->trans) {
- // let transactions handle their connection failures
- _evsql_trans_fail(conn->trans);
-
- } else {
- if (conn->query) {
- // fail the in-progress query
- _evsql_query_fail(conn->evsql, conn->query); conn->query = NULL;
- }
-
- // finish off the whole connection
- _evsql_conn_release(conn);
- }
-}
-
-/*
- * Processes enqueued non-transactional queries until the queue is empty, or we managed to exec a query.
- *
- * If execing a query on a connection fails, both the query and the connection are failed (in that order).
- *
- * Any further queries will then also be failed, because there's no reconnection/retry logic yet.
- *
- * This means that if conn is NULL, all queries are failed.
- */
-static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn) {
- struct evsql_query *query;
- int err;
-
- // look for waiting queries
- while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) {
- // dequeue
- TAILQ_REMOVE(&evsql->query_queue, query, entry);
-
- if (conn) {
- // try and execute it
- err = _evsql_query_exec(conn, query, query->command);
- }
-
- // free the command buf
- free(query->command); query->command = NULL;
-
- if (err || !conn) {
- if (!conn) {
- // warn when dropping queries
- WARNING("failing query becuse there are no conns");
- }
-
- // fail the query
- _evsql_query_fail(evsql, query);
-
- if (conn) {
- // fail the connection
- WARNING("failing the connection because a query-exec failed");
-
- _evsql_conn_fail(conn); conn = NULL;
- }
-
- } else {
- // we have succesfully enqueued a query, and we can wait for this connection to complete
- break;
-
- }
-
- // handle the rest of the queue
- }
-
- // ok
- return;
-}
-
-/*
- * Callback for a trans's 'BEGIN' query, which means the transaction is now ready for use.
- */
-static void _evsql_trans_ready (const struct evsql_result_info *res, void *arg) {
- (void) arg;
-
- assert(res->trans);
-
- // check for errors
- if (res->error)
- ERROR("transaction 'BEGIN' failed: %s", evsql_result_error(res));
-
- // transaction is now ready for use
- res->trans->ready_fn(res->trans, res->trans->cb_arg);
-
- // good
- return;
-
-error:
- _evsql_trans_fail(res->trans);
-}
-
-/*
- * The transaction's connection is ready, send the 'BEGIN' query.
- *
- * If anything fails, calls _evsql_trans_fail and returns nonzero, zero on success
- */
-static int _evsql_trans_conn_ready (struct evsql *evsql, struct evsql_trans *trans) {
- char trans_sql[EVSQL_QUERY_BEGIN_BUF];
- const char *isolation_level;
- int ret;
-
- // determine the isolation_level to use
- switch (trans->type) {
- case EVSQL_TRANS_DEFAULT:
- isolation_level = NULL; break;
-
- case EVSQL_TRANS_SERIALIZABLE:
- isolation_level = "SERIALIZABLE"; break;
-
- case EVSQL_TRANS_REPEATABLE_READ:
- isolation_level = "REPEATABLE READ"; break;
-
- case EVSQL_TRANS_READ_COMMITTED:
- isolation_level = "READ COMMITTED"; break;
-
- case EVSQL_TRANS_READ_UNCOMMITTED:
- isolation_level = "READ UNCOMMITTED"; break;
-
- default:
- FATAL("trans->type: %d", trans->type);
- }
-
- // build the trans_sql
- if (isolation_level)
- ret = snprintf(trans_sql, EVSQL_QUERY_BEGIN_BUF, "BEGIN TRANSACTION ISOLATION LEVEL %s", isolation_level);
- else
- ret = snprintf(trans_sql, EVSQL_QUERY_BEGIN_BUF, "BEGIN TRANSACTION");
-
- // make sure it wasn't truncated
- if (ret >= EVSQL_QUERY_BEGIN_BUF)
- ERROR("trans_sql overflow: %d >= %d", ret, EVSQL_QUERY_BEGIN_BUF);
-
- // execute the query
- if (evsql_query(evsql, trans, trans_sql, _evsql_trans_ready, NULL) == NULL)
- ERROR("evsql_query");
-
- // success
- return 0;
-
-error:
- // fail the transaction
- _evsql_trans_fail(trans);
-
- return -1;
-}
-
-/*
- * The evpq connection was succesfully established.
- */
-static void _evsql_evpq_connected (struct evpq_conn *_conn, void *arg) {
- struct evsql_conn *conn = arg;
-
- if (conn->trans)
- // notify the transaction
- // don't care about errors
- (void) _evsql_trans_conn_ready(conn->evsql, conn->trans);
-
- else
- // pump any waiting transactionless queries
- _evsql_pump(conn->evsql, conn);
-}
-
-/*
- * Got one result on this evpq connection.
- */
-static void _evsql_evpq_result (struct evpq_conn *_conn, PGresult *result, void *arg) {
- struct evsql_conn *conn = arg;
- struct evsql_query *query = conn->query;
-
- assert(query != NULL);
-
- // if we get multiple results, only return the first one
- if (query->result.evpq) {
- WARNING("[evsql] evpq query returned multiple results, discarding previous one");
-
- PQclear(query->result.evpq); query->result.evpq = NULL;
- }
-
- // remember the result
- query->result.evpq = result;
-}
-
-/*
- * No more results for this query.
- */
-static void _evsql_evpq_done (struct evpq_conn *_conn, void *arg) {
- struct evsql_conn *conn = arg;
- struct evsql_query *query = conn->query;
- struct evsql_result_info res; ZINIT(res);
-
- assert(query != NULL);
-
- // set up the result_info
- res.evsql = conn->evsql;
- res.trans = conn->trans;
-
- if (query->result.evpq == NULL) {
- // if a query didn't return any results (bug?), warn and fail the query
- WARNING("[evsql] evpq query didn't return any results");
-
- res.error = 1;
-
- } else if (strcmp(PQresultErrorMessage(query->result.evpq), "") != 0) {
- // the query failed with some error
- res.error = 1;
- res.result.pq = query->result.evpq;
-
- } else {
- res.error = 0;
- res.result.pq = query->result.evpq;
-
- }
-
- // de-associate the query from the connection
- conn->query = NULL;
-
- // how we handle query completion depends on if we're a transaction or not
- if (conn->trans) {
- // we can deassign the trans's query
- conn->trans->query = NULL;
-
- // was an abort?
- if (!query->cb_fn)
- // notify the user that the transaction query has been aborted
- conn->trans->ready_fn(conn->trans, conn->trans->cb_arg);
-
- // then hand the query to the user
- _evsql_query_done(query, &res);
-
- } else {
- // a transactionless query, so just finish it off and pump any other waiting ones
- _evsql_query_done(query, &res);
-
- // pump the next one
- _evsql_pump(conn->evsql, conn);
- }
-}
-
-/*
- * The connection failed.
- */
-static void _evsql_evpq_failure (struct evpq_conn *_conn, void *arg) {
- struct evsql_conn *conn = arg;
-
- // just fail the conn
- _evsql_conn_fail(conn);
-}
-
-/*
- * Our evpq behaviour
- */
-static struct evpq_callback_info _evsql_evpq_cb_info = {
- .fn_connected = _evsql_evpq_connected,
- .fn_result = _evsql_evpq_result,
- .fn_done = _evsql_evpq_done,
- .fn_failure = _evsql_evpq_failure,
-};
-
-/*
- * Allocate the generic evsql context.
- */
-static struct evsql *_evsql_new_base (struct event_base *ev_base, evsql_error_cb error_fn, void *cb_arg) {
- struct evsql *evsql = NULL;
-
- // allocate it
- if ((evsql = calloc(1, sizeof(*evsql))) == NULL)
- ERROR("calloc");
-
- // store
- evsql->ev_base = ev_base;
- evsql->error_fn = error_fn;
- evsql->cb_arg = cb_arg;
-
- // init
- LIST_INIT(&evsql->conn_list);
- TAILQ_INIT(&evsql->query_queue);
-
- // done
- return evsql;
-
-error:
- return NULL;
-}
-
-/*
- * Start a new connection and add it to the list, it won't be ready until _evsql_evpq_connected is called
- */
-static struct evsql_conn *_evsql_conn_new (struct evsql *evsql) {
- struct evsql_conn *conn = NULL;
-
- // allocate
- if ((conn = calloc(1, sizeof(*conn))) == NULL)
- ERROR("calloc");
-
- // init
- conn->evsql = evsql;
-
- // connect the engine
- switch (evsql->type) {
- case EVSQL_EVPQ:
- if ((conn->engine.evpq = evpq_connect(evsql->ev_base, evsql->engine_conf.evpq, _evsql_evpq_cb_info, conn)) == NULL)
- goto error;
-
- break;
-
- default:
- FATAL("evsql->type");
- }
-
- // add it to the list
- LIST_INSERT_HEAD(&evsql->conn_list, conn, entry);
-
- // success
- return conn;
-
-error:
- free(conn);
-
- return NULL;
-}
-
-struct evsql *evsql_new_pq (struct event_base *ev_base, const char *pq_conninfo, evsql_error_cb error_fn, void *cb_arg) {
- struct evsql *evsql = NULL;
-
- // base init
- if ((evsql = _evsql_new_base (ev_base, error_fn, cb_arg)) == NULL)
- goto error;
-
- // store conf
- evsql->engine_conf.evpq = pq_conninfo;
-
- // pre-create one connection
- if (_evsql_conn_new(evsql) == NULL)
- goto error;
-
- // done
- return evsql;
-
-error:
- // XXX: more complicated than this?
- free(evsql);
-
- return NULL;
-}
-
-/*
- * Checks if the connection is already allocated for some other trans/query.
- *
- * Returns:
- * 0 connection idle, can be allocated
- * >1 connection busy
- */
-static int _evsql_conn_busy (struct evsql_conn *conn) {
- // transactions get the connection to themselves
- if (conn->trans)
- return 1;
-
- // if it has a query assigned, it's busy
- if (conn->query)
- return 1;
-
- // otherwise, it's all idle
- return 0;
-}
-
-/*
- * Checks if the connection is ready for use (i.e. _evsql_evpq_connected was called).
- *
- * The connection should not already have a query running.
- *
- * Returns
- * <0 the connection is not valid (failed, query in progress)
- * 0 the connection is still pending, and will become ready at some point
- * >0 it's ready
- */
-static int _evsql_conn_ready (struct evsql_conn *conn) {
- switch (conn->evsql->type) {
- case EVSQL_EVPQ: {
- enum evpq_state state = evpq_state(conn->engine.evpq);
-
- switch (state) {
- case EVPQ_CONNECT:
- return 0;
-
- case EVPQ_CONNECTED:
- return 1;
-
- case EVPQ_QUERY:
- case EVPQ_INIT:
- case EVPQ_FAILURE:
- return -1;
-
- default:
- FATAL("evpq_state: %d", state);
- }
-
- }
-
- default:
- FATAL("evsql->type: %d", conn->evsql->type);
- }
-}
-
-/*
- * Allocate a connection for use and return it via *conn_ptr, or if may_queue is nonzero and the connection pool is
- * getting full, return NULL (query should be queued).
- *
- * Note that the returned connection might not be ready for use yet (if we created a new one, see _evsql_conn_ready).
- *
- * Returns zero if a connection was found or the request should be queued, or nonzero if something failed and the
- * request should be dropped.
- */
-static int _evsql_conn_get (struct evsql *evsql, struct evsql_conn **conn_ptr, int may_queue) {
- int have_nontrans = 0;
- *conn_ptr = NULL;
-
- // find a connection that isn't busy and is ready (unless the query queue is empty).
- LIST_FOREACH(*conn_ptr, &evsql->conn_list, entry) {
- // we can only have a query enqueue itself if there is a non-trans conn it can later use
- if (!(*conn_ptr)->trans)
- have_nontrans = 1;
-
- // skip busy conns always
- if (_evsql_conn_busy(*conn_ptr))
- continue;
-
- // accept pending conns as long as there are NO enqueued queries (might cause deadlock otherwise)
- if (_evsql_conn_ready(*conn_ptr) == 0 && TAILQ_EMPTY(&evsql->query_queue))
- break;
-
- // accept conns that are in a fully ready state
- if (_evsql_conn_ready(*conn_ptr) > 0)
- break;
- }
-
- // if we found an idle connection, we can just return that right away
- if (*conn_ptr)
- return 0;
-
- // return NULL if may_queue and we have a non-trans conn that we can, at some point, use
- if (may_queue && have_nontrans)
- return 0;
-
- // we need to open a new connection
- if ((*conn_ptr = _evsql_conn_new(evsql)) == NULL)
- goto error;
-
- // good
- return 0;
-error:
- return -1;
-}
-
-struct evsql_trans *evsql_trans (struct evsql *evsql, enum evsql_trans_type type, evsql_trans_error_cb error_fn, evsql_trans_ready_cb ready_fn, evsql_trans_done_cb done_fn, void *cb_arg) {
- struct evsql_trans *trans = NULL;
-
- // allocate it
- if ((trans = calloc(1, sizeof(*trans))) == NULL)
- ERROR("calloc");
-
- // store
- trans->evsql = evsql;
- trans->ready_fn = ready_fn;
- trans->done_fn = done_fn;
- trans->cb_arg = cb_arg;
- trans->type = type;
-
- // find a connection
- if (_evsql_conn_get(evsql, &trans->conn, 0))
- ERROR("_evsql_conn_get");
-
- // associate the conn
- trans->conn->trans = trans;
-
- // is it already ready?
- if (_evsql_conn_ready(trans->conn) > 0) {
- // call _evsql_trans_conn_ready directly, it will handle cleanup (silently, !error_fn)
- if (_evsql_trans_conn_ready(evsql, trans)) {
- // return NULL directly
- return NULL;
- }
-
- } else {
- // otherwise, wait for the conn to be ready
-
- }
-
- // and let it pass errors to the user
- trans->error_fn = error_fn;
-
- // ok
- return trans;
-
-error:
- free(trans);
-
- return NULL;
-}
-
-/*
- * Validate and allocate the basic stuff for a new query.
- */
-static struct evsql_query *_evsql_query_new (struct evsql *evsql, struct evsql_trans *trans, evsql_query_cb query_fn, void *cb_arg) {
- struct evsql_query *query = NULL;
-
- // if it's part of a trans, then make sure the trans is idle
- if (trans && trans->query)
- ERROR("transaction is busy");
-
- // allocate it
- if ((query = calloc(1, sizeof(*query))) == NULL)
- ERROR("calloc");
-
- // store
- query->cb_fn = query_fn;
- query->cb_arg = cb_arg;
-
- // success
- return query;
-
-error:
- return NULL;
-}
-
-/*
- * Handle a new query.
- *
- * For transactions this will associate the query and then execute it, otherwise this will either find an idle
- * connection and send the query, or enqueue it.
- */
-static int _evsql_query_enqueue (struct evsql *evsql, struct evsql_trans *trans, struct evsql_query *query, const char *command) {
- // transaction queries are handled differently
- if (trans) {
- // it's an in-transaction query
- assert(trans->query == NULL);
-
- // assign the query
- trans->query = query;
-
- // execute directly
- if (_evsql_query_exec(trans->conn, query, command)) {
- // ack, fail the transaction
- _evsql_trans_fail(trans);
-
- // caller frees query
- goto error;
- }
-
- } else {
- struct evsql_conn *conn;
-
- // find an idle connection
- if ((_evsql_conn_get(evsql, &conn, 1)))
- ERROR("couldn't allocate a connection for the query");
-
- // we must enqueue if no idle conn or the conn is not yet ready
- if (conn && _evsql_conn_ready(conn) > 0) {
- // execute directly
- if (_evsql_query_exec(conn, query, command)) {
- // ack, fail the connection
- _evsql_conn_fail(conn);
-
- // make sure we don't deadlock any queries, but if this query got a conn directly, then we shouldn't
- // have any queries enqueued anyways
- assert(TAILQ_EMPTY(&evsql->query_queue));
-
- // caller frees query
- goto error;
- }
-
- } else {
- // copy the command for later execution
- if ((query->command = strdup(command)) == NULL)
- ERROR("strdup");
-
- // enqueue until some connection pumps the queue
- TAILQ_INSERT_TAIL(&evsql->query_queue, query, entry);
- }
- }
-
- // ok, good
- return 0;
-
-error:
- return -1;
-}
-
-struct evsql_query *evsql_query (struct evsql *evsql, struct evsql_trans *trans, const char *command, evsql_query_cb query_fn, void *cb_arg) {
- struct evsql_query *query = NULL;
-
- // alloc new query
- if ((query = _evsql_query_new(evsql, trans, query_fn, cb_arg)) == NULL)
- goto error;
-
- // just execute the command string directly
- if (_evsql_query_enqueue(evsql, trans, query, command))
- goto error;
-
- // ok
- return query;
-
-error:
- _evsql_query_free(query);
-
- return NULL;
-}
-
-struct evsql_query *evsql_query_params (struct evsql *evsql, struct evsql_trans *trans, const char *command, const struct evsql_query_params *params, evsql_query_cb query_fn, void *cb_arg) {
- struct evsql_query *query = NULL;
- const struct evsql_query_param *param;
- int idx;
-
- // alloc new query
- if ((query = _evsql_query_new(evsql, trans, query_fn, cb_arg)) == NULL)
- goto error;
-
- // count the params
- for (param = params->list; param->type; param++)
- query->params.count++;
-
- // allocate the vertical storage for the parameters
- if (0
-
-// !(query->params.types = calloc(query->params.count, sizeof(Oid)))
- || !(query->params.values = calloc(query->params.count, sizeof(char *)))
- || !(query->params.lengths = calloc(query->params.count, sizeof(int)))
- || !(query->params.formats = calloc(query->params.count, sizeof(int)))
- )
- ERROR("calloc");
-
- // transform
- for (param = params->list, idx = 0; param->type; param++, idx++) {
- // `types` stays NULL
- // query->params.types[idx] = 0;
-
- // values
- query->params.values[idx] = param->data_raw;
-
- // lengths
- query->params.lengths[idx] = param->length;
-
- // formats, binary if length is nonzero
- query->params.formats[idx] = param->length ? 1 : 0;
- }
-
- // result format
- switch (params->result_fmt) {
- case EVSQL_FMT_TEXT:
- query->params.result_format = 0; break;
-
- case EVSQL_FMT_BINARY:
- query->params.result_format = 1; break;
-
- default:
- FATAL("params.result_fmt: %d", params->result_fmt);
- }
-
- // execute it
- if (_evsql_query_enqueue(evsql, trans, query, command))
- goto error;
-
- // ok
- return query;
-
-error:
- _evsql_query_free(query);
-
- return NULL;
-}
-
-void evsql_query_abort (struct evsql_trans *trans, struct evsql_query *query) {
- assert(query);
-
- if (trans) {
- // must be the right query
- assert(trans->query == query);
- }
-
- // just strip the callback and wait for it to complete as normal
- query->cb_fn = NULL;
-}
-
-void _evsql_trans_commit_res (const struct evsql_result_info *res, void *arg) {
- (void) arg;
-
- assert(res->trans);
-
- // check for errors
- if (res->error)
- ERROR("transaction 'COMMIT' failed: %s", evsql_result_error(res));
-
- // transaction is now done
- res->trans->done_fn(res->trans, res->trans->cb_arg);
-
- // release it
- _evsql_trans_release(res->trans);
-
- // success
- return;
-
-error:
- _evsql_trans_fail(res->trans);
-}
-
-int evsql_trans_commit (struct evsql_trans *trans) {
- static const char *sql = "COMMIT TRANSACTION";
-
- if (trans->query)
- ERROR("cannot COMMIT because transaction is still busy");
-
- // query
- if (evsql_query(trans->evsql, trans, sql, _evsql_trans_commit_res, NULL) == NULL)
- goto error;
-
- // mark it as commited in case someone wants to abort it
- trans->has_commit = 1;
-
- // success
- return 0;
-
-error:
- return -1;
-}
-
-void _evsql_trans_rollback_res (const struct evsql_result_info *res, void *arg) {
- (void) arg;
-
- assert(res->trans);
-
- // fail the connection on errors
- if (res->error)
- ERROR("transaction 'ROLLBACK' failed: %s", evsql_result_error(res));
-
- // release it
- _evsql_trans_release(res->trans);
-
- // success
- return;
-
-error:
- // fail the connection too, errors are supressed
- _evsql_trans_fail(res->trans);
-}
-
-/*
- * Used as the ready_fn callback in case of abort, otherwise directly
- */
-void _evsql_trans_rollback (struct evsql_trans *trans, void *unused) {
- static const char *sql = "ROLLBACK TRANSACTION";
-
- (void) unused;
-
- // query
- if (evsql_query(trans->evsql, trans, sql, _evsql_trans_rollback_res, NULL) == NULL) {
- // fail the transaction/connection
- _evsql_trans_fail(trans);
- }
-
-}
-
-void evsql_trans_abort (struct evsql_trans *trans) {
- // supress errors
- trans->error_fn = NULL;
-
- if (trans->has_commit) {
- // abort after commit doesn't make sense
- FATAL("transaction was already commited");
- }
-
- if (trans->query) {
- // gah, some query is running
- WARNING("aborting pending query");
-
- // prepare to rollback once complete
- trans->ready_fn = _evsql_trans_rollback;
-
- // abort
- evsql_query_abort(trans, trans->query);
-
- } else {
- // just rollback directly
- _evsql_trans_rollback(trans, NULL);
-
- }
-}
-
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/evsql/evsql.c Thu Oct 16 22:04:53 2008 +0300
@@ -0,0 +1,1017 @@
+#define _GNU_SOURCE
+#include <stdlib.h>
+#include <assert.h>
+#include <string.h>
+
+#include "evsql.h"
+#include "../lib/log.h"
+#include "../lib/error.h"
+#include "../lib/misc.h"
+
+/*
+ * A couple function prototypes
+ */
+static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn);
+
+/*
+ * Actually execute the given query.
+ *
+ * The backend should be able to accept the query at this time.
+ *
+ * You should assume that if trying to execute a query fails, then the connection should also be considred as failed.
+ */
+static int _evsql_query_exec (struct evsql_conn *conn, struct evsql_query *query, const char *command) {
+ int err;
+
+ switch (conn->evsql->type) {
+ case EVSQL_EVPQ:
+ // got params?
+ if (query->params.count) {
+ err = evpq_query_params(conn->engine.evpq, command,
+ query->params.count,
+ query->params.types,
+ query->params.values,
+ query->params.lengths,
+ query->params.formats,
+ query->params.result_format
+ );
+
+ } else {
+ // plain 'ole query
+ err = evpq_query(conn->engine.evpq, command);
+ }
+
+ if (err) {
+ if (PQstatus(evpq_pgconn(conn->engine.evpq)) != CONNECTION_OK)
+ WARNING("conn failed");
+ else
+ WARNING("query failed, dropping conn as well");
+ }
+
+ break;
+
+ default:
+ FATAL("evsql->type");
+ }
+
+ if (!err)
+ // assign the query
+ conn->query = query;
+
+ return err;
+}
+
+/*
+ * Free the query and related resources, doesn't trigger any callbacks or remove from any queues.
+ *
+ * The command should already be taken care of (NULL).
+ */
+static void _evsql_query_free (struct evsql_query *query) {
+ if (!query)
+ return;
+
+ assert(query->command == NULL);
+
+ // free params if present
+ free(query->params.types);
+ free(query->params.values);
+ free(query->params.lengths);
+ free(query->params.formats);
+
+ // free the query itself
+ free(query);
+}
+
+/*
+ * Execute the callback if res is given, and free the query.
+ *
+ * The query has been aborted, it will simply be freed
+ */
+static void _evsql_query_done (struct evsql_query *query, const struct evsql_result_info *res) {
+ if (res) {
+ if (query->cb_fn)
+ // call the callback
+ query->cb_fn(res, query->cb_arg);
+ else
+ WARNING("supressing cb_fn because query was aborted");
+ }
+
+ // free
+ _evsql_query_free(query);
+}
+
+/*
+ * XXX:
+ * /
+static void _evsql_destroy (struct evsql *evsql, const struct evsql_result_info *res) {
+ struct evsql_query *query;
+
+ // clear the queue
+ while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) {
+ _evsql_query_done(query, res);
+
+ TAILQ_REMOVE(&evsql->query_queue, query, entry);
+ }
+
+ // free
+ free(evsql);
+}
+*/
+
+/*
+ * Free the transaction, it should already be deassociated from the query and conn.
+ */
+static void _evsql_trans_free (struct evsql_trans *trans) {
+ // ensure we don't leak anything
+ assert(trans->query == NULL);
+ assert(trans->conn == NULL);
+
+ // free
+ free(trans);
+}
+
+/*
+ * Release a connection. It should already be deassociated from the trans and query.
+ *
+ * Releases the engine, removes from the conn_list and frees this.
+ */
+static void _evsql_conn_release (struct evsql_conn *conn) {
+ // ensure we don't leak anything
+ assert(conn->trans == NULL);
+ assert(conn->query == NULL);
+
+ // release the engine
+ switch (conn->evsql->type) {
+ case EVSQL_EVPQ:
+ evpq_release(conn->engine.evpq);
+ break;
+
+ default:
+ FATAL("evsql->type");
+ }
+
+ // remove from list
+ LIST_REMOVE(conn, entry);
+
+ // catch deadlocks
+ assert(!LIST_EMPTY(&conn->evsql->conn_list) || TAILQ_EMPTY(&conn->evsql->query_queue));
+
+ // free
+ free(conn);
+}
+
+/*
+ * Release a transaction, it should already be deassociated from the query.
+ *
+ * Perform a two-way-deassociation with the conn, and then free the trans.
+ */
+static void _evsql_trans_release (struct evsql_trans *trans) {
+ assert(trans->query == NULL);
+ assert(trans->conn != NULL);
+
+ // deassociate the conn
+ trans->conn->trans = NULL; trans->conn = NULL;
+
+ // free the trans
+ _evsql_trans_free(trans);
+}
+
+/*
+ * Fail a single query, this will trigger the callback and free it.
+ *
+ * NOTE: Only for *TRANSACTIONLESS* queries.
+ */
+static void _evsql_query_fail (struct evsql* evsql, struct evsql_query *query) {
+ struct evsql_result_info res; ZINIT(res);
+
+ // set up the result_info
+ res.evsql = evsql;
+ res.trans = NULL;
+ res.error = 1;
+
+ // finish off the query
+ _evsql_query_done(query, &res);
+}
+
+/*
+ * Fail a transaction, this will silently drop any query, trigger the error callback, two-way-deassociate/release the
+ * conn, and then free the trans.
+ */
+static void _evsql_trans_fail (struct evsql_trans *trans) {
+ if (trans->query) {
+ // free the query silently
+ _evsql_query_free(trans->query); trans->query = NULL;
+
+ // also deassociate it from the conn!
+ trans->conn->query = NULL;
+ }
+
+ // tell the user
+ // XXX: trans is in a bad state during this call
+ if (trans->error_fn)
+ trans->error_fn(trans, trans->cb_arg);
+ else
+ WARNING("supressing error because error_fn was NULL");
+
+ // deassociate and release the conn
+ trans->conn->trans = NULL; _evsql_conn_release(trans->conn); trans->conn = NULL;
+
+ // pump the queue for requests that were waiting for this connection
+ _evsql_pump(trans->evsql, NULL);
+
+ // free the trans
+ _evsql_trans_free(trans);
+}
+
+/*
+ * Fail a connection. If the connection is transactional, this will just call _evsql_trans_fail, but otherwise it will
+ * fail any ongoing query, and then release the connection.
+ */
+static void _evsql_conn_fail (struct evsql_conn *conn) {
+ if (conn->trans) {
+ // let transactions handle their connection failures
+ _evsql_trans_fail(conn->trans);
+
+ } else {
+ if (conn->query) {
+ // fail the in-progress query
+ _evsql_query_fail(conn->evsql, conn->query); conn->query = NULL;
+ }
+
+ // finish off the whole connection
+ _evsql_conn_release(conn);
+ }
+}
+
+/*
+ * Processes enqueued non-transactional queries until the queue is empty, or we managed to exec a query.
+ *
+ * If execing a query on a connection fails, both the query and the connection are failed (in that order).
+ *
+ * Any further queries will then also be failed, because there's no reconnection/retry logic yet.
+ *
+ * This means that if conn is NULL, all queries are failed.
+ */
+static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn) {
+ struct evsql_query *query;
+ int err;
+
+ // look for waiting queries
+ while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) {
+ // dequeue
+ TAILQ_REMOVE(&evsql->query_queue, query, entry);
+
+ if (conn) {
+ // try and execute it
+ err = _evsql_query_exec(conn, query, query->command);
+ }
+
+ // free the command buf
+ free(query->command); query->command = NULL;
+
+ if (err || !conn) {
+ if (!conn) {
+ // warn when dropping queries
+ WARNING("failing query becuse there are no conns");
+ }
+
+ // fail the query
+ _evsql_query_fail(evsql, query);
+
+ if (conn) {
+ // fail the connection
+ WARNING("failing the connection because a query-exec failed");
+
+ _evsql_conn_fail(conn); conn = NULL;
+ }
+
+ } else {
+ // we have succesfully enqueued a query, and we can wait for this connection to complete
+ break;
+
+ }
+
+ // handle the rest of the queue
+ }
+
+ // ok
+ return;
+}
+
+/*
+ * Callback for a trans's 'BEGIN' query, which means the transaction is now ready for use.
+ */
+static void _evsql_trans_ready (const struct evsql_result_info *res, void *arg) {
+ (void) arg;
+
+ assert(res->trans);
+
+ // check for errors
+ if (res->error)
+ ERROR("transaction 'BEGIN' failed: %s", evsql_result_error(res));
+
+ // transaction is now ready for use
+ res->trans->ready_fn(res->trans, res->trans->cb_arg);
+
+ // good
+ return;
+
+error:
+ _evsql_trans_fail(res->trans);
+}
+
+/*
+ * The transaction's connection is ready, send the 'BEGIN' query.
+ *
+ * If anything fails, calls _evsql_trans_fail and returns nonzero, zero on success
+ */
+static int _evsql_trans_conn_ready (struct evsql *evsql, struct evsql_trans *trans) {
+ char trans_sql[EVSQL_QUERY_BEGIN_BUF];
+ const char *isolation_level;
+ int ret;
+
+ // determine the isolation_level to use
+ switch (trans->type) {
+ case EVSQL_TRANS_DEFAULT:
+ isolation_level = NULL; break;
+
+ case EVSQL_TRANS_SERIALIZABLE:
+ isolation_level = "SERIALIZABLE"; break;
+
+ case EVSQL_TRANS_REPEATABLE_READ:
+ isolation_level = "REPEATABLE READ"; break;
+
+ case EVSQL_TRANS_READ_COMMITTED:
+ isolation_level = "READ COMMITTED"; break;
+
+ case EVSQL_TRANS_READ_UNCOMMITTED:
+ isolation_level = "READ UNCOMMITTED"; break;
+
+ default:
+ FATAL("trans->type: %d", trans->type);
+ }
+
+ // build the trans_sql
+ if (isolation_level)
+ ret = snprintf(trans_sql, EVSQL_QUERY_BEGIN_BUF, "BEGIN TRANSACTION ISOLATION LEVEL %s", isolation_level);
+ else
+ ret = snprintf(trans_sql, EVSQL_QUERY_BEGIN_BUF, "BEGIN TRANSACTION");
+
+ // make sure it wasn't truncated
+ if (ret >= EVSQL_QUERY_BEGIN_BUF)
+ ERROR("trans_sql overflow: %d >= %d", ret, EVSQL_QUERY_BEGIN_BUF);
+
+ // execute the query
+ if (evsql_query(evsql, trans, trans_sql, _evsql_trans_ready, NULL) == NULL)
+ ERROR("evsql_query");
+
+ // success
+ return 0;
+
+error:
+ // fail the transaction
+ _evsql_trans_fail(trans);
+
+ return -1;
+}
+
+/*
+ * The evpq connection was succesfully established.
+ */
+static void _evsql_evpq_connected (struct evpq_conn *_conn, void *arg) {
+ struct evsql_conn *conn = arg;
+
+ if (conn->trans)
+ // notify the transaction
+ // don't care about errors
+ (void) _evsql_trans_conn_ready(conn->evsql, conn->trans);
+
+ else
+ // pump any waiting transactionless queries
+ _evsql_pump(conn->evsql, conn);
+}
+
+/*
+ * Got one result on this evpq connection.
+ */
+static void _evsql_evpq_result (struct evpq_conn *_conn, PGresult *result, void *arg) {
+ struct evsql_conn *conn = arg;
+ struct evsql_query *query = conn->query;
+
+ assert(query != NULL);
+
+ // if we get multiple results, only return the first one
+ if (query->result.evpq) {
+ WARNING("[evsql] evpq query returned multiple results, discarding previous one");
+
+ PQclear(query->result.evpq); query->result.evpq = NULL;
+ }
+
+ // remember the result
+ query->result.evpq = result;
+}
+
+/*
+ * No more results for this query.
+ */
+static void _evsql_evpq_done (struct evpq_conn *_conn, void *arg) {
+ struct evsql_conn *conn = arg;
+ struct evsql_query *query = conn->query;
+ struct evsql_result_info res; ZINIT(res);
+
+ assert(query != NULL);
+
+ // set up the result_info
+ res.evsql = conn->evsql;
+ res.trans = conn->trans;
+
+ if (query->result.evpq == NULL) {
+ // if a query didn't return any results (bug?), warn and fail the query
+ WARNING("[evsql] evpq query didn't return any results");
+
+ res.error = 1;
+
+ } else if (strcmp(PQresultErrorMessage(query->result.evpq), "") != 0) {
+ // the query failed with some error
+ res.error = 1;
+ res.result.pq = query->result.evpq;
+
+ } else {
+ res.error = 0;
+ res.result.pq = query->result.evpq;
+
+ }
+
+ // de-associate the query from the connection
+ conn->query = NULL;
+
+ // how we handle query completion depends on if we're a transaction or not
+ if (conn->trans) {
+ // we can deassign the trans's query
+ conn->trans->query = NULL;
+
+ // was an abort?
+ if (!query->cb_fn)
+ // notify the user that the transaction query has been aborted
+ conn->trans->ready_fn(conn->trans, conn->trans->cb_arg);
+
+ // then hand the query to the user
+ _evsql_query_done(query, &res);
+
+ } else {
+ // a transactionless query, so just finish it off and pump any other waiting ones
+ _evsql_query_done(query, &res);
+
+ // pump the next one
+ _evsql_pump(conn->evsql, conn);
+ }
+}
+
+/*
+ * The connection failed.
+ */
+static void _evsql_evpq_failure (struct evpq_conn *_conn, void *arg) {
+ struct evsql_conn *conn = arg;
+
+ // just fail the conn
+ _evsql_conn_fail(conn);
+}
+
+/*
+ * Our evpq behaviour
+ */
+static struct evpq_callback_info _evsql_evpq_cb_info = {
+ .fn_connected = _evsql_evpq_connected,
+ .fn_result = _evsql_evpq_result,
+ .fn_done = _evsql_evpq_done,
+ .fn_failure = _evsql_evpq_failure,
+};
+
+/*
+ * Allocate the generic evsql context.
+ */
+static struct evsql *_evsql_new_base (struct event_base *ev_base, evsql_error_cb error_fn, void *cb_arg) {
+ struct evsql *evsql = NULL;
+
+ // allocate it
+ if ((evsql = calloc(1, sizeof(*evsql))) == NULL)
+ ERROR("calloc");
+
+ // store
+ evsql->ev_base = ev_base;
+ evsql->error_fn = error_fn;
+ evsql->cb_arg = cb_arg;
+
+ // init
+ LIST_INIT(&evsql->conn_list);
+ TAILQ_INIT(&evsql->query_queue);
+
+ // done
+ return evsql;
+
+error:
+ return NULL;
+}
+
+/*
+ * Start a new connection and add it to the list, it won't be ready until _evsql_evpq_connected is called
+ */
+static struct evsql_conn *_evsql_conn_new (struct evsql *evsql) {
+ struct evsql_conn *conn = NULL;
+
+ // allocate
+ if ((conn = calloc(1, sizeof(*conn))) == NULL)
+ ERROR("calloc");
+
+ // init
+ conn->evsql = evsql;
+
+ // connect the engine
+ switch (evsql->type) {
+ case EVSQL_EVPQ:
+ if ((conn->engine.evpq = evpq_connect(evsql->ev_base, evsql->engine_conf.evpq, _evsql_evpq_cb_info, conn)) == NULL)
+ goto error;
+
+ break;
+
+ default:
+ FATAL("evsql->type");
+ }
+
+ // add it to the list
+ LIST_INSERT_HEAD(&evsql->conn_list, conn, entry);
+
+ // success
+ return conn;
+
+error:
+ free(conn);
+
+ return NULL;
+}
+
+struct evsql *evsql_new_pq (struct event_base *ev_base, const char *pq_conninfo, evsql_error_cb error_fn, void *cb_arg) {
+ struct evsql *evsql = NULL;
+
+ // base init
+ if ((evsql = _evsql_new_base (ev_base, error_fn, cb_arg)) == NULL)
+ goto error;
+
+ // store conf
+ evsql->engine_conf.evpq = pq_conninfo;
+
+ // pre-create one connection
+ if (_evsql_conn_new(evsql) == NULL)
+ goto error;
+
+ // done
+ return evsql;
+
+error:
+ // XXX: more complicated than this?
+ free(evsql);
+
+ return NULL;
+}
+
+/*
+ * Checks if the connection is already allocated for some other trans/query.
+ *
+ * Returns:
+ * 0 connection idle, can be allocated
+ * >1 connection busy
+ */
+static int _evsql_conn_busy (struct evsql_conn *conn) {
+ // transactions get the connection to themselves
+ if (conn->trans)
+ return 1;
+
+ // if it has a query assigned, it's busy
+ if (conn->query)
+ return 1;
+
+ // otherwise, it's all idle
+ return 0;
+}
+
+/*
+ * Checks if the connection is ready for use (i.e. _evsql_evpq_connected was called).
+ *
+ * The connection should not already have a query running.
+ *
+ * Returns
+ * <0 the connection is not valid (failed, query in progress)
+ * 0 the connection is still pending, and will become ready at some point
+ * >0 it's ready
+ */
+static int _evsql_conn_ready (struct evsql_conn *conn) {
+ switch (conn->evsql->type) {
+ case EVSQL_EVPQ: {
+ enum evpq_state state = evpq_state(conn->engine.evpq);
+
+ switch (state) {
+ case EVPQ_CONNECT:
+ return 0;
+
+ case EVPQ_CONNECTED:
+ return 1;
+
+ case EVPQ_QUERY:
+ case EVPQ_INIT:
+ case EVPQ_FAILURE:
+ return -1;
+
+ default:
+ FATAL("evpq_state: %d", state);
+ }
+
+ }
+
+ default:
+ FATAL("evsql->type: %d", conn->evsql->type);
+ }
+}
+
+/*
+ * Allocate a connection for use and return it via *conn_ptr, or if may_queue is nonzero and the connection pool is
+ * getting full, return NULL (query should be queued).
+ *
+ * Note that the returned connection might not be ready for use yet (if we created a new one, see _evsql_conn_ready).
+ *
+ * Returns zero if a connection was found or the request should be queued, or nonzero if something failed and the
+ * request should be dropped.
+ */
+static int _evsql_conn_get (struct evsql *evsql, struct evsql_conn **conn_ptr, int may_queue) {
+ int have_nontrans = 0;
+ *conn_ptr = NULL;
+
+ // find a connection that isn't busy and is ready (unless the query queue is empty).
+ LIST_FOREACH(*conn_ptr, &evsql->conn_list, entry) {
+ // we can only have a query enqueue itself if there is a non-trans conn it can later use
+ if (!(*conn_ptr)->trans)
+ have_nontrans = 1;
+
+ // skip busy conns always
+ if (_evsql_conn_busy(*conn_ptr))
+ continue;
+
+ // accept pending conns as long as there are NO enqueued queries (might cause deadlock otherwise)
+ if (_evsql_conn_ready(*conn_ptr) == 0 && TAILQ_EMPTY(&evsql->query_queue))
+ break;
+
+ // accept conns that are in a fully ready state
+ if (_evsql_conn_ready(*conn_ptr) > 0)
+ break;
+ }
+
+ // if we found an idle connection, we can just return that right away
+ if (*conn_ptr)
+ return 0;
+
+ // return NULL if may_queue and we have a non-trans conn that we can, at some point, use
+ if (may_queue && have_nontrans)
+ return 0;
+
+ // we need to open a new connection
+ if ((*conn_ptr = _evsql_conn_new(evsql)) == NULL)
+ goto error;
+
+ // good
+ return 0;
+error:
+ return -1;
+}
+
+struct evsql_trans *evsql_trans (struct evsql *evsql, enum evsql_trans_type type, evsql_trans_error_cb error_fn, evsql_trans_ready_cb ready_fn, evsql_trans_done_cb done_fn, void *cb_arg) {
+ struct evsql_trans *trans = NULL;
+
+ // allocate it
+ if ((trans = calloc(1, sizeof(*trans))) == NULL)
+ ERROR("calloc");
+
+ // store
+ trans->evsql = evsql;
+ trans->ready_fn = ready_fn;
+ trans->done_fn = done_fn;
+ trans->cb_arg = cb_arg;
+ trans->type = type;
+
+ // find a connection
+ if (_evsql_conn_get(evsql, &trans->conn, 0))
+ ERROR("_evsql_conn_get");
+
+ // associate the conn
+ trans->conn->trans = trans;
+
+ // is it already ready?
+ if (_evsql_conn_ready(trans->conn) > 0) {
+ // call _evsql_trans_conn_ready directly, it will handle cleanup (silently, !error_fn)
+ if (_evsql_trans_conn_ready(evsql, trans)) {
+ // return NULL directly
+ return NULL;
+ }
+
+ } else {
+ // otherwise, wait for the conn to be ready
+
+ }
+
+ // and let it pass errors to the user
+ trans->error_fn = error_fn;
+
+ // ok
+ return trans;
+
+error:
+ free(trans);
+
+ return NULL;
+}
+
+/*
+ * Validate and allocate the basic stuff for a new query.
+ */
+static struct evsql_query *_evsql_query_new (struct evsql *evsql, struct evsql_trans *trans, evsql_query_cb query_fn, void *cb_arg) {
+ struct evsql_query *query = NULL;
+
+ // if it's part of a trans, then make sure the trans is idle
+ if (trans && trans->query)
+ ERROR("transaction is busy");
+
+ // allocate it
+ if ((query = calloc(1, sizeof(*query))) == NULL)
+ ERROR("calloc");
+
+ // store
+ query->cb_fn = query_fn;
+ query->cb_arg = cb_arg;
+
+ // success
+ return query;
+
+error:
+ return NULL;
+}
+
+/*
+ * Handle a new query.
+ *
+ * For transactions this will associate the query and then execute it, otherwise this will either find an idle
+ * connection and send the query, or enqueue it.
+ */
+static int _evsql_query_enqueue (struct evsql *evsql, struct evsql_trans *trans, struct evsql_query *query, const char *command) {
+ // transaction queries are handled differently
+ if (trans) {
+ // it's an in-transaction query
+ assert(trans->query == NULL);
+
+ // assign the query
+ trans->query = query;
+
+ // execute directly
+ if (_evsql_query_exec(trans->conn, query, command)) {
+ // ack, fail the transaction
+ _evsql_trans_fail(trans);
+
+ // caller frees query
+ goto error;
+ }
+
+ } else {
+ struct evsql_conn *conn;
+
+ // find an idle connection
+ if ((_evsql_conn_get(evsql, &conn, 1)))
+ ERROR("couldn't allocate a connection for the query");
+
+ // we must enqueue if no idle conn or the conn is not yet ready
+ if (conn && _evsql_conn_ready(conn) > 0) {
+ // execute directly
+ if (_evsql_query_exec(conn, query, command)) {
+ // ack, fail the connection
+ _evsql_conn_fail(conn);
+
+ // make sure we don't deadlock any queries, but if this query got a conn directly, then we shouldn't
+ // have any queries enqueued anyways
+ assert(TAILQ_EMPTY(&evsql->query_queue));
+
+ // caller frees query
+ goto error;
+ }
+
+ } else {
+ // copy the command for later execution
+ if ((query->command = strdup(command)) == NULL)
+ ERROR("strdup");
+
+ // enqueue until some connection pumps the queue
+ TAILQ_INSERT_TAIL(&evsql->query_queue, query, entry);
+ }
+ }
+
+ // ok, good
+ return 0;
+
+error:
+ return -1;
+}
+
+struct evsql_query *evsql_query (struct evsql *evsql, struct evsql_trans *trans, const char *command, evsql_query_cb query_fn, void *cb_arg) {
+ struct evsql_query *query = NULL;
+
+ // alloc new query
+ if ((query = _evsql_query_new(evsql, trans, query_fn, cb_arg)) == NULL)
+ goto error;
+
+ // just execute the command string directly
+ if (_evsql_query_enqueue(evsql, trans, query, command))
+ goto error;
+
+ // ok
+ return query;
+
+error:
+ _evsql_query_free(query);
+
+ return NULL;
+}
+
+struct evsql_query *evsql_query_params (struct evsql *evsql, struct evsql_trans *trans, const char *command, const struct evsql_query_params *params, evsql_query_cb query_fn, void *cb_arg) {
+ struct evsql_query *query = NULL;
+ const struct evsql_query_param *param;
+ int idx;
+
+ // alloc new query
+ if ((query = _evsql_query_new(evsql, trans, query_fn, cb_arg)) == NULL)
+ goto error;
+
+ // count the params
+ for (param = params->list; param->type; param++)
+ query->params.count++;
+
+ // allocate the vertical storage for the parameters
+ if (0
+
+// !(query->params.types = calloc(query->params.count, sizeof(Oid)))
+ || !(query->params.values = calloc(query->params.count, sizeof(char *)))
+ || !(query->params.lengths = calloc(query->params.count, sizeof(int)))
+ || !(query->params.formats = calloc(query->params.count, sizeof(int)))
+ )
+ ERROR("calloc");
+
+ // transform
+ for (param = params->list, idx = 0; param->type; param++, idx++) {
+ // `types` stays NULL
+ // query->params.types[idx] = 0;
+
+ // values
+ query->params.values[idx] = param->data_raw;
+
+ // lengths
+ query->params.lengths[idx] = param->length;
+
+ // formats, binary if length is nonzero
+ query->params.formats[idx] = param->length ? 1 : 0;
+ }
+
+ // result format
+ switch (params->result_fmt) {
+ case EVSQL_FMT_TEXT:
+ query->params.result_format = 0; break;
+
+ case EVSQL_FMT_BINARY:
+ query->params.result_format = 1; break;
+
+ default:
+ FATAL("params.result_fmt: %d", params->result_fmt);
+ }
+
+ // execute it
+ if (_evsql_query_enqueue(evsql, trans, query, command))
+ goto error;
+
+ // ok
+ return query;
+
+error:
+ _evsql_query_free(query);
+
+ return NULL;
+}
+
+void evsql_query_abort (struct evsql_trans *trans, struct evsql_query *query) {
+ assert(query);
+
+ if (trans) {
+ // must be the right query
+ assert(trans->query == query);
+ }
+
+ // just strip the callback and wait for it to complete as normal
+ query->cb_fn = NULL;
+}
+
+void _evsql_trans_commit_res (const struct evsql_result_info *res, void *arg) {
+ (void) arg;
+
+ assert(res->trans);
+
+ // check for errors
+ if (res->error)
+ ERROR("transaction 'COMMIT' failed: %s", evsql_result_error(res));
+
+ // transaction is now done
+ res->trans->done_fn(res->trans, res->trans->cb_arg);
+
+ // release it
+ _evsql_trans_release(res->trans);
+
+ // success
+ return;
+
+error:
+ _evsql_trans_fail(res->trans);
+}
+
+int evsql_trans_commit (struct evsql_trans *trans) {
+ static const char *sql = "COMMIT TRANSACTION";
+
+ if (trans->query)
+ ERROR("cannot COMMIT because transaction is still busy");
+
+ // query
+ if (evsql_query(trans->evsql, trans, sql, _evsql_trans_commit_res, NULL) == NULL)
+ goto error;
+
+ // mark it as commited in case someone wants to abort it
+ trans->has_commit = 1;
+
+ // success
+ return 0;
+
+error:
+ return -1;
+}
+
+void _evsql_trans_rollback_res (const struct evsql_result_info *res, void *arg) {
+ (void) arg;
+
+ assert(res->trans);
+
+ // fail the connection on errors
+ if (res->error)
+ ERROR("transaction 'ROLLBACK' failed: %s", evsql_result_error(res));
+
+ // release it
+ _evsql_trans_release(res->trans);
+
+ // success
+ return;
+
+error:
+ // fail the connection too, errors are supressed
+ _evsql_trans_fail(res->trans);
+}
+
+/*
+ * Used as the ready_fn callback in case of abort, otherwise directly
+ */
+void _evsql_trans_rollback (struct evsql_trans *trans, void *unused) {
+ static const char *sql = "ROLLBACK TRANSACTION";
+
+ (void) unused;
+
+ // query
+ if (evsql_query(trans->evsql, trans, sql, _evsql_trans_rollback_res, NULL) == NULL) {
+ // fail the transaction/connection
+ _evsql_trans_fail(trans);
+ }
+
+}
+
+void evsql_trans_abort (struct evsql_trans *trans) {
+ // supress errors
+ trans->error_fn = NULL;
+
+ if (trans->has_commit) {
+ // abort after commit doesn't make sense
+ FATAL("transaction was already commited");
+ }
+
+ if (trans->query) {
+ // gah, some query is running
+ WARNING("aborting pending query");
+
+ // prepare to rollback once complete
+ trans->ready_fn = _evsql_trans_rollback;
+
+ // abort
+ evsql_query_abort(trans, trans->query);
+
+ } else {
+ // just rollback directly
+ _evsql_trans_rollback(trans, NULL);
+
+ }
+}
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/evsql/evsql.h Thu Oct 16 22:04:53 2008 +0300
@@ -0,0 +1,136 @@
+#ifndef EVSQL_INTERNAL_H
+#define EVSQL_INTERNAL_H
+
+/*
+ * Internal interfaces
+ */
+
+#include <sys/queue.h>
+
+#include <event2/event.h>
+
+#include "../evsql.h"
+#include "../evpq.h"
+
+/*
+ * The engine type
+ */
+enum evsql_type {
+ EVSQL_EVPQ, // evpq
+};
+
+/*
+ * Contains the type, engine configuration, list of connections and waiting query queue.
+ */
+struct evsql {
+ // what event_base to use
+ struct event_base *ev_base;
+
+ // what engine we use
+ enum evsql_type type;
+
+ // callbacks
+ evsql_error_cb error_fn;
+ void *cb_arg;
+
+ // engine-specific connection configuration
+ union {
+ const char *evpq;
+ } engine_conf;
+
+ // list of connections that are open
+ LIST_HEAD(evsql_conn_list, evsql_conn) conn_list;
+
+ // list of queries running or waiting to run
+ TAILQ_HEAD(evsql_query_queue, evsql_query) query_queue;
+};
+
+/*
+ * A single connection to the server.
+ *
+ * Contains the engine connection, may have a transaction associated, and may have a query associated.
+ */
+struct evsql_conn {
+ // evsql we belong to
+ struct evsql *evsql;
+
+ // engine-specific connection info
+ union {
+ struct evpq_conn *evpq;
+ } engine;
+
+ // our position in the conn list
+ LIST_ENTRY(evsql_conn) entry;
+
+ // are we running a transaction?
+ struct evsql_trans *trans;
+
+ // are we running a transactionless query?
+ struct evsql_query *query;
+};
+
+/*
+ * A single transaction.
+ *
+ * Has a connection associated and possibly a query (which will also be associated with the connection)
+ */
+struct evsql_trans {
+ // our evsql_conn/evsql
+ struct evsql *evsql;
+ struct evsql_conn *conn;
+
+ // callbacks
+ evsql_trans_error_cb error_fn;
+ evsql_trans_ready_cb ready_fn;
+ evsql_trans_done_cb done_fn;
+ void *cb_arg;
+
+ // the transaction type
+ enum evsql_trans_type type;
+
+ // has evsql_trans_commit be called?
+ int has_commit : 1;
+
+ // our current query
+ struct evsql_query *query;
+
+};
+
+/*
+ * A single query.
+ *
+ * Has the info needed to exec the query (as these may be queued), and the callback/result info.
+ */
+struct evsql_query {
+ // the actual SQL query, this may or may not be ours, see _evsql_query_exec
+ char *command;
+
+ // possible query params
+ struct evsql_query_param_info {
+ int count;
+
+ Oid *types;
+ const char **values;
+ int *lengths;
+ int *formats;
+
+ int result_format;
+ } params;
+
+ // our callback
+ evsql_query_cb cb_fn;
+ void *cb_arg;
+
+ // our position in the query list
+ TAILQ_ENTRY(evsql_query) entry;
+
+ // the result
+ union {
+ PGresult *evpq;
+ } result;
+};
+
+// maximum length for a 'BEGIN TRANSACTION ...' query
+#define EVSQL_QUERY_BEGIN_BUF 512
+
+#endif /* EVSQL_INTERNAL_H */
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/evsql/util.c Thu Oct 16 22:04:53 2008 +0300
@@ -0,0 +1,199 @@
+#include <assert.h>
+
+#include "evsql.h"
+#include "../lib/error.h"
+#include "../lib/misc.h"
+
+int evsql_param_string (struct evsql_query_params *params, size_t param, const char *ptr) {
+ struct evsql_query_param *p = ¶ms->list[param];
+
+ assert(p->type == EVSQL_PARAM_STRING);
+
+ p->data_raw = ptr;
+ p->length = 0;
+
+ return 0;
+}
+
+int evsql_param_uint32 (struct evsql_query_params *params, size_t param, uint32_t uval) {
+ struct evsql_query_param *p = ¶ms->list[param];
+
+ assert(p->type == EVSQL_PARAM_UINT32);
+
+ p->data.uint32 = htonl(uval);
+ p->data_raw = (const char *) &p->data.uint32;
+ p->length = sizeof(uval);
+
+ return 0;
+}
+
+const char *evsql_result_error (const struct evsql_result_info *res) {
+ if (!res->error)
+ return "No error";
+
+ switch (res->evsql->type) {
+ case EVSQL_EVPQ:
+ if (!res->result.pq)
+ return "unknown error (no result)";
+
+ return PQresultErrorMessage(res->result.pq);
+
+ default:
+ FATAL("res->evsql->type");
+ }
+
+}
+
+size_t evsql_result_rows (const struct evsql_result_info *res) {
+ switch (res->evsql->type) {
+ case EVSQL_EVPQ:
+ return PQntuples(res->result.pq);
+
+ default:
+ FATAL("res->evsql->type");
+ }
+}
+
+size_t evsql_result_cols (const struct evsql_result_info *res) {
+ switch (res->evsql->type) {
+ case EVSQL_EVPQ:
+ return PQnfields(res->result.pq);
+
+ default:
+ FATAL("res->evsql->type");
+ }
+}
+
+int evsql_result_binary (const struct evsql_result_info *res, size_t row, size_t col, const char **ptr, size_t size, int nullok) {
+ *ptr = NULL;
+
+ switch (res->evsql->type) {
+ case EVSQL_EVPQ:
+ if (PQgetisnull(res->result.pq, row, col)) {
+ if (nullok)
+ return 0;
+ else
+ ERROR("[%zu:%zu] field is null", row, col);
+ }
+
+ if (PQfformat(res->result.pq, col) != 1)
+ ERROR("[%zu:%zu] PQfformat is not binary: %d", row, col, PQfformat(res->result.pq, col));
+
+ if (size && PQgetlength(res->result.pq, row, col) != size)
+ ERROR("[%zu:%zu] field size mismatch: %zu -> %d", row, col, size, PQgetlength(res->result.pq, row, col));
+
+ *ptr = PQgetvalue(res->result.pq, row, col);
+
+ return 0;
+
+ default:
+ FATAL("res->evsql->type");
+ }
+
+error:
+ return -1;
+}
+
+int evsql_result_string (const struct evsql_result_info *res, size_t row, size_t col, const char **ptr, int nullok) {
+ return evsql_result_binary(res, row, col, ptr, 0, nullok);
+}
+
+int evsql_result_uint16 (const struct evsql_result_info *res, size_t row, size_t col, uint16_t *uval, int nullok) {
+ const char *data;
+ int16_t sval;
+
+ if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok))
+ goto error;
+
+ if (!data)
+ return 0;
+
+ sval = ntohs(*((int16_t *) data));
+
+ if (sval < 0)
+ ERROR("negative value for unsigned: %d", sval);
+
+ *uval = sval;
+
+ return 0;
+
+error:
+ return nullok ? 0 : -1;
+}
+
+int evsql_result_uint32 (const struct evsql_result_info *res, size_t row, size_t col, uint32_t *uval, int nullok) {
+ const char *data;
+ int32_t sval;
+
+ if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok))
+ goto error;
+
+ if (!data)
+ return 0;
+
+ sval = ntohl(*(int32_t *) data);
+
+ if (sval < 0)
+ ERROR("negative value for unsigned: %d", sval);
+
+ *uval = sval;
+
+ return 0;
+
+error:
+ return nullok ? 0 : -1;
+}
+
+int evsql_result_uint64 (const struct evsql_result_info *res, size_t row, size_t col, uint64_t *uval, int nullok) {
+ const char *data;
+ int64_t sval;
+
+ if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok))
+ goto error;
+
+ if (!data)
+ return 0;
+
+ sval = ntohq(*(int64_t *) data);
+
+ if (sval < 0)
+ ERROR("negative value for unsigned: %ld", sval);
+
+ *uval = sval;
+
+ return 0;
+
+error:
+ return nullok ? 0 : -1;
+}
+
+void evsql_result_free (const struct evsql_result_info *res) {
+ switch (res->evsql->type) {
+ case EVSQL_EVPQ:
+ return PQclear(res->result.pq);
+
+ default:
+ FATAL("res->evsql->type");
+ }
+}
+
+const char *evsql_conn_error (struct evsql_conn *conn) {
+ switch (conn->evsql->type) {
+ case EVSQL_EVPQ:
+ if (!conn->engine.evpq)
+ return "unknown error (no conn)";
+
+ return evpq_error_message(conn->engine.evpq);
+
+ default:
+ FATAL("res->evsql->type");
+ }
+}
+
+const char *evsql_trans_error (struct evsql_trans *trans) {
+ if (trans->conn == NULL)
+ return "unknown error (no trans conn)";
+
+ return evsql_conn_error(trans->conn);
+}
+
--- a/src/evsql_internal.h Wed Oct 15 01:14:22 2008 +0300
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,129 +0,0 @@
-#ifndef EVSQL_INTERNAL_H
-#define EVSQL_INTERNAL_H
-
-#include <sys/queue.h>
-
-#include "evsql.h"
-
-/*
- * The engine type
- */
-enum evsql_type {
- EVSQL_EVPQ, // evpq
-};
-
-/*
- * Contains the type, engine configuration, list of connections and waiting query queue.
- */
-struct evsql {
- // what event_base to use
- struct event_base *ev_base;
-
- // what engine we use
- enum evsql_type type;
-
- // callbacks
- evsql_error_cb error_fn;
- void *cb_arg;
-
- // engine-specific connection configuration
- union {
- const char *evpq;
- } engine_conf;
-
- // list of connections that are open
- LIST_HEAD(evsql_conn_list, evsql_conn) conn_list;
-
- // list of queries running or waiting to run
- TAILQ_HEAD(evsql_query_queue, evsql_query) query_queue;
-};
-
-/*
- * A single connection to the server.
- *
- * Contains the engine connection, may have a transaction associated, and may have a query associated.
- */
-struct evsql_conn {
- // evsql we belong to
- struct evsql *evsql;
-
- // engine-specific connection info
- union {
- struct evpq_conn *evpq;
- } engine;
-
- // our position in the conn list
- LIST_ENTRY(evsql_conn) entry;
-
- // are we running a transaction?
- struct evsql_trans *trans;
-
- // are we running a transactionless query?
- struct evsql_query *query;
-};
-
-/*
- * A single transaction.
- *
- * Has a connection associated and possibly a query (which will also be associated with the connection)
- */
-struct evsql_trans {
- // our evsql_conn/evsql
- struct evsql *evsql;
- struct evsql_conn *conn;
-
- // callbacks
- evsql_trans_error_cb error_fn;
- evsql_trans_ready_cb ready_fn;
- evsql_trans_done_cb done_fn;
- void *cb_arg;
-
- // the transaction type
- enum evsql_trans_type type;
-
- // has evsql_trans_commit be called?
- int has_commit : 1;
-
- // our current query
- struct evsql_query *query;
-
-};
-
-/*
- * A single query.
- *
- * Has the info needed to exec the query (as these may be queued), and the callback/result info.
- */
-struct evsql_query {
- // the actual SQL query, this may or may not be ours, see _evsql_query_exec
- char *command;
-
- // possible query params
- struct evsql_query_param_info {
- int count;
-
- Oid *types;
- const char **values;
- int *lengths;
- int *formats;
-
- int result_format;
- } params;
-
- // our callback
- evsql_query_cb cb_fn;
- void *cb_arg;
-
- // our position in the query list
- TAILQ_ENTRY(evsql_query) entry;
-
- // the result
- union {
- PGresult *evpq;
- } result;
-};
-
-// maximum length for a 'BEGIN TRANSACTION ...' query
-#define EVSQL_QUERY_BEGIN_BUF 512
-
-#endif /* EVSQL_INTERNAL_H */
--- a/src/evsql_util.c Wed Oct 15 01:14:22 2008 +0300
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,201 +0,0 @@
-#include <assert.h>
-
-#include "evsql.h"
-#include "evsql_internal.h"
-#include "evpq.h"
-#include "lib/error.h"
-#include "lib/misc.h"
-
-int evsql_param_string (struct evsql_query_params *params, size_t param, const char *ptr) {
- struct evsql_query_param *p = ¶ms->list[param];
-
- assert(p->type == EVSQL_PARAM_STRING);
-
- p->data_raw = ptr;
- p->length = 0;
-
- return 0;
-}
-
-int evsql_param_uint32 (struct evsql_query_params *params, size_t param, uint32_t uval) {
- struct evsql_query_param *p = ¶ms->list[param];
-
- assert(p->type == EVSQL_PARAM_UINT32);
-
- p->data.uint32 = htonl(uval);
- p->data_raw = (const char *) &p->data.uint32;
- p->length = sizeof(uval);
-
- return 0;
-}
-
-const char *evsql_result_error (const struct evsql_result_info *res) {
- if (!res->error)
- return "No error";
-
- switch (res->evsql->type) {
- case EVSQL_EVPQ:
- if (!res->result.pq)
- return "unknown error (no result)";
-
- return PQresultErrorMessage(res->result.pq);
-
- default:
- FATAL("res->evsql->type");
- }
-
-}
-
-size_t evsql_result_rows (const struct evsql_result_info *res) {
- switch (res->evsql->type) {
- case EVSQL_EVPQ:
- return PQntuples(res->result.pq);
-
- default:
- FATAL("res->evsql->type");
- }
-}
-
-size_t evsql_result_cols (const struct evsql_result_info *res) {
- switch (res->evsql->type) {
- case EVSQL_EVPQ:
- return PQnfields(res->result.pq);
-
- default:
- FATAL("res->evsql->type");
- }
-}
-
-int evsql_result_binary (const struct evsql_result_info *res, size_t row, size_t col, const char **ptr, size_t size, int nullok) {
- *ptr = NULL;
-
- switch (res->evsql->type) {
- case EVSQL_EVPQ:
- if (PQgetisnull(res->result.pq, row, col)) {
- if (nullok)
- return 0;
- else
- ERROR("[%zu:%zu] field is null", row, col);
- }
-
- if (PQfformat(res->result.pq, col) != 1)
- ERROR("[%zu:%zu] PQfformat is not binary: %d", row, col, PQfformat(res->result.pq, col));
-
- if (size && PQgetlength(res->result.pq, row, col) != size)
- ERROR("[%zu:%zu] field size mismatch: %zu -> %d", row, col, size, PQgetlength(res->result.pq, row, col));
-
- *ptr = PQgetvalue(res->result.pq, row, col);
-
- return 0;
-
- default:
- FATAL("res->evsql->type");
- }
-
-error:
- return -1;
-}
-
-int evsql_result_string (const struct evsql_result_info *res, size_t row, size_t col, const char **ptr, int nullok) {
- return evsql_result_binary(res, row, col, ptr, 0, nullok);
-}
-
-int evsql_result_uint16 (const struct evsql_result_info *res, size_t row, size_t col, uint16_t *uval, int nullok) {
- const char *data;
- int16_t sval;
-
- if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok))
- goto error;
-
- if (!data)
- return 0;
-
- sval = ntohs(*((int16_t *) data));
-
- if (sval < 0)
- ERROR("negative value for unsigned: %d", sval);
-
- *uval = sval;
-
- return 0;
-
-error:
- return nullok ? 0 : -1;
-}
-
-int evsql_result_uint32 (const struct evsql_result_info *res, size_t row, size_t col, uint32_t *uval, int nullok) {
- const char *data;
- int32_t sval;
-
- if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok))
- goto error;
-
- if (!data)
- return 0;
-
- sval = ntohl(*(int32_t *) data);
-
- if (sval < 0)
- ERROR("negative value for unsigned: %d", sval);
-
- *uval = sval;
-
- return 0;
-
-error:
- return nullok ? 0 : -1;
-}
-
-int evsql_result_uint64 (const struct evsql_result_info *res, size_t row, size_t col, uint64_t *uval, int nullok) {
- const char *data;
- int64_t sval;
-
- if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok))
- goto error;
-
- if (!data)
- return 0;
-
- sval = ntohq(*(int64_t *) data);
-
- if (sval < 0)
- ERROR("negative value for unsigned: %ld", sval);
-
- *uval = sval;
-
- return 0;
-
-error:
- return nullok ? 0 : -1;
-}
-
-void evsql_result_free (const struct evsql_result_info *res) {
- switch (res->evsql->type) {
- case EVSQL_EVPQ:
- return PQclear(res->result.pq);
-
- default:
- FATAL("res->evsql->type");
- }
-}
-
-const char *evsql_conn_error (struct evsql_conn *conn) {
- switch (conn->evsql->type) {
- case EVSQL_EVPQ:
- if (!conn->engine.evpq)
- return "unknown error (no conn)";
-
- return evpq_error_message(conn->engine.evpq);
-
- default:
- FATAL("res->evsql->type");
- }
-}
-
-const char *evsql_trans_error (struct evsql_trans *trans) {
- if (trans->conn == NULL)
- return "unknown error (no trans conn)";
-
- return evsql_conn_error(trans->conn);
-}
-