Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: return better plugin execution errors #25842

Merged
merged 2 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

117 changes: 112 additions & 5 deletions influxdb3/tests/server/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use pretty_assertions::assert_eq;
use serde_json::{json, Value};
use test_helpers::assert_contains;
use test_helpers::tempfile::NamedTempFile;
#[cfg(feature = "system-py")]
use test_helpers::tempfile::TempDir;

pub fn run(args: &[&str]) -> String {
let process = Command::cargo_bin("influxdb3")
Expand Down Expand Up @@ -945,7 +947,7 @@ def process_writes(influxdb3_local, table_batches, args=None):
influxdb3_local.info("arg1: " + args["arg1"])

query_params = {"host": args["host"]}
query_result = influxdb3_local.query("SELECT * FROM cpu where host = $host", query_params)
query_result = influxdb3_local.query("SELECT host, region, usage FROM cpu where host = $host", query_params)
influxdb3_local.info("query result: " + str(query_result))

for table_batch in table_batches:
Expand Down Expand Up @@ -984,9 +986,9 @@ def process_writes(influxdb3_local, table_batches, args=None):
server
.write_lp_to_db(
"foo",
"cpu,host=s1,region=us-east usage=0.9 1\n\
cpu,host=s2,region=us-east usage=0.89 2\n\
cpu,host=s1,region=us-east usage=0.85 3",
"cpu,host=s1,region=us-east usage=0.9\n\
cpu,host=s2,region=us-east usage=0.89\n\
cpu,host=s1,region=us-east usage=0.85",
Precision::Nanosecond,
)
.await
Expand Down Expand Up @@ -1015,7 +1017,7 @@ def process_writes(influxdb3_local, table_batches, args=None):
let expected_result = r#"{
"log_lines": [
"INFO: arg1: arg1_value",
"INFO: query result: [{'host': 's2', 'region': 'us-east', 'time': 2, 'usage': 0.89}]",
"INFO: query result: [{'host': 's2', 'region': 'us-east', 'usage': 0.89}]",
"INFO: table: test_input",
"INFO: row: {'tag1': 'tag1_value', 'tag2': 'tag2_value', 'field1': 1, 'time': 500}",
"INFO: done"
Expand All @@ -1033,3 +1035,108 @@ def process_writes(influxdb3_local, table_batches, args=None):
let expected_result = serde_json::from_str::<serde_json::Value>(expected_result).unwrap();
assert_eq!(res, expected_result);
}

// Integration test: runs `influxdb3 test wal_plugin` against several deliberately
// broken plugins and asserts that the CLI surfaces the improved, human-readable
// plugin-execution errors (the `error executing plugin: …` messages) verbatim.
#[cfg(feature = "system-py")]
#[test_log::test(tokio::test)]
async fn test_wal_plugin_errors() {
    use crate::ConfigProvider;
    use influxdb3_client::Precision;

    // One table-driven case: a plugin source and the exact error text the CLI
    // must report for it.
    struct Test {
        // Case label, used only in the assertion failure message.
        name: &'static str,
        // Python source written to a temp file inside the plugin directory.
        // NOTE(review): leading indentation inside these raw strings appears to
        // have been lost in this view — the invalid_python case expects an
        // IndentationError on line 2, which requires an indented line there.
        plugin_code: &'static str,
        // Exact first entry expected in the response's "errors" array.
        expected_error: &'static str,
    }

    let tests = vec![
        // Source that is not valid Python at all: the Python parse error is
        // passed through to the caller.
        Test {
            name: "invalid_python",
            plugin_code: r#"
lkjasdf9823
jjjjj / sss"#,
            expected_error: "error executing plugin: IndentationError: unexpected indent (<string>, line 2)",
        },
        // Valid Python, but the required `process_writes` entry point is absent.
        Test {
            name: "no_process_writes",
            plugin_code: r#"
def not_process_writes(influxdb3_local, table_batches, args=None):
influxdb3_local.info("done")"#,
            expected_error: "error executing plugin: the process_writes function is not present in the plugin. Should be defined as: process_writes(influxdb3_local, table_batches, args=None)",
        },
        // `process_writes` exists but lacks the trailing `args` parameter, so
        // the 3-argument call site fails with a TypeError.
        Test {
            name: "no_args",
            plugin_code: r#"
def process_writes(influxdb3_local, table_batches):
influxdb3_local.info("done")
"#,
            expected_error: "error executing plugin: TypeError: process_writes() takes 2 positional arguments but 3 were given",
        },
        // Missing the `table_batches` parameter.
        Test {
            name: "no_table_batches",
            plugin_code: r#"
def process_writes(influxdb3_local, args=None):
influxdb3_local.info("done")
"#,
            expected_error: "error executing plugin: TypeError: process_writes() takes from 1 to 2 positional arguments but 3 were given",
        },
        // Missing the `influxdb3_local` parameter; arity error is identical to
        // the no_table_batches case (1 required + 1 default vs 3 supplied).
        Test {
            name: "no_influxdb3_local",
            plugin_code: r#"
def process_writes(table_batches, args=None):
influxdb3_local.info("done")
"#,
            expected_error: "error executing plugin: TypeError: process_writes() takes from 1 to 2 positional arguments but 3 were given",
        }];

    // Server loads plugins from this directory; the TempDir guard keeps it
    // alive (and cleans it up) for the whole test.
    let plugin_dir = TempDir::new().unwrap();

    let server = TestServer::configure()
        .with_plugin_dir(plugin_dir.path().to_str().unwrap())
        .spawn()
        .await;
    let server_addr = server.client_addr();

    // Seed the "foo" database so it exists before plugins are tested against it.
    server
        .write_lp_to_db(
            "foo",
            "cpu,host=s1,region=us-east usage=0.9\n\
cpu,host=s2,region=us-east usage=0.89\n\
cpu,host=s1,region=us-east usage=0.85",
            Precision::Nanosecond,
        )
        .await
        .unwrap();

    let db_name = "foo";

    for test in tests {
        // Drop the plugin source into the plugin dir; the temp file's generated
        // file name doubles as the plugin name passed to the CLI.
        let mut plugin_file = NamedTempFile::new_in(plugin_dir.path()).unwrap();
        writeln!(plugin_file, "{}", test.plugin_code).unwrap();
        let plugin_name = plugin_file.path().file_name().unwrap().to_str().unwrap();

        let result = run_with_confirmation(&[
            "test",
            "wal_plugin",
            "--database",
            db_name,
            "--host",
            &server_addr,
            "--lp",
            "test_input,tag1=tag1_value,tag2=tag2_value field1=1i 500",
            "--input-arguments",
            "arg1=arg1_value,host=s2",
            plugin_name,
        ]);
        debug!(result = ?result, "test wal plugin");

        println!("{}", result);
        // The CLI emits a JSON document; the plugin failure must be the first
        // entry of its "errors" array and match the expected text exactly.
        let res = serde_json::from_str::<serde_json::Value>(&result).unwrap();
        let errors = res.get("errors").unwrap().as_array().unwrap();
        let error = errors[0].as_str().unwrap();
        assert_eq!(
            error, test.expected_error,
            "test: {}, response was: {}",
            test.name, result
        );
    }
}
3 changes: 3 additions & 0 deletions influxdb3_processing_engine/src/plugins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ pub enum Error {

#[error("reading plugin file: {0}")]
ReadPluginError(#[from] std::io::Error),

#[error("error executing plugin: {0}")]
PluginExecutionError(#[from] influxdb3_py_api::ExecutePluginError),
}

#[cfg(feature = "system-py")]
Expand Down
2 changes: 2 additions & 0 deletions influxdb3_py_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ license.workspace = true
system-py = ["pyo3"]

[dependencies]
anyhow.workspace = true
arrow-array.workspace = true
arrow-schema.workspace = true
hashbrown.workspace = true
Expand All @@ -20,6 +21,7 @@ iox_query_params.workspace = true
observability_deps.workspace = true
parking_lot.workspace = true
futures.workspace = true
thiserror.workspace = true
tokio.workspace = true

[dependencies.pyo3]
Expand Down
9 changes: 9 additions & 0 deletions influxdb3_py_api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,11 @@
/// Errors produced while executing a user-provided processing-engine plugin.
///
/// The `#[error(...)]` display strings are part of the user-facing contract:
/// callers surface them verbatim (prefixed with "error executing plugin: "
/// by the wrapping error), so changing them changes observable behavior.
#[derive(Debug, thiserror::Error)]
pub enum ExecutePluginError {
    /// The plugin source loaded, but does not define the required
    /// `process_writes` entry point with the documented signature.
    #[error("the process_writes function is not present in the plugin. Should be defined as: process_writes(influxdb3_local, table_batches, args=None)")]
    MissingProcessWritesFunction,

    /// Any other failure during plugin execution; displays the wrapped
    /// error's message directly (converted from `anyhow::Error` via `#[from]`).
    #[error("{0}")]
    PluginError(#[from] anyhow::Error),
}

#[cfg(feature = "system-py")]
pub mod system_py;
76 changes: 52 additions & 24 deletions influxdb3_py_api/src/system_py.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::ExecutePluginError;
use anyhow::Context;
use arrow_array::types::Int32Type;
use arrow_array::{
BooleanArray, DictionaryArray, Float64Array, Int64Array, RecordBatch, StringArray,
Expand Down Expand Up @@ -360,7 +362,7 @@ pub fn execute_python_with_batch(
query_executor: Arc<dyn QueryExecutor>,
table_filter: Option<TableId>,
args: &Option<HashMap<String, String>>,
) -> PyResult<PluginReturnState> {
) -> Result<PluginReturnState, ExecutePluginError> {
Python::with_gil(|py| {
// import the LineBuilder for use in the python code
let globals = PyDict::new(py);
Expand All @@ -369,7 +371,8 @@ pub fn execute_python_with_batch(
&CString::new(LINE_BUILDER_CODE).unwrap(),
Some(&globals),
None,
)?;
)
.map_err(|e| anyhow::Error::new(e).context("failed to eval the LineBuilder API code"))?;

// convert the write batch into a python object
let mut table_batches = Vec::with_capacity(write_batch.table_chunks.len());
Expand All @@ -380,43 +383,61 @@ pub fn execute_python_with_batch(
continue;
}
}
let table_def = schema.tables.get(table_id).unwrap();
let table_def = schema.tables.get(table_id).context("table not found")?;

let dict = PyDict::new(py);
dict.set_item("table_name", table_def.table_name.as_ref())
.unwrap();
.context("failed to set table_name")?;

let mut rows: Vec<PyObject> = Vec::new();
for chunk in table_chunks.chunk_time_to_chunk.values() {
for row in &chunk.rows {
let py_row = PyDict::new(py);

for field in &row.fields {
let field_name = table_def.column_id_to_name(&field.id).unwrap();
let field_name = table_def
.column_id_to_name(&field.id)
.context("field not found")?;
match &field.value {
FieldData::String(s) => {
py_row.set_item(field_name.as_ref(), s.as_str()).unwrap();
py_row
.set_item(field_name.as_ref(), s.as_str())
.context("failed to set string field")?;
}
FieldData::Integer(i) => {
py_row.set_item(field_name.as_ref(), i).unwrap();
py_row
.set_item(field_name.as_ref(), i)
.context("failed to set integer field")?;
}
FieldData::UInteger(u) => {
py_row.set_item(field_name.as_ref(), u).unwrap();
py_row
.set_item(field_name.as_ref(), u)
.context("failed to set unsigned integer field")?;
}
FieldData::Float(f) => {
py_row.set_item(field_name.as_ref(), f).unwrap();
py_row
.set_item(field_name.as_ref(), f)
.context("failed to set float field")?;
}
FieldData::Boolean(b) => {
py_row.set_item(field_name.as_ref(), b).unwrap();
py_row
.set_item(field_name.as_ref(), b)
.context("failed to set boolean field")?;
}
FieldData::Tag(t) => {
py_row.set_item(field_name.as_ref(), t.as_str()).unwrap();
py_row
.set_item(field_name.as_ref(), t.as_str())
.context("failed to set tag field")?;
}
FieldData::Key(k) => {
py_row.set_item(field_name.as_ref(), k.as_str()).unwrap();
py_row
.set_item(field_name.as_ref(), k.as_str())
.context("failed to set key field")?;
}
FieldData::Timestamp(t) => {
py_row.set_item(field_name.as_ref(), t).unwrap();
py_row
.set_item(field_name.as_ref(), t)
.context("failed to set timestamp")?;
}
};
}
Expand All @@ -425,21 +446,23 @@ pub fn execute_python_with_batch(
}
}

let rows = PyList::new(py, rows).unwrap();
let rows = PyList::new(py, rows).context("failed to create rows list")?;

dict.set_item("rows", rows.unbind()).unwrap();
dict.set_item("rows", rows.unbind())
.context("failed to set rows")?;
table_batches.push(dict);
}

let py_batches = PyList::new(py, table_batches).unwrap();
let py_batches =
PyList::new(py, table_batches).context("failed to create table_batches list")?;

let api = PyPluginCallApi {
db_schema: schema,
query_executor,
return_state: Default::default(),
};
let return_state = Arc::clone(&api.return_state);
let local_api = api.into_pyobject(py)?;
let local_api = api.into_pyobject(py).map_err(anyhow::Error::from)?;

// turn args into an optional dict to pass into python
let args = args.as_ref().map(|args| {
Expand All @@ -451,14 +474,19 @@ pub fn execute_python_with_batch(
});

// run the code and get the python function to call
py.run(&CString::new(code).unwrap(), Some(&globals), None)?;
let py_func = py.eval(
&CString::new(PROCESS_WRITES_CALL_SITE).unwrap(),
Some(&globals),
None,
)?;
py.run(&CString::new(code).unwrap(), Some(&globals), None)
.map_err(anyhow::Error::from)?;
let py_func = py
.eval(
&CString::new(PROCESS_WRITES_CALL_SITE).unwrap(),
Some(&globals),
None,
)
.map_err(|_| ExecutePluginError::MissingProcessWritesFunction)?;

py_func.call1((local_api, py_batches.unbind(), args))?;
py_func
.call1((local_api, py_batches.unbind(), args))
.map_err(anyhow::Error::from)?;

// swap with an empty return state to avoid cloning
let empty_return_state = PluginReturnState::default();
Expand Down
Loading