Skip to content

Commit

Permalink
fixing flow tests pt.1
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored and iskakaushik committed Oct 18, 2023
1 parent f7742b8 commit 8bd54ff
Showing 1 changed file with 16 additions and 6 deletions.
22 changes: 16 additions & 6 deletions flow/connectors/utils/avro/avro_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type PeerDBOCFWriter struct {
stream *model.QRecordStream
avroSchema *model.QRecordAvroSchemaDefinition
compress bool
writer io.WriteCloser
}

func NewPeerDBOCFWriter(
Expand Down Expand Up @@ -52,27 +53,33 @@ func NewPeerDBOCFWriterWithCompression(
}
}

func (p *PeerDBOCFWriter) getCompressedWriter(w io.Writer) (io.WriteCloser, error) {
func (p *PeerDBOCFWriter) initWriteCloser(w io.Writer) error {
var err error
if p.compress {
return zstd.NewWriter(w)
p.writer, err = zstd.NewWriter(w)
if err != nil {
return fmt.Errorf("error while initializing zstd encoding writer: %w", err)
}
} else {
p.writer = &nopWriteCloser{w}
}
return &nopWriteCloser{w}, nil
return nil
}

func (p *PeerDBOCFWriter) createOCFWriter(w io.Writer) (*goavro.OCFWriter, error) {
compressedWriter, err := p.getCompressedWriter(w)
err := p.initWriteCloser(w)
if err != nil {
return nil, fmt.Errorf("failed to create compressed writer: %w", err)
}
defer compressedWriter.Close()

ocfWriter, err := goavro.NewOCFWriter(goavro.OCFConfig{
W: compressedWriter,
W: p.writer,
Schema: p.avroSchema.Schema,
})
if err != nil {
return nil, fmt.Errorf("failed to create OCF writer: %w", err)
}

return ocfWriter, nil
}

Expand Down Expand Up @@ -136,6 +143,9 @@ func (p *PeerDBOCFWriter) WriteOCF(w io.Writer) (int, error) {
if err != nil {
return 0, fmt.Errorf("failed to create OCF writer: %w", err)
}
// we have to keep a reference to the underlying writer as goavro doesn't provide any access to it
defer p.writer.Close()

numRows, err := p.writeRecordsToOCFWriter(ocfWriter)
if err != nil {
return 0, fmt.Errorf("failed to write records to OCF writer: %w", err)
Expand Down

0 comments on commit 8bd54ff

Please sign in to comment.