coro_test.c
author Tero Marttila <terom@fixme.fi>
Sat, 30 Aug 2008 19:13:15 +0300
changeset 49 10c7dce1a043
parent 47 a5c09677ca6f
permissions -rw-r--r--
autogenerate the memcache_test help output, and pipeline memcache requests

// splice
#define _GNU_SOURCE
#include <fcntl.h>

#include <unistd.h>
#include <stdlib.h>
#include <signal.h>
#include <errno.h>

#include <event2/util.h>
#include <event2/event.h>
#include <event2/event_compat.h>
#include <event2/event_struct.h>
#include <pcl.h>

#include "common.h"
#include "config.h"
#include "socket.h"

// what port to listen on
#define LISTEN_PORT 10559

// stack size to use for coroutines
#define CLIENT_STACK_SIZE 128 * 1024
#define PIPELINE_STACK_SIZE 128 * 1024

// this should be equal to the system page size
#define PIPELINE_CHUNKSIZE 4096

struct co_connect_info {
    // what co this is running as
    coroutine_t co;

    // set to the value of errno in case of an error, zero otherwise
    int error;
};

static void _co_connect_write (evutil_socket_t fd, short what, void *arg) {
    struct co_connect_info *ctx = arg;
    
    // retreive the error code
    // XXX: this function was lost in a hg-fuckup
    if (socket_connect_error(fd, &ctx->error))
        ctx->error = -1;   // mark it as failed
    
    // resume the co
    co_call(ctx->co);
}

// coroutine-connect
int co_connect (struct config_endpoint *endpoint, int sock_type, int *sock) {
    // our state
    struct co_connect_info ctx;

    // the event for the connect
    struct event ev;

    // clean it
    memset(&ctx, 0, sizeof(ctx));
    
    // initiate the connect()
    if ((*sock = socket_connect_async(endpoint, sock_type)) == -1)
        goto error;
    
    // set up the event
    event_set(&ev, *sock, EV_WRITE, &_co_connect_write, &ctx);

    if (event_add(&ev, NULL))
        goto error;
    
    // store our co handle in there
    ctx.co = co_current();

    // wait for the result
    co_resume();
    
    // return the error code
    return ctx.error;

error:
    return -1;
}

struct co_splice_info {
    int fd_in, fd_out;
    size_t len;

    short wait;

    struct event ev_read, ev_write;

    int error;

    coroutine_t co;
};

void _co_splice_ev (int fd, short what, void *arg) {
    struct co_splice_info *ctx = arg;
    
    ctx->wait &= ~what;

    if (!ctx->wait) {
        // both ends are readable/writeable!

        long ret;

        if ((ret = splice(ctx->fd_in, NULL, ctx->fd_out, NULL, ctx->len, SPLICE_F_MOVE | SPLICE_F_NONBLOCK)) == -1) {
            if (errno == EAGAIN) {
                ctx->error = 0;
                ctx->len = 0;

            } else {
                PERROR("splice");
            }
        } else {
            ctx->error = 0;
            ctx->len = ret;
        }

        // let the co do it's thing
        co_call(ctx->co);

    }
    

    // ok, done succesfully
    return;

error:
    // let the co handle this error
    ctx->error = errno;
    co_call(ctx->co);
}

// coroutine-splice
int co_splice (int fd_in, int fd_out, size_t *len, int *error) {
    // first, try and splice directly
    // XXX: ensure all sockets are O_NONBLOCK!
    long ret;

    if ((ret = splice(fd_in, NULL, fd_out, NULL, *len, SPLICE_F_MOVE | SPLICE_F_NONBLOCK)) == -1) {
        if (errno == EAGAIN) {
            // wait for in/out to become readable/writeable
            
            // our state
            ZINIT(struct co_splice_info, ctx);

            ctx.fd_in = fd_in;
            ctx.fd_out = fd_out;
            ctx.len = *len;
            ctx.wait = EV_READ | EV_WRITE;
            ctx.co = co_current();
            
            // two events
            event_set(&ctx.ev_read, fd_in, EV_READ, &_co_splice_ev, &ctx);
            event_set(&ctx.ev_write, fd_out, EV_WRITE, &_co_splice_ev, &ctx);

            // add both
            if (
                    event_add(&ctx.ev_read, NULL)
                ||  event_add(&ctx.ev_write, NULL)
            )
                ERROR("event_add"); // XXX: cleanup
            
            // and wait...
            co_resume();
            
            // error
            if (ctx.error) {
                *error = ctx.error;

                return -1;

            } else {
                *len = ctx.len;

                return 0;
            }
        } else
            PERROR("splice");
    } else {
        // success!
        *len = ret;
        
        return 0;
    }

error:
    return -1;
}

/*
struct co_pipeline_info {
    int sock_read, sock_write;

    int pipe_read, pipe_write;
    
    / *
     * Two coroutines:
     *  *   co_write waits for co_read to indicate that there is enough data in the pipe to send, whereupon it will
     *      attempt to splice data from pipe_read to sock_write.
     *  *   co_read waits for data to become available on sock_src, splices it into pipe_write, and notifies co_write
     *      that there is data avilable for sending.
     * /
    coroutine_t co_read, co_write;

    char co_read_stack[PIPELINE_STACK_SIZE], co_write_stack[PIPELINE_STACK_SIZE];
};

void _co_pipeline_read (void *arg) {
    
}

void _co_pipeline_write (void *arg) {

}
*/

int co_pipeline (int sock_read, int sock_write) {
//    ZINIT(struct co_pipeline_info, ctx);
    
    // create our pipe
    int pipefds[2];

    if (pipe(pipefds))
        PERROR("pipe");
    
    // copy over the fds into ctx
    int pipe_read = pipefds[0];
    int pipe_write = pipefds[1];

    // start shuffling data around
    size_t pipe_size = 0, read_len, write_len;
    int error = 0;

    while (1) {
        read_len = PIPELINE_CHUNKSIZE;

        if (co_splice(sock_read, pipe_write, &read_len, &error))
            PERROR("co_splice(sock_read, pipe_write, %zu)", read_len);
        
        if (!read_len) {
            // EOF
            break;
        }

        pipe_size += read_len;

        if (!pipe_size)
            continue;

        while (pipe_size > 0) {
            write_len = pipe_size;

            if (co_splice(pipe_read, sock_write, &write_len, &error))
                PERROR("co_splice(pipe_read, sock_write, %zu", write_len);

            pipe_size -= write_len;
        }
    }

/*
    // create the two coros
    if (
            (ctx.co_read = co_create(&_co_pipeline_read, &ctx, ctx.co_read_stack, PIPELINE_STACK_SIZE)) == NULL
        ||  (ctx.co_write = co_create(&_co_pipeline_write, &ctx, ctx.co_write_stack, PIPELINE_STACK_SIZE)) == NULL
    ) 
        ERROR("co_create");
    
    // first start off the read one, which then eventually calls the write one
    co_call(ctx.co_read);
*/

    // success
    return 0;

error:
/*
    if (ctx.co_read)
        co_delete(ctx.co_read);

    if (ctx.co_write)
        co_delete(ctx.co_write);
*/
    return -1;
}

/*
 * State needed to handle a client
 */
struct client_info {
    // the client socket
    evutil_socket_t socket;

    // the connect target endpoint
    struct config_endpoint *connect_target;

    // the coroutine that handles it
    coroutine_t co;
};

void client_co (void *arg) {
    struct client_info *ctx = arg;
    
    // the outbound socket
    int conn_sock = -1;

    // connect
    if (co_connect(ctx->connect_target, SOCK_STREAM, &conn_sock))
        goto error;
    
    // pipe data around \o/
    if (co_pipeline(ctx->socket, conn_sock))
        goto error;

error:
    // close the sockets
    if (conn_sock != -1)
        if (close(conn_sock))
            PWARNING("close connect_socket");
    
    if (close(ctx->socket))
        PWARNING("close listen_socket");
    
    // free the ctx
    free(ctx);

    // exit
    co_exit();
}

static void handle_accept (evutil_socket_t fd, short event, void *arg) {
    // the new client's client_info
    struct client_info *ctx = NULL;
    
    // the socket from accept()
    evutil_socket_t socket = -1;

    // not used
    struct sockaddr_storage addr;
    socklen_t addr_len;
    
    // arg is NULL and unused
    (void) arg;
    
    // accept the connection
    addr_len = sizeof(struct sockaddr_storage);

    if ((socket = accept(fd, (struct sockaddr *) &addr, &addr_len)) == -1)
        PERROR("accept");
    
    // alloc a new client_info
    if (!(ctx = calloc(1, sizeof(*ctx))))
        ERROR("calloc");
    
    // store the socket
    ctx->socket = socket;

    // and the endpoint
    ctx->connect_target = arg;

    // create the coroutine
    if ((ctx->co = co_create(&client_co, ctx, NULL, CLIENT_STACK_SIZE)) == NULL)
        ERROR("co_create");

    // we can start up the coroutine right away
    co_call(ctx->co);
    
    // done handling this accept
    return;

error:
    if (ctx) {
        if (ctx->co)
            co_delete(ctx->co);

        free(ctx);
    }

    else if (socket != -1)
        close(socket);
}


int main (int argc, char **argv) {
     // libevent init
    struct event_base *ev_base = event_init();

    if (!ev_base)
        FATAL("event_init");
    
    // process command-line arguments
    int opt;
    const char *listen_spec = NULL, *connect_spec = NULL;

    while ((opt = getopt(argc, argv, "hl:c:")) != -1) {
        switch (opt) {
            case 'l':
                if (listen_spec)
                    FATAL("only specify -l once");

                listen_spec = optarg;

                break;

            case 'c':
                if (connect_spec)
                    FATAL("only specify -c once");
                
                connect_spec = optarg;

                break;

            case 'h':
            default:
                err_exit("Usage: %s [-h] [-l listen] -c <connect>", argv[0]);
        
        }
    }

    if (!connect_spec)
        err_exit("Must specify -c; see `%s -h`", argv[0]);
    
    // ignore SIGPIPE
    struct sigaction sigpipe;
    memset(&sigpipe, 0, sizeof(sigpipe));

    sigpipe.sa_handler = SIG_IGN;

    sigaction(SIGPIPE, &sigpipe, NULL);
    
    // endpoints
    struct config_endpoint listen_endpoint, connect_endpoint;
    
    endpoint_init(&listen_endpoint, LISTEN_PORT);
    endpoint_init(&connect_endpoint, 0);

    if (
            endpoint_parse(&listen_endpoint, listen_spec)
        ||  endpoint_parse(&connect_endpoint, connect_spec)
    )
        goto error;
    
    // the listen socket
    int listen_sock;

    if ((listen_sock = socket_listen(&listen_endpoint, SOCK_STREAM)) == -1)
        goto error;

    // create the listen event
    struct event listen_ev;

    event_set(&listen_ev, listen_sock, EV_READ | EV_PERSIST, &handle_accept, &connect_endpoint);

    if (event_add(&listen_ev, NULL))
        PERROR("event_add");

    // we shall now run
    INFO("run");
    
    // run the libevent mainloop
    if (event_base_dispatch(ev_base))
        WARNING("event_dispatch");

    INFO("shutdown");

    // cleanup
    event_base_free(ev_base);
    close(listen_sock);

    return EXIT_SUCCESS;

error:
    return EXIT_FAILURE;
}