forked from brimdata/super
-
Notifications
You must be signed in to change notification settings - Fork 0
/
fuse.go
60 lines (51 loc) · 1.2 KB
/
fuse.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package agg
import (
"fmt"
"github.com/brimdata/zed"
)
// fuse is an aggregator that accumulates the types of the values it is given
// and combines them into a single type when Result is called.
type fuse struct {
// shapes maps each distinct type passed to Consume to the position at which
// it was first seen (0 for the first distinct type, 1 for the second, ...),
// so the first-seen order can be recovered from the map later.
shapes map[zed.Type]int
// partials holds the type values handed to ConsumeAsPartial, in arrival
// order.
partials []zed.Value
}
// Compile-time check that *fuse satisfies the Function interface.
var _ Function = (*fuse)(nil)
// newFuse returns a fuse aggregator with an empty, ready-to-write shapes map
// and no partials.
func newFuse() *fuse {
	f := new(fuse)
	f.shapes = map[zed.Type]int{}
	return f
}
// Consume records the type of val. Only the first occurrence of a type is
// recorded; it is assigned the next free position so that the order in which
// distinct types were first seen is retained.
func (f *fuse) Consume(val *zed.Value) {
	if _, seen := f.shapes[val.Type]; seen {
		return
	}
	f.shapes[val.Type] = len(f.shapes)
}
// Result combines everything gathered so far into one type and returns it as
// a type value obtained from zctx.
//
// Types carried by partials are mixed into the schema first, in arrival
// order, followed by the types seen by Consume in first-seen order. When
// nothing has been gathered at all, a zed.TypeType value with nil bytes is
// returned. Result panics if a stored partial cannot be resolved to a type
// by zctx.
func (f *fuse) Result(zctx *zed.Context) *zed.Value {
	if len(f.shapes) == 0 && len(f.partials) == 0 {
		// No input at all. XXX this value could be a shared singleton.
		return zed.NewValue(zed.TypeType, nil)
	}
	schema := NewSchema(zctx)
	for i := range f.partials {
		typ, err := zctx.LookupByValue(f.partials[i].Bytes)
		if err != nil {
			panic(fmt.Errorf("fuse: invalid partial value: %w", err))
		}
		schema.Mixin(typ)
	}
	// Map iteration order is random, so lay the types out by their recorded
	// position before mixing them in; this keeps the mix-in order
	// deterministic.
	ordered := make([]zed.Type, len(f.shapes))
	for typ, pos := range f.shapes {
		ordered[pos] = typ
	}
	for _, typ := range ordered {
		schema.Mixin(typ)
	}
	fused := schema.Type()
	return zctx.LookupTypeValue(fused)
}
// ConsumeAsPartial stores a partial result (presumably the output of
// ResultAsPartial from another aggregator instance) so that Result can merge
// it later. The partial must be a type value, i.e. its Type must be
// zed.TypeType; anything else is treated as a programming error and panics.
//
// The partial's bytes are copied before being stored: the stored value
// outlives this call, and nothing visible here guarantees that the caller
// leaves the underlying buffer untouched afterwards.
func (f *fuse) ConsumeAsPartial(partial *zed.Value) {
	if partial.Type != zed.TypeType {
		panic("fuse: partial not a type value")
	}
	stored := *partial
	// Copying the struct alone would still share the Bytes backing array with
	// the caller. Reslicing to zero capacity forces append to allocate fresh
	// storage, while nil bytes stay nil.
	stored.Bytes = append(stored.Bytes[:0:0], stored.Bytes...)
	f.partials = append(f.partials, stored)
}
// ResultAsPartial returns the partial form of this aggregator's state, which
// is exactly what Result returns.
func (f *fuse) ResultAsPartial(zctx *zed.Context) *zed.Value {
	partial := f.Result(zctx)
	return partial
}