From 7b951a465ec328fd792b6c95b891ab14041eb715 Mon Sep 17 00:00:00 2001 From: Yejun Yang Date: Sat, 23 May 2015 20:15:25 +0000 Subject: [PATCH] fix delimited file memory leak --- .../DelimitedTextFileReaderWriterFactory.java | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/pinterest/secor/io/impl/DelimitedTextFileReaderWriterFactory.java b/src/main/java/com/pinterest/secor/io/impl/DelimitedTextFileReaderWriterFactory.java index f0d1738c3..879c5d267 100644 --- a/src/main/java/com/pinterest/secor/io/impl/DelimitedTextFileReaderWriterFactory.java +++ b/src/main/java/com/pinterest/secor/io/impl/DelimitedTextFileReaderWriterFactory.java @@ -29,6 +29,9 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.CodecPool; +import org.apache.hadoop.io.compress.Decompressor; import com.google.common.io.CountingOutputStream; import com.pinterest.secor.common.LogFilePath; @@ -57,6 +60,7 @@ public FileWriter BuildFileWriter(LogFilePath logFilePath, CompressionCodec code protected class DelimitedTextFileReader implements FileReader { private final BufferedInputStream mReader; private long mOffset; + private Decompressor mDecompressor; public DelimitedTextFileReader(LogFilePath path, CompressionCodec codec) throws IOException { Path fsPath = new Path(path.getLogFilePath()); @@ -64,7 +68,8 @@ public DelimitedTextFileReader(LogFilePath path, CompressionCodec codec) throws InputStream inputStream = fs.open(fsPath); this.mReader = (codec == null) ? new BufferedInputStream(inputStream) : new BufferedInputStream( - codec.createInputStream(inputStream)); + codec.createInputStream(inputStream, + mDecompressor = CodecPool.getDecompressor(codec))); this.mOffset = path.getOffset(); } @@ -89,12 +94,16 @@ public KeyValue next() throws IOException { @Override public void close() throws IOException { this.mReader.close(); + if (mDecompressor != null) { + CodecPool.returnDecompressor(mDecompressor); + } } } protected class DelimitedTextFileWriter implements FileWriter { private final CountingOutputStream mCountingStream; private final BufferedOutputStream mWriter; + private Compressor mCompressor; public DelimitedTextFileWriter(LogFilePath path, CompressionCodec codec) throws IOException { Path fsPath = new Path(path.getLogFilePath()); @@ -102,7 +111,8 @@ public DelimitedTextFileWriter(LogFilePath path, CompressionCodec codec) throws this.mCountingStream = new CountingOutputStream(fs.create(fsPath)); this.mWriter = (codec == null) ? new BufferedOutputStream( this.mCountingStream) : new BufferedOutputStream( - codec.createOutputStream(this.mCountingStream)); + codec.createOutputStream(this.mCountingStream, + mCompressor = CodecPool.getCompressor(codec))); } @Override @@ -120,6 +130,9 @@ public void write(KeyValue keyValue) throws IOException { @Override public void close() throws IOException { this.mWriter.close(); + if (mCompressor != null) { + CodecPool.returnCompressor(mCompressor); + } } } -} \ No newline at end of file +}