Skip to content

Commit

Permalink
refactor: rename plugin activate/deactivate to enable/disable
Browse files Browse the repository at this point in the history
Closes #25789
  • Loading branch information
pauldix committed Jan 11, 2025
1 parent 1ff4f76 commit 51e0b52
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl Config {

#[derive(Debug, clap::Subcommand)]
enum SubCommand {
/// Deactivate a plugin trigger
/// Disable a plugin trigger
Trigger(TriggerConfig),
}

Expand All @@ -41,7 +41,7 @@ struct TriggerConfig {
#[clap(flatten)]
influxdb3_config: InfluxDb3Config,

/// Name of trigger to deactivate
/// Name of trigger to disable
#[clap(required = true)]
trigger_name: String,
}
Expand All @@ -54,9 +54,9 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
trigger_name,
}) => {
client
.api_v3_configure_processing_engine_trigger_deactivate(database_name, &trigger_name)
.api_v3_configure_processing_engine_trigger_disable(database_name, &trigger_name)
.await?;
println!("Trigger {} deactivated successfully", trigger_name);
println!("Trigger {} disabled successfully", trigger_name);
}
}
Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl Config {

#[derive(Debug, clap::Subcommand)]
enum SubCommand {
/// Activate a trigger to enable plugin execution
/// Enable a trigger to enable plugin execution
Trigger(TriggerConfig),
}

Expand All @@ -41,7 +41,7 @@ struct TriggerConfig {
#[clap(flatten)]
influxdb3_config: InfluxDb3Config,

/// Name of trigger to manage
/// Name of trigger to enable
#[clap(required = true)]
trigger_name: String,
}
Expand All @@ -54,9 +54,9 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
trigger_name,
}) => {
client
.api_v3_configure_processing_engine_trigger_activate(database_name, &trigger_name)
.api_v3_configure_processing_engine_trigger_enable(database_name, &trigger_name)
.await?;
println!("Trigger {} activated successfully", trigger_name);
println!("Trigger {} enabled successfully", trigger_name);
}
}
Ok(())
Expand Down
24 changes: 12 additions & 12 deletions influxdb3/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ use trogging::{
};

mod commands {
pub mod activate;
pub(crate) mod common;
pub mod create;
pub mod deactivate;
pub mod delete;
pub mod disable;
pub mod enable;
pub mod query;
pub mod serve;
pub mod show;
Expand Down Expand Up @@ -84,14 +84,14 @@ struct Config {
#[derive(Debug, clap::Parser)]
#[allow(clippy::large_enum_variant)]
enum Command {
/// Activate a resource such as a trigger
Activate(commands::activate::Config),
/// Enable a resource such as a trigger
Enable(commands::enable::Config),

/// Create a resource such as a database or auth token
Create(commands::create::Config),

/// Deactivate a resource such as a trigger
Deactivate(commands::deactivate::Config),
/// Disable a resource such as a trigger
Disable(commands::disable::Config),

/// Delete a resource such as a database or table
Delete(commands::delete::Config),
Expand Down Expand Up @@ -136,9 +136,9 @@ fn main() -> Result<(), std::io::Error> {

match config.command {
None => println!("command required, -h/--help for help"),
Some(Command::Activate(config)) => {
if let Err(e) = commands::activate::command(config).await {
eprintln!("Activate command failed: {e}");
Some(Command::Enable(config)) => {
if let Err(e) = commands::enable::command(config).await {
eprintln!("Enable command failed: {e}");
std::process::exit(ReturnCode::Failure as _)
}
}
Expand All @@ -148,9 +148,9 @@ fn main() -> Result<(), std::io::Error> {
std::process::exit(ReturnCode::Failure as _)
}
}
Some(Command::Deactivate(config)) => {
if let Err(e) = commands::deactivate::command(config).await {
eprintln!("Deactivate command failed: {e}");
Some(Command::Disable(config)) => {
if let Err(e) = commands::disable::command(config).await {
eprintln!("Disable command failed: {e}");
std::process::exit(ReturnCode::Failure as _)
}
}
Expand Down
77 changes: 37 additions & 40 deletions influxdb3/tests/server/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ def process_rows(iterator, output):
#[cfg(feature = "system-py")]
#[test_log::test(tokio::test)]
async fn test_create_trigger_and_run() {
// create a plugin and trigger and write data in, verifying that the trigger is activated
// create a plugin and trigger and write data in, verifying that the trigger is enabled
// and sent data
let server = TestServer::spawn().await;
let server_addr = server.client_addr();
Expand Down Expand Up @@ -560,7 +560,7 @@ def process_writes(influxdb3_local, table_batches, args=None):
plugin_name,
]);

// creating the trigger should activate it
// creating the trigger should enable it
let result = run_with_confirmation(&[
"create",
"trigger",
Expand Down Expand Up @@ -589,9 +589,18 @@ def process_writes(influxdb3_local, table_batches, args=None):
.await
.expect("write to db");

// query to see if the processed data is there
let expected = json!(
[
{"table_name": "cpu", "row_count": 4},
{"table_name": "mem", "row_count": 1}
]
);

// query to see if the processed data is there. we loop because it could take a bit to write
// back the data. There's also a condition where the table may have been created, but the
// write hasn't happend yet, which returns empty results. This ensures we don't hit that race.
let mut check_count = 0;
let result = loop {
loop {
match server
.api_v3_query_sql(&[
("db", db_name),
Expand All @@ -602,7 +611,15 @@ def process_writes(influxdb3_local, table_batches, args=None):
.json::<Value>()
.await
{
Ok(value) => break value,
Ok(value) => {
if value == expected {
return;
}
check_count += 1;
if check_count > 10 {
panic!("Unexpected query result, got: {:#?}, expected {:#?}", value, expected);
}
}
Err(e) => {
check_count += 1;
if check_count > 10 {
Expand All @@ -611,21 +628,11 @@ def process_writes(influxdb3_local, table_batches, args=None):
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
}
};
};

assert_eq!(
result,
json!(
[
{"table_name": "cpu", "row_count": 4},
{"table_name": "mem", "row_count": 1}
]
)
);
}
}

#[test_log::test(tokio::test)]
async fn test_trigger_activation() {
async fn test_trigger_enable() {
let server = TestServer::spawn().await;
let server_addr = server.client_addr();
let db_name = "foo";
Expand Down Expand Up @@ -668,42 +675,42 @@ def process_rows(iterator, output):
trigger_name,
]);

// Test activation
// Test enabling
let result = run_with_confirmation(&[
"activate",
"enable",
"trigger",
"--database",
db_name,
"--host",
&server_addr,
trigger_name,
]);
debug!(result = ?result, "activate trigger");
assert_contains!(&result, "Trigger test_trigger activated successfully");
debug!(result = ?result, "enable trigger");
assert_contains!(&result, "Trigger test_trigger enabled successfully");

// Test deactivation
// Test disable
let result = run_with_confirmation(&[
"deactivate",
"disable",
"trigger",
"--database",
db_name,
"--host",
&server_addr,
trigger_name,
]);
debug!(result = ?result, "deactivate trigger");
assert_contains!(&result, "Trigger test_trigger deactivated successfully");
debug!(result = ?result, "disable trigger");
assert_contains!(&result, "Trigger test_trigger disabled successfully");
}

#[test_log::test(tokio::test)]
async fn test_delete_active_trigger() {
async fn test_delete_enabled_trigger() {
let server = TestServer::spawn().await;
let server_addr = server.client_addr();
let db_name = "foo";
let plugin_name = "test_plugin";
let trigger_name = "test_trigger";

// Setup: create database, plugin, and active trigger
// Setup: create database, plugin, and enable trigger
run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]);

let plugin_file = create_plugin_file(
Expand Down Expand Up @@ -739,17 +746,7 @@ def process_rows(iterator, output):
trigger_name,
]);

run_with_confirmation(&[
"activate",
"trigger",
"--database",
db_name,
"--host",
&server_addr,
trigger_name,
]);

// Try to delete active trigger without force flag
// Try to delete the enabled trigger without force flag
let result = run_with_confirmation_and_err(&[
"delete",
"trigger",
Expand All @@ -759,7 +756,7 @@ def process_rows(iterator, output):
&server_addr,
trigger_name,
]);
debug!(result = ?result, "delete active trigger without force");
debug!(result = ?result, "delete enabled trigger without force");
assert_contains!(&result, "command failed");

// Delete active trigger with force flag
Expand All @@ -773,7 +770,7 @@ def process_rows(iterator, output):
trigger_name,
"--force",
]);
debug!(result = ?result, "delete active trigger with force");
debug!(result = ?result, "delete enabled trigger with force");
assert_contains!(&result, "Trigger test_trigger deleted successfully");
}

Expand Down
12 changes: 6 additions & 6 deletions influxdb3_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -645,13 +645,13 @@ impl Client {
}
}

/// Make a request to `POST /api/v3/configure/processing_engine_trigger/activate`
pub async fn api_v3_configure_processing_engine_trigger_activate(
/// Make a request to `POST /api/v3/configure/processing_engine_trigger/enable`
pub async fn api_v3_configure_processing_engine_trigger_enable(
&self,
db: impl Into<String> + Send,
trigger_name: impl Into<String> + Send,
) -> Result<()> {
let api_path = "/api/v3/configure/processing_engine_trigger/activate";
let api_path = "/api/v3/configure/processing_engine_trigger/enable";
let url = self.base_url.join(api_path)?;

let mut req = self
Expand All @@ -676,13 +676,13 @@ impl Client {
}
}

/// Make a request to `POST /api/v3/configure/processing_engine_trigger/deactivate`
pub async fn api_v3_configure_processing_engine_trigger_deactivate(
/// Make a request to `POST /api/v3/configure/processing_engine_trigger/disable`
pub async fn api_v3_configure_processing_engine_trigger_disable(
&self,
db: impl Into<String> + Send,
trigger_name: impl Into<String> + Send,
) -> Result<()> {
let api_path = "/api/v3/configure/processing_engine_trigger/deactivate";
let api_path = "/api/v3/configure/processing_engine_trigger/disable";
let url = self.base_url.join(api_path)?;

let mut req = self
Expand Down
22 changes: 11 additions & 11 deletions influxdb3_processing_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,14 +235,14 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
// Do this first to avoid a dangling running plugin.
// Potential edge-case of a plugin being stopped but not deleted,
// but should be okay given desire to force delete.
let needs_deactivate = force
let needs_disable = force
&& db_schema
.processing_engine_triggers
.get(trigger_name)
.is_some_and(|trigger| !trigger.disabled);

if needs_deactivate {
self.deactivate_trigger(db, trigger_name).await?;
if needs_disable {
self.disable_trigger(db, trigger_name).await?;
}

if let Some(catalog_batch) = self.catalog.apply_catalog_batch(&catalog_batch)? {
Expand Down Expand Up @@ -293,7 +293,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
Ok(())
}

async fn deactivate_trigger(
async fn disable_trigger(
&self,
db_name: &str,
trigger_name: &str,
Expand Down Expand Up @@ -336,7 +336,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
Ok(())
}

async fn activate_trigger(
async fn enable_trigger(
&self,
write_buffer: Arc<dyn WriteBuffer>,
query_executor: Arc<dyn QueryExecutor>,
Expand Down Expand Up @@ -705,8 +705,8 @@ mod tests {
.await
.unwrap();

// Deactivate the trigger
let result = pem.deactivate_trigger("foo", "test_trigger").await;
// Disable the trigger
let result = pem.disable_trigger("foo", "test_trigger").await;
assert!(result.is_ok());

// Verify trigger is disabled in schema
Expand All @@ -717,9 +717,9 @@ mod tests {
.unwrap();
assert!(trigger.disabled);

// Activate the trigger
// Enable the trigger
let result = pem
.activate_trigger(
.enable_trigger(
Arc::clone(&write_buffer),
Arc::clone(&pem._query_executor),
"foo",
Expand Down Expand Up @@ -797,7 +797,7 @@ mod tests {
}

#[tokio::test]
async fn test_activate_nonexistent_trigger() -> influxdb3_write::write_buffer::Result<()> {
async fn test_enable_nonexistent_trigger() -> influxdb3_write::write_buffer::Result<()> {
let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap();
let test_store = Arc::new(InMemory::new());
let wal_config = WalConfig {
Expand All @@ -822,7 +822,7 @@ mod tests {
.await?;

let result = pem
.activate_trigger(
.enable_trigger(
Arc::clone(&write_buffer),
Arc::clone(&pem._query_executor),
"foo",
Expand Down
Loading

0 comments on commit 51e0b52

Please sign in to comment.