fix: Change to_delete column on parquet_files to be a time (#4117)
Set to_delete to the time the file was marked as deleted rather than true. Fixes #4059.

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
parent 51da6dd7fa
commit 67e13a7c34
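For orientation before the hunks: the soft-delete marker on parquet files changes from a boolean flag to an optional marked-at timestamp, so callers test deletion state with Option methods instead of boolean negation. A minimal self-contained sketch of what that means for consumers (simplified types; the real Timestamp is a catalog newtype over i64 nanoseconds):

// Sketch only: simplified stand-ins for the catalog types in this diff.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Timestamp(i64);

struct ParquetFile {
    // before this commit: `to_delete: bool`
    to_delete: Option<Timestamp>, // after: the time the file was marked deleted
}

fn main() {
    let mut f = ParquetFile { to_delete: None };
    assert!(f.to_delete.is_none()); // replaces `assert!(!f.to_delete)`
    f.to_delete = Some(Timestamp(1_649_000_000_000_000_000)); // hypothetical marked-at time
    assert!(f.to_delete.is_some()); // replaces `assert!(f.to_delete)`
}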
@@ -1092,7 +1092,7 @@ mod tests {
             max_sequence_number: SequenceNumber::new(1),
             min_time: Timestamp::new(min_time),
             max_time: Timestamp::new(max_time),
-            to_delete: false,
+            to_delete: None,
             file_size_bytes: 0,
             parquet_metadata: vec![],
             row_count: 0,
@@ -718,8 +718,8 @@ pub struct ParquetFile {
     pub min_time: Timestamp,
     /// the max timestamp of data in this file
     pub max_time: Timestamp,
-    /// flag to mark that this file should be deleted from object storage
-    pub to_delete: bool,
+    /// When this file was marked for deletion
+    pub to_delete: Option<Timestamp>,
     /// file size in bytes
     pub file_size_bytes: i64,
     /// thrift-encoded parquet metadata
@@ -1580,7 +1580,7 @@ mod tests {
         assert_eq!(pf.min_sequence_number, SequenceNumber::new(1));
         assert_eq!(pf.max_sequence_number, SequenceNumber::new(2));
         assert_eq!(pf.sequencer_id, sequencer1.id);
-        assert!(!pf.to_delete);
+        assert!(pf.to_delete.is_none());

         let mem_table = n.table_data("mem").unwrap();
         let mem_table = mem_table.read().await;
@@ -0,0 +1,9 @@
+ALTER TABLE
+IF EXISTS parquet_file
+ALTER COLUMN to_delete
+TYPE BIGINT
+-- If to_delete is set to true, assign it a zero-but-not-null timestamp since we don't have the
+-- actual time it was marked to_delete, but we want it to be deleted. If to_delete is false or
+-- null, set it to null.
+USING CASE WHEN to_delete THEN 0 ELSE NULL END
+;
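The USING clause above rewrites existing rows while the column's type changes to BIGINT (nanoseconds since the epoch). A sketch of the effective mapping, plus the query shape the rest of this commit switches to (the IS NULL filters in the hunks below); the illustrative rows are hypothetical, not from the diff:

-- Effective row mapping of the USING expression:
--   to_delete = true   ->  0     (epoch: "marked for deletion, actual time unknown")
--   to_delete = false  ->  NULL  (not marked)
--   to_delete = NULL   ->  NULL  (not marked)
-- Undeleted files are now selected with an IS NULL test instead of `= false`:
SELECT * FROM parquet_file WHERE to_delete IS NULL;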
@@ -1712,8 +1712,8 @@ pub(crate) mod test_helpers {
             .unwrap();
         assert_eq!(vec![other_file.clone()], files);

-        // verify that to_delete is initially set to false and that it can be updated to true
-        assert!(!parquet_file.to_delete);
+        // verify that to_delete is initially set to null and that it can be updated to a timestamp
+        assert!(parquet_file.to_delete.is_none());
         repos
             .parquet_files()
             .flag_for_delete(parquet_file.id)
@@ -1724,7 +1724,7 @@
             .list_by_sequencer_greater_than(sequencer.id, SequenceNumber::new(1))
             .await
             .unwrap();
-        assert!(files.first().unwrap().to_delete);
+        assert!(files.first().unwrap().to_delete.is_some());

         // test list_by_table_not_to_delete
         let files = repos
@@ -21,6 +21,7 @@ use observability_deps::tracing::warn;
 use std::fmt::Formatter;
 use std::sync::Arc;
 use std::{collections::HashSet, convert::TryFrom};
+use time::{SystemProvider, TimeProvider};
 use tokio::sync::{Mutex, OwnedMutexGuard};

 /// In-memory catalog that implements the `RepoCollection` and individual repo traits from
@@ -46,7 +47,7 @@ impl std::fmt::Debug for MemCatalog {
     }
 }

-#[derive(Default, Debug, Clone)]
+#[derive(Debug, Clone)]
 struct MemCollections {
     kafka_topics: Vec<KafkaTopic>,
     query_pools: Vec<QueryPool>,
@@ -58,6 +59,25 @@ struct MemCollections {
     tombstones: Vec<Tombstone>,
     parquet_files: Vec<ParquetFile>,
     processed_tombstones: Vec<ProcessedTombstone>,
+    time_provider: Arc<dyn TimeProvider>,
 }
+
+impl Default for MemCollections {
+    fn default() -> Self {
+        Self {
+            kafka_topics: Default::default(),
+            query_pools: Default::default(),
+            namespaces: Default::default(),
+            tables: Default::default(),
+            columns: Default::default(),
+            sequencers: Default::default(),
+            partitions: Default::default(),
+            tombstones: Default::default(),
+            parquet_files: Default::default(),
+            processed_tombstones: Default::default(),
+            time_provider: Arc::new(SystemProvider::new()),
+        }
+    }
+}

 #[derive(Debug)]
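The derive is dropped because the new time_provider field is a trait object, and Arc<dyn TimeProvider> has no Default implementation to delegate to, hence the hand-written impl. A self-contained illustration with simplified names (Collections and the stand-in trait here are hypothetical):

// Minimal reproduction of why #[derive(Default)] no longer compiles.
use std::sync::Arc;

trait TimeProvider: Send + Sync {}

#[derive(Default)]
struct SystemProvider;
impl TimeProvider for SystemProvider {}

// #[derive(Default)] // error: `Arc<dyn TimeProvider>` does not impl Default
struct Collections {
    items: Vec<i32>,                      // derivable on its own
    time_provider: Arc<dyn TimeProvider>, // forces a manual impl
}

impl Default for Collections {
    fn default() -> Self {
        Self {
            items: Default::default(),
            time_provider: Arc::new(SystemProvider::default()),
        }
    }
}

fn main() {
    let _collections = Collections::default();
}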
@@ -902,7 +922,7 @@ impl ParquetFileRepo for MemTxn {
             min_time,
             max_time,
             row_count,
-            to_delete: false,
+            to_delete: None,
             file_size_bytes,
             parquet_metadata,
             compaction_level: INITIAL_COMPACTION_LEVEL,
@@ -914,9 +934,10 @@ impl ParquetFileRepo for MemTxn {

     async fn flag_for_delete(&mut self, id: ParquetFileId) -> Result<()> {
         let stage = self.stage();
+        let marked_at = Timestamp::new(stage.time_provider.now().timestamp_nanos());

         match stage.parquet_files.iter_mut().find(|p| p.id == id) {
-            Some(f) => f.to_delete = true,
+            Some(f) => f.to_delete = Some(marked_at),
             None => return Err(Error::ParquetRecordNotFound { id }),
         }

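Routing now() through an injected TimeProvider, rather than reading the system clock inline, is what lets tests pin the marked-at value. A self-contained sketch of the idea; the workspace time crate's actual mock type and constructors are not shown in this diff, so the stand-ins below are assumptions:

use std::sync::Arc;

// Stand-ins for the workspace `time` crate API (names assumed).
trait TimeProvider: Send + Sync {
    fn now_nanos(&self) -> i64; // stand-in for now().timestamp_nanos()
}

struct MockProvider(i64); // always reports one fixed instant
impl TimeProvider for MockProvider {
    fn now_nanos(&self) -> i64 {
        self.0
    }
}

fn main() {
    let provider: Arc<dyn TimeProvider> = Arc::new(MockProvider(42));
    // A catalog built over this provider would set to_delete to
    // Some(Timestamp::new(42)) when flag_for_delete runs.
    assert_eq!(provider.now_nanos(), 42);
}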
@@ -953,7 +974,7 @@ impl ParquetFileRepo for MemTxn {
         let parquet_files: Vec<_> = stage
             .parquet_files
             .iter()
-            .filter(|f| table_ids.contains(&f.table_id) && !f.to_delete)
+            .filter(|f| table_ids.contains(&f.table_id) && f.to_delete.is_none())
             .cloned()
             .collect();
         Ok(parquet_files)
@@ -965,7 +986,7 @@
         let parquet_files: Vec<_> = stage
             .parquet_files
             .iter()
-            .filter(|f| table_id == f.table_id && !f.to_delete)
+            .filter(|f| table_id == f.table_id && f.to_delete.is_none())
             .cloned()
             .collect();
         Ok(parquet_files)
@@ -977,7 +998,9 @@
         Ok(stage
             .parquet_files
             .iter()
-            .filter(|f| f.sequencer_id == sequencer_id && f.compaction_level == 0 && !f.to_delete)
+            .filter(|f| {
+                f.sequencer_id == sequencer_id && f.compaction_level == 0 && f.to_delete.is_none()
+            })
             .cloned()
             .collect())
     }
@@ -998,7 +1021,7 @@
                     && f.table_id == table_partition.table_id
                     && f.partition_id == table_partition.partition_id
                     && f.compaction_level == 1
-                    && !f.to_delete
+                    && f.to_delete.is_none()
                     && ((f.min_time <= min_time && f.max_time >= min_time)
                         || (f.min_time > min_time && f.min_time <= max_time))
             })
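The surrounding time-range predicate (unchanged by this commit) decomposes "file overlaps [min_time, max_time]" into two cases: the file starts at or before the window's start and reaches into it, or the file starts inside the window. A worked sketch on plain integers standing in for Timestamp:

// Same shape as the filter above, with i64s in place of Timestamp.
fn overlaps(f_min: i64, f_max: i64, min_time: i64, max_time: i64) -> bool {
    (f_min <= min_time && f_max >= min_time) || (f_min > min_time && f_min <= max_time)
}

fn main() {
    assert!(overlaps(0, 5, 3, 10));    // straddles the window start
    assert!(overlaps(4, 20, 3, 10));   // starts inside the window
    assert!(!overlaps(11, 20, 3, 10)); // starts after the window ends
}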
@@ -20,6 +20,7 @@ use observability_deps::tracing::{info, warn};
 use sqlx::{migrate::Migrator, postgres::PgPoolOptions, Acquire, Executor, Postgres, Row};
 use sqlx_hotswap_pool::HotSwapPool;
 use std::{sync::Arc, time::Duration};
+use time::{SystemProvider, TimeProvider};

 const CONNECT_TIMEOUT: Duration = Duration::from_secs(2);
 const IDLE_TIMEOUT: Duration = Duration::from_secs(60);
@@ -37,6 +38,7 @@ pub struct PostgresCatalog {
     metrics: Arc<metric::Registry>,
     pool: HotSwapPool<Postgres>,
     schema_name: String,
+    time_provider: Arc<dyn TimeProvider>,
 }

 // struct to get return value from "select count(*) ..." query
@@ -69,6 +71,7 @@ impl PostgresCatalog {
             pool,
             metrics,
             schema_name,
+            time_provider: Arc::new(SystemProvider::new()),
         })
     }
 }
@@ -77,6 +80,7 @@ impl PostgresCatalog {
 #[derive(Debug)]
 pub struct PostgresTxn {
     inner: PostgresTxnInner,
+    time_provider: Arc<dyn TimeProvider>,
 }

 #[derive(Debug)]
@@ -246,6 +250,7 @@ impl Catalog for PostgresCatalog {
         Ok(Box::new(MetricDecorator::new(
             PostgresTxn {
                 inner: PostgresTxnInner::Txn(Some(transaction)),
+                time_provider: Arc::clone(&self.time_provider),
             },
             Arc::clone(&self.metrics),
         )))
@@ -255,6 +260,7 @@ impl Catalog for PostgresCatalog {
         Box::new(MetricDecorator::new(
             PostgresTxn {
                 inner: PostgresTxnInner::Oneshot(self.pool.clone()),
+                time_provider: Arc::clone(&self.time_provider),
             },
             Arc::clone(&self.metrics),
         ))
@@ -1331,9 +1337,9 @@ impl ParquetFileRepo for PostgresTxn {
             r#"
 INSERT INTO parquet_file (
     sequencer_id, table_id, partition_id, object_store_id, min_sequence_number,
-    max_sequence_number, min_time, max_time, to_delete, file_size_bytes, parquet_metadata,
+    max_sequence_number, min_time, max_time, file_size_bytes, parquet_metadata,
     row_count, compaction_level, created_at )
-VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, false, $9, $10, $11, $12, $13 )
+VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13 )
 RETURNING *;
         "#,
         )
@@ -1366,8 +1372,11 @@ RETURNING *;
     }

     async fn flag_for_delete(&mut self, id: ParquetFileId) -> Result<()> {
-        let _ = sqlx::query(r#"UPDATE parquet_file SET to_delete = true WHERE id = $1;"#)
-            .bind(&id) // $1
+        let marked_at = Timestamp::new(self.time_provider.now().timestamp_nanos());
+
+        let _ = sqlx::query(r#"UPDATE parquet_file SET to_delete = $1 WHERE id = $2;"#)
+            .bind(&marked_at) // $1
+            .bind(&id) // $2
             .execute(&mut self.inner)
             .await
             .map_err(|e| Error::SqlxError { source: e })?;
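Putting both sides together, this is the round trip exercised by the test_helpers hunks earlier in the diff. An abbreviated sketch (repos, sequencer, and parquet_file come from that test setup and are not defined here):

// to_delete starts as None...
assert!(parquet_file.to_delete.is_none());
// ...flag_for_delete stamps it with the injected provider's current time...
repos.parquet_files().flag_for_delete(parquet_file.id).await.unwrap();
// ...and subsequent listings observe Some(marked_at).
let files = repos
    .parquet_files()
    .list_by_sequencer_greater_than(sequencer.id, SequenceNumber::new(1))
    .await
    .unwrap();
assert!(files.first().unwrap().to_delete.is_some());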
@@ -1406,7 +1415,7 @@ SELECT parquet_file.*
 FROM parquet_file
 INNER JOIN table_name on table_name.id = parquet_file.table_id
 WHERE table_name.namespace_id = $1
-  AND parquet_file.to_delete = false;
+  AND parquet_file.to_delete IS NULL;
         "#,
         )
         .bind(&namespace_id) // $1
@@ -1420,7 +1429,7 @@ WHERE table_name.namespace_id = $1
         r#"
 SELECT *
 FROM parquet_file
-WHERE table_id = $1 AND to_delete = false;
+WHERE table_id = $1 AND to_delete IS NULL;
         "#,
         )
         .bind(&table_id) // $1
@@ -1436,7 +1445,7 @@ SELECT *
 FROM parquet_file
 WHERE parquet_file.sequencer_id = $1
   AND parquet_file.compaction_level = 0
-  AND parquet_file.to_delete = false;
+  AND parquet_file.to_delete IS NULL;
         "#,
         )
         .bind(&sequencer_id) // $1
@@ -1459,7 +1468,7 @@ WHERE parquet_file.sequencer_id = $1
   AND parquet_file.table_id = $2
   AND parquet_file.partition_id = $3
   AND parquet_file.compaction_level = 1
-  AND parquet_file.to_delete = false
+  AND parquet_file.to_delete IS NULL
   AND ((parquet_file.min_time <= $5 AND parquet_file.max_time >= $4)
       OR (parquet_file.min_time > $5 AND parquet_file.min_time <= $5));
         "#,
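One observation on the unchanged context in this last hunk: the final OR clause compares parquet_file.min_time against $5 twice (min_time > $5 AND min_time <= $5), which no row can satisfy, so the second overlap case is dead as written. Judging from the in-memory counterpart above, the intended form was presumably:

-- Presumed intent (assumption, mirroring the mem.rs filter; not part of this commit):
--   OR (parquet_file.min_time > $4 AND parquet_file.min_time <= $5)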