Skip to content

Commit

Permalink
Delete respects graph order (#844)
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiimk authored Sep 24, 2024
1 parent 385bbf5 commit c2f0988
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 38 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ Recommendation: for ease of reading, use the following order:
- Fixed
-->

## [Unreleased]
### Added
- `kamu delete` command now respects dependency graph ordering, allowing multiple datasets to be deleted without encountering dangling reference errors

## [0.203.0] - 2024-09-22
### Added
- Support `List` and `Struct` arrow types in `json` and `json-aoa` encodings
Expand Down
76 changes: 38 additions & 38 deletions src/app/cli/src/commands/delete_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

use std::sync::Arc;

use futures::{future, StreamExt, TryStreamExt};
use futures::{StreamExt, TryStreamExt};
use internal_error::ResultIntoInternal;
use kamu::domain::*;
use kamu::utils::datasets_filtering::filter_datasets_by_local_pattern;
Expand Down Expand Up @@ -61,45 +61,36 @@ impl Command for DeleteCommand {
return Err(CLIError::usage_error("Specify a dataset or use --all flag"));
}

let dataset_ids: Vec<_> = if self.all {
self.dataset_repo
.get_all_datasets()
.map_ok(|dataset_handle| dataset_handle.id)
.try_collect()
.await?
let dataset_handles: Vec<DatasetHandle> = if self.all {
self.dataset_repo.get_all_datasets().try_collect().await?
} else {
filter_datasets_by_local_pattern(
self.dataset_repo.as_ref(),
self.dataset_ref_patterns.clone(),
)
.map_ok(|dataset_handle| dataset_handle.id)
.try_collect()
.await?
};

let dataset_refs: Vec<_> = if self.recursive
|| self.all
|| self
.dataset_ref_patterns
.contains(&DatasetRefPattern::Pattern(DatasetAliasPattern::new(
None,
DatasetNamePattern::new_unchecked("%"),
))) {
let dataset_handles: Vec<DatasetHandle> = if !self.recursive {
dataset_handles
} else {
self.dependency_graph_service
.get_recursive_downstream_dependencies(dataset_ids)
.get_recursive_downstream_dependencies(
dataset_handles.into_iter().map(|h| h.id).collect(),
)
.await
.int_err()?
.map(DatasetRef::ID)
.collect()
.await
} else {
dataset_ids
.iter()
.map(|dataset_id| DatasetRef::ID(dataset_id.clone()))
.collect()
.map(DatasetID::into_local_ref)
.then(|hdl| {
let repo = self.dataset_repo.clone();
async move { repo.resolve_dataset_ref(&hdl).await }
})
.try_collect()
.await?
};

if dataset_refs.is_empty() {
if dataset_handles.is_empty() {
eprintln!(
"{}",
console::style("There are no datasets matching the pattern").yellow()
Expand All @@ -110,15 +101,10 @@ impl Command for DeleteCommand {
let confirmed = if self.no_confirmation {
true
} else {
let dataset_aliases = future::join_all(dataset_refs.iter().map(|dataset_id| async {
let dataset_hdl = self
.dataset_repo
.resolve_dataset_ref(dataset_id)
.await
.unwrap();
dataset_hdl.alias.to_string()
}))
.await;
let dataset_aliases: Vec<String> = dataset_handles
.iter()
.map(|h| h.alias.to_string())
.collect();

common::prompt_yes_no(&format!(
"{}\n {}\n{}\nDo you wish to continue? [y/N]: ",
Expand All @@ -132,8 +118,22 @@ impl Command for DeleteCommand {
return Err(CLIError::Aborted);
}

for dataset_ref in &dataset_refs {
match self.delete_dataset.execute_via_ref(dataset_ref).await {
// TODO: Multiple rounds of resolving IDs to handles
let dataset_ids = self
.dependency_graph_service
.in_dependency_order(
dataset_handles.into_iter().map(|h| h.id).collect(),
DependencyOrder::DepthFirst,
)
.await
.map_err(CLIError::critical)?;

for id in &dataset_ids {
match self
.delete_dataset
.execute_via_ref(&id.as_local_ref())
.await
{
Ok(_) => Ok(()),
Err(DeleteDatasetError::DanglingReference(e)) => Err(CLIError::failure(e)),
Err(DeleteDatasetError::Access(e)) => Err(CLIError::failure(e)),
Expand All @@ -143,7 +143,7 @@ impl Command for DeleteCommand {

eprintln!(
"{}",
console::style(format!("Deleted {} dataset(s)", dataset_refs.len()))
console::style(format!("Deleted {} dataset(s)", dataset_ids.len()))
.green()
.bold()
);
Expand Down
17 changes: 17 additions & 0 deletions src/domain/core/src/services/dependency_graph_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ pub trait DependencyGraphService: Sync + Send {
&self,
dataset_ids: Vec<DatasetID>,
) -> Result<DatasetIDStream, GetDependenciesError>;

/// Given a set of dataset IDs this will sort them in depth-first or
/// breadth-first graph traversal order which is useful for operations that
/// require upstream datasets to be processed before downstream or vice
/// versa
async fn in_dependency_order(
&self,
dataset_ids: Vec<DatasetID>,
order: DependencyOrder,
) -> Result<Vec<DatasetID>, GetDependenciesError>;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand All @@ -58,6 +68,13 @@ pub type DatasetIDStream<'a> = std::pin::Pin<Box<dyn Stream<Item = DatasetID> +

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

/// Graph traversal order used by [`DependencyGraphService::in_dependency_order`].
/// Per that method's contract, the order determines whether upstream datasets
/// are processed before downstream ones or vice versa.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DependencyOrder {
    /// Breadth-first traversal order
    BreadthFirst,
    /// Depth-first traversal order
    DepthFirst,
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[derive(Error, Debug)]
pub enum GetDependenciesError {
#[error(transparent)]
Expand Down
28 changes: 28 additions & 0 deletions src/infra/core/src/dependency_graph_service_inmem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,34 @@ impl DependencyGraphService for DependencyGraphServiceInMemory {

Ok(Box::pin(tokio_stream::iter(upstream_node_datasets)))
}

async fn in_dependency_order(
&self,
dataset_ids: Vec<DatasetID>,
order: DependencyOrder,
) -> Result<Vec<DatasetID>, GetDependenciesError> {
self.ensure_datasets_initially_scanned()
.await
.int_err()
.map_err(GetDependenciesError::Internal)
.unwrap();

let original_set: std::collections::HashSet<_> = dataset_ids.iter().cloned().collect();

let mut result = match order {
DependencyOrder::BreadthFirst => {
self.run_recursive_reversed_breadth_first_search(dataset_ids)
.await?
}
DependencyOrder::DepthFirst => {
self.run_recursive_depth_first_search(dataset_ids).await?
}
};

result.retain(|id| original_set.contains(id));

Ok(result)
}
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down
73 changes: 73 additions & 0 deletions src/infra/core/tests/tests/test_dependency_graph_inmem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,51 @@ async fn test_get_recursive_upstream_dependencies() {
assert_eq!(result, expected_result);
}

// Verifies that `in_dependency_order` sorts a subset of datasets from a larger
// graph according to the requested traversal order
#[test_log::test(tokio::test)]
async fn test_in_dependency_order() {
    let harness = create_large_dataset_graph().await;

    // First, middle, and last datasets: breadth-first preserves the given
    // upstream-to-downstream order
    let result = harness
        .in_dependency_order(
            vec![
                "test-root-foo",
                "test-derive-foo-foo-foo",
                "test-derive-foo-foo-foo-bar-foo-bar",
            ],
            DependencyOrder::BreadthFirst,
        )
        .await;
    assert_eq!(
        result,
        vec![
            "test-root-foo",
            "test-derive-foo-foo-foo",
            "test-derive-foo-foo-foo-bar-foo-bar"
        ]
    );

    // First, middle, and last datasets: depth-first yields the reverse
    // (downstream-first) order
    let result = harness
        .in_dependency_order(
            vec![
                "test-root-foo",
                "test-derive-foo-foo-foo",
                "test-derive-foo-foo-foo-bar-foo-bar",
            ],
            DependencyOrder::DepthFirst,
        )
        .await;
    assert_eq!(
        result,
        vec![
            "test-derive-foo-foo-foo-bar-foo-bar",
            "test-derive-foo-foo-foo",
            "test-root-foo"
        ]
    );
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

struct DependencyGraphHarness {
Expand Down Expand Up @@ -780,6 +825,34 @@ impl DependencyGraphHarness {
res
}

async fn in_dependency_order(
&self,
dataset_names: Vec<&str>,
order: DependencyOrder,
) -> Vec<String> {
let dataset_ids: Vec<_> = future::join_all(
dataset_names
.iter()
.map(|dataset_name| async { self.dataset_id_by_name(dataset_name).await }),
)
.await;

let ids: Vec<_> = self
.dependency_graph_service
.in_dependency_order(dataset_ids, order)
.await
.int_err()
.unwrap();

let mut res = Vec::new();
for id in ids {
let dataset_alias = self.dataset_alias_by_id(&id).await;
res.push(format!("{dataset_alias}"));
}

res
}

async fn dataset_id_by_name(&self, dataset_name: &str) -> DatasetID {
let dataset_alias = DatasetAlias::try_from(dataset_name).unwrap();
let dataset_hdl = self
Expand Down

0 comments on commit c2f0988

Please sign in to comment.