diff --git a/api/handlers.go b/api/handlers.go index f1e1d6f7..dd92d3d4 100644 --- a/api/handlers.go +++ b/api/handlers.go @@ -259,9 +259,7 @@ func (s *Server) getEnrichingJetDrops(oldestPulse, newestPulse int64) ([]models. enrichedDrops = append(enrichedDrops, ejd...) } - nextPulse, err := s.storage.GetNextSavedPulse(models.Pulse{ - PulseNumber: newestPulse, - }) + nextPulse, err := s.storage.GetNextSavedPulse(models.Pulse{PulseNumber: newestPulse}, false) if err == nil && nextPulse != emptyPulse { ejd, err := s.storage.GetJetDrops(models.Pulse{PulseNumber: nextPulse.PulseNumber}) if err != nil { diff --git a/cmd/block-explorer/main.go b/cmd/block-explorer/main.go index 9dcf6d7d..f9f995b0 100644 --- a/cmd/block-explorer/main.go +++ b/cmd/block-explorer/main.go @@ -83,6 +83,7 @@ func main() { 100, cfg.Replicator.ContinuousPulseRetrievingHalfPulseSeconds, int32(cfg.Replicator.ParallelConnections), + cfg.Replicator.QueueLen, pulseExtractor, exporter.NewRecordExporterClient(client.GetGRPCConn()), shutdownBE, @@ -98,7 +99,10 @@ func main() { } }() - mainNetTransformer := transformer.NewMainNetTransformer(platformExtractor.GetJetDrops(ctx)) + mainNetTransformer := transformer.NewMainNetTransformer( + platformExtractor.GetJetDrops(ctx), + cfg.Transformer.QueueLen, + ) err = mainNetTransformer.Start(ctx) if err != nil { logger.Fatal("cannot start transformer: ", err) @@ -128,7 +132,7 @@ func main() { repository := storage.NewStorage(db) - gbeController, err := controller.NewController(cfg.Controller, platformExtractor, repository) + gbeController, err := controller.NewController(cfg.Controller, platformExtractor, repository, cfg.Replicator.PlatformVersion) if err != nil { logger.Fatal("cannot initialize gbeController: ", err) } diff --git a/configuration/configuration.go b/configuration/configuration.go index d5c5323b..cc15c1cf 100644 --- a/configuration/configuration.go +++ b/configuration/configuration.go @@ -17,13 +17,14 @@ func init() { } type BlockExplorer struct { - Log Log - DB DB - Replicator Replicator - Controller Controller - Processor Processor - Metrics Metrics - Profefe Profefe + Log Log + DB DB + Replicator Replicator + Controller Controller + Processor Processor + Transformer Transformer + Metrics Metrics + Profefe Profefe } type API struct { @@ -53,11 +54,13 @@ type TestDB struct { // Replicator represents a configuration of the Platform connection type Replicator struct { + PlatformVersion int `insconfig:"1| Platform version, can be 1 or 2"` Addr string `insconfig:"127.0.0.1:5678| The gRPC server address"` MaxTransportMsg int `insconfig:"1073741824| Maximum message size the client can send"` WaitForConnectionRecoveryTimeout time.Duration `insconfig:"30s| Connection recovery timeout"` ContinuousPulseRetrievingHalfPulseSeconds uint32 `insconfig:"5| Half pulse in seconds"` ParallelConnections uint32 `insconfig:"100| Maximum parallel pulse retrievers"` + QueueLen uint32 `insconfig:"500| Max elements in extractor queue"` Auth Auth } @@ -115,6 +118,11 @@ type Processor struct { Workers int `insconfig:"200| The count of workers for processing transformed data"` } +// Transformer transforms raw platform data to canonical GBE data types +type Transformer struct { + QueueLen uint32 `insconfig:"500| Max elements in transformer queue"` +} + type Profefe struct { StartAgent bool `insconfig:"true| if true, start the profefe agent"` Address string `insconfig:"http://127.0.0.1:10100| Profefe collector public address to send profiling data"` diff --git a/etl/controller/controller.go b/etl/controller/controller.go index b2887203..4b69297a 100644 --- a/etl/controller/controller.go +++ b/etl/controller/controller.go @@ -12,6 +12,7 @@ import ( "github.com/insolar/block-explorer/configuration" "github.com/insolar/block-explorer/etl/models" + "github.com/insolar/block-explorer/instrumentation/belogger" "github.com/pkg/errors" @@ -37,21 +38,32 @@ type Controller struct { // sequentialPulse is greatest complete pulse after which all pulses complete too sequentialPulse models.Pulse sequentialPulseLock sync.RWMutex + + // incompletePulseCounter for penv-615 + incompletePulseCounter int + platformVersion int } // NewController returns implementation of interfaces.Controller -func NewController(cfg configuration.Controller, extractor interfaces.JetDropsExtractor, storage interfaces.Storage) (*Controller, error) { +func NewController(cfg configuration.Controller, extractor interfaces.JetDropsExtractor, storage interfaces.Storage, pv int) (*Controller, error) { c := &Controller{ cfg: cfg, extractor: extractor, storage: storage, jetDropRegister: make(map[types.Pulse]map[string]struct{}), missedDataManager: NewMissedDataManager(time.Second*time.Duration(cfg.ReloadPeriod), time.Second*time.Duration(cfg.ReloadCleanPeriod)), + platformVersion: pv, } + return c, nil +} + +func (c *Controller) setIncompletePulses(ctx context.Context) error { pulses, err := c.storage.GetIncompletePulses() if err != nil { - return nil, errors.Wrap(err, "can't get not complete pulses from storage") + return errors.Wrap(err, "can't get not complete pulses from storage") } + log := belogger.FromContext(ctx) + log.Debugf("Found %d incomplete pulses in db", len(pulses)) for _, p := range pulses { key := types.Pulse{PulseNo: p.PulseNumber, PrevPulseNumber: p.PrevPulseNumber, NextPulseNumber: p.NextPulseNumber} func() { @@ -61,17 +73,23 @@ func NewController(cfg configuration.Controller, extractor interfaces.JetDropsEx }() jetDrops, err := c.storage.GetJetDrops(p) if err != nil { - return nil, errors.Wrapf(err, "can't get jetDrops for pulse %d from storage", p.PulseNumber) + return errors.Wrapf(err, "can't get jetDrops for pulse %d from storage", p.PulseNumber) } for _, jd := range jetDrops { c.SetJetDropData(key, jd.JetID) } } + return nil +} + +func (c *Controller) setSeqPulse(ctx context.Context) error { + log := belogger.FromContext(ctx) c.sequentialPulseLock.Lock() defer c.sequentialPulseLock.Unlock() + var err error c.sequentialPulse, err = c.storage.GetSequentialPulse() if err != nil { - return nil, errors.Wrap(err, "can't get sequential pulse from storage") + return errors.Wrap(err, "can't get sequential pulse from storage") } emptyPulse := models.Pulse{} if c.sequentialPulse == emptyPulse { @@ -79,11 +97,20 @@ func NewController(cfg configuration.Controller, extractor interfaces.JetDropsEx PulseNumber: 0, } } - return c, nil + log.Debugf("Found last sequential pulse %d in db", c.sequentialPulse.PulseNumber) + return nil } // Start implements interfaces.Starter func (c *Controller) Start(ctx context.Context) error { + err := c.setIncompletePulses(ctx) + if err != nil { + return err + } + err = c.setSeqPulse(ctx) + if err != nil { + return err + } ctx, c.cancelFunc = context.WithCancel(ctx) c.missedDataManager.Start() go c.pulseMaintainer(ctx) diff --git a/etl/controller/controller_test.go b/etl/controller/controller_test.go index d8d36ee2..686374d6 100644 --- a/etl/controller/controller_test.go +++ b/etl/controller/controller_test.go @@ -8,6 +8,7 @@ package controller import ( + "context" "sync" "testing" @@ -22,6 +23,7 @@ import ( ) var cfg = configuration.Controller{PulsePeriod: 10, ReloadPeriod: 10, ReloadCleanPeriod: 1, SequentialPeriod: 1} +var platformVersion = 2 func TestNewController_NoPulses(t *testing.T) { extractor := mock.NewJetDropsExtractorMock(t) @@ -29,12 +31,18 @@ func TestNewController_NoPulses(t *testing.T) { sm := mock.NewStorageMock(t) sm.GetIncompletePulsesMock.Return(nil, nil) sm.GetSequentialPulseMock.Return(models.Pulse{}, nil) + sm.GetPulseByPrevMock.Return(models.Pulse{}, nil) + sm.GetNextSavedPulseMock.Return(models.Pulse{}, nil) + extractor.LoadJetDropsMock.Return(nil) - c, err := NewController(cfg, extractor, sm) + c, err := NewController(cfg, extractor, sm, platformVersion) require.NoError(t, err) require.NotNil(t, c) require.Empty(t, c.jetDropRegister) require.NotNil(t, c.missedDataManager) + err = c.Start(context.Background()) + require.NoError(t, err) + require.Equal(t, uint64(1), sm.GetIncompletePulsesAfterCounter()) } @@ -50,11 +58,16 @@ func TestNewController_OneNotCompletePulse(t *testing.T) { sm.GetIncompletePulsesMock.Return([]models.Pulse{{PulseNumber: pulseNumber}}, nil) sm.GetJetDropsMock.Return([]models.JetDrop{{JetID: firstJetID}, {JetID: secondJetID}}, nil) sm.GetSequentialPulseMock.Return(models.Pulse{}, nil) + sm.GetPulseByPrevMock.Return(models.Pulse{}, nil) + sm.GetNextSavedPulseMock.Return(models.Pulse{}, nil) + extractor.LoadJetDropsMock.Return(nil) - c, err := NewController(cfg, extractor, sm) + c, err := NewController(cfg, extractor, sm, platformVersion) require.NoError(t, err) require.NotNil(t, c) require.NotNil(t, c.missedDataManager) + err = c.Start(context.Background()) + require.NoError(t, err) require.Equal(t, expectedData, c.jetDropRegister) @@ -72,11 +85,16 @@ func TestNewController_OneNotCompletePulse_NoJets(t *testing.T) { sm.GetIncompletePulsesMock.Return([]models.Pulse{{PulseNumber: pulseNumber}}, nil) sm.GetJetDropsMock.Return([]models.JetDrop{}, nil) sm.GetSequentialPulseMock.Return(models.Pulse{}, nil) + sm.GetPulseByPrevMock.Return(models.Pulse{}, nil) + sm.GetNextSavedPulseMock.Return(models.Pulse{}, nil) + extractor.LoadJetDropsMock.Return(nil) - c, err := NewController(cfg, extractor, sm) + c, err := NewController(cfg, extractor, sm, platformVersion) require.NoError(t, err) require.NotNil(t, c) require.NotNil(t, c.missedDataManager) + err = c.Start(context.Background()) + require.NoError(t, err) require.Equal(t, expectedData, c.jetDropRegister) @@ -109,11 +127,16 @@ func TestNewController_SeveralNotCompletePulses(t *testing.T) { sm.GetIncompletePulsesMock.Return([]models.Pulse{{PulseNumber: firstPulseNumber}, {PulseNumber: secondPulseNumber}}, nil) sm.GetJetDropsMock.Set(getJetDrops) sm.GetSequentialPulseMock.Return(models.Pulse{}, nil) + sm.GetPulseByPrevMock.Return(models.Pulse{}, nil) + sm.GetNextSavedPulseMock.Return(models.Pulse{}, nil) + extractor.LoadJetDropsMock.Return(nil) - c, err := NewController(cfg, extractor, sm) + c, err := NewController(cfg, extractor, sm, platformVersion) require.NoError(t, err) require.NotNil(t, c) require.NotNil(t, c.missedDataManager) + err = c.Start(context.Background()) + require.NoError(t, err) require.Equal(t, expectedData, c.jetDropRegister) @@ -127,11 +150,16 @@ func TestNewController_ErrorGetPulses(t *testing.T) { sm := mock.NewStorageMock(t) sm.GetIncompletePulsesMock.Return(nil, errors.New("test error")) sm.GetSequentialPulseMock.Return(models.Pulse{}, nil) + sm.GetPulseByPrevMock.Return(models.Pulse{}, nil) + sm.GetNextSavedPulseMock.Return(models.Pulse{}, nil) + extractor.LoadJetDropsMock.Return(nil) - c, err := NewController(cfg, extractor, sm) + c, err := NewController(cfg, extractor, sm, platformVersion) + require.NoError(t, err) + err = c.Start(context.Background()) require.Error(t, err) require.Contains(t, err.Error(), "test error") - require.Nil(t, c) + require.NotNil(t, c) require.Equal(t, uint64(1), sm.GetIncompletePulsesAfterCounter()) } @@ -144,11 +172,16 @@ func TestNewController_ErrorGetJetDrops(t *testing.T) { sm.GetIncompletePulsesMock.Return([]models.Pulse{{PulseNumber: pulseNumber}}, nil) sm.GetJetDropsMock.Return(nil, errors.New("test error")) sm.GetSequentialPulseMock.Return(models.Pulse{}, nil) + sm.GetPulseByPrevMock.Return(models.Pulse{}, nil) + sm.GetNextSavedPulseMock.Return(models.Pulse{}, nil) + extractor.LoadJetDropsMock.Return(nil) - c, err := NewController(cfg, extractor, sm) + c, err := NewController(cfg, extractor, sm, platformVersion) + require.NoError(t, err) + err = c.Start(context.Background()) require.Error(t, err) require.Contains(t, err.Error(), "test error") - require.Nil(t, c) + require.NotNil(t, c) require.Equal(t, uint64(1), sm.GetIncompletePulsesAfterCounter()) require.Equal(t, uint64(1), sm.GetJetDropsAfterCounter()) } diff --git a/etl/controller/metrics.go b/etl/controller/metrics.go index fc50cc31..53bce82a 100644 --- a/etl/controller/metrics.go +++ b/etl/controller/metrics.go @@ -20,10 +20,6 @@ var ( Name: "gbe_controller_current_seq_pulse", Help: "Current sequentual pulse rerequested from platform", }) - CurrentIncompletePulse = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "gbe_controller_current_incomplete_pulse", - Help: "Current incomplete pulse that records are rerequested from platform", - }) PulseCompleteCounter = prometheus.NewCounter(prometheus.CounterOpts{ Name: "gbe_controller_pulse_complete_counter", Help: "How many pulses is completed by 'pulseIsComplete' check", @@ -44,7 +40,6 @@ func (s Metrics) Metrics(p *metrics.Prometheus) []prometheus.Collector { return []prometheus.Collector{ IncompletePulsesQueue, CurrentSeqPulse, - CurrentIncompletePulse, PulseNotCompleteCounter, PulseCompleteCounter, } diff --git a/etl/controller/pulsemaintainer.go b/etl/controller/pulsemaintainer.go index 87c6eec1..8e17aa11 100644 --- a/etl/controller/pulsemaintainer.go +++ b/etl/controller/pulsemaintainer.go @@ -33,6 +33,7 @@ func (c *Controller) pulseMaintainer(ctx context.Context) { } func eraseJetDropRegister(ctx context.Context, c *Controller, log log.Logger) { + log.Debugf("pulseMaintainer(): eraseJetDropRegister start") jetDropRegisterCopy := map[types.Pulse]map[string]struct{}{} func() { c.jetDropRegisterLock.Lock() @@ -70,10 +71,19 @@ func eraseJetDropRegister(ctx context.Context, c *Controller, log log.Logger) { } } else { PulseNotCompleteCounter.Inc() - log.Debugf("Pulse %d not completed, reloading", p.PulseNo) - c.reloadData(ctx, p.PrevPulseNumber, p.PulseNo) - CurrentIncompletePulse.Set(float64(p.PrevPulseNumber)) + if c.platformVersion != 1 { + log.Debugf("Pulse %d not completed, reloading", p.PulseNo) + c.reloadData(ctx, p.PrevPulseNumber, p.PulseNo) + } + } + } + + if c.platformVersion == 1 { + if c.incompletePulseCounter == 1000 { + c.cleanJetDropRegister(ctx) + c.incompletePulseCounter = 0 } + c.incompletePulseCounter++ } } @@ -103,8 +113,24 @@ func (c *Controller) pulseSequence(ctx context.Context) { return } - if nextSequential == emptyPulse { - toPulse, err := c.storage.GetNextSavedPulse(c.sequentialPulse) + if nextSequential.IsComplete { + err = c.storage.SequencePulse(nextSequential.PulseNumber) + if err != nil { + log.Errorf("During sequence next sequential pulse: %s", err.Error()) + return + } + c.sequentialPulse = nextSequential + log.Infof("Pulse %d sequenced", nextSequential.PulseNumber) + waitTime = time.Duration(0) + return + } + + if !nextSequential.IsComplete || nextSequential == emptyPulse { + completed := false + if c.platformVersion == 1 { + completed = true + } + toPulse, err := c.storage.GetNextSavedPulse(c.sequentialPulse, completed) if err != nil && !gorm.IsRecordNotFoundError(err) { log.Errorf("During loading next existing pulse: %s", err.Error()) return @@ -117,17 +143,6 @@ func (c *Controller) pulseSequence(ctx context.Context) { c.reloadData(ctx, c.sequentialPulse.PulseNumber, toPulse.PrevPulseNumber) return } - if nextSequential.IsComplete { - err = c.storage.SequencePulse(nextSequential.PulseNumber) - if err != nil { - log.Errorf("During sequence next sequential pulse: %s", err.Error()) - return - } - c.sequentialPulse = nextSequential - log.Infof("Pulse %d sequenced", nextSequential.PulseNumber) - waitTime = time.Duration(0) - return - } }() } } @@ -215,3 +230,31 @@ func (c *Controller) reloadData(ctx context.Context, fromPulseNumber int64, toPu } } } + +func (c *Controller) cleanJetDropRegister(ctx context.Context) { + log := belogger.FromContext(ctx) + jetDropRegisterCopy := map[types.Pulse]map[string]struct{}{} + func() { + c.jetDropRegisterLock.Lock() + defer c.jetDropRegisterLock.Unlock() + for k, v := range c.jetDropRegister { + jetDropsCopy := map[string]struct{}{} + for jetID := range v { + jetDropsCopy[jetID] = struct{}{} + } + jetDropRegisterCopy[k] = jetDropsCopy + } + }() + + for p := range jetDropRegisterCopy { + if c.sequentialPulse.PulseNumber > p.PulseNo { + log.Infof("Pulse %d less then seq pulse, dropping it", p.PulseNo) + func() { + c.jetDropRegisterLock.Lock() + defer c.jetDropRegisterLock.Unlock() + delete(c.jetDropRegister, p) + }() + + } + } +} diff --git a/etl/controller/pulsemaintainer_test.go b/etl/controller/pulsemaintainer_test.go index f6ba5026..0264b0fe 100644 --- a/etl/controller/pulsemaintainer_test.go +++ b/etl/controller/pulsemaintainer_test.go @@ -37,7 +37,7 @@ func TestController_pulseMaintainer(t *testing.T) { sm.GetPulseByPrevMock.Return(models.Pulse{}, errors.New("test error")) defer leaktest.Check(t)() - c, err := NewController(cfg, extractor, sm) + c, err := NewController(cfg, extractor, sm, 2) require.NoError(t, err) ctx := context.Background() require.NoError(t, c.Start(ctx)) @@ -79,13 +79,13 @@ func TestController_pulseSequence_StartFromNothing(t *testing.T) { require.Equal(t, int64(pulse.MinTimePulse), prevPulse.PulseNumber) return models.Pulse{}, nil }) - sm.GetNextSavedPulseMock.Set(func(fromPulseNumber models.Pulse) (p1 models.Pulse, err error) { + sm.GetNextSavedPulseMock.Set(func(fromPulseNumber models.Pulse, completedOnly bool) (p1 models.Pulse, err error) { if sm.GetNextSavedPulseBeforeCounter() == 1 || sm.GetPulseByPrevBeforeCounter() == 2 { require.Equal(t, int64(0), fromPulseNumber.PulseNumber) return models.Pulse{PrevPulseNumber: 1000100, PulseNumber: 1000110, NextPulseNumber: 1000120}, nil } if sm.GetNextSavedPulseBeforeCounter() == 3 { - require.Equal(t, int64(pulse.MinTimePulse), fromPulseNumber.PulseNumber) + require.Equal(t, int64(0), fromPulseNumber.PulseNumber) return models.Pulse{PrevPulseNumber: 1000100, PulseNumber: 1000110, NextPulseNumber: 1000120}, nil } require.Equal(t, int64(pulse.MinTimePulse), fromPulseNumber.PulseNumber) @@ -108,7 +108,7 @@ func TestController_pulseSequence_StartFromNothing(t *testing.T) { }) defer leaktest.Check(t)() - c, err := NewController(cfg, extractor, sm) + c, err := NewController(cfg, extractor, sm, 2) require.NoError(t, err) ctx := context.Background() err = c.Start(ctx) @@ -152,7 +152,7 @@ func TestController_pulseSequence_StartFromSomething(t *testing.T) { require.Equal(t, int64(1000020), prevPulse.PulseNumber) return models.Pulse{PrevPulseNumber: 1000020, PulseNumber: 1000030, NextPulseNumber: 1000040, IsComplete: false}, nil }) - sm.GetNextSavedPulseMock.Set(func(fromPulseNumber models.Pulse) (p1 models.Pulse, err error) { + sm.GetNextSavedPulseMock.Set(func(fromPulseNumber models.Pulse, completedOnly bool) (p1 models.Pulse, err error) { if sm.GetNextSavedPulseBeforeCounter() == 1 || sm.GetPulseByPrevBeforeCounter() == 2 { require.Equal(t, int64(1000000), fromPulseNumber.PulseNumber) return models.Pulse{PrevPulseNumber: 1000010, PulseNumber: 1000020, NextPulseNumber: 1000030}, nil @@ -180,7 +180,7 @@ func TestController_pulseSequence_StartFromSomething(t *testing.T) { }) defer leaktest.Check(t)() - c, err := NewController(cfg, extractor, sm) + c, err := NewController(cfg, extractor, sm, 2) require.NoError(t, err) ctx := context.Background() err = c.Start(ctx) @@ -216,7 +216,7 @@ func TestController_pulseSequence_Start_NoMissedData(t *testing.T) { return models.Pulse{PrevPulseNumber: 1000010, PulseNumber: 1000020, NextPulseNumber: 1000030, IsComplete: true}, nil } require.Equal(t, int64(1000020), prevPulse.PulseNumber) - return models.Pulse{PrevPulseNumber: 1000020, PulseNumber: 1000030, NextPulseNumber: 1000040, IsComplete: false}, nil + return models.Pulse{}, nil }) sm.SequencePulseMock.Set(func(pulseNumber int64) (err error) { if sm.SequencePulseBeforeCounter() == 1 { @@ -227,9 +227,12 @@ func TestController_pulseSequence_Start_NoMissedData(t *testing.T) { } return nil }) + sm.GetNextSavedPulseMock.Set(func(fromPulseNumber models.Pulse, completedOnly bool) (p1 models.Pulse, err error) { + return models.Pulse{}, nil + }) defer leaktest.Check(t)() - c, err := NewController(cfg, extractor, sm) + c, err := NewController(cfg, extractor, sm, 2) require.NoError(t, err) ctx := context.Background() err = c.Start(ctx) @@ -274,9 +277,12 @@ func TestController_pulseMaintainer_Start_PulsesCompleteAndNot(t *testing.T) { wg.Done() return nil }) + sm.GetNextSavedPulseMock.Set(func(fromPulseNumber models.Pulse, completedOnly bool) (p1 models.Pulse, err error) { + return models.Pulse{}, nil + }) defer leaktest.Check(t)() - c, err := NewController(cfg, extractor, sm) + c, err := NewController(cfg, extractor, sm, 2) require.NoError(t, err) ctx := context.Background() err = c.Start(ctx) @@ -303,7 +309,7 @@ func TestController_pulseSequence_ReloadPeriodExpired(t *testing.T) { sm.GetPulseByPrevMock.Set(func(prevPulse models.Pulse) (p1 models.Pulse, err error) { return models.Pulse{}, nil }) - sm.GetNextSavedPulseMock.Set(func(fromPulseNumber models.Pulse) (p1 models.Pulse, err error) { + sm.GetNextSavedPulseMock.Set(func(fromPulseNumber models.Pulse, completedOnly bool) (p1 models.Pulse, err error) { return models.Pulse{PrevPulseNumber: 1000010, PulseNumber: 1000020, NextPulseNumber: 1000030}, nil }) extractor.LoadJetDropsMock.Set(func(ctx context.Context, fromPulseNumber int64, toPulseNumber int64) (err error) { @@ -317,7 +323,7 @@ func TestController_pulseSequence_ReloadPeriodExpired(t *testing.T) { }) defer leaktest.Check(t)() - c, err := NewController(cfg, extractor, sm) + c, err := NewController(cfg, extractor, sm, 2) require.NoError(t, err) ctx := context.Background() err = c.Start(ctx) diff --git a/etl/extractor/extractor_bench_test.go b/etl/extractor/extractor_bench_test.go index c76b1311..9f0fb7fc 100644 --- a/etl/extractor/extractor_bench_test.go +++ b/etl/extractor/extractor_bench_test.go @@ -41,7 +41,7 @@ func BenchmarkPlatformExtractorGetJetDrops(b *testing.B) { require.NoError(b, err) pulseClient := clients.GetTestPulseClient(1, nil) - extractor := NewPlatformExtractor(uint32(defaultLocalBatchSize), 0, 100, NewPlatformPulseExtractor(pulseClient), &RecordExporterClient{}, func() {}) + extractor := NewPlatformExtractor(uint32(defaultLocalBatchSize), 0, 100, 100, NewPlatformPulseExtractor(pulseClient), &RecordExporterClient{}, func() {}) fullPulse, err := clients.GetFullPulse(uint32(StartPulseNumber), nil) require.NoError(b, err) go extractor.retrieveRecords(ctx, *fullPulse, true, false) diff --git a/etl/extractor/platform_impl.go b/etl/extractor/platform_impl.go index c42823cc..50c60571 100644 --- a/etl/extractor/platform_impl.go +++ b/etl/extractor/platform_impl.go @@ -50,6 +50,7 @@ func NewPlatformExtractor( batchSize uint32, continuousPulseRetrievingHalfPulseSeconds uint32, maxWorkers int32, + queueLen uint32, pulseExtractor interfaces.PulseExtractor, exporterClient exporter.RecordExporterClient, shutdownBE func(), @@ -59,7 +60,7 @@ func NewPlatformExtractor( startStopMutex: &sync.Mutex{}, client: exporterClient, request: request, - mainPulseDataChan: make(chan *types.PlatformPulseData, 1000), + mainPulseDataChan: make(chan *types.PlatformPulseData, queueLen), pulseExtractor: pulseExtractor, batchSize: batchSize, @@ -93,7 +94,7 @@ func (e *PlatformExtractor) Start(ctx context.Context) error { e.startStopMutex.Lock() defer e.startStopMutex.Unlock() if !e.hasStarted { - belogger.FromContext(ctx).Info("Starting platform extractor mainthread...") + belogger.FromContext(ctx).Info("Starting platform extractor main thread...") e.hasStarted = true ctx, e.cancel = context.WithCancel(ctx) go e.retrievePulses(ctx, 0, 0) @@ -192,12 +193,15 @@ func (e *PlatformExtractor) retrievePulses(ctx context.Context, from, until int6 continue } - log.Debugf("retrievePulses(): Done, jets %d", len(pu.Jets)) + log.Debugf("retrievePulses(): Done, jets %d, new pulse: %d", len(pu.Jets), pu.PulseNumber) ReceivedPulses.Inc() LastPulseFetched.Set(float64(pu.PulseNumber)) if !mainThread && e.maxWorkers <= 3 { - // go to single worker scheme + // This hack made for 1 platform only + // If you set maxWorkers in config <=3, then we start receive data in serial 1 by 1. + // 3 threads made for we can get 3 potential skipped spaces between pulses at the same time. + // If some day heavy node will be able to process many parallel getRecords requests - delete this hack if nextNotEmptyPulseNumber == nil || pu.PulseNumber >= *nextNotEmptyPulseNumber { sif := "nil" if nextNotEmptyPulseNumber != nil { @@ -310,7 +314,7 @@ func (e *PlatformExtractor) retrieveRecords(ctx context.Context, pu exporter.Ful closeStream(cancelCtx, stream) e.mainPulseDataChan <- pulseData FromExtractorDataQueue.Set(float64(len(e.mainPulseDataChan))) - log.Debug("retrieveRecords(): Done in %s", time.Since(startedAt)) + log.Debugf("retrieveRecords(): Done in %s, recs: %d", time.Since(startedAt).String(), len(pulseData.Records)) iterateFrom := resp.ShouldIterateFrom if iterateFrom == nil { itf := resp.Record.ID.Pulse() @@ -327,7 +331,8 @@ func (e *PlatformExtractor) retrieveRecords(ctx context.Context, pu exporter.Ful } func (e *PlatformExtractor) takeWorker() bool { - if atomic.AddInt32(&e.workers, 1) > e.maxWorkers { + max := e.maxWorkers + if atomic.AddInt32(&e.workers, 1) > max { atomic.AddInt32(&e.workers, -1) return false } diff --git a/etl/extractor/platform_test.go b/etl/extractor/platform_test.go index 5aca811d..46f624a3 100644 --- a/etl/extractor/platform_test.go +++ b/etl/extractor/platform_test.go @@ -55,7 +55,7 @@ func TestGetJetDrops(t *testing.T) { pulseClient := clients.GetTestPulseClient(65537, nil) pulseExtractor := NewPlatformPulseExtractor(pulseClient) - extractor := NewPlatformExtractor(uint32(pulseCount), 0, 100, pulseExtractor, recordClient, func() {}) + extractor := NewPlatformExtractor(uint32(pulseCount), 0, 100, 100, pulseExtractor, recordClient, func() {}) err = extractor.Start(ctx) require.NoError(t, err) defer extractor.Stop(ctx) @@ -98,7 +98,7 @@ func TestGetJetDrops_WrongVersionOnPulseError(t *testing.T) { pulseExtractor.GetNextFinalizedPulseMock.Set(func(ctx context.Context, p int64) (fp1 *exporter.FullPulse, err error) { return nil, errors.New("unknown heavy-version") }) - extractor := NewPlatformExtractor(uint32(1), 0, 100, pulseExtractor, recordClient, shutdownBETestFunc) + extractor := NewPlatformExtractor(uint32(1), 0, 100, 100, pulseExtractor, recordClient, shutdownBETestFunc) err := extractor.Start(ctx) defer extractor.Stop(ctx) require.NoError(mc, err) @@ -135,7 +135,7 @@ func TestGetJetDrops_WrongVersionOnRecordError(t *testing.T) { } pulseClient := clients.GetTestPulseClient(65537, nil) pulseExtractor := NewPlatformPulseExtractor(pulseClient) - extractor := NewPlatformExtractor(uint32(1), 0, 100, pulseExtractor, recordClient, shutdownBETestFunc) + extractor := NewPlatformExtractor(uint32(1), 0, 100, 100, pulseExtractor, recordClient, shutdownBETestFunc) err := extractor.Start(ctx) defer extractor.Stop(ctx) require.NoError(mc, err) @@ -229,7 +229,7 @@ func TestLoadJetDrops_returnsRecordByPulses(t *testing.T) { return pp, err }) - extractor := NewPlatformExtractor(77, 0, 100, pulseExtractor, recordClient, func() {}) + extractor := NewPlatformExtractor(77, 0, 100, 100, pulseExtractor, recordClient, func() {}) err := extractor.LoadJetDrops(ctx, int64(startPulseNumber-10), int64(startPulseNumber+10*(test.differentPulseCount-1))) require.NoError(t, err) for i := 0; i < test.differentPulseCount; i++ { diff --git a/etl/interfaces/interfaces.go b/etl/interfaces/interfaces.go index 1a86263a..9ca4de20 100644 --- a/etl/interfaces/interfaces.go +++ b/etl/interfaces/interfaces.go @@ -122,7 +122,7 @@ type StorageAPIFetcher interface { // GetRecordsByJetDrop returns records for provided jet drop, ordered by order field. GetRecordsByJetDrop(jetDropID models.JetDropID, fromIndex, recordType *string, limit, offset int) ([]models.Record, int, error) // GetNextSavedPulse returns first pulse with pulse number bigger then fromPulseNumber from db. - GetNextSavedPulse(fromPulseNumber models.Pulse) (models.Pulse, error) + GetNextSavedPulse(fromPulseNumber models.Pulse, completedOnly bool) (models.Pulse, error) // GetJetDrops returns jetDrops for provided pulse from db. GetJetDrops(pulse models.Pulse) ([]models.JetDrop, error) } @@ -135,7 +135,7 @@ type StorageFetcher interface { // GetPulseByPrev returns pulse with provided prev pulse number from db. GetPulseByPrev(prevPulse models.Pulse) (models.Pulse, error) // GetNextSavedPulse returns first pulse with pulse number bigger then fromPulseNumber from db. - GetNextSavedPulse(fromPulseNumber models.Pulse) (models.Pulse, error) + GetNextSavedPulse(fromPulseNumber models.Pulse, completedOnly bool) (models.Pulse, error) // GetJetDrops returns jetDrops for provided pulse from db. GetJetDrops(pulse models.Pulse) ([]models.JetDrop, error) } diff --git a/etl/interfaces/mock/storage_mock.go b/etl/interfaces/mock/storage_mock.go index 5b3ea15f..8413ff12 100644 --- a/etl/interfaces/mock/storage_mock.go +++ b/etl/interfaces/mock/storage_mock.go @@ -33,8 +33,8 @@ type StorageMock struct { beforeGetJetDropsCounter uint64 GetJetDropsMock mStorageMockGetJetDrops - funcGetNextSavedPulse func(fromPulseNumber models.Pulse) (p1 models.Pulse, err error) - inspectFuncGetNextSavedPulse func(fromPulseNumber models.Pulse) + funcGetNextSavedPulse func(fromPulseNumber models.Pulse, completedOnly bool) (p1 models.Pulse, err error) + inspectFuncGetNextSavedPulse func(fromPulseNumber models.Pulse, completedOnly bool) afterGetNextSavedPulseCounter uint64 beforeGetNextSavedPulseCounter uint64 GetNextSavedPulseMock mStorageMockGetNextSavedPulse @@ -700,6 +700,7 @@ type StorageMockGetNextSavedPulseExpectation struct { // StorageMockGetNextSavedPulseParams contains parameters of the Storage.GetNextSavedPulse type StorageMockGetNextSavedPulseParams struct { fromPulseNumber models.Pulse + completedOnly bool } // StorageMockGetNextSavedPulseResults contains results of the Storage.GetNextSavedPulse @@ -709,7 +710,7 @@ type StorageMockGetNextSavedPulseResults struct { } // Expect sets up expected params for Storage.GetNextSavedPulse -func (mmGetNextSavedPulse *mStorageMockGetNextSavedPulse) Expect(fromPulseNumber models.Pulse) *mStorageMockGetNextSavedPulse { +func (mmGetNextSavedPulse *mStorageMockGetNextSavedPulse) Expect(fromPulseNumber models.Pulse, completedOnly bool) *mStorageMockGetNextSavedPulse { if mmGetNextSavedPulse.mock.funcGetNextSavedPulse != nil { mmGetNextSavedPulse.mock.t.Fatalf("StorageMock.GetNextSavedPulse mock is already set by Set") } @@ -718,7 +719,7 @@ func (mmGetNextSavedPulse *mStorageMockGetNextSavedPulse) Expect(fromPulseNumber mmGetNextSavedPulse.defaultExpectation = &StorageMockGetNextSavedPulseExpectation{} } - mmGetNextSavedPulse.defaultExpectation.params = &StorageMockGetNextSavedPulseParams{fromPulseNumber} + mmGetNextSavedPulse.defaultExpectation.params = &StorageMockGetNextSavedPulseParams{fromPulseNumber, completedOnly} for _, e := range mmGetNextSavedPulse.expectations { if minimock.Equal(e.params, mmGetNextSavedPulse.defaultExpectation.params) { mmGetNextSavedPulse.mock.t.Fatalf("Expectation set by When has same params: %#v", *mmGetNextSavedPulse.defaultExpectation.params) @@ -729,7 +730,7 @@ func (mmGetNextSavedPulse *mStorageMockGetNextSavedPulse) Expect(fromPulseNumber } // Inspect accepts an inspector function that has same arguments as the Storage.GetNextSavedPulse -func (mmGetNextSavedPulse *mStorageMockGetNextSavedPulse) Inspect(f func(fromPulseNumber models.Pulse)) *mStorageMockGetNextSavedPulse { +func (mmGetNextSavedPulse *mStorageMockGetNextSavedPulse) Inspect(f func(fromPulseNumber models.Pulse, completedOnly bool)) *mStorageMockGetNextSavedPulse { if mmGetNextSavedPulse.mock.inspectFuncGetNextSavedPulse != nil { mmGetNextSavedPulse.mock.t.Fatalf("Inspect function is already set for StorageMock.GetNextSavedPulse") } @@ -753,7 +754,7 @@ func (mmGetNextSavedPulse *mStorageMockGetNextSavedPulse) Return(p1 models.Pulse } //Set uses given function f to mock the Storage.GetNextSavedPulse method -func (mmGetNextSavedPulse *mStorageMockGetNextSavedPulse) Set(f func(fromPulseNumber models.Pulse) (p1 models.Pulse, err error)) *StorageMock { +func (mmGetNextSavedPulse *mStorageMockGetNextSavedPulse) Set(f func(fromPulseNumber models.Pulse, completedOnly bool) (p1 models.Pulse, err error)) *StorageMock { if mmGetNextSavedPulse.defaultExpectation != nil { mmGetNextSavedPulse.mock.t.Fatalf("Default expectation is already set for the Storage.GetNextSavedPulse method") } @@ -768,14 +769,14 @@ func (mmGetNextSavedPulse *mStorageMockGetNextSavedPulse) Set(f func(fromPulseNu // When sets expectation for the Storage.GetNextSavedPulse which will trigger the result defined by the following // Then helper -func (mmGetNextSavedPulse *mStorageMockGetNextSavedPulse) When(fromPulseNumber models.Pulse) *StorageMockGetNextSavedPulseExpectation { +func (mmGetNextSavedPulse *mStorageMockGetNextSavedPulse) When(fromPulseNumber models.Pulse, completedOnly bool) *StorageMockGetNextSavedPulseExpectation { if mmGetNextSavedPulse.mock.funcGetNextSavedPulse != nil { mmGetNextSavedPulse.mock.t.Fatalf("StorageMock.GetNextSavedPulse mock is already set by Set") } expectation := &StorageMockGetNextSavedPulseExpectation{ mock: mmGetNextSavedPulse.mock, - params: &StorageMockGetNextSavedPulseParams{fromPulseNumber}, + params: &StorageMockGetNextSavedPulseParams{fromPulseNumber, completedOnly}, } mmGetNextSavedPulse.expectations = append(mmGetNextSavedPulse.expectations, expectation) return expectation @@ -788,15 +789,15 @@ func (e *StorageMockGetNextSavedPulseExpectation) Then(p1 models.Pulse, err erro } // GetNextSavedPulse implements interfaces.Storage -func (mmGetNextSavedPulse *StorageMock) GetNextSavedPulse(fromPulseNumber models.Pulse) (p1 models.Pulse, err error) { +func (mmGetNextSavedPulse *StorageMock) GetNextSavedPulse(fromPulseNumber models.Pulse, completedOnly bool) (p1 models.Pulse, err error) { mm_atomic.AddUint64(&mmGetNextSavedPulse.beforeGetNextSavedPulseCounter, 1) defer mm_atomic.AddUint64(&mmGetNextSavedPulse.afterGetNextSavedPulseCounter, 1) if mmGetNextSavedPulse.inspectFuncGetNextSavedPulse != nil { - mmGetNextSavedPulse.inspectFuncGetNextSavedPulse(fromPulseNumber) + mmGetNextSavedPulse.inspectFuncGetNextSavedPulse(fromPulseNumber, completedOnly) } - mm_params := &StorageMockGetNextSavedPulseParams{fromPulseNumber} + mm_params := &StorageMockGetNextSavedPulseParams{fromPulseNumber, completedOnly} // Record call args mmGetNextSavedPulse.GetNextSavedPulseMock.mutex.Lock() @@ -813,7 +814,7 @@ func (mmGetNextSavedPulse *StorageMock) GetNextSavedPulse(fromPulseNumber models if mmGetNextSavedPulse.GetNextSavedPulseMock.defaultExpectation != nil { mm_atomic.AddUint64(&mmGetNextSavedPulse.GetNextSavedPulseMock.defaultExpectation.Counter, 1) mm_want := mmGetNextSavedPulse.GetNextSavedPulseMock.defaultExpectation.params - mm_got := StorageMockGetNextSavedPulseParams{fromPulseNumber} + mm_got := StorageMockGetNextSavedPulseParams{fromPulseNumber, completedOnly} if mm_want != nil && !minimock.Equal(*mm_want, mm_got) { mmGetNextSavedPulse.t.Errorf("StorageMock.GetNextSavedPulse got unexpected parameters, want: %#v, got: %#v%s\n", *mm_want, mm_got, minimock.Diff(*mm_want, mm_got)) } @@ -825,9 +826,9 @@ func (mmGetNextSavedPulse *StorageMock) GetNextSavedPulse(fromPulseNumber models return (*mm_results).p1, (*mm_results).err } if mmGetNextSavedPulse.funcGetNextSavedPulse != nil { - return mmGetNextSavedPulse.funcGetNextSavedPulse(fromPulseNumber) + return mmGetNextSavedPulse.funcGetNextSavedPulse(fromPulseNumber, completedOnly) } - mmGetNextSavedPulse.t.Fatalf("Unexpected call to StorageMock.GetNextSavedPulse. %v", fromPulseNumber) + mmGetNextSavedPulse.t.Fatalf("Unexpected call to StorageMock.GetNextSavedPulse. %v %v", fromPulseNumber, completedOnly) return } diff --git a/etl/processor/processor.go b/etl/processor/processor.go index 870127fe..7432313f 100644 --- a/etl/processor/processor.go +++ b/etl/processor/processor.go @@ -44,13 +44,20 @@ func NewProcessor(jb interfaces.Transformer, storage interfaces.StorageSetter, c var ErrorAlreadyStarted = errors.New("Already started") func (p *Processor) Start(ctx context.Context) error { - p.taskCCloseMu.Lock() - if !atomic.CompareAndSwapInt32(&p.active, 0, 1) { - p.taskCCloseMu.Unlock() - return ErrorAlreadyStarted + startOnce := func() error { + p.taskCCloseMu.Lock() + defer p.taskCCloseMu.Unlock() + if !atomic.CompareAndSwapInt32(&p.active, 0, 1) { + return ErrorAlreadyStarted + } + p.taskC = make(chan Task) + return nil + } + + err := startOnce() + if err != nil { + return err } - p.taskC = make(chan Task) - p.taskCCloseMu.Unlock() for i := 0; i < p.workers; i++ { go func() { @@ -164,5 +171,5 @@ func (p *Processor) process(ctx context.Context, jd *types.JetDrop) { return } p.controller.SetJetDropData(pd, mjd.JetID) - logger.Infof("Processed: pulseNumber = %d, jetID = %v\n", pd.PulseNo, mjd.JetID) + logger.Infof("Processed: pulseNumber = %d, jetID = %v", pd.PulseNo, mjd.JetID) } diff --git a/etl/storage/storage.go b/etl/storage/storage.go index 51fd5e7b..1972fdcb 100644 --- a/etl/storage/storage.go +++ b/etl/storage/storage.go @@ -448,12 +448,16 @@ func (s *Storage) GetSequentialPulse() (models.Pulse, error) { } // GetNextSavedPulse returns first pulse with pulse number bigger then fromPulseNumber from db. -func (s *Storage) GetNextSavedPulse(fromPulseNumber models.Pulse) (models.Pulse, error) { +func (s *Storage) GetNextSavedPulse(fromPulseNumber models.Pulse, completedOnly bool) (models.Pulse, error) { timer := prometheus.NewTimer(GetNextSavedPulseDuration) defer timer.ObserveDuration() var pulses []models.Pulse - err := s.db.Where("pulse_number > ?", fromPulseNumber.PulseNumber).Order("pulse_number asc").Limit(1).Find(&pulses).Error + db := s.db.Where("pulse_number > ?", fromPulseNumber.PulseNumber) + if completedOnly { + db = db.Where("is_complete = ?", true) + } + err := db.Order("pulse_number asc").Limit(1).Find(&pulses).Error if err != nil { return models.Pulse{}, err } diff --git a/etl/storage/storage_test.go b/etl/storage/storage_test.go index 05ecff32..545fb9a0 100644 --- a/etl/storage/storage_test.go +++ b/etl/storage/storage_test.go @@ -1799,7 +1799,7 @@ func TestStorage_GetNextSavedPulse(t *testing.T) { err = testutils.CreatePulse(testDB, expectedPulse) require.NoError(t, err) - res, err := s.GetNextSavedPulse(pulse) + res, err := s.GetNextSavedPulse(pulse, false) require.NoError(t, err) require.Equal(t, expectedPulse, res) } @@ -1810,7 +1810,7 @@ func TestStorage_GetNextSavedPulse_Empty(t *testing.T) { pulse, err := testutils.InitPulseDB() require.NoError(t, err) - sequentialPulse, err := s.GetNextSavedPulse(pulse) + sequentialPulse, err := s.GetNextSavedPulse(pulse, false) require.NoError(t, err) require.Equal(t, models.Pulse{}, sequentialPulse) } diff --git a/etl/transformer/mainnet_impl.go b/etl/transformer/mainnet_impl.go index 2f3c509c..ea88f510 100644 --- a/etl/transformer/mainnet_impl.go +++ b/etl/transformer/mainnet_impl.go @@ -18,11 +18,11 @@ type MainNetTransformer struct { transformerChan chan *types.JetDrop } -func NewMainNetTransformer(ch <-chan *types.PlatformPulseData) *MainNetTransformer { +func NewMainNetTransformer(ch <-chan *types.PlatformPulseData, queueLen uint32) *MainNetTransformer { return &MainNetTransformer{ stopSignal: make(chan bool, 1), extractorChan: ch, - transformerChan: make(chan *types.JetDrop, 1000), + transformerChan: make(chan *types.JetDrop, queueLen), } } @@ -70,18 +70,16 @@ func (m *MainNetTransformer) run(ctx context.Context) { Errors.Inc() return } - go func() { - if len(transform) == 0 { - belogger.FromContext(ctx).Warn("no transformed data to logging") - } else { - belogger.FromContext(ctx). - Infof("transformed jet drop to canonical for pulse: %d", transform[0].MainSection.Start.PulseData.PulseNo) - for _, jetDrop := range transform { - m.transformerChan <- jetDrop - FromTransformerDataQueue.Set(float64(len(m.transformerChan))) - } + if len(transform) == 0 { + belogger.FromContext(ctx).Warn("no transformed data to logging") + } else { + belogger.FromContext(ctx). + Infof("transformed jet drop to canonical for pulse: %d", transform[0].MainSection.Start.PulseData.PulseNo) + for _, jetDrop := range transform { + m.transformerChan <- jetDrop + FromTransformerDataQueue.Set(float64(len(m.transformerChan))) } - }() + } case <-m.stopSignal: m.stopSignal <- true return diff --git a/etl/transformer/mainnet_test.go b/etl/transformer/mainnet_test.go index 000617ae..4640b1f6 100644 --- a/etl/transformer/mainnet_test.go +++ b/etl/transformer/mainnet_test.go @@ -46,7 +46,7 @@ func TestTransformer_withDifferentJetId(t *testing.T) { } dropsCh := make(chan *types.PlatformPulseData) - var transformer interfaces.Transformer = NewMainNetTransformer(dropsCh) + var transformer interfaces.Transformer = NewMainNetTransformer(dropsCh, 100) err := transformer.Start(ctx) require.NoError(t, err) defer transformer.Stop(ctx) diff --git a/go.mod b/go.mod index ed58b2bb..08238bb0 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/jackc/pgx/v4 v4.8.1 // indirect github.com/jinzhu/gorm v1.9.15 github.com/jinzhu/now v1.1.1 // indirect - github.com/kelindar/binary v1.0.9 + github.com/kelindar/binary v1.0.9 // indirect github.com/labstack/echo/v4 v4.1.16 github.com/lib/pq v1.8.0 // indirect github.com/mattn/go-colorable v0.1.7 // indirect diff --git a/testutils/betestsetup/betestsetup.go b/testutils/betestsetup/betestsetup.go index 8282a517..43178371 100644 --- a/testutils/betestsetup/betestsetup.go +++ b/testutils/betestsetup/betestsetup.go @@ -55,20 +55,20 @@ func (b *BlockExplorerTestSetUp) Start() error { b.ctx = context.Background() pulseExtractor := extractor.NewPlatformPulseExtractor(b.PulseClient) - b.extr = extractor.NewPlatformExtractor(100, 0, 100, pulseExtractor, b.ExporterClient, func() {}) + b.extr = extractor.NewPlatformExtractor(100, 0, 100, 100, pulseExtractor, b.ExporterClient, func() {}) err := b.extr.Start(b.ctx) if err != nil { return err } - b.trsf = transformer.NewMainNetTransformer(b.extr.GetJetDrops(b.ctx)) + b.trsf = transformer.NewMainNetTransformer(b.extr.GetJetDrops(b.ctx), 100) err = b.trsf.Start(b.ctx) if err != nil { return err } b.strg = storage.NewStorage(b.DB) - b.cont, err = controller.NewController(cfg, b.extr, b.strg) + b.cont, err = controller.NewController(cfg, b.extr, b.strg, 2) if err != nil { return err }