Set the number of Hadoop task mappers to the number of scan segments #143

Merged
merged 1 commit on May 30, 2024
1 change: 1 addition & 0 deletions build.sbt
@@ -94,6 +94,7 @@ lazy val migrator = (project in file("migrator")).settings(
lazy val tests = project.in(file("tests")).settings(
libraryDependencies ++= Seq(
"com.amazonaws" % "aws-java-sdk-dynamodb" % awsSdkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion,
"org.apache.cassandra" % "java-driver-query-builder" % "4.18.0",
"com.github.mjakubowski84" %% "parquet4s-core" % "1.9.4",
"org.apache.hadoop" % "hadoop-client" % "2.9.2",
@@ -186,7 +186,7 @@ object DynamoUtils {
jobConf.set(
"mapred.output.format.class",
"org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
jobConf.set("mapred.input.format.class", "com.scylladb.migrator.alternator.DynamoDBInputFormat")
}

/**
@@ -0,0 +1,86 @@
package com.scylladb.migrator.alternator
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.dynamodb.DynamoDBConstants
import org.apache.hadoop.mapred.{ InputSplit, JobConf }

/**
* Specializes the split strategy:
* - do not bound the maximum number of partitions by the available memory per node
* - use as many partitions as the number of scan segments
* - by default, create segments that split the data into 100 MB chunks
*/
class DynamoDBInputFormat extends org.apache.hadoop.dynamodb.read.DynamoDBInputFormat {
Collaborator Author

Our class overrides the base implementation of the methods getSplits and getNumSegments. Our implementation is based on the original implementation available here, but with some variations:

https://github.com/awslabs/emr-dynamodb-connector/blob/07391dd0937bdbb20b86ec79444798df11b0711f/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/read/AbstractDynamoDBInputFormat.java


private val log = LogFactory.getLog(classOf[DynamoDBInputFormat])

override def getSplits(conf: JobConf, desiredSplits: Int): Array[InputSplit] = {
val readPercentage = conf.getDouble(
DynamoDBConstants.THROUGHPUT_READ_PERCENT,
DynamoDBConstants.DEFAULT_THROUGHPUT_PERCENTAGE.toDouble)
if (readPercentage <= 0) {
sys.error(s"Invalid read percentage: ${readPercentage}")
}
log.info(s"Read percentage: ${readPercentage}")
val maxReadThroughputAllocated = conf.getInt(DynamoDBConstants.READ_THROUGHPUT, 1)
val maxWriteThroughputAllocated = conf.getInt(DynamoDBConstants.WRITE_THROUGHPUT, 1)
if (maxReadThroughputAllocated < 1.0) {
sys.error(
s"Read throughput should not be less than 1. Read throughput percent: ${maxReadThroughputAllocated}")
}

val configuredReadThroughput =
math.max(math.floor(maxReadThroughputAllocated * readPercentage).intValue(), 1)

val tableSizeBytes = conf.getLong(DynamoDBConstants.TABLE_SIZE_BYTES, 1)
val numSegments =
getNumSegments(maxReadThroughputAllocated, maxWriteThroughputAllocated, tableSizeBytes, conf)

val numMappers = getNumMappers(numSegments, configuredReadThroughput, conf)

log.info(s"Using ${numSegments} segments across ${numMappers} mappers")

getSplitGenerator().generateSplits(numMappers, numSegments, conf)
}

override def getNumSegments(tableNormalizedReadThroughput: Int,
tableNormalizedWriteThroughput: Int,
currentTableSizeBytes: Long,
conf: JobConf): Int = {
// Use configured scan segment if provided
val configuredScanSegment = conf.getInt(DynamoDBConstants.SCAN_SEGMENTS, -1)
if (configuredScanSegment > 0) {
val numSegments =
math.max(
math.min(configuredScanSegment, DynamoDBConstants.MAX_SCAN_SEGMENTS),
DynamoDBConstants.MIN_SCAN_SEGMENTS
)
log.info(
s"Using number of segments configured using ${DynamoDBConstants.SCAN_SEGMENTS}: ${numSegments}")
numSegments
} else {
// split into segments of at most 100 MB each (note: upstream implementation splits into 1 GB segments)
Collaborator Author

The upstream implementation uses segments of 1 GB each.

In my tests, throughput was highest with n segments of about 100 MB each, where n is also divisible by the total number of CPUs of the Spark cluster (e.g., in my case I had 8 cores, and the best throughput was observed with 16 segments of about 106 MB each when transferring 1.7 GB of data in total).

Contributor

If you use faster CPU, memory, and network, the optimal segment size might be bigger than 100 MB.

Also, for big DBs you might end up with lots of tasks, which will require more memory on the driver and master.

Contributor

Can we make this configurable, @julienrf?

Collaborator Author

Yes, good idea!

Contributor

okay, you can always use numSegments to override manually ...

Collaborator Author

> okay, you can always use numSegments to override manually ...

Yes, you can already set scanSegments manually; the logic below is used only when it is missing. We could also provide a segmentSize setting as an alternative to scanSegments to control the partitioning. The benefit would be that the same configuration (e.g. segmentSize: 100) could be used with different tables, and it would automatically compute an appropriate number of segments that fit the desired size, whereas with scanSegments you have to adapt the value to every table you migrate (because you want to adjust the number of segments according to each table's size).

Collaborator Author

I’ve created #146 to track this possible feature that we could implement in the future.

val numSegmentsForSize = {
val bytesPerSegment = 100 * 1024 * 1024
(currentTableSizeBytes / bytesPerSegment).ceil.intValue()
}
log.info(s"Would use ${numSegmentsForSize} segments for size")

val numSegmentsForThroughput =
(tableNormalizedReadThroughput / DynamoDBConstants.MIN_IO_PER_SEGMENT).intValue()
log.info(s"Would use ${numSegmentsForThroughput} segments for throughput")

// Take the smallest and fit to bounds
val numSegments =
math.max(
math.min(
math.min(numSegmentsForSize, numSegmentsForThroughput),
DynamoDBConstants.MAX_SCAN_SEGMENTS
),
DynamoDBConstants.MIN_SCAN_SEGMENTS
)
log.info(s"Using computed number of segments: ${numSegments}")
numSegments
}
}

}
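To make the segment-size discussion above concrete, here is a minimal sketch of the size-based arithmetic. The configurable `segmentSizeBytes` parameter is hypothetical (it corresponds to the possible `segmentSize` setting tracked in #146); the merged code hard-codes 100 MB and effectively truncates the division.

```scala
// Sketch only, not part of this PR: size-based segment count with a
// hypothetical configurable segment size (see #146). The real code also
// clamps the result between MIN_SCAN_SEGMENTS and MAX_SCAN_SEGMENTS and
// takes the minimum with the throughput-based count.
def segmentsForSize(tableSizeBytes: Long,
                    segmentSizeBytes: Long = 100L * 1024 * 1024): Int =
  (tableSizeBytes / segmentSizeBytes).toInt

// Example from the review discussion: ~1.7 GB of data in ~100 MB segments
// gives 17 size-based segments; the reviewer observed the best throughput
// with 16 segments (a multiple of the cluster's 8 cores) of ~106 MB each.
segmentsForSize((1.7 * 1024 * 1024 * 1024).toLong) // 17
```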
@@ -3,9 +3,9 @@ package com.scylladb.migrator.readers
import com.amazonaws.services.dynamodbv2.model.TableDescription
import com.scylladb.migrator.DynamoUtils
import com.scylladb.migrator.DynamoUtils.{ setDynamoDBJobConf, setOptionalConf }
import com.scylladb.migrator.alternator.DynamoDBInputFormat
import com.scylladb.migrator.config.{ AWSCredentials, DynamoDBEndpoint, SourceSettings }
import org.apache.hadoop.dynamodb.{ DynamoDBConstants, DynamoDBItemWritable }
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.RDD
@@ -41,10 +41,46 @@ object DynamoDB {
maxMapTasks: Option[Int],
readThroughput: Option[Int],
throughputReadPercent: Option[Float]): (RDD[(Text, DynamoDBItemWritable)], TableDescription) = {
val description = DynamoUtils

val tableDescription = DynamoUtils
.buildDynamoClient(endpoint, credentials.map(_.toAWSCredentialsProvider), region)
.describeTable(table)
.getTable

val jobConf =
makeJobConf(
spark,
endpoint,
credentials,
region,
table,
scanSegments,
maxMapTasks,
readThroughput,
throughputReadPercent,
tableDescription)

val rdd =
spark.sparkContext.hadoopRDD(
jobConf,
classOf[DynamoDBInputFormat],
classOf[Text],
classOf[DynamoDBItemWritable])
(rdd, tableDescription)
}

private[migrator] def makeJobConf(
spark: SparkSession,
endpoint: Option[DynamoDBEndpoint],
credentials: Option[AWSCredentials],
region: Option[String],
table: String,
scanSegments: Option[Int],
maxMapTasks: Option[Int],
readThroughput: Option[Int],
throughputReadPercent: Option[Float],
description: TableDescription
): JobConf = {
val maybeItemCount = Option(description.getItemCount).map(_.toLong)
val maybeAvgItemSize =
for {
@@ -66,6 +102,7 @@
jobConf.set(DynamoDBConstants.INPUT_TABLE_NAME, table)
setOptionalConf(jobConf, DynamoDBConstants.ITEM_COUNT, maybeItemCount.map(_.toString))
setOptionalConf(jobConf, DynamoDBConstants.AVG_ITEM_SIZE, maybeAvgItemSize.map(_.toString))
setOptionalConf(jobConf, DynamoDBConstants.TABLE_SIZE_BYTES, Option(description.getTableSizeBytes).map(_.toString))
jobConf.set(
DynamoDBConstants.READ_THROUGHPUT,
readThroughput
@@ -76,13 +113,7 @@
DynamoDBConstants.THROUGHPUT_READ_PERCENT,
throughputReadPercent.map(_.toString))

val rdd =
spark.sparkContext.hadoopRDD(
jobConf,
classOf[DynamoDBInputFormat],
classOf[Text],
classOf[DynamoDBItemWritable])
(rdd, description)
jobConf
}

}
@@ -0,0 +1,79 @@
package com.scylladb.migrator.alternator

import com.amazonaws.services.dynamodbv2.model.{BillingMode, BillingModeSummary, ProvisionedThroughputDescription, TableDescription}
import com.scylladb.migrator.readers.DynamoDB
import org.apache.spark.sql.SparkSession

class DynamoDBInputFormatTest extends munit.FunSuite {

val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()

val GB: Long = 1024 * 1024 * 1024

test("no configured scanSegments in on-demand billing mode") {
checkPartitions(10)(tableSizeBytes = 1 * GB, tableProvisionedThroughput = None)
}
Comment on lines +13 to +15
Collaborator Author

By default, a table containing 1 GB of data and using on-demand billing will be split into 10 chunks of 100 MB each.


test("no configured scanSegments in on-demand billing mode and table size is 100 GB") {
checkPartitions(1024)(tableSizeBytes = 100 * GB, tableProvisionedThroughput = None)
}
Comment on lines +17 to +19
Collaborator Author

Similarly, a table containing 100 GB of data will be split into 1024 chunks of 100 MB each.
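As a quick sanity check of the expected partition counts in the two tests above (this check is not part of the suite), the size-based split divides the table size by 100 MB segments using Long integer division, so a 1 GB table maps to 10 segments (10.24 truncated) and a 100 GB table to exactly 1024:

```scala
val bytesPerSegment = 100L * 1024 * 1024                        // 104857600 bytes
assert((1L * 1024 * 1024 * 1024) / bytesPerSegment == 10L)      // 1 GB   -> 10
assert((100L * 1024 * 1024 * 1024) / bytesPerSegment == 1024L)  // 100 GB -> 1024
```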


test("no configured scanSegments in provisioned billing mode") {
checkPartitions(10)(tableSizeBytes = 1 * GB, tableProvisionedThroughput = Some((25, 25)))
}
Comment on lines +21 to +23
Collaborator Author

When the billing mode is provisioned, we still get 10 chunks in the current algorithm because there is no point in using a higher number of smaller chunks, AFAIU.


test("scanSegments = 42") {
checkPartitions(42)(configuredScanSegments = Some(42))
}
Comment on lines +25 to +27
Collaborator Author

You can also explicitly set the scanSegments in the configuration.

Contributor

42? :-D


test("scanSegements = 42 and maxMapTasks = 10") {
checkPartitions(10)(configuredScanSegments = Some(42), configuredMaxMapTasks = Some(10))
}
Comment on lines +29 to +31
Collaborator Author

Last, you can explicitly set both the number of scanSegments and maxMapTasks. In such a case, the number of task mappers will be bound to maxMapTasks.


def checkPartitions(expectedPartitions: Int)(
tableSizeBytes: Long = 0L,
tableProvisionedThroughput: Option[(Int, Int)] = None,
configuredScanSegments: Option[Int] = None,
configuredMaxMapTasks: Option[Int] = None,
configuredReadThroughput: Option[Int] = None,
configuredThroughputReadPercent: Option[Float] = None
): Unit = {
val tableDescription =
new TableDescription()
.withTableName("DummyTable")
.withTableSizeBytes(tableSizeBytes)
tableProvisionedThroughput match {
case Some((rcu, wcu)) =>
tableDescription.withProvisionedThroughput(
new ProvisionedThroughputDescription()
.withReadCapacityUnits(rcu)
.withWriteCapacityUnits(wcu)
)
case None =>
tableDescription.withProvisionedThroughput(new ProvisionedThroughputDescription())
.withBillingModeSummary(new BillingModeSummary().withBillingMode(BillingMode.PAY_PER_REQUEST))
}

val jobConf = DynamoDB.makeJobConf(
spark = spark,
endpoint = None,
credentials = None,
region = None,
table = "DummyTable",
scanSegments = configuredScanSegments,
maxMapTasks = configuredMaxMapTasks,
readThroughput = configuredReadThroughput,
throughputReadPercent = configuredThroughputReadPercent,
description = tableDescription
)
val splits = new DynamoDBInputFormat().getSplits(jobConf, 1)

val partitions = splits.length
assertEquals(partitions, expectedPartitions)
}

override def afterAll(): Unit = {
spark.stop()
}

}
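For reference, this suite can plausibly be run locally with `sbt "tests/testOnly com.scylladb.migrator.alternator.DynamoDBInputFormatTest"`, assuming the test lives in the `tests` subproject declared in build.sbt above and that the munit test framework is wired into that subproject.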