
Commit

Merge branch 'main' into mashumaro_fixes
gshank committed Jan 9, 2025
2 parents 5f94de5 + b414ef2 · commit a0e32c8
Showing 44 changed files with 1,346 additions and 346 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241217-171631.yaml
@@ -0,0 +1,6 @@
kind: Features
body: Calculate source freshness via a SQL query
time: 2024-12-17T17:16:31.841076-08:00
custom:
Author: ChenyuLInx
Issue: "8797"
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241218-170729.yaml
@@ -0,0 +1,6 @@
kind: Features
body: Add freshness definition on model for adaptive job
time: 2024-12-18T17:07:29.55754-08:00
custom:
Author: ChenyuLInx
Issue: "11123"
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20250106-132829.yaml
@@ -0,0 +1,6 @@
kind: Features
body: Meta config for dimensions measures and entities
time: 2025-01-06T13:28:29.176439-06:00
custom:
Author: DevonFulcher
Issue: None
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20250102-140543.yaml
@@ -0,0 +1,6 @@
kind: Fixes
body: Make partial parsing reparse referencing nodes of newly versioned models.
time: 2025-01-02T14:05:43.629959-05:00
custom:
Author: d-cole
Issue: "8872"
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20250107-173719.yaml
@@ -0,0 +1,6 @@
kind: Fixes
body: Ensure warning about microbatch lacking filter inputs is always fired
time: 2025-01-07T17:37:19.373261-06:00
custom:
Author: QMalcolm
Issue: "11159"
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20250109-123309.yaml
@@ -0,0 +1,6 @@
kind: Fixes
body: Fix microbatch dbt list --output json
time: 2025-01-09T12:33:09.958795+01:00
custom:
Author: internetcoffeephone
Issue: 10556 11098
7 changes: 7 additions & 0 deletions .changes/unreleased/Under the Hood-20250107-123955.yaml
@@ -0,0 +1,7 @@
kind: Under the Hood
body: Change exception type from DbtInternalException to UndefinedMacroError when
macro not found in 'run operation' command
time: 2025-01-07T12:39:55.234321-05:00
custom:
Author: michelleark
Issue: "11192"
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20250107-205838.yaml
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Create LogNodeResult event
time: 2025-01-07T20:58:38.821036Z
custom:
Author: aranke
Issue: ' '
7 changes: 6 additions & 1 deletion core/dbt/artifacts/resources/__init__.py
@@ -46,7 +46,12 @@
MetricTimeWindow,
MetricTypeParams,
)
from dbt.artifacts.resources.v1.model import Model, ModelConfig, TimeSpine
from dbt.artifacts.resources.v1.model import (
Model,
ModelConfig,
ModelFreshness,
TimeSpine,
)
from dbt.artifacts.resources.v1.owner import Owner
from dbt.artifacts.resources.v1.saved_query import (
Export,
22 changes: 21 additions & 1 deletion core/dbt/artifacts/resources/v1/model.py
@@ -1,12 +1,14 @@
import enum
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Literal, Optional

from dbt.artifacts.resources.types import AccessType, NodeType
from dbt.artifacts.resources.types import AccessType, NodeType, TimePeriod
from dbt.artifacts.resources.v1.components import (
CompiledResource,
DeferRelation,
NodeVersion,
Time,
)
from dbt.artifacts.resources.v1.config import NodeConfig
from dbt_common.contracts.config.base import MergeBehavior
@@ -34,6 +36,23 @@ class TimeSpine(dbtClassMixin):
custom_granularities: List[CustomGranularity] = field(default_factory=list)


class ModelFreshnessDependsOnOptions(enum.Enum):
all = "all"
any = "any"


@dataclass
class ModelBuildAfter(Time):
depends_on: ModelFreshnessDependsOnOptions = ModelFreshnessDependsOnOptions.any
count: int = 0
period: TimePeriod = TimePeriod.hour


@dataclass
class ModelFreshness(dbtClassMixin):
build_after: ModelBuildAfter = field(default_factory=ModelBuildAfter)


@dataclass
class Model(CompiledResource):
resource_type: Literal[NodeType.Model]
@@ -46,6 +65,7 @@ class Model(CompiledResource):
defer_relation: Optional[DeferRelation] = None
primary_key: List[str] = field(default_factory=list)
time_spine: Optional[TimeSpine] = None
freshness: Optional[ModelFreshness] = None

def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None):
dct = super().__post_serialize__(dct, context)
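
Quick illustration (not part of the commit): the new ModelFreshness resource attaches a single build_after threshold to a model. A minimal sketch of how the dataclasses added above compose; the to_dict() call assumes dbtClassMixin's usual mashumaro-backed serialization, and the 4-hour window is an arbitrary example value.

from dbt.artifacts.resources.types import TimePeriod
from dbt.artifacts.resources.v1.model import (
    ModelBuildAfter,
    ModelFreshness,
    ModelFreshnessDependsOnOptions,
)

# a 4-hour build_after window; depends_on defaults to "any" per the dataclass above
freshness = ModelFreshness(
    build_after=ModelBuildAfter(
        depends_on=ModelFreshnessDependsOnOptions.any,
        count=4,
        period=TimePeriod.hour,
    )
)

print(freshness.to_dict())  # plain dict, as it would be written into the manifest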
1 change: 1 addition & 0 deletions core/dbt/artifacts/resources/v1/source_definition.py
@@ -59,6 +59,7 @@ class ParsedSourceMandatory(GraphResource, HasRelationMetadata):
class SourceDefinition(ParsedSourceMandatory):
quoting: Quoting = field(default_factory=Quoting)
loaded_at_field: Optional[str] = None
loaded_at_query: Optional[str] = None
freshness: Optional[FreshnessThreshold] = None
external: Optional[ExternalTable] = None
description: str = ""
18 changes: 16 additions & 2 deletions core/dbt/context/providers.py
@@ -880,7 +880,7 @@ class OperationProvider(RuntimeProvider):

# Base context collection, used for parsing configs.
class ProviderContext(ManifestContext):
# subclasses are MacroContext, ModelContext, TestContext
# subclasses are MacroContext, ModelContext, TestContext, SourceContext
def __init__(
self,
model,
@@ -893,7 +893,7 @@ def __init__(
raise DbtInternalError(f"Invalid provider given to context: {provider}")
# mypy appeasement - we know it'll be a RuntimeConfig
self.config: RuntimeConfig
self.model: Union[Macro, ManifestNode] = model
self.model: Union[Macro, ManifestNode, SourceDefinition] = model
super().__init__(config, manifest, model.package_name)
self.sql_results: Dict[str, Optional[AttrDict]] = {}
self.config_builder: Optional[ConfigBuilder] = config_builder
@@ -1558,6 +1558,20 @@ def __init__(
self._search_package = search_package


class SourceContext(ProviderContext):
# SourceContext is being used to render jinja SQL during execution of
# custom SQL in source freshness. It is not used for parsing.
model: SourceDefinition

@contextproperty()
def this(self) -> Optional[RelationProxy]:
return self.db_wrapper.Relation.create_from(self.config, self.model)

@contextproperty()
def source_node(self) -> SourceDefinition:
return self.model


class ModelContext(ProviderContext):
model: ManifestNode

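
Quick illustration (not part of the commit): together with the loaded_at_query field added to SourceDefinition above, SourceContext is what lets a custom freshness query be written as Jinja-SQL against the source itself, since it exposes this and source_node at execution time. A rough sketch using plain jinja2 in place of dbt's renderer; the query text and relation name are hypothetical.

from jinja2 import Template

# hypothetical loaded_at_query as it might appear on a source
loaded_at_query = "select max(_etl_loaded_at) from {{ this }}"

# SourceContext.this resolves to the source's relation; a string literal stands in here
rendered = Template(loaded_at_query).render(this='"analytics"."raw"."orders"')
print(rendered)  # select max(_etl_loaded_at) from "analytics"."raw"."orders"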
2 changes: 2 additions & 0 deletions core/dbt/contracts/graph/nodes.py
@@ -60,6 +60,7 @@
from dbt.artifacts.resources import SqlOperation as SqlOperationResource
from dbt.artifacts.resources import TimeSpine
from dbt.artifacts.resources import UnitTestDefinition as UnitTestDefinitionResource
from dbt.artifacts.resources.v1.model import ModelFreshness
from dbt.artifacts.schemas.batch_results import BatchResults
from dbt.clients.jinja_static import statically_extract_has_name_this
from dbt.contracts.graph.model_config import UnitTestNodeConfig
@@ -1702,6 +1703,7 @@ class ParsedNodePatch(ParsedPatch):
constraints: List[Dict[str, Any]]
deprecation_date: Optional[datetime]
time_spine: Optional[TimeSpine] = None
freshness: Optional[ModelFreshness] = None


@dataclass
9 changes: 9 additions & 0 deletions core/dbt/contracts/graph/unparsed.py
@@ -18,6 +18,7 @@
MacroArgument,
MaturityType,
MeasureAggregationParameters,
ModelFreshness,
NodeVersion,
Owner,
Quoting,
@@ -224,6 +225,7 @@ class UnparsedModelUpdate(UnparsedNodeUpdate):
versions: Sequence[UnparsedVersion] = field(default_factory=list)
deprecation_date: Optional[datetime.datetime] = None
time_spine: Optional[TimeSpine] = None
freshness: Optional[ModelFreshness] = None

def __post_init__(self) -> None:
if self.latest_version:
@@ -320,6 +322,7 @@ class UnparsedSourceTableDefinition(HasColumnTests, HasColumnAndTestProps):
config: Dict[str, Any] = field(default_factory=dict)
loaded_at_field: Optional[str] = None
loaded_at_field_present: Optional[bool] = None
loaded_at_query: Optional[str] = None
identifier: Optional[str] = None
quoting: Quoting = field(default_factory=Quoting)
freshness: Optional[FreshnessThreshold] = field(default_factory=FreshnessThreshold)
@@ -345,6 +348,7 @@ class UnparsedSourceDefinition(dbtClassMixin):
freshness: Optional[FreshnessThreshold] = field(default_factory=FreshnessThreshold)
loaded_at_field: Optional[str] = None
loaded_at_field_present: Optional[bool] = None
loaded_at_query: Optional[str] = None
tables: List[UnparsedSourceTableDefinition] = field(default_factory=list)
tags: List[str] = field(default_factory=list)
config: Dict[str, Any] = field(default_factory=dict)
@@ -382,6 +386,7 @@ class SourceTablePatch(dbtClassMixin):
docs: Optional[Docs] = None
loaded_at_field: Optional[str] = None
loaded_at_field_present: Optional[bool] = None
loaded_at_query: Optional[str] = None
identifier: Optional[str] = None
quoting: Quoting = field(default_factory=Quoting)
freshness: Optional[FreshnessThreshold] = field(default_factory=FreshnessThreshold)
@@ -425,6 +430,7 @@ class SourcePatch(dbtClassMixin):
freshness: Optional[Optional[FreshnessThreshold]] = field(default_factory=FreshnessThreshold)
loaded_at_field: Optional[str] = None
loaded_at_field_present: Optional[bool] = None
loaded_at_query: Optional[str] = None
tables: Optional[List[SourceTablePatch]] = None
tags: Optional[List[str]] = None

@@ -664,6 +670,7 @@ class UnparsedEntity(dbtClassMixin):
label: Optional[str] = None
role: Optional[str] = None
expr: Optional[str] = None
config: Dict[str, Any] = field(default_factory=dict)


@dataclass
@@ -684,6 +691,7 @@ class UnparsedMeasure(dbtClassMixin):
non_additive_dimension: Optional[UnparsedNonAdditiveDimension] = None
agg_time_dimension: Optional[str] = None
create_metric: bool = False
config: Dict[str, Any] = field(default_factory=dict)


@dataclass
@@ -701,6 +709,7 @@ class UnparsedDimension(dbtClassMixin):
is_partition: bool = False
type_params: Optional[UnparsedDimensionTypeParams] = None
expr: Optional[str] = None
config: Dict[str, Any] = field(default_factory=dict)


@dataclass
17 changes: 16 additions & 1 deletion core/dbt/events/core_types.proto
@@ -1355,8 +1355,23 @@ message LogTestResultMsg {
LogTestResult data = 2;
}

// Q008
message LogNodeResult {
NodeInfo node_info = 1;
string description = 2;
string status = 3;
int32 index = 4;
int32 total = 5;
float execution_time = 6;
string msg = 7;
}

message LogNodeResultMsg {
CoreEventInfo info = 1;
LogNodeResult data = 2;
}

// Skipped Q008, Q009, Q010
// Skipped Q009, Q010


// Q011
594 changes: 294 additions & 300 deletions core/dbt/events/core_types_pb2.py

Large diffs are not rendered by default.

10 changes: 9 additions & 1 deletion core/dbt/events/types.py
@@ -1342,7 +1342,15 @@ def status_to_level(cls, status):
return EventLevel.INFO


# Skipped Q008, Q009, Q010
class LogNodeResult(DynamicLevel):
def code(self) -> str:
return "Q008"

def message(self) -> str:
return self.msg


# Skipped Q009, Q010


class LogStartLine(InfoLevel):
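
Quick illustration (not part of the commit): LogNodeResult fills the previously skipped Q008 event code and, being DynamicLevel, takes its log level from the caller rather than the class. A sketch of constructing and firing it; the field names come from the proto message above, while the fire_event/EventLevel imports are dbt's standard event plumbing and are assumed here, and the example values are hypothetical.

from dbt.events.types import LogNodeResult
from dbt_common.events.base_types import EventLevel
from dbt_common.events.functions import fire_event

fire_event(
    LogNodeResult(
        # node_info omitted for brevity
        description="sql view model analytics.my_view",
        status="success",
        index=3,
        total=12,
        execution_time=0.42,
        msg="OK created sql view model analytics.my_view",
    ),
    level=EventLevel.INFO,  # DynamicLevel: the caller picks the level per call
)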
2 changes: 1 addition & 1 deletion core/dbt/graph/selector_methods.py
@@ -769,6 +769,7 @@ def search(self, included_nodes: Set[UniqueId], selector: str) -> Iterator[Uniqu

manifest: Manifest = self.previous_state.manifest

keyword_args = {} # initialize here to handle disabled node check below
for unique_id, node in self.all_nodes(included_nodes):
previous_node: Optional[SelectorTarget] = None

@@ -787,7 +788,6 @@ def search(self, included_nodes: Set[UniqueId], selector: str) -> Iterator[Uniqu
elif unique_id in manifest.saved_queries:
previous_node = SavedQuery.from_resource(manifest.saved_queries[unique_id])

keyword_args = {}
if checker.__name__ in [
"same_contract",
"check_modified_content",
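
Quick illustration (not part of the commit): per the new inline comment, hoisting keyword_args above the loop matters because a disabled-node path can read keyword_args before the old in-loop assignment has run, which Python reports as UnboundLocalError. A stripped-down, hypothetical sketch of the hazard and the fix; check() and the node dicts are placeholders, not the dbt code.

def check(node, **kwargs):
    return ("checked", node["name"], kwargs)

def broken(nodes):
    for node in nodes:
        if node.get("disabled"):
            # keyword_args is assigned later in this function, so Python treats it
            # as a local with no value yet -> UnboundLocalError
            return check(node, **keyword_args)
        keyword_args = {}
    return None

def fixed(nodes):
    keyword_args = {}  # initialize before the loop, as the diff now does
    for node in nodes:
        if node.get("disabled"):
            return check(node, **keyword_args)
    return None

print(fixed([{"name": "a", "disabled": True}]))  # ('checked', 'a', {})
# broken([{"name": "a", "disabled": True}]) raises UnboundLocalError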
38 changes: 23 additions & 15 deletions core/dbt/parser/manifest.py
@@ -513,6 +513,7 @@ def load(self) -> Manifest:
self.check_for_spaces_in_resource_names()
self.check_for_microbatch_deprecations()
self.check_forcing_batch_concurrency()
self.check_microbatch_model_has_a_filtered_input()

return self.manifest

@@ -1472,21 +1473,6 @@ def check_valid_microbatch_config(self):
f"Microbatch model '{node.name}' optional 'concurrent_batches' config must be of type `bool` if specified, but got: {type(concurrent_batches)})."
)

# Validate upstream node event_time (if configured)
has_input_with_event_time_config = False
for input_unique_id in node.depends_on.nodes:
input_node = self.manifest.expect(unique_id=input_unique_id)
input_event_time = input_node.config.event_time
if input_event_time:
if not isinstance(input_event_time, str):
raise dbt.exceptions.ParsingError(
f"Microbatch model '{node.name}' depends on an input node '{input_node.name}' with an 'event_time' config of invalid (non-string) type: {type(input_event_time)}."
)
has_input_with_event_time_config = True

if not has_input_with_event_time_config:
fire_event(MicrobatchModelNoEventTimeInputs(model_name=node.name))

def check_forcing_batch_concurrency(self) -> None:
if self.manifest.use_microbatch_batches(project_name=self.root_project.project_name):
adapter = get_adapter(self.root_project)
@@ -1508,6 +1494,28 @@ def check_forcing_batch_concurrency(self) -> None:
)
)

def check_microbatch_model_has_a_filtered_input(self):
if self.manifest.use_microbatch_batches(project_name=self.root_project.project_name):
for node in self.manifest.nodes.values():
if (
node.config.materialized == "incremental"
and node.config.incremental_strategy == "microbatch"
):
# Validate upstream node event_time (if configured)
has_input_with_event_time_config = False
for input_unique_id in node.depends_on.nodes:
input_node = self.manifest.expect(unique_id=input_unique_id)
input_event_time = input_node.config.event_time
if input_event_time:
if not isinstance(input_event_time, str):
raise dbt.exceptions.ParsingError(
f"Microbatch model '{node.name}' depends on an input node '{input_node.name}' with an 'event_time' config of invalid (non-string) type: {type(input_event_time)}."
)
has_input_with_event_time_config = True

if not has_input_with_event_time_config:
fire_event(MicrobatchModelNoEventTimeInputs(model_name=node.name))

def write_perf_info(self, target_path: str):
path = os.path.join(target_path, PERF_INFO_FILE_NAME)
write_file(path, json.dumps(self._perf_info, cls=dbt.utils.JSONEncoder, indent=4))