diff --git a/protos/health.proto b/protos/health.proto new file mode 100644 index 0000000..ca21a0e --- /dev/null +++ b/protos/health.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; + +package greptime.v1; + +option java_package = "io.greptime.v1"; +option java_outer_classname = "Health"; +option go_package = "github.com/GreptimeTeam/greptime-proto/go/greptime/v1"; + + +service HealthCheck { + rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse); +} + +message HealthCheckRequest {} + +message HealthCheckResponse {} diff --git a/src/greptime_v_1_health_check_bhvr.erl b/src/greptime_v_1_health_check_bhvr.erl new file mode 100644 index 0000000..b8573ed --- /dev/null +++ b/src/greptime_v_1_health_check_bhvr.erl @@ -0,0 +1,13 @@ +%%%------------------------------------------------------------------- +%% @doc Behaviour to implement for grpc service greptime.v1.HealthCheck. +%% @end +%%%------------------------------------------------------------------- + +%% this module was generated and should not be modified manually + +-module(greptime_v_1_health_check_bhvr). + +%% Unary RPC +-callback health_check(ctx:t(), health_pb:health_check_request()) -> + {ok, health_pb:health_check_response(), ctx:t()} | grpcbox_stream:grpc_error_response(). + diff --git a/src/greptime_v_1_health_check_client.erl b/src/greptime_v_1_health_check_client.erl new file mode 100644 index 0000000..576eead --- /dev/null +++ b/src/greptime_v_1_health_check_client.erl @@ -0,0 +1,43 @@ +%%%------------------------------------------------------------------- +%% @doc Client module for grpc service greptime.v1.HealthCheck. +%% @end +%%%------------------------------------------------------------------- + +%% this module was generated and should not be modified manually + +-module(greptime_v_1_health_check_client). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("grpcbox/include/grpcbox.hrl"). + +-define(is_ctx(Ctx), is_tuple(Ctx) andalso element(1, Ctx) =:= ctx). + +-define(SERVICE, 'greptime.v1.HealthCheck'). +-define(PROTO_MODULE, 'health_pb'). +-define(MARSHAL_FUN(T), fun(I) -> ?PROTO_MODULE:encode_msg(I, T) end). +-define(UNMARSHAL_FUN(T), fun(I) -> ?PROTO_MODULE:decode_msg(I, T) end). +-define(DEF(Input, Output, MessageType), #grpcbox_def{service=?SERVICE, + message_type=MessageType, + marshal_fun=?MARSHAL_FUN(Input), + unmarshal_fun=?UNMARSHAL_FUN(Output)}). + +%% @doc Unary RPC +-spec health_check(health_pb:health_check_request()) -> + {ok, health_pb:health_check_response(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response() | {error, any()}. +health_check(Input) -> + health_check(ctx:new(), Input, #{}). + +-spec health_check(ctx:t() | health_pb:health_check_request(), health_pb:health_check_request() | grpcbox_client:options()) -> + {ok, health_pb:health_check_response(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response() | {error, any()}. +health_check(Ctx, Input) when ?is_ctx(Ctx) -> + health_check(Ctx, Input, #{}); +health_check(Input, Options) -> + health_check(ctx:new(), Input, Options). + +-spec health_check(ctx:t(), health_pb:health_check_request(), grpcbox_client:options()) -> + {ok, health_pb:health_check_response(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response() | {error, any()}. +health_check(Ctx, Input, Options) -> + grpcbox_client:unary(Ctx, <<"/greptime.v1.HealthCheck/HealthCheck">>, Input, ?DEF(health_check_request, health_check_response, <<"greptime.v1.HealthCheckRequest">>), Options). + diff --git a/src/greptimedb.erl b/src/greptimedb.erl index c3e7b25..e7f5c7d 100644 --- a/src/greptimedb.erl +++ b/src/greptimedb.erl @@ -14,7 +14,8 @@ -module(greptimedb). --export([start_client/1, stop_client/1, write/3, write_stream/1, ddl/1]). +-export([start_client/1, stop_client/1, write/3, write_stream/1, is_alive/1, is_alive/2, + ddl/1]). -spec start_client(list()) -> {ok, Client :: map()} | @@ -37,10 +38,20 @@ start_client(Options0) -> {error, Reason} end. +-spec write(Client, Metric, Points) -> {ok, term()} | {error, term()} + when Client :: map(), + Metric :: Table | {DbName, Table}, + DbName :: atom() | binary() | list(), + Table :: atom() | binary() | list(), + Points :: [Point], + Point :: + #{tags => map(), + fields => map(), + timestamp => integer()}. write(Client, Metric, Points) -> try Request = greptimedb_encoder:insert_request(Client, Metric, Points), - rpc_call(Client, Request) + handle(Client, Request) catch E:R:S -> logger:error("[GreptimeDB] write ~0p failed: ~0p ~0p ~0p ~p", @@ -48,6 +59,7 @@ write(Client, Metric, Points) -> {error, R} end. +-spec write_stream(Client) -> {ok, term()} | {error, term()} when Client :: map(). write_stream(Client) -> try rpc_write_stream(Client) @@ -60,6 +72,26 @@ write_stream(Client) -> ddl(_Client) -> todo. +-spec is_alive(Client :: map()) -> true | false. +is_alive(Client) -> + is_alive(Client, false). + +-spec is_alive(Client :: map(), ReturnReason :: boolean()) -> + true | false | {false, Reason :: term()}. +is_alive(Client, ReturnReason) -> + try + case health_check(Client) of + {ok, _Resp} -> + true; + Return -> + maybe_return_reason(Return, ReturnReason) + end + catch + E:R:S -> + logger:error("[GreptimeDB] health check failed: ~0p ~0p ~p", [E, R, S]), + maybe_return_reason({error, R}, ReturnReason) + end. + -spec stop_client(Client :: map()) -> ok | term(). stop_client(#{pool := Pool}) -> ecpool:stop_sup_pool(Pool). @@ -68,8 +100,8 @@ stop_client(#{pool := Pool}) -> %%% Internal functions %%%=================================================================== -rpc_call(#{pool := Pool} = _Client, Request) -> - Fun = fun(Worker) -> greptimedb_worker:rpc_call(Worker, Request) end, +handle(#{pool := Pool} = _Client, Request) -> + Fun = fun(Worker) -> greptimedb_worker:handle(Worker, Request) end, try ecpool:with_client(Pool, Fun) catch @@ -78,6 +110,16 @@ rpc_call(#{pool := Pool} = _Client, Request) -> {error, {E, R}} end. +health_check(#{pool := Pool} = _Client) -> + Fun = fun(Worker) -> greptimedb_worker:health_check(Worker) end, + try + ecpool:with_client(Pool, Fun) + catch + E:R:S -> + logger:error("[GreptimeDB] grpc health check failed: ~0p ~0p ~0p", [E, R, S]), + {error, {E, R}} + end. + rpc_write_stream(#{pool := Pool, cli_opts := Options} = _Client) -> Fun = fun(Worker) -> case greptimedb_worker:stream(Worker) of @@ -94,3 +136,8 @@ rpc_write_stream(#{pool := Pool, cli_opts := Options} = _Client) -> logger:error("[GreptimeDB] grpc write fail: ~0p ~0p ~0p", [E, R, S]), {error, {E, R}} end. + +maybe_return_reason({error, Reason}, true) -> + {false, Reason}; +maybe_return_reason(_, _) -> + false. diff --git a/src/greptimedb_stream.erl b/src/greptimedb_stream.erl index a4bdef4..a833ddd 100644 --- a/src/greptimedb_stream.erl +++ b/src/greptimedb_stream.erl @@ -2,6 +2,16 @@ -export([write/3, finish/1]). +-spec write(Stream, Metric, Points) -> {ok, term()} | {error, term()} + when Stream :: map(), + Metric :: Table | {DbName, Table}, + DbName :: atom() | binary() | list(), + Table :: atom() | binary() | list(), + Points :: [Point], + Point :: + #{tags => map(), + fields => map(), + timestamp => integer()}. write(Stream, Metric, Points) -> try Request = greptimedb_encoder:insert_request(Stream, Metric, Points), @@ -13,9 +23,11 @@ write(Stream, Metric, Points) -> {error, R} end. +-spec finish(Stream :: map()) -> {ok, term()} | {error, term()}. finish(Stream) -> finish(Stream, 5000). +-spec finish(Stream :: map(), Timeout :: integer()) -> {ok, term()} | {error, term()}. finish(Stream, Timeout) -> try ok = grpcbox_client:close_send(Stream), diff --git a/src/greptimedb_worker.erl b/src/greptimedb_worker.erl index a978021..45931dc 100644 --- a/src/greptimedb_worker.erl +++ b/src/greptimedb_worker.erl @@ -18,7 +18,7 @@ -behavihour(ecpool_worker). --export([rpc_call/2, stream/1, ddl/0]). +-export([handle/2, stream/1, ddl/0, health_check/1]). -export([start_link/1, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). -export([connect/1]). @@ -45,7 +45,15 @@ handle_call({handle, Request}, _From, #state{channel = Channel} = State) -> Err -> {reply, Err, State} end; - +handle_call(health_check, _From, #state{channel = Channel} = State) -> + Request = #{}, + Reply = greptime_v_1_health_check_client:health_check(Request, #{channel => Channel}), + case Reply of + {ok, Resp, _} -> + {reply, {ok, Resp}, State}; + Err -> + {reply, Err, State} + end; handle_call(channel, _From, #state{channel = Channel} = State) -> {reply, {ok, Channel}, State}. @@ -66,9 +74,12 @@ terminate(Reason, #state{channel = Channel} = State) -> %%%=================================================================== %%% Public functions %%%=================================================================== -rpc_call(Pid, Request) -> +handle(Pid, Request) -> gen_server:call(Pid, {handle, Request}). +health_check(Pid) -> + gen_server:call(Pid, health_check). + stream(Pid) -> {ok, Channel} = gen_server:call(Pid, channel), greptime_v_1_greptime_database_client:handle_requests(#{channel => Channel}). diff --git a/src/health_pb.erl b/src/health_pb.erl new file mode 100644 index 0000000..d159454 --- /dev/null +++ b/src/health_pb.erl @@ -0,0 +1,612 @@ +%% -*- coding: utf-8 -*- +%% @private +%% Automatically generated, do not edit +%% Generated by gpb_compile version 4.19.7 +%% Version source: file +-module(health_pb). + +-export([encode_msg/2, encode_msg/3]). +-export([decode_msg/2, decode_msg/3]). +-export([merge_msgs/3, merge_msgs/4]). +-export([verify_msg/2, verify_msg/3]). +-export([get_msg_defs/0]). +-export([get_msg_names/0]). +-export([get_group_names/0]). +-export([get_msg_or_group_names/0]). +-export([get_enum_names/0]). +-export([find_msg_def/1, fetch_msg_def/1]). +-export([find_enum_def/1, fetch_enum_def/1]). +-export([enum_symbol_by_value/2, enum_value_by_symbol/2]). +-export([get_service_names/0]). +-export([get_service_def/1]). +-export([get_rpc_names/1]). +-export([find_rpc_def/2, fetch_rpc_def/2]). +-export([fqbin_to_service_name/1]). +-export([service_name_to_fqbin/1]). +-export([fqbins_to_service_and_rpc_name/2]). +-export([service_and_rpc_name_to_fqbins/2]). +-export([fqbin_to_msg_name/1]). +-export([msg_name_to_fqbin/1]). +-export([fqbin_to_enum_name/1]). +-export([enum_name_to_fqbin/1]). +-export([get_package_name/0]). +-export([uses_packages/0]). +-export([source_basename/0]). +-export([get_all_source_basenames/0]). +-export([get_all_proto_names/0]). +-export([get_msg_containment/1]). +-export([get_pkg_containment/1]). +-export([get_service_containment/1]). +-export([get_rpc_containment/1]). +-export([get_enum_containment/1]). +-export([get_proto_by_msg_name_as_fqbin/1]). +-export([get_proto_by_service_name_as_fqbin/1]). +-export([get_proto_by_enum_name_as_fqbin/1]). +-export([get_protos_by_pkg_name_as_fqbin/1]). +-export([gpb_version_as_string/0, gpb_version_as_list/0]). +-export([gpb_version_source/0]). + + +%% enumerated types + +-export_type([]). + +%% message types +-type health_check_request() :: + #{ + }. + +-type health_check_response() :: + #{ + }. + +-export_type(['health_check_request'/0, 'health_check_response'/0]). +-type '$msg_name'() :: health_check_request | health_check_response. +-type '$msg'() :: health_check_request() | health_check_response(). +-export_type(['$msg_name'/0, '$msg'/0]). + +-if(?OTP_RELEASE >= 24). +-dialyzer({no_underspecs, encode_msg/2}). +-endif. +-spec encode_msg('$msg'(), '$msg_name'()) -> <<>>. +encode_msg(Msg, MsgName) when is_atom(MsgName) -> encode_msg(Msg, MsgName, []). + +-if(?OTP_RELEASE >= 24). +-dialyzer({no_underspecs, encode_msg/3}). +-endif. +-spec encode_msg('$msg'(), '$msg_name'(), list()) -> <<>>. +encode_msg(Msg, MsgName, Opts) -> + case proplists:get_bool(verify, Opts) of + true -> verify_msg(Msg, MsgName, Opts); + false -> ok + end, + TrUserData = proplists:get_value(user_data, Opts), + case MsgName of + health_check_request -> encode_msg_health_check_request(id(Msg, TrUserData), TrUserData); + health_check_response -> encode_msg_health_check_response(id(Msg, TrUserData), TrUserData) + end. + + +encode_msg_health_check_request(_Msg, _TrUserData) -> <<>>. + +encode_msg_health_check_response(_Msg, _TrUserData) -> <<>>. + +-compile({nowarn_unused_function,e_type_sint/3}). +e_type_sint(Value, Bin, _TrUserData) when Value >= 0 -> e_varint(Value * 2, Bin); +e_type_sint(Value, Bin, _TrUserData) -> e_varint(Value * -2 - 1, Bin). + +-compile({nowarn_unused_function,e_type_int32/3}). +e_type_int32(Value, Bin, _TrUserData) when 0 =< Value, Value =< 127 -> <>; +e_type_int32(Value, Bin, _TrUserData) -> + <> = <>, + e_varint(N, Bin). + +-compile({nowarn_unused_function,e_type_int64/3}). +e_type_int64(Value, Bin, _TrUserData) when 0 =< Value, Value =< 127 -> <>; +e_type_int64(Value, Bin, _TrUserData) -> + <> = <>, + e_varint(N, Bin). + +-compile({nowarn_unused_function,e_type_bool/3}). +e_type_bool(true, Bin, _TrUserData) -> <>; +e_type_bool(false, Bin, _TrUserData) -> <>; +e_type_bool(1, Bin, _TrUserData) -> <>; +e_type_bool(0, Bin, _TrUserData) -> <>. + +-compile({nowarn_unused_function,e_type_string/3}). +e_type_string(S, Bin, _TrUserData) -> + Utf8 = unicode:characters_to_binary(S), + Bin2 = e_varint(byte_size(Utf8), Bin), + <>. + +-compile({nowarn_unused_function,e_type_bytes/3}). +e_type_bytes(Bytes, Bin, _TrUserData) when is_binary(Bytes) -> + Bin2 = e_varint(byte_size(Bytes), Bin), + <>; +e_type_bytes(Bytes, Bin, _TrUserData) when is_list(Bytes) -> + BytesBin = iolist_to_binary(Bytes), + Bin2 = e_varint(byte_size(BytesBin), Bin), + <>. + +-compile({nowarn_unused_function,e_type_fixed32/3}). +e_type_fixed32(Value, Bin, _TrUserData) -> <>. + +-compile({nowarn_unused_function,e_type_sfixed32/3}). +e_type_sfixed32(Value, Bin, _TrUserData) -> <>. + +-compile({nowarn_unused_function,e_type_fixed64/3}). +e_type_fixed64(Value, Bin, _TrUserData) -> <>. + +-compile({nowarn_unused_function,e_type_sfixed64/3}). +e_type_sfixed64(Value, Bin, _TrUserData) -> <>. + +-compile({nowarn_unused_function,e_type_float/3}). +e_type_float(V, Bin, _) when is_number(V) -> <>; +e_type_float(infinity, Bin, _) -> <>; +e_type_float('-infinity', Bin, _) -> <>; +e_type_float(nan, Bin, _) -> <>. + +-compile({nowarn_unused_function,e_type_double/3}). +e_type_double(V, Bin, _) when is_number(V) -> <>; +e_type_double(infinity, Bin, _) -> <>; +e_type_double('-infinity', Bin, _) -> <>; +e_type_double(nan, Bin, _) -> <>. + +-compile({nowarn_unused_function,e_unknown_elems/2}). +e_unknown_elems([Elem | Rest], Bin) -> + BinR = case Elem of + {varint, FNum, N} -> + BinF = e_varint(FNum bsl 3, Bin), + e_varint(N, BinF); + {length_delimited, FNum, Data} -> + BinF = e_varint(FNum bsl 3 bor 2, Bin), + BinL = e_varint(byte_size(Data), BinF), + <>; + {group, FNum, GroupFields} -> + Bin1 = e_varint(FNum bsl 3 bor 3, Bin), + Bin2 = e_unknown_elems(GroupFields, Bin1), + e_varint(FNum bsl 3 bor 4, Bin2); + {fixed32, FNum, V} -> + BinF = e_varint(FNum bsl 3 bor 5, Bin), + <>; + {fixed64, FNum, V} -> + BinF = e_varint(FNum bsl 3 bor 1, Bin), + <> + end, + e_unknown_elems(Rest, BinR); +e_unknown_elems([], Bin) -> Bin. + +-compile({nowarn_unused_function,e_varint/3}). +e_varint(N, Bin, _TrUserData) -> e_varint(N, Bin). + +-compile({nowarn_unused_function,e_varint/2}). +e_varint(N, Bin) when N =< 127 -> <>; +e_varint(N, Bin) -> + Bin2 = <>, + e_varint(N bsr 7, Bin2). + + +decode_msg(Bin, MsgName) when is_binary(Bin) -> decode_msg(Bin, MsgName, []). + +decode_msg(Bin, MsgName, Opts) when is_binary(Bin) -> + TrUserData = proplists:get_value(user_data, Opts), + decode_msg_1_catch(Bin, MsgName, TrUserData). + +-ifdef('OTP_RELEASE'). +decode_msg_1_catch(Bin, MsgName, TrUserData) -> + try decode_msg_2_doit(MsgName, Bin, TrUserData) + catch + error:{gpb_error,_}=Reason:StackTrace -> + erlang:raise(error, Reason, StackTrace); + Class:Reason:StackTrace -> error({gpb_error,{decoding_failure, {Bin, MsgName, {Class, Reason, StackTrace}}}}) + end. +-else. +decode_msg_1_catch(Bin, MsgName, TrUserData) -> + try decode_msg_2_doit(MsgName, Bin, TrUserData) + catch + error:{gpb_error,_}=Reason -> + erlang:raise(error, Reason, + erlang:get_stacktrace()); + Class:Reason -> + StackTrace = erlang:get_stacktrace(), + error({gpb_error,{decoding_failure, {Bin, MsgName, {Class, Reason, StackTrace}}}}) + end. +-endif. + +decode_msg_2_doit(health_check_request, Bin, TrUserData) -> id(decode_msg_health_check_request(Bin, TrUserData), TrUserData); +decode_msg_2_doit(health_check_response, Bin, TrUserData) -> id(decode_msg_health_check_response(Bin, TrUserData), TrUserData). + + + +decode_msg_health_check_request(Bin, TrUserData) -> dfp_read_field_def_health_check_request(Bin, 0, 0, 0, TrUserData). + +dfp_read_field_def_health_check_request(<<>>, 0, 0, _, _) -> #{}; +dfp_read_field_def_health_check_request(Other, Z1, Z2, F, TrUserData) -> dg_read_field_def_health_check_request(Other, Z1, Z2, F, TrUserData). + +dg_read_field_def_health_check_request(<<1:1, X:7, Rest/binary>>, N, Acc, F, TrUserData) when N < 32 - 7 -> dg_read_field_def_health_check_request(Rest, N + 7, X bsl N + Acc, F, TrUserData); +dg_read_field_def_health_check_request(<<0:1, X:7, Rest/binary>>, N, Acc, _, TrUserData) -> + Key = X bsl N + Acc, + case Key band 7 of + 0 -> skip_varint_health_check_request(Rest, 0, 0, Key bsr 3, TrUserData); + 1 -> skip_64_health_check_request(Rest, 0, 0, Key bsr 3, TrUserData); + 2 -> skip_length_delimited_health_check_request(Rest, 0, 0, Key bsr 3, TrUserData); + 3 -> skip_group_health_check_request(Rest, 0, 0, Key bsr 3, TrUserData); + 5 -> skip_32_health_check_request(Rest, 0, 0, Key bsr 3, TrUserData) + end; +dg_read_field_def_health_check_request(<<>>, 0, 0, _, _) -> #{}. + +skip_varint_health_check_request(<<1:1, _:7, Rest/binary>>, Z1, Z2, F, TrUserData) -> skip_varint_health_check_request(Rest, Z1, Z2, F, TrUserData); +skip_varint_health_check_request(<<0:1, _:7, Rest/binary>>, Z1, Z2, F, TrUserData) -> dfp_read_field_def_health_check_request(Rest, Z1, Z2, F, TrUserData). + +skip_length_delimited_health_check_request(<<1:1, X:7, Rest/binary>>, N, Acc, F, TrUserData) when N < 57 -> skip_length_delimited_health_check_request(Rest, N + 7, X bsl N + Acc, F, TrUserData); +skip_length_delimited_health_check_request(<<0:1, X:7, Rest/binary>>, N, Acc, F, TrUserData) -> + Length = X bsl N + Acc, + <<_:Length/binary, Rest2/binary>> = Rest, + dfp_read_field_def_health_check_request(Rest2, 0, 0, F, TrUserData). + +skip_group_health_check_request(Bin, _, Z2, FNum, TrUserData) -> + {_, Rest} = read_group(Bin, FNum), + dfp_read_field_def_health_check_request(Rest, 0, Z2, FNum, TrUserData). + +skip_32_health_check_request(<<_:32, Rest/binary>>, Z1, Z2, F, TrUserData) -> dfp_read_field_def_health_check_request(Rest, Z1, Z2, F, TrUserData). + +skip_64_health_check_request(<<_:64, Rest/binary>>, Z1, Z2, F, TrUserData) -> dfp_read_field_def_health_check_request(Rest, Z1, Z2, F, TrUserData). + +decode_msg_health_check_response(Bin, TrUserData) -> dfp_read_field_def_health_check_response(Bin, 0, 0, 0, TrUserData). + +dfp_read_field_def_health_check_response(<<>>, 0, 0, _, _) -> #{}; +dfp_read_field_def_health_check_response(Other, Z1, Z2, F, TrUserData) -> dg_read_field_def_health_check_response(Other, Z1, Z2, F, TrUserData). + +dg_read_field_def_health_check_response(<<1:1, X:7, Rest/binary>>, N, Acc, F, TrUserData) when N < 32 - 7 -> dg_read_field_def_health_check_response(Rest, N + 7, X bsl N + Acc, F, TrUserData); +dg_read_field_def_health_check_response(<<0:1, X:7, Rest/binary>>, N, Acc, _, TrUserData) -> + Key = X bsl N + Acc, + case Key band 7 of + 0 -> skip_varint_health_check_response(Rest, 0, 0, Key bsr 3, TrUserData); + 1 -> skip_64_health_check_response(Rest, 0, 0, Key bsr 3, TrUserData); + 2 -> skip_length_delimited_health_check_response(Rest, 0, 0, Key bsr 3, TrUserData); + 3 -> skip_group_health_check_response(Rest, 0, 0, Key bsr 3, TrUserData); + 5 -> skip_32_health_check_response(Rest, 0, 0, Key bsr 3, TrUserData) + end; +dg_read_field_def_health_check_response(<<>>, 0, 0, _, _) -> #{}. + +skip_varint_health_check_response(<<1:1, _:7, Rest/binary>>, Z1, Z2, F, TrUserData) -> skip_varint_health_check_response(Rest, Z1, Z2, F, TrUserData); +skip_varint_health_check_response(<<0:1, _:7, Rest/binary>>, Z1, Z2, F, TrUserData) -> dfp_read_field_def_health_check_response(Rest, Z1, Z2, F, TrUserData). + +skip_length_delimited_health_check_response(<<1:1, X:7, Rest/binary>>, N, Acc, F, TrUserData) when N < 57 -> skip_length_delimited_health_check_response(Rest, N + 7, X bsl N + Acc, F, TrUserData); +skip_length_delimited_health_check_response(<<0:1, X:7, Rest/binary>>, N, Acc, F, TrUserData) -> + Length = X bsl N + Acc, + <<_:Length/binary, Rest2/binary>> = Rest, + dfp_read_field_def_health_check_response(Rest2, 0, 0, F, TrUserData). + +skip_group_health_check_response(Bin, _, Z2, FNum, TrUserData) -> + {_, Rest} = read_group(Bin, FNum), + dfp_read_field_def_health_check_response(Rest, 0, Z2, FNum, TrUserData). + +skip_32_health_check_response(<<_:32, Rest/binary>>, Z1, Z2, F, TrUserData) -> dfp_read_field_def_health_check_response(Rest, Z1, Z2, F, TrUserData). + +skip_64_health_check_response(<<_:64, Rest/binary>>, Z1, Z2, F, TrUserData) -> dfp_read_field_def_health_check_response(Rest, Z1, Z2, F, TrUserData). + +read_group(Bin, FieldNum) -> + {NumBytes, EndTagLen} = read_gr_b(Bin, 0, 0, 0, 0, FieldNum), + <> = Bin, + {Group, Rest}. + +%% Like skipping over fields, but record the total length, +%% Each field is <(FieldNum bsl 3) bor FieldType> ++ +%% Record the length because varints may be non-optimally encoded. +%% +%% Groups can be nested, but assume the same FieldNum cannot be nested +%% because group field numbers are shared with the rest of the fields +%% numbers. Thus we can search just for an group-end with the same +%% field number. +%% +%% (The only time the same group field number could occur would +%% be in a nested sub message, but then it would be inside a +%% length-delimited entry, which we skip-read by length.) +read_gr_b(<<1:1, X:7, Tl/binary>>, N, Acc, NumBytes, TagLen, FieldNum) + when N < (32-7) -> + read_gr_b(Tl, N+7, X bsl N + Acc, NumBytes, TagLen+1, FieldNum); +read_gr_b(<<0:1, X:7, Tl/binary>>, N, Acc, NumBytes, TagLen, + FieldNum) -> + Key = X bsl N + Acc, + TagLen1 = TagLen + 1, + case {Key bsr 3, Key band 7} of + {FieldNum, 4} -> % 4 = group_end + {NumBytes, TagLen1}; + {_, 0} -> % 0 = varint + read_gr_vi(Tl, 0, NumBytes + TagLen1, FieldNum); + {_, 1} -> % 1 = bits64 + <<_:64, Tl2/binary>> = Tl, + read_gr_b(Tl2, 0, 0, NumBytes + TagLen1 + 8, 0, FieldNum); + {_, 2} -> % 2 = length_delimited + read_gr_ld(Tl, 0, 0, NumBytes + TagLen1, FieldNum); + {_, 3} -> % 3 = group_start + read_gr_b(Tl, 0, 0, NumBytes + TagLen1, 0, FieldNum); + {_, 4} -> % 4 = group_end + read_gr_b(Tl, 0, 0, NumBytes + TagLen1, 0, FieldNum); + {_, 5} -> % 5 = bits32 + <<_:32, Tl2/binary>> = Tl, + read_gr_b(Tl2, 0, 0, NumBytes + TagLen1 + 4, 0, FieldNum) + end. + +read_gr_vi(<<1:1, _:7, Tl/binary>>, N, NumBytes, FieldNum) + when N < (64-7) -> + read_gr_vi(Tl, N+7, NumBytes+1, FieldNum); +read_gr_vi(<<0:1, _:7, Tl/binary>>, _, NumBytes, FieldNum) -> + read_gr_b(Tl, 0, 0, NumBytes+1, 0, FieldNum). + +read_gr_ld(<<1:1, X:7, Tl/binary>>, N, Acc, NumBytes, FieldNum) + when N < (64-7) -> + read_gr_ld(Tl, N+7, X bsl N + Acc, NumBytes+1, FieldNum); +read_gr_ld(<<0:1, X:7, Tl/binary>>, N, Acc, NumBytes, FieldNum) -> + Len = X bsl N + Acc, + NumBytes1 = NumBytes + 1, + <<_:Len/binary, Tl2/binary>> = Tl, + read_gr_b(Tl2, 0, 0, NumBytes1 + Len, 0, FieldNum). + +merge_msgs(Prev, New, MsgName) when is_atom(MsgName) -> merge_msgs(Prev, New, MsgName, []). + +merge_msgs(Prev, New, MsgName, Opts) -> + TrUserData = proplists:get_value(user_data, Opts), + case MsgName of + health_check_request -> merge_msg_health_check_request(Prev, New, TrUserData); + health_check_response -> merge_msg_health_check_response(Prev, New, TrUserData) + end. + +-compile({nowarn_unused_function,merge_msg_health_check_request/3}). +merge_msg_health_check_request(_Prev, New, _TrUserData) -> New. + +-compile({nowarn_unused_function,merge_msg_health_check_response/3}). +merge_msg_health_check_response(_Prev, New, _TrUserData) -> New. + + +verify_msg(Msg, MsgName) when is_atom(MsgName) -> verify_msg(Msg, MsgName, []). + +verify_msg(Msg, MsgName, Opts) -> + TrUserData = proplists:get_value(user_data, Opts), + case MsgName of + health_check_request -> v_msg_health_check_request(Msg, [MsgName], TrUserData); + health_check_response -> v_msg_health_check_response(Msg, [MsgName], TrUserData); + _ -> mk_type_error(not_a_known_message, Msg, []) + end. + + +-compile({nowarn_unused_function,v_msg_health_check_request/3}). +-dialyzer({nowarn_function,v_msg_health_check_request/3}). +v_msg_health_check_request(#{} = M, Path, _) -> + lists:foreach(fun (OtherKey) -> mk_type_error({extraneous_key, OtherKey}, M, Path) end, maps:keys(M)), + ok; +v_msg_health_check_request(M, Path, _TrUserData) when is_map(M) -> mk_type_error({missing_fields, [] -- maps:keys(M), health_check_request}, M, Path); +v_msg_health_check_request(X, Path, _TrUserData) -> mk_type_error({expected_msg, health_check_request}, X, Path). + +-compile({nowarn_unused_function,v_msg_health_check_response/3}). +-dialyzer({nowarn_function,v_msg_health_check_response/3}). +v_msg_health_check_response(#{} = M, Path, _) -> + lists:foreach(fun (OtherKey) -> mk_type_error({extraneous_key, OtherKey}, M, Path) end, maps:keys(M)), + ok; +v_msg_health_check_response(M, Path, _TrUserData) when is_map(M) -> mk_type_error({missing_fields, [] -- maps:keys(M), health_check_response}, M, Path); +v_msg_health_check_response(X, Path, _TrUserData) -> mk_type_error({expected_msg, health_check_response}, X, Path). + +-compile({nowarn_unused_function,mk_type_error/3}). +-spec mk_type_error(_, _, list()) -> no_return(). +mk_type_error(Error, ValueSeen, Path) -> + Path2 = prettify_path(Path), + erlang:error({gpb_type_error, {Error, [{value, ValueSeen}, {path, Path2}]}}). + + +-compile({nowarn_unused_function,prettify_path/1}). +-dialyzer({nowarn_function,prettify_path/1}). +prettify_path([]) -> top_level; +prettify_path(PathR) -> lists:append(lists:join(".", lists:map(fun atom_to_list/1, lists:reverse(PathR)))). + + +-compile({nowarn_unused_function,id/2}). +-compile({inline,id/2}). +id(X, _TrUserData) -> X. + +-compile({nowarn_unused_function,v_ok/3}). +-compile({inline,v_ok/3}). +v_ok(_Value, _Path, _TrUserData) -> ok. + +-compile({nowarn_unused_function,m_overwrite/3}). +-compile({inline,m_overwrite/3}). +m_overwrite(_Prev, New, _TrUserData) -> New. + +-compile({nowarn_unused_function,cons/3}). +-compile({inline,cons/3}). +cons(Elem, Acc, _TrUserData) -> [Elem | Acc]. + +-compile({nowarn_unused_function,lists_reverse/2}). +-compile({inline,lists_reverse/2}). +'lists_reverse'(L, _TrUserData) -> lists:reverse(L). +-compile({nowarn_unused_function,'erlang_++'/3}). +-compile({inline,'erlang_++'/3}). +'erlang_++'(A, B, _TrUserData) -> A ++ B. + + +get_msg_defs() -> [{{msg, health_check_request}, []}, {{msg, health_check_response}, []}]. + + +get_msg_names() -> [health_check_request, health_check_response]. + + +get_group_names() -> []. + + +get_msg_or_group_names() -> [health_check_request, health_check_response]. + + +get_enum_names() -> []. + + +fetch_msg_def(MsgName) -> + case find_msg_def(MsgName) of + Fs when is_list(Fs) -> Fs; + error -> erlang:error({no_such_msg, MsgName}) + end. + + +-spec fetch_enum_def(_) -> no_return(). +fetch_enum_def(EnumName) -> erlang:error({no_such_enum, EnumName}). + + +find_msg_def(health_check_request) -> []; +find_msg_def(health_check_response) -> []; +find_msg_def(_) -> error. + + +find_enum_def(_) -> error. + + +-spec enum_symbol_by_value(_, _) -> no_return(). +enum_symbol_by_value(E, V) -> erlang:error({no_enum_defs, E, V}). + + +-spec enum_value_by_symbol(_, _) -> no_return(). +enum_value_by_symbol(E, V) -> erlang:error({no_enum_defs, E, V}). + + + +get_service_names() -> ['greptime.v1.HealthCheck']. + + +get_service_def('greptime.v1.HealthCheck') -> {{service, 'greptime.v1.HealthCheck'}, [#{name => 'HealthCheck', input => health_check_request, output => health_check_response, input_stream => false, output_stream => false, opts => []}]}; +get_service_def(_) -> error. + + +get_rpc_names('greptime.v1.HealthCheck') -> ['HealthCheck']; +get_rpc_names(_) -> error. + + +find_rpc_def('greptime.v1.HealthCheck', RpcName) -> 'find_rpc_def_greptime.v1.HealthCheck'(RpcName); +find_rpc_def(_, _) -> error. + + +'find_rpc_def_greptime.v1.HealthCheck'('HealthCheck') -> #{name => 'HealthCheck', input => health_check_request, output => health_check_response, input_stream => false, output_stream => false, opts => []}; +'find_rpc_def_greptime.v1.HealthCheck'(_) -> error. + + +fetch_rpc_def(ServiceName, RpcName) -> + case find_rpc_def(ServiceName, RpcName) of + Def when is_map(Def) -> Def; + error -> erlang:error({no_such_rpc, ServiceName, RpcName}) + end. + + +%% Convert a a fully qualified (ie with package name) service name +%% as a binary to a service name as an atom. +fqbin_to_service_name(<<"greptime.v1.HealthCheck">>) -> 'greptime.v1.HealthCheck'; +fqbin_to_service_name(X) -> error({gpb_error, {badservice, X}}). + + +%% Convert a service name as an atom to a fully qualified +%% (ie with package name) name as a binary. +service_name_to_fqbin('greptime.v1.HealthCheck') -> <<"greptime.v1.HealthCheck">>; +service_name_to_fqbin(X) -> error({gpb_error, {badservice, X}}). + + +%% Convert a a fully qualified (ie with package name) service name +%% and an rpc name, both as binaries to a service name and an rpc +%% name, as atoms. +fqbins_to_service_and_rpc_name(<<"greptime.v1.HealthCheck">>, <<"HealthCheck">>) -> {'greptime.v1.HealthCheck', 'HealthCheck'}; +fqbins_to_service_and_rpc_name(S, R) -> error({gpb_error, {badservice_or_rpc, {S, R}}}). + + +%% Convert a service name and an rpc name, both as atoms, +%% to a fully qualified (ie with package name) service name and +%% an rpc name as binaries. +service_and_rpc_name_to_fqbins('greptime.v1.HealthCheck', 'HealthCheck') -> {<<"greptime.v1.HealthCheck">>, <<"HealthCheck">>}; +service_and_rpc_name_to_fqbins(S, R) -> error({gpb_error, {badservice_or_rpc, {S, R}}}). + + +fqbin_to_msg_name(<<"greptime.v1.HealthCheckRequest">>) -> health_check_request; +fqbin_to_msg_name(<<"greptime.v1.HealthCheckResponse">>) -> health_check_response; +fqbin_to_msg_name(E) -> error({gpb_error, {badmsg, E}}). + + +msg_name_to_fqbin(health_check_request) -> <<"greptime.v1.HealthCheckRequest">>; +msg_name_to_fqbin(health_check_response) -> <<"greptime.v1.HealthCheckResponse">>; +msg_name_to_fqbin(E) -> error({gpb_error, {badmsg, E}}). + + +-spec fqbin_to_enum_name(_) -> no_return(). +fqbin_to_enum_name(E) -> error({gpb_error, {badenum, E}}). + + +-spec enum_name_to_fqbin(_) -> no_return(). +enum_name_to_fqbin(E) -> error({gpb_error, {badenum, E}}). + + +get_package_name() -> 'greptime.v1'. + + +%% Whether or not the message names +%% are prepended with package name or not. +uses_packages() -> true. + + +source_basename() -> "health.proto". + + +%% Retrieve all proto file names, also imported ones. +%% The order is top-down. The first element is always the main +%% source file. The files are returned with extension, +%% see get_all_proto_names/0 for a version that returns +%% the basenames sans extension +get_all_source_basenames() -> ["health.proto"]. + + +%% Retrieve all proto file names, also imported ones. +%% The order is top-down. The first element is always the main +%% source file. The files are returned sans .proto extension, +%% to make it easier to use them with the various get_xyz_containment +%% functions. +get_all_proto_names() -> ["health"]. + + +get_msg_containment("health") -> [health_check_request, health_check_response]; +get_msg_containment(P) -> error({gpb_error, {badproto, P}}). + + +get_pkg_containment("health") -> 'greptime.v1'; +get_pkg_containment(P) -> error({gpb_error, {badproto, P}}). + + +get_service_containment("health") -> ['greptime.v1.HealthCheck']; +get_service_containment(P) -> error({gpb_error, {badproto, P}}). + + +get_rpc_containment("health") -> [{'greptime.v1.HealthCheck', 'HealthCheck'}]; +get_rpc_containment(P) -> error({gpb_error, {badproto, P}}). + + +get_enum_containment("health") -> []; +get_enum_containment(P) -> error({gpb_error, {badproto, P}}). + + +get_proto_by_msg_name_as_fqbin(<<"greptime.v1.HealthCheckRequest">>) -> "health"; +get_proto_by_msg_name_as_fqbin(<<"greptime.v1.HealthCheckResponse">>) -> "health"; +get_proto_by_msg_name_as_fqbin(E) -> error({gpb_error, {badmsg, E}}). + + +get_proto_by_service_name_as_fqbin(<<"greptime.v1.HealthCheck">>) -> "health"; +get_proto_by_service_name_as_fqbin(E) -> error({gpb_error, {badservice, E}}). + + +-spec get_proto_by_enum_name_as_fqbin(_) -> no_return(). +get_proto_by_enum_name_as_fqbin(E) -> error({gpb_error, {badenum, E}}). + + +get_protos_by_pkg_name_as_fqbin(<<"greptime.v1">>) -> ["health"]; +get_protos_by_pkg_name_as_fqbin(E) -> error({gpb_error, {badpkg, E}}). + + + +gpb_version_as_string() -> + "4.19.7". + +gpb_version_as_list() -> + [4,19,7]. + +gpb_version_source() -> + "file". diff --git a/test/greptimedb_SUITE.erl b/test/greptimedb_SUITE.erl index 22a443e..b052f37 100644 --- a/test/greptimedb_SUITE.erl +++ b/test/greptimedb_SUITE.erl @@ -114,6 +114,7 @@ t_write(_) -> {auth, {basic, #{username => <<"greptime_user">>, password => <<"greptime_pwd">>}}}], {ok, Client} = greptimedb:start_client(Options), + true = greptimedb:is_alive(Client), {ok, #{response := {affected_rows, #{value := 2}}}} = greptimedb:write(Client, Metric, Points), ok. @@ -127,6 +128,7 @@ t_write_stream(_) -> {auth, {basic, #{username => <<"greptime_user">>, password => <<"greptime_pwd">>}}}], {ok, Client} = greptimedb:start_client(Options), + true = greptimedb:is_alive(Client), {ok, Stream} = greptimedb:write_stream(Client), Metric = <<"temperatures_stream">>,