diff --git a/internal/topo/node/window_op.go b/internal/topo/node/window_op.go index 88d426c43b..15114330d9 100644 --- a/internal/topo/node/window_op.go +++ b/internal/topo/node/window_op.go @@ -608,11 +608,21 @@ func (o *WindowOperator) handleInputs(ctx api.StreamContext, inputs []*xsql.Tupl } func (o *WindowOperator) gcInputs(inputs []*xsql.Tuple, triggerTime time.Time, ctx api.StreamContext) []*xsql.Tuple { - var discard []*xsql.Tuple - inputs, discard, _ = o.handleInputs(ctx, inputs, triggerTime) - ctx.GetLogger().Debugf("after scan %v", inputs) - o.handleTraceDiscardTuple(ctx, discard) - return inputs + length := o.window.Length + o.window.Delay + gcIndex := -1 + for i, tuple := range inputs { + if tuple.Timestamp.Add(length).Compare(triggerTime) >= 0 { + break + } + gcIndex = i + } + if gcIndex == len(inputs)-1 { + return inputs[:0] + } + if gcIndex == -1 { + return inputs + } + return inputs[gcIndex+1:] } func (o *WindowOperator) scan(inputs []*xsql.Tuple, triggerTime time.Time, ctx api.StreamContext) []*xsql.Tuple {