-
Notifications
You must be signed in to change notification settings - Fork 54
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Offchain usdc #112
Offchain usdc #112
Conversation
core/chains/evm/logpoller/orm.go
Outdated
AND tx_hash = $3 | ||
AND event_sig = $4 | ||
ORDER BY (evm_logs.block_number, evm_logs.log_index)`, | ||
utils.NewBig(o.chainID), txHash.Bytes(), eventSig.Bytes()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: Make sure query is properly hitting indexes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, please make sure it doesn't hurt db performance, I described how to create a simple test on a local machine and test it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably we miss the index for tx_hash
. Postgres will be able to performantly pick up data using index only for chain_id
and event_sig
. Then will have to perform filtering on the remaining part using tx_hash
Example on my local machine
Sort (cost=575.91..575.92 rows=1 width=297) (actual time=213.647..259.292 rows=100000 loops=1)
Sort Key: (ROW(block_number, log_index))
Sort Method: external merge Disk: 31240kB
-> Index Scan using idx_evm_logs_ordered_by_block_and_created_at on logs (cost=0.55..575.90 rows=1 width=297) (actual time=0.070..89.148 rows=100000 loops=1)
Index Cond: ((evm_chain_id = '1'::numeric) AND (event_sig = '\xa32c5f1d88735034143171f18fbb4d447fbbe0fbf4c98733d54092a081ba5d2a'::bytea))
Filter: (tx_hash = '\xbb0f07700587fee22c5863ed75a5e05b68fcac4763449a582987fa4e8e9a2d0f'::bytea)
@@ -39,6 +41,9 @@ type Client interface { | |||
// GetExecutionStateChangesBetweenSeqNums returns all the execution state change events for the provided message sequence numbers (inclusive). | |||
GetExecutionStateChangesBetweenSeqNums(ctx context.Context, offRamp common.Address, seqNumMin, seqNumMax uint64, confs int) ([]Event[evm_2_evm_offramp.EVM2EVMOffRampExecutionStateChanged], error) | |||
|
|||
// GetLastUSDCMessagePriorToLogIndexInTx returns the last USDC message that was sent before the provided log index in the given transaction. | |||
GetLastUSDCMessagePriorToLogIndexInTx(ctx context.Context, logIndex int64, txHash common.Hash) ([]byte, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be cleaner to change the method signature to:
GetLastUSDCMessagePriotToSeqNum(ctx, seqNum)
That way you will have to make the two calls inside the ccipevents/
method, so the caller code is cleaner, it only needs to know the sequence number to access that method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would need the params from GetSendRequestsBetweenSeqNums as well, so the onRampAddress. Might be worth it though
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
resolved in other ways, please check if the new way works for you
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe update this method to accept a ccip msg (trying to keep the interface close to ccip not to evm, that way it's going to be easier to add non-evm).
GetLastUSDCMessagePriorToCcipMsg(ctx context.Context, msg)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logIndex
won't work for e.g. solana
|
||
tokenDataProviders := make(map[common.Address]offchaintokendata.Provider) | ||
|
||
// TODO the called function only uses the chainId to get the message transmitter address |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The filter register/unregister logic becomes weird. But I'm not sure if this can be improved :S
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Jup, happy to hear any suggestions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we put the relevant addresses in the plugin config this problem goes away right? Maybe just put a USDCConfig struct in the plugin config like:
type USDCConfig struct {
AttestationAPI string `json:"attestationAPI"`
MessageTransmitterAddress common `json:"MessageTransmitterAddress"`
TokenAddress common `json:"MessageTransmitterAddress"`
}
func (uc USDCConfig) Validate(chainID) error { // run basic sanity on the addresses}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean the job spec?
@@ -66,6 +67,7 @@ type ExecutionPluginConfig struct { | |||
sourceClient evmclient.Client | |||
destGasEstimator gas.EvmFeeEstimator | |||
leafHasher hasher.LeafHasherInterface[[32]byte] | |||
tokenDataProviders map[common.Address]offchaintokendata.Provider |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am thinking about replacing the map with:
type TokenDataProvider interface {
IsAttestationComplete(ctx context.Context, token common.Address, seqNum uint64) (ready bool, attestation []byte, err error)
GetSourceLogPollerFilters(token common.Address) []logpoller.Filter
GetSourceToken(token common.Address) common.Address
}
i.e. accept token address on each method call as first arg, I think that it would make the plugin code cleaner
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After implementing TokenDataProvider
for another token you also won't have to touch plugin code.
return AttestationResponse{}, err | ||
} | ||
req.Header.Add("accept", "application/json") | ||
res, err := http.DefaultClient.Do(req) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest to hook a properly configured http client
instance to the struct instead of using the http.DefaultClient
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't there any lib in our codebase to deal with http calls? which leads to not do manual stuff like req.Header.Add("accept", "application/json") -> read body -> parse body -> etc...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like this is the best method. DefaultClient already is a reused object so there doesn't appear to be much overhead.
core/chains/evm/logpoller/orm.go
Outdated
var logs []Log | ||
err := q.Select(&logs, ` | ||
SELECT * FROM evm_logs | ||
WHERE evm_logs.evm_chain_id = $1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prefixing by table name is not needed here (we don't even use joins here). IMO it's more confusing with prefixes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I copied it from the other functions, they all do it
core/chains/evm/logpoller/orm.go
Outdated
AND tx_hash = $3 | ||
AND event_sig = $4 | ||
ORDER BY (evm_logs.block_number, evm_logs.log_index)`, | ||
utils.NewBig(o.chainID), txHash.Bytes(), eventSig.Bytes()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, please make sure it doesn't hurt db performance, I described how to create a simple test on a local machine and test it
@@ -1005,6 +1006,10 @@ func (lp *logPoller) LatestBlockByEventSigsAddrsWithConfs(fromBlock int64, event | |||
return lp.orm.SelectLatestBlockNumberEventSigsAddrsWithConfs(fromBlock, eventSigs, addresses, confs, qopts...) | |||
} | |||
|
|||
func (lp *logPoller) IndexedLogsByTxHash(eventSig common.Hash, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sure to backport these changes to the main repo
core/services/ocr2/plugins/ccip/offchaintokendata/offchain_data_provider.go
Outdated
Show resolved
Hide resolved
var tokenData [][]byte | ||
for _, token := range msg.TokenAmounts { | ||
if offchainTokenDataProvider, ok := r.config.tokenDataProviders[token.Token]; ok { | ||
ready, attestation, err2 := offchainTokenDataProvider.IsAttestationComplete(context.TODO(), msg.SequenceNumber) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, it probably needs some pooling or running those concurrently in different go routines; also a good idea with timeouts.
wrap error, improved url formatting, orm fix
# Conflicts: # core/services/ocr2/plugins/ccip/execution_plugin.go
# Conflicts: # core/services/ocr2/plugins/ccip/execution_plugin.go # core/services/ocr2/plugins/ccip/execution_reporting_plugin.go # core/services/ocr2/plugins/ccip/execution_reporting_plugin_test.go # core/services/ocr2/plugins/ccip/internal/ccipevents/client_mock.go
observed logpoller rename IsAttestationComplete mutex in usdc mapping
core/services/ocr2/plugins/ccip/offchaintokendata/offchain_data_provider.go
Outdated
Show resolved
Hide resolved
core/services/ocr2/plugins/ccip/offchaintokendata/offchain_data_provider.go
Outdated
Show resolved
Hide resolved
ccipSendLogIndex := int64(ccipSendRequest.Data.Raw.Index) | ||
|
||
// Get the USDC message body | ||
usdcMessageBody, err := s.sourceChainEvents.GetLastUSDCMessagePriorToLogIndexInTx(ctx, ccipSendLogIndex, ccipSendTxHash) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm I actually wonder if this is better suited to getReportsWithSendRequests and just part of meta in sendEventWithMeta for USDC sends? That way we load all relevant log data in one place and this component doesn't have to know about log stuff at all. It just can do simple dead simple memoization based on the calldata->responses. Also would help keep buildBatch more pure/testable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm true but that means spilling the USDC logic into the rest of the codebase. It's either spreading USDC or spreading log lookups. I'd prefer to not spill USDC, as there might be many cases like USDC and log getting is fairly generic. I see the value in keeping buildBatch more pure, but with the proper testing framework it should be extremely easy to mock out USDC logic, meaning it would remain pure from the tests perspective
|
||
tokenDataProviders := make(map[common.Address]offchaintokendata.Provider) | ||
|
||
// TODO the called function only uses the chainId to get the message transmitter address |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we put the relevant addresses in the plugin config this problem goes away right? Maybe just put a USDCConfig struct in the plugin config like:
type USDCConfig struct {
AttestationAPI string `json:"attestationAPI"`
MessageTransmitterAddress common `json:"MessageTransmitterAddress"`
TokenAddress common `json:"MessageTransmitterAddress"`
}
func (uc USDCConfig) Validate(chainID) error { // run basic sanity on the addresses}
core/services/ocr2/plugins/ccip/offchaintokendata/usdc/usdc_test.go
Outdated
Show resolved
Hide resolved
# Conflicts: # core/services/ocr2/plugins/ccip/execution_plugin.go # core/services/ocr2/plugins/ccip/execution_reporting_plugin_test.go # core/services/ocr2/plugins/ccip/internal/ccipdata/client_mock.go # core/services/ocr2/plugins/ccip/internal/ccipdata/logpoller.go # core/services/ocr2/plugins/ccip/internal/ccipdata/reader.go
return []byte{}, err | ||
} | ||
|
||
r.cache[msg.SequenceNumber] = tokenData |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe use sync.RWMutex
and do the lock like this?
that way you make the cache readable while making the readTokenData
call, which might take some time.
RLock()
try to read from cache
RUnlock
readTokenData()
Lock()
put in cache
Unlock()
considering the load it's minor though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe also add a comment to highlight that this method will also update the cache, not only read
// ... **and then Updates the cache**
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also add some unit test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will never be called concurrently, but yeah that is a better locking method.
# Conflicts: # core/chains/evm/logpoller/disabled.go # core/services/ocr2/plugins/ccip/execution_plugin.go # core/services/ocr2/plugins/ccip/execution_reporting_plugin.go
func getTokenDataProviders(pluginConfig ccipconfig.ExecutionPluginJobSpecConfig, onRampAddress common.Address, sourceChainEventClient *ccipdata.LogPollerReader) (map[common.Address]tokendata.Reader, error) { | ||
tokenDataProviders := make(map[common.Address]tokendata.Reader) | ||
|
||
if pluginConfig.USDCConfig.AttestationAPI != "" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit could invert this to avoid nesting i.e. if == "" early return. Maybe also want an info log indicating we're proceeding without USDC config to speed up debugging?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was imagining this function to do setup for all future token data providers, therefore having an if for each. With just usdc in mind it makes sense flipping
ae97b39
to
4a62657
Compare
4a62657
to
e4f6b07
Compare
Motivation
USDC is supported onchain, now we need to add offchain support
Solution
Add attestation logic and a new USDC service to manage USDC related logic
PENDING main repo logpoller and index changes