Skip to content

Commit

Permalink
Modify dataset env var API to upsert style (#1015)
Browse files Browse the repository at this point in the history
* Modify dataset env var API to upsert style

* Add service tests
  • Loading branch information
rmn-boiko authored Jan 8, 2025
1 parent 8175e2b commit affeca4
Show file tree
Hide file tree
Showing 26 changed files with 559 additions and 490 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ Recommendation: for ease of reading, use the following order:

## [Unreleased]
### Changed
- GraphQL: accountListFlows returns list sorted by status and last event time
- GraphQL: flows are listed ordered by status and last event time
- Merged two methods(`saveEnvVariable` and `modifyEnvVariable`) from `DatasetEnvVarsMut` info one `upsertEnvVariable`
### Fixed
- GQL api flows queries now fetch dataset polling source only once per dataset(and only if Ingest flow type is here)
- Flow trigger status now become disable on flow fail
Expand Down
50 changes: 19 additions & 31 deletions resources/schema.gql
Original file line number Diff line number Diff line change
Expand Up @@ -560,9 +560,8 @@ type DatasetEnvVars {
}

type DatasetEnvVarsMut {
saveEnvVariable(key: String!, value: String!, isSecret: Boolean!): SaveDatasetEnvVarResult!
upsertEnvVariable(key: String!, value: String!, isSecret: Boolean!): UpsertDatasetEnvVarResult!
deleteEnvVariable(id: DatasetEnvVarID!): DeleteDatasetEnvVarResult!
modifyEnvVariable(id: DatasetEnvVarID!, newValue: String!, isSecret: Boolean!): ModifyDatasetEnvVarResult!
}

type DatasetFlowConfigs {
Expand Down Expand Up @@ -1454,20 +1453,6 @@ type MetadataManifestUnsupportedVersion implements CommitResult & CreateDatasetF
message: String!
}

interface ModifyDatasetEnvVarResult {
message: String!
}

type ModifyDatasetEnvVarResultNotFound implements ModifyDatasetEnvVarResult {
envVarId: DatasetEnvVarID!
message: String!
}

type ModifyDatasetEnvVarResultSuccess implements ModifyDatasetEnvVarResult {
envVarId: DatasetEnvVarID!
message: String!
}

enum MqttQos {
AT_MOST_ONCE
AT_LEAST_ONCE
Expand Down Expand Up @@ -1698,21 +1683,6 @@ type RevokeResultSuccess implements RevokeResult {
message: String!
}

interface SaveDatasetEnvVarResult {
message: String!
}

type SaveDatasetEnvVarResultDuplicate implements SaveDatasetEnvVarResult {
datasetEnvVarKey: String!
datasetName: DatasetName!
message: String!
}

type SaveDatasetEnvVarResultSuccess implements SaveDatasetEnvVarResult {
envVar: ViewDatasetEnvVar!
message: String!
}

input ScheduleInput @oneOf {
timeDelta: TimeDeltaInput
"""
Expand Down Expand Up @@ -1988,6 +1958,24 @@ interface UpdateReadmeResult {
message: String!
}

interface UpsertDatasetEnvVarResult {
message: String!
}

type UpsertDatasetEnvVarResultCreated implements UpsertDatasetEnvVarResult {
envVar: ViewDatasetEnvVar!
message: String!
}

type UpsertDatasetEnvVarResultUpdated implements UpsertDatasetEnvVarResult {
envVar: ViewDatasetEnvVar!
message: String!
}

type UpsertDatasetEnvVarUpToDate implements UpsertDatasetEnvVarResult {
message: String!
}

type ViewAccessToken {
"""
Unique identifier of the access token
Expand Down
118 changes: 51 additions & 67 deletions src/adapter/graphql/src/mutations/dataset_env_vars_mut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ use kamu_datasets::{
DatasetEnvVarService,
DatasetEnvVarValue,
DeleteDatasetEnvVarError,
ModifyDatasetEnvVarError,
SaveDatasetEnvVarError,
UpsertDatasetEnvVarStatus,
};
use opendatafabric as odf;
use secrecy::SecretString;
Expand All @@ -34,13 +33,13 @@ impl DatasetEnvVarsMut {
Self { dataset_handle }
}

async fn save_env_variable(
async fn upsert_env_variable(
&self,
ctx: &Context<'_>,
key: String,
value: String,
is_secret: bool,
) -> Result<SaveDatasetEnvVarResult> {
) -> Result<UpsertDatasetEnvVarResult> {
utils::check_dataset_write_access(ctx, &self.dataset_handle).await?;

let dataset_env_var_service = from_catalog_n!(ctx, dyn DatasetEnvVarService);
Expand All @@ -51,31 +50,30 @@ impl DatasetEnvVarsMut {
DatasetEnvVarValue::Regular(value)
};

match dataset_env_var_service
.create_dataset_env_var(
let upsert_result = dataset_env_var_service
.upsert_dataset_env_var(
key.as_str(),
&dataset_env_var_value,
&self.dataset_handle.id,
)
.await
{
Ok(created_dataset_env_var) => Ok(SaveDatasetEnvVarResult::Success(
SaveDatasetEnvVarResultSuccess {
env_var: ViewDatasetEnvVar::new(created_dataset_env_var),
},
)),
Err(err) => match err {
SaveDatasetEnvVarError::Duplicate(_) => Ok(SaveDatasetEnvVarResult::Duplicate(
SaveDatasetEnvVarResultDuplicate {
dataset_env_var_key: key,
dataset_name: self.dataset_handle.alias.dataset_name.clone().into(),
},
)),
SaveDatasetEnvVarError::Internal(internal_err) => {
Err(GqlError::Internal(internal_err))
}
},
}
.map_err(GqlError::Internal)?;

Ok(match upsert_result.status {
UpsertDatasetEnvVarStatus::Created => {
UpsertDatasetEnvVarResult::Created(UpsertDatasetEnvVarResultCreated {
env_var: ViewDatasetEnvVar::new(upsert_result.dataset_env_var),
})
}
UpsertDatasetEnvVarStatus::Updated => {
UpsertDatasetEnvVarResult::Updated(UpsertDatasetEnvVarResultUpdated {
env_var: ViewDatasetEnvVar::new(upsert_result.dataset_env_var),
})
}
UpsertDatasetEnvVarStatus::UpToDate => {
UpsertDatasetEnvVarResult::UpToDate(UpsertDatasetEnvVarUpToDate)
}
})
}

async fn delete_env_variable(
Expand Down Expand Up @@ -108,65 +106,51 @@ impl DatasetEnvVarsMut {
},
}
}
}

async fn modify_env_variable(
&self,
ctx: &Context<'_>,
id: DatasetEnvVarID,
new_value: String,
is_secret: bool,
) -> Result<ModifyDatasetEnvVarResult> {
utils::check_dataset_write_access(ctx, &self.dataset_handle).await?;
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

let dataset_env_var_service = from_catalog_n!(ctx, dyn DatasetEnvVarService);
let dataset_env_var_value = if is_secret {
DatasetEnvVarValue::Secret(SecretString::from(new_value))
} else {
DatasetEnvVarValue::Regular(new_value)
};
#[derive(Interface, Debug, Clone)]
#[graphql(field(name = "message", ty = "String"))]
pub enum UpsertDatasetEnvVarResult {
Created(UpsertDatasetEnvVarResultCreated),
Updated(UpsertDatasetEnvVarResultUpdated),
UpToDate(UpsertDatasetEnvVarUpToDate),
}

match dataset_env_var_service
.modify_dataset_env_var(&id.clone().into(), &dataset_env_var_value)
.await
{
Ok(_) => Ok(ModifyDatasetEnvVarResult::Success(
ModifyDatasetEnvVarResultSuccess {
env_var_id: id.clone(),
},
)),
Err(err) => match err {
ModifyDatasetEnvVarError::NotFound(_) => Ok(ModifyDatasetEnvVarResult::NotFound(
ModifyDatasetEnvVarResultNotFound {
env_var_id: id.clone(),
},
)),
ModifyDatasetEnvVarError::Internal(internal_err) => {
Err(GqlError::Internal(internal_err))
}
},
}
#[derive(Debug, Clone)]
pub struct UpsertDatasetEnvVarUpToDate;

#[Object]
impl UpsertDatasetEnvVarUpToDate {
pub async fn message(&self) -> String {
"Dataset env var is up to date".to_string()
}
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#[derive(SimpleObject, Debug, Clone)]
#[graphql(complex)]
pub struct UpsertDatasetEnvVarResultCreated {
pub env_var: ViewDatasetEnvVar,
}

#[derive(Interface, Debug, Clone)]
#[graphql(field(name = "message", ty = "String"))]
pub enum SaveDatasetEnvVarResult {
Success(SaveDatasetEnvVarResultSuccess),
Duplicate(SaveDatasetEnvVarResultDuplicate),
#[ComplexObject]
impl UpsertDatasetEnvVarResultCreated {
async fn message(&self) -> String {
"Created".to_string()
}
}

#[derive(SimpleObject, Debug, Clone)]
#[graphql(complex)]
pub struct SaveDatasetEnvVarResultSuccess {
pub struct UpsertDatasetEnvVarResultUpdated {
pub env_var: ViewDatasetEnvVar,
}

#[ComplexObject]
impl SaveDatasetEnvVarResultSuccess {
impl UpsertDatasetEnvVarResultUpdated {
async fn message(&self) -> String {
"Success".to_string()
"Updated".to_string()
}
}

Expand Down
Loading

0 comments on commit affeca4

Please sign in to comment.