From 6cea48bbf60572b0c1ec7968188f04b4619122fa Mon Sep 17 00:00:00 2001 From: semux Date: Fri, 23 Aug 2019 12:04:10 +0100 Subject: [PATCH] Net: remove message wrapper --- .../semux/api/util/TransactionBuilder.java | 3 +- .../java/org/semux/config/AbstractConfig.java | 2 +- .../java/org/semux/core/PendingManager.java | 6 +- src/main/java/org/semux/net/PeerClient.java | 2 +- src/main/java/org/semux/net/PeerServer.java | 2 +- .../java/org/semux/net/SemuxP2pHandler.java | 13 +-- .../java/org/semux/net/msg/MessageQueue.java | 85 ++++++------------- .../org/semux/net/msg/MessageWrapper.java | 60 ------------- .../org/semux/net/msg/MessageQueueTest.java | 5 +- .../java/org/semux/util/TimeUtilTest.java | 2 +- .../vm/client/InternalTransactionTest.java | 2 +- .../vm/client/PrecompiledContractTest.java | 2 +- .../semux/vm/client/VmTransactionTest.java | 2 +- 13 files changed, 47 insertions(+), 139 deletions(-) delete mode 100644 src/main/java/org/semux/net/msg/MessageWrapper.java diff --git a/src/main/java/org/semux/api/util/TransactionBuilder.java b/src/main/java/org/semux/api/util/TransactionBuilder.java index 0d6cc496e..b9707f818 100644 --- a/src/main/java/org/semux/api/util/TransactionBuilder.java +++ b/src/main/java/org/semux/api/util/TransactionBuilder.java @@ -15,6 +15,7 @@ import org.semux.crypto.Hex; import org.semux.crypto.Key; import org.semux.util.Bytes; +import org.semux.util.TimeUtil; /** * This is a builder class for building transactions required by Semux API with @@ -266,7 +267,7 @@ public Transaction buildUnsigned() { Long timestamp = this.timestamp; if (timestamp == null) { - timestamp = System.currentTimeMillis(); + timestamp = TimeUtil.currentTimeMillis(); } byte[] data = this.data; diff --git a/src/main/java/org/semux/config/AbstractConfig.java b/src/main/java/org/semux/config/AbstractConfig.java index 9e8e048aa..dd77d0789 100644 --- a/src/main/java/org/semux/config/AbstractConfig.java +++ b/src/main/java/org/semux/config/AbstractConfig.java @@ -97,7 +97,7 @@ public abstract class AbstractConfig implements Config, ChainSpec { // ========================= // Sync // ========================= - protected long syncDownloadTimeout = 2000L; + protected long syncDownloadTimeout = 10_000L; protected int syncMaxQueuedJobs = 8192; protected int syncMaxPendingJobs = 256; protected int syncMaxPendingBlocks = 512; diff --git a/src/main/java/org/semux/core/PendingManager.java b/src/main/java/org/semux/core/PendingManager.java index 5085bda7c..7661a893b 100644 --- a/src/main/java/org/semux/core/PendingManager.java +++ b/src/main/java/org/semux/core/PendingManager.java @@ -67,7 +67,7 @@ public Thread newThread(Runnable r) { private static final int QUEUE_SIZE_LIMIT = 128 * 1024; private static final int VALID_TXS_LIMIT = 16 * 1024; private static final int LARGE_NONCE_TXS_LIMIT = 32 * 1024; - private static final int PROCESSED_TXS_LIMIT = 256 * 1024; + private static final int PROCESSED_TXS_LIMIT = 128 * 1024; private final Kernel kernel; private final BlockStore blockStore; @@ -291,7 +291,7 @@ public synchronized void run() { // process the transaction int accepted = processTransaction(entry.getValue(), false, false).accepted; - processedTxs.put(entry.getKey(), System.currentTimeMillis()); + processedTxs.put(entry.getKey(), TimeUtil.currentTimeMillis()); // include one tx per call if (accepted > 0) { @@ -422,7 +422,7 @@ private SemuxBlock createDummyBlock() { Block prevBlock = chain.getLatestBlock(); BlockHeader blockHeader = new BlockHeader( prevBlock.getNumber() + 1, - new Key().toAddress(), prevBlock.getHash(), System.currentTimeMillis(), Bytes.EMPTY_BYTES, + new Key().toAddress(), prevBlock.getHash(), TimeUtil.currentTimeMillis(), Bytes.EMPTY_BYTES, Bytes.EMPTY_BYTES, Bytes.EMPTY_BYTES, Bytes.EMPTY_BYTES); return new SemuxBlock(blockHeader, kernel.getConfig().spec().maxBlockGasLimit()); } diff --git a/src/main/java/org/semux/net/PeerClient.java b/src/main/java/org/semux/net/PeerClient.java index 627b10c74..2dfe71dfb 100644 --- a/src/main/java/org/semux/net/PeerClient.java +++ b/src/main/java/org/semux/net/PeerClient.java @@ -73,7 +73,7 @@ public PeerClient(String ip, int port, Key coinbase) { this.port = port; this.coinbase = coinbase; - this.workerGroup = new NioEventLoopGroup(0, factory); + this.workerGroup = new NioEventLoopGroup(4, factory); } /** diff --git a/src/main/java/org/semux/net/PeerServer.java b/src/main/java/org/semux/net/PeerServer.java index 7752b4c53..a6aaf0272 100644 --- a/src/main/java/org/semux/net/PeerServer.java +++ b/src/main/java/org/semux/net/PeerServer.java @@ -60,7 +60,7 @@ public void start(String ip, int port) { try { bossGroup = new NioEventLoopGroup(1, factory); - workerGroup = new NioEventLoopGroup(0, factory); + workerGroup = new NioEventLoopGroup(4, factory); ServerBootstrap b = new ServerBootstrap(); diff --git a/src/main/java/org/semux/net/SemuxP2pHandler.java b/src/main/java/org/semux/net/SemuxP2pHandler.java index 34e5e4289..0a39e5a04 100644 --- a/src/main/java/org/semux/net/SemuxP2pHandler.java +++ b/src/main/java/org/semux/net/SemuxP2pHandler.java @@ -35,7 +35,6 @@ import org.semux.net.NodeManager.Node; import org.semux.net.msg.Message; import org.semux.net.msg.MessageQueue; -import org.semux.net.msg.MessageWrapper; import org.semux.net.msg.ReasonCode; import org.semux.net.msg.consensus.BlockHeaderMessage; import org.semux.net.msg.consensus.BlockMessage; @@ -213,7 +212,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E @Override public void channelRead0(final ChannelHandlerContext ctx, Message msg) throws InterruptedException { logger.trace("Received message: {}", msg); - MessageWrapper request = msgQueue.onMessageReceived(msg); switch (msg.getCode()) { /* p2p */ @@ -232,7 +230,7 @@ public void channelRead0(final ChannelHandlerContext ctx, Message msg) throws In onPing(); break; case PONG: - onPong(request); + onPong(); break; case GET_NODES: onGetNodes(); @@ -338,14 +336,17 @@ protected void onWorld(org.semux.net.msg.p2p.handshake.v1.WorldMessage msg) { onHandshakeDone(peer); } + private long lastPing; + protected void onPing() { PongMessage pong = new PongMessage(); msgQueue.sendMessage(pong); + lastPing = TimeUtil.currentTimeMillis(); } - protected void onPong(MessageWrapper request) { - if (request != null) { - long latency = TimeUtil.currentTimeMillis() - request.getLastTimestamp(); + protected void onPong() { + if (lastPing > 0) { + long latency = TimeUtil.currentTimeMillis() - lastPing; channel.getRemotePeer().setLatency(latency); } } diff --git a/src/main/java/org/semux/net/msg/MessageQueue.java b/src/main/java/org/semux/net/msg/MessageQueue.java index e1686e490..eacb1759e 100644 --- a/src/main/java/org/semux/net/msg/MessageQueue.java +++ b/src/main/java/org/semux/net/msg/MessageQueue.java @@ -32,7 +32,7 @@ public class MessageQueue { private static final Logger logger = LoggerFactory.getLogger(MessageQueue.class); - private static final ScheduledExecutorService timer = Executors.newScheduledThreadPool(2, new ThreadFactory() { + private static final ScheduledExecutorService timer = Executors.newScheduledThreadPool(4, new ThreadFactory() { private final AtomicInteger cnt = new AtomicInteger(0); public Thread newThread(Runnable r) { @@ -42,9 +42,8 @@ public Thread newThread(Runnable r) { private final Config config; - private final Queue requests = new ConcurrentLinkedQueue<>(); - private final Queue responses = new ConcurrentLinkedQueue<>(); - private final Queue prioritizedResponses = new ConcurrentLinkedQueue<>(); + private final Queue queue = new ConcurrentLinkedQueue<>(); + private final Queue prioritized = new ConcurrentLinkedQueue<>(); private ChannelHandlerContext ctx; private ScheduledFuture timerTask; @@ -73,7 +72,7 @@ public synchronized void activate(ChannelHandlerContext ctx) { } catch (Exception t) { logger.error("Exception in MessageQueue", t); } - }, 1, 1, TimeUnit.MILLISECONDS); + }, 10, 10, TimeUnit.MILLISECONDS); } /** @@ -86,7 +85,11 @@ public synchronized void deactivate() { /** * Returns if this message queue is idle. * - * @return true if request/response queues are empty, otherwise false + * NOTE that requests are no longer kept in the queue after we send them out. + * even through the message queue is idle, from our perspective, the peer + * may still be busy responding our requests. + * + * @return true if message queues are empty, otherwise false */ public boolean isIdle() { return size() == 0; @@ -115,82 +118,44 @@ public void disconnect(ReasonCode code) { * false */ public boolean sendMessage(Message msg) { - int maxQueueSize = config.netMaxMessageQueueSize(); - if (size() >= maxQueueSize) { + if (size() >= config.netMaxMessageQueueSize()) { disconnect(ReasonCode.MESSAGE_QUEUE_FULL); return false; } - if (msg.getResponseMessageClass() != null) { - requests.add(new MessageWrapper(msg)); + if (config.netPrioritizedMessages().contains(msg.getCode())) { + prioritized.add(msg); } else { - if (config.netPrioritizedMessages().contains(msg.getCode())) { - prioritizedResponses.add(new MessageWrapper(msg)); - } else { - responses.add(new MessageWrapper(msg)); - } + queue.add(msg); } return true; } - /** - * Notifies this message queue that a new message has been received. - * - * @param msg - */ - public MessageWrapper onMessageReceived(Message msg) { - if (requests.peek() != null) { - MessageWrapper mw = requests.peek(); - Message m = mw.getMessage(); - - if (m.getResponseMessageClass() != null && msg.getClass() == m.getResponseMessageClass()) { - mw.answer(); - return mw; - } - } - - return null; - } - /** * Returns the number of messages in queue. * * @return */ public int size() { - return requests.size() + responses.size() + prioritizedResponses.size(); + return queue.size() + prioritized.size(); } protected void nudgeQueue() { - removeAnsweredMessage(requests.peek()); - - // send responses - sendToWire(prioritizedResponses.poll()); - sendToWire(responses.poll()); - - // send requests - sendToWire(requests.peek()); - } - - protected void removeAnsweredMessage(MessageWrapper mw) { - if (mw != null && mw.isAnswered()) { - requests.remove(); + // 1000 / 10 * 5 = 500 messages per second + int n = Math.min(5, size()); + if (n == 0) { + return; } - } - protected void sendToWire(MessageWrapper mw) { - if (mw != null && mw.getRetries() == 0) { - Message msg = mw.getMessage(); + // write out n messages + for (int i = 0; i < n; i++) { + Message msg = !prioritized.isEmpty() ? prioritized.poll() : queue.poll(); logger.trace("Wiring message: {}", msg); - ctx.writeAndFlush(msg).addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); - logger.trace("Message {} to {} took {} ms", msg.code, ctx.channel().remoteAddress(), - TimeUtil.currentTimeMillis() - mw.getLastTimestamp()); - - if (msg.getResponseMessageClass() != null) { - mw.increaseRetries(); - mw.saveTime(); - } + ctx.write(msg).addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); } + + // flush + ctx.flush(); } } diff --git a/src/main/java/org/semux/net/msg/MessageWrapper.java b/src/main/java/org/semux/net/msg/MessageWrapper.java deleted file mode 100644 index fd957b9b4..000000000 --- a/src/main/java/org/semux/net/msg/MessageWrapper.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Copyright (c) 2017-2018 The Semux Developers - * - * Distributed under the MIT software license, see the accompanying file - * LICENSE or https://opensource.org/licenses/mit-license.php - */ -package org.semux.net.msg; - -import org.semux.util.TimeUtil; - -/** - * Utility that keeps track of the number of retries and lastTimestamp. - * - */ -public class MessageWrapper { - - private final Message message; - private long lastTimestamp = 0; - - private int retries = 0; - private boolean isAnswered = false; - - /** - * Create a message round trip. - * - * @param message - */ - public MessageWrapper(Message message) { - this.message = message; - saveTime(); - } - - public void saveTime() { - lastTimestamp = TimeUtil.currentTimeMillis(); - } - - public void answer() { - this.isAnswered = true; - } - - public void increaseRetries() { - ++retries; - } - - public Message getMessage() { - return message; - } - - public long getLastTimestamp() { - return lastTimestamp; - } - - public int getRetries() { - return retries; - } - - public boolean isAnswered() { - return isAnswered; - } -} \ No newline at end of file diff --git a/src/test/java/org/semux/net/msg/MessageQueueTest.java b/src/test/java/org/semux/net/msg/MessageQueueTest.java index 14159a61b..edb6106c4 100644 --- a/src/test/java/org/semux/net/msg/MessageQueueTest.java +++ b/src/test/java/org/semux/net/msg/MessageQueueTest.java @@ -23,6 +23,7 @@ import org.semux.net.msg.p2p.PingMessage; import org.semux.net.msg.p2p.PongMessage; import org.semux.rules.KernelRule; +import org.semux.util.TimeUtil; public class MessageQueueTest { @@ -64,10 +65,10 @@ private Channel connect() throws InterruptedException { client.connect(remoteNode, ci).sync(); long maxWaitTime = 30_000; - long startTime = System.currentTimeMillis(); + long startTime = TimeUtil.currentTimeMillis(); while (kernel2.getChannelManager().getActiveChannels().isEmpty()) { Thread.sleep(100); - if (System.currentTimeMillis() - startTime > maxWaitTime) { + if (TimeUtil.currentTimeMillis() - startTime > maxWaitTime) { fail("Took too long to connect peers"); } } diff --git a/src/test/java/org/semux/util/TimeUtilTest.java b/src/test/java/org/semux/util/TimeUtilTest.java index a7954565b..c6d72cf11 100644 --- a/src/test/java/org/semux/util/TimeUtilTest.java +++ b/src/test/java/org/semux/util/TimeUtilTest.java @@ -42,7 +42,7 @@ public void testFormatDuration() { @Test public void testNtpTime() { - long currentTime = System.currentTimeMillis(); + long currentTime = TimeUtil.currentTimeMillis(); long offset = TimeUtil.getTimeOffsetFromNtp(); // ensure the time is within the actual time offset. Assert.assertTrue(Math.abs(currentTime + offset - TimeUtil.currentTimeMillis()) < 1000); diff --git a/src/test/java/org/semux/vm/client/InternalTransactionTest.java b/src/test/java/org/semux/vm/client/InternalTransactionTest.java index 93b357ae2..dff84be7d 100644 --- a/src/test/java/org/semux/vm/client/InternalTransactionTest.java +++ b/src/test/java/org/semux/vm/client/InternalTransactionTest.java @@ -85,7 +85,7 @@ public void testInternalTransfer() { as.setCode(to, contract); SemuxBlock bh = new SemuxBlock( - new BlockHeader(123, Bytes.random(20), Bytes.random(20), System.currentTimeMillis(), + new BlockHeader(123, Bytes.random(20), Bytes.random(20), TimeUtil.currentTimeMillis(), Bytes.random(20), Bytes.random(20), Bytes.random(20), Bytes.random(20)), config.spec().maxBlockGasLimit()); as.adjustAvailable(from, Amount.of(1000, SEM)); diff --git a/src/test/java/org/semux/vm/client/PrecompiledContractTest.java b/src/test/java/org/semux/vm/client/PrecompiledContractTest.java index 5a8ed721e..7f965281a 100644 --- a/src/test/java/org/semux/vm/client/PrecompiledContractTest.java +++ b/src/test/java/org/semux/vm/client/PrecompiledContractTest.java @@ -90,7 +90,7 @@ public void testSuccess() { as.setCode(to, contract); SemuxBlock block = new SemuxBlock( - new BlockHeader(123, Bytes.random(20), Bytes.random(20), System.currentTimeMillis(), + new BlockHeader(123, Bytes.random(20), Bytes.random(20), TimeUtil.currentTimeMillis(), Bytes.random(20), Bytes.random(20), Bytes.random(20), Bytes.random(20)), config.spec().maxBlockGasLimit()); as.adjustAvailable(from, Amount.of(1000, SEM)); diff --git a/src/test/java/org/semux/vm/client/VmTransactionTest.java b/src/test/java/org/semux/vm/client/VmTransactionTest.java index 6adbe119c..319b30a69 100644 --- a/src/test/java/org/semux/vm/client/VmTransactionTest.java +++ b/src/test/java/org/semux/vm/client/VmTransactionTest.java @@ -72,7 +72,7 @@ public void prepare() { doReturn(true).when(chain).isForkActivated(any()); block = new SemuxBlock( - new BlockHeader(1, Bytes.random(20), Bytes.random(32), System.currentTimeMillis(), + new BlockHeader(1, Bytes.random(20), Bytes.random(32), TimeUtil.currentTimeMillis(), Bytes.random(20), Bytes.random(20), Bytes.random(20), Bytes.random(20)), config.spec().maxBlockGasLimit()); }