Set the number of Hadoop task mappers to the number of scan segments #143
@@ -0,0 +1,86 @@
package com.scylladb.migrator.alternator

import org.apache.commons.logging.LogFactory
import org.apache.hadoop.dynamodb.DynamoDBConstants
import org.apache.hadoop.mapred.{ InputSplit, JobConf }

/**
  * Specializes the split strategy:
  *  - do not bound the maximum number of partitions by the available memory per node
  *  - use as many partitions as the number of scan segments
  *  - by default, create segments that split the data into 100 MB chunks
  */
class DynamoDBInputFormat extends org.apache.hadoop.dynamodb.read.DynamoDBInputFormat {

  private val log = LogFactory.getLog(classOf[DynamoDBInputFormat])

  override def getSplits(conf: JobConf, desiredSplits: Int): Array[InputSplit] = {
    val readPercentage = conf.getDouble(
      DynamoDBConstants.THROUGHPUT_READ_PERCENT,
      DynamoDBConstants.DEFAULT_THROUGHPUT_PERCENTAGE.toDouble)
    if (readPercentage <= 0) {
      sys.error(s"Invalid read percentage: ${readPercentage}")
    }
    log.info(s"Read percentage: ${readPercentage}")
    val maxReadThroughputAllocated = conf.getInt(DynamoDBConstants.READ_THROUGHPUT, 1)
    val maxWriteThroughputAllocated = conf.getInt(DynamoDBConstants.WRITE_THROUGHPUT, 1)
    if (maxReadThroughputAllocated < 1.0) {
      sys.error(
        s"Read throughput should not be less than 1. Read throughput percent: ${maxReadThroughputAllocated}")
    }

    val configuredReadThroughput =
      math.max(math.floor(maxReadThroughputAllocated * readPercentage).intValue(), 1)

    val tableSizeBytes = conf.getLong(DynamoDBConstants.TABLE_SIZE_BYTES, 1)
    val numSegments =
      getNumSegments(maxReadThroughputAllocated, maxWriteThroughputAllocated, tableSizeBytes, conf)

    val numMappers = getNumMappers(numSegments, configuredReadThroughput, conf)

    log.info(s"Using ${numSegments} segments across ${numMappers} mappers")

    getSplitGenerator().generateSplits(numMappers, numSegments, conf)
  }

  override def getNumSegments(tableNormalizedReadThroughput: Int,
                              tableNormalizedWriteThroughput: Int,
                              currentTableSizeBytes: Long,
                              conf: JobConf): Int = {
    // Use configured scan segment if provided
    val configuredScanSegment = conf.getInt(DynamoDBConstants.SCAN_SEGMENTS, -1)
    if (configuredScanSegment > 0) {
      val numSegments =
        math.max(
          math.min(configuredScanSegment, DynamoDBConstants.MAX_SCAN_SEGMENTS),
          DynamoDBConstants.MIN_SCAN_SEGMENTS
        )
      log.info(
        s"Using number of segments configured using ${DynamoDBConstants.SCAN_SEGMENTS}: ${numSegments}")
      numSegments
    } else {
      // split into segments of at most 100 MB each (note: upstream implementation splits into 1 GB segments)
Review thread on this line:
- The upstream implementation uses segments of 1 GB each. In my tests the throughput is the highest with …
- If you use faster CPU, memory and network, then also for big databases you might end up with lots of tasks that will expect more memory on the driver and master.
- Can we make this configurable? @julienrf?
- Yes, good idea!
- By the way, the audience project had a similar calculation; it seems they used ~128 MB.
- Okay, you can always use numSegments to override manually…
- Yes, you can already manually set scanSegments.
- I’ve created #146 to track this possible feature that we could implement in the future.
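A minimal sketch of what the configurable segment size discussed in this thread could look like, if it were implemented; the property name used here is hypothetical and is not part of this patch or of #146:

    // Hypothetical configuration property for the target segment size (not part of this patch).
    // Falls back to the current hard-coded default of 100 MB when the property is absent.
    val bytesPerSegment: Long =
      conf.getLong("dynamodb.bytes.per.segment", 100L * 1024 * 1024)
    val numSegmentsForSize =
      math.max(1L, currentTableSizeBytes / bytesPerSegment).toInt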
      val numSegmentsForSize = {
        val bytesPerSegment = 100 * 1024 * 1024
        (currentTableSizeBytes / bytesPerSegment).ceil.intValue()
      }
      log.info(s"Would use ${numSegmentsForSize} segments for size")

      val numSegmentsForThroughput =
        (tableNormalizedReadThroughput / DynamoDBConstants.MIN_IO_PER_SEGMENT).intValue()
      log.info(s"Would use ${numSegmentsForThroughput} segments for throughput")

      // Take the smallest and fit to bounds
      val numSegments =
        math.max(
          math.min(
            math.min(numSegmentsForSize, numSegmentsForThroughput),
            DynamoDBConstants.MAX_SCAN_SEGMENTS
          ),
          DynamoDBConstants.MIN_SCAN_SEGMENTS
        )
      log.info(s"Using computed number of segments: ${numSegments}")
      numSegments
    }
  }

}
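To make the size-based computation in getNumSegments concrete, here is the arithmetic for the two on-demand cases exercised by the tests below (a standalone sketch, not part of the patch; the division over Long values truncates, so the 1 GB case yields 10 segments rather than 11):

    // 1 GB table, 100 MB per segment
    val oneGb: Long = 1L * 1024 * 1024 * 1024              // 1073741824 bytes
    val bytesPerSegment = 100 * 1024 * 1024                // 104857600 bytes
    val segmentsFor1Gb = (oneGb / bytesPerSegment).toInt   // = 10
    // 100 GB table
    val segmentsFor100Gb = (100 * oneGb / bytesPerSegment).toInt // = 1024

These match the 10 and 1024 partitions asserted by the first two tests below.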
@@ -0,0 +1,79 @@
package com.scylladb.migrator.alternator

import com.amazonaws.services.dynamodbv2.model.{BillingMode, BillingModeSummary, ProvisionedThroughputDescription, TableDescription}
import com.scylladb.migrator.readers.DynamoDB
import org.apache.spark.sql.SparkSession

class DynamoDBInputFormatTest extends munit.FunSuite {

  val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()

  val GB: Long = 1024 * 1024 * 1024

  test("no configured scanSegments in on-demand billing mode") {
    checkPartitions(10)(tableSizeBytes = 1 * GB, tableProvisionedThroughput = None)
  }
Comment on lines +13 to +15: By default, a table containing 1 GB of data and using on-demand billing will be split into 10 chunks of 100 MB each.
  test("no configured scanSegments in on-demand billing mode and table size is 100 GB") {
    checkPartitions(1024)(tableSizeBytes = 100 * GB, tableProvisionedThroughput = None)
  }
Comment on lines +17 to +19: Similarly, a table containing 100 GB of data will be split into 1024 chunks of 100 MB each.
  test("no configured scanSegments in provisioned billing mode") {
    checkPartitions(10)(tableSizeBytes = 1 * GB, tableProvisionedThroughput = Some((25, 25)))
  }
Comment on lines +21 to +23: When the billing mode is provisioned, we still get 10 chunks in the current algorithm because there is no point in using a higher number of smaller chunks, AFAIU.
  test("scanSegments = 42") {
    checkPartitions(42)(configuredScanSegments = Some(42))
  }
Comment on lines +25 to +27: You can also explicitly set the scanSegments value.
Reply: 42? :-D
  test("scanSegments = 42 and maxMapTasks = 10") {
    checkPartitions(10)(configuredScanSegments = Some(42), configuredMaxMapTasks = Some(10))
  }
Comment on lines +29 to +31: Last, you can explicitly set both the number of scan segments (scanSegments) and the maximum number of map tasks (maxMapTasks); in that case the resulting number of partitions is capped by maxMapTasks, as the expected value of 10 shows.
  def checkPartitions(expectedPartitions: Int)(
    tableSizeBytes: Long = 0L,
    tableProvisionedThroughput: Option[(Int, Int)] = None,
    configuredScanSegments: Option[Int] = None,
    configuredMaxMapTasks: Option[Int] = None,
    configuredReadThroughput: Option[Int] = None,
    configuredThroughputReadPercent: Option[Float] = None
  ): Unit = {
    val tableDescription =
      new TableDescription()
        .withTableName("DummyTable")
        .withTableSizeBytes(tableSizeBytes)
    tableProvisionedThroughput match {
      case Some((rcu, wcu)) =>
        tableDescription.withProvisionedThroughput(
          new ProvisionedThroughputDescription()
            .withReadCapacityUnits(rcu)
            .withWriteCapacityUnits(wcu)
        )
      case None =>
        tableDescription
          .withProvisionedThroughput(new ProvisionedThroughputDescription())
          .withBillingModeSummary(new BillingModeSummary().withBillingMode(BillingMode.PAY_PER_REQUEST))
    }

    val jobConf = DynamoDB.makeJobConf(
      spark = spark,
      endpoint = None,
      credentials = None,
      region = None,
      table = "DummyTable",
      scanSegments = configuredScanSegments,
      maxMapTasks = configuredMaxMapTasks,
      readThroughput = configuredReadThroughput,
      throughputReadPercent = configuredThroughputReadPercent,
      description = tableDescription
    )
    val splits = new DynamoDBInputFormat().getSplits(jobConf, 1)

    val partitions = splits.length
    assertEquals(partitions, expectedPartitions)
  }

  override def afterAll(): Unit = {
    spark.stop()
  }

}
Our class overrides the base implementation of the methods getSplits and getNumSegments. Our implementation is based on the original implementation (with some variations) available here: https://github.com/awslabs/emr-dynamodb-connector/blob/07391dd0937bdbb20b86ec79444798df11b0711f/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/read/AbstractDynamoDBInputFormat.java
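For context, an input format like this one is typically plugged into Spark through SparkContext.hadoopRDD. A rough sketch, assuming a JobConf populated the same way as in the test above (Text and DynamoDBItemWritable are the key and value classes used by the emr-dynamodb-connector; the JobConf construction itself is elided here):

    import com.scylladb.migrator.alternator.DynamoDBInputFormat
    import org.apache.hadoop.dynamodb.DynamoDBItemWritable
    import org.apache.hadoop.io.Text
    import org.apache.hadoop.mapred.JobConf
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("example").master("local[*]").getOrCreate()
    // Assumed to be built as in DynamoDBInputFormatTest, e.g. via DynamoDB.makeJobConf
    val jobConf: JobConf = ???
    val items = spark.sparkContext.hadoopRDD(
      jobConf,
      classOf[DynamoDBInputFormat],
      classOf[Text],
      classOf[DynamoDBItemWritable]
    )
    // The number of RDD partitions equals the number of mappers computed by getSplits
    println(items.getNumPartitions)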