--- a/src/dbfs.c Sun Oct 12 20:10:47 2008 +0300
+++ b/src/dbfs.c Sun Oct 12 21:59:52 2008 +0300
@@ -3,8 +3,10 @@
* A simple PostgreSQL-based filesystem.
*/
+#include <stdlib.h>
#include <string.h>
#include <errno.h>
+#include <assert.h>
#include <event2/event.h>
@@ -276,6 +278,193 @@
EWARNING(err, "fuse_reply_err");
}
+struct dbfs_dirop { // state of one open directory handle; dbfs_opendir stores a pointer to it in fi->fh
+ struct fuse_file_info *fi; // as passed to opendir; NOTE(review): stored by pointer - confirm it outlives the opendir call
+ struct fuse_req *req; // the request currently awaiting a reply, or NULL when none is pending
+ // the evsql transaction of this dirop: set by dbfs_dirop_ready, reset to NULL by dbfs_dirop_error
+ struct evsql_trans *trans;
+
+ // nonzero once the opendir reply has been sent, cleared again by releasedir
+ int open;
+};
+
+/*
+ * Free the dirop, aborting (rolling back) its transaction if it still has one associated.
+ *
+ * The caller must have cleared dirop->req first (this is asserted), i.e. no request may be pending on the dirop.
+ */
+static void dbfs_dirop_free (struct dbfs_dirop *dirop) {
+ assert(dirop->req == NULL);
+
+ if (dirop->trans)
+ evsql_trans_abort(dirop->trans);
+ // NOTE(review): the return value of evsql_trans_abort is ignored above
+ free(dirop);
+}
+
+/*
+ * The opendir transaction is ready: remember it on the dirop and send the pending opendir reply.
+ */
+static void dbfs_dirop_ready (struct evsql_trans *trans, void *arg) {
+    struct dbfs_dirop *dirop = arg;
+    struct fuse_req *req = dirop->req; dirop->req = NULL;   // from here on we own replying to req
+    int err;
+
+    INFO("[dbfs.opendir %p:%p] -> trans=%p", dirop, req, trans);
+
+    // remember the transaction
+    dirop->trans = trans;
+
+    // send the opendir reply; dirop->req was cleared above, so this must go through the local req
+    if ((err = fuse_reply_open(req, dirop->fi)))
+        EERROR(err, "fuse_reply_open");
+
+    // dirop is now open
+    dirop->open = 1;
+
+    // ok, wait for the next fs req
+    return;
+
+error:
+    // dirop->req is already NULL, so this aborts the transaction and frees the dirop
+    dbfs_dirop_free(dirop);
+    if ((err = fuse_reply_err(req, err)))
+        EWARNING(err, "fuse_reply_err");
+}
+
+static void dbfs_dirop_done (struct evsql_trans *trans, void *arg) {
+ struct dbfs_dirop *dirop = arg;
+ int err;
+ // XXX: not implemented yet. dbfs_opendir passes this to evsql_trans as done_fn, but it currently does nothing:
+ // it neither replies to dirop->req nor frees the dirop
+}
+
+static void dbfs_dirop_error (struct evsql_trans *trans, void *arg) {
+    struct dbfs_dirop *dirop = arg;
+    int err;
+    // error_fn of the dirop's transaction: forget the trans and fail any request waiting on it with EIO
+    INFO("[dbfs:dirop %p:%p] evsql transaction error: %s", dirop, dirop->req, evsql_trans_error(trans));
+
+    // deassociate the trans
+    dirop->trans = NULL;
+
+    // error out any pending req
+    if (dirop->req) {
+        if ((err = fuse_reply_err(dirop->req, EIO)))
+            EWARNING(err, "fuse_reply_err");
+
+        dirop->req = NULL;
+
+        // only free the dirop if it isn't open; an open dirop is still reachable through fi->fh
+        if (!dirop->open)
+            dbfs_dirop_free(dirop);
+    }
+}
+
+static void dbfs_opendir (struct fuse_req *req, fuse_ino_t ino, struct fuse_file_info *fi) {
+    struct dbfs *ctx = fuse_req_userdata(req);
+    struct dbfs_dirop *dirop = NULL;
+    int err;
+    // FUSE opendir: allocate a dirop for this handle and start its transaction; dbfs_dirop_ready sends the reply
+    // allocate it; on failure dirop stays NULL, which the error path below has to cope with
+    if ((dirop = calloc(1, sizeof(*dirop))) == NULL && (err = EIO))
+        ERROR("calloc");
+
+    INFO("[dbfs.opendir %p:%p] ino=%lu, fi=%p", dirop, req, ino, fi);
+
+    // store the dirop
+    fi->fh = (uint64_t) dirop;
+    dirop->req = req;
+    dirop->fi = fi;
+
+    // start a new transaction; evsql_trans returns the new trans, or NULL on failure
+    if (evsql_trans(ctx->db, EVSQL_TRANS_SERIALIZABLE, dbfs_dirop_error, dbfs_dirop_ready, dbfs_dirop_done, dirop) == NULL)
+        SERROR(err = EIO);
+
+    // XXX: handle interrupts
+
+    // wait
+    return;
+
+error:
+    // we handle the req ourselves; dirop is still NULL if the calloc failed
+    if (dirop) {
+        dirop->req = NULL; dbfs_dirop_free(dirop);
+    }
+
+    if ((err = fuse_reply_err(req, err)))
+        EWARNING(err, "fuse_reply_err");
+}
+
+static void dbfs_readdir (struct fuse_req *req, fuse_ino_t ino, size_t size, off_t off, struct fuse_file_info *fi) {
+    struct dbfs *ctx = fuse_req_userdata(req);
+    struct dbfs_dirop *dirop = (struct dbfs_dirop *) fi->fh;
+    int err;
+    // off_t is a signed type, so it is logged via intmax_t/%jd rather than %zu
+    INFO("[dbfs.readdir %p:%p] ino=%lu, size=%zu, off=%jd, fi=%p : trans=%p", dirop, req, ino, size, (intmax_t) off, fi, dirop->trans);
+
+    // update dirop
+    dirop->req = req;
+    assert(dirop->fi == fi);
+
+    // select all relevant file entries; only the column name "offset" is quoted, not the table-qualified name
+    const char *sql =
+        "SELECT"
+        " file_tree.\"offset\", file_tree.name, inodes.ino, inodes.type"
+        " FROM file_tree LEFT OUTER JOIN inodes ON (file_tree.inode = inodes.ino)"
+        " WHERE file_tree.parent = $1::int4 AND file_tree.\"offset\" >= $2::int4"
+        " LIMIT $3::int4";
+
+    static struct evsql_query_params params = EVSQL_PARAMS(EVSQL_FMT_BINARY) {
+        EVSQL_PARAM ( UINT32 ),
+        EVSQL_PARAM ( UINT32 ),
+        EVSQL_PARAM ( UINT32 ),
+
+        EVSQL_PARAMS_END
+    };
+
+    // XXX: incomplete - the query is never issued and req never gets a reply yet
+}
+
+static void dbfs_releasedir (struct fuse_req *req, fuse_ino_t ino, struct fuse_file_info *fi) {
+    struct dbfs *ctx = fuse_req_userdata(req);
+    struct dbfs_dirop *dirop = (struct dbfs_dirop *) fi->fh;
+    int err;
+
+    (void) ctx;
+
+    INFO("[dbfs.releasedir %p:%p] ino=%lu, fi=%p : trans=%p", dirop, req, ino, fi, dirop->trans);
+
+    // update dirop. Must keep it open so that dbfs_dirop_error won't free it
+    dirop->req = req;
+    assert(dirop->fi == fi);
+
+    // we can commit the transaction, although we didn't make any changes. The trans is NULL if it already failed
+    // earlier. If the commit fails the transaction, then dbfs_dirop_error will take care of sending the error, and
+    // dirop->req will be NULL
+    if (dirop->trans == NULL || evsql_trans_commit(dirop->trans))
+        SERROR(err = EIO);
+
+    // not open anymore
+    dirop->open = 0;
+
+    // XXX: handle interrupts
+
+    // wait
+    return;
+
+error:
+    // if dbfs_dirop_error already replied it has cleared dirop->req, but it left the still-open dirop for us to free
+    if (dirop->req == NULL)
+        req = NULL;
+
+    dirop->req = NULL;
+    dbfs_dirop_free(dirop);
+
+    if (req && (err = fuse_reply_err(req, err)))
+        EWARNING(err, "fuse_reply_err");
+}
+
struct fuse_lowlevel_ops dbfs_llops = {
.init = dbfs_init,
@@ -284,6 +473,9 @@
.lookup = dbfs_lookup,
.getattr = dbfs_getattr,
+
+ .opendir = dbfs_opendir,
+ .releasedir = dbfs_releasedir,
};
void dbfs_sql_error (struct evsql *evsql, void *arg) {
--- a/src/dirbuf.c Sun Oct 12 20:10:47 2008 +0300
+++ b/src/dirbuf.c Sun Oct 12 21:59:52 2008 +0300
@@ -22,6 +22,19 @@
return -1;
}
+size_t difbuf_estimate (size_t req_size, size_t min_namelen) {
+    char namebuf[DIRBUF_NAME_MAX];
+    size_t i;   // same type as min_namelen, so the loop condition does not mix signed and unsigned
+
+    // build a dummy string of the right length, truncated so that it always fits into namebuf with its NUL
+    for (i = 0; i < min_namelen && i < DIRBUF_NAME_MAX - 1; i++)
+        namebuf[i] = 'x';
+
+    namebuf[i] = '\0';
+    // called with a NULL buf, fuse_add_direntry only reports the size an entry with this name takes up
+    return req_size / (fuse_add_direntry(NULL, NULL, 0, namebuf, NULL, 0));
+}
+
int dirbuf_add (fuse_req_t req, off_t req_off, struct dirbuf *buf, off_t ent_off, off_t next_off, const char *ent_name, fuse_ino_t ent_ino, mode_t ent_mode) {
struct stat stbuf;
size_t ent_size;
--- a/src/dirbuf.h Sun Oct 12 20:10:47 2008 +0300
+++ b/src/dirbuf.h Sun Oct 12 21:59:52 2008 +0300
@@ -16,6 +16,13 @@
size_t off;
};
+// maximum length for a dirbuf name, including NUL byte
+#define DIRBUF_NAME_MAX 256
+
+/*
+ * Estimate how many dir entries will, at most, fit into a dirbuf of the given size, based on a minimum filename size.
+ */
+size_t difbuf_estimate (size_t req_size, size_t min_namelen);
/*
* Initialize a dirbuf for a request. The dirbuf will be filled with at most req_size bytes of dir entries.
--- a/src/evsql.c Sun Oct 12 20:10:47 2008 +0300
+++ b/src/evsql.c Sun Oct 12 21:59:52 2008 +0300
@@ -10,6 +10,10 @@
#include "lib/error.h"
#include "lib/misc.h"
+/*
+ * A couple function prototypes
+ */
+static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn);
/*
* Actually execute the given query.
@@ -141,12 +145,31 @@
// remove from list
LIST_REMOVE(conn, entry);
+
+ // catch deadlocks
+ assert(!LIST_EMPTY(&conn->evsql->conn_list) || TAILQ_EMPTY(&conn->evsql->query_queue));
// free
free(conn);
}
/*
+ * Release a transaction, it should already be deassociated from the query.
+ *
+ * Perform a two-way-deassociation with the conn, and then free the trans.
+ */
+static void _evsql_trans_release (struct evsql_trans *trans) {
+ assert(trans->query == NULL);
+ assert(trans->conn != NULL);
+
+ // deassociate the conn in both directions; the conn itself is not released or freed here
+ trans->conn->trans = NULL; trans->conn = NULL;
+
+ // free the trans
+ _evsql_trans_free(trans);
+}
+
+/*
* Fail a single query, this will trigger the callback and free it.
*/
static void _evsql_query_fail (struct evsql* evsql, struct evsql_query *query) {
@@ -164,7 +187,7 @@
* Fail a transaction, this will silently drop any query, trigger the error callback, two-way-deassociate/release the
* conn, and then free the trans.
*/
-static void _evsql_trans_fail (struct evsql_trans *trans) {
+static void _evsql_trans_fail (struct evsql_trans *trans, int silent) {
if (trans->query) {
// free the query silently
_evsql_query_free(trans->query); trans->query = NULL;
@@ -172,10 +195,14 @@
// tell the user
// XXX: trans is in a bad state during this call
- trans->error_fn(trans, trans->cb_arg);
+ if (!silent)
+ trans->error_fn(trans, trans->cb_arg);
+
+ // deassociate and release the conn
+ trans->conn->trans = NULL; _evsql_conn_release(trans->conn); trans->conn = NULL;
- // fail the conn
- trans->conn->trans = NULL; _evsql_conn_release(trans->conn); trans->conn = NULL;
+ // pump the queue for requests that were waiting for this connection
+ _evsql_pump(trans->evsql, NULL);
// free the trans
_evsql_trans_free(trans);
@@ -188,7 +215,7 @@
static void _evsql_conn_fail (struct evsql_conn *conn) {
if (conn->trans) {
// let transactions handle their connection failures
- _evsql_trans_fail(conn->trans);
+ _evsql_trans_fail(conn->trans, 0);
} else {
if (conn->query) {
@@ -207,6 +234,8 @@
* If execing a query on a connection fails, both the query and the connection are failed (in that order).
*
* Any further queries will then also be failed, because there's no reconnection/retry logic yet.
+ *
+ * This means that if conn is NULL, all queries are failed.
*/
static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn) {
struct evsql_query *query;
@@ -268,9 +297,12 @@
// transaction is now ready for use
res->trans->ready_fn(res->trans, res->trans->cb_arg);
+
+ // good
+ return;
error:
- _evsql_trans_fail(res->trans);
+ _evsql_trans_fail(res->trans, 0);
}
/*
@@ -321,7 +353,7 @@
error:
// fail the transaction
- _evsql_trans_fail(trans);
+ _evsql_trans_fail(trans, 0);
}
/*
@@ -619,11 +651,52 @@
return -1;
}
+struct evsql_trans *evsql_trans (struct evsql *evsql, enum evsql_trans_type type, evsql_trans_error_cb error_fn, evsql_trans_ready_cb ready_fn, evsql_trans_done_cb done_fn, void *cb_arg) {
+ struct evsql_trans *trans = NULL;
+ // Create a new transaction bound to a connection; returns the new trans, or NULL on failure.
+ // allocate it (zero-initialized)
+ if ((trans = calloc(1, sizeof(*trans))) == NULL)
+ ERROR("calloc");
+
+ // store the evsql, the user's callbacks with their argument, and the transaction type
+ trans->evsql = evsql;
+ trans->error_fn = error_fn;
+ trans->ready_fn = ready_fn;
+ trans->done_fn = done_fn;
+ trans->cb_arg = cb_arg;
+ trans->type = type;
+
+ // find a connection
+ if (_evsql_conn_get(evsql, &trans->conn, 0))
+ ERROR("_evsql_conn_get");
+
+ // associate the conn
+ trans->conn->trans = trans;
+
+ // is it already ready?
+ if (_evsql_conn_ready(trans->conn) > 0) {
+ // call _evsql_trans_conn_ready directly
+ _evsql_trans_conn_ready(evsql, trans);
+
+ } else {
+ // otherwise, wait for the conn to be ready
+ // (nothing is done here in that case)
+ }
+
+ // ok
+ return trans;
+
+error:
+ // both failure points precede the conn->trans association, so only the trans itself needs freeing
+ free(trans);
+ return NULL;
+}
+
/*
* Validate and allocate the basic stuff for a new query.
*/
static struct evsql_query *_evsql_query_new (struct evsql *evsql, struct evsql_trans *trans, evsql_query_cb query_fn, void *cb_arg) {
- struct evsql_query *query;
+ struct evsql_query *query = NULL;
// if it's part of a trans, then make sure the trans is idle
if (trans && trans->query)
@@ -660,8 +733,13 @@
trans->query = query;
// execute directly
- if (_evsql_query_exec(trans->conn, query, command))
+ if (_evsql_query_exec(trans->conn, query, command)) {
+ // ack, fail the connection
+ _evsql_conn_fail(trans->conn);
+
+ // caller frees query
goto error;
+ }
} else {
struct evsql_conn *conn;
@@ -673,9 +751,17 @@
// we must enqueue if no idle conn or the conn is not yet ready
if (conn && _evsql_conn_ready(conn) > 0) {
// execute directly
- if (_evsql_query_exec(conn, query, command))
+ if (_evsql_query_exec(conn, query, command)) {
+ // ack, fail the connection
+ _evsql_conn_fail(conn);
+
+ // make sure we don't deadlock any queries, but if this query got a conn directly, then we shouldn't
+ // have any queries enqueued anyways
+ assert(TAILQ_EMPTY(&evsql->query_queue));
+
+ // caller frees query
goto error;
-
+ }
} else {
// copy the command for later execution
@@ -777,3 +863,59 @@
return NULL;
}
+void _evsql_trans_commit_res (const struct evsql_result_info *res, void *arg) {
+ (void) arg;
+ // result callback for the COMMIT query issued by evsql_trans_commit
+ assert(res->trans);
+
+ // check for errors
+ if (res->error)
+ ERROR("transaction 'COMMIT' failed: %s", evsql_result_error(res));
+
+ // transaction is now done, tell the user before the trans is freed
+ res->trans->done_fn(res->trans, res->trans->cb_arg);
+
+ // release it (deassociates the conn and frees the trans)
+ _evsql_trans_release(res->trans);
+
+ // success
+ return;
+
+error:
+ _evsql_trans_fail(res->trans, 0); // silent=0: the user's error_fn gets called
+}
+
+int evsql_trans_commit (struct evsql_trans *trans) {
+    static const char *sql = "COMMIT TRANSACTION";
+
+    // returns zero once the COMMIT has been issued, nonzero if evsql_query failed (it returns NULL then)
+    return (evsql_query(trans->evsql, trans, sql, _evsql_trans_commit_res, NULL) == NULL);
+}
+
+void _evsql_trans_rollback_res (const struct evsql_result_info *res, void *arg) {
+ (void) arg;
+ // result callback for the ROLLBACK query issued by evsql_trans_abort; no user callback is invoked from here
+ assert(res->trans);
+
+ // fail the connection on errors
+ if (res->error)
+ ERROR("transaction 'ROLLBACK' failed: %s", evsql_result_error(res));
+
+ // release it (deassociates the conn and frees the trans)
+ _evsql_trans_release(res->trans);
+
+ // success
+ return;
+
+error:
+ // but do it silently (silent=1 skips the user's error_fn)
+ _evsql_trans_fail(res->trans, 1);
+}
+
+int evsql_trans_abort (struct evsql_trans *trans) {
+    static const char *sql = "ROLLBACK TRANSACTION";
+
+    // returns zero once the ROLLBACK has been issued, nonzero if evsql_query failed (it returns NULL then)
+    return (evsql_query(trans->evsql, trans, sql, _evsql_trans_rollback_res, NULL) == NULL);
+}
+
--- a/src/evsql.h Sun Oct 12 20:10:47 2008 +0300
+++ b/src/evsql.h Sun Oct 12 21:59:52 2008 +0300
@@ -136,6 +136,11 @@
typedef void (*evsql_trans_ready_cb)(struct evsql_trans *trans, void *arg);
/*
+ * The transaction was committed, and should not be used anymore.
+ */
+typedef void (*evsql_trans_done_cb)(struct evsql_trans *trans, void *arg);
+
+/*
* Create a new PostgreSQL/libpq(evpq) -based evsql using the given conninfo.
*
* The given conninfo must stay valid for the duration of the evsql's lifetime.
@@ -147,9 +152,11 @@
*
* Transactions are separate connections that provide transaction-isolation.
*
- * Once the transaction is ready for use, ready_fn will be called. If the transaction fails, any pending query will be forgotten, and error_fn called.
+ * Once the transaction is ready for use, ready_fn will be called. If the transaction fails, any pending query will be
+ * forgotten, and error_fn called. This also includes some (but not all) cases where evsql_query returns NULL.
+ *
*/
-struct evsql_trans *evsql_trans (struct evsql *evsql, enum evsql_trans_type type, evsql_trans_error_cb error_fn, evsql_trans_ready_cb ready_fn, void *cb_arg);
+struct evsql_trans *evsql_trans (struct evsql *evsql, enum evsql_trans_type type, evsql_trans_error_cb error_fn, evsql_trans_ready_cb ready_fn, evsql_trans_done_cb done_fn, void *cb_arg);
/*
* Queue the given query for execution.
@@ -168,6 +175,26 @@
struct evsql_query *evsql_query_params (struct evsql *evsql, struct evsql_trans *trans, const char *command, const struct evsql_query_params *params, evsql_query_cb query_fn, void *cb_arg);
/*
+ * Commit a transaction, calling done_fn if it was successful (error_fn otherwise).
+ */
+int evsql_trans_commit (struct evsql_trans *trans);
+
+/*
+ * Abort a transaction, rolling it back. No callbacks will be called, unless this function returns nonzero, in which
+ * case error_fn might be called.
+ */
+int evsql_trans_abort (struct evsql_trans *trans);
+
+/*
+ * Transaction-handling functions
+ */
+
+// error string, meant to be called from evsql_trans_error_cb
+const char *evsql_trans_error (struct evsql_trans *trans);
+
+// commit the transaction, calling
+
+/*
* Param-building functions
*/
int evsql_param_string (struct evsql_query_params *params, size_t param, const char *ptr);
--- a/src/evsql_internal.h Sun Oct 12 20:10:47 2008 +0300
+++ b/src/evsql_internal.h Sun Oct 12 21:59:52 2008 +0300
@@ -69,12 +69,13 @@
*/
struct evsql_trans {
// our evsql_conn/evsql
- //struct evsql *evsql;
+ struct evsql *evsql;
struct evsql_conn *conn;
// callbacks
evsql_trans_error_cb error_fn;
evsql_trans_ready_cb ready_fn;
+ evsql_trans_done_cb done_fn;
void *cb_arg;
// the transaction type
--- a/src/evsql_util.c Sun Oct 12 20:10:47 2008 +0300
+++ b/src/evsql_util.c Sun Oct 12 21:59:52 2008 +0300
@@ -2,6 +2,7 @@
#include "evsql.h"
#include "evsql_internal.h"
+#include "evpq.h"
#include "lib/error.h"
#include "lib/misc.h"
@@ -168,3 +169,24 @@
FATAL("res->evsql->type");
}
}
+
+const char *evsql_conn_error (struct evsql_conn *conn) {
+    switch (conn->evsql->type) {
+        case EVSQL_EVPQ:
+            if (!conn->engine.evpq)
+                return "unknown error (no conn)";
+            // the error message comes from the evpq connection underlying this conn
+            return evpq_error_message(conn->engine.evpq);
+
+        default:
+            FATAL("conn->evsql->type");
+    }
+}
+
+const char *evsql_trans_error (struct evsql_trans *trans) {
+ if (trans->conn == NULL)
+ return "unknown error (no trans conn)";
+ // otherwise the error message is that of the transaction's connection
+ return evsql_conn_error(trans->conn);
+}
+