--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/coro_test.c Thu Aug 28 03:14:07 2008 +0300
@@ -0,0 +1,477 @@
+
+// splice
+#define _GNU_SOURCE
+#include <fcntl.h>
+
+#include <unistd.h>
+#include <stdlib.h>
+#include <signal.h>
+#include <errno.h>
+
+#include <event2/util.h>
+#include <event2/event.h>
+#include <event2/event_compat.h>
+#include <event2/event_struct.h>
+#include <pcl.h>
+
+#include "common.h"
+#include "config.h"
+#include "socket.h"
+
+// what port to listen on
+#define LISTEN_PORT 10559
+
+// stack size to use for coroutines
+#define CLIENT_STACK_SIZE 128 * 1024
+#define PIPELINE_STACK_SIZE 128 * 1024
+
+// this should be equal to the system page size
+#define PIPELINE_CHUNKSIZE 4096
+
+struct co_connect_info {
+ // what co this is running as
+ coroutine_t co;
+
+ // set to the value of errno in case of an error, zero otherwise
+ int error;
+};
+
+static void _co_connect_write (evutil_socket_t fd, short what, void *arg) {
+ struct co_connect_info *ctx = arg;
+
+ // retreive the error code
+ // XXX: this function was lost in a hg-fuckup
+ if (socket_connect_error(fd, &ctx->error))
+ ctx->error = -1; // mark it as failed
+
+ // resume the co
+ co_call(ctx->co);
+}
+
+// coroutine-connect
+int co_connect (struct config_endpoint *endpoint, int sock_type, int *sock) {
+ // our state
+ struct co_connect_info ctx;
+
+ // the event for the connect
+ struct event ev;
+
+ // clean it
+ memset(&ctx, 0, sizeof(ctx));
+
+ // initiate the connect()
+ if ((*sock = socket_connect_async(endpoint, sock_type)) == -1)
+ goto error;
+
+ // set up the event
+ event_set(&ev, *sock, EV_WRITE, &_co_connect_write, &ctx);
+
+ if (event_add(&ev, NULL))
+ goto error;
+
+ // store our co handle in there
+ ctx.co = co_current();
+
+ // wait for the result
+ co_resume();
+
+ // return the error code
+ return ctx.error;
+
+error:
+ return -1;
+}
+
+struct co_splice_info {
+ int fd_in, fd_out;
+ size_t len;
+
+ short wait;
+
+ struct event ev_read, ev_write;
+
+ int error;
+
+ coroutine_t co;
+};
+
+void _co_splice_ev (int fd, short what, void *arg) {
+ struct co_splice_info *ctx = arg;
+
+ ctx->wait &= ~what;
+
+ if (!ctx->wait) {
+ // both ends are readable/writeable!
+
+ long ret;
+
+ if ((ret = splice(ctx->fd_in, NULL, ctx->fd_out, NULL, ctx->len, SPLICE_F_MOVE | SPLICE_F_NONBLOCK)) == -1) {
+ if (errno == EAGAIN) {
+ ctx->error = 0;
+ ctx->len = 0;
+
+ } else {
+ PERROR("splice");
+ }
+ } else {
+ ctx->error = 0;
+ ctx->len = ret;
+ }
+
+ // let the co do it's thing
+ co_call(ctx->co);
+
+ }
+
+
+ // ok, done succesfully
+ return;
+
+error:
+ // let the co handle this error
+ ctx->error = errno;
+ co_call(ctx->co);
+}
+
+// coroutine-splice
+int co_splice (int fd_in, int fd_out, size_t *len, int *error) {
+ // first, try and splice directly
+ // XXX: ensure all sockets are O_NONBLOCK!
+ long ret;
+
+ if ((ret = splice(fd_in, NULL, fd_out, NULL, *len, SPLICE_F_MOVE | SPLICE_F_NONBLOCK)) == -1) {
+ if (errno == EAGAIN) {
+ // wait for in/out to become readable/writeable
+
+ // our state
+ ZINIT(struct co_splice_info, ctx);
+
+ ctx.fd_in = fd_in;
+ ctx.fd_out = fd_out;
+ ctx.len = *len;
+ ctx.wait = EV_READ | EV_WRITE;
+ ctx.co = co_current();
+
+ // two events
+ event_set(&ctx.ev_read, fd_in, EV_READ, &_co_splice_ev, &ctx);
+ event_set(&ctx.ev_write, fd_out, EV_WRITE, &_co_splice_ev, &ctx);
+
+ // add both
+ if (
+ event_add(&ctx.ev_read, NULL)
+ || event_add(&ctx.ev_write, NULL)
+ )
+ ERROR("event_add"); // XXX: cleanup
+
+ // and wait...
+ co_resume();
+
+ // error
+ if (ctx.error) {
+ *error = ctx.error;
+
+ return -1;
+
+ } else {
+ *len = ctx.len;
+
+ return 0;
+ }
+ } else
+ PERROR("splice");
+ } else {
+ // success!
+ *len = ret;
+
+ return 0;
+ }
+
+error:
+ return -1;
+}
+
+/*
+struct co_pipeline_info {
+ int sock_read, sock_write;
+
+ int pipe_read, pipe_write;
+
+ / *
+ * Two coroutines:
+ * * co_write waits for co_read to indicate that there is enough data in the pipe to send, whereupon it will
+ * attempt to splice data from pipe_read to sock_write.
+ * * co_read waits for data to become available on sock_src, splices it into pipe_write, and notifies co_write
+ * that there is data avilable for sending.
+ * /
+ coroutine_t co_read, co_write;
+
+ char co_read_stack[PIPELINE_STACK_SIZE], co_write_stack[PIPELINE_STACK_SIZE];
+};
+
+void _co_pipeline_read (void *arg) {
+
+}
+
+void _co_pipeline_write (void *arg) {
+
+}
+*/
+
+int co_pipeline (int sock_read, int sock_write) {
+// ZINIT(struct co_pipeline_info, ctx);
+
+ // create our pipe
+ int pipefds[2];
+
+ if (pipe(pipefds))
+ PERROR("pipe");
+
+ // copy over the fds into ctx
+ int pipe_read = pipefds[0];
+ int pipe_write = pipefds[1];
+
+ // start shuffling data around
+ size_t pipe_size = 0, read_len, write_len;
+ int error = 0;
+
+ while (1) {
+ read_len = PIPELINE_CHUNKSIZE;
+
+ if (co_splice(sock_read, pipe_write, &read_len, &error))
+ PERROR("co_splice(sock_read, pipe_write, %zu)", read_len);
+
+ if (!read_len) {
+ // EOF
+ break;
+ }
+
+ pipe_size += read_len;
+
+ if (!pipe_size)
+ continue;
+
+ while (pipe_size > 0) {
+ write_len = pipe_size;
+
+ if (co_splice(pipe_read, sock_write, &write_len, &error))
+ PERROR("co_splice(pipe_read, sock_write, %zu", write_len);
+
+ pipe_size -= write_len;
+ }
+ }
+
+/*
+ // create the two coros
+ if (
+ (ctx.co_read = co_create(&_co_pipeline_read, &ctx, ctx.co_read_stack, PIPELINE_STACK_SIZE)) == NULL
+ || (ctx.co_write = co_create(&_co_pipeline_write, &ctx, ctx.co_write_stack, PIPELINE_STACK_SIZE)) == NULL
+ )
+ ERROR("co_create");
+
+ // first start off the read one, which then eventually calls the write one
+ co_call(ctx.co_read);
+*/
+
+ // success
+ return 0;
+
+error:
+/*
+ if (ctx.co_read)
+ co_delete(ctx.co_read);
+
+ if (ctx.co_write)
+ co_delete(ctx.co_write);
+*/
+ return -1;
+}
+
+/*
+ * State needed to handle a client
+ */
+struct client_info {
+ // the client socket
+ evutil_socket_t socket;
+
+ // the connect target endpoint
+ struct config_endpoint *connect_target;
+
+ // the coroutine that handles it
+ coroutine_t co;
+};
+
+void client_co (void *arg) {
+ struct client_info *ctx = arg;
+
+ // the outbound socket
+ int conn_sock = -1;
+
+ // connect
+ if (co_connect(ctx->connect_target, SOCK_STREAM, &conn_sock))
+ goto error;
+
+ // pipe data around \o/
+ if (co_pipeline(ctx->socket, conn_sock))
+ goto error;
+
+error:
+ // close the sockets
+ if (conn_sock != -1)
+ if (close(conn_sock))
+ PWARNING("close connect_socket");
+
+ if (close(ctx->socket))
+ PWARNING("close listen_socket");
+
+ // free the ctx
+ free(ctx);
+
+ // exit
+ co_exit();
+}
+
+static void handle_accept (evutil_socket_t fd, short event, void *arg) {
+ // the new client's client_info
+ struct client_info *ctx = NULL;
+
+ // the socket from accept()
+ evutil_socket_t socket = -1;
+
+ // not used
+ struct sockaddr_storage addr;
+ socklen_t addr_len;
+
+ // arg is NULL and unused
+ (void) arg;
+
+ // accept the connection
+ addr_len = sizeof(struct sockaddr_storage);
+
+ if ((socket = accept(fd, (struct sockaddr *) &addr, &addr_len)) == -1)
+ PERROR("accept");
+
+ // alloc a new client_info
+ if (!(ctx = calloc(1, sizeof(*ctx))))
+ ERROR("calloc");
+
+ // store the socket
+ ctx->socket = socket;
+
+ // and the endpoint
+ ctx->connect_target = arg;
+
+ // create the coroutine
+ if ((ctx->co = co_create(&client_co, ctx, NULL, CLIENT_STACK_SIZE)) == NULL)
+ ERROR("co_create");
+
+ // we can start up the coroutine right away
+ co_call(ctx->co);
+
+ // done handling this accept
+ return;
+
+error:
+ if (ctx) {
+ if (ctx->co)
+ co_delete(ctx->co);
+
+ free(ctx);
+ }
+
+ else if (socket != -1)
+ close(socket);
+}
+
+
+int main (int argc, char **argv) {
+ // libevent init
+ struct event_base *ev_base = event_init();
+
+ if (!ev_base)
+ FATAL("event_init");
+
+ // process command-line arguments
+ int opt;
+ const char *listen_spec = NULL, *connect_spec = NULL;
+
+ while ((opt = getopt(argc, argv, "hl:c:")) != -1) {
+ switch (opt) {
+ case 'l':
+ if (listen_spec)
+ FATAL("only specify -l once");
+
+ listen_spec = optarg;
+
+ break;
+
+ case 'c':
+ if (connect_spec)
+ FATAL("only specify -c once");
+
+ connect_spec = optarg;
+
+ break;
+
+ case 'h':
+ default:
+ err_exit("Usage: %s [-h] [-l listen] -c <connect>", argv[0]);
+
+ }
+ }
+
+ if (!connect_spec)
+ err_exit("Must specify -c; see `%s -h`", argv[0]);
+
+ // ignore SIGPIPE
+ struct sigaction sigpipe;
+ memset(&sigpipe, 0, sizeof(sigpipe));
+
+ sigpipe.sa_handler = SIG_IGN;
+
+ sigaction(SIGPIPE, &sigpipe, NULL);
+
+ // endpoints
+ struct config_endpoint listen_endpoint, connect_endpoint;
+
+ endpoint_init(&listen_endpoint, LISTEN_PORT);
+ endpoint_init(&connect_endpoint, 0);
+
+ if (
+ endpoint_parse(&listen_endpoint, listen_spec)
+ || endpoint_parse(&connect_endpoint, connect_spec)
+ )
+ goto error;
+
+ // the listen socket
+ int listen_sock;
+
+ if ((listen_sock = socket_listen(&listen_endpoint, SOCK_STREAM)) == -1)
+ goto error;
+
+ // create the listen event
+ struct event listen_ev;
+
+ event_set(&listen_ev, listen_sock, EV_READ | EV_PERSIST, &handle_accept, &connect_endpoint);
+
+ if (event_add(&listen_ev, NULL))
+ PERROR("event_add");
+
+ // we shall now run
+ INFO("run");
+
+ // run the libevent mainloop
+ if (event_base_dispatch(ev_base))
+ WARNING("event_dispatch");
+
+ INFO("shutdown");
+
+ // cleanup
+ event_base_free(ev_base);
+ close(listen_sock);
+
+ return EXIT_SUCCESS;
+
+error:
+ return EXIT_FAILURE;
+}
+