Skip to content

Commit

Permalink
Fire InvalidConcurrentBatchesConfig warning via warn_or_error so …
Browse files Browse the repository at this point in the history
…it can be silenced
  • Loading branch information
QMalcolm committed Dec 12, 2024
1 parent 5259718 commit 27e7719
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 1 deletion.
2 changes: 1 addition & 1 deletion core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1501,7 +1501,7 @@ def check_forcing_batch_concurrency(self) -> None:
models_forcing_concurrent_batches += 1

if models_forcing_concurrent_batches > 0:
fire_event(
warn_or_error(
InvalidConcurrentBatchesConfig(
num_models=models_forcing_concurrent_batches,
adapter_type=adapter.type(),
Expand Down
45 changes: 45 additions & 0 deletions tests/functional/microbatch/test_microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
ArtifactWritten,
EndOfRunSummary,
GenericExceptionOnRun,
InvalidConcurrentBatchesConfig,
JinjaLogDebug,
LogBatchResult,
LogModelResult,
Expand Down Expand Up @@ -71,6 +72,11 @@
select * from {{ ref('input_model') }}
"""

microbatch_model_force_concurrent_batches_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0), concurrent_batches=true) }}
select * from {{ ref('input_model') }}
"""

microbatch_yearly_model_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='year', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
select * from {{ ref('input_model') }}
Expand Down Expand Up @@ -1083,3 +1089,42 @@ def test_microbatch(

# we had a bug where having only one batch caused a generic exception
assert len(generic_exception_catcher.caught_events) == 0


class TestCanSilenceInvalidConcurrentBatchesConfigWarning(BaseMicrobatchTest):
@pytest.fixture(scope="class")
def models(self):
return {
"input_model.sql": input_model_sql,
"microbatch_model.sql": microbatch_model_force_concurrent_batches_sql,
}

@pytest.fixture
def event_catcher(self) -> EventCatcher:
return EventCatcher(event_to_catch=InvalidConcurrentBatchesConfig) # type: ignore

def test_microbatch(
self,
project,
event_catcher: EventCatcher,
) -> None:
# This test works because postgres doesn't support concurrent batch execution
# If the postgres adapter starts supporting concurrent batch execution we'll
# need to start mocking the return value of `adapter.supports()`

with patch_microbatch_end_time("2020-01-01 13:57:00"):
_ = run_dbt(["run"], callbacks=[event_catcher.catch])
# We didn't silence the warning, so we get it
assert len(event_catcher.caught_events) == 1

# Clear caught events
event_catcher.caught_events = []

# Run again with silencing
with patch_microbatch_end_time("2020-01-01 13:57:00"):
_ = run_dbt(
["run", "--warn-error-options", "{'silence': ['InvalidConcurrentBatchesConfig']}"],
callbacks=[event_catcher.catch],
)
# Because we silenced the warning, it shouldn't get fired
assert len(event_catcher.caught_events) == 0

0 comments on commit 27e7719

Please sign in to comment.