diff --git a/Cargo.lock b/Cargo.lock
index f33155131f..96c53a9c0e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2729,6 +2729,7 @@ dependencies = [
  "thiserror",
  "tokio",
  "url",
+ "uuid",
 ]
 
 [[package]]
diff --git a/influxdb3_write/Cargo.toml b/influxdb3_write/Cargo.toml
index 33e4d367be..fe54ede39b 100644
--- a/influxdb3_write/Cargo.toml
+++ b/influxdb3_write/Cargo.toml
@@ -32,13 +32,14 @@ hex.workspace = true
 object_store.workspace = true
 parking_lot.workspace = true
 parquet.workspace = true
-thiserror.workspace = true
-tokio.workspace = true
 serde.workspace = true
 serde_json.workspace = true
 sha2.workspace = true
 snap.workspace = true
+thiserror.workspace = true
+tokio.workspace = true
 url.workspace = true
+uuid.workspace = true
 
 [dev-dependencies]
 # Core Crates
diff --git a/influxdb3_write/src/cache.rs b/influxdb3_write/src/cache.rs
new file mode 100644
index 0000000000..3202b9d163
--- /dev/null
+++ b/influxdb3_write/src/cache.rs
@@ -0,0 +1,419 @@
+use crate::persister::serialize_to_parquet;
+use crate::persister::Error;
+use crate::ParquetFile;
+use bytes::Bytes;
+use datafusion::execution::memory_pool::MemoryPool;
+use datafusion::physical_plan::SendableRecordBatchStream;
+use object_store::memory::InMemory;
+use object_store::path::Path as ObjPath;
+use object_store::ObjectStore;
+use parking_lot::RwLock;
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::runtime::Handle;
+use tokio::task;
+
+type MetaData = RwLock<HashMap<String, HashMap<String, HashMap<String, ParquetFile>>>>;
+
+#[derive(Debug)]
+pub struct ParquetCache {
+    object_store: Arc<dyn ObjectStore>,
+    meta_data: MetaData,
+    mem_pool: Arc<dyn MemoryPool>,
+}
+
+impl ParquetCache {
+    /// Create a new `ParquetCache`
+    pub fn new(mem_pool: &Arc<dyn MemoryPool>) -> Self {
+        Self {
+            object_store: Arc::new(InMemory::new()),
+            meta_data: RwLock::new(HashMap::new()),
+            mem_pool: Arc::clone(mem_pool),
+        }
+    }
+
+    /// Get the parquet file metadata for a given database and table
+    pub fn get_parquet_files(&self, database_name: &str, table_name: &str) -> Vec<ParquetFile> {
+        self.meta_data
+            .read()
+            .get(database_name)
+            .and_then(|db| db.get(table_name))
+            .cloned()
+            .unwrap_or_default()
+            .into_values()
+            .collect()
+    }
+
+    /// Persist a new parquet file to the cache or pass an object store path to update a currently
+    /// existing file in the cache
+    // Note we want to hold across await points until everything is cleared
+    // before letting other tasks access the data
+    pub async fn persist_parquet_file(
+        &self,
+        db_name: &str,
+        table_name: &str,
+        min_time: i64,
+        max_time: i64,
+        record_batches: SendableRecordBatchStream,
+        path: Option<ObjPath>,
+    ) -> Result<(), Error> {
+        let parquet = serialize_to_parquet(Arc::clone(&self.mem_pool), record_batches).await?;
+        // Generate a path for this
+        let id = uuid::Uuid::new_v4();
+        let parquet_path =
+            path.unwrap_or_else(|| ObjPath::from(format!("{db_name}-{table_name}-{id}")));
+        let size_bytes = parquet.bytes.len() as u64;
+        let meta_data = parquet.meta_data;
+
+        // Lock the data structure until everything is written into the object
+        // store and metadata.
We block on writing to the ObjectStore so that we + // don't yield the thread while maintaining the lock + let mut meta_data_lock = self.meta_data.write(); + let path = parquet_path.to_string(); + task::block_in_place(move || -> Result<_, Error> { + Handle::current() + .block_on(self.object_store.put(&parquet_path, parquet.bytes)) + .map_err(Into::into) + })?; + + meta_data_lock + .entry(db_name.into()) + .and_modify(|db| { + db.entry(table_name.into()) + .and_modify(|files| { + files.insert( + path.clone(), + ParquetFile { + path: path.clone(), + size_bytes, + row_count: meta_data.num_rows as u64, + min_time, + max_time, + }, + ); + }) + .or_insert_with(|| { + HashMap::from([( + path.clone(), + ParquetFile { + path: path.clone(), + size_bytes, + row_count: meta_data.num_rows as u64, + min_time, + max_time, + }, + )]) + }); + }) + .or_insert_with(|| { + HashMap::from([( + table_name.into(), + HashMap::from([( + path.clone(), + ParquetFile { + path: path.clone(), + size_bytes, + row_count: meta_data.num_rows as u64, + min_time, + max_time, + }, + )]), + )]) + }); + + Ok(()) + } + + /// Load the file from the cache + pub async fn load_parquet_file(&self, path: ObjPath) -> Result { + Ok(self.object_store.get(&path).await?.bytes().await?) + } + + /// Remove the file from the cache + pub async fn remove_parquet_file(&self, path: ObjPath) -> Result<(), Error> { + let closure_path = path.clone(); + let mut split = path.as_ref().split('-'); + let db = split + .next() + .expect("cache keys are in the form db-table-uuid"); + let table = split + .next() + .expect("cache keys are in the form db-table-uuid"); + + // Lock the cache until this function is completed. We block on the + // delete so that we don't hold the lock across await points + let mut meta_data_lock = self.meta_data.write(); + task::block_in_place(move || -> Result<_, Error> { + Handle::current() + .block_on(self.object_store.delete(&closure_path)) + .map_err(Into::into) + })?; + meta_data_lock + .get_mut(db) + .and_then(|tables| tables.get_mut(table)) + .expect("the file exists in the meta_data table as well") + .remove(path.as_ref()); + + Ok(()) + } + + /// Purge the whole cache + pub async fn purge_cache(&self) -> Result<(), Error> { + // Lock the metadata table and thus all writing to the cache + let mut meta_data_lock = self.meta_data.write(); + // Remove every object from the object store + for db in meta_data_lock.values() { + for table in db.values() { + for file in table.values() { + // Block on deletes so that we don't hold the lock across + // the await point + task::block_in_place(move || -> Result<_, Error> { + Handle::current() + .block_on(self.object_store.delete(&file.path.as_str().into())) + .map_err(Into::into) + })?; + } + } + } + + // Reset the metadata table back to a new state + *meta_data_lock = HashMap::new(); + + Ok(()) + } + + // Get a reference to the ObjectStore backing the cache + pub fn object_store(&self) -> Arc { + Arc::clone(&self.object_store) + } +} + +#[cfg(test)] +mod tests { + use super::Error; + use super::ParquetCache; + use arrow::array::TimestampNanosecondArray; + use arrow::datatypes::DataType; + use arrow::datatypes::Field; + use arrow::datatypes::Schema; + use arrow::datatypes::TimeUnit; + use arrow::record_batch::RecordBatch; + use datafusion::execution::memory_pool::MemoryPool; + use datafusion::physical_plan::stream::RecordBatchReceiverStreamBuilder; + use std::sync::Arc; + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn cache_persist() -> Result<(), Error> { + 
let cache = make_cache(); + let schema = Arc::new(Schema::new(vec![Field::new( + "time", + DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + )])); + let stream_builder = RecordBatchReceiverStreamBuilder::new(schema.clone(), 5); + + let time_array = TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(time_array)]).unwrap(); + + stream_builder.tx().send(Ok(batch)).await.unwrap(); + + let stream = stream_builder.build(); + + cache + .persist_parquet_file("test_db", "test_table", 1, 5, stream, None) + .await?; + + let tables = cache.get_parquet_files("test_db", "test_table"); + assert_eq!(tables.len(), 1); + + let _bytes = cache + .load_parquet_file(tables[0].path.as_str().into()) + .await?; + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn cache_update() -> Result<(), Error> { + let cache = make_cache(); + let schema = Arc::new(Schema::new(vec![Field::new( + "time", + DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + )])); + let stream_builder = RecordBatchReceiverStreamBuilder::new(schema.clone(), 5); + + let time_array = TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(time_array)]).unwrap(); + + stream_builder.tx().send(Ok(batch)).await.unwrap(); + + let stream = stream_builder.build(); + + cache + .persist_parquet_file("test_db", "test_table", 1, 5, stream, None) + .await?; + + let tables = cache.get_parquet_files("test_db", "test_table"); + assert_eq!(tables.len(), 1); + + let path: object_store::path::Path = tables[0].path.as_str().into(); + let orig_bytes = cache.load_parquet_file(path.clone()).await?; + + let stream_builder = RecordBatchReceiverStreamBuilder::new(schema.clone(), 5); + + let time_array = TimestampNanosecondArray::from(vec![6, 7, 8, 9, 10]); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(time_array)]).unwrap(); + + stream_builder.tx().send(Ok(batch)).await.unwrap(); + + let stream = stream_builder.build(); + + cache + .persist_parquet_file("test_db", "test_table", 6, 10, stream, Some(path.clone())) + .await?; + + let new_bytes = cache.load_parquet_file(path).await?; + + assert_ne!(orig_bytes, new_bytes); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn multiple_parquet() -> Result<(), Error> { + let cache = make_cache(); + let schema = Arc::new(Schema::new(vec![Field::new( + "time", + DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + )])); + let stream_builder = RecordBatchReceiverStreamBuilder::new(schema.clone(), 5); + + let time_array = TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(time_array)]).unwrap(); + + stream_builder.tx().send(Ok(batch)).await.unwrap(); + + let stream = stream_builder.build(); + + cache + .persist_parquet_file("test_db", "test_table", 1, 5, stream, None) + .await?; + + let tables = cache.get_parquet_files("test_db", "test_table"); + assert_eq!(tables.len(), 1); + + let path: object_store::path::Path = tables[0].path.as_str().into(); + let _ = cache.load_parquet_file(path.clone()).await?; + + let stream_builder = RecordBatchReceiverStreamBuilder::new(schema.clone(), 5); + + let time_array = TimestampNanosecondArray::from(vec![6, 7, 8, 9, 10]); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(time_array)]).unwrap(); + + stream_builder.tx().send(Ok(batch)).await.unwrap(); + + let stream = 
stream_builder.build(); + + cache + .persist_parquet_file("test_db", "test_table", 6, 10, stream, None) + .await?; + + let tables = cache.get_parquet_files("test_db", "test_table"); + assert_eq!(tables.len(), 2); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn purge_cache() -> Result<(), Error> { + let cache = make_cache(); + let schema = Arc::new(Schema::new(vec![Field::new( + "time", + DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + )])); + + let time_array = TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(time_array)]).unwrap(); + + let stream_builder = RecordBatchReceiverStreamBuilder::new(schema.clone(), 5); + stream_builder.tx().send(Ok(batch.clone())).await.unwrap(); + let stream = stream_builder.build(); + cache + .persist_parquet_file("test_db", "test_table", 1, 5, stream, None) + .await?; + let tables = cache.get_parquet_files("test_db", "test_table"); + assert_eq!(tables.len(), 1); + + let stream_builder = RecordBatchReceiverStreamBuilder::new(schema.clone(), 5); + stream_builder.tx().send(Ok(batch.clone())).await.unwrap(); + let stream = stream_builder.build(); + cache + .persist_parquet_file("test_db_2", "test_table", 1, 5, stream, None) + .await?; + let tables = cache.get_parquet_files("test_db_2", "test_table"); + assert_eq!(tables.len(), 1); + + let stream_builder = RecordBatchReceiverStreamBuilder::new(schema.clone(), 5); + stream_builder.tx().send(Ok(batch.clone())).await.unwrap(); + let stream = stream_builder.build(); + cache + .persist_parquet_file("test_db_3", "test_table", 1, 5, stream, None) + .await?; + let tables = cache.get_parquet_files("test_db_3", "test_table"); + assert_eq!(tables.len(), 1); + + let size = cache.object_store.list(None).size_hint().0; + assert_eq!(size, 3); + + cache.purge_cache().await?; + let size = cache.object_store.list(None).size_hint().0; + assert_eq!(size, 0); + assert_eq!(cache.meta_data.read().len(), 0); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn cache_remove_parquet() -> Result<(), Error> { + let cache = make_cache(); + let schema = Arc::new(Schema::new(vec![Field::new( + "time", + DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + )])); + let stream_builder = RecordBatchReceiverStreamBuilder::new(schema.clone(), 5); + + let time_array = TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(time_array)]).unwrap(); + + stream_builder.tx().send(Ok(batch)).await.unwrap(); + + let stream = stream_builder.build(); + + cache + .persist_parquet_file("test_db", "test_table", 1, 5, stream, None) + .await?; + + let tables = cache.get_parquet_files("test_db", "test_table"); + assert_eq!(tables.len(), 1); + + let path = object_store::path::Path::from(tables[0].path.as_str()); + let _bytes = cache.load_parquet_file(path.clone()).await?; + + cache.remove_parquet_file(path.clone()).await?; + let tables = cache.get_parquet_files("test_db", "test_table"); + assert_eq!(tables.len(), 0); + assert!(cache.load_parquet_file(path.clone()).await.is_err()); + + Ok(()) + } + + fn make_cache() -> ParquetCache { + let mem_pool: Arc = + Arc::new(datafusion::execution::memory_pool::UnboundedMemoryPool::default()); + ParquetCache::new(&mem_pool) + } +} diff --git a/influxdb3_write/src/lib.rs b/influxdb3_write/src/lib.rs index e5a2820bc8..6d8b86d2f8 100644 --- a/influxdb3_write/src/lib.rs +++ b/influxdb3_write/src/lib.rs @@ 
-6,6 +6,7 @@ //! When the segment reaches a certain size, or a certain amount of time has passed, it will be closed and marked //! to be persisted. A new open segment will be created and new writes will be written to that segment. +pub mod cache; pub mod catalog; mod chunk; pub mod paths; diff --git a/influxdb3_write/src/paths.rs b/influxdb3_write/src/paths.rs index 5038bd8df4..b883f2ff60 100644 --- a/influxdb3_write/src/paths.rs +++ b/influxdb3_write/src/paths.rs @@ -67,7 +67,7 @@ impl ParquetFilePath { Self(path) } - pub fn new_with_parititon_key( + pub fn new_with_partition_key( db_name: &str, table_name: &str, partition_key: &str, diff --git a/influxdb3_write/src/persister.rs b/influxdb3_write/src/persister.rs index c137a646b9..d68602cda1 100644 --- a/influxdb3_write/src/persister.rs +++ b/influxdb3_write/src/persister.rs @@ -6,7 +6,10 @@ use crate::catalog::InnerCatalog; use crate::paths::CatalogFilePath; use crate::paths::ParquetFilePath; use crate::paths::SegmentInfoFilePath; -use crate::{PersistedCatalog, PersistedSegment, Persister, SegmentId}; +use crate::PersistedCatalog; +use crate::PersistedSegment; +use crate::Persister; +use crate::SegmentId; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use async_trait::async_trait; @@ -57,7 +60,7 @@ pub type Result = std::result::Result; #[derive(Debug)] pub struct PersisterImpl { object_store: Arc, - mem_pool: Arc, + pub(crate) mem_pool: Arc, } impl PersisterImpl { @@ -72,38 +75,41 @@ impl PersisterImpl { &self, batches: SendableRecordBatchStream, ) -> Result { - // The ArrowWriter::write() call will return an error if any subsequent - // batch does not match this schema, enforcing schema uniformity. - let schema = batches.schema(); - - let stream = batches; - let mut bytes = Vec::new(); - pin_mut!(stream); - - // Construct the arrow serializer with the metadata as part of the parquet - // file properties. - let mut writer = TrackedMemoryArrowWriter::try_new( - &mut bytes, - Arc::clone(&schema), - self.mem_pool.clone(), - )?; - - while let Some(batch) = stream.try_next().await? { - writer.write(batch)?; - } - - let writer_meta = writer.close()?; - if writer_meta.num_rows == 0 { - return Err(Error::NoRows); - } - - Ok(ParquetBytes { - meta_data: writer_meta, - bytes: Bytes::from(bytes), - }) + serialize_to_parquet(Arc::clone(&self.mem_pool), batches).await } } +pub async fn serialize_to_parquet( + mem_pool: Arc, + batches: SendableRecordBatchStream, +) -> Result { + // The ArrowWriter::write() call will return an error if any subsequent + // batch does not match this schema, enforcing schema uniformity. + let schema = batches.schema(); + + let stream = batches; + let mut bytes = Vec::new(); + pin_mut!(stream); + + // Construct the arrow serializer with the metadata as part of the parquet + // file properties. + let mut writer = TrackedMemoryArrowWriter::try_new(&mut bytes, Arc::clone(&schema), mem_pool)?; + + while let Some(batch) = stream.try_next().await? 
{ + writer.write(batch)?; + } + + let writer_meta = writer.close()?; + if writer_meta.num_rows == 0 { + return Err(Error::NoRows); + } + + Ok(ParquetBytes { + meta_data: writer_meta, + bytes: Bytes::from(bytes), + }) +} + #[async_trait] impl Persister for PersisterImpl { type Error = Error; diff --git a/influxdb3_write/src/write_buffer/buffer_segment.rs b/influxdb3_write/src/write_buffer/buffer_segment.rs index c00f1afbcc..71a10a0183 100644 --- a/influxdb3_write/src/write_buffer/buffer_segment.rs +++ b/influxdb3_write/src/write_buffer/buffer_segment.rs @@ -464,7 +464,7 @@ impl ClosedBufferSegment { let row_count = data.iter().map(|b| b.num_rows()).sum::(); let batch_stream = stream_from_batches(table.schema().as_arrow(), data); - let parquet_file_path = ParquetFilePath::new_with_parititon_key( + let parquet_file_path = ParquetFilePath::new_with_partition_key( db_name, &table.name, &table_buffer.segment_key.to_string(), @@ -726,7 +726,7 @@ pub(crate) mod tests { // file number of the path should match the segment id assert_eq!( cpu_parqet.path, - ParquetFilePath::new_with_parititon_key("db1", "cpu", SEGMENT_KEY, 4).to_string() + ParquetFilePath::new_with_partition_key("db1", "cpu", SEGMENT_KEY, 4).to_string() ); assert_eq!(cpu_parqet.row_count, 2); assert_eq!(cpu_parqet.min_time, 10); @@ -738,7 +738,7 @@ pub(crate) mod tests { // file number of the path should match the segment id assert_eq!( mem_parqet.path, - ParquetFilePath::new_with_parititon_key("db1", "mem", SEGMENT_KEY, 4).to_string() + ParquetFilePath::new_with_partition_key("db1", "mem", SEGMENT_KEY, 4).to_string() ); assert_eq!(mem_parqet.row_count, 2); assert_eq!(mem_parqet.min_time, 15); diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index c0c01af353..f44ed01a3b 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -6,17 +6,18 @@ mod loader; mod segment_state; mod table_buffer; +use crate::cache::ParquetCache; use crate::catalog::{ Catalog, DatabaseSchema, TableDefinition, SERIES_ID_COLUMN_NAME, TIME_COLUMN_NAME, }; use crate::chunk::ParquetChunk; +use crate::persister::PersisterImpl; use crate::write_buffer::flusher::WriteBufferFlusher; use crate::write_buffer::loader::load_starting_state; use crate::write_buffer::segment_state::{run_buffer_segment_persist_and_cleanup, SegmentState}; -use crate::DatabaseTables; use crate::{ - persister, BufferSegment, BufferedWriteRequest, Bufferer, ChunkContainer, LpWriteOp, Persister, - Precision, SegmentDuration, SegmentId, SequenceNumber, Wal, WalOp, WriteBuffer, WriteLineError, + BufferSegment, BufferedWriteRequest, Bufferer, ChunkContainer, LpWriteOp, Persister, Precision, + SegmentDuration, SegmentId, SequenceNumber, Wal, WalOp, WriteBuffer, WriteLineError, }; use async_trait::async_trait; use data_types::{ @@ -25,14 +26,13 @@ use data_types::{ use datafusion::common::DataFusionError; use datafusion::execution::context::SessionState; use datafusion::logical_expr::Expr; +use datafusion::physical_plan::SendableRecordBatchStream; use influxdb_line_protocol::{parse_lines, FieldValue, ParsedLine, Series, TagSet}; use iox_query::chunk_statistics::create_chunk_statistics; use iox_query::QueryChunk; use iox_time::{Time, TimeProvider}; -use object_store::memory::InMemory; use object_store::path::Path as ObjPath; use object_store::ObjectMeta; -use object_store::ObjectStore; use observability_deps::tracing::{debug, error}; use parking_lot::{Mutex, RwLock}; use parquet_file::storage::ParquetExecInput; @@ 
-40,7 +40,7 @@ use sha2::Digest;
 use sha2::Sha256;
 use std::borrow::Cow;
 use std::collections::{BTreeMap, HashMap};
-use std::ops::Deref;
+use std::i64;
 use std::sync::{Arc, OnceLock};
 use thiserror::Error;
 use tokio::sync::watch;
@@ -89,12 +89,11 @@ pub struct WriteRequest<'a> {
 }
 
 #[derive(Debug)]
-pub struct WriteBufferImpl {
+pub struct WriteBufferImpl {
     catalog: Arc<Catalog>,
+    persister: Arc<PersisterImpl>,
+    parquet_cache: Arc<ParquetCache>,
     segment_state: Arc>>,
-    parquet_cache: Arc<dyn ObjectStore>,
-    parquet_cache_metadata: Arc<RwLock<DatabaseTables>>,
-    persister: Arc<P>,
     wal: Option>,
     write_buffer_flusher: WriteBufferFlusher,
     segment_duration: SegmentDuration,
@@ -106,19 +105,14 @@ pub struct WriteBufferImpl {
     shutdown_segment_persist_tx: watch::Sender<()>,
 }
 
-impl WriteBufferImpl {
+impl WriteBufferImpl {
     pub async fn new(
-        persister: Arc<P>,
+        persister: Arc<PersisterImpl>,
         wal: Option>,
         time_provider: Arc,
         segment_duration: SegmentDuration,
         executor: Arc,
-    ) -> Result<Self>
-    where
-        P: Persister,
-        Error: From<<P as Persister>::Error>,
-        persister::Error: From<<P as Persister>
::Error>, - { + ) -> Result { let now = time_provider.now(); let loaded_state = load_starting_state(Arc::clone(&persister), wal.clone(), now, segment_duration).await?; @@ -157,10 +151,7 @@ impl WriteBufferImpl { Ok(Self { catalog: loaded_state.catalog, segment_state, - parquet_cache: Arc::new(InMemory::new()), - parquet_cache_metadata: Arc::new(RwLock::new(DatabaseTables { - tables: HashMap::new(), - })), + parquet_cache: Arc::new(ParquetCache::new(&persister.mem_pool)), persister, wal, write_buffer_flusher, @@ -285,18 +276,12 @@ impl WriteBufferImpl { } // Get any cached files and add them to the query - let parquet_files = self - .parquet_cache_metadata - .read() - .deref() - .tables - .get(database_name) - .map(|table| table.parquet_files.clone()) - .unwrap_or_default(); - // This is mostly the same as above, but we change the object store to // point to the in memory cache - for parquet_file in parquet_files { + for parquet_file in self + .parquet_cache + .get_parquet_files(database_name, table_name) + { let partition_key = data_types::PartitionKey::from(parquet_file.path.clone()); let partition_id = data_types::partition::TransitionPartitionId::new( data_types::TableId::new(0), @@ -321,7 +306,7 @@ impl WriteBufferImpl { e_tag: None, version: None, }, - object_store: Arc::clone(&self.parquet_cache), + object_store: Arc::clone(&self.parquet_cache.object_store()), }; let parquet_chunk = ParquetChunk { @@ -342,6 +327,43 @@ impl WriteBufferImpl { Ok(chunks) } + pub async fn cache_parquet( + &self, + db_name: &str, + table_name: &str, + min_time: i64, + max_time: i64, + records: SendableRecordBatchStream, + ) -> Result<(), Error> { + Ok(self + .parquet_cache + .persist_parquet_file(db_name, table_name, min_time, max_time, records, None) + .await?) + } + + pub async fn update_parquet( + &self, + db_name: &str, + table_name: &str, + min_time: i64, + max_time: i64, + path: ObjPath, + records: SendableRecordBatchStream, + ) -> Result<(), Error> { + Ok(self + .parquet_cache + .persist_parquet_file(db_name, table_name, min_time, max_time, records, Some(path)) + .await?) + } + + pub async fn remove_parquet(&self, path: ObjPath) -> Result<(), Error> { + Ok(self.parquet_cache.remove_parquet_file(path).await?) + } + + pub async fn purge_cache(&self) -> Result<(), Error> { + Ok(self.parquet_cache.purge_cache().await?) + } + #[cfg(test)] fn get_table_record_batches( &self, @@ -358,7 +380,7 @@ impl WriteBufferImpl { } #[async_trait] -impl Bufferer for WriteBufferImpl { +impl Bufferer for WriteBufferImpl { async fn write_lp( &self, database: NamespaceName<'static>, @@ -392,7 +414,7 @@ impl Bufferer for WriteBufferImpl ChunkContainer for WriteBufferImpl { +impl ChunkContainer for WriteBufferImpl { fn get_table_chunks( &self, database_name: &str, @@ -405,7 +427,7 @@ impl ChunkContainer for WriteBufferImpl WriteBuffer for WriteBufferImpl {} +impl WriteBuffer for WriteBufferImpl {} /// Returns a validated result and the sequence number of the catalog before any updates were /// applied. @@ -1156,7 +1178,7 @@ mod tests { } async fn get_table_batches( - write_buffer: &WriteBufferImpl, + write_buffer: &WriteBufferImpl, database_name: &str, table_name: &str, ctx: &IOxSessionContext,
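
A note on the bookkeeping in `persist_parquet_file`: the nested `entry`/`and_modify`/`or_insert_with` chain maintains a three-level map keyed by database name, table name, and object-store path. The sketch below is not part of this change; `FileMeta`, `Meta`, and `record_file` are hypothetical stand-ins used to show the same shape, the equivalent insert written with `entry().or_default()`, and how `get_parquet_files` flattens the innermost map.

```rust
use std::collections::HashMap;

// Simplified stand-in for the cached file metadata; the real entries are
// `influxdb3_write::ParquetFile` values.
#[derive(Debug, Clone)]
struct FileMeta {
    path: String,
    size_bytes: u64,
}

// database name -> table name -> object-store path -> file metadata,
// mirroring the `MetaData` alias in cache.rs (minus the `RwLock`).
type Meta = HashMap<String, HashMap<String, HashMap<String, FileMeta>>>;

fn record_file(meta: &mut Meta, db: &str, table: &str, file: FileMeta) {
    meta.entry(db.to_string())
        .or_default()
        .entry(table.to_string())
        .or_default()
        .insert(file.path.clone(), file);
}

fn main() {
    let mut meta = Meta::new();
    // The real cache keys end in a freshly generated UUID; "uuid" here is a placeholder.
    let file = FileMeta {
        path: "test_db-test_table-uuid".to_string(),
        size_bytes: 42,
    };
    record_file(&mut meta, "test_db", "test_table", file);

    // `get_parquet_files` flattens the innermost map into a Vec for one db/table pair.
    let files: Vec<FileMeta> = meta["test_db"]["test_table"].values().cloned().collect();
    println!("{} file(s), first is {} bytes", files.len(), files[0].size_bytes);
}
```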
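The comments in `persist_parquet_file`, `remove_parquet_file`, and `purge_cache` all describe the same pattern: take the `parking_lot` write lock first, then drive the async object-store call to completion with `task::block_in_place` plus `Handle::block_on`, so the task never yields at an await point while the lock is held. A minimal, self-contained sketch of that pattern follows; `put_object` and `put_and_index` are hypothetical names, and it assumes a multi-threaded Tokio runtime, which `block_in_place` requires.

```rust
use std::collections::HashMap;

use parking_lot::RwLock;
use tokio::runtime::Handle;
use tokio::task;

// Hypothetical stand-in for an async object-store write such as `ObjectStore::put`.
async fn put_object(_key: &str, _bytes: Vec<u8>) -> std::io::Result<()> {
    Ok(())
}

// Write the object and record it in the index without releasing the write lock
// in between. `block_in_place` tells the runtime this worker thread is about to
// block (it panics on a current-thread runtime), and `block_on` drives the async
// call to completion synchronously, so the lock is never held across a yield point.
fn put_and_index(
    index: &RwLock<HashMap<String, usize>>,
    key: &str,
    bytes: Vec<u8>,
) -> std::io::Result<()> {
    let mut guard = index.write();
    let len = bytes.len();
    task::block_in_place(|| Handle::current().block_on(put_object(key, bytes)))?;
    guard.insert(key.to_owned(), len);
    Ok(())
}

#[tokio::main(flavor = "multi_thread")]
async fn main() -> std::io::Result<()> {
    let index = RwLock::new(HashMap::new());
    put_and_index(&index, "db-table-uuid", vec![1, 2, 3])?;
    assert_eq!(index.read().get("db-table-uuid"), Some(&3));
    Ok(())
}
```

The trade-off accepted by this pattern is that a runtime worker thread is blocked for the duration of the store call; in exchange, readers can never observe the object store and the metadata map out of sync.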
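The persister change extracts the serialization logic into a free function, `serialize_to_parquet`, which the cache reuses. Below is a rough usage sketch, assuming the crate layout in this diff (`influxdb3_write::persister::serialize_to_parquet` returning parquet bytes plus writer metadata) and mirroring how the new cache tests build a `SendableRecordBatchStream`; dependency versions and exports are assumed, not verified.

```rust
use std::sync::Arc;

use arrow::array::TimestampNanosecondArray;
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;
use datafusion::execution::memory_pool::{MemoryPool, UnboundedMemoryPool};
use datafusion::physical_plan::stream::RecordBatchReceiverStreamBuilder;
use influxdb3_write::persister::serialize_to_parquet;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "time",
        DataType::Timestamp(TimeUnit::Nanosecond, None),
        false,
    )]));

    // Feed a single batch through a channel-backed stream, the same way the
    // cache tests in this change build their SendableRecordBatchStream.
    let builder = RecordBatchReceiverStreamBuilder::new(Arc::clone(&schema), 1);
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(TimestampNanosecondArray::from(vec![1, 2, 3]))],
    )?;
    builder.tx().send(Ok(batch)).await.unwrap();
    let stream = builder.build();

    // Serialize the stream into an in-memory parquet file plus its metadata.
    let mem_pool: Arc<dyn MemoryPool> = Arc::new(UnboundedMemoryPool::default());
    let parquet = serialize_to_parquet(mem_pool, stream).await?;
    println!(
        "{} rows in {} parquet bytes",
        parquet.meta_data.num_rows,
        parquet.bytes.len()
    );
    Ok(())
}
```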