-
Notifications
You must be signed in to change notification settings - Fork 39
/
manager.go
118 lines (95 loc) · 3.17 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package mempool
import (
"errors"
"sort"
"sync"
"github.com/obscuronet/go-obscuro/go/common/log"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/obscuronet/go-obscuro/go/enclave/core"
"github.com/obscuronet/go-obscuro/go/enclave/limiters"
gethlog "github.com/ethereum/go-ethereum/log"
"github.com/obscuronet/go-obscuro/go/common"
)
// sortByNonce is a minimal sort.Interface implementation that orders
// mempool transactions by ascending nonce — the order in which the EVM
// expects to execute them within a rollup.
type sortByNonce []*common.L2Tx

// Len reports the number of transactions in the collection.
func (c sortByNonce) Len() int { return len(c) }

// Swap exchanges the transactions at positions i and j.
func (c sortByNonce) Swap(i, j int) { c[j], c[i] = c[i], c[j] }

// Less orders two transactions by their nonces, ascending.
func (c sortByNonce) Less(i, j int) bool { return c[i].Nonce() < c[j].Nonce() }
// todo - optimize this to use a different data structure that does not require a global lock.
type mempoolManager struct {
	mpMutex        sync.RWMutex // Controls access to `mempool`
	obscuroChainID int64        // Chain ID used when authenticating transaction signatures.
	logger         gethlog.Logger
	mempool        map[gethcommon.Hash]*common.L2Tx // Pending transactions keyed by tx hash; guarded by mpMutex.
}
// New constructs a mempool Manager backed by an in-memory map of
// transactions guarded by a read-write mutex.
func New(chainID int64, logger gethlog.Logger) Manager {
	mgr := &mempoolManager{
		mpMutex:        sync.RWMutex{},
		obscuroChainID: chainID,
		logger:         logger,
		mempool:        map[gethcommon.Hash]*common.L2Tx{},
	}
	return mgr
}
// AddMempoolTx verifies the signature of the submitted transaction and,
// if valid, stores it in the mempool keyed by its hash.
func (db *mempoolManager) AddMempoolTx(tx *common.L2Tx) error {
	// Only signature validity matters at this point; the recovered sender
	// address itself is not needed yet, so it is discarded.
	if _, err := core.GetAuthenticatedSender(db.obscuroChainID, tx); err != nil {
		return err
	}

	db.mpMutex.Lock()
	defer db.mpMutex.Unlock()
	db.mempool[tx.Hash()] = tx

	return nil
}
// FetchMempoolTxs returns a snapshot slice of every transaction currently
// held in the mempool, in no particular order (map iteration is random).
func (db *mempoolManager) FetchMempoolTxs() []*common.L2Tx {
	db.mpMutex.RLock()
	defer db.mpMutex.RUnlock()

	snapshot := make([]*common.L2Tx, 0, len(db.mempool))
	for _, tx := range db.mempool {
		snapshot = append(snapshot, tx)
	}
	return snapshot
}
// RemoveTxs deletes the given transactions from the mempool, typically
// those just included in a batch. It always returns nil; the error value
// exists to satisfy the Manager interface.
func (db *mempoolManager) RemoveTxs(transactions types.Transactions) error {
	db.mpMutex.Lock()
	defer db.mpMutex.Unlock()

	for idx := range transactions {
		delete(db.mempool, transactions[idx].Hash())
	}

	return nil
}
// CurrentTxs - Calculate transactions to be included in the current batch.
// The mempool snapshot is sorted by nonce; each transaction is kept only
// if its sender signature is valid, its nonce is the next expected one
// for that sender, and the batch size limiter still has room.
func (db *mempoolManager) CurrentTxs(stateDB *state.StateDB, limiter limiters.BatchSizeLimiter) ([]*common.L2Tx, error) {
	candidates := db.FetchMempoolTxs()
	sort.Sort(sortByNonce(candidates))

	nonceTracker := NewNonceTracker(stateDB)
	selected := make(common.L2Transactions, 0)

	for _, tx := range candidates {
		// A nil sender means the signature could not be verified; skip it.
		sender, _ := core.GetAuthenticatedSender(db.obscuroChainID, tx)
		if sender == nil {
			continue
		}
		// Only the next expected nonce for this sender is eligible.
		if nonceTracker.GetNonce(*sender) != tx.Nonce() {
			continue
		}
		if err := limiter.AcceptTransaction(tx); err != nil {
			if errors.Is(err, limiters.ErrInsufficientSpace) {
				// Batch ran out of space; stop selecting.
				break
			}
			// Limiter encountered unexpected error.
			return nil, err
		}
		selected = append(selected, tx)
		nonceTracker.IncrementNonce(*sender)
		db.logger.Debug("Including transaction in batch", log.TxKey, tx.Hash(), "nonce", tx.Nonce())
	}

	return selected, nil
}