From fc7ecd185bfac8a7b6cef1fc32c92cfd56e11a1c Mon Sep 17 00:00:00 2001 From: attaluris Date: Thu, 6 Dec 2018 10:55:29 -0800 Subject: [PATCH] started json_cursor functionality; some fixes to json-parser functionality --- libconfluo/confluo/atomic_multilog.h | 32 +- .../confluo/container/cursor/json_cursors.h | 51 +++ libconfluo/confluo/schema/schema.h | 5 +- libconfluo/src/atomic_multilog.cc | 51 ++- .../src/container/cursor/json_cursors.cc | 33 ++ libconfluo/src/schema/schema.cc | 2 +- librpc/src/rpc_server.cc | 4 +- pyclient/confluo/rpc/rpc_service-remote | 299 ------------------ 8 files changed, 171 insertions(+), 306 deletions(-) create mode 100644 libconfluo/confluo/container/cursor/json_cursors.h create mode 100644 libconfluo/src/container/cursor/json_cursors.cc delete mode 100755 pyclient/confluo/rpc/rpc_service-remote diff --git a/libconfluo/confluo/atomic_multilog.h b/libconfluo/confluo/atomic_multilog.h index a5214bc13..68b0f1cae 100644 --- a/libconfluo/confluo/atomic_multilog.h +++ b/libconfluo/confluo/atomic_multilog.h @@ -22,6 +22,7 @@ #include "conf/configuration_params.h" #include "container/data_log.h" #include "container/cursor/record_cursors.h" +#include "container/cursor/json_cursors.h" #include "container/cursor/alert_cursor.h" #include "container/monolog/monolog.h" #include "container/radix_tree.h" @@ -252,7 +253,7 @@ class atomic_multilog { * @param json_data The json-formatted data to be stored * @return The offset of where the data is located */ - size_t append_json(std::string json_data); + size_t append_json(const std::string &json_data); // TODO: Add a std::tuple based variant // TODO: Add a JSON based variant @@ -333,6 +334,13 @@ class atomic_multilog { */ std::unique_ptr execute_filter(const std::string &expr) const; + /** + * Executes the filter expression + * @param expr The filter expression + * @return The result of applying the filter to the atomic multilog + */ +// std::unique_ptr execute_filter_json(const std::string &expr) const; 
+ // TODO: Add tests /** * Executes an aggregate @@ -355,6 +363,17 @@ class atomic_multilog { uint64_t begin_ms, uint64_t end_ms) const; + /** + * Queries an existing filter + * @param filter_name Name of the filter + * @param begin_ms Beginning of time-range in ms + * @param end_ms End of time-range in ms + * @return A stream containing the results of the filter + */ +// std::unique_ptr query_filter_json(const std::string &filter_name, +// uint64_t begin_ms, +// uint64_t end_ms) const; + /** * Queries an existing filter * @param filter_name The name of the filter @@ -366,6 +385,17 @@ class atomic_multilog { std::unique_ptr query_filter(const std::string &filter_name, uint64_t begin_ms, uint64_t end_ms, const std::string &additional_filter_expr) const; + /** + * Queries an existing filter + * @param filter_name The name of the filter + * @param begin_ms Beginning of time-range in ms + * @param end_ms End of time-range in ms + * @param additional_filter_expr Additional filter expression + * @return A stream containing the results of the filter + */ +// std::unique_ptr query_filter_json(const std::string &filter_name, uint64_t begin_ms, uint64_t end_ms, +// const std::string &additional_filter_expr) const; + /** * Query a stored aggregate. 
* @param aggregate_name The name of the aggregate diff --git a/libconfluo/confluo/container/cursor/json_cursors.h b/libconfluo/confluo/container/cursor/json_cursors.h new file mode 100644 index 000000000..cceb0cd9e --- /dev/null +++ b/libconfluo/confluo/container/cursor/json_cursors.h @@ -0,0 +1,51 @@ +#ifndef CONFLUO_CONTAINER_CURSOR_JSON_CURSOR_H_ +#define CONFLUO_CONTAINER_CURSOR_JSON_CURSOR_H_ + +#include <memory> + +#include "batched_cursor.h" +#include "offset_cursors.h" +#include "schema/record.h" +#include "parser/expression_compiler.h" +#include "container/data_log.h" + +namespace confluo { + +typedef batched_cursor<std::string> json_cursor; + +/** + * A json cursor that makes records into a json formatted string + */ +class aggregated_json_cursor : public json_cursor { + public: + /** + * Initializes the aggregated json cursor + * + * @param o_cursor The offset cursor + * @param dlog The data log pointer + * @param schema The schema + * @param cexpr The filter expression + * @param batch_size The number of records in the batch + */ + aggregated_json_cursor(std::unique_ptr<offset_cursor> o_cursor, + const data_log *dlog, const schema_t *schema, + const parser::compiled_expression &cexpr, + size_t batch_size = 64); + + /** + * Loads the next batch from the cursor + * + * @return The size of the batch + */ + virtual size_t load_next_batch() override; + + private: + std::unique_ptr<offset_cursor> o_cursor_; + const data_log *dlog_; + const schema_t *schema_; + const parser::compiled_expression &cexpr_; +}; + +} + +#endif /* CONFLUO_CONTAINER_CURSOR_JSON_CURSOR_H_ */ diff --git a/libconfluo/confluo/schema/schema.h b/libconfluo/confluo/schema/schema.h index ab286ac51..c90841438 100644 --- a/libconfluo/confluo/schema/schema.h +++ b/libconfluo/confluo/schema/schema.h @@ -169,14 +169,13 @@ class schema_t { * * @return A pointer to the record data */ - void *json_string_to_data(const std::string json_record) const; + void *json_string_to_data(const std::string &json_record) const; /** * Converts the records into a pointer to
the record data * + * @param ret The json string that is filled up with the data * @param record The records used for conversion - * - * @return A pointer to the record data */ void data_to_json_string(std::string &ret, const void *data) const; diff --git a/libconfluo/src/atomic_multilog.cc b/libconfluo/src/atomic_multilog.cc index 8f4dfd77b..d6f842967 100644 --- a/libconfluo/src/atomic_multilog.cc +++ b/libconfluo/src/atomic_multilog.cc @@ -228,7 +228,7 @@ size_t atomic_multilog::append(void *data) { return offset; } -size_t atomic_multilog::append_json(std::string json_record) { +size_t atomic_multilog::append_json(const std::string &json_record) { void *buf = schema_.json_string_to_data(json_record); size_t off = append(buf); delete[] reinterpret_cast(buf); @@ -299,6 +299,15 @@ std::unique_ptr atomic_multilog::execute_filter(const std::string return plan.execute(version); } +//std::unique_ptr atomic_multilog::execute_filter_json(const std::string &expr) const { +// uint64_t version = rt_.get(); +// auto t = parser::parse_expression(expr); +// auto cexpr = parser::compile_expression(t, schema_); +// query_plan plan = planner_.plan(cexpr); +// return plan.execute(version); +// // TODO: figure out return for this +//} + numeric atomic_multilog::execute_aggregate(const std::string &aggregate_expr, const std::string &filter_expr) { auto pa = parser::parse_aggregate(aggregate_expr); aggregator agg = aggregate_manager::get_aggregator(pa.agg); @@ -331,6 +340,27 @@ std::unique_ptr atomic_multilog::query_filter(const std::string & parser::compiled_expression())); } +//std::unique_ptr atomic_multilog::query_filter_json(const std::string &filter_name, +// uint64_t begin_ms, +// uint64_t end_ms) const { +// filter_id_t filter_id; +// if (filter_map_.get(filter_name, filter_id) == -1) { +// throw invalid_operation_exception( +// "Filter " + filter_name + " does not exist."); +// } +// +// filter::range_result res = filters_.at(filter_id)->lookup_range(begin_ms, +// end_ms); 
+// uint64_t version = rt_.get(); +// std::unique_ptr o_cursor( +// new offset_iterator_cursor(res.begin(), +// res.end(), +// version)); +// return std::unique_ptr( +// new json_cursor(std::move(o_cursor), &data_log_, &schema_, +// parser::compiled_expression())); +//} + std::unique_ptr atomic_multilog::query_filter(const std::string &filter_name, uint64_t begin_ms, uint64_t end_ms, @@ -350,6 +380,25 @@ std::unique_ptr atomic_multilog::query_filter(const std::string & return std::unique_ptr(new filter_record_cursor(std::move(o_cursor), &data_log_, &schema_, e)); } +//std::unique_ptr atomic_multilog::query_filter_json(const std::string &filter_name, +// uint64_t begin_ms, +// uint64_t end_ms, +// const std::string &additional_filter_expr) const { +// auto t = parser::parse_expression(additional_filter_expr); +// auto e = parser::compile_expression(t, schema_); +// filter_id_t filter_id; +// if (filter_map_.get(filter_name, filter_id) == -1) { +// throw invalid_operation_exception( +// "Filter " + filter_name + " does not exist."); +// } +// +// filter::range_result res = filters_.at(filter_id)->lookup_range(begin_ms, end_ms); +// uint64_t version = rt_.get(); +// std::unique_ptr o_cursor( +// new offset_iterator_cursor(res.begin(), res.end(), version)); +// return std::unique_ptr(new json_cursor(std::move(o_cursor), &data_log_, &schema_, e)); +//} + numeric atomic_multilog::get_aggregate(const std::string &aggregate_name, uint64_t begin_ms, uint64_t end_ms) { aggregate_id_t aggregate_id; if (aggregate_map_.get(aggregate_name, aggregate_id) == -1) { diff --git a/libconfluo/src/container/cursor/json_cursors.cc b/libconfluo/src/container/cursor/json_cursors.cc new file mode 100644 index 000000000..2ad11ea80 --- /dev/null +++ b/libconfluo/src/container/cursor/json_cursors.cc @@ -0,0 +1,33 @@ +#include "container/cursor/json_cursors.h" + +namespace confluo { + +aggregated_json_cursor::aggregated_json_cursor(std::unique_ptr o_cursor, + const data_log *dlog, + const 
schema_t *schema, + const parser::compiled_expression &cexpr, + size_t batch_size) + : json_cursor(batch_size), + o_cursor_(std::move(o_cursor)), + dlog_(dlog), + schema_(schema), + cexpr_(cexpr) { + init(); +} + +size_t aggregated_json_cursor::load_next_batch() { + // TODO add functionality to make into a json_string + size_t i = 0; + for (; i < current_batch_.size() && o_cursor_->has_more(); + ++i, o_cursor_->advance()) { + uint64_t o = o_cursor_->get(); + read_only_data_log_ptr ptr; + dlog_->cptr(o, ptr); + if (!cexpr_.test(current_batch_[i] = schema_->apply(o, ptr))) { + i--; + } + } + return i; +} + +} \ No newline at end of file diff --git a/libconfluo/src/schema/schema.cc b/libconfluo/src/schema/schema.cc index 9ffcf0e68..3cf97b22d 100644 --- a/libconfluo/src/schema/schema.cc +++ b/libconfluo/src/schema/schema.cc @@ -94,7 +94,7 @@ std::string schema_t::to_string() const { return str; } -void *schema_t::json_string_to_data(const std::string json_record) const { +void *schema_t::json_string_to_data(const std::string &json_record) const { // need to convert json_data into a record vector std::stringstream ss; // putting the json data into a stream diff --git a/librpc/src/rpc_server.cc b/librpc/src/rpc_server.cc index 4936ebe00..3e2edfe35 100644 --- a/librpc/src/rpc_server.cc +++ b/librpc/src/rpc_server.cc @@ -178,9 +178,11 @@ void rpc_service_handler::read(std::string &_return, int64_t id, const int64_t o _return.assign(data, size); } void rpc_service_handler::read_json(std::string &_return, int64_t id, const int64_t offset, const int64_t nrecords) { + if (nrecords > 1) { + THROW(unsupported_exception, "Reading more than 1 JSON record at a time is unsupported!"); + } atomic_multilog *mlog = store_->get_atomic_multilog(id); _return = mlog->read_json((uint64_t) offset); - // TODO: put in functionality for nrecords to be read } void rpc_service_handler::query_aggregate(std::string &_return, int64_t id, diff --git a/pyclient/confluo/rpc/rpc_service-remote
b/pyclient/confluo/rpc/rpc_service-remote deleted file mode 100755 index 0544b2776..000000000 --- a/pyclient/confluo/rpc/rpc_service-remote +++ /dev/null @@ -1,299 +0,0 @@ -#!/usr/bin/env python -# -# Autogenerated by Thrift Compiler (0.11.0) -# -# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING -# -# options string: py -# - -import sys -import pprint -if sys.version_info[0] > 2: - from urllib.parse import urlparse -else: - from urlparse import urlparse -from thrift.transport import TTransport, TSocket, TSSLSocket, THttpClient -from thrift.protocol.TBinaryProtocol import TBinaryProtocol - -from confluo.rpc import rpc_service -from confluo.rpc.ttypes import * - -if len(sys.argv) <= 1 or sys.argv[1] == '--help': - print('') - print('Usage: ' + sys.argv[0] + ' [-h host[:port]] [-u url] [-f[ramed]] [-s[sl]] [-novalidate] [-ca_certs certs] [-keyfile keyfile] [-certfile certfile] function [arg1 [arg2...]]') - print('') - print('Functions:') - print(' void register_handler()') - print(' void deregister_handler()') - print(' i64 create_atomic_multilog(string name, rpc_schema schema, rpc_storage_mode mode)') - print(' rpc_atomic_multilog_info get_atomic_multilog_info(string name)') - print(' void remove_atomic_multilog(i64 multilog_id)') - print(' void add_index(i64 multilog_id, string field_name, double bucket_size)') - print(' void remove_index(i64 multilog_id, string field_name)') - print(' void add_filter(i64 multilog_id, string filter_name, string filter_expr)') - print(' void remove_filter(i64 multilog_id, string filter_name)') - print(' void add_aggregate(i64 mutlilog_id, string aggregate_name, string filter_name, string aggregate_expr)') - print(' void remove_aggregate(i64 multilog_id, string aggregate_name)') - print(' void add_trigger(i64 multilog_id, string trigger_name, string trigger_expr)') - print(' void remove_trigger(i64 multilog_id, string trigger_name)') - print(' i64 append(i64 multilog_id, string data)') - print(' i64 append_json(i64 
multilog_id, string data)') - print(' i64 append_batch(i64 multilog_id, rpc_record_batch batch)') - print(' string read(i64 multilog_id, i64 offset, i64 nrecords)') - print(' string read_json(i64 multilog_id, i64 offset, i64 nrecords)') - print(' string query_aggregate(i64 multilog_id, string aggregate_name, i64 begin_ms, i64 end_ms)') - print(' string adhoc_aggregate(i64 multilog_id, string aggregate_expr, string filter_expr)') - print(' rpc_iterator_handle adhoc_filter(i64 multilog_id, string filter_expr)') - print(' rpc_iterator_handle predef_filter(i64 multilog_id, string filter_name, i64 begin_ms, i64 end_ms)') - print(' rpc_iterator_handle combined_filter(i64 multilog_id, string filter_name, string filter_expr, i64 begin_ms, i64 end_ms)') - print(' rpc_iterator_handle alerts_by_time(i64 multilog_id, i64 begin_ms, i64 end_ms)') - print(' rpc_iterator_handle alerts_by_trigger_and_time(i64 multilog_id, string trigger_name, i64 begin_ms, i64 end_ms)') - print(' rpc_iterator_handle get_more(i64 multilog_id, rpc_iterator_descriptor desc)') - print(' i64 num_records(i64 multilog_id)') - print('') - sys.exit(0) - -pp = pprint.PrettyPrinter(indent=2) -host = 'localhost' -port = 9090 -uri = '' -framed = False -ssl = False -validate = True -ca_certs = None -keyfile = None -certfile = None -http = False -argi = 1 - -if sys.argv[argi] == '-h': - parts = sys.argv[argi + 1].split(':') - host = parts[0] - if len(parts) > 1: - port = int(parts[1]) - argi += 2 - -if sys.argv[argi] == '-u': - url = urlparse(sys.argv[argi + 1]) - parts = url[1].split(':') - host = parts[0] - if len(parts) > 1: - port = int(parts[1]) - else: - port = 80 - uri = url[2] - if url[4]: - uri += '?%s' % url[4] - http = True - argi += 2 - -if sys.argv[argi] == '-f' or sys.argv[argi] == '-framed': - framed = True - argi += 1 - -if sys.argv[argi] == '-s' or sys.argv[argi] == '-ssl': - ssl = True - argi += 1 - -if sys.argv[argi] == '-novalidate': - validate = False - argi += 1 - -if sys.argv[argi] == 
'-ca_certs': - ca_certs = sys.argv[argi+1] - argi += 2 - -if sys.argv[argi] == '-keyfile': - keyfile = sys.argv[argi+1] - argi += 2 - -if sys.argv[argi] == '-certfile': - certfile = sys.argv[argi+1] - argi += 2 - -cmd = sys.argv[argi] -args = sys.argv[argi + 1:] - -if http: - transport = THttpClient.THttpClient(host, port, uri) -else: - if ssl: - socket = TSSLSocket.TSSLSocket(host, port, validate=validate, ca_certs=ca_certs, keyfile=keyfile, certfile=certfile) - else: - socket = TSocket.TSocket(host, port) - if framed: - transport = TTransport.TFramedTransport(socket) - else: - transport = TTransport.TBufferedTransport(socket) -protocol = TBinaryProtocol(transport) -client = rpc_service.Client(protocol) -transport.open() - -if cmd == 'register_handler': - if len(args) != 0: - print('register_handler requires 0 args') - sys.exit(1) - pp.pprint(client.register_handler()) - -elif cmd == 'deregister_handler': - if len(args) != 0: - print('deregister_handler requires 0 args') - sys.exit(1) - pp.pprint(client.deregister_handler()) - -elif cmd == 'create_atomic_multilog': - if len(args) != 3: - print('create_atomic_multilog requires 3 args') - sys.exit(1) - pp.pprint(client.create_atomic_multilog(args[0], eval(args[1]), eval(args[2]),)) - -elif cmd == 'get_atomic_multilog_info': - if len(args) != 1: - print('get_atomic_multilog_info requires 1 args') - sys.exit(1) - pp.pprint(client.get_atomic_multilog_info(args[0],)) - -elif cmd == 'remove_atomic_multilog': - if len(args) != 1: - print('remove_atomic_multilog requires 1 args') - sys.exit(1) - pp.pprint(client.remove_atomic_multilog(eval(args[0]),)) - -elif cmd == 'add_index': - if len(args) != 3: - print('add_index requires 3 args') - sys.exit(1) - pp.pprint(client.add_index(eval(args[0]), args[1], eval(args[2]),)) - -elif cmd == 'remove_index': - if len(args) != 2: - print('remove_index requires 2 args') - sys.exit(1) - pp.pprint(client.remove_index(eval(args[0]), args[1],)) - -elif cmd == 'add_filter': - if len(args) 
!= 3: - print('add_filter requires 3 args') - sys.exit(1) - pp.pprint(client.add_filter(eval(args[0]), args[1], args[2],)) - -elif cmd == 'remove_filter': - if len(args) != 2: - print('remove_filter requires 2 args') - sys.exit(1) - pp.pprint(client.remove_filter(eval(args[0]), args[1],)) - -elif cmd == 'add_aggregate': - if len(args) != 4: - print('add_aggregate requires 4 args') - sys.exit(1) - pp.pprint(client.add_aggregate(eval(args[0]), args[1], args[2], args[3],)) - -elif cmd == 'remove_aggregate': - if len(args) != 2: - print('remove_aggregate requires 2 args') - sys.exit(1) - pp.pprint(client.remove_aggregate(eval(args[0]), args[1],)) - -elif cmd == 'add_trigger': - if len(args) != 3: - print('add_trigger requires 3 args') - sys.exit(1) - pp.pprint(client.add_trigger(eval(args[0]), args[1], args[2],)) - -elif cmd == 'remove_trigger': - if len(args) != 2: - print('remove_trigger requires 2 args') - sys.exit(1) - pp.pprint(client.remove_trigger(eval(args[0]), args[1],)) - -elif cmd == 'append': - if len(args) != 2: - print('append requires 2 args') - sys.exit(1) - pp.pprint(client.append(eval(args[0]), args[1],)) - -elif cmd == 'append_json': - if len(args) != 2: - print('append_json requires 2 args') - sys.exit(1) - pp.pprint(client.append_json(eval(args[0]), args[1],)) - -elif cmd == 'append_batch': - if len(args) != 2: - print('append_batch requires 2 args') - sys.exit(1) - pp.pprint(client.append_batch(eval(args[0]), eval(args[1]),)) - -elif cmd == 'read': - if len(args) != 3: - print('read requires 3 args') - sys.exit(1) - pp.pprint(client.read(eval(args[0]), eval(args[1]), eval(args[2]),)) - -elif cmd == 'read_json': - if len(args) != 3: - print('read_json requires 3 args') - sys.exit(1) - pp.pprint(client.read_json(eval(args[0]), eval(args[1]), eval(args[2]),)) - -elif cmd == 'query_aggregate': - if len(args) != 4: - print('query_aggregate requires 4 args') - sys.exit(1) - pp.pprint(client.query_aggregate(eval(args[0]), args[1], eval(args[2]), 
eval(args[3]),)) - -elif cmd == 'adhoc_aggregate': - if len(args) != 3: - print('adhoc_aggregate requires 3 args') - sys.exit(1) - pp.pprint(client.adhoc_aggregate(eval(args[0]), args[1], args[2],)) - -elif cmd == 'adhoc_filter': - if len(args) != 2: - print('adhoc_filter requires 2 args') - sys.exit(1) - pp.pprint(client.adhoc_filter(eval(args[0]), args[1],)) - -elif cmd == 'predef_filter': - if len(args) != 4: - print('predef_filter requires 4 args') - sys.exit(1) - pp.pprint(client.predef_filter(eval(args[0]), args[1], eval(args[2]), eval(args[3]),)) - -elif cmd == 'combined_filter': - if len(args) != 5: - print('combined_filter requires 5 args') - sys.exit(1) - pp.pprint(client.combined_filter(eval(args[0]), args[1], args[2], eval(args[3]), eval(args[4]),)) - -elif cmd == 'alerts_by_time': - if len(args) != 3: - print('alerts_by_time requires 3 args') - sys.exit(1) - pp.pprint(client.alerts_by_time(eval(args[0]), eval(args[1]), eval(args[2]),)) - -elif cmd == 'alerts_by_trigger_and_time': - if len(args) != 4: - print('alerts_by_trigger_and_time requires 4 args') - sys.exit(1) - pp.pprint(client.alerts_by_trigger_and_time(eval(args[0]), args[1], eval(args[2]), eval(args[3]),)) - -elif cmd == 'get_more': - if len(args) != 2: - print('get_more requires 2 args') - sys.exit(1) - pp.pprint(client.get_more(eval(args[0]), eval(args[1]),)) - -elif cmd == 'num_records': - if len(args) != 1: - print('num_records requires 1 args') - sys.exit(1) - pp.pprint(client.num_records(eval(args[0]),)) - -else: - print('Unrecognized method %s' % cmd) - sys.exit(1) - -transport.close()