Skip to content

Commit

Permalink
Add listen only option
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed Dec 21, 2023
1 parent 1c313c5 commit a7874af
Showing 1 changed file with 91 additions and 47 deletions.
138 changes: 91 additions & 47 deletions node/hack/query/mainnet_test/mainnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ var (
nodeKeyPath = flag.String("nodeKey", "mainnet_test.nodeKey", "Path to node key (will be generated if it doesn't exist)")
signerKeyPath = flag.String("signerKey", "mainnet_test.signerKey", "Path to key used to sign unsigned queries")
configDir = flag.String("configDir", ".", "Directory where nodeKey and signerKey are loaded from (default is .)")
listenOnly = flag.Bool("listenOnly", false, "Only listen for responses, don't publish anything (default is false)")
targetPeerId = flag.String("targetPeerId", "", "Only process responses from this peer ID (default is everything)")
)

Expand All @@ -80,14 +81,18 @@ func main() {
logger, _ := zap.NewDevelopment()

nodeKey := *configDir + "/" + *nodeKeyPath
signerKey := *configDir + "/" + *signerKeyPath

logger.Info("Loading signing key", zap.String("signingKeyPath", signerKey))
sk, err := common.LoadArmoredKey(signerKey, CCQ_SERVER_SIGNING_KEY, true)
if err != nil {
logger.Fatal("failed to load guardian key", zap.Error(err))
var err error
var sk *ecdsa.PrivateKey
if !*listenOnly {
signerKey := *configDir + "/" + *signerKeyPath
logger.Info("Loading signing key", zap.String("signingKeyPath", signerKey))
sk, err = common.LoadArmoredKey(signerKey, CCQ_SERVER_SIGNING_KEY, true)
if err != nil {
logger.Fatal("failed to load guardian key", zap.Error(err))
}
logger.Info("Signing key loaded", zap.String("publicKey", ethCrypto.PubkeyToAddress(sk.PublicKey).Hex()))
}
logger.Info("Signing key loaded", zap.String("publicKey", ethCrypto.PubkeyToAddress(sk.PublicKey).Hex()))

// Load p2p private key
var priv crypto.PrivKey
Expand Down Expand Up @@ -155,57 +160,61 @@ func main() {
// END SETUP
//

wethAbi, err := abi.JSON(strings.NewReader("[{\"constant\":true,\"inputs\":[],\"name\":\"name\",\"outputs\":[{\"name\":\"\",\"type\":\"string\"}],\"payable\":false,\"stateMutability\":\"view\",\"type\":\"function\"},{\"constant\":true,\"inputs\":[],\"name\":\"totalSupply\",\"outputs\":[{\"name\":\"\",\"type\":\"uint256\"}],\"payable\":false,\"stateMutability\":\"view\",\"type\":\"function\"}]"))
if err != nil {
panic(err)
}

methods := []string{"name", "totalSupply"}
callData := []*query.EthCallData{}
to, _ := hex.DecodeString("C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2")

for _, method := range methods {
data, err := wethAbi.Pack(method)
if *listenOnly {
listenForMessages(th_req, ctx, logger, sub)
} else {
wethAbi, err := abi.JSON(strings.NewReader("[{\"constant\":true,\"inputs\":[],\"name\":\"name\",\"outputs\":[{\"name\":\"\",\"type\":\"string\"}],\"payable\":false,\"stateMutability\":\"view\",\"type\":\"function\"},{\"constant\":true,\"inputs\":[],\"name\":\"totalSupply\",\"outputs\":[{\"name\":\"\",\"type\":\"uint256\"}],\"payable\":false,\"stateMutability\":\"view\",\"type\":\"function\"}]"))
if err != nil {
panic(err)
}

callData = append(callData, &query.EthCallData{
To: to,
Data: data,
})
}
methods := []string{"name", "totalSupply"}
callData := []*query.EthCallData{}
to, _ := hex.DecodeString("C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2")

// Fetch the latest block number
//url := "https://localhost:8545"
url := "https://rpc.ankr.com/eth"
logger.Info("Querying for latest block height", zap.String("url", url))
blockNum, err := utils.FetchLatestBlockNumberFromUrl(ctx, url)
if err != nil {
logger.Fatal("Failed to fetch latest block number", zap.Error(err))
}
for _, method := range methods {
data, err := wethAbi.Pack(method)
if err != nil {
panic(err)
}

logger.Info("latest block", zap.String("num", blockNum.String()), zap.String("encoded", hexutil.EncodeBig(blockNum)))
callData = append(callData, &query.EthCallData{
To: to,
Data: data,
})
}

// block := "0x28d9630"
// block := "latest"
// block := "0x9999bac44d09a7f69ee7941819b0a19c59ccb1969640cc513be09ef95ed2d8e2"
// Fetch the latest block number
//url := "https://localhost:8545"
url := "https://rpc.ankr.com/eth"
logger.Info("Querying for latest block height", zap.String("url", url))
blockNum, err := utils.FetchLatestBlockNumberFromUrl(ctx, url)
if err != nil {
logger.Fatal("Failed to fetch latest block number", zap.Error(err))
}

// Start of query creation...
callRequest := &query.EthCallQueryRequest{
BlockId: hexutil.EncodeBig(blockNum),
CallData: callData,
}
logger.Info("latest block", zap.String("num", blockNum.String()), zap.String("encoded", hexutil.EncodeBig(blockNum)))

// block := "0x28d9630"
// block := "latest"
// block := "0x9999bac44d09a7f69ee7941819b0a19c59ccb1969640cc513be09ef95ed2d8e2"

// Send 2 individual requests for the same thing but 5 blocks apart
// First request...
logger.Info("calling sendQueryAndGetRsp for ", zap.String("blockNum", blockNum.String()), zap.String("publicKey", ethCrypto.PubkeyToAddress(sk.PublicKey).Hex()))
queryRequest := createQueryRequest(callRequest)
sendQueryAndGetRsp(queryRequest, sk, th_req, ctx, logger, sub, wethAbi, methods)
// Start of query creation...
callRequest := &query.EthCallQueryRequest{
BlockId: hexutil.EncodeBig(blockNum),
CallData: callData,
}

// This is just so that when I look at the output, it is easier for me. (Paul)
logger.Info("sleeping for 5 seconds")
time.Sleep(time.Second * 5)
// Send 2 individual requests for the same thing but 5 blocks apart
// First request...
logger.Info("calling sendQueryAndGetRsp for ", zap.String("blockNum", blockNum.String()), zap.String("publicKey", ethCrypto.PubkeyToAddress(sk.PublicKey).Hex()))
queryRequest := createQueryRequest(callRequest)
sendQueryAndGetRsp(queryRequest, sk, th_req, ctx, logger, sub, wethAbi, methods)

// This is just so that when I look at the output, it is easier for me. (Paul)
logger.Info("sleeping for 5 seconds")
time.Sleep(time.Second * 5)
}

// Cleanly shutdown
// Without this the same host won't properly discover peers until some timeout
Expand Down Expand Up @@ -381,3 +390,38 @@ func sendQueryAndGetRsp(queryRequest *query.QueryRequest, sk *ecdsa.PrivateKey,
}
}
}

func listenForMessages(th *pubsub.Topic, ctx context.Context, logger *zap.Logger, sub *pubsub.Subscription) {

Check failure on line 394 in node/hack/query/mainnet_test/mainnet.go

View workflow job for this annotation

GitHub Actions / node-lint

`listenForMessages` - `th` is unused (unparam)
if *targetPeerId == "" {
logger.Info("Will not publish, only listening for messages from all peers...")
} else {
logger.Info("Will not publish, only listening for messages from a single peer...", zap.String("peerID", *targetPeerId))
}
for {
envelope, err := sub.Next(ctx)
if err != nil {
logger.Panic("failed to receive pubsub message", zap.Error(err))
}
var msg gossipv1.GossipMessage
err = proto.Unmarshal(envelope.Data, &msg)
if err != nil {
logger.Info("received invalid message",
zap.Binary("data", envelope.Data),
zap.String("from", envelope.GetFrom().String()))
continue
}
switch m := msg.Message.(type) {
case *gossipv1.GossipMessage_SignedQueryResponse:
if *targetPeerId != "" && envelope.GetFrom().String() != *targetPeerId {
continue
}
logger.Info("query response received",
zap.String("from", envelope.GetFrom().String()),
zap.Any("response", m.SignedQueryResponse),
zap.String("responseBytes", hexutil.Encode(m.SignedQueryResponse.QueryResponse)),
zap.String("sigBytes", hexutil.Encode(m.SignedQueryResponse.Signature)))
default:
continue
}
}
}

0 comments on commit a7874af

Please sign in to comment.