diff --git a/src/greptime_v_1_greptime_database_bhvr.erl b/src/greptime_v_1_greptime_database_bhvr.erl index 8e09006..bf2858e 100644 --- a/src/greptime_v_1_greptime_database_bhvr.erl +++ b/src/greptime_v_1_greptime_database_bhvr.erl @@ -8,10 +8,9 @@ -module(greptime_v_1_greptime_database_bhvr). %% Unary RPC --callback handle(ctx:t(), database_pb:greptime_request()) -> - {ok, database_pb:greptime_response(), ctx:t()} | grpcbox_stream:grpc_error_response(). +-callback handle(ctx:t(), greptimedb_database_pb:greptime_request()) -> + {ok, greptimedb_database_pb:greptime_response(), ctx:t()} | grpcbox_stream:grpc_error_response(). -%% +%% -callback handle_requests(reference(), grpcbox_stream:t()) -> - {ok, database_pb:greptime_response(), ctx:t()} | grpcbox_stream:grpc_error_response(). - + {ok, greptimedb_database_pb:greptime_response(), ctx:t()} | grpcbox_stream:grpc_error_response(). diff --git a/src/greptime_v_1_greptime_database_client.erl b/src/greptime_v_1_greptime_database_client.erl index b3bb5ce..be12789 100644 --- a/src/greptime_v_1_greptime_database_client.erl +++ b/src/greptime_v_1_greptime_database_client.erl @@ -15,7 +15,7 @@ -define(is_ctx(Ctx), is_tuple(Ctx) andalso element(1, Ctx) =:= ctx). -define(SERVICE, 'greptime.v1.GreptimeDatabase'). --define(PROTO_MODULE, 'database_pb'). +-define(PROTO_MODULE, 'greptimedb_database_pb'). -define(MARSHAL_FUN(T), fun(I) -> ?PROTO_MODULE:encode_msg(I, T) end). -define(UNMARSHAL_FUN(T), fun(I) -> ?PROTO_MODULE:decode_msg(I, T) end). -define(DEF(Input, Output, MessageType), #grpcbox_def{service=?SERVICE, @@ -24,24 +24,24 @@ unmarshal_fun=?UNMARSHAL_FUN(Output)}). %% @doc Unary RPC --spec handle(database_pb:greptime_request()) -> - {ok, database_pb:greptime_response(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response() | {error, any()}. +-spec handle(greptimedb_database_pb:greptime_request()) -> + {ok, greptimedb_database_pb:greptime_response(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response() | {error, any()}. handle(Input) -> handle(ctx:new(), Input, #{}). --spec handle(ctx:t() | database_pb:greptime_request(), database_pb:greptime_request() | grpcbox_client:options()) -> - {ok, database_pb:greptime_response(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response() | {error, any()}. +-spec handle(ctx:t() | greptimedb_database_pb:greptime_request(), greptimedb_database_pb:greptime_request() | grpcbox_client:options()) -> + {ok, greptimedb_database_pb:greptime_response(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response() | {error, any()}. handle(Ctx, Input) when ?is_ctx(Ctx) -> handle(Ctx, Input, #{}); handle(Input, Options) -> handle(ctx:new(), Input, Options). --spec handle(ctx:t(), database_pb:greptime_request(), grpcbox_client:options()) -> - {ok, database_pb:greptime_response(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response() | {error, any()}. +-spec handle(ctx:t(), greptimedb_database_pb:greptime_request(), grpcbox_client:options()) -> + {ok, greptimedb_database_pb:greptime_response(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response() | {error, any()}. handle(Ctx, Input, Options) -> grpcbox_client:unary(Ctx, <<"/greptime.v1.GreptimeDatabase/Handle">>, Input, ?DEF(greptime_request, greptime_response, <<"greptime.v1.GreptimeRequest">>), Options). -%% @doc +%% @doc -spec handle_requests() -> {ok, grpcbox_client:stream()} | grpcbox_stream:grpc_error_response() | {error, any()}. handle_requests() -> @@ -58,4 +58,3 @@ handle_requests(Options) -> {ok, grpcbox_client:stream()} | grpcbox_stream:grpc_error_response() | {error, any()}. handle_requests(Ctx, Options) -> grpcbox_client:stream(Ctx, <<"/greptime.v1.GreptimeDatabase/HandleRequests">>, ?DEF(greptime_request, greptime_response, <<"greptime.v1.GreptimeRequest">>), Options). - diff --git a/src/greptime_v_1_health_check_bhvr.erl b/src/greptime_v_1_health_check_bhvr.erl index b8573ed..2553b0e 100644 --- a/src/greptime_v_1_health_check_bhvr.erl +++ b/src/greptime_v_1_health_check_bhvr.erl @@ -8,6 +8,5 @@ -module(greptime_v_1_health_check_bhvr). %% Unary RPC --callback health_check(ctx:t(), health_pb:health_check_request()) -> - {ok, health_pb:health_check_response(), ctx:t()} | grpcbox_stream:grpc_error_response(). - +-callback health_check(ctx:t(), greptimedb_health_pb:health_check_request()) -> + {ok, greptimedb_health_pb:health_check_response(), ctx:t()} | grpcbox_stream:grpc_error_response(). diff --git a/src/greptime_v_1_health_check_client.erl b/src/greptime_v_1_health_check_client.erl index 576eead..db4e07c 100644 --- a/src/greptime_v_1_health_check_client.erl +++ b/src/greptime_v_1_health_check_client.erl @@ -15,7 +15,7 @@ -define(is_ctx(Ctx), is_tuple(Ctx) andalso element(1, Ctx) =:= ctx). -define(SERVICE, 'greptime.v1.HealthCheck'). --define(PROTO_MODULE, 'health_pb'). +-define(PROTO_MODULE, 'greptimedb_health_pb'). -define(MARSHAL_FUN(T), fun(I) -> ?PROTO_MODULE:encode_msg(I, T) end). -define(UNMARSHAL_FUN(T), fun(I) -> ?PROTO_MODULE:decode_msg(I, T) end). -define(DEF(Input, Output, MessageType), #grpcbox_def{service=?SERVICE, @@ -24,20 +24,19 @@ unmarshal_fun=?UNMARSHAL_FUN(Output)}). %% @doc Unary RPC --spec health_check(health_pb:health_check_request()) -> - {ok, health_pb:health_check_response(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response() | {error, any()}. +-spec health_check(greptimedb_health_pb:health_check_request()) -> + {ok, greptimedb_health_pb:health_check_response(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response() | {error, any()}. health_check(Input) -> health_check(ctx:new(), Input, #{}). --spec health_check(ctx:t() | health_pb:health_check_request(), health_pb:health_check_request() | grpcbox_client:options()) -> - {ok, health_pb:health_check_response(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response() | {error, any()}. +-spec health_check(ctx:t() | greptimedb_health_pb:health_check_request(), greptimedb_health_pb:health_check_request() | grpcbox_client:options()) -> + {ok, greptimedb_health_pb:health_check_response(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response() | {error, any()}. health_check(Ctx, Input) when ?is_ctx(Ctx) -> health_check(Ctx, Input, #{}); health_check(Input, Options) -> health_check(ctx:new(), Input, Options). --spec health_check(ctx:t(), health_pb:health_check_request(), grpcbox_client:options()) -> - {ok, health_pb:health_check_response(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response() | {error, any()}. +-spec health_check(ctx:t(), greptimedb_health_pb:health_check_request(), grpcbox_client:options()) -> + {ok, greptimedb_health_pb:health_check_response(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response() | {error, any()}. health_check(Ctx, Input, Options) -> grpcbox_client:unary(Ctx, <<"/greptime.v1.HealthCheck/HealthCheck">>, Input, ?DEF(health_check_request, health_check_response, <<"greptime.v1.HealthCheckRequest">>), Options). - diff --git a/src/column_pb.erl b/src/greptimedb_column_pb.erl similarity index 99% rename from src/column_pb.erl rename to src/greptimedb_column_pb.erl index 7b3cf28..6d802c8 100644 --- a/src/column_pb.erl +++ b/src/greptimedb_column_pb.erl @@ -3,7 +3,7 @@ %% Automatically generated, do not edit %% Generated by gpb_compile version 4.19.7 %% Version source: file --module(column_pb). +-module(greptimedb_column_pb). -export([encode_msg/2, encode_msg/3]). -export([decode_msg/2, decode_msg/3]). diff --git a/src/common_pb.erl b/src/greptimedb_common_pb.erl similarity index 99% rename from src/common_pb.erl rename to src/greptimedb_common_pb.erl index 15f2d06..b26866f 100644 --- a/src/common_pb.erl +++ b/src/greptimedb_common_pb.erl @@ -3,7 +3,7 @@ %% Automatically generated, do not edit %% Generated by gpb_compile version 4.19.7 %% Version source: file --module(common_pb). +-module(greptimedb_common_pb). -export([encode_msg/2, encode_msg/3]). -export([decode_msg/2, decode_msg/3]). diff --git a/src/database_pb.erl b/src/greptimedb_database_pb.erl similarity index 99% rename from src/database_pb.erl rename to src/greptimedb_database_pb.erl index b04a808..4d34b30 100644 --- a/src/database_pb.erl +++ b/src/greptimedb_database_pb.erl @@ -3,7 +3,7 @@ %% Automatically generated, do not edit %% Generated by gpb_compile version 4.19.7 %% Version source: file --module(database_pb). +-module(greptimedb_database_pb). -export([encode_msg/2, encode_msg/3]). -export([decode_msg/2, decode_msg/3]). diff --git a/src/ddl_pb.erl b/src/greptimedb_ddl_pb.erl similarity index 99% rename from src/ddl_pb.erl rename to src/greptimedb_ddl_pb.erl index 186ba8f..3f7239a 100644 --- a/src/ddl_pb.erl +++ b/src/greptimedb_ddl_pb.erl @@ -3,7 +3,7 @@ %% Automatically generated, do not edit %% Generated by gpb_compile version 4.19.7 %% Version source: file --module(ddl_pb). +-module(greptimedb_ddl_pb). -export([encode_msg/2, encode_msg/3]). -export([decode_msg/2, decode_msg/3]). diff --git a/src/health_pb.erl b/src/greptimedb_health_pb.erl similarity index 99% rename from src/health_pb.erl rename to src/greptimedb_health_pb.erl index d159454..1d18b74 100644 --- a/src/health_pb.erl +++ b/src/greptimedb_health_pb.erl @@ -3,7 +3,7 @@ %% Automatically generated, do not edit %% Generated by gpb_compile version 4.19.7 %% Version source: file --module(health_pb). +-module(greptimedb_health_pb). -export([encode_msg/2, encode_msg/3]). -export([decode_msg/2, decode_msg/3]). diff --git a/src/greptimedb_values.erl b/src/greptimedb_values.erl index 59d8914..f3c2154 100644 --- a/src/greptimedb_values.erl +++ b/src/greptimedb_values.erl @@ -15,7 +15,8 @@ -module(greptimedb_values). -export([int32_value/1, int64_value/1, float64_value/1, boolean_value/1, binary_value/1, - string_value/1, date_value/1, datetime_value/1, timestamp_second_value/1, + string_value/1, date_value/1, datetime_value/1, timestamp_second_value/1, uint32_value/1, + uint64_value/1, timestamp_millisecond_value/1, timestamp_microsecond_value/1, timestamp_nanosecond_value/1]). @@ -25,6 +26,12 @@ int32_value(V) -> int64_value(V) -> #{values => #{i64_values => [V]}, datatype => 'INT64'}. +uint32_value(V) -> + #{values => #{u32_values => [V]}, datatype => 'UINT32'}. + +uint64_value(V) -> + #{values => #{u64_values => [V]}, datatype => 'UINT64'}. + float64_value(V) -> #{values => #{f64_values => [V]}, datatype => 'FLOAT64'}. diff --git a/src/greptimedb_worker.erl b/src/greptimedb_worker.erl index 45931dc..79a5a25 100644 --- a/src/greptimedb_worker.erl +++ b/src/greptimedb_worker.erl @@ -18,19 +18,26 @@ -behavihour(ecpool_worker). +-include_lib("grpcbox/include/grpcbox.hrl"). + -export([handle/2, stream/1, ddl/0, health_check/1]). -export([start_link/1, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). -export([connect/1]). -record(state, {channel}). +-define(CALL_TIMEOUT, 12_000). +-define(HEALTH_CHECK_TIMEOUT, 1_000). +-define(REQUEST_TIMEOUT, 10_000). +-define(CONNECT_TIMEOUT, 5_000). + %% =================================================================== %% gen_server callbacks %% =================================================================== init(Args) -> logger:debug("[GreptimeDB] genserver has started (~w)~n", [self()]), Endpoints = proplists:get_value(endpoints, Args), - Options = proplists:get_value(gprc_options, Args, #{}), + Options = proplists:get_value(gprc_options, Args, #{connect_timeout => ?CONNECT_TIMEOUT}), Channels = lists:map(fun({Schema, Host, Port}) -> {Schema, Host, Port, []} end, Endpoints), Channel = list_to_atom(pid_to_list(self())), @@ -38,16 +45,21 @@ init(Args) -> {ok, #state{channel = Channel}}. handle_call({handle, Request}, _From, #state{channel = Channel} = State) -> - Reply = greptime_v_1_greptime_database_client:handle(Request, #{channel => Channel}), + Ctx = ctx:with_deadline_after(?REQUEST_TIMEOUT, millisecond), + Reply = greptime_v_1_greptime_database_client:handle(Ctx, Request, #{channel => Channel}), case Reply of {ok, Resp, _} -> {reply, {ok, Resp}, State}; + {error, {?GRPC_STATUS_UNAUTHENTICATED, Msg}, Other} -> + {reply, {error, {unauth, Msg, Other}}, State}; Err -> {reply, Err, State} end; handle_call(health_check, _From, #state{channel = Channel} = State) -> Request = #{}, - Reply = greptime_v_1_health_check_client:health_check(Request, #{channel => Channel}), + Ctx = ctx:with_deadline_after(?HEALTH_CHECK_TIMEOUT, millisecond), + Reply = + greptime_v_1_health_check_client:health_check(Ctx, Request, #{channel => Channel}), case Reply of {ok, Resp, _} -> {reply, {ok, Resp}, State}; @@ -75,14 +87,15 @@ terminate(Reason, #state{channel = Channel} = State) -> %%% Public functions %%%=================================================================== handle(Pid, Request) -> - gen_server:call(Pid, {handle, Request}). + gen_server:call(Pid, {handle, Request}, ?CALL_TIMEOUT). health_check(Pid) -> - gen_server:call(Pid, health_check). + gen_server:call(Pid, health_check, ?HEALTH_CHECK_TIMEOUT). stream(Pid) -> - {ok, Channel} = gen_server:call(Pid, channel), - greptime_v_1_greptime_database_client:handle_requests(#{channel => Channel}). + {ok, Channel} = gen_server:call(Pid, channel, ?CALL_TIMEOUT), + Ctx = ctx:with_deadline_after(?REQUEST_TIMEOUT, millisecond), + greptime_v_1_greptime_database_client:handle_requests(Ctx, #{channel => Channel}). ddl() -> todo. diff --git a/test/greptimedb_SUITE.erl b/test/greptimedb_SUITE.erl index 80a4296..60f00a4 100644 --- a/test/greptimedb_SUITE.erl +++ b/test/greptimedb_SUITE.erl @@ -6,7 +6,7 @@ -include_lib("eunit/include/eunit.hrl"). all() -> - [t_write, t_write_stream, t_insert_requests, t_write_batch, t_bench_perf]. + [t_write, t_write_stream, t_insert_requests, t_write_batch, t_bench_perf, t_auth_error]. %%[t_bench_perf]. %%[t_insert_requests, t_bench_perf]. @@ -131,6 +131,33 @@ t_write(_) -> greptimedb:stop_client(Client), ok. +t_auth_error(_) -> + Metric = <<"temperatures">>, + Points = + [#{fields => #{<<"temperature">> => 1}, + tags => + #{<<"from">> => <<"mqttx_4b963a8e">>, + <<"host">> => <<"serverA">>, + <<"qos">> => greptimedb_values:int64_value(0), + <<"region">> => <<"hangzhou">>}, + timestamp => 1619775142098}, + #{fields => #{<<"temperature">> => 2}, + tags => + #{<<"from">> => <<"mqttx_4b963a8e">>, + <<"host">> => <<"serverB">>, + <<"qos">> => greptimedb_values:int64_value(1), + <<"region">> => <<"ningbo">>, + <<"to">> => <<"kafka">>}, + timestamp => 1619775143098}], + Options = + [{endpoints, [{http, "localhost", 4001}]}, + {pool, greptimedb_client_pool}, + {pool_size, 5}, + {pool_type, random}, + {auth, {basic, #{username => <<"greptime_user">>, password => <<"wrong_pwd">>}}}], + {ok, Client} = greptimedb:start_client(Options), + {error, {unauth, _, _}} = greptimedb:write(Client, Metric, Points). + t_write_stream(_) -> Options = [{endpoints, [{http, "localhost", 4001}]},