diff --git a/drr.go b/drr.go index 51c9ab5..df7ef67 100644 --- a/drr.go +++ b/drr.go @@ -20,16 +20,16 @@ var ( ErrContextIsNil = errors.New("ContextIsNil") ) -type flow struct { - c <-chan interface{} +type flow[T any] struct { + c <-chan T prio int } // DRR is a Deficit Round Robin scheduler, as detailed in // https://en.wikipedia.org/wiki/Deficit_round_robin. -type DRR struct { - flows []flow - outChan chan interface{} +type DRR[T any] struct { + flows []flow[T] + outChan chan T flowsToDelete []int } @@ -37,11 +37,11 @@ type DRR struct { // // The outChan must be non-nil, otherwise NewDRR returns // ErrChannelIsNil error. -func NewDRR(outChan chan interface{}) (*DRR, error) { +func NewDRR[T any](outChan chan T) (*DRR[T], error) { if outChan == nil { return nil, ErrChannelIsNil } - return &DRR{ + return &DRR[T]{ outChan: outChan, }, nil } @@ -52,26 +52,26 @@ func NewDRR(outChan chan interface{}) (*DRR, error) { // Input returns ErrChannelIsNil if input channel is nil. // Priority must be greater than 0, otherwise Input returns // ErrInvalidPriorityValue error. -func (d *DRR) Input(prio int, in <-chan interface{}) error { +func (d *DRR[T]) Input(prio int, in <-chan T) error { if prio <= 0 { return ErrInvalidPriorityValue } if in == nil { return ErrChannelIsNil } - d.flows = append(d.flows, flow{c: in, prio: prio}) + d.flows = append(d.flows, flow[T]{c: in, prio: prio}) return nil } // Start actually spawns the DRR goroutine. Once Start is called, -// goroutine starts forwarding from input channels previously registered +// the goroutine starts forwarding from input channels previously registered // through Input method to output channel. // // Start returns ContextIsNil error if ctx is nil. // // DRR goroutine exits when context.Context expires or when all the input // channels are closed. DRR goroutine closes the output channel upon termination. -func (d *DRR) Start(ctx context.Context) error { +func (d *DRR[T]) Start(ctx context.Context) error { if ctx == nil { return ErrContextIsNil } @@ -79,7 +79,7 @@ func (d *DRR) Start(ctx context.Context) error { defer close(d.outChan) for { // Wait for at least one channel to be ready - readyIndex, value, ok := getReadyChannel( + readyIndex, value, ok := d.getReadyChannel( ctx, d.flows) if readyIndex < 0 { @@ -136,13 +136,13 @@ func (d *DRR) Start(ctx context.Context) error { return nil } -func (d *DRR) prepareToUnregister(index int) { +func (d *DRR[T]) prepareToUnregister(index int) { d.flowsToDelete = append(d.flowsToDelete, index) } -func (d *DRR) unregisterFlows() bool { +func (d *DRR[T]) unregisterFlows() bool { oldFlows := d.flows - d.flows = make([]flow, 0, len(oldFlows)-len(d.flowsToDelete)) + d.flows = make([]flow[T], 0, len(oldFlows)-len(d.flowsToDelete)) oldFlowsLoop: for i, flow := range oldFlows { for _, index := range d.flowsToDelete { @@ -156,7 +156,7 @@ oldFlowsLoop: return len(d.flows) == 0 } -func getReadyChannel(ctx context.Context, flows []flow) (int, interface{}, bool) { +func (d *DRR[T]) getReadyChannel(ctx context.Context, flows []flow[T]) (int, T, bool) { cases := make([]reflect.SelectCase, 0, len(flows)+1) //First case is the termiantion channel for context cancellation c := reflect.SelectCase{ @@ -176,8 +176,9 @@ func getReadyChannel(ctx context.Context, flows []flow) (int, interface{}, bool) index, value, ok := reflect.Select(cases) //Termination channel if index == 0 { - return -1, nil, false + var zeroT T + return -1, zeroT, false } //Rescaling index (-1) because of additional termination channel - return index - 1, value.Interface(), ok + return index - 1, value.Interface().(T), ok } diff --git a/drr_test.go b/drr_test.go index 7cf2619..53b4122 100644 --- a/drr_test.go +++ b/drr_test.go @@ -14,16 +14,16 @@ const ( chanSize = 10 ) -func generator(prefix string, n int) chan interface{} { - payload := make([]interface{}, n) +func generator(prefix string, n int) chan string { + payload := make([]string, n) for i := 0; i < n; i++ { payload[i] = fmt.Sprintf("%s: %d", prefix, i) } return generatorWithPayload(payload) } -func generatorWithPayload(payload []interface{}) chan interface{} { - out := make(chan interface{}, chanSize) +func generatorWithPayload(payload []string) chan string { + out := make(chan string, chanSize) go func() { for _, msg := range payload { out <- msg @@ -33,25 +33,30 @@ func generatorWithPayload(payload []interface{}) chan interface{} { return out } -func TestDRR(t *testing.T) { - var drr *DRR - outChan := make(chan interface{}, 10) - Convey("Create DRR", t, func() { - drr, _ = NewDRR(outChan) +func TestNewDRR(t *testing.T) { + Convey("Create new DRR", t, func() { + outChan := make(chan string, 10) + drr, err := NewDRR(outChan) So(drr, ShouldNotEqual, nil) + So(err, ShouldEqual, nil) }) +} + +func TestDRR(t *testing.T) { + outChan := make(chan string, 10) + drr, _ := NewDRR(outChan) + Convey("Register flow", t, func() { flow1 := generator("flow1", 5) flow2 := generator("flow2", 5) drr.Input(2, flow1) drr.Input(1, flow2) }) + Convey("Check output", t, func() { drr.Start(context.TODO()) for out := range outChan { - s, ok := out.(string) - So(ok, ShouldEqual, true) - So(s, ShouldNotEqual, "") + So(out, ShouldNotEqual, "") } }) } @@ -59,17 +64,14 @@ func TestDRR(t *testing.T) { func TestIntegrityAndOrder(t *testing.T) { nFlows := 100 flowSize := 100 - var drr *DRR - outChan := make(chan interface{}, 10) - Convey("Create DRR", t, func() { - drr, _ = NewDRR(outChan) - So(drr, ShouldNotEqual, nil) - }) - var flows []chan interface{} - payloads := make(map[int][]interface{}) + outChan := make(chan string, 10) + drr, _ := NewDRR(outChan) + + var flows []chan string + payloads := make(map[int][]string) Convey("Prepare flow with known payload", t, func() { for flowID := 0; flowID < nFlows; flowID++ { - payload := make([]interface{}, 0, flowSize) + payload := make([]string, 0, flowSize) for x := 0; x < flowSize; x++ { msg := fmt.Sprintf("%d:%d", flowID, x) payload = append(payload, msg) @@ -78,20 +80,20 @@ func TestIntegrityAndOrder(t *testing.T) { flows = append(flows, generatorWithPayload(payload)) } }) + Convey("Register all flows", t, func() { for prio, f := range flows { drr.Input(prio+1, f) } }) + Convey("Check output w.r.t. known payloads", t, func() { drr.Start(context.TODO()) - outputPayloads := make(map[int][]interface{}) + outputPayloads := make(map[int][]string) for out := range outChan { - s, ok := out.(string) - So(ok, ShouldEqual, true) - So(s, ShouldNotEqual, "") - flowID := getFlowID(s) - outputPayloads[flowID] = append(outputPayloads[flowID], s) + So(out, ShouldNotEqual, "") + flowID := getFlowID(out) + outputPayloads[flowID] = append(outputPayloads[flowID], out) } So(len(outputPayloads), ShouldEqual, len(payloads)) @@ -119,12 +121,12 @@ func getFlowID(s string) int { func TestMeasureOutputRate(t *testing.T) { nFlows := 100 flowSize := 10000 - outChan := make(chan interface{}, flowSize) + outChan := make(chan int, flowSize) drr, _ := NewDRR(outChan) - var flows []chan interface{} + var flows []chan int Convey("Prepare flow with known payload", t, func() { for flowID := 0; flowID < nFlows; flowID++ { - inChan := make(chan interface{}, flowSize) + inChan := make(chan int, flowSize) for x := 0; x < flowSize; x++ { inChan <- flowID } @@ -148,8 +150,7 @@ func TestMeasureOutputRate(t *testing.T) { drr.Start(context.TODO()) hist := make(map[int]int) for i := 0; i < flowSize; i++ { - val := <-outChan - flowID := val.(int) + flowID := <-outChan hist[flowID]++ } @@ -162,19 +163,19 @@ func TestMeasureOutputRate(t *testing.T) { func TestErrorInput(t *testing.T) { Convey("Create DRR by passing nil output chan", t, func() { - drr, err := NewDRR(nil) + drr, err := NewDRR[int](nil) So(drr, ShouldEqual, nil) So(err, ShouldEqual, ErrChannelIsNil) }) Convey("Create DRR and pass wrong values in Input API", t, func() { - drr, _ := NewDRR(make(chan interface{})) - err := drr.Input(0, make(chan interface{})) + drr, _ := NewDRR(make(chan string)) + err := drr.Input(0, make(chan string)) So(err, ShouldEqual, ErrInvalidPriorityValue) err = drr.Input(1, nil) So(err, ShouldEqual, ErrChannelIsNil) }) Convey("Create DRR and pass wrong values in Input API", t, func() { - drr, _ := NewDRR(make(chan interface{})) + drr, _ := NewDRR(make(chan string)) err := drr.Start(nil) So(err, ShouldEqual, ErrContextIsNil) }) @@ -182,19 +183,19 @@ func TestErrorInput(t *testing.T) { func TestContextExipre(t *testing.T) { Convey("Create an empty DRR, start it and cancel the context", t, func() { - outChan := make(chan interface{}) + outChan := make(chan string) drr, _ := NewDRR(outChan) ctx, cancel := context.WithCancel(context.Background()) err := drr.Start(ctx) So(err, ShouldEqual, nil) cancel() val, ok := <-outChan - So(val, ShouldEqual, nil) + So(val, ShouldEqual, "") So(ok, ShouldEqual, false) }) Convey("Create DRR with one flow, start it and cancel the context", t, func() { - outChan := make(chan interface{}) + outChan := make(chan string) drr, _ := NewDRR(outChan) flow := generator("flow", 5) drr.Input(10, flow) @@ -202,21 +203,21 @@ func TestContextExipre(t *testing.T) { err := drr.Start(ctx) So(err, ShouldEqual, nil) val, ok := <-outChan - So(val, ShouldNotEqual, nil) + So(val, ShouldNotEqual, "") So(ok, ShouldEqual, true) cancel() val, ok = <-outChan - So(val, ShouldNotEqual, nil) + So(val, ShouldNotEqual, "") So(ok, ShouldEqual, true) val, ok = <-outChan - So(val, ShouldEqual, nil) + So(val, ShouldEqual, "") So(ok, ShouldEqual, false) }) } func BenchmarkOverheadUnloaded(b *testing.B) { - outChan := make(chan interface{}) - inChan := make(chan interface{}) + outChan := make(chan int) + inChan := make(chan int) drr, _ := NewDRR(outChan) drr.Input(10, inChan) drr.Start(context.TODO()) @@ -229,12 +230,12 @@ func BenchmarkOverheadUnloaded(b *testing.B) { func ExampleDRR() { chanSize := 5 - outChan := make(chan interface{}, chanSize) + outChan := make(chan string, chanSize) // Create new DRR drr, _ := NewDRR(outChan) // First input channel with priority = 3 - inChan1 := make(chan interface{}, chanSize) + inChan1 := make(chan string, chanSize) prio1 := 3 // Prepare known workload for i := 0; i < chanSize; i++ { @@ -244,7 +245,7 @@ func ExampleDRR() { drr.Input(prio1, inChan1) // Second input channel with priority = 2 - inChan2 := make(chan interface{}, chanSize) + inChan2 := make(chan string, chanSize) prio2 := 2 // Prepare known workload for i := 0; i < chanSize; i++ { @@ -261,8 +262,7 @@ func ExampleDRR() { // with priority 3 and 2/5 should come from second // channel with priority 2. for i := 0; i < chanSize; i++ { - val := <-outChan - str := val.(string) + str := <-outChan fmt.Println(str) } diff --git a/go.mod b/go.mod index 28d1bae..d13b61d 100644 --- a/go.mod +++ b/go.mod @@ -1,9 +1,11 @@ module github.com/bigmikes/drr -go 1.13 +go 1.19 + +require github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337 require ( - github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337 - golang.org/x/net v0.0.0-20191101175033-0deb6923b6d9 // indirect - golang.org/x/tools v0.0.0-20191101200257-8dbcdeb83d3f // indirect + github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 // indirect + github.com/jtolds/gls v4.20.0+incompatible // indirect + github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d // indirect ) diff --git a/go.sum b/go.sum index f68bf3b..6a9ccb1 100644 --- a/go.sum +++ b/go.sum @@ -7,16 +7,7 @@ github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1 github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337 h1:WN9BUFbdyOsSH/XohnWpXOlq9NBD5sGAB2FciQMUEe8= github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20191101175033-0deb6923b6d9 h1:DPz9iiH3YoKiKhX/ijjoZvT0VFwK2c6CWYWQ7Zyr8TU= -golang.org/x/net v0.0.0-20191101175033-0deb6923b6d9/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/tools v0.0.0-20190328211700-ab21143f2384 h1:TFlARGu6Czu1z7q93HTxcP1P+/ZFC/IKythI5RzrnRg= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= -golang.org/x/tools v0.0.0-20191101200257-8dbcdeb83d3f h1:+QO45yvqhfD79HVNFPAgvstYLFye8zA+rd0mHFsGV9s= -golang.org/x/tools v0.0.0-20191101200257-8dbcdeb83d3f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=