From 733fa9a85633d6f136237a34076e7fcf8c3dd7e4 Mon Sep 17 00:00:00 2001 From: Andrea Date: Sun, 17 Mar 2024 23:10:54 +0000 Subject: [PATCH] Hacked together a client and CLI --- .gitignore | 2 + Makefile | 16 +++++-- README.md | 24 +++++----- src/client.c | 116 ++++++++++++++++++++++++++++++++++++++++++++++++ src/client.h | 45 +++++++++++++++++++ src/parser.c | 2 +- src/parser.h | 4 +- src/protocol.c | 1 + src/protocol.h | 3 +- src/roach_cli.c | 98 ++++++++++++++++++++++++++++++++++++++++ 10 files changed, 292 insertions(+), 19 deletions(-) create mode 100644 src/client.c create mode 100644 src/client.h create mode 100644 src/roach_cli.c diff --git a/.gitignore b/.gitignore index 2e68701..6e95656 100644 --- a/.gitignore +++ b/.gitignore @@ -20,4 +20,6 @@ logs tags build roach +roach-cli +roach-server server diff --git a/Makefile b/Makefile index e0390bc..f6dae3d 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,7 @@ CC = gcc CFLAGS = -Wall -Wextra -Werror -Wunused -std=c11 -pedantic -ggdb -fsanitize=address -fsanitize=undefined -fno-omit-frame-pointer -pg -D_DEFAULT_SOURCE=200809L -Iinclude -Isrc LDFLAGS = -L. -ltimeseries -fsanitize=address -fsanitize=undefined +LDFLAGS_CLI = -fsanitize=address -fsanitize=undefined LIB_SOURCES = src/timeseries.c src/partition.c src/wal.c src/disk_io.c src/binary.c src/logging.c src/persistent_index.c src/commit_log.c LIB_OBJECTS = $(LIB_SOURCES:.c=.o) @@ -8,9 +9,13 @@ LIB_PERSISTENCE = logdata SERVER_SOURCES = src/main.c src/parser.c src/protocol.c src/server.c src/ev.h src/ev_tcp.h SERVER_OBJECTS = $(SERVER_SOURCES:.c=.o) -SERVER_EXECUTABLE = server +SERVER_EXECUTABLE = roach-server -all: libtimeseries.so $(SERVER_EXECUTABLE) +CLI_SOURCES = src/client.c src/roach_cli.c src/protocol.c src/ev.h src/ev_tcp.h +CLI_OBJECTS = $(CLI_SOURCES:.c=.o) +CLI_EXECUTABLE = roach-cli + +all: libtimeseries.so $(SERVER_EXECUTABLE) $(CLI_EXECUTABLE) libtimeseries.so: $(LIB_OBJECTS) $(CC) -shared -o $@ $(LIB_OBJECTS) @@ -18,9 +23,12 @@ libtimeseries.so: $(LIB_OBJECTS) $(SERVER_EXECUTABLE): $(SERVER_OBJECTS) libtimeseries.so $(CC) -o $@ $(SERVER_OBJECTS) $(LDFLAGS) +$(CLI_EXECUTABLE): $(CLI_OBJECTS) + $(CC) -o $@ $(CLI_OBJECTS) $(LDFLAGS_CLI) + %.o: %.c $(CC) $(CFLAGS) -fPIC -c $< -o $@ clean: - rm -f $(LIB_OBJECTS) $(SERVER_OBJECTS) libtimeseries.so $(SERVER_EXECUTABLE) - rm -rf $(LIB_PERSISTENCE) 2> /dev/null + @rm -f $(LIB_OBJECTS) $(SERVER_OBJECTS) libtimeseries.so $(SERVER_EXECUTABLE) + @rm -rf $(LIB_PERSISTENCE) 2> /dev/null diff --git a/README.md b/README.md index cc8dbfc..a321c5c 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ Small timeseries database written in C, designed to efficiently store and retrieve timestamped data records. Mostly to explore persistent log data structure applications. -### Basics +## Basics Still at a very early stage, the main concepts are @@ -22,7 +22,7 @@ Still at a very early stage, the main concepts are log, providing durability and recovery in case of crashes or failures. -### TODO +## TODO - Duplicate points policy - CRC32 of records for data integrity @@ -31,7 +31,7 @@ Still at a very early stage, the main concepts are - Schema definitions - Server: Text based protocol, a simplified SQL-like would be cool -### Usage +## Usage At the current stage, no server attached, just a tiny library with some crude APIs; @@ -47,7 +47,7 @@ At the current stage, no server attached, just a tiny library with some crude AP Plus a few other helpers. -#### As a library +### As a library Build the `libtimeseries.so` first @@ -83,7 +83,7 @@ To run it LD_LIBRARY_PATH=/path/to/timeseries.so ./my_project ``` -#### Quickstart +### Quickstart ```c #include "timeseries.h" @@ -123,18 +123,18 @@ int main() { ``` -### Roach server draft +## Roach server draft Event based server (rely on [ev](https://github.com/codepr/ev.git) at least initially), TCP as the main transport protocol, text-based custom protocol inspired by RESP but simpler. -#### Simple query language +### Simple query language Definition of a simple, text-based format for clients to interact with the server, allowing them to send commands and receive responses. -##### Basic outline +#### Basic outline - **Text-Based Format:** Use a text-based format where each command and response is represented as a single line of text. @@ -145,25 +145,29 @@ server, allowing them to send commands and receive responses. clients after processing commands. Responses should provide relevant information or acknowledge the completion of the requested operation. -##### Core commands +#### Core commands Define the basic operations in a SQL-like query language - **CREATE** creates a database or a timeseries + `CREATE ` `CREATE INTO [] []` - **INSERT** insertion of point(s) in a timeseries + `INSERT INTO , ...` - **SELECT** query a timeseries, selection of point(s) and aggregations + `SELECT FROM RANGE TO WHERE value [>|<|=|<=|>=|!=] AGGREGATE [AVG|MIN|MAX] BY ` - **DELETE** delete a timeseries or a database + `DELETE ` `DELETE FROM ` -##### Flow: +#### Flow: 1. **Client Sends Command:** Clients send commands to the server in the specified text format. diff --git a/src/client.c b/src/client.c new file mode 100644 index 0000000..dd85a67 --- /dev/null +++ b/src/client.c @@ -0,0 +1,116 @@ +#include "client.h" +#include "protocol.h" +#include +#include +#include +#include +#include +#include +#include +#include + +#define BUFSIZE 2048 + +/* + * Create a non-blocking socket and use it to connect to the specified host and + * port + */ +static int roach_connect(const struct connect_options *opts) { + + /* socket: create the socket */ + int fd = socket(opts->s_family, SOCK_STREAM, 0); + if (fd < 0) + goto err; + + /* Set socket timeout for read and write if present on options */ + if (opts->timeout > 0) { + struct timeval tv; + tv.tv_sec = opts->timeout; + tv.tv_usec = 0; + setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, (const char *)&tv, sizeof(tv)); + setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, (const char *)&tv, sizeof(tv)); + } + + if (opts->s_family == AF_INET) { + struct sockaddr_in addr; + struct hostent *server; + + /* gethostbyname: get the server's DNS entry */ + server = gethostbyname(opts->s_addr); + if (server == NULL) + goto err; + + /* build the server's address */ + addr.sin_family = opts->s_family; + addr.sin_port = htons(opts->s_port); + addr.sin_addr = *((struct in_addr *)server->h_addr); + bzero(&(addr.sin_zero), 8); + + /* connect: create a connection with the server */ + if (connect(fd, (const struct sockaddr *)&addr, sizeof(addr)) == -1) + goto err; + + } else if (opts->s_family == AF_UNIX) { + struct sockaddr_un addr; + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + if (*opts->s_addr == '\0') { + *addr.sun_path = '\0'; + strncpy(addr.sun_path + 1, opts->s_addr + 1, + sizeof(addr.sun_path) - 2); + } else { + strncpy(addr.sun_path, opts->s_addr, sizeof(addr.sun_path) - 1); + } + + if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) + goto err; + } + + return fd; + +err: + + if (errno == EINPROGRESS) + return fd; + + perror("socket(2) opening socket failed"); + return CLIENT_FAILURE; +} + +void client_init(Client *c, const struct connect_options *opts) { + c->opts = opts; +} + +int client_connect(Client *c) { + int fd = roach_connect(c->opts); + if (fd < 0) + return CLIENT_FAILURE; + c->fd = fd; + return CLIENT_SUCCESS; +} + +void client_disconnect(Client *c) { close(c->fd); } + +int client_send_command(Client *c, char *buf) { + uint8_t data[BUFSIZE]; + Request rq = {.length = strlen(buf)}; + snprintf(rq.query, sizeof(rq.query), "%s", buf); + ssize_t n = encode_request(&rq, data); + if (n < 0) + return -1; + + return write(c->fd, data, n); +} + +int client_recv_response(Client *c, Response *rs) { + uint8_t data[BUFSIZE]; + ssize_t n = read(c->fd, data, BUFSIZE); + if (n < 0) + return -1; + + n = decode_response(data, rs); + if (n < 0) + return -1; + + return n; +} diff --git a/src/client.h b/src/client.h new file mode 100644 index 0000000..0074c7e --- /dev/null +++ b/src/client.h @@ -0,0 +1,45 @@ +#ifndef CLIENT_H +#define CLIENT_H + +#include +#include + +#define CLIENT_SUCCESS 0 +#define CLIENT_FAILURE -1 +#define CLIENT_UNKNOWN_CMD -2 + +typedef struct response Response; + +typedef struct client Client; + +/* + * Connection options, use this structure to specify connection related opts + * like socket family, host port and timeout for communication + */ +struct connect_options { + int timeout; + int s_family; + int s_port; + char *s_addr; +}; + +/* + * Pretty basic connection wrapper, just a FD with a buffer tracking bytes and + * some options for connection + */ +struct client { + int fd; + const struct connect_options *opts; +}; + +void client_init(Client *c, const struct connect_options *opts); + +int client_connect(Client *c); + +void client_disconnect(Client *c); + +int client_send_command(Client *c, char *buf); + +int client_recv_response(Client *c, Response *rs); + +#endif // CLIENT_H diff --git a/src/parser.c b/src/parser.c index 22f4038..3c5b227 100644 --- a/src/parser.c +++ b/src/parser.c @@ -333,7 +333,7 @@ Statement_Select parse_select(Token *tokens, size_t token_count) { } Statement parse(Token *tokens, size_t token_count) { - Statement statement = {.type = STATEMENT_UNKNOW}; + Statement statement = {.type = STATEMENT_UNKNOWN}; switch (tokens[0].type) { case TOKEN_CREATE: diff --git a/src/parser.h b/src/parser.h index df99621..4c64289 100644 --- a/src/parser.h +++ b/src/parser.h @@ -142,10 +142,10 @@ typedef struct { // Define statement types typedef enum { - STATEMENT_UNKNOW, STATEMENT_CREATE, STATEMENT_INSERT, - STATEMENT_SELECT + STATEMENT_SELECT, + STATEMENT_UNKNOWN } Statement_Type; // Define a generic statement diff --git a/src/protocol.c b/src/protocol.c index 6defcd0..a421444 100644 --- a/src/protocol.c +++ b/src/protocol.c @@ -1,4 +1,5 @@ #include "protocol.h" +#include static ssize_t encode_string(uint8_t *dst, const char *src, size_t length) { size_t i = 0, j = 0; diff --git a/src/protocol.h b/src/protocol.h index 1357fc3..2ff4a03 100644 --- a/src/protocol.h +++ b/src/protocol.h @@ -1,7 +1,6 @@ #ifndef PROTOCOL_H #define PROTOCOL_H -#include "parser.h" #include #include #include @@ -42,7 +41,7 @@ typedef enum { STRING_RSP, ARRAY_RSP } Response_Type; * Define a generic response which can either be a string response or an array * response for the time being */ -typedef struct { +typedef struct response { Response_Type type; union { String_Response string_response; diff --git a/src/roach_cli.c b/src/roach_cli.c new file mode 100644 index 0000000..f4ca0fe --- /dev/null +++ b/src/roach_cli.c @@ -0,0 +1,98 @@ +#include "client.h" +#include "protocol.h" +#include +#include +#include +#include + +#define LOCALHOST "127.0.0.1" +#define DEFAULT_PORT 17678 + +static const char *cmd_usage(const char *cmd) { + if (strncasecmp(cmd, "create", 6) == 0) + return "CREATE | [INTO ] [retention] [dup policy]"; + if (strncasecmp(cmd, "delete", 6) == 0) + return "DELETE |"; + if (strncasecmp(cmd, "insert", 3) == 0) + return "INSERT INTO timestamp|* " + "value, .."; + if (strncasecmp(cmd, "select", 5) == 0) + return "SELECT FROM [RANGE|AT] " + "start_timestamp [end_timestamp] [WHERE] " + "[<|>|<=|>=|=|!=] [AGGREGATE] [MIN|MAX|AVG] [BY literal]"; + return NULL; +} + +static double timespec_seconds(struct timespec *ts) { + return (double)ts->tv_sec + (double)ts->tv_nsec * 1.0e-9; +} + +static void prompt(Client *c) { + if (c->opts->s_family == AF_INET) + printf("%s:%i> ", c->opts->s_addr, c->opts->s_port); + else if (c->opts->s_family == AF_UNIX) + printf("%s> ", c->opts->s_addr); +} + +static void print_response(const Response *rs) { + if (rs->type == STRING_RSP) { + printf("%s\n", rs->string_response.message); + } else { + for (size_t i = 0; i < rs->array_response.length; ++i) + printf("%lu %.20f\n", rs->array_response.records[i].timestamp, + rs->array_response.records[i].value); + } +} + +int main(void) { + int port = DEFAULT_PORT, mode = AF_INET; + char *host = LOCALHOST; + size_t line_len = 0LL; + char *line = NULL; + Response rs; + double delta = 0.0; + + Client c; + struct connect_options conn_opts; + memset(&conn_opts, 0x00, sizeof(conn_opts)); + conn_opts.s_family = mode; + conn_opts.s_addr = host; + conn_opts.s_port = port; + client_init(&c, &conn_opts); + if (client_connect(&c) < 0) + exit(EXIT_FAILURE); + int err = 0; + struct timespec start_time = {0}, end_time = {0}; + while (1) { + prompt(&c); + getline(&line, &line_len, stdin); + (void)clock_gettime(CLOCK_PROCESS_CPUTIME_ID, &start_time); + err = client_send_command(&c, line); + if (err <= 0) { + if (err == CLIENT_SUCCESS) { + client_disconnect(&c); + break; + } else if (err == CLIENT_UNKNOWN_CMD) { + printf("Unknown command or malformed one\n"); + const char *usage = cmd_usage(line); + if (usage) + printf("\nSuggesed usage: %s\n\n", usage); + } else if (err == CLIENT_FAILURE) { + printf("Couldn't send the command: %s\n", strerror(errno)); + } + continue; + } + client_recv_response(&c, &rs); + (void)clock_gettime(CLOCK_PROCESS_CPUTIME_ID, &end_time); + print_response(&rs); + if (rs.type == ARRAY_RSP) { + delta = timespec_seconds(&end_time) - timespec_seconds(&end_time); + printf("%lu results in %lf seconds.\n", rs.array_response.length, + delta); + } + } + client_disconnect(&c); + free(line); + return 0; +}