From 9c5097ad289c4de41c63117183462a1aec110531 Mon Sep 17 00:00:00 2001 From: Jiyong Huang Date: Fri, 10 Jan 2025 13:08:35 +0800 Subject: [PATCH] chore(schema): simple sink schema support Signed-off-by: Jiyong Huang --- internal/converter/converter.go | 10 ++++----- internal/server/rule_manager.go | 20 ++++++++++++++++- internal/topo/context/default.go | 26 +++++++++++++++++++++- internal/topo/node/batch_writer_op.go | 11 +++++---- internal/topo/planner/planner.go | 19 ++++++++++++---- internal/topo/planner/planner_graph.go | 4 ++-- internal/topo/planner/planner_sink.go | 15 +++++++------ internal/topo/planner/planner_sink_test.go | 6 ++--- internal/topo/topo.go | 5 +++-- pkg/modules/converter.go | 4 ++-- 10 files changed, 89 insertions(+), 31 deletions(-) diff --git a/internal/converter/converter.go b/internal/converter/converter.go index fd2e28c823..f0c3bf3ccf 100644 --- a/internal/converter/converter.go +++ b/internal/converter/converter.go @@ -1,4 +1,4 @@ -// Copyright 2022-2024 EMQ Technologies Co., Ltd. +// Copyright 2022-2025 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -43,7 +43,7 @@ func init() { modules.RegisterConverter(message.FormatUrlEncoded, func(_ api.StreamContext, _ string, _ map[string]*ast.JsonStreamField, props map[string]any) (message.Converter, error) { return urlencoded.NewConverter(props) }) - modules.RegisterWriterConverter(message.FormatDelimited, func(ctx api.StreamContext, avscPath string, props map[string]any) (message.ConvertWriter, error) { + modules.RegisterWriterConverter(message.FormatDelimited, func(ctx api.StreamContext, avscPath string, _ map[string]*ast.JsonStreamField, props map[string]any) (message.ConvertWriter, error) { return delimited.NewCsvWriter(ctx, props) }) } @@ -65,11 +65,11 @@ func GetOrCreateConverter(ctx api.StreamContext, format string, schemaId string, return nil, fmt.Errorf("format type %s not supported", t) } -func GetConvertWriter(ctx api.StreamContext, format string, schemaId string, props map[string]any) (message.ConvertWriter, error) { +func GetConvertWriter(ctx api.StreamContext, format string, schemaId string, schema map[string]*ast.JsonStreamField, props map[string]any) (message.ConvertWriter, error) { if cw, ok := modules.ConvertWriters[format]; ok { - return cw(ctx, schemaId, props) + return cw(ctx, schemaId, schema, props) } - c, err := GetOrCreateConverter(ctx, format, schemaId, nil, props) + c, err := GetOrCreateConverter(ctx, format, schemaId, schema, props) if err != nil { return nil, err } diff --git a/internal/server/rule_manager.go b/internal/server/rule_manager.go index 1dc3bd99d7..f5759d2d6b 100644 --- a/internal/server/rule_manager.go +++ b/internal/server/rule_manager.go @@ -1,4 +1,4 @@ -// Copyright 2021-2024 EMQ Technologies Co., Ltd. +// Copyright 2021-2025 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -17,6 +17,8 @@ package server import ( "encoding/json" "fmt" + "os" + "path/filepath" "sort" "strings" "sync" @@ -208,6 +210,7 @@ func (rr *RuleRegistry) DeleteRule(name string) error { } deleteRuleMetrics(name) } + deleteRuleData(name) return err } @@ -483,3 +486,18 @@ func deleteRuleMetrics(name string) { promMetrics.RemoveRuleStatus(name) } } + +func deleteRuleData(name string) { + dataLoc, err := conf.GetDataLoc() + if err != nil { + conf.Log.Errorf("delete rule data error: %v", err) + return + } + ruleDataPath := filepath.Join(dataLoc, "rule_"+name) + err = os.RemoveAll(ruleDataPath) + if err != nil { + conf.Log.Errorf("delete rule data error: %v", err) + } else { + conf.Log.Infof("delete rule data: %s", ruleDataPath) + } +} diff --git a/internal/topo/context/default.go b/internal/topo/context/default.go index 3851114cd2..7794d1a2bc 100644 --- a/internal/topo/context/default.go +++ b/internal/topo/context/default.go @@ -1,4 +1,4 @@ -// Copyright 2022-2024 EMQ Technologies Co., Ltd. +// Copyright 2022-2025 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -252,6 +252,30 @@ func (c *DefaultContext) WithInstance(instanceId int) api.StreamContext { } } +func (c *DefaultContext) WithRuleId(ruleId string) api.StreamContext { + return &DefaultContext{ + instanceId: c.instanceId, + ruleId: ruleId, + opId: c.opId, + ctx: c.ctx, + state: c.state, + isTraceEnabled: c.isTraceEnabled, + strategy: c.strategy, + } +} + +func (c *DefaultContext) WithOpId(opId string) api.StreamContext { + return &DefaultContext{ + instanceId: c.instanceId, + ruleId: c.ruleId, + opId: opId, + ctx: c.ctx, + state: c.state, + isTraceEnabled: c.isTraceEnabled, + strategy: c.strategy, + } +} + func (c *DefaultContext) WithCancel() (api.StreamContext, context.CancelFunc) { ctx, cancel := context.WithCancel(c.ctx) return &DefaultContext{ diff --git a/internal/topo/node/batch_writer_op.go b/internal/topo/node/batch_writer_op.go index 249e6c40e7..2366ed3466 100644 --- a/internal/topo/node/batch_writer_op.go +++ b/internal/topo/node/batch_writer_op.go @@ -1,4 +1,4 @@ -// Copyright 2024 EMQ Technologies Co., Ltd. +// Copyright 2024-2025 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -22,7 +22,9 @@ import ( "github.com/lf-edge/ekuiper/v2/internal/converter" "github.com/lf-edge/ekuiper/v2/internal/pkg/def" + "github.com/lf-edge/ekuiper/v2/internal/topo/context" "github.com/lf-edge/ekuiper/v2/internal/xsql" + "github.com/lf-edge/ekuiper/v2/pkg/ast" "github.com/lf-edge/ekuiper/v2/pkg/infra" "github.com/lf-edge/ekuiper/v2/pkg/message" ) @@ -38,12 +40,13 @@ type BatchWriterOp struct { lastRow any } -func NewBatchWriterOp(ctx api.StreamContext, name string, rOpt *def.RuleOption, sc *SinkConf) (*BatchWriterOp, error) { - c, err := converter.GetConvertWriter(ctx, sc.Format, sc.SchemaId, nil) +func NewBatchWriterOp(ctx api.StreamContext, name string, rOpt *def.RuleOption, schema map[string]*ast.JsonStreamField, sc *SinkConf) (*BatchWriterOp, error) { + nctx := ctx.(*context.DefaultContext).WithOpId(name) + c, err := converter.GetConvertWriter(nctx, sc.Format, sc.SchemaId, schema, nil) if err != nil { return nil, err } - err = c.New(ctx) + err = c.New(nctx) if err != nil { return nil, fmt.Errorf("writer fail to initialize new file: %s", err) } diff --git a/internal/topo/planner/planner.go b/internal/topo/planner/planner.go index 17dec8faf8..abd40bd7f2 100644 --- a/internal/topo/planner/planner.go +++ b/internal/topo/planner/planner.go @@ -1,4 +1,4 @@ -// Copyright 2022-2024 EMQ Technologies Co., Ltd. +// Copyright 2022-2025 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -76,13 +76,23 @@ func PlanSQLWithSourcesAndSinks(rule *def.Rule, mockSourcesProp map[string]map[s if err != nil { return nil, err } - tp, err := createTopo(rule, lp, mockSourcesProp, streamsFromStmt) + tp, err := createTopo(rule, lp, mockSourcesProp, streamsFromStmt, getSinkSchema(stmt)) if err != nil { return nil, err } return tp, nil } +func getSinkSchema(stmt *ast.SelectStatement) map[string]*ast.JsonStreamField { + schema := make(map[string]*ast.JsonStreamField, len(stmt.Fields)) + for _, field := range stmt.Fields { + if field.GetName() != "*" { + schema[field.GetName()] = nil + } + } + return schema +} + func validateStmt(stmt *ast.SelectStatement) error { var vErr error ast.WalkFunc(stmt, func(n ast.Node) bool { @@ -97,7 +107,7 @@ func validateStmt(stmt *ast.SelectStatement) error { return vErr } -func createTopo(rule *def.Rule, lp LogicalPlan, mockSourcesProp map[string]map[string]any, streamsFromStmt []string) (t *topo.Topo, err error) { +func createTopo(rule *def.Rule, lp LogicalPlan, mockSourcesProp map[string]map[string]any, streamsFromStmt []string, schema map[string]*ast.JsonStreamField) (t *topo.Topo, err error) { defer func() { if err != nil { err = errorx.NewWithCode(errorx.ExecutorError, err.Error()) @@ -117,7 +127,7 @@ func createTopo(rule *def.Rule, lp LogicalPlan, mockSourcesProp map[string]map[s } inputs := []node.Emitter{input} // Add actions - err = buildActions(tp, rule, inputs, len(streamsFromStmt)) + err = buildActions(tp, rule, inputs, len(streamsFromStmt), schema) if err != nil { return nil, err } @@ -187,6 +197,7 @@ func ExplainFromLogicalPlan(lp LogicalPlan, ruleID string) (string, error) { return res, nil } +// return the last schema if there are multiple sources func buildOps(lp LogicalPlan, tp *topo.Topo, options *def.RuleOption, sources map[string]map[string]any, streamsFromStmt []string, index int) (node.Emitter, int, error) { var inputs []node.Emitter newIndex := index diff --git a/internal/topo/planner/planner_graph.go b/internal/topo/planner/planner_graph.go index 353bbdd542..991b177b78 100644 --- a/internal/topo/planner/planner_graph.go +++ b/internal/topo/planner/planner_graph.go @@ -1,4 +1,4 @@ -// Copyright 2022-2024 EMQ Technologies Co., Ltd. +// Copyright 2022-2025 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -109,7 +109,7 @@ func PlanByGraph(rule *def.Rule) (*topo.Topo, error) { if _, ok := ruleGraph.Topo.Edges[nodeName]; ok { return nil, fmt.Errorf("sink %s has edge", nodeName) } - cn, err := SinkToComp(tp, gn.NodeType, nodeName, gn.Props, rule, len(sourceNames)) + cn, err := SinkToComp(tp, gn.NodeType, nodeName, gn.Props, rule, len(sourceNames), nil) if err != nil { return nil, err } diff --git a/internal/topo/planner/planner_sink.go b/internal/topo/planner/planner_sink.go index c607177071..9e1d16c876 100644 --- a/internal/topo/planner/planner_sink.go +++ b/internal/topo/planner/planner_sink.go @@ -1,4 +1,4 @@ -// Copyright 2024 EMQ Technologies Co., Ltd. +// Copyright 2024-2025 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -26,13 +26,14 @@ import ( "github.com/lf-edge/ekuiper/v2/internal/topo" "github.com/lf-edge/ekuiper/v2/internal/topo/node" "github.com/lf-edge/ekuiper/v2/internal/topo/node/conf" + "github.com/lf-edge/ekuiper/v2/pkg/ast" "github.com/lf-edge/ekuiper/v2/pkg/model" ) // SinkPlanner is the planner for sink node. It transforms logical sink plan to multiple physical nodes. // It will split the sink plan into multiple sink nodes according to its sink configurations. -func buildActions(tp *topo.Topo, rule *def.Rule, inputs []node.Emitter, streamCount int) error { +func buildActions(tp *topo.Topo, rule *def.Rule, inputs []node.Emitter, streamCount int, schema map[string]*ast.JsonStreamField) error { for i, m := range rule.Actions { for name, action := range m { props, ok := action.(map[string]any) @@ -44,7 +45,7 @@ func buildActions(tp *topo.Topo, rule *def.Rule, inputs []node.Emitter, streamCo return err } sinkName := fmt.Sprintf("%s_%d", name, i) - cn, err := SinkToComp(tp, name, sinkName, props, rule, streamCount) + cn, err := SinkToComp(tp, name, sinkName, props, rule, streamCount, schema) if err != nil { return err } @@ -75,7 +76,7 @@ func PlanSinkOps(tp *topo.Topo, inputs []node.Emitter, cn node.CompNode) { } } -func SinkToComp(tp *topo.Topo, sinkType string, sinkName string, props map[string]any, rule *def.Rule, streamCount int) (node.CompNode, error) { +func SinkToComp(tp *topo.Topo, sinkType string, sinkName string, props map[string]any, rule *def.Rule, streamCount int, schema map[string]*ast.JsonStreamField) (node.CompNode, error) { s, _ := io.Sink(sinkType) if s == nil { return nil, fmt.Errorf("sink %s is not defined", sinkType) @@ -86,7 +87,7 @@ func SinkToComp(tp *topo.Topo, sinkType string, sinkName string, props map[strin } templates := findTemplateProps(props) // Split sink node - sinkOps, err := splitSink(tp, s, sinkName, rule.Options, commonConf, templates) + sinkOps, err := splitSink(tp, s, sinkName, rule.Options, commonConf, templates, schema) if err != nil { return nil, err } @@ -167,7 +168,7 @@ func findTemplateProps(props map[string]any) []string { } // Split sink node according to the sink configuration. Return the new input emitters. -func splitSink(tp *topo.Topo, s api.Sink, sinkName string, options *def.RuleOption, sc *node.SinkConf, templates []string) ([]node.TopNode, error) { +func splitSink(tp *topo.Topo, s api.Sink, sinkName string, options *def.RuleOption, sc *node.SinkConf, templates []string, schema map[string]*ast.JsonStreamField) ([]node.TopNode, error) { index := 0 result := make([]node.TopNode, 0) batchEnabled := sc.BatchSize > 0 || sc.LingerInterval > 0 @@ -189,7 +190,7 @@ func splitSink(tp *topo.Topo, s api.Sink, sinkName string, options *def.RuleOpti index++ result = append(result, transformOp) if batchEnabled { - batchWriterOp, err := node.NewBatchWriterOp(tp.GetContext(), fmt.Sprintf("%s_%d_batchWriter", sinkName, index), options, sc) + batchWriterOp, err := node.NewBatchWriterOp(tp.GetContext(), fmt.Sprintf("%s_%d_batchWriter", sinkName, index), options, schema, sc) if err != nil { return nil, err } diff --git a/internal/topo/planner/planner_sink_test.go b/internal/topo/planner/planner_sink_test.go index 390171a494..5ec7a692d3 100644 --- a/internal/topo/planner/planner_sink_test.go +++ b/internal/topo/planner/planner_sink_test.go @@ -1,4 +1,4 @@ -// Copyright 2024 EMQ Technologies Co., Ltd. +// Copyright 2024-2025 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -257,7 +257,7 @@ func TestSinkPlan(t *testing.T) { assert.NoError(t, err) tp.AddSrc(n) inputs := []node.Emitter{n} - err = buildActions(tp, c.rule, inputs, 1) + err = buildActions(tp, c.rule, inputs, 1, nil) assert.NoError(t, err) assert.Equal(t, c.topo, tp.GetTopo()) }) @@ -348,7 +348,7 @@ func TestSinkPlanError(t *testing.T) { assert.NoError(t, err) tp.AddSrc(n) inputs := []node.Emitter{n} - err = buildActions(tp, c.rule, inputs, 1) + err = buildActions(tp, c.rule, inputs, 1, nil) assert.Error(t, err) assert.Equal(t, c.err, err.Error()) }) diff --git a/internal/topo/topo.go b/internal/topo/topo.go index 35ae5a8ed2..30d7d4a02a 100644 --- a/internal/topo/topo.go +++ b/internal/topo/topo.go @@ -1,4 +1,4 @@ -// Copyright 2021-2024 EMQ Technologies Co., Ltd. +// Copyright 2021-2025 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -227,7 +227,8 @@ func (s *Topo) prepareContext() { ctx := kctx.WithValue(kctx.RuleBackground(s.name), kctx.LoggerKey, contextLogger) ctx = kctx.WithValue(ctx, kctx.RuleStartKey, timex.GetNowInMilli()) ctx = kctx.WithValue(ctx, kctx.RuleWaitGroupKey, s.opsWg) - s.ctx, s.cancel = ctx.WithCancel() + nctx := ctx.WithRuleId(s.name) + s.ctx, s.cancel = nctx.WithCancel() } } diff --git a/pkg/modules/converter.go b/pkg/modules/converter.go index 98710a43af..cf479197df 100644 --- a/pkg/modules/converter.go +++ b/pkg/modules/converter.go @@ -1,4 +1,4 @@ -// Copyright 2024 EMQ Technologies Co., Ltd. +// Copyright 2024-2025 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -42,7 +42,7 @@ func IsFormatSupported(format string) bool { // ConvertWriters are sink converter to use together with batch var ConvertWriters = map[string]ConvertWriterProvider{} -type ConvertWriterProvider func(ctx api.StreamContext, schemaId string, props map[string]any) (message.ConvertWriter, error) +type ConvertWriterProvider func(ctx api.StreamContext, schemaId string, logicalSchema map[string]*ast.JsonStreamField, props map[string]any) (message.ConvertWriter, error) func RegisterWriterConverter(name string, provider ConvertWriterProvider) { ConvertWriters[name] = provider