Skip to content

Commit 9386361

Browse files
committed
update method calls
1 parent 908c4f4 commit 9386361

File tree

5 files changed

+59
-28
lines changed

5 files changed

+59
-28
lines changed

pkg/orchestrator/orchestrator_test.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
proc_builtin "github.com/conduitio/conduit/pkg/plugin/processor/builtin"
3838
"github.com/conduitio/conduit/pkg/processor"
3939
"github.com/google/go-cmp/cmp"
40+
"github.com/jpillora/backoff"
4041
"github.com/matryer/is"
4142
"github.com/rs/zerolog"
4243
"go.uber.org/mock/gomock"
@@ -90,10 +91,12 @@ func TestPipelineSimple(t *testing.T) {
9091
nil,
9192
)
9293

94+
b := &backoff.Backoff{}
95+
9396
orc := NewOrchestrator(
9497
db,
9598
logger,
96-
pipeline.NewService(logger, db),
99+
pipeline.NewService(logger, db, b),
97100
connector.NewService(logger, db, connector.NewPersister(logger, db, time.Second, 3)),
98101
processor.NewService(logger, db, procPluginService),
99102
connPluginService,

pkg/pipeline/lifecycle.go

-2
Original file line numberDiff line numberDiff line change
@@ -634,8 +634,6 @@ func (s *Service) runPipeline(ctx context.Context, pl *Instance) error {
634634
} else {
635635
pl.SetStatus(StatusRecovering)
636636
// TODO: Implement backoff strategy
637-
// check backoff strategy
638-
639637
}
640638
}
641639

pkg/pipeline/lifecycle_test.go

+18-9
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
pmock "github.com/conduitio/conduit/pkg/plugin/connector/mock"
3636
"github.com/conduitio/conduit/pkg/processor"
3737
"github.com/google/uuid"
38+
"github.com/jpillora/backoff"
3839
"github.com/matryer/is"
3940
"github.com/rs/zerolog"
4041
"go.uber.org/mock/gomock"
@@ -50,8 +51,9 @@ func TestServiceLifecycle_buildNodes(t *testing.T) {
5051
logger := log.New(zerolog.Nop())
5152
db := &inmemory.DB{}
5253
persister := connector.NewPersister(logger, db, time.Second, 3)
54+
b := &backoff.Backoff{}
5355

54-
ps := NewService(logger, db)
56+
ps := NewService(logger, db, b)
5557

5658
source := dummySource(persister)
5759
destination := dummyDestination(persister)
@@ -133,8 +135,9 @@ func TestService_buildNodes_NoSourceNode(t *testing.T) {
133135
logger := log.New(zerolog.Nop())
134136
db := &inmemory.DB{}
135137
persister := connector.NewPersister(logger, db, time.Second, 3)
138+
b := &backoff.Backoff{}
136139

137-
ps := NewService(logger, db)
140+
ps := NewService(logger, db, b)
138141

139142
wantErr := "can't build pipeline without any source connectors"
140143

@@ -180,8 +183,9 @@ func TestService_buildNodes_NoDestinationNode(t *testing.T) {
180183
logger := log.New(zerolog.Nop())
181184
db := &inmemory.DB{}
182185
persister := connector.NewPersister(logger, db, time.Second, 3)
186+
b := &backoff.Backoff{}
183187

184-
ps := NewService(logger, db)
188+
ps := NewService(logger, db, b)
185189

186190
wantErr := "can't build pipeline without any destination connectors"
187191

@@ -228,8 +232,9 @@ func TestServiceLifecycle_PipelineSuccess(t *testing.T) {
228232
db := &inmemory.DB{}
229233
persister := connector.NewPersister(logger, db, time.Second, 3)
230234
defer persister.Wait()
235+
b := &backoff.Backoff{}
231236

232-
ps := NewService(logger, db)
237+
ps := NewService(logger, db, b)
233238

234239
// create a host pipeline
235240
pl, err := ps.Create(ctx, uuid.NewString(), Config{Name: "test pipeline"}, ProvisionTypeAPI)
@@ -287,8 +292,9 @@ func TestServiceLifecycle_PipelineError(t *testing.T) {
287292
logger := log.Test(t)
288293
db := &inmemory.DB{}
289294
persister := connector.NewPersister(logger, db, time.Second, 3)
295+
b := &backoff.Backoff{}
290296

291-
ps := NewService(logger, db)
297+
ps := NewService(logger, db, b)
292298

293299
// create a host pipeline
294300
pl, err := ps.Create(ctx, uuid.NewString(), Config{Name: "test pipeline"}, ProvisionTypeAPI)
@@ -371,8 +377,9 @@ func TestServiceLifecycle_StopAll_Recovering(t *testing.T) {
371377
logger := log.New(zerolog.Nop())
372378
db := &inmemory.DB{}
373379
persister := connector.NewPersister(logger, db, time.Second, 3)
380+
b := &backoff.Backoff{}
374381

375-
ps := NewService(logger, db)
382+
ps := NewService(logger, db, b)
376383

377384
// create a host pipeline
378385
pl, err := ps.Create(ctx, uuid.NewString(), Config{Name: "test pipeline"}, ProvisionTypeAPI)
@@ -472,8 +479,9 @@ func TestServiceLifecycle_PipelineStop(t *testing.T) {
472479
logger := log.New(zerolog.Nop())
473480
db := &inmemory.DB{}
474481
persister := connector.NewPersister(logger, db, time.Second, 3)
482+
b := &backoff.Backoff{}
475483

476-
ps := NewService(logger, db)
484+
ps := NewService(logger, db, b)
477485

478486
// create a host pipeline
479487
pl, err := ps.Create(ctx, uuid.NewString(), Config{Name: "test pipeline"}, ProvisionTypeAPI)
@@ -533,8 +541,9 @@ func TestService_Run_Rerun(t *testing.T) {
533541
logger := log.Test(t)
534542
db := &inmemory.DB{}
535543
persister := connector.NewPersister(logger, db, time.Second, 3)
544+
b := &backoff.Backoff{}
536545

537-
ps := NewService(logger, db)
546+
ps := NewService(logger, db, b)
538547

539548
// create a host pipeline
540549
pl, err := ps.Create(ctx, uuid.NewString(), Config{Name: "test pipeline"}, ProvisionTypeAPI)
@@ -571,7 +580,7 @@ func TestService_Run_Rerun(t *testing.T) {
571580
is.NoErr(err)
572581

573582
// create a new pipeline service and initialize it
574-
ps = NewService(logger, db)
583+
ps = NewService(logger, db, b)
575584
err = ps.Init(ctx)
576585
is.NoErr(err)
577586
err = ps.Run(

pkg/pipeline/service_test.go

+30-15
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/conduitio/conduit/pkg/foundation/cerrors"
2626
"github.com/conduitio/conduit/pkg/foundation/log"
2727
"github.com/google/uuid"
28+
"github.com/jpillora/backoff"
2829
"github.com/matryer/is"
2930
"go.uber.org/mock/gomock"
3031
)
@@ -34,15 +35,16 @@ func TestService_Init_Simple(t *testing.T) {
3435
ctx := context.Background()
3536
logger := log.Nop()
3637
db := &inmemory.DB{}
38+
b := &backoff.Backoff{}
3739

38-
service := NewService(logger, db)
40+
service := NewService(logger, db, b)
3941
_, err := service.Create(ctx, uuid.NewString(), Config{Name: "test-pipeline"}, ProvisionTypeAPI)
4042
is.NoErr(err)
4143

4244
want := service.List(ctx)
4345

4446
// create a new pipeline service and initialize it
45-
service = NewService(logger, db)
47+
service = NewService(logger, db, b)
4648
err = service.Init(ctx)
4749
is.NoErr(err)
4850

@@ -61,6 +63,7 @@ func TestService_Check(t *testing.T) {
6163
ctx := context.Background()
6264
logger := log.Nop()
6365
db := mock.NewDB(gomock.NewController(t))
66+
b := &backoff.Backoff{}
6467

6568
testCases := []struct {
6669
name string
@@ -80,7 +83,7 @@ func TestService_Check(t *testing.T) {
8083
t.Run(tc.name, func(t *testing.T) {
8184
is := is.New(t)
8285
db.EXPECT().Ping(gomock.Any()).Return(tc.wantErr)
83-
service := NewService(logger, db)
86+
service := NewService(logger, db, b)
8487

8588
gotErr := service.Check(ctx)
8689
is.Equal(tc.wantErr, gotErr)
@@ -92,8 +95,9 @@ func TestService_CreateSuccess(t *testing.T) {
9295
ctx := context.Background()
9396
logger := log.Nop()
9497
db := &inmemory.DB{}
98+
b := &backoff.Backoff{}
9599

96-
service := NewService(logger, db)
100+
service := NewService(logger, db, b)
97101

98102
testCases := []struct {
99103
id string
@@ -151,8 +155,9 @@ func TestService_Create_ValidateSuccess(t *testing.T) {
151155
ctx := context.Background()
152156
logger := log.Nop()
153157
db := &inmemory.DB{}
158+
b := &backoff.Backoff{}
154159

155-
service := NewService(logger, db)
160+
service := NewService(logger, db, b)
156161

157162
testCases := []struct {
158163
name string
@@ -193,8 +198,9 @@ func TestService_Create_ValidateError(t *testing.T) {
193198
ctx := context.Background()
194199
logger := log.Nop()
195200
db := &inmemory.DB{}
201+
b := &backoff.Backoff{}
196202

197-
service := NewService(logger, db)
203+
service := NewService(logger, db, b)
198204

199205
testCases := []struct {
200206
name string
@@ -262,8 +268,9 @@ func TestService_Create_PipelineNameExists(t *testing.T) {
262268
ctx := context.Background()
263269
logger := log.Nop()
264270
db := &inmemory.DB{}
271+
b := &backoff.Backoff{}
265272

266-
service := NewService(logger, db)
273+
service := NewService(logger, db, b)
267274

268275
conf := Config{Name: "test-pipeline"}
269276
got, err := service.Create(ctx, uuid.NewString(), conf, ProvisionTypeAPI)
@@ -279,8 +286,9 @@ func TestService_CreateEmptyName(t *testing.T) {
279286
ctx := context.Background()
280287
logger := log.Nop()
281288
db := &inmemory.DB{}
289+
b := &backoff.Backoff{}
282290

283-
service := NewService(logger, db)
291+
service := NewService(logger, db, b)
284292
got, err := service.Create(ctx, uuid.NewString(), Config{Name: ""}, ProvisionTypeAPI)
285293
is.True(err != nil)
286294
is.Equal(got, nil)
@@ -291,8 +299,9 @@ func TestService_GetSuccess(t *testing.T) {
291299
ctx := context.Background()
292300
logger := log.Nop()
293301
db := &inmemory.DB{}
302+
b := &backoff.Backoff{}
294303

295-
service := NewService(logger, db)
304+
service := NewService(logger, db, b)
296305
want, err := service.Create(ctx, uuid.NewString(), Config{Name: "test-pipeline"}, ProvisionTypeAPI)
297306
is.NoErr(err)
298307

@@ -306,8 +315,9 @@ func TestService_GetInstanceNotFound(t *testing.T) {
306315
ctx := context.Background()
307316
logger := log.Nop()
308317
db := &inmemory.DB{}
318+
b := &backoff.Backoff{}
309319

310-
service := NewService(logger, db)
320+
service := NewService(logger, db, b)
311321

312322
// get pipeline instance that does not exist
313323
got, err := service.Get(ctx, uuid.NewString())
@@ -321,8 +331,9 @@ func TestService_DeleteSuccess(t *testing.T) {
321331
ctx := context.Background()
322332
logger := log.Nop()
323333
db := &inmemory.DB{}
334+
b := &backoff.Backoff{}
324335

325-
service := NewService(logger, db)
336+
service := NewService(logger, db, b)
326337
instance, err := service.Create(ctx, uuid.NewString(), Config{Name: "test-pipeline"}, ProvisionTypeAPI)
327338
is.NoErr(err)
328339

@@ -339,8 +350,9 @@ func TestService_List(t *testing.T) {
339350
ctx := context.Background()
340351
logger := log.Nop()
341352
db := &inmemory.DB{}
353+
b := &backoff.Backoff{}
342354

343-
service := NewService(logger, db)
355+
service := NewService(logger, db, b)
344356

345357
want := make(map[string]*Instance)
346358
for i := 0; i < 10; i++ {
@@ -358,8 +370,9 @@ func TestService_UpdateSuccess(t *testing.T) {
358370
ctx := context.Background()
359371
logger := log.Nop()
360372
db := &inmemory.DB{}
373+
b := &backoff.Backoff{}
361374

362-
service := NewService(logger, db)
375+
service := NewService(logger, db, b)
363376
instance, err := service.Create(ctx, uuid.NewString(), Config{Name: "test-pipeline"}, ProvisionTypeAPI)
364377
is.NoErr(err)
365378

@@ -378,8 +391,9 @@ func TestService_Update_PipelineNameExists(t *testing.T) {
378391
ctx := context.Background()
379392
logger := log.Nop()
380393
db := &inmemory.DB{}
394+
b := &backoff.Backoff{}
381395

382-
service := NewService(logger, db)
396+
service := NewService(logger, db, b)
383397
_, err := service.Create(ctx, uuid.NewString(), Config{Name: "test-pipeline"}, ProvisionTypeAPI)
384398
is.NoErr(err)
385399
instance2, err2 := service.Create(ctx, uuid.NewString(), Config{Name: "test-pipeline2"}, ProvisionTypeAPI)
@@ -400,8 +414,9 @@ func TestService_UpdateInvalidConfig(t *testing.T) {
400414
ctx := context.Background()
401415
logger := log.Nop()
402416
db := &inmemory.DB{}
417+
b := &backoff.Backoff{}
403418

404-
service := NewService(logger, db)
419+
service := NewService(logger, db, b)
405420
instance, err := service.Create(ctx, uuid.NewString(), Config{Name: "test-pipeline"}, ProvisionTypeAPI)
406421
is.NoErr(err)
407422

pkg/provisioning/service_test.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
p4 "github.com/conduitio/conduit/pkg/provisioning/test/pipelines4-integration-test"
4141
"github.com/google/go-cmp/cmp"
4242
"github.com/google/go-cmp/cmp/cmpopts"
43+
"github.com/jpillora/backoff"
4344
"github.com/matryer/is"
4445
"github.com/rs/zerolog"
4546
"go.uber.org/mock/gomock"
@@ -513,8 +514,13 @@ func TestService_IntegrationTestServices(t *testing.T) {
513514
proc_builtin.NewRegistry(logger, proc_builtin.DefaultBuiltinProcessors, schemaRegistry),
514515
nil,
515516
)
517+
b := &backoff.Backoff{
518+
Factor: 2,
519+
Min: time.Millisecond * 100,
520+
Max: time.Second, // 8 tries
521+
}
516522

517-
plService := pipeline.NewService(logger, db)
523+
plService := pipeline.NewService(logger, db, b)
518524
connService := connector.NewService(logger, db, connector.NewPersister(logger, db, time.Second, 3))
519525
procService := processor.NewService(logger, db, procPluginService)
520526

0 commit comments

Comments
 (0)