feat: sync chunks in querier (#3911)
* feat: `ParquetFileRepo::list_by_namespace_not_to_delete` * feat: `ChunkAddr: Clone` * test: ensure that querier keeps same partition objects * test: improve `create_parquet_file` flexibility * feat: sync chunks in querier * test: improve `test_parquet_file`pull/24376/head
parent
6ba5e51897
commit
8d00aaba90
|
|
@ -4094,6 +4094,7 @@ dependencies = [
|
|||
"iox_object_store",
|
||||
"job_registry",
|
||||
"metric",
|
||||
"mutable_batch_lp",
|
||||
"object_store",
|
||||
"observability_deps",
|
||||
"parking_lot 0.12.0",
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ use uuid::Uuid;
|
|||
use crate::partition_metadata::PartitionAddr;
|
||||
|
||||
/// Address of the chunk within the catalog
|
||||
#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord)]
|
||||
#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)]
|
||||
pub struct ChunkAddr {
|
||||
/// Database name
|
||||
pub db_name: Arc<str>,
|
||||
|
|
|
|||
|
|
@ -436,6 +436,12 @@ pub trait ParquetFileRepo: Send + Sync {
|
|||
sequence_number: SequenceNumber,
|
||||
) -> Result<Vec<ParquetFile>>;
|
||||
|
||||
/// List all parquet files within a given namespace that are NOT marked as [`to_delete`](ParquetFile::to_delete).
|
||||
async fn list_by_namespace_not_to_delete(
|
||||
&mut self,
|
||||
namespace_id: NamespaceId,
|
||||
) -> Result<Vec<ParquetFile>>;
|
||||
|
||||
/// Verify if the parquet file exists by selecting its id
|
||||
async fn exist(&mut self, id: ParquetFileId) -> Result<bool>;
|
||||
|
||||
|
|
@ -1297,7 +1303,7 @@ pub(crate) mod test_helpers {
|
|||
.list_by_sequencer_greater_than(sequencer.id, SequenceNumber::new(150))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(vec![other_file], files);
|
||||
assert_eq!(vec![other_file.clone()], files);
|
||||
|
||||
// verify that to_delete is initially set to false and that it can be updated to true
|
||||
assert!(!parquet_file.to_delete);
|
||||
|
|
@ -1312,6 +1318,82 @@ pub(crate) mod test_helpers {
|
|||
.await
|
||||
.unwrap();
|
||||
assert!(files.first().unwrap().to_delete);
|
||||
|
||||
// test list_by_namespace_not_to_delete
|
||||
let namespace2 = repos
|
||||
.namespaces()
|
||||
.create("namespace_parquet_file_test1", "inf", kafka.id, pool.id)
|
||||
.await
|
||||
.unwrap();
|
||||
let table2 = repos
|
||||
.tables()
|
||||
.create_or_get("test_table2", namespace2.id)
|
||||
.await
|
||||
.unwrap();
|
||||
let partition2 = repos
|
||||
.partitions()
|
||||
.create_or_get("foo", sequencer.id, table2.id)
|
||||
.await
|
||||
.unwrap();
|
||||
let files = repos
|
||||
.parquet_files()
|
||||
.list_by_namespace_not_to_delete(namespace2.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(Vec::<ParquetFile>::new(), files);
|
||||
let f1 = repos
|
||||
.parquet_files()
|
||||
.create(
|
||||
sequencer.id,
|
||||
partition2.table_id,
|
||||
partition2.id,
|
||||
Uuid::new_v4(),
|
||||
SequenceNumber::new(10),
|
||||
SequenceNumber::new(140),
|
||||
min_time,
|
||||
max_time,
|
||||
1337,
|
||||
b"md4".to_vec(),
|
||||
0,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let f2 = repos
|
||||
.parquet_files()
|
||||
.create(
|
||||
sequencer.id,
|
||||
partition2.table_id,
|
||||
partition2.id,
|
||||
Uuid::new_v4(),
|
||||
SequenceNumber::new(10),
|
||||
SequenceNumber::new(140),
|
||||
min_time,
|
||||
max_time,
|
||||
1337,
|
||||
b"md4".to_vec(),
|
||||
0,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let files = repos
|
||||
.parquet_files()
|
||||
.list_by_namespace_not_to_delete(namespace2.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(vec![f1.clone(), f2.clone()], files);
|
||||
repos.parquet_files().flag_for_delete(f2.id).await.unwrap();
|
||||
let files = repos
|
||||
.parquet_files()
|
||||
.list_by_namespace_not_to_delete(namespace2.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(vec![f1], files);
|
||||
let files = repos
|
||||
.parquet_files()
|
||||
.list_by_namespace_not_to_delete(NamespaceId::new(i32::MAX))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(Vec::<ParquetFile>::new(), files);
|
||||
}
|
||||
|
||||
async fn test_add_parquet_file_with_tombstones(catalog: Arc<dyn Catalog>) {
|
||||
|
|
|
|||
|
|
@ -803,6 +803,26 @@ impl ParquetFileRepo for MemTxn {
|
|||
Ok(files)
|
||||
}
|
||||
|
||||
async fn list_by_namespace_not_to_delete(
|
||||
&mut self,
|
||||
namespace_id: NamespaceId,
|
||||
) -> Result<Vec<ParquetFile>> {
|
||||
let stage = self.stage();
|
||||
|
||||
let table_ids: HashSet<_> = stage
|
||||
.tables
|
||||
.iter()
|
||||
.filter_map(|table| (table.namespace_id == namespace_id).then(|| table.id))
|
||||
.collect();
|
||||
let parquet_files: Vec<_> = stage
|
||||
.parquet_files
|
||||
.iter()
|
||||
.filter(|f| table_ids.contains(&f.table_id) && !f.to_delete)
|
||||
.cloned()
|
||||
.collect();
|
||||
Ok(parquet_files)
|
||||
}
|
||||
|
||||
async fn exist(&mut self, id: ParquetFileId) -> Result<bool> {
|
||||
let stage = self.stage();
|
||||
|
||||
|
|
|
|||
|
|
@ -261,6 +261,7 @@ decorate!(
|
|||
"parquet_create" = create( &mut self, sequencer_id: SequencerId, table_id: TableId, partition_id: PartitionId, object_store_id: Uuid, min_sequence_number: SequenceNumber, max_sequence_number: SequenceNumber, min_time: Timestamp, max_time: Timestamp, file_size_bytes: i64, parquet_metadata: Vec<u8>, row_count: i64) -> Result<ParquetFile>;
|
||||
"parquet_flag_for_delete" = flag_for_delete(&mut self, id: ParquetFileId) -> Result<()>;
|
||||
"parquet_list_by_sequencer_greater_than" = list_by_sequencer_greater_than(&mut self, sequencer_id: SequencerId, sequence_number: SequenceNumber) -> Result<Vec<ParquetFile>>;
|
||||
"parquet_list_by_namespace_not_to_delete" = list_by_namespace_not_to_delete(&mut self, namespace_id: NamespaceId) -> Result<Vec<ParquetFile>>;
|
||||
"parquet_exist" = exist(&mut self, id: ParquetFileId) -> Result<bool>;
|
||||
"parquet_count" = count(&mut self) -> Result<i64>;
|
||||
]
|
||||
|
|
|
|||
|
|
@ -1209,6 +1209,37 @@ RETURNING *
|
|||
.map_err(|e| Error::SqlxError { source: e })
|
||||
}
|
||||
|
||||
async fn list_by_namespace_not_to_delete(
|
||||
&mut self,
|
||||
namespace_id: NamespaceId,
|
||||
) -> Result<Vec<ParquetFile>> {
|
||||
sqlx::query_as::<_, ParquetFile>(
|
||||
r#"
|
||||
SELECT
|
||||
parquet_file.id as id,
|
||||
parquet_file.sequencer_id as sequencer_id,
|
||||
parquet_file.table_id as table_id,
|
||||
parquet_file.partition_id as partition_id,
|
||||
parquet_file.object_store_id as object_store_id,
|
||||
parquet_file.min_sequence_number as min_sequence_number,
|
||||
parquet_file.max_sequence_number as max_sequence_number,
|
||||
parquet_file.min_time as min_time,
|
||||
parquet_file.max_time as max_time,
|
||||
parquet_file.to_delete as to_delete,
|
||||
parquet_file.file_size_bytes as file_size_bytes,
|
||||
parquet_file.parquet_metadata as parquet_metadata,
|
||||
parquet_file.row_count as row_count
|
||||
FROM parquet_file
|
||||
INNER JOIN table_name on table_name.id = parquet_file.table_id
|
||||
WHERE table_name.namespace_id = $1 AND parquet_file.to_delete = false;
|
||||
"#,
|
||||
)
|
||||
.bind(&namespace_id) // $1
|
||||
.fetch_all(&mut self.inner)
|
||||
.await
|
||||
.map_err(|e| Error::SqlxError { source: e })
|
||||
}
|
||||
|
||||
async fn exist(&mut self, id: ParquetFileId) -> Result<bool> {
|
||||
let read_result = sqlx::query_as::<_, Count>(
|
||||
r#"SELECT count(*) as count FROM parquet_file WHERE id = $1;"#,
|
||||
|
|
|
|||
|
|
@ -43,3 +43,4 @@ workspace-hack = { path = "../workspace-hack"}
|
|||
arrow = { version = "9.1", features = ["prettyprint"] }
|
||||
arrow_util = { path = "../arrow_util" }
|
||||
bytes = "1.0"
|
||||
mutable_batch_lp = { path = "../mutable_batch_lp" }
|
||||
|
|
|
|||
|
|
@ -108,8 +108,12 @@ impl ParquetChunkAdapter {
|
|||
.expect("cannot create chunk")
|
||||
}
|
||||
|
||||
/// Create a catalog chunk.
|
||||
pub async fn new_catalog_chunk(&self, parquet_file: ParquetFile) -> CatalogChunk {
|
||||
/// Create all components to create a catalog chunk using
|
||||
/// [`Partition::insert_object_store_only_chunk`](db::catalog::partition::Partition::insert_object_store_only_chunk).
|
||||
pub async fn new_catalog_chunk_parts(
|
||||
&self,
|
||||
parquet_file: ParquetFile,
|
||||
) -> (ChunkAddr, ChunkOrder, ChunkMetadata, Arc<ParquetChunk>) {
|
||||
let decoded_parquet_file = DecodedParquetFile::new(parquet_file);
|
||||
let chunk = Arc::new(self.new_parquet_chunk(&decoded_parquet_file).await);
|
||||
|
||||
|
|
@ -117,9 +121,6 @@ impl ParquetChunkAdapter {
|
|||
.old_gen_chunk_addr(&decoded_parquet_file.parquet_file)
|
||||
.await;
|
||||
|
||||
// TODO: register metrics w/ catalog registry
|
||||
let metrics = CatalogChunkMetrics::new_unregistered();
|
||||
|
||||
let iox_metadata = &decoded_parquet_file.iox_metadata;
|
||||
|
||||
// Somewhat hacky workaround because NG has implicit chunk orders, use min sequence number and hope it doesn't
|
||||
|
|
@ -138,6 +139,16 @@ impl ParquetChunkAdapter {
|
|||
sort_key: None,
|
||||
};
|
||||
|
||||
(addr, order, metadata, chunk)
|
||||
}
|
||||
|
||||
/// Create a catalog chunk.
|
||||
pub async fn new_catalog_chunk(&self, parquet_file: ParquetFile) -> CatalogChunk {
|
||||
let (addr, order, metadata, chunk) = self.new_catalog_chunk_parts(parquet_file).await;
|
||||
|
||||
// TODO: register metrics w/ catalog registry
|
||||
let metrics = CatalogChunkMetrics::new_unregistered();
|
||||
|
||||
CatalogChunk::new_object_store_only(
|
||||
addr,
|
||||
order,
|
||||
|
|
@ -155,7 +166,7 @@ impl ParquetChunkAdapter {
|
|||
/// - `table.name -> table_name`
|
||||
/// - `sequencer.id X partition.name -> partition_key`
|
||||
/// - `parquet_file.id -> chunk_id`
|
||||
async fn old_gen_chunk_addr(&self, parquet_file: &ParquetFile) -> ChunkAddr {
|
||||
pub async fn old_gen_chunk_addr(&self, parquet_file: &ParquetFile) -> ChunkAddr {
|
||||
ChunkAddr {
|
||||
db_name: self
|
||||
.catalog_cache
|
||||
|
|
@ -197,6 +208,12 @@ mod tests {
|
|||
catalog.time_provider(),
|
||||
);
|
||||
|
||||
let lp = vec![
|
||||
"table,tag1=WA field_int=1000 8000",
|
||||
"table,tag1=VT field_int=10 10000",
|
||||
"table,tag1=UT field_int=70 20000",
|
||||
]
|
||||
.join("\n");
|
||||
let parquet_file = catalog
|
||||
.create_namespace("ns")
|
||||
.await
|
||||
|
|
@ -204,7 +221,7 @@ mod tests {
|
|||
.await
|
||||
.create_partition("part", 1)
|
||||
.await
|
||||
.create_parquet_file()
|
||||
.create_parquet_file(&lp)
|
||||
.await
|
||||
.parquet_file
|
||||
.clone();
|
||||
|
|
|
|||
|
|
@ -312,7 +312,131 @@ impl QuerierNamespace {
|
|||
}
|
||||
|
||||
async fn sync_chunks(&self) {
|
||||
// TODO: implement
|
||||
let parquet_files = Backoff::new(&self.backoff_config)
|
||||
.retry_all_errors("get parquet files", || async {
|
||||
self.catalog
|
||||
.repositories()
|
||||
.await
|
||||
.parquet_files()
|
||||
.list_by_namespace_not_to_delete(self.id)
|
||||
.await
|
||||
})
|
||||
.await
|
||||
.expect("retry forever");
|
||||
|
||||
let mut desired_chunks: HashMap<_, _> = HashMap::with_capacity(parquet_files.len());
|
||||
for parquet_file in parquet_files {
|
||||
let addr = self.chunk_adapter.old_gen_chunk_addr(&parquet_file).await;
|
||||
desired_chunks.insert(addr, parquet_file);
|
||||
}
|
||||
|
||||
let actual_chunk_addresses: HashSet<_> = self
|
||||
.db_catalog
|
||||
.chunks()
|
||||
.into_iter()
|
||||
.map(|c| {
|
||||
let c = c.read();
|
||||
c.addr().clone()
|
||||
})
|
||||
.collect();
|
||||
|
||||
let to_add: Vec<_> = desired_chunks
|
||||
.iter()
|
||||
.filter_map(|(addr, file)| {
|
||||
(!actual_chunk_addresses.contains(addr)).then(|| (addr.clone(), file.clone()))
|
||||
})
|
||||
.collect();
|
||||
let to_delete: Vec<_> = actual_chunk_addresses
|
||||
.iter()
|
||||
.filter(|addr| !desired_chunks.contains_key(addr))
|
||||
.cloned()
|
||||
.collect();
|
||||
info!(
|
||||
add = to_add.len(),
|
||||
delete = to_delete.len(),
|
||||
actual = actual_chunk_addresses.len(),
|
||||
desired = desired_chunks.len(),
|
||||
namespace = self.name.as_ref(),
|
||||
"Syncing chunks",
|
||||
);
|
||||
|
||||
// prepare to-be-added chunks, so we don't have to perform any IO while holding locks
|
||||
let to_add2 = to_add;
|
||||
let mut to_add = Vec::with_capacity(to_add2.len());
|
||||
for (addr, file) in to_add2 {
|
||||
let parts = self.chunk_adapter.new_catalog_chunk_parts(file).await;
|
||||
to_add.push((addr, parts));
|
||||
}
|
||||
|
||||
// group by table and partition to reduce locking attempts
|
||||
// table name => (partition key => (list of parts to be added, list of chunk IDs to be removed))
|
||||
let mut per_partition_add_delete: HashMap<_, HashMap<_, (Vec<_>, Vec<_>)>> = HashMap::new();
|
||||
for (addr, file) in to_add {
|
||||
per_partition_add_delete
|
||||
.entry(addr.table_name)
|
||||
.or_default()
|
||||
.entry(addr.partition_key)
|
||||
.or_default()
|
||||
.0
|
||||
.push(file);
|
||||
}
|
||||
for addr in to_delete {
|
||||
per_partition_add_delete
|
||||
.entry(addr.table_name)
|
||||
.or_default()
|
||||
.entry(addr.partition_key)
|
||||
.or_default()
|
||||
.1
|
||||
.push(addr.chunk_id);
|
||||
}
|
||||
|
||||
for (table, sub) in per_partition_add_delete {
|
||||
let table = match self.db_catalog.table_mut(Arc::clone(&table)) {
|
||||
Ok(table) => table,
|
||||
Err(e) => {
|
||||
// this might happen if some other process (e.g. management API) just removed the table
|
||||
warn!(
|
||||
%e,
|
||||
namespace = self.name.as_ref(),
|
||||
table = table.as_ref(),
|
||||
"Cannot add/remove chunks to/from table",
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
for (partition, (to_add, to_delete)) in sub {
|
||||
let partition = match table.partition(&partition) {
|
||||
Some(partition) => Arc::clone(partition),
|
||||
None => {
|
||||
// this might happen if some other process (e.g. management API) just removed the table
|
||||
warn!(
|
||||
namespace = self.name.as_ref(),
|
||||
table = table.name().as_ref(),
|
||||
partition = partition.as_ref(),
|
||||
"Cannot add/remove chunks to/from partition",
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let mut partition = partition.write();
|
||||
|
||||
for (addr, chunk_order, metadata, chunk) in to_add {
|
||||
let chunk_id = addr.chunk_id;
|
||||
partition.insert_object_store_only_chunk(
|
||||
chunk_id,
|
||||
chunk_order,
|
||||
metadata,
|
||||
chunk,
|
||||
);
|
||||
}
|
||||
|
||||
for chunk_id in to_delete {
|
||||
// it's OK if the chunk is already gone
|
||||
partition.force_drop_chunk(chunk_id).ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -380,9 +504,13 @@ impl ExecutionContextProvider for QuerierNamespace {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::test_util::{TestCatalog, TestNamespace};
|
||||
use data_types2::ColumnType;
|
||||
use crate::test_util::{TestCatalog, TestNamespace, TestParquetFile};
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use arrow_util::assert_batches_sorted_eq;
|
||||
use data_types2::{ChunkAddr, ChunkId, ColumnType};
|
||||
use query::frontend::sql::SqlQueryPlanner;
|
||||
use schema::{builder::SchemaBuilder, InfluxColumnType, InfluxFieldType};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_sync_namespace_gone() {
|
||||
|
|
@ -509,6 +637,10 @@ mod tests {
|
|||
(String::from("table2"), String::from("1-k1")),
|
||||
],
|
||||
);
|
||||
let partition_a = querier_namespace
|
||||
.db_catalog
|
||||
.partition("table1", "1-k2")
|
||||
.unwrap();
|
||||
|
||||
table1.create_partition("k2", 2).await;
|
||||
querier_namespace.sync().await;
|
||||
|
|
@ -521,6 +653,144 @@ mod tests {
|
|||
(String::from("table2"), String::from("1-k1")),
|
||||
],
|
||||
);
|
||||
let partition_b = querier_namespace
|
||||
.db_catalog
|
||||
.partition("table1", "1-k2")
|
||||
.unwrap();
|
||||
assert!(Arc::ptr_eq(&partition_a, &partition_b));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_sync_chunks() {
|
||||
let catalog = TestCatalog::new();
|
||||
|
||||
let ns = catalog.create_namespace("ns").await;
|
||||
let table = ns.create_table("table").await;
|
||||
let partition = table.create_partition("k", 1).await;
|
||||
|
||||
let querier_namespace = querier_namespace(&catalog, &ns);
|
||||
querier_namespace.sync().await;
|
||||
assert_eq!(chunks(&querier_namespace), vec![],);
|
||||
|
||||
let file1 = partition.create_parquet_file("table foo=1 11").await;
|
||||
let file2 = partition.create_parquet_file("table foo=2 22").await;
|
||||
querier_namespace.sync().await;
|
||||
let partition_addr = PartitionAddr {
|
||||
db_name: Arc::from("ns"),
|
||||
table_name: Arc::from("table"),
|
||||
partition_key: Arc::from("1-k"),
|
||||
};
|
||||
assert_eq!(
|
||||
chunks(&querier_namespace),
|
||||
vec![
|
||||
ChunkAddr::new(&partition_addr, chunk_id(&file1)),
|
||||
ChunkAddr::new(&partition_addr, chunk_id(&file2)),
|
||||
],
|
||||
);
|
||||
let chunk_a = querier_namespace
|
||||
.db_catalog
|
||||
.chunk("table", "1-k", chunk_id(&file1))
|
||||
.unwrap()
|
||||
.0;
|
||||
|
||||
file2.flag_for_delete().await;
|
||||
let file3 = partition.create_parquet_file("table foo=3 33").await;
|
||||
querier_namespace.sync().await;
|
||||
assert_eq!(
|
||||
chunks(&querier_namespace),
|
||||
vec![
|
||||
ChunkAddr::new(&partition_addr, chunk_id(&file1)),
|
||||
ChunkAddr::new(&partition_addr, chunk_id(&file3)),
|
||||
],
|
||||
);
|
||||
let chunk_b = querier_namespace
|
||||
.db_catalog
|
||||
.chunk("table", "1-k", chunk_id(&file1))
|
||||
.unwrap()
|
||||
.0;
|
||||
assert!(Arc::ptr_eq(&chunk_a, &chunk_b));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_query() {
|
||||
let catalog = TestCatalog::new();
|
||||
|
||||
let ns = catalog.create_namespace("ns").await;
|
||||
|
||||
let table_cpu = ns.create_table("cpu").await;
|
||||
let table_mem = ns.create_table("mem").await;
|
||||
|
||||
table_cpu.create_column("host", ColumnType::Tag).await;
|
||||
table_cpu.create_column("time", ColumnType::Time).await;
|
||||
table_cpu.create_column("load", ColumnType::F64).await;
|
||||
table_cpu.create_column("foo", ColumnType::I64).await;
|
||||
table_mem.create_column("host", ColumnType::Tag).await;
|
||||
table_mem.create_column("time", ColumnType::Time).await;
|
||||
table_mem.create_column("perc", ColumnType::F64).await;
|
||||
|
||||
let partition_cpu_a_1 = table_cpu.create_partition("a", 1).await;
|
||||
let partition_cpu_a_2 = table_cpu.create_partition("a", 2).await;
|
||||
let partition_cpu_b_1 = table_cpu.create_partition("b", 1).await;
|
||||
let partition_mem_c_1 = table_mem.create_partition("c", 1).await;
|
||||
let partition_mem_c_2 = table_mem.create_partition("c", 2).await;
|
||||
|
||||
partition_cpu_a_1
|
||||
.create_parquet_file("cpu,host=a load=1 11")
|
||||
.await;
|
||||
partition_cpu_a_1
|
||||
.create_parquet_file("cpu,host=a load=2 22")
|
||||
.await
|
||||
.flag_for_delete()
|
||||
.await;
|
||||
partition_cpu_a_1
|
||||
.create_parquet_file("cpu,host=a load=3 33")
|
||||
.await;
|
||||
partition_cpu_a_2
|
||||
.create_parquet_file("cpu,host=a load=4 10001")
|
||||
.await;
|
||||
partition_cpu_b_1
|
||||
.create_parquet_file("cpu,host=b load=5 11")
|
||||
.await;
|
||||
partition_mem_c_1
|
||||
.create_parquet_file("mem,host=c perc=50 11\nmem,host=c perc=51 12")
|
||||
.await;
|
||||
partition_mem_c_2
|
||||
.create_parquet_file("mem,host=c perc=50 1001")
|
||||
.await
|
||||
.flag_for_delete()
|
||||
.await;
|
||||
|
||||
let querier_namespace = Arc::new(querier_namespace(&catalog, &ns));
|
||||
querier_namespace.sync().await;
|
||||
|
||||
assert_query(
|
||||
&querier_namespace,
|
||||
"SELECT * FROM cpu ORDER BY host,time",
|
||||
&[
|
||||
"+-----+------+------+--------------------------------+",
|
||||
"| foo | host | load | time |",
|
||||
"+-----+------+------+--------------------------------+",
|
||||
"| | a | 1 | 1970-01-01T00:00:00.000000011Z |",
|
||||
"| | a | 3 | 1970-01-01T00:00:00.000000033Z |",
|
||||
"| | a | 4 | 1970-01-01T00:00:00.000010001Z |",
|
||||
"| | b | 5 | 1970-01-01T00:00:00.000000011Z |",
|
||||
"+-----+------+------+--------------------------------+",
|
||||
],
|
||||
)
|
||||
.await;
|
||||
assert_query(
|
||||
&querier_namespace,
|
||||
"SELECT * FROM mem ORDER BY host,time",
|
||||
&[
|
||||
"+------+------+--------------------------------+",
|
||||
"| host | perc | time |",
|
||||
"+------+------+--------------------------------+",
|
||||
"| c | 50 | 1970-01-01T00:00:00.000000011Z |",
|
||||
"| c | 51 | 1970-01-01T00:00:00.000000012Z |",
|
||||
"+------+------+--------------------------------+",
|
||||
],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
fn querier_namespace(catalog: &Arc<TestCatalog>, ns: &Arc<TestNamespace>) -> QuerierNamespace {
|
||||
|
|
@ -568,4 +838,39 @@ mod tests {
|
|||
.collect(),
|
||||
)
|
||||
}
|
||||
|
||||
fn chunks(querier_namespace: &QuerierNamespace) -> Vec<ChunkAddr> {
|
||||
sorted(
|
||||
querier_namespace
|
||||
.db_catalog
|
||||
.chunks()
|
||||
.into_iter()
|
||||
.map(|c| {
|
||||
let c = c.read();
|
||||
c.addr().clone()
|
||||
})
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
|
||||
fn chunk_id(file: &Arc<TestParquetFile>) -> ChunkId {
|
||||
ChunkId::from(Uuid::from_u128(file.parquet_file.id.get() as _))
|
||||
}
|
||||
|
||||
async fn assert_query(
|
||||
querier_namespace: &Arc<QuerierNamespace>,
|
||||
sql: &str,
|
||||
expected_lines: &[&str],
|
||||
) {
|
||||
let planner = SqlQueryPlanner::default();
|
||||
let ctx = querier_namespace.new_query_context(None);
|
||||
|
||||
let physical_plan = planner
|
||||
.query(sql, &ctx)
|
||||
.await
|
||||
.expect("built plan successfully");
|
||||
|
||||
let results: Vec<RecordBatch> = ctx.collect(physical_plan).await.expect("Running plan");
|
||||
assert_batches_sorted_eq!(expected_lines, &results);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
use arrow::record_batch::RecordBatch;
|
||||
use bytes::Bytes;
|
||||
use data_types2::{
|
||||
ColumnType, InfluxDbType, KafkaPartition, KafkaTopic, Namespace, ParquetFile, Partition,
|
||||
|
|
@ -5,12 +6,11 @@ use data_types2::{
|
|||
};
|
||||
use iox_catalog::{interface::Catalog, mem::MemCatalog};
|
||||
use iox_object_store::{IoxObjectStore, ParquetFilePath};
|
||||
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
|
||||
use object_store::ObjectStore;
|
||||
use parquet_file::metadata::{IoxMetadata, IoxParquetMetaData};
|
||||
use query::{
|
||||
exec::Executor,
|
||||
test::{raw_data, TestChunk},
|
||||
};
|
||||
use query::exec::Executor;
|
||||
use schema::selection::Selection;
|
||||
use std::sync::Arc;
|
||||
use time::{MockProvider, Time, TimeProvider};
|
||||
use uuid::Uuid;
|
||||
|
|
@ -163,13 +163,17 @@ pub struct TestPartition {
|
|||
}
|
||||
|
||||
impl TestPartition {
|
||||
pub async fn create_parquet_file(self: &Arc<Self>) -> Arc<TestParquetFile> {
|
||||
pub async fn create_parquet_file(self: &Arc<Self>, lp: &str) -> Arc<TestParquetFile> {
|
||||
let mut repos = self.catalog.catalog.repositories().await;
|
||||
|
||||
let object_store_id = Uuid::nil();
|
||||
let (table, batch) = lp_to_mutable_batch(lp);
|
||||
assert_eq!(table, self.table.table.name);
|
||||
let row_count = batch.rows();
|
||||
let record_batch = batch.to_arrow(Selection::All).unwrap();
|
||||
|
||||
let object_store_id = Uuid::new_v4();
|
||||
let min_sequence_number = SequenceNumber::new(1);
|
||||
let max_sequence_number = SequenceNumber::new(100);
|
||||
let row_count_expected = 3;
|
||||
let metadata = IoxMetadata {
|
||||
object_store_id,
|
||||
creation_timestamp: now(),
|
||||
|
|
@ -184,10 +188,10 @@ impl TestPartition {
|
|||
time_of_last_write: now(),
|
||||
min_sequence_number,
|
||||
max_sequence_number,
|
||||
row_count: row_count_expected,
|
||||
row_count: row_count as i64,
|
||||
};
|
||||
let (parquet_metadata_bin, file_size_bytes) =
|
||||
create_parquet_file(&self.catalog.object_store, &metadata).await;
|
||||
create_parquet_file(&self.catalog.object_store, &metadata, record_batch).await;
|
||||
|
||||
// decode metadata because we need to store them within the catalog
|
||||
let parquet_metadata = Arc::new(IoxParquetMetaData::from_thrift_bytes(
|
||||
|
|
@ -196,8 +200,6 @@ impl TestPartition {
|
|||
let decoded_metadata = parquet_metadata.decode().unwrap();
|
||||
let schema = decoded_metadata.read_schema().unwrap();
|
||||
let stats = decoded_metadata.read_statistics(&schema).unwrap();
|
||||
let row_count = stats[0].total_count();
|
||||
assert_eq!(row_count as i64, row_count_expected);
|
||||
let ts_min_max = stats
|
||||
.iter()
|
||||
.find_map(|stat| {
|
||||
|
|
@ -224,7 +226,10 @@ impl TestPartition {
|
|||
.await
|
||||
.unwrap();
|
||||
|
||||
Arc::new(TestParquetFile { parquet_file })
|
||||
Arc::new(TestParquetFile {
|
||||
catalog: Arc::clone(&self.catalog),
|
||||
parquet_file,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -232,25 +237,17 @@ impl TestPartition {
|
|||
async fn create_parquet_file(
|
||||
object_store: &Arc<ObjectStore>,
|
||||
metadata: &IoxMetadata,
|
||||
record_batch: RecordBatch,
|
||||
) -> (Vec<u8>, usize) {
|
||||
let iox_object_store = Arc::new(IoxObjectStore::existing(
|
||||
Arc::clone(object_store),
|
||||
IoxObjectStore::root_path_for(object_store, uuid::Uuid::new_v4()),
|
||||
));
|
||||
|
||||
let chunk1 = Arc::new(
|
||||
TestChunk::new("t")
|
||||
.with_id(1)
|
||||
.with_time_column()
|
||||
.with_tag_column("tag1")
|
||||
.with_i64_field_column("field_int")
|
||||
.with_three_rows_of_data(),
|
||||
);
|
||||
let record_batches = raw_data(&[chunk1]).await;
|
||||
let schema = record_batches.first().unwrap().schema();
|
||||
let schema = record_batch.schema();
|
||||
|
||||
let data = parquet_file::storage::Storage::new(Arc::clone(&iox_object_store))
|
||||
.parquet_bytes(record_batches, schema, metadata)
|
||||
.parquet_bytes(vec![record_batch], schema, metadata)
|
||||
.await
|
||||
.unwrap();
|
||||
let data = Arc::new(data);
|
||||
|
|
@ -280,9 +277,22 @@ async fn create_parquet_file(
|
|||
}
|
||||
|
||||
pub struct TestParquetFile {
|
||||
pub catalog: Arc<TestCatalog>,
|
||||
pub parquet_file: ParquetFile,
|
||||
}
|
||||
|
||||
impl TestParquetFile {
|
||||
pub async fn flag_for_delete(self: &Arc<Self>) {
|
||||
let mut repos = self.catalog.catalog.repositories().await;
|
||||
|
||||
repos
|
||||
.parquet_files()
|
||||
.flag_for_delete(self.parquet_file.id)
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
fn now() -> Time {
|
||||
Time::from_timestamp(0, 0)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue