From c6a0cb1ef09894670b85675d436386ba7fd732f6 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Mon, 20 Jan 2025 11:51:24 +0800 Subject: [PATCH] fix: fix extract bufferLength (#3522) Signed-off-by: Song Gao --- internal/topo/node/source_node.go | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/internal/topo/node/source_node.go b/internal/topo/node/source_node.go index a82f946c2..2b03c3ee5 100644 --- a/internal/topo/node/source_node.go +++ b/internal/topo/node/source_node.go @@ -64,6 +64,25 @@ func NewSourceNode(name string, st ast.StreamType, op UnOperation, options *ast. } } +func extractBufferLength(props map[string]any, originValue int) int { + if props == nil { + return originValue + } + if c, ok := props["bufferLength"]; ok { + t, err := cast.ToInt(c, cast.STRICT) + if err == nil { + return t + } + } + if c, ok := props["bufferlength"]; ok { + t, err := cast.ToInt(c, cast.STRICT) + if err == nil { + return t + } + } + return originValue +} + func (m *SourceNode) SetProps(props map[string]interface{}) { m.props = props } @@ -90,14 +109,7 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) { } } bl := 102400 - if c, ok := props["bufferlength"]; ok { - if t, err := cast.ToInt(c, cast.STRICT); err != nil || t <= 0 { - logger.Warnf("invalid type for bufferLength property, should be positive integer but found %t", c) - } else { - bl = t - } - } - m.bufferLength = bl + m.bufferLength = extractBufferLength(props, bl) if m.streamType == ast.TypeTable { props["isTable"] = true }