#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <assert.h>
#include "cache.h"
#include "cache_engines.h"
#include "common.h"
#define LINE_LENGTH 1024
#define REQ_COUNT 8
static struct cache *cache = NULL;
static struct cache_engine *fs_engine = NULL;
static struct cache_req_ctx {
struct cache_req *req;
int index;
int pipe_read;
int pipe_write;
} req_list[REQ_COUNT];
static int quit = 0;
void usage (char *argv_0) {
err_exit("Usage: %s [-h] <cache_dir>", argv_0);
}
const char *cache_state_str (enum cache_req_state state) {
switch (state) {
case CACHE_STATE_INVALID: return "INVALID";
case CACHE_STATE_LOOKUP: return "LOOKUP";
case CACHE_STATE_HIT: return "HIT";
case CACHE_STATE_MISS: return "MISS";
case CACHE_STATE_OPEN_READ: return "OPEN_READ";
case CACHE_STATE_READ: return "READ";
case CACHE_STATE_OPEN_WRITE: return "OPEN_WRITE";
case CACHE_STATE_WRITE: return "WRITE";
case CACHE_STATE_WRITE_PAUSE: return "WRITE_PAUSE";
case CACHE_STATE_ERROR: return "ERROR";
default: return "???";
}
}
/*
const char *cache_event_str (enum cache_req_event event) {
switch (event) {
case CACHE_EVENT_HIT: return "HIT";
case CACHE_EVENT_MISS: return "MISS";
case CACHE_EVENT_BEGIN_WRITE: return "BEGIN_WRITE";
case CACHE_EVENT_BEGIN_READ: return "BEGIN_READ";
case CACHE_EVENT_PAUSE_WRITE: return "PAUSE_WRITE";
case CACHE_EVENT_RESUME_WRITE: return "RESUME_WRITE";
case CACHE_EVENT_DATA_AVAILABLE:return "DATA_AVAILABLE";
case CACHE_EVENT_DONE: return "DONE";
case CACHE_EVENT_ERROR: return "ERROR";
default: return "???";
}
}
*/
//int cache_cb (struct cache_req *req, enum cache_req_event event, void *arg) {
int cache_cb (struct cache_req *req, void *arg) {
struct cache_req_ctx *ctx = arg;
const struct cache_key *key = cache_req_key(ctx->req);
INFO("Request %d (%*s) event: state = %s",
ctx->index,
(int) key->length, key->buf,
cache_state_str(cache_req_state(req))
);
return 0;
}
void cmd_help (int index, char *cmd_name);
void cmd_quit (int index, char *cmd_name);
void cmd_req (int index, char *key_buf);
void cmd_status (int index, char *key);
void cmd_available (int index, char *unused);
void cmd_read (int index, char *unused);
void cmd_write (int index, char *unused);
void cmd_push (int index, char *data);
void cmd_pull (int index, char *unused);
void cmd_done (int index, char *unused);
void cmd_release (int index, char *unused);
struct cmd {
char *name;
void (*func)(int index, char *str);
char *usage;
} _commands[] = {
{ "quit", &cmd_quit, "quit" },
{ "help", &cmd_help, "help [<ignored> <cmd>]" },
{ "req", &cmd_req, "req <req_id> <key>" },
{ "status", &cmd_status, "status <req_id>" },
{ "available",&cmd_available, "available <req_id>" },
{ "read", &cmd_read, "read <req_id>" },
{ "write", &cmd_write, "write <req_id> [<size_hint>]" },
{ "push", &cmd_push, "push <req_id> <data>" },
{ "pull", &cmd_pull, "pull <req_id>" },
{ "done", &cmd_done, "done <req_id>" },
{ "release", &cmd_release, "release <req_id>" },
{ NULL, NULL, NULL }
};
void cmd_help (int index, char *cmd_name) {
struct cmd *cmd;
if (cmd_name == NULL) {
printf("Available commands: ");
for (cmd = _commands; cmd->func != NULL; cmd++) {
printf("%s ", cmd->name);
}
printf("\n");
} else {
for (cmd = _commands; cmd->func != NULL; cmd++) {
if (strcmp(cmd->name, cmd_name) == 0)
break;
}
if (cmd->func) {
INFO("%s", cmd->usage);
} else {
ERROR("Unknown command: '%s'", cmd_name);
}
}
error:
return;
}
void cmd_quit (int index, char *cmd_name) {
quit = 1;
}
void cmd_req (int index, char *key_buf) {
struct cache_key key;
if (index < 0 || index >= REQ_COUNT)
ERROR("index is out of range");
if (!key_buf)
ERROR("No key given");
key.buf = key_buf;
key.length = 0;
if ((req_list[index].req = cache_req(cache, &key, &cache_cb, &req_list[index])) == NULL)
ERROR("req_list failed");
INFO("Request opened: %s", cache_state_str(cache_req_state(req_list[index].req)));
error:
return;
}
void cmd_status (int index, char *unused) {
const struct cache_key *key;
if (index < 0 || index >= REQ_COUNT)
ERROR("index is out of range");
if (unused)
ERROR("Too many arguments");
key = cache_req_key(req_list[index].req);
INFO("Request %d (%*s) status: %s", index, (int) key->length, key->buf, cache_state_str(cache_req_state(req_list[index].req)));
error:
return;
}
void cmd_available (int index, char *unused) {
const struct cache_key *key;
size_t size, offset, available;
if (index < 0 || index >= REQ_COUNT)
ERROR("index is out of range");
if (unused)
ERROR("Too many arguments");
key = cache_req_key(req_list[index].req);
if (cache_req_available(req_list[index].req, &size, &offset, &available))
ERROR("cache_req_available failed");
INFO("Request %d (%*s) available: %zu/%zu, %zu to read",
index,
(int) key->length, key->buf,
offset, size, available
);
error:
return;
}
void cmd_read (int index, char *unused) {
const struct cache_key *key;
if (index < 0 || index >= REQ_COUNT)
ERROR("index is out of range");
key = cache_req_key(req_list[index].req);
INFO("Request %d (%*s): beginning read",
index, (int) key->length, key->buf
);
if (cache_req_begin_read(req_list[index].req))
ERROR("cache_req_begin_read failed");
error:
return;
}
void cmd_write (int index, char *hint_str) {
size_t hint;
const struct cache_key *key;
if (index < 0 || index >= REQ_COUNT)
ERROR("index is out of range");
if (hint_str)
hint = atoi(hint_str);
else
hint = 0;
key = cache_req_key(req_list[index].req);
INFO("Request %d (%*s): beginning write",
index, (int) key->length, key->buf
);
if (cache_req_begin_write(req_list[index].req, hint))
ERROR("cache_req_begin_write failed");
error:
return;
}
void cmd_push (int index, char *data) {
size_t data_length, length;
const struct cache_key *key;
data_length = data ? strlen(data) : 0;
if (data) {
// write to the pipe
if ((length = write(req_list[index].pipe_write, data, data_length)) == -1)
PERROR("write");
if (length != data_length)
PWARNING("Only %zu/%zu bytes written to pipe!", length, data_length);
}
// unknown size
length = 0;
key = cache_req_key(req_list[index].req);
if (cache_req_push(req_list[index].req, req_list[index].pipe_read, &length))
ERROR("cache_req_push failed");
INFO("Request %d (%*s): pushed %zu/%zu bytes",
index, (int) key->length, key->buf,
length, data_length
);
error:
return;
}
void cmd_pull (int index, char *unused) {
size_t data_length, length;
const struct cache_key *key;
char buf[LINE_LENGTH];
// unknown size
length = 0;
key = cache_req_key(req_list[index].req);
if (cache_req_pull(req_list[index].req, req_list[index].pipe_write, &length))
ERROR("cache_req_pull failed");
assert(length < LINE_LENGTH);
// read from the pipe
if ((data_length = read(req_list[index].pipe_read, buf, length)) == -1)
PERROR("read");
// terminate
buf[data_length] = '\0';
if (length != data_length)
PWARNING("Only %zu/%zu bytes read from pipe!", data_length, length);
INFO("Request %d (%*s): pulled %zu/%zu bytes: %s",
index, (int) key->length, key->buf,
data_length, length,
buf
);
error:
return;
}
void cmd_done (int index, char *unused) {
const struct cache_key *key;
key = cache_req_key(req_list[index].req);
if (cache_req_done(req_list[index].req))
ERROR("cache_req_done failed");
INFO("Request %d (%*s): done",
index, (int) key->length, key->buf
);
error:
return;
}
void cmd_release (int index, char *unused) {
const struct cache_key *key;
key = cache_req_key(req_list[index].req);
cache_req_release(req_list[index].req);
INFO("Request %d (%*s): released",
index, (int) key->length, key->buf
);
req_list[index].req = NULL;
}
int main (int argc, char **argv) {
int pipefds[2];
char line[LINE_LENGTH];
// init req_list
for (int index = 0; index < REQ_COUNT; index++) {
req_list[index].index = index;
if (pipe(pipefds))
PFATAL("pipe");
req_list[index].pipe_read = pipefds[0];
req_list[index].pipe_write = pipefds[1];
}
// parse arguments
int opt;
char *cache_dir;
while ((opt = getopt(argc, argv, "h")) != -1) {
switch (opt) {
case 'h':
default:
usage(argv[0]);
}
}
// the cache_dir argument
if (optind >= argc)
usage(argv[0]);
cache_dir = argv[optind];
// create the fs_engine
if ((fs_engine = cache_engine_fs(cache_dir)) == NULL)
FATAL("failed to initialize fs cache engine");
// create the cache
if ((cache = cache_open(fs_engine)) == NULL)
FATAL("failed to open the cache");
INFO("Filesystem cache opened at '%s'", cache_dir);
// enter the interactive loop
do {
printf(" > ");
if (fgets(line, LINE_LENGTH, stdin) == NULL)
quit = 1;
else {
char *line_ptr = line;
char *cmd_name = NULL, *str = NULL, *nl = NULL;
int index = 0;
// strip the newline
if ((nl = strchr(line, '\n')) != NULL)
*nl = '\0';
// parse the command/args
if (line_ptr)
cmd_name = strsep(&line_ptr, " ");
if (strlen(cmd_name) == 0)
cmd_name = NULL;
if (line_ptr)
index = atoi(strsep(&line_ptr, " "));
if (line_ptr)
str = strsep(&line_ptr, " ");
if (!cmd_name) {
continue;
}
struct cmd *cmd;
for (cmd = _commands; cmd->func != NULL; cmd++) {
if (strcmp(cmd->name, cmd_name) == 0)
break;
}
if (!cmd->func) {
INFO("Unknown command");
continue;
}
cmd->func(index, str);
}
printf("\n");
} while (!quit);
return 0;
}