feat: Implement Persister for PersisterImpl (#24588)

* feat: Implement Catalog r/w for persister

  This commit implements reading and writing the Catalog to the object store.
  The functionality was already stubbed out; it just needed an implementation.
  Saving is straightforward: the Catalog is serialized to JSON and written to
  the object store. For loading, the most recently added Catalog is found by
  file name and returned to the caller in its deserialized form. This commit
  also adds tests to make sure the above functionality works as intended.

* feat: Implement Segment r/w for persister

  This commit continues the work on the persister by implementing the
  persist_segment and load_segments functions. Much like the Catalog
  implementation, a segment is serialized to JSON before being persisted to
  the object store in persist_segment, which is straightforward. Loading is a
  bit more involved: we need the most recent n segment files, so we list them
  and return the most recent n. Comments in the code make this easier to grok.
  We also add more tests to make sure this part of the persister works as
  expected.

* feat: Implement Parquet r/w for persister

  This commit does a few things:
  - First, we add methods to the Persister trait for reading and writing
    Parquet files, as these were not stubbed out in prior commits.
  - Second, we add a method to serialize a SendableRecordBatchStream into
    Parquet bytes.
  - With these in place, implementing the trait methods is straightforward:
    hand in a path and a stream and get back metadata about the persisted
    file, or get the bytes back when loading from the store.

  Of course, we also add more tests to make sure this all works as expected.
  Note that this does nothing to bound how much memory is used, nor is it
  necessarily the most efficient way to write Parquet files. This is mostly
  about getting things working, with the understanding that the approach may
  need future refinement.

* fix: Update smallvec for crate advisory

* fix: Implement better filename handling

* feat: Handle loading > 1000 Segment Info files
parent e13cc476bb
commit 001a2a6653
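The "most recently added Catalog based on the file name" step works because catalog files are named with a counter that starts at u32::MAX and decreases with the segment ID, so the newest catalog has the lexicographically smallest name (see the `u32::MAX - parsed_number` logic in the persister diff below). A minimal sketch of that ordering idea, assuming zero-padded names and a `.json` extension purely for illustration (the real naming lives in CatalogFilePath, which is not shown in this diff):

// Illustrative sketch only: encode a segment id so that newer catalogs sort
// first when file names are compared as strings. The 10-digit zero padding
// and ".json" extension are assumptions for this example.
fn catalog_file_name(segment_id: u32) -> String {
    format!("{:010}.json", u32::MAX - segment_id)
}

fn main() {
    let older = catalog_file_name(0); // "4294967295.json"
    let newer = catalog_file_name(1); // "4294967294.json"
    // A "keep the smallest file name" scan over an object store listing
    // therefore finds the most recently persisted catalog.
    assert!(newer < older);
}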
@@ -2389,16 +2389,19 @@ dependencies = [
 "arrow",
 "async-trait",
 "byteorder",
+"bytes",
 "chrono",
 "crc32fast",
 "data_types",
 "datafusion",
+"futures-util",
 "influxdb-line-protocol",
 "iox_catalog",
 "iox_query",
 "object_store",
 "observability_deps",
 "parking_lot 0.11.2",
+"parquet",
 "schema",
 "serde",
 "serde_json",
@@ -4736,9 +4739,9 @@ dependencies = [
 
 [[package]]
 name = "smallvec"
-version = "1.13.0"
+version = "1.13.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3b187f0231d56fe41bfb12034819dd2bf336422a5866de41bc3fec4b2e3883e8"
+checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7"
 
 [[package]]
 name = "snafu"

@ -10,7 +10,7 @@ data_types = { path = "../data_types" }
|
||||||
influxdb-line-protocol = { path = "../influxdb_line_protocol" }
|
influxdb-line-protocol = { path = "../influxdb_line_protocol" }
|
||||||
iox_catalog = { path = "../iox_catalog" }
|
iox_catalog = { path = "../iox_catalog" }
|
||||||
iox_query = { path = "../iox_query" }
|
iox_query = { path = "../iox_query" }
|
||||||
object_store = { workspace = true }
|
object_store.workspace = true
|
||||||
observability_deps = { path = "../observability_deps" }
|
observability_deps = { path = "../observability_deps" }
|
||||||
schema = { path = "../schema" }
|
schema = { path = "../schema" }
|
||||||
workspace-hack = { version = "0.1", path = "../workspace-hack" }
|
workspace-hack = { version = "0.1", path = "../workspace-hack" }
|
||||||
|
@ -23,11 +23,14 @@ chrono = "0.4"
|
||||||
crc32fast = "1.2.0"
|
crc32fast = "1.2.0"
|
||||||
datafusion = { workspace = true }
|
datafusion = { workspace = true }
|
||||||
parking_lot = "0.11.1"
|
parking_lot = "0.11.1"
|
||||||
|
parquet = { workspace = true }
|
||||||
thiserror = "1.0"
|
thiserror = "1.0"
|
||||||
tokio = { version = "1.35", features = ["macros", "fs", "io-util", "parking_lot", "rt-multi-thread", "sync", "time"] }
|
tokio = { version = "1.35", features = ["macros", "fs", "io-util", "parking_lot", "rt-multi-thread", "sync", "time"] }
|
||||||
serde = { version = "1.0.188", features = ["derive"] }
|
serde = { version = "1.0.188", features = ["derive"] }
|
||||||
serde_json = "1.0.107"
|
serde_json = "1.0.107"
|
||||||
snap = "1.0.0"
|
snap = "1.0.0"
|
||||||
|
bytes = "1.5.0"
|
||||||
|
futures-util = "0.3.30"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
test_helpers = { path = "../test_helpers" }
|
test_helpers = { path = "../test_helpers" }
|
||||||
|
|
|
@@ -79,6 +79,10 @@ impl Catalog {
         info!("db_schema {}", name);
         self.inner.read().databases.get(name).cloned()
     }
+
+    pub fn into_inner(self) -> InnerCatalog {
+        self.inner.into_inner()
+    }
 }
 
 #[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
@@ -95,6 +99,11 @@ impl InnerCatalog {
             sequence: 0,
         }
     }
+
+    #[cfg(test)]
+    pub fn db_exists(&self, db_name: &str) -> bool {
+        self.databases.get(db_name).is_some()
+    }
 }
 
 #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]

@@ -13,12 +13,16 @@ pub mod wal;
 pub mod write_buffer;
 
 use crate::catalog::Catalog;
+use crate::paths::ParquetFilePath;
 use async_trait::async_trait;
+use bytes::Bytes;
 use data_types::NamespaceName;
 use datafusion::error::DataFusionError;
 use datafusion::execution::context::SessionState;
+use datafusion::physical_plan::SendableRecordBatchStream;
 use datafusion::prelude::Expr;
 use iox_query::QueryChunk;
+use parquet::format::FileMetaData;
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use std::fmt::Debug;
@@ -39,6 +43,21 @@ pub enum Error {
 
     #[error("write buffer error: {0}")]
     WriteBuffer(#[from] write_buffer::Error),
+
+    #[error("serde_json error: {0}")]
+    SerdeJson(#[from] serde_json::Error),
+
+    #[error("object_store error: {0}")]
+    ObjectStore(#[from] object_store::Error),
+
+    #[error("parse int error: {0}")]
+    ParseInt(#[from] std::num::ParseIntError),
+
+    #[error("parquet error: {0}")]
+    ParquetError(#[from] parquet::errors::ParquetError),
+
+    #[error("tried to serialize a parquet file with no rows")]
+    NoRows,
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -156,6 +175,9 @@ pub trait Persister: Debug + Send + Sync + 'static {
     /// Loads the most recently persisted N segment parquet file lists from object storage.
     async fn load_segments(&self, most_recent_n: usize) -> Result<Vec<PersistedSegment>>;
 
+    // Loads a Parquet file from ObjectStore
+    async fn load_parquet_file(&self, path: ParquetFilePath) -> crate::Result<Bytes>;
+
     /// Persists the catalog with the given segment ID. If this is the highest segment ID, it will
     /// be the catalog that is returned the next time `load_catalog` is called.
     async fn persist_catalog(&self, segment_id: SegmentId, catalog: catalog::Catalog)
@@ -165,6 +187,14 @@ pub trait Persister: Debug + Send + Sync + 'static {
     /// for this segment.
     async fn persist_segment(&self, persisted_segment: PersistedSegment) -> Result<()>;
 
+    // Writes a SendableRecordBatchStream to the Parquet format and persists it
+    // to Object Store at the given path
+    async fn persist_parquet_file(
+        &self,
+        path: ParquetFilePath,
+        record_batch: SendableRecordBatchStream,
+    ) -> crate::Result<FileMetaData>;
+
     /// Returns the configured `ObjectStore` that data is loaded from and persisted to.
     fn object_store(&self) -> Arc<dyn object_store::ObjectStore>;
 }

@@ -121,6 +121,10 @@ impl SegmentInfoFilePath {
         ));
         Self(path)
     }
+
+    pub fn dir() -> Self {
+        Self(ObjPath::from("segments"))
+    }
 }
 
 impl Deref for SegmentInfoFilePath {

@@ -2,46 +2,506 @@
 //! storage.
 
 use crate::catalog::Catalog;
+use crate::catalog::InnerCatalog;
+use crate::paths::CatalogFilePath;
+use crate::paths::ParquetFilePath;
+use crate::paths::SegmentInfoFilePath;
+use crate::Error;
+use crate::Result;
 use crate::{PersistedCatalog, PersistedSegment, Persister, SegmentId};
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
+use bytes::Bytes;
+use datafusion::execution::memory_pool::MemoryConsumer;
+use datafusion::execution::memory_pool::MemoryPool;
+use datafusion::execution::memory_pool::MemoryReservation;
+use datafusion::execution::memory_pool::UnboundedMemoryPool;
+use datafusion::physical_plan::SendableRecordBatchStream;
+use futures_util::pin_mut;
+use futures_util::stream::StreamExt;
+use futures_util::stream::TryStreamExt;
+use object_store::path::Path as ObjPath;
 use object_store::ObjectStore;
+use parquet::arrow::ArrowWriter;
+use parquet::basic::Compression;
+use parquet::file::properties::WriterProperties;
+use parquet::format::FileMetaData;
+use std::io::Write;
 use std::sync::Arc;
 
 #[derive(Debug)]
 pub struct PersisterImpl {
-    #[allow(dead_code)]
     object_store: Arc<dyn ObjectStore>,
+    mem_pool: Arc<dyn MemoryPool>,
 }
 
 impl PersisterImpl {
     pub fn new(object_store: Arc<dyn ObjectStore>) -> Self {
-        Self { object_store }
+        Self {
+            object_store,
+            mem_pool: Arc::new(UnboundedMemoryPool::default()),
+        }
+    }
+
+    async fn serialize_to_parquet(
+        &self,
+        batches: SendableRecordBatchStream,
+    ) -> Result<ParquetBytes> {
+        // The ArrowWriter::write() call will return an error if any subsequent
+        // batch does not match this schema, enforcing schema uniformity.
+        let schema = batches.schema();
+
+        let stream = batches;
+        let mut bytes = Vec::new();
+        pin_mut!(stream);
+
+        // Construct the arrow serializer with the metadata as part of the parquet
+        // file properties.
+        let mut writer = TrackedMemoryArrowWriter::try_new(
+            &mut bytes,
+            Arc::clone(&schema),
+            self.mem_pool.clone(),
+        )?;
+
+        while let Some(batch) = stream.try_next().await? {
+            writer.write(batch)?;
+        }
+
+        let writer_meta = writer.close()?;
+        if writer_meta.num_rows == 0 {
+            return Err(Error::NoRows);
+        }
+
+        Ok(ParquetBytes {
+            meta_data: writer_meta,
+            bytes: Bytes::from(bytes),
+        })
     }
 }
 
 #[async_trait]
 impl Persister for PersisterImpl {
-    async fn load_catalog(&self) -> crate::Result<Option<PersistedCatalog>> {
-        todo!()
+    async fn load_catalog(&self) -> Result<Option<PersistedCatalog>> {
+        let mut list = self
+            .object_store
+            .list(Some(&CatalogFilePath::dir()))
+            .await?;
+        let mut catalog_path: Option<ObjPath> = None;
+        while let Some(item) = list.next().await {
+            let item = item?;
+            catalog_path = match catalog_path {
+                Some(old_path) => {
+                    let Some(new_catalog_name) = item.location.filename() else {
+                        // Skip this iteration as this listed file has no
+                        // filename
+                        catalog_path = Some(old_path);
+                        continue;
+                    };
+                    let old_catalog_name = old_path
+                        .filename()
+                        // NOTE: this holds so long as CatalogFilePath is used
+                        // from crate::paths
+                        .expect("catalog file paths are guaranteed to have a filename");
+
+                    // We order catalogs by number starting with u32::MAX and
+                    // then decrease it, therefore if the new catalog file name
+                    // is less than the old one this is the path we want
+                    if new_catalog_name < old_catalog_name {
+                        Some(item.location)
+                    } else {
+                        Some(old_path)
+                    }
+                }
+                None => Some(item.location),
+            };
+        }
+
+        match catalog_path {
+            None => Ok(None),
+            Some(path) => {
+                let bytes = self.object_store.get(&path).await?.bytes().await?;
+                let catalog: InnerCatalog = serde_json::from_slice(&bytes)?;
+                let file_name = path
+                    .filename()
+                    // NOTE: this holds so long as CatalogFilePath is used
+                    // from crate::paths
+                    .expect("catalog file paths are guaranteed to have a filename");
+                let parsed_number = file_name
+                    .trim_end_matches(format!(".{}", crate::paths::CATALOG_FILE_EXTENSION).as_str())
+                    .parse::<u32>()?;
+                let segment_id = SegmentId::new(u32::MAX - parsed_number);
+                Ok(Some(PersistedCatalog {
+                    segment_id,
+                    catalog,
+                }))
+            }
+        }
     }
 
-    async fn load_segments(&self, _most_recent_n: usize) -> crate::Result<Vec<PersistedSegment>> {
-        todo!()
+    async fn load_segments(&self, mut most_recent_n: usize) -> Result<Vec<PersistedSegment>> {
+        let mut output = Vec::new();
+        let mut offset: Option<ObjPath> = None;
+        while most_recent_n > 0 {
+            let count = if most_recent_n > 1000 {
+                most_recent_n -= 1000;
+                1000
+            } else {
+                let count = most_recent_n;
+                most_recent_n = 0;
+                count
+            };
+
+            let segment_list = if let Some(offset) = offset {
+                self.object_store
+                    .list_with_offset(Some(&SegmentInfoFilePath::dir()), &offset)
+                    .await?
+                    .collect::<Vec<_>>()
+                    .await
+            } else {
+                self.object_store
+                    .list(Some(&SegmentInfoFilePath::dir()))
+                    .await?
+                    .collect::<Vec<_>>()
+                    .await
+            };
+
+            // Why not collect into a Result<Vec<ObjectMeta>, object_store::Error>>
+            // like we could with Iterators? Well because it's a stream it ends up
+            // using different traits and can't really do that. So we need to loop
+            // through to return any errors that might have occurred, then do an
+            // unstable sort (which is faster and we know won't have any
+            // duplicates) since these can arrive out of order, and then issue gets
+            // on the n most recent segments that we want and is returned in order
+            // of the most recent to least.
+            let mut list = Vec::new();
+            for segment in segment_list {
+                list.push(segment?);
+            }
+
+            list.sort_unstable_by(|a, b| a.location.cmp(&b.location));
+
+            let len = list.len();
+            let end = if len <= count { len } else { count };
+
+            for item in &list[0..end] {
+                let bytes = self.object_store.get(&item.location).await?.bytes().await?;
+                output.push(serde_json::from_slice(&bytes)?);
+            }
+
+            // Get the last path in the array to use as an offset. This assumes
+            // we sorted the list as we can't guarantee otherwise the order of
+            // the list call to the object store.
+            offset = Some(list[end - 1].location.clone());
+        }
+
+        Ok(output)
     }
 
-    async fn persist_catalog(
+    async fn load_parquet_file(&self, path: ParquetFilePath) -> Result<Bytes> {
+        Ok(self.object_store.get(&path).await?.bytes().await?)
+    }
+
+    async fn persist_catalog(&self, segment_id: SegmentId, catalog: Catalog) -> Result<()> {
+        let catalog_path = CatalogFilePath::new(segment_id);
+        let json = serde_json::to_vec_pretty(&catalog.into_inner())?;
+        self.object_store
+            .put(catalog_path.as_ref(), Bytes::from(json))
+            .await?;
+        Ok(())
+    }
+
+    async fn persist_segment(&self, persisted_segment: PersistedSegment) -> Result<()> {
+        let segment_file_path = SegmentInfoFilePath::new(persisted_segment.segment_id);
+        let json = serde_json::to_vec_pretty(&persisted_segment)?;
+        self.object_store
+            .put(segment_file_path.as_ref(), Bytes::from(json))
+            .await?;
+        Ok(())
+    }
+
+    async fn persist_parquet_file(
         &self,
-        _segment_id: SegmentId,
-        _catalog: Catalog,
-    ) -> crate::Result<()> {
-        todo!()
-    }
+        path: ParquetFilePath,
+        record_batch: SendableRecordBatchStream,
+    ) -> Result<FileMetaData> {
+        let parquet = self.serialize_to_parquet(record_batch).await?;
+        self.object_store.put(path.as_ref(), parquet.bytes).await?;
 
-    async fn persist_segment(&self, _persisted_segment: PersistedSegment) -> crate::Result<()> {
-        todo!()
+        Ok(parquet.meta_data)
     }
 
     fn object_store(&self) -> Arc<dyn ObjectStore> {
-        todo!()
+        self.object_store.clone()
     }
 }
+
+pub struct ParquetBytes {
+    pub bytes: Bytes,
+    pub meta_data: FileMetaData,
+}
+
+/// Wraps an [`ArrowWriter`] to track its buffered memory in a
+/// DataFusion [`MemoryPool`]
+#[derive(Debug)]
+pub struct TrackedMemoryArrowWriter<W: Write + Send> {
+    /// The inner ArrowWriter
+    inner: ArrowWriter<W>,
+    /// DataFusion memory reservation with
+    reservation: MemoryReservation,
+}
+
+/// Parquet row group write size
+pub const ROW_GROUP_WRITE_SIZE: usize = 1024 * 1024;
+
+impl<W: Write + Send> TrackedMemoryArrowWriter<W> {
+    /// create a new `TrackedMemoryArrowWriter`
+    pub fn try_new(sink: W, schema: SchemaRef, mem_pool: Arc<dyn MemoryPool>) -> Result<Self> {
+        let props = WriterProperties::builder()
+            .set_compression(Compression::ZSTD(Default::default()))
+            .set_max_row_group_size(ROW_GROUP_WRITE_SIZE)
+            .build();
+        let inner = ArrowWriter::try_new(sink, schema, Some(props))?;
+        let consumer = MemoryConsumer::new("InfluxDB3 ParquetWriter (TrackedMemoryArrowWriter)");
+        let reservation = consumer.register(&mem_pool);
+
+        Ok(Self { inner, reservation })
+    }
+
+    /// Push a `RecordBatch` into the underlying writer, updating the
+    /// tracked allocation
+    pub fn write(&mut self, batch: RecordBatch) -> Result<()> {
+        // writer encodes the batch into its internal buffers
+        self.inner.write(&batch)?;
+
+        // In progress memory, in bytes
+        let in_progress_size = self.inner.in_progress_size();
+
+        // update the allocation with the pool.
+        self.reservation.try_resize(in_progress_size)?;
+
+        Ok(())
+    }
+
+    /// closes the writer, flushing any remaining data and returning
+    /// the written [`FileMetaData`]
+    ///
+    /// [`FileMetaData`]: parquet::format::FileMetaData
+    pub fn close(self) -> Result<parquet::format::FileMetaData> {
+        // reservation is returned on drop
+        Ok(self.inner.close()?)
+    }
+}
+
+#[cfg(test)]
+use {
+    arrow::array::Int32Array, arrow::datatypes::DataType, arrow::datatypes::Field,
+    arrow::datatypes::Schema, chrono::Utc,
+    datafusion::physical_plan::stream::RecordBatchReceiverStreamBuilder,
+    object_store::local::LocalFileSystem, std::collections::HashMap,
+};
+
+#[tokio::test]
+async fn persist_catalog() {
+    let local_disk = LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap();
+    let persister = PersisterImpl::new(Arc::new(local_disk));
+    let catalog = Catalog::new();
+    let _ = catalog.db_or_create("my_db");
+
+    persister
+        .persist_catalog(SegmentId::new(0), catalog)
+        .await
+        .unwrap();
+}
+
+#[tokio::test]
+async fn persist_and_load_newest_catalog() {
+    let local_disk = LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap();
+    let persister = PersisterImpl::new(Arc::new(local_disk));
+    let catalog = Catalog::new();
+    let _ = catalog.db_or_create("my_db");
+
+    persister
+        .persist_catalog(SegmentId::new(0), catalog)
+        .await
+        .unwrap();
+
+    let catalog = Catalog::new();
+    let _ = catalog.db_or_create("my_second_db");
+
+    persister
+        .persist_catalog(SegmentId::new(1), catalog)
+        .await
+        .unwrap();
+
+    let catalog = persister
+        .load_catalog()
+        .await
+        .expect("loading the catalog did not cause an error")
+        .expect("there was a catalog to load");
+
+    assert_eq!(catalog.segment_id, SegmentId::new(1));
+    assert!(catalog.catalog.db_exists("my_second_db"));
+    assert!(!catalog.catalog.db_exists("my_db"));
+}
+
+#[tokio::test]
+async fn persist_segment_info_file() {
+    let local_disk = LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap();
+    let persister = PersisterImpl::new(Arc::new(local_disk));
+    let info_file = PersistedSegment {
+        segment_id: SegmentId::new(0),
+        segment_wal_size_bytes: 0,
+        databases: HashMap::new(),
+        segment_min_time: 0,
+        segment_max_time: 1,
+        segment_row_count: 0,
+        segment_parquet_size_bytes: 0,
+    };
+
+    persister.persist_segment(info_file).await.unwrap();
+}
+
+#[tokio::test]
+async fn persist_and_load_segment_info_files() {
+    let local_disk = LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap();
+    let persister = PersisterImpl::new(Arc::new(local_disk));
+    let info_file = PersistedSegment {
+        segment_id: SegmentId::new(0),
+        segment_wal_size_bytes: 0,
+        databases: HashMap::new(),
+        segment_min_time: 0,
+        segment_max_time: 1,
+        segment_row_count: 0,
+        segment_parquet_size_bytes: 0,
+    };
+    let info_file_2 = PersistedSegment {
+        segment_id: SegmentId::new(1),
+        segment_wal_size_bytes: 0,
+        databases: HashMap::new(),
+        segment_min_time: 0,
+        segment_max_time: 1,
+        segment_row_count: 0,
+        segment_parquet_size_bytes: 0,
+    };
+    let info_file_3 = PersistedSegment {
+        segment_id: SegmentId::new(2),
+        segment_wal_size_bytes: 0,
+        databases: HashMap::new(),
+        segment_min_time: 0,
+        segment_max_time: 1,
+        segment_row_count: 0,
+        segment_parquet_size_bytes: 0,
+    };
+
+    persister.persist_segment(info_file).await.unwrap();
+    persister.persist_segment(info_file_2).await.unwrap();
+    persister.persist_segment(info_file_3).await.unwrap();
+
+    let segments = persister.load_segments(2).await.unwrap();
+    assert_eq!(segments.len(), 2);
+    // The most recent one is first
+    assert_eq!(segments[0].segment_id.0, 2);
+    assert_eq!(segments[1].segment_id.0, 1);
+}
+
+#[tokio::test]
+async fn persist_and_load_segment_info_files_with_fewer_than_requested() {
+    let local_disk = LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap();
+    let persister = PersisterImpl::new(Arc::new(local_disk));
+    let info_file = PersistedSegment {
+        segment_id: SegmentId::new(0),
+        segment_wal_size_bytes: 0,
+        databases: HashMap::new(),
+        segment_min_time: 0,
+        segment_max_time: 1,
+        segment_row_count: 0,
+        segment_parquet_size_bytes: 0,
+    };
+    persister.persist_segment(info_file).await.unwrap();
+    let segments = persister.load_segments(2).await.unwrap();
+    // We asked for the most recent 2 but there should only be 1
+    assert_eq!(segments.len(), 1);
+    assert_eq!(segments[0].segment_id.0, 0);
+}
+
+#[tokio::test]
+/// This test makes sure that the logic for offset lists works
+async fn persist_and_load_over_9000_segment_info_files() {
+    let local_disk = LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap();
+    let persister = PersisterImpl::new(Arc::new(local_disk));
+    for id in 0..9001 {
+        let info_file = PersistedSegment {
+            segment_id: SegmentId::new(id),
+            segment_wal_size_bytes: 0,
+            databases: HashMap::new(),
+            segment_min_time: 0,
+            segment_max_time: 1,
+            segment_row_count: 0,
+            segment_parquet_size_bytes: 0,
+        };
+        persister.persist_segment(info_file).await.unwrap();
+    }
+    let segments = persister.load_segments(9500).await.unwrap();
+    // We asked for the most recent 9500 so there should be 9001 of them
+    assert_eq!(segments.len(), 9001);
+    assert_eq!(segments[0].segment_id.0, 9000);
+}
+
+#[tokio::test]
+async fn get_parquet_bytes() {
+    let local_disk = LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap();
+    let persister = PersisterImpl::new(Arc::new(local_disk));
+
+    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
+    let stream_builder = RecordBatchReceiverStreamBuilder::new(schema.clone(), 5);
+
+    let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
+    let batch1 = RecordBatch::try_new(schema.clone(), vec![Arc::new(id_array)]).unwrap();
+
+    let id_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
+    let batch2 = RecordBatch::try_new(schema.clone(), vec![Arc::new(id_array)]).unwrap();
+
+    stream_builder.tx().send(Ok(batch1)).await.unwrap();
+    stream_builder.tx().send(Ok(batch2)).await.unwrap();
+
+    let parquet = persister
+        .serialize_to_parquet(stream_builder.build())
+        .await
+        .unwrap();
+
+    // Assert we've written all the expected rows
+    assert_eq!(parquet.meta_data.num_rows, 10);
+}
+
+#[tokio::test]
+async fn persist_and_load_parquet_bytes() {
+    let local_disk = LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap();
+    let persister = PersisterImpl::new(Arc::new(local_disk));
+
+    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
+    let stream_builder = RecordBatchReceiverStreamBuilder::new(schema.clone(), 5);
+
+    let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
+    let batch1 = RecordBatch::try_new(schema.clone(), vec![Arc::new(id_array)]).unwrap();
+
+    let id_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
+    let batch2 = RecordBatch::try_new(schema.clone(), vec![Arc::new(id_array)]).unwrap();
+
+    stream_builder.tx().send(Ok(batch1)).await.unwrap();
+    stream_builder.tx().send(Ok(batch2)).await.unwrap();
+
+    let path = ParquetFilePath::new("db_one", "table_one", Utc::now(), 1);
+    let meta = persister
+        .persist_parquet_file(path.clone(), stream_builder.build())
+        .await
+        .unwrap();
+
+    // Assert we've written all the expected rows
+    assert_eq!(meta.num_rows, 10);
+
+    let bytes = persister.load_parquet_file(path).await.unwrap();
+
+    // Assert that we have a file of bytes > 0
+    assert!(!bytes.is_empty())
+}

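A note on the load_segments pagination in the diff above: object store listings are consumed in pages of at most 1000 entries, and the last path of each page becomes the offset for the next list_with_offset call. The chunk-size bookkeeping can be sketched on its own, independent of any object store; the 1000-entry page size matches the code, while the helper name and the standalone main are illustrative only:

// Standalone sketch of the paging arithmetic used by load_segments: split a
// request for `most_recent_n` items into pages of at most 1000 entries.
fn page_sizes(mut most_recent_n: usize) -> Vec<usize> {
    let mut pages = Vec::new();
    while most_recent_n > 0 {
        let count = if most_recent_n > 1000 {
            most_recent_n -= 1000;
            1000
        } else {
            let count = most_recent_n;
            most_recent_n = 0;
            count
        };
        pages.push(count);
    }
    pages
}

fn main() {
    // A request for 9500 segment info files becomes nine full pages of 1000
    // plus a final page of 500, which is what the
    // persist_and_load_over_9000_segment_info_files test exercises.
    let pages = page_sizes(9500);
    assert_eq!(pages.len(), 10);
    assert_eq!(pages.iter().sum::<usize>(), 9500);
    assert_eq!(*pages.last().unwrap(), 500);
}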