From da99dca4cff5b977ad4e1f466b2cd39cc3d8c0c1 Mon Sep 17 00:00:00 2001 From: Tolu Aina <7848930+toluaina@users.noreply.github.com> Date: Wed, 3 Jul 2024 16:44:48 +0100 Subject: [PATCH] removed max lsn --- pgsync/base.py | 50 ++++++++++------------------------------------ pgsync/sync.py | 23 ++++++++++++--------- tests/test_sync.py | 11 ++++++---- 3 files changed, 32 insertions(+), 52 deletions(-) diff --git a/pgsync/base.py b/pgsync/base.py index 07fac26..43a361e 100644 --- a/pgsync/base.py +++ b/pgsync/base.py @@ -473,7 +473,7 @@ def _logical_slot_changes( func: sa.sql.functions._FunctionGenerator, txmin: t.Optional[int] = None, txmax: t.Optional[int] = None, - upto_lsn: t.Optional[int] = None, + upto_lsn: t.Optional[str] = None, upto_nchanges: t.Optional[int] = None, limit: t.Optional[int] = None, offset: t.Optional[int] = None, @@ -486,7 +486,7 @@ def _logical_slot_changes( func (sa.sql.functions._FunctionGenerator): The function to use to read from the slot. txmin (Optional[int], optional): The minimum transaction ID to read from. Defaults to None. txmax (Optional[int], optional): The maximum transaction ID to read from. Defaults to None. - upto_lsn (Optional[int], optional): The maximum LSN to read up to. Defaults to None. + upto_lsn (Optional[str], optional): The maximum LSN to read up to. Defaults to None. upto_nchanges (Optional[int], optional): The maximum number of changes to read. Defaults to None. limit (Optional[int], optional): The maximum number of rows to return. Defaults to None. offset (Optional[int], optional): The number of rows to skip before returning. Defaults to None. @@ -529,48 +529,20 @@ def _logical_slot_changes( statement = statement.offset(offset) return statement - def max_lsn( - self, - slot_name: str, - txmin: t.Optional[int] = None, - txmax: t.Optional[int] = None, - ): - filters: list = [] - statement: sa.sql.Select = sa.select( - sa.func.MAX(sa.text("lsn")), - ).select_from( - sa.func.PG_LOGICAL_SLOT_PEEK_CHANGES( - slot_name, - None, - None, - ) - ) - if txmin is not None: - filters.append( - sa.cast( - sa.cast(sa.column("xid"), sa.Text), - sa.BigInteger, - ) - >= txmin - ) - if txmax is not None: - filters.append( - sa.cast( - sa.cast(sa.column("xid"), sa.Text), - sa.BigInteger, - ) - < txmax + @property + def current_wal_lsn(self) -> str: + return self.fetchone( + sa.select(sa.func.MAX(sa.text("pg_current_wal_lsn"))).select_from( + sa.func.PG_CURRENT_WAL_LSN() ) - if filters: - statement = statement.where(sa.and_(*filters)) - return self.fetchone(statement)[0] + )[0] def logical_slot_get_changes( self, slot_name: str, txmin: t.Optional[int] = None, txmax: t.Optional[int] = None, - upto_lsn: t.Optional[int] = None, + upto_lsn: t.Optional[str] = None, upto_nchanges: t.Optional[int] = None, limit: t.Optional[int] = None, offset: t.Optional[int] = None, @@ -600,7 +572,7 @@ def logical_slot_peek_changes( slot_name: str, txmin: t.Optional[int] = None, txmax: t.Optional[int] = None, - upto_lsn: t.Optional[int] = None, + upto_lsn: t.Optional[str] = None, upto_nchanges: t.Optional[int] = None, limit: t.Optional[int] = None, offset: t.Optional[int] = None, @@ -626,7 +598,7 @@ def logical_slot_count_changes( slot_name: str, txmin: t.Optional[int] = None, txmax: t.Optional[int] = None, - upto_lsn: t.Optional[int] = None, + upto_lsn: t.Optional[str] = None, upto_nchanges: t.Optional[int] = None, ) -> int: statement: sa.sql.Select = self._logical_slot_changes( diff --git a/pgsync/sync.py b/pgsync/sync.py index 453aa99..2c875ca 100644 --- a/pgsync/sync.py +++ b/pgsync/sync.py @@ -350,6 +350,7 @@ def logical_slot_changes( txmin: t.Optional[int] = None, txmax: t.Optional[int] = None, upto_nchanges: t.Optional[int] = None, + upto_lsn: t.Optional[str] = None, ) -> None: """ Process changes from the db logical replication logs. @@ -377,19 +378,13 @@ def logical_slot_changes( # minimize the tmp file disk usage when calling # PG_LOGICAL_SLOT_PEEK_CHANGES and PG_LOGICAL_SLOT_GET_CHANGES # by limiting to a smaller batch size. - - upto_nchanges: int = upto_nchanges or settings.LOGICAL_SLOT_CHUNK_SIZE - - # this is the max lsn we can go upto - max_lsn: int = self.max_lsn(self.__name, txmin=txmin, txmax=txmax) - while True: changes: int = self.logical_slot_peek_changes( self.__name, txmin=txmin, txmax=txmax, upto_nchanges=upto_nchanges, - upto_lsn=max_lsn, + upto_lsn=upto_lsn, ) if not changes: break @@ -446,7 +441,7 @@ def logical_slot_changes( txmin=txmin, txmax=txmax, upto_nchanges=upto_nchanges, - upto_lsn=max_lsn, + upto_lsn=upto_lsn, ) self.count["xlog"] += len(rows) @@ -1220,13 +1215,22 @@ def pull(self) -> None: """Pull data from db.""" txmin: int = self.checkpoint txmax: int = self.txid_current + # this is the max lsn we should go upto + upto_lsn: str = self.current_wal_lsn + upto_nchanges: int = settings.LOGICAL_SLOT_CHUNK_SIZE + logger.debug(f"pull txmin: {txmin} - txmax: {txmax}") # forward pass sync self.search_client.bulk( self.index, self.sync(txmin=txmin, txmax=txmax) ) # now sync up to txmax to capture everything we may have missed - self.logical_slot_changes(txmin=txmin, txmax=txmax, upto_nchanges=None) + self.logical_slot_changes( + txmin=txmin, + txmax=txmax, + upto_nchanges=upto_nchanges, + upto_lsn=upto_lsn, + ) self.checkpoint: int = txmax or self.txid_current self._truncate = True @@ -1263,6 +1267,7 @@ async def async_status(self) -> None: await asyncio.sleep(settings.LOG_INTERVAL) def _status(self, label: str) -> None: + # TODO: indicate if we are processing logical logs or not sys.stdout.write( f"{label} {self.database}:{self.index} " f"Xlog: [{self.count['xlog']:,}] => " diff --git a/tests/test_sync.py b/tests/test_sync.py index f5efc35..bbf23d5 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -81,7 +81,7 @@ def test_logical_slot_changes(self, mock_logger, sync): "testdb_testdb", txmin=None, txmax=None, - upto_nchanges=settings.LOGICAL_SLOT_CHUNK_SIZE, + upto_nchanges=None, upto_lsn=None, ) mock_sync.assert_not_called() @@ -97,7 +97,7 @@ def test_logical_slot_changes(self, mock_logger, sync): "testdb_testdb", txmin=None, txmax=None, - upto_nchanges=settings.LOGICAL_SLOT_CHUNK_SIZE, + upto_nchanges=None, upto_lsn=None, ) mock_sync.assert_not_called() @@ -125,7 +125,7 @@ def test_logical_slot_changes(self, mock_logger, sync): "testdb_testdb", txmin=None, txmax=None, - upto_nchanges=settings.LOGICAL_SLOT_CHUNK_SIZE, + upto_nchanges=None, upto_lsn=None, ) mock_get.assert_called_once() @@ -353,7 +353,10 @@ def test_pull(self, mock_logger, mock_es, sync): txmin = None txmax = sync.txid_current - 1 mock_get.assert_called_once_with( - txmin=txmin, txmax=txmax, upto_nchanges=None + txmin=txmin, + txmax=txmax, + upto_nchanges=settings.LOGICAL_SLOT_CHUNK_SIZE, + upto_lsn=ANY, ) mock_logger.debug.assert_called_once_with( f"pull txmin: {txmin} - txmax: {txmax}"