From e471ea633464b7f52f14d6dd99301705c02bb894 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Mon, 22 Jan 2024 20:43:47 +0200 Subject: [PATCH] feat: add API to set time unit in a metric This will allow reusing the same client for writing data to different tables with different time units. --- src/greptimedb.erl | 55 +++++++++++++------------------------- src/greptimedb_encoder.erl | 32 +++++++++++++++------- test/greptimedb_SUITE.erl | 21 +++++++++++++++ 3 files changed, 63 insertions(+), 45 deletions(-) diff --git a/src/greptimedb.erl b/src/greptimedb.erl index dcb561e..23db644 100644 --- a/src/greptimedb.erl +++ b/src/greptimedb.erl @@ -17,10 +17,23 @@ -export([start_client/1, stop_client/1, write_batch/2, write/3, write_stream/1, async_write/4, async_write_batch/3, is_alive/1, is_alive/2, ddl/1]). +-export_type([metric/0, point/0, timeunit/0]). + +-type table() :: atom() | binary() | list(). +-type dbname() :: atom() | binary() | list(). +-type timeunit() :: ns | us| ms | s | nanosecond | microsecond | millisecond | second. +-type metric() :: table() + | {dbname(), table()} + | #{dbname => dbname(), table := table(), timeunit => timeunit()}. +-type point() :: #{tags => map(), + fields => map(), + timestamp => integer()}. + -spec start_client(list()) -> {ok, Client :: map()} | {error, {already_started, Client :: map()}} | {error, Reason :: term()}. + start_client(Options0) -> Pool = proplists:get_value(pool, Options0), Options = lists:keydelete(protocol, 1, lists:keydelete(pool, 1, Options0)), @@ -41,30 +54,15 @@ start_client(Options0) -> %% @doc Write points to the metric table, return the result. -spec write(Client, Metric, Points) -> {ok, term()} | {error, term()} when Client :: map(), - Metric :: Table | {DbName, Table}, - DbName :: atom() | binary() | list(), - Table :: atom() | binary() | list(), - Points :: [Point], - Point :: - #{tags => map(), - fields => map(), - timestamp => integer()}. + Metric :: metric(), + Points :: [point()]. write(Client, Metric, Points) -> write_batch(Client, [{Metric, Points}]). %% @doc Write a batch of data points to the database, return the result. -spec write_batch(Client, MetricAndPoints) -> {ok, term()} | {error, term()} when Client :: map(), - MetricAndPoints :: [MetricAndPoint], - MetricAndPoint :: {Metric, Points}, - Metric :: Table | {DbName, Table}, - DbName :: atom() | binary() | list(), - Table :: atom() | binary() | list(), - Points :: [Point], - Point :: - #{tags => map(), - fields => map(), - timestamp => integer()}. + MetricAndPoints :: [{metric(), [point()]}]. write_batch(Client, MetricAndPoints) -> try Request = greptimedb_encoder:insert_requests(Client, MetricAndPoints), @@ -89,14 +87,8 @@ write_stream(Client) -> %% @doc Send an async request to write points to the metric table. The callback is evaluated when an error happens or response is received. -spec async_write(Client, Metric, Points, ResultCallback) -> ok | {error, term()} when Client :: map(), - Metric :: Table | {DbName, Table}, - DbName :: atom() | binary() | list(), - Table :: atom() | binary() | list(), - Points :: [Point], - Point :: - #{tags => map(), - fields => map(), - timestamp => integer()}, + Metric :: metric(), + Points :: [point()], ResultCallback :: {function(), list()}. async_write(Client, Metric, Points, ResultCallback) -> async_write_batch(Client, [{Metric, Points}], ResultCallback). @@ -104,16 +96,7 @@ async_write(Client, Metric, Points, ResultCallback) -> %% @doc Send a batch of async request. The callback is evaluated when an error happens or response is received. -spec async_write_batch(Client, MetricAndPoints, ResultCallback) -> ok | {error, term()} when Client :: map(), - MetricAndPoints :: [MetricAndPoint], - MetricAndPoint :: {Metric, Points}, - Metric :: Table | {DbName, Table}, - DbName :: atom() | binary() | list(), - Table :: atom() | binary() | list(), - Points :: [Point], - Point :: - #{tags => map(), - fields => map(), - timestamp => integer()}, + MetricAndPoints :: [{metric(), [point()]}], ResultCallback :: {function(), list()}. async_write_batch(Client, MetricAndPoints, ResultCallback) -> Request = greptimedb_encoder:insert_requests(Client, MetricAndPoints), diff --git a/src/greptimedb_encoder.erl b/src/greptimedb_encoder.erl index 13fa4c2..fa8dd7e 100644 --- a/src/greptimedb_encoder.erl +++ b/src/greptimedb_encoder.erl @@ -32,8 +32,8 @@ insert_requests(#{cli_opts := Options} = _Client, [], DbName, Inserts) -> #{dbname => DbName, authorization => #{auth_scheme => Scheme}} end, #{header => Header, request => {inserts, #{inserts => Inserts}}}; -insert_requests(#{cli_opts := Options} = Client, [{Table, Points} | T], PrevDbName, Inserts) -> - {DbName, Insert} = insert_request(Options, Table, Points), +insert_requests(#{cli_opts := Options} = Client, [{Metric, Points} | T], PrevDbName, Inserts) -> + {DbName, Insert} = insert_request(Options, metric(Options, Metric), Points), case PrevDbName of unknown -> insert_requests(Client, T, DbName, [Insert | Inserts]); @@ -41,21 +41,35 @@ insert_requests(#{cli_opts := Options} = Client, [{Table, Points} | T], PrevDbNa insert_requests(Client, T, Name, [Insert | Inserts]) end. -insert_request(Options, {DbName, Table}, Points) -> - Timeunit = proplists:get_value(timeunit, Options, ms), +insert_request(_Options, #{dbname := DbName, table := Table, timeunit := Timeunit}, Points) -> RowCount = length(Points), - Columns = - lists:map(fun(Column) -> pad_null_mask(Column, RowCount) end, collect_columns(Timeunit, Points)), + Columns = lists:map(fun(Column) -> pad_null_mask(Column, RowCount) end, + collect_columns(Timeunit, Points)), {DbName, #{table_name => Table, columns => Columns, - row_count => RowCount}}; -insert_request(Options, Table, Points) -> - insert_request(Options, {?DEFAULT_DBNAME, Table}, Points). + row_count => RowCount}}. %%%=================================================================== %%% Internal functions %%%=================================================================== + +metric(Options, Metric) -> + metric_with_default(default_metric(Options), Metric). + +default_metric(Options) -> + #{dbname => ?DEFAULT_DBNAME, + timeunit => proplists:get_value(timeunit, Options, ms)}. + +%% table is required +metric_with_default(Default, #{table := _} = Metric) -> + maps:merge(Default, Metric); +%% backward compatibility +metric_with_default(Default, {DbName, Table}) -> + Default#{dbname => DbName, table => Table}; +metric_with_default(Default, Table) when is_atom(Table); is_list(Table); is_binary(Table) -> + Default#{table => Table}. + collect_columns(Timeunit, Points) -> collect_columns(Timeunit, Points, []). diff --git a/test/greptimedb_SUITE.erl b/test/greptimedb_SUITE.erl index 365fd9b..5e8b597 100644 --- a/test/greptimedb_SUITE.erl +++ b/test/greptimedb_SUITE.erl @@ -9,6 +9,7 @@ all() -> [t_write, t_write_stream, t_insert_requests, + t_insert_requests_with_timeunit, t_write_failure, t_write_batch, t_bench_perf, @@ -107,6 +108,26 @@ t_insert_requests(_) -> end, ok. +t_insert_requests_with_timeunit(_) -> + TsNano = 1705946037724448346, + Points = [#{fields => #{<<"temperature">> => 1}, + tags => + #{<<"from">> => <<"mqttx_4b963a8e">>, + <<"host">> => <<"serverA">>, + <<"qos">> => "0", + <<"device">> => <<"NO.1">>, + <<"region">> => <<"hangzhou">>}, + timestamp => TsNano}], + AuthInfo = {basic, #{username => "test", password => "test"}}, + Client = #{cli_opts => [{auth, AuthInfo}, {timeunit, second}]}, + Metric = #{table => "Test", timeunit => nanosecond}, + Request = greptimedb_encoder:insert_requests(Client, [{Metric, Points}]), + #{header := #{dbname := _DbName, authorization := _Auth}, + request := {inserts, #{inserts := [#{columns := Columns}]}}} = Request, + {value, TimestampColumn} = + lists:search(fun(C) -> maps:get(column_name, C) == <<"greptime_timestamp">> end, Columns), + ?assertEqual([TsNano], maps:get(timestamp_nanosecond_values, maps:get(values, TimestampColumn))). + t_write_failure(_) -> Metric = <<"temperatures">>, Points =