preliminary *dir implementation
authorTero Marttila <terom@fixme.fi>
Sun, 12 Oct 2008 21:59:52 +0300
changeset 26 61668c57f4bb
parent 25 99a41f48e29b
child 27 461be4cd34a3
preliminary *dir implementation
src/dbfs.c
src/dirbuf.c
src/dirbuf.h
src/evsql.c
src/evsql.h
src/evsql_internal.h
src/evsql_util.c
--- a/src/dbfs.c	Sun Oct 12 20:10:47 2008 +0300
+++ b/src/dbfs.c	Sun Oct 12 21:59:52 2008 +0300
@@ -3,8 +3,10 @@
  * A simple PostgreSQL-based filesystem.
  */
 
+#include <stdlib.h>
 #include <string.h>
 #include <errno.h>
+#include <assert.h>
 
 #include <event2/event.h>
 
@@ -276,6 +278,193 @@
         EWARNING(err, "fuse_reply_err");
 }
 
+struct dbfs_dirop {
+    struct fuse_file_info *fi;
+    struct fuse_req *req;
+
+    struct evsql_trans *trans;
+    
+    // opendir has returned and releasedir hasn't been called yet
+    int open;
+};
+
+/*
+ * Free the dirop, aborting any in-progress transaction.
+ *
+ * req must be NULL.
+ */
+static void dbfs_dirop_free (struct dbfs_dirop *dirop) {
+    assert(dirop->req == NULL);
+
+    if (dirop->trans)
+        evsql_trans_abort(dirop->trans);
+
+    free(dirop);
+}
+
+/*
+ * The opendir transaction is ready
+ */
+static void dbfs_dirop_ready (struct evsql_trans *trans, void *arg) {
+    struct dbfs_dirop *dirop = arg;
+    struct fuse_req *req = dirop->req; dirop->req = NULL;
+    int err;
+
+    INFO("[dbfs.openddir %p:%p] -> trans=%p", dirop, req, trans);
+
+    // remember the transaction
+    dirop->trans = trans;
+
+    // send the openddir reply
+    if ((err = fuse_reply_open(dirop->req, dirop->fi)))
+        EERROR(err, "fuse_reply_open");
+    
+    // dirop is now open
+    dirop->open = 1;
+
+    // ok, wait for the next fs req
+    return;
+
+error:
+    dbfs_dirop_free(dirop);
+
+    if ((err = fuse_reply_err(req, err)))
+        EWARNING(err, "fuse_reply_err");
+}
+
+static void dbfs_dirop_done (struct evsql_trans *trans, void *arg) {
+    struct dbfs_dirop *dirop = arg;
+    int err;
+    
+
+}
+
+static void dbfs_dirop_error (struct evsql_trans *trans, void *arg) {
+    struct dbfs_dirop *dirop = arg;
+    int err;
+
+    INFO("[dbfs:dirop %p:%p] evsql transaction error: %s", dirop, dirop->req, evsql_trans_error(trans));
+    
+    // deassociate the trans
+    dirop->trans = NULL;
+    
+    // error out and pending req
+    if (dirop->req) {
+        if ((err = fuse_reply_err(dirop->req, EIO)))
+            EWARNING(err, "fuse_erply_err");
+
+        dirop->req = NULL;
+
+        // only free the dirop if it isn't open
+        if (!dirop->open)
+            dbfs_dirop_free(dirop);
+    }
+}
+
+static void dbfs_opendir (struct fuse_req *req, fuse_ino_t ino, struct fuse_file_info *fi) {
+    struct dbfs *ctx = fuse_req_userdata(req);
+    struct dbfs_dirop *dirop = NULL;
+    int err;
+    
+    // allocate it
+    if ((dirop = calloc(1, sizeof(*dirop))) == NULL && (err = EIO))
+        ERROR("calloc");
+
+    INFO("[dbfs.opendir %p:%p] ino=%lu, fi=%p", dirop, req, ino, fi);
+        
+    // store the dirop
+    fi->fh = (uint64_t) dirop;
+    dirop->req = req;
+    dirop->fi = fi;
+
+    // start a new transaction
+    if (evsql_trans(ctx->db, EVSQL_TRANS_SERIALIZABLE, dbfs_dirop_error, dbfs_dirop_ready, dbfs_dirop_done, dirop))
+        SERROR(err = EIO);
+    
+    // XXX: handle interrupts
+    
+    // wait
+    return;
+
+error:
+    // we handle the req
+    dirop->req = NULL;
+
+    dbfs_dirop_free(dirop);
+
+    if ((err = fuse_reply_err(req, err)))
+        EWARNING(err, "fuse_reply_err");
+}
+
+static void dbfs_readdir (struct fuse_req *req, fuse_ino_t ino, size_t size, off_t off, struct fuse_file_info *fi) {
+    struct dbfs *ctx = fuse_req_userdata(req);
+    struct dbfs_dirop *dirop = (struct dbfs_dirop *) fi->fh;
+    int err;
+
+    INFO("[dbfs.readdir %p:%p] ino=%lu, size=%zu, off=%zu, fi=%p : trans=%p", dirop, req, ino, size, off, fi, dirop->trans);
+
+    // update dirop
+    dirop->req = req;
+    assert(dirop->fi == fi);
+
+    // select all relevant file entries
+    const char *sql = 
+        "SELECT"
+        " \"file_tree.offset\", file_tree.name, inodes.ino, inodes.type"
+        " FROM file_tree LEFT OUTER JOIN inodes ON (file_tree.inode = inodes.ino)"
+        " WHERE file_tree.parent = $1::int4 AND \"file_tree.offset\" >= $2::int4"
+        " LIMIT $3::int4";
+
+    static struct evsql_query_params params = EVSQL_PARAMS(EVSQL_FMT_BINARY) {
+        EVSQL_PARAM ( UINT32 ),
+        EVSQL_PARAM ( UINT32 ),
+        EVSQL_PARAM ( UINT32 ),
+
+        EVSQL_PARAMS_END
+    };
+
+    // XXX: incomplete
+}
+
+static void dbfs_releasedir (struct fuse_req *req, fuse_ino_t ino, struct fuse_file_info *fi) {
+    struct dbfs *ctx = fuse_req_userdata(req);
+    struct dbfs_dirop *dirop = (struct dbfs_dirop *) fi->fh;
+    int err;
+
+    (void) ctx;
+
+    INFO("[dbfs.releasedir %p:%p] ino=%lu, fi=%p : trans=%p", dirop, req, ino, fi, dirop->trans);
+
+    // update dirop. Must keep it open so that dbfs_dirop_error won't free it
+    dirop->req = req;
+    assert(dirop->fi == fi);
+
+    // we can commit the transaction, although we didn't make any changes
+    // if this fails the transaction, then dbfs_dirop_error will take care of sending the error, and dirop->req will be
+    // NULL
+    if (evsql_trans_commit(dirop->trans))
+        SERROR(err = EIO);
+
+    // not open anymore
+    dirop->open = 0;
+
+    // XXX: handle interrupts
+    
+    // wait
+    return;
+
+error:
+    if (dirop->req) {
+        // we handle the req
+        dirop->req = NULL;
+
+        dbfs_dirop_free(dirop);
+
+        if ((err = fuse_reply_err(req, err)))
+            EWARNING(err, "fuse_reply_err");
+    }
+}
+
 struct fuse_lowlevel_ops dbfs_llops = {
 
     .init           = dbfs_init,
@@ -284,6 +473,9 @@
     .lookup         = dbfs_lookup,
 
     .getattr        = dbfs_getattr,
+
+    .opendir        = dbfs_opendir,
+    .releasedir     = dbfs_releasedir,
 };
 
 void dbfs_sql_error (struct evsql *evsql, void *arg) {
--- a/src/dirbuf.c	Sun Oct 12 20:10:47 2008 +0300
+++ b/src/dirbuf.c	Sun Oct 12 21:59:52 2008 +0300
@@ -22,6 +22,19 @@
     return -1;
 }
 
+size_t difbuf_estimate (size_t req_size, size_t min_namelen) {
+    char namebuf[DIRBUF_NAME_MAX];
+    int i;
+    
+    // build a dummy string of the right length
+    for (i = 0; i < min_namelen && i < DIRBUF_NAME_MAX - 1; i++)
+        namebuf[i] = 'x';
+
+    namebuf[i] = '\0';
+
+    return req_size / (fuse_add_direntry(NULL, NULL, 0, namebuf, NULL, 0));
+}
+
 int dirbuf_add (fuse_req_t req, off_t req_off, struct dirbuf *buf, off_t ent_off, off_t next_off, const char *ent_name, fuse_ino_t ent_ino, mode_t ent_mode) {
     struct stat stbuf;
     size_t ent_size;
--- a/src/dirbuf.h	Sun Oct 12 20:10:47 2008 +0300
+++ b/src/dirbuf.h	Sun Oct 12 21:59:52 2008 +0300
@@ -16,6 +16,13 @@
     size_t off;
 };
 
+// maximum length for a dirbuf name, including NUL byte
+#define DIRBUF_NAME_MAX 256
+
+/*
+ * Estimate how many dir entries will, at most, fit into a difbuf of the given size, based on a minimum filename size.
+ */
+size_t difbuf_estimate (size_t req_size, size_t min_namelen);
 
 /*
  * Initialize a dirbuf for a request. The dirbuf will be filled with at most req_size bytes of dir entries.
--- a/src/evsql.c	Sun Oct 12 20:10:47 2008 +0300
+++ b/src/evsql.c	Sun Oct 12 21:59:52 2008 +0300
@@ -10,6 +10,10 @@
 #include "lib/error.h"
 #include "lib/misc.h"
 
+/*
+ * A couple function prototypes
+ */ 
+static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn);
 
 /*
  * Actually execute the given query.
@@ -141,12 +145,31 @@
     
     // remove from list
     LIST_REMOVE(conn, entry);
+    
+    // catch deadlocks
+    assert(!LIST_EMPTY(&conn->evsql->conn_list) || TAILQ_EMPTY(&conn->evsql->query_queue));
 
     // free
     free(conn);
 }
 
 /*
+ * Release a transaction, it should already be deassociated from the query.
+ *
+ * Perform a two-way-deassociation with the conn, and then free the trans.
+ */
+static void _evsql_trans_release (struct evsql_trans *trans) {
+    assert(trans->query == NULL);
+    assert(trans->conn != NULL);
+
+    // deassociate the conn
+    trans->conn->trans = NULL; trans->conn = NULL;
+
+    // free the trans
+    _evsql_trans_free(trans);
+}
+
+/*
  * Fail a single query, this will trigger the callback and free it.
  */
 static void _evsql_query_fail (struct evsql* evsql, struct evsql_query *query) {
@@ -164,7 +187,7 @@
  * Fail a transaction, this will silently drop any query, trigger the error callback, two-way-deassociate/release the
  * conn, and then free the trans.
  */ 
-static void _evsql_trans_fail (struct evsql_trans *trans) {
+static void _evsql_trans_fail (struct evsql_trans *trans, int silent) {
     if (trans->query) {
         // free the query silently
         _evsql_query_free(trans->query); trans->query = NULL;
@@ -172,10 +195,14 @@
 
     // tell the user
     // XXX: trans is in a bad state during this call
-    trans->error_fn(trans, trans->cb_arg);
+    if (!silent)
+        trans->error_fn(trans, trans->cb_arg);
+ 
+    // deassociate and release the conn
+    trans->conn->trans = NULL; _evsql_conn_release(trans->conn); trans->conn = NULL;
 
-    // fail the conn
-    trans->conn->trans = NULL; _evsql_conn_release(trans->conn); trans->conn = NULL;
+    // pump the queue for requests that were waiting for this connection
+    _evsql_pump(trans->evsql, NULL);
 
     // free the trans
     _evsql_trans_free(trans);
@@ -188,7 +215,7 @@
 static void _evsql_conn_fail (struct evsql_conn *conn) {
     if (conn->trans) {
         // let transactions handle their connection failures
-        _evsql_trans_fail(conn->trans);
+        _evsql_trans_fail(conn->trans, 0);
 
     } else {
         if (conn->query) {
@@ -207,6 +234,8 @@
  * If execing a query on a connection fails, both the query and the connection are failed (in that order).
  *
  * Any further queries will then also be failed, because there's no reconnection/retry logic yet.
+ *
+ * This means that if conn is NULL, all queries are failed.
  */
 static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn) {
     struct evsql_query *query;
@@ -268,9 +297,12 @@
     
     // transaction is now ready for use
     res->trans->ready_fn(res->trans, res->trans->cb_arg);
+    
+    // good
+    return;
 
 error:
-    _evsql_trans_fail(res->trans);
+    _evsql_trans_fail(res->trans, 0);
 }
 
 /*
@@ -321,7 +353,7 @@
 
 error:
     // fail the transaction
-    _evsql_trans_fail(trans);
+    _evsql_trans_fail(trans, 0);
 }
 
 /*
@@ -619,11 +651,52 @@
     return -1;
 }
 
+struct evsql_trans *evsql_trans (struct evsql *evsql, enum evsql_trans_type type, evsql_trans_error_cb error_fn, evsql_trans_ready_cb ready_fn, evsql_trans_done_cb done_fn, void *cb_arg) {
+    struct evsql_trans *trans = NULL;
+
+    // allocate it
+    if ((trans = calloc(1, sizeof(*trans))) == NULL)
+        ERROR("calloc");
+
+    // store
+    trans->evsql = evsql;
+    trans->error_fn = error_fn;
+    trans->ready_fn = ready_fn;
+    trans->done_fn = done_fn;
+    trans->cb_arg = cb_arg;
+    trans->type = type;
+
+    // find a connection
+    if (_evsql_conn_get(evsql, &trans->conn, 0))
+        ERROR("_evsql_conn_get");
+
+    // associate the conn
+    trans->conn->trans = trans;
+
+    // is it already ready?
+    if (_evsql_conn_ready(trans->conn) > 0) {
+        // call _evsql_trans_conn_ready directly
+        _evsql_trans_conn_ready(evsql, trans);
+
+    } else {
+        // otherwise, wait for the conn to be ready
+         
+    }
+
+    // ok
+    return trans;
+
+error:
+    free(trans);
+
+    return NULL;
+}
+
 /*
  * Validate and allocate the basic stuff for a new query.
  */
 static struct evsql_query *_evsql_query_new (struct evsql *evsql, struct evsql_trans *trans, evsql_query_cb query_fn, void *cb_arg) {
-    struct evsql_query *query;
+    struct evsql_query *query = NULL;
     
     // if it's part of a trans, then make sure the trans is idle
     if (trans && trans->query)
@@ -660,8 +733,13 @@
         trans->query = query;
 
         // execute directly
-        if (_evsql_query_exec(trans->conn, query, command))
+        if (_evsql_query_exec(trans->conn, query, command)) {
+            // ack, fail the connection
+            _evsql_conn_fail(trans->conn);
+            
+            // caller frees query
             goto error;
+        }
 
     } else {
         struct evsql_conn *conn;
@@ -673,9 +751,17 @@
         // we must enqueue if no idle conn or the conn is not yet ready
         if (conn && _evsql_conn_ready(conn) > 0) {
             // execute directly
-            if (_evsql_query_exec(conn, query, command))
+            if (_evsql_query_exec(conn, query, command)) {
+                // ack, fail the connection
+                _evsql_conn_fail(conn);
+                
+                // make sure we don't deadlock any queries, but if this query got a conn directly, then we shouldn't
+                // have any queries enqueued anyways
+                assert(TAILQ_EMPTY(&evsql->query_queue));
+                
+                // caller frees query
                 goto error;
-
+            }
 
         } else {
             // copy the command for later execution
@@ -777,3 +863,59 @@
     return NULL;
 }
 
+void _evsql_trans_commit_res (const struct evsql_result_info *res, void *arg) {
+    (void) arg;
+
+    assert(res->trans);
+
+    // check for errors
+    if (res->error)
+        ERROR("transaction 'COMMIT' failed: %s", evsql_result_error(res));
+    
+    // transaction is now done
+    res->trans->done_fn(res->trans, res->trans->cb_arg);
+    
+    // release it
+    _evsql_trans_release(res->trans);
+
+    // success
+    return;
+
+error:
+    _evsql_trans_fail(res->trans, 0);
+}
+
+int evsql_trans_commit (struct evsql_trans *trans) {
+    static const char *sql = "COMMIT TRANSACTION";
+    
+    // query
+    return (evsql_query(trans->evsql, trans, sql, _evsql_trans_commit_res, NULL) != NULL);
+}
+
+void _evsql_trans_rollback_res (const struct evsql_result_info *res, void *arg) {
+    (void) arg;
+
+    assert(res->trans);
+
+    // fail the connection on errors
+    if (res->error)
+        ERROR("transaction 'ROLLBACK' failed: %s", evsql_result_error(res));
+
+    // release it
+    _evsql_trans_release(res->trans);
+
+    // success
+    return;
+
+error:
+    // but do it silently
+    _evsql_trans_fail(res->trans, 1);
+}
+
+int evsql_trans_abort (struct evsql_trans *trans) {
+    static const char *sql = "ROLLBACK TRANSACTION";
+
+    // query
+    return (evsql_query(trans->evsql, trans, sql, _evsql_trans_rollback_res, NULL) != NULL);
+}
+
--- a/src/evsql.h	Sun Oct 12 20:10:47 2008 +0300
+++ b/src/evsql.h	Sun Oct 12 21:59:52 2008 +0300
@@ -136,6 +136,11 @@
 typedef void (*evsql_trans_ready_cb)(struct evsql_trans *trans, void *arg);
 
 /*
+ * The transaction was commited, and should not be used anymore.
+ */
+typedef void (*evsql_trans_done_cb)(struct evsql_trans *trans, void *arg);
+
+/*
  * Create a new PostgreSQL/libpq(evpq) -based evsql using the given conninfo.
  *
  * The given conninfo must stay valid for the duration of the evsql's lifetime.
@@ -147,9 +152,11 @@
  *
  * Transactions are separate connections that provide transaction-isolation.
  *
- * Once the transaction is ready for use, ready_fn will be called. If the transaction fails, any pending query will be forgotten, and error_fn called.
+ * Once the transaction is ready for use, ready_fn will be called. If the transaction fails, any pending query will be
+ * forgotten, and error_fn called. This also includes some (but not all) cases where evsql_query returns nonzero.
+ *
  */
-struct evsql_trans *evsql_trans (struct evsql *evsql, enum evsql_trans_type type, evsql_trans_error_cb error_fn, evsql_trans_ready_cb ready_fn, void *cb_arg);
+struct evsql_trans *evsql_trans (struct evsql *evsql, enum evsql_trans_type type, evsql_trans_error_cb error_fn, evsql_trans_ready_cb ready_fn, evsql_trans_done_cb done_fn, void *cb_arg);
 
 /*
  * Queue the given query for execution.
@@ -168,6 +175,26 @@
 struct evsql_query *evsql_query_params (struct evsql *evsql, struct evsql_trans *trans, const char *command, const struct evsql_query_params *params, evsql_query_cb query_fn, void *cb_arg);
 
 /*
+ * Commit a transaction, calling done_fn if it was succesfull (error_fn otherwise).
+ */
+int evsql_trans_commit (struct evsql_trans *trans);
+
+/*
+ * Abort a transaction, rolling it back. No callbacks will be called, unless this function returns nonzero, in which
+ * case error_fn might be called.
+ */
+int evsql_trans_abort (struct evsql_trans *trans);
+
+/*
+ * Transaction-handling functions
+ */
+
+// error string, meant to be called from evsql_trans_error_cb
+const char *evsql_trans_error (struct evsql_trans *trans);
+
+// commit the transaction, calling 
+
+/*
  * Param-building functions
  */
 int evsql_param_string (struct evsql_query_params *params, size_t param, const char *ptr);
--- a/src/evsql_internal.h	Sun Oct 12 20:10:47 2008 +0300
+++ b/src/evsql_internal.h	Sun Oct 12 21:59:52 2008 +0300
@@ -69,12 +69,13 @@
  */
 struct evsql_trans {
     // our evsql_conn/evsql
-    //struct evsql *evsql;
+    struct evsql *evsql;
     struct evsql_conn *conn;
     
     // callbacks
     evsql_trans_error_cb error_fn;
     evsql_trans_ready_cb ready_fn;
+    evsql_trans_done_cb done_fn;
     void *cb_arg;
 
     // the transaction type
--- a/src/evsql_util.c	Sun Oct 12 20:10:47 2008 +0300
+++ b/src/evsql_util.c	Sun Oct 12 21:59:52 2008 +0300
@@ -2,6 +2,7 @@
 
 #include "evsql.h"
 #include "evsql_internal.h"
+#include "evpq.h"
 #include "lib/error.h"
 #include "lib/misc.h"
 
@@ -168,3 +169,24 @@
             FATAL("res->evsql->type");
     }
 }
+
+const char *evsql_conn_error (struct evsql_conn *conn) {
+    switch (conn->evsql->type) {
+        case EVSQL_EVPQ:
+            if (!conn->engine.evpq)
+                return "unknown error (no conn)";
+            
+            return evpq_error_message(conn->engine.evpq);
+
+        default:
+            FATAL("res->evsql->type");
+    }
+}
+
+const char *evsql_trans_error (struct evsql_trans *trans) {
+    if (trans->conn == NULL)
+        return "unknown error (no trans conn)";
+
+    return evsql_conn_error(trans->conn);
+}
+