Initial work to add an in-memory layer on top of the TxStore #11190
Conversation
I see that you haven't updated any README files. Would it make sense to do so?
…age mechanics for address state
The main goal of the InMemoryStore is to reduce entanglement with the postgres DB. It is part of a larger goal to make creating integrations faster and easier for developers.
This seems to me more like a performance optimisation than a simplification of the integration process or a reduction of entanglement with Postgres. It definitely makes TxStore more complex. It would be great to measure whether the performance improvements are actually sufficient to pay this price.
If we are sure that the potential performance gain here is significant, we might want to consider changing the paradigm from using some form of key store that is shared between different runners (broadcaster, confirmer, etc.) to using channels to transfer responsibility over the tx.
Example: the confirmer, instead of polling storage for transactions to check, receives them from the broadcaster via a channel.
This requires more refactoring of the TxM compared to the in-memory store, but makes it more modular and extendable.
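The channel-based handoff suggested above could be sketched roughly like this. Everything here is hypothetical and simplified: `Tx` stands in for the real `txmgrtypes.Tx`, and `broadcaster`/`confirmer` are stand-ins for the actual runners, which carry far more state.

```go
package main

import "fmt"

// Tx is a hypothetical, simplified transaction type standing in for txmgrtypes.Tx.
type Tx struct {
	ID int
}

// broadcaster hands each transaction it has broadcast to the confirmer over a
// channel, transferring ownership instead of sharing a store.
func broadcaster(txs []Tx, out chan<- Tx) {
	for _, tx := range txs {
		out <- tx // responsibility for tx moves to the receiver
	}
	close(out)
}

// confirmer receives transactions directly rather than polling storage.
func confirmer(in <-chan Tx, done chan<- int) {
	confirmed := 0
	for tx := range in {
		_ = tx // confirmation logic would go here
		confirmed++
	}
	done <- confirmed
}

func main() {
	ch := make(chan Tx)
	done := make(chan int)
	go confirmer(ch, done)
	broadcaster([]Tx{{1}, {2}, {3}}, ch)
	fmt.Println(<-done) // 3
}
```

The trade-off mentioned in the comment is visible even in this toy version: ordering and ownership become explicit, but recovery after a restart needs a separate mechanism, since in-flight txs no longer live in a shared queryable store.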
offset := 0
limit := 50
for {
	txs, count, err := txStore.UnstartedTransactions(offset, limit, as.fromAddress, as.chainID)
Is there a guarantee that two different instances of core won't run simultaneously on top of a single DB? Example: during maintenance or an update of the node.
No, I don't think there is. What scenarios are you thinking of?
In theory, we might have another node that persists new txs after our node has fetched them.
common/txmgr/address_state.go
Outdated
	offset += limit
}

return nil
We also need to keep confirmed txs in memory to guarantee idempotent behaviour
That is true. I omitted it just to keep the PR from being too large.
var maxSeq SEQ
if as.inprogress != nil && as.inprogress.Sequence != nil {
	if (*as.inprogress.Sequence).Int64() > maxSeq.Int64() {
Probably out of scope for this PR, but should we use a comparable constraint for SEQ instead of relying on the EVM-specific Int64 property?
I would say yes, let's add that in.
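One way the suggested constraint could look: instead of calling the EVM-specific `Int64()`, the SEQ type parameter could be required to implement an ordering method. The names below (`Sequence`, `evmSeq`, `maxOf`) are illustrative, not from the PR.

```go
package main

import "fmt"

// Sequence is a hypothetical constraint: each chain's sequence type supplies
// its own ordering instead of exposing an EVM-specific Int64().
type Sequence[S any] interface {
	// Compare returns <0 if receiver < other, 0 if equal, >0 if greater.
	Compare(other S) int
}

// evmSeq is a stand-in EVM sequence (a nonce) satisfying Sequence[evmSeq].
type evmSeq int64

func (s evmSeq) Compare(other evmSeq) int {
	switch {
	case s < other:
		return -1
	case s > other:
		return 1
	default:
		return 0
	}
}

// maxOf picks the larger sequence using only the generic constraint,
// so it works for any chain's sequence type.
func maxOf[S Sequence[S]](a, b S) S {
	if a.Compare(b) >= 0 {
		return a
	}
	return b
}

func main() {
	fmt.Println(maxOf(evmSeq(3), evmSeq(7))) // 7
}
```

A non-EVM chain would only need to define `Compare` on its sequence type; the max-sequence scan in the loop above would then no longer depend on `Int64()`.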
	return nil, fmt.Errorf("peek_in_progress_tx: %w (address: %s)", ErrTxnNotFound, as.fromAddress)
}

return tx, nil
Sharing a pointer to the tx feels extremely unsafe. Should we implement marshal/unmarshal when receiving/returning an instance?
I agree with this 💯. We should think of ways to do this safely.
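Besides marshal/unmarshal, a cheaper option often used for this is an explicit deep copy at the store boundary. A minimal sketch, assuming a simplified `tx` type (the real `txmgrtypes.Tx` has more nested pointers and slices that would all need copying):

```go
package main

import "fmt"

// tx is a simplified stand-in; the real Tx has nested pointers (Sequence,
// attempts, meta) that must each be copied for the copy to be truly detached.
type tx struct {
	ID       int64
	Sequence *int64
}

// deepCopy returns an independent copy so callers cannot mutate the
// in-memory store's state through a shared pointer.
func (t *tx) deepCopy() *tx {
	cp := *t // copies the value fields
	if t.Sequence != nil {
		seq := *t.Sequence
		cp.Sequence = &seq // fresh pointer, detached from the original
	}
	return &cp
}

func main() {
	seq := int64(1)
	stored := &tx{ID: 42, Sequence: &seq}
	out := stored.deepCopy()
	*out.Sequence = 99            // caller mutates its copy...
	fmt.Println(*stored.Sequence) // 1 — ...the stored tx is unaffected
}
```

The cost is that copies must be kept exhaustive as fields are added, which is the usual argument for round-tripping through marshal/unmarshal instead.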
	return fmt.Errorf("move_unstarted_to_in_progress: address %s already has a transaction in progress", as.fromAddress)
}

if tx != nil {
In which cases does the caller know better which tx to move?
Could you reword this statement? I am not sure I understand.
moveUnstartedToInProgress accepts tx as an argument, but allows it to be nil; in that case we get the tx from the priority queue. In which cases might the caller of moveUnstartedToInProgress want to use their own value of tx instead of the one from the queue?
My reasoning for doing this was to minimize potential conflicts between the persistent store and the in-memory state, so in this case the caller gets the next unstarted tx from the DB. I think we should move away from this in the future, though... it might make sense to exclude it until we are ready to use it.
@@ -695,7 +695,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) next
 	defer cancel()
 	etx := &txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{}
 	if err := eb.txStore.FindNextUnstartedTransactionFromAddress(ctx, etx, fromAddress, eb.chainID); err != nil {
-		if errors.Is(err, sql.ErrNoRows) {
+		if errors.Is(err, sql.ErrNoRows) || errors.Is(err, ErrTxnNotFound) {
It feels weird to force users of txStore to check for two different errors that actually mean the same thing. Wouldn't it be easier to return sql.ErrNoRows from the in-memory storage?
I want to get rid of the reliance on sql.ErrNoRows, so this is the first step. Relying on it forces us to be dependent on the sql pkg... I think we should use our own error types for it so we can make this less dependent.
maxUnstarted := 50
addresses, err := keyStore.EnabledAddressesForChain(chainID)
if err != nil {
	return nil, fmt.Errorf("new_in_memory_store: %w", err)
IMHO wrapping should add context that is not known to the caller. Shouldn't we specify here that we failed to check whether the address is enabled?
Sounds reasonable, we can add more context to the errors.
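The suggested wrapping could look something like the sketch below, where the message names the failed operation rather than only the constructor. `enabledAddresses` here is a stand-in for `keyStore.EnabledAddressesForChain`, and the error text is illustrative.

```go
package main

import (
	"errors"
	"fmt"
)

// enabledAddresses is a stand-in for keyStore.EnabledAddressesForChain.
func enabledAddresses(chainID string) ([]string, error) {
	return nil, errors.New("keystore locked")
}

// newInMemoryStore wraps the failure with what was being attempted, not just
// the constructor's name, so the caller's logs explain what actually failed.
func newInMemoryStore(chainID string) error {
	if _, err := enabledAddresses(chainID); err != nil {
		return fmt.Errorf("new_in_memory_store: failed to get enabled addresses for chain %s: %w", chainID, err)
	}
	return nil
}

func main() {
	fmt.Println(newInMemoryStore("1"))
	// new_in_memory_store: failed to get enabled addresses for chain 1: keystore locked
}
```

Because the original error is still wrapped with `%w`, callers can continue to match the underlying cause with `errors.Is`/`errors.As` while getting the extra context for free.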
common/txmgr/inmemory_store.go
Outdated
if ms.chainID.String() != chainID.String() {
	return tx, fmt.Errorf("create_transaction: %w", ErrInvalidChainID)
}
if _, ok := ms.addressStates[txRequest.FromAddress]; !ok {
Are we sure that it's not possible to add new keys on the fly?
Good find, I will add the missing locks.
// Load all unconfirmed transactions from persistent storage
offset = 0
limit = 50
Nit: Could make the load limit a const. Would be easier to change in a single place if needed.
func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Push(tx any) {
	pq.txs = append(pq.txs, tx.(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]))
}
func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Pop() any {
Is this different from RemoveNextTx()? Can you add a comment explaining this? Feels like we might not need both.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work :)
This PR is meant to start a discussion on a proposed design of the In-Memory Storage.
Some key points