implemented json_string_cursor and corresponding multilog filters
attaluris committed Dec 14, 2018
1 parent 6e0c2a0 commit 0cce8a8
Showing 8 changed files with 136 additions and 306 deletions.
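Taken together, the changes below add a JSON-in/JSON-out path to atomic_multilog. As a rough usage sketch of the append side, assuming an existing atomic_multilog `mlog` whose schema has a long field `a` and a double field `b` (the multilog setup and the exact JSON value formatting expected by the schema are not shown in this commit):

```cpp
// Sketch only: append a record supplied as a JSON string.
// `mlog` and the field names are illustrative assumptions; the commit only shows
// that append_json(const std::string &) parses the string against the schema and
// returns the offset of the stored record.
#include <cstddef>
#include <string>

#include "atomic_multilog.h"

size_t append_one_json_record(confluo::atomic_multilog &mlog) {
  std::string json_record = "{\"a\": \"42\", \"b\": \"3.14\"}";
  return mlog.append_json(json_record);  // offset of the new record in the data log
}
```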
32 changes: 31 additions & 1 deletion libconfluo/confluo/atomic_multilog.h
@@ -22,6 +22,7 @@
#include "conf/configuration_params.h"
#include "container/data_log.h"
#include "container/cursor/record_cursors.h"
#include "container/cursor/json_cursors.h"
#include "container/cursor/alert_cursor.h"
#include "container/monolog/monolog.h"
#include "container/radix_tree.h"
@@ -252,7 +253,7 @@ class atomic_multilog {
* @param json_data The json-formatted data to be stored
* @return The offset of where the data is located
*/
size_t append_json(std::string json_data);
size_t append_json(const std::string &json_data);

// TODO: Add a std::tuple based variant
// TODO: Add a JSON based variant
@@ -333,6 +334,13 @@ class atomic_multilog {
*/
std::unique_ptr<record_cursor> execute_filter(const std::string &expr) const;

/**
 * Executes a filter expression, exposing the matching records as JSON strings
 * @param expr The filter expression
 * @return A JSON cursor over the records that match the filter
*/
std::unique_ptr<json_cursor> execute_filter_json(const std::string &expr) const;

// TODO: Add tests
/**
* Executes an aggregate
@@ -355,6 +363,17 @@
uint64_t begin_ms,
uint64_t end_ms) const;

/**
 * Queries an existing filter, returning the results as JSON strings
 * @param filter_name Name of the filter
 * @param begin_ms Beginning of time-range in ms
 * @param end_ms End of time-range in ms
 * @return A JSON cursor containing the results of the filter
*/
std::unique_ptr<json_cursor> query_filter_json(const std::string &filter_name,
uint64_t begin_ms,
uint64_t end_ms) const;

/**
* Queries an existing filter
* @param filter_name The name of the filter
@@ -366,6 +385,17 @@
std::unique_ptr<record_cursor> query_filter(const std::string &filter_name, uint64_t begin_ms, uint64_t end_ms,
const std::string &additional_filter_expr) const;

/**
 * Queries an existing filter with an additional filter expression, returning the results as JSON strings
 * @param filter_name The name of the filter
 * @param begin_ms Beginning of time-range in ms
 * @param end_ms End of time-range in ms
 * @param additional_filter_expr Additional filter expression
 * @return A JSON cursor containing the results of the filter
*/
std::unique_ptr<json_cursor> query_filter_json(const std::string &filter_name, uint64_t begin_ms, uint64_t end_ms,
const std::string &additional_filter_expr) const;

/**
* Query a stored aggregate.
* @param aggregate_name The name of the aggregate
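A sketch of driving the ad-hoc JSON filter declared above, assuming `mlog` exists and that the expression syntax is the same one accepted by execute_filter; the has_more/get/advance calls mirror the record_cursor usage shown in json_cursors.cc further down:

```cpp
// Sketch: run an ad-hoc filter and print each matching record as JSON.
// The expression "a > 5" and the multilog `mlog` are illustrative assumptions.
#include <iostream>
#include <memory>

#include "atomic_multilog.h"

void print_matches_as_json(confluo::atomic_multilog &mlog) {
  std::unique_ptr<confluo::json_cursor> cursor = mlog.execute_filter_json("a > 5");
  while (cursor->has_more()) {
    std::cout << cursor->get() << "\n";  // one JSON-formatted record per line
    cursor->advance();
  }
}
```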
47 changes: 47 additions & 0 deletions libconfluo/confluo/container/cursor/json_cursors.h
@@ -0,0 +1,47 @@
#ifndef CONFLUO_CONTAINER_CURSOR_JSON_CURSOR_H_
#define CONFLUO_CONTAINER_CURSOR_JSON_CURSOR_H_

#include <unordered_set>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/json_parser.hpp>

#include "batched_cursor.h"
#include "offset_cursors.h"
#include "schema/record.h"
#include "parser/expression_compiler.h"
#include "container/data_log.h"

namespace confluo {

typedef batched_cursor<std::string> json_cursor;

/**
 * A JSON cursor that converts records into JSON-formatted strings
*/
class json_string_cursor : public json_cursor {
public:
/**
 * Initializes the JSON string cursor
*
* @param r_cursor The record cursor
* @param schema The schema
* @param batch_size The number of records in the batch
*/
json_string_cursor(std::unique_ptr<record_cursor> r_cursor, const schema_t *schema,
size_t batch_size = 64);

/**
* Loads the next batch from the cursor
*
* @return The size of the batch
*/
virtual size_t load_next_batch() override;

private:
std::unique_ptr<record_cursor> r_cursor_;
const schema_t *schema_;
};

}

#endif /* CONFLUO_CONTAINER_CURSOR_JSON_CURSOR_H_ */
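The new header adapts an existing record_cursor into a cursor of JSON strings, which is how atomic_multilog wires it up below. A minimal sketch of that adaptation, where the record cursor and schema pointer are assumed to come from an existing filter query and the multilog's schema:

```cpp
// Sketch: wrap a record cursor so callers receive JSON strings instead of records.
// `r_cursor` and `schema` are assumed inputs from an existing query and schema.
#include <memory>
#include <utility>

#include "container/cursor/json_cursors.h"

std::unique_ptr<confluo::json_cursor> to_json_cursor(
    std::unique_ptr<confluo::record_cursor> r_cursor,
    const confluo::schema_t *schema) {
  // The third argument overrides the default batch size of 64 records.
  return std::unique_ptr<confluo::json_cursor>(
      new confluo::json_string_cursor(std::move(r_cursor), schema, 16));
}
```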
5 changes: 2 additions & 3 deletions libconfluo/confluo/schema/schema.h
@@ -169,14 +169,13 @@ class schema_t {
*
* @return A pointer to the record data
*/
void *json_string_to_data(const std::string json_record) const;
void *json_string_to_data(const std::string &json_record) const;

/**
 * Converts record data into a JSON-formatted string
 *
 * @param ret The JSON string that is filled with the data
 * @param data The record data used for conversion
*/
void data_to_json_string(std::string &ret, const void *data) const;

25 changes: 24 additions & 1 deletion libconfluo/src/atomic_multilog.cc
@@ -228,7 +228,7 @@ size_t atomic_multilog::append(void *data) {
return offset;
}

size_t atomic_multilog::append_json(std::string json_record) {
size_t atomic_multilog::append_json(const std::string &json_record) {
void *buf = schema_.json_string_to_data(json_record);
size_t off = append(buf);
delete[] reinterpret_cast<uint8_t *>(buf);
@@ -299,6 +299,12 @@ std::unique_ptr<record_cursor> atomic_multilog::execute_filter(const std::string
return plan.execute(version);
}

std::unique_ptr<json_cursor> atomic_multilog::execute_filter_json(const std::string &expr) const {
std::unique_ptr<record_cursor> r_cursor = execute_filter(expr);

return std::unique_ptr<json_cursor>(new json_string_cursor(std::move(r_cursor), &schema_));
}

numeric atomic_multilog::execute_aggregate(const std::string &aggregate_expr, const std::string &filter_expr) {
auto pa = parser::parse_aggregate(aggregate_expr);
aggregator agg = aggregate_manager::get_aggregator(pa.agg);
@@ -331,6 +337,14 @@ std::unique_ptr<record_cursor> atomic_multilog::query_filter(const std::string &
parser::compiled_expression()));
}

std::unique_ptr<json_cursor> atomic_multilog::query_filter_json(const std::string &filter_name,
uint64_t begin_ms,
uint64_t end_ms) const {
std::unique_ptr<record_cursor> r_cursor = query_filter(filter_name, begin_ms, end_ms);

return std::unique_ptr<json_cursor>(new json_string_cursor(std::move(r_cursor), &schema_));
}

std::unique_ptr<record_cursor> atomic_multilog::query_filter(const std::string &filter_name,
uint64_t begin_ms,
uint64_t end_ms,
@@ -350,6 +364,15 @@ std::unique_ptr<record_cursor> atomic_multilog::query_filter(const std::string &
return std::unique_ptr<record_cursor>(new filter_record_cursor(std::move(o_cursor), &data_log_, &schema_, e));
}

std::unique_ptr<json_cursor> atomic_multilog::query_filter_json(const std::string &filter_name,
uint64_t begin_ms,
uint64_t end_ms,
const std::string &additional_filter_expr) const {
std::unique_ptr<record_cursor> r_cursor = query_filter(filter_name, begin_ms, end_ms, additional_filter_expr);

return std::unique_ptr<json_cursor>(new json_string_cursor(std::move(r_cursor), &schema_));
}

numeric atomic_multilog::get_aggregate(const std::string &aggregate_name, uint64_t begin_ms, uint64_t end_ms) {
aggregate_id_t aggregate_id;
if (aggregate_map_.get(aggregate_name, aggregate_id) == -1) {
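And a sketch of the pre-defined-filter path implemented above: querying a named filter over a time range, optionally narrowed by an extra expression. The filter name, the expression, and the timestamps are illustrative, and the filter is assumed to have been registered earlier through the multilog's filter-definition API, which is not part of this diff:

```cpp
// Sketch: read the last 10 seconds of a pre-defined filter as JSON strings.
// "recent_errors" and "severity > 3" are illustrative assumptions.
#include <cstdint>
#include <memory>
#include <string>

#include "atomic_multilog.h"

void dump_recent_errors_json(confluo::atomic_multilog &mlog, uint64_t now_ms) {
  uint64_t begin_ms = now_ms - 10000;  // 10-second window ending at now_ms

  // Use the 3-argument overload for the plain time-range query, or pass an
  // additional filter expression to narrow the results further.
  std::unique_ptr<confluo::json_cursor> cursor =
      mlog.query_filter_json("recent_errors", begin_ms, now_ms, "severity > 3");

  while (cursor->has_more()) {
    std::string json_record = cursor->get();
    // ... forward json_record to a log sink, HTTP response, etc. ...
    cursor->advance();
  }
}
```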
28 changes: 28 additions & 0 deletions libconfluo/src/container/cursor/json_cursors.cc
@@ -0,0 +1,28 @@
#include "container/cursor/json_cursors.h"

namespace confluo {

json_string_cursor::json_string_cursor(std::unique_ptr<record_cursor> r_cursor,
const schema_t *schema,
size_t batch_size)
: json_cursor(batch_size),
r_cursor_(std::move(r_cursor)),
schema_(schema) {
init();
}

size_t json_string_cursor::load_next_batch() {
  size_t i = 0;
  for (; i < current_batch_.size() && r_cursor_->has_more();
       ++i, r_cursor_->advance()) {
    record_t r = r_cursor_->get();
    // Serialize the record's raw data into a JSON string via the schema pointer.
    std::string json_rec;
    schema_->data_to_json_string(json_rec, r.data());
    current_batch_[i] = json_rec;
}
return i;
}

}
2 changes: 1 addition & 1 deletion libconfluo/src/schema/schema.cc
@@ -94,7 +94,7 @@ std::string schema_t::to_string() const {
return str;
}

void *schema_t::json_string_to_data(const std::string json_record) const {
void *schema_t::json_string_to_data(const std::string &json_record) const {
// need to convert json_data into a record vector
std::stringstream ss;
// putting the json data into a stream
4 changes: 3 additions & 1 deletion librpc/src/rpc_server.cc
@@ -178,9 +178,11 @@ void rpc_service_handler::read(std::string &_return, int64_t id, const int64_t o
_return.assign(data, size);
}
void rpc_service_handler::read_json(std::string &_return, int64_t id, const int64_t offset, const int64_t nrecords) {
if (nrecords > 1) {
THROW(unsupported_exception, "Reading more than one JSON record at a time is unsupported!");
}
atomic_multilog *mlog = store_->get_atomic_multilog(id);
_return = mlog->read_json((uint64_t) offset);
// TODO: add support for reading nrecords records at once
}
void rpc_service_handler::query_aggregate(std::string &_return,
int64_t id,