Skip to content

Commit

Permalink
make every possible I2NP message processor use a separate virtual rep…
Browse files Browse the repository at this point in the history
…lay context
  • Loading branch information
eyedeekay committed May 30, 2023
1 parent 6d24b55 commit 31a41da
Show file tree
Hide file tree
Showing 11 changed files with 49 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void handleClove(DeliveryInstructions instructions, I2NPMessage data) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("local delivery instructions for clove: " + data);
// Here we are adding the message to the InNetMessagePool and it is Local. Xor the messageID with
// a long unique to the router/session.
// a long unique to the router/session.
getContext().inNetMessagePool().add(data, null, null, _msgIDBloomXorLocal);
return;
case DeliveryInstructions.DELIVERY_MODE_DESTINATION:
Expand All @@ -105,7 +105,7 @@ public void handleClove(DeliveryInstructions instructions, I2NPMessage data) {
// where the message will be added to the InNetMessagePool(see SendMessageDirectJob 159-179)
SendMessageDirectJob j = new SendMessageDirectJob(getContext(), data,
instructions.getRouter(),
10*1000, ROUTER_PRIORITY);
10*1000, ROUTER_PRIORITY, _msgIDBloomXorRouter);
// run it inline (adds to the outNetPool if it has the router info, otherwise queue a lookup)
j.runJob();
//getContext().jobQueue().addJob(j);
Expand Down
12 changes: 12 additions & 0 deletions router/java/src/net/i2p/router/message/SendMessageDirectJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,18 @@ public SendMessageDirectJob(RouterContext ctx, I2NPMessage message, Hash toPeer,
this(ctx, message, toPeer, null, onSuccess, onFail, selector, timeoutMs, priority, 0);
}

/**
* @param toPeer may be ourselves
* @param onSuccess may be null
* @param onFail may be null
* @param selector be null
* @param msgIdBloomXor value to xor the messageID with before passing to the InNetMessagePool, may be 0
*/
public SendMessageDirectJob(RouterContext ctx, I2NPMessage message, Hash toPeer, ReplyJob onSuccess,
Job onFail, MessageSelector selector, int timeoutMs, int priority, long msgIDBloomXor) {
this(ctx, message, toPeer, null, onSuccess, onFail, selector, timeoutMs, priority, msgIDBloomXor);
}

/**
* @param toPeer may be ourselves
* @param onSend may be null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class HandleDatabaseLookupMessageJob extends JobImpl {
private final DatabaseLookupMessage _message;
private boolean _replyKeyConsumed;
private final Hash _us;
private final long _msgIDBloomXor;

private final static int MAX_ROUTERS_RETURNED = 3;
private final static int CLOSENESS_THRESHOLD = 8; // FNDF.MAX_TO_FLOOD + 1
Expand All @@ -57,11 +58,12 @@ public class HandleDatabaseLookupMessageJob extends JobImpl {
*/
public final static long EXPIRE_DELAY = 60*60*1000;

public HandleDatabaseLookupMessageJob(RouterContext ctx, DatabaseLookupMessage receivedMessage, RouterIdentity from, Hash fromHash) {
public HandleDatabaseLookupMessageJob(RouterContext ctx, DatabaseLookupMessage receivedMessage, RouterIdentity from, Hash fromHash, long msgIDBloomXor) {
super(ctx);
_log = ctx.logManager().getLog(HandleDatabaseLookupMessageJob.class);
_message = receivedMessage;
_us = ctx.routerHash();
_msgIDBloomXor = msgIDBloomXor;
}

protected boolean answerAllQueries() { return false; }
Expand Down Expand Up @@ -295,7 +297,7 @@ protected void sendMessage(I2NPMessage message, Hash toPeer, TunnelId replyTunne
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending reply directly to " + toPeer);
Job send = new SendMessageDirectJob(getContext(), message, toPeer, REPLY_TIMEOUT, MESSAGE_PRIORITY);
Job send = new SendMessageDirectJob(getContext(), message, toPeer, REPLY_TIMEOUT, MESSAGE_PRIORITY, _msgIDBloomXor);
send.runJob();
//getContext().netDb().lookupRouterInfo(toPeer, send, null, REPLY_TIMEOUT);
}
Expand Down Expand Up @@ -338,7 +340,7 @@ private void sendThroughTunnel(I2NPMessage message, Hash toPeer, TunnelId replyT
m.setMessage(message);
m.setMessageExpiration(message.getMessageExpiration());
m.setTunnelId(replyTunnel);
SendMessageDirectJob j = new SendMessageDirectJob(getContext(), m, toPeer, 10*1000, MESSAGE_PRIORITY);
SendMessageDirectJob j = new SendMessageDirectJob(getContext(), m, toPeer, 10*1000, MESSAGE_PRIORITY, _msgIDBloomXor);
j.runJob();
//getContext().jobQueue().addJob(j);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ class ExploreJob extends SearchJob {
* @param isRealExplore if true, a standard exploration (no floodfills will be returned)
* if false, a standard lookup (floodfills will be returned, use if low on floodfills)
*/
public ExploreJob(RouterContext context, KademliaNetworkDatabaseFacade facade, Hash key, boolean isRealExplore) {
public ExploreJob(RouterContext context, KademliaNetworkDatabaseFacade facade, Hash key, boolean isRealExplore, long msgIDBloomXor) {
// note that we're treating the last param (isLease) as *false* since we're just exploring.
// if this collides with an actual leaseSet's key, neat, but that wouldn't imply we're actually
// attempting to send that lease a message!
super(context, facade, key, null, null, MAX_EXPLORE_TIME, false, false);
super(context, facade, key, null, null, MAX_EXPLORE_TIME, false, false, msgIDBloomXor);
_peerSelector = (FloodfillPeerSelector) (_facade.getPeerSelector());
_isRealExplore = isRealExplore;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
*
*/

import java.util.Random;

import net.i2p.data.Hash;
import net.i2p.data.router.RouterIdentity;
import net.i2p.data.i2np.DatabaseLookupMessage;
Expand All @@ -25,6 +27,7 @@ public class FloodfillDatabaseLookupMessageHandler implements HandlerJobBuilder
private RouterContext _context;
private FloodfillNetworkDatabaseFacade _facade;
private Log _log;
private final long _msgIDBloomXor = new Random().nextLong();

public FloodfillDatabaseLookupMessageHandler(RouterContext context, FloodfillNetworkDatabaseFacade facade) {
_context = context;
Expand All @@ -47,7 +50,7 @@ public Job createJob(I2NPMessage receivedMessage, RouterIdentity from, Hash from

DatabaseLookupMessage dlm = (DatabaseLookupMessage)receivedMessage;
if (!_facade.shouldThrottleLookup(dlm.getFrom(), dlm.getReplyTunnel())) {
Job j = new HandleFloodfillDatabaseLookupMessageJob(_context, dlm, from, fromHash);
Job j = new HandleFloodfillDatabaseLookupMessageJob(_context, dlm, from, fromHash, _msgIDBloomXor);
//if (false) {
// // might as well inline it, all the heavy lifting is queued up in later jobs, if necessary
// j.runJob();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
*
*/

import java.util.Random;

import net.i2p.data.Hash;
import net.i2p.data.router.RouterIdentity;
import net.i2p.data.i2np.DatabaseStoreMessage;
Expand All @@ -23,6 +25,7 @@
public class FloodfillDatabaseStoreMessageHandler implements HandlerJobBuilder {
private RouterContext _context;
private FloodfillNetworkDatabaseFacade _facade;
private final long _msgIDBloomXor = new Random().nextLong();

public FloodfillDatabaseStoreMessageHandler(RouterContext context, FloodfillNetworkDatabaseFacade facade) {
_context = context;
Expand All @@ -35,7 +38,7 @@ public FloodfillDatabaseStoreMessageHandler(RouterContext context, FloodfillNetw
}

public Job createJob(I2NPMessage receivedMessage, RouterIdentity from, Hash fromHash) {
Job j = new HandleFloodfillDatabaseStoreMessageJob(_context, (DatabaseStoreMessage)receivedMessage, from, fromHash, _facade);
Job j = new HandleFloodfillDatabaseStoreMessageJob(_context, (DatabaseStoreMessage)receivedMessage, from, fromHash, _facade, _msgIDBloomXor);
if (false) {
j.runJob();
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
*
*/
public class HandleFloodfillDatabaseLookupMessageJob extends HandleDatabaseLookupMessageJob {
public HandleFloodfillDatabaseLookupMessageJob(RouterContext ctx, DatabaseLookupMessage receivedMessage, RouterIdentity from, Hash fromHash) {
super(ctx, receivedMessage, from, fromHash);
public HandleFloodfillDatabaseLookupMessageJob(RouterContext ctx, DatabaseLookupMessage receivedMessage, RouterIdentity from, Hash fromHash, long msgIDBloomXor) {
super(ctx, receivedMessage, from, fromHash, msgIDBloomXor);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,21 @@ class HandleFloodfillDatabaseStoreMessageJob extends JobImpl {
// must be lower than LIMIT_ROUTERS in StartExplorersJob
// because exploration does not register a reply job
private static final int LIMIT_ROUTERS = SystemVersion.isSlow() ? 1000 : 4000;
private final long _msgIDBloomXor;

/**
* @param receivedMessage must never have reply token set if it came down a tunnel
*/
public HandleFloodfillDatabaseStoreMessageJob(RouterContext ctx, DatabaseStoreMessage receivedMessage,
RouterIdentity from, Hash fromHash,
FloodfillNetworkDatabaseFacade facade) {
FloodfillNetworkDatabaseFacade facade, long msgIDBloomXor) {
super(ctx);
_log = ctx.logManager().getLog(getClass());
_message = receivedMessage;
_from = from;
_fromHash = fromHash;
_facade = facade;
_msgIDBloomXor = msgIDBloomXor;
}

public void runJob() {
Expand Down Expand Up @@ -417,10 +419,10 @@ private void sendAck(Hash storedKey) {
return;
}
if (toUs) {
Job send = new SendMessageDirectJob(getContext(), msg, toPeer, REPLY_TIMEOUT, MESSAGE_PRIORITY);
Job send = new SendMessageDirectJob(getContext(), msg, toPeer, REPLY_TIMEOUT, MESSAGE_PRIORITY, _msgIDBloomXor);
send.runJob();
if (msg2 != null) {
Job send2 = new SendMessageDirectJob(getContext(), msg2, toPeer, REPLY_TIMEOUT, MESSAGE_PRIORITY);
Job send2 = new SendMessageDirectJob(getContext(), msg2, toPeer, REPLY_TIMEOUT, MESSAGE_PRIORITY, _msgIDBloomXor);
send2.runJob();
}
return;
Expand Down Expand Up @@ -492,10 +494,10 @@ private void sendAck(Hash storedKey) {
out2 = tgm2;
}
}
Job send = new SendMessageDirectJob(getContext(), out1, toPeer, REPLY_TIMEOUT, MESSAGE_PRIORITY);
Job send = new SendMessageDirectJob(getContext(), out1, toPeer, REPLY_TIMEOUT, MESSAGE_PRIORITY, _msgIDBloomXor);
send.runJob();
if (msg2 != null) {
Job send2 = new SendMessageDirectJob(getContext(), out2, toPeer, REPLY_TIMEOUT, MESSAGE_PRIORITY);
Job send2 = new SendMessageDirectJob(getContext(), out2, toPeer, REPLY_TIMEOUT, MESSAGE_PRIORITY, _msgIDBloomXor);
send2.runJob();
}
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class SearchJob extends JobImpl {
private long _startedOn;
private boolean _floodfillPeersExhausted;
private int _floodfillSearchesOutstanding;
private final long _msgIDBloomXor;

private static final int SEARCH_BREDTH = 3; // 10 peers at a time
/** only send the 10 closest "dont tell me about" refs */
Expand Down Expand Up @@ -91,7 +92,7 @@ class SearchJob extends JobImpl {
*
*/
public SearchJob(RouterContext context, KademliaNetworkDatabaseFacade facade, Hash key,
Job onSuccess, Job onFailure, long timeoutMs, boolean keepStats, boolean isLease) {
Job onSuccess, Job onFailure, long timeoutMs, boolean keepStats, boolean isLease, long msgIDBloomXor) {
super(context);
if ( (key == null) || (key.getData() == null) )
throw new IllegalArgumentException("Search for null key?");
Expand All @@ -107,6 +108,7 @@ public SearchJob(RouterContext context, KademliaNetworkDatabaseFacade facade, Ha
_peerSelector = facade.getPeerSelector();
_startedOn = -1;
_expiration = getContext().clock().now() + timeoutMs;
_msgIDBloomXor = msgIDBloomXor;
getContext().statManager().addRateData("netDb.searchCount", 1);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Search (" + getClass().getName() + " for " + key, new Exception("Search enqueued by"));
Expand Down Expand Up @@ -500,7 +502,7 @@ protected void sendRouterSearch(RouterInfo router) {
SearchUpdateReplyFoundJob reply = new SearchUpdateReplyFoundJob(getContext(), router, _state, _facade, this);
SendMessageDirectJob j = new SendMessageDirectJob(getContext(), msg, to,
reply, new FailedJob(getContext(), router), sel, timeout,
OutNetMessage.PRIORITY_EXPLORATORY);
OutNetMessage.PRIORITY_EXPLORATORY, _msgIDBloomXor);
if (FloodfillNetworkDatabaseFacade.isFloodfill(router))
_floodfillSearchesOutstanding++;
j.runJob();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/

import java.util.HashSet;
import java.util.Random;
import java.util.Set;

import net.i2p.data.Hash;
Expand Down Expand Up @@ -57,6 +58,8 @@ class StartExplorersJob extends JobImpl {

private static final long MAX_LAG = 100;
private static final long MAX_MSG_DELAY = 1500;

private final long _msgIDBloomXor = new Random().nextLong();

public StartExplorersJob(RouterContext context, KademliaNetworkDatabaseFacade facade) {
super(context);
Expand Down Expand Up @@ -99,7 +102,7 @@ else if (count < LOW_ROUTERS)
// This is very effective so we don't need to do it often
boolean realexpl = !((needffs && getContext().random().nextInt(2) == 0) ||
(lowffs && getContext().random().nextInt(4) == 0));
ExploreJob j = new ExploreJob(getContext(), _facade, key, realexpl);
ExploreJob j = new ExploreJob(getContext(), _facade, key, realexpl, _msgIDBloomXor);
if (delay > 0)
j.getTiming().setStartAfter(getContext().clock().now() + delay);
getContext().jobQueue().addJob(j);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package net.i2p.router.tunnel;

import java.util.Random;

import net.i2p.data.DatabaseEntry;
import net.i2p.data.Hash;
import net.i2p.data.LeaseSet;
Expand Down Expand Up @@ -56,7 +58,7 @@ public InboundMessageDistributor(RouterContext ctx, Hash client) {
_msgIdBloomXor = clienttps.getMsgIdBloomXor();
} else {
_clientNickname = "NULL/Expl";
_msgIdBloomXor = 0;
_msgIdBloomXor = new Random().nextLong();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Initializing null or exploratory InboundMessageDistributor");
}
Expand Down

0 comments on commit 31a41da

Please sign in to comment.