Skip to content

Commit

Permalink
Merge pull request #103 from KDMW-IO/bug/mysql-bulk-queries-fix
Browse files Browse the repository at this point in the history
MySQL support for bulk inserts, deletes by reading multiple event rows
  • Loading branch information
long2ice authored Aug 8, 2024
2 parents fc1877d + b1234bd commit ab1a5ed
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 18 deletions.
20 changes: 11 additions & 9 deletions meilisync/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ def __init__(self):
def add_event(self, sync: Sync, event: Event):
pk = event.data[sync.pk]
self._events.setdefault(sync, {})
self._events[sync][pk] = event
self._events[sync].setdefault(pk, [])
self._events[sync][pk].append(event)

@property
def size(self):
Expand All @@ -21,16 +22,17 @@ def pop_events(self):
updated_events = {}
created_events = {}
deleted_events = {}
for sync, events in self._events.items():
for sync, pk_events_dict in self._events.items():
updated_events[sync] = []
created_events[sync] = []
deleted_events[sync] = []
for event in events.values():
if event.type == EventType.create:
created_events[sync].append(event)
elif event.type == EventType.update:
updated_events[sync].append(event)
elif event.type == EventType.delete:
deleted_events[sync].append(event)
for events in pk_events_dict.values():
for event in events:
if event.type == EventType.create:
created_events[sync].append(event)
elif event.type == EventType.update:
updated_events[sync].append(event)
elif event.type == EventType.delete:
deleted_events[sync].append(event)
self._events = {}
return created_events, updated_events, deleted_events
19 changes: 10 additions & 9 deletions meilisync/source/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,23 +107,24 @@ async def __aiter__(self):
async for event in self.stream:
if isinstance(event, WriteRowsEvent):
event_type = EventType.create
data = event.rows[0]["values"]
data_list = [row["values"] for row in event.rows]
elif isinstance(event, UpdateRowsEvent):
event_type = EventType.update
data = event.rows[0]["after_values"]
data_list = [row["after_values"] for row in event.rows]
elif isinstance(event, DeleteRowsEvent):
event_type = EventType.delete
data = event.rows[0]["values"]
data_list = [row["values"] for row in event.rows]
else:
continue
self.progress["master_log_file"] = self.stream._master_log_file
self.progress["master_log_position"] = self.stream._master_log_position
yield Event(
type=event_type,
table=event.table,
data=data,
progress=self.progress,
)
for data in data_list:
yield Event(
type=event_type,
table=event.table,
data=data,
progress=self.progress,
)
except OperationalError as e:
logger.exception(f"Binlog stream error: {e}, sleep 10s and retry...")
await asyncio.sleep(10)
Expand Down

0 comments on commit ab1a5ed

Please sign in to comment.