From d095f4125f76a600b73bf32e90d889b783248899 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 4 Aug 2024 21:59:01 +0000 Subject: [PATCH] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tap_github/authenticator.py | 22 +- tap_github/tests/test_authenticator.py | 292 +++++++++++++++---------- 2 files changed, 196 insertions(+), 118 deletions(-) diff --git a/tap_github/authenticator.py b/tap_github/authenticator.py index 75b4ca69..4a16c7cc 100644 --- a/tap_github/authenticator.py +++ b/tap_github/authenticator.py @@ -102,7 +102,7 @@ def generate_jwt_token( github_app_id: str, github_private_key: str, expiration_time: int = 600, - algorithm: str = "RS256" + algorithm: str = "RS256", ) -> str: actual_time = int(time.time()) @@ -189,12 +189,16 @@ def claim_token(self): '":app_id:;;-----BEGIN RSA PRIVATE KEY-----\\n_YOUR_P_KEY_\\n-----END RSA PRIVATE KEY-----"' ) - self.token, self.token_expires_at = generate_app_access_token(self.github_app_id, self.github_private_key, self.github_installation_id) + self.token, self.token_expires_at = generate_app_access_token( + self.github_app_id, self.github_private_key, self.github_installation_id + ) # Check if the token isn't valid. If not, overwrite it with None if not self.is_valid_token(): if self.logger: - self.logger.warning("An app token was generated but could not be validated.") + self.logger.warning( + "An app token was generated but could not be validated." + ) self.token = None self.token_expires_at = None @@ -235,7 +239,9 @@ def prepare_tokens(self) -> List[TokenManager]: token_managers: List[TokenManager] = [] for token in personal_tokens: - token_manager = PersonalTokenManager(token, rate_limit_buffer=rate_limit_buffer, logger=self.logger) + token_manager = PersonalTokenManager( + token, rate_limit_buffer=rate_limit_buffer, logger=self.logger + ) if token_manager.is_valid_token(): token_managers.append(token_manager) @@ -245,11 +251,15 @@ def prepare_tokens(self) -> List[TokenManager]: # "{app_id};;{-----BEGIN RSA PRIVATE KEY-----\n_YOUR_PRIVATE_KEY_\n-----END RSA PRIVATE KEY-----}" env_key = env_dict["GITHUB_APP_PRIVATE_KEY"] try: - app_token_manager = AppTokenManager(env_key, rate_limit_buffer=rate_limit_buffer, logger=self.logger) + app_token_manager = AppTokenManager( + env_key, rate_limit_buffer=rate_limit_buffer, logger=self.logger + ) if app_token_manager.is_valid_token(): token_managers.append(app_token_manager) except ValueError as e: - self.logger.warn(f"An error was thrown while preparing an app token: {e}") + self.logger.warn( + f"An error was thrown while preparing an app token: {e}" + ) self.logger.info(f"Tap will run with {len(token_managers)} auth tokens") return token_managers diff --git a/tap_github/tests/test_authenticator.py b/tap_github/tests/test_authenticator.py index dd897026..e3917ce7 100644 --- a/tap_github/tests/test_authenticator.py +++ b/tap_github/tests/test_authenticator.py @@ -3,9 +3,14 @@ import pytest import requests - from singer_sdk.streams import RESTStream -from tap_github.authenticator import AppTokenManager, GitHubTokenAuthenticator, PersonalTokenManager, TokenManager + +from tap_github.authenticator import ( + AppTokenManager, + GitHubTokenAuthenticator, + PersonalTokenManager, + TokenManager, +) class TestTokenManager: @@ -118,44 +123,46 @@ def test_has_calls_remaining_fails_if_few_calls_remaining_and_reset_time_not_rea class TestAppTokenManager: def test_initialization_with_3_part_env_key(self): - with patch.object(AppTokenManager, 'claim_token', return_value=None): - token_manager = AppTokenManager('12345;;key\\ncontent;;67890') - assert token_manager.github_app_id == '12345' - assert token_manager.github_private_key == 'key\ncontent' - assert token_manager.github_installation_id == '67890' + with patch.object(AppTokenManager, "claim_token", return_value=None): + token_manager = AppTokenManager("12345;;key\\ncontent;;67890") + assert token_manager.github_app_id == "12345" + assert token_manager.github_private_key == "key\ncontent" + assert token_manager.github_installation_id == "67890" def test_initialization_with_2_part_env_key(self): - with patch.object(AppTokenManager, 'claim_token', return_value=None): - token_manager = AppTokenManager('12345;;key\\ncontent') - assert token_manager.github_app_id == '12345' - assert token_manager.github_private_key == 'key\ncontent' - assert token_manager.github_installation_id == '' + with patch.object(AppTokenManager, "claim_token", return_value=None): + token_manager = AppTokenManager("12345;;key\\ncontent") + assert token_manager.github_app_id == "12345" + assert token_manager.github_private_key == "key\ncontent" + assert token_manager.github_installation_id == "" def test_initialization_with_malformed_env_key(self): with pytest.raises(ValueError) as exc_info: - AppTokenManager('12345key\\ncontent') + AppTokenManager("12345key\\ncontent") assert str(exc_info.value) == ( - 'GITHUB_APP_PRIVATE_KEY could not be parsed. The expected format is ' + "GITHUB_APP_PRIVATE_KEY could not be parsed. The expected format is " '":app_id:;;-----BEGIN RSA PRIVATE KEY-----\\n_YOUR_P_KEY_\\n-----END RSA PRIVATE KEY-----"' ) def test_generate_token_with_invalid_credentials(self): - with patch.object(AppTokenManager, 'is_valid_token', return_value=False), \ - patch('tap_github.authenticator.generate_app_access_token', - return_value=('some_token', MagicMock())): - token_manager = AppTokenManager('12345;;key\\ncontent;;67890') + with patch.object(AppTokenManager, "is_valid_token", return_value=False), patch( + "tap_github.authenticator.generate_app_access_token", + return_value=("some_token", MagicMock()), + ): + token_manager = AppTokenManager("12345;;key\\ncontent;;67890") assert token_manager.token is None assert token_manager.token_expires_at is None def test_successful_token_generation(self): token_time = MagicMock() - with patch.object(AppTokenManager, 'is_valid_token', return_value=True), \ - patch('tap_github.authenticator.generate_app_access_token', - return_value=('valid_token', token_time)): - token_manager = AppTokenManager('12345;;key\\ncontent;;67890') + with patch.object(AppTokenManager, "is_valid_token", return_value=True), patch( + "tap_github.authenticator.generate_app_access_token", + return_value=("valid_token", token_time), + ): + token_manager = AppTokenManager("12345;;key\\ncontent;;67890") token_manager.claim_token() - assert token_manager.token == 'valid_token' + assert token_manager.token == "valid_token" assert token_manager.token_expires_at == token_time @@ -163,18 +170,17 @@ def test_successful_token_generation(self): def mock_stream(): stream = MagicMock(spec=RESTStream) stream.logger = MagicMock() - stream.tap_name = 'tap_github' - stream.config = { - 'rate_limit_buffer': 5 - } + stream.tap_name = "tap_github" + stream.config = {"rate_limit_buffer": 5} return stream class TestGitHubTokenAuthenticator: def test_prepare_tokens_returns_empty_if_none_found(self, mock_stream): - with patch('os.environ', {'GITHUB_TLJKJFDS': 'gt1'}), \ - patch.object(PersonalTokenManager, 'is_valid_token', return_value=True): + with patch("os.environ", {"GITHUB_TLJKJFDS": "gt1"}), patch.object( + PersonalTokenManager, "is_valid_token", return_value=True + ): auth = GitHubTokenAuthenticator(stream=mock_stream) token_managers = auth.prepare_tokens() @@ -182,156 +188,214 @@ def test_prepare_tokens_returns_empty_if_none_found(self, mock_stream): assert len(token_managers) == 0 def test_config_auth_token_only(self, mock_stream): - with patch.object(GitHubTokenAuthenticator, 'get_env', - return_value={'OTHER_TOKEN': 'blah', 'NOT_THE_RIGHT_TOKEN': 'meh'}), \ - patch.object(PersonalTokenManager, 'is_valid_token', return_value=True): + with patch.object( + GitHubTokenAuthenticator, + "get_env", + return_value={"OTHER_TOKEN": "blah", "NOT_THE_RIGHT_TOKEN": "meh"}, + ), patch.object(PersonalTokenManager, "is_valid_token", return_value=True): stream = mock_stream - stream.config.update({'auth_token': 'gt5'}) + stream.config.update({"auth_token": "gt5"}) auth = GitHubTokenAuthenticator(stream=stream) token_managers = auth.prepare_tokens() assert len(token_managers) == 1 - assert token_managers[0].token == 'gt5' + assert token_managers[0].token == "gt5" def test_config_additional_auth_tokens_only(self, mock_stream): - with patch.object(GitHubTokenAuthenticator, 'get_env', - return_value={'OTHER_TOKEN': 'blah', 'NOT_THE_RIGHT_TOKEN': 'meh'}), \ - patch.object(PersonalTokenManager, 'is_valid_token', return_value=True): + with patch.object( + GitHubTokenAuthenticator, + "get_env", + return_value={"OTHER_TOKEN": "blah", "NOT_THE_RIGHT_TOKEN": "meh"}, + ), patch.object(PersonalTokenManager, "is_valid_token", return_value=True): stream = mock_stream - stream.config.update({ - 'additional_auth_tokens': ['gt7', 'gt8', 'gt9'] - }) + stream.config.update({"additional_auth_tokens": ["gt7", "gt8", "gt9"]}) auth = GitHubTokenAuthenticator(stream=stream) token_managers = auth.prepare_tokens() assert len(token_managers) == 3 - assert sorted({tm.token for tm in token_managers}) == ['gt7', 'gt8', 'gt9'] + assert sorted({tm.token for tm in token_managers}) == ["gt7", "gt8", "gt9"] def test_env_personal_tokens_only(self, mock_stream): - with patch.object(GitHubTokenAuthenticator, 'get_env', - return_value={'GITHUB_TOKEN1': 'gt1', 'GITHUB_TOKENxyz': 'gt2', 'OTHER_TOKEN': 'blah'}), \ - patch.object(PersonalTokenManager, 'is_valid_token', return_value=True): + with patch.object( + GitHubTokenAuthenticator, + "get_env", + return_value={ + "GITHUB_TOKEN1": "gt1", + "GITHUB_TOKENxyz": "gt2", + "OTHER_TOKEN": "blah", + }, + ), patch.object(PersonalTokenManager, "is_valid_token", return_value=True): auth = GitHubTokenAuthenticator(stream=mock_stream) token_managers = auth.prepare_tokens() assert len(token_managers) == 2 - assert sorted({tm.token for tm in token_managers}) == ['gt1', 'gt2'] + assert sorted({tm.token for tm in token_managers}) == ["gt1", "gt2"] def test_env_app_key_only(self, mock_stream): - with patch.object(GitHubTokenAuthenticator, 'get_env', - return_value={'GITHUB_APP_PRIVATE_KEY': '123;;key', 'OTHER_TOKEN': 'blah'}), \ - patch.object(AppTokenManager, 'is_valid_token', return_value=True), \ - patch('tap_github.authenticator.generate_app_access_token', - return_value=('installationtoken12345', MagicMock())): + with patch.object( + GitHubTokenAuthenticator, + "get_env", + return_value={"GITHUB_APP_PRIVATE_KEY": "123;;key", "OTHER_TOKEN": "blah"}, + ), patch.object(AppTokenManager, "is_valid_token", return_value=True), patch( + "tap_github.authenticator.generate_app_access_token", + return_value=("installationtoken12345", MagicMock()), + ): auth = GitHubTokenAuthenticator(stream=mock_stream) token_managers = auth.prepare_tokens() assert len(token_managers) == 1 - assert token_managers[0].token == 'installationtoken12345' + assert token_managers[0].token == "installationtoken12345" def test_all_token_types(self, mock_stream): # Expectations: # - the presence of additional_auth_tokens causes personal tokens in the environment to be ignored. # - the other types all coexist - with patch.object(GitHubTokenAuthenticator, 'get_env', - return_value={'GITHUB_TOKEN1': 'gt1', 'GITHUB_TOKENxyz': 'gt2', - 'GITHUB_APP_PRIVATE_KEY': '123;;key;;install_id', 'OTHER_TOKEN': 'blah'}), \ - patch.object(TokenManager, 'is_valid_token', return_value=True), \ - patch('tap_github.authenticator.generate_app_access_token', - return_value=('installationtoken12345', MagicMock())): + with patch.object( + GitHubTokenAuthenticator, + "get_env", + return_value={ + "GITHUB_TOKEN1": "gt1", + "GITHUB_TOKENxyz": "gt2", + "GITHUB_APP_PRIVATE_KEY": "123;;key;;install_id", + "OTHER_TOKEN": "blah", + }, + ), patch.object(TokenManager, "is_valid_token", return_value=True), patch( + "tap_github.authenticator.generate_app_access_token", + return_value=("installationtoken12345", MagicMock()), + ): stream = mock_stream - stream.config.update({ - 'auth_token': 'gt5', - 'additional_auth_tokens': ['gt7', 'gt8', 'gt9'] - }) + stream.config.update( + {"auth_token": "gt5", "additional_auth_tokens": ["gt7", "gt8", "gt9"]} + ) auth = GitHubTokenAuthenticator(stream=stream) token_managers = auth.prepare_tokens() assert len(token_managers) == 5 - assert sorted({tm.token for tm in token_managers}) == ['gt5', 'gt7', 'gt8', 'gt9', 'installationtoken12345'] + assert sorted({tm.token for tm in token_managers}) == [ + "gt5", + "gt7", + "gt8", + "gt9", + "installationtoken12345", + ] def test_all_token_types_except_additional_auth_tokens(self, mock_stream): # Expectations: # - in the absence of additional_auth_tokens, all the other types can coexist - with patch.object(GitHubTokenAuthenticator, 'get_env', - return_value={'GITHUB_TOKEN1': 'gt1', 'GITHUB_TOKENxyz': 'gt2', - 'GITHUB_APP_PRIVATE_KEY': '123;;key;;install_id', 'OTHER_TOKEN': 'blah'}), \ - patch.object(TokenManager, 'is_valid_token', return_value=True), \ - patch('tap_github.authenticator.generate_app_access_token', - return_value=('installationtoken12345', MagicMock())): + with patch.object( + GitHubTokenAuthenticator, + "get_env", + return_value={ + "GITHUB_TOKEN1": "gt1", + "GITHUB_TOKENxyz": "gt2", + "GITHUB_APP_PRIVATE_KEY": "123;;key;;install_id", + "OTHER_TOKEN": "blah", + }, + ), patch.object(TokenManager, "is_valid_token", return_value=True), patch( + "tap_github.authenticator.generate_app_access_token", + return_value=("installationtoken12345", MagicMock()), + ): stream = mock_stream - stream.config.update({ - 'auth_token': 'gt5', - }) + stream.config.update( + { + "auth_token": "gt5", + } + ) auth = GitHubTokenAuthenticator(stream=stream) token_managers = auth.prepare_tokens() assert len(token_managers) == 4 - assert sorted({tm.token for tm in token_managers}) == ['gt1', 'gt2', 'gt5', 'installationtoken12345'] + assert sorted({tm.token for tm in token_managers}) == [ + "gt1", + "gt2", + "gt5", + "installationtoken12345", + ] def test_auth_token_and_additional_auth_tokens_deduped(self, mock_stream): - with patch.object(GitHubTokenAuthenticator, 'get_env', - return_value={'GITHUB_TOKEN1': 'gt1', 'GITHUB_TOKENxyz': 'gt2', 'OTHER_TOKEN': 'blah'}), \ - patch.object(TokenManager, 'is_valid_token', return_value=True), \ - patch('tap_github.authenticator.generate_app_access_token', - return_value=('installationtoken12345', MagicMock())): + with patch.object( + GitHubTokenAuthenticator, + "get_env", + return_value={ + "GITHUB_TOKEN1": "gt1", + "GITHUB_TOKENxyz": "gt2", + "OTHER_TOKEN": "blah", + }, + ), patch.object(TokenManager, "is_valid_token", return_value=True), patch( + "tap_github.authenticator.generate_app_access_token", + return_value=("installationtoken12345", MagicMock()), + ): stream = mock_stream - stream.config.update({ - 'auth_token': 'gt1', - 'additional_auth_tokens': ['gt1', 'gt1', 'gt8', 'gt8', 'gt9'] - }) + stream.config.update( + { + "auth_token": "gt1", + "additional_auth_tokens": ["gt1", "gt1", "gt8", "gt8", "gt9"], + } + ) auth = GitHubTokenAuthenticator(stream=stream) token_managers = auth.prepare_tokens() assert len(token_managers) == 3 - assert sorted({tm.token for tm in token_managers}) == ['gt1', 'gt8', 'gt9'] + assert sorted({tm.token for tm in token_managers}) == ["gt1", "gt8", "gt9"] def test_auth_token_and_env_tokens_deduped(self, mock_stream): - with patch.object(GitHubTokenAuthenticator, 'get_env', - return_value={'GITHUB_TOKEN1': 'gt1', 'GITHUB_TOKENa': 'gt2', 'GITHUB_TOKENxyz': 'gt2', 'OTHER_TOKEN': 'blah'}), \ - patch.object(TokenManager, 'is_valid_token', return_value=True), \ - patch('tap_github.authenticator.generate_app_access_token', - return_value=('installationtoken12345', MagicMock())): + with patch.object( + GitHubTokenAuthenticator, + "get_env", + return_value={ + "GITHUB_TOKEN1": "gt1", + "GITHUB_TOKENa": "gt2", + "GITHUB_TOKENxyz": "gt2", + "OTHER_TOKEN": "blah", + }, + ), patch.object(TokenManager, "is_valid_token", return_value=True), patch( + "tap_github.authenticator.generate_app_access_token", + return_value=("installationtoken12345", MagicMock()), + ): stream = mock_stream - stream.config.update({ - 'auth_token': 'gt1' - }) + stream.config.update({"auth_token": "gt1"}) auth = GitHubTokenAuthenticator(stream=stream) token_managers = auth.prepare_tokens() assert len(token_managers) == 2 - assert sorted({tm.token for tm in token_managers}) == ['gt1', 'gt2'] + assert sorted({tm.token for tm in token_managers}) == ["gt1", "gt2"] def test_handle_error_if_app_key_invalid(self, mock_stream): # Confirm expected behaviour if an error is raised while setting up the app token manager: # - don't crash # - print the error as a warning # - continue with any other obtained tokens - with patch.object(GitHubTokenAuthenticator, 'get_env', - return_value={'GITHUB_APP_PRIVATE_KEY': '123garbagekey'}), \ - patch('tap_github.authenticator.AppTokenManager') as mock_app_manager: - mock_app_manager.side_effect = ValueError('Invalid key format') + with patch.object( + GitHubTokenAuthenticator, + "get_env", + return_value={"GITHUB_APP_PRIVATE_KEY": "123garbagekey"}, + ), patch("tap_github.authenticator.AppTokenManager") as mock_app_manager: + mock_app_manager.side_effect = ValueError("Invalid key format") auth = GitHubTokenAuthenticator(stream=mock_stream) auth.prepare_tokens() mock_stream.logger.warn.assert_called_with( - 'An error was thrown while preparing an app token: Invalid key format') + "An error was thrown while preparing an app token: Invalid key format" + ) def test_exclude_generated_app_token_if_invalid(self, mock_stream): - with patch.object(GitHubTokenAuthenticator, 'get_env', - return_value={'GITHUB_APP_PRIVATE_KEY': '123;;key'}), \ - patch.object(AppTokenManager, 'is_valid_token', return_value=False), \ - patch('tap_github.authenticator.generate_app_access_token', - return_value=('installationtoken12345', MagicMock())): + with patch.object( + GitHubTokenAuthenticator, + "get_env", + return_value={"GITHUB_APP_PRIVATE_KEY": "123;;key"}, + ), patch.object(AppTokenManager, "is_valid_token", return_value=False), patch( + "tap_github.authenticator.generate_app_access_token", + return_value=("installationtoken12345", MagicMock()), + ): auth = GitHubTokenAuthenticator(stream=mock_stream) token_managers = auth.prepare_tokens() @@ -339,20 +403,24 @@ def test_exclude_generated_app_token_if_invalid(self, mock_stream): assert len(token_managers) == 0 def test_prepare_tokens_returns_empty_if_all_tokens_invalid(self, mock_stream): - with patch.object(GitHubTokenAuthenticator, 'get_env', - return_value={'GITHUB_TOKEN1': 'gt1', 'GITHUB_APP_PRIVATE_KEY': '123;;key'}), \ - patch.object(PersonalTokenManager, 'is_valid_token', return_value=False), \ - patch.object(AppTokenManager, 'is_valid_token', return_value=False), \ - patch('tap_github.authenticator.generate_app_access_token', - return_value=('installationtoken12345', MagicMock())): + with patch.object( + GitHubTokenAuthenticator, + "get_env", + return_value={"GITHUB_TOKEN1": "gt1", "GITHUB_APP_PRIVATE_KEY": "123;;key"}, + ), patch.object( + PersonalTokenManager, "is_valid_token", return_value=False + ), patch.object( + AppTokenManager, "is_valid_token", return_value=False + ), patch( + "tap_github.authenticator.generate_app_access_token", + return_value=("installationtoken12345", MagicMock()), + ): stream = mock_stream - stream.config.update({ - 'auth_token': 'gt5', - 'additional_auth_tokens': ['gt7', 'gt8', 'gt9'] - }) + stream.config.update( + {"auth_token": "gt5", "additional_auth_tokens": ["gt7", "gt8", "gt9"]} + ) auth = GitHubTokenAuthenticator(stream=stream) token_managers = auth.prepare_tokens() assert len(token_managers) == 0 -