diff --git a/meilisync/source/postgres.py b/meilisync/source/postgres.py
index de102bd..70691c2 100644
--- a/meilisync/source/postgres.py
+++ b/meilisync/source/postgres.py
@@ -1,7 +1,7 @@
 import asyncio
 import json
 from asyncio import Queue
-from typing import List
+from typing import List, Any
 
 import psycopg2
 import psycopg2.errors
@@ -88,51 +88,56 @@ def _():
 
     def _consumer(self, msg: ReplicationMessage):
         payload = json.loads(msg.payload)
-        changes = payload.get("change")
-        if not changes:
-            return
+        next_lsn = payload["nextlsn"]
+
+        changes = payload.get("change", [])
         for change in changes:
-            kind = change.get("kind")
-            table = change.get("table")
-            if table not in self.tables:
-                continue
-            columnnames = change.get("columnnames", [])
-            columnvalues = change.get("columnvalues", [])
-            columntypes = change.get("columntypes", [])
-
-            for i in range(len(columntypes)):
-                if columntypes[i] == "json":
-                    columnvalues[i] = json.loads(columnvalues[i])
-
-            if kind == "update":
-                values = dict(zip(columnnames, columnvalues))
-                event_type = EventType.update
-            elif kind == "delete":
-                values = (
-                    dict(zip(columnnames, columnvalues))
-                    if columnvalues
-                    else {change["oldkeys"]["keynames"][0]: change["oldkeys"]["keyvalues"][0]}
-                )
-                event_type = EventType.delete
-            elif kind == "insert":
-                values = dict(zip(columnnames, columnvalues))
-                event_type = EventType.create
-            else:
-                return
-            asyncio.new_event_loop().run_until_complete(
-                self.queue.put(  # type: ignore
-                    Event(
-                        type=event_type,
-                        table=table,
-                        data=values,
-                        progress={"start_lsn": payload.get("nextlsn")},
-                    )
-                )
+            self.__handle_change(change, next_lsn)
+
+        # Always report success to the server to avoid a “disk full” condition.
+        # https://www.psycopg.org/docs/extras.html#psycopg2.extras.ReplicationCursor.consume_stream
+        msg.cursor.send_feedback(flush_lsn=msg.data_start)
+
+    def __handle_change(self, change: dict[str, Any], next_lsn: str):
+        table = change.get("table")
+        if table not in self.tables:
+            return
+
+        columnnames = change.get("columnnames", [])
+        columnvalues = change.get("columnvalues", [])
+        columntypes = change.get("columntypes", [])
+
+        for i in range(len(columntypes)):
+            if columntypes[i] == "json":
+                columnvalues[i] = json.loads(columnvalues[i])
+
+        kind = change.get("kind")
+        if kind == "update":
+            values = dict(zip(columnnames, columnvalues))
+            event_type = EventType.update
+        elif kind == "delete":
+            values = (
+                dict(zip(columnnames, columnvalues))
+                if columnvalues
+                else {change["oldkeys"]["keynames"][0]: change["oldkeys"]["keyvalues"][0]}
             )
+            event_type = EventType.delete
+        elif kind == "insert":
+            values = dict(zip(columnnames, columnvalues))
+            event_type = EventType.create
+        else:
+            return
 
-            # Report success to the server to avoid a “disk full” condition.
-            # https://www.psycopg.org/docs/extras.html#psycopg2.extras.ReplicationCursor.consume_stream
-            msg.cursor.send_feedback(flush_lsn=msg.data_start)
+        asyncio.new_event_loop().run_until_complete(
+            self.queue.put(  # type: ignore
+                Event(
+                    type=event_type,
+                    table=table,
+                    data=values,
+                    progress={"start_lsn": next_lsn},
+                )
+            )
+        )
 
     async def get_count(self, sync: Sync):
         with self.conn_dict.cursor() as cur:
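
For context, the following is a minimal standalone sketch (not part of the patch) of the kind of wal2json message payload that _consumer receives and of the field mapping the new __handle_change performs. The table name, columns, and LSN are invented for illustration, and a plain dict stands in for meilisync's Event model.

import json

# Hypothetical wal2json payload as delivered to _consumer(); all values invented.
sample_payload = json.loads("""
{
  "nextlsn": "0/16B6C50",
  "change": [
    {
      "kind": "insert",
      "table": "movies",
      "columnnames": ["id", "title", "meta"],
      "columntypes": ["integer", "text", "json"],
      "columnvalues": [1, "Inception", "{\\"year\\": 2010}"]
    }
  ]
}
""")

next_lsn = sample_payload["nextlsn"]
for change in sample_payload.get("change", []):
    names = change.get("columnnames", [])
    values = change.get("columnvalues", [])
    types = change.get("columntypes", [])
    # json columns arrive as strings and are decoded in place, as __handle_change does.
    for i, column_type in enumerate(types):
        if column_type == "json":
            values[i] = json.loads(values[i])
    event = {  # stand-in for meilisync's Event(type=..., table=..., data=..., progress=...)
        "type": change.get("kind"),
        "table": change.get("table"),
        "data": dict(zip(names, values)),
        "progress": {"start_lsn": next_lsn},
    }
    print(event)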