# HG changeset patch # User Tero Marttila # Date 1223132865 -10800 # Node ID 7f159ee3a3ffae736980b0ef2d37ddcd4e0fc74c # Parent a4e382d4a22a022ce7c58e4f337c077084a3d186 working basic evpq lib and test diff -r a4e382d4a22a -r 7f159ee3a3ff src/evpq.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/evpq.c Sat Oct 04 18:07:45 2008 +0300 @@ -0,0 +1,277 @@ + +#include +#include +#include + +#include "evpq.h" +#include "lib/error.h" + +struct evpq_conn { + struct event_base *ev_base; + struct evpq_callback_info user_cb; + + PGconn *pg_conn; + + struct event *ev; + + enum evpq_state state; +}; + +/* + * This evpq_conn has experienced a GENERAL FAILURE. + */ +static void _evpq_failure (struct evpq_conn *conn) { + // update state + conn->state = EVPQ_FAILURE; + + // notify + conn->user_cb.fn_failure(conn, conn->user_cb.cb_arg); +} + +/* + * Initial connect was succesfull + */ +static void _evpq_connect_ok (struct evpq_conn *conn) { + // update state + conn->state = EVPQ_CONNECTED; + + // notify + conn->user_cb.fn_connected(conn, conn->user_cb.cb_arg); +} + +/* + * Initial connect failed + */ +static void _evpq_connect_fail (struct evpq_conn *conn) { + // just mark it as a generic failure + _evpq_failure(conn); +} + +/* + * Receive a result and gives it to the user. If there was no more results, update state and tell the user. + * + * Returns zero if we got a result, 1 if there were/are no more results to handle. + */ +static int _evpq_query_result (struct evpq_conn *conn) { + PGresult *result; + + // get the result + if ((result = PQgetResult(conn->pg_conn)) == NULL) { + // no more results, update state + conn->state = EVPQ_CONNECTED; + + // tell the user the query is done + conn->user_cb.fn_done(conn, conn->user_cb.cb_arg); + + // stop waiting for more results + return 1; + + } else { + // got a result, give it to the user + conn->user_cb.fn_result(conn, result, conn->user_cb.cb_arg); + + // great + return 0; + } +} + +/* + * Schedule a new _evpq_event for this connection. + */ +static int _evpq_schedule (struct evpq_conn *conn, short what, void (*handler)(evutil_socket_t, short, void *)) { + assert(conn->pg_conn != NULL); + + // ensure we have a valid socket, this should be the case after the PQstatus check... + if (PQsocket(conn->pg_conn) < 0) + FATAL("PQsocket gave invalid socket"); + + // reschedule with a new event + if (conn->ev) { + event_assign(conn->ev, conn->ev_base, PQsocket(conn->pg_conn), what, handler, conn); + + } else { + if ((conn->ev = event_new(conn->ev_base, PQsocket(conn->pg_conn), what, handler, conn)) == NULL) + PERROR("event_new"); + + } + + // add it + // XXX: timeouts? + if (event_add(conn->ev, NULL)) + PERROR("event_add"); + + // success + return 0; + +error: + return -1; +} + +/* + * Handle events on the PQ socket while connecting + */ +static void _evpq_connect_event (evutil_socket_t fd, short what, void *arg) { + struct evpq_conn *conn = arg; + PostgresPollingStatusType poll_status; + + // this is only for connect events + assert(conn->state == EVPQ_CONNECT); + + // XXX: timeouts? + + // ask PQ what to do + switch ((poll_status = PQconnectPoll(conn->pg_conn))) { + case PGRES_POLLING_READING: + // poll for read + what = EV_READ; + + // reschedule + break; + + case PGRES_POLLING_WRITING: + // poll for write + what = EV_WRITE; + + // reschedule + break; + + case PGRES_POLLING_OK: + // connected + _evpq_connect_ok(conn); + + // done + return; + + case PGRES_POLLING_FAILED: + // faaaaail! + _evpq_connect_fail(conn); + + // done + return; + + default: + FATAL("PQconnectPoll gave a weird value: %d", poll_status); + } + + // reschedule + if (_evpq_schedule(conn, what, _evpq_connect_event)) + goto error; + + // done, wait for the next event + return; + +error: + // XXX: reset? + _evpq_failure(conn); +} + +static void _evpq_query_event (evutil_socket_t fd, short what, void *arg) { + struct evpq_conn *conn = arg; + + // this is only for query events + assert(conn->state == EVPQ_QUERY); + + // XXX: PQflush, timeouts + assert(what == EV_READ); + + // we're going to assume that all queries will *require* data for their results + // this would break otherwise (PQconsumeInput might block?) + assert(PQisBusy(conn->pg_conn) != 0); + + // handle input + if (PQconsumeInput(conn->pg_conn) == 0) + ERROR("PQconsumeInput: %s", PQerrorMessage(conn->pg_conn)); + + // handle results + while (PQisBusy(conn->pg_conn) == 0) { + // handle the result + if (_evpq_query_result(conn) == 1) { + // no need to wait for anything anymore + return; + } + + // loop to handle the next result + } + + // still need to wait for a result, so reschedule + if (_evpq_schedule(conn, EV_READ, _evpq_query_event)) + goto error; + + // done, wait for the next event + return; + +error: + // XXX: reset? + _evpq_failure(conn); + +} + +struct evpq_conn *evpq_connect (struct event_base *ev_base, const char *conninfo, const struct evpq_callback_info cb_info) { + struct evpq_conn *conn = NULL; + + // alloc our context + if ((conn = calloc(1, sizeof(*conn))) == NULL) + ERROR("calloc"); + + // initial state + conn->ev_base = ev_base; + conn->user_cb = cb_info; + conn->state = EVPQ_INIT; + + // create our PGconn + if ((conn->pg_conn = PQconnectStart(conninfo)) == NULL) + PERROR("PQconnectStart"); + + // check for immediate failure + if (PQstatus(conn->pg_conn) == CONNECTION_BAD) + ERROR("PQstatus indicates CONNECTION_BAD after PQconnectStart"); + + // assume PGRES_POLLING_WRITING + if (_evpq_schedule(conn, EV_WRITE, _evpq_connect_event)) + goto error; + + // connecting + conn->state = EVPQ_CONNECT; + + // success, wait for the connection to be established + return conn; + +error: + if (conn) { + if (conn->pg_conn) + PQfinish(conn->pg_conn); + + free(conn); + } + + return NULL; +} + +int evpq_query (struct evpq_conn *conn, const char *command) { + // check state + if (conn->state != EVPQ_CONNECTED) + ERROR("invalid evpq state: %d", conn->state); + + // do the query + if (PQsendQuery(conn->pg_conn, command) == 0) + ERROR("PQsendQuery: %s", PQerrorMessage(conn->pg_conn)); + + // update state + conn->state = EVPQ_QUERY; + + // XXX: PQflush + + // poll for read + if (_evpq_schedule(conn, EV_READ, _evpq_query_event)) + goto error; + + // and then we wait + return 0; + +error: + return -1; +} + +const PGconn *evpq_pgconn (struct evpq_conn *conn) { + return conn->pg_conn; +} diff -r a4e382d4a22a -r 7f159ee3a3ff src/evpq.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/evpq.h Sat Oct 04 18:07:45 2008 +0300 @@ -0,0 +1,96 @@ +#ifndef EVPQ_H +#define EVPQ_H + +/* + * Convenience functions for using libpq (the PostgreSQL client library) with libevent. + */ + +#include +#include + +/* + * Our PGconn context wrapper. + */ +struct evpq_conn; + +/* + * Callback functions used + */ +struct evpq_callback_info { + /* + * This evpq_conn has connected succesfully \o/ + */ + void (*fn_connected)(struct evpq_conn *conn, void *arg); + + /* + * Got a result. + */ + void (*fn_result)(struct evpq_conn *conn, PGresult *result, void *arg); + + /* + * No more results for the query + */ + void (*fn_done)(struct evpq_conn *conn, void *arg); + + /* + * Something caused this evpq_conn to fail :( + * + * XXX: add a `what` arg? + */ + void (*fn_failure)(struct evpq_conn *conn, void *arg); + + /* + * Arg to pass through to all callbacks. + */ + void *cb_arg; +}; + +/* + * evpq_conn states + */ +enum evpq_state { + EVPQ_INIT, + + EVPQ_CONNECT, + EVPQ_CONNECTED, + + EVPQ_QUERY, + + EVPQ_FAILURE, +}; + +/* + * Create a new evpq connection. + * + * This corresponds directly to PQconnectStart, and handles all the libevent setup/polling needed. + * + * The connection will initially be in the EVPQ_CONNECT state, and will then either callback via fn_connected + * (EVPQ_CONNECTED) or fn_failure (EVPQ_FAILURE). + * + * cb_info contains the callback functions (and the user argument) to use. + */ +struct evpq_conn *evpq_connect (struct event_base *ev_base, const char *conninfo, const struct evpq_callback_info cb_info); + +/* + * Execute a query. + * + * This corresponds directly to PQsendQuery. This evpq must be in the EVPQ_CONNECTED state, so you must wait after + * calling evpq_connect, and you may not run two queries at the same time. + * + * The query will result in a series of fn_result (EVPQ_RESULT) calls (if multiple queries in the query string), + * followed by a fn_done (EVPQ_CONNECTED). + */ +int evpq_query (struct evpq_conn *conn, const char *command); + +/* + * Get the actual PGconn. + * + * This can safely be used to access all of the normal PQ functions. + */ +const PGconn *evpq_pgconn (struct evpq_conn *conn); + +// convenience wrappers +#define evpq_error_message(conn) PQerrorMessage(evpq_pgconn(conn)) + + +#endif /* EVPQ_H */ diff -r a4e382d4a22a -r 7f159ee3a3ff src/evpq_test.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/evpq_test.c Sat Oct 04 18:07:45 2008 +0300 @@ -0,0 +1,91 @@ + +#include + +#include "evpq.h" +#include "lib/log.h" + +#define CONNINFO_DEFAULT "dbname=test" +#define QUERY_DEFAULT "SELECT a, b FROM foo" + +void cb_connected (struct evpq_conn *conn, void *arg) { + INFO("[evpq_test] connected"); + + if (evpq_query(conn, QUERY_DEFAULT)) + FATAL("evpq_query"); +} + +void cb_result (struct evpq_conn *conn, PGresult *result, void *arg) { + + INFO("[evpq_test] result: %s", PQresStatus(PQresultStatus(result))); + + // fatal error? + if (PQresultStatus(result) != PGRES_TUPLES_OK) + FATAL("error: %s", PQresultErrorMessage(result)); + + // dump it to stdout + PQprintOpt popt = { + .header = 1, + .align = 1, + .standard = 0, + .html3 = 0, + .expanded = 1, + .pager = 0, + .fieldSep = "|", + .tableOpt = NULL, + .caption = NULL, + .fieldName = NULL, + }; + + PQprint(stdout, result, &popt); + + // don't care about the result anymore + PQclear(result); +} + +void cb_done (struct evpq_conn *conn, void *arg) { + INFO("[evpq_test] done"); +} + +void cb_failure (struct evpq_conn *conn, void *arg) { + INFO("[evpq_test] failure"); + INFO("\t%s", evpq_error_message(conn)); + + FATAL("exiting"); +} + +int main (int argc, char **argv) { + struct event_base *ev_base = NULL; + struct evpq_conn *conn = NULL; + const char *conninfo = CONNINFO_DEFAULT; + + struct evpq_callback_info cb_info = { + .fn_connected = cb_connected, + .fn_result = cb_result, + .fn_done = cb_done, + .fn_failure = cb_failure, + + .cb_arg = NULL, + }; + + // initialize libevent + if ((ev_base = event_base_new()) == NULL) + ERROR("event_base_new"); + + // establish the evpq connection + if ((conn = evpq_connect(ev_base, conninfo, cb_info)) == NULL) + ERROR("evpq_connect"); + + // run libevent + INFO("running libevent loop"); + + if (event_base_dispatch(ev_base)) + ERROR("event_base_dispatch"); + + // clean shutdown + +error: + if (ev_base) + event_base_free(ev_base); +} + +