From cf28b9f90ca61787253a2cc75abb2b8b9780327b Mon Sep 17 00:00:00 2001 From: Colin Date: Tue, 3 Dec 2024 20:36:25 -0800 Subject: [PATCH 01/10] support retryable exceptions during query execution --- dbt/adapters/sql/connections.py | 85 +++++++++++++++++++++++++++------ 1 file changed, 70 insertions(+), 15 deletions(-) diff --git a/dbt/adapters/sql/connections.py b/dbt/adapters/sql/connections.py index baccddc9f..f5c0f03a2 100644 --- a/dbt/adapters/sql/connections.py +++ b/dbt/adapters/sql/connections.py @@ -1,23 +1,25 @@ import abc import time -from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, TYPE_CHECKING +from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, TYPE_CHECKING, Callable, Type, Union from dbt_common.events.contextvars import get_node_info from dbt_common.events.functions import fire_event -from dbt_common.exceptions import DbtInternalError, NotImplementedError +from dbt_common.exceptions import DbtInternalError, NotImplementedError, DbtRuntimeError from dbt_common.utils import cast_to_str from dbt.adapters.base import BaseConnectionManager +from dbt.adapters.base.connections import SleepTime from dbt.adapters.contracts.connection import ( AdapterResponse, Connection, ConnectionState, ) +from dbt.adapters.events.logging import AdapterLogger from dbt.adapters.events.types import ( ConnectionUsed, SQLCommit, SQLQuery, - SQLQueryStatus, + SQLQueryStatus, AdapterEventDebug, ) if TYPE_CHECKING: @@ -56,11 +58,14 @@ def cancel_open(self) -> List[str]: return names def add_query( - self, - sql: str, - auto_begin: bool = True, - bindings: Optional[Any] = None, - abridge_sql_log: bool = False, + self, + sql: str, + auto_begin: bool = True, + bindings: Optional[Any] = None, + abridge_sql_log: bool = False, + retryable_exceptions: Iterable[Type[Exception]] = [], + retry_limit: int = 1, + retry_timeout: Union[Callable[[int], SleepTime], SleepTime] = 1, ) -> Tuple[Connection, Any]: connection = self.get_thread_connection() if auto_begin and connection.transaction_open is False: @@ -90,7 +95,14 @@ def add_query( pre = time.perf_counter() cursor = connection.handle.cursor() - cursor.execute(sql, bindings) + self._retryable_cursor_execute( + execute_fn=cursor.execute, + sql=sql, + bindings=bindings, + retryable_exceptions=retryable_exceptions, + retry_limit=retry_limit, + retry_timeout=retry_timeout + ) result = self.get_response(cursor) @@ -113,7 +125,7 @@ def get_response(cls, cursor: Any) -> AdapterResponse: @classmethod def process_results( - cls, column_names: Iterable[str], rows: Iterable[Any] + cls, column_names: Iterable[str], rows: Iterable[Any] ) -> Iterator[Dict[str, Any]]: unique_col_names = dict() # type: ignore[var-annotated] for idx in range(len(column_names)): # type: ignore[arg-type] @@ -145,11 +157,11 @@ def get_result_from_cursor(cls, cursor: Any, limit: Optional[int]) -> "agate.Tab return table_from_data_flat(data, column_names) def execute( - self, - sql: str, - auto_begin: bool = False, - fetch: bool = False, - limit: Optional[int] = None, + self, + sql: str, + auto_begin: bool = False, + fetch: bool = False, + limit: Optional[int] = None, ) -> Tuple[AdapterResponse, "agate.Table"]: from dbt_common.clients.agate_helper import empty_table @@ -199,3 +211,46 @@ def commit(self): connection.transaction_open = False return connection + + def _retryable_cursor_execute(self, + execute_fn: Callable, + sql: str, + bindings: Optional[Any] = None, + retryable_exceptions: Iterable[Type[Exception]] = [], + retry_limit: int = 1, + retry_timeout: Union[Callable[[int], SleepTime], SleepTime] = 1, + _attempts: int = 0, + ) -> None: + timeout = retry_timeout(_attempts) if callable(retry_timeout) else retry_timeout + if timeout < 0: + raise DbtRuntimeError( + "retry_timeout cannot be negative or return a negative time." + ) + + try: + execute_fn(sql, bindings) + + except tuple(retryable_exceptions) as e: + retry_limit -= 1 + if retry_limit <= 0: + raise e + fire_event( + AdapterEventDebug( + message=f"Got a retryable error {type(e)} when attempting to execute a query.\n" + f"{retry_limit} attempts remaining. Retrying in {timeout} seconds.\n" + f"Error:\n{e}" + ) + ) + + time.sleep(timeout) + return self._retryable_cursor_execute( + execute_fn=execute_fn, + sql=sql, + retry_limit=retry_limit - 1, + retry_timeout=retry_timeout, + retryable_exceptions=retryable_exceptions, + _attempts=_attempts + 1, + ) + + except Exception as e: + raise e From 1eeefc3544a94a92f831e299df3e45746613d20f Mon Sep 17 00:00:00 2001 From: VersusFacit <67295367+VersusFacit@users.noreply.github.com> Date: Wed, 4 Dec 2024 18:59:26 -0800 Subject: [PATCH 02/10] Add changelog. --- .changes/unreleased/Under the Hood-20241204-185912.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changes/unreleased/Under the Hood-20241204-185912.yaml diff --git a/.changes/unreleased/Under the Hood-20241204-185912.yaml b/.changes/unreleased/Under the Hood-20241204-185912.yaml new file mode 100644 index 000000000..5c7317031 --- /dev/null +++ b/.changes/unreleased/Under the Hood-20241204-185912.yaml @@ -0,0 +1,6 @@ +kind: Under the Hood +body: Add retry logic for retryable exceptions. +time: 2024-12-04T18:59:12.48816-08:00 +custom: + Author: 'colin-rogers-dbt ' + Issue: "368" From 128bb28581d606d070a4c1eca77011ad817007a9 Mon Sep 17 00:00:00 2001 From: VersusFacit <67295367+VersusFacit@users.noreply.github.com> Date: Wed, 4 Dec 2024 19:00:13 -0800 Subject: [PATCH 03/10] Lint. --- dbt/adapters/sql/connections.py | 85 +++++++++++++++++++-------------- 1 file changed, 48 insertions(+), 37 deletions(-) diff --git a/dbt/adapters/sql/connections.py b/dbt/adapters/sql/connections.py index f5c0f03a2..0c6797cf3 100644 --- a/dbt/adapters/sql/connections.py +++ b/dbt/adapters/sql/connections.py @@ -1,6 +1,18 @@ import abc import time -from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, TYPE_CHECKING, Callable, Type, Union +from typing import ( + Any, + Dict, + Iterable, + Iterator, + List, + Optional, + Tuple, + TYPE_CHECKING, + Callable, + Type, + Union, +) from dbt_common.events.contextvars import get_node_info from dbt_common.events.functions import fire_event @@ -14,12 +26,12 @@ Connection, ConnectionState, ) -from dbt.adapters.events.logging import AdapterLogger from dbt.adapters.events.types import ( ConnectionUsed, SQLCommit, SQLQuery, - SQLQueryStatus, AdapterEventDebug, + SQLQueryStatus, + AdapterEventDebug, ) if TYPE_CHECKING: @@ -58,14 +70,14 @@ def cancel_open(self) -> List[str]: return names def add_query( - self, - sql: str, - auto_begin: bool = True, - bindings: Optional[Any] = None, - abridge_sql_log: bool = False, - retryable_exceptions: Iterable[Type[Exception]] = [], - retry_limit: int = 1, - retry_timeout: Union[Callable[[int], SleepTime], SleepTime] = 1, + self, + sql: str, + auto_begin: bool = True, + bindings: Optional[Any] = None, + abridge_sql_log: bool = False, + retryable_exceptions: Iterable[Type[Exception]] = [], + retry_limit: int = 1, + retry_timeout: Union[Callable[[int], SleepTime], SleepTime] = 1, ) -> Tuple[Connection, Any]: connection = self.get_thread_connection() if auto_begin and connection.transaction_open is False: @@ -96,12 +108,12 @@ def add_query( cursor = connection.handle.cursor() self._retryable_cursor_execute( - execute_fn=cursor.execute, - sql=sql, - bindings=bindings, - retryable_exceptions=retryable_exceptions, - retry_limit=retry_limit, - retry_timeout=retry_timeout + execute_fn=cursor.execute, + sql=sql, + bindings=bindings, + retryable_exceptions=retryable_exceptions, + retry_limit=retry_limit, + retry_timeout=retry_timeout, ) result = self.get_response(cursor) @@ -125,7 +137,7 @@ def get_response(cls, cursor: Any) -> AdapterResponse: @classmethod def process_results( - cls, column_names: Iterable[str], rows: Iterable[Any] + cls, column_names: Iterable[str], rows: Iterable[Any] ) -> Iterator[Dict[str, Any]]: unique_col_names = dict() # type: ignore[var-annotated] for idx in range(len(column_names)): # type: ignore[arg-type] @@ -157,11 +169,11 @@ def get_result_from_cursor(cls, cursor: Any, limit: Optional[int]) -> "agate.Tab return table_from_data_flat(data, column_names) def execute( - self, - sql: str, - auto_begin: bool = False, - fetch: bool = False, - limit: Optional[int] = None, + self, + sql: str, + auto_begin: bool = False, + fetch: bool = False, + limit: Optional[int] = None, ) -> Tuple[AdapterResponse, "agate.Table"]: from dbt_common.clients.agate_helper import empty_table @@ -212,20 +224,19 @@ def commit(self): return connection - def _retryable_cursor_execute(self, - execute_fn: Callable, - sql: str, - bindings: Optional[Any] = None, - retryable_exceptions: Iterable[Type[Exception]] = [], - retry_limit: int = 1, - retry_timeout: Union[Callable[[int], SleepTime], SleepTime] = 1, - _attempts: int = 0, - ) -> None: + def _retryable_cursor_execute( + self, + execute_fn: Callable, + sql: str, + bindings: Optional[Any] = None, + retryable_exceptions: Iterable[Type[Exception]] = [], + retry_limit: int = 1, + retry_timeout: Union[Callable[[int], SleepTime], SleepTime] = 1, + _attempts: int = 0, + ) -> None: timeout = retry_timeout(_attempts) if callable(retry_timeout) else retry_timeout if timeout < 0: - raise DbtRuntimeError( - "retry_timeout cannot be negative or return a negative time." - ) + raise DbtRuntimeError("retry_timeout cannot be negative or return a negative time.") try: execute_fn(sql, bindings) @@ -237,8 +248,8 @@ def _retryable_cursor_execute(self, fire_event( AdapterEventDebug( message=f"Got a retryable error {type(e)} when attempting to execute a query.\n" - f"{retry_limit} attempts remaining. Retrying in {timeout} seconds.\n" - f"Error:\n{e}" + f"{retry_limit} attempts remaining. Retrying in {timeout} seconds.\n" + f"Error:\n{e}" ) ) From 17945f81c3f8c76bf600ad0be8b8f533ab67aa7c Mon Sep 17 00:00:00 2001 From: VersusFacit <67295367+VersusFacit@users.noreply.github.com> Date: Mon, 16 Dec 2024 02:50:03 -0800 Subject: [PATCH 04/10] Backtrack some of the abstraction and make this simpler. Still pipe through retry number to query execution. --- dbt/adapters/sql/connections.py | 90 ++++++++++++++++----------------- 1 file changed, 44 insertions(+), 46 deletions(-) diff --git a/dbt/adapters/sql/connections.py b/dbt/adapters/sql/connections.py index 0c6797cf3..fc6c02106 100644 --- a/dbt/adapters/sql/connections.py +++ b/dbt/adapters/sql/connections.py @@ -77,8 +77,48 @@ def add_query( abridge_sql_log: bool = False, retryable_exceptions: Iterable[Type[Exception]] = [], retry_limit: int = 1, - retry_timeout: Union[Callable[[int], SleepTime], SleepTime] = 1, ) -> Tuple[Connection, Any]: + + """ + Retry function encapsulated here to avoid commitment to some + user-facing interface. Right now, Redshift commits to a 1 second + retry timeout so this serves as a default. + """ + def __execute_query_with_retry( + cursor: Any, + sql: str, + bindings: Optional[Any], + retryable_exceptions: Tuple[Type[Exception], ...], + retry_limit: int, + attempt: int, + ): + """ + A success sees the try exit cleanly and avoid any recursive + retries. Failure begins a sleep and retry routine. + """ + try: + cursor.execute(sql, bindings) + except retryable_exceptions as e: + # Cease retries and fail when limit is hit. + if attempt >= retry_limit: + raise e + + fire_event( + AdapterEventDebug( + message=f"Got a retryable error {type(e)}. {retry_limit-attempt} retries left. Retrying in 1 second.\nError:\n{e}" + ) + ) + time.sleep(1) + + return self.__execute_query_with_retry( + cursor=cursor, + sql=sql, + bindings=bindings, + retryable_exceptions=retryable_exceptions, + retry_limit=retry_limit, + attempt=1, + ) + connection = self.get_thread_connection() if auto_begin and connection.transaction_open is False: self.begin() @@ -107,13 +147,13 @@ def add_query( pre = time.perf_counter() cursor = connection.handle.cursor() - self._retryable_cursor_execute( - execute_fn=cursor.execute, + self.__execute_query_with_retry( + cursor=cursor, sql=sql, bindings=bindings, retryable_exceptions=retryable_exceptions, retry_limit=retry_limit, - retry_timeout=retry_timeout, + attempt=1, ) result = self.get_response(cursor) @@ -223,45 +263,3 @@ def commit(self): connection.transaction_open = False return connection - - def _retryable_cursor_execute( - self, - execute_fn: Callable, - sql: str, - bindings: Optional[Any] = None, - retryable_exceptions: Iterable[Type[Exception]] = [], - retry_limit: int = 1, - retry_timeout: Union[Callable[[int], SleepTime], SleepTime] = 1, - _attempts: int = 0, - ) -> None: - timeout = retry_timeout(_attempts) if callable(retry_timeout) else retry_timeout - if timeout < 0: - raise DbtRuntimeError("retry_timeout cannot be negative or return a negative time.") - - try: - execute_fn(sql, bindings) - - except tuple(retryable_exceptions) as e: - retry_limit -= 1 - if retry_limit <= 0: - raise e - fire_event( - AdapterEventDebug( - message=f"Got a retryable error {type(e)} when attempting to execute a query.\n" - f"{retry_limit} attempts remaining. Retrying in {timeout} seconds.\n" - f"Error:\n{e}" - ) - ) - - time.sleep(timeout) - return self._retryable_cursor_execute( - execute_fn=execute_fn, - sql=sql, - retry_limit=retry_limit - 1, - retry_timeout=retry_timeout, - retryable_exceptions=retryable_exceptions, - _attempts=_attempts + 1, - ) - - except Exception as e: - raise e From 165bf8b2012c8f9ebb04b05bf1d50bb7bd3cd1d6 Mon Sep 17 00:00:00 2001 From: VersusFacit <67295367+VersusFacit@users.noreply.github.com> Date: Mon, 16 Dec 2024 02:54:46 -0800 Subject: [PATCH 05/10] Update types. --- dbt/adapters/sql/connections.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/dbt/adapters/sql/connections.py b/dbt/adapters/sql/connections.py index fc6c02106..5582cafad 100644 --- a/dbt/adapters/sql/connections.py +++ b/dbt/adapters/sql/connections.py @@ -9,9 +9,7 @@ Optional, Tuple, TYPE_CHECKING, - Callable, Type, - Union, ) from dbt_common.events.contextvars import get_node_info @@ -75,7 +73,7 @@ def add_query( auto_begin: bool = True, bindings: Optional[Any] = None, abridge_sql_log: bool = False, - retryable_exceptions: Iterable[Type[Exception]] = [], + retryable_exceptions: Tuple[Type[Exception], ...] = [], retry_limit: int = 1, ) -> Tuple[Connection, Any]: From 657b427cae88b71ac9808ec46502509d316f97f4 Mon Sep 17 00:00:00 2001 From: VersusFacit <67295367+VersusFacit@users.noreply.github.com> Date: Mon, 16 Dec 2024 03:14:09 -0800 Subject: [PATCH 06/10] made them private, should be protected. --- dbt/adapters/sql/connections.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dbt/adapters/sql/connections.py b/dbt/adapters/sql/connections.py index 5582cafad..48de47ecc 100644 --- a/dbt/adapters/sql/connections.py +++ b/dbt/adapters/sql/connections.py @@ -82,7 +82,7 @@ def add_query( user-facing interface. Right now, Redshift commits to a 1 second retry timeout so this serves as a default. """ - def __execute_query_with_retry( + def _execute_query_with_retry( cursor: Any, sql: str, bindings: Optional[Any], @@ -108,7 +108,7 @@ def __execute_query_with_retry( ) time.sleep(1) - return self.__execute_query_with_retry( + return self._execute_query_with_retry( cursor=cursor, sql=sql, bindings=bindings, @@ -145,7 +145,7 @@ def __execute_query_with_retry( pre = time.perf_counter() cursor = connection.handle.cursor() - self.__execute_query_with_retry( + self._execute_query_with_retry( cursor=cursor, sql=sql, bindings=bindings, From 49ddd1ffbbcbd38e17573c9aaf6092d1b60645d9 Mon Sep 17 00:00:00 2001 From: VersusFacit <67295367+VersusFacit@users.noreply.github.com> Date: Mon, 16 Dec 2024 03:17:37 -0800 Subject: [PATCH 07/10] Python things. --- dbt/adapters/sql/connections.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbt/adapters/sql/connections.py b/dbt/adapters/sql/connections.py index 48de47ecc..64e4b0957 100644 --- a/dbt/adapters/sql/connections.py +++ b/dbt/adapters/sql/connections.py @@ -108,7 +108,7 @@ def _execute_query_with_retry( ) time.sleep(1) - return self._execute_query_with_retry( + return _execute_query_with_retry( cursor=cursor, sql=sql, bindings=bindings, @@ -145,7 +145,7 @@ def _execute_query_with_retry( pre = time.perf_counter() cursor = connection.handle.cursor() - self._execute_query_with_retry( + _execute_query_with_retry( cursor=cursor, sql=sql, bindings=bindings, From 2610936f8f7a8bd201b718bba1bb8b87e861abb0 Mon Sep 17 00:00:00 2001 From: VersusFacit <67295367+VersusFacit@users.noreply.github.com> Date: Mon, 16 Dec 2024 04:16:02 -0800 Subject: [PATCH 08/10] Fix flake errors. --- dbt/adapters/sql/connections.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/dbt/adapters/sql/connections.py b/dbt/adapters/sql/connections.py index 64e4b0957..e57114d6c 100644 --- a/dbt/adapters/sql/connections.py +++ b/dbt/adapters/sql/connections.py @@ -14,11 +14,10 @@ from dbt_common.events.contextvars import get_node_info from dbt_common.events.functions import fire_event -from dbt_common.exceptions import DbtInternalError, NotImplementedError, DbtRuntimeError +from dbt_common.exceptions import DbtInternalError, NotImplementedError from dbt_common.utils import cast_to_str from dbt.adapters.base import BaseConnectionManager -from dbt.adapters.base.connections import SleepTime from dbt.adapters.contracts.connection import ( AdapterResponse, Connection, @@ -73,15 +72,15 @@ def add_query( auto_begin: bool = True, bindings: Optional[Any] = None, abridge_sql_log: bool = False, - retryable_exceptions: Tuple[Type[Exception], ...] = [], + retryable_exceptions: Tuple[Type[Exception], ...] = tuple(), retry_limit: int = 1, ) -> Tuple[Connection, Any]: - """ Retry function encapsulated here to avoid commitment to some user-facing interface. Right now, Redshift commits to a 1 second retry timeout so this serves as a default. """ + def _execute_query_with_retry( cursor: Any, sql: str, From 8f447c31db83d8540ced77af1695731d46d8fefe Mon Sep 17 00:00:00 2001 From: VersusFacit <67295367+VersusFacit@users.noreply.github.com> Date: Mon, 16 Dec 2024 13:58:11 -0800 Subject: [PATCH 09/10] Readd increment to avoid infinite recursion. --- dbt/adapters/sql/connections.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/sql/connections.py b/dbt/adapters/sql/connections.py index e57114d6c..140533625 100644 --- a/dbt/adapters/sql/connections.py +++ b/dbt/adapters/sql/connections.py @@ -113,7 +113,7 @@ def _execute_query_with_retry( bindings=bindings, retryable_exceptions=retryable_exceptions, retry_limit=retry_limit, - attempt=1, + attempt=attempt+1, ) connection = self.get_thread_connection() From 94f4e11c6690c6c2ddddb8b9608164a6dcb7f315 Mon Sep 17 00:00:00 2001 From: Colin Date: Tue, 17 Dec 2024 09:03:06 -0800 Subject: [PATCH 10/10] formatting --- dbt/adapters/sql/connections.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/sql/connections.py b/dbt/adapters/sql/connections.py index 140533625..04b5e4014 100644 --- a/dbt/adapters/sql/connections.py +++ b/dbt/adapters/sql/connections.py @@ -113,7 +113,7 @@ def _execute_query_with_retry( bindings=bindings, retryable_exceptions=retryable_exceptions, retry_limit=retry_limit, - attempt=attempt+1, + attempt=attempt + 1, ) connection = self.get_thread_connection()