Skip to content

Commit f9a59ba

Browse files
authored
Handle timestamp incremental replication (wintersrd#89)
* Handle incremental replication on `timestamp` columns (SQL Server `rowversion`, which has nothing to do with datetime) * Release 2.6.4 prep
1 parent fde353a commit f9a59ba

File tree

6 files changed

+108
-14
lines changed

6 files changed

+108
-14
lines changed

.bumpversion.cfg

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[bumpversion]
2-
current_version = 2.6.3
2+
current_version = 2.6.4
33
parse = (?P<major>\d+)
44
\.(?P<minor>\d+)
55
\.(?P<patch>\d+)

CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
# tap-mssql 2.6.4 2024-10-24
2+
* Handle `timestamp` columns as strings: in SQL Server, `timestamp` is not a datetime value but a [deprecated](https://learn.microsoft.com/en-us/sql/t-sql/data-types/rowversion-transact-sql?view=sql-server-ver16#remarks) synonym for the internal `rowversion` type
3+
* Add tests for incremental syncing using a `timestamp` column as `replication-key`
4+
15
# tap-mssql 2.6.3 2024-10-17
26
* Updating CDC documentation with a packaged method to maintain CDC tables.
37

pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "tap-mssql"
3-
version = "2.6.3"
3+
version = "2.6.4"
44
description = "A pipelinewise compatible tap for connecting Microsoft SQL Server"
55
authors = ["Rob Winters <[email protected]>"]
66
license = "GNU Affero"

tap_mssql/__init__.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666

6767
DECIMAL_TYPES = set(["decimal", "number", "money", "smallmoney", "numeric"])
6868

69-
DATETIME_TYPES = set(["datetime2", "datetime", "datetimeoffset", "timestamp", "smalldatetime"])
69+
DATETIME_TYPES = set(["datetime2", "datetime", "datetimeoffset", "smalldatetime"])
7070

7171
DATE_TYPES = set(["date"])
7272

@@ -101,6 +101,10 @@ def schema_for_column(c, config):
101101

102102
if data_type == "bit":
103103
result.type = ["null", "boolean"]
104+
105+
elif data_type in ["timestamp", "rowversion"]:
106+
result.type = ["null", "string"]
107+
result.format = "rowversion"
104108

105109
elif data_type in BYTES_FOR_INTEGER_TYPE:
106110
result.type = ["null", "integer"]

tap_mssql/sync_strategies/incremental.py

+11-3
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,19 @@ def sync_table(mssql_conn, config, catalog_entry, state, columns):
5757
replication_key_value = datetime.fromtimestamp(
5858
pendulum.parse(replication_key_value).timestamp()
5959
)
60+
# Handle incremental replication keyed on a rowversion (legacy "timestamp") column
61+
if catalog_entry.schema.properties[replication_key_metadata].format == 'rowversion':
62+
select_sql += """ WHERE CAST("{}" AS BIGINT) >=
63+
convert(bigint, convert (varbinary(8), '0x{}', 1))
64+
ORDER BY "{}" ASC""".format(
65+
replication_key_metadata, replication_key_value, replication_key_metadata
66+
)
6067

68+
else:
69+
select_sql += ' WHERE "{}" >= %(replication_key_value)s ORDER BY "{}" ASC'.format(
70+
replication_key_metadata, replication_key_metadata
71+
)
6172

62-
select_sql += ' WHERE "{}" >= %(replication_key_value)s ORDER BY "{}" ASC'.format(
63-
replication_key_metadata, replication_key_metadata
64-
)
6573

6674
params["replication_key_value"] = replication_key_value
6775
elif replication_key_metadata is not None:

tests/test_tap_mssql.py

+86-8
Original file line numberDiff line numberDiff line change
@@ -561,15 +561,15 @@ def test_with_no_state(self):
561561

562562
(message_types, versions) = message_types_and_versions(SINGER_MESSAGES)
563563

564-
self.assertEqual(
565-
[
566-
"ActivateVersionMessage",
567-
"RecordMessage",
568-
],
569-
sorted(list(set(message_types))),
570-
)
564+
571565
self.assertTrue(isinstance(versions[0], int))
572566
self.assertEqual(versions[0], versions[1])
567+
record_messages = [message for message in SINGER_MESSAGES if isinstance(message,singer.RecordMessage)]
568+
incremental_record_messages = [m for m in record_messages if m.stream == 'dbo-incremental']
569+
integer_incremental_record_messages = [m for m in record_messages if m.stream == 'dbo-integer_incremental']
570+
571+
self.assertEqual(len(incremental_record_messages),3)
572+
self.assertEqual(len(integer_incremental_record_messages),3)
573573

574574
def test_with_state(self):
575575
state = {
@@ -602,7 +602,14 @@ def test_with_state(self):
602602
)
603603
self.assertTrue(isinstance(versions[0], int))
604604
self.assertEqual(versions[0], versions[1])
605-
self.assertEqual(versions[1], 12345)
605+
606+
# Based on state values provided check the number of record messages emitted
607+
record_messages = [message for message in SINGER_MESSAGES if isinstance(message,singer.RecordMessage)]
608+
incremental_record_messages = [m for m in record_messages if m.stream == 'dbo-incremental']
609+
integer_incremental_record_messages = [m for m in record_messages if m.stream == 'dbo-integer_incremental']
610+
611+
self.assertEqual(len(incremental_record_messages),2)
612+
self.assertEqual(len(integer_incremental_record_messages),1)
606613

607614

608615
class TestViews(unittest.TestCase):
@@ -650,6 +657,76 @@ def test_do_not_discover_key_properties_for_view(self):
650657

651658
self.assertEqual(primary_keys, {"a_table": ["id"], "a_view": []})
652659

660+
class TestTimestampIncrementalReplication(unittest.TestCase):
661+
def setUp(self):
662+
self.conn = test_utils.get_test_connection()
663+
664+
with connect_with_backoff(self.conn) as open_conn:
665+
with open_conn.cursor() as cursor:
666+
try:
667+
cursor.execute("drop table incremental")
668+
except:
669+
pass
670+
cursor.execute("CREATE TABLE incremental (val int, updated timestamp)")
671+
cursor.execute("INSERT INTO incremental (val) VALUES (1)") #00000000000007d1
672+
cursor.execute("INSERT INTO incremental (val) VALUES (2)") #00000000000007d2
673+
cursor.execute("INSERT INTO incremental (val) VALUES (3)") #00000000000007d3
674+
675+
self.catalog = test_utils.discover_catalog(self.conn, {})
676+
677+
for stream in self.catalog.streams:
678+
stream.metadata = [
679+
{
680+
"breadcrumb": (),
681+
"metadata": {
682+
"selected": True,
683+
"table-key-properties": [],
684+
"database-name": "dbo",
685+
},
686+
},
687+
{"breadcrumb": ("properties", "val"), "metadata": {"selected": True}},
688+
]
689+
690+
stream.stream = stream.table
691+
test_utils.set_replication_method_and_key(stream, "INCREMENTAL", "updated")
692+
693+
def test_with_no_state(self):
694+
state = {}
695+
696+
global SINGER_MESSAGES
697+
SINGER_MESSAGES.clear()
698+
699+
tap_mssql.do_sync(self.conn, test_utils.get_db_config(), self.catalog, state)
700+
701+
(message_types, versions) = message_types_and_versions(SINGER_MESSAGES)
702+
703+
record_messages = [message for message in SINGER_MESSAGES if isinstance(message,singer.RecordMessage)]
704+
705+
self.assertEqual(len(record_messages),3)
706+
707+
708+
def test_with_state(self):
709+
state = {
710+
"bookmarks": {
711+
"dbo-incremental": {
712+
"version": 1,
713+
"replication_key_value": '00000000000007d2',
714+
"replication_key": "updated",
715+
},
716+
}
717+
}
718+
719+
global SINGER_MESSAGES
720+
SINGER_MESSAGES.clear()
721+
tap_mssql.do_sync(self.conn, test_utils.get_db_config(), self.catalog, state)
722+
723+
(message_types, versions) = message_types_and_versions(SINGER_MESSAGES)
724+
725+
# Given the state value supplied, there should only be two RECORD messages
726+
record_messages = [message for message in SINGER_MESSAGES if isinstance(message,singer.RecordMessage)]
727+
728+
self.assertEqual(len(record_messages),2)
729+
653730
class TestPrimaryKeyUniqueKey(unittest.TestCase):
654731
def setUp(self):
655732
self.conn = test_utils.get_test_connection()
@@ -708,6 +785,7 @@ def test_only_primary_key(self):
708785
self.assertEqual(primary_keys["pk_only_table"], ["pk"])
709786
self.assertEqual(primary_keys["pk_uc_table"], ["pk"])
710787

788+
711789
if __name__ == "__main__":
712790
# test1 = TestBinlogReplication()
713791
# test1.setUp()

0 commit comments

Comments
 (0)