diff --git a/archive/archive_test.go b/archive/archive_test.go index 816514daa9..5e9ae3a0f7 100644 --- a/archive/archive_test.go +++ b/archive/archive_test.go @@ -57,7 +57,7 @@ func indexQuery(t *testing.T, ark *Archive, query IndexQuery, opts ...FindOption defer rc.Close() var buf bytes.Buffer - w := zbuf.NopFlusher(tzngio.NewWriter(&buf)) + w := tzngio.NewWriter(&buf) require.NoError(t, zbuf.Copy(w, rc)) return buf.String() diff --git a/cmd/zapi/cmd/get/get.go b/cmd/zapi/cmd/get/get.go index e58a17aa8f..0c8696fcb2 100644 --- a/cmd/zapi/cmd/get/get.go +++ b/cmd/zapi/cmd/get/get.go @@ -104,7 +104,7 @@ func (c *Command) Run(args []string) error { } stream := api.NewZngSearch(r) stream.SetOnCtrl(c.handleControl) - if err := zbuf.Copy(zbuf.NopFlusher(writer), stream); err != nil { + if err := zbuf.Copy(writer, stream); err != nil { writer.Close() if c.Context().Err() != nil { return errors.New("search aborted") diff --git a/emitter/dir_test.go b/emitter/dir_test.go index 8431c5bb94..c733eaf686 100644 --- a/emitter/dir_test.go +++ b/emitter/dir_test.go @@ -38,8 +38,7 @@ func TestDirS3Source(t *testing.T) { require.NoError(t, err) w, err := NewDirWithSource(uri, "", os.Stderr, &zio.WriterFlags{Format: "tzng"}, src) require.NoError(t, err) - err = zbuf.Copy(zbuf.NopFlusher(w), r) - require.NoError(t, err) + require.NoError(t, zbuf.Copy(w, r)) } type nopCloser struct{ *bytes.Buffer } diff --git a/zbuf/counter_test.go b/zbuf/counter_test.go index 2ee1be1d28..f088b4b219 100644 --- a/zbuf/counter_test.go +++ b/zbuf/counter_test.go @@ -16,10 +16,6 @@ func (n *Sink) Write(rec *zng.Record) error { return nil } -func (n *Sink) Flush() error { - return nil -} - func TestCounter(t *testing.T) { var count int64 var wg sync.WaitGroup diff --git a/zbuf/zng.go b/zbuf/zng.go index 73aaf931ad..80fc1c6db4 100644 --- a/zbuf/zng.go +++ b/zbuf/zng.go @@ -68,33 +68,23 @@ func NewReadCloser(r Reader, c io.Closer) ReadCloser { return extReadCloser{r, c} } -func CopyWithContext(ctx context.Context, dst WriteFlusher, src Reader) error { - var err error - for ctx.Err() == nil { - var rec *zng.Record - rec, err = src.Read() +func CopyWithContext(ctx context.Context, dst Writer, src Reader) error { + for { + if err := ctx.Err(); err != nil { + return err + } + rec, err := src.Read() if err != nil || rec == nil { - break + return err } - err = dst.Write(rec) - if err != nil { - break + if err := dst.Write(rec); err != nil { + return err } } - dstErr := dst.Flush() - switch { - case err != nil: - return err - case dstErr != nil: - return dstErr - default: - return ctx.Err() - } } -// Copy copies src to dst a la io.Copy. The src reader is read from -// while the dst writer is written to and closed. -func Copy(dst WriteFlusher, src Reader) error { +// Copy copies src to dst a la io.Copy. +func Copy(dst Writer, src Reader) error { return CopyWithContext(context.Background(), dst, src) } diff --git a/zdx/zdx.go b/zdx/zdx.go index 8257e44266..3e5209bb9e 100644 --- a/zdx/zdx.go +++ b/zdx/zdx.go @@ -19,9 +19,9 @@ // below in the hiearchy where the key is the first key found in that stream and // the value is the offset or the stream in the file below. // -// zdx.Reader implements zbuf.Reader and zdx.Writer implements zbuf.Writer and -// zbuf.WriteFlusher so generic zng functionality applies, e.g., a Reader can be -// copied to a Writer using zbuf.Copy(). +// zdx.Reader implements zbuf.Reader and zdx.Writer implements zbuf.Writer so +// generic zng functionality applies, e.g., a Reader can be copied to a Writer +// using zbuf.Copy(). package zdx import ( diff --git a/zio/ndjsonio/ndjson_test.go b/zio/ndjsonio/ndjson_test.go index 0941aa1734..6e4a74b3a8 100644 --- a/zio/ndjsonio/ndjson_test.go +++ b/zio/ndjsonio/ndjson_test.go @@ -53,7 +53,7 @@ func TestNDJSONWriter(t *testing.T) { var out bytes.Buffer w := NewWriter(&out) r := tzngio.NewReader(strings.NewReader(c.input), resolver.NewContext()) - require.NoError(t, zbuf.Copy(zbuf.NopFlusher(w), r)) + require.NoError(t, zbuf.Copy(w, r)) NDJSONEq(t, c.output, out.String()) }) } @@ -134,7 +134,7 @@ func runtestcase(t *testing.T, input, output string) { w := NewWriter(&out) r, err := NewReader(strings.NewReader(input), resolver.NewContext(), nil, "", "") require.NoError(t, err) - require.NoError(t, zbuf.Copy(zbuf.NopFlusher(w), r)) + require.NoError(t, zbuf.Copy(w, r)) NDJSONEq(t, output, out.String()) } @@ -356,7 +356,7 @@ func TestNDJSONTypeErrors(t *testing.T) { err = r.configureTypes(typeConfig, c.defaultPath) require.NoError(t, err) - err = zbuf.Copy(zbuf.NopFlusher(w), r) + err = zbuf.Copy(w, r) if c.success { require.NoError(t, err) } else { diff --git a/zio/tzngio/reader_test.go b/zio/tzngio/reader_test.go index 12bc9b0068..8645b16358 100644 --- a/zio/tzngio/reader_test.go +++ b/zio/tzngio/reader_test.go @@ -116,7 +116,7 @@ func boomerangErr(t *testing.T, name, logs, errorMsg string, errorArgs ...interf t.Run(name, func(t *testing.T) { in := []byte(strings.TrimSpace(logs) + "\n") zngSrc := tzngio.NewReader(bytes.NewReader(in), resolver.NewContext()) - zngDst := zbuf.NopFlusher(tzngio.NewWriter(&output{})) + zngDst := tzngio.NewWriter(&output{}) err := zbuf.Copy(zngDst, zngSrc) assert.Errorf(t, err, errorMsg, errorArgs...) }) @@ -128,7 +128,7 @@ func boomerang(t *testing.T, name, logs string) { var out output in := []byte(strings.TrimSpace(logs) + "\n") zngSrc := tzngio.NewReader(bytes.NewReader(in), resolver.NewContext()) - zngDst := zbuf.NopFlusher(tzngio.NewWriter(&out)) + zngDst := tzngio.NewWriter(&out) err := zbuf.Copy(zngDst, zngSrc) require.NoError(t, err) assert.Equal(t, string(in), out.String()) diff --git a/zio/zng_test.go b/zio/zng_test.go index fcce635201..eae5ca3b70 100644 --- a/zio/zng_test.go +++ b/zio/zng_test.go @@ -28,7 +28,7 @@ func (o *Output) Close() error { func identity(t *testing.T, logs string) { var out Output - dst := zbuf.NopFlusher(tzngio.NewWriter(&out)) + dst := tzngio.NewWriter(&out) in := []byte(strings.TrimSpace(logs) + "\n") src := tzngio.NewReader(bytes.NewReader(in), resolver.NewContext()) err := zbuf.Copy(dst, src) @@ -43,13 +43,13 @@ func boomerang(t *testing.T, logs string, compress bool) { tzngSrc := tzngio.NewReader(bytes.NewReader(in), resolver.NewContext()) var rawzng Output rawDst := zngio.NewWriter(&rawzng, zio.WriterFlags{ZngLZ4BlockSize: zio.DefaultZngLZ4BlockSize}) - err := zbuf.Copy(rawDst, tzngSrc) - require.NoError(t, err) + require.NoError(t, zbuf.Copy(rawDst, tzngSrc)) + require.NoError(t, rawDst.Flush()) var out Output rawSrc := zngio.NewReader(bytes.NewReader(rawzng.Bytes()), resolver.NewContext()) - tzngDst := zbuf.NopFlusher(tzngio.NewWriter(&out)) - err = zbuf.Copy(tzngDst, rawSrc) + tzngDst := tzngio.NewWriter(&out) + err := zbuf.Copy(tzngDst, rawSrc) if assert.NoError(t, err) { assert.Equal(t, in, out.Bytes()) } @@ -58,13 +58,13 @@ func boomerang(t *testing.T, logs string, compress bool) { func boomerangZJSON(t *testing.T, logs string) { tzngSrc := tzngio.NewReader(strings.NewReader(logs), resolver.NewContext()) var zjsonOutput Output - zjsonDst := zbuf.NopFlusher(zjsonio.NewWriter(&zjsonOutput)) + zjsonDst := zjsonio.NewWriter(&zjsonOutput) err := zbuf.Copy(zjsonDst, tzngSrc) require.NoError(t, err) var out Output zjsonSrc := zjsonio.NewReader(bytes.NewReader(zjsonOutput.Bytes()), resolver.NewContext()) - tzngDst := zbuf.NopFlusher(tzngio.NewWriter(&out)) + tzngDst := tzngio.NewWriter(&out) err = zbuf.Copy(tzngDst, zjsonSrc) if assert.NoError(t, err) { assert.Equal(t, strings.TrimSpace(logs), strings.TrimSpace(out.String())) diff --git a/zqd/handlers_test.go b/zqd/handlers_test.go index 914e634cc7..57f1d974df 100644 --- a/zqd/handlers_test.go +++ b/zqd/handlers_test.go @@ -81,7 +81,7 @@ func TestSearchNoCtrl(t *testing.T) { msgs = append(msgs, i) }) buf := bytes.NewBuffer(nil) - w := zbuf.NopFlusher(tzngio.NewWriter(buf)) + w := tzngio.NewWriter(buf) require.NoError(t, zbuf.Copy(w, r)) require.Equal(t, test.Trim(src), buf.String()) require.Equal(t, 0, len(msgs)) @@ -1032,7 +1032,7 @@ func archiveStat(t *testing.T, client *api.Connection, space api.SpaceID) string r, err := client.ArchiveStat(context.Background(), space, nil) require.NoError(t, err) buf := bytes.NewBuffer(nil) - w := zbuf.NopFlusher(tzngio.NewWriter(buf)) + w := tzngio.NewWriter(buf) require.NoError(t, zbuf.Copy(w, r)) return buf.String() } @@ -1045,7 +1045,7 @@ func indexSearch(t *testing.T, client *api.Connection, space api.SpaceID, indexN r, err := client.IndexSearch(context.Background(), space, req, nil) require.NoError(t, err) buf := bytes.NewBuffer(nil) - w := zbuf.NopFlusher(tzngio.NewWriter(buf)) + w := tzngio.NewWriter(buf) var msgs []interface{} r.SetOnCtrl(func(i interface{}) { msgs = append(msgs, i) @@ -1071,7 +1071,7 @@ func search(t *testing.T, client *api.Connection, space api.SpaceID, prog string r, err := client.Search(context.Background(), req, nil) require.NoError(t, err) buf := bytes.NewBuffer(nil) - w := zbuf.NopFlusher(tzngio.NewWriter(buf)) + w := tzngio.NewWriter(buf) var msgs []interface{} r.SetOnCtrl(func(i interface{}) { msgs = append(msgs, i)