feat: hook up last cache to query executor using DataFusion traits (#25143)

* feat: impl datafusion traits on last cache

  Created a new module for the DataFusion table function implementations. The
  TableProvider impl for LastCache was moved there, and new code implementing the
  TableFunctionImpl trait was added to make the last cache queryable. The
  LastCacheProvider and LastCache were augmented to make this work:
  - The provider stores an Arc<LastCache> instead of a LastCache
  - The LastCache uses interior mutability via an RwLock, to make the above possible.

* feat: register last_cache UDTF on query context

* refactor: make server accept listener instead of socket addr

  The server used to accept a socket address and bind it directly, returning an
  error if the bind failed. This commit changes that so the ServerBuilder accepts
  a TcpListener. The behaviour is essentially the same, but this allows us to bind
  the address from tests when instantiating the server, so we can easily assign
  unused ports. Tests in influxdb3_server were updated to take advantage of this,
  using port 0 auto-assignment to stop flaky test failures. A new, failing test
  for the last cache was also added to that module.

* refactor: naive implementation of last cache key columns

  Committing here as the last cache is in a working state, but it is naively
  implemented: it just stores all key columns again (still with the hierarchy).

* refactor: make the last cache work with the query executor
* chore: fix my own feedback and appease clippy
* refactor: remove lower lock in last cache
* chore: cargo update
* refactor: rename function
* fix: broken doc comment
parent 0b8fbf456c
commit 0279461738
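The listener change above is the part most visible to callers: the binary and the tests now bind the socket themselves and hand it to the builder. Below is a minimal sketch of the port-0 pattern the tests rely on, using plain tokio; it is not code from this commit, and the builder calls shown in the comment are copied from the diff for illustration only.

```rust
// Minimal sketch (not from this commit): bind port 0 so the OS assigns an
// unused port, then read the real address back before starting the server.
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Port 0 asks the OS for any free port, avoiding flaky port collisions in tests.
    let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
    let listener = TcpListener::bind(addr).await?;
    // The actual port is only known after binding.
    println!("listening on {}", listener.local_addr()?);
    // In this PR the pre-bound listener is then handed to the builder, roughly:
    //     ServerBuilder::new(common_state)
    //         .write_buffer(write_buffer)
    //         .query_executor(query_executor)
    //         .persister(persister)
    //         .time_provider(time_provider)
    //         .tcp_listener(listener)
    //         .build();
    Ok(())
}
```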
@ -469,7 +469,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
|
|||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.70",
|
||||
"syn 2.0.71",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -480,7 +480,7 @@ checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107"
|
|||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.70",
|
||||
"syn 2.0.71",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -653,9 +653,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "blake3"
|
||||
version = "1.5.1"
|
||||
version = "1.5.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "30cca6d3674597c30ddf2c587bf8d9d65c9a84d2326d941cc79c9842dfe0ef52"
|
||||
checksum = "e9ec96fe9a81b5e365f9db71fe00edc4fe4ca2cc7dcb7861f0603012a7caa210"
|
||||
dependencies = [
|
||||
"arrayref",
|
||||
"arrayvec",
|
||||
|
@ -719,9 +719,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
|
|||
|
||||
[[package]]
|
||||
name = "bytes"
|
||||
version = "1.6.0"
|
||||
version = "1.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9"
|
||||
checksum = "a12916984aab3fa6e39d655a33e09c0071eb36d6ab3aea5c2d78551f1df6d952"
|
||||
|
||||
[[package]]
|
||||
name = "bzip2"
|
||||
|
@ -766,13 +766,12 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "cc"
|
||||
version = "1.1.0"
|
||||
version = "1.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "eaff6f8ce506b9773fa786672d63fc7a191ffea1be33f72bbd4aeacefca9ffc8"
|
||||
checksum = "324c74f2155653c90b04f25b2a47a8a631360cb908f92a772695f430c7e31052"
|
||||
dependencies = [
|
||||
"jobserver",
|
||||
"libc",
|
||||
"once_cell",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -887,7 +886,7 @@ dependencies = [
|
|||
"heck 0.5.0",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.70",
|
||||
"syn 2.0.71",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -1178,7 +1177,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3"
|
|||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.70",
|
||||
"syn 2.0.71",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -1202,7 +1201,7 @@ dependencies = [
|
|||
"proc-macro2",
|
||||
"quote",
|
||||
"strsim",
|
||||
"syn 2.0.70",
|
||||
"syn 2.0.71",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -1213,7 +1212,7 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806"
|
|||
dependencies = [
|
||||
"darling_core",
|
||||
"quote",
|
||||
"syn 2.0.70",
|
||||
"syn 2.0.71",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -1900,7 +1899,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
|
|||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.70",
|
||||
"syn 2.0.71",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -2164,9 +2163,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "http-body"
|
||||
version = "1.0.0"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643"
|
||||
checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"http 1.1.0",
|
||||
|
@ -2181,7 +2180,7 @@ dependencies = [
|
|||
"bytes",
|
||||
"futures-util",
|
||||
"http 1.1.0",
|
||||
"http-body 1.0.0",
|
||||
"http-body 1.0.1",
|
||||
"pin-project-lite",
|
||||
]
|
||||
|
||||
|
@ -2238,7 +2237,7 @@ dependencies = [
|
|||
"futures-util",
|
||||
"h2 0.4.5",
|
||||
"http 1.1.0",
|
||||
"http-body 1.0.0",
|
||||
"http-body 1.0.1",
|
||||
"httparse",
|
||||
"itoa",
|
||||
"pin-project-lite",
|
||||
|
@ -2301,7 +2300,7 @@ dependencies = [
|
|||
"futures-channel",
|
||||
"futures-util",
|
||||
"http 1.1.0",
|
||||
"http-body 1.0.0",
|
||||
"http-body 1.0.1",
|
||||
"hyper 1.4.1",
|
||||
"pin-project-lite",
|
||||
"socket2",
|
||||
|
@ -3529,7 +3528,7 @@ checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8"
|
|||
dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"redox_syscall 0.5.2",
|
||||
"redox_syscall 0.5.3",
|
||||
"smallvec",
|
||||
"windows-targets 0.52.6",
|
||||
]
|
||||
|
@ -3731,7 +3730,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965"
|
|||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.70",
|
||||
"syn 2.0.71",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -3847,7 +3846,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "5f12335488a2f3b0a83b14edad48dca9879ce89b2edd10e80237e4e852dd645e"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"syn 2.0.70",
|
||||
"syn 2.0.71",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -3942,7 +3941,7 @@ dependencies = [
|
|||
"prost 0.12.6",
|
||||
"prost-types 0.12.6",
|
||||
"regex",
|
||||
"syn 2.0.70",
|
||||
"syn 2.0.71",
|
||||
"tempfile",
|
||||
]
|
||||
|
||||
|
@ -3969,7 +3968,7 @@ dependencies = [
|
|||
"itertools 0.12.1",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.70",
|
||||
"syn 2.0.71",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -4148,9 +4147,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "redox_syscall"
|
||||
version = "0.5.2"
|
||||
version = "0.5.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c82cf8cff14456045f55ec4241383baeff27af886adb72ffb2162f99911de0fd"
|
||||
checksum = "2a908a6e00f1fdd0dfd9c0eb08ce85126f6d8bbda50017e74bc4a4b7d4a926a4"
|
||||
dependencies = [
|
||||
"bitflags 2.6.0",
|
||||
]
|
||||
|
@ -4257,7 +4256,7 @@ dependencies = [
|
|||
"futures-util",
|
||||
"h2 0.4.5",
|
||||
"http 1.1.0",
|
||||
"http-body 1.0.0",
|
||||
"http-body 1.0.1",
|
||||
"http-body-util",
|
||||
"hyper 1.4.1",
|
||||
"hyper-rustls 0.27.2",
|
||||
|
@ -4553,9 +4552,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "security-framework"
|
||||
version = "2.11.0"
|
||||
version = "2.11.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c627723fd09706bacdb5cf41499e95098555af3c3c29d014dc3c458ef6be11c0"
|
||||
checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02"
|
||||
dependencies = [
|
||||
"bitflags 2.6.0",
|
||||
"core-foundation",
|
||||
|
@ -4566,9 +4565,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "security-framework-sys"
|
||||
version = "2.11.0"
|
||||
version = "2.11.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "317936bbbd05227752583946b9e66d7ce3b489f84e11a94a510b4437fef407d7"
|
||||
checksum = "75da29fe9b9b08fe9d6b22b5b4bcbc75d8db3aa31e639aa56bb62e9d46bfceaf"
|
||||
dependencies = [
|
||||
"core-foundation-sys",
|
||||
"libc",
|
||||
|
@ -4603,7 +4602,7 @@ checksum = "e0cd7e117be63d3c3678776753929474f3b04a43a080c744d6b0ae2a8c28e222"
|
|||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.70",
|
||||
"syn 2.0.71",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -4631,9 +4630,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "serde_with"
|
||||
version = "3.8.3"
|
||||
version = "3.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e73139bc5ec2d45e6c5fd85be5a46949c1c39a4c18e56915f5eb4c12f975e377"
|
||||
checksum = "69cecfa94848272156ea67b2b1a53f20fc7bc638c4a46d2f8abde08f05f4b857"
|
||||
dependencies = [
|
||||
"base64 0.22.1",
|
||||
"chrono",
|
||||
|
@ -4649,14 +4648,14 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "serde_with_macros"
|
||||
version = "3.8.3"
|
||||
version = "3.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b80d3d6b56b64335c0180e5ffde23b3c5e08c14c585b51a15bd0e95393f46703"
|
||||
checksum = "a8fee4991ef4f274617a51ad4af30519438dacb2f56ac773b08a1922ff743350"
|
||||
dependencies = [
|
||||
"darling",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.70",
|
||||
"syn 2.0.71",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -4826,7 +4825,7 @@ dependencies = [
|
|||
"heck 0.5.0",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.70",
|
||||
"syn 2.0.71",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -4892,7 +4891,7 @@ checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554"
|
|||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.70",
|
||||
"syn 2.0.71",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -5149,7 +5148,7 @@ dependencies = [
|
|||
"proc-macro2",
|
||||
"quote",
|
||||
"rustversion",
|
||||
"syn 2.0.70",
|
||||
"syn 2.0.71",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -5171,9 +5170,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "syn"
|
||||
version = "2.0.70"
|
||||
version = "2.0.71"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2f0209b68b3613b093e0ec905354eccaedcfe83b8cb37cbdeae64026c3064c16"
|
||||
checksum = "b146dcf730474b4bcd16c311627b31ede9ab149045db4d6088b3becaea046462"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
|
@ -5268,22 +5267,22 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "thiserror"
|
||||
version = "1.0.61"
|
||||
version = "1.0.62"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c546c80d6be4bc6a00c0f01730c08df82eaa7a7a61f11d656526506112cc1709"
|
||||
checksum = "f2675633b1499176c2dff06b0856a27976a8f9d436737b4cf4f312d4d91d8bbb"
|
||||
dependencies = [
|
||||
"thiserror-impl",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "thiserror-impl"
|
||||
version = "1.0.61"
|
||||
version = "1.0.62"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "46c3384250002a6d5af4d114f2845d37b57521033f30d5c3f46c4d70e1197533"
|
||||
checksum = "d20468752b09f49e909e55a5d338caa8bedf615594e9d80bc4c565d30faf798c"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.70",
|
||||
"syn 2.0.71",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -5442,7 +5441,7 @@ checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a"
|
|||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.70",
|
||||
"syn 2.0.71",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -5593,7 +5592,7 @@ dependencies = [
|
|||
"proc-macro2",
|
||||
"prost-build",
|
||||
"quote",
|
||||
"syn 2.0.70",
|
||||
"syn 2.0.71",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -5726,7 +5725,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
|
|||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.70",
|
||||
"syn 2.0.71",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -6042,7 +6041,7 @@ dependencies = [
|
|||
"once_cell",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.70",
|
||||
"syn 2.0.71",
|
||||
"wasm-bindgen-shared",
|
||||
]
|
||||
|
||||
|
@ -6076,7 +6075,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7"
|
|||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.70",
|
||||
"syn 2.0.71",
|
||||
"wasm-bindgen-backend",
|
||||
"wasm-bindgen-shared",
|
||||
]
|
||||
|
@ -6428,7 +6427,7 @@ dependencies = [
|
|||
"strum",
|
||||
"subtle",
|
||||
"syn 1.0.109",
|
||||
"syn 2.0.70",
|
||||
"syn 2.0.71",
|
||||
"thrift",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
|
@ -6482,7 +6481,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e"
|
|||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.70",
|
||||
"syn 2.0.71",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -6502,7 +6501,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69"
|
|||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.70",
|
||||
"syn 2.0.71",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@@ -31,6 +31,7 @@ use std::{
    sync::Arc,
};
use thiserror::Error;
use tokio::net::TcpListener;
use tokio_util::sync::CancellationToken;
use trace_exporters::TracingConfig;
use trace_http::ctx::TraceHeaderParser;
@@ -54,6 +55,9 @@ pub enum Error {
    #[error("Error initializing tokio runtime: {0}")]
    TokioRuntime(#[source] std::io::Error),

    #[error("Failed to bind address")]
    BindAddress(#[source] std::io::Error),

    #[error("Server error: {0}")]
    Server(#[from] influxdb3_server::Error),
@@ -266,12 +270,8 @@ pub async fn command(config: Config) -> Result<()> {
        )
        .with_jaeger_debug_name(config.tracing_config.traces_jaeger_debug_name);

    let common_state = CommonServerState::new(
        Arc::clone(&metrics),
        trace_exporter,
        trace_header_parser,
        *config.http_bind_address,
    )?;
    let common_state =
        CommonServerState::new(Arc::clone(&metrics), trace_exporter, trace_header_parser)?;
    let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store)));
    let wal: Option<Arc<WalImpl>> = config
        .wal_directory
@@ -300,12 +300,17 @@ pub async fn command(config: Config) -> Result<()> {
        config.query_log_size,
    ));

    let listener = TcpListener::bind(*config.http_bind_address)
        .await
        .map_err(Error::BindAddress)?;

    let builder = ServerBuilder::new(common_state)
        .max_request_size(config.max_http_request_size)
        .write_buffer(write_buffer)
        .query_executor(query_executor)
        .time_provider(time_provider)
        .persister(persister);
        .persister(persister)
        .tcp_listener(listener);

    let server = if let Some(token) = config.bearer_token.map(hex::decode).transpose()? {
        builder
@@ -1,21 +1,23 @@
use std::sync::Arc;

use authz::Authorizer;
use tokio::net::TcpListener;

use crate::{auth::DefaultAuthorizer, http::HttpApi, CommonServerState, Server};

#[derive(Debug)]
pub struct ServerBuilder<W, Q, P, T> {
pub struct ServerBuilder<W, Q, P, T, L> {
    common_state: CommonServerState,
    time_provider: T,
    max_request_size: usize,
    write_buffer: W,
    query_executor: Q,
    persister: P,
    listener: L,
    authorizer: Arc<dyn Authorizer>,
}

impl ServerBuilder<NoWriteBuf, NoQueryExec, NoPersister, NoTimeProvider> {
impl ServerBuilder<NoWriteBuf, NoQueryExec, NoPersister, NoTimeProvider, NoListener> {
    pub fn new(common_state: CommonServerState) -> Self {
        Self {
            common_state,
@@ -24,12 +26,13 @@ impl ServerBuilder<NoWriteBuf, NoQueryExec, NoPersister, NoTimeProvider> {
            write_buffer: NoWriteBuf,
            query_executor: NoQueryExec,
            persister: NoPersister,
            listener: NoListener,
            authorizer: Arc::new(DefaultAuthorizer),
        }
    }
}

impl<W, Q, P, T> ServerBuilder<W, Q, P, T> {
impl<W, Q, P, T, L> ServerBuilder<W, Q, P, T, L> {
    pub fn max_request_size(mut self, max_request_size: usize) -> Self {
        self.max_request_size = max_request_size;
        self
@@ -57,9 +60,13 @@ pub struct WithPersister<P>(Arc<P>);
pub struct NoTimeProvider;
#[derive(Debug)]
pub struct WithTimeProvider<T>(Arc<T>);
#[derive(Debug)]
pub struct NoListener;
#[derive(Debug)]
pub struct WithListener(TcpListener);

impl<Q, P, T> ServerBuilder<NoWriteBuf, Q, P, T> {
    pub fn write_buffer<W>(self, wb: Arc<W>) -> ServerBuilder<WithWriteBuf<W>, Q, P, T> {
impl<Q, P, T, L> ServerBuilder<NoWriteBuf, Q, P, T, L> {
    pub fn write_buffer<W>(self, wb: Arc<W>) -> ServerBuilder<WithWriteBuf<W>, Q, P, T, L> {
        ServerBuilder {
            common_state: self.common_state,
            time_provider: self.time_provider,
@@ -67,13 +74,14 @@ impl<Q, P, T> ServerBuilder<NoWriteBuf, Q, P, T> {
            write_buffer: WithWriteBuf(wb),
            query_executor: self.query_executor,
            persister: self.persister,
            listener: self.listener,
            authorizer: self.authorizer,
        }
    }
}

impl<W, P, T> ServerBuilder<W, NoQueryExec, P, T> {
    pub fn query_executor<Q>(self, qe: Arc<Q>) -> ServerBuilder<W, WithQueryExec<Q>, P, T> {
impl<W, P, T, L> ServerBuilder<W, NoQueryExec, P, T, L> {
    pub fn query_executor<Q>(self, qe: Arc<Q>) -> ServerBuilder<W, WithQueryExec<Q>, P, T, L> {
        ServerBuilder {
            common_state: self.common_state,
            time_provider: self.time_provider,
@@ -81,13 +89,14 @@ impl<W, P, T> ServerBuilder<W, NoQueryExec, P, T> {
            write_buffer: self.write_buffer,
            query_executor: WithQueryExec(qe),
            persister: self.persister,
            listener: self.listener,
            authorizer: self.authorizer,
        }
    }
}

impl<W, Q, T> ServerBuilder<W, Q, NoPersister, T> {
    pub fn persister<P>(self, p: Arc<P>) -> ServerBuilder<W, Q, WithPersister<P>, T> {
impl<W, Q, T, L> ServerBuilder<W, Q, NoPersister, T, L> {
    pub fn persister<P>(self, p: Arc<P>) -> ServerBuilder<W, Q, WithPersister<P>, T, L> {
        ServerBuilder {
            common_state: self.common_state,
            time_provider: self.time_provider,
@@ -95,13 +104,14 @@ impl<W, Q, T> ServerBuilder<W, Q, NoPersister, T> {
            write_buffer: self.write_buffer,
            query_executor: self.query_executor,
            persister: WithPersister(p),
            listener: self.listener,
            authorizer: self.authorizer,
        }
    }
}

impl<W, Q, P> ServerBuilder<W, Q, P, NoTimeProvider> {
    pub fn time_provider<T>(self, tp: Arc<T>) -> ServerBuilder<W, Q, P, WithTimeProvider<T>> {
impl<W, Q, P, L> ServerBuilder<W, Q, P, NoTimeProvider, L> {
    pub fn time_provider<T>(self, tp: Arc<T>) -> ServerBuilder<W, Q, P, WithTimeProvider<T>, L> {
        ServerBuilder {
            common_state: self.common_state,
            time_provider: WithTimeProvider(tp),
@@ -109,13 +119,35 @@ impl<W, Q, P> ServerBuilder<W, Q, P, NoTimeProvider> {
            write_buffer: self.write_buffer,
            query_executor: self.query_executor,
            persister: self.persister,
            listener: self.listener,
            authorizer: self.authorizer,
        }
    }
}

impl<W, Q, P, T> ServerBuilder<W, Q, P, T, NoListener> {
    pub fn tcp_listener(self, listener: TcpListener) -> ServerBuilder<W, Q, P, T, WithListener> {
        ServerBuilder {
            common_state: self.common_state,
            time_provider: self.time_provider,
            max_request_size: self.max_request_size,
            write_buffer: self.write_buffer,
            query_executor: self.query_executor,
            persister: self.persister,
            listener: WithListener(listener),
            authorizer: self.authorizer,
        }
    }
}

impl<W, Q, P, T>
    ServerBuilder<WithWriteBuf<W>, WithQueryExec<Q>, WithPersister<P>, WithTimeProvider<T>>
    ServerBuilder<
        WithWriteBuf<W>,
        WithQueryExec<Q>,
        WithPersister<P>,
        WithTimeProvider<T>,
        WithListener,
    >
{
    pub fn build(self) -> Server<W, Q, P, T> {
        let persister = Arc::clone(&self.persister.0);
@@ -133,6 +165,7 @@ impl<W, Q, P, T>
            http,
            persister,
            authorizer,
            listener: self.listener.0,
        }
    }
}
@@ -24,6 +24,8 @@ use crate::http::HttpApi;
use async_trait::async_trait;
use authz::Authorizer;
use datafusion::execution::SendableRecordBatchStream;
use hyper::server::conn::AddrIncoming;
use hyper::server::conn::Http;
use hyper::service::service_fn;
use influxdb3_write::{Persister, WriteBuffer};
use iox_query::QueryDatabase;
@@ -33,9 +35,9 @@ use observability_deps::tracing::error;
use service::hybrid;
use std::convert::Infallible;
use std::fmt::Debug;
use std::net::SocketAddr;
use std::sync::Arc;
use thiserror::Error;
use tokio::net::TcpListener;
use tokio_util::sync::CancellationToken;
use tower::Layer;
use trace::ctx::SpanContext;
@@ -76,7 +78,6 @@ pub struct CommonServerState {
    metrics: Arc<metric::Registry>,
    trace_exporter: Option<Arc<trace_exporters::export::AsyncExporter>>,
    trace_header_parser: TraceHeaderParser,
    http_addr: SocketAddr,
}

impl CommonServerState {
@@ -84,13 +85,11 @@ impl CommonServerState {
        metrics: Arc<metric::Registry>,
        trace_exporter: Option<Arc<trace_exporters::export::AsyncExporter>>,
        trace_header_parser: TraceHeaderParser,
        http_addr: SocketAddr,
    ) -> Result<Self> {
        Ok(Self {
            metrics,
            trace_exporter,
            trace_header_parser,
            http_addr,
        })
    }

@@ -120,6 +119,7 @@ pub struct Server<W, Q, P, T> {
    http: Arc<HttpApi<W, Q, T>>,
    persister: Arc<P>,
    authorizer: Arc<dyn Authorizer>,
    listener: TcpListener,
}

#[async_trait]
@@ -193,7 +193,8 @@ where

    let hybrid_make_service = hybrid(rest_service, grpc_service);

    hyper::Server::bind(&server.common_state.http_addr)
    let addr = AddrIncoming::from_listener(server.listener)?;
    hyper::server::Builder::new(addr, Http::new())
        .serve(hybrid_make_service)
        .with_graceful_shutdown(shutdown.cancelled())
        .await?;
@ -231,6 +232,8 @@ mod tests {
|
|||
use datafusion::parquet::data_type::AsBytes;
|
||||
use hyper::{body, Body, Client, Request, Response, StatusCode};
|
||||
use influxdb3_write::persister::PersisterImpl;
|
||||
use influxdb3_write::wal::WalImpl;
|
||||
use influxdb3_write::write_buffer::WriteBufferImpl;
|
||||
use influxdb3_write::SegmentDuration;
|
||||
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig};
|
||||
use iox_time::{MockProvider, Time};
|
||||
|
@ -238,75 +241,17 @@ mod tests {
|
|||
use parquet_file::storage::{ParquetStorage, StorageId};
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::collections::HashMap;
|
||||
use std::net::{SocketAddr, SocketAddrV4};
|
||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
||||
use std::num::NonZeroUsize;
|
||||
use std::sync::atomic::{AtomicU16, Ordering};
|
||||
use std::sync::Arc;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
static NEXT_PORT: AtomicU16 = AtomicU16::new(8090);
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn write_and_query() {
|
||||
let addr = get_free_port();
|
||||
let trace_header_parser = trace_http::ctx::TraceHeaderParser::new();
|
||||
let metrics = Arc::new(metric::Registry::new());
|
||||
let common_state =
|
||||
crate::CommonServerState::new(Arc::clone(&metrics), None, trace_header_parser, addr)
|
||||
.unwrap();
|
||||
let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::new());
|
||||
let parquet_store =
|
||||
ParquetStorage::new(Arc::clone(&object_store), StorageId::from("influxdb3"));
|
||||
let exec = Arc::new(Executor::new_with_config_and_executor(
|
||||
ExecutorConfig {
|
||||
target_query_partitions: NonZeroUsize::new(1).unwrap(),
|
||||
object_stores: [&parquet_store]
|
||||
.into_iter()
|
||||
.map(|store| (store.id(), Arc::clone(store.object_store())))
|
||||
.collect(),
|
||||
metric_registry: Arc::clone(&metrics),
|
||||
mem_pool_size: usize::MAX,
|
||||
},
|
||||
DedicatedExecutor::new_testing(),
|
||||
));
|
||||
let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store)));
|
||||
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
|
||||
let start_time = 0;
|
||||
let (server, shutdown, _) = setup_server(start_time).await;
|
||||
|
||||
let write_buffer = Arc::new(
|
||||
influxdb3_write::write_buffer::WriteBufferImpl::new(
|
||||
Arc::clone(&persister),
|
||||
None::<Arc<influxdb3_write::wal::WalImpl>>,
|
||||
Arc::clone(&time_provider),
|
||||
SegmentDuration::new_5m(),
|
||||
Arc::clone(&exec),
|
||||
10000,
|
||||
)
|
||||
.await
|
||||
.unwrap(),
|
||||
);
|
||||
let query_executor = Arc::new(crate::query_executor::QueryExecutorImpl::new(
|
||||
write_buffer.catalog(),
|
||||
Arc::clone(&write_buffer),
|
||||
Arc::clone(&exec),
|
||||
Arc::clone(&metrics),
|
||||
Arc::new(HashMap::new()),
|
||||
10,
|
||||
10,
|
||||
));
|
||||
|
||||
let server = ServerBuilder::new(common_state)
|
||||
.write_buffer(Arc::clone(&write_buffer))
|
||||
.query_executor(Arc::clone(&query_executor))
|
||||
.persister(Arc::clone(&persister))
|
||||
.authorizer(Arc::new(DefaultAuthorizer))
|
||||
.time_provider(Arc::clone(&time_provider))
|
||||
.build();
|
||||
let frontend_shutdown = CancellationToken::new();
|
||||
let shutdown = frontend_shutdown.clone();
|
||||
|
||||
tokio::spawn(async move { serve(server, frontend_shutdown).await });
|
||||
|
||||
let server = format!("http://{}", addr);
|
||||
write_lp(
|
||||
&server,
|
||||
"foo",
|
||||
|
@ -409,66 +354,9 @@ mod tests {
|
|||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn write_lp_tests() {
|
||||
let addr = get_free_port();
|
||||
let trace_header_parser = trace_http::ctx::TraceHeaderParser::new();
|
||||
let metrics = Arc::new(metric::Registry::new());
|
||||
let common_state =
|
||||
crate::CommonServerState::new(Arc::clone(&metrics), None, trace_header_parser, addr)
|
||||
.unwrap();
|
||||
let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::new());
|
||||
let parquet_store =
|
||||
ParquetStorage::new(Arc::clone(&object_store), StorageId::from("influxdb3"));
|
||||
let exec = Arc::new(Executor::new_with_config_and_executor(
|
||||
ExecutorConfig {
|
||||
target_query_partitions: NonZeroUsize::new(1).unwrap(),
|
||||
object_stores: [&parquet_store]
|
||||
.into_iter()
|
||||
.map(|store| (store.id(), Arc::clone(store.object_store())))
|
||||
.collect(),
|
||||
metric_registry: Arc::clone(&metrics),
|
||||
mem_pool_size: usize::MAX,
|
||||
},
|
||||
DedicatedExecutor::new_testing(),
|
||||
));
|
||||
let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store)));
|
||||
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
|
||||
let start_time = 0;
|
||||
let (server, shutdown, _) = setup_server(start_time).await;
|
||||
|
||||
let write_buffer = Arc::new(
|
||||
influxdb3_write::write_buffer::WriteBufferImpl::new(
|
||||
Arc::clone(&persister),
|
||||
None::<Arc<influxdb3_write::wal::WalImpl>>,
|
||||
Arc::clone(&time_provider),
|
||||
SegmentDuration::new_5m(),
|
||||
Arc::clone(&exec),
|
||||
10000,
|
||||
)
|
||||
.await
|
||||
.unwrap(),
|
||||
);
|
||||
let query_executor = crate::query_executor::QueryExecutorImpl::new(
|
||||
write_buffer.catalog(),
|
||||
Arc::clone(&write_buffer),
|
||||
Arc::clone(&exec),
|
||||
Arc::clone(&metrics),
|
||||
Arc::new(HashMap::new()),
|
||||
10,
|
||||
10,
|
||||
);
|
||||
|
||||
let server = ServerBuilder::new(common_state)
|
||||
.write_buffer(Arc::clone(&write_buffer))
|
||||
.query_executor(Arc::new(query_executor))
|
||||
.persister(persister)
|
||||
.authorizer(Arc::new(DefaultAuthorizer))
|
||||
.time_provider(Arc::clone(&time_provider))
|
||||
.build();
|
||||
let frontend_shutdown = CancellationToken::new();
|
||||
let shutdown = frontend_shutdown.clone();
|
||||
|
||||
tokio::spawn(async move { serve(server, frontend_shutdown).await });
|
||||
|
||||
// Test that only one error comes back
|
||||
let server = format!("http://{}", addr);
|
||||
let resp = write_lp(
|
||||
&server,
|
||||
"foo",
|
||||
|
@ -615,67 +503,9 @@ mod tests {
|
|||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn write_lp_precision_tests() {
|
||||
let addr = get_free_port();
|
||||
let trace_header_parser = trace_http::ctx::TraceHeaderParser::new();
|
||||
let metrics = Arc::new(metric::Registry::new());
|
||||
let common_state =
|
||||
crate::CommonServerState::new(Arc::clone(&metrics), None, trace_header_parser, addr)
|
||||
.unwrap();
|
||||
let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::new());
|
||||
let parquet_store =
|
||||
ParquetStorage::new(Arc::clone(&object_store), StorageId::from("influxdb3"));
|
||||
let exec = Arc::new(Executor::new_with_config_and_executor(
|
||||
ExecutorConfig {
|
||||
target_query_partitions: NonZeroUsize::new(1).unwrap(),
|
||||
object_stores: [&parquet_store]
|
||||
.into_iter()
|
||||
.map(|store| (store.id(), Arc::clone(store.object_store())))
|
||||
.collect(),
|
||||
metric_registry: Arc::clone(&metrics),
|
||||
mem_pool_size: usize::MAX,
|
||||
},
|
||||
DedicatedExecutor::new_testing(),
|
||||
));
|
||||
let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store)));
|
||||
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(
|
||||
1708473607000000000,
|
||||
)));
|
||||
let start_time = 1708473607000000000;
|
||||
let (server, shutdown, _) = setup_server(start_time).await;
|
||||
|
||||
let write_buffer = Arc::new(
|
||||
influxdb3_write::write_buffer::WriteBufferImpl::new(
|
||||
Arc::clone(&persister),
|
||||
None::<Arc<influxdb3_write::wal::WalImpl>>,
|
||||
Arc::clone(&time_provider),
|
||||
SegmentDuration::new_5m(),
|
||||
Arc::clone(&exec),
|
||||
10000,
|
||||
)
|
||||
.await
|
||||
.unwrap(),
|
||||
);
|
||||
let query_executor = crate::query_executor::QueryExecutorImpl::new(
|
||||
write_buffer.catalog(),
|
||||
Arc::clone(&write_buffer),
|
||||
Arc::clone(&exec),
|
||||
Arc::clone(&metrics),
|
||||
Arc::new(HashMap::new()),
|
||||
10,
|
||||
10,
|
||||
);
|
||||
|
||||
let server = ServerBuilder::new(common_state)
|
||||
.write_buffer(Arc::clone(&write_buffer))
|
||||
.query_executor(Arc::new(query_executor))
|
||||
.persister(persister)
|
||||
.authorizer(Arc::new(DefaultAuthorizer))
|
||||
.time_provider(Arc::clone(&time_provider))
|
||||
.build();
|
||||
let frontend_shutdown = CancellationToken::new();
|
||||
let shutdown = frontend_shutdown.clone();
|
||||
|
||||
tokio::spawn(async move { serve(server, frontend_shutdown).await });
|
||||
|
||||
let server = format!("http://{}", addr);
|
||||
let resp = write_lp(
|
||||
&server,
|
||||
"foo",
|
||||
|
@ -789,6 +619,153 @@ mod tests {
|
|||
shutdown.cancel();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn query_from_last_cache() {
|
||||
let start_time = 0;
|
||||
let (url, shutdown, wbuf) = setup_server(start_time).await;
|
||||
let db_name = "foo";
|
||||
let tbl_name = "cpu";
|
||||
|
||||
// Write to generate a db/table in the catalog:
|
||||
let resp = write_lp(
|
||||
&url,
|
||||
db_name,
|
||||
format!("{tbl_name},region=us,host=a usage=50 500"),
|
||||
None,
|
||||
false,
|
||||
"second",
|
||||
)
|
||||
.await;
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
|
||||
// Create the last cache:
|
||||
wbuf.create_last_cache(db_name, tbl_name, None, None, None, None, None)
|
||||
.expect("create last cache");
|
||||
|
||||
// Write to put something in the last cache:
|
||||
let resp = write_lp(
|
||||
&url,
|
||||
db_name,
|
||||
format!(
|
||||
"\
|
||||
{tbl_name},region=us,host=a usage=11 1000\n\
|
||||
{tbl_name},region=us,host=b usage=22 1000\n\
|
||||
{tbl_name},region=us,host=c usage=33 1000\n\
|
||||
{tbl_name},region=ca,host=d usage=44 1000\n\
|
||||
{tbl_name},region=ca,host=e usage=55 1000\n\
|
||||
{tbl_name},region=eu,host=f usage=66 1000\n\
|
||||
"
|
||||
),
|
||||
None,
|
||||
false,
|
||||
"second",
|
||||
)
|
||||
.await;
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
|
||||
// Query from the last cache:
|
||||
let res = query(
|
||||
&url,
|
||||
db_name,
|
||||
format!("SELECT * FROM last_cache('{tbl_name}') ORDER BY host"),
|
||||
"pretty",
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
let body = body::to_bytes(res.into_body()).await.unwrap();
|
||||
let body = String::from_utf8(body.as_bytes().to_vec()).unwrap();
|
||||
assert_eq!(
|
||||
"\
|
||||
+------+--------+---------------------+-------+\n\
|
||||
| host | region | time | usage |\n\
|
||||
+------+--------+---------------------+-------+\n\
|
||||
| a | us | 1970-01-01T00:16:40 | 11.0 |\n\
|
||||
| b | us | 1970-01-01T00:16:40 | 22.0 |\n\
|
||||
| c | us | 1970-01-01T00:16:40 | 33.0 |\n\
|
||||
| d | ca | 1970-01-01T00:16:40 | 44.0 |\n\
|
||||
| e | ca | 1970-01-01T00:16:40 | 55.0 |\n\
|
||||
| f | eu | 1970-01-01T00:16:40 | 66.0 |\n\
|
||||
+------+--------+---------------------+-------+",
|
||||
body
|
||||
);
|
||||
|
||||
shutdown.cancel();
|
||||
}
|
||||
|
||||
async fn setup_server(
|
||||
start_time: i64,
|
||||
) -> (
|
||||
String,
|
||||
CancellationToken,
|
||||
Arc<WriteBufferImpl<WalImpl, MockProvider>>,
|
||||
) {
|
||||
let trace_header_parser = trace_http::ctx::TraceHeaderParser::new();
|
||||
let metrics = Arc::new(metric::Registry::new());
|
||||
let common_state =
|
||||
crate::CommonServerState::new(Arc::clone(&metrics), None, trace_header_parser).unwrap();
|
||||
let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::new());
|
||||
let parquet_store =
|
||||
ParquetStorage::new(Arc::clone(&object_store), StorageId::from("influxdb3"));
|
||||
let exec = Arc::new(Executor::new_with_config_and_executor(
|
||||
ExecutorConfig {
|
||||
target_query_partitions: NonZeroUsize::new(1).unwrap(),
|
||||
object_stores: [&parquet_store]
|
||||
.into_iter()
|
||||
.map(|store| (store.id(), Arc::clone(store.object_store())))
|
||||
.collect(),
|
||||
metric_registry: Arc::clone(&metrics),
|
||||
mem_pool_size: usize::MAX,
|
||||
},
|
||||
DedicatedExecutor::new_testing(),
|
||||
));
|
||||
let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store)));
|
||||
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(start_time)));
|
||||
|
||||
let write_buffer = Arc::new(
|
||||
influxdb3_write::write_buffer::WriteBufferImpl::new(
|
||||
Arc::clone(&persister),
|
||||
None::<Arc<influxdb3_write::wal::WalImpl>>,
|
||||
Arc::clone(&time_provider),
|
||||
SegmentDuration::new_5m(),
|
||||
Arc::clone(&exec),
|
||||
10000,
|
||||
)
|
||||
.await
|
||||
.unwrap(),
|
||||
);
|
||||
let query_executor = crate::query_executor::QueryExecutorImpl::new(
|
||||
write_buffer.catalog(),
|
||||
Arc::clone(&write_buffer),
|
||||
Arc::clone(&exec),
|
||||
Arc::clone(&metrics),
|
||||
Arc::new(HashMap::new()),
|
||||
10,
|
||||
10,
|
||||
);
|
||||
|
||||
// bind to port 0 will assign a random available port:
|
||||
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
|
||||
let listener = TcpListener::bind(socket_addr)
|
||||
.await
|
||||
.expect("bind tcp address");
|
||||
let addr = listener.local_addr().unwrap();
|
||||
|
||||
let server = ServerBuilder::new(common_state)
|
||||
.write_buffer(Arc::clone(&write_buffer))
|
||||
.query_executor(Arc::new(query_executor))
|
||||
.persister(persister)
|
||||
.authorizer(Arc::new(DefaultAuthorizer))
|
||||
.time_provider(Arc::clone(&time_provider))
|
||||
.tcp_listener(listener)
|
||||
.build();
|
||||
let frontend_shutdown = CancellationToken::new();
|
||||
let shutdown = frontend_shutdown.clone();
|
||||
|
||||
tokio::spawn(async move { serve(server, frontend_shutdown).await });
|
||||
|
||||
(format!("http://{addr}"), shutdown, write_buffer)
|
||||
}
|
||||
|
||||
pub(crate) async fn write_lp(
|
||||
server: impl Into<String> + Send,
|
||||
database: impl Into<String> + Send,
|
||||
|
@ -853,17 +830,4 @@ mod tests {
|
|||
.await
|
||||
.expect("http error sending query")
|
||||
}
|
||||
|
||||
pub(crate) fn get_free_port() -> SocketAddr {
|
||||
let ip = std::net::Ipv4Addr::new(127, 0, 0, 1);
|
||||
|
||||
loop {
|
||||
let port = NEXT_PORT.fetch_add(1, Ordering::SeqCst);
|
||||
let addr = SocketAddrV4::new(ip, port);
|
||||
|
||||
if std::net::TcpListener::bind(addr).is_ok() {
|
||||
return addr.into();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -22,6 +22,7 @@ use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::Expr;
use datafusion_util::config::DEFAULT_SCHEMA;
use datafusion_util::MemoryStream;
use influxdb3_write::last_cache::LastCacheFunction;
use influxdb3_write::{
    catalog::{Catalog, DatabaseSchema},
    WriteBuffer,
@@ -443,10 +444,20 @@ impl<B: WriteBuffer> QueryNamespace for Database<B> {
            cfg = cfg.with_config_option(k, v);
        }

        cfg.build()
        let ctx = cfg.build();
        ctx.inner().register_udtf(
            LAST_CACHE_UDTF_NAME,
            Arc::new(LastCacheFunction::new(
                &self.db_schema.name,
                self.write_buffer.last_cache(),
            )),
        );
        ctx
    }
}

const LAST_CACHE_UDTF_NAME: &str = "last_cache";

impl<B: WriteBuffer> CatalogProvider for Database<B> {
    fn as_any(&self) -> &dyn Any {
        self as &dyn Any
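The hunk above registers the last cache as a user-defined table function (UDTF) on the DataFusion session context, which is what makes `SELECT * FROM last_cache('cpu')` work in the new test. Below is a hedged, standalone sketch of that DataFusion mechanism with a toy function; it is not the InfluxDB implementation, and the import path of TableFunctionImpl varies between DataFusion versions.

```rust
// Toy UDTF example (assumption: DataFusion ~v39 API; not code from this commit).
use std::sync::Arc;

use datafusion::arrow::array::{Int64Array, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::function::TableFunctionImpl; // path is version-dependent
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::Result;
use datafusion::logical_expr::Expr;
use datafusion::prelude::SessionContext;

#[derive(Debug)]
struct MyCacheFunction;

impl TableFunctionImpl for MyCacheFunction {
    // Arguments from SQL (e.g. the table name in `last_cache('cpu')`) arrive as
    // expressions; this toy version ignores them and returns a fixed table.
    fn call(&self, _args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("host", DataType::Utf8, false),
            Field::new("usage", DataType::Int64, false),
        ]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(StringArray::from(vec!["a", "b"])),
                Arc::new(Int64Array::from(vec![11_i64, 22_i64])),
            ],
        )?;
        Ok(Arc::new(MemTable::try_new(schema, vec![vec![batch]])?))
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Equivalent in spirit to the register_udtf(LAST_CACHE_UDTF_NAME, ...) call above.
    ctx.register_udtf("my_cache", Arc::new(MyCacheFunction));
    ctx.sql("SELECT * FROM my_cache('cpu')").await?.show().await?;
    Ok(())
}
```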
@@ -1,5 +1,4 @@
use std::{
    any::Any,
    collections::VecDeque,
    sync::Arc,
    time::{Duration, Instant},
@@ -17,26 +16,24 @@ use arrow::{
    },
    error::ArrowError,
};
use async_trait::async_trait;
use datafusion::{
    common::Result as DFResult,
    datasource::{TableProvider, TableType},
    execution::context::SessionState,
    logical_expr::{BinaryExpr, Expr, Operator, TableProviderFilterPushDown},
    physical_plan::{memory::MemoryExec, ExecutionPlan},
    logical_expr::{BinaryExpr, Expr, Operator},
    scalar::ScalarValue,
};
use hashbrown::{HashMap, HashSet};
use indexmap::{IndexMap, IndexSet};
use iox_time::Time;
use parking_lot::RwLock;
use schema::{InfluxColumnType, InfluxFieldType, Schema, SchemaBuilder, TIME_COLUMN_NAME};
use schema::{InfluxColumnType, InfluxFieldType, Schema, TIME_COLUMN_NAME};

use crate::{
    catalog::LastCacheSize,
    write_buffer::{buffer_segment::WriteBatch, Field, FieldData, Row},
};

mod table_function;
pub use table_function::LastCacheFunction;

#[derive(Debug, thiserror::Error)]
pub enum Error {
    #[error("invalid cache size")]
@@ -51,6 +48,8 @@ pub enum Error {
    ValueColumnDoesNotExist { column_name: String },
    #[error("schema builder error: {0}")]
    SchemaBuilder(#[from] schema::builder::Error),
    #[error("requested last cache does not exist")]
    CacheDoesNotExist,
}

impl Error {
@@ -118,6 +117,34 @@ impl LastCacheProvider {
        }
    }

    /// Get a particular cache's name and arrow schema
    ///
    /// This is used for the implementation of DataFusion's `TableFunctionImpl` and `TableProvider`
    /// traits.
    fn get_cache_name_and_schema(
        &self,
        db_name: &str,
        tbl_name: &str,
        cache_name: Option<&str>,
    ) -> Option<(String, ArrowSchemaRef)> {
        self.cache_map
            .read()
            .get(db_name)
            .and_then(|db| db.get(tbl_name))
            .and_then(|tbl| {
                if let Some(name) = cache_name {
                    tbl.get(name)
                        .map(|lc| (name.to_string(), Arc::clone(&lc.schema)))
                } else if tbl.len() == 1 {
                    tbl.iter()
                        .map(|(name, lc)| (name.to_string(), Arc::clone(&lc.schema)))
                        .next()
                } else {
                    None
                }
            })
    }

    /// Create a new entry in the last cache for a given database and table, along with the given
    /// parameters.
    pub(crate) fn create_cache(
@ -201,14 +228,26 @@ impl LastCacheProvider {
|
|||
)
|
||||
};
|
||||
|
||||
// build a schema that only holds the field columns
|
||||
let mut schema_builder = SchemaBuilder::new();
|
||||
for (t, name) in schema
|
||||
let mut schema_builder = ArrowSchemaBuilder::new();
|
||||
// Add key columns first:
|
||||
for (t, field) in schema
|
||||
.iter()
|
||||
.filter(|&(_, f)| key_columns.contains(f.name()))
|
||||
{
|
||||
if let InfluxColumnType::Tag = t {
|
||||
// override tags with string type in the schema, because the KeyValue type stores
|
||||
// them as strings, and produces them as StringArray when creating RecordBatches:
|
||||
schema_builder.push(ArrowField::new(field.name(), DataType::Utf8, false))
|
||||
} else {
|
||||
schema_builder.push(field.clone());
|
||||
};
|
||||
}
|
||||
// Add value columns second:
|
||||
for (_, field) in schema
|
||||
.iter()
|
||||
.filter(|&(_, f)| value_columns.contains(f.name()))
|
||||
.map(|(t, f)| (t, f.name()))
|
||||
{
|
||||
schema_builder.influx_column(name, t);
|
||||
schema_builder.push(field.clone());
|
||||
}
|
||||
|
||||
let series_key = schema
|
||||
|
@ -223,7 +262,7 @@ impl LastCacheProvider {
|
|||
.map_err(|_| Error::InvalidCacheSize)?,
|
||||
ttl.unwrap_or(DEFAULT_CACHE_TTL),
|
||||
key_columns,
|
||||
schema_builder.build()?.as_arrow(),
|
||||
Arc::new(schema_builder.finish()),
|
||||
series_key,
|
||||
accept_new_fields,
|
||||
);
|
||||
|
@ -340,9 +379,7 @@ pub(crate) struct LastCache {
|
|||
/// The key columns for this cache
|
||||
///
|
||||
/// Uses an [`IndexSet`] for both fast iteration and fast lookup.
|
||||
key_columns: IndexSet<String>,
|
||||
/// The Arrow Schema for the table that this cache is associated with
|
||||
schema: ArrowSchemaRef,
|
||||
key_columns: Arc<IndexSet<String>>,
|
||||
/// Optionally store the series key for tables that use it for ensuring non-nullability in the
|
||||
/// column buffer for series key columns
|
||||
///
|
||||
|
@ -351,6 +388,8 @@ pub(crate) struct LastCache {
|
|||
series_key: Option<HashSet<String>>,
|
||||
/// Whether or not this cache accepts newly written fields
|
||||
accept_new_fields: bool,
|
||||
/// The Arrow Schema for the table that this cache is associated with
|
||||
schema: ArrowSchemaRef,
|
||||
/// The internal state of the cache
|
||||
state: LastCacheState,
|
||||
}
|
||||
|
@ -368,10 +407,10 @@ impl LastCache {
|
|||
Self {
|
||||
count,
|
||||
ttl,
|
||||
key_columns: key_columns.into_iter().collect(),
|
||||
schema,
|
||||
key_columns: Arc::new(key_columns.into_iter().collect()),
|
||||
series_key,
|
||||
accept_new_fields,
|
||||
schema,
|
||||
state: LastCacheState::Init,
|
||||
}
|
||||
}
|
||||
|
@ -418,6 +457,7 @@ impl LastCache {
|
|||
/// This will panic if the internal cache state's keys are out-of-order with respect to the
|
||||
/// order of the `key_columns` on this [`LastCache`]
|
||||
pub(crate) fn push(&mut self, row: &Row) {
|
||||
let schema = Arc::clone(&self.schema);
|
||||
let mut target = &mut self.state;
|
||||
let mut key_iter = self.key_columns.iter().peekable();
|
||||
while let (Some(key), peek) = (key_iter.next(), key_iter.peek()) {
|
||||
|
@ -451,7 +491,8 @@ impl LastCache {
|
|||
LastCacheState::Store(LastCacheStore::new(
|
||||
self.count.into(),
|
||||
self.ttl,
|
||||
Arc::clone(&self.schema),
|
||||
Arc::clone(&schema),
|
||||
Arc::clone(&self.key_columns),
|
||||
self.series_key.as_ref(),
|
||||
))
|
||||
}
|
||||
|
@ -462,14 +503,15 @@ impl LastCache {
|
|||
*target = LastCacheState::Store(LastCacheStore::new(
|
||||
self.count.into(),
|
||||
self.ttl,
|
||||
Arc::clone(&self.schema),
|
||||
Arc::clone(&schema),
|
||||
Arc::clone(&self.key_columns),
|
||||
self.series_key.as_ref(),
|
||||
));
|
||||
}
|
||||
let store = target.as_store_mut().expect(
|
||||
"cache target should be the actual store after iterating through all key columns",
|
||||
);
|
||||
let Some(new_columns) = store.push(row, self.accept_new_fields, &self.key_columns) else {
|
||||
let Some(new_columns) = store.push(row, self.accept_new_fields) else {
|
||||
// Unless new columns were added, and we need to update the schema, we are done.
|
||||
return;
|
||||
};
|
||||
|
@ -490,10 +532,10 @@ impl LastCache {
|
|||
fn to_record_batches(&self, predicates: &[Predicate]) -> Result<Vec<RecordBatch>, ArrowError> {
|
||||
// map the provided predicates on to the key columns
|
||||
// there may not be predicates provided for each key column, hence the Option
|
||||
let predicates: Vec<Option<Predicate>> = self
|
||||
let predicates: Vec<Option<&Predicate>> = self
|
||||
.key_columns
|
||||
.iter()
|
||||
.map(|key| predicates.iter().find(|p| p.key == *key).cloned())
|
||||
.map(|key| predicates.iter().find(|p| p.key == *key))
|
||||
.collect();
|
||||
|
||||
let mut caches = vec![ExtendedLastCacheState {
|
||||
|
@ -508,13 +550,15 @@ impl LastCache {
|
|||
let mut new_caches = vec![];
|
||||
'cache_loop: for c in caches {
|
||||
let cache_key = c.state.as_key().unwrap();
|
||||
if let Some(ref pred) = predicate {
|
||||
if let Some(pred) = predicate {
|
||||
let Some(next_state) = cache_key.evaluate_predicate(pred) else {
|
||||
continue 'cache_loop;
|
||||
};
|
||||
let mut additional_columns = c.additional_columns.clone();
|
||||
additional_columns.push((&cache_key.column_name, &pred.value));
|
||||
new_caches.push(ExtendedLastCacheState {
|
||||
state: next_state,
|
||||
additional_columns: c.additional_columns.clone(),
|
||||
additional_columns,
|
||||
});
|
||||
} else {
|
||||
new_caches.extend(cache_key.value_map.iter().map(|(v, state)| {
|
||||
|
@ -825,6 +869,8 @@ struct LastCacheStore {
|
|||
///
|
||||
/// [perf]: https://github.com/indexmap-rs/indexmap?tab=readme-ov-file#performance
|
||||
cache: IndexMap<String, CacheColumn>,
|
||||
/// A reference to the set of key columns for the cache
|
||||
key_columns: Arc<IndexSet<String>>,
|
||||
/// A ring buffer holding the instants at which entries in the cache were inserted
|
||||
///
|
||||
/// This is used to evict cache values that outlive the `ttl`
|
||||
|
@ -845,11 +891,13 @@ impl LastCacheStore {
|
|||
count: usize,
|
||||
ttl: Duration,
|
||||
schema: ArrowSchemaRef,
|
||||
key_columns: Arc<IndexSet<String>>,
|
||||
series_keys: Option<&HashSet<String>>,
|
||||
) -> Self {
|
||||
let cache = schema
|
||||
.fields()
|
||||
.iter()
|
||||
.filter(|f| !key_columns.contains(f.name()))
|
||||
.map(|f| {
|
||||
(
|
||||
f.name().to_string(),
|
||||
|
@ -863,6 +911,7 @@ impl LastCacheStore {
|
|||
.collect();
|
||||
Self {
|
||||
cache,
|
||||
key_columns,
|
||||
instants: VecDeque::with_capacity(count),
|
||||
count,
|
||||
ttl,
|
||||
|
@ -888,7 +937,6 @@ impl LastCacheStore {
|
|||
&mut self,
|
||||
row: &'a Row,
|
||||
accept_new_fields: bool,
|
||||
key_columns: &IndexSet<String>,
|
||||
) -> Option<Vec<(&'a str, DataType)>> {
|
||||
if row.time <= self.last_time.timestamp_nanos() {
|
||||
return None;
|
||||
|
@ -904,7 +952,7 @@ impl LastCacheStore {
|
|||
if let Some(col) = self.cache.get_mut(&field.name) {
|
||||
// In this case, the field already has an entry in the cache, so just push:
|
||||
col.push(&field.value);
|
||||
} else if !key_columns.contains(&field.name) {
|
||||
} else if !self.key_columns.contains(&field.name) {
|
||||
// In this case, there is not an entry for the field in the cache, so if the
|
||||
// value is not one of the key columns, then it is a new field being added.
|
||||
let data_type = data_type_from_buffer_field(field);
|
||||
|
@ -963,11 +1011,14 @@ impl LastCacheStore {
|
|||
.fields()
|
||||
.iter()
|
||||
.cloned()
|
||||
.map(|f| {
|
||||
.filter_map(|f| {
|
||||
if let Some(c) = self.cache.get(f.name()) {
|
||||
(f, c.data.as_array())
|
||||
Some((f, c.data.as_array()))
|
||||
} else if self.key_columns.contains(f.name()) {
|
||||
// We prepend key columns with the extended set provided
|
||||
None
|
||||
} else {
|
||||
(Arc::clone(&f), new_null_array(f.data_type(), self.len()))
|
||||
Some((Arc::clone(&f), new_null_array(f.data_type(), self.len())))
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
@ -1006,52 +1057,15 @@ impl LastCacheStore {
|
|||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl TableProvider for LastCache {
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self as &dyn Any
|
||||
}
|
||||
|
||||
fn schema(&self) -> ArrowSchemaRef {
|
||||
Arc::clone(&self.schema)
|
||||
}
|
||||
|
||||
fn table_type(&self) -> TableType {
|
||||
TableType::Temporary
|
||||
}
|
||||
|
||||
fn supports_filters_pushdown(
|
||||
&self,
|
||||
filters: &[&Expr],
|
||||
) -> DFResult<Vec<TableProviderFilterPushDown>> {
|
||||
Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
|
||||
}
|
||||
|
||||
async fn scan(
|
||||
&self,
|
||||
ctx: &SessionState,
|
||||
projection: Option<&Vec<usize>>,
|
||||
filters: &[Expr],
|
||||
_limit: Option<usize>,
|
||||
) -> DFResult<Arc<dyn ExecutionPlan>> {
|
||||
let predicates = self.convert_filter_exprs(filters);
|
||||
let partitions = vec![self.to_record_batches(&predicates)?];
|
||||
let mut exec = MemoryExec::try_new(&partitions, self.schema(), projection.cloned())?;
|
||||
|
||||
let show_sizes = ctx.config_options().explain.show_sizes;
|
||||
exec = exec.with_show_sizes(show_sizes);
|
||||
|
||||
Ok(Arc::new(exec))
|
||||
}
|
||||
}
|
||||
|
||||
/// A column in a [`LastCache`]
|
||||
///
|
||||
/// Stores its size so it can evict old data on push. Stores the time-to-live (TTL) in order
|
||||
/// to remove expired data.
|
||||
#[derive(Debug)]
|
||||
struct CacheColumn {
|
||||
/// The number of entries the [`CacheColumn`] will hold before evicting old ones on push
|
||||
size: usize,
|
||||
/// The buffer containing data for the column
|
||||
data: CacheColumnData,
|
||||
}
|
||||
|
||||
|
@ -1381,11 +1395,11 @@ mod tests {
|
|||
|
||||
assert_batches_eq!(
|
||||
[
|
||||
"+--------+-----------------------------+-------+",
|
||||
"| region | time | usage |",
|
||||
"+--------+-----------------------------+-------+",
|
||||
"| us | 1970-01-01T00:00:00.000002Z | 99.0 |",
|
||||
"+--------+-----------------------------+-------+",
|
||||
"+------+--------+-----------------------------+-------+",
|
||||
"| host | region | time | usage |",
|
||||
"+------+--------+-----------------------------+-------+",
|
||||
"| a | us | 1970-01-01T00:00:00.000002Z | 99.0 |",
|
||||
"+------+--------+-----------------------------+-------+",
|
||||
],
|
||||
&batch
|
||||
);
|
||||
|
@ -1409,11 +1423,11 @@ mod tests {
|
|||
|
||||
assert_batches_eq!(
|
||||
[
|
||||
"+--------+-----------------------------+-------+",
|
||||
"| region | time | usage |",
|
||||
"+--------+-----------------------------+-------+",
|
||||
"| us | 1970-01-01T00:00:00.000003Z | 88.0 |",
|
||||
"+--------+-----------------------------+-------+",
|
||||
"+------+--------+-----------------------------+-------+",
|
||||
"| host | region | time | usage |",
|
||||
"+------+--------+-----------------------------+-------+",
|
||||
"| a | us | 1970-01-01T00:00:00.000003Z | 88.0 |",
|
||||
"+------+--------+-----------------------------+-------+",
|
||||
],
|
||||
&batch
|
||||
);
|
||||
|
@ -1497,11 +1511,11 @@ mod tests {
|
|||
Predicate::new("host", KeyValue::string("c")),
|
||||
],
|
||||
expected: &[
|
||||
"+-----------------------------+-------+",
|
||||
"| time | usage |",
|
||||
"+-----------------------------+-------+",
|
||||
"| 1970-01-01T00:00:00.000001Z | 60.0 |",
|
||||
"+-----------------------------+-------+",
|
||||
"+--------+------+-----------------------------+-------+",
|
||||
"| region | host | time | usage |",
|
||||
"+--------+------+-----------------------------+-------+",
|
||||
"| us | c | 1970-01-01T00:00:00.000001Z | 60.0 |",
|
||||
"+--------+------+-----------------------------+-------+",
|
||||
],
|
||||
},
|
||||
// Predicate on only region key column will have host column outputted in addition to
|
||||
|
@ -1509,26 +1523,26 @@ mod tests {
|
|||
TestCase {
|
||||
predicates: &[Predicate::new("region", KeyValue::string("us"))],
|
||||
expected: &[
|
||||
"+------+-----------------------------+-------+",
|
||||
"| host | time | usage |",
|
||||
"+------+-----------------------------+-------+",
|
||||
"| a | 1970-01-01T00:00:00.000001Z | 100.0 |",
|
||||
"| c | 1970-01-01T00:00:00.000001Z | 60.0 |",
|
||||
"| b | 1970-01-01T00:00:00.000001Z | 80.0 |",
|
||||
"+------+-----------------------------+-------+",
|
||||
"+--------+------+-----------------------------+-------+",
|
||||
"| region | host | time | usage |",
|
||||
"+--------+------+-----------------------------+-------+",
|
||||
"| us | a | 1970-01-01T00:00:00.000001Z | 100.0 |",
|
||||
"| us | b | 1970-01-01T00:00:00.000001Z | 80.0 |",
|
||||
"| us | c | 1970-01-01T00:00:00.000001Z | 60.0 |",
|
||||
"+--------+------+-----------------------------+-------+",
|
||||
],
|
||||
},
|
||||
// Similar to previous, with a different region predicate:
|
||||
TestCase {
|
||||
predicates: &[Predicate::new("region", KeyValue::string("ca"))],
|
||||
expected: &[
|
||||
"+------+-----------------------------+-------+",
|
||||
"| host | time | usage |",
|
||||
"+------+-----------------------------+-------+",
|
||||
"| d | 1970-01-01T00:00:00.000001Z | 40.0 |",
|
||||
"| e | 1970-01-01T00:00:00.000001Z | 20.0 |",
|
||||
"| f | 1970-01-01T00:00:00.000001Z | 30.0 |",
|
||||
"+------+-----------------------------+-------+",
|
||||
"+--------+------+-----------------------------+-------+",
|
||||
"| region | host | time | usage |",
|
||||
"+--------+------+-----------------------------+-------+",
|
||||
"| ca | d | 1970-01-01T00:00:00.000001Z | 40.0 |",
|
||||
"| ca | e | 1970-01-01T00:00:00.000001Z | 20.0 |",
|
||||
"| ca | f | 1970-01-01T00:00:00.000001Z | 30.0 |",
|
||||
"+--------+------+-----------------------------+-------+",
|
||||
],
|
||||
},
|
||||
// Predicate on only host key column will have region column outputted in addition to
|
||||
|
@ -1536,11 +1550,11 @@ mod tests {
|
|||
TestCase {
|
||||
predicates: &[Predicate::new("host", KeyValue::string("a"))],
|
||||
expected: &[
|
||||
"+--------+-----------------------------+-------+",
|
||||
"| region | time | usage |",
|
||||
"+--------+-----------------------------+-------+",
|
||||
"| us | 1970-01-01T00:00:00.000001Z | 100.0 |",
|
||||
"+--------+-----------------------------+-------+",
|
||||
"+--------+------+-----------------------------+-------+",
|
||||
"| region | host | time | usage |",
|
||||
"+--------+------+-----------------------------+-------+",
|
||||
"| us | a | 1970-01-01T00:00:00.000001Z | 100.0 |",
|
||||
"+--------+------+-----------------------------+-------+",
|
||||
],
|
||||
},
|
||||
// Omitting all key columns from the predicate will have all key columns included in
|
||||
|
@ -1704,57 +1718,57 @@ mod tests {
|
|||
Predicate::new("host", KeyValue::string("a")),
|
||||
],
|
||||
expected: &[
|
||||
"+--------------------------------+-------+",
|
||||
"| time | usage |",
|
||||
"+--------------------------------+-------+",
|
||||
"| 1970-01-01T00:00:00.000001500Z | 99.0 |",
|
||||
"| 1970-01-01T00:00:00.000001Z | 100.0 |",
|
||||
"| 1970-01-01T00:00:00.000002500Z | 90.0 |",
|
||||
"| 1970-01-01T00:00:00.000002Z | 95.0 |",
|
||||
"+--------------------------------+-------+",
|
||||
"+--------+------+--------------------------------+-------+",
|
||||
"| region | host | time | usage |",
|
||||
"+--------+------+--------------------------------+-------+",
|
||||
"| us | a | 1970-01-01T00:00:00.000001500Z | 99.0 |",
|
||||
"| us | a | 1970-01-01T00:00:00.000001Z | 100.0 |",
|
||||
"| us | a | 1970-01-01T00:00:00.000002500Z | 90.0 |",
|
||||
"| us | a | 1970-01-01T00:00:00.000002Z | 95.0 |",
|
||||
"+--------+------+--------------------------------+-------+",
|
||||
],
|
||||
},
|
||||
TestCase {
|
||||
predicates: &[Predicate::new("region", KeyValue::string("us"))],
|
||||
expected: &[
|
||||
"+------+--------------------------------+-------+",
|
||||
"| host | time | usage |",
|
||||
"+------+--------------------------------+-------+",
|
||||
"| a | 1970-01-01T00:00:00.000001500Z | 99.0 |",
|
||||
"| a | 1970-01-01T00:00:00.000001Z | 100.0 |",
|
||||
"| a | 1970-01-01T00:00:00.000002500Z | 90.0 |",
|
||||
"| a | 1970-01-01T00:00:00.000002Z | 95.0 |",
|
||||
"| b | 1970-01-01T00:00:00.000001500Z | 88.0 |",
|
||||
"| b | 1970-01-01T00:00:00.000001Z | 80.0 |",
|
||||
"| b | 1970-01-01T00:00:00.000002500Z | 99.0 |",
|
||||
"| b | 1970-01-01T00:00:00.000002Z | 92.0 |",
|
||||
"+------+--------------------------------+-------+",
|
||||
"+--------+------+--------------------------------+-------+",
|
||||
"| region | host | time | usage |",
|
||||
"+--------+------+--------------------------------+-------+",
|
||||
"| us | a | 1970-01-01T00:00:00.000001500Z | 99.0 |",
|
||||
"| us | a | 1970-01-01T00:00:00.000001Z | 100.0 |",
|
||||
"| us | a | 1970-01-01T00:00:00.000002500Z | 90.0 |",
|
||||
"| us | a | 1970-01-01T00:00:00.000002Z | 95.0 |",
|
||||
"| us | b | 1970-01-01T00:00:00.000001500Z | 88.0 |",
|
||||
"| us | b | 1970-01-01T00:00:00.000001Z | 80.0 |",
|
||||
"| us | b | 1970-01-01T00:00:00.000002500Z | 99.0 |",
|
||||
"| us | b | 1970-01-01T00:00:00.000002Z | 92.0 |",
|
||||
"+--------+------+--------------------------------+-------+",
|
||||
],
|
||||
},
|
||||
TestCase {
|
||||
predicates: &[Predicate::new("host", KeyValue::string("a"))],
|
||||
expected: &[
|
||||
"+--------+--------------------------------+-------+",
|
||||
"| region | time | usage |",
|
||||
"+--------+--------------------------------+-------+",
|
||||
"| us | 1970-01-01T00:00:00.000001500Z | 99.0 |",
|
||||
"| us | 1970-01-01T00:00:00.000001Z | 100.0 |",
|
||||
"| us | 1970-01-01T00:00:00.000002500Z | 90.0 |",
|
||||
"| us | 1970-01-01T00:00:00.000002Z | 95.0 |",
|
||||
"+--------+--------------------------------+-------+",
|
||||
"+--------+------+--------------------------------+-------+",
|
||||
"| region | host | time | usage |",
|
||||
"+--------+------+--------------------------------+-------+",
|
||||
"| us | a | 1970-01-01T00:00:00.000001500Z | 99.0 |",
|
||||
"| us | a | 1970-01-01T00:00:00.000001Z | 100.0 |",
|
||||
"| us | a | 1970-01-01T00:00:00.000002500Z | 90.0 |",
|
||||
"| us | a | 1970-01-01T00:00:00.000002Z | 95.0 |",
|
||||
"+--------+------+--------------------------------+-------+",
|
||||
],
|
||||
},
|
||||
TestCase {
|
||||
predicates: &[Predicate::new("host", KeyValue::string("b"))],
|
||||
expected: &[
|
||||
"+--------+--------------------------------+-------+",
|
||||
"| region | time | usage |",
|
||||
"+--------+--------------------------------+-------+",
|
||||
"| us | 1970-01-01T00:00:00.000001500Z | 88.0 |",
|
||||
"| us | 1970-01-01T00:00:00.000001Z | 80.0 |",
|
||||
"| us | 1970-01-01T00:00:00.000002500Z | 99.0 |",
|
||||
"| us | 1970-01-01T00:00:00.000002Z | 92.0 |",
|
||||
"+--------+--------------------------------+-------+",
|
||||
"+--------+------+--------------------------------+-------+",
|
||||
"| region | host | time | usage |",
|
||||
"+--------+------+--------------------------------+-------+",
|
||||
"| us | b | 1970-01-01T00:00:00.000001500Z | 88.0 |",
|
||||
"| us | b | 1970-01-01T00:00:00.000001Z | 80.0 |",
|
||||
"| us | b | 1970-01-01T00:00:00.000002500Z | 99.0 |",
|
||||
"| us | b | 1970-01-01T00:00:00.000002Z | 92.0 |",
|
||||
"+--------+------+--------------------------------+-------+",
|
||||
],
|
||||
},
|
||||
TestCase {
|
||||
|
@ -1853,11 +1867,11 @@ mod tests {

assert_batches_sorted_eq!(
[
"+-----------------------------+-------+",
"| time | usage |",
"+-----------------------------+-------+",
"| 1970-01-01T00:00:00.000001Z | 100.0 |",
"+-----------------------------+-------+",
"+--------+------+-----------------------------+-------+",
"| region | host | time | usage |",
"+--------+------+-----------------------------+-------+",
"| us | a | 1970-01-01T00:00:00.000001Z | 100.0 |",
"+--------+------+-----------------------------+-------+",
],
&batches
);
@ -1903,11 +1917,11 @@ mod tests {

assert_batches_sorted_eq!(
[
"+--------+--------------------------+-------+",
"| region | time | usage |",
"+--------+--------------------------+-------+",
"| us | 1970-01-01T00:00:00.500Z | 333.0 |",
"+--------+--------------------------+-------+",
"+--------+------+--------------------------+-------+",
"| region | host | time | usage |",
"+--------+------+--------------------------+-------+",
"| us | a | 1970-01-01T00:00:00.500Z | 333.0 |",
"+--------+------+--------------------------+-------+",
],
&batches
);
@ -1997,36 +2011,36 @@ mod tests {
|
|||
TestCase {
|
||||
predicates: &[Predicate::new("component_id", KeyValue::string("333"))],
|
||||
expected: &[
|
||||
"+--------+--------+------+---------+-----------------------------+",
|
||||
"| active | type | loc | reading | time |",
|
||||
"+--------+--------+------+---------+-----------------------------+",
|
||||
"| true | camera | fore | 145.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"+--------+--------+------+---------+-----------------------------+",
|
||||
"+--------------+--------+--------+------+---------+-----------------------------+",
|
||||
"| component_id | active | type | loc | reading | time |",
|
||||
"+--------------+--------+--------+------+---------+-----------------------------+",
|
||||
"| 333 | true | camera | fore | 145.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"+--------------+--------+--------+------+---------+-----------------------------+",
|
||||
],
|
||||
},
|
||||
// Predicate on a non-string field key:
|
||||
TestCase {
|
||||
predicates: &[Predicate::new("active", KeyValue::Bool(false))],
|
||||
expected: &[
|
||||
"+--------------+-------------+---------+---------+-----------------------------+",
|
||||
"| component_id | type | loc | reading | time |",
|
||||
"+--------------+-------------+---------+---------+-----------------------------+",
|
||||
"| 555 | solar-panel | huygens | 200.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"| 666 | comms-dish | huygens | 220.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"+--------------+-------------+---------+---------+-----------------------------+",
|
||||
"+--------------+--------+-------------+---------+---------+-----------------------------+",
|
||||
"| component_id | active | type | loc | reading | time |",
|
||||
"+--------------+--------+-------------+---------+---------+-----------------------------+",
|
||||
"| 555 | false | solar-panel | huygens | 200.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"| 666 | false | comms-dish | huygens | 220.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"+--------------+--------+-------------+---------+---------+-----------------------------+",
|
||||
],
|
||||
},
|
||||
// Predicate on a string field key:
|
||||
TestCase {
|
||||
predicates: &[Predicate::new("type", KeyValue::string("camera"))],
|
||||
expected: &[
|
||||
"+--------------+--------+-----------+---------+-----------------------------+",
|
||||
"| component_id | active | loc | reading | time |",
|
||||
"+--------------+--------+-----------+---------+-----------------------------+",
|
||||
"| 111 | true | port | 150.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"| 222 | true | starboard | 250.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"| 333 | true | fore | 145.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"+--------------+--------+-----------+---------+-----------------------------+",
|
||||
"+--------------+--------+--------+-----------+---------+-----------------------------+",
|
||||
"| component_id | active | type | loc | reading | time |",
|
||||
"+--------------+--------+--------+-----------+---------+-----------------------------+",
|
||||
"| 111 | true | camera | port | 150.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"| 222 | true | camera | starboard | 250.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"| 333 | true | camera | fore | 145.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"+--------------+--------+--------+-----------+---------+-----------------------------+",
|
||||
],
|
||||
}
|
||||
];
|
||||
|
@ -2110,39 +2124,39 @@ mod tests {
|
|||
TestCase {
|
||||
predicates: &[Predicate::new("state", KeyValue::string("ca"))],
|
||||
expected: &[
|
||||
"+--------+-------+-------+-----------------------------+",
|
||||
"| county | farm | speed | time |",
|
||||
"+--------+-------+-------+-----------------------------+",
|
||||
"| napa | 10-01 | 50.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"| napa | 10-02 | 49.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"| nevada | 40-01 | 66.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"| orange | 20-01 | 40.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"| orange | 20-02 | 33.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"| yolo | 30-01 | 62.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"+--------+-------+-------+-----------------------------+",
|
||||
"+-------+--------+-------+-------+-----------------------------+",
|
||||
"| state | county | farm | speed | time |",
|
||||
"+-------+--------+-------+-------+-----------------------------+",
|
||||
"| ca | napa | 10-01 | 50.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"| ca | napa | 10-02 | 49.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"| ca | nevada | 40-01 | 66.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"| ca | orange | 20-01 | 40.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"| ca | orange | 20-02 | 33.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"| ca | yolo | 30-01 | 62.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"+-------+--------+-------+-------+-----------------------------+",
|
||||
],
|
||||
},
|
||||
// Predicate on county column, which is part of the series key:
|
||||
TestCase {
|
||||
predicates: &[Predicate::new("county", KeyValue::string("napa"))],
|
||||
expected: &[
|
||||
"+-------+-------+-------+-----------------------------+",
|
||||
"| state | farm | speed | time |",
|
||||
"+-------+-------+-------+-----------------------------+",
|
||||
"| ca | 10-01 | 50.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"| ca | 10-02 | 49.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"+-------+-------+-------+-----------------------------+",
|
||||
"+-------+--------+-------+-------+-----------------------------+",
|
||||
"| state | county | farm | speed | time |",
|
||||
"+-------+--------+-------+-------+-----------------------------+",
|
||||
"| ca | napa | 10-01 | 50.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"| ca | napa | 10-02 | 49.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"+-------+--------+-------+-------+-----------------------------+",
|
||||
],
|
||||
},
|
||||
// Predicate on farm column, which is part of the series key:
|
||||
TestCase {
|
||||
predicates: &[Predicate::new("farm", KeyValue::string("30-01"))],
|
||||
expected: &[
|
||||
"+-------+--------+-------+-----------------------------+",
|
||||
"| state | county | speed | time |",
|
||||
"+-------+--------+-------+-----------------------------+",
|
||||
"| ca | yolo | 62.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"+-------+--------+-------+-----------------------------+",
|
||||
"+-------+--------+-------+-------+-----------------------------+",
|
||||
"| state | county | farm | speed | time |",
|
||||
"+-------+--------+-------+-------+-----------------------------+",
|
||||
"| ca | yolo | 30-01 | 62.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"+-------+--------+-------+-------+-----------------------------+",
|
||||
],
|
||||
},
|
||||
// Predicate on all series key columns:
|
||||
|
@ -2153,11 +2167,11 @@ mod tests {
Predicate::new("farm", KeyValue::string("40-01")),
],
expected: &[
"+-------+-----------------------------+",
"| speed | time |",
"+-------+-----------------------------+",
"| 66.0 | 1970-01-01T00:00:00.000001Z |",
"+-------+-----------------------------+",
"+-------+--------+-------+-------+-----------------------------+",
"| state | county | farm | speed | time |",
"+-------+--------+-------+-------+-----------------------------+",
"| ca | nevada | 40-01 | 66.0 | 1970-01-01T00:00:00.000001Z |",
"+-------+--------+-------+-------+-----------------------------+",
],
},
];
@ -2241,39 +2255,39 @@ mod tests {
|
|||
TestCase {
|
||||
predicates: &[Predicate::new("state", KeyValue::string("ca"))],
|
||||
expected: &[
|
||||
"+--------+-------+-------+-----------------------------+",
|
||||
"| county | farm | speed | time |",
|
||||
"+--------+-------+-------+-----------------------------+",
|
||||
"| napa | 10-01 | 50.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"| napa | 10-02 | 49.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"| nevada | 40-01 | 66.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"| orange | 20-01 | 40.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"| orange | 20-02 | 33.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"| yolo | 30-01 | 62.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"+--------+-------+-------+-----------------------------+",
|
||||
"+--------+-------+-------+-------+-----------------------------+",
|
||||
"| county | farm | state | speed | time |",
|
||||
"+--------+-------+-------+-------+-----------------------------+",
|
||||
"| napa | 10-01 | ca | 50.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"| napa | 10-02 | ca | 49.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"| nevada | 40-01 | ca | 66.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"| orange | 20-01 | ca | 40.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"| orange | 20-02 | ca | 33.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"| yolo | 30-01 | ca | 62.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"+--------+-------+-------+-------+-----------------------------+",
|
||||
],
|
||||
},
|
||||
// Predicate on county column, which is part of the series key:
|
||||
TestCase {
|
||||
predicates: &[Predicate::new("county", KeyValue::string("napa"))],
|
||||
expected: &[
|
||||
"+-------+-------+-------+-----------------------------+",
|
||||
"| farm | state | speed | time |",
|
||||
"+-------+-------+-------+-----------------------------+",
|
||||
"| 10-01 | ca | 50.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"| 10-02 | ca | 49.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"+-------+-------+-------+-----------------------------+",
|
||||
"+--------+-------+-------+-------+-----------------------------+",
|
||||
"| county | farm | state | speed | time |",
|
||||
"+--------+-------+-------+-------+-----------------------------+",
|
||||
"| napa | 10-01 | ca | 50.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"| napa | 10-02 | ca | 49.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"+--------+-------+-------+-------+-----------------------------+",
|
||||
],
|
||||
},
|
||||
// Predicate on farm column, which is part of the series key:
|
||||
TestCase {
|
||||
predicates: &[Predicate::new("farm", KeyValue::string("30-01"))],
|
||||
expected: &[
|
||||
"+--------+-------+-------+-----------------------------+",
|
||||
"| county | state | speed | time |",
|
||||
"+--------+-------+-------+-----------------------------+",
|
||||
"| yolo | ca | 62.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"+--------+-------+-------+-----------------------------+",
|
||||
"+--------+-------+-------+-------+-----------------------------+",
|
||||
"| county | farm | state | speed | time |",
|
||||
"+--------+-------+-------+-------+-----------------------------+",
|
||||
"| yolo | 30-01 | ca | 62.0 | 1970-01-01T00:00:00.000001Z |",
|
||||
"+--------+-------+-------+-------+-----------------------------+",
|
||||
],
|
||||
},
|
||||
// Predicate on all series key columns:
|
||||
|
@ -2284,11 +2298,11 @@ mod tests {
Predicate::new("farm", KeyValue::string("40-01")),
],
expected: &[
"+-------+-----------------------------+",
"| speed | time |",
"+-------+-----------------------------+",
"| 66.0 | 1970-01-01T00:00:00.000001Z |",
"+-------+-----------------------------+",
"+--------+-------+-------+-------+-----------------------------+",
"| county | farm | state | speed | time |",
"+--------+-------+-------+-------+-----------------------------+",
"| nevada | 40-01 | ca | 66.0 | 1970-01-01T00:00:00.000001Z |",
"+--------+-------+-------+-------+-----------------------------+",
],
},
];
@ -2421,22 +2435,22 @@ mod tests {
|
|||
TestCase {
|
||||
predicates: &[Predicate::new("game_id", KeyValue::string("4"))],
|
||||
expected: &[
|
||||
"+-----------+-----------------------------+------+------+",
|
||||
"| player | time | type | zone |",
|
||||
"+-----------+-----------------------------+------+------+",
|
||||
"| bobrovsky | 1970-01-01T00:00:00.000001Z | save | home |",
|
||||
"+-----------+-----------------------------+------+------+",
|
||||
"+---------+-----------+-----------------------------+------+------+",
|
||||
"| game_id | player | time | type | zone |",
|
||||
"+---------+-----------+-----------------------------+------+------+",
|
||||
"| 4 | bobrovsky | 1970-01-01T00:00:00.000001Z | save | home |",
|
||||
"+---------+-----------+-----------------------------+------+------+",
|
||||
],
|
||||
},
|
||||
// Cache that does not have a zone column will produce it with nulls:
|
||||
TestCase {
|
||||
predicates: &[Predicate::new("game_id", KeyValue::string("1"))],
|
||||
expected: &[
|
||||
"+-----------+-----------------------------+------+------+",
|
||||
"| player | time | type | zone |",
|
||||
"+-----------+-----------------------------+------+------+",
|
||||
"| mackinnon | 1970-01-01T00:00:00.000001Z | shot | |",
|
||||
"+-----------+-----------------------------+------+------+",
|
||||
"+---------+-----------+-----------------------------+------+------+",
|
||||
"| game_id | player | time | type | zone |",
|
||||
"+---------+-----------+-----------------------------+------+------+",
|
||||
"| 1 | mackinnon | 1970-01-01T00:00:00.000001Z | shot | |",
|
||||
"+---------+-----------+-----------------------------+------+------+",
|
||||
],
|
||||
},
|
||||
// Pulling from multiple caches will fill in with nulls:
|
||||
|
@ -2545,31 +2559,31 @@ mod tests {
|
|||
TestCase {
|
||||
predicates: &[Predicate::new("t1", KeyValue::string("a"))],
|
||||
expected: &[
|
||||
"+-----+--------------------------------+-----+-----+-----+",
|
||||
"| f1 | time | f2 | f3 | f4 |",
|
||||
"+-----+--------------------------------+-----+-----+-----+",
|
||||
"| 1.0 | 1970-01-01T00:00:00.000001500Z | 2.0 | 3.0 | 4.0 |",
|
||||
"+-----+--------------------------------+-----+-----+-----+",
|
||||
"+----+-----+--------------------------------+-----+-----+-----+",
|
||||
"| t1 | f1 | time | f2 | f3 | f4 |",
|
||||
"+----+-----+--------------------------------+-----+-----+-----+",
|
||||
"| a | 1.0 | 1970-01-01T00:00:00.000001500Z | 2.0 | 3.0 | 4.0 |",
|
||||
"+----+-----+--------------------------------+-----+-----+-----+",
|
||||
],
|
||||
},
|
||||
TestCase {
|
||||
predicates: &[Predicate::new("t1", KeyValue::string("b"))],
|
||||
expected: &[
|
||||
"+------+--------------------------------+----+------+------+",
|
||||
"| f1 | time | f2 | f3 | f4 |",
|
||||
"+------+--------------------------------+----+------+------+",
|
||||
"| 10.0 | 1970-01-01T00:00:00.000001500Z | | 30.0 | 40.0 |",
|
||||
"+------+--------------------------------+----+------+------+",
|
||||
"+----+------+--------------------------------+----+------+------+",
|
||||
"| t1 | f1 | time | f2 | f3 | f4 |",
|
||||
"+----+------+--------------------------------+----+------+------+",
|
||||
"| b | 10.0 | 1970-01-01T00:00:00.000001500Z | | 30.0 | 40.0 |",
|
||||
"+----+------+--------------------------------+----+------+------+",
|
||||
],
|
||||
},
|
||||
TestCase {
|
||||
predicates: &[Predicate::new("t1", KeyValue::string("c"))],
|
||||
expected: &[
|
||||
"+-------+--------------------------------+-------+-------+----+",
|
||||
"| f1 | time | f2 | f3 | f4 |",
|
||||
"+-------+--------------------------------+-------+-------+----+",
|
||||
"| 100.0 | 1970-01-01T00:00:00.000001500Z | 200.0 | 300.0 | |",
|
||||
"+-------+--------------------------------+-------+-------+----+",
|
||||
"+----+-------+--------------------------------+-------+-------+----+",
|
||||
"| t1 | f1 | time | f2 | f3 | f4 |",
|
||||
"+----+-------+--------------------------------+-------+-------+----+",
|
||||
"| c | 100.0 | 1970-01-01T00:00:00.000001500Z | 200.0 | 300.0 | |",
|
||||
"+----+-------+--------------------------------+-------+-------+----+",
|
||||
],
|
||||
},
|
||||
// Can query across key column values:
@ -0,0 +1,117 @@
use std::{any::Any, sync::Arc};

use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion::{
    common::{plan_err, Result},
    datasource::{function::TableFunctionImpl, TableProvider, TableType},
    execution::context::SessionState,
    logical_expr::{Expr, TableProviderFilterPushDown},
    physical_plan::{memory::MemoryExec, ExecutionPlan},
    scalar::ScalarValue,
};

use super::LastCacheProvider;

struct LastCacheFunctionProvider {
    db_name: String,
    table_name: String,
    cache_name: String,
    schema: SchemaRef,
    provider: Arc<LastCacheProvider>,
}

#[async_trait]
impl TableProvider for LastCacheFunctionProvider {
    fn as_any(&self) -> &dyn Any {
        self as &dyn Any
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn table_type(&self) -> TableType {
        TableType::Temporary
    }

    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> Result<Vec<TableProviderFilterPushDown>> {
        Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
    }

    async fn scan(
        &self,
        ctx: &SessionState,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let read = self.provider.cache_map.read();
        let batches = if let Some(cache) = read
            .get(&self.db_name)
            .and_then(|db| db.get(&self.table_name))
            .and_then(|tbl| tbl.get(&self.cache_name))
        {
            let predicates = cache.convert_filter_exprs(filters);
            cache.to_record_batches(&predicates)?
        } else {
            // If there is no cache, it means that it was removed, in which case, we just return
            // an empty set of record batches.
            vec![]
        };
        let mut exec = MemoryExec::try_new(&[batches], self.schema(), projection.cloned())?;

        let show_sizes = ctx.config_options().explain.show_sizes;
        exec = exec.with_show_sizes(show_sizes);

        Ok(Arc::new(exec))
    }
}

pub struct LastCacheFunction {
    db_name: String,
    provider: Arc<LastCacheProvider>,
}

impl LastCacheFunction {
    pub fn new(db_name: impl Into<String>, provider: Arc<LastCacheProvider>) -> Self {
        Self {
            db_name: db_name.into(),
            provider,
        }
    }
}

impl TableFunctionImpl for LastCacheFunction {
    fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
        let Some(Expr::Literal(ScalarValue::Utf8(Some(table_name)))) = args.first() else {
            return plan_err!("first argument must be the table name as a string");
        };

        let cache_name = match args.get(1) {
            Some(Expr::Literal(ScalarValue::Utf8(Some(name)))) => Some(name),
            Some(_) => {
                return plan_err!("second argument, if passed, must be the cache name as a string")
            }
            None => None,
        };

        match self.provider.get_cache_name_and_schema(
            &self.db_name,
            table_name,
            cache_name.map(|x| x.as_str()),
        ) {
            Some((cache_name, schema)) => Ok(Arc::new(LastCacheFunctionProvider {
                db_name: self.db_name.clone(),
                table_name: table_name.clone(),
                cache_name,
                schema,
                provider: Arc::clone(&self.provider),
            })),
            None => plan_err!("could not find cache for the given arguments"),
        }
    }
}
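For orientation, here is a minimal sketch (not part of this diff) of how a TableFunctionImpl such as the LastCacheFunction above can be registered on a DataFusion SessionContext and then invoked from SQL. The function name "last_cache", the table and cache names, and the filter are illustrative assumptions only; how the server actually wires this up is outside this hunk.

use std::sync::Arc;

use datafusion::datasource::function::TableFunctionImpl;
use datafusion::error::Result;
use datafusion::execution::context::SessionContext;

/// Registers a user-defined table function (for example the LastCacheFunction above)
/// under an assumed name and runs a query against it.
async fn run_last_cache_query(udtf: Arc<dyn TableFunctionImpl>) -> Result<()> {
    let ctx = SessionContext::new();
    // The name "last_cache" is an assumption for illustration; DataFusion resolves the
    // call in the FROM clause below to the function's `call()` method, which returns
    // the TableProvider that is then scanned.
    ctx.register_udtf("last_cache", udtf);

    // First argument: table name; optional second argument: cache name, mirroring the
    // argument handling in LastCacheFunction::call above.
    let df = ctx
        .sql("SELECT * FROM last_cache('cpu', 'cpu_cache') WHERE host = 'a'")
        .await?;
    df.show().await
}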
@ -9,7 +9,7 @@
pub mod cache;
pub mod catalog;
mod chunk;
mod last_cache;
pub mod last_cache;
pub mod paths;
pub mod persister;
pub mod wal;