Skip to content

Commit

Permalink
Merge pull request #189 from insolar/penv-668-temp-fix
Browse files Browse the repository at this point in the history
penv-668 temp fix of db error problem in penv-667
  • Loading branch information
SergeyRadist authored Oct 1, 2020
2 parents 9e1764c + 118da1d commit 2c3f594
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 10 deletions.
18 changes: 12 additions & 6 deletions etl/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package processor
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -44,6 +45,7 @@ func NewProcessor(jb interfaces.Transformer, storage interfaces.StorageSetter, c
var ErrorAlreadyStarted = errors.New("Already started")

func (p *Processor) Start(ctx context.Context) error {
log := belogger.FromContext(ctx)
startOnce := func() error {
p.taskCCloseMu.Lock()
defer p.taskCCloseMu.Unlock()
Expand All @@ -66,7 +68,12 @@ func (p *Processor) Start(ctx context.Context) error {
if !ok {
return
}
p.process(ctx, t.JD)
err := p.process(ctx, t.JD)
// todo remove this in penv-667
if err != nil {
log.Error(err)
p.taskC <- t
}
}
}()
}
Expand Down Expand Up @@ -107,7 +114,7 @@ type Task struct {
JD *types.JetDrop
}

func (p *Processor) process(ctx context.Context, jd *types.JetDrop) {
func (p *Processor) process(ctx context.Context, jd *types.JetDrop) error {
ms := jd.MainSection
pd := ms.Start.PulseData

Expand All @@ -123,8 +130,7 @@ func (p *Processor) process(ctx context.Context, jd *types.JetDrop) {
}
err := p.storage.SavePulse(mp)
if err != nil {
logger.Errorf("cannot save pulse data: %s. pulse = %+v", err.Error(), mp)
return
return fmt.Errorf("cannot save pulse data: %s. pulse = %+v", err.Error(), mp)
}

var firstPrevHash []byte
Expand Down Expand Up @@ -166,10 +172,10 @@ func (p *Processor) process(ctx context.Context, jd *types.JetDrop) {
}
err = p.storage.SaveJetDropData(mjd, mrs, mp.PulseNumber)
if err != nil {
logger.Errorf("cannot save jetDrop data: %s. jetDrop:{jetID: %s, pulseNumber: %d}, record amount = %d\n",
return fmt.Errorf("cannot save jetDrop data: %s. jetDrop:{jetID: %s, pulseNumber: %d}, record amount = %d",
err.Error(), mjd.JetID, mjd.PulseNumber, len(mrs))
return
}
p.controller.SetJetDropData(pd, mjd.JetID)
logger.Infof("Processed: pulseNumber = %d, jetID = %v", pd.PulseNo, mjd.JetID)
return nil
}
12 changes: 8 additions & 4 deletions etl/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ func TestProcessor_process_EmptyPrev(t *testing.T) {
p := NewProcessor(trm, sm, contr, 3)
require.NotNil(t, p)

p.process(ctx, &jd)
err := p.process(ctx, &jd)
require.Nil(t, err)

require.Equal(t, uint64(1), sm.SaveJetDropDataAfterCounter())
require.Equal(t, uint64(1), contr.SetJetDropDataAfterCounter())
Expand Down Expand Up @@ -166,7 +167,8 @@ func TestProcessor_process_SeveralPrev(t *testing.T) {
p := NewProcessor(trm, sm, contr, 3)
require.NotNil(t, p)

p.process(ctx, &jd)
err := p.process(ctx, &jd)
require.Nil(t, err)

require.Equal(t, uint64(1), sm.SaveJetDropDataAfterCounter())
require.Equal(t, uint64(1), contr.SetJetDropDataAfterCounter())
Expand Down Expand Up @@ -197,7 +199,8 @@ func TestProcessor_process_StorageSaveJetDropErr(t *testing.T) {
p := NewProcessor(trm, sm, contr, 3)
require.NotNil(t, p)

p.process(ctx, &jd)
err := p.process(ctx, &jd)
require.NotNil(t, err)

require.Equal(t, uint64(1), sm.SaveJetDropDataAfterCounter())
}
Expand All @@ -224,7 +227,8 @@ func TestProcessor_process_StorageSavePulseErr(t *testing.T) {
p := NewProcessor(trm, sm, contr, 3)
require.NotNil(t, p)

p.process(ctx, &jd)
err := p.process(ctx, &jd)
require.NotNil(t, err)

require.Equal(t, uint64(1), sm.SavePulseAfterCounter())
}

0 comments on commit 2c3f594

Please sign in to comment.