add the coro_test.c file, and update .hgignore a bit
authorTero Marttila <>
Thu, 28 Aug 2008 03:14:07 +0300
changeset 47 a5c09677ca6f
parent 46 8a832c0e01ee
child 48 1c67f512779b
add the coro_test.c file, and update .hgignore a bit
--- a/.hgignore	Thu Aug 28 03:12:11 2008 +0300
+++ b/.hgignore	Thu Aug 28 03:14:07 2008 +0300
@@ -5,5 +5,8 @@
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/coro_test.c	Thu Aug 28 03:14:07 2008 +0300
@@ -0,0 +1,477 @@
+// splice
+#define _GNU_SOURCE
+#include <fcntl.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <signal.h>
+#include <errno.h>
+#include <event2/util.h>
+#include <event2/event.h>
+#include <event2/event_compat.h>
+#include <event2/event_struct.h>
+#include <pcl.h>
+#include "common.h"
+#include "config.h"
+#include "socket.h"
+// what port to listen on
+#define LISTEN_PORT 10559
+// stack size to use for coroutines
+#define CLIENT_STACK_SIZE 128 * 1024
+#define PIPELINE_STACK_SIZE 128 * 1024
+// this should be equal to the system page size
+struct co_connect_info {
+    // what co this is running as
+    coroutine_t co;
+    // set to the value of errno in case of an error, zero otherwise
+    int error;
+static void _co_connect_write (evutil_socket_t fd, short what, void *arg) {
+    struct co_connect_info *ctx = arg;
+    // retreive the error code
+    // XXX: this function was lost in a hg-fuckup
+    if (socket_connect_error(fd, &ctx->error))
+        ctx->error = -1;   // mark it as failed
+    // resume the co
+    co_call(ctx->co);
+// coroutine-connect
+int co_connect (struct config_endpoint *endpoint, int sock_type, int *sock) {
+    // our state
+    struct co_connect_info ctx;
+    // the event for the connect
+    struct event ev;
+    // clean it
+    memset(&ctx, 0, sizeof(ctx));
+    // initiate the connect()
+    if ((*sock = socket_connect_async(endpoint, sock_type)) == -1)
+        goto error;
+    // set up the event
+    event_set(&ev, *sock, EV_WRITE, &_co_connect_write, &ctx);
+    if (event_add(&ev, NULL))
+        goto error;
+    // store our co handle in there
+ = co_current();
+    // wait for the result
+    co_resume();
+    // return the error code
+    return ctx.error;
+    return -1;
+struct co_splice_info {
+    int fd_in, fd_out;
+    size_t len;
+    short wait;
+    struct event ev_read, ev_write;
+    int error;
+    coroutine_t co;
+void _co_splice_ev (int fd, short what, void *arg) {
+    struct co_splice_info *ctx = arg;
+    ctx->wait &= ~what;
+    if (!ctx->wait) {
+        // both ends are readable/writeable!
+        long ret;
+        if ((ret = splice(ctx->fd_in, NULL, ctx->fd_out, NULL, ctx->len, SPLICE_F_MOVE | SPLICE_F_NONBLOCK)) == -1) {
+            if (errno == EAGAIN) {
+                ctx->error = 0;
+                ctx->len = 0;
+            } else {
+                PERROR("splice");
+            }
+        } else {
+            ctx->error = 0;
+            ctx->len = ret;
+        }
+        // let the co do it's thing
+        co_call(ctx->co);
+    }
+    // ok, done succesfully
+    return;
+    // let the co handle this error
+    ctx->error = errno;
+    co_call(ctx->co);
+// coroutine-splice
+int co_splice (int fd_in, int fd_out, size_t *len, int *error) {
+    // first, try and splice directly
+    // XXX: ensure all sockets are O_NONBLOCK!
+    long ret;
+    if ((ret = splice(fd_in, NULL, fd_out, NULL, *len, SPLICE_F_MOVE | SPLICE_F_NONBLOCK)) == -1) {
+        if (errno == EAGAIN) {
+            // wait for in/out to become readable/writeable
+            // our state
+            ZINIT(struct co_splice_info, ctx);
+            ctx.fd_in = fd_in;
+            ctx.fd_out = fd_out;
+            ctx.len = *len;
+            ctx.wait = EV_READ | EV_WRITE;
+   = co_current();
+            // two events
+            event_set(&ctx.ev_read, fd_in, EV_READ, &_co_splice_ev, &ctx);
+            event_set(&ctx.ev_write, fd_out, EV_WRITE, &_co_splice_ev, &ctx);
+            // add both
+            if (
+                    event_add(&ctx.ev_read, NULL)
+                ||  event_add(&ctx.ev_write, NULL)
+            )
+                ERROR("event_add"); // XXX: cleanup
+            // and wait...
+            co_resume();
+            // error
+            if (ctx.error) {
+                *error = ctx.error;
+                return -1;
+            } else {
+                *len = ctx.len;
+                return 0;
+            }
+        } else
+            PERROR("splice");
+    } else {
+        // success!
+        *len = ret;
+        return 0;
+    }
+    return -1;
+struct co_pipeline_info {
+    int sock_read, sock_write;
+    int pipe_read, pipe_write;
+    / *
+     * Two coroutines:
+     *  *   co_write waits for co_read to indicate that there is enough data in the pipe to send, whereupon it will
+     *      attempt to splice data from pipe_read to sock_write.
+     *  *   co_read waits for data to become available on sock_src, splices it into pipe_write, and notifies co_write
+     *      that there is data avilable for sending.
+     * /
+    coroutine_t co_read, co_write;
+    char co_read_stack[PIPELINE_STACK_SIZE], co_write_stack[PIPELINE_STACK_SIZE];
+void _co_pipeline_read (void *arg) {
+void _co_pipeline_write (void *arg) {
+int co_pipeline (int sock_read, int sock_write) {
+//    ZINIT(struct co_pipeline_info, ctx);
+    // create our pipe
+    int pipefds[2];
+    if (pipe(pipefds))
+        PERROR("pipe");
+    // copy over the fds into ctx
+    int pipe_read = pipefds[0];
+    int pipe_write = pipefds[1];
+    // start shuffling data around
+    size_t pipe_size = 0, read_len, write_len;
+    int error = 0;
+    while (1) {
+        read_len = PIPELINE_CHUNKSIZE;
+        if (co_splice(sock_read, pipe_write, &read_len, &error))
+            PERROR("co_splice(sock_read, pipe_write, %zu)", read_len);
+        if (!read_len) {
+            // EOF
+            break;
+        }
+        pipe_size += read_len;
+        if (!pipe_size)
+            continue;
+        while (pipe_size > 0) {
+            write_len = pipe_size;
+            if (co_splice(pipe_read, sock_write, &write_len, &error))
+                PERROR("co_splice(pipe_read, sock_write, %zu", write_len);
+            pipe_size -= write_len;
+        }
+    }
+    // create the two coros
+    if (
+            (ctx.co_read = co_create(&_co_pipeline_read, &ctx, ctx.co_read_stack, PIPELINE_STACK_SIZE)) == NULL
+        ||  (ctx.co_write = co_create(&_co_pipeline_write, &ctx, ctx.co_write_stack, PIPELINE_STACK_SIZE)) == NULL
+    ) 
+        ERROR("co_create");
+    // first start off the read one, which then eventually calls the write one
+    co_call(ctx.co_read);
+    // success
+    return 0;
+    if (ctx.co_read)
+        co_delete(ctx.co_read);
+    if (ctx.co_write)
+        co_delete(ctx.co_write);
+    return -1;
+ * State needed to handle a client
+ */
+struct client_info {
+    // the client socket
+    evutil_socket_t socket;
+    // the connect target endpoint
+    struct config_endpoint *connect_target;
+    // the coroutine that handles it
+    coroutine_t co;
+void client_co (void *arg) {
+    struct client_info *ctx = arg;
+    // the outbound socket
+    int conn_sock = -1;
+    // connect
+    if (co_connect(ctx->connect_target, SOCK_STREAM, &conn_sock))
+        goto error;
+    // pipe data around \o/
+    if (co_pipeline(ctx->socket, conn_sock))
+        goto error;
+    // close the sockets
+    if (conn_sock != -1)
+        if (close(conn_sock))
+            PWARNING("close connect_socket");
+    if (close(ctx->socket))
+        PWARNING("close listen_socket");
+    // free the ctx
+    free(ctx);
+    // exit
+    co_exit();
+static void handle_accept (evutil_socket_t fd, short event, void *arg) {
+    // the new client's client_info
+    struct client_info *ctx = NULL;
+    // the socket from accept()
+    evutil_socket_t socket = -1;
+    // not used
+    struct sockaddr_storage addr;
+    socklen_t addr_len;
+    // arg is NULL and unused
+    (void) arg;
+    // accept the connection
+    addr_len = sizeof(struct sockaddr_storage);
+    if ((socket = accept(fd, (struct sockaddr *) &addr, &addr_len)) == -1)
+        PERROR("accept");
+    // alloc a new client_info
+    if (!(ctx = calloc(1, sizeof(*ctx))))
+        ERROR("calloc");
+    // store the socket
+    ctx->socket = socket;
+    // and the endpoint
+    ctx->connect_target = arg;
+    // create the coroutine
+    if ((ctx->co = co_create(&client_co, ctx, NULL, CLIENT_STACK_SIZE)) == NULL)
+        ERROR("co_create");
+    // we can start up the coroutine right away
+    co_call(ctx->co);
+    // done handling this accept
+    return;
+    if (ctx) {
+        if (ctx->co)
+            co_delete(ctx->co);
+        free(ctx);
+    }
+    else if (socket != -1)
+        close(socket);
+int main (int argc, char **argv) {
+     // libevent init
+    struct event_base *ev_base = event_init();
+    if (!ev_base)
+        FATAL("event_init");
+    // process command-line arguments
+    int opt;
+    const char *listen_spec = NULL, *connect_spec = NULL;
+    while ((opt = getopt(argc, argv, "hl:c:")) != -1) {
+        switch (opt) {
+            case 'l':
+                if (listen_spec)
+                    FATAL("only specify -l once");
+                listen_spec = optarg;
+                break;
+            case 'c':
+                if (connect_spec)
+                    FATAL("only specify -c once");
+                connect_spec = optarg;
+                break;
+            case 'h':
+            default:
+                err_exit("Usage: %s [-h] [-l listen] -c <connect>", argv[0]);
+        }
+    }
+    if (!connect_spec)
+        err_exit("Must specify -c; see `%s -h`", argv[0]);
+    // ignore SIGPIPE
+    struct sigaction sigpipe;
+    memset(&sigpipe, 0, sizeof(sigpipe));
+    sigpipe.sa_handler = SIG_IGN;
+    sigaction(SIGPIPE, &sigpipe, NULL);
+    // endpoints
+    struct config_endpoint listen_endpoint, connect_endpoint;
+    endpoint_init(&listen_endpoint, LISTEN_PORT);
+    endpoint_init(&connect_endpoint, 0);
+    if (
+            endpoint_parse(&listen_endpoint, listen_spec)
+        ||  endpoint_parse(&connect_endpoint, connect_spec)
+    )
+        goto error;
+    // the listen socket
+    int listen_sock;
+    if ((listen_sock = socket_listen(&listen_endpoint, SOCK_STREAM)) == -1)
+        goto error;
+    // create the listen event
+    struct event listen_ev;
+    event_set(&listen_ev, listen_sock, EV_READ | EV_PERSIST, &handle_accept, &connect_endpoint);
+    if (event_add(&listen_ev, NULL))
+        PERROR("event_add");
+    // we shall now run
+    INFO("run");
+    // run the libevent mainloop
+    if (event_base_dispatch(ev_base))
+        WARNING("event_dispatch");
+    INFO("shutdown");
+    // cleanup
+    event_base_free(ev_base);
+    close(listen_sock);
+    return EXIT_SUCCESS;
+    return EXIT_FAILURE;