src/evpq.c
changeset 12 7f159ee3a3ff
child 21 e5da1d428e3e
equal deleted inserted replaced
11:a4e382d4a22a 12:7f159ee3a3ff
       
     1 
       
     2 #include <event2/event.h>
       
     3 #include <assert.h>
       
     4 #include <stdlib.h>
       
     5 
       
     6 #include "evpq.h"
       
     7 #include "lib/error.h"
       
     8 
       
     9 struct evpq_conn {
       
    10     struct event_base *ev_base;
       
    11     struct evpq_callback_info user_cb;
       
    12 
       
    13     PGconn *pg_conn;
       
    14 
       
    15     struct event *ev;
       
    16 
       
    17     enum evpq_state state;
       
    18 };
       
    19 
       
    20 /*
       
    21  * This evpq_conn has experienced a GENERAL FAILURE.
       
    22  */
       
    23 static void _evpq_failure (struct evpq_conn *conn) {
       
    24     // update state
       
    25     conn->state = EVPQ_FAILURE;
       
    26 
       
    27     // notify
       
    28     conn->user_cb.fn_failure(conn, conn->user_cb.cb_arg);
       
    29 }
       
    30 
       
    31 /*
       
    32  * Initial connect was succesfull
       
    33  */
       
    34 static void _evpq_connect_ok (struct evpq_conn *conn) {
       
    35     // update state
       
    36     conn->state = EVPQ_CONNECTED;
       
    37 
       
    38     // notify
       
    39     conn->user_cb.fn_connected(conn, conn->user_cb.cb_arg);
       
    40 }
       
    41 
       
    /*
     * The initial connect failed. There is no dedicated handling for this case;
     * it is reported exactly like any other failure, via _evpq_failure().
     */
    static void _evpq_connect_fail (struct evpq_conn *conn) {
        _evpq_failure(conn);
    }
       
    49 
       
    50 /*
       
    51  * Receive a result and gives it to the user. If there was no more results, update state and tell the user.
       
    52  *
       
    53  * Returns zero if we got a result, 1 if there were/are no more results to handle.
       
    54  */
       
    55 static int _evpq_query_result (struct evpq_conn *conn) {
       
    56     PGresult *result;
       
    57     
       
    58     // get the result
       
    59     if ((result = PQgetResult(conn->pg_conn)) == NULL) {
       
    60         // no more results, update state
       
    61         conn->state = EVPQ_CONNECTED;
       
    62 
       
    63         // tell the user the query is done
       
    64         conn->user_cb.fn_done(conn, conn->user_cb.cb_arg);
       
    65 
       
    66         // stop waiting for more results
       
    67         return 1;
       
    68 
       
    69     } else {
       
    70         // got a result, give it to the user
       
    71         conn->user_cb.fn_result(conn, result, conn->user_cb.cb_arg);
       
    72 
       
    73         // great
       
    74         return 0;
       
    75     }
       
    76 }
       
    77 
       
    78 /*
       
    79  * Schedule a new _evpq_event for this connection.
       
    80  */ 
       
    81 static int _evpq_schedule (struct evpq_conn *conn, short what, void (*handler)(evutil_socket_t, short, void *)) {
       
    82     assert(conn->pg_conn != NULL);
       
    83 
       
    84     // ensure we have a valid socket, this should be the case after the PQstatus check...
       
    85     if (PQsocket(conn->pg_conn) < 0)
       
    86         FATAL("PQsocket gave invalid socket");
       
    87 
       
    88     // reschedule with a new event
       
    89     if (conn->ev) {
       
    90         event_assign(conn->ev, conn->ev_base, PQsocket(conn->pg_conn), what, handler, conn);
       
    91 
       
    92     } else {
       
    93         if ((conn->ev = event_new(conn->ev_base, PQsocket(conn->pg_conn), what, handler, conn)) == NULL)
       
    94             PERROR("event_new");
       
    95 
       
    96     }
       
    97 
       
    98     // add it
       
    99     // XXX: timeouts?
       
   100     if (event_add(conn->ev, NULL))
       
   101         PERROR("event_add");
       
   102     
       
   103     // success
       
   104     return 0;
       
   105 
       
   106 error:
       
   107     return -1;
       
   108 }
       
   109 
       
   110 /*
       
   111  * Handle events on the PQ socket while connecting
       
   112  */
       
   113 static void _evpq_connect_event (evutil_socket_t fd, short what, void *arg) {
       
   114     struct evpq_conn *conn = arg;
       
   115     PostgresPollingStatusType poll_status;
       
   116     
       
   117     // this is only for connect events
       
   118     assert(conn->state == EVPQ_CONNECT);
       
   119     
       
   120     // XXX: timeouts?
       
   121 
       
   122     // ask PQ what to do 
       
   123     switch ((poll_status = PQconnectPoll(conn->pg_conn))) {
       
   124         case PGRES_POLLING_READING:
       
   125             // poll for read
       
   126             what = EV_READ;
       
   127             
       
   128             // reschedule
       
   129             break;
       
   130 
       
   131         case PGRES_POLLING_WRITING:
       
   132             // poll for write
       
   133             what = EV_WRITE;
       
   134             
       
   135             // reschedule
       
   136             break;
       
   137 
       
   138         case PGRES_POLLING_OK:
       
   139             // connected
       
   140             _evpq_connect_ok(conn);
       
   141             
       
   142             // done
       
   143             return;
       
   144         
       
   145         case PGRES_POLLING_FAILED:
       
   146             // faaaaail!
       
   147             _evpq_connect_fail(conn);
       
   148 
       
   149             // done
       
   150             return;
       
   151         
       
   152         default:
       
   153             FATAL("PQconnectPoll gave a weird value: %d", poll_status);
       
   154     }
       
   155     
       
   156     // reschedule
       
   157     if (_evpq_schedule(conn, what, _evpq_connect_event))
       
   158         goto error;
       
   159 
       
   160     // done, wait for the next event
       
   161     return;
       
   162 
       
   163 error:
       
   164     // XXX: reset?
       
   165     _evpq_failure(conn);
       
   166 }
       
   167 
       
   168 static void _evpq_query_event (evutil_socket_t fd, short what, void *arg) {
       
   169     struct evpq_conn *conn = arg;
       
   170     
       
   171     // this is only for query events
       
   172     assert(conn->state == EVPQ_QUERY);
       
   173 
       
   174     // XXX: PQflush, timeouts
       
   175     assert(what == EV_READ);
       
   176 
       
   177     // we're going to assume that all queries will *require* data for their results
       
   178     // this would break otherwise (PQconsumeInput might block?)
       
   179     assert(PQisBusy(conn->pg_conn) != 0);
       
   180 
       
   181     // handle input
       
   182     if (PQconsumeInput(conn->pg_conn) == 0)
       
   183         ERROR("PQconsumeInput: %s", PQerrorMessage(conn->pg_conn));
       
   184     
       
   185     // handle results
       
   186     while (PQisBusy(conn->pg_conn) == 0) {
       
   187         // handle the result
       
   188         if (_evpq_query_result(conn) == 1) {
       
   189             // no need to wait for anything anymore
       
   190             return;
       
   191         }
       
   192 
       
   193         // loop to handle the next result
       
   194     }
       
   195 
       
   196     // still need to wait for a result, so reschedule
       
   197     if (_evpq_schedule(conn, EV_READ, _evpq_query_event))
       
   198         goto error;
       
   199         
       
   200     // done, wait for the next event
       
   201     return;
       
   202 
       
   203 error:
       
   204     // XXX: reset?
       
   205     _evpq_failure(conn);
       
   206 
       
   207 }
       
   208 
       
   209 struct evpq_conn *evpq_connect (struct event_base *ev_base, const char *conninfo, const struct evpq_callback_info cb_info) {
       
   210     struct evpq_conn *conn = NULL;
       
   211     
       
   212     // alloc our context
       
   213     if ((conn = calloc(1, sizeof(*conn))) == NULL)
       
   214         ERROR("calloc");
       
   215     
       
   216     // initial state
       
   217     conn->ev_base = ev_base;
       
   218     conn->user_cb = cb_info;
       
   219     conn->state = EVPQ_INIT;
       
   220 
       
   221     // create our PGconn
       
   222     if ((conn->pg_conn = PQconnectStart(conninfo)) == NULL)
       
   223         PERROR("PQconnectStart");
       
   224     
       
   225     // check for immediate failure
       
   226     if (PQstatus(conn->pg_conn) == CONNECTION_BAD)
       
   227         ERROR("PQstatus indicates CONNECTION_BAD after PQconnectStart");
       
   228     
       
   229     // assume PGRES_POLLING_WRITING
       
   230     if (_evpq_schedule(conn, EV_WRITE, _evpq_connect_event))
       
   231         goto error;
       
   232 
       
   233     // connecting
       
   234     conn->state = EVPQ_CONNECT;
       
   235 
       
   236     // success, wait for the connection to be established
       
   237     return conn;
       
   238 
       
   239 error:
       
   240     if (conn) {
       
   241         if (conn->pg_conn)
       
   242             PQfinish(conn->pg_conn);
       
   243         
       
   244         free(conn);
       
   245     }
       
   246 
       
   247     return NULL;
       
   248 }
       
   249 
       
   250 int evpq_query (struct evpq_conn *conn, const char *command) {
       
   251     // check state
       
   252     if (conn->state != EVPQ_CONNECTED)
       
   253         ERROR("invalid evpq state: %d", conn->state);
       
   254     
       
   255     // do the query
       
   256     if (PQsendQuery(conn->pg_conn, command) == 0)
       
   257         ERROR("PQsendQuery: %s", PQerrorMessage(conn->pg_conn));
       
   258     
       
   259     // update state
       
   260     conn->state = EVPQ_QUERY;
       
   261     
       
   262     // XXX: PQflush
       
   263 
       
   264     // poll for read
       
   265     if (_evpq_schedule(conn, EV_READ, _evpq_query_event))
       
   266         goto error;
       
   267 
       
   268     // and then we wait
       
   269     return 0;
       
   270 
       
   271 error:
       
   272     return -1;
       
   273 }
       
   274 
       
   275 const PGconn *evpq_pgconn (struct evpq_conn *conn) {
       
   276     return conn->pg_conn;
       
   277 }