Skip to content

Commit

Permalink
Merge pull request pinterest#96 from yyejun/fix_memory_leak
Browse files Browse the repository at this point in the history
Fix memory leak when DelimitedTextFileReaderWriterFactory combined with native lib
  • Loading branch information
pgarbacki committed May 27, 2015
2 parents 83d2b3f + 18f4af2 commit 61d5f05
Showing 1 changed file with 14 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.Decompressor;

import com.google.common.io.CountingOutputStream;
import com.pinterest.secor.common.LogFilePath;
Expand Down Expand Up @@ -57,14 +60,16 @@ public FileWriter BuildFileWriter(LogFilePath logFilePath, CompressionCodec code
protected class DelimitedTextFileReader implements FileReader {
private final BufferedInputStream mReader;
private long mOffset;
private Decompressor mDecompressor = null;

public DelimitedTextFileReader(LogFilePath path, CompressionCodec codec) throws IOException {
Path fsPath = new Path(path.getLogFilePath());
FileSystem fs = FileUtil.getFileSystem(path.getLogFilePath());
InputStream inputStream = fs.open(fsPath);
this.mReader = (codec == null) ? new BufferedInputStream(inputStream)
: new BufferedInputStream(
codec.createInputStream(inputStream));
codec.createInputStream(inputStream,
mDecompressor = CodecPool.getDecompressor(codec)));
this.mOffset = path.getOffset();
}

Expand All @@ -89,20 +94,24 @@ public KeyValue next() throws IOException {
@Override
public void close() throws IOException {
this.mReader.close();
CodecPool.returnDecompressor(mDecompressor);
mDecompressor = null;
}
}

protected class DelimitedTextFileWriter implements FileWriter {
private final CountingOutputStream mCountingStream;
private final BufferedOutputStream mWriter;
private Compressor mCompressor = null;

public DelimitedTextFileWriter(LogFilePath path, CompressionCodec codec) throws IOException {
Path fsPath = new Path(path.getLogFilePath());
FileSystem fs = FileUtil.getFileSystem(path.getLogFilePath());
this.mCountingStream = new CountingOutputStream(fs.create(fsPath));
this.mWriter = (codec == null) ? new BufferedOutputStream(
this.mCountingStream) : new BufferedOutputStream(
codec.createOutputStream(this.mCountingStream));
codec.createOutputStream(this.mCountingStream,
mCompressor = CodecPool.getCompressor(codec)));
}

@Override
Expand All @@ -120,6 +129,8 @@ public void write(KeyValue keyValue) throws IOException {
@Override
public void close() throws IOException {
this.mWriter.close();
CodecPool.returnCompressor(mCompressor);
mCompressor = null;
}
}
}
}

0 comments on commit 61d5f05

Please sign in to comment.