node_main.c
changeset 23 31307efd7e78
parent 21 e2916f8ebaa6
child 24 8307d28329ae
--- a/node_main.c	Tue Jun 17 19:08:05 2008 +0300
+++ b/node_main.c	Thu Jun 26 01:32:56 2008 +0300
@@ -7,16 +7,19 @@
 #include <string.h>
 #include <unistd.h>
 
+#include <assert.h>
+
+#include <event2/event.h>
+#include <event2/event_struct.h>
+#include <event2/event_compat.h>
+#include <event2/bufferevent.h>
+
 #include "common.h"
 #include "render.h"
-#include "render_remote.h"  // for RENDER_PORT_NAME
-#include "render_local.h"
-#include "render_threads.h"
-
-void sigpipe_handler (int signal) {
-    /* ignore */
-    fprintf(stderr, "SIGPIPE\n");
-}
+#include "render_struct.h"
+#include "render_net.h"
+#include "render_thread.h"
+#include "render_thread_struct.h"
 
 void sigpipe_ignore () {
     struct sigaction sigpipe_action;
@@ -28,123 +31,211 @@
         perr_exit("sigaction");
 }
 
-int my_fread(FILE *fh, void *ptr, size_t size) {
-    int ret = fread(ptr, size, 1, fh);
+/*
+ * State needed to handle a client
+ */
+struct client_info {
+    // the client socket
+    evutil_socket_t socket;
     
-    if (ret == 0) {
-        error("EOF");
-        return 0;
+    // the read-a-command buffer
+    struct bufferevent *bufev;
 
-    } else if (ret != 1) {
-        perror("fread");
-        return 0;
+    // the write-a-mandelbrot stream
+    FILE *out_stream;
+
+    // the render_thread op
+    // thread_info.is_active is useful
+    struct render_thread thread_info;
+};
+
+static void client_free (struct client_info *ctx) {
+    // free the read-a-command buffer
+    if (ctx->bufev)
+        bufferevent_free(ctx->bufev);
+    
+    // cancel the render thread if needed
+    if (ctx->thread_info.is_active)
+        render_thread_cancel(&ctx->thread_info);
+    
+    // deinit it in any case
+    render_thread_deinit(&ctx->thread_info);
+
+    // close the write-a-mandelbrot stream, or just the socket
+    if (ctx->out_stream) {
+        if (fclose(ctx->out_stream))
+            PWARNING("fclose");
+
+    } else if (ctx->socket != -1) {
+        if (close(ctx->socket))
+            PWARNING("close");
+
     }
 
-    return 1;
-}
-
-int read_byte (FILE *fh, u_int8_t *byte) {
-    return my_fread(fh, byte, sizeof(*byte));
-}
-
-int read_int (FILE *fh, u_int32_t *i) {
-    if (!my_fread(fh, i, sizeof(*i)))
-        return 0;
-
-    *i = ntohl(*i);
-
-    return 1;
-}
-
-int read_double (FILE *fh, double *dbl) {
-    if (!my_fread(fh, dbl, sizeof(*dbl)))
-        return 0;
-
-    return 1;
+    // free the client info
+    free(ctx);
 }
 
-void handle_client (int sock) {
-    double duration;
-    struct render *ctx = NULL;
-    FILE *fh;
-    u_int8_t mode;
-    u_int32_t img_w, img_h;
-    double x1, y1, x2, y2;
+static void handle_render_done (struct render_thread *thread_info, void *arg) {
+    struct client_info *ctx = arg;
 
-    struct render_threads *threads_info = NULL;
-    
-    // open it as a FILE*
-    if (!(fh = fdopen(sock, "r+")))
+    // just free it, it takes care of closing it as well
+    client_free(ctx);
+}
+
+static int handle_render_cmd (struct client_info *ctx, struct render_cmd *cmd) {
+    // the render ctx...
+    struct render render_info;
+
+    // open it as a normal FILE*
+    if (!(ctx->out_stream = fdopen(ctx->socket, "w")))
         ERROR("fdopen");
-    
-    // read the parameters
+
+#if INFO_ENABLED
+    INFO("client [%p]: render [%ux%u] (%f, %f) -> (%f, %f)", ctx, cmd->img_w, cmd->img_h, cmd->x1, cmd->y1, cmd->x2, cmd->y2);
+#endif    
+
+    // set up the render_info
     if (
-            !read_byte(fh, &mode)
-         || !read_int(fh, &img_w)
-         || !read_int(fh, &img_h)
-         || !read_double(fh, &x1)
-         || !read_double(fh, &y1)
-         || !read_double(fh, &x2)
-         || !read_double(fh, &y2)
+            render_init(&render_info)
+         || render_set_mode(&render_info, cmd->mode)
+         || render_set_size(&render_info, cmd->img_w, cmd->img_h)
+         || render_region_raw(&render_info, cmd->x1, cmd->y1, cmd->x2, cmd->y2)
+         || render_io_stream(&render_info, ctx->out_stream)
     )
-        ERROR("read_{byte,int,double}");
-
-    printf("RENDER: [%ux%u] (%f, %f) -> (%f, %f): ... ", img_w, img_h, x1, y1, x2, y2);
-    fflush(stdout);
-
-    // set up the render_ctx
-    if (!(ctx = render_alloc()))
-        ERROR("render_alloc");
-
-    if (render_set_mode(ctx, mode))
-        ERROR("render_set_mode");
+        ERROR("render_*");
+    
+    // start the render thread
+    if (render_thread_init(&ctx->thread_info, &render_info, &handle_render_done, ctx))
+        ERROR("render_thread_init");
 
-    if (render_set_size(ctx, img_w, img_h))
-        ERROR("render_set_size");
-
-    if (render_region_raw(ctx, x1, y1, x2, y2))
-        ERROR("render_region_raw");
+    // ok, wait for it to complete
+    return 0;
 
-    if (render_io_stream(ctx, fh))
-        ERROR("render_io_stream");
+error:
+    // FAAAIL
+    return -1;
+}
+
+static void handle_read (struct bufferevent *bev, void *arg) {
+    struct client_info *ctx = arg;
+    struct render_cmd cmd;
+
+    // meh, just read it in
+    size_t len;
     
+    // we set a watermark, so this should hold true
+    assert(len = bufferevent_read(bev, &cmd, sizeof(cmd)) == sizeof(cmd));
 
-    // render threaded \o/
-    if (!(threads_info = render_threads_alloc(ctx)))
+    // fix the byte order
+    cmd.img_w = ntohl(cmd.img_w);
+    cmd.img_h = ntohl(cmd.img_h);
+
+    // handle it
+    if (handle_render_cmd(ctx, &cmd))
         goto error;
     
-    if (render_threads_wait(threads_info))
-        goto error;
+    // ok
+    return;
+
+error:
+    client_free(ctx);
+}
+
+static void handle_error (struct bufferevent *bev, short what, void *arg) {
+    struct client_info *ctx = arg;
     
-    printf("done!\n");
-/*
-    // render!
-    if (render_local(ctx, &duration))
-        ERROR("render_local");
+    // read-EOF
+    if ((what & (EVBUFFER_READ | EVBUFFER_EOF)) && ctx->thread_info.is_active) {
+        // this is fine, expected, and doesn't matter
+        return;
+    }
+    
+    PWARNING("eventbuffer: %s %s",
+        (what & EVBUFFER_READ) ? "read" : ((what & EVBUFFER_WRITE) ? "write" : "???"),
+        (what & EVBUFFER_EOF) ? "eof" : ((what & EVBUFFER_ERROR) ? "error" : ((what & EVBUFFER_TIMEOUT) ? "timeout" : "???"))
+    );
 
-    printf("time=%fs\n", duration);
-*/
+    client_free(ctx);
+}
+
+static void handle_accept (evutil_socket_t fd, short event, void *arg) {
+    struct client_info *ctx = NULL;
+
+    evutil_socket_t socket = -1;
+    struct sockaddr_storage addr;
+    socklen_t addr_len;
     
-    // fall through to just clean up normally
+    // arg is NULL and unused
+    (void) arg;
+    
+    // accept the connection
+    addr_len = sizeof(struct sockaddr_storage);
+
+    if ((socket = accept(fd, (struct sockaddr *) &addr, &addr_len)) == -1)
+        PERROR("accept");
+    
+#if INFO_ENABLED
+    assert(INET_ADDRSTRLEN < INET6_ADDRSTRLEN);
+
+    char addr_buf[INET6_ADDRSTRLEN];
+    const char *addr_str;
+    short nport;
+
+    if (addr.ss_family == AF_UNIX)
+        addr_str = "local";
+    else if (addr.ss_family == AF_INET || addr.ss_family == AF_INET6) {
+        const void *src;
+        
+        if (addr.ss_family == AF_INET) {
+            src = &(((struct sockaddr_in *) &addr)->sin_addr);
+            nport = ((struct sockaddr_in *) &addr)->sin_port;
+        } else {
+            src = &(((struct sockaddr_in6 *) &addr)->sin6_addr);
+            nport = ((struct sockaddr_in6 *) &addr)->sin6_port;
+        }
+
+        if (!(inet_ntop(addr.ss_family, src, addr_buf, sizeof(addr_buf))))
+            PERROR("inet_ntop");
+
+        addr_str = addr_buf;
+    }
+
+    INFO("ACCEPT: %s:%hu\n", addr_str, ntohs(nport));
+
+#endif
+
+    // alloc a new client_info
+    if (!(ctx = calloc(1, sizeof(*ctx))))
+        ERROR("calloc");
+    
+    // store the socket
+    ctx->socket = socket;
+
+    // then a bufferevent so that we can read in the command
+    if (!(ctx->bufev = bufferevent_new(ctx->socket, &handle_read, NULL, &handle_error, ctx)))
+        ERROR("bufferevent_new");
+    
+    // and enable it for read only
+    if (bufferevent_enable(ctx->bufev, EV_READ))
+        ERROR("bufferevent_enable");
+    
+    // set the watermark for receiving the render_cmd
+    bufferevent_setwatermark(ctx->bufev, EV_READ, sizeof(struct render_cmd), 0);
+    
+    // now we just wait for the cmd...
 
 error:
     if (ctx)
-        render_free(ctx);
-
-    if (threads_info)
-        render_threads_free(threads_info);
-
-    // close the FILE* and socket
-    fclose(fh);
-    
-    return;
+        client_free(ctx);
+    else if (socket != -1)
+        close(socket);
 }
 
-
 int main (int argc, char** argv) {
-    int ssock, sock;
+    struct event_base *ev_base;
+    int ssock;
     struct sockaddr_in addr;
-    socklen_t addr_len;
     
     // parse arguments
     int opt;
@@ -171,6 +262,10 @@
 
     if (!(port = atoi(port_name)))
         ERROR("invalid port: %s", port_name);
+    
+    // init libevent
+    if (!(ev_base = event_init()))
+        FATAL("event_init");
 
     // create the socket
     if ((ssock = socket(PF_INET, SOCK_STREAM, 0)) == -1)
@@ -186,26 +281,32 @@
     if (listen(ssock, 1) == -1)
         PERROR("listen");
     
+    // create the listen event
+    struct event listen_ev;
+
+    event_set(&listen_ev, ssock, EV_READ, &handle_accept, NULL);
+
+    if (event_add(&listen_ev, NULL))
+        PERROR("event_add");
+    
     // ignore sigpipe
     sigpipe_ignore();
     
-    // main accept loop
+    // run the libevent mainloop
     printf("RUN: %s:%hu\n", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
-
-    while (1) {
-        addr_len = sizeof(struct sockaddr_in);
+ 
+    if (event_base_dispatch(ev_base))
+        WARNING("event_dispatch");
 
-        // accept a new client
-        if ((sock = accept(ssock, (struct sockaddr *) &addr, &addr_len)) == -1)
-            PERROR("accept");
-        
-        printf("ACCEPT: %s:%hu\n", inet_ntoa(addr.sin_addr), addr.sin_port);
-        
-        // handle their resquest
-        handle_client(sock);
-    }
+    printf("SHUTDOWN\n");
+    
+    event_base_free(ev_base);
+
+    // succesful exit
+    return EXIT_SUCCESS;
 
 error:
-    return 1;
+    // failure
+    return EXIT_FAILURE;
 }