Merge branch 'main' into pd/add-regex-to-db-writers
commit
f477def27a
|
@ -42,6 +42,7 @@ message OperationMetadata {
|
||||||
CompactObjectStoreChunks compact_object_store_chunks = 18;
|
CompactObjectStoreChunks compact_object_store_chunks = 18;
|
||||||
LoadReadBufferChunk load_read_buffer_chunk = 19;
|
LoadReadBufferChunk load_read_buffer_chunk = 19;
|
||||||
RebuildPreservedCatalog rebuild_preserved_catalog = 20;
|
RebuildPreservedCatalog rebuild_preserved_catalog = 20;
|
||||||
|
CompactObjectStorePartition compact_object_store_partition = 21;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -110,6 +111,18 @@ message CompactObjectStoreChunks {
|
||||||
repeated bytes chunks = 4;
|
repeated bytes chunks = 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Compact OS chunks of a partition into a single chunk
|
||||||
|
message CompactObjectStorePartition {
|
||||||
|
// name of the database
|
||||||
|
string db_name = 1;
|
||||||
|
|
||||||
|
// partition key
|
||||||
|
string partition_key = 2;
|
||||||
|
|
||||||
|
// table name
|
||||||
|
string table_name = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// Split and write chunks to object store
|
// Split and write chunks to object store
|
||||||
message PersistChunks {
|
message PersistChunks {
|
||||||
|
|
|
@ -95,6 +95,10 @@ service ManagementService {
|
||||||
//
|
//
|
||||||
// Errors if the chunks are not compacted yet and not contiguous
|
// Errors if the chunks are not compacted yet and not contiguous
|
||||||
rpc CompactObjectStoreChunks(CompactObjectStoreChunksRequest) returns (CompactObjectStoreChunksResponse);
|
rpc CompactObjectStoreChunks(CompactObjectStoreChunksRequest) returns (CompactObjectStoreChunksResponse);
|
||||||
|
|
||||||
|
// Compact all object store chunks of a given partition
|
||||||
|
//
|
||||||
|
rpc CompactObjectStorePartition(CompactObjectStorePartitionRequest) returns (CompactObjectStorePartitionResponse);
|
||||||
}
|
}
|
||||||
|
|
||||||
message ListDatabasesRequest {
|
message ListDatabasesRequest {
|
||||||
|
@ -505,3 +509,19 @@ message CompactObjectStoreChunksResponse {
|
||||||
google.longrunning.Operation operation = 1;
|
google.longrunning.Operation operation = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Request to commpact all object store of a given partition
|
||||||
|
message CompactObjectStorePartitionRequest {
|
||||||
|
// the name of the database
|
||||||
|
string db_name = 1;
|
||||||
|
|
||||||
|
// the partition key
|
||||||
|
string partition_key = 2;
|
||||||
|
|
||||||
|
// the table name
|
||||||
|
string table_name = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
message CompactObjectStorePartitionResponse {
|
||||||
|
// The operation that tracks the work for compacting object store chunks
|
||||||
|
google.longrunning.Operation operation = 1;
|
||||||
|
}
|
||||||
|
|
|
@ -20,6 +20,10 @@ impl management::operation_metadata::Job {
|
||||||
db_name,
|
db_name,
|
||||||
..
|
..
|
||||||
}) => db_name,
|
}) => db_name,
|
||||||
|
Self::CompactObjectStorePartition(management::CompactObjectStorePartition {
|
||||||
|
db_name,
|
||||||
|
..
|
||||||
|
}) => db_name,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -85,6 +85,19 @@ struct CompactObjectStoreChunks {
|
||||||
chunk_ids: Vec<Uuid>,
|
chunk_ids: Vec<Uuid>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Compact all Object Store Chunks of a partition
|
||||||
|
#[derive(Debug, StructOpt)]
|
||||||
|
struct CompactObjectStorePartition {
|
||||||
|
/// The name of the database
|
||||||
|
db_name: String,
|
||||||
|
|
||||||
|
/// The partition key
|
||||||
|
partition_key: String,
|
||||||
|
|
||||||
|
/// The table name
|
||||||
|
table_name: String,
|
||||||
|
}
|
||||||
|
|
||||||
/// lists all chunks in this partition
|
/// lists all chunks in this partition
|
||||||
#[derive(Debug, StructOpt)]
|
#[derive(Debug, StructOpt)]
|
||||||
struct ListChunks {
|
struct ListChunks {
|
||||||
|
@ -175,6 +188,9 @@ enum Command {
|
||||||
/// Errors if the chunks are not yet compacted and not contiguous.
|
/// Errors if the chunks are not yet compacted and not contiguous.
|
||||||
CompactObjectStoreChunks(CompactObjectStoreChunks),
|
CompactObjectStoreChunks(CompactObjectStoreChunks),
|
||||||
|
|
||||||
|
/// Compact all object store chunks of a given partition
|
||||||
|
CompactObjectStorePartition(CompactObjectStorePartition),
|
||||||
|
|
||||||
/// Drop partition from memory and (if persisted) from object store.
|
/// Drop partition from memory and (if persisted) from object store.
|
||||||
Drop(DropPartition),
|
Drop(DropPartition),
|
||||||
|
|
||||||
|
@ -255,6 +271,19 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
|
||||||
|
|
||||||
serde_json::to_writer_pretty(std::io::stdout(), &operation)?;
|
serde_json::to_writer_pretty(std::io::stdout(), &operation)?;
|
||||||
}
|
}
|
||||||
|
Command::CompactObjectStorePartition(compact) => {
|
||||||
|
let CompactObjectStorePartition {
|
||||||
|
db_name,
|
||||||
|
partition_key,
|
||||||
|
table_name,
|
||||||
|
} = compact;
|
||||||
|
|
||||||
|
let operation = client
|
||||||
|
.compact_object_store_partition(db_name, table_name, partition_key)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
serde_json::to_writer_pretty(std::io::stdout(), &operation)?;
|
||||||
|
}
|
||||||
Command::Drop(drop_partition) => {
|
Command::Drop(drop_partition) => {
|
||||||
let DropPartition {
|
let DropPartition {
|
||||||
db_name,
|
db_name,
|
||||||
|
|
|
@ -535,6 +535,7 @@ impl management_service_server::ManagementService for ManagementService {
|
||||||
Ok(Response::new(DropPartitionResponse {}))
|
Ok(Response::new(DropPartitionResponse {}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Compact all given object store chunks
|
||||||
async fn compact_object_store_chunks(
|
async fn compact_object_store_chunks(
|
||||||
&self,
|
&self,
|
||||||
request: Request<CompactObjectStoreChunksRequest>,
|
request: Request<CompactObjectStoreChunksRequest>,
|
||||||
|
@ -570,6 +571,36 @@ impl management_service_server::ManagementService for ManagementService {
|
||||||
operation,
|
operation,
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Compact all object store chunks of the given partition
|
||||||
|
async fn compact_object_store_partition(
|
||||||
|
&self,
|
||||||
|
request: Request<CompactObjectStorePartitionRequest>,
|
||||||
|
) -> Result<Response<CompactObjectStorePartitionResponse>, Status> {
|
||||||
|
let CompactObjectStorePartitionRequest {
|
||||||
|
db_name,
|
||||||
|
partition_key,
|
||||||
|
table_name,
|
||||||
|
} = request.into_inner();
|
||||||
|
|
||||||
|
// Validate that the database name is legit
|
||||||
|
let db_name = DatabaseName::new(db_name).scope("db_name")?;
|
||||||
|
|
||||||
|
let db = self
|
||||||
|
.server
|
||||||
|
.db(&db_name)
|
||||||
|
.map_err(default_server_error_handler)?;
|
||||||
|
|
||||||
|
let tracker = db
|
||||||
|
.compact_object_store_partition(&table_name, &partition_key)
|
||||||
|
.map_err(default_db_error_handler)?;
|
||||||
|
|
||||||
|
let operation = Some(super::operations::encode_tracker(tracker)?);
|
||||||
|
|
||||||
|
Ok(Response::new(CompactObjectStorePartitionResponse {
|
||||||
|
operation,
|
||||||
|
}))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns [`DatabaseRules`] formated according to the `omit_defaults` flag. If `omit_defaults` is
|
/// Returns [`DatabaseRules`] formated according to the `omit_defaults` flag. If `omit_defaults` is
|
||||||
|
|
|
@ -17,9 +17,8 @@ use uuid::Uuid;
|
||||||
use crate::{
|
use crate::{
|
||||||
common::server_fixture::{ServerFixture, ServerType},
|
common::server_fixture::{ServerFixture, ServerType},
|
||||||
end_to_end_cases::scenario::{
|
end_to_end_cases::scenario::{
|
||||||
collect_query, create_readable_database, data_dir, db_data_dir, rand_name,
|
create_readable_database, data_dir, db_data_dir, rand_name, wait_for_database_initialized,
|
||||||
wait_for_database_initialized, wait_for_exact_chunk_states,
|
wait_for_exact_chunk_states, wait_for_operations_to_complete,
|
||||||
wait_for_operations_to_complete,
|
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -234,13 +233,14 @@ async fn migrate_table_files_from_one_server_to_another() {
|
||||||
wait_for_database_initialized(&fixture, &db_name, Duration::from_secs(5)).await;
|
wait_for_database_initialized(&fixture, &db_name, Duration::from_secs(5)).await;
|
||||||
|
|
||||||
// Now the data shoudl be available for the_table
|
// Now the data shoudl be available for the_table
|
||||||
let query_results = flight_client
|
let batches = flight_client
|
||||||
.perform_query(&db_name, sql_query)
|
.perform_query(&db_name, sql_query)
|
||||||
.await
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.collect()
|
||||||
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let batches = collect_query(query_results).await;
|
|
||||||
|
|
||||||
let expected = vec![
|
let expected = vec![
|
||||||
"+-----------------+",
|
"+-----------------+",
|
||||||
"| COUNT(UInt8(1)) |",
|
"| COUNT(UInt8(1)) |",
|
||||||
|
|
|
@ -52,7 +52,7 @@ async fn test_delete_on_database() {
|
||||||
.perform_query(db_name.clone(), "select * from cpu")
|
.perform_query(db_name.clone(), "select * from cpu")
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let batches = query_results.to_batches().await.unwrap();
|
let batches = query_results.collect().await.unwrap();
|
||||||
let expected = [
|
let expected = [
|
||||||
"+--------+--------------------------------+------+",
|
"+--------+--------------------------------+------+",
|
||||||
"| region | time | user |",
|
"| region | time | user |",
|
||||||
|
@ -86,7 +86,7 @@ async fn test_delete_on_database() {
|
||||||
.perform_query(db_name.clone(), "select * from cpu")
|
.perform_query(db_name.clone(), "select * from cpu")
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let batches = query_results.to_batches().await.unwrap();
|
let batches = query_results.collect().await.unwrap();
|
||||||
let expected = [
|
let expected = [
|
||||||
"+--------+--------------------------------+------+",
|
"+--------+--------------------------------+------+",
|
||||||
"| region | time | user |",
|
"| region | time | user |",
|
||||||
|
@ -104,7 +104,7 @@ async fn test_delete_on_database() {
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let batches = query_results.to_batches().await.unwrap();
|
let batches = query_results.collect().await.unwrap();
|
||||||
// result should be as above
|
// result should be as above
|
||||||
assert_batches_sorted_eq!(&expected, &batches);
|
assert_batches_sorted_eq!(&expected, &batches);
|
||||||
|
|
||||||
|
@ -113,7 +113,7 @@ async fn test_delete_on_database() {
|
||||||
.perform_query(db_name.clone(), "select * from cpu where user!=21")
|
.perform_query(db_name.clone(), "select * from cpu where user!=21")
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let batches = query_results.to_batches().await.unwrap();
|
let batches = query_results.collect().await.unwrap();
|
||||||
// result should be nothing
|
// result should be nothing
|
||||||
let expected = ["++", "++"];
|
let expected = ["++", "++"];
|
||||||
assert_batches_sorted_eq!(&expected, &batches);
|
assert_batches_sorted_eq!(&expected, &batches);
|
||||||
|
@ -135,7 +135,7 @@ async fn test_delete_on_database() {
|
||||||
.perform_query(db_name.clone(), "select * from cpu")
|
.perform_query(db_name.clone(), "select * from cpu")
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let batches = query_results.to_batches().await.unwrap();
|
let batches = query_results.collect().await.unwrap();
|
||||||
let cpu_expected = [
|
let cpu_expected = [
|
||||||
"+--------+--------------------------------+------+",
|
"+--------+--------------------------------+------+",
|
||||||
"| region | time | user |",
|
"| region | time | user |",
|
||||||
|
@ -149,7 +149,7 @@ async fn test_delete_on_database() {
|
||||||
.perform_query(db_name.clone(), "select * from disk")
|
.perform_query(db_name.clone(), "select * from disk")
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let batches = query_results.to_batches().await.unwrap();
|
let batches = query_results.collect().await.unwrap();
|
||||||
let disk_expected = [
|
let disk_expected = [
|
||||||
"+-------+--------+--------------------------------+",
|
"+-------+--------+--------------------------------+",
|
||||||
"| bytes | region | time |",
|
"| bytes | region | time |",
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
use super::scenario::{collect_query, create_readable_database, rand_name, Scenario};
|
use super::scenario::{create_readable_database, rand_name, Scenario};
|
||||||
use crate::common::server_fixture::{ServerFixture, ServerType};
|
use crate::common::server_fixture::{ServerFixture, ServerType};
|
||||||
use arrow_util::assert_batches_sorted_eq;
|
use arrow_util::assert_batches_sorted_eq;
|
||||||
|
|
||||||
|
@ -20,13 +20,14 @@ pub async fn test() {
|
||||||
// This does nothing except test the client handshake implementation.
|
// This does nothing except test the client handshake implementation.
|
||||||
client.handshake().await.unwrap();
|
client.handshake().await.unwrap();
|
||||||
|
|
||||||
let query_results = client
|
let batches = client
|
||||||
.perform_query(scenario.database_name(), sql_query)
|
.perform_query(scenario.database_name(), sql_query)
|
||||||
.await
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.collect()
|
||||||
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let batches = collect_query(query_results).await;
|
|
||||||
|
|
||||||
let expected_read_data: Vec<_> = expected_read_data.iter().map(|s| s.as_str()).collect();
|
let expected_read_data: Vec<_> = expected_read_data.iter().map(|s| s.as_str()).collect();
|
||||||
assert_batches_sorted_eq!(expected_read_data, &batches);
|
assert_batches_sorted_eq!(expected_read_data, &batches);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,10 +1,14 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
common::server_fixture::{ServerFixture, ServerType, TestConfig, DEFAULT_SERVER_ID},
|
common::server_fixture::{ServerFixture, ServerType, TestConfig, DEFAULT_SERVER_ID},
|
||||||
end_to_end_cases::scenario::{
|
end_to_end_cases::{
|
||||||
create_readable_database, create_two_partition_database, create_unreadable_database,
|
management_cli::setup_load_and_persist_two_partition_chunks,
|
||||||
fixture_broken_catalog, rand_name, wait_for_exact_chunk_states, DatabaseBuilder,
|
scenario::{
|
||||||
|
create_readable_database, create_two_partition_database, create_unreadable_database,
|
||||||
|
fixture_broken_catalog, rand_name, wait_for_exact_chunk_states, DatabaseBuilder,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
use bytes::Bytes;
|
||||||
use data_types::chunk_metadata::ChunkId;
|
use data_types::chunk_metadata::ChunkId;
|
||||||
use generated_types::google::protobuf::{Duration, Empty};
|
use generated_types::google::protobuf::{Duration, Empty};
|
||||||
use influxdb_iox_client::{
|
use influxdb_iox_client::{
|
||||||
|
@ -1741,143 +1745,24 @@ async fn test_persist_partition_error() {
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_compact_os_chunks() {
|
async fn test_compact_os_chunks() {
|
||||||
use data_types::chunk_metadata::ChunkStorage;
|
// Make 2 persisted chunks for a partition
|
||||||
|
let (fixture, db_name, _addr, chunk_ids) = setup_load_and_persist_two_partition_chunks().await;
|
||||||
let fixture = ServerFixture::create_shared(ServerType::Database).await;
|
assert!(chunk_ids.len() > 1);
|
||||||
let mut write_client = fixture.write_client();
|
|
||||||
let mut management_client = fixture.management_client();
|
let mut management_client = fixture.management_client();
|
||||||
let mut operations_client = fixture.operations_client();
|
let mut operations_client = fixture.operations_client();
|
||||||
|
|
||||||
let db_name = rand_name();
|
let c_ids: Vec<Bytes> = chunk_ids
|
||||||
DatabaseBuilder::new(db_name.clone())
|
.iter()
|
||||||
.persist(true)
|
.map(|id| {
|
||||||
.persist_age_threshold_seconds(1_000)
|
let id_uuid = Uuid::parse_str(id).unwrap();
|
||||||
.late_arrive_window_seconds(1)
|
id_uuid.as_bytes().to_vec().into()
|
||||||
.build(fixture.grpc_channel())
|
})
|
||||||
.await;
|
.collect();
|
||||||
|
|
||||||
// Chunk 1
|
// Compact all 2 OS chunks of the partition
|
||||||
let lp_lines = vec!["cpu,tag1=cupcakes bar=1 10", "cpu,tag1=cookies bar=2 10"];
|
// note that both partition and table_name are "cpu" in the setup
|
||||||
|
|
||||||
let num_lines_written = write_client
|
|
||||||
.write_lp(&db_name, lp_lines.join("\n"), 0)
|
|
||||||
.await
|
|
||||||
.expect("write succeded");
|
|
||||||
assert_eq!(num_lines_written, 2);
|
|
||||||
|
|
||||||
wait_for_exact_chunk_states(
|
|
||||||
&fixture,
|
|
||||||
&db_name,
|
|
||||||
vec![ChunkStorage::OpenMutableBuffer],
|
|
||||||
std::time::Duration::from_secs(5),
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
let chunks = management_client
|
|
||||||
.list_chunks(&db_name)
|
|
||||||
.await
|
|
||||||
.expect("listing chunks");
|
|
||||||
assert_eq!(chunks.len(), 1);
|
|
||||||
let partition_key = &chunks[0].partition_key;
|
|
||||||
|
|
||||||
management_client
|
|
||||||
.persist_partition(&db_name, "cpu", &partition_key[..], true)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let chunks = management_client
|
|
||||||
.list_chunks(&db_name)
|
|
||||||
.await
|
|
||||||
.expect("listing chunks");
|
|
||||||
assert_eq!(chunks.len(), 1);
|
|
||||||
assert_eq!(
|
|
||||||
chunks[0].storage,
|
|
||||||
generated_types::influxdata::iox::management::v1::ChunkStorage::ReadBufferAndObjectStore
|
|
||||||
as i32
|
|
||||||
);
|
|
||||||
|
|
||||||
// chunk 2
|
|
||||||
let lp_lines = vec![
|
|
||||||
"cpu,tag1=cookies bar=2 20",
|
|
||||||
"cpu,tag1=cookies bar=3 30", // duplicate
|
|
||||||
"cpu,tag1=cupcakes bar=2 20",
|
|
||||||
];
|
|
||||||
|
|
||||||
let num_lines_written = write_client
|
|
||||||
.write_lp(&db_name, lp_lines.join("\n"), 0)
|
|
||||||
.await
|
|
||||||
.expect("write succeded");
|
|
||||||
assert_eq!(num_lines_written, 3);
|
|
||||||
|
|
||||||
let chunks = management_client
|
|
||||||
.list_chunks(&db_name)
|
|
||||||
.await
|
|
||||||
.expect("listing chunks");
|
|
||||||
assert_eq!(chunks.len(), 2);
|
|
||||||
let partition_key = &chunks[0].partition_key;
|
|
||||||
|
|
||||||
management_client
|
|
||||||
.persist_partition(&db_name, "cpu", &partition_key[..], true)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let mut chunks = management_client
|
|
||||||
.list_chunks(&db_name)
|
|
||||||
.await
|
|
||||||
.expect("listing chunks");
|
|
||||||
// ensure chunk in deterministic order
|
|
||||||
chunks.sort_by(|c1, c2| c1.id.cmp(&c2.id));
|
|
||||||
assert_eq!(chunks.len(), 2);
|
|
||||||
assert_eq!(
|
|
||||||
chunks[0].storage,
|
|
||||||
generated_types::influxdata::iox::management::v1::ChunkStorage::ReadBufferAndObjectStore
|
|
||||||
as i32
|
|
||||||
);
|
|
||||||
assert_eq!(
|
|
||||||
chunks[1].storage,
|
|
||||||
generated_types::influxdata::iox::management::v1::ChunkStorage::ReadBufferAndObjectStore
|
|
||||||
as i32
|
|
||||||
);
|
|
||||||
|
|
||||||
let chunk_id_1 = chunks[0].id.clone();
|
|
||||||
let partition_key_1 = &chunks[0].partition_key;
|
|
||||||
let chunk_id_2 = chunks[1].id.clone();
|
|
||||||
let partition_key_2 = &chunks[1].partition_key;
|
|
||||||
assert_eq!(partition_key_1, partition_key_2);
|
|
||||||
|
|
||||||
// unload both RUBs
|
|
||||||
management_client
|
|
||||||
.unload_partition_chunk(&db_name, "cpu", &partition_key_1[..], chunk_id_1.clone())
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
management_client
|
|
||||||
.unload_partition_chunk(&db_name, "cpu", &partition_key_2[..], chunk_id_2.clone())
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
// verify chunk status again
|
|
||||||
let chunks = management_client
|
|
||||||
.list_chunks(&db_name)
|
|
||||||
.await
|
|
||||||
.expect("listing chunks");
|
|
||||||
assert_eq!(chunks.len(), 2);
|
|
||||||
assert_eq!(
|
|
||||||
chunks[0].storage,
|
|
||||||
generated_types::influxdata::iox::management::v1::ChunkStorage::ObjectStoreOnly as i32
|
|
||||||
);
|
|
||||||
assert_eq!(
|
|
||||||
chunks[1].storage,
|
|
||||||
generated_types::influxdata::iox::management::v1::ChunkStorage::ObjectStoreOnly as i32
|
|
||||||
);
|
|
||||||
|
|
||||||
// Compact 2 chunks
|
|
||||||
let iox_operation = management_client
|
let iox_operation = management_client
|
||||||
.compact_object_store_chunks(
|
.compact_object_store_chunks(&db_name, "cpu", "cpu", c_ids.clone())
|
||||||
&db_name,
|
|
||||||
"cpu",
|
|
||||||
&partition_key_1[..],
|
|
||||||
vec![chunk_id_1.clone(), chunk_id_2.clone()],
|
|
||||||
)
|
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
@ -1887,7 +1772,7 @@ async fn test_compact_os_chunks() {
|
||||||
match iox_operation.metadata.job {
|
match iox_operation.metadata.job {
|
||||||
Some(Job::CompactObjectStoreChunks(job)) => {
|
Some(Job::CompactObjectStoreChunks(job)) => {
|
||||||
assert_eq!(&job.db_name, &db_name);
|
assert_eq!(&job.db_name, &db_name);
|
||||||
assert_eq!(job.partition_key.as_str(), partition_key_1);
|
assert_eq!(job.partition_key.as_str(), "cpu");
|
||||||
assert_eq!(job.table_name.as_str(), "cpu");
|
assert_eq!(job.table_name.as_str(), "cpu");
|
||||||
}
|
}
|
||||||
job => panic!("unexpected job returned {:#?}", job),
|
job => panic!("unexpected job returned {:#?}", job),
|
||||||
|
@ -1910,6 +1795,55 @@ async fn test_compact_os_chunks() {
|
||||||
generated_types::influxdata::iox::management::v1::ChunkStorage::ObjectStoreOnly as i32
|
generated_types::influxdata::iox::management::v1::ChunkStorage::ObjectStoreOnly as i32
|
||||||
);
|
);
|
||||||
let new_chunk_id = chunks[0].id.clone();
|
let new_chunk_id = chunks[0].id.clone();
|
||||||
assert_ne!(new_chunk_id, chunk_id_1);
|
assert_ne!(new_chunk_id, c_ids[0]);
|
||||||
assert_ne!(new_chunk_id, chunk_id_2);
|
assert_ne!(new_chunk_id, c_ids[1]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_compact_os_partition() {
|
||||||
|
// Make 2 persisted chunks for a partition
|
||||||
|
let (fixture, db_name, _addr, chunk_ids) = setup_load_and_persist_two_partition_chunks().await;
|
||||||
|
let mut management_client = fixture.management_client();
|
||||||
|
let mut operations_client = fixture.operations_client();
|
||||||
|
|
||||||
|
// Compact all 2 OS chunks of the partition
|
||||||
|
// note that both partition and table_name are "cpu" in the setup
|
||||||
|
let iox_operation = management_client
|
||||||
|
.compact_object_store_partition(&db_name, "cpu", "cpu")
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let operation_id = iox_operation.operation.id();
|
||||||
|
|
||||||
|
// ensure we got a legit job description back
|
||||||
|
// note that since compact_object_store_partition invokes compact_object_store_chunks,
|
||||||
|
// its job is recorded as CompactObjectStoreChunks
|
||||||
|
match iox_operation.metadata.job {
|
||||||
|
Some(Job::CompactObjectStoreChunks(job)) => {
|
||||||
|
assert_eq!(&job.db_name, &db_name);
|
||||||
|
assert_eq!(job.partition_key.as_str(), "cpu");
|
||||||
|
assert_eq!(job.table_name.as_str(), "cpu");
|
||||||
|
}
|
||||||
|
job => panic!("unexpected job returned {:#?}", job),
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait for the job to be done
|
||||||
|
operations_client
|
||||||
|
.wait_operation(operation_id, Some(std::time::Duration::from_secs(1)))
|
||||||
|
.await
|
||||||
|
.expect("failed to wait operation");
|
||||||
|
|
||||||
|
// verify chunks after compaction
|
||||||
|
let chunks = management_client
|
||||||
|
.list_chunks(&db_name)
|
||||||
|
.await
|
||||||
|
.expect("listing chunks");
|
||||||
|
assert_eq!(chunks.len(), 1);
|
||||||
|
assert_eq!(
|
||||||
|
chunks[0].storage,
|
||||||
|
generated_types::influxdata::iox::management::v1::ChunkStorage::ObjectStoreOnly as i32
|
||||||
|
);
|
||||||
|
let new_chunk_id = chunks[0].id.clone();
|
||||||
|
assert_ne!(new_chunk_id, chunk_ids[0]);
|
||||||
|
assert_ne!(new_chunk_id, chunk_ids[1]);
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,7 +3,7 @@ use crate::{
|
||||||
common::server_fixture::{ServerFixture, ServerType},
|
common::server_fixture::{ServerFixture, ServerType},
|
||||||
end_to_end_cases::scenario::{
|
end_to_end_cases::scenario::{
|
||||||
fixture_broken_catalog, fixture_replay_broken, list_chunks, wait_for_exact_chunk_states,
|
fixture_broken_catalog, fixture_replay_broken, list_chunks, wait_for_exact_chunk_states,
|
||||||
DatabaseBuilder,
|
wait_for_operations_to_complete, DatabaseBuilder,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
use assert_cmd::Command;
|
use assert_cmd::Command;
|
||||||
|
@ -1547,3 +1547,138 @@ async fn test_persist_partition_error() {
|
||||||
"Cannot persist partition because it cannot be flushed at the moment",
|
"Cannot persist partition because it cannot be flushed at the moment",
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_compact_os_partition() {
|
||||||
|
// Make 2 persisted chunks for a partition
|
||||||
|
let (fixture, db_name, addr, _chunk_ids) = setup_load_and_persist_two_partition_chunks().await;
|
||||||
|
|
||||||
|
// Compact the partition which will compact those 2 chunks
|
||||||
|
let iox_operation: IoxOperation = serde_json::from_slice(
|
||||||
|
&Command::cargo_bin("influxdb_iox")
|
||||||
|
.unwrap()
|
||||||
|
.arg("database")
|
||||||
|
.arg("partition")
|
||||||
|
.arg("compact-object-store-partition")
|
||||||
|
.arg(&db_name)
|
||||||
|
.arg("cpu") // partition key
|
||||||
|
.arg("cpu") // table name
|
||||||
|
//.arg(chunk_ids)
|
||||||
|
.arg("--host")
|
||||||
|
.arg(addr)
|
||||||
|
.assert()
|
||||||
|
.success()
|
||||||
|
.get_output()
|
||||||
|
.stdout,
|
||||||
|
)
|
||||||
|
.expect("Expected JSON output");
|
||||||
|
|
||||||
|
// Ensure we got a legit job description back
|
||||||
|
match iox_operation.metadata.job {
|
||||||
|
Some(Job::CompactObjectStoreChunks(job)) => {
|
||||||
|
assert_eq!(job.chunks.len(), 2);
|
||||||
|
assert_eq!(&job.db_name, &db_name);
|
||||||
|
assert_eq!(job.partition_key.as_str(), "cpu");
|
||||||
|
assert_eq!(job.table_name.as_str(), "cpu");
|
||||||
|
}
|
||||||
|
job => panic!("unexpected job returned {:#?}", job),
|
||||||
|
}
|
||||||
|
// Wait for the compaction to complete
|
||||||
|
wait_for_operations_to_complete(&fixture, &db_name, Duration::from_secs(5)).await;
|
||||||
|
|
||||||
|
// Verify chunk the DB now only has one OS-only chunk
|
||||||
|
let chunks = list_chunks(&fixture, &db_name).await;
|
||||||
|
assert_eq!(chunks.len(), 1);
|
||||||
|
assert_eq!(chunks[0].storage, ChunkStorage::ObjectStoreOnly);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_compact_os_chunks() {
|
||||||
|
// Make 2 persisted chunks for a partition
|
||||||
|
let (fixture, db_name, addr, chunk_ids) = setup_load_and_persist_two_partition_chunks().await;
|
||||||
|
|
||||||
|
// Compact the partition which will compact those 2 chunks
|
||||||
|
let iox_operation: IoxOperation = serde_json::from_slice(
|
||||||
|
&Command::cargo_bin("influxdb_iox")
|
||||||
|
.unwrap()
|
||||||
|
.arg("database")
|
||||||
|
.arg("partition")
|
||||||
|
.arg("compact-object-store-chunks")
|
||||||
|
.arg(&db_name)
|
||||||
|
.arg("cpu") // partition key
|
||||||
|
.arg("cpu") // table name
|
||||||
|
.arg(chunk_ids[0].clone())
|
||||||
|
.arg(chunk_ids[1].clone())
|
||||||
|
.arg("--host")
|
||||||
|
.arg(addr)
|
||||||
|
.assert()
|
||||||
|
.success()
|
||||||
|
.get_output()
|
||||||
|
.stdout,
|
||||||
|
)
|
||||||
|
.expect("Expected JSON output");
|
||||||
|
|
||||||
|
// Ensure we got a legit job description back
|
||||||
|
match iox_operation.metadata.job {
|
||||||
|
Some(Job::CompactObjectStoreChunks(job)) => {
|
||||||
|
assert_eq!(job.chunks.len(), 2);
|
||||||
|
assert_eq!(&job.db_name, &db_name);
|
||||||
|
assert_eq!(job.partition_key.as_str(), "cpu");
|
||||||
|
assert_eq!(job.table_name.as_str(), "cpu");
|
||||||
|
}
|
||||||
|
job => panic!("unexpected job returned {:#?}", job),
|
||||||
|
}
|
||||||
|
// Wait for the compaction to complete
|
||||||
|
wait_for_operations_to_complete(&fixture, &db_name, Duration::from_secs(5)).await;
|
||||||
|
|
||||||
|
// Verify chunk the DB now only has one OS-only chunk
|
||||||
|
let chunks = list_chunks(&fixture, &db_name).await;
|
||||||
|
assert_eq!(chunks.len(), 1);
|
||||||
|
assert_eq!(chunks[0].storage, ChunkStorage::ObjectStoreOnly);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn setup_load_and_persist_two_partition_chunks(
|
||||||
|
) -> (Arc<ServerFixture>, String, String, Vec<String>) {
|
||||||
|
let fixture = Arc::from(ServerFixture::create_shared(ServerType::Database).await);
|
||||||
|
let addr = fixture.grpc_base();
|
||||||
|
let db_name = rand_name();
|
||||||
|
|
||||||
|
DatabaseBuilder::new(db_name.clone())
|
||||||
|
.persist(true)
|
||||||
|
.persist_age_threshold_seconds(1)
|
||||||
|
.late_arrive_window_seconds(1)
|
||||||
|
.build(fixture.grpc_channel())
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// Load first chunk and wait for it to get persisted
|
||||||
|
let lp_data = vec!["cpu,region=west user=23.2 10"];
|
||||||
|
load_lp(addr, &db_name, lp_data);
|
||||||
|
|
||||||
|
wait_for_exact_chunk_states(
|
||||||
|
&fixture,
|
||||||
|
&db_name,
|
||||||
|
vec![ChunkStorage::ReadBufferAndObjectStore],
|
||||||
|
std::time::Duration::from_secs(10),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// Load second chunk and wait for it to get persisted, too
|
||||||
|
let lp_data = vec!["cpu,region=east user=79 30"];
|
||||||
|
load_lp(addr, &db_name, lp_data);
|
||||||
|
|
||||||
|
let chunks = wait_for_exact_chunk_states(
|
||||||
|
&fixture,
|
||||||
|
&db_name,
|
||||||
|
vec![
|
||||||
|
ChunkStorage::ReadBufferAndObjectStore,
|
||||||
|
ChunkStorage::ReadBufferAndObjectStore,
|
||||||
|
],
|
||||||
|
std::time::Duration::from_secs(10),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// collect chunk ids
|
||||||
|
let chunk_ids: Vec<_> = chunks.iter().map(|c| c.id.get().to_string()).collect();
|
||||||
|
|
||||||
|
(Arc::clone(&fixture), db_name, String::from(addr), chunk_ids)
|
||||||
|
}
|
||||||
|
|
|
@ -10,7 +10,7 @@ use crate::{
|
||||||
end_to_end_cases::scenario::{list_chunks, wait_for_exact_chunk_states},
|
end_to_end_cases::scenario::{list_chunks, wait_for_exact_chunk_states},
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::scenario::{collect_query, create_readable_database, rand_name, DatabaseBuilder};
|
use super::scenario::{create_readable_database, rand_name, DatabaseBuilder};
|
||||||
use crate::common::server_fixture::DEFAULT_SERVER_ID;
|
use crate::common::server_fixture::DEFAULT_SERVER_ID;
|
||||||
use generated_types::influxdata::iox::management::v1::{operation_metadata::Job, CompactChunks};
|
use generated_types::influxdata::iox::management::v1::{operation_metadata::Job, CompactChunks};
|
||||||
|
|
||||||
|
@ -310,9 +310,14 @@ async fn assert_chunk_query_works(fixture: &ServerFixture, db_name: &str) {
|
||||||
let mut client = fixture.flight_client();
|
let mut client = fixture.flight_client();
|
||||||
let sql_query = "select region, user, time from cpu";
|
let sql_query = "select region, user, time from cpu";
|
||||||
|
|
||||||
let query_results = client.perform_query(db_name, sql_query).await.unwrap();
|
let batches = client
|
||||||
|
.perform_query(db_name, sql_query)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.collect()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
let batches = collect_query(query_results).await;
|
|
||||||
let expected_read_data = vec![
|
let expected_read_data = vec![
|
||||||
"+--------+------+--------------------------------+",
|
"+--------+------+--------------------------------+",
|
||||||
"| region | user | time |",
|
"| region | user | time |",
|
||||||
|
|
|
@ -15,7 +15,6 @@ use generated_types::{
|
||||||
};
|
};
|
||||||
use influxdb_iox_client::{
|
use influxdb_iox_client::{
|
||||||
connection::Connection,
|
connection::Connection,
|
||||||
flight::PerformQuery,
|
|
||||||
management::{
|
management::{
|
||||||
self,
|
self,
|
||||||
generated_types::{partition_template, WriteBufferConnection},
|
generated_types::{partition_template, WriteBufferConnection},
|
||||||
|
@ -468,15 +467,6 @@ pub async fn create_two_partition_database(db_name: impl Into<String>, channel:
|
||||||
.expect("write succeded");
|
.expect("write succeded");
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Collect the results of a query into a vector of record batches
|
|
||||||
pub async fn collect_query(mut query_results: PerformQuery) -> Vec<RecordBatch> {
|
|
||||||
let mut batches = vec![];
|
|
||||||
while let Some(data) = query_results.next().await.unwrap() {
|
|
||||||
batches.push(data);
|
|
||||||
}
|
|
||||||
batches
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Wait for the chunks to be in exactly `desired_storages` states
|
/// Wait for the chunks to be in exactly `desired_storages` states
|
||||||
pub async fn wait_for_exact_chunk_states(
|
pub async fn wait_for_exact_chunk_states(
|
||||||
fixture: &ServerFixture,
|
fixture: &ServerFixture,
|
||||||
|
|
|
@ -4,7 +4,7 @@ use crate::{
|
||||||
};
|
};
|
||||||
use arrow_util::{assert_batches_eq, test_util::normalize_batches};
|
use arrow_util::{assert_batches_eq, test_util::normalize_batches};
|
||||||
|
|
||||||
use super::scenario::{collect_query, create_readable_database, list_chunks, rand_name};
|
use super::scenario::{create_readable_database, list_chunks, rand_name};
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_operations() {
|
async fn test_operations() {
|
||||||
|
@ -47,9 +47,13 @@ async fn test_operations() {
|
||||||
let mut client = fixture.flight_client();
|
let mut client = fixture.flight_client();
|
||||||
let sql_query = "select status, description from system.operations";
|
let sql_query = "select status, description from system.operations";
|
||||||
|
|
||||||
let query_results = client.perform_query(&db_name1, sql_query).await.unwrap();
|
let batches = client
|
||||||
|
.perform_query(&db_name1, sql_query)
|
||||||
let batches = collect_query(query_results).await;
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.collect()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
// parameterize on db_name1
|
// parameterize on db_name1
|
||||||
|
|
||||||
|
@ -64,9 +68,14 @@ async fn test_operations() {
|
||||||
assert_batches_eq!(expected_read_data, &batches);
|
assert_batches_eq!(expected_read_data, &batches);
|
||||||
|
|
||||||
// Should not see jobs from db1 when querying db2
|
// Should not see jobs from db1 when querying db2
|
||||||
let query_results = client.perform_query(&db_name2, sql_query).await.unwrap();
|
let batches = client
|
||||||
|
.perform_query(&db_name2, sql_query)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.collect()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
let batches = collect_query(query_results).await;
|
|
||||||
let expected_read_data = vec![
|
let expected_read_data = vec![
|
||||||
"+--------+-------------+",
|
"+--------+-------------+",
|
||||||
"| status | description |",
|
"| status | description |",
|
||||||
|
@ -109,13 +118,14 @@ async fn test_queries() {
|
||||||
let query = "select query_type, query_text from system.queries";
|
let query = "select query_type, query_text from system.queries";
|
||||||
|
|
||||||
// Query system.queries and should have an entry for the storage rpc
|
// Query system.queries and should have an entry for the storage rpc
|
||||||
let query_results = fixture
|
let batches = fixture
|
||||||
.flight_client()
|
.flight_client()
|
||||||
.perform_query(&db_name, query)
|
.perform_query(&db_name, query)
|
||||||
.await
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.collect()
|
||||||
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let batches = collect_query(query_results).await;
|
|
||||||
let batches = normalize_batches(batches, scenario.normalizer());
|
let batches = normalize_batches(batches, scenario.normalizer());
|
||||||
|
|
||||||
let expected_read_data = vec![
|
let expected_read_data = vec![
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
use super::scenario::{collect_query, Scenario};
|
use super::scenario::Scenario;
|
||||||
use crate::common::{
|
use crate::common::{
|
||||||
server_fixture::{ServerFixture, ServerType, TestConfig},
|
server_fixture::{ServerFixture, ServerType, TestConfig},
|
||||||
udp_listener::UdpCapture,
|
udp_listener::UdpCapture,
|
||||||
|
@ -33,6 +33,7 @@ async fn setup() -> (UdpCapture, ServerFixture) {
|
||||||
(udp_capture, server_fixture)
|
(udp_capture, server_fixture)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Runs a query, discarding the results
|
||||||
async fn run_sql_query(server_fixture: &ServerFixture) {
|
async fn run_sql_query(server_fixture: &ServerFixture) {
|
||||||
let scenario = Scenario::new();
|
let scenario = Scenario::new();
|
||||||
scenario
|
scenario
|
||||||
|
@ -44,12 +45,13 @@ async fn run_sql_query(server_fixture: &ServerFixture) {
|
||||||
let sql_query = "select * from cpu_load_short";
|
let sql_query = "select * from cpu_load_short";
|
||||||
let mut client = server_fixture.flight_client();
|
let mut client = server_fixture.flight_client();
|
||||||
|
|
||||||
let query_results = client
|
client
|
||||||
.perform_query(scenario.database_name(), sql_query)
|
.perform_query(scenario.database_name(), sql_query)
|
||||||
.await
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.collect()
|
||||||
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
collect_query(query_results).await;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
|
|
@ -218,8 +218,8 @@ impl PerformQuery {
|
||||||
)?))
|
)?))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return all record batches of it
|
/// Collect and return all `RecordBatch`es into a `Vec`
|
||||||
pub async fn to_batches(&mut self) -> Result<Vec<RecordBatch>, Error> {
|
pub async fn collect(&mut self) -> Result<Vec<RecordBatch>, Error> {
|
||||||
let mut batches = Vec::new();
|
let mut batches = Vec::new();
|
||||||
while let Some(data) = self.next().await? {
|
while let Some(data) = self.next().await? {
|
||||||
batches.push(data);
|
batches.push(data);
|
||||||
|
|
|
@ -535,4 +535,31 @@ impl Client {
|
||||||
.unwrap_field("operation")?
|
.unwrap_field("operation")?
|
||||||
.try_into()?)
|
.try_into()?)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Compact all object store of a give partition
|
||||||
|
pub async fn compact_object_store_partition(
|
||||||
|
&mut self,
|
||||||
|
db_name: impl Into<String> + Send,
|
||||||
|
table_name: impl Into<String> + Send,
|
||||||
|
partition_key: impl Into<String> + Send,
|
||||||
|
) -> Result<IoxOperation, Error> {
|
||||||
|
let db_name = db_name.into();
|
||||||
|
let partition_key = partition_key.into();
|
||||||
|
let table_name = table_name.into();
|
||||||
|
|
||||||
|
let response = self
|
||||||
|
.inner
|
||||||
|
.compact_object_store_partition(CompactObjectStorePartitionRequest {
|
||||||
|
db_name,
|
||||||
|
partition_key,
|
||||||
|
table_name,
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(response
|
||||||
|
.into_inner()
|
||||||
|
.operation
|
||||||
|
.unwrap_field("operation")?
|
||||||
|
.try_into()?)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1061,10 +1061,21 @@ pub enum InitError {
|
||||||
/// The Database startup state machine
|
/// The Database startup state machine
|
||||||
///
|
///
|
||||||
/// A Database starts in DatabaseState::Known and advances through the
|
/// A Database starts in DatabaseState::Known and advances through the
|
||||||
/// states in sequential order until it reaches Initialized or an error
|
/// non error states in sequential order until either:
|
||||||
/// is encountered.
|
///
|
||||||
|
/// 1. It reaches `Initialized`
|
||||||
|
///
|
||||||
|
/// 2. It is reset to `Known` and starts initialization again
|
||||||
|
///
|
||||||
|
/// 3. An error is encountered, in which case it transitions to one of
|
||||||
|
/// the error states. Most are Terminal (and thus require operator
|
||||||
|
/// intervention) but some (such as `WriteBufferCreationError`) may
|
||||||
|
/// resolve after some time to the basic initialization sequence
|
||||||
|
/// (e.g. `Initialized`)
|
||||||
|
///
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
enum DatabaseState {
|
enum DatabaseState {
|
||||||
|
// Basic initialization sequence states:
|
||||||
Known(DatabaseStateKnown),
|
Known(DatabaseStateKnown),
|
||||||
DatabaseObjectStoreFound(DatabaseStateDatabaseObjectStoreFound),
|
DatabaseObjectStoreFound(DatabaseStateDatabaseObjectStoreFound),
|
||||||
OwnerInfoLoaded(DatabaseStateOwnerInfoLoaded),
|
OwnerInfoLoaded(DatabaseStateOwnerInfoLoaded),
|
||||||
|
@ -1072,12 +1083,23 @@ enum DatabaseState {
|
||||||
CatalogLoaded(DatabaseStateCatalogLoaded),
|
CatalogLoaded(DatabaseStateCatalogLoaded),
|
||||||
Initialized(DatabaseStateInitialized),
|
Initialized(DatabaseStateInitialized),
|
||||||
|
|
||||||
|
// Error states
|
||||||
|
/// Terminal State
|
||||||
DatabaseObjectStoreLookupError(DatabaseStateKnown, Arc<InitError>),
|
DatabaseObjectStoreLookupError(DatabaseStateKnown, Arc<InitError>),
|
||||||
|
/// Terminal State
|
||||||
NoActiveDatabase(DatabaseStateKnown, Arc<InitError>),
|
NoActiveDatabase(DatabaseStateKnown, Arc<InitError>),
|
||||||
|
/// Terminal State
|
||||||
OwnerInfoLoadError(DatabaseStateDatabaseObjectStoreFound, Arc<InitError>),
|
OwnerInfoLoadError(DatabaseStateDatabaseObjectStoreFound, Arc<InitError>),
|
||||||
|
/// Terminal State
|
||||||
RulesLoadError(DatabaseStateOwnerInfoLoaded, Arc<InitError>),
|
RulesLoadError(DatabaseStateOwnerInfoLoaded, Arc<InitError>),
|
||||||
|
/// Terminal State
|
||||||
CatalogLoadError(DatabaseStateRulesLoaded, Arc<InitError>),
|
CatalogLoadError(DatabaseStateRulesLoaded, Arc<InitError>),
|
||||||
|
/// Non Terminal State: There was an error creating a connction to
|
||||||
|
/// the WriteBuffer, but the connection will be retried. If a
|
||||||
|
/// connection is successfully created, the database will
|
||||||
|
/// transition to `Initialized`
|
||||||
WriteBufferCreationError(DatabaseStateCatalogLoaded, Arc<InitError>),
|
WriteBufferCreationError(DatabaseStateCatalogLoaded, Arc<InitError>),
|
||||||
|
/// Terminal State
|
||||||
ReplayError(DatabaseStateCatalogLoaded, Arc<InitError>),
|
ReplayError(DatabaseStateCatalogLoaded, Arc<InitError>),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -675,6 +675,37 @@ impl Db {
|
||||||
fut.await.context(TaskCancelled)?.context(LifecycleError)
|
fut.await.context(TaskCancelled)?.context(LifecycleError)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Compact all persisted chunks in this partition
|
||||||
|
/// Return error if the persisted chunks are not contiguous. This means
|
||||||
|
/// there are chunks in between those OS chunks are not yet persisted
|
||||||
|
pub fn compact_object_store_partition(
|
||||||
|
self: &Arc<Self>,
|
||||||
|
table_name: &str,
|
||||||
|
partition_key: &str,
|
||||||
|
) -> Result<TaskTracker<Job>> {
|
||||||
|
// acquire partition read lock to get OS chunk ids
|
||||||
|
let partition = self.lockable_partition(table_name, partition_key)?;
|
||||||
|
let partition = partition.read();
|
||||||
|
let chunks = partition.chunks();
|
||||||
|
|
||||||
|
// Get all OS chunk IDs
|
||||||
|
let mut chunk_ids = vec![];
|
||||||
|
for chunk in chunks {
|
||||||
|
let chunk = chunk.read();
|
||||||
|
if chunk.is_persisted() {
|
||||||
|
chunk_ids.push(chunk.id());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// drop partition lock
|
||||||
|
partition.into_data();
|
||||||
|
|
||||||
|
// Compact all the OS chunks
|
||||||
|
// Error will return if those OS chunks are not contiguous which means
|
||||||
|
// a chunk in between those OS chunks are not yet persisted
|
||||||
|
self.compact_object_store_chunks(table_name, partition_key, chunk_ids)
|
||||||
|
}
|
||||||
|
|
||||||
/// Compact all provided persisted chunks
|
/// Compact all provided persisted chunks
|
||||||
pub fn compact_object_store_chunks(
|
pub fn compact_object_store_chunks(
|
||||||
self: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
|
@ -2135,7 +2166,6 @@ mod tests {
|
||||||
load_parquet_from_store_for_path(&path_list[0], Arc::clone(&db.iox_object_store))
|
load_parquet_from_store_for_path(&path_list[0], Arc::clone(&db.iox_object_store))
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data.clone())
|
let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data.clone())
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
|
@ -690,7 +690,8 @@ mod tests {
|
||||||
let partition = partition.upgrade();
|
let partition = partition.upgrade();
|
||||||
let chunk1 = chunks[0].write();
|
let chunk1 = chunks[0].write();
|
||||||
let chunk2 = chunks[1].write();
|
let chunk2 = chunks[1].write();
|
||||||
let _compacted_chunk = compact_object_store_chunks(partition, vec![chunk1, chunk2])
|
// Provide the chunk ids in reverse contiguous order to see if we handle it well
|
||||||
|
let _compacted_chunk = compact_object_store_chunks(partition, vec![chunk2, chunk1])
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.1
|
.1
|
||||||
.await
|
.await
|
||||||
|
|
Loading…
Reference in New Issue