Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend WAL and segment file names from 8 to 16 characters #496

Merged
merged 2 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 30 additions & 16 deletions src/ra_lib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
zpad_filename/3,
zpad_filename_incr/1,
zpad_extract_num/1,
zpad_upgrade/3,
recursive_delete/1,
make_uid/0,
make_uid/1,
Expand Down Expand Up @@ -146,29 +147,44 @@ zpad_hex(Num) ->
lists:flatten(io_lib:format("~16.16.0B", [Num])).

%% Builds a flat file name string from a zero-padded decimal number and an
%% extension. With an empty prefix the result is "<Num>.<Ext>", otherwise
%% it is "<Prefix>_<Num>.<Ext>". Prefix and Ext are formatted with ~ts.
%% NOTE(review): each clause is listed with two format calls because this
%% listing carries both sides of the change: the 8-wide pad is the previous
%% format and the 16-wide pad is its replacement.
zpad_filename("", Ext, Num) ->
lists:flatten(io_lib:format("~8..0B.~ts", [Num, Ext]));
lists:flatten(io_lib:format("~16..0B.~ts", [Num, Ext]));
zpad_filename(Prefix, Ext, Num) ->
lists:flatten(io_lib:format("~ts_~8..0B.~ts", [Prefix, Num, Ext])).
lists:flatten(io_lib:format("~ts_~16..0B.~ts", [Prefix, Num, Ext])).

%% Returns Fn with the zero-padded number in its base name incremented by one,
%% or `undefined' when the base name contains no digit run of the required
%% length. The directory part of Fn is kept as is.
%% NOTE(review): this listing carries both sides of the change; where two
%% variants of a statement appear, the 8-digit one is the previous format and
%% the 16-digit one is its replacement.
%% NOTE(review): re:run/3 is called without the `unicode' option - confirm
%% that base names never contain code points above 255.
zpad_filename_incr(Fn) ->
%% only the base name is matched, so digits in the directory part are ignored
Base = filename:basename(Fn),
Dir = filename:dirname(Fn),
case re:run(Base, "(.*)([0-9]{8})(.*)",
case re:run(Base, "(.*)([0-9]{16})(.*)",
[{capture, all_but_first, list}]) of
{match, [Prefix, NumStr, Ext]} ->
%% Prefix and Ext are whatever surrounds the digit run and are re-emitted
%% unchanged around the incremented, re-padded number
Num = list_to_integer(NumStr),
filename:join(Dir,
lists:flatten(
io_lib:format("~ts~8..0B~ts", [Prefix, Num+1, Ext])));
NewFn = lists:flatten(io_lib:format("~ts~16..0B~ts",
[Prefix, Num + 1, Ext])),
filename:join(Dir, NewFn);
_ ->
%% no digit run of the required length in the base name
undefined
end.

%% Returns, as an integer, the zero-padded number embedded in Fn.
%% The match is asserted, so a name without a digit run of the required length
%% fails with a badmatch error instead of returning a value.
%% NOTE(review): the two re:run lines are the two sides of the change shown in
%% this listing (8-digit run before, 16-digit run after).
zpad_extract_num(Fn) ->
{match, [_, NumStr, _]} = re:run(Fn, "(.*)([0-9]{8})(.*)",
{match, [_, NumStr, _]} = re:run(Fn, "(.*)([0-9]{16})(.*)",
[{capture, all_but_first, list}]),
list_to_integer(NumStr).

%% Upgrades a file name from the old 8 character zero-padded format to the
%% 16 character format, renaming the file on disk when needed.
%%
%% Dir  - directory that contains the file
%% File - file name relative to Dir, e.g. as returned by a directory listing
%% Ext  - extension including the leading dot, e.g. ".wal" or ".segment"
%%
%% Returns the name (relative to Dir) the file has after the call. A failed
%% rename crashes the caller with a badmatch. Names whose base is not exactly
%% 8 characters long are returned unchanged.
zpad_upgrade(Dir, File, Ext) ->
    Base = filename:basename(File, Ext),
    case length(Base) of
        8 ->
            %% old format: left-pad with eight zeros and rename on disk
            Upgraded = "00000000" ++ Base ++ Ext,
            Old = filename:join(Dir, File),
            New = filename:join(Dir, Upgraded),
            ok = file:rename(Old, New),
            Upgraded;
        _ ->
            %% already in the 16 character format, or a name this upgrade
            %% does not know about: leave the file alone so that a stray
            %% file cannot prevent the caller from starting
            File
    end.


recursive_delete(Dir) ->
case is_dir(Dir) of
true ->
Expand Down Expand Up @@ -427,16 +443,14 @@ lists_shuffle(List0) ->

%% Returns true when prim_file:read_file_info/1 reports Dir as a directory.
%% Every other outcome, including an error tuple, yields false.
%% NOTE(review): the two `directory' clauses are the two sides of a
%% whitespace-only change shown in this listing.
is_dir(Dir) ->
case prim_file:read_file_info(Dir) of
{ok, #file_info{type=directory}} ->
{ok, #file_info{type = directory}} ->
true;
_ ->
false
end.

is_file(File) ->
case prim_file:read_file_info(File) of
{ok, #file_info{type = directory}} ->
true;
{ok, #file_info{type = regular}} ->
true;
_ ->
Expand Down Expand Up @@ -516,17 +530,17 @@ make_uid_test() ->
ok.

%% Checks that zpad_filename_incr/1 bumps the padded number of a prefixed
%% segment file name and returns `undefined' for a name with too few digits.
%% NOTE(review): fixtures appear twice because this listing carries both sides
%% of the change (8-digit names before, 16-digit names after).
zpad_filename_incr_test() ->
Fn = "/lib/blah/prefix_00000001.segment",
Ex = "/lib/blah/prefix_00000002.segment",
Fn = "/lib/blah/prefix_0000000000000001.segment",
Ex = "/lib/blah/prefix_0000000000000002.segment",
%% Ex is already bound, so this match asserts the incremented name
Ex = zpad_filename_incr(Fn),
%% one digit short of the required run length (7 resp. 15 digits)
undefined = zpad_filename_incr("0000001"),
undefined = zpad_filename_incr("000000000000001"),
ok.

%% Checks that zpad_filename_incr/1 works when the directory part of the path
%% contains a non-ASCII character; the base name itself is plain digits.
%% NOTE(review): fixtures appear twice because this listing carries both sides
%% of the change (8-digit names before, 16-digit names after).
zpad_filename_incr_utf8_test() ->
Fn = "/lib/🐰/prefix/00000001.segment",
Ex = "/lib/🐰/prefix/00000002.segment",
Fn = "/lib/🐰/prefix/0000000000000001.segment",
Ex = "/lib/🐰/prefix/0000000000000002.segment",
%% Ex is already bound, so this match asserts the incremented name
Ex = zpad_filename_incr(Fn),
%% one digit short of the required run length (7 resp. 15 digits)
undefined = zpad_filename_incr("0000001"),
undefined = zpad_filename_incr("000000000000001"),
ok.

derive_safe_string_test() ->
Expand Down
32 changes: 29 additions & 3 deletions src/ra_log_segment_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ await(SegWriter) ->
ok
end.

%%%===================================================================
-define(UPGRADE_MARKER, "segment_name_upgrade_marker").

%%%==================================================================
%%% gen_server callbacks
%%%===================================================================

Expand All @@ -115,6 +117,7 @@ init([#{data_dir := DataDir,
process_flag(trap_exit, true),
CRef = ra_counters:new(SegWriterName, ?COUNTER_FIELDS),
SegmentConf = maps:get(segment_conf, Conf, #{}),
maybe_upgrade_segment_file_names(System, DataDir),
{ok, #state{system = System,
data_dir = DataDir,
counter = CRef,
Expand Down Expand Up @@ -460,8 +463,8 @@ find_segment_files(Dir) ->
segment_files(Dir) ->
case prim_file:list_dir(Dir) of
{ok, Files0} ->
Files = [filename:join(Dir, F) || F <- Files0,
filename:extension(F) == ".segment"],
Files = [filename:join(Dir, F)
|| F <- Files0, filename:extension(F) =:= ".segment"],
lists:sort(Files);
{error, enoent} ->
[]
Expand Down Expand Up @@ -529,3 +532,26 @@ maybe_wait_for_segment_writer(SegWriter, TimeRemaining)
maybe_wait_for_segment_writer(_SegWriter, _TimeRemaining) ->
ok.

%% One-off migration of segment file names to the 16 character format.
%%
%% The migration runs only while the marker file ?UPGRADE_MARKER is absent
%% from DataDir. It visits the directory DataDir/UId of every server returned
%% by ra_directory:list_registered(System), renames the ".segment" files found
%% there via ra_lib:zpad_upgrade/3 and finally writes the (empty) marker file
%% so that later starts skip the scan.
%% Returns ok; a failure to write the marker crashes with a badmatch.
maybe_upgrade_segment_file_names(System, DataDir) ->
    Marker = filename:join(DataDir, ?UPGRADE_MARKER),
    case ra_lib:is_file(Marker) of
        true ->
            %% marker present: the upgrade has already been performed
            ok;
        false ->
            ?INFO("segment_writer: upgrading segment file names to "
                  "new format in directory ~ts",
                  [DataDir]),
            ServerDirs = [filename:join(DataDir, UId)
                          || {_, UId} <- ra_directory:list_registered(System)],
            lists:foreach(fun upgrade_segment_file_names_in/1, ServerDirs),
            ok = ra_lib:write_file(Marker, <<>>)
    end.

%% Upgrades the name of every ".segment" file directly inside Dir.
%% A missing directory is not an error: there is nothing to upgrade.
upgrade_segment_file_names_in(Dir) ->
    case prim_file:list_dir(Dir) of
        {ok, Files} ->
            Segments = [F || F <- Files,
                             filename:extension(F) =:= ".segment"],
            lists:foreach(fun(F) ->
                                  ra_lib:zpad_upgrade(Dir, F, ".segment")
                          end, Segments);
        {error, enoent} ->
            ok
    end.

9 changes: 6 additions & 3 deletions src/ra_log_wal.erl
Original file line number Diff line number Diff line change
Expand Up @@ -370,9 +370,12 @@ recover_wal(Dir, #conf{segment_writer = SegWriter,
ok = ra_log_segment_writer:await(SegWriter),
post_boot
end,
{ok, Files} = file:list_dir(Dir),
WalFiles = lists:sort([F || F <- Files,
filename:extension(F) == ".wal"]),
{ok, Files0} = file:list_dir(Dir),
Files = [begin
ra_lib:zpad_upgrade(Dir, File, ".wal")
end || File <- Files0,
filename:extension(File) == ".wal"],
WalFiles = lists:sort(Files),
AllWriters =
[begin
?DEBUG("wal: recovering ~ts, Mode ~s", [F, Mode]),
Expand Down
2 changes: 1 addition & 1 deletion test/ra_dbg_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ execute_state_machine() ->

%% Returns the path of the WAL file numbered 1 inside the directory named after
%% the local node under the `data_dir' configured for the `ra' application.
%% Fails with a badmatch when `data_dir' is not set.
%% NOTE(review): the two filename:join lines are the two sides of the change
%% shown in this listing (8-digit file name before, 16-digit after).
wal_file() ->
{ok, RaDataDir} = application:get_env(ra, data_dir),
filename:join([RaDataDir, node(), "00000001.wal"]).
filename:join([RaDataDir, node(), "0000000000000001.wal"]).

report(Pid, Count) ->
receive
Expand Down
63 changes: 53 additions & 10 deletions test/ra_log_segment_writer_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ all_tests() ->
truncate_segments_with_pending_update,
truncate_segments_with_pending_overwrite,
my_segments,
upgrade_segment_name_format,
skip_entries_lower_than_snapshot_index,
skip_all_entries_lower_than_snapshot_index
].
Expand All @@ -61,7 +62,6 @@ init_per_testcase(TestCase, Config) ->
SysCfg = ra_system:default_config(),
ra_system:store(SysCfg),
_ = ra_log_ets:start_link(SysCfg),
% ra_directory:init(default),
ra_counters:init(),
UId = atom_to_binary(TestCase, utf8),
ok = ra_directory:register_name(default, UId, self(), undefined,
Expand Down Expand Up @@ -314,23 +314,23 @@ accept_mem_tables_for_down_server(Config) ->
ets:new(ra_log_closed_mem_tables, [named_table, bag, public]),
Dir = ?config(wal_dir, Config),
UId = ?config(uid, Config),
FakeUId = <<"down-uid">>,
DownUId = <<"down-uid">>,
%% only insert into dets so that the server is shown as registered
%% but not running
ok = dets:insert(maps:get(directory_rev, get_names(default)),
{down_uid, FakeUId}),
ok = ra_lib:make_dir(filename:join(Dir, FakeUId)),
{down_uid, DownUId}),
ok = ra_lib:make_dir(filename:join(Dir, DownUId)),
application:start(sasl),
{ok, TblWriterPid} = ra_log_segment_writer:start_link(#{system => default,
name => ?SEGWR,
data_dir => Dir}),
% fake up a mem segment for Self
Entries = [{1, 42, a}, {2, 42, b}, {3, 43, c}],
Mt = make_mem_table(FakeUId, Entries),
Mt = make_mem_table(DownUId, Entries),
Mt2 = make_mem_table(UId, Entries),
Tid = ra_mt:tid(Mt),
Tid2 = ra_mt:tid(Mt2),
Ranges = #{FakeUId => [{Tid, {1, 3}}],
Ranges = #{DownUId => [{Tid, {1, 3}}],
UId => [{Tid2, {1, 3}}]},
WalFile = filename:join(Dir, "00001.wal"),
ok = file:write_file(WalFile, <<"waldata">>),
Expand All @@ -348,10 +348,11 @@ accept_mem_tables_for_down_server(Config) ->
end,
%% validate fake uid entries were written
ra_log_segment_writer:await(?SEGWR),
FakeSegmentFile = filename:join([?config(wal_dir, Config),
FakeUId,
"00000001.segment"]),
{ok, FakeSeg} = ra_log_segment:open(FakeSegmentFile, #{mode => read}),
DownFn = ra_lib:zpad_filename("", "segment", 1),
ct:pal("DownFn ~s", [DownFn]),
DownSegmentFile = filename:join([?config(wal_dir, Config),
DownUId, DownFn]),
{ok, FakeSeg} = ra_log_segment:open(DownSegmentFile, #{mode => read}),
% assert Entries have been fully transferred
Entries = [{I, T, binary_to_term(B)}
|| {I, T, B} <- read_sparse(FakeSeg, [1, 2, 3])],
Expand Down Expand Up @@ -701,6 +702,48 @@ my_segments(Config) ->
proc_lib:stop(TblWriterPid),
ok.

%% Verifies that a segment file carrying an old style (8 character) name is
%% renamed to the 16 character format when the segment writer starts and the
%% upgrade marker file is absent.
upgrade_segment_name_format(Config) ->
    Dir = ?config(wal_dir, Config),
    WriterConf = #{name => ?SEGWR,
                   system => default,
                   data_dir => Dir},
    {ok, Writer1} = ra_log_segment_writer:start_link(WriterConf),
    UId = ?config(uid, Config),
    %% hand a small mem table to the writer so that it flushes one segment
    Mt = make_mem_table(UId, [{1, 42, a}, {2, 42, b}, {3, 43, c}]),
    TidRanges = [{ra_mt:tid(Mt), ra_mt:range(Mt)}],
    Wal = make_wal(Config, "00001.wal"),
    ok = ra_log_segment_writer:accept_mem_tables(?SEGWR,
                                                 #{UId => TidRanges}, Wal),
    receive
        {ra_log_event, {segments, TidRanges, [{{1, 3}, _}]}} ->
            ok
    after 2000 ->
              flush(),
              exit(ra_log_event_timeout)
    end,
    [NewFormatFile] = ra_log_segment_writer:my_segments(?SEGWR, UId),

    %% with the writer stopped, strip the first 8 characters off the segment's
    %% base name to recreate the old naming scheme on disk
    proc_lib:stop(Writer1),
    {_Pad, OldName} = lists:split(8, filename:basename(NewFormatFile)),
    OldFormatFile = filename:join(filename:dirname(NewFormatFile), OldName),
    ok = file:rename(NewFormatFile, OldFormatFile),
    %% without the marker file the next start runs the upgrade again
    ok = file:delete(filename:join(Dir, "segment_name_upgrade_marker")),
    {ok, Writer2} = ra_log_segment_writer:start_link(WriterConf),
    %% the segment must be reported under its 16 character name again
    [NewFormatFile] = ra_log_segment_writer:my_segments(?SEGWR, UId),

    proc_lib:stop(Writer2),
    ok.

skip_entries_lower_than_snapshot_index(Config) ->
Dir = ?config(wal_dir, Config),
UId = ?config(uid, Config),
Expand Down
36 changes: 35 additions & 1 deletion test/ra_log_wal_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ all() ->
all_tests() ->
[
basic_log_writes,
wal_filename_upgrade,
same_uid_different_process,
consecutive_terms_in_batch_should_result_in_two_written_events,
overwrite_in_same_batch,
Expand Down Expand Up @@ -143,7 +144,7 @@ basic_log_writes(Config) ->
ra_log_wal:force_roll_over(Pid),
receive
{'$gen_cast',
{mem_tables, #{UId := [{Tid, {12, 13}}]}, "00000001.wal"}} ->
{mem_tables, #{UId := [{Tid, {12, 13}}]}, "0000000000000001.wal"}} ->
ok
after 5000 ->
flush(),
Expand All @@ -153,6 +154,39 @@ basic_log_writes(Config) ->
meck:unload(),
ok.

%% Verifies that a WAL file carrying an old style (8 character) name is
%% renamed to the 16 character format during recovery and that its entries
%% are still handed to the segment writer afterwards.
wal_filename_upgrade(Config) ->
    meck:new(ra_log_segment_writer, [passthrough]),
    meck:expect(ra_log_segment_writer, await, fun(_) -> ok end),
    Conf = #{dir := Dir} = ?config(wal_conf, Config),
    WriterId = {UId, _} = ?config(writer_id, Config),
    Tid = ets:new(?FUNCTION_NAME, []),
    %% the test process plays the segment writer and so receives the
    %% mem_tables cast sent by the WAL
    WalConf = Conf#{segment_writer => self()},
    {ok, Wal1} = ra_log_wal:start_link(WalConf),
    lists:foreach(
      fun({Idx, Payload}) ->
              {ok, _} = ra_log_wal:write(Wal1, WriterId, Tid, Idx, 1, Payload),
              ok = await_written(WriterId, 1, {Idx, Idx})
      end, [{12, "value"}, {13, "value2"}]),
    proc_lib:stop(Wal1),
    %% put the WAL file back under its old 8 character name
    ok = file:rename(filename:join(Dir, "0000000000000001.wal"),
                     filename:join(Dir, "00000001.wal")),
    {ok, Wal2} = ra_log_wal:start_link(WalConf),
    %% recovery must report the entries under the upgraded file name
    receive
        {'$gen_cast',
         {mem_tables, #{UId := [{_, {12, 13}}]}, "0000000000000001.wal"}} ->
            ok
    after 5000 ->
              flush(),
              ct:fail("receiving mem tables timed out")
    end,
    proc_lib:stop(Wal2),
    meck:unload(),
    ok.

same_uid_different_process(Config) ->
meck:new(ra_log_segment_writer, [passthrough]),
meck:expect(ra_log_segment_writer, await, fun(_) -> ok end),
Expand Down
Loading