fix: reproducer for the empty snapshot file issue
parent
30fcc2bf54
commit
bdc879ea52
|
@ -356,6 +356,19 @@ impl Catalog {
|
|||
pub fn inner(&self) -> &RwLock<InnerCatalog> {
|
||||
&self.inner
|
||||
}
|
||||
|
||||
pub fn table_id(&self, db_id: &DbId, name: Arc<str>) -> Option<TableId> {
|
||||
let inner = self.inner.read();
|
||||
inner.databases.get(db_id).and_then(|db| {
|
||||
db.tables().find_map(|table_defn| {
|
||||
if table_defn.table_name == name {
|
||||
Some(table_defn.table_id)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[serde_with::serde_as]
|
||||
|
|
|
@ -93,6 +93,11 @@ impl SnapshotTracker {
|
|||
|
||||
fn should_run_snapshot(&mut self, force_snapshot: bool) -> bool {
|
||||
// wal buffer can be empty but wal periods shouldn't be
|
||||
// this assumption doesn't hold anymore, we might have just added a single
|
||||
// no-op wal and it's wal period that means when force snapshot is set
|
||||
// wal periods are never empty but the queryable buffer may not hold
|
||||
// data that will get evicted to be snapshottedif the snapshots are happening
|
||||
// very close to each other.
|
||||
if self.wal_periods.is_empty() {
|
||||
if force_snapshot {
|
||||
info!("cannot force a snapshot when wal periods are empty");
|
||||
|
|
|
@ -198,6 +198,7 @@ impl WriteBufferImpl {
|
|||
first_snapshot.next_file_id.set_next_id();
|
||||
}
|
||||
|
||||
debug!(?persisted_snapshots, ">>> all loaded snapshots");
|
||||
let persisted_files = Arc::new(PersistedFiles::new_from_persisted_snapshots(
|
||||
Arc::clone(&time_provider),
|
||||
persisted_snapshots,
|
||||
|
@ -881,7 +882,9 @@ mod tests {
|
|||
};
|
||||
use object_store::local::LocalFileSystem;
|
||||
use object_store::memory::InMemory;
|
||||
use object_store::path::Path;
|
||||
use object_store::{ObjectStore, PutPayload};
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[test]
|
||||
fn parse_lp_into_buffer() {
|
||||
|
@ -2644,7 +2647,11 @@ mod tests {
|
|||
|
||||
#[test_log::test(tokio::test)]
|
||||
async fn test_check_mem_and_force_snapshot() {
|
||||
let obj_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
|
||||
// let obj_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
|
||||
let tmp_dir = test_helpers::tmp_dir().unwrap();
|
||||
debug!(?tmp_dir, ">>> using tmp dir for test");
|
||||
let obj_store: Arc<dyn ObjectStore> =
|
||||
Arc::new(LocalFileSystem::new_with_prefix(tmp_dir).unwrap());
|
||||
let (write_buffer, _, _) = setup(
|
||||
Time::from_timestamp_nanos(0),
|
||||
Arc::clone(&obj_store),
|
||||
|
@ -2688,17 +2695,132 @@ mod tests {
|
|||
let total_buffer_size_bytes_before = write_buffer.buffer.get_total_size_bytes();
|
||||
debug!(?total_buffer_size_bytes_before, ">>> total buffer size");
|
||||
|
||||
check_mem_and_force_snapshot(&Arc::clone(&write_buffer), 100).await;
|
||||
debug!(">>> 1st snapshot..");
|
||||
check_mem_and_force_snapshot(&Arc::clone(&write_buffer), 50).await;
|
||||
|
||||
// check memory has gone down after forcing first snapshot
|
||||
let total_buffer_size_bytes_after = write_buffer.buffer.get_total_size_bytes();
|
||||
debug!(?total_buffer_size_bytes_after, ">>> total buffer size");
|
||||
assert!(total_buffer_size_bytes_before > total_buffer_size_bytes_after);
|
||||
let from = object_store::path::Path::from("test_host/snapshots/");
|
||||
let file_paths = load_files_from_obj_store(&obj_store, &from).await;
|
||||
debug!(?file_paths, ">>> obj store snapshots");
|
||||
for file_path in file_paths {
|
||||
let bytes = obj_store
|
||||
.get(&file_path)
|
||||
.await
|
||||
.unwrap()
|
||||
.bytes()
|
||||
.await
|
||||
.unwrap();
|
||||
let persisted_snapshot: PersistedSnapshot = serde_json::from_slice(&bytes).unwrap();
|
||||
assert!(persisted_snapshot.min_time != i64::MAX);
|
||||
assert!(persisted_snapshot.max_time != i64::MIN);
|
||||
}
|
||||
|
||||
// no other writes so nothing can be snapshotted, so mem should stay same
|
||||
let total_buffer_size_bytes_before = total_buffer_size_bytes_after;
|
||||
check_mem_and_force_snapshot(&Arc::clone(&write_buffer), 100).await;
|
||||
debug!(">>> 2nd snapshot..");
|
||||
// This is technically the same issue that has been observed when running under
|
||||
// high memory pressure, a snapshot file with no real content inside. The sample
|
||||
// file looks like below,
|
||||
//
|
||||
// PersistedSnapshot{
|
||||
// writer_id: "test_host",
|
||||
// next_file_id: ParquetFileId(1),
|
||||
// next_db_id: DbId(1),
|
||||
// next_table_id: TableId(1),
|
||||
// next_column_id: ColumnId(4),
|
||||
// snapshot_sequence_number: SnapshotSequenceNumber(2),
|
||||
// wal_file_sequence_number: WalFileSequenceNumber(22),
|
||||
// catalog_sequence_number: CatalogSequenceNumber(2),
|
||||
// parquet_size_bytes: 0,
|
||||
// row_count: 0,
|
||||
// min_time: 9223372036854775807,
|
||||
// max_time: -9223372036854775808,
|
||||
// databases: SerdeVecMap({})
|
||||
// }
|
||||
//
|
||||
// The min/max time comes from the snapshot chunks that have been evicted from
|
||||
// the query buffer. But when there's nothing evicted then the min/max stays
|
||||
// the same as what they were initialized to i64::MAX/i64::MIN respectively.
|
||||
//
|
||||
// When forcing the snapshot for 2nd time here, the snapshotting process
|
||||
// kicks off again because force_snapshot is set although wal buffer is empty
|
||||
// and the wal periods are empty (cleared in 1st snapshot). In real run, the
|
||||
// _theory_ is when under high mem pressure, with 1s flush interval and snapshot
|
||||
// size 1, somehow the snapshotting process is queued to run immediately after
|
||||
// the previous one which leads to this undesirable state.
|
||||
//
|
||||
// So, queryable buffer is empty (or doesn't hold data to filter out with new no-ops end
|
||||
// time marker) when snapshot tracker's wal period is not empty
|
||||
// this leads to writing a wal file and a snapshot file with empty rows, dbs etc.
|
||||
// This however does not stop loading the data into memory as no empty parquet files are
|
||||
// written out.
|
||||
//
|
||||
check_mem_and_force_snapshot(&Arc::clone(&write_buffer), 50).await;
|
||||
let total_buffer_size_bytes_after = write_buffer.buffer.get_total_size_bytes();
|
||||
// no other writes so nothing can be snapshotted, so mem should stay same
|
||||
assert!(total_buffer_size_bytes_before == total_buffer_size_bytes_after);
|
||||
|
||||
drop(write_buffer);
|
||||
|
||||
// restart
|
||||
debug!(">>> Restarting..");
|
||||
let (_, _, _) = setup(
|
||||
Time::from_timestamp_nanos(0),
|
||||
Arc::clone(&obj_store),
|
||||
WalConfig {
|
||||
gen1_duration: Gen1Duration::new_1m(),
|
||||
max_write_buffer_size: 100_000,
|
||||
flush_interval: Duration::from_millis(10),
|
||||
snapshot_size: 10,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
let from = object_store::path::Path::from("test_host/snapshots/");
|
||||
let file_paths = load_files_from_obj_store(&obj_store, &from).await;
|
||||
debug!(?file_paths, ">>> obj store snapshots");
|
||||
for file_path in file_paths {
|
||||
let bytes = obj_store
|
||||
.get(&file_path)
|
||||
.await
|
||||
.unwrap()
|
||||
.bytes()
|
||||
.await
|
||||
.unwrap();
|
||||
let persisted_snapshot: PersistedSnapshot = serde_json::from_slice(&bytes).unwrap();
|
||||
assert!(persisted_snapshot.min_time != i64::MAX);
|
||||
assert!(persisted_snapshot.max_time != i64::MIN);
|
||||
}
|
||||
}
|
||||
|
||||
async fn load_files_from_obj_store(
|
||||
object_store: &Arc<dyn ObjectStore>,
|
||||
path: &Path,
|
||||
) -> Vec<Path> {
|
||||
let mut paths = Vec::new();
|
||||
let mut offset: Option<Path> = None;
|
||||
loop {
|
||||
let mut listing = if let Some(offset) = offset {
|
||||
object_store.list_with_offset(Some(path), &offset)
|
||||
} else {
|
||||
object_store.list(Some(path))
|
||||
};
|
||||
let path_count = paths.len();
|
||||
|
||||
while let Some(item) = listing.next().await {
|
||||
paths.push(item.unwrap().location);
|
||||
}
|
||||
|
||||
if path_count == paths.len() {
|
||||
paths.sort();
|
||||
break;
|
||||
}
|
||||
|
||||
paths.sort();
|
||||
offset = Some(paths.last().unwrap().clone())
|
||||
}
|
||||
paths
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue