From 31a41daadc32b3f282619c435f29110c44b516bf Mon Sep 17 00:00:00 2001 From: idk Date: Tue, 30 May 2023 02:02:00 +0000 Subject: [PATCH] make every possible I2NP message processor use a separate virtual replay context --- .../i2p/router/message/HandleGarlicMessageJob.java | 4 ++-- .../net/i2p/router/message/SendMessageDirectJob.java | 12 ++++++++++++ .../networkdb/HandleDatabaseLookupMessageJob.java | 8 +++++--- .../i2p/router/networkdb/kademlia/ExploreJob.java | 4 ++-- .../FloodfillDatabaseLookupMessageHandler.java | 5 ++++- .../FloodfillDatabaseStoreMessageHandler.java | 5 ++++- .../HandleFloodfillDatabaseLookupMessageJob.java | 4 ++-- .../HandleFloodfillDatabaseStoreMessageJob.java | 12 +++++++----- .../net/i2p/router/networkdb/kademlia/SearchJob.java | 6 ++++-- .../router/networkdb/kademlia/StartExplorersJob.java | 5 ++++- .../i2p/router/tunnel/InboundMessageDistributor.java | 4 +++- 11 files changed, 49 insertions(+), 20 deletions(-) diff --git a/router/java/src/net/i2p/router/message/HandleGarlicMessageJob.java b/router/java/src/net/i2p/router/message/HandleGarlicMessageJob.java index 94a356f4e7..bfc66d3401 100644 --- a/router/java/src/net/i2p/router/message/HandleGarlicMessageJob.java +++ b/router/java/src/net/i2p/router/message/HandleGarlicMessageJob.java @@ -81,7 +81,7 @@ public void handleClove(DeliveryInstructions instructions, I2NPMessage data) { if (_log.shouldLog(Log.DEBUG)) _log.debug("local delivery instructions for clove: " + data); // Here we are adding the message to the InNetMessagePool and it is Local. Xor the messageID with - // a long unique to the router/session. + // a long unique to the router/session. getContext().inNetMessagePool().add(data, null, null, _msgIDBloomXorLocal); return; case DeliveryInstructions.DELIVERY_MODE_DESTINATION: @@ -105,7 +105,7 @@ public void handleClove(DeliveryInstructions instructions, I2NPMessage data) { // where the message will be added to the InNetMessagePool(see SendMessageDirectJob 159-179) SendMessageDirectJob j = new SendMessageDirectJob(getContext(), data, instructions.getRouter(), - 10*1000, ROUTER_PRIORITY); + 10*1000, ROUTER_PRIORITY, _msgIDBloomXorRouter); // run it inline (adds to the outNetPool if it has the router info, otherwise queue a lookup) j.runJob(); //getContext().jobQueue().addJob(j); diff --git a/router/java/src/net/i2p/router/message/SendMessageDirectJob.java b/router/java/src/net/i2p/router/message/SendMessageDirectJob.java index 5436bdc3a7..3a8caf5c96 100644 --- a/router/java/src/net/i2p/router/message/SendMessageDirectJob.java +++ b/router/java/src/net/i2p/router/message/SendMessageDirectJob.java @@ -70,6 +70,18 @@ public SendMessageDirectJob(RouterContext ctx, I2NPMessage message, Hash toPeer, this(ctx, message, toPeer, null, onSuccess, onFail, selector, timeoutMs, priority, 0); } + /** + * @param toPeer may be ourselves + * @param onSuccess may be null + * @param onFail may be null + * @param selector be null + * @param msgIdBloomXor value to xor the messageID with before passing to the InNetMessagePool, may be 0 + */ + public SendMessageDirectJob(RouterContext ctx, I2NPMessage message, Hash toPeer, ReplyJob onSuccess, + Job onFail, MessageSelector selector, int timeoutMs, int priority, long msgIDBloomXor) { + this(ctx, message, toPeer, null, onSuccess, onFail, selector, timeoutMs, priority, msgIDBloomXor); + } + /** * @param toPeer may be ourselves * @param onSend may be null diff --git a/router/java/src/net/i2p/router/networkdb/HandleDatabaseLookupMessageJob.java b/router/java/src/net/i2p/router/networkdb/HandleDatabaseLookupMessageJob.java index 309c10df56..ec65168a66 100644 --- a/router/java/src/net/i2p/router/networkdb/HandleDatabaseLookupMessageJob.java +++ b/router/java/src/net/i2p/router/networkdb/HandleDatabaseLookupMessageJob.java @@ -45,6 +45,7 @@ public class HandleDatabaseLookupMessageJob extends JobImpl { private final DatabaseLookupMessage _message; private boolean _replyKeyConsumed; private final Hash _us; + private final long _msgIDBloomXor; private final static int MAX_ROUTERS_RETURNED = 3; private final static int CLOSENESS_THRESHOLD = 8; // FNDF.MAX_TO_FLOOD + 1 @@ -57,11 +58,12 @@ public class HandleDatabaseLookupMessageJob extends JobImpl { */ public final static long EXPIRE_DELAY = 60*60*1000; - public HandleDatabaseLookupMessageJob(RouterContext ctx, DatabaseLookupMessage receivedMessage, RouterIdentity from, Hash fromHash) { + public HandleDatabaseLookupMessageJob(RouterContext ctx, DatabaseLookupMessage receivedMessage, RouterIdentity from, Hash fromHash, long msgIDBloomXor) { super(ctx); _log = ctx.logManager().getLog(HandleDatabaseLookupMessageJob.class); _message = receivedMessage; _us = ctx.routerHash(); + _msgIDBloomXor = msgIDBloomXor; } protected boolean answerAllQueries() { return false; } @@ -295,7 +297,7 @@ protected void sendMessage(I2NPMessage message, Hash toPeer, TunnelId replyTunne } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("Sending reply directly to " + toPeer); - Job send = new SendMessageDirectJob(getContext(), message, toPeer, REPLY_TIMEOUT, MESSAGE_PRIORITY); + Job send = new SendMessageDirectJob(getContext(), message, toPeer, REPLY_TIMEOUT, MESSAGE_PRIORITY, _msgIDBloomXor); send.runJob(); //getContext().netDb().lookupRouterInfo(toPeer, send, null, REPLY_TIMEOUT); } @@ -338,7 +340,7 @@ private void sendThroughTunnel(I2NPMessage message, Hash toPeer, TunnelId replyT m.setMessage(message); m.setMessageExpiration(message.getMessageExpiration()); m.setTunnelId(replyTunnel); - SendMessageDirectJob j = new SendMessageDirectJob(getContext(), m, toPeer, 10*1000, MESSAGE_PRIORITY); + SendMessageDirectJob j = new SendMessageDirectJob(getContext(), m, toPeer, 10*1000, MESSAGE_PRIORITY, _msgIDBloomXor); j.runJob(); //getContext().jobQueue().addJob(j); } diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/ExploreJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/ExploreJob.java index 9ff5ecfd39..7ef93db6ca 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/ExploreJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/ExploreJob.java @@ -56,11 +56,11 @@ class ExploreJob extends SearchJob { * @param isRealExplore if true, a standard exploration (no floodfills will be returned) * if false, a standard lookup (floodfills will be returned, use if low on floodfills) */ - public ExploreJob(RouterContext context, KademliaNetworkDatabaseFacade facade, Hash key, boolean isRealExplore) { + public ExploreJob(RouterContext context, KademliaNetworkDatabaseFacade facade, Hash key, boolean isRealExplore, long msgIDBloomXor) { // note that we're treating the last param (isLease) as *false* since we're just exploring. // if this collides with an actual leaseSet's key, neat, but that wouldn't imply we're actually // attempting to send that lease a message! - super(context, facade, key, null, null, MAX_EXPLORE_TIME, false, false); + super(context, facade, key, null, null, MAX_EXPLORE_TIME, false, false, msgIDBloomXor); _peerSelector = (FloodfillPeerSelector) (_facade.getPeerSelector()); _isRealExplore = isRealExplore; } diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillDatabaseLookupMessageHandler.java b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillDatabaseLookupMessageHandler.java index 944891c461..851c9ad9fe 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillDatabaseLookupMessageHandler.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillDatabaseLookupMessageHandler.java @@ -8,6 +8,8 @@ * */ +import java.util.Random; + import net.i2p.data.Hash; import net.i2p.data.router.RouterIdentity; import net.i2p.data.i2np.DatabaseLookupMessage; @@ -25,6 +27,7 @@ public class FloodfillDatabaseLookupMessageHandler implements HandlerJobBuilder private RouterContext _context; private FloodfillNetworkDatabaseFacade _facade; private Log _log; + private final long _msgIDBloomXor = new Random().nextLong(); public FloodfillDatabaseLookupMessageHandler(RouterContext context, FloodfillNetworkDatabaseFacade facade) { _context = context; @@ -47,7 +50,7 @@ public Job createJob(I2NPMessage receivedMessage, RouterIdentity from, Hash from DatabaseLookupMessage dlm = (DatabaseLookupMessage)receivedMessage; if (!_facade.shouldThrottleLookup(dlm.getFrom(), dlm.getReplyTunnel())) { - Job j = new HandleFloodfillDatabaseLookupMessageJob(_context, dlm, from, fromHash); + Job j = new HandleFloodfillDatabaseLookupMessageJob(_context, dlm, from, fromHash, _msgIDBloomXor); //if (false) { // // might as well inline it, all the heavy lifting is queued up in later jobs, if necessary // j.runJob(); diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillDatabaseStoreMessageHandler.java b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillDatabaseStoreMessageHandler.java index 37ce81bf75..0cc170a906 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillDatabaseStoreMessageHandler.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillDatabaseStoreMessageHandler.java @@ -8,6 +8,8 @@ * */ +import java.util.Random; + import net.i2p.data.Hash; import net.i2p.data.router.RouterIdentity; import net.i2p.data.i2np.DatabaseStoreMessage; @@ -23,6 +25,7 @@ public class FloodfillDatabaseStoreMessageHandler implements HandlerJobBuilder { private RouterContext _context; private FloodfillNetworkDatabaseFacade _facade; + private final long _msgIDBloomXor = new Random().nextLong(); public FloodfillDatabaseStoreMessageHandler(RouterContext context, FloodfillNetworkDatabaseFacade facade) { _context = context; @@ -35,7 +38,7 @@ public FloodfillDatabaseStoreMessageHandler(RouterContext context, FloodfillNetw } public Job createJob(I2NPMessage receivedMessage, RouterIdentity from, Hash fromHash) { - Job j = new HandleFloodfillDatabaseStoreMessageJob(_context, (DatabaseStoreMessage)receivedMessage, from, fromHash, _facade); + Job j = new HandleFloodfillDatabaseStoreMessageJob(_context, (DatabaseStoreMessage)receivedMessage, from, fromHash, _facade, _msgIDBloomXor); if (false) { j.runJob(); return null; diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/HandleFloodfillDatabaseLookupMessageJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/HandleFloodfillDatabaseLookupMessageJob.java index eceb4b576e..1fbca44a72 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/HandleFloodfillDatabaseLookupMessageJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/HandleFloodfillDatabaseLookupMessageJob.java @@ -26,8 +26,8 @@ * */ public class HandleFloodfillDatabaseLookupMessageJob extends HandleDatabaseLookupMessageJob { - public HandleFloodfillDatabaseLookupMessageJob(RouterContext ctx, DatabaseLookupMessage receivedMessage, RouterIdentity from, Hash fromHash) { - super(ctx, receivedMessage, from, fromHash); + public HandleFloodfillDatabaseLookupMessageJob(RouterContext ctx, DatabaseLookupMessage receivedMessage, RouterIdentity from, Hash fromHash, long msgIDBloomXor) { + super(ctx, receivedMessage, from, fromHash, msgIDBloomXor); } /** diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/HandleFloodfillDatabaseStoreMessageJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/HandleFloodfillDatabaseStoreMessageJob.java index 848ce7caca..98486bfb4c 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/HandleFloodfillDatabaseStoreMessageJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/HandleFloodfillDatabaseStoreMessageJob.java @@ -50,19 +50,21 @@ class HandleFloodfillDatabaseStoreMessageJob extends JobImpl { // must be lower than LIMIT_ROUTERS in StartExplorersJob // because exploration does not register a reply job private static final int LIMIT_ROUTERS = SystemVersion.isSlow() ? 1000 : 4000; + private final long _msgIDBloomXor; /** * @param receivedMessage must never have reply token set if it came down a tunnel */ public HandleFloodfillDatabaseStoreMessageJob(RouterContext ctx, DatabaseStoreMessage receivedMessage, RouterIdentity from, Hash fromHash, - FloodfillNetworkDatabaseFacade facade) { + FloodfillNetworkDatabaseFacade facade, long msgIDBloomXor) { super(ctx); _log = ctx.logManager().getLog(getClass()); _message = receivedMessage; _from = from; _fromHash = fromHash; _facade = facade; + _msgIDBloomXor = msgIDBloomXor; } public void runJob() { @@ -417,10 +419,10 @@ private void sendAck(Hash storedKey) { return; } if (toUs) { - Job send = new SendMessageDirectJob(getContext(), msg, toPeer, REPLY_TIMEOUT, MESSAGE_PRIORITY); + Job send = new SendMessageDirectJob(getContext(), msg, toPeer, REPLY_TIMEOUT, MESSAGE_PRIORITY, _msgIDBloomXor); send.runJob(); if (msg2 != null) { - Job send2 = new SendMessageDirectJob(getContext(), msg2, toPeer, REPLY_TIMEOUT, MESSAGE_PRIORITY); + Job send2 = new SendMessageDirectJob(getContext(), msg2, toPeer, REPLY_TIMEOUT, MESSAGE_PRIORITY, _msgIDBloomXor); send2.runJob(); } return; @@ -492,10 +494,10 @@ private void sendAck(Hash storedKey) { out2 = tgm2; } } - Job send = new SendMessageDirectJob(getContext(), out1, toPeer, REPLY_TIMEOUT, MESSAGE_PRIORITY); + Job send = new SendMessageDirectJob(getContext(), out1, toPeer, REPLY_TIMEOUT, MESSAGE_PRIORITY, _msgIDBloomXor); send.runJob(); if (msg2 != null) { - Job send2 = new SendMessageDirectJob(getContext(), out2, toPeer, REPLY_TIMEOUT, MESSAGE_PRIORITY); + Job send2 = new SendMessageDirectJob(getContext(), out2, toPeer, REPLY_TIMEOUT, MESSAGE_PRIORITY, _msgIDBloomXor); send2.runJob(); } return; diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java index b6d79eeef0..f3dc8074d2 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java @@ -56,6 +56,7 @@ class SearchJob extends JobImpl { private long _startedOn; private boolean _floodfillPeersExhausted; private int _floodfillSearchesOutstanding; + private final long _msgIDBloomXor; private static final int SEARCH_BREDTH = 3; // 10 peers at a time /** only send the 10 closest "dont tell me about" refs */ @@ -91,7 +92,7 @@ class SearchJob extends JobImpl { * */ public SearchJob(RouterContext context, KademliaNetworkDatabaseFacade facade, Hash key, - Job onSuccess, Job onFailure, long timeoutMs, boolean keepStats, boolean isLease) { + Job onSuccess, Job onFailure, long timeoutMs, boolean keepStats, boolean isLease, long msgIDBloomXor) { super(context); if ( (key == null) || (key.getData() == null) ) throw new IllegalArgumentException("Search for null key?"); @@ -107,6 +108,7 @@ public SearchJob(RouterContext context, KademliaNetworkDatabaseFacade facade, Ha _peerSelector = facade.getPeerSelector(); _startedOn = -1; _expiration = getContext().clock().now() + timeoutMs; + _msgIDBloomXor = msgIDBloomXor; getContext().statManager().addRateData("netDb.searchCount", 1); if (_log.shouldLog(Log.DEBUG)) _log.debug("Search (" + getClass().getName() + " for " + key, new Exception("Search enqueued by")); @@ -500,7 +502,7 @@ protected void sendRouterSearch(RouterInfo router) { SearchUpdateReplyFoundJob reply = new SearchUpdateReplyFoundJob(getContext(), router, _state, _facade, this); SendMessageDirectJob j = new SendMessageDirectJob(getContext(), msg, to, reply, new FailedJob(getContext(), router), sel, timeout, - OutNetMessage.PRIORITY_EXPLORATORY); + OutNetMessage.PRIORITY_EXPLORATORY, _msgIDBloomXor); if (FloodfillNetworkDatabaseFacade.isFloodfill(router)) _floodfillSearchesOutstanding++; j.runJob(); diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/StartExplorersJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/StartExplorersJob.java index 738078c130..7663e06a98 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/StartExplorersJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/StartExplorersJob.java @@ -9,6 +9,7 @@ */ import java.util.HashSet; +import java.util.Random; import java.util.Set; import net.i2p.data.Hash; @@ -57,6 +58,8 @@ class StartExplorersJob extends JobImpl { private static final long MAX_LAG = 100; private static final long MAX_MSG_DELAY = 1500; + + private final long _msgIDBloomXor = new Random().nextLong(); public StartExplorersJob(RouterContext context, KademliaNetworkDatabaseFacade facade) { super(context); @@ -99,7 +102,7 @@ else if (count < LOW_ROUTERS) // This is very effective so we don't need to do it often boolean realexpl = !((needffs && getContext().random().nextInt(2) == 0) || (lowffs && getContext().random().nextInt(4) == 0)); - ExploreJob j = new ExploreJob(getContext(), _facade, key, realexpl); + ExploreJob j = new ExploreJob(getContext(), _facade, key, realexpl, _msgIDBloomXor); if (delay > 0) j.getTiming().setStartAfter(getContext().clock().now() + delay); getContext().jobQueue().addJob(j); diff --git a/router/java/src/net/i2p/router/tunnel/InboundMessageDistributor.java b/router/java/src/net/i2p/router/tunnel/InboundMessageDistributor.java index b506b5b7c7..6214b83141 100644 --- a/router/java/src/net/i2p/router/tunnel/InboundMessageDistributor.java +++ b/router/java/src/net/i2p/router/tunnel/InboundMessageDistributor.java @@ -1,5 +1,7 @@ package net.i2p.router.tunnel; +import java.util.Random; + import net.i2p.data.DatabaseEntry; import net.i2p.data.Hash; import net.i2p.data.LeaseSet; @@ -56,7 +58,7 @@ public InboundMessageDistributor(RouterContext ctx, Hash client) { _msgIdBloomXor = clienttps.getMsgIdBloomXor(); } else { _clientNickname = "NULL/Expl"; - _msgIdBloomXor = 0; + _msgIdBloomXor = new Random().nextLong(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Initializing null or exploratory InboundMessageDistributor"); }