diff --git a/meilisync/event.py b/meilisync/event.py index 2f47f4b..fee447e 100644 --- a/meilisync/event.py +++ b/meilisync/event.py @@ -10,7 +10,8 @@ def __init__(self): def add_event(self, sync: Sync, event: Event): pk = event.data[sync.pk] self._events.setdefault(sync, {}) - self._events[sync][pk] = event + self._events[sync].setdefault(pk, []) + self._events[sync][pk].append(event) @property def size(self): @@ -21,16 +22,17 @@ def pop_events(self): updated_events = {} created_events = {} deleted_events = {} - for sync, events in self._events.items(): + for sync, pk_events_dict in self._events.items(): updated_events[sync] = [] created_events[sync] = [] deleted_events[sync] = [] - for event in events.values(): - if event.type == EventType.create: - created_events[sync].append(event) - elif event.type == EventType.update: - updated_events[sync].append(event) - elif event.type == EventType.delete: - deleted_events[sync].append(event) + for events in pk_events_dict.values(): + for event in events: + if event.type == EventType.create: + created_events[sync].append(event) + elif event.type == EventType.update: + updated_events[sync].append(event) + elif event.type == EventType.delete: + deleted_events[sync].append(event) self._events = {} return created_events, updated_events, deleted_events diff --git a/meilisync/source/mysql.py b/meilisync/source/mysql.py index 862d613..95a094d 100644 --- a/meilisync/source/mysql.py +++ b/meilisync/source/mysql.py @@ -107,23 +107,24 @@ async def __aiter__(self): async for event in self.stream: if isinstance(event, WriteRowsEvent): event_type = EventType.create - data = event.rows[0]["values"] + data_list = [row["values"] for row in event.rows] elif isinstance(event, UpdateRowsEvent): event_type = EventType.update - data = event.rows[0]["after_values"] + data_list = [row["after_values"] for row in event.rows] elif isinstance(event, DeleteRowsEvent): event_type = EventType.delete - data = event.rows[0]["values"] + data_list = [row["values"] for row in event.rows] else: continue self.progress["master_log_file"] = self.stream._master_log_file self.progress["master_log_position"] = self.stream._master_log_position - yield Event( - type=event_type, - table=event.table, - data=data, - progress=self.progress, - ) + for data in data_list: + yield Event( + type=event_type, + table=event.table, + data=data, + progress=self.progress, + ) except OperationalError as e: logger.exception(f"Binlog stream error: {e}, sleep 10s and retry...") await asyncio.sleep(10)