Skip to content

Commit

Permalink
finalize mercury 0.3 integration details (#10538)
Browse files Browse the repository at this point in the history
* add pprof to auto 2.1

* use uint32 for timestamp

* add 0x prefix to decode full report

* decode full report as byte slice

* introduce a delay for timestamp

* remove log

* update URL

* update

* update

* update

* remove url encoding

* add a TODO
  • Loading branch information
FelixFan1992 authored Sep 8, 2023
1 parent 47ef226 commit ebc6a02
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const (
MercuryUnmarshalError PipelineExecutionState = 6
InvalidMercuryRequest PipelineExecutionState = 7
InvalidMercuryResponse PipelineExecutionState = 8 // this will only happen if Mercury server sends bad responses
UpkeepNotAuthorized PipelineExecutionState = 9
)

type UpkeepInfo = iregistry21.KeeperRegistryBase21UpkeepInfo
Expand Down
77 changes: 41 additions & 36 deletions core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ type MercuryV03Response struct {
}

type MercuryV03Report struct {
FeedID string `json:"feedID"` // feed id in hex
ValidFromTimestamp string `json:"validFromTimestamp"`
ObservationsTimestamp string `json:"observationsTimestamp"`
FullReport string `json:"fullReport"` // the actual mercury report of this feed, can be sent to verifier
FeedID []byte `json:"feedID"` // feed id in hex
ValidFromTimestamp uint32 `json:"validFromTimestamp"`
ObservationsTimestamp uint32 `json:"observationsTimestamp"`
FullReport []byte `json:"fullReport"` // the actual mercury report of this feed, can be sent to verifier
}

type MercuryData struct {
Expand Down Expand Up @@ -163,7 +163,7 @@ func (r *EvmRegistry) doLookup(ctx context.Context, wg *sync.WaitGroup, lookup *

state, reason, values, retryable, err := r.doMercuryRequest(ctx, lookup, lggr)
if err != nil {
lggr.Errorf("upkeep %s retryable %v doMercuryRequest: %v", lookup.upkeepId, retryable, err)
lggr.Errorf("upkeep %s retryable %v doMercuryRequest: %s", lookup.upkeepId, retryable, err.Error())
checkResults[i].Retryable = retryable
checkResults[i].PipelineExecutionState = uint8(state)
checkResults[i].IneligibilityReason = uint8(reason)
Expand All @@ -176,7 +176,7 @@ func (r *EvmRegistry) doLookup(ctx context.Context, wg *sync.WaitGroup, lookup *

state, retryable, mercuryBytes, err := r.checkCallback(ctx, values, lookup)
if err != nil {
lggr.Errorf("at block %d upkeep %s checkCallback err: %v", lookup.block, lookup.upkeepId, err)
lggr.Errorf("at block %d upkeep %s checkCallback err: %s", lookup.block, lookup.upkeepId, err.Error())
checkResults[i].Retryable = retryable
checkResults[i].PipelineExecutionState = uint8(state)
return
Expand All @@ -185,7 +185,7 @@ func (r *EvmRegistry) doLookup(ctx context.Context, wg *sync.WaitGroup, lookup *

state, needed, performData, failureReason, _, err := r.packer.UnpackCheckCallbackResult(mercuryBytes)
if err != nil {
lggr.Errorf("at block %d upkeep %s UnpackCheckCallbackResult err: %v", lookup.block, lookup.upkeepId, err)
lggr.Errorf("at block %d upkeep %s UnpackCheckCallbackResult err: %s", lookup.block, lookup.upkeepId, err.Error())
checkResults[i].PipelineExecutionState = uint8(state)
return
}
Expand Down Expand Up @@ -318,7 +318,6 @@ func (r *EvmRegistry) doMercuryRequest(ctx context.Context, sl *StreamsLookup, l
results[m.Index] = m.Bytes[0]
}
}
lggr.Debugf("upkeep %s retryable %t reqErr %w", sl.upkeepId.String(), retryable && !allSuccess, reqErr)
// only retry when not all successful AND none are not retryable
return state, encoding.UpkeepFailureReasonNone, results, retryable && !allSuccess, reqErr
}
Expand Down Expand Up @@ -431,13 +430,14 @@ func (r *EvmRegistry) singleFeedRequest(ctx context.Context, ch chan<- MercuryDa

// multiFeedsRequest sends a Mercury v0.3 request for a multi-feed report
func (r *EvmRegistry) multiFeedsRequest(ctx context.Context, ch chan<- MercuryData, sl *StreamsLookup, lggr logger.Logger) {
q := url.Values{
feedIDs: {strings.Join(sl.feeds, ",")},
timestamp: {sl.time.String()},
}

reqUrl := fmt.Sprintf("%s%s%s", r.mercury.cred.URL, mercuryBatchPathV03, q.Encode())
lggr.Debugf("request URL for upkeep %s feed %s: %s", sl.upkeepId.String(), strings.Join(sl.feeds, ","), reqUrl)
// this won't work bc q.Encode() will encode commas as '%2C' but the server is strictly expecting a comma separated list
//q := url.Values{
// feedIDs: {strings.Join(sl.feeds, ",")},
// timestamp: {sl.time.String()},
//}
params := fmt.Sprintf("%s=%s&%s=%s", feedIDs, strings.Join(sl.feeds, ","), timestamp, sl.time.String())
reqUrl := fmt.Sprintf("%s%s%s", r.mercury.cred.URL, mercuryBatchPathV03, params)
lggr.Debugf("request URL for upkeep %s userId %s: %s", sl.upkeepId.String(), r.mercury.cred.Username, reqUrl)

req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqUrl, nil)
if err != nil {
Expand All @@ -446,7 +446,7 @@ func (r *EvmRegistry) multiFeedsRequest(ctx context.Context, ch chan<- MercuryDa
}

ts := time.Now().UTC().UnixMilli()
signature := r.generateHMAC(http.MethodGet, mercuryBatchPathV03+q.Encode(), []byte{}, r.mercury.cred.Username, r.mercury.cred.Password, ts)
signature := r.generateHMAC(http.MethodGet, mercuryBatchPathV03+params, []byte{}, r.mercury.cred.Username, r.mercury.cred.Password, ts)
req.Header.Set(headerContentType, applicationJson)
// username here is often referred to as user id
req.Header.Set(headerAuthorization, r.mercury.cred.Username)
Expand All @@ -465,7 +465,7 @@ func (r *EvmRegistry) multiFeedsRequest(ctx context.Context, ch chan<- MercuryDa
retryable = false
resp, err1 := r.hc.Do(req)
if err1 != nil {
lggr.Warnf("at block %s upkeep %s GET request fails from mercury v0.3: %v", sl.time.String(), sl.upkeepId.String(), err1)
lggr.Warnf("at timestamp %s upkeep %s GET request fails from mercury v0.3: %v", sl.time.String(), sl.upkeepId.String(), err1)
retryable = true
state = encoding.MercuryFlakyFailure
return err1
Expand All @@ -484,43 +484,48 @@ func (r *EvmRegistry) multiFeedsRequest(ctx context.Context, ch chan<- MercuryDa
return err1
}

if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusInternalServerError {
lggr.Warnf("at block %s upkeep %s received status code %d from mercury v0.3", sl.time.String(), sl.upkeepId.String(), resp.StatusCode)
lggr.Infof("at timestamp %s upkeep %s received status code %d from mercury v0.3", sl.time.String(), sl.upkeepId.String(), resp.StatusCode)
if resp.StatusCode == http.StatusUnauthorized {
retryable = false
state = encoding.UpkeepNotAuthorized
return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3, most likely this is caused by unauthorized upkeep", sl.time.String(), sl.upkeepId.String(), resp.StatusCode)
} else if resp.StatusCode == http.StatusBadRequest {
retryable = false
state = encoding.InvalidMercuryRequest
return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3, most likely this is caused by invalid format of timestamp", sl.time.String(), sl.upkeepId.String(), resp.StatusCode)
} else if resp.StatusCode == http.StatusInternalServerError {
retryable = true
state = encoding.MercuryFlakyFailure
return errors.New(strconv.FormatInt(int64(resp.StatusCode), 10))
return fmt.Errorf("%d", http.StatusInternalServerError)
} else if resp.StatusCode == 420 {
// in 0.3, this will happen when missing/malformed query args, missing or bad required headers, non-existent feeds, or no permissions for feeds
retryable = false
return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3, most likely this is caused by missing/malformed query args, missing or bad required headers, non-existent feeds, or no permissions for feeds", sl.time.String(), sl.upkeepId.String(), resp.StatusCode)
} else if resp.StatusCode != http.StatusOK {
retryable = false
state = encoding.InvalidMercuryRequest
return fmt.Errorf("at block %s upkeep %s received status code %d from mercury v0.3", sl.time.String(), sl.upkeepId.String(), resp.StatusCode)
return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3", sl.time.String(), sl.upkeepId.String(), resp.StatusCode)
}

var response MercuryV03Response
err1 = json.Unmarshal(body, &response)
if err1 != nil {
lggr.Warnf("at block %s upkeep %s failed to unmarshal body to MercuryV03Response from mercury v0.3: %v", sl.time.String(), sl.upkeepId.String(), err1)
lggr.Warnf("at timestamp %s upkeep %s failed to unmarshal body to MercuryV03Response from mercury v0.3: %v", sl.time.String(), sl.upkeepId.String(), err1)
retryable = false
state = encoding.MercuryUnmarshalError
return err1
}
// in v0.3, if some feeds are not available, the server will only return available feeds, but we need to make sure ALL feeds are retrieved before calling user contract
// hence, retry in this case. retry will help when we send a very new timestamp and reports are not yet generated
if len(response.Reports) != len(sl.feeds) {
// this should never happen. if this upkeep does not have permission for any feeds it requests, or if certain feeds are
// missing in mercury server, the mercury server v0.3 should respond with 400s, rather than returning partial results
retryable = false
state = encoding.InvalidMercuryResponse
return fmt.Errorf("at block %s upkeep %s requested %d feeds but received %d reports from mercury v0.3", sl.time.String(), sl.upkeepId.String(), len(sl.feeds), len(response.Reports))
// TODO: AUTO-5044: calculate what reports are missing and log a warning
retryable = true
state = encoding.MercuryFlakyFailure
return fmt.Errorf("%d", http.StatusNotFound)
}
var reportBytes [][]byte
var b []byte
for _, rsp := range response.Reports {
b, err1 = hexutil.Decode(rsp.FullReport)
if err1 != nil {
lggr.Warnf("upkeep %s block %s failed to decode fullReport %s from mercury v0.3: %v", sl.upkeepId.String(), sl.time.String(), rsp.FullReport, err1)
retryable = false
state = encoding.InvalidMercuryResponse
return err1
}
reportBytes = append(reportBytes, b)
reportBytes = append(reportBytes, rsp.FullReport)
}
ch <- MercuryData{
Index: 0,
Expand Down
Loading

0 comments on commit ebc6a02

Please sign in to comment.