evsql transactions, it compiles...
authorTero Marttila <terom@fixme.fi>
Sun, 12 Oct 2008 20:10:47 +0300
changeset 25 99a41f48e29b
parent 24 82cfdb6680d1
child 26 61668c57f4bb
evsql transactions, it compiles...
Makefile
src/dbfs.c
src/evpq.c
src/evpq.h
src/evsql.c
src/evsql.h
src/evsql_internal.h
src/evsql_util.c
--- a/Makefile	Sun Oct 12 14:57:06 2008 +0300
+++ b/Makefile	Sun Oct 12 20:10:47 2008 +0300
@@ -21,7 +21,7 @@
 bin/simple_hello: obj/evfuse.o obj/dirbuf.o obj/lib/log.o obj/lib/signals.o obj/simple.o
 bin/evpq_test: obj/evpq.o obj/lib/log.o
 bin/url_test: obj/lib/url.o obj/lib/lex.o obj/lib/log.o
-bin/dbfs: obj/evsql.o obj/evpq.o obj/evfuse.o obj/dirbuf.o obj/lib/log.o obj/lib/signals.o
+bin/dbfs: obj/evsql.o obj/evsql_util.o obj/evpq.o obj/evfuse.o obj/dirbuf.o obj/lib/log.o obj/lib/signals.o
 
 # computed
 LDFLAGS = ${LIBRARY_PATHS} ${LIBRARY_LIST}
--- a/src/dbfs.c	Sun Oct 12 14:57:06 2008 +0300
+++ b/src/dbfs.c	Sun Oct 12 20:10:47 2008 +0300
@@ -56,51 +56,98 @@
 
 }
 
+/*
+ * Check the result set.
+ *
+ * Returns;
+ *  -1  if the query failed, the columns do not match, or there are too many/few rows
+ *  0   the results match
+ *  1   there were no results
+ */
+int _dbfs_check_res (const struct evsql_result_info *res, size_t rows, size_t cols) {
+    int err = 0;
+
+    // check if it failed
+    if (res->error)
+        NERROR(evsql_result_error(res));
+        
+    // not found?
+    if (evsql_result_rows(res) == 0)
+        SERROR(err = 1);
+
+    // duplicate rows?
+    if (evsql_result_rows(res) != rows)
+        ERROR("multiple rows returned");
+    
+    // correct number of columns
+    if (evsql_result_cols(res) != 5)
+        ERROR("wrong number of columns: %zu", evsql_result_cols(res));
+
+    // good
+    return 0;
+
+error:
+    if (!err)
+        err = -1;
+
+    return err;
+}
+
+int _dbfs_stat_info (struct stat *st, const struct evsql_result_info *res, size_t row, size_t col_offset) {
+    int err = 0;
+    
+    uint16_t mode;
+    uint64_t size, nlink;
+    const char *type;
+    
+    // extract the data
+    if (0
+        ||  evsql_result_string(res, row, col_offset + 0, &type,       0 ) // inodes.type
+        ||  evsql_result_uint16(res, row, col_offset + 1, &mode,       0 ) // inodes.mode
+        ||  evsql_result_uint64(res, row, col_offset + 2, &size,       0 ) // inodes.size
+        ||  evsql_result_uint64(res, row, col_offset + 3, &nlink,      0 ) // count(*)
+    )
+        EERROR(err = EIO, "invalid db data");
+
+    INFO("\tst_mode=S_IF%s | %ho, st_nlink=%llu, st_size=%llu", type, mode, (long long unsigned int) nlink, (long long unsigned int) size);
+
+    // convert and store
+    st->st_mode = _dbfs_mode(type) | mode;
+    st->st_nlink = nlink;
+    st->st_size = size;
+    
+    // good
+    return 0;
+
+error:
+    return -1;
+}
+
 void _dbfs_lookup_result (const struct evsql_result_info *res, void *arg) {
     struct fuse_req *req = arg;
     struct fuse_entry_param e; ZINIT(e);
     int err = 0;
     
-    uint16_t mode;
     uint32_t ino;
-    uint64_t size, nlink;
-    const char *type;
     
-    // check if it failed
-    if (res->error && (err = EIO))
-        NERROR(evsql_result_error(res));
-
-    // duplicate rows?
-    if (evsql_result_rows(res) > 1)
-        EERROR(err = EIO, "multiple rows returned");
-
-    // not found?
-    if (evsql_result_rows(res) == 0)
-        SERROR(err = ENOENT);
-    
-    // correct number of columns
-    if (evsql_result_cols(res) != 5)
-        EERROR(err = EIO, "wrong number of columns: %zu", evsql_result_cols(res));
+    // check the results
+    if ((err = _dbfs_check_res(res, 1, 5)))
+        SERROR(err = (err ==  1 ? ENOENT : EIO));
     
     // get the data
     if (0
         ||  evsql_result_uint32(res, 0, 0, &ino,        0 ) // inodes.ino
-        ||  evsql_result_string(res, 0, 1, &type,       0 ) // inodes.type
-        ||  evsql_result_uint16(res, 0, 2, &mode,       0 ) // inodes.mode
-        ||  evsql_result_uint64(res, 0, 3, &size,       0 ) // inodes.size
-        ||  evsql_result_uint64(res, 0, 4, &nlink,      0 ) // count(*)
     )
         EERROR(err = EIO, "invalid db data");
+        
+    INFO("[dbfs.lookup] -> ion=%u", ino);
     
-    INFO("[dbfs.look] -> ino=%u, st_mode=S_IF%s | %ho, st_nlink=%llu, st_size=%llu", ino, type, mode, (long long unsigned int) nlink, (long long unsigned int) size);
+    // stat attrs
+    if (_dbfs_stat_info(&e.attr, res, 0, 1))
+        goto error;
 
-    // convert and store
+    // other attrs
     e.ino = e.attr.st_ino = ino;
-    e.attr.st_mode = _dbfs_mode(type) | mode;
-    e.attr.st_nlink = nlink;
-    e.attr.st_size = size;
-    
-    // XXX: timeouts
     e.attr_timeout = CACHE_TIMEOUT;
     e.entry_timeout = CACHE_TIMEOUT;
         
@@ -127,7 +174,7 @@
         "SELECT"
         " inodes.ino, inodes.type, inodes.mode, inodes.size, count(*)"
         " FROM file_tree INNER JOIN inodes ON (file_tree.inode = inodes.ino)"
-        " WHERE file_tree.parent = $1::int AND file_tree.name = $2::varchar"
+        " WHERE file_tree.parent = $1::int4 AND file_tree.name = $2::varchar"
         " GROUP BY inodes.ino, inodes.type, inodes.mode, inodes.size";
     
     static struct evsql_query_params params = EVSQL_PARAMS(EVSQL_FMT_BINARY) {
@@ -145,7 +192,7 @@
         EERROR(err = EIO, "evsql_param_*");
 
     // query
-    if (evsql_query_params(ctx->db, sql, &params, _dbfs_lookup_result, req) == NULL)
+    if (evsql_query_params(ctx->db, NULL, sql, &params, _dbfs_lookup_result, req) == NULL)
         EERROR(err = EIO, "evsql_query_params");
 
     // XXX: handle interrupts
@@ -158,11 +205,85 @@
         EWARNING(err, "fuse_reply_err");
 }
 
+void _dbfs_getattr_result (const struct evsql_result_info *res, void *arg) {
+    struct fuse_req *req = arg;
+    struct stat st; ZINIT(st);
+    int err = 0;
+    
+    // check the results
+    if ((err = _dbfs_check_res(res, 1, 4)))
+        SERROR(err = (err ==  1 ? ENOENT : EIO));
+        
+    INFO("[dbfs.getattr %p] -> (stat follows)", req);
+    
+    // stat attrs
+    if (_dbfs_stat_info(&st, res, 0, 0))
+        goto error;
+
+    // XXX: we don't have the ino
+    st.st_ino = 0;
+
+    // reply
+    if ((err = fuse_reply_attr(req, &st, CACHE_TIMEOUT)))
+        EERROR(err, "fuse_reply_entry");
+
+error:
+    if (err && (err = fuse_reply_err(req, err)))
+        EWARNING(err, "fuse_reply_err");
+
+    // free
+    evsql_result_free(res);
+}
+
+static void dbfs_getattr (struct fuse_req *req, fuse_ino_t ino, struct fuse_file_info *fi) {
+    struct dbfs *ctx = fuse_req_userdata(req);
+    int err;
+    
+    (void) fi;
+
+    INFO("[dbfs.getattr %p] ino=%lu", req, ino);
+
+    const char *sql =
+        "SELECT"
+        " inodes.type, inodes.mode, inodes.size, count(*)"
+        " FROM inodes"
+        " WHERE inodes.ino = ‰1::int4"
+        " GROUP BY inodes.type, inodes.mode, inodes.size";
+
+    static struct evsql_query_params params = EVSQL_PARAMS(EVSQL_FMT_BINARY) {
+        EVSQL_PARAM ( UINT32 ),
+
+        EVSQL_PARAMS_END
+    };
+
+    // build params
+    if (0
+        ||  evsql_param_uint32(&params, 0, ino)
+    )
+        SERROR(err = EIO);
+        
+    // query
+    if (evsql_query_params(ctx->db, NULL, sql, &params, _dbfs_getattr_result, req) == NULL)
+        SERROR(err = EIO);
+
+    // XXX: handle interrupts
+    
+    // wait
+    return;
+
+error:
+    if ((err = fuse_reply_err(req, err)))
+        EWARNING(err, "fuse_reply_err");
+}
+
 struct fuse_lowlevel_ops dbfs_llops = {
+
     .init           = dbfs_init,
     .destroy        = dbfs_destroy,
     
     .lookup         = dbfs_lookup,
+
+    .getattr        = dbfs_getattr,
 };
 
 void dbfs_sql_error (struct evsql *evsql, void *arg) {
--- a/src/evpq.c	Sun Oct 12 14:57:06 2008 +0300
+++ b/src/evpq.c	Sun Oct 12 20:10:47 2008 +0300
@@ -239,12 +239,8 @@
     return conn;
 
 error:
-    if (conn) {
-        if (conn->pg_conn)
-            PQfinish(conn->pg_conn);
-        
-        free(conn);
-    }
+    if (conn) 
+        evpq_release(conn);
 
     return NULL;
 }
@@ -319,6 +315,16 @@
 
 }
 
+void evpq_release (struct evpq_conn *conn) {
+    if (conn->ev)
+        event_free(conn->ev);
+
+    if (conn->pg_conn)
+        PQfinish(conn->pg_conn);
+    
+    free(conn);
+}
+
 enum evpq_state evpq_state (struct evpq_conn *conn) {
     return conn->state;
 }
--- a/src/evpq.h	Sun Oct 12 14:57:06 2008 +0300
+++ b/src/evpq.h	Sun Oct 12 20:10:47 2008 +0300
@@ -101,5 +101,11 @@
 // convenience wrappers
 #define evpq_error_message(conn) PQerrorMessage(evpq_pgconn(conn))
 
+/*
+ * Release the evpq_conn, closing all connections and freeing all resources.
+ *
+ * You must call this yourself in all cases after evpq_connect returns an evpq_conn.
+ */
+void evpq_release (struct evpq_conn *conn);
 
 #endif /* EVPQ_H */
--- a/src/evsql.c	Sun Oct 12 14:57:06 2008 +0300
+++ b/src/evsql.c	Sun Oct 12 20:10:47 2008 +0300
@@ -1,82 +1,31 @@
 #define _GNU_SOURCE
 #include <stdlib.h>
-#include <sys/queue.h>
 #include <assert.h>
 #include <string.h>
 
 #include "evsql.h"
+#include "evsql_internal.h"
 #include "evpq.h"
 #include "lib/log.h"
 #include "lib/error.h"
 #include "lib/misc.h"
 
-enum evsql_type {
-    EVSQL_EVPQ,
-};
-
-struct evsql {
-    // callbacks
-    evsql_error_cb error_fn;
-    void *cb_arg;
-
-    // backend engine
-    enum evsql_type type;
-
-    union {
-        struct evpq_conn *evpq;
-    } engine;
-    
-    // list of queries running or waiting to run
-    TAILQ_HEAD(evsql_queue, evsql_query) queue;
-};
-
-struct evsql_query {
-    // the evsql we are querying
-    struct evsql *evsql;
-
-    // the actual SQL query, this may or may not be ours, see _evsql_query_exec
-    char *command;
-    
-    // possible query params
-    struct evsql_query_param_info {
-        int count;
-
-        Oid *types;
-        const char **values;
-        int *lengths;
-        int *formats;
-
-        int result_format;
-    } params;
-
-    // our callback
-    evsql_query_cb cb_fn;
-    void *cb_arg;
-
-    // our position in the query list
-    TAILQ_ENTRY(evsql_query) entry;
-
-    // the result
-    union {
-        PGresult *evpq;
-    } result;
-};
-
 
 /*
  * Actually execute the given query.
  *
  * The backend should be able to accept the query at this time.
  *
- * query->command must be valid during the execution of this function, but once it returns, the command is not needed
- * anymore, and should be set to NULL.
+ * You should assume that if trying to execute a query fails, then the connection should also be considred as failed.
  */
-static int _evsql_query_exec (struct evsql *evsql, struct evsql_query *query, const char *command) {
-    switch (evsql->type) {
+static int _evsql_query_exec (struct evsql_conn *conn, struct evsql_query *query, const char *command) {
+    int err;
+
+    switch (conn->evsql->type) {
         case EVSQL_EVPQ:
             // got params?
             if (query->params.count) {
-                return evpq_query_params(evsql->engine.evpq, command,
+                err = evpq_query_params(conn->engine.evpq, command,
                     query->params.count, 
                     query->params.types, 
                     query->params.values, 
@@ -87,16 +36,33 @@
 
             } else {
                 // plain 'ole query
-                return evpq_query(evsql->engine.evpq, command);
+                err = evpq_query(conn->engine.evpq, command);
             }
+
+            if (err) {
+                if (PQstatus(evpq_pgconn(conn->engine.evpq)) != CONNECTION_OK)
+                    WARNING("conn failed");
+                else
+                    WARNING("query failed, dropping conn as well");
+            }
+
+            break;
         
         default:
             FATAL("evsql->type");
     }
+
+    if (!err)
+        // assign the query
+        conn->query = query;
+
+    return err;
 }
 
 /*
- * Free the query and related resources, doesn't trigger any callbacks or remove from any queues
+ * Free the query and related resources, doesn't trigger any callbacks or remove from any queues.
+ *
+ * The command should already be taken care of (NULL).
  */
 static void _evsql_query_free (struct evsql_query *query) {
     assert(query->command == NULL);
@@ -112,12 +78,9 @@
 }
 
 /*
- * Dequeue the query, execute the callback, and free it.
+ * Execute the callback if res is given, and free the query.
  */
 static void _evsql_query_done (struct evsql_query *query, const struct evsql_result_info *res) {
-    // dequeue
-    TAILQ_REMOVE(&query->evsql->queue, query, entry);
-    
     if (res) 
         // call the callback
         query->cb_fn(res, query->cb_arg);
@@ -127,77 +90,263 @@
 }
 
 /*
- * A query has failed, notify the user and remove it.
- */
-static void _evsql_query_failure (struct evsql *evsql, struct evsql_query *query) {
-    struct evsql_result_info res; ZINIT(res);
-
-    // set up the result_info
-    res.evsql = evsql;
-    res.error = 1;
-
-    // finish it off
-    _evsql_query_done(query, &res);
-}
-
-/*
- * Clear every enqueued query and then free the evsql.
- *
- * If result_info is given, each query will also recieve it via their callback, and the error_fn will be called.
- */
+ * XXX:
+ * /
 static void _evsql_destroy (struct evsql *evsql, const struct evsql_result_info *res) {
     struct evsql_query *query;
     
     // clear the queue
-    while ((query = TAILQ_FIRST(&evsql->queue)) != NULL) {
+    while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) {
         _evsql_query_done(query, res);
         
-        TAILQ_REMOVE(&evsql->queue, query, entry);
+        TAILQ_REMOVE(&evsql->query_queue, query, entry);
     }
     
-    // do the error callback if required
-    if (res)
-        evsql->error_fn(evsql, evsql->cb_arg);
-    
     // free
     free(evsql);
 }
-
+*/
 
 /*
- * Sends the next query if there are more enqueued
+ * Free the transaction, it should already be deassociated from the query and conn.
  */
-static void _evsql_pump (struct evsql *evsql) {
-    struct evsql_query *query;
+static void _evsql_trans_free (struct evsql_trans *trans) {
+    // ensure we don't leak anything
+    assert(trans->query == NULL);
+    assert(trans->conn == NULL);
     
-    // look for the next query
-    if ((query = TAILQ_FIRST(&evsql->queue)) != NULL) {
-        // try and execute it
-        if (_evsql_query_exec(evsql, query, query->command)) {
-            // the query failed
-            _evsql_query_failure(evsql, query);
+    // free
+    free(trans);
+}
+
+/*
+ * Release a connection. It should already be deassociated from the trans and query.
+ *
+ * Releases the engine, removes from the conn_list and frees this.
+ */
+static void _evsql_conn_release (struct evsql_conn *conn) {
+    // ensure we don't leak anything
+    assert(conn->trans == NULL);
+    assert(conn->query == NULL);
+
+    // release the engine
+    switch (conn->evsql->type) {
+        case EVSQL_EVPQ:
+            evpq_release(conn->engine.evpq);
+            break;
+        
+        default:
+            FATAL("evsql->type");
+    }
+    
+    // remove from list
+    LIST_REMOVE(conn, entry);
+
+    // free
+    free(conn);
+}
+
+/*
+ * Fail a single query, this will trigger the callback and free it.
+ */
+static void _evsql_query_fail (struct evsql* evsql, struct evsql_query *query) {
+    struct evsql_result_info res; ZINIT(res);
+    
+    // set up the result_info
+    res.evsql = evsql;
+    res.error = 1;
+    
+    // finish off the query
+    _evsql_query_done(query, &res);
+}
+
+/*
+ * Fail a transaction, this will silently drop any query, trigger the error callback, two-way-deassociate/release the
+ * conn, and then free the trans.
+ */ 
+static void _evsql_trans_fail (struct evsql_trans *trans) {
+    if (trans->query) {
+        // free the query silently
+        _evsql_query_free(trans->query); trans->query = NULL;
+    }
+
+    // tell the user
+    // XXX: trans is in a bad state during this call
+    trans->error_fn(trans, trans->cb_arg);
+
+    // fail the conn
+    trans->conn->trans = NULL; _evsql_conn_release(trans->conn); trans->conn = NULL;
+
+    // free the trans
+    _evsql_trans_free(trans);
+}
+
+/*
+ * Fail a connection. If the connection is transactional, this will just call _evsql_trans_fail, but otherwise it will
+ * fail any ongoing query, and then release the connection.
+ */
+static void _evsql_conn_fail (struct evsql_conn *conn) {
+    if (conn->trans) {
+        // let transactions handle their connection failures
+        _evsql_trans_fail(conn->trans);
+
+    } else {
+        if (conn->query) {
+            // fail the in-progress query
+            _evsql_query_fail(conn->evsql, conn->query); conn->query = NULL;
         }
 
-        // free the command
-        free(query->command); query->command = NULL;
-
-        // ok, then we just wait
+        // finish off the whole connection
+        _evsql_conn_release(conn);
     }
 }
 
+/*
+ * Processes enqueued non-transactional queries until the queue is empty, or we managed to exec a query.
+ *
+ * If execing a query on a connection fails, both the query and the connection are failed (in that order).
+ *
+ * Any further queries will then also be failed, because there's no reconnection/retry logic yet.
+ */
+static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn) {
+    struct evsql_query *query;
+    int err;
+    
+    // look for waiting queries
+    while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) {
+        // dequeue
+        TAILQ_REMOVE(&evsql->query_queue, query, entry);
+        
+        if (conn) {
+            // try and execute it
+            err = _evsql_query_exec(conn, query, query->command);
+        }
 
-static void _evsql_evpq_connected (struct evpq_conn *conn, void *arg) {
-    struct evsql *evsql = arg;
+        // free the command buf
+        free(query->command); query->command = NULL;
 
-    // no state to update, just pump any waiting queries
-    _evsql_pump(evsql);
+        if (err || !conn) {
+            if (!conn) {
+                // warn when dropping queries
+                WARNING("failing query becuse there are no conns");
+            }
+
+            // fail the query
+            _evsql_query_fail(evsql, query);
+            
+            if (conn) {
+                // fail the connection
+                WARNING("failing the connection because a query-exec failed");
+
+                _evsql_conn_fail(conn); conn = NULL;
+            }
+
+        } else {
+            // we have succesfully enqueued a query, and we can wait for this connection to complete
+            break;
+
+        }
+
+        // handle the rest of the queue
+    }
+    
+    // ok
+    return;
 }
 
-static void _evsql_evpq_result (struct evpq_conn *conn, PGresult *result, void *arg) {
-    struct evsql *evsql = arg;
-    struct evsql_query *query;
+/*
+ * Callback for a trans's 'BEGIN' query, which means the transaction is now ready for use.
+ */
+static void _evsql_trans_ready (const struct evsql_result_info *res, void *arg) {
+    (void) arg;
 
-    assert((query = TAILQ_FIRST(&evsql->queue)) != NULL);
+    assert(res->trans);
+
+    // check for errors
+    if (res->error)
+        ERROR("transaction 'BEGIN' failed: %s", evsql_result_error(res));
+    
+    // transaction is now ready for use
+    res->trans->ready_fn(res->trans, res->trans->cb_arg);
+
+error:
+    _evsql_trans_fail(res->trans);
+}
+
+/*
+ * The transaction's connection is ready, send the 'BEGIN' query.
+ */
+static void _evsql_trans_conn_ready (struct evsql *evsql, struct evsql_trans *trans) {
+    char trans_sql[EVSQL_QUERY_BEGIN_BUF];
+    const char *isolation_level;
+    int ret;
+    
+    // determine the isolation_level to use
+    switch (trans->type) {
+        case EVSQL_TRANS_DEFAULT:
+            isolation_level = NULL; break;
+
+        case EVSQL_TRANS_SERIALIZABLE:
+            isolation_level = "SERIALIZABLE"; break;
+
+        case EVSQL_TRANS_REPEATABLE_READ:
+            isolation_level = "REPEATABLE READ"; break;
+
+        case EVSQL_TRANS_READ_COMMITTED:
+            isolation_level = "READ COMMITTED"; break;
+
+        case EVSQL_TRANS_READ_UNCOMMITTED:
+            isolation_level = "READ UNCOMMITTED"; break;
+
+        default:
+            FATAL("trans->type: %d", trans->type);
+    }
+    
+    // build the trans_sql
+    if (isolation_level)
+        ret = snprintf(trans_sql, EVSQL_QUERY_BEGIN_BUF, "BEGIN TRANSACTION ISOLATION LEVEL %s", isolation_level);
+    else
+        ret = snprintf(trans_sql, EVSQL_QUERY_BEGIN_BUF, "BEGIN TRANSACTION");
+    
+    // make sure it wasn't truncated
+    if (ret >= EVSQL_QUERY_BEGIN_BUF)
+        ERROR("trans_sql overflow: %d >= %d", ret, EVSQL_QUERY_BEGIN_BUF);
+    
+    // execute the query
+    if (evsql_query(evsql, trans, trans_sql, _evsql_trans_ready, NULL))
+        ERROR("evsql_query");
+    
+    // success
+    return;
+
+error:
+    // fail the transaction
+    _evsql_trans_fail(trans);
+}
+
+/*
+ * The evpq connection was succesfully established.
+ */ 
+static void _evsql_evpq_connected (struct evpq_conn *_conn, void *arg) {
+    struct evsql_conn *conn = arg;
+
+    if (conn->trans)
+        // notify the transaction
+        _evsql_trans_conn_ready(conn->evsql, conn->trans);
+    
+    else
+        // pump any waiting transactionless queries
+        _evsql_pump(conn->evsql, conn);
+}
+
+/*
+ * Got one result on this evpq connection.
+ */
+static void _evsql_evpq_result (struct evpq_conn *_conn, PGresult *result, void *arg) {
+    struct evsql_conn *conn = arg;
+    struct evsql_query *query = conn->query;
+
+    assert(query != NULL);
 
     // if we get multiple results, only return the first one
     if (query->result.evpq) {
@@ -210,15 +359,18 @@
     query->result.evpq = result;
 }
 
-static void _evsql_evpq_done (struct evpq_conn *conn, void *arg) {
-    struct evsql *evsql = arg;
-    struct evsql_query *query;
+/*
+ * No more results for this query.
+ */
+static void _evsql_evpq_done (struct evpq_conn *_conn, void *arg) {
+    struct evsql_conn *conn = arg;
+    struct evsql_query *query = conn->query;
     struct evsql_result_info res; ZINIT(res);
-
-    assert((query = TAILQ_FIRST(&evsql->queue)) != NULL);
+    
+    assert(query != NULL);
     
     // set up the result_info
-    res.evsql = evsql;
+    res.evsql = conn->evsql;
     
     if (query->result.evpq == NULL) {
         // if a query didn't return any results (bug?), warn and fail the query
@@ -237,27 +389,39 @@
 
     }
 
-    // finish it off
-    _evsql_query_done(query, &res);
+    // de-associate the query from the connection
+    conn->query = NULL;
+    
+    // how we handle query completion depends on if we're a transaction or not
+    if (conn->trans) {
+        // we can deassign the trans's query
+        conn->trans->query = NULL;
 
-    // pump the next one
-    _evsql_pump(evsql);
+        // then hand the query to the user
+        _evsql_query_done(query, &res);
+        
+    } else {
+        // a transactionless query, so just finish it off and pump any other waiting ones
+        _evsql_query_done(query, &res);
+
+        // pump the next one
+        _evsql_pump(conn->evsql, conn);
+    }
 }
 
-static void _evsql_evpq_failure (struct evpq_conn *conn, void *arg) {
-    struct evsql *evsql = arg;
-    struct evsql_result_info result; ZINIT(result);
-    
-    // OH SHI...
+/*
+ * The connection failed.
+ */
+static void _evsql_evpq_failure (struct evpq_conn *_conn, void *arg) {
+    struct evsql_conn *conn = arg;
     
-    // set up the result_info
-    result.evsql = evsql;
-    result.error = 1;
-
-    // finish off the whole connection
-    _evsql_destroy(evsql, &result);
+    // just fail the conn
+    _evsql_conn_fail(conn);
 }
 
+/*
+ * Our evpq behaviour
+ */
 static struct evpq_callback_info _evsql_evpq_cb_info = {
     .fn_connected       = _evsql_evpq_connected,
     .fn_result          = _evsql_evpq_result,
@@ -265,7 +429,10 @@
     .fn_failure         = _evsql_evpq_failure,
 };
 
-static struct evsql *_evsql_new_base (evsql_error_cb error_fn, void *cb_arg) {
+/*
+ * Allocate the generic evsql context.
+ */
+static struct evsql *_evsql_new_base (struct event_base *ev_base, evsql_error_cb error_fn, void *cb_arg) {
     struct evsql *evsql = NULL;
     
     // allocate it
@@ -273,11 +440,13 @@
         ERROR("calloc");
 
     // store
+    evsql->ev_base = ev_base;
     evsql->error_fn = error_fn;
     evsql->cb_arg = cb_arg;
 
     // init
-    TAILQ_INIT(&evsql->queue);
+    LIST_INIT(&evsql->conn_list);
+    TAILQ_INIT(&evsql->query_queue);
 
     // done
     return evsql;
@@ -286,15 +455,55 @@
     return NULL;
 }
 
+/*
+ * Start a new connection and add it to the list, it won't be ready until _evsql_evpq_connected is called
+ */
+static struct evsql_conn *_evsql_conn_new (struct evsql *evsql) {
+    struct evsql_conn *conn = NULL;
+    
+    // allocate
+    if ((conn = calloc(1, sizeof(*conn))) == NULL)
+        ERROR("calloc");
+
+    // init
+    conn->evsql = evsql;
+    
+    // connect the engine
+    switch (evsql->type) {
+        case EVSQL_EVPQ:
+            if ((conn->engine.evpq = evpq_connect(evsql->ev_base, evsql->engine_conf.evpq, _evsql_evpq_cb_info, conn)) == NULL)
+                goto error;
+            
+            break;
+            
+        default:
+            FATAL("evsql->type");
+    }
+
+    // add it to the list
+    LIST_INSERT_HEAD(&evsql->conn_list, conn, entry);
+
+    // success
+    return conn;
+
+error:
+    free(conn);
+
+    return NULL;
+}
+
 struct evsql *evsql_new_pq (struct event_base *ev_base, const char *pq_conninfo, evsql_error_cb error_fn, void *cb_arg) {
     struct evsql *evsql = NULL;
     
     // base init
-    if ((evsql = _evsql_new_base (error_fn, cb_arg)) == NULL)
+    if ((evsql = _evsql_new_base (ev_base, error_fn, cb_arg)) == NULL)
         goto error;
 
-    // connect the engine
-    if ((evsql->engine.evpq = evpq_connect(ev_base, pq_conninfo, _evsql_evpq_cb_info, evsql)) == NULL)
+    // store conf
+    evsql->engine_conf.evpq = pq_conninfo;
+
+    // pre-create one connection
+    if (_evsql_conn_new(evsql) == NULL)
         goto error;
 
     // done
@@ -308,50 +517,123 @@
 }
 
 /*
- * Checks what the state of the connection is in regards to executing a query.
+ * Checks if the connection is already allocated for some other trans/query.
  *
  * Returns:
- *      <0      connection failure, query not possible
- *      0       connection idle, can query immediately
- *      1       connection busy, must queue query
+ *      0       connection idle, can be allocated
+ *      >1      connection busy
  */
-static int _evsql_query_busy (struct evsql *evsql) {
-    switch (evsql->type) {
+static int _evsql_conn_busy (struct evsql_conn *conn) {
+    // transactions get the connection to themselves
+    if (conn->trans)
+        return 1;
+    
+    // if it has a query assigned, it's busy
+    if (conn->query)
+        return 1;
+
+    // otherwise, it's all idle
+    return 0;
+}
+
+/*
+ * Checks if the connection is ready for use (i.e. _evsql_evpq_connected was called).
+ *
+ * The connection should not already have a query running.
+ *
+ * Returns 
+ *  <0  the connection is not valid (failed, query in progress)
+ *  0   the connection is still pending, and will become ready at some point
+ *  >0  it's ready
+ */
+static int _evsql_conn_ready (struct evsql_conn *conn) {
+    switch (conn->evsql->type) {
         case EVSQL_EVPQ: {
-            enum evpq_state state = evpq_state(evsql->engine.evpq);
+            enum evpq_state state = evpq_state(conn->engine.evpq);
             
             switch (state) {
                 case EVPQ_CONNECT:
-                case EVPQ_QUERY:
-                    return 1;
+                    return 0;
                 
                 case EVPQ_CONNECTED:
-                    return 0;
+                    return 1;
 
+                case EVPQ_QUERY:
                 case EVPQ_INIT:
                 case EVPQ_FAILURE:
                     return -1;
                 
                 default:
-                    FATAL("evpq_state");
+                    FATAL("evpq_state: %d", state);
             }
 
         }
         
         default:
-            FATAL("evsql->type");
+            FATAL("evsql->type: %d", conn->evsql->type);
     }
 }
 
-static struct evsql_query *_evsql_query_new (struct evsql *evsql, evsql_query_cb query_fn, void *cb_arg) {
+/*
+ * Allocate a connection for use and return it via *conn_ptr, or if may_queue is nonzero and the connection pool is
+ * getting full, return NULL (query should be queued).
+ *
+ * Note that the returned connection might not be ready for use yet (if we created a new one, see _evsql_conn_ready).
+ *
+ * Returns zero if a connection was found or the request should be queued, or nonzero if something failed and the
+ * request should be dropped.
+ */
+static int _evsql_conn_get (struct evsql *evsql, struct evsql_conn **conn_ptr, int may_queue) {
+    *conn_ptr = NULL;
+    
+    // find a connection that isn't busy and is ready (unless the query queue is empty).
+    LIST_FOREACH(*conn_ptr, &evsql->conn_list, entry) {
+        // skip busy conns always
+        if (_evsql_conn_busy(*conn_ptr))
+            continue;
+        
+        // accept pending conns as long as there are NO enqueued queries (might cause deadlock otherwise)
+        if (_evsql_conn_ready(*conn_ptr) == 0 && TAILQ_EMPTY(&evsql->query_queue))
+            break;
+
+        // accept conns that are in a fully ready state
+        if (_evsql_conn_ready(*conn_ptr) > 0)
+            break;
+    }
+    
+    // if we found an idle connection, we can just return that right away
+    if (*conn_ptr)
+        return 0;
+
+    // return NULL if may_queue and the conn list is not empty
+    if (may_queue && !LIST_EMPTY(&evsql->conn_list))
+        return 0;
+    
+    // we need to open a new connection
+    if ((*conn_ptr = _evsql_conn_new(evsql)) == NULL)
+        goto error;
+
+    // good
+    return 0;
+error:
+    return -1;
+}
+
+/*
+ * Validate and allocate the basic stuff for a new query.
+ */
+static struct evsql_query *_evsql_query_new (struct evsql *evsql, struct evsql_trans *trans, evsql_query_cb query_fn, void *cb_arg) {
     struct evsql_query *query;
     
+    // if it's part of a trans, then make sure the trans is idle
+    if (trans && trans->query)
+        ERROR("transaction is busy");
+
     // allocate it
     if ((query = calloc(1, sizeof(*query))) == NULL)
         ERROR("calloc");
 
     // store
-    query->evsql = evsql;
     query->cb_fn = query_fn;
     query->cb_arg = cb_arg;
 
@@ -362,29 +644,48 @@
     return NULL;
 }
 
-static int _evsql_query_enqueue (struct evsql *evsql, struct evsql_query *query, const char *command) {
-    int busy;
-    
-    // check state
-    if ((busy = _evsql_query_busy(evsql)) < 0)
-        ERROR("connection is not valid");
-    
-    if (busy) {
-        // copy the command for later execution
-        if ((query->command = strdup(command)) == NULL)
-            ERROR("strdup");
+/*
+ * Handle a new query.
+ *
+ * For transactions this will associate the query and then execute it, otherwise this will either find an idle
+ * connection and send the query, or enqueue it.
+ */
+static int _evsql_query_enqueue (struct evsql *evsql, struct evsql_trans *trans, struct evsql_query *query, const char *command) {
+    // transaction queries are handled differently
+    if (trans) {
+        // it's an in-transaction query
+        assert(trans->query == NULL);
+        
+        // assign the query
+        trans->query = query;
+
+        // execute directly
+        if (_evsql_query_exec(trans->conn, query, command))
+            goto error;
 
     } else {
-        assert(TAILQ_EMPTY(&evsql->queue));
+        struct evsql_conn *conn;
+        
+        // find an idle connection
+        if ((_evsql_conn_get(evsql, &conn, 1)))
+            ERROR("couldn't allocate a connection for the query");
 
-        // execute directly
-        if (_evsql_query_exec(evsql, query, command))
-            goto error;
+        // we must enqueue if no idle conn or the conn is not yet ready
+        if (conn && _evsql_conn_ready(conn) > 0) {
+            // execute directly
+            if (_evsql_query_exec(conn, query, command))
+                goto error;
 
+
+        } else {
+            // copy the command for later execution
+            if ((query->command = strdup(command)) == NULL)
+                ERROR("strdup");
+            
+            // enqueue until some connection pumps the queue
+            TAILQ_INSERT_TAIL(&evsql->query_queue, query, entry);
+        }
     }
-    
-    // store it on the list
-    TAILQ_INSERT_TAIL(&evsql->queue, query, entry);
 
     // ok, good
     return 0;
@@ -393,15 +694,15 @@
     return -1;
 }
 
-struct evsql_query *evsql_query (struct evsql *evsql, const char *command, evsql_query_cb query_fn, void *cb_arg) {
+struct evsql_query *evsql_query (struct evsql *evsql, struct evsql_trans *trans, const char *command, evsql_query_cb query_fn, void *cb_arg) {
     struct evsql_query *query = NULL;
     
     // alloc new query
-    if ((query = _evsql_query_new(evsql, query_fn, cb_arg)) == NULL)
+    if ((query = _evsql_query_new(evsql, trans, query_fn, cb_arg)) == NULL)
         goto error;
     
     // just execute the command string directly
-    if (_evsql_query_enqueue(evsql, query, command))
+    if (_evsql_query_enqueue(evsql, trans, query, command))
         goto error;
 
     // ok
@@ -413,13 +714,13 @@
     return NULL;
 }
 
-struct evsql_query *evsql_query_params (struct evsql *evsql, const char *command, const struct evsql_query_params *params, evsql_query_cb query_fn, void *cb_arg) {
+struct evsql_query *evsql_query_params (struct evsql *evsql, struct evsql_trans *trans, const char *command, const struct evsql_query_params *params, evsql_query_cb query_fn, void *cb_arg) {
     struct evsql_query *query = NULL;
     const struct evsql_query_param *param;
     int idx;
     
     // alloc new query
-    if ((query = _evsql_query_new(evsql, query_fn, cb_arg)) == NULL)
+    if ((query = _evsql_query_new(evsql, trans, query_fn, cb_arg)) == NULL)
         goto error;
 
     // count the params
@@ -464,7 +765,7 @@
     }
 
     // execute it
-    if (_evsql_query_enqueue(evsql, query, command))
+    if (_evsql_query_enqueue(evsql, trans, query, command))
         goto error;
 
     // ok
@@ -476,166 +777,3 @@
     return NULL;
 }
 
-int evsql_param_string (struct evsql_query_params *params, size_t param, const char *ptr) {
-    struct evsql_query_param *p = &params->list[param];
-    
-    assert(p->type == EVSQL_PARAM_STRING);
-
-    p->data_raw = ptr;
-    p->length = 0;
-
-    return 0;
-}
-
-int evsql_param_uint32 (struct evsql_query_params *params, size_t param, uint32_t uval) {
-    struct evsql_query_param *p = &params->list[param];
-    
-    assert(p->type == EVSQL_PARAM_UINT32);
-
-    p->data.uint32 = htonl(uval);
-    p->data_raw = (const char *) &p->data.uint32;
-    p->length = sizeof(uval);
-
-    return 0;
-}
-
-const char *evsql_result_error (const struct evsql_result_info *res) {
-    if (!res->error)
-        return "No error";
-
-    switch (res->evsql->type) {
-        case EVSQL_EVPQ:
-            if (!res->result.pq)
-                return "unknown error (no result)";
-            
-            return PQresultErrorMessage(res->result.pq);
-
-        default:
-            FATAL("res->evsql->type");
-    }
-
-}
-
-size_t evsql_result_rows (const struct evsql_result_info *res) {
-    switch (res->evsql->type) {
-        case EVSQL_EVPQ:
-            return PQntuples(res->result.pq);
-
-        default:
-            FATAL("res->evsql->type");
-    }
-}
-
-size_t evsql_result_cols (const struct evsql_result_info *res) {
-    switch (res->evsql->type) {
-        case EVSQL_EVPQ:
-            return PQnfields(res->result.pq);
-
-        default:
-            FATAL("res->evsql->type");
-    }
-}
-
-int evsql_result_binary (const struct evsql_result_info *res, size_t row, size_t col, const char **ptr, size_t size, int nullok) {
-    *ptr = NULL;
-
-    switch (res->evsql->type) {
-        case EVSQL_EVPQ:
-            if (PQgetisnull(res->result.pq, row, col)) {
-                if (nullok)
-                    return 0;
-                else
-                    ERROR("[%zu:%zu] field is null", row, col);
-            }
-
-            if (PQfformat(res->result.pq, col) != 1)
-                ERROR("[%zu:%zu] PQfformat is not binary: %d", row, col, PQfformat(res->result.pq, col));
-    
-            if (size && PQgetlength(res->result.pq, row, col) != size)
-                ERROR("[%zu:%zu] field size mismatch: %zu -> %d", row, col, size, PQgetlength(res->result.pq, row, col));
-
-            *ptr = PQgetvalue(res->result.pq, row, col);
-
-            return 0;
-
-        default:
-            FATAL("res->evsql->type");
-    }
-
-error:
-    return -1;
-}
-
-int evsql_result_string (const struct evsql_result_info *res, size_t row, size_t col, const char **ptr, int nullok) {
-    return evsql_result_binary(res, row, col, ptr, 0, nullok);
-}
-
-int evsql_result_uint16 (const struct evsql_result_info *res, size_t row, size_t col, uint16_t *uval, int nullok) {
-    const char *data;
-    int16_t sval;
-
-    if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok))
-        goto error;
-
-    sval = ntohs(*((int16_t *) data));
-
-    if (sval < 0)
-        ERROR("negative value for unsigned: %d", sval);
-
-    *uval = sval;
-    
-    return 0;
-
-error:
-    return nullok ? 0 : -1;
-}
-
-int evsql_result_uint32 (const struct evsql_result_info *res, size_t row, size_t col, uint32_t *uval, int nullok) {
-    const char *data;
-    int32_t sval;
-
-    if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok))
-        goto error;
-
-    sval = ntohl(*(int32_t *) data);
-
-    if (sval < 0)
-        ERROR("negative value for unsigned: %d", sval);
-
-    *uval = sval;
-    
-    return 0;
-
-error:
-    return nullok ? 0 : -1;
-}
-
-int evsql_result_uint64 (const struct evsql_result_info *res, size_t row, size_t col, uint64_t *uval, int nullok) {
-    const char *data;
-    int64_t sval;
-
-    if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok))
-        goto error;
-
-    sval = ntohq(*(int64_t *) data);
-
-    if (sval < 0)
-        ERROR("negative value for unsigned: %ld", sval);
-
-    *uval = sval;
-    
-    return 0;
-
-error:
-    return nullok ? 0 : -1;
-}
-
-void evsql_result_free (const struct evsql_result_info *res) {
-    switch (res->evsql->type) {
-        case EVSQL_EVPQ:
-            return PQclear(res->result.pq);
-
-        default:
-            FATAL("res->evsql->type");
-    }
-}
--- a/src/evsql.h	Sun Oct 12 14:57:06 2008 +0300
+++ b/src/evsql.h	Sun Oct 12 20:10:47 2008 +0300
@@ -20,6 +20,22 @@
 struct evsql_query;
 
 /*
+ * Transaction handle
+ */
+struct evsql_trans;
+
+/*
+ * Transaction type
+ */
+enum evsql_trans_type {
+    EVSQL_TRANS_DEFAULT,
+    EVSQL_TRANS_SERIALIZABLE,
+    EVSQL_TRANS_REPEATABLE_READ,
+    EVSQL_TRANS_READ_COMMITTED,
+    EVSQL_TRANS_READ_UNCOMMITTED,
+};
+
+/*
  * Parameter type
  */
 enum evsql_param_format {
@@ -77,48 +93,79 @@
 /*
  * Result type
  */
-union evsql_result {
-    // libpq
-    PGresult *pq;
-};
-
 struct evsql_result_info {
     struct evsql *evsql;
+    struct evsql_trans *trans;
     
     int error;
 
-    union evsql_result result;
+    union evsql_result {
+        // libpq
+        PGresult *pq;
+    } result;
 };
 
 /*
- * Callback for handling query-level errors.
+ * Callback for handling query results.
  *
- * The query has completed, either succesfully or unsuccesfully (look at info.error).
- * info.result contains the result à la the evsql's type.
+ * The query has completed, either succesfully or unsuccesfully (nonzero .error).
+ *
+ * Use the evsql_result_* functions to manipulate the results.
  */
 typedef void (*evsql_query_cb)(const struct evsql_result_info *res, void *arg);
 
 /*
- * Callback for handling connection-level errors.
+ * Callback for handling global-level errors.
  *
- * The SQL context/connection suffered an error. It is not valid anymore, and may not be used.
+ * The evsql is not useable anymore.
+ *
+ * XXX: this is not actually called yet, no retry logic implemented.
  */
 typedef void (*evsql_error_cb)(struct evsql *evsql, void *arg);
 
 /*
+ * Callback for handling transaction-level errors.
+ *
+ * The transaction is not useable anymore.
+ */
+typedef void (*evsql_trans_error_cb)(struct evsql_trans *trans, void *arg);
+
+/*
+ * The transaction is ready for use.
+ */
+typedef void (*evsql_trans_ready_cb)(struct evsql_trans *trans, void *arg);
+
+/*
  * Create a new PostgreSQL/libpq(evpq) -based evsql using the given conninfo.
+ *
+ * The given conninfo must stay valid for the duration of the evsql's lifetime.
  */
 struct evsql *evsql_new_pq (struct event_base *ev_base, const char *pq_conninfo, evsql_error_cb error_fn, void *cb_arg);
 
 /*
- * Queue the given query for execution.
+ * Create a new transaction.
+ *
+ * Transactions are separate connections that provide transaction-isolation.
+ *
+ * Once the transaction is ready for use, ready_fn will be called. If the transaction fails, any pending query will be forgotten, and error_fn called.
  */
-struct evsql_query *evsql_query (struct evsql *evsql, const char *command, evsql_query_cb query_fn, void *cb_arg);
+struct evsql_trans *evsql_trans (struct evsql *evsql, enum evsql_trans_type type, evsql_trans_error_cb error_fn, evsql_trans_ready_cb ready_fn, void *cb_arg);
 
 /*
- * Same, but uses the SQL-level support for binding parameters.
+ * Queue the given query for execution.
+ *
+ * If trans is specified (not NULL), then the transaction must be idle, and the query will be executed in that
+ * transaction's context. Otherwise, the query will be executed without a transaction, andmay be executed immediately,
+ * or if other similar queries are running, it will be queued for later execution.
+ *
+ * Once the query is complete (got a result, got an error, the connection failed), then the query_cb will be triggered.
  */
-struct evsql_query *evsql_query_params (struct evsql *evsql, const char *command, const struct evsql_query_params *params, evsql_query_cb query_fn, void *cb_arg);
+struct evsql_query *evsql_query (struct evsql *evsql, struct evsql_trans *trans, const char *command, evsql_query_cb query_fn, void *cb_arg);
+
+/*
+ * Same as evsql_query, but uses the SQL-level support for binding parameters.
+ */
+struct evsql_query *evsql_query_params (struct evsql *evsql, struct evsql_trans *trans, const char *command, const struct evsql_query_params *params, evsql_query_cb query_fn, void *cb_arg);
 
 /*
  * Param-building functions
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/evsql_internal.h	Sun Oct 12 20:10:47 2008 +0300
@@ -0,0 +1,125 @@
+#ifndef EVSQL_INTERNAL_H
+#define EVSQL_INTERNAL_H
+
+#include <sys/queue.h>
+
+#include "evsql.h"
+
+/*
+ * The engine type
+ */
+enum evsql_type {
+    EVSQL_EVPQ,     // evpq
+};
+
+/*
+ * Contains the type, engine configuration, list of connections and waiting query queue.
+ */
+struct evsql {
+    // what event_base to use
+    struct event_base *ev_base;
+
+    // what engine we use
+    enum evsql_type type;
+
+    // callbacks
+    evsql_error_cb error_fn;
+    void *cb_arg;
+    
+    // engine-specific connection configuration
+    union {
+        const char *evpq;
+    } engine_conf;
+
+    // list of connections that are open
+    LIST_HEAD(evsql_conn_list, evsql_conn) conn_list;
+   
+    // list of queries running or waiting to run
+    TAILQ_HEAD(evsql_query_queue, evsql_query) query_queue;
+};
+
+/*
+ * A single connection to the server.
+ *
+ * Contains the engine connection, may have a transaction associated, and may have a query associated.
+ */
+struct evsql_conn {
+    // evsql we belong to
+    struct evsql *evsql;
+
+    // engine-specific connection info
+    union {
+        struct evpq_conn *evpq;
+    } engine;
+
+    // our position in the conn list
+    LIST_ENTRY(evsql_conn) entry;
+
+    // are we running a transaction?
+    struct evsql_trans *trans;
+
+    // are we running a transactionless query?
+    struct evsql_query *query;
+};
+
+/*
+ * A single transaction.
+ *
+ * Has a connection associated and possibly a query (which will also be associated with the connection)
+ */
+struct evsql_trans {
+    // our evsql_conn/evsql
+    //struct evsql *evsql;
+    struct evsql_conn *conn;
+    
+    // callbacks
+    evsql_trans_error_cb error_fn;
+    evsql_trans_ready_cb ready_fn;
+    void *cb_arg;
+
+    // the transaction type
+    enum evsql_trans_type type;
+
+    // our current query
+    struct evsql_query *query;
+
+};
+
+/*
+ * A single query.
+ *
+ * Has the info needed to exec the query (as these may be queued), and the callback/result info.
+ */
+struct evsql_query {
+    // the actual SQL query, this may or may not be ours, see _evsql_query_exec
+    char *command;
+    
+    // possible query params
+    struct evsql_query_param_info {
+        int count;
+
+        Oid *types;
+        const char **values;
+        int *lengths;
+        int *formats;
+
+        int result_format;
+    } params;
+
+    // our callback
+    evsql_query_cb cb_fn;
+    void *cb_arg;
+
+    // our position in the query list
+    TAILQ_ENTRY(evsql_query) entry;
+
+    // the result
+    union {
+        PGresult *evpq;
+    } result;
+};
+
+// maximum length for a 'BEGIN TRANSACTION ...' query
+#define EVSQL_QUERY_BEGIN_BUF 512
+
+#endif /* EVSQL_INTERNAL_H */
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/evsql_util.c	Sun Oct 12 20:10:47 2008 +0300
@@ -0,0 +1,170 @@
+#include <assert.h>
+
+#include "evsql.h"
+#include "evsql_internal.h"
+#include "lib/error.h"
+#include "lib/misc.h"
+
+int evsql_param_string (struct evsql_query_params *params, size_t param, const char *ptr) {
+    struct evsql_query_param *p = &params->list[param];
+    
+    assert(p->type == EVSQL_PARAM_STRING);
+
+    p->data_raw = ptr;
+    p->length = 0;
+
+    return 0;
+}
+
+int evsql_param_uint32 (struct evsql_query_params *params, size_t param, uint32_t uval) {
+    struct evsql_query_param *p = &params->list[param];
+    
+    assert(p->type == EVSQL_PARAM_UINT32);
+
+    p->data.uint32 = htonl(uval);
+    p->data_raw = (const char *) &p->data.uint32;
+    p->length = sizeof(uval);
+
+    return 0;
+}
+
+const char *evsql_result_error (const struct evsql_result_info *res) {
+    if (!res->error)
+        return "No error";
+
+    switch (res->evsql->type) {
+        case EVSQL_EVPQ:
+            if (!res->result.pq)
+                return "unknown error (no result)";
+            
+            return PQresultErrorMessage(res->result.pq);
+
+        default:
+            FATAL("res->evsql->type");
+    }
+
+}
+
+size_t evsql_result_rows (const struct evsql_result_info *res) {
+    switch (res->evsql->type) {
+        case EVSQL_EVPQ:
+            return PQntuples(res->result.pq);
+
+        default:
+            FATAL("res->evsql->type");
+    }
+}
+
+size_t evsql_result_cols (const struct evsql_result_info *res) {
+    switch (res->evsql->type) {
+        case EVSQL_EVPQ:
+            return PQnfields(res->result.pq);
+
+        default:
+            FATAL("res->evsql->type");
+    }
+}
+
+int evsql_result_binary (const struct evsql_result_info *res, size_t row, size_t col, const char **ptr, size_t size, int nullok) {
+    *ptr = NULL;
+
+    switch (res->evsql->type) {
+        case EVSQL_EVPQ:
+            if (PQgetisnull(res->result.pq, row, col)) {
+                if (nullok)
+                    return 0;
+                else
+                    ERROR("[%zu:%zu] field is null", row, col);
+            }
+
+            if (PQfformat(res->result.pq, col) != 1)
+                ERROR("[%zu:%zu] PQfformat is not binary: %d", row, col, PQfformat(res->result.pq, col));
+    
+            if (size && PQgetlength(res->result.pq, row, col) != size)
+                ERROR("[%zu:%zu] field size mismatch: %zu -> %d", row, col, size, PQgetlength(res->result.pq, row, col));
+
+            *ptr = PQgetvalue(res->result.pq, row, col);
+
+            return 0;
+
+        default:
+            FATAL("res->evsql->type");
+    }
+
+error:
+    return -1;
+}
+
+int evsql_result_string (const struct evsql_result_info *res, size_t row, size_t col, const char **ptr, int nullok) {
+    return evsql_result_binary(res, row, col, ptr, 0, nullok);
+}
+
+int evsql_result_uint16 (const struct evsql_result_info *res, size_t row, size_t col, uint16_t *uval, int nullok) {
+    const char *data;
+    int16_t sval;
+
+    if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok))
+        goto error;
+
+    sval = ntohs(*((int16_t *) data));
+
+    if (sval < 0)
+        ERROR("negative value for unsigned: %d", sval);
+
+    *uval = sval;
+    
+    return 0;
+
+error:
+    return nullok ? 0 : -1;
+}
+
+int evsql_result_uint32 (const struct evsql_result_info *res, size_t row, size_t col, uint32_t *uval, int nullok) {
+    const char *data;
+    int32_t sval;
+
+    if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok))
+        goto error;
+
+    sval = ntohl(*(int32_t *) data);
+
+    if (sval < 0)
+        ERROR("negative value for unsigned: %d", sval);
+
+    *uval = sval;
+    
+    return 0;
+
+error:
+    return nullok ? 0 : -1;
+}
+
+int evsql_result_uint64 (const struct evsql_result_info *res, size_t row, size_t col, uint64_t *uval, int nullok) {
+    const char *data;
+    int64_t sval;
+
+    if (evsql_result_binary(res, row, col, &data, sizeof(*uval), nullok))
+        goto error;
+
+    sval = ntohq(*(int64_t *) data);
+
+    if (sval < 0)
+        ERROR("negative value for unsigned: %ld", sval);
+
+    *uval = sval;
+    
+    return 0;
+
+error:
+    return nullok ? 0 : -1;
+}
+
+void evsql_result_free (const struct evsql_result_info *res) {
+    switch (res->evsql->type) {
+        case EVSQL_EVPQ:
+            return PQclear(res->result.pq);
+
+        default:
+            FATAL("res->evsql->type");
+    }
+}