From affeca41bf67fba8f3487d674d2c6b8f4149ebed Mon Sep 17 00:00:00 2001
From: Roman Boiko
Date: Wed, 8 Jan 2025 20:39:38 +0100
Subject: [PATCH] Modify dataset env var API to upsert style (#1015)

* Modify dataset env var API to upsert style

* Add service tests
---
 CHANGELOG.md                                  |   3 +-
 resources/schema.gql                          |  50 ++---
 .../src/mutations/dataset_env_vars_mut.rs     | 118 ++++++------
 .../tests/tests/test_gql_dataset_env_vars.rs  |  71 ++------
 .../domain/src/entities/dataset_env_var.rs    |   4 +-
 .../src/repos/dataset_env_var_repository.rs   |  47 ++---
 .../src/services/dataset_env_var_service.rs   |  19 +-
 .../src/dataset_env_var_service_impl.rs       |  52 ++----
 .../src/dataset_env_var_service_null.rs       |  15 +-
 .../datasets/services/tests/tests/mod.rs      |   1 +
 .../test_dataset_env_var_service_impl.rs      | 172 ++++++++++++++++++
 .../repos/inmem_dataset_env_var_repository.rs |  74 ++++----
 .../test_inmem_dataset_env_var_repository.rs  |   2 +-
 ...4387c3d021b0248fd1a2fea65e4e76bc24d36.json |  39 ++++
 ...35ae09b6fb859545d821ea52f492f5f3168e2.json |  16 --
 ...67cbeb2c89c9db598a30d703bf0a8fadc8d87.json |  19 --
 .../postgres_dataset_env_var_repository.rs    |  87 ++++-----
 ...est_postgres_dataset_env_var_repository.rs |   2 +-
 .../dataset_env_var_repository_test_suite.rs  |  46 +++--
 ...fca0f03c0f7b5ffa11ba0236ab1b0a0324eef.json |  12 ++
 ...723cbffa51b784536d2045db121a64d795e2e.json |  50 +++++
 ...35ae09b6fb859545d821ea52f492f5f3168e2.json |  12 --
 ...67cbeb2c89c9db598a30d703bf0a8fadc8d87.json |  12 --
 src/infra/datasets/sqlite/src/lib.rs          |   2 +
 .../sqlite_dataset_env_var_repository.rs      | 117 ++++++------
 .../test_sqlite_dataset_env_var_repository.rs |   4 +-
 26 files changed, 556 insertions(+), 490 deletions(-)
 create mode 100644 src/domain/datasets/services/tests/tests/test_dataset_env_var_service_impl.rs
 create mode 100644 src/infra/datasets/postgres/.sqlx/query-0a636b42b4d1cd67772d8eb9d624387c3d021b0248fd1a2fea65e4e76bc24d36.json
 delete mode 100644 src/infra/datasets/postgres/.sqlx/query-a82eb4926ca63888399a8f1eb3335ae09b6fb859545d821ea52f492f5f3168e2.json
 delete mode 100644 src/infra/datasets/postgres/.sqlx/query-e7c75f2b36a20c1c1fd30531e0b67cbeb2c89c9db598a30d703bf0a8fadc8d87.json
 create mode 100644 src/infra/datasets/sqlite/.sqlx/query-0739a2ee5162394a888a644efd6fca0f03c0f7b5ffa11ba0236ab1b0a0324eef.json
 create mode 100644 src/infra/datasets/sqlite/.sqlx/query-140e91ce459e7759cbd0f57dc8f723cbffa51b784536d2045db121a64d795e2e.json
 delete mode 100644 src/infra/datasets/sqlite/.sqlx/query-a82eb4926ca63888399a8f1eb3335ae09b6fb859545d821ea52f492f5f3168e2.json
 delete mode 100644 src/infra/datasets/sqlite/.sqlx/query-e7c75f2b36a20c1c1fd30531e0b67cbeb2c89c9db598a30d703bf0a8fadc8d87.json

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2493f57db2..6b89ed86f7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,7 +13,8 @@ Recommendation: for ease of reading, use the following order:
 ## [Unreleased]
 ### Changed
-- GraphQL: accountListFlows returns list sorted by status and last event time
+- GraphQL: flow listings are now ordered by status and last event time
+- Merged the two `DatasetEnvVarsMut` methods (`saveEnvVariable` and `modifyEnvVariable`) into a single `upsertEnvVariable`
 ### Fixed
 - GQL api flows queries now fetch dataset polling source only once per dataset(and only if Ingest flow type is here)
 - Flow trigger status now become disable on flow fail

diff --git a/resources/schema.gql b/resources/schema.gql
index 614e37bd43..d4b78bae1e 100644
--- a/resources/schema.gql
+++ b/resources/schema.gql
@@ -560,9 +560,8 @@ type DatasetEnvVars {
 }
 
 type DatasetEnvVarsMut {
- saveEnvVariable(key: String!, value: String!, isSecret: Boolean!): SaveDatasetEnvVarResult! + upsertEnvVariable(key: String!, value: String!, isSecret: Boolean!): UpsertDatasetEnvVarResult! deleteEnvVariable(id: DatasetEnvVarID!): DeleteDatasetEnvVarResult! - modifyEnvVariable(id: DatasetEnvVarID!, newValue: String!, isSecret: Boolean!): ModifyDatasetEnvVarResult! } type DatasetFlowConfigs { @@ -1454,20 +1453,6 @@ type MetadataManifestUnsupportedVersion implements CommitResult & CreateDatasetF message: String! } -interface ModifyDatasetEnvVarResult { - message: String! -} - -type ModifyDatasetEnvVarResultNotFound implements ModifyDatasetEnvVarResult { - envVarId: DatasetEnvVarID! - message: String! -} - -type ModifyDatasetEnvVarResultSuccess implements ModifyDatasetEnvVarResult { - envVarId: DatasetEnvVarID! - message: String! -} - enum MqttQos { AT_MOST_ONCE AT_LEAST_ONCE @@ -1698,21 +1683,6 @@ type RevokeResultSuccess implements RevokeResult { message: String! } -interface SaveDatasetEnvVarResult { - message: String! -} - -type SaveDatasetEnvVarResultDuplicate implements SaveDatasetEnvVarResult { - datasetEnvVarKey: String! - datasetName: DatasetName! - message: String! -} - -type SaveDatasetEnvVarResultSuccess implements SaveDatasetEnvVarResult { - envVar: ViewDatasetEnvVar! - message: String! -} - input ScheduleInput @oneOf { timeDelta: TimeDeltaInput """ @@ -1988,6 +1958,24 @@ interface UpdateReadmeResult { message: String! } +interface UpsertDatasetEnvVarResult { + message: String! +} + +type UpsertDatasetEnvVarResultCreated implements UpsertDatasetEnvVarResult { + envVar: ViewDatasetEnvVar! + message: String! +} + +type UpsertDatasetEnvVarResultUpdated implements UpsertDatasetEnvVarResult { + envVar: ViewDatasetEnvVar! + message: String! +} + +type UpsertDatasetEnvVarUpToDate implements UpsertDatasetEnvVarResult { + message: String! 
+}
+
 type ViewAccessToken {
   """
   Unique identifier of the access token
diff --git a/src/adapter/graphql/src/mutations/dataset_env_vars_mut.rs b/src/adapter/graphql/src/mutations/dataset_env_vars_mut.rs
index 52d2e3726c..664a217a56 100644
--- a/src/adapter/graphql/src/mutations/dataset_env_vars_mut.rs
+++ b/src/adapter/graphql/src/mutations/dataset_env_vars_mut.rs
@@ -11,8 +11,7 @@ use kamu_datasets::{
     DatasetEnvVarService,
     DatasetEnvVarValue,
     DeleteDatasetEnvVarError,
-    ModifyDatasetEnvVarError,
-    SaveDatasetEnvVarError,
+    UpsertDatasetEnvVarStatus,
 };
 use opendatafabric as odf;
 use secrecy::SecretString;
@@ -34,13 +33,13 @@ impl DatasetEnvVarsMut {
         Self { dataset_handle }
     }
 
-    async fn save_env_variable(
+    async fn upsert_env_variable(
        &self,
         ctx: &Context<'_>,
         key: String,
         value: String,
         is_secret: bool,
-    ) -> Result<SaveDatasetEnvVarResult> {
+    ) -> Result<UpsertDatasetEnvVarResult> {
         utils::check_dataset_write_access(ctx, &self.dataset_handle).await?;
 
         let dataset_env_var_service = from_catalog_n!(ctx, dyn DatasetEnvVarService);
@@ -51,31 +50,30 @@ impl DatasetEnvVarsMut {
             DatasetEnvVarValue::Regular(value)
         };
 
-        match dataset_env_var_service
-            .create_dataset_env_var(
+        let upsert_result = dataset_env_var_service
+            .upsert_dataset_env_var(
                 key.as_str(),
                 &dataset_env_var_value,
                 &self.dataset_handle.id,
             )
             .await
-        {
-            Ok(created_dataset_env_var) => Ok(SaveDatasetEnvVarResult::Success(
-                SaveDatasetEnvVarResultSuccess {
-                    env_var: ViewDatasetEnvVar::new(created_dataset_env_var),
-                },
-            )),
-            Err(err) => match err {
-                SaveDatasetEnvVarError::Duplicate(_) => Ok(SaveDatasetEnvVarResult::Duplicate(
-                    SaveDatasetEnvVarResultDuplicate {
-                        dataset_env_var_key: key,
-                        dataset_name: self.dataset_handle.alias.dataset_name.clone().into(),
-                    },
-                )),
-                SaveDatasetEnvVarError::Internal(internal_err) => {
-                    Err(GqlError::Internal(internal_err))
-                }
-            },
-        }
+            .map_err(GqlError::Internal)?;
+
+        Ok(match upsert_result.status {
+            UpsertDatasetEnvVarStatus::Created => {
+                UpsertDatasetEnvVarResult::Created(UpsertDatasetEnvVarResultCreated {
+                    env_var: ViewDatasetEnvVar::new(upsert_result.dataset_env_var),
+                })
+            }
+            UpsertDatasetEnvVarStatus::Updated => {
+                UpsertDatasetEnvVarResult::Updated(UpsertDatasetEnvVarResultUpdated {
+                    env_var: ViewDatasetEnvVar::new(upsert_result.dataset_env_var),
+                })
+            }
+            UpsertDatasetEnvVarStatus::UpToDate => {
+                UpsertDatasetEnvVarResult::UpToDate(UpsertDatasetEnvVarUpToDate)
+            }
+        })
     }
 
     async fn delete_env_variable(
@@ -108,65 +106,51 @@ impl DatasetEnvVarsMut {
             },
         }
     }
+}
 
-    async fn modify_env_variable(
-        &self,
-        ctx: &Context<'_>,
-        id: DatasetEnvVarID,
-        new_value: String,
-        is_secret: bool,
-    ) -> Result<ModifyDatasetEnvVarResult> {
-        utils::check_dataset_write_access(ctx, &self.dataset_handle).await?;
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
-        let dataset_env_var_service = from_catalog_n!(ctx, dyn DatasetEnvVarService);
-        let dataset_env_var_value = if is_secret {
-            DatasetEnvVarValue::Secret(SecretString::from(new_value))
-        } else {
-            DatasetEnvVarValue::Regular(new_value)
-        };
+#[derive(Interface, Debug, Clone)]
+#[graphql(field(name = "message", ty = "String"))]
+pub enum UpsertDatasetEnvVarResult {
+    Created(UpsertDatasetEnvVarResultCreated),
+    Updated(UpsertDatasetEnvVarResultUpdated),
+    UpToDate(UpsertDatasetEnvVarUpToDate),
+}
 
-        match dataset_env_var_service
-            .modify_dataset_env_var(&id.clone().into(), &dataset_env_var_value)
-            .await
-        {
-            Ok(_) => Ok(ModifyDatasetEnvVarResult::Success(
-                ModifyDatasetEnvVarResultSuccess {
-                    env_var_id: id.clone(),
-                },
-            )),
-
Err(err) => match err { - ModifyDatasetEnvVarError::NotFound(_) => Ok(ModifyDatasetEnvVarResult::NotFound( - ModifyDatasetEnvVarResultNotFound { - env_var_id: id.clone(), - }, - )), - ModifyDatasetEnvVarError::Internal(internal_err) => { - Err(GqlError::Internal(internal_err)) - } - }, - } +#[derive(Debug, Clone)] +pub struct UpsertDatasetEnvVarUpToDate; + +#[Object] +impl UpsertDatasetEnvVarUpToDate { + pub async fn message(&self) -> String { + "Dataset env var is up to date".to_string() } } -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +#[derive(SimpleObject, Debug, Clone)] +#[graphql(complex)] +pub struct UpsertDatasetEnvVarResultCreated { + pub env_var: ViewDatasetEnvVar, +} -#[derive(Interface, Debug, Clone)] -#[graphql(field(name = "message", ty = "String"))] -pub enum SaveDatasetEnvVarResult { - Success(SaveDatasetEnvVarResultSuccess), - Duplicate(SaveDatasetEnvVarResultDuplicate), +#[ComplexObject] +impl UpsertDatasetEnvVarResultCreated { + async fn message(&self) -> String { + "Created".to_string() + } } #[derive(SimpleObject, Debug, Clone)] #[graphql(complex)] -pub struct SaveDatasetEnvVarResultSuccess { +pub struct UpsertDatasetEnvVarResultUpdated { pub env_var: ViewDatasetEnvVar, } #[ComplexObject] -impl SaveDatasetEnvVarResultSuccess { +impl UpsertDatasetEnvVarResultUpdated { async fn message(&self) -> String { - "Success".to_string() + "Updated".to_string() } } diff --git a/src/adapter/graphql/tests/tests/test_gql_dataset_env_vars.rs b/src/adapter/graphql/tests/tests/test_gql_dataset_env_vars.rs index 69d74b3da7..fccdd54487 100644 --- a/src/adapter/graphql/tests/tests/test_gql_dataset_env_vars.rs +++ b/src/adapter/graphql/tests/tests/test_gql_dataset_env_vars.rs @@ -41,7 +41,7 @@ async fn test_create_and_get_dataset_env_var() { let harness = DatasetEnvVarsHarness::new().await; let created_dataset = harness.create_dataset().await; - let mutation_code = DatasetEnvVarsHarness::create_dataset_env( + let mutation_code = DatasetEnvVarsHarness::upsert_dataset_env( created_dataset.dataset_handle.id.to_string().as_str(), "foo", "foo_value", @@ -63,8 +63,8 @@ async fn test_create_and_get_dataset_env_var() { "datasets": { "byId": { "envVars": { - "saveEnvVariable": { - "message": "Success" + "upsertEnvVariable": { + "message": "Created" } } } @@ -147,7 +147,7 @@ async fn test_delete_dataset_env_var() { let harness = DatasetEnvVarsHarness::new().await; let created_dataset = harness.create_dataset().await; - let mutation_code = DatasetEnvVarsHarness::create_dataset_env( + let mutation_code = DatasetEnvVarsHarness::upsert_dataset_env( created_dataset.dataset_handle.id.to_string().as_str(), "foo", "foo_value", @@ -169,8 +169,8 @@ async fn test_delete_dataset_env_var() { "datasets": { "byId": { "envVars": { - "saveEnvVariable": { - "message": "Success" + "upsertEnvVariable": { + "message": "Created" } } } @@ -227,7 +227,7 @@ async fn test_modify_dataset_env_var() { let harness = DatasetEnvVarsHarness::new().await; let created_dataset = harness.create_dataset().await; - let mutation_code = DatasetEnvVarsHarness::create_dataset_env( + let mutation_code = DatasetEnvVarsHarness::upsert_dataset_env( created_dataset.dataset_handle.id.to_string().as_str(), "foo", "foo_value", @@ -249,8 +249,8 @@ async fn test_modify_dataset_env_var() { "datasets": { "byId": { "envVars": { - "saveEnvVariable": { - "message": "Success" + "upsertEnvVariable": { + "message": "Created" } } } @@ -258,23 +258,9 @@ async fn 
test_modify_dataset_env_var() {
         })
     );
 
-    let query_code = DatasetEnvVarsHarness::get_dataset_env_vars_with_id(
+    let mutation_code = DatasetEnvVarsHarness::upsert_dataset_env(
         created_dataset.dataset_handle.id.to_string().as_str(),
-    );
-    let res = schema
-        .execute(
-            async_graphql::Request::new(query_code.clone())
-                .data(harness.catalog_authorized.clone()),
-        )
-        .await;
-    let json = serde_json::to_string(&res.data).unwrap();
-    let json = serde_json::from_str::<serde_json::Value>(&json).unwrap();
-    let created_dataset_env_var_id =
-        json["datasets"]["byId"]["envVars"]["listEnvVariables"]["nodes"][0]["id"].clone();
-
-    let mutation_code = DatasetEnvVarsHarness::modify_dataset_env(
-        created_dataset.dataset_handle.id.to_string().as_str(),
-        &created_dataset_env_var_id.to_string(),
+        "foo",
         "new_foo_value",
         false,
     );
@@ -292,8 +278,8 @@ async fn test_modify_dataset_env_var() {
             "datasets": {
                 "byId": {
                     "envVars": {
-                        "modifyEnvVariable": {
-                            "message": "Success"
+                        "upsertEnvVariable": {
+                            "message": "Updated"
                         }
                     }
                 }
@@ -465,7 +451,7 @@ impl DatasetEnvVarsHarness {
         .replace("<env_var_id>", dataset_env_var_id)
     }
 
-    fn create_dataset_env(
+    fn upsert_dataset_env(
         dataset_id: &str,
         env_var_key: &str,
         env_var_value: &str,
@@ -477,7 +463,7 @@ impl DatasetEnvVarsHarness {
                 datasets {
                     byId(datasetId: "<dataset_id>") {
                         envVars {
-                            saveEnvVariable(key: "<key>", value: "<value>", isSecret: <is_secret>) {
+                            upsertEnvVariable(key: "<key>", value: "<value>", isSecret: <is_secret>) {
                                 message
                             }
                         }
@@ -511,31 +497,4 @@ impl DatasetEnvVarsHarness {
         .replace("<dataset_id>", dataset_id)
         .replace("<env_var_id>", env_var_id)
     }
-
-    fn modify_dataset_env(
-        dataset_id: &str,
-        env_var_id: &str,
-        new_value: &str,
-        is_secret: bool,
-    ) -> String {
-        indoc!(
-            r#"
-            mutation {
-                datasets {
-                    byId(datasetId: "<dataset_id>") {
-                        envVars {
-                            modifyEnvVariable(id: <env_var_id>, newValue: "<new_value>", isSecret: <is_secret>) {
-                                message
-                            }
-                        }
-                    }
-                }
-            }
-            "#
-        )
-        .replace("<dataset_id>", dataset_id)
-        .replace("<env_var_id>", env_var_id)
-        .replace("<new_value>", new_value)
-        .replace("<is_secret>", if is_secret { "true" } else { "false" })
-    }
 }
diff --git a/src/domain/datasets/domain/src/entities/dataset_env_var.rs b/src/domain/datasets/domain/src/entities/dataset_env_var.rs
index edceabe2ba..e77196ef0d 100644
--- a/src/domain/datasets/domain/src/entities/dataset_env_var.rs
+++ b/src/domain/datasets/domain/src/entities/dataset_env_var.rs
@@ -97,7 +97,7 @@ impl DatasetEnvVar {
     ) -> Result<String, DatasetEnvVarEncryptionError> {
         if let Some(secret_nonce) = self.secret_nonce.as_ref() {
             let cipher = Aes256Gcm::new(Key::<Aes256Gcm>::from_slice(encryption_key.as_bytes()));
-            let decypted_value = cipher
+            let decrypted_value = cipher
                 .decrypt(
                     GenericArray::from_slice(secret_nonce.as_slice()),
                     self.value.as_ref(),
                 )
                 .map_err(|err| DatasetEnvVarEncryptionError::InvalidCipherKeyError {
                     source: Box::new(AesGcmError(err)),
                 })?;
-            return Ok(std::str::from_utf8(decypted_value.as_slice())
+            return Ok(std::str::from_utf8(decrypted_value.as_slice())
                 .map_err(|err| DatasetEnvVarEncryptionError::InternalError(err.int_err()))?
                .to_string());
         }
diff --git a/src/domain/datasets/domain/src/repos/dataset_env_var_repository.rs b/src/domain/datasets/domain/src/repos/dataset_env_var_repository.rs
index 775e1bf1dc..17e1c10879 100644
--- a/src/domain/datasets/domain/src/repos/dataset_env_var_repository.rs
+++ b/src/domain/datasets/domain/src/repos/dataset_env_var_repository.rs
@@ -19,10 +19,10 @@ use crate::DatasetEnvVar;
 
 #[async_trait::async_trait]
 pub trait DatasetEnvVarRepository: Send + Sync {
-    async fn save_dataset_env_var(
+    async fn upsert_dataset_env_var(
         &self,
         dataset_env_var: &DatasetEnvVar,
-    ) -> Result<(), SaveDatasetEnvVarError>;
+    ) -> Result<UpsertDatasetEnvVarResult, InternalError>;
 
     async fn get_all_dataset_env_vars_count_by_dataset_id(
         &self,
@@ -50,31 +50,6 @@ pub trait DatasetEnvVarRepository: Send + Sync {
         &self,
         dataset_env_var_id: &Uuid,
     ) -> Result<(), DeleteDatasetEnvVarError>;
-
-    async fn modify_dataset_env_var(
-        &self,
-        dataset_env_var_id: &Uuid,
-        new_value: Vec<u8>,
-        secret_nonce: Option<Vec<u8>>,
-    ) -> Result<(), ModifyDatasetEnvVarError>;
-}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
-#[derive(Error, Debug)]
-pub enum SaveDatasetEnvVarError {
-    #[error(transparent)]
-    Duplicate(SaveDatasetEnvVarErrorDuplicate),
-
-    #[error(transparent)]
-    Internal(#[from] InternalError),
-}
-
-#[derive(Error, Debug)]
-#[error("Dataset env var not saved, duplicate {dataset_env_var_key} for dataset {dataset_id}")]
-pub struct SaveDatasetEnvVarErrorDuplicate {
-    pub dataset_env_var_key: String,
-    pub dataset_id: DatasetID,
 }
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -107,13 +82,15 @@ pub enum DeleteDatasetEnvVarError {
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
-#[derive(Error, Debug)]
-pub enum ModifyDatasetEnvVarError {
-    #[error(transparent)]
-    NotFound(DatasetEnvVarNotFoundError),
-
-    #[error(transparent)]
-    Internal(#[from] InternalError),
+#[derive(Debug)]
+pub struct UpsertDatasetEnvVarResult {
+    pub id: Uuid,
+    pub status: UpsertDatasetEnvVarStatus,
 }
 
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+#[derive(Debug, PartialEq, Eq)]
+pub enum UpsertDatasetEnvVarStatus {
+    Created,
+    Updated,
+    UpToDate,
+}
diff --git a/src/domain/datasets/domain/src/services/dataset_env_var_service.rs b/src/domain/datasets/domain/src/services/dataset_env_var_service.rs
index ceae4a6ff4..b1cdb020cb 100644
--- a/src/domain/datasets/domain/src/services/dataset_env_var_service.rs
+++ b/src/domain/datasets/domain/src/services/dataset_env_var_service.rs
@@ -17,20 +17,19 @@ use crate::{
     DatasetEnvVarValue,
     DeleteDatasetEnvVarError,
     GetDatasetEnvVarError,
-    ModifyDatasetEnvVarError,
-    SaveDatasetEnvVarError,
+    UpsertDatasetEnvVarStatus,
 };
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
 #[async_trait::async_trait]
 pub trait DatasetEnvVarService: Sync + Send {
-    async fn create_dataset_env_var(
+    async fn upsert_dataset_env_var(
         &self,
         dataset_env_var_key: &str,
         dataset_env_var_value: &DatasetEnvVarValue,
         dataset_id: &DatasetID,
-    ) -> Result<DatasetEnvVar, SaveDatasetEnvVarError>;
+    ) -> Result<DatasetEnvVarUpsertResult, InternalError>;
 
     async fn get_dataset_env_var_by_id(
         &self,
@@ -52,12 +51,6 @@ pub trait DatasetEnvVarService: Sync + Send {
         &self,
         dataset_env_var_id: &Uuid,
     ) -> Result<(), DeleteDatasetEnvVarError>;
-
-    async fn modify_dataset_env_var(
-        &self,
-        dataset_env_var_id: &Uuid,
-        dataset_env_var_new_value: &DatasetEnvVarValue,
-    ) -> Result<(), ModifyDatasetEnvVarError>;
 }
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -67,4 +60,10 @@ pub struct DatasetEnvVarListing {
     pub total_count: usize,
 }
 
+#[derive(Debug)]
+pub struct DatasetEnvVarUpsertResult {
+    pub dataset_env_var: DatasetEnvVar,
+    pub status: UpsertDatasetEnvVarStatus,
+}
+
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/src/domain/datasets/services/src/dataset_env_var_service_impl.rs b/src/domain/datasets/services/src/dataset_env_var_service_impl.rs
index c78a25cb7d..ad33e15047 100644
--- a/src/domain/datasets/services/src/dataset_env_var_service_impl.rs
+++ b/src/domain/datasets/services/src/dataset_env_var_service_impl.rs
@@ -11,18 +11,17 @@ use std::sync::Arc;
 
 use database_common::PaginationOpts;
 use dill::*;
-use internal_error::{ErrorIntoInternal, InternalError, ResultIntoInternal};
+use internal_error::{InternalError, ResultIntoInternal};
 use kamu_datasets::{
     DatasetEnvVar,
     DatasetEnvVarListing,
     DatasetEnvVarRepository,
     DatasetEnvVarService,
+    DatasetEnvVarUpsertResult,
     DatasetEnvVarValue,
     DatasetEnvVarsConfig,
     DeleteDatasetEnvVarError,
     GetDatasetEnvVarError,
-    ModifyDatasetEnvVarError,
-    SaveDatasetEnvVarError,
 };
 use opendatafabric::DatasetID;
 use secrecy::{ExposeSecret, SecretString};
@@ -66,24 +65,31 @@ impl DatasetEnvVarServiceImpl {
 
 #[async_trait::async_trait]
 impl DatasetEnvVarService for DatasetEnvVarServiceImpl {
-    async fn create_dataset_env_var(
+    async fn upsert_dataset_env_var(
         &self,
         dataset_env_var_key: &str,
         dataset_env_var_value: &DatasetEnvVarValue,
         dataset_id: &DatasetID,
-    ) -> Result<DatasetEnvVar, SaveDatasetEnvVarError> {
-        let dataset_env_var = DatasetEnvVar::new(
+    ) -> Result<DatasetEnvVarUpsertResult, InternalError> {
+        let mut dataset_env_var = DatasetEnvVar::new(
             dataset_env_var_key,
             self.time_source.now(),
             dataset_env_var_value,
             dataset_id,
             self.dataset_env_var_encryption_key.expose_secret(),
         )
-        .map_err(|err| SaveDatasetEnvVarError::Internal(err.int_err()))?;
-        self.dataset_env_var_repository
-            .save_dataset_env_var(&dataset_env_var)
+        .int_err()?;
+
+        let upsert_result = self
+            .dataset_env_var_repository
+            .upsert_dataset_env_var(&dataset_env_var)
             .await?;
-        Ok(dataset_env_var)
+
+        dataset_env_var.id = upsert_result.id;
+
+        Ok(DatasetEnvVarUpsertResult {
+            dataset_env_var,
+            status: upsert_result.status,
+        })
     }
 
     async fn get_exposed_value(
@@ -142,30 +148,4 @@ impl DatasetEnvVarService for DatasetEnvVarServiceImpl {
             .delete_dataset_env_var(dataset_env_var_id)
             .await
     }
-
-    async fn modify_dataset_env_var(
-        &self,
-        dataset_env_var_id: &Uuid,
-        dataset_env_var_new_value: &DatasetEnvVarValue,
-    ) -> Result<(), ModifyDatasetEnvVarError> {
-        let existing_dataset_env_var = self
-            .dataset_env_var_repository
-            .get_dataset_env_var_by_id(dataset_env_var_id)
-            .await
-            .map_err(|err| match err {
-                GetDatasetEnvVarError::NotFound(e) => ModifyDatasetEnvVarError::NotFound(e),
-                GetDatasetEnvVarError::Internal(e) => ModifyDatasetEnvVarError::Internal(e),
-            })?;
-
-        let (new_value, nonce) = existing_dataset_env_var
-            .generate_new_value(
-                dataset_env_var_new_value,
-                self.dataset_env_var_encryption_key.expose_secret(),
-            )
-            .int_err()
-            .map_err(ModifyDatasetEnvVarError::Internal)?;
-        self.dataset_env_var_repository
-            .modify_dataset_env_var(dataset_env_var_id, new_value, nonce)
-            .await
-    }
 }
diff --git a/src/domain/datasets/services/src/dataset_env_var_service_null.rs b/src/domain/datasets/services/src/dataset_env_var_service_null.rs
index c8e4c922b4..0fefd46d6d 100644
--- a/src/domain/datasets/services/src/dataset_env_var_service_null.rs
+++ b/src/domain/datasets/services/src/dataset_env_var_service_null.rs
@@ -14,11 +14,10 @@ use kamu_datasets::{
     DatasetEnvVar,
     DatasetEnvVarListing,
     DatasetEnvVarService,
+    DatasetEnvVarUpsertResult,
     DatasetEnvVarValue,
     DeleteDatasetEnvVarError,
     GetDatasetEnvVarError,
-    ModifyDatasetEnvVarError,
-    SaveDatasetEnvVarError,
 };
 use opendatafabric::DatasetID;
 use uuid::Uuid;
@@ -41,12 +40,12 @@ impl DatasetEnvVarServiceNull {
 
 #[async_trait::async_trait]
 impl DatasetEnvVarService for DatasetEnvVarServiceNull {
-    async fn create_dataset_env_var(
+    async fn upsert_dataset_env_var(
         &self,
         _dataset_env_var_key: &str,
         _dataset_env_var_value: &DatasetEnvVarValue,
         _dataset_id: &DatasetID,
-    ) -> Result<DatasetEnvVar, SaveDatasetEnvVarError> {
+    ) -> Result<DatasetEnvVarUpsertResult, InternalError> {
         unreachable!()
     }
 
@@ -81,12 +80,4 @@ impl DatasetEnvVarService for DatasetEnvVarServiceNull {
     ) -> Result<(), DeleteDatasetEnvVarError> {
         unreachable!()
     }
-
-    async fn modify_dataset_env_var(
-        &self,
-        _dataset_env_var_id: &Uuid,
-        _dataset_env_var_new_value: &DatasetEnvVarValue,
-    ) -> Result<(), ModifyDatasetEnvVarError> {
-        unreachable!()
-    }
 }
diff --git a/src/domain/datasets/services/tests/tests/mod.rs b/src/domain/datasets/services/tests/tests/mod.rs
index 4bd756220b..efb384c401 100644
--- a/src/domain/datasets/services/tests/tests/mod.rs
+++ b/src/domain/datasets/services/tests/tests/mod.rs
@@ -8,4 +8,5 @@
 // by the Apache License, Version 2.0.
 
 mod test_dataset_entry_service;
+mod test_dataset_env_var_service_impl;
 mod test_dependency_graph_service_impl;
diff --git a/src/domain/datasets/services/tests/tests/test_dataset_env_var_service_impl.rs b/src/domain/datasets/services/tests/tests/test_dataset_env_var_service_impl.rs
new file mode 100644
index 0000000000..a263d42a6e
--- /dev/null
+++ b/src/domain/datasets/services/tests/tests/test_dataset_env_var_service_impl.rs
@@ -0,0 +1,172 @@
+// Copyright Kamu Data, Inc. and contributors. All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use std::assert_matches::assert_matches;
+use std::sync::Arc;
+
+use dill::{Catalog, CatalogBuilder};
+use kamu_datasets::{
+    DatasetEnvVarRepository,
+    DatasetEnvVarService,
+    DatasetEnvVarUpsertResult,
+    DatasetEnvVarValue,
+    DatasetEnvVarsConfig,
+    UpsertDatasetEnvVarStatus,
+};
+use kamu_datasets_inmem::InMemoryDatasetEnvVarRepository;
+use kamu_datasets_services::DatasetEnvVarServiceImpl;
+use opendatafabric::DatasetID;
+use secrecy::SecretString;
+use time_source::SystemTimeSourceDefault;
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+#[test_log::test(tokio::test)]
+async fn test_upsert_dataset_env_var() {
+    let harness = DatasetEnvVarServiceHarness::new();
+
+    let create_result = harness
+        .dataset_env_var_service
+        .upsert_dataset_env_var(
+            "foo_key",
+            &DatasetEnvVarValue::Secret(SecretString::from("foo_value")),
+            &DatasetID::new_seeded_ed25519(b"foo"),
+        )
+        .await;
+
+    assert_matches!(
+        create_result,
+        Ok(DatasetEnvVarUpsertResult {
+            status: UpsertDatasetEnvVarStatus::Created,
+            ..
+        })
+    );
+
+    // TODO: Secret values are currently not compared for equality, so
+    // upserting an identical secret value still counts as an update.
+    // The blocker is the Postgres implementation, which would need an
+    // additional DB request to fetch and compare the stored value.
+    let update_up_to_date_result = harness
+        .dataset_env_var_service
+        .upsert_dataset_env_var(
+            "foo_key",
+            &DatasetEnvVarValue::Secret(SecretString::from("foo_value")),
+            &DatasetID::new_seeded_ed25519(b"foo"),
+        )
+        .await;
+
+    assert_matches!(
+        update_up_to_date_result,
+        Ok(DatasetEnvVarUpsertResult {
+            status: UpsertDatasetEnvVarStatus::Updated,
+            ..
+        })
+    );
+
+    // Change visibility of the env var from secret to regular
+    let update_modified_result = harness
+        .dataset_env_var_service
+        .upsert_dataset_env_var(
+            "foo_key",
+            &DatasetEnvVarValue::Regular("foo_value".to_owned()),
+            &DatasetID::new_seeded_ed25519(b"foo"),
+        )
+        .await;
+
+    assert_matches!(
+        update_modified_result,
+        Ok(DatasetEnvVarUpsertResult {
+            status: UpsertDatasetEnvVarStatus::Updated,
+            ..
+        })
+    );
+
+    // Upserting the regular env var again with the same value
+    // is reported as up-to-date
+    let update_modified_result = harness
+        .dataset_env_var_service
+        .upsert_dataset_env_var(
+            "foo_key",
+            &DatasetEnvVarValue::Regular("foo_value".to_owned()),
+            &DatasetID::new_seeded_ed25519(b"foo"),
+        )
+        .await;
+
+    assert_matches!(
+        update_modified_result,
+        Ok(DatasetEnvVarUpsertResult {
+            status: UpsertDatasetEnvVarStatus::UpToDate,
+            ..
+        })
+    );
+
+    let update_modified_result = harness
+        .dataset_env_var_service
+        .upsert_dataset_env_var(
+            "foo_key",
+            &DatasetEnvVarValue::Regular("new_foo_value".to_owned()),
+            &DatasetID::new_seeded_ed25519(b"foo"),
+        )
+        .await;
+
+    assert_matches!(
+        update_modified_result,
+        Ok(DatasetEnvVarUpsertResult {
+            status: UpsertDatasetEnvVarStatus::Updated,
+            ..
+        })
+    );
+
+    // Change visibility of the env var back to secret
+    let update_modified_result = harness
+        .dataset_env_var_service
+        .upsert_dataset_env_var(
+            "foo_key",
+            &DatasetEnvVarValue::Secret(SecretString::from("new_foo_value")),
+            &DatasetID::new_seeded_ed25519(b"foo"),
+        )
+        .await;
+
+    assert_matches!(
+        update_modified_result,
+        Ok(DatasetEnvVarUpsertResult {
+            status: UpsertDatasetEnvVarStatus::Updated,
+            ..
+        })
+    );
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+struct DatasetEnvVarServiceHarness {
+    _catalog: Catalog,
+    dataset_env_var_service: Arc<dyn DatasetEnvVarService>,
+}
+
+impl DatasetEnvVarServiceHarness {
+    fn new() -> Self {
+        let catalog = {
+            let mut b = CatalogBuilder::new();
+
+            b.add::<DatasetEnvVarServiceImpl>();
+            b.add_value(InMemoryDatasetEnvVarRepository::new());
+            b.bind::<dyn DatasetEnvVarRepository, InMemoryDatasetEnvVarRepository>();
+
+            b.add::<SystemTimeSourceDefault>();
+            b.add_value(DatasetEnvVarsConfig::sample());
+
+            b.build()
+        };
+
+        Self {
+            dataset_env_var_service: catalog.get_one().unwrap(),
+            _catalog: catalog,
+        }
+    }
+}
diff --git a/src/infra/datasets/inmem/src/repos/inmem_dataset_env_var_repository.rs b/src/infra/datasets/inmem/src/repos/inmem_dataset_env_var_repository.rs
index 65bd9719ab..8e7d1b4b00 100644
--- a/src/infra/datasets/inmem/src/repos/inmem_dataset_env_var_repository.rs
+++ b/src/infra/datasets/inmem/src/repos/inmem_dataset_env_var_repository.rs
@@ -62,28 +62,46 @@ impl InMemoryDatasetEnvVarRepository {
 
 #[async_trait::async_trait]
 impl DatasetEnvVarRepository for InMemoryDatasetEnvVarRepository {
-    async fn save_dataset_env_var(
+    async fn upsert_dataset_env_var(
         &self,
         dataset_env_var: &DatasetEnvVar,
-    ) -> Result<(), SaveDatasetEnvVarError> {
+    ) -> Result<UpsertDatasetEnvVarResult, InternalError> {
         let mut guard = self.state.lock().unwrap();
-        if let Some(existing_dataset_env_var_key_id) =
-            guard.dataset_env_var_ids_by_keys.get(&dataset_env_var.key)
-            && guard
+
+        // Modify the env var if one with the same key already exists
+        if let Some(existing_dataset_env_var_key_id) = guard
+            .dataset_env_var_ids_by_keys
+            .get(&dataset_env_var.key)
+            .copied()
+            && let Some(existing_dataset_env_var) = guard
                 .dataset_env_vars_by_ids
-                .get(existing_dataset_env_var_key_id)
-                .unwrap()
-                .dataset_id
-                == dataset_env_var.dataset_id
+                .get_mut(&existing_dataset_env_var_key_id)
+            && existing_dataset_env_var.dataset_id == dataset_env_var.dataset_id
         {
-            return Err(SaveDatasetEnvVarError::Duplicate(
-                SaveDatasetEnvVarErrorDuplicate {
-                    dataset_env_var_key: dataset_env_var.key.clone(),
-                    dataset_id: dataset_env_var.dataset_id.clone(),
-                },
-            ));
+            let mut upsert_status = UpsertDatasetEnvVarStatus::UpToDate;
+            if existing_dataset_env_var.value != dataset_env_var.value {
+                existing_dataset_env_var
+                    .value
+                    .clone_from(&dataset_env_var.value);
+                upsert_status = UpsertDatasetEnvVarStatus::Updated;
+            }
+            if (existing_dataset_env_var.secret_nonce.is_none()
+                && dataset_env_var.secret_nonce.is_some())
+                || (existing_dataset_env_var.secret_nonce.is_some()
+                    && dataset_env_var.secret_nonce.is_none())
+            {
+                upsert_status = UpsertDatasetEnvVarStatus::Updated;
+            }
+            existing_dataset_env_var
+                .secret_nonce
+                .clone_from(&dataset_env_var.secret_nonce);
+            return Ok(UpsertDatasetEnvVarResult {
+                id: existing_dataset_env_var.id,
+                status: upsert_status,
+            });
         }
 
+        // Create a new env var
         guard
             .dataset_env_vars_by_ids
             .insert(dataset_env_var.id, dataset_env_var.clone());
@@ -99,7 +117,10 @@ impl DatasetEnvVarRepository for InMemoryDatasetEnvVarRepository {
         };
         dataset_env_vars_entries.push(dataset_env_var.id);
 
-        Ok(())
+        Ok(UpsertDatasetEnvVarResult {
+            id: dataset_env_var.id,
+            status: UpsertDatasetEnvVarStatus::Created,
+        })
     }
 
     async fn get_all_dataset_env_vars_by_dataset_id(
@@ -208,27 +229,6 @@ impl DatasetEnvVarRepository for InMemoryDatasetEnvVarRepository {
             },
         ))
     }
-
-    async fn modify_dataset_env_var(
-        &self,
-        dataset_env_var_id: &Uuid,
-        new_value: Vec<u8>,
-        secret_nonce: Option<Vec<u8>>,
-    ) -> Result<(), ModifyDatasetEnvVarError> {
-        let mut guard = self.state.lock().unwrap();
-        if let Some(existing_dataset_env_var) =
-            guard.dataset_env_vars_by_ids.get_mut(dataset_env_var_id)
-        {
-            existing_dataset_env_var.value = new_value;
-            existing_dataset_env_var.secret_nonce = secret_nonce;
-            return Ok(());
-        }
-        return Err(ModifyDatasetEnvVarError::NotFound(
-            DatasetEnvVarNotFoundError {
-                dataset_env_var_key: dataset_env_var_id.to_string(),
-            },
-        ));
-    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/src/infra/datasets/inmem/tests/repos/test_inmem_dataset_env_var_repository.rs b/src/infra/datasets/inmem/tests/repos/test_inmem_dataset_env_var_repository.rs
index 466385806f..55921dfc63 100644
--- a/src/infra/datasets/inmem/tests/repos/test_inmem_dataset_env_var_repository.rs
+++ b/src/infra/datasets/inmem/tests/repos/test_inmem_dataset_env_var_repository.rs
@@ -49,7 +49,7 @@ database_transactional_test!(
 
 database_transactional_test!(
     storage = inmem,
-    fixture = dataset_env_var_repo::test_modify_dataset_env_vars,
+    fixture = dataset_env_var_repo::test_upsert_dataset_env_vars,
     harness = InMemoryDatasetEnvVarRepositoryHarness
 );
 
diff --git a/src/infra/datasets/postgres/.sqlx/query-0a636b42b4d1cd67772d8eb9d624387c3d021b0248fd1a2fea65e4e76bc24d36.json b/src/infra/datasets/postgres/.sqlx/query-0a636b42b4d1cd67772d8eb9d624387c3d021b0248fd1a2fea65e4e76bc24d36.json
new file mode 100644
index 0000000000..57609d9735
--- /dev/null
+++ b/src/infra/datasets/postgres/.sqlx/query-0a636b42b4d1cd67772d8eb9d624387c3d021b0248fd1a2fea65e4e76bc24d36.json
@@ -0,0 +1,39 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n        INSERT INTO dataset_env_vars (id, key, value, secret_nonce, created_at, dataset_id)\n            VALUES ($1, $2, $3, $4, $5, $6)\n            ON CONFLICT (key, dataset_id)\n            DO UPDATE SET\n                value = EXCLUDED.value,\n                secret_nonce = CASE\n                    WHEN dataset_env_vars.secret_nonce IS NULL AND EXCLUDED.secret_nonce IS NOT NULL THEN EXCLUDED.secret_nonce\n                    WHEN dataset_env_vars.secret_nonce IS NOT NULL AND EXCLUDED.secret_nonce IS NULL THEN NULL\n                    ELSE EXCLUDED.secret_nonce\n                END\n            RETURNING xmax = 0 AS is_inserted,\n                id,\n                (\n                    SELECT value FROM dataset_env_vars WHERE key = $2 and dataset_id = $6\n                ) as \"value: Vec<u8>\";\n        ",
+  "describe": {
+    "columns": [
+      {
+        "ordinal": 0,
+        "name": "is_inserted",
+        "type_info": "Bool"
+      },
+      {
+        "ordinal": 1,
+        "name": "id",
+        "type_info": "Uuid"
+      },
+      {
+        "ordinal": 2,
+        "name": "value: Vec<u8>",
+        "type_info": "Bytea"
+      }
+    ],
+    "parameters": {
+      "Left": [
+        "Uuid",
+        "Varchar",
+        "Bytea",
+        "Bytea",
+        "Timestamptz",
+        "Varchar"
+      ]
+    },
+    "nullable": [
+      null,
+      false,
+      null
+    ]
+  },
+  "hash": "0a636b42b4d1cd67772d8eb9d624387c3d021b0248fd1a2fea65e4e76bc24d36"
+}
diff --git a/src/infra/datasets/postgres/.sqlx/query-a82eb4926ca63888399a8f1eb3335ae09b6fb859545d821ea52f492f5f3168e2.json b/src/infra/datasets/postgres/.sqlx/query-a82eb4926ca63888399a8f1eb3335ae09b6fb859545d821ea52f492f5f3168e2.json
deleted file mode 100644
index 353fd3ee5a..0000000000
--- a/src/infra/datasets/postgres/.sqlx/query-a82eb4926ca63888399a8f1eb3335ae09b6fb859545d821ea52f492f5f3168e2.json
+++ /dev/null
@@ -1,16 +0,0 @@
-{
-  "db_name": "PostgreSQL",
-  "query": "\n        UPDATE dataset_env_vars SET value = $1, secret_nonce = $2 where id = $3\n        ",
-  "describe": {
-    "columns": [],
-    "parameters": {
-      "Left": [
-        "Bytea",
-        "Bytea",
-        "Uuid"
-      ]
-    },
-    "nullable": []
-  },
-  "hash": "a82eb4926ca63888399a8f1eb3335ae09b6fb859545d821ea52f492f5f3168e2"
"a82eb4926ca63888399a8f1eb3335ae09b6fb859545d821ea52f492f5f3168e2" -} diff --git a/src/infra/datasets/postgres/.sqlx/query-e7c75f2b36a20c1c1fd30531e0b67cbeb2c89c9db598a30d703bf0a8fadc8d87.json b/src/infra/datasets/postgres/.sqlx/query-e7c75f2b36a20c1c1fd30531e0b67cbeb2c89c9db598a30d703bf0a8fadc8d87.json deleted file mode 100644 index cafbdc785a..0000000000 --- a/src/infra/datasets/postgres/.sqlx/query-e7c75f2b36a20c1c1fd30531e0b67cbeb2c89c9db598a30d703bf0a8fadc8d87.json +++ /dev/null @@ -1,19 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n INSERT INTO dataset_env_vars (id, key, value, secret_nonce, created_at, dataset_id)\n VALUES ($1, $2, $3, $4, $5, $6)\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Uuid", - "Varchar", - "Bytea", - "Bytea", - "Timestamptz", - "Varchar" - ] - }, - "nullable": [] - }, - "hash": "e7c75f2b36a20c1c1fd30531e0b67cbeb2c89c9db598a30d703bf0a8fadc8d87" -} diff --git a/src/infra/datasets/postgres/src/repos/postgres_dataset_env_var_repository.rs b/src/infra/datasets/postgres/src/repos/postgres_dataset_env_var_repository.rs index 4650146ced..9052da7c86 100644 --- a/src/infra/datasets/postgres/src/repos/postgres_dataset_env_var_repository.rs +++ b/src/infra/datasets/postgres/src/repos/postgres_dataset_env_var_repository.rs @@ -9,7 +9,7 @@ use database_common::{PaginationOpts, TransactionRef, TransactionRefT}; use dill::{component, interface}; -use internal_error::{ErrorIntoInternal, ResultIntoInternal}; +use internal_error::{InternalError, ResultIntoInternal}; use opendatafabric::DatasetID; use uuid::Uuid; @@ -33,19 +33,31 @@ impl PostgresDatasetEnvVarRepository { #[async_trait::async_trait] impl DatasetEnvVarRepository for PostgresDatasetEnvVarRepository { - async fn save_dataset_env_var( + async fn upsert_dataset_env_var( &self, dataset_env_var: &DatasetEnvVar, - ) -> Result<(), SaveDatasetEnvVarError> { + ) -> Result { let mut tr = self.transaction.lock().await; - let connection_mut = tr.connection_mut().await?; - sqlx::query!( + let result = sqlx::query!( r#" INSERT INTO dataset_env_vars (id, key, value, secret_nonce, created_at, dataset_id) - VALUES ($1, $2, $3, $4, $5, $6) - "#, + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (key, dataset_id) + DO UPDATE SET + value = EXCLUDED.value, + secret_nonce = CASE + WHEN dataset_env_vars.secret_nonce IS NULL AND EXCLUDED.secret_nonce IS NOT NULL THEN EXCLUDED.secret_nonce + WHEN dataset_env_vars.secret_nonce IS NOT NULL AND EXCLUDED.secret_nonce IS NULL THEN NULL + ELSE EXCLUDED.secret_nonce + END + RETURNING xmax = 0 AS is_inserted, + id, + ( + SELECT value FROM dataset_env_vars WHERE key = $2 and dataset_id = $6 + ) as "value: Vec"; + "#, dataset_env_var.id, dataset_env_var.key, dataset_env_var.value, @@ -53,23 +65,22 @@ impl DatasetEnvVarRepository for PostgresDatasetEnvVarRepository { dataset_env_var.created_at, dataset_env_var.dataset_id.to_string(), ) - .execute(connection_mut) + .fetch_one(connection_mut) .await - .map_err(|e: sqlx::Error| match e { - sqlx::Error::Database(e) => { - if e.is_unique_violation() { - SaveDatasetEnvVarError::Duplicate(SaveDatasetEnvVarErrorDuplicate { - dataset_env_var_key: dataset_env_var.key.clone(), - dataset_id: dataset_env_var.dataset_id.clone(), - }) - } else { - SaveDatasetEnvVarError::Internal(e.int_err()) - } - } - _ => SaveDatasetEnvVarError::Internal(e.int_err()), - })?; + .int_err()?; - Ok(()) + let status = if result.is_inserted.unwrap_or(false) { + UpsertDatasetEnvVarStatus::Created + } else if dataset_env_var.value == result.value.unwrap() { 
+ UpsertDatasetEnvVarStatus::UpToDate + } else { + UpsertDatasetEnvVarStatus::Updated + }; + + Ok(UpsertDatasetEnvVarResult { + id: result.id, + status, + }) } async fn get_all_dataset_env_vars_by_dataset_id( @@ -234,38 +245,6 @@ impl DatasetEnvVarRepository for PostgresDatasetEnvVarRepository { } Ok(()) } - - async fn modify_dataset_env_var( - &self, - dataset_env_var_id: &Uuid, - new_value: Vec, - secret_nonce: Option>, - ) -> Result<(), ModifyDatasetEnvVarError> { - let mut tr = self.transaction.lock().await; - - let connection_mut = tr.connection_mut().await?; - - let update_result = sqlx::query!( - r#" - UPDATE dataset_env_vars SET value = $1, secret_nonce = $2 where id = $3 - "#, - new_value, - secret_nonce, - dataset_env_var_id, - ) - .execute(&mut *connection_mut) - .await - .int_err()?; - - if update_result.rows_affected() == 0 { - return Err(ModifyDatasetEnvVarError::NotFound( - DatasetEnvVarNotFoundError { - dataset_env_var_key: dataset_env_var_id.to_string(), - }, - )); - } - Ok(()) - } } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/infra/datasets/postgres/tests/repos/test_postgres_dataset_env_var_repository.rs b/src/infra/datasets/postgres/tests/repos/test_postgres_dataset_env_var_repository.rs index 548d6f432e..29d3c79eeb 100644 --- a/src/infra/datasets/postgres/tests/repos/test_postgres_dataset_env_var_repository.rs +++ b/src/infra/datasets/postgres/tests/repos/test_postgres_dataset_env_var_repository.rs @@ -51,7 +51,7 @@ database_transactional_test!( database_transactional_test!( storage = postgres, - fixture = dataset_env_var_repo::test_modify_dataset_env_vars, + fixture = dataset_env_var_repo::test_upsert_dataset_env_vars, harness = PostgresDatasetEnvVarRepositoryHarness ); diff --git a/src/infra/datasets/repo-tests/src/dataset_env_var_repository_test_suite.rs b/src/infra/datasets/repo-tests/src/dataset_env_var_repository_test_suite.rs index ba8510411c..f06d80a2d9 100644 --- a/src/infra/datasets/repo-tests/src/dataset_env_var_repository_test_suite.rs +++ b/src/infra/datasets/repo-tests/src/dataset_env_var_repository_test_suite.rs @@ -20,7 +20,7 @@ use kamu_datasets::{ DatasetEnvVarValue, DeleteDatasetEnvVarError, GetDatasetEnvVarError, - ModifyDatasetEnvVarError, + UpsertDatasetEnvVarStatus, SAMPLE_DATASET_ENV_VAR_ENCRYPTION_KEY, }; use opendatafabric::DatasetID; @@ -90,7 +90,7 @@ pub async fn test_insert_and_get_dataset_env_var(catalog: &Catalog) { ) .unwrap(); let save_result = dataset_env_var_repo - .save_dataset_env_var(&new_dataset_env_var) + .upsert_dataset_env_var(&new_dataset_env_var) .await; assert!(save_result.is_ok()); @@ -164,11 +164,11 @@ pub async fn test_insert_and_get_multiple_dataset_env_vars(catalog: &Catalog) { .unwrap(); let save_result = dataset_env_var_repo - .save_dataset_env_var(&new_dataset_env_var) + .upsert_dataset_env_var(&new_dataset_env_var) .await; assert!(save_result.is_ok()); let save_result = dataset_env_var_repo - .save_dataset_env_var(&new_secret_dataset_env_var) + .upsert_dataset_env_var(&new_secret_dataset_env_var) .await; assert!(save_result.is_ok()); @@ -229,11 +229,11 @@ pub async fn test_delete_dataset_env_vars(catalog: &Catalog) { ) .unwrap(); let save_result = dataset_env_var_repo - .save_dataset_env_var(&new_dataset_env_var) + .upsert_dataset_env_var(&new_dataset_env_var) .await; assert!(save_result.is_ok()); let save_result = dataset_env_var_repo - .save_dataset_env_var(&new_bar_dataset_env_var) + 
.upsert_dataset_env_var(&new_bar_dataset_env_var)
         .await;
     assert!(save_result.is_ok());
 
@@ -265,7 +265,7 @@ pub async fn test_delete_dataset_env_vars(catalog: &Catalog) {
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
-pub async fn test_modify_dataset_env_vars(catalog: &Catalog) {
+pub async fn test_upsert_dataset_env_vars(catalog: &Catalog) {
     let dataset_env_var_repo = catalog.get_one::<dyn DatasetEnvVarRepository>().unwrap();
     let dataset_entry_repo = catalog.get_one::<dyn DatasetEntryRepository>().unwrap();
     let account_repo = catalog.get_one::<dyn AccountRepository>().unwrap();
@@ -278,40 +278,38 @@ pub async fn test_upsert_dataset_env_vars(catalog: &Catalog) {
         .await
         .unwrap();
 
-    let new_dataset_env_var = DatasetEnvVar::new(
+    let mut new_dataset_env_var = DatasetEnvVar::new(
         "foo",
         Utc::now().round_subsecs(6),
-        &DatasetEnvVarValue::Regular("foo".to_string()),
+        &DatasetEnvVarValue::Secret(SecretString::from("foo")),
         &entry_foo.id,
         SAMPLE_DATASET_ENV_VAR_ENCRYPTION_KEY,
     )
     .unwrap();
-    let save_result = dataset_env_var_repo
-        .save_dataset_env_var(&new_dataset_env_var)
+    let upsert_result = dataset_env_var_repo
+        .upsert_dataset_env_var(&new_dataset_env_var)
         .await;
-    assert!(save_result.is_ok());
+    assert_matches!(upsert_result, Ok(res) if res.status == UpsertDatasetEnvVarStatus::Created);
 
-    let modify_result = dataset_env_var_repo
-        .modify_dataset_env_var(&Uuid::new_v4(), vec![], None)
+    let upsert_result = dataset_env_var_repo
+        .upsert_dataset_env_var(&new_dataset_env_var)
         .await;
+    assert_matches!(upsert_result, Ok(res) if res.status == UpsertDatasetEnvVarStatus::UpToDate);
 
-    assert_matches!(modify_result, Err(ModifyDatasetEnvVarError::NotFound(_)));
     let (new_value, new_nonce) = new_dataset_env_var
         .generate_new_value(
             &DatasetEnvVarValue::Regular("new_foo".to_string()),
             SAMPLE_DATASET_ENV_VAR_ENCRYPTION_KEY,
         )
         .unwrap();
+    new_dataset_env_var.value.clone_from(&new_value);
+    new_dataset_env_var.secret_nonce.clone_from(&new_nonce);
 
-    let modify_result = dataset_env_var_repo
-        .modify_dataset_env_var(
-            &new_dataset_env_var.id,
-            new_value.clone(),
-            new_nonce.clone(),
-        )
+    let upsert_result = dataset_env_var_repo
+        .upsert_dataset_env_var(&new_dataset_env_var)
         .await;
-    assert!(modify_result.is_ok());
+    assert_matches!(upsert_result, Ok(res) if res.status == UpsertDatasetEnvVarStatus::Updated);
 
     let db_dataset_env_var = dataset_env_var_repo
         .get_dataset_env_var_by_id(&new_dataset_env_var.id)
@@ -358,11 +356,11 @@ pub async fn test_delete_all_dataset_env_vars(catalog: &Catalog) {
     )
     .unwrap();
     let save_result = dataset_env_var_repo
-        .save_dataset_env_var(&new_dataset_env_var)
+        .upsert_dataset_env_var(&new_dataset_env_var)
         .await;
     assert!(save_result.is_ok());
     let save_result = dataset_env_var_repo
-        .save_dataset_env_var(&new_bar_dataset_env_var)
+        .upsert_dataset_env_var(&new_bar_dataset_env_var)
         .await;
     assert!(save_result.is_ok());
 
diff --git a/src/infra/datasets/sqlite/.sqlx/query-0739a2ee5162394a888a644efd6fca0f03c0f7b5ffa11ba0236ab1b0a0324eef.json b/src/infra/datasets/sqlite/.sqlx/query-0739a2ee5162394a888a644efd6fca0f03c0f7b5ffa11ba0236ab1b0a0324eef.json
new file mode 100644
index 0000000000..4fba5658bc
--- /dev/null
+++ b/src/infra/datasets/sqlite/.sqlx/query-0739a2ee5162394a888a644efd6fca0f03c0f7b5ffa11ba0236ab1b0a0324eef.json
@@ -0,0 +1,12 @@
+{
+  "db_name": "SQLite",
+  "query": "\n        INSERT INTO dataset_env_vars (id, key, value, secret_nonce, created_at, dataset_id)\n            VALUES ($1, $2, $3, $4, $5, $6)\n            ON CONFLICT (key, dataset_id)\n            DO UPDATE SET\n                value = EXCLUDED.value,\n                
secret_nonce = CASE\n WHEN dataset_env_vars.secret_nonce IS NULL AND excluded.secret_nonce IS NOT NULL THEN excluded.secret_nonce\n WHEN dataset_env_vars.secret_nonce IS NOT NULL AND excluded.secret_nonce IS NULL THEN NULL\n ELSE excluded.secret_nonce\n END\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 6 + }, + "nullable": [] + }, + "hash": "0739a2ee5162394a888a644efd6fca0f03c0f7b5ffa11ba0236ab1b0a0324eef" +} diff --git a/src/infra/datasets/sqlite/.sqlx/query-140e91ce459e7759cbd0f57dc8f723cbffa51b784536d2045db121a64d795e2e.json b/src/infra/datasets/sqlite/.sqlx/query-140e91ce459e7759cbd0f57dc8f723cbffa51b784536d2045db121a64d795e2e.json new file mode 100644 index 0000000000..b6292b2563 --- /dev/null +++ b/src/infra/datasets/sqlite/.sqlx/query-140e91ce459e7759cbd0f57dc8f723cbffa51b784536d2045db121a64d795e2e.json @@ -0,0 +1,50 @@ +{ + "db_name": "SQLite", + "query": "\n SELECT\n id as \"id: Uuid\",\n key,\n value as \"value: _\",\n secret_nonce as \"secret_nonce: _\",\n created_at as \"created_at: _\",\n dataset_id as \"dataset_id: _\"\n FROM dataset_env_vars\n WHERE key = $1 and dataset_id = $2\n ", + "describe": { + "columns": [ + { + "name": "id: Uuid", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "key", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "value: _", + "ordinal": 2, + "type_info": "Blob" + }, + { + "name": "secret_nonce: _", + "ordinal": 3, + "type_info": "Blob" + }, + { + "name": "created_at: _", + "ordinal": 4, + "type_info": "Null" + }, + { + "name": "dataset_id: _", + "ordinal": 5, + "type_info": "Text" + } + ], + "parameters": { + "Right": 2 + }, + "nullable": [ + false, + false, + false, + true, + false, + false + ] + }, + "hash": "140e91ce459e7759cbd0f57dc8f723cbffa51b784536d2045db121a64d795e2e" +} diff --git a/src/infra/datasets/sqlite/.sqlx/query-a82eb4926ca63888399a8f1eb3335ae09b6fb859545d821ea52f492f5f3168e2.json b/src/infra/datasets/sqlite/.sqlx/query-a82eb4926ca63888399a8f1eb3335ae09b6fb859545d821ea52f492f5f3168e2.json deleted file mode 100644 index 9760ae9d6c..0000000000 --- a/src/infra/datasets/sqlite/.sqlx/query-a82eb4926ca63888399a8f1eb3335ae09b6fb859545d821ea52f492f5f3168e2.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n UPDATE dataset_env_vars SET value = $1, secret_nonce = $2 where id = $3\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 3 - }, - "nullable": [] - }, - "hash": "a82eb4926ca63888399a8f1eb3335ae09b6fb859545d821ea52f492f5f3168e2" -} diff --git a/src/infra/datasets/sqlite/.sqlx/query-e7c75f2b36a20c1c1fd30531e0b67cbeb2c89c9db598a30d703bf0a8fadc8d87.json b/src/infra/datasets/sqlite/.sqlx/query-e7c75f2b36a20c1c1fd30531e0b67cbeb2c89c9db598a30d703bf0a8fadc8d87.json deleted file mode 100644 index aa56d7440f..0000000000 --- a/src/infra/datasets/sqlite/.sqlx/query-e7c75f2b36a20c1c1fd30531e0b67cbeb2c89c9db598a30d703bf0a8fadc8d87.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n INSERT INTO dataset_env_vars (id, key, value, secret_nonce, created_at, dataset_id)\n VALUES ($1, $2, $3, $4, $5, $6)\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 6 - }, - "nullable": [] - }, - "hash": "e7c75f2b36a20c1c1fd30531e0b67cbeb2c89c9db598a30d703bf0a8fadc8d87" -} diff --git a/src/infra/datasets/sqlite/src/lib.rs b/src/infra/datasets/sqlite/src/lib.rs index 3a5b60937e..5fba9310d8 100644 --- a/src/infra/datasets/sqlite/src/lib.rs +++ b/src/infra/datasets/sqlite/src/lib.rs @@ -7,6 +7,8 @@ // the Business Source License, use of this software 
will be governed
 // by the Apache License, Version 2.0.
 
+#![feature(let_chains)]
+
 // Re-exports
 pub use kamu_datasets as domain;
 
diff --git a/src/infra/datasets/sqlite/src/repos/sqlite_dataset_env_var_repository.rs b/src/infra/datasets/sqlite/src/repos/sqlite_dataset_env_var_repository.rs
index 229d0b9387..4df4013fe5 100644
--- a/src/infra/datasets/sqlite/src/repos/sqlite_dataset_env_var_repository.rs
+++ b/src/infra/datasets/sqlite/src/repos/sqlite_dataset_env_var_repository.rs
@@ -9,7 +9,7 @@
 
 use database_common::{PaginationOpts, TransactionRef, TransactionRefT};
 use dill::{component, interface};
-use internal_error::{ErrorIntoInternal, ResultIntoInternal};
+use internal_error::{InternalError, ResultIntoInternal};
 use opendatafabric::DatasetID;
 use uuid::Uuid;
 
@@ -33,50 +33,75 @@ impl SqliteDatasetEnvVarRepository {
 
 #[async_trait::async_trait]
 impl DatasetEnvVarRepository for SqliteDatasetEnvVarRepository {
-    async fn save_dataset_env_var(
+    async fn upsert_dataset_env_var(
         &self,
         dataset_env_var: &DatasetEnvVar,
-    ) -> Result<(), SaveDatasetEnvVarError> {
+    ) -> Result<UpsertDatasetEnvVarResult, InternalError> {
         let mut tr = self.transaction.lock().await;
         let connection_mut = tr.connection_mut().await?;
 
-        let dataset_env_var_id = dataset_env_var.id;
-        let dataset_env_var_key = &dataset_env_var.key;
-        let dataset_env_var_value = &dataset_env_var.value;
-        let dataset_env_var_secret_nonce = &dataset_env_var.secret_nonce;
-        let dataset_env_var_created_at = dataset_env_var.created_at;
         let dataset_env_var_dataset_id = dataset_env_var.dataset_id.to_string();
+        let old_record = sqlx::query_as!(
+            DatasetEnvVarRowModel,
+            r#"
+            SELECT
+                id as "id: Uuid",
+                key,
+                value as "value: _",
+                secret_nonce as "secret_nonce: _",
+                created_at as "created_at: _",
+                dataset_id as "dataset_id: _"
+            FROM dataset_env_vars
+            WHERE key = $1 and dataset_id = $2
+            "#,
+            dataset_env_var.key,
+            dataset_env_var_dataset_id,
+        )
+        .fetch_optional(&mut *connection_mut)
+        .await
+        .int_err()?;
+
+        // TODO: Compare decrypted values once the Postgres implementation supports it
+        if let Some(record) = &old_record
+            && dataset_env_var.value == record.value
+        {
+            return Ok(UpsertDatasetEnvVarResult {
+                id: record.id,
+                status: UpsertDatasetEnvVarStatus::UpToDate,
+            });
+        }
 
         sqlx::query!(
             r#"
             INSERT INTO dataset_env_vars (id, key, value, secret_nonce, created_at, dataset_id)
-                VALUES ($1, $2, $3, $4, $5, $6)
-            "#,
-            dataset_env_var_id,
-            dataset_env_var_key,
-            dataset_env_var_value,
-            dataset_env_var_secret_nonce,
-            dataset_env_var_created_at,
+                VALUES ($1, $2, $3, $4, $5, $6)
+                ON CONFLICT (key, dataset_id)
+                DO UPDATE SET
+                    value = EXCLUDED.value,
+                    secret_nonce = CASE
+                        WHEN dataset_env_vars.secret_nonce IS NULL AND excluded.secret_nonce IS NOT NULL THEN excluded.secret_nonce
+                        WHEN dataset_env_vars.secret_nonce IS NOT NULL AND excluded.secret_nonce IS NULL THEN NULL
+                        ELSE excluded.secret_nonce
+                    END
+            "#,
+            dataset_env_var.id,
+            dataset_env_var.key,
+            dataset_env_var.value,
+            dataset_env_var.secret_nonce,
+            dataset_env_var.created_at,
             dataset_env_var_dataset_id,
         )
         .execute(connection_mut)
         .await
-        .map_err(|e: sqlx::Error| match e {
-            sqlx::Error::Database(e) => {
-                if e.is_unique_violation() {
-                    SaveDatasetEnvVarError::Duplicate(SaveDatasetEnvVarErrorDuplicate {
-                        dataset_env_var_key: dataset_env_var.key.clone(),
-                        dataset_id: dataset_env_var.dataset_id.clone(),
-                    })
-                } else {
-                    SaveDatasetEnvVarError::Internal(e.int_err())
-                }
-            }
-            _ => SaveDatasetEnvVarError::Internal(e.int_err()),
-        })?;
+        .int_err()?;
 
-        Ok(())
+        let (id, status) = if let Some(record) = old_record
{ + (record.id, UpsertDatasetEnvVarStatus::Updated) + } else { + (dataset_env_var.id, UpsertDatasetEnvVarStatus::Created) + }; + + Ok(UpsertDatasetEnvVarResult { id, status }) } async fn get_all_dataset_env_vars_by_dataset_id( @@ -251,38 +276,6 @@ impl DatasetEnvVarRepository for SqliteDatasetEnvVarRepository { } Ok(()) } - - async fn modify_dataset_env_var( - &self, - dataset_env_var_id: &Uuid, - new_value: Vec, - secret_nonce: Option>, - ) -> Result<(), ModifyDatasetEnvVarError> { - let mut tr = self.transaction.lock().await; - - let connection_mut = tr.connection_mut().await?; - - let update_result = sqlx::query!( - r#" - UPDATE dataset_env_vars SET value = $1, secret_nonce = $2 where id = $3 - "#, - new_value, - secret_nonce, - dataset_env_var_id, - ) - .execute(&mut *connection_mut) - .await - .int_err()?; - - if update_result.rows_affected() == 0 { - return Err(ModifyDatasetEnvVarError::NotFound( - DatasetEnvVarNotFoundError { - dataset_env_var_key: dataset_env_var_id.to_string(), - }, - )); - } - Ok(()) - } } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/infra/datasets/sqlite/tests/repos/test_sqlite_dataset_env_var_repository.rs b/src/infra/datasets/sqlite/tests/repos/test_sqlite_dataset_env_var_repository.rs index 2117dc000a..56722e0ef7 100644 --- a/src/infra/datasets/sqlite/tests/repos/test_sqlite_dataset_env_var_repository.rs +++ b/src/infra/datasets/sqlite/tests/repos/test_sqlite_dataset_env_var_repository.rs @@ -51,7 +51,7 @@ database_transactional_test!( database_transactional_test!( storage = sqlite, - fixture = dataset_env_var_repo::test_modify_dataset_env_vars, + fixture = dataset_env_var_repo::test_delete_all_dataset_env_vars, harness = SqliteDatasetEnvVarRepositoryHarness ); @@ -59,7 +59,7 @@ database_transactional_test!( database_transactional_test!( storage = sqlite, - fixture = dataset_env_var_repo::test_delete_all_dataset_env_vars, + fixture = dataset_env_var_repo::test_upsert_dataset_env_vars, harness = SqliteDatasetEnvVarRepositoryHarness );
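
---

For reference, a client-side call to the merged mutation might look like the sketch below. The dataset ID placeholder follows the same "<dataset_id>" convention as the test harness templates, and the key/value pair is illustrative only:

    mutation {
      datasets {
        byId(datasetId: "<dataset_id>") {
          envVars {
            upsertEnvVariable(key: "foo", value: "foo_value", isSecret: false) {
              message
            }
          }
        }
      }
    }

A single call now covers all three outcomes: the result resolves to UpsertDatasetEnvVarResultCreated, UpsertDatasetEnvVarResultUpdated, or UpsertDatasetEnvVarUpToDate, whose message fields read "Created", "Updated", and "Dataset env var is up to date" respectively.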