-
Notifications
You must be signed in to change notification settings - Fork 1
/
create_schemaless_table.py
59 lines (45 loc) · 2.22 KB
/
create_schemaless_table.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import argparse, datetime, json, logging, os
from google.cloud import bigquery
from google.cloud.bigquery.external_config import HivePartitioningOptions
def _parse_args(argv=None):
    """Parse this script's command-line options.

    Args:
        argv: Arguments to parse; ``None`` (the default) means ``sys.argv[1:]``.

    Returns:
        An ``argparse.Namespace`` with ``project``, ``gcs_uri``, ``dataset``
        and ``table`` attributes (all four options are required).

    Raises:
        SystemExit: if a required option is missing or ``--help`` is given
            (standard argparse behavior).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-p', '--project', help='GCP project ID', required=True)
    parser.add_argument('-g', '--gcs_uri', help='Cloud Storage URI (gs://xxx/yyy)', required=True)
    parser.add_argument('-d', '--dataset', help='BigQuery dataset name', required=True)
    parser.add_argument('-t', '--table', help='BigQuery table name', required=True)
    return parser.parse_args(argv)


# argparse signals bad or missing options with SystemExit, never ImportError,
# so a try/except around parsing cannot make this module safely importable.
# Parse only when run as a script; importers get ``args = None`` and should
# call ``create_bq_table`` directly.
if __name__ == "__main__":
    args = _parse_args()
else:
    args = None

# Suppress below-ERROR messages from the googleapiclient discovery-cache logger.
logging.getLogger('googleapiclient.discovery_cache').setLevel(logging.ERROR)
logger = logging.getLogger()
def create_bq_table(project, gcs_uri, dataset, table,
                    require_hive_partition_filter=True, compression=None):
    """Create a BigQuery external table over hive-partitioned CSV files.

    The table is "schema-less": each input line is exposed, unsplit, in a
    single STRING column named ``line``. Partition columns are inferred from
    the hive-style directory layout under ``gcs_uri`` (mode ``"AUTO"``).

    Args:
        project: GCP project ID passed to ``bigquery.Client``.
        gcs_uri: Cloud Storage prefix of the data, e.g. ``gs://bucket/path``.
            Used as the hive-partitioning source-URI prefix; the source URI
            pattern is ``<gcs_uri>/*``.
        dataset: BigQuery dataset to create the table in.
        table: Name of the table to create.
        require_hive_partition_filter: When True, queries against the table
            must filter on a partition column, guarding against accidental
            full-table scans.
        compression: Optional compression of the source files (e.g.
            ``'GZIP'``). ``None`` leaves the setting unset.

    Returns:
        The ``bigquery.Table`` returned by ``Client.create_table``.

    Raises:
        Propagates errors from ``Client.create_table`` (for example when the
        table already exists).
    """
    # posixpath gives '/'-joined URIs on every OS; os.path.join would use '\\'
    # on Windows and produce an invalid gs:// URI.
    import posixpath

    bq_client = bigquery.Client(project=project)
    # Client.dataset() is deprecated; build the fully-qualified table ID
    # instead. Using bq_client.project also covers project=None, where the
    # client resolves the project from the environment.
    bq_table = bigquery.Table('.'.join((bq_client.project, dataset, table)))

    hive_partition_options = HivePartitioningOptions()
    hive_partition_options.mode = "AUTO"
    hive_partition_options.source_uri_prefix = gcs_uri
    # Requiring a partition filter prevents queries from accidentally scanning
    # the whole table. When this was written, neither
    # Table.require_partition_filter nor
    # HivePartitioningOptions.require_partition_filter was supported by the
    # client library, so the raw API property is written directly.
    hive_partition_options._properties["require_partition_filter"] = require_hive_partition_filter

    extconfig = bigquery.ExternalConfig('CSV')
    # One STRING column receives the whole line.
    extconfig.schema = [bigquery.SchemaField('line', 'STRING')]
    # U+00FF as the delimiter (presumably never present in the data) plus no
    # quote character keeps each line from being split into fields.
    extconfig.options.field_delimiter = u'\u00ff'
    extconfig.options.quote_character = ''
    if compression is not None:
        extconfig.compression = compression
    extconfig.options.allow_jagged_rows = False
    extconfig.options.allow_quoted_newlines = False
    # Allow up to 10,000,000 bad records.
    extconfig.max_bad_records = 10000000
    extconfig.source_uris = [posixpath.join(gcs_uri, "*")]
    extconfig.hive_partitioning = hive_partition_options

    bq_table.external_data_configuration = extconfig
    return bq_client.create_table(bq_table)
def main():
    """Script entry point: create the table described by the module-level ``args``."""
    cli = args
    target = (cli.project, cli.gcs_uri, cli.dataset, cli.table)
    create_bq_table(*target)


if __name__ == "__main__":
    main()