feat: add watch to writebuffer for persisted snapshots (#25330)
parent
ed2050f448
commit
341b8d7aff
|
@ -78,6 +78,9 @@ pub trait Bufferer: Debug + Send + Sync + 'static {
|
|||
|
||||
/// Returns the parquet files for a given database and table
|
||||
fn parquet_files(&self, db_name: &str, table_name: &str) -> Vec<ParquetFile>;
|
||||
|
||||
/// A channel to watch for when new persisted snapshots are created
|
||||
fn watch_persisted_snapshots(&self) -> tokio::sync::watch::Receiver<Option<PersistedSnapshot>>;
|
||||
}
|
||||
|
||||
/// ChunkContainer is used by the query engine to get chunks for a given table. Chunks will generally be in the
|
||||
|
|
|
@ -13,8 +13,8 @@ use crate::write_buffer::persisted_files::PersistedFiles;
|
|||
use crate::write_buffer::queryable_buffer::QueryableBuffer;
|
||||
use crate::write_buffer::validator::WriteValidator;
|
||||
use crate::{
|
||||
BufferedWriteRequest, Bufferer, ChunkContainer, LastCacheManager, ParquetFile, Precision,
|
||||
WriteBuffer, WriteLineError, NEXT_FILE_ID,
|
||||
BufferedWriteRequest, Bufferer, ChunkContainer, LastCacheManager, ParquetFile,
|
||||
PersistedSnapshot, Precision, WriteBuffer, WriteLineError, NEXT_FILE_ID,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use data_types::{ChunkId, ChunkOrder, ColumnType, NamespaceName, NamespaceNameError};
|
||||
|
@ -42,6 +42,7 @@ use std::sync::atomic::Ordering;
|
|||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use thiserror::Error;
|
||||
use tokio::sync::watch::Receiver;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum Error {
|
||||
|
@ -491,6 +492,10 @@ impl Bufferer for WriteBufferImpl {
|
|||
fn parquet_files(&self, db_name: &str, table_name: &str) -> Vec<ParquetFile> {
|
||||
self.buffer.persisted_parquet_files(db_name, table_name)
|
||||
}
|
||||
|
||||
fn watch_persisted_snapshots(&self) -> Receiver<Option<PersistedSnapshot>> {
|
||||
self.buffer.persisted_snapshot_notify_rx()
|
||||
}
|
||||
}
|
||||
|
||||
impl ChunkContainer for WriteBufferImpl {
|
||||
|
@ -1510,6 +1515,55 @@ mod tests {
|
|||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn notifies_watchers_of_snapshot() {
|
||||
let obj_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
|
||||
let (wbuf, _) = setup(
|
||||
Time::from_timestamp_nanos(0),
|
||||
Arc::clone(&obj_store),
|
||||
WalConfig {
|
||||
gen1_duration: Gen1Duration::new_1m(),
|
||||
max_write_buffer_size: 100,
|
||||
flush_interval: Duration::from_millis(10),
|
||||
snapshot_size: 1,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut watcher = wbuf.watch_persisted_snapshots();
|
||||
watcher.mark_changed();
|
||||
|
||||
let db_name = "coffee_shop";
|
||||
let tbl_name = "menu";
|
||||
|
||||
// do some writes to get a snapshot:
|
||||
do_writes(
|
||||
db_name,
|
||||
&wbuf,
|
||||
&[
|
||||
TestWrite {
|
||||
lp: format!("{tbl_name},name=espresso price=2.50"),
|
||||
time_seconds: 1,
|
||||
},
|
||||
TestWrite {
|
||||
lp: format!("{tbl_name},name=americano price=3.00"),
|
||||
time_seconds: 2,
|
||||
},
|
||||
TestWrite {
|
||||
lp: format!("{tbl_name},name=latte price=4.50"),
|
||||
time_seconds: 3,
|
||||
},
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
// wait for snapshot to be created:
|
||||
verify_snapshot_count(1, &wbuf.persister).await;
|
||||
watcher.changed().await.unwrap();
|
||||
let snapshot = watcher.borrow();
|
||||
assert!(snapshot.is_some(), "watcher should be notified of snapshot");
|
||||
}
|
||||
|
||||
struct TestWrite<LP> {
|
||||
lp: LP,
|
||||
time_seconds: i64,
|
||||
|
|
|
@ -40,6 +40,9 @@ pub struct QueryableBuffer {
|
|||
persister: Arc<Persister>,
|
||||
persisted_files: Arc<PersistedFiles>,
|
||||
buffer: Arc<RwLock<BufferState>>,
|
||||
/// Sends a notification to this watch channel whenever a snapshot info is persisted
|
||||
persisted_snapshot_notify_rx: tokio::sync::watch::Receiver<Option<PersistedSnapshot>>,
|
||||
persisted_snapshot_notify_tx: tokio::sync::watch::Sender<Option<PersistedSnapshot>>,
|
||||
}
|
||||
|
||||
impl QueryableBuffer {
|
||||
|
@ -51,6 +54,8 @@ impl QueryableBuffer {
|
|||
persisted_files: Arc<PersistedFiles>,
|
||||
) -> Self {
|
||||
let buffer = Arc::new(RwLock::new(BufferState::new(Arc::clone(&catalog))));
|
||||
let (persisted_snapshot_notify_tx, persisted_snapshot_notify_rx) =
|
||||
tokio::sync::watch::channel(None);
|
||||
Self {
|
||||
executor,
|
||||
catalog,
|
||||
|
@ -58,6 +63,8 @@ impl QueryableBuffer {
|
|||
persister,
|
||||
persisted_files,
|
||||
buffer,
|
||||
persisted_snapshot_notify_rx,
|
||||
persisted_snapshot_notify_tx,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -178,6 +185,7 @@ impl QueryableBuffer {
|
|||
let wal_file_number = write.wal_file_number;
|
||||
let buffer = Arc::clone(&self.buffer);
|
||||
let catalog = Arc::clone(&self.catalog);
|
||||
let notify_snapshot_tx = self.persisted_snapshot_notify_tx.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
// persist the catalog if it has been updated
|
||||
|
@ -247,7 +255,13 @@ impl QueryableBuffer {
|
|||
// persist the snapshot file
|
||||
loop {
|
||||
match persister.persist_snapshot(&persisted_snapshot).await {
|
||||
Ok(_) => break,
|
||||
Ok(_) => {
|
||||
let persisted_snapshot = Some(persisted_snapshot.clone());
|
||||
notify_snapshot_tx
|
||||
.send(persisted_snapshot)
|
||||
.expect("persisted snapshot notify tx should not be closed");
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
error!(%e, "Error persisting snapshot, sleeping and retrying...");
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
|
@ -274,6 +288,12 @@ impl QueryableBuffer {
|
|||
pub fn persisted_parquet_files(&self, db_name: &str, table_name: &str) -> Vec<ParquetFile> {
|
||||
self.persisted_files.get_files(db_name, table_name)
|
||||
}
|
||||
|
||||
pub fn persisted_snapshot_notify_rx(
|
||||
&self,
|
||||
) -> tokio::sync::watch::Receiver<Option<PersistedSnapshot>> {
|
||||
self.persisted_snapshot_notify_rx.clone()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
|
Loading…
Reference in New Issue