Skip to content

Commit

Permalink
Parallelize lookml generation (#1021)
Browse files Browse the repository at this point in the history
* use dryrun

* Add dryrun

* Update tests to mock dry run

* Have use_cloud_function as parameter

* Pass dryrun instance

* raise DryRunErrors

* Download files from looker-hub

* Review feedback

* Parallelize lookml generation

* Update datagroup tests

* Update tests for multiprocessing usage

* Use FileSystemLoader for loading templates

* Address review feedback

* Review feedback

* Fix repo init
  • Loading branch information
scholtzan authored Aug 6, 2024
1 parent e1b288d commit 64a9f4c
Show file tree
Hide file tree
Showing 17 changed files with 453 additions and 301 deletions.
70 changes: 65 additions & 5 deletions generator/dryrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,21 @@
)


def id_token():
"""Get token to authenticate against Cloud Function."""
auth_req = GoogleAuthRequest()
def credentials(auth_req: Optional[GoogleAuthRequest] = None):
"""Get GCP credentials."""
auth_req = auth_req or GoogleAuthRequest()
creds, _ = google.auth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
creds.refresh(auth_req)
return creds


def id_token():
"""Get token to authenticate against Cloud Function."""
auth_req = GoogleAuthRequest()
creds = credentials(auth_req)

if hasattr(creds, "id_token"):
# Get token from default credentials for the current environment created via Cloud SDK run
id_token = creds.id_token
Expand All @@ -43,6 +51,17 @@ def __init__(self, message, error, use_cloud_function, table_id):
self.use_cloud_function = use_cloud_function
self.table_id = table_id

def __reduce__(self):
"""
Override to ensure that all parameters are being passed when pickling.
Pickling happens when passing exception between processes (e.g. via multiprocessing)
"""
return (
self.__class__,
self.args + (self.error, self.use_cloud_function, self.table_id),
)


class Errors(Enum):
"""DryRun errors that require special handling."""
Expand All @@ -53,14 +72,50 @@ class Errors(Enum):
PERMISSION_DENIED = 4


class DryRunContext:
"""DryRun builder class."""

def __init__(
self,
use_cloud_function=False,
id_token=None,
credentials=None,
dry_run_url=DRY_RUN_URL,
):
"""Initialize dry run instance."""
self.use_cloud_function = use_cloud_function
self.dry_run_url = dry_run_url
self.id_token = id_token
self.credentials = credentials

def create(
self,
sql=None,
project="moz-fx-data-shared-prod",
dataset=None,
table=None,
):
"""Initialize a DryRun instance."""
return DryRun(
use_cloud_function=self.use_cloud_function,
id_token=self.id_token,
credentials=self.credentials,
sql=sql,
project=project,
dataset=dataset,
table=table,
dry_run_url=self.dry_run_url,
)


class DryRun:
"""Dry run SQL."""

def __init__(
self,
client=None,
use_cloud_function=False,
id_token=None,
credentials=None,
sql=None,
project="moz-fx-data-shared-prod",
dataset=None,
Expand All @@ -70,12 +125,17 @@ def __init__(
"""Initialize dry run instance."""
self.sql = sql
self.use_cloud_function = use_cloud_function
self.client = client
self.project = project
self.dataset = dataset
self.table = table
self.dry_run_url = dry_run_url
self.id_token = id_token
self.credentials = credentials

@cached_property
def client(self):
"""Get BigQuery client instance."""
return bigquery.Client(credentials=self.credentials)

@cached_property
def dry_run_result(self):
Expand Down
Loading

0 comments on commit 64a9f4c

Please sign in to comment.