use crate::server::TestServer; use anyhow::{Result, bail}; use assert_cmd::cargo::CommandCargoExt; use serde_json::Value; use std::io::Write; use std::process::{Command, Stdio}; use std::thread; // Builder for the 'create database' command pub struct CreateDatabaseQuery<'a> { server: &'a TestServer, name: String, } impl TestServer { pub fn create_database(&self, name: impl Into) -> CreateDatabaseQuery { CreateDatabaseQuery { server: self, name: name.into(), } } fn run_with_options( &self, commands: Vec<&str>, args: &[&str], input: Option<&str>, ) -> Result { let client_addr = self.client_addr(); let mut command_args = commands.clone(); command_args.push("--host"); command_args.push(client_addr.as_str()); if let Some(token) = self.token() { command_args.push("--token"); command_args.push(token); } let mut child_process = Command::cargo_bin("influxdb3")? .args(&command_args) .args(args) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn()?; if let Some(input) = input { let input = input.to_string(); let mut stdin = child_process.stdin.take().expect("failed to open stdin"); thread::spawn(move || { stdin .write_all(input.as_bytes()) .expect("cannot write confirmation msg to stdin"); }); } let output = child_process.wait_with_output()?; if !output.status.success() { println!( "failed to run influxdb3 {} {}", command_args.join(" "), args.join(" ") ); bail!("{}", String::from_utf8_lossy(&output.stderr)); } Ok(String::from_utf8(output.stdout)?.trim().into()) } pub fn run(&self, commands: Vec<&str>, args: &[&str]) -> Result { self.run_with_options(commands, args, None) } pub fn run_with_confirmation(&self, commands: Vec<&str>, args: &[&str]) -> Result { self.run_with_options(commands, args, Some("yes")) } } impl CreateDatabaseQuery<'_> { pub fn run(self) -> Result { self.server.run( vec![ "create", "database", "--tls-ca", "../testing-certs/rootCA.pem", ], &[self.name.as_str()], ) } } // Builder for the 'create table' command pub struct CreateTableQuery<'a> { server: &'a TestServer, table_name: String, db_name: String, tags: Vec, fields: Vec<(String, String)>, } impl TestServer { pub fn create_table( &self, db_name: impl Into, table_name: impl Into, ) -> CreateTableQuery { CreateTableQuery { server: self, db_name: db_name.into(), table_name: table_name.into(), tags: Vec::new(), fields: Vec::new(), } } } impl CreateTableQuery<'_> { pub fn with_tags(mut self, tags: impl IntoIterator>) -> Self { self.tags = tags.into_iter().map(Into::into).collect(); self } pub fn add_tag(mut self, tag: impl Into) -> Self { self.tags.push(tag.into()); self } pub fn with_fields( mut self, fields: impl IntoIterator, String)>, ) -> Self { self.fields = fields .into_iter() .map(|(name, dt)| (name.into(), dt)) .collect(); self } pub fn add_field(mut self, name: impl Into, data_type: impl Into) -> Self { self.fields.push((name.into(), data_type.into())); self } pub fn run(self) -> Result { // Convert tags to comma-separated string for --tags argument let tags_arg = self.tags.join(","); // Convert fields to comma-separated "name:type" string for --fields argument let fields_arg = self .fields .iter() .map(|(name, dt)| format!("{}:{}", name, dt)) .collect::>() .join(","); let mut args = vec![ "--database", &self.db_name, &self.table_name, "--tags", &tags_arg, "--tls-ca", "../testing-certs/rootCA.pem", ]; if !self.fields.is_empty() { args.push("--fields"); args.push(&fields_arg); } self.server.run(vec!["create", "table"], &args) } } // Builder for the 'show databases' command pub struct ShowDatabasesQuery<'a> { server: &'a TestServer, format: Option, show_deleted: bool, } impl TestServer { pub fn show_databases(&self) -> ShowDatabasesQuery { ShowDatabasesQuery { server: self, format: None, show_deleted: false, } } } impl ShowDatabasesQuery<'_> { pub fn with_format(mut self, format: impl Into) -> Self { self.format = Some(format.into()); self } pub fn show_deleted(mut self, show: bool) -> Self { self.show_deleted = show; self } pub fn run(self) -> Result { let mut args = Vec::new(); if let Some(format) = self.format.as_ref() { args.push("--format"); args.push(format); } if self.show_deleted { args.push("--show-deleted"); } args.push("--tls-ca"); args.push("../testing-certs/rootCA.pem"); self.server.run(vec!["show", "databases"], &args) } } // Builder for the 'delete database' command pub struct DeleteDatabaseQuery<'a> { server: &'a TestServer, name: String, } impl TestServer { pub fn delete_database(&self, name: impl Into) -> DeleteDatabaseQuery { DeleteDatabaseQuery { server: self, name: name.into(), } } } impl DeleteDatabaseQuery<'_> { pub fn run(self) -> Result { self.server.run_with_confirmation( vec![ "delete", "database", "--tls-ca", "../testing-certs/rootCA.pem", ], &[self.name.as_str()], ) } } // Builder for SQL query commands pub struct QuerySqlQuery<'a> { server: &'a TestServer, database: String, query: Option, query_file: Option, output_file: Option, std_in: Option, } impl TestServer { pub fn query_sql(&self, database: impl Into) -> QuerySqlQuery { QuerySqlQuery { server: self, database: database.into(), query: None, query_file: None, output_file: None, std_in: None, } } } impl QuerySqlQuery<'_> { pub fn with_output_file(mut self, file_path: impl Into) -> Self { self.output_file = Some(file_path.into()); self } pub fn with_sql(mut self, query: impl Into) -> Self { self.query = Some(query.into()); self } pub fn with_query_file(mut self, query_file: impl Into) -> Self { self.query_file = Some(query_file.into()); self } pub fn run(self) -> Result { let mut args = vec![ "--database", &self.database, "--format", "json", "--tls-ca", "../testing-certs/rootCA.pem", ]; if let Some(file_path) = self.output_file.as_ref() { args.push("--output"); args.push(file_path); } if let Some(query) = self.query.as_ref() { args.push(query); Ok(serde_json::from_str( &self.server.run(vec!["query"], &args)?, )?) } else if let Some(query_file) = self.query_file.as_ref() { args.push("--file"); args.push(query_file); Ok(serde_json::from_str( &self.server.run(vec!["query"], &args)?, )?) } else if let Some(stdin) = self.std_in.as_ref() { Ok(serde_json::from_str(&self.server.run_with_options( vec!["query"], &args, Some(stdin.as_str()), )?)?) } else { Ok(serde_json::from_str( &self.server.run(vec!["query"], &args)?, )?) } } } // Builder for the 'delete table' command pub struct DeleteTableQuery<'a> { server: &'a TestServer, db_name: String, table_name: String, } impl TestServer { pub fn delete_table( &self, db_name: impl Into, table_name: impl Into, ) -> DeleteTableQuery { DeleteTableQuery { server: self, db_name: db_name.into(), table_name: table_name.into(), } } } impl DeleteTableQuery<'_> { pub fn run(self) -> Result { let args = vec![ self.table_name.as_str(), "--database", self.db_name.as_str(), "--tls-ca", "../testing-certs/rootCA.pem", ]; self.server .run_with_confirmation(vec!["delete", "table"], &args) } } // Builder for the 'create distinct_cache' command pub struct CreateDistinctCacheQuery<'a> { server: &'a TestServer, db_name: String, table_name: String, cache_name: String, columns: Vec, max_cardinality: Option, max_age: Option, } impl TestServer { pub fn create_distinct_cache( &self, db_name: impl Into, table_name: impl Into, cache_name: impl Into, ) -> CreateDistinctCacheQuery { CreateDistinctCacheQuery { server: self, db_name: db_name.into(), table_name: table_name.into(), cache_name: cache_name.into(), columns: Vec::new(), max_cardinality: None, max_age: None, } } } impl CreateDistinctCacheQuery<'_> { pub fn with_columns(mut self, columns: impl IntoIterator>) -> Self { self.columns = columns.into_iter().map(Into::into).collect(); self } pub fn add_column(mut self, column: impl Into) -> Self { self.columns.push(column.into()); self } pub fn with_max_cardinality(mut self, max_cardinality: usize) -> Self { self.max_cardinality = Some(max_cardinality); self } pub fn with_max_age(mut self, max_age: impl Into) -> Self { self.max_age = Some(max_age.into()); self } pub fn run(self) -> Result { if self.columns.is_empty() { bail!("At least one column must be specified for distinct cache"); } let columns_arg = self.columns.join(","); let mut args = vec![ "--database", self.db_name.as_str(), "--table", self.table_name.as_str(), "--columns", &columns_arg, "--tls-ca", "../testing-certs/rootCA.pem", ]; let max_cardinality = self.max_cardinality.unwrap_or_default(); let max_cardinality_str = max_cardinality.to_string(); if max_cardinality > 0 { args.push("--max-cardinality"); args.push(&max_cardinality_str); } if let Some(max_age) = &self.max_age { args.push("--max-age"); args.push(max_age); } args.push(self.cache_name.as_str()); self.server.run(vec!["create", "distinct_cache"], &args) } } // Builder for the 'delete distinct_cache' command pub struct DeleteDistinctCacheQuery<'a> { server: &'a TestServer, db_name: String, table_name: String, cache_name: String, } impl TestServer { pub fn delete_distinct_cache( &self, db_name: impl Into, table_name: impl Into, cache_name: impl Into, ) -> DeleteDistinctCacheQuery { DeleteDistinctCacheQuery { server: self, db_name: db_name.into(), table_name: table_name.into(), cache_name: cache_name.into(), } } } impl DeleteDistinctCacheQuery<'_> { pub fn run(self) -> Result { let args = vec![ "--database", self.db_name.as_str(), "--table", self.table_name.as_str(), self.cache_name.as_str(), "--tls-ca", "../testing-certs/rootCA.pem", ]; self.server .run(vec!["delete", "distinct_cache"], args.as_slice()) } } // Builder for the 'create trigger' command pub struct CreateTriggerQuery<'a> { server: &'a TestServer, trigger_name: String, db_name: String, plugin_filename: String, trigger_spec: String, trigger_arguments: Vec, disabled: bool, run_asynchronous: bool, error_behavior: Option, } impl TestServer { pub fn create_trigger( &self, db_name: impl Into, trigger_name: impl Into, plugin_filename: impl Into, trigger_spec: impl Into, ) -> CreateTriggerQuery { CreateTriggerQuery { server: self, db_name: db_name.into(), trigger_name: trigger_name.into(), plugin_filename: plugin_filename.into(), trigger_spec: trigger_spec.into(), trigger_arguments: Vec::new(), disabled: false, run_asynchronous: false, error_behavior: None, } } } impl CreateTriggerQuery<'_> { pub fn with_trigger_arguments( mut self, args: impl IntoIterator>, ) -> Self { self.trigger_arguments = args.into_iter().map(Into::into).collect(); self } pub fn add_trigger_argument(mut self, arg: impl Into) -> Self { self.trigger_arguments.push(arg.into()); self } pub fn disabled(mut self, disabled: bool) -> Self { self.disabled = disabled; self } pub fn run_asynchronous(mut self, async_run: bool) -> Self { self.run_asynchronous = async_run; self } pub fn error_behavior(mut self, behavior: impl Into) -> Self { self.error_behavior = Some(behavior.into()); self } pub fn run(self) -> Result { let mut args = vec![ self.trigger_name.as_str(), "--database", self.db_name.as_str(), "--plugin-filename", self.plugin_filename.as_str(), "--trigger-spec", self.trigger_spec.as_str(), "--tls-ca", "../testing-certs/rootCA.pem", ]; let trigger_args = self.trigger_arguments.join(","); if !self.trigger_arguments.is_empty() { args.push("--trigger-arguments"); args.push(trigger_args.as_str()); } if self.disabled { args.push("--disabled"); } if self.run_asynchronous { args.push("--run-asynchronous"); } if let Some(behavior) = &self.error_behavior { args.push("--error-behavior"); args.push(behavior); } self.server.run(vec!["create", "trigger"], args.as_slice()) } } // Builder for the 'enable trigger' command pub struct EnableTriggerQuery<'a> { server: &'a TestServer, db_name: String, trigger_name: String, } impl TestServer { pub fn enable_trigger( &self, db_name: impl Into, trigger_name: impl Into, ) -> EnableTriggerQuery { EnableTriggerQuery { server: self, db_name: db_name.into(), trigger_name: trigger_name.into(), } } } impl EnableTriggerQuery<'_> { pub fn run(self) -> Result { let args = vec![ self.trigger_name.as_str(), "--database", self.db_name.as_str(), "--tls-ca", "../testing-certs/rootCA.pem", ]; self.server.run(vec!["enable", "trigger"], args.as_slice()) } } // Builder for the 'disable trigger' command pub struct DisableTriggerQuery<'a> { server: &'a TestServer, db_name: String, trigger_name: String, } impl TestServer { pub fn disable_trigger( &self, db_name: impl Into, trigger_name: impl Into, ) -> DisableTriggerQuery { DisableTriggerQuery { server: self, db_name: db_name.into(), trigger_name: trigger_name.into(), } } } impl DisableTriggerQuery<'_> { pub fn run(self) -> Result { let args = vec![ self.trigger_name.as_str(), "--database", self.db_name.as_str(), "--tls-ca", "../testing-certs/rootCA.pem", ]; self.server.run(vec!["disable", "trigger"], args.as_slice()) } } // Builder for the 'delete trigger' command pub struct DeleteTriggerQuery<'a> { server: &'a TestServer, db_name: String, trigger_name: String, force: bool, } impl TestServer { pub fn delete_trigger( &self, db_name: impl Into, trigger_name: impl Into, ) -> DeleteTriggerQuery { DeleteTriggerQuery { server: self, db_name: db_name.into(), trigger_name: trigger_name.into(), force: false, } } } impl DeleteTriggerQuery<'_> { pub fn force(mut self, force: bool) -> Self { self.force = force; self } pub fn run(self) -> Result { let mut args = vec![ self.trigger_name.as_str(), "--database", self.db_name.as_str(), "--tls-ca", "../testing-certs/rootCA.pem", ]; if self.force { args.push("--force"); } self.server.run(vec!["delete", "trigger"], args.as_slice()) } } // Builder for the 'test wal_plugin' command pub struct TestWalPluginQuery<'a> { server: &'a TestServer, database: String, plugin_filename: String, line_protocol: Option, cache_name: Option, input_arguments: Vec, } impl TestServer { pub fn test_wal_plugin( &self, database: impl Into, plugin_filename: impl Into, ) -> TestWalPluginQuery { TestWalPluginQuery { server: self, database: database.into(), plugin_filename: plugin_filename.into(), line_protocol: None, cache_name: None, input_arguments: Vec::new(), } } } impl TestWalPluginQuery<'_> { pub fn with_line_protocol(mut self, lp: impl Into) -> Self { self.line_protocol = Some(lp.into()); self } pub fn with_input_arguments( mut self, args: impl IntoIterator>, ) -> Self { self.input_arguments = args.into_iter().map(Into::into).collect(); self } pub fn add_input_argument(mut self, arg: impl Into) -> Self { self.input_arguments.push(arg.into()); self } pub fn with_cache_name(mut self, cache_name: impl Into) -> Self { self.cache_name = Some(cache_name.into()); self } pub fn run(self) -> Result { let mut args = Vec::new(); args.push("--database"); args.push(&self.database); if let Some(lp) = &self.line_protocol { args.push("--lp"); args.push(lp); } if let Some(cache_name) = &self.cache_name { args.push("--cache-name"); args.push(cache_name); } let input_args = self.input_arguments.join(","); if !self.input_arguments.is_empty() { args.push("--input-arguments"); args.push(&input_args); } args.push(&self.plugin_filename); args.push("--tls-ca"); args.push("../testing-certs/rootCA.pem"); let output = self.server.run( vec!["test", "wal_plugin"], &args.iter().map(AsRef::as_ref).collect::>(), )?; // Parse the output as JSON let result = serde_json::from_str(&output).map_err(|e| { anyhow::anyhow!("Failed to parse WAL plugin test result as JSON: {}", e) })?; Ok(result) } } // Builder for the 'test schedule_plugin' command pub struct TestSchedulePluginQuery<'a> { server: &'a TestServer, db_name: String, plugin_name: String, schedule: String, cache_name: Option, input_arguments: Vec, } impl TestServer { pub fn test_schedule_plugin( &self, db_name: impl Into, plugin_name: impl Into, schedule: impl Into, ) -> TestSchedulePluginQuery { TestSchedulePluginQuery { server: self, db_name: db_name.into(), plugin_name: plugin_name.into(), schedule: schedule.into(), cache_name: None, input_arguments: Vec::new(), } } } impl TestSchedulePluginQuery<'_> { pub fn with_input_arguments( mut self, args: impl IntoIterator>, ) -> Self { self.input_arguments = args.into_iter().map(Into::into).collect(); self } pub fn with_cache_name(mut self, cache_name: impl Into) -> Self { self.cache_name = Some(cache_name.into()); self } pub fn add_input_argument(mut self, arg: impl Into) -> Self { self.input_arguments.push(arg.into()); self } pub fn run(self) -> Result { let mut args = vec![ "--database", self.db_name.as_str(), "--schedule", self.schedule.as_str(), "--tls-ca", "../testing-certs/rootCA.pem", ]; if let Some(cache_name) = &self.cache_name { args.push("--cache-name"); args.push(cache_name); } let input_argument = self.input_arguments.join(","); if !self.input_arguments.is_empty() { args.push("--input-arguments"); args.push(input_argument.as_str()); } args.push(self.plugin_name.as_str()); let result = self.server.run(vec!["test", "schedule_plugin"], &args)?; serde_json::from_str(&result).map_err(Into::into) } } // Builder for the 'package install' command pub struct InstallPackageQuery<'a> { server: &'a TestServer, packages: Vec, requirements_file: Option, virtual_env_location: Option, package_manager: Option, } impl TestServer { pub fn install_package(&self) -> InstallPackageQuery { InstallPackageQuery { server: self, packages: Vec::new(), requirements_file: None, virtual_env_location: None, package_manager: None, } } } impl InstallPackageQuery<'_> { pub fn with_packages(mut self, packages: impl IntoIterator>) -> Self { self.packages = packages.into_iter().map(|p| p.into()).collect(); self } pub fn add_package(mut self, package: impl Into) -> Self { self.packages.push(package.into()); self } pub fn with_requirements_file(mut self, requirements_file: impl Into) -> Self { self.requirements_file = Some(requirements_file.into()); self } pub fn with_virtual_env_location(mut self, virtual_env_location: impl Into) -> Self { self.virtual_env_location = Some(virtual_env_location.into()); self } pub fn with_package_manager(mut self, package_manager: impl Into) -> Self { self.package_manager = Some(package_manager.into()); self } pub fn run(self) -> Result { let mut args = Vec::new(); // Add the packages if specified if !self.packages.is_empty() { args.extend(self.packages.iter().map(|p| p.as_str())); } // Add optional arguments if let Some(req_file) = &self.requirements_file { args.push("--requirements"); args.push(req_file); } if let Some(venv) = &self.virtual_env_location { args.push("--virtual-env-location"); args.push(venv); } if let Some(pkg_mgr) = &self.package_manager { args.push("--package-manager"); args.push(pkg_mgr); } args.push("--tls-ca"); args.push("../testing-certs/rootCA.pem"); // Run the command self.server.run(vec!["package", "install"], &args) } } // Builder for the 'write' command pub struct WriteQuery<'a> { server: &'a TestServer, db_name: String, line_protocol: Option, input_file: Option, precision: Option, } impl TestServer { pub fn write(&self, db_name: impl Into) -> WriteQuery { WriteQuery { server: self, db_name: db_name.into(), line_protocol: None, input_file: None, precision: None, } } } impl WriteQuery<'_> { pub fn with_line_protocol(mut self, line_protocol: impl Into) -> Self { self.line_protocol = Some(line_protocol.into()); self } pub fn with_file(mut self, file_path: impl Into) -> Self { self.input_file = Some(file_path.into()); self } pub fn with_precision(mut self, precision: impl Into) -> Self { self.precision = Some(precision.into()); self } pub fn run(self) -> Result { let mut args = vec![ "--database", &self.db_name, "--tls-ca", "../testing-certs/rootCA.pem", ]; if let Some(precision) = &self.precision { args.push("--precision"); args.push(precision); } if let Some(file_path) = &self.input_file { args.push("--file"); args.push(file_path); return self.server.run(vec!["write"], &args); } if let Some(lp) = &self.line_protocol { // For line protocol provided directly as string args.push(lp); return self.server.run(vec!["write"], &args); } // If we get here, there's nothing to write bail!("No line protocol provided. Use with_line_protocol() or with_file().") } pub fn run_with_stdin(self, stdin_input: impl Into) -> Result { let mut args = vec![ "--database", &self.db_name, "--tls-ca", "../testing-certs/rootCA.pem", ]; if let Some(precision) = &self.precision { args.push("--precision"); args.push(precision); } let input = stdin_input.into(); self.server .run_with_options(vec!["write"], &args, Some(&input)) } } // Base struct for all "show system" subcommands pub struct ShowSystemQuery<'a> { server: &'a TestServer, db_name: String, format: Option, } // Specific struct for "table-list" subcommand pub struct ShowSystemTableListQuery<'a> { base: ShowSystemQuery<'a>, } // Specific struct for "table" subcommand pub struct ShowSystemTableQuery<'a> { base: ShowSystemQuery<'a>, system_table: String, limit: Option, order_by: Option, select: Option, } // Specific struct for "summary" subcommand pub struct ShowSystemSummaryQuery<'a> { base: ShowSystemQuery<'a>, limit: Option, } impl TestServer { // Entry point for show system commands pub fn show_system(&self, db_name: impl Into) -> ShowSystemQuery { ShowSystemQuery { server: self, db_name: db_name.into(), format: None, } } } impl<'a> ShowSystemQuery<'a> { // Common method to set output format for all system queries pub fn with_format(mut self, format: impl Into) -> Self { self.format = Some(format.into()); self } // Branch to table-list subcommand pub fn table_list(self) -> ShowSystemTableListQuery<'a> { ShowSystemTableListQuery { base: self } } // Branch to table subcommand pub fn table(self, system_table: impl Into) -> ShowSystemTableQuery<'a> { ShowSystemTableQuery { base: self, system_table: system_table.into(), limit: None, order_by: None, select: None, } } // Branch to summary subcommand pub fn summary(self) -> ShowSystemSummaryQuery<'a> { ShowSystemSummaryQuery { base: self, limit: None, } } } impl ShowSystemTableListQuery<'_> { // Run the table-list command pub fn run(self) -> Result { let mut args = vec![ "--database", &self.base.db_name, "table-list", "--tls-ca", "../testing-certs/rootCA.pem", ]; if let Some(format) = &self.base.format { args.push("--format"); args.push(format); } self.base.server.run(vec!["show", "system"], &args) } } impl ShowSystemTableQuery<'_> { // Set limit for table entries pub fn with_limit(mut self, limit: usize) -> Self { self.limit = Some(limit); self } // Set order by clause pub fn with_order_by(mut self, order_by: impl Into) -> Self { self.order_by = Some(order_by.into()); self } // Set select fields pub fn with_select(mut self, select: impl Into) -> Self { self.select = Some(select.into()); self } // Run the table command pub fn run(self) -> Result { let mut args = vec![ "--database", &self.base.db_name, "table", "--tls-ca", "../testing-certs/rootCA.pem", ]; if let Some(format) = &self.base.format { args.push("--format"); args.push(format); } let limit = self.limit.unwrap_or_default().to_string(); if self.limit.is_some() { args.push("--limit"); args.push(limit.as_str()); } if let Some(order_by) = &self.order_by { args.push("--order-by"); args.push(order_by); } if let Some(select) = &self.select { args.push("--select"); args.push(select); } // System table name is required args.push(&self.system_table); self.base.server.run(vec!["show", "system"], &args) } } impl ShowSystemSummaryQuery<'_> { // Set limit for summary entries pub fn with_limit(mut self, limit: usize) -> Self { self.limit = Some(limit); self } // Run the summary command pub fn run(self) -> Result { let mut args = vec![ "--database", &self.base.db_name, "summary", "--tls-ca", "../testing-certs/rootCA.pem", ]; if let Some(format) = &self.base.format { args.push("--format"); args.push(format); } let limit = self.limit.unwrap_or_default().to_string(); if self.limit.is_some() { args.push("--limit"); args.push(limit.as_str()); } self.base.server.run(vec!["show", "system"], &args) } }