Merge branch 'main' into dom/batch-column-upsert
commit
03ffc03c4b
|
@ -1,11 +1,11 @@
|
|||
//! This module implements the `partition` CLI command
|
||||
use data_types::chunk_metadata::ChunkStorage;
|
||||
use generated_types::google::FieldViolation;
|
||||
use hashbrown::HashSet;
|
||||
use influxdb_iox_client::{
|
||||
connection::Connection,
|
||||
management::{self},
|
||||
};
|
||||
use std::collections::BTreeSet;
|
||||
use thiserror::Error;
|
||||
use uuid::Uuid;
|
||||
|
||||
|
@ -21,8 +21,14 @@ pub enum Error {
|
|||
#[error("Received invalid response: {0}")]
|
||||
InvalidResponse(#[from] FieldViolation),
|
||||
|
||||
#[error("Must either specify a TABLE_NAME or --all-tables")]
|
||||
#[error("Must either specify --table-name or --all-tables")]
|
||||
MissingTableName,
|
||||
|
||||
#[error("Must either specify a --partition-key or --all-partitions")]
|
||||
MissingPartitionKey,
|
||||
|
||||
#[error("Some operations returned an error, but --continue-on-error passed")]
|
||||
ContinuedOnError,
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
@ -61,9 +67,11 @@ struct Persist {
|
|||
db_name: String,
|
||||
|
||||
/// The partition key
|
||||
partition_key: String,
|
||||
#[clap(long)]
|
||||
partition_key: Option<String>,
|
||||
|
||||
/// The table name
|
||||
#[clap(long)]
|
||||
table_name: Option<String>,
|
||||
|
||||
/// Persist all data irrespective of arrival time
|
||||
|
@ -73,6 +81,14 @@ struct Persist {
|
|||
/// Persist all tables that have data
|
||||
#[clap(long)]
|
||||
all_tables: bool,
|
||||
|
||||
/// Persist all partitions that have data
|
||||
#[clap(long)]
|
||||
all_partitions: bool,
|
||||
|
||||
/// Continue on error
|
||||
#[clap(long)]
|
||||
continue_on_error: bool,
|
||||
}
|
||||
|
||||
/// Compact Object Store Chunks
|
||||
|
@ -255,38 +271,64 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
|
|||
table_name,
|
||||
force,
|
||||
all_tables,
|
||||
all_partitions,
|
||||
continue_on_error,
|
||||
} = persist;
|
||||
|
||||
match (table_name, all_tables) {
|
||||
(Some(table_name), false) => {
|
||||
client
|
||||
.persist_partition(db_name, table_name, partition_key, force)
|
||||
.await?;
|
||||
}
|
||||
(None, true) => {
|
||||
let tables: HashSet<_> = client
|
||||
.list_partition_chunks(&db_name, &partition_key)
|
||||
.await?
|
||||
.into_iter()
|
||||
.filter(|chunk| {
|
||||
ChunkStorage::try_from(chunk.storage())
|
||||
.map(|s| !s.has_object_store())
|
||||
.unwrap_or(false)
|
||||
})
|
||||
.map(|chunk| chunk.table_name)
|
||||
.collect();
|
||||
let mut has_error = false;
|
||||
|
||||
for table_name in tables {
|
||||
println!("Persisting table: {}", table_name);
|
||||
client
|
||||
.persist_partition(&db_name, table_name, &partition_key, force)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
let partition_filter = match (partition_key, all_partitions) {
|
||||
(Some(partition_key), false) => Some(partition_key),
|
||||
(None, true) => None,
|
||||
_ => return Err(Error::MissingPartitionKey),
|
||||
};
|
||||
|
||||
let table_filter = match (table_name, all_tables) {
|
||||
(Some(table_name), false) => Some(table_name),
|
||||
(None, true) => None,
|
||||
_ => return Err(Error::MissingTableName),
|
||||
};
|
||||
|
||||
let mut partition_tables = BTreeSet::new();
|
||||
let chunks = client.list_chunks(&db_name).await?;
|
||||
for chunk in chunks {
|
||||
let partition_mismatch =
|
||||
matches!(&partition_filter, Some(x) if &chunk.partition_key != x);
|
||||
|
||||
let table_mismatch = matches!(&table_filter, Some(x) if &chunk.table_name != x);
|
||||
let already_persisted = ChunkStorage::try_from(chunk.storage())?.has_object_store();
|
||||
if !partition_mismatch && !table_mismatch && !already_persisted {
|
||||
partition_tables.insert((chunk.partition_key, chunk.table_name));
|
||||
}
|
||||
}
|
||||
|
||||
println!("Ok");
|
||||
for (partition, table_name) in partition_tables {
|
||||
println!(
|
||||
"Persisting partition: \"{}\", table: \"{}\"",
|
||||
partition, table_name
|
||||
);
|
||||
|
||||
let result = client
|
||||
.persist_partition(&db_name, &table_name, &partition, force)
|
||||
.await;
|
||||
|
||||
if let Err(e) = result {
|
||||
if !continue_on_error {
|
||||
return Err(e.into());
|
||||
}
|
||||
|
||||
has_error = true;
|
||||
eprintln!(
|
||||
"Error persisting partition: \"{}\", table: \"{}\": {}",
|
||||
partition, table_name, e
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
match has_error {
|
||||
true => return Err(Error::ContinuedOnError),
|
||||
false => println!("Ok"),
|
||||
}
|
||||
}
|
||||
Command::CompactObjectStoreChunks(compact) => {
|
||||
let CompactObjectStoreChunks {
|
||||
|
|
|
@ -1604,9 +1604,10 @@ async fn test_persist_partition() {
|
|||
.persist(true)
|
||||
.persist_age_threshold_seconds(1_000)
|
||||
.late_arrive_window_seconds(1)
|
||||
.mub_row_threshold(100)
|
||||
.partition_template(PartitionTemplate {
|
||||
parts: vec![partition_template::Part {
|
||||
part: Some(partition_template::part::Part::Time("00-00-00".into())),
|
||||
part: Some(partition_template::part::Part::Column("region".into())),
|
||||
}],
|
||||
})
|
||||
.build(fixture.grpc_channel())
|
||||
|
@ -1616,6 +1617,7 @@ async fn test_persist_partition() {
|
|||
"cpu,region=west user=23.2 10",
|
||||
"foo,region=west user=23.2 10",
|
||||
"bar,region=west user=23.2 10",
|
||||
"bar,region=east user=23.2 10",
|
||||
];
|
||||
load_lp(addr, &db_name, lp_data);
|
||||
|
||||
|
@ -1626,12 +1628,25 @@ async fn test_persist_partition() {
|
|||
ChunkStorage::OpenMutableBuffer,
|
||||
ChunkStorage::OpenMutableBuffer,
|
||||
ChunkStorage::OpenMutableBuffer,
|
||||
ChunkStorage::OpenMutableBuffer,
|
||||
],
|
||||
std::time::Duration::from_secs(5),
|
||||
)
|
||||
.await;
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(1500)).await;
|
||||
// Late arrival period is 1 second
|
||||
wait_for_exact_chunk_states(
|
||||
&fixture,
|
||||
&db_name,
|
||||
vec![
|
||||
ChunkStorage::ReadBuffer,
|
||||
ChunkStorage::ReadBuffer,
|
||||
ChunkStorage::ReadBuffer,
|
||||
ChunkStorage::ReadBuffer,
|
||||
],
|
||||
std::time::Duration::from_secs(5),
|
||||
)
|
||||
.await;
|
||||
|
||||
Command::cargo_bin("influxdb_iox")
|
||||
.unwrap()
|
||||
|
@ -1639,7 +1654,9 @@ async fn test_persist_partition() {
|
|||
.arg("partition")
|
||||
.arg("persist")
|
||||
.arg(&db_name)
|
||||
.arg("00-00-00")
|
||||
.arg("--partition-key")
|
||||
.arg("region_west")
|
||||
.arg("--table-name")
|
||||
.arg("cpu")
|
||||
.arg("--host")
|
||||
.arg(addr)
|
||||
|
@ -1647,13 +1664,26 @@ async fn test_persist_partition() {
|
|||
.success()
|
||||
.stdout(predicate::str::contains("Ok"));
|
||||
|
||||
wait_for_exact_chunk_states(
|
||||
&fixture,
|
||||
&db_name,
|
||||
vec![
|
||||
ChunkStorage::ReadBuffer,
|
||||
ChunkStorage::ReadBuffer,
|
||||
ChunkStorage::ReadBuffer,
|
||||
ChunkStorage::ReadBufferAndObjectStore,
|
||||
],
|
||||
std::time::Duration::from_secs(5),
|
||||
)
|
||||
.await;
|
||||
|
||||
Command::cargo_bin("influxdb_iox")
|
||||
.unwrap()
|
||||
.arg("database")
|
||||
.arg("partition")
|
||||
.arg("persist")
|
||||
.arg(&db_name)
|
||||
.arg("00-00-00")
|
||||
.arg("--all-partitions")
|
||||
.arg("--all-tables")
|
||||
.arg("--host")
|
||||
.arg(addr)
|
||||
|
@ -1663,6 +1693,8 @@ async fn test_persist_partition() {
|
|||
predicate::str::contains("Ok")
|
||||
.and(predicate::str::contains("foo"))
|
||||
.and(predicate::str::contains("bar"))
|
||||
.and(predicate::str::contains("west"))
|
||||
.and(predicate::str::contains("east"))
|
||||
.and(predicate::str::contains("cpu").not()),
|
||||
);
|
||||
}
|
||||
|
@ -1676,29 +1708,23 @@ async fn test_persist_partition_error() {
|
|||
DatabaseBuilder::new(db_name.clone())
|
||||
.persist(true)
|
||||
.persist_age_threshold_seconds(1_000)
|
||||
.late_arrive_window_seconds(1_000)
|
||||
.late_arrive_window_seconds(1)
|
||||
.build(fixture.grpc_channel())
|
||||
.await;
|
||||
|
||||
let lp_data = vec!["cpu,region=west user=23.2 10"];
|
||||
load_lp(addr, &db_name, lp_data);
|
||||
|
||||
wait_for_exact_chunk_states(
|
||||
&fixture,
|
||||
&db_name,
|
||||
vec![ChunkStorage::OpenMutableBuffer],
|
||||
std::time::Duration::from_secs(5),
|
||||
)
|
||||
.await;
|
||||
|
||||
// there is no old data (late arrival window is 1000s) that can be persisted
|
||||
// cannot persist data immediately
|
||||
Command::cargo_bin("influxdb_iox")
|
||||
.unwrap()
|
||||
.arg("database")
|
||||
.arg("partition")
|
||||
.arg("persist")
|
||||
.arg(&db_name)
|
||||
.arg("--table-name")
|
||||
.arg("cpu")
|
||||
.arg("--partition-key")
|
||||
.arg("cpu")
|
||||
.arg("--host")
|
||||
.arg(addr)
|
||||
|
@ -1707,6 +1733,51 @@ async fn test_persist_partition_error() {
|
|||
.stderr(predicate::str::contains(
|
||||
"Cannot persist partition because it cannot be flushed at the moment",
|
||||
));
|
||||
|
||||
// Late arrival period is 1 second
|
||||
wait_for_exact_chunk_states(
|
||||
&fixture,
|
||||
&db_name,
|
||||
vec![ChunkStorage::ReadBuffer],
|
||||
std::time::Duration::from_secs(5),
|
||||
)
|
||||
.await;
|
||||
|
||||
let lp_data = vec!["bananas,region=west val=23.2 10"];
|
||||
load_lp(addr, &db_name, lp_data);
|
||||
|
||||
Command::cargo_bin("influxdb_iox")
|
||||
.unwrap()
|
||||
.arg("database")
|
||||
.arg("partition")
|
||||
.arg("persist")
|
||||
.arg(&db_name)
|
||||
.arg("--all-tables")
|
||||
.arg("--all-partitions")
|
||||
.arg("--continue-on-error")
|
||||
.arg("--host")
|
||||
.arg(addr)
|
||||
.assert()
|
||||
.failure()
|
||||
.stdout(predicate::str::contains("cpu"))
|
||||
.stderr(
|
||||
predicate::str::contains(
|
||||
"Cannot persist partition because it cannot be flushed at the moment",
|
||||
)
|
||||
.and(predicate::str::contains("bananas")),
|
||||
);
|
||||
|
||||
// Should have persisted first write
|
||||
wait_for_exact_chunk_states(
|
||||
&fixture,
|
||||
&db_name,
|
||||
vec![
|
||||
ChunkStorage::ReadBuffer,
|
||||
ChunkStorage::ReadBufferAndObjectStore,
|
||||
],
|
||||
std::time::Duration::from_secs(5),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
|
@ -480,11 +480,12 @@ pub async fn wait_for_exact_chunk_states(
|
|||
wait_time: std::time::Duration,
|
||||
) -> Vec<ChunkSummary> {
|
||||
// ensure consistent order
|
||||
desired_storages.sort();
|
||||
desired_storages.sort_unstable();
|
||||
|
||||
let fail_message = format!("persisted chunks in exactly {:?}", desired_storages);
|
||||
let pred = |chunks: &[ChunkSummary]| {
|
||||
let actual_storages = chunks.iter().map(|chunk| chunk.storage).collect::<Vec<_>>();
|
||||
let mut actual_storages = chunks.iter().map(|chunk| chunk.storage).collect::<Vec<_>>();
|
||||
actual_storages.sort_unstable();
|
||||
|
||||
desired_storages == actual_storages
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue