Set the number of Hadoop task mappers to the number of scan segments #143
Conversation
 * - use as many partitions as the number of scan segments
 * - by default, create segments that split the data into 128 MB chunks
 */
class DynamoDBInputFormat extends org.apache.hadoop.dynamodb.read.DynamoDBInputFormat {
Our class overrides the base implementation of the getSplits and getNumSegments methods. Our implementation is based on the original implementation available here, but with some variations.
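For illustration only, here is a minimal sketch of the shape such an override could take. The class name, import paths, parameter names and order, and the rounding are assumptions (not copied from this PR or from the upstream connector); only the two overridden method names come from the description above.
import org.apache.hadoop.dynamodb.DynamoDBConstants
import org.apache.hadoop.mapred.{ InputSplit, JobConf }

// Sketch only: the parameter lists of getSplits and getNumSegments are
// assumptions about the upstream mapred-based API, not the PR's exact code.
class SketchDynamoDBInputFormat extends org.apache.hadoop.dynamodb.read.DynamoDBInputFormat {

  override def getSplits(conf: JobConf, desiredSplits: Int): Array[InputSplit] =
    // The real implementation re-derives the number of segments and creates
    // as many partitions as scan segments; this sketch just delegates.
    super.getSplits(conf, desiredSplits)

  // Assumed signature: compute the number of scan segments for a table of the
  // given size, honoring an explicitly configured value when present.
  override def getNumSegments(readThroughput: Int,
                              writeThroughput: Int,
                              tableSizeBytes: Long,
                              conf: JobConf): Int =
    Option(conf.get(DynamoDBConstants.SCAN_SEGMENTS)).map(_.toInt).getOrElse {
      val targetSegmentSize = 100L * 1024 * 1024 // ~100 MB, as discussed below
      // The exact rounding used by the PR is an assumption here.
      math.max(1, math.round(tableSizeBytes.toDouble / targetSegmentSize).toInt)
    }
}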
s"Using number of segments configured using ${DynamoDBConstants.SCAN_SEGMENTS}: ${numSegments}") | ||
numSegments | ||
} else { | ||
// split into segments of at most 100 MB each (note: upstream implementation splits into 1 GB segments) |
The upstream implementation uses segments of 1 GB each. In my tests, the throughput was highest with n segments of about 100 MB each, where n is also divisible by the total number of CPUs of the Spark cluster (e.g., in my case I had 8 cores, and the best throughput was observed with 16 segments of 106 MB each to transfer 1.7 GB of data in total).
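A small sketch of that heuristic (my reconstruction of the comment above, not code from this PR): aim for ~100 MB segments, then snap the count to a multiple of the cluster's core count.
// Reconstruction of the heuristic described in the comment above; the
// constant and the rounding strategy are assumptions, not the PR's code.
object SegmentHeuristicSketch {
  private val TargetSegmentSizeBytes = 100L * 1024 * 1024 // ~100 MB per segment

  def suggestedSegments(tableSizeBytes: Long, totalCores: Int): Int = {
    val cores  = math.max(1, totalCores)
    val bySize = math.max(1.0, tableSizeBytes.toDouble / TargetSegmentSizeBytes)
    // Snap to the nearest multiple of the core count so every core processes
    // the same number of segments (at least one segment per core).
    math.max(cores, math.round(bySize / cores).toInt * cores)
  }

  // The example from the comment: 1.7 GB on an 8-core cluster gives roughly 17
  // segments by size, which snaps to 16 segments of ~106 MB each.
}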
If you use faster CPUs, memory, and network, the optimal size might be bigger than 100 MB. Also, for big DBs you might end up with lots of tasks, which will require more memory on the driver and master.
Can we make this configurable, @julienrf?
Yes, good idea!
BTW, the audienceproject connector had a similar calculation, see
https://github.com/audienceproject/spark-dynamodb/blob/816c6e6d8a250a2d8b700761b94a198a872a7ea6/src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala#L64
It seems they used ~128 MB (maxPartitionBytes).
okay, you can always use numSegments to override manually ...
> okay, you can always use numSegments to override manually ...

Yes, you can already manually set scanSegments, and the logic below is used only when it is missing. We could also provide a segmentSize setting as an alternative to scanSegments for controlling the partitioning. The benefit would be that the same configuration (e.g. segmentSize: 100) could be used with different tables, and it would automatically compute an appropriate number of segments matching the desired size. Whereas with scanSegments you have to adapt its value to every table you migrate (because you want to adjust the number of segments according to each table's size).
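To illustrate the idea, here is a rough sketch of how a hypothetical segmentSize setting could work (neither the setting nor this helper exists in the migrator; the feature is only tracked in #146):
// Hypothetical sketch only: `segmentSizeMB` is an imagined configuration value,
// and the rounding is an assumption chosen to match the test expectations
// quoted below (10 segments for 1 GB, 1024 segments for 100 GB).
def segmentsForSize(tableSizeBytes: Long, segmentSizeMB: Int): Int = {
  val segmentSizeBytes = segmentSizeMB.toLong * 1024 * 1024
  math.max(1, math.round(tableSizeBytes.toDouble / segmentSizeBytes).toInt)
}

// With `segmentSize: 100`, a 1 GB table would yield 10 segments and a 100 GB
// table 1024 segments, without having to tune scanSegments per table.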
I’ve created #146 to track this possible feature that we could implement in the future.
test("no configured scanSegments in on-demand billing mode") { | ||
checkPartitions(10)(tableSizeBytes = 1 * GB, tableProvisionedThroughput = None) | ||
} |
By default, a table containing 1 GB of data and using on-demand billing will be split into 10 chunks of 100 MB each.
test("no configured scanSegments in on-demand billing mode and table size is 100 GB") { | ||
checkPartitions(1024)(tableSizeBytes = 100 * GB, tableProvisionedThroughput = None) | ||
} |
Similarly, a table containing 100 GB of data will be split into 1024 chunks of 100 MB each.
test("no configured scanSegments in provisioned billing mode") { | ||
checkPartitions(10)(tableSizeBytes = 1 * GB, tableProvisionedThroughput = Some((25, 25))) | ||
} |
When the billing mode is provisioned, we still get 10 chunks in the current algorithm because there is no point in using a higher number of smaller chunks, AFAIU.
test("scanSegments = 42") { | ||
checkPartitions(42)(configuredScanSegments = Some(42)) | ||
} |
You can also explicitly set scanSegments in the configuration.
42? :-D
test("scanSegements = 42 and maxMapTasks = 10") { | ||
checkPartitions(10)(configuredScanSegments = Some(42), configuredMaxMapTasks = Some(10)) | ||
} |
Lastly, you can explicitly set both scanSegments and maxMapTasks. In that case, the number of task mappers will be bounded by maxMapTasks.
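As a one-line sketch of that capping behaviour (names are illustrative, not the PR's exact code):
// Illustrative only: bound the number of task mappers by maxMapTasks when it
// is set, as in the test above (min(42, 10) = 10 task mappers).
def effectiveMapTasks(scanSegments: Int, maxMapTasks: Option[Int]): Int =
  maxMapTasks.fold(scanSegments)(math.min(scanSegments, _))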
Also:
- remove the constraint on the number of task mappers based on the memory of the worker nodes
- by default, split the data into tasks of 100 MB each

Fixes scylladb#130
Force-pushed from 5da8518 to e01e94e.
Thanks @julienrf.
I concur with the comments above that 100 MB is commonly used to determine partition size, and the ability to set scanSegments gives users the control they need. What I'm really waiting for is @GeoffMontee's successful test with a large number of partitions on our large dataset.