fix: last cache catalog configuration tracks explicit vs. non-explicit value columns (#25185)

* fix: catalog support for last caches that accept new fields

Last cache definitions in the catalog were augmented to either store an
explicit set of column names (including time), or to accept new fields.

This will allow these caches to be loaded properly on server restart such
that all non-key columns are cached.

* refactor: use tagged serialization for last cache values def

This also updated the client code to accept the new structure in
influxdb3_client.

* test: add e2e tests to catch regressions in influxdb3_client

* chore: cargo update for audit
pull/25193/head
Trevor Hilton 2024-07-24 11:00:40 -04:00 committed by GitHub
parent dfecf570e6
commit 10dd22b6de
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 259 additions and 123 deletions

149
Cargo.lock generated
View File

@ -133,9 +133,9 @@ checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da"
[[package]]
name = "arrayref"
version = "0.3.7"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b4930d2cb77ce62f89ee5d5289b4ac049559b1c45539271f5ed4fdc7db34545"
checksum = "9d151e35f61089500b617991b791fc8bfd237ae50cd5950803758a179b41e67a"
[[package]]
name = "arrayvec"
@ -434,9 +434,9 @@ dependencies = [
[[package]]
name = "async-compression"
version = "0.4.11"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd066d0b4ef8ecb03a55319dc13aa6910616d0f44008a045bb1835af830abff5"
checksum = "fec134f64e2bc57411226dfc4e52dec859ddfc7e711fc5e07b612584f000e4aa"
dependencies = [
"bzip2",
"flate2",
@ -469,7 +469,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -480,7 +480,7 @@ checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -766,9 +766,9 @@ dependencies = [
[[package]]
name = "cc"
version = "1.1.5"
version = "1.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "324c74f2155653c90b04f25b2a47a8a631360cb908f92a772695f430c7e31052"
checksum = "2aba8f4e9906c7ce3c73463f62a7f0c65183ada1a2d47e397cc8810827f9694f"
dependencies = [
"jobserver",
"libc",
@ -825,9 +825,9 @@ dependencies = [
[[package]]
name = "clap"
version = "4.5.9"
version = "4.5.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64acc1846d54c1fe936a78dc189c34e28d3f5afc348403f28ecf53660b9b8462"
checksum = "8f6b81fb3c84f5563d509c59b5a48d935f689e993afa90fe39047f05adef9142"
dependencies = [
"clap_builder",
"clap_derive",
@ -867,9 +867,9 @@ dependencies = [
[[package]]
name = "clap_builder"
version = "4.5.9"
version = "4.5.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fb8393d67ba2e7bfaf28a23458e4e2b543cc73a99595511eb207fdb8aede942"
checksum = "5ca6706fd5224857d9ac5eb9355f6683563cc0541c7cd9d014043b57cbec78ac"
dependencies = [
"anstream",
"anstyle",
@ -886,7 +886,7 @@ dependencies = [
"heck 0.5.0",
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -1177,7 +1177,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -1201,7 +1201,7 @@ dependencies = [
"proc-macro2",
"quote",
"strsim",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -1212,7 +1212,7 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806"
dependencies = [
"darling_core",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -1899,7 +1899,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -2270,7 +2270,7 @@ dependencies = [
"http 1.1.0",
"hyper 1.4.1",
"hyper-util",
"rustls 0.23.11",
"rustls 0.23.12",
"rustls-native-certs 0.7.1",
"rustls-pki-types",
"tokio",
@ -2905,9 +2905,9 @@ checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b"
[[package]]
name = "jobserver"
version = "0.1.31"
version = "0.1.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2b099aaa34a9751c5bf0878add70444e1ed2dd73f347be99003d4577277de6e"
checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0"
dependencies = [
"libc",
]
@ -3180,13 +3180,14 @@ dependencies = [
[[package]]
name = "mio"
version = "0.8.11"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c"
checksum = "4569e456d394deccd22ce1c1913e6ea0e54519f577285001215d33557431afe4"
dependencies = [
"hermit-abi",
"libc",
"wasi",
"windows-sys 0.48.0",
"windows-sys 0.52.0",
]
[[package]]
@ -3395,18 +3396,18 @@ dependencies = [
[[package]]
name = "object"
version = "0.36.1"
version = "0.36.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "081b846d1d56ddfc18fdf1a922e4f6e07a11768ea1b92dec44e42b72712ccfce"
checksum = "3f203fa8daa7bb185f760ae12bd8e097f63d17041dcdcaf675ac54cdf863170e"
dependencies = [
"memchr",
]
[[package]]
name = "object_store"
version = "0.10.1"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbebfd32c213ba1907fa7a9c9138015a8de2b43e30c5aa45b18f7deb46786ad6"
checksum = "e6da452820c715ce78221e8202ccc599b4a52f3e1eb3eedb487b680c81a8e3f3"
dependencies = [
"async-trait",
"base64 0.22.1",
@ -3415,7 +3416,7 @@ dependencies = [
"futures",
"humantime",
"hyper 1.4.1",
"itertools 0.12.1",
"itertools 0.13.0",
"md-5",
"parking_lot",
"percent-encoding",
@ -3731,7 +3732,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -3847,7 +3848,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f12335488a2f3b0a83b14edad48dca9879ce89b2edd10e80237e4e852dd645e"
dependencies = [
"proc-macro2",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -3942,7 +3943,7 @@ dependencies = [
"prost 0.12.6",
"prost-types 0.12.6",
"regex",
"syn 2.0.71",
"syn 2.0.72",
"tempfile",
]
@ -3969,7 +3970,7 @@ dependencies = [
"itertools 0.12.1",
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -4014,9 +4015,9 @@ checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
[[package]]
name = "quick-xml"
version = "0.31.0"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33"
checksum = "96a05e2e8efddfa51a84ca47cec303fac86c8541b686d37cac5efc0e094417bc"
dependencies = [
"memchr",
"serde",
@ -4033,7 +4034,7 @@ dependencies = [
"quinn-proto",
"quinn-udp",
"rustc-hash",
"rustls 0.23.11",
"rustls 0.23.12",
"thiserror",
"tokio",
"tracing",
@ -4049,7 +4050,7 @@ dependencies = [
"rand",
"ring",
"rustc-hash",
"rustls 0.23.11",
"rustls 0.23.12",
"slab",
"thiserror",
"tinyvec",
@ -4058,14 +4059,13 @@ dependencies = [
[[package]]
name = "quinn-udp"
version = "0.5.2"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9096629c45860fc7fb143e125eb826b5e721e10be3263160c7d60ca832cf8c46"
checksum = "8bffec3605b73c6f1754535084a85229fa8a30f86014e6c81aeec4abb68b0285"
dependencies = [
"libc",
"once_cell",
"socket2",
"tracing",
"windows-sys 0.52.0",
]
@ -4270,7 +4270,7 @@ dependencies = [
"percent-encoding",
"pin-project-lite",
"quinn",
"rustls 0.23.11",
"rustls 0.23.12",
"rustls-native-certs 0.7.1",
"rustls-pemfile 2.1.2",
"rustls-pki-types",
@ -4380,21 +4380,21 @@ dependencies = [
"log",
"ring",
"rustls-pki-types",
"rustls-webpki 0.102.5",
"rustls-webpki 0.102.6",
"subtle",
"zeroize",
]
[[package]]
name = "rustls"
version = "0.23.11"
version = "0.23.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4828ea528154ae444e5a642dbb7d5623354030dc9822b83fd9bb79683c7399d0"
checksum = "c58f8c84392efc0a126acce10fa59ff7b3d2ac06ab451a33f2741989b806b044"
dependencies = [
"once_cell",
"ring",
"rustls-pki-types",
"rustls-webpki 0.102.5",
"rustls-webpki 0.102.6",
"subtle",
"zeroize",
]
@ -4461,9 +4461,9 @@ dependencies = [
[[package]]
name = "rustls-webpki"
version = "0.102.5"
version = "0.102.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9a6fccd794a42c2c105b513a2f62bc3fd8f3ba57a4593677ceb0bd035164d78"
checksum = "8e6b52d4fda176fd835fdc55a835d4a89b8499cad995885a21149d5ad62f852e"
dependencies = [
"ring",
"rustls-pki-types",
@ -4603,7 +4603,7 @@ checksum = "e0cd7e117be63d3c3678776753929474f3b04a43a080c744d6b0ae2a8c28e222"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -4656,7 +4656,7 @@ dependencies = [
"darling",
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -4755,9 +4755,9 @@ dependencies = [
[[package]]
name = "similar"
version = "2.5.0"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa42c91313f1d05da9b26f267f931cf178d4aba455b4c4622dd7355eb80c6640"
checksum = "1de1d4f81173b03af4c0cbed3c898f6bff5b870e4a7f5d6f4057d62a7a4b686e"
[[package]]
name = "siphasher"
@ -4826,7 +4826,7 @@ dependencies = [
"heck 0.5.0",
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -4892,7 +4892,7 @@ checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -5149,7 +5149,7 @@ dependencies = [
"proc-macro2",
"quote",
"rustversion",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -5171,9 +5171,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.71"
version = "2.0.72"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b146dcf730474b4bcd16c311627b31ede9ab149045db4d6088b3becaea046462"
checksum = "dc4b9b9bf2add8093d3f2c0204471e951b2285580335de42f9d2534f3ae7a8af"
dependencies = [
"proc-macro2",
"quote",
@ -5268,22 +5268,22 @@ dependencies = [
[[package]]
name = "thiserror"
version = "1.0.62"
version = "1.0.63"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2675633b1499176c2dff06b0856a27976a8f9d436737b4cf4f312d4d91d8bbb"
checksum = "c0342370b38b6a11b6cc11d6a805569958d54cfa061a29969c3b5ce2ea405724"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.62"
version = "1.0.63"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d20468752b09f49e909e55a5d338caa8bedf615594e9d80bc4c565d30faf798c"
checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -5406,22 +5406,21 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.38.0"
version = "1.39.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba4f4a02a7a80d6f274636f0aa95c7e383b912d41fe721a31f29e29698585a4a"
checksum = "d040ac2b29ab03b09d4129c2f5bbd012a3ac2f79d38ff506a4bf8dd34b0eac8a"
dependencies = [
"backtrace",
"bytes",
"libc",
"mio",
"num_cpus",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"tracing",
"windows-sys 0.48.0",
"windows-sys 0.52.0",
]
[[package]]
@ -5436,13 +5435,13 @@ dependencies = [
[[package]]
name = "tokio-macros"
version = "2.3.0"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a"
checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -5472,7 +5471,7 @@ version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4"
dependencies = [
"rustls 0.23.11",
"rustls 0.23.12",
"rustls-pki-types",
"tokio",
]
@ -5593,7 +5592,7 @@ dependencies = [
"proc-macro2",
"prost-build",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -5726,7 +5725,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -6042,7 +6041,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
"wasm-bindgen-shared",
]
@ -6076,7 +6075,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
@ -6428,7 +6427,7 @@ dependencies = [
"strum",
"subtle",
"syn 1.0.109",
"syn 2.0.71",
"syn 2.0.72",
"thrift",
"tokio",
"tokio-stream",
@ -6482,7 +6481,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]
@ -6502,7 +6501,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.71",
"syn 2.0.72",
]
[[package]]

View File

@ -0,0 +1,76 @@
//! End-to-end tests for the influxdb3_client
//!
//! This is useful for verifying that the client can parse API responses from the server
use influxdb3_client::{Format, LastCacheCreatedResponse, Precision};
use crate::TestServer;
#[tokio::test]
async fn write_and_query() {
let server = TestServer::spawn().await;
let db_name = "foo";
let tbl_name = "bar";
let client = influxdb3_client::Client::new(server.client_addr()).unwrap();
client
.api_v3_write_lp(db_name)
.precision(Precision::Nanosecond)
.accept_partial(false)
.body(format!("{tbl_name},t1=a,t2=aa f1=123"))
.send()
.await
.expect("make write_lp request");
client
.api_v3_query_sql(db_name, format!("SELECT * FROM {tbl_name}"))
.format(Format::Json)
.send()
.await
.expect("query SQL for JSON response");
client
.api_v3_query_influxql(db_name, format!("SELECT * FROM {tbl_name}"))
.format(Format::Csv)
.send()
.await
.expect("query InfluxQL for CSV response");
}
#[tokio::test]
async fn configure_last_caches() {
let server = TestServer::spawn().await;
let db_name = "foo";
let tbl_name = "bar";
let client = influxdb3_client::Client::new(server.client_addr()).unwrap();
client
.api_v3_write_lp(db_name)
.precision(Precision::Nanosecond)
.accept_partial(false)
.body(format!("{tbl_name},t1=a,t2=aa f1=123"))
.send()
.await
.expect("make write_lp request");
let Some(LastCacheCreatedResponse { name, .. }) = client
.api_v3_configure_last_cache_create(db_name, tbl_name)
.send()
.await
.expect("send create last cache with defaults")
else {
panic!("should have created the cache");
};
client
.api_v3_configure_last_cache_delete(db_name, tbl_name, name)
.await
.expect("deletes the cache");
let Some(LastCacheCreatedResponse { name, .. }) = client
.api_v3_configure_last_cache_create(db_name, tbl_name)
.value_columns(["f1"])
.send()
.await
.expect("send create last cache with explicit value columns")
else {
panic!("should have created the cache");
};
client
.api_v3_configure_last_cache_delete(db_name, tbl_name, name)
.await
.expect("should delete the cache");
}

View File

@ -14,6 +14,7 @@ use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use reqwest::Response;
mod auth;
mod client;
mod configure;
mod flight;
mod limits;

View File

@ -158,6 +158,7 @@ async fn last_caches_table() {
"db": db1_name,
"table": "mem",
"name": "mem_last_cache",
"value_columns": ["usage"],
"ttl": 60
}))
.await
@ -182,13 +183,14 @@ async fn last_caches_table() {
.await
.unwrap();
let batches = collect_stream(resp).await;
assert_batches_sorted_eq!([
"+-------+---------------------+----------------+----------------------------+-------+-------+",
"| table | name | key_columns | value_columns | count | ttl |",
"+-------+---------------------+----------------+----------------------------+-------+-------+",
"| cpu | cpu_host_last_cache | [host] | [cpu, region, time, usage] | 1 | 14400 |",
"| mem | mem_last_cache | [host, region] | [time, usage] | 1 | 60 |",
"+-------+---------------------+----------------+----------------------------+-------+-------+",
assert_batches_sorted_eq!(
[
"+-------+---------------------+----------------+---------------+-------+-------+",
"| table | name | key_columns | value_columns | count | ttl |",
"+-------+---------------------+----------------+---------------+-------+-------+",
"| cpu | cpu_host_last_cache | [host] | | 1 | 14400 |",
"| mem | mem_last_cache | [host, region] | [time, usage] | 1 | 60 |",
"+-------+---------------------+----------------+---------------+-------+-------+",
],
&batches
);
@ -205,7 +207,7 @@ async fn last_caches_table() {
"+-------+--------------------------------+---------------------+---------------+-------+-------+",
"| table | name | key_columns | value_columns | count | ttl |",
"+-------+--------------------------------+---------------------+---------------+-------+-------+",
"| cpu | cpu_cpu_host_region_last_cache | [cpu, host, region] | [time, usage] | 5 | 14400 |",
"| cpu | cpu_cpu_host_region_last_cache | [cpu, host, region] | | 5 | 14400 |",
"+-------+--------------------------------+---------------------+---------------+-------+-------+",
],
&batches
@ -246,7 +248,8 @@ async fn last_caches_table() {
}
// Add fields to one of the caches, in this case, the `temp` field will get added to the
// value columns for the respective cache:
// value columns for the respective cache, but since this cache accepts new fields, the value
// columns are not shown in the system table result:
{
server
.write_lp_to_db(
@ -265,11 +268,11 @@ async fn last_caches_table() {
.unwrap();
let batches = collect_stream(resp).await;
assert_batches_sorted_eq!([
"+-------+--------------------------------+---------------------+---------------------+-------+-------+",
"| table | name | key_columns | value_columns | count | ttl |",
"+-------+--------------------------------+---------------------+---------------------+-------+-------+",
"| cpu | cpu_cpu_host_region_last_cache | [cpu, host, region] | [time, usage, temp] | 5 | 14400 |",
"+-------+--------------------------------+---------------------+---------------------+-------+-------+",
"+-------+--------------------------------+---------------------+---------------+-------+-------+",
"| table | name | key_columns | value_columns | count | ttl |",
"+-------+--------------------------------+---------------------+---------------+-------+-------+",
"| cpu | cpu_cpu_host_region_last_cache | [cpu, host, region] | | 5 | 14400 |",
"+-------+--------------------------------+---------------------+---------------+-------+-------+",
],
&batches
);

View File

@ -713,13 +713,24 @@ pub struct LastCacheCreatedResponse {
/// Columns intended to be used as predicates in the cache
pub key_columns: Vec<String>,
/// Columns that store values in the cache
pub value_columns: Vec<String>,
pub value_columns: LastCacheValueColumnsDef,
/// The number of last values to hold in the cache
pub count: usize,
/// The time-to-live (TTL) in seconds for entries in the cache
pub ttl: u64,
}
/// A last cache will either store values for an explicit set of columns, or will accept all
/// non-key columns
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum LastCacheValueColumnsDef {
/// Explicit list of column names
Explicit { columns: Vec<String> },
/// Stores all non-key columns
AllNonKeyColumns,
}
#[cfg(test)]
mod tests {
use mockito::{Matcher, Server};
@ -989,7 +1000,10 @@ mod tests {
"table": "table",
"name": "cache_name",
"key_columns": ["col1", "col2"],
"value_columns": ["col3", "col4"],
"value_columns": {
"type": "explicit",
"columns": ["col3", "col4"]
},
"ttl": 120,
"count": 5
}"#,

View File

@ -4,7 +4,10 @@ use arrow::array::{GenericListBuilder, StringBuilder};
use arrow_array::{ArrayRef, RecordBatch, StringArray, UInt64Array};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion::{error::DataFusionError, logical_expr::Expr};
use influxdb3_write::{catalog::LastCacheDefinition, last_cache::LastCacheProvider};
use influxdb3_write::{
catalog::{LastCacheDefinition, LastCacheValueColumnsDef},
last_cache::LastCacheProvider,
};
use iox_system_tables::IoxSystemTable;
pub(super) struct LastCachesTable {
@ -35,7 +38,7 @@ fn last_caches_schema() -> SchemaRef {
Field::new(
"value_columns",
DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
false,
true,
),
Field::new("count", DataType::UInt64, false),
Field::new("ttl", DataType::UInt64, false),
@ -98,10 +101,17 @@ fn from_last_cache_definitions(
let mut builder = GenericListBuilder::<i32, _>::new(values_builder);
for c in caches {
c.value_columns
.iter()
.for_each(|v| builder.values().append_value(v));
builder.append(true);
match &c.value_columns {
LastCacheValueColumnsDef::Explicit { columns } => {
columns
.iter()
.for_each(|v| builder.values().append_value(v));
builder.append(true);
}
LastCacheValueColumnsDef::AllNonKeyColumns => {
builder.append_null();
}
}
}
Arc::new(builder.finish())
});

View File

@ -375,7 +375,7 @@ pub struct LastCacheDefinition {
/// Columns intended to be used as predicates in the cache
pub key_columns: Vec<String>,
/// Columns that store values in the cache
pub value_columns: Vec<String>,
pub value_columns: LastCacheValueColumnsDef,
/// The number of last values to hold in the cache
pub count: LastCacheSize,
/// The time-to-live (TTL) in seconds for entries in the cache
@ -397,7 +397,9 @@ impl LastCacheDefinition {
table: table.into(),
name: name.into(),
key_columns: key_columns.into_iter().map(Into::into).collect(),
value_columns: value_columns.into_iter().map(Into::into).collect(),
value_columns: LastCacheValueColumnsDef::Explicit {
columns: value_columns.into_iter().map(Into::into).collect(),
},
count: count.try_into()?,
ttl,
})
@ -412,19 +414,36 @@ impl LastCacheDefinition {
table: table.into(),
name: name.into(),
key_columns: cache.key_columns.iter().cloned().collect(),
value_columns: cache
.schema
.fields()
.iter()
.filter(|f| !cache.key_columns.contains(f.name()))
.map(|f| f.name().to_owned())
.collect(),
value_columns: if cache.accept_new_fields {
LastCacheValueColumnsDef::AllNonKeyColumns
} else {
LastCacheValueColumnsDef::Explicit {
columns: cache
.schema
.fields()
.iter()
.filter(|f| !cache.key_columns.contains(f.name()))
.map(|f| f.name().to_owned())
.collect(),
}
},
count: cache.count,
ttl: cache.ttl.as_secs(),
}
}
}
/// A last cache will either store values for an explicit set of columns, or will accept all
/// non-key columns
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum LastCacheValueColumnsDef {
/// Explicit list of column names
Explicit { columns: Vec<String> },
/// Stores all non-key columns
AllNonKeyColumns,
}
/// The maximum allowed size for a last cache
pub const LAST_CACHE_MAX_SIZE: usize = 10;

View File

@ -4,7 +4,7 @@ use arrow::datatypes::DataType as ArrowDataType;
use schema::{InfluxColumnType, SchemaBuilder};
use serde::{Deserialize, Serialize};
use super::{LastCacheDefinition, TableDefinition};
use super::{LastCacheDefinition, LastCacheValueColumnsDef, TableDefinition};
impl Serialize for TableDefinition {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
@ -267,7 +267,7 @@ struct LastCacheSnapshot<'a> {
table: &'a str,
name: &'a str,
keys: Vec<&'a str>,
vals: Vec<&'a str>,
vals: Option<Vec<&'a str>>,
n: usize,
ttl: u64,
}
@ -278,7 +278,12 @@ impl<'a> From<&'a LastCacheDefinition> for LastCacheSnapshot<'a> {
table: &lcd.table,
name: &lcd.name,
keys: lcd.key_columns.iter().map(|v| v.as_str()).collect(),
vals: lcd.value_columns.iter().map(|v| v.as_str()).collect(),
vals: match &lcd.value_columns {
LastCacheValueColumnsDef::Explicit { columns } => {
Some(columns.iter().map(|v| v.as_str()).collect())
}
LastCacheValueColumnsDef::AllNonKeyColumns => None,
},
n: lcd.count.into(),
ttl: lcd.ttl,
}
@ -291,7 +296,12 @@ impl<'a> From<LastCacheSnapshot<'a>> for LastCacheDefinition {
table: snap.table.to_string(),
name: snap.name.to_string(),
key_columns: snap.keys.iter().map(|s| s.to_string()).collect(),
value_columns: snap.vals.iter().map(|s| s.to_string()).collect(),
value_columns: match snap.vals {
Some(cols) => LastCacheValueColumnsDef::Explicit {
columns: cols.iter().map(|s| s.to_string()).collect(),
},
None => LastCacheValueColumnsDef::AllNonKeyColumns,
},
count: snap
.n
.try_into()

View File

@ -27,7 +27,7 @@ use parking_lot::RwLock;
use schema::{InfluxColumnType, InfluxFieldType, Schema, TIME_COLUMN_NAME};
use crate::{
catalog::{InnerCatalog, LastCacheDefinition, LastCacheSize},
catalog::{InnerCatalog, LastCacheDefinition, LastCacheSize, LastCacheValueColumnsDef},
write_buffer::{buffer_segment::WriteBatch, Field, FieldData, Row},
};
@ -131,7 +131,11 @@ impl LastCacheProvider {
count: Some(cache_def.count.into()),
ttl: Some(Duration::from_secs(cache_def.ttl)),
key_columns: Some(cache_def.key_columns.clone()),
value_columns: Some(cache_def.value_columns.clone()),
value_columns: match &cache_def.value_columns {
LastCacheValueColumnsDef::Explicit { columns } =>
Some(columns.clone()),
LastCacheValueColumnsDef::AllNonKeyColumns => None,
},
})?
.is_some(),
"catalog should not contain duplicate last cache definitions"
@ -337,7 +341,13 @@ impl LastCacheProvider {
table: tbl_name,
name: cache_name,
key_columns,
value_columns,
value_columns: if accept_new_fields {
LastCacheValueColumnsDef::AllNonKeyColumns
} else {
LastCacheValueColumnsDef::Explicit {
columns: value_columns,
}
},
count,
ttl: ttl.as_secs(),
}))
@ -471,14 +481,14 @@ pub(crate) struct LastCache {
pub(crate) key_columns: Arc<IndexSet<String>>,
/// The Arrow Schema for the table that this cache is associated with
pub(crate) schema: ArrowSchemaRef,
/// Whether or not this cache accepts newly written fields
pub(crate) accept_new_fields: bool,
/// Optionally store the series key for tables that use it for ensuring non-nullability in the
/// column buffer for series key columns
///
/// We only use this to check for columns that are part of the series key, so we don't care
/// about the order, and a HashSet is sufficient.
series_key: Option<HashSet<String>>,
/// Whether or not this cache accepts newly written fields
accept_new_fields: bool,
/// The internal state of the cache
state: LastCacheState,
}

View File

@ -49,10 +49,7 @@ expression: catalog_json
"name": "cache",
"table": "table",
"ttl": 14400,
"vals": [
"f1",
"time"
]
"vals": null
}
],
"name": "table"

View File

@ -44,10 +44,7 @@ expression: catalog_json
"name": "cache",
"table": "table",
"ttl": 14400,
"vals": [
"f1",
"time"
]
"vals": null
}
],
"name": "table"