Skip to content

Commit

Permalink
Add initial implementation of a text protocol on top of TCP
Browse files Browse the repository at this point in the history
Loosely inspired by RESP but simplified, just handling strings, errors
and arrays for the time being.
  • Loading branch information
codepr committed Mar 17, 2024
1 parent aaeebaf commit 162d2b6
Show file tree
Hide file tree
Showing 5 changed files with 287 additions and 17 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ LIB_SOURCES = src/timeseries.c src/partition.c src/wal.c src/disk_io.c src/binar
LIB_OBJECTS = $(LIB_SOURCES:.c=.o)
LIB_PERSISTENCE = logdata

SERVER_SOURCES = src/main.c src/parser.c src/server.c src/ev.h src/ev_tcp.h
SERVER_SOURCES = src/main.c src/parser.c src/protocol.c src/server.c src/ev.h src/ev_tcp.h
SERVER_OBJECTS = $(SERVER_SOURCES:.c=.o)
SERVER_EXECUTABLE = server

Expand Down
46 changes: 31 additions & 15 deletions src/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "logging.h"
#include "parser.h"
#include "persistent_index.h"
#include "protocol.h"
#include "server.h"
#include "timeseries.h"
#include <stdio.h>
Expand Down Expand Up @@ -294,22 +295,37 @@ int main(void) {
Statement_Select select = parse_select(tokens, total_tokens);

print_select(&select);
/* printf("SELECT\n\t%s\nFROM\n\t%s\nRANGE\n\t%li TO %li\nWHERE\n\t%s %i "

uint8_t dst[64];
Response rs = {.type = STRING_RSP,
.string_response = {.length = 3, .rc = 0, .message = "Ok!"}};
encode_response(&rs, &dst[0]);
printf("%s", dst);
Response rsb;
decode_response(&dst[0], &rsb);
printf("(%i) %s (%lu)\n", rsb.type, rsb.string_response.message,
rsb.string_response.length);

Request rq = {.length = 36,
.query = "SELECT temp FROM test RANGE 10 TO 45"};
encode_request(&rq, &dst[0]);
printf("%s", dst);

Request rqb;
decode_request(&dst[0], &rqb);
printf("%s (%lu)\n", rqb.query, rqb.length);
/* Select_Response r = {.length = 2, */
/* .db_name = (String_View){.p = "test-db", .length
* = 8}, */
/* .ts_name = (String_View){.p = "test-ts", .length
* = 8}, */
/* .records = {{.timestamp = 1982398, .value =
* 0.7227},
*/
/* "%.2lf\nAGGREGATE\n\t%i\nBY\n\t%lu\n", */
/* select.ts_name, select.db_name, select.start_time,
* select.end_time, */
/* select.where.key, select.where.operator, select.where.value, */
/* select.af, select.interval); */

/* AST_Node *ast = parse(tokens, 5); */
/* print_ast(ast); */
/* Command cmd = parse_ast(ast); */
/* printf("%s %lu %lu\n", cmd.query.ts_name, cmd.query.start_ts, */
/* cmd.query.end_ts); */
/* ast_free(ast); */

roachdb_server_run("127.0.0.1", 17678);
/* {.timestamp = 1982398, .value =
* 0.7227}}}; */

/* roachdb_server_run("127.0.0.1", 17678); */

return 0;
}
1 change: 0 additions & 1 deletion src/parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ static Token *tokenize_create(Lexer *l, size_t *token_count) {
while (l->length > 0 && ++i) {
token = lexer_next(l);

printf("%s (%lu)\n", token.p, token.length);
if (strncmp(token.p, "INTO", token.length) == 0) {
tokens[i].type = TOKEN_INTO;
token = lexer_next(l);
Expand Down
190 changes: 190 additions & 0 deletions src/protocol.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
#include "protocol.h"

// Write a length-prefixed string into `dst` using the layout
//
//     <decimal length> CR LF <length bytes of src> CR LF
//
// The leading type marker ('$' or '!') is NOT written here; callers place it
// in the byte just before `dst`.
//
// dst    - destination buffer; no size is passed, so the caller must provide
//          at least 20 + 2 + length + 2 bytes (snprintf may also touch one
//          extra byte for its NUL while formatting the length)
// src    - payload bytes; exactly `length` bytes are read, no NUL is required
// length - number of payload bytes to copy
//
// Returns the number of bytes written, or -1 if the length could not be
// formatted.
static ssize_t encode_string(uint8_t *dst, const char *src, size_t length) {
    // A 64-bit size_t prints as at most 20 digits; 21 leaves room for the NUL
    // that snprintf appends (it is overwritten by the CR below).
    // %zu is the conversion that matches size_t; %lu is only correct where
    // size_t happens to be unsigned long.
    int digits = snprintf((char *)dst, 21, "%zu", length);
    if (digits < 0)
        return -1;

    size_t i = (size_t)digits;

    // CRLF closing the length line
    dst[i++] = '\r';
    dst[i++] = '\n';

    // Payload, copied verbatim (it may contain any byte value)
    for (size_t j = 0; j < length; j++)
        dst[i++] = (uint8_t)src[j];

    // CRLF closing the frame
    dst[i++] = '\r';
    dst[i++] = '\n';

    return (ssize_t)i;
}

// Encode a request as '$' followed by the length-prefixed query:
//
//     $<length>\r\n<query bytes>\r\n
//
// r   - request to encode; r->length bytes of r->query are written
// dst - destination buffer; no size is passed, so the caller must make it
//       large enough for the whole frame
//
// Returns the number of bytes written, or -1 if r->length exceeds the
// storage of r->query (encoding it would read past the struct) or the
// string could not be encoded.
ssize_t encode_request(const Request *r, uint8_t *dst) {
    if (r->length > sizeof r->query)
        return -1;

    dst[0] = '$';

    ssize_t written = encode_string(dst + 1, r->query, r->length);
    if (written < 0)
        return -1;

    // +1 for the '$' marker
    return 1 + written;
}

// Decode a request frame of the form
//
//     $<length>\r\n<length bytes of query>\r\n
//
// data - encoded bytes; no size is passed, so the buffer must hold the whole
//        frame that its length prefix announces
// dst  - receives the query (always NUL-terminated) and its length; left
//        untouched when decoding fails
//
// The payload is taken by its declared length instead of by scanning for a
// terminator, so a query may itself contain '\r' or '\n'.
//
// Returns the query length on success, or -1 when the marker is not '$', the
// length is missing or not decimal, the query would not fit in dst->query
// together with its NUL, or a CRLF separator is absent.
ssize_t decode_request(const uint8_t *data, Request *dst) {
    if (data[0] != '$')
        return -1;

    const size_t capacity = sizeof dst->query;
    const uint8_t *ptr = &data[1];
    size_t length = 0, digits = 0;

    // Read length; start from zero instead of accumulating into whatever
    // the caller's (possibly uninitialised) struct happens to contain
    while (*ptr >= '0' && *ptr <= '9') {
        length = length * 10 + (size_t)(*ptr - '0');
        // Checking on every digit also keeps `length` from overflowing
        if (length >= capacity)
            return -1;
        ptr++;
        digits++;
    }

    if (digits == 0 || ptr[0] != '\r' || ptr[1] != '\n')
        return -1;

    // Jump over \r\n
    ptr += 2;

    // Validate the trailing CRLF before touching dst
    if (ptr[length] != '\r' || ptr[length + 1] != '\n')
        return -1;

    // Read query string
    for (size_t i = 0; i < length; i++)
        dst->query[i] = (char)ptr[i];
    dst->query[length] = '\0';
    dst->length = length;

    return (ssize_t)length;
}

// Encode a response into `dst`.
//
// String responses (r->type == STRING_RSP):
//
//     $<length>\r\n<message>\r\n      when rc == 0
//     !<length>\r\n<message>\r\n      otherwise
//
// Any other type is encoded as an array of records:
//
//     *<count>\r\n
//     :<timestamp>\r\n;<value>\r\n    repeated <count> times
//     \r\n
//
// The array marker is '*' because that is the marker decode_response()
// recognises. Values are printed with "%.17g", which is enough digits for a
// double to survive a strtod() round trip and has a small, bounded width.
//
// r   - response to encode
// dst - destination buffer; no size is passed, so the caller must make it
//       large enough (each record needs at most 1 + 20 + 2 + 1 + 31 + 2 bytes)
//
// Returns the number of bytes written, or -1 if a string length exceeds the
// message storage or a number could not be formatted.
ssize_t encode_response(const Response *r, uint8_t *dst) {
    // Widest text we accept for one value, including snprintf's NUL;
    // "%.17g" needs at most 24 characters for a finite double
    enum { VALUE_TEXT_MAX = 32 };

    if (r->type == STRING_RSP) {
        // String response
        if (r->string_response.length > sizeof r->string_response.message)
            return -1;

        dst[0] = r->string_response.rc == 0 ? '$' : '!';

        ssize_t written = encode_string(dst + 1, r->string_response.message,
                                        r->string_response.length);
        return written < 0 ? -1 : 1 + written;
    }

    // Array response
    size_t i = 0;
    int n;

    dst[i++] = '*';

    // Array length: up to 20 digits plus snprintf's NUL
    n = snprintf((char *)dst + i, 21, "%zu", r->array_response.length);
    if (n < 0)
        return -1;
    i += (size_t)n;

    // CRLF
    dst[i++] = '\r';
    dst[i++] = '\n';

    // Records
    for (size_t j = 0; j < r->array_response.length; j++) {
        // Timestamp; the cast keeps the argument in step with %llu wherever
        // uint64_t is not unsigned long long
        dst[i++] = ':';
        n = snprintf(
            (char *)dst + i, 21, "%llu",
            (unsigned long long)r->array_response.records[j].timestamp);
        if (n < 0)
            return -1;
        i += (size_t)n;
        dst[i++] = '\r';
        dst[i++] = '\n';

        // Value; snprintf returns the length it WANTED to write, so refuse a
        // truncated result instead of advancing past bytes never written
        dst[i++] = ';';
        n = snprintf((char *)dst + i, VALUE_TEXT_MAX, "%.17g",
                     (double)r->array_response.records[j].value);
        if (n < 0 || n >= VALUE_TEXT_MAX)
            return -1;
        i += (size_t)n;
        dst[i++] = '\r';
        dst[i++] = '\n';
    }

    // CRLF closing the array
    dst[i++] = '\r';
    dst[i++] = '\n';

    return (ssize_t)i;
}

// Decode a string or error frame, starting at its marker byte:
//
//     $<length>\r\n<length bytes of message>\r\n
//     !<length>\r\n<length bytes of message>\r\n
//
// ptr - first byte of the frame (the marker); no size is passed, so the
//       buffer must hold the whole frame that its length prefix announces
// dst - receives rc, the message (always NUL-terminated) and its length;
//       left untouched when decoding fails
//
// The payload is taken by its declared length instead of by scanning for a
// terminator, so a message may itself contain '\r' or '\n'.
//
// Returns the number of bytes consumed, trailing CRLF included (the same
// count the encoder reports for the frame), or -1 when the length is missing
// or not decimal, the message would not fit in the destination together with
// its NUL, or a CRLF separator is absent.
static ssize_t decode_string(const uint8_t *ptr, Response *dst) {
    const size_t capacity = sizeof dst->string_response.message;
    // Start just past the marker byte
    size_t consumed = 1, length = 0, digits = 0;

    // Read length; start from zero instead of accumulating into whatever
    // the caller's (possibly uninitialised) struct happens to contain
    while (ptr[consumed] >= '0' && ptr[consumed] <= '9') {
        length = length * 10 + (size_t)(ptr[consumed] - '0');
        // Checking on every digit also keeps `length` from overflowing
        if (length >= capacity)
            return -1;
        consumed++;
        digits++;
    }

    if (digits == 0 || ptr[consumed] != '\r' || ptr[consumed + 1] != '\n')
        return -1;

    // Move forward after CRLF
    consumed += 2;

    // Validate the trailing CRLF before touching dst
    if (ptr[consumed + length] != '\r' || ptr[consumed + length + 1] != '\n')
        return -1;

    // For simplicity, assume the only error code is 1 for now, it's not used
    // ATM
    dst->string_response.rc = ptr[0] == '!' ? 1 : 0;

    for (size_t i = 0; i < length; i++)
        dst->string_response.message[i] = (char)ptr[consumed + i];
    dst->string_response.message[length] = '\0';
    dst->string_response.length = length;

    // Payload plus its closing CRLF
    return (ssize_t)(consumed + length + 2);
}

// Decode a response frame into `dst`.
//
// The first byte selects the layout:
//
//     '$' or '!'  string / error response, handed to decode_string()
//     '*'         array response:
//
//                     *<count>\r\n
//                     :<timestamp>\r\n;<value>\r\n    repeated <count> times
//                     \r\n
//
// data - encoded bytes; no size is passed, so the buffer must hold the whole
//        frame
// dst  - receives the decoded response. For arrays,
//        dst->array_response.records is allocated with calloc() and must be
//        released by the caller with free(); it is NULL when the array is
//        empty or decoding fails.
//
// Returns, for string responses, whatever decode_string() reports; for array
// responses, the number of bytes consumed. Returns -1 on an unknown marker
// (dst is then left untouched), malformed input or allocation failure.
//
// NOTE: <count> comes from the peer and directly sizes the allocation; there
// is no upper limit here beyond what calloc() itself refuses.
ssize_t decode_response(const uint8_t *data, Response *dst) {
    // Widest value text accepted, including the NUL added for strtod()
    enum { VALUE_TEXT_MAX = 64 };

    const uint8_t *ptr = data;

    switch (*ptr) {
    case '$':
    case '!':
        // Treat error and common strings the same for now
        dst->type = STRING_RSP;
        return decode_string(ptr, dst);
    case '*':
        dst->type = ARRAY_RSP;
        break;
    default:
        return -1;
    }

    // Known-empty state first, so every failure path can free() safely
    dst->array_response.length = 0;
    dst->array_response.records = NULL;

    // Skip the '*' marker so it is not mistaken for a digit
    ptr++;

    // Read length
    size_t count = 0, digits = 0;
    while (*ptr >= '0' && *ptr <= '9') {
        size_t digit = (size_t)(*ptr - '0');
        if (count > (SIZE_MAX - digit) / 10)
            goto fail;
        count = count * 10 + digit;
        ptr++;
        digits++;
    }

    if (digits == 0 || ptr[0] != '\r' || ptr[1] != '\n')
        goto fail;

    // Jump over \r\n
    ptr += 2;

    // TODO arena malloc here
    if (count > 0) {
        // calloc zeroes the records and rejects count * size overflow
        dst->array_response.records =
            calloc(count, sizeof(*dst->array_response.records));
        if (dst->array_response.records == NULL)
            goto fail;
    }

    // Read records: each one is a timestamp line followed by a value line
    for (size_t j = 0; j < count; j++) {
        // Timestamp
        if (*ptr++ != ':')
            goto fail;

        uint64_t timestamp = 0;
        digits = 0;
        while (*ptr >= '0' && *ptr <= '9') {
            uint64_t digit = (uint64_t)(*ptr - '0');
            if (timestamp > (UINT64_MAX - digit) / 10)
                goto fail;
            timestamp = timestamp * 10 + digit;
            ptr++;
            digits++;
        }

        if (digits == 0 || ptr[0] != '\r' || ptr[1] != '\n')
            goto fail;
        ptr += 2;

        // Value: copy the text up to CR into a bounded, NUL-terminated
        // buffer so strtod() cannot run past it
        if (*ptr++ != ';')
            goto fail;

        char text[VALUE_TEXT_MAX];
        size_t k = 0;
        while (*ptr != '\r' && k < sizeof text - 1)
            text[k++] = (char)*ptr++;
        text[k] = '\0';

        if (k == 0 || ptr[0] != '\r' || ptr[1] != '\n')
            goto fail;

        char *end = NULL;
        double value = strtod(text, &end);
        // The whole token must be a number, not just a prefix of it
        if (end != text + k)
            goto fail;

        // Skip CRLF
        ptr += 2;

        dst->array_response.records[j].timestamp = timestamp;
        dst->array_response.records[j].value = value;
    }

    // CRLF closing the array
    if (ptr[0] != '\r' || ptr[1] != '\n')
        goto fail;
    ptr += 2;

    dst->array_response.length = count;

    return (ssize_t)(ptr - data);

fail:
    free(dst->array_response.records);
    dst->array_response.records = NULL;
    dst->array_response.length = 0;
    return -1;
}
65 changes: 65 additions & 0 deletions src/protocol.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#ifndef PROTOCOL_H
#define PROTOCOL_H

#include "parser.h"
#include <math.h>
#include <stdint.h>
#include <stdlib.h>

/*
 * Define a basic request, for the time being it's fine to treat
 * every request as a simple string paired with its length.
 */
typedef struct {
// Number of bytes of `query` that encode_request() writes to the wire
size_t length;
// Query text, held in fixed 512-byte storage
char query[512];
} Request;

/*
 * Define a response of type string, ideally RC (return code) should have a
 * meaning going forward.
 */
typedef struct {
// Number of bytes of `message` that encode_response() writes to the wire
size_t length;
// Return code: encode_response() emits the '$' marker when this is 0 and the
// '!' marker otherwise; the decoder maps '!' back to 1 and anything else to 0
uint8_t rc;
// Message text, held in fixed 512-byte storage
char message[512];
} String_Response;

/*
 * Define a response of type array, mainly used as SELECT response.
 */
typedef struct {
// Number of entries in `records`
size_t length;
// One (timestamp, value) pair per entry. decode_response() heap-allocates
// this array and does not free it, so the caller is responsible for free().
struct {
uint64_t timestamp;
// double_t comes from <math.h>
double_t value;
} *records;
} Array_Response;

// Discriminator for Response: tells which member of its union is in use
typedef enum { STRING_RSP, ARRAY_RSP } Response_Type;

/*
 * Define a generic response which can either be a string response or an array
 * response for the time being
 */
typedef struct {
// Selects the active union member: STRING_RSP -> string_response,
// ARRAY_RSP -> array_response
Response_Type type;
// Anonymous union (C11): members are accessed directly on the Response
union {
String_Response string_response;
Array_Response array_response;
};
} Response;

// Encode a request into an array of bytes.
// Writes '$', the decimal length, CRLF, r->length bytes of r->query and a
// closing CRLF into dst. No buffer size is passed: dst must be large enough
// for the whole frame. Returns the number of bytes written.
ssize_t encode_request(const Request *r, uint8_t *dst);

// Decode a request from an array of bytes into a Request struct.
// Returns -1 when data does not start with '$'; otherwise returns the decoded
// query length. No buffer size is passed: data must hold a complete frame.
// NOTE(review): zero-initialise *dst before calling; the decoder may
// accumulate the length in place.
ssize_t decode_request(const uint8_t *data, Request *dst);

// Encode a response into an array of bytes.
// A STRING_RSP is written like a request, with marker '$' when rc is 0 and
// '!' otherwise. Any other type is written as an array: a count line, then
// for each record a ':'-prefixed timestamp line and a ';'-prefixed value
// line, then a closing CRLF. No buffer size is passed: dst must be large
// enough for the whole frame. Returns the number of bytes written.
ssize_t encode_response(const Response *r, uint8_t *dst);

// Decode a response from an array of bytes into a Response struct.
// The first byte selects the kind: '$' or '!' for a string response, '*' for
// an array response. For arrays, dst->array_response.records is allocated on
// the heap and is not freed by the decoder, so the caller must free() it.
// No buffer size is passed: data must hold a complete frame.
// NOTE(review): zero-initialise *dst before calling; the decoder may
// accumulate lengths in place.
ssize_t decode_response(const uint8_t *data, Response *dst);

#endif // PROTOCOL_H

0 comments on commit 162d2b6

Please sign in to comment.