Refactor COPY into a method
edgarrmondragon committed Nov 22, 2024
1 parent f1fbc3a commit c2183c6
Showing 1 changed file with 38 additions and 29 deletions.
67 changes: 38 additions & 29 deletions target_postgres/sinks.py
@@ -138,6 +138,38 @@ def generate_copy_statement(
 
         return sql
 
+    def _do_copy(
+        self,
+        connection: sa.engine.Connection,
+        copy_statement: str,
+        columns: list[sa.Column],
+        data_to_copy: list[dict[str, t.Any]],
+    ) -> None:
+        # Prepare to process the rows into csv. Use each column's bind_processor to do
+        # most of the work, then do the final construction of the csv rows ourselves
+        # to control exactly how values are converted and which ones are quoted.
+        column_bind_processors = {
+            column.name: column.type.bind_processor(connection.dialect)
+            for column in columns
+        }
+
+        # Use copy to run the copy statement.
+        # https://www.psycopg.org/psycopg3/docs/basic/copy.html
+        with connection.connection.cursor().copy(copy_statement) as copy:  # type: ignore[attr-defined]
+            for row in data_to_copy:
+                processed_row = []
+                for row_column_name in row:
+                    if column_bind_processors[row_column_name] is not None:
+                        processed_row.append(
+                            column_bind_processors[row_column_name](
+                                row[row_column_name]
+                            )
+                        )
+                    else:
+                        processed_row.append(row[row_column_name])
+
+                copy.write_row(processed_row)
+
     def bulk_insert_records(  # type: ignore[override]
         self,
         table: sa.Table,
@@ -167,51 +199,28 @@ def bulk_insert_records(  # type: ignore[override]
         copy_statement: str = self.generate_copy_statement(table.name, columns)
         self.logger.info("Inserting with SQL: %s", copy_statement)
 
-        data_to_copy: list[dict[str, t.Any]] = []
+        data: list[dict[str, t.Any]] = []
 
         # If append only is False, we only take the latest record one per primary key
         if self.append_only is False:
-            unique_copy_records: dict[tuple, dict] = {}  # pk tuple: values
+            unique_records: dict[tuple, dict] = {}  # pk tuple: values
             for record in records:
                 insert_record = {
                     column.name: record.get(column.name) for column in columns
                 }
                 # No need to check for a KeyError here because the SDK already
                 # guarantees that all key properties exist in the record.
                 primary_key_tuple = tuple(record[key] for key in primary_keys)
-                unique_copy_records[primary_key_tuple] = insert_record
-            data_to_copy = list(unique_copy_records.values())
+                unique_records[primary_key_tuple] = insert_record
+            data = list(unique_records.values())
         else:
             for record in records:
                 insert_record = {
                     column.name: record.get(column.name) for column in columns
                 }
-                data_to_copy.append(insert_record)
+                data.append(insert_record)
 
-        # Prepare to process the rows into csv. Use each column's bind_processor to do
-        # most of the work, then do the final construction of the csv rows ourselves
-        # to control exactly how values are converted and which ones are quoted.
-        column_bind_processors = {
-            column.name: column.type.bind_processor(connection.dialect)
-            for column in columns
-        }
-
-        # Use copy to run the copy statement.
-        # https://www.psycopg.org/psycopg3/docs/basic/copy.html
-        with connection.connection.cursor().copy(copy_statement) as copy:  # type: ignore[attr-defined]
-            for row in data_to_copy:
-                processed_row = []
-                for row_column_name in row:
-                    if column_bind_processors[row_column_name] is not None:
-                        processed_row.append(
-                            column_bind_processors[row_column_name](
-                                row[row_column_name]
-                            )
-                        )
-                    else:
-                        processed_row.append(row[row_column_name])
-
-                copy.write_row(processed_row)
+        self._do_copy(connection, copy_statement, columns, data)
 
         return True
 
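The extracted _do_copy method leans on SQLAlchemy bind processors to convert Python values before they reach the driver. A minimal sketch of that lookup, assuming a hypothetical table definition that is not part of this commit:

import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# Hypothetical table, only for illustration.
table = sa.Table(
    "my_table",
    sa.MetaData(),
    sa.Column("id", sa.Integer),
    sa.Column("payload", sa.JSON),
)
dialect = postgresql.dialect()

# bind_processor() returns a conversion callable, or None when the value
# can be handed to the driver unchanged, which is why _do_copy checks for None.
processors = {
    column.name: column.type.bind_processor(dialect)
    for column in table.columns
}

row = {"id": 1, "payload": {"a": 1}}
processed = [
    processors[name](value) if processors[name] is not None else value
    for name, value in row.items()
]
print(processed)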

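The statement itself runs through psycopg 3's COPY support (the docs linked in the code comment). A standalone sketch of that protocol, using a hypothetical DSN and table; generate_copy_statement is assumed to emit a statement of this COPY ... FROM STDIN shape:

import psycopg

with psycopg.connect("dbname=example") as conn:  # hypothetical DSN
    with conn.cursor() as cur:
        with cur.copy('COPY "my_table" ("id", "payload") FROM STDIN') as copy:
            for row in [(1, '{"a": 1}'), (2, '{"b": 2}')]:
                # write_row() adapts each value and streams it over the COPY
                # protocol, so no manual CSV quoting is needed.
                copy.write_row(row)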