--- a/Makefile Thu Aug 07 20:28:06 2008 +0300
+++ b/Makefile Fri Aug 08 00:15:29 2008 +0300
@@ -1,38 +1,106 @@
-LDFLAGS = -Llib/libevent-dev/lib -levent -lpng -pthread
-CFLAGS = -Wall -g -Ilib/libevent-dev/include -std=gnu99
-
-EXECS = file_main web_main node_main
-
-all: web_main file_main node_main
-
-common.o: common.c common.h
-http.o: http.c http.h common.h
-config.o: config.c config.h
-socket.o: socket.c socket.h config.h common.h
+LIBEVENT_HEADERS = lib/libevent-dev/include
+LIBEVENT_LIBRARY = lib/libevent-dev/lib
+DEFINES =
-remote_node.o: remote_node.c remote_node.h common.h render_net.h socket.h config.h
-remote_pool.o: remote_pool.c remote_pool.h common.h
-render.o: render.c render.h common.h
-render_remote.o: render_remote.c render_remote.h common.h socket.h remote_node.h remote_pool.h
-render_png.o: render_png.c render_png.h common.h
-render_raw.o: render_raw.c render_raw.h common.h
-render_local.o: render_local.c render_local.h common.h render_png.h render_raw.h render_mandelbrot.h
-render_multi.o: render_multi.c render_multi.h render_remote.o common.h
-render_mandelbrot.o: render_mandelbrot.c render_mandelbrot.h common.h
-render_slices.o: render_slices.c render_slices.h common.h
-render_threads.o: render_threads.c render_threads.h common.h
-render_thread.o: render_thread.c render_thread.h common.h render_local.h
-tile.o: tile.c tile.h render.h render_struct.h
-static.o: static.c static.h common.h
+LDFLAGS = -L${LIBEVENT_LIBRARY} -levent -lpng -pthread -lpcl
+CFLAGS = -Wall -g -I${LIBEVENT_HEADERS} ${DEFINES} -std=gnu99
-file_main.o: file_main.c
-node_main.o: node_main.c common.h render.h render_struct.h render_thread.h render_thread_struct.h render_net.h socket.h
-web_main.o: web_main.c common.h render.h render_struct.h remote_node.h remote_pool.h render_remote.h config.h
+SRCS = *.c cache/*.c cache/engines/*.c
+OBJS = *.o
+EXECS = file_main web_main node_main cache_test
+
+all: depend ${EXECS}
file_main: file_main.o common.o render.o render_raw.o render_png.o render_local.o render_mandelbrot.o
node_main: node_main.o common.o config.o socket.o render.o render_thread.o render_local.o render_png.o render_raw.o render_mandelbrot.o
-web_main: web_main.o common.o config.o socket.o http.o render.o render_png.o remote_node.o remote_pool.o render_remote.o render_multi.o render_slices.o tile.o static.o
+web_main: web_main.o common.o config.o socket.o http.o render.o remote_node.o remote_pool.o render_remote.o tile.o static.o
+coro_test: coro_test.o common.o config.o socket.o
+cache_test: cache_test.o common.o cache/cache.o cache/req.o cache/op.o cache/engines/fs.o
clean :
- rm *.o ${EXECS}
+ -rm *.o ${EXECS}
+depend:
+ makedepend -Y -- $(CFLAGS) -- $(SRCS) 2> /dev/null
+
+# DO NOT DELETE
+
+cache_test.o: cache.h cache_engines.h common.h
+common.o: common.h
+config.o: config.h common.h
+coro_test.o: lib/libevent-dev/include/event2/util.h
+coro_test.o: lib/libevent-dev/include/event-config.h
+coro_test.o: lib/libevent-dev/include/event2/event.h
+coro_test.o: lib/libevent-dev/include/event2/event_compat.h
+coro_test.o: lib/libevent-dev/include/event2/event_struct.h common.h config.h
+coro_test.o: socket.h
+file_main.o: common.h render.h render_local.h render_png_struct.h
+file_main.o: render_png.h render_raw_struct.h render_raw.h
+http.o: lib/libevent-dev/include/event2/event_struct.h
+http.o: lib/libevent-dev/include/event-config.h
+http.o: lib/libevent-dev/include/event2/util.h http.h
+http.o: lib/libevent-dev/include/event2/http.h common.h
+node_main.o: lib/libevent-dev/include/event2/event.h
+node_main.o: lib/libevent-dev/include/event-config.h
+node_main.o: lib/libevent-dev/include/event2/util.h
+node_main.o: lib/libevent-dev/include/event2/event_struct.h
+node_main.o: lib/libevent-dev/include/event2/event_compat.h
+node_main.o: lib/libevent-dev/include/event2/bufferevent.h common.h socket.h
+node_main.o: config.h render.h render_struct.h render_net.h render_thread.h
+node_main.o: render_thread_struct.h
+remote_node.o: remote_node.h config.h common.h render_net.h socket.h
+remote_pool.o: remote_pool.h remote_node.h config.h common.h
+render.o: render_struct.h render.h
+render_local.o: common.h render_struct.h render.h render_local.h
+render_local.o: render_png_struct.h render_png.h render_raw_struct.h
+render_local.o: render_raw.h render_mandelbrot.h
+render_mandelbrot.o: common.h render_struct.h render.h render_mandelbrot.h
+render_multi.o: common.h render_struct.h render.h render_multi.h
+render_multi.o: lib/libevent-dev/include/event2/util.h
+render_multi.o: lib/libevent-dev/include/event-config.h
+render_multi.o: lib/libevent-dev/include/event2/buffer.h remote_pool.h
+render_multi.o: remote_node.h config.h render_remote.h render_slices_struct.h
+render_multi.o: render_png_struct.h render_png.h render_slices.h
+render_png.o: common.h render_struct.h render.h render_png_struct.h
+render_png.o: render_png.h
+render_raw.o: common.h render_struct.h render.h render_raw_struct.h
+render_raw.o: render_raw.h
+render_remote.o: lib/libevent-dev/include/event2/event.h
+render_remote.o: lib/libevent-dev/include/event-config.h
+render_remote.o: lib/libevent-dev/include/event2/util.h
+render_remote.o: lib/libevent-dev/include/event2/event_struct.h
+render_remote.o: lib/libevent-dev/include/event2/bufferevent.h
+render_remote.o: render_remote.h lib/libevent-dev/include/event2/buffer.h
+render_remote.o: render.h remote_pool.h remote_node.h config.h common.h
+render_remote.o: render_struct.h render_net.h socket.h
+render_slices.o: common.h render_struct.h render.h render_slices_struct.h
+render_slices.o: render_png_struct.h render_png.h render_slices.h
+render_thread.o: lib/libevent-dev/include/event2/event.h
+render_thread.o: lib/libevent-dev/include/event-config.h
+render_thread.o: lib/libevent-dev/include/event2/util.h common.h
+render_thread.o: render_thread.h render.h render_thread_struct.h
+render_thread.o: lib/libevent-dev/include/event2/event_struct.h
+render_thread.o: render_struct.h render_local.h render_png_struct.h
+render_thread.o: render_png.h render_raw_struct.h render_raw.h
+render_threads.o: common.h render_threads.h render.h render_slices_struct.h
+render_threads.o: render_struct.h render_png_struct.h render_png.h
+render_threads.o: render_slices.h render_mandelbrot.h
+socket.o: socket.h config.h common.h
+static.o: lib/libevent-dev/include/event2/http.h
+static.o: lib/libevent-dev/include/event2/util.h
+static.o: lib/libevent-dev/include/event-config.h
+static.o: lib/libevent-dev/include/event2/buffer.h static.h common.h
+tile.o: tile.h render.h render_struct.h
+web_main.o: lib/libevent-dev/include/event2/event.h
+web_main.o: lib/libevent-dev/include/event-config.h
+web_main.o: lib/libevent-dev/include/event2/util.h
+web_main.o: lib/libevent-dev/include/event2/event_compat.h
+web_main.o: lib/libevent-dev/include/event2/http.h
+web_main.o: lib/libevent-dev/include/event2/event_struct.h common.h http.h
+web_main.o: render_struct.h render.h remote_node.h config.h remote_pool.h
+web_main.o: render_remote.h lib/libevent-dev/include/event2/buffer.h tile.h
+web_main.o: static.h
+cache/cache.o: cache.h cache.h cache/engine.h
+cache/op.o: cache.h cache.h cache/op.h cache/req.h cache/engine.h common.h
+cache/req.o: cache.h cache.h cache/req.h cache/op.h cache/engine.h common.h
+cache/engines/fs.o: cache.h cache.h cache/engine.h cache/op.h common.h
--- a/cache.h Thu Aug 07 20:28:06 2008 +0300
+++ b/cache.h Fri Aug 08 00:15:29 2008 +0300
@@ -1,6 +1,8 @@
#ifndef CACHE_H
#define CACHE_H
+#include <sys/types.h>
+
/*
* The interface to the internal caching mechanism.
*
@@ -29,8 +31,8 @@
* What we use as keys in the cache. Key is a pointer to an arbitrary char buffer, length is the size of the key
* in bytes. If this is given as zero, it will be calcuated using strlen(). Zero-length keys are invalid.
*/
-struct cache_key_t {
- const char *buf;
+struct cache_key {
+ char *buf;
size_t length;
};
@@ -41,9 +43,8 @@
CACHE_STATE_INVALID,
CACHE_STATE_LOOKUP,
-
- CACHE_STATE_OPEN,
-
+ CACHE_STATE_WRITE_BEGIN,
+ CACHE_STATE_READ_BEGIN,
CACHE_STATE_WRITE,
CACHE_STATE_WRITE_PAUSE,
CACHE_STATE_READ,
@@ -53,13 +54,14 @@
};
/*
+/ *
* Transitions between states
- */
+ * /
enum cache_req_event {
// LOOKUP -> OPEN
CACHE_EVENT_HIT,
CACHE_EVENT_MISS,
-
+
// OPEN -> WRITE
CACHE_EVENT_BEGIN_WRITE,
@@ -81,11 +83,13 @@
// * -> ERROR
CACHE_EVENT_ERROR,
};
+*/
/*
* The callback used for cache_reqs
*/
-typedef (int) (*cache_callback) (struct cache_req *, enum cache_req_event, void *arg);
+//typedef int (*cache_callback) (struct cache_req *, enum cache_req_event, void *arg);
+typedef int (*cache_callback) (struct cache_req *, void *arg);
@@ -101,12 +105,17 @@
* Create a new request. The given callback function will be called at the various stages in the request, and can then
* drive the request forward.
*/
-struct cache_req *cache_request (struct cache *cache, struct cache_key *key, cache_callback cb_func, void *cb_data);
+struct cache_req *cache_req (struct cache *cache, const struct cache_key *key, cache_callback cb_func, void *cb_data);
/*
* Get the request's state.
*/
-enum cache_req_state cache_request_state (struct cache_req *req);
+enum cache_req_state cache_req_state (struct cache_req *req);
+
+/*
+ * Get the rquest's key
+ */
+const struct cache_key *cache_req_key (struct cache_req *req);
/*
* Get information about the amount of data in this cache entry.
@@ -117,6 +126,11 @@
void cache_req_available (struct cache_req *req, ssize_t *size, ssize_t *offset, ssize_t *available);
/*
+ * Prepare this cache req for writing in the data. Hint, if nonzero, is used to pre-allocate resources for the entry.
+ */
+int cache_req_begin_write(struct cache_req *req, size_t hint);
+
+/*
* Add some data into this cache entry, reading from the given fd. This is only valid for cache_req's in
* CACHE_REQ_WRITE mode.
*
--- a/cache/cache.c Thu Aug 07 20:28:06 2008 +0300
+++ b/cache/cache.c Fri Aug 08 00:15:29 2008 +0300
@@ -1,6 +1,8 @@
+#include <stdlib.h>
#include "../cache.h"
#include "cache.h"
+#include "engine.h"
struct cache *cache_open (struct cache_engine *engine) {
struct cache *cache;
--- a/cache/engine.h Thu Aug 07 20:28:06 2008 +0300
+++ b/cache/engine.h Fri Aug 08 00:15:29 2008 +0300
@@ -5,14 +5,23 @@
/*
* Allocate a `struct cache`-compatible struct and return it via cache_ptr.
*/
- int (fn_init*) (struct cache_engine *engine, struct cache **cache_ptr);
+ int (*fn_init) (struct cache_engine *, struct cache **);
/*
* Allocate a `struct cache_op`-compatible struct and return it via cache_ptr.
*
* Begin the index lookup.
*/
- int (fn_op_start*) (struct cache *cache, struct cache_op **op_ptr, struct cache_key *key);
+ int (*fn_op_start) (struct cache *, struct cache_op **, struct cache_key *);
+
+ /*
+ * Prepare to write and possibly read this cache entry.
+ *
+ * size_hint, if nonzero, provides a guess at the size of the cache entry that can be used to optimize stuff
+ */
+ int (*fn_op_begin_write) (struct cache_op *, size_t size_hint);
+
+ int (*fn_op_push) (struct cache_op *op, int fd, size_t *size);
};
#endif /* CACHE_ENGINE_H */
--- a/cache/engines/fs.c Thu Aug 07 20:28:06 2008 +0300
+++ b/cache/engines/fs.c Fri Aug 08 00:15:29 2008 +0300
@@ -1,11 +1,24 @@
+#define _GNU_SOURCE
+#include <sys/types.h>
+#include <unistd.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <stdlib.h>
+#include <sys/param.h>
+#include <stdio.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <assert.h>
+#include "../../cache.h"
#include "../cache.h"
#include "../engine.h"
#include "../op.h"
#include "../../common.h"
-#define FS_BLOCK_START 4096
-#define FS_BLOCK_GROW_FACTOR 2
+#define FS_PAGE_SIZE (4096)
+#define FS_INITIAL_SIZE (1 * FS_PAGE_SIZE)
+#define FS_PAGE_GROW_FACTOR (2)
struct cache_engine_fs {
struct cache_engine base;
@@ -29,26 +42,36 @@
off_t file_size;
void *mmap;
+
+ off_t write_offset;
};
-static int _fs_mmap (struct cache_op_fs *op, int grow) {
+/*
+ * if new_size is equal to op->file_size, nothing will be changed
+ */
+static int _fs_mmap (struct cache_op_fs *op, size_t new_size) {
off_t old_size = op->file_size;
- if (op->file_size == 0 || grow) {
- // update size and ftruncate
- op->file_size = op->file_size ? op->file_size * 2 : FS_INITIAL_SIZE;
+ assert(new_size > 0);
- if (ftruncate(ctx->fd, ctx->file_size))
+ if (new_size != old_size) {
+ // calc new size
+ op->file_size = new_size;
+
+ // and ftruncate
+ if (ftruncate(op->fd, op->file_size))
PERROR("ftruncate");
}
if (op->mmap) {
+ assert(old_size > 0);
+
// XXX: test
- if ((ctx->mmap = mremap(ctx->mmap, old_size, ctx->file_size, MREMAP_MAYMOVE)) == MAP_FAILED)
+ if ((op->mmap = mremap(op->mmap, old_size, op->file_size, MREMAP_MAYMOVE)) == MAP_FAILED)
PERROR("mremap");
} else {
- if ((ctx->mmap = mmap(NULL, ctx->file_size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, ctx->fd, 0)) == MAP_FAILED)
+ if ((op->mmap = mmap(NULL, op->file_size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, op->fd, 0)) == MAP_FAILED)
PERROR("mmap");
}
@@ -59,6 +82,41 @@
return -1;
}
+/*
+ * reuturn a pointer to a static buf containing the path to the key in the given op
+ */
+static const char *_fs_path (struct cache_engine_fs *engine, struct cache_op_fs *op) {
+ static char path[PATH_MAX];
+
+ struct cache_key *key = op->base.key;
+
+ // construct the path to the appropriate file
+ if (snprintf(path, PATH_MAX, "%s/%*s", engine->cache_dir, (int) key->length - 1, key->buf) >= PATH_MAX)
+ ERROR("path too long: %s/%*s", engine->cache_dir, (int) key->length - 1, key->buf);
+
+ return path;
+
+error:
+ return NULL;
+}
+
+/*
+ * Grow the file if needed so that it fits the given amount of bytes
+ */
+static int _fs_grow (struct cache_op_fs *op, size_t new_size_hint) {
+ if (op->file_size >= new_size_hint)
+ return 0;
+
+ // XXX: need some math.ceil
+ size_t new_size = ((op->file_size / FS_PAGE_SIZE) + 1) * FS_PAGE_SIZE;
+
+ while (new_size < new_size_hint) {
+ new_size *= FS_PAGE_GROW_FACTOR;
+ }
+
+ return _fs_mmap(op, new_size);
+}
+
static int _fs_do_init (struct cache_engine *engine, struct cache **cache_ptr) {
struct cache_engine_fs *ctx = (struct cache_engine_fs *) engine;
struct cache_fs *cache = NULL;
@@ -81,11 +139,11 @@
}
static int _fs_do_op_start (struct cache *cache, struct cache_op **op_ptr, struct cache_key *key) {
- struct cache_engine_fs *ctx = (struct cache_engine_fs *) engine;
+ struct cache_engine_fs *engine = (struct cache_engine_fs *) cache->engine;
struct cache_op_fs *op = NULL;
- char path[PATH_MAX];
- struct stat stat;
+ struct stat stat_info;
int found = 0;
+ const char *path;
// allocate it
if ((op = calloc(1, sizeof(*op))) == NULL)
@@ -95,48 +153,143 @@
if (cache_op_init(&op->base, cache, key))
goto error;
- // mark it as being in the lookup state... shouldn't even be needed, as this is sync
+ // mark it as being in the lookup state...
op->base.state = OP_STATE_LOOKUP;
- // construct the path to the appropriate file
- if (snprintf(path, PATH_MAX, "%s/%s", ctx->cache_dir, key->key) >= PATH_MAX)
- ERROR("path too long: %s/%s", ctx->cache_dir, key->key);
-
- // open the appropriate file
- if ((op->fd = open(path, O_CREAT | O_RDWR, 0644)) == -1)
- PERROR("open: %s", path);
-
- // stat for filesize
- if (fstat(op->fd, &stat))
- PERROR("fstat");
-
- op->file_size = stat.st_size;
-
- // size == 0 -> new file -> not found
- found = (size > 0);
+ // fetch the path
+ if ((path = _fs_path(engine, op)) == NULL)
+ goto error;
- // grow if needed, and then mmap
- if (_fs_mmap(op, 0))
- goto error;
-
+ // stat
+ if (stat(path, &stat_info)) {
+ if (errno == ENOENT)
+ found = 0;
+ else
+ PERROR("stat: %s", path);
+ } else
+ found = 1;
+
// indicate that the key was found/not found
if (cache_op_lookup_done(&op->base, found))
goto error;
+ *op_ptr = &op->base;
+
// done!
return 0;
error:
- free(cache);
+ free(op);
return -1;
}
-struct cache_engine_fs *cache_engine_fs (const char *cache_dir) {
+int _fs_do_op_begin_write (struct cache_op *op_base, size_t size_hint) {
+ struct cache_op_fs *op = (struct cache_op_fs *) op_base;
+ struct cache_engine_fs *engine = (struct cache_engine_fs *) op->base.cache->engine;
+
+ const char *path;
+
+ assert(size_hint >= 0);
+
+ // should still just be calloc'd
+ assert(op->file_size == 0);
+
+ // fetch the path
+ if ((path = _fs_path(engine, op)) == NULL)
+ goto error;
+
+ // create the appropriate file for read-write, exclusively
+ if ((op->fd = open(path, O_CREAT | O_RDWR | O_EXCL, 0644)) == -1)
+ PERROR("open: %s", path);
+
+ // ftruncate, and then mmap
+ if (_fs_mmap(op, size_hint ? size_hint : FS_INITIAL_SIZE))
+ goto error;
+
+ // great
+ if (cache_op_write_ready(&op->base))
+ goto error;
+
+ // done
+ return 0;
+
+error:
+ return -1;
+}
+
+int _fs_do_op_push (struct cache_op *op_base, int fd, size_t *size_ptr) {
+ struct cache_op_fs *op = (struct cache_op_fs *) op_base;
+ struct cache_engine_fs *engine = (struct cache_engine_fs *) op->base.cache->engine;
+
+ size_t ret, size;
+
+ assert(op->fd > 0);
+ assert(op->file_size > 0);
+ assert(op->mmap != NULL);
+ assert(op->write_offset <= op->file_size);
+
+ // default size if none specified
+ if (*size_ptr == 0)
+ size = FS_INITIAL_SIZE;
+ else
+ size = *size_ptr;
+
+ // grow the file if needed
+ if (_fs_grow(op, op->write_offset + size))
+ goto error;
+
+ // read the data into the mmap'd region
+ if ((ret = read(fd, op->mmap + op->write_offset, size)) == -1)
+ // XXX: EAGAIN
+ PERROR("read");
+
+ // move the write offset along
+ op->write_offset += ret;
+
+ // return something
+ *size_ptr = ret;
+
+ // notify newly available data
+ if (cache_op_data_available(&op->base))
+ goto error;
+
+ // great
+ return 0;
+
+error:
+ return -1;
+}
+
+int _fs_do_op_done (struct cache_op *op_base) {
+ struct cache_op_fs *op = (struct cache_op_fs *) op_base;
+ struct cache_engine_fs *engine = (struct cache_engine_fs *) op->base.cache->engine;
+
+ assert(op->fd > 0);
+ assert(op->file_size > 0);
+ assert(op->mmap != NULL);
+ assert(op->write_offset <= op->file_size);
+
+ // truncate to match data length
+ if (_fs_mmap(op, op->write_offset))
+ goto error;
+
+ // notify that data is complete
+ if (cache_op_write_done(&op->base))
+ goto error;
+
+ // great
+ return 0;
+
+error:
+ return -1;
+
+}
+
+struct cache_engine *cache_engine_fs (const char *cache_dir) {
struct cache_engine_fs *ctx = NULL;
-
if ((ctx = calloc(1, sizeof(*ctx))) == NULL)
ERROR("calloc");
@@ -145,12 +298,14 @@
// set up the fn table
ctx->base.fn_init = &_fs_do_init;
ctx->base.fn_op_start = &_fs_do_op_start;
-
- return 0;
+ ctx->base.fn_op_begin_write = &_fs_do_op_begin_write;
+ ctx->base.fn_op_push = &_fs_do_op_push;
+
+ return &ctx->base;
error:
free(ctx);
- return -1;
+ return NULL;
}
--- a/cache/op.c Thu Aug 07 20:28:06 2008 +0300
+++ b/cache/op.c Fri Aug 08 00:15:29 2008 +0300
@@ -1,8 +1,13 @@
-
+#include <stdlib.h>
#include <sys/queue.h>
#include <string.h>
+#include <assert.h>
+#include "../cache.h"
+#include "cache.h"
#include "op.h"
+#include "req.h"
+#include "engine.h"
#include "../common.h"
int cache_op_init(struct cache_op *op, struct cache *cache, struct cache_key *key) {
@@ -11,30 +16,34 @@
op->state = OP_STATE_INVALID;
LIST_INIT(&op->req_list);
+
+ // add this to the cache's list of ops
+ LIST_INSERT_HEAD(&cache->op_list, op, node);
+
+ return 0;
}
struct cache_op *cache_op_find (struct cache *cache, struct cache_key *key) {
struct cache_op *op;
- for (op = cache->op_list.lh_first, op != NULL; op = op->node.le_next) {
- if (op->key->length == key->length && memcmp(op->key->buf, key->buf, key->length))
+ for (op = cache->op_list.lh_first; op != NULL; op = op->node.le_next) {
+ if (op->key->length == key->length && memcmp(op->key->buf, key->buf, key->length) == 0)
break;
}
- if (op)
- cache_op_incref(op);
-
return op;
}
int cache_op_register (struct cache_op *op, struct cache_req *req) {
LIST_INSERT_HEAD(&op->req_list, req, node);
+
+ return 0;
}
static int _cache_op_notify (struct cache_op *op) {
struct cache_req *req;
- for (req = op->req_list.lh_first, req != NULL; req = req->node.le_next) {
+ for (req = op->req_list.lh_first; req != NULL; req = req->node.le_next) {
if (cache_req_notify(req))
goto error;
}
@@ -45,6 +54,14 @@
return -1;
}
+int cache_op_begin_write (struct cache_op *op, size_t size_hint) {
+ return op->cache->engine->fn_op_begin_write(op, size_hint);
+}
+
+int cache_op_push (struct cache_op *op, int fd, size_t *size) {
+ return op->cache->engine->fn_op_push(op, fd, size);
+}
+
int cache_op_lookup_done (struct cache_op *op, int found) {
// modify state
op->state = found ? OP_STATE_HIT : OP_STATE_MISS;
@@ -53,3 +70,23 @@
return _cache_op_notify(op);
}
+int cache_op_write_ready (struct cache_op *op) {
+ // modify state
+ op->state = OP_STATE_WRITE;
+
+ // notify waiting reqs
+ return _cache_op_notify(op);
+}
+
+int cache_op_data_available (struct cache_op *op) {
+ // notify waiting reqs
+ return _cache_op_notify(op);
+}
+
+int cache_op_write_done (struct cache_op *op) {
+ op->state = OP_STATE_DONE;
+
+ // notify waiting reqs
+ return _cache_op_notify(op);
+}
+
--- a/cache/op.h Thu Aug 07 20:28:06 2008 +0300
+++ b/cache/op.h Fri Aug 08 00:15:29 2008 +0300
@@ -1,12 +1,16 @@
#ifndef CACHE_OP_H
#define CACHE_OP_H
+#include <sys/queue.h>
+
enum cache_op_state {
OP_STATE_INVALID,
OP_STATE_LOOKUP,
OP_STATE_MISS,
OP_STATE_HIT,
+
+ OP_STATE_WRITE,
};
struct cache_op {
@@ -38,8 +42,23 @@
int cache_op_register (struct cache_op *op, struct cache_req *req);
/*
+ * Prepare op for writing data to it, size_hint can be used to preallocate resources
+ */
+int cache_op_begin_write (struct cache_op *op, size_t size_hint);
+
+int cache_op_push (struct cache_op *op, int fd, size_t *size);
+
+/*
* Used by the engines to notify that the key lookup completed
*/
int cache_op_lookup_done (struct cache_op *op, int found);
+/*
+ * in OP_STATE_WRITE
+ */
+int cache_op_write_ready (struct cache_op *op);
+
+int cache_op_data_available (struct cache_op *op);
+int cache_op_write_done (struct cache_op *op);
+
#endif /* CACHE_OP_H */
--- a/cache/req.c Thu Aug 07 20:28:06 2008 +0300
+++ b/cache/req.c Fri Aug 08 00:15:29 2008 +0300
@@ -1,10 +1,20 @@
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
#include "../cache.h"
+#include "cache.h"
#include "req.h"
#include "op.h"
-#include "common.h"
+#include "engine.h"
+#include "../common.h"
-struct cache_req *cache_request (struct cache *cache, struct cache_key *key, cache_callback cb_func, void *cb_data) {
+void _cache_req_free (struct cache_req *req) {
+ free(req->key_copy.buf);
+ free(req);
+}
+
+struct cache_req *cache_req (struct cache *cache, const struct cache_key *key, cache_callback cb_func, void *cb_data) {
struct cache_req *req = NULL;
// calloc the req info
@@ -12,16 +22,31 @@
ERROR("calloc");
// set up basic state
- req->key = key;
req->cb_func = cb_func;
req->cb_data = cb_data;
req->is_write = 0;
+ // copy the key
+ if (key->length == 0)
+ req->key_copy.length = strlen(key->buf) + 1;
+ else
+ req->key_copy.length = key->length;
+
+ if (req->key_copy.length == 0)
+ ERROR("zero-length key");
+
+ if ((req->key_copy.buf = malloc(req->key_copy.length)) == NULL)
+ ERROR("malloc");
+
+ memcpy(req->key_copy.buf, key->buf, req->key_copy.length);
+
+ req->key = &req->key_copy;
+
// look for an existing cache_op for this key
- if ((req->op = cache_op_find(cache, key)) == NULL) {
+ if ((req->op = cache_op_find(cache, req->key)) == NULL) {
// none available, start a new cache op
- if (cache->engine->fn_op_start(cache, &req->op, key))
+ if (cache->engine->fn_op_start(cache, &req->op, req->key))
goto error;
// since we were the one that created it, we take care of writing it
@@ -32,21 +57,25 @@
}
+ // engine may forget to update the op_ptr
+ assert(req->op);
+
// register
- if (cache_op_reigster(req->op, req))
+ if (cache_op_register(req->op, req))
goto error;
// hurray, we now have an active cache_op \o/
- return 0;
+ return req;
error:
- free(req);
+ _cache_req_free(req);
- return -1;
+ return NULL;
}
-int _cache_req_notify (struct cache_req *req, enum cache_req_event event) {
- if (req->cb_func(req, event, req->cb_arg)) {
+/*int _cache_req_notify (struct cache_req *req, enum cache_req_event event) { */
+int _cache_req_notify (struct cache_req *req) {
+ if (req->cb_func(req, req->cb_data)) {
// XXX: handle errors
assert(0);
}
@@ -55,6 +84,7 @@
}
int cache_req_notify (struct cache_req *req) {
+/*
switch (req->op->state) {
case OP_STATE_INVALID:
case OP_STATE_LOOKUP:
@@ -74,6 +104,12 @@
break;
+ case OP_STATE_WRITE:
+ if (_cache_req_notify(req, req->is_write ? CACHE_EVENT_BEGIN_WRITE : CACHE_EVENT_BEGIN_READ))
+ goto error;
+
+ break;
+
default:
assert(0);
@@ -84,9 +120,12 @@
error:
return -1;
+*/
+
+ return _cache_req_notify(req);
}
-enum cache_req_state cache_request_state (struct cache_req *req) {
+enum cache_req_state cache_req_state (struct cache_req *req) {
switch (req->op->state) {
case OP_STATE_INVALID:
return CACHE_STATE_INVALID;
@@ -95,11 +134,40 @@
return CACHE_STATE_LOOKUP;
case OP_STATE_HIT:
+ return CACHE_STATE_READ_BEGIN;
+
case OP_STATE_MISS:
- return CACHE_STATE_OPEN;
+ return req->is_write ? CACHE_STATE_WRITE_BEGIN : CACHE_STATE_READ_BEGIN;
+ case OP_STATE_WRITE:
+ return req->is_write ? CACHE_STATE_WRITE : CACHE_STATE_READ;
+
default:
assert(0);
}
}
+const struct cache_key *cache_req_key (struct cache_req *req) {
+ return req->key;
+}
+
+int cache_req_begin_write(struct cache_req *req, size_t hint) {
+ if (req->op->state != OP_STATE_MISS || !req->is_write)
+ ERROR("req not in pre-write mode");
+
+ return cache_op_begin_write(req->op, hint);
+
+error:
+ return -1;
+}
+
+int cache_req_push (struct cache_req *req, int fd, size_t *size) {
+ if (req->op->state != OP_STATE_WRITE || !req->is_write)
+ ERROR("req not in write mode");
+
+ return cache_op_push(req->op, fd, size);
+
+error:
+ return -1;
+}
+
--- a/cache/req.h Thu Aug 07 20:28:06 2008 +0300
+++ b/cache/req.h Fri Aug 08 00:15:29 2008 +0300
@@ -1,11 +1,14 @@
#ifndef CACHE_REQ_H
#define CACHE_REQ_H
+#include <sys/queue.h>
+
#include "../cache.h"
#include "op.h"
struct cache_req {
struct cache_key *key;
+ struct cache_key key_copy; // XXX: who keeps the copy around?
LIST_ENTRY(cache_req) node;
--- a/cache_engines.h Thu Aug 07 20:28:06 2008 +0300
+++ b/cache_engines.h Fri Aug 08 00:15:29 2008 +0300
@@ -1,9 +1,7 @@
#ifndef CACHE_ENGINES_H
#define CACHE_ENGINES_H
-struct cache_engine_fs;
-
-struct cache_engine_fs *cache_engine_fs (const char *cache_dir);
+struct cache_engine *cache_engine_fs (const char *cache_dir);
#endif /* CACHE_ENGINES_H */