diff --git a/sqlmesh/core/plan/evaluator.py b/sqlmesh/core/plan/evaluator.py index d2aa5ef08..8ed75693d 100644 --- a/sqlmesh/core/plan/evaluator.py +++ b/sqlmesh/core/plan/evaluator.py @@ -21,6 +21,7 @@ from sqlmesh.core import analytics from sqlmesh.core import constants as c from sqlmesh.core.console import Console, get_console +from sqlmesh.core.environment import EnvironmentNamingInfo from sqlmesh.core.notification_target import ( NotificationTarget, ) @@ -32,6 +33,8 @@ SnapshotEvaluator, SnapshotIntervals, SnapshotId, + SnapshotInfoLike, + SnapshotTableInfo, ) from sqlmesh.core.state_sync import StateSync from sqlmesh.core.state_sync.base import PromotionResult @@ -306,14 +309,16 @@ def _update_views( completed = False try: - self.snapshot_evaluator.promote( + self._promote_snapshots( + plan, [snapshots[s.snapshot_id] for s in promotion_result.added], environment.naming_info, deployability_index=deployability_index, on_complete=lambda s: self.console.update_promotion_progress(s, True), ) if promotion_result.removed_environment_naming_info: - self.snapshot_evaluator.demote( + self._demote_snapshots( + plan, promotion_result.removed, promotion_result.removed_environment_naming_info, on_complete=lambda s: self.console.update_promotion_progress(s, False), @@ -323,6 +328,32 @@ def _update_views( finally: self.console.stop_promotion_progress(success=completed) + def _promote_snapshots( + self, + plan: EvaluatablePlan, + target_snapshots: t.Iterable[Snapshot], + environment_naming_info: EnvironmentNamingInfo, + deployability_index: t.Optional[DeployabilityIndex] = None, + on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None, + ) -> None: + self.snapshot_evaluator.promote( + target_snapshots, + environment_naming_info, + deployability_index=deployability_index, + on_complete=on_complete, + ) + + def _demote_snapshots( + self, + plan: EvaluatablePlan, + target_snapshots: t.Iterable[SnapshotTableInfo], + environment_naming_info: EnvironmentNamingInfo, + on_complete: t.Optional[t.Callable[[SnapshotInfoLike], None]] = None, + ) -> None: + self.snapshot_evaluator.demote( + target_snapshots, environment_naming_info, on_complete=on_complete + ) + def _restate(self, plan: EvaluatablePlan, snapshots_by_name: t.Dict[str, Snapshot]) -> None: if not plan.restatements: return