diff --git a/.changeset/calm-tables-scream.md b/.changeset/calm-tables-scream.md new file mode 100644 index 0000000000..e3e170fa71 --- /dev/null +++ b/.changeset/calm-tables-scream.md @@ -0,0 +1,8 @@ +--- +"@farcaster/hub-nodejs": minor +"@farcaster/hub-web": minor +"@farcaster/core": minor +"@farcaster/hubble": minor +--- + +Remove PubSub PeerDiscovery in favor of ContactInfo based PeerDiscovery diff --git a/apps/hubble/grafana/grafana-dashboard.json b/apps/hubble/grafana/grafana-dashboard.json index f855167c7b..d125dd94f5 100644 --- a/apps/hubble/grafana/grafana-dashboard.json +++ b/apps/hubble/grafana/grafana-dashboard.json @@ -20,7 +20,7 @@ "editable": true, "fiscalYearStartMonth": 0, "graphTooltip": 0, - "id": null, + "id": 1, "links": [], "liveNow": false, "panels": [ @@ -75,7 +75,9 @@ "justifyMode": "auto", "orientation": "auto", "reduceOptions": { - "calcs": ["lastNotNull"], + "calcs": [ + "lastNotNull" + ], "fields": "", "values": false }, @@ -129,7 +131,9 @@ "justifyMode": "auto", "orientation": "auto", "reduceOptions": { - "calcs": ["lastNotNull"], + "calcs": [ + "lastNotNull" + ], "fields": "", "values": false }, @@ -373,7 +377,9 @@ "justifyMode": "auto", "orientation": "auto", "reduceOptions": { - "calcs": ["lastNotNull"], + "calcs": [ + "lastNotNull" + ], "fields": "", "values": false }, @@ -429,7 +435,9 @@ "justifyMode": "auto", "orientation": "auto", "reduceOptions": { - "calcs": ["lastNotNull"], + "calcs": [ + "lastNotNull" + ], "fields": "", "values": false }, @@ -502,7 +510,6 @@ "mode": "off" } }, - "displayName": "Messages", "mappings": [], "noValue": "Starting up... Please wait", "thresholds": { @@ -536,14 +543,18 @@ "showLegend": true }, "tooltip": { - "mode": "single", + "mode": "multi", "sort": "none" } }, + "pluginVersion": "10.0.3", "targets": [ { - "refId": "A", - "target": "stats_counts.hubble.gossip.messages", + "datasource": "Graphite", + "hide": false, + "refCount": 0, + "refId": "Protocol Messages", + "target": "stats_counts.hubble.gossip.*.*", "textEditor": true } ], @@ -599,7 +610,9 @@ "options": { "orientation": "auto", "reduceOptions": { - "calcs": ["lastNotNull"], + "calcs": [ + "lastNotNull" + ], "fields": "", "values": false }, @@ -695,6 +708,18 @@ "value": "Outbound Peers" } ] + }, + { + "matcher": { + "id": "byName", + "options": "stats.gauges.hubble.peer_store.count" + }, + "properties": [ + { + "id": "displayName", + "value": "Discovered Peers" + } + ] } ] }, @@ -713,21 +738,30 @@ "showLegend": true }, "tooltip": { - "mode": "single", + "mode": "multi", "sort": "none" } }, "targets": [ { + "datasource": "Graphite", "refCount": 0, "refId": "A", "target": "stats.gauges.hubble.gossip.peers.inbound" }, { + "datasource": "Graphite", "hide": false, "refCount": 0, "refId": "B", "target": "stats.gauges.hubble.gossip.peers.outbound" + }, + { + "datasource": "Graphite", + "hide": false, + "refCount": 0, + "refId": "C", + "target": "stats.gauges.hubble.peer_store.count" } ], "title": "Gossip Peers", @@ -870,7 +904,9 @@ "options": { "orientation": "auto", "reduceOptions": { - "calcs": ["sum"], + "calcs": [ + "sum" + ], "fields": "", "values": false }, @@ -923,7 +959,9 @@ "options": { "orientation": "auto", "reduceOptions": { - "calcs": ["lastNotNull"], + "calcs": [ + "lastNotNull" + ], "fields": "", "values": false }, @@ -1173,7 +1211,8 @@ "mode": "absolute", "steps": [ { - "color": "green" + "color": "green", + "value": null }, { "color": "red", @@ -1258,7 +1297,8 @@ "mode": "absolute", "steps": [ { - "color": "green" + "color": "green", + "value": null }, { "color": "red", @@ -1357,7 +1397,8 @@ "mode": "absolute", "steps": [ { - "color": "green" + "color": "green", + "value": null }, { "color": "red", @@ -1465,7 +1506,8 @@ "mode": "absolute", "steps": [ { - "color": "green" + "color": "green", + "value": null }, { "color": "red", @@ -1586,7 +1628,8 @@ "mode": "absolute", "steps": [ { - "color": "green" + "color": "green", + "value": null }, { "color": "red", @@ -1669,7 +1712,8 @@ "mode": "absolute", "steps": [ { - "color": "green" + "color": "green", + "value": null }, { "color": "red", @@ -1766,7 +1810,8 @@ "mode": "absolute", "steps": [ { - "color": "green" + "color": "green", + "value": null }, { "color": "red", @@ -1863,7 +1908,9 @@ "footer": { "countRows": false, "fields": "", - "reducer": ["sum"], + "reducer": [ + "sum" + ], "show": false }, "showHeader": true, @@ -1888,7 +1935,9 @@ { "id": "reduce", "options": { - "reducers": ["lastNotNull"] + "reducers": [ + "lastNotNull" + ] } } ], @@ -1910,6 +1959,6 @@ "timezone": "", "title": "Hubble Dashboard", "uid": "af04c037-bd8f-484a-b93e-0cb4b7d3b026", - "version": 4, + "version": 2, "weekStart": "" -} +} \ No newline at end of file diff --git a/apps/hubble/package.json b/apps/hubble/package.json index 064a7b74c5..db74e67452 100644 --- a/apps/hubble/package.json +++ b/apps/hubble/package.json @@ -77,7 +77,6 @@ "@libp2p/interface-peer-id": "^2.0.1", "@libp2p/mplex": "^7.0.0", "@libp2p/peer-id-factory": "^2.0.0", - "@libp2p/pubsub-peer-discovery": "^8.0.0", "@libp2p/tcp": "^6.0.0", "@libp2p/utils": "^3.0.2", "@multiformats/multiaddr": "^11.0.0", diff --git a/apps/hubble/src/hubble.ts b/apps/hubble/src/hubble.ts index bf1ba7800d..6404eaae65 100644 --- a/apps/hubble/src/hubble.ts +++ b/apps/hubble/src/hubble.ts @@ -22,7 +22,7 @@ import { peerIdFromBytes, peerIdFromString } from "@libp2p/peer-id"; import { publicAddressesFirst } from "@libp2p/utils/address-sort"; import { Multiaddr, multiaddr } from "@multiformats/multiaddr"; import { Result, ResultAsync, err, ok } from "neverthrow"; -import { GossipNode, MAX_MESSAGE_QUEUE_SIZE } from "./network/p2p/gossipNode.js"; +import { GossipNode, MAX_MESSAGE_QUEUE_SIZE, GOSSIP_SEEN_TTL } from "./network/p2p/gossipNode.js"; import { PeriodicSyncJobScheduler } from "./network/sync/periodicSyncJob.js"; import SyncEngine from "./network/sync/syncEngine.js"; import AdminServer from "./rpc/adminServer.js"; @@ -91,6 +91,8 @@ export const FARCASTER_VERSIONS_SCHEDULE: VersionSchedule[] = [ { version: "2023.10.4", expiresAt: 1701216000000 }, // expires at 11/28/23 00:00 UTC ]; +const MAX_CONTACT_INFO_AGE_MS = GOSSIP_SEEN_TTL; + export interface HubInterface { engine: Engine; identity: string; @@ -520,7 +522,11 @@ export class Hub implements HubInterface { if (dbResult.isErr()) { retryCount++; logger.error( - { retryCount, error: dbResult.error, errorMessage: dbResult.error.message }, + { + retryCount, + error: dbResult.error, + errorMessage: dbResult.error.message, + }, "failed to open rocksdb. Retry in 15s", ); @@ -598,7 +604,9 @@ export class Hub implements HubInterface { if (this.options.network === FarcasterNetwork.MAINNET) { const networkConfig = await fetchNetworkConfig(); if (networkConfig.isErr()) { - log.error("failed to fetch network config", { error: networkConfig.error }); + log.error("failed to fetch network config", { + error: networkConfig.error, + }); } else { const shouldExit = this.applyNetworkConfig(networkConfig.value); if (shouldExit) { @@ -648,7 +656,7 @@ export class Hub implements HubInterface { this.pruneEventsJobScheduler.start(this.options.pruneEventsJobCron); this.checkFarcasterVersionJobScheduler.start(); this.validateOrRevokeMessagesJobScheduler.start(); - this.gossipContactInfoJobScheduler.start(); + this.gossipContactInfoJobScheduler.start("*/1 * * * *"); // Every minute this.checkIncomingPortsJobScheduler.start(); // Mainnet only jobs @@ -720,7 +728,9 @@ export class Hub implements HubInterface { log.info({ latestSnapshotKey }, "found latest S3 snapshot"); const snapshotUrl = `https://download.farcaster.xyz/${latestSnapshotKey}`; - const response2 = await axios.get(snapshotUrl, { responseType: "stream" }); + const response2 = await axios.get(snapshotUrl, { + responseType: "stream", + }); const totalSize = parseInt(response2.headers["content-length"], 10); let downloadedSize = 0; @@ -787,7 +797,11 @@ export class Hub implements HubInterface { const gossipPort = nodeMultiAddr?.nodeAddress().port; const rpcPort = this.rpcServer.address?.map((addr) => addr.port).unwrapOr(0); - const gossipAddressContactInfo = GossipAddressInfo.create({ address: announceIp, family, port: gossipPort }); + const gossipAddressContactInfo = GossipAddressInfo.create({ + address: announceIp, + family, + port: gossipPort, + }); const rpcAddressContactInfo = GossipAddressInfo.create({ address: announceIp, family, @@ -805,6 +819,7 @@ export class Hub implements HubInterface { hubVersion: FARCASTER_VERSION, network: this.options.network, appVersion: APP_VERSION, + timestamp: Date.now(), }); }); } @@ -879,7 +894,10 @@ export class Hub implements HubInterface { } else { const contactInfo = contactInfoResult.value; log.info( - { rpcAddress: contactInfo.rpcAddress?.address, rpcPort: contactInfo.rpcAddress?.port }, + { + rpcAddress: contactInfo.rpcAddress?.address, + rpcPort: contactInfo.rpcAddress?.port, + }, "gossiping contact info", ); @@ -908,7 +926,10 @@ export class Hub implements HubInterface { // If there are too many messages in the queue, drop this message. This is a gossip message, so the sync // will eventually re-fetch and merge this message in anyway. log.warn( - { syncTrieQ: this.syncEngine.syncTrieQSize, syncMergeQ: this.syncEngine.syncMergeQSize }, + { + syncTrieQ: this.syncEngine.syncTrieQSize, + syncMergeQ: this.syncEngine.syncMergeQSize, + }, "Sync queue is full, dropping gossip message", ); return err(new HubError("unavailable", "Sync queue is full")); @@ -933,22 +954,29 @@ export class Hub implements HubInterface { } return result.map(() => undefined); } else if (gossipMessage.contactInfoContent) { - await this.handleContactInfo(peerIdResult.value, gossipMessage.contactInfoContent); - this.gossipNode.reportValid(msgId, peerIdFromString(source.toString()).toBytes(), true); + const result = await this.handleContactInfo(peerIdResult.value, gossipMessage.contactInfoContent); + this.gossipNode.reportValid(msgId, peerIdFromString(source.toString()).toBytes(), result); return ok(undefined); } else { return err(new HubError("bad_request.invalid_param", "invalid message type")); } } - private async handleContactInfo(peerId: PeerId, message: ContactInfoContent): Promise { + private async handleContactInfo(peerId: PeerId, message: ContactInfoContent): Promise { + statsd().gauge("peer_store.count", await this.gossipNode.peerStoreCount()); + + // Don't process messages that are too old + if (message.timestamp && message.timestamp < Date.now() - MAX_CONTACT_INFO_AGE_MS) { + log.debug({ message }, "contact info message is too old"); + return false; + } // Updates the address book for this peer const gossipAddress = message.gossipAddress; if (gossipAddress) { const addressInfo = addressInfoFromGossip(gossipAddress); if (addressInfo.isErr()) { log.error({ error: addressInfo.error, gossipAddress }, "unable to parse gossip address for peer"); - return; + return false; } const p2pMultiAddrResult = p2pMultiAddrStr(addressInfo.value, peerId.toString()).map((addr: string) => @@ -960,24 +988,32 @@ export class Hub implements HubInterface { if (p2pMultiAddrResult.isErr()) { log.error( - { error: p2pMultiAddrResult.error, message, address: addressInfo.value }, + { + error: p2pMultiAddrResult.error, + message, + address: addressInfo.value, + }, "failed to create multiaddr", ); - return; + return false; } if (p2pMultiAddrResult.value.isErr()) { log.error( - { error: p2pMultiAddrResult.value.error, message, address: addressInfo.value }, + { + error: p2pMultiAddrResult.value.error, + message, + address: addressInfo.value, + }, "failed to parse multiaddr", ); - return; + return false; } if (!(await this.isValidPeer(peerId, message))) { await this.gossipNode.removePeerFromAddressBook(peerId); this.syncEngine.removeContactInfoForPeerId(peerId.toString()); - return; + return false; } const multiaddrValue = p2pMultiAddrResult.value.value; @@ -990,7 +1026,7 @@ export class Hub implements HubInterface { const peerInfo = this.syncEngine.getContactInfoForPeerId(peerId.toString()); if (peerInfo) { log.debug({ peerInfo }, "Already have this peer, skipping sync"); - return; + return true; } else { // If it is a new client, we do a sync against it log.info({ peerInfo, connectedPeers: this.syncEngine.getPeerCount() }, "New Peer ContactInfo"); @@ -1003,6 +1039,12 @@ export class Hub implements HubInterface { log.error({ error: syncResult.error, peerId }, "Failed to sync with new peer"); } } + + // if the contact info doesn't include a timestamp, consider it invalid but allow the peer to stay in the address book + // TODO remove this once all peers are updated past 1.6.4 + if (message.timestamp === 0) return false; + + return true; } /** Since we don't know if the peer is using SSL or not, we'll attempt to get the SSL version, @@ -1118,11 +1160,13 @@ export class Hub implements HubInterface { setTimeout(async () => { await this.gossipContactInfo(); }, 1 * 1000); + statsd().increment("peer_connect.count"); }); this.gossipNode.on("peerDisconnect", async (connection) => { // Remove this peer's connection this.syncEngine.removeContactInfoForPeerId(connection.remotePeer.toString()); + statsd().increment("peer_disconnect.count"); }); } @@ -1132,7 +1176,10 @@ export class Hub implements HubInterface { async submitMessage(submittedMessage: Message, source?: HubSubmitSource): HubAsyncResult { // message is a reserved key in some logging systems, so we use submittedMessage instead - const logMessage = log.child({ submittedMessage: messageToLog(submittedMessage), source }); + const logMessage = log.child({ + submittedMessage: messageToLog(submittedMessage), + source, + }); if (this.syncEngine.syncTrieQSize > MAX_MESSAGE_QUEUE_SIZE) { log.warn({ syncTrieQSize: this.syncEngine.syncTrieQSize }, "SubmitMessage rejected: Sync trie queue is full"); @@ -1178,7 +1225,10 @@ export class Hub implements HubInterface { } async submitUserNameProof(usernameProof: UserNameProof, source?: HubSubmitSource): HubAsyncResult { - const logEvent = log.child({ event: usernameProofToLog(usernameProof), source }); + const logEvent = log.child({ + event: usernameProofToLog(usernameProof), + source, + }); const mergeResult = await this.engine.mergeUserNameProof(usernameProof); @@ -1308,7 +1358,12 @@ export class Hub implements HubInterface { const versionCheckResult = ensureAboveMinFarcasterVersion(theirVersion); if (versionCheckResult.isErr()) { log.warn( - { peerId: otherPeerId, theirVersion, ourVersion: FARCASTER_VERSION, errMsg: versionCheckResult.error.message }, + { + peerId: otherPeerId, + theirVersion, + ourVersion: FARCASTER_VERSION, + errMsg: versionCheckResult.error.message, + }, "Peer is running an outdated version, ignoring", ); return false; @@ -1359,7 +1414,11 @@ export class Hub implements HubInterface { const latestJsonParams = { Bucket: this.s3_snapshot_bucket, Key: `${this.getSnapshotFolder()}/latest.json`, - Body: JSON.stringify({ key, timestamp: Date.now(), serverDate: new Date().toISOString() }), + Body: JSON.stringify({ + key, + timestamp: Date.now(), + serverDate: new Date().toISOString(), + }), }; try { @@ -1373,7 +1432,11 @@ export class Hub implements HubInterface { } async listS3Snapshots(): HubAsyncResult< - Array<{ Key: string | undefined; Size: number | undefined; LastModified: Date | undefined }> + Array<{ + Key: string | undefined; + Size: number | undefined; + LastModified: Date | undefined; + }> > { const network = FarcasterNetwork[this.options.network].toString(); diff --git a/apps/hubble/src/network/p2p/gossipNode.test.ts b/apps/hubble/src/network/p2p/gossipNode.test.ts index 77a3ce3f7c..85d85a10fa 100644 --- a/apps/hubble/src/network/p2p/gossipNode.test.ts +++ b/apps/hubble/src/network/p2p/gossipNode.test.ts @@ -205,16 +205,16 @@ describe("GossipNode", () => { expect(res._unsafeUnwrapErr().errCode).toEqual("bad_request.duplicate"); }); - test("Gossip Ids do not match for gossip internal messages", async () => { + test("Gossip Ids do match for gossip internal messages", async () => { await node.start([]); const contactInfo = ContactInfoContent.create(); await node.gossipContactInfo(contactInfo); const res2 = await node.gossipContactInfo(contactInfo); - expect(res2.isOk()).toBeTruthy(); + expect(res2.isErr()).toBeTruthy(); }); - test("Gossip Ids do not match for gossip V1 messages", async () => { + test("Gossip Ids do match for gossip V1 messages", async () => { const node = new LibP2PNode(FarcasterNetwork.DEVNET); await node.makeNode({}); @@ -226,10 +226,10 @@ describe("GossipNode", () => { }); await node.publish(v1Message); - // won't be detected as a duplicate + // should be detected as a duplicate const result = await node.publish(v1Message); result.forEach((res) => { - expect(res.isOk()).toBeTruthy(); + expect(res.isErr()).toBeTruthy(); }); }); diff --git a/apps/hubble/src/network/p2p/gossipNode.ts b/apps/hubble/src/network/p2p/gossipNode.ts index c146552032..376f834f7b 100644 --- a/apps/hubble/src/network/p2p/gossipNode.ts +++ b/apps/hubble/src/network/p2p/gossipNode.ts @@ -28,6 +28,8 @@ import EventEmitter from "events"; /** The maximum number of pending merge messages before we drop new incoming gossip or sync messages. */ export const MAX_MESSAGE_QUEUE_SIZE = 100_000; +/** The TTL for messages in the seen cache */ +export const GOSSIP_SEEN_TTL = 1000 * 60 * 5; const log = logger.child({ component: "GossipNode" }); const workerLog = logger.child({ component: "GossipNodeWorker" }); @@ -64,11 +66,16 @@ export interface NodeOptions { // A common return type for several methods on the libp2p node. // Includes a success flag, an error message and an optional error type -type SuccessOrError = { success: boolean; errorMessage: string | undefined; errorType: string | undefined }; +type SuccessOrError = { + success: boolean; + errorMessage: string | undefined; + errorType: string | undefined; +}; // An interface that defines the methods that can be called on the libp2p node export interface LibP2PNodeInterface { addToAddressBook: (peerId: Uint8Array, multiaddr: Uint8Array) => Promise; + peerStoreCount: () => Promise; removeFromAddressBook: (peerId: Uint8Array) => Promise; getFromAddressBook: (peerId: Uint8Array) => Promise; allPeerIds: () => Promise; @@ -150,7 +157,9 @@ export class GossipNode extends TypedEmitter { // We use the "../../../" so that it works when running tests from the root directory // and also in prod const workerPath = new URL("../../../build/network/p2p/gossipNodeWorker.js", import.meta.url); - this._nodeWorker = new Worker(workerPath, { workerData: { network: this._network } }); + this._nodeWorker = new Worker(workerPath, { + workerData: { network: this._network }, + }); this._nodeWorker.addListener("message", (event) => { // Check if this is a libp2p node event. These are events generated by the libp2p node and are @@ -211,6 +220,10 @@ export class GossipNode extends TypedEmitter { await this.callMethod("addToAddressBook", exportToProtobuf(peerId), multiaddr.bytes); } + async peerStoreCount(): Promise { + return this.callMethod("peerStoreCount"); + } + /** Removes the peer from the address book and hangs up on them */ async removePeerFromAddressBook(peerId: PeerId) { await this.callMethod("removeFromAddressBook", exportToProtobuf(peerId)); @@ -246,7 +259,13 @@ export class GossipNode extends TypedEmitter { this._multiaddrs = multiaddrs.map((m) => multiaddr(m)); this._isStarted = true; - log.info({ identity: this.identity, addresses: this.multiaddrs().map((m) => m.toString()) }, "Starting libp2p"); + log.info( + { + identity: this.identity, + addresses: this.multiaddrs().map((m) => m.toString()), + }, + "Starting libp2p", + ); // Wait for a few seconds for everything to initialize before connecting to bootstrap nodes setTimeout(() => this.bootstrap(bootstrapAddrs), 1 * 1000); @@ -358,7 +377,11 @@ export class GossipNode extends TypedEmitter { this._nodeEvents?.addListener("peer:connect", (detail) => { // console.log("Peer Connected", JSON.stringify(detail, null, 2)); log.info( - { peer: detail.remotePeer, addrs: detail.remoteAddr, type: detail.stat.direction }, + { + peer: detail.remotePeer, + addrs: detail.remoteAddr, + type: detail.stat.direction, + }, "P2P Connection established", ); this.emit("peerConnect", detail); @@ -369,6 +392,9 @@ export class GossipNode extends TypedEmitter { this.emit("peerDisconnect", detail); this.updateStatsdPeerGauges(); }); + this._nodeEvents?.addListener("peer:discovery", (detail) => { + log.info({ peer: detail }, "Discovered peer"); + }); this._nodeEvents?.addListener("gossipsub:message", (detail) => { log.debug({ identity: this.identity, @@ -376,6 +402,7 @@ export class GossipNode extends TypedEmitter { from: detail.propagationSource, topic: detail.msg.topic, }); + statsd().increment(`gossip.${detail.msg.topic}.messages`); // ignore messages not in our topic lists (e.g. GossipSub peer discovery messages) if (this.gossipTopics().includes(detail.msg.topic)) { @@ -396,7 +423,6 @@ export class GossipNode extends TypedEmitter { } catch (e) { logger.error({ e, data: detail.msg.data }, "Failed to decode message"); } - statsd().increment("gossip.messages"); } else { // Report other messages we don't care about (peer discovery mainly) as being valid, so they can be forwarded correctly this.reportValid(detail.msgId, peerIdFromString(detail.propagationSource.toString()).toBytes(), true); diff --git a/apps/hubble/src/network/p2p/gossipNodeWorker.ts b/apps/hubble/src/network/p2p/gossipNodeWorker.ts index 1d6d054cfc..e7c01ebe88 100644 --- a/apps/hubble/src/network/p2p/gossipNodeWorker.ts +++ b/apps/hubble/src/network/p2p/gossipNodeWorker.ts @@ -9,6 +9,7 @@ import { LibP2PNodeMethodGenericMessage, LibP2PNodeMethodReturnType, NodeOptions, + GOSSIP_SEEN_TTL, } from "./gossipNode.js"; import { ContactInfoContent, @@ -28,11 +29,11 @@ import { ConnectionFilter } from "./connectionFilter.js"; import { tcp } from "@libp2p/tcp"; import { mplex } from "@libp2p/mplex"; import { noise } from "@chainsafe/libp2p-noise"; -import { pubsubPeerDiscovery } from "@libp2p/pubsub-peer-discovery"; -import { GOSSIP_PROTOCOL_VERSION, msgIdFnStrictSign } from "./protocol.js"; +import { GOSSIP_PROTOCOL_VERSION, msgIdFnStrictNoSign, msgIdFnStrictSign } from "./protocol.js"; import { PeerId } from "@libp2p/interface-peer-id"; import { createFromProtobuf, exportToProtobuf } from "@libp2p/peer-id-factory"; import { Logger } from "../../utils/logger.js"; +import { statsd } from "../../utils/statsd.js"; const MultiaddrLocalHost = "/ip4/127.0.0.1"; @@ -41,7 +42,9 @@ const log = new Proxy({} as Logger, { get: (_target, prop) => { // biome-ignore lint/suspicious/noExplicitAny: return (...args: any[]) => { - parentPort?.postMessage({ log: { level: prop, logObj: args[0], message: args[1] } }); + parentPort?.postMessage({ + log: { level: prop, logObj: args[0], message: args[1] }, + }); }; }, }); @@ -85,7 +88,6 @@ export class LibP2PNode { const listenIPMultiAddr = options.ipMultiAddr ?? MultiaddrLocalHost; const listenPort = options.gossipPort ?? 0; const listenMultiAddrStr = `${listenIPMultiAddr}/tcp/${listenPort}`; - const peerDiscoveryTopic = `_farcaster.${this._network}.peer_discovery`; const peerId = options.peerId ? await createFromProtobuf(options.peerId) : undefined; @@ -114,7 +116,7 @@ export class LibP2PNode { msgIdFn: this.getMessageId.bind(this), directPeers: options.directPeers || [], canRelayMessage: true, - seenTTL: 1000 * 60 * 5, // Bump up the default to handle large flood of messages. 2 mins was not sufficient to prevent a loop + seenTTL: GOSSIP_SEEN_TTL, // Bump up the default to handle large flood of messages. 2 mins was not sufficient to prevent a loop scoreThresholds: { ...options.scoreThresholds }, }); @@ -130,7 +132,11 @@ export class LibP2PNode { ); } else { log.warn( - { identity: this.identity, deniedPeerIds: options.deniedPeerIdStrs, function: "createNode" }, + { + identity: this.identity, + deniedPeerIds: options.deniedPeerIdStrs, + function: "createNode", + }, "No PEER-ID RESTRICTIONS ENABLED. This node will accept connections from any peer", ); } @@ -149,11 +155,13 @@ export class LibP2PNode { streamMuxers: [mplex()], connectionEncryption: [noise()], pubsub: gossip, - peerDiscovery: [pubsubPeerDiscovery({ topics: [peerDiscoveryTopic] })], }), (e) => { log.error({ identity: this.identity, error: e }, "failed to create libp2p node"); - return new HubError("unavailable", { message: "failed to create libp2p node", cause: e as Error }); + return new HubError("unavailable", { + message: "failed to create libp2p node", + cause: e as Error, + }); }, ); @@ -212,7 +220,7 @@ export class LibP2PNode { } } } - return msgIdFnStrictSign(message); + return msgIdFnStrictNoSign(message); } static encodeMessage(message: GossipMessage): HubResult { @@ -249,7 +257,11 @@ export class LibP2PNode { log.error(error, `Failed to connect to peer at address: ${address}`); return err(new HubError("unavailable", error)); } - return err(new HubError("unavailable", { message: `cannot connect to peer: ${address}` })); + return err( + new HubError("unavailable", { + message: `cannot connect to peer: ${address}`, + }), + ); } async addPeerToAddressBook(peerId: PeerId, multiaddr: MultiAddr.Multiaddr) { @@ -267,6 +279,11 @@ export class LibP2PNode { } } + async peerStoreCount() { + const peers = await this._node?.peerStore.all(); + return peers?.length ?? 0; + } + /** Removes the peer from the address book and hangs up on them */ async removePeerFromAddressBook(peerId: PeerId) { if (this._node) { @@ -375,6 +392,10 @@ export class LibP2PNode { } async reportValid(messageId: string, propagationSource: PeerId, isValid: boolean) { + isValid + ? statsd().increment("gossip.async_validation.accept") + : statsd().increment("gossip.async_validation.reject"); + this.gossip?.reportMessageValidationResult( messageId, propagationSource, @@ -397,7 +418,12 @@ export class LibP2PNode { return (event: any) => { // console.log("Worker: Reboardcasting ", eventName, event.detail); // console.log(" with ", JSON.stringify(event.detail, bigIntSerializer, 2)); - parentPort?.postMessage({ event: { eventName, detail: JSON.stringify(event.detail, jsonSerializer) } }); + parentPort?.postMessage({ + event: { + eventName, + detail: JSON.stringify(event.detail, jsonSerializer), + }, + }); }; }; @@ -449,12 +475,18 @@ parentPort?.on("message", async (msg: LibP2PNodeMethodGenericMessage) => { const peerId = libp2pNode.peerId ? exportToProtobuf(libp2pNode.peerId) : new Uint8Array(); const multiaddrs = libp2pNode._node?.getMultiaddrs().map((m) => m.bytes) ?? []; - parentPort?.postMessage({ methodCallId, result: makeResult<"start">({ peerId, multiaddrs }) }); + parentPort?.postMessage({ + methodCallId, + result: makeResult<"start">({ peerId, multiaddrs }), + }); break; } case "stop": { await libp2pNode.stop(); - parentPort?.postMessage({ methodCallId, result: makeResult<"stop">(undefined) }); + parentPort?.postMessage({ + methodCallId, + result: makeResult<"stop">(undefined), + }); // Exit the worker thread setTimeout(() => process.exit(0), 1000); @@ -462,7 +494,10 @@ parentPort?.on("message", async (msg: LibP2PNodeMethodGenericMessage) => { } case "allPeerIds": { const peerIds = libp2pNode.allPeerIds(); - parentPort?.postMessage({ methodCallId, result: makeResult<"allPeerIds">(peerIds) }); + parentPort?.postMessage({ + methodCallId, + result: makeResult<"allPeerIds">(peerIds), + }); break; } case "addToAddressBook": { @@ -470,7 +505,19 @@ parentPort?.on("message", async (msg: LibP2PNodeMethodGenericMessage) => { const [peerId, multiaddr] = specificMsg.args; await libp2pNode.addPeerToAddressBook(await createFromProtobuf(peerId), MultiAddr.multiaddr(multiaddr)); - parentPort?.postMessage({ methodCallId, result: makeResult<"addToAddressBook">(undefined) }); + parentPort?.postMessage({ + methodCallId, + result: makeResult<"addToAddressBook">(undefined), + }); + break; + } + case "peerStoreCount": { + const count = await libp2pNode.peerStoreCount(); + + parentPort?.postMessage({ + methodCallId, + result: makeResult<"peerStoreCount">(count), + }); break; } case "removeFromAddressBook": { @@ -478,7 +525,10 @@ parentPort?.on("message", async (msg: LibP2PNodeMethodGenericMessage) => { const [peerId] = specificMsg.args; await libp2pNode.removePeerFromAddressBook(await createFromProtobuf(peerId)); - parentPort?.postMessage({ methodCallId, result: makeResult<"removeFromAddressBook">(undefined) }); + parentPort?.postMessage({ + methodCallId, + result: makeResult<"removeFromAddressBook">(undefined), + }); break; } case "connectAddress": { @@ -498,7 +548,10 @@ parentPort?.on("message", async (msg: LibP2PNodeMethodGenericMessage) => { } case "connectionStats": { const stats = await libp2pNode.connectionStats(); - parentPort?.postMessage({ methodCallId, result: makeResult<"connectionStats">(stats) }); + parentPort?.postMessage({ + methodCallId, + result: makeResult<"connectionStats">(stats), + }); break; } case "getPeerAddresses": { @@ -527,7 +580,10 @@ parentPort?.on("message", async (msg: LibP2PNodeMethodGenericMessage) => { const [peerIds] = specificMsg.args; libp2pNode.updateAllowedPeerIds(peerIds); - parentPort?.postMessage({ methodCallId, result: makeResult<"updateAllowedPeerIds">(undefined) }); + parentPort?.postMessage({ + methodCallId, + result: makeResult<"updateAllowedPeerIds">(undefined), + }); break; } case "updateDeniedPeerIds": { @@ -535,7 +591,10 @@ parentPort?.on("message", async (msg: LibP2PNodeMethodGenericMessage) => { const [peerIds] = specificMsg.args; libp2pNode.updateDeniedPeerIds(peerIds); - parentPort?.postMessage({ methodCallId, result: makeResult<"updateDeniedPeerIds">(undefined) }); + parentPort?.postMessage({ + methodCallId, + result: makeResult<"updateDeniedPeerIds">(undefined), + }); break; } case "subscribe": { @@ -543,7 +602,10 @@ parentPort?.on("message", async (msg: LibP2PNodeMethodGenericMessage) => { const [topic] = specificMsg.args; libp2pNode.subscribe(topic); - parentPort?.postMessage({ methodCallId, result: makeResult<"subscribe">(undefined) }); + parentPort?.postMessage({ + methodCallId, + result: makeResult<"subscribe">(undefined), + }); break; } case "gossipMessage": { @@ -587,7 +649,10 @@ parentPort?.on("message", async (msg: LibP2PNodeMethodGenericMessage) => { const [msgId, source, isValid] = specificMsg.args; const sourceId = peerIdFromBytes(source); await libp2pNode.reportValid(msgId, sourceId, isValid); - parentPort?.postMessage({ methodCallId, result: makeResult<"reportValid">(undefined) }); + parentPort?.postMessage({ + methodCallId, + result: makeResult<"reportValid">(undefined), + }); break; } } diff --git a/apps/hubble/src/network/p2p/protocol.ts b/apps/hubble/src/network/p2p/protocol.ts index c337b9ae7e..9aa5cb6fc2 100644 --- a/apps/hubble/src/network/p2p/protocol.ts +++ b/apps/hubble/src/network/p2p/protocol.ts @@ -1,6 +1,6 @@ import { GossipVersion } from "@farcaster/hub-nodejs"; import { Message as GossipSubMessage } from "@libp2p/interface-pubsub"; -import { msgId } from "@libp2p/pubsub/utils"; +import { msgId, noSignMsgId } from "@libp2p/pubsub/utils"; // Current gossip protocol version export const GOSSIP_PROTOCOL_VERSION = GossipVersion.V1_1; @@ -15,3 +15,9 @@ export const msgIdFnStrictSign = (message: GossipSubMessage): Uint8Array => { return msgId(message.from.toBytes(), message.sequenceNumber); }; + +/* This has been imported from the libp2p-gossipsub implementation as it's not public there */ +export function msgIdFnStrictNoSign(msg: GossipSubMessage): Uint8Array { + // Hashes the raw message data + return noSignMsgId(msg.data); +} diff --git a/apps/hubble/src/test/e2e/gossipNetwork.test.ts b/apps/hubble/src/test/e2e/gossipNetwork.test.ts index 8bb1cb4ef4..7a6bdcd52a 100644 --- a/apps/hubble/src/test/e2e/gossipNetwork.test.ts +++ b/apps/hubble/src/test/e2e/gossipNetwork.test.ts @@ -118,7 +118,7 @@ describe("gossip network tests", () => { numReactionAddMessages++; } else { castAddMessage = msg.message; - numCastAddMessages += 1; + numCastAddMessages++; } } // Cast add message must always be present, but it's ok for the reaction add message to be missing sometimes @@ -126,7 +126,7 @@ describe("gossip network tests", () => { }); expect(numCastAddMessages).toBe(NUM_NODES - 1); - // Reaction messages are not forwarded to all nodes because they are considered in valid. They stop after the first node. + // Reaction messages are not forwarded to all nodes because they are considered invalid. They stop after the first node. // This is a test that asyncValidation is working as expected. expect(numReactionAddMessages).toBe(1); diff --git a/packages/core/src/protobufs/generated/gossip.ts b/packages/core/src/protobufs/generated/gossip.ts index 6b1b6da6b4..26e47f5404 100644 --- a/packages/core/src/protobufs/generated/gossip.ts +++ b/packages/core/src/protobufs/generated/gossip.ts @@ -47,6 +47,7 @@ export interface ContactInfoContent { hubVersion: string; network: FarcasterNetwork; appVersion: string; + timestamp: number; } export interface PingMessageBody { @@ -187,6 +188,7 @@ function createBaseContactInfoContent(): ContactInfoContent { hubVersion: "", network: 0, appVersion: "", + timestamp: 0, }; } @@ -213,6 +215,9 @@ export const ContactInfoContent = { if (message.appVersion !== "") { writer.uint32(58).string(message.appVersion); } + if (message.timestamp !== 0) { + writer.uint32(64).uint64(message.timestamp); + } return writer; }, @@ -272,6 +277,13 @@ export const ContactInfoContent = { message.appVersion = reader.string(); continue; + case 8: + if (tag != 64) { + break; + } + + message.timestamp = longToNumber(reader.uint64() as Long); + continue; } if ((tag & 7) == 4 || tag == 0) { break; @@ -290,6 +302,7 @@ export const ContactInfoContent = { hubVersion: isSet(object.hubVersion) ? String(object.hubVersion) : "", network: isSet(object.network) ? farcasterNetworkFromJSON(object.network) : 0, appVersion: isSet(object.appVersion) ? String(object.appVersion) : "", + timestamp: isSet(object.timestamp) ? Number(object.timestamp) : 0, }; }, @@ -308,6 +321,7 @@ export const ContactInfoContent = { message.hubVersion !== undefined && (obj.hubVersion = message.hubVersion); message.network !== undefined && (obj.network = farcasterNetworkToJSON(message.network)); message.appVersion !== undefined && (obj.appVersion = message.appVersion); + message.timestamp !== undefined && (obj.timestamp = Math.round(message.timestamp)); return obj; }, @@ -328,6 +342,7 @@ export const ContactInfoContent = { message.hubVersion = object.hubVersion ?? ""; message.network = object.network ?? 0; message.appVersion = object.appVersion ?? ""; + message.timestamp = object.timestamp ?? 0; return message; }, }; diff --git a/protobufs/schemas/gossip.proto b/protobufs/schemas/gossip.proto index 75f7ef05ae..0b3c0cca6d 100644 --- a/protobufs/schemas/gossip.proto +++ b/protobufs/schemas/gossip.proto @@ -22,6 +22,7 @@ message ContactInfoContent { string hub_version = 5; FarcasterNetwork network = 6; string app_version = 7; + uint64 timestamp = 8; } message PingMessageBody { diff --git a/yarn.lock b/yarn.lock index c690c62109..bff71ff587 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2363,7 +2363,7 @@ dependencies: multiformats "^11.0.0" -"@libp2p/interface-peer-info@^1.0.0", "@libp2p/interface-peer-info@^1.0.2", "@libp2p/interface-peer-info@^1.0.3": +"@libp2p/interface-peer-info@^1.0.0", "@libp2p/interface-peer-info@^1.0.3": version "1.0.8" resolved "https://registry.npmjs.org/@libp2p/interface-peer-info/-/interface-peer-info-1.0.8.tgz#8380e9e40d0ec2c8be8e1a43e8a82ae97a0687c4" integrity sha512-LRvZt/9bZFYW7seAwuSg2hZuPl+FRTAsij5HtyvVwmpfVxipm6yQrKjQ+LiK/SZhIDVsSJ+UjF0mluJj+jeAzQ== @@ -2459,7 +2459,7 @@ p-defer "^4.0.0" uint8arraylist "^2.4.3" -"@libp2p/interfaces@^3.0.0", "@libp2p/interfaces@^3.0.2", "@libp2p/interfaces@^3.0.3", "@libp2p/interfaces@^3.2.0": +"@libp2p/interfaces@^3.0.0", "@libp2p/interfaces@^3.0.2", "@libp2p/interfaces@^3.2.0": version "3.3.1" resolved "https://registry.npmjs.org/@libp2p/interfaces/-/interfaces-3.3.1.tgz#519c77c030b10d776250bbebf65990af53ccb2ee" integrity sha512-3N+goQt74SmaVOjwpwMPKLNgh1uDQGw8GD12c40Kc86WOq0qvpm3NfACW+H8Su2X6KmWjCSMzk9JWs9+8FtUfg== @@ -2636,21 +2636,6 @@ uint8arraylist "^2.1.1" uint8arrays "^4.0.2" -"@libp2p/pubsub-peer-discovery@^8.0.0": - version "8.0.0" - resolved "https://registry.npmjs.org/@libp2p/pubsub-peer-discovery/-/pubsub-peer-discovery-8.0.0.tgz#3ba28875d5b3466fcd3764bcf72981a0885cee84" - integrity sha512-BRQXObeyyayAAcvG7C0+lwrM+/edvUKHuv5em4UiHAZ19PFe0GQDQ1/RXP8kQUWyVOn8jMmbhSfDPPq3pGcK1Q== - dependencies: - "@libp2p/interface-peer-discovery" "^1.0.1" - "@libp2p/interface-peer-id" "^2.0.0" - "@libp2p/interface-peer-info" "^1.0.2" - "@libp2p/interface-pubsub" "^3.0.0" - "@libp2p/interfaces" "^3.0.3" - "@libp2p/logger" "^2.0.1" - "@libp2p/peer-id" "^2.0.0" - "@multiformats/multiaddr" "^11.0.5" - protons-runtime "^4.0.1" - "@libp2p/pubsub@^6.0.0": version "6.0.1" resolved "https://registry.npmjs.org/@libp2p/pubsub/-/pubsub-6.0.1.tgz#b8e2636697aa6ef4af758c16e734bc6dbc2fc51f" @@ -2818,7 +2803,7 @@ dependencies: "@multiformats/multiaddr" "^12.0.0" -"@multiformats/multiaddr@^11.0.0", "@multiformats/multiaddr@^11.0.5": +"@multiformats/multiaddr@^11.0.0": version "11.4.0" resolved "https://registry.npmjs.org/@multiformats/multiaddr/-/multiaddr-11.4.0.tgz#101c04291f18a5bc9e07b8354d0672a4e8fbd810" integrity sha512-rLIhSOCKQhm/fCjg+5tVM9xrtjbZjZKJg6bb65YbFsNoPSYhweEohXO8Pkg2xbRy3NqVEVkS+8DB/+VhNvjd5Q==