refactor: move catalog and last cache initialization out of write buffer (#25273)

* refactor: add catalog as dep to influxdb3

* refactor: move catalog and last cache initialization out of write buffer

The Write buffer used to handle initialization of the catalog and
last-N-value cache. This commit moves that logic out, so that both can be
initialized independently, and injected into the write buffer. This is to
enable downstream changes that will need to make sharing the catalog and
last cache possible.
pull/25280/head
Trevor Hilton 2024-08-27 13:41:40 -07:00 committed by GitHub
parent 734fd2d19a
commit 3f7b0e655c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 102 additions and 28 deletions

2
Cargo.lock generated
View File

@ -2388,6 +2388,7 @@ dependencies = [
name = "influxdb3"
version = "0.1.0"
dependencies = [
"anyhow",
"arrow",
"arrow-array",
"arrow-flight",
@ -2406,6 +2407,7 @@ dependencies = [
"hex",
"humantime",
"hyper 0.14.30",
"influxdb3_catalog",
"influxdb3_client",
"influxdb3_process",
"influxdb3_server",

View File

@ -24,6 +24,7 @@ trace_exporters.workspace = true
trogging.workspace = true
# Local Crates
influxdb3_catalog = { path = "../influxdb3_catalog" }
influxdb3_client = { path = "../influxdb3_client" }
influxdb3_process = { path = "../influxdb3_process", default-features = false }
influxdb3_server = { path = "../influxdb3_server" }
@ -31,6 +32,7 @@ influxdb3_wal = { path = "../influxdb3_wal" }
influxdb3_write = { path = "../influxdb3_write" }
# Crates.io dependencies
anyhow.workspace = true
backtrace.workspace = true
base64.workspace = true
clap.workspace = true

View File

@ -15,8 +15,7 @@ use influxdb3_server::{
CommonServerState,
};
use influxdb3_wal::{Level0Duration, WalConfig};
use influxdb3_write::write_buffer::WriteBufferImpl;
use influxdb3_write::{persister::Persister, WriteBuffer};
use influxdb3_write::{persister::Persister, write_buffer::WriteBufferImpl, WriteBuffer};
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig};
use iox_time::SystemProvider;
use object_store::DynObjectStore;
@ -61,6 +60,12 @@ pub enum Error {
#[error("invalid token: {0}")]
InvalidToken(#[from] hex::FromHexError),
#[error("failed to initialized write buffer: {0}")]
WriteBufferInit(#[source] anyhow::Error),
#[error("failed to initialize from persisted catalog: {0}")]
InitializePersistedCatalog(#[source] influxdb3_write::persister::Error),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -305,14 +310,21 @@ pub async fn command(config: Config) -> Result<()> {
};
let time_provider = Arc::new(SystemProvider::new());
let (last_cache, catalog) = persister
.load_last_cache_and_catalog()
.await
.map_err(Error::InitializePersistedCatalog)?;
let write_buffer: Arc<dyn WriteBuffer> = Arc::new(
WriteBufferImpl::new(
Arc::clone(&persister),
Arc::new(catalog),
Arc::new(last_cache),
Arc::<SystemProvider>::clone(&time_provider),
Arc::clone(&exec),
wal_config,
)
.await?,
.await
.map_err(|e| Error::WriteBufferInit(e.into()))?,
);
let query_executor = Arc::new(QueryExecutorImpl::new(
write_buffer.catalog(),

View File

@ -227,7 +227,9 @@ mod tests {
use crate::serve;
use datafusion::parquet::data_type::AsBytes;
use hyper::{body, Body, Client, Request, Response, StatusCode};
use influxdb3_catalog::catalog::Catalog;
use influxdb3_wal::WalConfig;
use influxdb3_write::last_cache::LastCacheProvider;
use influxdb3_write::persister::Persister;
use influxdb3_write::WriteBuffer;
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig};
@ -762,6 +764,8 @@ mod tests {
let write_buffer: Arc<dyn WriteBuffer> = Arc::new(
influxdb3_write::write_buffer::WriteBufferImpl::new(
Arc::clone(&persister),
Arc::new(Catalog::new()),
Arc::new(LastCacheProvider::new()),
Arc::<MockProvider>::clone(&time_provider),
Arc::clone(&exec),
WalConfig::test_config(),

View File

@ -581,8 +581,12 @@ mod tests {
use data_types::NamespaceName;
use datafusion::{assert_batches_sorted_eq, error::DataFusionError};
use futures::TryStreamExt;
use influxdb3_catalog::catalog::Catalog;
use influxdb3_wal::{Level0Duration, WalConfig};
use influxdb3_write::{persister::Persister, write_buffer::WriteBufferImpl, WriteBuffer};
use influxdb3_write::{
last_cache::LastCacheProvider, persister::Persister, write_buffer::WriteBufferImpl,
WriteBuffer,
};
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig};
use iox_time::{MockProvider, Time};
use metric::Registry;
@ -622,9 +626,11 @@ mod tests {
let persister = Arc::new(Persister::new(Arc::clone(&object_store), "test_host"));
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
let executor = make_exec(object_store);
let write_buffer = Arc::new(
let write_buffer: Arc<dyn WriteBuffer> = Arc::new(
WriteBufferImpl::new(
Arc::clone(&persister),
Arc::new(Catalog::new()),
Arc::new(LastCacheProvider::new()),
Arc::<MockProvider>::clone(&time_provider),
Arc::clone(&executor),
WalConfig {
@ -641,7 +647,7 @@ mod tests {
let df_config = Arc::new(Default::default());
let query_executor = QueryExecutorImpl::new(
write_buffer.catalog(),
Arc::<WriteBufferImpl>::clone(&write_buffer),
Arc::clone(&write_buffer),
executor,
metrics,
df_config,

View File

@ -63,6 +63,7 @@ impl Error {
// Three-level nested map of named [`LastCache`]s behind an `RwLock` so the
// provider can be shared across threads. NOTE(review): the three String keys
// are presumably database name → table name → cache name — confirm against
// the insertion/lookup sites, which are outside this hunk.
type CacheMap = RwLock<HashMap<String, HashMap<String, HashMap<String, LastCache>>>>;
/// Provides all last-N-value caches for the entire database
#[derive(Default)]
pub struct LastCacheProvider {
    // All caches owned by this provider; see [`CacheMap`] for the nesting.
    cache_map: CacheMap,
}
@ -110,10 +111,8 @@ pub struct CreateCacheArguments {
impl LastCacheProvider {
/// Create a new [`LastCacheProvider`]
pub(crate) fn new() -> Self {
Self {
cache_map: Default::default(),
}
pub fn new() -> Self {
Self::default()
}
/// Initialize a [`LastCacheProvider`] from a [`InnerCatalog`]
@ -1586,6 +1585,8 @@ mod tests {
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
WriteBufferImpl::new(
persister,
Arc::new(Catalog::new()),
Arc::new(LastCacheProvider::new()),
time_provider,
crate::test_help::make_exec(),
WalConfig::test_config(),

View File

@ -1,6 +1,8 @@
//! This is the implementation of the `Persister` used to write data from the buffer to object
//! storage.
use crate::last_cache;
use crate::last_cache::LastCacheProvider;
use crate::paths::CatalogFilePath;
use crate::paths::ParquetFilePath;
use crate::paths::SnapshotInfoFilePath;
@ -52,6 +54,9 @@ pub enum Error {
#[error("parse int error: {0}")]
ParseInt(#[from] std::num::ParseIntError),
#[error("failed to initialize last cache: {0}")]
InitializingLastCache(#[from] last_cache::Error),
}
impl From<Error> for DataFusionError {
@ -97,6 +102,7 @@ impl Persister {
}
}
/// Get the Object Store URL
pub fn object_store_url(&self) -> &ObjectStoreUrl {
&self.object_store_url
}
@ -108,11 +114,28 @@ impl Persister {
serialize_to_parquet(Arc::clone(&self.mem_pool), batches).await
}
/// Get the host identifier prefix
pub fn host_identifier_prefix(&self) -> &str {
&self.host_identifier_prefix
}
/// Load the most recently persisted catalog from object storage and build a
/// [`LastCacheProvider`] from its contents.
///
/// When no catalog has been persisted yet, both the provider and the
/// [`Catalog`] are returned freshly constructed and empty.
///
/// This is intended to be used on server start.
pub async fn load_last_cache_and_catalog(&self) -> Result<(LastCacheProvider, Catalog)> {
    let persisted = self.load_catalog().await?;
    if let Some(persisted) = persisted {
        // Build the last-cache provider from the persisted inner catalog
        // before handing ownership of it to the new `Catalog`.
        let last_cache = LastCacheProvider::new_from_catalog(&persisted.catalog)?;
        let catalog = Catalog::from_inner(persisted.catalog);
        Ok((last_cache, catalog))
    } else {
        Ok((LastCacheProvider::new(), Catalog::new()))
    }
}
/// Loads the most recently persisted catalog from object storage.
///
/// This is used on server start.
pub async fn load_catalog(&self) -> Result<Option<PersistedCatalog>> {
let mut list = self
.object_store
@ -169,6 +192,8 @@ impl Persister {
}
/// Loads the most recently persisted N snapshot parquet file lists from object storage.
///
/// This is intended to be used on server start.
pub async fn load_snapshots(&self, mut most_recent_n: usize) -> Result<Vec<PersistedSnapshot>> {
let mut output = Vec::new();
let mut offset: Option<ObjPath> = None;
@ -230,6 +255,7 @@ impl Persister {
}
/// Fetch the raw bytes of a single Parquet file from the object store.
///
/// Test-only helper.
#[cfg(test)]
pub async fn load_parquet_file(&self, path: ParquetFilePath) -> Result<Bytes> {
    let get_result = self.object_store.get(&path).await?;
    let data = get_result.bytes().await?;
    Ok(data)
}

View File

@ -1,7 +1,7 @@
//! Implementation of an in-memory buffer for writes that persists data into a wal if it is configured.
pub mod persisted_files;
mod queryable_buffer;
pub mod queryable_buffer;
mod table_buffer;
pub(crate) mod validator;
@ -87,6 +87,9 @@ pub enum Error {
#[error("error from wal: {0}")]
WalError(#[from] influxdb3_wal::Error),
#[error("cannot write to a read-only server")]
NoWriteInReadOnly,
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -107,7 +110,6 @@ pub struct WriteBufferImpl {
buffer: Arc<QueryableBuffer>,
wal_config: WalConfig,
wal: Arc<dyn Wal>,
#[allow(dead_code)]
time_provider: Arc<dyn TimeProvider>,
last_cache: Arc<LastCacheProvider>,
}
@ -117,20 +119,13 @@ const N_SNAPSHOTS_TO_LOAD_ON_START: usize = 1_000;
impl WriteBufferImpl {
pub async fn new(
persister: Arc<Persister>,
catalog: Arc<Catalog>,
last_cache: Arc<LastCacheProvider>,
time_provider: Arc<dyn TimeProvider>,
executor: Arc<iox_query::exec::Executor>,
wal_config: WalConfig,
) -> Result<Self> {
// load up the catalog, the snapshots, and replay the wal into the in memory buffer
let catalog = persister.load_catalog().await?;
let catalog = Arc::new(
catalog
.map(|c| Catalog::from_inner(c.catalog))
.unwrap_or_else(Catalog::new),
);
let last_cache = Arc::new(LastCacheProvider::new_from_catalog(&catalog.clone_inner())?);
// load snapshots and replay the wal into the in memory buffer
let persisted_snapshots = persister
.load_snapshots(N_SNAPSHOTS_TO_LOAD_ON_START)
.await?;
@ -160,7 +155,7 @@ impl WriteBufferImpl {
));
// create the wal instance, which will replay into the queryable buffer and start
// teh background flush task.
// the background flush task.
let wal = WalObjectStore::new(
persister.object_store(),
persister.host_identifier_prefix(),
@ -641,10 +636,14 @@ mod tests {
async fn writes_data_to_wal_and_is_queryable() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let persister = Arc::new(Persister::new(Arc::clone(&object_store), "test_host"));
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
let (last_cache, catalog) = persister.load_last_cache_and_catalog().await.unwrap();
let time_provider: Arc<dyn TimeProvider> =
Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
let write_buffer = WriteBufferImpl::new(
Arc::clone(&persister),
Arc::<MockProvider>::clone(&time_provider),
Arc::new(catalog),
Arc::new(last_cache),
Arc::clone(&time_provider),
crate::test_help::make_exec(),
WalConfig::test_config(),
)
@ -711,9 +710,12 @@ mod tests {
assert_batches_eq!(&expected, &actual);
// now load a new buffer from object storage
let (last_cache, catalog) = persister.load_last_cache_and_catalog().await.unwrap();
let write_buffer = WriteBufferImpl::new(
Arc::clone(&persister),
Arc::<MockProvider>::clone(&time_provider),
Arc::new(catalog),
Arc::new(last_cache),
Arc::clone(&time_provider),
crate::test_help::make_exec(),
WalConfig {
level_0_duration: Level0Duration::new_1m(),
@ -761,8 +763,11 @@ mod tests {
.unwrap();
// load a new write buffer to ensure its durable
let (last_cache, catalog) = wbuf.persister.load_last_cache_and_catalog().await.unwrap();
let wbuf = WriteBufferImpl::new(
Arc::clone(&wbuf.persister),
Arc::new(catalog),
Arc::new(last_cache),
Arc::clone(&wbuf.time_provider),
Arc::clone(&wbuf.buffer.executor),
WalConfig {
@ -791,8 +796,11 @@ mod tests {
.unwrap();
// and do another replay and verification
let (last_cache, catalog) = wbuf.persister.load_last_cache_and_catalog().await.unwrap();
let wbuf = WriteBufferImpl::new(
Arc::clone(&wbuf.persister),
Arc::new(catalog),
Arc::new(last_cache),
Arc::clone(&wbuf.time_provider),
Arc::clone(&wbuf.buffer.executor),
WalConfig {
@ -842,8 +850,11 @@ mod tests {
.unwrap();
// do another reload and verify it's gone
let (last_cache, catalog) = wbuf.persister.load_last_cache_and_catalog().await.unwrap();
let wbuf = WriteBufferImpl::new(
Arc::clone(&wbuf.persister),
Arc::new(catalog),
Arc::new(last_cache),
Arc::clone(&wbuf.time_provider),
Arc::clone(&wbuf.buffer.executor),
WalConfig {
@ -985,8 +996,15 @@ mod tests {
assert_batches_eq!(&expected, &actual);
// and now replay in a new write buffer and attempt to write
let (last_cache, catalog) = write_buffer
.persister
.load_last_cache_and_catalog()
.await
.unwrap();
let write_buffer = WriteBufferImpl::new(
Arc::clone(&write_buffer.persister),
Arc::new(catalog),
Arc::new(last_cache),
Arc::clone(&write_buffer.time_provider),
Arc::clone(&write_buffer.buffer.executor),
WalConfig {
@ -1343,10 +1361,13 @@ mod tests {
wal_config: WalConfig,
) -> (WriteBufferImpl, IOxSessionContext) {
let persister = Arc::new(Persister::new(Arc::clone(&object_store), "test_host"));
let time_provider = Arc::new(MockProvider::new(start));
let time_provider: Arc<dyn TimeProvider> = Arc::new(MockProvider::new(start));
let (last_cache, catalog) = persister.load_last_cache_and_catalog().await.unwrap();
let wbuf = WriteBufferImpl::new(
Arc::clone(&persister),
Arc::<MockProvider>::clone(&time_provider),
Arc::new(catalog),
Arc::new(last_cache),
Arc::clone(&time_provider),
crate::test_help::make_exec(),
wal_config,
)