diff --git a/pkg/types/automation/basetypes.go b/pkg/types/automation/basetypes.go new file mode 100644 index 000000000..96013c95f --- /dev/null +++ b/pkg/types/automation/basetypes.go @@ -0,0 +1,333 @@ +package automation + +import ( + "encoding/hex" + "encoding/json" + "fmt" + "math/big" + "strings" + "time" +) + +const ( + checkResultDelimiter = 0x09 +) + +// checkResultStringTemplate is a JSON template, used for debugging purposes only. +var checkResultStringTemplate = `{ + "PipelineExecutionState":%d, + "Retryable":%v, + "Eligible":%v, + "IneligibilityReason":%d, + "UpkeepID":%s, + "Trigger":%s, + "WorkID":"%s", + "GasAllocated":%d, + "PerformData":"%s", + "FastGasWei":%s, + "LinkNative":%s, + "RetryInterval":%d +}` + +func init() { + checkResultStringTemplate = strings.Replace(checkResultStringTemplate, " ", "", -1) + checkResultStringTemplate = strings.Replace(checkResultStringTemplate, "\t", "", -1) + checkResultStringTemplate = strings.Replace(checkResultStringTemplate, "\n", "", -1) +} + +type TransmitEventType int + +const ( + UnknownEvent TransmitEventType = iota + PerformEvent + StaleReportEvent + ReorgReportEvent + InsufficientFundsReportEvent +) + +// UpkeepState is a final state of some unit of work. +type UpkeepState uint8 + +const ( + UnknownState UpkeepState = iota + // Performed means the upkeep was performed + Performed + // Ineligible means the upkeep was not eligible to be performed + Ineligible +) + +// UpkeepIdentifier is a unique identifier for the upkeep, represented as uint256 in the contract. +type UpkeepIdentifier [32]byte + +// String returns a base 10 numerical string representation of the upkeep identifier. +func (u UpkeepIdentifier) String() string { + return u.BigInt().String() +} + +func (u UpkeepIdentifier) BigInt() *big.Int { + return big.NewInt(0).SetBytes(u[:]) +} + +// FromBigInt sets the upkeep identifier from a big.Int, +// returning true if the big.Int is valid and false otherwise. 
+// in case of an invalid big.Int the upkeep identifier is set to 32 zeros. +func (u *UpkeepIdentifier) FromBigInt(i *big.Int) bool { + *u = [32]byte{} + if i.Cmp(big.NewInt(0)) == -1 { + return false + } + b := i.Bytes() + if len(b) == 0 { + return true + } + if len(b) <= 32 { + copy(u[32-len(b):], i.Bytes()) + return true + } + return false +} + +type BlockNumber uint64 + +// BlockKey represent a block (number and hash) +// NOTE: This struct is sent on the p2p network as part of observations to get quorum +// Any change here should be backwards compatible and should keep validation and +// quorum requirements in mind. Please ensure to get a proper review along with an +// upgrade plan before changing this +type BlockKey struct { + Number BlockNumber + Hash [32]byte +} + +type TransmitEvent struct { + // Type describes the type of event + Type TransmitEventType + // TransmitBlock is the block height of the transmit event + TransmitBlock BlockNumber + // Confirmations is the block height behind latest + Confirmations int64 + // TransactionHash is the hash for the transaction where the event originated + TransactionHash [32]byte + // UpkeepID uniquely identifies the upkeep in the registry + UpkeepID UpkeepIdentifier + // WorkID uniquely identifies the unit of work for the specified upkeep + WorkID string + // CheckBlock is the block value that the upkeep was originally checked at + CheckBlock BlockNumber +} + +// NOTE: This struct is sent on the p2p network as part of observations to get quorum +// Any change here should be backwards compatible and should keep validation and +// quorum requirements in mind. Any field that is needed to be encoded should be added +// as well to checkResultMsg struct, and to be encoded/decoded in the MarshalJSON and +// UnmarshalJSON functions. Please ensure to get a proper review along with an upgrade +// plan before changing this. 
+type CheckResult struct { + // zero if success, else indicates an error code + PipelineExecutionState uint8 + // if PipelineExecutionState is non zero, then retryable indicates that the same + // payload can be processed again in order to get a successful execution + Retryable bool + // Rest of these fields are only applicable if PipelineExecutionState is zero + // Eligible indicates whether this result is eligible to be performed + Eligible bool + // If result is not eligible then the reason it failed. Should be 0 if eligible + IneligibilityReason uint8 + // Upkeep is all the information that identifies the upkeep + UpkeepID UpkeepIdentifier + // Trigger is the event that triggered the upkeep to be checked + Trigger Trigger + // WorkID represents the unit of work for the check result + // Exploratory: Make workID an internal field and an external WorkID() function which generates WID + WorkID string + // GasAllocated is the gas to provide an upkeep in a report + GasAllocated uint64 + // PerformData is the raw data returned when simulating an upkeep perform + PerformData []byte + // FastGasWei is the fast gas price in wei when performing this upkeep + FastGasWei *big.Int + // Link to native ratio to be used when performing this upkeep + LinkNative *big.Int + // RetryInterval is the time interval after which the same payload can be retried. + // This field is used in special cases (such as mercury lookup), where we want to + // have a different retry interval than the default one (30s) + // NOTE: this field is not encoded in JSON and is only used internally + RetryInterval time.Duration +} + +// checkResultMsg is used for encoding and decoding check results. 
+type checkResultMsg struct { + PipelineExecutionState uint8 + Retryable bool + Eligible bool + IneligibilityReason uint8 + UpkeepID UpkeepIdentifier + Trigger Trigger + WorkID string + GasAllocated uint64 + PerformData []byte + FastGasWei *big.Int + LinkNative *big.Int +} + +// UniqueID returns a unique identifier for the check result. +// It is used to achieve quorum on results before being sent within a report. +func (r CheckResult) UniqueID() string { + var resultBytes []byte + + resultBytes = append(resultBytes, r.PipelineExecutionState) + resultBytes = append(resultBytes, checkResultDelimiter) + + resultBytes = append(resultBytes, []byte(fmt.Sprintf("%+v", r.Retryable))...) + resultBytes = append(resultBytes, checkResultDelimiter) + + resultBytes = append(resultBytes, []byte(fmt.Sprintf("%+v", r.Eligible))...) + resultBytes = append(resultBytes, checkResultDelimiter) + + resultBytes = append(resultBytes, r.IneligibilityReason) + resultBytes = append(resultBytes, checkResultDelimiter) + + resultBytes = append(resultBytes, r.UpkeepID[:]...) + resultBytes = append(resultBytes, checkResultDelimiter) + + resultBytes = append(resultBytes, r.Trigger.BlockHash[:]...) + resultBytes = append(resultBytes, checkResultDelimiter) + + resultBytes = append(resultBytes, big.NewInt(int64(r.Trigger.BlockNumber)).Bytes()...) + resultBytes = append(resultBytes, checkResultDelimiter) + + if r.Trigger.LogTriggerExtension != nil { + // Note: We encode the whole trigger extension so the behaviour of + // LogTriggerExtension.BlockNumber and LogTriggerExtension.BlockHash should be + // consistent across nodes when sending observations + resultBytes = append(resultBytes, []byte(fmt.Sprintf("%+v", r.Trigger.LogTriggerExtension))...) + } + resultBytes = append(resultBytes, checkResultDelimiter) + + resultBytes = append(resultBytes, r.WorkID[:]...) + resultBytes = append(resultBytes, checkResultDelimiter) + + resultBytes = append(resultBytes, big.NewInt(int64(r.GasAllocated)).Bytes()...) 
+ resultBytes = append(resultBytes, checkResultDelimiter) + + resultBytes = append(resultBytes, r.PerformData[:]...) + resultBytes = append(resultBytes, checkResultDelimiter) + + if r.FastGasWei != nil { + resultBytes = append(resultBytes, r.FastGasWei.Bytes()...) + } + resultBytes = append(resultBytes, checkResultDelimiter) + + if r.LinkNative != nil { + resultBytes = append(resultBytes, r.LinkNative.Bytes()...) + } + resultBytes = append(resultBytes, checkResultDelimiter) + + return fmt.Sprintf("%x", resultBytes) +} + +// NOTE: this function is used for debugging purposes only. +// for encoding check results, please use the Encoder interface +func (r CheckResult) String() string { + return fmt.Sprintf( + checkResultStringTemplate, r.PipelineExecutionState, r.Retryable, r.Eligible, + r.IneligibilityReason, r.UpkeepID, r.Trigger, r.WorkID, r.GasAllocated, + hex.EncodeToString(r.PerformData), r.FastGasWei, r.LinkNative, r.RetryInterval, + ) +} + +func (r CheckResult) MarshalJSON() ([]byte, error) { + crm := &checkResultMsg{ + PipelineExecutionState: r.PipelineExecutionState, + Retryable: r.Retryable, + Eligible: r.Eligible, + IneligibilityReason: r.IneligibilityReason, + UpkeepID: r.UpkeepID, + Trigger: r.Trigger, + WorkID: r.WorkID, + GasAllocated: r.GasAllocated, + PerformData: r.PerformData, + FastGasWei: r.FastGasWei, + LinkNative: r.LinkNative, + } + + return json.Marshal(crm) +} + +func (r *CheckResult) UnmarshalJSON(data []byte) error { + var crm checkResultMsg + + if err := json.Unmarshal(data, &crm); err != nil { + return err + } + + r.PipelineExecutionState = crm.PipelineExecutionState + r.Retryable = crm.Retryable + r.Eligible = crm.Eligible + r.IneligibilityReason = crm.IneligibilityReason + r.UpkeepID = crm.UpkeepID + r.Trigger = crm.Trigger + r.WorkID = crm.WorkID + r.GasAllocated = crm.GasAllocated + r.PerformData = crm.PerformData + r.FastGasWei = crm.FastGasWei + r.LinkNative = crm.LinkNative + + return nil +} + +// BlockHistory is a list of block 
keys +type BlockHistory []BlockKey + +func (bh BlockHistory) Latest() (BlockKey, error) { + if len(bh) == 0 { + return BlockKey{}, fmt.Errorf("empty block history") + } + + return bh[0], nil +} + +type UpkeepPayload struct { + // Upkeep is all the information that identifies the upkeep + UpkeepID UpkeepIdentifier + // Trigger is the event that triggered the upkeep to be checked + Trigger Trigger + // WorkID uniquely identifies the unit of work for the specified upkeep + WorkID string + // CheckData is the data used to check the upkeep + CheckData []byte +} + +// Determines whether the payload is empty, used within filtering +func (p UpkeepPayload) IsEmpty() bool { + return p.WorkID == "" +} + +// CoordinatedBlockProposal is used to represent a unit of work that can be performed +// after a check block has been coordinated between nodes. +// NOTE: This struct is sent on the p2p network as part of observations to get quorum +// Any change here should be backwards compatible and should keep validation and +// quorum requirements in mind. Please ensure to get a proper review along with an +// upgrade plan before changing this +// NOTE: Only the trigger.BlockHash and trigger.BlockNumber are coordinated across +// the network to get a quorum. WorkID is guaranteed to be correctly generated. +// Rest of the fields here SHOULD NOT BE TRUSTED as they can be manipulated by +// a single malicious node. +type CoordinatedBlockProposal struct { + // UpkeepID is the id of the proposed upkeep + UpkeepID UpkeepIdentifier + // Trigger represents the event that triggered the upkeep to be checked + Trigger Trigger + // WorkID represents the unit of work for the coordinated proposal + WorkID string +} + +// ReportedUpkeep contains details of an upkeep for which a report was generated. 
+type ReportedUpkeep struct { + // UpkeepID id of the underlying upkeep + UpkeepID UpkeepIdentifier + // Trigger data for the upkeep + Trigger Trigger + // WorkID represents the unit of work for the reported upkeep + WorkID string +} diff --git a/pkg/types/automation/basetypes_test.go b/pkg/types/automation/basetypes_test.go new file mode 100644 index 000000000..84a9cc90f --- /dev/null +++ b/pkg/types/automation/basetypes_test.go @@ -0,0 +1,588 @@ +package automation + +import ( + "encoding/hex" + "encoding/json" + "fmt" + "math/big" + "reflect" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestTriggerUnmarshal(t *testing.T) { + input := Trigger{ + BlockNumber: 5, + BlockHash: [32]byte{1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4}, + LogTriggerExtension: &LogTriggerExtension{ + TxHash: [32]byte{1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4}, + Index: 99, + }, + } + + encoded, _ := json.Marshal(input) + + rawJSON := `{"BlockNumber":5,"BlockHash":[1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,4],"LogTriggerExtension":{"TxHash":[1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,4],"Index":99,"BlockHash":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"BlockNumber":0}}` + + // the encoded value above should match the rawjson expected + assert.Equal(t, rawJSON, string(encoded), "encoded should match expected") + + // the plugin will decode and re-encode the trigger value at least once + // before some decoding might happen + var decodeOnce Trigger + _ = json.Unmarshal([]byte(rawJSON), &decodeOnce) + + encoded, _ = json.Marshal(decodeOnce) + + // used the re-encoded output to verify data integrity + var output Trigger + err := json.Unmarshal(encoded, &output) + + assert.NoError(t, err, "no error expected from decoding") + + expected := Trigger{ + BlockNumber: 5, + 
BlockHash: [32]byte{1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4}, + LogTriggerExtension: &LogTriggerExtension{ + TxHash: [32]byte{1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4}, + Index: 99, + }, + } + + assert.Equal(t, expected, output, "decoding should leave extension in its raw encoded state") +} + +func TestTriggerString(t *testing.T) { + input := Trigger{ + BlockNumber: 5, + BlockHash: [32]byte{1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4}, + LogTriggerExtension: &LogTriggerExtension{ + TxHash: [32]byte{1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4}, + Index: 99, + }, + } + + stringified := fmt.Sprintf("%v", input) + expected := ` + { + "BlockNumber":5, + "BlockHash":"0102030401020304010203040102030401020304010203040102030401020304", + "LogTriggerExtension": { + "BlockHash":"0000000000000000000000000000000000000000000000000000000000000000", + "BlockNumber":0, + "Index":99, + "TxHash":"0102030401020304010203040102030401020304010203040102030401020304" + } + }` + + assertJSONEqual(t, expected, stringified) + + input = Trigger{ + BlockNumber: 5, + BlockHash: [32]byte{1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4}, + } + + stringified = fmt.Sprintf("%v", input) + expected = `{"BlockNumber":5,"BlockHash":"0102030401020304010203040102030401020304010203040102030401020304"}` + + assertJSONEqual(t, expected, stringified) +} + +func TestLogIdentifier(t *testing.T) { + input := Trigger{ + BlockNumber: 5, + BlockHash: [32]byte{1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4}, + LogTriggerExtension: &LogTriggerExtension{ + TxHash: [32]byte{1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4}, + Index: 99, + BlockHash: [32]byte{1, 2, 3, 4, 1, 
2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4}, + }, + } + + logIdentifier := input.LogTriggerExtension.LogIdentifier() + assert.Equal(t, hex.EncodeToString(logIdentifier), "0102030401020304010203040102030401020304010203040102030401020304010203040102030401020304010203040102030401020304010203040102030400000063") +} + +func TestTriggerUnmarshal_EmptyExtension(t *testing.T) { + input := Trigger{ + BlockNumber: 5, + BlockHash: [32]byte{1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4}, + } + + encoded, _ := json.Marshal(input) + + rawJSON := `{"BlockNumber":5,"BlockHash":[1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,4],"LogTriggerExtension":null}` + + // the encoded value above should match the rawjson expected + assert.Equal(t, rawJSON, string(encoded), "encoded should match expected") + + // the plugin will decode and re-encode the trigger value at least once + // before some decoding might happen + var decodeOnce Trigger + _ = json.Unmarshal([]byte(rawJSON), &decodeOnce) + + encoded, _ = json.Marshal(decodeOnce) + + // used the re-encoded output to verify data integrity + var output Trigger + err := json.Unmarshal(encoded, &output) + + assert.NoError(t, err, "no error expected from decoding") + + expected := Trigger{ + BlockNumber: 5, + BlockHash: [32]byte{1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4}, + } + + assert.Equal(t, expected, output, "decoding should leave extension in its raw encoded state") +} + +func TestUpkeepIdentifier_BigInt(t *testing.T) { + tests := []struct { + name string + id *big.Int + want string + ignoreConvert bool + }{ + { + name: "log trigger from decimal", + id: func() *big.Int { + id, _ := big.NewInt(0).SetString("32329108151019397958065800113404894502874153543356521479058624064899121404671", 10) + return id + }(), + want: 
"32329108151019397958065800113404894502874153543356521479058624064899121404671", + }, + { + name: "condition trigger from hex", + id: func() *big.Int { + id, _ := big.NewInt(0).SetString("4779a07400000000000000000000000042d780684c0bbe59fab87e6ea7f3daff", 16) + return id + }(), + want: "32329108151019397958065800113404894502533871176435583015595249457467353193215", + }, + { + name: "0 upkeep ID", + id: big.NewInt(0), + want: "0", + }, + { + name: "random upkeep ID", + id: func() *big.Int { + id, _ := big.NewInt(0).SetString("32329108151019423423423", 10) + return id + }(), + want: "32329108151019423423423", + }, + { + name: "negative upkeep ID", + id: big.NewInt(-10), + want: "0", + ignoreConvert: true, + }, + { + name: "max upkeep ID (2^256-1)", + id: func() *big.Int { + id, _ := big.NewInt(0).SetString("115792089237316195423570985008687907853269984665640564039457584007913129639935", 10) + return id + }(), + want: "115792089237316195423570985008687907853269984665640564039457584007913129639935", + }, + { + name: "out of range upkeep ID (2^256)", + id: func() *big.Int { + id, _ := big.NewInt(0).SetString("115792089237316195423570985008687907853269984665640564039457584007913129639936", 10) + return id + }(), + want: "0", + ignoreConvert: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + uid := new(UpkeepIdentifier) + ok := uid.FromBigInt(tc.id) + assert.Equal(t, !tc.ignoreConvert, ok) + assert.Equal(t, tc.want, uid.String()) + if !tc.ignoreConvert { + assert.Equal(t, tc.id.String(), uid.BigInt().String()) + } + }) + } +} + +func TestCheckResultEncoding(t *testing.T) { + tests := []struct { + name string + input CheckResult + expected string + decoded CheckResult + }{ + { + name: "check result with retry interval", + input: CheckResult{ + PipelineExecutionState: 1, + Retryable: true, + Eligible: true, + IneligibilityReason: 10, + UpkeepID: UpkeepIdentifier{1, 2, 3, 4, 5, 6, 7, 8}, + Trigger: Trigger{ + BlockNumber: 5, + BlockHash: 
[32]byte{1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4}, + LogTriggerExtension: &LogTriggerExtension{ + TxHash: [32]byte{1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4}, + Index: 99, + }, + }, + WorkID: "work id", + GasAllocated: 1001, + PerformData: []byte{1, 2, 3, 4, 5, 6}, + FastGasWei: big.NewInt(12), + LinkNative: big.NewInt(13), + RetryInterval: 1, + }, + expected: `{"PipelineExecutionState":1,"Retryable":true,"Eligible":true,"IneligibilityReason":10,"UpkeepID":[1,2,3,4,5,6,7,8,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"Trigger":{"BlockNumber":5,"BlockHash":[1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,4],"LogTriggerExtension":{"TxHash":[1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,4,1,2,3,4],"Index":99,"BlockHash":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"BlockNumber":0}},"WorkID":"work id","GasAllocated":1001,"PerformData":"AQIDBAUG","FastGasWei":12,"LinkNative":13}`, + decoded: CheckResult{ + PipelineExecutionState: 1, + Retryable: true, + Eligible: true, + IneligibilityReason: 10, + UpkeepID: UpkeepIdentifier{1, 2, 3, 4, 5, 6, 7, 8}, + Trigger: Trigger{ + BlockNumber: 5, + BlockHash: [32]byte{1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4}, + LogTriggerExtension: &LogTriggerExtension{ + TxHash: [32]byte{1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4}, + Index: 99, + }, + }, + WorkID: "work id", + GasAllocated: 1001, + PerformData: []byte{1, 2, 3, 4, 5, 6}, + FastGasWei: big.NewInt(12), + LinkNative: big.NewInt(13), + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + encoded, err := json.Marshal(tc.input) + require.NoError(t, err) + assert.Equal(t, tc.expected, string(encoded)) + + var decoded CheckResult + err = json.Unmarshal(encoded, &decoded) + require.NoError(t, err) + 
assert.True(t, reflect.DeepEqual(tc.decoded, decoded)) + }) + } +} + +func TestCheckResultString(t *testing.T) { + input := CheckResult{ + PipelineExecutionState: 1, + Retryable: true, + Eligible: true, + IneligibilityReason: 10, + UpkeepID: UpkeepIdentifier{1, 2, 3, 4, 5, 6, 7, 8}, + Trigger: Trigger{ + BlockNumber: 5, + BlockHash: [32]byte{1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4}, + LogTriggerExtension: &LogTriggerExtension{ + TxHash: [32]byte{1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4}, + Index: 99, + }, + }, + WorkID: "work id", + GasAllocated: 1001, + PerformData: []byte{1, 2, 3, 4, 5, 6}, + FastGasWei: big.NewInt(12), + LinkNative: big.NewInt(13), + RetryInterval: 1, + } + + result := fmt.Sprintf("%v", input) + expected := ` + { + "PipelineExecutionState":1, + "Retryable":true, + "Eligible":true, + "IneligibilityReason":10, + "UpkeepID":455867356320691211288303676705517652851520854420902457558325773249309310976, + "Trigger": { + "BlockHash":"0102030401020304010203040102030401020304010203040102030401020304", + "BlockNumber":5, + "LogTriggerExtension": { + "BlockHash":"0000000000000000000000000000000000000000000000000000000000000000", + "BlockNumber":0, + "Index":99, + "TxHash":"0102030401020304010203040102030401020304010203040102030401020304" + } + }, + "WorkID":"work id", + "GasAllocated":1001, + "PerformData":"010203040506", + "FastGasWei":12, + "LinkNative":13, + "RetryInterval":1 + } + ` + assertJSONEqual(t, expected, result) + assertJSONContainsAllStructFields(t, result, input) +} + +func TestCheckResult_UniqueID(t *testing.T) { + for _, tc := range []struct { + name string + result CheckResult + wantID string + }{ + { + name: "empty check result", + result: CheckResult{ + PipelineExecutionState: 0, + Retryable: false, + Eligible: false, + IneligibilityReason: 0, + UpkeepID: UpkeepIdentifier{}, + Trigger: Trigger{}, + WorkID: "", + GasAllocated: 0, + 
PerformData: nil, + FastGasWei: nil, + LinkNative: nil, + }, + wantID: "000966616c73650966616c736509000900000000000000000000000000000000000000000000000000000000000000000900000000000000000000000000000000000000000000000000000000000000000909090909090909", + }, + { + name: "errored execution state", + result: CheckResult{ + PipelineExecutionState: 1, + Retryable: false, + Eligible: false, + IneligibilityReason: 0, + UpkeepID: UpkeepIdentifier{}, + Trigger: Trigger{}, + WorkID: "", + GasAllocated: 0, + PerformData: nil, + FastGasWei: nil, + LinkNative: nil, + }, + wantID: "010966616c73650966616c736509000900000000000000000000000000000000000000000000000000000000000000000900000000000000000000000000000000000000000000000000000000000000000909090909090909", + }, + { + name: "retryable errored execution state", + result: CheckResult{ + PipelineExecutionState: 2, + Retryable: true, + Eligible: false, + IneligibilityReason: 0, + UpkeepID: UpkeepIdentifier{}, + Trigger: Trigger{}, + WorkID: "", + GasAllocated: 0, + PerformData: nil, + FastGasWei: nil, + LinkNative: nil, + }, + wantID: "0209747275650966616c736509000900000000000000000000000000000000000000000000000000000000000000000900000000000000000000000000000000000000000000000000000000000000000909090909090909", + }, + { + name: "retryable eligible errored execution state", + result: CheckResult{ + PipelineExecutionState: 2, + Retryable: true, + Eligible: true, + IneligibilityReason: 0, + UpkeepID: UpkeepIdentifier{}, + Trigger: Trigger{}, + WorkID: "", + GasAllocated: 0, + PerformData: nil, + FastGasWei: nil, + LinkNative: nil, + }, + wantID: "020974727565097472756509000900000000000000000000000000000000000000000000000000000000000000000900000000000000000000000000000000000000000000000000000000000000000909090909090909", + }, + { + name: "retryable eligible errored execution state with non zero ineligibilty reason", + result: CheckResult{ + PipelineExecutionState: 2, + Retryable: true, + Eligible: true, + IneligibilityReason: 6, + 
UpkeepID: UpkeepIdentifier{}, + Trigger: Trigger{}, + WorkID: "", + GasAllocated: 0, + PerformData: nil, + FastGasWei: nil, + LinkNative: nil, + }, + wantID: "020974727565097472756509060900000000000000000000000000000000000000000000000000000000000000000900000000000000000000000000000000000000000000000000000000000000000909090909090909", + }, + { + name: "retryable eligible errored execution state with non zero ineligibilty reason and upkeep ID", + result: CheckResult{ + PipelineExecutionState: 2, + Retryable: true, + Eligible: true, + IneligibilityReason: 6, + UpkeepID: UpkeepIdentifier([32]byte{9, 9, 9, 9}), + Trigger: Trigger{}, + WorkID: "", + GasAllocated: 0, + PerformData: nil, + FastGasWei: nil, + LinkNative: nil, + }, + wantID: "020974727565097472756509060909090909000000000000000000000000000000000000000000000000000000000900000000000000000000000000000000000000000000000000000000000000000909090909090909", + }, + { + name: "retryable eligible errored execution state with non zero ineligibilty reason, upkeep ID, and trigger", + result: CheckResult{ + PipelineExecutionState: 2, + Retryable: true, + Eligible: true, + IneligibilityReason: 6, + UpkeepID: UpkeepIdentifier([32]byte{9, 9, 9, 9}), + Trigger: Trigger{ + BlockNumber: BlockNumber(44), + BlockHash: [32]byte{8, 8, 8, 8}, + LogTriggerExtension: &LogTriggerExtension{ + TxHash: [32]byte{7, 7, 7, 7}, + Index: 63, + BlockHash: [32]byte{6, 6, 6, 6}, + BlockNumber: BlockNumber(55), + }, + }, + WorkID: "", + GasAllocated: 0, + PerformData: nil, + FastGasWei: nil, + LinkNative: nil, + }, + wantID: 
"02097472756509747275650906090909090900000000000000000000000000000000000000000000000000000000090808080800000000000000000000000000000000000000000000000000000000092c097b22426c6f636b48617368223a2230363036303630363030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030222c22426c6f636b4e756d626572223a35352c22496e646578223a36332c22547848617368223a2230373037303730373030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030227d090909090909", + }, + { + name: "retryable eligible errored execution state with non zero ineligibilty reason, upkeep ID, trigger, and workID", + result: CheckResult{ + PipelineExecutionState: 2, + Retryable: true, + Eligible: true, + IneligibilityReason: 6, + UpkeepID: UpkeepIdentifier([32]byte{9, 9, 9, 9}), + Trigger: Trigger{ + BlockNumber: BlockNumber(44), + BlockHash: [32]byte{8, 8, 8, 8}, + LogTriggerExtension: &LogTriggerExtension{ + TxHash: [32]byte{7, 7, 7, 7}, + Index: 63, + BlockHash: [32]byte{6, 6, 6, 6}, + BlockNumber: BlockNumber(55), + }, + }, + WorkID: "abcdef", + GasAllocated: 0, + PerformData: nil, + FastGasWei: nil, + LinkNative: nil, + }, + wantID: "02097472756509747275650906090909090900000000000000000000000000000000000000000000000000000000090808080800000000000000000000000000000000000000000000000000000000092c097b22426c6f636b48617368223a2230363036303630363030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030222c22426c6f636b4e756d626572223a35352c22496e646578223a36332c22547848617368223a2230373037303730373030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030227d096162636465660909090909", + }, + { + name: "retryable eligible errored execution state with non zero ineligibilty reason, upkeep ID, trigger, and workID", + result: CheckResult{ + PipelineExecutionState: 2, + Retryable: true, + Eligible: true, + 
IneligibilityReason: 6, + UpkeepID: UpkeepIdentifier([32]byte{9, 9, 9, 9}), + Trigger: Trigger{ + BlockNumber: BlockNumber(44), + BlockHash: [32]byte{8, 8, 8, 8}, + LogTriggerExtension: &LogTriggerExtension{ + TxHash: [32]byte{7, 7, 7, 7}, + Index: 63, + BlockHash: [32]byte{6, 6, 6, 6}, + BlockNumber: BlockNumber(55), + }, + }, + WorkID: "abcdef", + GasAllocated: 543, + PerformData: []byte("xyz"), + FastGasWei: nil, + LinkNative: nil, + }, + wantID: "02097472756509747275650906090909090900000000000000000000000000000000000000000000000000000000090808080800000000000000000000000000000000000000000000000000000000092c097b22426c6f636b48617368223a2230363036303630363030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030222c22426c6f636b4e756d626572223a35352c22496e646578223a36332c22547848617368223a2230373037303730373030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030227d0961626364656609021f0978797a090909", + }, + { + name: "all fields", + result: CheckResult{ + PipelineExecutionState: 2, + Retryable: true, + Eligible: true, + IneligibilityReason: 6, + UpkeepID: UpkeepIdentifier([32]byte{9, 9, 9, 9}), + Trigger: Trigger{ + BlockNumber: BlockNumber(44), + BlockHash: [32]byte{8, 8, 8, 8}, + LogTriggerExtension: &LogTriggerExtension{ + TxHash: [32]byte{7, 7, 7, 7}, + Index: 63, + BlockHash: [32]byte{6, 6, 6, 6}, + BlockNumber: BlockNumber(55), + }, + }, + WorkID: "abcdef", + GasAllocated: 543, + PerformData: []byte("xyz"), + FastGasWei: big.NewInt(456), + LinkNative: big.NewInt(789), + }, + wantID: 
+			"02097472756509747275650906090909090900000000000000000000000000000000000000000000000000000000090808080800000000000000000000000000000000000000000000000000000000092c097b22426c6f636b48617368223a2230363036303630363030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030222c22426c6f636b4e756d626572223a35352c22496e646578223a36332c22547848617368223a2230373037303730373030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030303030227d0961626364656609021f0978797a0901c809031509",
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			id := tc.result.UniqueID()
+			assert.Equal(t, tc.wantID, id)
+		})
+	}
+}
+
+func assertJSONEqual(t *testing.T, expected, actual string) {
+	var expectedMap, actualMap map[string]interface{}
+	require.NoError(t, json.Unmarshal([]byte(expected), &expectedMap), "expected is invalid json")
+	require.NoError(t, json.Unmarshal([]byte(actual), &actualMap), "actual is invalid json")
+	assert.True(t, reflect.DeepEqual(expectedMap, actualMap), "expected and result json strings do not match")
+}
+
+func assertJSONContainsAllStructFields(t *testing.T, jsonString string, anyStruct interface{}) {
+	// if fields are added to the struct in the future, but omitted from the "pretty" string template, this test will fail
+	var jsonMap map[string]interface{}
+	var structMap map[string]interface{}
+	require.NoError(t, json.Unmarshal([]byte(jsonString), &jsonMap), "jsonString is invalid json")
+	structJson, err := json.Marshal(anyStruct)
+	require.NoError(t, err)
+	require.NoError(t, json.Unmarshal(structJson, &structMap))
+	assertCongruentKeyStructure(t, structMap, jsonMap)
+}
+
+func assertCongruentKeyStructure(t *testing.T, structMap, jsonMap map[string]interface{}) {
+	// this functions asserts that the two inputs have congruent key shapes, while disregarding
+	// the values
+	for k := range structMap {
+		assert.True(t, jsonMap[k] != nil, "json string does not contain field %s", k)
+		if nested1, ok := structMap[k].(map[string]interface{}); ok {
+			if nested2, ok := jsonMap[k].(map[string]interface{}); ok {
+				assertCongruentKeyStructure(t, nested1, nested2)
+			} else {
+				assert.Fail(t, "maps do not contain the same type for key %s", k)
+			}
+		}
+	}
+}
diff --git a/pkg/types/automation/interfaces.go b/pkg/types/automation/interfaces.go
new file mode 100644
index 000000000..a42913ef7
--- /dev/null
+++ b/pkg/types/automation/interfaces.go
@@ -0,0 +1,79 @@
+package automation
+
+import (
+	"context"
+	"io"
+
+	"github.com/smartcontractkit/chainlink-common/pkg/services"
+)
+
+// UpkeepStateStore is the interface for managing upkeeps final state in a local store.
+type UpkeepStateStore interface {
+	UpkeepStateUpdater
+	UpkeepStateReader
+	Start(context.Context) error
+	io.Closer
+}
+
+type Registry interface {
+	CheckUpkeeps(ctx context.Context, keys ...UpkeepPayload) ([]CheckResult, error)
+	Name() string
+	Start(ctx context.Context) error
+	Close() error
+	HealthReport() map[string]error
+}
+
+type EventProvider interface {
+	services.Service
+	GetLatestEvents(ctx context.Context) ([]TransmitEvent, error)
+}
+
+type LogRecoverer interface {
+	RecoverableProvider
+	GetProposalData(context.Context, CoordinatedBlockProposal) ([]byte, error)
+
+	Start(context.Context) error
+	io.Closer
+}
+
+// UpkeepStateReader is the interface for reading the current state of upkeeps.
+type UpkeepStateReader interface {
+	SelectByWorkIDs(ctx context.Context, workIDs ...string) ([]UpkeepState, error)
+}
+
+type Encoder interface {
+	Encode(...CheckResult) ([]byte, error)
+	Extract([]byte) ([]ReportedUpkeep, error)
+}
+
+type LogEventProvider interface {
+	GetLatestPayloads(context.Context) ([]UpkeepPayload, error)
+	Start(context.Context) error
+	Close() error
+}
+
+type RecoverableProvider interface {
+	GetRecoveryProposals(context.Context) ([]UpkeepPayload, error)
+}
+
+type ConditionalUpkeepProvider interface {
+	GetActiveUpkeeps(context.Context) ([]UpkeepPayload, error)
+}
+
+type PayloadBuilder interface {
+	// Can get payloads for a subset of proposals along with an error
+	BuildPayloads(context.Context, ...CoordinatedBlockProposal) ([]UpkeepPayload, error)
+}
+
+type BlockSubscriber interface {
+	// Subscribe provides an identifier integer, a new channel, and potentially an error
+	Subscribe() (int, chan BlockHistory, error)
+	// Unsubscribe requires an identifier integer and indicates the provided channel should be closed
+	Unsubscribe(int) error
+	Start(context.Context) error
+	Close() error
+}
+
+type UpkeepStateUpdater interface {
+	SetUpkeepState(context.Context, CheckResult, UpkeepState) error
+}
diff --git a/pkg/types/automation/trigger.go b/pkg/types/automation/trigger.go
new file mode 100644
index 000000000..a600ecbfe
--- /dev/null
+++ b/pkg/types/automation/trigger.go
@@ -0,0 +1,87 @@
+package automation
+
+import (
+	"bytes"
+	"encoding/binary"
+	"encoding/hex"
+	"fmt"
+)
+
+// Trigger represents a trigger for an upkeep.
+// It contains an extension per trigger type, and the block number + hash
+// in which the trigger was checked.
+// NOTE: This struct is sent on the p2p network as part of observations to get quorum
+// Any change here should be backwards compatible and should keep validation and
+// quorum requirements in mind. Please ensure to get a proper review along with an
+// upgrade plan before changing this
+type Trigger struct {
+	// BlockNumber is the block number in which the trigger was checked
+	BlockNumber BlockNumber
+	// BlockHash is the block hash in which the trigger was checked
+	BlockHash [32]byte
+	// LogTriggerExtension is the extension for log triggers
+	LogTriggerExtension *LogTriggerExtension
+}
+
+func (r Trigger) String() string {
+	res := fmt.Sprintf(`{"BlockNumber":%d,"BlockHash":"%s"`, r.BlockNumber, hex.EncodeToString(r.BlockHash[:]))
+	if r.LogTriggerExtension != nil {
+		res += fmt.Sprintf(`,"LogTriggerExtension":%s`, r.LogTriggerExtension)
+	}
+	res += "}"
+	return res
+}
+
+// NewTrigger returns a new basic trigger w/o extension
+func NewTrigger(blockNumber BlockNumber, blockHash [32]byte) Trigger {
+	return Trigger{
+		BlockNumber: blockNumber,
+		BlockHash:   blockHash,
+	}
+}
+
+func NewLogTrigger(blockNumber BlockNumber, blockHash [32]byte, logTriggerExtension *LogTriggerExtension) Trigger {
+	return Trigger{
+		BlockNumber:         blockNumber,
+		BlockHash:           blockHash,
+		LogTriggerExtension: logTriggerExtension,
+	}
+}
+
+// LogTriggerExtension is the extension used for log triggers,
+// It contains information of the log event that was triggered.
+// NOTE: This struct is sent on the p2p network as part of observations to get quorum
+// Any change here should be backwards compatible and should keep validation and
+// quorum requirements in mind. Please ensure to get a proper review along with an
+// upgrade plan before changing this
+type LogTriggerExtension struct {
+	// LogTxHash is the transaction hash of the log event
+	TxHash [32]byte
+	// Index is the index of the log event in the transaction
+	Index uint32
+	// BlockHash is the block hash in which the event occurred
+	BlockHash [32]byte
+	// BlockNumber is the block number in which the event occurred
+	// NOTE: This field might be empty. If relying on this field check
+	// it is non empty, if it's empty derive from BlockHash
+	BlockNumber BlockNumber
+}
+
+// LogIdentifier returns a unique identifier for the log event,
+// composed of the transaction hash and the log index bytes.
+func (e LogTriggerExtension) LogIdentifier() []byte {
+	indexBytes := make([]byte, 4) // uint32 is 4 bytes
+	binary.BigEndian.PutUint32(indexBytes, e.Index)
+	return bytes.Join([][]byte{
+		e.BlockHash[:],
+		e.TxHash[:],
+		indexBytes,
+	}, []byte{})
+}
+
+func (e LogTriggerExtension) String() string {
+	return fmt.Sprintf(
+		`{"BlockHash":"%s","BlockNumber":%d,"Index":%d,"TxHash":"%s"}`,
+		hex.EncodeToString(e.BlockHash[:]), e.BlockNumber, e.Index, hex.EncodeToString(e.TxHash[:]),
+	)
+}
diff --git a/pkg/types/provider_automation.go b/pkg/types/provider_automation.go
index f3ea3407c..ee4f217f2 100644
--- a/pkg/types/provider_automation.go
+++ b/pkg/types/provider_automation.go
@@ -1,6 +1,17 @@
 package types
 
+import "github.com/smartcontractkit/chainlink-common/pkg/types/automation"
+
 // AutomationProvider provides components needed for the automation OCR2 plugin.
 type AutomationProvider interface {
 	PluginProvider
+	Registry() automation.Registry
+	Encoder() automation.Encoder
+	TransmitEventProvider() automation.EventProvider
+	BlockSubscriber() automation.BlockSubscriber
+	PayloadBuilder() automation.PayloadBuilder
+	UpkeepStateStore() automation.UpkeepStateStore
+	LogEventProvider() automation.LogEventProvider
+	LogRecoverer() automation.LogRecoverer
+	UpkeepProvider() automation.ConditionalUpkeepProvider
 }
diff --git a/pkg/types/relayer.go b/pkg/types/relayer.go
index 5ad2d4568..9b7873863 100644
--- a/pkg/types/relayer.go
+++ b/pkg/types/relayer.go
@@ -16,12 +16,20 @@ type PluginArgs struct {
 }
 
 type RelayArgs struct {
-	ExternalJobID uuid.UUID
-	JobID         int32
-	ContractID    string
-	New           bool // Whether this is a first time job add.
-	RelayConfig   []byte
-	ProviderType  string
+	ExternalJobID      uuid.UUID
+	JobID              int32
+	ContractID         string
+	New                bool // Whether this is a first time job add.
+	RelayConfig        []byte
+	ProviderType       string
+	MercuryCredentials *MercuryCredentials
+}
+
+type MercuryCredentials struct {
+	LegacyURL string
+	URL       string
+	Username  string
+	Password  string
 }
 
 type ChainStatus struct {