diff --git a/eprof.result b/eprof.result new file mode 100644 index 0000000..ce65137 --- /dev/null +++ b/eprof.result @@ -0,0 +1,106 @@ + +****** Process <0.909.0> -- 100.00 % of profiled time *** +FUNCTION CALLS % TIME [uS / CALLS] +-------- ----- ------- ---- [----------] +greptimedb_SUITE:bench_write/4 1 0.00 0 [ 0.00] +greptimedb_SUITE:'-t_bench_perf/1-fun-0-'/1 1 0.00 0 [ 0.00] +erlang:unique_integer/0 1 0.00 0 [ 0.00] +erlang:system_time/0 1 0.00 0 [ 0.00] +rand:seed/1 1 0.00 0 [ 0.00] +rand:seed_s/1 1 0.00 0 [ 0.00] +rand:seed_s/2 1 0.00 0 [ 0.00] +rand:exsss_seed/1 1 0.00 0 [ 0.00] +rand:seed58/1 3 0.00 0 [ 0.00] +erlang:phash2/1 1 0.00 1 [ 1.00] +erlang:apply/2 1 0.00 1 [ 1.00] +rand:default_seed/0 1 0.00 1 [ 1.00] +rand:mk_alg/1 1 0.00 1 [ 1.00] +rand:splitmix64_next/1 3 0.00 3 [ 1.00] +sets:mk_seg/1 10000 0.00 419 [ 0.04] +sets:fold_set/3 10000 0.00 550 [ 0.06] +sets:fold/3 10000 0.00 556 [ 0.06] +lists:seq/2 10000 0.00 577 [ 0.06] +lists:flatten/1 10000 0.00 579 [ 0.06] +lists:reverse/1 10000 0.00 1025 [ 0.10] +sets:new/0 10000 0.00 1064 [ 0.11] +sets:to_list/1 10000 0.00 1083 [ 0.11] +sets:fold_segs/4 20000 0.00 1230 [ 0.06] +greptimedb_encoder:collect_columns/1 10000 0.00 1370 [ 0.14] +greptimedb_encoder:insert_requests/2 10000 0.00 1399 [ 0.14] +greptimedb_SUITE:bench_points/2 10000 0.00 1550 [ 0.15] +sets:from_list/1 10000 0.00 1596 [ 0.16] +proplists:get_value/3 40000 0.00 1794 [ 0.04] +lists:foldl/3 20000 0.00 2019 [ 0.10] +greptimedb_encoder:insert_request/2 20000 0.01 6275 [ 0.31] +sets:'-to_list/1-fun-0-'/2 210000 0.01 7383 [ 0.04] +greptimedb_encoder:'-merge_columns/1-fun-1-'/1 210000 0.01 7452 [ 0.04] +greptimedb_encoder:insert_requests/4 20000 0.01 7764 [ 0.39] +greptimedb_encoder:'-merge_columns/1-fun-3-'/2 210000 0.01 7792 [ 0.04] +maps:remove/2 210000 0.01 9067 [ 0.04] +greptimedb_encoder:merge_columns/1 10000 0.01 9340 [ 0.93] +greptimedb_SUITE:bench_write/5 10001 0.01 10066 [ 1.01] +lists:reverse/2 10000 0.01 10671 [ 1.07] +lists:seq_loop/3 260000 0.01 10732 [ 0.04] +sets:fold_seg/4 170000 0.01 11829 [ 0.07] +sets:maybe_expand/1 210000 0.02 14624 [ 0.07] +maps:update_with/3 210000 0.02 19488 [ 0.09] +sets:fold_bucket/3 370000 0.02 20983 [ 0.06] +greptimedb_encoder:'-insert_request/2-fun-0-'/2 210000 0.03 22504 [ 0.11] +greptimedb_encoder:'-merge_columns/1-fun-4-'/1 210000 0.03 26390 [ 0.13] +greptimedb_encoder:'-merge_columns/1-fun-5-'/1 210000 0.03 28114 [ 0.13] +sets:update_bucket/3 210000 0.04 30147 [ 0.14] +greptimedb_encoder:flatten/1 210000 0.04 31857 [ 0.15] +rand:exsss_uniform/2 1000000 0.04 35355 [ 0.04] +rand:seed_get/0 1000000 0.04 35687 [ 0.04] +base64:encode/1 1000000 0.04 35714 [ 0.04] +erlang:setelement/3 840000 0.04 35747 [ 0.04] +rand:uniform_s/2 1000000 0.04 36359 [ 0.04] +erlang:put/2 1000001 0.05 40028 [ 0.04] +greptimedb_encoder:values_size/1 210000 0.06 50385 [ 0.24] +rand:seed_put/1 1000001 0.08 69585 [ 0.07] +greptimedb_SUITE:rand_string/1 1000000 0.09 77508 [ 0.08] +crypto:strong_rand_bytes/1 1000000 0.09 77718 [ 0.08] +greptimedb_encoder:pad_null_mask/2 210000 0.10 81287 [ 0.39] +maps:keys/1 1000000 0.13 106776 [ 0.11] +lists:map/2 1050000 0.13 113661 [ 0.11] +greptimedb_encoder:merge_columns/2 1000000 0.15 125319 [ 0.13] +greptimedb_encoder:ts_column/1 1000000 0.16 136602 [ 0.14] +greptimedb_encoder:'-merge_columns/1-fun-0-'/1 1000000 0.17 140289 [ 0.14] +maps:iterator/1 2210000 0.17 141323 [ 0.06] +rand:uniform/1 1000000 0.17 142184 [ 0.14] +greptimedb_encoder:collect_columns/2 1010000 0.17 143407 [ 0.14] +maps:merge/2 1000000 0.27 226383 [ 0.23] +base64:encode_binary/2 3000000 0.30 250704 [ 0.08] +greptimedb_SUITE:'-bench_points/2-fun-0-'/2 1000000 0.32 266201 [ 0.27] +erts_internal:map_next/3 2210000 0.34 287450 [ 0.13] +maps:put/3 1000000 0.41 341339 [ 0.34] +greptimedb_encoder:'-convert_columns/1-fun-1-'/2 10000000 0.45 379176 [ 0.04] +maps:from_list/1 2210000 0.50 421257 [ 0.19] +greptimedb_encoder:'-convert_columns/1-fun-0-'/2 10000000 0.51 430521 [ 0.04] +maps:map/2 2210000 0.53 443101 [ 0.20] +greptimedb_encoder:convert_columns/1 1000000 0.63 534406 [ 0.53] +crypto:strong_rand_bytes_nif/1 1000000 0.70 590735 [ 0.59] +sets:get_bucket_s/2 21000000 0.80 676987 [ 0.03] +maps:get/3 21000000 0.84 709155 [ 0.03] +sets:'-from_list/1-fun-0-'/2 21000000 0.89 753292 [ 0.04] +sets:get_bucket/2 21000000 0.90 757840 [ 0.04] +lists:member/2 21000000 0.96 811340 [ 0.04] +maps:next/1 22420000 0.97 814466 [ 0.04] +erlang:phash/2 21000000 1.05 881969 [ 0.04] +greptimedb_encoder:field_column/2 10000000 1.45 1223217 [ 0.12] +greptimedb_encoder:tag_column/2 10000000 1.65 1387376 [ 0.14] +sets:get_slot/2 21000000 1.74 1462640 [ 0.07] +greptimedb_encoder:'-merge_columns/2-fun-0-'/2 21000000 1.78 1498379 [ 0.07] +lists:foldl_1/3 22000000 1.84 1551736 [ 0.07] +lists:do_flatten/2 23010000 2.01 1690727 [ 0.07] +greptimedb_encoder:flatten/2 20790000 2.64 2222132 [ 0.11] +lists:map_1/2 23630000 2.88 2423506 [ 0.10] +greptimedb_encoder:merge_values/2 21000000 3.08 2597834 [ 0.12] +sets:add_element/2 21000000 3.48 2933195 [ 0.14] +maps:map_1/2 22420000 3.49 2942453 [ 0.13] +erts_internal:counters_add/3 197850000 9.09 7660091 [ 0.04] +greptimedb_encoder:merge_column/3 21000000 15.53 13086811 [ 0.62] +counters:add/3 197850000 17.13 14430363 [ 0.07] +persistent_term:get/1 197850000 18.54 15621369 [ 0.08] +------------------------------------------------ ---------- ------- -------- [----------] +Total: 1035360021 100.00% 84242381 [ 0.08] diff --git a/src/greptimedb_encoder.erl b/src/greptimedb_encoder.erl index b73b66a..9bc0969 100644 --- a/src/greptimedb_encoder.erl +++ b/src/greptimedb_encoder.erl @@ -59,7 +59,7 @@ collect_columns(Points) -> collect_columns(Points, []). collect_columns([], Columns) -> - maps:values(merge_columns(Columns)); + merge_columns(Columns); collect_columns([Point | T], Columns) -> collect_columns(T, [convert_columns(Point) | Columns]). @@ -95,9 +95,50 @@ values_size(#{ts_second_values := Values}) -> length(Values); values_size(#{ts_millisecond_values := Values}) -> length(Values); +values_size(#{ts_microsecond_values := Values}) -> + length(Values); values_size(#{ts_nanosecond_values := Values}) -> length(Values). +merge_values(V1, V2) when map_size(V1) == 0 -> + V2; +merge_values(#{i8_values := V1} = L, #{i8_values := V2}) -> + L#{i8_values := [V2 | V1]}; +merge_values(#{i16_values := V1} = L, #{i16_values := V2}) -> + L#{i16_values := [V2 | V1]}; +merge_values(#{i32_values := V1} = L, #{i32_values := V2}) -> + L#{i32_values := [V2 | V1]}; +merge_values(#{i64_values := V1} = L, #{i64_values := V2}) -> + L#{i64_values := [V2 | V1]}; +merge_values(#{u8_values := V1} = L, #{u8_values := V2}) -> + L#{u8_values := [V2 | V1]}; +merge_values(#{u16_values := V1} = L, #{u16_values := V2}) -> + L#{u16_values := [V2 | V1]}; +merge_values(#{u32_values := V1} = L, #{u32_values := V2}) -> + L#{u32_values := [V2 | V1]}; +merge_values(#{u64_values := V1} = L, #{u64_values := V2}) -> + L#{u64_values := [V2 | V1]}; +merge_values(#{f32_values := V1} = L, #{f32_values := V2}) -> + L#{f32_values := [V2 | V1]}; +merge_values(#{f64_values := V1} = L, #{f64_values := V2}) -> + L#{f64_values := [V2 | V1]}; +merge_values(#{bool_values := V1} = L, #{bool_values := V2}) -> + L#{bool_values := [V2 | V1]}; +merge_values(#{binary_values := V1} = L, #{binary_values := V2}) -> + L#{binary_values := [V2 | V1]}; +merge_values(#{string_values := V1} = L, #{string_values := V2}) -> + L#{string_values := [V2 | V1]}; +merge_values(#{date_values := V1} = L, #{date_values := V2}) -> + L#{date_values := [V2 | V1]}; +merge_values(#{ts_second_values := V1} = L, #{ts_second_values := V2}) -> + L#{ts_second_values := [V2 | V1]}; +merge_values(#{ts_millisecond_values := V1} = L, #{ts_millisecond_values := V2}) -> + L#{ts_millisecond_values := [V2 | V1]}; +merge_values(#{ts_microsecond_values := V1} = L, #{ts_microsecond_values := V2}) -> + L#{ts_microsecond_values := [V2 | V1]}; +merge_values(#{ts_nanosecond_values := V1} = L, #{ts_nanosecond_values := V2}) -> + L#{ts_nanosecond_values := [V2 | V1]}. + pad_null_mask(#{values := Values, null_mask := NullMask} = Column, RowCount) -> ValuesSize = values_size(Values), NewColumn = @@ -105,7 +146,7 @@ pad_null_mask(#{values := Values, null_mask := NullMask} = Column, RowCount) -> maps:remove(null_mask, Column); true -> Pad = 8 - (bit_size(NullMask) - floor(bit_size(NullMask) / 8) * 8), - Column#{null_mask => <<0:Pad/integer, NullMask/bits>>} + Column#{null_mask := <<0:Pad/integer, NullMask/bits>>} end, NewColumn. @@ -118,46 +159,52 @@ convert_columns(#{fields := Fields, maps:put( maps:get(column_name, TsColumn), TsColumn, maps:merge(FieldColumns, TagColumns)). -merge_column(#{null_mask := NullMask} = Column, NewColumn) -> - Values = maps:get(values, Column, #{}), - NewValues = maps:get(values, NewColumn), - MergedValues = - maps:merge_with(fun(_K, V1, V2) -> lists:foldr(fun(X, XS) -> [X | XS] end, V2, V1) end, - Values, - NewValues), - NewColumn1 = maps:merge(Column, NewColumn), - NewColumn1#{values => MergedValues, null_mask => <>}. +merge_column(#{null_mask := NullMask} = Column, Name, NextColumns) -> + case NextColumns of + #{Name := NewColumn} -> + Values = maps:get(values, Column, #{}), + NewValues = maps:get(values, NewColumn), + MergedValues = merge_values(Values, NewValues), + case map_size(Column) of + 1 -> + NewColumn#{values := MergedValues, null_mask => <>}; + _ -> + Column#{values := MergedValues, null_mask := <>} + end; + _ -> + Column#{null_mask := <>} + end. merge_columns(NextColumns, Columns) -> - maps:fold(fun(Name, #{null_mask := NullMask} = Column, AccColumns) -> - MergedColumn = - case maps:find(Name, NextColumns) of - {ok, NewColumn} -> - merge_column(Column, NewColumn); - _ -> - Column#{null_mask => <>} - end, - AccColumns#{Name => MergedColumn} - end, - Columns, + lists:map(fun({Name, Column}) -> {Name, merge_column(Column, Name, NextColumns)} end, Columns). -empty_column() -> - #{null_mask => <<>>}. +flatten([H]) -> + [H]; +flatten([[H] | T]) -> + flatten(T, [H]). + +flatten([], Acc) -> + Acc; +flatten([H], Acc) -> + [H | Acc]; +flatten([[H] | T], Acc) -> + flatten(T, [H | Acc]). merge_columns(Columns) -> Names = sets:to_list( - sets:union( - lists:map(fun(C) -> - sets:from_list( - maps:keys(C)) - end, - Columns))), - EmptyColumns = - maps:from_list( - lists:map(fun(Name) -> {Name, empty_column()} end, Names)), - lists:foldr(fun merge_columns/2, EmptyColumns, Columns). + sets:from_list( + lists:flatten( + lists:map(fun(C) -> maps:keys(C) end, Columns)))), + EmptyColumns = lists:map(fun(Name) -> {Name, #{null_mask => <<>>}} end, Names), + lists:map(fun({_Name, Column}) -> + maps:update_with(values, + fun(Values) -> maps:map(fun(_K, VS) -> flatten(VS) end, Values) + end, + Column) + end, + lists:foldl(fun merge_columns/2, EmptyColumns, lists:reverse(Columns))). ts_column(Ts) when is_map(Ts) -> maps:merge(#{column_name => ?TS_COLUMN, semantic_type => 'TIMESTAMP'}, Ts); diff --git a/test/greptimedb_SUITE.erl b/test/greptimedb_SUITE.erl index 5091e43..2b04f22 100644 --- a/test/greptimedb_SUITE.erl +++ b/test/greptimedb_SUITE.erl @@ -6,7 +6,10 @@ -include_lib("eunit/include/eunit.hrl"). all() -> - [t_write, t_write_stream, t_collect_columns, t_write_batch]. + [t_write, t_write_stream, t_collect_columns, t_write_batch, t_bench_perf]. + +%%[t_bench_perf]. +%%[t_collect_columns, t_bench_perf]. init_per_suite(Config) -> application:ensure_all_started(greptimedb), @@ -17,7 +20,7 @@ end_per_suite(_Config) -> points(N) -> lists:map(fun(Num) -> - #{fields => #{<<"temperature">> => 2}, + #{fields => #{<<"temperature">> => Num}, tags => #{<<"from">> => <<"mqttx_4b963a8e">>, <<"host">> => <<"serverB">>, @@ -45,7 +48,16 @@ t_collect_columns(_) -> <<"qos">> => "1", <<"region">> => <<"ningbo">>, <<"to">> => <<"kafka">>}, - timestamp => 1619775143098}], + timestamp => 1619775143098}, + #{fields => #{<<"temperature">> => 3}, + tags => + #{<<"from">> => <<"mqttx_4b963a8e">>, + <<"host">> => <<"serverB">>, + <<"qos">> => "2", + <<"region">> => <<"xiamen">>, + <<"to">> => <<"kafka">>}, + timestamp => 1619775144098}], + Metric = "Test", AuthInfo = {basic, #{username => "test", password => "test"}}, Client = #{cli_opts => [{auth, AuthInfo}]}, @@ -59,27 +71,28 @@ t_collect_columns(_) -> {value, TemperatureColumn} = lists:search(fun(C) -> maps:get(column_name, C) == <<"temperature">> end, Columns), - ?assertEqual([1, 2], maps:get(f64_values, maps:get(values, TemperatureColumn))), + ?assertEqual([1, 2, 3], maps:get(f64_values, maps:get(values, TemperatureColumn))), {value, QosColumn} = lists:search(fun(C) -> maps:get(column_name, C) == <<"qos">> end, Columns), - ?assertEqual(["0", "1"], maps:get(string_values, maps:get(values, QosColumn))), + ?assertEqual(["0", "1", "2"], maps:get(string_values, maps:get(values, QosColumn))), {value, ToColumn} = lists:search(fun(C) -> maps:get(column_name, C) == <<"to">> end, Columns), - ?assertEqual([<<"kafka">>], maps:get(string_values, maps:get(values, ToColumn))), - ?assertEqual(<<0:7/integer, 1:1/integer>>, maps:get(null_mask, ToColumn)), + ?assertEqual([<<"kafka">>, <<"kafka">>], + maps:get(string_values, maps:get(values, ToColumn))), + ?assertEqual(<<0:6/integer, 1:1/integer, 1:1/integer>>, maps:get(null_mask, ToColumn)), {value, DeviceColumn} = lists:search(fun(C) -> maps:get(column_name, C) == <<"device">> end, Columns), ?assertEqual([<<"NO.1">>], maps:get(string_values, maps:get(values, DeviceColumn))), - ?assertEqual(<<0:6/integer, 1:1/integer, 0:1/integer>>, + ?assertEqual(<<0:5/integer, 1:1/integer, 0:1/integer, 0:1/integer>>, maps:get(null_mask, DeviceColumn)), {value, TimestampColumn} = lists:search(fun(C) -> maps:get(column_name, C) == <<"greptime_timestamp">> end, Columns), - ?assertEqual([1619775142098, 1619775143098], + ?assertEqual([1619775142098, 1619775143098, 1619775144098], maps:get(ts_millisecond_values, maps:get(values, TimestampColumn))); _ -> ?assert(false) @@ -106,7 +119,7 @@ t_write(_) -> timestamp => 1619775143098}], Options = [{endpoints, [{http, "localhost", 4001}]}, - {pool, greptimedb_client_pool_1}, + {pool, greptimedb_client_pool}, {pool_size, 5}, {pool_type, random}, {auth, {basic, #{username => <<"greptime_user">>, password => <<"greptime_pwd">>}}}], @@ -115,12 +128,13 @@ t_write(_) -> true = greptimedb:is_alive(Client), {ok, #{response := {affected_rows, #{value := 2}}}} = greptimedb:write(Client, Metric, Points), + greptimedb:stop_client(Client), ok. t_write_stream(_) -> Options = [{endpoints, [{http, "localhost", 4001}]}, - {pool, greptimedb_client_pool_3}, + {pool, greptimedb_client_pool}, {pool_size, 8}, {pool_type, random}, {auth, {basic, #{username => <<"greptime_user">>, password => <<"greptime_pwd">>}}}], @@ -137,12 +151,13 @@ t_write_stream(_) -> lists:seq(1, 10)), {ok, #{response := {affected_rows, #{value := 55}}}} = greptimedb_stream:finish(Stream), + greptimedb:stop_client(Client), ok. t_write_batch(_) -> Options = [{endpoints, [{http, "localhost", 4001}]}, - {pool, greptimedb_client_pool_4}, + {pool, greptimedb_client_pool}, {pool_size, 8}, {pool_type, random}, {auth, {basic, #{username => <<"greptime_user">>, password => <<"greptime_pwd">>}}}], @@ -160,4 +175,132 @@ t_write_batch(_) -> {ok, #{response := {affected_rows, #{value := 55}}}} = greptimedb:write_batch(Client, MetricAndPoints), + greptimedb:stop_client(Client), + ok. + +rand_string(Bytes) -> + base64:encode( + crypto:strong_rand_bytes(Bytes)). + +bench_points(StartTs, N) -> + lists:map(fun(Num) -> + #{fields => + #{<<"f0">> => Num, + <<"f1">> => Num, + <<"f2">> => Num, + <<"f3">> => Num, + <<"f4">> => Num, + <<"f5">> => Num, + <<"f6">> => Num, + <<"f7">> => Num, + <<"f8">> => Num, + <<"f9">> => rand:uniform(Num)}, + tags => + #{<<"tag0">> => <<"tagv0">>, + <<"tag1">> => <<"tagv1">>, + <<"tag2">> => <<"tagv2">>, + <<"tag3">> => <<"tagv3">>, + <<"tag4">> => <<"tagv4">>, + <<"tag5">> => <<"tagv5">>, + <<"tag6">> => <<"tagv6">>, + <<"tag7">> => <<"tagv7">>, + <<"tag8">> => <<"tagv8">>, + <<"tag9">> => rand_string(8)}, + timestamp => StartTs + Num} + end, + lists:seq(1, N)). + +bench_write(N, StartMs, BatchSize, Client, BenchmarkEncoding) -> + bench_write(N, StartMs, BatchSize, Client, BenchmarkEncoding, 0). + +bench_write(0, _StartMs, _BatchSize, _Client, _BenchmarkEncoding, Written) -> + Written; +bench_write(N, StartMs, BatchSize, Client, BenchmarkEncoding, Written) -> + Rows = + case BenchmarkEncoding of + true -> + Metric = <<"bench_metrics">>, + Points = bench_points(StartMs - N, BatchSize), + _Request = greptimedb_encoder:insert_requests(Client, [{Metric, Points}]), + length(Points); + false -> + {ok, #{response := {affected_rows, #{value := AffectedRows}}}} = + greptimedb:write(Client, + <<"bench_metrics">>, + bench_points(1687814974000 - N, BatchSize)), + AffectedRows + end, + + NewWritten = Written + Rows, + bench_write(N - 1, StartMs, BatchSize, Client, BenchmarkEncoding, NewWritten). + +join([P | Ps]) -> + receive + {P, Result} -> + [Result | join(Ps)] + end; +join([]) -> + []. + +t_bench_perf(_) -> + Options = + [{endpoints, [{http, "localhost", 4001}]}, + {pool, greptimedb_client_pool}, + {pool_size, 8}, + {pool_type, random}, + {auth, {basic, #{username => <<"greptime_user">>, password => <<"greptime_pwd">>}}}], + + {ok, Client} = greptimedb:start_client(Options), + true = greptimedb:is_alive(Client), + BatchSize = 100, + Num = 1000, + Profile = false, + BenchmarkEncoding = false, + Concurrency = 3, + {MegaSecs, Secs, _MicroSecs} = erlang:timestamp(), + StartMs = (MegaSecs * 1000000 + Secs) * 1000, + + %% warmup + bench_write(1000, StartMs, BatchSize, Client, BenchmarkEncoding), + ct:print("Warmed up, start to benchmark writing..."), + %% benchmark + T1 = erlang:monotonic_time(), + Rows = + case Profile of + true -> + ct:print("Enable eprof..."), + eprof:start(), + eprof:log("/tmp/eprof.result"), + {ok, Ret} = + eprof:profile(fun() -> + bench_write(Num, StartMs, BatchSize, Client, BenchmarkEncoding) + end), + eprof:analyze(), + eprof:stop(), + Ret; + false -> + Parent = self(), + Pids = + lists:map(fun(C) -> + spawn(fun() -> + Written = + bench_write(Num, + StartMs - C * Num * BatchSize, + BatchSize, + Client, + BenchmarkEncoding), + Parent ! {self(), Written} + end) + end, + lists:seq(1, Concurrency)), + lists:sum(join(Pids)) + end, + + T2 = erlang:monotonic_time(), + Time = erlang:convert_time_unit(T2 - T1, native, seconds), + TPS = Rows / Time, + %% print the result + ct:print("Finish benchmark, concurrency: ~p, cost: ~p seconds, rows: ~p, TPS: ~p~n", + [Concurrency, Time, Rows, TPS]), + greptimedb:stop_client(Client), ok.