From 0997a77ae357b7a9b744b3d99e751f202dcf8059 Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Wed, 29 Jan 2025 20:00:15 +0300 Subject: [PATCH] blockfetcher: add headerSizeMap according to CommitteeHistory Signed-off-by: Ekaterina Pavlova --- pkg/services/blockfetcher/blockfetcher.go | 74 +++++++++++++++++------ 1 file changed, 55 insertions(+), 19 deletions(-) diff --git a/pkg/services/blockfetcher/blockfetcher.go b/pkg/services/blockfetcher/blockfetcher.go index 3717673441..928b101f72 100644 --- a/pkg/services/blockfetcher/blockfetcher.go +++ b/pkg/services/blockfetcher/blockfetcher.go @@ -65,6 +65,11 @@ func (p poolWrapper) Close() error { return nil } +type indexedOID struct { + Index int + OID oid.ID +} + // Service is a service that fetches blocks from NeoFS. type Service struct { // isActive denotes whether the service is working or in the process of shutdown. @@ -75,15 +80,15 @@ type Service struct { operationMode OperationMode stateRootInHeader bool - // headerSize is the size of the header in bytes. - headerSize int + // headerSizeMap is a map of height to expected header size. + headerSizeMap map[int]int chain Ledger pool poolWrapper enqueue func(block bqueue.Queueable) error account *wallet.Account - oidsCh chan oid.ID + oidsCh chan indexedOID // wg is a wait group for block downloaders. wg sync.WaitGroup @@ -101,7 +106,7 @@ type Service struct { shutdownCallback func() // Depends on the OperationMode, the following functions are set to the appropriate functions. - getFunc func(ctx context.Context, oid string) (io.ReadCloser, error) + getFunc func(ctx context.Context, oid string, index int) (io.ReadCloser, error) readFunc func(rc io.ReadCloser) (any, error) heightFunc func() uint32 } @@ -168,7 +173,7 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, put fun log: logger, cfg: cfg, operationMode: opt, - headerSize: getHeaderSize(chain.GetConfig()), + headerSizeMap: getHeaderSizeMap(chain.GetConfig()), enqueue: put, account: account, @@ -184,13 +189,14 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, put fun // * first full block of OIDs is processing by Downloader // * second full block of OIDs is available to be fetched by Downloader immediately // * third half-filled block of OIDs is being collected by OIDsFetcher. - oidsCh: make(chan oid.ID, 2*cfg.OIDBatchSize), + oidsCh: make(chan indexedOID, 2*cfg.OIDBatchSize), }, nil } -func getHeaderSize(chain config.Blockchain) int { - m := smartcontract.GetDefaultHonestNodeCount(int(chain.ValidatorsCount)) +func getHeaderSizeMap(chain config.Blockchain) map[int]int { + headerSizeMap := make(map[int]int) vs, _ := keys.NewPublicKeysFromStrings(chain.StandbyCommittee) + m := smartcontract.GetDefaultHonestNodeCount(int(chain.ValidatorsCount)) verification, _ := smartcontract.CreateDefaultMultiSigRedeemScript(vs[:chain.GetNumOfCNs(0)]) b := block.Header{ StateRootEnabled: chain.StateRootInHeader, @@ -200,7 +206,22 @@ func getHeaderSize(chain config.Blockchain) int { InvocationScript: make([]byte, 66*m), }, } - return b.GetExpectedHeaderSize() + headerSizeMap[0] = b.GetExpectedHeaderSize() + + for height := range chain.CommitteeHistory { + m = smartcontract.GetDefaultHonestNodeCount(chain.GetNumOfCNs(height)) + verification, _ = smartcontract.CreateDefaultMultiSigRedeemScript(vs[:chain.GetNumOfCNs(height)]) + b = block.Header{ + StateRootEnabled: chain.StateRootInHeader, + Timestamp: 42, + Script: transaction.Witness{ + VerificationScript: verification, + InvocationScript: make([]byte, 66*m), + }, + } + headerSizeMap[int(height)] = b.GetExpectedHeaderSize() + } + return headerSizeMap } // Start runs the NeoFS BlockFetcher service. @@ -288,11 +309,13 @@ func (bfs *Service) oidDownloader() { func (bfs *Service) blockDownloader() { defer bfs.wg.Done() - for blkOid := range bfs.oidsCh { + for indexedOid := range bfs.oidsCh { + index := indexedOid.Index + blkOid := indexedOid.OID ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout) defer cancel() - rc, err := bfs.getFunc(ctx, blkOid.String()) + rc, err := bfs.getFunc(ctx, blkOid.String(), index) if err != nil { if isContextCanceledErr(err) { return @@ -358,7 +381,7 @@ func (bfs *Service) fetchOIDsFromIndexFiles() error { blockCtx, blockCancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout) defer blockCancel() - oidsRC, err := bfs.objectGet(blockCtx, blockOidsObject[0].String()) + oidsRC, err := bfs.objectGet(blockCtx, blockOidsObject[0].String(), -1) if err != nil { if isContextCanceledErr(err) { return nil @@ -366,7 +389,7 @@ func (bfs *Service) fetchOIDsFromIndexFiles() error { return fmt.Errorf("failed to fetch '%s' object with index %d: %w", bfs.cfg.IndexFileAttribute, startIndex, err) } - err = bfs.streamBlockOIDs(oidsRC, int(skip)) + err = bfs.streamBlockOIDs(oidsRC, int(startIndex), int(skip)) if err != nil { if isContextCanceledErr(err) { return nil @@ -381,7 +404,7 @@ func (bfs *Service) fetchOIDsFromIndexFiles() error { } // streamBlockOIDs reads block OIDs from the read closer and sends them to the OIDs channel. -func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error { +func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, startIndex, skip int) error { defer rc.Close() oidBytes := make([]byte, oid.Size) oidsProcessed := 0 @@ -408,7 +431,7 @@ func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error { select { case <-bfs.exiterToOIDDownloader: return nil - case bfs.oidsCh <- oidBlock: + case bfs.oidsCh <- indexedOID{Index: startIndex*int(bfs.cfg.IndexFileSize) + oidsProcessed, OID: oidBlock}: } oidsProcessed++ @@ -453,12 +476,14 @@ func (bfs *Service) fetchOIDsBySearch() error { bfs.log.Info(fmt.Sprintf("NeoFS BlockFetcher service: no block found with index %d, stopping", startIndex)) return nil } + index := int(startIndex) for _, oid := range blockOids { select { case <-bfs.exiterToOIDDownloader: return nil - case bfs.oidsCh <- oid: + case bfs.oidsCh <- indexedOID{Index: index, OID: oid}: } + index++ //Won't work properly if neofs.ObjectSearch results are not ordered. } startIndex += batchSize } @@ -592,7 +617,7 @@ func (bfs *Service) retry(action func() error) error { return err } -func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, error) { +func (bfs *Service) objectGet(ctx context.Context, oid string, index int) (io.ReadCloser, error) { u, err := url.Parse(fmt.Sprintf("%s:%s/%s", neofs.URIScheme, bfs.cfg.ContainerID, oid)) if err != nil { return nil, err @@ -605,8 +630,19 @@ func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, e return rc, err } -func (bfs *Service) objectGetRange(ctx context.Context, oid string) (io.ReadCloser, error) { - u, err := url.Parse(fmt.Sprintf("%s:%s/%s/%s/%d|%d", neofs.URIScheme, bfs.cfg.ContainerID, oid, "range", 0, bfs.headerSize)) +func (bfs *Service) objectGetRange(ctx context.Context, oid string, height int) (io.ReadCloser, error) { + nearestHeight := 0 + for h := range bfs.headerSizeMap { + if h <= height && h > nearestHeight { + nearestHeight = h + } + if nearestHeight >= height { + break + } + } + + size := bfs.headerSizeMap[nearestHeight] + u, err := url.Parse(fmt.Sprintf("%s:%s/%s/%s/%d|%d", neofs.URIScheme, bfs.cfg.ContainerID, oid, "range", 0, size)) if err != nil { return nil, err }