refactor: Extract a ParquetFileParams type for create
This has the advantages of:

- Not needing to create fake parquet file IDs or fake deleted_at values that aren't used by create before insertion
- Not needing too many arguments for create
- Naming the arguments so it's easier to see what value is what argument, especially in tests
- Easier to reuse arguments or parts of arguments by using copies of params, which makes it easier to see differences, especially in tests
parent: f03ebd79ab
commit: ff31407dce
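The pattern, in miniature: instead of a create(a, b, c, ...) call with a dozen positional arguments, callers build a params struct and hand it over whole, and tests derive variants with struct update syntax. Below is a minimal sketch of that pattern; FileParams, create, and the field set are hypothetical stand-ins for illustration, not the actual IOx catalog API.

    // Sketch of the params-struct pattern this commit applies; FileParams and
    // create() are illustrative stand-ins, not the real catalog types.
    #[derive(Debug, Clone, PartialEq)]
    struct FileParams {
        object_store_id: u64,
        min_sequence_number: i64,
        max_sequence_number: i64,
        row_count: i64,
    }

    // A real repo would insert the row into the catalog and return the stored
    // record (with its database-assigned id); here we just echo the params.
    fn create(params: FileParams) -> FileParams {
        params
    }

    fn main() {
        let base = FileParams {
            object_store_id: 1,
            min_sequence_number: 10,
            max_sequence_number: 513,
            row_count: 0,
        };
        let first = create(base.clone());

        // Struct update syntax: only the fields that differ are spelled out,
        // so the difference between test fixtures is easy to see.
        let second = create(FileParams {
            object_store_id: 2,
            min_sequence_number: 514,
            ..base
        });
        assert_ne!(first, second);
    }

Deriving Clone and PartialEq is what makes the clone-and-tweak reuse in tests cheap to write and easy to diff.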

@@ -619,7 +619,7 @@ pub struct Tombstone {
     pub serialized_predicate: String,
 }
 
-/// Data for a parquet file reference in the catalog.
+/// Data for a parquet file reference that has been inserted in the catalog.
 #[derive(Debug, Clone, PartialEq, sqlx::FromRow)]
 pub struct ParquetFile {
     /// the id of the file in the catalog

@@ -654,6 +654,37 @@ pub struct ParquetFile {
     pub created_at: Timestamp,
 }
 
+/// Data for a parquet file to be inserted into the catalog.
+#[derive(Debug, Clone, PartialEq)]
+pub struct ParquetFileParams {
+    /// the sequencer that sequenced writes that went into this file
+    pub sequencer_id: SequencerId,
+    /// the table
+    pub table_id: TableId,
+    /// the partition
+    pub partition_id: PartitionId,
+    /// the uuid used in the object store path for this file
+    pub object_store_id: Uuid,
+    /// the minimum sequence number from a record in this file
+    pub min_sequence_number: SequenceNumber,
+    /// the maximum sequence number from a record in this file
+    pub max_sequence_number: SequenceNumber,
+    /// the min timestamp of data in this file
+    pub min_time: Timestamp,
+    /// the max timestamp of data in this file
+    pub max_time: Timestamp,
+    /// file size in bytes
+    pub file_size_bytes: i64,
+    /// thrift-encoded parquet metadata
+    pub parquet_metadata: Vec<u8>,
+    /// the number of rows of data in this file
+    pub row_count: i64,
+    /// the compaction level of the file
+    pub compaction_level: i16,
+    /// the creation time of the parquet file
+    pub created_at: Timestamp,
+}
+
 /// The starting compaction level for parquet files is zero.
 pub const INITIAL_COMPACTION_LEVEL: i16 = 0;
 

@@ -242,24 +242,7 @@ impl Persister for IngesterData {
         Backoff::new(&self.backoff_config)
             .retry_all_errors("add parquet file to catalog", || async {
                 let mut repos = self.catalog.repositories().await;
-                repos
-                    .parquet_files()
-                    .create(
-                        parquet_file.sequencer_id,
-                        parquet_file.table_id,
-                        parquet_file.partition_id,
-                        parquet_file.object_store_id,
-                        parquet_file.min_sequence_number,
-                        parquet_file.max_sequence_number,
-                        parquet_file.min_time,
-                        parquet_file.max_time,
-                        parquet_file.file_size_bytes,
-                        parquet_file.parquet_metadata.clone(),
-                        parquet_file.row_count,
-                        parquet_file.compaction_level,
-                        parquet_file.created_at,
-                    )
-                    .await
+                repos.parquet_files().create(parquet_file.clone()).await
             })
             .await
             .expect("retry forever");

@@ -1262,7 +1245,7 @@ mod tests {
     use super::*;
     use crate::{lifecycle::LifecycleConfig, test_util::create_tombstone};
     use arrow_util::assert_batches_sorted_eq;
-    use data_types2::{NamespaceSchema, Sequence};
+    use data_types2::{NamespaceSchema, ParquetFileParams, Sequence};
     use dml::{DmlMeta, DmlWrite};
     use futures::TryStreamExt;
     use iox_catalog::{mem::MemCatalog, validate_or_insert_schema};

@@ -1909,23 +1892,24 @@ mod tests {
             .create_or_get("1970-01-01", sequencer.id, table.id)
             .await
             .unwrap();
+        let parquet_file_params = ParquetFileParams {
+            sequencer_id: sequencer.id,
+            table_id: table.id,
+            partition_id: partition.id,
+            object_store_id: Uuid::new_v4(),
+            min_sequence_number: SequenceNumber::new(0),
+            max_sequence_number: SequenceNumber::new(1),
+            min_time: Timestamp::new(1),
+            max_time: Timestamp::new(1),
+            file_size_bytes: 0,
+            parquet_metadata: vec![],
+            row_count: 0,
+            compaction_level: 0,
+            created_at: Timestamp::new(1),
+        };
         repos
             .parquet_files()
-            .create(
-                sequencer.id,
-                table.id,
-                partition.id,
-                Uuid::new_v4(),
-                SequenceNumber::new(0),
-                SequenceNumber::new(1),
-                Timestamp::new(1),
-                Timestamp::new(1),
-                0,
-                vec![],
-                0,
-                0,
-                Timestamp::new(1),
-            )
+            .create(parquet_file_params)
             .await
             .unwrap();
         std::mem::drop(repos);

@@ -3,9 +3,9 @@
 use async_trait::async_trait;
 use data_types2::{
     Column, ColumnSchema, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace,
-    NamespaceId, NamespaceSchema, ParquetFile, ParquetFileId, Partition, PartitionId,
-    PartitionInfo, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer,
-    SequencerId, Table, TableId, TableSchema, Timestamp, Tombstone, TombstoneId,
+    NamespaceId, NamespaceSchema, ParquetFile, ParquetFileId, ParquetFileParams, Partition,
+    PartitionId, PartitionInfo, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber,
+    Sequencer, SequencerId, Table, TableId, TableSchema, Timestamp, Tombstone, TombstoneId,
 };
 use snafu::{OptionExt, Snafu};
 use std::{collections::BTreeMap, convert::TryFrom, fmt::Debug, sync::Arc};

@@ -410,23 +410,7 @@ pub trait TombstoneRepo: Send + Sync {
 #[async_trait]
 pub trait ParquetFileRepo: Send + Sync {
     /// create the parquet file
-    #[allow(clippy::too_many_arguments)]
-    async fn create(
-        &mut self,
-        sequencer_id: SequencerId,
-        table_id: TableId,
-        partition_id: PartitionId,
-        object_store_id: Uuid,
-        min_sequence_number: SequenceNumber,
-        max_sequence_number: SequenceNumber,
-        min_time: Timestamp,
-        max_time: Timestamp,
-        file_size_bytes: i64,
-        parquet_metadata: Vec<u8>,
-        row_count: i64,
-        level: i16,
-        created_at: Timestamp,
-    ) -> Result<ParquetFile>;
+    async fn create(&mut self, parquet_file_params: ParquetFileParams) -> Result<ParquetFile>;
 
     /// Flag the parquet file for deletion
     async fn flag_for_delete(&mut self, id: ParquetFileId) -> Result<()>;

@@ -759,23 +743,24 @@ pub(crate) mod test_helpers {
             .create_or_get("1970-01-01", seq.id, t.id)
             .await
             .unwrap();
+        let parquet_file_params = ParquetFileParams {
+            sequencer_id: seq.id,
+            table_id: t.id,
+            partition_id: partition.id,
+            object_store_id: Uuid::new_v4(),
+            min_sequence_number: SequenceNumber::new(10),
+            max_sequence_number: SequenceNumber::new(513),
+            min_time: Timestamp::new(1),
+            max_time: Timestamp::new(2),
+            file_size_bytes: 0,
+            parquet_metadata: vec![],
+            row_count: 0,
+            compaction_level: 0,
+            created_at: Timestamp::new(1),
+        };
         let p1 = repos
             .parquet_files()
-            .create(
-                seq.id,
-                t.id,
-                partition.id,
-                Uuid::new_v4(),
-                SequenceNumber::new(10),
-                SequenceNumber::new(513),
-                Timestamp::new(1),
-                Timestamp::new(2),
-                0,
-                vec![],
-                0,
-                0,
-                Timestamp::new(1),
-            )
+            .create(parquet_file_params.clone())
             .await
             .unwrap();
         let ti = repos

@@ -795,23 +780,15 @@ pub(crate) mod test_helpers {
         );
 
         // and with another parquet file persisted
+        let parquet_file_params = ParquetFileParams {
+            object_store_id: Uuid::new_v4(),
+            min_sequence_number: SequenceNumber::new(514),
+            max_sequence_number: SequenceNumber::new(1008),
+            ..parquet_file_params
+        };
         let p1 = repos
             .parquet_files()
-            .create(
-                seq.id,
-                t.id,
-                partition.id,
-                Uuid::new_v4(),
-                SequenceNumber::new(514),
-                SequenceNumber::new(1008),
-                Timestamp::new(1),
-                Timestamp::new(2),
-                0,
-                vec![],
-                0,
-                0,
-                Timestamp::new(1),
-            )
+            .create(parquet_file_params.clone())
             .await
             .unwrap();
         let ti = repos

@@ -1286,67 +1263,44 @@ pub(crate) mod test_helpers {
         let min_time = Timestamp::new(1);
         let max_time = Timestamp::new(10);
 
+        let parquet_file_params = ParquetFileParams {
+            sequencer_id: sequencer.id,
+            table_id: partition.table_id,
+            partition_id: partition.id,
+            object_store_id: Uuid::new_v4(),
+            min_sequence_number: SequenceNumber::new(10),
+            max_sequence_number: SequenceNumber::new(140),
+            min_time,
+            max_time,
+            file_size_bytes: 1337,
+            parquet_metadata: b"md1".to_vec(),
+            row_count: 0,
+            compaction_level: 0,
+            created_at: Timestamp::new(1),
+        };
         let parquet_file = repos
             .parquet_files()
-            .create(
-                sequencer.id,
-                partition.table_id,
-                partition.id,
-                Uuid::new_v4(),
-                SequenceNumber::new(10),
-                SequenceNumber::new(140),
-                min_time,
-                max_time,
-                1337,
-                b"md1".to_vec(),
-                0,
-                0,
-                Timestamp::new(1),
-            )
+            .create(parquet_file_params.clone())
             .await
             .unwrap();
 
         // verify that trying to create a file with the same UUID throws an error
         let err = repos
             .parquet_files()
-            .create(
-                sequencer.id,
-                partition.table_id,
-                partition.id,
-                parquet_file.object_store_id,
-                SequenceNumber::new(10),
-                SequenceNumber::new(140),
-                min_time,
-                max_time,
-                1338,
-                b"md2".to_vec(),
-                0,
-                0,
-                Timestamp::new(1),
-            )
+            .create(parquet_file_params.clone())
             .await
             .unwrap_err();
         assert!(matches!(err, Error::FileExists { object_store_id: _ }));
 
-        let other_file = repos
-            .parquet_files()
-            .create(
-                sequencer.id,
-                other_partition.table_id,
-                other_partition.id,
-                Uuid::new_v4(),
-                SequenceNumber::new(45),
-                SequenceNumber::new(200),
-                min_time,
-                max_time,
-                1339,
-                b"md3".to_vec(),
-                0,
-                0,
-                Timestamp::new(1),
-            )
-            .await
-            .unwrap();
+        let other_params = ParquetFileParams {
+            table_id: other_partition.table_id,
+            partition_id: other_partition.id,
+            object_store_id: Uuid::new_v4(),
+            min_sequence_number: SequenceNumber::new(45),
+            max_sequence_number: SequenceNumber::new(200),
+            ..parquet_file_params.clone()
+        };
+        let other_file = repos.parquet_files().create(other_params).await.unwrap();
 
         let exist_id = parquet_file.id;
         let non_exist_id = ParquetFileId::new(other_file.id.get() + 10);

@@ -1404,44 +1358,24 @@ pub(crate) mod test_helpers {
             .await
             .unwrap();
         assert_eq!(Vec::<ParquetFile>::new(), files);
+
+        let f1_params = ParquetFileParams {
+            table_id: partition2.table_id,
+            partition_id: partition2.id,
+            object_store_id: Uuid::new_v4(),
+            ..parquet_file_params
+        };
         let f1 = repos
             .parquet_files()
-            .create(
-                sequencer.id,
-                partition2.table_id,
-                partition2.id,
-                Uuid::new_v4(),
-                SequenceNumber::new(10),
-                SequenceNumber::new(140),
-                min_time,
-                max_time,
-                1337,
-                b"md4".to_vec(),
-                0,
-                0,
-                Timestamp::new(1),
-            )
-            .await
-            .unwrap();
-        let f2 = repos
-            .parquet_files()
-            .create(
-                sequencer.id,
-                partition2.table_id,
-                partition2.id,
-                Uuid::new_v4(),
-                SequenceNumber::new(10),
-                SequenceNumber::new(140),
-                min_time,
-                max_time,
-                1337,
-                b"md4".to_vec(),
-                0,
-                0,
-                Timestamp::new(1),
-            )
+            .create(f1_params.clone())
             .await
             .unwrap();
+
+        let f2_params = ParquetFileParams {
+            object_store_id: Uuid::new_v4(),
+            ..f1_params
+        };
+        let f2 = repos.parquet_files().create(f2_params).await.unwrap();
         let files = repos
             .parquet_files()
             .list_by_namespace_not_to_delete(namespace2.id)

@@ -1533,9 +1467,8 @@ pub(crate) mod test_helpers {
             .await
             .unwrap();
 
-        // Prepare metadata in form of ParquetFile to get added with tombstone
-        let parquet = ParquetFile {
-            id: ParquetFileId::new(0), //fake id that will never be used
+        // Prepare metadata in form of ParquetFileParams to get added with tombstone
+        let parquet = ParquetFileParams {
             sequencer_id: sequencer.id,
             table_id: table.id,
             partition_id: partition.id,

@@ -1544,46 +1477,23 @@ pub(crate) mod test_helpers {
             max_sequence_number: SequenceNumber::new(10),
             min_time,
             max_time,
-            to_delete: false,
             file_size_bytes: 1337,
             parquet_metadata: b"md1".to_vec(),
             row_count: 0,
             compaction_level: 0,
             created_at: Timestamp::new(1),
         };
-        let other_parquet = ParquetFile {
-            id: ParquetFileId::new(0), //fake id that will never be used
-            sequencer_id: sequencer.id,
-            table_id: table.id,
-            partition_id: partition.id,
+        let other_parquet = ParquetFileParams {
             object_store_id: Uuid::new_v4(),
             min_sequence_number: SequenceNumber::new(11),
             max_sequence_number: SequenceNumber::new(20),
-            min_time,
-            max_time,
-            to_delete: false,
-            file_size_bytes: 1338,
-            parquet_metadata: b"md2".to_vec(),
-            row_count: 0,
-            compaction_level: 0,
-            created_at: Timestamp::new(1),
+            ..parquet.clone()
         };
-        let another_parquet = ParquetFile {
-            id: ParquetFileId::new(0), //fake id that will never be used
-            sequencer_id: sequencer.id,
-            table_id: table.id,
-            partition_id: partition.id,
+        let another_parquet = ParquetFileParams {
             object_store_id: Uuid::new_v4(),
             min_sequence_number: SequenceNumber::new(21),
             max_sequence_number: SequenceNumber::new(30),
-            min_time,
-            max_time,
-            to_delete: false,
-            file_size_bytes: 1339,
-            parquet_metadata: b"md3".to_vec(),
-            row_count: 0,
-            compaction_level: 0,
-            created_at: Timestamp::new(1),
+            ..parquet.clone()
         };
 
         let parquet_file_count_before = txn.parquet_files().count().await.unwrap();

@@ -1592,10 +1502,13 @@ pub(crate) mod test_helpers {
 
         // Add parquet and processed tombstone in one transaction
         let mut txn = catalog.start_transaction().await.unwrap();
-        let (parquet_file, p_tombstones) =
-            add_parquet_file_with_tombstones(&parquet, &[t1.clone(), t2.clone()], txn.deref_mut())
-                .await
-                .unwrap();
+        let (parquet_file, p_tombstones) = add_parquet_file_with_tombstones(
+            parquet.clone(),
+            &[t1.clone(), t2.clone()],
+            txn.deref_mut(),
+        )
+        .await
+        .unwrap();
         txn.commit().await.unwrap();
         assert_eq!(p_tombstones.len(), 2);
         assert_eq!(t1.id, p_tombstones[0].tombstone_id);

@@ -1625,7 +1538,7 @@ pub(crate) mod test_helpers {
 
         // Error due to duplicate parquet file
         let mut txn = catalog.start_transaction().await.unwrap();
-        add_parquet_file_with_tombstones(&parquet, &[t3.clone(), t1.clone()], txn.deref_mut())
+        add_parquet_file_with_tombstones(parquet, &[t3.clone(), t1.clone()], txn.deref_mut())
             .await
             .unwrap_err();
         txn.abort().await.unwrap();

@@ -1640,7 +1553,7 @@ pub(crate) mod test_helpers {
 
         // Add new parquet and new tombstone. Should go trhough
         let (parquet_file, p_tombstones) =
-            add_parquet_file_with_tombstones(&other_parquet, &[t3.clone()], txn.deref_mut())
+            add_parquet_file_with_tombstones(other_parquet, &[t3.clone()], txn.deref_mut())
                 .await
                 .unwrap();
         assert_eq!(p_tombstones.len(), 1);

@@ -1664,7 +1577,7 @@ pub(crate) mod test_helpers {
         let mut txn = catalog.start_transaction().await.unwrap();
         let mut t4 = t3.clone();
         t4.id = TombstoneId::new(t4.id.get() + 10);
-        add_parquet_file_with_tombstones(&another_parquet, &[t4], txn.deref_mut())
+        add_parquet_file_with_tombstones(another_parquet, &[t4], txn.deref_mut())
             .await
             .unwrap_err();
         txn.abort().await.unwrap();

@@ -13,8 +13,8 @@
 
 use crate::interface::{Error, Result, Transaction};
 use data_types2::{
-    ColumnType, KafkaPartition, KafkaTopic, NamespaceSchema, ParquetFile, ProcessedTombstone,
-    QueryPool, Sequencer, SequencerId, TableSchema, Tombstone,
+    ColumnType, KafkaPartition, KafkaTopic, NamespaceSchema, ParquetFile, ParquetFileParams,
+    ProcessedTombstone, QueryPool, Sequencer, SequencerId, TableSchema, Tombstone,
 };
 use interface::{ColumnUpsertRequest, RepoCollection};
 use mutable_batch::MutableBatch;

@@ -196,29 +196,12 @@ pub async fn create_or_get_default_records(
 // TODO: this function is no longer needed in the ingester. It might be needed by the Compactor
 /// Insert the conpacted parquet file and its tombstones
 pub async fn add_parquet_file_with_tombstones(
-    parquet_file: &ParquetFile,
+    parquet_file: ParquetFileParams,
     tombstones: &[Tombstone],
     txn: &mut dyn Transaction,
 ) -> Result<(ParquetFile, Vec<ProcessedTombstone>), Error> {
     // create a parquet file in the catalog first
-    let parquet = txn
-        .parquet_files()
-        .create(
-            parquet_file.sequencer_id,
-            parquet_file.table_id,
-            parquet_file.partition_id,
-            parquet_file.object_store_id,
-            parquet_file.min_sequence_number,
-            parquet_file.max_sequence_number,
-            parquet_file.min_time,
-            parquet_file.max_time,
-            parquet_file.file_size_bytes,
-            parquet_file.parquet_metadata.clone(),
-            parquet_file.row_count,
-            parquet_file.compaction_level,
-            parquet_file.created_at,
-        )
-        .await?;
+    let parquet = txn.parquet_files().create(parquet_file).await?;
 
     // Now the parquet available, create its processed tombstones
     let mut processed_tombstones = Vec::with_capacity(tombstones.len());

@@ -13,16 +13,15 @@ use crate::{
 use async_trait::async_trait;
 use data_types2::{
     Column, ColumnId, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace, NamespaceId,
-    ParquetFile, ParquetFileId, Partition, PartitionId, PartitionInfo, ProcessedTombstone,
-    QueryPool, QueryPoolId, SequenceNumber, Sequencer, SequencerId, Table, TableId, Timestamp,
-    Tombstone, TombstoneId,
+    ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionInfo,
+    ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer, SequencerId, Table,
+    TableId, Timestamp, Tombstone, TombstoneId,
 };
 use observability_deps::tracing::warn;
 use std::fmt::Formatter;
 use std::sync::Arc;
 use std::{collections::HashSet, convert::TryFrom};
 use tokio::sync::{Mutex, OwnedMutexGuard};
-use uuid::Uuid;
 
 /// In-memory catalog that implements the `RepoCollection` and individual repo traits from
 /// the catalog interface.

@@ -750,24 +749,25 @@ impl TombstoneRepo for MemTxn {
 
 #[async_trait]
 impl ParquetFileRepo for MemTxn {
-    async fn create(
-        &mut self,
-        sequencer_id: SequencerId,
-        table_id: TableId,
-        partition_id: PartitionId,
-        object_store_id: Uuid,
-        min_sequence_number: SequenceNumber,
-        max_sequence_number: SequenceNumber,
-        min_time: Timestamp,
-        max_time: Timestamp,
-        file_size_bytes: i64,
-        parquet_metadata: Vec<u8>,
-        row_count: i64,
-        level: i16,
-        created_at: Timestamp,
-    ) -> Result<ParquetFile> {
+    async fn create(&mut self, parquet_file_params: ParquetFileParams) -> Result<ParquetFile> {
         let stage = self.stage();
 
+        let ParquetFileParams {
+            sequencer_id,
+            table_id,
+            partition_id,
+            object_store_id,
+            min_sequence_number,
+            max_sequence_number,
+            min_time,
+            max_time,
+            file_size_bytes,
+            parquet_metadata,
+            row_count,
+            compaction_level,
+            created_at,
+        } = parquet_file_params;
+
         if stage
             .parquet_files
             .iter()

@@ -790,7 +790,7 @@ impl ParquetFileRepo for MemTxn {
             to_delete: false,
             file_size_bytes,
             parquet_metadata,
-            compaction_level: level,
+            compaction_level,
             created_at,
         };
         stage.parquet_files.push(parquet_file);

@@ -8,14 +8,13 @@ use crate::interface::{
 use async_trait::async_trait;
 use data_types2::{
     Column, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace, NamespaceId,
-    ParquetFile, ParquetFileId, Partition, PartitionId, PartitionInfo, ProcessedTombstone,
-    QueryPool, QueryPoolId, SequenceNumber, Sequencer, SequencerId, Table, TableId, Timestamp,
-    Tombstone, TombstoneId,
+    ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionInfo,
+    ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer, SequencerId, Table,
+    TableId, Timestamp, Tombstone, TombstoneId,
 };
 use metric::{Metric, U64Histogram, U64HistogramOptions};
 use std::{fmt::Debug, sync::Arc};
 use time::{SystemProvider, TimeProvider};
-use uuid::Uuid;
 
 /// Decorates a implementation of the catalog's [`RepoCollection`] (and the
 /// transactional variant) with instrumentation that emits latency histograms

@@ -259,7 +258,7 @@ decorate!(
 decorate!(
     impl_trait = ParquetFileRepo,
     methods = [
-        "parquet_create" = create( &mut self, sequencer_id: SequencerId, table_id: TableId, partition_id: PartitionId, object_store_id: Uuid, min_sequence_number: SequenceNumber, max_sequence_number: SequenceNumber, min_time: Timestamp, max_time: Timestamp, file_size_bytes: i64, parquet_metadata: Vec<u8>, row_count: i64, level: i16, created_at: Timestamp) -> Result<ParquetFile>;
+        "parquet_create" = create( &mut self, parquet_file_params: ParquetFileParams) -> Result<ParquetFile>;
         "parquet_flag_for_delete" = flag_for_delete(&mut self, id: ParquetFileId) -> Result<()>;
         "parquet_list_by_sequencer_greater_than" = list_by_sequencer_greater_than(&mut self, sequencer_id: SequencerId, sequence_number: SequenceNumber) -> Result<Vec<ParquetFile>>;
         "parquet_list_by_namespace_not_to_delete" = list_by_namespace_not_to_delete(&mut self, namespace_id: NamespaceId) -> Result<Vec<ParquetFile>>;

@@ -12,15 +12,14 @@ use crate::{
 use async_trait::async_trait;
 use data_types2::{
     Column, ColumnType, KafkaPartition, KafkaTopic, KafkaTopicId, Namespace, NamespaceId,
-    ParquetFile, ParquetFileId, Partition, PartitionId, PartitionInfo, ProcessedTombstone,
-    QueryPool, QueryPoolId, SequenceNumber, Sequencer, SequencerId, Table, TableId, Timestamp,
-    Tombstone, TombstoneId,
+    ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionInfo,
+    ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Sequencer, SequencerId, Table,
+    TableId, Timestamp, Tombstone, TombstoneId,
 };
 use observability_deps::tracing::{info, warn};
 use sqlx::{migrate::Migrator, postgres::PgPoolOptions, Acquire, Executor, Postgres, Row};
 use sqlx_hotswap_pool::HotSwapPool;
 use std::{sync::Arc, time::Duration};
-use uuid::Uuid;
 
 const CONNECT_TIMEOUT: Duration = Duration::from_secs(2);
 const IDLE_TIMEOUT: Duration = Duration::from_secs(500);

@@ -1159,22 +1158,23 @@ WHERE table_name.namespace_id = $1;
 
 #[async_trait]
 impl ParquetFileRepo for PostgresTxn {
-    async fn create(
-        &mut self,
-        sequencer_id: SequencerId,
-        table_id: TableId,
-        partition_id: PartitionId,
-        object_store_id: Uuid,
-        min_sequence_number: SequenceNumber,
-        max_sequence_number: SequenceNumber,
-        min_time: Timestamp,
-        max_time: Timestamp,
-        file_size_bytes: i64,
-        parquet_metadata: Vec<u8>,
-        row_count: i64,
-        level: i16,
-        created_at: Timestamp,
-    ) -> Result<ParquetFile> {
+    async fn create(&mut self, parquet_file_params: ParquetFileParams) -> Result<ParquetFile> {
+        let ParquetFileParams {
+            sequencer_id,
+            table_id,
+            partition_id,
+            object_store_id,
+            min_sequence_number,
+            max_sequence_number,
+            min_time,
+            max_time,
+            file_size_bytes,
+            parquet_metadata,
+            row_count,
+            compaction_level,
+            created_at,
+        } = parquet_file_params;
+
         let rec = sqlx::query_as::<_, ParquetFile>(
             r#"
INSERT INTO parquet_file ( sequencer_id, table_id, partition_id, object_store_id, min_sequence_number, max_sequence_number, min_time, max_time, to_delete, file_size_bytes, parquet_metadata, row_count, compaction_level, created_at )

@@ -1193,7 +1193,7 @@ RETURNING *
         .bind(file_size_bytes) // $9
         .bind(parquet_metadata) // $10
         .bind(row_count) // $11
-        .bind(level) // $12
+        .bind(compaction_level) // $12
         .bind(created_at) // $13
         .fetch_one(&mut self.inner)
         .await

@@ -3,8 +3,9 @@
 use arrow::record_batch::RecordBatch;
 use bytes::Bytes;
 use data_types2::{
-    ColumnType, InfluxDbType, KafkaPartition, KafkaTopic, Namespace, ParquetFile, Partition,
-    QueryPool, SequenceNumber, Sequencer, Table, Timestamp, Tombstone,
+    ColumnType, InfluxDbType, KafkaPartition, KafkaTopic, Namespace, ParquetFile,
+    ParquetFileParams, Partition, QueryPool, SequenceNumber, Sequencer, Table, Timestamp,
+    Tombstone,
 };
 use iox_catalog::{interface::Catalog, mem::MemCatalog};
 use iox_object_store::{IoxObjectStore, ParquetFilePath};

@@ -302,23 +303,24 @@ impl TestPartition {
         })
         .unwrap();
 
+        let parquet_file_params = ParquetFileParams {
+            sequencer_id: self.sequencer.sequencer.id,
+            table_id: self.table.table.id,
+            partition_id: self.partition.id,
+            object_store_id,
+            min_sequence_number,
+            max_sequence_number,
+            min_time: Timestamp::new(ts_min_max.min),
+            max_time: Timestamp::new(ts_min_max.max),
+            file_size_bytes: file_size_bytes as i64,
+            parquet_metadata: parquet_metadata_bin,
+            row_count: row_count as i64,
+            compaction_level: 0,
+            created_at: Timestamp::new(1),
+        };
         let parquet_file = repos
             .parquet_files()
-            .create(
-                self.sequencer.sequencer.id,
-                self.table.table.id,
-                self.partition.id,
-                object_store_id,
-                min_sequence_number,
-                max_sequence_number,
-                Timestamp::new(ts_min_max.min),
-                Timestamp::new(ts_min_max.max),
-                file_size_bytes as i64,
-                parquet_metadata_bin,
-                row_count as i64,
-                0,
-                Timestamp::new(1),
-            )
+            .create(parquet_file_params)
             .await
             .unwrap();
 

@@ -91,8 +91,8 @@ use data_types::{
     partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics},
 };
 use data_types2::{
-    NamespaceId, ParquetFile, ParquetFileId, PartitionId, SequenceNumber, SequencerId, TableId,
-    Timestamp, INITIAL_COMPACTION_LEVEL,
+    NamespaceId, ParquetFileParams, PartitionId, SequenceNumber, SequencerId, TableId, Timestamp,
+    INITIAL_COMPACTION_LEVEL,
 };
 use generated_types::influxdata::iox::{
     ingester::v1 as proto, preserved_catalog::v1 as preserved_catalog,

@@ -635,9 +635,8 @@ impl IoxMetadata {
         &self,
         file_size_bytes: usize,
         metadata: &IoxParquetMetaData,
-    ) -> ParquetFile {
-        ParquetFile {
-            id: ParquetFileId::new(0), // this will be created in the DB. This 0 won't be used anywhere
+    ) -> ParquetFileParams {
+        ParquetFileParams {
             sequencer_id: self.sequencer_id,
             table_id: self.table_id,
             partition_id: self.partition_id,

@@ -646,7 +645,6 @@ impl IoxMetadata {
             max_sequence_number: self.max_sequence_number,
             min_time: Timestamp::new(self.time_of_first_write.timestamp_nanos()),
             max_time: Timestamp::new(self.time_of_last_write.timestamp_nanos()),
-            to_delete: false,
             file_size_bytes: file_size_bytes as i64,
             parquet_metadata: metadata.thrift_bytes().to_vec(),
             row_count: self.row_count,