From bffda0a1b4c860ee2da4f2ada512232402e50694 Mon Sep 17 00:00:00 2001 From: SergeyRadist Date: Tue, 22 Sep 2020 19:22:06 +0300 Subject: [PATCH 1/8] penv-614 priority poc --- etl/controller/controller.go | 30 ++++++++++++++++++++++++++---- etl/controller/misseddata.go | 4 +--- etl/controller/pulsemaintainer.go | 10 +++++----- etl/extractor/platform_impl.go | 23 ++++++++++++++--------- etl/interfaces/interfaces.go | 2 +- 5 files changed, 47 insertions(+), 22 deletions(-) diff --git a/etl/controller/controller.go b/etl/controller/controller.go index b2887203..6a0158a0 100644 --- a/etl/controller/controller.go +++ b/etl/controller/controller.go @@ -12,6 +12,7 @@ import ( "github.com/insolar/block-explorer/configuration" "github.com/insolar/block-explorer/etl/models" + "github.com/insolar/block-explorer/instrumentation/belogger" "github.com/pkg/errors" @@ -48,10 +49,16 @@ func NewController(cfg configuration.Controller, extractor interfaces.JetDropsEx jetDropRegister: make(map[types.Pulse]map[string]struct{}), missedDataManager: NewMissedDataManager(time.Second*time.Duration(cfg.ReloadPeriod), time.Second*time.Duration(cfg.ReloadCleanPeriod)), } + return c, nil +} + +func (c *Controller) setIncompletePulses(ctx context.Context) error { pulses, err := c.storage.GetIncompletePulses() if err != nil { - return nil, errors.Wrap(err, "can't get not complete pulses from storage") + return errors.Wrap(err, "can't get not complete pulses from storage") } + log := belogger.FromContext(ctx) + log.Debugf("Found %d incomplete pulses in db", len(pulses)) for _, p := range pulses { key := types.Pulse{PulseNo: p.PulseNumber, PrevPulseNumber: p.PrevPulseNumber, NextPulseNumber: p.NextPulseNumber} func() { @@ -61,17 +68,23 @@ func NewController(cfg configuration.Controller, extractor interfaces.JetDropsEx }() jetDrops, err := c.storage.GetJetDrops(p) if err != nil { - return nil, errors.Wrapf(err, "can't get jetDrops for pulse %d from storage", p.PulseNumber) + return errors.Wrapf(err, "can't get jetDrops for pulse %d from storage", p.PulseNumber) } for _, jd := range jetDrops { c.SetJetDropData(key, jd.JetID) } } + return nil +} + +func (c *Controller) setSeqPulse(ctx context.Context) error { + log := belogger.FromContext(ctx) c.sequentialPulseLock.Lock() defer c.sequentialPulseLock.Unlock() + var err error c.sequentialPulse, err = c.storage.GetSequentialPulse() if err != nil { - return nil, errors.Wrap(err, "can't get sequential pulse from storage") + return errors.Wrap(err, "can't get sequential pulse from storage") } emptyPulse := models.Pulse{} if c.sequentialPulse == emptyPulse { @@ -79,11 +92,20 @@ func NewController(cfg configuration.Controller, extractor interfaces.JetDropsEx PulseNumber: 0, } } - return c, nil + log.Debugf("Found last sequential pulse %d in db", c.sequentialPulse.PulseNumber) + return nil } // Start implements interfaces.Starter func (c *Controller) Start(ctx context.Context) error { + err := c.setIncompletePulses(ctx) + if err != nil { + return err + } + err = c.setSeqPulse(ctx) + if err != nil { + return err + } ctx, c.cancelFunc = context.WithCancel(ctx) c.missedDataManager.Start() go c.pulseMaintainer(ctx) diff --git a/etl/controller/misseddata.go b/etl/controller/misseddata.go index 8b662f83..43f2869f 100644 --- a/etl/controller/misseddata.go +++ b/etl/controller/misseddata.go @@ -9,8 +9,6 @@ import ( "context" "sync" "time" - - "github.com/insolar/block-explorer/instrumentation/belogger" ) type missedData struct { @@ -68,7 +66,7 @@ func (mdm *MissedDataManager) Add(ctx context.Context, fromPulse, toPulse int64) for _, missed := range mdm.missedDataPool { if missed.fromPulse <= fromPulse && missed.toPulse >= toPulse { - belogger.FromContext(ctx).Infof("Data from pulse %d to %d was already reload", fromPulse, toPulse) + // belogger.FromContext(ctx).Infof("Data from pulse %d to %d was already reload", fromPulse, toPulse) return false } } diff --git a/etl/controller/pulsemaintainer.go b/etl/controller/pulsemaintainer.go index 87c6eec1..c8108156 100644 --- a/etl/controller/pulsemaintainer.go +++ b/etl/controller/pulsemaintainer.go @@ -71,7 +71,7 @@ func eraseJetDropRegister(ctx context.Context, c *Controller, log log.Logger) { } else { PulseNotCompleteCounter.Inc() log.Debugf("Pulse %d not completed, reloading", p.PulseNo) - c.reloadData(ctx, p.PrevPulseNumber, p.PulseNo) + c.reloadData(ctx, p.PrevPulseNumber, p.PulseNo, false) CurrentIncompletePulse.Set(float64(p.PrevPulseNumber)) } } @@ -113,8 +113,8 @@ func (c *Controller) pulseSequence(ctx context.Context) { log.Info("no next saved pulse. skipping") return } - log.Debugf("Reloading not seq pulses %d - %d", c.sequentialPulse.PulseNumber, toPulse.PrevPulseNumber) - c.reloadData(ctx, c.sequentialPulse.PulseNumber, toPulse.PrevPulseNumber) + // log.Debugf("Reloading not seq pulses %d - %d", c.sequentialPulse.PulseNumber, toPulse.PrevPulseNumber) + c.reloadData(ctx, c.sequentialPulse.PulseNumber, toPulse.PrevPulseNumber, true) return } if nextSequential.IsComplete { @@ -201,14 +201,14 @@ Main: return true } -func (c *Controller) reloadData(ctx context.Context, fromPulseNumber int64, toPulseNumber int64) { +func (c *Controller) reloadData(ctx context.Context, fromPulseNumber int64, toPulseNumber int64, priority bool) { log := belogger.FromContext(ctx) if fromPulseNumber == 0 { fromPulseNumber = pulse.MinTimePulse - 1 } if c.missedDataManager.Add(ctx, fromPulseNumber, toPulseNumber) { log.Infof("Reload data from %d to %d", fromPulseNumber, toPulseNumber) - err := c.extractor.LoadJetDrops(ctx, fromPulseNumber, toPulseNumber) + err := c.extractor.LoadJetDrops(ctx, fromPulseNumber, toPulseNumber, priority) if err != nil { log.Errorf("During loading missing data from extractor: %s", err.Error()) return diff --git a/etl/extractor/platform_impl.go b/etl/extractor/platform_impl.go index c42823cc..e1ae6670 100644 --- a/etl/extractor/platform_impl.go +++ b/etl/extractor/platform_impl.go @@ -73,8 +73,8 @@ func (e *PlatformExtractor) GetJetDrops(ctx context.Context) <-chan *types.Platf return e.mainPulseDataChan } -func (e *PlatformExtractor) LoadJetDrops(ctx context.Context, fromPulseNumber int64, toPulseNumber int64) error { - go e.retrievePulses(ctx, fromPulseNumber, toPulseNumber) +func (e *PlatformExtractor) LoadJetDrops(ctx context.Context, fromPulseNumber int64, toPulseNumber int64, priority bool) error { + go e.retrievePulses(ctx, fromPulseNumber, toPulseNumber, priority) return nil } @@ -96,7 +96,7 @@ func (e *PlatformExtractor) Start(ctx context.Context) error { belogger.FromContext(ctx).Info("Starting platform extractor mainthread...") e.hasStarted = true ctx, e.cancel = context.WithCancel(ctx) - go e.retrievePulses(ctx, 0, 0) + go e.retrievePulses(ctx, 0, 0, false) } return nil } @@ -112,14 +112,14 @@ func closeStream(ctx context.Context, stream exporter.RecordExporter_ExportClien // retrievePulses - initiates full pulse retrieving between not including from and until // zero from is latest pulse, zero until - never stop -func (e *PlatformExtractor) retrievePulses(ctx context.Context, from, until int64) { +func (e *PlatformExtractor) retrievePulses(ctx context.Context, from, until int64, priority bool) { RetrievePulsesCount.Inc() defer RetrievePulsesCount.Dec() mainThread := until <= 0 pu := &exporter.FullPulse{PulseNumber: insolar.PulseNumber(from)} var err error - logger := belogger.FromContext(ctx) + logger := belogger.FromContext(ctx).WithField("prior", priority) if mainThread { logger = logger.WithField("main", mainThread) } else { @@ -143,7 +143,7 @@ func (e *PlatformExtractor) retrievePulses(ctx context.Context, from, until int6 // check free workers if not main thread if !mainThread { - for !e.takeWorker() { + for !e.takeWorker(priority) { sleepMs := rand.Intn(1500) + 500 time.Sleep(time.Millisecond * time.Duration(sleepMs)) } @@ -310,7 +310,7 @@ func (e *PlatformExtractor) retrieveRecords(ctx context.Context, pu exporter.Ful closeStream(cancelCtx, stream) e.mainPulseDataChan <- pulseData FromExtractorDataQueue.Set(float64(len(e.mainPulseDataChan))) - log.Debug("retrieveRecords(): Done in %s", time.Since(startedAt)) + log.Debugf("retrieveRecords(): Done in %s, recs: %d", time.Since(startedAt), len(pulseData.Records)) iterateFrom := resp.ShouldIterateFrom if iterateFrom == nil { itf := resp.Record.ID.Pulse() @@ -326,8 +326,13 @@ func (e *PlatformExtractor) retrieveRecords(ctx context.Context, pu exporter.Ful } -func (e *PlatformExtractor) takeWorker() bool { - if atomic.AddInt32(&e.workers, 1) > e.maxWorkers { +func (e *PlatformExtractor) takeWorker(priority bool) bool { + max := e.maxWorkers + if priority { + // if prior then we have x2 workers, other worker stops + max = max * 2 + } + if atomic.AddInt32(&e.workers, 1) > max { atomic.AddInt32(&e.workers, -1) return false } diff --git a/etl/interfaces/interfaces.go b/etl/interfaces/interfaces.go index 1a86263a..4337af57 100644 --- a/etl/interfaces/interfaces.go +++ b/etl/interfaces/interfaces.go @@ -37,7 +37,7 @@ type JetDropsExtractor interface { // GetJetDrops stores JetDrop data in the main JetDrop channel GetJetDrops(ctx context.Context) <-chan *types.PlatformPulseData // LoadJetDrops loads JetDrop data between pulse numbers: (fromPulseNumber, toPulseNumber] - LoadJetDrops(ctx context.Context, fromPulseNumber int64, toPulseNumber int64) error + LoadJetDrops(ctx context.Context, fromPulseNumber int64, toPulseNumber int64, priority bool) error } //go:generate minimock -i github.com/insolar/block-explorer/etl/interfaces.PulseExtractor -o ./mock -s _mock.go -g From 32d9b4887199f317a0e9dc3402c5bfb0ec649302 Mon Sep 17 00:00:00 2001 From: SergeyRadist Date: Wed, 23 Sep 2020 19:05:15 +0300 Subject: [PATCH 2/8] penv-615 priority v2 --- etl/controller/metrics.go | 5 ----- etl/controller/misseddata.go | 4 +++- etl/controller/pulsemaintainer.go | 36 ++++++++++++++++--------------- etl/extractor/platform_impl.go | 2 +- etl/processor/processor.go | 2 +- etl/storage/storage.go | 2 +- 6 files changed, 25 insertions(+), 26 deletions(-) diff --git a/etl/controller/metrics.go b/etl/controller/metrics.go index fc50cc31..53bce82a 100644 --- a/etl/controller/metrics.go +++ b/etl/controller/metrics.go @@ -20,10 +20,6 @@ var ( Name: "gbe_controller_current_seq_pulse", Help: "Current sequentual pulse rerequested from platform", }) - CurrentIncompletePulse = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "gbe_controller_current_incomplete_pulse", - Help: "Current incomplete pulse that records are rerequested from platform", - }) PulseCompleteCounter = prometheus.NewCounter(prometheus.CounterOpts{ Name: "gbe_controller_pulse_complete_counter", Help: "How many pulses is completed by 'pulseIsComplete' check", @@ -44,7 +40,6 @@ func (s Metrics) Metrics(p *metrics.Prometheus) []prometheus.Collector { return []prometheus.Collector{ IncompletePulsesQueue, CurrentSeqPulse, - CurrentIncompletePulse, PulseNotCompleteCounter, PulseCompleteCounter, } diff --git a/etl/controller/misseddata.go b/etl/controller/misseddata.go index 43f2869f..8b662f83 100644 --- a/etl/controller/misseddata.go +++ b/etl/controller/misseddata.go @@ -9,6 +9,8 @@ import ( "context" "sync" "time" + + "github.com/insolar/block-explorer/instrumentation/belogger" ) type missedData struct { @@ -66,7 +68,7 @@ func (mdm *MissedDataManager) Add(ctx context.Context, fromPulse, toPulse int64) for _, missed := range mdm.missedDataPool { if missed.fromPulse <= fromPulse && missed.toPulse >= toPulse { - // belogger.FromContext(ctx).Infof("Data from pulse %d to %d was already reload", fromPulse, toPulse) + belogger.FromContext(ctx).Infof("Data from pulse %d to %d was already reload", fromPulse, toPulse) return false } } diff --git a/etl/controller/pulsemaintainer.go b/etl/controller/pulsemaintainer.go index c8108156..e4dc5be1 100644 --- a/etl/controller/pulsemaintainer.go +++ b/etl/controller/pulsemaintainer.go @@ -33,6 +33,7 @@ func (c *Controller) pulseMaintainer(ctx context.Context) { } func eraseJetDropRegister(ctx context.Context, c *Controller, log log.Logger) { + log.Debugf("pulseMaintainer(): eraseJetDropRegister start") jetDropRegisterCopy := map[types.Pulse]map[string]struct{}{} func() { c.jetDropRegisterLock.Lock() @@ -70,9 +71,9 @@ func eraseJetDropRegister(ctx context.Context, c *Controller, log log.Logger) { } } else { PulseNotCompleteCounter.Inc() - log.Debugf("Pulse %d not completed, reloading", p.PulseNo) - c.reloadData(ctx, p.PrevPulseNumber, p.PulseNo, false) - CurrentIncompletePulse.Set(float64(p.PrevPulseNumber)) + // commented for worker priority proof + // log.Debugf("Pulse %d not completed, reloading", p.PulseNo) + // c.reloadData(ctx, p.PrevPulseNumber, p.PulseNo, false) } } } @@ -103,7 +104,19 @@ func (c *Controller) pulseSequence(ctx context.Context) { return } - if nextSequential == emptyPulse { + if nextSequential.IsComplete { + err = c.storage.SequencePulse(nextSequential.PulseNumber) + if err != nil { + log.Errorf("During sequence next sequential pulse: %s", err.Error()) + return + } + c.sequentialPulse = nextSequential + log.Infof("Pulse %d sequenced", nextSequential.PulseNumber) + waitTime = time.Duration(0) + return + } + + if !nextSequential.IsComplete || nextSequential == emptyPulse { toPulse, err := c.storage.GetNextSavedPulse(c.sequentialPulse) if err != nil && !gorm.IsRecordNotFoundError(err) { log.Errorf("During loading next existing pulse: %s", err.Error()) @@ -113,21 +126,10 @@ func (c *Controller) pulseSequence(ctx context.Context) { log.Info("no next saved pulse. skipping") return } - // log.Debugf("Reloading not seq pulses %d - %d", c.sequentialPulse.PulseNumber, toPulse.PrevPulseNumber) + log.Debugf("Reloading not seq pulses %d - %d", c.sequentialPulse.PulseNumber, toPulse.PrevPulseNumber) c.reloadData(ctx, c.sequentialPulse.PulseNumber, toPulse.PrevPulseNumber, true) return } - if nextSequential.IsComplete { - err = c.storage.SequencePulse(nextSequential.PulseNumber) - if err != nil { - log.Errorf("During sequence next sequential pulse: %s", err.Error()) - return - } - c.sequentialPulse = nextSequential - log.Infof("Pulse %d sequenced", nextSequential.PulseNumber) - waitTime = time.Duration(0) - return - } }() } } @@ -207,7 +209,7 @@ func (c *Controller) reloadData(ctx context.Context, fromPulseNumber int64, toPu fromPulseNumber = pulse.MinTimePulse - 1 } if c.missedDataManager.Add(ctx, fromPulseNumber, toPulseNumber) { - log.Infof("Reload data from %d to %d", fromPulseNumber, toPulseNumber) + log.Infof("Reload data from %d to %d, prior=%v", fromPulseNumber, toPulseNumber, priority) err := c.extractor.LoadJetDrops(ctx, fromPulseNumber, toPulseNumber, priority) if err != nil { log.Errorf("During loading missing data from extractor: %s", err.Error()) diff --git a/etl/extractor/platform_impl.go b/etl/extractor/platform_impl.go index e1ae6670..f2c4ee2e 100644 --- a/etl/extractor/platform_impl.go +++ b/etl/extractor/platform_impl.go @@ -192,7 +192,7 @@ func (e *PlatformExtractor) retrievePulses(ctx context.Context, from, until int6 continue } - log.Debugf("retrievePulses(): Done, jets %d", len(pu.Jets)) + log.Debugf("retrievePulses(): Done, jets %d, new pulse: %d", len(pu.Jets), pu.PulseNumber) ReceivedPulses.Inc() LastPulseFetched.Set(float64(pu.PulseNumber)) diff --git a/etl/processor/processor.go b/etl/processor/processor.go index 870127fe..78109bcb 100644 --- a/etl/processor/processor.go +++ b/etl/processor/processor.go @@ -164,5 +164,5 @@ func (p *Processor) process(ctx context.Context, jd *types.JetDrop) { return } p.controller.SetJetDropData(pd, mjd.JetID) - logger.Infof("Processed: pulseNumber = %d, jetID = %v\n", pd.PulseNo, mjd.JetID) + logger.Infof("Processed: pulseNumber = %d, jetID = %v", pd.PulseNo, mjd.JetID) } diff --git a/etl/storage/storage.go b/etl/storage/storage.go index 51fd5e7b..e9b6738b 100644 --- a/etl/storage/storage.go +++ b/etl/storage/storage.go @@ -453,7 +453,7 @@ func (s *Storage) GetNextSavedPulse(fromPulseNumber models.Pulse) (models.Pulse, defer timer.ObserveDuration() var pulses []models.Pulse - err := s.db.Where("pulse_number > ?", fromPulseNumber.PulseNumber).Order("pulse_number asc").Limit(1).Find(&pulses).Error + err := s.db.Where("pulse_number > ? AND is_complete=?", fromPulseNumber.PulseNumber, true).Order("pulse_number asc").Limit(1).Find(&pulses).Error if err != nil { return models.Pulse{}, err } From affd2ff56bba5e6bf16c9f9aacde745f5d25b874 Mon Sep 17 00:00:00 2001 From: SergeyRadist Date: Thu, 24 Sep 2020 18:34:41 +0300 Subject: [PATCH 3/8] penv-614 removed goroutine --- etl/transformer/mainnet_impl.go | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/etl/transformer/mainnet_impl.go b/etl/transformer/mainnet_impl.go index 2f3c509c..0f2316f0 100644 --- a/etl/transformer/mainnet_impl.go +++ b/etl/transformer/mainnet_impl.go @@ -70,18 +70,16 @@ func (m *MainNetTransformer) run(ctx context.Context) { Errors.Inc() return } - go func() { - if len(transform) == 0 { - belogger.FromContext(ctx).Warn("no transformed data to logging") - } else { - belogger.FromContext(ctx). - Infof("transformed jet drop to canonical for pulse: %d", transform[0].MainSection.Start.PulseData.PulseNo) - for _, jetDrop := range transform { - m.transformerChan <- jetDrop - FromTransformerDataQueue.Set(float64(len(m.transformerChan))) - } + if len(transform) == 0 { + belogger.FromContext(ctx).Warn("no transformed data to logging") + } else { + belogger.FromContext(ctx). + Infof("transformed jet drop to canonical for pulse: %d", transform[0].MainSection.Start.PulseData.PulseNo) + for _, jetDrop := range transform { + m.transformerChan <- jetDrop + FromTransformerDataQueue.Set(float64(len(m.transformerChan))) } - }() + } case <-m.stopSignal: m.stopSignal <- true return From 97fc546956291043e482d42bcc60c38607077be1 Mon Sep 17 00:00:00 2001 From: SergeyRadist Date: Fri, 25 Sep 2020 12:49:06 +0300 Subject: [PATCH 4/8] penv-615 fix infinity incomplete pulses, some refactoring --- etl/controller/controller.go | 3 +++ etl/controller/pulsemaintainer.go | 34 +++++++++++++++++++++++++++++++ etl/processor/processor.go | 19 +++++++++++------ 3 files changed, 50 insertions(+), 6 deletions(-) diff --git a/etl/controller/controller.go b/etl/controller/controller.go index 6a0158a0..f2cfb7a6 100644 --- a/etl/controller/controller.go +++ b/etl/controller/controller.go @@ -38,6 +38,9 @@ type Controller struct { // sequentialPulse is greatest complete pulse after which all pulses complete too sequentialPulse models.Pulse sequentialPulseLock sync.RWMutex + + // incompletePulseCounter temp for penv-615 + incompletePulseCounter int } // NewController returns implementation of interfaces.Controller diff --git a/etl/controller/pulsemaintainer.go b/etl/controller/pulsemaintainer.go index e4dc5be1..ae618b77 100644 --- a/etl/controller/pulsemaintainer.go +++ b/etl/controller/pulsemaintainer.go @@ -76,6 +76,12 @@ func eraseJetDropRegister(ctx context.Context, c *Controller, log log.Logger) { // c.reloadData(ctx, p.PrevPulseNumber, p.PulseNo, false) } } + + if c.incompletePulseCounter == 1000 { + c.cleanJetDropRegister(ctx) + c.incompletePulseCounter = 0 + } + c.incompletePulseCounter++ } // pulseSequence check if we have spaces between pulses and rerequests this pulses @@ -217,3 +223,31 @@ func (c *Controller) reloadData(ctx context.Context, fromPulseNumber int64, toPu } } } + +func (c *Controller) cleanJetDropRegister(ctx context.Context) { + log := belogger.FromContext(ctx) + jetDropRegisterCopy := map[types.Pulse]map[string]struct{}{} + func() { + c.jetDropRegisterLock.Lock() + defer c.jetDropRegisterLock.Unlock() + for k, v := range c.jetDropRegister { + jetDropsCopy := map[string]struct{}{} + for jetID := range v { + jetDropsCopy[jetID] = struct{}{} + } + jetDropRegisterCopy[k] = jetDropsCopy + } + }() + + for p := range jetDropRegisterCopy { + if c.sequentialPulse.PulseNumber > p.PulseNo { + log.Infof("Pulse %d less then seq pulse, dropping it", p.PulseNo) + func() { + c.jetDropRegisterLock.Lock() + defer c.jetDropRegisterLock.Unlock() + delete(c.jetDropRegister, p) + }() + + } + } +} diff --git a/etl/processor/processor.go b/etl/processor/processor.go index 78109bcb..7432313f 100644 --- a/etl/processor/processor.go +++ b/etl/processor/processor.go @@ -44,13 +44,20 @@ func NewProcessor(jb interfaces.Transformer, storage interfaces.StorageSetter, c var ErrorAlreadyStarted = errors.New("Already started") func (p *Processor) Start(ctx context.Context) error { - p.taskCCloseMu.Lock() - if !atomic.CompareAndSwapInt32(&p.active, 0, 1) { - p.taskCCloseMu.Unlock() - return ErrorAlreadyStarted + startOnce := func() error { + p.taskCCloseMu.Lock() + defer p.taskCCloseMu.Unlock() + if !atomic.CompareAndSwapInt32(&p.active, 0, 1) { + return ErrorAlreadyStarted + } + p.taskC = make(chan Task) + return nil + } + + err := startOnce() + if err != nil { + return err } - p.taskC = make(chan Task) - p.taskCCloseMu.Unlock() for i := 0; i < p.workers; i++ { go func() { From 2bd8341fdfec31c69739bd9a16b8f4aa0467e4a5 Mon Sep 17 00:00:00 2001 From: SergeyRadist Date: Mon, 28 Sep 2020 12:51:49 +0300 Subject: [PATCH 5/8] penv-615 queue len in config --- cmd/block-explorer/main.go | 6 +++++- configuration/configuration.go | 21 ++++++++++++++------- etl/extractor/extractor_bench_test.go | 2 +- etl/extractor/platform_impl.go | 5 +++-- etl/extractor/platform_test.go | 8 ++++---- etl/transformer/mainnet_impl.go | 4 ++-- etl/transformer/mainnet_test.go | 2 +- instrumentation/belogger/log_cfg_test.go | 7 +++++-- testutils/betestsetup/betestsetup.go | 4 ++-- 9 files changed, 37 insertions(+), 22 deletions(-) diff --git a/cmd/block-explorer/main.go b/cmd/block-explorer/main.go index 9dcf6d7d..13128378 100644 --- a/cmd/block-explorer/main.go +++ b/cmd/block-explorer/main.go @@ -83,6 +83,7 @@ func main() { 100, cfg.Replicator.ContinuousPulseRetrievingHalfPulseSeconds, int32(cfg.Replicator.ParallelConnections), + cfg.Replicator.QueueLen, pulseExtractor, exporter.NewRecordExporterClient(client.GetGRPCConn()), shutdownBE, @@ -98,7 +99,10 @@ func main() { } }() - mainNetTransformer := transformer.NewMainNetTransformer(platformExtractor.GetJetDrops(ctx)) + mainNetTransformer := transformer.NewMainNetTransformer( + platformExtractor.GetJetDrops(ctx), + cfg.Transformer.QueueLen, + ) err = mainNetTransformer.Start(ctx) if err != nil { logger.Fatal("cannot start transformer: ", err) diff --git a/configuration/configuration.go b/configuration/configuration.go index d5c5323b..401b9005 100644 --- a/configuration/configuration.go +++ b/configuration/configuration.go @@ -17,13 +17,14 @@ func init() { } type BlockExplorer struct { - Log Log - DB DB - Replicator Replicator - Controller Controller - Processor Processor - Metrics Metrics - Profefe Profefe + Log Log + DB DB + Replicator Replicator + Controller Controller + Processor Processor + Transformer Transformer + Metrics Metrics + Profefe Profefe } type API struct { @@ -58,6 +59,7 @@ type Replicator struct { WaitForConnectionRecoveryTimeout time.Duration `insconfig:"30s| Connection recovery timeout"` ContinuousPulseRetrievingHalfPulseSeconds uint32 `insconfig:"5| Half pulse in seconds"` ParallelConnections uint32 `insconfig:"100| Maximum parallel pulse retrievers"` + QueueLen uint32 `insconfig:"500| Max elements in extractor queue"` Auth Auth } @@ -115,6 +117,11 @@ type Processor struct { Workers int `insconfig:"200| The count of workers for processing transformed data"` } +// Transformer transforms raw platform data to canonical GBE data types +type Transformer struct { + QueueLen uint32 `insconfig:"500| Max elements in transformer queue"` +} + type Profefe struct { StartAgent bool `insconfig:"true| if true, start the profefe agent"` Address string `insconfig:"http://127.0.0.1:10100| Profefe collector public address to send profiling data"` diff --git a/etl/extractor/extractor_bench_test.go b/etl/extractor/extractor_bench_test.go index c76b1311..9f0fb7fc 100644 --- a/etl/extractor/extractor_bench_test.go +++ b/etl/extractor/extractor_bench_test.go @@ -41,7 +41,7 @@ func BenchmarkPlatformExtractorGetJetDrops(b *testing.B) { require.NoError(b, err) pulseClient := clients.GetTestPulseClient(1, nil) - extractor := NewPlatformExtractor(uint32(defaultLocalBatchSize), 0, 100, NewPlatformPulseExtractor(pulseClient), &RecordExporterClient{}, func() {}) + extractor := NewPlatformExtractor(uint32(defaultLocalBatchSize), 0, 100, 100, NewPlatformPulseExtractor(pulseClient), &RecordExporterClient{}, func() {}) fullPulse, err := clients.GetFullPulse(uint32(StartPulseNumber), nil) require.NoError(b, err) go extractor.retrieveRecords(ctx, *fullPulse, true, false) diff --git a/etl/extractor/platform_impl.go b/etl/extractor/platform_impl.go index f2c4ee2e..d1a7079c 100644 --- a/etl/extractor/platform_impl.go +++ b/etl/extractor/platform_impl.go @@ -50,6 +50,7 @@ func NewPlatformExtractor( batchSize uint32, continuousPulseRetrievingHalfPulseSeconds uint32, maxWorkers int32, + queueLen uint32, pulseExtractor interfaces.PulseExtractor, exporterClient exporter.RecordExporterClient, shutdownBE func(), @@ -59,7 +60,7 @@ func NewPlatformExtractor( startStopMutex: &sync.Mutex{}, client: exporterClient, request: request, - mainPulseDataChan: make(chan *types.PlatformPulseData, 1000), + mainPulseDataChan: make(chan *types.PlatformPulseData, queueLen), pulseExtractor: pulseExtractor, batchSize: batchSize, @@ -310,7 +311,7 @@ func (e *PlatformExtractor) retrieveRecords(ctx context.Context, pu exporter.Ful closeStream(cancelCtx, stream) e.mainPulseDataChan <- pulseData FromExtractorDataQueue.Set(float64(len(e.mainPulseDataChan))) - log.Debugf("retrieveRecords(): Done in %s, recs: %d", time.Since(startedAt), len(pulseData.Records)) + log.Debugf("retrieveRecords(): Done in %s, recs: %d", time.Since(startedAt).String(), len(pulseData.Records)) iterateFrom := resp.ShouldIterateFrom if iterateFrom == nil { itf := resp.Record.ID.Pulse() diff --git a/etl/extractor/platform_test.go b/etl/extractor/platform_test.go index 5aca811d..46f624a3 100644 --- a/etl/extractor/platform_test.go +++ b/etl/extractor/platform_test.go @@ -55,7 +55,7 @@ func TestGetJetDrops(t *testing.T) { pulseClient := clients.GetTestPulseClient(65537, nil) pulseExtractor := NewPlatformPulseExtractor(pulseClient) - extractor := NewPlatformExtractor(uint32(pulseCount), 0, 100, pulseExtractor, recordClient, func() {}) + extractor := NewPlatformExtractor(uint32(pulseCount), 0, 100, 100, pulseExtractor, recordClient, func() {}) err = extractor.Start(ctx) require.NoError(t, err) defer extractor.Stop(ctx) @@ -98,7 +98,7 @@ func TestGetJetDrops_WrongVersionOnPulseError(t *testing.T) { pulseExtractor.GetNextFinalizedPulseMock.Set(func(ctx context.Context, p int64) (fp1 *exporter.FullPulse, err error) { return nil, errors.New("unknown heavy-version") }) - extractor := NewPlatformExtractor(uint32(1), 0, 100, pulseExtractor, recordClient, shutdownBETestFunc) + extractor := NewPlatformExtractor(uint32(1), 0, 100, 100, pulseExtractor, recordClient, shutdownBETestFunc) err := extractor.Start(ctx) defer extractor.Stop(ctx) require.NoError(mc, err) @@ -135,7 +135,7 @@ func TestGetJetDrops_WrongVersionOnRecordError(t *testing.T) { } pulseClient := clients.GetTestPulseClient(65537, nil) pulseExtractor := NewPlatformPulseExtractor(pulseClient) - extractor := NewPlatformExtractor(uint32(1), 0, 100, pulseExtractor, recordClient, shutdownBETestFunc) + extractor := NewPlatformExtractor(uint32(1), 0, 100, 100, pulseExtractor, recordClient, shutdownBETestFunc) err := extractor.Start(ctx) defer extractor.Stop(ctx) require.NoError(mc, err) @@ -229,7 +229,7 @@ func TestLoadJetDrops_returnsRecordByPulses(t *testing.T) { return pp, err }) - extractor := NewPlatformExtractor(77, 0, 100, pulseExtractor, recordClient, func() {}) + extractor := NewPlatformExtractor(77, 0, 100, 100, pulseExtractor, recordClient, func() {}) err := extractor.LoadJetDrops(ctx, int64(startPulseNumber-10), int64(startPulseNumber+10*(test.differentPulseCount-1))) require.NoError(t, err) for i := 0; i < test.differentPulseCount; i++ { diff --git a/etl/transformer/mainnet_impl.go b/etl/transformer/mainnet_impl.go index 0f2316f0..ea88f510 100644 --- a/etl/transformer/mainnet_impl.go +++ b/etl/transformer/mainnet_impl.go @@ -18,11 +18,11 @@ type MainNetTransformer struct { transformerChan chan *types.JetDrop } -func NewMainNetTransformer(ch <-chan *types.PlatformPulseData) *MainNetTransformer { +func NewMainNetTransformer(ch <-chan *types.PlatformPulseData, queueLen uint32) *MainNetTransformer { return &MainNetTransformer{ stopSignal: make(chan bool, 1), extractorChan: ch, - transformerChan: make(chan *types.JetDrop, 1000), + transformerChan: make(chan *types.JetDrop, queueLen), } } diff --git a/etl/transformer/mainnet_test.go b/etl/transformer/mainnet_test.go index 000617ae..4640b1f6 100644 --- a/etl/transformer/mainnet_test.go +++ b/etl/transformer/mainnet_test.go @@ -46,7 +46,7 @@ func TestTransformer_withDifferentJetId(t *testing.T) { } dropsCh := make(chan *types.PlatformPulseData) - var transformer interfaces.Transformer = NewMainNetTransformer(dropsCh) + var transformer interfaces.Transformer = NewMainNetTransformer(dropsCh, 100) err := transformer.Start(ctx) require.NoError(t, err) defer transformer.Stop(ctx) diff --git a/instrumentation/belogger/log_cfg_test.go b/instrumentation/belogger/log_cfg_test.go index d2feda49..ccd35a1a 100644 --- a/instrumentation/belogger/log_cfg_test.go +++ b/instrumentation/belogger/log_cfg_test.go @@ -120,16 +120,18 @@ func TestLog_Timestamp(t *testing.T) { for _, adapter := range adapters { adapter := adapter t.Run(adapter, func(t *testing.T) { - logger, err := newTestLogger(configuration.Log{Level: "info", Adapter: adapter, Formatter: "json"}) + logger, err := newTestLogger(configuration.Log{Level: "info", Adapter: adapter, Formatter: "text"}) require.NoError(t, err) require.NotNil(t, logger) + logger, _ = logger.Copy().WithLevel(logcommon.InfoLevel).WithCaller(logcommon.CallerFieldWithFuncName).Build() var buf bytes.Buffer + logger.Error("test") logger, err = logger.Copy().WithOutput(&buf).Build() require.NoError(t, err) logger.Error("test") - + println(buf.String()) assert.Regexp(t, regexp.MustCompile("[0-9][0-9]:[0-9][0-9]:[0-9][0-9]"), buf.String()) }) } @@ -202,6 +204,7 @@ func TestMain(m *testing.M) { } global.SetLogger(l) _ = global.SetFilter(log.DebugLevel) + l.Error("ttest") exitCode := m.Run() os.Exit(exitCode) } diff --git a/testutils/betestsetup/betestsetup.go b/testutils/betestsetup/betestsetup.go index 8282a517..6b9e17ea 100644 --- a/testutils/betestsetup/betestsetup.go +++ b/testutils/betestsetup/betestsetup.go @@ -55,13 +55,13 @@ func (b *BlockExplorerTestSetUp) Start() error { b.ctx = context.Background() pulseExtractor := extractor.NewPlatformPulseExtractor(b.PulseClient) - b.extr = extractor.NewPlatformExtractor(100, 0, 100, pulseExtractor, b.ExporterClient, func() {}) + b.extr = extractor.NewPlatformExtractor(100, 0, 100, 100, pulseExtractor, b.ExporterClient, func() {}) err := b.extr.Start(b.ctx) if err != nil { return err } - b.trsf = transformer.NewMainNetTransformer(b.extr.GetJetDrops(b.ctx)) + b.trsf = transformer.NewMainNetTransformer(b.extr.GetJetDrops(b.ctx), 100) err = b.trsf.Start(b.ctx) if err != nil { return err From 9bf668692c3cd862c6d52d0cab9a491c96805922 Mon Sep 17 00:00:00 2001 From: SergeyRadist Date: Wed, 30 Sep 2020 15:05:22 +0300 Subject: [PATCH 6/8] penv-615 priority removed, added platfrom version in cfg --- api/handlers.go | 4 +-- configuration/configuration.go | 1 + etl/controller/controller.go | 6 +++-- etl/controller/pulsemaintainer.go | 31 +++++++++++++++--------- etl/extractor/platform_impl.go | 25 +++++++++---------- etl/interfaces/interfaces.go | 6 ++--- etl/storage/storage.go | 8 ++++-- etl/storage/storage_test.go | 4 +-- go.mod | 2 +- instrumentation/belogger/log_cfg_test.go | 7 ++---- 10 files changed, 51 insertions(+), 43 deletions(-) diff --git a/api/handlers.go b/api/handlers.go index 175c5f67..51bd9760 100644 --- a/api/handlers.go +++ b/api/handlers.go @@ -259,9 +259,7 @@ func (s *Server) getEnrichingJetDrops(oldestPulse, newestPulse int64) ([]models. enrichedDrops = append(enrichedDrops, ejd...) } - nextPulse, err := s.storage.GetNextSavedPulse(models.Pulse{ - PulseNumber: newestPulse, - }) + nextPulse, err := s.storage.GetNextSavedPulse(models.Pulse{PulseNumber: newestPulse}, false) if err == nil && nextPulse != emptyPulse { ejd, err := s.storage.GetJetDrops(models.Pulse{PulseNumber: nextPulse.PulseNumber}) if err != nil { diff --git a/configuration/configuration.go b/configuration/configuration.go index 401b9005..cc15c1cf 100644 --- a/configuration/configuration.go +++ b/configuration/configuration.go @@ -54,6 +54,7 @@ type TestDB struct { // Replicator represents a configuration of the Platform connection type Replicator struct { + PlatformVersion int `insconfig:"1| Platform version, can be 1 or 2"` Addr string `insconfig:"127.0.0.1:5678| The gRPC server address"` MaxTransportMsg int `insconfig:"1073741824| Maximum message size the client can send"` WaitForConnectionRecoveryTimeout time.Duration `insconfig:"30s| Connection recovery timeout"` diff --git a/etl/controller/controller.go b/etl/controller/controller.go index f2cfb7a6..4b69297a 100644 --- a/etl/controller/controller.go +++ b/etl/controller/controller.go @@ -39,18 +39,20 @@ type Controller struct { sequentialPulse models.Pulse sequentialPulseLock sync.RWMutex - // incompletePulseCounter temp for penv-615 + // incompletePulseCounter for penv-615 incompletePulseCounter int + platformVersion int } // NewController returns implementation of interfaces.Controller -func NewController(cfg configuration.Controller, extractor interfaces.JetDropsExtractor, storage interfaces.Storage) (*Controller, error) { +func NewController(cfg configuration.Controller, extractor interfaces.JetDropsExtractor, storage interfaces.Storage, pv int) (*Controller, error) { c := &Controller{ cfg: cfg, extractor: extractor, storage: storage, jetDropRegister: make(map[types.Pulse]map[string]struct{}), missedDataManager: NewMissedDataManager(time.Second*time.Duration(cfg.ReloadPeriod), time.Second*time.Duration(cfg.ReloadCleanPeriod)), + platformVersion: pv, } return c, nil } diff --git a/etl/controller/pulsemaintainer.go b/etl/controller/pulsemaintainer.go index ae618b77..8e17aa11 100644 --- a/etl/controller/pulsemaintainer.go +++ b/etl/controller/pulsemaintainer.go @@ -71,17 +71,20 @@ func eraseJetDropRegister(ctx context.Context, c *Controller, log log.Logger) { } } else { PulseNotCompleteCounter.Inc() - // commented for worker priority proof - // log.Debugf("Pulse %d not completed, reloading", p.PulseNo) - // c.reloadData(ctx, p.PrevPulseNumber, p.PulseNo, false) + if c.platformVersion != 1 { + log.Debugf("Pulse %d not completed, reloading", p.PulseNo) + c.reloadData(ctx, p.PrevPulseNumber, p.PulseNo) + } } } - if c.incompletePulseCounter == 1000 { - c.cleanJetDropRegister(ctx) - c.incompletePulseCounter = 0 + if c.platformVersion == 1 { + if c.incompletePulseCounter == 1000 { + c.cleanJetDropRegister(ctx) + c.incompletePulseCounter = 0 + } + c.incompletePulseCounter++ } - c.incompletePulseCounter++ } // pulseSequence check if we have spaces between pulses and rerequests this pulses @@ -123,7 +126,11 @@ func (c *Controller) pulseSequence(ctx context.Context) { } if !nextSequential.IsComplete || nextSequential == emptyPulse { - toPulse, err := c.storage.GetNextSavedPulse(c.sequentialPulse) + completed := false + if c.platformVersion == 1 { + completed = true + } + toPulse, err := c.storage.GetNextSavedPulse(c.sequentialPulse, completed) if err != nil && !gorm.IsRecordNotFoundError(err) { log.Errorf("During loading next existing pulse: %s", err.Error()) return @@ -133,7 +140,7 @@ func (c *Controller) pulseSequence(ctx context.Context) { return } log.Debugf("Reloading not seq pulses %d - %d", c.sequentialPulse.PulseNumber, toPulse.PrevPulseNumber) - c.reloadData(ctx, c.sequentialPulse.PulseNumber, toPulse.PrevPulseNumber, true) + c.reloadData(ctx, c.sequentialPulse.PulseNumber, toPulse.PrevPulseNumber) return } }() @@ -209,14 +216,14 @@ Main: return true } -func (c *Controller) reloadData(ctx context.Context, fromPulseNumber int64, toPulseNumber int64, priority bool) { +func (c *Controller) reloadData(ctx context.Context, fromPulseNumber int64, toPulseNumber int64) { log := belogger.FromContext(ctx) if fromPulseNumber == 0 { fromPulseNumber = pulse.MinTimePulse - 1 } if c.missedDataManager.Add(ctx, fromPulseNumber, toPulseNumber) { - log.Infof("Reload data from %d to %d, prior=%v", fromPulseNumber, toPulseNumber, priority) - err := c.extractor.LoadJetDrops(ctx, fromPulseNumber, toPulseNumber, priority) + log.Infof("Reload data from %d to %d", fromPulseNumber, toPulseNumber) + err := c.extractor.LoadJetDrops(ctx, fromPulseNumber, toPulseNumber) if err != nil { log.Errorf("During loading missing data from extractor: %s", err.Error()) return diff --git a/etl/extractor/platform_impl.go b/etl/extractor/platform_impl.go index d1a7079c..50c60571 100644 --- a/etl/extractor/platform_impl.go +++ b/etl/extractor/platform_impl.go @@ -74,8 +74,8 @@ func (e *PlatformExtractor) GetJetDrops(ctx context.Context) <-chan *types.Platf return e.mainPulseDataChan } -func (e *PlatformExtractor) LoadJetDrops(ctx context.Context, fromPulseNumber int64, toPulseNumber int64, priority bool) error { - go e.retrievePulses(ctx, fromPulseNumber, toPulseNumber, priority) +func (e *PlatformExtractor) LoadJetDrops(ctx context.Context, fromPulseNumber int64, toPulseNumber int64) error { + go e.retrievePulses(ctx, fromPulseNumber, toPulseNumber) return nil } @@ -94,10 +94,10 @@ func (e *PlatformExtractor) Start(ctx context.Context) error { e.startStopMutex.Lock() defer e.startStopMutex.Unlock() if !e.hasStarted { - belogger.FromContext(ctx).Info("Starting platform extractor mainthread...") + belogger.FromContext(ctx).Info("Starting platform extractor main thread...") e.hasStarted = true ctx, e.cancel = context.WithCancel(ctx) - go e.retrievePulses(ctx, 0, 0, false) + go e.retrievePulses(ctx, 0, 0) } return nil } @@ -113,14 +113,14 @@ func closeStream(ctx context.Context, stream exporter.RecordExporter_ExportClien // retrievePulses - initiates full pulse retrieving between not including from and until // zero from is latest pulse, zero until - never stop -func (e *PlatformExtractor) retrievePulses(ctx context.Context, from, until int64, priority bool) { +func (e *PlatformExtractor) retrievePulses(ctx context.Context, from, until int64) { RetrievePulsesCount.Inc() defer RetrievePulsesCount.Dec() mainThread := until <= 0 pu := &exporter.FullPulse{PulseNumber: insolar.PulseNumber(from)} var err error - logger := belogger.FromContext(ctx).WithField("prior", priority) + logger := belogger.FromContext(ctx) if mainThread { logger = logger.WithField("main", mainThread) } else { @@ -144,7 +144,7 @@ func (e *PlatformExtractor) retrievePulses(ctx context.Context, from, until int6 // check free workers if not main thread if !mainThread { - for !e.takeWorker(priority) { + for !e.takeWorker() { sleepMs := rand.Intn(1500) + 500 time.Sleep(time.Millisecond * time.Duration(sleepMs)) } @@ -198,7 +198,10 @@ func (e *PlatformExtractor) retrievePulses(ctx context.Context, from, until int6 ReceivedPulses.Inc() LastPulseFetched.Set(float64(pu.PulseNumber)) if !mainThread && e.maxWorkers <= 3 { - // go to single worker scheme + // This hack made for 1 platform only + // If you set maxWorkers in config <=3, then we start receive data in serial 1 by 1. + // 3 threads made for we can get 3 potential skipped spaces between pulses at the same time. + // If some day heavy node will be able to process many parallel getRecords requests - delete this hack if nextNotEmptyPulseNumber == nil || pu.PulseNumber >= *nextNotEmptyPulseNumber { sif := "nil" if nextNotEmptyPulseNumber != nil { @@ -327,12 +330,8 @@ func (e *PlatformExtractor) retrieveRecords(ctx context.Context, pu exporter.Ful } -func (e *PlatformExtractor) takeWorker(priority bool) bool { +func (e *PlatformExtractor) takeWorker() bool { max := e.maxWorkers - if priority { - // if prior then we have x2 workers, other worker stops - max = max * 2 - } if atomic.AddInt32(&e.workers, 1) > max { atomic.AddInt32(&e.workers, -1) return false diff --git a/etl/interfaces/interfaces.go b/etl/interfaces/interfaces.go index 4337af57..9ca4de20 100644 --- a/etl/interfaces/interfaces.go +++ b/etl/interfaces/interfaces.go @@ -37,7 +37,7 @@ type JetDropsExtractor interface { // GetJetDrops stores JetDrop data in the main JetDrop channel GetJetDrops(ctx context.Context) <-chan *types.PlatformPulseData // LoadJetDrops loads JetDrop data between pulse numbers: (fromPulseNumber, toPulseNumber] - LoadJetDrops(ctx context.Context, fromPulseNumber int64, toPulseNumber int64, priority bool) error + LoadJetDrops(ctx context.Context, fromPulseNumber int64, toPulseNumber int64) error } //go:generate minimock -i github.com/insolar/block-explorer/etl/interfaces.PulseExtractor -o ./mock -s _mock.go -g @@ -122,7 +122,7 @@ type StorageAPIFetcher interface { // GetRecordsByJetDrop returns records for provided jet drop, ordered by order field. GetRecordsByJetDrop(jetDropID models.JetDropID, fromIndex, recordType *string, limit, offset int) ([]models.Record, int, error) // GetNextSavedPulse returns first pulse with pulse number bigger then fromPulseNumber from db. - GetNextSavedPulse(fromPulseNumber models.Pulse) (models.Pulse, error) + GetNextSavedPulse(fromPulseNumber models.Pulse, completedOnly bool) (models.Pulse, error) // GetJetDrops returns jetDrops for provided pulse from db. GetJetDrops(pulse models.Pulse) ([]models.JetDrop, error) } @@ -135,7 +135,7 @@ type StorageFetcher interface { // GetPulseByPrev returns pulse with provided prev pulse number from db. GetPulseByPrev(prevPulse models.Pulse) (models.Pulse, error) // GetNextSavedPulse returns first pulse with pulse number bigger then fromPulseNumber from db. - GetNextSavedPulse(fromPulseNumber models.Pulse) (models.Pulse, error) + GetNextSavedPulse(fromPulseNumber models.Pulse, completedOnly bool) (models.Pulse, error) // GetJetDrops returns jetDrops for provided pulse from db. GetJetDrops(pulse models.Pulse) ([]models.JetDrop, error) } diff --git a/etl/storage/storage.go b/etl/storage/storage.go index e9b6738b..13086019 100644 --- a/etl/storage/storage.go +++ b/etl/storage/storage.go @@ -448,12 +448,16 @@ func (s *Storage) GetSequentialPulse() (models.Pulse, error) { } // GetNextSavedPulse returns first pulse with pulse number bigger then fromPulseNumber from db. -func (s *Storage) GetNextSavedPulse(fromPulseNumber models.Pulse) (models.Pulse, error) { +func (s *Storage) GetNextSavedPulse(fromPulseNumber models.Pulse, completedOnly bool) (models.Pulse, error) { timer := prometheus.NewTimer(GetNextSavedPulseDuration) defer timer.ObserveDuration() var pulses []models.Pulse - err := s.db.Where("pulse_number > ? AND is_complete=?", fromPulseNumber.PulseNumber, true).Order("pulse_number asc").Limit(1).Find(&pulses).Error + db := s.db.Debug().Where("pulse_number > ?", fromPulseNumber.PulseNumber) + if completedOnly { + db = db.Where("is_complete = ?", true) + } + err := db.Order("pulse_number asc").Limit(1).Find(&pulses).Error if err != nil { return models.Pulse{}, err } diff --git a/etl/storage/storage_test.go b/etl/storage/storage_test.go index 05ecff32..545fb9a0 100644 --- a/etl/storage/storage_test.go +++ b/etl/storage/storage_test.go @@ -1799,7 +1799,7 @@ func TestStorage_GetNextSavedPulse(t *testing.T) { err = testutils.CreatePulse(testDB, expectedPulse) require.NoError(t, err) - res, err := s.GetNextSavedPulse(pulse) + res, err := s.GetNextSavedPulse(pulse, false) require.NoError(t, err) require.Equal(t, expectedPulse, res) } @@ -1810,7 +1810,7 @@ func TestStorage_GetNextSavedPulse_Empty(t *testing.T) { pulse, err := testutils.InitPulseDB() require.NoError(t, err) - sequentialPulse, err := s.GetNextSavedPulse(pulse) + sequentialPulse, err := s.GetNextSavedPulse(pulse, false) require.NoError(t, err) require.Equal(t, models.Pulse{}, sequentialPulse) } diff --git a/go.mod b/go.mod index ed58b2bb..08238bb0 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/jackc/pgx/v4 v4.8.1 // indirect github.com/jinzhu/gorm v1.9.15 github.com/jinzhu/now v1.1.1 // indirect - github.com/kelindar/binary v1.0.9 + github.com/kelindar/binary v1.0.9 // indirect github.com/labstack/echo/v4 v4.1.16 github.com/lib/pq v1.8.0 // indirect github.com/mattn/go-colorable v0.1.7 // indirect diff --git a/instrumentation/belogger/log_cfg_test.go b/instrumentation/belogger/log_cfg_test.go index ccd35a1a..d2feda49 100644 --- a/instrumentation/belogger/log_cfg_test.go +++ b/instrumentation/belogger/log_cfg_test.go @@ -120,18 +120,16 @@ func TestLog_Timestamp(t *testing.T) { for _, adapter := range adapters { adapter := adapter t.Run(adapter, func(t *testing.T) { - logger, err := newTestLogger(configuration.Log{Level: "info", Adapter: adapter, Formatter: "text"}) + logger, err := newTestLogger(configuration.Log{Level: "info", Adapter: adapter, Formatter: "json"}) require.NoError(t, err) require.NotNil(t, logger) - logger, _ = logger.Copy().WithLevel(logcommon.InfoLevel).WithCaller(logcommon.CallerFieldWithFuncName).Build() var buf bytes.Buffer - logger.Error("test") logger, err = logger.Copy().WithOutput(&buf).Build() require.NoError(t, err) logger.Error("test") - println(buf.String()) + assert.Regexp(t, regexp.MustCompile("[0-9][0-9]:[0-9][0-9]:[0-9][0-9]"), buf.String()) }) } @@ -204,7 +202,6 @@ func TestMain(m *testing.M) { } global.SetLogger(l) _ = global.SetFilter(log.DebugLevel) - l.Error("ttest") exitCode := m.Run() os.Exit(exitCode) } From e6efbff2308448f9e0c6e446db3c4d249b120ec9 Mon Sep 17 00:00:00 2001 From: SergeyRadist Date: Wed, 30 Sep 2020 17:20:19 +0300 Subject: [PATCH 7/8] penv-615 fix units --- cmd/block-explorer/main.go | 2 +- etl/controller/controller_test.go | 43 +++++++++++++++++++++----- etl/controller/pulsemaintainer_test.go | 28 ++++++++++------- etl/interfaces/mock/storage_mock.go | 29 ++++++++--------- etl/storage/storage.go | 2 +- testutils/betestsetup/betestsetup.go | 2 +- 6 files changed, 70 insertions(+), 36 deletions(-) diff --git a/cmd/block-explorer/main.go b/cmd/block-explorer/main.go index 13128378..f9f995b0 100644 --- a/cmd/block-explorer/main.go +++ b/cmd/block-explorer/main.go @@ -132,7 +132,7 @@ func main() { repository := storage.NewStorage(db) - gbeController, err := controller.NewController(cfg.Controller, platformExtractor, repository) + gbeController, err := controller.NewController(cfg.Controller, platformExtractor, repository, cfg.Replicator.PlatformVersion) if err != nil { logger.Fatal("cannot initialize gbeController: ", err) } diff --git a/etl/controller/controller_test.go b/etl/controller/controller_test.go index d8d36ee2..9ae5fe8b 100644 --- a/etl/controller/controller_test.go +++ b/etl/controller/controller_test.go @@ -8,6 +8,7 @@ package controller import ( + "context" "sync" "testing" @@ -22,6 +23,7 @@ import ( ) var cfg = configuration.Controller{PulsePeriod: 10, ReloadPeriod: 10, ReloadCleanPeriod: 1, SequentialPeriod: 1} +var platformVersion = 2 func TestNewController_NoPulses(t *testing.T) { extractor := mock.NewJetDropsExtractorMock(t) @@ -29,12 +31,17 @@ func TestNewController_NoPulses(t *testing.T) { sm := mock.NewStorageMock(t) sm.GetIncompletePulsesMock.Return(nil, nil) sm.GetSequentialPulseMock.Return(models.Pulse{}, nil) + sm.GetPulseByPrevMock.Return(models.Pulse{}, nil) + sm.GetNextSavedPulseMock.Return(models.Pulse{}, nil) - c, err := NewController(cfg, extractor, sm) + c, err := NewController(cfg, extractor, sm, platformVersion) require.NoError(t, err) require.NotNil(t, c) require.Empty(t, c.jetDropRegister) require.NotNil(t, c.missedDataManager) + err = c.Start(context.Background()) + require.NoError(t, err) + require.Equal(t, uint64(1), sm.GetIncompletePulsesAfterCounter()) } @@ -50,11 +57,15 @@ func TestNewController_OneNotCompletePulse(t *testing.T) { sm.GetIncompletePulsesMock.Return([]models.Pulse{{PulseNumber: pulseNumber}}, nil) sm.GetJetDropsMock.Return([]models.JetDrop{{JetID: firstJetID}, {JetID: secondJetID}}, nil) sm.GetSequentialPulseMock.Return(models.Pulse{}, nil) + sm.GetPulseByPrevMock.Return(models.Pulse{}, nil) + sm.GetNextSavedPulseMock.Return(models.Pulse{}, nil) - c, err := NewController(cfg, extractor, sm) + c, err := NewController(cfg, extractor, sm, platformVersion) require.NoError(t, err) require.NotNil(t, c) require.NotNil(t, c.missedDataManager) + err = c.Start(context.Background()) + require.NoError(t, err) require.Equal(t, expectedData, c.jetDropRegister) @@ -72,11 +83,15 @@ func TestNewController_OneNotCompletePulse_NoJets(t *testing.T) { sm.GetIncompletePulsesMock.Return([]models.Pulse{{PulseNumber: pulseNumber}}, nil) sm.GetJetDropsMock.Return([]models.JetDrop{}, nil) sm.GetSequentialPulseMock.Return(models.Pulse{}, nil) + sm.GetPulseByPrevMock.Return(models.Pulse{}, nil) + sm.GetNextSavedPulseMock.Return(models.Pulse{}, nil) - c, err := NewController(cfg, extractor, sm) + c, err := NewController(cfg, extractor, sm, platformVersion) require.NoError(t, err) require.NotNil(t, c) require.NotNil(t, c.missedDataManager) + err = c.Start(context.Background()) + require.NoError(t, err) require.Equal(t, expectedData, c.jetDropRegister) @@ -109,11 +124,15 @@ func TestNewController_SeveralNotCompletePulses(t *testing.T) { sm.GetIncompletePulsesMock.Return([]models.Pulse{{PulseNumber: firstPulseNumber}, {PulseNumber: secondPulseNumber}}, nil) sm.GetJetDropsMock.Set(getJetDrops) sm.GetSequentialPulseMock.Return(models.Pulse{}, nil) + sm.GetPulseByPrevMock.Return(models.Pulse{}, nil) + sm.GetNextSavedPulseMock.Return(models.Pulse{}, nil) - c, err := NewController(cfg, extractor, sm) + c, err := NewController(cfg, extractor, sm, platformVersion) require.NoError(t, err) require.NotNil(t, c) require.NotNil(t, c.missedDataManager) + err = c.Start(context.Background()) + require.NoError(t, err) require.Equal(t, expectedData, c.jetDropRegister) @@ -127,11 +146,15 @@ func TestNewController_ErrorGetPulses(t *testing.T) { sm := mock.NewStorageMock(t) sm.GetIncompletePulsesMock.Return(nil, errors.New("test error")) sm.GetSequentialPulseMock.Return(models.Pulse{}, nil) + sm.GetPulseByPrevMock.Return(models.Pulse{}, nil) + sm.GetNextSavedPulseMock.Return(models.Pulse{}, nil) - c, err := NewController(cfg, extractor, sm) + c, err := NewController(cfg, extractor, sm, platformVersion) + require.NoError(t, err) + err = c.Start(context.Background()) require.Error(t, err) require.Contains(t, err.Error(), "test error") - require.Nil(t, c) + require.NotNil(t, c) require.Equal(t, uint64(1), sm.GetIncompletePulsesAfterCounter()) } @@ -144,11 +167,15 @@ func TestNewController_ErrorGetJetDrops(t *testing.T) { sm.GetIncompletePulsesMock.Return([]models.Pulse{{PulseNumber: pulseNumber}}, nil) sm.GetJetDropsMock.Return(nil, errors.New("test error")) sm.GetSequentialPulseMock.Return(models.Pulse{}, nil) + sm.GetPulseByPrevMock.Return(models.Pulse{}, nil) + sm.GetNextSavedPulseMock.Return(models.Pulse{}, nil) - c, err := NewController(cfg, extractor, sm) + c, err := NewController(cfg, extractor, sm, platformVersion) + require.NoError(t, err) + err = c.Start(context.Background()) require.Error(t, err) require.Contains(t, err.Error(), "test error") - require.Nil(t, c) + require.NotNil(t, c) require.Equal(t, uint64(1), sm.GetIncompletePulsesAfterCounter()) require.Equal(t, uint64(1), sm.GetJetDropsAfterCounter()) } diff --git a/etl/controller/pulsemaintainer_test.go b/etl/controller/pulsemaintainer_test.go index f6ba5026..0264b0fe 100644 --- a/etl/controller/pulsemaintainer_test.go +++ b/etl/controller/pulsemaintainer_test.go @@ -37,7 +37,7 @@ func TestController_pulseMaintainer(t *testing.T) { sm.GetPulseByPrevMock.Return(models.Pulse{}, errors.New("test error")) defer leaktest.Check(t)() - c, err := NewController(cfg, extractor, sm) + c, err := NewController(cfg, extractor, sm, 2) require.NoError(t, err) ctx := context.Background() require.NoError(t, c.Start(ctx)) @@ -79,13 +79,13 @@ func TestController_pulseSequence_StartFromNothing(t *testing.T) { require.Equal(t, int64(pulse.MinTimePulse), prevPulse.PulseNumber) return models.Pulse{}, nil }) - sm.GetNextSavedPulseMock.Set(func(fromPulseNumber models.Pulse) (p1 models.Pulse, err error) { + sm.GetNextSavedPulseMock.Set(func(fromPulseNumber models.Pulse, completedOnly bool) (p1 models.Pulse, err error) { if sm.GetNextSavedPulseBeforeCounter() == 1 || sm.GetPulseByPrevBeforeCounter() == 2 { require.Equal(t, int64(0), fromPulseNumber.PulseNumber) return models.Pulse{PrevPulseNumber: 1000100, PulseNumber: 1000110, NextPulseNumber: 1000120}, nil } if sm.GetNextSavedPulseBeforeCounter() == 3 { - require.Equal(t, int64(pulse.MinTimePulse), fromPulseNumber.PulseNumber) + require.Equal(t, int64(0), fromPulseNumber.PulseNumber) return models.Pulse{PrevPulseNumber: 1000100, PulseNumber: 1000110, NextPulseNumber: 1000120}, nil } require.Equal(t, int64(pulse.MinTimePulse), fromPulseNumber.PulseNumber) @@ -108,7 +108,7 @@ func TestController_pulseSequence_StartFromNothing(t *testing.T) { }) defer leaktest.Check(t)() - c, err := NewController(cfg, extractor, sm) + c, err := NewController(cfg, extractor, sm, 2) require.NoError(t, err) ctx := context.Background() err = c.Start(ctx) @@ -152,7 +152,7 @@ func TestController_pulseSequence_StartFromSomething(t *testing.T) { require.Equal(t, int64(1000020), prevPulse.PulseNumber) return models.Pulse{PrevPulseNumber: 1000020, PulseNumber: 1000030, NextPulseNumber: 1000040, IsComplete: false}, nil }) - sm.GetNextSavedPulseMock.Set(func(fromPulseNumber models.Pulse) (p1 models.Pulse, err error) { + sm.GetNextSavedPulseMock.Set(func(fromPulseNumber models.Pulse, completedOnly bool) (p1 models.Pulse, err error) { if sm.GetNextSavedPulseBeforeCounter() == 1 || sm.GetPulseByPrevBeforeCounter() == 2 { require.Equal(t, int64(1000000), fromPulseNumber.PulseNumber) return models.Pulse{PrevPulseNumber: 1000010, PulseNumber: 1000020, NextPulseNumber: 1000030}, nil @@ -180,7 +180,7 @@ func TestController_pulseSequence_StartFromSomething(t *testing.T) { }) defer leaktest.Check(t)() - c, err := NewController(cfg, extractor, sm) + c, err := NewController(cfg, extractor, sm, 2) require.NoError(t, err) ctx := context.Background() err = c.Start(ctx) @@ -216,7 +216,7 @@ func TestController_pulseSequence_Start_NoMissedData(t *testing.T) { return models.Pulse{PrevPulseNumber: 1000010, PulseNumber: 1000020, NextPulseNumber: 1000030, IsComplete: true}, nil } require.Equal(t, int64(1000020), prevPulse.PulseNumber) - return models.Pulse{PrevPulseNumber: 1000020, PulseNumber: 1000030, NextPulseNumber: 1000040, IsComplete: false}, nil + return models.Pulse{}, nil }) sm.SequencePulseMock.Set(func(pulseNumber int64) (err error) { if sm.SequencePulseBeforeCounter() == 1 { @@ -227,9 +227,12 @@ func TestController_pulseSequence_Start_NoMissedData(t *testing.T) { } return nil }) + sm.GetNextSavedPulseMock.Set(func(fromPulseNumber models.Pulse, completedOnly bool) (p1 models.Pulse, err error) { + return models.Pulse{}, nil + }) defer leaktest.Check(t)() - c, err := NewController(cfg, extractor, sm) + c, err := NewController(cfg, extractor, sm, 2) require.NoError(t, err) ctx := context.Background() err = c.Start(ctx) @@ -274,9 +277,12 @@ func TestController_pulseMaintainer_Start_PulsesCompleteAndNot(t *testing.T) { wg.Done() return nil }) + sm.GetNextSavedPulseMock.Set(func(fromPulseNumber models.Pulse, completedOnly bool) (p1 models.Pulse, err error) { + return models.Pulse{}, nil + }) defer leaktest.Check(t)() - c, err := NewController(cfg, extractor, sm) + c, err := NewController(cfg, extractor, sm, 2) require.NoError(t, err) ctx := context.Background() err = c.Start(ctx) @@ -303,7 +309,7 @@ func TestController_pulseSequence_ReloadPeriodExpired(t *testing.T) { sm.GetPulseByPrevMock.Set(func(prevPulse models.Pulse) (p1 models.Pulse, err error) { return models.Pulse{}, nil }) - sm.GetNextSavedPulseMock.Set(func(fromPulseNumber models.Pulse) (p1 models.Pulse, err error) { + sm.GetNextSavedPulseMock.Set(func(fromPulseNumber models.Pulse, completedOnly bool) (p1 models.Pulse, err error) { return models.Pulse{PrevPulseNumber: 1000010, PulseNumber: 1000020, NextPulseNumber: 1000030}, nil }) extractor.LoadJetDropsMock.Set(func(ctx context.Context, fromPulseNumber int64, toPulseNumber int64) (err error) { @@ -317,7 +323,7 @@ func TestController_pulseSequence_ReloadPeriodExpired(t *testing.T) { }) defer leaktest.Check(t)() - c, err := NewController(cfg, extractor, sm) + c, err := NewController(cfg, extractor, sm, 2) require.NoError(t, err) ctx := context.Background() err = c.Start(ctx) diff --git a/etl/interfaces/mock/storage_mock.go b/etl/interfaces/mock/storage_mock.go index 5b3ea15f..8413ff12 100644 --- a/etl/interfaces/mock/storage_mock.go +++ b/etl/interfaces/mock/storage_mock.go @@ -33,8 +33,8 @@ type StorageMock struct { beforeGetJetDropsCounter uint64 GetJetDropsMock mStorageMockGetJetDrops - funcGetNextSavedPulse func(fromPulseNumber models.Pulse) (p1 models.Pulse, err error) - inspectFuncGetNextSavedPulse func(fromPulseNumber models.Pulse) + funcGetNextSavedPulse func(fromPulseNumber models.Pulse, completedOnly bool) (p1 models.Pulse, err error) + inspectFuncGetNextSavedPulse func(fromPulseNumber models.Pulse, completedOnly bool) afterGetNextSavedPulseCounter uint64 beforeGetNextSavedPulseCounter uint64 GetNextSavedPulseMock mStorageMockGetNextSavedPulse @@ -700,6 +700,7 @@ type StorageMockGetNextSavedPulseExpectation struct { // StorageMockGetNextSavedPulseParams contains parameters of the Storage.GetNextSavedPulse type StorageMockGetNextSavedPulseParams struct { fromPulseNumber models.Pulse + completedOnly bool } // StorageMockGetNextSavedPulseResults contains results of the Storage.GetNextSavedPulse @@ -709,7 +710,7 @@ type StorageMockGetNextSavedPulseResults struct { } // Expect sets up expected params for Storage.GetNextSavedPulse -func (mmGetNextSavedPulse *mStorageMockGetNextSavedPulse) Expect(fromPulseNumber models.Pulse) *mStorageMockGetNextSavedPulse { +func (mmGetNextSavedPulse *mStorageMockGetNextSavedPulse) Expect(fromPulseNumber models.Pulse, completedOnly bool) *mStorageMockGetNextSavedPulse { if mmGetNextSavedPulse.mock.funcGetNextSavedPulse != nil { mmGetNextSavedPulse.mock.t.Fatalf("StorageMock.GetNextSavedPulse mock is already set by Set") } @@ -718,7 +719,7 @@ func (mmGetNextSavedPulse *mStorageMockGetNextSavedPulse) Expect(fromPulseNumber mmGetNextSavedPulse.defaultExpectation = &StorageMockGetNextSavedPulseExpectation{} } - mmGetNextSavedPulse.defaultExpectation.params = &StorageMockGetNextSavedPulseParams{fromPulseNumber} + mmGetNextSavedPulse.defaultExpectation.params = &StorageMockGetNextSavedPulseParams{fromPulseNumber, completedOnly} for _, e := range mmGetNextSavedPulse.expectations { if minimock.Equal(e.params, mmGetNextSavedPulse.defaultExpectation.params) { mmGetNextSavedPulse.mock.t.Fatalf("Expectation set by When has same params: %#v", *mmGetNextSavedPulse.defaultExpectation.params) @@ -729,7 +730,7 @@ func (mmGetNextSavedPulse *mStorageMockGetNextSavedPulse) Expect(fromPulseNumber } // Inspect accepts an inspector function that has same arguments as the Storage.GetNextSavedPulse -func (mmGetNextSavedPulse *mStorageMockGetNextSavedPulse) Inspect(f func(fromPulseNumber models.Pulse)) *mStorageMockGetNextSavedPulse { +func (mmGetNextSavedPulse *mStorageMockGetNextSavedPulse) Inspect(f func(fromPulseNumber models.Pulse, completedOnly bool)) *mStorageMockGetNextSavedPulse { if mmGetNextSavedPulse.mock.inspectFuncGetNextSavedPulse != nil { mmGetNextSavedPulse.mock.t.Fatalf("Inspect function is already set for StorageMock.GetNextSavedPulse") } @@ -753,7 +754,7 @@ func (mmGetNextSavedPulse *mStorageMockGetNextSavedPulse) Return(p1 models.Pulse } //Set uses given function f to mock the Storage.GetNextSavedPulse method -func (mmGetNextSavedPulse *mStorageMockGetNextSavedPulse) Set(f func(fromPulseNumber models.Pulse) (p1 models.Pulse, err error)) *StorageMock { +func (mmGetNextSavedPulse *mStorageMockGetNextSavedPulse) Set(f func(fromPulseNumber models.Pulse, completedOnly bool) (p1 models.Pulse, err error)) *StorageMock { if mmGetNextSavedPulse.defaultExpectation != nil { mmGetNextSavedPulse.mock.t.Fatalf("Default expectation is already set for the Storage.GetNextSavedPulse method") } @@ -768,14 +769,14 @@ func (mmGetNextSavedPulse *mStorageMockGetNextSavedPulse) Set(f func(fromPulseNu // When sets expectation for the Storage.GetNextSavedPulse which will trigger the result defined by the following // Then helper -func (mmGetNextSavedPulse *mStorageMockGetNextSavedPulse) When(fromPulseNumber models.Pulse) *StorageMockGetNextSavedPulseExpectation { +func (mmGetNextSavedPulse *mStorageMockGetNextSavedPulse) When(fromPulseNumber models.Pulse, completedOnly bool) *StorageMockGetNextSavedPulseExpectation { if mmGetNextSavedPulse.mock.funcGetNextSavedPulse != nil { mmGetNextSavedPulse.mock.t.Fatalf("StorageMock.GetNextSavedPulse mock is already set by Set") } expectation := &StorageMockGetNextSavedPulseExpectation{ mock: mmGetNextSavedPulse.mock, - params: &StorageMockGetNextSavedPulseParams{fromPulseNumber}, + params: &StorageMockGetNextSavedPulseParams{fromPulseNumber, completedOnly}, } mmGetNextSavedPulse.expectations = append(mmGetNextSavedPulse.expectations, expectation) return expectation @@ -788,15 +789,15 @@ func (e *StorageMockGetNextSavedPulseExpectation) Then(p1 models.Pulse, err erro } // GetNextSavedPulse implements interfaces.Storage -func (mmGetNextSavedPulse *StorageMock) GetNextSavedPulse(fromPulseNumber models.Pulse) (p1 models.Pulse, err error) { +func (mmGetNextSavedPulse *StorageMock) GetNextSavedPulse(fromPulseNumber models.Pulse, completedOnly bool) (p1 models.Pulse, err error) { mm_atomic.AddUint64(&mmGetNextSavedPulse.beforeGetNextSavedPulseCounter, 1) defer mm_atomic.AddUint64(&mmGetNextSavedPulse.afterGetNextSavedPulseCounter, 1) if mmGetNextSavedPulse.inspectFuncGetNextSavedPulse != nil { - mmGetNextSavedPulse.inspectFuncGetNextSavedPulse(fromPulseNumber) + mmGetNextSavedPulse.inspectFuncGetNextSavedPulse(fromPulseNumber, completedOnly) } - mm_params := &StorageMockGetNextSavedPulseParams{fromPulseNumber} + mm_params := &StorageMockGetNextSavedPulseParams{fromPulseNumber, completedOnly} // Record call args mmGetNextSavedPulse.GetNextSavedPulseMock.mutex.Lock() @@ -813,7 +814,7 @@ func (mmGetNextSavedPulse *StorageMock) GetNextSavedPulse(fromPulseNumber models if mmGetNextSavedPulse.GetNextSavedPulseMock.defaultExpectation != nil { mm_atomic.AddUint64(&mmGetNextSavedPulse.GetNextSavedPulseMock.defaultExpectation.Counter, 1) mm_want := mmGetNextSavedPulse.GetNextSavedPulseMock.defaultExpectation.params - mm_got := StorageMockGetNextSavedPulseParams{fromPulseNumber} + mm_got := StorageMockGetNextSavedPulseParams{fromPulseNumber, completedOnly} if mm_want != nil && !minimock.Equal(*mm_want, mm_got) { mmGetNextSavedPulse.t.Errorf("StorageMock.GetNextSavedPulse got unexpected parameters, want: %#v, got: %#v%s\n", *mm_want, mm_got, minimock.Diff(*mm_want, mm_got)) } @@ -825,9 +826,9 @@ func (mmGetNextSavedPulse *StorageMock) GetNextSavedPulse(fromPulseNumber models return (*mm_results).p1, (*mm_results).err } if mmGetNextSavedPulse.funcGetNextSavedPulse != nil { - return mmGetNextSavedPulse.funcGetNextSavedPulse(fromPulseNumber) + return mmGetNextSavedPulse.funcGetNextSavedPulse(fromPulseNumber, completedOnly) } - mmGetNextSavedPulse.t.Fatalf("Unexpected call to StorageMock.GetNextSavedPulse. %v", fromPulseNumber) + mmGetNextSavedPulse.t.Fatalf("Unexpected call to StorageMock.GetNextSavedPulse. %v %v", fromPulseNumber, completedOnly) return } diff --git a/etl/storage/storage.go b/etl/storage/storage.go index 13086019..1972fdcb 100644 --- a/etl/storage/storage.go +++ b/etl/storage/storage.go @@ -453,7 +453,7 @@ func (s *Storage) GetNextSavedPulse(fromPulseNumber models.Pulse, completedOnly defer timer.ObserveDuration() var pulses []models.Pulse - db := s.db.Debug().Where("pulse_number > ?", fromPulseNumber.PulseNumber) + db := s.db.Where("pulse_number > ?", fromPulseNumber.PulseNumber) if completedOnly { db = db.Where("is_complete = ?", true) } diff --git a/testutils/betestsetup/betestsetup.go b/testutils/betestsetup/betestsetup.go index 6b9e17ea..43178371 100644 --- a/testutils/betestsetup/betestsetup.go +++ b/testutils/betestsetup/betestsetup.go @@ -68,7 +68,7 @@ func (b *BlockExplorerTestSetUp) Start() error { } b.strg = storage.NewStorage(b.DB) - b.cont, err = controller.NewController(cfg, b.extr, b.strg) + b.cont, err = controller.NewController(cfg, b.extr, b.strg, 2) if err != nil { return err } From 9dabe9cfb3ed7a4742f1a85614e4954bd0e58e3e Mon Sep 17 00:00:00 2001 From: SergeyRadist Date: Wed, 30 Sep 2020 17:39:23 +0300 Subject: [PATCH 8/8] penv-615 fix units --- etl/controller/controller_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/etl/controller/controller_test.go b/etl/controller/controller_test.go index 9ae5fe8b..686374d6 100644 --- a/etl/controller/controller_test.go +++ b/etl/controller/controller_test.go @@ -33,6 +33,7 @@ func TestNewController_NoPulses(t *testing.T) { sm.GetSequentialPulseMock.Return(models.Pulse{}, nil) sm.GetPulseByPrevMock.Return(models.Pulse{}, nil) sm.GetNextSavedPulseMock.Return(models.Pulse{}, nil) + extractor.LoadJetDropsMock.Return(nil) c, err := NewController(cfg, extractor, sm, platformVersion) require.NoError(t, err) @@ -59,6 +60,7 @@ func TestNewController_OneNotCompletePulse(t *testing.T) { sm.GetSequentialPulseMock.Return(models.Pulse{}, nil) sm.GetPulseByPrevMock.Return(models.Pulse{}, nil) sm.GetNextSavedPulseMock.Return(models.Pulse{}, nil) + extractor.LoadJetDropsMock.Return(nil) c, err := NewController(cfg, extractor, sm, platformVersion) require.NoError(t, err) @@ -85,6 +87,7 @@ func TestNewController_OneNotCompletePulse_NoJets(t *testing.T) { sm.GetSequentialPulseMock.Return(models.Pulse{}, nil) sm.GetPulseByPrevMock.Return(models.Pulse{}, nil) sm.GetNextSavedPulseMock.Return(models.Pulse{}, nil) + extractor.LoadJetDropsMock.Return(nil) c, err := NewController(cfg, extractor, sm, platformVersion) require.NoError(t, err) @@ -126,6 +129,7 @@ func TestNewController_SeveralNotCompletePulses(t *testing.T) { sm.GetSequentialPulseMock.Return(models.Pulse{}, nil) sm.GetPulseByPrevMock.Return(models.Pulse{}, nil) sm.GetNextSavedPulseMock.Return(models.Pulse{}, nil) + extractor.LoadJetDropsMock.Return(nil) c, err := NewController(cfg, extractor, sm, platformVersion) require.NoError(t, err) @@ -148,6 +152,7 @@ func TestNewController_ErrorGetPulses(t *testing.T) { sm.GetSequentialPulseMock.Return(models.Pulse{}, nil) sm.GetPulseByPrevMock.Return(models.Pulse{}, nil) sm.GetNextSavedPulseMock.Return(models.Pulse{}, nil) + extractor.LoadJetDropsMock.Return(nil) c, err := NewController(cfg, extractor, sm, platformVersion) require.NoError(t, err) @@ -169,6 +174,7 @@ func TestNewController_ErrorGetJetDrops(t *testing.T) { sm.GetSequentialPulseMock.Return(models.Pulse{}, nil) sm.GetPulseByPrevMock.Return(models.Pulse{}, nil) sm.GetNextSavedPulseMock.Return(models.Pulse{}, nil) + extractor.LoadJetDropsMock.Return(nil) c, err := NewController(cfg, extractor, sm, platformVersion) require.NoError(t, err)