Skip to content

Commit

Permalink
Provide option to force_delete for GCSToBigQueryOperator (apache#…
Browse files Browse the repository at this point in the history
…43785)

* Adding a parameter to provide the option to force delete the destination table if it already exists.

* Adding a test for force_delete

* Update providers/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py

Co-authored-by: Shahar Epstein <[email protected]>

---------

Co-authored-by: Shahar Epstein <[email protected]>
  • Loading branch information
nathadfield and shahar1 authored Nov 11, 2024
1 parent c82a76e commit 606ef45
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ class GCSToBigQueryOperator(BaseOperator):
destination table is newly created. If the table already exists and a value different than the
current description is provided, the job will fail.
:param deferrable: Run operator in the deferrable mode
:param force_delete: Force the destination table to be deleted if it already exists.
"""

template_fields: Sequence[str] = (
Expand Down Expand Up @@ -231,6 +232,7 @@ def __init__(
force_rerun: bool = True,
reattach_states: set[str] | None = None,
project_id: str = PROVIDE_PROJECT_ID,
force_delete: bool = False,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand Down Expand Up @@ -296,6 +298,7 @@ def __init__(
self.force_rerun = force_rerun
self.reattach_states: set[str] = reattach_states or set()
self.cancel_on_kill = cancel_on_kill
self.force_delete = force_delete

self.source_uris: list[str] = []

Expand Down Expand Up @@ -378,7 +381,11 @@ def execute(self, context: Context):
max_id = self._find_max_value_in_column()
return max_id
else:
self.log.info("Using existing BigQuery table for storing data...")
if self.force_delete:
self.log.info("Deleting table %s", self.destination_project_dataset_table)
hook.delete_table(table_id=self.destination_project_dataset_table)
else:
self.log.info("Using existing BigQuery table for storing data...")
self.configuration = self._use_existing_table()

try:
Expand Down
25 changes: 25 additions & 0 deletions providers/tests/google/cloud/transfers/test_gcs_to_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -1946,3 +1946,28 @@ def create_context(self, task):
"task_instance": task_instance,
"logical_date": logical_date,
}

@mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
def test_force_delete_should_execute_successfully(self, hook):
hook.return_value.insert_job.side_effect = [
MagicMock(job_id=REAL_JOB_ID, error_result=False),
REAL_JOB_ID,
]
hook.return_value.generate_job_id.return_value = REAL_JOB_ID
hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
hook.return_value.get_job.return_value.result.return_value = ("1",)

operator = GCSToBigQueryOperator(
task_id=TASK_ID,
bucket=TEST_BUCKET,
source_objects=TEST_SOURCE_OBJECTS,
destination_project_dataset_table=TEST_EXPLICIT_DEST,
write_disposition=WRITE_DISPOSITION,
schema_fields=SCHEMA_FIELDS_INT,
autodetect=True,
project_id=JOB_PROJECT_ID,
force_delete=True,
)

operator.execute(context=MagicMock())
hook.return_value.delete_table.assert_called_once_with(table_id=TEST_EXPLICIT_DEST)

0 comments on commit 606ef45

Please sign in to comment.