working write cycle support
author Tero Marttila <terom@fixme.fi>
Sat, 09 Aug 2008 00:59:01 +0300
changeset 33 b750e8865127
parent 32 1b09dad6757e
child 34 f3ab8656b6a0
working write cycle support
cache/engine.h
cache/engines/fs.c
cache/op.c
cache/op.h
cache/req.c
cache/req.h
--- a/cache/engine.h	Fri Aug 08 00:20:20 2008 +0300
+++ b/cache/engine.h	Sat Aug 09 00:59:01 2008 +0300
@@ -13,6 +13,11 @@
      * Begin the index lookup.
      */
     int (*fn_op_start) (struct cache *, struct cache_op **, struct cache_key *);
+    
+    /*
+     * Return some information about the available data.
+     */
+    int (*fn_op_available) (struct cache_op *op, size_t *size, size_t *offset);
 
     /*
      * Prepare to write and possibly read this cache entry.
@@ -20,8 +25,24 @@
      * size_hint, if nonzero, provides a guess at the size of the cache entry that can be used to optimize stuff
      */
     int (*fn_op_begin_write) (struct cache_op *, size_t size_hint);
+    
+    /*
+     * Read some data from the given fd into this cache entry. *size may contain a non-zero value to hint at how
+     * many bytes should be read, or zero to read a suitable default amount. On return, *size is updated to the
+     * number of bytes actually transferred (may be zero if writes are currently paused).
+     */
+    int (*fn_op_push) (struct cache_op *op, int fd, size_t *size);
 
-    int (*fn_op_push) (struct cache_op *op, int fd, size_t *size);
+    /*
+     * No more calls to fn_op_push will take place. The cache entry now contains the complete data.
+     */
+    int (*fn_op_done) (struct cache_op *op);
+
+    /*
+     * The op is no longer needed. Free any resources and memory associated with this op,
+     * including the op itself.
+     */
+    int (*fn_op_close) (struct cache_op *op);
 };
 
 #endif /* CACHE_ENGINE_H */
--- a/cache/engines/fs.c	Fri Aug 08 00:20:20 2008 +0300
+++ b/cache/engines/fs.c	Sat Aug 09 00:59:01 2008 +0300
@@ -38,8 +38,16 @@
 
     // custom
     int fd;
+    
+    /*
+     * Either contains the final size of the cache entry, or zero.
+     */
+    off_t size;
 
-    off_t file_size;
+    /*
+     * Contains the size of the currently mmap'd region
+     */
+    off_t mmap_size;
 
     void *mmap;
 
@@ -47,19 +55,19 @@
 };
 
 /*
- * if new_size is equal to op->file_size, nothing will be changed
+ * if new_size is equal to op->mmap_size, nothing will be changed
  */
 static int _fs_mmap (struct cache_op_fs *op, size_t new_size) {
-    off_t old_size = op->file_size;
+    off_t old_size = op->mmap_size;
 
     assert(new_size > 0);
 
     if (new_size != old_size) {
         // calc new size
-        op->file_size = new_size;
+        op->mmap_size = new_size;
         
         // and ftruncate
-        if (ftruncate(op->fd, op->file_size))
+        if (ftruncate(op->fd, op->mmap_size))
             PERROR("ftruncate");
     }
     
@@ -67,11 +75,11 @@
         assert(old_size > 0);
 
         // XXX: test
-        if ((op->mmap = mremap(op->mmap, old_size, op->file_size, MREMAP_MAYMOVE)) == MAP_FAILED)
+        if ((op->mmap = mremap(op->mmap, old_size, op->mmap_size, MREMAP_MAYMOVE)) == MAP_FAILED)
             PERROR("mremap");
 
     } else {
-        if ((op->mmap = mmap(NULL, op->file_size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, op->fd, 0)) == MAP_FAILED)
+        if ((op->mmap = mmap(NULL, op->mmap_size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, op->fd, 0)) == MAP_FAILED)
             PERROR("mmap");
 
     }
@@ -104,11 +112,11 @@
  * Grow the file if needed so that it fits the given amount of bytes
  */
 static int _fs_grow (struct cache_op_fs *op, size_t new_size_hint) {
-    if (op->file_size >= new_size_hint)
+    if (op->mmap_size >= new_size_hint)
         return 0;
     
     // XXX: need some math.ceil
-    size_t new_size = ((op->file_size / FS_PAGE_SIZE) + 1) * FS_PAGE_SIZE;
+    size_t new_size = ((op->mmap_size / FS_PAGE_SIZE) + 1) * FS_PAGE_SIZE;
 
     while (new_size < new_size_hint) {
         new_size *= FS_PAGE_GROW_FACTOR;
@@ -162,12 +170,15 @@
 
     // stat
     if (stat(path, &stat_info)) {
-        if (errno == ENOENT)
+        if (errno == ENOENT) {
             found = 0;
-        else
+            // op->size remains 0
+        } else
             PERROR("stat: %s", path);
-    } else
+    } else {
         found = 1;
+        op->size = stat_info.st_size;
+    }
 
     // indicate that the key was found/not found
     if (cache_op_lookup_done(&op->base, found))
@@ -185,6 +196,18 @@
    
 }
 
+int _fs_do_op_available (struct cache_op *op_base, size_t *size, size_t *offset) {
+    struct cache_op_fs *op = (struct cache_op_fs *) op_base;
+
+    // the final size is only known on a HIT or after op_done; op->size is zero otherwise, so report it as-is
+    *size = op->size;
+
+    // op->size if nonzero, write_offset otherwise
+    *offset = op->size ? op->size : op->write_offset;
+
+    return 0;
+}
+
 int _fs_do_op_begin_write (struct cache_op *op_base, size_t size_hint) {
     struct cache_op_fs *op = (struct cache_op_fs *) op_base;
     struct cache_engine_fs *engine = (struct cache_engine_fs *) op->base.cache->engine;
@@ -193,8 +216,8 @@
 
     assert(size_hint >= 0);
 
-    // should still just be calloc'd
-    assert(op->file_size == 0);
+    // should be unknown (0) for HITs that we then write to
+    assert(op->size == 0);
 
     // fetch the path
     if ((path = _fs_path(engine, op)) == NULL)
@@ -226,9 +249,11 @@
     size_t ret, size;
 
     assert(op->fd > 0);
-    assert(op->file_size > 0);
+
+    // must have called begin_write first...
+    assert(op->mmap_size > 0);
     assert(op->mmap != NULL);
-    assert(op->write_offset <= op->file_size);
+    assert(op->write_offset <= op->mmap_size);
 
     // default size if none specified
     if (*size_ptr == 0)
@@ -267,12 +292,17 @@
     struct cache_engine_fs *engine = (struct cache_engine_fs *) op->base.cache->engine;
 
     assert(op->fd > 0);
-    assert(op->file_size > 0);
     assert(op->mmap != NULL);
-    assert(op->write_offset <= op->file_size);
+    assert(op->write_offset <= op->mmap_size);
+    
+    // empty cache entries are illegal
+    assert(op->write_offset > 0);
+
+    // final size is now known
+    op->size = op->write_offset;
     
     // truncate to match data length
-    if (_fs_mmap(op, op->write_offset))
+    if (_fs_mmap(op, op->size))
         goto error;
 
     // notify that data is complete
@@ -287,6 +317,26 @@
 
 }
 
+int _fs_do_op_close (struct cache_op *op_base) {
+    struct cache_op_fs *op = (struct cache_op_fs *) op_base;
+
+    // unmap
+    if (op->mmap)
+        if (munmap(op->mmap, op->mmap_size))
+            PWARNING("munmap");
+
+    // close
+    if (op->fd > 0)
+        if (close(op->fd))
+            PWARNING("close");
+    
+    // XXX: delete if op->write_offset == 0?
+    
+    // free
+    free(op);
+
+    return 0;
+}
 struct cache_engine *cache_engine_fs (const char *cache_dir) {
     struct cache_engine_fs *ctx = NULL;
 
@@ -298,8 +348,11 @@
     // set up the fn table
     ctx->base.fn_init = &_fs_do_init;
     ctx->base.fn_op_start = &_fs_do_op_start;
+    ctx->base.fn_op_available = &_fs_do_op_available;
     ctx->base.fn_op_begin_write = &_fs_do_op_begin_write;
     ctx->base.fn_op_push = &_fs_do_op_push;
+    ctx->base.fn_op_done = &_fs_do_op_done;
+    ctx->base.fn_op_close = &_fs_do_op_close;
     
     return &ctx->base;
 
--- a/cache/op.c	Fri Aug 08 00:20:20 2008 +0300
+++ b/cache/op.c	Sat Aug 09 00:59:01 2008 +0300
@@ -10,7 +10,19 @@
 #include "engine.h"
 #include "../common.h"
 
-int cache_op_init(struct cache_op *op, struct cache *cache, struct cache_key *key) {
+void _cache_op_free (struct cache_op *op) {
+    // check we have no reqs listed
+    assert(op->req_list.lh_first == NULL);
+
+    // remove it from the engine op_list
+    LIST_REMOVE(op, node);
+
+    // tell the engine to close/free the op
+    if (op->cache->engine->fn_op_close(op))
+        WARNING("fn_op_close failed");
+}
+
+int cache_op_init (struct cache_op *op, struct cache *cache, struct cache_key *key) {
     op->cache = cache;
     op->key = key;
     op->state = OP_STATE_INVALID;
@@ -40,6 +52,19 @@
     return 0;
 }
 
+int cache_op_deregister (struct cache_op *op, struct cache_req *req) {
+    // XXX: check that the req is in our list of ops?
+    
+    LIST_REMOVE(req, node);
+    
+    if (op->req_list.lh_first == NULL) {
+        // the op is now unused
+        _cache_op_free(op);
+    }
+
+    return 0;
+}
+
 static int _cache_op_notify (struct cache_op *op) {
     struct cache_req *req;
 
@@ -54,6 +79,10 @@
     return -1;
 }
 
+int cache_op_available (struct cache_op *op, size_t *size, size_t *offset) {
+    return op->cache->engine->fn_op_available(op, size, offset);
+}
+
 int cache_op_begin_write (struct cache_op *op, size_t size_hint) {
     return op->cache->engine->fn_op_begin_write(op, size_hint);
 }
@@ -62,6 +91,10 @@
     return op->cache->engine->fn_op_push(op, fd, size);
 }
 
+int cache_op_done (struct cache_op *op) {
+    return op->cache->engine->fn_op_done(op);
+}
+
 int cache_op_lookup_done (struct cache_op *op, int found) {
     // modify state
     op->state = found ? OP_STATE_HIT : OP_STATE_MISS;
@@ -84,7 +117,7 @@
 }
 
 int cache_op_write_done (struct cache_op *op) {
-    op->state = OP_STATE_DONE;
+    op->state = OP_STATE_READ;
 
     // notify waiting reqs
     return _cache_op_notify(op);
--- a/cache/op.h	Fri Aug 08 00:20:20 2008 +0300
+++ b/cache/op.h	Sat Aug 09 00:59:01 2008 +0300
@@ -10,9 +10,8 @@
     OP_STATE_MISS,
     OP_STATE_HIT,
 
+    OP_STATE_READ,
     OP_STATE_WRITE,
-
-    OP_STATE_DONE,
 };
 
 struct cache_op {
@@ -44,11 +43,19 @@
 int cache_op_register (struct cache_op *op, struct cache_req *req);
 
 /*
+ * The given req is not interested in this op anymore. The op is removed if it was the only req
+ */
+int cache_op_deregister (struct cache_op *op, struct cache_req *req);
+
+int cache_op_available (struct cache_op *op, size_t *size, size_t *offset);
+
+/*
  * Prepare op for writing data to it, size_hint can be used to preallocate resources
  */
 int cache_op_begin_write (struct cache_op *op, size_t size_hint);
 
 int cache_op_push (struct cache_op *op, int fd, size_t *size);
+int cache_op_done (struct cache_op *op);
 
 /*
  * Used by the engines to notify that the key lookup completed
@@ -59,7 +66,6 @@
  * in OP_STATE_WRITE
  */
 int cache_op_write_ready (struct cache_op *op);
-
 int cache_op_data_available (struct cache_op *op);
 int cache_op_write_done (struct cache_op *op);
 
--- a/cache/req.c	Fri Aug 08 00:20:20 2008 +0300
+++ b/cache/req.c	Sat Aug 09 00:59:01 2008 +0300
@@ -10,6 +10,8 @@
 #include "../common.h"
 
 void _cache_req_free (struct cache_req *req) {
+    assert(req->op == NULL);
+
     free(req->key_copy.buf);
     free(req);
 }
@@ -68,6 +70,9 @@
     return req;
 
 error:
+    // we are not associated with any op
+    req->op = NULL;
+
     _cache_req_free(req);
 
     return NULL;
@@ -139,9 +144,12 @@
         case OP_STATE_MISS:
             return req->is_write ? CACHE_STATE_WRITE_BEGIN : CACHE_STATE_READ_BEGIN;
         
+        case OP_STATE_READ:
+            return CACHE_STATE_READ;
+
         case OP_STATE_WRITE:
             return req->is_write ? CACHE_STATE_WRITE : CACHE_STATE_READ;
-
+        
         default:
             assert(0);
     }
@@ -151,6 +159,21 @@
     return req->key;
 }
 
+int cache_req_available (struct cache_req *req, size_t *size, size_t *offset, size_t *available) {
+    if (req->op->state != OP_STATE_READ && req->op->state != OP_STATE_WRITE)
+        ERROR("req is not readable");
+
+    if (cache_op_available(req->op, size, offset))
+        goto error;
+
+    *available = (*offset - req->read_offset);
+
+    return 0;
+
+error:
+    return -1;
+}
+
 int cache_req_begin_write(struct cache_req *req, size_t hint) {
     if (req->op->state != OP_STATE_MISS || !req->is_write)
         ERROR("req not in pre-write mode");
@@ -171,3 +194,24 @@
     return -1;
 }
 
+int cache_req_done (struct cache_req *req) {
+    if (req->op->state != OP_STATE_WRITE || !req->is_write)
+        ERROR("req not in write mode");
+    
+    return cache_op_done(req->op); 
+
+error:
+    return -1;
+}
+
+void cache_req_release (struct cache_req *req) {
+    // unconditional deregister
+    cache_op_deregister(req->op, req);
+    
+    // we are not associated with any op anymore
+    req->op = NULL;
+    
+    // free
+    _cache_req_free(req);
+}
+
--- a/cache/req.h	Fri Aug 08 00:20:20 2008 +0300
+++ b/cache/req.h	Sat Aug 09 00:59:01 2008 +0300
@@ -18,6 +18,8 @@
     struct cache_op *op;
 
     int is_write;
+
+    off_t read_offset;
 };
 
 /*