/*
 * cache/engines/fs.c
 *
 * author:    Tero Marttila <terom@fixme.fi>
 * date:      Wed, 27 Aug 2008 21:30:32 +0300
 * changeset: 41:540737bf6bac (parent: 37:f0188b445c84)
 *
 * changeset message: sending requests, and partial support for receiving -- incomplete, not tested
 */
#define _GNU_SOURCE
#include <sys/types.h>
#include <unistd.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <stdlib.h>
#include <sys/param.h>
#include <stdio.h>
#include <fcntl.h>
#include <errno.h>
#include <assert.h>

#include "../../cache.h"
#include "../cache.h"
#include "../engine.h"
#include "../op.h"
#include "../../common.h"

#define FS_PAGE_SIZE (4096)
#define FS_INITIAL_SIZE (1 * FS_PAGE_SIZE)
#define FS_PAGE_GROW_FACTOR (2)
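
// a new entry's backing file is mapped at the caller's size hint, or
// FS_INITIAL_SIZE if no hint is given; _fs_grow then doubles the mapping
// whenever a push would overflow it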

struct cache_engine_fs {
    struct cache_engine base;
    
    // directory under which cache entries are stored as individual files
    const char *cache_dir;
};

struct cache_fs {
    struct cache base;

    // no engine-specific state needed yet
};

struct cache_op_fs {
    struct cache_op base;

    // fd of the entry's backing file under cache_dir
    int fd;
    
    // mmap protection: PROT_READ | PROT_WRITE when writing, PROT_READ when reading
    int mmap_prot;
    
    /*
     * Final size of the cache entry, or zero if it is not yet known.
     */
    off_t size;
    
    /*
     * Contains the size of the underlying file. Usually this is the same as mmap_size.
     */
    off_t file_size;

    /*
     * Contains the size of the currently mmap'd region
     */
    off_t mmap_size;

    // the mmap'd region, or NULL if nothing is mapped yet
    void *mmap;

    // how much data has been written into the region so far
    off_t write_offset;
};

/*
 * Amount of valid data in the op's entry: the final size once it is known,
 * otherwise the running write offset.
 */
#define FS_OP_DATA_SIZE(op) ((op)->size ? (op)->size : (op)->write_offset)

/*
 * Resize the backing file to new_size (if needed) and create or resize the
 * mmap'd region to match. If new_size is equal to op->mmap_size, the mapping
 * is left untouched.
 */
static int _fs_mmap (struct cache_op_fs *op, size_t new_size) {
    assert(new_size > 0);
    
    // resize file?
    if (new_size != op->file_size) {
        // new size
        op->file_size = new_size;
        
        // and ftruncate
        if (ftruncate(op->fd, op->file_size))
            PERROR("ftruncate");
    }
    
    // create mmap, or resize?
    if (op->mmap) {
        assert(op->mmap_size > 0);
        
        // resize mmap?
        if (op->mmap_size != new_size) {
            // note: mremap takes the old size first, then the new size
            if ((op->mmap = mremap(op->mmap, op->mmap_size, new_size, MREMAP_MAYMOVE)) == MAP_FAILED)
                PERROR("mremap");
            
            op->mmap_size = new_size;
        }

    } else {
        // do new mmap
        op->mmap_size = new_size;

        if ((op->mmap = mmap(NULL, op->mmap_size, op->mmap_prot, MAP_SHARED | MAP_POPULATE, op->fd, 0)) == MAP_FAILED)
            PERROR("mmap");

    }

    return 0;

error:
    return -1;
}

/*
 * Return a pointer to a static buffer containing the path to the given op's key.
 *
 * Note: the buffer is static, so each call overwrites the previous result.
 */
static const char *_fs_path (struct cache_engine_fs *engine, struct cache_op_fs *op) {
    static char path[PATH_MAX];

    struct cache_key *key = op->base.key;

    // construct the path to the appropriate file
    if (snprintf(path, PATH_MAX, "%s/%.*s", engine->cache_dir, (int) key->length - 1, key->buf) >= PATH_MAX)
        ERROR("path too long: %s/%.*s", engine->cache_dir, (int) key->length - 1, key->buf);
    
    return path;

error:
    return NULL;
}
 
/*
 * Grow the backing file and mapping, if needed, so that they can hold the given number of bytes
 */
static int _fs_grow (struct cache_op_fs *op, size_t new_size_hint) {
    if (op->mmap_size >= new_size_hint)
        return 0;
    
    // start from the next page boundary above the current mmap size, then double until it fits
    size_t new_size = ((op->mmap_size / FS_PAGE_SIZE) + 1) * FS_PAGE_SIZE;

    while (new_size < new_size_hint) {
        new_size *= FS_PAGE_GROW_FACTOR;
    }
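
    // e.g. with FS_PAGE_SIZE 4096: mmap_size 4096 and a hint of 10000 gives a
    // new_size of 8192, which is then doubled once to 16384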

    return _fs_mmap(op, new_size);
}

static int _fs_do_init (struct cache_engine *engine, struct cache **cache_ptr) {
    struct cache_engine_fs *ctx = (struct cache_engine_fs *) engine;
    struct cache_fs *cache = NULL;

    if ((cache = calloc(1, sizeof(*cache))) == NULL)
        ERROR("calloc");

    if (cache_init(&cache->base, &ctx->base))
        goto error;

    *cache_ptr = &cache->base;
    
    return 0;

error:
    free(cache);

    return -1;
}
    
static int _fs_do_op_start (struct cache *cache, struct cache_op **op_ptr, struct cache_key *key) {
    struct cache_engine_fs *engine = (struct cache_engine_fs *) cache->engine;
    struct cache_op_fs *op = NULL;
    struct stat stat_info;
    int found = 0;
    const char *path;
    
    // allocate it
    if ((op = calloc(1, sizeof(*op))) == NULL)
        ERROR("calloc");
    
    // init it
    if (_cache_op_init(&op->base, cache, key))
        goto error;
    
    // mark it as being in the lookup state...
    op->base.state = OP_STATE_LOOKUP;
    
    // fetch the path
    if ((path = _fs_path(engine, op)) == NULL)
        goto error;

    // stat
    if (stat(path, &stat_info)) {
        if (errno == ENOENT) {
            found = 0;
            // op->size remains 0
        } else
            PERROR("stat: %s", path);
    } else {
        found = 1;
        op->file_size = stat_info.st_size;
    }

    // indicate that the key was found/not found
    if (_cache_op_lookup_done(&op->base, found))
        goto error;
    
    *op_ptr = &op->base;

    // done!
    return 0;

error:
    free(op);

    return -1;
}

int _fs_do_op_available (struct cache_op *op_base, size_t *size, size_t *offset) {
    struct cache_op_fs *op = (struct cache_op_fs *) op_base;

    // the final size is only known for a HIT, or after op_done; zero otherwise
    *size = op->size;

    // amount of data available so far: the final size if known, the write offset otherwise
    *offset = FS_OP_DATA_SIZE(op);

    return 0;
}

int _fs_do_op_begin_read (struct cache_op *op_base) {
    struct cache_op_fs *op = (struct cache_op_fs *) op_base;
    struct cache_engine_fs *engine = (struct cache_engine_fs *) op->base.cache->engine;
    
    const char *path;
    
    // should be known (nonzero) for HITs that we read from
    assert(op->file_size > 0);

    // fetch the path
    if ((path = _fs_path(engine, op)) == NULL)
        goto error;

    // open the existing entry's file read-only
    if ((op->fd = open(path, O_RDONLY)) == -1)
        PERROR("open: %s", path);
    
    {
        struct stat stat_info;

        // assert size hasn't changed
        if (fstat(op->fd, &stat_info))
            PERROR("fstat");

        assert(stat_info.st_size == op->file_size);
    }

    // assign exact size
    op->size = op->file_size;

    // read-only access
    op->mmap_prot = PROT_READ;

    // op->size matches file_size, so this won't ftruncate, just mmap
    if (_fs_mmap(op, op->size))
        goto error;
    
    // great
    if (_cache_op_read_ready(&op->base))
        goto error;

    // done
    return 0;

error:
    return -1;
}

int _fs_do_op_begin_write (struct cache_op *op_base, size_t size_hint) {
    struct cache_op_fs *op = (struct cache_op_fs *) op_base;
    struct cache_engine_fs *engine = (struct cache_engine_fs *) op->base.cache->engine;
    
    const char *path;

    // size_hint may legitimately be zero if the final size is not known in advance

    // should be unknown (0) for MISS's that we then write to
    assert(op->size == 0);

    // fetch the path
    if ((path = _fs_path(engine, op)) == NULL)
        goto error;

    // create the appropriate file for read-write, exclusively
    if ((op->fd = open(path, O_CREAT | O_RDWR | O_EXCL, 0644)) == -1)
        PERROR("open: %s", path);
    
    // read/write access
    op->mmap_prot = PROT_READ | PROT_WRITE;
    
    // ftruncate, and then mmap
    if (_fs_mmap(op, size_hint ? size_hint : FS_INITIAL_SIZE))
        goto error;
    
    // great
    if (_cache_op_write_ready(&op->base))
        goto error;

    // done
    return 0;

error:
    return -1;
}

int _fs_do_op_push (struct cache_op *op_base, int fd, size_t *size_ptr) {
    struct cache_op_fs *op = (struct cache_op_fs *) op_base;

    ssize_t ret;
    size_t size;

    assert(op->fd > 0);

    // must have called begin_write first...
    assert(op->mmap_size > 0);
    assert(op->mmap != NULL);
    assert(op->write_offset <= op->mmap_size);

    // default size if none specified
    if (*size_ptr == 0)
        size = FS_INITIAL_SIZE;
    else
        size = *size_ptr;

    // grow the file if needed
    if (_fs_grow(op, op->write_offset + size))
        goto error;
    
    // read the data into the mmap'd region
    if ((ret = read(fd, op->mmap + op->write_offset, size)) == -1)
        // XXX: EAGAIN
        PERROR("read");
    
    // move the write offset along
    op->write_offset += ret;

    // report how many bytes were actually consumed
    *size_ptr = ret;

    // notify newly available data
    if (_cache_op_data_available(&op->base))
        goto error;

    // great
    return 0;

error:
    return -1;
}

int _fs_do_op_pull (struct cache_op *op_base, int fd, size_t *offset, size_t *size) {
    struct cache_op_fs *op = (struct cache_op_fs *) op_base;

    ssize_t ret;

    // must have called begin_read first...
    assert(op->fd > 0);
    assert(op->mmap_size > 0);
    assert(op->mmap != NULL);
    // op->size may be zero

    // default to the rest of the available data if no size was specified
    if (*size == 0) {
        *size = FS_OP_DATA_SIZE(op) - *offset;

        if (*size == 0)
            ERROR("no more data available");

    } else if (*offset + *size > FS_OP_DATA_SIZE(op)) {
        ERROR("more data requested than is available");
    }

    // write the data from the mmap'd region
    if ((ret = write(fd, op->mmap + *offset, *size)) == -1)
        // XXX: EAGAIN?
        PERROR("write");
    
    // move the offset along
    *offset += ret;

    // great
    return 0;

error:
    return -1;
}

int _fs_do_op_done (struct cache_op *op_base) {
    struct cache_op_fs *op = (struct cache_op_fs *) op_base;

    assert(op->fd > 0);
    assert(op->mmap != NULL);
    assert(op->write_offset <= op->mmap_size);
    
    // empty cache entries are illegal
    assert(op->write_offset > 0);

    // final size is now known
    op->size = op->write_offset;
    
    // truncate to match data length
    if (_fs_mmap(op, op->size))
        goto error;

    // notify that data is complete
    if (_cache_op_write_done(&op->base))
        goto error;

    // great
    return 0;

error:
    return -1;
}

int _fs_do_op_close (struct cache_op *op_base) {
    struct cache_op_fs *op = (struct cache_op_fs *) op_base;

    // unmap
    if (op->mmap)
        if (munmap(op->mmap, op->mmap_size))
            PWARNING("munmap");

    // close
    if (op->fd > 0)
        if (close(op->fd))
            PWARNING("close");
    
    // XXX: delete if op->write_offset == 0?
    
    // free
    free(op);

    return 0;
}

struct cache_engine *cache_engine_fs (const char *cache_dir) {
    struct cache_engine_fs *ctx = NULL;

    if ((ctx = calloc(1, sizeof(*ctx))) == NULL)
        ERROR("calloc");

    ctx->cache_dir = cache_dir;

    // set up the fn table
    ctx->base.fn_init = &_fs_do_init;
    ctx->base.fn_op_start = &_fs_do_op_start;
    ctx->base.fn_op_available = &_fs_do_op_available;
    ctx->base.fn_op_begin_read = &_fs_do_op_begin_read;
    ctx->base.fn_op_begin_write = &_fs_do_op_begin_write;
    ctx->base.fn_op_push = &_fs_do_op_push;
    ctx->base.fn_op_pull = &_fs_do_op_pull;
    ctx->base.fn_op_done = &_fs_do_op_done;
    ctx->base.fn_op_close = &_fs_do_op_close;
    
    return &ctx->base;

error:
    free(ctx);

    return NULL;
}
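
/*
 * A rough usage sketch (not part of the original file; normally the cache core
 * behind ../../cache.h would drive these hooks rather than a caller invoking
 * the fn table directly): it illustrates the write-path call order this engine
 * expects for a MISS -- start, begin_write, repeated push, done, close. The
 * function name is hypothetical and the block is not compiled.
 */
#if 0
static int _fs_example_store (const char *cache_dir, struct cache_key *key, int src_fd) {
    struct cache_engine *engine;
    struct cache *cache;
    struct cache_op *op;
    size_t chunk;

    // build the engine and its cache context
    if ((engine = cache_engine_fs(cache_dir)) == NULL)
        return -1;

    if (engine->fn_init(engine, &cache))
        return -1;

    // start an op for the key; on a MISS, open the backing file for writing
    if (engine->fn_op_start(cache, &op, key))
        return -1;

    if (engine->fn_op_begin_write(op, 0))
        return -1;

    // push data from src_fd into the mmap'd region until EOF (chunk == 0)
    do {
        chunk = 0;

        if (engine->fn_op_push(op, src_fd, &chunk))
            return -1;

    } while (chunk > 0);

    // finalize the entry size, then release the mmap and fd
    if (engine->fn_op_done(op))
        return -1;

    return engine->fn_op_close(op);
}
#endif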