refactor: use more `dyn Trait` in write buffer (#25264)

* refactor: use dyn traits in WriteBufferImpl

This changes the WriteBufferImpl to use a dyn TimeProvider instead of
a generic in its type signature.

The Server type now uses a dyn WriteBuffer instead of using a generic
in its type signature, and the ServerBuilder was updated to accommodate
this accordingly.

These changes were made to make downstream code changes more seamless.

* refactor: make some items pub

This makes functions on the QueryableBuffer and LastCacheProvider pub so that
they can be used downstream.
pull/25271/head
Trevor Hilton 2024-08-23 11:21:20 -07:00 committed by GitHub
parent cbb7bc5901
commit e0e0075766
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 50 additions and 74 deletions

View File

@ -15,8 +15,8 @@ use influxdb3_server::{
CommonServerState,
};
use influxdb3_wal::{Level0Duration, WalConfig};
use influxdb3_write::persister::Persister;
use influxdb3_write::write_buffer::WriteBufferImpl;
use influxdb3_write::{persister::Persister, WriteBuffer};
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig};
use iox_time::SystemProvider;
use object_store::DynObjectStore;
@ -305,10 +305,10 @@ pub async fn command(config: Config) -> Result<()> {
};
let time_provider = Arc::new(SystemProvider::new());
let write_buffer = Arc::new(
let write_buffer: Arc<dyn WriteBuffer> = Arc::new(
WriteBufferImpl::new(
Arc::clone(&persister),
Arc::clone(&time_provider),
Arc::<SystemProvider>::clone(&time_provider),
Arc::clone(&exec),
wal_config,
)
@ -316,7 +316,7 @@ pub async fn command(config: Config) -> Result<()> {
);
let query_executor = Arc::new(QueryExecutorImpl::new(
write_buffer.catalog(),
Arc::<WriteBufferImpl<SystemProvider>>::clone(&write_buffer),
Arc::clone(&write_buffer),
Arc::clone(&exec),
Arc::clone(&metrics),
Arc::new(config.datafusion_config),

View File

@ -1,7 +1,7 @@
use std::sync::Arc;
use authz::Authorizer;
use influxdb3_write::persister::Persister;
use influxdb3_write::{persister::Persister, WriteBuffer};
use tokio::net::TcpListener;
use crate::{auth::DefaultAuthorizer, http::HttpApi, CommonServerState, Server};
@ -48,7 +48,7 @@ impl<W, Q, P, T, L> ServerBuilder<W, Q, P, T, L> {
#[derive(Debug)]
pub struct NoWriteBuf;
#[derive(Debug)]
pub struct WithWriteBuf<W>(Arc<W>);
pub struct WithWriteBuf(Arc<dyn WriteBuffer>);
#[derive(Debug)]
pub struct NoQueryExec;
#[derive(Debug)]
@ -67,7 +67,7 @@ pub struct NoListener;
pub struct WithListener(TcpListener);
impl<Q, P, T, L> ServerBuilder<NoWriteBuf, Q, P, T, L> {
pub fn write_buffer<W>(self, wb: Arc<W>) -> ServerBuilder<WithWriteBuf<W>, Q, P, T, L> {
pub fn write_buffer(self, wb: Arc<dyn WriteBuffer>) -> ServerBuilder<WithWriteBuf, Q, P, T, L> {
ServerBuilder {
common_state: self.common_state,
time_provider: self.time_provider,
@ -141,16 +141,10 @@ impl<W, Q, P, T> ServerBuilder<W, Q, P, T, NoListener> {
}
}
impl<W, Q, T>
ServerBuilder<
WithWriteBuf<W>,
WithQueryExec<Q>,
WithPersister,
WithTimeProvider<T>,
WithListener,
>
impl<Q, T>
ServerBuilder<WithWriteBuf, WithQueryExec<Q>, WithPersister, WithTimeProvider<T>, WithListener>
{
pub fn build(self) -> Server<W, Q, T> {
pub fn build(self) -> Server<Q, T> {
let persister = Arc::clone(&self.persister.0);
let authorizer = Arc::clone(&self.authorizer);
let http = Arc::new(HttpApi::new(

View File

@ -330,9 +330,9 @@ impl Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
pub(crate) struct HttpApi<W, Q, T> {
pub(crate) struct HttpApi<Q, T> {
common_state: CommonServerState,
write_buffer: Arc<W>,
write_buffer: Arc<dyn WriteBuffer>,
time_provider: Arc<T>,
pub(crate) query_executor: Arc<Q>,
max_request_bytes: usize,
@ -340,11 +340,11 @@ pub(crate) struct HttpApi<W, Q, T> {
legacy_write_param_unifier: SingleTenantRequestUnifier,
}
impl<W, Q, T> HttpApi<W, Q, T> {
impl<Q, T> HttpApi<Q, T> {
pub(crate) fn new(
common_state: CommonServerState,
time_provider: Arc<T>,
write_buffer: Arc<W>,
write_buffer: Arc<dyn WriteBuffer>,
query_executor: Arc<Q>,
max_request_bytes: usize,
authorizer: Arc<dyn Authorizer>,
@ -362,9 +362,8 @@ impl<W, Q, T> HttpApi<W, Q, T> {
}
}
impl<W, Q, T> HttpApi<W, Q, T>
impl<Q, T> HttpApi<Q, T>
where
W: WriteBuffer,
Q: QueryExecutor,
T: TimeProvider,
Error: From<<Q as QueryExecutor>::Error>,
@ -1048,8 +1047,8 @@ struct LastCacheDeleteRequest {
name: String,
}
pub(crate) async fn route_request<W: WriteBuffer, Q: QueryExecutor, T: TimeProvider>(
http_server: Arc<HttpApi<W, Q, T>>,
pub(crate) async fn route_request<Q: QueryExecutor, T: TimeProvider>(
http_server: Arc<HttpApi<Q, T>>,
mut req: Request<Body>,
) -> Result<Response<Body>, Infallible>
where

View File

@ -23,7 +23,6 @@ use datafusion::physical_plan::SendableRecordBatchStream;
use futures::{ready, stream::Fuse, Stream, StreamExt};
use hyper::http::HeaderValue;
use hyper::{header::ACCEPT, header::CONTENT_TYPE, Body, Request, Response, StatusCode};
use influxdb3_write::WriteBuffer;
use iox_time::TimeProvider;
use observability_deps::tracing::info;
use schema::{INFLUXQL_MEASUREMENT_COLUMN_NAME, TIME_COLUMN_NAME};
@ -36,9 +35,8 @@ use super::{Error, HttpApi, Result};
const DEFAULT_CHUNK_SIZE: usize = 10_000;
impl<W, Q, T> HttpApi<W, Q, T>
impl<Q, T> HttpApi<Q, T>
where
W: WriteBuffer,
Q: QueryExecutor,
T: TimeProvider,
Error: From<<Q as QueryExecutor>::Error>,

View File

@ -29,7 +29,6 @@ use hyper::server::conn::AddrIncoming;
use hyper::server::conn::Http;
use hyper::service::service_fn;
use influxdb3_write::persister::Persister;
use influxdb3_write::WriteBuffer;
use iox_query::QueryDatabase;
use iox_query_params::StatementParams;
use iox_time::TimeProvider;
@ -116,9 +115,9 @@ impl CommonServerState {
#[allow(dead_code)]
#[derive(Debug)]
pub struct Server<W, Q, T> {
pub struct Server<Q, T> {
common_state: CommonServerState,
http: Arc<HttpApi<W, Q, T>>,
http: Arc<HttpApi<Q, T>>,
persister: Arc<Persister>,
authorizer: Arc<dyn Authorizer>,
listener: TcpListener,
@ -152,15 +151,14 @@ pub enum QueryKind {
Sql,
InfluxQl,
}
impl<W, Q, T> Server<W, Q, T> {
impl<Q, T> Server<Q, T> {
pub fn authorizer(&self) -> Arc<dyn Authorizer> {
Arc::clone(&self.authorizer)
}
}
pub async fn serve<W, Q, T>(server: Server<W, Q, T>, shutdown: CancellationToken) -> Result<()>
pub async fn serve<Q, T>(server: Server<Q, T>, shutdown: CancellationToken) -> Result<()>
where
W: WriteBuffer,
Q: QueryExecutor,
http::Error: From<<Q as QueryExecutor>::Error>,
T: TimeProvider,
@ -231,8 +229,7 @@ mod tests {
use hyper::{body, Body, Client, Request, Response, StatusCode};
use influxdb3_wal::WalConfig;
use influxdb3_write::persister::Persister;
use influxdb3_write::write_buffer::WriteBufferImpl;
use influxdb3_write::LastCacheManager;
use influxdb3_write::WriteBuffer;
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig};
use iox_time::{MockProvider, Time};
use object_store::DynObjectStore;
@ -739,13 +736,7 @@ mod tests {
shutdown.cancel();
}
async fn setup_server(
start_time: i64,
) -> (
String,
CancellationToken,
Arc<WriteBufferImpl<MockProvider>>,
) {
async fn setup_server(start_time: i64) -> (String, CancellationToken, Arc<dyn WriteBuffer>) {
let trace_header_parser = trace_http::ctx::TraceHeaderParser::new();
let metrics = Arc::new(metric::Registry::new());
let common_state =
@ -768,10 +759,10 @@ mod tests {
let persister = Arc::new(Persister::new(Arc::clone(&object_store), "test_host"));
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(start_time)));
let write_buffer = Arc::new(
let write_buffer: Arc<dyn WriteBuffer> = Arc::new(
influxdb3_write::write_buffer::WriteBufferImpl::new(
Arc::clone(&persister),
Arc::clone(&time_provider),
Arc::<MockProvider>::clone(&time_provider),
Arc::clone(&exec),
WalConfig::test_config(),
)
@ -780,7 +771,7 @@ mod tests {
);
let query_executor = crate::query_executor::QueryExecutorImpl::new(
write_buffer.catalog(),
Arc::<WriteBufferImpl<MockProvider>>::clone(&write_buffer),
Arc::clone(&write_buffer),
Arc::clone(&exec),
Arc::clone(&metrics),
Arc::new(HashMap::new()),

View File

@ -582,7 +582,7 @@ mod tests {
use datafusion::{assert_batches_sorted_eq, error::DataFusionError};
use futures::TryStreamExt;
use influxdb3_wal::{Level0Duration, WalConfig};
use influxdb3_write::{persister::Persister, write_buffer::WriteBufferImpl, Bufferer};
use influxdb3_write::{persister::Persister, write_buffer::WriteBufferImpl, WriteBuffer};
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig};
use iox_time::{MockProvider, Time};
use metric::Registry;
@ -615,9 +615,7 @@ mod tests {
))
}
type TestWriteBuffer = WriteBufferImpl<MockProvider>;
async fn setup() -> (Arc<TestWriteBuffer>, QueryExecutorImpl, Arc<MockProvider>) {
async fn setup() -> (Arc<dyn WriteBuffer>, QueryExecutorImpl, Arc<MockProvider>) {
// Set up QueryExecutor
let object_store: Arc<dyn ObjectStore> =
Arc::new(LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap());
@ -627,7 +625,7 @@ mod tests {
let write_buffer = Arc::new(
WriteBufferImpl::new(
Arc::clone(&persister),
Arc::clone(&time_provider),
Arc::<MockProvider>::clone(&time_provider),
Arc::clone(&executor),
WalConfig {
level_0_duration: Level0Duration::new_1m(),
@ -643,7 +641,7 @@ mod tests {
let df_config = Arc::new(Default::default());
let query_executor = QueryExecutorImpl::new(
write_buffer.catalog(),
Arc::<WriteBufferImpl<MockProvider>>::clone(&write_buffer),
Arc::<WriteBufferImpl>::clone(&write_buffer),
executor,
metrics,
df_config,

View File

@ -117,7 +117,7 @@ impl LastCacheProvider {
}
/// Initialize a [`LastCacheProvider`] from a [`InnerCatalog`]
pub(crate) fn new_from_catalog(catalog: &InnerCatalog) -> Result<Self, Error> {
pub fn new_from_catalog(catalog: &InnerCatalog) -> Result<Self, Error> {
let provider = LastCacheProvider::new();
for db_schema in catalog.databases() {
for tbl_def in db_schema.tables() {
@ -1580,7 +1580,7 @@ mod tests {
use insta::assert_json_snapshot;
use iox_time::{MockProvider, Time};
async fn setup_write_buffer() -> WriteBufferImpl<MockProvider> {
async fn setup_write_buffer() -> WriteBufferImpl {
let obj_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let persister = Arc::new(Persister::new(obj_store, "test_host"));
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));

View File

@ -99,7 +99,7 @@ pub struct WriteRequest<'a> {
}
#[derive(Debug)]
pub struct WriteBufferImpl<T> {
pub struct WriteBufferImpl {
catalog: Arc<Catalog>,
persister: Arc<Persister>,
parquet_cache: Arc<ParquetCache>,
@ -108,16 +108,16 @@ pub struct WriteBufferImpl<T> {
wal_config: WalConfig,
wal: Arc<dyn Wal>,
#[allow(dead_code)]
time_provider: Arc<T>,
time_provider: Arc<dyn TimeProvider>,
last_cache: Arc<LastCacheProvider>,
}
const N_SNAPSHOTS_TO_LOAD_ON_START: usize = 1_000;
impl<T: TimeProvider> WriteBufferImpl<T> {
impl WriteBufferImpl {
pub async fn new(
persister: Arc<Persister>,
time_provider: Arc<T>,
time_provider: Arc<dyn TimeProvider>,
executor: Arc<iox_query::exec::Executor>,
wal_config: WalConfig,
) -> Result<Self> {
@ -463,7 +463,7 @@ pub(crate) fn parquet_chunk_from_file(
}
#[async_trait]
impl<T: TimeProvider> Bufferer for WriteBufferImpl<T> {
impl Bufferer for WriteBufferImpl {
async fn write_lp(
&self,
database: NamespaceName<'static>,
@ -497,7 +497,7 @@ impl<T: TimeProvider> Bufferer for WriteBufferImpl<T> {
}
}
impl<T: TimeProvider> ChunkContainer for WriteBufferImpl<T> {
impl ChunkContainer for WriteBufferImpl {
fn get_table_chunks(
&self,
database_name: &str,
@ -511,7 +511,7 @@ impl<T: TimeProvider> ChunkContainer for WriteBufferImpl<T> {
}
#[async_trait::async_trait]
impl<T: TimeProvider> LastCacheManager for WriteBufferImpl<T> {
impl LastCacheManager for WriteBufferImpl {
fn last_cache_provider(&self) -> Arc<LastCacheProvider> {
Arc::clone(&self.last_cache)
}
@ -593,7 +593,7 @@ impl<T: TimeProvider> LastCacheManager for WriteBufferImpl<T> {
}
}
impl<T: TimeProvider> WriteBuffer for WriteBufferImpl<T> {}
impl WriteBuffer for WriteBufferImpl {}
#[cfg(test)]
mod tests {
@ -644,7 +644,7 @@ mod tests {
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
let write_buffer = WriteBufferImpl::new(
Arc::clone(&persister),
Arc::clone(&time_provider),
Arc::<MockProvider>::clone(&time_provider),
crate::test_help::make_exec(),
WalConfig::test_config(),
)
@ -713,7 +713,7 @@ mod tests {
// now load a new buffer from object storage
let write_buffer = WriteBufferImpl::new(
Arc::clone(&persister),
Arc::clone(&time_provider),
Arc::<MockProvider>::clone(&time_provider),
crate::test_help::make_exec(),
WalConfig {
level_0_duration: Level0Duration::new_1m(),
@ -1341,12 +1341,12 @@ mod tests {
start: Time,
object_store: Arc<dyn ObjectStore>,
wal_config: WalConfig,
) -> (WriteBufferImpl<MockProvider>, IOxSessionContext) {
) -> (WriteBufferImpl, IOxSessionContext) {
let persister = Arc::new(Persister::new(Arc::clone(&object_store), "test_host"));
let time_provider = Arc::new(MockProvider::new(start));
let wbuf = WriteBufferImpl::new(
Arc::clone(&persister),
Arc::clone(&time_provider),
Arc::<MockProvider>::clone(&time_provider),
crate::test_help::make_exec(),
wal_config,
)
@ -1359,7 +1359,7 @@ mod tests {
}
async fn get_table_batches(
write_buffer: &WriteBufferImpl<MockProvider>,
write_buffer: &WriteBufferImpl,
database_name: &str,
table_name: &str,
ctx: &IOxSessionContext,

View File

@ -33,7 +33,7 @@ use tokio::sync::oneshot;
use tokio::sync::oneshot::Receiver;
#[derive(Debug)]
pub(crate) struct QueryableBuffer {
pub struct QueryableBuffer {
pub(crate) executor: Arc<Executor>,
catalog: Arc<Catalog>,
last_cache_provider: Arc<LastCacheProvider>,
@ -43,7 +43,7 @@ pub(crate) struct QueryableBuffer {
}
impl QueryableBuffer {
pub(crate) fn new(
pub fn new(
executor: Arc<Executor>,
catalog: Arc<Catalog>,
persister: Arc<Persister>,
@ -61,7 +61,7 @@ impl QueryableBuffer {
}
}
pub(crate) fn get_table_chunks(
pub fn get_table_chunks(
&self,
db_schema: Arc<DatabaseSchema>,
table_name: &str,
@ -283,11 +283,7 @@ impl QueryableBuffer {
receiver
}
pub(crate) fn persisted_parquet_files(
&self,
db_name: &str,
table_name: &str,
) -> Vec<ParquetFile> {
pub fn persisted_parquet_files(&self, db_name: &str, table_name: &str) -> Vec<ParquetFile> {
self.persisted_files.get_files(db_name, table_name)
}
}