Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Tokens in config are no longer ignored when there are tokens in the environment, part of refactoring in preparation for app token refreshing #284

Merged
merged 9 commits into from
Aug 9, 2024
129 changes: 95 additions & 34 deletions tap_github/authenticator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
import logging
import time
from copy import deepcopy
from datetime import datetime
from datetime import datetime, timedelta
from os import environ
from random import choice, shuffle
from typing import Any, Dict, List, Optional, Set
from typing import Any, Dict, List, Optional, Set, Tuple

import jwt
import requests
Expand All @@ -15,7 +15,9 @@


class TokenManager:
"""A class to store a token's attributes and state."""
"""A class to store a token's attributes and state.
This parent class should not be used directly, use a subclass instead.
"""

DEFAULT_RATE_LIMIT = 5000
# The DEFAULT_RATE_LIMIT_BUFFER buffer serves two purposes:
Expand All @@ -25,7 +27,7 @@ class TokenManager:

def __init__(
self,
token: str,
token: Optional[str],
rate_limit_buffer: Optional[int] = None,
logger: Optional[Any] = None,
):
Expand All @@ -50,6 +52,9 @@ def update_rate_limit(self, response_headers: Any) -> None:

def is_valid_token(self) -> bool:
"""Try making a request with the current token. If the request succeeds return True, else False."""
if not self.token:
return False
edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved

try:
response = requests.get(
url="https://api.github.com/rate_limit",
Expand All @@ -61,7 +66,7 @@ def is_valid_token(self) -> bool:
return True
except requests.exceptions.HTTPError:
msg = (
f"A token was dismissed. "
f"A token could not be validated. "
edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved
f"{response.status_code} Client Error: "
f"{str(response.content)} (Reason: {response.reason})"
)
Expand All @@ -70,7 +75,7 @@ def is_valid_token(self) -> bool:
return False

def has_calls_remaining(self) -> bool:
"""Check if token is valid.
"""Check if a token has capacity to make more calls.

Returns:
True if the token is valid and has enough api calls remaining.
Expand All @@ -85,6 +90,14 @@ def has_calls_remaining(self) -> bool:
return True


class PersonalTokenManager(TokenManager):
"""A class to store token rate limiting information."""

def __init__(self, token: str, rate_limit_buffer: Optional[int] = None, **kwargs):
"""Init PersonalTokenRateLimit info."""
super().__init__(token, rate_limit_buffer=rate_limit_buffer, **kwargs)


def generate_jwt_token(
github_app_id: str,
github_private_key: str,
Expand All @@ -110,7 +123,8 @@ def generate_app_access_token(
github_app_id: str,
github_private_key: str,
github_installation_id: Optional[str] = None,
) -> str:
) -> Tuple[str, datetime]:
produced_at = datetime.now()
jwt_token = generate_jwt_token(github_app_id, github_private_key)

headers = {"Authorization": f"Bearer {jwt_token}"}
Expand All @@ -135,14 +149,71 @@ def generate_app_access_token(
if resp.status_code != 201:
resp.raise_for_status()

return resp.json()["token"]
expires_at = produced_at + timedelta(hours=1)
return resp.json()["token"], expires_at
edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved


class AppTokenManager(TokenManager):
"""A class to store an app token's attributes and state, and handle token refreshing"""

DEFAULT_RATE_LIMIT = 15000
DEFAULT_EXPIRY_BUFFER_MINS = 10

def __init__(self, env_key: str, rate_limit_buffer: Optional[int] = None, **kwargs):
if rate_limit_buffer is None:
rate_limit_buffer = self.DEFAULT_RATE_LIMIT_BUFFER
super().__init__(None, rate_limit_buffer=rate_limit_buffer, **kwargs)

parts = env_key.split(";;")
self.github_app_id = parts[0]
self.github_private_key = (parts[1:2] or [""])[0].replace("\\n", "\n")
self.github_installation_id: Optional[str] = (parts[2:3] or [""])[0]

self.token_expires_at: Optional[datetime] = None
self.claim_token()

def claim_token(self):
"""Updates the TokenManager's token and token_expires_at attributes.

The outcome will be _either_ that self.token is updated to a newly claimed valid token and
self.token_expires_at is updated to the anticipated expiry time (erring on the side of an early estimate)
_or_ self.token and self.token_expires_at are both set to None.
"""
self.token = None
self.token_expires_at = None

# Make sure we have the details we need
if not self.github_app_id or not self.github_private_key:
raise ValueError(
"GITHUB_APP_PRIVATE_KEY could not be parsed. The expected format is "
'":app_id:;;-----BEGIN RSA PRIVATE KEY-----\\n_YOUR_P_KEY_\\n-----END RSA PRIVATE KEY-----"'
)

self.token, self.token_expires_at = generate_app_access_token(
self.github_app_id, self.github_private_key, self.github_installation_id
)

# Check if the token isn't valid. If not, overwrite it with None
if not self.is_valid_token():
if self.logger:
self.logger.warning(
"An app token was generated but could not be validated."
)
self.token = None
self.token_expires_at = None


class GitHubTokenAuthenticator(APIAuthenticatorBase):
"""Base class for offloading API auth."""

@staticmethod
def get_env():
return dict(environ)
edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved

def prepare_tokens(self) -> List[TokenManager]:
# Save GitHub tokens
"""Prep GitHub tokens"""

env_dict = self.get_env()
rate_limit_buffer = self._config.get("rate_limit_buffer", None)

personal_tokens: Set[str] = set()
Expand All @@ -156,52 +227,42 @@ def prepare_tokens(self) -> List[TokenManager]:
# Accept multiple tokens using environment variables GITHUB_TOKEN*
env_tokens = {
value
for key, value in environ.items()
for key, value in env_dict.items()
if key.startswith("GITHUB_TOKEN")
}
if len(env_tokens) > 0:
self.logger.info(
f"Found {len(env_tokens)} 'GITHUB_TOKEN' environment variables for authentication."
)
personal_tokens = env_tokens
personal_tokens = personal_tokens.union(env_tokens)
TrishGillett marked this conversation as resolved.
Show resolved Hide resolved

token_managers: List[TokenManager] = []
for token in personal_tokens:
token_manager = TokenManager(
token_manager = PersonalTokenManager(
token, rate_limit_buffer=rate_limit_buffer, logger=self.logger
)
if token_manager.is_valid_token():
token_managers.append(token_manager)
else:
logging.warn("A token was dismissed.")

# Parse App level private key and generate a token
if "GITHUB_APP_PRIVATE_KEY" in environ.keys():
if "GITHUB_APP_PRIVATE_KEY" in env_dict.keys():
# To simplify settings, we use a single env-key formatted as follows:
# "{app_id};;{-----BEGIN RSA PRIVATE KEY-----\n_YOUR_PRIVATE_KEY_\n-----END RSA PRIVATE KEY-----}"
parts = environ["GITHUB_APP_PRIVATE_KEY"].split(";;")
github_app_id = parts[0]
github_private_key = (parts[1:2] or [""])[0].replace("\\n", "\n")
github_installation_id = (parts[2:3] or [""])[0]

if not (github_private_key):
self.logger.warning(
"GITHUB_APP_PRIVATE_KEY could not be parsed. The expected format is "
'":app_id:;;-----BEGIN RSA PRIVATE KEY-----\n_YOUR_P_KEY_\n-----END RSA PRIVATE KEY-----"'
env_key = env_dict["GITHUB_APP_PRIVATE_KEY"]
try:
app_token_manager = AppTokenManager(
env_key, rate_limit_buffer=rate_limit_buffer, logger=self.logger
)

else:
app_token = generate_app_access_token(
github_app_id, github_private_key, github_installation_id or None
)
token_manager = TokenManager(
app_token, rate_limit_buffer=rate_limit_buffer, logger=self.logger
if app_token_manager.is_valid_token():
token_managers.append(app_token_manager)
except ValueError as e:
self.logger.warn(
f"An error was thrown while preparing an app token: {e}"
)
if token_manager.is_valid_token():
token_managers.append(token_manager)

self.logger.info(f"Tap will run with {len(token_managers)} auth tokens")

# Create a dict of TokenManager
# TODO - separate app_token and add logic to refresh the token using generate_app_access_token.
return token_managers

def __init__(self, stream: RESTStream) -> None:
Expand Down
Loading