Skip to content

Commit

Permalink
fix delimited file memory leak
Browse files Browse the repository at this point in the history
  • Loading branch information
Yejun Yang committed May 23, 2015
1 parent 9937d7d commit 7b951a4
Showing 1 changed file with 16 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.Decompressor;

import com.google.common.io.CountingOutputStream;
import com.pinterest.secor.common.LogFilePath;
Expand Down Expand Up @@ -57,14 +60,16 @@ public FileWriter BuildFileWriter(LogFilePath logFilePath, CompressionCodec code
protected class DelimitedTextFileReader implements FileReader {
private final BufferedInputStream mReader;
private long mOffset;
private Decompressor mDecompressor;

public DelimitedTextFileReader(LogFilePath path, CompressionCodec codec) throws IOException {
Path fsPath = new Path(path.getLogFilePath());
FileSystem fs = FileUtil.getFileSystem(path.getLogFilePath());
InputStream inputStream = fs.open(fsPath);
this.mReader = (codec == null) ? new BufferedInputStream(inputStream)
: new BufferedInputStream(
codec.createInputStream(inputStream));
codec.createInputStream(inputStream,
mDecompressor = CodecPool.getDecompressor(codec)));
this.mOffset = path.getOffset();
}

Expand All @@ -89,20 +94,25 @@ public KeyValue next() throws IOException {
@Override
public void close() throws IOException {
this.mReader.close();
if (mDecompressor != null) {
CodecPool.returnDecompressor(mDecompressor);
}
}
}

protected class DelimitedTextFileWriter implements FileWriter {
private final CountingOutputStream mCountingStream;
private final BufferedOutputStream mWriter;
private Compressor mCompressor;

public DelimitedTextFileWriter(LogFilePath path, CompressionCodec codec) throws IOException {
Path fsPath = new Path(path.getLogFilePath());
FileSystem fs = FileUtil.getFileSystem(path.getLogFilePath());
this.mCountingStream = new CountingOutputStream(fs.create(fsPath));
this.mWriter = (codec == null) ? new BufferedOutputStream(
this.mCountingStream) : new BufferedOutputStream(
codec.createOutputStream(this.mCountingStream));
codec.createOutputStream(this.mCountingStream,
mCompressor = CodecPool.getCompressor(codec)));
}

@Override
Expand All @@ -120,6 +130,9 @@ public void write(KeyValue keyValue) throws IOException {
@Override
public void close() throws IOException {
this.mWriter.close();
if (mCompressor != null) {
CodecPool.returnCompressor(mCompressor);
}
}
}
}
}

0 comments on commit 7b951a4

Please sign in to comment.