Commit

Merge remote-tracking branch 'upstream/master' into support_schemaless_decode
Yisaer committed Dec 15, 2023
2 parents f1ffdbe + 30d390e commit e36a82c
Showing 8 changed files with 302 additions and 0 deletions.
23 changes: 23 additions & 0 deletions docs/en_US/configuration/global_configurations.md
@@ -337,3 +337,26 @@ This section configures the portable plugin runtime.
## Ruleset Provision

Supports file-based stream and rule provisioning on startup. Users can put a [ruleset](../api/restapi/ruleset.md#ruleset-format) file named `init.json` into the `data` directory to initialize the ruleset. The ruleset will only be imported on the first startup of eKuiper.

## Configure FoundationDB as storage

eKuiper uses SQLite by default to store some meta-information. eKuiper also supports using FoundationDB as the metadata store. This can be set up through the following steps:

* Confirm that FoundationDB is installed and running in the environment where eKuiper is deployed, and confirm the cluster file path used by FoundationDB. Please refer to the [official documentation](https://apple.github.io/foundationdb/administration.html#default-cluster-file).
* Confirm the API version of the FoundationDB C library installed on the eKuiper host, and replace the eKuiper dependency with the matching version. Taking API version 6.2.0 as an example, execute the following command in the eKuiper home directory:

```shell
go get github.com/apple/foundationdb/bindings/go@6.2.0
```

* Execute `make build_with_fdb` to compile kuiperd
* Modify the configuration as follows:

```yaml
store:
  #Type of store that will be used for keeping state of the application
  type: fdb
  extStateType: fdb
  fdb:
    path: <path-of-fdb-cluster-file>
```
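
For illustration — assuming a default Linux installation of FoundationDB, where the cluster file lives at `/etc/foundationdb/fdb.cluster` according to the official documentation linked above — the store section might look like:

```yaml
store:
  type: fdb
  extStateType: fdb
  fdb:
    # assumed default cluster file location on Linux; adjust to your deployment
    path: /etc/foundationdb/fdb.cluster
```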
23 changes: 23 additions & 0 deletions docs/zh_CN/configuration/global_configurations.md
@@ -324,3 +324,26 @@ The [get_keyed_state](../sqls/functions/other_functions.md#getkeyedstate) in SQL
## Ruleset Provision

Supports file-based stream and rule provisioning on startup. Users can put a [ruleset](../api/restapi/ruleset.md#规则集格式) file named `init.json` into the `data` directory to initialize the ruleset. The ruleset will only be imported on the first startup of eKuiper.

## Configure FoundationDB as Storage

eKuiper uses SQLite by default to store some meta-information. eKuiper also supports using FoundationDB as the metadata store. This can be set up through the following steps:

* Confirm that FoundationDB is installed and running in the environment where eKuiper is deployed, and confirm the cluster file path used by FoundationDB. Please refer to the [official documentation](https://apple.github.io/foundationdb/administration.html#default-cluster-file).
* Confirm the API version of the FoundationDB C library installed on the eKuiper host, and replace the eKuiper dependency with the matching version. Taking API version 6.2.0 as an example, execute the following command in the eKuiper home directory:

```shell
go get github.com/apple/foundationdb/bindings/go@6.2.0
```

* Execute `make build_with_fdb` to compile kuiperd.
* Modify the configuration as follows:

```yaml
store:
  #Type of store that will be used for keeping state of the application
  type: fdb
  extStateType: fdb
  fdb:
    path: <path-of-fdb-cluster-file>
```
3 changes: 3 additions & 0 deletions internal/conf/conf.go
@@ -209,6 +209,9 @@ type KuiperConf struct {
Sqlite struct {
Name string `yaml:"name"`
}
Fdb struct {
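// Path is the FoundationDB cluster file path (store.fdb.path in the yaml configuration)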
Path string `yaml:"path"`
}
}
Portable struct {
PythonBin string `yaml:"pythonBin"`
3 changes: 3 additions & 0 deletions internal/server/server.go
@@ -109,6 +109,9 @@ func getStoreConfigByKuiperConfig(c *conf.KuiperConf) (*store.StoreConf, error)
Path: dataDir,
Name: c.Store.Sqlite.Name,
},
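// pass the configured FoundationDB cluster file path through to the store layer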
FdbConfig: definition.FdbConfig{
Path: c.Store.Fdb.Path,
},
}
return sc, nil
}
120 changes: 120 additions & 0 deletions internal/topo/planner/dataSourcePlan.go
@@ -161,8 +161,50 @@ func (p *DataSourcePlan) PruneColumns(fields []ast.Expr) error {
p.fields[p.timestampField] = nil
}
}
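// collect valid arrow expressions (nested JSON field references) so the referenced paths survive pruning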
arrowFields := make([]*ast.BinaryExpr, 0)
for _, field := range fields {
switch f := field.(type) {
case *ast.BinaryExpr:
if f.OP == ast.ARROW {
// only allow cases like a.b.c
valid := true
ast.WalkFunc(f, func(node ast.Node) bool {
switch c := node.(type) {
case *ast.BinaryExpr:
if c.OP != ast.ARROW {
valid = false
return false
}
case *ast.FieldRef:
if !c.IsColumn() {
valid = false
return false
}
case *ast.JsonFieldRef:
case *ast.MetaRef:
valid = false
if p.allMeta {
break
}
if c.StreamName == ast.DefaultStream || c.StreamName == p.name {
if c.Name == "*" {
p.allMeta = true
p.metaMap = nil
} else if !p.allMeta {
p.metaMap[strings.ToLower(c.Name)] = c.Name
}
}
return false
default:
valid = false
return false
}
return true
})
if valid {
arrowFields = append(arrowFields, f)
}
}
case *ast.Wildcard:
if len(f.Except) == 0 && len(f.Replace) == 0 {
p.isWildCard = true
@@ -214,9 +256,87 @@ func (p *DataSourcePlan) PruneColumns(fields []ast.Expr) error {
for key := range p.streamFields {
conf.Log.Infof("PruneColumns after, datasource:%v, name:%v", p.name, key)
}
if !p.isSchemaless {
p.handleArrowFields(arrowFields)
}
return nil
}

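// buildArrowReference records the field path referenced by an arrow expression
// into a nested map, e.g. the expressions a.b.c and a.e together yield
// {"a": {"b": {"c": {}}, "e": {}}}. It returns the map that owns the last
// segment along with that segment's name.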
func buildArrowReference(cur ast.Expr, root map[string]interface{}) (map[string]interface{}, string) {
switch c := cur.(type) {
case *ast.BinaryExpr:
node, name := buildArrowReference(c.LHS, root)
m := node[name].(map[string]interface{})
subName := c.RHS.(*ast.JsonFieldRef).Name
_, ok := m[subName]
if !ok {
m[subName] = map[string]interface{}{}
}
return m, subName
case *ast.FieldRef:
_, ok := root[c.Name]
if !ok {
root[c.Name] = map[string]interface{}{}
}
return root, c.Name
}
return nil, ""
}

// handleArrowFields marks the fields and sub-fields referenced by the arrow expressions so that they are retained,
// then prunes the fields that are not used.
func (p *DataSourcePlan) handleArrowFields(arrowFields []*ast.BinaryExpr) {
root := make(map[string]interface{})
for _, af := range arrowFields {
buildArrowReference(af, root)
}
for fieldName, node := range root {
jsonStreamField, err := p.getField(fieldName, true)
if err != nil {
continue
}
markPruneJSONStreamField(node, jsonStreamField)
}
for key, field := range p.streamFields {
if field != nil && field.Type == "struct" {
if !field.Selected {
delete(p.streamFields, key)
continue
}
pruneJSONStreamField(field)
}
}
}

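// pruneJSONStreamField clears the Selected flag, removes struct properties
// that were never selected, and recurses into the sub-fields.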
func pruneJSONStreamField(cur *ast.JsonStreamField) {
cur.Selected = false
if cur.Type != "struct" {
return
}
for key, subField := range cur.Properties {
if !subField.Selected {
delete(cur.Properties, key)
}
pruneJSONStreamField(subField)
}
}

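// markPruneJSONStreamField walks the arrow-reference map produced by
// buildArrowReference and marks each referenced field as Selected so that
// pruneJSONStreamField later keeps it.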
func markPruneJSONStreamField(cur interface{}, field *ast.JsonStreamField) {
field.Selected = true
if field.Type != "struct" {
return
}
curM, ok := cur.(map[string]interface{})
if !ok || len(curM) < 1 {
return
}
for fieldName, v := range curM {
if subField, ok := field.Properties[fieldName]; ok {
markPruneJSONStreamField(v, subField)
}
}
}

func (p *DataSourcePlan) getField(name string, strict bool) (*ast.JsonStreamField, error) {
if !p.isSchemaless {
r, ok := p.streamFields[name]
102 changes: 102 additions & 0 deletions internal/topo/planner/planner_test.go
@@ -77,10 +77,16 @@ func Test_createLogicalPlan(t *testing.T) {
value STRING,
hum BIGINT
) WITH (TYPE="file");`,
"src3": `CREATE STREAM src3 (
a struct(b struct(c bigint,d bigint),e bigint),
a1 struct(b struct(c bigint,d bigint),e bigint),
a2 bigint
) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
}
types := map[string]ast.StreamType{
"src1": ast.TypeStream,
"src2": ast.TypeStream,
"src3": ast.TypeStream,
"tableInPlanner": ast.TypeTable,
}
for name, sql := range streamSqls {
@@ -135,11 +141,107 @@
}
tableHumRef.SetRefSource([]string{"tableInPlanner"})

arrowCRef := &ast.AliasRef{
Expression: &ast.BinaryExpr{
OP: ast.ARROW,
LHS: &ast.BinaryExpr{
OP: ast.ARROW,
LHS: &ast.FieldRef{
StreamName: "src3",
Name: "a",
},
RHS: &ast.JsonFieldRef{
Name: "b",
},
},
RHS: &ast.JsonFieldRef{
Name: "c",
},
},
}
arrowCRef.SetRefSource([]string{"src3"})
arrowPRef := &ast.AliasRef{
Expression: &ast.BinaryExpr{
OP: ast.ARROW,
LHS: &ast.FieldRef{
StreamName: "src3",
Name: "a",
},
RHS: &ast.JsonFieldRef{
Name: "e",
},
},
}
arrowPRef.SetRefSource([]string{"src3"})
tests := []struct {
sql string
p LogicalPlan
err string
}{
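// arrow references a.b.c and a.e should prune src3's schema down to the referenced sub-fields plus a2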
{
sql: "select a.b.c as c, a.e as e, a2 from src3",
p: ProjectPlan{
baseLogicalPlan: baseLogicalPlan{
children: []LogicalPlan{
DataSourcePlan{
baseLogicalPlan: baseLogicalPlan{},
name: "src3",
streamFields: map[string]*ast.JsonStreamField{
"a": {
Type: "struct",
Properties: map[string]*ast.JsonStreamField{
"b": {
Type: "struct",
Properties: map[string]*ast.JsonStreamField{
"c": {
Type: "bigint",
},
},
},
"e": {
Type: "bigint",
},
},
},
"a2": {
Type: "bigint",
},
},
streamStmt: streams["src3"],
metaFields: []string{},
pruneFields: []string{},
}.Init(),
},
},
fields: []ast.Field{
{
Name: "",
AName: "c",
Expr: &ast.FieldRef{
StreamName: ast.AliasStream,
Name: "c",
AliasRef: arrowCRef,
},
},
{
Name: "e",
AName: "e",
Expr: &ast.FieldRef{
StreamName: ast.AliasStream,
Name: "e",
AliasRef: arrowPRef,
},
},
{
Name: "a2",
Expr: &ast.FieldRef{
StreamName: "src3",
Name: "a2",
},
},
},
}.Init(),
},
{
sql: "select src2.hum as hum, tableInPlanner.hum as hum2 from src2 left join tableInPlanner on src2.hum = tableInPlanner.hum",
p: ProjectPlan{
26 changes: 26 additions & 0 deletions internal/topo/planner/util.go
@@ -75,6 +75,15 @@ func getFields(node ast.Node) []ast.Expr {
}
case *ast.SortField:
result = append(result, t)
case *ast.BinaryExpr:
if t.OP == ast.ARROW {
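// for arrow expressions like a.b.c, capture the whole expression; if it
// references metadata, keep only the collected column refs and continue walking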
hasMeta := false
result, hasMeta = getFieldRef(n, result)
if !hasMeta {
result = append(result, t)
}
return hasMeta
}
}
return true
})
@@ -84,3 +93,20 @@
}
return result
}

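// getFieldRef collects the column FieldRefs found under an arrow expression
// and reports whether the expression also references metadata (MetaRef).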
func getFieldRef(node ast.Node, result []ast.Expr) ([]ast.Expr, bool) {
hasMeta := false
ast.WalkFunc(node, func(n ast.Node) bool {
switch t := n.(type) {
case *ast.FieldRef:
if t.IsColumn() {
result = append(result, t)
}
case *ast.MetaRef:
hasMeta = true
return false
}
return true
})
return result, hasMeta
}
2 changes: 2 additions & 0 deletions pkg/ast/sourceStmt.go
@@ -54,6 +54,8 @@ type JsonStreamField struct {
Type string `json:"type"`
Items *JsonStreamField `json:"items,omitempty"`
Properties map[string]*JsonStreamField `json:"properties,omitempty"`

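// Selected marks whether this field is referenced by the query; it is used
// transiently during column pruning to decide which fields to keep.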
Selected bool
}

func (u *StreamField) MarshalJSON() ([]byte, error) {
