diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 82b89974..c928b2fe 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -9,6 +9,7 @@ from pymysqlreplication.exceptions import StatusVariableMismatch from pymysqlreplication.util.bytes import parse_decimal_from_bytes from typing import Union, Optional +import json class BinLogEvent(object): @@ -67,15 +68,30 @@ def _verify_event(self): self.packet.read_bytes -= 19 + self.event_size + 4 self.packet.rewind(20) + @property + def formatted_timestamp(self) -> str: + return datetime.datetime.utcfromtimestamp(self.timestamp).isoformat() + def dump(self): print(f"=== {self.__class__.__name__} ===") - print(f"Date: {datetime.datetime.utcfromtimestamp(self.timestamp).isoformat()}") + print(f"Date: {self.formatted_timestamp}") print(f"Log position: {self.packet.log_pos}") print(f"Event size: {self.event_size}") print(f"Read bytes: {self.packet.read_bytes}") self._dump() print() + def to_dict(self) -> dict: + return { + "timestamp": self.formatted_timestamp, + "log_pos": self.packet.log_pos, + "event_size": self.event_size, + "read_bytes": self.packet.read_bytes, + } + + def to_json(self) -> str: + return json.dumps(self.to_dict()) + def _dump(self): """Core data dumped for the event""" pass diff --git a/pymysqlreplication/tests/test_event.py b/pymysqlreplication/tests/test_event.py new file mode 100644 index 00000000..7d8a4f15 --- /dev/null +++ b/pymysqlreplication/tests/test_event.py @@ -0,0 +1,41 @@ +from pymysqlreplication.tests.base import PyMySQLReplicationTestCase +from pymysqlreplication import BinLogStreamReader +import json + + +class BinLogEventTestCase(PyMySQLReplicationTestCase): + def setUp(self): + super(BinLogEventTestCase, self).setUp() + self.execute("SET SESSION binlog_rows_query_log_events=1") + + def tearDown(self): + self.execute("SET SESSION binlog_rows_query_log_events=0") + super(BinLogEventTestCase, self).tearDown() + + target_fields = ["timestamp", "log_pos", "event_size", "read_bytes"] + + def test_to_dict(self): + self.stream = BinLogStreamReader(self.database, server_id=1024) + query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + self.execute(query) + self.execute("COMMIT") + + event = self.stream.fetchone() + + event_dict = event.to_dict() + + self.assertEqual(set(event_dict.keys()), set(self.target_fields)) + self.assertEqual(event_dict["timestamp"], event.formatted_timestamp) + self.assertEqual(event_dict["log_pos"], event.packet.log_pos) + self.assertEqual(event_dict["read_bytes"], event.packet.read_bytes) + self.assertEqual(event_dict["event_size"], event.event_size) + + def test_to_json(self): + self.stream = BinLogStreamReader(self.database, server_id=1024) + query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + self.execute(query) + self.execute("COMMIT") + + event = self.stream.fetchone() + + assert event.to_json() == json.dumps(event.to_dict())