Merge pull request #4262 from influxdata/cn/parquet-metadata-on-demand

feat: Store parquet_metadata in catalog, but don't fetch it by default

Branch: pull/24376/head
Commit e46387db5f
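The title states the whole design: the `parquet_metadata` blob stays in the catalog, but the default listing queries no longer return it, and callers fetch it per file only when they actually need to decode it. The sketch below is a minimal, self-contained illustration of that access pattern; the names mirror the ones this commit introduces (`ParquetFile`, `ParquetFileWithMetadata`, `parquet_metadata(id)`), but the in-memory store is a stand-in for illustration, not the real `iox_catalog` API.

use std::collections::HashMap;

/// Cheap catalog row (stands in for `ParquetFile`).
#[derive(Debug, Clone, Copy)]
struct FileRow {
    id: i64,
    file_size_bytes: i64,
}

/// Toy catalog that keeps the expensive blobs in a separate map.
struct ToyCatalog {
    rows: Vec<FileRow>,
    metadata: HashMap<i64, Vec<u8>>,
}

impl ToyCatalog {
    /// Default listing: never touches the blobs.
    fn list_files(&self) -> Vec<FileRow> {
        self.rows.clone()
    }

    /// On-demand fetch of one file's metadata (stands in for `parquet_metadata(id)`).
    fn parquet_metadata(&self, id: i64) -> Option<Vec<u8>> {
        self.metadata.get(&id).cloned()
    }
}

fn main() {
    let catalog = ToyCatalog {
        rows: vec![FileRow { id: 1, file_size_bytes: 1024 }],
        metadata: HashMap::from([(1, b"md1".to_vec())]),
    };

    // List cheaply, then pay for the blob only for the file being decoded.
    let files = catalog.list_files();
    assert_eq!(files[0].file_size_bytes, 1024);
    let blob = catalog.parquet_metadata(files[0].id).expect("known id");
    assert_eq!(blob, b"md1");
}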
@@ -10,8 +10,8 @@ use arrow::record_batch::RecordBatch;
use backoff::{Backoff, BackoffConfig};
use bytes::Bytes;
use data_types2::{
ParquetFile, ParquetFileId, PartitionId, SequencerId, TableId, TablePartition, Timestamp,
Tombstone, TombstoneId,
ParquetFile, ParquetFileId, ParquetFileWithMetadata, PartitionId, SequencerId, TableId,
TablePartition, Timestamp, Tombstone, TombstoneId,
};
use datafusion::error::DataFusionError;
use iox_catalog::interface::{Catalog, Transaction};
@@ -124,6 +124,11 @@ pub enum Error {
source: iox_catalog::interface::Error,
},

#[snafu(display("Error while requesting parquet file metadata {}", source))]
ParquetMetadata {
source: iox_catalog::interface::Error,
},

#[snafu(display("Error converting the parquet stream to bytes: {}", source))]
ConvertingToBytes {
source: parquet_file::storage::Error,
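The new `ParquetMetadata` variant follows the snafu pattern already used in this error enum: a fallible catalog call is wrapped with the generated context selector so the catalog error becomes the `source`. A minimal sketch of that pattern, assuming the `snafu` crate (0.7-style `...Snafu` selectors, as used elsewhere in this file); the function and file path are hypothetical.

use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Error while requesting parquet file metadata {}", source))]
    ParquetMetadata { source: std::io::Error },
}

// Hypothetical helper: any fallible call is converted into the enum
// variant via the generated `ParquetMetadataSnafu` context selector.
fn read_metadata_blob(path: &str) -> Result<Vec<u8>, Error> {
    std::fs::read(path).context(ParquetMetadataSnafu)
}

fn main() {
    // With a missing file this prints the snafu display message.
    if let Err(e) = read_metadata_blob("/no/such/file") {
        println!("{}", e);
    }
}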
@@ -371,7 +376,7 @@ impl Compactor {
.repositories()
.await
.parquet_files()
.list_by_partition_not_to_delete(partition_id)
.list_by_partition_not_to_delete_with_metadata(partition_id)
.await
.context(ListParquetFilesSnafu)?;
if parquet_files.is_empty() {
@@ -483,7 +488,7 @@ impl Compactor {
fn group_small_contiguous_groups(
mut file_groups: Vec<GroupWithMinTimeAndSize>,
compaction_max_size_bytes: i64,
) -> Vec<Vec<ParquetFile>> {
) -> Vec<Vec<ParquetFileWithMetadata>> {
let mut groups = Vec::with_capacity(file_groups.len());
if file_groups.is_empty() {
return groups;
@@ -550,7 +555,7 @@ impl Compactor {
return Ok(compacted);
}

// Keep the fist IoxMetadata to reuse same IDs and names
// Save the parquet metadata for the first file to reuse IDs and names
let iox_metadata = overlapped_files[0].iox_metadata();

// Collect all unique tombstone
@@ -819,7 +824,9 @@ impl Compactor {

/// Given a list of parquet files that come from the same Table Partition, group files together
/// if their (min_time, max_time) ranges overlap. Does not preserve or guarantee any ordering.
fn overlapped_groups(mut parquet_files: Vec<ParquetFile>) -> Vec<GroupWithMinTimeAndSize> {
fn overlapped_groups(
mut parquet_files: Vec<ParquetFileWithMetadata>,
) -> Vec<GroupWithMinTimeAndSize> {
let mut groups = Vec::with_capacity(parquet_files.len());

// While there are still files not in any group
@@ -951,7 +958,7 @@ impl Compactor {

async fn add_tombstones_to_groups(
&self,
groups: Vec<Vec<ParquetFile>>,
groups: Vec<Vec<ParquetFileWithMetadata>>,
) -> Result<Vec<GroupWithTombstones>> {
let mut repo = self.catalog.repositories().await;
let tombstone_repo = repo.tombstones();
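The doc comment on `overlapped_groups` describes the grouping rule: files from the same partition land in one group when their (min_time, max_time) ranges overlap. The following is a self-contained sketch of that interval-overlap grouping over plain tuples; it illustrates the rule stated in the comment, not the compactor's actual implementation.

/// Group (min, max) intervals whose ranges overlap, transitively: a file
/// joins a group if it overlaps any file already in it.
fn overlapped_groups(mut files: Vec<(i64, i64)>) -> Vec<Vec<(i64, i64)>> {
    let mut groups: Vec<Vec<(i64, i64)>> = Vec::new();
    while let Some(seed) = files.pop() {
        let mut group = vec![seed];
        // Repeatedly pull in any remaining file that overlaps the group,
        // until a pass adds nothing more.
        loop {
            let before = files.len();
            files.retain(|&(min, max)| {
                let overlaps = group.iter().any(|&(gmin, gmax)| min <= gmax && max >= gmin);
                if overlaps {
                    group.push((min, max));
                }
                !overlaps
            });
            if files.len() == before {
                break;
            }
        }
        groups.push(group);
    }
    groups
}

fn main() {
    // (1..=5) overlaps (4..=9); (20..=25) stands alone.
    let groups = overlapped_groups(vec![(1, 5), (4, 9), (20, 25)]);
    assert_eq!(groups.len(), 2);
}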
@ -1131,7 +1138,9 @@ mod tests {
|
|||
.await
|
||||
.unwrap();
|
||||
// should have 2 non-deleted level_0 files. The original file was marked deleted and not counted
|
||||
let files = catalog.list_by_table_not_to_delete(table.table.id).await;
|
||||
let mut files = catalog
|
||||
.list_by_table_not_to_delete_with_metadata(table.table.id)
|
||||
.await;
|
||||
assert_eq!(files.len(), 2);
|
||||
// 2 newly created level-1 files as the result of compaction
|
||||
assert_eq!((files[0].id.get(), files[0].compaction_level), (2, 1));
|
||||
|
@ -1158,8 +1167,10 @@ mod tests {
|
|||
catalog.time_provider(),
|
||||
);
|
||||
// create chunks for 2 files
|
||||
let chunk_0 = adapter.new_querier_chunk(files[0].clone()).await.unwrap();
|
||||
let chunk_1 = adapter.new_querier_chunk(files[1].clone()).await.unwrap();
|
||||
let files1 = files.pop().unwrap();
|
||||
let files0 = files.pop().unwrap();
|
||||
let chunk_0 = adapter.new_querier_chunk(files0).await.unwrap();
|
||||
let chunk_1 = adapter.new_querier_chunk(files1).await.unwrap();
|
||||
// query the chunks
|
||||
// least recent compacted first half (~90%)
|
||||
let batches = collect_read_filter(&chunk_0).await;
|
||||
|
@ -1245,7 +1256,7 @@ mod tests {
|
|||
|
||||
// parquet files
|
||||
// pf1 does not overlap with any and very large ==> will be upgraded to level 1 during compaction
|
||||
let _pf1 = partition
|
||||
partition
|
||||
.create_parquet_file_with_min_max_size_and_creation_time(
|
||||
&lp1,
|
||||
1,
|
||||
|
@ -1255,11 +1266,9 @@ mod tests {
|
|||
compactor.config.compaction_max_size_bytes() + 10,
|
||||
20,
|
||||
)
|
||||
.await
|
||||
.parquet_file
|
||||
.clone();
|
||||
.await;
|
||||
// pf2 overlaps with pf3 ==> compacted and marked to_delete with a timestamp
|
||||
let _pf2 = partition
|
||||
partition
|
||||
.create_parquet_file_with_min_max_and_creation_time(
|
||||
&lp2,
|
||||
4,
|
||||
|
@ -1268,11 +1277,9 @@ mod tests {
|
|||
20000,
|
||||
time.now().timestamp_nanos(),
|
||||
)
|
||||
.await
|
||||
.parquet_file
|
||||
.clone();
|
||||
.await;
|
||||
// pf3 overlaps with pf2 ==> compacted and marked to_delete with a timestamp
|
||||
let _pf3 = partition
|
||||
partition
|
||||
.create_parquet_file_with_min_max_and_creation_time(
|
||||
&lp3,
|
||||
8,
|
||||
|
@ -1281,11 +1288,9 @@ mod tests {
|
|||
25000,
|
||||
time.now().timestamp_nanos(),
|
||||
)
|
||||
.await
|
||||
.parquet_file
|
||||
.clone();
|
||||
.await;
|
||||
// pf4 does not overlap with any but small => will also be compacted with pf2 and pf3
|
||||
let _pf4 = partition
|
||||
partition
|
||||
.create_parquet_file_with_min_max_and_creation_time(
|
||||
&lp4,
|
||||
18,
|
||||
|
@ -1294,9 +1299,7 @@ mod tests {
|
|||
28000,
|
||||
time.now().timestamp_nanos(),
|
||||
)
|
||||
.await
|
||||
.parquet_file
|
||||
.clone();
|
||||
.await;
|
||||
// should have 4 level-0 files before compacting
|
||||
let count = catalog.count_level_0_files(sequencer.sequencer.id).await;
|
||||
assert_eq!(count, 4);
|
||||
|
@ -1339,7 +1342,9 @@ mod tests {
|
|||
.unwrap();
|
||||
|
||||
// Should have 3 non-soft-deleted files: pf1 not compacted and stay, and 2 newly created after compacting pf2, pf3, pf4
|
||||
let files = catalog.list_by_table_not_to_delete(table.table.id).await;
|
||||
let mut files = catalog
|
||||
.list_by_table_not_to_delete_with_metadata(table.table.id)
|
||||
.await;
|
||||
assert_eq!(files.len(), 3);
|
||||
// pf1 upgraded to level 1
|
||||
assert_eq!((files[0].id.get(), files[0].compaction_level), (1, 1));
|
||||
|
@ -1370,8 +1375,10 @@ mod tests {
|
|||
catalog.time_provider(),
|
||||
);
|
||||
// create chunks for 2 files
|
||||
let chunk_0 = adapter.new_querier_chunk(files[1].clone()).await.unwrap();
|
||||
let chunk_1 = adapter.new_querier_chunk(files[2].clone()).await.unwrap();
|
||||
let files2 = files.pop().unwrap();
|
||||
let files1 = files.pop().unwrap();
|
||||
let chunk_0 = adapter.new_querier_chunk(files1).await.unwrap();
|
||||
let chunk_1 = adapter.new_querier_chunk(files2).await.unwrap();
|
||||
// query the chunks
|
||||
// least recent compacted first half (~90%)
|
||||
let batches = collect_read_filter(&chunk_0).await;
|
||||
|
@ -1422,8 +1429,7 @@ mod tests {
|
|||
.await
|
||||
.create_parquet_file_with_min_max(&lp, 1, 1, 8000, 20000)
|
||||
.await
|
||||
.parquet_file
|
||||
.clone();
|
||||
.parquet_file;
|
||||
|
||||
let compactor = Compactor::new(
|
||||
vec![sequencer.sequencer.id],
|
||||
|
@ -1516,13 +1522,11 @@ mod tests {
|
|||
let parquet_file1 = partition
|
||||
.create_parquet_file_with_min_max(&lp1, 1, 5, 8000, 20000)
|
||||
.await
|
||||
.parquet_file
|
||||
.clone();
|
||||
.parquet_file;
|
||||
let parquet_file2 = partition
|
||||
.create_parquet_file_with_min_max(&lp2, 10, 15, 6000, 25000)
|
||||
.await
|
||||
.parquet_file
|
||||
.clone();
|
||||
.parquet_file;
|
||||
|
||||
let compactor = Compactor::new(
|
||||
vec![sequencer.sequencer.id],
|
||||
|
@ -1618,18 +1622,15 @@ mod tests {
|
|||
let parquet_file1 = partition
|
||||
.create_parquet_file_with_min_max(&lp1, 1, 5, 8000, 20000)
|
||||
.await
|
||||
.parquet_file
|
||||
.clone();
|
||||
.parquet_file;
|
||||
let parquet_file2 = partition
|
||||
.create_parquet_file_with_min_max(&lp2, 10, 15, 6000, 25000)
|
||||
.await
|
||||
.parquet_file
|
||||
.clone();
|
||||
.parquet_file;
|
||||
let parquet_file3 = partition
|
||||
.create_parquet_file_with_min_max(&lp3, 20, 25, 6000, 8000)
|
||||
.await
|
||||
.parquet_file
|
||||
.clone();
|
||||
.parquet_file;
|
||||
|
||||
let compactor = Compactor::new(
|
||||
vec![sequencer.sequencer.id],
|
||||
|
@@ -1702,7 +1703,7 @@ mod tests {

/// A test utility function to make minimally-viable ParquetFile records with particular
/// min/max times. Does not involve the catalog at all.
fn arbitrary_parquet_file(min_time: i64, max_time: i64) -> ParquetFile {
fn arbitrary_parquet_file(min_time: i64, max_time: i64) -> ParquetFileWithMetadata {
arbitrary_parquet_file_with_size(min_time, max_time, 100)
}

@ -1710,8 +1711,8 @@ mod tests {
|
|||
min_time: i64,
|
||||
max_time: i64,
|
||||
file_size_bytes: i64,
|
||||
) -> ParquetFile {
|
||||
ParquetFile {
|
||||
) -> ParquetFileWithMetadata {
|
||||
ParquetFileWithMetadata {
|
||||
id: ParquetFileId::new(0),
|
||||
sequencer_id: SequencerId::new(0),
|
||||
namespace_id: NamespaceId::new(0),
|
||||
|
@ -1759,13 +1760,11 @@ mod tests {
|
|||
let pf1 = partition
|
||||
.create_parquet_file_with_min_max(&lp1, 1, 5, 8000, 20000)
|
||||
.await
|
||||
.parquet_file
|
||||
.clone();
|
||||
.parquet_file;
|
||||
let pf2 = partition
|
||||
.create_parquet_file_with_min_max(&lp2, 1, 5, 28000, 35000)
|
||||
.await
|
||||
.parquet_file
|
||||
.clone();
|
||||
.parquet_file;
|
||||
|
||||
// Build 2 QueryableParquetChunks
|
||||
let pt1 = ParquetFileWithTombstone {
|
||||
|
@ -1776,6 +1775,7 @@ mod tests {
|
|||
data: Arc::new(pf2),
|
||||
tombstones: vec![],
|
||||
};
|
||||
|
||||
let pc1 = pt1.to_queryable_parquet_chunk(
|
||||
Arc::clone(&catalog.object_store),
|
||||
table.table.name.clone(),
|
||||
|
@ -2203,7 +2203,11 @@ mod tests {
|
|||
..p1.clone()
|
||||
};
|
||||
let pf1 = txn.parquet_files().create(p1).await.unwrap();
|
||||
let pf1_metadata = txn.parquet_files().parquet_metadata(pf1.id).await.unwrap();
|
||||
let pf1 = ParquetFileWithMetadata::new(pf1, pf1_metadata);
|
||||
let pf2 = txn.parquet_files().create(p2).await.unwrap();
|
||||
let pf2_metadata = txn.parquet_files().parquet_metadata(pf2.id).await.unwrap();
|
||||
let pf2 = ParquetFileWithMetadata::new(pf2, pf2_metadata);
|
||||
|
||||
let parquet_files = vec![pf1.clone(), pf2.clone()];
|
||||
let groups = vec![
|
||||
|
|
|
@@ -4,11 +4,11 @@ use std::sync::Arc;

use data_types2::{
tombstones_to_delete_predicates, ChunkAddr, ChunkId, ChunkOrder, DeletePredicate,
SequenceNumber, TableSummary, Tombstone,
SequenceNumber, TableSummary, Timestamp, Tombstone,
};
use datafusion::physical_plan::SendableRecordBatchStream;
use observability_deps::tracing::trace;
use parquet_file::{chunk::ParquetChunk, metadata::IoxMetadata};
use parquet_file::chunk::ParquetChunk;
use predicate::{Predicate, PredicateMatch};
use query::{
exec::{stringset::StringSet, IOxSessionContext},
@@ -39,9 +39,12 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Clone)]
pub struct QueryableParquetChunk {
data: Arc<ParquetChunk>, // data of the parquet file
iox_metadata: Arc<IoxMetadata>, // metadata of the parquet file
delete_predicates: Vec<Arc<DeletePredicate>>, // converted from tombstones
table_name: String, // needed to build query plan
min_sequence_number: SequenceNumber,
max_sequence_number: SequenceNumber,
min_time: Timestamp,
max_time: Timestamp,
}

impl QueryableParquetChunk {
@@ -49,15 +52,21 @@ impl QueryableParquetChunk {
pub fn new(
table_name: impl Into<String>,
data: Arc<ParquetChunk>,
iox_metadata: Arc<IoxMetadata>,
deletes: &[Tombstone],
min_sequence_number: SequenceNumber,
max_sequence_number: SequenceNumber,
min_time: Timestamp,
max_time: Timestamp,
) -> Self {
let delete_predicates = tombstones_to_delete_predicates(deletes);
Self {
data,
iox_metadata,
delete_predicates,
table_name: table_name.into(),
min_sequence_number,
max_sequence_number,
min_time,
max_time,
}
}

@@ -72,22 +81,22 @@ impl QueryableParquetChunk {

/// Return min sequence number
pub fn min_sequence_number(&self) -> SequenceNumber {
self.iox_metadata.min_sequence_number
self.min_sequence_number
}

/// Return max sequence number
pub fn max_sequence_number(&self) -> SequenceNumber {
self.iox_metadata.max_sequence_number
self.max_sequence_number
}

/// Return min time
pub fn min_time(&self) -> i64 {
self.iox_metadata.time_of_first_write.timestamp_nanos()
self.min_time.get()
}

/// Return max time
pub fn max_time(&self) -> i64 {
self.iox_metadata.time_of_last_write.timestamp_nanos()
self.max_time.get()
}
}

@@ -121,7 +130,7 @@ impl QueryChunk for QueryableParquetChunk {
// Note: parquet_file's id is an uuid which is also the datatype of the ChunkId. However,
// it is not safe to use it for sorting chunk
fn id(&self) -> ChunkId {
let timestamp_nano = self.iox_metadata.time_of_first_write.timestamp_nanos();
let timestamp_nano = self.min_time.get();
let timestamp_nano_u128 =
u128::try_from(timestamp_nano).expect("Cannot convert timestamp nano to u128 ");

@@ -221,7 +230,7 @@ impl QueryChunk for QueryableParquetChunk {

// Order of the chunk so they can be deduplicate correctly
fn order(&self) -> ChunkOrder {
let seq_num = self.iox_metadata.min_sequence_number.get();
let seq_num = self.min_sequence_number.get();
let seq_num = u32::try_from(seq_num)
.expect("Sequence number should have been converted to chunk order successfully");
ChunkOrder::new(seq_num)

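The change above moves `QueryableParquetChunk` off the decoded `IoxMetadata` and onto plain catalog columns: the constructor now takes the sequence-number and time bounds directly, and `order()`/`id()` read those stored fields. A self-contained sketch of that shape, with toy integer newtypes standing in for `SequenceNumber`, `Timestamp`, and `ChunkOrder`:

#[derive(Debug, Clone, Copy)]
struct SequenceNumber(i64);
#[derive(Debug, Clone, Copy)]
struct Timestamp(i64);
#[derive(Debug, Clone, Copy, PartialEq)]
struct ChunkOrder(u32);

/// Toy chunk: the bounds are stored directly instead of being re-read
/// from decoded parquet metadata on every accessor call.
struct ToyChunk {
    min_sequence_number: SequenceNumber,
    max_sequence_number: SequenceNumber,
    min_time: Timestamp,
    max_time: Timestamp,
}

impl ToyChunk {
    fn min_time(&self) -> i64 {
        self.min_time.0
    }

    /// Order is derived from the minimum sequence number, mirroring the
    /// `u32::try_from` conversion in the diff.
    fn order(&self) -> ChunkOrder {
        let seq = u32::try_from(self.min_sequence_number.0)
            .expect("sequence number should fit into a chunk order");
        ChunkOrder(seq)
    }
}

fn main() {
    let chunk = ToyChunk {
        min_sequence_number: SequenceNumber(1),
        max_sequence_number: SequenceNumber(10),
        min_time: Timestamp(8_000),
        max_time: Timestamp(20_000),
    };
    assert_eq!(chunk.order(), ChunkOrder(1));
    assert_eq!(chunk.min_time(), 8_000);
    assert!(chunk.max_sequence_number.0 >= chunk.min_sequence_number.0);
    assert!(chunk.max_time.0 >= chunk.min_time());
}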
@@ -3,7 +3,7 @@
use crate::query::QueryableParquetChunk;
use arrow::record_batch::RecordBatch;
use data_types2::{
ParquetFile, ParquetFileId, ParquetFileParams, Timestamp, Tombstone, TombstoneId,
ParquetFileId, ParquetFileParams, ParquetFileWithMetadata, Timestamp, Tombstone, TombstoneId,
};
use iox_object_store::IoxObjectStore;
use object_store::DynObjectStore;
@@ -37,8 +37,8 @@ impl GroupWithTombstones {
/// Wrapper of group of parquet files with their min time and total size
#[derive(Debug, Clone, PartialEq)]
pub struct GroupWithMinTimeAndSize {
/// Parquet files
pub(crate) parquet_files: Vec<ParquetFile>,
/// Parquet files and their metadata
pub(crate) parquet_files: Vec<ParquetFileWithMetadata>,

/// min time of all parquet_files
pub(crate) min_time: Timestamp,
@@ -51,7 +51,7 @@ pub struct GroupWithMinTimeAndSize {
#[allow(missing_docs)]
#[derive(Debug, Clone)]
pub struct ParquetFileWithTombstone {
pub(crate) data: Arc<ParquetFile>,
pub(crate) data: Arc<ParquetFileWithMetadata>,
pub(crate) tombstones: Vec<Tombstone>,
}

@@ -112,8 +112,11 @@ impl ParquetFileWithTombstone {
QueryableParquetChunk::new(
table_name,
Arc::new(parquet_chunk),
Arc::new(decoded_parquet_file.iox_metadata),
&self.tombstones,
self.data.min_sequence_number,
self.data.max_sequence_number,
self.data.min_time,
self.data.max_time,
)
}

@@ -720,8 +720,44 @@ pub fn tombstones_to_delete_predicates_iter(
}

/// Data for a parquet file reference that has been inserted in the catalog.
#[derive(Debug, Clone, PartialEq, sqlx::FromRow)]
#[derive(Debug, Clone, Copy, PartialEq, sqlx::FromRow)]
pub struct ParquetFile {
/// the id of the file in the catalog
pub id: ParquetFileId,
/// the sequencer that sequenced writes that went into this file
pub sequencer_id: SequencerId,
/// the namespace
pub namespace_id: NamespaceId,
/// the table
pub table_id: TableId,
/// the partition
pub partition_id: PartitionId,
/// the uuid used in the object store path for this file
pub object_store_id: Uuid,
/// the minimum sequence number from a record in this file
pub min_sequence_number: SequenceNumber,
/// the maximum sequence number from a record in this file
pub max_sequence_number: SequenceNumber,
/// the min timestamp of data in this file
pub min_time: Timestamp,
/// the max timestamp of data in this file
pub max_time: Timestamp,
/// When this file was marked for deletion
pub to_delete: Option<Timestamp>,
/// file size in bytes
pub file_size_bytes: i64,
/// the number of rows of data in this file
pub row_count: i64,
/// the compaction level of the file
pub compaction_level: i16,
/// the creation time of the parquet file
pub created_at: Timestamp,
}

/// Data for a parquet file reference that has been inserted in the catalog, including the
/// `parquet_metadata` field that can be expensive to fetch.
#[derive(Debug, Clone, PartialEq, sqlx::FromRow)]
pub struct ParquetFileWithMetadata {
/// the id of the file in the catalog
pub id: ParquetFileId,
/// the sequencer that sequenced writes that went into this file
@@ -756,6 +792,93 @@ pub struct ParquetFile {
pub created_at: Timestamp,
}

impl ParquetFileWithMetadata {
/// Create an instance from an instance of ParquetFile and metadata bytes fetched from the
/// catalog.
pub fn new(parquet_file: ParquetFile, parquet_metadata: Vec<u8>) -> Self {
let ParquetFile {
id,
sequencer_id,
namespace_id,
table_id,
partition_id,
object_store_id,
min_sequence_number,
max_sequence_number,
min_time,
max_time,
to_delete,
file_size_bytes,
row_count,
compaction_level,
created_at,
} = parquet_file;

Self {
id,
sequencer_id,
namespace_id,
table_id,
partition_id,
object_store_id,
min_sequence_number,
max_sequence_number,
min_time,
max_time,
to_delete,
file_size_bytes,
parquet_metadata,
row_count,
compaction_level,
created_at,
}
}

/// Split the parquet_metadata off, leaving a regular ParquetFile and the bytes to transfer
/// ownership separately.
pub fn split_off_metadata(self) -> (ParquetFile, Vec<u8>) {
let Self {
id,
sequencer_id,
namespace_id,
table_id,
partition_id,
object_store_id,
min_sequence_number,
max_sequence_number,
min_time,
max_time,
to_delete,
file_size_bytes,
parquet_metadata,
row_count,
compaction_level,
created_at,
} = self;

(
ParquetFile {
id,
sequencer_id,
namespace_id,
table_id,
partition_id,
object_store_id,
min_sequence_number,
max_sequence_number,
min_time,
max_time,
to_delete,
file_size_bytes,
row_count,
compaction_level,
created_at,
},
parquet_metadata,
)
}
}

/// Data for a parquet file to be inserted into the catalog.
#[derive(Debug, Clone, PartialEq)]
pub struct ParquetFileParams {

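The two helpers above are written as full destructure-and-rebuild moves, so the `Vec<u8>` blob changes hands without cloning and the compiler flags any field added to one struct but not the other. A small self-contained sketch of the same pattern with a two-field toy pair (the real `ParquetFile` has many more columns):

/// Toy analogue of `ParquetFile`: the cheap catalog row.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Row {
    id: i64,
    file_size_bytes: i64,
}

/// Toy analogue of `ParquetFileWithMetadata`: the row plus the blob.
#[derive(Debug, Clone, PartialEq)]
struct RowWithMetadata {
    id: i64,
    file_size_bytes: i64,
    parquet_metadata: Vec<u8>,
}

impl RowWithMetadata {
    /// Combine a row with separately fetched metadata bytes.
    fn new(row: Row, parquet_metadata: Vec<u8>) -> Self {
        let Row { id, file_size_bytes } = row;
        Self { id, file_size_bytes, parquet_metadata }
    }

    /// Split the blob back off; the `Vec<u8>` is moved, not cloned.
    fn split_off_metadata(self) -> (Row, Vec<u8>) {
        let Self { id, file_size_bytes, parquet_metadata } = self;
        (Row { id, file_size_bytes }, parquet_metadata)
    }
}

fn main() {
    let with_md = RowWithMetadata::new(Row { id: 1, file_size_bytes: 1024 }, b"md1".to_vec());
    let (row, md) = with_md.split_off_metadata();
    assert_eq!(row.id, 1);
    assert_eq!(md, b"md1");
}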
@@ -3,10 +3,10 @@
use async_trait::async_trait;
use data_types2::{
Column, ColumnSchema, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace,
NamespaceId, NamespaceSchema, ParquetFile, ParquetFileId, ParquetFileParams, Partition,
PartitionId, PartitionInfo, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber,
Sequencer, SequencerId, Table, TableId, TablePartition, TableSchema, Timestamp, Tombstone,
TombstoneId,
NamespaceId, NamespaceSchema, ParquetFile, ParquetFileId, ParquetFileParams,
ParquetFileWithMetadata, Partition, PartitionId, PartitionInfo, ProcessedTombstone, QueryPool,
QueryPoolId, SequenceNumber, Sequencer, SequencerId, Table, TableId, TablePartition,
TableSchema, Timestamp, Tombstone, TombstoneId,
};
use snafu::{OptionExt, Snafu};
use std::{collections::BTreeMap, convert::TryFrom, fmt::Debug, sync::Arc};
@@ -525,6 +525,13 @@ pub trait ParquetFileRepo: Send + Sync {
/// [`to_delete`](ParquetFile::to_delete).
async fn list_by_table_not_to_delete(&mut self, table_id: TableId) -> Result<Vec<ParquetFile>>;

/// List all parquet files and their metadata within a given table that are NOT marked as
/// [`to_delete`](ParquetFile::to_delete). Fetching metadata can be expensive.
async fn list_by_table_not_to_delete_with_metadata(
&mut self,
table_id: TableId,
) -> Result<Vec<ParquetFileWithMetadata>>;

/// Delete all parquet files that were marked to be deleted earlier than the specified time.
/// Returns the deleted records.
async fn delete_old(&mut self, older_than: Timestamp) -> Result<Vec<ParquetFile>>;
@@ -543,12 +550,20 @@ pub trait ParquetFileRepo: Send + Sync {
max_time: Timestamp,
) -> Result<Vec<ParquetFile>>;

/// List parquet files for a given partition that are NOT marked as [`to_delete`](ParquetFile::to_delete).
/// List parquet files for a given partition that are NOT marked as
/// [`to_delete`](ParquetFile::to_delete).
async fn list_by_partition_not_to_delete(
&mut self,
partition_id: PartitionId,
) -> Result<Vec<ParquetFile>>;

/// List parquet files and their metadata for a given partition that are NOT marked as
/// [`to_delete`](ParquetFile::to_delete). Fetching metadata can be expensive.
async fn list_by_partition_not_to_delete_with_metadata(
&mut self,
partition_id: PartitionId,
) -> Result<Vec<ParquetFileWithMetadata>>;

/// Update the compaction level of the specified parquet files to level 1. Returns the IDs
/// of the files that were successfully updated.
async fn update_to_level_1(
@@ -559,6 +574,9 @@ pub trait ParquetFileRepo: Send + Sync {
/// Verify if the parquet file exists by selecting its id
async fn exist(&mut self, id: ParquetFileId) -> Result<bool>;

/// Fetch the parquet_metadata bytes for the given id. Potentially expensive.
async fn parquet_metadata(&mut self, id: ParquetFileId) -> Result<Vec<u8>>;

/// Return count
async fn count(&mut self) -> Result<i64>;

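Put together, the repo trait now exposes three ways at the data: a cheap listing of `ParquetFile` rows, a `_with_metadata` listing that also carries the blob, and a per-id `parquet_metadata` fetch. The sketch below is a self-contained, synchronous caricature of that surface with an in-memory side map for the blobs, in the spirit of the `MemTxn` implementation later in this diff; it is illustrative only, not the `iox_catalog` trait itself.

use std::collections::BTreeMap;

#[derive(Debug, Clone, Copy, PartialEq)]
struct FileRow {
    id: i64,
    table_id: i64,
    to_delete: bool,
}

#[derive(Debug, Clone, PartialEq)]
struct FileRowWithMetadata {
    row: FileRow,
    parquet_metadata: Vec<u8>,
}

/// Caricature of `ParquetFileRepo`: synchronous, infallible, in-memory.
trait FileRepo {
    fn list_by_table_not_to_delete(&self, table_id: i64) -> Vec<FileRow>;
    fn list_by_table_not_to_delete_with_metadata(&self, table_id: i64) -> Vec<FileRowWithMetadata>;
    fn parquet_metadata(&self, id: i64) -> Option<Vec<u8>>;
}

/// In-memory store: rows and blobs live in separate collections, so the
/// cheap listing never touches the blobs.
struct MemRepo {
    rows: Vec<FileRow>,
    metadata: BTreeMap<i64, Vec<u8>>,
}

impl FileRepo for MemRepo {
    fn list_by_table_not_to_delete(&self, table_id: i64) -> Vec<FileRow> {
        self.rows
            .iter()
            .copied()
            .filter(|f| f.table_id == table_id && !f.to_delete)
            .collect()
    }

    fn list_by_table_not_to_delete_with_metadata(&self, table_id: i64) -> Vec<FileRowWithMetadata> {
        self.list_by_table_not_to_delete(table_id)
            .into_iter()
            .map(|row| FileRowWithMetadata {
                parquet_metadata: self.metadata.get(&row.id).cloned().unwrap_or_default(),
                row,
            })
            .collect()
    }

    fn parquet_metadata(&self, id: i64) -> Option<Vec<u8>> {
        self.metadata.get(&id).cloned()
    }
}

fn main() {
    let repo = MemRepo {
        rows: vec![FileRow { id: 1, table_id: 7, to_delete: false }],
        metadata: BTreeMap::from([(1, b"md1".to_vec())]),
    };
    assert_eq!(repo.list_by_table_not_to_delete(7).len(), 1);
    assert_eq!(repo.parquet_metadata(1), Some(b"md1".to_vec()));
    assert_eq!(
        repo.list_by_table_not_to_delete_with_metadata(7)[0].parquet_metadata,
        b"md1"
    );
}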
@ -1738,6 +1756,13 @@ pub(crate) mod test_helpers {
|
|||
.await
|
||||
.unwrap();
|
||||
|
||||
let metadata = repos
|
||||
.parquet_files()
|
||||
.parquet_metadata(parquet_file.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(metadata, b"md1".to_vec());
|
||||
|
||||
// verify that trying to create a file with the same UUID throws an error
|
||||
let err = repos
|
||||
.parquet_files()
|
||||
|
@ -1770,13 +1795,13 @@ pub(crate) mod test_helpers {
|
|||
.list_by_sequencer_greater_than(sequencer.id, SequenceNumber::new(1))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(vec![parquet_file.clone(), other_file.clone()], files);
|
||||
assert_eq!(vec![parquet_file, other_file], files);
|
||||
let files = repos
|
||||
.parquet_files()
|
||||
.list_by_sequencer_greater_than(sequencer.id, SequenceNumber::new(150))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(vec![other_file.clone()], files);
|
||||
assert_eq!(vec![other_file], files);
|
||||
|
||||
// verify that to_delete is initially set to null and the file does not get deleted
|
||||
assert!(parquet_file.to_delete.is_none());
|
||||
|
@ -1833,6 +1858,23 @@ pub(crate) mod test_helpers {
|
|||
.unwrap();
|
||||
assert_eq!(files, vec![other_file]);
|
||||
|
||||
// test list_by_table_not_to_delete_with_metadata
|
||||
let files = repos
|
||||
.parquet_files()
|
||||
.list_by_table_not_to_delete_with_metadata(table.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(files, vec![]);
|
||||
let files = repos
|
||||
.parquet_files()
|
||||
.list_by_table_not_to_delete_with_metadata(other_table.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
files,
|
||||
vec![ParquetFileWithMetadata::new(other_file, b"md1".to_vec())]
|
||||
);
|
||||
|
||||
// test list_by_namespace_not_to_delete
|
||||
let namespace2 = repos
|
||||
.namespaces()
|
||||
|
@ -1890,7 +1932,7 @@ pub(crate) mod test_helpers {
|
|||
.list_by_namespace_not_to_delete(namespace2.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(vec![f1.clone(), f2.clone()], files);
|
||||
assert_eq!(vec![f1, f2], files);
|
||||
|
||||
let f3_params = ParquetFileParams {
|
||||
object_store_id: Uuid::new_v4(),
|
||||
|
@ -1906,7 +1948,7 @@ pub(crate) mod test_helpers {
|
|||
.list_by_namespace_not_to_delete(namespace2.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(vec![f1.clone(), f2.clone(), f3.clone()], files);
|
||||
assert_eq!(vec![f1, f2, f3], files);
|
||||
|
||||
repos.parquet_files().flag_for_delete(f2.id).await.unwrap();
|
||||
let files = repos
|
||||
|
@ -1914,7 +1956,7 @@ pub(crate) mod test_helpers {
|
|||
.list_by_namespace_not_to_delete(namespace2.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(vec![f1.clone(), f3.clone()], files);
|
||||
assert_eq!(vec![f1, f3], files);
|
||||
|
||||
let files = repos
|
||||
.parquet_files()
|
||||
|
@ -2449,6 +2491,19 @@ pub(crate) mod test_helpers {
|
|||
.await
|
||||
.unwrap();
|
||||
assert_eq!(files, vec![parquet_file, level1_file]);
|
||||
|
||||
let files = repos
|
||||
.parquet_files()
|
||||
.list_by_partition_not_to_delete_with_metadata(partition.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
files,
|
||||
vec![
|
||||
ParquetFileWithMetadata::new(parquet_file, b"md1".to_vec()),
|
||||
ParquetFileWithMetadata::new(level1_file, b"md1".to_vec()),
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
async fn test_update_to_compaction_level_1(catalog: Arc<dyn Catalog>) {
|
||||
|
@ -2519,7 +2574,7 @@ pub(crate) mod test_helpers {
|
|||
let nonexistent_parquet_file_id = ParquetFileId::new(level_0_file.id.get() + 1);
|
||||
|
||||
// Level 0 parquet files should contain both existing files at this point
|
||||
let expected = vec![parquet_file.clone(), level_0_file.clone()];
|
||||
let expected = vec![parquet_file, level_0_file];
|
||||
let level_0 = repos.parquet_files().level_0(sequencer.id).await.unwrap();
|
||||
let mut level_0_ids: Vec<_> = level_0.iter().map(|pf| pf.id).collect();
|
||||
level_0_ids.sort();
|
||||
|
|
|
@ -13,14 +13,17 @@ use crate::{
|
|||
use async_trait::async_trait;
|
||||
use data_types2::{
|
||||
Column, ColumnId, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace, NamespaceId,
|
||||
ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionInfo,
|
||||
ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer, SequencerId, Table,
|
||||
TableId, TablePartition, Timestamp, Tombstone, TombstoneId,
|
||||
ParquetFile, ParquetFileId, ParquetFileParams, ParquetFileWithMetadata, Partition, PartitionId,
|
||||
PartitionInfo, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer,
|
||||
SequencerId, Table, TableId, TablePartition, Timestamp, Tombstone, TombstoneId,
|
||||
};
|
||||
use observability_deps::tracing::warn;
|
||||
use std::fmt::Formatter;
|
||||
use std::sync::Arc;
|
||||
use std::{collections::HashSet, convert::TryFrom};
|
||||
use std::{
|
||||
collections::{BTreeMap, HashSet},
|
||||
convert::TryFrom,
|
||||
fmt::Formatter,
|
||||
sync::Arc,
|
||||
};
|
||||
use time::{SystemProvider, TimeProvider};
|
||||
use tokio::sync::{Mutex, OwnedMutexGuard};
|
||||
|
||||
|
@ -60,6 +63,7 @@ struct MemCollections {
|
|||
partitions: Vec<Partition>,
|
||||
tombstones: Vec<Tombstone>,
|
||||
parquet_files: Vec<ParquetFile>,
|
||||
parquet_file_metadata: BTreeMap<ParquetFileId, Vec<u8>>,
|
||||
processed_tombstones: Vec<ProcessedTombstone>,
|
||||
}
|
||||
|
||||
|
@ -961,12 +965,16 @@ impl ParquetFileRepo for MemTxn {
|
|||
row_count,
|
||||
to_delete: None,
|
||||
file_size_bytes,
|
||||
parquet_metadata,
|
||||
compaction_level,
|
||||
created_at,
|
||||
};
|
||||
|
||||
stage
|
||||
.parquet_file_metadata
|
||||
.insert(parquet_file.id, parquet_metadata);
|
||||
|
||||
stage.parquet_files.push(parquet_file);
|
||||
Ok(stage.parquet_files.last().unwrap().clone())
|
||||
Ok(*stage.parquet_files.last().unwrap())
|
||||
}
|
||||
|
||||
async fn flag_for_delete(&mut self, id: ParquetFileId) -> Result<()> {
|
||||
|
@ -1029,6 +1037,31 @@ impl ParquetFileRepo for MemTxn {
|
|||
Ok(parquet_files)
|
||||
}
|
||||
|
||||
async fn list_by_table_not_to_delete_with_metadata(
|
||||
&mut self,
|
||||
table_id: TableId,
|
||||
) -> Result<Vec<ParquetFileWithMetadata>> {
|
||||
let stage = self.stage();
|
||||
|
||||
let parquet_files: Vec<_> = stage
|
||||
.parquet_files
|
||||
.iter()
|
||||
.filter(|f| table_id == f.table_id && f.to_delete.is_none())
|
||||
.cloned()
|
||||
.map(|f| {
|
||||
ParquetFileWithMetadata::new(
|
||||
f,
|
||||
stage
|
||||
.parquet_file_metadata
|
||||
.get(&f.id)
|
||||
.cloned()
|
||||
.unwrap_or_default(),
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
Ok(parquet_files)
|
||||
}
|
||||
|
||||
async fn delete_old(&mut self, older_than: Timestamp) -> Result<Vec<ParquetFile>> {
|
||||
let stage = self.stage();
|
||||
|
||||
|
@ -1038,6 +1071,10 @@ impl ParquetFileRepo for MemTxn {
|
|||
|
||||
stage.parquet_files = keep;
|
||||
|
||||
for delete in &delete {
|
||||
stage.parquet_file_metadata.remove(&delete.id);
|
||||
}
|
||||
|
||||
Ok(delete)
|
||||
}
|
||||
|
||||
|
@ -1092,6 +1129,30 @@ impl ParquetFileRepo for MemTxn {
|
|||
.collect())
|
||||
}
|
||||
|
||||
async fn list_by_partition_not_to_delete_with_metadata(
|
||||
&mut self,
|
||||
partition_id: PartitionId,
|
||||
) -> Result<Vec<ParquetFileWithMetadata>> {
|
||||
let stage = self.stage();
|
||||
|
||||
Ok(stage
|
||||
.parquet_files
|
||||
.iter()
|
||||
.filter(|f| f.partition_id == partition_id && f.to_delete.is_none())
|
||||
.cloned()
|
||||
.map(|f| {
|
||||
ParquetFileWithMetadata::new(
|
||||
f,
|
||||
stage
|
||||
.parquet_file_metadata
|
||||
.get(&f.id)
|
||||
.cloned()
|
||||
.unwrap_or_default(),
|
||||
)
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
|
||||
async fn update_to_level_1(
|
||||
&mut self,
|
||||
parquet_file_ids: &[ParquetFileId],
|
||||
|
@ -1118,6 +1179,16 @@ impl ParquetFileRepo for MemTxn {
|
|||
Ok(stage.parquet_files.iter().any(|f| f.id == id))
|
||||
}
|
||||
|
||||
async fn parquet_metadata(&mut self, id: ParquetFileId) -> Result<Vec<u8>> {
|
||||
let stage = self.stage();
|
||||
|
||||
stage
|
||||
.parquet_file_metadata
|
||||
.get(&id)
|
||||
.cloned()
|
||||
.ok_or(Error::ParquetRecordNotFound { id })
|
||||
}
|
||||
|
||||
async fn count(&mut self) -> Result<i64> {
|
||||
let stage = self.stage();
|
||||
|
||||
|
|
|
@ -8,9 +8,9 @@ use crate::interface::{
|
|||
use async_trait::async_trait;
|
||||
use data_types2::{
|
||||
Column, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace, NamespaceId,
|
||||
ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionInfo,
|
||||
ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer, SequencerId, Table,
|
||||
TableId, TablePartition, Timestamp, Tombstone, TombstoneId,
|
||||
ParquetFile, ParquetFileId, ParquetFileParams, ParquetFileWithMetadata, Partition, PartitionId,
|
||||
PartitionInfo, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer,
|
||||
SequencerId, Table, TableId, TablePartition, Timestamp, Tombstone, TombstoneId,
|
||||
};
|
||||
use metric::{Metric, U64Histogram, U64HistogramOptions};
|
||||
use std::{fmt::Debug, sync::Arc};
|
||||
|
@ -269,12 +269,15 @@ decorate!(
|
|||
"parquet_list_by_sequencer_greater_than" = list_by_sequencer_greater_than(&mut self, sequencer_id: SequencerId, sequence_number: SequenceNumber) -> Result<Vec<ParquetFile>>;
|
||||
"parquet_list_by_namespace_not_to_delete" = list_by_namespace_not_to_delete(&mut self, namespace_id: NamespaceId) -> Result<Vec<ParquetFile>>;
|
||||
"parquet_list_by_table_not_to_delete" = list_by_table_not_to_delete(&mut self, table_id: TableId) -> Result<Vec<ParquetFile>>;
|
||||
"parquet_list_by_table_not_to_delete_with_metadata" = list_by_table_not_to_delete_with_metadata(&mut self, table_id: TableId) -> Result<Vec<ParquetFileWithMetadata>>;
|
||||
"parquet_delete_old" = delete_old(&mut self, older_than: Timestamp) -> Result<Vec<ParquetFile>>;
|
||||
"parquet_list_by_partition_not_to_delete" = list_by_partition_not_to_delete(&mut self, partition_id: PartitionId) -> Result<Vec<ParquetFile>>;
|
||||
"parquet_list_by_partition_not_to_delete_with_metadata" = list_by_partition_not_to_delete_with_metadata(&mut self, partition_id: PartitionId) -> Result<Vec<ParquetFileWithMetadata>>;
|
||||
"parquet_level_0" = level_0(&mut self, sequencer_id: SequencerId) -> Result<Vec<ParquetFile>>;
|
||||
"parquet_level_1" = level_1(&mut self, table_partition: TablePartition, min_time: Timestamp, max_time: Timestamp) -> Result<Vec<ParquetFile>>;
|
||||
"parquet_update_to_level_1" = update_to_level_1(&mut self, parquet_file_ids: &[ParquetFileId]) -> Result<Vec<ParquetFileId>>;
|
||||
"parquet_exist" = exist(&mut self, id: ParquetFileId) -> Result<bool>;
|
||||
"parquet_metadata" = parquet_metadata(&mut self, id: ParquetFileId) -> Result<Vec<u8>>;
|
||||
"parquet_count" = count(&mut self) -> Result<i64>;
|
||||
"parquet_count_by_overlaps" = count_by_overlaps(&mut self, table_id: TableId, sequencer_id: SequencerId, min_time: Timestamp, max_time: Timestamp, sequence_number: SequenceNumber) -> Result<i64>;
|
||||
]
|
||||
|
|
|
@ -12,9 +12,9 @@ use crate::{
|
|||
use async_trait::async_trait;
|
||||
use data_types2::{
|
||||
Column, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace, NamespaceId,
|
||||
ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionInfo,
|
||||
ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer, SequencerId, Table,
|
||||
TableId, TablePartition, Timestamp, Tombstone, TombstoneId,
|
||||
ParquetFile, ParquetFileId, ParquetFileParams, ParquetFileWithMetadata, Partition, PartitionId,
|
||||
PartitionInfo, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer,
|
||||
SequencerId, Table, TableId, TablePartition, Timestamp, Tombstone, TombstoneId,
|
||||
};
|
||||
use observability_deps::tracing::{info, warn};
|
||||
use sqlx::{migrate::Migrator, postgres::PgPoolOptions, Acquire, Executor, Postgres, Row};
|
||||
|
@ -1502,9 +1502,13 @@ RETURNING *;
|
|||
sequencer_id: SequencerId,
|
||||
sequence_number: SequenceNumber,
|
||||
) -> Result<Vec<ParquetFile>> {
|
||||
// Deliberately doesn't use `SELECT *` to avoid the performance hit of fetching the large
|
||||
// `parquet_metadata` column!!
|
||||
sqlx::query_as::<_, ParquetFile>(
|
||||
r#"
|
||||
SELECT *
|
||||
SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id,
|
||||
min_sequence_number, max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
|
||||
row_count, compaction_level, created_at
|
||||
FROM parquet_file
|
||||
WHERE sequencer_id = $1
|
||||
AND max_sequence_number > $2
|
||||
|
@ -1522,9 +1526,15 @@ ORDER BY id;
|
|||
&mut self,
|
||||
namespace_id: NamespaceId,
|
||||
) -> Result<Vec<ParquetFile>> {
|
||||
// Deliberately doesn't use `SELECT *` to avoid the performance hit of fetching the large
|
||||
// `parquet_metadata` column!!
|
||||
sqlx::query_as::<_, ParquetFile>(
|
||||
r#"
|
||||
SELECT parquet_file.*
|
||||
SELECT parquet_file.id, parquet_file.sequencer_id, parquet_file.namespace_id,
|
||||
parquet_file.table_id, parquet_file.partition_id, parquet_file.object_store_id,
|
||||
parquet_file.min_sequence_number, parquet_file.max_sequence_number, parquet_file.min_time,
|
||||
parquet_file.max_time, parquet_file.to_delete, parquet_file.file_size_bytes,
|
||||
parquet_file.row_count, parquet_file.compaction_level, parquet_file.created_at
|
||||
FROM parquet_file
|
||||
INNER JOIN table_name on table_name.id = parquet_file.table_id
|
||||
WHERE table_name.namespace_id = $1
|
||||
|
@ -1538,8 +1548,29 @@ WHERE table_name.namespace_id = $1
|
|||
}
|
||||
|
||||
async fn list_by_table_not_to_delete(&mut self, table_id: TableId) -> Result<Vec<ParquetFile>> {
|
||||
// Deliberately doesn't use `SELECT *` to avoid the performance hit of fetching the large
|
||||
// `parquet_metadata` column!!
|
||||
sqlx::query_as::<_, ParquetFile>(
|
||||
r#"
|
||||
SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id,
|
||||
min_sequence_number, max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
|
||||
row_count, compaction_level, created_at
|
||||
FROM parquet_file
|
||||
WHERE table_id = $1 AND to_delete IS NULL;
|
||||
"#,
|
||||
)
|
||||
.bind(&table_id) // $1
|
||||
.fetch_all(&mut self.inner)
|
||||
.await
|
||||
.map_err(|e| Error::SqlxError { source: e })
|
||||
}
|
||||
|
||||
async fn list_by_table_not_to_delete_with_metadata(
|
||||
&mut self,
|
||||
table_id: TableId,
|
||||
) -> Result<Vec<ParquetFileWithMetadata>> {
|
||||
sqlx::query_as::<_, ParquetFileWithMetadata>(
|
||||
r#"
|
||||
SELECT *
|
||||
FROM parquet_file
|
||||
WHERE table_id = $1 AND to_delete IS NULL;
|
||||
|
@ -1568,10 +1599,14 @@ RETURNING *;
|
|||
async fn level_0(&mut self, sequencer_id: SequencerId) -> Result<Vec<ParquetFile>> {
|
||||
// this intentionally limits the returned files to 10,000 as it is used to make
|
||||
// a decision on the highest priority partitions. If compaction has never been
|
||||
// run this could end up returning millions of results and taking too long to run
|
||||
// run this could end up returning millions of results and taking too long to run.
|
||||
// Deliberately doesn't use `SELECT *` to avoid the performance hit of fetching the large
|
||||
// `parquet_metadata` column!!
|
||||
sqlx::query_as::<_, ParquetFile>(
|
||||
r#"
|
||||
SELECT *
|
||||
SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id,
|
||||
min_sequence_number, max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
|
||||
row_count, compaction_level, created_at
|
||||
FROM parquet_file
|
||||
WHERE parquet_file.sequencer_id = $1
|
||||
AND parquet_file.compaction_level = 0
|
||||
|
@ -1591,9 +1626,13 @@ WHERE parquet_file.sequencer_id = $1
|
|||
min_time: Timestamp,
|
||||
max_time: Timestamp,
|
||||
) -> Result<Vec<ParquetFile>> {
|
||||
// Deliberately doesn't use `SELECT *` to avoid the performance hit of fetching the large
|
||||
// `parquet_metadata` column!!
|
||||
sqlx::query_as::<_, ParquetFile>(
|
||||
r#"
|
||||
SELECT *
|
||||
SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id,
|
||||
min_sequence_number, max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
|
||||
row_count, compaction_level, created_at
|
||||
FROM parquet_file
|
||||
WHERE parquet_file.sequencer_id = $1
|
||||
AND parquet_file.table_id = $2
|
||||
|
@@ -1618,13 +1657,35 @@ WHERE parquet_file.sequencer_id = $1
&mut self,
partition_id: PartitionId,
) -> Result<Vec<ParquetFile>> {
// Deliberately doesn't use `SELECT *` to avoid the performance hit of fetching the large
// `parquet_metadata` column!!
sqlx::query_as::<_, ParquetFile>(
r#"
SELECT id, sequencer_id, namespace_id, table_id, partition_id, object_store_id,
min_sequence_number, max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
row_count, compaction_level, created_at
FROM parquet_file
WHERE parquet_file.partition_id = $1
AND parquet_file.to_delete IS NULL;
"#,
)
.bind(&partition_id) // $1
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })
}

async fn list_by_partition_not_to_delete_with_metadata(
&mut self,
partition_id: PartitionId,
) -> Result<Vec<ParquetFileWithMetadata>> {
sqlx::query_as::<_, ParquetFileWithMetadata>(
r#"
SELECT *
FROM parquet_file
WHERE parquet_file.partition_id = $1
AND parquet_file.to_delete IS NULL;
"#,
"#,
)
.bind(&partition_id) // $1
.fetch_all(&mut self.inner)
@@ -1668,6 +1729,17 @@ RETURNING id;
Ok(read_result.count > 0)
}

async fn parquet_metadata(&mut self, id: ParquetFileId) -> Result<Vec<u8>> {
let read_result =
sqlx::query(r#"SELECT parquet_metadata FROM parquet_file WHERE id = $1;"#)
.bind(&id) // $1
.fetch_one(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })?;

Ok(read_result.get("parquet_metadata"))
}

async fn count(&mut self) -> Result<i64> {
let read_result =
sqlx::query_as::<_, Count>(r#"SELECT count(*) as count FROM parquet_file;"#)
@ -6,9 +6,9 @@ use arrow::{
|
|||
};
|
||||
use bytes::Bytes;
|
||||
use data_types2::{
|
||||
Column, ColumnType, KafkaPartition, KafkaTopic, Namespace, ParquetFile, ParquetFileParams,
|
||||
Partition, QueryPool, SequenceNumber, Sequencer, SequencerId, Table, TableId, Timestamp,
|
||||
Tombstone, TombstoneId,
|
||||
Column, ColumnType, KafkaPartition, KafkaTopic, Namespace, ParquetFile, ParquetFileId,
|
||||
ParquetFileParams, ParquetFileWithMetadata, Partition, QueryPool, SequenceNumber, Sequencer,
|
||||
SequencerId, Table, TableId, Timestamp, Tombstone, TombstoneId,
|
||||
};
|
||||
use iox_catalog::{
|
||||
interface::{Catalog, INITIAL_COMPACTION_LEVEL},
|
||||
|
@ -189,6 +189,31 @@ impl TestCatalog {
|
|||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
/// List all non-deleted files with their metadata
|
||||
pub async fn list_by_table_not_to_delete_with_metadata(
|
||||
self: &Arc<Self>,
|
||||
table_id: TableId,
|
||||
) -> Vec<ParquetFileWithMetadata> {
|
||||
self.catalog
|
||||
.repositories()
|
||||
.await
|
||||
.parquet_files()
|
||||
.list_by_table_not_to_delete_with_metadata(table_id)
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
/// Get a parquet file's metadata bytes
|
||||
pub async fn parquet_metadata(&self, parquet_file_id: ParquetFileId) -> Vec<u8> {
|
||||
self.catalog
|
||||
.repositories()
|
||||
.await
|
||||
.parquet_files()
|
||||
.parquet_metadata(parquet_file_id)
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
/// A test namespace
|
||||
|
@ -373,7 +398,7 @@ pub struct TestPartition {
|
|||
|
||||
impl TestPartition {
|
||||
/// Create a parquet for the partition
|
||||
pub async fn create_parquet_file(self: &Arc<Self>, lp: &str) -> Arc<TestParquetFile> {
|
||||
pub async fn create_parquet_file(self: &Arc<Self>, lp: &str) -> TestParquetFile {
|
||||
self.create_parquet_file_with_min_max(
|
||||
lp,
|
||||
1,
|
||||
|
@ -392,7 +417,7 @@ impl TestPartition {
|
|||
max_seq: i64,
|
||||
min_time: i64,
|
||||
max_time: i64,
|
||||
) -> Arc<TestParquetFile> {
|
||||
) -> TestParquetFile {
|
||||
self.create_parquet_file_with_min_max_and_creation_time(
|
||||
lp, min_seq, max_seq, min_time, max_time, 1,
|
||||
)
|
||||
|
@ -408,7 +433,7 @@ impl TestPartition {
|
|||
min_time: i64,
|
||||
max_time: i64,
|
||||
creation_time: i64,
|
||||
) -> Arc<TestParquetFile> {
|
||||
) -> TestParquetFile {
|
||||
let mut repos = self.catalog.catalog.repositories().await;
|
||||
|
||||
let (table, batch) = lp_to_mutable_batch(lp);
|
||||
|
@ -451,7 +476,7 @@ impl TestPartition {
|
|||
min_time: Timestamp::new(min_time),
|
||||
max_time: Timestamp::new(max_time),
|
||||
file_size_bytes: real_file_size_bytes as i64,
|
||||
parquet_metadata: parquet_metadata_bin,
|
||||
parquet_metadata: parquet_metadata_bin.clone(),
|
||||
row_count: row_count as i64,
|
||||
created_at: Timestamp::new(creation_time),
|
||||
compaction_level: INITIAL_COMPACTION_LEVEL,
|
||||
|
@ -462,11 +487,13 @@ impl TestPartition {
|
|||
.await
|
||||
.unwrap();
|
||||
|
||||
Arc::new(TestParquetFile {
|
||||
let parquet_file = ParquetFileWithMetadata::new(parquet_file, parquet_metadata_bin);
|
||||
|
||||
TestParquetFile {
|
||||
catalog: Arc::clone(&self.catalog),
|
||||
namespace: Arc::clone(&self.namespace),
|
||||
parquet_file,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a parquet for the partition with a fake size for testing
|
||||
|
@ -480,7 +507,7 @@ impl TestPartition {
|
|||
max_time: i64,
|
||||
file_size_bytes: i64,
|
||||
creation_time: i64,
|
||||
) -> Arc<TestParquetFile> {
|
||||
) -> TestParquetFile {
|
||||
let mut repos = self.catalog.catalog.repositories().await;
|
||||
|
||||
let (table, batch) = lp_to_mutable_batch(lp);
|
||||
|
@ -523,7 +550,7 @@ impl TestPartition {
|
|||
min_time: Timestamp::new(min_time),
|
||||
max_time: Timestamp::new(max_time),
|
||||
file_size_bytes,
|
||||
parquet_metadata: parquet_metadata_bin,
|
||||
parquet_metadata: parquet_metadata_bin.clone(),
|
||||
row_count: row_count as i64,
|
||||
created_at: Timestamp::new(creation_time),
|
||||
compaction_level: INITIAL_COMPACTION_LEVEL,
|
||||
|
@ -534,11 +561,13 @@ impl TestPartition {
|
|||
.await
|
||||
.unwrap();
|
||||
|
||||
Arc::new(TestParquetFile {
|
||||
let parquet_file = ParquetFileWithMetadata::new(parquet_file, parquet_metadata_bin);
|
||||
|
||||
TestParquetFile {
|
||||
catalog: Arc::clone(&self.catalog),
|
||||
namespace: Arc::clone(&self.namespace),
|
||||
parquet_file,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -590,12 +619,12 @@ async fn create_parquet_file(
|
|||
pub struct TestParquetFile {
|
||||
pub catalog: Arc<TestCatalog>,
|
||||
pub namespace: Arc<TestNamespace>,
|
||||
pub parquet_file: ParquetFile,
|
||||
pub parquet_file: ParquetFileWithMetadata,
|
||||
}
|
||||
|
||||
impl TestParquetFile {
|
||||
/// Make the parquet file deletable
|
||||
pub async fn flag_for_delete(self: &Arc<Self>) {
|
||||
pub async fn flag_for_delete(&self) {
|
||||
let mut repos = self.catalog.catalog.repositories().await;
|
||||
|
||||
repos
|
||||
|
@ -604,6 +633,11 @@ impl TestParquetFile {
|
|||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
/// When only the ParquetFile is needed without the metadata, use this instead of the field
|
||||
pub fn parquet_file_no_metadata(self) -> ParquetFile {
|
||||
self.parquet_file.split_off_metadata().0
|
||||
}
|
||||
}
|
||||
|
||||
/// A catalog test tombstone
|
||||
|
@ -616,7 +650,7 @@ pub struct TestTombstone {
|
|||
|
||||
impl TestTombstone {
|
||||
/// mark the tombstone processed
|
||||
pub async fn mark_processed(self: &Arc<Self>, parquet_file: &Arc<TestParquetFile>) {
|
||||
pub async fn mark_processed(self: &Arc<Self>, parquet_file: &TestParquetFile) {
|
||||
assert!(Arc::ptr_eq(&self.catalog, &parquet_file.catalog));
|
||||
assert!(Arc::ptr_eq(&self.namespace, &parquet_file.namespace));
|
||||
|
||||
|
|
|
@@ -6,7 +6,7 @@ use data_types::{
partition_metadata::{Statistics, TableSummary},
timestamp::{TimestampMinMax, TimestampRange},
};
use data_types2::ParquetFile;
use data_types2::{ParquetFile, ParquetFileWithMetadata};
use datafusion::physical_plan::SendableRecordBatchStream;
use iox_object_store::{IoxObjectStore, ParquetFilePath};
use observability_deps::tracing::*;
@@ -280,10 +280,9 @@ pub struct DecodedParquetFile {
}

impl DecodedParquetFile {
pub fn new(parquet_file: ParquetFile) -> Self {
let parquet_metadata = Arc::new(IoxParquetMetaData::from_thrift_bytes(
parquet_file.parquet_metadata.clone(),
));
pub fn new(parquet_file_with_metadata: ParquetFileWithMetadata) -> Self {
let (parquet_file, parquet_metadata) = parquet_file_with_metadata.split_off_metadata();
let parquet_metadata = Arc::new(IoxParquetMetaData::from_thrift_bytes(parquet_metadata));
let decoded_metadata = parquet_metadata.decode().expect("parquet metadata broken");
let iox_metadata = decoded_metadata
.read_iox_metadata_new()
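`DecodedParquetFile::new` now consumes a `ParquetFileWithMetadata`, splits the blob off, and decodes it exactly once at construction, so the rest of the struct works with already-decoded values. A self-contained toy of that construct-and-decode-once shape, with UTF-8 decoding standing in for the Thrift decoding done by `IoxParquetMetaData`:

/// Toy "row + blob" input (stands in for `ParquetFileWithMetadata`).
struct InputWithMetadata {
    id: i64,
    metadata_bytes: Vec<u8>,
}

/// Toy decoded form: the blob is parsed once, at construction, and the
/// raw bytes are not kept around.
struct Decoded {
    id: i64,
    decoded_metadata: String,
}

impl Decoded {
    fn new(input: InputWithMetadata) -> Self {
        let InputWithMetadata { id, metadata_bytes } = input;
        // Stand-in for the Thrift/parquet decoding step.
        let decoded_metadata =
            String::from_utf8(metadata_bytes).expect("metadata bytes should decode");
        Self { id, decoded_metadata }
    }
}

fn main() {
    let decoded = Decoded::new(InputWithMetadata {
        id: 1,
        metadata_bytes: b"md1".to_vec(),
    });
    assert_eq!(decoded.id, 1);
    assert_eq!(decoded.decoded_metadata, "md1");
}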
@ -3,8 +3,8 @@
|
|||
use crate::cache::CatalogCache;
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use data_types2::{
|
||||
ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, ParquetFile, ParquetFileId, SequenceNumber,
|
||||
SequencerId,
|
||||
ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, ParquetFile, ParquetFileId,
|
||||
ParquetFileWithMetadata, SequenceNumber, SequencerId,
|
||||
};
|
||||
use futures::StreamExt;
|
||||
use iox_catalog::interface::Catalog;
|
||||
|
@ -212,8 +212,11 @@ impl ParquetChunkAdapter {
|
|||
/// Create new querier chunk.
|
||||
///
|
||||
/// Returns `None` if some data required to create this chunk is already gone from the catalog.
|
||||
pub async fn new_querier_chunk(&self, parquet_file: ParquetFile) -> Option<QuerierChunk> {
|
||||
let decoded_parquet_file = DecodedParquetFile::new(parquet_file);
|
||||
pub async fn new_querier_chunk(
|
||||
&self,
|
||||
parquet_file_with_metadata: ParquetFileWithMetadata,
|
||||
) -> Option<QuerierChunk> {
|
||||
let decoded_parquet_file = DecodedParquetFile::new(parquet_file_with_metadata);
|
||||
let chunk = Arc::new(self.new_parquet_chunk(&decoded_parquet_file).await?);
|
||||
|
||||
let addr = self
|
||||
|
@ -334,8 +337,7 @@ pub mod tests {
|
|||
.await
|
||||
.create_parquet_file(&lp)
|
||||
.await
|
||||
.parquet_file
|
||||
.clone();
|
||||
.parquet_file;
|
||||
|
||||
// create chunk
|
||||
let chunk = adapter.new_querier_chunk(parquet_file).await.unwrap();
|
||||
|
|
|
@ -7,7 +7,7 @@ use crate::{
|
|||
IngesterConnection,
|
||||
};
|
||||
use backoff::{Backoff, BackoffConfig};
|
||||
use data_types2::{ParquetFile, TableId};
|
||||
use data_types2::{ParquetFileWithMetadata, TableId};
|
||||
use observability_deps::tracing::debug;
|
||||
use predicate::Predicate;
|
||||
use query::{provider::ChunkPruner, QueryChunk};
|
||||
|
@ -107,7 +107,7 @@ impl QuerierTable {
|
|||
|
||||
let parquet_files = txn
|
||||
.parquet_files()
|
||||
.list_by_table_not_to_delete(self.id)
|
||||
.list_by_table_not_to_delete_with_metadata(self.id)
|
||||
.await?;
|
||||
|
||||
let tombstones = txn.tombstones().list_by_table(self.id).await?;
|
||||
|
@ -132,8 +132,12 @@ impl QuerierTable {
|
|||
|
||||
// convert parquet files and tombstones to nicer objects
|
||||
let mut chunks = Vec::with_capacity(parquet_files.len());
|
||||
for parquet_file in parquet_files {
|
||||
if let Some(chunk) = self.chunk_adapter.new_querier_chunk(parquet_file).await {
|
||||
for parquet_file_with_metadata in parquet_files {
|
||||
if let Some(chunk) = self
|
||||
.chunk_adapter
|
||||
.new_querier_chunk(parquet_file_with_metadata)
|
||||
.await
|
||||
{
|
||||
chunks.push(chunk);
|
||||
}
|
||||
}
|
||||
|
@ -252,7 +256,10 @@ impl QuerierTable {
|
|||
///
|
||||
/// Specificially, ensure that the persisted number from all
|
||||
/// chunks is consistent with the parquet files we know about
|
||||
fn validate_cache(_partitions: &[Arc<IngesterPartition>], _parquet_files: &[ParquetFile]) -> bool {
|
||||
fn validate_cache(
|
||||
_partitions: &[Arc<IngesterPartition>],
|
||||
_parquet_files: &[ParquetFileWithMetadata],
|
||||
) -> bool {
|
||||
// TODO fill out the validation logic here
|
||||
true
|
||||
}
|
||||
|