feat: add method to get table persistence information from catalog (#3848)

pull/24376/head
Paul Dix 2022-02-24 11:18:14 -05:00 committed by GitHub
parent 49d1be30e7
commit 8571c132cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 231 additions and 14 deletions

View File

@ -467,6 +467,27 @@ pub trait TableRepo: Send + Sync {
/// Lists all tables in the catalog for the given namespace id.
async fn list_by_namespace_id(&mut self, namespace_id: NamespaceId) -> Result<Vec<Table>>;
/// Gets the persistence info for the table named `table_name` in `namespace_id`,
/// restricted to the given sequencer.
///
/// Returns `Ok(None)` when no table with that name exists in the namespace.
async fn get_table_persist_info(
&mut self,
sequencer_id: SequencerId,
namespace_id: NamespaceId,
table_name: &str,
) -> Result<Option<TablePersistInfo>>;
}
/// Persistence information for one table on one sequencer, as recorded in the catalog.
///
/// Each maximum is `None` when nothing of that kind has been recorded for the
/// table/sequencer pair yet.
#[derive(Debug, Copy, Clone, Eq, PartialEq, sqlx::FromRow)]
pub struct TablePersistInfo {
/// sequencer the sequence numbers are associated with
pub sequencer_id: SequencerId,
/// the global identifier for the table
pub table_id: TableId,
/// highest `max_sequence_number` across this table's parquet_files for this sequencer
pub parquet_max_sequence_number: Option<SequenceNumber>,
/// highest sequence number across this table's tombstones for this sequencer
pub tombstone_max_sequence_number: Option<SequenceNumber>,
}
/// Functions for working with columns in the catalog
@ -1197,7 +1218,7 @@ pub(crate) mod test_helpers {
.list_by_namespace_id(namespace.id)
.await
.unwrap();
assert_eq!(vec![t], tables);
assert_eq!(vec![t.clone()], tables);
// test we can create a table of the same name in a different namespace
let namespace2 = repos
@ -1213,6 +1234,130 @@ pub(crate) mod test_helpers {
.unwrap();
assert_ne!(tt, test_table);
assert_eq!(test_table.namespace_id, namespace2.id);
// test we can get table persistence info with no persistence so far
let seq = repos
.sequencers()
.create_or_get(&kafka, KafkaPartition::new(555))
.await
.unwrap();
let ti = repos
.tables()
.get_table_persist_info(seq.id, t.namespace_id, &t.name)
.await
.unwrap()
.unwrap();
assert_eq!(
ti,
TablePersistInfo {
sequencer_id: seq.id,
table_id: t.id,
parquet_max_sequence_number: None,
tombstone_max_sequence_number: None
}
);
// and now with a parquet file persisted
let partition = repos
.partitions()
.create_or_get("1970-01-01", seq.id, t.id)
.await
.unwrap();
let p1 = repos
.parquet_files()
.create(
seq.id,
t.id,
partition.id,
Uuid::new_v4(),
SequenceNumber::new(10),
SequenceNumber::new(513),
Timestamp::new(1),
Timestamp::new(2),
0,
vec![],
0,
)
.await
.unwrap();
let ti = repos
.tables()
.get_table_persist_info(seq.id, t.namespace_id, &t.name)
.await
.unwrap()
.unwrap();
assert_eq!(
ti,
TablePersistInfo {
sequencer_id: seq.id,
table_id: t.id,
parquet_max_sequence_number: Some(p1.max_sequence_number),
tombstone_max_sequence_number: None
}
);
// and with another parquet file persisted
let p1 = repos
.parquet_files()
.create(
seq.id,
t.id,
partition.id,
Uuid::new_v4(),
SequenceNumber::new(514),
SequenceNumber::new(1008),
Timestamp::new(1),
Timestamp::new(2),
0,
vec![],
0,
)
.await
.unwrap();
let ti = repos
.tables()
.get_table_persist_info(seq.id, t.namespace_id, &t.name)
.await
.unwrap()
.unwrap();
assert_eq!(
ti,
TablePersistInfo {
sequencer_id: seq.id,
table_id: t.id,
parquet_max_sequence_number: Some(p1.max_sequence_number),
tombstone_max_sequence_number: None
}
);
// and now with a tombstone persisted
let tombstone = repos
.tombstones()
.create_or_get(
t.id,
seq.id,
SequenceNumber::new(2001),
Timestamp::new(1),
Timestamp::new(10),
"wahtevs",
)
.await
.unwrap();
let ti = repos
.tables()
.get_table_persist_info(seq.id, t.namespace_id, &t.name)
.await
.unwrap()
.unwrap();
assert_eq!(
ti,
TablePersistInfo {
sequencer_id: seq.id,
table_id: t.id,
parquet_max_sequence_number: Some(p1.max_sequence_number),
tombstone_max_sequence_number: Some(tombstone.sequence_number),
}
);
}
async fn test_column(catalog: Arc<dyn Catalog>) {
@ -1542,10 +1687,6 @@ pub(crate) mod test_helpers {
let min_time = Timestamp::new(1);
let max_time = Timestamp::new(10);
// Must have no parquet file records
let num_parquet_files = repos.parquet_files().count().await.unwrap();
assert_eq!(num_parquet_files, 0);
let parquet_file = repos
.parquet_files()
.create(
@ -1602,10 +1743,6 @@ pub(crate) mod test_helpers {
.await
.unwrap();
// Must have 2 parquet files
let num_parquet_files = repos.parquet_files().count().await.unwrap();
assert_eq!(num_parquet_files, 2);
let exist_id = parquet_file.id;
let non_exist_id = ParquetFileId::new(other_file.id.get() + 10);
// make sure exists_id != non_exist_id

View File

@ -7,8 +7,8 @@ use crate::interface::{
NamespaceRepo, ParquetFile, ParquetFileId, ParquetFileRepo, Partition, PartitionId,
PartitionInfo, PartitionRepo, ProcessedTombstone, ProcessedTombstoneRepo, QueryPool,
QueryPoolId, QueryPoolRepo, RepoCollection, Result, SequenceNumber, Sequencer, SequencerId,
SequencerRepo, Table, TableId, TableRepo, Timestamp, Tombstone, TombstoneId, TombstoneRepo,
Transaction,
SequencerRepo, Table, TableId, TablePersistInfo, TableRepo, Timestamp, Tombstone, TombstoneId,
TombstoneRepo, Transaction,
};
use async_trait::async_trait;
use observability_deps::tracing::warn;
@ -349,6 +349,43 @@ impl TableRepo for MemTxn {
.collect();
Ok(tables)
}
/// Reports the highest persisted sequence numbers for the table called `table_name`
/// in `namespace_id`, considering only records that belong to `sequencer_id`.
///
/// Yields `Ok(None)` when the staged state holds no such table. Each maximum in the
/// returned info is `None` when no matching parquet file / tombstone exists.
async fn get_table_persist_info(
    &mut self,
    sequencer_id: SequencerId,
    namespace_id: NamespaceId,
    table_name: &str,
) -> Result<Option<TablePersistInfo>> {
    let stage = self.stage();

    // Resolve the table first; without it there is nothing to report.
    let table_id = match stage
        .tables
        .iter()
        .find(|candidate| candidate.namespace_id == namespace_id && candidate.name == table_name)
    {
        Some(found) => found.id,
        None => return Ok(None),
    };

    // Largest max_sequence_number over the parquet files of this table + sequencer.
    let parquet_max_sequence_number = stage
        .parquet_files
        .iter()
        .filter(|file| file.table_id == table_id && file.sequencer_id == sequencer_id)
        .map(|file| file.max_sequence_number)
        .max();

    // Largest sequence_number over the tombstones of this table + sequencer.
    let tombstone_max_sequence_number = stage
        .tombstones
        .iter()
        .filter(|tombstone| tombstone.table_id == table_id && tombstone.sequencer_id == sequencer_id)
        .map(|tombstone| tombstone.sequence_number)
        .max();

    Ok(Some(TablePersistInfo {
        sequencer_id,
        table_id,
        parquet_max_sequence_number,
        tombstone_max_sequence_number,
    }))
}
}
#[async_trait]

View File

@ -14,7 +14,7 @@ use crate::{
ParquetFileId, ParquetFileRepo, Partition, PartitionId, PartitionInfo, PartitionRepo,
ProcessedTombstone, ProcessedTombstoneRepo, QueryPool, QueryPoolId, QueryPoolRepo,
RepoCollection, SequenceNumber, Sequencer, SequencerId, SequencerRepo, Table, TableId,
TableRepo, Timestamp, Tombstone, TombstoneId, TombstoneRepo,
TablePersistInfo, TableRepo, Timestamp, Tombstone, TombstoneId, TombstoneRepo,
},
Result,
};
@ -190,6 +190,7 @@ decorate!(
"table_create_or_get" = create_or_get(&mut self, name: &str, namespace_id: NamespaceId) -> Result<Table>;
"table_get_by_id" = get_by_id(&mut self, table_id: TableId) -> Result<Option<Table>>;
"table_list_by_namespace_id" = list_by_namespace_id(&mut self, namespace_id: NamespaceId) -> Result<Vec<Table>>;
"get_table_persist_info" = get_table_persist_info(&mut self, sequencer_id: SequencerId, namespace_id: NamespaceId, table_name: &str) -> Result<Option<TablePersistInfo>>;
]
);

View File

@ -7,8 +7,8 @@ use crate::{
NamespaceRepo, ParquetFile, ParquetFileId, ParquetFileRepo, Partition, PartitionId,
PartitionInfo, PartitionRepo, ProcessedTombstone, ProcessedTombstoneRepo, QueryPool,
QueryPoolId, QueryPoolRepo, RepoCollection, Result, SequenceNumber, Sequencer, SequencerId,
SequencerRepo, Table, TableId, TableRepo, Timestamp, Tombstone, TombstoneId, TombstoneRepo,
Transaction,
SequencerRepo, Table, TableId, TablePersistInfo, TableRepo, Timestamp, Tombstone,
TombstoneId, TombstoneRepo, Transaction,
},
metrics::MetricDecorator,
};
@ -626,6 +626,48 @@ WHERE namespace_id = $1;
Ok(rec)
}
async fn get_table_persist_info(
&mut self,
sequencer_id: SequencerId,
namespace_id: NamespaceId,
table_name: &str,
) -> Result<Option<TablePersistInfo>> {
let rec = sqlx::query_as::<_, TablePersistInfo>(
r#"
WITH tid as (SELECT id FROM table_name WHERE name = $2 AND namespace_id = $3)
SELECT $1 as sequencer_id, id as table_id, parquet_file.max_sequence_number AS parquet_max_sequence_number,
tombstone.sequence_number as tombstone_max_sequence_number
FROM tid
LEFT JOIN (
SELECT tombstone.table_id, sequence_number
FROM tombstone
WHERE sequencer_id = $1 AND tombstone.table_id = (SELECT id FROM tid)
ORDER BY sequence_number DESC
LIMIT 1
) tombstone ON tombstone.table_id = tid.id
LEFT JOIN (
SELECT parquet_file.table_id, max_sequence_number
FROM parquet_file
WHERE parquet_file.sequencer_id = $1 AND parquet_file.table_id = (SELECT id from tid)
ORDER BY max_sequence_number DESC
LIMIT 1
) parquet_file ON parquet_file.table_id = tid.id;
"#)
.bind(&sequencer_id) // $1
.bind(&table_name) // $2
.bind(&namespace_id) // $3
.fetch_one(&mut self.inner)
.await;
if let Err(sqlx::Error::RowNotFound) = rec {
return Ok(None);
}
let info = rec.map_err(|e| Error::SqlxError { source: e })?;
Ok(Some(info))
}
}
#[async_trait]