
Repeated errors with PartitionRange caused by query timeouts #304

Open
lemalucau-r7 opened this issue Sep 11, 2024 · 4 comments

Comments

@lemalucau-r7
After running a migration of a table from Cassandra 2.1 to 4, one or more partitions keep returning an error during the validation stage because the query keeps timing out:

24/09/11 14:01:00 ERROR DiffJobSession: Error with PartitionRange -- ThreadID: 121 Processing min: 139515770437584770019983589047024966756 max: 141217182272189462337300462084183807813
com.datastax.oss.driver.api.core.DriverTimeoutException: Query timed out after PT2M
        at com.datastax.oss.driver.api.core.DriverTimeoutException.copy(DriverTimeoutException.java:36)
        at com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures.getUninterruptibly(CompletableFutures.java:151)
        at com.datastax.oss.driver.internal.core.cql.MultiPageResultSet$RowIterator.maybeMoveToNextPage(MultiPageResultSet.java:101)
        at com.datastax.oss.driver.internal.core.cql.MultiPageResultSet$RowIterator.computeNext(MultiPageResultSet.java:93)
        at com.datastax.oss.driver.internal.core.cql.MultiPageResultSet$RowIterator.computeNext(MultiPageResultSet.java:81)
        at com.datastax.oss.driver.internal.core.util.CountingIterator.tryToComputeNext(CountingIterator.java:93)
        at com.datastax.oss.driver.internal.core.util.CountingIterator.hasNext(CountingIterator.java:88)
        at java.base/java.util.Iterator.forEachRemaining(Iterator.java:132)
        at com.datastax.oss.driver.internal.core.cql.PagingIterableSpliterator.forEachRemaining(PagingIterableSpliterator.java:120)
        at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
        at com.datastax.cdm.job.DiffJobSession.getDataAndDiff(DiffJobSession.java:153)
        at com.datastax.cdm.job.DiffJobSession.processSlice(DiffJobSession.java:124)
        at com.datastax.cdm.job.DiffJobSession.processSlice(DiffJobSession.java:54)
        at com.datastax.cdm.job.DiffData$.$anonfun$execute$5(DiffData.scala:33)
        at com.datastax.cdm.job.DiffData$.$anonfun$execute$5$adapted(DiffData.scala:31)
        at com.datastax.spark.connector.cql.CassandraConnector.$anonfun$withSessionDo$1(CassandraConnector.scala:104)
        at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:121)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:103)
        at com.datastax.cdm.job.DiffData$.$anonfun$execute$4(DiffData.scala:31)
        at com.datastax.cdm.job.DiffData$.$anonfun$execute$4$adapted(DiffData.scala:30)
        at com.datastax.spark.connector.cql.CassandraConnector.$anonfun$withSessionDo$1(CassandraConnector.scala:104)
        at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:121)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:103)
        at com.datastax.cdm.job.DiffData$.$anonfun$execute$3(DiffData.scala:30)
        at com.datastax.cdm.job.DiffData$.$anonfun$execute$3$adapted(DiffData.scala:29)
        at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
        at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1031)
        at org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1031)
        at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2433)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
        at org.apache.spark.scheduler.Task.run(Task.scala:141)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)

But I took a look at the final record counts and there are no mismatches or missing records, so it looks like the validation/migration ran perfectly?

24/09/11 14:02:36 INFO JobCounter: ################################################################################################
24/09/11 14:02:36 INFO JobCounter: Final Read Record Count: 144824
24/09/11 14:02:36 INFO JobCounter: Final Mismatch Record Count: 0
24/09/11 14:02:36 INFO JobCounter: Final Corrected Mismatch Record Count: 0
24/09/11 14:02:36 INFO JobCounter: Final Missing Record Count: 0
24/09/11 14:02:36 INFO JobCounter: Final Corrected Missing Record Count: 0
24/09/11 14:02:36 INFO JobCounter: Final Valid Record Count: 144824
24/09/11 14:02:36 INFO JobCounter: Final Skipped Record Count: 0
24/09/11 14:02:36 INFO JobCounter: ################################################################################################

I've tried validating those particular partitions on their own, and processing smaller chunks of them, but I still get the same error.
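
(For reference, this is roughly how I targeted the failing range, with the min/max tokens copied from the PartitionRange error above. The partition filter property names are from my copy of cdm-detailed.properties, so they may differ between CDM versions:)

spark.cdm.filter.cassandra.partition.min  139515770437584770019983589047024966756
spark.cdm.filter.cassandra.partition.max  141217182272189462337300462084183807813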

How do you deal with this issue? Also, how can I be sure that the final record count is accurate if there was an error with one or more partitions?

Any help would be appreciated, thank you!

@pravinbhat
Collaborator

pravinbhat commented Sep 12, 2024

Hi @lemalucau-r7

Can you use the latest version (4.3.10) of CDM? We recently addressed an issue related to count mismatches when a partition error happens. As for the failure itself, I think your cluster may have a large partition. I would recommend using the below property to handle large partitions; you can start with a value of 100 (the default is 1000) and check if it works. Let us know what you find.

spark.cdm.perfops.fetchSizeInRows 100
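
For reference, here is roughly how that override fits into a validation run. The keyspace/table value and jar name below are placeholders for your own setup (DiffData is the validation job class from your stack trace):

spark-submit --properties-file cdm.properties \
  --conf spark.cdm.schema.origin.keyspaceTable="my_keyspace.my_table" \
  --conf spark.cdm.perfops.fetchSizeInRows=100 \
  --master "local[*]" \
  --class com.datastax.cdm.job.DiffData cassandra-data-migrator-4.3.10.jar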

@lemalucau-r7
Author

Hi @pravinbhat, I tried again with the latest version of CDM and used the spark.cdm.perfops.fetchSizeInRows property you suggested, but the query was still timing out and giving me partition range errors.

I tried a lot of different variations of spark.cdm.perfops.fetchSizeInRows and other perfops properties, but still no luck. The partition must be too large to process before the query times out.

Since it failed to process the partition, does that mean there is data missing in the migrated table?

@pravinbhat
Collaborator

Hi @lemalucau-r7,
When you ran with the new version of CDM, did it report the error counts for the failed partitions this time? It is likely that both the validation & migration jobs are having trouble with a few partitions that are extra large; however, both jobs should report the correct error counts. Does it fail even when you use a very small value (e.g. 10) for fetchSizeInRows?

You could try to find the details about large partitions using the DSBulk tool.
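
Something like the below should print the largest partitions in the table (assuming DSBulk 1.x's count mode with partition stats; double-check the option names with dsbulk count --help, and substitute your own keyspace/table):

dsbulk count -k my_keyspace -t my_table \
  --stats.modes partitions --stats.numPartitions 20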

@lemalucau-r7
Author

lemalucau-r7 commented Oct 17, 2024

After running with the latest version of CDM, it reported the error counts correctly this time, which was useful. Yeah, it's definitely having issues due to the partition sizes. Also, I did try very small values for fetchSizeInRows but no luck. I ended up leaving it since the data in the table wasn't very important, and I haven't had any large-partition issues with the other tables I migrated, so I'm happy with closing this issue.

Thanks for taking a look, and for the link to the DSBulk tool. If I run into this issue again, I'll definitely use it to find out more about the large partitions.
