Skip to content

Commit

Permalink
Use setting for opt-in
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Nov 22, 2024
1 parent c2183c6 commit 3bd50f2
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 16 deletions.
11 changes: 0 additions & 11 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,3 @@ repos:
- id: ruff
args: [--fix]
- id: ruff-format

- repo: https://github.com/pre-commit/mirrors-mypy
rev: 'v1.13.0'
hooks:
- id: mypy
exclude: tests
additional_dependencies:
- types-paramiko
- types-simplejson
- types-sqlalchemy
- types-jsonschema
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ This target is tested with all actively supported [Python](https://devguide.pyth
| user | False | None | User name used to authenticate. |
| password | False | None | Password used to authenticate. |
| database | False | None | Database name. |
| use_copy | False | None | Use the COPY command to insert data. This is usually faster than INSERT statements. |
| default_target_schema | False | melty | Postgres schema to send data to, example: tap-clickup |
| activate_version | False | 1 | If set to false, the tap will ignore activate version messages. If set to true, add_record_metadata must be set to true as well. |
| hard_delete | False | 0 | When activate version is sent from a tap this specefies if we should delete the records that don't match, or mark them with a date in the `_sdc_deleted_at` column. This config option is ignored if `activate_version` is set to false. |
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ types-jsonschema = ">=4.19.0.3"

[tool.mypy]
exclude = "tests"
warn_redundant_casts = true
warn_unused_configs = true
warn_unused_ignores = true

[[tool.mypy.overrides]]
module = ["sshtunnel"]
Expand Down
2 changes: 1 addition & 1 deletion target_postgres/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ def guess_key_type(self, key_data: str) -> paramiko.PKey:
paramiko.Ed25519Key,
):
try:
key = key_class.from_private_key(io.StringIO(key_data)) # type: ignore[attr-defined]
key = key_class.from_private_key(io.StringIO(key_data))
except paramiko.SSHException: # noqa: PERF203
continue
else:
Expand Down
19 changes: 15 additions & 4 deletions target_postgres/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def generate_temp_table_name(self):
def generate_copy_statement(
self,
full_table_name: str | FullyQualifiedName,
columns: list[sa.Column], # type: ignore[override]
columns: list[sa.Column],
) -> str:
"""Generate a copy statement for bulk copy.
Expand Down Expand Up @@ -196,8 +196,6 @@ def bulk_insert_records( # type: ignore[override]
True if table exists, False if not, None if unsure or undetectable.
"""
columns = self.column_representation(schema)
copy_statement: str = self.generate_copy_statement(table.name, columns)
self.logger.info("Inserting with SQL: %s", copy_statement)

data: list[dict[str, t.Any]] = []

Expand All @@ -220,7 +218,20 @@ def bulk_insert_records( # type: ignore[override]
}
data.append(insert_record)

self._do_copy(connection, copy_statement, columns, data)
if self.config["use_copy"]:
copy_statement: str = self.generate_copy_statement(table.name, columns)
self.logger.info("Inserting with SQL: %s", copy_statement)
self._do_copy(connection, copy_statement, columns, data)
else:
insert: str = t.cast(
str,
self.generate_insert_statement(
table.name,
columns,
),
)
self.logger.info("Inserting with SQL: %s", insert)
connection.execute(insert, data)

return True

Expand Down
10 changes: 10 additions & 0 deletions target_postgres/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,16 @@ def __init__(
th.StringType,
description="Database name.",
),
th.Property(
"use_copy",
th.BooleanType,
default=False,
description=(
"Use the COPY command to insert data. This is usually faster than "
"INSERT statements."
),
title="Use COPY",
),
th.Property(
"sqlalchemy_url",
th.StringType,
Expand Down

0 comments on commit 3bd50f2

Please sign in to comment.