Skip to content

Commit

Permalink
Remove Flush call from zbuf.CopyWithContext (#1066)
Browse files Browse the repository at this point in the history
It's odd that zbuf.CopyWithContext calls dst.Flush.  (Its inspiration,
io.Copy, does not.)  Remove the call and adjust zbuf.Copy and
zbuf.CopyWithContext to take a zbuf.Writer instead of a
zbuf.WriteFlusher.
  • Loading branch information
nwt authored Aug 6, 2020
1 parent a747329 commit 7f68479
Show file tree
Hide file tree
Showing 10 changed files with 33 additions and 48 deletions.
2 changes: 1 addition & 1 deletion archive/archive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func indexQuery(t *testing.T, ark *Archive, query IndexQuery, opts ...FindOption
defer rc.Close()

var buf bytes.Buffer
w := zbuf.NopFlusher(tzngio.NewWriter(&buf))
w := tzngio.NewWriter(&buf)
require.NoError(t, zbuf.Copy(w, rc))

return buf.String()
Expand Down
2 changes: 1 addition & 1 deletion cmd/zapi/cmd/get/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (c *Command) Run(args []string) error {
}
stream := api.NewZngSearch(r)
stream.SetOnCtrl(c.handleControl)
if err := zbuf.Copy(zbuf.NopFlusher(writer), stream); err != nil {
if err := zbuf.Copy(writer, stream); err != nil {
writer.Close()
if c.Context().Err() != nil {
return errors.New("search aborted")
Expand Down
3 changes: 1 addition & 2 deletions emitter/dir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ func TestDirS3Source(t *testing.T) {
require.NoError(t, err)
w, err := NewDirWithSource(uri, "", os.Stderr, &zio.WriterFlags{Format: "tzng"}, src)
require.NoError(t, err)
err = zbuf.Copy(zbuf.NopFlusher(w), r)
require.NoError(t, err)
require.NoError(t, zbuf.Copy(w, r))
}

type nopCloser struct{ *bytes.Buffer }
Expand Down
4 changes: 0 additions & 4 deletions zbuf/counter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ func (n *Sink) Write(rec *zng.Record) error {
return nil
}

func (n *Sink) Flush() error {
return nil
}

func TestCounter(t *testing.T) {
var count int64
var wg sync.WaitGroup
Expand Down
32 changes: 11 additions & 21 deletions zbuf/zng.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,33 +68,23 @@ func NewReadCloser(r Reader, c io.Closer) ReadCloser {
return extReadCloser{r, c}
}

func CopyWithContext(ctx context.Context, dst WriteFlusher, src Reader) error {
var err error
for ctx.Err() == nil {
var rec *zng.Record
rec, err = src.Read()
func CopyWithContext(ctx context.Context, dst Writer, src Reader) error {
for {
if err := ctx.Err(); err != nil {
return err
}
rec, err := src.Read()
if err != nil || rec == nil {
break
return err
}
err = dst.Write(rec)
if err != nil {
break
if err := dst.Write(rec); err != nil {
return err
}
}
dstErr := dst.Flush()
switch {
case err != nil:
return err
case dstErr != nil:
return dstErr
default:
return ctx.Err()
}
}

// Copy copies src to dst a la io.Copy. The src reader is read from
// while the dst writer is written to and closed.
func Copy(dst WriteFlusher, src Reader) error {
// Copy copies src to dst a la io.Copy.
func Copy(dst Writer, src Reader) error {
return CopyWithContext(context.Background(), dst, src)
}

Expand Down
6 changes: 3 additions & 3 deletions zdx/zdx.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
// below in the hiearchy where the key is the first key found in that stream and
// the value is the offset or the stream in the file below.
//
// zdx.Reader implements zbuf.Reader and zdx.Writer implements zbuf.Writer and
// zbuf.WriteFlusher so generic zng functionality applies, e.g., a Reader can be
// copied to a Writer using zbuf.Copy().
// zdx.Reader implements zbuf.Reader and zdx.Writer implements zbuf.Writer so
// generic zng functionality applies, e.g., a Reader can be copied to a Writer
// using zbuf.Copy().
package zdx

import (
Expand Down
6 changes: 3 additions & 3 deletions zio/ndjsonio/ndjson_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestNDJSONWriter(t *testing.T) {
var out bytes.Buffer
w := NewWriter(&out)
r := tzngio.NewReader(strings.NewReader(c.input), resolver.NewContext())
require.NoError(t, zbuf.Copy(zbuf.NopFlusher(w), r))
require.NoError(t, zbuf.Copy(w, r))
NDJSONEq(t, c.output, out.String())
})
}
Expand Down Expand Up @@ -134,7 +134,7 @@ func runtestcase(t *testing.T, input, output string) {
w := NewWriter(&out)
r, err := NewReader(strings.NewReader(input), resolver.NewContext(), nil, "", "")
require.NoError(t, err)
require.NoError(t, zbuf.Copy(zbuf.NopFlusher(w), r))
require.NoError(t, zbuf.Copy(w, r))
NDJSONEq(t, output, out.String())
}

Expand Down Expand Up @@ -356,7 +356,7 @@ func TestNDJSONTypeErrors(t *testing.T) {
err = r.configureTypes(typeConfig, c.defaultPath)
require.NoError(t, err)

err = zbuf.Copy(zbuf.NopFlusher(w), r)
err = zbuf.Copy(w, r)
if c.success {
require.NoError(t, err)
} else {
Expand Down
4 changes: 2 additions & 2 deletions zio/tzngio/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func boomerangErr(t *testing.T, name, logs, errorMsg string, errorArgs ...interf
t.Run(name, func(t *testing.T) {
in := []byte(strings.TrimSpace(logs) + "\n")
zngSrc := tzngio.NewReader(bytes.NewReader(in), resolver.NewContext())
zngDst := zbuf.NopFlusher(tzngio.NewWriter(&output{}))
zngDst := tzngio.NewWriter(&output{})
err := zbuf.Copy(zngDst, zngSrc)
assert.Errorf(t, err, errorMsg, errorArgs...)
})
Expand All @@ -128,7 +128,7 @@ func boomerang(t *testing.T, name, logs string) {
var out output
in := []byte(strings.TrimSpace(logs) + "\n")
zngSrc := tzngio.NewReader(bytes.NewReader(in), resolver.NewContext())
zngDst := zbuf.NopFlusher(tzngio.NewWriter(&out))
zngDst := tzngio.NewWriter(&out)
err := zbuf.Copy(zngDst, zngSrc)
require.NoError(t, err)
assert.Equal(t, string(in), out.String())
Expand Down
14 changes: 7 additions & 7 deletions zio/zng_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (o *Output) Close() error {

func identity(t *testing.T, logs string) {
var out Output
dst := zbuf.NopFlusher(tzngio.NewWriter(&out))
dst := tzngio.NewWriter(&out)
in := []byte(strings.TrimSpace(logs) + "\n")
src := tzngio.NewReader(bytes.NewReader(in), resolver.NewContext())
err := zbuf.Copy(dst, src)
Expand All @@ -43,13 +43,13 @@ func boomerang(t *testing.T, logs string, compress bool) {
tzngSrc := tzngio.NewReader(bytes.NewReader(in), resolver.NewContext())
var rawzng Output
rawDst := zngio.NewWriter(&rawzng, zio.WriterFlags{ZngLZ4BlockSize: zio.DefaultZngLZ4BlockSize})
err := zbuf.Copy(rawDst, tzngSrc)
require.NoError(t, err)
require.NoError(t, zbuf.Copy(rawDst, tzngSrc))
require.NoError(t, rawDst.Flush())

var out Output
rawSrc := zngio.NewReader(bytes.NewReader(rawzng.Bytes()), resolver.NewContext())
tzngDst := zbuf.NopFlusher(tzngio.NewWriter(&out))
err = zbuf.Copy(tzngDst, rawSrc)
tzngDst := tzngio.NewWriter(&out)
err := zbuf.Copy(tzngDst, rawSrc)
if assert.NoError(t, err) {
assert.Equal(t, in, out.Bytes())
}
Expand All @@ -58,13 +58,13 @@ func boomerang(t *testing.T, logs string, compress bool) {
func boomerangZJSON(t *testing.T, logs string) {
tzngSrc := tzngio.NewReader(strings.NewReader(logs), resolver.NewContext())
var zjsonOutput Output
zjsonDst := zbuf.NopFlusher(zjsonio.NewWriter(&zjsonOutput))
zjsonDst := zjsonio.NewWriter(&zjsonOutput)
err := zbuf.Copy(zjsonDst, tzngSrc)
require.NoError(t, err)

var out Output
zjsonSrc := zjsonio.NewReader(bytes.NewReader(zjsonOutput.Bytes()), resolver.NewContext())
tzngDst := zbuf.NopFlusher(tzngio.NewWriter(&out))
tzngDst := tzngio.NewWriter(&out)
err = zbuf.Copy(tzngDst, zjsonSrc)
if assert.NoError(t, err) {
assert.Equal(t, strings.TrimSpace(logs), strings.TrimSpace(out.String()))
Expand Down
8 changes: 4 additions & 4 deletions zqd/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestSearchNoCtrl(t *testing.T) {
msgs = append(msgs, i)
})
buf := bytes.NewBuffer(nil)
w := zbuf.NopFlusher(tzngio.NewWriter(buf))
w := tzngio.NewWriter(buf)
require.NoError(t, zbuf.Copy(w, r))
require.Equal(t, test.Trim(src), buf.String())
require.Equal(t, 0, len(msgs))
Expand Down Expand Up @@ -1032,7 +1032,7 @@ func archiveStat(t *testing.T, client *api.Connection, space api.SpaceID) string
r, err := client.ArchiveStat(context.Background(), space, nil)
require.NoError(t, err)
buf := bytes.NewBuffer(nil)
w := zbuf.NopFlusher(tzngio.NewWriter(buf))
w := tzngio.NewWriter(buf)
require.NoError(t, zbuf.Copy(w, r))
return buf.String()
}
Expand All @@ -1045,7 +1045,7 @@ func indexSearch(t *testing.T, client *api.Connection, space api.SpaceID, indexN
r, err := client.IndexSearch(context.Background(), space, req, nil)
require.NoError(t, err)
buf := bytes.NewBuffer(nil)
w := zbuf.NopFlusher(tzngio.NewWriter(buf))
w := tzngio.NewWriter(buf)
var msgs []interface{}
r.SetOnCtrl(func(i interface{}) {
msgs = append(msgs, i)
Expand All @@ -1071,7 +1071,7 @@ func search(t *testing.T, client *api.Connection, space api.SpaceID, prog string
r, err := client.Search(context.Background(), req, nil)
require.NoError(t, err)
buf := bytes.NewBuffer(nil)
w := zbuf.NopFlusher(tzngio.NewWriter(buf))
w := tzngio.NewWriter(buf)
var msgs []interface{}
r.SetOnCtrl(func(i interface{}) {
msgs = append(msgs, i)
Expand Down

0 comments on commit 7f68479

Please sign in to comment.