forked from brimdata/super
-
Notifications
You must be signed in to change notification settings - Fork 0
/
op.go
65 lines (55 loc) · 1.53 KB
/
op.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package op
import (
"context"
"github.com/brimdata/zed"
"github.com/brimdata/zed/compiler/ast/dag"
"github.com/brimdata/zed/order"
"github.com/brimdata/zed/runtime/expr/extent"
"github.com/brimdata/zed/zbuf"
"github.com/segmentio/ksuid"
"go.uber.org/zap"
)
// BatchLen is a package-level batch length of 100.
// NOTE(review): presumably the number of values grouped into one batch; it is
// not referenced in the visible part of this file — confirm against callers.
const BatchLen = 100
// DataAdaptor declares the pool, commit, layout, scheduling, and open
// operations that an implementation must supply. Every method takes a
// context.Context as its first argument.
//
// NOTE(review): the parameters are unnamed, so the meanings given below are
// inferred from method names and types — confirm against implementations.
type DataAdaptor interface {
// PoolID returns a KSUID for the given string (presumably a pool name).
PoolID(context.Context, string) (ksuid.KSUID, error)
// CommitObject returns a KSUID for the given KSUID and string (presumably a
// pool ID and a branch name, yielding a commit ID — TODO confirm).
CommitObject(context.Context, ksuid.KSUID, string) (ksuid.KSUID, error)
// Layout returns the order.Layout for the given source. It has no error
// return.
Layout(context.Context, dag.Source) order.Layout
// NewScheduler returns a Scheduler built from the given zed.Context, source,
// span, and filters.
NewScheduler(context.Context, *zed.Context, dag.Source, extent.Span, zbuf.Filter, *dag.Filter) (Scheduler, error)
// Open returns a zbuf.Puller for the input named by the two strings
// (presumably a path/URL and a format — TODO confirm), using the given
// zed.Context and filter.
Open(context.Context, *zed.Context, string, string, zbuf.Filter) (zbuf.Puller, error)
}
// Scheduler hands out scan work as pullers and reports progress. Instances
// are obtained from DataAdaptor.NewScheduler.
type Scheduler interface {
// PullScanTask returns a zbuf.Puller (presumably for the next scan task)
// or an error.
// NOTE(review): how exhaustion is signaled is not visible here — confirm
// with implementations.
PullScanTask() (zbuf.Puller, error)
// Progress returns a zbuf.Progress value (presumably the scan statistics
// accumulated so far — TODO confirm).
Progress() zbuf.Progress
}
// Result bundles the two values returned by Proc.Pull() — a batch and an
// error — into a single value so that they can be sent together over a
// channel.
type Result struct {
// Batch is the batch half of the pair.
Batch zbuf.Batch
// Err is the error half of the pair.
Err error
}
// Context carries the state shared by all procs that describes the outside
// environment in which they are running. It embeds a context.Context, so a
// *Context can be passed wherever a context.Context is expected.
type Context struct {
// Embedded context; NewContext sets it to a cancelable child of the
// context it is given.
context.Context
// Logger is the logger available to procs; NewContext substitutes a no-op
// logger when it is given nil.
Logger *zap.Logger
// Zctx is the zed.Context supplied to NewContext.
Zctx *zed.Context
// cancel is the CancelFunc paired with the embedded context; it is invoked
// by Cancel.
cancel context.CancelFunc
}
// NewContext builds a Context around a cancelable child of ctx, so that calling
// Cancel on the result does not cancel the caller's ctx.
//
// zctx is stored as given. A nil logger is replaced with a no-op logger, so the
// Logger field of the returned Context is never nil.
func NewContext(ctx context.Context, zctx *zed.Context, logger *zap.Logger) *Context {
	// Fall back to a logger that discards everything so users of the Context
	// never need a nil check before logging.
	if logger == nil {
		logger = zap.NewNop()
	}
	child, stop := context.WithCancel(ctx)
	return &Context{Context: child, Logger: logger, Zctx: zctx, cancel: stop}
}
// DefaultContext returns a Context rooted at context.Background() with a newly
// created zed.Context. No logger is supplied, so NewContext installs its no-op
// logger.
func DefaultContext() *Context {
	var (
		root = context.Background()
		zctx = zed.NewContext()
	)
	return NewContext(root, zctx, nil)
}
// Cancel cancels the embedded context by invoking the CancelFunc that
// NewContext stored.
//
// Context has exported fields and can therefore be constructed as a literal
// without going through NewContext; in that case the cancel function is nil.
// Cancel is then a no-op instead of panicking on a nil function call.
func (c *Context) Cancel() {
	if c.cancel != nil {
		c.cancel()
	}
}