Merge pull request #1093 from influxdata/cn/influxdb2-integration-tests

chore: Add integration tests for influxdb2_client against InfluxDB 2.0 OSS
pull/24376/head
kodiakhq[bot] 2021-04-07 19:42:26 +00:00 committed by GitHub
commit d59c5de39a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 479 additions and 44 deletions

View File

@ -99,6 +99,29 @@ jobs:
command: cargo test --workspace
- cache_save
# Integration tests for the influxdb2_client crate against InfluxDB 2.0 OSS.
test_influxdb2_client:
docker:
- image: quay.io/influxdb/rust:ci
environment:
# Disable full debug symbol generation to speed up CI build
# "1" means line tables only, which is useful for panic tracebacks.
RUSTFLAGS: "-C debuginfo=1"
steps:
- checkout
- run:
name: Install InfluxDB 2.0 OSS
command: |
curl -o influxdb2.tar.gz https://dl.influxdata.com/influxdb/releases/influxdb2-2.0.4-linux-amd64.tar.gz
tar xvzf influxdb2.tar.gz
sudo cp influxdb2-2.0.4-linux-amd64/{influx,influxd} /usr/local/bin/
- rust_nightly
- cache_restore
- run:
name: Cargo test
command: TEST_INTEGRATION=1 cargo test -p influxdb2_client
- cache_save
# Build a dev binary.
#
# Compiles a binary with the default ("dev") cargo profile from the iox source
@ -209,6 +232,7 @@ workflows:
- lint
- protobuf-lint
- test
- test_influxdb2_client
- build
# Internal pipeline for perf builds.

3
Cargo.lock generated
View File

@ -1423,10 +1423,13 @@ dependencies = [
"bytes",
"futures",
"mockito",
"once_cell",
"parking_lot",
"reqwest",
"serde",
"serde_json",
"snafu",
"test_helpers",
"tokio",
]

View File

@ -30,21 +30,12 @@ Other than possibly configuring multiple object stores, configuring the tests to
store services is the same as configuring the server to use an object store service. See the output
of `influxdb_iox run --help` for instructions.
## InfluxDB IOx Client
## InfluxDB 2 Client
The `influxdb_iox_client` crate might be used by people who are using a managed IOx server. In
other words, they might only use the `influxdb_iox_client` crate and not the rest of the crates in
this workspace. The tests in `influxdb_iox_client` see an IOx server in the same way as IOx servers
see the object store services: sometimes you'll want to run the tests against an actual server, and
sometimes you won't.
The `influxdb2_client` crate may be used by people using InfluxDB 2.0 OSS, and should be compatible
with both that and IOx. If you have `influxd` in your path, the integration tests for the
`influxdb2_client` crate will run integration tests against `influxd`. If you do not have
`influxd`, those tests will not be run and will silently pass.
Like in the `object_store` crate, the `influxdb_iox_client` crate's tests use the
`TEST_INTEGRATION` environment variable to enforce running tests that use an actual IOx server.
Running `cargo test -p influxdb_iox_client` will silently pass tests that contact a server.
Start an IOx server in one terminal and run `TEST_INTEGRATION=1
TEST_IOX_ENDPOINT=http://127.0.0.1:8080 cargo test -p influxdb_iox_client` in another (where
`http://127.0.0.1:8080` is the address to the IOx HTTP server) to run the client tests against the
server. If you set `TEST_INTEGRATION` but not `TEST_IOX_ENDPOINT`, the integration tests will fail
because of the missed configuration. If you set `TEST_IOX_ENDPOINT` but not `TEST_INTEGRATION`, the
integration tests will be run.
To ensure you're running the `influxdb2_client` integration tests, you can run `TEST_INTEGRATION=1
cargo test -p influxdb2_client`, which will fail the tests if `influxd` is not available.

View File

@ -14,4 +14,7 @@ snafu = "0.6.6"
[dev-dependencies] # In alphabetical order
mockito = "0.26.0"
once_cell = { version = "1.4.0", features = ["parking_lot"] }
parking_lot = "0.11.1"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
test_helpers = { path = "../test_helpers" }

View File

@ -35,13 +35,9 @@ mod tests {
#[tokio::test]
async fn ready() {
let token = "some-token";
let mock_server = mock("GET", "/ready").create();
let mock_server = mock("GET", "/ready")
.match_header("Authorization", format!("Token {}", token).as_str())
.create();
let client = Client::new(&mockito::server_url(), token);
let client = Client::new(&mockito::server_url(), "");
let _result = client.ready().await;

View File

@ -60,7 +60,7 @@ impl Client {
.context(ReqwestProcessing)?;
match response.status() {
StatusCode::OK => Ok(response
StatusCode::CREATED => Ok(response
.json::<OnboardingResponse>()
.await
.context(ReqwestProcessing)?),
@ -100,7 +100,7 @@ impl Client {
.context(ReqwestProcessing)?;
match response.status() {
StatusCode::OK => Ok(response
StatusCode::CREATED => Ok(response
.json::<OnboardingResponse>()
.await
.context(ReqwestProcessing)?),
@ -119,13 +119,9 @@ mod tests {
#[tokio::test]
async fn is_onboarding_allowed() {
let token = "some-token";
let mock_server = mock("GET", "/api/v2/setup").create();
let mock_server = mock("GET", "/api/v2/setup")
.match_header("Authorization", format!("Token {}", token).as_str())
.create();
let client = Client::new(&mockito::server_url(), token);
let client = Client::new(&mockito::server_url(), "");
let _result = client.is_onboarding_allowed().await;
@ -142,7 +138,6 @@ mod tests {
let retention_period_hrs = 1;
let mock_server = mock("POST", "/api/v2/setup")
.match_header("Authorization", format!("Token {}", token).as_str())
.match_body(
format!(
r#"{{"username":"{}","org":"{}","bucket":"{}","password":"{}","retentionPeriodHrs":{}}}"#,
@ -204,13 +199,11 @@ mod tests {
#[tokio::test]
async fn onboarding_opt() {
let token = "some-token";
let username = "some-user";
let org = "some-org";
let bucket = "some-bucket";
let mock_server = mock("POST", "/api/v2/setup")
.match_header("Authorization", format!("Token {}", token).as_str())
.match_body(
format!(
r#"{{"username":"{}","org":"{}","bucket":"{}"}}"#,
@ -220,7 +213,7 @@ mod tests {
)
.create();
let client = Client::new(&mockito::server_url(), token);
let client = Client::new(&mockito::server_url(), "");
let _result = client
.onboarding(username, org, bucket, None, None, None)

View File

@ -64,10 +64,7 @@ use futures::{Stream, StreamExt};
use reqwest::{Body, Method};
use serde::Serialize;
use snafu::{ResultExt, Snafu};
use std::{
fmt,
io::{self, Write},
};
use std::io::{self, Write};
pub mod data_point;
pub use data_point::{DataPoint, FieldValue, WriteDataPoint};
@ -106,7 +103,7 @@ pub enum RequestError {
pub struct Client {
/// The base URL this client sends requests to
pub url: String,
auth_header: String,
auth_header: Option<String>,
reqwest: reqwest::Client,
}
@ -120,19 +117,30 @@ impl Client {
/// ```
/// let client = influxdb2_client::Client::new("http://localhost:8888", "my-token");
/// ```
pub fn new(url: impl Into<String>, auth_token: impl fmt::Display) -> Self {
pub fn new(url: impl Into<String>, auth_token: impl Into<String>) -> Self {
let token = auth_token.into();
let auth_header = if token.is_empty() {
None
} else {
Some(format!("Token {}", token))
};
Self {
url: url.into(),
auth_header: format!("Token {}", auth_token),
auth_header,
reqwest: reqwest::Client::new(),
}
}
/// Consolidate common request building code
fn request(&self, method: Method, url: &str) -> reqwest::RequestBuilder {
self.reqwest
.request(method, url)
.header("Authorization", &self.auth_header)
let mut req = self.reqwest.request(method, url);
if let Some(auth) = &self.auth_header {
req = req.header("Authorization", auth);
}
req
}
/// Write line protocol data to the specified organization and bucket.

View File

@ -0,0 +1 @@
pub mod server_fixture;

View File

@ -0,0 +1,298 @@
use once_cell::sync::OnceCell;
use std::{
fs::File,
process::{Child, Command},
sync::{
atomic::{AtomicUsize, Ordering::SeqCst},
Arc, Weak,
},
time::Duration,
};
use tokio::sync::Mutex;
type Result<T = (), E = Box<dyn std::error::Error>> = std::result::Result<T, E>;
#[macro_export]
/// If InfluxDB 2.0 OSS is available in the PATH at `influxd`, set up the server
/// as requested return it to the caller.
///
/// If `influxd` is not available, skip the calling test by returning
/// early. Additionally if `TEST_INTEGRATION` is set, turn this early return
/// into a panic to force a hard fail for skipped integration tests.
macro_rules! maybe_skip_integration {
($server_fixture:expr) => {
match (
std::process::Command::new("which")
.arg("influxd")
.stdout(std::process::Stdio::null())
.status()
.expect("should be able to run `which`")
.success(),
std::env::var("TEST_INTEGRATION").is_ok(),
) {
(true, _) => $server_fixture,
(false, true) => {
panic!(
"TEST_INTEGRATION is set which requires running integration tests, but \
`influxd` is not available"
)
}
_ => {
eprintln!("skipping integration test - install `influxd` to run");
return Ok(());
}
}
};
}
/// Represents a server that has been started and is available for
/// testing.
pub struct ServerFixture {
server: Arc<TestServer>,
}
impl ServerFixture {
/// Create a new server fixture and wait for it to be ready. This
/// is called "create" rather than new because it is async and
/// waits. The shared database can be used immediately.
///
/// This is currently implemented as a singleton so all tests *must*
/// use a new database and not interfere with the existing database.
pub async fn create_shared() -> Self {
// Try and reuse the same shared server, if there is already
// one present
static SHARED_SERVER: OnceCell<parking_lot::Mutex<Weak<TestServer>>> = OnceCell::new();
let shared_server = SHARED_SERVER.get_or_init(|| parking_lot::Mutex::new(Weak::new()));
let mut shared_server = shared_server.lock();
// is a shared server already present?
let server = match shared_server.upgrade() {
Some(server) => server,
None => {
// if not, create one
let mut server = TestServer::new().expect("Could start test server");
// ensure the server is ready
server.wait_until_ready(InitialConfig::Onboarded).await;
let server = Arc::new(server);
// save a reference for other threads that may want to
// use this server, but don't prevent it from being
// destroyed when going out of scope
*shared_server = Arc::downgrade(&server);
server
}
};
std::mem::drop(shared_server);
Self { server }
}
/// Create a new server fixture and wait for it to be ready. This
/// is called "create" rather than new because it is async and
/// waits. The database is left unconfigured and is not shared
/// with any other tests.
pub async fn create_single_use() -> Self {
let mut server = TestServer::new().expect("Could start test server");
// ensure the server is ready
server.wait_until_ready(InitialConfig::None).await;
let server = Arc::new(server);
Self { server }
}
/// Return a client suitable for communicating with this server
pub fn client(&self) -> influxdb2_client::Client {
match self.server.admin_token.as_ref() {
Some(token) => influxdb2_client::Client::new(self.http_base(), token),
None => influxdb2_client::Client::new(self.http_base(), ""),
}
}
/// Return the http base URL for the HTTP API
pub fn http_base(&self) -> &str {
&self.server.http_base
}
}
/// Specifies whether the server should be set up initially
#[derive(Debug, Copy, Clone, PartialEq)]
enum InitialConfig {
/// Don't set up the server, the test will (for testing onboarding)
None,
/// Onboard the server and set up the client with the associated token (for
/// most tests)
Onboarded,
}
// These port numbers are chosen to not collide with a development ioxd/influxd
// server running locally.
// TODO(786): allocate random free ports instead of hardcoding.
// TODO(785): we cannot use localhost here.
static NEXT_PORT: AtomicUsize = AtomicUsize::new(8190);
/// Represents the current known state of a TestServer
#[derive(Debug)]
enum ServerState {
Started,
Ready,
Error,
}
const ADMIN_TEST_USER: &str = "admin-test-user";
const ADMIN_TEST_ORG: &str = "admin-test-org";
const ADMIN_TEST_BUCKET: &str = "admin-test-bucket";
const ADMIN_TEST_PASSWORD: &str = "admin-test-password";
struct TestServer {
/// Is the server ready to accept connections?
ready: Mutex<ServerState>,
/// Handle to the server process being controlled
server_process: Child,
/// HTTP API base
http_base: String,
/// Admin token, if onboarding has happened
admin_token: Option<String>,
}
impl TestServer {
fn new() -> Result<Self> {
let ready = Mutex::new(ServerState::Started);
let http_port = NEXT_PORT.fetch_add(1, SeqCst);
let http_base = format!("http://127.0.0.1:{}", http_port);
let temp_dir = test_helpers::tmp_dir().unwrap();
let mut log_path = temp_dir.path().to_path_buf();
log_path.push(format!("influxdb_server_fixture_{}.log", http_port));
let mut bolt_path = temp_dir.path().to_path_buf();
bolt_path.push(format!("influxd_{}.bolt", http_port));
let mut engine_path = temp_dir.path().to_path_buf();
engine_path.push(format!("influxd_{}_engine", http_port));
println!("****************");
println!("Server Logging to {:?}", log_path);
println!("****************");
let log_file = File::create(log_path).expect("Opening log file");
let stdout_log_file = log_file
.try_clone()
.expect("cloning file handle for stdout");
let stderr_log_file = log_file;
let server_process = Command::new("influxd")
.arg("--http-bind-address")
.arg(format!(":{}", http_port))
.arg("--bolt-path")
.arg(bolt_path)
.arg("--engine-path")
.arg(engine_path)
// redirect output to log file
.stdout(stdout_log_file)
.stderr(stderr_log_file)
.spawn()?;
Ok(Self {
ready,
server_process,
http_base,
admin_token: None,
})
}
async fn wait_until_ready(&mut self, initial_config: InitialConfig) {
let mut ready = self.ready.lock().await;
match *ready {
ServerState::Started => {} // first time, need to try and start it
ServerState::Ready => {
return;
}
ServerState::Error => {
panic!("Server was previously found to be in Error, aborting");
}
}
let try_http_connect = async {
let client = reqwest::Client::new();
let url = format!("{}/health", self.http_base);
let mut interval = tokio::time::interval(Duration::from_secs(5));
loop {
match client.get(&url).send().await {
Ok(resp) => {
println!("Successfully got a response from HTTP: {:?}", resp);
return;
}
Err(e) => {
println!("Waiting for HTTP server to be up: {}", e);
}
}
interval.tick().await;
}
};
let capped_check = tokio::time::timeout(Duration::from_secs(100), try_http_connect);
match capped_check.await {
Ok(_) => {
println!("Successfully started {}", self);
*ready = ServerState::Ready;
}
Err(e) => {
// tell others that this server had some problem
*ready = ServerState::Error;
std::mem::drop(ready);
panic!("Server was not ready in required time: {}", e);
}
}
// Onboard, if requested.
if initial_config == InitialConfig::Onboarded {
let client = influxdb2_client::Client::new(&self.http_base, "");
let response = client
.onboarding(
ADMIN_TEST_USER,
ADMIN_TEST_ORG,
ADMIN_TEST_BUCKET,
Some(ADMIN_TEST_PASSWORD.to_string()),
Some(0),
None,
)
.await;
match response {
Ok(onboarding) => {
let token = onboarding
.auth
.expect("Onboarding should have returned auth info")
.token
.expect("Onboarding auth should have returned a token");
self.admin_token = Some(token);
}
Err(e) => {
*ready = ServerState::Error;
std::mem::drop(ready);
panic!("Could not onboard: {}", e);
}
}
}
}
}
impl std::fmt::Display for TestServer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
write!(f, "TestServer (http api: {})", self.http_base)
}
}
impl Drop for TestServer {
fn drop(&mut self) {
self.server_process
.kill()
.expect("Should have been able to kill the test server");
}
}

View File

@ -0,0 +1,118 @@
pub mod common;
use common::server_fixture::ServerFixture;
type Result<T = (), E = Box<dyn std::error::Error>> = std::result::Result<T, E>;
#[tokio::test]
async fn new_server_needs_onboarded() -> Result {
let server_fixture = maybe_skip_integration!(ServerFixture::create_single_use()).await;
let client = server_fixture.client();
let res = client.is_onboarding_allowed().await?;
assert!(res);
// Creating a new setup user without first onboarding is an error
let username = "some-user";
let org = "some-org";
let bucket = "some-bucket";
let password = "some-password";
let retention_period_hrs = 0;
let err = client
.post_setup_user(
username,
org,
bucket,
Some(password.to_string()),
Some(retention_period_hrs),
None,
)
.await
.expect_err("Expected error, got success");
assert!(matches!(
err,
influxdb2_client::RequestError::Http {
status: reqwest::StatusCode::UNAUTHORIZED,
..
}
));
Ok(())
}
#[tokio::test]
async fn onboarding() -> Result {
let server_fixture = maybe_skip_integration!(ServerFixture::create_single_use()).await;
let client = server_fixture.client();
let username = "some-user";
let org = "some-org";
let bucket = "some-bucket";
let password = "some-password";
let retention_period_hrs = 0;
client
.onboarding(
username,
org,
bucket,
Some(password.to_string()),
Some(retention_period_hrs),
None,
)
.await?;
let res = client.is_onboarding_allowed().await?;
assert!(!res);
// Onboarding twice is an error
let err = client
.onboarding(
username,
org,
bucket,
Some(password.to_string()),
Some(retention_period_hrs),
None,
)
.await
.expect_err("Expected error, got success");
assert!(matches!(
err,
influxdb2_client::RequestError::Http {
status: reqwest::StatusCode::UNPROCESSABLE_ENTITY,
..
}
));
Ok(())
}
#[tokio::test]
async fn create_users() -> Result {
// Using a server that has been set up
let server_fixture = maybe_skip_integration!(ServerFixture::create_shared()).await;
let client = server_fixture.client();
let username = "another-user";
let org = "another-org";
let bucket = "another-bucket";
let password = "another-password";
let retention_period_hrs = 0;
// Creating a user should work
client
.post_setup_user(
username,
org,
bucket,
Some(password.to_string()),
Some(retention_period_hrs),
None,
)
.await?;
Ok(())
}