Skip to content

Commit

Permalink
join: Add automatic sort (#4770)
Browse files Browse the repository at this point in the history
Add functionality to join operator to automatically sort any unsorted
input.
  • Loading branch information
mattnibs authored Sep 26, 2023
1 parent d55ca74 commit d938fba
Show file tree
Hide file tree
Showing 11 changed files with 119 additions and 52 deletions.
6 changes: 2 additions & 4 deletions cmd/zed/manage/lakemanage/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,8 @@ func scan(ctx context.Context, it *objectIterator, pool *pools.Config, runCh cha

const iteratorQuery = `
from %q@%q:objects
| sort min, id
| left join (
from %q@%q:vectors | sort min, id
) on id=id vector:=true
| left join (from %q@%q:vectors) on id=id vector:=true
| sort min
`

type objectIterator struct {
Expand Down
14 changes: 7 additions & 7 deletions compiler/ast/dag/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ type (
Count int `json:"count"`
}
Join struct {
Kind string `json:"kind" unpack:""`
Style string `json:"style"`
LeftKey Expr `json:"left_key"`
LeftOrder order.Which `json:"left_order"`
RightKey Expr `json:"right_key"`
RightOrder order.Which `json:"right_order"`
Args []Assignment `json:"args"`
Kind string `json:"kind" unpack:""`
Style string `json:"style"`
LeftKey Expr `json:"left_key"`
LeftDir order.Direction `json:"left_dir"`
RightKey Expr `json:"right_key"`
RightDir order.Direction `json:"right_dir"`
Args []Assignment `json:"args"`
}
Load struct {
Kind string `json:"kind" unpack:""`
Expand Down
6 changes: 3 additions & 3 deletions compiler/kernel/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ func (b *Builder) compile(o dag.Op, parents []zbuf.Puller) ([]zbuf.Puller, error
return nil, err
}
leftParent, rightParent := parents[0], parents[1]
leftOrder, rightOrder := o.LeftOrder, o.RightOrder
leftDir, rightDir := o.LeftDir, o.RightDir
var anti, inner bool
switch o.Style {
case "anti":
Expand All @@ -581,11 +581,11 @@ func (b *Builder) compile(o dag.Op, parents []zbuf.Puller) ([]zbuf.Puller, error
case "right":
leftKey, rightKey = rightKey, leftKey
leftParent, rightParent = rightParent, leftParent
leftOrder, rightOrder = rightOrder, leftOrder
leftDir, rightDir = rightDir, leftDir
default:
return nil, fmt.Errorf("unknown kind of join: '%s'", o.Style)
}
join, err := join.New(b.octx, anti, inner, leftParent, rightParent, leftKey, rightKey, leftOrder, rightOrder, lhs, rhs)
join, err := join.New(b.octx, anti, inner, leftParent, rightParent, leftKey, rightKey, leftDir, rightDir, lhs, rhs)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions compiler/optimizer/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,10 +286,10 @@ func (o *Optimizer) propagateSortKeyOp(op dag.Op, parents []order.SortKey) ([]or
return nil, errors.New("internal error: join does not have two parents")
}
if fieldOf(join.LeftKey).Equal(parents[0].Primary()) {
join.LeftOrder = parents[0].Order
join.LeftDir = parents[0].Order.Direction()
}
if fieldOf(join.RightKey).Equal(parents[1].Primary()) {
join.RightOrder = parents[1].Order
join.RightDir = parents[1].Order.Direction()
}
// XXX There is definitely a way to propagate the sort key but there's
// some complexity here. The propagated sort key should be whatever key
Expand Down
2 changes: 2 additions & 0 deletions compiler/semantic/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,9 @@ func (a *analyzer) semOp(o ast.Op, seq dag.Seq) (dag.Seq, error) {
join := &dag.Join{
Kind: "Join",
Style: o.Style,
LeftDir: order.Unknown,
LeftKey: leftKey,
RightDir: order.Unknown,
RightKey: rightKey,
Args: assignments,
}
Expand Down
7 changes: 3 additions & 4 deletions docs/language/operators/join.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,9 @@ The available join types are:

For anti join, the `<right-expr>` is undefined and thus cannot be specified.

> Currently, only exact equi-join is supported and the inputs must be sorted
> in ascending order by their respective keys. Also, the join keys must
> be field expressions. A future version of join will not require sorted inputs
> and will have more flexible join expressions.
> Currently, only exact equi-join is supported and join keys must be field
> expressions. A future version of join will have more flexible join
> expressions.
### Examples

Expand Down
49 changes: 22 additions & 27 deletions docs/tutorials/join.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ sidebar_label: Join

This is a brief primer on Zed's [`join` operator](../language/operators/join.md).

Currently, `join` is limited in the following ways:
* only merge join is implemented, requiring inputs to be explicitly sorted, and
* only equi-join (i.e., a join predicate containing `=`) is supported.
Currently, `join` is limited in that only equi-join (i.e., a join predicate
containing `=`) is supported.

## Example Data

Expand Down Expand Up @@ -46,9 +45,9 @@ explicit `inner` is not strictly necessary, but including it clarifies our inten

The Zed script `inner-join.zed`:
```mdtest-input inner-join.zed
file fruit.ndjson | sort flavor
file fruit.ndjson
| inner join (
file people.ndjson | sort likes
file people.ndjson
) on flavor=likes eater:=name
```

Expand Down Expand Up @@ -81,9 +80,9 @@ original field name `age` is maintained in the results.

The Zed script `left-join.zed`:
```mdtest-input left-join.zed
file fruit.ndjson | sort flavor
file fruit.ndjson
| left join (
file people.ndjson | sort likes
file people.ndjson
) on flavor=likes eater:=name,age
```

Expand Down Expand Up @@ -114,9 +113,9 @@ the `note` field from the right-hand input to appear in the joined results.

The Zed script `right-join.zed`:
```mdtest-input right-join.zed
file fruit.ndjson | sort flavor
file fruit.ndjson
| right join (
file people.ndjson | sort likes
file people.ndjson
) on flavor=likes fruit:=name
```
Executing the Zed script:
Expand Down Expand Up @@ -144,10 +143,6 @@ lake, we would instead specify those pools using the
Here we'll load our input data to pools in a temporary Zed lake, then execute
our inner join using `zed query`.

Notice that because we happened to use `-orderby` to sort our pools by the same
keys that we reference in our `join`, we did not need to use any explicit
upstream `sort`.

The Zed script `inner-join-pools.zed`:

```mdtest-input inner-join-pools.zed
Expand Down Expand Up @@ -192,8 +187,8 @@ in the [Inner Join section](#inner-join).
The Zed script `inner-join-alternate.zed`:
```mdtest-input inner-join-alternate.zed
from (
file fruit.ndjson => sort flavor
file people.ndjson => sort likes
file fruit.ndjson
file people.ndjson
) | inner join on flavor=likes eater:=name
```

Expand Down Expand Up @@ -226,8 +221,8 @@ The Zed script `inner-join-streamed.zed`:

```mdtest-input inner-join-streamed.zed
switch (
case has(color) => sort flavor
case has(age) => sort likes
case has(color) => pass
case has(age) => pass
) | inner join on flavor=likes eater:=name
```

Expand Down Expand Up @@ -271,9 +266,9 @@ they look like, but since it represents redundant data, in practice we'd
typically [`drop`](../language/operators/drop.md) it after the `join` in our Zed pipeline.

```mdtest-input multi-value-join.zed
file fruit.ndjson | put fruitkey:={name,color} | sort fruitkey
file fruit.ndjson | put fruitkey:={name,color}
| inner join (
file inventory.ndjson | put invkey:={name,color} | sort invkey
file inventory.ndjson | put invkey:={name,color}
) on fruitkey=invkey quantity
```

Expand Down Expand Up @@ -308,12 +303,12 @@ previously for our inner join by piping its output to an additional join
against the price list.

```mdtest-input three-way-join.zed
file fruit.ndjson | sort flavor
file fruit.ndjson
| inner join (
file people.ndjson | sort likes
) on flavor=likes eater:=name | sort name
file people.ndjson
) on flavor=likes eater:=name
| inner join (
file prices.ndjson | sort name
file prices.ndjson
) on name=name price:=price
```

Expand Down Expand Up @@ -349,9 +344,9 @@ in the result.
The Zed script `embed-opposite.zed`:

```mdtest-input embed-opposite.zed
file fruit.ndjson | sort flavor
file fruit.ndjson
| inner join (
file people.ndjson | sort likes
file people.ndjson
) on flavor=likes eaterinfo:=this
```

Expand Down Expand Up @@ -379,9 +374,9 @@ left and right inputs. We'll demonstrate this by augmenting `embed-opposite.zed`
to produce `merge-opposite.zed`.

```mdtest-input merge-opposite.zed
file fruit.ndjson | sort flavor
file fruit.ndjson
| inner join (
file people.ndjson | sort likes
file people.ndjson
) on flavor=likes eaterinfo:=this
| rename fruit:=name
| yield {...this,...eaterinfo}
Expand Down
7 changes: 7 additions & 0 deletions order/which.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ func (w Which) String() string {
return "asc"
}

func (w Which) Direction() Direction {
if w == Desc {
return Down
}
return Up
}

func (w Which) MarshalJSON() ([]byte, error) {
return json.Marshal(w.String())
}
Expand Down
27 changes: 25 additions & 2 deletions runtime/op/join/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/brimdata/zed/pkg/field"
"github.com/brimdata/zed/runtime/expr"
"github.com/brimdata/zed/runtime/op"
"github.com/brimdata/zed/runtime/op/sort"
"github.com/brimdata/zed/zbuf"
"github.com/brimdata/zed/zio"
)
Expand All @@ -33,11 +34,33 @@ type Op struct {
types map[int]map[int]*zed.TypeRecord
}

func New(octx *op.Context, anti, inner bool, left, right zbuf.Puller, leftKey, rightKey expr.Evaluator, leftOrder, rightOrder order.Which, lhs field.List, rhs []expr.Evaluator) (*Op, error) {
func New(octx *op.Context, anti, inner bool, left, right zbuf.Puller, leftKey, rightKey expr.Evaluator,
leftDir, rightDir order.Direction, lhs field.List,
rhs []expr.Evaluator) (*Op, error) {
cutter, err := expr.NewCutter(octx.Zctx, lhs, rhs)
if err != nil {
return nil, err
}
var o order.Which
switch {
case leftDir != order.Unknown:
o = leftDir == order.Down
case rightDir != order.Unknown:
o = rightDir == order.Down
}
// Add sorts if needed.
if !leftDir.HasOrder(o) {
left, err = sort.New(octx, left, []expr.Evaluator{leftKey}, o, true)
if err != nil {
return nil, err
}
}
if !rightDir.HasOrder(o) {
right, err = sort.New(octx, right, []expr.Evaluator{rightKey}, o, true)
if err != nil {
return nil, err
}
}
ctx, cancel := context.WithCancel(octx.Context)
return &Op{
octx: octx,
Expand All @@ -49,7 +72,7 @@ func New(octx *op.Context, anti, inner bool, left, right zbuf.Puller, leftKey, r
getRightKey: rightKey,
left: newPuller(left, ctx),
right: zio.NewPeeker(newPuller(right, ctx)),
compare: expr.NewValueCompareFn(leftOrder, true),
compare: expr.NewValueCompareFn(o, true),
cutter: cutter,
types: make(map[int]map[int]*zed.TypeRecord),
}, nil
Expand Down
43 changes: 43 additions & 0 deletions runtime/op/join/ztests/auto-sort.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
script: |
zq -z 'file a.zson | join (file b.zson) on a=b'
echo '// ==='
zq -z 'file a.zson | join (file b.zson | sort -r b) on a=b'
echo '// ==='
zq -z 'file a.zson | join (file b.zson | sort b) on a=b'
echo '// ==='
zq -z 'file a.zson | sort a | join (file b.zson | sort -r b) on a=b'
echo '// ==='
zq -z 'file a.zson | sort -r a | join (file b.zson) on a=b'
echo '// ==='
zq -z 'file a.zson | sort -r a | join (file b.zson | sort b) on a=b'
inputs:
- name: a.zson
data: |
{a:1}
{a:2}
- name: b.zson
data: |
{b:2}
{b:1}
outputs:
- name: stdout
data: |
{a:1}
{a:2}
// ===
{a:2}
{a:1}
// ===
{a:1}
{a:2}
// ===
{a:1}
{a:2}
// ===
{a:2}
{a:1}
// ===
{a:2}
{a:1}
6 changes: 3 additions & 3 deletions runtime/op/join/ztests/expr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@ inputs:
outputs:
- name: stdout
data: |
{a:1(int32),s:"a"}
{a:2(int32),s:"B"}
{a:1(int32),s:"a"}
{a:3(int32),s:"c",b:6(int32)}
===
{a:1(int32),s:"a",b:4(int32)}
{a:2(int32),s:"B"}
{a:1(int32),s:"a",b:4(int32)}
{a:3(int32),s:"c",b:6(int32)}
===
{a:1(int32),s:"a",b:4(int32)}
{a:2(int32),s:"B",b:5(int32)}
{a:3(int32),s:"c",b:6(int32)}
===
{a:1(int32),s:"a"}
{a:2(int32),s:"B"}
{a:1(int32),s:"a"}
{a:3(int32),s:"c"}

0 comments on commit d938fba

Please sign in to comment.