--- a/src/evpq.c Sat Oct 11 21:35:48 2008 +0300
+++ b/src/evpq.c Sun Oct 12 00:17:09 2008 +0300
@@ -9,6 +9,7 @@
struct evpq_conn {
struct event_base *ev_base;
struct evpq_callback_info user_cb;
+ void *user_cb_arg;
PGconn *pg_conn;
@@ -25,7 +26,7 @@
conn->state = EVPQ_FAILURE;
// notify
- conn->user_cb.fn_failure(conn, conn->user_cb.cb_arg);
+ conn->user_cb.fn_failure(conn, conn->user_cb_arg);
}
/*
@@ -36,7 +37,7 @@
conn->state = EVPQ_CONNECTED;
// notify
- conn->user_cb.fn_connected(conn, conn->user_cb.cb_arg);
+ conn->user_cb.fn_connected(conn, conn->user_cb_arg);
}
/*
@@ -61,14 +62,14 @@
conn->state = EVPQ_CONNECTED;
// tell the user the query is done
- conn->user_cb.fn_done(conn, conn->user_cb.cb_arg);
+ conn->user_cb.fn_done(conn, conn->user_c_arg);
// stop waiting for more results
return 1;
} else {
// got a result, give it to the user
- conn->user_cb.fn_result(conn, result, conn->user_cb.cb_arg);
+ conn->user_cb.fn_result(conn, result, conn->user_cb_arg);
// great
return 0;
@@ -206,7 +207,7 @@
}
-struct evpq_conn *evpq_connect (struct event_base *ev_base, const char *conninfo, const struct evpq_callback_info cb_info) {
+struct evpq_conn *evpq_connect (struct event_base *ev_base, const char *conninfo, const struct evpq_callback_info cb_info, void *cb_arg) {
struct evpq_conn *conn = NULL;
// alloc our context
@@ -216,6 +217,7 @@
// initial state
conn->ev_base = ev_base;
conn->user_cb = cb_info;
+ conn->user_cb_arg = cb_arg;
conn->state = EVPQ_INIT;
// create our PGconn
@@ -272,6 +274,10 @@
return -1;
}
+enum evpq_state evpq_state (struct evpq_conn *conn) {
+ return conn->state;
+}
+
const PGconn *evpq_pgconn (struct evpq_conn *conn) {
return conn->pg_conn;
}
--- a/src/evpq.h Sat Oct 11 21:35:48 2008 +0300
+++ b/src/evpq.h Sun Oct 12 00:17:09 2008 +0300
@@ -33,16 +33,13 @@
void (*fn_done)(struct evpq_conn *conn, void *arg);
/*
- * Something caused this evpq_conn to fail :(
+ * The evpq_conn has suffered a complete failure.
+ *
+ * Most likely, this means that the connection to the server was lost, or not established at all.
*
* XXX: add a `what` arg?
*/
void (*fn_failure)(struct evpq_conn *conn, void *arg);
-
- /*
- * Arg to pass through to all callbacks.
- */
- void *cb_arg;
};
/*
@@ -69,7 +66,7 @@
*
* cb_info contains the callback functions (and the user argument) to use.
*/
-struct evpq_conn *evpq_connect (struct event_base *ev_base, const char *conninfo, const struct evpq_callback_info cb_info);
+struct evpq_conn *evpq_connect (struct event_base *ev_base, const char *conninfo, const struct evpq_callback_info cb_info, void *cb_arg);
/*
* Execute a query.
@@ -83,6 +80,11 @@
int evpq_query (struct evpq_conn *conn, const char *command);
/*
+ * Connection state à la evpq.
+ */
+enum evpq_state evpq_state (struct evpq_conn *conn);
+
+/*
* Get the actual PGconn.
*
* This can safely be used to access all of the normal PQ functions.
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/evsql.c Sun Oct 12 00:17:09 2008 +0300
@@ -0,0 +1,344 @@
+#define _GNU_SOURCE
+#include <stdlib.h>
+#include <sys/queue.h>
+#include <assert.h>
+#include <string.h>
+
+#include "evsql.h"
+#include "evpq.h"
+#include "lib/log.h"
+#include "lib/error.h"
+#include "lib/misc.h"
+
+enum evsql_type {
+ EVSQL_EVPQ,
+};
+
+struct evsql {
+ // callbacks
+ evsql_error_cb error_fn;
+ void *cb_arg;
+
+ // backend engine
+ enum evsql_type type;
+
+ union {
+ struct evpq_conn *evpq;
+ } engine;
+
+ // list of queries running or waiting to run
+ TAILQ_HEAD(evsql_queue, evsql_query) queue;
+};
+
+struct evsql_query {
+ // the evsql we are querying
+ struct evsql *evsql;
+
+ // the actual SQL query, this may or may not be ours, see _evsql_query_exec
+ char *command;
+
+ // our callback
+ evsql_query_cb cb_fn;
+ void *cb_arg;
+
+ // our position in the query list
+ TAILQ_ENTRY(evsql_query) entry;
+
+ // the result
+ union {
+ PGresult *evpq;
+ } result;
+};
+
+/*
+ * Actually execute the given query.
+ *
+ * The backend should be able to accept the query at this time.
+ *
+ * query->command must be valid during the execution of this function, but once it returns, the command is not needed
+ * anymore, and should be set to NULL.
+ */
+static int _evsql_query_exec (struct evsql *evsql, struct evsql_query *query, const char *command) {
+ switch (evsql->type) {
+ case EVSQL_EVPQ:
+ // just pass it through
+ return evpq_query(evsql->engine.evpq, command);
+
+ default:
+ FATAL("evsql->type");
+ }
+}
+
+/*
+ * Dequeue the query, execute the callback, and free it.
+ */
+static void _evsql_query_done (struct evsql_query *query, const struct evsql_result_info *result_info) {
+ assert(query->command == NULL);
+
+ // dequeue
+ TAILQ_REMOVE(&query->evsql->queue, query, entry);
+
+ if (result_info)
+ // call the callback
+ query->cb_fn(*result_info, query->cb_arg);
+
+ // free
+ free(query);
+}
+
+/*
+ * A query has failed, notify the user and remove it.
+ */
+static void _evsql_query_failure (struct evsql *evsql, struct evsql_query *query) {
+ struct evsql_result_info result; ZINIT(result);
+
+ // set up the result_info
+ result.evsql = evsql;
+ result.error = 1;
+
+ // finish it off
+ _evsql_query_done(query, &result);
+}
+
+/*
+ * Clear every enqueued query and then free the evsql.
+ *
+ * If result_info is given, each query will also recieve it via their callback, and the error_fn will be called.
+ */
+static void _evsql_destroy (struct evsql *evsql, const struct evsql_result_info *result_info) {
+ struct evsql_query *query;
+
+ // clear the queue
+ while ((query = TAILQ_FIRST(&evsql->queue)) != NULL) {
+ _evsql_query_done(query, result_info);
+
+ TAILQ_REMOVE(&evsql->queue, query, entry);
+ }
+
+ // do the error callback if required
+ if (result_info)
+ evsql->error_fn(evsql, evsql->cb_arg);
+
+ // free
+ free(evsql);
+}
+
+
+/*
+ * Sends the next query if there are more enqueued
+ */
+static void _evsql_pump (struct evsql *evsql) {
+ struct evsql_query *query;
+
+ // look for the next query
+ if ((query = TAILQ_FIRST(&evsql->queue)) != NULL) {
+ // try and execute it
+ if (_evsql_query_exec(evsql, query, query->command)) {
+ // the query failed
+ _evsql_query_failure(evsql, query);
+ }
+
+ // free the command
+ free(query->command); query->command = NULL;
+
+ // ok, then we just wait
+ }
+}
+
+
+static void _evsql_evpq_connected (struct evpq_conn *conn, void *arg) {
+ struct evsql *evsql = arg;
+
+ // no state to update, just pump any waiting queries
+ _evsql_pump(evsql);
+}
+
+static void _evsql_evpq_result (struct evpq_conn *conn, PGresult *result, void *arg) {
+ struct evsql *evsql = arg;
+ struct evsql_query *query;
+
+ assert((query = TAILQ_FIRST(&evsql->queue)) != NULL);
+
+ // if we get multiple results, only return the first one
+ if (query->result.evpq) {
+ WARNING("[evsql] evpq query returned multiple results, discarding previous one");
+
+ PQclear(query->result.evpq); query->result.evpq = NULL;
+ }
+
+ // remember the result
+ query->result.evpq = result;
+}
+
+static void _evsql_evpq_done (struct evpq_conn *conn, void *arg) {
+ struct evsql *evsql = arg;
+ struct evsql_query *query;
+ struct evsql_result_info result; ZINIT(result);
+
+ assert((query = TAILQ_FIRST(&evsql->queue)) != NULL);
+
+ // set up the result_info
+ result.evsql = evsql;
+
+ if (query->result.evpq == NULL) {
+ // if a query didn't return any results (bug?), warn and fail the query
+ WARNING("[evsql] evpq query didn't return any results");
+
+ result.error = 1;
+
+ } else {
+ result.error = 0;
+ result.result.pq = query->result.evpq;
+
+ }
+
+ // finish it off
+ _evsql_query_done(query, &result);
+
+ // pump the next one
+ _evsql_pump(evsql);
+}
+
+static void _evsql_evpq_failure (struct evpq_conn *conn, void *arg) {
+ struct evsql *evsql = arg;
+ struct evsql_result_info result; ZINIT(result);
+
+ // OH SHI...
+
+ // set up the result_info
+ result.evsql = evsql;
+ result.error = 1;
+
+ // finish off the whole connection
+ _evsql_destroy(evsql, &result);
+}
+
+static struct evpq_callback_info _evsql_evpq_cb_info = {
+ .fn_connected = _evsql_evpq_connected,
+ .fn_result = _evsql_evpq_result,
+ .fn_done = _evsql_evpq_done,
+ .fn_failure = _evsql_evpq_failure,
+};
+
+static struct evsql *_evsql_new_base (evsql_error_cb error_fn, void *cb_arg) {
+ struct evsql *evsql = NULL;
+
+ // allocate it
+ if ((evsql = calloc(1, sizeof(*evsql))) == NULL)
+ ERROR("calloc");
+
+ // store
+ evsql->error_fn = error_fn;
+ evsql->cb_arg = cb_arg;
+
+ // init
+ TAILQ_INIT(&evsql->queue);
+
+ // done
+ return evsql;
+
+error:
+ return NULL;
+}
+
+struct evsql *evsql_new_pq (struct event_base *ev_base, const char *pq_conninfo, evsql_error_cb error_fn, void *cb_arg) {
+ struct evsql *evsql = NULL;
+
+ // base init
+ if ((evsql = _evsql_new_base (error_fn, cb_arg)) == NULL)
+ goto error;
+
+ // connect the engine
+ if ((evsql->engine.evpq = evpq_connect(ev_base, pq_conninfo, _evsql_evpq_cb_info, evsql)) == NULL)
+ goto error;
+
+ // done
+ return evsql;
+
+error:
+ // XXX: more complicated than this?
+ free(evsql);
+
+ return NULL;
+}
+
+/*
+ * Checks what the state of the connection is in regards to executing a query.
+ *
+ * Returns:
+ * <0 connection failure, query not possible
+ * 0 connection idle, can query immediately
+ * 1 connection busy, must queue query
+ */
+static int _evsql_query_idle (struct evsql *evsql) {
+ switch (evsql->type) {
+ case EVSQL_EVPQ: {
+ enum evpq_state state = evpq_state(evsql->engine.evpq);
+
+ switch (state) {
+ case EVPQ_CONNECT:
+ case EVPQ_QUERY:
+ return 1;
+
+ case EVPQ_CONNECTED:
+ return 0;
+
+ case EVPQ_INIT:
+ case EVPQ_FAILURE:
+ return -1;
+
+ default:
+ FATAL("evpq_state");
+ }
+
+ }
+
+ default:
+ FATAL("evsql->type");
+ }
+}
+
+
+struct evsql_query *evsql_query (struct evsql *evsql, const char *command, evsql_query_cb query_fn, void *cb_arg) {
+ struct evsql_query *query;
+ int idle;
+
+ // allocate it
+ if ((query = calloc(1, sizeof(*query))) == NULL)
+ ERROR("calloc");
+
+ // store
+ query->evsql = evsql;
+ query->cb_fn = query_fn;
+ query->cb_arg = cb_arg;
+
+ // check state
+ if ((idle = _evsql_query_idle(evsql)) < 0)
+ ERROR("connection is not valid");
+
+ if (idle) {
+ assert(TAILQ_EMPTY(&evsql->queue));
+
+ // execute directly
+ if (_evsql_query_exec(evsql, query, command))
+ goto error;
+
+ } else {
+ // copy the command for later execution
+ if ((query->command = strdup(command)) == NULL)
+ ERROR("strdup");
+ }
+
+ // store it on the list
+ TAILQ_INSERT_TAIL(&evsql->queue, query, entry);
+
+ // success
+ return query;
+
+error:
+ // do *NOT* free query->command, ever
+ free(query);
+
+ return NULL;
+}
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/evsql.h Sun Oct 12 00:17:09 2008 +0300
@@ -0,0 +1,80 @@
+#ifndef EVSQL_H
+#define EVSQL_H
+
+/*
+ * An event-based (Postgre)SQL client API using libevent
+ */
+
+// XXX: libpq
+#include <postgresql/libpq-fe.h>
+#include <event2/event.h>
+
+/*
+ * The generic context handle
+ */
+struct evsql;
+
+/*
+ * A query handle
+ */
+struct evsql_query;
+
+/*
+ * Query parameter info
+ * /
+struct evsql_query_params {
+ int count;
+ const char * const *values;
+ const int *lengths;
+ const int *formats;
+ int result_format;
+}; */
+
+/*
+ * Result type
+ */
+struct evsql_result_info {
+ struct evsql *evsql;
+
+ int error;
+
+ union {
+ // XXX: libpq
+ PGresult *pq;
+
+ } result;
+};
+
+/*
+ * Callback for handling query-level errors.
+ *
+ * The query has completed, either succesfully or unsuccesfully (look at info.error).
+ * info.result contains the result à la the evsql's type.
+ */
+typedef void (*evsql_query_cb)(struct evsql_result_info info, void *arg);
+
+/*
+ * Callback for handling connection-level errors.
+ *
+ * The SQL context/connection suffered an error. It is not valid anymore, and may not be used.
+ */
+typedef void (*evsql_error_cb)(struct evsql *evsql, void *arg);
+
+/*
+ * Create a new PostgreSQL/libpq(evpq) -based evsql using the given conninfo.
+ */
+struct evsql *evsql_new_pq (struct event_base *ev_base, const char *pq_conninfo, evsql_error_cb error_fn, void *cb_arg);
+
+/*
+ * Queue the given query for execution.
+ */
+struct evsql_query *evsql_query (struct evsql *evsql, const char *command, evsql_query_cb query_fn, void *cb_arg);
+
+/*
+ * Close a connection. Callbacks for waiting queries will not be run.
+ *
+ * XXX: not implemented yet.
+ */
+void evsql_close (struct evsql *evsql);
+
+#endif /* EVSQL_H */