Skip to content

Commit

Permalink
Preserve tags on FileSourceScanExec (#10461)
Browse files Browse the repository at this point in the history
Signed-off-by: Jason Lowe <[email protected]>
  • Loading branch information
jlowe authored Feb 22, 2024
1 parent 3d16605 commit 3f6196d
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2023, NVIDIA CORPORATION.
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -339,7 +339,9 @@ abstract class Spark31XShims extends Spark31Xuntil33XShims with Logging {
override def tagPlanForGpu(): Unit = GpuFileSourceScanExec.tagSupport(this)

override def convertToCpu(): SparkPlan = {
wrapped.copy(partitionFilters = partitionFilters)
val cpu = wrapped.copy(partitionFilters = partitionFilters)
cpu.copyTagsFrom(wrapped)
cpu
}

override def convertToGpu(): GpuExec = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -85,7 +85,9 @@ class BatchScanExecMeta(p: BatchScanExec,
}

override def convertToCpu(): SparkPlan = {
wrapped.copy(runtimeFilters = runtimeFilters)
val cpu = wrapped.copy(runtimeFilters = runtimeFilters)
cpu.copyTagsFrom(wrapped)
cpu
}

override def convertToGpu(): GpuExec =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -82,7 +82,9 @@ class FileSourceScanExecMeta(plan: FileSourceScanExec,
override def tagPlanForGpu(): Unit = ScanExecShims.tagGpuFileSourceScanExecSupport(this)

override def convertToCpu(): SparkPlan = {
wrapped.copy(partitionFilters = partitionFilters)
val cpu = wrapped.copy(partitionFilters = partitionFilters)
cpu.copyTagsFrom(wrapped)
cpu
}

override def convertToGpu(): GpuExec = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -126,7 +126,9 @@ class FileSourceScanExecMeta(plan: FileSourceScanExec,
}

override def convertToCpu(): SparkPlan = {
wrapped.copy(partitionFilters = partitionFilters, dataFilters = dataFilters)
val cpu = wrapped.copy(partitionFilters = partitionFilters, dataFilters = dataFilters)
cpu.copyTagsFrom(wrapped)
cpu
}

override def convertToGpu(): GpuExec = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -76,7 +76,9 @@ class BatchScanExecMeta(p: BatchScanExec,
}

override def convertToCpu(): SparkPlan = {
wrapped.copy(runtimeFilters = runtimeFilters)
val cpu = wrapped.copy(runtimeFilters = runtimeFilters)
cpu.copyTagsFrom(wrapped)
cpu
}

override def convertToGpu(): GpuExec =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -75,7 +75,9 @@ class BatchScanExecMeta(p: BatchScanExec,
}

override def convertToCpu(): SparkPlan = {
wrapped.copy(runtimeFilters = runtimeFilters)
val cpu = wrapped.copy(runtimeFilters = runtimeFilters)
cpu.copyTagsFrom(wrapped)
cpu
}

override def convertToGpu(): GpuExec = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -73,7 +73,9 @@ class BatchScanExecMeta(p: BatchScanExec,
}

override def convertToCpu(): SparkPlan = {
wrapped.copy(runtimeFilters = runtimeFilters)
val cpu = wrapped.copy(runtimeFilters = runtimeFilters)
cpu.copyTagsFrom(wrapped)
cpu
}

override def convertToGpu(): GpuExec = {
Expand Down

0 comments on commit 3f6196d

Please sign in to comment.