Skip to content

Commit

Permalink
fix(window): fix sliding delay scan and test (#3425)
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <[email protected]>
  • Loading branch information
ngjaying authored Dec 5, 2024
1 parent 6a8e3f0 commit fd9ee57
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 2 deletions.
6 changes: 4 additions & 2 deletions internal/topo/node/window_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*x
if o.isMatchCondition(ctx, d) {
if o.window.Delay > 0 {
go func(ts time.Time) {
after := time.After(o.window.Delay)
after := timex.After(o.window.Delay)
select {
case <-after:
delayCh <- ts
Expand All @@ -358,7 +358,7 @@ func (o *WindowOperator) execProcessingWindow(ctx api.StreamContext, inputs []*x
}
} else {
// clear inputs if condition not matched
inputs = o.gcInputs(inputs, d.Timestamp, ctx)
inputs = o.gcInputs(inputs, d.Timestamp.Add(o.window.Delay), ctx)
}
case ast.SESSION_WINDOW:
if timeoutTicker != nil {
Expand Down Expand Up @@ -602,12 +602,14 @@ func (o *WindowOperator) handleInputs(ctx api.StreamContext, inputs []*xsql.Tupl
if nextleft < 0 {
return inputs[:0], inputs, content
}
ctx.GetLogger().Debugf("discard before %d", nextleft)
return inputs[nextleft:], inputs[:nextleft], content
}

func (o *WindowOperator) gcInputs(inputs []*xsql.Tuple, triggerTime time.Time, ctx api.StreamContext) []*xsql.Tuple {
var discard []*xsql.Tuple
inputs, discard, _ = o.handleInputs(ctx, inputs, triggerTime)
ctx.GetLogger().Debugf("after scan %v", inputs)
o.handleTraceDiscardTuple(ctx, discard)
return inputs
}
Expand Down
25 changes: 25 additions & 0 deletions internal/topo/topotest/window_rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,31 @@ func TestWindow(t *testing.T) {
"source_demo_0_records_out_total": int64(5),
},
},
{
Name: `TestSlidingDelay`,
Sql: `SELECT size,color FROM demo GROUP BY SlidingWindow(ss, 5, 1) Over (when size = 2)`,
R: [][]map[string]interface{}{
{
{
"size": 3,
"color": "red",
},
{
"size": 6,
"color": "blue",
},
{
"size": 2,
"color": "blue",
},
{
"size": 4,
"color": "yellow",
},
},
},
M: map[string]interface{}{},
},
}
HandleStream(true, streamList, t)
options := []*def.RuleOption{
Expand Down
4 changes: 4 additions & 0 deletions pkg/timex/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func GetTicker(duration time.Duration) *clock.Ticker {
return Clock.Ticker(duration)
}

func After(duration time.Duration) <-chan time.Time {
return Clock.After(duration)
}

func GetTimer(duration time.Duration) *clock.Timer {
return Clock.Timer(duration)
}
Expand Down

0 comments on commit fd9ee57

Please sign in to comment.