req/write/push implemented
authorTero Marttila <terom@fixme.fi>
Fri, 08 Aug 2008 00:15:29 +0300
changeset 31 12d5361e7472
parent 30 33e464fd6773
child 32 1b09dad6757e
req/write/push implemented
Makefile
cache.h
cache/cache.c
cache/engine.h
cache/engines/fs.c
cache/op.c
cache/op.h
cache/req.c
cache/req.h
cache_engines.h
--- a/Makefile	Thu Aug 07 20:28:06 2008 +0300
+++ b/Makefile	Fri Aug 08 00:15:29 2008 +0300
@@ -1,38 +1,106 @@
-LDFLAGS = -Llib/libevent-dev/lib -levent -lpng -pthread
-CFLAGS = -Wall -g -Ilib/libevent-dev/include -std=gnu99
-
-EXECS = file_main web_main node_main
-
-all: web_main file_main node_main
-
-common.o: common.c common.h
-http.o: http.c http.h common.h
-config.o: config.c config.h
-socket.o: socket.c socket.h config.h common.h
+LIBEVENT_HEADERS = lib/libevent-dev/include
+LIBEVENT_LIBRARY = lib/libevent-dev/lib
+DEFINES = 
 
-remote_node.o: remote_node.c remote_node.h common.h render_net.h socket.h config.h
-remote_pool.o: remote_pool.c remote_pool.h common.h
-render.o: render.c render.h common.h
-render_remote.o: render_remote.c render_remote.h common.h socket.h remote_node.h remote_pool.h
-render_png.o: render_png.c render_png.h common.h
-render_raw.o: render_raw.c render_raw.h common.h
-render_local.o: render_local.c render_local.h common.h render_png.h render_raw.h render_mandelbrot.h
-render_multi.o: render_multi.c render_multi.h render_remote.o common.h
-render_mandelbrot.o: render_mandelbrot.c render_mandelbrot.h common.h
-render_slices.o: render_slices.c render_slices.h common.h
-render_threads.o: render_threads.c render_threads.h common.h
-render_thread.o: render_thread.c render_thread.h common.h render_local.h
-tile.o: tile.c tile.h render.h render_struct.h
-static.o: static.c static.h common.h
+LDFLAGS = -L${LIBEVENT_LIBRARY} -levent -lpng -pthread -lpcl
+CFLAGS = -Wall -g -I${LIBEVENT_HEADERS} ${DEFINES} -std=gnu99
 
-file_main.o: file_main.c
-node_main.o: node_main.c common.h render.h render_struct.h render_thread.h render_thread_struct.h render_net.h socket.h
-web_main.o: web_main.c common.h render.h render_struct.h remote_node.h remote_pool.h render_remote.h config.h
+SRCS = *.c cache/*.c cache/engines/*.c
+OBJS = *.o
+EXECS = file_main web_main node_main cache_test
+
+all: depend ${EXECS}
 
 file_main: file_main.o common.o render.o render_raw.o render_png.o render_local.o render_mandelbrot.o
 node_main: node_main.o common.o config.o socket.o render.o render_thread.o render_local.o render_png.o render_raw.o render_mandelbrot.o
-web_main: web_main.o common.o config.o socket.o http.o render.o render_png.o remote_node.o remote_pool.o render_remote.o render_multi.o render_slices.o tile.o static.o
+web_main: web_main.o common.o config.o socket.o http.o render.o remote_node.o remote_pool.o render_remote.o tile.o static.o
+coro_test: coro_test.o common.o config.o socket.o
+cache_test: cache_test.o common.o cache/cache.o cache/req.o cache/op.o cache/engines/fs.o
 
 clean :
-	rm *.o ${EXECS}
+	-rm *.o ${EXECS}
 
+depend:
+	makedepend -Y -- $(CFLAGS) -- $(SRCS) 2> /dev/null
+
+# DO NOT DELETE
+
+cache_test.o: cache.h cache_engines.h common.h
+common.o: common.h
+config.o: config.h common.h
+coro_test.o: lib/libevent-dev/include/event2/util.h
+coro_test.o: lib/libevent-dev/include/event-config.h
+coro_test.o: lib/libevent-dev/include/event2/event.h
+coro_test.o: lib/libevent-dev/include/event2/event_compat.h
+coro_test.o: lib/libevent-dev/include/event2/event_struct.h common.h config.h
+coro_test.o: socket.h
+file_main.o: common.h render.h render_local.h render_png_struct.h
+file_main.o: render_png.h render_raw_struct.h render_raw.h
+http.o: lib/libevent-dev/include/event2/event_struct.h
+http.o: lib/libevent-dev/include/event-config.h
+http.o: lib/libevent-dev/include/event2/util.h http.h
+http.o: lib/libevent-dev/include/event2/http.h common.h
+node_main.o: lib/libevent-dev/include/event2/event.h
+node_main.o: lib/libevent-dev/include/event-config.h
+node_main.o: lib/libevent-dev/include/event2/util.h
+node_main.o: lib/libevent-dev/include/event2/event_struct.h
+node_main.o: lib/libevent-dev/include/event2/event_compat.h
+node_main.o: lib/libevent-dev/include/event2/bufferevent.h common.h socket.h
+node_main.o: config.h render.h render_struct.h render_net.h render_thread.h
+node_main.o: render_thread_struct.h
+remote_node.o: remote_node.h config.h common.h render_net.h socket.h
+remote_pool.o: remote_pool.h remote_node.h config.h common.h
+render.o: render_struct.h render.h
+render_local.o: common.h render_struct.h render.h render_local.h
+render_local.o: render_png_struct.h render_png.h render_raw_struct.h
+render_local.o: render_raw.h render_mandelbrot.h
+render_mandelbrot.o: common.h render_struct.h render.h render_mandelbrot.h
+render_multi.o: common.h render_struct.h render.h render_multi.h
+render_multi.o: lib/libevent-dev/include/event2/util.h
+render_multi.o: lib/libevent-dev/include/event-config.h
+render_multi.o: lib/libevent-dev/include/event2/buffer.h remote_pool.h
+render_multi.o: remote_node.h config.h render_remote.h render_slices_struct.h
+render_multi.o: render_png_struct.h render_png.h render_slices.h
+render_png.o: common.h render_struct.h render.h render_png_struct.h
+render_png.o: render_png.h
+render_raw.o: common.h render_struct.h render.h render_raw_struct.h
+render_raw.o: render_raw.h
+render_remote.o: lib/libevent-dev/include/event2/event.h
+render_remote.o: lib/libevent-dev/include/event-config.h
+render_remote.o: lib/libevent-dev/include/event2/util.h
+render_remote.o: lib/libevent-dev/include/event2/event_struct.h
+render_remote.o: lib/libevent-dev/include/event2/bufferevent.h
+render_remote.o: render_remote.h lib/libevent-dev/include/event2/buffer.h
+render_remote.o: render.h remote_pool.h remote_node.h config.h common.h
+render_remote.o: render_struct.h render_net.h socket.h
+render_slices.o: common.h render_struct.h render.h render_slices_struct.h
+render_slices.o: render_png_struct.h render_png.h render_slices.h
+render_thread.o: lib/libevent-dev/include/event2/event.h
+render_thread.o: lib/libevent-dev/include/event-config.h
+render_thread.o: lib/libevent-dev/include/event2/util.h common.h
+render_thread.o: render_thread.h render.h render_thread_struct.h
+render_thread.o: lib/libevent-dev/include/event2/event_struct.h
+render_thread.o: render_struct.h render_local.h render_png_struct.h
+render_thread.o: render_png.h render_raw_struct.h render_raw.h
+render_threads.o: common.h render_threads.h render.h render_slices_struct.h
+render_threads.o: render_struct.h render_png_struct.h render_png.h
+render_threads.o: render_slices.h render_mandelbrot.h
+socket.o: socket.h config.h common.h
+static.o: lib/libevent-dev/include/event2/http.h
+static.o: lib/libevent-dev/include/event2/util.h
+static.o: lib/libevent-dev/include/event-config.h
+static.o: lib/libevent-dev/include/event2/buffer.h static.h common.h
+tile.o: tile.h render.h render_struct.h
+web_main.o: lib/libevent-dev/include/event2/event.h
+web_main.o: lib/libevent-dev/include/event-config.h
+web_main.o: lib/libevent-dev/include/event2/util.h
+web_main.o: lib/libevent-dev/include/event2/event_compat.h
+web_main.o: lib/libevent-dev/include/event2/http.h
+web_main.o: lib/libevent-dev/include/event2/event_struct.h common.h http.h
+web_main.o: render_struct.h render.h remote_node.h config.h remote_pool.h
+web_main.o: render_remote.h lib/libevent-dev/include/event2/buffer.h tile.h
+web_main.o: static.h
+cache/cache.o: cache.h cache.h cache/engine.h
+cache/op.o: cache.h cache.h cache/op.h cache/req.h cache/engine.h common.h
+cache/req.o: cache.h cache.h cache/req.h cache/op.h cache/engine.h common.h
+cache/engines/fs.o: cache.h cache.h cache/engine.h cache/op.h common.h
--- a/cache.h	Thu Aug 07 20:28:06 2008 +0300
+++ b/cache.h	Fri Aug 08 00:15:29 2008 +0300
@@ -1,6 +1,8 @@
 #ifndef CACHE_H
 #define CACHE_H
 
+#include <sys/types.h>
+
 /*
  * The interface to the internal caching mechanism.
  *
@@ -29,8 +31,8 @@
  * What we use as keys in the cache. Key is a pointer to an arbitrary char buffer, length is the size of the key
  * in bytes. If this is given as zero, it will be calcuated using strlen(). Zero-length keys are invalid.
  */
-struct cache_key_t {
-    const char *buf;
+struct cache_key {
+    char *buf;
     size_t length;
 };
 
@@ -41,9 +43,8 @@
     CACHE_STATE_INVALID,
 
     CACHE_STATE_LOOKUP,
-    
-    CACHE_STATE_OPEN,
-
+    CACHE_STATE_WRITE_BEGIN,
+    CACHE_STATE_READ_BEGIN,
     CACHE_STATE_WRITE,
     CACHE_STATE_WRITE_PAUSE,
     CACHE_STATE_READ,
@@ -53,13 +54,14 @@
 };
 
 /*
+/ *
  * Transitions between states
- */
+ * /
 enum cache_req_event {
     // LOOKUP -> OPEN
     CACHE_EVENT_HIT,
     CACHE_EVENT_MISS,
-    
+
     // OPEN -> WRITE
     CACHE_EVENT_BEGIN_WRITE,
     
@@ -81,11 +83,13 @@
     // * -> ERROR
     CACHE_EVENT_ERROR,
 };
+*/
 
 /*
  * The callback used for cache_reqs
  */
-typedef (int) (*cache_callback) (struct cache_req *, enum cache_req_event, void *arg);
+//typedef int (*cache_callback) (struct cache_req *, enum cache_req_event, void *arg);
+typedef int (*cache_callback) (struct cache_req *, void *arg);
 
 
 
@@ -101,12 +105,17 @@
  * Create a new request. The given callback function will be called at the various stages in the request, and can then
  * drive the request forward.
  */
-struct cache_req *cache_request (struct cache *cache, struct cache_key *key, cache_callback cb_func, void *cb_data);
+struct cache_req *cache_req (struct cache *cache, const struct cache_key *key, cache_callback cb_func, void *cb_data);
 
 /*
  * Get the request's state.
  */ 
-enum cache_req_state cache_request_state (struct cache_req *req);
+enum cache_req_state cache_req_state (struct cache_req *req);
+
+/*
+ * Get the rquest's key
+ */
+const struct cache_key *cache_req_key (struct cache_req *req);
 
 /*
  * Get information about the amount of data in this cache entry.
@@ -117,6 +126,11 @@
 void cache_req_available (struct cache_req *req, ssize_t *size, ssize_t *offset, ssize_t *available);
 
 /*
+ * Prepare this cache req for writing in the data. Hint, if nonzero, is used to pre-allocate resources for the entry.
+ */
+int cache_req_begin_write(struct cache_req *req, size_t hint);
+
+/*
  * Add some data into this cache entry, reading from the given fd. This is only valid for cache_req's in
  * CACHE_REQ_WRITE mode.
  *
--- a/cache/cache.c	Thu Aug 07 20:28:06 2008 +0300
+++ b/cache/cache.c	Fri Aug 08 00:15:29 2008 +0300
@@ -1,6 +1,8 @@
+#include <stdlib.h>
 
 #include "../cache.h"
 #include "cache.h"
+#include "engine.h"
 
 struct cache *cache_open (struct cache_engine *engine) {
     struct cache *cache;
--- a/cache/engine.h	Thu Aug 07 20:28:06 2008 +0300
+++ b/cache/engine.h	Fri Aug 08 00:15:29 2008 +0300
@@ -5,14 +5,23 @@
     /*
      * Allocate a `struct cache`-compatible struct and return it via cache_ptr.
      */
-    int (fn_init*) (struct cache_engine *engine, struct cache **cache_ptr);
+    int (*fn_init) (struct cache_engine *, struct cache **);
     
     /*
      * Allocate a `struct cache_op`-compatible struct and return it via cache_ptr.
      *
      * Begin the index lookup.
      */
-    int (fn_op_start*) (struct cache *cache, struct cache_op **op_ptr, struct cache_key *key);
+    int (*fn_op_start) (struct cache *, struct cache_op **, struct cache_key *);
+
+    /*
+     * Prepare to write and possibly read this cache entry.
+     *
+     * size_hint, if nonzero, provides a guess at the size of the cache entry that can be used to optimize stuff
+     */
+    int (*fn_op_begin_write) (struct cache_op *, size_t size_hint);
+
+    int (*fn_op_push) (struct cache_op *op, int fd, size_t *size);
 };
 
 #endif /* CACHE_ENGINE_H */
--- a/cache/engines/fs.c	Thu Aug 07 20:28:06 2008 +0300
+++ b/cache/engines/fs.c	Fri Aug 08 00:15:29 2008 +0300
@@ -1,11 +1,24 @@
+#define _GNU_SOURCE
+#include <sys/types.h>
+#include <unistd.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <stdlib.h>
+#include <sys/param.h>
+#include <stdio.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <assert.h>
 
+#include "../../cache.h"
 #include "../cache.h"
 #include "../engine.h"
 #include "../op.h"
 #include "../../common.h"
 
-#define FS_BLOCK_START 4096
-#define FS_BLOCK_GROW_FACTOR 2
+#define FS_PAGE_SIZE (4096)
+#define FS_INITIAL_SIZE (1 * FS_PAGE_SIZE)
+#define FS_PAGE_GROW_FACTOR (2)
 
 struct cache_engine_fs {
     struct cache_engine base;
@@ -29,26 +42,36 @@
     off_t file_size;
 
     void *mmap;
+
+    off_t write_offset;
 };
 
-static int _fs_mmap (struct cache_op_fs *op, int grow) {
+/*
+ * if new_size is equal to op->file_size, nothing will be changed
+ */
+static int _fs_mmap (struct cache_op_fs *op, size_t new_size) {
     off_t old_size = op->file_size;
 
-    if (op->file_size == 0 || grow) {
-        // update size and ftruncate
-        op->file_size = op->file_size ? op->file_size * 2 : FS_INITIAL_SIZE;
+    assert(new_size > 0);
 
-        if (ftruncate(ctx->fd, ctx->file_size))
+    if (new_size != old_size) {
+        // calc new size
+        op->file_size = new_size;
+        
+        // and ftruncate
+        if (ftruncate(op->fd, op->file_size))
             PERROR("ftruncate");
     }
     
     if (op->mmap) {
+        assert(old_size > 0);
+
         // XXX: test
-        if ((ctx->mmap = mremap(ctx->mmap, old_size, ctx->file_size, MREMAP_MAYMOVE)) == MAP_FAILED)
+        if ((op->mmap = mremap(op->mmap, old_size, op->file_size, MREMAP_MAYMOVE)) == MAP_FAILED)
             PERROR("mremap");
 
     } else {
-        if ((ctx->mmap = mmap(NULL, ctx->file_size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, ctx->fd, 0)) == MAP_FAILED)
+        if ((op->mmap = mmap(NULL, op->file_size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, op->fd, 0)) == MAP_FAILED)
             PERROR("mmap");
 
     }
@@ -59,6 +82,41 @@
     return -1;
 }
 
+/*
+ * reuturn a pointer to a static buf containing the path to the key in the given op
+ */
+static const char *_fs_path (struct cache_engine_fs *engine, struct cache_op_fs *op) {
+    static char path[PATH_MAX];
+
+    struct cache_key *key = op->base.key;
+
+    // construct the path to the appropriate file
+    if (snprintf(path, PATH_MAX, "%s/%*s", engine->cache_dir, (int) key->length - 1, key->buf) >= PATH_MAX)
+        ERROR("path too long: %s/%*s", engine->cache_dir, (int) key->length - 1, key->buf);
+    
+    return path;
+
+error:
+    return NULL;
+}
+ 
+/*
+ * Grow the file if needed so that it fits the given amount of bytes
+ */
+static int _fs_grow (struct cache_op_fs *op, size_t new_size_hint) {
+    if (op->file_size >= new_size_hint)
+        return 0;
+    
+    // XXX: need some math.ceil
+    size_t new_size = ((op->file_size / FS_PAGE_SIZE) + 1) * FS_PAGE_SIZE;
+
+    while (new_size < new_size_hint) {
+        new_size *= FS_PAGE_GROW_FACTOR;
+    }
+
+    return _fs_mmap(op, new_size);
+}
+
 static int _fs_do_init (struct cache_engine *engine, struct cache **cache_ptr) {
     struct cache_engine_fs *ctx = (struct cache_engine_fs *) engine;
     struct cache_fs *cache = NULL;
@@ -81,11 +139,11 @@
 }
     
 static int _fs_do_op_start (struct cache *cache, struct cache_op **op_ptr, struct cache_key *key) {
-    struct cache_engine_fs *ctx = (struct cache_engine_fs *) engine;
+    struct cache_engine_fs *engine = (struct cache_engine_fs *) cache->engine;
     struct cache_op_fs *op = NULL;
-    char path[PATH_MAX];
-    struct stat stat;
+    struct stat stat_info;
     int found = 0;
+    const char *path;
     
     // allocate it
     if ((op = calloc(1, sizeof(*op))) == NULL)
@@ -95,48 +153,143 @@
     if (cache_op_init(&op->base, cache, key))
         goto error;
     
-    // mark it as being in the lookup state... shouldn't even be needed, as this is sync
+    // mark it as being in the lookup state...
     op->base.state = OP_STATE_LOOKUP;
     
-    // construct the path to the appropriate file
-    if (snprintf(path, PATH_MAX, "%s/%s", ctx->cache_dir, key->key) >= PATH_MAX)
-        ERROR("path too long: %s/%s", ctx->cache_dir, key->key);
-    
-    // open the appropriate file
-    if ((op->fd = open(path, O_CREAT | O_RDWR, 0644)) == -1)
-        PERROR("open: %s", path);
-    
-    // stat for filesize
-    if (fstat(op->fd, &stat))
-        PERROR("fstat");
-    
-    op->file_size = stat.st_size;
-    
-    // size == 0 -> new file -> not found
-    found = (size > 0);
+    // fetch the path
+    if ((path = _fs_path(engine, op)) == NULL)
+        goto error;
 
-    // grow if needed, and then mmap
-    if (_fs_mmap(op, 0))
-        goto error;
-    
+    // stat
+    if (stat(path, &stat_info)) {
+        if (errno == ENOENT)
+            found = 0;
+        else
+            PERROR("stat: %s", path);
+    } else
+        found = 1;
+
     // indicate that the key was found/not found
     if (cache_op_lookup_done(&op->base, found))
         goto error;
     
+    *op_ptr = &op->base;
+
     // done!
     return 0;
 
 error:
-    free(cache);
+    free(op);
 
     return -1;
    
 }
 
-struct cache_engine_fs *cache_engine_fs (const char *cache_dir) {
+int _fs_do_op_begin_write (struct cache_op *op_base, size_t size_hint) {
+    struct cache_op_fs *op = (struct cache_op_fs *) op_base;
+    struct cache_engine_fs *engine = (struct cache_engine_fs *) op->base.cache->engine;
+    
+    const char *path;
+
+    assert(size_hint >= 0);
+
+    // should still just be calloc'd
+    assert(op->file_size == 0);
+
+    // fetch the path
+    if ((path = _fs_path(engine, op)) == NULL)
+        goto error;
+
+    // create the appropriate file for read-write, exclusively
+    if ((op->fd = open(path, O_CREAT | O_RDWR | O_EXCL, 0644)) == -1)
+        PERROR("open: %s", path);
+    
+    // ftruncate, and then mmap
+    if (_fs_mmap(op, size_hint ? size_hint : FS_INITIAL_SIZE))
+        goto error;
+    
+    // great
+    if (cache_op_write_ready(&op->base))
+        goto error;
+
+    // done
+    return 0;
+
+error:
+    return -1;
+}
+
+int _fs_do_op_push (struct cache_op *op_base, int fd, size_t *size_ptr) {
+    struct cache_op_fs *op = (struct cache_op_fs *) op_base;
+    struct cache_engine_fs *engine = (struct cache_engine_fs *) op->base.cache->engine;
+
+    size_t ret, size;
+
+    assert(op->fd > 0);
+    assert(op->file_size > 0);
+    assert(op->mmap != NULL);
+    assert(op->write_offset <= op->file_size);
+
+    // default size if none specified
+    if (*size_ptr == 0)
+        size = FS_INITIAL_SIZE;
+    else
+        size = *size_ptr;
+
+    // grow the file if needed
+    if (_fs_grow(op, op->write_offset + size))
+        goto error;
+    
+    // read the data into the mmap'd region
+    if ((ret = read(fd, op->mmap + op->write_offset, size)) == -1)
+        // XXX: EAGAIN
+        PERROR("read");
+    
+    // move the write offset along
+    op->write_offset += ret;
+
+    // return something
+    *size_ptr = ret;
+
+    // notify newly available data
+    if (cache_op_data_available(&op->base))
+        goto error;
+
+    // great
+    return 0;
+
+error:
+    return -1;
+}
+
+int _fs_do_op_done (struct cache_op *op_base) {
+    struct cache_op_fs *op = (struct cache_op_fs *) op_base;
+    struct cache_engine_fs *engine = (struct cache_engine_fs *) op->base.cache->engine;
+
+    assert(op->fd > 0);
+    assert(op->file_size > 0);
+    assert(op->mmap != NULL);
+    assert(op->write_offset <= op->file_size);
+    
+    // truncate to match data length
+    if (_fs_mmap(op, op->write_offset))
+        goto error;
+
+    // notify that data is complete
+    if (cache_op_write_done(&op->base))
+        goto error;
+
+    // great
+    return 0;
+
+error:
+    return -1;
+
+}
+
+struct cache_engine *cache_engine_fs (const char *cache_dir) {
     struct cache_engine_fs *ctx = NULL;
 
-
     if ((ctx = calloc(1, sizeof(*ctx))) == NULL)
         ERROR("calloc");
 
@@ -145,12 +298,14 @@
     // set up the fn table
     ctx->base.fn_init = &_fs_do_init;
     ctx->base.fn_op_start = &_fs_do_op_start;
-
-    return 0;
+    ctx->base.fn_op_begin_write = &_fs_do_op_begin_write;
+    ctx->base.fn_op_push = &_fs_do_op_push;
+    
+    return &ctx->base;
 
 error:
     free(ctx);
 
-    return -1;
+    return NULL;
 }
 
--- a/cache/op.c	Thu Aug 07 20:28:06 2008 +0300
+++ b/cache/op.c	Fri Aug 08 00:15:29 2008 +0300
@@ -1,8 +1,13 @@
-
+#include <stdlib.h>
 #include <sys/queue.h>
 #include <string.h>
+#include <assert.h>
 
+#include "../cache.h"
+#include "cache.h"
 #include "op.h"
+#include "req.h"
+#include "engine.h"
 #include "../common.h"
 
 int cache_op_init(struct cache_op *op, struct cache *cache, struct cache_key *key) {
@@ -11,30 +16,34 @@
     op->state = OP_STATE_INVALID;
 
     LIST_INIT(&op->req_list);
+
+    // add this to the cache's list of ops
+    LIST_INSERT_HEAD(&cache->op_list, op, node);
+
+    return 0;
 }
 
 struct cache_op *cache_op_find (struct cache *cache, struct cache_key *key) {
     struct cache_op *op;
 
-    for (op = cache->op_list.lh_first, op != NULL; op = op->node.le_next) {
-        if (op->key->length == key->length && memcmp(op->key->buf, key->buf, key->length))
+    for (op = cache->op_list.lh_first; op != NULL; op = op->node.le_next) {
+        if (op->key->length == key->length && memcmp(op->key->buf, key->buf, key->length) == 0)
             break;
     }
 
-    if (op)
-        cache_op_incref(op);
-
     return op;
 }
 
 int cache_op_register (struct cache_op *op, struct cache_req *req) {
     LIST_INSERT_HEAD(&op->req_list, req, node);
+
+    return 0;
 }
 
 static int _cache_op_notify (struct cache_op *op) {
     struct cache_req *req;
 
-    for (req = op->req_list.lh_first, req != NULL; req = req->node.le_next) {
+    for (req = op->req_list.lh_first; req != NULL; req = req->node.le_next) {
         if (cache_req_notify(req))
             goto error;
     }
@@ -45,6 +54,14 @@
     return -1;
 }
 
+int cache_op_begin_write (struct cache_op *op, size_t size_hint) {
+    return op->cache->engine->fn_op_begin_write(op, size_hint);
+}
+
+int cache_op_push (struct cache_op *op, int fd, size_t *size) {
+    return op->cache->engine->fn_op_push(op, fd, size);
+}
+
 int cache_op_lookup_done (struct cache_op *op, int found) {
     // modify state
     op->state = found ? OP_STATE_HIT : OP_STATE_MISS;
@@ -53,3 +70,23 @@
     return _cache_op_notify(op);
 }
 
+int cache_op_write_ready (struct cache_op *op) {
+    // modify state
+    op->state = OP_STATE_WRITE;
+
+    // notify waiting reqs
+    return _cache_op_notify(op);
+}
+
+int cache_op_data_available (struct cache_op *op) {
+    // notify waiting reqs
+    return _cache_op_notify(op);
+}
+
+int cache_op_write_done (struct cache_op *op) {
+    op->state = OP_STATE_DONE;
+
+    // notify waiting reqs
+    return _cache_op_notify(op);
+}
+
--- a/cache/op.h	Thu Aug 07 20:28:06 2008 +0300
+++ b/cache/op.h	Fri Aug 08 00:15:29 2008 +0300
@@ -1,12 +1,16 @@
 #ifndef CACHE_OP_H
 #define CACHE_OP_H
 
+#include <sys/queue.h>
+
 enum cache_op_state {
     OP_STATE_INVALID,
     OP_STATE_LOOKUP,
 
     OP_STATE_MISS,
     OP_STATE_HIT,
+
+    OP_STATE_WRITE,
 };
 
 struct cache_op {
@@ -38,8 +42,23 @@
 int cache_op_register (struct cache_op *op, struct cache_req *req);
 
 /*
+ * Prepare op for writing data to it, size_hint can be used to preallocate resources
+ */
+int cache_op_begin_write (struct cache_op *op, size_t size_hint);
+
+int cache_op_push (struct cache_op *op, int fd, size_t *size);
+
+/*
  * Used by the engines to notify that the key lookup completed
  */
 int cache_op_lookup_done (struct cache_op *op, int found);
 
+/*
+ * in OP_STATE_WRITE
+ */
+int cache_op_write_ready (struct cache_op *op);
+
+int cache_op_data_available (struct cache_op *op);
+int cache_op_write_done (struct cache_op *op);
+
 #endif /* CACHE_OP_H */
--- a/cache/req.c	Thu Aug 07 20:28:06 2008 +0300
+++ b/cache/req.c	Fri Aug 08 00:15:29 2008 +0300
@@ -1,10 +1,20 @@
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
 
 #include "../cache.h"
+#include "cache.h"
 #include "req.h"
 #include "op.h"
-#include "common.h"
+#include "engine.h"
+#include "../common.h"
 
-struct cache_req *cache_request (struct cache *cache, struct cache_key *key, cache_callback cb_func, void *cb_data) {
+void _cache_req_free (struct cache_req *req) {
+    free(req->key_copy.buf);
+    free(req);
+}
+
+struct cache_req *cache_req (struct cache *cache, const struct cache_key *key, cache_callback cb_func, void *cb_data) {
     struct cache_req *req = NULL;
 
     // calloc the req info
@@ -12,16 +22,31 @@
         ERROR("calloc");
     
     // set up basic state
-    req->key = key;
     req->cb_func = cb_func;
     req->cb_data = cb_data;
     req->is_write = 0;
 
+    // copy the key
+    if (key->length == 0)
+        req->key_copy.length = strlen(key->buf) + 1;
+    else
+        req->key_copy.length = key->length;
+
+    if (req->key_copy.length == 0)
+        ERROR("zero-length key");
+
+    if ((req->key_copy.buf = malloc(req->key_copy.length)) == NULL)
+        ERROR("malloc");
+
+    memcpy(req->key_copy.buf, key->buf, req->key_copy.length);
+    
+    req->key = &req->key_copy;
+
     // look for an existing cache_op for this key
-    if ((req->op = cache_op_find(cache, key)) == NULL) {
+    if ((req->op = cache_op_find(cache, req->key)) == NULL) {
 
         // none available, start a new cache op
-        if (cache->engine->fn_op_start(cache, &req->op, key))
+        if (cache->engine->fn_op_start(cache, &req->op, req->key))
             goto error;
 
         // since we were the one that created it, we take care of writing it
@@ -32,21 +57,25 @@
         
     }
     
+    // engine may forget to update the op_ptr
+    assert(req->op);
+    
     // register
-    if (cache_op_reigster(req->op, req))
+    if (cache_op_register(req->op, req))
         goto error;
 
     // hurray, we now have an active cache_op \o/
-    return 0;
+    return req;
 
 error:
-    free(req);
+    _cache_req_free(req);
 
-    return -1;
+    return NULL;
 }
 
-int _cache_req_notify (struct cache_req *req, enum cache_req_event event) {
-    if (req->cb_func(req, event, req->cb_arg)) {
+/*int _cache_req_notify (struct cache_req *req, enum cache_req_event event) { */
+int _cache_req_notify (struct cache_req *req) {
+    if (req->cb_func(req, req->cb_data)) {
         // XXX: handle errors
         assert(0);
     }
@@ -55,6 +84,7 @@
 }
 
 int cache_req_notify (struct cache_req *req) {
+/*
     switch (req->op->state) {
         case OP_STATE_INVALID:
         case OP_STATE_LOOKUP:
@@ -74,6 +104,12 @@
             
             break;
         
+        case OP_STATE_WRITE:
+            if (_cache_req_notify(req, req->is_write ? CACHE_EVENT_BEGIN_WRITE : CACHE_EVENT_BEGIN_READ))
+                goto error;
+
+            break;
+
         default:
             assert(0);
 
@@ -84,9 +120,12 @@
 
 error:
     return -1;
+*/
+    
+    return _cache_req_notify(req);
 }
 
-enum cache_req_state cache_request_state (struct cache_req *req) {
+enum cache_req_state cache_req_state (struct cache_req *req) {
      switch (req->op->state) {
         case OP_STATE_INVALID:
             return CACHE_STATE_INVALID;
@@ -95,11 +134,40 @@
             return CACHE_STATE_LOOKUP;
         
         case OP_STATE_HIT:
+            return CACHE_STATE_READ_BEGIN;
+
         case OP_STATE_MISS:
-            return CACHE_STATE_OPEN;
+            return req->is_write ? CACHE_STATE_WRITE_BEGIN : CACHE_STATE_READ_BEGIN;
         
+        case OP_STATE_WRITE:
+            return req->is_write ? CACHE_STATE_WRITE : CACHE_STATE_READ;
+
         default:
             assert(0);
     }
 }
 
+const struct cache_key *cache_req_key (struct cache_req *req) {
+    return req->key;
+}
+
+int cache_req_begin_write(struct cache_req *req, size_t hint) {
+    if (req->op->state != OP_STATE_MISS || !req->is_write)
+        ERROR("req not in pre-write mode");
+
+    return cache_op_begin_write(req->op, hint);
+
+error:
+    return -1;
+}
+
+int cache_req_push (struct cache_req *req, int fd, size_t *size) {
+    if (req->op->state != OP_STATE_WRITE || !req->is_write)
+        ERROR("req not in write mode");
+    
+    return cache_op_push(req->op, fd, size); 
+
+error:
+    return -1;
+}
+
--- a/cache/req.h	Thu Aug 07 20:28:06 2008 +0300
+++ b/cache/req.h	Fri Aug 08 00:15:29 2008 +0300
@@ -1,11 +1,14 @@
 #ifndef CACHE_REQ_H
 #define CACHE_REQ_H
 
+#include <sys/queue.h>
+
 #include "../cache.h"
 #include "op.h"
 
 struct cache_req {
     struct cache_key *key;
+    struct cache_key key_copy;  // XXX: who keeps the copy around?
     
     LIST_ENTRY(cache_req) node;
     
--- a/cache_engines.h	Thu Aug 07 20:28:06 2008 +0300
+++ b/cache_engines.h	Fri Aug 08 00:15:29 2008 +0300
@@ -1,9 +1,7 @@
 #ifndef CACHE_ENGINES_H
 #define CACHE_ENGINES_H
 
-struct cache_engine_fs;
-
-struct cache_engine_fs *cache_engine_fs (const char *cache_dir);
+struct cache_engine *cache_engine_fs (const char *cache_dir);
 
 
 #endif /* CACHE_ENGINES_H */