Skip to content

Commit

Permalink
feat: health checking (#18)
Browse files Browse the repository at this point in the history
* feat: adds back health proto

* feat: adds is_alive/1 function

* fix: health request

* feat: adds spec for public APIs
  • Loading branch information
killme2008 authored May 16, 2023
1 parent 05bb76b commit 28c9644
Show file tree
Hide file tree
Showing 8 changed files with 763 additions and 7 deletions.
16 changes: 16 additions & 0 deletions protos/health.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
syntax = "proto3";

package greptime.v1;

option java_package = "io.greptime.v1";
option java_outer_classname = "Health";
option go_package = "github.com/GreptimeTeam/greptime-proto/go/greptime/v1";


service HealthCheck {
rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse);
}

message HealthCheckRequest {}

message HealthCheckResponse {}
13 changes: 13 additions & 0 deletions src/greptime_v_1_health_check_bhvr.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
%%%-------------------------------------------------------------------
%% @doc Behaviour to implement for grpc service greptime.v1.HealthCheck.
%% @end
%%%-------------------------------------------------------------------

%% this module was generated and should not be modified manually

-module(greptime_v_1_health_check_bhvr).

%% Unary RPC
-callback health_check(ctx:t(), health_pb:health_check_request()) ->
{ok, health_pb:health_check_response(), ctx:t()} | grpcbox_stream:grpc_error_response().

43 changes: 43 additions & 0 deletions src/greptime_v_1_health_check_client.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
%%%-------------------------------------------------------------------
%% @doc Client module for grpc service greptime.v1.HealthCheck.
%% @end
%%%-------------------------------------------------------------------

%% this module was generated and should not be modified manually

-module(greptime_v_1_health_check_client).

-compile(export_all).
-compile(nowarn_export_all).

-include_lib("grpcbox/include/grpcbox.hrl").

-define(is_ctx(Ctx), is_tuple(Ctx) andalso element(1, Ctx) =:= ctx).

-define(SERVICE, 'greptime.v1.HealthCheck').
-define(PROTO_MODULE, 'health_pb').
-define(MARSHAL_FUN(T), fun(I) -> ?PROTO_MODULE:encode_msg(I, T) end).
-define(UNMARSHAL_FUN(T), fun(I) -> ?PROTO_MODULE:decode_msg(I, T) end).
-define(DEF(Input, Output, MessageType), #grpcbox_def{service=?SERVICE,
message_type=MessageType,
marshal_fun=?MARSHAL_FUN(Input),
unmarshal_fun=?UNMARSHAL_FUN(Output)}).

%% @doc Unary RPC
-spec health_check(health_pb:health_check_request()) ->
{ok, health_pb:health_check_response(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response() | {error, any()}.
health_check(Input) ->
health_check(ctx:new(), Input, #{}).

-spec health_check(ctx:t() | health_pb:health_check_request(), health_pb:health_check_request() | grpcbox_client:options()) ->
{ok, health_pb:health_check_response(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response() | {error, any()}.
health_check(Ctx, Input) when ?is_ctx(Ctx) ->
health_check(Ctx, Input, #{});
health_check(Input, Options) ->
health_check(ctx:new(), Input, Options).

-spec health_check(ctx:t(), health_pb:health_check_request(), grpcbox_client:options()) ->
{ok, health_pb:health_check_response(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response() | {error, any()}.
health_check(Ctx, Input, Options) ->
grpcbox_client:unary(Ctx, <<"/greptime.v1.HealthCheck/HealthCheck">>, Input, ?DEF(health_check_request, health_check_response, <<"greptime.v1.HealthCheckRequest">>), Options).

55 changes: 51 additions & 4 deletions src/greptimedb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@

-module(greptimedb).

-export([start_client/1, stop_client/1, write/3, write_stream/1, ddl/1]).
-export([start_client/1, stop_client/1, write/3, write_stream/1, is_alive/1, is_alive/2,
ddl/1]).

-spec start_client(list()) ->
{ok, Client :: map()} |
Expand All @@ -37,17 +38,28 @@ start_client(Options0) ->
{error, Reason}
end.

-spec write(Client, Metric, Points) -> {ok, term()} | {error, term()}
when Client :: map(),
Metric :: Table | {DbName, Table},
DbName :: atom() | binary() | list(),
Table :: atom() | binary() | list(),
Points :: [Point],
Point ::
#{tags => map(),
fields => map(),
timestamp => integer()}.
write(Client, Metric, Points) ->
try
Request = greptimedb_encoder:insert_request(Client, Metric, Points),
rpc_call(Client, Request)
handle(Client, Request)
catch
E:R:S ->
logger:error("[GreptimeDB] write ~0p failed: ~0p ~0p ~0p ~p",
[Metric, Points, E, R, S]),
{error, R}
end.

-spec write_stream(Client) -> {ok, term()} | {error, term()} when Client :: map().
write_stream(Client) ->
try
rpc_write_stream(Client)
Expand All @@ -60,6 +72,26 @@ write_stream(Client) ->
ddl(_Client) ->
todo.

-spec is_alive(Client :: map()) -> true | false.
is_alive(Client) ->
is_alive(Client, false).

-spec is_alive(Client :: map(), ReturnReason :: boolean()) ->
true | false | {false, Reason :: term()}.
is_alive(Client, ReturnReason) ->
try
case health_check(Client) of
{ok, _Resp} ->
true;
Return ->
maybe_return_reason(Return, ReturnReason)
end
catch
E:R:S ->
logger:error("[GreptimeDB] health check failed: ~0p ~0p ~p", [E, R, S]),
maybe_return_reason({error, R}, ReturnReason)
end.

-spec stop_client(Client :: map()) -> ok | term().
stop_client(#{pool := Pool}) ->
ecpool:stop_sup_pool(Pool).
Expand All @@ -68,8 +100,8 @@ stop_client(#{pool := Pool}) ->
%%% Internal functions
%%%===================================================================

rpc_call(#{pool := Pool} = _Client, Request) ->
Fun = fun(Worker) -> greptimedb_worker:rpc_call(Worker, Request) end,
handle(#{pool := Pool} = _Client, Request) ->
Fun = fun(Worker) -> greptimedb_worker:handle(Worker, Request) end,
try
ecpool:with_client(Pool, Fun)
catch
Expand All @@ -78,6 +110,16 @@ rpc_call(#{pool := Pool} = _Client, Request) ->
{error, {E, R}}
end.

health_check(#{pool := Pool} = _Client) ->
Fun = fun(Worker) -> greptimedb_worker:health_check(Worker) end,
try
ecpool:with_client(Pool, Fun)
catch
E:R:S ->
logger:error("[GreptimeDB] grpc health check failed: ~0p ~0p ~0p", [E, R, S]),
{error, {E, R}}
end.

rpc_write_stream(#{pool := Pool, cli_opts := Options} = _Client) ->
Fun = fun(Worker) ->
case greptimedb_worker:stream(Worker) of
Expand All @@ -94,3 +136,8 @@ rpc_write_stream(#{pool := Pool, cli_opts := Options} = _Client) ->
logger:error("[GreptimeDB] grpc write fail: ~0p ~0p ~0p", [E, R, S]),
{error, {E, R}}
end.

maybe_return_reason({error, Reason}, true) ->
{false, Reason};
maybe_return_reason(_, _) ->
false.
12 changes: 12 additions & 0 deletions src/greptimedb_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@

-export([write/3, finish/1]).

-spec write(Stream, Metric, Points) -> {ok, term()} | {error, term()}
when Stream :: map(),
Metric :: Table | {DbName, Table},
DbName :: atom() | binary() | list(),
Table :: atom() | binary() | list(),
Points :: [Point],
Point ::
#{tags => map(),
fields => map(),
timestamp => integer()}.
write(Stream, Metric, Points) ->
try
Request = greptimedb_encoder:insert_request(Stream, Metric, Points),
Expand All @@ -13,9 +23,11 @@ write(Stream, Metric, Points) ->
{error, R}
end.

-spec finish(Stream :: map()) -> {ok, term()} | {error, term()}.
finish(Stream) ->
finish(Stream, 5000).

-spec finish(Stream :: map(), Timeout :: integer()) -> {ok, term()} | {error, term()}.
finish(Stream, Timeout) ->
try
ok = grpcbox_client:close_send(Stream),
Expand Down
17 changes: 14 additions & 3 deletions src/greptimedb_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

-behavihour(ecpool_worker).

-export([rpc_call/2, stream/1, ddl/0]).
-export([handle/2, stream/1, ddl/0, health_check/1]).
-export([start_link/1, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
-export([connect/1]).

Expand All @@ -45,7 +45,15 @@ handle_call({handle, Request}, _From, #state{channel = Channel} = State) ->
Err ->
{reply, Err, State}
end;

handle_call(health_check, _From, #state{channel = Channel} = State) ->
Request = #{},
Reply = greptime_v_1_health_check_client:health_check(Request, #{channel => Channel}),
case Reply of
{ok, Resp, _} ->
{reply, {ok, Resp}, State};
Err ->
{reply, Err, State}
end;
handle_call(channel, _From, #state{channel = Channel} = State) ->
{reply, {ok, Channel}, State}.

Expand All @@ -66,9 +74,12 @@ terminate(Reason, #state{channel = Channel} = State) ->
%%%===================================================================
%%% Public functions
%%%===================================================================
rpc_call(Pid, Request) ->
handle(Pid, Request) ->
gen_server:call(Pid, {handle, Request}).

health_check(Pid) ->
gen_server:call(Pid, health_check).

stream(Pid) ->
{ok, Channel} = gen_server:call(Pid, channel),
greptime_v_1_greptime_database_client:handle_requests(#{channel => Channel}).
Expand Down
Loading

0 comments on commit 28c9644

Please sign in to comment.