src/evsql.c
author Tero Marttila <terom@fixme.fi>
Sun, 12 Oct 2008 00:17:09 +0300
changeset 21 e5da1d428e3e
child 23 1dee73ae4ad0
permissions -rw-r--r--
new evsql for queueing SQL queries
#define _GNU_SOURCE
#include <stdlib.h>
#include <sys/queue.h>
#include <assert.h>
#include <string.h>

#include "evsql.h"
#include "evpq.h"
#include "lib/log.h"
#include "lib/error.h"
#include "lib/misc.h"

enum evsql_type {
    EVSQL_EVPQ,
};

struct evsql {
    // callbacks
    evsql_error_cb error_fn;
    void *cb_arg;

    // backend engine
    enum evsql_type type;

    union {
        struct evpq_conn *evpq;
    } engine;
    
    // list of queries running or waiting to run
    TAILQ_HEAD(evsql_queue, evsql_query) queue;
};

struct evsql_query {
    // the evsql we are querying
    struct evsql *evsql;

    // the actual SQL query, this may or may not be ours, see _evsql_query_exec
    char *command;

    // our callback
    evsql_query_cb cb_fn;
    void *cb_arg;

    // our position in the query list
    TAILQ_ENTRY(evsql_query) entry;

    // the result
    union {
        PGresult *evpq;
    } result;
};

/*
 * Actually execute the given query.
 *
 * The backend should be able to accept the query at this time.
 *
 * query->command must be valid during the execution of this function, but once it returns, the command is not needed
 * anymore, and should be set to NULL.
 */
static int _evsql_query_exec (struct evsql *evsql, struct evsql_query *query, const char *command) {
    switch (evsql->type) {
        case EVSQL_EVPQ:
            // just pass it through
            return evpq_query(evsql->engine.evpq, command);
        
        default:
            FATAL("evsql->type");
    }
}

/*
 * Dequeue the query, execute the callback, and free it.
 */
static void _evsql_query_done (struct evsql_query *query, const struct evsql_result_info *result_info) {
    assert(query->command == NULL);

    // dequeue
    TAILQ_REMOVE(&query->evsql->queue, query, entry);
    
    if (result_info) 
        // call the callback
        query->cb_fn(*result_info, query->cb_arg);
    
    // free
    free(query);
}

/*
 * A query has failed, notify the user and remove it.
 */
static void _evsql_query_failure (struct evsql *evsql, struct evsql_query *query) {
    struct evsql_result_info result; ZINIT(result);

    // set up the result_info
    result.evsql = evsql;
    result.error = 1;

    // finish it off
    _evsql_query_done(query, &result);
}

/*
 * Clear every enqueued query and then free the evsql.
 *
 * If result_info is given, each query will also recieve it via their callback, and the error_fn will be called.
 */
static void _evsql_destroy (struct evsql *evsql, const struct evsql_result_info *result_info) {
    struct evsql_query *query;
    
    // clear the queue
    while ((query = TAILQ_FIRST(&evsql->queue)) != NULL) {
        _evsql_query_done(query, result_info);
        
        TAILQ_REMOVE(&evsql->queue, query, entry);
    }
    
    // do the error callback if required
    if (result_info)
        evsql->error_fn(evsql, evsql->cb_arg);
    
    // free
    free(evsql);
}


/*
 * Sends the next query if there are more enqueued
 */
static void _evsql_pump (struct evsql *evsql) {
    struct evsql_query *query;
    
    // look for the next query
    if ((query = TAILQ_FIRST(&evsql->queue)) != NULL) {
        // try and execute it
        if (_evsql_query_exec(evsql, query, query->command)) {
            // the query failed
            _evsql_query_failure(evsql, query);
        }

        // free the command
        free(query->command); query->command = NULL;

        // ok, then we just wait
    }
}


static void _evsql_evpq_connected (struct evpq_conn *conn, void *arg) {
    struct evsql *evsql = arg;

    // no state to update, just pump any waiting queries
    _evsql_pump(evsql);
}

static void _evsql_evpq_result (struct evpq_conn *conn, PGresult *result, void *arg) {
    struct evsql *evsql = arg;
    struct evsql_query *query;

    assert((query = TAILQ_FIRST(&evsql->queue)) != NULL);

    // if we get multiple results, only return the first one
    if (query->result.evpq) {
        WARNING("[evsql] evpq query returned multiple results, discarding previous one");
        
        PQclear(query->result.evpq); query->result.evpq = NULL;
    }
    
    // remember the result
    query->result.evpq = result;
}

static void _evsql_evpq_done (struct evpq_conn *conn, void *arg) {
    struct evsql *evsql = arg;
    struct evsql_query *query;
    struct evsql_result_info result; ZINIT(result);

    assert((query = TAILQ_FIRST(&evsql->queue)) != NULL);
    
    // set up the result_info
    result.evsql = evsql;
    
    if (query->result.evpq == NULL) {
        // if a query didn't return any results (bug?), warn and fail the query
        WARNING("[evsql] evpq query didn't return any results");

        result.error = 1;

    } else {
        result.error = 0;
        result.result.pq = query->result.evpq;

    }

    // finish it off
    _evsql_query_done(query, &result);

    // pump the next one
    _evsql_pump(evsql);
}

static void _evsql_evpq_failure (struct evpq_conn *conn, void *arg) {
    struct evsql *evsql = arg;
    struct evsql_result_info result; ZINIT(result);
    
    // OH SHI...
    
    // set up the result_info
    result.evsql = evsql;
    result.error = 1;

    // finish off the whole connection
    _evsql_destroy(evsql, &result);
}

static struct evpq_callback_info _evsql_evpq_cb_info = {
    .fn_connected       = _evsql_evpq_connected,
    .fn_result          = _evsql_evpq_result,
    .fn_done            = _evsql_evpq_done,
    .fn_failure         = _evsql_evpq_failure,
};

static struct evsql *_evsql_new_base (evsql_error_cb error_fn, void *cb_arg) {
    struct evsql *evsql = NULL;
    
    // allocate it
    if ((evsql = calloc(1, sizeof(*evsql))) == NULL)
        ERROR("calloc");

    // store
    evsql->error_fn = error_fn;
    evsql->cb_arg = cb_arg;

    // init
    TAILQ_INIT(&evsql->queue);

    // done
    return evsql;

error:
    return NULL;
}

struct evsql *evsql_new_pq (struct event_base *ev_base, const char *pq_conninfo, evsql_error_cb error_fn, void *cb_arg) {
    struct evsql *evsql = NULL;
    
    // base init
    if ((evsql = _evsql_new_base (error_fn, cb_arg)) == NULL)
        goto error;

    // connect the engine
    if ((evsql->engine.evpq = evpq_connect(ev_base, pq_conninfo, _evsql_evpq_cb_info, evsql)) == NULL)
        goto error;

    // done
    return evsql;

error:
    // XXX: more complicated than this?
    free(evsql); 

    return NULL;
}

/*
 * Checks what the state of the connection is in regards to executing a query.
 *
 * Returns:
 *      <0      connection failure, query not possible
 *      0       connection idle, can query immediately
 *      1       connection busy, must queue query
 */
static int _evsql_query_idle (struct evsql *evsql) {
    switch (evsql->type) {
        case EVSQL_EVPQ: {
            enum evpq_state state = evpq_state(evsql->engine.evpq);
            
            switch (state) {
                case EVPQ_CONNECT:
                case EVPQ_QUERY:
                    return 1;
                
                case EVPQ_CONNECTED:
                    return 0;

                case EVPQ_INIT:
                case EVPQ_FAILURE:
                    return -1;
                
                default:
                    FATAL("evpq_state");
            }

        }
        
        default:
            FATAL("evsql->type");
    }
}


struct evsql_query *evsql_query (struct evsql *evsql, const char *command, evsql_query_cb query_fn, void *cb_arg) {
    struct evsql_query *query;
    int idle;

    // allocate it
    if ((query = calloc(1, sizeof(*query))) == NULL)
        ERROR("calloc");

    // store
    query->evsql = evsql;
    query->cb_fn = query_fn;
    query->cb_arg = cb_arg;
    
    // check state
    if ((idle = _evsql_query_idle(evsql)) < 0)
        ERROR("connection is not valid");
    
    if (idle) {
        assert(TAILQ_EMPTY(&evsql->queue));

        // execute directly
        if (_evsql_query_exec(evsql, query, command))
            goto error;

    } else {
        // copy the command for later execution
        if ((query->command = strdup(command)) == NULL)
            ERROR("strdup");
    }
    
    // store it on the list
    TAILQ_INSERT_TAIL(&evsql->queue, query, entry);
    
    // success
    return query;

error:
    // do *NOT* free query->command, ever
    free(query);

    return NULL;
}