read/pull support
author Tero Marttila <terom@fixme.fi>
Sat, 09 Aug 2008 20:11:59 +0300
changeset 37 f0188b445c84
parent 36 b4023990811e
child 38 9894df13b779
read/pull support
cache/engine.h
cache/engines/fs.c
cache/op.c
cache/op.h
cache/req.c
cache/req.h
cache_test.c
--- a/cache/engine.h	Sat Aug 09 14:42:59 2008 +0300
+++ b/cache/engine.h	Sat Aug 09 20:11:59 2008 +0300
@@ -37,6 +37,14 @@
      * written (may be zero if writes are currently paused).
      */
     int (*fn_op_push) (struct cache_op *, int fd, size_t *size);
+    
+    /*
+     * Write some data into the given fd from this cache entry. Size either specifies how many bytes to read (and
+     * should not be more than is available), or zero to read as much as possible. Offset is the offset from the
+     * beginning of the cache entry, and should be updated to what the next unread byte would be. Size should be
+     * updated to how many bytes were read.
+     */
+    int (*fn_op_pull) (struct cache_op *, int fd, size_t *offset, size_t *size);
 
     /*
      * No more calls to fn_op_push will take place. The cache entry now contains the complete data.
--- a/cache/engines/fs.c	Sat Aug 09 14:42:59 2008 +0300
+++ b/cache/engines/fs.c	Sat Aug 09 20:11:59 2008 +0300
@@ -39,10 +39,18 @@
     // custom
     int fd;
     
+    // mmap protection flags: PROT_READ | PROT_WRITE when writing, PROT_READ when reading
+    int mmap_prot;
+    
     /*
      * Either contains the final size of the cache entry, or zero.
      */
     off_t size;
+    
+    /*
+     * Contains the size of the underlying file. Usually this is the same as mmap_size.
+     */
+    off_t file_size;
 
     /*
      * Contains the size of the currently mmap'd region
@@ -54,32 +62,41 @@
     off_t write_offset;
 };
 
+#define FS_OP_DATA_SIZE(op) (op->size ? op->size : op->write_offset)
+
 /*
  * if new_size is equal to op->mmap_size, nothing will be changed
  */
 static int _fs_mmap (struct cache_op_fs *op, size_t new_size) {
-    off_t old_size = op->mmap_size;
-
     assert(new_size > 0);
-
-    if (new_size != old_size) {
-        // calc new size
-        op->mmap_size = new_size;
+    
+    // resize file?
+    if (new_size != op->file_size) {
+        // new size
+        op->file_size = new_size;
         
         // and ftruncate
-        if (ftruncate(op->fd, op->mmap_size))
+        if (ftruncate(op->fd, op->file_size))
             PERROR("ftruncate");
     }
     
+    // create mmap, or resize?
     if (op->mmap) {
-        assert(old_size > 0);
-
-        // XXX: test
-        if ((op->mmap = mremap(op->mmap, old_size, op->mmap_size, MREMAP_MAYMOVE)) == MAP_FAILED)
-            PERROR("mremap");
+        assert(op->mmap_size > 0);
+        
+        // resize mmap?
+        if (op->mmap_size != new_size) {
+            if ((op->mmap = mremap(op->mmap, new_size, op->mmap_size, MREMAP_MAYMOVE)) == MAP_FAILED)
+                PERROR("mremap");
+            
+            op->mmap_size = new_size;
+        }
 
     } else {
-        if ((op->mmap = mmap(NULL, op->mmap_size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, op->fd, 0)) == MAP_FAILED)
+        // do new mmap
+        op->mmap_size = new_size;
+
+        if ((op->mmap = mmap(NULL, op->mmap_size, op->mmap_prot, MAP_SHARED | MAP_POPULATE, op->fd, 0)) == MAP_FAILED)
             PERROR("mmap");
 
     }
@@ -177,7 +194,7 @@
             PERROR("stat: %s", path);
     } else {
         found = 1;
-        op->size = stat_info.st_size;
+        op->file_size = stat_info.st_size;
     }
 
     // indicate that the key was found/not found
@@ -208,6 +225,54 @@
     return 0;
 }
 
+int _fs_do_op_begin_read (struct cache_op *op_base) {
+    struct cache_op_fs *op = (struct cache_op_fs *) op_base;
+    struct cache_engine_fs *engine = (struct cache_engine_fs *) op->base.cache->engine;
+    
+    const char *path;
+    
+    // should be known (nonzero) for HITs that we read from
+    assert(op->file_size > 0);
+
+    // fetch the path
+    if ((path = _fs_path(engine, op)) == NULL)
+        goto error;
+
+    // open the existing cache file read-only
+    if ((op->fd = open(path, O_RDONLY)) == -1)
+        PERROR("open: %s", path);
+    
+    {
+        struct stat stat_info;
+
+        // assert size hasn't changed
+        if (fstat(op->fd, &stat_info))
+            PERROR("fstat");
+
+        assert(stat_info.st_size == op->file_size);
+    }
+
+    // assign exact size
+    op->size = op->file_size;
+
+    // read-only access
+    op->mmap_prot = PROT_READ;
+
+    // doesn't ftruncate, just mmap's
+    if (_fs_mmap(op, op->size))
+        goto error;
+    
+    // great
+    if (_cache_op_read_ready(&op->base))
+        goto error;
+
+    // done
+    return 0;
+
+error:
+    return -1;
+}
+
 int _fs_do_op_begin_write (struct cache_op *op_base, size_t size_hint) {
     struct cache_op_fs *op = (struct cache_op_fs *) op_base;
     struct cache_engine_fs *engine = (struct cache_engine_fs *) op->base.cache->engine;
@@ -216,7 +281,7 @@
 
     assert(size_hint >= 0);
 
-    // should be unknown (0) for HITs that we then write to
+    // should be unknown (0) for MISSes that we then write to
     assert(op->size == 0);
 
     // fetch the path
@@ -227,6 +292,9 @@
     if ((op->fd = open(path, O_CREAT | O_RDWR | O_EXCL, 0644)) == -1)
         PERROR("open: %s", path);
     
+    // read/write access
+    op->mmap_prot = PROT_READ | PROT_WRITE;
+    
     // ftruncate, and then mmap
     if (_fs_mmap(op, size_hint ? size_hint : FS_INITIAL_SIZE))
         goto error;
@@ -287,6 +355,48 @@
     return -1;
 }
 
+int _fs_do_op_pull (struct cache_op *op_base, int fd, size_t *offset, size_t *size) {
+    struct cache_op_fs *op = (struct cache_op_fs *) op_base;
+    struct cache_engine_fs *engine = (struct cache_engine_fs *) op->base.cache->engine;
+
+    size_t ret;
+
+    // must have called begin_read first...
+    assert(op->fd > 0);
+    assert(op->mmap_size > 0);
+    assert(op->mmap != NULL);
+    // op->size may be zero
+
+    // default size if none specified
+    if (*size == 0) {
+        *size = FS_OP_DATA_SIZE(op) - *offset;
+
+        if (*size == 0) {
+            ERROR("no more data available");
+        }
+
+    } else if (*size + *offset > FS_OP_DATA_SIZE(op)) {
+        ERROR("more data requested is available");
+
+    } else if (*size == 0) {
+        ERROR("size may not be zero");
+    }
+
+    // write the data from the mmap'd region
+    if ((ret = write(fd, op->mmap + *offset, *size)) == -1)
+        // XXX: EAGAIN?
+        PERROR("write");
+    
+    // move the offset along
+    *offset += ret;
+
+    // great
+    return 0;
+
+error:
+    return -1;
+}
+
 int _fs_do_op_done (struct cache_op *op_base) {
     struct cache_op_fs *op = (struct cache_op_fs *) op_base;
     struct cache_engine_fs *engine = (struct cache_engine_fs *) op->base.cache->engine;
@@ -349,8 +459,10 @@
     ctx->base.fn_init = &_fs_do_init;
     ctx->base.fn_op_start = &_fs_do_op_start;
     ctx->base.fn_op_available = &_fs_do_op_available;
+    ctx->base.fn_op_begin_read = &_fs_do_op_begin_read;
     ctx->base.fn_op_begin_write = &_fs_do_op_begin_write;
     ctx->base.fn_op_push = &_fs_do_op_push;
+    ctx->base.fn_op_pull = &_fs_do_op_pull;
     ctx->base.fn_op_done = &_fs_do_op_done;
     ctx->base.fn_op_close = &_fs_do_op_close;
     
--- a/cache/op.c	Sat Aug 09 14:42:59 2008 +0300
+++ b/cache/op.c	Sat Aug 09 20:11:59 2008 +0300
@@ -105,6 +105,12 @@
     return op->cache->engine->fn_op_push(op, fd, size);
 }
 
+int cache_op_pull (struct cache_op *op, int fd, size_t *offset, size_t *size) {
+    assert(op->state == OP_STATE_READ || op->state == OP_STATE_WRITE);
+
+    return op->cache->engine->fn_op_pull(op, fd, offset, size);
+}
+
 int cache_op_done (struct cache_op *op) {
     assert(op->state == OP_STATE_WRITE);
 
@@ -119,6 +125,14 @@
     return _cache_op_notify(op);
 }
 
+int _cache_op_read_ready (struct cache_op *op) {
+    // modify state
+    op->state = OP_STATE_READ;
+
+    // notify waiting reqs
+    return _cache_op_notify(op);
+}
+
 int _cache_op_write_ready (struct cache_op *op) {
     // modify state
     op->state = OP_STATE_WRITE;
--- a/cache/op.h	Sat Aug 09 14:42:59 2008 +0300
+++ b/cache/op.h	Sat Aug 09 20:11:59 2008 +0300
@@ -133,6 +133,18 @@
 int cache_op_push (struct cache_op *op, int fd, size_t *size);
 
 /*
+ * Read some data from the cache into the given fd.
+ *
+ * The data pulled from the cache entry should be *size bytes, starting at *offset. If the value of *size is zero,
+ * then as much data as is available will be pulled, otherwise, pull the given number of bytes. If the cache entry
+ * contains fewer bytes than requested, this is an error. Offset should be updated to point to what will be the next
+ * unread data, and size should be updated to how many bytes were read (if it was given as zero).
+ *
+ * The op must be in the OP_STATE_READ or OP_STATE_WRITE state.
+ */
+int cache_op_pull (struct cache_op *op, int fd, size_t *offset, size_t *size);
+
+/*
  * Indicate that the freshly written cache entry is now complete. This should be called after the last cache_op_push
  * call, and no more cache_op_push calls may ensue.
  *
@@ -153,6 +165,13 @@
 int _cache_op_lookup_done (struct cache_op *op, int found);
 
 /*
+ * cache_op_begin_read completed, and the cache op is ready for cache_op_pull.
+ *
+ * OP_STATE_OPEN_READ -> OP_STATE_READ
+ */
+int _cache_op_read_ready (struct cache_op *op);
+
+/*
  * cache_op_begin_write completed, and the cache op is ready for cache_op_push.
  *
  * OP_STATE_OPEN_WRITE -> OP_STATE_WRITE
--- a/cache/req.c	Sat Aug 09 14:42:59 2008 +0300
+++ b/cache/req.c	Sat Aug 09 20:11:59 2008 +0300
@@ -226,6 +226,16 @@
     return -1;
 }
 
+int cache_req_pull (struct cache_req *req, int fd, size_t *size) {
+    if (req->op->state != OP_STATE_READ && req->op->state != OP_STATE_WRITE)
+        ERROR("req is not readable");
+
+    return cache_op_pull(req->op, fd, &req->read_offset, size);
+
+error:
+    return -1;
+}
+
 int cache_req_done (struct cache_req *req) {
     if (req->op->state != OP_STATE_WRITE || !req->is_write)
         ERROR("req not in write mode");
--- a/cache/req.h	Sat Aug 09 14:42:59 2008 +0300
+++ b/cache/req.h	Sat Aug 09 20:11:59 2008 +0300
@@ -28,7 +28,7 @@
     int is_write;
     
     // our own read offset into the cache entry.
-    off_t read_offset;
+    size_t read_offset;
 };
 
 /*
--- a/cache_test.c	Sat Aug 09 14:42:59 2008 +0300
+++ b/cache_test.c	Sat Aug 09 20:11:59 2008 +0300
@@ -2,6 +2,7 @@
 #include <stdio.h>
 #include <string.h>
 #include <stdlib.h>
+#include <assert.h>
 
 #include "cache.h"
 #include "cache_engines.h"
@@ -82,8 +83,10 @@
 void cmd_req (int index, char *key_buf);
 void cmd_status (int index, char *key);
 void cmd_available (int index, char *unused);
+void cmd_read (int index, char *unused);
 void cmd_write (int index, char *unused);
 void cmd_push (int index, char *data);
+void cmd_pull (int index, char *unused);
 void cmd_done (int index, char *unused);
 void cmd_release (int index, char *unused);
 
@@ -98,8 +101,10 @@
     {   "req",      &cmd_req,       "req <req_id> <key>"        },
     {   "status",   &cmd_status,    "status <req_id>"           },
     {   "available",&cmd_available, "available <req_id>"        },
+    {   "read",     &cmd_read,      "read <req_id>"             },
     {   "write",    &cmd_write,     "write <req_id> [<size_hint>]"  },
     {   "push",     &cmd_push,      "push <req_id> <data>"      },
+    {   "pull",     &cmd_pull,      "pull <req_id>"             },
     {   "done",     &cmd_done,      "done <req_id>"             },
     {   "release",  &cmd_release,   "release <req_id>"          },
     {   NULL,       NULL,           NULL                        }
@@ -205,6 +210,24 @@
 
 }
 
+void cmd_read (int index, char *unused) {
+    const struct cache_key *key;
+
+    if (index < 0 || index >= REQ_COUNT)
+        ERROR("index is out of range");
+
+    key = cache_req_key(req_list[index].req);
+
+    INFO("Request %d (%*s): beginning read", 
+        index, (int) key->length, key->buf
+    );
+
+    if (cache_req_begin_read(req_list[index].req))
+        ERROR("cache_req_begin_read failed");
+        
+error:
+    return;
+}
 
 void cmd_write (int index, char *hint_str) {
     size_t hint;
@@ -264,6 +287,42 @@
     return;
 }
 
+void cmd_pull (int index, char *unused) {
+    size_t data_length, length;
+    const struct cache_key *key;
+
+    char buf[LINE_LENGTH];
+    
+    // unknown size
+    length = 0;
+    
+    key = cache_req_key(req_list[index].req);
+
+    if (cache_req_pull(req_list[index].req, req_list[index].pipe_write, &length))
+        ERROR("cache_req_pull failed");
+    
+    assert(length < LINE_LENGTH);
+
+    // read from the pipe
+    if ((data_length = read(req_list[index].pipe_read, buf, length)) == -1)
+        PERROR("read");
+    
+    // terminate
+    buf[data_length] = '\0';
+
+    if (length != data_length)
+        PWARNING("Only %zu/%zu bytes read from pipe!", data_length, length);
+    
+    INFO("Request %d (%*s): pulled %zu/%zu bytes: %s", 
+        index, (int) key->length, key->buf,
+        data_length, length,
+        buf
+    );
+
+error:
+    return;
+}
+
 void cmd_done (int index, char *unused) {
     const struct cache_key *key;