From cd58bbbab61894e8340a3962f29c2bde8053a376 Mon Sep 17 00:00:00 2001 From: James Piechota Date: Sat, 25 Jan 2025 02:20:59 +0000 Subject: [PATCH 1/2] WIP --- apps/arweave/src/ar_chunk_storage.erl | 434 ++++---------------- apps/arweave/src/ar_chunk_storage_sup.erl | 47 +-- apps/arweave/src/ar_entropy_gen.erl | 463 ++++++++++++++++++++++ apps/arweave/src/ar_entropy_storage.erl | 167 ++++---- apps/arweave/src/ar_repack.erl | 6 +- 5 files changed, 650 insertions(+), 467 deletions(-) create mode 100644 apps/arweave/src/ar_entropy_gen.erl diff --git a/apps/arweave/src/ar_chunk_storage.erl b/apps/arweave/src/ar_chunk_storage.erl index 906a8514e..1a29646b5 100644 --- a/apps/arweave/src/ar_chunk_storage.erl +++ b/apps/arweave/src/ar_chunk_storage.erl @@ -3,18 +3,19 @@ -behaviour(gen_server). --export([start_link/2, name/1, is_storage_supported/3, put/3, put/4, +-export([start_link/2, name/1, register_workers/0, is_storage_supported/3, put/3, put/4, open_files/1, get/1, get/2, get/3, get/5, locate_chunk_on_disk/2, - get_range/2, get_range/3, cut/2, delete/1, delete/2, + get_range/2, get_range/3, cut/2, delete/1, delete/2, set_entropy_context/2, get_filepath/2, get_handle_by_filepath/1, close_file/2, close_files/1, list_files/2, run_defragmentation/0, get_storage_module_path/2, get_chunk_storage_path/2, get_chunk_bucket_start/1, get_chunk_bucket_end/1, - sync_record_id/1, store_chunk/7, write_chunk/4, record_chunk/7]). + sync_record_id/1, store_chunk/6, write_chunk/4, record_chunk/7]). -export([init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2]). -include("../include/ar.hrl"). +-include("../include/ar_sup.hrl"). -include("../include/ar_config.hrl"). -include("../include/ar_consensus.hrl"). -include("../include/ar_chunk_storage.hrl"). 
@@ -29,14 +30,11 @@ packing_labels = #{},
 	packing_map = #{},
 	repack_cursor = 0,
-	target_packing = none,
+	target_packing,
 	repack_status = undefined,
+	entropy_context = none, %% some data we need to pass to ar_entropy_storage
 	range_start,
-	range_end,
-	reward_addr,
-	prepare_replica_2_9_cursor,
-	prepare_slice_index = 0,
-	prepare_status = undefined
+	range_end
 }).
 
 -ifdef(AR_TEST).
@@ -50,13 +48,44 @@ %%%===================================================================
 
 %% @doc Start the server.
-start_link(Name, StoreID) ->
-	gen_server:start_link({local, Name}, ?MODULE, StoreID, []).
+start_link(Name, {StoreID, RepackInPlacePacking}) ->
+	gen_server:start_link({local, Name}, ?MODULE, {StoreID, RepackInPlacePacking}, []).
 
 %% @doc Return the name of the server serving the given StoreID.
 name(StoreID) ->
 	list_to_atom("ar_chunk_storage_" ++ ar_storage_module:label_by_id(StoreID)).
 
+register_workers() ->
+	{ok, Config} = application:get_env(arweave, config),
+	ConfiguredWorkers = lists:map(
+		fun(StorageModule) ->
+			StoreID = ar_storage_module:id(StorageModule),
+
+			ChunkStorageName = ar_chunk_storage:name(StoreID),
+			?CHILD_WITH_ARGS(ar_chunk_storage, worker,
+				ChunkStorageName, [ChunkStorageName, {StoreID, none}])
+		end,
+		Config#config.storage_modules
+	),
+
+	DefaultChunkStorageWorker = ?CHILD_WITH_ARGS(ar_chunk_storage, worker,
+		ar_chunk_storage_default, [ar_chunk_storage_default, {"default", none}]),
+
+	RepackInPlaceWorkers = lists:map(
+		fun({StorageModule, Packing}) ->
+			StoreID = ar_storage_module:id(StorageModule),
+			%% Note: the config validation will prevent a StoreID from being used in both
+			%% `storage_modules` and `repack_in_place_storage_modules`, so there's
+			%% no risk of a `Name` clash with the workers spawned above.
+ ChunkStorageName = ar_chunk_storage:name(StoreID), + ?CHILD_WITH_ARGS(ar_chunk_storage, worker, + ChunkStorageName, [ChunkStorageName, {StoreID, Packing}]) + end, + Config#config.repack_in_place_storage_modules + ), + + ConfiguredWorkers ++ RepackInPlaceWorkers ++ [DefaultChunkStorageWorker]. + %% @doc Return true if we can accept the chunk for storage. %% 256 KiB chunks are stored in the blob storage optimized for read speed. %% Unpacked chunks smaller than 256 KiB cannot be stored here currently, @@ -268,6 +297,9 @@ get_chunk_bucket_end(EndOffset) -> PaddedEndOffset = ar_block:get_chunk_padded_offset(EndOffset), get_chunk_bucket_start(PaddedEndOffset) + ?DATA_CHUNK_SIZE. +set_entropy_context(StoreID, EntropyContext) -> + gen_server:cast(name(StoreID), {set_entropy_context, EntropyContext}). + %%%=================================================================== %%% Generic server callbacks. %%%=================================================================== @@ -312,49 +344,22 @@ init({StoreID, RepackInPlacePacking}) -> warn_custom_chunk_group_size(StoreID), {RangeStart, RangeEnd} = ar_storage_module:get_range(StoreID), StoreIDLabel = ar_storage_module:label_by_id(StoreID), - State = #state{ file_index = FileIndex2, store_id = StoreID, - range_start = RangeStart, range_end = RangeEnd, store_id_label = StoreIDLabel }, - RunEntropyProcess = - case RepackInPlacePacking of - none -> - case ar_storage_module:get_packing(StoreID) of - {replica_2_9, Addr} -> - {true, Addr}; - _ -> - false - end; - {replica_2_9, Addr} -> - {true, Addr}; - _ -> - false - end, - State2 = - case RunEntropyProcess of - {true, RewardAddr} -> - PrepareCursor = - read_prepare_replica_2_9_cursor(StoreID, RangeStart + 1), - ?LOG_INFO([{event, read_prepare_replica_2_9_cursor}, {store_id, StoreID}, - {cursor, PrepareCursor}, {range_start, RangeStart}, - {range_end, RangeEnd}]), - PrepareStatus = - case PrepareCursor =< RangeEnd of - true -> - gen_server:cast(self(), prepare_replica_2_9), - 
paused; - false -> - complete - end, - BucketEndOffset = get_chunk_bucket_end(PrepareCursor), - State#state{ reward_addr = RewardAddr, - prepare_replica_2_9_cursor = PrepareCursor, - prepare_slice_index = ar_replica_2_9:get_slice_index(BucketEndOffset), - prepare_status = PrepareStatus }; - _ -> - State#state{ prepare_status = off } - end, - case RepackInPlacePacking of + + State = #state{ + file_index = FileIndex2, + store_id = StoreID, + range_start = RangeStart, + range_end = RangeEnd, + store_id_label = StoreIDLabel + }, + + State2 = case RepackInPlacePacking of none -> - {ok, State2#state{ repack_cursor = none, repack_status = off }}; + State#state{ + repack_cursor = none, + repack_status = off, + target_packing = ar_storage_module:get_packing(StoreID) + }; Packing -> RepackCursor = ar_repack:read_cursor(StoreID, Packing, RangeStart), gen_server:cast(self(), {repack, Packing}), @@ -363,11 +368,17 @@ init({StoreID, RepackInPlacePacking}) -> {cursor, RepackCursor}, {store_id, StoreID}, {target_packing, ar_serialize:encode_packing(Packing, true)}]), - {ok, State2#state{ + State#state{ repack_cursor = RepackCursor, target_packing = Packing, - repack_status = paused }} - end. + repack_status = paused + } + end, + + EntropyContext = ar_entropy_gen:initialize_context(StoreID, State2#state.target_packing), + State3 = State2#state{ entropy_context = EntropyContext }, + + {ok, State3}. warn_custom_chunk_group_size(StoreID) -> case StoreID == "default" andalso get_chunk_group_size() /= ?CHUNK_GROUP_SIZE of @@ -383,28 +394,13 @@ warn_custom_chunk_group_size(StoreID) -> ok end. 
- -handle_cast(prepare_replica_2_9, State) -> - #state{ store_id = StoreID } = State, - NewStatus = ar_device_lock:acquire_lock(prepare, StoreID, State#state.prepare_status), - State2 = State#state{ prepare_status = NewStatus }, - State3 = case NewStatus of - active -> - do_prepare_replica_2_9(State2); - paused -> - ar_util:cast_after(?DEVICE_LOCK_WAIT, self(), prepare_replica_2_9), - State2; - _ -> - State2 - end, - {noreply, State3}; - handle_cast(store_repack_cursor, #state{ repack_status = complete } = State) -> {noreply, State}; handle_cast(store_repack_cursor, #state{ repack_cursor = Cursor, store_id = StoreID, target_packing = TargetPacking } = State) -> ar_repack:store_cursor(Cursor, StoreID, TargetPacking), + ar_entropy_gen:set_repack_cursor(StoreID, Cursor), {noreply, State}; handle_cast(repacking_complete, State) -> @@ -451,20 +447,22 @@ handle_cast({register_packing_ref, Ref, Args}, #state{ packing_map = Map } = Sta handle_cast({expire_repack_request, Ref}, #state{ packing_map = Map } = State) -> {noreply, State#state{ packing_map = maps:remove(Ref, Map) }}; +handle_cast({set_entropy_context, EntropyContext}, State) -> + {noreply, State#state{ entropy_context = EntropyContext }}; + handle_cast(Cast, State) -> ?LOG_WARNING([{event, unhandled_cast}, {module, ?MODULE}, {cast, Cast}]), {noreply, State}. 
handle_call({put, PaddedEndOffset, Chunk, Packing}, _From, State) when byte_size(Chunk) == ?DATA_CHUNK_SIZE -> - #state{ store_id = StoreID, store_id_label = StoreIDLabel, reward_addr = RewardAddr, - prepare_status = PrepareStatus, file_index = FileIndex } = State, + #state{ store_id = StoreID, store_id_label = StoreIDLabel, + entropy_context = EntropyContext, file_index = FileIndex } = State, - IsPrepared = PrepareStatus == complete, {PackingLabel, State2} = get_packing_label(Packing, State), Result = store_chunk( PaddedEndOffset, Chunk, Packing, StoreID, - StoreIDLabel, PackingLabel, FileIndex, IsPrepared, RewardAddr), + StoreIDLabel, PackingLabel, FileIndex, EntropyContext), case Result of {ok, FileIndex2, NewPacking} -> {reply, {ok, NewPacking}, State2#state{ file_index = FileIndex2 }}; @@ -520,11 +518,10 @@ handle_info({chunk, {packed, Ref, ChunkArgs}}, {noreply, State}; Args -> State2 = State#state{ packing_map = maps:remove(Ref, Map) }, - #state{ store_id = StoreID, reward_addr = RewardAddr, - prepare_status = PrepareStatus, file_index = FileIndex } = State2, - IsPrepared = PrepareStatus == complete, + #state{ store_id = StoreID, entropy_context = EntropyContext, + file_index = FileIndex } = State2, case ar_repack:chunk_repacked( - ChunkArgs, Args, StoreID, FileIndex, IsPrepared, RewardAddr) of + ChunkArgs, Args, StoreID, FileIndex, EntropyContext) of {ok, FileIndex2} -> {noreply, State2#state{ file_index = FileIndex2 }}; Error -> @@ -566,226 +563,26 @@ get_chunk_group_size() -> {ok, Config} = application:get_env(arweave, config), Config#config.chunk_storage_file_size. 
-do_prepare_replica_2_9(State) -> - #state{ reward_addr = RewardAddr, prepare_replica_2_9_cursor = Start, - range_start = RangeStart, range_end = RangeEnd, - store_id = StoreID, repack_cursor = RepackCursor, - prepare_slice_index = PreviousSliceIndex } = State, - - BucketEndOffset = get_chunk_bucket_end(Start), - PaddedRangeEnd = get_chunk_bucket_end(RangeEnd), - - %% Sanity checks: - BucketEndOffset = get_chunk_bucket_end(BucketEndOffset), - true = ( - get_chunk_bucket_start(ar_block:get_chunk_padded_offset(Start)) == - get_chunk_bucket_start(BucketEndOffset) - ), - true = ( - max(0, BucketEndOffset - ?DATA_CHUNK_SIZE) == get_chunk_bucket_start(BucketEndOffset) - ), - %% End of sanity checks. - - SliceIndex = ar_replica_2_9:get_slice_index(BucketEndOffset), - case SliceIndex of - _ when SliceIndex /= PreviousSliceIndex -> - %% Whenever the slice changes BucketEndOffset might be an offset that was - %% written to in a previous iteration. Furthermore it's possible (though unlikely), - %% that the write is still in process. So to make sure our "is recorded" checks - %% below consider all pending writes, we'll wait for the entropy storage process - %% to complete before proceeding. - %% - %% In practice we only expect pending writes to be a problem in tests. It can - %% hypothetically happen in production but is unlikely. 
- ?LOG_DEBUG([{event, prepare_replica_2_9_slice_changed}, {store_id, StoreID}, - {bucket_end_offset, BucketEndOffset}, - {previous_slice_index, PreviousSliceIndex}, - {slice_index, SliceIndex}]), - ar_entropy_storage:is_ready(StoreID); - _ -> - ok - end, - - CheckRangeEnd = - case BucketEndOffset > PaddedRangeEnd of - true -> - ar_device_lock:release_lock(prepare, StoreID), - ?LOG_INFO([{event, storage_module_replica_2_9_preparation_complete}, - {store_id, StoreID}]), - ar:console("The storage module ~s is prepared for 2.9 replication.~n", - [StoreID]), - complete; - false -> - false - end, - - Start2 = BucketEndOffset + ?DATA_CHUNK_SIZE, - State2 = State#state{ - prepare_replica_2_9_cursor = Start2, - prepare_slice_index = SliceIndex }, - CheckRepackCursor = - case CheckRangeEnd of - complete -> - complete; - false -> - case RepackCursor of - none -> - false; - _ -> - SectorSize = ar_replica_2_9:get_sector_size(), - RangeStart2 = get_chunk_bucket_start(RangeStart + 1), - RepackCursor2 = get_chunk_bucket_start(RepackCursor + 1), - RepackSectorShift = (RepackCursor2 - RangeStart2) rem SectorSize, - SectorShift = (BucketEndOffset - RangeStart2) rem SectorSize, - case SectorShift > RepackSectorShift of - true -> - waiting_for_repack; - false -> - false - end - end - end, - CheckIsRecorded = - case CheckRepackCursor of - complete -> - complete; - waiting_for_repack -> - waiting_for_repack; - false -> - ar_entropy_storage:is_entropy_recorded(BucketEndOffset, StoreID) - end, - - %% get_entropy_partition will use bucket *start* offset to determine the partition. - Partition = ar_replica_2_9:get_entropy_partition(BucketEndOffset), - StoreEntropy = - case CheckIsRecorded of - complete -> - complete; - waiting_for_repack -> - waiting_for_repack; - true -> - is_recorded; - false -> - %% Get all the entropies needed to encipher the chunk at BucketEndOffset. 
- Entropies = prometheus_histogram:observe_duration( - replica_2_9_entropy_duration_milliseconds, [32], - fun() -> - ar_entropy_storage:generate_entropies(RewardAddr, BucketEndOffset) - end), - case Entropies of - {error, Reason} -> - {error, Reason}; - _ -> - EntropyKeys = ar_entropy_storage:generate_entropy_keys( - RewardAddr, BucketEndOffset), - - %% A set of generated entropies covers slighly more than 3.6TB of - %% chunks, however we only want to use the first 3.6TB - %% (+ chunk padding) of it. - PartitionEnd = (Partition + 1) * ?PARTITION_SIZE, - PaddedPartitionEnd = - get_chunk_bucket_end( - ar_block:get_chunk_padded_offset(PartitionEnd)), - %% In addition to limiting this iteration to the PaddedPartitionEnd, - %% we also want to limit it to the current storage module's range. - %% This allows us to handle both the storage module range as well - %% as the small overlap region. - IterationEnd = min(PaddedPartitionEnd, RangeEnd), - %% Wait for the previous store_entropy to complete. Should only - %% return 'false' if the entropy storage process is down (e.g. 
during - %% shutdown) - case ar_entropy_storage:is_ready(StoreID) of - true -> - ar_entropy_storage:store_entropy( - StoreID, Entropies, BucketEndOffset, - IterationEnd, EntropyKeys, RewardAddr); - false -> - {error, entropy_storage_not_ready} - end - end - end, - ?LOG_DEBUG([{event, stored_replica_2_9_entropy}, {store_id, StoreID}, - {start, Start}, {bucket_end_offset, BucketEndOffset}, - {slice_index, ar_replica_2_9:get_slice_index(BucketEndOffset)}, - {range_start, RangeStart}, {range_end, RangeEnd}, - {partition, Partition}, - {repack_cursor, RepackCursor}, - {padded_range_end, PaddedRangeEnd}, - {check_is_recorded, CheckIsRecorded}, {store_entropy, StoreEntropy}]), - case StoreEntropy of - complete -> - State#state{ prepare_status = complete }; - waiting_for_repack -> - ?LOG_INFO([{event, waiting_for_repacking}, - {store_id, StoreID}, - {padded_end_offset, BucketEndOffset}, - {repack_cursor, RepackCursor}, - {cursor, Start}, - {range_start, RangeStart}, - {range_end, RangeEnd}]), - ar_util:cast_after(10000, self(), prepare_replica_2_9), - State; - is_recorded -> - gen_server:cast(self(), prepare_replica_2_9), - State2; - {error, Error} -> - ?LOG_WARNING([{event, failed_to_store_replica_2_9_entropy}, - {cursor, Start}, - {store_id, StoreID}, - {reason, io_lib:format("~p", [Error])}]), - ar_util:cast_after(500, self(), prepare_replica_2_9), - State; - ok -> - gen_server:cast(self(), prepare_replica_2_9), - case store_prepare_replica_2_9_cursor(Start2, StoreID) of - ok -> - ok; - {error, Error} -> - ?LOG_WARNING([{event, failed_to_store_prepare_replica_2_9_cursor}, - {chunk_cursor, Start2}, - {store_id, StoreID}, - {reason, io_lib:format("~p", [Error])}]) - end, - State2 - end. 
- -read_prepare_replica_2_9_cursor(StoreID, Default) -> - Filepath = get_filepath("prepare_replica_2_9_cursor", StoreID), - case file:read_file(Filepath) of - {ok, Bin} -> - case catch binary_to_term(Bin) of Cursor when is_integer(Cursor) -> - Cursor; - _ -> - Default - end; - _ -> - Default - end. - -store_prepare_replica_2_9_cursor(Cursor, StoreID) -> - Filepath = get_filepath("prepare_replica_2_9_cursor", StoreID), - file:write_file(Filepath, term_to_binary(Cursor)). - get_filepath(Name, StoreID) -> {ok, Config} = application:get_env(arweave, config), DataDir = Config#config.data_dir, ChunkDir = get_chunk_storage_path(DataDir, StoreID), filename:join([ChunkDir, Name]). -store_chunk(PaddedEndOffset, Chunk, Packing, StoreID, FileIndex, IsPrepared, RewardAddr) -> +store_chunk(PaddedEndOffset, Chunk, Packing, StoreID, FileIndex, PrepareContext) -> StoreIDLabel = ar_storage_module:label_by_id(StoreID), PackingLabel = ar_storage_module:packing_label(Packing), store_chunk(PaddedEndOffset, Chunk, Packing, StoreID, - StoreIDLabel, PackingLabel, FileIndex, IsPrepared, RewardAddr). + StoreIDLabel, PackingLabel, FileIndex, PrepareContext). store_chunk( PaddedEndOffset, Chunk, Packing, StoreID, StoreIDLabel, - PackingLabel, FileIndex, IsPrepared, RewardAddr) -> - case ar_entropy_storage:is_entropy_packing(Packing) of + PackingLabel, FileIndex, PrepareContext) -> + case ar_entropy_gen:is_entropy_packing(Packing) of true -> ar_entropy_storage:record_chunk( - PaddedEndOffset, Chunk, RewardAddr, StoreID, - StoreIDLabel, PackingLabel, FileIndex, IsPrepared); + PaddedEndOffset, Chunk, StoreID, + StoreIDLabel, PackingLabel, FileIndex, PrepareContext); false -> record_chunk( PaddedEndOffset, Chunk, Packing, StoreID, @@ -1237,71 +1034,6 @@ chunk_bucket_test() -> ?assertEqual(2621440, get_chunk_bucket_end(10 * ?DATA_CHUNK_SIZE)), ?assertEqual(2359296, get_chunk_bucket_start(10 * ?DATA_CHUNK_SIZE)). - -replica_2_9_test_() -> - {timeout, 20, fun test_replica_2_9/0}. 
- -test_replica_2_9() -> - RewardAddr = ar_wallet:to_address(ar_wallet:new_keyfile()), - Packing = {replica_2_9, RewardAddr}, - StorageModules = [ - {?PARTITION_SIZE, 0, Packing}, - {?PARTITION_SIZE, 1, Packing} - ], - {ok, Config} = application:get_env(arweave, config), - try - ar_test_node:start(#{ reward_addr => RewardAddr, storage_modules => StorageModules }), - StoreID1 = ar_storage_module:id(lists:nth(1, StorageModules)), - StoreID2 = ar_storage_module:id(lists:nth(2, StorageModules)), - C1 = crypto:strong_rand_bytes(?DATA_CHUNK_SIZE), - %% The replica_2_9 storage does not support updates and three chunks are written - %% into the first partition when the test node is launched. - ?assertEqual({error, already_stored}, - ar_chunk_storage:put(?DATA_CHUNK_SIZE, C1, Packing, StoreID1)), - ?assertEqual({error, already_stored}, - ar_chunk_storage:put(2 * ?DATA_CHUNK_SIZE, C1, Packing, StoreID1)), - ?assertEqual({error, already_stored}, - ar_chunk_storage:put(3 * ?DATA_CHUNK_SIZE, C1, Packing, StoreID1)), - - %% Store the new chunk. - ?assertEqual({ok, {replica_2_9, RewardAddr}}, - ar_chunk_storage:put(4 * ?DATA_CHUNK_SIZE, C1, Packing, StoreID1)), - {ok, P1, _Entropy} = - ar_packing_server:pack_replica_2_9_chunk(RewardAddr, 4 * ?DATA_CHUNK_SIZE, C1), - assert_get(P1, 4 * ?DATA_CHUNK_SIZE, StoreID1), - - assert_get(not_found, 8 * ?DATA_CHUNK_SIZE, StoreID1), - ?assertEqual({ok, {replica_2_9, RewardAddr}}, - ar_chunk_storage:put(8 * ?DATA_CHUNK_SIZE, C1, Packing, StoreID1)), - {ok, P2, _} = - ar_packing_server:pack_replica_2_9_chunk(RewardAddr, 8 * ?DATA_CHUNK_SIZE, C1), - assert_get(P2, 8 * ?DATA_CHUNK_SIZE, StoreID1), - - %% Store chunks in the second partition. 
- ?assertEqual({ok, {replica_2_9, RewardAddr}}, - ar_chunk_storage:put(12 * ?DATA_CHUNK_SIZE, C1, Packing, StoreID2)), - {ok, P3, Entropy3} = - ar_packing_server:pack_replica_2_9_chunk(RewardAddr, 12 * ?DATA_CHUNK_SIZE, C1), - - assert_get(P3, 12 * ?DATA_CHUNK_SIZE, StoreID2), - ?assertEqual({ok, {replica_2_9, RewardAddr}}, - ar_chunk_storage:put(15 * ?DATA_CHUNK_SIZE, C1, Packing, StoreID2)), - {ok, P4, Entropy4} = - ar_packing_server:pack_replica_2_9_chunk(RewardAddr, 15 * ?DATA_CHUNK_SIZE, C1), - assert_get(P4, 15 * ?DATA_CHUNK_SIZE, StoreID2), - ?assertNotEqual(P3, P4), - ?assertNotEqual(Entropy3, Entropy4), - - ?assertEqual({ok, {replica_2_9, RewardAddr}}, - ar_chunk_storage:put(16 * ?DATA_CHUNK_SIZE, C1, Packing, StoreID2)), - {ok, P5, Entropy5} = - ar_packing_server:pack_replica_2_9_chunk(RewardAddr, 16 * ?DATA_CHUNK_SIZE, C1), - assert_get(P5, 16 * ?DATA_CHUNK_SIZE, StoreID2), - ?assertNotEqual(Entropy4, Entropy5) - after - ok = application:set_env(arweave, config, Config) - end. - well_aligned_test_() -> {timeout, 20, fun test_well_aligned/0}. 
diff --git a/apps/arweave/src/ar_chunk_storage_sup.erl b/apps/arweave/src/ar_chunk_storage_sup.erl index 7680e35ee..1a2896240 100644 --- a/apps/arweave/src/ar_chunk_storage_sup.erl +++ b/apps/arweave/src/ar_chunk_storage_sup.erl @@ -22,49 +22,8 @@ start_link() -> init([]) -> ets:new(chunk_storage_file_index, [set, public, named_table, {read_concurrency, true}]), - {ok, Config} = application:get_env(arweave, config), - ConfiguredWorkers = lists:flatten( - lists:map( - fun(StorageModule) -> - StoreID = ar_storage_module:id(StorageModule), - ChunkStorageName = ar_chunk_storage:name(StoreID), - ChunkStorageWorker = ?CHILD_WITH_ARGS(ar_chunk_storage, worker, - ChunkStorageName, [ChunkStorageName, {StoreID, none}]), - - EntropyStorageName = ar_entropy_storage:name(StoreID), - EntropyStorageWorker = ?CHILD_WITH_ARGS(ar_entropy_storage, worker, - EntropyStorageName, [EntropyStorageName, StoreID]), - - [ChunkStorageWorker, EntropyStorageWorker] - end, - Config#config.storage_modules - ) - ), - - DefaultChunkStorageWorker = ?CHILD_WITH_ARGS(ar_chunk_storage, worker, - ar_chunk_storage_default, [ar_chunk_storage_default, {"default", none}]), - - RepackInPlaceWorkers = lists:flatten( - lists:map( - fun({StorageModule, Packing}) -> - StoreID = ar_storage_module:id(StorageModule), - %% Note: the config validation will prevent a StoreID from being used in both - %% `storage_modules` and `repack_in_place_storage_modules`, so there's - %% no risk of a `Name` clash with the workers spawned above. 
- ChunkStorageName = ar_chunk_storage:name(StoreID), - ChunkStorageWorker = ?CHILD_WITH_ARGS(ar_chunk_storage, worker, - ChunkStorageName, [ChunkStorageName, {StoreID, Packing}]), - - EntropyStorageName = ar_entropy_storage:name(StoreID), - EntropyStorageWorker = ?CHILD_WITH_ARGS(ar_entropy_storage, worker, - EntropyStorageName, [EntropyStorageName, StoreID]), - - [ChunkStorageWorker, EntropyStorageWorker] - end, - Config#config.repack_in_place_storage_modules - ) - ), - - Workers = [DefaultChunkStorageWorker] ++ ConfiguredWorkers ++ RepackInPlaceWorkers, + Workers = ar_chunk_storage:register_workers() ++ + ar_entropy_gen:register_workers(ar_entropy_gen) ++ + ar_entropy_gen:register_workers(ar_entropy_storage), {ok, {{one_for_one, 5, 10}, Workers}}. diff --git a/apps/arweave/src/ar_entropy_gen.erl b/apps/arweave/src/ar_entropy_gen.erl new file mode 100644 index 000000000..bc5e823ee --- /dev/null +++ b/apps/arweave/src/ar_entropy_gen.erl @@ -0,0 +1,463 @@ +-module(ar_entropy_gen). + +-behaviour(gen_server). + +-export([name/1, register_workers/1, initialize_context/2, is_entropy_packing/1, + set_repack_cursor/2, generate_entropies/2]). + +-export([start_link/2, init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2]). + +-include("../include/ar.hrl"). +-include("../include/ar_sup.hrl"). +-include("../include/ar_config.hrl"). +-include("../include/ar_consensus.hrl"). + +-include_lib("eunit/include/eunit.hrl"). + +-record(state, { + store_id, + packing, + range_start, + range_end, + cursor, + slice_index, + prepare_status = undefined, + repack_cursor +}). + +-ifdef(AR_TEST). +-define(DEVICE_LOCK_WAIT, 100). +-else. +-define(DEVICE_LOCK_WAIT, 5_000). +-endif. + +%%%=================================================================== +%%% Public interface. +%%%=================================================================== + +%% @doc Start the server. 
+start_link(Name, {StoreID, Packing}) -> + gen_server:start_link({local, Name}, ?MODULE, {StoreID, Packing}, []). + +%% @doc Return the name of the server serving the given StoreID. +name(StoreID) -> + list_to_atom("ar_entropy_gen_" ++ ar_storage_module:label_by_id(StoreID)). + + +register_workers(Module) -> + {ok, Config} = application:get_env(arweave, config), + ConfiguredWorkers = lists:filtermap( + fun(StorageModule) -> + StoreID = ar_storage_module:id(StorageModule), + Packing = ar_storage_module:get_packing(StoreID), + + case is_entropy_packing(Packing) of + true -> + Worker = ?CHILD_WITH_ARGS( + Module, worker, Module:name(StoreID), + [Module:name(StoreID), {StoreID, Packing}]), + {true, Worker}; + false -> + false + end + end, + Config#config.storage_modules + ), + + RepackInPlaceWorkers = lists:filtermap( + fun({StorageModule, Packing}) -> + StoreID = ar_storage_module:id(StorageModule), + %% Note: the config validation will prevent a StoreID from being used in both + %% `storage_modules` and `repack_in_place_storage_modules`, so there's + %% no risk of a `Name` clash with the workers spawned above. + case is_entropy_packing(Packing) of + true -> + Worker = ?CHILD_WITH_ARGS( + Module, worker, Module:name(StoreID), + [Module:name(StoreID), {StoreID, Packing}]), + {true, Worker}; + false -> + false + end + end, + Config#config.repack_in_place_storage_modules + ), + + ConfiguredWorkers ++ RepackInPlaceWorkers. + +initialize_context(StoreID, Packing) -> + case Packing of + {replica_2_9, Addr} -> + {RangeStart, RangeEnd} = ar_storage_module:get_range(StoreID), + Cursor = read_cursor(StoreID, RangeStart + 1), + case Cursor =< RangeEnd of + true -> + {false, Addr}; + false -> + {true, Addr} + end; + _ -> + {true, none} + end. + + +-spec is_entropy_packing(ar_chunk_storage:packing()) -> boolean(). +is_entropy_packing(unpacked_padded) -> + true; +is_entropy_packing({replica_2_9, _}) -> + true; +is_entropy_packing(_) -> + false. 
+
+set_repack_cursor(StoreID, RepackCursor) ->
+	gen_server:cast(name(StoreID), {set_repack_cursor, RepackCursor}).
+
+init({StoreID, Packing}) ->
+	?LOG_INFO([{event, ar_entropy_storage_init},
+		{name, name(StoreID)}, {store_id, StoreID},
+		{packing, ar_serialize:encode_packing(Packing, true)}]),
+
+	%% Sanity checks
+	{replica_2_9, _} = Packing,
+	%% End sanity checks
+
+	{RangeStart, RangeEnd} = ar_storage_module:get_range(StoreID),
+
+	Cursor = read_cursor(StoreID, RangeStart + 1),
+	?LOG_INFO([{event, read_prepare_replica_2_9_cursor}, {store_id, StoreID},
+		{cursor, Cursor}, {range_start, RangeStart},
+		{range_end, RangeEnd}]),
+	PrepareStatus =
+		case initialize_context(StoreID, Packing) of
+			{_, none} ->
+				%% ar_entropy_gen is only used for replica_2_9 packing
+				?LOG_ERROR([{event, invalid_packing_for_entropy}, {module, ?MODULE},
+					{store_id, StoreID},
+					{packing, ar_serialize:encode_packing(Packing, true)}]),
+				off;
+			{false, _} ->
+				gen_server:cast(self(), prepare_entropy),
+				paused;
+			_ ->
+				complete
+		end,
+	BucketEndOffset = ar_chunk_storage:get_chunk_bucket_end(Cursor),
+	RepackCursor =
+		case Packing == ar_storage_module:get_packing(StoreID) of
+			true ->
+				none;
+			false ->
+				%% Provided Packing will only differ from the StoreID packing when this
+				%% module is configured to repack in place.
+				ar_repack:read_cursor(StoreID, Packing, RangeStart)
+		end,
+	State = #state{
+		store_id = StoreID,
+		packing = Packing,
+		range_start = RangeStart,
+		range_end = RangeEnd,
+		cursor = Cursor,
+		slice_index = ar_replica_2_9:get_slice_index(BucketEndOffset),
+		prepare_status = PrepareStatus,
+		repack_cursor = RepackCursor
+	},
+	{ok, State}.
+ + +handle_cast(prepare_entropy, State) -> + #state{ store_id = StoreID } = State, + NewStatus = ar_device_lock:acquire_lock(prepare, StoreID, State#state.prepare_status), + State2 = State#state{ prepare_status = NewStatus }, + State3 = case NewStatus of + active -> + do_prepare_entropy(State2); + paused -> + ar_util:cast_after(?DEVICE_LOCK_WAIT, self(), prepare_entropy), + State2; + _ -> + State2 + end, + {noreply, State3}; + +handle_cast({set_repack_cursor, RepackCursor}, State) -> + {noreply, State#state{ repack_cursor = RepackCursor }}; + +handle_cast(Cast, State) -> + ?LOG_WARNING([{event, unhandled_cast}, {module, ?MODULE}, {cast, Cast}]), + {noreply, State}. + +handle_call(Call, _From, State) -> + ?LOG_WARNING([{event, unhandled_call}, {module, ?MODULE}, {call, Call}]), + {reply, {error, unhandled_call}, State}. + +handle_info(Info, State) -> + ?LOG_WARNING([{event, unhandled_info}, {module, ?MODULE}, {info, Info}]), + {noreply, State}. + +terminate(Reason, State) -> + ?LOG_INFO([{event, terminate}, {module, ?MODULE}, + {reason, Reason}, {name, name(State#state.store_id)}, + {store_id, State#state.store_id}]), + ok. + +do_prepare_entropy(State) -> + #state{ + cursor = Start, range_start = RangeStart, range_end = RangeEnd, + packing = {replica_2_9, RewardAddr}, + store_id = StoreID, repack_cursor = RepackCursor, slice_index = PreviousSliceIndex + } = State, + + BucketEndOffset = ar_chunk_storage:get_chunk_bucket_end(Start), + PaddedRangeEnd = ar_chunk_storage:get_chunk_bucket_end(RangeEnd), + + %% Sanity checks: + BucketEndOffset = ar_chunk_storage:get_chunk_bucket_end(BucketEndOffset), + true = ( + ar_chunk_storage:get_chunk_bucket_start(ar_block:get_chunk_padded_offset(Start)) == + ar_chunk_storage:get_chunk_bucket_start(BucketEndOffset) + ), + true = ( + max(0, BucketEndOffset - ?DATA_CHUNK_SIZE) == + ar_chunk_storage:get_chunk_bucket_start(BucketEndOffset) + ), + %% End of sanity checks. 
+ + SliceIndex = ar_replica_2_9:get_slice_index(BucketEndOffset), + case SliceIndex of + _ when SliceIndex /= PreviousSliceIndex -> + %% Whenever the slice changes BucketEndOffset might be an offset that was + %% written to in a previous iteration. Furthermore it's possible (though unlikely), + %% that the write is still in process. So to make sure our "is recorded" checks + %% below consider all pending writes, we'll wait for the entropy storage process + %% to complete before proceeding. + %% + %% In practice we only expect pending writes to be a problem in tests. It can + %% hypothetically happen in production but is unlikely. + ?LOG_DEBUG([{event, prepare_replica_2_9_slice_changed}, {store_id, StoreID}, + {bucket_end_offset, BucketEndOffset}, + {previous_slice_index, PreviousSliceIndex}, + {slice_index, SliceIndex}]), + ar_entropy_storage:is_ready(StoreID); + _ -> + ok + end, + + CheckRangeEnd = + case BucketEndOffset > PaddedRangeEnd of + true -> + ar_device_lock:release_lock(prepare, StoreID), + ?LOG_INFO([{event, storage_module_entropy_preparation_complete}, + {store_id, StoreID}]), + ar:console("The storage module ~s is prepared for 2.9 replication.~n", + [StoreID]), + ar_chunk_storage:set_entropy_context(StoreID, {true, RewardAddr}), + complete; + false -> + false + end, + + Start2 = BucketEndOffset + ?DATA_CHUNK_SIZE, + State2 = State#state{ cursor = Start2, slice_index = SliceIndex }, + CheckRepackCursor = + case CheckRangeEnd of + complete -> + complete; + false -> + case RepackCursor of + none -> + false; + _ -> + SectorSize = ar_replica_2_9:get_sector_size(), + RangeStart2 = + ar_chunk_storage:get_chunk_bucket_start(RangeStart + 1), + RepackCursor2 = + ar_chunk_storage:get_chunk_bucket_start(RepackCursor + 1), + RepackSectorShift = (RepackCursor2 - RangeStart2) rem SectorSize, + SectorShift = (BucketEndOffset - RangeStart2) rem SectorSize, + case SectorShift > RepackSectorShift of + true -> + waiting_for_repack; + false -> + false + end + end + end, + 
CheckIsRecorded =
+		case CheckRepackCursor of
+			complete ->
+				complete;
+			waiting_for_repack ->
+				waiting_for_repack;
+			false ->
+				ar_entropy_storage:is_entropy_recorded(BucketEndOffset, StoreID)
+		end,
+
+	%% get_entropy_partition will use bucket *start* offset to determine the partition.
+	Partition = ar_replica_2_9:get_entropy_partition(BucketEndOffset),
+	StoreEntropy =
+		case CheckIsRecorded of
+			complete ->
+				complete;
+			waiting_for_repack ->
+				waiting_for_repack;
+			true ->
+				is_recorded;
+			false ->
+				%% Get all the entropies needed to encipher the chunk at BucketEndOffset.
+				Entropies = prometheus_histogram:observe_duration(
+					replica_2_9_entropy_duration_milliseconds, [32],
+					fun() ->
+						generate_entropies(RewardAddr, BucketEndOffset)
+					end),
+				case Entropies of
+					{error, Reason} ->
+						{error, Reason};
+					_ ->
+						EntropyKeys = generate_entropy_keys(RewardAddr, BucketEndOffset),
+
+						%% A set of generated entropies covers slightly more than 3.6TB of
+						%% chunks, however we only want to use the first 3.6TB
+						%% (+ chunk padding) of it.
+						PartitionEnd = (Partition + 1) * ?PARTITION_SIZE,
+						PaddedPartitionEnd =
+							ar_chunk_storage:get_chunk_bucket_end(
+								ar_block:get_chunk_padded_offset(PartitionEnd)),
+						%% In addition to limiting this iteration to the PaddedPartitionEnd,
+						%% we also want to limit it to the current storage module's range.
+						%% This allows us to handle both the storage module range as well
+						%% as the small overlap region.
+						IterationEnd = min(PaddedPartitionEnd, RangeEnd),
+						%% Wait for the previous store_entropy to complete. Should only
+						%% return 'false' if the entropy storage process is down (e.g.
during + %% shutdown) + case ar_entropy_storage:is_ready(StoreID) of + true -> + ar_entropy_storage:store_entropy( + StoreID, Entropies, BucketEndOffset, + IterationEnd, EntropyKeys, RewardAddr); + false -> + {error, entropy_storage_not_ready} + end + end + end, + ?LOG_DEBUG([{event, stored_entropy}, {store_id, StoreID}, + {start, Start}, {bucket_end_offset, BucketEndOffset}, + {slice_index, ar_replica_2_9:get_slice_index(BucketEndOffset)}, + {range_start, RangeStart}, {range_end, RangeEnd}, + {partition, Partition}, + {repack_cursor, RepackCursor}, + {padded_range_end, PaddedRangeEnd}, + {check_is_recorded, CheckIsRecorded}, {store_entropy, StoreEntropy}]), + case StoreEntropy of + complete -> + State#state{ prepare_status = complete }; + waiting_for_repack -> + ?LOG_INFO([{event, waiting_for_repacking}, + {store_id, StoreID}, + {padded_end_offset, BucketEndOffset}, + {repack_cursor, RepackCursor}, + {cursor, Start}, + {range_start, RangeStart}, + {range_end, RangeEnd}]), + ar_util:cast_after(10000, self(), prepare_entropy), + State; + is_recorded -> + gen_server:cast(self(), prepare_entropy), + State2; + {error, Error} -> + ?LOG_WARNING([{event, failed_to_store_entropy}, + {cursor, Start}, + {store_id, StoreID}, + {reason, io_lib:format("~p", [Error])}]), + ar_util:cast_after(500, self(), prepare_entropy), + State; + ok -> + gen_server:cast(self(), prepare_entropy), + case store_cursor(Start2, StoreID) of + ok -> + ok; + {error, Error} -> + ?LOG_WARNING([{event, failed_to_store_prepare_entropy_cursor}, + {chunk_cursor, Start2}, + {store_id, StoreID}, + {reason, io_lib:format("~p", [Error])}]) + end, + State2 + end. + +%% @doc Returns all the entropies needed to encipher the chunk at PaddedEndOffset. 
+generate_entropies(RewardAddr, PaddedEndOffset) -> + SubChunkSize = ?COMPOSITE_PACKING_SUB_CHUNK_SIZE, + EntropyTasks = lists:map( + fun(Offset) -> + Ref = make_ref(), + ar_packing_server:request_entropy_generation( + Ref, self(), {RewardAddr, PaddedEndOffset, Offset}), + Ref + end, + lists:seq(0, ?DATA_CHUNK_SIZE - SubChunkSize, SubChunkSize) + ), + Entropies = collect_entropies(EntropyTasks, []), + case Entropies of + {error, _Reason} -> + flush_entropy_messages(); + _ -> + ok + end, + Entropies. + +generate_entropy_keys(RewardAddr, Offset) -> + generate_entropy_keys(RewardAddr, Offset, 0). +generate_entropy_keys(_RewardAddr, _Offset, SubChunkStart) + when SubChunkStart == ?DATA_CHUNK_SIZE -> + []; +generate_entropy_keys(RewardAddr, Offset, SubChunkStart) -> + SubChunkSize = ?COMPOSITE_PACKING_SUB_CHUNK_SIZE, + [ar_replica_2_9:get_entropy_key(RewardAddr, Offset, SubChunkStart) + | generate_entropy_keys(RewardAddr, Offset, SubChunkStart + SubChunkSize)]. + +collect_entropies([], Acc) -> + lists:reverse(Acc); +collect_entropies([Ref | Rest], Acc) -> + receive + {entropy_generated, Ref, {error, Reason}} -> + ?LOG_ERROR([{event, failed_to_generate_replica_2_9_entropy}, {error, Reason}]), + {error, Reason}; + {entropy_generated, Ref, Entropy} -> + collect_entropies(Rest, [Entropy | Acc]) + after 60000 -> + ?LOG_ERROR([{event, entropy_generation_timeout}, {ref, Ref}]), + {error, timeout} + end. + +flush_entropy_messages() -> + ?LOG_INFO([{event, flush_entropy_messages}]), + receive + {entropy_generated, _, _} -> + flush_entropy_messages() + after 0 -> + ok + end. + + + +read_cursor(StoreID, Default) -> + Filepath = ar_chunk_storage:get_filepath("prepare_replica_2_9_cursor", StoreID), + case file:read_file(Filepath) of + {ok, Bin} -> + case catch binary_to_term(Bin) of Cursor when is_integer(Cursor) -> + Cursor; + _ -> + Default + end; + _ -> + Default + end. 
+ +store_cursor(Cursor, StoreID) -> + Filepath = ar_chunk_storage:get_filepath("prepare_replica_2_9_cursor", StoreID), + file:write_file(Filepath, term_to_binary(Cursor)). + +%%%=================================================================== +%%% Tests. +%%%=================================================================== diff --git a/apps/arweave/src/ar_entropy_storage.erl b/apps/arweave/src/ar_entropy_storage.erl index 211df8c58..0577a579d 100644 --- a/apps/arweave/src/ar_entropy_storage.erl +++ b/apps/arweave/src/ar_entropy_storage.erl @@ -2,9 +2,8 @@ -behaviour(gen_server). --export([name/1, is_entropy_packing/1, acquire_semaphore/1, release_semaphore/1, is_ready/1, - is_entropy_recorded/2, delete_record/2, generate_entropies/2, - generate_missing_entropy/2, generate_entropy_keys/2, store_entropy/6, record_chunk/8]). +-export([name/1, acquire_semaphore/1, release_semaphore/1, is_ready/1, + is_entropy_recorded/2, delete_record/2, store_entropy/6, record_chunk/7]). -export([start_link/2, init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2]). @@ -22,7 +21,7 @@ %%%=================================================================== %% @doc Start the server. -start_link(Name, StoreID) -> +start_link(Name, {StoreID, _}) -> gen_server:start_link({local, Name}, ?MODULE, StoreID, []). %% @doc Return the name of the server serving the given StoreID. @@ -73,14 +72,6 @@ handle_info(Info, State) -> ?LOG_WARNING([{event, unhandled_info}, {module, ?MODULE}, {info, Info}]), {noreply, State}. --spec is_entropy_packing(ar_chunk_storage:packing()) -> boolean(). -is_entropy_packing(unpacked_padded) -> - true; -is_entropy_packing({replica_2_9, _}) -> - true; -is_entropy_packing(_) -> - false. - %% @doc Return true if the 2.9 entropy with the given offset is recorded. is_entropy_recorded(PaddedEndOffset, StoreID) -> %% Entropy indexing changed between 2.9.0 and 2.9.1. 
So we'll use a new @@ -131,7 +122,7 @@ delete_record(PaddedEndOffset, StoreID) -> ar_sync_record:delete(BucketStart + ?DATA_CHUNK_SIZE, BucketStart, ID, StoreID). generate_missing_entropy(PaddedEndOffset, RewardAddr) -> - Entropies = generate_entropies(RewardAddr, PaddedEndOffset), + Entropies = ar_entropy_gen:generate_entropies(RewardAddr, PaddedEndOffset), case Entropies of {error, Reason} -> {error, Reason}; @@ -140,60 +131,6 @@ generate_missing_entropy(PaddedEndOffset, RewardAddr) -> take_combined_entropy_by_index(Entropies, EntropyIndex) end. -%% @doc Returns all the entropies needed to encipher the chunk at PaddedEndOffset. -generate_entropies(RewardAddr, PaddedEndOffset) -> - SubChunkSize = ?COMPOSITE_PACKING_SUB_CHUNK_SIZE, - EntropyTasks = lists:map( - fun(Offset) -> - Ref = make_ref(), - ar_packing_server:request_entropy_generation( - Ref, self(), {RewardAddr, PaddedEndOffset, Offset}), - Ref - end, - lists:seq(0, ?DATA_CHUNK_SIZE - SubChunkSize, SubChunkSize) - ), - Entropies = collect_entropies(EntropyTasks, []), - case Entropies of - {error, _Reason} -> - flush_entropy_messages(); - _ -> - ok - end, - Entropies. - -generate_entropy_keys(RewardAddr, Offset) -> - generate_entropy_keys(RewardAddr, Offset, 0). -generate_entropy_keys(_RewardAddr, _Offset, SubChunkStart) - when SubChunkStart == ?DATA_CHUNK_SIZE -> - []; -generate_entropy_keys(RewardAddr, Offset, SubChunkStart) -> - SubChunkSize = ?COMPOSITE_PACKING_SUB_CHUNK_SIZE, - [ar_replica_2_9:get_entropy_key(RewardAddr, Offset, SubChunkStart) - | generate_entropy_keys(RewardAddr, Offset, SubChunkStart + SubChunkSize)]. 
- -collect_entropies([], Acc) -> - lists:reverse(Acc); -collect_entropies([Ref | Rest], Acc) -> - receive - {entropy_generated, Ref, {error, Reason}} -> - ?LOG_ERROR([{event, failed_to_generate_replica_2_9_entropy}, {error, Reason}]), - {error, Reason}; - {entropy_generated, Ref, Entropy} -> - collect_entropies(Rest, [Entropy | Acc]) - after 60000 -> - ?LOG_ERROR([{event, entropy_generation_timeout}, {ref, Ref}]), - {error, timeout} - end. - -flush_entropy_messages() -> - ?LOG_INFO([{event, flush_entropy_messages}]), - receive - {entropy_generated, _, _} -> - flush_entropy_messages() - after 0 -> - ok - end. - do_store_entropy(_Entropies, BucketEndOffset, RangeEnd, @@ -243,8 +180,8 @@ do_store_entropy(Entropies, end. record_chunk( - PaddedEndOffset, Chunk, RewardAddr, StoreID, - StoreIDLabel, PackingLabel, FileIndex, IsPrepared) -> + PaddedEndOffset, Chunk, StoreID, + StoreIDLabel, PackingLabel, FileIndex, {IsPrepared, RewardAddr}) -> %% Sanity checks true = PaddedEndOffset == ar_block:get_chunk_padded_offset(PaddedEndOffset), %% End sanity checks @@ -568,3 +505,95 @@ assert_reset_entropy_offset(ExpectedShiftedOffset, Offset) -> iolist_to_binary(io_lib:format("Offset: ~p, BucketEndOffset: ~p", [Offset, BucketEndOffset])) ). + + +replica_2_9_test_() -> + {timeout, 20, fun test_replica_2_9/0}. 
+ +test_replica_2_9() -> + RewardAddr = ar_wallet:to_address(ar_wallet:new_keyfile()), + Packing = {replica_2_9, RewardAddr}, + StorageModules = [ + {?PARTITION_SIZE, 0, Packing}, + {?PARTITION_SIZE, 1, Packing} + ], + {ok, Config} = application:get_env(arweave, config), + try + ar_test_node:start(#{ reward_addr => RewardAddr, storage_modules => StorageModules }), + StoreID1 = ar_storage_module:id(lists:nth(1, StorageModules)), + StoreID2 = ar_storage_module:id(lists:nth(2, StorageModules)), + C1 = crypto:strong_rand_bytes(?DATA_CHUNK_SIZE), + %% The replica_2_9 storage does not support updates and three chunks are written + %% into the first partition when the test node is launched. + ?assertEqual({error, already_stored}, + ar_chunk_storage:put(?DATA_CHUNK_SIZE, C1, Packing, StoreID1)), + ?assertEqual({error, already_stored}, + ar_chunk_storage:put(2 * ?DATA_CHUNK_SIZE, C1, Packing, StoreID1)), + ?assertEqual({error, already_stored}, + ar_chunk_storage:put(3 * ?DATA_CHUNK_SIZE, C1, Packing, StoreID1)), + + %% Store the new chunk. + ?assertEqual({ok, {replica_2_9, RewardAddr}}, + ar_chunk_storage:put(4 * ?DATA_CHUNK_SIZE, C1, Packing, StoreID1)), + {ok, P1, _Entropy} = + ar_packing_server:pack_replica_2_9_chunk(RewardAddr, 4 * ?DATA_CHUNK_SIZE, C1), + assert_get(P1, 4 * ?DATA_CHUNK_SIZE, StoreID1), + + assert_get(not_found, 8 * ?DATA_CHUNK_SIZE, StoreID1), + ?assertEqual({ok, {replica_2_9, RewardAddr}}, + ar_chunk_storage:put(8 * ?DATA_CHUNK_SIZE, C1, Packing, StoreID1)), + {ok, P2, _} = + ar_packing_server:pack_replica_2_9_chunk(RewardAddr, 8 * ?DATA_CHUNK_SIZE, C1), + assert_get(P2, 8 * ?DATA_CHUNK_SIZE, StoreID1), + + %% Store chunks in the second partition. 
+ ?assertEqual({ok, {replica_2_9, RewardAddr}}, + ar_chunk_storage:put(12 * ?DATA_CHUNK_SIZE, C1, Packing, StoreID2)), + {ok, P3, Entropy3} = + ar_packing_server:pack_replica_2_9_chunk(RewardAddr, 12 * ?DATA_CHUNK_SIZE, C1), + + assert_get(P3, 12 * ?DATA_CHUNK_SIZE, StoreID2), + ?assertEqual({ok, {replica_2_9, RewardAddr}}, + ar_chunk_storage:put(15 * ?DATA_CHUNK_SIZE, C1, Packing, StoreID2)), + {ok, P4, Entropy4} = + ar_packing_server:pack_replica_2_9_chunk(RewardAddr, 15 * ?DATA_CHUNK_SIZE, C1), + assert_get(P4, 15 * ?DATA_CHUNK_SIZE, StoreID2), + ?assertNotEqual(P3, P4), + ?assertNotEqual(Entropy3, Entropy4), + + ?assertEqual({ok, {replica_2_9, RewardAddr}}, + ar_chunk_storage:put(16 * ?DATA_CHUNK_SIZE, C1, Packing, StoreID2)), + {ok, P5, Entropy5} = + ar_packing_server:pack_replica_2_9_chunk(RewardAddr, 16 * ?DATA_CHUNK_SIZE, C1), + assert_get(P5, 16 * ?DATA_CHUNK_SIZE, StoreID2), + ?assertNotEqual(Entropy4, Entropy5) + after + ok = application:set_env(arweave, config, Config) + end. + + + +assert_get(Expected, Offset) -> + assert_get(Expected, Offset, "default"). 
+ +assert_get(Expected, Offset, StoreID) -> + ExpectedResult = + case Expected of + not_found -> + not_found; + _ -> + {Offset, Expected} + end, + ?assertEqual(ExpectedResult, ar_chunk_storage:get(Offset - 1, StoreID)), + ?assertEqual(ExpectedResult, ar_chunk_storage:get(Offset - 2, StoreID)), + ?assertEqual(ExpectedResult, ar_chunk_storage:get(Offset - ?DATA_CHUNK_SIZE, StoreID)), + ?assertEqual(ExpectedResult, ar_chunk_storage:get(Offset - ?DATA_CHUNK_SIZE + 1, StoreID)), + ?assertEqual(ExpectedResult, ar_chunk_storage:get(Offset - ?DATA_CHUNK_SIZE + 2, StoreID)), + ?assertEqual(ExpectedResult, + ar_chunk_storage:get(Offset - ?DATA_CHUNK_SIZE div 2, StoreID)), + ?assertEqual(ExpectedResult, + ar_chunk_storage:get(Offset - ?DATA_CHUNK_SIZE div 2 + 1, StoreID)), + ?assertEqual(ExpectedResult, + ar_chunk_storage:get(Offset - ?DATA_CHUNK_SIZE div 2 - 1, StoreID)), + ?assertEqual(ExpectedResult, + ar_chunk_storage:get(Offset - ?DATA_CHUNK_SIZE div 3, StoreID)). \ No newline at end of file diff --git a/apps/arweave/src/ar_repack.erl b/apps/arweave/src/ar_repack.erl index b62fc0fbd..08f22cd79 100644 --- a/apps/arweave/src/ar_repack.erl +++ b/apps/arweave/src/ar_repack.erl @@ -1,6 +1,6 @@ -module(ar_repack). --export([read_cursor/3, store_cursor/3, repack/5, chunk_repacked/6]). +-export([read_cursor/3, store_cursor/3, repack/5, chunk_repacked/5]). -include("../include/ar.hrl"). -include("../include/ar_consensus.hrl"). @@ -307,7 +307,7 @@ send_chunk_for_repacking(AbsoluteOffset, ChunkMeta, Args) -> ok end. 
-chunk_repacked(ChunkArgs, Args, StoreID, FileIndex, IsPrepared, RewardAddr) -> +chunk_repacked(ChunkArgs, Args, StoreID, FileIndex, EntropyContext) -> {Packing, Chunk, Offset, _, ChunkSize} = ChunkArgs, PaddedEndOffset = ar_block:get_chunk_padded_offset(Offset), StartOffset = PaddedEndOffset - ?DATA_CHUNK_SIZE, @@ -331,7 +331,7 @@ chunk_repacked(ChunkArgs, Args, StoreID, FileIndex, IsPrepared, RewardAddr) -> {ok, FileIndex}; {ok, true} -> StoreResults = ar_chunk_storage:store_chunk(PaddedEndOffset, Chunk, Packing, - StoreID, FileIndex, IsPrepared, RewardAddr), + StoreID, FileIndex, EntropyContext), case StoreResults of {ok, FileIndex2, NewPacking} -> ar_sync_record:add_async(repacked_chunk, From 981ded9406c2a8a31b38fe3f2f6a34ea511c5d81 Mon Sep 17 00:00:00 2001 From: James Piechota Date: Sat, 25 Jan 2025 12:35:21 +0000 Subject: [PATCH 2/2] WIP --- apps/arweave/src/ar_chunk_storage.erl | 1 + apps/arweave/src/ar_data_sync.erl | 3 +++ apps/arweave/src/ar_entropy_gen.erl | 4 +++- 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/apps/arweave/src/ar_chunk_storage.erl b/apps/arweave/src/ar_chunk_storage.erl index 1a29646b5..677af8c1c 100644 --- a/apps/arweave/src/ar_chunk_storage.erl +++ b/apps/arweave/src/ar_chunk_storage.erl @@ -554,6 +554,7 @@ terminate(_Reason, #state{ repack_cursor = Cursor, store_id = StoreID, target_packing = TargetPacking }) -> sync_and_close_files(), ar_repack:store_cursor(Cursor, StoreID, TargetPacking), + ar_entropy_gen:set_repack_cursor(StoreID, Cursor), ok. 
%%%=================================================================== diff --git a/apps/arweave/src/ar_data_sync.erl b/apps/arweave/src/ar_data_sync.erl index d701cb7e3..34c5e1e82 100644 --- a/apps/arweave/src/ar_data_sync.erl +++ b/apps/arweave/src/ar_data_sync.erl @@ -2878,6 +2878,9 @@ write_not_blacklisted_chunk(Offset, ChunkDataKey, Chunk, ChunkSize, DataPath, Pa false -> case put_chunk_data(ChunkDataKey, StoreID, {Chunk, DataPath}) of ok -> + PackingLabel = ar_storage_module:packing_label(Packing), + StoreIDLabel = ar_storage_module:label_by_id(StoreID), + prometheus_counter:inc(chunks_stored, [PackingLabel, StoreIDLabel]), {ok, Packing}; Error -> Error diff --git a/apps/arweave/src/ar_entropy_gen.erl b/apps/arweave/src/ar_entropy_gen.erl index bc5e823ee..dd28c955c 100644 --- a/apps/arweave/src/ar_entropy_gen.erl +++ b/apps/arweave/src/ar_entropy_gen.erl @@ -124,7 +124,7 @@ init({StoreID, Packing}) -> {RangeStart, RangeEnd} = ar_storage_module:get_range(StoreID), Cursor = read_cursor(StoreID, RangeStart + 1), - ?LOG_INFO([{event, read_prepare_replica_2_9_cursor}, {store_id, StoreID}, + ?LOG_INFO([{event, read_entropy_gen_cursor}, {store_id, StoreID}, {cursor, Cursor}, {range_start, RangeStart}, {range_end, RangeEnd}]), PrepareStatus = @@ -136,9 +136,11 @@ init({StoreID, Packing}) -> {packing, ar_serialize:encode_packing(Packing, true)}]), off; {false, _} -> + %% Entropy generation is not complete gen_server:cast(self(), prepare_entropy), paused; _ -> + %% Entropy generation is complete complete end, BucketEndOffset = ar_chunk_storage:get_chunk_bucket_end(Cursor),