Skip to content

Commit

Permalink
Updated server module to use the parser
Browse files Browse the repository at this point in the history
  • Loading branch information
codepr committed Mar 16, 2024
1 parent e855c15 commit aaeebaf
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 16 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ Build it linking the library
gcc -o my_project main.c -I/path/to/timeseries/include -L/path/to/timeseries -ltimeseries
```

To run it

```bash
LD_LIBRARY_PATH=/path/to/timeseries.so ./my_project
```

#### Quickstart

```c
Expand Down
2 changes: 2 additions & 0 deletions src/arena.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ static void *arena_alloc_aligned(Arena *a, size_t size, size_t alignment) {

return ptr;
}

void *arena_alloc(size_t size, void *context) {
if (!size)
return 0;

return arena_alloc_aligned((Arena *)context, size, DEFAULT_ALIGNMENT);
}

void arena_free(size_t size, void *ptr, void *context) {
(void)ptr;
(void)size;
Expand Down
54 changes: 46 additions & 8 deletions src/main.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#include "commit_log.h"
#include "logging.h"
#include "parser.h"
#include "persistent_index.h"
#include "protocol.h"
#include "server.h"
#include "timeseries.h"
#include <stdio.h>
#include <stdlib.h>
Expand Down Expand Up @@ -265,13 +266,50 @@ int main(void) {

/* tsdb_close(db); */

Token *tokens = tokenize("TS.QUERY RANGE temperatures 12 24");
AST_Node *ast = parse(tokens, 5);
print_ast(ast);
Command cmd = parse_ast(ast);
printf("%s %lu %lu\n", cmd.query.ts_name, cmd.query.start_ts,
cmd.query.end_ts);
ast_free(ast);
size_t total_tokens = 0;
Token *tokens = tokenize("CREATE temperatures INTO db_test", &total_tokens);

Statement_Create create = parse_create(tokens, total_tokens);

print_create(&create);
printf("\n");

total_tokens = 0;

tokens = tokenize(
"INSERT temperatures INTO db_test 12, 98.2, 15, 96.2, 18, 99.1 ",
&total_tokens);

Statement_Insert insert = parse_insert(tokens, total_tokens);

print_insert(&insert);
printf("\n");

total_tokens = 0;

tokens = tokenize("SELECT temperatures FROM test_db RANGE 10 TO 45 "
"WHERE value > 67.8 AGGREGATE AVG BY 3600",
&total_tokens);

Statement_Select select = parse_select(tokens, total_tokens);

print_select(&select);
/* printf("SELECT\n\t%s\nFROM\n\t%s\nRANGE\n\t%li TO %li\nWHERE\n\t%s %i "
*/
/* "%.2lf\nAGGREGATE\n\t%i\nBY\n\t%lu\n", */
/* select.ts_name, select.db_name, select.start_time,
* select.end_time, */
/* select.where.key, select.where.operator, select.where.value, */
/* select.af, select.interval); */

/* AST_Node *ast = parse(tokens, 5); */
/* print_ast(ast); */
/* Command cmd = parse_ast(ast); */
/* printf("%s %lu %lu\n", cmd.query.ts_name, cmd.query.start_ts, */
/* cmd.query.end_ts); */
/* ast_free(ast); */

roachdb_server_run("127.0.0.1", 17678);

return 0;
}
12 changes: 8 additions & 4 deletions src/parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,11 @@ Statement_Insert parse_insert(Token *tokens, size_t token_count) {
snprintf(insert.db_name, sizeof(insert.db_name), "%s",
tokens[i].value);
} else if (tokens[i].type == TOKEN_TIMESTAMP) {
insert.records[j].timestamp = atoll(tokens[i].value);
// Crude check for empty timestamp
if (tokens[i].value[0] == '*')
insert.records[j].timestamp = -1;
else
insert.records[j].timestamp = atoll(tokens[i].value);
} else if (tokens[i].type == TOKEN_LITERAL) {
insert.records[j++].value = strtod(tokens[i].value, &endptr);
}
Expand Down Expand Up @@ -315,11 +319,11 @@ Statement_Select parse_select(Token *tokens, size_t token_count) {
break;
case TOKEN_AGGREGATE_FN:
if (strncmp(tokens[i].value, "AVG", 3) == 0)
select.af = AF_AVG;
select.af = AFN_AVG;
else if (strncmp(tokens[i].value, "MIN", 3) == 0)
select.af = AF_MIN;
select.af = AFN_MIN;
else if (strncmp(tokens[i].value, "MAX", 3) == 0)
select.af = AF_MAX;
select.af = AFN_MAX;
break;
default:
break;
Expand Down
4 changes: 2 additions & 2 deletions src/parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ typedef struct {
} Token;

// Define aggregate function types
typedef enum { AF_AVG, AF_MIN, AF_MAX } Aggregate_Function;
typedef enum { AFN_AVG, AFN_MIN, AFN_MAX } Aggregate_Function;

// Define operator types
typedef enum { OP_EQ, OP_NE, OP_GE, OP_GT, OP_LE, OP_LT } Operator;
Expand Down Expand Up @@ -109,7 +109,7 @@ typedef struct {

// Define a pair (timestamp, value) for INSERT statements
typedef struct {
uint64_t timestamp;
int64_t timestamp;
double_t value;
} Create_Record;

Expand Down
94 changes: 92 additions & 2 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,79 @@
#define EV_SOURCE
#define EV_TCP_SOURCE
#include "ev_tcp.h"
#include "protocol.h"
#include "logging.h"
#include "parser.h"
#include "server.h"
#include "timeseries.h"

#define BACKLOG 128

// testing dummy
static Timeseries_DB *db;

static void execute_statement(Statement *statement) {
Record r = {0};
Timeseries *ts = NULL;
struct timespec tv;

switch (statement->type) {
case STATEMENT_CREATE:
if (statement->create.mask == 0)
db = tsdb_init(statement->create.db_name);
else
(void)ts_create(db, statement->create.ts_name, 0, DP_IGNORE);
break;
case STATEMENT_INSERT:
ts = ts_get(db, statement->insert.ts_name);
uint64_t timestamp = 0;
for (size_t i = 0; i < statement->insert.record_len; ++i) {
if (statement->insert.records[i].timestamp == -1) {
clock_gettime(CLOCK_REALTIME, &tv);
timestamp = tv.tv_sec * 1e9 + tv.tv_nsec;
} else {
timestamp = statement->insert.records[i].timestamp;
}
ts_insert(ts, timestamp, statement->insert.records[i].value);
}
break;
case STATEMENT_SELECT:
ts = ts_get(db, statement->select.ts_name);
int err = 0;
Points coll;
vec_new(coll);

if (statement->select.mask & SM_SINGLE) {
err = ts_find(ts, statement->select.start_time, &r);
if (err < 0)
log_error("Couldn't find the record %lu",
statement->select.start_time);
else
log_info("Record found: %lu %.2lf", r.timestamp, r.value);
} else if (statement->select.mask & SM_RANGE) {
err = ts_range(ts, statement->select.start_time,
statement->select.end_time, &coll);
if (err < 0) {
log_error("Couldn't find the record %lu",
statement->select.start_time);
} else {
for (size_t i = 0; i < vec_size(coll); i++) {
Record r = vec_at(coll, i);
log_info(" %lu {.sec: %lu, .nsec: %lu .value: %.02f }", r.timestamp,
r.tv.tv_sec, r.tv.tv_nsec, r.value);

}
}
}
break;
default:
log_error("Unknown command");
break;
}

if (ts)
ts_close(ts);
}

static void on_close(ev_tcp_handle *client, int err) {
(void)client;
if (err == EV_TCP_SUCCESS)
Expand All @@ -22,7 +90,26 @@ static void on_write(ev_tcp_handle *client) {
}

static void on_data(ev_tcp_handle *client) {
printf("Data received (%lu)\n", client->buffer.size);
if (client->buffer.size == 0)
return;

char *line_start = client->buffer.buf;
char *line_end;

while ((line_end = strstr(line_start, "\r\n")) != NULL) {
*line_end = '\0';
log_info("Line received %s", line_start);
// Line gets processed here
// Parse into Statement
size_t total_tokens;
Token *tokens = tokenize(line_start, &total_tokens);
Statement statement = parse(tokens, total_tokens);
// Execute it
execute_statement(&statement);
line_start = line_end + 2;
}

ev_tcp_queue_write(client);
}

static void on_connection(ev_tcp_handle *server) {
Expand All @@ -39,6 +126,7 @@ static void on_connection(ev_tcp_handle *server) {
}

int roachdb_server_run(const char *host, int port) {
db = tsdb_init("testdb");
ev_context *ctx = ev_get_context();
ev_tcp_server server;
ev_tcp_server_init(&server, ctx, BACKLOG);
Expand All @@ -59,5 +147,7 @@ int roachdb_server_run(const char *host, int port) {
// to stop the server with Ctrl+C
ev_tcp_server_stop(&server);

tsdb_close(db);

return 0;
}

0 comments on commit aaeebaf

Please sign in to comment.