From fa737eca4a068a0a38f9ffe93d663c4361d0b807 Mon Sep 17 00:00:00 2001 From: junha6316 Date: Fri, 1 Dec 2023 16:03:44 +0900 Subject: [PATCH 1/5] add to_dict method to BinLogEvent --- pymysqlreplication/event.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 82b89974..6fd28a9b 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -9,6 +9,7 @@ from pymysqlreplication.exceptions import StatusVariableMismatch from pymysqlreplication.util.bytes import parse_decimal_from_bytes from typing import Union, Optional +import json class BinLogEvent(object): @@ -67,15 +68,27 @@ def _verify_event(self): self.packet.read_bytes -= 19 + self.event_size + 4 self.packet.rewind(20) + @property + def formatted_timestamp(self) -> str: + return datetime.datetime.utcfromtimestamp(self.timestamp).isoformat() + def dump(self): print(f"=== {self.__class__.__name__} ===") - print(f"Date: {datetime.datetime.utcfromtimestamp(self.timestamp).isoformat()}") + print(f"Date: {self.formatted_timestamp}") print(f"Log position: {self.packet.log_pos}") print(f"Event size: {self.event_size}") print(f"Read bytes: {self.packet.read_bytes}") self._dump() print() + def to_dict(self) -> dict: + return { + "timestamp": self.formatted_timestamp, + "log_pos": self.packet.log_pos, + "event_size": self.event_size, + "read_bytes": self.packet.read_bytes, + } + def _dump(self): """Core data dumped for the event""" pass From 0a65ab8a88581d9b7db18833cda28adb923bc40f Mon Sep 17 00:00:00 2001 From: junha6316 Date: Fri, 1 Dec 2023 16:04:31 +0900 Subject: [PATCH 2/5] add to_json method to BinLogEvent --- pymysqlreplication/event.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 6fd28a9b..aa61b208 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -89,6 +89,9 @@ def to_dict(self) -> dict: "read_bytes": self.packet.read_bytes, } + def to_json(self) -> str: + return json.dumps(self.to_dict()) + def _dump(self): """Core data dumped for the event""" pass From 8496e0fa90ef7faba7d2f62d4f91789e466e87f1 Mon Sep 17 00:00:00 2001 From: junha6316 Date: Fri, 1 Dec 2023 16:05:25 +0900 Subject: [PATCH 3/5] add test fot to_json, to_dict --- pymysqlreplication/tests/test_event.py | 50 ++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 pymysqlreplication/tests/test_event.py diff --git a/pymysqlreplication/tests/test_event.py b/pymysqlreplication/tests/test_event.py new file mode 100644 index 00000000..332462e0 --- /dev/null +++ b/pymysqlreplication/tests/test_event.py @@ -0,0 +1,50 @@ +import pytest + + +from pymysqlreplication.tests.base import PyMySQLReplicationTestCase +from pymysqlreplication.event import QueryEvent +from pymysqlreplication import BinLogStreamReader +import json +class BinLogEventTestCase(PyMySQLReplicationTestCase): + + def setUp(self): + super(BinLogEventTestCase, self).setUp() + self.execute("SET SESSION binlog_rows_query_log_events=1") + + def tearDown(self): + self.execute("SET SESSION binlog_rows_query_log_events=0") + super(BinLogEventTestCase, self).tearDown() + + target_fields = [ + "timestamp", + "log_pos", + "event_size", + "read_bytes" + ] + + def test_to_dict(self): + self.stream = BinLogStreamReader(self.database, server_id=1024) + query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + self.execute(query) + self.execute("COMMIT") + + event = self.stream.fetchone() + + event_dict = event.to_dict() + + self.assertEqual(set(event_dict.keys()), set(self.target_fields)) + self.assertEqual(event_dict["timestamp"], event.formatted_timestamp) + self.assertEqual(event_dict["log_pos"], event.packet.log_pos) + self.assertEqual(event_dict["read_bytes"], event.packet.read_bytes) + self.assertEqual(event_dict["event_size"], event.event_size) + + def test_to_json(self): + self.stream = BinLogStreamReader(self.database, server_id=1024) + query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + self.execute(query) + self.execute("COMMIT") + + event = self.stream.fetchone() + + assert event.to_json() == json.dumps(event.to_dict()) + From 7b02034ca153ae38283e843c9612e67f7f1b6726 Mon Sep 17 00:00:00 2001 From: junha6316 Date: Fri, 1 Dec 2023 17:07:28 +0900 Subject: [PATCH 4/5] fix lint apply --- pymysqlreplication/tests/test_event.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pymysqlreplication/tests/test_event.py b/pymysqlreplication/tests/test_event.py index 332462e0..b55c0c4c 100644 --- a/pymysqlreplication/tests/test_event.py +++ b/pymysqlreplication/tests/test_event.py @@ -1,8 +1,4 @@ -import pytest - - from pymysqlreplication.tests.base import PyMySQLReplicationTestCase -from pymysqlreplication.event import QueryEvent from pymysqlreplication import BinLogStreamReader import json class BinLogEventTestCase(PyMySQLReplicationTestCase): From 2511ae6c9d59a9f47b1231a8d3201c5f64382c3a Mon Sep 17 00:00:00 2001 From: junha6316 Date: Fri, 1 Dec 2023 17:16:48 +0900 Subject: [PATCH 5/5] fix formatted by black --- pymysqlreplication/event.py | 4 ++-- pymysqlreplication/tests/test_event.py | 21 ++++++++------------- 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index aa61b208..c928b2fe 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -88,10 +88,10 @@ def to_dict(self) -> dict: "event_size": self.event_size, "read_bytes": self.packet.read_bytes, } - + def to_json(self) -> str: return json.dumps(self.to_dict()) - + def _dump(self): """Core data dumped for the event""" pass diff --git a/pymysqlreplication/tests/test_event.py b/pymysqlreplication/tests/test_event.py index b55c0c4c..7d8a4f15 100644 --- a/pymysqlreplication/tests/test_event.py +++ b/pymysqlreplication/tests/test_event.py @@ -1,8 +1,9 @@ from pymysqlreplication.tests.base import PyMySQLReplicationTestCase -from pymysqlreplication import BinLogStreamReader +from pymysqlreplication import BinLogStreamReader import json -class BinLogEventTestCase(PyMySQLReplicationTestCase): + +class BinLogEventTestCase(PyMySQLReplicationTestCase): def setUp(self): super(BinLogEventTestCase, self).setUp() self.execute("SET SESSION binlog_rows_query_log_events=1") @@ -11,25 +12,20 @@ def tearDown(self): self.execute("SET SESSION binlog_rows_query_log_events=0") super(BinLogEventTestCase, self).tearDown() - target_fields = [ - "timestamp", - "log_pos", - "event_size", - "read_bytes" - ] + target_fields = ["timestamp", "log_pos", "event_size", "read_bytes"] def test_to_dict(self): self.stream = BinLogStreamReader(self.database, server_id=1024) query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - self.execute("COMMIT") + self.execute("COMMIT") event = self.stream.fetchone() event_dict = event.to_dict() - + self.assertEqual(set(event_dict.keys()), set(self.target_fields)) - self.assertEqual(event_dict["timestamp"], event.formatted_timestamp) + self.assertEqual(event_dict["timestamp"], event.formatted_timestamp) self.assertEqual(event_dict["log_pos"], event.packet.log_pos) self.assertEqual(event_dict["read_bytes"], event.packet.read_bytes) self.assertEqual(event_dict["event_size"], event.event_size) @@ -38,9 +34,8 @@ def test_to_json(self): self.stream = BinLogStreamReader(self.database, server_id=1024) query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - self.execute("COMMIT") + self.execute("COMMIT") event = self.stream.fetchone() assert event.to_json() == json.dumps(event.to_dict()) -