diff --git a/pom.xml b/pom.xml
index f1fe37a..776a412 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
   <groupId>org.disq-bio</groupId>
   <artifactId>disq</artifactId>
-  <version>0.3.7-SNAPSHOT</version>
+  <version>0.3.7-bugfix-SNAPSHOT</version>
   <name>Disq</name>
   <description>A library for manipulating bioinformatics sequencing formats in Apache Spark.</description>
   <url>https://github.com/disq-bio/disq</url>
diff --git a/src/main/java/org/disq_bio/disq/impl/file/FileSplitInputFormat.java b/src/main/java/org/disq_bio/disq/impl/file/FileSplitInputFormat.java
index 2c84b29..b9cd076 100644
--- a/src/main/java/org/disq_bio/disq/impl/file/FileSplitInputFormat.java
+++ b/src/main/java/org/disq_bio/disq/impl/file/FileSplitInputFormat.java
@@ -26,7 +26,11 @@
package org.disq_bio.disq.impl.file;
import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -84,4 +88,14 @@ public RecordReader<Void, FileSplit> createRecordReader(
       InputSplit split, TaskAttemptContext context) {
return new FileSplitRecordReader();
}
+
+ /**
+ * We override this method because super.listStatus returns files in an effectively random order
+ * on some filesystems. This breaks ordering assumptions and results in data corruption. We sort
+ * the results here in order to guarantee part files are returned in numeric order.
+ */
+ @Override
+  protected List<FileStatus> listStatus(final JobContext job) throws IOException {
+ return super.listStatus(job).stream().sorted().collect(Collectors.toList());
+ }
}
diff --git a/src/test/java/org/disq_bio/disq/AnySamTestUtil.java b/src/test/java/org/disq_bio/disq/AnySamTestUtil.java
index fc1084d..4efb4f3 100644
--- a/src/test/java/org/disq_bio/disq/AnySamTestUtil.java
+++ b/src/test/java/org/disq_bio/disq/AnySamTestUtil.java
@@ -50,6 +50,8 @@
import java.nio.file.Path;
import java.util.Collections;
import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
import org.disq_bio.disq.impl.file.NioFileSystemWrapper;
import org.disq_bio.disq.impl.formats.BoundedTraversalUtil;
import org.disq_bio.disq.impl.formats.sam.SamFormat;
@@ -255,4 +257,19 @@ private static int size(Iterator iterator) {
}
return count;
}
+
+  public static List<SAMRecord> loadEntireReadsFile(String samPath, String referencePath)
+ throws IOException {
+ ReferenceSource referenceSource =
+ referencePath == null
+ ? null
+ : new ReferenceSource(NioFileSystemWrapper.asPath(referencePath));
+ try (SamReader reader =
+ SamReaderFactory.makeDefault()
+ .validationStringency(ValidationStringency.DEFAULT_STRINGENCY)
+ .referenceSource(referenceSource)
+ .open(SamInputResource.of(NioFileSystemWrapper.asPath(samPath)))) {
+ return reader.iterator().stream().collect(Collectors.toList());
+ }
+ }
}
diff --git a/src/test/java/org/disq_bio/disq/HtsjdkReadsRddTest.java b/src/test/java/org/disq_bio/disq/HtsjdkReadsRddTest.java
index 10d3903..7317b28 100644
--- a/src/test/java/org/disq_bio/disq/HtsjdkReadsRddTest.java
+++ b/src/test/java/org/disq_bio/disq/HtsjdkReadsRddTest.java
@@ -44,6 +44,7 @@
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import junitparams.JUnitParamsRunner;
import junitparams.Parameters;
import org.apache.spark.api.java.JavaRDD;
@@ -152,6 +153,11 @@ public void testReadAndWrite(
Assert.assertEquals(expectedCount, SamtoolsTestUtil.countReads(outputPath, refPath));
}
+ // check the actual reads match
+    List<SAMRecord> expectedSamRecords = AnySamTestUtil.loadEntireReadsFile(inputPath, refPath);
+    List<SAMRecord> actualSamRecords = AnySamTestUtil.loadEntireReadsFile(outputPath, refPath);
+ Assert.assertEquals(expectedSamRecords, actualSamRecords);
+
// check we can read back what we've just written
Assert.assertEquals(expectedCount, htsjdkReadsRddStorage.read(outputPath).getReads().count());
}
@@ -257,7 +263,13 @@ public void testReadAndWriteMultiple(
}
// check we can read back what we've just written
- Assert.assertEquals(expectedCount, htsjdkReadsRddStorage.read(outputPath).getReads().count());
+    final JavaRDD<SAMRecord> readsRdd = htsjdkReadsRddStorage.read(outputPath).getReads();
+ Assert.assertEquals(expectedCount, readsRdd.count());
+
+ // check the actual reads match
+    List<SAMRecord> expectedSamRecords = AnySamTestUtil.loadEntireReadsFile(inputPath, refPath);
+    final List<SAMRecord> actualSamRecords = readsRdd.collect();
+ Assert.assertEquals(expectedSamRecords, actualSamRecords);
}
protected Object[] parametersForTestReadAndWriteSubset() {