Skip to content

Add counters #538

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/ra.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@
"Number of checkpoint bytes written"},
{checkpoints_promoted, ?C_RA_LOG_CHECKPOINTS_PROMOTED, counter,
"Number of checkpoints promoted to snapshots"},
{checkpoints_promotions_dropped, ?C_RA_LOG_CHECKPOINTS_PROMOTIONS_DROPPED,
counter, "Number of checkpoints promotions that were not exectued"},
{reserved_1, ?C_RA_LOG_RESERVED, counter, "Reserved counter"}
]).
-define(C_RA_LOG_WRITE_OPS, 1).
Expand All @@ -293,7 +295,8 @@
-define(C_RA_LOG_CHECKPOINTS_WRITTEN, 13).
-define(C_RA_LOG_CHECKPOINT_BYTES_WRITTEN, 14).
-define(C_RA_LOG_CHECKPOINTS_PROMOTED, 15).
-define(C_RA_LOG_RESERVED, 16).
-define(C_RA_LOG_CHECKPOINTS_PROMOTIONS_DROPPED, 16).
-define(C_RA_LOG_RESERVED, 17).

-define(C_RA_SRV_AER_RECEIVED_FOLLOWER, ?C_RA_LOG_RESERVED + 1).
-define(C_RA_SRV_AER_REPLIES_SUCCESS, ?C_RA_LOG_RESERVED + 2).
Expand Down
45 changes: 24 additions & 21 deletions src/ra_directory.erl
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ init(Dir, #{directory := Name,
[{file, Dets},
{auto_save, 500},
{access, read_write}]),
_ = dets:foldl(fun({ServerName, UId}, Acc) ->
true = ets:insert(Name, {UId, undefined, undefined,
ServerName, undefined}),
Acc
end, [], NameRev),
ok.

-spec deinit(atom() | ra_system:names()) -> ok.
Expand Down Expand Up @@ -116,8 +121,9 @@ where_is(#{directory_rev := DirRev} = Names, ServerName)
end;
where_is(#{directory := Dir}, UId) when is_binary(UId) ->
case ets:lookup(Dir, UId) of
[{_, Pid, _, _, _}] -> Pid;
[] -> undefined
[{_, Pid, _, _, _}] when is_pid(Pid) ->
Pid;
_ -> undefined
end.

-spec where_is_parent(atom() | ra_system:names(), ra_uid() | atom()) ->
Expand All @@ -133,8 +139,8 @@ where_is_parent(#{directory_rev := DirRev} = Names, ServerName)
end;
where_is_parent(#{directory := Dir}, UId) when is_binary(UId) ->
case ets:lookup(Dir, UId) of
[{_, _, Pid, _, _}] -> Pid;
[] -> undefined
[{_, _, Pid, _, _}] when is_pid(Pid) -> Pid;
_ -> undefined
end.

-spec name_of(atom() | ra_system:names(), ra_uid()) -> option(atom()).
Expand All @@ -143,30 +149,26 @@ name_of(SystemOrNames, UId) ->
case ets:lookup(Tbl, UId) of
[{_, _, _, ServerName, _}] -> ServerName;
[] ->
TblRev = get_reverse(SystemOrNames),
case dets:select(TblRev, [{{'_', UId}, [], ['$_']}]) of
[] ->
undefined;
[{Name, UId}] ->
Name
end
undefined
end.

-spec cluster_name_of(ra_system:names() | atom(), ra_uid()) ->
option(ra_cluster_name()).
cluster_name_of(SystemOrNames, UId) ->
Tbl = get_name(SystemOrNames),
case ets:lookup(Tbl, UId) of
[{_, _, _, _, ClusterName}] -> ClusterName;
[] -> undefined
[{_, _, _, _, ClusterName}]
when ClusterName /= undefined ->
ClusterName;
_ -> undefined
end.


-spec pid_of(atom() | ra_system:names(), ra_uid()) -> option(pid()).
pid_of(SystemOrNames, UId) ->
case ets:lookup(get_name(SystemOrNames), UId) of
[{_, Pid, _, _, _}] -> Pid;
[] -> undefined
[{_, Pid, _, _, _}] when is_pid(Pid) -> Pid;
_ -> undefined
end.

uid_of(System, ServerName) when is_atom(System) ->
Expand Down Expand Up @@ -203,17 +205,19 @@ overview(System) when is_atom(System) ->
undefined)}}
end, #{}, Dir).

-spec list_registered(atom()) -> [{atom(), ra_uid()}].
list_registered(System) when is_atom(System) ->
Tbl = get_reverse(System),
-spec list_registered(atom() | ra_system:names()) ->
[{atom(), ra_uid()}].
list_registered(SystemOrNames)
when is_atom(SystemOrNames) orelse
is_map(SystemOrNames) ->
Tbl = get_reverse(SystemOrNames),
dets:select(Tbl, [{'_', [], ['$_']}]).

-spec is_registered_uid(atom() | ra_system:names(), ra_uid()) -> boolean().
is_registered_uid(SystemOrNames, UId)
when (is_atom(SystemOrNames) orelse is_map(SystemOrNames)) andalso
is_binary(UId) ->
Tbl = get_reverse(SystemOrNames),
[] =/= dets:select(Tbl, [{{'_', UId}, [], ['$_']}]).
name_of(SystemOrNames, UId) =/= undefined.

get_name(#{directory := Tbl}) ->
Tbl;
Expand All @@ -230,4 +234,3 @@ get_reverse(#{directory_rev := DirRev}) ->
get_names(System) when is_atom(System) ->
#{names := Names} = ra_system:fetch(System),
Names.

1 change: 1 addition & 0 deletions src/ra_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,7 @@ promote_checkpoint(Idx, #?MODULE{cfg = Cfg,
snapshot_state = SnapState0} = State) ->
case ra_snapshot:pending(SnapState0) of
{_WriterPid, _IdxTerm, snapshot} ->
ok = incr_counter(Cfg, ?C_RA_LOG_CHECKPOINTS_PROMOTIONS_DROPPED, 1),
%% If we're currently writing a snapshot, skip promoting a
%% checkpoint.
{State, []};
Expand Down
6 changes: 3 additions & 3 deletions src/ra_log_segment_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ segments_for(UId, #state{data_dir = DataDir}) ->

handle_cast({mem_tables, Ranges, WalFile}, #state{data_dir = Dir,
system = System} = State) ->
T1 = erlang:monotonic_time(),
ok = counters:add(State#state.counter, ?C_MEM_TABLES, map_size(Ranges)),
#{names := Names} = ra_system:fetch(System),
Degree = erlang:system_info(schedulers),
Expand All @@ -158,7 +159,6 @@ handle_cast({mem_tables, Ranges, WalFile}, #state{data_dir = Dir,
end
end, [], Ranges),

T1 = erlang:monotonic_time(),
_ = [begin
{_, Failures} = ra_lib:partition_parallel(
fun (TidRange) ->
Expand Down Expand Up @@ -365,9 +365,9 @@ snap_idx(ServerUId) ->
send_segments(System, ServerUId, TidRanges, SegRefs) ->
case ra_directory:pid_of(System, ServerUId) of
undefined ->
?DEBUG("ra_log_segment_writer: error sending "
?DEBUG("ra_log_segment_writer: unable to send "
"ra_log_event to: "
"~ts. Error: ~s",
"~ts. Reason: ~s",
[ServerUId, "No Pid"]),
%% delete from the memtable on the non-running server's behalf
[begin
Expand Down
26 changes: 13 additions & 13 deletions src/ra_log_wal.erl
Original file line number Diff line number Diff line change
Expand Up @@ -377,26 +377,26 @@ recover_wal(Dir, #conf{segment_writer = SegWriter,
filename:extension(File) == ".wal"],
WalFiles = lists:sort(Files),
AllWriters =
[begin
?DEBUG("wal: recovering ~ts, Mode ~s", [F, Mode]),
Fd = open_at_first_record(filename:join(Dir, F)),
{Time, #recovery{ranges = Ranges,
writers = Writers}} =
timer:tc(fun () -> recover_wal_chunks(Conf, Fd, Mode) end),
[begin
?DEBUG("wal: recovering ~ts, Mode ~s", [F, Mode]),
Fd = open_at_first_record(filename:join(Dir, F)),
{Time, #recovery{ranges = Ranges,
writers = Writers}} =
timer:tc(fun () -> recover_wal_chunks(Conf, Fd, Mode) end),

ok = ra_log_segment_writer:accept_mem_tables(SegWriter, Ranges, F),
ok = ra_log_segment_writer:accept_mem_tables(SegWriter, Ranges, F),

close_existing(Fd),
?DEBUG("wal: recovered ~ts time taken ~bms - Writer state recovered ~p",
[F, Time div 1000, Writers]),
Writers
end || F <- WalFiles],
close_existing(Fd),
?DEBUG("wal: recovered ~ts time taken ~bms - recovered ~b writers",
[F, Time div 1000, map_size(Writers)]),
Writers
end || F <- WalFiles],

FinalWriters = lists:foldl(fun (New, Acc) ->
maps:merge(Acc, New)
end, #{}, AllWriters),

?DEBUG("wal: final writer state recovered ~p", [FinalWriters]),
?DEBUG("wal: recovered ~b writers", [map_size(FinalWriters)]),

FileNum = extract_file_num(lists:reverse(WalFiles)),
State = roll_over(#state{conf = Conf,
Expand Down
2 changes: 2 additions & 0 deletions test/ra_log_segment_writer_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,8 @@ accept_mem_tables_for_down_server(Config) ->
%% but not running
ok = dets:insert(maps:get(directory_rev, get_names(default)),
{down_uid, DownUId}),
true = ets:insert(maps:get(directory, get_names(default)),
{DownUId, undefined, undefined, down_uid, undefined}),
ok = ra_lib:make_dir(filename:join(Dir, DownUId)),
application:start(sasl),
{ok, TblWriterPid} = ra_log_segment_writer:start_link(#{system => default,
Expand Down
Loading