feat: Add a function to the catalog to fetch level 1 parquet files

Fixes #3946.
pull/24376/head
Carol (Nichols || Goulding) 2022-03-10 16:06:37 -05:00
parent f184b7023c
commit 1dacf567d9
GPG Key ID: E907EE5A736F87D4
7 changed files with 402 additions and 36 deletions

View File

@@ -2,10 +2,13 @@
use crate::{
query::QueryableParquetChunk,
utils::{CompactedData, ParquetFileWithTombstone, TablePartition},
utils::{CompactedData, ParquetFileWithTombstone},
};
use backoff::BackoffConfig;
use data_types2::{ParquetFile, ParquetFileId, PartitionId, SequencerId, TableId, TombstoneId};
use data_types2::{
ParquetFile, ParquetFileId, PartitionId, SequencerId, TableId, TablePartition, Timestamp,
TombstoneId,
};
use datafusion::error::DataFusionError;
use iox_catalog::interface::Catalog;
use object_store::ObjectStore;
@@ -98,6 +101,11 @@ pub enum Error {
source: iox_catalog::interface::Error,
},
#[snafu(display("Error while requesting level 1 parquet files {}", source))]
Level1 {
source: iox_catalog::interface::Error,
},
#[snafu(display("Error updating catalog {}", source))]
Update {
source: iox_catalog::interface::Error,
@@ -156,6 +164,21 @@ impl Compactor {
.context(Level0Snafu)
}
async fn level_1_parquet_files(
&self,
table_partition: TablePartition,
min_time: Timestamp,
max_time: Timestamp,
) -> Result<Vec<ParquetFile>> {
let mut repos = self.catalog.repositories().await;
repos
.parquet_files()
.level_1(table_partition, min_time, max_time)
.await
.context(Level1Snafu)
}
async fn update_to_level_1(&self, parquet_file_ids: &[ParquetFileId]) -> Result<()> {
let mut repos = self.catalog.repositories().await;
@@ -189,12 +212,29 @@ impl Compactor {
// Read level-0 parquet files
let level_0_files = self.level_0_parquet_files(sequencer_id).await?;
// If there are no level-0 parquet files, return because there's nothing to do
if level_0_files.is_empty() {
return Ok(());
}
// Group files by table partition
let mut partitions = Self::group_parquet_files_into_partition(level_0_files);
// Get level-1 files overlapped with level-0
for (_key, val) in &mut partitions.iter_mut() {
let level_1_files: Vec<ParquetFile> = vec![]; // TODO: #3946
// Get level-1 files overlapped in time with level-0
for (key, val) in &mut partitions.iter_mut() {
let overall_min_time = val
.iter()
.map(|pf| pf.min_time)
.min()
.expect("The list of files was checked for emptiness above");
let overall_max_time = val
.iter()
.map(|pf| pf.max_time)
.max()
.expect("The list of files was checked for emptiness above");
let level_1_files = self
.level_1_parquet_files(*key, overall_min_time, overall_max_time)
.await?;
val.extend(level_1_files);
}
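
The loop above folds the level-0 files of each partition down to a single overall time window before asking the catalog for overlapping level-1 files. A minimal sketch of that window computation, using a hypothetical FileTimes struct with plain i64 timestamps instead of the catalog types:

struct FileTimes {
    min_time: i64,
    max_time: i64,
}

// Returns None for an empty slice instead of panicking like the expect() calls above.
fn overall_window(files: &[FileTimes]) -> Option<(i64, i64)> {
    let overall_min = files.iter().map(|f| f.min_time).min()?;
    let overall_max = files.iter().map(|f| f.max_time).max()?;
    Some((overall_min, overall_max))
}
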
@@ -285,7 +325,8 @@ impl Compactor {
}
// Compact given files. Assume the given files are overlapped in time.
// If the assumption does not meet, we will spend time not to compact anything but put data together
// If the assumption does not hold, we will spend time merging the data without actually
// compacting anything
async fn compact(
&self,
overlapped_files: Vec<ParquetFileWithTombstone>,

View File

@@ -1,38 +1,15 @@
//! Helpers of the Compactor
use std::{collections::HashSet, sync::Arc};
use crate::query::QueryableParquetChunk;
use arrow::record_batch::RecordBatch;
use data_types2::{
ParquetFile, ParquetFileId, PartitionId, SequencerId, TableId, Tombstone, TombstoneId,
};
use data_types2::{ParquetFile, ParquetFileId, Tombstone, TombstoneId};
use iox_object_store::IoxObjectStore;
use object_store::ObjectStore;
use parquet_file::{
chunk::{new_parquet_chunk, ChunkMetrics, DecodedParquetFile},
metadata::IoxMetadata,
};
use crate::query::QueryableParquetChunk;
/// Define table partition
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord)]
pub struct TablePartition {
sequencer_id: SequencerId,
table_id: TableId,
partition_id: PartitionId,
}
impl TablePartition {
/// Return a new table partition
pub fn new(sequencer_id: SequencerId, table_id: TableId, partition_id: PartitionId) -> Self {
Self {
sequencer_id,
table_id,
partition_id,
}
}
}
use std::{collections::HashSet, sync::Arc};
/// Wrapper of a parquet file and its tombstones
#[allow(missing_docs)]

View File

@@ -17,6 +17,7 @@ use std::{
collections::BTreeMap,
convert::TryFrom,
fmt::{Debug, Formatter},
ops::{Add, Sub},
sync::Arc,
};
use uuid::Uuid;
@@ -189,6 +190,29 @@ impl std::fmt::Display for PartitionId {
}
}
/// Combination of Sequencer ID, Table ID, and Partition ID useful for identifying groups of
/// Parquet files to be compacted together.
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord)]
pub struct TablePartition {
/// The sequencer ID
pub sequencer_id: SequencerId,
/// The table ID
pub table_id: TableId,
/// The partition ID
pub partition_id: PartitionId,
}
impl TablePartition {
/// Combine a sequencer ID, table ID, and partition ID into a table partition
pub fn new(sequencer_id: SequencerId, table_id: TableId, partition_id: PartitionId) -> Self {
Self {
sequencer_id,
table_id,
partition_id,
}
}
}
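
As a short usage sketch (the ID values are made up, and this assumes the usual new(i64) constructors on the ID newtypes):

use data_types2::{PartitionId, SequencerId, TableId, TablePartition};

fn table_partition_example() {
    // Identify one partition of one table on one sequencer.
    let table_partition = TablePartition::new(
        SequencerId::new(1),
        TableId::new(2),
        PartitionId::new(3),
    );
    assert_eq!(table_partition.table_id, TableId::new(2));
}
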
/// Unique ID for a `Tombstone`
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
#[sqlx(transparent)]
@@ -234,6 +258,22 @@ impl Timestamp {
}
}
impl Add<i64> for Timestamp {
type Output = Self;
fn add(self, other: i64) -> Self {
Self(self.0 + other)
}
}
impl Sub<i64> for Timestamp {
type Output = Self;
fn sub(self, other: i64) -> Self {
Self(self.0 - other)
}
}
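
These operator impls are what let the tests below write expressions like query_min_time + 1. A small illustration, assuming Timestamp's derived PartialEq:

use data_types2::Timestamp;

fn timestamp_arithmetic_example() {
    let query_min_time = Timestamp::new(5);
    let query_max_time = Timestamp::new(10);
    // Adding or subtracting an i64 shifts the wrapped nanosecond value.
    assert_eq!(query_min_time + 1, Timestamp::new(6));
    assert_eq!(query_max_time - 1, Timestamp::new(9));
}
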
/// Unique ID for a `ParquetFile`
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
#[sqlx(transparent)]

View File

@@ -5,7 +5,8 @@ use data_types2::{
Column, ColumnSchema, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace,
NamespaceId, NamespaceSchema, ParquetFile, ParquetFileId, ParquetFileParams, Partition,
PartitionId, PartitionInfo, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber,
Sequencer, SequencerId, Table, TableId, TableSchema, Timestamp, Tombstone, TombstoneId,
Sequencer, SequencerId, Table, TableId, TablePartition, TableSchema, Timestamp, Tombstone,
TombstoneId,
};
use snafu::{OptionExt, Snafu};
use std::{collections::BTreeMap, convert::TryFrom, fmt::Debug, sync::Arc};
@@ -441,6 +442,16 @@ pub trait ParquetFileRepo: Send + Sync {
/// define a file as a candidate for compaction
async fn level_0(&mut self, sequencer_id: SequencerId) -> Result<Vec<ParquetFile>>;
/// List parquet files for a given table partition, in a given time range, with compaction
/// level 1, and other criteria that define a file as a candidate for compaction with a level 0
/// file
async fn level_1(
&mut self,
table_partition: TablePartition,
min_time: Timestamp,
max_time: Timestamp,
) -> Result<Vec<ParquetFile>>;
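
A rough sketch of how a caller might reach this method through the catalog, mirroring level_1_parquet_files in the compactor change above (the free function and its name are illustrative):

use std::sync::Arc;

use data_types2::{ParquetFile, TablePartition, Timestamp};
use iox_catalog::interface::{Catalog, Error};

async fn fetch_level_1(
    catalog: Arc<dyn Catalog>,
    table_partition: TablePartition,
    min_time: Timestamp,
    max_time: Timestamp,
) -> Result<Vec<ParquetFile>, Error> {
    // Get a repository collection from the catalog and forward to the new method.
    let mut repos = catalog.repositories().await;
    repos
        .parquet_files()
        .level_1(table_partition, min_time, max_time)
        .await
}
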
/// Update the compaction level of the specified parquet files to level 1. Returns the IDs
/// of the files that were successfully updated.
async fn update_to_level_1(
@@ -551,6 +562,7 @@ pub(crate) mod test_helpers {
test_tombstone(Arc::clone(&catalog)).await;
test_parquet_file(Arc::clone(&catalog)).await;
test_parquet_file_compaction_level_0(Arc::clone(&catalog)).await;
test_parquet_file_compaction_level_1(Arc::clone(&catalog)).await;
test_update_to_compaction_level_1(Arc::clone(&catalog)).await;
test_add_parquet_file_with_tombstones(Arc::clone(&catalog)).await;
test_txn_isolation(Arc::clone(&catalog)).await;
@@ -1541,6 +1553,230 @@
);
}
async fn test_parquet_file_compaction_level_1(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let kafka = repos.kafka_topics().create_or_get("foo").await.unwrap();
let pool = repos.query_pools().create_or_get("foo").await.unwrap();
let namespace = repos
.namespaces()
.create(
"namespace_parquet_file_compaction_level_1_test",
"inf",
kafka.id,
pool.id,
)
.await
.unwrap();
let table = repos
.tables()
.create_or_get("test_table", namespace.id)
.await
.unwrap();
let other_table = repos
.tables()
.create_or_get("test_table2", namespace.id)
.await
.unwrap();
let sequencer = repos
.sequencers()
.create_or_get(&kafka, KafkaPartition::new(100))
.await
.unwrap();
let other_sequencer = repos
.sequencers()
.create_or_get(&kafka, KafkaPartition::new(101))
.await
.unwrap();
let partition = repos
.partitions()
.create_or_get("one", sequencer.id, table.id)
.await
.unwrap();
let other_partition = repos
.partitions()
.create_or_get("two", sequencer.id, table.id)
.await
.unwrap();
// Set up the time window we're interested in getting level 1 files for
let query_min_time = Timestamp::new(5);
let query_max_time = Timestamp::new(10);
// Create a file with times entirely within the window
let parquet_file_params = ParquetFileParams {
sequencer_id: sequencer.id,
table_id: partition.table_id,
partition_id: partition.id,
object_store_id: Uuid::new_v4(),
min_sequence_number: SequenceNumber::new(10),
max_sequence_number: SequenceNumber::new(140),
min_time: query_min_time + 1,
max_time: query_max_time - 1,
file_size_bytes: 1337,
parquet_metadata: b"md1".to_vec(),
row_count: 0,
created_at: Timestamp::new(1),
};
let parquet_file = repos
.parquet_files()
.create(parquet_file_params.clone())
.await
.unwrap();
// Create a file that will remain as level 0
let level_0_params = ParquetFileParams {
object_store_id: Uuid::new_v4(),
..parquet_file_params.clone()
};
let _level_0_file = repos.parquet_files().create(level_0_params).await.unwrap();
// Create a file completely before the window
let too_early_params = ParquetFileParams {
object_store_id: Uuid::new_v4(),
min_time: query_min_time - 2,
max_time: query_min_time - 1,
..parquet_file_params.clone()
};
let too_early_file = repos
.parquet_files()
.create(too_early_params)
.await
.unwrap();
// Create a file overlapping the window on the lower end
let overlap_lower_params = ParquetFileParams {
object_store_id: Uuid::new_v4(),
min_time: query_min_time - 1,
max_time: query_min_time + 1,
..parquet_file_params.clone()
};
let overlap_lower_file = repos
.parquet_files()
.create(overlap_lower_params)
.await
.unwrap();
// Create a file overlapping the window on the upper end
let overlap_upper_params = ParquetFileParams {
object_store_id: Uuid::new_v4(),
min_time: query_max_time - 1,
max_time: query_max_time + 1,
..parquet_file_params.clone()
};
let overlap_upper_file = repos
.parquet_files()
.create(overlap_upper_params)
.await
.unwrap();
// Create a file completely after the window
let too_late_params = ParquetFileParams {
object_store_id: Uuid::new_v4(),
min_time: query_max_time + 1,
max_time: query_max_time + 2,
..parquet_file_params.clone()
};
let too_late_file = repos.parquet_files().create(too_late_params).await.unwrap();
// Create a file for some other sequencer
let other_sequencer_params = ParquetFileParams {
sequencer_id: other_sequencer.id,
object_store_id: Uuid::new_v4(),
..parquet_file_params.clone()
};
let other_sequencer_file = repos
.parquet_files()
.create(other_sequencer_params)
.await
.unwrap();
// Create a file for the same sequencer but a different table
let other_table_params = ParquetFileParams {
table_id: other_table.id,
object_store_id: Uuid::new_v4(),
..parquet_file_params.clone()
};
let other_table_file = repos
.parquet_files()
.create(other_table_params)
.await
.unwrap();
// Create a file for the same sequencer and table but a different partition
let other_partition_params = ParquetFileParams {
partition_id: other_partition.id,
object_store_id: Uuid::new_v4(),
..parquet_file_params.clone()
};
let other_partition_file = repos
.parquet_files()
.create(other_partition_params)
.await
.unwrap();
// Create a file too big to be considered
let too_big_params = ParquetFileParams {
object_store_id: Uuid::new_v4(),
file_size_bytes: MAX_COMPACT_SIZE + 1,
..parquet_file_params.clone()
};
let too_big_file = repos.parquet_files().create(too_big_params).await.unwrap();
// Create a file marked to be deleted
let to_delete_params = ParquetFileParams {
object_store_id: Uuid::new_v4(),
..parquet_file_params.clone()
};
let to_delete_file = repos
.parquet_files()
.create(to_delete_params)
.await
.unwrap();
repos
.parquet_files()
.flag_for_delete(to_delete_file.id)
.await
.unwrap();
// Make all but _level_0_file compaction level 1
repos
.parquet_files()
.update_to_level_1(&[
parquet_file.id,
too_early_file.id,
too_late_file.id,
overlap_lower_file.id,
overlap_upper_file.id,
other_sequencer_file.id,
other_table_file.id,
other_partition_file.id,
too_big_file.id,
to_delete_file.id,
])
.await
.unwrap();
// Level 1 parquet files for a table partition should contain only those that match the right
// criteria
let table_partition = TablePartition::new(sequencer.id, table.id, partition.id);
let level_1 = repos
.parquet_files()
.level_1(table_partition, query_min_time, query_max_time)
.await
.unwrap();
let mut level_1_ids: Vec<_> = level_1.iter().map(|pf| pf.id).collect();
level_1_ids.sort();
let expected = vec![parquet_file, overlap_lower_file, overlap_upper_file];
let mut expected_ids: Vec<_> = expected.iter().map(|pf| pf.id).collect();
expected_ids.sort();
assert_eq!(
level_1_ids, expected_ids,
"\nlevel 1: {:#?}\nexpected: {:#?}",
level_1, expected,
);
}
async fn test_update_to_compaction_level_1(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let kafka = repos.kafka_topics().create_or_get("foo").await.unwrap();

View File

@@ -15,7 +15,7 @@ use data_types2::{
Column, ColumnId, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace, NamespaceId,
ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionInfo,
ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer, SequencerId, Table,
TableId, Timestamp, Tombstone, TombstoneId,
TableId, TablePartition, Timestamp, Tombstone, TombstoneId,
};
use observability_deps::tracing::warn;
use std::fmt::Formatter;
@@ -859,6 +859,31 @@ impl ParquetFileRepo for MemTxn {
.collect())
}
async fn level_1(
&mut self,
table_partition: TablePartition,
min_time: Timestamp,
max_time: Timestamp,
) -> Result<Vec<ParquetFile>> {
let stage = self.stage();
Ok(stage
.parquet_files
.iter()
.filter(|f| {
f.sequencer_id == table_partition.sequencer_id
&& f.table_id == table_partition.table_id
&& f.partition_id == table_partition.partition_id
&& f.compaction_level == 1
&& !f.to_delete
&& f.file_size_bytes <= MAX_COMPACT_SIZE
&& ((f.min_time <= min_time && f.max_time >= min_time)
|| (f.min_time > min_time && f.min_time <= max_time))
})
.cloned()
.collect())
}
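
The two-armed time condition here is an overlap check against the [min_time, max_time] window, split on whether a file starts at or before the window or inside it. A small sketch of the same predicate on plain i64 values (the overlaps function is illustrative):

fn overlaps(file_min: i64, file_max: i64, min_time: i64, max_time: i64) -> bool {
    // Either the file starts at or before min_time and is still open at min_time,
    // or it starts somewhere inside the (min_time, max_time] window.
    (file_min <= min_time && file_max >= min_time)
        || (file_min > min_time && file_min <= max_time)
}

// For the window [5, 10] used in the test above:
// overlaps(6, 9, 5, 10)   -> true   (entirely within)
// overlaps(4, 6, 5, 10)   -> true   (overlaps the lower end)
// overlaps(9, 11, 5, 10)  -> true   (overlaps the upper end)
// overlaps(3, 4, 5, 10)   -> false  (entirely before)
// overlaps(11, 12, 5, 10) -> false  (entirely after)
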
async fn update_to_level_1(
&mut self,
parquet_file_ids: &[ParquetFileId],

View File

@@ -10,7 +10,7 @@ use data_types2::{
Column, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace, NamespaceId,
ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionInfo,
ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer, SequencerId, Table,
TableId, Timestamp, Tombstone, TombstoneId,
TableId, TablePartition, Timestamp, Tombstone, TombstoneId,
};
use metric::{Metric, U64Histogram, U64HistogramOptions};
use std::{fmt::Debug, sync::Arc};
@@ -263,6 +263,7 @@ decorate!(
"parquet_list_by_sequencer_greater_than" = list_by_sequencer_greater_than(&mut self, sequencer_id: SequencerId, sequence_number: SequenceNumber) -> Result<Vec<ParquetFile>>;
"parquet_list_by_namespace_not_to_delete" = list_by_namespace_not_to_delete(&mut self, namespace_id: NamespaceId) -> Result<Vec<ParquetFile>>;
"parquet_level_0" = level_0(&mut self, sequencer_id: SequencerId) -> Result<Vec<ParquetFile>>;
"parquet_level_1" = level_1(&mut self, table_partition: TablePartition, min_time: Timestamp, max_time: Timestamp) -> Result<Vec<ParquetFile>>;
"parquet_update_to_level_1" = update_to_level_1(&mut self, parquet_file_ids: &[ParquetFileId]) -> Result<Vec<ParquetFileId>>;
"parquet_exist" = exist(&mut self, id: ParquetFileId) -> Result<bool>;
"parquet_count" = count(&mut self) -> Result<i64>;

View File

@@ -14,7 +14,7 @@ use data_types2::{
Column, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace, NamespaceId,
ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionInfo,
ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer, SequencerId, Table,
TableId, Timestamp, Tombstone, TombstoneId,
TableId, TablePartition, Timestamp, Tombstone, TombstoneId,
};
use observability_deps::tracing::{info, warn};
use sqlx::{migrate::Migrator, postgres::PgPoolOptions, Acquire, Executor, Postgres, Row};
@@ -1300,6 +1300,52 @@ WHERE parquet_file.sequencer_id = $1
.map_err(|e| Error::SqlxError { source: e })
}
async fn level_1(
&mut self,
table_partition: TablePartition,
min_time: Timestamp,
max_time: Timestamp,
) -> Result<Vec<ParquetFile>> {
sqlx::query_as::<_, ParquetFile>(
r#"
SELECT
parquet_file.id as id,
parquet_file.sequencer_id as sequencer_id,
parquet_file.table_id as table_id,
parquet_file.partition_id as partition_id,
parquet_file.object_store_id as object_store_id,
parquet_file.min_sequence_number as min_sequence_number,
parquet_file.max_sequence_number as max_sequence_number,
parquet_file.min_time as min_time,
parquet_file.max_time as max_time,
parquet_file.to_delete as to_delete,
parquet_file.file_size_bytes as file_size_bytes,
parquet_file.parquet_metadata as parquet_metadata,
parquet_file.row_count as row_count,
parquet_file.compaction_level as compaction_level,
parquet_file.created_at as created_at
FROM parquet_file
WHERE parquet_file.sequencer_id = $1
AND parquet_file.table_id = $2
AND parquet_file.partition_id = $3
AND parquet_file.compaction_level = 1
AND parquet_file.to_delete = false
AND parquet_file.file_size_bytes <= $4
AND ((parquet_file.min_time <= $5 AND parquet_file.max_time >= $5)
OR (parquet_file.min_time > $5 AND parquet_file.min_time <= $6))
;"#,
)
.bind(&table_partition.sequencer_id) // $1
.bind(&table_partition.table_id) // $2
.bind(&table_partition.partition_id) // $3
.bind(MAX_COMPACT_SIZE) // $4
.bind(min_time) // $5
.bind(max_time) // $6
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })
}
async fn update_to_level_1(
&mut self,
parquet_file_ids: &[ParquetFileId],