terom@12: terom@12: #include terom@12: #include terom@12: #include terom@12: terom@12: #include "evpq.h" terom@12: #include "lib/error.h" terom@12: terom@12: struct evpq_conn { terom@12: struct event_base *ev_base; terom@12: struct evpq_callback_info user_cb; terom@21: void *user_cb_arg; terom@12: terom@12: PGconn *pg_conn; terom@12: terom@12: struct event *ev; terom@12: terom@12: enum evpq_state state; terom@12: }; terom@12: terom@12: /* terom@12: * This evpq_conn has experienced a GENERAL FAILURE. terom@12: */ terom@12: static void _evpq_failure (struct evpq_conn *conn) { terom@12: // update state terom@12: conn->state = EVPQ_FAILURE; terom@12: terom@12: // notify terom@21: conn->user_cb.fn_failure(conn, conn->user_cb_arg); terom@12: } terom@12: terom@12: /* terom@12: * Initial connect was succesfull terom@12: */ terom@12: static void _evpq_connect_ok (struct evpq_conn *conn) { terom@12: // update state terom@12: conn->state = EVPQ_CONNECTED; terom@12: terom@12: // notify terom@21: conn->user_cb.fn_connected(conn, conn->user_cb_arg); terom@12: } terom@12: terom@12: /* terom@12: * Initial connect failed terom@12: */ terom@12: static void _evpq_connect_fail (struct evpq_conn *conn) { terom@12: // just mark it as a generic failure terom@12: _evpq_failure(conn); terom@12: } terom@12: terom@12: /* terom@12: * Receive a result and gives it to the user. If there was no more results, update state and tell the user. terom@12: * terom@12: * Returns zero if we got a result, 1 if there were/are no more results to handle. terom@12: */ terom@12: static int _evpq_query_result (struct evpq_conn *conn) { terom@12: PGresult *result; terom@12: terom@12: // get the result terom@12: if ((result = PQgetResult(conn->pg_conn)) == NULL) { terom@12: // no more results, update state terom@12: conn->state = EVPQ_CONNECTED; terom@12: terom@12: // tell the user the query is done terom@22: conn->user_cb.fn_done(conn, conn->user_cb_arg); terom@12: terom@12: // stop waiting for more results terom@12: return 1; terom@12: terom@12: } else { terom@12: // got a result, give it to the user terom@21: conn->user_cb.fn_result(conn, result, conn->user_cb_arg); terom@12: terom@12: // great terom@12: return 0; terom@12: } terom@12: } terom@12: terom@12: /* terom@12: * Schedule a new _evpq_event for this connection. terom@12: */ terom@12: static int _evpq_schedule (struct evpq_conn *conn, short what, void (*handler)(evutil_socket_t, short, void *)) { terom@12: assert(conn->pg_conn != NULL); terom@12: terom@12: // ensure we have a valid socket, this should be the case after the PQstatus check... terom@12: if (PQsocket(conn->pg_conn) < 0) terom@12: FATAL("PQsocket gave invalid socket"); terom@12: terom@12: // reschedule with a new event terom@12: if (conn->ev) { terom@12: event_assign(conn->ev, conn->ev_base, PQsocket(conn->pg_conn), what, handler, conn); terom@12: terom@12: } else { terom@12: if ((conn->ev = event_new(conn->ev_base, PQsocket(conn->pg_conn), what, handler, conn)) == NULL) terom@12: PERROR("event_new"); terom@12: terom@12: } terom@12: terom@12: // add it terom@12: // XXX: timeouts? terom@12: if (event_add(conn->ev, NULL)) terom@12: PERROR("event_add"); terom@12: terom@12: // success terom@12: return 0; terom@12: terom@12: error: terom@12: return -1; terom@12: } terom@12: terom@12: /* terom@12: * Handle events on the PQ socket while connecting terom@12: */ terom@12: static void _evpq_connect_event (evutil_socket_t fd, short what, void *arg) { terom@12: struct evpq_conn *conn = arg; terom@12: PostgresPollingStatusType poll_status; terom@12: terom@12: // this is only for connect events terom@12: assert(conn->state == EVPQ_CONNECT); terom@12: terom@12: // XXX: timeouts? terom@12: terom@12: // ask PQ what to do terom@12: switch ((poll_status = PQconnectPoll(conn->pg_conn))) { terom@12: case PGRES_POLLING_READING: terom@12: // poll for read terom@12: what = EV_READ; terom@12: terom@12: // reschedule terom@12: break; terom@12: terom@12: case PGRES_POLLING_WRITING: terom@12: // poll for write terom@12: what = EV_WRITE; terom@12: terom@12: // reschedule terom@12: break; terom@12: terom@12: case PGRES_POLLING_OK: terom@12: // connected terom@12: _evpq_connect_ok(conn); terom@12: terom@12: // done terom@12: return; terom@12: terom@12: case PGRES_POLLING_FAILED: terom@12: // faaaaail! terom@12: _evpq_connect_fail(conn); terom@12: terom@12: // done terom@12: return; terom@12: terom@12: default: terom@12: FATAL("PQconnectPoll gave a weird value: %d", poll_status); terom@12: } terom@12: terom@12: // reschedule terom@12: if (_evpq_schedule(conn, what, _evpq_connect_event)) terom@12: goto error; terom@12: terom@12: // done, wait for the next event terom@12: return; terom@12: terom@12: error: terom@12: // XXX: reset? terom@12: _evpq_failure(conn); terom@12: } terom@12: terom@12: static void _evpq_query_event (evutil_socket_t fd, short what, void *arg) { terom@12: struct evpq_conn *conn = arg; terom@12: terom@12: // this is only for query events terom@12: assert(conn->state == EVPQ_QUERY); terom@12: terom@12: // XXX: PQflush, timeouts terom@12: assert(what == EV_READ); terom@12: terom@12: // we're going to assume that all queries will *require* data for their results terom@12: // this would break otherwise (PQconsumeInput might block?) terom@12: assert(PQisBusy(conn->pg_conn) != 0); terom@12: terom@12: // handle input terom@12: if (PQconsumeInput(conn->pg_conn) == 0) terom@12: ERROR("PQconsumeInput: %s", PQerrorMessage(conn->pg_conn)); terom@12: terom@12: // handle results terom@12: while (PQisBusy(conn->pg_conn) == 0) { terom@12: // handle the result terom@12: if (_evpq_query_result(conn) == 1) { terom@12: // no need to wait for anything anymore terom@12: return; terom@12: } terom@12: terom@12: // loop to handle the next result terom@12: } terom@12: terom@12: // still need to wait for a result, so reschedule terom@12: if (_evpq_schedule(conn, EV_READ, _evpq_query_event)) terom@12: goto error; terom@12: terom@12: // done, wait for the next event terom@12: return; terom@12: terom@12: error: terom@12: // XXX: reset? terom@12: _evpq_failure(conn); terom@12: terom@12: } terom@12: terom@21: struct evpq_conn *evpq_connect (struct event_base *ev_base, const char *conninfo, const struct evpq_callback_info cb_info, void *cb_arg) { terom@12: struct evpq_conn *conn = NULL; terom@12: terom@12: // alloc our context terom@12: if ((conn = calloc(1, sizeof(*conn))) == NULL) terom@12: ERROR("calloc"); terom@12: terom@12: // initial state terom@12: conn->ev_base = ev_base; terom@12: conn->user_cb = cb_info; terom@21: conn->user_cb_arg = cb_arg; terom@12: conn->state = EVPQ_INIT; terom@12: terom@12: // create our PGconn terom@12: if ((conn->pg_conn = PQconnectStart(conninfo)) == NULL) terom@12: PERROR("PQconnectStart"); terom@12: terom@12: // check for immediate failure terom@12: if (PQstatus(conn->pg_conn) == CONNECTION_BAD) terom@12: ERROR("PQstatus indicates CONNECTION_BAD after PQconnectStart"); terom@12: terom@12: // assume PGRES_POLLING_WRITING terom@12: if (_evpq_schedule(conn, EV_WRITE, _evpq_connect_event)) terom@12: goto error; terom@12: terom@12: // connecting terom@12: conn->state = EVPQ_CONNECT; terom@12: terom@12: // success, wait for the connection to be established terom@12: return conn; terom@12: terom@12: error: terom@25: if (conn) terom@25: evpq_release(conn); terom@12: terom@12: return NULL; terom@12: } terom@12: terom@23: static int _evpq_check_query (struct evpq_conn *conn) { terom@23: // just check the state terom@12: if (conn->state != EVPQ_CONNECTED) terom@12: ERROR("invalid evpq state: %d", conn->state); terom@12: terom@23: // ok terom@23: return 0; terom@23: terom@23: error: terom@23: return -1; terom@23: } terom@23: terom@23: static int _evpq_handle_query (struct evpq_conn *conn) { terom@12: // update state terom@12: conn->state = EVPQ_QUERY; terom@12: terom@12: // XXX: PQflush terom@12: terom@12: // poll for read terom@12: if (_evpq_schedule(conn, EV_READ, _evpq_query_event)) terom@12: goto error; terom@12: terom@12: // and then we wait terom@12: return 0; terom@12: terom@12: error: terom@12: return -1; terom@12: } terom@12: terom@23: int evpq_query (struct evpq_conn *conn, const char *command) { terom@23: // check state terom@23: if (_evpq_check_query(conn)) terom@23: goto error; terom@23: terom@23: // do the query terom@23: if (PQsendQuery(conn->pg_conn, command) == 0) terom@23: ERROR("PQsendQuery: %s", PQerrorMessage(conn->pg_conn)); terom@23: terom@23: // handle it terom@24: if (_evpq_handle_query(conn)) terom@23: goto error; terom@23: terom@23: // success terom@23: return 0; terom@23: terom@23: error: terom@23: return -1; terom@23: } terom@23: terom@23: int evpq_query_params (struct evpq_conn *conn, const char *command, int nParams, const Oid *paramTypes, const char * const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat) { terom@23: // check state terom@23: if (_evpq_check_query(conn)) terom@23: goto error; terom@23: terom@23: // do the query terom@23: if (PQsendQueryParams(conn->pg_conn, command, nParams, paramTypes, paramValues, paramLengths, paramFormats, resultFormat) == 0) terom@23: ERROR("PQsendQueryParams: %s", PQerrorMessage(conn->pg_conn)); terom@23: terom@23: // handle it terom@24: if (_evpq_handle_query(conn)) terom@23: goto error; terom@23: terom@23: // success terom@23: return 0; terom@23: terom@23: error: terom@23: return -1; terom@23: terom@23: } terom@23: terom@25: void evpq_release (struct evpq_conn *conn) { terom@25: if (conn->ev) terom@25: event_free(conn->ev); terom@25: terom@25: if (conn->pg_conn) terom@25: PQfinish(conn->pg_conn); terom@25: terom@25: free(conn); terom@25: } terom@25: terom@21: enum evpq_state evpq_state (struct evpq_conn *conn) { terom@21: return conn->state; terom@21: } terom@21: terom@12: const PGconn *evpq_pgconn (struct evpq_conn *conn) { terom@12: return conn->pg_conn; terom@12: }