diff --git a/src/ra.hrl b/src/ra.hrl index 539f1931..1c45521f 100644 --- a/src/ra.hrl +++ b/src/ra.hrl @@ -276,6 +276,8 @@ "Number of checkpoint bytes written"}, {checkpoints_promoted, ?C_RA_LOG_CHECKPOINTS_PROMOTED, counter, "Number of checkpoints promoted to snapshots"}, + {checkpoints_promotions_dropped, ?C_RA_LOG_CHECKPOINTS_PROMOTIONS_DROPPED, + counter, "Number of checkpoints promotions that were not exectued"}, {reserved_1, ?C_RA_LOG_RESERVED, counter, "Reserved counter"} ]). -define(C_RA_LOG_WRITE_OPS, 1). @@ -293,7 +295,8 @@ -define(C_RA_LOG_CHECKPOINTS_WRITTEN, 13). -define(C_RA_LOG_CHECKPOINT_BYTES_WRITTEN, 14). -define(C_RA_LOG_CHECKPOINTS_PROMOTED, 15). --define(C_RA_LOG_RESERVED, 16). +-define(C_RA_LOG_CHECKPOINTS_PROMOTIONS_DROPPED, 16). +-define(C_RA_LOG_RESERVED, 17). -define(C_RA_SRV_AER_RECEIVED_FOLLOWER, ?C_RA_LOG_RESERVED + 1). -define(C_RA_SRV_AER_REPLIES_SUCCESS, ?C_RA_LOG_RESERVED + 2). diff --git a/src/ra_directory.erl b/src/ra_directory.erl index 8696e732..caf81ecf 100644 --- a/src/ra_directory.erl +++ b/src/ra_directory.erl @@ -53,6 +53,11 @@ init(Dir, #{directory := Name, [{file, Dets}, {auto_save, 500}, {access, read_write}]), + _ = dets:foldl(fun({ServerName, UId}, Acc) -> + true = ets:insert(Name, {UId, undefined, undefined, + ServerName, undefined}), + Acc + end, [], NameRev), ok. -spec deinit(atom() | ra_system:names()) -> ok. @@ -116,8 +121,9 @@ where_is(#{directory_rev := DirRev} = Names, ServerName) end; where_is(#{directory := Dir}, UId) when is_binary(UId) -> case ets:lookup(Dir, UId) of - [{_, Pid, _, _, _}] -> Pid; - [] -> undefined + [{_, Pid, _, _, _}] when is_pid(Pid) -> + Pid; + _ -> undefined end. -spec where_is_parent(atom() | ra_system:names(), ra_uid() | atom()) -> @@ -133,8 +139,8 @@ where_is_parent(#{directory_rev := DirRev} = Names, ServerName) end; where_is_parent(#{directory := Dir}, UId) when is_binary(UId) -> case ets:lookup(Dir, UId) of - [{_, _, Pid, _, _}] -> Pid; - [] -> undefined + [{_, _, Pid, _, _}] when is_pid(Pid) -> Pid; + _ -> undefined end. -spec name_of(atom() | ra_system:names(), ra_uid()) -> option(atom()). @@ -143,13 +149,7 @@ name_of(SystemOrNames, UId) -> case ets:lookup(Tbl, UId) of [{_, _, _, ServerName, _}] -> ServerName; [] -> - TblRev = get_reverse(SystemOrNames), - case dets:select(TblRev, [{{'_', UId}, [], ['$_']}]) of - [] -> - undefined; - [{Name, UId}] -> - Name - end + undefined end. -spec cluster_name_of(ra_system:names() | atom(), ra_uid()) -> @@ -157,16 +157,18 @@ name_of(SystemOrNames, UId) -> cluster_name_of(SystemOrNames, UId) -> Tbl = get_name(SystemOrNames), case ets:lookup(Tbl, UId) of - [{_, _, _, _, ClusterName}] -> ClusterName; - [] -> undefined + [{_, _, _, _, ClusterName}] + when ClusterName /= undefined -> + ClusterName; + _ -> undefined end. -spec pid_of(atom() | ra_system:names(), ra_uid()) -> option(pid()). pid_of(SystemOrNames, UId) -> case ets:lookup(get_name(SystemOrNames), UId) of - [{_, Pid, _, _, _}] -> Pid; - [] -> undefined + [{_, Pid, _, _, _}] when is_pid(Pid) -> Pid; + _ -> undefined end. uid_of(System, ServerName) when is_atom(System) -> @@ -203,17 +205,19 @@ overview(System) when is_atom(System) -> undefined)}} end, #{}, Dir). --spec list_registered(atom()) -> [{atom(), ra_uid()}]. -list_registered(System) when is_atom(System) -> - Tbl = get_reverse(System), +-spec list_registered(atom() | ra_system:names()) -> + [{atom(), ra_uid()}]. +list_registered(SystemOrNames) + when is_atom(SystemOrNames) orelse + is_map(SystemOrNames) -> + Tbl = get_reverse(SystemOrNames), dets:select(Tbl, [{'_', [], ['$_']}]). -spec is_registered_uid(atom() | ra_system:names(), ra_uid()) -> boolean(). is_registered_uid(SystemOrNames, UId) when (is_atom(SystemOrNames) orelse is_map(SystemOrNames)) andalso is_binary(UId) -> - Tbl = get_reverse(SystemOrNames), - [] =/= dets:select(Tbl, [{{'_', UId}, [], ['$_']}]). + name_of(SystemOrNames, UId) =/= undefined. get_name(#{directory := Tbl}) -> Tbl; @@ -230,4 +234,3 @@ get_reverse(#{directory_rev := DirRev}) -> get_names(System) when is_atom(System) -> #{names := Names} = ra_system:fetch(System), Names. - diff --git a/src/ra_log.erl b/src/ra_log.erl index 53129fe7..deeacae4 100644 --- a/src/ra_log.erl +++ b/src/ra_log.erl @@ -895,6 +895,7 @@ promote_checkpoint(Idx, #?MODULE{cfg = Cfg, snapshot_state = SnapState0} = State) -> case ra_snapshot:pending(SnapState0) of {_WriterPid, _IdxTerm, snapshot} -> + ok = incr_counter(Cfg, ?C_RA_LOG_CHECKPOINTS_PROMOTIONS_DROPPED, 1), %% If we're currently writing a snapshot, skip promoting a %% checkpoint. {State, []}; diff --git a/src/ra_log_segment_writer.erl b/src/ra_log_segment_writer.erl index 5a0d7d52..7e94a778 100644 --- a/src/ra_log_segment_writer.erl +++ b/src/ra_log_segment_writer.erl @@ -137,6 +137,7 @@ segments_for(UId, #state{data_dir = DataDir}) -> handle_cast({mem_tables, Ranges, WalFile}, #state{data_dir = Dir, system = System} = State) -> + T1 = erlang:monotonic_time(), ok = counters:add(State#state.counter, ?C_MEM_TABLES, map_size(Ranges)), #{names := Names} = ra_system:fetch(System), Degree = erlang:system_info(schedulers), @@ -158,7 +159,6 @@ handle_cast({mem_tables, Ranges, WalFile}, #state{data_dir = Dir, end end, [], Ranges), - T1 = erlang:monotonic_time(), _ = [begin {_, Failures} = ra_lib:partition_parallel( fun (TidRange) -> @@ -365,9 +365,9 @@ snap_idx(ServerUId) -> send_segments(System, ServerUId, TidRanges, SegRefs) -> case ra_directory:pid_of(System, ServerUId) of undefined -> - ?DEBUG("ra_log_segment_writer: error sending " + ?DEBUG("ra_log_segment_writer: unable to send " "ra_log_event to: " - "~ts. Error: ~s", + "~ts. Reason: ~s", [ServerUId, "No Pid"]), %% delete from the memtable on the non-running server's behalf [begin diff --git a/src/ra_log_wal.erl b/src/ra_log_wal.erl index 87aa87e6..87ca98d3 100644 --- a/src/ra_log_wal.erl +++ b/src/ra_log_wal.erl @@ -377,26 +377,26 @@ recover_wal(Dir, #conf{segment_writer = SegWriter, filename:extension(File) == ".wal"], WalFiles = lists:sort(Files), AllWriters = - [begin - ?DEBUG("wal: recovering ~ts, Mode ~s", [F, Mode]), - Fd = open_at_first_record(filename:join(Dir, F)), - {Time, #recovery{ranges = Ranges, - writers = Writers}} = - timer:tc(fun () -> recover_wal_chunks(Conf, Fd, Mode) end), + [begin + ?DEBUG("wal: recovering ~ts, Mode ~s", [F, Mode]), + Fd = open_at_first_record(filename:join(Dir, F)), + {Time, #recovery{ranges = Ranges, + writers = Writers}} = + timer:tc(fun () -> recover_wal_chunks(Conf, Fd, Mode) end), - ok = ra_log_segment_writer:accept_mem_tables(SegWriter, Ranges, F), + ok = ra_log_segment_writer:accept_mem_tables(SegWriter, Ranges, F), - close_existing(Fd), - ?DEBUG("wal: recovered ~ts time taken ~bms - Writer state recovered ~p", - [F, Time div 1000, Writers]), - Writers - end || F <- WalFiles], + close_existing(Fd), + ?DEBUG("wal: recovered ~ts time taken ~bms - recovered ~b writers", + [F, Time div 1000, map_size(Writers)]), + Writers + end || F <- WalFiles], FinalWriters = lists:foldl(fun (New, Acc) -> maps:merge(Acc, New) end, #{}, AllWriters), - ?DEBUG("wal: final writer state recovered ~p", [FinalWriters]), + ?DEBUG("wal: recovered ~b writers", [map_size(FinalWriters)]), FileNum = extract_file_num(lists:reverse(WalFiles)), State = roll_over(#state{conf = Conf, diff --git a/test/ra_log_segment_writer_SUITE.erl b/test/ra_log_segment_writer_SUITE.erl index 03cde21b..5aed1caa 100644 --- a/test/ra_log_segment_writer_SUITE.erl +++ b/test/ra_log_segment_writer_SUITE.erl @@ -347,6 +347,8 @@ accept_mem_tables_for_down_server(Config) -> %% but not running ok = dets:insert(maps:get(directory_rev, get_names(default)), {down_uid, DownUId}), + true = ets:insert(maps:get(directory, get_names(default)), + {DownUId, undefined, undefined, down_uid, undefined}), ok = ra_lib:make_dir(filename:join(Dir, DownUId)), application:start(sasl), {ok, TblWriterPid} = ra_log_segment_writer:start_link(#{system => default,