Skip to content

Commit

Permalink
Handle index out of bound case
Browse files Browse the repository at this point in the history
  • Loading branch information
codepr committed Mar 10, 2024
1 parent 047bb1b commit 7af0760
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 6 deletions.
1 change: 1 addition & 0 deletions .github/workflows/roach.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ jobs:
steps:
- uses: actions/checkout@v2
- name: make
run: make
20 changes: 18 additions & 2 deletions src/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ int main(void) {
ts_range(ts, timestamps[1], timestamps[41], &coll);
for (size_t i = 0; i < vec_size(coll); i++) {
Record r = vec_at(coll, i);
log_info(" %lu {.sec: %lu, .nsec: %lu .value: %.02f", r.timestamp,
log_info(" %lu {.sec: %lu, .nsec: %lu .value: %.02f}", r.timestamp,
r.tv.tv_sec, r.tv.tv_nsec, r.value);
}

Expand All @@ -138,10 +138,26 @@ int main(void) {
ts_range(ts, timestamps[1], timestamps[89], &coll);
for (size_t i = 0; i < vec_size(coll); i++) {
Record r = vec_at(coll, i);
log_info(" %lu {.sec: %lu, .nsec: %lu .value: %.02f", r.timestamp,
log_info(" %lu {.sec: %lu, .nsec: %lu .value: %.02f}", r.timestamp,
r.tv.tv_sec, r.tv.tv_nsec, r.value);
}

log_info("[3] Add an out of bounds timestamp");
ts_set_record(ts, timestamps[89] + 5e9, 181.1);
for (size_t i = 0; i <= ts->partition_nr; ++i)
c_log_print(&ts->partitions[i].clog);
(void)ts_find_record(ts, timestamps[89] + 5e9, &r);
log_info(" %lu {.sec: %lu, .nsec: %lu .value: %.02f}", r.timestamp,
r.tv.tv_sec, r.tv.tv_nsec, r.value);

log_info("[4] Add a prev range timestamp");
ts_set_record(ts, timestamps[89] + 5e7, 141.231);
for (size_t i = 0; i <= ts->partition_nr; ++i)
c_log_print(&ts->partitions[i].clog);
(void)ts_find_record(ts, timestamps[89] + 5e7, &r);
log_info(" %lu {.sec: %lu, .nsec: %lu .value: %.02f}", r.timestamp,
r.tv.tv_sec, r.tv.tv_nsec, r.value);

vec_destroy(coll);
ts_destroy(ts);

Expand Down
43 changes: 39 additions & 4 deletions src/timeseries.c
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,20 @@ static void ts_chunk_destroy(Timeseries_Chunk *tc) {
tc->max_index = 0;
}

static int ts_chunk_record_fit(const Timeseries_Chunk *tc, uint64_t sec) {
// Relative offset inside the 2 arrays
size_t index = sec - tc->base_offset;

// Index outside of the head chunk range
// 1. Flush the tail chunk to persistence
// 2. Create a new head chunk set on the next 15 min
// 3. Make the current head chunk the new tail chunk
if (index > TS_CHUNK_SIZE - 1)
return -1;

return 0;
}

/*
* Set a record in the chunk at a relative index based on the first timestamp
* stored e.g.
Expand All @@ -138,14 +152,18 @@ static void ts_chunk_destroy(Timeseries_Chunk *tc) {
* Index 6
*
* The information stored is initially formed by a timestamp and a long double
* value
* value.
*
* Remarks
*
* - This function assumes the record will fit in the chunk, by previously
* checking it with `ts_chunk_record_fit(2)`
*
*/
static int ts_chunk_set_record(Timeseries_Chunk *tc, uint64_t sec,
uint64_t nsec, double_t value) {
// Relative offset inside the 2 arrays
size_t index = sec - tc->base_offset;
assert(index < TS_CHUNK_SIZE);

// Append to the last record in this timestamp bucket
Record point = {
.value = value,
Expand Down Expand Up @@ -318,7 +336,9 @@ int ts_set_record(Timeseries *ts, uint64_t timestamp, double_t value) {
// Persist to disk for disaster recovery
wal_append_record(&ts->prev.wal, timestamp, value);

return ts_chunk_set_record(&ts->prev, sec, nsec, value);
// If we successfully insert the record, we can return
if (ts_chunk_record_fit(&ts->prev, sec) == 0)
return ts_chunk_set_record(&ts->prev, sec, nsec, value);
}

// TODO
Expand All @@ -330,6 +350,21 @@ int ts_set_record(Timeseries *ts, uint64_t timestamp, double_t value) {

// Persist to disk for disaster recovery
wal_append_record(&ts->head.wal, timestamp, value);
// Insert it into the head chunk
if (ts_chunk_record_fit(&ts->head, sec) < 0) {
// Flush the prev chunk to persistence
if (partition_flush_chunk(&ts->partitions[ts->partition_nr],
&ts->prev) < 0)
return -1;
// Clean up the prev chunk and delete it's WAL
ts_chunk_destroy(&ts->prev);
wal_delete(&ts->prev.wal);
// Set the current head as new prev
ts->prev = ts->head;
// Reset the current head as new head
ts_chunk_destroy(&ts->head);
wal_delete(&ts->head.wal);
}
return ts_chunk_set_record(&ts->head, sec, nsec, value);
}

Expand Down

0 comments on commit 7af0760

Please sign in to comment.