Skip to content

Commit

Permalink
Net: remove message wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
semux committed Aug 23, 2019
1 parent 2abc154 commit 6cea48b
Show file tree
Hide file tree
Showing 13 changed files with 47 additions and 139 deletions.
3 changes: 2 additions & 1 deletion src/main/java/org/semux/api/util/TransactionBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.semux.crypto.Hex;
import org.semux.crypto.Key;
import org.semux.util.Bytes;
import org.semux.util.TimeUtil;

/**
* This is a builder class for building transactions required by Semux API with
Expand Down Expand Up @@ -266,7 +267,7 @@ public Transaction buildUnsigned() {

Long timestamp = this.timestamp;
if (timestamp == null) {
timestamp = System.currentTimeMillis();
timestamp = TimeUtil.currentTimeMillis();
}

byte[] data = this.data;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/semux/config/AbstractConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public abstract class AbstractConfig implements Config, ChainSpec {
// =========================
// Sync
// =========================
protected long syncDownloadTimeout = 2000L;
protected long syncDownloadTimeout = 10_000L;
protected int syncMaxQueuedJobs = 8192;
protected int syncMaxPendingJobs = 256;
protected int syncMaxPendingBlocks = 512;
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/org/semux/core/PendingManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public Thread newThread(Runnable r) {
private static final int QUEUE_SIZE_LIMIT = 128 * 1024;
private static final int VALID_TXS_LIMIT = 16 * 1024;
private static final int LARGE_NONCE_TXS_LIMIT = 32 * 1024;
private static final int PROCESSED_TXS_LIMIT = 256 * 1024;
private static final int PROCESSED_TXS_LIMIT = 128 * 1024;

private final Kernel kernel;
private final BlockStore blockStore;
Expand Down Expand Up @@ -291,7 +291,7 @@ public synchronized void run() {

// process the transaction
int accepted = processTransaction(entry.getValue(), false, false).accepted;
processedTxs.put(entry.getKey(), System.currentTimeMillis());
processedTxs.put(entry.getKey(), TimeUtil.currentTimeMillis());

// include one tx per call
if (accepted > 0) {
Expand Down Expand Up @@ -422,7 +422,7 @@ private SemuxBlock createDummyBlock() {
Block prevBlock = chain.getLatestBlock();
BlockHeader blockHeader = new BlockHeader(
prevBlock.getNumber() + 1,
new Key().toAddress(), prevBlock.getHash(), System.currentTimeMillis(), Bytes.EMPTY_BYTES,
new Key().toAddress(), prevBlock.getHash(), TimeUtil.currentTimeMillis(), Bytes.EMPTY_BYTES,
Bytes.EMPTY_BYTES, Bytes.EMPTY_BYTES, Bytes.EMPTY_BYTES);
return new SemuxBlock(blockHeader, kernel.getConfig().spec().maxBlockGasLimit());
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/semux/net/PeerClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public PeerClient(String ip, int port, Key coinbase) {
this.port = port;
this.coinbase = coinbase;

this.workerGroup = new NioEventLoopGroup(0, factory);
this.workerGroup = new NioEventLoopGroup(4, factory);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/semux/net/PeerServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void start(String ip, int port) {

try {
bossGroup = new NioEventLoopGroup(1, factory);
workerGroup = new NioEventLoopGroup(0, factory);
workerGroup = new NioEventLoopGroup(4, factory);

ServerBootstrap b = new ServerBootstrap();

Expand Down
13 changes: 7 additions & 6 deletions src/main/java/org/semux/net/SemuxP2pHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.semux.net.NodeManager.Node;
import org.semux.net.msg.Message;
import org.semux.net.msg.MessageQueue;
import org.semux.net.msg.MessageWrapper;
import org.semux.net.msg.ReasonCode;
import org.semux.net.msg.consensus.BlockHeaderMessage;
import org.semux.net.msg.consensus.BlockMessage;
Expand Down Expand Up @@ -213,7 +212,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
@Override
public void channelRead0(final ChannelHandlerContext ctx, Message msg) throws InterruptedException {
logger.trace("Received message: {}", msg);
MessageWrapper request = msgQueue.onMessageReceived(msg);

switch (msg.getCode()) {
/* p2p */
Expand All @@ -232,7 +230,7 @@ public void channelRead0(final ChannelHandlerContext ctx, Message msg) throws In
onPing();
break;
case PONG:
onPong(request);
onPong();
break;
case GET_NODES:
onGetNodes();
Expand Down Expand Up @@ -338,14 +336,17 @@ protected void onWorld(org.semux.net.msg.p2p.handshake.v1.WorldMessage msg) {
onHandshakeDone(peer);
}

private long lastPing;

protected void onPing() {
PongMessage pong = new PongMessage();
msgQueue.sendMessage(pong);
lastPing = TimeUtil.currentTimeMillis();
}

protected void onPong(MessageWrapper request) {
if (request != null) {
long latency = TimeUtil.currentTimeMillis() - request.getLastTimestamp();
protected void onPong() {
if (lastPing > 0) {
long latency = TimeUtil.currentTimeMillis() - lastPing;
channel.getRemotePeer().setLatency(latency);
}
}
Expand Down
85 changes: 25 additions & 60 deletions src/main/java/org/semux/net/msg/MessageQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class MessageQueue {

private static final Logger logger = LoggerFactory.getLogger(MessageQueue.class);

private static final ScheduledExecutorService timer = Executors.newScheduledThreadPool(2, new ThreadFactory() {
private static final ScheduledExecutorService timer = Executors.newScheduledThreadPool(4, new ThreadFactory() {
private final AtomicInteger cnt = new AtomicInteger(0);

public Thread newThread(Runnable r) {
Expand All @@ -42,9 +42,8 @@ public Thread newThread(Runnable r) {

private final Config config;

private final Queue<MessageWrapper> requests = new ConcurrentLinkedQueue<>();
private final Queue<MessageWrapper> responses = new ConcurrentLinkedQueue<>();
private final Queue<MessageWrapper> prioritizedResponses = new ConcurrentLinkedQueue<>();
private final Queue<Message> queue = new ConcurrentLinkedQueue<>();
private final Queue<Message> prioritized = new ConcurrentLinkedQueue<>();

private ChannelHandlerContext ctx;
private ScheduledFuture<?> timerTask;
Expand Down Expand Up @@ -73,7 +72,7 @@ public synchronized void activate(ChannelHandlerContext ctx) {
} catch (Exception t) {
logger.error("Exception in MessageQueue", t);
}
}, 1, 1, TimeUnit.MILLISECONDS);
}, 10, 10, TimeUnit.MILLISECONDS);
}

/**
Expand All @@ -86,7 +85,11 @@ public synchronized void deactivate() {
/**
* Returns if this message queue is idle.
*
* @return true if request/response queues are empty, otherwise false
* NOTE that requests are no longer kept in the queue after we send them out.
* even through the message queue is idle, from our perspective, the peer
* may still be busy responding our requests.
*
* @return true if message queues are empty, otherwise false
*/
public boolean isIdle() {
return size() == 0;
Expand Down Expand Up @@ -115,82 +118,44 @@ public void disconnect(ReasonCode code) {
* false
*/
public boolean sendMessage(Message msg) {
int maxQueueSize = config.netMaxMessageQueueSize();
if (size() >= maxQueueSize) {
if (size() >= config.netMaxMessageQueueSize()) {
disconnect(ReasonCode.MESSAGE_QUEUE_FULL);
return false;
}

if (msg.getResponseMessageClass() != null) {
requests.add(new MessageWrapper(msg));
if (config.netPrioritizedMessages().contains(msg.getCode())) {
prioritized.add(msg);
} else {
if (config.netPrioritizedMessages().contains(msg.getCode())) {
prioritizedResponses.add(new MessageWrapper(msg));
} else {
responses.add(new MessageWrapper(msg));
}
queue.add(msg);
}
return true;
}

/**
* Notifies this message queue that a new message has been received.
*
* @param msg
*/
public MessageWrapper onMessageReceived(Message msg) {
if (requests.peek() != null) {
MessageWrapper mw = requests.peek();
Message m = mw.getMessage();

if (m.getResponseMessageClass() != null && msg.getClass() == m.getResponseMessageClass()) {
mw.answer();
return mw;
}
}

return null;
}

/**
* Returns the number of messages in queue.
*
* @return
*/
public int size() {
return requests.size() + responses.size() + prioritizedResponses.size();
return queue.size() + prioritized.size();
}

protected void nudgeQueue() {
removeAnsweredMessage(requests.peek());

// send responses
sendToWire(prioritizedResponses.poll());
sendToWire(responses.poll());

// send requests
sendToWire(requests.peek());
}

protected void removeAnsweredMessage(MessageWrapper mw) {
if (mw != null && mw.isAnswered()) {
requests.remove();
// 1000 / 10 * 5 = 500 messages per second
int n = Math.min(5, size());
if (n == 0) {
return;
}
}

protected void sendToWire(MessageWrapper mw) {
if (mw != null && mw.getRetries() == 0) {
Message msg = mw.getMessage();
// write out n messages
for (int i = 0; i < n; i++) {
Message msg = !prioritized.isEmpty() ? prioritized.poll() : queue.poll();

logger.trace("Wiring message: {}", msg);
ctx.writeAndFlush(msg).addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
logger.trace("Message {} to {} took {} ms", msg.code, ctx.channel().remoteAddress(),
TimeUtil.currentTimeMillis() - mw.getLastTimestamp());

if (msg.getResponseMessageClass() != null) {
mw.increaseRetries();
mw.saveTime();
}
ctx.write(msg).addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
}

// flush
ctx.flush();
}
}
60 changes: 0 additions & 60 deletions src/main/java/org/semux/net/msg/MessageWrapper.java

This file was deleted.

5 changes: 3 additions & 2 deletions src/test/java/org/semux/net/msg/MessageQueueTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.semux.net.msg.p2p.PingMessage;
import org.semux.net.msg.p2p.PongMessage;
import org.semux.rules.KernelRule;
import org.semux.util.TimeUtil;

public class MessageQueueTest {

Expand Down Expand Up @@ -64,10 +65,10 @@ private Channel connect() throws InterruptedException {
client.connect(remoteNode, ci).sync();

long maxWaitTime = 30_000;
long startTime = System.currentTimeMillis();
long startTime = TimeUtil.currentTimeMillis();
while (kernel2.getChannelManager().getActiveChannels().isEmpty()) {
Thread.sleep(100);
if (System.currentTimeMillis() - startTime > maxWaitTime) {
if (TimeUtil.currentTimeMillis() - startTime > maxWaitTime) {
fail("Took too long to connect peers");
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/org/semux/util/TimeUtilTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void testFormatDuration() {

@Test
public void testNtpTime() {
long currentTime = System.currentTimeMillis();
long currentTime = TimeUtil.currentTimeMillis();
long offset = TimeUtil.getTimeOffsetFromNtp();
// ensure the time is within the actual time offset.
Assert.assertTrue(Math.abs(currentTime + offset - TimeUtil.currentTimeMillis()) < 1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void testInternalTransfer() {
as.setCode(to, contract);

SemuxBlock bh = new SemuxBlock(
new BlockHeader(123, Bytes.random(20), Bytes.random(20), System.currentTimeMillis(),
new BlockHeader(123, Bytes.random(20), Bytes.random(20), TimeUtil.currentTimeMillis(),
Bytes.random(20), Bytes.random(20), Bytes.random(20), Bytes.random(20)),
config.spec().maxBlockGasLimit());
as.adjustAvailable(from, Amount.of(1000, SEM));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void testSuccess() {
as.setCode(to, contract);

SemuxBlock block = new SemuxBlock(
new BlockHeader(123, Bytes.random(20), Bytes.random(20), System.currentTimeMillis(),
new BlockHeader(123, Bytes.random(20), Bytes.random(20), TimeUtil.currentTimeMillis(),
Bytes.random(20), Bytes.random(20), Bytes.random(20), Bytes.random(20)),
config.spec().maxBlockGasLimit());
as.adjustAvailable(from, Amount.of(1000, SEM));
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/org/semux/vm/client/VmTransactionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void prepare() {
doReturn(true).when(chain).isForkActivated(any());

block = new SemuxBlock(
new BlockHeader(1, Bytes.random(20), Bytes.random(32), System.currentTimeMillis(),
new BlockHeader(1, Bytes.random(20), Bytes.random(32), TimeUtil.currentTimeMillis(),
Bytes.random(20), Bytes.random(20), Bytes.random(20), Bytes.random(20)),
config.spec().maxBlockGasLimit());
}
Expand Down

0 comments on commit 6cea48b

Please sign in to comment.