Skip to content
This repository has been archived by the owner on Jan 16, 2023. It is now read-only.

intuned github #2

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import dpath
import pendulum
import requests
from requests.auth import AuthBase
from typing import Any, List, Mapping, MutableMapping, Tuple, Union
from abc import abstractmethod


class AbstractGithubIntunedAuthenticator(AuthBase):
"""
Abstract class for an OAuth authenticators that implements the OAuth token refresh flow. The authenticator
is designed to generically perform the refresh flow without regard to how config fields are get/set by
delegating that behavior to the classes implementing the interface.
"""

def __call__(self, request: requests.Request) -> requests.Request:
"""Attach the HTTP headers required to authenticate on the HTTP request"""
request.headers.update(self.get_auth_header())
return request

def get_auth_header(self) -> Mapping[str, Any]:
"""HTTP header to set on the requests"""
return {"Authorization": f"Bearer {self.get_access_token()}"}

def get_access_token(self) -> str:
"""Returns the access token"""
if self.token_has_expired():
current_datetime = pendulum.now()
accessToken, expires_at = self.refresh_access_token()
self.access_token = accessToken
self.set_token_expiry_date(current_datetime, expires_at)
return self.access_token
return self.access_token

def token_has_expired(self) -> bool:
"""Returns True if the token is expired"""
if not self.get_token_expiry_date():
return True
return pendulum.now() > self.get_token_expiry_date()

def _get_refresh_access_token_response(self):
response = requests.request(method="POST", url=self.get_token_refresh_endpoint(), data=self.build_refresh_request_body())
response.raise_for_status()
return response.json()

def refresh_access_token(self) -> Tuple[str, int]:
"""
Returns the refresh token and its lifespan in seconds

:return: a tuple of (access_token, token_lifespan_in_seconds)
"""
try:
response_json = self._get_refresh_access_token_response()
return (
response_json["accessToken"],
response_json["expiresAt"],
)
except Exception as e:
raise Exception(f"Error while refreshing access token: {e}") from e

def build_refresh_request_body(self) -> Mapping[str, Any]:
"""
Returns the request body to set on the refresh request

Override to define additional parameters
"""
payload: MutableMapping[str, Any] = {
"installationId": self.get_installation_id(),
"secret": self.get_secret(),
}
return payload

@abstractmethod
def get_token_refresh_endpoint(self) -> str:
"""Returns the endpoint to refresh the access token"""

@abstractmethod
def get_installation_id(self) -> int:
"""The client id to authenticate"""

@abstractmethod
def get_secret(self) -> str:
"""The client secret to authenticate"""

@abstractmethod
def get_token_expiry_date(self) -> pendulum.DateTime:
"""Expiration date of the access token"""

@abstractmethod
def set_token_expiry_date(self, initial_time: pendulum.DateTime, value: Union[str, int]):
"""Setter for access token expiration date"""

@property
@abstractmethod
def access_token(self) -> str:
"""Returns the access token"""

@access_token.setter
@abstractmethod
def access_token(self, value: str) -> str:
"""Setter for the access token"""

class GithubIntunedAuthenticator(AbstractGithubIntunedAuthenticator):
"""
Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials.
The generated access token is attached to each request via the Authorization header.
If a connector_config is provided any mutation of it's value in the scope of this class will emit AirbyteControlConnectorConfigMessage.
"""

def __init__(
self,
token_refresh_endpoint: str,
installation_id: str,
secret: str,
):
self._token_refresh_endpoint = token_refresh_endpoint
self.installation_id = installation_id
self.secret = secret
self._token_expiry_date = None
self._access_token = None

def get_token_refresh_endpoint(self) -> str:
return self._token_refresh_endpoint

def get_installation_id(self) -> str:
return self.installation_id

def get_secret(self) -> str:
return self.secret

def get_access_token_name(self) -> str:
return self._access_token_name

def get_token_expiry_date(self) -> pendulum.DateTime:
return self._token_expiry_date

def set_token_expiry_date(self, initial_time: pendulum.DateTime, value: Union[str, int]):
self._token_expiry_date = pendulum.parse(value)


@property
def access_token(self) -> str:
return self._access_token

@access_token.setter
def access_token(self, value: str):
self._access_token = value
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.auth import MultipleTokenAuthenticator
from .authenticator import GithubIntunedAuthenticator

from .streams import (
Assignees,
Expand Down Expand Up @@ -58,12 +58,12 @@

class SourceGithub(AbstractSource):
@staticmethod
def _get_org_repositories(config: Mapping[str, Any], authenticator: MultipleTokenAuthenticator) -> Tuple[List[str], List[str]]:
def _get_org_repositories(config: Mapping[str, Any], authenticator: GithubIntunedAuthenticator) -> Tuple[List[str], List[str]]:
"""
Parse config.repository and produce two lists: organizations, repositories.
Args:
config (dict): Dict representing connector's config
authenticator(MultipleTokenAuthenticator): authenticator object
authenticator(GithubIntunedAuthenticator): authenticator object
"""
config_repositories = set(filter(None, config["repository"].split(" ")))
if not config_repositories:
Expand Down Expand Up @@ -104,14 +104,8 @@ def _get_org_repositories(config: Mapping[str, Any], authenticator: MultipleToke

@staticmethod
def _get_authenticator(config: Dict[str, Any]):
# Before we supported oauth, personal_access_token was called `access_token` and it lived at the
# config root. So we first check to make sure any backwards compatbility is handled.
token = config.get("access_token")
if not token:
creds = config.get("credentials")
token = creds.get("access_token") or creds.get("personal_access_token")
tokens = [t.strip() for t in token.split(TOKEN_SEPARATOR)]
return MultipleTokenAuthenticator(tokens=tokens, auth_method="token")
credentials = config.get("credentials")
return GithubIntunedAuthenticator(installation_id=credentials["installationId"], token_refresh_endpoint=credentials["refresh_api"], secret=credentials["secret"])

@staticmethod
def _get_branches_data(selected_branches: str, full_refresh_args: Dict[str, Any] = None) -> Tuple[Dict[str, str], Dict[str, List[str]]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,52 +4,33 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "GitHub Source Spec",
"type": "object",
"required": ["start_date", "repository"],
"required": ["start_date", "repository", "credentials"],
"additionalProperties": true,
"properties": {
"credentials": {
"title": "Authentication",
"description": "Choose how to authenticate to GitHub",
"type": "object",
"order": 0,
"oneOf": [
{
"type": "object",
"title": "OAuth",
"required": ["access_token"],
"properties": {
"option_title": {
"type": "string",
"const": "OAuth Credentials",
"order": 0
},
"access_token": {
"type": "string",
"title": "Access Token",
"description": "OAuth access token",
"airbyte_secret": true
}
}
"required": ["installationId", "secret", "refresh_api"],
"properties": {
"installationId": {
"type": "integer",
"title": "Installation id",
"description": "github installation id"
},
"secret": {
"type": "string",
"title": "secret",
"description": "secret specified in intuned",
"airbyte_secret": true
},
{
"type": "object",
"title": "Personal Access Token",
"required": ["personal_access_token"],
"properties": {
"option_title": {
"type": "string",
"const": "PAT Credentials",
"order": 0
},
"personal_access_token": {
"type": "string",
"title": "Personal Access Tokens",
"description": "Log into GitHub and then generate a <a href=\"https://github.com/settings/tokens\">personal access token</a>. To load balance your API quota consumption across multiple API tokens, input multiple tokens separated with \",\"",
"airbyte_secret": true
}
}
"refresh_api": {
"type": "string",
"title": "refresh api",
"description": "refresh api specified in intuned"
}
]
}
},
"start_date": {
"type": "string",
Expand Down Expand Up @@ -87,48 +68,5 @@
"order": 4
}
}
},
"advanced_auth": {
"auth_flow_type": "oauth2.0",
"predicate_key": ["credentials", "option_title"],
"predicate_value": "OAuth Credentials",
"oauth_config_specification": {
"complete_oauth_output_specification": {
"type": "object",
"additionalProperties": false,
"properties": {
"access_token": {
"type": "string",
"path_in_connector_config": ["credentials", "access_token"]
}
}
},
"complete_oauth_server_input_specification": {
"type": "object",
"additionalProperties": false,
"properties": {
"client_id": {
"type": "string"
},
"client_secret": {
"type": "string"
}
}
},
"complete_oauth_server_output_specification": {
"type": "object",
"additionalProperties": false,
"properties": {
"client_id": {
"type": "string",
"path_in_connector_config": ["credentials", "client_id"]
},
"client_secret": {
"type": "string",
"path_in_connector_config": ["credentials", "client_secret"]
}
}
}
}
}
}