diff --git a/influxdb3/src/commands/deactivate.rs b/influxdb3/src/commands/disable.rs similarity index 85% rename from influxdb3/src/commands/deactivate.rs rename to influxdb3/src/commands/disable.rs index fab34c251ad..1759a0948a9 100644 --- a/influxdb3/src/commands/deactivate.rs +++ b/influxdb3/src/commands/disable.rs @@ -32,7 +32,7 @@ impl Config { #[derive(Debug, clap::Subcommand)] enum SubCommand { - /// Deactivate a plugin trigger + /// Disable a plugin trigger Trigger(TriggerConfig), } @@ -41,7 +41,7 @@ struct TriggerConfig { #[clap(flatten)] influxdb3_config: InfluxDb3Config, - /// Name of trigger to deactivate + /// Name of trigger to disable #[clap(required = true)] trigger_name: String, } @@ -54,9 +54,9 @@ pub async fn command(config: Config) -> Result<(), Box> { trigger_name, }) => { client - .api_v3_configure_processing_engine_trigger_deactivate(database_name, &trigger_name) + .api_v3_configure_processing_engine_trigger_disable(database_name, &trigger_name) .await?; - println!("Trigger {} deactivated successfully", trigger_name); + println!("Trigger {} disabled successfully", trigger_name); } } Ok(()) diff --git a/influxdb3/src/commands/activate.rs b/influxdb3/src/commands/enable.rs similarity index 85% rename from influxdb3/src/commands/activate.rs rename to influxdb3/src/commands/enable.rs index dd74e871423..aab0d7ab31c 100644 --- a/influxdb3/src/commands/activate.rs +++ b/influxdb3/src/commands/enable.rs @@ -32,7 +32,7 @@ impl Config { #[derive(Debug, clap::Subcommand)] enum SubCommand { - /// Activate a trigger to enable plugin execution + /// Enable a trigger to enable plugin execution Trigger(TriggerConfig), } @@ -41,7 +41,7 @@ struct TriggerConfig { #[clap(flatten)] influxdb3_config: InfluxDb3Config, - /// Name of trigger to manage + /// Name of trigger to enable #[clap(required = true)] trigger_name: String, } @@ -54,9 +54,9 @@ pub async fn command(config: Config) -> Result<(), Box> { trigger_name, }) => { client - .api_v3_configure_processing_engine_trigger_activate(database_name, &trigger_name) + .api_v3_configure_processing_engine_trigger_enable(database_name, &trigger_name) .await?; - println!("Trigger {} activated successfully", trigger_name); + println!("Trigger {} enabled successfully", trigger_name); } } Ok(()) diff --git a/influxdb3/src/main.rs b/influxdb3/src/main.rs index c171afdcf58..6cbe80b1422 100644 --- a/influxdb3/src/main.rs +++ b/influxdb3/src/main.rs @@ -21,11 +21,11 @@ use trogging::{ }; mod commands { - pub mod activate; pub(crate) mod common; pub mod create; - pub mod deactivate; pub mod delete; + pub mod disable; + pub mod enable; pub mod query; pub mod serve; pub mod show; @@ -84,14 +84,14 @@ struct Config { #[derive(Debug, clap::Parser)] #[allow(clippy::large_enum_variant)] enum Command { - /// Activate a resource such as a trigger - Activate(commands::activate::Config), + /// Enable a resource such as a trigger + Enable(commands::enable::Config), /// Create a resource such as a database or auth token Create(commands::create::Config), - /// Deactivate a resource such as a trigger - Deactivate(commands::deactivate::Config), + /// Disable a resource such as a trigger + Disable(commands::disable::Config), /// Delete a resource such as a database or table Delete(commands::delete::Config), @@ -136,9 +136,9 @@ fn main() -> Result<(), std::io::Error> { match config.command { None => println!("command required, -h/--help for help"), - Some(Command::Activate(config)) => { - if let Err(e) = commands::activate::command(config).await { - eprintln!("Activate command failed: {e}"); + Some(Command::Enable(config)) => { + if let Err(e) = commands::enable::command(config).await { + eprintln!("Enable command failed: {e}"); std::process::exit(ReturnCode::Failure as _) } } @@ -148,9 +148,9 @@ fn main() -> Result<(), std::io::Error> { std::process::exit(ReturnCode::Failure as _) } } - Some(Command::Deactivate(config)) => { - if let Err(e) = commands::deactivate::command(config).await { - eprintln!("Deactivate command failed: {e}"); + Some(Command::Disable(config)) => { + if let Err(e) = commands::disable::command(config).await { + eprintln!("Disable command failed: {e}"); std::process::exit(ReturnCode::Failure as _) } } diff --git a/influxdb3/tests/server/cli.rs b/influxdb3/tests/server/cli.rs index cc3ace19d27..4ca330d0f1f 100644 --- a/influxdb3/tests/server/cli.rs +++ b/influxdb3/tests/server/cli.rs @@ -520,7 +520,7 @@ def process_rows(iterator, output): #[cfg(feature = "system-py")] #[test_log::test(tokio::test)] async fn test_create_trigger_and_run() { - // create a plugin and trigger and write data in, verifying that the trigger is activated + // create a plugin and trigger and write data in, verifying that the trigger is enabled // and sent data let server = TestServer::spawn().await; let server_addr = server.client_addr(); @@ -560,7 +560,7 @@ def process_writes(influxdb3_local, table_batches, args=None): plugin_name, ]); - // creating the trigger should activate it + // creating the trigger should enable it let result = run_with_confirmation(&[ "create", "trigger", @@ -589,9 +589,18 @@ def process_writes(influxdb3_local, table_batches, args=None): .await .expect("write to db"); - // query to see if the processed data is there + let expected = json!( + [ + {"table_name": "cpu", "row_count": 4}, + {"table_name": "mem", "row_count": 1} + ] + ); + + // query to see if the processed data is there. we loop because it could take a bit to write + // back the data. There's also a condition where the table may have been created, but the + // write hasn't happend yet, which returns empty results. This ensures we don't hit that race. let mut check_count = 0; - let result = loop { + loop { match server .api_v3_query_sql(&[ ("db", db_name), @@ -602,7 +611,18 @@ def process_writes(influxdb3_local, table_batches, args=None): .json::() .await { - Ok(value) => break value, + Ok(value) => { + if value == expected { + return; + } + check_count += 1; + if check_count > 10 { + panic!( + "Unexpected query result, got: {:#?}, expected {:#?}", + value, expected + ); + } + } Err(e) => { check_count += 1; if check_count > 10 { @@ -611,21 +631,11 @@ def process_writes(influxdb3_local, table_batches, args=None): tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; } }; - }; - - assert_eq!( - result, - json!( - [ - {"table_name": "cpu", "row_count": 4}, - {"table_name": "mem", "row_count": 1} - ] - ) - ); + } } #[test_log::test(tokio::test)] -async fn test_trigger_activation() { +async fn test_trigger_enable() { let server = TestServer::spawn().await; let server_addr = server.client_addr(); let db_name = "foo"; @@ -668,9 +678,9 @@ def process_rows(iterator, output): trigger_name, ]); - // Test activation + // Test enabling let result = run_with_confirmation(&[ - "activate", + "enable", "trigger", "--database", db_name, @@ -678,12 +688,12 @@ def process_rows(iterator, output): &server_addr, trigger_name, ]); - debug!(result = ?result, "activate trigger"); - assert_contains!(&result, "Trigger test_trigger activated successfully"); + debug!(result = ?result, "enable trigger"); + assert_contains!(&result, "Trigger test_trigger enabled successfully"); - // Test deactivation + // Test disable let result = run_with_confirmation(&[ - "deactivate", + "disable", "trigger", "--database", db_name, @@ -691,19 +701,19 @@ def process_rows(iterator, output): &server_addr, trigger_name, ]); - debug!(result = ?result, "deactivate trigger"); - assert_contains!(&result, "Trigger test_trigger deactivated successfully"); + debug!(result = ?result, "disable trigger"); + assert_contains!(&result, "Trigger test_trigger disabled successfully"); } #[test_log::test(tokio::test)] -async fn test_delete_active_trigger() { +async fn test_delete_enabled_trigger() { let server = TestServer::spawn().await; let server_addr = server.client_addr(); let db_name = "foo"; let plugin_name = "test_plugin"; let trigger_name = "test_trigger"; - // Setup: create database, plugin, and active trigger + // Setup: create database, plugin, and enable trigger run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]); let plugin_file = create_plugin_file( @@ -739,17 +749,7 @@ def process_rows(iterator, output): trigger_name, ]); - run_with_confirmation(&[ - "activate", - "trigger", - "--database", - db_name, - "--host", - &server_addr, - trigger_name, - ]); - - // Try to delete active trigger without force flag + // Try to delete the enabled trigger without force flag let result = run_with_confirmation_and_err(&[ "delete", "trigger", @@ -759,7 +759,7 @@ def process_rows(iterator, output): &server_addr, trigger_name, ]); - debug!(result = ?result, "delete active trigger without force"); + debug!(result = ?result, "delete enabled trigger without force"); assert_contains!(&result, "command failed"); // Delete active trigger with force flag @@ -773,7 +773,7 @@ def process_rows(iterator, output): trigger_name, "--force", ]); - debug!(result = ?result, "delete active trigger with force"); + debug!(result = ?result, "delete enabled trigger with force"); assert_contains!(&result, "Trigger test_trigger deleted successfully"); } diff --git a/influxdb3_client/src/lib.rs b/influxdb3_client/src/lib.rs index 241972c5463..15014475e25 100644 --- a/influxdb3_client/src/lib.rs +++ b/influxdb3_client/src/lib.rs @@ -645,13 +645,13 @@ impl Client { } } - /// Make a request to `POST /api/v3/configure/processing_engine_trigger/activate` - pub async fn api_v3_configure_processing_engine_trigger_activate( + /// Make a request to `POST /api/v3/configure/processing_engine_trigger/enable` + pub async fn api_v3_configure_processing_engine_trigger_enable( &self, db: impl Into + Send, trigger_name: impl Into + Send, ) -> Result<()> { - let api_path = "/api/v3/configure/processing_engine_trigger/activate"; + let api_path = "/api/v3/configure/processing_engine_trigger/enable"; let url = self.base_url.join(api_path)?; let mut req = self @@ -676,13 +676,13 @@ impl Client { } } - /// Make a request to `POST /api/v3/configure/processing_engine_trigger/deactivate` - pub async fn api_v3_configure_processing_engine_trigger_deactivate( + /// Make a request to `POST /api/v3/configure/processing_engine_trigger/disable` + pub async fn api_v3_configure_processing_engine_trigger_disable( &self, db: impl Into + Send, trigger_name: impl Into + Send, ) -> Result<()> { - let api_path = "/api/v3/configure/processing_engine_trigger/deactivate"; + let api_path = "/api/v3/configure/processing_engine_trigger/disable"; let url = self.base_url.join(api_path)?; let mut req = self diff --git a/influxdb3_processing_engine/src/lib.rs b/influxdb3_processing_engine/src/lib.rs index 02cbf558bed..606e985e539 100644 --- a/influxdb3_processing_engine/src/lib.rs +++ b/influxdb3_processing_engine/src/lib.rs @@ -235,14 +235,14 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl { // Do this first to avoid a dangling running plugin. // Potential edge-case of a plugin being stopped but not deleted, // but should be okay given desire to force delete. - let needs_deactivate = force + let needs_disable = force && db_schema .processing_engine_triggers .get(trigger_name) .is_some_and(|trigger| !trigger.disabled); - if needs_deactivate { - self.deactivate_trigger(db, trigger_name).await?; + if needs_disable { + self.disable_trigger(db, trigger_name).await?; } if let Some(catalog_batch) = self.catalog.apply_catalog_batch(&catalog_batch)? { @@ -293,7 +293,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl { Ok(()) } - async fn deactivate_trigger( + async fn disable_trigger( &self, db_name: &str, trigger_name: &str, @@ -336,7 +336,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl { Ok(()) } - async fn activate_trigger( + async fn enable_trigger( &self, write_buffer: Arc, query_executor: Arc, @@ -705,8 +705,8 @@ mod tests { .await .unwrap(); - // Deactivate the trigger - let result = pem.deactivate_trigger("foo", "test_trigger").await; + // Disable the trigger + let result = pem.disable_trigger("foo", "test_trigger").await; assert!(result.is_ok()); // Verify trigger is disabled in schema @@ -717,9 +717,9 @@ mod tests { .unwrap(); assert!(trigger.disabled); - // Activate the trigger + // Enable the trigger let result = pem - .activate_trigger( + .enable_trigger( Arc::clone(&write_buffer), Arc::clone(&pem._query_executor), "foo", @@ -797,7 +797,7 @@ mod tests { } #[tokio::test] - async fn test_activate_nonexistent_trigger() -> influxdb3_write::write_buffer::Result<()> { + async fn test_enable_nonexistent_trigger() -> influxdb3_write::write_buffer::Result<()> { let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap(); let test_store = Arc::new(InMemory::new()); let wal_config = WalConfig { @@ -822,7 +822,7 @@ mod tests { .await?; let result = pem - .activate_trigger( + .enable_trigger( Arc::clone(&write_buffer), Arc::clone(&pem._query_executor), "foo", diff --git a/influxdb3_processing_engine/src/manager.rs b/influxdb3_processing_engine/src/manager.rs index 96e63173f59..a1ddfdf1096 100644 --- a/influxdb3_processing_engine/src/manager.rs +++ b/influxdb3_processing_engine/src/manager.rs @@ -65,13 +65,13 @@ pub trait ProcessingEngineManager: Debug + Send + Sync + 'static { trigger_name: &str, ) -> Result<(), ProcessingEngineError>; - async fn deactivate_trigger( + async fn disable_trigger( &self, db_name: &str, trigger_name: &str, ) -> Result<(), ProcessingEngineError>; - async fn activate_trigger( + async fn enable_trigger( &self, write_buffer: Arc, query_executor: Arc, diff --git a/influxdb3_server/src/http.rs b/influxdb3_server/src/http.rs index 9ebafeee0a5..d3ef8a216f7 100644 --- a/influxdb3_server/src/http.rs +++ b/influxdb3_server/src/http.rs @@ -1091,27 +1091,24 @@ where .body(Body::empty())?) } - async fn deactivate_processing_engine_trigger( + async fn disable_processing_engine_trigger( &self, req: Request, ) -> Result> { let query = req.uri().query().unwrap_or(""); let delete_req = serde_urlencoded::from_str::(query)?; self.processing_engine - .deactivate_trigger(delete_req.db.as_str(), delete_req.trigger_name.as_str()) + .disable_trigger(delete_req.db.as_str(), delete_req.trigger_name.as_str()) .await?; Ok(Response::builder() .status(StatusCode::OK) .body(Body::empty())?) } - async fn activate_processing_engine_trigger( - &self, - req: Request, - ) -> Result> { + async fn enable_processing_engine_trigger(&self, req: Request) -> Result> { let query = req.uri().query().unwrap_or(""); let delete_req = serde_urlencoded::from_str::(query)?; self.processing_engine - .activate_trigger( + .enable_trigger( Arc::clone(&self.write_buffer), Arc::clone(&self.query_executor), delete_req.db.as_str(), @@ -1753,11 +1750,11 @@ pub(crate) async fn route_request( (Method::DELETE, "/api/v3/configure/processing_engine_plugin") => { http_server.delete_processing_engine_plugin(req).await } - (Method::POST, "/api/v3/configure/processing_engine_trigger/deactivate") => { - http_server.deactivate_processing_engine_trigger(req).await + (Method::POST, "/api/v3/configure/processing_engine_trigger/disable") => { + http_server.disable_processing_engine_trigger(req).await } - (Method::POST, "/api/v3/configure/processing_engine_trigger/activate") => { - http_server.activate_processing_engine_trigger(req).await + (Method::POST, "/api/v3/configure/processing_engine_trigger/enable") => { + http_server.enable_processing_engine_trigger(req).await } (Method::POST, "/api/v3/configure/processing_engine_trigger") => { http_server.configure_processing_engine_trigger(req).await