
Set the number of Hadoop task mappers to the number of scan segments #143

Merged

tarzanek merged 1 commit into scylladb:master from throughput-tuning on May 30, 2024

Conversation

julienrf (Collaborator):

Also:

  • remove the constraint on the number of task mappers based on the memory of the worker nodes
  • by default split the data into tasks of 100 MB each

Fixes #130

* - use as many partitions as the number of scan segments
* - by default, create segments that split the data into 100 MB chunks
*/
class DynamoDBInputFormat extends org.apache.hadoop.dynamodb.read.DynamoDBInputFormat {

julienrf (Collaborator, Author):

Our class overrides the base implementation of the methods getSplits and getNumSegments. Our implementation is based on the original implementation available here, but with some variations:

https://github.com/awslabs/emr-dynamodb-connector/blob/07391dd0937bdbb20b86ec79444798df11b0711f/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/read/AbstractDynamoDBInputFormat.java

s"Using number of segments configured using ${DynamoDBConstants.SCAN_SEGMENTS}: ${numSegments}")
numSegments
} else {
// split into segments of at most 100 MB each (note: upstream implementation splits into 1 GB segments)

julienrf (Collaborator, Author):

The upstream implementation uses segments of 1 GB each.

In my tests, the throughput was highest with n segments of about 100 MB each, where n is also divisible by the total number of CPUs of the Spark cluster (e.g., with 8 cores, the best throughput was observed with 16 segments of about 106 MB each when transferring 1.7 GB of data in total).
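
For illustration, here is a minimal, hedged sketch of the size-based default described above, assuming binary units and simple integer division (the object and method names are invented for this example and are not the PR's actual identifiers; the real code may round or clamp differently):

object SegmentSizing {
  // Assumed default target of ~100 MB per scan segment, as discussed above.
  val DefaultSegmentSizeBytes: Long = 100L * 1024 * 1024

  // Derive a segment count so that each segment covers roughly 100 MB of data.
  // With this rounding, a 1 GB table yields 10 segments and a 100 GB table
  // yields 1024 segments, which is consistent with the tests further below.
  def defaultNumSegments(tableSizeBytes: Long): Int =
    math.max(1, (tableSizeBytes / DefaultSegmentSizeBytes).toInt)
}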

Contributor:

If you use faster CPU, memory, and network, 100 MB might need to be bigger.

Also, for big DBs you might end up with lots of tasks, which will require more memory on the driver and master.

Contributor:

Can we make this configurable, @julienrf?

julienrf (Collaborator, Author):

Yes, good idea!

tarzanek (Contributor) commented on May 29, 2024:

okay, you can always use numSegments to override manually ...

julienrf (Collaborator, Author):

> okay, you can always use numSegments to override manually ...

Yes, you can already set scanSegments manually; the logic below is only used when it is missing. We could also provide a segmentSize setting as an alternative to scanSegments for controlling the partitioning. The benefit would be that the same configuration (e.g. segmentSize: 100) could be reused across different tables, and it would automatically compute an appropriate number of segments of the desired size, whereas with scanSegments you have to adapt the value to every table you migrate (because you want to adjust the number of segments to each table's size).
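
To make the idea concrete, here is a purely hypothetical sketch of what such a segmentSize setting could compute (neither the setting nor these names exist in this PR; the idea is only tracked as a possible future feature):

object SegmentSizeSetting {
  // Hypothetical: derive the number of scan segments from the table size and a
  // user-provided target segment size in MB, instead of a per-table scanSegments value.
  def segmentsForTargetSize(tableSizeBytes: Long, segmentSizeMB: Int): Int =
    math.max(1, (tableSizeBytes / (segmentSizeMB.toLong * 1024 * 1024)).toInt)
}

With this, the same configuration (e.g. segmentSize = 100) would give about 20 segments for a 2 GB table and about 512 segments for a 50 GB table, without adapting the value to each table.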

julienrf (Collaborator, Author):

I’ve created #146 to track this possible feature that we could implement in the future.

Comment on lines +13 to +15
test("no configured scanSegments in on-demand billing mode") {
checkPartitions(10)(tableSizeBytes = 1 * GB, tableProvisionedThroughput = None)
}

julienrf (Collaborator, Author):

By default, a table containing 1 GB of data and using on-demand billing will be split into 10 chunks of 100 MB each.

Comment on lines +17 to +19
test("no configured scanSegments in on-demand billing mode and table size is 100 GB") {
checkPartitions(1024)(tableSizeBytes = 100 * GB, tableProvisionedThroughput = None)
}

julienrf (Collaborator, Author):

Similarly, a table containing 100 GB of data will be split into 1024 chunks of 100 MB each.
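
For reference, assuming binary units and the 100 MB default discussed above: 1 GB = 1024 MB, which gives about 10 segments of roughly 100 MB each, and 100 GB = 102,400 MB, which gives 102,400 / 100 = 1024 segments.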

Comment on lines +21 to +23
test("no configured scanSegments in provisioned billing mode") {
checkPartitions(10)(tableSizeBytes = 1 * GB, tableProvisionedThroughput = Some((25, 25)))
}

julienrf (Collaborator, Author):

When the billing mode is provisioned, the current algorithm still produces 10 chunks because, as far as I understand, there is no point in using a higher number of smaller chunks.

Comment on lines +25 to +27
test("scanSegments = 42") {
checkPartitions(42)(configuredScanSegments = Some(42))
}

julienrf (Collaborator, Author):

You can also explicitly set the scanSegments in the configuration.

Contributor:

42? :-D

Comment on lines +29 to +31
test("scanSegements = 42 and maxMapTasks = 10") {
checkPartitions(10)(configuredScanSegments = Some(42), configuredMaxMapTasks = Some(10))
}

julienrf (Collaborator, Author):

Lastly, you can explicitly set both scanSegments and maxMapTasks. In that case, the number of task mappers will be bounded by maxMapTasks.
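
As a minimal sketch of the capping behaviour this test exercises (the object and function names are invented for illustration, not taken from the PR):

object MapTasksBound {
  // When maxMapTasks is set, it caps the number of task mappers; otherwise the
  // number of scan segments is used directly. With scanSegments = 42 and
  // maxMapTasks = 10 this yields 10, as in the test above.
  def effectiveMapTasks(scanSegments: Int, maxMapTasks: Option[Int]): Int =
    maxMapTasks.fold(scanSegments)(math.min(scanSegments, _))
}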

julienrf force-pushed the throughput-tuning branch from 5da8518 to e01e94e on May 23, 2024 at 14:54.
guy9 (Collaborator) commented on May 26, 2024:

Thanks @julienrf.
@pdbossman, please review.

pdbossman (Contributor):
I concur with the comments above that 100 MB is commonly used to determine partition size, and the ability to set scanSegments gives the user the control they need.

What I'm really waiting for is @GeoffMontee's successful test with a large number of partitions on our large dataset.

tarzanek merged commit 7187a2f into scylladb:master on May 30, 2024. 1 check passed.

julienrf deleted the throughput-tuning branch on May 30, 2024 at 20:53.
Development

Successfully merging this pull request may close these issues.

Migration from DynamoDB falls back to single partition