diff --git a/internal/topo/node/window_op.go b/internal/topo/node/window_op.go index 491e57781d..9381c22d00 100644 --- a/internal/topo/node/window_op.go +++ b/internal/topo/node/window_op.go @@ -345,7 +345,7 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*x if o.isMatchCondition(ctx, d) { if o.window.Delay > 0 { go func(ts time.Time) { - after := time.After(o.window.Delay) + after := timex.After(o.window.Delay) select { case <-after: delayCh <- ts @@ -358,7 +358,7 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*x } } else { // clear inputs if condition not matched - inputs = o.gcInputs(inputs, d.Timestamp, ctx) + inputs = o.gcInputs(inputs, d.Timestamp.Add(o.window.Delay), ctx) } case ast.SESSION_WINDOW: if timeoutTicker != nil { @@ -602,12 +602,14 @@ func (o *WindowOperator) handleInputs(ctx api.StreamContext, inputs []*xsql.Tupl if nextleft < 0 { return inputs[:0], inputs, content } + ctx.GetLogger().Debugf("discard before %d", nextleft) return inputs[nextleft:], inputs[:nextleft], content } func (o *WindowOperator) gcInputs(inputs []*xsql.Tuple, triggerTime time.Time, ctx api.StreamContext) []*xsql.Tuple { var discard []*xsql.Tuple inputs, discard, _ = o.handleInputs(ctx, inputs, triggerTime) + ctx.GetLogger().Debugf("after scan %v", inputs) o.handleTraceDiscardTuple(ctx, discard) return inputs } diff --git a/internal/topo/topotest/window_rule_test.go b/internal/topo/topotest/window_rule_test.go index 4c7631ae7c..0750feb07b 100644 --- a/internal/topo/topotest/window_rule_test.go +++ b/internal/topo/topotest/window_rule_test.go @@ -675,6 +675,31 @@ func TestWindow(t *testing.T) { "source_demo_0_records_out_total": int64(5), }, }, + { + Name: `TestSlidingDelay`, + Sql: `SELECT size,color FROM demo GROUP BY SlidingWindow(ss, 5, 1) Over (when size = 2)`, + R: [][]map[string]interface{}{ + { + { + "size": 3, + "color": "red", + }, + { + "size": 6, + "color": "blue", + }, + { + "size": 2, + "color": "blue", + }, + { + "size": 4, + "color": "yellow", + }, + }, + }, + M: map[string]interface{}{}, + }, } HandleStream(true, streamList, t) options := []*def.RuleOption{ diff --git a/pkg/timex/time.go b/pkg/timex/time.go index e7ea6986a5..b37e6a73a2 100644 --- a/pkg/timex/time.go +++ b/pkg/timex/time.go @@ -51,6 +51,10 @@ func GetTicker(duration time.Duration) *clock.Ticker { return Clock.Ticker(duration) } +func After(duration time.Duration) <-chan time.Time { + return Clock.After(duration) +} + func GetTimer(duration time.Duration) *clock.Timer { return Clock.Timer(duration) }