-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathtarget.py
362 lines (346 loc) · 14.1 KB
/
target.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
"""Postgres target class."""
from __future__ import annotations
import typing as t
from singer_sdk import typing as th
from singer_sdk.target_base import SQLTarget
from target_postgres.driver import PSYCOPG3
from target_postgres.sinks import PostgresSink
if t.TYPE_CHECKING:
from pathlib import PurePath
class TargetPostgres(SQLTarget):
    """Singer SQL target that writes to Postgres.

    The class declares the accepted settings in ``config_jsonschema`` and, on
    construction, performs the cross-field configuration checks that are not
    expressed in that schema (connection settings, SSL settings, and the
    ``activate_version`` / ``add_record_metadata`` dependency).
    """

    package_name = "meltanolabs-target-postgres"

    def __init__(
        self,
        config: dict | PurePath | str | list[PurePath | str] | None = None,
        parse_env_config: bool = False,
        validate_config: bool = True,
    ) -> None:
        """Initialize the target and validate cross-field configuration.

        Args:
            config: Target configuration. Can be a dictionary, a single path to a
                configuration file, or a list of paths to multiple configuration
                files.
            parse_env_config: Whether to look for configuration values in environment
                variables.
            validate_config: True to require validation of config settings.

        Raises:
            AssertionError: If the connection settings are incomplete, the SSL
                settings are inconsistent, or ``activate_version`` is enabled
                without ``add_record_metadata``.
        """
        # NOTE(review): assigned before the base initializer runs; presumably the
        # base class reads it during setup -- confirm before reordering.
        self.max_parallelism = 1
        super().__init__(
            config=config,
            parse_env_config=parse_env_config,
            validate_config=validate_config,
        )

        cfg = self.config

        # `sqlalchemy_url` is deprecated in favor of the individual `host`, `port`,
        # `user`, and `password` fields; `dialect+driver` is deprecated and is
        # expected to become a hard-coded value.
        if cfg.get("sqlalchemy_url"):
            self.logger.warning(
                "The `sqlalchemy_url` configuration option is deprecated. "
                "Please use the `host`, `port`, `user`, `password` "
                "configuration options instead."
            )

        if (driver := cfg.get("dialect+driver")) and driver != PSYCOPG3:
            self.logger.warning(
                "The `dialect+driver` configuration option is deprecated. "
                f"Please set it to `{PSYCOPG3}`, as this will be the hard-coded "
                "value in the future."
            )

        # The checks below raise AssertionError explicitly instead of using the
        # `assert` statement: `assert` is removed when Python runs with -O, which
        # would silently skip the validation. The exception type is kept so
        # existing callers that catch AssertionError continue to work.

        # Every SSL/connection check is bypassed when a full URL is supplied.
        uses_url = cfg.get("sqlalchemy_url") is not None

        # There's a few ways to do this in JSON Schema but it is schema draft dependent.
        # https://stackoverflow.com/questions/38717933/jsonschema-attribute-conditionally-required # noqa: E501
        connection_keys = ("host", "port", "user", "password", "dialect+driver")
        if not uses_url and any(cfg.get(key) is None for key in connection_keys):
            raise AssertionError(
                "Need either the sqlalchemy_url to be set or host, port, user, "
                "password, and dialect+driver to be set"
            )

        # If sqlalchemy_url is not being used and ssl_enable is on, ssl_mode must have
        # one of six allowable values. If ssl_mode is verify-ca or verify-full, a
        # certificate authority must be provided to verify against.
        if not uses_url and cfg.get("ssl_enable") is not False:
            ssl_mode = cfg.get("ssl_mode")
            needs_no_ca = ssl_mode in {"disable", "allow", "prefer", "require"}
            verifies_with_ca = (
                ssl_mode in {"verify-ca", "verify-full"}
                and cfg.get("ssl_certificate_authority") is not None
            )
            if not (needs_no_ca or verifies_with_ca):
                raise AssertionError(
                    "ssl_enable is true but invalid values are provided for "
                    "ssl_mode and/or ssl_certificate_authority."
                )

        # If sqlalchemy_url is not being used and ssl_client_certificate_enable is on,
        # the client must provide a certificate and associated private key.
        if (
            not uses_url
            and cfg.get("ssl_client_certificate_enable") is not False
            and (
                cfg.get("ssl_client_certificate") is None
                or cfg.get("ssl_client_private_key") is None
            )
        ):
            raise AssertionError(
                "ssl_client_certificate_enable is true but one or both of"
                " ssl_client_certificate or ssl_client_private_key are unset."
            )

        # activate_version is only allowed together with add_record_metadata.
        if cfg.get("activate_version") and not cfg.get("add_record_metadata"):
            raise AssertionError(
                "Activate version messages can't be processed unless "
                "add_record_metadata is set to true. To ignore Activate version "
                "messages instead, Set the `activate_version` configuration to False."
            )

    name = "target-postgres"

    # Declared settings for this target, converted to a plain dict.
    config_jsonschema = th.PropertiesList(
        th.Property(
            "host",
            th.StringType,
            description="Hostname for postgres instance.",
        ),
        th.Property(
            "port",
            th.IntegerType,
            default=5432,
            description="The port on which postgres is awaiting connections.",
        ),
        th.Property(
            "user",
            th.StringType,
            description="User name used to authenticate.",
        ),
        th.Property(
            "password",
            th.StringType,
            description="Password used to authenticate.",
        ),
        th.Property(
            "database",
            th.StringType,
            description="Database name.",
        ),
        th.Property(
            "use_copy",
            th.BooleanType,
            default=False,
            description=(
                "Use the COPY command to insert data. This is usually faster than "
                f"INSERT statements. This option is only available for the {PSYCOPG3} "
                "dialect+driver."
            ),
            title="Use COPY",
        ),
        th.Property(
            "sqlalchemy_url",
            th.StringType,
            description=(
                "DEPRECATED. SQLAlchemy connection string. "
                + "This will override using host, user, password, port, "
                + "dialect, and all ssl settings. Note that you must escape password "
                + "special characters properly. See "
                + "https://docs.sqlalchemy.org/en/20/core/engines.html#escaping-special-characters-such-as-signs-in-passwords"
            ),
        ),
        th.Property(
            "dialect+driver",
            th.StringType,
            default=PSYCOPG3,
            description=(
                "DEPRECATED. Dialect+driver see "
                + "https://docs.sqlalchemy.org/en/20/core/engines.html. "
                + "Generally just leave this alone."
            ),
        ),
        th.Property(
            "default_target_schema",
            th.StringType,
            description="Postgres schema to send data to, example: tap-clickup",
            default="melty",
        ),
        th.Property(
            "activate_version",
            th.BooleanType,
            default=True,
            description=(
                "If set to false, the tap will ignore activate version messages. If "
                + "set to true, add_record_metadata must be set to true as well."
            ),
        ),
        th.Property(
            "hard_delete",
            th.BooleanType,
            default=False,
            description=(
                "When activate version is sent from a tap this specefies "
                + "if we should delete the records that don't match, or mark "
                + "them with a date in the `_sdc_deleted_at` column. This config "
                + "option is ignored if `activate_version` is set to false."
            ),
        ),
        th.Property(
            "add_record_metadata",
            th.BooleanType,
            default=True,
            description=(
                "Note that this must be enabled for activate_version to work! "
                + "This adds _sdc_extracted_at, _sdc_batched_at, and more to every "
                + "table. See https://sdk.meltano.com/en/latest/implementation/record_metadata.html "  # noqa: E501
                + "for more information."
            ),
        ),
        th.Property(
            "interpret_content_encoding",
            th.BooleanType,
            default=False,
            description=(
                "If set to true, the target will interpret the content encoding of the "
                "schema to determine how to store the data. Using this option may "
                "result in a more efficient storage of the data but may also result "
                "in an error if the data is not encoded as expected."
            ),
        ),
        th.Property(
            "sanitize_null_text_characters",
            th.BooleanType,
            default=False,
            description=(
                "If set to true, the target will sanitize null characters in "
                "char/text/varchar fields, as they are not supported by Postgres. "
                "See [postgres documentation](https://www.postgresql.org/docs/current/functions-string.html) "  # noqa: E501
                "for more information about chr(0) not being supported."
            ),
        ),
        th.Property(
            "ssl_enable",
            th.BooleanType,
            default=False,
            description=(
                "Whether or not to use ssl to verify the server's identity. Use"
                + " ssl_certificate_authority and ssl_mode for further customization."
                + " To use a client certificate to authenticate yourself to the server,"
                + " use ssl_client_certificate_enable instead."
            ),
        ),
        th.Property(
            "ssl_client_certificate_enable",
            th.BooleanType,
            default=False,
            description=(
                "Whether or not to provide client-side certificates as a method of"
                + " authentication to the server. Use ssl_client_certificate and"
                + " ssl_client_private_key for further customization. To use SSL to"
                + " verify the server's identity, use ssl_enable instead."
            ),
        ),
        th.Property(
            "ssl_mode",
            th.StringType,
            default="verify-full",
            description=(
                "SSL Protection method, see [postgres documentation](https://www.postgresql.org/docs/current/libpq-ssl.html#LIBPQ-SSL-PROTECTION)"
                + " for more information. Must be one of disable, allow, prefer,"
                + " require, verify-ca, or verify-full."
            ),
        ),
        th.Property(
            "ssl_certificate_authority",
            th.StringType,
            # NOTE(review): libpq documents ~/.postgresql/root.crt as the default
            # root certificate and root.crl as the revocation list -- confirm that
            # the `.crl` default below is intended.
            default="~/.postgresql/root.crl",
            description=(
                "The certificate authority that should be used to verify the server's"
                + " identity. Can be provided either as the certificate itself (in"
                + " .env) or as a filepath to the certificate."
            ),
        ),
        th.Property(
            "ssl_client_certificate",
            th.StringType,
            default="~/.postgresql/postgresql.crt",
            description=(
                "The certificate that should be used to verify your identity to the"
                + " server. Can be provided either as the certificate itself (in .env)"
                + " or as a filepath to the certificate."
            ),
        ),
        th.Property(
            "ssl_client_private_key",
            th.StringType,
            default="~/.postgresql/postgresql.key",
            description=(
                "The private key for the certificate you provided. Can be provided"
                + " either as the certificate itself (in .env) or as a filepath to the"
                + " certificate."
            ),
        ),
        th.Property(
            "ssl_storage_directory",
            th.StringType,
            default=".secrets",
            description=(
                "The folder in which to store SSL certificates provided as raw values."
                + " When a certificate/key is provided as a raw value instead of as a"
                + " filepath, it must be written to a file before it can be used. This"
                + " configuration option determines where that file is created."
            ),
        ),
        th.Property(
            "ssh_tunnel",
            th.ObjectType(
                th.Property(
                    "enable",
                    th.BooleanType,
                    required=False,
                    default=False,
                    description=(
                        "Enable an ssh tunnel (also known as bastion host), see the "
                        "other ssh_tunnel.* properties for more details"
                    ),
                ),
                th.Property(
                    "host",
                    th.StringType,
                    required=False,
                    description=(
                        "Host of the bastion host, this is the host "
                        "we'll connect to via ssh"
                    ),
                ),
                th.Property(
                    "username",
                    th.StringType,
                    required=False,
                    description="Username to connect to bastion host",
                ),
                th.Property(
                    "port",
                    th.IntegerType,
                    required=False,
                    default=22,
                    description="Port to connect to bastion host",
                ),
                th.Property(
                    "private_key",
                    th.StringType,
                    required=False,
                    secret=True,
                    description="Private Key for authentication to the bastion host",
                ),
                th.Property(
                    "private_key_password",
                    th.StringType,
                    required=False,
                    secret=True,
                    default=None,
                    description=(
                        "Private Key Password, leave None if no password is set"
                    ),
                ),
            ),
            required=False,
            description="SSH Tunnel Configuration, this is a json object",
        ),
    ).to_dict()

    # Sink class assigned as this target's default.
    default_sink_class = PostgresSink