From 5c2ebfe7de6a22d39568aa8843830fd56ff337c9 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Thu, 29 Sep 2022 10:20:03 -0400 Subject: [PATCH 1/3] Remove offset - 1 from _build_offsets --- faust/tables/recovery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/faust/tables/recovery.py b/faust/tables/recovery.py index a9615d52e..862f865d1 100644 --- a/faust/tables/recovery.py +++ b/faust/tables/recovery.py @@ -706,7 +706,7 @@ async def _build_offsets( earliest = await consumer.earliest_offsets(*tps) # FIXME To be consistent with the offset -1 logic earliest = { - tp: offset - 1 if offset is not None else None + tp: offset if offset is not None else None for tp, offset in earliest.items() } From 6ef0cb97fe85ff45b1457da3d14af04c71eca9de Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Tue, 22 Nov 2022 16:35:49 -0500 Subject: [PATCH 2/3] try this? --- faust/tables/recovery.py | 7 +------ tests/unit/tables/test_recovery.py | 2 +- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/faust/tables/recovery.py b/faust/tables/recovery.py index 862f865d1..7c0f92b9b 100644 --- a/faust/tables/recovery.py +++ b/faust/tables/recovery.py @@ -704,11 +704,6 @@ async def _build_offsets( # -- Update offsets # Offsets may have been compacted, need to get to the recent ones earliest = await consumer.earliest_offsets(*tps) - # FIXME To be consistent with the offset -1 logic - earliest = { - tp: offset if offset is not None else None - for tp, offset in earliest.items() - } for tp in tps: last_value = destination[tp] @@ -721,7 +716,7 @@ async def _build_offsets( elif new_value is None: destination[tp] = last_value else: - destination[tp] = max(last_value, new_value) + destination[tp] = max(last_value, new_value - 1) if destination: self.log.info( diff --git a/tests/unit/tables/test_recovery.py b/tests/unit/tables/test_recovery.py index 0412ea6fe..33e90bdc8 100644 --- a/tests/unit/tables/test_recovery.py +++ b/tests/unit/tables/test_recovery.py @@ -249,7 +249,7 @@ async def test__build_offsets_with_none(self, *, recovery, app) -> None: destination = {TP1: None, TP2: 1, TP3: 8, TP4: -1} await recovery._build_offsets(consumer, tps, destination, "some-title") assert len(destination) == 4 - assert destination[TP1] == -1 + assert destination[TP1] == 0 assert destination[TP2] == 2 assert destination[TP3] == 8 assert destination[TP4] == -1 From 12266d2b683bd21141fdf98b03e7cc9c2ee42ec8 Mon Sep 17 00:00:00 2001 From: szicari-streambit <80933567+szicari-streambit@users.noreply.github.com> Date: Sat, 14 Dec 2024 13:14:58 -0600 Subject: [PATCH 3/3] Update test based on flake output. --- tests/unit/tables/test_wrappers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/tables/test_wrappers.py b/tests/unit/tables/test_wrappers.py index 18c233f6d..0aa11d519 100644 --- a/tests/unit/tables/test_wrappers.py +++ b/tests/unit/tables/test_wrappers.py @@ -374,7 +374,7 @@ def wset(self, *, iwtable, event): @pytest.fixture() def data(self, *, freeze_time, iwtable): - iwtable.key_index_table = {k: 1 for k in self.TABLE_DATA} + iwtable.key_index_table = dict.fromkeys(self.TABLE_DATA, 1) iwtable.table._data = {} for w in iwtable.table._window_ranges(freeze_time.time): iwtable.table._data.update({(k, w): v for k, v in self.TABLE_DATA.items()})