feat: implement parquet cache persistence (#24907)

* feat: use concrete type for Persister. Up to this point we'd been using a generic `Persister` trait, but in practice, even in tests, we only ever use a single type, `PersisterImpl`. To share the `MemoryPool` between it and the upcoming `ParquetCache`, we need the concrete type. It also makes the code easier to grok by removing unneeded generic bounds.
* fix: new_with_partition_key fn name typo
* feat: implement parquet cache persistence
* fix: incorporate feedback and don't hold locks across await points
parent db8c8d5cc4
commit 43368981c7
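The last bullet is about lock discipline: `parking_lot` guards are synchronous and must not be held across an `.await`, so the new cache takes the write guard and then drives the object-store I/O to completion on the current thread with `tokio::task::block_in_place` plus `Handle::current().block_on`, releasing the guard only once both the store and the metadata map agree. A minimal, self-contained sketch of that pattern, assuming a multi-threaded Tokio runtime; the `Cache` type and `fetch_len` helper below are hypothetical stand-ins, not code from this commit:

```rust
use std::collections::HashMap;

use parking_lot::RwLock;
use tokio::runtime::Handle;
use tokio::task;

// Hypothetical cache guarded by a synchronous parking_lot lock.
struct Cache {
    entries: RwLock<HashMap<String, u64>>,
}

impl Cache {
    // Insert an entry whose value comes from an async operation without
    // holding the lock across an .await point.
    async fn insert(&self, key: String) {
        // Take the write guard first so readers observe either the old state
        // or the fully updated one, never something in between.
        let mut guard = self.entries.write();
        // block_in_place tells the runtime this worker will block, and
        // Handle::current().block_on drives the async work to completion
        // synchronously, so the guard is never held across a yield point.
        let value = task::block_in_place(|| {
            Handle::current().block_on(async { fetch_len(&key).await })
        });
        guard.insert(key, value);
        // The guard drops here, after both pieces of state are consistent.
    }
}

// Stand-in for an object-store put/get.
async fn fetch_len(key: &str) -> u64 {
    key.len() as u64
}

// block_in_place requires the multi-threaded runtime flavor.
#[tokio::main(flavor = "multi_thread")]
async fn main() {
    let cache = Cache {
        entries: RwLock::new(HashMap::new()),
    };
    cache.insert("db-table-uuid".to_string()).await;
    assert_eq!(cache.entries.read().len(), 1);
}
```

The trade-off is that the worker thread is tied up for the duration of the store call; the PR accepts that in exchange for never exposing a half-updated cache.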
@@ -2729,6 +2729,7 @@ dependencies = [
 "thiserror",
 "tokio",
 "url",
 "uuid",
]

[[package]]
@@ -32,13 +32,14 @@ hex.workspace = true
object_store.workspace = true
parking_lot.workspace = true
parquet.workspace = true
thiserror.workspace = true
tokio.workspace = true
serde.workspace = true
serde_json.workspace = true
sha2.workspace = true
snap.workspace = true
thiserror.workspace = true
tokio.workspace = true
url.workspace = true
uuid.workspace = true

[dev-dependencies]
# Core Crates
@@ -0,0 +1,419 @@
use crate::persister::serialize_to_parquet;
use crate::persister::Error;
use crate::ParquetFile;
use bytes::Bytes;
use datafusion::execution::memory_pool::MemoryPool;
use datafusion::physical_plan::SendableRecordBatchStream;
use object_store::memory::InMemory;
use object_store::path::Path as ObjPath;
use object_store::ObjectStore;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::runtime::Handle;
use tokio::task;

type MetaData = RwLock<HashMap<String, HashMap<String, HashMap<String, ParquetFile>>>>;

#[derive(Debug)]
pub struct ParquetCache {
    object_store: Arc<dyn ObjectStore>,
    meta_data: MetaData,
    mem_pool: Arc<dyn MemoryPool>,
}

impl ParquetCache {
    /// Create a new `ParquetCache`
    pub fn new(mem_pool: &Arc<dyn MemoryPool>) -> Self {
        Self {
            object_store: Arc::new(InMemory::new()),
            meta_data: RwLock::new(HashMap::new()),
            mem_pool: Arc::clone(mem_pool),
        }
    }

    /// Get the parquet file metadata for a given database and table
    pub fn get_parquet_files(&self, database_name: &str, table_name: &str) -> Vec<ParquetFile> {
        self.meta_data
            .read()
            .get(database_name)
            .and_then(|db| db.get(table_name))
            .cloned()
            .unwrap_or_default()
            .into_values()
            .collect()
    }

    /// Persist a new parquet file to the cache or pass an object store path to update a currently
    /// existing file in the cache
    // Note we want to hold across await points until everything is cleared
    // before letting other tasks access the data
    pub async fn persist_parquet_file(
        &self,
        db_name: &str,
        table_name: &str,
        min_time: i64,
        max_time: i64,
        record_batches: SendableRecordBatchStream,
        path: Option<ObjPath>,
    ) -> Result<(), Error> {
        let parquet = serialize_to_parquet(Arc::clone(&self.mem_pool), record_batches).await?;
        // Generate a path for this
        let id = uuid::Uuid::new_v4();
        let parquet_path =
            path.unwrap_or_else(|| ObjPath::from(format!("{db_name}-{table_name}-{id}")));
        let size_bytes = parquet.bytes.len() as u64;
        let meta_data = parquet.meta_data;

        // Lock the data structure until everything is written into the object
        // store and metadata. We block on writing to the ObjectStore so that we
        // don't yield the thread while maintaining the lock
        let mut meta_data_lock = self.meta_data.write();
        let path = parquet_path.to_string();
        task::block_in_place(move || -> Result<_, Error> {
            Handle::current()
                .block_on(self.object_store.put(&parquet_path, parquet.bytes))
                .map_err(Into::into)
        })?;

        meta_data_lock
            .entry(db_name.into())
            .and_modify(|db| {
                db.entry(table_name.into())
                    .and_modify(|files| {
                        files.insert(
                            path.clone(),
                            ParquetFile {
                                path: path.clone(),
                                size_bytes,
                                row_count: meta_data.num_rows as u64,
                                min_time,
                                max_time,
                            },
                        );
                    })
                    .or_insert_with(|| {
                        HashMap::from([(
                            path.clone(),
                            ParquetFile {
                                path: path.clone(),
                                size_bytes,
                                row_count: meta_data.num_rows as u64,
                                min_time,
                                max_time,
                            },
                        )])
                    });
            })
            .or_insert_with(|| {
                HashMap::from([(
                    table_name.into(),
                    HashMap::from([(
                        path.clone(),
                        ParquetFile {
                            path: path.clone(),
                            size_bytes,
                            row_count: meta_data.num_rows as u64,
                            min_time,
                            max_time,
                        },
                    )]),
                )])
            });

        Ok(())
    }

    /// Load the file from the cache
    pub async fn load_parquet_file(&self, path: ObjPath) -> Result<Bytes, Error> {
        Ok(self.object_store.get(&path).await?.bytes().await?)
    }

    /// Remove the file from the cache
    pub async fn remove_parquet_file(&self, path: ObjPath) -> Result<(), Error> {
        let closure_path = path.clone();
        let mut split = path.as_ref().split('-');
        let db = split
            .next()
            .expect("cache keys are in the form db-table-uuid");
        let table = split
            .next()
            .expect("cache keys are in the form db-table-uuid");

        // Lock the cache until this function is completed. We block on the
        // delete so that we don't hold the lock across await points
        let mut meta_data_lock = self.meta_data.write();
        task::block_in_place(move || -> Result<_, Error> {
            Handle::current()
                .block_on(self.object_store.delete(&closure_path))
                .map_err(Into::into)
        })?;
        meta_data_lock
            .get_mut(db)
            .and_then(|tables| tables.get_mut(table))
            .expect("the file exists in the meta_data table as well")
            .remove(path.as_ref());

        Ok(())
    }

    /// Purge the whole cache
    pub async fn purge_cache(&self) -> Result<(), Error> {
        // Lock the metadata table and thus all writing to the cache
        let mut meta_data_lock = self.meta_data.write();
        // Remove every object from the object store
        for db in meta_data_lock.values() {
            for table in db.values() {
                for file in table.values() {
                    // Block on deletes so that we don't hold the lock across
                    // the await point
                    task::block_in_place(move || -> Result<_, Error> {
                        Handle::current()
                            .block_on(self.object_store.delete(&file.path.as_str().into()))
                            .map_err(Into::into)
                    })?;
                }
            }
        }

        // Reset the metadata table back to a new state
        *meta_data_lock = HashMap::new();

        Ok(())
    }

    // Get a reference to the ObjectStore backing the cache
    pub fn object_store(&self) -> Arc<dyn ObjectStore> {
        Arc::clone(&self.object_store)
    }
}

#[cfg(test)]
mod tests {
    use super::Error;
    use super::ParquetCache;
    use arrow::array::TimestampNanosecondArray;
    use arrow::datatypes::DataType;
    use arrow::datatypes::Field;
    use arrow::datatypes::Schema;
    use arrow::datatypes::TimeUnit;
    use arrow::record_batch::RecordBatch;
    use datafusion::execution::memory_pool::MemoryPool;
    use datafusion::physical_plan::stream::RecordBatchReceiverStreamBuilder;
    use std::sync::Arc;

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn cache_persist() -> Result<(), Error> {
        let cache = make_cache();
        let schema = Arc::new(Schema::new(vec![Field::new(
            "time",
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            false,
        )]));
        let stream_builder = RecordBatchReceiverStreamBuilder::new(schema.clone(), 5);

        let time_array = TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]);
        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(time_array)]).unwrap();

        stream_builder.tx().send(Ok(batch)).await.unwrap();

        let stream = stream_builder.build();

        cache
            .persist_parquet_file("test_db", "test_table", 1, 5, stream, None)
            .await?;

        let tables = cache.get_parquet_files("test_db", "test_table");
        assert_eq!(tables.len(), 1);

        let _bytes = cache
            .load_parquet_file(tables[0].path.as_str().into())
            .await?;

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn cache_update() -> Result<(), Error> {
        let cache = make_cache();
        let schema = Arc::new(Schema::new(vec![Field::new(
            "time",
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            false,
        )]));
        let stream_builder = RecordBatchReceiverStreamBuilder::new(schema.clone(), 5);

        let time_array = TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]);
        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(time_array)]).unwrap();

        stream_builder.tx().send(Ok(batch)).await.unwrap();

        let stream = stream_builder.build();

        cache
            .persist_parquet_file("test_db", "test_table", 1, 5, stream, None)
            .await?;

        let tables = cache.get_parquet_files("test_db", "test_table");
        assert_eq!(tables.len(), 1);

        let path: object_store::path::Path = tables[0].path.as_str().into();
        let orig_bytes = cache.load_parquet_file(path.clone()).await?;

        let stream_builder = RecordBatchReceiverStreamBuilder::new(schema.clone(), 5);

        let time_array = TimestampNanosecondArray::from(vec![6, 7, 8, 9, 10]);
        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(time_array)]).unwrap();

        stream_builder.tx().send(Ok(batch)).await.unwrap();

        let stream = stream_builder.build();

        cache
            .persist_parquet_file("test_db", "test_table", 6, 10, stream, Some(path.clone()))
            .await?;

        let new_bytes = cache.load_parquet_file(path).await?;

        assert_ne!(orig_bytes, new_bytes);

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn multiple_parquet() -> Result<(), Error> {
        let cache = make_cache();
        let schema = Arc::new(Schema::new(vec![Field::new(
            "time",
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            false,
        )]));
        let stream_builder = RecordBatchReceiverStreamBuilder::new(schema.clone(), 5);

        let time_array = TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]);
        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(time_array)]).unwrap();

        stream_builder.tx().send(Ok(batch)).await.unwrap();

        let stream = stream_builder.build();

        cache
            .persist_parquet_file("test_db", "test_table", 1, 5, stream, None)
            .await?;

        let tables = cache.get_parquet_files("test_db", "test_table");
        assert_eq!(tables.len(), 1);

        let path: object_store::path::Path = tables[0].path.as_str().into();
        let _ = cache.load_parquet_file(path.clone()).await?;

        let stream_builder = RecordBatchReceiverStreamBuilder::new(schema.clone(), 5);

        let time_array = TimestampNanosecondArray::from(vec![6, 7, 8, 9, 10]);
        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(time_array)]).unwrap();

        stream_builder.tx().send(Ok(batch)).await.unwrap();

        let stream = stream_builder.build();

        cache
            .persist_parquet_file("test_db", "test_table", 6, 10, stream, None)
            .await?;

        let tables = cache.get_parquet_files("test_db", "test_table");
        assert_eq!(tables.len(), 2);

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn purge_cache() -> Result<(), Error> {
        let cache = make_cache();
        let schema = Arc::new(Schema::new(vec![Field::new(
            "time",
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            false,
        )]));

        let time_array = TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]);
        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(time_array)]).unwrap();

        let stream_builder = RecordBatchReceiverStreamBuilder::new(schema.clone(), 5);
        stream_builder.tx().send(Ok(batch.clone())).await.unwrap();
        let stream = stream_builder.build();
        cache
            .persist_parquet_file("test_db", "test_table", 1, 5, stream, None)
            .await?;
        let tables = cache.get_parquet_files("test_db", "test_table");
        assert_eq!(tables.len(), 1);

        let stream_builder = RecordBatchReceiverStreamBuilder::new(schema.clone(), 5);
        stream_builder.tx().send(Ok(batch.clone())).await.unwrap();
        let stream = stream_builder.build();
        cache
            .persist_parquet_file("test_db_2", "test_table", 1, 5, stream, None)
            .await?;
        let tables = cache.get_parquet_files("test_db_2", "test_table");
        assert_eq!(tables.len(), 1);

        let stream_builder = RecordBatchReceiverStreamBuilder::new(schema.clone(), 5);
        stream_builder.tx().send(Ok(batch.clone())).await.unwrap();
        let stream = stream_builder.build();
        cache
            .persist_parquet_file("test_db_3", "test_table", 1, 5, stream, None)
            .await?;
        let tables = cache.get_parquet_files("test_db_3", "test_table");
        assert_eq!(tables.len(), 1);

        let size = cache.object_store.list(None).size_hint().0;
        assert_eq!(size, 3);

        cache.purge_cache().await?;
        let size = cache.object_store.list(None).size_hint().0;
        assert_eq!(size, 0);
        assert_eq!(cache.meta_data.read().len(), 0);

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn cache_remove_parquet() -> Result<(), Error> {
        let cache = make_cache();
        let schema = Arc::new(Schema::new(vec![Field::new(
            "time",
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            false,
        )]));
        let stream_builder = RecordBatchReceiverStreamBuilder::new(schema.clone(), 5);

        let time_array = TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]);
        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(time_array)]).unwrap();

        stream_builder.tx().send(Ok(batch)).await.unwrap();

        let stream = stream_builder.build();

        cache
            .persist_parquet_file("test_db", "test_table", 1, 5, stream, None)
            .await?;

        let tables = cache.get_parquet_files("test_db", "test_table");
        assert_eq!(tables.len(), 1);

        let path = object_store::path::Path::from(tables[0].path.as_str());
        let _bytes = cache.load_parquet_file(path.clone()).await?;

        cache.remove_parquet_file(path.clone()).await?;
        let tables = cache.get_parquet_files("test_db", "test_table");
        assert_eq!(tables.len(), 0);
        assert!(cache.load_parquet_file(path.clone()).await.is_err());

        Ok(())
    }

    fn make_cache() -> ParquetCache {
        let mem_pool: Arc<dyn MemoryPool> =
            Arc::new(datafusion::execution::memory_pool::UnboundedMemoryPool::default());
        ParquetCache::new(&mem_pool)
    }
}
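The metadata map above is three levels deep (database → table → object-store path), and `persist_parquet_file` populates it with nested `and_modify`/`or_insert_with` chains that repeat the `ParquetFile` literal three times. A small sketch, not from the PR, of the same insert expressed with `entry().or_default()`, which creates each missing level in one pass; the `ParquetFile` struct here is a trimmed stand-in for the crate's own type:

```rust
use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
struct ParquetFile {
    path: String,
    size_bytes: u64,
    row_count: u64,
    min_time: i64,
    max_time: i64,
}

// db -> table -> path -> file, mirroring the MetaData alias in cache.rs.
type Meta = HashMap<String, HashMap<String, HashMap<String, ParquetFile>>>;

fn insert_file(meta: &mut Meta, db: &str, table: &str, file: ParquetFile) {
    // or_default() builds each missing nesting level, so one ParquetFile
    // literal covers the "new db", "new table", and "existing table" cases
    // that the and_modify/or_insert_with chain spells out separately.
    meta.entry(db.to_string())
        .or_default()
        .entry(table.to_string())
        .or_default()
        .insert(file.path.clone(), file);
}

fn main() {
    let mut meta = Meta::new();
    let file = ParquetFile {
        path: "test_db-test_table-uuid".into(),
        size_bytes: 42,
        row_count: 5,
        min_time: 1,
        max_time: 5,
    };
    insert_file(&mut meta, "test_db", "test_table", file.clone());
    assert_eq!(meta["test_db"]["test_table"]["test_db-test_table-uuid"], file);
}
```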
@@ -6,6 +6,7 @@
//! When the segment reaches a certain size, or a certain amount of time has passed, it will be closed and marked
//! to be persisted. A new open segment will be created and new writes will be written to that segment.

pub mod cache;
pub mod catalog;
mod chunk;
pub mod paths;
@@ -67,7 +67,7 @@ impl ParquetFilePath {
        Self(path)
    }

    pub fn new_with_parititon_key(
    pub fn new_with_partition_key(
        db_name: &str,
        table_name: &str,
        partition_key: &str,
@@ -6,7 +6,10 @@ use crate::catalog::InnerCatalog;
use crate::paths::CatalogFilePath;
use crate::paths::ParquetFilePath;
use crate::paths::SegmentInfoFilePath;
use crate::{PersistedCatalog, PersistedSegment, Persister, SegmentId};
use crate::PersistedCatalog;
use crate::PersistedSegment;
use crate::Persister;
use crate::SegmentId;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
@@ -57,7 +60,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
pub struct PersisterImpl {
    object_store: Arc<dyn ObjectStore>,
    mem_pool: Arc<dyn MemoryPool>,
    pub(crate) mem_pool: Arc<dyn MemoryPool>,
}

impl PersisterImpl {
@@ -72,38 +75,41 @@ impl PersisterImpl {
        &self,
        batches: SendableRecordBatchStream,
    ) -> Result<ParquetBytes> {
        // The ArrowWriter::write() call will return an error if any subsequent
        // batch does not match this schema, enforcing schema uniformity.
        let schema = batches.schema();

        let stream = batches;
        let mut bytes = Vec::new();
        pin_mut!(stream);

        // Construct the arrow serializer with the metadata as part of the parquet
        // file properties.
        let mut writer = TrackedMemoryArrowWriter::try_new(
            &mut bytes,
            Arc::clone(&schema),
            self.mem_pool.clone(),
        )?;

        while let Some(batch) = stream.try_next().await? {
            writer.write(batch)?;
        }

        let writer_meta = writer.close()?;
        if writer_meta.num_rows == 0 {
            return Err(Error::NoRows);
        }

        Ok(ParquetBytes {
            meta_data: writer_meta,
            bytes: Bytes::from(bytes),
        })
        serialize_to_parquet(Arc::clone(&self.mem_pool), batches).await
    }
}

pub async fn serialize_to_parquet(
    mem_pool: Arc<dyn MemoryPool>,
    batches: SendableRecordBatchStream,
) -> Result<ParquetBytes> {
    // The ArrowWriter::write() call will return an error if any subsequent
    // batch does not match this schema, enforcing schema uniformity.
    let schema = batches.schema();

    let stream = batches;
    let mut bytes = Vec::new();
    pin_mut!(stream);

    // Construct the arrow serializer with the metadata as part of the parquet
    // file properties.
    let mut writer = TrackedMemoryArrowWriter::try_new(&mut bytes, Arc::clone(&schema), mem_pool)?;

    while let Some(batch) = stream.try_next().await? {
        writer.write(batch)?;
    }

    let writer_meta = writer.close()?;
    if writer_meta.num_rows == 0 {
        return Err(Error::NoRows);
    }

    Ok(ParquetBytes {
        meta_data: writer_meta,
        bytes: Bytes::from(bytes),
    })
}

#[async_trait]
impl Persister for PersisterImpl {
    type Error = Error;
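The hunk above extracts `serialize_to_parquet` into a free function so that both `PersisterImpl` and the new `ParquetCache` can call it with whichever `MemoryPool` they share. A sketch of driving it directly, assuming the crate is reachable as `influxdb3_write` and that `ParquetBytes` exposes its `bytes`/`meta_data` fields the way the in-crate cache code reads them (both assumptions); the stream construction mirrors this PR's own cache tests:

```rust
use std::sync::Arc;

use arrow::array::TimestampNanosecondArray;
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;
use datafusion::execution::memory_pool::{MemoryPool, UnboundedMemoryPool};
use datafusion::physical_plan::stream::RecordBatchReceiverStreamBuilder;
use influxdb3_write::persister::serialize_to_parquet; // assumed crate path

#[tokio::main]
async fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "time",
        DataType::Timestamp(TimeUnit::Nanosecond, None),
        false,
    )]));

    // Feed a single batch through a channel-backed stream, as the cache
    // tests in this PR do.
    let builder = RecordBatchReceiverStreamBuilder::new(Arc::clone(&schema), 5);
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(TimestampNanosecondArray::from(vec![1, 2, 3]))],
    )
    .unwrap();
    builder.tx().send(Ok(batch)).await.unwrap();
    let stream = builder.build();

    // Any MemoryPool works; the unbounded pool keeps the example simple.
    let mem_pool: Arc<dyn MemoryPool> = Arc::new(UnboundedMemoryPool::default());
    let parquet = serialize_to_parquet(mem_pool, stream).await.unwrap();

    // Assumes ParquetBytes exposes `bytes` and `meta_data` publicly.
    println!(
        "wrote {} parquet bytes covering {} rows",
        parquet.bytes.len(),
        parquet.meta_data.num_rows
    );
}
```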
@@ -464,7 +464,7 @@ impl ClosedBufferSegment {
        let row_count = data.iter().map(|b| b.num_rows()).sum::<usize>();

        let batch_stream = stream_from_batches(table.schema().as_arrow(), data);
        let parquet_file_path = ParquetFilePath::new_with_parititon_key(
        let parquet_file_path = ParquetFilePath::new_with_partition_key(
            db_name,
            &table.name,
            &table_buffer.segment_key.to_string(),
@@ -726,7 +726,7 @@ pub(crate) mod tests {
        // file number of the path should match the segment id
        assert_eq!(
            cpu_parqet.path,
            ParquetFilePath::new_with_parititon_key("db1", "cpu", SEGMENT_KEY, 4).to_string()
            ParquetFilePath::new_with_partition_key("db1", "cpu", SEGMENT_KEY, 4).to_string()
        );
        assert_eq!(cpu_parqet.row_count, 2);
        assert_eq!(cpu_parqet.min_time, 10);
@@ -738,7 +738,7 @@ pub(crate) mod tests {
        // file number of the path should match the segment id
        assert_eq!(
            mem_parqet.path,
            ParquetFilePath::new_with_parititon_key("db1", "mem", SEGMENT_KEY, 4).to_string()
            ParquetFilePath::new_with_partition_key("db1", "mem", SEGMENT_KEY, 4).to_string()
        );
        assert_eq!(mem_parqet.row_count, 2);
        assert_eq!(mem_parqet.min_time, 15);
@@ -6,17 +6,18 @@ mod loader;
mod segment_state;
mod table_buffer;

use crate::cache::ParquetCache;
use crate::catalog::{
    Catalog, DatabaseSchema, TableDefinition, SERIES_ID_COLUMN_NAME, TIME_COLUMN_NAME,
};
use crate::chunk::ParquetChunk;
use crate::persister::PersisterImpl;
use crate::write_buffer::flusher::WriteBufferFlusher;
use crate::write_buffer::loader::load_starting_state;
use crate::write_buffer::segment_state::{run_buffer_segment_persist_and_cleanup, SegmentState};
use crate::DatabaseTables;
use crate::{
    persister, BufferSegment, BufferedWriteRequest, Bufferer, ChunkContainer, LpWriteOp, Persister,
    Precision, SegmentDuration, SegmentId, SequenceNumber, Wal, WalOp, WriteBuffer, WriteLineError,
    BufferSegment, BufferedWriteRequest, Bufferer, ChunkContainer, LpWriteOp, Persister, Precision,
    SegmentDuration, SegmentId, SequenceNumber, Wal, WalOp, WriteBuffer, WriteLineError,
};
use async_trait::async_trait;
use data_types::{
@@ -25,14 +26,13 @@ use data_types::{
use datafusion::common::DataFusionError;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::SendableRecordBatchStream;
use influxdb_line_protocol::{parse_lines, FieldValue, ParsedLine, Series, TagSet};
use iox_query::chunk_statistics::create_chunk_statistics;
use iox_query::QueryChunk;
use iox_time::{Time, TimeProvider};
use object_store::memory::InMemory;
use object_store::path::Path as ObjPath;
use object_store::ObjectMeta;
use object_store::ObjectStore;
use observability_deps::tracing::{debug, error};
use parking_lot::{Mutex, RwLock};
use parquet_file::storage::ParquetExecInput;
@@ -40,7 +40,7 @@ use sha2::Digest;
use sha2::Sha256;
use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap};
use std::ops::Deref;
use std::i64;
use std::sync::{Arc, OnceLock};
use thiserror::Error;
use tokio::sync::watch;
@@ -89,12 +89,11 @@ pub struct WriteRequest<'a> {
}

#[derive(Debug)]
pub struct WriteBufferImpl<W, T, P> {
pub struct WriteBufferImpl<W, T> {
    catalog: Arc<Catalog>,
    persister: Arc<PersisterImpl>,
    parquet_cache: Arc<ParquetCache>,
    segment_state: Arc<RwLock<SegmentState<T, W>>>,
    parquet_cache: Arc<dyn ObjectStore>,
    parquet_cache_metadata: Arc<RwLock<DatabaseTables>>,
    persister: Arc<P>,
    wal: Option<Arc<W>>,
    write_buffer_flusher: WriteBufferFlusher,
    segment_duration: SegmentDuration,
@@ -106,19 +105,14 @@ pub struct WriteBufferImpl<W, T, P> {
    shutdown_segment_persist_tx: watch::Sender<()>,
}

impl<W: Wal, T: TimeProvider, P: Persister> WriteBufferImpl<W, T, P> {
impl<W: Wal, T: TimeProvider> WriteBufferImpl<W, T> {
    pub async fn new(
        persister: Arc<P>,
        persister: Arc<PersisterImpl>,
        wal: Option<Arc<W>>,
        time_provider: Arc<T>,
        segment_duration: SegmentDuration,
        executor: Arc<iox_query::exec::Executor>,
    ) -> Result<Self>
    where
        P: Persister,
        Error: From<<P as Persister>::Error>,
        persister::Error: From<<P as Persister>::Error>,
    {
    ) -> Result<Self> {
        let now = time_provider.now();
        let loaded_state =
            load_starting_state(Arc::clone(&persister), wal.clone(), now, segment_duration).await?;
@@ -157,10 +151,7 @@ impl<W: Wal, T: TimeProvider, P: Persister> WriteBufferImpl<W, T, P> {
        Ok(Self {
            catalog: loaded_state.catalog,
            segment_state,
            parquet_cache: Arc::new(InMemory::new()),
            parquet_cache_metadata: Arc::new(RwLock::new(DatabaseTables {
                tables: HashMap::new(),
            })),
            parquet_cache: Arc::new(ParquetCache::new(&persister.mem_pool)),
            persister,
            wal,
            write_buffer_flusher,
@@ -285,18 +276,12 @@ impl<W: Wal, T: TimeProvider, P: Persister> WriteBufferImpl<W, T, P> {
        }

        // Get any cached files and add them to the query
        let parquet_files = self
            .parquet_cache_metadata
            .read()
            .deref()
            .tables
            .get(database_name)
            .map(|table| table.parquet_files.clone())
            .unwrap_or_default();

        // This is mostly the same as above, but we change the object store to
        // point to the in memory cache
        for parquet_file in parquet_files {
        for parquet_file in self
            .parquet_cache
            .get_parquet_files(database_name, table_name)
        {
            let partition_key = data_types::PartitionKey::from(parquet_file.path.clone());
            let partition_id = data_types::partition::TransitionPartitionId::new(
                data_types::TableId::new(0),
@@ -321,7 +306,7 @@ impl<W: Wal, T: TimeProvider, P: Persister> WriteBufferImpl<W, T, P> {
                e_tag: None,
                version: None,
            },
            object_store: Arc::clone(&self.parquet_cache),
            object_store: Arc::clone(&self.parquet_cache.object_store()),
        };

        let parquet_chunk = ParquetChunk {
@@ -342,6 +327,43 @@ impl<W: Wal, T: TimeProvider, P: Persister> WriteBufferImpl<W, T, P> {
        Ok(chunks)
    }

    pub async fn cache_parquet(
        &self,
        db_name: &str,
        table_name: &str,
        min_time: i64,
        max_time: i64,
        records: SendableRecordBatchStream,
    ) -> Result<(), Error> {
        Ok(self
            .parquet_cache
            .persist_parquet_file(db_name, table_name, min_time, max_time, records, None)
            .await?)
    }

    pub async fn update_parquet(
        &self,
        db_name: &str,
        table_name: &str,
        min_time: i64,
        max_time: i64,
        path: ObjPath,
        records: SendableRecordBatchStream,
    ) -> Result<(), Error> {
        Ok(self
            .parquet_cache
            .persist_parquet_file(db_name, table_name, min_time, max_time, records, Some(path))
            .await?)
    }

    pub async fn remove_parquet(&self, path: ObjPath) -> Result<(), Error> {
        Ok(self.parquet_cache.remove_parquet_file(path).await?)
    }

    pub async fn purge_cache(&self) -> Result<(), Error> {
        Ok(self.parquet_cache.purge_cache().await?)
    }

    #[cfg(test)]
    fn get_table_record_batches(
        &self,
@@ -358,7 +380,7 @@ impl<W: Wal, T: TimeProvider, P: Persister> WriteBufferImpl<W, T, P> {
}

#[async_trait]
impl<W: Wal, T: TimeProvider, P: Persister> Bufferer for WriteBufferImpl<W, T, P> {
impl<W: Wal, T: TimeProvider> Bufferer for WriteBufferImpl<W, T> {
    async fn write_lp(
        &self,
        database: NamespaceName<'static>,
@@ -392,7 +414,7 @@ impl<W: Wal, T: TimeProvider, P: Persister> Bufferer for WriteBufferImpl<W, T, P
    }
}

impl<W: Wal, T: TimeProvider, P: Persister> ChunkContainer for WriteBufferImpl<W, T, P> {
impl<W: Wal, T: TimeProvider> ChunkContainer for WriteBufferImpl<W, T> {
    fn get_table_chunks(
        &self,
        database_name: &str,
@@ -405,7 +427,7 @@ impl<W: Wal, T: TimeProvider, P: Persister> ChunkContainer for WriteBufferImpl<W
    }
}

impl<W: Wal, T: TimeProvider, P: Persister> WriteBuffer for WriteBufferImpl<W, T, P> {}
impl<W: Wal, T: TimeProvider> WriteBuffer for WriteBufferImpl<W, T> {}

/// Returns a validated result and the sequence number of the catalog before any updates were
/// applied.
@@ -1156,7 +1178,7 @@ mod tests {
    }

    async fn get_table_batches(
        write_buffer: &WriteBufferImpl<WalImpl, MockProvider, PersisterImpl>,
        write_buffer: &WriteBufferImpl<WalImpl, MockProvider>,
        database_name: &str,
        table_name: &str,
        ctx: &IOxSessionContext,