Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ar_data_sync: Skip unpacking if the chunk was downloaded from the local peer #637

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion apps/arweave/include/ar_data_sync.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -211,5 +211,7 @@
%% fragmentation.
store_chunk_queue_threshold = ?STORE_CHUNK_QUEUE_FLUSH_SIZE_THRESHOLD,
%% Cache mapping peers to /data_sync_record responses
all_peers_intervals = #{}
all_peers_intervals = #{},
%% List of local peers used to check if we need to skip block verification.
local_peers = []
}).
19 changes: 13 additions & 6 deletions apps/arweave/src/ar_data_sync.erl
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,8 @@ init({"default" = StoreID, _}) ->
weave_size = maps:get(weave_size, StateMap),
disk_pool_cursor = first,
disk_pool_threshold = DiskPoolThreshold,
store_id = StoreID
store_id = StoreID,
local_peers = Config#config.local_peers
},
timer:apply_interval(?REMOVE_EXPIRED_DATA_ROOTS_FREQUENCY_MS, ?MODULE,
remove_expired_disk_pool_data_roots, []),
Expand Down Expand Up @@ -682,18 +683,20 @@ init({StoreID, RepackInPlacePacking}) ->
process_flag(trap_exit, true),
[ok, ok] = ar_events:subscribe([node_state, disksup]),
State = init_kv(StoreID),
{ok, Config} = application:get_env(arweave, config),
State2 = State#sync_data_state{local_peers = Config#config.local_peers},
case RepackInPlacePacking of
none ->
gen_server:cast(self(), process_store_chunk_queue),
{RangeStart, RangeEnd} = ar_storage_module:get_range(StoreID),
State2 = State#sync_data_state{
State3 = State2#sync_data_state{
store_id = StoreID,
range_start = RangeStart,
range_end = RangeEnd
},
{ok, may_be_start_syncing(State2)};
{ok, may_be_start_syncing(State3)};
_ ->
{ok, State}
{ok, State2}
end.

handle_cast({move_data_root_index, Cursor, N}, State) ->
Expand Down Expand Up @@ -1078,6 +1081,7 @@ handle_cast({store_fetched_chunk, Peer, Byte, Proof} = Cast, State) ->
Offset = SeekByte - BlockStartOffset,
ValidateDataPathRuleset = ar_poa:get_data_path_validation_ruleset(BlockStartOffset,
get_merkle_rebase_threshold()),
IsLocalPeer = lists:member(Peer, State#sync_data_state.local_peers),
shizzard marked this conversation as resolved.
Show resolved Hide resolved
case validate_proof(TXRoot, BlockStartOffset, Offset, BlockSize, Proof,
ValidateDataPathRuleset) of
{need_unpacking, AbsoluteOffset, ChunkArgs, VArgs} ->
Expand All @@ -1089,6 +1093,8 @@ handle_cast({store_fetched_chunk, Peer, Byte, Proof} = Cast, State) ->
true ->
decrement_chunk_cache_size(),
{noreply, State};
false when IsLocalPeer ->
process_valid_fetched_chunk(ChunkArgs, Args, State);
shizzard marked this conversation as resolved.
Show resolved Hide resolved
false ->
case ar_packing_server:is_buffer_full() of
true ->
Expand All @@ -1106,8 +1112,9 @@ handle_cast({store_fetched_chunk, Peer, Byte, Proof} = Cast, State) ->
{AbsoluteOffset, unpacked}}),
{noreply, State#sync_data_state{
packing_map = PackingMap#{
{AbsoluteOffset, unpacked} => {unpack_fetched_chunk,
Args} } }}
{AbsoluteOffset, unpacked} => {unpack_fetched_chunk, Args}
}
}}
end
end;
false ->
Expand Down
21 changes: 15 additions & 6 deletions apps/arweave/src/ar_http_iface_middleware.erl
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ handle(<<"GET">>, [<<"recent">>], Req, _Pid) ->
true ->
{200, #{}, ar_serialize:jsonify(ar_info:get_recent()), Req}
end;

handle(<<"GET">>, [<<"is_tx_blacklisted">>, EncodedTXID], Req, _Pid) ->
case ar_util:safe_decode(EncodedTXID) of
{error, invalid} ->
Expand Down Expand Up @@ -2008,13 +2008,19 @@ handle_get_chunk(OffsetBinary, Req, Encoding) ->
{Packing, ok};
{{true, _}, _StoreID} ->
{ok, Config} = application:get_env(arweave, config),
case lists:member(pack_served_chunks, Config#config.enable) of
false ->
{none, {reply, {404, #{}, <<>>, Req}}};
true ->
IsPackServedChunks = lists:member(pack_served_chunks, Config#config.enable),
Peer = cowboy_req:peer(Req),
IsLocalPeerAddr = lists:foldl(fun
(_, true) -> true;
(LocalPeer, _) -> is_matching_addr(Peer, LocalPeer)
end, false, Config#config.local_peers),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

local_peers should be all in the same format so I think we can just use lists:member

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually not, that's the problem here.
Cowboy uses standard Erlang address tuple: {{O1, O2, O3, O4}, Port}, while local peers are parsed into five-tuple: {O1, O2, O3, O4, Port}.
Probably should change this, but I'm afraid I can suddenly break something. Should I try?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shizzard unfortunately it looks ar_util:safe_parse_peer and ar_util:parse_peer are used to parse all config peers and they always dump to the 5 element tuple. I agree probably too risky to update that now, but can you log a github issue? Seems like something we should rationalize with cowboy.

FWIW it looks like the parse_peer code is from 7 years ago (!)

case {IsPackServedChunks, IsLocalPeerAddr} of
{true, true} ->
ok = ar_semaphore:acquire(get_and_pack_chunk,
infinity),
{RequestedPacking, ok}
{RequestedPacking, ok};
{_, _} ->
{none, {reply, {404, #{}, <<>>, Req}}}
end
end,
case CheckRecords of
Expand Down Expand Up @@ -2056,6 +2062,9 @@ handle_get_chunk(OffsetBinary, Req, Encoding) ->
{400, #{}, jiffy:encode(#{ error => invalid_offset }), Req}
end.

is_matching_addr({{O1, O2, O3, O4}, _Port1}, {O1, O2, O3, O4, _Port2}) -> true;
is_matching_addr(_CowboyPeer, _LocalPeer) -> false.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't really have a good place for peer-related utilities. But there are a few in ar_util. I think is_matching_addr and maybe even the local_peers fold could live there with the other peer utilities (e.g. maybe ar_util:peer_addr_is_member or something)


handle_get_chunk_proof(OffsetBinary, Req, Encoding) ->
case catch binary_to_integer(OffsetBinary) of
Offset when is_integer(Offset) ->
Expand Down
Loading