Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(schema): simple sink schema support #3500

Merged
merged 1 commit into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions internal/converter/converter.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022-2024 EMQ Technologies Co., Ltd.
// Copyright 2022-2025 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -43,7 +43,7 @@
modules.RegisterConverter(message.FormatUrlEncoded, func(_ api.StreamContext, _ string, _ map[string]*ast.JsonStreamField, props map[string]any) (message.Converter, error) {
return urlencoded.NewConverter(props)
})
modules.RegisterWriterConverter(message.FormatDelimited, func(ctx api.StreamContext, avscPath string, props map[string]any) (message.ConvertWriter, error) {
modules.RegisterWriterConverter(message.FormatDelimited, func(ctx api.StreamContext, avscPath string, _ map[string]*ast.JsonStreamField, props map[string]any) (message.ConvertWriter, error) {
return delimited.NewCsvWriter(ctx, props)
})
}
Expand All @@ -65,11 +65,11 @@
return nil, fmt.Errorf("format type %s not supported", t)
}

func GetConvertWriter(ctx api.StreamContext, format string, schemaId string, props map[string]any) (message.ConvertWriter, error) {
func GetConvertWriter(ctx api.StreamContext, format string, schemaId string, schema map[string]*ast.JsonStreamField, props map[string]any) (message.ConvertWriter, error) {
if cw, ok := modules.ConvertWriters[format]; ok {
return cw(ctx, schemaId, props)
return cw(ctx, schemaId, schema, props)

Check warning on line 70 in internal/converter/converter.go

View check run for this annotation

Codecov / codecov/patch

internal/converter/converter.go#L70

Added line #L70 was not covered by tests
}
c, err := GetOrCreateConverter(ctx, format, schemaId, nil, props)
c, err := GetOrCreateConverter(ctx, format, schemaId, schema, props)
if err != nil {
return nil, err
}
Expand Down
20 changes: 19 additions & 1 deletion internal/server/rule_manager.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021-2024 EMQ Technologies Co., Ltd.
// Copyright 2021-2025 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,8 @@
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -208,6 +210,7 @@
}
deleteRuleMetrics(name)
}
deleteRuleData(name)
return err
}

Expand Down Expand Up @@ -483,3 +486,18 @@
promMetrics.RemoveRuleStatus(name)
}
}

// deleteRuleData removes the on-disk data directory of the rule called name.
// The directory is "rule_"+name under the location returned by
// conf.GetDataLoc(). This is best-effort cleanup: failures are logged through
// conf.Log and never returned to the caller.
// NOTE(review): name is joined into the path without any check here; confirm
// that callers validate rule ids so the path cannot leave the data directory.
func deleteRuleData(name string) {
dataLoc, err := conf.GetDataLoc()
if err != nil {
// Without the data location there is nothing to delete; log and give up.
conf.Log.Errorf("delete rule data error: %v", err)
return
}

Check warning on line 495 in internal/server/rule_manager.go

View check run for this annotation

Codecov / codecov/patch

internal/server/rule_manager.go#L493-L495

Added lines #L493 - L495 were not covered by tests
ruleDataPath := filepath.Join(dataLoc, "rule_"+name)
// os.RemoveAll deletes the path and any children, and returns nil when the
// path does not exist, so rules that never wrote data are not reported as errors.
err = os.RemoveAll(ruleDataPath)
if err != nil {
conf.Log.Errorf("delete rule data error: %v", err)

Check warning on line 499 in internal/server/rule_manager.go

View check run for this annotation

Codecov / codecov/patch

internal/server/rule_manager.go#L499

Added line #L499 was not covered by tests
} else {
conf.Log.Infof("delete rule data: %s", ruleDataPath)
}
}
26 changes: 25 additions & 1 deletion internal/topo/context/default.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022-2024 EMQ Technologies Co., Ltd.
// Copyright 2022-2025 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -252,6 +252,30 @@ func (c *DefaultContext) WithInstance(instanceId int) api.StreamContext {
}
}

// WithRuleId returns a new DefaultContext whose rule id is ruleId.
// The instance id, op id, underlying ctx, state, trace flag and strategy are
// carried over from the receiver; no other field is copied, and the receiver
// itself is left untouched.
func (c *DefaultContext) WithRuleId(ruleId string) api.StreamContext {
	derived := new(DefaultContext)
	// The only value that differs from the receiver.
	derived.ruleId = ruleId
	// Everything else is inherited as-is.
	derived.instanceId = c.instanceId
	derived.opId = c.opId
	derived.ctx = c.ctx
	derived.state = c.state
	derived.isTraceEnabled = c.isTraceEnabled
	derived.strategy = c.strategy
	return derived
}

// WithOpId returns a new DefaultContext whose operator id is opId.
// The instance id, rule id, underlying ctx, state, trace flag and strategy are
// carried over from the receiver; no other field is copied, and the receiver
// itself is left untouched.
func (c *DefaultContext) WithOpId(opId string) api.StreamContext {
	derived := new(DefaultContext)
	// The only value that differs from the receiver.
	derived.opId = opId
	// Everything else is inherited as-is.
	derived.instanceId = c.instanceId
	derived.ruleId = c.ruleId
	derived.ctx = c.ctx
	derived.state = c.state
	derived.isTraceEnabled = c.isTraceEnabled
	derived.strategy = c.strategy
	return derived
}

func (c *DefaultContext) WithCancel() (api.StreamContext, context.CancelFunc) {
ctx, cancel := context.WithCancel(c.ctx)
return &DefaultContext{
Expand Down
11 changes: 7 additions & 4 deletions internal/topo/node/batch_writer_op.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
// Copyright 2024-2025 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -22,7 +22,9 @@ import (

"github.com/lf-edge/ekuiper/v2/internal/converter"
"github.com/lf-edge/ekuiper/v2/internal/pkg/def"
"github.com/lf-edge/ekuiper/v2/internal/topo/context"
"github.com/lf-edge/ekuiper/v2/internal/xsql"
"github.com/lf-edge/ekuiper/v2/pkg/ast"
"github.com/lf-edge/ekuiper/v2/pkg/infra"
"github.com/lf-edge/ekuiper/v2/pkg/message"
)
Expand All @@ -38,12 +40,13 @@ type BatchWriterOp struct {
lastRow any
}

func NewBatchWriterOp(ctx api.StreamContext, name string, rOpt *def.RuleOption, sc *SinkConf) (*BatchWriterOp, error) {
c, err := converter.GetConvertWriter(ctx, sc.Format, sc.SchemaId, nil)
func NewBatchWriterOp(ctx api.StreamContext, name string, rOpt *def.RuleOption, schema map[string]*ast.JsonStreamField, sc *SinkConf) (*BatchWriterOp, error) {
nctx := ctx.(*context.DefaultContext).WithOpId(name)
c, err := converter.GetConvertWriter(nctx, sc.Format, sc.SchemaId, schema, nil)
if err != nil {
return nil, err
}
err = c.New(ctx)
err = c.New(nctx)
if err != nil {
return nil, fmt.Errorf("writer fail to initialize new file: %s", err)
}
Expand Down
19 changes: 15 additions & 4 deletions internal/topo/planner/planner.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022-2024 EMQ Technologies Co., Ltd.
// Copyright 2022-2025 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -76,13 +76,23 @@ func PlanSQLWithSourcesAndSinks(rule *def.Rule, mockSourcesProp map[string]map[s
if err != nil {
return nil, err
}
tp, err := createTopo(rule, lp, mockSourcesProp, streamsFromStmt)
tp, err := createTopo(rule, lp, mockSourcesProp, streamsFromStmt, getSinkSchema(stmt))
if err != nil {
return nil, err
}
return tp, nil
}

// getSinkSchema derives the sink schema from the fields of a select statement.
// Every field name except the wildcard "*" becomes a key in the returned map.
// The values are all nil: only the set of field names is recorded here.
// The returned map is never nil, but it is empty when the statement selects
// nothing but "*".
func getSinkSchema(stmt *ast.SelectStatement) map[string]*ast.JsonStreamField {
	result := make(map[string]*ast.JsonStreamField, len(stmt.Fields))
	for _, f := range stmt.Fields {
		fieldName := f.GetName()
		if fieldName == "*" {
			// A wildcard carries no concrete column name.
			continue
		}
		result[fieldName] = nil
	}
	return result
}

func validateStmt(stmt *ast.SelectStatement) error {
var vErr error
ast.WalkFunc(stmt, func(n ast.Node) bool {
Expand All @@ -97,7 +107,7 @@ func validateStmt(stmt *ast.SelectStatement) error {
return vErr
}

func createTopo(rule *def.Rule, lp LogicalPlan, mockSourcesProp map[string]map[string]any, streamsFromStmt []string) (t *topo.Topo, err error) {
func createTopo(rule *def.Rule, lp LogicalPlan, mockSourcesProp map[string]map[string]any, streamsFromStmt []string, schema map[string]*ast.JsonStreamField) (t *topo.Topo, err error) {
defer func() {
if err != nil {
err = errorx.NewWithCode(errorx.ExecutorError, err.Error())
Expand All @@ -117,7 +127,7 @@ func createTopo(rule *def.Rule, lp LogicalPlan, mockSourcesProp map[string]map[s
}
inputs := []node.Emitter{input}
// Add actions
err = buildActions(tp, rule, inputs, len(streamsFromStmt))
err = buildActions(tp, rule, inputs, len(streamsFromStmt), schema)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -187,6 +197,7 @@ func ExplainFromLogicalPlan(lp LogicalPlan, ruleID string) (string, error) {
return res, nil
}

// return the last schema if there are multiple sources
func buildOps(lp LogicalPlan, tp *topo.Topo, options *def.RuleOption, sources map[string]map[string]any, streamsFromStmt []string, index int) (node.Emitter, int, error) {
var inputs []node.Emitter
newIndex := index
Expand Down
4 changes: 2 additions & 2 deletions internal/topo/planner/planner_graph.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022-2024 EMQ Technologies Co., Ltd.
// Copyright 2022-2025 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -109,7 +109,7 @@ func PlanByGraph(rule *def.Rule) (*topo.Topo, error) {
if _, ok := ruleGraph.Topo.Edges[nodeName]; ok {
return nil, fmt.Errorf("sink %s has edge", nodeName)
}
cn, err := SinkToComp(tp, gn.NodeType, nodeName, gn.Props, rule, len(sourceNames))
cn, err := SinkToComp(tp, gn.NodeType, nodeName, gn.Props, rule, len(sourceNames), nil)
if err != nil {
return nil, err
}
Expand Down
15 changes: 8 additions & 7 deletions internal/topo/planner/planner_sink.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
// Copyright 2024-2025 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -26,13 +26,14 @@ import (
"github.com/lf-edge/ekuiper/v2/internal/topo"
"github.com/lf-edge/ekuiper/v2/internal/topo/node"
"github.com/lf-edge/ekuiper/v2/internal/topo/node/conf"
"github.com/lf-edge/ekuiper/v2/pkg/ast"
"github.com/lf-edge/ekuiper/v2/pkg/model"
)

// SinkPlanner is the planner for sink nodes. It transforms a logical sink plan into multiple physical nodes.
// It splits the sink plan into multiple sink nodes according to its sink configurations.

func buildActions(tp *topo.Topo, rule *def.Rule, inputs []node.Emitter, streamCount int) error {
func buildActions(tp *topo.Topo, rule *def.Rule, inputs []node.Emitter, streamCount int, schema map[string]*ast.JsonStreamField) error {
for i, m := range rule.Actions {
for name, action := range m {
props, ok := action.(map[string]any)
Expand All @@ -44,7 +45,7 @@ func buildActions(tp *topo.Topo, rule *def.Rule, inputs []node.Emitter, streamCo
return err
}
sinkName := fmt.Sprintf("%s_%d", name, i)
cn, err := SinkToComp(tp, name, sinkName, props, rule, streamCount)
cn, err := SinkToComp(tp, name, sinkName, props, rule, streamCount, schema)
if err != nil {
return err
}
Expand Down Expand Up @@ -75,7 +76,7 @@ func PlanSinkOps(tp *topo.Topo, inputs []node.Emitter, cn node.CompNode) {
}
}

func SinkToComp(tp *topo.Topo, sinkType string, sinkName string, props map[string]any, rule *def.Rule, streamCount int) (node.CompNode, error) {
func SinkToComp(tp *topo.Topo, sinkType string, sinkName string, props map[string]any, rule *def.Rule, streamCount int, schema map[string]*ast.JsonStreamField) (node.CompNode, error) {
s, _ := io.Sink(sinkType)
if s == nil {
return nil, fmt.Errorf("sink %s is not defined", sinkType)
Expand All @@ -86,7 +87,7 @@ func SinkToComp(tp *topo.Topo, sinkType string, sinkName string, props map[strin
}
templates := findTemplateProps(props)
// Split sink node
sinkOps, err := splitSink(tp, s, sinkName, rule.Options, commonConf, templates)
sinkOps, err := splitSink(tp, s, sinkName, rule.Options, commonConf, templates, schema)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -167,7 +168,7 @@ func findTemplateProps(props map[string]any) []string {
}

// Split sink node according to the sink configuration. Return the new input emitters.
func splitSink(tp *topo.Topo, s api.Sink, sinkName string, options *def.RuleOption, sc *node.SinkConf, templates []string) ([]node.TopNode, error) {
func splitSink(tp *topo.Topo, s api.Sink, sinkName string, options *def.RuleOption, sc *node.SinkConf, templates []string, schema map[string]*ast.JsonStreamField) ([]node.TopNode, error) {
index := 0
result := make([]node.TopNode, 0)
batchEnabled := sc.BatchSize > 0 || sc.LingerInterval > 0
Expand All @@ -189,7 +190,7 @@ func splitSink(tp *topo.Topo, s api.Sink, sinkName string, options *def.RuleOpti
index++
result = append(result, transformOp)
if batchEnabled {
batchWriterOp, err := node.NewBatchWriterOp(tp.GetContext(), fmt.Sprintf("%s_%d_batchWriter", sinkName, index), options, sc)
batchWriterOp, err := node.NewBatchWriterOp(tp.GetContext(), fmt.Sprintf("%s_%d_batchWriter", sinkName, index), options, schema, sc)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions internal/topo/planner/planner_sink_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
// Copyright 2024-2025 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -257,7 +257,7 @@ func TestSinkPlan(t *testing.T) {
assert.NoError(t, err)
tp.AddSrc(n)
inputs := []node.Emitter{n}
err = buildActions(tp, c.rule, inputs, 1)
err = buildActions(tp, c.rule, inputs, 1, nil)
assert.NoError(t, err)
assert.Equal(t, c.topo, tp.GetTopo())
})
Expand Down Expand Up @@ -348,7 +348,7 @@ func TestSinkPlanError(t *testing.T) {
assert.NoError(t, err)
tp.AddSrc(n)
inputs := []node.Emitter{n}
err = buildActions(tp, c.rule, inputs, 1)
err = buildActions(tp, c.rule, inputs, 1, nil)
assert.Error(t, err)
assert.Equal(t, c.err, err.Error())
})
Expand Down
5 changes: 3 additions & 2 deletions internal/topo/topo.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021-2024 EMQ Technologies Co., Ltd.
// Copyright 2021-2025 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -227,7 +227,8 @@ func (s *Topo) prepareContext() {
ctx := kctx.WithValue(kctx.RuleBackground(s.name), kctx.LoggerKey, contextLogger)
ctx = kctx.WithValue(ctx, kctx.RuleStartKey, timex.GetNowInMilli())
ctx = kctx.WithValue(ctx, kctx.RuleWaitGroupKey, s.opsWg)
s.ctx, s.cancel = ctx.WithCancel()
nctx := ctx.WithRuleId(s.name)
s.ctx, s.cancel = nctx.WithCancel()
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/modules/converter.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
// Copyright 2024-2025 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,7 +42,7 @@ func IsFormatSupported(format string) bool {
// ConvertWriters are the sink converters used together with batching.
var ConvertWriters = map[string]ConvertWriterProvider{}

type ConvertWriterProvider func(ctx api.StreamContext, schemaId string, props map[string]any) (message.ConvertWriter, error)
type ConvertWriterProvider func(ctx api.StreamContext, schemaId string, logicalSchema map[string]*ast.JsonStreamField, props map[string]any) (message.ConvertWriter, error)

func RegisterWriterConverter(name string, provider ConvertWriterProvider) {
ConvertWriters[name] = provider
Expand Down
Loading