Skip to content

Commit 9f297cd

Browse files
committed
wip
1 parent 7d86818 commit 9f297cd

32 files changed

+82
-129
lines changed

.golangci.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ linters:
6262
# inverted configuration with `enable-all` and `disable` is not scalable during updates of golangci-lint
6363
disable-all: true
6464
enable:
65-
# We plan to enable all of the linters which are commented out.
65+
# We plan to enable all the linters which are commented out.
6666
# However, we want to enable them one by one (so we don't have to fix many issues at once).
6767
- bodyclose
6868
- depguard

pkg/pipeline/instance.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ type DLQ struct {
8383
Plugin string
8484
Settings map[string]string
8585

86-
WindowSize int
87-
WindowNackThreshold int
86+
WindowSize uint64
87+
WindowNackThreshold uint64
8888
}
8989

9090
var DefaultDLQ = DLQ{

pkg/pipeline/lifecycle.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -324,9 +324,9 @@ func (s *Service) buildParallelProcessorNode(
324324
) *stream.ParallelNode {
325325
return &stream.ParallelNode{
326326
Name: proc.ID + "-parallel",
327-
NewNode: func(i int) stream.PubSubNode {
327+
NewNode: func(i uint64) stream.PubSubNode {
328328
n := s.buildProcessorNode(pl, proc)
329-
n.Name = n.Name + "-" + strconv.Itoa(i) // add suffix to name
329+
n.Name = n.Name + "-" + strconv.FormatUint(i, 10) // add suffix to name
330330
return n
331331
},
332332
Workers: proc.Config.Workers,

pkg/pipeline/service.go

-6
Original file line numberDiff line numberDiff line change
@@ -187,12 +187,6 @@ func (s *Service) UpdateDLQ(ctx context.Context, pipelineID string, cfg DLQ) (*I
187187
if cfg.Plugin == "" {
188188
return nil, cerrors.New("DLQ plugin must be provided")
189189
}
190-
if cfg.WindowSize < 0 {
191-
return nil, cerrors.New("DLQ window size must be non-negative")
192-
}
193-
if cfg.WindowNackThreshold < 0 {
194-
return nil, cerrors.New("DLQ window nack threshold must be non-negative")
195-
}
196190
if cfg.WindowSize > 0 && cfg.WindowSize <= cfg.WindowNackThreshold {
197191
return nil, cerrors.New("DLQ window nack threshold must be lower than window size")
198192
}

pkg/pipeline/stream/dlq.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ type DLQHandlerNode struct {
4242
Name string
4343
Handler DLQHandler
4444

45-
WindowSize int
46-
WindowNackThreshold int
45+
WindowSize uint64
46+
WindowNackThreshold uint64
4747

4848
Timer metrics.Timer
4949
Histogram metrics.RecordBytesHistogram
@@ -212,13 +212,13 @@ type dlqWindow struct {
212212
// nackThreshold represents the number of tolerated nacks, if the threshold
213213
// is exceeded the window is frozen and returns an error for all further
214214
// nacks.
215-
nackThreshold int
215+
nackThreshold uint64
216216

217-
ackCount int
218-
nackCount int
217+
ackCount uint64
218+
nackCount uint64
219219
}
220220

221-
func newDLQWindow(size, threshold int) *dlqWindow {
221+
func newDLQWindow(size, threshold uint64) *dlqWindow {
222222
if size > 0 && threshold == 0 {
223223
// optimization - if threshold is 0 the window size does not matter,
224224
// setting it to 1 ensures we don't use more memory than needed

pkg/pipeline/stream/dlq_test.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ func TestDLQHandlerNode_Nack_ForwardToDLQ_Success(t *testing.T) {
236236
is.NoErr(err)
237237
}()
238238

239-
for i := 0; i < n.WindowNackThreshold; i++ {
239+
for i := uint64(0); i < n.WindowNackThreshold; i++ {
240240
msg := &Message{
241241
Ctx: ctx,
242242
Record: opencdc.Record{Position: []byte(uuid.NewString())},
@@ -326,8 +326,8 @@ func TestDLQWindow_WindowDisabled(t *testing.T) {
326326

327327
func TestDLQWindow_NackThresholdExceeded(t *testing.T) {
328328
testCases := []struct {
329-
windowSize int
330-
nackThreshold int
329+
windowSize uint64
330+
nackThreshold uint64
331331
}{
332332
{1, 0},
333333
{2, 0},
@@ -344,19 +344,19 @@ func TestDLQWindow_NackThresholdExceeded(t *testing.T) {
344344
w := newDLQWindow(tc.windowSize, tc.nackThreshold)
345345

346346
// fill up window with nacks up to the threshold
347-
for i := 0; i < tc.nackThreshold; i++ {
347+
for i := uint64(0); i < tc.nackThreshold; i++ {
348348
ok := w.Nack()
349349
is.True(ok)
350350
}
351351

352352
// fill up window again with acks
353-
for i := 0; i < tc.windowSize; i++ {
353+
for i := uint64(0); i < tc.windowSize; i++ {
354354
w.Ack()
355355
}
356356

357357
// since window is full of acks we should be able to fill up
358358
// the window with nacks again
359-
for i := 0; i < tc.nackThreshold; i++ {
359+
for i := uint64(0); i < tc.nackThreshold; i++ {
360360
ok := w.Nack()
361361
is.True(ok)
362362
}
@@ -367,7 +367,7 @@ func TestDLQWindow_NackThresholdExceeded(t *testing.T) {
367367

368368
// adding acks after that should make no difference, all nacks
369369
// need to fail after the threshold is reached
370-
for i := 0; i < tc.windowSize; i++ {
370+
for i := uint64(0); i < tc.windowSize; i++ {
371371
w.Ack()
372372
}
373373
ok = w.Nack()

pkg/pipeline/stream/parallel.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ type ParallelNode struct {
3232
Name string
3333
// NewNode is the constructor of the wrapped PubSubNode, it should create
3434
// the i-th node (useful for distinguishing nodes in logs).
35-
NewNode func(i int) PubSubNode
36-
Workers int
35+
NewNode func(i uint64) PubSubNode
36+
Workers uint64
3737

3838
base pubSubNodeBase
3939
logger log.CtxLogger
@@ -73,7 +73,7 @@ func (n *ParallelNode) Run(ctx context.Context) error {
7373
// buffered, so it blocks when all workers are busy
7474
workerJobs := make(chan parallelNodeJob)
7575
var workerWg sync.WaitGroup
76-
for i := 0; i < n.Workers; i++ {
76+
for i := uint64(0); i < n.Workers; i++ {
7777
node := n.NewNode(i)
7878
worker := newParallelNodeWorker(node, workerJobs, n.logger)
7979
workerWg.Add(1)

pkg/pipeline/stream/parallel_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ func TestParallelNode_Success(t *testing.T) {
343343

344344
const workerCount = 10
345345

346-
newPubSubNode := func(i int) PubSubNode {
346+
newPubSubNode := func(i uint64) PubSubNode {
347347
return &parallelTestNode{
348348
Name: fmt.Sprintf("test-node-%d", i),
349349
F: func(ctx context.Context, sub <-chan *Message, pub chan<- *Message) error {
@@ -422,7 +422,7 @@ func TestParallelNode_ErrorAll(t *testing.T) {
422422

423423
const workerCount = 10
424424

425-
newPubSubNode := func(i int) PubSubNode {
425+
newPubSubNode := func(i uint64) PubSubNode {
426426
name := fmt.Sprintf("test-node-%d", i)
427427
return &parallelTestNode{
428428
Name: name,
@@ -483,7 +483,7 @@ func TestParallelNode_ErrorSingle(t *testing.T) {
483483

484484
const workerCount = 10
485485

486-
newPubSubNode := func(i int) PubSubNode {
486+
newPubSubNode := func(i uint64) PubSubNode {
487487
name := fmt.Sprintf("test-node-%d", i)
488488
return &parallelTestNode{
489489
Name: name,
@@ -585,7 +585,7 @@ func TestParallelNode_Processor(t *testing.T) {
585585
Teardown(gomock.Any()).
586586
Times(workerCount)
587587

588-
newProcNode := func(i int) PubSubNode {
588+
newProcNode := func(i uint64) PubSubNode {
589589
return &ProcessorNode{
590590
Name: fmt.Sprintf("test-%d", i),
591591
Processor: proc,

pkg/plugin/processor/standalone/host_module.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ func (*hostModuleInstance) Close(context.Context) error { return nil }
106106
// message. If the buffer is too small, it returns the size of the command
107107
// request message and parks the command request. The next call to this function
108108
// will return the same command request.
109-
func (m *hostModuleInstance) commandRequest(ctx context.Context, buf types.Bytes) types.Uint32 {
109+
func (m *hostModuleInstance) commandRequest(ctx context.Context, buf types.Bytes) types.Uint64 {
110110
m.logger.Trace(ctx).Msg("executing command_request")
111111

112112
if m.parkedCommandRequest == nil {
@@ -126,7 +126,7 @@ func (m *hostModuleInstance) commandRequest(ctx context.Context, buf types.Bytes
126126
Int("command_bytes", size).
127127
Int("allocated_bytes", len(buf)).
128128
Msgf("insufficient memory, command will be parked until next call to command_request")
129-
return types.Uint32(size)
129+
return types.Uint64(size)
130130
}
131131

132132
// If the buffer is large enough, we marshal the command into the buffer and
@@ -140,7 +140,7 @@ func (m *hostModuleInstance) commandRequest(ctx context.Context, buf types.Bytes
140140
m.parkedCommandRequest = nil
141141

142142
m.logger.Trace(ctx).Msg("returning next command")
143-
return types.Uint32(len(out))
143+
return types.Uint64(len(out))
144144
}
145145

146146
// commandResponse is the exported function that is called by the WASM module to

pkg/processor/instance.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -94,5 +94,5 @@ type Parent struct {
9494
// Config holds configuration data for building a processor.
9595
type Config struct {
9696
Settings map[string]string
97-
Workers int
97+
Workers uint64
9898
}

pkg/processor/service.go

-3
Original file line numberDiff line numberDiff line change
@@ -119,9 +119,6 @@ func (s *Service) Create(
119119
pt ProvisionType,
120120
cond string,
121121
) (*Instance, error) {
122-
if cfg.Workers < 0 {
123-
return nil, cerrors.New("processor workers can't be negative")
124-
}
125122
if cfg.Workers == 0 {
126123
cfg.Workers = 1
127124
}

pkg/processor/service_test.go

-21
Original file line numberDiff line numberDiff line change
@@ -197,27 +197,6 @@ func TestService_Create_BuilderFail(t *testing.T) {
197197
is.Equal(i, nil)
198198
}
199199

200-
func TestService_Create_WorkersNegative(t *testing.T) {
201-
is := is.New(t)
202-
ctx := context.Background()
203-
db := &inmemory.DB{}
204-
205-
service := NewService(log.Nop(), db, &proc_plugin.PluginService{})
206-
207-
got, err := service.Create(
208-
ctx,
209-
uuid.NewString(),
210-
"processor-type",
211-
Parent{},
212-
Config{Workers: -1},
213-
ProvisionTypeAPI,
214-
"{{true}}",
215-
)
216-
is.True(err != nil) // expected workers error
217-
is.Equal("processor workers can't be negative", err.Error())
218-
is.Equal(got, nil)
219-
}
220-
221200
func TestService_Delete_Success(t *testing.T) {
222201
is := is.New(t)
223202
ctx := context.Background()

pkg/provisioning/config/parser.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,15 @@ type Processor struct {
4242
ID string
4343
Plugin string
4444
Settings map[string]string
45-
Workers int
45+
Workers uint64
4646
Condition string
4747
}
4848

4949
type DLQ struct {
5050
Plugin string
5151
Settings map[string]string
52-
WindowSize *int
53-
WindowNackThreshold *int
52+
WindowSize *uint64
53+
WindowNackThreshold *uint64
5454
}
5555

5656
// Classify fields as immutable, mutable or ignored. This is used by the

pkg/provisioning/config/validate.go

-3
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,6 @@ func validateProcessors(mp []Processor) []error {
9797
if cfg.Plugin == "" {
9898
errs = append(errs, cerrors.Errorf("processor %q: \"plugin\" is mandatory: %w", cfg.ID, ErrMandatoryField))
9999
}
100-
if cfg.Workers < 0 {
101-
errs = append(errs, cerrors.Errorf("processor %q: \"workers\" can't be negative: %w", cfg.ID, ErrInvalidField))
102-
}
103100
if ids[cfg.ID] {
104101
errs = append(errs, cerrors.Errorf("processor %q: \"id\" must be unique: %w", cfg.ID, ErrDuplicateID))
105102
}

pkg/provisioning/config/validate_test.go

-16
Original file line numberDiff line numberDiff line change
@@ -209,22 +209,6 @@ func TestValidator_InvalidFields(t *testing.T) {
209209
}},
210210
},
211211
wantErr: ErrInvalidField,
212-
}, {
213-
name: "processor workers is negative",
214-
config: Pipeline{
215-
ID: "pipeline1",
216-
Status: "running",
217-
Name: "pipeline1",
218-
Description: "desc1",
219-
Processors: []Processor{{
220-
ID: "proc1",
221-
Plugin: "js",
222-
Settings: map[string]string{},
223-
// invalid field
224-
Workers: -1,
225-
}},
226-
},
227-
wantErr: ErrInvalidField,
228212
}}
229213
for _, tt := range tests {
230214
t.Run(tt.name, func(t *testing.T) {

pkg/provisioning/config/yaml/parser_test.go

+8-6
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ func TestParser_V1_Success(t *testing.T) {
3535
is := is.New(t)
3636
parser := NewParser(log.Nop())
3737
filepath := "./v1/testdata/pipelines1-success.yml"
38-
intPtr := func(i int) *int { return &i }
38+
39+
uint64Ptr := func(i uint64) *uint64 { return &i }
3940
want := Configurations{
4041
v1.Configuration{
4142
Version: "1.0",
@@ -78,8 +79,8 @@ func TestParser_V1_Success(t *testing.T) {
7879
Settings: map[string]string{
7980
"foo": "bar",
8081
},
81-
WindowSize: intPtr(4),
82-
WindowNackThreshold: intPtr(2),
82+
WindowSize: uint64Ptr(4),
83+
WindowNackThreshold: uint64Ptr(2),
8384
},
8485
},
8586
},
@@ -274,7 +275,8 @@ func TestParser_V2_Success(t *testing.T) {
274275
is := is.New(t)
275276
parser := NewParser(log.Nop())
276277
filepath := "./v2/testdata/pipelines1-success.yml"
277-
intPtr := func(i int) *int { return &i }
278+
279+
uint64Ptr := func(i uint64) *uint64 { return &i }
278280
want := Configurations{
279281
v2.Configuration{
280282
Version: "2.2",
@@ -321,8 +323,8 @@ func TestParser_V2_Success(t *testing.T) {
321323
Settings: map[string]string{
322324
"foo": "bar",
323325
},
324-
WindowSize: intPtr(4),
325-
WindowNackThreshold: intPtr(2),
326+
WindowSize: uint64Ptr(4),
327+
WindowNackThreshold: uint64Ptr(2),
326328
},
327329
},
328330
},

pkg/provisioning/config/yaml/v1/model.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -68,14 +68,14 @@ type Connector struct {
6868
type Processor struct {
6969
Type string `yaml:"type"`
7070
Settings map[string]string `yaml:"settings"`
71-
Workers int `yaml:"workers"`
71+
Workers uint64 `yaml:"workers"`
7272
}
7373

7474
type DLQ struct {
7575
Plugin string `yaml:"plugin"`
7676
Settings map[string]string `yaml:"settings"`
77-
WindowSize *int `yaml:"window-size"`
78-
WindowNackThreshold *int `yaml:"window-nack-threshold"`
77+
WindowSize *uint64
78+
WindowNackThreshold *uint64 `yaml:"window-nack-threshold"`
7979
}
8080

8181
func (c Configuration) ToConfig() []config.Pipeline {

0 commit comments

Comments
 (0)