--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/evpq.c Sat Oct 04 18:07:45 2008 +0300
@@ -0,0 +1,277 @@
+
+#include <event2/event.h>
+#include <assert.h>
+#include <stdlib.h>
+
+#include "evpq.h"
+#include "lib/error.h"
+
+struct evpq_conn {
+ struct event_base *ev_base;
+ struct evpq_callback_info user_cb;
+
+ PGconn *pg_conn;
+
+ struct event *ev;
+
+ enum evpq_state state;
+};
+
+/*
+ * This evpq_conn has experienced a GENERAL FAILURE.
+ */
+static void _evpq_failure (struct evpq_conn *conn) {
+ // update state
+ conn->state = EVPQ_FAILURE;
+
+ // notify
+ conn->user_cb.fn_failure(conn, conn->user_cb.cb_arg);
+}
+
+/*
+ * Initial connect was succesfull
+ */
+static void _evpq_connect_ok (struct evpq_conn *conn) {
+ // update state
+ conn->state = EVPQ_CONNECTED;
+
+ // notify
+ conn->user_cb.fn_connected(conn, conn->user_cb.cb_arg);
+}
+
+/*
+ * Initial connect failed
+ */
+static void _evpq_connect_fail (struct evpq_conn *conn) {
+ // just mark it as a generic failure
+ _evpq_failure(conn);
+}
+
+/*
+ * Receive a result and gives it to the user. If there was no more results, update state and tell the user.
+ *
+ * Returns zero if we got a result, 1 if there were/are no more results to handle.
+ */
+static int _evpq_query_result (struct evpq_conn *conn) {
+ PGresult *result;
+
+ // get the result
+ if ((result = PQgetResult(conn->pg_conn)) == NULL) {
+ // no more results, update state
+ conn->state = EVPQ_CONNECTED;
+
+ // tell the user the query is done
+ conn->user_cb.fn_done(conn, conn->user_cb.cb_arg);
+
+ // stop waiting for more results
+ return 1;
+
+ } else {
+ // got a result, give it to the user
+ conn->user_cb.fn_result(conn, result, conn->user_cb.cb_arg);
+
+ // great
+ return 0;
+ }
+}
+
+/*
+ * Schedule a new _evpq_event for this connection.
+ */
+static int _evpq_schedule (struct evpq_conn *conn, short what, void (*handler)(evutil_socket_t, short, void *)) {
+ assert(conn->pg_conn != NULL);
+
+ // ensure we have a valid socket, this should be the case after the PQstatus check...
+ if (PQsocket(conn->pg_conn) < 0)
+ FATAL("PQsocket gave invalid socket");
+
+ // reschedule with a new event
+ if (conn->ev) {
+ event_assign(conn->ev, conn->ev_base, PQsocket(conn->pg_conn), what, handler, conn);
+
+ } else {
+ if ((conn->ev = event_new(conn->ev_base, PQsocket(conn->pg_conn), what, handler, conn)) == NULL)
+ PERROR("event_new");
+
+ }
+
+ // add it
+ // XXX: timeouts?
+ if (event_add(conn->ev, NULL))
+ PERROR("event_add");
+
+ // success
+ return 0;
+
+error:
+ return -1;
+}
+
+/*
+ * Handle events on the PQ socket while connecting
+ */
+static void _evpq_connect_event (evutil_socket_t fd, short what, void *arg) {
+ struct evpq_conn *conn = arg;
+ PostgresPollingStatusType poll_status;
+
+ // this is only for connect events
+ assert(conn->state == EVPQ_CONNECT);
+
+ // XXX: timeouts?
+
+ // ask PQ what to do
+ switch ((poll_status = PQconnectPoll(conn->pg_conn))) {
+ case PGRES_POLLING_READING:
+ // poll for read
+ what = EV_READ;
+
+ // reschedule
+ break;
+
+ case PGRES_POLLING_WRITING:
+ // poll for write
+ what = EV_WRITE;
+
+ // reschedule
+ break;
+
+ case PGRES_POLLING_OK:
+ // connected
+ _evpq_connect_ok(conn);
+
+ // done
+ return;
+
+ case PGRES_POLLING_FAILED:
+ // faaaaail!
+ _evpq_connect_fail(conn);
+
+ // done
+ return;
+
+ default:
+ FATAL("PQconnectPoll gave a weird value: %d", poll_status);
+ }
+
+ // reschedule
+ if (_evpq_schedule(conn, what, _evpq_connect_event))
+ goto error;
+
+ // done, wait for the next event
+ return;
+
+error:
+ // XXX: reset?
+ _evpq_failure(conn);
+}
+
+static void _evpq_query_event (evutil_socket_t fd, short what, void *arg) {
+ struct evpq_conn *conn = arg;
+
+ // this is only for query events
+ assert(conn->state == EVPQ_QUERY);
+
+ // XXX: PQflush, timeouts
+ assert(what == EV_READ);
+
+ // we're going to assume that all queries will *require* data for their results
+ // this would break otherwise (PQconsumeInput might block?)
+ assert(PQisBusy(conn->pg_conn) != 0);
+
+ // handle input
+ if (PQconsumeInput(conn->pg_conn) == 0)
+ ERROR("PQconsumeInput: %s", PQerrorMessage(conn->pg_conn));
+
+ // handle results
+ while (PQisBusy(conn->pg_conn) == 0) {
+ // handle the result
+ if (_evpq_query_result(conn) == 1) {
+ // no need to wait for anything anymore
+ return;
+ }
+
+ // loop to handle the next result
+ }
+
+ // still need to wait for a result, so reschedule
+ if (_evpq_schedule(conn, EV_READ, _evpq_query_event))
+ goto error;
+
+ // done, wait for the next event
+ return;
+
+error:
+ // XXX: reset?
+ _evpq_failure(conn);
+
+}
+
+struct evpq_conn *evpq_connect (struct event_base *ev_base, const char *conninfo, const struct evpq_callback_info cb_info) {
+ struct evpq_conn *conn = NULL;
+
+ // alloc our context
+ if ((conn = calloc(1, sizeof(*conn))) == NULL)
+ ERROR("calloc");
+
+ // initial state
+ conn->ev_base = ev_base;
+ conn->user_cb = cb_info;
+ conn->state = EVPQ_INIT;
+
+ // create our PGconn
+ if ((conn->pg_conn = PQconnectStart(conninfo)) == NULL)
+ PERROR("PQconnectStart");
+
+ // check for immediate failure
+ if (PQstatus(conn->pg_conn) == CONNECTION_BAD)
+ ERROR("PQstatus indicates CONNECTION_BAD after PQconnectStart");
+
+ // assume PGRES_POLLING_WRITING
+ if (_evpq_schedule(conn, EV_WRITE, _evpq_connect_event))
+ goto error;
+
+ // connecting
+ conn->state = EVPQ_CONNECT;
+
+ // success, wait for the connection to be established
+ return conn;
+
+error:
+ if (conn) {
+ if (conn->pg_conn)
+ PQfinish(conn->pg_conn);
+
+ free(conn);
+ }
+
+ return NULL;
+}
+
+int evpq_query (struct evpq_conn *conn, const char *command) {
+ // check state
+ if (conn->state != EVPQ_CONNECTED)
+ ERROR("invalid evpq state: %d", conn->state);
+
+ // do the query
+ if (PQsendQuery(conn->pg_conn, command) == 0)
+ ERROR("PQsendQuery: %s", PQerrorMessage(conn->pg_conn));
+
+ // update state
+ conn->state = EVPQ_QUERY;
+
+ // XXX: PQflush
+
+ // poll for read
+ if (_evpq_schedule(conn, EV_READ, _evpq_query_event))
+ goto error;
+
+ // and then we wait
+ return 0;
+
+error:
+ return -1;
+}
+
+const PGconn *evpq_pgconn (struct evpq_conn *conn) {
+ return conn->pg_conn;
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/evpq.h Sat Oct 04 18:07:45 2008 +0300
@@ -0,0 +1,96 @@
+#ifndef EVPQ_H
+#define EVPQ_H
+
+/*
+ * Convenience functions for using libpq (the PostgreSQL client library) with libevent.
+ */
+
+#include <event2/event.h>
+#include <postgresql/libpq-fe.h>
+
+/*
+ * Our PGconn context wrapper.
+ */
+struct evpq_conn;
+
+/*
+ * Callback functions used
+ */
+struct evpq_callback_info {
+ /*
+ * This evpq_conn has connected succesfully \o/
+ */
+ void (*fn_connected)(struct evpq_conn *conn, void *arg);
+
+ /*
+ * Got a result.
+ */
+ void (*fn_result)(struct evpq_conn *conn, PGresult *result, void *arg);
+
+ /*
+ * No more results for the query
+ */
+ void (*fn_done)(struct evpq_conn *conn, void *arg);
+
+ /*
+ * Something caused this evpq_conn to fail :(
+ *
+ * XXX: add a `what` arg?
+ */
+ void (*fn_failure)(struct evpq_conn *conn, void *arg);
+
+ /*
+ * Arg to pass through to all callbacks.
+ */
+ void *cb_arg;
+};
+
+/*
+ * evpq_conn states
+ */
+enum evpq_state {
+ EVPQ_INIT,
+
+ EVPQ_CONNECT,
+ EVPQ_CONNECTED,
+
+ EVPQ_QUERY,
+
+ EVPQ_FAILURE,
+};
+
+/*
+ * Create a new evpq connection.
+ *
+ * This corresponds directly to PQconnectStart, and handles all the libevent setup/polling needed.
+ *
+ * The connection will initially be in the EVPQ_CONNECT state, and will then either callback via fn_connected
+ * (EVPQ_CONNECTED) or fn_failure (EVPQ_FAILURE).
+ *
+ * cb_info contains the callback functions (and the user argument) to use.
+ */
+struct evpq_conn *evpq_connect (struct event_base *ev_base, const char *conninfo, const struct evpq_callback_info cb_info);
+
+/*
+ * Execute a query.
+ *
+ * This corresponds directly to PQsendQuery. This evpq must be in the EVPQ_CONNECTED state, so you must wait after
+ * calling evpq_connect, and you may not run two queries at the same time.
+ *
+ * The query will result in a series of fn_result (EVPQ_RESULT) calls (if multiple queries in the query string),
+ * followed by a fn_done (EVPQ_CONNECTED).
+ */
+int evpq_query (struct evpq_conn *conn, const char *command);
+
+/*
+ * Get the actual PGconn.
+ *
+ * This can safely be used to access all of the normal PQ functions.
+ */
+const PGconn *evpq_pgconn (struct evpq_conn *conn);
+
+// convenience wrappers
+#define evpq_error_message(conn) PQerrorMessage(evpq_pgconn(conn))
+
+
+#endif /* EVPQ_H */
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/evpq_test.c Sat Oct 04 18:07:45 2008 +0300
@@ -0,0 +1,91 @@
+
+#include <stdio.h>
+
+#include "evpq.h"
+#include "lib/log.h"
+
+#define CONNINFO_DEFAULT "dbname=test"
+#define QUERY_DEFAULT "SELECT a, b FROM foo"
+
+void cb_connected (struct evpq_conn *conn, void *arg) {
+ INFO("[evpq_test] connected");
+
+ if (evpq_query(conn, QUERY_DEFAULT))
+ FATAL("evpq_query");
+}
+
+void cb_result (struct evpq_conn *conn, PGresult *result, void *arg) {
+
+ INFO("[evpq_test] result: %s", PQresStatus(PQresultStatus(result)));
+
+ // fatal error?
+ if (PQresultStatus(result) != PGRES_TUPLES_OK)
+ FATAL("error: %s", PQresultErrorMessage(result));
+
+ // dump it to stdout
+ PQprintOpt popt = {
+ .header = 1,
+ .align = 1,
+ .standard = 0,
+ .html3 = 0,
+ .expanded = 1,
+ .pager = 0,
+ .fieldSep = "|",
+ .tableOpt = NULL,
+ .caption = NULL,
+ .fieldName = NULL,
+ };
+
+ PQprint(stdout, result, &popt);
+
+ // don't care about the result anymore
+ PQclear(result);
+}
+
+void cb_done (struct evpq_conn *conn, void *arg) {
+ INFO("[evpq_test] done");
+}
+
+void cb_failure (struct evpq_conn *conn, void *arg) {
+ INFO("[evpq_test] failure");
+ INFO("\t%s", evpq_error_message(conn));
+
+ FATAL("exiting");
+}
+
+int main (int argc, char **argv) {
+ struct event_base *ev_base = NULL;
+ struct evpq_conn *conn = NULL;
+ const char *conninfo = CONNINFO_DEFAULT;
+
+ struct evpq_callback_info cb_info = {
+ .fn_connected = cb_connected,
+ .fn_result = cb_result,
+ .fn_done = cb_done,
+ .fn_failure = cb_failure,
+
+ .cb_arg = NULL,
+ };
+
+ // initialize libevent
+ if ((ev_base = event_base_new()) == NULL)
+ ERROR("event_base_new");
+
+ // establish the evpq connection
+ if ((conn = evpq_connect(ev_base, conninfo, cb_info)) == NULL)
+ ERROR("evpq_connect");
+
+ // run libevent
+ INFO("running libevent loop");
+
+ if (event_base_dispatch(ev_base))
+ ERROR("event_base_dispatch");
+
+ // clean shutdown
+
+error:
+ if (ev_base)
+ event_base_free(ev_base);
+}
+
+