From dd77210756e6432ef2737ab59a65c238e3abfdda Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Tue, 29 Oct 2024 17:26:28 -0500 Subject: [PATCH] Update microbatch `end_time` to the `batch_size` ceiling (#10883) * Add function to MicrobatchBuilder to get ceiling of timestamp by batch_size * Update `MicrobatchBuilder.build_end_time` to use `ceiling_timestamp` * fix TestMicrobatchBuilder.test_build_end_time by specifying a BatchSize + asserting actual is a ceiling timestamp * Add changie --------- Co-authored-by: Michelle Ark --- .../unreleased/Fixes-20241029-161615.yaml | 7 +++ .../incremental/microbatch.py | 25 ++++++++- .../incremental/test_microbatch.py | 56 ++++++++++++++++++- 3 files changed, 84 insertions(+), 4 deletions(-) create mode 100644 .changes/unreleased/Fixes-20241029-161615.yaml diff --git a/.changes/unreleased/Fixes-20241029-161615.yaml b/.changes/unreleased/Fixes-20241029-161615.yaml new file mode 100644 index 00000000000..ba27493d303 --- /dev/null +++ b/.changes/unreleased/Fixes-20241029-161615.yaml @@ -0,0 +1,7 @@ +kind: Fixes +body: Take `end_time` for batches to the ceiling to handle edge case where `event_time` + column is a date +time: 2024-10-29T16:16:15.714993-05:00 +custom: + Author: QMalcolm MichelleArk + Issue: "10868" diff --git a/core/dbt/materializations/incremental/microbatch.py b/core/dbt/materializations/incremental/microbatch.py index 690083f3c3e..b89c834d4a2 100644 --- a/core/dbt/materializations/incremental/microbatch.py +++ b/core/dbt/materializations/incremental/microbatch.py @@ -40,7 +40,8 @@ def __init__( def build_end_time(self): """Defaults the end_time to the current time in UTC unless a non `None` event_time_end was provided""" - return self.event_time_end or self.default_end_time + end_time = self.event_time_end or self.default_end_time + return MicrobatchBuilder.ceiling_timestamp(end_time, self.model.config.batch_size) def build_start_time(self, checkpoint: Optional[datetime]): """Create a start time based off the passed in checkpoint. @@ -161,7 +162,7 @@ def offset_timestamp(timestamp: datetime, batch_size: BatchSize, offset: int) -> return offset_timestamp @staticmethod - def truncate_timestamp(timestamp: datetime, batch_size: BatchSize): + def truncate_timestamp(timestamp: datetime, batch_size: BatchSize) -> datetime: """Truncates the passed in timestamp based on the batch_size. 2024-09-17 16:06:00 + Batchsize.hour -> 2024-09-17 16:00:00 @@ -201,3 +202,23 @@ def format_batch_start( return str( batch_start.date() if (batch_start and batch_size != BatchSize.hour) else batch_start ) + + @staticmethod + def ceiling_timestamp(timestamp: datetime, batch_size: BatchSize) -> datetime: + """Takes the given timestamp and moves it to the ceiling for the given batch size + + Note, if the timestamp is already the batch size ceiling, that is returned + 2024-09-17 16:06:00 + BatchSize.hour -> 2024-09-17 17:00:00 + 2024-09-17 16:00:00 + BatchSize.hour -> 2024-09-17 16:00:00 + 2024-09-17 16:06:00 + BatchSize.day -> 2024-09-18 00:00:00 + 2024-09-17 00:00:00 + BatchSize.day -> 2024-09-17 00:00:00 + 2024-09-17 16:06:00 + BatchSize.month -> 2024-10-01 00:00:00 + 2024-09-01 00:00:00 + BatchSize.month -> 2024-09-01 00:00:00 + 2024-09-17 16:06:00 + BatchSize.year -> 2025-01-01 00:00:00 + 2024-01-01 00:00:00 + BatchSize.year -> 2024-01-01 00:00:00 + + """ + ceiling = truncated = MicrobatchBuilder.truncate_timestamp(timestamp, batch_size) + if truncated != timestamp: + ceiling = MicrobatchBuilder.offset_timestamp(truncated, batch_size, 1) + return ceiling diff --git a/tests/unit/materializations/incremental/test_microbatch.py b/tests/unit/materializations/incremental/test_microbatch.py index 8581e074ee7..f114d8649c3 100644 --- a/tests/unit/materializations/incremental/test_microbatch.py +++ b/tests/unit/materializations/incremental/test_microbatch.py @@ -20,6 +20,7 @@ def microbatch_model(self): model.config.materialized = "incremental" model.config.incremental_strategy = "microbatch" model.config.begin = MODEL_CONFIG_BEGIN + model.config.batch_size = BatchSize.day return model @@ -30,12 +31,12 @@ def microbatch_model(self): ( False, None, - datetime(2024, 9, 5, 8, 56, 0, 0, pytz.UTC), + datetime(2024, 9, 6, 0, 0, 0, 0, pytz.UTC), ), ( True, None, - datetime(2024, 9, 5, 8, 56, 0, 0, pytz.UTC), + datetime(2024, 9, 6, 0, 0, 0, 0, pytz.UTC), ), ( False, @@ -616,3 +617,54 @@ def test_format_batch_start(self, batch_size, batch_start, expected_formatted_ba MicrobatchBuilder.format_batch_start(batch_start, batch_size) == expected_formatted_batch_start ) + + @pytest.mark.parametrize( + "timestamp,batch_size,expected_datetime", + [ + ( + datetime(2024, 9, 17, 16, 6, 0, 0, pytz.UTC), + BatchSize.hour, + datetime(2024, 9, 17, 17, 0, 0, 0, pytz.UTC), + ), + ( + datetime(2024, 9, 17, 16, 0, 0, 0, pytz.UTC), + BatchSize.hour, + datetime(2024, 9, 17, 16, 0, 0, 0, pytz.UTC), + ), + ( + datetime(2024, 9, 17, 16, 6, 0, 0, pytz.UTC), + BatchSize.day, + datetime(2024, 9, 18, 0, 0, 0, 0, pytz.UTC), + ), + ( + datetime(2024, 9, 17, 0, 0, 0, 0, pytz.UTC), + BatchSize.day, + datetime(2024, 9, 17, 0, 0, 0, 0, pytz.UTC), + ), + ( + datetime(2024, 9, 17, 16, 6, 0, 0, pytz.UTC), + BatchSize.month, + datetime(2024, 10, 1, 0, 0, 0, 0, pytz.UTC), + ), + ( + datetime(2024, 9, 1, 0, 0, 0, 0, pytz.UTC), + BatchSize.month, + datetime(2024, 9, 1, 0, 0, 0, 0, pytz.UTC), + ), + ( + datetime(2024, 9, 17, 16, 6, 0, 0, pytz.UTC), + BatchSize.year, + datetime(2025, 1, 1, 0, 0, 0, 0, pytz.UTC), + ), + ( + datetime(2024, 1, 1, 0, 0, 0, 0, pytz.UTC), + BatchSize.year, + datetime(2024, 1, 1, 0, 0, 0, 0, pytz.UTC), + ), + ], + ) + def test_ceiling_timestamp( + self, timestamp: datetime, batch_size: BatchSize, expected_datetime: datetime + ) -> None: + ceilinged = MicrobatchBuilder.ceiling_timestamp(timestamp, batch_size) + assert ceilinged == expected_datetime