diff --git a/pom.xml b/pom.xml index f1fe37a..776a412 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ org.disq-bio disq - 0.3.7-SNAPSHOT + 0.3.7-bugfix-SNAPSHOT Disq A library for manipulating bioinformatics sequencing formats in Apache Spark. https://github.com/disq-bio/disq diff --git a/src/main/java/org/disq_bio/disq/impl/file/FileSplitInputFormat.java b/src/main/java/org/disq_bio/disq/impl/file/FileSplitInputFormat.java index 2c84b29..b9cd076 100644 --- a/src/main/java/org/disq_bio/disq/impl/file/FileSplitInputFormat.java +++ b/src/main/java/org/disq_bio/disq/impl/file/FileSplitInputFormat.java @@ -26,7 +26,11 @@ package org.disq_bio.disq.impl.file; import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; @@ -84,4 +88,14 @@ public RecordReader createRecordReader( InputSplit split, TaskAttemptContext context) { return new FileSplitRecordReader(); } + + /** + * We override this method because super.listStatus returns files in an effectively random order + * on some filesystems. This breaks ordering assumptions and results in data corruption. We sort + * the results here in order to guarantee part files are returned in numeric order. 
+ */ + @Override + protected List<FileStatus> listStatus(final JobContext job) throws IOException { + return super.listStatus(job).stream().sorted().collect(Collectors.toList()); + } } diff --git a/src/test/java/org/disq_bio/disq/AnySamTestUtil.java b/src/test/java/org/disq_bio/disq/AnySamTestUtil.java index fc1084d..4efb4f3 100644 --- a/src/test/java/org/disq_bio/disq/AnySamTestUtil.java +++ b/src/test/java/org/disq_bio/disq/AnySamTestUtil.java @@ -50,6 +50,8 @@ import java.nio.file.Path; import java.util.Collections; import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; import org.disq_bio.disq.impl.file.NioFileSystemWrapper; import org.disq_bio.disq.impl.formats.BoundedTraversalUtil; import org.disq_bio.disq.impl.formats.sam.SamFormat; @@ -255,4 +257,19 @@ private static int size(Iterator iterator) { } return count; } + + public static List<SAMRecord> loadEntireReadsFile(String samPath, String referencePath) + throws IOException { + ReferenceSource referenceSource = + referencePath == null + ?
null + : new ReferenceSource(NioFileSystemWrapper.asPath(referencePath)); + try (SamReader reader = + SamReaderFactory.makeDefault() + .validationStringency(ValidationStringency.DEFAULT_STRINGENCY) + .referenceSource(referenceSource) + .open(SamInputResource.of(NioFileSystemWrapper.asPath(samPath)))) { + return reader.iterator().stream().collect(Collectors.toList()); + } + } } diff --git a/src/test/java/org/disq_bio/disq/HtsjdkReadsRddTest.java b/src/test/java/org/disq_bio/disq/HtsjdkReadsRddTest.java index 10d3903..7317b28 100644 --- a/src/test/java/org/disq_bio/disq/HtsjdkReadsRddTest.java +++ b/src/test/java/org/disq_bio/disq/HtsjdkReadsRddTest.java @@ -44,6 +44,7 @@ import java.nio.file.Paths; import java.util.Arrays; import java.util.Collections; +import java.util.List; import junitparams.JUnitParamsRunner; import junitparams.Parameters; import org.apache.spark.api.java.JavaRDD; @@ -152,6 +153,11 @@ public void testReadAndWrite( Assert.assertEquals(expectedCount, SamtoolsTestUtil.countReads(outputPath, refPath)); } + // check the actual reads match + List<SAMRecord> expectedSamRecords = AnySamTestUtil.loadEntireReadsFile(inputPath, refPath); + List<SAMRecord> actualSamRecords = AnySamTestUtil.loadEntireReadsFile(outputPath, refPath); + Assert.assertEquals(expectedSamRecords, actualSamRecords); + // check we can read back what we've just written Assert.assertEquals(expectedCount, htsjdkReadsRddStorage.read(outputPath).getReads().count()); } @@ -257,7 +263,13 @@ public void testReadAndWriteMultiple( } // check we can read back what we've just written - Assert.assertEquals(expectedCount, htsjdkReadsRddStorage.read(outputPath).getReads().count()); + final JavaRDD<SAMRecord> readsRdd = htsjdkReadsRddStorage.read(outputPath).getReads(); + Assert.assertEquals(expectedCount, readsRdd.count()); + + // check the actual reads match + List<SAMRecord> expectedSamRecords = AnySamTestUtil.loadEntireReadsFile(inputPath, refPath); + final List<SAMRecord> actualSamRecords = readsRdd.collect(); +
Assert.assertEquals(expectedSamRecords, actualSamRecords); } protected Object[] parametersForTestReadAndWriteSubset() {