From 0787f7b9019ec635e36b124e5fac864355d62b38 Mon Sep 17 00:00:00 2001 From: SergeyRadist Date: Thu, 1 Oct 2020 16:07:31 +0300 Subject: [PATCH 1/3] penv-668 temp fix of db error problem in penv-667 --- etl/processor/processor.go | 17 +++++++++++------ etl/processor/processor_test.go | 12 ++++++++---- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/etl/processor/processor.go b/etl/processor/processor.go index 7432313f..c5876e49 100644 --- a/etl/processor/processor.go +++ b/etl/processor/processor.go @@ -8,6 +8,7 @@ package processor import ( "context" "errors" + "fmt" "sync" "sync/atomic" @@ -44,6 +45,7 @@ func NewProcessor(jb interfaces.Transformer, storage interfaces.StorageSetter, c var ErrorAlreadyStarted = errors.New("Already started") func (p *Processor) Start(ctx context.Context) error { + log := belogger.FromContext(ctx) startOnce := func() error { p.taskCCloseMu.Lock() defer p.taskCCloseMu.Unlock() @@ -66,7 +68,11 @@ func (p *Processor) Start(ctx context.Context) error { if !ok { return } - p.process(ctx, t.JD) + err := p.process(ctx, t.JD) + if err != nil { + log.Error(err) + p.taskC <- t + } } }() } @@ -107,7 +113,7 @@ type Task struct { JD *types.JetDrop } -func (p *Processor) process(ctx context.Context, jd *types.JetDrop) { +func (p *Processor) process(ctx context.Context, jd *types.JetDrop) error { ms := jd.MainSection pd := ms.Start.PulseData @@ -123,8 +129,7 @@ func (p *Processor) process(ctx context.Context, jd *types.JetDrop) { } err := p.storage.SavePulse(mp) if err != nil { - logger.Errorf("cannot save pulse data: %s. pulse = %+v", err.Error(), mp) - return + return fmt.Errorf("cannot save pulse data: %s. pulse = %+v", err.Error(), mp) } var firstPrevHash []byte @@ -166,10 +171,10 @@ func (p *Processor) process(ctx context.Context, jd *types.JetDrop) { } err = p.storage.SaveJetDropData(mjd, mrs, mp.PulseNumber) if err != nil { - logger.Errorf("cannot save jetDrop data: %s. jetDrop:{jetID: %s, pulseNumber: %d}, record amount = %d\n", + return fmt.Errorf("cannot save jetDrop data: %s. jetDrop:{jetID: %s, pulseNumber: %d}, record amount = %d\n", err.Error(), mjd.JetID, mjd.PulseNumber, len(mrs)) - return } p.controller.SetJetDropData(pd, mjd.JetID) logger.Infof("Processed: pulseNumber = %d, jetID = %v", pd.PulseNo, mjd.JetID) + return nil } diff --git a/etl/processor/processor_test.go b/etl/processor/processor_test.go index 3a4e9289..da3dd63c 100644 --- a/etl/processor/processor_test.go +++ b/etl/processor/processor_test.go @@ -124,7 +124,8 @@ func TestProcessor_process_EmptyPrev(t *testing.T) { p := NewProcessor(trm, sm, contr, 3) require.NotNil(t, p) - p.process(ctx, &jd) + err := p.process(ctx, &jd) + require.Nil(t, err) require.Equal(t, uint64(1), sm.SaveJetDropDataAfterCounter()) require.Equal(t, uint64(1), contr.SetJetDropDataAfterCounter()) @@ -166,7 +167,8 @@ func TestProcessor_process_SeveralPrev(t *testing.T) { p := NewProcessor(trm, sm, contr, 3) require.NotNil(t, p) - p.process(ctx, &jd) + err := p.process(ctx, &jd) + require.Nil(t, err) require.Equal(t, uint64(1), sm.SaveJetDropDataAfterCounter()) require.Equal(t, uint64(1), contr.SetJetDropDataAfterCounter()) @@ -197,7 +199,8 @@ func TestProcessor_process_StorageSaveJetDropErr(t *testing.T) { p := NewProcessor(trm, sm, contr, 3) require.NotNil(t, p) - p.process(ctx, &jd) + err := p.process(ctx, &jd) + require.NotNil(t, err) require.Equal(t, uint64(1), sm.SaveJetDropDataAfterCounter()) } @@ -224,7 +227,8 @@ func TestProcessor_process_StorageSavePulseErr(t *testing.T) { p := NewProcessor(trm, sm, contr, 3) require.NotNil(t, p) - p.process(ctx, &jd) + err := p.process(ctx, &jd) + require.NotNil(t, err) require.Equal(t, uint64(1), sm.SavePulseAfterCounter()) } From e38c5d54486672b72dec8d3491ad570ae2545194 Mon Sep 17 00:00:00 2001 From: SergeyRadist Date: Thu, 1 Oct 2020 16:19:54 +0300 Subject: [PATCH 2/3] penv-668 lint --- etl/processor/processor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etl/processor/processor.go b/etl/processor/processor.go index c5876e49..58173da5 100644 --- a/etl/processor/processor.go +++ b/etl/processor/processor.go @@ -171,7 +171,7 @@ func (p *Processor) process(ctx context.Context, jd *types.JetDrop) error { } err = p.storage.SaveJetDropData(mjd, mrs, mp.PulseNumber) if err != nil { - return fmt.Errorf("cannot save jetDrop data: %s. jetDrop:{jetID: %s, pulseNumber: %d}, record amount = %d\n", + return fmt.Errorf("cannot save jetDrop data: %s. jetDrop:{jetID: %s, pulseNumber: %d}, record amount = %d", err.Error(), mjd.JetID, mjd.PulseNumber, len(mrs)) } p.controller.SetJetDropData(pd, mjd.JetID) From 118da1d554a79c9193fa4492846daabc2910e910 Mon Sep 17 00:00:00 2001 From: SergeyRadist Date: Thu, 1 Oct 2020 17:03:15 +0300 Subject: [PATCH 3/3] penv-668 todo --- etl/processor/processor.go | 1 + 1 file changed, 1 insertion(+) diff --git a/etl/processor/processor.go b/etl/processor/processor.go index 58173da5..b652fda9 100644 --- a/etl/processor/processor.go +++ b/etl/processor/processor.go @@ -69,6 +69,7 @@ func (p *Processor) Start(ctx context.Context) error { return } err := p.process(ctx, t.JD) + // todo remove this in penv-667 if err != nil { log.Error(err) p.taskC <- t