From 0140b45aa45240be250fd4dac57fd0a2dbf6786f Mon Sep 17 00:00:00 2001 From: roy0424 Date: Fri, 1 Dec 2023 17:25:26 +0900 Subject: [PATCH 01/10] implement partial format description event fields --- pymysqlreplication/event.py | 17 +++++++++- pymysqlreplication/tests/test_basic.py | 47 ++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 82b899745..f05ed7a6a 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -368,10 +368,25 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) self.mysql_version_str = self.packet.read(50).rstrip(b"\0").decode() numbers = self.mysql_version_str.split("-")[0] self.mysql_version = tuple(map(int, numbers.split("."))) + self.created = struct.unpack(" Date: Sun, 3 Dec 2023 06:04:44 +0900 Subject: [PATCH 02/10] applied black --- pymysqlreplication/event.py | 5 +++-- pymysqlreplication/tests/test_basic.py | 7 ++++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index f05ed7a6a..e0246e905 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -370,7 +370,9 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) self.mysql_version = tuple(map(int, numbers.split("."))) self.created = struct.unpack(" Date: Sun, 3 Dec 2023 07:47:26 +0900 Subject: [PATCH 03/10] added dbms property in BinLogEvent --- pymysqlreplication/event.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index e0246e905..41cf11101 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -27,6 +27,7 @@ def __init__( ignore_decode_errors=False, verify_checksum=False, optional_meta_data=False, + dbms=None, ): self.packet = from_packet self.table_map = table_map @@ -43,6 +44,7 @@ def __init__( self._processed = True self.complete = True self._verify_event() + self.dbms = dbms def _read_table_id(self): # Table ID is 6 byte @@ -104,7 +106,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) self.gno = struct.unpack("= (5, 7): + if not self.is_mariadb and self.mysql_version >= (5, 7): self.last_committed = struct.unpack(" Date: Sun, 3 Dec 2023 07:50:17 +0900 Subject: [PATCH 04/10] remove version retrieval from session --- pymysqlreplication/binlogstream.py | 14 -------------- pymysqlreplication/row_event.py | 1 - 2 files changed, 15 deletions(-) diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index 85c9df6ab..9c7ac42ad 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -305,7 +305,6 @@ def __connect_to_ctl(self): self._ctl_connection_settings["cursorclass"] = DictCursor self._ctl_connection_settings["autocommit"] = True self._ctl_connection = self.pymysql_wrapper(**self._ctl_connection_settings) - self._ctl_connection._get_dbms = self.__get_dbms self.__connected_ctl = True self.__check_optional_meta_data() @@ -745,19 +744,6 @@ def _allowed_event_list( pass return frozenset(events) - def __get_dbms(self): - if not self.__connected_ctl: - self.__connect_to_ctl() - - cur = self._ctl_connection.cursor() - cur.execute("SELECT VERSION();") - - version_info = cur.fetchone().get("VERSION()", "") - - if "MariaDB" in version_info: - return "mariadb" - return "mysql" - def __log_valid_parameters(self): ignored = ["allowed_events", "table_map"] for parameter, value in self.__dict__.items(): diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index b371fcef7..7682ab4ee 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -782,7 +782,6 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) self.column_count = self.packet.read_length_coded_binary() self.columns = [] - self.dbms = self._ctl_connection._get_dbms() # Read columns meta data column_types = bytearray(self.packet.read(self.column_count)) self.packet.read_length_coded_binary() From 202753574efa16e297601134235a6804af6a65ed Mon Sep 17 00:00:00 2001 From: roy0424 Date: Sun, 3 Dec 2023 07:55:28 +0900 Subject: [PATCH 05/10] fixed used BinLogEvent property from GtidEvent --- pymysqlreplication/event.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 41cf11101..70510b5c2 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -106,7 +106,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) self.gno = struct.unpack("= (5, 7): + if self.dbms is "mysql" and self.mysql_version >= (5, 7): self.last_committed = struct.unpack(" Date: Sun, 3 Dec 2023 08:00:02 +0900 Subject: [PATCH 06/10] added dbms property in format description event test --- pymysqlreplication/tests/test_basic.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 62fb1498c..e6371fa03 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -647,6 +647,7 @@ def test_format_description_event(self): self.assertIsInstance(event.post_header_len, tuple) self.assertIsInstance(event.mysql_version, tuple) self.assertEqual(len(event.mysql_version), 3) + self.assertEqual(event.dbms, "mysql") self.assertIsInstance(event.server_version_split, tuple) self.assertEqual(len(event.server_version_split), 3) self.assertIsInstance(event.number_of_event_types, int) @@ -1491,6 +1492,7 @@ def test_format_description_event(self): self.assertIsInstance(event.post_header_len, tuple) self.assertIsInstance(event.mysql_version, tuple) self.assertEqual(len(event.mysql_version), 3) + self.assertEqual(event.dbms, "mariadb") self.assertIsInstance(event.server_version_split, tuple) self.assertEqual(len(event.server_version_split), 3) self.assertIsInstance(event.number_of_event_types, int) From 5570ebdea81d795761cae05343cd0ab2b6488778 Mon Sep 17 00:00:00 2001 From: roy0424 Date: Sun, 3 Dec 2023 08:19:55 +0900 Subject: [PATCH 07/10] fixed comparison operator --- pymysqlreplication/event.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 70510b5c2..8be9c8f78 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -106,7 +106,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) self.gno = struct.unpack("= (5, 7): + if self.dbms == "mysql" and self.mysql_version >= (5, 7): self.last_committed = struct.unpack(" Date: Mon, 4 Dec 2023 17:13:10 +0900 Subject: [PATCH 08/10] re-added version retrieval from session --- pymysqlreplication/binlogstream.py | 14 ++++++++++++++ pymysqlreplication/row_event.py | 1 + 2 files changed, 15 insertions(+) diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index 9c7ac42ad..85c9df6ab 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -305,6 +305,7 @@ def __connect_to_ctl(self): self._ctl_connection_settings["cursorclass"] = DictCursor self._ctl_connection_settings["autocommit"] = True self._ctl_connection = self.pymysql_wrapper(**self._ctl_connection_settings) + self._ctl_connection._get_dbms = self.__get_dbms self.__connected_ctl = True self.__check_optional_meta_data() @@ -744,6 +745,19 @@ def _allowed_event_list( pass return frozenset(events) + def __get_dbms(self): + if not self.__connected_ctl: + self.__connect_to_ctl() + + cur = self._ctl_connection.cursor() + cur.execute("SELECT VERSION();") + + version_info = cur.fetchone().get("VERSION()", "") + + if "MariaDB" in version_info: + return "mariadb" + return "mysql" + def __log_valid_parameters(self): ignored = ["allowed_events", "table_map"] for parameter, value in self.__dict__.items(): diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index cd5bf9fe2..de205380a 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -782,6 +782,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) self.column_count = self.packet.read_length_coded_binary() self.columns = [] + self.dbms = self._ctl_connection._get_dbms() # Read columns meta data column_types = bytearray(self.packet.read(self.column_count)) self.packet.read_length_coded_binary() From c8f83fe246af195e96ac95d7375670d19df3ef83 Mon Sep 17 00:00:00 2001 From: roy0424 Date: Mon, 4 Dec 2023 18:34:23 +0900 Subject: [PATCH 09/10] fixed version retrieval from get_server_info --- pymysqlreplication/binlogstream.py | 13 ++++++------- pymysqlreplication/event.py | 3 +-- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index 85c9df6ab..c4c3efb38 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -287,6 +287,7 @@ def __init__( else: self.pymysql_wrapper = pymysql.connect self.mysql_version = (0, 0, 0) + self.dbms = None def close(self): if self.__connected_stream: @@ -748,14 +749,12 @@ def _allowed_event_list( def __get_dbms(self): if not self.__connected_ctl: self.__connect_to_ctl() - - cur = self._ctl_connection.cursor() - cur.execute("SELECT VERSION();") - - version_info = cur.fetchone().get("VERSION()", "") - - if "MariaDB" in version_info: + if self.dbms: + return self.dbms + if "MariaDB" in self._ctl_connection.get_server_info(): + self.dbms = "mariadb" return "mariadb" + self.dbms = "mysql" return "mysql" def __log_valid_parameters(self): diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 8be9c8f78..c9936252a 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -27,7 +27,6 @@ def __init__( ignore_decode_errors=False, verify_checksum=False, optional_meta_data=False, - dbms=None, ): self.packet = from_packet self.table_map = table_map @@ -44,7 +43,7 @@ def __init__( self._processed = True self.complete = True self._verify_event() - self.dbms = dbms + self.dbms = None def _read_table_id(self): # Table ID is 6 byte From 59202a28d6b5cc778b352afff46a3a910d2cfac5 Mon Sep 17 00:00:00 2001 From: roy0424 Date: Mon, 4 Dec 2023 19:11:06 +0900 Subject: [PATCH 10/10] get dbms from ctl_connection in BinLogEvent --- pymysqlreplication/event.py | 8 ++------ pymysqlreplication/row_event.py | 2 +- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index c9936252a..73e360512 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -43,7 +43,7 @@ def __init__( self._processed = True self.complete = True self._verify_event() - self.dbms = None + self.dbms = self._ctl_connection._get_dbms() def _read_table_id(self): # Table ID is 6 byte @@ -105,7 +105,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) self.gno = struct.unpack("= (5, 7): + if self.mysql_version >= (5, 7): self.last_committed = struct.unpack("