3
3
import logging
4
4
import os
5
5
import sys
6
+ import itertools
6
7
7
8
from google .api_core .exceptions import InternalServerError , NotFound
8
9
from google .api_core .retry import if_exception_type , Retry
18
19
19
20
RETRY_EXCEPTIONS = [InternalServerError ]
20
21
22
+ logging .basicConfig ()
23
+
24
+ logger = logging .getLogger (__name__ )
25
+ logger .setLevel (logging .DEBUG )
26
+
21
27
22
28
def get_credentials (
23
29
service_account_blob : dict = None ,
@@ -320,13 +326,15 @@ def create_table_from_dataframe(
320
326
column_name = dataframe .columns [column_index ]
321
327
db_column_name = underscore (parameterize (column_name ))
322
328
column_name_conversion [column_name ] = db_column_name
323
- datatype = dataframe .dtypes [column_index ].name
329
+ datatype = dataframe .dtypes . iloc [column_index ].name
324
330
if datatype == "object" :
325
331
column_definitions .append (f"{ db_column_name } STRING" )
326
332
elif datatype == "int64" :
327
333
column_definitions .append (f"{ db_column_name } INT64" )
328
334
elif datatype == "float64" :
329
335
column_definitions .append (f"{ db_column_name } NUMERIC" )
336
+ elif datatype == "bool" :
337
+ column_definitions .append (f"{ db_column_name } BOOL" )
330
338
else :
331
339
raise ValueError (f"Unknown data type { datatype } on column { column_name } " )
332
340
@@ -341,7 +349,14 @@ def create_table_from_dataframe(
341
349
"""
342
350
print (table_definition_sql )
343
351
run_query (table_definition_sql , client = client )
344
- load_data_from_dataframe (client , dataframe , project_name , dataset_name , table_name )
352
+ load_data_from_dataframe (
353
+ client ,
354
+ dataframe ,
355
+ project_name ,
356
+ dataset_name ,
357
+ table_name ,
358
+ retry_exceptions = [NotFound , * RETRY_EXCEPTIONS ],
359
+ )
345
360
346
361
347
362
def get_table_for_loading (
@@ -396,7 +411,12 @@ def load_data_from_dataframe(
396
411
retry = retry_policy ,
397
412
)
398
413
399
- logging .info (f"inserted { len (results )} rows" )
414
+ print (f"completed insert to { table } " )
415
+ if len (results ) > 0 :
416
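+ # any result from insert_rows_from_dataframe indicates errors in some rows; log them (NOTE(review): it appears to return one error list per chunk, so confirm empty per-chunk lists are not reported as errors)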
417
+ logger .error (f"Errors encountered inserting to { table } " )
418
+ errors = itertools .chain (results )
419
+ logger .error (errors )
400
420
401
421
402
422
def load_data_from_list (
@@ -423,7 +443,7 @@ def load_data_from_list(
423
443
retry_policy = Retry (predicate = if_exception_type (* retry_exceptions ))
424
444
results = client .insert_rows (table = table , rows = data , retry = retry_policy )
425
445
426
- logging .info (f"inserted { len (results )} rows" )
446
+ logger .info (f"inserted { len (results )} rows" )
427
447
428
448
429
449
def upload_data_to_gcs (
0 commit comments