From e471ea633464b7f52f14d6dd99301705c02bb894 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Mon, 22 Jan 2024 20:43:47 +0200 Subject: [PATCH 1/2] feat: add API to set time unit in a metric This will allow reusing the same client for writing data to different tables with different time units. --- src/greptimedb.erl | 55 +++++++++++++------------------------- src/greptimedb_encoder.erl | 32 +++++++++++++++------- test/greptimedb_SUITE.erl | 21 +++++++++++++++ 3 files changed, 63 insertions(+), 45 deletions(-) diff --git a/src/greptimedb.erl b/src/greptimedb.erl index dcb561e..23db644 100644 --- a/src/greptimedb.erl +++ b/src/greptimedb.erl @@ -17,10 +17,23 @@ -export([start_client/1, stop_client/1, write_batch/2, write/3, write_stream/1, async_write/4, async_write_batch/3, is_alive/1, is_alive/2, ddl/1]). +-export_type([metric/0, point/0, timeunit/0]). + +-type table() :: atom() | binary() | list(). +-type dbname() :: atom() | binary() | list(). +-type timeunit() :: ns | us| ms | s | nanosecond | microsecond | millisecond | second. +-type metric() :: table() + | {dbname(), table()} + | #{dbname => dbname(), table := table(), timeunit => timeunit()}. +-type point() :: #{tags => map(), + fields => map(), + timestamp => integer()}. + -spec start_client(list()) -> {ok, Client :: map()} | {error, {already_started, Client :: map()}} | {error, Reason :: term()}. + start_client(Options0) -> Pool = proplists:get_value(pool, Options0), Options = lists:keydelete(protocol, 1, lists:keydelete(pool, 1, Options0)), @@ -41,30 +54,15 @@ start_client(Options0) -> %% @doc Write points to the metric table, return the result. -spec write(Client, Metric, Points) -> {ok, term()} | {error, term()} when Client :: map(), - Metric :: Table | {DbName, Table}, - DbName :: atom() | binary() | list(), - Table :: atom() | binary() | list(), - Points :: [Point], - Point :: - #{tags => map(), - fields => map(), - timestamp => integer()}. + Metric :: metric(), + Points :: [point()]. write(Client, Metric, Points) -> write_batch(Client, [{Metric, Points}]). %% @doc Write a batch of data points to the database, return the result. -spec write_batch(Client, MetricAndPoints) -> {ok, term()} | {error, term()} when Client :: map(), - MetricAndPoints :: [MetricAndPoint], - MetricAndPoint :: {Metric, Points}, - Metric :: Table | {DbName, Table}, - DbName :: atom() | binary() | list(), - Table :: atom() | binary() | list(), - Points :: [Point], - Point :: - #{tags => map(), - fields => map(), - timestamp => integer()}. + MetricAndPoints :: [{metric(), [point()]}]. write_batch(Client, MetricAndPoints) -> try Request = greptimedb_encoder:insert_requests(Client, MetricAndPoints), @@ -89,14 +87,8 @@ write_stream(Client) -> %% @doc Send an async request to write points to the metric table. The callback is evaluated when an error happens or response is received. -spec async_write(Client, Metric, Points, ResultCallback) -> ok | {error, term()} when Client :: map(), - Metric :: Table | {DbName, Table}, - DbName :: atom() | binary() | list(), - Table :: atom() | binary() | list(), - Points :: [Point], - Point :: - #{tags => map(), - fields => map(), - timestamp => integer()}, + Metric :: metric(), + Points :: [point()], ResultCallback :: {function(), list()}. async_write(Client, Metric, Points, ResultCallback) -> async_write_batch(Client, [{Metric, Points}], ResultCallback). @@ -104,16 +96,7 @@ async_write(Client, Metric, Points, ResultCallback) -> %% @doc Send a batch of async request. The callback is evaluated when an error happens or response is received. -spec async_write_batch(Client, MetricAndPoints, ResultCallback) -> ok | {error, term()} when Client :: map(), - MetricAndPoints :: [MetricAndPoint], - MetricAndPoint :: {Metric, Points}, - Metric :: Table | {DbName, Table}, - DbName :: atom() | binary() | list(), - Table :: atom() | binary() | list(), - Points :: [Point], - Point :: - #{tags => map(), - fields => map(), - timestamp => integer()}, + MetricAndPoints :: [{metric(), [point()]}], ResultCallback :: {function(), list()}. async_write_batch(Client, MetricAndPoints, ResultCallback) -> Request = greptimedb_encoder:insert_requests(Client, MetricAndPoints), diff --git a/src/greptimedb_encoder.erl b/src/greptimedb_encoder.erl index 13fa4c2..fa8dd7e 100644 --- a/src/greptimedb_encoder.erl +++ b/src/greptimedb_encoder.erl @@ -32,8 +32,8 @@ insert_requests(#{cli_opts := Options} = _Client, [], DbName, Inserts) -> #{dbname => DbName, authorization => #{auth_scheme => Scheme}} end, #{header => Header, request => {inserts, #{inserts => Inserts}}}; -insert_requests(#{cli_opts := Options} = Client, [{Table, Points} | T], PrevDbName, Inserts) -> - {DbName, Insert} = insert_request(Options, Table, Points), +insert_requests(#{cli_opts := Options} = Client, [{Metric, Points} | T], PrevDbName, Inserts) -> + {DbName, Insert} = insert_request(Options, metric(Options, Metric), Points), case PrevDbName of unknown -> insert_requests(Client, T, DbName, [Insert | Inserts]); @@ -41,21 +41,35 @@ insert_requests(#{cli_opts := Options} = Client, [{Table, Points} | T], PrevDbNa insert_requests(Client, T, Name, [Insert | Inserts]) end. -insert_request(Options, {DbName, Table}, Points) -> - Timeunit = proplists:get_value(timeunit, Options, ms), +insert_request(_Options, #{dbname := DbName, table := Table, timeunit := Timeunit}, Points) -> RowCount = length(Points), - Columns = - lists:map(fun(Column) -> pad_null_mask(Column, RowCount) end, collect_columns(Timeunit, Points)), + Columns = lists:map(fun(Column) -> pad_null_mask(Column, RowCount) end, + collect_columns(Timeunit, Points)), {DbName, #{table_name => Table, columns => Columns, - row_count => RowCount}}; -insert_request(Options, Table, Points) -> - insert_request(Options, {?DEFAULT_DBNAME, Table}, Points). + row_count => RowCount}}. %%%=================================================================== %%% Internal functions %%%=================================================================== + +metric(Options, Metric) -> + metric_with_default(default_metric(Options), Metric). + +default_metric(Options) -> + #{dbname => ?DEFAULT_DBNAME, + timeunit => proplists:get_value(timeunit, Options, ms)}. + +%% table is required +metric_with_default(Default, #{table := _} = Metric) -> + maps:merge(Default, Metric); +%% backward compatibility +metric_with_default(Default, {DbName, Table}) -> + Default#{dbname => DbName, table => Table}; +metric_with_default(Default, Table) when is_atom(Table); is_list(Table); is_binary(Table) -> + Default#{table => Table}. + collect_columns(Timeunit, Points) -> collect_columns(Timeunit, Points, []). diff --git a/test/greptimedb_SUITE.erl b/test/greptimedb_SUITE.erl index 365fd9b..5e8b597 100644 --- a/test/greptimedb_SUITE.erl +++ b/test/greptimedb_SUITE.erl @@ -9,6 +9,7 @@ all() -> [t_write, t_write_stream, t_insert_requests, + t_insert_requests_with_timeunit, t_write_failure, t_write_batch, t_bench_perf, @@ -107,6 +108,26 @@ t_insert_requests(_) -> end, ok. +t_insert_requests_with_timeunit(_) -> + TsNano = 1705946037724448346, + Points = [#{fields => #{<<"temperature">> => 1}, + tags => + #{<<"from">> => <<"mqttx_4b963a8e">>, + <<"host">> => <<"serverA">>, + <<"qos">> => "0", + <<"device">> => <<"NO.1">>, + <<"region">> => <<"hangzhou">>}, + timestamp => TsNano}], + AuthInfo = {basic, #{username => "test", password => "test"}}, + Client = #{cli_opts => [{auth, AuthInfo}, {timeunit, second}]}, + Metric = #{table => "Test", timeunit => nanosecond}, + Request = greptimedb_encoder:insert_requests(Client, [{Metric, Points}]), + #{header := #{dbname := _DbName, authorization := _Auth}, + request := {inserts, #{inserts := [#{columns := Columns}]}}} = Request, + {value, TimestampColumn} = + lists:search(fun(C) -> maps:get(column_name, C) == <<"greptime_timestamp">> end, Columns), + ?assertEqual([TsNano], maps:get(timestamp_nanosecond_values, maps:get(values, TimestampColumn))). + t_write_failure(_) -> Metric = <<"temperatures">>, Points = From a1e4f94aae78d3ac6599f6048ae7e416eee66704 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Thu, 25 Jan 2024 14:31:34 +0200 Subject: [PATCH 2/2] fix: upgrade deps in rebar.lock to match rebar.config --- rebar.lock | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/rebar.lock b/rebar.lock index e135f8d..36c07b5 100644 --- a/rebar.lock +++ b/rebar.lock @@ -1,27 +1,27 @@ {"1.2.0", [{<<"acceptor_pool">>,{pkg,<<"acceptor_pool">>,<<"1.0.0">>},1}, - {<<"chatterbox">>,{pkg,<<"ts_chatterbox">>,<<"0.13.0">>},1}, + {<<"chatterbox">>,{pkg,<<"ts_chatterbox">>,<<"0.15.1">>},1}, {<<"ctx">>,{pkg,<<"ctx">>,<<"0.6.0">>},1}, {<<"ecpool">>, {git,"https://github.com/emqx/ecpool", - {ref,"d01b8cb99af90bc177eeeabe29075133db878fb3"}}, + {ref,"a9719f2d4ae9c778a8ca59f1ec9f644bcf7874a7"}}, 0}, {<<"gproc">>,{pkg,<<"gproc">>,<<"0.8.0">>},1}, - {<<"grpcbox">>,{pkg,<<"grpcbox">>,<<"0.16.0">>},0}, - {<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.2.3">>},2}]}. + {<<"grpcbox">>,{pkg,<<"grpcbox">>,<<"0.17.1">>},0}, + {<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.3.0">>},2}]}. [ {pkg_hash,[ {<<"acceptor_pool">>, <<"43C20D2ACAE35F0C2BCD64F9D2BDE267E459F0F3FD23DAB26485BF518C281B21">>}, - {<<"chatterbox">>, <<"6F059D97BCAA758B8EA6FFFE2B3B81362BD06B639D3EA2BB088335511D691EBF">>}, + {<<"chatterbox">>, <<"5CAC4D15DD7AD61FC3C4415CE4826FC563D4643DEE897A558EC4EA0B1C835C9C">>}, {<<"ctx">>, <<"8FF88B70E6400C4DF90142E7F130625B82086077A45364A78D208ED3ED53C7FE">>}, {<<"gproc">>, <<"CEA02C578589C61E5341FCE149EA36CCEF236CC2ECAC8691FBA408E7EA77EC2F">>}, - {<<"grpcbox">>, <<"B83F37C62D6EECA347B77F9B1EC7E9F62231690CDFEB3A31BE07CD4002BA9C82">>}, - {<<"hpack">>, <<"17670F83FF984AE6CD74B1C456EDDE906D27FF013740EE4D9EFAA4F1BF999633">>}]}, + {<<"grpcbox">>, <<"6E040AB3EF16FE699FFB513B0EF8E2E896DA7B18931A1EF817143037C454BCCE">>}, + {<<"hpack">>, <<"2461899CC4AB6A0EF8E970C1661C5FC6A52D3C25580BC6DD204F84CE94669926">>}]}, {pkg_hash_ext,[ {<<"acceptor_pool">>, <<"0CBCD83FDC8B9AD2EEE2067EF8B91A14858A5883CB7CD800E6FCD5803E158788">>}, - {<<"chatterbox">>, <<"B93D19104D86AF0B3F2566C4CBA2A57D2E06D103728246BA1AC6C3C0FF010AA7">>}, + {<<"chatterbox">>, <<"4F75B91451338BC0DA5F52F3480FA6EF6E3A2AEECFC33686D6B3D0A0948F31AA">>}, {<<"ctx">>, <<"A14ED2D1B67723DBEBBE423B28D7615EB0BDCBA6FF28F2D1F1B0A7E1D4AA5FC2">>}, {<<"gproc">>, <<"580ADAFA56463B75263EF5A5DF4C86AF321F68694E7786CB057FD805D1E2A7DE">>}, - {<<"grpcbox">>, <<"294DF743AE20A7E030889F00644001370A4F7CE0121F3BBDAF13CF3169C62913">>}, - {<<"hpack">>, <<"06F580167C4B8B8A6429040DF36CC93BBA6D571FAEAEC1B28816523379CBB23A">>}]} + {<<"grpcbox">>, <<"4A3B5D7111DAABC569DC9CBD9B202A3237D81C80BF97212FBC676832CB0CEB17">>}, + {<<"hpack">>, <<"D6137D7079169D8C485C6962DFE261AF5B9EF60FBC557344511C1E65E3D95FB0">>}]} ].