Skip to content

Commit

Permalink
fix: fix extract bufferLength (#3522)
Browse files Browse the repository at this point in the history
Signed-off-by: Song Gao <[email protected]>
  • Loading branch information
Yisaer authored Jan 20, 2025
1 parent a936944 commit c6a0cb1
Showing 1 changed file with 20 additions and 8 deletions.
28 changes: 20 additions & 8 deletions internal/topo/node/source_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,25 @@ func NewSourceNode(name string, st ast.StreamType, op UnOperation, options *ast.
}
}

func extractBufferLength(props map[string]any, originValue int) int {
if props == nil {
return originValue
}
if c, ok := props["bufferLength"]; ok {
t, err := cast.ToInt(c, cast.STRICT)
if err == nil {
return t
}
}
if c, ok := props["bufferlength"]; ok {
t, err := cast.ToInt(c, cast.STRICT)
if err == nil {
return t
}
}
return originValue
}

func (m *SourceNode) SetProps(props map[string]interface{}) {
m.props = props
}
Expand All @@ -90,14 +109,7 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
}
}
bl := 102400
if c, ok := props["bufferlength"]; ok {
if t, err := cast.ToInt(c, cast.STRICT); err != nil || t <= 0 {
logger.Warnf("invalid type for bufferLength property, should be positive integer but found %t", c)
} else {
bl = t
}
}
m.bufferLength = bl
m.bufferLength = extractBufferLength(props, bl)
if m.streamType == ast.TypeTable {
props["isTable"] = true
}
Expand Down

0 comments on commit c6a0cb1

Please sign in to comment.