Do not try to infer a schema when migrating from DynamoDB to Alternator #105

Merged
merged 7 commits into scylladb:master from discard-schema-inference on Mar 18, 2024

Conversation

@julienrf (Collaborator) commented Mar 1, 2024:

Fixes #103.

Instead of using com.audienceproject:spark-dynamodb to migrate the data, we use com.amazon.emr:emr-dynamodb-hadoop as described in https://aws.amazon.com/fr/blogs/big-data/analyze-your-data-on-amazon-dynamodb-with-apache-spark/.

The former approach loaded the data as a DataFrame, which required us to infer the data schema, and that could not be done reliably.

The new approach still benefits from the Spark infrastructure to handle the data transfer efficiently, but the data is loaded as an RDD.

To achieve this, I had to de-unify the migrators for Scylla and Alternator (so, in a sense, I am undoing what was done in #23). The benefit is that the Alternator migrator is no longer constrained by the requirements of the Scylla migrator.
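
For context, here is a minimal sketch of reading a DynamoDB table as an RDD with emr-dynamodb-hadoop, following the AWS blog post linked above; the table name and region are placeholders, and the actual wiring in this PR may differ:

  import org.apache.hadoop.dynamodb.DynamoDBItemWritable
  import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
  import org.apache.hadoop.io.Text
  import org.apache.hadoop.mapred.JobConf
  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().appName("alternator-migration-sketch").getOrCreate()

  val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration)
  jobConf.set("dynamodb.input.tableName", "my-source-table") // placeholder table name
  jobConf.set("dynamodb.regionid", "us-east-1")              // placeholder region
  jobConf.set("mapred.input.format.class", classOf[DynamoDBInputFormat].getName)

  // Each element is a (key, item) pair holding the raw DynamoDB attribute values,
  // so no schema has to be inferred.
  val rdd = spark.sparkContext.hadoopRDD(
    jobConf,
    classOf[DynamoDBInputFormat],
    classOf[Text],
    classOf[DynamoDBItemWritable])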

// Apply the configured renames in place on the raw DynamoDB item
val item = itemWritable.getItem
for (rename <- renames) {
  item.put(rename.to, item.get(rename.from))
  item.remove(rename.from)
}
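
As a sketch of where this loop could sit, assuming Rename(from, to) is the migrator's existing rename configuration class, the renames can be applied to every element of the RDD with a plain transformation (the helper name applyRenames is illustrative):

  import org.apache.hadoop.dynamodb.DynamoDBItemWritable

  // Hypothetical wrapper around the loop shown above; it mutates the item in
  // place and returns the same writable.
  def applyRenames(itemWritable: DynamoDBItemWritable, renames: Seq[Rename]): DynamoDBItemWritable = {
    val item = itemWritable.getItem
    for (rename <- renames) {
      item.put(rename.to, item.get(rename.from))
      item.remove(rename.from)
    }
    itemWritable
  }

  // Applied over the whole RDD of (Text, DynamoDBItemWritable) pairs:
  val renamedRdd = rdd.mapValues(itemWritable => applyRenames(itemWritable, renames))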
Contributor:
ok, I guess this would work; it could also be properly split and won't have much overhead, I suppose
wondering how the Cassandra DF handles it (maybe they have the same code in their connector)

@tarzanek (Contributor) commented Mar 5, 2024:

@julienrf please continue here
the goal is to get rid of the audienceproject dependency (fork)
and ideally also get rid of our Kinesis fork

@julienrf force-pushed the discard-schema-inference branch from c0d557a to 1f4fafa on March 8, 2024 at 13:38
val rdd = msgs.collect {
  case Some(item) => (new Text(), new DynamoDBItemWritable(item))
}
// FIXME re-partition by columns? msgs.repartition(???)
@julienrf (author):

In the previous implementation, we used to repartition the items by partitionColumns. I am not sure why this was useful. Repartitioning by columns does not seem to be natively supported on RDDs (only on DataFrames), so I didn’t try to achieve something “equivalent” at the RDD level.

@tarzanek (Contributor) Mar 8, 2024:

it's likely due to shuffle

the problem is that when you start reading partitions in order, you will saturate only the single CPU that owns the sequential tokens (PKs)
if you want to maximize parallelism, you need various random tokens being read in parallel - this keeps ALL CPUs busy, because of how partitions (token ranges) are divided between them

so if we can shuffle the tasks created on top of the RDD, we would achieve the same - is this doable? (I know Spark 3 tries to improve this further to avoid the need for an explicit shuffle, but RDDs might need it, especially if their partitioner generates tasks in token-range order - 64-bit signed long IDs from smallest to biggest, sequentially per task - which will definitely create bottlenecks when executing such tasks in parallel)
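
A shuffle at the RDD level can be forced with repartition, which redistributes the elements randomly across the requested number of partitions; a minimal sketch, reusing the msgs RDD from the snippet above:

  // Placeholder count; see the discussion below about how to choose it.
  val targetPartitions = 100
  // repartition() always performs a full shuffle, so elements coming from
  // sequential token ranges end up spread across all the resulting tasks
  // instead of being processed strictly in token order.
  val shuffled = msgs.repartition(targetPartitions)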

Contributor:

https://docs.aws.amazon.com/prescriptive-guidance/latest/tuning-aws-glue-for-apache-spark/parallelize-tasks.html explains it
so repartitioning is needed to get more partitions, basically 2-3x more than the number of CPUs
so again it's about parallelism and the size of the tasks, which should ideally be shuffled

@julienrf (author):

Thank you for the pointer.

What was previously done is what is described on that page:

However, when writing a partitioned table, this can lead to an explosion of files (each partition can potentially generate a file into each table partition). To avoid this, you can repartition your DataFrame by column. This uses the table partition columns so the data is organized before writing. You can specify a higher number of partitions without getting small files on the table partitions. However, be careful to avoid data skew, in which some partition values end up with most of the data and delay the completion of the task.

However, we cannot repartition by column anymore since we no longer have a schema. I can try the “2-3× more than CPUs” rule, though.
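
A minimal sketch of that rule, assuming the cluster's default parallelism (roughly the total number of executor cores) is a reasonable proxy for the CPU count:

  // "2-3x more partitions than CPUs"
  val targetPartitions = spark.sparkContext.defaultParallelism * 3
  val repartitioned = msgs.repartition(targetPartitions)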

Contributor:

this should eventually be a tunable; after all, in the end this will be the number of map/reduce tasks on the RDD side,
which will impact the overall speed and performance of the solution

@pdbossman we recently figured out a good split rate - do you have the maths behind it handy? Can you share that tuning (related to the DynamoDB side)?

Contributor:

but I just noticed it's for the stream ... do we even care?
but maybe we do ... if there is a huge amount of writes to the original table, we need to scale out for them and eventually be ready

@julienrf (author):

I am not sure how important this is, to be honest. I did it because the existing implementation does something similar:

Contributor:

I expect it's about scalability for high throughput - if streaming does lots of writes

@julienrf (author):

ok, I think Pat and I came to this re: num partitions: https://github.com/audienceproject/spark-dynamodb/blob/816c6e6d8a250a2d8b700761b94a198a872a7ea6/src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala#L64

Here is the relevant piece of code from the link:

  val numPartitions = parameters.get("readpartitions").map(_.toInt).getOrElse({
    val sizeBased = (tableSize / maxPartitionBytes).toInt max 1
    val remainder = sizeBased % parallelism
    if (remainder > 0) sizeBased + (parallelism - remainder)
    else sizeBased
  })

One challenge in implementing this for streamed changes is that there is no fast and accurate way to get the size of an RDD. We would also have to introduce more settings, e.g. a maxPartitionBytes value.
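
Note that the linked formula rounds a size-based partition count up to a multiple of the parallelism. For the initial table scan (as opposed to streamed changes), the table size could in principle be obtained from DynamoDB itself rather than from the RDD; a sketch using the AWS SDK for Java v1, where the table name and the maxPartitionBytes value are placeholders:

  import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder

  val dynamoDb = AmazonDynamoDBClientBuilder.defaultClient()
  // TableSizeBytes is only refreshed by DynamoDB roughly every six hours,
  // so this is an approximation of the current table size.
  val tableSizeBytes: Long = dynamoDb.describeTable("my-source-table").getTable.getTableSizeBytes
  val maxPartitionBytes = 128L * 1024 * 1024 // placeholder: target bytes per partition
  val sizeBasedPartitions = (tableSizeBytes / maxPartitionBytes).toInt max 1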

@julienrf force-pushed the discard-schema-inference branch 2 times, most recently from a67be0f to 5a03158 on March 12, 2024 at 09:50
This is a work in progress to address scylladb#103.

@julienrf force-pushed the discard-schema-inference branch from 6e6704c to 1db4c85 on March 12, 2024 at 16:06
The RDD keys are not serializable, which can fail some RDD operations.

We create the RDD element keys _after_ repartitioning to avoid them being serialized across partitions.

This change allowed me to successfully run a data migration with stream changes enabled. Such a scenario cannot be added to our test suite, though, because KCL only works with the real AWS servers (see scylladb#113).
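
A minimal sketch of the ordering described in this commit message, with illustrative names (items stands for the RDD of deserialized DynamoDB items): repartition the plain, serializable items first, and only then build the Hadoop key/value pairs inside each partition:

  // The items themselves can be shuffled freely.
  val repartitioned = items.repartition(targetPartitions)
  // Text is a Hadoop Writable, not java.io.Serializable, so the keys are only
  // created after the shuffle, inside each partition.
  val rdd = repartitioned.map(item => (new Text(), new DynamoDBItemWritable(item)))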
@julienrf force-pushed the discard-schema-inference branch from 359a0e1 to afcbfd8 on March 13, 2024 at 12:29
@julienrf marked this pull request as ready for review on March 13, 2024 at 12:29
@julienrf requested a review from tarzanek on March 13, 2024 at 12:30
)
.dynamodb(source.table)
val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration)
def setOptionalConf(name: String, maybeValue: Option[String]): Unit =
Contributor:

why?
all the values below are plain strings

why this map and the converting?

@tarzanek (Contributor) Mar 13, 2024:

ok, I see
maybe we don't want to duplicate this; is there some util where we can put it so the setOptionalConf function is not duplicated below?

@julienrf (author):

Done in 8e9728d.
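
The actual refactoring is in commit 8e9728d; a sketch of what such a shared helper might look like, with a hypothetical DynamoUtils object as its home:

  import org.apache.hadoop.mapred.JobConf

  // Hypothetical shared utility so the source and target configuration code
  // do not each define their own setOptionalConf.
  object DynamoUtils {
    def setOptionalConf(jobConf: JobConf, name: String, maybeValue: Option[String]): Unit =
      for (value <- maybeValue) jobConf.set(name, value)
  }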

case (acc, Rename(from, to)) => acc.withColumnRenamed(from, to)
val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration)

def setOptionalConf(name: String, maybeValue: Option[String]): Unit =
Contributor:

dedup if possible

@julienrf (author):

Done in 8e9728d.

@tarzanek (Contributor):

merging, thank you @julienrf!

@tarzanek merged commit b4c60d5 into scylladb:master on Mar 18, 2024
1 check passed
@julienrf deleted the discard-schema-inference branch on April 18, 2024 at 08:31