fix: ingester replay logic (#4212)
Fix the ingester to track the max persisted sequence number per partition. Ensure replay takes in data from unpersisted partitions. Simplify the table persist info so it no longer returns a max persisted sequence number for the table, as that information isn't needed.
parent f1799d836f
commit 81d41f81a1
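
A note on the core of the fix before the diff: the persisted-sequence-number watermark moves from the table level down to each partition, so replay skips a write only when the *target* partition has already persisted it. Below is a minimal sketch of that decision, using simplified stand-in types (hypothetical, for illustration only) rather than the real ingester structs:

```rust
use std::collections::BTreeMap;

// Stand-ins for the ingester types touched by this commit.
#[derive(Clone, Copy, PartialEq, PartialOrd, Debug)]
struct SequenceNumber(i64);

#[derive(Default)]
struct PartitionBuffer {
    // Highest sequence number already persisted to parquet for this
    // partition; None if nothing has been persisted yet.
    max_persisted_sequence_number: Option<SequenceNumber>,
}

struct Table {
    partitions: BTreeMap<String, PartitionBuffer>,
}

impl Table {
    // Replay decision, per partition: buffer the write unless this exact
    // partition already persisted data at or beyond its sequence number.
    fn should_buffer(&self, partition_key: &str, seq: SequenceNumber) -> bool {
        match self
            .partitions
            .get(partition_key)
            .and_then(|p| p.max_persisted_sequence_number)
        {
            Some(max) => seq > max,
            None => true, // unpersisted partition: always take the write
        }
    }
}

fn main() {
    let mut table = Table { partitions: BTreeMap::new() };
    table.partitions.insert(
        "1970-01-01".into(),
        PartitionBuffer { max_persisted_sequence_number: Some(SequenceNumber(15)) },
    );
    // A replayed write with sequence 2 into a different, unpersisted
    // partition must be kept...
    assert!(table.should_buffer("1970-01-02", SequenceNumber(2)));
    // ...while the same sequence into the persisted partition is a
    // duplicate and is skipped.
    assert!(!table.should_buffer("1970-01-01", SequenceNumber(2)));
}
```

Under the old table-level watermark, a file persisted from one partition with a high max_sequence_number would suppress replayed writes destined for a different, still-unpersisted partition; that is the bug the diff below fixes and the new two-partition test at the end exercises.
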
@@ -257,7 +257,7 @@ impl Persister for IngesterData {
         // and remove the persisted data from memory
         namespace
-            .mark_persisted_and_remove_if_empty(
+            .mark_persisted(
                 &partition_info.table_name,
                 &partition_info.partition.partition_key,
                 iox_meta.max_sequence_number,
@@ -572,7 +572,6 @@ impl NamespaceData {
             Entry::Vacant(v) => {
                 let v = v.insert(Arc::new(tokio::sync::RwLock::new(TableData::new(
                     info.table_id,
-                    info.parquet_max_sequence_number,
                     info.tombstone_max_sequence_number,
                 ))));
                 self.table_count.inc(1);
@@ -584,10 +583,10 @@ impl NamespaceData {
         Ok(data)
     }
 
-    /// Walks down the table and partition and clears the persisting batch. If there is no
-    /// data buffered in the partition, it is removed. The sequence number is the max_sequence_number
-    /// for the persisted parquet file, which should be kept in the table data buffer.
-    async fn mark_persisted_and_remove_if_empty(
+    /// Walks down the table and partition and clears the persisting batch. The sequence number is
+    /// the max_sequence_number for the persisted parquet file, which should be kept in the table
+    /// data buffer.
+    async fn mark_persisted(
         &self,
         table_name: &str,
         partition_key: &str,
@@ -598,19 +597,11 @@ impl NamespaceData {
             let partition = t.partition_data.get_mut(partition_key);
 
             if let Some(p) = partition {
+                p.data.max_persisted_sequence_number = Some(sequence_number);
                 p.data.persisting = None;
                 // clear the deletes kept for this persisting batch
                 p.data.deletes_during_persisting.clear();
-                if p.data.is_empty() {
-                    t.partition_data.remove(partition_key);
-                }
             }
-
-            let max_sequence_number = t
-                .parquet_max_sequence_number
-                .unwrap_or(sequence_number)
-                .max(sequence_number);
-            t.parquet_max_sequence_number = Some(max_sequence_number);
         }
     }
 }
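
Note that `mark_persisted` no longer drops an empty partition: the partition entry now carries the watermark that later replay checks consult, so removing it would lose that information. For the same reason, the table-level `parquet_max_sequence_number` bookkeeping at the end of the old function goes away entirely.
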
@@ -619,8 +610,6 @@ impl NamespaceData {
 #[derive(Debug)]
 pub(crate) struct TableData {
     table_id: TableId,
-    // the max sequence number persisted for this table
-    parquet_max_sequence_number: Option<SequenceNumber>,
     // the max sequence number for a tombstone associated with this table
     tombstone_max_sequence_number: Option<SequenceNumber>,
     // Map pf partition key to its data
@@ -629,14 +618,9 @@ pub(crate) struct TableData {
 
 impl TableData {
     /// Initialize new table buffer
-    pub fn new(
-        table_id: TableId,
-        parquet_max_sequence_number: Option<SequenceNumber>,
-        tombstone_max_sequence_number: Option<SequenceNumber>,
-    ) -> Self {
+    pub fn new(table_id: TableId, tombstone_max_sequence_number: Option<SequenceNumber>) -> Self {
         Self {
             table_id,
-            parquet_max_sequence_number,
             tombstone_max_sequence_number,
             partition_data: Default::default(),
         }
@@ -646,13 +630,11 @@ impl TableData {
     #[cfg(test)]
     pub fn new_for_test(
         table_id: TableId,
-        parquet_max_sequence_number: Option<SequenceNumber>,
         tombstone_max_sequence_number: Option<SequenceNumber>,
         partitions: BTreeMap<String, PartitionData>,
     ) -> Self {
         Self {
             table_id,
-            parquet_max_sequence_number,
             tombstone_max_sequence_number,
             partition_data: partitions,
         }
@@ -660,7 +642,11 @@ impl TableData {
 
     /// Return parquet_max_sequence_number
     pub fn parquet_max_sequence_number(&self) -> Option<SequenceNumber> {
-        self.parquet_max_sequence_number
+        self.partition_data
+            .values()
+            .map(|p| p.data.max_persisted_sequence_number)
+            .max()
+            .flatten()
     }
 
     /// Return tombstone_max_sequence_number
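
The table-level watermark is now derived on demand from the partitions. Each partition contributes an Option<SequenceNumber>, so max() over the iterator yields Option<Option<SequenceNumber>> and the trailing flatten() collapses the nesting. A self-contained illustration of the idiom with plain integers:

```rust
fn main() {
    // Per-partition watermarks; None means "nothing persisted yet".
    let watermarks: Vec<Option<i64>> = vec![Some(3), None, Some(7)];

    // Option<T> orders None below any Some(_), so a None entry never wins,
    // and max() over an Option-valued iterator returns Option<Option<i64>>.
    let table_max = watermarks.iter().copied().max().flatten();
    assert_eq!(table_max, Some(7));

    // With no partitions at all the result is None, matching a table
    // that has never persisted anything.
    let empty: Vec<Option<i64>> = vec![];
    assert_eq!(empty.iter().copied().max().flatten(), None);
}
```
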
@@ -678,12 +664,6 @@ impl TableData {
         catalog: &dyn Catalog,
         lifecycle_handle: &LifecycleHandle,
     ) -> Result<bool> {
-        if let Some(max) = self.parquet_max_sequence_number {
-            if sequence_number <= max {
-                return Ok(false);
-            }
-        }
-
         let (_, col) = batch
             .columns()
             .find(|(name, _)| *name == TIME_COLUMN_NAME)
@@ -708,6 +688,13 @@ impl TableData {
             }
         };
 
+        // skip the write if it has already been persisted
+        if let Some(max) = partition_data.data.max_persisted_sequence_number {
+            if max >= sequence_number {
+                return Ok(false);
+            }
+        }
+
         let should_pause = lifecycle_handle.log_write(
             partition_data.id,
             sequencer_id,
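
The persisted check now runs after the partition lookup, so it compares against the watermark of the partition the write actually targets; max >= sequence_number skips only genuine duplicates during replay.
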
@@ -787,7 +774,20 @@ impl TableData {
             .create_or_get(partition_key, sequencer_id, self.table_id)
             .await
             .context(CatalogSnafu)?;
-        let data = PartitionData::new(partition.id);
+
+        // get info on the persisted parquet files to use later for replay or for snapshot
+        // information on query.
+        let files = repos
+            .parquet_files()
+            .list_by_partition_not_to_delete(partition.id)
+            .await
+            .context(CatalogSnafu)?;
+        // for now we just need the max persisted
+        let max_persisted_sequence_number = files.iter().map(|p| p.max_sequence_number).max();
+
+        let mut data = PartitionData::new(partition.id);
+        data.data.max_persisted_sequence_number = max_persisted_sequence_number;
+
         self.partition_data.insert(partition.partition_key, data);
 
         Ok(())
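
Seeding the watermark at partition creation is what makes replay safe across ingester restarts: the catalog is asked for the partition's undeleted parquet files, and max() over an empty file list yields None, i.e. a fully unpersisted partition that replay must not skip.
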
@@ -822,11 +822,13 @@ impl PartitionData {
             .snapshot_to_persisting(sequencer_id, table_id, partition_id, table_name)
     }
 
-    /// Clears the persisting batch and returns true if there is no other data in the partition.
-    fn clear_persisting(&mut self) -> bool {
+    /// Clears the persisting batch, updates the max_persisted_sequence_number.
+    fn mark_persisted(&mut self) {
+        if let Some(persisting) = &self.data.persisting {
+            let (_, max) = persisting.data.min_max_sequence_numbers();
+            self.data.max_persisted_sequence_number = Some(max);
+        }
         self.data.persisting = None;
-
-        self.data.snapshots.is_empty() && self.data.buffer.is_none()
     }
 
     /// Snapshot whatever is in the buffer and return a new vec of the
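
Because a persisting batch knows its own min/max sequence numbers, the partition can update its watermark the moment the batch is cleared. The boolean return of the old clear_persisting is dropped, since callers no longer remove empty partitions.
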
@@ -973,6 +975,9 @@ struct DataBuffer {
     /// Buffer of incoming writes
     pub(crate) buffer: Option<BufferBatch>,
 
+    /// The max_persisted_sequence number for any parquet_file in this partition
+    pub(crate) max_persisted_sequence_number: Option<SequenceNumber>,
+
     /// Buffer of tombstones whose time range may overlap with this partition.
     /// All tombstones were already applied to corresponding snapshots. This list
     /// only keep the ones that come during persisting. The reason
@@ -1587,12 +1592,9 @@ mod tests {
         let mem_table = n.table_data("mem").unwrap();
         let mem_table = mem_table.read().await;
 
-        // verify that the partition got removed from the table because it is now empty
-        assert!(mem_table.partition_data.get("1970-01-01").is_none());
-
         // verify that the parquet_max_sequence_number got updated
         assert_eq!(
-            mem_table.parquet_max_sequence_number,
+            mem_table.parquet_max_sequence_number(),
             Some(SequenceNumber::new(2))
         );
     }
@@ -1897,6 +1899,12 @@ mod tests {
             .create_or_get("1970-01-01", sequencer.id, table.id)
             .await
             .unwrap();
+        let partition2 = repos
+            .partitions()
+            .create_or_get("1970-01-02", sequencer.id, table.id)
+            .await
+            .unwrap();
+
         let parquet_file_params = ParquetFileParams {
             sequencer_id: sequencer.id,
             namespace_id: namespace.id,
@@ -1915,7 +1923,23 @@ mod tests {
         };
         repos
             .parquet_files()
-            .create(parquet_file_params)
+            .create(parquet_file_params.clone())
             .await
             .unwrap();
+
+        // now create a parquet file in another partition with a much higher persisted
+        // sequence number. We want to make sure that this doesn't cause our write in the other
+        // partition to get ignored.
+        let other_file_params = ParquetFileParams {
+            min_sequence_number: SequenceNumber::new(12),
+            max_sequence_number: SequenceNumber::new(15),
+            object_store_id: Uuid::new_v4(),
+            partition_id: partition2.id,
+            ..parquet_file_params
+        };
+        repos
+            .parquet_files()
+            .create(other_file_params)
+            .await
+            .unwrap();
         std::mem::drop(repos);
@@ -1943,11 +1967,12 @@ mod tests {
         {
             let tables = data.tables.read();
             let table = tables.get("mem").unwrap().read().await;
+            let p = table.partition_data.get("1970-01-01").unwrap();
             assert_eq!(
-                table.parquet_max_sequence_number,
+                p.data.max_persisted_sequence_number,
                 Some(SequenceNumber::new(1))
            );
-            assert!(table.partition_data.is_empty());
+            assert!(p.data.buffer.is_none());
         }
         assert!(!should_pause);
 

@@ -633,12 +633,10 @@ pub fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> IngesterData {
     let empty_tbl = Arc::new(tokio::sync::RwLock::new(TableData::new(
         empty_table_id,
         None,
-        None,
     )));
     let data_tbl = Arc::new(tokio::sync::RwLock::new(TableData::new_for_test(
         data_table_id,
         None,
-        None,
         partitions,
     )));
     tables.insert(TEST_TABLE_EMPTY.to_string(), empty_tbl);
@@ -683,7 +681,7 @@ pub async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterData {
 
     // Two tables: one empty and one with data of one or two partitions
     let mut tables = BTreeMap::new();
-    let data_tbl = TableData::new_for_test(data_table_id, None, None, partitions);
+    let data_tbl = TableData::new_for_test(data_table_id, None, partitions);
     tables.insert(
         TEST_TABLE.to_string(),
         Arc::new(tokio::sync::RwLock::new(data_tbl)),
 
@@ -323,8 +323,6 @@ pub struct TablePersistInfo {
     pub sequencer_id: SequencerId,
     /// the global identifier for the table
     pub table_id: TableId,
-    /// max max_sequence_number from this table's parquet_files for this sequencer
-    pub parquet_max_sequence_number: Option<SequenceNumber>,
     /// max sequence number from this table's tombstones for this sequencer
     pub tombstone_max_sequence_number: Option<SequenceNumber>,
 }
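
With the parquet watermark gone, TablePersistInfo carries only the tombstone side. Assembled from the unchanged context lines above (not a runnable snippet; the field types come from the surrounding crate), the struct now reads:

```rust
pub struct TablePersistInfo {
    pub sequencer_id: SequencerId,
    /// the global identifier for the table
    pub table_id: TableId,
    /// max sequence number from this table's tombstones for this sequencer
    pub tombstone_max_sequence_number: Option<SequenceNumber>,
}
```
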
@@ -925,78 +923,6 @@ pub(crate) mod test_helpers {
             TablePersistInfo {
                 sequencer_id: seq.id,
                 table_id: t.id,
-                parquet_max_sequence_number: None,
                 tombstone_max_sequence_number: None
             }
         );
-
-        // and now with a parquet file persisted
-        let partition = repos
-            .partitions()
-            .create_or_get("1970-01-01", seq.id, t.id)
-            .await
-            .unwrap();
-        let parquet_file_params = ParquetFileParams {
-            namespace_id: namespace.id,
-            sequencer_id: seq.id,
-            table_id: t.id,
-            partition_id: partition.id,
-            object_store_id: Uuid::new_v4(),
-            min_sequence_number: SequenceNumber::new(10),
-            max_sequence_number: SequenceNumber::new(513),
-            min_time: Timestamp::new(1),
-            max_time: Timestamp::new(2),
-            file_size_bytes: 0,
-            parquet_metadata: vec![],
-            row_count: 0,
-            compaction_level: INITIAL_COMPACTION_LEVEL,
-            created_at: Timestamp::new(1),
-        };
-        let p1 = repos
-            .parquet_files()
-            .create(parquet_file_params.clone())
-            .await
-            .unwrap();
-        let ti = repos
-            .tables()
-            .get_table_persist_info(seq.id, t.namespace_id, &t.name)
-            .await
-            .unwrap()
-            .unwrap();
-        assert_eq!(
-            ti,
-            TablePersistInfo {
-                sequencer_id: seq.id,
-                table_id: t.id,
-                parquet_max_sequence_number: Some(p1.max_sequence_number),
-                tombstone_max_sequence_number: None
-            }
-        );
-
-        // and with another parquet file persisted
-        let parquet_file_params = ParquetFileParams {
-            object_store_id: Uuid::new_v4(),
-            min_sequence_number: SequenceNumber::new(514),
-            max_sequence_number: SequenceNumber::new(1008),
-            ..parquet_file_params
-        };
-        let p1 = repos
-            .parquet_files()
-            .create(parquet_file_params.clone())
-            .await
-            .unwrap();
-        let ti = repos
-            .tables()
-            .get_table_persist_info(seq.id, t.namespace_id, &t.name)
-            .await
-            .unwrap()
-            .unwrap();
-        assert_eq!(
-            ti,
-            TablePersistInfo {
-                sequencer_id: seq.id,
-                table_id: t.id,
-                parquet_max_sequence_number: Some(p1.max_sequence_number),
-                tombstone_max_sequence_number: None
-            }
-        );
@@ -1025,7 +951,6 @@ pub(crate) mod test_helpers {
             TablePersistInfo {
                 sequencer_id: seq.id,
                 table_id: t.id,
-                parquet_max_sequence_number: Some(p1.max_sequence_number),
                 tombstone_max_sequence_number: Some(tombstone.sequence_number),
             }
         );
 
@@ -445,12 +445,6 @@ impl TableRepo for MemTxn {
             .iter()
             .find(|t| t.name == table_name && t.namespace_id == namespace_id)
         {
-            let parquet_max_sequence_number = stage
-                .parquet_files
-                .iter()
-                .filter(|p| p.sequencer_id == sequencer_id && p.table_id == table.id)
-                .max_by_key(|p| p.max_sequence_number)
-                .map(|p| p.max_sequence_number);
             let tombstone_max_sequence_number = stage
                 .tombstones
                 .iter()
@@ -461,7 +455,6 @@ impl TableRepo for MemTxn {
             return Ok(Some(TablePersistInfo {
                 sequencer_id,
                 table_id: table.id,
-                parquet_max_sequence_number,
                 tombstone_max_sequence_number,
             }));
         }
 
@@ -799,7 +799,6 @@ WHERE namespace_id = $1;
         r#"
 WITH tid as (SELECT id FROM table_name WHERE name = $2 AND namespace_id = $3)
 SELECT $1 as sequencer_id, id as table_id,
-       parquet_file.max_sequence_number AS parquet_max_sequence_number,
        tombstone.sequence_number as tombstone_max_sequence_number
 FROM tid
 LEFT JOIN (
@@ -809,13 +808,6 @@ LEFT JOIN (
 ORDER BY sequence_number DESC
 LIMIT 1
 ) tombstone ON tombstone.table_id = tid.id
-LEFT JOIN (
-    SELECT parquet_file.table_id, max_sequence_number
-    FROM parquet_file
-    WHERE parquet_file.sequencer_id = $1 AND parquet_file.table_id = (SELECT id from tid)
-    ORDER BY max_sequence_number DESC
-    LIMIT 1
-) parquet_file ON parquet_file.table_id = tid.id;
 "#,
 )
 .bind(&sequencer_id) // $1
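
Both catalog backends shrink to match: the in-memory implementation drops its scan over parquet_files, and the Postgres query loses the second LEFT JOIN, leaving get_table_persist_info with only the tombstone lookup.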