fix: avoid creating empty (0 dbs) snapshot file
parent bdc879ea52
commit 0014ff1281
@@ -357,17 +357,12 @@ impl Catalog {
        &self.inner
    }

    pub fn table_id(&self, db_id: &DbId, name: Arc<str>) -> Option<TableId> {
    pub fn table_id(&self, db_id: &DbId, table_name: Arc<str>) -> Option<TableId> {
        let inner = self.inner.read();
        inner.databases.get(db_id).and_then(|db| {
            db.tables().find_map(|table_defn| {
                if table_defn.table_name == name {
                    Some(table_defn.table_id)
                } else {
                    None
                }
            })
        })
        inner
            .databases
            .get(db_id)
            .and_then(|db| db.table_name_to_id(table_name))
    }
}
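
The new `table_id` body delegates the name lookup to the database schema via `table_name_to_id`. As an illustrative sketch only (the real `DatabaseSchema` and `TableDefinition` types in influxdb3 live in the catalog crate and have different fields), such a helper can be a straightforward scan over the table definitions:

// Illustrative stand-ins for the real catalog types, showing the shape of a
// name -> id lookup; not the actual influxdb3 definitions.
use std::{collections::BTreeMap, sync::Arc};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct TableId(u32);

struct TableDefinition {
    table_id: TableId,
    table_name: Arc<str>,
}

struct DatabaseSchema {
    // assumed layout: raw table id -> definition
    tables: BTreeMap<u32, TableDefinition>,
}

impl DatabaseSchema {
    /// Resolve a table name to its id, if the table exists in this database.
    fn table_name_to_id(&self, table_name: Arc<str>) -> Option<TableId> {
        self.tables
            .values()
            .find(|defn| defn.table_name == table_name)
            .map(|defn| defn.table_id)
    }
}

fn main() {
    let mut tables = BTreeMap::new();
    tables.insert(0, TableDefinition { table_id: TableId(0), table_name: "cpu".into() });
    let db = DatabaseSchema { tables };
    assert_eq!(db.table_name_to_id("cpu".into()), Some(TableId(0)));
    assert_eq!(db.table_name_to_id("mem".into()), None);
}
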
@@ -92,12 +92,8 @@ impl SnapshotTracker {
    }

    fn should_run_snapshot(&mut self, force_snapshot: bool) -> bool {
        // wal buffer can be empty but wal periods shouldn't be
        // this assumption doesn't hold anymore, we might have just added a single
        // no-op wal and its wal period; that means when force snapshot is set
        // wal periods are never empty but the queryable buffer may not hold
        // data that will get evicted to be snapshotted if the snapshots are happening
        // very close to each other.
        // When force_snapshot is set the wal_periods won't be empty, as the call site always adds a
        // no-op when the wal buffer is empty and adds the wal period
        if self.wal_periods.is_empty() {
            if force_snapshot {
                info!("cannot force a snapshot when wal periods are empty");
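
A self-contained sketch of the guard this comment describes: refuse to snapshot when there are no WAL periods, even when a snapshot is being forced. The struct below is a simplified stand-in for the real `SnapshotTracker` (which tracks `WalPeriod` values and more state), so treat the threshold logic as an assumption:

// Simplified stand-in for the snapshot tracker; only the guard logic is modelled.
struct SnapshotTracker {
    wal_periods: Vec<u64>, // stand-in for the real WalPeriod entries
    snapshot_size: usize,
}

impl SnapshotTracker {
    fn should_run_snapshot(&mut self, force_snapshot: bool) -> bool {
        // Nothing to snapshot. With force_snapshot the caller is expected to
        // have pushed a no-op WAL period first, so this branch is a bug guard.
        if self.wal_periods.is_empty() {
            if force_snapshot {
                eprintln!("cannot force a snapshot when wal periods are empty");
            }
            return false;
        }
        // Otherwise snapshot when forced, or once enough periods accumulated.
        force_snapshot || self.wal_periods.len() >= self.snapshot_size
    }
}

fn main() {
    let mut tracker = SnapshotTracker { wal_periods: vec![], snapshot_size: 2 };
    assert!(!tracker.should_run_snapshot(true)); // empty periods: refused even when forced
    tracker.wal_periods.push(1);
    assert!(tracker.should_run_snapshot(true)); // forced with a period present: allowed
}
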
@@ -198,7 +198,6 @@ impl WriteBufferImpl {
            first_snapshot.next_file_id.set_next_id();
        }

        debug!(?persisted_snapshots, ">>> all loaded snapshots");
        let persisted_files = Arc::new(PersistedFiles::new_from_persisted_snapshots(
            Arc::clone(&time_provider),
            persisted_snapshots,
@@ -2647,9 +2646,11 @@ mod tests {

    #[test_log::test(tokio::test)]
    async fn test_check_mem_and_force_snapshot() {
        // let obj_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let tmp_dir = test_helpers::tmp_dir().unwrap();
        debug!(?tmp_dir, ">>> using tmp dir for test");
        debug!(
            ?tmp_dir,
            ">>> using tmp dir for test_check_mem_and_force_snapshot"
        );
        let obj_store: Arc<dyn ObjectStore> =
            Arc::new(LocalFileSystem::new_with_prefix(tmp_dir).unwrap());
        let (write_buffer, _, _) = setup(
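
The test now backs its `Arc<dyn ObjectStore>` with a directory on disk instead of `InMemory`, so the snapshot files it writes can be inspected and still exist across the simulated restarts later in the test. A minimal sketch of that setup using the `object_store` and `tempfile` crates (the helper name here is made up for illustration):

// Sketch: build a local-filesystem-backed object store rooted in a temp dir.
use std::sync::Arc;

use object_store::{local::LocalFileSystem, ObjectStore};

fn local_store_in_tmp_dir() -> (tempfile::TempDir, Arc<dyn ObjectStore>) {
    let tmp_dir = tempfile::tempdir().expect("create tmp dir");
    let store = LocalFileSystem::new_with_prefix(tmp_dir.path()).expect("create local store");
    // Return the TempDir too: dropping it deletes the directory while the
    // store may still be reading and writing files under it.
    (tmp_dir, Arc::new(store))
}

fn main() {
    let (_tmp_dir, store) = local_store_in_tmp_dir();
    println!("object store ready: {store}");
}
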
@@ -2702,28 +2703,10 @@ mod tests {
        let total_buffer_size_bytes_after = write_buffer.buffer.get_total_size_bytes();
        debug!(?total_buffer_size_bytes_after, ">>> total buffer size");
        assert!(total_buffer_size_bytes_before > total_buffer_size_bytes_after);
        let from = object_store::path::Path::from("test_host/snapshots/");
        let file_paths = load_files_from_obj_store(&obj_store, &from).await;
        debug!(?file_paths, ">>> obj store snapshots");
        for file_path in file_paths {
            let bytes = obj_store
                .get(&file_path)
                .await
                .unwrap()
                .bytes()
                .await
                .unwrap();
            let persisted_snapshot: PersistedSnapshot = serde_json::from_slice(&bytes).unwrap();
            assert!(persisted_snapshot.min_time != i64::MAX);
            assert!(persisted_snapshot.max_time != i64::MIN);
        }
        assert_dbs_not_empty_in_snapshot_file(&obj_store, "test_host").await;

        let total_buffer_size_bytes_before = total_buffer_size_bytes_after;
        debug!(">>> 2nd snapshot..");
        // This is technically the same issue that has been observed when running under
        // high memory pressure: a snapshot file with no real content inside. The sample
        // file looks like the one below:
        //
        // PersistedSnapshot{
        //     writer_id: "test_host",
        //     next_file_id: ParquetFileId(1),
@@ -2739,35 +2722,28 @@ mod tests {
        //     max_time: -9223372036854775808,
        //     databases: SerdeVecMap({})
        // }
        // This snapshot file was observed when running under high memory pressure.
        //
        // The min/max time comes from the snapshot chunks that have been evicted from
        // the query buffer. But when there's nothing evicted, the min/max stays
        // the same as what they were initialized to, i64::MAX/i64::MIN respectively.
        //
        // When forcing the snapshot for the 2nd time here, the snapshotting process
        // kicks off again because force_snapshot is set, although the wal buffer is empty
        // and the wal periods are empty (cleared in the 1st snapshot). In a real run, the
        // _theory_ is that under high mem pressure, with a 1s flush interval and snapshot
        // size 1, somehow the snapshotting process is queued to run immediately after
        // the previous one, which leads to this undesirable state.
        //
        // So, the queryable buffer is empty (or doesn't hold data to filter out with the
        // new no-op's end time marker) while the snapshot tracker's wal period is not empty;
        // this leads to writing a wal file and a snapshot file with empty rows, dbs etc.
        // This however does not stop loading the data into memory as no empty parquet files are
        // written out.
        //
        // This however does not stop loading the data into memory as no empty
        // parquet files are written out. But this test recreates that issue and checks the
        // object store directly to make sure an inconsistent snapshot file isn't written
        // out in the first place.
        check_mem_and_force_snapshot(&Arc::clone(&write_buffer), 50).await;
        let total_buffer_size_bytes_after = write_buffer.buffer.get_total_size_bytes();
        // no other writes so nothing can be snapshotted, so mem should stay the same
        assert!(total_buffer_size_bytes_before == total_buffer_size_bytes_after);

        drop(write_buffer);
        assert_dbs_not_empty_in_snapshot_file(&obj_store, "test_host").await;

        // restart
        debug!(">>> Restarting..");
        let (_, _, _) = setup(
            Time::from_timestamp_nanos(0),
        let (write_buffer_after_restart, _, _) = setup(
            Time::from_timestamp_nanos(300),
            Arc::clone(&obj_store),
            WalConfig {
                gen1_duration: Gen1Duration::new_1m(),
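
The sentinel behaviour described in the comment is easy to see in isolation: min/max only move when evicted chunk times are folded in, so a snapshot built from zero evicted chunks keeps `i64::MAX`/`i64::MIN` and ends up with no databases. This is an illustrative sketch, not the actual `PersistedSnapshot` implementation:

// Illustrative only: fold chunk times into min/max the way a snapshot would.
#[derive(Debug)]
struct SnapshotTimes {
    min_time: i64,
    max_time: i64,
}

impl SnapshotTimes {
    fn new() -> Self {
        Self { min_time: i64::MAX, max_time: i64::MIN }
    }

    fn add_chunk_time(&mut self, chunk_time: i64) {
        self.min_time = self.min_time.min(chunk_time);
        self.max_time = self.max_time.max(chunk_time);
    }
}

fn main() {
    let mut with_chunks = SnapshotTimes::new();
    for t in [10, 20, 30] {
        with_chunks.add_chunk_time(t);
    }
    assert_eq!((with_chunks.min_time, with_chunks.max_time), (10, 30));

    // Nothing evicted: the sentinels never move, which is the shape of the
    // empty snapshot file this commit avoids persisting.
    let empty = SnapshotTimes::new();
    assert_eq!((empty.min_time, empty.max_time), (i64::MAX, i64::MIN));
}
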
@@ -2778,8 +2754,28 @@ mod tests {
        )
        .await;

        let from = object_store::path::Path::from("test_host/snapshots/");
        let file_paths = load_files_from_obj_store(&obj_store, &from).await;
        assert_dbs_not_empty_in_snapshot_file(&obj_store, "test_host").await;
        drop(write_buffer_after_restart);

        // restart
        debug!(">>> Restarting again..");
        let (_, _, _) = setup(
            Time::from_timestamp_nanos(400),
            Arc::clone(&obj_store),
            WalConfig {
                gen1_duration: Gen1Duration::new_1m(),
                max_write_buffer_size: 100_000,
                flush_interval: Duration::from_millis(10),
                snapshot_size: 10,
            },
        )
        .await;
        assert_dbs_not_empty_in_snapshot_file(&obj_store, "test_host").await;
    }

    async fn assert_dbs_not_empty_in_snapshot_file(obj_store: &Arc<dyn ObjectStore>, host: &str) {
        let from = Path::from(format!("{host}/snapshots/"));
        let file_paths = load_files_from_obj_store(obj_store, &from).await;
        debug!(?file_paths, ">>> obj store snapshots");
        for file_path in file_paths {
            let bytes = obj_store
@@ -2790,6 +2786,9 @@ mod tests {
                .await
                .unwrap();
            let persisted_snapshot: PersistedSnapshot = serde_json::from_slice(&bytes).unwrap();
            // dbs not empty
            assert!(!persisted_snapshot.databases.is_empty());
            // min and max times aren't defaults
            assert!(persisted_snapshot.min_time != i64::MAX);
            assert!(persisted_snapshot.max_time != i64::MIN);
        }
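
`load_files_from_obj_store` is an existing helper in this test module that the diff doesn't show. A rough equivalent built on the `object_store` listing API could look like the sketch below; this assumes a recent `object_store` release where `list` returns a stream of `ObjectMeta` directly (older releases wrap the stream in a `Result`):

// Sketch: collect the paths of all files under a snapshot prefix.
use std::sync::Arc;

use futures::TryStreamExt;
use object_store::{path::Path, ObjectStore};

async fn snapshot_paths(obj_store: &Arc<dyn ObjectStore>, prefix: &Path) -> Vec<Path> {
    obj_store
        .list(Some(prefix))
        .map_ok(|meta| meta.location)
        .try_collect()
        .await
        .expect("list snapshot files")
}

#[tokio::main]
async fn main() {
    let obj_store: Arc<dyn ObjectStore> = Arc::new(object_store::memory::InMemory::new());
    let paths = snapshot_paths(&obj_store, &Path::from("test_host/snapshots/")).await;
    assert!(paths.is_empty()); // nothing has been written yet
}
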
@@ -28,7 +28,7 @@ use iox_query::frontend::reorg::ReorgPlanner;
use iox_query::QueryChunk;
use iox_time::TimeProvider;
use object_store::path::Path;
use observability_deps::tracing::{debug, error, info};
use observability_deps::tracing::{error, info};
use parking_lot::RwLock;
use parquet::format::FileMetaData;
use schema::sort::SortKey;
@@ -204,9 +204,6 @@ impl QueryableBuffer {
                        .expect("table exists");
                    let snapshot_chunks =
                        table_buffer.snapshot(table_def, snapshot_details.end_time_marker);
                    for chunk in &snapshot_chunks {
                        debug!(?chunk.chunk_time, num_rows_in_chunk = ?chunk.record_batch.num_rows(), ">>> removing chunk with records");
                    }

                    for chunk in snapshot_chunks {
                        let table_name =
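
`TableBuffer::snapshot` hands back the chunks that fall before the WAL's `end_time_marker` so they can be persisted, keeping the rest buffered. A simplified stand-in for that eviction step (the types and field names here are assumptions, not the real `TableBuffer`):

// Simplified stand-in: split buffered chunks at the end_time_marker.
struct Chunk {
    chunk_time: i64,
    // record batches etc. omitted
}

struct TableBuffer {
    chunks: Vec<Chunk>,
}

impl TableBuffer {
    /// Remove and return every chunk older than the end time marker.
    fn snapshot(&mut self, end_time_marker: i64) -> Vec<Chunk> {
        let (evicted, kept): (Vec<_>, Vec<_>) = self
            .chunks
            .drain(..)
            .partition(|chunk| chunk.chunk_time < end_time_marker);
        self.chunks = kept;
        evicted
    }
}

fn main() {
    let mut buffer = TableBuffer {
        chunks: vec![Chunk { chunk_time: 10 }, Chunk { chunk_time: 99 }],
    };
    let evicted = buffer.snapshot(50);
    assert_eq!(evicted.len(), 1); // only the chunk before the marker is evicted
    assert_eq!(buffer.chunks.len(), 1);
}
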
@@ -291,6 +288,7 @@ impl QueryableBuffer {
            catalog.sequence_number(),
        );
        let mut cache_notifiers = vec![];
        let persist_jobs_empty = persist_jobs.is_empty();
        for persist_job in persist_jobs {
            let path = persist_job.path.to_string();
            let database_id = persist_job.database_id;
@@ -338,7 +336,40 @@ impl QueryableBuffer {
            )
        }

        // persist the snapshot file
        // persist the snapshot file - only if persist jobs are present
        // if persist_jobs is empty, then no parquet file would have been
        // written out, so it's desirable to not write an empty snapshot file.
        //
        // How can persist jobs be empty even though snapshot is triggered?
        //
        // When force snapshot is set, wal_periods (tracked by
        // snapshot_tracker) will never be empty as a no-op is added. This
        // means even though there is a wal period the query buffer might
        // still be empty. The reason is, when snapshots are happening very
        // close to each other (when force snapshot is set), they could get
        // queued to run immediately one after the other as illustrated in
        // the example series of flushes and force snapshots below:
        //
        //   1 (only wal flush)   // triggered by flush interval 1s
        //   2 (snapshot)         // triggered by flush interval 1s
        //   3 (force_snapshot)   // triggered by mem check interval 10s
        //   4 (force_snapshot)   // triggered by mem check interval 10s
        //
        // Although the flush interval and mem check intervals aren't the same,
        // there's a good chance under high memory pressure there will be
        // a lot of overlap.
        //
        // In this setup - after 2 (snapshot), we emptied the wal buffer and as
        // soon as the snapshot is done, 3 will try to run the snapshot but the wal
        // buffer can be empty at this point, which means it adds a no-op. The
        // no-op has the current time, which will be used as the
        // end_time_marker. That would evict everything from the query buffer, so
        // when 4 (force snapshot) runs there's no data in the query
        // buffer though it has a wal_period. When a normal (i.e. without
        // force_snapshot) snapshot runs, the snapshot_tracker will check if
        // wal_periods are empty, so it won't trigger a snapshot in the first
        // place.
        if !persist_jobs_empty {
            loop {
                match persister.persist_snapshot(&persisted_snapshot).await {
                    Ok(_) => {
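
The new `persist_jobs_empty` guard wraps the existing persist-with-retry loop, which is only partially visible across this hunk and the next. A sketch of the overall shape, using a hypothetical stand-in persister rather than the real influxdb3 one:

// Sketch of guard-plus-retry around persist_snapshot; Persister is a stand-in.
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;

#[derive(Debug)]
struct PersistedSnapshot; // placeholder for the real snapshot payload

struct Persister {
    attempts: AtomicU32,
}

impl Persister {
    // Fails on the first attempt to exercise the retry path, then succeeds.
    async fn persist_snapshot(&self, _snapshot: &PersistedSnapshot) -> Result<(), String> {
        if self.attempts.fetch_add(1, Ordering::SeqCst) == 0 {
            Err("transient object store error".to_string())
        } else {
            Ok(())
        }
    }
}

async fn persist_if_needed(
    persister: &Persister,
    snapshot: &PersistedSnapshot,
    persist_jobs_empty: bool,
) {
    // No parquet files were written for this snapshot, so persisting would
    // only record an empty (0 dbs) snapshot file: skip it entirely.
    if persist_jobs_empty {
        return;
    }
    loop {
        match persister.persist_snapshot(snapshot).await {
            Ok(()) => break,
            Err(e) => {
                eprintln!("error persisting snapshot, retrying: {e}");
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let persister = Persister { attempts: AtomicU32::new(0) };
    persist_if_needed(&persister, &PersistedSnapshot, false).await; // retries once, then succeeds
    persist_if_needed(&persister, &PersistedSnapshot, true).await; // guard short-circuits
}
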
@@ -354,6 +385,7 @@ impl QueryableBuffer {
                    }
                }
            }
        }

        // clear out the write buffer and add all the persisted files to the persisted files
        // on a background task to ensure that the cache has been populated before we clear
@@ -363,6 +395,10 @@ impl QueryableBuffer {
            for notifier in cache_notifiers.into_iter().flatten() {
                let _ = notifier.await;
            }

            // same reason as explained above: if persist jobs are empty, no snapshotting
            // has happened, so there's no need to clear the snapshots
            if !persist_jobs_empty {
                let mut buffer = buffer.write();
                for (_, table_map) in buffer.db_to_table.iter_mut() {
                    for (_, table_buffer) in table_map.iter_mut() {
@@ -371,6 +407,7 @@ impl QueryableBuffer {
                }

                persisted_files.add_persisted_snapshot_files(persisted_snapshot);
            }
        });

        let _ = sender.send(snapshot_details);