--- a/cache/engine.h Sat Aug 09 14:42:59 2008 +0300
+++ b/cache/engine.h Sat Aug 09 20:11:59 2008 +0300
@@ -37,6 +37,14 @@
* written (may be zero if writes are currently paused).
*/
int (*fn_op_push) (struct cache_op *, int fd, size_t *size);
+
+ /*
+ * Write some data from this cache entry into the given fd. *size either specifies how many bytes to read (and
+ * must not be more than is available), or zero to read as much as possible. *offset is the offset from the
+ * beginning of the cache entry, and is updated to point at the next unread byte. *size is updated to how many
+ * bytes were read.
+ */
+ int (*fn_op_pull) (struct cache_op *, int fd, size_t *offset, size_t *size);
/*
* No more calls to fn_op_push will take place. The cache entry now contains the complete data.
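
For illustration, a minimal sketch of what an fn_op_pull implementation is expected to do, using a hypothetical
in-memory engine. struct cache_op_mem, buf and data_size are made-up names for the sake of the example, not part
of this codebase.

    #include <unistd.h>

    /* hypothetical engine-private op, analogous to cache_op_fs */
    struct cache_op_mem {
        struct cache_op base;
        char *buf;          /* the entry's data */
        size_t data_size;   /* bytes of valid data in buf */
    };

    static int mem_op_pull (struct cache_op *op_base, int fd, size_t *offset, size_t *size) {
        struct cache_op_mem *op = (struct cache_op_mem *) op_base;
        ssize_t ret;

        if (*offset > op->data_size)
            return -1;                          /* offset past the end of the entry */

        if (*size == 0)
            *size = op->data_size - *offset;    /* zero means "as much as possible" */
        else if (*offset + *size > op->data_size)
            return -1;                          /* more requested than is available */

        if ((ret = write(fd, op->buf + *offset, *size)) == -1)
            return -1;

        *offset += ret;                         /* next unread byte */
        *size = ret;                            /* how many bytes were actually written */

        return 0;
    }
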
--- a/cache/engines/fs.c Sat Aug 09 14:42:59 2008 +0300
+++ b/cache/engines/fs.c Sat Aug 09 20:11:59 2008 +0300
@@ -39,10 +39,18 @@
// custom
int fd;
+ // mmap protection for this op: PROT_READ | PROT_WRITE when writing, PROT_READ when only reading
+ int mmap_prot;
+
/*
* Either contains the final size of the cache entry, or zero.
*/
off_t size;
+
+ /*
+ * Contains the size of the underlying file. Usually this is the same as mmap_size.
+ */
+ off_t file_size;
/*
* Contains the size of the currently mmap'd region
@@ -54,32 +62,41 @@
off_t write_offset;
};
+#define FS_OP_DATA_SIZE(op) ((op)->size ? (op)->size : (op)->write_offset)
+
/*
* if new_size is equal to op->mmap_size, nothing will be changed
*/
static int _fs_mmap (struct cache_op_fs *op, size_t new_size) {
- off_t old_size = op->mmap_size;
-
assert(new_size > 0);
-
- if (new_size != old_size) {
- // calc new size
- op->mmap_size = new_size;
+
+ // resize file?
+ if (new_size != op->file_size) {
+ // new size
+ op->file_size = new_size;
// and ftruncate
- if (ftruncate(op->fd, op->mmap_size))
+ if (ftruncate(op->fd, op->file_size))
PERROR("ftruncate");
}
+ // create mmap, or resize?
if (op->mmap) {
- assert(old_size > 0);
-
- // XXX: test
- if ((op->mmap = mremap(op->mmap, old_size, op->mmap_size, MREMAP_MAYMOVE)) == MAP_FAILED)
- PERROR("mremap");
+ assert(op->mmap_size > 0);
+
+ // resize mmap?
+ if (op->mmap_size != new_size) {
+ if ((op->mmap = mremap(op->mmap, op->mmap_size, new_size, MREMAP_MAYMOVE)) == MAP_FAILED)
+ PERROR("mremap");
+
+ op->mmap_size = new_size;
+ }
} else {
- if ((op->mmap = mmap(NULL, op->mmap_size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, op->fd, 0)) == MAP_FAILED)
+ // do new mmap
+ op->mmap_size = new_size;
+
+ if ((op->mmap = mmap(NULL, op->mmap_size, op->mmap_prot, MAP_SHARED | MAP_POPULATE, op->fd, 0)) == MAP_FAILED)
PERROR("mmap");
}
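
The resize path above boils down to a common pattern: grow the backing file with ftruncate(), then grow the mapping
with mremap() (or create it with mmap() the first time). A standalone sketch of that pattern, independent of the
cache structs (grow_mapping is a made-up helper name):

    #define _GNU_SOURCE         /* for mremap() */
    #include <sys/mman.h>
    #include <unistd.h>

    /* grow a shared, file-backed mapping to new_size bytes; *map/*map_size track the current mapping */
    static int grow_mapping (int fd, void **map, size_t *map_size, size_t new_size) {
        void *p;

        // grow the backing file first, so the new pages have something behind them
        if (ftruncate(fd, new_size))
            return -1;

        if (*map == NULL) {
            // no mapping yet: create one covering the whole file
            if ((p = mmap(NULL, new_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)) == MAP_FAILED)
                return -1;

        } else if (*map_size != new_size) {
            // already mapped: resize, letting the kernel move the mapping if needed
            if ((p = mremap(*map, *map_size, new_size, MREMAP_MAYMOVE)) == MAP_FAILED)
                return -1;

        } else {
            // nothing to do
            return 0;
        }

        *map = p;
        *map_size = new_size;
        return 0;
    }

The patch's read path skips the ftruncate() step by mapping the existing file at exactly its current size, which is
presumably why file_size is now tracked separately from mmap_size.
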
@@ -177,7 +194,7 @@
PERROR("stat: %s", path);
} else {
found = 1;
- op->size = stat_info.st_size;
+ op->file_size = stat_info.st_size;
}
// indicate that the key was found/not found
@@ -208,6 +225,54 @@
return 0;
}
+int _fs_do_op_begin_read (struct cache_op *op_base) {
+ struct cache_op_fs *op = (struct cache_op_fs *) op_base;
+ struct cache_engine_fs *engine = (struct cache_engine_fs *) op->base.cache->engine;
+
+ const char *path;
+
+ // should be known (nonzero) for HITs that we read from
+ assert(op->file_size > 0);
+
+ // fetch the path
+ if ((path = _fs_path(engine, op)) == NULL)
+ goto error;
+
+ // open the existing file read-only
+ if ((op->fd = open(path, O_RDONLY)) == -1)
+ PERROR("open: %s", path);
+
+ {
+ struct stat stat_info;
+
+ // assert size hasn't changed
+ if (fstat(op->fd, &stat_info))
+ PERROR("fstat");
+
+ assert(stat_info.st_size == op->file_size);
+ }
+
+ // assign exact size
+ op->size = op->file_size;
+
+ // read-only access
+ op->mmap_prot = PROT_READ;
+
+ // size already matches file_size, so this won't ftruncate, it just mmaps
+ if (_fs_mmap(op, op->size))
+ goto error;
+
+ // great
+ if (_cache_op_read_ready(&op->base))
+ goto error;
+
+ // done
+ return 0;
+
+error:
+ return -1;
+}
+
int _fs_do_op_begin_write (struct cache_op *op_base, size_t size_hint) {
struct cache_op_fs *op = (struct cache_op_fs *) op_base;
struct cache_engine_fs *engine = (struct cache_engine_fs *) op->base.cache->engine;
@@ -216,7 +281,7 @@
assert(size_hint >= 0);
- // should be unknown (0) for HITs that we then write to
+ // should be unknown (0) for MISSes that we then write to
assert(op->size == 0);
// fetch the path
@@ -227,6 +292,9 @@
if ((op->fd = open(path, O_CREAT | O_RDWR | O_EXCL, 0644)) == -1)
PERROR("open: %s", path);
+ // read/write access
+ op->mmap_prot = PROT_READ | PROT_WRITE;
+
// ftruncate, and then mmap
if (_fs_mmap(op, size_hint ? size_hint : FS_INITIAL_SIZE))
goto error;
@@ -287,6 +355,48 @@
return -1;
}
+int _fs_do_op_pull (struct cache_op *op_base, int fd, size_t *offset, size_t *size) {
+ struct cache_op_fs *op = (struct cache_op_fs *) op_base;
+ struct cache_engine_fs *engine = (struct cache_engine_fs *) op->base.cache->engine;
+
+ ssize_t ret;
+
+ // must have called begin_read (or begin_write) first...
+ assert(op->fd > 0);
+ assert(op->mmap_size > 0);
+ assert(op->mmap != NULL);
+ // op->size may be zero
+
+ // default size if none specified
+ if (*size == 0) {
+ *size = FS_OP_DATA_SIZE(op) - *offset;
+
+ if (*size == 0) {
+ ERROR("no more data available");
+ }
+
+ } else if (*offset + *size > FS_OP_DATA_SIZE(op)) {
+ ERROR("more data requested than is available");
+ }
+
+ // write the data from the mmap'd region
+ if ((ret = write(fd, op->mmap + *offset, *size)) == -1)
+ // XXX: EAGAIN?
+ PERROR("write");
+
+ // move the offset along
+ *offset += ret;
+
+ // great
+ return 0;
+
+error:
+ return -1;
+}
+
int _fs_do_op_done (struct cache_op *op_base) {
struct cache_op_fs *op = (struct cache_op_fs *) op_base;
struct cache_engine_fs *engine = (struct cache_engine_fs *) op->base.cache->engine;
@@ -349,8 +459,10 @@
ctx->base.fn_init = &_fs_do_init;
ctx->base.fn_op_start = &_fs_do_op_start;
ctx->base.fn_op_available = &_fs_do_op_available;
+ ctx->base.fn_op_begin_read = &_fs_do_op_begin_read;
ctx->base.fn_op_begin_write = &_fs_do_op_begin_write;
ctx->base.fn_op_push = &_fs_do_op_push;
+ ctx->base.fn_op_pull = &_fs_do_op_pull;
ctx->base.fn_op_done = &_fs_do_op_done;
ctx->base.fn_op_close = &_fs_do_op_close;
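
_fs_do_op_pull above leaves short writes and EAGAIN as an open question (the XXX comment). One possible shape for a
more defensive write step, assuming the destination fd may be non-blocking (write_some is a made-up helper, not part
of the patch):

    #include <errno.h>
    #include <unistd.h>

    /*
     * Write up to len bytes from buf to fd, coping with short writes and EINTR.
     * Returns the number of bytes actually written (which may be less than len
     * if the fd would block), or -1 on a real error.
     */
    static ssize_t write_some (int fd, const char *buf, size_t len) {
        size_t done = 0;

        while (done < len) {
            ssize_t ret = write(fd, buf + done, len - done);

            if (ret > 0) {
                done += ret;
            } else if (ret == 0) {
                break;                                          // no progress; stop rather than spin
            } else if (errno == EINTR) {
                continue;                                       // interrupted, just retry
            } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
                break;                                          // fd is full for now; report partial progress
            } else {
                return -1;                                      // real error
            }
        }

        return done;
    }

The pull implementation could then advance *offset by whatever count this returns and hand that count back through
*size, so the caller can retry once the fd becomes writable again.
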
--- a/cache/op.c Sat Aug 09 14:42:59 2008 +0300
+++ b/cache/op.c Sat Aug 09 20:11:59 2008 +0300
@@ -105,6 +105,12 @@
return op->cache->engine->fn_op_push(op, fd, size);
}
+int cache_op_pull (struct cache_op *op, int fd, size_t *offset, size_t *size) {
+ assert(op->state == OP_STATE_READ || op->state == OP_STATE_WRITE);
+
+ return op->cache->engine->fn_op_pull(op, fd, offset, size);
+}
+
int cache_op_done (struct cache_op *op) {
assert(op->state == OP_STATE_WRITE);
@@ -119,6 +125,14 @@
return _cache_op_notify(op);
}
+int _cache_op_read_ready (struct cache_op *op) {
+ // modify state
+ op->state = OP_STATE_READ;
+
+ // notify waiting reqs
+ return _cache_op_notify(op);
+}
+
int _cache_op_write_ready (struct cache_op *op) {
// modify state
op->state = OP_STATE_WRITE;
--- a/cache/op.h Sat Aug 09 14:42:59 2008 +0300
+++ b/cache/op.h Sat Aug 09 20:11:59 2008 +0300
@@ -133,6 +133,18 @@
int cache_op_push (struct cache_op *op, int fd, size_t *size);
/*
+ * Read some data from the cache into the given fd.
+ *
+ * The data pulled from the cache entry is *size bytes, starting at *offset. If *size is zero, as much data as is
+ * available will be pulled; otherwise, the given number of bytes is pulled. If the cache entry contains fewer
+ * bytes than requested, this is an error. *offset is updated to point at the next unread byte, and *size is
+ * updated to how many bytes were read (if it was given as zero).
+ *
+ * The op must be in the OP_STATE_READ or OP_STATE_WRITE state.
+ */
+int cache_op_pull (struct cache_op *op, int fd, size_t *offset, size_t *size);
+
+/*
* Indicate that the freshly written cache entry is now complete. This should be called after the last cache_op_push
* call, and no more cache_op_push calls may ensue.
*
@@ -153,6 +165,13 @@
int _cache_op_lookup_done (struct cache_op *op, int found);
/*
+ * cache_op_begin_read completed, and the cache op is ready for cache_op_pull.
+ *
+ * OP_STATE_OPEN_READ -> OP_STATE_READ
+ */
+int _cache_op_read_ready (struct cache_op *op);
+
+/*
* cache_op_begin_write completed, and the cache op is ready for cache_op_push.
*
* OP_STATE_OPEN_WRITE -> OP_STATE_WRITE
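
To make the cache_op_pull contract concrete, a short usage sketch: copying a whole entry of known length into a
destination fd. The op is assumed to already be in OP_STATE_READ (or OP_STATE_WRITE), and entry_size stands in for
however the caller learns the total size; it is not part of this API.

    /* copy an entire cache entry into dst_fd, one cache_op_pull call per chunk */
    static int copy_entry (struct cache_op *op, int dst_fd, size_t entry_size) {
        size_t offset = 0;

        while (offset < entry_size) {
            size_t size = 0;    // zero means "pull as much as is available"

            // the engine writes into dst_fd and advances offset past what it wrote
            if (cache_op_pull(op, dst_fd, &offset, &size))
                return -1;
        }

        return 0;
    }
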
--- a/cache/req.c Sat Aug 09 14:42:59 2008 +0300
+++ b/cache/req.c Sat Aug 09 20:11:59 2008 +0300
@@ -226,6 +226,16 @@
return -1;
}
+int cache_req_pull (struct cache_req *req, int fd, size_t *size) {
+ if (req->op->state != OP_STATE_READ && req->op->state != OP_STATE_WRITE)
+ ERROR("req is not readable");
+
+ return cache_op_pull(req->op, fd, &req->read_offset, size);
+
+error:
+ return -1;
+}
+
int cache_req_done (struct cache_req *req) {
if (req->op->state != OP_STATE_WRITE || !req->is_write)
ERROR("req not in write mode");
--- a/cache/req.h Sat Aug 09 14:42:59 2008 +0300
+++ b/cache/req.h Sat Aug 09 20:11:59 2008 +0300
@@ -28,7 +28,7 @@
int is_write;
// our own read offset into the cache entry.
- off_t read_offset;
+ size_t read_offset;
};
/*
--- a/cache_test.c Sat Aug 09 14:42:59 2008 +0300
+++ b/cache_test.c Sat Aug 09 20:11:59 2008 +0300
@@ -2,6 +2,7 @@
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
+#include <assert.h>
#include "cache.h"
#include "cache_engines.h"
@@ -82,8 +83,10 @@
void cmd_req (int index, char *key_buf);
void cmd_status (int index, char *key);
void cmd_available (int index, char *unused);
+void cmd_read (int index, char *unused);
void cmd_write (int index, char *unused);
void cmd_push (int index, char *data);
+void cmd_pull (int index, char *unused);
void cmd_done (int index, char *unused);
void cmd_release (int index, char *unused);
@@ -98,8 +101,10 @@
{ "req", &cmd_req, "req <req_id> <key>" },
{ "status", &cmd_status, "status <req_id>" },
{ "available",&cmd_available, "available <req_id>" },
+ { "read", &cmd_read, "read <req_id>" },
{ "write", &cmd_write, "write <req_id> [<size_hint>]" },
{ "push", &cmd_push, "push <req_id> <data>" },
+ { "pull", &cmd_pull, "pull <req_id>" },
{ "done", &cmd_done, "done <req_id>" },
{ "release", &cmd_release, "release <req_id>" },
{ NULL, NULL, NULL }
@@ -205,6 +210,24 @@
}
+void cmd_read (int index, char *unused) {
+ const struct cache_key *key;
+
+ if (index < 0 || index >= REQ_COUNT)
+ ERROR("index is out of range");
+
+ key = cache_req_key(req_list[index].req);
+
+ INFO("Request %d (%*s): beginning read",
+ index, (int) key->length, key->buf
+ );
+
+ if (cache_req_begin_read(req_list[index].req))
+ ERROR("cache_req_begin_read failed");
+
+error:
+ return;
+}
void cmd_write (int index, char *hint_str) {
size_t hint;
@@ -264,6 +287,42 @@
return;
}
+void cmd_pull (int index, char *unused) {
+ size_t data_length, length;
+ const struct cache_key *key;
+
+ char buf[LINE_LENGTH];
+
+ // unknown size
+ length = 0;
+
+ key = cache_req_key(req_list[index].req);
+
+ if (cache_req_pull(req_list[index].req, req_list[index].pipe_write, &length))
+ ERROR("cache_req_pull failed");
+
+ assert(length < LINE_LENGTH);
+
+ // read from the pipe
+ if ((data_length = read(req_list[index].pipe_read, buf, length)) == -1)
+ PERROR("read");
+
+ // terminate
+ buf[data_length] = '\0';
+
+ if (length != data_length)
+ PWARNING("Only %zu/%zu bytes read from pipe!", data_length, length);
+
+ INFO("Request %d (%*s): pulled %zu/%zu bytes: %s",
+ index, (int) key->length, key->buf,
+ data_length, length,
+ buf
+ );
+
+error:
+ return;
+}
+
void cmd_done (int index, char *unused) {
const struct cache_key *key;