Skip to content

Commit

Permalink
TSV support (#4891)
Browse files Browse the repository at this point in the history
Add tsv for support for both input and output. Also support auto-detection of
tsv input.

Closes #4746
  • Loading branch information
mattnibs authored Nov 29, 2023
1 parent eaf7235 commit b84255c
Show file tree
Hide file tree
Showing 14 changed files with 80 additions and 20 deletions.
5 changes: 5 additions & 0 deletions api/mime.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const (
MediaTypeLine = "application/x-line"
MediaTypeNDJSON = "application/x-ndjson"
MediaTypeParquet = "application/x-parquet"
MediaTypeTSV = "application/tab-separated-values"
MediaTypeVNG = "application/x-vng"
MediaTypeZeek = "application/x-zeek"
MediaTypeZJSON = "application/x-zjson"
Expand Down Expand Up @@ -55,6 +56,8 @@ func MediaTypeToFormat(s string, dflt string) (string, error) {
return "ndjson", nil
case MediaTypeParquet:
return "parquet", nil
case MediaTypeTSV:
return "tsv", nil
case MediaTypeVNG:
return "vng", nil
case MediaTypeZeek:
Expand Down Expand Up @@ -83,6 +86,8 @@ func FormatToMediaType(format string) (string, error) {
return MediaTypeNDJSON, nil
case "parquet":
return MediaTypeParquet, nil
case "tsv":
return MediaTypeTSV, nil
case "vng":
return MediaTypeVNG, nil
case "zeek":
Expand Down
2 changes: 1 addition & 1 deletion cli/inputflags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (f *Flags) Options() anyio.ReaderOpts {
}

func (f *Flags) SetFlags(fs *flag.FlagSet, validate bool) {
fs.StringVar(&f.Format, "i", "auto", "format of input data [auto,arrows,csv,json,line,parquet,vng,zeek,zjson,zng,zson]")
fs.StringVar(&f.Format, "i", "auto", "format of input data [auto,arrows,csv,json,line,parquet,tsv,vng,zeek,zjson,zng,zson]")
f.CSV.Delim = ','
fs.Func("csv.delim", `CSV field delimiter (default ",")`, func(s string) error {
if len(s) != 1 {
Expand Down
2 changes: 1 addition & 1 deletion cli/outputflags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (f *Flags) SetFormatFlags(fs *flag.FlagSet) {
if f.DefaultFormat == "" {
f.DefaultFormat = "zng"
}
fs.StringVar(&f.Format, "f", f.DefaultFormat, "format for output data [arrows,csv,json,lake,parquet,table,text,vng,zeek,zjson,zng,zson]")
fs.StringVar(&f.Format, "f", f.DefaultFormat, "format for output data [arrows,csv,json,lake,parquet,table,text,tsv,vng,zeek,zjson,zng,zson]")
fs.BoolVar(&f.jsonShortcut, "j", false, "use line-oriented JSON output independent of -f option")
fs.BoolVar(&f.zsonShortcut, "z", false, "use line-oriented ZSON output independent of -f option")
fs.BoolVar(&f.zsonPretty, "Z", false, "use formatted ZSON output independent of -f option")
Expand Down
1 change: 1 addition & 0 deletions docs/commands/zq.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ Note here that the query `1+1` [implies](../language/dataflow-model.md#implied-o
| `csv` | yes | [CSV RFC 4180](https://www.rfc-editor.org/rfc/rfc4180.html) |
| `line` | no | One string value per input line |
| `parquet` | yes | [Apache Parquet](https://github.com/apache/parquet-format) |
| `tsv` | yes | [TSV - Tab-Separated Values](https://en.wikipedia.org/wiki/Tab-separated_values) |
| `vng` | yes | [VNG - Binary Columnar Format](../formats/vng.md) |
| `zson` | yes | [ZSON - Human-readable Format](../formats/zson.md) |
| `zng` | yes | [ZNG - Binary Row Format](../formats/zson.md) |
Expand Down
1 change: 1 addition & 0 deletions docs/lake/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,7 @@ The following table shows the supported MIME types and where they can be used.
| Line | yes | no | `application/x-line` |
| NDJSON | no | yes | `application/x-ndjson` |
| Parquet | yes | yes | `application/x-parquet` |
| TSV | yes | yes | `text/tab-separated-values` |
| VNG | yes | yes | `application/x-vng` |
| Zeek | yes | yes | `application/x-zeek` |
| ZJSON | yes | yes | `application/x-zjson` |
Expand Down
2 changes: 1 addition & 1 deletion service/ztests/curl-load-error.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ inputs:
outputs:
- name: stdout
data: |
{"type":"Error","kind":"invalid operation","error":"format detection error\n\tarrows: schema message length exceeds 1 MiB\n\tcsv: line 1: EOF\n\tjson: invalid character 'T' looking for beginning of value\n\tline: auto-detection not supported\n\tparquet: auto-detection requires seekable input\n\tvng: auto-detection requires seekable input\n\tzeek: line 1: bad types/fields definition in zeek header\n\tzjson: line 1: malformed ZJSON: bad type object: \"This is not a detectable format.\": unpacker error parsing JSON: invalid character 'T' looking for beginning of value\n\tzng: malformed zng record\n\tzson: ZSON syntax error"}
{"type":"Error","kind":"invalid operation","error":"format detection error\n\tarrows: schema message length exceeds 1 MiB\n\tcsv: line 1: EOF\n\tjson: invalid character 'T' looking for beginning of value\n\tline: auto-detection not supported\n\tparquet: auto-detection requires seekable input\n\ttsv: line 1: EOF\n\tvng: auto-detection requires seekable input\n\tzeek: line 1: bad types/fields definition in zeek header\n\tzjson: line 1: malformed ZJSON: bad type object: \"This is not a detectable format.\": unpacker error parsing JSON: invalid character 'T' looking for beginning of value\n\tzng: malformed zng record\n\tzson: ZSON syntax error"}
code 400
{"type":"Error","kind":"invalid operation","error":"unsupported MIME type: unsupported"}
code 400
17 changes: 17 additions & 0 deletions service/ztests/curl-tsv.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
script: |
source service.sh
zed create -q -orderby a test
curl -H Content-Type:application/tab-separated-values --data-binary @in.tsv \
--fail $ZED_LAKE/pool/test/branch/main > /dev/null
curl -H Accept:application/tab-separated-values -d '{"query":"from test"}' $ZED_LAKE/query
inputs:
- name: in.tsv
data: &in_tsv |
a b
1 2
- name: service.sh

outputs:
- name: stdout
data: *in_tsv
3 changes: 2 additions & 1 deletion service/ztests/load-garbage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ outputs:
data: |
stdio:stdin: format detection error
arrows: schema message length exceeds 1 MiB
csv: line 1: no comma found
csv: line 1: delimiter ',' not found
json: invalid character 'T' looking for beginning of value
line: auto-detection not supported
parquet: auto-detection requires seekable input
tsv: line 1: delimiter '\t' not found
vng: auto-detection requires seekable input
zeek: line 1: bad types/fields definition in zeek header
zjson: line 1: malformed ZJSON: bad type object: "This file contains no records.": unpacker error parsing JSON: invalid character 'T' looking for beginning of value
Expand Down
3 changes: 3 additions & 0 deletions zio/anyio/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ func lookupReader(zctx *zed.Context, r io.Reader, demandOut demand.Demand, opts
return nil, err
}
return zio.NopReadCloser(zr), nil
case "tsv":
opts.CSV.Delim = '\t'
return zio.NopReadCloser(csvio.NewReader(zctx, r, opts.CSV)), nil
case "vng":
zr, err := vngio.NewReader(zctx, r, demandOut)
if err != nil {
Expand Down
31 changes: 20 additions & 11 deletions zio/anyio/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,17 +117,15 @@ func NewReaderWithOpts(zctx *zed.Context, r io.Reader, demandOut demand.Demand,
}
track.Reset()

var csvErr error
if s, err := bufio.NewReader(track).ReadString('\n'); err != nil {
csvErr = fmt.Errorf("csv: line 1: %w", err)
} else if !strings.Contains(s, ",") {
csvErr = errors.New("csv: line 1: no comma found")
} else {
track.Reset()
csvErr = match(csvio.NewReader(zed.NewContext(), track, opts.CSV), "csv", 1)
if csvErr == nil {
return zio.NopReadCloser(csvio.NewReader(zctx, track.Reader(), opts.CSV)), nil
}
csvErr := isCSVStream(track, ',', "csv")
if csvErr == nil {
return zio.NopReadCloser(csvio.NewReader(zctx, track.Reader(), csvio.ReaderOpts{Delim: ','})), nil
}
track.Reset()

tsvErr := isCSVStream(track, '\t', "tsv")
if tsvErr == nil {
return zio.NopReadCloser(csvio.NewReader(zctx, track.Reader(), csvio.ReaderOpts{Delim: '\t'})), nil
}
track.Reset()

Expand All @@ -138,6 +136,7 @@ func NewReaderWithOpts(zctx *zed.Context, r io.Reader, demandOut demand.Demand,
jsonErr,
lineErr,
parquetErr,
tsvErr,
vngErr,
zeekErr,
zjsonErr,
Expand Down Expand Up @@ -176,6 +175,16 @@ func isArrowStream(track *Track) error {
return err
}

func isCSVStream(track *Track, delim rune, name string) error {
if s, err := bufio.NewReader(track).ReadString('\n'); err != nil {
return fmt.Errorf("%s: line 1: %w", name, err)
} else if !strings.Contains(s, string(delim)) {
return fmt.Errorf("%s: line 1: delimiter %q not found", name, delim)
}
track.Reset()
return match(csvio.NewReader(zed.NewContext(), track, csvio.ReaderOpts{Delim: delim}), name, 1)
}

func joinErrs(errs []error) error {
s := "format detection error"
for _, e := range errs {
Expand Down
6 changes: 5 additions & 1 deletion zio/anyio/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
type WriterOpts struct {
Format string
Lake lakeio.WriterOpts
CSV csvio.WriterOpts
VNG *vngio.WriterOpts // Nil means use defaults via vngio.NewWriter.
ZNG *zngio.WriterOpts // Nil means use defaults via zngio.NewWriter.
ZSON zsonio.WriterOpts
Expand All @@ -33,7 +34,7 @@ func NewWriter(w io.WriteCloser, opts WriterOpts) (zio.WriteCloser, error) {
case "arrows":
return arrowio.NewWriter(w), nil
case "csv":
return csvio.NewWriter(w), nil
return csvio.NewWriter(w, opts.CSV), nil
case "json":
return jsonio.NewWriter(w), nil
case "lake":
Expand All @@ -46,6 +47,9 @@ func NewWriter(w io.WriteCloser, opts WriterOpts) (zio.WriteCloser, error) {
return tableio.NewWriter(w), nil
case "text":
return textio.NewWriter(w), nil
case "tsv":
opts.CSV.Delim = '\t'
return csvio.NewWriter(w, opts.CSV), nil
case "vng":
if opts.VNG == nil {
return vngio.NewWriter(w)
Expand Down
3 changes: 2 additions & 1 deletion zio/anyio/ztests/huge.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ outputs:
data: |
stdio:stdin: format detection error
arrows: schema message length exceeds 1 MiB
csv: line 1: no comma found
csv: line 1: delimiter ',' not found
json: buffer exceeded max size trying to infer input format
line: auto-detection not supported
parquet: auto-detection requires seekable input
tsv: line 1: delimiter '\t' not found
vng: auto-detection requires seekable input
zeek: line 1: bad types/fields definition in zeek header
zjson: line 1: malformed ZJSON: bad type object: "": unpacker error parsing JSON: unexpected end of JSON input
Expand Down
14 changes: 14 additions & 0 deletions zio/anyio/ztests/tsv.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
script: |
zq - | zq -f tsv -
inputs:
- name: stdin
data: &stdin |
# Year Winery
1 2019 Brutocao
1 2021 Cabana
1 2020 Castoro

outputs:
- name: stdout
data: *stdin
10 changes: 7 additions & 3 deletions zio/csvio/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,17 @@ type Writer struct {
}

type WriterOpts struct {
UTF8 bool
Delim rune
}

func NewWriter(w io.WriteCloser) *Writer {
func NewWriter(w io.WriteCloser, opts WriterOpts) *Writer {
encoder := csv.NewWriter(w)
if opts.Delim != 0 {
encoder.Comma = opts.Delim
}
return &Writer{
writer: w,
encoder: csv.NewWriter(w),
encoder: encoder,
flattener: expr.NewFlattener(zed.NewContext()),
types: make(map[int]struct{}),
}
Expand Down

0 comments on commit b84255c

Please sign in to comment.