Merge branch 'main' into fixjaeger

pull/24376/head
kodiakhq[bot] 2021-06-11 10:58:17 +00:00 committed by GitHub
commit 95cb5bce8b
4 changed files with 105 additions and 2 deletions

@@ -326,8 +326,8 @@ impl ChunkMover for LifecycleManager {
     fn write_to_object_store(
         &mut self,
-        partition_key: String,
         table_name: String,
+        partition_key: String,
         chunk_id: u32,
     ) -> TaskTracker<Self::Job> {
         info!(%partition_key, %chunk_id, "write chunk to object store");
@@ -338,7 +338,7 @@ impl ChunkMover for LifecycleManager {
         tracker
     }

-    fn drop_chunk(&mut self, partition_key: String, table_name: String, chunk_id: u32) {
+    fn drop_chunk(&mut self, table_name: String, partition_key: String, chunk_id: u32) {
         info!(%partition_key, %chunk_id, "dropping chunk");
         let _ = self
             .db
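
Both hunks make the same fix: the table name now comes before the partition key in the ChunkMover methods. A minimal sketch of the resulting call order, assuming a hypothetical `mover: &mut LifecycleManager` and made-up table, partition, and chunk values (none of these literals appear in the commit):

    // Hypothetical caller, shown only to illustrate the new argument order.
    let _tracker = mover.write_to_object_store(
        "cpu".to_string(),                 // table_name
        "2021-06-11 10:00:00".to_string(), // partition_key
        0,                                 // chunk_id
    );
    mover.drop_chunk("cpu".to_string(), "2021-06-11 10:00:00".to_string(), 0);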

@@ -4,6 +4,7 @@ pub mod management_api;
 pub mod management_cli;
 pub mod operations_api;
 pub mod operations_cli;
+mod persistence;
 pub mod preservation;
 pub mod read_api;
 pub mod read_cli;

@@ -0,0 +1,65 @@
+use std::{
+    convert::TryInto,
+    time::{Duration, Instant},
+};
+
+use data_types::chunk_metadata::ChunkSummary;
+
+use crate::common::server_fixture::ServerFixture;
+
+use super::scenario::{create_quickly_persisting_database, rand_name};
+
+#[tokio::test]
+async fn test_persistence() {
+    let fixture = ServerFixture::create_shared().await;
+    let mut write_client = fixture.write_client();
+    let mut management_client = fixture.management_client();
+
+    let db_name = rand_name();
+    create_quickly_persisting_database(&db_name, fixture.grpc_channel()).await;
+
+    // Stream in a write that should exceed the limit
+    let lp_lines: Vec<_> = (0..1_000)
+        .map(|i| format!("data,tag1=val{} x={} {}", i, i * 10, i))
+        .collect();
+
+    let num_lines_written = write_client
+        .write(&db_name, lp_lines.join("\n"))
+        .await
+        .expect("successful write");
+    assert_eq!(num_lines_written, 1000);
+
+    // wait for the chunk to be written to object store
+    let deadline = Instant::now() + Duration::from_secs(5);
+    let mut chunks = vec![];
+    loop {
+        assert!(
+            Instant::now() < deadline,
+            "Chunk did not persist in time. Chunks were: {:#?}",
+            chunks
+        );
+
+        chunks = management_client
+            .list_chunks(&db_name)
+            .await
+            .expect("listing chunks");
+
+        let storage_string = chunks
+            .iter()
+            .map(|c| {
+                let c: ChunkSummary = c.clone().try_into().unwrap();
+                format!("{:?}", c.storage)
+            })
+            .collect::<Vec<_>>()
+            .join(",");
+
+        // Found a persisted chunk, all good
+        if storage_string == "ReadBufferAndObjectStore" {
+            return;
+        }
+
+        // keep looking
+        println!("Current chunk storage: {:#?}", storage_string);
+        tokio::time::sleep(Duration::from_millis(200)).await
+    }
+}
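
The heart of this new test is the poll-until-persisted loop. As a rough sketch (not part of the commit) of how that pattern could be factored into a reusable helper, where `wait_for_storage` and the `storage_summary` closure are hypothetical names standing in for the inline `list_chunks` / `ChunkSummary` formatting above:

    // Sketch only: poll a caller-supplied summary function until it reports the
    // expected chunk storage, or fail once the deadline passes.
    async fn wait_for_storage<F, Fut>(mut storage_summary: F, expected: &str, timeout: Duration)
    where
        F: FnMut() -> Fut,
        Fut: std::future::Future<Output = String>,
    {
        let deadline = Instant::now() + timeout;
        loop {
            let storage = storage_summary().await;
            if storage == expected {
                return; // a chunk reached the expected storage tier
            }
            assert!(
                Instant::now() < deadline,
                "chunks never reached {}: {}",
                expected,
                storage
            );
            tokio::time::sleep(Duration::from_millis(200)).await;
        }
    }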

@@ -316,6 +316,43 @@ pub async fn create_readable_database(
         .expect("create database failed");
 }
+
+/// given a channel to talk with the management api, create a new
+/// database with the specified name that will aggressively try and
+/// persist all data quickly
+pub async fn create_quickly_persisting_database(
+    db_name: impl Into<String>,
+    channel: tonic::transport::Channel,
+) {
+    let db_name = db_name.into();
+    let mut management_client = influxdb_iox_client::management::Client::new(channel);
+
+    let rules = DatabaseRules {
+        name: db_name.clone(),
+        partition_template: Some(PartitionTemplate {
+            parts: vec![partition_template::Part {
+                part: Some(partition_template::part::Part::Time(
+                    "%Y-%m-%d %H:00:00".into(),
+                )),
+            }],
+        }),
+        lifecycle_rules: Some(LifecycleRules {
+            mutable_linger_seconds: 1,
+            mutable_size_threshold: 100,
+            buffer_size_soft: 1024 * 1024,
+            buffer_size_hard: 1024 * 1024,
+            persist: true,
+            ..Default::default()
+        }),
+        ..Default::default()
+    };
+
+    management_client
+        .create_database(rules.clone())
+        .await
+        .expect("create database failed");
+
+    println!("Created quickly persisting database {}", db_name);
+}
+
 /// given a channel to talk with the management api, create a new
 /// database with no mutable buffer configured, no partitioning rules
 pub async fn create_unreadable_database(