diff --git a/README.md b/README.md
index fd2ecc60..67775ac8 100644
--- a/README.md
+++ b/README.md
@@ -10,6 +10,7 @@ This is a fork from [awslabs/emr-dynamodb-connector](https://github.com/awslabs/
 - Break down the data into chunks of 100 MB instead of 1 GB;
 - Set the number of task mappers to the number of scan segments by default;
 - Do not constrain the number of task mappers based on the estimated memory of the worker nodes.
+- Add support for excluding a set of Scan segments.
 
 The complete changelog can be viewed here: [master...scylla-5.x](https://github.com/awslabs/emr-dynamodb-connector/compare/master...scylladb:emr-dynamodb-connector:scylla-5.x).
diff --git a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBConstants.java b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBConstants.java
index 23c49bda..001238c0 100644
--- a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBConstants.java
+++ b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBConstants.java
@@ -95,6 +95,7 @@ public interface DynamoDBConstants {
   double READ_EVENTUALLY_TO_STRONGLY_CONSISTENT_FACTOR = 2;
 
   String SCAN_SEGMENTS = "dynamodb.scan.segments";
+  String EXCLUDED_SCAN_SEGMENTS = "dynamodb.scan.segments.exclude";
   int MAX_SCAN_SEGMENTS = 1000000;
   int MIN_SCAN_SEGMENTS = 1;
   double BYTES_PER_READ_CAPACITY_UNIT = 4096;
diff --git a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/read/AbstractDynamoDBInputFormat.java b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/read/AbstractDynamoDBInputFormat.java
index e9a35d8f..6e2d4b55 100644
--- a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/read/AbstractDynamoDBInputFormat.java
+++ b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/read/AbstractDynamoDBInputFormat.java
@@ -14,6 +14,11 @@
 package org.apache.hadoop.dynamodb.read;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.dynamodb.DynamoDBClient;
@@ -65,13 +70,26 @@ public InputSplit[] getSplits(JobConf conf, int desiredSplits) throws IOExceptio
     }
 
     long tableSizeBytes = conf.getLong(DynamoDBConstants.TABLE_SIZE_BYTES, 1);
-    int numSegments = getNumSegments(configuredReadThroughput, (int)
+    int numTotalSegments = getNumSegments(configuredReadThroughput, (int)
         maxWriteThroughputAllocated, tableSizeBytes, conf);
-    int numMappers = getNumMappers(numSegments, configuredReadThroughput, conf);
-
-    log.info("Using " + numSegments + " segments across " + numMappers + " mappers");
+    Set<Integer> excludedSegments =
+        Arrays.stream(conf.getInts(DynamoDBConstants.EXCLUDED_SCAN_SEGMENTS))
+            .boxed()
+            .collect(Collectors.toSet());
 
-    return getSplitGenerator().generateSplits(numMappers, numSegments, conf);
+    List<Integer> segments =
+        IntStream.range(0, numTotalSegments)
+            .boxed()
+            .filter(i -> !excludedSegments.contains(i))
+            .collect(Collectors.toList());
+
+    int numEffectiveSegments = segments.size();
+    int numMappers = getNumMappers(numEffectiveSegments, configuredReadThroughput, conf);
+
+    log.info("Using " + numEffectiveSegments + " segments across " + numMappers + " mappers");
+
+    return getSplitGenerator().generateSplits(numMappers, numTotalSegments, segments, conf);
   }
 
   protected DynamoDBRecordReaderContext buildDynamoDBRecordReaderContext(InputSplit split,
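Usage note: Hadoop's Configuration.getInts(), used above, parses a comma-separated list of
integers, so excluded segments are supplied as a value like "2,5,7". A minimal sketch of how a
job could set the new property programmatically (the ExcludedSegmentsExample class name is
illustrative, not part of the patch):

    import org.apache.hadoop.dynamodb.DynamoDBConstants;
    import org.apache.hadoop.mapred.JobConf;

    public class ExcludedSegmentsExample {
      public static JobConf configure() {
        JobConf conf = new JobConf();
        // Equivalent to conf.set("dynamodb.scan.segments.exclude", "2,5,7");
        // segments 2, 5, and 7 will be skipped when splits are generated.
        conf.set(DynamoDBConstants.EXCLUDED_SCAN_SEGMENTS, "2,5,7");
        return conf;
      }
    }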
diff --git a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/split/DynamoDBSplitGenerator.java b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/split/DynamoDBSplitGenerator.java
index 8e6f8ba1..91e97fa9 100644
--- a/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/split/DynamoDBSplitGenerator.java
+++ b/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/split/DynamoDBSplitGenerator.java
@@ -27,10 +27,16 @@ public class DynamoDBSplitGenerator {
 
   private static final Log log = LogFactory.getLog(DynamoDBSplitGenerator.class);
 
-  public InputSplit[] generateSplits(int maxClusterMapTasks, int numSegments, JobConf conf) {
-    log.info("Generating " + numSegments + " segments for " + maxClusterMapTasks + " max mappers");
+  public InputSplit[] generateSplits(int maxClusterMapTasks, int numTotalSegments,
+      List<Integer> segments, JobConf conf) {
+    // Note: numEffectiveSegments can be smaller than numTotalSegments because some segments
+    // may have been excluded
+    int numEffectiveSegments = segments.size();
+    log.info("Generating " + numEffectiveSegments + " segments for " + maxClusterMapTasks
+        + " max mappers");
 
-    int numMappers = Math.min(maxClusterMapTasks, numSegments);
+    int numMappers = Math.min(maxClusterMapTasks, numEffectiveSegments);
     List<List<Integer>> segmentsPerSplit = new ArrayList<List<Integer>>(numMappers);
     for (int i = 0; i < numMappers; i++) {
       segmentsPerSplit.add(new ArrayList<Integer>());
@@ -38,7 +44,7 @@ public InputSplit[] generateSplits(int maxClusterMapTasks, int numSegments, JobC
 
     // Round-robin which split gets which segment id
     int mapper = 0;
-    for (int i = 0; i < numSegments; i++) {
+    for (Integer i : segments) {
       segmentsPerSplit.get(mapper).add(i);
       mapper = (mapper + 1) % numMappers;
     }
@@ -53,7 +59,7 @@ public InputSplit[] generateSplits(int maxClusterMapTasks, int numSegments, JobC
       log.info("Assigning " + segmentsPerSplit.get(i).size() + " segments to mapper " + i + ": "
          + segmentsPerSplit.get(i));
       splits[i] = createDynamoDBSplit(getInputPath(conf), approxItemCountPerSplit, i,
-          segmentsPerSplit.get(i), numSegments, estimateLength);
+          segmentsPerSplit.get(i), numTotalSegments, estimateLength);
     }
 
     return splits;
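To make the round-robin assignment concrete: with maxClusterMapTasks = 3 and effective
segments [0, 2, 3, 4, 6, 8, 9] (segments 1, 5, and 7 excluded from a 10-segment table),
mapper 0 receives [0, 4, 9], mapper 1 receives [2, 6], and mapper 2 receives [3, 8]. A
self-contained sketch of the same loop (illustrative, not part of the patch):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class RoundRobinSketch {
      public static void main(String[] args) {
        List<Integer> segments = Arrays.asList(0, 2, 3, 4, 6, 8, 9); // 1, 5, 7 excluded
        int numMappers = Math.min(3, segments.size());
        List<List<Integer>> segmentsPerSplit = new ArrayList<List<Integer>>(numMappers);
        for (int i = 0; i < numMappers; i++) {
          segmentsPerSplit.add(new ArrayList<Integer>());
        }
        // Deal segments out to mappers one at a time, wrapping around
        int mapper = 0;
        for (Integer segment : segments) {
          segmentsPerSplit.get(mapper).add(segment);
          mapper = (mapper + 1) % numMappers;
        }
        System.out.println(segmentsPerSplit); // [[0, 4, 9], [2, 6], [3, 8]]
      }
    }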
diff --git a/emr-dynamodb-hadoop/src/test/java/org/apache/hadoop/dynamodb/split/DynamoDBSplitGeneratorTest.java b/emr-dynamodb-hadoop/src/test/java/org/apache/hadoop/dynamodb/split/DynamoDBSplitGeneratorTest.java
index f2ed8fd9..e9575705 100644
--- a/emr-dynamodb-hadoop/src/test/java/org/apache/hadoop/dynamodb/split/DynamoDBSplitGeneratorTest.java
+++ b/emr-dynamodb-hadoop/src/test/java/org/apache/hadoop/dynamodb/split/DynamoDBSplitGeneratorTest.java
@@ -21,37 +21,85 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.junit.Test;
 
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
 public class DynamoDBSplitGeneratorTest {
 
   DynamoDBSplitGenerator splitGenerator = new DynamoDBSplitGenerator();
 
   @Test
   public void testGenerateEvenSplits() {
-    InputSplit[] splits = splitGenerator.generateSplits(1, 1, getTestConf());
+    InputSplit[] splits = splitGenerator.generateSplits(1, 1, generateSegments(1), getTestConf());
     verifySplits(splits, 1, 1);
 
-    splits = splitGenerator.generateSplits(1000, 1000, getTestConf());
+    splits = splitGenerator.generateSplits(1000, 1000, generateSegments(1000), getTestConf());
     verifySplits(splits, 1000, 1000);
   }
 
   @Test
   public void testGenerateFewerSegmentsThanMappers() {
-    InputSplit[] splits = splitGenerator.generateSplits(10, 1, getTestConf());
+    InputSplit[] splits = splitGenerator.generateSplits(10, 1, generateSegments(1), getTestConf());
     verifySplits(splits, 1, 1);
   }
 
   @Test
   public void testGenerateMoreSegmentsThanMappersEvenly() {
-    InputSplit[] splits = splitGenerator.generateSplits(10, 20, getTestConf());
+    InputSplit[] splits = splitGenerator.generateSplits(10, 20, generateSegments(20), getTestConf());
     verifySplits(splits, 20, 10);
   }
 
   @Test
   public void testGenerateMoreSegmentsThanMappersUnevenly() {
-    InputSplit[] splits = splitGenerator.generateSplits(10, 27, getTestConf());
+    InputSplit[] splits = splitGenerator.generateSplits(10, 27, generateSegments(27), getTestConf());
     verifySplits(splits, 27, 10);
   }
 
+  @Test
+  public void testExcludedSegments1() {
+    int maxClusterMapTasks = 10;
+    int numTotalSegments = 10;
+    int skippedSegments = 5;
+    List<Integer> effectiveSegments = IntStream.range(0, numTotalSegments)
+        .skip(skippedSegments).boxed().collect(Collectors.toList());
+    InputSplit[] splits = splitGenerator.generateSplits(maxClusterMapTasks, numTotalSegments,
+        effectiveSegments, getTestConf());
+    // We expect as many mappers as effective segments, since maxClusterMapTasks is greater
+    // than the number of effective segments
+    int numMappers = numTotalSegments - skippedSegments;
+    verifySkippedSegments(splits, effectiveSegments, numTotalSegments, numMappers);
+  }
+
+  @Test
+  public void testExcludedSegments2() {
+    int maxClusterMapTasks = 10;
+    int numTotalSegments = 10;
+    List<Integer> effectiveSegments = generateSegments(numTotalSegments);
+    // Remove arbitrary segments (in descending order, so index equals value)
+    effectiveSegments.remove(9);
+    effectiveSegments.remove(7);
+    effectiveSegments.remove(5);
+    effectiveSegments.remove(1);
+    int skippedSegments = numTotalSegments - effectiveSegments.size();
+    InputSplit[] splits = splitGenerator.generateSplits(maxClusterMapTasks, numTotalSegments,
+        effectiveSegments, getTestConf());
+    // We expect as many mappers as effective segments, since maxClusterMapTasks is greater
+    // than the number of effective segments
+    int numMappers = numTotalSegments - skippedSegments;
+    verifySkippedSegments(splits, effectiveSegments, numTotalSegments, numMappers);
+  }
+
+  @Test
+  public void testExcludedSegments3() {
+    int maxClusterMapTasks = 4;
+    int numTotalSegments = 10;
+    List<Integer> effectiveSegments = generateSegments(numTotalSegments);
+    // Remove arbitrary segments (in descending order, so index equals value)
+    effectiveSegments.remove(6);
+    effectiveSegments.remove(5);
+    effectiveSegments.remove(2);
+    InputSplit[] splits = splitGenerator.generateSplits(maxClusterMapTasks, numTotalSegments,
+        effectiveSegments, getTestConf());
+    verifySkippedSegments(splits, effectiveSegments, numTotalSegments, maxClusterMapTasks);
+  }
+
   private JobConf getTestConf() {
     JobConf conf = new JobConf();
     conf.set("mapred.input.dir", "abc");
@@ -85,4 +133,24 @@ private void verifySplits(InputSplit[] splits, int numSegments, int numMappers)
     }
   }
 
+  private void verifySkippedSegments(InputSplit[] splits, List<Integer> effectiveSegments,
+      int numTotalSegments, int numMappers) {
+    assertEquals(numMappers, splits.length);
+    for (InputSplit split1 : splits) {
+      DynamoDBSplit split = (DynamoDBSplit) split1;
+      // Each split must still report the total number of segments so that Scan requests are
+      // constructed correctly
+      assertEquals(numTotalSegments, split.getTotalSegments());
+      for (Integer segment : split.getSegments()) {
+        // Check that every segment in the split is part of the effective segments
+        assertTrue(effectiveSegments.remove(segment));
+      }
+    }
+    // Check that all the effective segments have been allocated to a split
+    assertTrue(effectiveSegments.isEmpty());
+  }
+
+  private List<Integer> generateSegments(int numSegments) {
+    return IntStream.range(0, numSegments).boxed().collect(Collectors.toList());
+  }
 }
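A plausible end-to-end use of the new property (the retry scenario below is illustrative; the
patch itself does not prescribe a workflow): if a scan over a fixed number of segments partially
fails, a follow-up job can exclude the segments that already completed and re-read only the
rest. This assumes that dynamodb.scan.segments pins the total segment count across runs, as the
SCAN_SEGMENTS constant suggests; the RetryFailedSegments class name and the segment numbers are
made up.

    import java.util.stream.Collectors;
    import java.util.stream.IntStream;
    import org.apache.hadoop.dynamodb.DynamoDBConstants;
    import org.apache.hadoop.mapred.JobConf;

    public class RetryFailedSegments {
      public static JobConf configure() {
        JobConf conf = new JobConf();
        // Keep the segmentation identical to the first run
        conf.setInt(DynamoDBConstants.SCAN_SEGMENTS, 100);
        // Exclude every segment except the two that failed (41 and 87)
        String completed = IntStream.range(0, 100)
            .filter(i -> i != 41 && i != 87)
            .mapToObj(Integer::toString)
            .collect(Collectors.joining(","));
        conf.set(DynamoDBConstants.EXCLUDED_SCAN_SEGMENTS, completed);
        return conf;
      }
    }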