Skip to content

Commit

Permalink
Merge pull request #186 from insolar/penv-615-shoulditeratefrom
Browse files Browse the repository at this point in the history
penv-614 priority poc
  • Loading branch information
SergeyRadist authored Oct 1, 2020
2 parents fc91e1e + 9dabe9c commit 9e1764c
Show file tree
Hide file tree
Showing 20 changed files with 242 additions and 113 deletions.
4 changes: 1 addition & 3 deletions api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,7 @@ func (s *Server) getEnrichingJetDrops(oldestPulse, newestPulse int64) ([]models.
enrichedDrops = append(enrichedDrops, ejd...)
}

nextPulse, err := s.storage.GetNextSavedPulse(models.Pulse{
PulseNumber: newestPulse,
})
nextPulse, err := s.storage.GetNextSavedPulse(models.Pulse{PulseNumber: newestPulse}, false)
if err == nil && nextPulse != emptyPulse {
ejd, err := s.storage.GetJetDrops(models.Pulse{PulseNumber: nextPulse.PulseNumber})
if err != nil {
Expand Down
8 changes: 6 additions & 2 deletions cmd/block-explorer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func main() {
100,
cfg.Replicator.ContinuousPulseRetrievingHalfPulseSeconds,
int32(cfg.Replicator.ParallelConnections),
cfg.Replicator.QueueLen,
pulseExtractor,
exporter.NewRecordExporterClient(client.GetGRPCConn()),
shutdownBE,
Expand All @@ -98,7 +99,10 @@ func main() {
}
}()

mainNetTransformer := transformer.NewMainNetTransformer(platformExtractor.GetJetDrops(ctx))
mainNetTransformer := transformer.NewMainNetTransformer(
platformExtractor.GetJetDrops(ctx),
cfg.Transformer.QueueLen,
)
err = mainNetTransformer.Start(ctx)
if err != nil {
logger.Fatal("cannot start transformer: ", err)
Expand Down Expand Up @@ -128,7 +132,7 @@ func main() {

repository := storage.NewStorage(db)

gbeController, err := controller.NewController(cfg.Controller, platformExtractor, repository)
gbeController, err := controller.NewController(cfg.Controller, platformExtractor, repository, cfg.Replicator.PlatformVersion)
if err != nil {
logger.Fatal("cannot initialize gbeController: ", err)
}
Expand Down
22 changes: 15 additions & 7 deletions configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ func init() {
}

type BlockExplorer struct {
Log Log
DB DB
Replicator Replicator
Controller Controller
Processor Processor
Metrics Metrics
Profefe Profefe
Log Log
DB DB
Replicator Replicator
Controller Controller
Processor Processor
Transformer Transformer
Metrics Metrics
Profefe Profefe
}

type API struct {
Expand Down Expand Up @@ -53,11 +54,13 @@ type TestDB struct {

// Replicator represents a configuration of the Platform connection
type Replicator struct {
PlatformVersion int `insconfig:"1| Platform version, can be 1 or 2"`
Addr string `insconfig:"127.0.0.1:5678| The gRPC server address"`
MaxTransportMsg int `insconfig:"1073741824| Maximum message size the client can send"`
WaitForConnectionRecoveryTimeout time.Duration `insconfig:"30s| Connection recovery timeout"`
ContinuousPulseRetrievingHalfPulseSeconds uint32 `insconfig:"5| Half pulse in seconds"`
ParallelConnections uint32 `insconfig:"100| Maximum parallel pulse retrievers"`
QueueLen uint32 `insconfig:"500| Max elements in extractor queue"`
Auth Auth
}

Expand Down Expand Up @@ -115,6 +118,11 @@ type Processor struct {
Workers int `insconfig:"200| The count of workers for processing transformed data"`
}

// Transformer transforms raw platform data to canonical GBE data types
type Transformer struct {
QueueLen uint32 `insconfig:"500| Max elements in transformer queue"`
}

type Profefe struct {
StartAgent bool `insconfig:"true| if true, start the profefe agent"`
Address string `insconfig:"http://127.0.0.1:10100| Profefe collector public address to send profiling data"`
Expand Down
37 changes: 32 additions & 5 deletions etl/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/insolar/block-explorer/configuration"
"github.com/insolar/block-explorer/etl/models"
"github.com/insolar/block-explorer/instrumentation/belogger"

"github.com/pkg/errors"

Expand All @@ -37,21 +38,32 @@ type Controller struct {
// sequentialPulse is greatest complete pulse after which all pulses complete too
sequentialPulse models.Pulse
sequentialPulseLock sync.RWMutex

// incompletePulseCounter for penv-615
incompletePulseCounter int
platformVersion int
}

// NewController returns implementation of interfaces.Controller
func NewController(cfg configuration.Controller, extractor interfaces.JetDropsExtractor, storage interfaces.Storage) (*Controller, error) {
func NewController(cfg configuration.Controller, extractor interfaces.JetDropsExtractor, storage interfaces.Storage, pv int) (*Controller, error) {
c := &Controller{
cfg: cfg,
extractor: extractor,
storage: storage,
jetDropRegister: make(map[types.Pulse]map[string]struct{}),
missedDataManager: NewMissedDataManager(time.Second*time.Duration(cfg.ReloadPeriod), time.Second*time.Duration(cfg.ReloadCleanPeriod)),
platformVersion: pv,
}
return c, nil
}

func (c *Controller) setIncompletePulses(ctx context.Context) error {
pulses, err := c.storage.GetIncompletePulses()
if err != nil {
return nil, errors.Wrap(err, "can't get not complete pulses from storage")
return errors.Wrap(err, "can't get not complete pulses from storage")
}
log := belogger.FromContext(ctx)
log.Debugf("Found %d incomplete pulses in db", len(pulses))
for _, p := range pulses {
key := types.Pulse{PulseNo: p.PulseNumber, PrevPulseNumber: p.PrevPulseNumber, NextPulseNumber: p.NextPulseNumber}
func() {
Expand All @@ -61,29 +73,44 @@ func NewController(cfg configuration.Controller, extractor interfaces.JetDropsEx
}()
jetDrops, err := c.storage.GetJetDrops(p)
if err != nil {
return nil, errors.Wrapf(err, "can't get jetDrops for pulse %d from storage", p.PulseNumber)
return errors.Wrapf(err, "can't get jetDrops for pulse %d from storage", p.PulseNumber)
}
for _, jd := range jetDrops {
c.SetJetDropData(key, jd.JetID)
}
}
return nil
}

func (c *Controller) setSeqPulse(ctx context.Context) error {
log := belogger.FromContext(ctx)
c.sequentialPulseLock.Lock()
defer c.sequentialPulseLock.Unlock()
var err error
c.sequentialPulse, err = c.storage.GetSequentialPulse()
if err != nil {
return nil, errors.Wrap(err, "can't get sequential pulse from storage")
return errors.Wrap(err, "can't get sequential pulse from storage")
}
emptyPulse := models.Pulse{}
if c.sequentialPulse == emptyPulse {
c.sequentialPulse = models.Pulse{
PulseNumber: 0,
}
}
return c, nil
log.Debugf("Found last sequential pulse %d in db", c.sequentialPulse.PulseNumber)
return nil
}

// Start implements interfaces.Starter
func (c *Controller) Start(ctx context.Context) error {
err := c.setIncompletePulses(ctx)
if err != nil {
return err
}
err = c.setSeqPulse(ctx)
if err != nil {
return err
}
ctx, c.cancelFunc = context.WithCancel(ctx)
c.missedDataManager.Start()
go c.pulseMaintainer(ctx)
Expand Down
49 changes: 41 additions & 8 deletions etl/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package controller

import (
"context"
"sync"
"testing"

Expand All @@ -22,19 +23,26 @@ import (
)

var cfg = configuration.Controller{PulsePeriod: 10, ReloadPeriod: 10, ReloadCleanPeriod: 1, SequentialPeriod: 1}
var platformVersion = 2

func TestNewController_NoPulses(t *testing.T) {
extractor := mock.NewJetDropsExtractorMock(t)

sm := mock.NewStorageMock(t)
sm.GetIncompletePulsesMock.Return(nil, nil)
sm.GetSequentialPulseMock.Return(models.Pulse{}, nil)
sm.GetPulseByPrevMock.Return(models.Pulse{}, nil)
sm.GetNextSavedPulseMock.Return(models.Pulse{}, nil)
extractor.LoadJetDropsMock.Return(nil)

c, err := NewController(cfg, extractor, sm)
c, err := NewController(cfg, extractor, sm, platformVersion)
require.NoError(t, err)
require.NotNil(t, c)
require.Empty(t, c.jetDropRegister)
require.NotNil(t, c.missedDataManager)
err = c.Start(context.Background())
require.NoError(t, err)

require.Equal(t, uint64(1), sm.GetIncompletePulsesAfterCounter())
}

Expand All @@ -50,11 +58,16 @@ func TestNewController_OneNotCompletePulse(t *testing.T) {
sm.GetIncompletePulsesMock.Return([]models.Pulse{{PulseNumber: pulseNumber}}, nil)
sm.GetJetDropsMock.Return([]models.JetDrop{{JetID: firstJetID}, {JetID: secondJetID}}, nil)
sm.GetSequentialPulseMock.Return(models.Pulse{}, nil)
sm.GetPulseByPrevMock.Return(models.Pulse{}, nil)
sm.GetNextSavedPulseMock.Return(models.Pulse{}, nil)
extractor.LoadJetDropsMock.Return(nil)

c, err := NewController(cfg, extractor, sm)
c, err := NewController(cfg, extractor, sm, platformVersion)
require.NoError(t, err)
require.NotNil(t, c)
require.NotNil(t, c.missedDataManager)
err = c.Start(context.Background())
require.NoError(t, err)

require.Equal(t, expectedData, c.jetDropRegister)

Expand All @@ -72,11 +85,16 @@ func TestNewController_OneNotCompletePulse_NoJets(t *testing.T) {
sm.GetIncompletePulsesMock.Return([]models.Pulse{{PulseNumber: pulseNumber}}, nil)
sm.GetJetDropsMock.Return([]models.JetDrop{}, nil)
sm.GetSequentialPulseMock.Return(models.Pulse{}, nil)
sm.GetPulseByPrevMock.Return(models.Pulse{}, nil)
sm.GetNextSavedPulseMock.Return(models.Pulse{}, nil)
extractor.LoadJetDropsMock.Return(nil)

c, err := NewController(cfg, extractor, sm)
c, err := NewController(cfg, extractor, sm, platformVersion)
require.NoError(t, err)
require.NotNil(t, c)
require.NotNil(t, c.missedDataManager)
err = c.Start(context.Background())
require.NoError(t, err)

require.Equal(t, expectedData, c.jetDropRegister)

Expand Down Expand Up @@ -109,11 +127,16 @@ func TestNewController_SeveralNotCompletePulses(t *testing.T) {
sm.GetIncompletePulsesMock.Return([]models.Pulse{{PulseNumber: firstPulseNumber}, {PulseNumber: secondPulseNumber}}, nil)
sm.GetJetDropsMock.Set(getJetDrops)
sm.GetSequentialPulseMock.Return(models.Pulse{}, nil)
sm.GetPulseByPrevMock.Return(models.Pulse{}, nil)
sm.GetNextSavedPulseMock.Return(models.Pulse{}, nil)
extractor.LoadJetDropsMock.Return(nil)

c, err := NewController(cfg, extractor, sm)
c, err := NewController(cfg, extractor, sm, platformVersion)
require.NoError(t, err)
require.NotNil(t, c)
require.NotNil(t, c.missedDataManager)
err = c.Start(context.Background())
require.NoError(t, err)

require.Equal(t, expectedData, c.jetDropRegister)

Expand All @@ -127,11 +150,16 @@ func TestNewController_ErrorGetPulses(t *testing.T) {
sm := mock.NewStorageMock(t)
sm.GetIncompletePulsesMock.Return(nil, errors.New("test error"))
sm.GetSequentialPulseMock.Return(models.Pulse{}, nil)
sm.GetPulseByPrevMock.Return(models.Pulse{}, nil)
sm.GetNextSavedPulseMock.Return(models.Pulse{}, nil)
extractor.LoadJetDropsMock.Return(nil)

c, err := NewController(cfg, extractor, sm)
c, err := NewController(cfg, extractor, sm, platformVersion)
require.NoError(t, err)
err = c.Start(context.Background())
require.Error(t, err)
require.Contains(t, err.Error(), "test error")
require.Nil(t, c)
require.NotNil(t, c)
require.Equal(t, uint64(1), sm.GetIncompletePulsesAfterCounter())
}

Expand All @@ -144,11 +172,16 @@ func TestNewController_ErrorGetJetDrops(t *testing.T) {
sm.GetIncompletePulsesMock.Return([]models.Pulse{{PulseNumber: pulseNumber}}, nil)
sm.GetJetDropsMock.Return(nil, errors.New("test error"))
sm.GetSequentialPulseMock.Return(models.Pulse{}, nil)
sm.GetPulseByPrevMock.Return(models.Pulse{}, nil)
sm.GetNextSavedPulseMock.Return(models.Pulse{}, nil)
extractor.LoadJetDropsMock.Return(nil)

c, err := NewController(cfg, extractor, sm)
c, err := NewController(cfg, extractor, sm, platformVersion)
require.NoError(t, err)
err = c.Start(context.Background())
require.Error(t, err)
require.Contains(t, err.Error(), "test error")
require.Nil(t, c)
require.NotNil(t, c)
require.Equal(t, uint64(1), sm.GetIncompletePulsesAfterCounter())
require.Equal(t, uint64(1), sm.GetJetDropsAfterCounter())
}
Expand Down
5 changes: 0 additions & 5 deletions etl/controller/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ var (
Name: "gbe_controller_current_seq_pulse",
Help: "Current sequentual pulse rerequested from platform",
})
CurrentIncompletePulse = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "gbe_controller_current_incomplete_pulse",
Help: "Current incomplete pulse that records are rerequested from platform",
})
PulseCompleteCounter = prometheus.NewCounter(prometheus.CounterOpts{
Name: "gbe_controller_pulse_complete_counter",
Help: "How many pulses is completed by 'pulseIsComplete' check",
Expand All @@ -44,7 +40,6 @@ func (s Metrics) Metrics(p *metrics.Prometheus) []prometheus.Collector {
return []prometheus.Collector{
IncompletePulsesQueue,
CurrentSeqPulse,
CurrentIncompletePulse,
PulseNotCompleteCounter,
PulseCompleteCounter,
}
Expand Down
Loading

0 comments on commit 9e1764c

Please sign in to comment.