mod api; use crate::server::{ConfigProvider, TestServer}; use assert_cmd::Command as AssertCmd; use assert_cmd::cargo::CommandCargoExt; use observability_deps::tracing::debug; use pretty_assertions::assert_eq; use serde_json::{Value, json}; use std::fs::File; use std::path::PathBuf; use std::{ fs, io::Write, process::{Command, Stdio}, }; use test_helpers::tempfile::NamedTempFile; use test_helpers::tempfile::TempDir; use test_helpers::{assert_contains, assert_not_contains}; pub const WRITE_REPORTS_PLUGIN_CODE: &str = r#" def process_writes(influxdb3_local, table_batches, args=None): for table_batch in table_batches: # Skip if table_name is write_reports if table_batch["table_name"] == "write_reports": continue row_count = len(table_batch["rows"]) # Double row count if table name matches args table_name if args and "double_count_table" in args and table_batch["table_name"] == args["double_count_table"]: row_count *= 2 line = LineBuilder("write_reports")\ .tag("table_name", table_batch["table_name"])\ .int64_field("row_count", row_count) influxdb3_local.write(line) "#; fn create_plugin_in_temp_dir(code: &str) -> (TempDir, PathBuf) { // Create a temporary directory that will exist until it's dropped let temp_dir = tempfile::tempdir().unwrap(); let file_path = write_plugin_to_temp_dir(&temp_dir, "plugin.py", code); // Return both the directory (which must be kept alive) and the file path (temp_dir, file_path) } fn write_plugin_to_temp_dir(temp_dir: &TempDir, name: &str, code: &str) -> PathBuf { // Create a path for the plugin file within this directory let file_path = temp_dir.path().join(name); // Write the code to the file let mut file = File::create(&file_path).unwrap(); writeln!(file, "{}", code).unwrap(); file_path } #[test_log::test(tokio::test)] async fn test_telemetry_disabled_with_debug_msg() { let serve_args = &[ "serve", "--node-id", "the-best-node", "--object-store", "memory", ]; let expected_disabled: &str = "Initializing TelemetryStore with upload disabled."; // validate we get a debug message indicating upload disabled let output = AssertCmd::cargo_bin("influxdb3") .unwrap() .args(serve_args) .arg("-vv") .arg("--disable-telemetry-upload") .timeout(std::time::Duration::from_millis(500)) .assert() .failure() .get_output() .stdout .clone(); let output = String::from_utf8(output).expect("must be able to convert output to String"); assert_contains!(output, expected_disabled); } #[test_log::test(tokio::test)] async fn test_telemetry_disabled() { let serve_args = &[ "serve", "--node-id", "the-best-node", "--object-store", "memory", ]; let expected_disabled: &str = "Initializing TelemetryStore with upload disabled."; // validate no message when debug output disabled let output = AssertCmd::cargo_bin("influxdb3") .unwrap() .args(serve_args) .arg("-v") .arg("--disable-telemetry-upload") .timeout(std::time::Duration::from_millis(500)) .assert() .failure() .get_output() .stdout .clone(); let output = String::from_utf8(output).expect("must be able to convert output to String"); assert_not_contains!(output, expected_disabled); } #[test_log::test(tokio::test)] async fn test_telemetry_enabled_with_debug_msg() { let serve_args = &[ "serve", "--node-id", "the-best-node", "--object-store", "memory", ]; let expected_enabled: &str = "Initializing TelemetryStore with upload enabled for http://localhost:9999."; // validate debug output shows which endpoint we are hitting when telemetry enabled let output = AssertCmd::cargo_bin("influxdb3") .unwrap() .args(serve_args) .arg("-vv") .arg("--telemetry-endpoint") .arg("http://localhost:9999") .timeout(std::time::Duration::from_millis(500)) .assert() .failure() .get_output() .stdout .clone(); let output = String::from_utf8(output).expect("must be able to convert output to String"); assert_contains!(output, expected_enabled); } #[test_log::test(tokio::test)] async fn test_telementry_enabled() { let serve_args = &[ "serve", "--node-id", "the-best-node", "--object-store", "memory", ]; let expected_enabled: &str = "Initializing TelemetryStore with upload enabled for http://localhost:9999."; // validate no telemetry endpoint reported when debug output not enabled let output = AssertCmd::cargo_bin("influxdb3") .unwrap() .args(serve_args) .arg("-v") .arg("--telemetry-endpoint") .arg("http://localhost:9999") .timeout(std::time::Duration::from_millis(500)) .assert() .failure() .get_output() .stdout .clone(); let output = String::from_utf8(output).expect("must be able to convert output to String"); assert_not_contains!(output, expected_enabled); } #[test_log::test(tokio::test)] async fn test_show_databases() { let server = TestServer::spawn().await; // Create two databases server.create_database("foo").run().unwrap(); server.create_database("bar").run().unwrap(); // Show databases with default format (pretty) let output = server.show_databases().run().unwrap(); assert_eq!( "\ +---------------+\n\ | iox::database |\n\ +---------------+\n\ | bar |\n\ | foo |\n\ +---------------+\ ", output ); // Show databases with JSON format let output = server.show_databases().with_format("json").run().unwrap(); assert_eq!( r#"[{"iox::database":"bar"},{"iox::database":"foo"}]"#, output ); // Show databases with CSV format let output = server.show_databases().with_format("csv").run().unwrap(); assert_eq!( "\ iox::database\n\ bar\n\ foo\ ", output ); // Show databases with JSONL format let output = server.show_databases().with_format("jsonl").run().unwrap(); assert_eq!( "\ {\"iox::database\":\"bar\"}\n\ {\"iox::database\":\"foo\"}\ ", output ); // Delete a database server.delete_database("foo").run().unwrap(); // Show databases after deletion let output = server.show_databases().run().unwrap(); assert_eq!( "\ +---------------+\n\ | iox::database |\n\ +---------------+\n\ | bar |\n\ +---------------+", output ); // Show databases including deleted ones let output = server.show_databases().show_deleted(true).run().unwrap(); // don't assert on actual output since it contains a time stamp which would be flaky assert_contains!(output, "foo-"); } #[test_log::test(tokio::test)] async fn test_show_empty_database() { let server = TestServer::spawn().await; // Show empty database list with default format (pretty) let output = server.show_databases().run().unwrap(); assert_eq!( "\ +---------------+\n\ | iox::database |\n\ +---------------+\n\ +---------------+", output ); // Show empty database list with JSON format let output = server.show_databases().with_format("json").run().unwrap(); assert_eq!(output, "[]"); // Show empty database list with JSONL format let output = server.show_databases().with_format("jsonl").run().unwrap(); assert_eq!(output, ""); } #[test_log::test(tokio::test)] async fn test_create_database() { let server = TestServer::spawn().await; let db_name = "foo"; let result = server.create_database(db_name).run().unwrap(); debug!(result = ?result, "create database"); assert_contains!(&result, "Database \"foo\" created successfully"); } #[test_log::test(tokio::test)] async fn test_create_database_limit() { let server = TestServer::spawn().await; let db_name = "foo"; // Create 5 databases successfully for i in 0..5 { let name = format!("{db_name}{i}"); let result = server.create_database(&name).run().unwrap(); debug!(result = ?result, "create database"); assert_contains!(&result, format!("Database \"{name}\" created successfully")); } // Try to create a 6th database, which should fail let result = server.create_database("foo5").run(); assert!(result.is_err()); let err = result.unwrap_err().to_string(); debug!(error = ?err, "create database error"); assert_contains!( &err, "Adding a new database would exceed limit of 5 databases" ); } #[test_log::test(tokio::test)] async fn test_delete_database() { let server = TestServer::spawn().await; let db_name = "foo"; // Write data to the database first server .write_lp_to_db( db_name, "cpu,t1=a,t2=b,t3=c f1=true,f2=\"hello\",f3=4i,f4=4u,f5=5 1000", influxdb3_client::Precision::Second, ) .await .expect("write to db"); // Delete the database using our new API let result = server .delete_database(db_name) .run() .expect("delete database"); debug!(result = ?result, "delete database"); // We need to modify the DeleteDatabaseQuery to return the output string assert_contains!(&result, "Database \"foo\" deleted successfully"); } #[test_log::test(tokio::test)] async fn test_delete_missing_database() { let server = TestServer::spawn().await; let db_name = "foo"; // Try to delete a non-existent database let result = server.delete_database(db_name).run(); // Check that we got the expected error assert!(result.is_err()); let err = result.unwrap_err().to_string(); debug!(err = ?err, "delete missing database"); assert_contains!(&err, "404"); } #[test_log::test(tokio::test)] async fn test_create_table() { let server = TestServer::spawn().await; let db_name = "foo"; let table_name = "bar"; // Create database using the new query API let result = server.create_database(db_name).run().unwrap(); debug!(result = ?result, "create database"); assert_contains!(&result, "Database \"foo\" created successfully"); // Create table using the new query API let result = server .create_table(db_name, table_name) .with_tags(["one", "two", "three"]) .with_fields([ ("four", "utf8".to_string()), ("five", "uint64".to_string()), ("six", "float64".to_string()), ("seven", "int64".to_string()), ("eight", "bool".to_string()), ]) .run() .unwrap(); debug!(result = ?result, "create table"); assert_contains!(&result, "Table \"foo\".\"bar\" created successfully"); // Check that we can query the table and that it has no values let result = server .query_sql(db_name) .with_sql("SELECT * FROM bar") .run() .unwrap(); assert_eq!(result, json!([])); // Write data to the table server .write_lp_to_db( db_name, format!("{table_name},one=1,two=2,three=3 four=\"4\",five=5u,six=6,seven=7i,eight=true 2998574937"), influxdb3_client::Precision::Second, ) .await .expect("write to db"); // Check that we can get data from the table let result = server .query_sql(db_name) .with_sql("SELECT * FROM bar") .run() .unwrap(); debug!(result = ?result, "data written"); assert_eq!( result, json!([{ "one": "1", "two": "2", "three": "3", "four": "4", "five": 5, "six": 6.0, "seven": 7, "eight": true, "time": "2065-01-07T17:28:57" }]) ); } #[test_log::test(tokio::test)] async fn test_create_table_fail_existing() { let server = TestServer::spawn().await; let db_name = "foo"; let table_name = "bar"; // Create database let result = server.create_database(db_name).run().unwrap(); debug!(result = ?result, "create database"); assert_contains!(&result, "Database \"foo\" created successfully"); // Create table successfully let result = server .create_table(db_name, table_name) .add_tag("one") .add_field("four", "utf8") .run() .unwrap(); debug!(result = ?result, "create table"); assert_contains!(&result, "Table \"foo\".\"bar\" created successfully"); // Try creating table again, expect failure let err = server .create_table(db_name, table_name) .add_tag("one") .add_field("four", "utf8") .run() .unwrap_err(); insta::assert_snapshot!("test_create_table_fail_existing", err.to_string()); } #[test_log::test(tokio::test)] async fn test_delete_table() { let server = TestServer::spawn().await; let db_name = "foo"; let table_name = "cpu"; server .write_lp_to_db( db_name, format!("{table_name},t1=a,t2=b,t3=c f1=true,f2=\"hello\",f3=4i,f4=4u,f5=5 1000"), influxdb3_client::Precision::Second, ) .await .expect("write to db"); let result = server.delete_table(db_name, table_name).run().unwrap(); debug!(result = ?result, "delete table"); assert_contains!(&result, "Table \"foo\".\"cpu\" deleted successfully"); } #[test_log::test(tokio::test)] async fn test_delete_missing_table() { let server = TestServer::spawn().await; let db_name = "foo"; let table_name = "mem"; // Write data to the "mem" table server .write_lp_to_db( db_name, format!("{table_name},t1=a,t2=b,t3=c f1=true,f2=\"hello\",f3=4i,f4=4u,f5=5 1000"), influxdb3_client::Precision::Second, ) .await .expect("write to db"); // Try to delete a non-existent "cpu" table let err = server.delete_table(db_name, "cpu").run().unwrap_err(); debug!(result = ?err, "delete missing table"); assert_contains!(err.to_string(), "404"); } #[tokio::test] async fn test_create_delete_distinct_cache() { let server = TestServer::spawn().await; let db_name = "foo"; let table_name = "bar"; server .write_lp_to_db( db_name, format!("{table_name},t1=a,t2=aa,t3=aaa f1=true,f2=\"hello\",f3=42"), influxdb3_client::Precision::Second, ) .await .unwrap(); let cache_name = "baz"; // First create the cache let result = server .create_distinct_cache(db_name, table_name, cache_name) .with_columns(["t1", "t2"]) .run() .unwrap(); assert_contains!(&result, "new cache created"); // Doing the same thing over again will be a no-op and return an error let err = server .create_distinct_cache(db_name, table_name, cache_name) .with_columns(["t1", "t2"]) .run() .unwrap_err(); assert_contains!(&err.to_string(), "[409 Conflict]"); // Now delete the cache let result = server .delete_distinct_cache(db_name, table_name, cache_name) .run() .unwrap(); assert_contains!(&result, "distinct cache deleted successfully"); // Trying to delete again should result in an error as the cache no longer exists let err = server .delete_distinct_cache(db_name, table_name, cache_name) .run() .unwrap_err(); assert_contains!(&err.to_string(), "[404 Not Found]"); } #[test_log::test(tokio::test)] async fn test_create_trigger_and_run() { // create a plugin and trigger and write data in, verifying that the trigger is enabled // and sent data let (temp_dir, plugin_path) = create_plugin_in_temp_dir(WRITE_REPORTS_PLUGIN_CODE); let plugin_dir = temp_dir.path().to_str().unwrap(); let plugin_filename = plugin_path.file_name().unwrap().to_str().unwrap(); let server = TestServer::configure() .with_plugin_dir(plugin_dir) .spawn() .await; let db_name = "foo"; let trigger_name = "test_trigger"; // Setup: create database server.create_database(db_name).run().unwrap(); // Creating the trigger should enable it let result = server .create_trigger(db_name, trigger_name, plugin_filename, "all_tables") .add_trigger_argument("double_count_table=cpu") .run() .unwrap(); debug!(result = ?result, "create trigger"); assert_contains!(&result, "Trigger test_trigger created successfully"); // Now let's write data and see if it gets processed server .write_lp_to_db( db_name, "cpu,host=a f1=1.0\ncpu,host=b f1=2.0\nmem,host=a usage=234", influxdb3_client::Precision::Second, ) .await .expect("write to db"); let expected = json!( [ {"table_name": "cpu", "row_count": 4}, {"table_name": "mem", "row_count": 1} ] ); // Query to see if the processed data is there. We loop because it could take a bit to write // back the data. There's also a condition where the table may have been created, but the // write hasn't happened yet, which returns empty results. This ensures we don't hit that race. let mut check_count = 0; loop { match server .query_sql(db_name) .with_sql("SELECT table_name, row_count FROM write_reports") .run() { Ok(value) => { if value == expected { return; } check_count += 1; if check_count > 10 { panic!( "Unexpected query result, got: {:#?}, expected {:#?}", value, expected ); } } Err(e) => { check_count += 1; if check_count > 10 { panic!("Failed to query processed data: {}", e); } tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; } }; } } #[test_log::test(tokio::test)] async fn test_triggers_are_started() { // Create a plugin and trigger and write data in, verifying that the trigger is enabled // and sent data let (temp_dir, plugin_path) = create_plugin_in_temp_dir(WRITE_REPORTS_PLUGIN_CODE); let plugin_dir = temp_dir.path().to_str().unwrap(); let plugin_filename = plugin_path.file_name().unwrap().to_str().unwrap(); // Create tmp dir for object store let tmp_file = TempDir::new().unwrap(); let tmp_dir = tmp_file.path().to_str().unwrap(); let mut server = TestServer::configure() .with_plugin_dir(plugin_dir) .with_object_store_dir(tmp_dir) .spawn() .await; let db_name = "foo"; let trigger_name = "test_trigger"; // Setup: create database server.create_database(db_name).run().unwrap(); // Creating the trigger should enable it let result = server .create_trigger(db_name, trigger_name, plugin_filename, "all_tables") .add_trigger_argument("double_count_table=cpu") .run() .unwrap(); debug!(result = ?result, "create trigger"); assert_contains!(&result, "Trigger test_trigger created successfully"); // Restart the server server.kill(); server = TestServer::configure() .with_plugin_dir(plugin_dir) .with_object_store_dir(tmp_dir) .spawn() .await; // Now let's write data and see if it gets processed server .write_lp_to_db( db_name, "cpu,host=a f1=1.0\ncpu,host=b f1=2.0\nmem,host=a usage=234", influxdb3_client::Precision::Second, ) .await .expect("write to db"); let expected = json!( [ {"table_name": "cpu", "row_count": 4}, {"table_name": "mem", "row_count": 1} ] ); // Query to see if the processed data is there. We loop because it could take a bit to write // back the data. There's also a condition where the table may have been created, but the // write hasn't happened yet, which returns empty results. This ensures we don't hit that race. let mut check_count = 0; loop { match server .query_sql(db_name) .with_sql("SELECT table_name, row_count FROM write_reports") .run() { Ok(value) => { if value == expected { return; } check_count += 1; if check_count > 10 { panic!( "Unexpected query result, got: {:#?}, expected {:#?}", value, expected ); } } Err(e) => { check_count += 1; if check_count > 10 { panic!("Failed to query processed data: {}", e); } tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; } }; } } #[test_log::test(tokio::test)] async fn test_database_create_persists() { // create tmp dir for object store let tmp_file = TempDir::new().unwrap(); let tmp_dir = tmp_file.path().to_str().unwrap(); let mut server = TestServer::configure() .with_object_store_dir(tmp_dir) .spawn() .await; let db_name = "foo"; let result = server.create_database(db_name).run().unwrap(); debug!(result = ?result, "create database"); assert_contains!(&result, "Database \"foo\" created successfully"); // restart the server server.kill(); server = TestServer::configure() .with_object_store_dir(tmp_dir) .spawn() .await; let result = server.show_databases().run().unwrap(); assert_eq!( r#"+---------------+ | iox::database | +---------------+ | foo | +---------------+"#, result ); } #[test_log::test(tokio::test)] async fn test_trigger_enable() { let (temp_dir, plugin_path) = create_plugin_in_temp_dir( r#" def process_writes(influxdb3_local, table_batches, args=None): influxdb3_local.info("done") "#, ); let plugin_dir = temp_dir.path().to_str().unwrap(); let plugin_filename = plugin_path.file_name().unwrap().to_str().unwrap(); let server = TestServer::configure() .with_plugin_dir(plugin_dir) .spawn() .await; let db_name = "foo"; let trigger_name = "test_trigger"; // Setup: create database and trigger server.create_database(db_name).run().unwrap(); server .create_trigger(db_name, trigger_name, plugin_filename, "all_tables") .run() .unwrap(); // Test enabling let result = server.enable_trigger(db_name, trigger_name).run().unwrap(); debug!(result = ?result, "enable trigger"); assert_contains!(&result, "Trigger test_trigger enabled successfully"); // Test disable let result = server.disable_trigger(db_name, trigger_name).run().unwrap(); debug!(result = ?result, "disable trigger"); assert_contains!(&result, "Trigger test_trigger disabled successfully"); } #[test_log::test(tokio::test)] async fn test_delete_enabled_trigger() { let (temp_dir, plugin_path) = create_plugin_in_temp_dir( r#" def process_writes(influxdb3_local, table_batches, args=None): influxdb3_local.info("done") "#, ); let plugin_dir = temp_dir.path().to_str().unwrap(); let plugin_filename = plugin_path.file_name().unwrap().to_str().unwrap(); let server = TestServer::configure() .with_plugin_dir(plugin_dir) .spawn() .await; let db_name = "foo"; let trigger_name = "test_trigger"; // Setup: create database, plugin, and enable trigger server.create_database(db_name).run().unwrap(); server .create_trigger(db_name, trigger_name, plugin_filename, "all_tables") .run() .unwrap(); // Try to delete the enabled trigger without force flag let err = server .delete_trigger(db_name, trigger_name) .run() .unwrap_err(); debug!(result = ?err, "delete enabled trigger without force"); assert_contains!(&err.to_string(), "command failed"); // Delete active trigger with force flag let result = server .delete_trigger(db_name, trigger_name) .force(true) .run() .unwrap(); debug!(result = ?result, "delete enabled trigger with force"); assert_contains!(&result, "Trigger test_trigger deleted successfully"); } #[test_log::test(tokio::test)] async fn test_table_specific_trigger() { let (temp_dir, plugin_path) = create_plugin_in_temp_dir( r#" def process_writes(influxdb3_local, table_batches, args=None): influxdb3_local.info("done") "#, ); let plugin_dir = temp_dir.path().to_str().unwrap(); let plugin_filename = plugin_path.file_name().unwrap().to_str().unwrap(); let server = TestServer::configure() .with_plugin_dir(plugin_dir) .spawn() .await; let db_name = "foo"; let table_name = "bar"; let trigger_name = "test_trigger"; // Setup: create database, table, and plugin server.create_database(db_name).run().unwrap(); server .create_table(db_name, table_name) .add_tag("tag1") .add_field("field1", "float64") .run() .unwrap(); // Create table-specific trigger let result = server .create_trigger( db_name, trigger_name, plugin_filename, format!("table:{}", table_name), ) .run() .unwrap(); debug!(result = ?result, "create table-specific trigger"); assert_contains!(&result, "Trigger test_trigger created successfully"); } #[test] fn test_create_token() { let process = Command::cargo_bin("influxdb3") .unwrap() .args(["create", "token"]) .stdout(Stdio::piped()) .output() .unwrap(); let result: String = String::from_utf8_lossy(&process.stdout).trim().into(); assert_contains!( &result, "This will grant you access to every HTTP endpoint or deny it otherwise" ); } #[test_log::test(tokio::test)] async fn test_show_system() { let server = TestServer::configure().spawn().await; let db_name = "foo"; server .write_lp_to_db( db_name, "cpu,t1=a,t2=b,t3=c f1=true,f2=\"hello\",f3=4i,f4=4u,f5=5 1000", influxdb3_client::Precision::Second, ) .await .expect("write to db"); // Test successful cases // 1. Summary command let summary_output = server.show_system(db_name).summary().run().unwrap(); insta::assert_snapshot!( "summary_should_show_up_to_ten_entries_from_each_table", summary_output ); // 2. Table command for "queries" let table_output = server.show_system(db_name).table("queries").run().unwrap(); insta::assert_snapshot!( "table_NAME_should_show_queries_table_without_information_or_system_schema_queries", table_output ); // 3. Table-list command let table_list_output = server.show_system(db_name).table_list().run().unwrap(); insta::assert_snapshot!( "table-list_should_list_system_schema_tables_only", table_list_output ); // Test failure cases // 1. Missing database (this can't be tested with the fluent API since we always require a db name) let output = server .run(vec!["show", "system"], &["table-list"]) .unwrap_err() .to_string(); insta::assert_snapshot!("fail_without_database_name", output); // 2. Non-existent table name let result = server.show_system(db_name).table("meow").run(); assert!(result.is_err()); insta::assert_snapshot!( "random_table_name_doesn't_exist,_should_error", format!("{}", result.unwrap_err()) ); // 3. IOx schema table (not a system table) let result = server.show_system(db_name).table("cpu").run(); assert!(result.is_err()); insta::assert_snapshot!( "iox_schema_table_name_exists,_but_should_error_because_we're_concerned_here_with_system_tables", format!("{}", result.unwrap_err()) ); } #[tokio::test] async fn distinct_cache_create_and_delete() { let server = TestServer::spawn().await; let db_name = "foo"; server .write_lp_to_db( db_name, "cpu,t1=a,t2=b,t3=c f1=true,f2=\"hello\",f3=4i,f4=4u,f5=5 1000", influxdb3_client::Precision::Second, ) .await .expect("write to db"); // Create distinct cache let result = server .create_distinct_cache(db_name, "cpu", "cache_money") .with_columns(["t1", "t2"]) .with_max_cardinality(20000) .with_max_age("200s") .run() .unwrap(); assert_contains!(result, "new cache created"); // Try to create again, should not work let err = server .create_distinct_cache(db_name, "cpu", "cache_money") .with_columns(["t1", "t2"]) .with_max_cardinality(20000) .with_max_age("200s") .run() .unwrap_err(); assert_contains!( err.to_string(), "attempted to create a resource that already exists" ); // Delete the cache let result = server .delete_distinct_cache(db_name, "cpu", "cache_money") .run() .unwrap(); assert_contains!(result, "distinct cache deleted successfully"); } #[test_log::test(tokio::test)] async fn test_linebuilder_escaping() { let (temp_dir, plugin_path) = create_plugin_in_temp_dir( r#" def process_writes(influxdb3_local, table_batches, args=None): # Basic test with all field types basic_line = LineBuilder("metrics")\ .tag("host", "server01")\ .tag("region", "us-west")\ .int64_field("cpu", 42)\ .uint64_field("memory_bytes", 8589934592)\ .float64_field("load", 86.5)\ .string_field("status", "online")\ .bool_field("healthy", True)\ .time_ns(1609459200000000000) influxdb3_local.write(basic_line) # Test escaping spaces in tag values spaces_line = LineBuilder("system_metrics")\ .tag("server", "app server 1")\ .tag("datacenter", "us west")\ .int64_field("count", 1) influxdb3_local.write(spaces_line) # Test escaping commas in tag values commas_line = LineBuilder("network")\ .tag("servers", "web,app,db")\ .tag("location", "floor1,rack3")\ .int64_field("connections", 256) influxdb3_local.write(commas_line) # Test escaping equals signs in tag values equals_line = LineBuilder("formulas")\ .tag("equation", "y=mx+b")\ .tag("result", "a=b=c")\ .float64_field("value", 3.14159) influxdb3_local.write(equals_line) # Test escaping backslashes in tag values backslash_line = LineBuilder("paths")\ .tag("windows_path", "C:\\Program Files\\App")\ .tag("regex", "\\d+\\w+")\ .string_field("description", "Windows\\Unix paths")\ .int64_field("count", 42) influxdb3_local.write(backslash_line) # Test escaping quotes in string fields quotes_line = LineBuilder("messages")\ .tag("type", "notification")\ .string_field("content", "User said \"Hello World\"")\ .string_field("json", "{\"key\": \"value\"}")\ .int64_field("priority", 1) influxdb3_local.write(quotes_line) # Test a complex case with multiple escape characters complex_line = LineBuilder("complex,measurement")\ .tag("location", "New York, USA")\ .tag("details", "floor=5, room=3")\ .tag("path", "C:\\Users\\Admin\\Documents")\ .string_field("message", "Error in line: \"x = y + z\"")\ .string_field("query", "SELECT * FROM table WHERE id=\"abc\"")\ .float64_field("value", 123.456)\ .time_ns(1609459200000000000) influxdb3_local.write(complex_line) # Test writing to a specific database specific_db_line = LineBuilder("memory_stats")\ .tag("host", "server1")\ .int64_field("usage", 75) influxdb3_local.write_to_db("metrics_db", specific_db_line) # Test multiple chained methods chained_line = LineBuilder("sensor_data")\ .tag("device", "thermostat").tag("room", "living room").tag("floor", "1")\ .float64_field("temperature", 72.5).int64_field("humidity", 45).string_field("mode", "auto") influxdb3_local.write(chained_line) influxdb3_local.info("All LineBuilder tests completed")"#, ); let plugin_dir = temp_dir.path().to_str().unwrap(); let plugin_filename = plugin_path.file_name().unwrap().to_str().unwrap(); let server = TestServer::configure() .with_plugin_dir(plugin_dir) .spawn() .await; let db_name = "test_db"; // Run the test using the fluent API let result = server .test_wal_plugin(db_name, plugin_filename) .with_line_protocol("test_input,tag1=tag1_value field1=1i 500") .add_input_argument("arg1=test") .run() .expect("Failed to run wal plugin test"); let expected_result = serde_json::json!({ "log_lines": [ "INFO: All LineBuilder tests completed" ], "database_writes": { "test_db": [ "metrics,host=server01,region=us-west cpu=42i,memory_bytes=8589934592u,load=86.5,status=\"online\",healthy=t 1609459200000000000", "system_metrics,server=app\\ server\\ 1,datacenter=us\\ west count=1i", "network,servers=web\\,app\\,db,location=floor1\\,rack3 connections=256i", "formulas,equation=y\\=mx+b,result=a\\=b\\=c value=3.14159", "paths,windows_path=C:\\\\Program\\ Files\\\\App,regex=\\\\d+\\\\w+ description=\"Windows\\\\Unix paths\",count=42i", "messages,type=notification content=\"User said \\\"Hello World\\\"\",json=\"{\\\"key\\\": \\\"value\\\"}\",priority=1i", "complex\\,measurement,location=New\\ York\\,\\ USA,details=floor\\=5\\,\\ room\\=3,path=C:\\\\Users\\\\Admin\\\\Documents message=\"Error in line: \\\"x = y + z\\\"\",query=\"SELECT * FROM table WHERE id=\\\"abc\\\"\",value=123.456 1609459200000000000", "sensor_data,device=thermostat,room=living\\ room,floor=1 temperature=72.5,humidity=45i,mode=\"auto\"" ], "metrics_db": [ "memory_stats,host=server1 usage=75i" ] }, "errors": [] }); assert_eq!(result, expected_result); } #[test_log::test(tokio::test)] async fn test_wal_plugin_test() { use crate::server::ConfigProvider; use influxdb3_client::Precision; let (temp_dir, plugin_path) = create_plugin_in_temp_dir( r#" def process_writes(influxdb3_local, table_batches, args=None): influxdb3_local.info("arg1: " + args["arg1"]) query_params = {"host": args["host"]} query_result = influxdb3_local.query("SELECT host, region, usage FROM cpu where host = $host", query_params) influxdb3_local.info("query result: " + str(query_result)) influxdb3_local.info("i", query_result, args["host"]) influxdb3_local.warn("w:", query_result) influxdb3_local.error("err", query_result) for table_batch in table_batches: influxdb3_local.info("table: " + table_batch["table_name"]) for row in table_batch["rows"]: influxdb3_local.info("row: " + str(row)) line = LineBuilder("some_table")\ .tag("tag1", "tag1_value")\ .tag("tag2", "tag2_value")\ .int64_field("field1", 1)\ .float64_field("field2", 2.0)\ .string_field("field3", "number three") influxdb3_local.write(line) other_line = LineBuilder("other_table") other_line.int64_field("other_field", 1) other_line.float64_field("other_field2", 3.14) other_line.time_ns(1302) influxdb3_local.write_to_db("mytestdb", other_line) influxdb3_local.info("done")"#, ); let plugin_dir = temp_dir.path().to_str().unwrap(); let plugin_filename = plugin_path.file_name().unwrap().to_str().unwrap(); let server = TestServer::configure() .with_plugin_dir(plugin_dir) .spawn() .await; server .write_lp_to_db( "foo", "cpu,host=s1,region=us-east usage=0.9\n\ cpu,host=s2,region=us-east usage=0.89\n\ cpu,host=s1,region=us-east usage=0.85", Precision::Nanosecond, ) .await .unwrap(); let db_name = "foo"; // Run the test using the fluent API let result = server .test_wal_plugin(db_name, plugin_filename) .with_line_protocol("test_input,tag1=tag1_value,tag2=tag2_value field1=1i 500") .with_input_arguments(["arg1=arg1_value", "host=s2"]) .run() .expect("Failed to run test_wal_plugin"); debug!(result = ?result, "test wal plugin"); let expected_result = r#"{ "log_lines": [ "INFO: arg1: arg1_value", "INFO: query result: [{'host': 's2', 'region': 'us-east', 'usage': 0.89}]", "INFO: i [{'host': 's2', 'region': 'us-east', 'usage': 0.89}] s2", "WARN: w: [{'host': 's2', 'region': 'us-east', 'usage': 0.89}]", "ERROR: err [{'host': 's2', 'region': 'us-east', 'usage': 0.89}]", "INFO: table: test_input", "INFO: row: {'tag1': 'tag1_value', 'tag2': 'tag2_value', 'field1': 1, 'time': 500}", "INFO: done" ], "database_writes": { "mytestdb": [ "other_table other_field=1i,other_field2=3.14 1302" ], "foo": [ "some_table,tag1=tag1_value,tag2=tag2_value field1=1i,field2=2.0,field3=\"number three\"" ] }, "errors": [] }"#; let expected_result = serde_json::from_str::(expected_result).unwrap(); assert_eq!(result, expected_result); } #[test_log::test(tokio::test)] async fn test_schedule_plugin_test() { use crate::server::ConfigProvider; use influxdb3_client::Precision; // Create plugin file with a scheduled task let (temp_dir, plugin_path) = create_plugin_in_temp_dir( r#" def process_scheduled_call(influxdb3_local, schedule_time, args=None): influxdb3_local.info(f"args are {args}") influxdb3_local.info("Successfully called")"#, ); let plugin_dir = temp_dir.path().to_str().unwrap(); let plugin_name = plugin_path.file_name().unwrap().to_str().unwrap(); let server = TestServer::configure() .with_plugin_dir(plugin_dir) .spawn() .await; // Write some test data server .write_lp_to_db( "foo", "cpu,host=host1,region=us-east usage=0.75\n\ cpu,host=host2,region=us-west usage=0.82\n\ cpu,host=host3,region=us-east usage=0.91", Precision::Nanosecond, ) .await .unwrap(); let db_name = "foo"; // Run the schedule plugin test using the new fluent API let result = server .test_schedule_plugin(db_name, plugin_name, "*/5 * * * * *") // Run every 5 seconds .add_input_argument("region=us-east") .run() .expect("Failed to run schedule plugin test"); debug!(result = ?result, "test schedule plugin"); // The trigger_time will be dynamic, so we'll just verify it exists and is in the right format let trigger_time = result["trigger_time"].as_str().unwrap(); assert!(trigger_time.contains('T')); // Basic RFC3339 format check // Check the rest of the response structure let expected_result = serde_json::json!({ "log_lines": [ "INFO: args are {'region': 'us-east'}", "INFO: Successfully called" ], "database_writes": { }, "errors": [] }); assert_eq!(result["log_lines"], expected_result["log_lines"]); assert_eq!( result["database_writes"], expected_result["database_writes"] ); assert_eq!(result["errors"], expected_result["errors"]); } #[test_log::test(tokio::test)] async fn test_schedule_plugin_test_with_strftime() { use crate::server::ConfigProvider; use influxdb3_client::Precision; let (temp_dir, plugin_path) = create_plugin_in_temp_dir( r#" import datetime def process_scheduled_call(influxdb3_local, schedule_time, args=None): timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S') influxdb3_local.info(f"Current timestamp: {timestamp}") influxdb3_local.info(f"args are {args}") influxdb3_local.info("Successfully called")"#, ); let plugin_dir = temp_dir.path().to_str().unwrap(); let plugin_name = plugin_path.file_name().unwrap().to_str().unwrap(); let server = TestServer::configure() .with_plugin_dir(plugin_dir) .spawn() .await; // Write some test data server .write_lp_to_db( "foo", "cpu,host=host1,region=us-east usage=0.75\n\ cpu,host=host2,region=us-west usage=0.82\n\ cpu,host=host3,region=us-east usage=0.91", Precision::Nanosecond, ) .await .unwrap(); let db_name = "foo"; // Run the schedule plugin test using the fluent API let result = server .test_schedule_plugin(db_name, plugin_name, "*/5 * * * * *") // Run every 5 seconds .add_input_argument("region=us-east") .run() .unwrap(); debug!(result = ?result, "test schedule plugin"); // The trigger_time will be dynamic, so we'll just verify it exists and is in the right format let trigger_time = result["trigger_time"].as_str().unwrap(); assert!(trigger_time.contains('T')); // Basic RFC3339 format check // Check the rest of the response structure // Modified expectations to include the timestamp message let log_lines = &result["log_lines"]; assert_eq!(log_lines.as_array().unwrap().len(), 3); assert!( log_lines[0] .as_str() .unwrap() .starts_with("INFO: Current timestamp:") ); assert_eq!( log_lines[1].as_str().unwrap(), "INFO: args are {'region': 'us-east'}" ); assert_eq!(log_lines[2].as_str().unwrap(), "INFO: Successfully called"); assert_eq!(result["database_writes"], serde_json::json!({})); assert_eq!(result["errors"], serde_json::json!([])); } #[test_log::test(tokio::test)] async fn test_wal_plugin_errors() { use crate::server::ConfigProvider; use influxdb3_client::Precision; struct Test { name: &'static str, plugin_code: &'static str, expected_error: &'static str, } let tests = vec![ Test { name: "invalid_python", plugin_code: r#" lkjasdf9823 jjjjj / sss"#, expected_error: "error executing plugin: IndentationError: unexpected indent (, line 2)", }, Test { name: "no_process_writes", plugin_code: r#" def not_process_writes(influxdb3_local, table_batches, args=None): influxdb3_local.info("done")"#, expected_error: "error executing plugin: the process_writes function is not present in the plugin. Should be defined as: process_writes(influxdb3_local, table_batches, args=None)", }, Test { name: "no_args", plugin_code: r#" def process_writes(influxdb3_local, table_batches): influxdb3_local.info("done") "#, expected_error: "error executing plugin: TypeError: process_writes() takes 2 positional arguments but 3 were given", }, Test { name: "no_table_batches", plugin_code: r#" def process_writes(influxdb3_local, args=None): influxdb3_local.info("done") "#, expected_error: "error executing plugin: TypeError: process_writes() takes from 1 to 2 positional arguments but 3 were given", }, Test { name: "no_influxdb3_local", plugin_code: r#" def process_writes(table_batches, args=None): influxdb3_local.info("done") "#, expected_error: "error executing plugin: TypeError: process_writes() takes from 1 to 2 positional arguments but 3 were given", }, Test { name: "line_builder_no_field", plugin_code: r#" def process_writes(influxdb3_local, table_batches, args=None): line = LineBuilder("some_table") influxdb3_local.write(line) "#, expected_error: "error executing plugin: InvalidLineError: At least one field is required: some_table", }, Test { name: "query_no_table", plugin_code: r#" def process_writes(influxdb3_local, table_batches, args=None): influxdb3_local.query("SELECT foo FROM not_here") "#, expected_error: "error executing plugin: QueryError: error: error while planning query: Error during planning: table 'public.iox.not_here' not found executing query: SELECT foo FROM not_here", }, ]; let plugin_dir = TempDir::new().unwrap(); let server = TestServer::configure() .with_plugin_dir(plugin_dir.path().to_str().unwrap()) .spawn() .await; server .write_lp_to_db( "foo", "cpu,host=s1,region=us-east usage=0.9\n\ cpu,host=s2,region=us-east usage=0.89\n\ cpu,host=s1,region=us-east usage=0.85", Precision::Nanosecond, ) .await .unwrap(); let db_name = "foo"; let lp = "test_input,tag1=tag1_value,tag2=tag2_value field1=1i 500"; let input_args = vec!["arg1=arg1_value", "host=s2"]; for test in tests { let mut plugin_file = NamedTempFile::new_in(plugin_dir.path()).unwrap(); writeln!(plugin_file, "{}", test.plugin_code).unwrap(); let plugin_name = plugin_file.path().file_name().unwrap().to_str().unwrap(); // Using the fluent API for testing wal plugins let result = server .test_wal_plugin(db_name, plugin_name) .with_line_protocol(lp) .with_input_arguments(input_args.clone()) .run() .unwrap(); debug!(result = ?result, "test wal plugin"); let errors = result.get("errors").unwrap().as_array().unwrap(); let error = errors[0].as_str().unwrap(); assert_eq!( error, test.expected_error, "test: {}, response was: {}", test.name, serde_json::to_string_pretty(&result).unwrap() ); } } #[test_log::test(tokio::test)] async fn test_load_wal_plugin_from_gh() { use crate::server::ConfigProvider; use influxdb3_client::Precision; let plugin_dir = TempDir::new().unwrap(); let server = TestServer::configure() .with_plugin_dir(plugin_dir.path().to_str().unwrap()) .spawn() .await; server .write_lp_to_db( "foo", "cpu,host=s1,region=us-east usage=0.9\n\ cpu,host=s2,region=us-east usage=0.89\n\ cpu,host=s1,region=us-east usage=0.85", Precision::Nanosecond, ) .await .unwrap(); let db_name = "foo"; // this will pull from https://github.com/influxdata/influxdb3_plugins/blob/main/examples/wal_plugin/wal_plugin.py let plugin_name = "gh:examples/wal_plugin/wal_plugin.py"; let lp = "test_input,tag1=tag1_value,tag2=tag2_value field1=1i 500"; // Run the test using the fluent API let result = server .test_wal_plugin(db_name, plugin_name) .with_line_protocol(lp) .run() .unwrap(); debug!(result = ?result, "test wal plugin"); let expected_result = r#"{ "log_lines": [ "INFO: wal_plugin.py done" ], "database_writes": { "foo": [ "write_reports,table_name=test_input row_count=1i" ] }, "errors": [] }"#; let expected_result = serde_json::from_str::(expected_result).unwrap(); assert_eq!(result, expected_result); } #[test_log::test(tokio::test)] async fn test_request_plugin_and_trigger() { let plugin_code = r#" import json def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None): for k, v in query_parameters.items(): influxdb3_local.info(f"query_parameters: {k}={v}") for k, v in request_headers.items(): influxdb3_local.info(f"request_headers: {k}={v}") request_data = json.loads(request_body) influxdb3_local.info("parsed JSON request body:", request_data) # write the data to the database line = LineBuilder("request_data").tag("tag1", "tag1_value").int64_field("field1", 1) # get a string of the line to return as the body line_str = line.build() influxdb3_local.write(line) return {"status": "ok", "line": line_str} "#; let (temp_dir, plugin_path) = create_plugin_in_temp_dir(plugin_code); let plugin_dir = temp_dir.path().to_str().unwrap(); let plugin_filename = plugin_path.file_name().unwrap().to_str().unwrap(); let server = TestServer::configure() .with_plugin_dir(plugin_dir) .spawn() .await; let db_name = "foo"; // Setup: create database and plugin server.create_database(db_name).run().unwrap(); let trigger_path = "foo"; // creating the trigger should enable it let result = server .create_trigger(db_name, trigger_path, plugin_filename, "request:bar") .add_trigger_argument("test_arg=hello") .run() .unwrap(); debug!(result = ?result, "create trigger"); assert_contains!(&result, "Trigger foo created successfully"); // send an HTTP request to the server let client = reqwest::Client::new(); let response = client .post(format!("{}/api/v3/engine/bar", server.client_addr())) .header("Content-Type", "application/json") .query(&[("q1", "whatevs")]) .body(r#"{"hello": "world"}"#) .send() .await .unwrap(); assert_eq!(response.status(), 200); let body = response.text().await.unwrap(); let body = serde_json::from_str::(&body).unwrap(); // the plugin was just supposed to return the line that it wrote into the DB assert_eq!( body, json!({"status": "ok", "line": "request_data,tag1=tag1_value field1=1i"}) ); // query to see if the plugin did the write into the DB let val = server .query_sql(db_name) .with_sql("SELECT tag1, field1 FROM request_data") .run() .unwrap(); assert_eq!(val, json!([{"tag1": "tag1_value", "field1": 1}])); // now update it to make sure that it reloads let plugin_code = r#" import json def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None): return {"status": "updated"} "#; // Rewrite the file. let mut file = fs::OpenOptions::new() .write(true) .truncate(true) .open(&plugin_path) .unwrap(); file.write_all(plugin_code.as_bytes()).unwrap(); // send an HTTP request to the server let response = client .post(format!("{}/api/v3/engine/bar", server.client_addr())) .header("Content-Type", "application/json") .query(&[("q1", "whatevs")]) .body(r#"{"hello": "world"}"#) .send() .await .unwrap(); let body = response.text().await.unwrap(); let body = serde_json::from_str::(&body).unwrap(); assert_eq!(body, json!({"status": "updated"})); } #[test_log::test(tokio::test)] async fn test_flask_string_response() { let plugin_code = r#" def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None): # Return a simple string (should become HTML with 200 status) return "Hello, World!" "#; let (temp_dir, plugin_path) = create_plugin_in_temp_dir(plugin_code); let plugin_dir = temp_dir.path().to_str().unwrap(); let plugin_filename = plugin_path.file_name().unwrap().to_str().unwrap(); let server = TestServer::configure() .with_plugin_dir(plugin_dir) .spawn() .await; let db_name = "flask_test_string"; // Setup: create database and plugin server .create_database(db_name) .run() .expect("Failed to create database"); let trigger_path = "string_test"; server .create_trigger(db_name, trigger_path, plugin_filename, "request:test_route") .run() .expect("Failed to create trigger"); // Send request to test string response let client = reqwest::Client::new(); let response = client .get(format!("{}/api/v3/engine/test_route", server.client_addr())) .send() .await .unwrap(); assert_eq!(response.status(), 200); assert_eq!(response.headers().get("content-type").unwrap(), "text/html"); let body = response.text().await.unwrap(); assert_eq!(body, "Hello, World!"); } #[test_log::test(tokio::test)] async fn test_flask_dict_json_response() { let plugin_code = r#" def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None): # Return a dictionary (should be converted to JSON) return {"message": "Hello, World!", "status": "success"} "#; let (temp_dir, plugin_path) = create_plugin_in_temp_dir(plugin_code); let plugin_dir = temp_dir.path().to_str().unwrap(); let plugin_filename = plugin_path.file_name().unwrap().to_str().unwrap(); let server = TestServer::configure() .with_plugin_dir(plugin_dir) .spawn() .await; let db_name = "flask_test_dict"; // Setup: create database and plugin server .create_database(db_name) .run() .expect("Failed to create database"); let trigger_path = "dict_test"; server .create_trigger(db_name, trigger_path, plugin_filename, "request:test_route") .run() .expect("Failed to create trigger"); // Send request to test dict/JSON response let client = reqwest::Client::new(); let response = client .get(format!("{}/api/v3/engine/test_route", server.client_addr())) .send() .await .unwrap(); assert_eq!(response.status(), 200); assert_eq!( response.headers().get("content-type").unwrap(), "application/json" ); let body = response.json::().await.unwrap(); assert_eq!( body, json!({"message": "Hello, World!", "status": "success"}) ); } #[test_log::test(tokio::test)] async fn test_flask_tuple_response_with_status() { let plugin_code = r#" def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None): # Return a tuple with content and status code return "Created successfully", 201 "#; let (temp_dir, plugin_path) = create_plugin_in_temp_dir(plugin_code); let plugin_dir = temp_dir.path().to_str().unwrap(); let plugin_filename = plugin_path.file_name().unwrap().to_str().unwrap(); let server = TestServer::configure() .with_plugin_dir(plugin_dir) .spawn() .await; let db_name = "flask_test_tuple_status"; // Setup: create database and plugin server .create_database(db_name) .run() .expect("Failed to create database"); let trigger_path = "tuple_status_test"; server .create_trigger(db_name, trigger_path, plugin_filename, "request:test_route") .run() .expect("Failed to create trigger"); // Send request to test tuple with status response let client = reqwest::Client::new(); let response = client .get(format!("{}/api/v3/engine/test_route", server.client_addr())) .send() .await .unwrap(); assert_eq!(response.status(), 201); assert_eq!(response.headers().get("content-type").unwrap(), "text/html"); let body = response.text().await.unwrap(); assert_eq!(body, "Created successfully"); } #[test_log::test(tokio::test)] async fn test_flask_tuple_response_with_headers() { let plugin_code = r#" def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None): # Return a tuple with content and headers return "Custom Content-Type", {"Content-Type": "text/plain", "X-Custom-Header": "test-value"} "#; let (temp_dir, plugin_path) = create_plugin_in_temp_dir(plugin_code); let plugin_dir = temp_dir.path().to_str().unwrap(); let plugin_filename = plugin_path.file_name().unwrap().to_str().unwrap(); let server = TestServer::configure() .with_plugin_dir(plugin_dir) .spawn() .await; let db_name = "flask_test_tuple_headers"; // Setup: create database and plugin server .create_database(db_name) .run() .expect("Failed to create database"); let trigger_path = "tuple_headers_test"; server .create_trigger(db_name, trigger_path, plugin_filename, "request:test_route") .run() .expect("Failed to create trigger"); // Send request to test tuple with headers response let client = reqwest::Client::new(); let response = client .get(format!("{}/api/v3/engine/test_route", server.client_addr())) .send() .await .unwrap(); assert_eq!(response.status(), 200); assert_eq!( response.headers().get("content-type").unwrap(), "text/plain" ); assert_eq!( response.headers().get("x-custom-header").unwrap(), "test-value" ); let body = response.text().await.unwrap(); assert_eq!(body, "Custom Content-Type"); } #[test_log::test(tokio::test)] async fn test_flask_tuple_response_with_status_and_headers() { let plugin_code = r#" def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None): # Return a tuple with content, status, and headers return "Not Found", 404, {"Content-Type": "text/plain", "X-Error-Code": "NOT_FOUND"} "#; let (temp_dir, plugin_path) = create_plugin_in_temp_dir(plugin_code); let plugin_dir = temp_dir.path().to_str().unwrap(); let plugin_filename = plugin_path.file_name().unwrap().to_str().unwrap(); let server = TestServer::configure() .with_plugin_dir(plugin_dir) .spawn() .await; let db_name = "flask_test_tuple_status_headers"; // Setup: create database and plugin server .create_database(db_name) .run() .expect("Failed to create database"); let trigger_path = "tuple_status_headers_test"; server .create_trigger(db_name, trigger_path, plugin_filename, "request:test_route") .run() .expect("Failed to create trigger"); // Send request to test tuple with status and headers response let client = reqwest::Client::new(); let response = client .get(format!("{}/api/v3/engine/test_route", server.client_addr())) .send() .await .unwrap(); assert_eq!(response.status(), 404); assert_eq!( response.headers().get("content-type").unwrap(), "text/plain" ); assert_eq!(response.headers().get("x-error-code").unwrap(), "NOT_FOUND"); let body = response.text().await.unwrap(); assert_eq!(body, "Not Found"); } #[test_log::test(tokio::test)] async fn test_flask_list_json_response() { let plugin_code = r#" def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None): # Return a list (should be converted to JSON array) return ["item1", "item2", "item3"] "#; let (temp_dir, plugin_path) = create_plugin_in_temp_dir(plugin_code); let plugin_dir = temp_dir.path().to_str().unwrap(); let plugin_filename = plugin_path.file_name().unwrap().to_str().unwrap(); let server = TestServer::configure() .with_plugin_dir(plugin_dir) .spawn() .await; let db_name = "flask_test_list"; // Setup: create database and plugin server .create_database(db_name) .run() .expect("Failed to create database"); let trigger_path = "list_test"; server .create_trigger(db_name, trigger_path, plugin_filename, "request:test_route") .run() .expect("Failed to create trigger"); // Send request to test list/JSON response let client = reqwest::Client::new(); let response = client .get(format!("{}/api/v3/engine/test_route", server.client_addr())) .send() .await .unwrap(); assert_eq!(response.status(), 200); assert_eq!( response.headers().get("content-type").unwrap(), "application/json" ); let body = response.json::().await.unwrap(); assert_eq!(body, json!(["item1", "item2", "item3"])); } #[test_log::test(tokio::test)] async fn test_flask_iterator_response() { let plugin_code = r#" def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None): # Return a generator that yields strings def generate_content(): yield "Line 1\n" yield "Line 2\n" yield "Line 3\n" return generate_content() "#; let (temp_dir, plugin_path) = create_plugin_in_temp_dir(plugin_code); let plugin_dir = temp_dir.path().to_str().unwrap(); let plugin_filename = plugin_path.file_name().unwrap().to_str().unwrap(); let server = TestServer::configure() .with_plugin_dir(plugin_dir) .spawn() .await; let db_name = "flask_test_iterator"; // Setup: create database and plugin server .create_database(db_name) .run() .expect("Failed to create database"); let trigger_path = "iterator_test"; server .create_trigger(db_name, trigger_path, plugin_filename, "request:test_route") .run() .expect("Failed to create trigger"); // Send request to test iterator/generator response let client = reqwest::Client::new(); let response = client .get(format!("{}/api/v3/engine/test_route", server.client_addr())) .send() .await .unwrap(); assert_eq!(response.status(), 200); assert_eq!(response.headers().get("content-type").unwrap(), "text/html"); let body = response.text().await.unwrap(); assert_eq!(body, "Line 1\nLine 2\nLine 3\n"); } #[test_log::test(tokio::test)] async fn test_flask_response_object() { let plugin_code = r#" def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None): # Creating a mock Flask Response object class FlaskResponse: def __init__(self, response, status=200, headers=None): self.response = response self.status_code = status self.headers = headers or {} def get_data(self): return self.response def __flask_response__(self): return True # Return a Flask Response object response = FlaskResponse( "Custom Flask Response", status=202, headers={"Content-Type": "text/custom", "X-Generated-By": "FlaskResponse"} ) return response "#; let (temp_dir, plugin_path) = create_plugin_in_temp_dir(plugin_code); let plugin_dir = temp_dir.path().to_str().unwrap(); let plugin_filename = plugin_path.file_name().unwrap().to_str().unwrap(); let server = TestServer::configure() .with_plugin_dir(plugin_dir) .spawn() .await; let db_name = "flask_test_response_obj"; // Setup: create database and trigger server.create_database(db_name).run().unwrap(); let trigger_path = "response_obj_test"; server .create_trigger(db_name, trigger_path, plugin_filename, "request:test_route") .run() .unwrap(); // Send request to test Flask Response object let client = reqwest::Client::new(); let response = client .get(format!("{}/api/v3/engine/test_route", server.client_addr())) .send() .await .unwrap(); assert_eq!(response.status(), 202); assert_eq!( response.headers().get("content-type").unwrap(), "text/custom" ); assert_eq!( response.headers().get("x-generated-by").unwrap(), "FlaskResponse" ); let body = response.text().await.unwrap(); assert_eq!(body, "Custom Flask Response"); } #[test_log::test(tokio::test)] async fn test_flask_json_dict_with_status_in_tuple() { let plugin_code = r#" def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None): # Return a tuple with a dictionary and status code return {"error": "Not Found", "code": 404}, 404 "#; let (temp_dir, plugin_path) = create_plugin_in_temp_dir(plugin_code); let plugin_dir = temp_dir.path().to_str().unwrap(); let plugin_filename = plugin_path.file_name().unwrap().to_str().unwrap(); let server = TestServer::configure() .with_plugin_dir(plugin_dir) .spawn() .await; let db_name = "flask_test_json_status"; // Setup: create database and trigger server.create_database(db_name).run().unwrap(); let trigger_path = "json_status_test"; server .create_trigger(db_name, trigger_path, plugin_filename, "request:test_route") .run() .unwrap(); // Send request to test JSON dict with status let client = reqwest::Client::new(); let response = client .get(format!("{}/api/v3/engine/test_route", server.client_addr())) .send() .await .unwrap(); assert_eq!(response.status(), 404); assert_eq!( response.headers().get("content-type").unwrap(), "application/json" ); let body = response.json::().await.unwrap(); assert_eq!(body, json!({"error": "Not Found", "code": 404})); } #[test_log::test(tokio::test)] async fn test_trigger_create_validates_file_present() { let plugin_dir = TempDir::new().unwrap(); let server = TestServer::configure() .with_plugin_dir(plugin_dir.path().to_str().unwrap()) .spawn() .await; let db_name = "foo"; // Setup: create database server.create_database(db_name).run().unwrap(); let trigger_path = "foo"; // creating the trigger should return a 404 error from github let result = server .create_trigger(db_name, trigger_path, "gh:not_a_file.py", "request:foo") .add_trigger_argument("test_arg=hello") .run(); assert!(result.is_err()); let err = result.unwrap_err().to_string(); assert!(err.contains("error reading file from Github: 404 Not Found")); } fn check_logs(response: &Value, expected_logs: &[&str]) { assert!( response["errors"].as_array().unwrap().is_empty(), "Unexpected errors in response {:#?}, expected logs {:#?}", response, expected_logs ); let logs = response["log_lines"].as_array().unwrap(); assert_eq!( expected_logs, logs, "mismatched log lines, expected {:#?}, got {:#?}, errors are {:#?}", expected_logs, logs, response["errors"] ); } #[test_log::test(tokio::test)] async fn test_basic_cache_functionality() { // Create plugin file that uses the cache let plugin_code = r#" def process_scheduled_call(influxdb3_local, schedule_time, args=None): # Store a value in the cache influxdb3_local.cache.put("test_key", "test_value") # Retrieve the value value = influxdb3_local.cache.get("test_key") influxdb3_local.info(f"Retrieved value: {value}") # Verify the value matches what we stored assert value == "test_value", f"Expected 'test_value', got {value}" influxdb3_local.info("Cache test passed")"#; let (temp_dir, plugin_path) = create_plugin_in_temp_dir(plugin_code); let plugin_dir = temp_dir.path().to_str().unwrap(); let plugin_name = plugin_path.file_name().unwrap().to_str().unwrap(); let server = TestServer::configure() .with_plugin_dir(plugin_dir) .spawn() .await; let db_name = "foo"; // Setup: create database server.create_database(db_name).run().unwrap(); // Run the schedule plugin test let res = server .test_schedule_plugin(db_name, plugin_name, "* * * * * *") .with_cache_name("test_basic_cache") .run() .unwrap(); // Check that the cache operations were successful let expected_logs = [ "INFO: Retrieved value: test_value", "INFO: Cache test passed", ]; check_logs(&res, &expected_logs); } #[test_log::test(tokio::test)] async fn test_cache_ttl() { // Create plugin file that tests cache TTL let plugin_code = r#" import time def process_scheduled_call(influxdb3_local, schedule_time, args=None): # Store a value with 1 second TTL influxdb3_local.cache.put("ttl_key", "expires_soon", ttl=1.0) # Immediately check - should exist value = influxdb3_local.cache.get("ttl_key") influxdb3_local.info(f"Immediate retrieval: {value}") # Wait for TTL to expire influxdb3_local.info("Waiting for TTL to expire...") time.sleep(1.5) # Check again - should be gone expired_value = influxdb3_local.cache.get("ttl_key") influxdb3_local.info(f"After expiration: {expired_value}") # Verify the value is None after TTL expiration assert expired_value is None, f"Expected None after TTL expiration, got {expired_value}" influxdb3_local.info("TTL test passed")"#; let (temp_dir, plugin_path) = create_plugin_in_temp_dir(plugin_code); let plugin_dir = temp_dir.path().to_str().unwrap(); let plugin_name = plugin_path.file_name().unwrap().to_str().unwrap(); let server = TestServer::configure() .with_plugin_dir(plugin_dir) .spawn() .await; let db_name = "foo"; // Setup: create database server.create_database(db_name).run().unwrap(); // Run the schedule plugin test let res = server .test_schedule_plugin(db_name, plugin_name, "* * * * * *") .with_cache_name("ttl_test_cache") .run() .unwrap(); // Check logs to verify TTL functionality let expected_logs = [ "INFO: Immediate retrieval: expires_soon", "INFO: Waiting for TTL to expire...", "INFO: After expiration: None", "INFO: TTL test passed", ]; check_logs(&res, &expected_logs); } #[test_log::test(tokio::test)] async fn test_cache_namespaces() { // Create plugin file that tests different cache namespaces let plugin_code = r#" def process_scheduled_call(influxdb3_local, schedule_time, args=None): # Store value in local (trigger) cache influxdb3_local.cache.put("namespace_key", "trigger_value", use_global=False) # Store different value in global cache with same key influxdb3_local.cache.put("namespace_key", "global_value", use_global=True) # Retrieve from both caches trigger_value = influxdb3_local.cache.get("namespace_key", use_global=False) global_value = influxdb3_local.cache.get("namespace_key", use_global=True) influxdb3_local.info(f"Trigger cache value: {trigger_value}") influxdb3_local.info(f"Global cache value: {global_value}") # Verify namespace isolation assert trigger_value == "trigger_value", f"Expected 'trigger_value', got {trigger_value}" assert global_value == "global_value", f"Expected 'global_value', got {global_value}" influxdb3_local.info("Cache namespace test passed")"#; let (temp_dir, plugin_path) = create_plugin_in_temp_dir(plugin_code); let plugin_dir = temp_dir.path().to_str().unwrap(); let plugin_name = plugin_path.file_name().unwrap().to_str().unwrap(); let server = TestServer::configure() .with_plugin_dir(plugin_dir) .spawn() .await; let db_name = "foo"; // Setup: create database server.create_database(db_name).run().unwrap(); // Run the schedule plugin test let res = server .test_schedule_plugin(db_name, plugin_name, "* * * * * *") .with_cache_name("test_cache_namespaces") .run() .unwrap(); // Check logs to verify namespace isolation let expected_logs = [ "INFO: Trigger cache value: trigger_value", "INFO: Global cache value: global_value", "INFO: Cache namespace test passed", ]; check_logs(&res, &expected_logs); } #[test_log::test(tokio::test)] async fn test_cache_persistence_across_runs() { // Create plugin file that stores a value in the cache let first_plugin_code = r#" def process_scheduled_call(influxdb3_local, schedule_time, args=None): # Store a value that should persist across test runs with the same cache name influxdb3_local.cache.put("persistent_key", "I should persist") influxdb3_local.info("Stored value in shared test cache")"#; let (temp_dir, first_plugin_file) = create_plugin_in_temp_dir(first_plugin_code); let plugin_dir = temp_dir.path().to_str().unwrap(); // Create second plugin file that retrieves the value let second_plugin_code = r#" def process_scheduled_call(influxdb3_local, schedule_time, args=None): # Retrieve the value from the previous run value = influxdb3_local.cache.get("persistent_key") influxdb3_local.info(f"Retrieved value from previous run: {value}") # Verify the value persisted assert value == "I should persist", f"Expected 'I should persist', got {value}" influxdb3_local.info("Cache persistence test passed")"#; let second_plugin_file = write_plugin_to_temp_dir(&temp_dir, "second_plugin.py", second_plugin_code); let first_plugin_name = first_plugin_file.file_name().unwrap().to_str().unwrap(); let second_plugin_name = second_plugin_file.file_name().unwrap().to_str().unwrap(); let server = TestServer::configure() .with_plugin_dir(plugin_dir) .spawn() .await; let db_name = "foo"; let cache_name = "shared_test_cache"; // Setup: create database server.create_database(db_name).run().unwrap(); // First run - store the value let first_res = server .test_schedule_plugin(db_name, first_plugin_name, "* * * * * *") .with_cache_name(cache_name) .run() .unwrap(); let expected_logs = ["INFO: Stored value in shared test cache"]; check_logs(&first_res, &expected_logs); // Second run - retrieve the value let second_res = server .test_schedule_plugin(db_name, second_plugin_name, "*/5 * * * * *") .with_cache_name(cache_name) .run() .unwrap(); // Check logs to verify persistence let expected_logs = [ "INFO: Retrieved value from previous run: I should persist", "INFO: Cache persistence test passed", ]; check_logs(&second_res, &expected_logs); } #[test_log::test(tokio::test)] async fn test_cache_deletion() { // Create plugin file that tests cache deletion let plugin_code = r#" def process_scheduled_call(influxdb3_local, schedule_time, args=None): # Store some values influxdb3_local.cache.put("delete_key", "delete me") # Verify the value exists before_delete = influxdb3_local.cache.get("delete_key") influxdb3_local.info(f"Before deletion: {before_delete}") # Delete the value deleted = influxdb3_local.cache.delete("delete_key") influxdb3_local.info(f"Deletion result: {deleted}") # Try to retrieve the deleted value after_delete = influxdb3_local.cache.get("delete_key") influxdb3_local.info(f"After deletion: {after_delete}") # Verify deletion was successful assert deleted is True, f"Expected True for successful deletion, got {deleted}" assert after_delete is None, f"Expected None after deletion, got {after_delete}" # Test deleting non-existent key non_existent = influxdb3_local.cache.delete("non_existent_key") influxdb3_local.info(f"Deleting non-existent key: {non_existent}") assert non_existent is False, f"Expected False for non-existent key, got {non_existent}" influxdb3_local.info("Cache deletion test passed")"#; let (temp_dir, plugin_path) = create_plugin_in_temp_dir(plugin_code); let plugin_dir = temp_dir.path().to_str().unwrap(); let plugin_name = plugin_path.file_name().unwrap().to_str().unwrap(); let server = TestServer::configure() .with_plugin_dir(plugin_dir) .spawn() .await; let db_name = "foo"; // Setup: create database server.create_database(db_name).run().unwrap(); // Run the schedule plugin test let res = server .test_schedule_plugin(db_name, plugin_name, "* * * * * *") .with_cache_name("test_cache_deletion") .run() .unwrap(); // Check logs to verify deletion functionality let expected_logs = [ "INFO: Before deletion: delete me", "INFO: Deletion result: True", "INFO: After deletion: None", "INFO: Deleting non-existent key: False", "INFO: Cache deletion test passed", ]; check_logs(&res, &expected_logs); } #[test_log::test(tokio::test)] async fn test_cache_with_wal_plugin() { // Create plugin file that uses cache in a WAL plugin let plugin_code = r#" def process_writes(influxdb3_local, table_batches, args=None): # Count the number of records processed record_count = sum(len(batch['rows']) for batch in table_batches) # Get previous count from cache or default to 0 previous_count = influxdb3_local.cache.get("processed_records") or 0 if previous_count == 0: influxdb3_local.info("First run, initializing count") else: influxdb3_local.info(f"Previous count: {previous_count}") # Update the count in cache new_count = previous_count + record_count influxdb3_local.cache.put("processed_records", new_count) influxdb3_local.info(f"Updated count: {new_count}") # We're not modifying any records for this test return table_batches"#; let (temp_dir, plugin_path) = create_plugin_in_temp_dir(plugin_code); let plugin_dir = temp_dir.path().to_str().unwrap(); let plugin_name = plugin_path.file_name().unwrap().to_str().unwrap(); let server = TestServer::configure() .with_plugin_dir(plugin_dir) .spawn() .await; // Make sure each test creates its own database let db_name = "foo"; let cache_name = "test_cache"; // Setup: create database server.create_database(db_name).run().unwrap(); // First run with 3 data points let first_lp = "cpu,host=host1 usage=0.75\ncpu,host=host2 usage=0.82\ncpu,host=host3 usage=0.91"; // First test run let first_res = server .test_wal_plugin(db_name, plugin_name) .with_line_protocol(first_lp) .with_cache_name(cache_name) .run() .unwrap(); // Check first run logs let init_log = "INFO: First run, initializing count"; let update_log = "INFO: Updated count: 3"; check_logs(&first_res, &[init_log, update_log]); // Second run with 2 more data points let second_lp = "cpu,host=host4 usage=0.65\ncpu,host=host5 usage=0.72"; // Second test run let second_res = server .test_wal_plugin(db_name, plugin_name) .with_line_protocol(second_lp) .with_cache_name(cache_name) .run() .unwrap(); // Check second run logs let prev_log = "INFO: Previous count: 3"; let update_log = "INFO: Updated count: 5"; check_logs(&second_res, &[prev_log, update_log]); } #[test_log::test(tokio::test)] async fn test_trigger_cache_cleanup() { // Create a scheduled trigger script that writes different data based on whether it has cache data let trigger_script = r#" def process_scheduled_call(influxdb3_local, schedule_time, args=None): # Check if we've run before by looking for a cache value run_count = influxdb3_local.cache.get("run_counter") influxdb3_local.info(f"run count {run_count}") if run_count is None: line = LineBuilder("cache_test").tag("test", "cleanup").int64_field("count", 1) influxdb3_local.write(line) influxdb3_local.cache.put("run_counter", 1) else: influxdb3_local.cache.put("run_counter", run_count + 1)"#; let (temp_dir, plugin_path) = create_plugin_in_temp_dir(trigger_script); let plugin_dir = temp_dir.path().to_str().unwrap(); let trigger_filename = plugin_path.file_name().unwrap().to_str().unwrap(); let server = TestServer::configure() .with_plugin_dir(plugin_dir) .spawn() .await; let db_name = "foo"; let trigger_name = "cache_cleanup_trigger"; // Setup: create database server.create_database(db_name).run().unwrap(); // Create scheduled trigger server .create_trigger(db_name, trigger_name, trigger_filename, "cron:* * * * * *") .run() .unwrap(); // Wait for trigger to run several times tokio::time::sleep(std::time::Duration::from_millis(3100)).await; // Query to see what values were written before disabling let first_query_result = server .query_sql(db_name) .with_sql("SELECT count(*) FROM cache_test") .run() .unwrap(); // There should be a single row assert_eq!(json!([{"count(*)":1}]), first_query_result); // Disable trigger (which should clear its cache) server.disable_trigger(db_name, trigger_name).run().unwrap(); // Re-enable trigger server.enable_trigger(db_name, trigger_name).run().unwrap(); // Wait for trigger to run again tokio::time::sleep(std::time::Duration::from_millis(1100)).await; // Query results after re-enabling let second_query_result = server .query_sql(db_name) .with_sql("SELECT count(*) FROM cache_test") .run() .unwrap(); // If cache was cleared, we should see a second row assert_eq!(json!([{"count(*)":2}]), second_query_result); } #[test_log::test(tokio::test)] async fn test_complex_object_caching() { // Create plugin file that tests caching of complex Python objects let plugin_code = r#" def process_scheduled_call(influxdb3_local, schedule_time, args=None): # Create a complex object (nested dictionaries and lists) complex_obj = { "string_key": "value", "int_key": 42, "float_key": 3.14, "bool_key": True, "list_key": [1, 2, 3, "four", 5.0], "nested_dict": { "inner_key": "inner_value", "inner_list": [{"a": 1}, {"b": 2}] } } # Store in cache influxdb3_local.cache.put("complex_obj", complex_obj) influxdb3_local.info("Stored complex object in cache") # Retrieve from cache retrieved_obj = influxdb3_local.cache.get("complex_obj") # Verify structure was preserved influxdb3_local.info(f"Retrieved object type: {type(retrieved_obj).__name__}") influxdb3_local.info(f"Retrieved nested_dict type: {type(retrieved_obj['nested_dict']).__name__}") influxdb3_local.info(f"Retrieved list_key type: {type(retrieved_obj['list_key']).__name__}") # Test with some assertions assert retrieved_obj["string_key"] == "value", "String value mismatch" assert retrieved_obj["int_key"] == 42, "Integer value mismatch" assert retrieved_obj["nested_dict"]["inner_key"] == "inner_value", "Nested value mismatch" assert retrieved_obj["list_key"][3] == "four", "List value mismatch" assert retrieved_obj["nested_dict"]["inner_list"][1]["b"] == 2, "Deeply nested value mismatch" influxdb3_local.info("Complex object verification passed")"#; let (temp_dir, plugin_path) = create_plugin_in_temp_dir(plugin_code); let plugin_dir = temp_dir.path().to_str().unwrap(); let plugin_name = plugin_path.file_name().unwrap().to_str().unwrap(); let server = TestServer::configure() .with_plugin_dir(plugin_dir) .spawn() .await; let db_name = "foo"; // Setup: create database server.create_database(db_name).run().unwrap(); // Run the schedule plugin test let res = server .test_schedule_plugin(db_name, plugin_name, "* * * * * *") .with_cache_name("test_complex_objects") .run() .unwrap(); // Check logs to verify complex object preservation let expected_logs = [ "INFO: Stored complex object in cache", "INFO: Retrieved object type: dict", "INFO: Retrieved nested_dict type: dict", "INFO: Retrieved list_key type: list", "INFO: Complex object verification passed", ]; check_logs(&res, &expected_logs); } #[test_log::test(tokio::test)] async fn write_and_query_via_stdin() { let server = TestServer::spawn().await; let db_name = "foo"; // Write data via stdin let result = server .write(db_name) .run_with_stdin("bar,tag1=1,tag2=2 field1=1,field2=2 0") .unwrap(); assert_eq!("success", result); debug!(result = ?result, "wrote data to database"); // Query data let result = server .query_sql(db_name) .with_sql("SELECT * FROM bar") .run() .unwrap(); debug!(result = ?result, "queried data from database"); assert_eq!( json!([{"field1":1.0,"field2":2.0,"tag1":"1","tag2":"2","time":"1970-01-01T00:00:00"}]), result ); } #[test_log::test(tokio::test)] async fn write_and_query_via_file() { let server = TestServer::spawn().await; let db_name = "foo"; // Write data via file let result = server .write(db_name) .with_file("tests/server/fixtures/file.lp") .run() .unwrap(); assert_eq!("success", result); debug!(result = ?result, "wrote data to database"); // Query data via file let result = server .query_sql(db_name) .with_query_file("tests/server/fixtures/file.sql") .run() .unwrap(); debug!(result = ?result, "queried data from database"); assert_eq!( json!([{"field1":1.0,"field2":2.0,"tag1":"1","tag2":"2","time":"1970-01-01T00:00:00"}]), result ); } #[test_log::test(tokio::test)] async fn write_and_query_via_string() { let server = TestServer::spawn().await; let db_name = "foo"; // Write data directly let result = server .write(db_name) .with_line_protocol("bar,tag1=1,tag2=2 field1=1,field2=2 0") .run() .unwrap(); assert_eq!("success", result); debug!(result = ?result, "wrote data to database"); // Query data directly let result = server .query_sql(db_name) .with_sql("SELECT * FROM bar") .run() .unwrap(); debug!(result = ?result, "queried data from database"); assert_eq!( json!([{"field1":1.0,"field2":2.0,"tag1":"1","tag2":"2","time":"1970-01-01T00:00:00"}]), result ); } #[test_log::test(tokio::test)] async fn write_with_precision_arg() { let server = TestServer::spawn().await; let db_name = "foo"; let table_name = "bar"; struct TestCase { name: &'static str, precision: Option<&'static str>, expected: &'static str, } let tests = vec![ TestCase { name: "default precision is seconds", precision: None, expected: "1970-01-01T00:00:01", }, TestCase { name: "set seconds precision", precision: Some("s"), expected: "1970-01-01T00:00:01", }, TestCase { name: "set milliseconds precision", precision: Some("ms"), expected: "1970-01-01T00:00:00.001", }, TestCase { name: "set microseconds precision", precision: Some("us"), expected: "1970-01-01T00:00:00.000001", }, TestCase { name: "set nanoseconds precision", precision: Some("ns"), expected: "1970-01-01T00:00:00.000000001", }, ]; for test in tests { let TestCase { name, precision, expected, } = test; let name_tag = name.replace(' ', "_"); let lp = format!("{table_name},name={name_tag} theanswer=42 1"); // Build write query with precision if specified let mut write_query = server.write(db_name).with_line_protocol(&lp); if let Some(prec) = precision { write_query = write_query.with_precision(prec); } // Execute write write_query.run().unwrap(); // Verify result let result = server .query_sql(db_name) .with_sql(format!( "SELECT * FROM {table_name} WHERE name='{name_tag}'" )) .run() .unwrap(); assert_eq!( result, json!([{ "name": name_tag, "theanswer": 42.0, "time": expected, }]), "test failed: {name}", ); } // Test invalid precision let lp = format!("{table_name},name=invalid_precision theanswer=42 1"); let result = server .write(db_name) .with_line_protocol(&lp) .with_precision("fake") .run(); // This should result in an error assert!(result.is_err()); insta::assert_snapshot!("invalid_precision", format!("{}", result.unwrap_err())); }