Skip to content
This repository has been archived by the owner on Feb 9, 2019. It is now read-only.

Commit

Permalink
RC2/3 changes;
Browse files Browse the repository at this point in the history
better logging
Pushlösung
  • Loading branch information
rico666 committed Dec 8, 2018
1 parent bb25c7d commit e080d86
Show file tree
Hide file tree
Showing 13 changed files with 148 additions and 167 deletions.
21 changes: 7 additions & 14 deletions conf/brs-default.properties
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,11 @@ P2P.TimeoutIdle_ms = 30000
# Blacklist peers for 600000 milliseconds (i.e. 10 minutes by default).
P2P.BlacklistingTime_ms = 600000

# Enable re-broadcasting of new transactions until they are received back from at least one
# peer, or found in the blockchain. This feature can optionally be disabled, to avoid the
# risk of revealing that this node is the submitter of such re-broadcasted new transactions.
# Enable priority (re-)broadcasting of transactions. When enabled incoming transactions
# will be priority resent to the rebroadcast targets
P2P.enableTxRebroadcast = yes

# Transactions that aren't confirmed for this many blocks start getting rebroadcast.
P2P.rebroadcastTxAfter = 5

# Transactions being rebroadcast get rebroadcast every this many blocks until they are confirmed.
P2P.rebroadcastTxEvery = 2

# Consider a new transaction or block sent after 10 peers have received it.
# Amount of extra peers to send a transaction to after sending to all rebroadcast targets
P2P.sendToLimit=10

# Max number of unconfirmed transactions that will be kept in cache.
Expand All @@ -99,13 +92,10 @@ P2P.maxUnconfirmedTransactions = 8192
# Max percentage of unconfirmed transactions that have a full hash reference to another transaction kept in cache
P2P.maxUnconfirmedTransactionsFullHashReferencePercentage = 5

# Max amount of unconfirmed transactions that will be asked to a peer at the same time
P2P.limitUnconfirmedTransactionsToRetrieve = 1000

# JETTY pass-through options. See documentation at
# https://www.eclipse.org/jetty/documentation/9.2.22.v20170531/dos-filter.html
# P2P section:
JETTY.P2P.DoSFilter = off
JETTY.P2P.DoSFilter = on
JETTY.P2P.DoSFilter.maxRequestsPerSec = 30
JETTY.P2P.DoSFilter.delayMs = 500
JETTY.P2P.DoSFilter.maxRequestMs = 300000
Expand Down Expand Up @@ -308,3 +298,6 @@ brs.debugTraceQuote =

# Log changes to unconfirmed balances.
brs.debugLogUnconfirmed = false

# Timeout in Seconds to wait for a graceful shutdown
brs.ShutdownTimeout = 180
2 changes: 1 addition & 1 deletion src/brs/BlockchainProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ enum Event {

int getMinRollbackHeight();

void processPeerBlock(JSONObject request) throws BurstException;
void processPeerBlock(JSONObject request, Peer peer) throws BurstException;

void fullReset();

Expand Down
13 changes: 4 additions & 9 deletions src/brs/BlockchainProcessorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import brs.statistics.StatisticsManagerImpl;
import brs.services.AccountService;
import brs.transactionduplicates.TransactionDuplicatesCheckerImpl;
import brs.transactionduplicates.TransactionDuplicationResult;
import brs.unconfirmedtransactions.UnconfirmedTransactionStore;
import brs.util.ThreadPool;

Expand All @@ -32,11 +31,8 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.Semaphore;
Expand Down Expand Up @@ -744,10 +740,10 @@ public int getMinRollbackHeight() {
}

@Override
public void processPeerBlock(JSONObject request) throws BurstException {
public void processPeerBlock(JSONObject request, Peer peer) throws BurstException {
Block newBlock = Block.parseBlock(request, blockchain.getHeight());
if (newBlock == null) {
logger.debug("Peer has announced an unprocessable block.");
logger.debug("Peer {} has announced an unprocessable block.", peer.getPeerAddress());
return;
}
/*
Expand All @@ -760,10 +756,9 @@ public void processPeerBlock(JSONObject request) throws BurstException {
newBlock.setByteLength(newBlock.toString().length());
blockService.calculateBaseTarget(newBlock, chainblock);
downloadCache.addBlock(newBlock);
logger.debug("Added from Anounce: Id: " +newBlock.getId()+" Height: "+newBlock.getHeight());
logger.debug("Peer {} added block from Announce: Id: {} Height: {}", peer.getPeerAddress(), newBlock.getId(), newBlock.getHeight());
} else {
logger.debug("Peer sent us block: " + newBlock.getPreviousBlockId()
+ " that does not match our chain.");
logger.debug("Peer {} sent us block: {} which is not the follow-up block for {}", peer.getPeerAddress(), newBlock.getPreviousBlockId(), chainblock.getId());
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/brs/TransactionProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ enum Event {

void clearUnconfirmedTransactions();

void broadcast(Transaction transaction) throws BurstException.ValidationException;
Integer broadcast(Transaction transaction) throws BurstException.ValidationException;

void processPeerTransactions(JSONObject request, Peer peer) throws BurstException.ValidationException;

Expand Down
41 changes: 17 additions & 24 deletions src/brs/TransactionProcessorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
Expand Down Expand Up @@ -84,11 +85,12 @@ public TransactionProcessorImpl(PropertyService propertyService,

JSONArray transactionsData = (JSONArray) response.get(UNCONFIRMED_TRANSACTIONS_RESPONSE);

if (transactionsData == null || transactionsData.isEmpty()) {
if (transactionsData == null) {
return;
}
try {
List<Transaction> addedTransactions = processPeerTransactions(transactionsData, peer);
Peers.feedingTime(peer, foodDispenser, doneFeedingLog);

if(! addedTransactions.isEmpty()) {
List<Peer> activePrioPlusExtra = Peers.getAllActivePriorityPlusSomeExtraPeers();
Expand Down Expand Up @@ -180,40 +182,25 @@ public Transaction.Builder newTransactionBuilder(byte[] senderPublicKey, long am
}

@Override
public void broadcast(Transaction transaction) throws BurstException.ValidationException {
public Integer broadcast(Transaction transaction) throws BurstException.ValidationException {
if (! transaction.verifySignature()) {
throw new BurstException.NotValidException("Transaction signature verification failed");
}
List<Transaction> processedTransactions;
if (dbs.getTransactionDb().hasTransaction(transaction.getId())) {
logger.info("Transaction " + transaction.getStringId() + " already in blockchain, will not broadcast again");
return;
return null;
}

if (unconfirmedTransactionStore.exists(transaction.getId())) {
/*
if (enableTransactionRebroadcasting) {
nonBroadcastedTransactions.add(transaction);
logger.info("Transaction " + transaction.getStringId() + " already in unconfirmed pool, will re-broadcast");
} else {*/
logger.info("Transaction " + transaction.getStringId() + " already in unconfirmed pool, will not broadcast again");
///}
return;
logger.info("Transaction " + transaction.getStringId() + " already in unconfirmed pool, will not broadcast again");
return null;
}

processedTransactions = processTransactions(Collections.singleton(transaction), null);

if(! processedTransactions.isEmpty()) {
broadcastToPeers();
}

if (processedTransactions.contains(transaction)) {
/*
if (enableTransactionRebroadcasting) {
nonBroadcastedTransactions.add(transaction);
}
*/
logger.debug("Accepted new transaction " + transaction.getStringId());
return broadcastToPeers(true);
} else {
logger.debug("Could not accept new transaction " + transaction.getStringId());
throw new BurstException.NotValidException("Invalid transaction " + transaction.getStringId());
Expand All @@ -226,7 +213,7 @@ public void processPeerTransactions(JSONObject request, Peer peer) throws BurstE
List<Transaction> processedTransactions = processPeerTransactions(transactionsData, peer);

if(! processedTransactions.isEmpty()) {
broadcastToPeers();
broadcastToPeers(false);
}
}

Expand Down Expand Up @@ -373,10 +360,16 @@ private List<Transaction> processTransactions(Collection<Transaction> transactio
}
}

private void broadcastToPeers() {
for(Peer p: Peers.getAllActivePriorityPlusSomeExtraPeers()) {
private int broadcastToPeers(boolean toAll) {
List<? extends Peer> peersToSendTo = toAll ? Peers.getActivePeers().stream().limit(100).collect(Collectors.toList()) : Peers.getAllActivePriorityPlusSomeExtraPeers();

logger.info("Queueing up {} Peers for feeding", peersToSendTo.size());

for(Peer p: peersToSendTo) {
Peers.feedingTime(p, foodDispenser, doneFeedingLog);
}

return peersToSendTo.size();
}

public void revalidateUnconfirmedTransactions() {
Expand Down
4 changes: 2 additions & 2 deletions src/brs/http/APITransactionManagerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static brs.http.common.ResultFields.BROADCASTED_RESPONSE;
import static brs.http.common.ResultFields.ERROR_RESPONSE;
import static brs.http.common.ResultFields.FULL_HASH_RESPONSE;
import static brs.http.common.ResultFields.NUMBER_PEERS_SENT_TO_RESPONSE;
import static brs.http.common.ResultFields.SIGNATURE_HASH_RESPONSE;
import static brs.http.common.ResultFields.TRANSACTION_BYTES_RESPONSE;
import static brs.http.common.ResultFields.TRANSACTION_JSON_RESPONSE;
Expand All @@ -38,7 +39,6 @@
import brs.Blockchain;
import brs.Burst;
import brs.BurstException;
import brs.Constants;
import brs.Transaction;
import brs.Transaction.Builder;
import brs.TransactionProcessor;
Expand Down Expand Up @@ -187,7 +187,7 @@ public JSONStreamAware createTransaction(HttpServletRequest req, Account senderA
response.put(TRANSACTION_BYTES_RESPONSE, Convert.toHexString(transaction.getBytes()));
response.put(SIGNATURE_HASH_RESPONSE, Convert.toHexString(Crypto.sha256().digest(transaction.getSignature())));
if (broadcast) {
transactionProcessor.broadcast(transaction);
response.put(NUMBER_PEERS_SENT_TO_RESPONSE, transactionProcessor.broadcast(transaction));
response.put(BROADCASTED_RESPONSE, true);
} else {
response.put(BROADCASTED_RESPONSE, false);
Expand Down
3 changes: 2 additions & 1 deletion src/brs/http/BroadcastTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import static brs.http.common.ResultFields.ERROR_DESCRIPTION_RESPONSE;
import static brs.http.common.ResultFields.ERROR_RESPONSE;
import static brs.http.common.ResultFields.FULL_HASH_RESPONSE;
import static brs.http.common.ResultFields.NUMBER_PEERS_SENT_TO_RESPONSE;
import static brs.http.common.ResultFields.TRANSACTION_RESPONSE;

import brs.BurstException;
Expand Down Expand Up @@ -45,7 +46,7 @@ JSONStreamAware processRequest(HttpServletRequest req) throws BurstException {
JSONObject response = new JSONObject();
try {
transactionService.validate(transaction);
transactionProcessor.broadcast(transaction);
response.put(NUMBER_PEERS_SENT_TO_RESPONSE, transactionProcessor.broadcast(transaction));
response.put(TRANSACTION_RESPONSE, transaction.getStringId());
response.put(FULL_HASH_RESPONSE, transaction.getFullHash());
} catch (BurstException.ValidationException | RuntimeException e) {
Expand Down
1 change: 1 addition & 0 deletions src/brs/http/common/ResultFields.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class ResultFields {
public static final String DONE_RESPONSE = "done";
public static final String SCAN_TIME_RESPONSE = "scanTime";
public static final String BROADCASTED_RESPONSE = "broadcasted";
public static final String NUMBER_PEERS_SENT_TO_RESPONSE = "numberPeersSentTo";
public static final String UNSIGNED_TRANSACTION_BYTES_RESPONSE = "unsignedTransactionBytes";
public static final String TRANSACTION_JSON_RESPONSE = "transactionJSON";
public static final String TRANSACTION_BYTES_RESPONSE = "transactionBytes";
Expand Down
33 changes: 19 additions & 14 deletions src/brs/peer/Peers.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static brs.props.Props.P2P_ENABLE_TX_REBROADCAST;
import static brs.props.Props.P2P_SEND_TO_LIMIT;
import static brs.util.JSON.prepareRequest;

import brs.*;
import brs.props.Props;
Expand Down Expand Up @@ -164,7 +165,7 @@ public static void init(TimeService timeService, AccountService accountService,
logger.debug("My peer info:\n" + json.toJSONString());
myPeerInfoResponse = JSON.prepare(json);
json.put("requestType", "getInfo");
myPeerInfoRequest = JSON.prepareRequest(json);
myPeerInfoRequest = prepareRequest(json);

if(propertyService.getBoolean(P2P_ENABLE_TX_REBROADCAST)) {
rebroadcastPeers = Collections
Expand Down Expand Up @@ -509,7 +510,7 @@ private void updateSavedPeers() {
{
JSONObject request = new JSONObject();
request.put("requestType", "getPeers");
getPeersRequest = JSON.prepareRequest(request);
getPeersRequest = prepareRequest(request);
}

private volatile boolean addedNewPeer;
Expand Down Expand Up @@ -575,7 +576,7 @@ public void run() {
JSONObject request = new JSONObject();
request.put("requestType", "addPeers");
request.put("peers", myPeers);
peer.send(JSON.prepareRequest(request));
peer.send(prepareRequest(request));
}

} catch (Exception e) {
Expand Down Expand Up @@ -639,7 +640,7 @@ public static Collection<? extends Peer> getAllPeers() {
return allPeers;
}

public static Collection<? extends Peer> getActivePeers() {
public static List<? extends Peer> getActivePeers() {
List<PeerImpl> activePeers = new ArrayList<>();
for (PeerImpl peer : peers.values()) {
if (peer.getState() != Peer.State.NON_CONNECTED) {
Expand Down Expand Up @@ -752,7 +753,7 @@ public static void sendToSomePeers(Block block) {
request.put("requestType", "processBlock");

blocksSendingService.submit(() -> {
final JSONStreamAware jsonRequest = JSON.prepareRequest(request);
final JSONStreamAware jsonRequest = prepareRequest(request);

int successful = 0;
List<Future<JSONObject>> expectedResponses = new ArrayList<>();
Expand Down Expand Up @@ -789,7 +790,7 @@ public static void sendToSomePeers(Block block) {
static {
JSONObject request = new JSONObject();
request.put("requestType", "getUnconfirmedTransactions");
getUnconfirmedTransactionsRequest = JSON.prepareRequest(request);
getUnconfirmedTransactionsRequest = prepareRequest(request);
}

private static final ExecutorService utReceivingService = Executors.newCachedThreadPool();
Expand All @@ -814,26 +815,30 @@ public synchronized static void feedingTime(Peer peer, Function<Peer, List<Trans

private static void feedPeer(Peer peer, Function<Peer, List<Transaction>> foodDispenser, BiConsumer<Peer, List<Transaction>> doneFeedingLog) {
List<Transaction> transactionsToSend = foodDispenser.apply(peer);

if(! transactionsToSend.isEmpty()) {
logger.debug("Feeding {} {} transactions", peer.getPeerAddress(), transactionsToSend.size());
peer.send(sendUnconfirmedTransactionsRequest(transactionsToSend));
logger.info("Feeding {} {} transactions", peer.getPeerAddress(), transactionsToSend.size());
JSONObject response = peer.send(sendUnconfirmedTransactionsRequest(transactionsToSend));

if(response != null && response.get("error") == null) {
doneFeedingLog.accept(peer, transactionsToSend);
} else {
logger.error("Error feeding {} transactions: {} error: {}", peer.getPeerAddress(), transactionsToSend.stream().map(t -> t.getId()), response);
}
} else {
logger.debug("No need to feed {}", peer.getPeerAddress());
logger.info("No need to feed {}", peer.getPeerAddress());
}

beingProcessed.remove(peer);

doneFeedingLog.accept(peer, transactionsToSend);

if(processingQueue.contains(peer)) {
processingQueue.remove(peer);
beingProcessed.add(peer);
feedPeer(peer, foodDispenser, doneFeedingLog);
}
}


private static JSONObject sendUnconfirmedTransactionsRequest(List<Transaction> transactions) {
private static JSONStreamAware sendUnconfirmedTransactionsRequest(List<Transaction> transactions) {
JSONObject request = new JSONObject();
JSONArray transactionsData = new JSONArray();

Expand All @@ -844,7 +849,7 @@ private static JSONObject sendUnconfirmedTransactionsRequest(List<Transaction> t
request.put("requestType", "processTransactions");
request.put("transactions", transactionsData);

return request;
return prepareRequest(request);
}

private static boolean peerEligibleForSending(Peer peer, boolean sendSameBRSclass) {
Expand Down
2 changes: 1 addition & 1 deletion src/brs/peer/ProcessBlock.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public JSONStreamAware processRequest(JSONObject request, Peer peer) {
// when loading blockchain from scratch
return NOT_ACCEPTED;
}
blockchainProcessor.processPeerBlock(request);
blockchainProcessor.processPeerBlock(request, peer);
return ACCEPTED;

} catch (BurstException|RuntimeException e) {
Expand Down
4 changes: 2 additions & 2 deletions src/brs/unconfirmedtransactions/ReservedBalanceCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ void reserveBalanceAndPut(Transaction transaction) throws BurstException.Validat
);

if (senderAccount == null) {
LOGGER.debug(String.format("Transaction %d: Account %d does not exist and has no balance. Required funds: %d", transaction.getId(), transaction.getSenderId(), amountNQT));
LOGGER.info(String.format("Transaction %d: Account %d does not exist and has no balance. Required funds: %d", transaction.getId(), transaction.getSenderId(), amountNQT));

throw new BurstException.NotCurrentlyValidException("Account unknown");
} else if ( amountNQT > senderAccount.getUnconfirmedBalanceNQT() ) {
LOGGER.debug(String.format("Transaction %d: Account %d balance too low. You have %d > %d Balance",
LOGGER.info(String.format("Transaction %d: Account %d balance too low. You have %d > %d Balance",
transaction.getId(), transaction.getSenderId(), amountNQT, senderAccount.getUnconfirmedBalanceNQT()
));

Expand Down
Loading

0 comments on commit e080d86

Please sign in to comment.