diff --git a/Cargo.lock b/Cargo.lock index 6f820f89a3..2c32bebc98 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -469,7 +469,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -480,7 +480,7 @@ checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -653,9 +653,9 @@ dependencies = [ [[package]] name = "blake3" -version = "1.5.1" +version = "1.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30cca6d3674597c30ddf2c587bf8d9d65c9a84d2326d941cc79c9842dfe0ef52" +checksum = "e9ec96fe9a81b5e365f9db71fe00edc4fe4ca2cc7dcb7861f0603012a7caa210" dependencies = [ "arrayref", "arrayvec", @@ -719,9 +719,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.6.0" +version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" +checksum = "a12916984aab3fa6e39d655a33e09c0071eb36d6ab3aea5c2d78551f1df6d952" [[package]] name = "bzip2" @@ -766,13 +766,12 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.0" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eaff6f8ce506b9773fa786672d63fc7a191ffea1be33f72bbd4aeacefca9ffc8" +checksum = "324c74f2155653c90b04f25b2a47a8a631360cb908f92a772695f430c7e31052" dependencies = [ "jobserver", "libc", - "once_cell", ] [[package]] @@ -887,7 +886,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -1178,7 +1177,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -1202,7 +1201,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -1213,7 +1212,7 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" dependencies = [ "darling_core", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -1900,7 +1899,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -2164,9 +2163,9 @@ dependencies = [ [[package]] name = "http-body" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", "http 1.1.0", @@ -2181,7 +2180,7 @@ dependencies = [ "bytes", "futures-util", "http 1.1.0", - "http-body 1.0.0", + "http-body 1.0.1", "pin-project-lite", ] @@ -2238,7 +2237,7 @@ dependencies = [ "futures-util", "h2 0.4.5", "http 1.1.0", - "http-body 1.0.0", + "http-body 1.0.1", "httparse", "itoa", "pin-project-lite", @@ -2301,7 +2300,7 @@ dependencies = [ "futures-channel", "futures-util", "http 1.1.0", - "http-body 1.0.0", + "http-body 1.0.1", "hyper 1.4.1", "pin-project-lite", "socket2", @@ -3529,7 +3528,7 @@ checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" dependencies = [ "cfg-if", "libc", - "redox_syscall 0.5.2", + "redox_syscall 0.5.3", "smallvec", "windows-targets 0.52.6", ] @@ -3731,7 +3730,7 @@ 
checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -3847,7 +3846,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f12335488a2f3b0a83b14edad48dca9879ce89b2edd10e80237e4e852dd645e" dependencies = [ "proc-macro2", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -3942,7 +3941,7 @@ dependencies = [ "prost 0.12.6", "prost-types 0.12.6", "regex", - "syn 2.0.70", + "syn 2.0.71", "tempfile", ] @@ -3969,7 +3968,7 @@ dependencies = [ "itertools 0.12.1", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -4148,9 +4147,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.5.2" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c82cf8cff14456045f55ec4241383baeff27af886adb72ffb2162f99911de0fd" +checksum = "2a908a6e00f1fdd0dfd9c0eb08ce85126f6d8bbda50017e74bc4a4b7d4a926a4" dependencies = [ "bitflags 2.6.0", ] @@ -4257,7 +4256,7 @@ dependencies = [ "futures-util", "h2 0.4.5", "http 1.1.0", - "http-body 1.0.0", + "http-body 1.0.1", "http-body-util", "hyper 1.4.1", "hyper-rustls 0.27.2", @@ -4553,9 +4552,9 @@ dependencies = [ [[package]] name = "security-framework" -version = "2.11.0" +version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c627723fd09706bacdb5cf41499e95098555af3c3c29d014dc3c458ef6be11c0" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ "bitflags 2.6.0", "core-foundation", @@ -4566,9 +4565,9 @@ dependencies = [ [[package]] name = "security-framework-sys" -version = "2.11.0" +version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "317936bbbd05227752583946b9e66d7ce3b489f84e11a94a510b4437fef407d7" +checksum = "75da29fe9b9b08fe9d6b22b5b4bcbc75d8db3aa31e639aa56bb62e9d46bfceaf" dependencies = [ "core-foundation-sys", "libc", @@ -4603,7 +4602,7 @@ checksum = "e0cd7e117be63d3c3678776753929474f3b04a43a080c744d6b0ae2a8c28e222" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -4631,9 +4630,9 @@ dependencies = [ [[package]] name = "serde_with" -version = "3.8.3" +version = "3.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e73139bc5ec2d45e6c5fd85be5a46949c1c39a4c18e56915f5eb4c12f975e377" +checksum = "69cecfa94848272156ea67b2b1a53f20fc7bc638c4a46d2f8abde08f05f4b857" dependencies = [ "base64 0.22.1", "chrono", @@ -4649,14 +4648,14 @@ dependencies = [ [[package]] name = "serde_with_macros" -version = "3.8.3" +version = "3.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b80d3d6b56b64335c0180e5ffde23b3c5e08c14c585b51a15bd0e95393f46703" +checksum = "a8fee4991ef4f274617a51ad4af30519438dacb2f56ac773b08a1922ff743350" dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -4826,7 +4825,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -4892,7 +4891,7 @@ checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -5149,7 +5148,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -5171,9 +5170,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.70" +version = "2.0.71" 
source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f0209b68b3613b093e0ec905354eccaedcfe83b8cb37cbdeae64026c3064c16" +checksum = "b146dcf730474b4bcd16c311627b31ede9ab149045db4d6088b3becaea046462" dependencies = [ "proc-macro2", "quote", @@ -5268,22 +5267,22 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.61" +version = "1.0.62" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c546c80d6be4bc6a00c0f01730c08df82eaa7a7a61f11d656526506112cc1709" +checksum = "f2675633b1499176c2dff06b0856a27976a8f9d436737b4cf4f312d4d91d8bbb" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.61" +version = "1.0.62" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46c3384250002a6d5af4d114f2845d37b57521033f30d5c3f46c4d70e1197533" +checksum = "d20468752b09f49e909e55a5d338caa8bedf615594e9d80bc4c565d30faf798c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -5442,7 +5441,7 @@ checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -5593,7 +5592,7 @@ dependencies = [ "proc-macro2", "prost-build", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -5726,7 +5725,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -6042,7 +6041,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", "wasm-bindgen-shared", ] @@ -6076,7 +6075,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -6428,7 +6427,7 @@ dependencies = [ "strum", "subtle", "syn 1.0.109", - "syn 2.0.70", + "syn 2.0.71", "thrift", "tokio", "tokio-stream", @@ -6482,7 +6481,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -6502,7 +6501,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index fcceada514..436372f088 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -31,6 +31,7 @@ use std::{ sync::Arc, }; use thiserror::Error; +use tokio::net::TcpListener; use tokio_util::sync::CancellationToken; use trace_exporters::TracingConfig; use trace_http::ctx::TraceHeaderParser; @@ -54,6 +55,9 @@ pub enum Error { #[error("Error initializing tokio runtime: {0}")] TokioRuntime(#[source] std::io::Error), + #[error("Failed to bind address")] + BindAddress(#[source] std::io::Error), + #[error("Server error: {0}")] Server(#[from] influxdb3_server::Error), @@ -266,12 +270,8 @@ pub async fn command(config: Config) -> Result<()> { ) .with_jaeger_debug_name(config.tracing_config.traces_jaeger_debug_name); - let common_state = CommonServerState::new( - Arc::clone(&metrics), - trace_exporter, - trace_header_parser, - *config.http_bind_address, - )?; + let common_state = + CommonServerState::new(Arc::clone(&metrics), trace_exporter, trace_header_parser)?; let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store))); let wal: 
Option> = config .wal_directory @@ -300,12 +300,17 @@ pub async fn command(config: Config) -> Result<()> { config.query_log_size, )); + let listener = TcpListener::bind(*config.http_bind_address) + .await + .map_err(Error::BindAddress)?; + let builder = ServerBuilder::new(common_state) .max_request_size(config.max_http_request_size) .write_buffer(write_buffer) .query_executor(query_executor) .time_provider(time_provider) - .persister(persister); + .persister(persister) + .tcp_listener(listener); let server = if let Some(token) = config.bearer_token.map(hex::decode).transpose()? { builder diff --git a/influxdb3_server/src/builder.rs b/influxdb3_server/src/builder.rs index d7000c4944..7a44648f07 100644 --- a/influxdb3_server/src/builder.rs +++ b/influxdb3_server/src/builder.rs @@ -1,21 +1,23 @@ use std::sync::Arc; use authz::Authorizer; +use tokio::net::TcpListener; use crate::{auth::DefaultAuthorizer, http::HttpApi, CommonServerState, Server}; #[derive(Debug)] -pub struct ServerBuilder { +pub struct ServerBuilder { common_state: CommonServerState, time_provider: T, max_request_size: usize, write_buffer: W, query_executor: Q, persister: P, + listener: L, authorizer: Arc, } -impl ServerBuilder { +impl ServerBuilder { pub fn new(common_state: CommonServerState) -> Self { Self { common_state, @@ -24,12 +26,13 @@ impl ServerBuilder { write_buffer: NoWriteBuf, query_executor: NoQueryExec, persister: NoPersister, + listener: NoListener, authorizer: Arc::new(DefaultAuthorizer), } } } -impl ServerBuilder { +impl ServerBuilder { pub fn max_request_size(mut self, max_request_size: usize) -> Self { self.max_request_size = max_request_size; self @@ -57,9 +60,13 @@ pub struct WithPersister

<P>(Arc<P>
); pub struct NoTimeProvider; #[derive(Debug)] pub struct WithTimeProvider(Arc); +#[derive(Debug)] +pub struct NoListener; +#[derive(Debug)] +pub struct WithListener(TcpListener); -impl ServerBuilder { - pub fn write_buffer(self, wb: Arc) -> ServerBuilder, Q, P, T> { +impl ServerBuilder { + pub fn write_buffer(self, wb: Arc) -> ServerBuilder, Q, P, T, L> { ServerBuilder { common_state: self.common_state, time_provider: self.time_provider, @@ -67,13 +74,14 @@ impl ServerBuilder { write_buffer: WithWriteBuf(wb), query_executor: self.query_executor, persister: self.persister, + listener: self.listener, authorizer: self.authorizer, } } } -impl ServerBuilder { - pub fn query_executor(self, qe: Arc) -> ServerBuilder, P, T> { +impl ServerBuilder { + pub fn query_executor(self, qe: Arc) -> ServerBuilder, P, T, L> { ServerBuilder { common_state: self.common_state, time_provider: self.time_provider, @@ -81,13 +89,14 @@ impl ServerBuilder { write_buffer: self.write_buffer, query_executor: WithQueryExec(qe), persister: self.persister, + listener: self.listener, authorizer: self.authorizer, } } } -impl ServerBuilder { - pub fn persister

<P>(self, p: Arc<P>) -> ServerBuilder<W, Q, WithPersister<P>, T> {
+impl<W, Q, T, L> ServerBuilder<W, Q, NoPersister, T, L> {
+    pub fn persister<P>(self, p: Arc<P>
) -> ServerBuilder, T, L> { ServerBuilder { common_state: self.common_state, time_provider: self.time_provider, @@ -95,13 +104,14 @@ impl ServerBuilder { write_buffer: self.write_buffer, query_executor: self.query_executor, persister: WithPersister(p), + listener: self.listener, authorizer: self.authorizer, } } } -impl ServerBuilder { - pub fn time_provider(self, tp: Arc) -> ServerBuilder> { +impl ServerBuilder { + pub fn time_provider(self, tp: Arc) -> ServerBuilder, L> { ServerBuilder { common_state: self.common_state, time_provider: WithTimeProvider(tp), @@ -109,13 +119,35 @@ impl ServerBuilder { write_buffer: self.write_buffer, query_executor: self.query_executor, persister: self.persister, + listener: self.listener, + authorizer: self.authorizer, + } + } +} + +impl ServerBuilder { + pub fn tcp_listener(self, listener: TcpListener) -> ServerBuilder { + ServerBuilder { + common_state: self.common_state, + time_provider: self.time_provider, + max_request_size: self.max_request_size, + write_buffer: self.write_buffer, + query_executor: self.query_executor, + persister: self.persister, + listener: WithListener(listener), authorizer: self.authorizer, } } } impl - ServerBuilder, WithQueryExec, WithPersister

<P>, WithTimeProvider<T>>
+    ServerBuilder<
+        WithWriteBuf<W>,
+        WithQueryExec<Q>,
+        WithPersister<P>
, + WithTimeProvider, + WithListener, + > { pub fn build(self) -> Server { let persister = Arc::clone(&self.persister.0); @@ -133,6 +165,7 @@ impl http, persister, authorizer, + listener: self.listener.0, } } } diff --git a/influxdb3_server/src/lib.rs b/influxdb3_server/src/lib.rs index ddcf6e62b4..41077a3956 100644 --- a/influxdb3_server/src/lib.rs +++ b/influxdb3_server/src/lib.rs @@ -24,6 +24,8 @@ use crate::http::HttpApi; use async_trait::async_trait; use authz::Authorizer; use datafusion::execution::SendableRecordBatchStream; +use hyper::server::conn::AddrIncoming; +use hyper::server::conn::Http; use hyper::service::service_fn; use influxdb3_write::{Persister, WriteBuffer}; use iox_query::QueryDatabase; @@ -33,9 +35,9 @@ use observability_deps::tracing::error; use service::hybrid; use std::convert::Infallible; use std::fmt::Debug; -use std::net::SocketAddr; use std::sync::Arc; use thiserror::Error; +use tokio::net::TcpListener; use tokio_util::sync::CancellationToken; use tower::Layer; use trace::ctx::SpanContext; @@ -76,7 +78,6 @@ pub struct CommonServerState { metrics: Arc, trace_exporter: Option>, trace_header_parser: TraceHeaderParser, - http_addr: SocketAddr, } impl CommonServerState { @@ -84,13 +85,11 @@ impl CommonServerState { metrics: Arc, trace_exporter: Option>, trace_header_parser: TraceHeaderParser, - http_addr: SocketAddr, ) -> Result { Ok(Self { metrics, trace_exporter, trace_header_parser, - http_addr, }) } @@ -120,6 +119,7 @@ pub struct Server { http: Arc>, persister: Arc

, authorizer: Arc, + listener: TcpListener, } #[async_trait] @@ -193,7 +193,8 @@ where let hybrid_make_service = hybrid(rest_service, grpc_service); - hyper::Server::bind(&server.common_state.http_addr) + let addr = AddrIncoming::from_listener(server.listener)?; + hyper::server::Builder::new(addr, Http::new()) .serve(hybrid_make_service) .with_graceful_shutdown(shutdown.cancelled()) .await?; @@ -231,6 +232,8 @@ mod tests { use datafusion::parquet::data_type::AsBytes; use hyper::{body, Body, Client, Request, Response, StatusCode}; use influxdb3_write::persister::PersisterImpl; + use influxdb3_write::wal::WalImpl; + use influxdb3_write::write_buffer::WriteBufferImpl; use influxdb3_write::SegmentDuration; use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig}; use iox_time::{MockProvider, Time}; @@ -238,75 +241,17 @@ mod tests { use parquet_file::storage::{ParquetStorage, StorageId}; use pretty_assertions::assert_eq; use std::collections::HashMap; - use std::net::{SocketAddr, SocketAddrV4}; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::num::NonZeroUsize; - use std::sync::atomic::{AtomicU16, Ordering}; use std::sync::Arc; + use tokio::net::TcpListener; use tokio_util::sync::CancellationToken; - static NEXT_PORT: AtomicU16 = AtomicU16::new(8090); - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn write_and_query() { - let addr = get_free_port(); - let trace_header_parser = trace_http::ctx::TraceHeaderParser::new(); - let metrics = Arc::new(metric::Registry::new()); - let common_state = - crate::CommonServerState::new(Arc::clone(&metrics), None, trace_header_parser, addr) - .unwrap(); - let object_store: Arc = Arc::new(object_store::memory::InMemory::new()); - let parquet_store = - ParquetStorage::new(Arc::clone(&object_store), StorageId::from("influxdb3")); - let exec = Arc::new(Executor::new_with_config_and_executor( - ExecutorConfig { - target_query_partitions: NonZeroUsize::new(1).unwrap(), - object_stores: [&parquet_store] - .into_iter() - .map(|store| (store.id(), Arc::clone(store.object_store()))) - .collect(), - metric_registry: Arc::clone(&metrics), - mem_pool_size: usize::MAX, - }, - DedicatedExecutor::new_testing(), - )); - let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store))); - let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); + let start_time = 0; + let (server, shutdown, _) = setup_server(start_time).await; - let write_buffer = Arc::new( - influxdb3_write::write_buffer::WriteBufferImpl::new( - Arc::clone(&persister), - None::>, - Arc::clone(&time_provider), - SegmentDuration::new_5m(), - Arc::clone(&exec), - 10000, - ) - .await - .unwrap(), - ); - let query_executor = Arc::new(crate::query_executor::QueryExecutorImpl::new( - write_buffer.catalog(), - Arc::clone(&write_buffer), - Arc::clone(&exec), - Arc::clone(&metrics), - Arc::new(HashMap::new()), - 10, - 10, - )); - - let server = ServerBuilder::new(common_state) - .write_buffer(Arc::clone(&write_buffer)) - .query_executor(Arc::clone(&query_executor)) - .persister(Arc::clone(&persister)) - .authorizer(Arc::new(DefaultAuthorizer)) - .time_provider(Arc::clone(&time_provider)) - .build(); - let frontend_shutdown = CancellationToken::new(); - let shutdown = frontend_shutdown.clone(); - - tokio::spawn(async move { serve(server, frontend_shutdown).await }); - - let server = format!("http://{}", addr); write_lp( &server, "foo", @@ -409,66 +354,9 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn write_lp_tests() { 
- let addr = get_free_port(); - let trace_header_parser = trace_http::ctx::TraceHeaderParser::new(); - let metrics = Arc::new(metric::Registry::new()); - let common_state = - crate::CommonServerState::new(Arc::clone(&metrics), None, trace_header_parser, addr) - .unwrap(); - let object_store: Arc = Arc::new(object_store::memory::InMemory::new()); - let parquet_store = - ParquetStorage::new(Arc::clone(&object_store), StorageId::from("influxdb3")); - let exec = Arc::new(Executor::new_with_config_and_executor( - ExecutorConfig { - target_query_partitions: NonZeroUsize::new(1).unwrap(), - object_stores: [&parquet_store] - .into_iter() - .map(|store| (store.id(), Arc::clone(store.object_store()))) - .collect(), - metric_registry: Arc::clone(&metrics), - mem_pool_size: usize::MAX, - }, - DedicatedExecutor::new_testing(), - )); - let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store))); - let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); + let start_time = 0; + let (server, shutdown, _) = setup_server(start_time).await; - let write_buffer = Arc::new( - influxdb3_write::write_buffer::WriteBufferImpl::new( - Arc::clone(&persister), - None::>, - Arc::clone(&time_provider), - SegmentDuration::new_5m(), - Arc::clone(&exec), - 10000, - ) - .await - .unwrap(), - ); - let query_executor = crate::query_executor::QueryExecutorImpl::new( - write_buffer.catalog(), - Arc::clone(&write_buffer), - Arc::clone(&exec), - Arc::clone(&metrics), - Arc::new(HashMap::new()), - 10, - 10, - ); - - let server = ServerBuilder::new(common_state) - .write_buffer(Arc::clone(&write_buffer)) - .query_executor(Arc::new(query_executor)) - .persister(persister) - .authorizer(Arc::new(DefaultAuthorizer)) - .time_provider(Arc::clone(&time_provider)) - .build(); - let frontend_shutdown = CancellationToken::new(); - let shutdown = frontend_shutdown.clone(); - - tokio::spawn(async move { serve(server, frontend_shutdown).await }); - - // Test that only one error comes back - let server = format!("http://{}", addr); let resp = write_lp( &server, "foo", @@ -615,67 +503,9 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn write_lp_precision_tests() { - let addr = get_free_port(); - let trace_header_parser = trace_http::ctx::TraceHeaderParser::new(); - let metrics = Arc::new(metric::Registry::new()); - let common_state = - crate::CommonServerState::new(Arc::clone(&metrics), None, trace_header_parser, addr) - .unwrap(); - let object_store: Arc = Arc::new(object_store::memory::InMemory::new()); - let parquet_store = - ParquetStorage::new(Arc::clone(&object_store), StorageId::from("influxdb3")); - let exec = Arc::new(Executor::new_with_config_and_executor( - ExecutorConfig { - target_query_partitions: NonZeroUsize::new(1).unwrap(), - object_stores: [&parquet_store] - .into_iter() - .map(|store| (store.id(), Arc::clone(store.object_store()))) - .collect(), - metric_registry: Arc::clone(&metrics), - mem_pool_size: usize::MAX, - }, - DedicatedExecutor::new_testing(), - )); - let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store))); - let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos( - 1708473607000000000, - ))); + let start_time = 1708473607000000000; + let (server, shutdown, _) = setup_server(start_time).await; - let write_buffer = Arc::new( - influxdb3_write::write_buffer::WriteBufferImpl::new( - Arc::clone(&persister), - None::>, - Arc::clone(&time_provider), - SegmentDuration::new_5m(), - Arc::clone(&exec), - 10000, - ) - 
.await - .unwrap(), - ); - let query_executor = crate::query_executor::QueryExecutorImpl::new( - write_buffer.catalog(), - Arc::clone(&write_buffer), - Arc::clone(&exec), - Arc::clone(&metrics), - Arc::new(HashMap::new()), - 10, - 10, - ); - - let server = ServerBuilder::new(common_state) - .write_buffer(Arc::clone(&write_buffer)) - .query_executor(Arc::new(query_executor)) - .persister(persister) - .authorizer(Arc::new(DefaultAuthorizer)) - .time_provider(Arc::clone(&time_provider)) - .build(); - let frontend_shutdown = CancellationToken::new(); - let shutdown = frontend_shutdown.clone(); - - tokio::spawn(async move { serve(server, frontend_shutdown).await }); - - let server = format!("http://{}", addr); let resp = write_lp( &server, "foo", @@ -789,6 +619,153 @@ mod tests { shutdown.cancel(); } + #[tokio::test] + async fn query_from_last_cache() { + let start_time = 0; + let (url, shutdown, wbuf) = setup_server(start_time).await; + let db_name = "foo"; + let tbl_name = "cpu"; + + // Write to generate a db/table in the catalog: + let resp = write_lp( + &url, + db_name, + format!("{tbl_name},region=us,host=a usage=50 500"), + None, + false, + "second", + ) + .await; + assert_eq!(resp.status(), StatusCode::OK); + + // Create the last cache: + wbuf.create_last_cache(db_name, tbl_name, None, None, None, None, None) + .expect("create last cache"); + + // Write to put something in the last cache: + let resp = write_lp( + &url, + db_name, + format!( + "\ + {tbl_name},region=us,host=a usage=11 1000\n\ + {tbl_name},region=us,host=b usage=22 1000\n\ + {tbl_name},region=us,host=c usage=33 1000\n\ + {tbl_name},region=ca,host=d usage=44 1000\n\ + {tbl_name},region=ca,host=e usage=55 1000\n\ + {tbl_name},region=eu,host=f usage=66 1000\n\ + " + ), + None, + false, + "second", + ) + .await; + assert_eq!(resp.status(), StatusCode::OK); + + // Query from the last cache: + let res = query( + &url, + db_name, + format!("SELECT * FROM last_cache('{tbl_name}') ORDER BY host"), + "pretty", + None, + ) + .await; + let body = body::to_bytes(res.into_body()).await.unwrap(); + let body = String::from_utf8(body.as_bytes().to_vec()).unwrap(); + assert_eq!( + "\ + +------+--------+---------------------+-------+\n\ + | host | region | time | usage |\n\ + +------+--------+---------------------+-------+\n\ + | a | us | 1970-01-01T00:16:40 | 11.0 |\n\ + | b | us | 1970-01-01T00:16:40 | 22.0 |\n\ + | c | us | 1970-01-01T00:16:40 | 33.0 |\n\ + | d | ca | 1970-01-01T00:16:40 | 44.0 |\n\ + | e | ca | 1970-01-01T00:16:40 | 55.0 |\n\ + | f | eu | 1970-01-01T00:16:40 | 66.0 |\n\ + +------+--------+---------------------+-------+", + body + ); + + shutdown.cancel(); + } + + async fn setup_server( + start_time: i64, + ) -> ( + String, + CancellationToken, + Arc>, + ) { + let trace_header_parser = trace_http::ctx::TraceHeaderParser::new(); + let metrics = Arc::new(metric::Registry::new()); + let common_state = + crate::CommonServerState::new(Arc::clone(&metrics), None, trace_header_parser).unwrap(); + let object_store: Arc = Arc::new(object_store::memory::InMemory::new()); + let parquet_store = + ParquetStorage::new(Arc::clone(&object_store), StorageId::from("influxdb3")); + let exec = Arc::new(Executor::new_with_config_and_executor( + ExecutorConfig { + target_query_partitions: NonZeroUsize::new(1).unwrap(), + object_stores: [&parquet_store] + .into_iter() + .map(|store| (store.id(), Arc::clone(store.object_store()))) + .collect(), + metric_registry: Arc::clone(&metrics), + mem_pool_size: usize::MAX, + }, + 
DedicatedExecutor::new_testing(), + )); + let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store))); + let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(start_time))); + + let write_buffer = Arc::new( + influxdb3_write::write_buffer::WriteBufferImpl::new( + Arc::clone(&persister), + None::>, + Arc::clone(&time_provider), + SegmentDuration::new_5m(), + Arc::clone(&exec), + 10000, + ) + .await + .unwrap(), + ); + let query_executor = crate::query_executor::QueryExecutorImpl::new( + write_buffer.catalog(), + Arc::clone(&write_buffer), + Arc::clone(&exec), + Arc::clone(&metrics), + Arc::new(HashMap::new()), + 10, + 10, + ); + + // bind to port 0 will assign a random available port: + let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); + let listener = TcpListener::bind(socket_addr) + .await + .expect("bind tcp address"); + let addr = listener.local_addr().unwrap(); + + let server = ServerBuilder::new(common_state) + .write_buffer(Arc::clone(&write_buffer)) + .query_executor(Arc::new(query_executor)) + .persister(persister) + .authorizer(Arc::new(DefaultAuthorizer)) + .time_provider(Arc::clone(&time_provider)) + .tcp_listener(listener) + .build(); + let frontend_shutdown = CancellationToken::new(); + let shutdown = frontend_shutdown.clone(); + + tokio::spawn(async move { serve(server, frontend_shutdown).await }); + + (format!("http://{addr}"), shutdown, write_buffer) + } + pub(crate) async fn write_lp( server: impl Into + Send, database: impl Into + Send, @@ -853,17 +830,4 @@ mod tests { .await .expect("http error sending query") } - - pub(crate) fn get_free_port() -> SocketAddr { - let ip = std::net::Ipv4Addr::new(127, 0, 0, 1); - - loop { - let port = NEXT_PORT.fetch_add(1, Ordering::SeqCst); - let addr = SocketAddrV4::new(ip, port); - - if std::net::TcpListener::bind(addr).is_ok() { - return addr.into(); - } - } - } } diff --git a/influxdb3_server/src/query_executor.rs b/influxdb3_server/src/query_executor.rs index f3ac594b15..c88315cc55 100644 --- a/influxdb3_server/src/query_executor.rs +++ b/influxdb3_server/src/query_executor.rs @@ -22,6 +22,7 @@ use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::Expr; use datafusion_util::config::DEFAULT_SCHEMA; use datafusion_util::MemoryStream; +use influxdb3_write::last_cache::LastCacheFunction; use influxdb3_write::{ catalog::{Catalog, DatabaseSchema}, WriteBuffer, @@ -443,10 +444,20 @@ impl QueryNamespace for Database { cfg = cfg.with_config_option(k, v); } - cfg.build() + let ctx = cfg.build(); + ctx.inner().register_udtf( + LAST_CACHE_UDTF_NAME, + Arc::new(LastCacheFunction::new( + &self.db_schema.name, + self.write_buffer.last_cache(), + )), + ); + ctx } } +const LAST_CACHE_UDTF_NAME: &str = "last_cache"; + impl CatalogProvider for Database { fn as_any(&self) -> &dyn Any { self as &dyn Any diff --git a/influxdb3_write/src/last_cache.rs b/influxdb3_write/src/last_cache/mod.rs similarity index 83% rename from influxdb3_write/src/last_cache.rs rename to influxdb3_write/src/last_cache/mod.rs index d2a717f577..a0c563f379 100644 --- a/influxdb3_write/src/last_cache.rs +++ b/influxdb3_write/src/last_cache/mod.rs @@ -1,5 +1,4 @@ use std::{ - any::Any, collections::VecDeque, sync::Arc, time::{Duration, Instant}, @@ -17,26 +16,24 @@ use arrow::{ }, error::ArrowError, }; -use async_trait::async_trait; use datafusion::{ - common::Result as DFResult, - datasource::{TableProvider, TableType}, - execution::context::SessionState, - logical_expr::{BinaryExpr, Expr, Operator, 
TableProviderFilterPushDown}, - physical_plan::{memory::MemoryExec, ExecutionPlan}, + logical_expr::{BinaryExpr, Expr, Operator}, scalar::ScalarValue, }; use hashbrown::{HashMap, HashSet}; use indexmap::{IndexMap, IndexSet}; use iox_time::Time; use parking_lot::RwLock; -use schema::{InfluxColumnType, InfluxFieldType, Schema, SchemaBuilder, TIME_COLUMN_NAME}; +use schema::{InfluxColumnType, InfluxFieldType, Schema, TIME_COLUMN_NAME}; use crate::{ catalog::LastCacheSize, write_buffer::{buffer_segment::WriteBatch, Field, FieldData, Row}, }; +mod table_function; +pub use table_function::LastCacheFunction; + #[derive(Debug, thiserror::Error)] pub enum Error { #[error("invalid cache size")] @@ -51,6 +48,8 @@ pub enum Error { ValueColumnDoesNotExist { column_name: String }, #[error("schema builder error: {0}")] SchemaBuilder(#[from] schema::builder::Error), + #[error("requested last cache does not exist")] + CacheDoesNotExist, } impl Error { @@ -118,6 +117,34 @@ impl LastCacheProvider { } } + /// Get a particular cache's name and arrow schema + /// + /// This is used for the implementation of DataFusion's `TableFunctionImpl` and `TableProvider` + /// traits. + fn get_cache_name_and_schema( + &self, + db_name: &str, + tbl_name: &str, + cache_name: Option<&str>, + ) -> Option<(String, ArrowSchemaRef)> { + self.cache_map + .read() + .get(db_name) + .and_then(|db| db.get(tbl_name)) + .and_then(|tbl| { + if let Some(name) = cache_name { + tbl.get(name) + .map(|lc| (name.to_string(), Arc::clone(&lc.schema))) + } else if tbl.len() == 1 { + tbl.iter() + .map(|(name, lc)| (name.to_string(), Arc::clone(&lc.schema))) + .next() + } else { + None + } + }) + } + /// Create a new entry in the last cache for a given database and table, along with the given /// parameters. pub(crate) fn create_cache( @@ -201,14 +228,26 @@ impl LastCacheProvider { ) }; - // build a schema that only holds the field columns - let mut schema_builder = SchemaBuilder::new(); - for (t, name) in schema + let mut schema_builder = ArrowSchemaBuilder::new(); + // Add key columns first: + for (t, field) in schema + .iter() + .filter(|&(_, f)| key_columns.contains(f.name())) + { + if let InfluxColumnType::Tag = t { + // override tags with string type in the schema, because the KeyValue type stores + // them as strings, and produces them as StringArray when creating RecordBatches: + schema_builder.push(ArrowField::new(field.name(), DataType::Utf8, false)) + } else { + schema_builder.push(field.clone()); + }; + } + // Add value columns second: + for (_, field) in schema .iter() .filter(|&(_, f)| value_columns.contains(f.name())) - .map(|(t, f)| (t, f.name())) { - schema_builder.influx_column(name, t); + schema_builder.push(field.clone()); } let series_key = schema @@ -223,7 +262,7 @@ impl LastCacheProvider { .map_err(|_| Error::InvalidCacheSize)?, ttl.unwrap_or(DEFAULT_CACHE_TTL), key_columns, - schema_builder.build()?.as_arrow(), + Arc::new(schema_builder.finish()), series_key, accept_new_fields, ); @@ -340,9 +379,7 @@ pub(crate) struct LastCache { /// The key columns for this cache /// /// Uses an [`IndexSet`] for both fast iteration and fast lookup. 
- key_columns: IndexSet, - /// The Arrow Schema for the table that this cache is associated with - schema: ArrowSchemaRef, + key_columns: Arc>, /// Optionally store the series key for tables that use it for ensuring non-nullability in the /// column buffer for series key columns /// @@ -351,6 +388,8 @@ pub(crate) struct LastCache { series_key: Option>, /// Whether or not this cache accepts newly written fields accept_new_fields: bool, + /// The Arrow Schema for the table that this cache is associated with + schema: ArrowSchemaRef, /// The internal state of the cache state: LastCacheState, } @@ -368,10 +407,10 @@ impl LastCache { Self { count, ttl, - key_columns: key_columns.into_iter().collect(), - schema, + key_columns: Arc::new(key_columns.into_iter().collect()), series_key, accept_new_fields, + schema, state: LastCacheState::Init, } } @@ -418,6 +457,7 @@ impl LastCache { /// This will panic if the internal cache state's keys are out-of-order with respect to the /// order of the `key_columns` on this [`LastCache`] pub(crate) fn push(&mut self, row: &Row) { + let schema = Arc::clone(&self.schema); let mut target = &mut self.state; let mut key_iter = self.key_columns.iter().peekable(); while let (Some(key), peek) = (key_iter.next(), key_iter.peek()) { @@ -451,7 +491,8 @@ impl LastCache { LastCacheState::Store(LastCacheStore::new( self.count.into(), self.ttl, - Arc::clone(&self.schema), + Arc::clone(&schema), + Arc::clone(&self.key_columns), self.series_key.as_ref(), )) } @@ -462,14 +503,15 @@ impl LastCache { *target = LastCacheState::Store(LastCacheStore::new( self.count.into(), self.ttl, - Arc::clone(&self.schema), + Arc::clone(&schema), + Arc::clone(&self.key_columns), self.series_key.as_ref(), )); } let store = target.as_store_mut().expect( "cache target should be the actual store after iterating through all key columns", ); - let Some(new_columns) = store.push(row, self.accept_new_fields, &self.key_columns) else { + let Some(new_columns) = store.push(row, self.accept_new_fields) else { // Unless new columns were added, and we need to update the schema, we are done. 
return; }; @@ -490,10 +532,10 @@ impl LastCache { fn to_record_batches(&self, predicates: &[Predicate]) -> Result, ArrowError> { // map the provided predicates on to the key columns // there may not be predicates provided for each key column, hence the Option - let predicates: Vec> = self + let predicates: Vec> = self .key_columns .iter() - .map(|key| predicates.iter().find(|p| p.key == *key).cloned()) + .map(|key| predicates.iter().find(|p| p.key == *key)) .collect(); let mut caches = vec![ExtendedLastCacheState { @@ -508,13 +550,15 @@ impl LastCache { let mut new_caches = vec![]; 'cache_loop: for c in caches { let cache_key = c.state.as_key().unwrap(); - if let Some(ref pred) = predicate { + if let Some(pred) = predicate { let Some(next_state) = cache_key.evaluate_predicate(pred) else { continue 'cache_loop; }; + let mut additional_columns = c.additional_columns.clone(); + additional_columns.push((&cache_key.column_name, &pred.value)); new_caches.push(ExtendedLastCacheState { state: next_state, - additional_columns: c.additional_columns.clone(), + additional_columns, }); } else { new_caches.extend(cache_key.value_map.iter().map(|(v, state)| { @@ -825,6 +869,8 @@ struct LastCacheStore { /// /// [perf]: https://github.com/indexmap-rs/indexmap?tab=readme-ov-file#performance cache: IndexMap, + /// A reference to the set of key columns for the cache + key_columns: Arc>, /// A ring buffer holding the instants at which entries in the cache were inserted /// /// This is used to evict cache values that outlive the `ttl` @@ -845,11 +891,13 @@ impl LastCacheStore { count: usize, ttl: Duration, schema: ArrowSchemaRef, + key_columns: Arc>, series_keys: Option<&HashSet>, ) -> Self { let cache = schema .fields() .iter() + .filter(|f| !key_columns.contains(f.name())) .map(|f| { ( f.name().to_string(), @@ -863,6 +911,7 @@ impl LastCacheStore { .collect(); Self { cache, + key_columns, instants: VecDeque::with_capacity(count), count, ttl, @@ -888,7 +937,6 @@ impl LastCacheStore { &mut self, row: &'a Row, accept_new_fields: bool, - key_columns: &IndexSet, ) -> Option> { if row.time <= self.last_time.timestamp_nanos() { return None; @@ -904,7 +952,7 @@ impl LastCacheStore { if let Some(col) = self.cache.get_mut(&field.name) { // In this case, the field already has an entry in the cache, so just push: col.push(&field.value); - } else if !key_columns.contains(&field.name) { + } else if !self.key_columns.contains(&field.name) { // In this case, there is not an entry for the field in the cache, so if the // value is not one of the key columns, then it is a new field being added. 
let data_type = data_type_from_buffer_field(field); @@ -963,11 +1011,14 @@ impl LastCacheStore { .fields() .iter() .cloned() - .map(|f| { + .filter_map(|f| { if let Some(c) = self.cache.get(f.name()) { - (f, c.data.as_array()) + Some((f, c.data.as_array())) + } else if self.key_columns.contains(f.name()) { + // We prepend key columns with the extended set provided + None } else { - (Arc::clone(&f), new_null_array(f.data_type(), self.len())) + Some((Arc::clone(&f), new_null_array(f.data_type(), self.len()))) } }) .collect(); @@ -1006,52 +1057,15 @@ impl LastCacheStore { } } -#[async_trait] -impl TableProvider for LastCache { - fn as_any(&self) -> &dyn Any { - self as &dyn Any - } - - fn schema(&self) -> ArrowSchemaRef { - Arc::clone(&self.schema) - } - - fn table_type(&self) -> TableType { - TableType::Temporary - } - - fn supports_filters_pushdown( - &self, - filters: &[&Expr], - ) -> DFResult> { - Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()]) - } - - async fn scan( - &self, - ctx: &SessionState, - projection: Option<&Vec>, - filters: &[Expr], - _limit: Option, - ) -> DFResult> { - let predicates = self.convert_filter_exprs(filters); - let partitions = vec![self.to_record_batches(&predicates)?]; - let mut exec = MemoryExec::try_new(&partitions, self.schema(), projection.cloned())?; - - let show_sizes = ctx.config_options().explain.show_sizes; - exec = exec.with_show_sizes(show_sizes); - - Ok(Arc::new(exec)) - } -} - /// A column in a [`LastCache`] /// /// Stores its size so it can evict old data on push. Stores the time-to-live (TTL) in order /// to remove expired data. #[derive(Debug)] struct CacheColumn { + /// The number of entries the [`CacheColumn`] will hold before evicting old ones on push size: usize, + /// The buffer containing data for the column data: CacheColumnData, } @@ -1381,11 +1395,11 @@ mod tests { assert_batches_eq!( [ - "+--------+-----------------------------+-------+", - "| region | time | usage |", - "+--------+-----------------------------+-------+", - "| us | 1970-01-01T00:00:00.000002Z | 99.0 |", - "+--------+-----------------------------+-------+", + "+------+--------+-----------------------------+-------+", + "| host | region | time | usage |", + "+------+--------+-----------------------------+-------+", + "| a | us | 1970-01-01T00:00:00.000002Z | 99.0 |", + "+------+--------+-----------------------------+-------+", ], &batch ); @@ -1409,11 +1423,11 @@ mod tests { assert_batches_eq!( [ - "+--------+-----------------------------+-------+", - "| region | time | usage |", - "+--------+-----------------------------+-------+", - "| us | 1970-01-01T00:00:00.000003Z | 88.0 |", - "+--------+-----------------------------+-------+", + "+------+--------+-----------------------------+-------+", + "| host | region | time | usage |", + "+------+--------+-----------------------------+-------+", + "| a | us | 1970-01-01T00:00:00.000003Z | 88.0 |", + "+------+--------+-----------------------------+-------+", ], &batch ); @@ -1497,11 +1511,11 @@ mod tests { Predicate::new("host", KeyValue::string("c")), ], expected: &[ - "+-----------------------------+-------+", - "| time | usage |", - "+-----------------------------+-------+", - "| 1970-01-01T00:00:00.000001Z | 60.0 |", - "+-----------------------------+-------+", + "+--------+------+-----------------------------+-------+", + "| region | host | time | usage |", + "+--------+------+-----------------------------+-------+", + "| us | c | 1970-01-01T00:00:00.000001Z | 60.0 |", + 
"+--------+------+-----------------------------+-------+", ], }, // Predicate on only region key column will have host column outputted in addition to @@ -1509,26 +1523,26 @@ mod tests { TestCase { predicates: &[Predicate::new("region", KeyValue::string("us"))], expected: &[ - "+------+-----------------------------+-------+", - "| host | time | usage |", - "+------+-----------------------------+-------+", - "| a | 1970-01-01T00:00:00.000001Z | 100.0 |", - "| c | 1970-01-01T00:00:00.000001Z | 60.0 |", - "| b | 1970-01-01T00:00:00.000001Z | 80.0 |", - "+------+-----------------------------+-------+", + "+--------+------+-----------------------------+-------+", + "| region | host | time | usage |", + "+--------+------+-----------------------------+-------+", + "| us | a | 1970-01-01T00:00:00.000001Z | 100.0 |", + "| us | b | 1970-01-01T00:00:00.000001Z | 80.0 |", + "| us | c | 1970-01-01T00:00:00.000001Z | 60.0 |", + "+--------+------+-----------------------------+-------+", ], }, // Similar to previous, with a different region predicate: TestCase { predicates: &[Predicate::new("region", KeyValue::string("ca"))], expected: &[ - "+------+-----------------------------+-------+", - "| host | time | usage |", - "+------+-----------------------------+-------+", - "| d | 1970-01-01T00:00:00.000001Z | 40.0 |", - "| e | 1970-01-01T00:00:00.000001Z | 20.0 |", - "| f | 1970-01-01T00:00:00.000001Z | 30.0 |", - "+------+-----------------------------+-------+", + "+--------+------+-----------------------------+-------+", + "| region | host | time | usage |", + "+--------+------+-----------------------------+-------+", + "| ca | d | 1970-01-01T00:00:00.000001Z | 40.0 |", + "| ca | e | 1970-01-01T00:00:00.000001Z | 20.0 |", + "| ca | f | 1970-01-01T00:00:00.000001Z | 30.0 |", + "+--------+------+-----------------------------+-------+", ], }, // Predicate on only host key column will have region column outputted in addition to @@ -1536,11 +1550,11 @@ mod tests { TestCase { predicates: &[Predicate::new("host", KeyValue::string("a"))], expected: &[ - "+--------+-----------------------------+-------+", - "| region | time | usage |", - "+--------+-----------------------------+-------+", - "| us | 1970-01-01T00:00:00.000001Z | 100.0 |", - "+--------+-----------------------------+-------+", + "+--------+------+-----------------------------+-------+", + "| region | host | time | usage |", + "+--------+------+-----------------------------+-------+", + "| us | a | 1970-01-01T00:00:00.000001Z | 100.0 |", + "+--------+------+-----------------------------+-------+", ], }, // Omitting all key columns from the predicate will have all key columns included in @@ -1704,57 +1718,57 @@ mod tests { Predicate::new("host", KeyValue::string("a")), ], expected: &[ - "+--------------------------------+-------+", - "| time | usage |", - "+--------------------------------+-------+", - "| 1970-01-01T00:00:00.000001500Z | 99.0 |", - "| 1970-01-01T00:00:00.000001Z | 100.0 |", - "| 1970-01-01T00:00:00.000002500Z | 90.0 |", - "| 1970-01-01T00:00:00.000002Z | 95.0 |", - "+--------------------------------+-------+", + "+--------+------+--------------------------------+-------+", + "| region | host | time | usage |", + "+--------+------+--------------------------------+-------+", + "| us | a | 1970-01-01T00:00:00.000001500Z | 99.0 |", + "| us | a | 1970-01-01T00:00:00.000001Z | 100.0 |", + "| us | a | 1970-01-01T00:00:00.000002500Z | 90.0 |", + "| us | a | 1970-01-01T00:00:00.000002Z | 95.0 |", + 
"+--------+------+--------------------------------+-------+", ], }, TestCase { predicates: &[Predicate::new("region", KeyValue::string("us"))], expected: &[ - "+------+--------------------------------+-------+", - "| host | time | usage |", - "+------+--------------------------------+-------+", - "| a | 1970-01-01T00:00:00.000001500Z | 99.0 |", - "| a | 1970-01-01T00:00:00.000001Z | 100.0 |", - "| a | 1970-01-01T00:00:00.000002500Z | 90.0 |", - "| a | 1970-01-01T00:00:00.000002Z | 95.0 |", - "| b | 1970-01-01T00:00:00.000001500Z | 88.0 |", - "| b | 1970-01-01T00:00:00.000001Z | 80.0 |", - "| b | 1970-01-01T00:00:00.000002500Z | 99.0 |", - "| b | 1970-01-01T00:00:00.000002Z | 92.0 |", - "+------+--------------------------------+-------+", + "+--------+------+--------------------------------+-------+", + "| region | host | time | usage |", + "+--------+------+--------------------------------+-------+", + "| us | a | 1970-01-01T00:00:00.000001500Z | 99.0 |", + "| us | a | 1970-01-01T00:00:00.000001Z | 100.0 |", + "| us | a | 1970-01-01T00:00:00.000002500Z | 90.0 |", + "| us | a | 1970-01-01T00:00:00.000002Z | 95.0 |", + "| us | b | 1970-01-01T00:00:00.000001500Z | 88.0 |", + "| us | b | 1970-01-01T00:00:00.000001Z | 80.0 |", + "| us | b | 1970-01-01T00:00:00.000002500Z | 99.0 |", + "| us | b | 1970-01-01T00:00:00.000002Z | 92.0 |", + "+--------+------+--------------------------------+-------+", ], }, TestCase { predicates: &[Predicate::new("host", KeyValue::string("a"))], expected: &[ - "+--------+--------------------------------+-------+", - "| region | time | usage |", - "+--------+--------------------------------+-------+", - "| us | 1970-01-01T00:00:00.000001500Z | 99.0 |", - "| us | 1970-01-01T00:00:00.000001Z | 100.0 |", - "| us | 1970-01-01T00:00:00.000002500Z | 90.0 |", - "| us | 1970-01-01T00:00:00.000002Z | 95.0 |", - "+--------+--------------------------------+-------+", + "+--------+------+--------------------------------+-------+", + "| region | host | time | usage |", + "+--------+------+--------------------------------+-------+", + "| us | a | 1970-01-01T00:00:00.000001500Z | 99.0 |", + "| us | a | 1970-01-01T00:00:00.000001Z | 100.0 |", + "| us | a | 1970-01-01T00:00:00.000002500Z | 90.0 |", + "| us | a | 1970-01-01T00:00:00.000002Z | 95.0 |", + "+--------+------+--------------------------------+-------+", ], }, TestCase { predicates: &[Predicate::new("host", KeyValue::string("b"))], expected: &[ - "+--------+--------------------------------+-------+", - "| region | time | usage |", - "+--------+--------------------------------+-------+", - "| us | 1970-01-01T00:00:00.000001500Z | 88.0 |", - "| us | 1970-01-01T00:00:00.000001Z | 80.0 |", - "| us | 1970-01-01T00:00:00.000002500Z | 99.0 |", - "| us | 1970-01-01T00:00:00.000002Z | 92.0 |", - "+--------+--------------------------------+-------+", + "+--------+------+--------------------------------+-------+", + "| region | host | time | usage |", + "+--------+------+--------------------------------+-------+", + "| us | b | 1970-01-01T00:00:00.000001500Z | 88.0 |", + "| us | b | 1970-01-01T00:00:00.000001Z | 80.0 |", + "| us | b | 1970-01-01T00:00:00.000002500Z | 99.0 |", + "| us | b | 1970-01-01T00:00:00.000002Z | 92.0 |", + "+--------+------+--------------------------------+-------+", ], }, TestCase { @@ -1853,11 +1867,11 @@ mod tests { assert_batches_sorted_eq!( [ - "+-----------------------------+-------+", - "| time | usage |", - "+-----------------------------+-------+", - "| 1970-01-01T00:00:00.000001Z | 100.0 |", - 
"+-----------------------------+-------+", + "+--------+------+-----------------------------+-------+", + "| region | host | time | usage |", + "+--------+------+-----------------------------+-------+", + "| us | a | 1970-01-01T00:00:00.000001Z | 100.0 |", + "+--------+------+-----------------------------+-------+", ], &batches ); @@ -1903,11 +1917,11 @@ mod tests { assert_batches_sorted_eq!( [ - "+--------+--------------------------+-------+", - "| region | time | usage |", - "+--------+--------------------------+-------+", - "| us | 1970-01-01T00:00:00.500Z | 333.0 |", - "+--------+--------------------------+-------+", + "+--------+------+--------------------------+-------+", + "| region | host | time | usage |", + "+--------+------+--------------------------+-------+", + "| us | a | 1970-01-01T00:00:00.500Z | 333.0 |", + "+--------+------+--------------------------+-------+", ], &batches ); @@ -1997,36 +2011,36 @@ mod tests { TestCase { predicates: &[Predicate::new("component_id", KeyValue::string("333"))], expected: &[ - "+--------+--------+------+---------+-----------------------------+", - "| active | type | loc | reading | time |", - "+--------+--------+------+---------+-----------------------------+", - "| true | camera | fore | 145.0 | 1970-01-01T00:00:00.000001Z |", - "+--------+--------+------+---------+-----------------------------+", + "+--------------+--------+--------+------+---------+-----------------------------+", + "| component_id | active | type | loc | reading | time |", + "+--------------+--------+--------+------+---------+-----------------------------+", + "| 333 | true | camera | fore | 145.0 | 1970-01-01T00:00:00.000001Z |", + "+--------------+--------+--------+------+---------+-----------------------------+", ], }, // Predicate on a non-string field key: TestCase { predicates: &[Predicate::new("active", KeyValue::Bool(false))], expected: &[ - "+--------------+-------------+---------+---------+-----------------------------+", - "| component_id | type | loc | reading | time |", - "+--------------+-------------+---------+---------+-----------------------------+", - "| 555 | solar-panel | huygens | 200.0 | 1970-01-01T00:00:00.000001Z |", - "| 666 | comms-dish | huygens | 220.0 | 1970-01-01T00:00:00.000001Z |", - "+--------------+-------------+---------+---------+-----------------------------+", + "+--------------+--------+-------------+---------+---------+-----------------------------+", + "| component_id | active | type | loc | reading | time |", + "+--------------+--------+-------------+---------+---------+-----------------------------+", + "| 555 | false | solar-panel | huygens | 200.0 | 1970-01-01T00:00:00.000001Z |", + "| 666 | false | comms-dish | huygens | 220.0 | 1970-01-01T00:00:00.000001Z |", + "+--------------+--------+-------------+---------+---------+-----------------------------+", ], }, // Predicate on a string field key: TestCase { predicates: &[Predicate::new("type", KeyValue::string("camera"))], expected: &[ - "+--------------+--------+-----------+---------+-----------------------------+", - "| component_id | active | loc | reading | time |", - "+--------------+--------+-----------+---------+-----------------------------+", - "| 111 | true | port | 150.0 | 1970-01-01T00:00:00.000001Z |", - "| 222 | true | starboard | 250.0 | 1970-01-01T00:00:00.000001Z |", - "| 333 | true | fore | 145.0 | 1970-01-01T00:00:00.000001Z |", - "+--------------+--------+-----------+---------+-----------------------------+", + 
"+--------------+--------+--------+-----------+---------+-----------------------------+", + "| component_id | active | type | loc | reading | time |", + "+--------------+--------+--------+-----------+---------+-----------------------------+", + "| 111 | true | camera | port | 150.0 | 1970-01-01T00:00:00.000001Z |", + "| 222 | true | camera | starboard | 250.0 | 1970-01-01T00:00:00.000001Z |", + "| 333 | true | camera | fore | 145.0 | 1970-01-01T00:00:00.000001Z |", + "+--------------+--------+--------+-----------+---------+-----------------------------+", ], } ]; @@ -2110,39 +2124,39 @@ mod tests { TestCase { predicates: &[Predicate::new("state", KeyValue::string("ca"))], expected: &[ - "+--------+-------+-------+-----------------------------+", - "| county | farm | speed | time |", - "+--------+-------+-------+-----------------------------+", - "| napa | 10-01 | 50.0 | 1970-01-01T00:00:00.000001Z |", - "| napa | 10-02 | 49.0 | 1970-01-01T00:00:00.000001Z |", - "| nevada | 40-01 | 66.0 | 1970-01-01T00:00:00.000001Z |", - "| orange | 20-01 | 40.0 | 1970-01-01T00:00:00.000001Z |", - "| orange | 20-02 | 33.0 | 1970-01-01T00:00:00.000001Z |", - "| yolo | 30-01 | 62.0 | 1970-01-01T00:00:00.000001Z |", - "+--------+-------+-------+-----------------------------+", + "+-------+--------+-------+-------+-----------------------------+", + "| state | county | farm | speed | time |", + "+-------+--------+-------+-------+-----------------------------+", + "| ca | napa | 10-01 | 50.0 | 1970-01-01T00:00:00.000001Z |", + "| ca | napa | 10-02 | 49.0 | 1970-01-01T00:00:00.000001Z |", + "| ca | nevada | 40-01 | 66.0 | 1970-01-01T00:00:00.000001Z |", + "| ca | orange | 20-01 | 40.0 | 1970-01-01T00:00:00.000001Z |", + "| ca | orange | 20-02 | 33.0 | 1970-01-01T00:00:00.000001Z |", + "| ca | yolo | 30-01 | 62.0 | 1970-01-01T00:00:00.000001Z |", + "+-------+--------+-------+-------+-----------------------------+", ], }, // Predicate on county column, which is part of the series key: TestCase { predicates: &[Predicate::new("county", KeyValue::string("napa"))], expected: &[ - "+-------+-------+-------+-----------------------------+", - "| state | farm | speed | time |", - "+-------+-------+-------+-----------------------------+", - "| ca | 10-01 | 50.0 | 1970-01-01T00:00:00.000001Z |", - "| ca | 10-02 | 49.0 | 1970-01-01T00:00:00.000001Z |", - "+-------+-------+-------+-----------------------------+", + "+-------+--------+-------+-------+-----------------------------+", + "| state | county | farm | speed | time |", + "+-------+--------+-------+-------+-----------------------------+", + "| ca | napa | 10-01 | 50.0 | 1970-01-01T00:00:00.000001Z |", + "| ca | napa | 10-02 | 49.0 | 1970-01-01T00:00:00.000001Z |", + "+-------+--------+-------+-------+-----------------------------+", ], }, // Predicate on farm column, which is part of the series key: TestCase { predicates: &[Predicate::new("farm", KeyValue::string("30-01"))], expected: &[ - "+-------+--------+-------+-----------------------------+", - "| state | county | speed | time |", - "+-------+--------+-------+-----------------------------+", - "| ca | yolo | 62.0 | 1970-01-01T00:00:00.000001Z |", - "+-------+--------+-------+-----------------------------+", + "+-------+--------+-------+-------+-----------------------------+", + "| state | county | farm | speed | time |", + "+-------+--------+-------+-------+-----------------------------+", + "| ca | yolo | 30-01 | 62.0 | 1970-01-01T00:00:00.000001Z |", + 
"+-------+--------+-------+-------+-----------------------------+", ], }, // Predicate on all series key columns: @@ -2153,11 +2167,11 @@ mod tests { Predicate::new("farm", KeyValue::string("40-01")), ], expected: &[ - "+-------+-----------------------------+", - "| speed | time |", - "+-------+-----------------------------+", - "| 66.0 | 1970-01-01T00:00:00.000001Z |", - "+-------+-----------------------------+", + "+-------+--------+-------+-------+-----------------------------+", + "| state | county | farm | speed | time |", + "+-------+--------+-------+-------+-----------------------------+", + "| ca | nevada | 40-01 | 66.0 | 1970-01-01T00:00:00.000001Z |", + "+-------+--------+-------+-------+-----------------------------+", ], }, ]; @@ -2241,39 +2255,39 @@ mod tests { TestCase { predicates: &[Predicate::new("state", KeyValue::string("ca"))], expected: &[ - "+--------+-------+-------+-----------------------------+", - "| county | farm | speed | time |", - "+--------+-------+-------+-----------------------------+", - "| napa | 10-01 | 50.0 | 1970-01-01T00:00:00.000001Z |", - "| napa | 10-02 | 49.0 | 1970-01-01T00:00:00.000001Z |", - "| nevada | 40-01 | 66.0 | 1970-01-01T00:00:00.000001Z |", - "| orange | 20-01 | 40.0 | 1970-01-01T00:00:00.000001Z |", - "| orange | 20-02 | 33.0 | 1970-01-01T00:00:00.000001Z |", - "| yolo | 30-01 | 62.0 | 1970-01-01T00:00:00.000001Z |", - "+--------+-------+-------+-----------------------------+", + "+--------+-------+-------+-------+-----------------------------+", + "| county | farm | state | speed | time |", + "+--------+-------+-------+-------+-----------------------------+", + "| napa | 10-01 | ca | 50.0 | 1970-01-01T00:00:00.000001Z |", + "| napa | 10-02 | ca | 49.0 | 1970-01-01T00:00:00.000001Z |", + "| nevada | 40-01 | ca | 66.0 | 1970-01-01T00:00:00.000001Z |", + "| orange | 20-01 | ca | 40.0 | 1970-01-01T00:00:00.000001Z |", + "| orange | 20-02 | ca | 33.0 | 1970-01-01T00:00:00.000001Z |", + "| yolo | 30-01 | ca | 62.0 | 1970-01-01T00:00:00.000001Z |", + "+--------+-------+-------+-------+-----------------------------+", ], }, // Predicate on county column, which is part of the series key: TestCase { predicates: &[Predicate::new("county", KeyValue::string("napa"))], expected: &[ - "+-------+-------+-------+-----------------------------+", - "| farm | state | speed | time |", - "+-------+-------+-------+-----------------------------+", - "| 10-01 | ca | 50.0 | 1970-01-01T00:00:00.000001Z |", - "| 10-02 | ca | 49.0 | 1970-01-01T00:00:00.000001Z |", - "+-------+-------+-------+-----------------------------+", + "+--------+-------+-------+-------+-----------------------------+", + "| county | farm | state | speed | time |", + "+--------+-------+-------+-------+-----------------------------+", + "| napa | 10-01 | ca | 50.0 | 1970-01-01T00:00:00.000001Z |", + "| napa | 10-02 | ca | 49.0 | 1970-01-01T00:00:00.000001Z |", + "+--------+-------+-------+-------+-----------------------------+", ], }, // Predicate on farm column, which is part of the series key: TestCase { predicates: &[Predicate::new("farm", KeyValue::string("30-01"))], expected: &[ - "+--------+-------+-------+-----------------------------+", - "| county | state | speed | time |", - "+--------+-------+-------+-----------------------------+", - "| yolo | ca | 62.0 | 1970-01-01T00:00:00.000001Z |", - "+--------+-------+-------+-----------------------------+", + "+--------+-------+-------+-------+-----------------------------+", + "| county | farm | state | speed | time |", + 
"+--------+-------+-------+-------+-----------------------------+", + "| yolo | 30-01 | ca | 62.0 | 1970-01-01T00:00:00.000001Z |", + "+--------+-------+-------+-------+-----------------------------+", ], }, // Predicate on all series key columns: @@ -2284,11 +2298,11 @@ mod tests { Predicate::new("farm", KeyValue::string("40-01")), ], expected: &[ - "+-------+-----------------------------+", - "| speed | time |", - "+-------+-----------------------------+", - "| 66.0 | 1970-01-01T00:00:00.000001Z |", - "+-------+-----------------------------+", + "+--------+-------+-------+-------+-----------------------------+", + "| county | farm | state | speed | time |", + "+--------+-------+-------+-------+-----------------------------+", + "| nevada | 40-01 | ca | 66.0 | 1970-01-01T00:00:00.000001Z |", + "+--------+-------+-------+-------+-----------------------------+", ], }, ]; @@ -2421,22 +2435,22 @@ mod tests { TestCase { predicates: &[Predicate::new("game_id", KeyValue::string("4"))], expected: &[ - "+-----------+-----------------------------+------+------+", - "| player | time | type | zone |", - "+-----------+-----------------------------+------+------+", - "| bobrovsky | 1970-01-01T00:00:00.000001Z | save | home |", - "+-----------+-----------------------------+------+------+", + "+---------+-----------+-----------------------------+------+------+", + "| game_id | player | time | type | zone |", + "+---------+-----------+-----------------------------+------+------+", + "| 4 | bobrovsky | 1970-01-01T00:00:00.000001Z | save | home |", + "+---------+-----------+-----------------------------+------+------+", ], }, // Cache that does not have a zone column will produce it with nulls: TestCase { predicates: &[Predicate::new("game_id", KeyValue::string("1"))], expected: &[ - "+-----------+-----------------------------+------+------+", - "| player | time | type | zone |", - "+-----------+-----------------------------+------+------+", - "| mackinnon | 1970-01-01T00:00:00.000001Z | shot | |", - "+-----------+-----------------------------+------+------+", + "+---------+-----------+-----------------------------+------+------+", + "| game_id | player | time | type | zone |", + "+---------+-----------+-----------------------------+------+------+", + "| 1 | mackinnon | 1970-01-01T00:00:00.000001Z | shot | |", + "+---------+-----------+-----------------------------+------+------+", ], }, // Pulling from multiple caches will fill in with nulls: @@ -2545,31 +2559,31 @@ mod tests { TestCase { predicates: &[Predicate::new("t1", KeyValue::string("a"))], expected: &[ - "+-----+--------------------------------+-----+-----+-----+", - "| f1 | time | f2 | f3 | f4 |", - "+-----+--------------------------------+-----+-----+-----+", - "| 1.0 | 1970-01-01T00:00:00.000001500Z | 2.0 | 3.0 | 4.0 |", - "+-----+--------------------------------+-----+-----+-----+", + "+----+-----+--------------------------------+-----+-----+-----+", + "| t1 | f1 | time | f2 | f3 | f4 |", + "+----+-----+--------------------------------+-----+-----+-----+", + "| a | 1.0 | 1970-01-01T00:00:00.000001500Z | 2.0 | 3.0 | 4.0 |", + "+----+-----+--------------------------------+-----+-----+-----+", ], }, TestCase { predicates: &[Predicate::new("t1", KeyValue::string("b"))], expected: &[ - "+------+--------------------------------+----+------+------+", - "| f1 | time | f2 | f3 | f4 |", - "+------+--------------------------------+----+------+------+", - "| 10.0 | 1970-01-01T00:00:00.000001500Z | | 30.0 | 40.0 |", - 
"+------+--------------------------------+----+------+------+", + "+----+------+--------------------------------+----+------+------+", + "| t1 | f1 | time | f2 | f3 | f4 |", + "+----+------+--------------------------------+----+------+------+", + "| b | 10.0 | 1970-01-01T00:00:00.000001500Z | | 30.0 | 40.0 |", + "+----+------+--------------------------------+----+------+------+", ], }, TestCase { predicates: &[Predicate::new("t1", KeyValue::string("c"))], expected: &[ - "+-------+--------------------------------+-------+-------+----+", - "| f1 | time | f2 | f3 | f4 |", - "+-------+--------------------------------+-------+-------+----+", - "| 100.0 | 1970-01-01T00:00:00.000001500Z | 200.0 | 300.0 | |", - "+-------+--------------------------------+-------+-------+----+", + "+----+-------+--------------------------------+-------+-------+----+", + "| t1 | f1 | time | f2 | f3 | f4 |", + "+----+-------+--------------------------------+-------+-------+----+", + "| c | 100.0 | 1970-01-01T00:00:00.000001500Z | 200.0 | 300.0 | |", + "+----+-------+--------------------------------+-------+-------+----+", ], }, // Can query accross key column values: diff --git a/influxdb3_write/src/last_cache/table_function.rs b/influxdb3_write/src/last_cache/table_function.rs new file mode 100644 index 0000000000..094987eafc --- /dev/null +++ b/influxdb3_write/src/last_cache/table_function.rs @@ -0,0 +1,117 @@ +use std::{any::Any, sync::Arc}; + +use arrow::datatypes::SchemaRef; +use async_trait::async_trait; +use datafusion::{ + common::{plan_err, Result}, + datasource::{function::TableFunctionImpl, TableProvider, TableType}, + execution::context::SessionState, + logical_expr::{Expr, TableProviderFilterPushDown}, + physical_plan::{memory::MemoryExec, ExecutionPlan}, + scalar::ScalarValue, +}; + +use super::LastCacheProvider; + +struct LastCacheFunctionProvider { + db_name: String, + table_name: String, + cache_name: String, + schema: SchemaRef, + provider: Arc, +} + +#[async_trait] +impl TableProvider for LastCacheFunctionProvider { + fn as_any(&self) -> &dyn Any { + self as &dyn Any + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + fn table_type(&self) -> TableType { + TableType::Temporary + } + + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> Result> { + Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()]) + } + + async fn scan( + &self, + ctx: &SessionState, + projection: Option<&Vec>, + filters: &[Expr], + _limit: Option, + ) -> Result> { + let read = self.provider.cache_map.read(); + let batches = if let Some(cache) = read + .get(&self.db_name) + .and_then(|db| db.get(&self.table_name)) + .and_then(|tbl| tbl.get(&self.cache_name)) + { + let predicates = cache.convert_filter_exprs(filters); + cache.to_record_batches(&predicates)? + } else { + // If there is no cache, it means that it was removed, in which case, we just return + // an empty set of record batches. 
+ vec![] + }; + let mut exec = MemoryExec::try_new(&[batches], self.schema(), projection.cloned())?; + + let show_sizes = ctx.config_options().explain.show_sizes; + exec = exec.with_show_sizes(show_sizes); + + Ok(Arc::new(exec)) + } +} + +pub struct LastCacheFunction { + db_name: String, + provider: Arc, +} + +impl LastCacheFunction { + pub fn new(db_name: impl Into, provider: Arc) -> Self { + Self { + db_name: db_name.into(), + provider, + } + } +} + +impl TableFunctionImpl for LastCacheFunction { + fn call(&self, args: &[Expr]) -> Result> { + let Some(Expr::Literal(ScalarValue::Utf8(Some(table_name)))) = args.first() else { + return plan_err!("first argument must be the table name as a string"); + }; + + let cache_name = match args.get(1) { + Some(Expr::Literal(ScalarValue::Utf8(Some(name)))) => Some(name), + Some(_) => { + return plan_err!("second argument, if passed, must be the cache name as a string") + } + None => None, + }; + + match self.provider.get_cache_name_and_schema( + &self.db_name, + table_name, + cache_name.map(|x| x.as_str()), + ) { + Some((cache_name, schema)) => Ok(Arc::new(LastCacheFunctionProvider { + db_name: self.db_name.clone(), + table_name: table_name.clone(), + cache_name, + schema, + provider: Arc::clone(&self.provider), + })), + None => plan_err!("could not find cache for the given arguments"), + } + } +} diff --git a/influxdb3_write/src/lib.rs b/influxdb3_write/src/lib.rs index 9155fab042..174e1fe685 100644 --- a/influxdb3_write/src/lib.rs +++ b/influxdb3_write/src/lib.rs @@ -9,7 +9,7 @@ pub mod cache; pub mod catalog; mod chunk; -mod last_cache; +pub mod last_cache; pub mod paths; pub mod persister; pub mod wal;
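
Note: `LastCacheFunction` implements DataFusion's `TableFunctionImpl`, so callers are expected to register it as a user-defined table function on a `SessionContext` and invoke it from SQL. The sketch below is not part of this diff and makes several assumptions: that `LastCacheFunction` and `LastCacheProvider` are reachable from the crate root under the paths shown, that a provider handle is already available, and that the function is registered under the name `last_cache`.

// Minimal sketch, assuming the imports and registration name described above.
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::context::SessionContext;
use influxdb3_write::last_cache::{LastCacheFunction, LastCacheProvider};

async fn query_last_cache(provider: Arc<LastCacheProvider>) -> Result<()> {
    let ctx = SessionContext::new();

    // The UDTF is scoped to a single database name at registration time.
    ctx.register_udtf(
        "last_cache",
        Arc::new(LastCacheFunction::new("my_db", provider)),
    );

    // First argument: table name; optional second argument: cache name.
    let df = ctx
        .sql("SELECT * FROM last_cache('cpu', 'cpu_last_cache')")
        .await?;
    df.show().await?;
    Ok(())
}

Because `supports_filters_pushdown` reports the filters as `Inexact`, DataFusion re-applies the pushed-down predicates above the cache scan, so `convert_filter_exprs` only needs to be a best-effort pruning step rather than an exact filter.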