diff --git a/.circleci/config.yml b/.circleci/config.yml index 6000ceebef..c636cf9c6f 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -99,6 +99,29 @@ jobs: command: cargo test --workspace - cache_save + # Integration tests for the influxdb2_client crate against InfluxDB 2.0 OSS. + test_influxdb2_client: + docker: + - image: quay.io/influxdb/rust:ci + environment: + # Disable full debug symbol generation to speed up CI build + # "1" means line tables only, which is useful for panic tracebacks. + RUSTFLAGS: "-C debuginfo=1" + steps: + - checkout + - run: + name: Install InfluxDB 2.0 OSS + command: | + curl -o influxdb2.tar.gz https://dl.influxdata.com/influxdb/releases/influxdb2-2.0.4-linux-amd64.tar.gz + tar xvzf influxdb2.tar.gz + sudo cp influxdb2-2.0.4-linux-amd64/{influx,influxd} /usr/local/bin/ + - rust_nightly + - cache_restore + - run: + name: Cargo test + command: TEST_INTEGRATION=1 cargo test -p influxdb2_client + - cache_save + # Build a dev binary. # # Compiles a binary with the default ("dev") cargo profile from the iox source @@ -209,6 +232,7 @@ workflows: - lint - protobuf-lint - test + - test_influxdb2_client - build # Internal pipeline for perf builds. diff --git a/Cargo.lock b/Cargo.lock index e3abffd53a..ee951f1fd6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1423,10 +1423,13 @@ dependencies = [ "bytes", "futures", "mockito", + "once_cell", + "parking_lot", "reqwest", "serde", "serde_json", "snafu", + "test_helpers", "tokio", ] diff --git a/docs/testing.md b/docs/testing.md index 9e424127ec..7f853bcf94 100644 --- a/docs/testing.md +++ b/docs/testing.md @@ -30,21 +30,12 @@ Other than possibly configuring multiple object stores, configuring the tests to store services is the same as configuring the server to use an object store service. See the output of `influxdb_iox run --help` for instructions. -## InfluxDB IOx Client +## InfluxDB 2 Client -The `influxdb_iox_client` crate might be used by people who are using a managed IOx server. In -other words, they might only use the `influxdb_iox_client` crate and not the rest of the crates in -this workspace. The tests in `influxdb_iox_client` see an IOx server in the same way as IOx servers -see the object store services: sometimes you'll want to run the tests against an actual server, and -sometimes you won't. +The `influxdb2_client` crate may be used by people using InfluxDB 2.0 OSS, and should be compatible +with both that and IOx. If you have `influxd` in your path, the integration tests for the +`influxdb2_client` crate will run integration tests against `influxd`. If you do not have +`influxd`, those tests will not be run and will silently pass. -Like in the `object_store` crate, the `influxdb_iox_client` crate's tests use the -`TEST_INTEGRATION` environment variable to enforce running tests that use an actual IOx server. -Running `cargo test -p influxdb_iox_client` will silently pass tests that contact a server. - -Start an IOx server in one terminal and run `TEST_INTEGRATION=1 -TEST_IOX_ENDPOINT=http://127.0.0.1:8080 cargo test -p influxdb_iox_client` in another (where -`http://127.0.0.1:8080` is the address to the IOx HTTP server) to run the client tests against the -server. If you set `TEST_INTEGRATION` but not `TEST_IOX_ENDPOINT`, the integration tests will fail -because of the missed configuration. If you set `TEST_IOX_ENDPOINT` but not `TEST_INTEGRATION`, the -integration tests will be run. +To ensure you're running the `influxdb2_client` integration tests, you can run `TEST_INTEGRATION=1 +cargo test -p influxdb2_client`, which will fail the tests if `influxd` is not available. diff --git a/influxdb2_client/Cargo.toml b/influxdb2_client/Cargo.toml index b4fc4a7354..d18a43bfbe 100644 --- a/influxdb2_client/Cargo.toml +++ b/influxdb2_client/Cargo.toml @@ -14,4 +14,7 @@ snafu = "0.6.6" [dev-dependencies] # In alphabetical order mockito = "0.26.0" +once_cell = { version = "1.4.0", features = ["parking_lot"] } +parking_lot = "0.11.1" tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] } +test_helpers = { path = "../test_helpers" } diff --git a/influxdb2_client/src/api/ready.rs b/influxdb2_client/src/api/ready.rs index 504b081771..6d86d3d06c 100644 --- a/influxdb2_client/src/api/ready.rs +++ b/influxdb2_client/src/api/ready.rs @@ -35,13 +35,9 @@ mod tests { #[tokio::test] async fn ready() { - let token = "some-token"; + let mock_server = mock("GET", "/ready").create(); - let mock_server = mock("GET", "/ready") - .match_header("Authorization", format!("Token {}", token).as_str()) - .create(); - - let client = Client::new(&mockito::server_url(), token); + let client = Client::new(&mockito::server_url(), ""); let _result = client.ready().await; diff --git a/influxdb2_client/src/api/setup.rs b/influxdb2_client/src/api/setup.rs index 3b2ac4eb59..49d6181437 100644 --- a/influxdb2_client/src/api/setup.rs +++ b/influxdb2_client/src/api/setup.rs @@ -60,7 +60,7 @@ impl Client { .context(ReqwestProcessing)?; match response.status() { - StatusCode::OK => Ok(response + StatusCode::CREATED => Ok(response .json::() .await .context(ReqwestProcessing)?), @@ -100,7 +100,7 @@ impl Client { .context(ReqwestProcessing)?; match response.status() { - StatusCode::OK => Ok(response + StatusCode::CREATED => Ok(response .json::() .await .context(ReqwestProcessing)?), @@ -119,13 +119,9 @@ mod tests { #[tokio::test] async fn is_onboarding_allowed() { - let token = "some-token"; + let mock_server = mock("GET", "/api/v2/setup").create(); - let mock_server = mock("GET", "/api/v2/setup") - .match_header("Authorization", format!("Token {}", token).as_str()) - .create(); - - let client = Client::new(&mockito::server_url(), token); + let client = Client::new(&mockito::server_url(), ""); let _result = client.is_onboarding_allowed().await; @@ -142,7 +138,6 @@ mod tests { let retention_period_hrs = 1; let mock_server = mock("POST", "/api/v2/setup") - .match_header("Authorization", format!("Token {}", token).as_str()) .match_body( format!( r#"{{"username":"{}","org":"{}","bucket":"{}","password":"{}","retentionPeriodHrs":{}}}"#, @@ -204,13 +199,11 @@ mod tests { #[tokio::test] async fn onboarding_opt() { - let token = "some-token"; let username = "some-user"; let org = "some-org"; let bucket = "some-bucket"; let mock_server = mock("POST", "/api/v2/setup") - .match_header("Authorization", format!("Token {}", token).as_str()) .match_body( format!( r#"{{"username":"{}","org":"{}","bucket":"{}"}}"#, @@ -220,7 +213,7 @@ mod tests { ) .create(); - let client = Client::new(&mockito::server_url(), token); + let client = Client::new(&mockito::server_url(), ""); let _result = client .onboarding(username, org, bucket, None, None, None) diff --git a/influxdb2_client/src/lib.rs b/influxdb2_client/src/lib.rs index 9c417b6609..0fe80f80c7 100644 --- a/influxdb2_client/src/lib.rs +++ b/influxdb2_client/src/lib.rs @@ -64,10 +64,7 @@ use futures::{Stream, StreamExt}; use reqwest::{Body, Method}; use serde::Serialize; use snafu::{ResultExt, Snafu}; -use std::{ - fmt, - io::{self, Write}, -}; +use std::io::{self, Write}; pub mod data_point; pub use data_point::{DataPoint, FieldValue, WriteDataPoint}; @@ -106,7 +103,7 @@ pub enum RequestError { pub struct Client { /// The base URL this client sends requests to pub url: String, - auth_header: String, + auth_header: Option, reqwest: reqwest::Client, } @@ -120,19 +117,30 @@ impl Client { /// ``` /// let client = influxdb2_client::Client::new("http://localhost:8888", "my-token"); /// ``` - pub fn new(url: impl Into, auth_token: impl fmt::Display) -> Self { + pub fn new(url: impl Into, auth_token: impl Into) -> Self { + let token = auth_token.into(); + let auth_header = if token.is_empty() { + None + } else { + Some(format!("Token {}", token)) + }; + Self { url: url.into(), - auth_header: format!("Token {}", auth_token), + auth_header, reqwest: reqwest::Client::new(), } } /// Consolidate common request building code fn request(&self, method: Method, url: &str) -> reqwest::RequestBuilder { - self.reqwest - .request(method, url) - .header("Authorization", &self.auth_header) + let mut req = self.reqwest.request(method, url); + + if let Some(auth) = &self.auth_header { + req = req.header("Authorization", auth); + } + + req } /// Write line protocol data to the specified organization and bucket. diff --git a/influxdb2_client/tests/common/mod.rs b/influxdb2_client/tests/common/mod.rs new file mode 100644 index 0000000000..a057eb4940 --- /dev/null +++ b/influxdb2_client/tests/common/mod.rs @@ -0,0 +1 @@ +pub mod server_fixture; diff --git a/influxdb2_client/tests/common/server_fixture.rs b/influxdb2_client/tests/common/server_fixture.rs new file mode 100644 index 0000000000..d5381386d5 --- /dev/null +++ b/influxdb2_client/tests/common/server_fixture.rs @@ -0,0 +1,298 @@ +use once_cell::sync::OnceCell; +use std::{ + fs::File, + process::{Child, Command}, + sync::{ + atomic::{AtomicUsize, Ordering::SeqCst}, + Arc, Weak, + }, + time::Duration, +}; +use tokio::sync::Mutex; + +type Result> = std::result::Result; + +#[macro_export] +/// If InfluxDB 2.0 OSS is available in the PATH at `influxd`, set up the server +/// as requested return it to the caller. +/// +/// If `influxd` is not available, skip the calling test by returning +/// early. Additionally if `TEST_INTEGRATION` is set, turn this early return +/// into a panic to force a hard fail for skipped integration tests. +macro_rules! maybe_skip_integration { + ($server_fixture:expr) => { + match ( + std::process::Command::new("which") + .arg("influxd") + .stdout(std::process::Stdio::null()) + .status() + .expect("should be able to run `which`") + .success(), + std::env::var("TEST_INTEGRATION").is_ok(), + ) { + (true, _) => $server_fixture, + (false, true) => { + panic!( + "TEST_INTEGRATION is set which requires running integration tests, but \ + `influxd` is not available" + ) + } + _ => { + eprintln!("skipping integration test - install `influxd` to run"); + return Ok(()); + } + } + }; +} + +/// Represents a server that has been started and is available for +/// testing. +pub struct ServerFixture { + server: Arc, +} + +impl ServerFixture { + /// Create a new server fixture and wait for it to be ready. This + /// is called "create" rather than new because it is async and + /// waits. The shared database can be used immediately. + /// + /// This is currently implemented as a singleton so all tests *must* + /// use a new database and not interfere with the existing database. + pub async fn create_shared() -> Self { + // Try and reuse the same shared server, if there is already + // one present + static SHARED_SERVER: OnceCell>> = OnceCell::new(); + + let shared_server = SHARED_SERVER.get_or_init(|| parking_lot::Mutex::new(Weak::new())); + + let mut shared_server = shared_server.lock(); + + // is a shared server already present? + let server = match shared_server.upgrade() { + Some(server) => server, + None => { + // if not, create one + let mut server = TestServer::new().expect("Could start test server"); + // ensure the server is ready + server.wait_until_ready(InitialConfig::Onboarded).await; + + let server = Arc::new(server); + // save a reference for other threads that may want to + // use this server, but don't prevent it from being + // destroyed when going out of scope + *shared_server = Arc::downgrade(&server); + server + } + }; + std::mem::drop(shared_server); + + Self { server } + } + + /// Create a new server fixture and wait for it to be ready. This + /// is called "create" rather than new because it is async and + /// waits. The database is left unconfigured and is not shared + /// with any other tests. + pub async fn create_single_use() -> Self { + let mut server = TestServer::new().expect("Could start test server"); + + // ensure the server is ready + server.wait_until_ready(InitialConfig::None).await; + + let server = Arc::new(server); + + Self { server } + } + + /// Return a client suitable for communicating with this server + pub fn client(&self) -> influxdb2_client::Client { + match self.server.admin_token.as_ref() { + Some(token) => influxdb2_client::Client::new(self.http_base(), token), + None => influxdb2_client::Client::new(self.http_base(), ""), + } + } + + /// Return the http base URL for the HTTP API + pub fn http_base(&self) -> &str { + &self.server.http_base + } +} + +/// Specifies whether the server should be set up initially +#[derive(Debug, Copy, Clone, PartialEq)] +enum InitialConfig { + /// Don't set up the server, the test will (for testing onboarding) + None, + /// Onboard the server and set up the client with the associated token (for + /// most tests) + Onboarded, +} + +// These port numbers are chosen to not collide with a development ioxd/influxd +// server running locally. +// TODO(786): allocate random free ports instead of hardcoding. +// TODO(785): we cannot use localhost here. +static NEXT_PORT: AtomicUsize = AtomicUsize::new(8190); + +/// Represents the current known state of a TestServer +#[derive(Debug)] +enum ServerState { + Started, + Ready, + Error, +} + +const ADMIN_TEST_USER: &str = "admin-test-user"; +const ADMIN_TEST_ORG: &str = "admin-test-org"; +const ADMIN_TEST_BUCKET: &str = "admin-test-bucket"; +const ADMIN_TEST_PASSWORD: &str = "admin-test-password"; + +struct TestServer { + /// Is the server ready to accept connections? + ready: Mutex, + /// Handle to the server process being controlled + server_process: Child, + /// HTTP API base + http_base: String, + /// Admin token, if onboarding has happened + admin_token: Option, +} + +impl TestServer { + fn new() -> Result { + let ready = Mutex::new(ServerState::Started); + let http_port = NEXT_PORT.fetch_add(1, SeqCst); + let http_base = format!("http://127.0.0.1:{}", http_port); + + let temp_dir = test_helpers::tmp_dir().unwrap(); + + let mut log_path = temp_dir.path().to_path_buf(); + log_path.push(format!("influxdb_server_fixture_{}.log", http_port)); + + let mut bolt_path = temp_dir.path().to_path_buf(); + bolt_path.push(format!("influxd_{}.bolt", http_port)); + + let mut engine_path = temp_dir.path().to_path_buf(); + engine_path.push(format!("influxd_{}_engine", http_port)); + + println!("****************"); + println!("Server Logging to {:?}", log_path); + println!("****************"); + let log_file = File::create(log_path).expect("Opening log file"); + + let stdout_log_file = log_file + .try_clone() + .expect("cloning file handle for stdout"); + let stderr_log_file = log_file; + + let server_process = Command::new("influxd") + .arg("--http-bind-address") + .arg(format!(":{}", http_port)) + .arg("--bolt-path") + .arg(bolt_path) + .arg("--engine-path") + .arg(engine_path) + // redirect output to log file + .stdout(stdout_log_file) + .stderr(stderr_log_file) + .spawn()?; + + Ok(Self { + ready, + server_process, + http_base, + admin_token: None, + }) + } + + async fn wait_until_ready(&mut self, initial_config: InitialConfig) { + let mut ready = self.ready.lock().await; + match *ready { + ServerState::Started => {} // first time, need to try and start it + ServerState::Ready => { + return; + } + ServerState::Error => { + panic!("Server was previously found to be in Error, aborting"); + } + } + + let try_http_connect = async { + let client = reqwest::Client::new(); + let url = format!("{}/health", self.http_base); + let mut interval = tokio::time::interval(Duration::from_secs(5)); + loop { + match client.get(&url).send().await { + Ok(resp) => { + println!("Successfully got a response from HTTP: {:?}", resp); + return; + } + Err(e) => { + println!("Waiting for HTTP server to be up: {}", e); + } + } + interval.tick().await; + } + }; + + let capped_check = tokio::time::timeout(Duration::from_secs(100), try_http_connect); + + match capped_check.await { + Ok(_) => { + println!("Successfully started {}", self); + *ready = ServerState::Ready; + } + Err(e) => { + // tell others that this server had some problem + *ready = ServerState::Error; + std::mem::drop(ready); + panic!("Server was not ready in required time: {}", e); + } + } + + // Onboard, if requested. + if initial_config == InitialConfig::Onboarded { + let client = influxdb2_client::Client::new(&self.http_base, ""); + let response = client + .onboarding( + ADMIN_TEST_USER, + ADMIN_TEST_ORG, + ADMIN_TEST_BUCKET, + Some(ADMIN_TEST_PASSWORD.to_string()), + Some(0), + None, + ) + .await; + + match response { + Ok(onboarding) => { + let token = onboarding + .auth + .expect("Onboarding should have returned auth info") + .token + .expect("Onboarding auth should have returned a token"); + self.admin_token = Some(token); + } + Err(e) => { + *ready = ServerState::Error; + std::mem::drop(ready); + panic!("Could not onboard: {}", e); + } + } + } + } +} + +impl std::fmt::Display for TestServer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> { + write!(f, "TestServer (http api: {})", self.http_base) + } +} + +impl Drop for TestServer { + fn drop(&mut self) { + self.server_process + .kill() + .expect("Should have been able to kill the test server"); + } +} diff --git a/influxdb2_client/tests/setup.rs b/influxdb2_client/tests/setup.rs new file mode 100644 index 0000000000..1ba03f96d4 --- /dev/null +++ b/influxdb2_client/tests/setup.rs @@ -0,0 +1,118 @@ +pub mod common; +use common::server_fixture::ServerFixture; + +type Result> = std::result::Result; + +#[tokio::test] +async fn new_server_needs_onboarded() -> Result { + let server_fixture = maybe_skip_integration!(ServerFixture::create_single_use()).await; + let client = server_fixture.client(); + + let res = client.is_onboarding_allowed().await?; + assert!(res); + + // Creating a new setup user without first onboarding is an error + let username = "some-user"; + let org = "some-org"; + let bucket = "some-bucket"; + let password = "some-password"; + let retention_period_hrs = 0; + + let err = client + .post_setup_user( + username, + org, + bucket, + Some(password.to_string()), + Some(retention_period_hrs), + None, + ) + .await + .expect_err("Expected error, got success"); + + assert!(matches!( + err, + influxdb2_client::RequestError::Http { + status: reqwest::StatusCode::UNAUTHORIZED, + .. + } + )); + + Ok(()) +} + +#[tokio::test] +async fn onboarding() -> Result { + let server_fixture = maybe_skip_integration!(ServerFixture::create_single_use()).await; + let client = server_fixture.client(); + + let username = "some-user"; + let org = "some-org"; + let bucket = "some-bucket"; + let password = "some-password"; + let retention_period_hrs = 0; + + client + .onboarding( + username, + org, + bucket, + Some(password.to_string()), + Some(retention_period_hrs), + None, + ) + .await?; + + let res = client.is_onboarding_allowed().await?; + assert!(!res); + + // Onboarding twice is an error + let err = client + .onboarding( + username, + org, + bucket, + Some(password.to_string()), + Some(retention_period_hrs), + None, + ) + .await + .expect_err("Expected error, got success"); + + assert!(matches!( + err, + influxdb2_client::RequestError::Http { + status: reqwest::StatusCode::UNPROCESSABLE_ENTITY, + .. + } + )); + + Ok(()) +} + +#[tokio::test] +async fn create_users() -> Result { + // Using a server that has been set up + let server_fixture = maybe_skip_integration!(ServerFixture::create_shared()).await; + let client = server_fixture.client(); + + let username = "another-user"; + let org = "another-org"; + let bucket = "another-bucket"; + let password = "another-password"; + let retention_period_hrs = 0; + + // Creating a user should work + client + .post_setup_user( + username, + org, + bucket, + Some(password.to_string()), + Some(retention_period_hrs), + None, + ) + .await?; + + Ok(()) +}