From 8c91fcec50ab18ddabaaf97d2461a32f8ce4fb44 Mon Sep 17 00:00:00 2001 From: bareketamir Date: Tue, 19 Nov 2024 23:26:59 +0200 Subject: [PATCH] Fix error file not found. tmp file is deleted before inserting rows to DB in VerticaToMySQLOperator bulk (#44028) * Fix error file not found. tmp file is deleted before inserting rows to DB in VerticaToMySQLOperator bulk . * fix the mock_get_conn function , so it will really mock the data and will return empty results * fix ruff-format --------- Co-authored-by: Amir.Ba --- .../mysql/transfers/vertica_to_mysql.py | 29 +++++++++---------- .../mysql/transfers/test_vertica_to_mysql.py | 26 ++++++++++------- 2 files changed, 30 insertions(+), 25 deletions(-) diff --git a/providers/src/airflow/providers/mysql/transfers/vertica_to_mysql.py b/providers/src/airflow/providers/mysql/transfers/vertica_to_mysql.py index ee821b38da4de..e13ee7bddadb7 100644 --- a/providers/src/airflow/providers/mysql/transfers/vertica_to_mysql.py +++ b/providers/src/airflow/providers/mysql/transfers/vertica_to_mysql.py @@ -141,21 +141,20 @@ def _bulk_load_transfer(self, mysql, vertica): count += 1 tmpfile.flush() - self._run_preoperator(mysql) - try: - self.log.info("Bulk inserting rows into MySQL...") - with closing(mysql.get_conn()) as conn, closing(conn.cursor()) as cursor: - cursor.execute( - f"LOAD DATA LOCAL INFILE '{tmpfile.name}' " - f"INTO TABLE {self.mysql_table} " - f"LINES TERMINATED BY '\r\n' ({', '.join(selected_columns)})" - ) - conn.commit() - tmpfile.close() - self.log.info("Inserted rows into MySQL %s", count) - except (MySQLdb.Error, MySQLdb.Warning): - self.log.info("Inserted rows into MySQL 0") - raise + self._run_preoperator(mysql) + try: + self.log.info("Bulk inserting rows into MySQL...") + with closing(mysql.get_conn()) as conn, closing(conn.cursor()) as cursor: + cursor.execute( + f"LOAD DATA LOCAL INFILE '{tmpfile.name}' " + f"INTO TABLE {self.mysql_table} " + f"LINES TERMINATED BY '\r\n' ({', '.join(selected_columns)})" + ) + conn.commit() + self.log.info("Inserted rows into MySQL %s", count) + except (MySQLdb.Error, MySQLdb.Warning): + self.log.info("Inserted rows into MySQL 0") + raise def _run_preoperator(self, mysql): if self.mysql_preoperator: diff --git a/providers/tests/mysql/transfers/test_vertica_to_mysql.py b/providers/tests/mysql/transfers/test_vertica_to_mysql.py index 7656a036449f8..da43d38a74e2e 100644 --- a/providers/tests/mysql/transfers/test_vertica_to_mysql.py +++ b/providers/tests/mysql/transfers/test_vertica_to_mysql.py @@ -31,17 +31,23 @@ def mock_get_conn(): + class MockCol: + def __init__(self, name): + self.name = name + + col_a = MockCol(name="a") + col_b = MockCol(name="b") + col_c = MockCol(name="c") + commit_mock = mock.MagicMock() - cursor_mock = mock.MagicMock( - execute=[], - fetchall=[["1", "2", "3"]], - description=["a", "b", "c"], - iterate=[["1", "2", "3"]], - ) - conn_mock = mock.MagicMock( - commit=commit_mock, - cursor=cursor_mock, - ) + cursor_mock = mock.MagicMock(description=[col_a, col_b, col_c]) + cursor_mock.execute.return_value = [] + cursor_mock.fetchall.return_value = [["1", "2", "3"]] + cursor_mock.iterate.return_value = [["1", "2", "3"]] + conn_mock = mock.MagicMock() + conn_mock.commit.return_value = commit_mock + conn_mock.cursor.return_value = cursor_mock + return conn_mock