From 6e37ec0f795487cb0f730bd3f564c12ac26a77fd Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Mon, 5 Jun 2023 14:32:22 +0800 Subject: [PATCH] feat: update protos and make it work (#19) * feat: update protos and make it work * ci: update greptimedb to 0.3.0 * feat: impl write_batch, close #20 * refactor: spec for write_batch * ci: update greptimedb to 0.3.0 --- .github/workflows/erlang.yml | 2 +- README.md | 12 ++ protos/column.proto | 22 +- protos/common.proto | 26 ++- protos/database.proto | 41 +++- protos/health.proto | 15 +- src/database_pb.erl | 408 +++++++++++++++++++++++++++++++++-- src/greptimedb.erl | 28 ++- src/greptimedb_encoder.erl | 42 ++-- src/greptimedb_stream.erl | 2 +- test/greptimedb_SUITE.erl | 33 ++- 11 files changed, 560 insertions(+), 71 deletions(-) diff --git a/.github/workflows/erlang.yml b/.github/workflows/erlang.yml index 7294fba..174f073 100644 --- a/.github/workflows/erlang.yml +++ b/.github/workflows/erlang.yml @@ -22,7 +22,7 @@ jobs: - uses: actions/checkout@v3 - name: Setup greptimedb run: | - GREPTIMEDB_VER=v0.2.0 + GREPTIMEDB_VER=v0.3.0 DOWNLOAD_URL=https://github.com/GreptimeTeam/greptimedb curl -L ${DOWNLOAD_URL}/releases/download/${GREPTIMEDB_VER}/greptime-linux-amd64.tgz -o /tmp/greptimedb-${GREPTIMEDB_VER}-linux-amd64.tar.gz mkdir -p /tmp/greptimedb-download diff --git a/README.md b/README.md index 12fbf64..9341a3c 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,7 @@ greptimedb-client-erl An Erlang client library for [GreptimeDB](https://github.com/GreptimeTeam/greptimedb). > **_NOTE:_** GreptimeDB and this project is under heavy development. Do not use it in production at the moment. +> 0.1.0: only working for GreptimeDB 0.2, otherwise for the latest GreptimeDB releases. ## Usage @@ -51,6 +52,17 @@ Write data by rows: greptimedb:write(Client, Metric, Points). ``` +Batch write: +```erlang +Metric1 = <<"temperatures">>, +Points1 = [...], +Metric2 = <<"humidities">>, +Points2 = [...], +Batch = [{Metric1, Points1}, {Metric2, Points}], + +{ok, _} = greptimedb:write_batch(Client, Batch). +``` + Streaming write: ```erlang diff --git a/protos/column.proto b/protos/column.proto index e32b69a..bb9f8b6 100644 --- a/protos/column.proto +++ b/protos/column.proto @@ -1,8 +1,22 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + syntax = "proto3"; package greptime.v1; -option java_package="io.greptime.v1"; +option java_package = "io.greptime.v1"; option java_outer_classname = "Columns"; option go_package = "github.com/GreptimeTeam/greptime-proto/go/greptime/v1"; @@ -43,7 +57,8 @@ message Column { } // The array of non-null values in this column. // - // For example: suppose there is a column "foo" that contains some int32 values (1, 2, 3, 4, 5, null, 7, 8, 9, null); + // For example: suppose there is a column "foo" that contains some int32 + // values (1, 2, 3, 4, 5, null, 7, 8, 9, null); // column: // column_name: foo // semantic_type: Tag @@ -52,7 +67,8 @@ message Column { Values values = 3; // Mask maps the positions of null values. - // If a bit in null_mask is 1, it indicates that the column value at that position is null. + // If a bit in null_mask is 1, it indicates that the column value at that + // position is null. bytes null_mask = 4; // Helpful in creating vector from column. diff --git a/protos/common.proto b/protos/common.proto index c168103..e332fd9 100644 --- a/protos/common.proto +++ b/protos/common.proto @@ -1,3 +1,17 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + syntax = "proto3"; package greptime.v1; @@ -31,14 +45,8 @@ message Basic { string password = 2; } -message Token { - string token = 1; -} +message Token { string token = 1; } -message AffectedRows { - uint32 value = 1; -} +message AffectedRows { uint32 value = 1; } -message FlightMetadata { - AffectedRows affected_rows = 1; -} +message FlightMetadata { AffectedRows affected_rows = 1; } diff --git a/protos/database.proto b/protos/database.proto index 63c22e6..54597ef 100644 --- a/protos/database.proto +++ b/protos/database.proto @@ -1,3 +1,17 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + syntax = "proto3"; package greptime.v1; @@ -19,26 +33,30 @@ service GreptimeDatabase { message GreptimeRequest { RequestHeader header = 1; oneof request { - InsertRequest insert = 2; + InsertRequests inserts = 2; QueryRequest query = 3; DdlRequest ddl = 4; + DeleteRequest delete = 5; } } message GreptimeResponse { ResponseHeader header = 1; - oneof response { - AffectedRows affected_rows = 2; - } + oneof response { AffectedRows affected_rows = 2; } } message QueryRequest { oneof query { string sql = 1; bytes logical_plan = 2; + // PromRangeQuery prom_range_query = 3; } } +message InsertRequests { + repeated InsertRequest inserts = 1; +} + message InsertRequest { string table_name = 1; @@ -53,3 +71,18 @@ message InsertRequest { // The region number of current insert request. uint32 region_number = 5; } + +message DeleteRequest { + // The table name to delete from. Catalog name and schema name are in the + // `RequestHeader`. + string table_name = 1; + + // The region number of current delete request. + uint32 region_number = 2; + + // The data to delete, indexed by key columns. + repeated Column key_columns = 3; + + // The row count of all columns above. + uint32 row_count = 4; +} diff --git a/protos/health.proto b/protos/health.proto index ca21a0e..d9a2e05 100644 --- a/protos/health.proto +++ b/protos/health.proto @@ -1,3 +1,17 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + syntax = "proto3"; package greptime.v1; @@ -6,7 +20,6 @@ option java_package = "io.greptime.v1"; option java_outer_classname = "Health"; option go_package = "github.com/GreptimeTeam/greptime-proto/go/greptime/v1"; - service HealthCheck { rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse); } diff --git a/src/database_pb.erl b/src/database_pb.erl index 3bd6d64..b04a808 100644 --- a/src/database_pb.erl +++ b/src/database_pb.erl @@ -57,7 +57,7 @@ %% message types -type greptime_request() :: #{header => request_header(), % = 1, optional - request => {insert, insert_request()} | {query, query_request()} | {ddl, ddl_request()} % oneof + request => {inserts, insert_requests()} | {query, query_request()} | {ddl, ddl_request()} | {delete, delete_request()} % oneof }. -type greptime_response() :: @@ -69,6 +69,10 @@ #{query => {sql, unicode:chardata()} | {logical_plan, iodata()} % oneof }. +-type insert_requests() :: + #{inserts => [insert_request()] % = 1, repeated + }. + -type insert_request() :: #{table_name => unicode:chardata(), % = 1, optional columns => [column()], % = 3, repeated @@ -76,6 +80,13 @@ region_number => non_neg_integer() % = 5, optional, 32 bits }. +-type delete_request() :: + #{table_name => unicode:chardata(), % = 1, optional + region_number => non_neg_integer(), % = 2, optional, 32 bits + key_columns => [column()], % = 3, repeated + row_count => non_neg_integer() % = 4, optional, 32 bits + }. + -type ddl_request() :: #{expr => {create_database, create_database_expr()} | {create_table, create_table_expr()} | {alter, alter_expr()} | {drop_table, drop_table_expr()} | {flush_table, flush_table_expr()} % oneof }. @@ -214,9 +225,9 @@ #{affected_rows => affected_rows() % = 1, optional }. --export_type(['greptime_request'/0, 'greptime_response'/0, 'query_request'/0, 'insert_request'/0, 'ddl_request'/0, 'create_table_expr'/0, 'alter_expr'/0, 'drop_table_expr'/0, 'flush_table_expr'/0, 'create_database_expr'/0, 'add_columns'/0, 'drop_columns'/0, 'rename_table'/0, 'add_column'/0, 'drop_column'/0, 'table_id'/0, 'values'/0, 'column'/0, 'column_def'/0, 'response_header'/0, 'request_header'/0, 'auth_header'/0, 'basic'/0, 'token'/0, 'affected_rows'/0, 'flight_metadata'/0]). --type '$msg_name'() :: greptime_request | greptime_response | query_request | insert_request | ddl_request | create_table_expr | alter_expr | drop_table_expr | flush_table_expr | create_database_expr | add_columns | drop_columns | rename_table | add_column | drop_column | table_id | values | column | column_def | response_header | request_header | auth_header | basic | token | affected_rows | flight_metadata. --type '$msg'() :: greptime_request() | greptime_response() | query_request() | insert_request() | ddl_request() | create_table_expr() | alter_expr() | drop_table_expr() | flush_table_expr() | create_database_expr() | add_columns() | drop_columns() | rename_table() | add_column() | drop_column() | table_id() | values() | column() | column_def() | response_header() | request_header() | auth_header() | basic() | token() | affected_rows() | flight_metadata(). +-export_type(['greptime_request'/0, 'greptime_response'/0, 'query_request'/0, 'insert_requests'/0, 'insert_request'/0, 'delete_request'/0, 'ddl_request'/0, 'create_table_expr'/0, 'alter_expr'/0, 'drop_table_expr'/0, 'flush_table_expr'/0, 'create_database_expr'/0, 'add_columns'/0, 'drop_columns'/0, 'rename_table'/0, 'add_column'/0, 'drop_column'/0, 'table_id'/0, 'values'/0, 'column'/0, 'column_def'/0, 'response_header'/0, 'request_header'/0, 'auth_header'/0, 'basic'/0, 'token'/0, 'affected_rows'/0, 'flight_metadata'/0]). +-type '$msg_name'() :: greptime_request | greptime_response | query_request | insert_requests | insert_request | delete_request | ddl_request | create_table_expr | alter_expr | drop_table_expr | flush_table_expr | create_database_expr | add_columns | drop_columns | rename_table | add_column | drop_column | table_id | values | column | column_def | response_header | request_header | auth_header | basic | token | affected_rows | flight_metadata. +-type '$msg'() :: greptime_request() | greptime_response() | query_request() | insert_requests() | insert_request() | delete_request() | ddl_request() | create_table_expr() | alter_expr() | drop_table_expr() | flush_table_expr() | create_database_expr() | add_columns() | drop_columns() | rename_table() | add_column() | drop_column() | table_id() | values() | column() | column_def() | response_header() | request_header() | auth_header() | basic() | token() | affected_rows() | flight_metadata(). -export_type(['$msg_name'/0, '$msg'/0]). -if(?OTP_RELEASE >= 24). @@ -239,7 +250,9 @@ encode_msg(Msg, MsgName, Opts) -> greptime_request -> encode_msg_greptime_request(id(Msg, TrUserData), TrUserData); greptime_response -> encode_msg_greptime_response(id(Msg, TrUserData), TrUserData); query_request -> encode_msg_query_request(id(Msg, TrUserData), TrUserData); + insert_requests -> encode_msg_insert_requests(id(Msg, TrUserData), TrUserData); insert_request -> encode_msg_insert_request(id(Msg, TrUserData), TrUserData); + delete_request -> encode_msg_delete_request(id(Msg, TrUserData), TrUserData); ddl_request -> encode_msg_ddl_request(id(Msg, TrUserData), TrUserData); create_table_expr -> encode_msg_create_table_expr(id(Msg, TrUserData), TrUserData); alter_expr -> encode_msg_alter_expr(id(Msg, TrUserData), TrUserData); @@ -282,9 +295,10 @@ encode_msg_greptime_request(#{} = M, Bin, TrUserData) -> case M of #{request := F2} -> case id(F2, TrUserData) of - {insert, TF2} -> begin TrTF2 = id(TF2, TrUserData), e_mfield_greptime_request_insert(TrTF2, <>, TrUserData) end; + {inserts, TF2} -> begin TrTF2 = id(TF2, TrUserData), e_mfield_greptime_request_inserts(TrTF2, <>, TrUserData) end; {query, TF2} -> begin TrTF2 = id(TF2, TrUserData), e_mfield_greptime_request_query(TrTF2, <>, TrUserData) end; - {ddl, TF2} -> begin TrTF2 = id(TF2, TrUserData), e_mfield_greptime_request_ddl(TrTF2, <>, TrUserData) end + {ddl, TF2} -> begin TrTF2 = id(TF2, TrUserData), e_mfield_greptime_request_ddl(TrTF2, <>, TrUserData) end; + {delete, TF2} -> begin TrTF2 = id(TF2, TrUserData), e_mfield_greptime_request_delete(TrTF2, <>, TrUserData) end end; _ -> B1 end. @@ -321,6 +335,19 @@ encode_msg_query_request(#{} = M, Bin, TrUserData) -> _ -> Bin end. +encode_msg_insert_requests(Msg, TrUserData) -> encode_msg_insert_requests(Msg, <<>>, TrUserData). + + +encode_msg_insert_requests(#{} = M, Bin, TrUserData) -> + case M of + #{inserts := F1} -> + TrF1 = id(F1, TrUserData), + if TrF1 == [] -> Bin; + true -> e_field_insert_requests_inserts(TrF1, Bin, TrUserData) + end; + _ -> Bin + end. + encode_msg_insert_request(Msg, TrUserData) -> encode_msg_insert_request(Msg, <<>>, TrUserData). @@ -365,6 +392,50 @@ encode_msg_insert_request(#{} = M, Bin, TrUserData) -> _ -> B3 end. +encode_msg_delete_request(Msg, TrUserData) -> encode_msg_delete_request(Msg, <<>>, TrUserData). + + +encode_msg_delete_request(#{} = M, Bin, TrUserData) -> + B1 = case M of + #{table_name := F1} -> + begin + TrF1 = id(F1, TrUserData), + case is_empty_string(TrF1) of + true -> Bin; + false -> e_type_string(TrF1, <>, TrUserData) + end + end; + _ -> Bin + end, + B2 = case M of + #{region_number := F2} -> + begin + TrF2 = id(F2, TrUserData), + if TrF2 =:= 0 -> B1; + true -> e_varint(TrF2, <>, TrUserData) + end + end; + _ -> B1 + end, + B3 = case M of + #{key_columns := F3} -> + TrF3 = id(F3, TrUserData), + if TrF3 == [] -> B2; + true -> e_field_delete_request_key_columns(TrF3, B2, TrUserData) + end; + _ -> B2 + end, + case M of + #{row_count := F4} -> + begin + TrF4 = id(F4, TrUserData), + if TrF4 =:= 0 -> B3; + true -> e_varint(TrF4, <>, TrUserData) + end + end; + _ -> B3 + end. + encode_msg_ddl_request(Msg, TrUserData) -> encode_msg_ddl_request(Msg, <<>>, TrUserData). @@ -1157,8 +1228,8 @@ e_mfield_greptime_request_header(Msg, Bin, TrUserData) -> Bin2 = e_varint(byte_size(SubBin), Bin), <>. -e_mfield_greptime_request_insert(Msg, Bin, TrUserData) -> - SubBin = encode_msg_insert_request(Msg, <<>>, TrUserData), +e_mfield_greptime_request_inserts(Msg, Bin, TrUserData) -> + SubBin = encode_msg_insert_requests(Msg, <<>>, TrUserData), Bin2 = e_varint(byte_size(SubBin), Bin), <>. @@ -1172,6 +1243,11 @@ e_mfield_greptime_request_ddl(Msg, Bin, TrUserData) -> Bin2 = e_varint(byte_size(SubBin), Bin), <>. +e_mfield_greptime_request_delete(Msg, Bin, TrUserData) -> + SubBin = encode_msg_delete_request(Msg, <<>>, TrUserData), + Bin2 = e_varint(byte_size(SubBin), Bin), + <>. + e_mfield_greptime_response_header(_Msg, Bin, _TrUserData) -> <>. e_mfield_greptime_response_affected_rows(Msg, Bin, TrUserData) -> @@ -1179,6 +1255,17 @@ e_mfield_greptime_response_affected_rows(Msg, Bin, TrUserData) -> Bin2 = e_varint(byte_size(SubBin), Bin), <>. +e_mfield_insert_requests_inserts(Msg, Bin, TrUserData) -> + SubBin = encode_msg_insert_request(Msg, <<>>, TrUserData), + Bin2 = e_varint(byte_size(SubBin), Bin), + <>. + +e_field_insert_requests_inserts([Elem | Rest], Bin, TrUserData) -> + Bin2 = <>, + Bin3 = e_mfield_insert_requests_inserts(id(Elem, TrUserData), Bin2, TrUserData), + e_field_insert_requests_inserts(Rest, Bin3, TrUserData); +e_field_insert_requests_inserts([], Bin, _TrUserData) -> Bin. + e_mfield_insert_request_columns(Msg, Bin, TrUserData) -> SubBin = encode_msg_column(Msg, <<>>, TrUserData), Bin2 = e_varint(byte_size(SubBin), Bin), @@ -1190,6 +1277,17 @@ e_field_insert_request_columns([Elem | Rest], Bin, TrUserData) -> e_field_insert_request_columns(Rest, Bin3, TrUserData); e_field_insert_request_columns([], Bin, _TrUserData) -> Bin. +e_mfield_delete_request_key_columns(Msg, Bin, TrUserData) -> + SubBin = encode_msg_column(Msg, <<>>, TrUserData), + Bin2 = e_varint(byte_size(SubBin), Bin), + <>. + +e_field_delete_request_key_columns([Elem | Rest], Bin, TrUserData) -> + Bin2 = <>, + Bin3 = e_mfield_delete_request_key_columns(id(Elem, TrUserData), Bin2, TrUserData), + e_field_delete_request_key_columns(Rest, Bin3, TrUserData); +e_field_delete_request_key_columns([], Bin, _TrUserData) -> Bin. + e_mfield_ddl_request_create_database(Msg, Bin, TrUserData) -> SubBin = encode_msg_create_database_expr(Msg, <<>>, TrUserData), Bin2 = e_varint(byte_size(SubBin), Bin), @@ -1722,7 +1820,9 @@ decode_msg_1_catch(Bin, MsgName, TrUserData) -> decode_msg_2_doit(greptime_request, Bin, TrUserData) -> id(decode_msg_greptime_request(Bin, TrUserData), TrUserData); decode_msg_2_doit(greptime_response, Bin, TrUserData) -> id(decode_msg_greptime_response(Bin, TrUserData), TrUserData); decode_msg_2_doit(query_request, Bin, TrUserData) -> id(decode_msg_query_request(Bin, TrUserData), TrUserData); +decode_msg_2_doit(insert_requests, Bin, TrUserData) -> id(decode_msg_insert_requests(Bin, TrUserData), TrUserData); decode_msg_2_doit(insert_request, Bin, TrUserData) -> id(decode_msg_insert_request(Bin, TrUserData), TrUserData); +decode_msg_2_doit(delete_request, Bin, TrUserData) -> id(decode_msg_delete_request(Bin, TrUserData), TrUserData); decode_msg_2_doit(ddl_request, Bin, TrUserData) -> id(decode_msg_ddl_request(Bin, TrUserData), TrUserData); decode_msg_2_doit(create_table_expr, Bin, TrUserData) -> id(decode_msg_create_table_expr(Bin, TrUserData), TrUserData); decode_msg_2_doit(alter_expr, Bin, TrUserData) -> id(decode_msg_alter_expr(Bin, TrUserData), TrUserData); @@ -1751,9 +1851,10 @@ decode_msg_2_doit(flight_metadata, Bin, TrUserData) -> id(decode_msg_flight_meta decode_msg_greptime_request(Bin, TrUserData) -> dfp_read_field_def_greptime_request(Bin, 0, 0, 0, id('$undef', TrUserData), id('$undef', TrUserData), TrUserData). dfp_read_field_def_greptime_request(<<10, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> d_field_greptime_request_header(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData); -dfp_read_field_def_greptime_request(<<18, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> d_field_greptime_request_insert(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData); +dfp_read_field_def_greptime_request(<<18, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> d_field_greptime_request_inserts(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData); dfp_read_field_def_greptime_request(<<26, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> d_field_greptime_request_query(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData); dfp_read_field_def_greptime_request(<<34, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> d_field_greptime_request_ddl(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData); +dfp_read_field_def_greptime_request(<<42, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> d_field_greptime_request_delete(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData); dfp_read_field_def_greptime_request(<<>>, 0, 0, _, F@_1, F@_2, _) -> S1 = #{}, S2 = if F@_1 == '$undef' -> S1; @@ -1769,9 +1870,10 @@ dg_read_field_def_greptime_request(<<0:1, X:7, Rest/binary>>, N, Acc, _, F@_1, F Key = X bsl N + Acc, case Key of 10 -> d_field_greptime_request_header(Rest, 0, 0, 0, F@_1, F@_2, TrUserData); - 18 -> d_field_greptime_request_insert(Rest, 0, 0, 0, F@_1, F@_2, TrUserData); + 18 -> d_field_greptime_request_inserts(Rest, 0, 0, 0, F@_1, F@_2, TrUserData); 26 -> d_field_greptime_request_query(Rest, 0, 0, 0, F@_1, F@_2, TrUserData); 34 -> d_field_greptime_request_ddl(Rest, 0, 0, 0, F@_1, F@_2, TrUserData); + 42 -> d_field_greptime_request_delete(Rest, 0, 0, 0, F@_1, F@_2, TrUserData); _ -> case Key band 7 of 0 -> skip_varint_greptime_request(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData); @@ -1803,18 +1905,18 @@ d_field_greptime_request_header(<<0:1, X:7, Rest/binary>>, N, Acc, F, Prev, F@_2 F@_2, TrUserData). -d_field_greptime_request_insert(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 57 -> d_field_greptime_request_insert(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData); -d_field_greptime_request_insert(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, Prev, TrUserData) -> - {NewFValue, RestF} = begin Len = X bsl N + Acc, <> = Rest, {id(decode_msg_insert_request(Bs, TrUserData), TrUserData), Rest2} end, +d_field_greptime_request_inserts(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 57 -> d_field_greptime_request_inserts(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData); +d_field_greptime_request_inserts(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, Prev, TrUserData) -> + {NewFValue, RestF} = begin Len = X bsl N + Acc, <> = Rest, {id(decode_msg_insert_requests(Bs, TrUserData), TrUserData), Rest2} end, dfp_read_field_def_greptime_request(RestF, 0, 0, F, F@_1, case Prev of - '$undef' -> id({insert, NewFValue}, TrUserData); - {insert, MVPrev} -> id({insert, merge_msg_insert_request(MVPrev, NewFValue, TrUserData)}, TrUserData); - _ -> id({insert, NewFValue}, TrUserData) + '$undef' -> id({inserts, NewFValue}, TrUserData); + {inserts, MVPrev} -> id({inserts, merge_msg_insert_requests(MVPrev, NewFValue, TrUserData)}, TrUserData); + _ -> id({inserts, NewFValue}, TrUserData) end, TrUserData). @@ -1848,6 +1950,21 @@ d_field_greptime_request_ddl(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, Prev, T end, TrUserData). +d_field_greptime_request_delete(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 57 -> d_field_greptime_request_delete(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData); +d_field_greptime_request_delete(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, Prev, TrUserData) -> + {NewFValue, RestF} = begin Len = X bsl N + Acc, <> = Rest, {id(decode_msg_delete_request(Bs, TrUserData), TrUserData), Rest2} end, + dfp_read_field_def_greptime_request(RestF, + 0, + 0, + F, + F@_1, + case Prev of + '$undef' -> id({delete, NewFValue}, TrUserData); + {delete, MVPrev} -> id({delete, merge_msg_delete_request(MVPrev, NewFValue, TrUserData)}, TrUserData); + _ -> id({delete, NewFValue}, TrUserData) + end, + TrUserData). + skip_varint_greptime_request(<<1:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> skip_varint_greptime_request(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData); skip_varint_greptime_request(<<0:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> dfp_read_field_def_greptime_request(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData). @@ -2007,6 +2124,58 @@ skip_32_query_request(<<_:32, Rest/binary>>, Z1, Z2, F, F@_1, TrUserData) -> dfp skip_64_query_request(<<_:64, Rest/binary>>, Z1, Z2, F, F@_1, TrUserData) -> dfp_read_field_def_query_request(Rest, Z1, Z2, F, F@_1, TrUserData). +decode_msg_insert_requests(Bin, TrUserData) -> dfp_read_field_def_insert_requests(Bin, 0, 0, 0, id([], TrUserData), TrUserData). + +dfp_read_field_def_insert_requests(<<10, Rest/binary>>, Z1, Z2, F, F@_1, TrUserData) -> d_field_insert_requests_inserts(Rest, Z1, Z2, F, F@_1, TrUserData); +dfp_read_field_def_insert_requests(<<>>, 0, 0, _, R1, TrUserData) -> + S1 = #{}, + if R1 == '$undef' -> S1; + true -> S1#{inserts => lists_reverse(R1, TrUserData)} + end; +dfp_read_field_def_insert_requests(Other, Z1, Z2, F, F@_1, TrUserData) -> dg_read_field_def_insert_requests(Other, Z1, Z2, F, F@_1, TrUserData). + +dg_read_field_def_insert_requests(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, TrUserData) when N < 32 - 7 -> dg_read_field_def_insert_requests(Rest, N + 7, X bsl N + Acc, F, F@_1, TrUserData); +dg_read_field_def_insert_requests(<<0:1, X:7, Rest/binary>>, N, Acc, _, F@_1, TrUserData) -> + Key = X bsl N + Acc, + case Key of + 10 -> d_field_insert_requests_inserts(Rest, 0, 0, 0, F@_1, TrUserData); + _ -> + case Key band 7 of + 0 -> skip_varint_insert_requests(Rest, 0, 0, Key bsr 3, F@_1, TrUserData); + 1 -> skip_64_insert_requests(Rest, 0, 0, Key bsr 3, F@_1, TrUserData); + 2 -> skip_length_delimited_insert_requests(Rest, 0, 0, Key bsr 3, F@_1, TrUserData); + 3 -> skip_group_insert_requests(Rest, 0, 0, Key bsr 3, F@_1, TrUserData); + 5 -> skip_32_insert_requests(Rest, 0, 0, Key bsr 3, F@_1, TrUserData) + end + end; +dg_read_field_def_insert_requests(<<>>, 0, 0, _, R1, TrUserData) -> + S1 = #{}, + if R1 == '$undef' -> S1; + true -> S1#{inserts => lists_reverse(R1, TrUserData)} + end. + +d_field_insert_requests_inserts(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, TrUserData) when N < 57 -> d_field_insert_requests_inserts(Rest, N + 7, X bsl N + Acc, F, F@_1, TrUserData); +d_field_insert_requests_inserts(<<0:1, X:7, Rest/binary>>, N, Acc, F, Prev, TrUserData) -> + {NewFValue, RestF} = begin Len = X bsl N + Acc, <> = Rest, {id(decode_msg_insert_request(Bs, TrUserData), TrUserData), Rest2} end, + dfp_read_field_def_insert_requests(RestF, 0, 0, F, cons(NewFValue, Prev, TrUserData), TrUserData). + +skip_varint_insert_requests(<<1:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, TrUserData) -> skip_varint_insert_requests(Rest, Z1, Z2, F, F@_1, TrUserData); +skip_varint_insert_requests(<<0:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, TrUserData) -> dfp_read_field_def_insert_requests(Rest, Z1, Z2, F, F@_1, TrUserData). + +skip_length_delimited_insert_requests(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, TrUserData) when N < 57 -> skip_length_delimited_insert_requests(Rest, N + 7, X bsl N + Acc, F, F@_1, TrUserData); +skip_length_delimited_insert_requests(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, TrUserData) -> + Length = X bsl N + Acc, + <<_:Length/binary, Rest2/binary>> = Rest, + dfp_read_field_def_insert_requests(Rest2, 0, 0, F, F@_1, TrUserData). + +skip_group_insert_requests(Bin, _, Z2, FNum, F@_1, TrUserData) -> + {_, Rest} = read_group(Bin, FNum), + dfp_read_field_def_insert_requests(Rest, 0, Z2, FNum, F@_1, TrUserData). + +skip_32_insert_requests(<<_:32, Rest/binary>>, Z1, Z2, F, F@_1, TrUserData) -> dfp_read_field_def_insert_requests(Rest, Z1, Z2, F, F@_1, TrUserData). + +skip_64_insert_requests(<<_:64, Rest/binary>>, Z1, Z2, F, F@_1, TrUserData) -> dfp_read_field_def_insert_requests(Rest, Z1, Z2, F, F@_1, TrUserData). + decode_msg_insert_request(Bin, TrUserData) -> dfp_read_field_def_insert_request(Bin, 0, 0, 0, id(<<>>, TrUserData), id([], TrUserData), id(0, TrUserData), id(0, TrUserData), TrUserData). dfp_read_field_def_insert_request(<<10, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> d_field_insert_request_table_name(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData); @@ -2080,6 +2249,79 @@ skip_32_insert_request(<<_:32, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, skip_64_insert_request(<<_:64, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> dfp_read_field_def_insert_request(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData). +decode_msg_delete_request(Bin, TrUserData) -> dfp_read_field_def_delete_request(Bin, 0, 0, 0, id(<<>>, TrUserData), id(0, TrUserData), id([], TrUserData), id(0, TrUserData), TrUserData). + +dfp_read_field_def_delete_request(<<10, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> d_field_delete_request_table_name(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData); +dfp_read_field_def_delete_request(<<16, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> d_field_delete_request_region_number(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData); +dfp_read_field_def_delete_request(<<26, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> d_field_delete_request_key_columns(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData); +dfp_read_field_def_delete_request(<<32, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> d_field_delete_request_row_count(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData); +dfp_read_field_def_delete_request(<<>>, 0, 0, _, F@_1, F@_2, R1, F@_4, TrUserData) -> + S1 = #{table_name => F@_1, region_number => F@_2, row_count => F@_4}, + if R1 == '$undef' -> S1; + true -> S1#{key_columns => lists_reverse(R1, TrUserData)} + end; +dfp_read_field_def_delete_request(Other, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> dg_read_field_def_delete_request(Other, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData). + +dg_read_field_def_delete_request(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData) when N < 32 - 7 -> dg_read_field_def_delete_request(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData); +dg_read_field_def_delete_request(<<0:1, X:7, Rest/binary>>, N, Acc, _, F@_1, F@_2, F@_3, F@_4, TrUserData) -> + Key = X bsl N + Acc, + case Key of + 10 -> d_field_delete_request_table_name(Rest, 0, 0, 0, F@_1, F@_2, F@_3, F@_4, TrUserData); + 16 -> d_field_delete_request_region_number(Rest, 0, 0, 0, F@_1, F@_2, F@_3, F@_4, TrUserData); + 26 -> d_field_delete_request_key_columns(Rest, 0, 0, 0, F@_1, F@_2, F@_3, F@_4, TrUserData); + 32 -> d_field_delete_request_row_count(Rest, 0, 0, 0, F@_1, F@_2, F@_3, F@_4, TrUserData); + _ -> + case Key band 7 of + 0 -> skip_varint_delete_request(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, F@_4, TrUserData); + 1 -> skip_64_delete_request(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, F@_4, TrUserData); + 2 -> skip_length_delimited_delete_request(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, F@_4, TrUserData); + 3 -> skip_group_delete_request(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, F@_4, TrUserData); + 5 -> skip_32_delete_request(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, F@_4, TrUserData) + end + end; +dg_read_field_def_delete_request(<<>>, 0, 0, _, F@_1, F@_2, R1, F@_4, TrUserData) -> + S1 = #{table_name => F@_1, region_number => F@_2, row_count => F@_4}, + if R1 == '$undef' -> S1; + true -> S1#{key_columns => lists_reverse(R1, TrUserData)} + end. + +d_field_delete_request_table_name(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData) when N < 57 -> d_field_delete_request_table_name(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData); +d_field_delete_request_table_name(<<0:1, X:7, Rest/binary>>, N, Acc, F, _, F@_2, F@_3, F@_4, TrUserData) -> + {NewFValue, RestF} = begin Len = X bsl N + Acc, <> = Rest, Bytes2 = binary:copy(Bytes), {id(Bytes2, TrUserData), Rest2} end, + dfp_read_field_def_delete_request(RestF, 0, 0, F, NewFValue, F@_2, F@_3, F@_4, TrUserData). + +d_field_delete_request_region_number(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData) when N < 57 -> d_field_delete_request_region_number(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData); +d_field_delete_request_region_number(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, _, F@_3, F@_4, TrUserData) -> + {NewFValue, RestF} = {id((X bsl N + Acc) band 4294967295, TrUserData), Rest}, + dfp_read_field_def_delete_request(RestF, 0, 0, F, F@_1, NewFValue, F@_3, F@_4, TrUserData). + +d_field_delete_request_key_columns(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData) when N < 57 -> d_field_delete_request_key_columns(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData); +d_field_delete_request_key_columns(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, Prev, F@_4, TrUserData) -> + {NewFValue, RestF} = begin Len = X bsl N + Acc, <> = Rest, {id(decode_msg_column(Bs, TrUserData), TrUserData), Rest2} end, + dfp_read_field_def_delete_request(RestF, 0, 0, F, F@_1, F@_2, cons(NewFValue, Prev, TrUserData), F@_4, TrUserData). + +d_field_delete_request_row_count(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData) when N < 57 -> d_field_delete_request_row_count(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData); +d_field_delete_request_row_count(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, _, TrUserData) -> + {NewFValue, RestF} = {id((X bsl N + Acc) band 4294967295, TrUserData), Rest}, + dfp_read_field_def_delete_request(RestF, 0, 0, F, F@_1, F@_2, F@_3, NewFValue, TrUserData). + +skip_varint_delete_request(<<1:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> skip_varint_delete_request(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData); +skip_varint_delete_request(<<0:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> dfp_read_field_def_delete_request(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData). + +skip_length_delimited_delete_request(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData) when N < 57 -> skip_length_delimited_delete_request(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData); +skip_length_delimited_delete_request(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> + Length = X bsl N + Acc, + <<_:Length/binary, Rest2/binary>> = Rest, + dfp_read_field_def_delete_request(Rest2, 0, 0, F, F@_1, F@_2, F@_3, F@_4, TrUserData). + +skip_group_delete_request(Bin, _, Z2, FNum, F@_1, F@_2, F@_3, F@_4, TrUserData) -> + {_, Rest} = read_group(Bin, FNum), + dfp_read_field_def_delete_request(Rest, 0, Z2, FNum, F@_1, F@_2, F@_3, F@_4, TrUserData). + +skip_32_delete_request(<<_:32, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> dfp_read_field_def_delete_request(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData). + +skip_64_delete_request(<<_:64, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> dfp_read_field_def_delete_request(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData). + decode_msg_ddl_request(Bin, TrUserData) -> dfp_read_field_def_ddl_request(Bin, 0, 0, 0, id('$undef', TrUserData), TrUserData). dfp_read_field_def_ddl_request(<<10, Rest/binary>>, Z1, Z2, F, F@_1, TrUserData) -> d_field_ddl_request_create_database(Rest, Z1, Z2, F, F@_1, TrUserData); @@ -4266,7 +4508,9 @@ merge_msgs(Prev, New, MsgName, Opts) -> greptime_request -> merge_msg_greptime_request(Prev, New, TrUserData); greptime_response -> merge_msg_greptime_response(Prev, New, TrUserData); query_request -> merge_msg_query_request(Prev, New, TrUserData); + insert_requests -> merge_msg_insert_requests(Prev, New, TrUserData); insert_request -> merge_msg_insert_request(Prev, New, TrUserData); + delete_request -> merge_msg_delete_request(Prev, New, TrUserData); ddl_request -> merge_msg_ddl_request(Prev, New, TrUserData); create_table_expr -> merge_msg_create_table_expr(Prev, New, TrUserData); alter_expr -> merge_msg_alter_expr(Prev, New, TrUserData); @@ -4301,9 +4545,10 @@ merge_msg_greptime_request(PMsg, NMsg, TrUserData) -> {_, _} -> S1 end, case {PMsg, NMsg} of - {#{request := {insert, OPFrequest}}, #{request := {insert, ONFrequest}}} -> S2#{request => {insert, merge_msg_insert_request(OPFrequest, ONFrequest, TrUserData)}}; + {#{request := {inserts, OPFrequest}}, #{request := {inserts, ONFrequest}}} -> S2#{request => {inserts, merge_msg_insert_requests(OPFrequest, ONFrequest, TrUserData)}}; {#{request := {query, OPFrequest}}, #{request := {query, ONFrequest}}} -> S2#{request => {query, merge_msg_query_request(OPFrequest, ONFrequest, TrUserData)}}; {#{request := {ddl, OPFrequest}}, #{request := {ddl, ONFrequest}}} -> S2#{request => {ddl, merge_msg_ddl_request(OPFrequest, ONFrequest, TrUserData)}}; + {#{request := {delete, OPFrequest}}, #{request := {delete, ONFrequest}}} -> S2#{request => {delete, merge_msg_delete_request(OPFrequest, ONFrequest, TrUserData)}}; {_, #{request := NFrequest}} -> S2#{request => NFrequest}; {#{request := PFrequest}, _} -> S2#{request => PFrequest}; {_, _} -> S2 @@ -4334,6 +4579,16 @@ merge_msg_query_request(PMsg, NMsg, _) -> _ -> S1 end. +-compile({nowarn_unused_function,merge_msg_insert_requests/3}). +merge_msg_insert_requests(PMsg, NMsg, TrUserData) -> + S1 = #{}, + case {PMsg, NMsg} of + {#{inserts := PFinserts}, #{inserts := NFinserts}} -> S1#{inserts => 'erlang_++'(PFinserts, NFinserts, TrUserData)}; + {_, #{inserts := NFinserts}} -> S1#{inserts => NFinserts}; + {#{inserts := PFinserts}, _} -> S1#{inserts => PFinserts}; + {_, _} -> S1 + end. + -compile({nowarn_unused_function,merge_msg_insert_request/3}). merge_msg_insert_request(PMsg, NMsg, TrUserData) -> S1 = #{}, @@ -4359,6 +4614,31 @@ merge_msg_insert_request(PMsg, NMsg, TrUserData) -> _ -> S4 end. +-compile({nowarn_unused_function,merge_msg_delete_request/3}). +merge_msg_delete_request(PMsg, NMsg, TrUserData) -> + S1 = #{}, + S2 = case {PMsg, NMsg} of + {_, #{table_name := NFtable_name}} -> S1#{table_name => NFtable_name}; + {#{table_name := PFtable_name}, _} -> S1#{table_name => PFtable_name}; + _ -> S1 + end, + S3 = case {PMsg, NMsg} of + {_, #{region_number := NFregion_number}} -> S2#{region_number => NFregion_number}; + {#{region_number := PFregion_number}, _} -> S2#{region_number => PFregion_number}; + _ -> S2 + end, + S4 = case {PMsg, NMsg} of + {#{key_columns := PFkey_columns}, #{key_columns := NFkey_columns}} -> S3#{key_columns => 'erlang_++'(PFkey_columns, NFkey_columns, TrUserData)}; + {_, #{key_columns := NFkey_columns}} -> S3#{key_columns => NFkey_columns}; + {#{key_columns := PFkey_columns}, _} -> S3#{key_columns => PFkey_columns}; + {_, _} -> S3 + end, + case {PMsg, NMsg} of + {_, #{row_count := NFrow_count}} -> S4#{row_count => NFrow_count}; + {#{row_count := PFrow_count}, _} -> S4#{row_count => PFrow_count}; + _ -> S4 + end. + -compile({nowarn_unused_function,merge_msg_ddl_request/3}). merge_msg_ddl_request(PMsg, NMsg, TrUserData) -> S1 = #{}, @@ -4850,7 +5130,9 @@ verify_msg(Msg, MsgName, Opts) -> greptime_request -> v_msg_greptime_request(Msg, [MsgName], TrUserData); greptime_response -> v_msg_greptime_response(Msg, [MsgName], TrUserData); query_request -> v_msg_query_request(Msg, [MsgName], TrUserData); + insert_requests -> v_msg_insert_requests(Msg, [MsgName], TrUserData); insert_request -> v_msg_insert_request(Msg, [MsgName], TrUserData); + delete_request -> v_msg_delete_request(Msg, [MsgName], TrUserData); ddl_request -> v_msg_ddl_request(Msg, [MsgName], TrUserData); create_table_expr -> v_msg_create_table_expr(Msg, [MsgName], TrUserData); alter_expr -> v_msg_alter_expr(Msg, [MsgName], TrUserData); @@ -4885,9 +5167,10 @@ v_msg_greptime_request(#{} = M, Path, TrUserData) -> _ -> ok end, case M of - #{request := {insert, OF2}} -> v_msg_insert_request(OF2, [insert, request | Path], TrUserData); + #{request := {inserts, OF2}} -> v_msg_insert_requests(OF2, [inserts, request | Path], TrUserData); #{request := {query, OF2}} -> v_msg_query_request(OF2, [query, request | Path], TrUserData); #{request := {ddl, OF2}} -> v_msg_ddl_request(OF2, [ddl, request | Path], TrUserData); + #{request := {delete, OF2}} -> v_msg_delete_request(OF2, [delete, request | Path], TrUserData); #{request := F2} -> mk_type_error(invalid_oneof, F2, [request | Path]); _ -> ok end, @@ -4938,6 +5221,26 @@ v_msg_query_request(#{} = M, Path, TrUserData) -> v_msg_query_request(M, Path, _TrUserData) when is_map(M) -> mk_type_error({missing_fields, [] -- maps:keys(M), query_request}, M, Path); v_msg_query_request(X, Path, _TrUserData) -> mk_type_error({expected_msg, query_request}, X, Path). +-compile({nowarn_unused_function,v_msg_insert_requests/3}). +-dialyzer({nowarn_function,v_msg_insert_requests/3}). +v_msg_insert_requests(#{} = M, Path, TrUserData) -> + case M of + #{inserts := F1} -> + if is_list(F1) -> + _ = [v_msg_insert_request(Elem, [inserts | Path], TrUserData) || Elem <- F1], + ok; + true -> mk_type_error({invalid_list_of, {msg, insert_request}}, F1, [inserts | Path]) + end; + _ -> ok + end, + lists:foreach(fun (inserts) -> ok; + (OtherKey) -> mk_type_error({extraneous_key, OtherKey}, M, Path) + end, + maps:keys(M)), + ok; +v_msg_insert_requests(M, Path, _TrUserData) when is_map(M) -> mk_type_error({missing_fields, [] -- maps:keys(M), insert_requests}, M, Path); +v_msg_insert_requests(X, Path, _TrUserData) -> mk_type_error({expected_msg, insert_requests}, X, Path). + -compile({nowarn_unused_function,v_msg_insert_request/3}). -dialyzer({nowarn_function,v_msg_insert_request/3}). v_msg_insert_request(#{} = M, Path, TrUserData) -> @@ -4973,6 +5276,41 @@ v_msg_insert_request(#{} = M, Path, TrUserData) -> v_msg_insert_request(M, Path, _TrUserData) when is_map(M) -> mk_type_error({missing_fields, [] -- maps:keys(M), insert_request}, M, Path); v_msg_insert_request(X, Path, _TrUserData) -> mk_type_error({expected_msg, insert_request}, X, Path). +-compile({nowarn_unused_function,v_msg_delete_request/3}). +-dialyzer({nowarn_function,v_msg_delete_request/3}). +v_msg_delete_request(#{} = M, Path, TrUserData) -> + case M of + #{table_name := F1} -> v_type_string(F1, [table_name | Path], TrUserData); + _ -> ok + end, + case M of + #{region_number := F2} -> v_type_uint32(F2, [region_number | Path], TrUserData); + _ -> ok + end, + case M of + #{key_columns := F3} -> + if is_list(F3) -> + _ = [v_msg_column(Elem, [key_columns | Path], TrUserData) || Elem <- F3], + ok; + true -> mk_type_error({invalid_list_of, {msg, column}}, F3, [key_columns | Path]) + end; + _ -> ok + end, + case M of + #{row_count := F4} -> v_type_uint32(F4, [row_count | Path], TrUserData); + _ -> ok + end, + lists:foreach(fun (table_name) -> ok; + (region_number) -> ok; + (key_columns) -> ok; + (row_count) -> ok; + (OtherKey) -> mk_type_error({extraneous_key, OtherKey}, M, Path) + end, + maps:keys(M)), + ok; +v_msg_delete_request(M, Path, _TrUserData) when is_map(M) -> mk_type_error({missing_fields, [] -- maps:keys(M), delete_request}, M, Path); +v_msg_delete_request(X, Path, _TrUserData) -> mk_type_error({expected_msg, delete_request}, X, Path). + -compile({nowarn_unused_function,v_msg_ddl_request/3}). -dialyzer({nowarn_function,v_msg_ddl_request/3}). v_msg_ddl_request(#{} = M, Path, TrUserData) -> @@ -5883,20 +6221,27 @@ get_msg_defs() -> [#{name => header, fnum => 1, rnum => 2, type => {msg, request_header}, occurrence => defaulty, opts => []}, #{name => request, rnum => 3, fields => - [#{name => insert, fnum => 2, rnum => 3, type => {msg, insert_request}, occurrence => optional, opts => []}, + [#{name => inserts, fnum => 2, rnum => 3, type => {msg, insert_requests}, occurrence => optional, opts => []}, #{name => query, fnum => 3, rnum => 3, type => {msg, query_request}, occurrence => optional, opts => []}, - #{name => ddl, fnum => 4, rnum => 3, type => {msg, ddl_request}, occurrence => optional, opts => []}], + #{name => ddl, fnum => 4, rnum => 3, type => {msg, ddl_request}, occurrence => optional, opts => []}, + #{name => delete, fnum => 5, rnum => 3, type => {msg, delete_request}, occurrence => optional, opts => []}], opts => []}]}, {{msg, greptime_response}, [#{name => header, fnum => 1, rnum => 2, type => {msg, response_header}, occurrence => defaulty, opts => []}, #{name => response, rnum => 3, fields => [#{name => affected_rows, fnum => 2, rnum => 3, type => {msg, affected_rows}, occurrence => optional, opts => []}], opts => []}]}, {{msg, query_request}, [#{name => query, rnum => 2, fields => [#{name => sql, fnum => 1, rnum => 2, type => string, occurrence => optional, opts => []}, #{name => logical_plan, fnum => 2, rnum => 2, type => bytes, occurrence => optional, opts => []}], opts => []}]}, + {{msg, insert_requests}, [#{name => inserts, fnum => 1, rnum => 2, type => {msg, insert_request}, occurrence => repeated, opts => []}]}, {{msg, insert_request}, [#{name => table_name, fnum => 1, rnum => 2, type => string, occurrence => defaulty, opts => []}, #{name => columns, fnum => 3, rnum => 3, type => {msg, column}, occurrence => repeated, opts => []}, #{name => row_count, fnum => 4, rnum => 4, type => uint32, occurrence => defaulty, opts => []}, #{name => region_number, fnum => 5, rnum => 5, type => uint32, occurrence => defaulty, opts => []}]}, + {{msg, delete_request}, + [#{name => table_name, fnum => 1, rnum => 2, type => string, occurrence => defaulty, opts => []}, + #{name => region_number, fnum => 2, rnum => 3, type => uint32, occurrence => defaulty, opts => []}, + #{name => key_columns, fnum => 3, rnum => 4, type => {msg, column}, occurrence => repeated, opts => []}, + #{name => row_count, fnum => 4, rnum => 5, type => uint32, occurrence => defaulty, opts => []}]}, {{msg, ddl_request}, [#{name => expr, rnum => 2, fields => @@ -5995,7 +6340,9 @@ get_msg_names() -> [greptime_request, greptime_response, query_request, + insert_requests, insert_request, + delete_request, ddl_request, create_table_expr, alter_expr, @@ -6027,7 +6374,9 @@ get_msg_or_group_names() -> [greptime_request, greptime_response, query_request, + insert_requests, insert_request, + delete_request, ddl_request, create_table_expr, alter_expr, @@ -6073,20 +6422,27 @@ find_msg_def(greptime_request) -> [#{name => header, fnum => 1, rnum => 2, type => {msg, request_header}, occurrence => defaulty, opts => []}, #{name => request, rnum => 3, fields => - [#{name => insert, fnum => 2, rnum => 3, type => {msg, insert_request}, occurrence => optional, opts => []}, + [#{name => inserts, fnum => 2, rnum => 3, type => {msg, insert_requests}, occurrence => optional, opts => []}, #{name => query, fnum => 3, rnum => 3, type => {msg, query_request}, occurrence => optional, opts => []}, - #{name => ddl, fnum => 4, rnum => 3, type => {msg, ddl_request}, occurrence => optional, opts => []}], + #{name => ddl, fnum => 4, rnum => 3, type => {msg, ddl_request}, occurrence => optional, opts => []}, + #{name => delete, fnum => 5, rnum => 3, type => {msg, delete_request}, occurrence => optional, opts => []}], opts => []}]; find_msg_def(greptime_response) -> [#{name => header, fnum => 1, rnum => 2, type => {msg, response_header}, occurrence => defaulty, opts => []}, #{name => response, rnum => 3, fields => [#{name => affected_rows, fnum => 2, rnum => 3, type => {msg, affected_rows}, occurrence => optional, opts => []}], opts => []}]; find_msg_def(query_request) -> [#{name => query, rnum => 2, fields => [#{name => sql, fnum => 1, rnum => 2, type => string, occurrence => optional, opts => []}, #{name => logical_plan, fnum => 2, rnum => 2, type => bytes, occurrence => optional, opts => []}], opts => []}]; +find_msg_def(insert_requests) -> [#{name => inserts, fnum => 1, rnum => 2, type => {msg, insert_request}, occurrence => repeated, opts => []}]; find_msg_def(insert_request) -> [#{name => table_name, fnum => 1, rnum => 2, type => string, occurrence => defaulty, opts => []}, #{name => columns, fnum => 3, rnum => 3, type => {msg, column}, occurrence => repeated, opts => []}, #{name => row_count, fnum => 4, rnum => 4, type => uint32, occurrence => defaulty, opts => []}, #{name => region_number, fnum => 5, rnum => 5, type => uint32, occurrence => defaulty, opts => []}]; +find_msg_def(delete_request) -> + [#{name => table_name, fnum => 1, rnum => 2, type => string, occurrence => defaulty, opts => []}, + #{name => region_number, fnum => 2, rnum => 3, type => uint32, occurrence => defaulty, opts => []}, + #{name => key_columns, fnum => 3, rnum => 4, type => {msg, column}, occurrence => repeated, opts => []}, + #{name => row_count, fnum => 4, rnum => 5, type => uint32, occurrence => defaulty, opts => []}]; find_msg_def(ddl_request) -> [#{name => expr, rnum => 2, fields => @@ -6326,7 +6682,9 @@ service_and_rpc_name_to_fqbins(S, R) -> error({gpb_error, {badservice_or_rpc, {S fqbin_to_msg_name(<<"greptime.v1.GreptimeRequest">>) -> greptime_request; fqbin_to_msg_name(<<"greptime.v1.GreptimeResponse">>) -> greptime_response; fqbin_to_msg_name(<<"greptime.v1.QueryRequest">>) -> query_request; +fqbin_to_msg_name(<<"greptime.v1.InsertRequests">>) -> insert_requests; fqbin_to_msg_name(<<"greptime.v1.InsertRequest">>) -> insert_request; +fqbin_to_msg_name(<<"greptime.v1.DeleteRequest">>) -> delete_request; fqbin_to_msg_name(<<"greptime.v1.DdlRequest">>) -> ddl_request; fqbin_to_msg_name(<<"greptime.v1.CreateTableExpr">>) -> create_table_expr; fqbin_to_msg_name(<<"greptime.v1.AlterExpr">>) -> alter_expr; @@ -6355,7 +6713,9 @@ fqbin_to_msg_name(E) -> error({gpb_error, {badmsg, E}}). msg_name_to_fqbin(greptime_request) -> <<"greptime.v1.GreptimeRequest">>; msg_name_to_fqbin(greptime_response) -> <<"greptime.v1.GreptimeResponse">>; msg_name_to_fqbin(query_request) -> <<"greptime.v1.QueryRequest">>; +msg_name_to_fqbin(insert_requests) -> <<"greptime.v1.InsertRequests">>; msg_name_to_fqbin(insert_request) -> <<"greptime.v1.InsertRequest">>; +msg_name_to_fqbin(delete_request) -> <<"greptime.v1.DeleteRequest">>; msg_name_to_fqbin(ddl_request) -> <<"greptime.v1.DdlRequest">>; msg_name_to_fqbin(create_table_expr) -> <<"greptime.v1.CreateTableExpr">>; msg_name_to_fqbin(alter_expr) -> <<"greptime.v1.AlterExpr">>; @@ -6418,7 +6778,7 @@ get_all_source_basenames() -> ["database.proto", "ddl.proto", "column.proto", "c get_all_proto_names() -> ["database", "ddl", "column", "common"]. -get_msg_containment("database") -> [greptime_request, greptime_response, insert_request, query_request]; +get_msg_containment("database") -> [delete_request, greptime_request, greptime_response, insert_request, insert_requests, query_request]; get_msg_containment("ddl") -> [add_column, add_columns, alter_expr, create_database_expr, create_table_expr, ddl_request, drop_column, drop_columns, drop_table_expr, flush_table_expr, rename_table, table_id]; get_msg_containment("column") -> [column, values, column_def]; get_msg_containment("common") -> [affected_rows, auth_header, basic, flight_metadata, request_header, response_header, token]; @@ -6467,11 +6827,13 @@ get_proto_by_msg_name_as_fqbin(<<"greptime.v1.AffectedRows">>) -> "common"; get_proto_by_msg_name_as_fqbin(<<"greptime.v1.Column.Values">>) -> "column"; get_proto_by_msg_name_as_fqbin(<<"greptime.v1.DropColumns">>) -> "ddl"; get_proto_by_msg_name_as_fqbin(<<"greptime.v1.AddColumns">>) -> "ddl"; +get_proto_by_msg_name_as_fqbin(<<"greptime.v1.InsertRequests">>) -> "database"; get_proto_by_msg_name_as_fqbin(<<"greptime.v1.TableId">>) -> "ddl"; get_proto_by_msg_name_as_fqbin(<<"greptime.v1.DdlRequest">>) -> "ddl"; get_proto_by_msg_name_as_fqbin(<<"greptime.v1.QueryRequest">>) -> "database"; get_proto_by_msg_name_as_fqbin(<<"greptime.v1.InsertRequest">>) -> "database"; get_proto_by_msg_name_as_fqbin(<<"greptime.v1.GreptimeRequest">>) -> "database"; +get_proto_by_msg_name_as_fqbin(<<"greptime.v1.DeleteRequest">>) -> "database"; get_proto_by_msg_name_as_fqbin(<<"greptime.v1.RenameTable">>) -> "ddl"; get_proto_by_msg_name_as_fqbin(<<"greptime.v1.GreptimeResponse">>) -> "database"; get_proto_by_msg_name_as_fqbin(<<"greptime.v1.ColumnDef">>) -> "column"; diff --git a/src/greptimedb.erl b/src/greptimedb.erl index e7f5c7d..0943c05 100644 --- a/src/greptimedb.erl +++ b/src/greptimedb.erl @@ -14,8 +14,8 @@ -module(greptimedb). --export([start_client/1, stop_client/1, write/3, write_stream/1, is_alive/1, is_alive/2, - ddl/1]). +-export([start_client/1, stop_client/1, write_batch/2, write/3, write_stream/1, + is_alive/1, is_alive/2, ddl/1]). -spec start_client(list()) -> {ok, Client :: map()} | @@ -38,8 +38,10 @@ start_client(Options0) -> {error, Reason} end. --spec write(Client, Metric, Points) -> {ok, term()} | {error, term()} +-spec write_batch(Client, MetricAndPoints) -> {ok, term()} | {error, term()} when Client :: map(), + MetricAndPoints :: [MetricAndPoint], + MetricAndPoint :: {Metric, Points}, Metric :: Table | {DbName, Table}, DbName :: atom() | binary() | list(), Table :: atom() | binary() | list(), @@ -48,17 +50,29 @@ start_client(Options0) -> #{tags => map(), fields => map(), timestamp => integer()}. -write(Client, Metric, Points) -> +write_batch(Client, MetricAndPoints) -> try - Request = greptimedb_encoder:insert_request(Client, Metric, Points), + Request = greptimedb_encoder:insert_requests(Client, MetricAndPoints), handle(Client, Request) catch E:R:S -> - logger:error("[GreptimeDB] write ~0p failed: ~0p ~0p ~0p ~p", - [Metric, Points, E, R, S]), + logger:error("[GreptimeDB] write ~0p failed: ~0p ~0p ~p", [MetricAndPoints, E, R, S]), {error, R} end. +-spec write(Client, Metric, Points) -> {ok, term()} | {error, term()} + when Client :: map(), + Metric :: Table | {DbName, Table}, + DbName :: atom() | binary() | list(), + Table :: atom() | binary() | list(), + Points :: [Point], + Point :: + #{tags => map(), + fields => map(), + timestamp => integer()}. +write(Client, Metric, Points) -> + write_batch(Client, [{Metric, Points}]). + -spec write_stream(Client) -> {ok, term()} | {error, term()} when Client :: map(). write_stream(Client) -> try diff --git a/src/greptimedb_encoder.erl b/src/greptimedb_encoder.erl index a5a6771..b73b66a 100644 --- a/src/greptimedb_encoder.erl +++ b/src/greptimedb_encoder.erl @@ -14,33 +14,43 @@ -module(greptimedb_encoder). --export([insert_request/3]). +-export([insert_requests/2]). -define(TS_COLUMN, <<"greptime_timestamp">>). -define(DEFAULT_DBNAME, "greptime-public"). -insert_request(#{cli_opts := Options} = _Client, {DbName, Table}, Points) -> - RowCount = length(Points), - Columns = - lists:map(fun(Column) -> pad_null_mask(Column, RowCount) end, collect_columns(Points)), +insert_requests(Client, TableAndPoints) -> + insert_requests(Client, TableAndPoints, unknown, []). + +insert_requests(#{cli_opts := Options} = _Client, [], DbName, Inserts) -> AuthHeader = proplists:get_value(auth, Options, {}), Header = case AuthHeader of {} -> #{dbname => DbName}; Scheme -> - #{dbname => DbName, - authorization => #{auth_scheme => Scheme}} + #{dbname => DbName, authorization => #{auth_scheme => Scheme}} end, - - Request = - {insert, - #{table_name => Table, - columns => Columns, - row_count => RowCount}}, - #{header => Header, request => Request}; -insert_request(Client, Table, Points) -> - insert_request(Client, {?DEFAULT_DBNAME, Table}, Points). + #{header => Header, request => {inserts, #{inserts => Inserts}}}; +insert_requests(Client, [{Table, Points} | T], PrevDbName, Inserts) -> + {DbName, Insert} = insert_request(Table, Points), + case PrevDbName of + unknown -> + insert_requests(Client, T, DbName, [Insert | Inserts]); + Name when Name == DbName -> + insert_requests(Client, T, Name, [Insert | Inserts]) + end. + +insert_request({DbName, Table}, Points) -> + RowCount = length(Points), + Columns = + lists:map(fun(Column) -> pad_null_mask(Column, RowCount) end, collect_columns(Points)), + {DbName, + #{table_name => Table, + columns => Columns, + row_count => RowCount}}; +insert_request(Table, Points) -> + insert_request({?DEFAULT_DBNAME, Table}, Points). %%%=================================================================== %%% Internal functions diff --git a/src/greptimedb_stream.erl b/src/greptimedb_stream.erl index a833ddd..5c7bffa 100644 --- a/src/greptimedb_stream.erl +++ b/src/greptimedb_stream.erl @@ -14,7 +14,7 @@ timestamp => integer()}. write(Stream, Metric, Points) -> try - Request = greptimedb_encoder:insert_request(Stream, Metric, Points), + Request = greptimedb_encoder:insert_requests(Stream, [{Metric, Points}]), grpcbox_client:send(Stream, Request) catch E:R:S -> diff --git a/test/greptimedb_SUITE.erl b/test/greptimedb_SUITE.erl index b052f37..5091e43 100644 --- a/test/greptimedb_SUITE.erl +++ b/test/greptimedb_SUITE.erl @@ -6,7 +6,7 @@ -include_lib("eunit/include/eunit.hrl"). all() -> - [t_write, t_write_stream, t_collect_columns]. + [t_write, t_write_stream, t_collect_columns, t_write_batch]. init_per_suite(Config) -> application:ensure_all_started(greptimedb), @@ -49,12 +49,10 @@ t_collect_columns(_) -> Metric = "Test", AuthInfo = {basic, #{username => "test", password => "test"}}, Client = #{cli_opts => [{auth, AuthInfo}]}, - Request = greptimedb_encoder:insert_request(Client, Metric, Points), + Request = greptimedb_encoder:insert_requests(Client, [{Metric, Points}]), case Request of - #{header := - #{dbname := DbName, - authorization := Auth}, - request := {insert, #{columns := Columns}}} -> + #{header := #{dbname := DbName, authorization := Auth}, + request := {inserts, #{inserts := [#{columns := Columns}]}}} -> ?assertEqual(DbName, "greptime-public"), ?assertEqual(8, length(Columns)), ?assertEqual(Auth, #{auth_scheme => AuthInfo}), @@ -140,3 +138,26 @@ t_write_stream(_) -> {ok, #{response := {affected_rows, #{value := 55}}}} = greptimedb_stream:finish(Stream), ok. + +t_write_batch(_) -> + Options = + [{endpoints, [{http, "localhost", 4001}]}, + {pool, greptimedb_client_pool_4}, + {pool_size, 8}, + {pool_type, random}, + {auth, {basic, #{username => <<"greptime_user">>, password => <<"greptime_pwd">>}}}], + + {ok, Client} = greptimedb:start_client(Options), + true = greptimedb:is_alive(Client), + + Metric = <<"temperatures_">>, + MetricAndPoints = + lists:map(fun(N) -> + Points = points(N), + {erlang:iolist_to_binary([Metric, integer_to_binary(N)]), Points} + end, + lists:seq(1, 10)), + + {ok, #{response := {affected_rows, #{value := 55}}}} = + greptimedb:write_batch(Client, MetricAndPoints), + ok.