new evsql for queueing SQL queries
authorTero Marttila <terom@fixme.fi>
Sun, 12 Oct 2008 00:17:09 +0300
changeset 21 e5da1d428e3e
parent 20 f0ef6d8880b4
child 22 85ba190a9e68
new evsql for queueing SQL queries
src/evpq.c
src/evpq.h
src/evsql.c
src/evsql.h
--- a/src/evpq.c	Sat Oct 11 21:35:48 2008 +0300
+++ b/src/evpq.c	Sun Oct 12 00:17:09 2008 +0300
@@ -9,6 +9,7 @@
 struct evpq_conn {
     struct event_base *ev_base;
     struct evpq_callback_info user_cb;
+    void *user_cb_arg;
 
     PGconn *pg_conn;
 
@@ -25,7 +26,7 @@
     conn->state = EVPQ_FAILURE;
 
     // notify
-    conn->user_cb.fn_failure(conn, conn->user_cb.cb_arg);
+    conn->user_cb.fn_failure(conn, conn->user_cb_arg);
 }
 
 /*
@@ -36,7 +37,7 @@
     conn->state = EVPQ_CONNECTED;
 
     // notify
-    conn->user_cb.fn_connected(conn, conn->user_cb.cb_arg);
+    conn->user_cb.fn_connected(conn, conn->user_cb_arg);
 }
 
 /*
@@ -61,14 +62,14 @@
         conn->state = EVPQ_CONNECTED;
 
         // tell the user the query is done
-        conn->user_cb.fn_done(conn, conn->user_cb.cb_arg);
+        conn->user_cb.fn_done(conn, conn->user_c_arg);
 
         // stop waiting for more results
         return 1;
 
     } else {
         // got a result, give it to the user
-        conn->user_cb.fn_result(conn, result, conn->user_cb.cb_arg);
+        conn->user_cb.fn_result(conn, result, conn->user_cb_arg);
 
         // great
         return 0;
@@ -206,7 +207,7 @@
 
 }
 
-struct evpq_conn *evpq_connect (struct event_base *ev_base, const char *conninfo, const struct evpq_callback_info cb_info) {
+struct evpq_conn *evpq_connect (struct event_base *ev_base, const char *conninfo, const struct evpq_callback_info cb_info, void *cb_arg) {
     struct evpq_conn *conn = NULL;
     
     // alloc our context
@@ -216,6 +217,7 @@
     // initial state
     conn->ev_base = ev_base;
     conn->user_cb = cb_info;
+    conn->user_cb_arg = cb_arg;
     conn->state = EVPQ_INIT;
 
     // create our PGconn
@@ -272,6 +274,10 @@
     return -1;
 }
 
+enum evpq_state evpq_state (struct evpq_conn *conn) {
+    return conn->state;
+}
+
 const PGconn *evpq_pgconn (struct evpq_conn *conn) {
     return conn->pg_conn;
 }
--- a/src/evpq.h	Sat Oct 11 21:35:48 2008 +0300
+++ b/src/evpq.h	Sun Oct 12 00:17:09 2008 +0300
@@ -33,16 +33,13 @@
     void (*fn_done)(struct evpq_conn *conn, void *arg);
     
     /*
-     * Something caused this evpq_conn to fail :(
+     * The evpq_conn has suffered a complete failure.
+     *
+     * Most likely, this means that the connection to the server was lost, or not established at all.
      *
      * XXX: add a `what` arg?
      */
     void (*fn_failure)(struct evpq_conn *conn, void *arg);
-
-    /*
-     * Arg to pass through to all callbacks.
-     */
-    void *cb_arg;
 };
 
 /*
@@ -69,7 +66,7 @@
  *
  * cb_info contains the callback functions (and the user argument) to use.
  */
-struct evpq_conn *evpq_connect (struct event_base *ev_base, const char *conninfo, const struct evpq_callback_info cb_info);
+struct evpq_conn *evpq_connect (struct event_base *ev_base, const char *conninfo, const struct evpq_callback_info cb_info, void *cb_arg);
 
 /*
  * Execute a query.
@@ -83,6 +80,11 @@
 int evpq_query (struct evpq_conn *conn, const char *command);
 
 /*
+ * Connection state à la evpq.
+ */
+enum evpq_state evpq_state (struct evpq_conn *conn);
+
+/*
  * Get the actual PGconn.
  *
  * This can safely be used to access all of the normal PQ functions.
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/evsql.c	Sun Oct 12 00:17:09 2008 +0300
@@ -0,0 +1,344 @@
+#define _GNU_SOURCE
+#include <stdlib.h>
+#include <sys/queue.h>
+#include <assert.h>
+#include <string.h>
+
+#include "evsql.h"
+#include "evpq.h"
+#include "lib/log.h"
+#include "lib/error.h"
+#include "lib/misc.h"
+
+enum evsql_type {
+    EVSQL_EVPQ,
+};
+
+struct evsql {
+    // callbacks
+    evsql_error_cb error_fn;
+    void *cb_arg;
+
+    // backend engine
+    enum evsql_type type;
+
+    union {
+        struct evpq_conn *evpq;
+    } engine;
+    
+    // list of queries running or waiting to run
+    TAILQ_HEAD(evsql_queue, evsql_query) queue;
+};
+
+struct evsql_query {
+    // the evsql we are querying
+    struct evsql *evsql;
+
+    // the actual SQL query, this may or may not be ours, see _evsql_query_exec
+    char *command;
+
+    // our callback
+    evsql_query_cb cb_fn;
+    void *cb_arg;
+
+    // our position in the query list
+    TAILQ_ENTRY(evsql_query) entry;
+
+    // the result
+    union {
+        PGresult *evpq;
+    } result;
+};
+
+/*
+ * Actually execute the given query.
+ *
+ * The backend should be able to accept the query at this time.
+ *
+ * query->command must be valid during the execution of this function, but once it returns, the command is not needed
+ * anymore, and should be set to NULL.
+ */
+static int _evsql_query_exec (struct evsql *evsql, struct evsql_query *query, const char *command) {
+    switch (evsql->type) {
+        case EVSQL_EVPQ:
+            // just pass it through
+            return evpq_query(evsql->engine.evpq, command);
+        
+        default:
+            FATAL("evsql->type");
+    }
+}
+
+/*
+ * Dequeue the query, execute the callback, and free it.
+ */
+static void _evsql_query_done (struct evsql_query *query, const struct evsql_result_info *result_info) {
+    assert(query->command == NULL);
+
+    // dequeue
+    TAILQ_REMOVE(&query->evsql->queue, query, entry);
+    
+    if (result_info) 
+        // call the callback
+        query->cb_fn(*result_info, query->cb_arg);
+    
+    // free
+    free(query);
+}
+
+/*
+ * A query has failed, notify the user and remove it.
+ */
+static void _evsql_query_failure (struct evsql *evsql, struct evsql_query *query) {
+    struct evsql_result_info result; ZINIT(result);
+
+    // set up the result_info
+    result.evsql = evsql;
+    result.error = 1;
+
+    // finish it off
+    _evsql_query_done(query, &result);
+}
+
+/*
+ * Clear every enqueued query and then free the evsql.
+ *
+ * If result_info is given, each query will also recieve it via their callback, and the error_fn will be called.
+ */
+static void _evsql_destroy (struct evsql *evsql, const struct evsql_result_info *result_info) {
+    struct evsql_query *query;
+    
+    // clear the queue
+    while ((query = TAILQ_FIRST(&evsql->queue)) != NULL) {
+        _evsql_query_done(query, result_info);
+        
+        TAILQ_REMOVE(&evsql->queue, query, entry);
+    }
+    
+    // do the error callback if required
+    if (result_info)
+        evsql->error_fn(evsql, evsql->cb_arg);
+    
+    // free
+    free(evsql);
+}
+
+
+/*
+ * Sends the next query if there are more enqueued
+ */
+static void _evsql_pump (struct evsql *evsql) {
+    struct evsql_query *query;
+    
+    // look for the next query
+    if ((query = TAILQ_FIRST(&evsql->queue)) != NULL) {
+        // try and execute it
+        if (_evsql_query_exec(evsql, query, query->command)) {
+            // the query failed
+            _evsql_query_failure(evsql, query);
+        }
+
+        // free the command
+        free(query->command); query->command = NULL;
+
+        // ok, then we just wait
+    }
+}
+
+
+static void _evsql_evpq_connected (struct evpq_conn *conn, void *arg) {
+    struct evsql *evsql = arg;
+
+    // no state to update, just pump any waiting queries
+    _evsql_pump(evsql);
+}
+
+static void _evsql_evpq_result (struct evpq_conn *conn, PGresult *result, void *arg) {
+    struct evsql *evsql = arg;
+    struct evsql_query *query;
+
+    assert((query = TAILQ_FIRST(&evsql->queue)) != NULL);
+
+    // if we get multiple results, only return the first one
+    if (query->result.evpq) {
+        WARNING("[evsql] evpq query returned multiple results, discarding previous one");
+        
+        PQclear(query->result.evpq); query->result.evpq = NULL;
+    }
+    
+    // remember the result
+    query->result.evpq = result;
+}
+
+static void _evsql_evpq_done (struct evpq_conn *conn, void *arg) {
+    struct evsql *evsql = arg;
+    struct evsql_query *query;
+    struct evsql_result_info result; ZINIT(result);
+
+    assert((query = TAILQ_FIRST(&evsql->queue)) != NULL);
+    
+    // set up the result_info
+    result.evsql = evsql;
+    
+    if (query->result.evpq == NULL) {
+        // if a query didn't return any results (bug?), warn and fail the query
+        WARNING("[evsql] evpq query didn't return any results");
+
+        result.error = 1;
+
+    } else {
+        result.error = 0;
+        result.result.pq = query->result.evpq;
+
+    }
+
+    // finish it off
+    _evsql_query_done(query, &result);
+
+    // pump the next one
+    _evsql_pump(evsql);
+}
+
+static void _evsql_evpq_failure (struct evpq_conn *conn, void *arg) {
+    struct evsql *evsql = arg;
+    struct evsql_result_info result; ZINIT(result);
+    
+    // OH SHI...
+    
+    // set up the result_info
+    result.evsql = evsql;
+    result.error = 1;
+
+    // finish off the whole connection
+    _evsql_destroy(evsql, &result);
+}
+
+static struct evpq_callback_info _evsql_evpq_cb_info = {
+    .fn_connected       = _evsql_evpq_connected,
+    .fn_result          = _evsql_evpq_result,
+    .fn_done            = _evsql_evpq_done,
+    .fn_failure         = _evsql_evpq_failure,
+};
+
+static struct evsql *_evsql_new_base (evsql_error_cb error_fn, void *cb_arg) {
+    struct evsql *evsql = NULL;
+    
+    // allocate it
+    if ((evsql = calloc(1, sizeof(*evsql))) == NULL)
+        ERROR("calloc");
+
+    // store
+    evsql->error_fn = error_fn;
+    evsql->cb_arg = cb_arg;
+
+    // init
+    TAILQ_INIT(&evsql->queue);
+
+    // done
+    return evsql;
+
+error:
+    return NULL;
+}
+
+struct evsql *evsql_new_pq (struct event_base *ev_base, const char *pq_conninfo, evsql_error_cb error_fn, void *cb_arg) {
+    struct evsql *evsql = NULL;
+    
+    // base init
+    if ((evsql = _evsql_new_base (error_fn, cb_arg)) == NULL)
+        goto error;
+
+    // connect the engine
+    if ((evsql->engine.evpq = evpq_connect(ev_base, pq_conninfo, _evsql_evpq_cb_info, evsql)) == NULL)
+        goto error;
+
+    // done
+    return evsql;
+
+error:
+    // XXX: more complicated than this?
+    free(evsql); 
+
+    return NULL;
+}
+
+/*
+ * Checks what the state of the connection is in regards to executing a query.
+ *
+ * Returns:
+ *      <0      connection failure, query not possible
+ *      0       connection idle, can query immediately
+ *      1       connection busy, must queue query
+ */
+static int _evsql_query_idle (struct evsql *evsql) {
+    switch (evsql->type) {
+        case EVSQL_EVPQ: {
+            enum evpq_state state = evpq_state(evsql->engine.evpq);
+            
+            switch (state) {
+                case EVPQ_CONNECT:
+                case EVPQ_QUERY:
+                    return 1;
+                
+                case EVPQ_CONNECTED:
+                    return 0;
+
+                case EVPQ_INIT:
+                case EVPQ_FAILURE:
+                    return -1;
+                
+                default:
+                    FATAL("evpq_state");
+            }
+
+        }
+        
+        default:
+            FATAL("evsql->type");
+    }
+}
+
+
+struct evsql_query *evsql_query (struct evsql *evsql, const char *command, evsql_query_cb query_fn, void *cb_arg) {
+    struct evsql_query *query;
+    int idle;
+
+    // allocate it
+    if ((query = calloc(1, sizeof(*query))) == NULL)
+        ERROR("calloc");
+
+    // store
+    query->evsql = evsql;
+    query->cb_fn = query_fn;
+    query->cb_arg = cb_arg;
+    
+    // check state
+    if ((idle = _evsql_query_idle(evsql)) < 0)
+        ERROR("connection is not valid");
+    
+    if (idle) {
+        assert(TAILQ_EMPTY(&evsql->queue));
+
+        // execute directly
+        if (_evsql_query_exec(evsql, query, command))
+            goto error;
+
+    } else {
+        // copy the command for later execution
+        if ((query->command = strdup(command)) == NULL)
+            ERROR("strdup");
+    }
+    
+    // store it on the list
+    TAILQ_INSERT_TAIL(&evsql->queue, query, entry);
+    
+    // success
+    return query;
+
+error:
+    // do *NOT* free query->command, ever
+    free(query);
+
+    return NULL;
+}
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/evsql.h	Sun Oct 12 00:17:09 2008 +0300
@@ -0,0 +1,80 @@
+#ifndef EVSQL_H
+#define EVSQL_H
+
+/*
+ * An event-based (Postgre)SQL client API using libevent
+ */
+
+// XXX: libpq
+#include <postgresql/libpq-fe.h>
+#include <event2/event.h>
+
+/*
+ * The generic context handle
+ */
+struct evsql;
+
+/*
+ * A query handle
+ */
+struct evsql_query;
+
+/*
+ * Query parameter info
+ * /
+struct evsql_query_params {
+    int count;
+    const char * const *values;
+    const int *lengths;
+    const int *formats;
+    int result_format;
+}; */
+
+/*
+ * Result type
+ */
+struct evsql_result_info {
+    struct evsql *evsql;
+    
+    int error;
+
+    union {
+        // XXX: libpq
+        PGresult *pq;
+
+    } result;
+};
+
+/*
+ * Callback for handling query-level errors.
+ *
+ * The query has completed, either succesfully or unsuccesfully (look at info.error).
+ * info.result contains the result à la the evsql's type.
+ */
+typedef void (*evsql_query_cb)(struct evsql_result_info info, void *arg);
+
+/*
+ * Callback for handling connection-level errors.
+ *
+ * The SQL context/connection suffered an error. It is not valid anymore, and may not be used.
+ */
+typedef void (*evsql_error_cb)(struct evsql *evsql, void *arg);
+
+/*
+ * Create a new PostgreSQL/libpq(evpq) -based evsql using the given conninfo.
+ */
+struct evsql *evsql_new_pq (struct event_base *ev_base, const char *pq_conninfo, evsql_error_cb error_fn, void *cb_arg);
+
+/*
+ * Queue the given query for execution.
+ */
+struct evsql_query *evsql_query (struct evsql *evsql, const char *command, evsql_query_cb query_fn, void *cb_arg);
+
+/*
+ * Close a connection. Callbacks for waiting queries will not be run.
+ *
+ * XXX: not implemented yet.
+ */
+void evsql_close (struct evsql *evsql);
+
+#endif /* EVSQL_H */