From 77f5f77f2a8ea7d322f73fc6bb996e900401e8bd Mon Sep 17 00:00:00 2001 From: Acha Bill Date: Wed, 8 Apr 2020 00:59:18 +0100 Subject: [PATCH 1/2] send heartbeat pulses and request tx using heartbeat --- src/main/java/com/iota/iri/Iota.java | 8 ++- .../iota/iri/MainInjectionConfiguration.java | 32 +++++----- .../com/iota/iri/network/HeartbeatPulse.java | 21 +++++++ .../iota/iri/network/HeartbeatPulseImpl.java | 62 +++++++++++++++++++ .../com/iota/iri/network/NeighborRouter.java | 9 +++ .../iota/iri/network/NeighborRouterImpl.java | 8 +++ .../iri/network/neighbor/NeighborMetrics.java | 12 ++++ .../neighbor/impl/NeighborMetricsImpl.java | 11 ++++ .../iri/network/pipeline/BroadcastStage.java | 11 +++- .../iota/iri/network/protocol/Protocol.java | 1 + .../impl/LocalSnapshotManagerImpl.java | 2 + 11 files changed, 161 insertions(+), 16 deletions(-) create mode 100644 src/main/java/com/iota/iri/network/HeartbeatPulse.java create mode 100644 src/main/java/com/iota/iri/network/HeartbeatPulseImpl.java diff --git a/src/main/java/com/iota/iri/Iota.java b/src/main/java/com/iota/iri/Iota.java index 420cc51c71..2c70f3164f 100644 --- a/src/main/java/com/iota/iri/Iota.java +++ b/src/main/java/com/iota/iri/Iota.java @@ -4,6 +4,7 @@ import com.iota.iri.conf.IotaConfig; import com.iota.iri.controllers.TipsViewModel; import com.iota.iri.controllers.TransactionViewModel; +import com.iota.iri.network.HeartbeatPulse; import com.iota.iri.network.NeighborRouter; import com.iota.iri.network.TipsRequester; import com.iota.iri.network.TransactionRequester; @@ -113,6 +114,7 @@ public class Iota { public LocalSnapshotsPersistenceProvider localSnapshotsDb; public final CacheManager cacheManager; + public final HeartbeatPulse heartbeatPulse; /** * Initializes the latest snapshot and then creates all services needed to run an IOTA node. @@ -130,7 +132,7 @@ public Iota(IotaConfig configuration, SpentAddressesProvider spentAddressesProvi TransactionRequester transactionRequester, NeighborRouter neighborRouter, TransactionProcessingPipeline transactionProcessingPipeline, TipsRequester tipsRequester, TipsViewModel tipsViewModel, TipSelector tipsSelector, LocalSnapshotsPersistenceProvider localSnapshotsDb, - CacheManager cacheManager, TransactionSolidifier transactionSolidifier) { + CacheManager cacheManager, TransactionSolidifier transactionSolidifier, HeartbeatPulse heartbeatPulse) { this.configuration = configuration; this.ledgerService = ledgerService; @@ -160,6 +162,7 @@ public Iota(IotaConfig configuration, SpentAddressesProvider spentAddressesProvi this.tipsSelector = tipsSelector; this.cacheManager = cacheManager; + this.heartbeatPulse = heartbeatPulse; } private void initDependencies() throws SnapshotException, SpentAddressesException { @@ -223,6 +226,8 @@ public void init() throws Exception { if (transactionPruner != null) { transactionPruner.start(); } + + heartbeatPulse.start(); } private void rescanDb() throws Exception { @@ -270,6 +275,7 @@ public void shutdown() throws Exception { localSnapshotManager.shutdown(); } + heartbeatPulse.shutdown(); tipsRequester.shutdown(); txPipeline.shutdown(); neighborRouter.shutdown(); diff --git a/src/main/java/com/iota/iri/MainInjectionConfiguration.java b/src/main/java/com/iota/iri/MainInjectionConfiguration.java index 221256930d..f070457009 100644 --- a/src/main/java/com/iota/iri/MainInjectionConfiguration.java +++ b/src/main/java/com/iota/iri/MainInjectionConfiguration.java @@ -4,9 +4,7 @@ import com.iota.iri.cache.impl.CacheManagerImpl; import com.iota.iri.conf.IotaConfig; import com.iota.iri.controllers.TipsViewModel; -import com.iota.iri.network.NeighborRouter; -import com.iota.iri.network.TipsRequester; -import com.iota.iri.network.TransactionRequester; +import com.iota.iri.network.*; import com.iota.iri.network.pipeline.TransactionProcessingPipeline; import com.iota.iri.service.API; import com.iota.iri.service.ledger.LedgerService; @@ -100,6 +98,12 @@ LatestMilestoneTracker provideLatestMilestoneTracker(Tangle tangle, SnapshotProv return new LatestMilestoneTrackerImpl(tangle, snapshotProvider, milestoneService, milestoneSolidifier, configuration); } + @Singleton + @Provides + HeartbeatPulse providerHeartbeatPulse(NeighborRouter neighborRouter, SnapshotProvider snapshotProvider){ + return new HeartbeatPulseImpl(neighborRouter, snapshotProvider); + } + @Singleton @Provides LatestSolidMilestoneTracker provideLatestSolidMilestoneTracker(Tangle tangle, SnapshotProvider snapshotProvider, @@ -171,21 +175,21 @@ TipSelector provideTipSelector(Tangle tangle, SnapshotProvider snapshotProvider, @Singleton @Provides Iota provideIota(SpentAddressesProvider spentAddressesProvider, SpentAddressesService spentAddressesService, - SnapshotProvider snapshotProvider, SnapshotService snapshotService, - @Nullable LocalSnapshotManager localSnapshotManager, MilestoneService milestoneService, - LatestMilestoneTracker latestMilestoneTracker, LatestSolidMilestoneTracker latestSolidMilestoneTracker, - SeenMilestonesRetriever seenMilestonesRetriever, LedgerService ledgerService, - @Nullable TransactionPruner transactionPruner, MilestoneSolidifier milestoneSolidifier, - BundleValidator bundleValidator, Tangle tangle, TransactionValidator transactionValidator, - TransactionRequester transactionRequester, NeighborRouter neighborRouter, - TransactionProcessingPipeline transactionProcessingPipeline, TipsRequester tipsRequester, - TipsViewModel tipsViewModel, TipSelector tipsSelector, LocalSnapshotsPersistenceProvider localSnapshotsDb, - CacheManager cacheManager, TransactionSolidifier transactionSolidifier) { + SnapshotProvider snapshotProvider, SnapshotService snapshotService, + @Nullable LocalSnapshotManager localSnapshotManager, MilestoneService milestoneService, + LatestMilestoneTracker latestMilestoneTracker, LatestSolidMilestoneTracker latestSolidMilestoneTracker, + SeenMilestonesRetriever seenMilestonesRetriever, LedgerService ledgerService, + @Nullable TransactionPruner transactionPruner, MilestoneSolidifier milestoneSolidifier, + BundleValidator bundleValidator, Tangle tangle, TransactionValidator transactionValidator, + TransactionRequester transactionRequester, NeighborRouter neighborRouter, + TransactionProcessingPipeline transactionProcessingPipeline, TipsRequester tipsRequester, + TipsViewModel tipsViewModel, TipSelector tipsSelector, LocalSnapshotsPersistenceProvider localSnapshotsDb, + CacheManager cacheManager, TransactionSolidifier transactionSolidifier, HeartbeatPulse heartbeatPulse) { return new Iota(configuration, spentAddressesProvider, spentAddressesService, snapshotProvider, snapshotService, localSnapshotManager, milestoneService, latestMilestoneTracker, latestSolidMilestoneTracker, seenMilestonesRetriever, ledgerService, transactionPruner, milestoneSolidifier, bundleValidator, tangle, transactionValidator, transactionRequester, neighborRouter, transactionProcessingPipeline, - tipsRequester, tipsViewModel, tipsSelector, localSnapshotsDb, cacheManager, transactionSolidifier); + tipsRequester, tipsViewModel, tipsSelector, localSnapshotsDb, cacheManager, transactionSolidifier, heartbeatPulse); } @Singleton diff --git a/src/main/java/com/iota/iri/network/HeartbeatPulse.java b/src/main/java/com/iota/iri/network/HeartbeatPulse.java new file mode 100644 index 0000000000..ac5759b6ef --- /dev/null +++ b/src/main/java/com/iota/iri/network/HeartbeatPulse.java @@ -0,0 +1,21 @@ +package com.iota.iri.network; + +/** + * A background worker that sends {@link com.iota.iri.network.protocol.Heartbeat}s to neighbors. + */ +public interface HeartbeatPulse { + /** + * Starts the background worker that calls {@link #sendHeartbeat()} rhythmically. + */ + void start(); + + /** + * Stops the background worker that sends out heartbeats. + */ + void shutdown(); + + /** + * Sends {@link com.iota.iri.network.protocol.Heartbeat} to all neighbors. + */ + void sendHeartbeat(); +} diff --git a/src/main/java/com/iota/iri/network/HeartbeatPulseImpl.java b/src/main/java/com/iota/iri/network/HeartbeatPulseImpl.java new file mode 100644 index 0000000000..1f77896e12 --- /dev/null +++ b/src/main/java/com/iota/iri/network/HeartbeatPulseImpl.java @@ -0,0 +1,62 @@ +package com.iota.iri.network; + +import com.iota.iri.network.neighbor.Neighbor; +import com.iota.iri.network.protocol.Heartbeat; +import com.iota.iri.service.snapshot.SnapshotProvider; +import com.iota.iri.utils.thread.DedicatedScheduledExecutorService; +import com.iota.iri.utils.thread.SilentScheduledExecutorService; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Implementation of {@link HeartbeatPulse} interface. + */ +public class HeartbeatPulseImpl implements HeartbeatPulse { + + private NeighborRouter neighborRouter; + private SnapshotProvider snapshotProvider; + + /** + * The rate in milliseconds, at which heartbeats are sent out. + */ + private static final int HEARTBEAT_RATE_MILLIS = 60000; + + /** + * Holds a reference to the manager of the background worker. + */ + private final SilentScheduledExecutorService executorService = new DedicatedScheduledExecutorService( + "Heartbeat Pulse"); + + public HeartbeatPulseImpl(NeighborRouter neighborRouter, SnapshotProvider snapshotProvider) { + this.neighborRouter = neighborRouter; + this.snapshotProvider = snapshotProvider; + } + + @Override + public void start() { + executorService.silentScheduleWithFixedDelay(this::sendHeartbeat, 0, HEARTBEAT_RATE_MILLIS, + TimeUnit.MILLISECONDS); + } + + @Override + public void shutdown() { + executorService.shutdownNow(); + } + + @Override + public void sendHeartbeat() { + int lastSolidMilestoneIndex = snapshotProvider.getLatestSnapshot().getIndex(); + // The first solid milestone in DB w/o pruning. + int firstSolidMilestoneIndex = 0; // TODO: how do we get this? + + Heartbeat heartbeat = new Heartbeat(); + heartbeat.setFirstSolidMilestoneIndex(firstSolidMilestoneIndex); + heartbeat.setLastSolidMilestoneIndex(lastSolidMilestoneIndex); + + Map currentlyConnectedNeighbors = neighborRouter.getConnectedNeighbors(); + for (Neighbor neighbor : currentlyConnectedNeighbors.values()) { + neighborRouter.gossipHeartbeatTo(neighbor, heartbeat); + } + } +} diff --git a/src/main/java/com/iota/iri/network/NeighborRouter.java b/src/main/java/com/iota/iri/network/NeighborRouter.java index cf64511ed5..e34d0eeac1 100644 --- a/src/main/java/com/iota/iri/network/NeighborRouter.java +++ b/src/main/java/com/iota/iri/network/NeighborRouter.java @@ -5,6 +5,7 @@ import com.iota.iri.network.neighbor.Neighbor; import com.iota.iri.network.pipeline.TransactionProcessingPipeline; import com.iota.iri.network.pipeline.TransactionProcessingPipelineImpl; +import com.iota.iri.network.protocol.Heartbeat; import java.util.List; import java.util.Map; @@ -96,6 +97,14 @@ public interface NeighborRouter { void gossipTransactionTo(Neighbor neighbor, TransactionViewModel tvm, boolean useHashOfTVM) throws Exception; + /** + * Gossips the given heartbeat to the given neighbor. + * + * @param neighbor The {@link Neighbor} to gossip the heartbeat to + * @param heartbeat The {@link Heartbeat} to gossip + */ + void gossipHeartbeatTo(Neighbor neighbor, Heartbeat heartbeat); + /** * Shut downs the {@link NeighborRouter} and all currently open connections. */ diff --git a/src/main/java/com/iota/iri/network/NeighborRouterImpl.java b/src/main/java/com/iota/iri/network/NeighborRouterImpl.java index 301813edc4..b3169069a6 100644 --- a/src/main/java/com/iota/iri/network/NeighborRouterImpl.java +++ b/src/main/java/com/iota/iri/network/NeighborRouterImpl.java @@ -11,6 +11,7 @@ import com.iota.iri.network.pipeline.TransactionProcessingPipeline; import com.iota.iri.network.pipeline.TransactionProcessingPipelineImpl; import com.iota.iri.network.protocol.Handshake; +import com.iota.iri.network.protocol.Heartbeat; import com.iota.iri.network.protocol.Protocol; import com.iota.iri.utils.Converter; @@ -909,6 +910,13 @@ public void gossipTransactionTo(Neighbor neighbor, TransactionViewModel tvm, boo neighbor.getMetrics().incrSentTransactionsCount(); } + @Override + public void gossipHeartbeatTo(Neighbor neighbor, Heartbeat heartbeat) { + ByteBuffer packet = Protocol.createHeartbeatPacket(heartbeat); + neighbor.send(packet); + neighbor.getMetrics().incrSentHeartbeatCount(); + } + @Override public void shutdown() { shutdown.set(true); diff --git a/src/main/java/com/iota/iri/network/neighbor/NeighborMetrics.java b/src/main/java/com/iota/iri/network/neighbor/NeighborMetrics.java index 735f0ec52c..64d3a61708 100644 --- a/src/main/java/com/iota/iri/network/neighbor/NeighborMetrics.java +++ b/src/main/java/com/iota/iri/network/neighbor/NeighborMetrics.java @@ -102,4 +102,16 @@ public interface NeighborMetrics { * @return the number of packets dropped from the neighbor's send queue */ long incrDroppedSendPacketsCount(); + + /** + * Increments the sent heartbeat count + * @return The number of heartbeat that have been sent + */ + long incrSentHeartbeatCount(); + + /** + * Gets the heartbeat count + * @return The heartbeat count + */ + long getHeartbeatCount(); } diff --git a/src/main/java/com/iota/iri/network/neighbor/impl/NeighborMetricsImpl.java b/src/main/java/com/iota/iri/network/neighbor/impl/NeighborMetricsImpl.java index 9b5c71f307..b7f698c662 100644 --- a/src/main/java/com/iota/iri/network/neighbor/impl/NeighborMetricsImpl.java +++ b/src/main/java/com/iota/iri/network/neighbor/impl/NeighborMetricsImpl.java @@ -16,6 +16,7 @@ public class NeighborMetricsImpl implements NeighborMetrics { private AtomicLong sentTxsCount = new AtomicLong(); private AtomicLong newTxsCount = new AtomicLong(); private AtomicLong droppedSendPacketsCount = new AtomicLong(); + private AtomicLong heartbeatCount = new AtomicLong(); @Override public long getAllTransactionsCount() { @@ -86,4 +87,14 @@ public long getDroppedSendPacketsCount() { public long incrDroppedSendPacketsCount() { return droppedSendPacketsCount.incrementAndGet(); } + + @Override + public long incrSentHeartbeatCount() { + return heartbeatCount.incrementAndGet(); + } + + @Override + public long getHeartbeatCount() { + return heartbeatCount.get(); + } } diff --git a/src/main/java/com/iota/iri/network/pipeline/BroadcastStage.java b/src/main/java/com/iota/iri/network/pipeline/BroadcastStage.java index 8d8848f34c..0b12f3ca32 100644 --- a/src/main/java/com/iota/iri/network/pipeline/BroadcastStage.java +++ b/src/main/java/com/iota/iri/network/pipeline/BroadcastStage.java @@ -3,6 +3,7 @@ import com.iota.iri.controllers.TransactionViewModel; import com.iota.iri.network.NeighborRouter; import com.iota.iri.network.neighbor.Neighbor; +import com.iota.iri.network.protocol.Heartbeat; import com.iota.iri.service.validation.TransactionSolidifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +53,15 @@ public ProcessingContext process(ProcessingContext ctx) { continue; } try { - neighborRouter.gossipTransactionTo(neighbor, tvm); + boolean shouldGossip = true; + //neighbor supports STING. Else fall backwards + if(neighbor.getProtocolVersion() >= 2){ + Heartbeat heartbeat = neighbor.heartbeat(); + shouldGossip = tvm.snapshotIndex() >= heartbeat.getFirstSolidMilestoneIndex() && tvm.snapshotIndex() <= heartbeat.getLastSolidMilestoneIndex(); + } + if(shouldGossip){ + neighborRouter.gossipTransactionTo(neighbor, tvm); + } } catch (Exception e) { log.error(e.getMessage()); } diff --git a/src/main/java/com/iota/iri/network/protocol/Protocol.java b/src/main/java/com/iota/iri/network/protocol/Protocol.java index 2e40e986ed..c47c7f0bf9 100644 --- a/src/main/java/com/iota/iri/network/protocol/Protocol.java +++ b/src/main/java/com/iota/iri/network/protocol/Protocol.java @@ -35,6 +35,7 @@ public class Protocol { public final static byte[] SUPPORTED_PROTOCOL_VERSIONS = { /* supports protocol version(s): 1 */ (byte) 0b00000001, + (byte) 0b00000011, }; /** * The amount of bytes dedicated for the message type in the packet header. diff --git a/src/main/java/com/iota/iri/service/snapshot/impl/LocalSnapshotManagerImpl.java b/src/main/java/com/iota/iri/service/snapshot/impl/LocalSnapshotManagerImpl.java index bb8c2963a7..b3d3c0154a 100644 --- a/src/main/java/com/iota/iri/service/snapshot/impl/LocalSnapshotManagerImpl.java +++ b/src/main/java/com/iota/iri/service/snapshot/impl/LocalSnapshotManagerImpl.java @@ -22,6 +22,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; + /** *

* Creates a manager for the local snapshots, that takes care of automatically creating local snapshots when the defined From 8a773c9b8fbb198e5e087e2348de992325fb691d Mon Sep 17 00:00:00 2001 From: Acha Bill Date: Tue, 14 Apr 2020 15:08:31 +0100 Subject: [PATCH 2/2] added unit tests --- .../iota/iri/MainInjectionConfiguration.java | 4 +- .../iota/iri/network/HeartbeatPulseImpl.java | 35 ++++++++- .../iota/iri/network/protocol/Protocol.java | 1 - src/main/java/com/iota/iri/service/API.java | 14 ++-- .../snapshot/LocalSnapshotManager.java | 7 ++ .../impl/LocalSnapshotManagerImpl.java | 12 ++- .../network/pipeline/BroadcastStageTest.java | 76 ++++++++++++++++++- 7 files changed, 133 insertions(+), 16 deletions(-) diff --git a/src/main/java/com/iota/iri/MainInjectionConfiguration.java b/src/main/java/com/iota/iri/MainInjectionConfiguration.java index f070457009..d4d8162f78 100644 --- a/src/main/java/com/iota/iri/MainInjectionConfiguration.java +++ b/src/main/java/com/iota/iri/MainInjectionConfiguration.java @@ -100,8 +100,8 @@ LatestMilestoneTracker provideLatestMilestoneTracker(Tangle tangle, SnapshotProv @Singleton @Provides - HeartbeatPulse providerHeartbeatPulse(NeighborRouter neighborRouter, SnapshotProvider snapshotProvider){ - return new HeartbeatPulseImpl(neighborRouter, snapshotProvider); + HeartbeatPulse providerHeartbeatPulse(NeighborRouter neighborRouter, SnapshotProvider snapshotProvider, @Nullable LocalSnapshotManager localSnapshotManager){ + return new HeartbeatPulseImpl(neighborRouter, snapshotProvider, configuration, localSnapshotManager); } @Singleton diff --git a/src/main/java/com/iota/iri/network/HeartbeatPulseImpl.java b/src/main/java/com/iota/iri/network/HeartbeatPulseImpl.java index 1f77896e12..1bb6deafc2 100644 --- a/src/main/java/com/iota/iri/network/HeartbeatPulseImpl.java +++ b/src/main/java/com/iota/iri/network/HeartbeatPulseImpl.java @@ -1,14 +1,20 @@ package com.iota.iri.network; +import com.iota.iri.conf.SnapshotConfig; import com.iota.iri.network.neighbor.Neighbor; import com.iota.iri.network.protocol.Heartbeat; +import com.iota.iri.service.snapshot.LocalSnapshotManager; import com.iota.iri.service.snapshot.SnapshotProvider; +import com.iota.iri.service.transactionpruning.TransactionPruningException; import com.iota.iri.utils.thread.DedicatedScheduledExecutorService; import com.iota.iri.utils.thread.SilentScheduledExecutorService; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Implementation of {@link HeartbeatPulse} interface. */ @@ -16,6 +22,10 @@ public class HeartbeatPulseImpl implements HeartbeatPulse { private NeighborRouter neighborRouter; private SnapshotProvider snapshotProvider; + private SnapshotConfig snapshotConfig; + private LocalSnapshotManager localSnapshotManager; + + private static final Logger log = LoggerFactory.getLogger(HeartbeatPulseImpl.class); /** * The rate in milliseconds, at which heartbeats are sent out. @@ -28,9 +38,20 @@ public class HeartbeatPulseImpl implements HeartbeatPulse { private final SilentScheduledExecutorService executorService = new DedicatedScheduledExecutorService( "Heartbeat Pulse"); - public HeartbeatPulseImpl(NeighborRouter neighborRouter, SnapshotProvider snapshotProvider) { + /** + * Constructor for heartbeat pulse + * + * @param neighborRouter Neighbor router + * @param snapshotProvider Snapshot provider + * @param snapshotConfig Snapshot config + * @param localSnapshotManager local snapshot manager + */ + public HeartbeatPulseImpl(NeighborRouter neighborRouter, SnapshotProvider snapshotProvider, + SnapshotConfig snapshotConfig, LocalSnapshotManager localSnapshotManager) { this.neighborRouter = neighborRouter; this.snapshotProvider = snapshotProvider; + this.snapshotConfig = snapshotConfig; + this.localSnapshotManager = localSnapshotManager; } @Override @@ -47,8 +68,16 @@ public void shutdown() { @Override public void sendHeartbeat() { int lastSolidMilestoneIndex = snapshotProvider.getLatestSnapshot().getIndex(); - // The first solid milestone in DB w/o pruning. - int firstSolidMilestoneIndex = 0; // TODO: how do we get this? + int firstSolidMilestoneIndex = snapshotProvider.getInitialSnapshot().getIndex(); + + if (snapshotConfig.getLocalSnapshotsPruningEnabled()) { + try { + firstSolidMilestoneIndex = localSnapshotManager.maxSnapshotPruningMilestone(); + } catch (TransactionPruningException e) { + log.info("Failed to get first solid milestone with pruning" + e.getMessage()); + firstSolidMilestoneIndex = snapshotProvider.getInitialSnapshot().getIndex(); + } + } Heartbeat heartbeat = new Heartbeat(); heartbeat.setFirstSolidMilestoneIndex(firstSolidMilestoneIndex); diff --git a/src/main/java/com/iota/iri/network/protocol/Protocol.java b/src/main/java/com/iota/iri/network/protocol/Protocol.java index c47c7f0bf9..598310f512 100644 --- a/src/main/java/com/iota/iri/network/protocol/Protocol.java +++ b/src/main/java/com/iota/iri/network/protocol/Protocol.java @@ -34,7 +34,6 @@ public class Protocol { */ public final static byte[] SUPPORTED_PROTOCOL_VERSIONS = { /* supports protocol version(s): 1 */ - (byte) 0b00000001, (byte) 0b00000011, }; /** diff --git a/src/main/java/com/iota/iri/service/API.java b/src/main/java/com/iota/iri/service/API.java index b8aa757060..544d8ce2f5 100644 --- a/src/main/java/com/iota/iri/service/API.java +++ b/src/main/java/com/iota/iri/service/API.java @@ -153,7 +153,7 @@ public class API { private final TransactionValidator transactionValidator; private final TransactionSolidifier transactionSolidifier; private final LatestMilestoneTracker latestMilestoneTracker; - + private final int maxFindTxs; private final int maxRequestList; private final int maxGetTrytes; @@ -198,11 +198,11 @@ public class API { * */ public API(IotaConfig configuration, IXI ixi, TransactionRequester transactionRequester, - SpentAddressesService spentAddressesService, Tangle tangle, BundleValidator bundleValidator, - SnapshotProvider snapshotProvider, LedgerService ledgerService, NeighborRouter neighborRouter, - TipSelector tipsSelector, TipsViewModel tipsViewModel, TransactionValidator transactionValidator, - LatestMilestoneTracker latestMilestoneTracker, TransactionProcessingPipeline txPipeline, - TransactionSolidifier transactionSolidifier) { + SpentAddressesService spentAddressesService, Tangle tangle, BundleValidator bundleValidator, + SnapshotProvider snapshotProvider, LedgerService ledgerService, NeighborRouter neighborRouter, + TipSelector tipsSelector, TipsViewModel tipsViewModel, TransactionValidator transactionValidator, + LatestMilestoneTracker latestMilestoneTracker, TransactionProcessingPipeline txPipeline, + TransactionSolidifier transactionSolidifier) { this.configuration = configuration; this.ixi = ixi; @@ -219,7 +219,7 @@ public API(IotaConfig configuration, IXI ixi, TransactionRequester transactionRe this.transactionValidator = transactionValidator; this.transactionSolidifier = transactionSolidifier; this.latestMilestoneTracker = latestMilestoneTracker; - + maxFindTxs = configuration.getMaxFindTransactions(); maxRequestList = configuration.getMaxRequestsList(); maxGetTrytes = configuration.getMaxGetTrytes(); diff --git a/src/main/java/com/iota/iri/service/snapshot/LocalSnapshotManager.java b/src/main/java/com/iota/iri/service/snapshot/LocalSnapshotManager.java index 790113ce78..58c67b1de3 100644 --- a/src/main/java/com/iota/iri/service/snapshot/LocalSnapshotManager.java +++ b/src/main/java/com/iota/iri/service/snapshot/LocalSnapshotManager.java @@ -2,6 +2,7 @@ import com.iota.iri.service.milestone.LatestMilestoneTracker; import com.iota.iri.service.transactionpruning.PruningCondition; +import com.iota.iri.service.transactionpruning.TransactionPruningException; /** * Represents the manager for local {@link Snapshot}s that takes care of periodically creating a new {@link Snapshot} @@ -41,4 +42,10 @@ public interface LocalSnapshotManager { * @param conditions conditions on which we check to make a snapshot */ void addPruningConditions(PruningCondition... conditions); + + /** + * Get the max pruning milestone for all conditions. + * @return Max pruning milestone + */ + int maxSnapshotPruningMilestone() throws TransactionPruningException; } diff --git a/src/main/java/com/iota/iri/service/snapshot/impl/LocalSnapshotManagerImpl.java b/src/main/java/com/iota/iri/service/snapshot/impl/LocalSnapshotManagerImpl.java index b3d3c0154a..9f9f9c819e 100644 --- a/src/main/java/com/iota/iri/service/snapshot/impl/LocalSnapshotManagerImpl.java +++ b/src/main/java/com/iota/iri/service/snapshot/impl/LocalSnapshotManagerImpl.java @@ -22,7 +22,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; /** *

@@ -298,4 +297,15 @@ public void addPruningConditions(PruningCondition... conditions) { this.pruningConditions = ArrayUtils.addAll(this.pruningConditions, conditions); } } + + @Override + public int maxSnapshotPruningMilestone() throws TransactionPruningException { + int max = 0; + for (PruningCondition condition: pruningConditions) { + if(condition.getSnapshotPruningMilestone() > max){ + max = condition.getSnapshotPruningMilestone(); + } + } + return max; + } } diff --git a/src/test/java/com/iota/iri/network/pipeline/BroadcastStageTest.java b/src/test/java/com/iota/iri/network/pipeline/BroadcastStageTest.java index ce1b8fdc85..6f7f30f1f5 100644 --- a/src/test/java/com/iota/iri/network/pipeline/BroadcastStageTest.java +++ b/src/test/java/com/iota/iri/network/pipeline/BroadcastStageTest.java @@ -2,13 +2,18 @@ import com.iota.iri.controllers.TransactionViewModel; import com.iota.iri.model.persistables.Transaction; +import com.iota.iri.network.FakeChannel; import com.iota.iri.network.NeighborRouter; import com.iota.iri.network.neighbor.Neighbor; +import com.iota.iri.network.neighbor.NeighborState; import com.iota.iri.network.neighbor.impl.NeighborImpl; +import java.nio.ByteBuffer; +import java.nio.channels.Selector; import java.util.HashMap; import java.util.Map; +import com.iota.iri.network.protocol.Heartbeat; import com.iota.iri.service.validation.TransactionSolidifier; import org.junit.Rule; import org.junit.Test; @@ -28,9 +33,15 @@ public class BroadcastStageTest { @Mock private NeighborRouter neighborRouter; + @Mock + private Selector selector; + + private final static int firstProtocolVersion = 1; + private final static int stingProtocolVersion = 2; + // setup neighbors - private final static Neighbor neighborA = new NeighborImpl<>(null, null, "A", 0, null); - private final static Neighbor neighborB = new NeighborImpl<>(null, null, "B", 0, null); + private final static Neighbor neighborA = Mockito.spy(new NeighborImpl<>(null, null, "A", 0, null)); + private final static Neighbor neighborB = Mockito.spy(new NeighborImpl<>(null, null, "B", 0, null)); private static Map neighbors = new HashMap<>(); static { @@ -76,4 +87,65 @@ public void gossipsToAllIfNoOriginNeighbor() { } } + @Test + public void gossipToNeighborAccordingToHeartbeat() throws Exception { + TransactionViewModel tvm = Mockito.mock(TransactionViewModel.class); + Mockito.when(tvm.snapshotIndex()).thenReturn(90); + + Heartbeat heartbeat = new Heartbeat(); + heartbeat.setLastSolidMilestoneIndex(100); + heartbeat.setFirstSolidMilestoneIndex(50); + Neighbor neighborC = Mockito.spy(new NeighborImpl<>(selector, new FakeChannel() { + @Override + public int read(ByteBuffer dst) { + return 0; + } + }, "C", 0, null)); + + neighborC.setState(NeighborState.READY_FOR_MESSAGES); + Mockito.when(neighborB.getProtocolVersion()).thenReturn(firstProtocolVersion); + Mockito.when(neighborC.getProtocolVersion()).thenReturn(stingProtocolVersion); + Mockito.when(neighborC.heartbeat()).thenReturn(heartbeat); + neighbors.put(neighborC.getHostAddress(), neighborC); + Mockito.when(neighborRouter.getConnectedNeighbors()).thenReturn(neighbors); + + BroadcastStage broadcastStage = new BroadcastStage(neighborRouter, transactionSolidifier); + BroadcastPayload broadcastPayload = new BroadcastPayload(neighborA, tvm); + ProcessingContext ctx = new ProcessingContext(null, broadcastPayload); + broadcastStage.process(ctx); + + // should send the tvm as neighbor heartbeat is in the milestone range. + Mockito.verify(neighborRouter).gossipTransactionTo(neighborC, tvm); + } + + @Test + public void shouldNotGossipToNeighborAccordingToHeartbeat() throws Exception { + TransactionViewModel tvm = Mockito.mock(TransactionViewModel.class); + Mockito.when(tvm.snapshotIndex()).thenReturn(40); + + Heartbeat heartbeat = new Heartbeat(); + heartbeat.setLastSolidMilestoneIndex(100); + heartbeat.setFirstSolidMilestoneIndex(50); + Neighbor neighborC = Mockito.spy(new NeighborImpl<>(selector, new FakeChannel() { + @Override + public int read(ByteBuffer dst) { + return 0; + } + }, "C", 0, null)); + + neighborC.setState(NeighborState.READY_FOR_MESSAGES); + Mockito.when(neighborB.getProtocolVersion()).thenReturn(firstProtocolVersion); + Mockito.when(neighborC.getProtocolVersion()).thenReturn(stingProtocolVersion); + Mockito.when(neighborC.heartbeat()).thenReturn(heartbeat); + neighbors.put(neighborC.getHostAddress(), neighborC); + Mockito.when(neighborRouter.getConnectedNeighbors()).thenReturn(neighbors); + + BroadcastStage broadcastStage = new BroadcastStage(neighborRouter, transactionSolidifier); + BroadcastPayload broadcastPayload = new BroadcastPayload(neighborA, tvm); + ProcessingContext ctx = new ProcessingContext(null, broadcastPayload); + broadcastStage.process(ctx); + + // should not send the tvm as neighbor heartbeat is not in the milestone range. + Mockito.verify(neighborRouter, Mockito.never()).gossipTransactionTo(neighborC, tvm); + } } \ No newline at end of file