
Commit

Merge pull request #584 from junha6316/feature-json-binlogevent
add json feature for binlogevent
sean-k1 authored Dec 5, 2023
2 parents 579277e + 2511ae6 commit 78f9bf3
Showing 2 changed files with 58 additions and 1 deletion.
18 changes: 17 additions & 1 deletion pymysqlreplication/event.py
@@ -9,6 +9,7 @@
from pymysqlreplication.exceptions import StatusVariableMismatch
from pymysqlreplication.util.bytes import parse_decimal_from_bytes
from typing import Union, Optional
import json


class BinLogEvent(object):
@@ -68,15 +69,30 @@ def _verify_event(self):
        self.packet.read_bytes -= 19 + self.event_size + 4
        self.packet.rewind(20)

    @property
    def formatted_timestamp(self) -> str:
        return datetime.datetime.utcfromtimestamp(self.timestamp).isoformat()

    def dump(self):
        print(f"=== {self.__class__.__name__} ===")
        print(f"Date: {datetime.datetime.utcfromtimestamp(self.timestamp).isoformat()}")
        print(f"Date: {self.formatted_timestamp}")
        print(f"Log position: {self.packet.log_pos}")
        print(f"Event size: {self.event_size}")
        print(f"Read bytes: {self.packet.read_bytes}")
        self._dump()
        print()

    def to_dict(self) -> dict:
        return {
            "timestamp": self.formatted_timestamp,
            "log_pos": self.packet.log_pos,
            "event_size": self.event_size,
            "read_bytes": self.packet.read_bytes,
        }

    def to_json(self) -> str:
        return json.dumps(self.to_dict())

    def _dump(self):
        """Core data dumped for the event"""
        pass
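Not part of the commit itself: a minimal usage sketch of the new methods, assuming a local MySQL server with binary logging enabled. The connection settings below (host, port, user, passwd) are placeholders; BinLogStreamReader, to_dict() and to_json() come from the library and the change above.

    from pymysqlreplication import BinLogStreamReader

    # Placeholder connection settings; adjust to your environment.
    mysql_settings = {"host": "127.0.0.1", "port": 3306, "user": "root", "passwd": ""}

    stream = BinLogStreamReader(connection_settings=mysql_settings, server_id=1024)
    for event in stream:
        # Every event now exposes the same core fields that dump() prints,
        # either as a plain dict or as a JSON string.
        print(event.to_dict())   # {"timestamp": "...", "log_pos": ..., ...}
        print(event.to_json())   # json.dumps() of the dict above
        break
    stream.close()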
41 changes: 41 additions & 0 deletions pymysqlreplication/tests/test_event.py
@@ -0,0 +1,41 @@
from pymysqlreplication.tests.base import PyMySQLReplicationTestCase
from pymysqlreplication import BinLogStreamReader
import json


class BinLogEventTestCase(PyMySQLReplicationTestCase):
    def setUp(self):
        super(BinLogEventTestCase, self).setUp()
        self.execute("SET SESSION binlog_rows_query_log_events=1")

    def tearDown(self):
        self.execute("SET SESSION binlog_rows_query_log_events=0")
        super(BinLogEventTestCase, self).tearDown()

    target_fields = ["timestamp", "log_pos", "event_size", "read_bytes"]

    def test_to_dict(self):
        self.stream = BinLogStreamReader(self.database, server_id=1024)
        query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
        self.execute(query)
        self.execute("COMMIT")

        event = self.stream.fetchone()

        event_dict = event.to_dict()

        self.assertEqual(set(event_dict.keys()), set(self.target_fields))
        self.assertEqual(event_dict["timestamp"], event.formatted_timestamp)
        self.assertEqual(event_dict["log_pos"], event.packet.log_pos)
        self.assertEqual(event_dict["read_bytes"], event.packet.read_bytes)
        self.assertEqual(event_dict["event_size"], event.event_size)

    def test_to_json(self):
        self.stream = BinLogStreamReader(self.database, server_id=1024)
        query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
        self.execute(query)
        self.execute("COMMIT")

        event = self.stream.fetchone()

        assert event.to_json() == json.dumps(event.to_dict())
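Not part of the commit: a hypothetical sketch of how an event subclass could build on the new hook. Because to_json() serializes whatever to_dict() returns, a subclass only needs to extend to_dict(). The subclass name below is made up for illustration; QueryEvent and its .query attribute are existing parts of pymysqlreplication.

    from pymysqlreplication.event import QueryEvent

    # Hypothetical subclass, for illustration only.
    class QueryEventWithSQL(QueryEvent):
        def to_dict(self) -> dict:
            data = super().to_dict()
            data["query"] = self.query  # add the SQL statement to the serialized fields
            return data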
