From 341b8d7aff9e700fbd945e185b6fdb0d5d692c46 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Fri, 13 Sep 2024 16:58:18 -0400 Subject: [PATCH] feat: add watch to writebuffer for persisted snapshots (#25330) --- influxdb3_write/src/lib.rs | 3 + influxdb3_write/src/write_buffer/mod.rs | 58 ++++++++++++++++++- .../src/write_buffer/queryable_buffer.rs | 22 ++++++- 3 files changed, 80 insertions(+), 3 deletions(-) diff --git a/influxdb3_write/src/lib.rs b/influxdb3_write/src/lib.rs index 2dec080076..c4fd330246 100644 --- a/influxdb3_write/src/lib.rs +++ b/influxdb3_write/src/lib.rs @@ -78,6 +78,9 @@ pub trait Bufferer: Debug + Send + Sync + 'static { /// Returns the parquet files for a given database and table fn parquet_files(&self, db_name: &str, table_name: &str) -> Vec; + + /// A channel to watch for when new persisted snapshots are created + fn watch_persisted_snapshots(&self) -> tokio::sync::watch::Receiver>; } /// ChunkContainer is used by the query engine to get chunks for a given table. Chunks will generally be in the diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index cb7967ee49..0fe9d28747 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -13,8 +13,8 @@ use crate::write_buffer::persisted_files::PersistedFiles; use crate::write_buffer::queryable_buffer::QueryableBuffer; use crate::write_buffer::validator::WriteValidator; use crate::{ - BufferedWriteRequest, Bufferer, ChunkContainer, LastCacheManager, ParquetFile, Precision, - WriteBuffer, WriteLineError, NEXT_FILE_ID, + BufferedWriteRequest, Bufferer, ChunkContainer, LastCacheManager, ParquetFile, + PersistedSnapshot, Precision, WriteBuffer, WriteLineError, NEXT_FILE_ID, }; use async_trait::async_trait; use data_types::{ChunkId, ChunkOrder, ColumnType, NamespaceName, NamespaceNameError}; @@ -42,6 +42,7 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; use thiserror::Error; +use tokio::sync::watch::Receiver; #[derive(Debug, Error)] pub enum Error { @@ -491,6 +492,10 @@ impl Bufferer for WriteBufferImpl { fn parquet_files(&self, db_name: &str, table_name: &str) -> Vec { self.buffer.persisted_parquet_files(db_name, table_name) } + + fn watch_persisted_snapshots(&self) -> Receiver> { + self.buffer.persisted_snapshot_notify_rx() + } } impl ChunkContainer for WriteBufferImpl { @@ -1510,6 +1515,55 @@ mod tests { ); } + #[tokio::test] + async fn notifies_watchers_of_snapshot() { + let obj_store: Arc = Arc::new(InMemory::new()); + let (wbuf, _) = setup( + Time::from_timestamp_nanos(0), + Arc::clone(&obj_store), + WalConfig { + gen1_duration: Gen1Duration::new_1m(), + max_write_buffer_size: 100, + flush_interval: Duration::from_millis(10), + snapshot_size: 1, + }, + ) + .await; + + let mut watcher = wbuf.watch_persisted_snapshots(); + watcher.mark_changed(); + + let db_name = "coffee_shop"; + let tbl_name = "menu"; + + // do some writes to get a snapshot: + do_writes( + db_name, + &wbuf, + &[ + TestWrite { + lp: format!("{tbl_name},name=espresso price=2.50"), + time_seconds: 1, + }, + TestWrite { + lp: format!("{tbl_name},name=americano price=3.00"), + time_seconds: 2, + }, + TestWrite { + lp: format!("{tbl_name},name=latte price=4.50"), + time_seconds: 3, + }, + ], + ) + .await; + + // wait for snapshot to be created: + verify_snapshot_count(1, &wbuf.persister).await; + watcher.changed().await.unwrap(); + let snapshot = watcher.borrow(); + assert!(snapshot.is_some(), "watcher should be notified of snapshot"); + } + struct TestWrite { lp: LP, time_seconds: i64, diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs index 09f83e13dc..d202090dd4 100644 --- a/influxdb3_write/src/write_buffer/queryable_buffer.rs +++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs @@ -40,6 +40,9 @@ pub struct QueryableBuffer { persister: Arc, persisted_files: Arc, buffer: Arc>, + /// Sends a notification to this watch channel whenever a snapshot info is persisted + persisted_snapshot_notify_rx: tokio::sync::watch::Receiver>, + persisted_snapshot_notify_tx: tokio::sync::watch::Sender>, } impl QueryableBuffer { @@ -51,6 +54,8 @@ impl QueryableBuffer { persisted_files: Arc, ) -> Self { let buffer = Arc::new(RwLock::new(BufferState::new(Arc::clone(&catalog)))); + let (persisted_snapshot_notify_tx, persisted_snapshot_notify_rx) = + tokio::sync::watch::channel(None); Self { executor, catalog, @@ -58,6 +63,8 @@ impl QueryableBuffer { persister, persisted_files, buffer, + persisted_snapshot_notify_rx, + persisted_snapshot_notify_tx, } } @@ -178,6 +185,7 @@ impl QueryableBuffer { let wal_file_number = write.wal_file_number; let buffer = Arc::clone(&self.buffer); let catalog = Arc::clone(&self.catalog); + let notify_snapshot_tx = self.persisted_snapshot_notify_tx.clone(); tokio::spawn(async move { // persist the catalog if it has been updated @@ -247,7 +255,13 @@ impl QueryableBuffer { // persist the snapshot file loop { match persister.persist_snapshot(&persisted_snapshot).await { - Ok(_) => break, + Ok(_) => { + let persisted_snapshot = Some(persisted_snapshot.clone()); + notify_snapshot_tx + .send(persisted_snapshot) + .expect("persisted snapshot notify tx should not be closed"); + break; + } Err(e) => { error!(%e, "Error persisting snapshot, sleeping and retrying..."); tokio::time::sleep(Duration::from_secs(1)).await; @@ -274,6 +288,12 @@ impl QueryableBuffer { pub fn persisted_parquet_files(&self, db_name: &str, table_name: &str) -> Vec { self.persisted_files.get_files(db_name, table_name) } + + pub fn persisted_snapshot_notify_rx( + &self, + ) -> tokio::sync::watch::Receiver> { + self.persisted_snapshot_notify_rx.clone() + } } #[async_trait]