feat: add wildcard support to persist partition CLI command (#3790)
* feat: add wildcard support to persist partition * chore: fmt Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
0229147909
commit
1960645055
|
@ -1,5 +1,7 @@
|
|||
//! This module implements the `partition` CLI command
|
||||
use data_types::chunk_metadata::ChunkStorage;
|
||||
use generated_types::google::FieldViolation;
|
||||
use hashbrown::HashSet;
|
||||
use influxdb_iox_client::{
|
||||
connection::Connection,
|
||||
management::{self},
|
||||
|
@ -18,6 +20,9 @@ pub enum Error {
|
|||
|
||||
#[error("Received invalid response: {0}")]
|
||||
InvalidResponse(#[from] FieldViolation),
|
||||
|
||||
#[error("Must either specify a TABLE_NAME or --all-tables")]
|
||||
MissingTableName,
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
@ -59,11 +64,15 @@ struct Persist {
|
|||
partition_key: String,
|
||||
|
||||
/// The table name
|
||||
table_name: String,
|
||||
table_name: Option<String>,
|
||||
|
||||
/// Persist all data irrespective of arrival time
|
||||
#[clap(long)]
|
||||
force: bool,
|
||||
|
||||
/// Persist all tables that have data
|
||||
#[clap(long)]
|
||||
all_tables: bool,
|
||||
}
|
||||
|
||||
/// Compact Object Store Chunks
|
||||
|
@ -245,11 +254,38 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
|
|||
partition_key,
|
||||
table_name,
|
||||
force,
|
||||
all_tables,
|
||||
} = persist;
|
||||
|
||||
client
|
||||
.persist_partition(db_name, table_name, partition_key, force)
|
||||
.await?;
|
||||
match (table_name, all_tables) {
|
||||
(Some(table_name), false) => {
|
||||
client
|
||||
.persist_partition(db_name, table_name, partition_key, force)
|
||||
.await?;
|
||||
}
|
||||
(None, true) => {
|
||||
let tables: HashSet<_> = client
|
||||
.list_partition_chunks(&db_name, &partition_key)
|
||||
.await?
|
||||
.into_iter()
|
||||
.filter(|chunk| {
|
||||
ChunkStorage::try_from(chunk.storage())
|
||||
.map(|s| !s.has_object_store())
|
||||
.unwrap_or(false)
|
||||
})
|
||||
.map(|chunk| chunk.table_name)
|
||||
.collect();
|
||||
|
||||
for table_name in tables {
|
||||
println!("Persisting table: {}", table_name);
|
||||
client
|
||||
.persist_partition(&db_name, table_name, &partition_key, force)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
_ => return Err(Error::MissingTableName),
|
||||
}
|
||||
|
||||
println!("Ok");
|
||||
}
|
||||
Command::CompactObjectStoreChunks(compact) => {
|
||||
|
|
|
@ -1592,6 +1592,8 @@ async fn test_drop_partition_error() {
|
|||
|
||||
#[tokio::test]
|
||||
async fn test_persist_partition() {
|
||||
use generated_types::influxdata::iox::management::v1::{partition_template, PartitionTemplate};
|
||||
|
||||
let fixture = ServerFixture::create_shared(ServerType::Database).await;
|
||||
let addr = fixture.grpc_base();
|
||||
let db_name = rand_name();
|
||||
|
@ -1600,16 +1602,29 @@ async fn test_persist_partition() {
|
|||
.persist(true)
|
||||
.persist_age_threshold_seconds(1_000)
|
||||
.late_arrive_window_seconds(1)
|
||||
.partition_template(PartitionTemplate {
|
||||
parts: vec![partition_template::Part {
|
||||
part: Some(partition_template::part::Part::Time("00-00-00".into())),
|
||||
}],
|
||||
})
|
||||
.build(fixture.grpc_channel())
|
||||
.await;
|
||||
|
||||
let lp_data = vec!["cpu,region=west user=23.2 10"];
|
||||
let lp_data = vec![
|
||||
"cpu,region=west user=23.2 10",
|
||||
"foo,region=west user=23.2 10",
|
||||
"bar,region=west user=23.2 10",
|
||||
];
|
||||
load_lp(addr, &db_name, lp_data);
|
||||
|
||||
wait_for_exact_chunk_states(
|
||||
&fixture,
|
||||
&db_name,
|
||||
vec![ChunkStorage::OpenMutableBuffer],
|
||||
vec![
|
||||
ChunkStorage::OpenMutableBuffer,
|
||||
ChunkStorage::OpenMutableBuffer,
|
||||
ChunkStorage::OpenMutableBuffer,
|
||||
],
|
||||
std::time::Duration::from_secs(5),
|
||||
)
|
||||
.await;
|
||||
|
@ -1622,13 +1637,32 @@ async fn test_persist_partition() {
|
|||
.arg("partition")
|
||||
.arg("persist")
|
||||
.arg(&db_name)
|
||||
.arg("cpu")
|
||||
.arg("00-00-00")
|
||||
.arg("cpu")
|
||||
.arg("--host")
|
||||
.arg(addr)
|
||||
.assert()
|
||||
.success()
|
||||
.stdout(predicate::str::contains("Ok"));
|
||||
|
||||
Command::cargo_bin("influxdb_iox")
|
||||
.unwrap()
|
||||
.arg("database")
|
||||
.arg("partition")
|
||||
.arg("persist")
|
||||
.arg(&db_name)
|
||||
.arg("00-00-00")
|
||||
.arg("--all-tables")
|
||||
.arg("--host")
|
||||
.arg(addr)
|
||||
.assert()
|
||||
.success()
|
||||
.stdout(
|
||||
predicate::str::contains("Ok")
|
||||
.and(predicate::str::contains("foo"))
|
||||
.and(predicate::str::contains("bar"))
|
||||
.and(predicate::str::contains("cpu").not()),
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
|
@ -353,6 +353,11 @@ impl DatabaseBuilder {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn partition_template(mut self, partition_template: PartitionTemplate) -> Self {
|
||||
self.partition_template = partition_template;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn buffer_size_hard(mut self, buffer_size_hard: u64) -> Self {
|
||||
self.lifecycle_rules.buffer_size_hard = buffer_size_hard;
|
||||
self
|
||||
|
|
Loading…
Reference in New Issue