Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream fallback: error handler #12061

Closed
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
75c8094
Stop retry flow for error handling (log trigger) (#12026)
amirylm Feb 15, 2024
8ae5f77
add checkErrorHandler (#12037)
shileiwill Feb 15, 2024
30dc562
Stop retry flow for error handling (conditional trigger) (#12032)
amirylm Feb 16, 2024
ed7baf4
refactor streams error handler
infiloop2 Feb 19, 2024
fa054c1
fix mercury v0.2 request handling
infiloop2 Feb 19, 2024
1219a12
fix mercury 0.3 DoRequest
infiloop2 Feb 19, 2024
edd39eb
fix tests
infiloop2 Feb 19, 2024
9524f08
connect check error callback
infiloop2 Feb 19, 2024
02774b8
add todo
infiloop2 Feb 19, 2024
8e7e3fb
add todo
infiloop2 Feb 19, 2024
496e0e9
improve comments
infiloop2 Feb 19, 2024
e97962e
polish 0.2
infiloop2 Feb 19, 2024
566d1c4
fix debug.go, refactor eth_call on checkCallback and checkErrorHandle…
shileiwill Feb 20, 2024
cc46df7
small fixes
infiloop2 Feb 20, 2024
783ed9c
calculate retry config for conditionals
infiloop2 Feb 20, 2024
b660a70
rename to clarify function
infiloop2 Feb 20, 2024
0336020
cleanup pipeline execution errors
infiloop2 Feb 20, 2024
ceec78a
Merge pull request #12088 from smartcontractkit/auto-9006-call-error-…
infiloop2 Feb 20, 2024
d1e15f3
fix unit tests for v02_request.go
shileiwill Feb 21, 2024
6998268
Fix bug in 0.3 request error code
infiloop2 Feb 21, 2024
4969a0a
add state assertion to single feed request tests
infiloop2 Feb 21, 2024
9cdc759
add test case for invalid response bytes
infiloop2 Feb 21, 2024
b67d9e2
add tests for more retryable errors
infiloop2 Feb 21, 2024
1975a0d
add tests for retryable -> error conversion
infiloop2 Feb 21, 2024
d1c9d31
polish 0.2 combining multiple feeds
infiloop2 Feb 21, 2024
d0c6612
add more tests for different combination of feed responses
infiloop2 Feb 21, 2024
40d83f7
remove unused fields
shileiwill Feb 21, 2024
91718a9
unit tests for v03
shileiwill Feb 22, 2024
d4aad99
Merge branch 'auto-9004-stream-err-handler' into unittests
infiloop2 Feb 22, 2024
b4d26aa
fix mercury_test.go
infiloop2 Feb 22, 2024
2e473a5
minor polishing
infiloop2 Feb 22, 2024
a53eaff
Merge pull request #12116 from smartcontractkit/unittests
infiloop2 Feb 22, 2024
dfcb2a4
thread control: added function that accept a context
amirylm Feb 19, 2024
addf869
use threadCtrl.GoCtx to ensure the timeout is applied with the provid…
amirylm Feb 19, 2024
84f67cf
added timeout for mercury requests (including retries)
amirylm Feb 21, 2024
81b8139
fix lint
amirylm Feb 22, 2024
9e8719f
set timeout to 10s
amirylm Feb 22, 2024
81775b3
use GoCtx within request clients
amirylm Feb 22, 2024
1cd2a97
lint
amirylm Feb 22, 2024
fcc1fa2
add more 0.3 tests
infiloop2 Feb 22, 2024
528efd0
set err code to timeout if ctx is done
amirylm Feb 22, 2024
bad4376
Merge pull request #12089 from smartcontractkit/AUTO-9012-investigate…
infiloop2 Feb 22, 2024
4540b21
Finalize stream error codes, polish requests to return consistent nil…
infiloop2 Feb 22, 2024
bb9295a
add tests for checkErrorHanlder
shileiwill Feb 22, 2024
eb3ae85
Merge pull request #12146 from smartcontractkit/AUTO-9078
infiloop2 Feb 23, 2024
fae562a
treat timeout as non retryable error codes
infiloop2 Feb 23, 2024
3100273
allow empty feed request which returns error code
infiloop2 Feb 23, 2024
d8007e4
update test contract with a flag for checkErr result
amirylm Feb 20, 2024
46b9ec5
generate wrappers for LogTriggeredStreamsLookup
shileiwill Feb 20, 2024
777e5aa
handling empty performData case for err handler
amirylm Feb 21, 2024
22b2dcf
test (wip)
amirylm Feb 21, 2024
c8aa142
fixing contract
amirylm Feb 21, 2024
6b04b6c
waiting for err handler logs (wip)
amirylm Feb 21, 2024
f8f7db9
update contract and generate wrappers
shileiwill Feb 21, 2024
9c22402
lint
amirylm Feb 22, 2024
051e0ff
use startBlock instead of 1
amirylm Feb 22, 2024
5b18f3b
add missing arg
amirylm Feb 22, 2024
0bcbe19
check multiple responses:
amirylm Feb 22, 2024
360a14b
fix test
amirylm Feb 25, 2024
9ee1568
cleanup
amirylm Feb 25, 2024
5abd373
Merge pull request #12100 from smartcontractkit/AUTO-9007-error-handl…
infiloop2 Feb 26, 2024
adeaf70
set timeout in http client
infiloop2 Feb 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions contracts/src/v0.8/automation/dev/MercuryRegistry.sol
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,14 @@ contract MercuryRegistry is ConfirmedOwner, AutomationCompatibleInterface, Strea
return (filteredValues.length > 0, performData);
}

function checkErrorHandler(
uint256 errCode,
bytes memory extraData
) external view override returns (bool upkeepNeeded, bytes memory performData) {
// dummy function with default values
return (false, new bytes(0));
}

// Use deviated off-chain values to update on-chain state.
function performUpkeep(bytes calldata performData) external override {
(bytes[] memory values /* bytes memory lookupData */, ) = abi.decode(performData, (bytes[], bytes));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ contract MercuryRegistryBatchUpkeep is ConfirmedOwner, AutomationCompatibleInter
return i_registry.checkCallback(values, lookupData);
}

function checkErrorHandler(
uint256 errCode,
bytes memory extraData
) external view override returns (bool upkeepNeeded, bytes memory performData) {
// dummy function with default values
return (false, new bytes(0));
}

// Use the master registry to update state.
function performUpkeep(bytes calldata performData) external override {
i_registry.performUpkeep(performData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,17 @@ interface StreamsLookupCompatibleInterface {
bytes[] memory values,
bytes memory extraData
) external view returns (bool upkeepNeeded, bytes memory performData);

/**
* @notice this is a new, optional function in streams lookup. It is meant to surface streams lookup errors.
* @param errCode an uint value that represents the streams lookup error code.
* @param extraData context data from streams lookup process.
* @return upkeepNeeded boolean to indicate whether the keeper should call performUpkeep or not.
* @return performData bytes that the keeper should call performUpkeep with, if
* upkeep is needed. If you would like to encode data to decode later, try `abi.encode`.
*/
function checkErrorHandler(
uint256 errCode,
bytes memory extraData
) external view returns (bool upkeepNeeded, bytes memory performData);
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,14 @@ contract LogTriggeredStreamsLookup is ILogAutomation, StreamsLookupCompatibleInt
return (true, performData);
}

function checkErrorHandler(
uint256 errCode,
bytes memory extraData
) external view override returns (bool upkeepNeeded, bytes memory performData) {
// dummy function with default values
return (false, new bytes(0));
}

function getBlockNumber() internal view returns (uint256) {
if (useArbitrumBlockNum) {
return ARB_SYS.arbBlockNumber();
Expand Down
8 changes: 8 additions & 0 deletions contracts/src/v0.8/tests/StreamsLookupUpkeep.sol
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,14 @@ contract StreamsLookupUpkeep is AutomationCompatibleInterface, StreamsLookupComp
return (callbackReturnBool, performData);
}

function checkErrorHandler(
uint256 errCode,
bytes memory extraData
) external view override returns (bool upkeepNeeded, bytes memory performData) {
// dummy function with default values
return (false, new bytes(0));
}

function checkUpkeep(bytes calldata data) external view returns (bool, bytes memory) {
if (!eligible()) {
return (false, data);
Expand Down
8 changes: 8 additions & 0 deletions contracts/src/v0.8/tests/VerifiableLoadLogTriggerUpkeep.sol
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,12 @@ contract VerifiableLoadLogTriggerUpkeep is VerifiableLoadBase, StreamsLookupComp
bytes memory performData = abi.encode(values, extraData);
return (true, performData);
}

function checkErrorHandler(
uint256 errCode,
bytes memory extraData
) external view override returns (bool upkeepNeeded, bytes memory performData) {
// dummy function with default values
return (false, new bytes(0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ contract VerifiableLoadStreamsLookupUpkeep is VerifiableLoadBase, StreamsLookupC
return (true, performData);
}

function checkErrorHandler(
uint256 errCode,
bytes memory extraData
) external view override returns (bool upkeepNeeded, bytes memory performData) {
// dummy function with default values
return (false, new bytes(0));
}

function checkUpkeep(bytes calldata checkData) external returns (bool, bytes memory) {
uint256 startGas = gasleft();
uint256 upkeepId = abi.decode(checkData, (uint256));
Expand Down

Large diffs are not rendered by default.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ keeper_registry_wrapper_2_1: ../../contracts/solc/v0.8.16/KeeperRegistry2_1/Keep
keeper_registry_wrapper_2_2: ../../contracts/solc/v0.8.16/AutomationRegistry2_2/AutomationRegistry2_2.abi ../../contracts/solc/v0.8.16/AutomationRegistry2_2/AutomationRegistry2_2.bin 8f1c6d3eddf36201a556a101350abd6eb733d4c3177da2ddb5eca0d24d9a88ef
keepers_vrf_consumer: ../../contracts/solc/v0.8.6/KeepersVRFConsumer/KeepersVRFConsumer.abi ../../contracts/solc/v0.8.6/KeepersVRFConsumer/KeepersVRFConsumer.bin fa75572e689c9e84705c63e8dbe1b7b8aa1a8fe82d66356c4873d024bb9166e8
log_emitter: ../../contracts/solc/v0.8.19/LogEmitter/LogEmitter.abi ../../contracts/solc/v0.8.19/LogEmitter/LogEmitter.bin 4b129ab93432c95ff9143f0631323e189887668889e0b36ccccf18a571e41ccf
log_triggered_streams_lookup_wrapper: ../../contracts/solc/v0.8.16/LogTriggeredStreamsLookup/LogTriggeredStreamsLookup.abi ../../contracts/solc/v0.8.16/LogTriggeredStreamsLookup/LogTriggeredStreamsLookup.bin f8da43a927c1a66238a9f4fd5d5dd7e280e361daa0444da1f7f79498ace901e1
log_triggered_streams_lookup_wrapper: ../../contracts/solc/v0.8.16/LogTriggeredStreamsLookup/LogTriggeredStreamsLookup.abi ../../contracts/solc/v0.8.16/LogTriggeredStreamsLookup/LogTriggeredStreamsLookup.bin bb5f9f72c698fb2a79667cbf9206fa3c848aa4cb4866c5fa8a541627f0af48ec
log_upkeep_counter_wrapper: ../../contracts/solc/v0.8.6/LogUpkeepCounter/LogUpkeepCounter.abi ../../contracts/solc/v0.8.6/LogUpkeepCounter/LogUpkeepCounter.bin 42426bbb83f96dfbe55fc576d6c65020eaeed690e2289cf99b0c4aa810a5f4ec
mock_aggregator_proxy: ../../contracts/solc/v0.8.6/MockAggregatorProxy/MockAggregatorProxy.abi ../../contracts/solc/v0.8.6/MockAggregatorProxy/MockAggregatorProxy.bin b16c108f3dd384c342ddff5e94da7c0a8d39d1be5e3d8f2cf61ecc7f0e50ff42
mock_ethlink_aggregator_wrapper: ../../contracts/solc/v0.6/MockETHLINKAggregator/MockETHLINKAggregator.abi ../../contracts/solc/v0.6/MockETHLINKAggregator/MockETHLINKAggregator.bin 1c52c24f797b8482aa12b8251dcea1c072827bd5b3426b822621261944b99ca0
Expand All @@ -66,16 +66,16 @@ solidity_vrf_request_id_v08: ../../contracts/solc/v0.8.6/VRFRequestIDBaseTestHel
solidity_vrf_v08_verifier_wrapper: ../../contracts/solc/v0.8.6/VRFTestHelper/VRFTestHelper.abi ../../contracts/solc/v0.8.6/VRFTestHelper/VRFTestHelper.bin f37f8b21a81c113085c6137835a2246db6ebda07da455c4f2b5c7ec60c725c3b
solidity_vrf_verifier_wrapper: ../../contracts/solc/v0.6/VRFTestHelper/VRFTestHelper.abi ../../contracts/solc/v0.6/VRFTestHelper/VRFTestHelper.bin 44c2b67d8d2990ab580453deb29d63508c6147a3dc49908a1db563bef06e6474
solidity_vrf_wrapper: ../../contracts/solc/v0.6/VRF/VRF.abi ../../contracts/solc/v0.6/VRF/VRF.bin 04ede5b83c06ba5b76ef99c081c72928007d8a7aaefcf21449a46a07cbd4bfc2
streams_lookup_compatible_interface: ../../contracts/solc/v0.8.16/StreamsLookupCompatibleInterface/StreamsLookupCompatibleInterface.abi ../../contracts/solc/v0.8.16/StreamsLookupCompatibleInterface/StreamsLookupCompatibleInterface.bin feb92cc666df21ea04ab9d7a588a513847b01b2f66fc167d06ab28ef2b17e015
streams_lookup_upkeep_wrapper: ../../contracts/solc/v0.8.16/StreamsLookupUpkeep/StreamsLookupUpkeep.abi ../../contracts/solc/v0.8.16/StreamsLookupUpkeep/StreamsLookupUpkeep.bin b1a598963cacac51ed4706538d0f142bdc0d94b9a4b13e2d402131cdf05c9bcf
streams_lookup_compatible_interface: ../../contracts/solc/v0.8.16/StreamsLookupCompatibleInterface/StreamsLookupCompatibleInterface.abi ../../contracts/solc/v0.8.16/StreamsLookupCompatibleInterface/StreamsLookupCompatibleInterface.bin 2861f553fb4731e89126b13319462df674727005a51982d1e617e2c2e44fa422
streams_lookup_upkeep_wrapper: ../../contracts/solc/v0.8.16/StreamsLookupUpkeep/StreamsLookupUpkeep.abi ../../contracts/solc/v0.8.16/StreamsLookupUpkeep/StreamsLookupUpkeep.bin 37e3a61091cc2a156539dd4aaff987e07577118aa02e97931a647df55705465e
test_api_consumer_wrapper: ../../contracts/solc/v0.6/TestAPIConsumer/TestAPIConsumer.abi ../../contracts/solc/v0.6/TestAPIConsumer/TestAPIConsumer.bin ed10893cb18894c18e275302329c955f14ea2de37ee044f84aa1e067ac5ea71e
trusted_blockhash_store: ../../contracts/solc/v0.8.6/TrustedBlockhashStore/TrustedBlockhashStore.abi ../../contracts/solc/v0.8.6/TrustedBlockhashStore/TrustedBlockhashStore.bin 98cb0dc06c15af5dcd3b53bdfc98e7ed2489edc96a42203294ac2fc0efdda02b
type_and_version_interface_wrapper: ../../contracts/solc/v0.8.6/KeeperRegistry1_2/TypeAndVersionInterface.abi ../../contracts/solc/v0.8.6/KeeperRegistry1_2/TypeAndVersionInterface.bin bc9c3a6e73e3ebd5b58754df0deeb3b33f4bb404d5709bb904aed51d32f4b45e
upkeep_counter_wrapper: ../../contracts/solc/v0.8.16/UpkeepCounter/UpkeepCounter.abi ../../contracts/solc/v0.8.16/UpkeepCounter/UpkeepCounter.bin 77f000229a501f638dd2dc439859257f632894c728b31e68aea4f6d6c52f1b71
upkeep_perform_counter_restrictive_wrapper: ../../contracts/solc/v0.8.16/UpkeepPerformCounterRestrictive/UpkeepPerformCounterRestrictive.abi ../../contracts/solc/v0.8.16/UpkeepPerformCounterRestrictive/UpkeepPerformCounterRestrictive.bin 20955b21acceb58355fa287b29194a73edf5937067ba7140667301017cb2b24c
upkeep_transcoder: ../../contracts/solc/v0.8.6/UpkeepTranscoder/UpkeepTranscoder.abi ../../contracts/solc/v0.8.6/UpkeepTranscoder/UpkeepTranscoder.bin 336c92a981597be26508455f81a908a0784a817b129a59686c5b2c4afcba730a
verifiable_load_log_trigger_upkeep_wrapper: ../../contracts/solc/v0.8.16/VerifiableLoadLogTriggerUpkeep/VerifiableLoadLogTriggerUpkeep.abi ../../contracts/solc/v0.8.16/VerifiableLoadLogTriggerUpkeep/VerifiableLoadLogTriggerUpkeep.bin fb674ba44c0e8f3b385cd10b2f7dea5cd07b5f38df08066747e8b1542e152557
verifiable_load_streams_lookup_upkeep_wrapper: ../../contracts/solc/v0.8.16/VerifiableLoadStreamsLookupUpkeep/VerifiableLoadStreamsLookupUpkeep.abi ../../contracts/solc/v0.8.16/VerifiableLoadStreamsLookupUpkeep/VerifiableLoadStreamsLookupUpkeep.bin 785f68c44bfff070505eaa65e38a1af94046e5f9afc1189bcf2c8cfcd1102d66
verifiable_load_log_trigger_upkeep_wrapper: ../../contracts/solc/v0.8.16/VerifiableLoadLogTriggerUpkeep/VerifiableLoadLogTriggerUpkeep.abi ../../contracts/solc/v0.8.16/VerifiableLoadLogTriggerUpkeep/VerifiableLoadLogTriggerUpkeep.bin ee5c608e4e84c80934e42b0c02a49624840adf10b50c91f688bf8f0c7c6994c2
verifiable_load_streams_lookup_upkeep_wrapper: ../../contracts/solc/v0.8.16/VerifiableLoadStreamsLookupUpkeep/VerifiableLoadStreamsLookupUpkeep.abi ../../contracts/solc/v0.8.16/VerifiableLoadStreamsLookupUpkeep/VerifiableLoadStreamsLookupUpkeep.bin 58f1f6b31a313e04ceb3e0e0f0393bc195cc2f4784a3b0e14a80a86fc836f427
verifiable_load_upkeep_wrapper: ../../contracts/solc/v0.8.16/VerifiableLoadUpkeep/VerifiableLoadUpkeep.abi ../../contracts/solc/v0.8.16/VerifiableLoadUpkeep/VerifiableLoadUpkeep.bin a3e02c43756ea91e7ce4b81e48c11648f1d12f6663c236780147e41dfa36ebee
vrf_consumer_v2: ../../contracts/solc/v0.8.6/VRFConsumerV2/VRFConsumerV2.abi ../../contracts/solc/v0.8.6/VRFConsumerV2/VRFConsumerV2.bin 9ef258bf8e9f8d880fd229ceb145593d91e24fc89366baa0bf19169c5787d15f
vrf_consumer_v2_plus_upgradeable_example: ../../contracts/solc/v0.8.6/VRFConsumerV2PlusUpgradeableExample/VRFConsumerV2PlusUpgradeableExample.abi ../../contracts/solc/v0.8.6/VRFConsumerV2PlusUpgradeableExample/VRFConsumerV2PlusUpgradeableExample.bin 3155c611e4d6882e9324b6e975033b31356776ea8b031ca63d63da37589d583b
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package encoding

import (
"net/http"

ocr2keepers "github.com/smartcontractkit/chainlink-common/pkg/types/automation"

"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/automation_utils_2_1"
Expand Down Expand Up @@ -43,6 +45,36 @@ const (
UpkeepNotAuthorized PipelineExecutionState = 9
)

// ErrCode is used for invoking an error handler with a specific error code.
type ErrCode uint32

const (
ErrCodeNil ErrCode = 0
ErrCodePartialContent ErrCode = 808206
ErrCodeDataStreamsError ErrCode = 808500
ErrCodeBadRequest ErrCode = 808400
ErrCodeUnauthorized ErrCode = 808401
ErrCodeEncodingError ErrCode = 808600
ErrCodeStreamLookupTimeout ErrCode = 808603
infiloop2 marked this conversation as resolved.
Show resolved Hide resolved
)

func HttpToErrCode(statusCode int) ErrCode {
switch statusCode {
case http.StatusOK:
return ErrCodeNil
case http.StatusPartialContent:
return ErrCodePartialContent
case http.StatusBadRequest:
return ErrCodeBadRequest
case http.StatusUnauthorized:
return ErrCodeUnauthorized
case http.StatusInternalServerError, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout:
return ErrCodeDataStreamsError
default:
return 0
}
}

type UpkeepInfo = iregistry21.KeeperRegistryBase21UpkeepInfo

type Packer interface {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ const (
BlockNumber = "blockNumber" // valid for v0.2
Timestamp = "timestamp" // valid for v0.3
totalFastPluginRetries = 5
totalMediumPluginRetries = 10
totalMediumPluginRetries = totalFastPluginRetries + 1
RetryIntervalTimeout = time.Duration(-1)
)

var GenerateHMACFn = func(method string, path string, body []byte, clientId string, secret string, ts int64) string {
Expand All @@ -45,8 +46,7 @@ var GenerateHMACFn = func(method string, path string, body []byte, clientId stri
}

// CalculateRetryConfig returns plugin retry interval based on how many times plugin has retried this work
var CalculateRetryConfigFn = func(prk string, mercuryConfig MercuryConfigProvider) time.Duration {
var retryInterval time.Duration
var CalculateRetryConfigFn = func(prk string, mercuryConfig MercuryConfigProvider) (retryInterval time.Duration) {
var retries int
totalAttempts, ok := mercuryConfig.GetPluginRetry(prk)
if ok {
Expand All @@ -55,9 +55,9 @@ var CalculateRetryConfigFn = func(prk string, mercuryConfig MercuryConfigProvide
retryInterval = 1 * time.Second
} else if retries < totalMediumPluginRetries {
retryInterval = 5 * time.Second
} else {
retryInterval = RetryIntervalTimeout
}
// if the core node has retried totalMediumPluginRetries times, do not set retry interval and plugin will use
// the default interval
} else {
retryInterval = 1 * time.Second
}
Expand All @@ -68,6 +68,7 @@ var CalculateRetryConfigFn = func(prk string, mercuryConfig MercuryConfigProvide
type MercuryData struct {
Index int
Error error
ErrCode encoding.ErrCode
Retryable bool
Bytes [][]byte
State encoding.PipelineExecutionState
Expand All @@ -86,7 +87,17 @@ type HttpClient interface {
}

type MercuryClient interface {
DoRequest(ctx context.Context, streamsLookup *StreamsLookup, pluginRetryKey string) (encoding.PipelineExecutionState, encoding.UpkeepFailureReason, [][]byte, bool, time.Duration, error)
// DoRequest makes a data stream request, it manages retries and returns the following:
// state: the state of the pipeline execution, used by our components.
// upkeepFailureReason: the reason for the upkeep failure, used by our components.
// data: the data returned from the data stream.
// retryable: whether the request is retryable.
// retryInterval: the interval to wait before retrying the request, or RetryIntervalTimeout if no more retries are allowed.
// errCode: the error code of the request, to be passed to the user's error handler if applicable.
// error: the raw error that occurred during the request.
//
// Exploratory: consider to merge state/failureReason/errCode into a single object
DoRequest(ctx context.Context, streamsLookup *StreamsLookup, pluginRetryKey string) (encoding.PipelineExecutionState, encoding.UpkeepFailureReason, [][]byte, bool, time.Duration, encoding.ErrCode, error)
}

type StreamsLookupError struct {
Expand Down
Loading
Loading