--- a/node_main.c Tue Jun 17 19:08:05 2008 +0300
+++ b/node_main.c Thu Jun 26 01:32:56 2008 +0300
@@ -7,16 +7,19 @@
#include <string.h>
#include <unistd.h>
+#include <assert.h>
+
+#include <event2/event.h>
+#include <event2/event_struct.h>
+#include <event2/event_compat.h>
+#include <event2/bufferevent.h>
+
#include "common.h"
#include "render.h"
-#include "render_remote.h" // for RENDER_PORT_NAME
-#include "render_local.h"
-#include "render_threads.h"
-
-void sigpipe_handler (int signal) {
- /* ignore */
- fprintf(stderr, "SIGPIPE\n");
-}
+#include "render_struct.h"
+#include "render_net.h"
+#include "render_thread.h"
+#include "render_thread_struct.h"
void sigpipe_ignore () {
struct sigaction sigpipe_action;
@@ -28,123 +31,211 @@
perr_exit("sigaction");
}
-int my_fread(FILE *fh, void *ptr, size_t size) {
- int ret = fread(ptr, size, 1, fh);
+/*
+ * State needed to handle a client
+ */
+struct client_info {
+ // the client socket
+ evutil_socket_t socket;
- if (ret == 0) {
- error("EOF");
- return 0;
+ // the read-a-command buffer
+ struct bufferevent *bufev;
- } else if (ret != 1) {
- perror("fread");
- return 0;
+ // the write-a-mandelbrot stream
+ FILE *out_stream;
+
+ // the render_thread op
+ // thread_info.is_active is useful
+ struct render_thread thread_info;
+};
+
+static void client_free (struct client_info *ctx) {
+ // free the read-a-command buffer
+ if (ctx->bufev)
+ bufferevent_free(ctx->bufev);
+
+ // cancel the render thread if needed
+ if (ctx->thread_info.is_active)
+ render_thread_cancel(&ctx->thread_info);
+
+ // deinit it in any case
+ render_thread_deinit(&ctx->thread_info);
+
+ // close the write-a-mandelbrot stream, or just the socket
+ if (ctx->out_stream) {
+ if (fclose(ctx->out_stream))
+ PWARNING("fclose");
+
+ } else if (ctx->socket != -1) {
+ if (close(ctx->socket))
+ PWARNING("close");
+
}
- return 1;
-}
-
-int read_byte (FILE *fh, u_int8_t *byte) {
- return my_fread(fh, byte, sizeof(*byte));
-}
-
-int read_int (FILE *fh, u_int32_t *i) {
- if (!my_fread(fh, i, sizeof(*i)))
- return 0;
-
- *i = ntohl(*i);
-
- return 1;
-}
-
-int read_double (FILE *fh, double *dbl) {
- if (!my_fread(fh, dbl, sizeof(*dbl)))
- return 0;
-
- return 1;
+ // free the client info
+ free(ctx);
}
-void handle_client (int sock) {
- double duration;
- struct render *ctx = NULL;
- FILE *fh;
- u_int8_t mode;
- u_int32_t img_w, img_h;
- double x1, y1, x2, y2;
+static void handle_render_done (struct render_thread *thread_info, void *arg) {
+ struct client_info *ctx = arg;
- struct render_threads *threads_info = NULL;
-
- // open it as a FILE*
- if (!(fh = fdopen(sock, "r+")))
+ // just free it, it takes care of closing it as well
+ client_free(ctx);
+}
+
+static int handle_render_cmd (struct client_info *ctx, struct render_cmd *cmd) {
+ // the render ctx...
+ struct render render_info;
+
+ // open it as a normal FILE*
+ if (!(ctx->out_stream = fdopen(ctx->socket, "w")))
ERROR("fdopen");
-
- // read the parameters
+
+#if INFO_ENABLED
+ INFO("client [%p]: render [%ux%u] (%f, %f) -> (%f, %f)", ctx, cmd->img_w, cmd->img_h, cmd->x1, cmd->y1, cmd->x2, cmd->y2);
+#endif
+
+ // set up the render_info
if (
- !read_byte(fh, &mode)
- || !read_int(fh, &img_w)
- || !read_int(fh, &img_h)
- || !read_double(fh, &x1)
- || !read_double(fh, &y1)
- || !read_double(fh, &x2)
- || !read_double(fh, &y2)
+ render_init(&render_info)
+ || render_set_mode(&render_info, cmd->mode)
+ || render_set_size(&render_info, cmd->img_w, cmd->img_h)
+ || render_region_raw(&render_info, cmd->x1, cmd->y1, cmd->x2, cmd->y2)
+ || render_io_stream(&render_info, ctx->out_stream)
)
- ERROR("read_{byte,int,double}");
-
- printf("RENDER: [%ux%u] (%f, %f) -> (%f, %f): ... ", img_w, img_h, x1, y1, x2, y2);
- fflush(stdout);
-
- // set up the render_ctx
- if (!(ctx = render_alloc()))
- ERROR("render_alloc");
-
- if (render_set_mode(ctx, mode))
- ERROR("render_set_mode");
+ ERROR("render_*");
+
+ // start the render thread
+ if (render_thread_init(&ctx->thread_info, &render_info, &handle_render_done, ctx))
+ ERROR("render_thread_init");
- if (render_set_size(ctx, img_w, img_h))
- ERROR("render_set_size");
-
- if (render_region_raw(ctx, x1, y1, x2, y2))
- ERROR("render_region_raw");
+ // ok, wait for it to complete
+ return 0;
- if (render_io_stream(ctx, fh))
- ERROR("render_io_stream");
+error:
+ // FAAAIL
+ return -1;
+}
+
+static void handle_read (struct bufferevent *bev, void *arg) {
+ struct client_info *ctx = arg;
+ struct render_cmd cmd;
+
+ // meh, just read it in
+ size_t len;
+ // we set a watermark, so this should hold true
+ assert(len = bufferevent_read(bev, &cmd, sizeof(cmd)) == sizeof(cmd));
- // render threaded \o/
- if (!(threads_info = render_threads_alloc(ctx)))
+ // fix the byte order
+ cmd.img_w = ntohl(cmd.img_w);
+ cmd.img_h = ntohl(cmd.img_h);
+
+ // handle it
+ if (handle_render_cmd(ctx, &cmd))
goto error;
- if (render_threads_wait(threads_info))
- goto error;
+ // ok
+ return;
+
+error:
+ client_free(ctx);
+}
+
+static void handle_error (struct bufferevent *bev, short what, void *arg) {
+ struct client_info *ctx = arg;
- printf("done!\n");
-/*
- // render!
- if (render_local(ctx, &duration))
- ERROR("render_local");
+ // read-EOF
+ if ((what & (EVBUFFER_READ | EVBUFFER_EOF)) && ctx->thread_info.is_active) {
+ // this is fine, expected, and doesn't matter
+ return;
+ }
+
+ PWARNING("eventbuffer: %s %s",
+ (what & EVBUFFER_READ) ? "read" : ((what & EVBUFFER_WRITE) ? "write" : "???"),
+ (what & EVBUFFER_EOF) ? "eof" : ((what & EVBUFFER_ERROR) ? "error" : ((what & EVBUFFER_TIMEOUT) ? "timeout" : "???"))
+ );
- printf("time=%fs\n", duration);
-*/
+ client_free(ctx);
+}
+
+static void handle_accept (evutil_socket_t fd, short event, void *arg) {
+ struct client_info *ctx = NULL;
+
+ evutil_socket_t socket = -1;
+ struct sockaddr_storage addr;
+ socklen_t addr_len;
- // fall through to just clean up normally
+ // arg is NULL and unused
+ (void) arg;
+
+ // accept the connection
+ addr_len = sizeof(struct sockaddr_storage);
+
+ if ((socket = accept(fd, (struct sockaddr *) &addr, &addr_len)) == -1)
+ PERROR("accept");
+
+#if INFO_ENABLED
+ assert(INET_ADDRSTRLEN < INET6_ADDRSTRLEN);
+
+ char addr_buf[INET6_ADDRSTRLEN];
+ const char *addr_str;
+ short nport;
+
+ if (addr.ss_family == AF_UNIX)
+ addr_str = "local";
+ else if (addr.ss_family == AF_INET || addr.ss_family == AF_INET6) {
+ const void *src;
+
+ if (addr.ss_family == AF_INET) {
+ src = &(((struct sockaddr_in *) &addr)->sin_addr);
+ nport = ((struct sockaddr_in *) &addr)->sin_port;
+ } else {
+ src = &(((struct sockaddr_in6 *) &addr)->sin6_addr);
+ nport = ((struct sockaddr_in6 *) &addr)->sin6_port;
+ }
+
+ if (!(inet_ntop(addr.ss_family, src, addr_buf, sizeof(addr_buf))))
+ PERROR("inet_ntop");
+
+ addr_str = addr_buf;
+ }
+
+ INFO("ACCEPT: %s:%hu\n", addr_str, ntohs(nport));
+
+#endif
+
+ // alloc a new client_info
+ if (!(ctx = calloc(1, sizeof(*ctx))))
+ ERROR("calloc");
+
+ // store the socket
+ ctx->socket = socket;
+
+ // then a bufferevent so that we can read in the command
+ if (!(ctx->bufev = bufferevent_new(ctx->socket, &handle_read, NULL, &handle_error, ctx)))
+ ERROR("bufferevent_new");
+
+ // and enable it for read only
+ if (bufferevent_enable(ctx->bufev, EV_READ))
+ ERROR("bufferevent_enable");
+
+ // set the watermark for receiving the render_cmd
+ bufferevent_setwatermark(ctx->bufev, EV_READ, sizeof(struct render_cmd), 0);
+
+ // now we just wait for the cmd...
error:
if (ctx)
- render_free(ctx);
-
- if (threads_info)
- render_threads_free(threads_info);
-
- // close the FILE* and socket
- fclose(fh);
-
- return;
+ client_free(ctx);
+ else if (socket != -1)
+ close(socket);
}
-
int main (int argc, char** argv) {
- int ssock, sock;
+ struct event_base *ev_base;
+ int ssock;
struct sockaddr_in addr;
- socklen_t addr_len;
// parse arguments
int opt;
@@ -171,6 +262,10 @@
if (!(port = atoi(port_name)))
ERROR("invalid port: %s", port_name);
+
+ // init libevent
+ if (!(ev_base = event_init()))
+ FATAL("event_init");
// create the socket
if ((ssock = socket(PF_INET, SOCK_STREAM, 0)) == -1)
@@ -186,26 +281,32 @@
if (listen(ssock, 1) == -1)
PERROR("listen");
+ // create the listen event
+ struct event listen_ev;
+
+ event_set(&listen_ev, ssock, EV_READ, &handle_accept, NULL);
+
+ if (event_add(&listen_ev, NULL))
+ PERROR("event_add");
+
// ignore sigpipe
sigpipe_ignore();
- // main accept loop
+ // run the libevent mainloop
printf("RUN: %s:%hu\n", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
-
- while (1) {
- addr_len = sizeof(struct sockaddr_in);
+
+ if (event_base_dispatch(ev_base))
+ WARNING("event_dispatch");
- // accept a new client
- if ((sock = accept(ssock, (struct sockaddr *) &addr, &addr_len)) == -1)
- PERROR("accept");
-
- printf("ACCEPT: %s:%hu\n", inet_ntoa(addr.sin_addr), addr.sin_port);
-
- // handle their resquest
- handle_client(sock);
- }
+ printf("SHUTDOWN\n");
+
+ event_base_free(ev_base);
+
+ // succesful exit
+ return EXIT_SUCCESS;
error:
- return 1;
+ // failure
+ return EXIT_FAILURE;
}