influxdb/tests/end_to_end_cases/management_api.rs

721 lines
20 KiB
Rust

use std::num::NonZeroU32;
use generated_types::{
google::protobuf::{Duration, Empty},
influxdata::iox::management::v1::*,
};
use influxdb_iox_client::{management::CreateDatabaseError, operations};
use test_helpers::assert_contains;
use super::scenario::{
create_readable_database, create_two_partition_database, create_unreadable_database, rand_name,
};
use crate::common::server_fixture::ServerFixture;
#[tokio::test]
async fn test_list_update_remotes() {
let server_fixture = ServerFixture::create_single_use().await;
let mut client = server_fixture.management_client();
const TEST_REMOTE_ID_1: u32 = 42;
const TEST_REMOTE_ADDR_1: &str = "1.2.3.4:1234";
const TEST_REMOTE_ID_2: u32 = 84;
const TEST_REMOTE_ADDR_2: &str = "4.3.2.1:4321";
const TEST_REMOTE_ADDR_2_UPDATED: &str = "40.30.20.10:4321";
let res = client.list_remotes().await.expect("list remotes failed");
assert_eq!(res.len(), 0);
client
.update_remote(TEST_REMOTE_ID_1, TEST_REMOTE_ADDR_1)
.await
.expect("update failed");
let res = client.list_remotes().await.expect("list remotes failed");
assert_eq!(res.len(), 1);
client
.update_remote(TEST_REMOTE_ID_2, TEST_REMOTE_ADDR_2)
.await
.expect("update failed");
let res = client.list_remotes().await.expect("list remotes failed");
assert_eq!(res.len(), 2);
assert_eq!(res[0].id, TEST_REMOTE_ID_1);
assert_eq!(res[0].connection_string, TEST_REMOTE_ADDR_1);
assert_eq!(res[1].id, TEST_REMOTE_ID_2);
assert_eq!(res[1].connection_string, TEST_REMOTE_ADDR_2);
client
.delete_remote(TEST_REMOTE_ID_1)
.await
.expect("delete failed");
client
.delete_remote(TEST_REMOTE_ID_1)
.await
.expect_err("expected delete to fail");
let res = client.list_remotes().await.expect("list remotes failed");
assert_eq!(res.len(), 1);
assert_eq!(res[0].id, TEST_REMOTE_ID_2);
assert_eq!(res[0].connection_string, TEST_REMOTE_ADDR_2);
client
.update_remote(TEST_REMOTE_ID_2, TEST_REMOTE_ADDR_2_UPDATED)
.await
.expect("update failed");
let res = client.list_remotes().await.expect("list remotes failed");
assert_eq!(res.len(), 1);
assert_eq!(res[0].id, TEST_REMOTE_ID_2);
assert_eq!(res[0].connection_string, TEST_REMOTE_ADDR_2_UPDATED);
}
#[tokio::test]
async fn test_set_get_writer_id() {
let server_fixture = ServerFixture::create_single_use().await;
let mut client = server_fixture.management_client();
const TEST_ID: u32 = 42;
client
.update_writer_id(NonZeroU32::new(TEST_ID).unwrap())
.await
.expect("set ID failed");
let got = client.get_writer_id().await.expect("get ID failed");
assert_eq!(got.get(), TEST_ID);
}
#[tokio::test]
async fn test_create_database_duplicate_name() {
let server_fixture = ServerFixture::create_shared().await;
let mut client = server_fixture.management_client();
let db_name = rand_name();
client
.create_database(DatabaseRules {
name: db_name.clone(),
..Default::default()
})
.await
.expect("create database failed");
let err = client
.create_database(DatabaseRules {
name: db_name,
..Default::default()
})
.await
.expect_err("create database failed");
assert!(matches!(
dbg!(err),
CreateDatabaseError::DatabaseAlreadyExists
))
}
#[tokio::test]
async fn test_create_database_invalid_name() {
let server_fixture = ServerFixture::create_shared().await;
let mut client = server_fixture.management_client();
let err = client
.create_database(DatabaseRules {
name: "my_example\ndb".to_string(),
..Default::default()
})
.await
.expect_err("expected request to fail");
assert!(matches!(dbg!(err), CreateDatabaseError::InvalidArgument(_)));
}
#[tokio::test]
async fn test_list_databases() {
let server_fixture = ServerFixture::create_shared().await;
let mut client = server_fixture.management_client();
let name = rand_name();
client
.create_database(DatabaseRules {
name: name.clone(),
..Default::default()
})
.await
.expect("create database failed");
let names = client
.list_databases()
.await
.expect("list databases failed");
assert!(names.contains(&name));
}
#[tokio::test]
async fn test_create_get_update_database() {
let server_fixture = ServerFixture::create_shared().await;
let mut client = server_fixture.management_client();
let db_name = rand_name();
// Specify everything to allow direct comparison between request and response
// Otherwise would expect difference due to server-side defaulting
let mut rules = DatabaseRules {
name: db_name.clone(),
partition_template: Some(PartitionTemplate {
parts: vec![partition_template::Part {
part: Some(partition_template::part::Part::Table(Empty {})),
}],
}),
wal_buffer_config: Some(WalBufferConfig {
buffer_size: 24,
segment_size: 2,
buffer_rollover: wal_buffer_config::Rollover::DropIncoming as _,
persist_segments: true,
close_segment_after: Some(Duration {
seconds: 324,
nanos: 2,
}),
}),
lifecycle_rules: Some(LifecycleRules {
buffer_size_hard: 553,
sort_order: Some(lifecycle_rules::SortOrder {
order: Order::Asc as _,
sort: Some(lifecycle_rules::sort_order::Sort::CreatedAtTime(Empty {})),
}),
..Default::default()
}),
shard_config: None,
};
client
.create_database(rules.clone())
.await
.expect("create database failed");
let response = client
.get_database(&db_name)
.await
.expect("get database failed");
assert_eq!(response.shard_config, None);
rules.shard_config = Some(ShardConfig {
ignore_errors: true,
..Default::default()
});
let updated_rules = client
.update_database(rules.clone())
.await
.expect("update database failed");
assert_eq!(updated_rules, rules);
let response = client
.get_database(&db_name)
.await
.expect("get database failed");
assert_eq!(
response
.shard_config
.expect("shard config missing")
.ignore_errors,
true
);
}
#[tokio::test]
async fn test_chunk_get() {
use generated_types::influxdata::iox::management::v1::{Chunk, ChunkStorage};
let fixture = ServerFixture::create_shared().await;
let mut management_client = fixture.management_client();
let mut write_client = fixture.write_client();
let db_name = rand_name();
create_readable_database(&db_name, fixture.grpc_channel()).await;
let lp_lines = vec![
"cpu,region=west user=23.2 100",
"cpu,region=west user=21.0 150",
"disk,region=east bytes=99i 200",
];
write_client
.write(&db_name, lp_lines.join("\n"))
.await
.expect("write succeded");
let mut chunks = management_client
.list_chunks(&db_name)
.await
.expect("listing chunks");
// ensure the output order is consistent
chunks.sort_by(|c1, c2| c1.partition_key.cmp(&c2.partition_key));
// make sure there were timestamps prior to normalization
assert!(
chunks[0].time_of_first_write.is_some()
&& chunks[0].time_of_last_write.is_some()
&& chunks[0].time_closing.is_none(), // chunk is not yet closed
"actual:{:#?}",
chunks[0]
);
let chunks = normalize_chunks(chunks);
let expected: Vec<Chunk> = vec![
Chunk {
partition_key: "cpu".into(),
id: 0,
storage: ChunkStorage::OpenMutableBuffer as i32,
estimated_bytes: 137,
time_of_first_write: None,
time_of_last_write: None,
time_closing: None,
},
Chunk {
partition_key: "disk".into(),
id: 0,
storage: ChunkStorage::OpenMutableBuffer as i32,
estimated_bytes: 103,
time_of_first_write: None,
time_of_last_write: None,
time_closing: None,
},
];
assert_eq!(
expected, chunks,
"expected:\n\n{:#?}\n\nactual:{:#?}",
expected, chunks
);
}
#[tokio::test]
async fn test_chunk_get_errors() {
let fixture = ServerFixture::create_shared().await;
let mut management_client = fixture.management_client();
let db_name = rand_name();
let err = management_client
.list_chunks(&db_name)
.await
.expect_err("no db had been created");
assert_contains!(
err.to_string(),
"Some requested entity was not found: Resource database"
);
create_unreadable_database(&db_name, fixture.grpc_channel()).await;
}
#[tokio::test]
async fn test_partition_list() {
let fixture = ServerFixture::create_shared().await;
let mut management_client = fixture.management_client();
let db_name = rand_name();
create_two_partition_database(&db_name, fixture.grpc_channel()).await;
let mut partitions = management_client
.list_partitions(&db_name)
.await
.expect("listing partition");
// ensure the output order is consistent
partitions.sort_by(|p1, p2| p1.key.cmp(&p2.key));
let expected = vec![
Partition {
key: "cpu".to_string(),
},
Partition {
key: "mem".to_string(),
},
];
assert_eq!(
expected, partitions,
"expected:\n\n{:#?}\n\nactual:{:#?}",
expected, partitions
);
}
#[tokio::test]
async fn test_partition_list_error() {
let fixture = ServerFixture::create_shared().await;
let mut management_client = fixture.management_client();
let err = management_client
.list_partitions("this database does not exist")
.await
.expect_err("expected error");
assert_contains!(err.to_string(), "Database not found");
}
#[tokio::test]
async fn test_partition_get() {
use generated_types::influxdata::iox::management::v1::Partition;
let fixture = ServerFixture::create_shared().await;
let mut management_client = fixture.management_client();
let db_name = rand_name();
create_two_partition_database(&db_name, fixture.grpc_channel()).await;
let partition_key = "cpu";
let partition = management_client
.get_partition(&db_name, partition_key)
.await
.expect("getting partition");
let expected = Partition { key: "cpu".into() };
assert_eq!(
expected, partition,
"expected:\n\n{:#?}\n\nactual:{:#?}",
expected, partition
);
}
#[tokio::test]
async fn test_partition_get_error() {
let fixture = ServerFixture::create_shared().await;
let mut management_client = fixture.management_client();
let mut write_client = fixture.write_client();
let err = management_client
.list_partitions("this database does not exist")
.await
.expect_err("expected error");
assert_contains!(err.to_string(), "Database not found");
let db_name = rand_name();
create_readable_database(&db_name, fixture.grpc_channel()).await;
let lp_lines =
vec!["processes,host=foo running=4i,sleeping=514i,total=519i 1591894310000000000"];
write_client
.write(&db_name, lp_lines.join("\n"))
.await
.expect("write succeded");
let err = management_client
.get_partition(&db_name, "non existent partition")
.await
.expect_err("exepcted error getting partition");
assert_contains!(err.to_string(), "Partition not found");
}
#[tokio::test]
async fn test_list_partition_chunks() {
let fixture = ServerFixture::create_shared().await;
let mut management_client = fixture.management_client();
let mut write_client = fixture.write_client();
let db_name = rand_name();
create_readable_database(&db_name, fixture.grpc_channel()).await;
let lp_lines = vec![
"cpu,region=west user=23.2 100",
"cpu,region=west user=21.0 150",
"disk,region=east bytes=99i 200",
];
write_client
.write(&db_name, lp_lines.join("\n"))
.await
.expect("write succeded");
let partition_key = "cpu";
let chunks = management_client
.list_partition_chunks(&db_name, partition_key)
.await
.expect("getting partition chunks");
let chunks = normalize_chunks(chunks);
let expected: Vec<Chunk> = vec![Chunk {
partition_key: "cpu".into(),
id: 0,
storage: ChunkStorage::OpenMutableBuffer as i32,
estimated_bytes: 137,
time_of_first_write: None,
time_of_last_write: None,
time_closing: None,
}];
assert_eq!(
expected, chunks,
"expected:\n\n{:#?}\n\nactual:{:#?}",
expected, chunks
);
}
#[tokio::test]
async fn test_list_partition_chunk_errors() {
let fixture = ServerFixture::create_shared().await;
let mut management_client = fixture.management_client();
let db_name = rand_name();
let err = management_client
.list_partition_chunks(&db_name, "cpu")
.await
.expect_err("no db had been created");
assert_contains!(
err.to_string(),
"Some requested entity was not found: Resource database"
);
}
#[tokio::test]
async fn test_new_partition_chunk() {
let fixture = ServerFixture::create_shared().await;
let mut management_client = fixture.management_client();
let mut write_client = fixture.write_client();
let db_name = rand_name();
create_readable_database(&db_name, fixture.grpc_channel()).await;
let lp_lines = vec!["cpu,region=west user=23.2 100"];
write_client
.write(&db_name, lp_lines.join("\n"))
.await
.expect("write succeded");
let chunks = management_client
.list_chunks(&db_name)
.await
.expect("listing chunks");
assert_eq!(chunks.len(), 1, "Chunks: {:#?}", chunks);
let partition_key = "cpu";
// Rollover the a second chunk
management_client
.new_partition_chunk(&db_name, partition_key)
.await
.expect("new partition chunk");
// Load some more data and now expect that we have a second chunk
let lp_lines = vec!["cpu,region=west user=21.0 150"];
write_client
.write(&db_name, lp_lines.join("\n"))
.await
.expect("write succeded");
let chunks = management_client
.list_chunks(&db_name)
.await
.expect("listing chunks");
assert_eq!(chunks.len(), 2, "Chunks: {:#?}", chunks);
// Made all chunks in the same partition
assert_eq!(
chunks.iter().filter(|c| c.partition_key == "cpu").count(),
2,
"Chunks: {:#?}",
chunks
);
// Rollover a (currently non existent) partition which is not OK
let err = management_client
.new_partition_chunk(&db_name, "non_existent_partition")
.await
.expect_err("new partition chunk");
assert_eq!(
"Resource partition/non_existent_partition not found",
err.to_string()
);
}
#[tokio::test]
async fn test_new_partition_chunk_error() {
let fixture = ServerFixture::create_shared().await;
let mut management_client = fixture.management_client();
let err = management_client
.new_partition_chunk("this database does not exist", "nor_does_this_partition")
.await
.expect_err("expected error");
assert_contains!(
err.to_string(),
"Resource database/this database does not exist not found"
);
}
#[tokio::test]
async fn test_close_partition_chunk() {
use influxdb_iox_client::management::generated_types::operation_metadata::Job;
use influxdb_iox_client::management::generated_types::ChunkStorage;
let fixture = ServerFixture::create_shared().await;
let mut management_client = fixture.management_client();
let mut write_client = fixture.write_client();
let mut operations_client = fixture.operations_client();
let db_name = rand_name();
create_readable_database(&db_name, fixture.grpc_channel()).await;
let partition_key = "cpu";
let lp_lines = vec!["cpu,region=west user=23.2 100"];
write_client
.write(&db_name, lp_lines.join("\n"))
.await
.expect("write succeded");
let chunks = management_client
.list_chunks(&db_name)
.await
.expect("listing chunks");
assert_eq!(chunks.len(), 1, "Chunks: {:#?}", chunks);
assert_eq!(chunks[0].id, 0);
assert_eq!(chunks[0].storage, ChunkStorage::OpenMutableBuffer as i32);
// Move the chunk to read buffer
let operation = management_client
.close_partition_chunk(&db_name, partition_key, 0)
.await
.expect("new partition chunk");
println!("Operation response is {:?}", operation);
let operation_id = operation.name.parse().expect("not an integer");
let meta = operations::ClientOperation::try_new(operation)
.unwrap()
.metadata();
// ensure we got a legit job description back
if let Some(Job::CloseChunk(close_chunk)) = meta.job {
assert_eq!(close_chunk.db_name, db_name);
assert_eq!(close_chunk.partition_key, partition_key);
assert_eq!(close_chunk.chunk_id, 0);
} else {
panic!("unexpected job returned")
};
// wait for the job to be done
operations_client
.wait_operation(operation_id, Some(std::time::Duration::from_secs(1)))
.await
.expect("failed to wait operation");
// And now the chunk should be good
let mut chunks = management_client
.list_chunks(&db_name)
.await
.expect("listing chunks");
chunks.sort_by(|c1, c2| c1.id.cmp(&c2.id));
assert_eq!(chunks.len(), 1, "Chunks: {:#?}", chunks);
assert_eq!(chunks[0].id, 0);
assert_eq!(chunks[0].storage, ChunkStorage::ReadBuffer as i32);
}
#[tokio::test]
async fn test_close_partition_chunk_error() {
let fixture = ServerFixture::create_shared().await;
let mut management_client = fixture.management_client();
let err = management_client
.close_partition_chunk("this database does not exist", "nor_does_this_partition", 0)
.await
.expect_err("expected error");
assert_contains!(err.to_string(), "Database not found");
}
#[tokio::test]
async fn test_chunk_lifecycle() {
use influxdb_iox_client::management::generated_types::ChunkStorage;
let fixture = ServerFixture::create_shared().await;
let mut management_client = fixture.management_client();
let mut write_client = fixture.write_client();
let db_name = rand_name();
management_client
.create_database(DatabaseRules {
name: db_name.clone(),
lifecycle_rules: Some(LifecycleRules {
mutable_linger_seconds: 1,
..Default::default()
}),
..Default::default()
})
.await
.unwrap();
let lp_lines = vec!["cpu,region=west user=23.2 100"];
write_client
.write(&db_name, lp_lines.join("\n"))
.await
.expect("write succeded");
let chunks = management_client
.list_chunks(&db_name)
.await
.expect("listing chunks");
assert_eq!(chunks.len(), 1);
assert_eq!(chunks[0].storage, ChunkStorage::OpenMutableBuffer as i32);
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
let chunks = management_client
.list_chunks(&db_name)
.await
.expect("listing chunks");
assert_eq!(chunks.len(), 1);
assert_eq!(chunks[0].storage, ChunkStorage::ReadBuffer as i32);
}
/// Normalizes a set of Chunks for comparison by removing timestamps
fn normalize_chunks(chunks: Vec<Chunk>) -> Vec<Chunk> {
chunks
.into_iter()
.map(|summary| {
let Chunk {
partition_key,
id,
storage,
estimated_bytes,
..
} = summary;
Chunk {
partition_key,
id,
storage,
estimated_bytes,
time_of_first_write: None,
time_of_last_write: None,
time_closing: None,
}
})
.collect::<Vec<_>>()
}