Do not try to infer a schema when migrating from DynamoDB to Alternator #105
Conversation
val item = itemWritable.getItem
for (rename <- renames) {
  item.put(rename.to, item.get(rename.from))
  item.remove(rename.from)
}
OK, I guess this would work; it could also be split out properly and shouldn't add much overhead, I suppose.
I wonder how the Cassandra DataFrame connector handles it (maybe they have the same code in their connector).
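For illustration only, a minimal sketch of factoring the rename step into its own helper (the helper name and signature are assumptions, not code from this PR):

// Hypothetical helper: applies the configured renames in place on the raw item map
// returned by DynamoDBItemWritable.getItem (value type kept generic on purpose).
def applyRenames[V](item: java.util.Map[String, V], renames: List[Rename]): Unit =
  for (rename <- renames) {
    item.put(rename.to, item.get(rename.from))
    item.remove(rename.from)
  }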
src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala (outdated review thread, resolved)
@julienrf please continue here
Force-pushed from c0d557a to 1f4fafa.
val rdd = msgs.collect {
  case Some(item) => (new Text(), new DynamoDBItemWritable(item))
}
// FIXME re-partition by columns? msgs.repartition(???)
In the previous implementation, we used to repartition the items by partitionColumns. I am not sure why this was useful. Repartitioning by columns does not seem to be natively supported on RDDs (only on DataFrames), so I didn’t try to achieve something “equivalent” at the RDD level.
It's likely due to shuffle.
The problem is that when you start reading partitions in order, you saturate only the single CPU that owns the sequential tokens (partition keys). If you want to maximize parallelism, you need various random tokens being read in parallel; that keeps all CPUs busy, because of how partitions (token ranges) are divided between them.
So if we can shuffle the tasks created on top of the RDD, we would achieve the same thing. Is that doable? (I know Spark 3 tries to improve this even more to avoid the need for an explicit shuffle, but RDDs might need it, especially if their partitioner generates tasks in token-range order: 64-bit signed long IDs from smallest to largest, sequentially per task, will definitely create bottlenecks when executing such tasks in parallel.)
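For reference, RDDs do not support column-aware repartitioning, but a plain repartition does perform a full shuffle that redistributes elements (roughly) randomly across the new partitions, which is what breaks up the sequential token ranges; a minimal sketch, where the partition count is a placeholder:

// repartition() always shuffles; elements are spread round-robin from a random
// starting offset, so consecutive token ranges no longer land on the same executor.
val shuffled = rdd.repartition(128) // 128 is a placeholder partition count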
https://docs.aws.amazon.com/prescriptive-guidance/latest/tuning-aws-glue-for-apache-spark/parallelize-tasks.html explains it.
So repartitioning is needed to get more partitions, basically 2-3× more than the number of CPUs.
So again it's about parallelism and the size of the tasks, which should ideally be shuffled.
Thank you for the pointer.
What was previously done is what is described on that page:
However, when writing a partitioned table, this can lead to an explosion of files (each partition can potentially generate a file into each table partition). To avoid this, you can repartition your DataFrame by column. This uses the table partition columns so the data is organized before writing. You can specify a higher number of partitions without getting small files on the table partitions. However, be careful to avoid data skew, in which some partition values end up with most of the data and delay the completion of the task.
However, we cannot repartition by column anymore since we no longer have a schema. I can try the “2-3× more than CPUs” rule, though.
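A minimal sketch of applying that rule, assuming we derive the count from Spark's default parallelism (which normally equals the total number of executor cores):

// 2-3× more partitions than cores keeps every core busy and smooths out skew.
val totalCores = spark.sparkContext.defaultParallelism
val rebalanced = rdd.repartition(3 * totalCores)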
This should eventually be a tunable; after all, it will end up being the number of map-reduce tasks on the RDD side, which will impact the overall speed and performance of the solution.
@pdbossman, we recently figured out a good split rate. Do you have the maths behind it handy? Can you share that tuning (related to the DynamoDB side)?
OK, I think Pat and I came to this regarding the number of partitions:
https://github.com/audienceproject/spark-dynamodb/blob/816c6e6d8a250a2d8b700761b94a198a872a7ea6/src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala#L64
But I just noticed it's for the stream... do we even care?
Maybe we do: if there is a huge amount of writes to the original table, we need to scale out for them and eventually be ready.
I am not sure how important this is, to be honest. I did it because the existing implementation does something similar:
scylla-migrator/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala (line 95 in a7d43e0):
.repartition(partitionColumns: _*)
I expect it's about scalability for high throughput, i.e. if streaming ends up doing lots of writes.
ok, I think me and Pat came to this re num partitions: https://github.com/audienceproject/spark-dynamodb/blob/816c6e6d8a250a2d8b700761b94a198a872a7ea6/src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala#L64
Here is the relevant piece of code from the link:
val numPartitions = parameters.get("readpartitions").map(_.toInt).getOrElse({
  // Default: one partition per maxPartitionBytes of table data, at least one...
  val sizeBased = (tableSize / maxPartitionBytes).toInt max 1
  // ...rounded up to the next multiple of the available parallelism.
  val remainder = sizeBased % parallelism
  if (remainder > 0) sizeBased + (parallelism - remainder)
  else sizeBased
})
One challenge to implement it in the context of streamed changes is that there is no fast and accurate way to get the size of an RDD. We would also have to introduce more settings to get e.g. the maxPartitionBytes value.
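Since there is no cheap way to measure the size of each micro-batch, a size-independent fallback could be used instead; a minimal sketch, where maybeConfiguredPartitions stands for a hypothetical setting, not an existing migrator option:

// Use the explicit setting when provided, otherwise fall back to a multiple of the cores.
val numPartitions = maybeConfiguredPartitions
  .getOrElse(3 * spark.sparkContext.defaultParallelism)
val repartitioned = rdd.repartition(numPartitions)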
Force-pushed from a67be0f to 5a03158.
This is a work in progress to address scylladb#103. Instead of using `com.audienceproject:spark-dynamodb` to migrate the data, we use `com.amazon.emr:emr-dynamodb-hadoop` as described in https://aws.amazon.com/fr/blogs/big-data/analyze-your-data-on-amazon-dynamodb-with-apache-spark/. The former approach loaded the data as a `DataFrame`, which required us to infer the data schema, but that was not doable in a reliable way. The new approach still benefits from the Spark infrastructure to handle the data transfer efficiently, but the data is loaded as an `RDD`. To achieve this, I had to de-unify the migrators for Scylla and Alternator (so, in a sense, I am undoing what was done in scylladb#23). The benefit is that the Alternator migrator is no longer constrained by the requirements of the Scylla migrator.
Force-pushed from 6e6704c to 1db4c85.
The RDD keys are not serializable, which can make some RDD operations fail. We create the RDD element keys _after_ repartitioning to avoid them being serialized across partitions. This change allowed me to successfully run a data migration with stream changes enabled. Such a scenario cannot be added to our test suite, though, because KCL only works with the real AWS servers (see scylladb#113).
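A minimal sketch of that ordering, assuming msgs is an RDD of optional raw item maps inside the streaming foreachRDD and numPartitions has been chosen beforehand:

import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.io.Text

val rdd = msgs
  .collect { case Some(item) => item }  // keep only actual items; plain item maps are serializable
  .repartition(numPartitions)           // the shuffle happens here, before any Hadoop Writable exists
  .map(item => (new Text(), new DynamoDBItemWritable(item)))  // create the non-serializable keys last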
Force-pushed from 359a0e1 to afcbfd8.
)
.dynamodb(source.table)
val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration)
def setOptionalConf(name: String, maybeValue: Option[String]): Unit =
Why? All of the values below are plain strings; why this map and the conversion?
OK, I see.
Maybe we don't want to duplicate this. Is there some util where we can put it, so that the setOptionalConf function is not duplicated below?
Done in 8e9728d.
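For illustration, a minimal sketch of what such a shared helper could look like (the object name and location are assumptions, not necessarily what 8e9728d does):

import org.apache.hadoop.mapred.JobConf

object DynamoUtils {
  // Set a Hadoop configuration entry only when a value is actually provided.
  def setOptionalConf(jobConf: JobConf, name: String, maybeValue: Option[String]): Unit =
    for (value <- maybeValue) {
      jobConf.set(name, value)
    }
}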
case (acc, Rename(from, to)) => acc.withColumnRenamed(from, to)
val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration)
def setOptionalConf(name: String, maybeValue: Option[String]): Unit =
dedup if possible
Done in 8e9728d.
Merging, thank you @julienrf!
Fixes #103.
Instead of using com.audienceproject:spark-dynamodb to migrate the data, we use com.amazon.emr:emr-dynamodb-hadoop as described in https://aws.amazon.com/fr/blogs/big-data/analyze-your-data-on-amazon-dynamodb-with-apache-spark/.
The former approach loaded the data as a DataFrame, which required us to infer the data schema, but that was not doable in a reliable way. The new approach still benefits from the Spark infrastructure to handle the data transfer efficiently, but the data is loaded as an RDD.
To achieve this, I had to de-unify the migrators for Scylla and Alternator (so, in a sense, I am undoing what was done in #23). The benefit is that the Alternator migrator is no longer constrained by the requirements of the Scylla migrator.
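For context, a minimal sketch of reading a DynamoDB table as an RDD of DynamoDBItemWritable with emr-dynamodb-hadoop, along the lines of the AWS blog post linked above (table name, region, and endpoint are placeholders):

import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("dynamodb-to-alternator").getOrCreate()

val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration)
jobConf.set("dynamodb.input.tableName", "my-table")                   // placeholder table name
jobConf.set("dynamodb.regionid", "us-east-1")                         // placeholder region
jobConf.set("dynamodb.endpoint", "dynamodb.us-east-1.amazonaws.com")  // placeholder endpoint

// No schema inference: each record is the raw DynamoDB item wrapped in a Hadoop Writable.
val rdd = spark.sparkContext.hadoopRDD(
  jobConf,
  classOf[DynamoDBInputFormat],
  classOf[Text],
  classOf[DynamoDBItemWritable])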