Skip to content

Commit

Permalink
fix(core-p2p): rate limit and peer broadcast (#4054)
Browse files Browse the repository at this point in the history
  • Loading branch information
air1one authored Sep 23, 2020
1 parent ec8cbb9 commit 6988cdd
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 26 deletions.
5 changes: 4 additions & 1 deletion __tests__/integration/core-p2p/peer-communicator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { eventEmitter } from "./mocks/core-container";

import { P2P } from "@arkecosystem/core-interfaces";
import { Blocks, Transactions } from "@arkecosystem/crypto";
import delay from "delay";
import { getPeerConfig } from "../../../packages/core-p2p/src/socket-server/utils/get-peer-config";
import { createPeerService, createStubPeer } from "../../helpers/peers";
import { TransactionFactory } from "../../helpers/transaction-factory";
Expand Down Expand Up @@ -41,11 +42,13 @@ afterAll(async () => {
socketManager.stopServer();
});

beforeEach(() => {
beforeEach(async () => {
({ communicator, storage } = createPeerService());

stubPeer = createStubPeer({ ip: "127.0.0.1", port: 4009 });
storage.setPeer(stubPeer);

await delay(2000);
});

afterEach(() => socketManager.resetAllMocks());
Expand Down
4 changes: 0 additions & 4 deletions packages/core-interfaces/src/core-p2p/peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ export interface IPeer {

export interface IPeerBroadcast {
ip: string;
ports: IPeerPorts;
version: string;
height: number;
latency: number;
}

export interface IPeerState {
Expand Down
21 changes: 9 additions & 12 deletions packages/core-p2p/src/peer-connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,15 @@ export class PeerConnector implements P2P.IPeerConnector {
socket.on("ping", () => this.terminate(peer));
socket.on("pong", () => this.terminate(peer));
socket.on("message", data => {
if (data === "#1") {
// this is to establish some rate limit on #1 messages
// a simple rate limit of 1 per second doesnt seem to be enough, so decided to give some margin
// and allow up to 10 per second which should be more than enough
const timeNow: number = new Date().getTime();
socket._last10Pings = socket._last10Pings || [];
socket._last10Pings.push(timeNow);
if (socket._last10Pings.length >= 10) {
socket._last10Pings = socket._last10Pings.slice(socket._last10Pings.length - 10);
if (timeNow - socket._last10Pings[0] < 1000) {
this.terminate(peer);
}
// this is to establish some rate limit on socket messages
// 30 messages per second is enough for socketcluster's + our own messages
const timeNow: number = new Date().getTime();
socket._last30Messages = socket._last30Messages || [];
socket._last30Messages.push(timeNow);
if (socket._last30Messages.length >= 30) {
socket._last30Messages = socket._last30Messages.slice(socket._last30Messages.length - 30);
if (timeNow - socket._last30Messages[0] < 1000) {
this.terminate(peer);
}
}
});
Expand Down
8 changes: 1 addition & 7 deletions packages/core-p2p/src/peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,6 @@ export class Peer implements P2P.IPeer {
}

public toBroadcast(): P2P.IPeerBroadcast {
return {
ip: this.ip,
ports: this.ports,
version: this.version,
height: this.state.height,
latency: this.latency,
};
return { ip: this.ip };
}
}
4 changes: 2 additions & 2 deletions packages/core-p2p/src/socket-server/versions/peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ export const getPeers = ({ service }: { service: P2P.IPeerService }): P2P.IPeerB
return service
.getStorage()
.getPeers()
.map(peer => peer.toBroadcast())
.sort((a, b) => a.latency - b.latency);
.sort((a, b) => a.latency - b.latency)
.map(peer => peer.toBroadcast());
};

export const getCommonBlocks = async ({
Expand Down

0 comments on commit 6988cdd

Please sign in to comment.