fix: Do fewer queries for metadata

By adding another _with_metadata catalog function. Also introduce a new
type rather than passing around tuples everywhere.
pull/24376/head
Carol (Nichols || Goulding) 2022-04-11 15:44:11 -04:00
parent bba4251363
commit 94dcde4996
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
11 changed files with 360 additions and 186 deletions

View File

@ -10,8 +10,8 @@ use arrow::record_batch::RecordBatch;
use backoff::{Backoff, BackoffConfig};
use bytes::Bytes;
use data_types2::{
ParquetFile, ParquetFileId, PartitionId, SequencerId, TableId, TablePartition, Timestamp,
Tombstone, TombstoneId,
ParquetFile, ParquetFileId, ParquetFileWithMetadata, PartitionId, SequencerId, TableId,
TablePartition, Timestamp, Tombstone, TombstoneId,
};
use datafusion::error::DataFusionError;
use iox_catalog::interface::{Catalog, Transaction};
@ -19,10 +19,7 @@ use iox_object_store::ParquetFilePath;
use metric::{Attributes, Metric, U64Counter, U64Gauge, U64Histogram, U64HistogramOptions};
use object_store::DynObjectStore;
use observability_deps::tracing::{debug, info, warn};
use parquet_file::{
chunk::DecodedParquetFile,
metadata::{IoxMetadata, IoxParquetMetaData},
};
use parquet_file::metadata::{IoxMetadata, IoxParquetMetaData};
use query::{
compute_sort_key_for_chunks, exec::ExecutorType, frontend::reorg::ReorgPlanner,
util::compute_timenanosecond_min_max,
@ -379,7 +376,7 @@ impl Compactor {
.repositories()
.await
.parquet_files()
.list_by_partition_not_to_delete(partition_id)
.list_by_partition_not_to_delete_with_metadata(partition_id)
.await
.context(ListParquetFilesSnafu)?;
if parquet_files.is_empty() {
@ -491,7 +488,7 @@ impl Compactor {
fn group_small_contiguous_groups(
mut file_groups: Vec<GroupWithMinTimeAndSize>,
compaction_max_size_bytes: i64,
) -> Vec<Vec<ParquetFile>> {
) -> Vec<Vec<ParquetFileWithMetadata>> {
let mut groups = Vec::with_capacity(file_groups.len());
if file_groups.is_empty() {
return groups;
@ -558,19 +555,8 @@ impl Compactor {
return Ok(compacted);
}
// Fetch the parquet metadata for the first file to reuse IDs and names
let parquet_metadata_bytes = {
let mut repos = self.catalog.repositories().await;
repos
.parquet_files()
.parquet_metadata(overlapped_files[0].data.id)
.await
.context(ParquetMetadataSnafu)?
};
let decoded_metadata =
DecodedParquetFile::new(*overlapped_files[0].data, parquet_metadata_bytes);
let iox_metadata = decoded_metadata.iox_metadata;
// Save the parquet metadata for the first file to reuse IDs and names
let iox_metadata = overlapped_files[0].iox_metadata();
// Collect all unique tombstone
let mut tombstone_map = overlapped_files[0].tombstones();
@ -600,23 +586,15 @@ impl Compactor {
}
// Convert the input files into QueryableParquetChunk for making query plan
let mut query_chunks = Vec::with_capacity(overlapped_files.len());
{
let mut repos = self.catalog.repositories().await;
for f in overlapped_files {
let parquet_metadata_bytes = repos
.parquet_files()
.parquet_metadata(f.data.id)
.await
.context(ParquetMetadataSnafu)?;
query_chunks.push(f.to_queryable_parquet_chunk(
let query_chunks: Vec<_> = overlapped_files
.iter()
.map(|f| {
f.to_queryable_parquet_chunk(
Arc::clone(&self.object_store),
iox_metadata.table_name.to_string(),
parquet_metadata_bytes,
))
}
}
)
})
.collect();
debug!(
n_query_chunks = query_chunks.len(),
@ -846,7 +824,9 @@ impl Compactor {
/// Given a list of parquet files that come from the same Table Partition, group files together
/// if their (min_time, max_time) ranges overlap. Does not preserve or guarantee any ordering.
fn overlapped_groups(mut parquet_files: Vec<ParquetFile>) -> Vec<GroupWithMinTimeAndSize> {
fn overlapped_groups(
mut parquet_files: Vec<ParquetFileWithMetadata>,
) -> Vec<GroupWithMinTimeAndSize> {
let mut groups = Vec::with_capacity(parquet_files.len());
// While there are still files not in any group
@ -978,7 +958,7 @@ impl Compactor {
async fn add_tombstones_to_groups(
&self,
groups: Vec<Vec<ParquetFile>>,
groups: Vec<Vec<ParquetFileWithMetadata>>,
) -> Result<Vec<GroupWithTombstones>> {
let mut repo = self.catalog.repositories().await;
let tombstone_repo = repo.tombstones();
@ -1158,7 +1138,9 @@ mod tests {
.await
.unwrap();
// should have 2 non-deleted level_0 files. The original file was marked deleted and not counted
let files = catalog.list_by_table_not_to_delete(table.table.id).await;
let mut files = catalog
.list_by_table_not_to_delete_with_metadata(table.table.id)
.await;
assert_eq!(files.len(), 2);
// 2 newly created level-1 files as the result of compaction
assert_eq!((files[0].id.get(), files[0].compaction_level), (2, 1));
@ -1185,16 +1167,10 @@ mod tests {
catalog.time_provider(),
);
// create chunks for 2 files
let chunk_0_metadata = catalog.parquet_metadata(files[0].id).await;
let chunk_0 = adapter
.new_querier_chunk(files[0], chunk_0_metadata)
.await
.unwrap();
let chunk_1_metadata = catalog.parquet_metadata(files[1].id).await;
let chunk_1 = adapter
.new_querier_chunk(files[1], chunk_1_metadata)
.await
.unwrap();
let files1 = files.pop().unwrap();
let files0 = files.pop().unwrap();
let chunk_0 = adapter.new_querier_chunk(files0).await.unwrap();
let chunk_1 = adapter.new_querier_chunk(files1).await.unwrap();
// query the chunks
// least recent compacted first half (~90%)
let batches = collect_read_filter(&chunk_0).await;
@ -1280,7 +1256,7 @@ mod tests {
// parquet files
// pf1 does not overlap with any and very large ==> will be upgraded to level 1 during compaction
let _pf1 = partition
partition
.create_parquet_file_with_min_max_size_and_creation_time(
&lp1,
1,
@ -1290,10 +1266,9 @@ mod tests {
compactor.config.compaction_max_size_bytes() + 10,
20,
)
.await
.parquet_file;
.await;
// pf2 overlaps with pf3 ==> compacted and marked to_delete with a timestamp
let _pf2 = partition
partition
.create_parquet_file_with_min_max_and_creation_time(
&lp2,
4,
@ -1302,10 +1277,9 @@ mod tests {
20000,
time.now().timestamp_nanos(),
)
.await
.parquet_file;
.await;
// pf3 overlaps with pf2 ==> compacted and marked to_delete with a timestamp
let _pf3 = partition
partition
.create_parquet_file_with_min_max_and_creation_time(
&lp3,
8,
@ -1314,10 +1288,9 @@ mod tests {
25000,
time.now().timestamp_nanos(),
)
.await
.parquet_file;
.await;
// pf4 does not overlap with any but small => will also be compacted with pf2 and pf3
let _pf4 = partition
partition
.create_parquet_file_with_min_max_and_creation_time(
&lp4,
18,
@ -1326,8 +1299,7 @@ mod tests {
28000,
time.now().timestamp_nanos(),
)
.await
.parquet_file;
.await;
// should have 4 level-0 files before compacting
let count = catalog.count_level_0_files(sequencer.sequencer.id).await;
assert_eq!(count, 4);
@ -1370,7 +1342,9 @@ mod tests {
.unwrap();
// Should have 3 non-soft-deleted files: pf1 not compacted and stay, and 2 newly created after compacting pf2, pf3, pf4
let files = catalog.list_by_table_not_to_delete(table.table.id).await;
let mut files = catalog
.list_by_table_not_to_delete_with_metadata(table.table.id)
.await;
assert_eq!(files.len(), 3);
// pf1 upgraded to level 1
assert_eq!((files[0].id.get(), files[0].compaction_level), (1, 1));
@ -1401,16 +1375,10 @@ mod tests {
catalog.time_provider(),
);
// create chunks for 2 files
let chunk_0_metadata = catalog.parquet_metadata(files[1].id).await;
let chunk_0 = adapter
.new_querier_chunk(files[1], chunk_0_metadata)
.await
.unwrap();
let chunk_1_metadata = catalog.parquet_metadata(files[2].id).await;
let chunk_1 = adapter
.new_querier_chunk(files[2], chunk_1_metadata)
.await
.unwrap();
let files2 = files.pop().unwrap();
let files1 = files.pop().unwrap();
let chunk_0 = adapter.new_querier_chunk(files1).await.unwrap();
let chunk_1 = adapter.new_querier_chunk(files2).await.unwrap();
// query the chunks
// least recent compacted first half (~90%)
let batches = collect_read_filter(&chunk_0).await;
@ -1735,7 +1703,7 @@ mod tests {
/// A test utility function to make minimially-viable ParquetFile records with particular
/// min/max times. Does not involve the catalog at all.
fn arbitrary_parquet_file(min_time: i64, max_time: i64) -> ParquetFile {
fn arbitrary_parquet_file(min_time: i64, max_time: i64) -> ParquetFileWithMetadata {
arbitrary_parquet_file_with_size(min_time, max_time, 100)
}
@ -1743,8 +1711,8 @@ mod tests {
min_time: i64,
max_time: i64,
file_size_bytes: i64,
) -> ParquetFile {
ParquetFile {
) -> ParquetFileWithMetadata {
ParquetFileWithMetadata {
id: ParquetFileId::new(0),
sequencer_id: SequencerId::new(0),
namespace_id: NamespaceId::new(0),
@ -1757,6 +1725,7 @@ mod tests {
max_time: Timestamp::new(max_time),
to_delete: None,
file_size_bytes,
parquet_metadata: vec![],
row_count: 0,
compaction_level: INITIAL_COMPACTION_LEVEL, // level of file of new writes
created_at: Timestamp::new(1),
@ -1802,22 +1771,18 @@ mod tests {
data: Arc::new(pf1),
tombstones: vec![],
};
let pt1_metadata = catalog.parquet_metadata(pt1.data.id).await;
let pt2 = ParquetFileWithTombstone {
data: Arc::new(pf2),
tombstones: vec![],
};
let pt2_metadata = catalog.parquet_metadata(pt2.data.id).await;
let pc1 = pt1.to_queryable_parquet_chunk(
Arc::clone(&catalog.object_store),
table.table.name.clone(),
pt1_metadata,
);
let pc2 = pt2.to_queryable_parquet_chunk(
Arc::clone(&catalog.object_store),
table.table.name.clone(),
pt2_metadata,
);
// Vector of chunks
@ -1841,7 +1806,7 @@ mod tests {
let pf1 = arbitrary_parquet_file(1, 2);
let pf2 = arbitrary_parquet_file(3, 4);
let groups = Compactor::overlapped_groups(vec![pf1, pf2]);
let groups = Compactor::overlapped_groups(vec![pf1.clone(), pf2.clone()]);
// They should be 2 groups
assert_eq!(groups.len(), 2, "There should have been two group");
@ -1856,7 +1821,7 @@ mod tests {
let pf1 = arbitrary_parquet_file(1, 3);
let pf2 = arbitrary_parquet_file(2, 4);
let groups = Compactor::overlapped_groups(vec![pf1, pf2]);
let groups = Compactor::overlapped_groups(vec![pf1.clone(), pf2.clone()]);
// They should be in one group (order not guaranteed)
assert_eq!(groups.len(), 1, "There should have only been one group");
@ -1885,13 +1850,13 @@ mod tests {
// Given a bunch of files in an arbitrary order,
let all = vec![
min_equals_max,
overlaps_many,
alone,
another,
max_equals_min,
contained_completely_within,
partial_overlap,
min_equals_max.clone(),
overlaps_many.clone(),
alone.clone(),
another.clone(),
max_equals_min.clone(),
contained_completely_within.clone(),
partial_overlap.clone(),
];
let mut groups = Compactor::overlapped_groups(all);
@ -1955,16 +1920,16 @@ mod tests {
let pf1 = arbitrary_parquet_file_with_size(1, 2, 100);
let pf2 = arbitrary_parquet_file_with_size(3, 4, 200);
let overlapped_groups = Compactor::overlapped_groups(vec![pf1, pf2]);
let overlapped_groups = Compactor::overlapped_groups(vec![pf1.clone(), pf2.clone()]);
// 2 overlapped groups
assert_eq!(overlapped_groups.len(), 2);
let g1 = GroupWithMinTimeAndSize {
parquet_files: vec![pf1],
parquet_files: vec![pf1.clone()],
min_time: Timestamp::new(1),
total_file_size_bytes: 100,
};
let g2 = GroupWithMinTimeAndSize {
parquet_files: vec![pf2],
parquet_files: vec![pf2.clone()],
min_time: Timestamp::new(3),
total_file_size_bytes: 200,
};
@ -1988,16 +1953,16 @@ mod tests {
let pf1 = arbitrary_parquet_file_with_size(1, 2, 100);
let pf2 = arbitrary_parquet_file_with_size(3, 4, compaction_max_size_bytes); // too large to group
let overlapped_groups = Compactor::overlapped_groups(vec![pf1, pf2]);
let overlapped_groups = Compactor::overlapped_groups(vec![pf1.clone(), pf2.clone()]);
// 2 overlapped groups
assert_eq!(overlapped_groups.len(), 2);
let g1 = GroupWithMinTimeAndSize {
parquet_files: vec![pf1],
parquet_files: vec![pf1.clone()],
min_time: Timestamp::new(1),
total_file_size_bytes: 100,
};
let g2 = GroupWithMinTimeAndSize {
parquet_files: vec![pf2],
parquet_files: vec![pf2.clone()],
min_time: Timestamp::new(3),
total_file_size_bytes: compaction_max_size_bytes,
};
@ -2031,13 +1996,13 @@ mod tests {
// Given a bunch of files in an arbitrary order,
let all = vec![
min_equals_max,
overlaps_many,
alone,
another,
max_equals_min,
contained_completely_within,
partial_overlap,
min_equals_max.clone(),
overlaps_many.clone(),
alone.clone(),
another.clone(),
max_equals_min.clone(),
contained_completely_within.clone(),
partial_overlap.clone(),
];
// Group into overlapped groups
@ -2082,13 +2047,13 @@ mod tests {
// Given a bunch of files in an arbitrary order
let all = vec![
min_equals_max,
overlaps_many,
alone,
another,
max_equals_min,
contained_completely_within,
partial_overlap,
min_equals_max.clone(),
overlaps_many.clone(),
alone.clone(),
another.clone(),
max_equals_min.clone(),
contained_completely_within.clone(),
partial_overlap.clone(),
];
// Group into overlapped groups
@ -2134,13 +2099,13 @@ mod tests {
// Given a bunch of files in an arbitrary order
let all = vec![
min_equals_max,
overlaps_many,
alone,
another,
max_equals_min,
contained_completely_within,
partial_overlap,
min_equals_max.clone(),
overlaps_many.clone(),
alone.clone(),
another.clone(),
max_equals_min.clone(),
contained_completely_within.clone(),
partial_overlap.clone(),
];
// Group into overlapped groups
@ -2238,9 +2203,13 @@ mod tests {
..p1.clone()
};
let pf1 = txn.parquet_files().create(p1).await.unwrap();
let pf1_metadata = txn.parquet_files().parquet_metadata(pf1.id).await.unwrap();
let pf1 = ParquetFileWithMetadata::new(pf1, pf1_metadata);
let pf2 = txn.parquet_files().create(p2).await.unwrap();
let pf2_metadata = txn.parquet_files().parquet_metadata(pf2.id).await.unwrap();
let pf2 = ParquetFileWithMetadata::new(pf2, pf2_metadata);
let parquet_files = vec![pf1, pf2];
let parquet_files = vec![pf1.clone(), pf2.clone()];
let groups = vec![
vec![], // empty group should get filtered out
parquet_files,

View File

@ -3,7 +3,7 @@
use crate::query::QueryableParquetChunk;
use arrow::record_batch::RecordBatch;
use data_types2::{
ParquetFile, ParquetFileId, ParquetFileParams, Timestamp, Tombstone, TombstoneId,
ParquetFileId, ParquetFileParams, ParquetFileWithMetadata, Timestamp, Tombstone, TombstoneId,
};
use iox_object_store::IoxObjectStore;
use object_store::DynObjectStore;
@ -37,8 +37,8 @@ impl GroupWithTombstones {
/// Wrapper of group of parquet files with their min time and total size
#[derive(Debug, Clone, PartialEq)]
pub struct GroupWithMinTimeAndSize {
/// Parquet files
pub(crate) parquet_files: Vec<ParquetFile>,
/// Parquet files and their metadata
pub(crate) parquet_files: Vec<ParquetFileWithMetadata>,
/// min time of all parquet_files
pub(crate) min_time: Timestamp,
@ -51,7 +51,7 @@ pub struct GroupWithMinTimeAndSize {
#[allow(missing_docs)]
#[derive(Debug, Clone)]
pub struct ParquetFileWithTombstone {
pub(crate) data: Arc<ParquetFile>,
pub(crate) data: Arc<ParquetFileWithMetadata>,
pub(crate) tombstones: Vec<Tombstone>,
}
@ -89,9 +89,8 @@ impl ParquetFileWithTombstone {
&self,
object_store: Arc<DynObjectStore>,
table_name: String,
parquet_metadata_bytes: Vec<u8>,
) -> QueryableParquetChunk {
let decoded_parquet_file = DecodedParquetFile::new(*self.data, parquet_metadata_bytes);
let decoded_parquet_file = DecodedParquetFile::new((*self.data).clone());
let root_path = IoxObjectStore::root_path_for(&*object_store, self.data.object_store_id);
let iox_object_store = IoxObjectStore::existing(object_store, root_path);
let parquet_chunk = new_parquet_chunk(
@ -117,6 +116,12 @@ impl ParquetFileWithTombstone {
&self.tombstones,
)
}
/// Return iox metadata of the parquet file
pub fn iox_metadata(&self) -> IoxMetadata {
let decoded_parquet_file = DecodedParquetFile::new((*self.data).clone());
decoded_parquet_file.iox_metadata
}
}
/// Struct holding output of a compacted stream

View File

@ -754,6 +754,131 @@ pub struct ParquetFile {
pub created_at: Timestamp,
}
/// Data for a parquet file reference that has been inserted in the catalog, including the
/// `parquet_metadata` field that can be expensive to fetch.
#[derive(Debug, Clone, PartialEq, sqlx::FromRow)]
pub struct ParquetFileWithMetadata {
/// the id of the file in the catalog
pub id: ParquetFileId,
/// the sequencer that sequenced writes that went into this file
pub sequencer_id: SequencerId,
/// the namespace
pub namespace_id: NamespaceId,
/// the table
pub table_id: TableId,
/// the partition
pub partition_id: PartitionId,
/// the uuid used in the object store path for this file
pub object_store_id: Uuid,
/// the minimum sequence number from a record in this file
pub min_sequence_number: SequenceNumber,
/// the maximum sequence number from a record in this file
pub max_sequence_number: SequenceNumber,
/// the min timestamp of data in this file
pub min_time: Timestamp,
/// the max timestamp of data in this file
pub max_time: Timestamp,
/// When this file was marked for deletion
pub to_delete: Option<Timestamp>,
/// file size in bytes
pub file_size_bytes: i64,
/// thrift-encoded parquet metadata
pub parquet_metadata: Vec<u8>,
/// the number of rows of data in this file
pub row_count: i64,
/// the compaction level of the file
pub compaction_level: i16,
/// the creation time of the parquet file
pub created_at: Timestamp,
}
impl ParquetFileWithMetadata {
/// Create an instance from an instance of ParquetFile and metadata bytes fetched from the
/// catalog.
pub fn new(parquet_file: ParquetFile, parquet_metadata: Vec<u8>) -> Self {
let ParquetFile {
id,
sequencer_id,
namespace_id,
table_id,
partition_id,
object_store_id,
min_sequence_number,
max_sequence_number,
min_time,
max_time,
to_delete,
file_size_bytes,
row_count,
compaction_level,
created_at,
} = parquet_file;
Self {
id,
sequencer_id,
namespace_id,
table_id,
partition_id,
object_store_id,
min_sequence_number,
max_sequence_number,
min_time,
max_time,
to_delete,
file_size_bytes,
parquet_metadata,
row_count,
compaction_level,
created_at,
}
}
/// Split the parquet_metadata off, leaving a regular ParquetFile and the bytes to transfer
/// ownership separately.
pub fn split_off_metadata(self) -> (ParquetFile, Vec<u8>) {
let Self {
id,
sequencer_id,
namespace_id,
table_id,
partition_id,
object_store_id,
min_sequence_number,
max_sequence_number,
min_time,
max_time,
to_delete,
file_size_bytes,
parquet_metadata,
row_count,
compaction_level,
created_at,
} = self;
(
ParquetFile {
id,
sequencer_id,
namespace_id,
table_id,
partition_id,
object_store_id,
min_sequence_number,
max_sequence_number,
min_time,
max_time,
to_delete,
file_size_bytes,
row_count,
compaction_level,
created_at,
},
parquet_metadata,
)
}
}
/// Data for a parquet file to be inserted into the catalog.
#[derive(Debug, Clone, PartialEq)]
pub struct ParquetFileParams {

View File

@ -3,10 +3,10 @@
use async_trait::async_trait;
use data_types2::{
Column, ColumnSchema, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace,
NamespaceId, NamespaceSchema, ParquetFile, ParquetFileId, ParquetFileParams, Partition,
PartitionId, PartitionInfo, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber,
Sequencer, SequencerId, Table, TableId, TablePartition, TableSchema, Timestamp, Tombstone,
TombstoneId,
NamespaceId, NamespaceSchema, ParquetFile, ParquetFileId, ParquetFileParams,
ParquetFileWithMetadata, Partition, PartitionId, PartitionInfo, ProcessedTombstone, QueryPool,
QueryPoolId, SequenceNumber, Sequencer, SequencerId, Table, TableId, TablePartition,
TableSchema, Timestamp, Tombstone, TombstoneId,
};
use snafu::{OptionExt, Snafu};
use std::{collections::BTreeMap, convert::TryFrom, fmt::Debug, sync::Arc};
@ -530,7 +530,7 @@ pub trait ParquetFileRepo: Send + Sync {
async fn list_by_table_not_to_delete_with_metadata(
&mut self,
table_id: TableId,
) -> Result<Vec<(ParquetFile, Vec<u8>)>>;
) -> Result<Vec<ParquetFileWithMetadata>>;
/// Delete all parquet files that were marked to be deleted earlier than the specified time.
/// Returns the deleted records.
@ -550,12 +550,20 @@ pub trait ParquetFileRepo: Send + Sync {
max_time: Timestamp,
) -> Result<Vec<ParquetFile>>;
/// List parquet files for a given partition that are NOT marked as [`to_delete`](ParquetFile::to_delete).
/// List parquet files for a given partition that are NOT marked as
/// [`to_delete`](ParquetFile::to_delete).
async fn list_by_partition_not_to_delete(
&mut self,
partition_id: PartitionId,
) -> Result<Vec<ParquetFile>>;
/// List parquet files and their metadata for a given partition that are NOT marked as
/// [`to_delete`](ParquetFile::to_delete). Fetching metadata can be expensive.
async fn list_by_partition_not_to_delete_with_metadata(
&mut self,
partition_id: PartitionId,
) -> Result<Vec<ParquetFileWithMetadata>>;
/// Update the compaction level of the specified parquet files to level 1. Returns the IDs
/// of the files that were successfully updated.
async fn update_to_level_1(
@ -1862,7 +1870,10 @@ pub(crate) mod test_helpers {
.list_by_table_not_to_delete_with_metadata(other_table.id)
.await
.unwrap();
assert_eq!(files, vec![(other_file, b"md1".to_vec())]);
assert_eq!(
files,
vec![ParquetFileWithMetadata::new(other_file, b"md1".to_vec())]
);
// test list_by_namespace_not_to_delete
let namespace2 = repos
@ -2480,6 +2491,19 @@ pub(crate) mod test_helpers {
.await
.unwrap();
assert_eq!(files, vec![parquet_file, level1_file]);
let files = repos
.parquet_files()
.list_by_partition_not_to_delete_with_metadata(partition.id)
.await
.unwrap();
assert_eq!(
files,
vec![
ParquetFileWithMetadata::new(parquet_file, b"md1".to_vec()),
ParquetFileWithMetadata::new(level1_file, b"md1".to_vec()),
]
);
}
async fn test_update_to_compaction_level_1(catalog: Arc<dyn Catalog>) {

View File

@ -13,9 +13,9 @@ use crate::{
use async_trait::async_trait;
use data_types2::{
Column, ColumnId, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace, NamespaceId,
ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionInfo,
ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer, SequencerId, Table,
TableId, TablePartition, Timestamp, Tombstone, TombstoneId,
ParquetFile, ParquetFileId, ParquetFileParams, ParquetFileWithMetadata, Partition, PartitionId,
PartitionInfo, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer,
SequencerId, Table, TableId, TablePartition, Timestamp, Tombstone, TombstoneId,
};
use observability_deps::tracing::warn;
use std::{
@ -1040,7 +1040,7 @@ impl ParquetFileRepo for MemTxn {
async fn list_by_table_not_to_delete_with_metadata(
&mut self,
table_id: TableId,
) -> Result<Vec<(ParquetFile, Vec<u8>)>> {
) -> Result<Vec<ParquetFileWithMetadata>> {
let stage = self.stage();
let parquet_files: Vec<_> = stage
@ -1049,7 +1049,7 @@ impl ParquetFileRepo for MemTxn {
.filter(|f| table_id == f.table_id && f.to_delete.is_none())
.cloned()
.map(|f| {
(
ParquetFileWithMetadata::new(
f,
stage
.parquet_file_metadata
@ -1129,6 +1129,30 @@ impl ParquetFileRepo for MemTxn {
.collect())
}
async fn list_by_partition_not_to_delete_with_metadata(
&mut self,
partition_id: PartitionId,
) -> Result<Vec<ParquetFileWithMetadata>> {
let stage = self.stage();
Ok(stage
.parquet_files
.iter()
.filter(|f| f.partition_id == partition_id && f.to_delete.is_none())
.cloned()
.map(|f| {
ParquetFileWithMetadata::new(
f,
stage
.parquet_file_metadata
.get(&f.id)
.cloned()
.unwrap_or_default(),
)
})
.collect())
}
async fn update_to_level_1(
&mut self,
parquet_file_ids: &[ParquetFileId],

View File

@ -8,9 +8,9 @@ use crate::interface::{
use async_trait::async_trait;
use data_types2::{
Column, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace, NamespaceId,
ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionInfo,
ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer, SequencerId, Table,
TableId, TablePartition, Timestamp, Tombstone, TombstoneId,
ParquetFile, ParquetFileId, ParquetFileParams, ParquetFileWithMetadata, Partition, PartitionId,
PartitionInfo, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer,
SequencerId, Table, TableId, TablePartition, Timestamp, Tombstone, TombstoneId,
};
use metric::{Metric, U64Histogram, U64HistogramOptions};
use std::{fmt::Debug, sync::Arc};
@ -269,9 +269,10 @@ decorate!(
"parquet_list_by_sequencer_greater_than" = list_by_sequencer_greater_than(&mut self, sequencer_id: SequencerId, sequence_number: SequenceNumber) -> Result<Vec<ParquetFile>>;
"parquet_list_by_namespace_not_to_delete" = list_by_namespace_not_to_delete(&mut self, namespace_id: NamespaceId) -> Result<Vec<ParquetFile>>;
"parquet_list_by_table_not_to_delete" = list_by_table_not_to_delete(&mut self, table_id: TableId) -> Result<Vec<ParquetFile>>;
"parquet_list_by_table_not_to_delete_with_metadata" = list_by_table_not_to_delete_with_metadata(&mut self, table_id: TableId) -> Result<Vec<ParquetFileWithMetadata>>;
"parquet_delete_old" = delete_old(&mut self, older_than: Timestamp) -> Result<Vec<ParquetFile>>;
"parquet_list_by_partition_not_to_delete" = list_by_partition_not_to_delete(&mut self, partition_id: PartitionId) -> Result<Vec<ParquetFile>>;
"parquet_list_by_table_not_to_delete_with_metadata" = list_by_table_not_to_delete_with_metadata(&mut self, table_id: TableId) -> Result<Vec<(ParquetFile, Vec<u8>)>>;
"parquet_list_by_partition_not_to_delete_with_metadata" = list_by_partition_not_to_delete_with_metadata(&mut self, partition_id: PartitionId) -> Result<Vec<ParquetFileWithMetadata>>;
"parquet_level_0" = level_0(&mut self, sequencer_id: SequencerId) -> Result<Vec<ParquetFile>>;
"parquet_level_1" = level_1(&mut self, table_partition: TablePartition, min_time: Timestamp, max_time: Timestamp) -> Result<Vec<ParquetFile>>;
"parquet_update_to_level_1" = update_to_level_1(&mut self, parquet_file_ids: &[ParquetFileId]) -> Result<Vec<ParquetFileId>>;

View File

@ -12,12 +12,12 @@ use crate::{
use async_trait::async_trait;
use data_types2::{
Column, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace, NamespaceId,
ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionInfo,
ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer, SequencerId, Table,
TableId, TablePartition, Timestamp, Tombstone, TombstoneId,
ParquetFile, ParquetFileId, ParquetFileParams, ParquetFileWithMetadata, Partition, PartitionId,
PartitionInfo, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer,
SequencerId, Table, TableId, TablePartition, Timestamp, Tombstone, TombstoneId,
};
use observability_deps::tracing::{info, warn};
use sqlx::{migrate::Migrator, postgres::PgPoolOptions, Acquire, Executor, FromRow, Postgres, Row};
use sqlx::{migrate::Migrator, postgres::PgPoolOptions, Acquire, Executor, Postgres, Row};
use sqlx_hotswap_pool::HotSwapPool;
use std::{collections::HashMap, sync::Arc, time::Duration};
use time::{SystemProvider, TimeProvider};
@ -1568,8 +1568,8 @@ WHERE table_id = $1 AND to_delete IS NULL;
async fn list_by_table_not_to_delete_with_metadata(
&mut self,
table_id: TableId,
) -> Result<Vec<(ParquetFile, Vec<u8>)>> {
let rows = sqlx::query(
) -> Result<Vec<ParquetFileWithMetadata>> {
sqlx::query_as::<_, ParquetFileWithMetadata>(
r#"
SELECT *
FROM parquet_file
@ -1579,16 +1579,7 @@ WHERE table_id = $1 AND to_delete IS NULL;
.bind(&table_id) // $1
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })?;
Ok(rows
.into_iter()
.map(|row| {
ParquetFile::from_row(&row)
.map(|parquet_file| (parquet_file, row.get("parquet_metadata")))
.map_err(|e| Error::SqlxError { source: e })
})
.collect::<Result<Vec<_>, _>>()?)
.map_err(|e| Error::SqlxError { source: e })
}
async fn delete_old(&mut self, older_than: Timestamp) -> Result<Vec<ParquetFile>> {
@ -1684,6 +1675,24 @@ WHERE parquet_file.partition_id = $1
.map_err(|e| Error::SqlxError { source: e })
}
async fn list_by_partition_not_to_delete_with_metadata(
&mut self,
partition_id: PartitionId,
) -> Result<Vec<ParquetFileWithMetadata>> {
sqlx::query_as::<_, ParquetFileWithMetadata>(
r#"
SELECT *
FROM parquet_file
WHERE parquet_file.partition_id = $1
AND parquet_file.to_delete IS NULL;
"#,
)
.bind(&partition_id) // $1
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })
}
async fn update_to_level_1(
&mut self,
parquet_file_ids: &[ParquetFileId],

View File

@ -7,8 +7,8 @@ use arrow::{
use bytes::Bytes;
use data_types2::{
Column, ColumnType, KafkaPartition, KafkaTopic, Namespace, ParquetFile, ParquetFileId,
ParquetFileParams, Partition, QueryPool, SequenceNumber, Sequencer, SequencerId, Table,
TableId, Timestamp, Tombstone, TombstoneId,
ParquetFileParams, ParquetFileWithMetadata, Partition, QueryPool, SequenceNumber, Sequencer,
SequencerId, Table, TableId, Timestamp, Tombstone, TombstoneId,
};
use iox_catalog::{
interface::{Catalog, INITIAL_COMPACTION_LEVEL},
@ -190,6 +190,20 @@ impl TestCatalog {
.unwrap()
}
/// List all non-deleted files with their metadata
pub async fn list_by_table_not_to_delete_with_metadata(
self: &Arc<Self>,
table_id: TableId,
) -> Vec<ParquetFileWithMetadata> {
self.catalog
.repositories()
.await
.parquet_files()
.list_by_table_not_to_delete_with_metadata(table_id)
.await
.unwrap()
}
/// Get a parquet file's metadata bytes
pub async fn parquet_metadata(&self, parquet_file_id: ParquetFileId) -> Vec<u8> {
self.catalog
@ -384,7 +398,7 @@ pub struct TestPartition {
impl TestPartition {
/// Create a parquet for the partition
pub async fn create_parquet_file(self: &Arc<Self>, lp: &str) -> Arc<TestParquetFile> {
pub async fn create_parquet_file(self: &Arc<Self>, lp: &str) -> TestParquetFile {
self.create_parquet_file_with_min_max(
lp,
1,
@ -403,7 +417,7 @@ impl TestPartition {
max_seq: i64,
min_time: i64,
max_time: i64,
) -> Arc<TestParquetFile> {
) -> TestParquetFile {
self.create_parquet_file_with_min_max_and_creation_time(
lp, min_seq, max_seq, min_time, max_time, 1,
)
@ -419,7 +433,7 @@ impl TestPartition {
min_time: i64,
max_time: i64,
creation_time: i64,
) -> Arc<TestParquetFile> {
) -> TestParquetFile {
let mut repos = self.catalog.catalog.repositories().await;
let (table, batch) = lp_to_mutable_batch(lp);
@ -462,7 +476,7 @@ impl TestPartition {
min_time: Timestamp::new(min_time),
max_time: Timestamp::new(max_time),
file_size_bytes: real_file_size_bytes as i64,
parquet_metadata: parquet_metadata_bin,
parquet_metadata: parquet_metadata_bin.clone(),
row_count: row_count as i64,
created_at: Timestamp::new(creation_time),
compaction_level: INITIAL_COMPACTION_LEVEL,
@ -473,11 +487,13 @@ impl TestPartition {
.await
.unwrap();
Arc::new(TestParquetFile {
let parquet_file = ParquetFileWithMetadata::new(parquet_file, parquet_metadata_bin);
TestParquetFile {
catalog: Arc::clone(&self.catalog),
namespace: Arc::clone(&self.namespace),
parquet_file,
})
}
}
/// Create a parquet for the partition with fake sizew for testing
@ -491,7 +507,7 @@ impl TestPartition {
max_time: i64,
file_size_bytes: i64,
creation_time: i64,
) -> Arc<TestParquetFile> {
) -> TestParquetFile {
let mut repos = self.catalog.catalog.repositories().await;
let (table, batch) = lp_to_mutable_batch(lp);
@ -534,7 +550,7 @@ impl TestPartition {
min_time: Timestamp::new(min_time),
max_time: Timestamp::new(max_time),
file_size_bytes,
parquet_metadata: parquet_metadata_bin,
parquet_metadata: parquet_metadata_bin.clone(),
row_count: row_count as i64,
created_at: Timestamp::new(creation_time),
compaction_level: INITIAL_COMPACTION_LEVEL,
@ -545,11 +561,13 @@ impl TestPartition {
.await
.unwrap();
Arc::new(TestParquetFile {
let parquet_file = ParquetFileWithMetadata::new(parquet_file, parquet_metadata_bin);
TestParquetFile {
catalog: Arc::clone(&self.catalog),
namespace: Arc::clone(&self.namespace),
parquet_file,
})
}
}
}
@ -601,12 +619,12 @@ async fn create_parquet_file(
pub struct TestParquetFile {
pub catalog: Arc<TestCatalog>,
pub namespace: Arc<TestNamespace>,
pub parquet_file: ParquetFile,
pub parquet_file: ParquetFileWithMetadata,
}
impl TestParquetFile {
/// Make the parquet file deletable
pub async fn flag_for_delete(self: &Arc<Self>) {
pub async fn flag_for_delete(&self) {
let mut repos = self.catalog.catalog.repositories().await;
repos
@ -615,6 +633,11 @@ impl TestParquetFile {
.await
.unwrap()
}
/// When only the ParquetFile is needed without the metadata, use this instead of the field
pub fn parquet_file_no_metadata(self) -> ParquetFile {
self.parquet_file.split_off_metadata().0
}
}
/// A catalog test tombstone
@ -627,7 +650,7 @@ pub struct TestTombstone {
impl TestTombstone {
/// mark the tombstone proccesed
pub async fn mark_processed(self: &Arc<Self>, parquet_file: &Arc<TestParquetFile>) {
pub async fn mark_processed(self: &Arc<Self>, parquet_file: &TestParquetFile) {
assert!(Arc::ptr_eq(&self.catalog, &parquet_file.catalog));
assert!(Arc::ptr_eq(&self.namespace, &parquet_file.namespace));

View File

@ -6,7 +6,7 @@ use data_types::{
partition_metadata::{Statistics, TableSummary},
timestamp::{TimestampMinMax, TimestampRange},
};
use data_types2::ParquetFile;
use data_types2::{ParquetFile, ParquetFileWithMetadata};
use datafusion::physical_plan::SendableRecordBatchStream;
use iox_object_store::{IoxObjectStore, ParquetFilePath};
use observability_deps::tracing::*;
@ -280,10 +280,9 @@ pub struct DecodedParquetFile {
}
impl DecodedParquetFile {
pub fn new(parquet_file: ParquetFile, parquet_metadata_bytes: Vec<u8>) -> Self {
let parquet_metadata = Arc::new(IoxParquetMetaData::from_thrift_bytes(
parquet_metadata_bytes,
));
pub fn new(parquet_file_with_metadata: ParquetFileWithMetadata) -> Self {
let (parquet_file, parquet_metadata) = parquet_file_with_metadata.split_off_metadata();
let parquet_metadata = Arc::new(IoxParquetMetaData::from_thrift_bytes(parquet_metadata));
let decoded_metadata = parquet_metadata.decode().expect("parquet metadata broken");
let iox_metadata = decoded_metadata
.read_iox_metadata_new()

View File

@ -3,8 +3,8 @@
use crate::cache::CatalogCache;
use arrow::record_batch::RecordBatch;
use data_types2::{
ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, ParquetFile, ParquetFileId, SequenceNumber,
SequencerId,
ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, ParquetFile, ParquetFileId,
ParquetFileWithMetadata, SequenceNumber, SequencerId,
};
use futures::StreamExt;
use iox_catalog::interface::Catalog;
@ -214,10 +214,9 @@ impl ParquetChunkAdapter {
/// Returns `None` if some data required to create this chunk is already gone from the catalog.
pub async fn new_querier_chunk(
&self,
parquet_file: ParquetFile,
parquet_metadata_bytes: Vec<u8>,
parquet_file_with_metadata: ParquetFileWithMetadata,
) -> Option<QuerierChunk> {
let decoded_parquet_file = DecodedParquetFile::new(parquet_file, parquet_metadata_bytes);
let decoded_parquet_file = DecodedParquetFile::new(parquet_file_with_metadata);
let chunk = Arc::new(self.new_parquet_chunk(&decoded_parquet_file).await?);
let addr = self
@ -341,11 +340,7 @@ pub mod tests {
.parquet_file;
// create chunk
let parquet_metadata = catalog.parquet_metadata(parquet_file.id).await;
let chunk = adapter
.new_querier_chunk(parquet_file, parquet_metadata)
.await
.unwrap();
let chunk = adapter.new_querier_chunk(parquet_file).await.unwrap();
// check chunk addr
assert_eq!(

View File

@ -132,10 +132,10 @@ impl QuerierTable {
// convert parquet files and tombstones to nicer objects
let mut chunks = Vec::with_capacity(parquet_files.len());
for (parquet_file, parquet_metadata) in parquet_files {
for parquet_file_with_metadata in parquet_files {
if let Some(chunk) = self
.chunk_adapter
.new_querier_chunk(parquet_file, parquet_metadata)
.new_querier_chunk(parquet_file_with_metadata)
.await
{
chunks.push(chunk);