Skip to content

Commit

Permalink
blockfetcher: add headerSizeMap to GetRange headers accordingly
Browse files Browse the repository at this point in the history
Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Jan 31, 2025
1 parent b2e4c2c commit 18c8d20
Showing 1 changed file with 55 additions and 19 deletions.
74 changes: 55 additions & 19 deletions pkg/services/blockfetcher/blockfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ func (p poolWrapper) Close() error {
return nil
}

type indexedOID struct {
Index int
OID oid.ID
}

// Service is a service that fetches blocks from NeoFS.
type Service struct {
// isActive denotes whether the service is working or in the process of shutdown.
Expand All @@ -75,15 +80,15 @@ type Service struct {
operationMode OperationMode

stateRootInHeader bool
// headerSize is the size of the header in bytes.
headerSize int
// headerSizeMap is a map of height to expected header size.
headerSizeMap map[int]int

chain Ledger
pool poolWrapper
enqueue func(block bqueue.Queueable) error
account *wallet.Account

oidsCh chan oid.ID
oidsCh chan indexedOID
// wg is a wait group for block downloaders.
wg sync.WaitGroup

Expand All @@ -101,7 +106,7 @@ type Service struct {
shutdownCallback func()

// Depends on the OperationMode, the following functions are set to the appropriate functions.
getFunc func(ctx context.Context, oid string) (io.ReadCloser, error)
getFunc func(ctx context.Context, oid string, index int) (io.ReadCloser, error)
readFunc func(rc io.ReadCloser) (any, error)
heightFunc func() uint32
}
Expand Down Expand Up @@ -168,7 +173,7 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, put fun
log: logger,
cfg: cfg,
operationMode: opt,
headerSize: getHeaderSize(chain.GetConfig()),
headerSizeMap: getHeaderSizeMap(chain.GetConfig()),

enqueue: put,
account: account,
Expand All @@ -184,13 +189,14 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, put fun
// * first full block of OIDs is processing by Downloader
// * second full block of OIDs is available to be fetched by Downloader immediately
// * third half-filled block of OIDs is being collected by OIDsFetcher.
oidsCh: make(chan oid.ID, 2*cfg.OIDBatchSize),
oidsCh: make(chan indexedOID, 2*cfg.OIDBatchSize),
}, nil
}

func getHeaderSize(chain config.Blockchain) int {
m := smartcontract.GetDefaultHonestNodeCount(int(chain.ValidatorsCount))
func getHeaderSizeMap(chain config.Blockchain) map[int]int {
headerSizeMap := make(map[int]int)
vs, _ := keys.NewPublicKeysFromStrings(chain.StandbyCommittee)
m := smartcontract.GetDefaultHonestNodeCount(int(chain.ValidatorsCount))
verification, _ := smartcontract.CreateDefaultMultiSigRedeemScript(vs[:chain.GetNumOfCNs(0)])
b := block.Header{
StateRootEnabled: chain.StateRootInHeader,
Expand All @@ -200,7 +206,22 @@ func getHeaderSize(chain config.Blockchain) int {
InvocationScript: make([]byte, 66*m),
},
}
return b.GetExpectedHeaderSize()
headerSizeMap[0] = b.GetExpectedHeaderSize()

for height := range chain.CommitteeHistory {
m = smartcontract.GetDefaultHonestNodeCount(chain.GetNumOfCNs(height))
verification, _ = smartcontract.CreateDefaultMultiSigRedeemScript(vs[:chain.GetNumOfCNs(height)])
b = block.Header{
StateRootEnabled: chain.StateRootInHeader,
Timestamp: 42,
Script: transaction.Witness{
VerificationScript: verification,
InvocationScript: make([]byte, 66*m),
},
}
headerSizeMap[int(height)] = b.GetExpectedHeaderSize()
}

Check warning on line 223 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L212-L223

Added lines #L212 - L223 were not covered by tests
return headerSizeMap
}

// Start runs the NeoFS BlockFetcher service.
Expand Down Expand Up @@ -288,11 +309,13 @@ func (bfs *Service) oidDownloader() {
func (bfs *Service) blockDownloader() {
defer bfs.wg.Done()

for blkOid := range bfs.oidsCh {
for indexedOid := range bfs.oidsCh {
index := indexedOid.Index
blkOid := indexedOid.OID

Check warning on line 314 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L312-L314

Added lines #L312 - L314 were not covered by tests
ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout)
defer cancel()

rc, err := bfs.getFunc(ctx, blkOid.String())
rc, err := bfs.getFunc(ctx, blkOid.String(), index)

Check warning on line 318 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L318

Added line #L318 was not covered by tests
if err != nil {
if isContextCanceledErr(err) {
return
Expand Down Expand Up @@ -358,15 +381,15 @@ func (bfs *Service) fetchOIDsFromIndexFiles() error {

blockCtx, blockCancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout)
defer blockCancel()
oidsRC, err := bfs.objectGet(blockCtx, blockOidsObject[0].String())
oidsRC, err := bfs.objectGet(blockCtx, blockOidsObject[0].String(), -1)

Check warning on line 384 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L384

Added line #L384 was not covered by tests
if err != nil {
if isContextCanceledErr(err) {
return nil
}
return fmt.Errorf("failed to fetch '%s' object with index %d: %w", bfs.cfg.IndexFileAttribute, startIndex, err)
}

err = bfs.streamBlockOIDs(oidsRC, int(skip))
err = bfs.streamBlockOIDs(oidsRC, int(startIndex), int(skip))

Check warning on line 392 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L392

Added line #L392 was not covered by tests
if err != nil {
if isContextCanceledErr(err) {
return nil
Expand All @@ -381,7 +404,7 @@ func (bfs *Service) fetchOIDsFromIndexFiles() error {
}

// streamBlockOIDs reads block OIDs from the read closer and sends them to the OIDs channel.
func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error {
func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, startIndex, skip int) error {

Check warning on line 407 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L407

Added line #L407 was not covered by tests
defer rc.Close()
oidBytes := make([]byte, oid.Size)
oidsProcessed := 0
Expand All @@ -408,7 +431,7 @@ func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error {
select {
case <-bfs.exiterToOIDDownloader:
return nil
case bfs.oidsCh <- oidBlock:
case bfs.oidsCh <- indexedOID{Index: startIndex*int(bfs.cfg.IndexFileSize) + oidsProcessed, OID: oidBlock}:

Check warning on line 434 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L434

Added line #L434 was not covered by tests
}

oidsProcessed++
Expand Down Expand Up @@ -453,12 +476,14 @@ func (bfs *Service) fetchOIDsBySearch() error {
bfs.log.Info(fmt.Sprintf("NeoFS BlockFetcher service: no block found with index %d, stopping", startIndex))
return nil
}
index := int(startIndex)

Check warning on line 479 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L479

Added line #L479 was not covered by tests
for _, oid := range blockOids {
select {
case <-bfs.exiterToOIDDownloader:
return nil
case bfs.oidsCh <- oid:
case bfs.oidsCh <- indexedOID{Index: index, OID: oid}:

Check warning on line 484 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L484

Added line #L484 was not covered by tests
}
index++ //Won't work properly if neofs.ObjectSearch results are not ordered.

Check warning on line 486 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L486

Added line #L486 was not covered by tests
}
startIndex += batchSize
}
Expand Down Expand Up @@ -592,7 +617,7 @@ func (bfs *Service) retry(action func() error) error {
return err
}

func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, error) {
func (bfs *Service) objectGet(ctx context.Context, oid string, index int) (io.ReadCloser, error) {

Check warning on line 620 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L620

Added line #L620 was not covered by tests
u, err := url.Parse(fmt.Sprintf("%s:%s/%s", neofs.URIScheme, bfs.cfg.ContainerID, oid))
if err != nil {
return nil, err
Expand All @@ -605,8 +630,19 @@ func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, e
return rc, err
}

func (bfs *Service) objectGetRange(ctx context.Context, oid string) (io.ReadCloser, error) {
u, err := url.Parse(fmt.Sprintf("%s:%s/%s/%s/%d|%d", neofs.URIScheme, bfs.cfg.ContainerID, oid, "range", 0, bfs.headerSize))
func (bfs *Service) objectGetRange(ctx context.Context, oid string, height int) (io.ReadCloser, error) {
nearestHeight := 0
for h := range bfs.headerSizeMap {
if h <= height && h > nearestHeight {
nearestHeight = h
}
if nearestHeight >= height {
break

Check warning on line 640 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L633-L640

Added lines #L633 - L640 were not covered by tests
}
}

size := bfs.headerSizeMap[nearestHeight]
u, err := url.Parse(fmt.Sprintf("%s:%s/%s/%s/%d|%d", neofs.URIScheme, bfs.cfg.ContainerID, oid, "range", 0, size))
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 18c8d20

Please sign in to comment.