diff --git a/bqflow/bq.py b/bqflow/bq.py new file mode 100755 index 0000000..604ef9f --- /dev/null +++ b/bqflow/bq.py @@ -0,0 +1,175 @@ +#!/usr/bin/env python3 + +########################################################################### +# +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +########################################################################### + +import json +import textwrap +import argparse + +from util.bigquery_api import BigQuery, get_schema +from util.configuration import Configuration +from util.csv import csv_to_rows +from util.google_api import API_BigQuery + + +def dashboard_template(schema, _level=0): + """ Helper for creating null query used in Looker Studio. + + Generates a query string that when called generates the exact + schema that is given as an argument. + + Args: + - schema: (dict) The schema as returned by BigQuery. + - _level: (int) Used to track indentation, not passed by caller. + + Returns: + String containing the query. 
+ """ + + fields = [] + + for field in schema: + if field['type'] == 'RECORD': + if field['mode'] == 'REPEATED': + fields.append('ARRAY (SELECT AS STRUCT {}) AS {}'.format(dashboard_template(field['fields'], _level + 2), field['name'])) + else: + fields.append('STRUCT ({}\n) AS {}'.format(dashboard_template(field['fields'], _level + 2), field['name'])) + else: + fields.append('CAST(NULL AS {type}) AS {name}'.format(**field)) + + return ('' if _level else 'SELECT ') + ('\n'+ ' ' * _level) + (',\n'+ ' ' * _level).join(fields) + + + +def task_template(auth, table): + """ Grabs view from BigQuery and embeds into a BQFlow task. + + Handles indentation and character escaping. Also replaces + dataset and project with a parameter field for portability. + Does not handle comments well, must be terminated by user. + + Args: + - table: (dict) The view definition as returned by BigQuery. + + Returns: + Dictionary containing the BQFlow task. + """ + + task = { + "bigquery":{ + "auth":auth, + "from":{ + "query":table['view']['query'].replace(table['tableReference']['projectId'] + '.', '').replace(table['tableReference']['datasetId'] + '.', '{dataset}.'), + "legacy":table['view']['useLegacySql'], + "parameters":{ + "dataset":table['tableReference']['datasetId'] + } + }, + "to":{ + "dataset":table['tableReference']['datasetId'], + "view":table['tableReference']['tableId'] + } + } + } + return task + + +def main(): + # get parameters + parser = argparse.ArgumentParser( + formatter_class=argparse.RawDescriptionHelpFormatter, + description=textwrap.dedent("""\ + Command line to get table schema from BigQuery. + + Helps developers upload data to BigQuery and pull schemas. These are the + most common BigQuery tasks when developing solutions. 
+ + Examples: + Display table schema: `python bigquery.py --project [id] --dataset [name] --table [name] -s [credentials]` + Create view task: `python bigquery.py --project [id] --dataset [name] --view [name] -s [credentials]` + Upload csv table: `python bigquery.py --project [id] --dataset [name] --table [name] --csv [file] --schema [file] -s [credentials]` + + """)) + + parser.add_argument('--user', '-u', help='Path to USER credentials json file.', default=None) + parser.add_argument('--service', '-s', help='Path to SERVICE credentials json file.', default=None) + parser.add_argument('--project', '-p', help='Name of cloud project.', default=None) + + parser.add_argument( '--dataset', help='name of BigQuery dataset', default=None) + parser.add_argument( '--table', help='name of BigQuery table', default=None) + parser.add_argument( '--view', help='name of view to turn into BQFlow task', default=None) + parser.add_argument( '--csv', help='CSV file path', default=None) + parser.add_argument( '--schema', help='SCHEMA file path', default=None) + parser.add_argument( '--dashboard', help='Generate a dashboard query to mimic table schema.', default=None) + + # initialize project + + args = parser.parse_args() + config = Configuration( + user=args.user, + service=args.service, + project=args.project + ) + + auth = 'service' if args.service else 'user' + + schema = json.loads(args.schema) if args.schema else None + + if args.view: + print(json.dumps(task_template( + auth, + API_BigQuery(config, auth).tables().get(projectId=config.project, datasetId=args.dataset, tableId=args.view).execute() + ), indent=2).replace('\\n', '\n')) + + elif args.csv: + + with open(args.csv, 'r') as csv_file: + rows = csv_to_rows(csv_file.read()) + + if not schema: + rows, schema = get_schema(rows) + print('DETECTED SCHEMA', json.dumps(schema)) + print('Please run again with the above schema provided.') + exit() + + BigQuery(config, auth).rows_to_table( + config.project, + args.dataset, + 
args.table, + rows, + schema + ) + + else: + schema = BigQuery(config, auth).table_to_schema( + config.project, + args.dataset, + args.table or args.dashboard + ) + + if args.dashboard: + print() + print(dashboard_template(schema)) + print() + + else: + print(json.dumps(schema, indent=2)) + +if __name__ == '__main__': + main() diff --git a/bqflow/cm.py b/bqflow/cm.py new file mode 100755 index 0000000..88aabf3 --- /dev/null +++ b/bqflow/cm.py @@ -0,0 +1,152 @@ +#!/usr/bin/env python3 + +########################################################################### +# +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +########################################################################### + +import json +import argparse +import textwrap + +from util.cm_api import get_profile_for_api, report_to_rows, report_clean, report_file, report_schema +from util.configuration import Configuration +from util.csv import rows_to_type, rows_print +from util.google_api import API_DCM + + +def task_template(auth, report): + """Helper to create a BQFlow compatible task JSON from CM report.""" + + task = { + "cm_report":{ + "auth":auth, + "report": { + "name":report['name'], + "account":report['accountId'], + "body":report + }, + "out":{ + "bigquery":{ + "auth":auth, + "dataset":"CM360_Dataset", + "table":"CM360_Report", + } + } + } + } + + try: del task['cm_report']['report']['body']['lastModifiedTime'] + except KeyError: pass + try: del task['cm_report']['report']['body']['ownerProfileId'] + except KeyError: pass + try: del task['cm_report']['report']['body']['accountId'] + except KeyError: pass + try: del task['cm_report']['report']['body']['fileName'] + except KeyError: pass + try: del task['cm_report']['report']['body']['name'] + except KeyError: pass + try: del task['cm_report']['report']['body']['etag'] + except KeyError: pass + try: del task['cm_report']['report']['body']['id'] + except KeyError: pass + + return task + + +def main(): + + parser = argparse.ArgumentParser( + formatter_class=argparse.RawDescriptionHelpFormatter, + description=textwrap.dedent("""\ + Command line to help debug CM reports and build reporting tools. 
+ + Examples: + To get list of reports: python cm.py --account [id] --list -u [user credentials path] + To get report: python cm.py --account [id] --report [id] -u [user credentials path] + To get report files: python cm.py --account [id] --files [id] -u [user credentials path] + To get report sample: python cm.py --account [id] --sample [id] -u [user credentials path] + To get report schema: python cm.py --account [id] --schema [id] -u [user credentials path] + + """ + )) + + parser.add_argument('--user', '-u', help='Path to USER credentials json file.', default=None) + parser.add_argument('--service', '-s', help='Path to SERVICE credentials json file.', default=None) + + parser.add_argument('--account', help='Account ID to use to pull the report.', required=True) + parser.add_argument('--report', help='Report ID to pull JSON definition.', default=None) + parser.add_argument('--schema', help='Report ID to pull schema definition.', default=None) + parser.add_argument('--sample', help='Report ID to pull sample data.', default=None) + parser.add_argument('--files', help='Report ID to pull file list.', default=None) + parser.add_argument('--list', help='List reports.', action='store_true') + parser.add_argument('--task', help='Report ID to pull task definition.', default=None) + + # initialize project + args = parser.parse_args() + config = Configuration( + user=args.user, + service=args.service + ) + + auth = 'service' if args.service else 'user' + + profile = get_profile_for_api(config, auth, args.account) + kwargs = { 'profileId': profile } + + # get report json + if args.report: + kwargs['reportId'] = args.report + report = API_DCM(config, auth).reports().get(**kwargs).execute() + print(json.dumps(report, indent=2, sort_keys=True)) + + # get task json + elif args.task: + kwargs['reportId'] = args.task + report = API_DCM(config, auth).reports().get(**kwargs).execute() + print(json.dumps(task_template(auth, report), indent=2, sort_keys=True)) + + # get report files 
+ elif args.files: + kwargs['reportId'] = args.files + for rf in API_DCM(config, auth, iterate=True).reports().files().list(**kwargs).execute(): + print(json.dumps(rf, indent=2, sort_keys=True)) + + # get schema + elif args.schema: + filename, report = report_file(config, auth, args.account, + args.schema, None, 10) + rows = report_to_rows(report) + rows = report_clean(rows) + print(json.dumps(report_schema(next(rows)), indent=2, sort_keys=True)) + + # get sample + elif args.sample: + filename, report = report_file(config, auth, args.account, args.sample, None, 10) + rows = report_to_rows(report) + rows = report_clean(rows) + rows = rows_to_type(rows) + for r in rows_print(rows, row_min=0, row_max=20): + pass + + # get list + else: + for report in API_DCM( config, auth, iterate=True).reports().list(**kwargs).execute(): + print(json.dumps(report, indent=2, sort_keys=True)) + + +if __name__ == '__main__': + main() diff --git a/bqflow/dv.py b/bqflow/dv.py new file mode 100755 index 0000000..bdf5d36 --- /dev/null +++ b/bqflow/dv.py @@ -0,0 +1,125 @@ +#!/usr/bin/env python3 + +########################################################################### +# +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +########################################################################### + +import json +import argparse +import textwrap + +from util.bigquery_api import get_schema +from util.csv import rows_to_type, rows_print +from util.configuration import Configuration +from util.dv_api import report_file, report_to_rows, report_clean +from util.google_api import API_DBM + +def task_template(auth, report): + """Helper to create a BQFlow compatible task JSON from DV report.""" + + task = { + "dv_report":{ + "auth":auth, + "report": { + "name":report['metadata']['title'], + "body":report + }, + "out":{ + "bigquery":{ + "auth":auth, + "dataset":"DV360_Dataset", + "table":"DV360_Report", + } + } + } + } + + try: del task['dv_report']['report']['body']['queryId'] + except KeyError: pass + + return task + + +def main(): + + parser = argparse.ArgumentParser( + formatter_class=argparse.RawDescriptionHelpFormatter, + description=textwrap.dedent("""\ + Command line to help debug DV360 reports and build reporting tools. 
+ + Examples: + To get list of reports: python dv.py --list -u [user credentials path] + To get report json: python dv.py --report [id] -u [user credentials path] + To get report schema: python dv.py --schema [id] -u [user credentials path] + To get report sample: python dv.py --sample [id] -u [user credentials path] + + """)) + + # create parameters + parser.add_argument('--user', '-u', help='Path to USER credentials json file.', default=None) + parser.add_argument('--service', '-s', help='Path to SERVICE credentials json file.', default=None) + + parser.add_argument('--report', help='report ID to pull json definition', default=None) + parser.add_argument('--schema', help='report ID to pull schema format', default=None) + parser.add_argument('--sample', help='report ID to pull sample data', default=None) + parser.add_argument('--list', help='list reports', action='store_true') + parser.add_argument('--task', help='report ID to pull json task', default=None) + + # initialize project + args = parser.parse_args() + config = Configuration( + user=args.user, + service=args.service + ) + + auth = 'service' if args.service else 'user' + + # get report + if args.report: + report = API_DBM(config, auth).queries().get(queryId=args.report).execute() + print(json.dumps(report, indent=2, sort_keys=True)) + + # get task json + elif args.task: + report = API_DBM(config, auth).queries().get(queryId=args.task).execute() + print(json.dumps(task_template(auth, report), indent=2, sort_keys=True)) + + # get schema + elif args.schema: + filename, report = report_file(config, auth, args.schema, None, 10) + rows = report_to_rows(report) + rows = report_clean(rows) + rows = rows_to_type(rows) + print(json.dumps(get_schema(rows)[1], indent=2, sort_keys=True)) + + # get sample + elif args.sample: + filename, report = report_file(config, auth, args.sample, None, 10) + rows = report_to_rows(report) + rows = report_clean(rows) + rows = rows_to_type(rows) + for r in rows_print(rows, row_min=0, 
row_max=20): + pass + + # get list + else: + for report in API_DBM(config, auth, iterate=True).queries().list().execute(): + print(json.dumps(report, indent=2, sort_keys=True)) + + +if __name__ == '__main__': + main() diff --git a/bqflow/run.py b/bqflow/run.py index 78700a4..694e825 100644 --- a/bqflow/run.py +++ b/bqflow/run.py @@ -222,84 +222,18 @@ def main(): Example: python run.py [path to workflow file] Caution: This script does NOT check if the last job finished, potentially causing overruns. - Notes: - - To avoid running the entire script when debugging a single task, the command line - can easily replace "all" with the name of any "task" in the json. For example - python run.py scripts/say_hello.json - - - Or specified further to run only the second hello task: - python run.py scripts/say_hello.json -t 2 - """)) - parser.add_argument( - 'workflow', - help='Path, local or Google Drive link, to workflow json file to run.' - ) - - parser.add_argument( - '--project', - '-p', - help='Cloud ID of Google Cloud Project.', - default=None - ) - - parser.add_argument( - '--key', - '-k', - help='API Key of Google Cloud Project.', - default=None - ) - - parser.add_argument( - '--service', - '-s', - help='Path to SERVICE credentials json file.', - default=None - ) - - parser.add_argument( - '--client', - '-c', - help='Path to CLIENT credentials json file.', - default=None - ) - - parser.add_argument( - '--user', - '-u', - help='Path to USER credentials json file.', - default=None - ) - - parser.add_argument( - '--timezone', - '-tz', - help='Time zone to run schedules on.', - default='America/Los_Angeles', - ) - - parser.add_argument( - '--task', - '-t', - help='Task number of the task to run starting at 1.', - default=None, - type=int - ) - - parser.add_argument( - '--verbose', - '-v', - help='Print all the steps as they happen.', - action='store_true' - ) - - parser.add_argument( - '--force', - '-force', - help='Not used but included for compatiblity with another 
script.', - action='store_true' - ) + parser.add_argument('workflow', help='Path, local or Google Drive link, to workflow json file to run.') + parser.add_argument('--project', '-p', help='Cloud ID of Google Cloud Project.', default=None) + parser.add_argument('--key', '-k', help='API Key of Google Cloud Project.', default=None) + parser.add_argument('--service', '-s', help='Path to SERVICE credentials json file.', default=None) + parser.add_argument('--client', '-c', help='Path to CLIENT credentials json file.', default=None) + parser.add_argument('--user', '-u', help='Path to USER credentials json file.', default=None) + parser.add_argument('--timezone', '-tz', help='Time zone to run schedules on.', default='America/Los_Angeles') + parser.add_argument('--task', '-t', help='Task number of the task to run starting at 1.', default=None, type=int) + parser.add_argument('--verbose', '-v', help='Print all the steps as they happen.', action='store_true') + parser.add_argument('--force', '-force', help='Not used but included for compatibility with another script.', action='store_true') args = parser.parse_args() diff --git a/setup.py b/setup.py index 8d13103..61de9fb 100755 --- a/setup.py +++ b/setup.py @@ -42,13 +42,16 @@ author='Paul Kenjora', author_email='kenjora@google.com', url='https://github.com/google/bqflow', - packages=find_packages(), - package_dir={'bqflow': 'bqflow'}, + packages=['bqflow'], include_package_data=True, install_requires=REQUIREMENTS, entry_points={ 'console_scripts': [ - 'run = bqflow.run:main', + 'bqflow_run = bqflow.run:main', + 'bqflow_auth = bqflow.auth:main', + 'bqflow_cm = bqflow.cm:main', + 'bqflow_dv = bqflow.dv:main', + 'bqflow_bq = bqflow.bq:main', ] }, license='Apache License, Version 2.0',