From 0a86de7474aac098045e85d9e920e9f623b3a108 Mon Sep 17 00:00:00 2001 From: Farber98 Date: Wed, 27 Nov 2024 13:12:38 -0300 Subject: [PATCH 1/7] set defaults to test expiration --- pkg/solana/config/config.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/solana/config/config.go b/pkg/solana/config/config.go index 202b8fca8..68d151a73 100644 --- a/pkg/solana/config/config.go +++ b/pkg/solana/config/config.go @@ -18,20 +18,20 @@ var defaultConfigSet = Chain{ TxTimeout: config.MustNewDuration(time.Minute), // timeout for send tx method in client TxRetryTimeout: config.MustNewDuration(10 * time.Second), // duration for tx rebroadcasting to RPC node TxConfirmTimeout: config.MustNewDuration(30 * time.Second), // duration before discarding tx as unconfirmed. Set to 0 to disable discarding tx. - TxExpirationRebroadcast: ptr(false), // to enable rebroadcasting of expired transactions + TxExpirationRebroadcast: ptr(true), // to enable rebroadcasting of expired transactions TxRetentionTimeout: config.MustNewDuration(0 * time.Second), // duration to retain transactions after being marked as finalized or errored. Set to 0 to immediately drop transactions. SkipPreflight: ptr(true), // to enable or disable preflight checks Commitment: ptr(string(rpc.CommitmentConfirmed)), MaxRetries: ptr(int64(0)), // max number of retries (default = 0). when config.MaxRetries < 0), interpreted as MaxRetries = nil and rpc node will do a reasonable number of retries // fee estimator - FeeEstimatorMode: ptr("fixed"), + FeeEstimatorMode: ptr("blockhistory"), ComputeUnitPriceMax: ptr(uint64(1_000)), ComputeUnitPriceMin: ptr(uint64(0)), ComputeUnitPriceDefault: ptr(uint64(0)), FeeBumpPeriod: config.MustNewDuration(3 * time.Second), // set to 0 to disable fee bumping BlockHistoryPollPeriod: config.MustNewDuration(5 * time.Second), - BlockHistorySize: ptr(uint64(1)), // 1: uses latest block; >1: Uses multiple blocks, where n is number of blocks. DISCLAIMER: 1:1 ratio between n and RPC calls. + BlockHistorySize: ptr(uint64(15)), // 1: uses latest block; >1: Uses multiple blocks, where n is number of blocks. DISCLAIMER: 1:1 ratio between n and RPC calls. ComputeUnitLimitDefault: ptr(uint32(200_000)), // set to 0 to disable adding compute unit limit EstimateComputeUnitLimit: ptr(false), // set to false to disable compute unit limit estimation } From 1e2955111e69d7d6d8f54d0255da6209d9c246ca Mon Sep 17 00:00:00 2001 From: Farber98 Date: Thu, 28 Nov 2024 15:13:11 -0300 Subject: [PATCH 2/7] log block height vs currheight --- pkg/solana/txm/pendingtx.go | 8 ++++---- pkg/solana/txm/txm.go | 9 +++++++-- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/pkg/solana/txm/pendingtx.go b/pkg/solana/txm/pendingtx.go index 44895fd40..8123133ec 100644 --- a/pkg/solana/txm/pendingtx.go +++ b/pkg/solana/txm/pendingtx.go @@ -30,7 +30,7 @@ type PendingTxContext interface { ListAll() []solana.Signature // ListAllExpiredBroadcastedTxs returns all the txes that are in broadcasted state and have expired for given slot height compared against their lastValidBlockHeight. // Passing maxUint64 as currHeight will return all broadcasted txes. - ListAllExpiredBroadcastedTxs(currHeight uint64) []pendingTx + ListAllExpiredBroadcastedTxs(currHeight uint64) ([]pendingTx, int, int) // Expired returns whether or not confirmation timeout amount of time has passed since creation Expired(sig solana.Signature, confirmationTimeout time.Duration) bool // OnProcessed marks transactions as Processed @@ -223,7 +223,7 @@ func (c *pendingTxContext) ListAll() []solana.Signature { // ListAllExpiredBroadcastedTxs returns all the txes that are in broadcasted state and have expired for given slot height compared against their lastValidBlockHeight. // Passing maxUint64 as currHeight will return all broadcasted txes. -func (c *pendingTxContext) ListAllExpiredBroadcastedTxs(currHeight uint64) []pendingTx { +func (c *pendingTxContext) ListAllExpiredBroadcastedTxs(currHeight uint64) ([]pendingTx, int, int) { c.lock.RLock() defer c.lock.RUnlock() broadcastedTxes := make([]pendingTx, 0, len(c.broadcastedProcessedTxs)) // worst case, all of them @@ -232,7 +232,7 @@ func (c *pendingTxContext) ListAllExpiredBroadcastedTxs(currHeight uint64) []pen broadcastedTxes = append(broadcastedTxes, tx) } } - return broadcastedTxes + return broadcastedTxes, len(broadcastedTxes), len(c.broadcastedProcessedTxs) } // Expired returns if the timeout for trying to confirm a signature has been reached @@ -623,7 +623,7 @@ func (c *pendingTxContextWithProm) ListAll() []solana.Signature { return sigs } -func (c *pendingTxContextWithProm) ListAllExpiredBroadcastedTxs(currHeight uint64) []pendingTx { +func (c *pendingTxContextWithProm) ListAllExpiredBroadcastedTxs(currHeight uint64) ([]pendingTx, int, int) { return c.pendingTx.ListAllExpiredBroadcastedTxs(currHeight) } diff --git a/pkg/solana/txm/txm.go b/pkg/solana/txm/txm.go index 3ba39f2f5..95cc10139 100644 --- a/pkg/solana/txm/txm.go +++ b/pkg/solana/txm/txm.go @@ -576,8 +576,13 @@ func (txm *Txm) rebroadcastExpiredTxs(ctx context.Context, client client.ReaderW return } // Rebroadcast all expired txes - for _, tx := range txm.txs.ListAllExpiredBroadcastedTxs(currHeight) { - txm.lggr.Debugw("transaction expired, rebroadcasting", "id", tx.id, "signature", tx.signatures) + rebroadcastTxs, rebroadcastCount, allCount := txm.txs.ListAllExpiredBroadcastedTxs(currHeight) + txm.lggr.Debugw("rebroadcasting expired transactions", "rebroadcastCount", rebroadcastCount, "allCount", allCount) + for _, tx := range rebroadcastTxs { + if tx.lastValidBlockHeight > currHeight { + continue + } + txm.lggr.Debugw("transaction expired, rebroadcasting", "id", tx.id, "signature", tx.signatures, "currHeight", currHeight, "lastValidBlockHeight", tx.lastValidBlockHeight) if len(tx.signatures) == 0 { // prevent panic, shouldn't happen. txm.lggr.Errorw("no signatures found for expired transaction", "id", tx.id) continue From f7b6d3ea42bb0af50cd11c5e6fa735c2001f5979 Mon Sep 17 00:00:00 2001 From: Farber98 Date: Thu, 5 Dec 2024 08:42:39 -0300 Subject: [PATCH 3/7] add logs --- pkg/solana/txm/txm.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/solana/txm/txm.go b/pkg/solana/txm/txm.go index 95cc10139..d3ba8aaef 100644 --- a/pkg/solana/txm/txm.go +++ b/pkg/solana/txm/txm.go @@ -222,7 +222,7 @@ func (txm *Txm) sendWithRetry(ctx context.Context, msg pendingTx) (solanaGo.Tran return solanaGo.Transaction{}, "", solanaGo.Signature{}, fmt.Errorf("failed to save tx signature (%s) to inflight txs: %w", sig, err) } - txm.lggr.Debugw("tx initial broadcast", "id", msg.id, "fee", msg.cfg.BaseComputeUnitPrice, "signature", sig) + txm.lggr.Debugw("tx initial broadcast", "id", msg.id, "fee", msg.cfg.BaseComputeUnitPrice, "signature", sig, "lastValidBlockHeight", msg.lastValidBlockHeight) // Initialize signature list with initialTx signature. This list will be used to add new signatures and track retry attempts. sigs := &signatureList{} From 5e1aa9d75f769f15acce4319862283096bc46a95 Mon Sep 17 00:00:00 2001 From: Farber98 Date: Thu, 5 Dec 2024 11:04:46 -0300 Subject: [PATCH 4/7] slot height with same commitment --- pkg/solana/client/client.go | 1 + pkg/solana/txm/txm.go | 5 +---- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/solana/client/client.go b/pkg/solana/client/client.go index f9f6715b0..ee68ef6f1 100644 --- a/pkg/solana/client/client.go +++ b/pkg/solana/client/client.go @@ -32,6 +32,7 @@ type Reader interface { AccountReader Balance(ctx context.Context, addr solana.PublicKey) (uint64, error) SlotHeight(ctx context.Context) (uint64, error) + SlotHeightWithCommitment(ctx context.Context, commitment rpc.CommitmentType) (uint64, error) LatestBlockhash(ctx context.Context) (*rpc.GetLatestBlockhashResult, error) ChainID(ctx context.Context) (mn.StringID, error) GetFeeForMessage(ctx context.Context, msg string) (uint64, error) diff --git a/pkg/solana/txm/txm.go b/pkg/solana/txm/txm.go index d3ba8aaef..640127bee 100644 --- a/pkg/solana/txm/txm.go +++ b/pkg/solana/txm/txm.go @@ -570,7 +570,7 @@ func (txm *Txm) handleFinalizedSignatureStatus(sig solanaGo.Signature) { // An expired tx is one where it's blockhash lastValidBlockHeight is smaller than the current slot height. // If any error occurs during rebroadcast attempt, they are discarded, and the function continues with the next transaction. func (txm *Txm) rebroadcastExpiredTxs(ctx context.Context, client client.ReaderWriter) { - currHeight, err := client.SlotHeight(ctx) + currHeight, err := client.SlotHeightWithCommitment(ctx, txm.cfg.Commitment()) if err != nil { txm.lggr.Errorw("failed to get current slot height", "error", err) return @@ -579,9 +579,6 @@ func (txm *Txm) rebroadcastExpiredTxs(ctx context.Context, client client.ReaderW rebroadcastTxs, rebroadcastCount, allCount := txm.txs.ListAllExpiredBroadcastedTxs(currHeight) txm.lggr.Debugw("rebroadcasting expired transactions", "rebroadcastCount", rebroadcastCount, "allCount", allCount) for _, tx := range rebroadcastTxs { - if tx.lastValidBlockHeight > currHeight { - continue - } txm.lggr.Debugw("transaction expired, rebroadcasting", "id", tx.id, "signature", tx.signatures, "currHeight", currHeight, "lastValidBlockHeight", tx.lastValidBlockHeight) if len(tx.signatures) == 0 { // prevent panic, shouldn't happen. txm.lggr.Errorw("no signatures found for expired transaction", "id", tx.id) From f73893313f72e22be994ee8c1532a01bef9b7457 Mon Sep 17 00:00:00 2001 From: Farber98 Date: Thu, 5 Dec 2024 11:55:59 -0300 Subject: [PATCH 5/7] replace slot height with block height --- pkg/solana/txm/pendingtx.go | 2 +- pkg/solana/txm/txm.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/solana/txm/pendingtx.go b/pkg/solana/txm/pendingtx.go index 8123133ec..64e810a64 100644 --- a/pkg/solana/txm/pendingtx.go +++ b/pkg/solana/txm/pendingtx.go @@ -28,7 +28,7 @@ type PendingTxContext interface { Remove(sig solana.Signature) (string, error) // ListAll returns all of the signatures being tracked for all transactions not yet finalized or errored ListAll() []solana.Signature - // ListAllExpiredBroadcastedTxs returns all the txes that are in broadcasted state and have expired for given slot height compared against their lastValidBlockHeight. + // ListAllExpiredBroadcastedTxs returns all the txes that are in broadcasted state and have expired for given block height compared against their lastValidBlockHeight. // Passing maxUint64 as currHeight will return all broadcasted txes. ListAllExpiredBroadcastedTxs(currHeight uint64) ([]pendingTx, int, int) // Expired returns whether or not confirmation timeout amount of time has passed since creation diff --git a/pkg/solana/txm/txm.go b/pkg/solana/txm/txm.go index 640127bee..9711c4e5f 100644 --- a/pkg/solana/txm/txm.go +++ b/pkg/solana/txm/txm.go @@ -570,16 +570,16 @@ func (txm *Txm) handleFinalizedSignatureStatus(sig solanaGo.Signature) { // An expired tx is one where it's blockhash lastValidBlockHeight is smaller than the current slot height. // If any error occurs during rebroadcast attempt, they are discarded, and the function continues with the next transaction. func (txm *Txm) rebroadcastExpiredTxs(ctx context.Context, client client.ReaderWriter) { - currHeight, err := client.SlotHeightWithCommitment(ctx, txm.cfg.Commitment()) + currBlockHeight, err := client.GetLatestBlock(ctx) if err != nil { - txm.lggr.Errorw("failed to get current slot height", "error", err) + txm.lggr.Errorw("failed to get current block height", "error", err) return } // Rebroadcast all expired txes - rebroadcastTxs, rebroadcastCount, allCount := txm.txs.ListAllExpiredBroadcastedTxs(currHeight) + rebroadcastTxs, rebroadcastCount, allCount := txm.txs.ListAllExpiredBroadcastedTxs(*currBlockHeight.BlockHeight) txm.lggr.Debugw("rebroadcasting expired transactions", "rebroadcastCount", rebroadcastCount, "allCount", allCount) for _, tx := range rebroadcastTxs { - txm.lggr.Debugw("transaction expired, rebroadcasting", "id", tx.id, "signature", tx.signatures, "currHeight", currHeight, "lastValidBlockHeight", tx.lastValidBlockHeight) + txm.lggr.Debugw("transaction expired, rebroadcasting", "id", tx.id, "signature", tx.signatures, "currHeight", currBlockHeight.BlockHeight, "lastValidBlockHeight", tx.lastValidBlockHeight) if len(tx.signatures) == 0 { // prevent panic, shouldn't happen. txm.lggr.Errorw("no signatures found for expired transaction", "id", tx.id) continue From 3aa9278ac9e0f82d823149377d91d2ee1f8d1584 Mon Sep 17 00:00:00 2001 From: Farber98 Date: Thu, 5 Dec 2024 14:53:46 -0300 Subject: [PATCH 6/7] remove slot height commitment --- pkg/solana/client/client.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/solana/client/client.go b/pkg/solana/client/client.go index ee68ef6f1..f9f6715b0 100644 --- a/pkg/solana/client/client.go +++ b/pkg/solana/client/client.go @@ -32,7 +32,6 @@ type Reader interface { AccountReader Balance(ctx context.Context, addr solana.PublicKey) (uint64, error) SlotHeight(ctx context.Context) (uint64, error) - SlotHeightWithCommitment(ctx context.Context, commitment rpc.CommitmentType) (uint64, error) LatestBlockhash(ctx context.Context) (*rpc.GetLatestBlockhashResult, error) ChainID(ctx context.Context) (mn.StringID, error) GetFeeForMessage(ctx context.Context, msg string) (uint64, error) From ea8b464236e0ff6f98b855fe08933ea777939af8 Mon Sep 17 00:00:00 2001 From: Farber98 Date: Thu, 5 Dec 2024 15:28:25 -0300 Subject: [PATCH 7/7] fix soak --- integration-tests/smoke/ocr2_test.go | 56 ++++++++++++++++++++-- integration-tests/testconfig/testconfig.go | 29 +++++++++++ 2 files changed, 82 insertions(+), 3 deletions(-) diff --git a/integration-tests/smoke/ocr2_test.go b/integration-tests/smoke/ocr2_test.go index 9b2645fcd..f285eb105 100644 --- a/integration-tests/smoke/ocr2_test.go +++ b/integration-tests/smoke/ocr2_test.go @@ -4,6 +4,7 @@ import ( "fmt" "maps" "os/exec" + "strings" "testing" "time" @@ -21,13 +22,22 @@ import ( func TestSolanaOCRV2Smoke(t *testing.T) { for _, test := range []struct { - name string - env map[string]string + name string + env map[string]string + workloadLabels map[string]string // Added workloadLabels }{ - {name: "embedded"}, + {name: "embedded", workloadLabels: map[string]string{ + "chain.link/team": "team-alpha", + "chain.link/cost-center": "cc-12345", + "chain.link/product": "product-X", + }}, {name: "plugins", env: map[string]string{ "CL_MEDIAN_CMD": "chainlink-feeds", "CL_SOLANA_CMD": "chainlink-solana", + }, workloadLabels: map[string]string{ + "chain.link/team": "team-beta", + "chain.link/cost-center": "cc-67890", + "chain.link/product": "product-Y", }}, } { config, err := tc.GetConfig("Smoke", tc.OCR2) @@ -35,6 +45,16 @@ func TestSolanaOCRV2Smoke(t *testing.T) { t.Fatal(err) } + // Merge workloadLabels into config.WorkloadLabels + if test.workloadLabels != nil { + if config.WorkloadLabels == nil { + config.WorkloadLabels = make(map[string]string) + } + for key, value := range test.workloadLabels { + config.WorkloadLabels[key] = value + } + } + test := test t.Run(test.name, func(t *testing.T) { t.Parallel() @@ -85,6 +105,36 @@ func startOCR2DataFeedsSmokeTest(t *testing.T, testname string, testenv map[stri state.SetupClients() require.NoError(t, err) + // Apply WorkloadLabels to Kubernetes resources + if len(config.WorkloadLabels) > 0 { + for resourceName, labelValue := range config.WorkloadLabels { + // Determine the resource type based on the resource name pattern + var resourceType string + switch { + case strings.Contains(resourceName, "node-creds-secret"): + resourceType = "secret" + case strings.Contains(resourceName, "cm"): + resourceType = "configmap" + case strings.Contains(resourceName, "postgres-node"): + resourceType = "deployment" // Adjust as needed + default: + resourceType = "deployment" // Default resource type + } + + // Construct the label key + splitName := strings.Split(resourceName, "-") + if len(splitName) == 0 { + require.Fail(t, "Invalid resource name format: "+resourceName) + } + labelKey := "chain.link/" + splitName[len(splitName)-1] + + // Apply the label using kubectl + cmd := exec.Command("kubectl", "label", resourceType, resourceName, fmt.Sprintf("%s=%s", labelKey, labelValue), "--overwrite") + output, errOutput := cmd.CombinedOutput() + require.NoError(t, errOutput, fmt.Sprintf("Failed to apply label to resource %s: %s", resourceName, string(output))) + } + } + gauntletConfig := map[string]string{ "SECRET": fmt.Sprintf("\"%s\"", *config.SolanaConfig.Secret), "NODE_URL": state.Common.ChainDetails.RPCURLExternal, diff --git a/integration-tests/testconfig/testconfig.go b/integration-tests/testconfig/testconfig.go index 1f482b7f5..ee71ba998 100644 --- a/integration-tests/testconfig/testconfig.go +++ b/integration-tests/testconfig/testconfig.go @@ -41,6 +41,7 @@ type TestConfig struct { OCR2 *ocr2_config.Config `toml:"OCR2"` SolanaConfig *SolanaConfig `toml:"SolanaConfig"` ConfigurationName string `toml:"-"` + WorkloadLabels map[string]string `toml:"WorkloadLabels"` // getter funcs for passing parameters GetChainID func() string @@ -224,9 +225,37 @@ func (c *TestConfig) ReadFromEnvVar() error { c.SolanaConfig.Secret = &solanaSecret } + // Add WorkloadLabels from environment variable if set + workloadLabelsEnv := os.Getenv("WORKLOAD_LABELS") + if workloadLabelsEnv != "" { + labels := parseWorkloadLabels(workloadLabelsEnv) + if c.WorkloadLabels == nil { + c.WorkloadLabels = make(map[string]string) + } + for key, value := range labels { + c.WorkloadLabels[key] = value + } + logger.Info().Msg("WorkloadLabels have been set from WORKLOAD_LABELS environment variable") + } + return nil } +// parseWorkloadLabels parses a string of labels in key=value format separated by commas +func parseWorkloadLabels(labels string) map[string]string { + result := make(map[string]string) + pairs := strings.Split(labels, ",") + for _, pair := range pairs { + kv := strings.SplitN(pair, "=", 2) + if len(kv) == 2 { + key := strings.TrimSpace(kv[0]) + value := strings.TrimSpace(kv[1]) + result[key] = value + } + } + return result +} + func (c *TestConfig) GetLoggingConfig() *ctf_config.LoggingConfig { return c.Logging }