Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP archival trimming #3876

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions core/eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package core
import (
"context"
"fmt"
"time"

"github.com/tendermint/tendermint/types"

Expand All @@ -15,9 +16,8 @@ import (
"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/pruner/full"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/availability"
"github.com/celestiaorg/celestia-node/store"
)

Expand Down Expand Up @@ -61,16 +61,17 @@ func storeEDS(
eh *header.ExtendedHeader,
eds *rsmt2d.ExtendedDataSquare,
store *store.Store,
window pruner.AvailabilityWindow,
window time.Duration,
pruningEnabled bool,
) error {
if !pruner.IsWithinAvailabilityWindow(eh.Time(), window) {
if pruningEnabled && !availability.IsWithinWindow(eh.Time(), window) {
log.Debugw("skipping storage of historic block", "height", eh.Height())
return nil
}

var err error
// archival nodes should not store Q4 outside the availability window.
if pruner.IsWithinAvailabilityWindow(eh.Time(), full.Window) {
if availability.IsWithinWindow(eh.Time(), window) {
err = store.PutODSQ4(ctx, eh.DAH, eh.Height(), eds)
} else {
err = store.PutODS(ctx, eh.DAH, eh.Height(), eds)
Expand Down
9 changes: 5 additions & 4 deletions core/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
libhead "github.com/celestiaorg/go-header"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/store"
)

Expand All @@ -22,7 +21,8 @@ type Exchange struct {
store *store.Store
construct header.ConstructFn

availabilityWindow pruner.AvailabilityWindow
availabilityWindow time.Duration
pruningEnabled bool

metrics *exchangeMetrics
}
Expand Down Expand Up @@ -54,6 +54,7 @@ func NewExchange(
store: store,
construct: construct,
availabilityWindow: p.availabilityWindow,
pruningEnabled: p.pruningEnabled,
metrics: metrics,
}, nil
}
Expand Down Expand Up @@ -147,7 +148,7 @@ func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.Extende
&block.Height, hash, eh.Hash())
}

err = storeEDS(ctx, eh, eds, ce.store, ce.availabilityWindow)
err = storeEDS(ctx, eh, eds, ce.store, ce.availabilityWindow, ce.pruningEnabled)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -183,7 +184,7 @@ func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64
panic(fmt.Errorf("constructing extended header for height %d: %w", b.Header.Height, err))
}

err = storeEDS(ctx, eh, eds, ce.store, ce.availabilityWindow)
err = storeEDS(ctx, eh, eds, ce.store, ce.availabilityWindow, ce.pruningEnabled)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions core/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/celestiaorg/celestia-app/v3/test/util/testnode"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/store"
)
Expand Down Expand Up @@ -82,7 +81,8 @@ func TestExchange_DoNotStoreHistoric(t *testing.T) {
fetcher,
store,
header.MakeExtendedHeader,
WithAvailabilityWindow(pruner.AvailabilityWindow(time.Nanosecond)), // all blocks will be "historic"
WithAvailabilityWindow(time.Nanosecond), // all blocks will be "historic"
WithPruningEnabled(),
)
require.NoError(t, err)

Expand Down
10 changes: 6 additions & 4 deletions core/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
libhead "github.com/celestiaorg/go-header"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub"
"github.com/celestiaorg/celestia-node/store"
)
Expand All @@ -36,9 +35,11 @@ var (
type Listener struct {
fetcher *BlockFetcher

construct header.ConstructFn
construct header.ConstructFn

store *store.Store
availabilityWindow pruner.AvailabilityWindow
availabilityWindow time.Duration
pruningEnabled bool

headerBroadcaster libhead.Broadcaster[*header.ExtendedHeader]
hashBroadcaster shrexsub.BroadcastFn
Expand Down Expand Up @@ -84,6 +85,7 @@ func NewListener(
construct: construct,
store: store,
availabilityWindow: p.availabilityWindow,
pruningEnabled: p.pruningEnabled,
listenerTimeout: 5 * blocktime,
metrics: metrics,
chainID: p.chainID,
Expand Down Expand Up @@ -238,7 +240,7 @@ func (cl *Listener) handleNewSignedBlock(ctx context.Context, b types.EventDataS
panic(fmt.Errorf("making extended header: %w", err))
}

err = storeEDS(ctx, eh, eds, cl.store, cl.availabilityWindow)
err = storeEDS(ctx, eh, eds, cl.store, cl.availabilityWindow, cl.pruningEnabled)
if err != nil {
return fmt.Errorf("storing EDS: %w", err)
}
Expand Down
3 changes: 1 addition & 2 deletions core/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/celestiaorg/celestia-node/header"
nodep2p "github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub"
"github.com/celestiaorg/celestia-node/store"
)
Expand Down Expand Up @@ -118,7 +117,7 @@ func TestListener_DoesNotStoreHistoric(t *testing.T) {
require.NoError(t, err)

// create Listener and start listening
opt := WithAvailabilityWindow(pruner.AvailabilityWindow(time.Nanosecond))
opt := WithAvailabilityWindow(time.Nanosecond)
cl := createListener(ctx, t, fetcher, ps0, eds, store, testChainID, opt)

dataRoots := generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx)
Expand Down
25 changes: 18 additions & 7 deletions core/option.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
package core

import (
"time"

"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/pruner/archival"
"github.com/celestiaorg/celestia-node/share/availability"
)

type Option func(*params)

type params struct {
metrics bool
chainID string
availabilityWindow pruner.AvailabilityWindow
metrics bool
chainID string

availabilityWindow time.Duration
pruningEnabled bool
}

func defaultParams() params {
return params{
availabilityWindow: archival.Window,
availabilityWindow: availability.StorageWindow,
// TODO @renaynay: eventually once pruning is default, set to true
pruningEnabled: false,
}
}

Expand All @@ -34,8 +39,14 @@ func WithChainID(id p2p.Network) Option {
}
}

func WithAvailabilityWindow(window pruner.AvailabilityWindow) Option {
func WithAvailabilityWindow(window time.Duration) Option {
return func(p *params) {
p.availabilityWindow = window
}
}

func WithPruningEnabled() Option {
return func(p *params) {
p.pruningEnabled = true
}
}
4 changes: 2 additions & 2 deletions das/daser.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
libhead "github.com/celestiaorg/go-header"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/availability"
"github.com/celestiaorg/celestia-node/share/eds/byzantine"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub"
)
Expand Down Expand Up @@ -162,7 +162,7 @@ func (d *DASer) Stop(ctx context.Context) error {
func (d *DASer) sample(ctx context.Context, h *header.ExtendedHeader) error {
// short-circuit if pruning is enabled and the header is outside the
// availability window
if !pruner.IsWithinAvailabilityWindow(h.Time(), d.params.samplingWindow) {
if d.params.pruningEnabled && !availability.IsWithinWindow(h.Time(), d.params.samplingWindow) {
log.Debugw("skipping header outside sampling window", "height", h.Height(),
"time", h.Time())
return errOutsideSamplingWindow
Expand Down
6 changes: 3 additions & 3 deletions das/daser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/header/headertest"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/availability"
"github.com/celestiaorg/celestia-node/share/availability/mocks"
"github.com/celestiaorg/celestia-node/share/eds/edstest"
)
Expand Down Expand Up @@ -254,7 +254,7 @@ func TestDASer_SamplingWindow(t *testing.T) {

// create and start DASer
daser, err := NewDASer(avail, sub, getter, ds, fserv, newBroadcastMock(1),
WithSamplingWindow(pruner.AvailabilityWindow(time.Second)))
WithSamplingWindow(time.Second))
require.NoError(t, err)

tests := []struct {
Expand All @@ -276,7 +276,7 @@ func TestDASer_SamplingWindow(t *testing.T) {
assert.Equal(
t,
tt.withinWindow,
pruner.IsWithinAvailabilityWindow(eh.Time(), daser.params.samplingWindow),
availability.IsWithinWindow(eh.Time(), daser.params.samplingWindow),
)
})
}
Expand Down
17 changes: 13 additions & 4 deletions das/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"errors"
"fmt"
"time"

"github.com/celestiaorg/celestia-node/pruner"
)

// ErrInvalidOption is an error that is returned by Parameters.Validate
Expand Down Expand Up @@ -47,7 +45,10 @@ type Parameters struct {
// samplingWindow determines the time window that headers should fall into
// in order to be sampled. If set to 0, the sampling window will include
// all headers.
samplingWindow pruner.AvailabilityWindow
samplingWindow time.Duration
// pruningEnabled determines whether the DASer should skip sampling over
// headers that are outside the samplingWindow
pruningEnabled bool
}

// DefaultParameters returns the default configuration values for the daser parameters
Expand Down Expand Up @@ -166,8 +167,16 @@ func WithSampleTimeout(sampleTimeout time.Duration) Option {

// WithSamplingWindow is a functional option to configure the DASer's
// `samplingWindow` parameter.
func WithSamplingWindow(samplingWindow pruner.AvailabilityWindow) Option {
func WithSamplingWindow(samplingWindow time.Duration) Option {
return func(d *DASer) {
d.params.samplingWindow = samplingWindow
}
}

// WithPruningEnabled is a functional option to configure whether the
// DASer will sample over headers that are outside the samplingWindow.
func WithPruningEnabled() Option {
return func(d *DASer) {
d.params.pruningEnabled = true
}
}
17 changes: 14 additions & 3 deletions header/headertest/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/share"
)

Expand All @@ -43,7 +44,7 @@ type TestSuite struct {
}

func NewStore(t *testing.T) libhead.Store[*header.ExtendedHeader] {
return headertest.NewStore[*header.ExtendedHeader](t, NewTestSuite(t, 3, 0), 10)
return headertest.NewStore[*header.ExtendedHeader](t, NewTestSuite(t, 3, p2p.BlockTime), 10)
}

func NewCustomStore(
Expand Down Expand Up @@ -225,12 +226,22 @@ func (s *TestSuite) nextProposer() *types.Validator {
// RandExtendedHeader provides an ExtendedHeader fixture.
func RandExtendedHeader(t testing.TB) *header.ExtendedHeader {
timestamp := time.Now().UTC()
return RandExtendedHeaderAtTimestamp(t, timestamp)
return RandExtendedHeaderAtTimestamp(t, timestamp, nil)
}

func RandExtendedHeaderAtTimestamp(t testing.TB, timestamp time.Time) *header.ExtendedHeader {
func RandExtendedHeaderAtTimestamp(
t testing.TB,
timestamp time.Time,
eds *rsmt2d.ExtendedDataSquare,
) *header.ExtendedHeader {
dah := share.EmptyEDSRoots()

if eds != nil {
roots, err := share.NewAxisRoots(eds)
require.NoError(t, err)
dah = roots
}

rh := RandRawHeader(t)
rh.DataHash = dah.Hash()

Expand Down
6 changes: 3 additions & 3 deletions nodebuilder/das/constructors.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
"github.com/celestiaorg/celestia-node/das"
"github.com/celestiaorg/celestia-node/header"
modfraud "github.com/celestiaorg/celestia-node/nodebuilder/fraud"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/availability"
"github.com/celestiaorg/celestia-node/share/eds/byzantine"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub"
)
Expand Down Expand Up @@ -45,10 +45,10 @@ func newDASer(
batching datastore.Batching,
fraudServ fraud.Service[*header.ExtendedHeader],
bFn shrexsub.BroadcastFn,
availWindow pruner.AvailabilityWindow,
availWindow availability.Window,
options ...das.Option,
) (*das.DASer, *modfraud.ServiceBreaker[*das.DASer, *header.ExtendedHeader], error) {
options = append(options, das.WithSamplingWindow(availWindow))
options = append(options, das.WithSamplingWindow(availWindow.Duration()))

ds, err := das.NewDASer(da, hsub, store, batching, fraudServ, bFn, options...)
if err != nil {
Expand Down
11 changes: 8 additions & 3 deletions nodebuilder/das/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/celestiaorg/celestia-node/header"
modfraud "github.com/celestiaorg/celestia-node/nodebuilder/fraud"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/nodebuilder/pruner"
)

func ConstructModule(tp node.Type, cfg *Config) fx.Option {
Expand All @@ -23,14 +24,18 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
fx.Supply(*cfg),
fx.Error(err),
fx.Provide(
func(c Config) []das.Option {
return []das.Option{
func(c Config, pruningCfg pruner.Config) []das.Option {
var opts []das.Option
if pruningCfg.EnableService {
opts = append(opts, das.WithPruningEnabled())
}
return append(opts, []das.Option{
das.WithSamplingRange(c.SamplingRange),
das.WithConcurrencyLimit(c.ConcurrencyLimit),
das.WithBackgroundStoreInterval(c.BackgroundStoreInterval),
das.WithSampleFrom(c.SampleFrom),
das.WithSampleTimeout(c.SampleTimeout),
}
}...)
},
),
)
Expand Down
Loading
Loading