use std::{
    future::Future,
    io::{BufRead, BufReader},
    process::{Child, Command, Stdio},
    time::Duration,
};

use arrow::record_batch::RecordBatch;
use arrow_flight::{decode::FlightRecordBatchStream, FlightClient};
use assert_cmd::cargo::CommandCargoExt;
use futures::TryStreamExt;
use influxdb3_client::Precision;
use influxdb_iox_client::flightsql::FlightSqlClient;
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use reqwest::Response;

mod auth;
mod client;
mod configure;
mod flight;
mod limits;
#[cfg(feature = "system-py")]
mod packages;
mod ping;
mod query;
mod system_tables;
mod write;

pub trait ConfigProvider {
    /// Convert this to a set of command line arguments for `influxdb3 serve`
    fn as_args(&self) -> Vec<String>;

    /// Get the auth token from this config if it was set
    fn auth_token(&self) -> Option<&str>;

    /// Spawn a new [`TestServer`] with this configuration
    ///
    /// This will run the `influxdb3 serve` command and bind its HTTP address to a random port
    /// on localhost.
    fn spawn(&self) -> impl Future<Output = TestServer>
    where
        Self: Sized + Sync,
    {
        async { TestServer::spawn_inner(self).await }
    }
}
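
// A minimal sketch of a custom `ConfigProvider`, illustrating what an implementation
// must supply beyond the provided `spawn` method. The `MinimalConfig` type and its
// argument values are hypothetical and not part of the test suite; `TestConfig` below
// is the provider used in practice.
#[allow(dead_code)]
struct MinimalConfig;

impl ConfigProvider for MinimalConfig {
    fn as_args(&self) -> Vec<String> {
        // Run against the in-memory object store with a fixed node id.
        vec![
            "--node-id".to_string(),
            "minimal-config".to_string(),
            "--object-store".to_string(),
            "memory".to_string(),
        ]
    }

    fn auth_token(&self) -> Option<&str> {
        // No auth token is configured for this sketch.
        None
    }
}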

/// Configuration for a [`TestServer`]
#[derive(Debug, Default)]
pub struct TestConfig {
    auth_token: Option<(String, String)>,
    node_id: Option<String>,
    plugin_dir: Option<String>,
    virtual_env_dir: Option<String>,
    package_manager: Option<String>,
    // If None, use memory object store.
    object_store_dir: Option<String>,
}

impl TestConfig {
    /// Set the auth token for this [`TestServer`]
    pub fn with_auth_token<S: Into<String>, R: Into<String>>(
        mut self,
        hashed_token: S,
        raw_token: R,
    ) -> Self {
        self.auth_token = Some((hashed_token.into(), raw_token.into()));
        self
    }

    /// Set a host identifier prefix on the spawned [`TestServer`]
    pub fn with_node_id<S: Into<String>>(mut self, node_id: S) -> Self {
        self.node_id = Some(node_id.into());
        self
    }

    /// Set the plugin dir for this [`TestServer`]
    pub fn with_plugin_dir<S: Into<String>>(mut self, plugin_dir: S) -> Self {
        self.plugin_dir = Some(plugin_dir.into());
        self
    }

    /// Set the virtual env dir for this [`TestServer`]
    pub fn with_virtual_env<S: Into<String>>(mut self, virtual_env_dir: S) -> Self {
        self.virtual_env_dir = Some(virtual_env_dir.into());
        self
    }

    /// Set the package manager for this [`TestServer`]
    pub fn with_package_manager<S: Into<String>>(mut self, package_manager: S) -> Self {
        self.package_manager = Some(package_manager.into());
        self
    }

    /// Set the object store dir for this [`TestServer`]
    pub fn with_object_store_dir<S: Into<String>>(mut self, object_store_dir: S) -> Self {
        self.object_store_dir = Some(object_store_dir.into());
        self
    }
}

impl ConfigProvider for TestConfig {
    fn as_args(&self) -> Vec<String> {
        let mut args = vec![];
        if let Some((token, _)) = &self.auth_token {
            args.append(&mut vec!["--bearer-token".to_string(), token.to_owned()]);
        }
        if let Some(plugin_dir) = &self.plugin_dir {
            args.append(&mut vec!["--plugin-dir".to_string(), plugin_dir.to_owned()]);
        }
        if let Some(virtual_env_dir) = &self.virtual_env_dir {
            args.append(&mut vec![
                "--virtual-env-location".to_string(),
                virtual_env_dir.to_owned(),
            ]);
        }
        if let Some(package_manager) = &self.package_manager {
            args.append(&mut vec![
                "--package-manager".to_string(),
                package_manager.to_owned(),
            ]);
        }
        args.push("--node-id".to_string());
        if let Some(host) = &self.node_id {
            args.push(host.to_owned());
        } else {
            args.push("test-server".to_string());
        }
        if let Some(object_store_dir) = &self.object_store_dir {
            args.append(&mut vec![
                "--object-store".to_string(),
                "file".to_string(),
                "--data-dir".to_string(),
                object_store_dir.to_owned(),
            ]);
        } else {
            args.append(&mut vec![
                "--object-store".to_string(),
                "memory".to_string(),
            ]);
        }
        args
    }

    fn auth_token(&self) -> Option<&str> {
        self.auth_token.as_ref().map(|(_, t)| t.as_str())
    }
}
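
// A minimal usage sketch: tests typically build a `TestConfig` with the builder
// methods above and spawn a server from it via `ConfigProvider::spawn`. The node id
// and the assertion below are illustrative placeholders.
#[tokio::test]
async fn example_configure_and_spawn() {
    let server = TestServer::configure()
        .with_node_id("example-node")
        .spawn()
        .await;
    // The server binds a random localhost port; `client_addr` exposes it for HTTP clients.
    assert!(server.client_addr().starts_with("http://"));
}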

/// A running instance of the `influxdb3 serve` process
///
/// Logs will be emitted to stdout/stderr if the TEST_LOG environment variable is set, e.g.,
/// ```
/// TEST_LOG= cargo nextest run -p influxdb3 --nocapture
/// ```
/// This will forward the value provided in `TEST_LOG` to the `LOG_FILTER` env var on the running
/// `influxdb3` binary. By default, a log filter of `info` is used, which provides output similar
/// to what is seen in production; however, per-crate filters can be provided via this variable,
/// e.g., `info,influxdb3_write=debug` would emit logs at the `INFO` level for all crates except
/// the `influxdb3_write` crate, which will emit logs at the `DEBUG` level.
pub struct TestServer {
    auth_token: Option<String>,
    bind_addr: String,
    server_process: Child,
    http_client: reqwest::Client,
}

impl TestServer {
    /// Spawn a new [`TestServer`]
    ///
    /// This will run the `influxdb3 serve` command, and bind its HTTP
    /// address to a random port on localhost.
    pub async fn spawn() -> Self {
        Self::spawn_inner(&TestConfig::default()).await
    }

    /// Configure a [`TestServer`] before spawning
    pub fn configure() -> TestConfig {
        TestConfig::default()
    }

    async fn spawn_inner(config: &impl ConfigProvider) -> Self {
        let mut command = Command::cargo_bin("influxdb3").expect("create the influxdb3 command");
        let command = command
            .arg("serve")
            .arg("--disable-telemetry-upload")
            // bind to port 0 to get a random port assigned:
            .args(["--http-bind", "0.0.0.0:0"])
            .args(["--wal-flush-interval", "10ms"])
            .args(config.as_args())
            .stdout(Stdio::piped());

        // Use the TEST_LOG env var to determine if logs are emitted from the spawned process
        let emit_logs = if let Ok(filter) = std::env::var("TEST_LOG") {
            // forward a non-empty TEST_LOG value as the log filter; otherwise use the
            // "info" filter, as would be used in production:
            command.env(
                "LOG_FILTER",
                if filter.is_empty() { "info".to_string() } else { filter },
            );
            true
        } else {
            false
        };

        let mut server_process = command.spawn().expect("spawn the influxdb3 server process");

        // pipe stdout so we can get the randomly assigned port from the log output:
        let process_stdout = server_process
            .stdout
            .take()
            .expect("should acquire stdout from process");

        let mut lines = BufReader::new(process_stdout).lines();
        let bind_addr = loop {
            let Some(Ok(line)) = lines.next() else {
                panic!("stdout closed unexpectedly");
            };
            if emit_logs {
                println!("{line}");
            }
            if line.contains("startup time") {
                if let Some(address) = line.split("address=").last() {
                    break address.to_string();
                }
            }
        };

        tokio::task::spawn_blocking(move || {
            for line in lines {
                let line = line.expect("io error while getting line from stdout");
                if emit_logs {
                    println!("{line}");
                }
            }
        });

        let server = Self {
            auth_token: config.auth_token().map(|s| s.to_owned()),
            bind_addr,
            server_process,
            http_client: reqwest::Client::new(),
        };

        server.wait_until_ready().await;
        server
    }

    /// Get the URL of the running service for use with an HTTP client
    pub fn client_addr(&self) -> String {
        format!("http://{addr}", addr = self.bind_addr)
    }

    /// Get a [`FlightSqlClient`] for making requests to the running service over gRPC
    pub async fn flight_sql_client(&self, database: &str) -> FlightSqlClient {
        let channel = tonic::transport::Channel::from_shared(self.client_addr())
            .expect("create tonic channel")
            .connect()
            .await
            .expect("connect to gRPC client");
        let mut client = FlightSqlClient::new(channel);
        client.add_header("database", database).unwrap();
        client
    }

    /// Get a raw [`FlightClient`] for performing Flight actions directly
    pub async fn flight_client(&self) -> FlightClient {
        let channel = tonic::transport::Channel::from_shared(self.client_addr())
            .expect("create tonic channel")
            .connect()
            .await
            .expect("connect to gRPC client");
        FlightClient::new(channel)
    }

    pub fn kill(&mut self) {
        self.server_process.kill().expect("kill the server process");
    }

    async fn wait_until_ready(&self) {
        let mut count = 0;
        while self
            .http_client
            .get(format!("{base}/health", base = self.client_addr()))
            .send()
            .await
            .is_err()
        {
            if count > 500 {
                panic!("server failed to start");
            } else {
                count += 1;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    }
}

impl Drop for TestServer {
    fn drop(&mut self) {
        self.kill();
    }
}

impl TestServer {
    /// Write some line protocol to the server
    pub async fn write_lp_to_db(
        &self,
        database: &str,
        lp: impl ToString,
        precision: Precision,
    ) -> Result<(), influxdb3_client::Error> {
        let mut client = influxdb3_client::Client::new(self.client_addr()).unwrap();
        if let Some(token) = &self.auth_token {
            client = client.with_auth_token(token);
        }
        client
            .api_v3_write_lp(database)
            .body(lp.to_string())
            .precision(precision)
            .send()
            .await
    }

    pub async fn api_v3_query_sql_with_header(
        &self,
        params: &[(&str, &str)],
        headers: HeaderMap<HeaderValue>,
    ) -> Response {
        self.http_client
            .get(format!(
                "{base}/api/v3/query_sql",
                base = self.client_addr()
            ))
            .query(params)
            .headers(headers)
            .send()
            .await
            .expect("send /api/v3/query_sql request to server")
    }

    pub async fn api_v3_query_sql(&self, params: &[(&str, &str)]) -> Response {
        self.http_client
            .get(format!(
                "{base}/api/v3/query_sql",
                base = self.client_addr()
            ))
            .query(params)
            .send()
            .await
            .expect("send /api/v3/query_sql request to server")
    }

    pub async fn api_v3_query_influxql(&self, params: &[(&str, &str)]) -> Response {
        self.http_client
            .get(format!(
                "{base}/api/v3/query_influxql",
                base = self.client_addr()
            ))
            .query(params)
            .send()
            .await
            .expect("send /api/v3/query_influxql request to server")
    }

    pub async fn api_v1_query(
        &self,
        params: &[(&str, &str)],
        headers: Option<&[(&str, &str)]>,
    ) -> Response {
        let default_headers = [("Accept", "application/json")];
        let headers = headers.unwrap_or(&default_headers);

        let mut header_map = HeaderMap::new();
        for (key, value) in headers {
            header_map.insert(
                HeaderName::from_bytes(key.as_bytes()).expect("Invalid header key"),
                HeaderValue::from_bytes(value.as_bytes()).expect("Invalid header value"),
            );
        }

        self.http_client
            .get(format!("{base}/query", base = self.client_addr()))
            .headers(header_map)
            .query(params)
            .send()
            .await
            .expect("send /query request to server")
    }

    pub async fn api_v3_configure_table_create(&self, request: &serde_json::Value) -> Response {
        self.http_client
            .post(format!(
                "{base}/api/v3/configure/table",
                base = self.client_addr()
            ))
            .json(request)
            .send()
            .await
            .expect("failed to send request to create table")
    }

    pub async fn api_v3_configure_last_cache_create(
        &self,
        request: &serde_json::Value,
    ) -> Response {
        self.http_client
            .post(format!(
                "{base}/api/v3/configure/last_cache",
                base = self.client_addr()
            ))
            .json(request)
            .send()
            .await
            .expect("failed to send request to create last cache")
    }

    pub async fn api_v3_configure_last_cache_delete(
        &self,
        request: &serde_json::Value,
    ) -> Response {
        self.http_client
            .delete(format!(
                "{base}/api/v3/configure/last_cache",
                base = self.client_addr()
            ))
            .json(request)
            .send()
            .await
            .expect("failed to send request to delete last cache")
    }

    pub async fn api_v3_configure_distinct_cache_create(
        &self,
        request: &serde_json::Value,
    ) -> Response {
        self.http_client
            .post(format!(
                "{base}/api/v3/configure/distinct_cache",
                base = self.client_addr()
            ))
            .json(request)
            .send()
            .await
            .expect("failed to send request to create distinct cache")
    }

    pub async fn api_v3_configure_distinct_cache_delete(
        &self,
        request: &serde_json::Value,
    ) -> Response {
        self.http_client
            .delete(format!(
                "{base}/api/v3/configure/distinct_cache",
                base = self.client_addr()
            ))
            .json(request)
            .send()
            .await
            .expect("failed to send request to delete distinct cache")
    }
}
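
// A minimal sketch of the common write-then-query flow these helpers enable: write line
// protocol with `write_lp_to_db` and read it back over the HTTP query API. The database
// name, table, and tag values are illustrative placeholders, and the `db`/`q`/`format`
// query parameters are assumed to be accepted by the `/api/v3/query_sql` endpoint.
#[tokio::test]
async fn example_write_then_query_sql() {
    let server = TestServer::spawn().await;
    server
        .write_lp_to_db(
            "example_db",
            "cpu,host=a usage=0.5 1234567890",
            Precision::Second,
        )
        .await
        .expect("write line protocol to the server");
    let response = server
        .api_v3_query_sql(&[
            ("db", "example_db"),
            ("q", "SELECT host, usage FROM cpu"),
            ("format", "json"),
        ])
        .await;
    assert!(response.status().is_success());
}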

/// Write to the server with the line protocol
pub async fn write_lp_to_db(
    server: &TestServer,
    database: &str,
    lp: &str,
    precision: Precision,
) -> Result<(), influxdb3_client::Error> {
    let client = influxdb3_client::Client::new(server.client_addr()).unwrap();
    client
        .api_v3_write_lp(database)
        .body(lp.to_string())
        .precision(precision)
        .send()
        .await
}

#[allow(dead_code)]
pub async fn collect_stream(stream: FlightRecordBatchStream) -> Vec<RecordBatch> {
    stream
        .try_collect()
        .await
        .expect("gather record batch stream")
}
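
// A minimal sketch pairing `flight_sql_client` with `collect_stream`: run a SQL query
// over Flight SQL and gather the resulting record batches. The database name and query
// are illustrative placeholders, and this assumes `FlightSqlClient::query` yields a
// stream compatible with `collect_stream`.
#[tokio::test]
async fn example_flight_sql_collect() {
    let server = TestServer::spawn().await;
    server
        .write_lp_to_db(
            "example_db",
            "cpu,host=a usage=0.5 1234567890",
            Precision::Second,
        )
        .await
        .expect("write line protocol to the server");
    let mut client = server.flight_sql_client("example_db").await;
    let stream = client
        .query("SELECT host, usage FROM cpu".to_string())
        .await
        .expect("run the Flight SQL query");
    let batches = collect_stream(stream).await;
    assert!(!batches.is_empty());
}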