// splice
#define _GNU_SOURCE
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
#include <signal.h>
#include <errno.h>
#include <event2/util.h>
#include <event2/event.h>
#include <event2/event_compat.h>
#include <event2/event_struct.h>
#include <pcl.h>
#include "common.h"
#include "config.h"
#include "socket.h"
// what port to listen on
#define LISTEN_PORT 10559
// stack size to use for coroutines
#define CLIENT_STACK_SIZE 128 * 1024
#define PIPELINE_STACK_SIZE 128 * 1024
// this should be equal to the system page size
#define PIPELINE_CHUNKSIZE 4096
struct co_connect_info {
// what co this is running as
coroutine_t co;
// set to the value of errno in case of an error, zero otherwise
int error;
};
static void _co_connect_write (evutil_socket_t fd, short what, void *arg) {
struct co_connect_info *ctx = arg;
// retreive the error code
// XXX: this function was lost in a hg-fuckup
if (socket_connect_error(fd, &ctx->error))
ctx->error = -1; // mark it as failed
// resume the co
co_call(ctx->co);
}
// coroutine-connect
int co_connect (struct config_endpoint *endpoint, int sock_type, int *sock) {
// our state
struct co_connect_info ctx;
// the event for the connect
struct event ev;
// clean it
memset(&ctx, 0, sizeof(ctx));
// initiate the connect()
if ((*sock = socket_connect_async(endpoint, sock_type)) == -1)
goto error;
// set up the event
event_set(&ev, *sock, EV_WRITE, &_co_connect_write, &ctx);
if (event_add(&ev, NULL))
goto error;
// store our co handle in there
ctx.co = co_current();
// wait for the result
co_resume();
// return the error code
return ctx.error;
error:
return -1;
}
struct co_splice_info {
int fd_in, fd_out;
size_t len;
short wait;
struct event ev_read, ev_write;
int error;
coroutine_t co;
};
void _co_splice_ev (int fd, short what, void *arg) {
struct co_splice_info *ctx = arg;
ctx->wait &= ~what;
if (!ctx->wait) {
// both ends are readable/writeable!
long ret;
if ((ret = splice(ctx->fd_in, NULL, ctx->fd_out, NULL, ctx->len, SPLICE_F_MOVE | SPLICE_F_NONBLOCK)) == -1) {
if (errno == EAGAIN) {
ctx->error = 0;
ctx->len = 0;
} else {
PERROR("splice");
}
} else {
ctx->error = 0;
ctx->len = ret;
}
// let the co do it's thing
co_call(ctx->co);
}
// ok, done succesfully
return;
error:
// let the co handle this error
ctx->error = errno;
co_call(ctx->co);
}
// coroutine-splice
int co_splice (int fd_in, int fd_out, size_t *len, int *error) {
// first, try and splice directly
// XXX: ensure all sockets are O_NONBLOCK!
long ret;
if ((ret = splice(fd_in, NULL, fd_out, NULL, *len, SPLICE_F_MOVE | SPLICE_F_NONBLOCK)) == -1) {
if (errno == EAGAIN) {
// wait for in/out to become readable/writeable
// our state
ZINIT(struct co_splice_info, ctx);
ctx.fd_in = fd_in;
ctx.fd_out = fd_out;
ctx.len = *len;
ctx.wait = EV_READ | EV_WRITE;
ctx.co = co_current();
// two events
event_set(&ctx.ev_read, fd_in, EV_READ, &_co_splice_ev, &ctx);
event_set(&ctx.ev_write, fd_out, EV_WRITE, &_co_splice_ev, &ctx);
// add both
if (
event_add(&ctx.ev_read, NULL)
|| event_add(&ctx.ev_write, NULL)
)
ERROR("event_add"); // XXX: cleanup
// and wait...
co_resume();
// error
if (ctx.error) {
*error = ctx.error;
return -1;
} else {
*len = ctx.len;
return 0;
}
} else
PERROR("splice");
} else {
// success!
*len = ret;
return 0;
}
error:
return -1;
}
/*
struct co_pipeline_info {
int sock_read, sock_write;
int pipe_read, pipe_write;
/ *
* Two coroutines:
* * co_write waits for co_read to indicate that there is enough data in the pipe to send, whereupon it will
* attempt to splice data from pipe_read to sock_write.
* * co_read waits for data to become available on sock_src, splices it into pipe_write, and notifies co_write
* that there is data avilable for sending.
* /
coroutine_t co_read, co_write;
char co_read_stack[PIPELINE_STACK_SIZE], co_write_stack[PIPELINE_STACK_SIZE];
};
void _co_pipeline_read (void *arg) {
}
void _co_pipeline_write (void *arg) {
}
*/
int co_pipeline (int sock_read, int sock_write) {
// ZINIT(struct co_pipeline_info, ctx);
// create our pipe
int pipefds[2];
if (pipe(pipefds))
PERROR("pipe");
// copy over the fds into ctx
int pipe_read = pipefds[0];
int pipe_write = pipefds[1];
// start shuffling data around
size_t pipe_size = 0, read_len, write_len;
int error = 0;
while (1) {
read_len = PIPELINE_CHUNKSIZE;
if (co_splice(sock_read, pipe_write, &read_len, &error))
PERROR("co_splice(sock_read, pipe_write, %zu)", read_len);
if (!read_len) {
// EOF
break;
}
pipe_size += read_len;
if (!pipe_size)
continue;
while (pipe_size > 0) {
write_len = pipe_size;
if (co_splice(pipe_read, sock_write, &write_len, &error))
PERROR("co_splice(pipe_read, sock_write, %zu", write_len);
pipe_size -= write_len;
}
}
/*
// create the two coros
if (
(ctx.co_read = co_create(&_co_pipeline_read, &ctx, ctx.co_read_stack, PIPELINE_STACK_SIZE)) == NULL
|| (ctx.co_write = co_create(&_co_pipeline_write, &ctx, ctx.co_write_stack, PIPELINE_STACK_SIZE)) == NULL
)
ERROR("co_create");
// first start off the read one, which then eventually calls the write one
co_call(ctx.co_read);
*/
// success
return 0;
error:
/*
if (ctx.co_read)
co_delete(ctx.co_read);
if (ctx.co_write)
co_delete(ctx.co_write);
*/
return -1;
}
/*
* State needed to handle a client
*/
struct client_info {
// the client socket
evutil_socket_t socket;
// the connect target endpoint
struct config_endpoint *connect_target;
// the coroutine that handles it
coroutine_t co;
};
void client_co (void *arg) {
struct client_info *ctx = arg;
// the outbound socket
int conn_sock = -1;
// connect
if (co_connect(ctx->connect_target, SOCK_STREAM, &conn_sock))
goto error;
// pipe data around \o/
if (co_pipeline(ctx->socket, conn_sock))
goto error;
error:
// close the sockets
if (conn_sock != -1)
if (close(conn_sock))
PWARNING("close connect_socket");
if (close(ctx->socket))
PWARNING("close listen_socket");
// free the ctx
free(ctx);
// exit
co_exit();
}
static void handle_accept (evutil_socket_t fd, short event, void *arg) {
// the new client's client_info
struct client_info *ctx = NULL;
// the socket from accept()
evutil_socket_t socket = -1;
// not used
struct sockaddr_storage addr;
socklen_t addr_len;
// arg is NULL and unused
(void) arg;
// accept the connection
addr_len = sizeof(struct sockaddr_storage);
if ((socket = accept(fd, (struct sockaddr *) &addr, &addr_len)) == -1)
PERROR("accept");
// alloc a new client_info
if (!(ctx = calloc(1, sizeof(*ctx))))
ERROR("calloc");
// store the socket
ctx->socket = socket;
// and the endpoint
ctx->connect_target = arg;
// create the coroutine
if ((ctx->co = co_create(&client_co, ctx, NULL, CLIENT_STACK_SIZE)) == NULL)
ERROR("co_create");
// we can start up the coroutine right away
co_call(ctx->co);
// done handling this accept
return;
error:
if (ctx) {
if (ctx->co)
co_delete(ctx->co);
free(ctx);
}
else if (socket != -1)
close(socket);
}
int main (int argc, char **argv) {
// libevent init
struct event_base *ev_base = event_init();
if (!ev_base)
FATAL("event_init");
// process command-line arguments
int opt;
const char *listen_spec = NULL, *connect_spec = NULL;
while ((opt = getopt(argc, argv, "hl:c:")) != -1) {
switch (opt) {
case 'l':
if (listen_spec)
FATAL("only specify -l once");
listen_spec = optarg;
break;
case 'c':
if (connect_spec)
FATAL("only specify -c once");
connect_spec = optarg;
break;
case 'h':
default:
err_exit("Usage: %s [-h] [-l listen] -c <connect>", argv[0]);
}
}
if (!connect_spec)
err_exit("Must specify -c; see `%s -h`", argv[0]);
// ignore SIGPIPE
struct sigaction sigpipe;
memset(&sigpipe, 0, sizeof(sigpipe));
sigpipe.sa_handler = SIG_IGN;
sigaction(SIGPIPE, &sigpipe, NULL);
// endpoints
struct config_endpoint listen_endpoint, connect_endpoint;
endpoint_init(&listen_endpoint, LISTEN_PORT);
endpoint_init(&connect_endpoint, 0);
if (
endpoint_parse(&listen_endpoint, listen_spec)
|| endpoint_parse(&connect_endpoint, connect_spec)
)
goto error;
// the listen socket
int listen_sock;
if ((listen_sock = socket_listen(&listen_endpoint, SOCK_STREAM)) == -1)
goto error;
// create the listen event
struct event listen_ev;
event_set(&listen_ev, listen_sock, EV_READ | EV_PERSIST, &handle_accept, &connect_endpoint);
if (event_add(&listen_ev, NULL))
PERROR("event_add");
// we shall now run
INFO("run");
// run the libevent mainloop
if (event_base_dispatch(ev_base))
WARNING("event_dispatch");
INFO("shutdown");
// cleanup
event_base_free(ev_base);
close(listen_sock);
return EXIT_SUCCESS;
error:
return EXIT_FAILURE;
}