working basic evpq lib and test
authorTero Marttila <terom@fixme.fi>
Sat, 04 Oct 2008 18:07:45 +0300
changeset 12 7f159ee3a3ff
parent 11 a4e382d4a22a
child 13 385b9a10d096
working basic evpq lib and test
src/evpq.c
src/evpq.h
src/evpq_test.c
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/evpq.c	Sat Oct 04 18:07:45 2008 +0300
@@ -0,0 +1,277 @@
+
+#include <event2/event.h>
+#include <assert.h>
+#include <stdlib.h>
+
+#include "evpq.h"
+#include "lib/error.h"
+
+struct evpq_conn {
+    struct event_base *ev_base;
+    struct evpq_callback_info user_cb;
+
+    PGconn *pg_conn;
+
+    struct event *ev;
+
+    enum evpq_state state;
+};
+
+/*
+ * This evpq_conn has experienced a GENERAL FAILURE.
+ */
+static void _evpq_failure (struct evpq_conn *conn) {
+    // update state
+    conn->state = EVPQ_FAILURE;
+
+    // notify
+    conn->user_cb.fn_failure(conn, conn->user_cb.cb_arg);
+}
+
+/*
+ * Initial connect was succesfull
+ */
+static void _evpq_connect_ok (struct evpq_conn *conn) {
+    // update state
+    conn->state = EVPQ_CONNECTED;
+
+    // notify
+    conn->user_cb.fn_connected(conn, conn->user_cb.cb_arg);
+}
+
+/*
+ * Initial connect failed
+ */
+static void _evpq_connect_fail (struct evpq_conn *conn) {
+    // just mark it as a generic failure
+    _evpq_failure(conn);
+}
+
+/*
+ * Receive a result and gives it to the user. If there was no more results, update state and tell the user.
+ *
+ * Returns zero if we got a result, 1 if there were/are no more results to handle.
+ */
+static int _evpq_query_result (struct evpq_conn *conn) {
+    PGresult *result;
+    
+    // get the result
+    if ((result = PQgetResult(conn->pg_conn)) == NULL) {
+        // no more results, update state
+        conn->state = EVPQ_CONNECTED;
+
+        // tell the user the query is done
+        conn->user_cb.fn_done(conn, conn->user_cb.cb_arg);
+
+        // stop waiting for more results
+        return 1;
+
+    } else {
+        // got a result, give it to the user
+        conn->user_cb.fn_result(conn, result, conn->user_cb.cb_arg);
+
+        // great
+        return 0;
+    }
+}
+
+/*
+ * Schedule a new _evpq_event for this connection.
+ */ 
+static int _evpq_schedule (struct evpq_conn *conn, short what, void (*handler)(evutil_socket_t, short, void *)) {
+    assert(conn->pg_conn != NULL);
+
+    // ensure we have a valid socket, this should be the case after the PQstatus check...
+    if (PQsocket(conn->pg_conn) < 0)
+        FATAL("PQsocket gave invalid socket");
+
+    // reschedule with a new event
+    if (conn->ev) {
+        event_assign(conn->ev, conn->ev_base, PQsocket(conn->pg_conn), what, handler, conn);
+
+    } else {
+        if ((conn->ev = event_new(conn->ev_base, PQsocket(conn->pg_conn), what, handler, conn)) == NULL)
+            PERROR("event_new");
+
+    }
+
+    // add it
+    // XXX: timeouts?
+    if (event_add(conn->ev, NULL))
+        PERROR("event_add");
+    
+    // success
+    return 0;
+
+error:
+    return -1;
+}
+
+/*
+ * Handle events on the PQ socket while connecting
+ */
+static void _evpq_connect_event (evutil_socket_t fd, short what, void *arg) {
+    struct evpq_conn *conn = arg;
+    PostgresPollingStatusType poll_status;
+    
+    // this is only for connect events
+    assert(conn->state == EVPQ_CONNECT);
+    
+    // XXX: timeouts?
+
+    // ask PQ what to do 
+    switch ((poll_status = PQconnectPoll(conn->pg_conn))) {
+        case PGRES_POLLING_READING:
+            // poll for read
+            what = EV_READ;
+            
+            // reschedule
+            break;
+
+        case PGRES_POLLING_WRITING:
+            // poll for write
+            what = EV_WRITE;
+            
+            // reschedule
+            break;
+
+        case PGRES_POLLING_OK:
+            // connected
+            _evpq_connect_ok(conn);
+            
+            // done
+            return;
+        
+        case PGRES_POLLING_FAILED:
+            // faaaaail!
+            _evpq_connect_fail(conn);
+
+            // done
+            return;
+        
+        default:
+            FATAL("PQconnectPoll gave a weird value: %d", poll_status);
+    }
+    
+    // reschedule
+    if (_evpq_schedule(conn, what, _evpq_connect_event))
+        goto error;
+
+    // done, wait for the next event
+    return;
+
+error:
+    // XXX: reset?
+    _evpq_failure(conn);
+}
+
+static void _evpq_query_event (evutil_socket_t fd, short what, void *arg) {
+    struct evpq_conn *conn = arg;
+    
+    // this is only for query events
+    assert(conn->state == EVPQ_QUERY);
+
+    // XXX: PQflush, timeouts
+    assert(what == EV_READ);
+
+    // we're going to assume that all queries will *require* data for their results
+    // this would break otherwise (PQconsumeInput might block?)
+    assert(PQisBusy(conn->pg_conn) != 0);
+
+    // handle input
+    if (PQconsumeInput(conn->pg_conn) == 0)
+        ERROR("PQconsumeInput: %s", PQerrorMessage(conn->pg_conn));
+    
+    // handle results
+    while (PQisBusy(conn->pg_conn) == 0) {
+        // handle the result
+        if (_evpq_query_result(conn) == 1) {
+            // no need to wait for anything anymore
+            return;
+        }
+
+        // loop to handle the next result
+    }
+
+    // still need to wait for a result, so reschedule
+    if (_evpq_schedule(conn, EV_READ, _evpq_query_event))
+        goto error;
+        
+    // done, wait for the next event
+    return;
+
+error:
+    // XXX: reset?
+    _evpq_failure(conn);
+
+}
+
+struct evpq_conn *evpq_connect (struct event_base *ev_base, const char *conninfo, const struct evpq_callback_info cb_info) {
+    struct evpq_conn *conn = NULL;
+    
+    // alloc our context
+    if ((conn = calloc(1, sizeof(*conn))) == NULL)
+        ERROR("calloc");
+    
+    // initial state
+    conn->ev_base = ev_base;
+    conn->user_cb = cb_info;
+    conn->state = EVPQ_INIT;
+
+    // create our PGconn
+    if ((conn->pg_conn = PQconnectStart(conninfo)) == NULL)
+        PERROR("PQconnectStart");
+    
+    // check for immediate failure
+    if (PQstatus(conn->pg_conn) == CONNECTION_BAD)
+        ERROR("PQstatus indicates CONNECTION_BAD after PQconnectStart");
+    
+    // assume PGRES_POLLING_WRITING
+    if (_evpq_schedule(conn, EV_WRITE, _evpq_connect_event))
+        goto error;
+
+    // connecting
+    conn->state = EVPQ_CONNECT;
+
+    // success, wait for the connection to be established
+    return conn;
+
+error:
+    if (conn) {
+        if (conn->pg_conn)
+            PQfinish(conn->pg_conn);
+        
+        free(conn);
+    }
+
+    return NULL;
+}
+
+int evpq_query (struct evpq_conn *conn, const char *command) {
+    // check state
+    if (conn->state != EVPQ_CONNECTED)
+        ERROR("invalid evpq state: %d", conn->state);
+    
+    // do the query
+    if (PQsendQuery(conn->pg_conn, command) == 0)
+        ERROR("PQsendQuery: %s", PQerrorMessage(conn->pg_conn));
+    
+    // update state
+    conn->state = EVPQ_QUERY;
+    
+    // XXX: PQflush
+
+    // poll for read
+    if (_evpq_schedule(conn, EV_READ, _evpq_query_event))
+        goto error;
+
+    // and then we wait
+    return 0;
+
+error:
+    return -1;
+}
+
+const PGconn *evpq_pgconn (struct evpq_conn *conn) {
+    return conn->pg_conn;
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/evpq.h	Sat Oct 04 18:07:45 2008 +0300
@@ -0,0 +1,96 @@
+#ifndef EVPQ_H
+#define EVPQ_H
+
+/*
+ * Convenience functions for using libpq (the PostgreSQL client library) with libevent.
+ */
+
+#include <event2/event.h>
+#include <postgresql/libpq-fe.h>
+
+/*
+ * Our PGconn context wrapper.
+ */
+struct evpq_conn;
+
+/*
+ * Callback functions used
+ */
+struct evpq_callback_info {
+    /*
+     * This evpq_conn has connected succesfully \o/
+     */
+    void (*fn_connected)(struct evpq_conn *conn, void *arg);
+
+    /*
+     * Got a result.
+     */
+    void (*fn_result)(struct evpq_conn *conn, PGresult *result, void *arg);
+
+    /*
+     * No more results for the query
+     */
+    void (*fn_done)(struct evpq_conn *conn, void *arg);
+    
+    /*
+     * Something caused this evpq_conn to fail :(
+     *
+     * XXX: add a `what` arg?
+     */
+    void (*fn_failure)(struct evpq_conn *conn, void *arg);
+
+    /*
+     * Arg to pass through to all callbacks.
+     */
+    void *cb_arg;
+};
+
+/*
+ * evpq_conn states
+ */
+enum evpq_state {
+    EVPQ_INIT,
+
+    EVPQ_CONNECT,
+    EVPQ_CONNECTED,
+
+    EVPQ_QUERY,
+
+    EVPQ_FAILURE,
+};
+
+/*
+ * Create a new evpq connection.
+ *
+ * This corresponds directly to PQconnectStart, and handles all the libevent setup/polling needed.
+ *
+ * The connection will initially be in the EVPQ_CONNECT state, and will then either callback via fn_connected
+ * (EVPQ_CONNECTED) or fn_failure (EVPQ_FAILURE).
+ *
+ * cb_info contains the callback functions (and the user argument) to use.
+ */
+struct evpq_conn *evpq_connect (struct event_base *ev_base, const char *conninfo, const struct evpq_callback_info cb_info);
+
+/*
+ * Execute a query.
+ *
+ * This corresponds directly to PQsendQuery. This evpq must be in the EVPQ_CONNECTED state, so you must wait after
+ * calling evpq_connect, and you may not run two queries at the same time.
+ *
+ * The query will result in a series of fn_result (EVPQ_RESULT) calls (if multiple queries in the query string),
+ * followed by a fn_done (EVPQ_CONNECTED).
+ */
+int evpq_query (struct evpq_conn *conn, const char *command);
+
+/*
+ * Get the actual PGconn.
+ *
+ * This can safely be used to access all of the normal PQ functions.
+ */
+const PGconn *evpq_pgconn (struct evpq_conn *conn);
+
+// convenience wrappers
+#define evpq_error_message(conn) PQerrorMessage(evpq_pgconn(conn))
+
+
+#endif /* EVPQ_H */
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/evpq_test.c	Sat Oct 04 18:07:45 2008 +0300
@@ -0,0 +1,91 @@
+
+#include <stdio.h>
+
+#include "evpq.h"
+#include "lib/log.h"
+
+#define CONNINFO_DEFAULT "dbname=test"
+#define QUERY_DEFAULT "SELECT a, b FROM foo"
+
+void cb_connected (struct evpq_conn *conn, void *arg) {
+    INFO("[evpq_test] connected");
+
+    if (evpq_query(conn, QUERY_DEFAULT))
+        FATAL("evpq_query");
+}
+
+void cb_result (struct evpq_conn *conn, PGresult *result, void *arg) {
+
+    INFO("[evpq_test] result: %s", PQresStatus(PQresultStatus(result)));
+
+    // fatal error?
+    if (PQresultStatus(result) != PGRES_TUPLES_OK)
+        FATAL("error: %s", PQresultErrorMessage(result));
+    
+    // dump it to stdout
+    PQprintOpt popt = {
+        .header     = 1,
+        .align      = 1,
+        .standard   = 0,
+        .html3      = 0,
+        .expanded   = 1,
+        .pager      = 0,
+        .fieldSep   = "|",
+        .tableOpt   = NULL,
+        .caption    = NULL,
+        .fieldName  = NULL,
+    };
+
+    PQprint(stdout, result, &popt);
+
+    // don't care about the result anymore
+    PQclear(result);
+}
+
+void cb_done (struct evpq_conn *conn, void *arg) {
+    INFO("[evpq_test] done");
+}
+
+void cb_failure (struct evpq_conn *conn, void *arg) {
+    INFO("[evpq_test] failure");
+    INFO("\t%s", evpq_error_message(conn));
+    
+    FATAL("exiting");
+}
+
+int main (int argc, char **argv) {
+    struct event_base *ev_base = NULL;
+    struct evpq_conn *conn = NULL;
+    const char *conninfo = CONNINFO_DEFAULT;
+
+    struct evpq_callback_info cb_info = {
+        .fn_connected = cb_connected,
+        .fn_result = cb_result,
+        .fn_done = cb_done,
+        .fn_failure = cb_failure,
+
+        .cb_arg = NULL,
+    };
+
+    // initialize libevent
+    if ((ev_base = event_base_new()) == NULL)
+        ERROR("event_base_new");
+
+    // establish the evpq connection
+    if ((conn = evpq_connect(ev_base, conninfo, cb_info)) == NULL)
+        ERROR("evpq_connect");
+
+    // run libevent
+    INFO("running libevent loop");
+
+    if (event_base_dispatch(ev_base))
+        ERROR("event_base_dispatch");
+    
+    // clean shutdown
+
+error:
+    if (ev_base)
+        event_base_free(ev_base);
+}
+
+