Merge branch 'main' into dom/schema-api

pull/24376/head
kodiakhq[bot] 2022-06-27 21:34:07 +00:00 committed by GitHub
commit c22aed4347
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 180 additions and 697 deletions


@ -1276,11 +1276,11 @@ mod tests {
ColumnSet, ColumnType, KafkaPartition, NamespaceId, ParquetFileParams, SequenceNumber,
};
use futures::{stream::FuturesOrdered, StreamExt, TryStreamExt};
use iox_catalog::interface::{get_schema_by_id, INITIAL_COMPACTION_LEVEL};
use iox_tests::util::TestCatalog;
use iox_catalog::interface::INITIAL_COMPACTION_LEVEL;
use iox_tests::util::{TestCatalog, TestTable};
use iox_time::SystemProvider;
use parquet_file::ParquetFilePath;
use schema::{selection::Selection, sort::SortKey, Schema};
use schema::{selection::Selection, sort::SortKey};
use std::sync::atomic::{AtomicI64, Ordering};
use test_helpers::maybe_start_logging;
@ -1338,7 +1338,7 @@ mod tests {
table.create_column("tag1", ColumnType::Tag).await;
table.create_column("field_int", ColumnType::I64).await;
table.create_column("time", ColumnType::Time).await;
let table_schema = read_table_schema(&catalog, ns.namespace.id, &table.table.name).await;
let table_schema = table.schema().await;
// One parquet file
let partition = table
@ -1440,7 +1440,7 @@ mod tests {
// query the chunks
// most recent compacted second half (~10%)
let files1 = files.pop().unwrap();
let batches = read_parquet_file(&catalog, files1).await;
let batches = read_parquet_file(&table, files1).await;
assert_batches_sorted_eq!(
&[
"+-----------+------+-----------------------------+",
@ -1453,7 +1453,7 @@ mod tests {
);
// least recent compacted first half (~90%)
let files2 = files.pop().unwrap();
let batches = read_parquet_file(&catalog, files2).await;
let batches = read_parquet_file(&table, files2).await;
assert_batches_sorted_eq!(
&[
"+-----------+------+-----------------------------+",
@ -1513,7 +1513,7 @@ mod tests {
table.create_column("tag3", ColumnType::Tag).await;
table.create_column("field_int", ColumnType::I64).await;
table.create_column("time", ColumnType::Time).await;
let table_schema = read_table_schema(&catalog, ns.namespace.id, &table.table.name).await;
let table_schema = table.schema().await;
let partition = table
.with_sequencer(&sequencer)
.create_partition("part")
@ -1666,7 +1666,7 @@ mod tests {
// Compacted file
let file2 = files.pop().unwrap();
let batches = read_parquet_file(&catalog, file2).await;
let batches = read_parquet_file(&table, file2).await;
assert_batches_sorted_eq!(
&[
"+-----------+------+------+------+-----------------------------+",
@ -1684,7 +1684,7 @@ mod tests {
);
// Non-compacted file
let file1 = files.pop().unwrap();
let batches = read_parquet_file(&catalog, file1).await;
let batches = read_parquet_file(&table, file1).await;
assert_batches_sorted_eq!(
&[
"+-----------+------+--------------------------------+",
@ -1716,7 +1716,7 @@ mod tests {
table.create_column("tag1", ColumnType::Tag).await;
table.create_column("field_int", ColumnType::I64).await;
table.create_column("time", ColumnType::Time).await;
let table_schema = read_table_schema(&catalog, ns.namespace.id, &table.table.name).await;
let table_schema = table.schema().await;
let partition = table
.with_sequencer(&sequencer)
.create_partition("part")
@ -1724,9 +1724,7 @@ mod tests {
let parquet_file = partition
.create_parquet_file_with_min_max(&lp, 1, 1, 8000, 20000)
.await
.parquet_file
.split_off_metadata()
.0;
.parquet_file;
let split_percentage = 90;
let max_concurrent_compaction_size_bytes = 100000;
@ -1848,7 +1846,7 @@ mod tests {
table.create_column("tag1", ColumnType::Tag).await;
table.create_column("field_int", ColumnType::I64).await;
table.create_column("time", ColumnType::Time).await;
let table_schema = read_table_schema(&catalog, ns.namespace.id, &table.table.name).await;
let table_schema = table.schema().await;
let partition = table
.with_sequencer(&sequencer)
.create_partition("part")
@ -1856,9 +1854,7 @@ mod tests {
let parquet_file = partition
.create_parquet_file_with_min_max(&lp, 1, 1, 8000, 20000)
.await
.parquet_file
.split_off_metadata()
.0;
.parquet_file;
let split_percentage = 100;
let max_concurrent_compaction_size_bytes = 100000;
@ -1960,7 +1956,7 @@ mod tests {
table.create_column("tag1", ColumnType::Tag).await;
table.create_column("field_int", ColumnType::I64).await;
table.create_column("time", ColumnType::Time).await;
let table_schema = read_table_schema(&catalog, ns.namespace.id, &table.table.name).await;
let table_schema = table.schema().await;
let partition = table
.with_sequencer(&sequencer)
.create_partition("part")
@ -1969,15 +1965,11 @@ mod tests {
let parquet_file1 = partition
.create_parquet_file_with_min_max_size(&lp1, 1, 5, 8000, 20000, 140000)
.await
.parquet_file
.split_off_metadata()
.0;
.parquet_file;
let parquet_file2 = partition
.create_parquet_file_with_min_max_size(&lp2, 10, 15, 6000, 25000, 100000)
.await
.parquet_file
.split_off_metadata()
.0;
.parquet_file;
let split_percentage = 90;
let max_concurrent_compaction_size_bytes = 100000;
@ -2110,7 +2102,7 @@ mod tests {
table.create_column("tag3", ColumnType::Tag).await;
table.create_column("field_int", ColumnType::I64).await;
table.create_column("time", ColumnType::Time).await;
let table_schema = read_table_schema(&catalog, ns.namespace.id, &table.table.name).await;
let table_schema = table.schema().await;
let partition = table
.with_sequencer(&sequencer)
.create_partition("part")
@ -2121,21 +2113,15 @@ mod tests {
let parquet_file1 = partition
.create_parquet_file_with_min_max_size(&lp1, 1, 5, 8000, 20000, 50000)
.await
.parquet_file
.split_off_metadata()
.0;
.parquet_file;
let parquet_file2 = partition
.create_parquet_file_with_min_max_size(&lp2, 10, 15, 6000, 25000, 50000)
.await
.parquet_file
.split_off_metadata()
.0;
.parquet_file;
let parquet_file3 = partition
.create_parquet_file_with_min_max_size(&lp3, 20, 25, 6000, 8000, 20000)
.await
.parquet_file
.split_off_metadata()
.0;
.parquet_file;
let split_percentage = 90;
let max_concurrent_compaction_size_bytes = 100000;
@ -2292,7 +2278,7 @@ mod tests {
table.create_column("tag1", ColumnType::Tag).await;
table.create_column("field_int", ColumnType::I64).await;
table.create_column("time", ColumnType::Time).await;
let table_schema = read_table_schema(&catalog, ns.namespace.id, &table.table.name).await;
let table_schema = table.schema().await;
let partition = table
.with_sequencer(&sequencer)
.create_partition("part")
@ -2301,15 +2287,11 @@ mod tests {
let pf1 = partition
.create_parquet_file_with_min_max(&lp1, 1, 5, 8000, 20000)
.await
.parquet_file
.split_off_metadata()
.0;
.parquet_file;
let pf2 = partition
.create_parquet_file_with_min_max(&lp2, 1, 5, 28000, 35000)
.await
.parquet_file
.split_off_metadata()
.0;
.parquet_file;
// Build 2 QueryableParquetChunks
let pt1 = ParquetFileWithTombstone::new(Arc::new(pf1), vec![]);
@ -2356,7 +2338,7 @@ mod tests {
table.create_column("field_int", ColumnType::I64).await;
table.create_column("field_float", ColumnType::F64).await;
table.create_column("time", ColumnType::Time).await;
let table_schema = read_table_schema(&catalog, ns.namespace.id, &table.table.name).await;
let table_schema = table.schema().await;
let partition = table
.with_sequencer(&sequencer)
.create_partition("part")
@ -2364,12 +2346,7 @@ mod tests {
.update_sort_key(SortKey::from_columns(["tag1", "tag2", "time"]))
.await;
let pf = partition
.create_parquet_file(&lp)
.await
.parquet_file
.split_off_metadata()
.0;
let pf = partition.create_parquet_file(&lp).await.parquet_file;
let pt = ParquetFileWithTombstone::new(Arc::new(pf), vec![]);
@ -2990,7 +2967,6 @@ mod tests {
min_time: Timestamp::new(1),
max_time: Timestamp::new(5),
file_size_bytes: 1337,
parquet_metadata: b"md1".to_vec(),
row_count: 0,
created_at: Timestamp::new(1),
compaction_level: INITIAL_COMPACTION_LEVEL,
@ -3257,7 +3233,6 @@ mod tests {
min_time,
max_time,
file_size_bytes: 1337,
parquet_metadata: b"md1".to_vec(),
compaction_level: INITIAL_COMPACTION_LEVEL, // level of file of new writes
row_count: 0,
created_at: Timestamp::new(1),
@ -3439,7 +3414,6 @@ mod tests {
min_time: Timestamp::new(1),
max_time: Timestamp::new(5),
file_size_bytes: 1337,
parquet_metadata: b"md1".to_vec(),
row_count: 0,
compaction_level: INITIAL_COMPACTION_LEVEL, // level of file of new writes
created_at: Timestamp::new(1),
@ -3553,7 +3527,7 @@ mod tests {
table.create_column("tag1", ColumnType::Tag).await;
table.create_column("ifield", ColumnType::I64).await;
table.create_column("time", ColumnType::Time).await;
let table_schema = read_table_schema(&catalog, ns.namespace.id, &table.table.name).await;
let table_schema = table.schema().await;
let partition = table
.with_sequencer(&sequencer)
.create_partition("part")
@ -3562,15 +3536,11 @@ mod tests {
let parquet_file1 = partition
.create_parquet_file_with_min_max_size(&lp1, 1, 5, 1, 1000, 60000)
.await
.parquet_file
.split_off_metadata()
.0;
.parquet_file;
let parquet_file2 = partition
.create_parquet_file_with_min_max_size(&lp2, 10, 15, 500, 1500, 60000)
.await
.parquet_file
.split_off_metadata()
.0;
.parquet_file;
let split_percentage = 90;
let max_concurrent_compaction_size_bytes = 100000;
@ -3633,38 +3603,11 @@ mod tests {
assert_eq!(num_rows, 1499);
}
async fn read_table_schema(
catalog: &TestCatalog,
namespace_id: NamespaceId,
table_name: &str,
) -> Schema {
let mut repos = catalog.catalog().repositories().await;
let namespace_schema = get_schema_by_id(namespace_id, repos.as_mut())
.await
.unwrap();
namespace_schema
.tables
.get(table_name)
.unwrap()
.clone()
.try_into()
.unwrap()
}
async fn read_parquet_file(catalog: &TestCatalog, file: ParquetFile) -> Vec<RecordBatch> {
let storage = ParquetStorage::new(catalog.object_store());
async fn read_parquet_file(table: &Arc<TestTable>, file: ParquetFile) -> Vec<RecordBatch> {
let storage = ParquetStorage::new(table.catalog.object_store());
// get schema
let mut repos = catalog.catalog().repositories().await;
let table_name = repos
.tables()
.get_by_id(file.table_id)
.await
.unwrap()
.unwrap()
.name;
drop(repos);
let table_schema = read_table_schema(catalog, file.namespace_id, &table_name).await;
let table_schema = table.schema().await;
let selection: Vec<_> = file.column_set.iter().map(|s| s.as_str()).collect();
let schema = table_schema.select_by_names(&selection).unwrap();


@ -182,7 +182,6 @@ mod tests {
min_time,
max_time,
file_size_bytes: 1337,
parquet_metadata: b"md1".to_vec(),
row_count: 0,
created_at: Timestamp::new(1),
compaction_level: INITIAL_COMPACTION_LEVEL,
@ -265,7 +264,6 @@ mod tests {
min_time,
max_time,
file_size_bytes: 1337,
parquet_metadata: b"md1".to_vec(),
row_count: 0,
created_at: Timestamp::new(1),
compaction_level: INITIAL_COMPACTION_LEVEL,
@ -352,7 +350,6 @@ mod tests {
min_time,
max_time,
file_size_bytes: 1337,
parquet_metadata: b"md1".to_vec(),
row_count: 0,
created_at: Timestamp::new(1),
compaction_level: INITIAL_COMPACTION_LEVEL,


@ -799,8 +799,7 @@ pub struct Tombstone {
impl Tombstone {
/// Estimate the memory consumption of this object and its contents
pub fn size(&self) -> usize {
// No additional heap allocations
std::mem::size_of_val(self)
std::mem::size_of_val(self) + self.serialized_predicate.capacity()
}
}
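The revised `size()` now counts the heap buffer behind `serialized_predicate` on top of the struct itself, which appears to be why the tombstone-cache size constants later in this diff grow (96 → 101 and 176 → 186). A minimal sketch of the accounting pattern, assuming a simplified Tombstone that keeps only the field relevant to sizing (the real struct has more fields):

use std::mem::size_of_val;

struct Tombstone {
    serialized_predicate: String,
    // fixed-size id/timestamp fields elided
}

impl Tombstone {
    /// Struct bytes plus the heap capacity backing the predicate string.
    fn size(&self) -> usize {
        size_of_val(self) + self.serialized_predicate.capacity()
    }
}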
@ -921,139 +920,6 @@ impl ParquetFile {
}
}
/// Data for a parquet file reference that has been inserted in the catalog, including the
/// `parquet_metadata` field that can be expensive to fetch.
#[derive(Debug, Clone, PartialEq, sqlx::FromRow)]
pub struct ParquetFileWithMetadata {
/// the id of the file in the catalog
pub id: ParquetFileId,
/// the sequencer that sequenced writes that went into this file
pub sequencer_id: SequencerId,
/// the namespace
pub namespace_id: NamespaceId,
/// the table
pub table_id: TableId,
/// the partition
pub partition_id: PartitionId,
/// the uuid used in the object store path for this file
pub object_store_id: Uuid,
/// the minimum sequence number from a record in this file
pub min_sequence_number: SequenceNumber,
/// the maximum sequence number from a record in this file
pub max_sequence_number: SequenceNumber,
/// the min timestamp of data in this file
pub min_time: Timestamp,
/// the max timestamp of data in this file
pub max_time: Timestamp,
/// When this file was marked for deletion
pub to_delete: Option<Timestamp>,
/// file size in bytes
pub file_size_bytes: i64,
/// thrift-encoded parquet metadata
pub parquet_metadata: Vec<u8>,
/// the number of rows of data in this file
pub row_count: i64,
/// the compaction level of the file
pub compaction_level: i16,
/// the creation time of the parquet file
pub created_at: Timestamp,
/// Set of columns within this parquet file.
///
/// See [`ParquetFile::column_set`].
pub column_set: ColumnSet,
}
impl ParquetFileWithMetadata {
/// Create an instance from an instance of ParquetFile and metadata bytes fetched from the
/// catalog.
pub fn new(parquet_file: ParquetFile, parquet_metadata: Vec<u8>) -> Self {
let ParquetFile {
id,
sequencer_id,
namespace_id,
table_id,
partition_id,
object_store_id,
min_sequence_number,
max_sequence_number,
min_time,
max_time,
to_delete,
file_size_bytes,
row_count,
compaction_level,
created_at,
column_set: columns,
} = parquet_file;
Self {
id,
sequencer_id,
namespace_id,
table_id,
partition_id,
object_store_id,
min_sequence_number,
max_sequence_number,
min_time,
max_time,
to_delete,
file_size_bytes,
parquet_metadata,
row_count,
compaction_level,
created_at,
column_set: columns,
}
}
/// Split the parquet_metadata off, leaving a regular ParquetFile and the bytes to transfer
/// ownership separately.
pub fn split_off_metadata(self) -> (ParquetFile, Vec<u8>) {
let Self {
id,
sequencer_id,
namespace_id,
table_id,
partition_id,
object_store_id,
min_sequence_number,
max_sequence_number,
min_time,
max_time,
to_delete,
file_size_bytes,
parquet_metadata,
row_count,
compaction_level,
created_at,
column_set: columns,
} = self;
(
ParquetFile {
id,
sequencer_id,
namespace_id,
table_id,
partition_id,
object_store_id,
min_sequence_number,
max_sequence_number,
min_time,
max_time,
to_delete,
file_size_bytes,
row_count,
compaction_level,
created_at,
column_set: columns,
},
parquet_metadata,
)
}
}
/// Data for a parquet file to be inserted into the catalog.
#[derive(Debug, Clone, PartialEq)]
pub struct ParquetFileParams {
@ -1077,8 +943,6 @@ pub struct ParquetFileParams {
pub max_time: Timestamp,
/// file size in bytes
pub file_size_bytes: i64,
/// thrift-encoded parquet metadata
pub parquet_metadata: Vec<u8>,
/// the number of rows of data in this file
pub row_count: i64,
/// the compaction level of the file


@ -360,7 +360,6 @@ async fn load_parquet_files(
min_time: Timestamp::new(p.min_time),
max_time: Timestamp::new(p.max_time),
file_size_bytes: p.file_size_bytes,
parquet_metadata: vec![],
row_count: p.row_count,
compaction_level: p.compaction_level as i16,
created_at: Timestamp::new(p.created_at),


@ -2473,7 +2473,6 @@ mod tests {
min_time: Timestamp::new(1),
max_time: Timestamp::new(1),
file_size_bytes: 0,
parquet_metadata: vec![],
row_count: 0,
compaction_level: INITIAL_COMPACTION_LEVEL,
created_at: Timestamp::new(1),


@ -0,0 +1 @@
ALTER TABLE parquet_file DROP COLUMN IF EXISTS parquet_metadata;


@ -3,10 +3,10 @@
use async_trait::async_trait;
use data_types::{
Column, ColumnSchema, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace,
NamespaceId, NamespaceSchema, ParquetFile, ParquetFileId, ParquetFileParams,
ParquetFileWithMetadata, Partition, PartitionId, PartitionInfo, PartitionKey,
ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer, SequencerId, Table,
TableId, TablePartition, TableSchema, Timestamp, Tombstone, TombstoneId,
NamespaceId, NamespaceSchema, ParquetFile, ParquetFileId, ParquetFileParams, Partition,
PartitionId, PartitionInfo, PartitionKey, ProcessedTombstone, QueryPool, QueryPoolId,
SequenceNumber, Sequencer, SequencerId, Table, TableId, TablePartition, TableSchema, Timestamp,
Tombstone, TombstoneId,
};
use iox_time::TimeProvider;
use snafu::{OptionExt, Snafu};
@ -525,13 +525,6 @@ pub trait ParquetFileRepo: Send + Sync {
/// [`to_delete`](ParquetFile::to_delete).
async fn list_by_table_not_to_delete(&mut self, table_id: TableId) -> Result<Vec<ParquetFile>>;
/// List all parquet files and their metadata within a given table that are NOT marked as
/// [`to_delete`](ParquetFile::to_delete). Fetching metadata can be expensive.
async fn list_by_table_not_to_delete_with_metadata(
&mut self,
table_id: TableId,
) -> Result<Vec<ParquetFileWithMetadata>>;
/// Delete all parquet files that were marked to be deleted earlier than the specified time.
/// Returns the deleted records.
async fn delete_old(&mut self, older_than: Timestamp) -> Result<Vec<ParquetFile>>;
@ -557,13 +550,6 @@ pub trait ParquetFileRepo: Send + Sync {
partition_id: PartitionId,
) -> Result<Vec<ParquetFile>>;
/// List parquet files and their metadata for a given partition that are NOT marked as
/// [`to_delete`](ParquetFile::to_delete). Fetching metadata can be expensive.
async fn list_by_partition_not_to_delete_with_metadata(
&mut self,
partition_id: PartitionId,
) -> Result<Vec<ParquetFileWithMetadata>>;
/// Update the compaction level of the specified parquet files to level 1. Returns the IDs
/// of the files that were successfully updated.
async fn update_to_level_1(
@ -574,9 +560,6 @@ pub trait ParquetFileRepo: Send + Sync {
/// Verify if the parquet file exists by selecting its id
async fn exist(&mut self, id: ParquetFileId) -> Result<bool>;
/// Fetch the parquet_metadata bytes for the given id. Potentially expensive.
async fn parquet_metadata(&mut self, id: ParquetFileId) -> Result<Vec<u8>>;
/// Return count
async fn count(&mut self) -> Result<i64>;
@ -1691,7 +1674,6 @@ pub(crate) mod test_helpers {
min_time,
max_time,
file_size_bytes: 1337,
parquet_metadata: b"md1".to_vec(),
row_count: 0,
compaction_level: INITIAL_COMPACTION_LEVEL,
created_at: Timestamp::new(1),
@ -1903,7 +1885,6 @@ pub(crate) mod test_helpers {
min_time: Timestamp::new(1),
max_time: Timestamp::new(10),
file_size_bytes: 1337,
parquet_metadata: b"md1".to_vec(),
row_count: 0,
compaction_level: INITIAL_COMPACTION_LEVEL,
created_at: Timestamp::new(1),
@ -1923,13 +1904,6 @@ pub(crate) mod test_helpers {
.unwrap();
assert_eq!(parquet_file, pfg.unwrap());
let metadata = repos
.parquet_files()
.parquet_metadata(parquet_file.id)
.await
.unwrap();
assert_eq!(metadata, b"md1".to_vec());
// verify that trying to create a file with the same UUID throws an error
let err = repos
.parquet_files()
@ -2025,23 +1999,6 @@ pub(crate) mod test_helpers {
.unwrap();
assert_eq!(files, vec![other_file.clone()]);
// test list_by_table_not_to_delete_with_metadata
let files = repos
.parquet_files()
.list_by_table_not_to_delete_with_metadata(table.id)
.await
.unwrap();
assert_eq!(files, vec![]);
let files = repos
.parquet_files()
.list_by_table_not_to_delete_with_metadata(other_table.id)
.await
.unwrap();
assert_eq!(
files,
vec![ParquetFileWithMetadata::new(other_file, b"md1".to_vec())]
);
// test list_by_namespace_not_to_delete
let namespace2 = repos
.namespaces()
@ -2264,7 +2221,6 @@ pub(crate) mod test_helpers {
min_time,
max_time,
file_size_bytes: 1337,
parquet_metadata: b"md1".to_vec(),
row_count: 0,
compaction_level: INITIAL_COMPACTION_LEVEL,
created_at: Timestamp::new(1),
@ -2395,7 +2351,6 @@ pub(crate) mod test_helpers {
min_time: query_min_time + 1,
max_time: query_max_time - 1,
file_size_bytes: 1337,
parquet_metadata: b"md1".to_vec(),
row_count: 0,
compaction_level: INITIAL_COMPACTION_LEVEL,
created_at: Timestamp::new(1),
@ -2602,7 +2557,6 @@ pub(crate) mod test_helpers {
min_time,
max_time,
file_size_bytes: 1337,
parquet_metadata: b"md1".to_vec(),
row_count: 0,
compaction_level: INITIAL_COMPACTION_LEVEL,
created_at: Timestamp::new(1),
@ -2661,19 +2615,6 @@ pub(crate) mod test_helpers {
.await
.unwrap();
assert_eq!(files, vec![parquet_file.clone(), level1_file.clone()]);
let files = repos
.parquet_files()
.list_by_partition_not_to_delete_with_metadata(partition.id)
.await
.unwrap();
assert_eq!(
files,
vec![
ParquetFileWithMetadata::new(parquet_file, b"md1".to_vec()),
ParquetFileWithMetadata::new(level1_file, b"md1".to_vec()),
]
);
}
async fn test_update_to_compaction_level_1(catalog: Arc<dyn Catalog>) {
@ -2722,7 +2663,6 @@ pub(crate) mod test_helpers {
min_time: query_min_time + 1,
max_time: query_max_time - 1,
file_size_bytes: 1337,
parquet_metadata: b"md1".to_vec(),
row_count: 0,
compaction_level: INITIAL_COMPACTION_LEVEL,
created_at: Timestamp::new(1),
@ -2840,7 +2780,6 @@ pub(crate) mod test_helpers {
min_time: Timestamp::new(100),
max_time: Timestamp::new(250),
file_size_bytes: 1337,
parquet_metadata: b"md1".to_vec(),
row_count: 0,
compaction_level: INITIAL_COMPACTION_LEVEL,
created_at: Timestamp::new(1),


@ -13,19 +13,14 @@ use crate::{
use async_trait::async_trait;
use data_types::{
Column, ColumnId, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace, NamespaceId,
ParquetFile, ParquetFileId, ParquetFileParams, ParquetFileWithMetadata, Partition, PartitionId,
PartitionInfo, PartitionKey, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber,
Sequencer, SequencerId, Table, TableId, TablePartition, Timestamp, Tombstone, TombstoneId,
ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionInfo,
PartitionKey, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer,
SequencerId, Table, TableId, TablePartition, Timestamp, Tombstone, TombstoneId,
};
use iox_time::{SystemProvider, TimeProvider};
use observability_deps::tracing::warn;
use sqlx::types::Uuid;
use std::{
collections::{BTreeMap, HashSet},
convert::TryFrom,
fmt::Formatter,
sync::Arc,
};
use std::{collections::HashSet, convert::TryFrom, fmt::Formatter, sync::Arc};
use tokio::sync::{Mutex, OwnedMutexGuard};
/// In-memory catalog that implements the `RepoCollection` and individual repo traits from
@ -64,7 +59,6 @@ struct MemCollections {
partitions: Vec<Partition>,
tombstones: Vec<Tombstone>,
parquet_files: Vec<ParquetFile>,
parquet_file_metadata: BTreeMap<ParquetFileId, Vec<u8>>,
processed_tombstones: Vec<ProcessedTombstone>,
}
@ -947,7 +941,6 @@ impl ParquetFileRepo for MemTxn {
min_time,
max_time,
file_size_bytes,
parquet_metadata,
row_count,
compaction_level,
created_at,
@ -980,12 +973,8 @@ impl ParquetFileRepo for MemTxn {
created_at,
column_set,
};
stage
.parquet_file_metadata
.insert(parquet_file.id, parquet_metadata);
stage.parquet_files.push(parquet_file);
Ok(stage.parquet_files.last().unwrap().clone())
}
@ -1049,33 +1038,6 @@ impl ParquetFileRepo for MemTxn {
Ok(parquet_files)
}
async fn list_by_table_not_to_delete_with_metadata(
&mut self,
table_id: TableId,
) -> Result<Vec<ParquetFileWithMetadata>> {
let stage = self.stage();
let parquet_files: Vec<_> = stage
.parquet_files
.iter()
.filter(|f| table_id == f.table_id && f.to_delete.is_none())
.cloned()
.map(|f| {
let parquet_file_id = f.id;
ParquetFileWithMetadata::new(
f,
stage
.parquet_file_metadata
.get(&parquet_file_id)
.cloned()
.unwrap_or_default(),
)
})
.collect();
Ok(parquet_files)
}
async fn delete_old(&mut self, older_than: Timestamp) -> Result<Vec<ParquetFile>> {
let stage = self.stage();
@ -1085,10 +1047,6 @@ impl ParquetFileRepo for MemTxn {
stage.parquet_files = keep;
for delete in &delete {
stage.parquet_file_metadata.remove(&delete.id);
}
Ok(delete)
}
@ -1143,32 +1101,6 @@ impl ParquetFileRepo for MemTxn {
.collect())
}
async fn list_by_partition_not_to_delete_with_metadata(
&mut self,
partition_id: PartitionId,
) -> Result<Vec<ParquetFileWithMetadata>> {
let stage = self.stage();
Ok(stage
.parquet_files
.iter()
.filter(|f| f.partition_id == partition_id && f.to_delete.is_none())
.cloned()
.map(|f| {
let parquet_file_id = f.id;
ParquetFileWithMetadata::new(
f,
stage
.parquet_file_metadata
.get(&parquet_file_id)
.cloned()
.unwrap_or_default(),
)
})
.collect())
}
async fn update_to_level_1(
&mut self,
parquet_file_ids: &[ParquetFileId],
@ -1195,16 +1127,6 @@ impl ParquetFileRepo for MemTxn {
Ok(stage.parquet_files.iter().any(|f| f.id == id))
}
async fn parquet_metadata(&mut self, id: ParquetFileId) -> Result<Vec<u8>> {
let stage = self.stage();
stage
.parquet_file_metadata
.get(&id)
.cloned()
.ok_or(Error::ParquetRecordNotFound { id })
}
async fn count(&mut self) -> Result<i64> {
let stage = self.stage();


@ -8,9 +8,9 @@ use crate::interface::{
use async_trait::async_trait;
use data_types::{
Column, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace, NamespaceId,
ParquetFile, ParquetFileId, ParquetFileParams, ParquetFileWithMetadata, Partition, PartitionId,
PartitionInfo, PartitionKey, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber,
Sequencer, SequencerId, Table, TableId, TablePartition, Timestamp, Tombstone, TombstoneId,
ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionInfo,
PartitionKey, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer,
SequencerId, Table, TableId, TablePartition, Timestamp, Tombstone, TombstoneId,
};
use iox_time::{SystemProvider, TimeProvider};
use metric::{DurationHistogram, Metric};
@ -268,15 +268,12 @@ decorate!(
"parquet_list_by_sequencer_greater_than" = list_by_sequencer_greater_than(&mut self, sequencer_id: SequencerId, sequence_number: SequenceNumber) -> Result<Vec<ParquetFile>>;
"parquet_list_by_namespace_not_to_delete" = list_by_namespace_not_to_delete(&mut self, namespace_id: NamespaceId) -> Result<Vec<ParquetFile>>;
"parquet_list_by_table_not_to_delete" = list_by_table_not_to_delete(&mut self, table_id: TableId) -> Result<Vec<ParquetFile>>;
"parquet_list_by_table_not_to_delete_with_metadata" = list_by_table_not_to_delete_with_metadata(&mut self, table_id: TableId) -> Result<Vec<ParquetFileWithMetadata>>;
"parquet_delete_old" = delete_old(&mut self, older_than: Timestamp) -> Result<Vec<ParquetFile>>;
"parquet_list_by_partition_not_to_delete" = list_by_partition_not_to_delete(&mut self, partition_id: PartitionId) -> Result<Vec<ParquetFile>>;
"parquet_list_by_partition_not_to_delete_with_metadata" = list_by_partition_not_to_delete_with_metadata(&mut self, partition_id: PartitionId) -> Result<Vec<ParquetFileWithMetadata>>;
"parquet_level_0" = level_0(&mut self, sequencer_id: SequencerId) -> Result<Vec<ParquetFile>>;
"parquet_level_1" = level_1(&mut self, table_partition: TablePartition, min_time: Timestamp, max_time: Timestamp) -> Result<Vec<ParquetFile>>;
"parquet_update_to_level_1" = update_to_level_1(&mut self, parquet_file_ids: &[ParquetFileId]) -> Result<Vec<ParquetFileId>>;
"parquet_exist" = exist(&mut self, id: ParquetFileId) -> Result<bool>;
"parquet_metadata" = parquet_metadata(&mut self, id: ParquetFileId) -> Result<Vec<u8>>;
"parquet_count" = count(&mut self) -> Result<i64>;
"parquet_count_by_overlaps" = count_by_overlaps(&mut self, table_id: TableId, sequencer_id: SequencerId, min_time: Timestamp, max_time: Timestamp, sequence_number: SequenceNumber) -> Result<i64>;
"parquet_get_by_object_store_id" = get_by_object_store_id(&mut self, object_store_id: Uuid) -> Result<Option<ParquetFile>>;


@ -12,9 +12,9 @@ use crate::{
use async_trait::async_trait;
use data_types::{
Column, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace, NamespaceId,
ParquetFile, ParquetFileId, ParquetFileParams, ParquetFileWithMetadata, Partition, PartitionId,
PartitionInfo, PartitionKey, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber,
Sequencer, SequencerId, Table, TableId, TablePartition, Timestamp, Tombstone, TombstoneId,
ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionInfo,
PartitionKey, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer,
SequencerId, Table, TableId, TablePartition, Timestamp, Tombstone, TombstoneId,
};
use iox_time::{SystemProvider, TimeProvider};
use observability_deps::tracing::{debug, info, warn};
@ -1465,7 +1465,6 @@ impl ParquetFileRepo for PostgresTxn {
min_time,
max_time,
file_size_bytes,
parquet_metadata,
row_count,
compaction_level,
created_at,
@ -1476,9 +1475,9 @@ impl ParquetFileRepo for PostgresTxn {
r#"
INSERT INTO parquet_file (
sequencer_id, table_id, partition_id, object_store_id, min_sequence_number,
max_sequence_number, min_time, max_time, file_size_bytes, parquet_metadata,
max_sequence_number, min_time, max_time, file_size_bytes,
row_count, compaction_level, created_at, namespace_id, column_set )
VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15 )
VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14 )
RETURNING *;
"#,
)
@ -1491,12 +1490,11 @@ RETURNING *;
.bind(min_time) // $7
.bind(max_time) // $8
.bind(file_size_bytes) // $9
.bind(parquet_metadata) // $10
.bind(row_count) // $11
.bind(compaction_level) // $12
.bind(created_at) // $13
.bind(namespace_id) // $14
.bind(column_set) // $15
.bind(row_count) // $10
.bind(compaction_level) // $11
.bind(created_at) // $12
.bind(namespace_id) // $13
.bind(column_set) // $14
.fetch_one(&mut self.inner)
.await
.map_err(|e| {
@ -1593,23 +1591,6 @@ WHERE table_id = $1 AND to_delete IS NULL;
.map_err(|e| Error::SqlxError { source: e })
}
async fn list_by_table_not_to_delete_with_metadata(
&mut self,
table_id: TableId,
) -> Result<Vec<ParquetFileWithMetadata>> {
sqlx::query_as::<_, ParquetFileWithMetadata>(
r#"
SELECT *
FROM parquet_file
WHERE table_id = $1 AND to_delete IS NULL;
"#,
)
.bind(&table_id) // $1
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })
}
async fn delete_old(&mut self, older_than: Timestamp) -> Result<Vec<ParquetFile>> {
sqlx::query_as::<_, ParquetFile>(
r#"
@ -1703,24 +1684,6 @@ WHERE parquet_file.partition_id = $1
.map_err(|e| Error::SqlxError { source: e })
}
async fn list_by_partition_not_to_delete_with_metadata(
&mut self,
partition_id: PartitionId,
) -> Result<Vec<ParquetFileWithMetadata>> {
sqlx::query_as::<_, ParquetFileWithMetadata>(
r#"
SELECT *
FROM parquet_file
WHERE parquet_file.partition_id = $1
AND parquet_file.to_delete IS NULL;
"#,
)
.bind(&partition_id) // $1
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })
}
async fn update_to_level_1(
&mut self,
parquet_file_ids: &[ParquetFileId],
@ -1757,17 +1720,6 @@ RETURNING id;
Ok(read_result.count > 0)
}
async fn parquet_metadata(&mut self, id: ParquetFileId) -> Result<Vec<u8>> {
let read_result =
sqlx::query(r#"SELECT parquet_metadata FROM parquet_file WHERE id = $1;"#)
.bind(&id) // $1
.fetch_one(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })?;
Ok(read_result.get("parquet_metadata"))
}
async fn count(&mut self) -> Result<i64> {
let read_result =
sqlx::query_as::<_, Count>(r#"SELECT count(*) as count FROM parquet_file;"#)


@ -5,13 +5,13 @@ use arrow::{
record_batch::RecordBatch,
};
use data_types::{
Column, ColumnSet, ColumnType, KafkaPartition, KafkaTopic, Namespace, ParquetFile,
ParquetFileId, ParquetFileParams, ParquetFileWithMetadata, Partition, PartitionId, QueryPool,
SequenceNumber, Sequencer, SequencerId, Table, TableId, Timestamp, Tombstone, TombstoneId,
Column, ColumnSet, ColumnType, KafkaPartition, KafkaTopic, Namespace, NamespaceSchema,
ParquetFile, ParquetFileParams, Partition, PartitionId, QueryPool, SequenceNumber, Sequencer,
SequencerId, Table, TableId, Timestamp, Tombstone, TombstoneId,
};
use datafusion::physical_plan::metrics::Count;
use iox_catalog::{
interface::{Catalog, PartitionRepo, INITIAL_COMPACTION_LEVEL},
interface::{get_schema_by_id, Catalog, PartitionRepo, INITIAL_COMPACTION_LEVEL},
mem::MemCatalog,
};
use iox_query::{exec::Executor, provider::RecordBatchDeduplicator, util::arrow_sort_key_exprs};
@ -19,7 +19,7 @@ use iox_time::{MockProvider, Time, TimeProvider};
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
use object_store::{memory::InMemory, DynObjectStore};
use observability_deps::tracing::debug;
use parquet_file::{chunk::DecodedParquetFile, metadata::IoxMetadata, storage::ParquetStorage};
use parquet_file::{metadata::IoxMetadata, storage::ParquetStorage};
use schema::{
selection::Selection,
sort::{adjust_sort_key_columns, compute_sort_key, SortKey},
@ -209,31 +209,6 @@ impl TestCatalog {
.await
.unwrap()
}
/// List all non-deleted files with their metadata
pub async fn list_by_table_not_to_delete_with_metadata(
self: &Arc<Self>,
table_id: TableId,
) -> Vec<ParquetFileWithMetadata> {
self.catalog
.repositories()
.await
.parquet_files()
.list_by_table_not_to_delete_with_metadata(table_id)
.await
.unwrap()
}
/// Get a parquet file's metadata bytes
pub async fn parquet_metadata(&self, parquet_file_id: ParquetFileId) -> Vec<u8> {
self.catalog
.repositories()
.await
.parquet_files()
.parquet_metadata(parquet_file_id)
.await
.unwrap()
}
}
/// A test namespace
@ -280,6 +255,14 @@ impl TestNamespace {
sequencer,
})
}
/// Get namespace schema for this namespace.
pub async fn schema(&self) -> NamespaceSchema {
let mut repos = self.catalog.catalog.repositories().await;
get_schema_by_id(self.namespace.id, repos.as_mut())
.await
.unwrap()
}
}
/// A test sequencer with its namespace in the catalog
@ -338,6 +321,19 @@ impl TestTable {
column,
})
}
/// Get schema for this table.
pub async fn schema(&self) -> Schema {
self.namespace
.schema()
.await
.tables
.get(&self.table.name)
.unwrap()
.clone()
.try_into()
.unwrap()
}
}
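With `TestNamespace::schema()` and `TestTable::schema()` available, the compactor tests earlier in this diff drop their local `read_table_schema` helper and ask the fixture directly. A hedged usage sketch, assuming a `TestCatalog` already set up as `catalog` inside an async test (the namespace, table, and column names are illustrative):

let ns = catalog.create_namespace("ns").await;
let table = ns.create_table("table1").await;
table.create_column("tag1", ColumnType::Tag).await;
table.create_column("field_int", ColumnType::I64).await;
table.create_column("time", ColumnType::Time).await;

// previously: read_table_schema(&catalog, ns.namespace.id, &table.table.name).await
let table_schema = table.schema().await;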
/// A test column.
@ -575,7 +571,7 @@ impl TestPartition {
.iter()
.map(|f| f.name().clone()),
);
let (parquet_metadata_bin, real_file_size_bytes) = create_parquet_file(
let real_file_size_bytes = create_parquet_file(
ParquetStorage::new(Arc::clone(&self.catalog.object_store)),
&metadata,
record_batch,
@ -593,7 +589,6 @@ impl TestPartition {
min_time: Timestamp::new(min_time),
max_time: Timestamp::new(max_time),
file_size_bytes: file_size_bytes.unwrap_or(real_file_size_bytes as i64),
parquet_metadata: parquet_metadata_bin.clone(),
row_count: row_count as i64,
created_at: Timestamp::new(creation_time),
compaction_level: INITIAL_COMPACTION_LEVEL,
@ -605,13 +600,14 @@ impl TestPartition {
.await
.unwrap();
let parquet_file = ParquetFileWithMetadata::new(parquet_file, parquet_metadata_bin);
update_catalog_sort_key_if_needed(repos.partitions(), self.partition.id, sort_key).await;
TestParquetFile {
catalog: Arc::clone(&self.catalog),
namespace: Arc::clone(&self.namespace),
table: Arc::clone(&self.table),
sequencer: Arc::clone(&self.sequencer),
partition: Arc::clone(self),
parquet_file,
}
}
@ -713,18 +709,18 @@ async fn update_catalog_sort_key_if_needed(
}
}
/// Create parquet file and return thrift-encoded and zstd-compressed parquet metadata as well as the file size.
/// Create parquet file and return file size.
async fn create_parquet_file(
store: ParquetStorage,
metadata: &IoxMetadata,
record_batch: RecordBatch,
) -> (Vec<u8>, usize) {
) -> usize {
let stream = futures::stream::once(async { Ok(record_batch) });
let (meta, file_size) = store
let (_meta, file_size) = store
.upload(stream, metadata)
.await
.expect("persisting parquet file should succeed");
(meta.thrift_bytes().to_vec(), file_size)
file_size
}
/// A test parquet file of the catalog
@ -732,7 +728,10 @@ async fn create_parquet_file(
pub struct TestParquetFile {
pub catalog: Arc<TestCatalog>,
pub namespace: Arc<TestNamespace>,
pub parquet_file: ParquetFileWithMetadata,
pub table: Arc<TestTable>,
pub sequencer: Arc<TestSequencer>,
pub partition: Arc<TestPartition>,
pub parquet_file: ParquetFile,
}
impl TestParquetFile {
@ -747,14 +746,16 @@ impl TestParquetFile {
.unwrap()
}
/// When only the ParquetFile is needed without the metadata, use this instead of the field
pub fn parquet_file_no_metadata(self) -> ParquetFile {
self.parquet_file.split_off_metadata().0
}
/// Get Parquet file schema.
pub fn schema(&self) -> Arc<Schema> {
DecodedParquetFile::new(self.parquet_file.clone()).schema()
pub async fn schema(&self) -> Arc<Schema> {
let table_schema = self.table.schema().await;
let selection: Vec<_> = self
.parquet_file
.column_set
.iter()
.map(|s| s.as_str())
.collect();
Arc::new(table_schema.select_by_names(&selection).unwrap())
}
}
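Because the schema is now derived from the catalog's table schema restricted to the file's `column_set` (rather than decoded from thrift metadata), the accessor is async. The querier cache tests later in this diff switch to the pattern below; a sketch assuming a `TestPartition` bound as `partition`:

let test_parquet_file = partition.create_parquet_file("table1 foo=1 11").await;
let schema = test_parquet_file.schema().await;        // catalog-backed, now async
let parquet_file = Arc::new(test_parquet_file.parquet_file.clone());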


@ -1,18 +1,11 @@
//! A metadata summary of a Parquet file in object storage, with the ability to
//! download & execute a scan.
use crate::{
metadata::{DecodedIoxParquetMetaData, IoxMetadata, IoxParquetMetaData},
storage::ParquetStorage,
ParquetFilePath,
};
use data_types::{
ParquetFile, ParquetFileId, ParquetFileWithMetadata, PartitionId, SequenceNumber, SequencerId,
TableId, TimestampMinMax, TimestampRange,
};
use crate::{storage::ParquetStorage, ParquetFilePath};
use data_types::{ParquetFile, TimestampMinMax, TimestampRange};
use datafusion::physical_plan::SendableRecordBatchStream;
use predicate::Predicate;
use schema::{selection::Selection, sort::SortKey, Schema};
use schema::{selection::Selection, Schema};
use std::{collections::BTreeSet, mem, sync::Arc};
use uuid::Uuid;
@ -118,87 +111,3 @@ impl ParquetChunk {
}
}
}
/// Parquet file with decoded metadata.
#[derive(Debug)]
#[allow(missing_docs)]
pub struct DecodedParquetFile {
pub parquet_file: ParquetFile,
pub parquet_metadata: Arc<IoxParquetMetaData>,
pub decoded_metadata: DecodedIoxParquetMetaData,
pub iox_metadata: IoxMetadata,
}
impl DecodedParquetFile {
/// initialise a [`DecodedParquetFile`] from the provided file & metadata.
pub fn new(parquet_file_with_metadata: ParquetFileWithMetadata) -> Self {
let (parquet_file, parquet_metadata) = parquet_file_with_metadata.split_off_metadata();
let parquet_metadata = Arc::new(IoxParquetMetaData::from_thrift_bytes(parquet_metadata));
let decoded_metadata = parquet_metadata.decode().expect("parquet metadata broken");
let iox_metadata = decoded_metadata
.read_iox_metadata_new()
.expect("cannot read IOx metadata from parquet MD");
Self {
parquet_file,
parquet_metadata,
decoded_metadata,
iox_metadata,
}
}
/// The IOx schema from the decoded IOx parquet metadata
pub fn schema(&self) -> Arc<Schema> {
self.decoded_metadata.read_schema().unwrap()
}
/// The IOx parquet file ID
pub fn parquet_file_id(&self) -> ParquetFileId {
self.parquet_file.id
}
/// The IOx partition ID
pub fn partition_id(&self) -> PartitionId {
self.parquet_file.partition_id
}
/// The IOx sequencer ID
pub fn sequencer_id(&self) -> SequencerId {
self.iox_metadata.sequencer_id
}
/// The IOx table ID
pub fn table_id(&self) -> TableId {
self.parquet_file.table_id
}
/// The sort key from the IOx metadata
pub fn sort_key(&self) -> Option<&SortKey> {
self.iox_metadata.sort_key.as_ref()
}
/// The minimum sequence number in this file
pub fn min_sequence_number(&self) -> SequenceNumber {
self.parquet_file.min_sequence_number
}
/// The maximum sequence number in this file
pub fn max_sequence_number(&self) -> SequenceNumber {
self.parquet_file.max_sequence_number
}
/// Estimate the memory consumption of this object and its contents
pub fn size(&self) -> usize {
// note subtract size of non Arc'd members as they are
// already included in Type::size()
mem::size_of_val(self) +
self.parquet_file.size() -
mem::size_of_val(&self.parquet_file) +
self.parquet_metadata.size() +
// parquet_metadata is wrapped in Arc so not included in size of self
self.decoded_metadata.size()
- mem::size_of_val(&self.decoded_metadata)
+ self.iox_metadata.size()
- mem::size_of_val(&self.iox_metadata)
}
}


@ -439,7 +439,6 @@ impl IoxMetadata {
min_time,
max_time,
file_size_bytes: file_size_bytes as i64,
parquet_metadata: metadata.thrift_bytes().to_vec(),
compaction_level: self.compaction_level,
row_count: row_count.try_into().expect("row count overflows i64"),
created_at: Timestamp::new(self.creation_timestamp.timestamp_nanos()),


@ -223,7 +223,7 @@ mod tests {
use std::collections::HashSet;
use super::*;
use data_types::{ParquetFile, ParquetFileId};
use data_types::ParquetFileId;
use iox_tests::util::{TestCatalog, TestNamespace, TestParquetFile, TestPartition, TestTable};
use crate::cache::{ram::test_util::test_ram_pool, test_util::assert_histogram_metric_count};
@ -239,8 +239,8 @@ mod tests {
let cached_files = cache.get(table.table.id).await.vec();
assert_eq!(cached_files.len(), 1);
let expected_parquet_file = to_file(tfile);
assert_eq!(cached_files[0].as_ref(), &expected_parquet_file);
let expected_parquet_file = &tfile.parquet_file;
assert_eq!(cached_files[0].as_ref(), expected_parquet_file);
// validate a second request doesn't result in a catalog request
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
@ -263,13 +263,13 @@ mod tests {
let cached_files = cache.get(table1.table.id).await.vec();
assert_eq!(cached_files.len(), 1);
let expected_parquet_file = to_file(tfile1);
assert_eq!(cached_files[0].as_ref(), &expected_parquet_file);
let expected_parquet_file = &tfile1.parquet_file;
assert_eq!(cached_files[0].as_ref(), expected_parquet_file);
let cached_files = cache.get(table2.table.id).await.vec();
assert_eq!(cached_files.len(), 1);
let expected_parquet_file = to_file(tfile2);
assert_eq!(cached_files[0].as_ref(), &expected_parquet_file);
let expected_parquet_file = &tfile2.parquet_file;
assert_eq!(cached_files[0].as_ref(), expected_parquet_file);
}
#[tokio::test]
@ -479,9 +479,4 @@ mod tests {
test_ram_pool(),
)
}
fn to_file(tfile: TestParquetFile) -> ParquetFile {
let (parquet_file, _meta) = tfile.parquet_file.split_off_metadata();
parquet_file
}
}

View File

@ -136,6 +136,9 @@ struct CachedPartition {
impl CachedPartition {
/// RAM-bytes EXCLUDING `self`.
fn size(&self) -> usize {
// Arc heap allocation
size_of_val(self.sort_key.as_ref()) +
// Arc content
self.sort_key
.as_ref()
.as_ref()


@ -203,6 +203,7 @@ mod tests {
use crate::cache::ram::test_util::test_ram_pool;
use arrow::record_batch::RecordBatch;
use arrow_util::assert_batches_eq;
use data_types::ColumnType;
use datafusion_util::stream_from_batches;
use iox_tests::util::{TestCatalog, TestPartition};
use metric::{Attributes, CumulativeGauge, Metric, U64Counter};
@ -225,6 +226,8 @@ mod tests {
let ns = catalog.create_namespace("ns").await;
let table = ns.create_table("table1").await;
table.create_column("foo", ColumnType::F64).await;
table.create_column("time", ColumnType::Time).await;
let sequencer1 = ns.create_sequencer(1).await;
let partition = table
@ -240,8 +243,8 @@ mod tests {
let (catalog, partition) = make_catalog().await;
let test_parquet_file = partition.create_parquet_file("table1 foo=1 11").await;
let schema = test_parquet_file.schema();
let parquet_file = Arc::new(test_parquet_file.parquet_file_no_metadata());
let schema = test_parquet_file.schema().await;
let parquet_file = Arc::new(test_parquet_file.parquet_file.clone());
let storage = ParquetStorage::new(Arc::clone(&catalog.object_store));
let cache = make_cache(&catalog);
@ -299,6 +302,8 @@ mod tests {
for i in 1..=3 {
let table_name = format!("cached_table{i}");
let table = ns.create_table(&table_name).await;
table.create_column("foo", ColumnType::F64).await;
table.create_column("time", ColumnType::Time).await;
let sequencer1 = ns.create_sequencer(1).await;
let partition = table
@ -309,8 +314,8 @@ mod tests {
let test_parquet_file = partition
.create_parquet_file(&format!("{table_name} foo=1 11"))
.await;
let schema = test_parquet_file.schema();
let parquet_file = Arc::new(test_parquet_file.parquet_file_no_metadata());
let schema = test_parquet_file.schema().await;
let parquet_file = Arc::new(test_parquet_file.parquet_file.clone());
parquet_files.push(parquet_file);
schemas.push(schema);
}
@ -469,8 +474,8 @@ mod tests {
let (catalog, partition) = make_catalog().await;
let test_parquet_file = partition.create_parquet_file("table1 foo=1 11").await;
let schema = test_parquet_file.schema();
let parquet_file = Arc::new(test_parquet_file.parquet_file_no_metadata());
let schema = test_parquet_file.schema().await;
let parquet_file = Arc::new(test_parquet_file.parquet_file.clone());
let storage = ParquetStorage::new(Arc::clone(&catalog.object_store));
let cache = make_cache(&catalog);


@ -275,8 +275,8 @@ mod tests {
let cache = make_cache(&catalog);
let single_tombstone_size = 96;
let two_tombstone_size = 176;
let single_tombstone_size = 101;
let two_tombstone_size = 186;
assert!(single_tombstone_size < two_tombstone_size);
// Create tombstone 1


@ -2,15 +2,12 @@
use crate::cache::CatalogCache;
use data_types::{
ChunkId, ChunkOrder, DeletePredicate, ParquetFile, ParquetFileId, ParquetFileWithMetadata,
PartitionId, SequenceNumber, SequencerId, TableSummary, TimestampMinMax, TimestampRange,
ChunkId, ChunkOrder, DeletePredicate, ParquetFile, ParquetFileId, PartitionId, SequenceNumber,
SequencerId, TableSummary, TimestampMinMax, TimestampRange,
};
use iox_catalog::interface::Catalog;
use iox_time::TimeProvider;
use parquet_file::{
chunk::{DecodedParquetFile, ParquetChunk},
storage::ParquetStorage,
};
use parquet_file::{chunk::ParquetChunk, storage::ParquetStorage};
use read_buffer::RBChunk;
use schema::{sort::SortKey, Schema};
use std::{collections::HashSet, sync::Arc};
@ -299,83 +296,31 @@ impl ChunkAdapter {
self.catalog_cache.catalog()
}
/// Create parquet chunk.
///
/// Returns `None` if some data required to create this chunk is already gone from the catalog.
///
/// CURRENTLY UNUSED: The querier is creating and caching read buffer chunks instead, using
/// the `new_rb_chunk` method.
async fn new_parquet_chunk(
&self,
decoded_parquet_file: &DecodedParquetFile,
) -> Option<ParquetChunk> {
Some(ParquetChunk::new(
Arc::new(decoded_parquet_file.parquet_file.clone()),
decoded_parquet_file.schema(),
self.store.clone(),
))
}
/// Create new querier Parquet chunk from a catalog record
///
/// Returns `None` if some data required to create this chunk is already gone from the catalog.
///
/// CURRENTLY UNUSED: The querier is creating and caching read buffer chunks instead, using
/// the `new_rb_chunk` method.
#[allow(dead_code)]
async fn new_querier_parquet_chunk_from_file_with_metadata(
&self,
parquet_file_with_metadata: ParquetFileWithMetadata,
) -> Option<QuerierParquetChunk> {
let decoded_parquet_file = DecodedParquetFile::new(parquet_file_with_metadata);
self.new_querier_parquet_chunk(&decoded_parquet_file).await
}
/// Create new querier Parquet chunk.
///
/// Returns `None` if some data required to create this chunk is already gone from the catalog.
///
/// CURRENTLY UNUSED: The querier is creating and caching read buffer chunks instead, using
/// the `new_rb_chunk` method.
pub async fn new_querier_parquet_chunk(
pub async fn new_parquet_chunk(
&self,
decoded_parquet_file: &DecodedParquetFile,
namespace_name: Arc<str>,
parquet_file: Arc<ParquetFile>,
) -> Option<QuerierParquetChunk> {
let parquet_file_id = decoded_parquet_file.parquet_file_id();
let table_id = decoded_parquet_file.table_id();
let chunk = Arc::new(self.new_parquet_chunk(decoded_parquet_file).await?);
let chunk_id = ChunkId::from(Uuid::from_u128(parquet_file_id.get() as _));
let table_name = self.catalog_cache.table().name(table_id).await?;
let order = ChunkOrder::new(decoded_parquet_file.min_sequence_number().get())
.expect("Error converting min sequence number to chunk order");
// Read partition sort key
let schema = decoded_parquet_file.schema();
let pk_columns: HashSet<&str> = schema.primary_key().into_iter().collect();
let partition_sort_key = self
.catalog_cache()
.partition()
.sort_key(decoded_parquet_file.partition_id(), &pk_columns)
.await;
let meta = Arc::new(ChunkMeta {
chunk_id,
table_name,
order,
sort_key: decoded_parquet_file.sort_key().cloned(),
sequencer_id: decoded_parquet_file.sequencer_id(),
partition_id: decoded_parquet_file.partition_id(),
min_sequence_number: decoded_parquet_file.min_sequence_number(),
max_sequence_number: decoded_parquet_file.max_sequence_number(),
});
let parts = self
.chunk_parts(namespace_name, Arc::clone(&parquet_file))
.await?;
let chunk = Arc::new(ParquetChunk::new(
parquet_file,
parts.schema,
self.store.clone(),
));
Some(QuerierParquetChunk::new(
parquet_file_id,
parts.parquet_file_id,
chunk,
meta,
partition_sort_key,
parts.meta,
parts.partition_sort_key,
))
}
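Under the new signature a caller hands over the namespace name and a plain catalog `ParquetFile` instead of a `DecodedParquetFile`. A hedged call-site sketch (the `adapter` and `parquet_file` bindings and the "ns" name are assumptions, mirroring the test at the end of this file's diff):

let chunk: Option<QuerierParquetChunk> = adapter
    .new_parquet_chunk(Arc::from("ns"), Arc::new(parquet_file))
    .await;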
@ -385,6 +330,30 @@ impl ChunkAdapter {
namespace_name: Arc<str>,
parquet_file: Arc<ParquetFile>,
) -> Option<QuerierRBChunk> {
let parts = self
.chunk_parts(namespace_name, Arc::clone(&parquet_file))
.await?;
let rb_chunk = self
.catalog_cache()
.read_buffer()
.get(parquet_file, Arc::clone(&parts.schema), self.store.clone())
.await;
Some(QuerierRBChunk::new(
parts.parquet_file_id,
rb_chunk,
parts.meta,
parts.schema,
parts.partition_sort_key,
))
}
async fn chunk_parts(
&self,
namespace_name: Arc<str>,
parquet_file: Arc<ParquetFile>,
) -> Option<ChunkParts> {
// gather schema information
let file_columns: HashSet<&str> =
parquet_file.column_set.iter().map(|s| s.as_str()).collect();
@ -461,16 +430,6 @@ impl ChunkAdapter {
let chunk_id = ChunkId::from(Uuid::from_u128(parquet_file.id.get() as _));
let rb_chunk = self
.catalog_cache()
.read_buffer()
.get(
Arc::clone(&parquet_file),
Arc::clone(&schema),
self.store.clone(),
)
.await;
let order = ChunkOrder::new(parquet_file.min_sequence_number.get())
.expect("Error converting min sequence number to chunk order");
@ -485,16 +444,22 @@ impl ChunkAdapter {
max_sequence_number: parquet_file.max_sequence_number,
});
Some(QuerierRBChunk::new(
parquet_file.id,
rb_chunk,
Some(ChunkParts {
parquet_file_id: parquet_file.id,
meta,
schema,
partition_sort_key,
))
})
}
}
struct ChunkParts {
parquet_file_id: ParquetFileId,
meta: Arc<ChunkMeta>,
schema: Arc<Schema>,
partition_sort_key: Arc<Option<SortKey>>,
}
#[cfg(test)]
pub mod tests {
use super::*;
@ -547,12 +512,7 @@ pub mod tests {
.await
.update_sort_key(SortKey::from_columns(["tag1", "tag2", "tag4", "time"]))
.await;
let parquet_file = Arc::new(
partition
.create_parquet_file(&lp)
.await
.parquet_file_no_metadata(),
);
let parquet_file = Arc::new(partition.create_parquet_file(&lp).await.parquet_file);
// create chunk
let chunk = adapter


@ -172,7 +172,6 @@ mod tests {
min_time: Timestamp::new(1),
max_time: Timestamp::new(5),
file_size_bytes: 2343,
parquet_metadata: vec![],
row_count: 29,
compaction_level: 0,
created_at: Timestamp::new(2343),


@ -150,7 +150,6 @@ mod tests {
min_time: Timestamp::new(1),
max_time: Timestamp::new(5),
file_size_bytes: 2343,
parquet_metadata: vec![],
row_count: 29,
compaction_level: 0,
created_at: Timestamp::new(2343),