use std::{ future::Future, io::{BufRead, BufReader}, process::{Child, Command, Stdio}, time::Duration, }; use arrow::record_batch::RecordBatch; use arrow_flight::{decode::FlightRecordBatchStream, FlightClient}; use assert_cmd::cargo::CommandCargoExt; use futures::TryStreamExt; use influxdb3_client::Precision; use influxdb_iox_client::flightsql::FlightSqlClient; use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; use reqwest::Response; mod auth; mod client; mod configure; mod flight; mod limits; #[cfg(feature = "system-py")] mod packages; mod ping; mod query; mod system_tables; mod write; pub trait ConfigProvider { /// Convert this to a set of command line arguments for `influxdb3 serve` fn as_args(&self) -> Vec; /// Get the auth token from this config if it was set fn auth_token(&self) -> Option<&str>; /// Spawn a new [`TestServer`] with this configuration /// /// This will run the `influxdb3 serve` command and bind its HTTP address to a random port /// on localhost. fn spawn(&self) -> impl Future where Self: Sized + Sync, { async { TestServer::spawn_inner(self).await } } } /// Configuration for a [`TestServer`] #[derive(Debug, Default)] pub struct TestConfig { auth_token: Option<(String, String)>, node_id: Option, plugin_dir: Option, virtual_env_dir: Option, package_manager: Option, // If None, use memory object store. object_store_dir: Option, } impl TestConfig { /// Set the auth token for this [`TestServer`] pub fn with_auth_token, R: Into>( mut self, hashed_token: S, raw_token: R, ) -> Self { self.auth_token = Some((hashed_token.into(), raw_token.into())); self } /// Set a host identifier prefix on the spawned [`TestServer`] pub fn with_node_id>(mut self, node_id: S) -> Self { self.node_id = Some(node_id.into()); self } /// Set the plugin dir for this [`TestServer`] pub fn with_plugin_dir>(mut self, plugin_dir: S) -> Self { self.plugin_dir = Some(plugin_dir.into()); self } /// Set the virtual env dir for this [`TestServer`] pub fn with_virtual_env>(mut self, virtual_env_dir: S) -> Self { self.virtual_env_dir = Some(virtual_env_dir.into()); self } pub fn with_package_manager>(mut self, package_manager: S) -> Self { self.package_manager = Some(package_manager.into()); self } // Set the object store dir for this [`TestServer`] pub fn with_object_store_dir>(mut self, object_store_dir: S) -> Self { self.object_store_dir = Some(object_store_dir.into()); self } } impl ConfigProvider for TestConfig { fn as_args(&self) -> Vec { let mut args = vec![]; if let Some((token, _)) = &self.auth_token { args.append(&mut vec!["--bearer-token".to_string(), token.to_owned()]); } if let Some(plugin_dir) = &self.plugin_dir { args.append(&mut vec!["--plugin-dir".to_string(), plugin_dir.to_owned()]); } if let Some(virtual_env_dir) = &self.virtual_env_dir { args.append(&mut vec![ "--virtual-env-location".to_string(), virtual_env_dir.to_owned(), ]); } if let Some(package_manager) = &self.package_manager { args.append(&mut vec![ "--package-manager".to_string(), package_manager.to_owned(), ]); } args.push("--node-id".to_string()); if let Some(host) = &self.node_id { args.push(host.to_owned()); } else { args.push("test-server".to_string()); } if let Some(object_store_dir) = &self.object_store_dir { args.append(&mut vec![ "--object-store".to_string(), "file".to_string(), "--data-dir".to_string(), object_store_dir.to_owned(), ]); } else { args.append(&mut vec![ "--object-store".to_string(), "memory".to_string(), ]); } args } fn auth_token(&self) -> Option<&str> { self.auth_token.as_ref().map(|(_, t)| t.as_str()) } } /// A running instance of the `influxdb3 serve` process /// /// Logs will be emitted to stdout/stderr if the TEST_LOG environment variable is set, e.g., /// ``` /// TEST_LOG= cargo nextest run -p influxdb3 --nocapture /// ``` /// This will forward the value provided in `TEST_LOG` to the `LOG_FILTER` env var on the running /// `influxdb` binary. By default, a log filter of `info` is used, which would provide similar /// output to what is seen in production, however, per-crate filters can be provided via this /// argument, e.g., `info,influxdb3_write=debug` would emit logs at `INFO` level for all crates /// except for the `influxdb3_write` crate, which will emit logs at the `DEBUG` level. pub struct TestServer { auth_token: Option, bind_addr: String, server_process: Child, http_client: reqwest::Client, } impl TestServer { /// Spawn a new [`TestServer`] /// /// This will run the `influxdb3 serve` command, and bind its HTTP /// address to a random port on localhost. pub async fn spawn() -> Self { Self::spawn_inner(&TestConfig::default()).await } /// Configure a [`TestServer`] before spawning pub fn configure() -> TestConfig { TestConfig::default() } async fn spawn_inner(config: &impl ConfigProvider) -> Self { let mut command = Command::cargo_bin("influxdb3").expect("create the influxdb3 command"); let command = command .arg("serve") .arg("--disable-telemetry-upload") // bind to port 0 to get a random port assigned: .args(["--http-bind", "0.0.0.0:0"]) .args(["--wal-flush-interval", "10ms"]) .args(config.as_args()) .stdout(Stdio::piped()); // Use the TEST_LOG env var to determine if logs are emitted from the spawned process let emit_logs = if std::env::var("TEST_LOG").is_ok() { // use "info" filter, as would be used in production: command.env("LOG_FILTER", "info"); true } else { false }; let mut server_process = command.spawn().expect("spawn the influxdb3 server process"); // pipe stdout so we can get the randomly assigned port from the log output: let process_stdout = server_process .stdout .take() .expect("should acquire stdout from process"); let mut lines = BufReader::new(process_stdout).lines(); let bind_addr = loop { let Some(Ok(line)) = lines.next() else { panic!("stdout closed unexpectedly"); }; if emit_logs { println!("{line}"); } if line.contains("startup time") { if let Some(address) = line.split("address=").last() { break address.to_string(); } } }; tokio::task::spawn_blocking(move || { for line in lines { let line = line.expect("io error while getting line from stdout"); if emit_logs { println!("{line}"); } } }); let server = Self { auth_token: config.auth_token().map(|s| s.to_owned()), bind_addr, server_process, http_client: reqwest::Client::new(), }; server.wait_until_ready().await; server } /// Get the URL of the running service for use with an HTTP client pub fn client_addr(&self) -> String { format!("http://{addr}", addr = self.bind_addr) } /// Get a [`FlightSqlClient`] for making requests to the running service over gRPC pub async fn flight_sql_client(&self, database: &str) -> FlightSqlClient { let channel = tonic::transport::Channel::from_shared(self.client_addr()) .expect("create tonic channel") .connect() .await .expect("connect to gRPC client"); let mut client = FlightSqlClient::new(channel); client.add_header("database", database).unwrap(); client } /// Get a raw [`FlightClient`] for performing Flight actions directly pub async fn flight_client(&self) -> FlightClient { let channel = tonic::transport::Channel::from_shared(self.client_addr()) .expect("create tonic channel") .connect() .await .expect("connect to gRPC client"); FlightClient::new(channel) } pub fn kill(&mut self) { self.server_process.kill().expect("kill the server process"); } async fn wait_until_ready(&self) { let mut count = 0; while self .http_client .get(format!("{base}/health", base = self.client_addr())) .send() .await .is_err() { if count > 500 { panic!("server failed to start"); } else { count += 1; } tokio::time::sleep(Duration::from_millis(10)).await; } } } impl Drop for TestServer { fn drop(&mut self) { self.kill(); } } impl TestServer { /// Write some line protocol to the server pub async fn write_lp_to_db( &self, database: &str, lp: impl ToString, precision: Precision, ) -> Result<(), influxdb3_client::Error> { let mut client = influxdb3_client::Client::new(self.client_addr()).unwrap(); if let Some(token) = &self.auth_token { client = client.with_auth_token(token); } client .api_v3_write_lp(database) .body(lp.to_string()) .precision(precision) .send() .await } pub async fn api_v3_query_sql_with_header( &self, params: &[(&str, &str)], headers: HeaderMap, ) -> Response { self.http_client .get(format!( "{base}/api/v3/query_sql", base = self.client_addr() )) .query(params) .headers(headers) .send() .await .expect("send /api/v3/query_sql request to server") } pub async fn api_v3_query_sql(&self, params: &[(&str, &str)]) -> Response { self.http_client .get(format!( "{base}/api/v3/query_sql", base = self.client_addr() )) .query(params) .send() .await .expect("send /api/v3/query_sql request to server") } pub async fn api_v3_query_influxql(&self, params: &[(&str, &str)]) -> Response { self.http_client .get(format!( "{base}/api/v3/query_influxql", base = self.client_addr() )) .query(params) .send() .await .expect("send /api/v3/query_influxql request to server") } pub async fn api_v1_query( &self, params: &[(&str, &str)], headers: Option<&[(&str, &str)]>, ) -> Response { let default_headers = [("Accept", "application/json")]; let headers = headers.unwrap_or(&default_headers); let mut header_map = HeaderMap::new(); for (key, value) in headers { header_map.insert( HeaderName::from_bytes(key.as_bytes()).expect("Invalid header key"), HeaderValue::from_bytes(value.as_bytes()).expect("Invalid header value"), ); } self.http_client .get(format!("{base}/query", base = self.client_addr(),)) .headers(header_map) .query(params) .send() .await .expect("send /query request to server") } pub async fn api_v3_configure_table_create(&self, request: &serde_json::Value) -> Response { self.http_client .post(format!( "{base}/api/v3/configure/table", base = self.client_addr() )) .json(request) .send() .await .expect("failed to send request to create table") } pub async fn api_v3_configure_last_cache_create( &self, request: &serde_json::Value, ) -> Response { self.http_client .post(format!( "{base}/api/v3/configure/last_cache", base = self.client_addr() )) .json(request) .send() .await .expect("failed to send request to create last cache") } pub async fn api_v3_configure_last_cache_delete( &self, request: &serde_json::Value, ) -> Response { self.http_client .delete(format!( "{base}/api/v3/configure/last_cache", base = self.client_addr() )) .json(request) .send() .await .expect("failed to send request to delete last cache") } pub async fn api_v3_configure_distinct_cache_create( &self, request: &serde_json::Value, ) -> Response { self.http_client .post(format!( "{base}/api/v3/configure/distinct_cache", base = self.client_addr() )) .json(request) .send() .await .expect("failed to send request to create distinct cache") } pub async fn api_v3_configure_distinct_cache_delete( &self, request: &serde_json::Value, ) -> Response { self.http_client .delete(format!( "{base}/api/v3/configure/distinct_cache", base = self.client_addr() )) .json(request) .send() .await .expect("failed to send request to delete distinct cache") } } /// Write to the server with the line protocol pub async fn write_lp_to_db( server: &TestServer, database: &str, lp: &str, precision: Precision, ) -> Result<(), influxdb3_client::Error> { let client = influxdb3_client::Client::new(server.client_addr()).unwrap(); client .api_v3_write_lp(database) .body(lp.to_string()) .precision(precision) .send() .await } #[allow(dead_code)] pub async fn collect_stream(stream: FlightRecordBatchStream) -> Vec { stream .try_collect() .await .expect("gather record batch stream") }