Skip to content

Commit

Permalink
remove receiptsAndBlobs from GRPC
Browse files Browse the repository at this point in the history
  • Loading branch information
badgersrus committed Dec 17, 2024
1 parent 636cde0 commit e4b03f0
Show file tree
Hide file tree
Showing 13 changed files with 98 additions and 94 deletions.
2 changes: 1 addition & 1 deletion go/common/enclave.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type EnclaveAdmin interface {
// It is the responsibility of the host to gossip the returned rollup
// For good functioning the caller should always submit blocks ordered by height
// submitting a block before receiving ancestors of it, will result in it being ignored
SubmitL1Block(ctx context.Context, blockHeader *types.Header, receipts []*TxAndReceiptAndBlobs, processed *ProcessedL1Data) (*BlockSubmissionResponse, SystemError)
SubmitL1Block(ctx context.Context, blockHeader *types.Header, processed *ProcessedL1Data) (*BlockSubmissionResponse, SystemError)

// SubmitBatch submits a batch received from the sequencer for processing.
SubmitBatch(ctx context.Context, batch *ExtBatch) SystemError
Expand Down
17 changes: 8 additions & 9 deletions go/enclave/components/block_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,31 +59,31 @@ func NewBlockProcessor(storage storage.Storage, cc *crosschain.Processors, gasOr
}
}

func (bp *l1BlockProcessor) Process(ctx context.Context, br *common.BlockAndReceipts, processed *common.ProcessedL1Data) (*BlockIngestionType, error) {
defer core.LogMethodDuration(bp.logger, measure.NewStopwatch(), "L1 block processed", log.BlockHashKey, br.BlockHeader.Hash())
func (bp *l1BlockProcessor) Process(ctx context.Context, processed *common.ProcessedL1Data) (*BlockIngestionType, error) {
defer core.LogMethodDuration(bp.logger, measure.NewStopwatch(), "L1 block processed", log.BlockHashKey, processed.BlockHeader.Hash())

ingestion, err := bp.tryAndInsertBlock(ctx, br)
ingestion, err := bp.tryAndInsertBlock(ctx, processed.BlockHeader)
if err != nil {
return nil, err
}

if !ingestion.PreGenesis {
// This requires block to be stored first ... but can permanently fail a block
err = bp.crossChainProcessors.Remote.StoreCrossChainMessages(ctx, br.BlockHeader, br.Receipts(), processed)
err = bp.crossChainProcessors.Remote.StoreCrossChainMessages(ctx, processed.BlockHeader, processed)
if err != nil {
return nil, errors.New("failed to process cross chain messages")
}

err = bp.crossChainProcessors.Remote.StoreCrossChainValueTransfers(ctx, br.BlockHeader, br.Receipts(), processed)
err = bp.crossChainProcessors.Remote.StoreCrossChainValueTransfers(ctx, processed.BlockHeader, processed)
if err != nil {
return nil, fmt.Errorf("failed to process cross chain transfers. Cause: %w", err)
}
}

// todo @siliev - not sure if this is the best way to update the price, will pick up random stale blocks from forks?
bp.gasOracle.ProcessL1Block(br.BlockHeader)
bp.gasOracle.ProcessL1Block(processed.BlockHeader)

h := br.BlockHeader.Hash()
h := processed.BlockHeader.Hash()
bp.currentL1Head = &h
bp.lastIngestedBlock.Mark()
return ingestion, nil
Expand All @@ -99,8 +99,7 @@ func (bp *l1BlockProcessor) HealthCheck() (bool, error) {
return true, nil
}

func (bp *l1BlockProcessor) tryAndInsertBlock(ctx context.Context, br *common.BlockAndReceipts) (*BlockIngestionType, error) {
block := br.BlockHeader
func (bp *l1BlockProcessor) tryAndInsertBlock(ctx context.Context, block *types.Header) (*BlockIngestionType, error) {

// We insert the block into the L1 chain and store it.
// in case the block already exists in the database, this will be treated like a fork, because the head changes to
Expand Down
4 changes: 2 additions & 2 deletions go/enclave/components/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (bit *BlockIngestionType) IsFork() bool {
}

type L1BlockProcessor interface {
Process(ctx context.Context, br *common.BlockAndReceipts, processed *common.ProcessedL1Data) (*BlockIngestionType, error)
Process(ctx context.Context, processed *common.ProcessedL1Data) (*BlockIngestionType, error)
GetHead(context.Context) (*types.Header, error)
GetCrossChainContractAddress() *gethcommon.Address
HealthCheck() (bool, error)
Expand Down Expand Up @@ -157,5 +157,5 @@ type RollupProducer interface {
type RollupConsumer interface {
// ProcessBlobsInBlock - extracts the blob hashes from the block's transactions and builds the blob hashes from the blobs,
// compares this with the hashes seen in the block.
ProcessBlobsInBlock(ctx context.Context, b *common.BlockAndReceipts, processed *common.ProcessedL1Data) error
ProcessBlobsInBlock(ctx context.Context, processed *common.ProcessedL1Data) error
}
35 changes: 12 additions & 23 deletions go/enclave/components/rollup_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func NewRollupConsumer(

// ProcessBlobsInBlock - processes the blobs in a block, extracts the rollups, verifies the rollups and stores them
// FIXME remove BlockAndReceipts
func (rc *rollupConsumerImpl) ProcessBlobsInBlock(ctx context.Context, _ *common.BlockAndReceipts, processed *common.ProcessedL1Data) error {
func (rc *rollupConsumerImpl) ProcessBlobsInBlock(ctx context.Context, processed *common.ProcessedL1Data) error {
defer core.LogMethodDuration(rc.logger, measure.NewStopwatch(), "Rollup consumer processed blobs", log.BlockHashKey, processed.BlockHeader.Hash())

block := processed.BlockHeader
Expand Down Expand Up @@ -126,54 +126,43 @@ func (rc *rollupConsumerImpl) extractAndVerifyRollups(processed *common.Processe
rollupTxs := processed.GetEvents(common.RollupTx)
rollups := make([]*common.ExtRollup, 0, len(rollupTxs))

//println("---- START OF ROLLUP CHECK -------")
//println("length of rollupTxs: ", len(rollupTxs))
blobs, blobHashes, err := rc.extractBlobsAndHashes(rollupTxs)
if err != nil {
return nil, err
}

for i, tx := range rollupTxs {
t := rc.MgmtContractLib.DecodeTx(tx.Transaction)
if t == nil {
continue
}

//println("TX HASH: ", tx.Transaction.Hash().Hex())
rollupHashes, ok := t.(*ethadapter.L1RollupHashes)
if !ok {
continue
}

// Only use blobs from this transaction
blobs := tx.Blobs
_, blobHashes, err := ethadapter.MakeSidecar(blobs)
if err != nil {
return nil, fmt.Errorf("could not create blob sidecar and blob hashes. Cause: %w", err)
}

if err := verifyBlobHashes(rollupHashes, blobHashes); err != nil {
rc.logger.Warn(fmt.Sprintf("blob hashes in rollup at index %d do not match the rollup blob hashes. Cause: %s", i, err), log.NodeIDKey)
continue
rc.logger.Warn(fmt.Sprintf("blob hashes in rollup at index %d do not match the rollup blob hashes. Cause: %s", i, err))
continue // Blob hashes don't match, skip this rollup
}

//println("ROLLUP CONSUMER blobs: ", len(blobs))
//for i, h := range blobHashes {
// println("ROLLUP CONSUMER blob hash: ", h.Hex(), " at index: ", i)
//}

r, err := ethadapter.ReconstructRollup(blobs)
if err != nil {
// This is a critical error because we've already verified the blob hashes
// If we can't reconstruct the rollup at this point, something is seriously wrong
return nil, fmt.Errorf("could not recreate rollup from blobs. Cause: %w", err)
}

//println("Adding rollup to list: ", r.Hash().Hex())
rollups = append(rollups, r)

rc.logger.Info("Extracted rollup from block", log.RollupHashKey, r.Hash(), log.BlockHashKey, processed.BlockHeader.Hash())
}
if len(rollups) > 1 {
println("HERE")
if rollups[0].Hash() == rollups[1].Hash() {
println("ROLLUPS THE SAME: ", rollups[0].Hash().Hex())
return []*common.ExtRollup{rollups[0]}, nil
//rc.logger.Crit("DUPLICATE ROLLUP")
println("ROLLUPS THE SAME")
}

}
return rollups, nil
}
Expand Down
4 changes: 2 additions & 2 deletions go/enclave/crosschain/block_message_extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (m *blockMessageExtractor) Enabled() bool {
return m.GetBusAddress().Big().Cmp(gethcommon.Big0) != 0
}

func (m *blockMessageExtractor) StoreCrossChainValueTransfers(ctx context.Context, block *types.Header, receipts common.L1Receipts, processed *common.ProcessedL1Data) error {
func (m *blockMessageExtractor) StoreCrossChainValueTransfers(ctx context.Context, block *types.Header, processed *common.ProcessedL1Data) error {
defer core.LogMethodDuration(m.logger, measure.NewStopwatch(), "BlockHeader value transfer messages processed", log.BlockHashKey, block.Hash())

transferEvents := processed.GetEvents(common.CrossChainValueTranserTx)
Expand Down Expand Up @@ -68,7 +68,7 @@ func (m *blockMessageExtractor) StoreCrossChainValueTransfers(ctx context.Contex
// block - the L1 block for which events are extracted.
// receipts - all of the receipts for the corresponding block. This is validated.
// FIXME remove receipts arg
func (m *blockMessageExtractor) StoreCrossChainMessages(ctx context.Context, block *types.Header, _ common.L1Receipts, processed *common.ProcessedL1Data) error {
func (m *blockMessageExtractor) StoreCrossChainMessages(ctx context.Context, block *types.Header, processed *common.ProcessedL1Data) error {
defer core.LogMethodDuration(m.logger, measure.NewStopwatch(), "BlockHeader cross chain messages processed", log.BlockHashKey, block.Hash())

messageEvents := processed.GetEvents(common.CrossChainMessageTx)
Expand Down
4 changes: 2 additions & 2 deletions go/enclave/crosschain/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ type (

type BlockMessageExtractor interface {
// StoreCrossChainMessages - Verifies receipts belong to block and saves the relevant cross chain messages from the receipts
StoreCrossChainMessages(ctx context.Context, block *types.Header, receipts common.L1Receipts, processed *common.ProcessedL1Data) error
StoreCrossChainMessages(ctx context.Context, block *types.Header, processed *common.ProcessedL1Data) error

StoreCrossChainValueTransfers(ctx context.Context, block *types.Header, receipts common.L1Receipts, processed *common.ProcessedL1Data) error
StoreCrossChainValueTransfers(ctx context.Context, block *types.Header, processed *common.ProcessedL1Data) error

// GetBusAddress - Returns the L1 message bus address.
GetBusAddress() *common.L1Address
Expand Down
4 changes: 2 additions & 2 deletions go/enclave/enclave.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,11 +259,11 @@ func (e *enclaveImpl) StreamL2Updates() (chan common.StreamL2UpdatesResponse, fu
}

// SubmitL1Block is used to update the enclave with an additional L1 block.
func (e *enclaveImpl) SubmitL1Block(ctx context.Context, blockHeader *types.Header, receipts []*common.TxAndReceiptAndBlobs, processed *common.ProcessedL1Data) (*common.BlockSubmissionResponse, common.SystemError) {
func (e *enclaveImpl) SubmitL1Block(ctx context.Context, blockHeader *types.Header, processed *common.ProcessedL1Data) (*common.BlockSubmissionResponse, common.SystemError) {
if systemError := checkStopping(e.stopControl); systemError != nil {
return nil, systemError
}
return e.adminAPI.SubmitL1Block(ctx, blockHeader, receipts, processed)
return e.adminAPI.SubmitL1Block(ctx, blockHeader, processed)
}

func (e *enclaveImpl) SubmitBatch(ctx context.Context, extBatch *common.ExtBatch) common.SystemError {
Expand Down
26 changes: 13 additions & 13 deletions go/enclave/enclave_admin_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,17 +171,17 @@ func (e *enclaveAdminService) MakeActive() common.SystemError {
}

// SubmitL1Block is used to update the enclave with an additional L1 block.
func (e *enclaveAdminService) SubmitL1Block(ctx context.Context, blockHeader *types.Header, receipts []*common.TxAndReceiptAndBlobs, processed *common.ProcessedL1Data) (*common.BlockSubmissionResponse, common.SystemError) {
func (e *enclaveAdminService) SubmitL1Block(ctx context.Context, blockHeader *types.Header, processed *common.ProcessedL1Data) (*common.BlockSubmissionResponse, common.SystemError) {
e.mainMutex.Lock()
defer e.mainMutex.Unlock()

e.logger.Info("SubmitL1Block", log.BlockHeightKey, blockHeader.Number, log.BlockHashKey, blockHeader.Hash())

// If the block and receipts do not match, reject the block.
br, err := common.ParseBlockAndReceipts(blockHeader, receipts)
if err != nil {
return nil, e.rejectBlockErr(ctx, fmt.Errorf("could not submit L1 block. Cause: %w", err))
}
//// If the block and receipts do not match, reject the block.
//br, err := common.ParseBlockAndReceipts(blockHeader, receipts)
//if err != nil {
// return nil, e.rejectBlockErr(ctx, fmt.Errorf("could not submit L1 block. Cause: %w", err))
//}

// Verify the block header matches the one in processedData
if blockHeader.Hash() != processed.BlockHeader.Hash() {
Expand All @@ -190,7 +190,7 @@ func (e *enclaveAdminService) SubmitL1Block(ctx context.Context, blockHeader *ty

// TODO verify proof provided with block processed.Proof

result, err := e.ingestL1Block(ctx, br, processed)
result, err := e.ingestL1Block(ctx, processed)
if err != nil {
return nil, e.rejectBlockErr(ctx, fmt.Errorf("could not submit L1 block. Cause: %w", err))
}
Expand Down Expand Up @@ -471,20 +471,20 @@ func (e *enclaveAdminService) streamEventsForNewHeadBatch(ctx context.Context, b
}
}

func (e *enclaveAdminService) ingestL1Block(ctx context.Context, br *common.BlockAndReceipts, processed *common.ProcessedL1Data) (*components.BlockIngestionType, error) {
e.logger.Info("Start ingesting block", log.BlockHashKey, br.BlockHeader.Hash())
ingestion, err := e.l1BlockProcessor.Process(ctx, br, processed)
func (e *enclaveAdminService) ingestL1Block(ctx context.Context, processed *common.ProcessedL1Data) (*components.BlockIngestionType, error) {
e.logger.Info("Start ingesting block", log.BlockHashKey, processed.BlockHeader.Hash())
ingestion, err := e.l1BlockProcessor.Process(ctx, processed)
if err != nil {
// only warn for unexpected errors
if errors.Is(err, errutil.ErrBlockAncestorNotFound) || errors.Is(err, errutil.ErrBlockAlreadyProcessed) {
e.logger.Debug("Did not ingest block", log.ErrKey, err, log.BlockHashKey, br.BlockHeader.Hash())
e.logger.Debug("Did not ingest block", log.ErrKey, err, log.BlockHashKey, processed.BlockHeader.Hash())
} else {
e.logger.Warn("Failed ingesting block", log.ErrKey, err, log.BlockHashKey, br.BlockHeader.Hash())
e.logger.Warn("Failed ingesting block", log.ErrKey, err, log.BlockHashKey, processed.BlockHeader.Hash())
}
return nil, err
}

err = e.rollupConsumer.ProcessBlobsInBlock(ctx, br, processed)
err = e.rollupConsumer.ProcessBlobsInBlock(ctx, processed)
if err != nil && !errors.Is(err, components.ErrDuplicateRollup) {
e.logger.Error("Encountered error while processing l1 block rollups", log.ErrKey, err)
// Unsure what to do here; block has been stored
Expand Down
19 changes: 1 addition & 18 deletions go/enclave/rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,18 +148,13 @@ func (s *RPCServer) SubmitL1Block(ctx context.Context, request *generated.Submit
return nil, err
}

txReceiptsAndBlobs, err := s.decodeReceiptsAndBlobs(request.EncodedReceipts)
if err != nil {
s.logger.Error("Error decoding receipts", log.ErrKey, err)
return nil, err
}
processedData, err := s.decodeProcessedData(request.EncodedProcessedData)
if err != nil {
s.logger.Error("Error decoding receipts", log.ErrKey, err)
return nil, err
}

blockSubmissionResponse, err := s.enclave.SubmitL1Block(ctx, bl, txReceiptsAndBlobs, processedData)
blockSubmissionResponse, err := s.enclave.SubmitL1Block(ctx, bl, processedData)
if err != nil {
var rejErr *errutil.BlockRejectError
isReject := errors.As(err, &rejErr)
Expand Down Expand Up @@ -441,18 +436,6 @@ func (s *RPCServer) decodeBlock(encodedBlock []byte) (*types.Header, error) {
return &block, nil
}

// decodeReceiptsAndBlobs - converts the rlp encoded bytes to receipts if possible.
func (s *RPCServer) decodeReceiptsAndBlobs(encodedReceipts []byte) ([]*common.TxAndReceiptAndBlobs, error) {
receipts := make([]*common.TxAndReceiptAndBlobs, 0)

err := rlp.DecodeBytes(encodedReceipts, &receipts)
if err != nil {
return nil, fmt.Errorf("unable to decode receipts, bytes=%x, err=%w", encodedReceipts, err)
}

return receipts, nil
}

// decodeProcessedData - converts the rlp encoded bytes to processed if possible.
func (s *RPCServer) decodeProcessedData(encodedData []byte) (*common.ProcessedL1Data, error) {
var processed common.ProcessedL1Data
Expand Down
6 changes: 3 additions & 3 deletions go/ethadapter/mgmtcontractlib/mgmt_contract_lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,9 @@ func (c *contractLibImpl) CreateBlobRollup(t *ethadapter.L1RollupTx) (types.TxDa
return nil, fmt.Errorf("failed to make sidecar: %w", err)
}

for i, b := range blobHashes {
println("MGMT CONTRACT creating blob with hashes ", b.Hex(), " at index: ", i)
}
//for i, b := range blobHashes {
// println("MGMT CONTRACT creating blob with hashes ", b.Hex(), " at index: ", i)
//}
return &types.BlobTx{
To: *c.addr,
Data: data,
Expand Down
47 changes: 39 additions & 8 deletions go/host/enclave/guardian.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,15 +466,21 @@ func (g *Guardian) submitL1Block(block *common.L1Block, isLatest bool) (bool, er
g.logger.Debug("Unable to submit block, enclave is busy processing data")
return false, nil
}
receipts, err := g.sl.L1Repo().FetchObscuroReceipts(block)
//receipts, err := g.sl.L1Repo().FetchObscuroReceipts(block)
//if err != nil {
// g.submitDataLock.Unlock() // lock must be released before returning
// return false, fmt.Errorf("could not fetch obscuro receipts for block=%s - %w", block.Hash(), err)
//}
//txsReceiptsAndBlobs, rollupTxs, contractAddressTxs := g.sl.L1Publisher().ExtractRelevantTenTransactions(block, receipts)
processedData, err := g.sl.L1Repo().ExtractTenTransactions(block)
if err != nil {
g.submitDataLock.Unlock() // lock must be released before returning
return false, fmt.Errorf("could not fetch obscuro receipts for block=%s - %w", block.Hash(), err)
return false, fmt.Errorf("could not extract ten transaction for block=%s - %w", block.Hash(), err)
}
txsReceiptsAndBlobs, rollupTxs, contractAddressTxs := g.sl.L1Publisher().ExtractRelevantTenTransactions(block, receipts)
processedData, err := g.sl.L1Repo().ExtractTenTransactions(block)

resp, err := g.enclaveClient.SubmitL1Block(context.Background(), block.Header(), txsReceiptsAndBlobs, processedData)
rollupTxs, syncContracts := g.getRollupsAndContractAddrTxs(*processedData)

resp, err := g.enclaveClient.SubmitL1Block(context.Background(), block.Header(), processedData)
g.submitDataLock.Unlock() // lock is only guarding the enclave call, so we can release it now
if err != nil {
if strings.Contains(err.Error(), errutil.ErrBlockAlreadyProcessed.Error()) {
Expand All @@ -494,7 +500,7 @@ func (g *Guardian) submitL1Block(block *common.L1Block, isLatest bool) (bool, er
}
// successfully processed block, update the state
g.state.OnProcessedBlock(block.Hash())
g.processL1BlockTransactions(block, rollupTxs, contractAddressTxs)
g.processL1BlockTransactions(block, rollupTxs, syncContracts)

if err != nil {
return false, fmt.Errorf("submitted block to enclave but could not store the block processing result. Cause: %w", err)
Expand All @@ -508,7 +514,7 @@ func (g *Guardian) submitL1Block(block *common.L1Block, isLatest bool) (bool, er
return true, nil
}

func (g *Guardian) processL1BlockTransactions(block *common.L1Block, rollupTxs []*ethadapter.L1RollupTx, contractAddressTxs []*ethadapter.L1SetImportantContractsTx) {
func (g *Guardian) processL1BlockTransactions(block *common.L1Block, rollupTxs []*ethadapter.L1RollupTx, syncContracts bool) {
// TODO (@will) this should be removed and pulled from the L1
err := g.storage.AddBlock(block.Header())
if err != nil {
Expand Down Expand Up @@ -536,7 +542,7 @@ func (g *Guardian) processL1BlockTransactions(block *common.L1Block, rollupTxs [
}
}

if len(contractAddressTxs) > 0 {
if syncContracts {
go func() {
err := g.sl.L1Publisher().ResyncImportantContracts()
if err != nil {
Expand Down Expand Up @@ -817,3 +823,28 @@ func (g *Guardian) evictEnclaveFromHAPool() {
}
go g.sl.Enclaves().EvictEnclave(g.enclaveID)
}

func (g *Guardian) getRollupsAndContractAddrTxs(processed common.ProcessedL1Data) ([]*ethadapter.L1RollupTx, bool) {
rollupTxs := make([]*ethadapter.L1RollupTx, 0)
var syncContracts bool
syncContracts = false

for _, txData := range processed.GetEvents(common.RollupTx) {
encodedRlp, err := ethadapter.DecodeBlobs(txData.Blobs)
if err != nil {
g.logger.Crit("could not decode blobs.", log.ErrKey, err)
continue
}

rlp := &ethadapter.L1RollupTx{
Rollup: encodedRlp,
}
rollupTxs = append(rollupTxs, rlp)
}

// if any contracts have been updated then we need to resync
if len(processed.GetEvents(common.SetImportantContractsTx)) > 0 {
syncContracts = true
}
return rollupTxs, syncContracts
}
Loading

0 comments on commit e4b03f0

Please sign in to comment.