refactor: rename metadata cache to distinct value cache (#25775)

pull/25781/head
Trevor Hilton 2025-01-10 08:48:51 -05:00 committed by GitHub
parent 7b4703d69e
commit c71dafc313
28 changed files with 467 additions and 438 deletions

View File

@ -39,7 +39,7 @@ impl Config {
},
..
})
| SubCommand::MetaCache(MetaCacheConfig {
| SubCommand::DistinctCache(DistinctCacheConfig {
influxdb3_config:
InfluxDb3Config {
host_url,
@ -95,9 +95,9 @@ pub enum SubCommand {
/// Create a new last value cache
#[clap(name = "last_cache")]
LastCache(LastCacheConfig),
/// Create a new metadata cache
#[clap(name = "meta_cache")]
MetaCache(MetaCacheConfig),
/// Create a new distinct value cache
#[clap(name = "distinct_cache")]
DistinctCache(DistinctCacheConfig),
/// Create a new processing engine plugin
Plugin(PluginConfig),
/// Create a new table in a database
@ -167,7 +167,7 @@ pub struct LastCacheConfig {
}
#[derive(Debug, clap::Args)]
pub struct MetaCacheConfig {
pub struct DistinctCacheConfig {
#[clap(flatten)]
influxdb3_config: InfluxDb3Config,
@ -298,7 +298,7 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
None => println!("a cache already exists for the provided parameters"),
}
}
SubCommand::MetaCache(MetaCacheConfig {
SubCommand::DistinctCache(DistinctCacheConfig {
influxdb3_config: InfluxDb3Config { database_name, .. },
table,
cache_name,
@ -306,7 +306,8 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
max_cardinality,
max_age,
}) => {
let mut b = client.api_v3_configure_meta_cache_create(database_name, table, columns);
let mut b =
client.api_v3_configure_distinct_cache_create(database_name, table, columns);
// Add the optional stuff:
if let Some(name) = cache_name {
@ -323,7 +324,7 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
Some(def) => println!(
"new cache created: {}",
serde_json::to_string_pretty(&def)
.expect("serialize meta cache definition as JSON")
.expect("serialize distinct cache definition as JSON")
),
None => println!("a cache already exists for the provided parameters"),
}

View File

@ -29,7 +29,7 @@ impl Config {
},
..
})
| SubCommand::MetaCache(MetaCacheConfig {
| SubCommand::DistinctCache(DistinctCacheConfig {
influxdb3_config:
InfluxDb3Config {
host_url,
@ -82,9 +82,9 @@ pub enum SubCommand {
/// Delete a last value cache
#[clap(name = "last_cache")]
LastCache(LastCacheConfig),
/// Delete a meta value cache
#[clap(name = "meta_cache")]
MetaCache(MetaCacheConfig),
/// Delete a distinct value cache
#[clap(name = "distinct_cache")]
DistinctCache(DistinctCacheConfig),
/// Delete an existing processing engine plugin
Plugin(PluginConfig),
/// Delete a table in a database
@ -128,7 +128,7 @@ pub struct LastCacheConfig {
}
#[derive(Debug, clap::Args)]
pub struct MetaCacheConfig {
pub struct DistinctCacheConfig {
#[clap(flatten)]
influxdb3_config: InfluxDb3Config,
@ -203,16 +203,16 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
println!("last cache deleted successfully");
}
SubCommand::MetaCache(MetaCacheConfig {
SubCommand::DistinctCache(DistinctCacheConfig {
influxdb3_config: InfluxDb3Config { database_name, .. },
table,
cache_name,
}) => {
client
.api_v3_configure_meta_cache_delete(database_name, table, cache_name)
.api_v3_configure_distinct_cache_delete(database_name, table, cache_name)
.await?;
println!("meta cache deleted successfully");
println!("distinct cache deleted successfully");
}
SubCommand::Plugin(PluginConfig {
influxdb3_config: InfluxDb3Config { database_name, .. },

View File

@ -3,8 +3,8 @@
use anyhow::{bail, Context};
use datafusion_util::config::register_iox_object_store;
use influxdb3_cache::{
distinct_cache::DistinctCacheProvider,
last_cache::{self, LastCacheProvider},
meta_cache::MetaCacheProvider,
parquet_cache::create_cached_obj_store_and_oracle,
};
use influxdb3_clap_blocks::{
@ -95,8 +95,8 @@ pub enum Error {
#[error("failed to initialize last cache: {0}")]
InitializeLastCache(#[source] last_cache::Error),
#[error("failed to initialize meta cache: {0:#}")]
InitializeMetaCache(#[source] influxdb3_cache::meta_cache::ProviderError),
#[error("failed to initialize distinct cache: {0:#}")]
InitializeDistinctCache(#[source] influxdb3_cache::distinct_cache::ProviderError),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -285,15 +285,15 @@ pub struct Config {
)]
pub last_cache_eviction_interval: humantime::Duration,
/// The interval on which to evict expired entries from the Last-N-Value cache, expressed as a
/// The interval on which to evict expired entries from the Distinct Value cache, expressed as a
/// human-readable time, e.g., "20s", "1m", "1h".
#[clap(
long = "meta-cache-eviction-interval",
env = "INFLUXDB3_META_CACHE_EVICTION_INTERVAL",
long = "distinct-cache-eviction-interval",
env = "INFLUXDB3_DISTINCT_CACHE_EVICTION_INTERVAL",
default_value = "10s",
action
)]
pub meta_cache_eviction_interval: humantime::Duration,
pub distinct_cache_eviction_interval: humantime::Duration,
/// The local directory that has python plugins and their test files.
#[clap(long = "plugin-dir", env = "INFLUXDB3_PLUGIN_DIR", action)]
@ -486,18 +486,18 @@ pub async fn command(config: Config) -> Result<()> {
)
.map_err(Error::InitializeLastCache)?;
let meta_cache = MetaCacheProvider::new_from_catalog_with_background_eviction(
let distinct_cache = DistinctCacheProvider::new_from_catalog_with_background_eviction(
Arc::clone(&time_provider) as _,
Arc::clone(&catalog),
config.meta_cache_eviction_interval.into(),
config.distinct_cache_eviction_interval.into(),
)
.map_err(Error::InitializeMetaCache)?;
.map_err(Error::InitializeDistinctCache)?;
let write_buffer_impl = WriteBufferImpl::new(WriteBufferImplArgs {
persister: Arc::clone(&persister),
catalog: Arc::clone(&catalog),
last_cache,
meta_cache,
distinct_cache,
time_provider: Arc::<SystemProvider>::clone(&time_provider),
executor: Arc::clone(&exec),
wal_config,

View File

@ -364,7 +364,7 @@ async fn test_delete_missing_table() {
}
#[tokio::test]
async fn test_create_delete_meta_cache() {
async fn test_create_delete_distinct_cache() {
let server = TestServer::spawn().await;
let server_addr = server.client_addr();
let db_name = "foo";
@ -381,7 +381,7 @@ async fn test_create_delete_meta_cache() {
// first create the cache:
let result = run(&[
"create",
"meta_cache",
"distinct_cache",
"--host",
&server_addr,
"--database",
@ -396,7 +396,7 @@ async fn test_create_delete_meta_cache() {
// doing the same thing over again will be a no-op
let result = run(&[
"create",
"meta_cache",
"distinct_cache",
"--host",
&server_addr,
"--database",
@ -414,7 +414,7 @@ async fn test_create_delete_meta_cache() {
// now delete it:
let result = run(&[
"delete",
"meta_cache",
"distinct_cache",
"--host",
&server_addr,
"--database",
@ -423,11 +423,11 @@ async fn test_create_delete_meta_cache() {
table_name,
cache_name,
]);
assert_contains!(&result, "meta cache deleted successfully");
assert_contains!(&result, "distinct cache deleted successfully");
// trying to delete again should result in an error as the cache no longer exists:
let result = run_and_err(&[
"delete",
"meta_cache",
"distinct_cache",
"--host",
&server_addr,
"--database",
@ -438,6 +438,7 @@ async fn test_create_delete_meta_cache() {
]);
assert_contains!(&result, "[404 Not Found]: cache not found");
}
#[test_log::test(tokio::test)]
async fn test_create_plugin() {
let server = TestServer::spawn().await;
@ -790,7 +791,7 @@ fn test_create_token() {
}
#[tokio::test]
async fn meta_cache_create_and_delete() {
async fn distinct_cache_create_and_delete() {
let server = TestServer::spawn().await;
let db_name = "foo";
let server_addr = server.client_addr();
@ -805,7 +806,7 @@ async fn meta_cache_create_and_delete() {
let result = run_with_confirmation(&[
"create",
"meta_cache",
"distinct_cache",
"-H",
&server_addr,
"-d",
@ -825,7 +826,7 @@ async fn meta_cache_create_and_delete() {
let result = run_with_confirmation(&[
"delete",
"meta_cache",
"distinct_cache",
"-H",
&server_addr,
"-d",

View File

@ -7,11 +7,11 @@ use test_helpers::assert_contains;
use crate::TestServer;
#[tokio::test]
async fn api_v3_configure_meta_cache_create() {
async fn api_v3_configure_distinct_cache_create() {
let server = TestServer::spawn().await;
let client = reqwest::Client::new();
let url = format!(
"{base}/api/v3/configure/meta_cache",
"{base}/api/v3/configure/distinct_cache",
base = server.client_addr()
);
@ -157,7 +157,7 @@ async fn api_v3_configure_meta_cache_create() {
.json(&body)
.send()
.await
.expect("send request to create meta cache");
.expect("send request to create distinct cache");
let status = resp.status();
assert_eq!(
tc.expected,
@ -169,11 +169,11 @@ async fn api_v3_configure_meta_cache_create() {
}
#[tokio::test]
async fn api_v3_configure_meta_cache_delete() {
async fn api_v3_configure_distinct_cache_delete() {
let server = TestServer::spawn().await;
let client = reqwest::Client::new();
let url = format!(
"{base}/api/v3/configure/meta_cache",
"{base}/api/v3/configure/distinct_cache",
base = server.client_addr()
);
@ -210,7 +210,7 @@ async fn api_v3_configure_meta_cache_delete() {
use Request::*;
let mut test_cases = [
TestCase {
description: "create a metadata cache",
description: "create a distinct cache",
request: Create(serde_json::json!({
"db": db_name,
"table": tbl_name,

View File

@ -117,8 +117,8 @@ async fn flight() -> Result<(), influxdb3_client::Error> {
"| public | information_schema | tables | VIEW |",
"| public | information_schema | views | VIEW |",
"| public | iox | cpu | BASE TABLE |",
"| public | system | distinct_caches | BASE TABLE |",
"| public | system | last_caches | BASE TABLE |",
"| public | system | meta_caches | BASE TABLE |",
"| public | system | parquet_files | BASE TABLE |",
"| public | system | processing_engine_plugins | BASE TABLE |",
"| public | system | processing_engine_triggers | BASE TABLE |",

View File

@ -351,34 +351,34 @@ impl TestServer {
.expect("failed to send request to delete last cache")
}
pub async fn api_v3_configure_meta_cache_create(
pub async fn api_v3_configure_distinct_cache_create(
&self,
request: &serde_json::Value,
) -> Response {
self.http_client
.post(format!(
"{base}/api/v3/configure/meta_cache",
"{base}/api/v3/configure/distinct_cache",
base = self.client_addr()
))
.json(request)
.send()
.await
.expect("failed to send request to create metadata cache")
.expect("failed to send request to create distinct cache")
}
pub async fn api_v3_configure_meta_cache_delete(
pub async fn api_v3_configure_distinct_cache_delete(
&self,
request: &serde_json::Value,
) -> Response {
self.http_client
.delete(format!(
"{base}/api/v3/configure/meta_cache",
"{base}/api/v3/configure/distinct_cache",
base = self.client_addr()
))
.json(request)
.send()
.await
.expect("failed to send request to delete metadata cache")
.expect("failed to send request to delete distinct cache")
}
}
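For reference, a rough standalone sketch (not the project's test helper above) of exercising the renamed endpoint with reqwest; the server address and the cache parameters are assumptions for illustration:

```rust
// Hypothetical example of calling /api/v3/configure/distinct_cache directly.
// The host/port and the database, table, column, and cache names are assumed values.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = reqwest::Client::new();
    let base = "http://localhost:8181"; // assumed local server address
    // POST creates a distinct value cache on the given columns:
    let resp = client
        .post(format!("{base}/api/v3/configure/distinct_cache"))
        .json(&serde_json::json!({
            "db": "foo",
            "table": "cpu",
            "columns": ["region", "host"],
        }))
        .send()
        .await?;
    println!("create: {}", resp.status());
    // DELETE on the same path removes a cache by name:
    let resp = client
        .delete(format!("{base}/api/v3/configure/distinct_cache"))
        .json(&serde_json::json!({
            "db": "foo",
            "table": "cpu",
            "name": "cpu_region_host_distinct_cache",
        }))
        .send()
        .await?;
    println!("delete: {}", resp.status());
    Ok(())
}
```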

View File

@ -1584,7 +1584,7 @@ async fn api_v1_query_uri_and_body() {
}
#[tokio::test]
async fn api_v3_query_sql_meta_cache() {
async fn api_v3_query_sql_distinct_cache() {
let server = TestServer::spawn().await;
server
.write_lp_to_db("foo", "cpu,region=us,host=a usage=99", Precision::Second)
@ -1594,7 +1594,7 @@ async fn api_v3_query_sql_meta_cache() {
server
.http_client
.post(format!(
"{base}/api/v3/configure/meta_cache",
"{base}/api/v3/configure/distinct_cache",
base = server.client_addr()
))
.json(&serde_json::json!({
@ -1623,7 +1623,7 @@ async fn api_v3_query_sql_meta_cache() {
.api_v3_query_sql(&[
("db", "foo"),
("format", "pretty"),
("q", "SELECT * FROM meta_cache('cpu')"),
("q", "SELECT * FROM distinct_cache('cpu')"),
])
.await
.text()
@ -1647,7 +1647,7 @@ async fn api_v3_query_sql_meta_cache() {
.api_v3_query_sql(&[
("db", "foo"),
("format", "json"),
("q", "SELECT * FROM meta_cache('cpu')"),
("q", "SELECT * FROM distinct_cache('cpu')"),
])
.await
.json::<Value>()

View File

@ -0,0 +1,5 @@
---
source: influxdb3/tests/server/cli.rs
expression: result
---
distinct cache deleted successfully

View File

@ -0,0 +1,5 @@
---
source: influxdb3/tests/server/cli.rs
expression: result
---
"new cache created: {\n \"table_id\": 0,\n \"table_name\": \"cpu\",\n \"cache_name\": \"cache_money\",\n \"column_ids\": [\n 0,\n 1\n ],\n \"max_cardinality\": 20000,\n \"max_age_seconds\": 200\n}"

View File

@ -280,7 +280,7 @@ async fn last_caches_table() {
}
#[tokio::test]
async fn meta_caches_table() {
async fn distinct_caches_table() {
let server = TestServer::spawn().await;
let db_1_name = "foo";
let db_2_name = "bar";
@ -308,21 +308,21 @@ async fn meta_caches_table() {
.await
.unwrap();
// check that there are no meta caches:
// check that there are no distinct caches:
for db_name in [db_1_name, db_2_name] {
let response_stream = server
.flight_sql_client(db_name)
.await
.query("SELECT * FROM system.meta_caches")
.query("SELECT * FROM system.distinct_caches")
.await
.unwrap();
let batches = collect_stream(response_stream).await;
assert_batches_sorted_eq!(["++", "++",], &batches);
}
// create some metadata caches on the two databases:
// create some distinct caches on the two databases:
assert!(server
.api_v3_configure_meta_cache_create(&json!({
.api_v3_configure_distinct_cache_create(&json!({
"db": db_1_name,
"table": "cpu",
"columns": ["region", "host"],
@ -331,7 +331,7 @@ async fn meta_caches_table() {
.status()
.is_success());
assert!(server
.api_v3_configure_meta_cache_create(&json!({
.api_v3_configure_distinct_cache_create(&json!({
"db": db_1_name,
"table": "mem",
"columns": ["region", "host"],
@ -341,7 +341,7 @@ async fn meta_caches_table() {
.status()
.is_success());
assert!(server
.api_v3_configure_meta_cache_create(&json!({
.api_v3_configure_distinct_cache_create(&json!({
"db": db_2_name,
"table": "cpu",
"columns": ["host"],
@ -356,51 +356,51 @@ async fn meta_caches_table() {
let response_stream = server
.flight_sql_client(db_1_name)
.await
.query("SELECT * FROM system.meta_caches")
.query("SELECT * FROM system.distinct_caches")
.await
.unwrap();
let batches = collect_stream(response_stream).await;
assert_batches_sorted_eq!([
"+-------+----------------------------+------------+----------------+-----------------+-----------------+",
"| table | name | column_ids | column_names | max_cardinality | max_age_seconds |",
"+-------+----------------------------+------------+----------------+-----------------+-----------------+",
"| cpu | cpu_region_host_meta_cache | [0, 1] | [region, host] | 100000 | 86400 |",
"| mem | mem_region_host_meta_cache | [4, 5] | [region, host] | 1000 | 86400 |",
"+-------+----------------------------+------------+----------------+-----------------+-----------------+",
"+-------+--------------------------------+------------+----------------+-----------------+-----------------+",
"| table | name | column_ids | column_names | max_cardinality | max_age_seconds |",
"+-------+--------------------------------+------------+----------------+-----------------+-----------------+",
"| cpu | cpu_region_host_distinct_cache | [0, 1] | [region, host] | 100000 | 86400 |",
"| mem | mem_region_host_distinct_cache | [4, 5] | [region, host] | 1000 | 86400 |",
"+-------+--------------------------------+------------+----------------+-----------------+-----------------+",
], &batches);
}
{
let response_stream = server
.flight_sql_client(db_2_name)
.await
.query("SELECT * FROM system.meta_caches")
.query("SELECT * FROM system.distinct_caches")
.await
.unwrap();
let batches = collect_stream(response_stream).await;
assert_batches_sorted_eq!([
"+-------+---------------------+------------+--------------+-----------------+-----------------+",
"| table | name | column_ids | column_names | max_cardinality | max_age_seconds |",
"+-------+---------------------+------------+--------------+-----------------+-----------------+",
"| cpu | cpu_host_meta_cache | [9] | [host] | 100000 | 1000 |",
"+-------+---------------------+------------+--------------+-----------------+-----------------+",
"+-------+-------------------------+------------+--------------+-----------------+-----------------+",
"| table | name | column_ids | column_names | max_cardinality | max_age_seconds |",
"+-------+-------------------------+------------+--------------+-----------------+-----------------+",
"| cpu | cpu_host_distinct_cache | [9] | [host] | 100000 | 1000 |",
"+-------+-------------------------+------------+--------------+-----------------+-----------------+",
], &batches);
}
// delete caches and check that the system tables reflect those changes:
assert!(server
.api_v3_configure_meta_cache_delete(&json!({
.api_v3_configure_distinct_cache_delete(&json!({
"db": db_1_name,
"table": "cpu",
"name": "cpu_region_host_meta_cache",
"name": "cpu_region_host_distinct_cache",
}))
.await
.status()
.is_success());
assert!(server
.api_v3_configure_meta_cache_delete(&json!({
.api_v3_configure_distinct_cache_delete(&json!({
"db": db_2_name,
"table": "cpu",
"name": "cpu_host_meta_cache",
"name": "cpu_host_distinct_cache",
}))
.await
.status()
@ -411,23 +411,23 @@ async fn meta_caches_table() {
let response_stream = server
.flight_sql_client(db_1_name)
.await
.query("SELECT * FROM system.meta_caches")
.query("SELECT * FROM system.distinct_caches")
.await
.unwrap();
let batches = collect_stream(response_stream).await;
assert_batches_sorted_eq!([
"+-------+----------------------------+------------+----------------+-----------------+-----------------+",
"| table | name | column_ids | column_names | max_cardinality | max_age_seconds |",
"+-------+----------------------------+------------+----------------+-----------------+-----------------+",
"| mem | mem_region_host_meta_cache | [4, 5] | [region, host] | 1000 | 86400 |",
"+-------+----------------------------+------------+----------------+-----------------+-----------------+",
"+-------+--------------------------------+------------+----------------+-----------------+-----------------+",
"| table | name | column_ids | column_names | max_cardinality | max_age_seconds |",
"+-------+--------------------------------+------------+----------------+-----------------+-----------------+",
"| mem | mem_region_host_distinct_cache | [4, 5] | [region, host] | 1000 | 86400 |",
"+-------+--------------------------------+------------+----------------+-----------------+-----------------+",
], &batches);
}
{
let response_stream = server
.flight_sql_client(db_2_name)
.await
.query("SELECT * FROM system.meta_caches")
.query("SELECT * FROM system.distinct_caches")
.await
.unwrap();
let batches = collect_stream(response_stream).await;

View File

@ -14,7 +14,7 @@ use arrow::{
use indexmap::IndexMap;
use influxdb3_catalog::catalog::TableDefinition;
use influxdb3_id::{ColumnId, TableId};
use influxdb3_wal::{FieldData, MetaCacheDefinition, Row};
use influxdb3_wal::{DistinctCacheDefinition, FieldData, Row};
use iox_time::TimeProvider;
use schema::{InfluxColumnType, InfluxFieldType};
use serde::Deserialize;
@ -24,7 +24,7 @@ pub enum CacheError {
#[error("must pass a non-empty set of column ids")]
EmptyColumnSet,
#[error(
"cannot use a column of type {attempted} in a metadata cache, only \
"cannot use a column of type {attempted} in a distinct value cache, only \
tags and string fields can be used"
)]
NonTagOrStringColumn { attempted: InfluxColumnType },
@ -34,9 +34,9 @@ pub enum CacheError {
Unexpected(#[from] anyhow::Error),
}
/// A metadata cache for storing distinct values for a set of columns in a table
/// A cache for storing distinct values for a set of columns in a table
#[derive(Debug)]
pub(crate) struct MetaCache {
pub(crate) struct DistinctCache {
time_provider: Arc<dyn TimeProvider>,
/// The maximum number of unique value combinations in the cache
max_cardinality: usize,
@ -45,23 +45,23 @@ pub(crate) struct MetaCache {
/// The fixed Arrow schema used to produce record batches from the cache
schema: SchemaRef,
/// Holds current state of the cache
state: MetaCacheState,
state: DistinctCacheState,
/// The identifiers of the columns used in the cache
column_ids: Vec<ColumnId>,
/// The cache data, stored in a tree
data: Node,
}
/// Type for tracking the current state of a [`MetaCache`]
/// Type for tracking the current state of a [`DistinctCache`]
#[derive(Debug, Default)]
struct MetaCacheState {
struct DistinctCacheState {
/// The current number of unique value combinations in the cache
cardinality: usize,
}
/// Arguments to create a new [`MetaCache`]
/// Arguments to create a new [`DistinctCache`]
#[derive(Debug)]
pub struct CreateMetaCacheArgs {
pub struct CreateDistinctCacheArgs {
pub table_def: Arc<TableDefinition>,
pub max_cardinality: MaxCardinality,
pub max_age: MaxAge,
@ -130,19 +130,19 @@ impl MaxAge {
}
}
impl MetaCache {
/// Create a new [`MetaCache`]
impl DistinctCache {
/// Create a new [`DistinctCache`]
///
/// Must pass a non-empty set of [`ColumnId`]s which correspond to valid columns in the provided
/// [`TableDefinition`].
pub(crate) fn new(
time_provider: Arc<dyn TimeProvider>,
CreateMetaCacheArgs {
CreateDistinctCacheArgs {
table_def,
max_cardinality,
max_age,
column_ids,
}: CreateMetaCacheArgs,
}: CreateDistinctCacheArgs,
) -> Result<Self, CacheError> {
if column_ids.is_empty() {
return Err(CacheError::EmptyColumnSet);
@ -151,7 +151,7 @@ impl MetaCache {
let mut builder = SchemaBuilder::new();
for id in &column_ids {
let col = table_def.columns.get(id).with_context(|| {
format!("invalid column id ({id}) encountered while creating metadata cache")
format!("invalid column id ({id}) encountered while creating distinct value cache")
})?;
let data_type = match col.data_type {
InfluxColumnType::Tag | InfluxColumnType::Field(InfluxFieldType::String) => {
@ -166,7 +166,7 @@ impl MetaCache {
time_provider,
max_cardinality: max_cardinality.into(),
max_age: max_age.into(),
state: MetaCacheState::default(),
state: DistinctCacheState::default(),
schema: Arc::new(builder.finish()),
column_ids,
data: Node::default(),
@ -341,14 +341,14 @@ impl MetaCache {
Ok(())
}
/// Create a [`MetaCacheDefinition`] from this cache along with the given args
/// Create a [`DistinctCacheDefinition`] from this cache along with the given args
pub(super) fn to_definition(
&self,
table_id: TableId,
table_name: Arc<str>,
cache_name: Arc<str>,
) -> MetaCacheDefinition {
MetaCacheDefinition {
) -> DistinctCacheDefinition {
DistinctCacheDefinition {
table_id,
table_name,
cache_name,
@ -359,7 +359,7 @@ impl MetaCache {
}
}
/// A node in the `data` tree of a [`MetaCache`]
/// A node in the `data` tree of a [`DistinctCache`]
///
/// Recursive struct holding a [`BTreeMap`] whose keys are the values nested under this node, and
/// whose values hold the last seen time as an [`i64`] of each value, and an optional reference to
@ -541,7 +541,7 @@ impl From<&FieldData> for Value {
| FieldData::Integer(_)
| FieldData::UInteger(_)
| FieldData::Float(_)
| FieldData::Boolean(_) => panic!("unexpected field type for metadata cache"),
| FieldData::Boolean(_) => panic!("unexpected field type for distinct value cache"),
}
}
}
@ -552,7 +552,7 @@ impl From<String> for Value {
}
}
/// A predicate that can be applied when gathering [`RecordBatch`]es from a [`MetaCache`]
/// A predicate that can be applied when gathering [`RecordBatch`]es from a [`DistinctCache`]
///
/// This is intended to be derived from a set of filter expressions in Datafusion by analyzing
/// them with a `LiteralGuarantee`.
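To make the tree structure described above more concrete, here is a toy model of the idea (not the crate's implementation): nested `BTreeMap`s keyed by column values, with a last-seen timestamp per value so expired entries can be evicted and the cardinality can be counted.

```rust
use std::collections::BTreeMap;

// Toy model of the distinct value cache's data tree: each level corresponds to
// one cached column, and each value remembers when it was last seen.
#[derive(Default, Debug)]
struct Node {
    // value -> (last seen time in nanoseconds, values nested under it)
    children: BTreeMap<String, (i64, Node)>,
}

impl Node {
    // Insert one combination of column values, e.g. ["us-east", "a"]:
    fn insert(&mut self, values: &[String], now_ns: i64) {
        if let Some((first, rest)) = values.split_first() {
            let entry = self.children.entry(first.clone()).or_default();
            entry.0 = now_ns;
            entry.1.insert(rest, now_ns);
        }
    }

    // Count unique value combinations at the leaves (the cache's cardinality):
    fn cardinality(&self) -> usize {
        if self.children.is_empty() {
            1
        } else {
            self.children.values().map(|(_, n)| n.cardinality()).sum()
        }
    }
}

fn main() {
    let mut root = Node::default();
    for (region, host) in [("us-east", "a"), ("us-east", "b"), ("us-west", "c")] {
        root.insert(&[region.to_string(), host.to_string()], 0);
    }
    assert_eq!(root.cardinality(), 3);
}
```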

View File

@ -1,12 +1,12 @@
//! The Metadata Cache holds the distinct values for a column or set of columns on a table
//! The Distinct Value Cache holds the distinct values for a column or set of columns on a table
mod cache;
pub use cache::{CacheError, CreateMetaCacheArgs, MaxAge, MaxCardinality};
pub use cache::{CacheError, CreateDistinctCacheArgs, MaxAge, MaxCardinality};
mod provider;
pub use provider::{MetaCacheProvider, ProviderError};
pub use provider::{DistinctCacheProvider, ProviderError};
mod table_function;
pub use table_function::MetaCacheFunction;
pub use table_function::META_CACHE_UDTF_NAME;
pub use table_function::DistinctCacheFunction;
pub use table_function::DISTINCT_CACHE_UDTF_NAME;
#[cfg(test)]
mod tests {
@ -18,9 +18,9 @@ mod tests {
use std::{sync::Arc, time::Duration};
use crate::{
meta_cache::{
cache::{CreateMetaCacheArgs, MaxAge, MaxCardinality, MetaCache, Predicate},
MetaCacheFunction, MetaCacheProvider, META_CACHE_UDTF_NAME,
distinct_cache::{
cache::{CreateDistinctCacheArgs, DistinctCache, MaxAge, MaxCardinality, Predicate},
DistinctCacheFunction, DistinctCacheProvider, DISTINCT_CACHE_UDTF_NAME,
},
test_helpers::TestWriter,
};
@ -57,9 +57,9 @@ mod tests {
let region_col_id = column_ids[0];
let host_col_id = column_ids[1];
// create the cache:
let mut cache = MetaCache::new(
let mut cache = DistinctCache::new(
time_provider,
CreateMetaCacheArgs {
CreateDistinctCacheArgs {
table_def,
max_cardinality: MaxCardinality::default(),
max_age: MaxAge::default(),
@ -210,9 +210,9 @@ mod tests {
.map(|name| table_def.column_name_to_id_unchecked(name))
.collect();
// create a cache with some cardinality and age limits:
let mut cache = MetaCache::new(
let mut cache = DistinctCache::new(
Arc::clone(&time_provider) as _,
CreateMetaCacheArgs {
CreateDistinctCacheArgs {
table_def,
max_cardinality: MaxCardinality::try_from(10).unwrap(),
max_age: MaxAge::from(Duration::from_nanos(100)),
@ -293,7 +293,7 @@ mod tests {
}
#[test]
fn meta_cache_limit() {
fn distinct_cache_limit() {
let writer = TestWriter::new();
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
let rows = writer.write_lp_to_rows(
@ -318,9 +318,9 @@ mod tests {
.into_iter()
.map(|name| table_def.column_name_to_id_unchecked(name))
.collect();
let mut cache = MetaCache::new(
let mut cache = DistinctCache::new(
time_provider,
CreateMetaCacheArgs {
CreateDistinctCacheArgs {
table_def,
max_cardinality: MaxCardinality::default(),
max_age: MaxAge::default(),
@ -379,21 +379,21 @@ mod tests {
);
}
/// This test sets up a [`MetaCacheProvider`], creates a [`MetaCache`] using the `region` and
/// This test sets up a [`DistinctCacheProvider`], creates a [`DistinctCache`] using the `region` and
/// `host` columns, and then writes several different unique combinations of values into it.
/// It then sets up a DataFusion [`SessionContext`], registers our [`MetaCacheFunction`] as a
/// It then sets up a DataFusion [`SessionContext`], registers our [`DistinctCacheFunction`] as a
/// UDTF, and then runs a series of test cases to verify queries against the function.
///
/// The purpose of this is to see that the cache works as intended, and importantly, that the
/// predicate pushdown is happening and being leveraged by the underlying [`MetaCache`], vs.
/// predicate pushdown is happening and being leveraged by the underlying [`DistinctCache`], vs.
/// DataFusion doing it for us with a higher level FilterExec.
///
/// Each test case verifies the `RecordBatch` output of the query as well as the output for
/// EXPLAIN on the same query. The EXPLAIN output contains a line for the MetaCacheExec, which
/// is the custom execution plan impl for the metadata cache that captures the predicates that
/// are pushed down to the underlying [`MetaCacahe::to_record_batch`] method, if any.
/// EXPLAIN on the same query. The EXPLAIN output contains a line for the DistinctCacheExec, which
/// is the custom execution plan impl for the distinct value cache that captures the predicates that
/// are pushed down to the underlying [`DistinctCache::to_record_batch`] method, if any.
#[tokio::test]
async fn test_datafusion_meta_cache_udtf() {
async fn test_datafusion_distinct_cache_udtf() {
// create a test writer and do a write in to populate the catalog with a db/table:
let writer = TestWriter::new();
let _ = writer.write_lp_to_write_batch(
@ -403,7 +403,7 @@ mod tests {
0,
);
// create a meta provider and a cache on tag columns 'region' and 'host':
// create a distinct provider and a cache on tag columns 'region' and 'host':
let db_schema = writer.db_schema();
let table_def = db_schema.table_definition("cpu").unwrap();
let column_ids: Vec<ColumnId> = ["region", "host"]
@ -411,13 +411,13 @@ mod tests {
.map(|name| table_def.column_name_to_id_unchecked(name))
.collect();
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
let meta_provider =
MetaCacheProvider::new_from_catalog(time_provider, writer.catalog()).unwrap();
meta_provider
let distinct_provider =
DistinctCacheProvider::new_from_catalog(time_provider, writer.catalog()).unwrap();
distinct_provider
.create_cache(
db_schema.id,
None,
CreateMetaCacheArgs {
CreateDistinctCacheArgs {
table_def,
max_cardinality: MaxCardinality::default(),
max_age: MaxAge::default(),
@ -448,13 +448,14 @@ mod tests {
(0, 1, 0),
[influxdb3_wal::create::write_batch_op(write_batch)],
);
meta_provider.write_wal_contents_to_cache(&wal_contents);
distinct_provider.write_wal_contents_to_cache(&wal_contents);
// Spin up a DataFusion session context and add the meta_cache function to it so we
// Spin up a DataFusion session context and add the distinct_cache function to it so we
// can query for data in the cache we created and populated above:
let ctx = SessionContext::new();
let meta_func = MetaCacheFunction::new(db_schema.id, Arc::clone(&meta_provider));
ctx.register_udtf(META_CACHE_UDTF_NAME, Arc::new(meta_func));
let distinct_func =
DistinctCacheFunction::new(db_schema.id, Arc::clone(&distinct_provider));
ctx.register_udtf(DISTINCT_CACHE_UDTF_NAME, Arc::new(distinct_func));
struct TestCase<'a> {
/// A short description of the test
@ -462,14 +463,14 @@ mod tests {
/// A SQL expression to evaluate using the datafusion session context, should be of
/// the form:
/// ```sql
/// SELECT * FROM meta_cache('cpu') ...
/// SELECT * FROM distinct_cache('cpu') ...
/// ```
sql: &'a str,
/// Expected record batch output
expected: &'a [&'a str],
/// Expected EXPLAIN output contains this.
///
/// For checking the `MetaCacheExec` portion of the EXPLAIN output for the given `sql`
/// For checking the `DistinctCacheExec` portion of the EXPLAIN output for the given `sql`
/// query. A "contains" is used instead of matching the whole EXPLAIN output to prevent
/// flakiness from upstream changes to other parts of the query plan.
explain_contains: &'a str,
@ -480,7 +481,7 @@ mod tests {
///
/// The cache should produce results in a sorted order as-is, however, some queries
/// that process the results after they are emitted from the cache may have their order
/// altered by DataFusion, e.g., `SELECT DISTINCT(column_name) FROM meta_cache('table')`
/// altered by DataFusion, e.g., `SELECT DISTINCT(column_name) FROM distinct_cache('table')`
/// or queries that project columns that are not at the top level of the cache.
use_sorted_assert: bool,
}
@ -488,7 +489,7 @@ mod tests {
let test_cases = [
TestCase {
_desc: "no predicates",
sql: "SELECT * FROM meta_cache('cpu')",
sql: "SELECT * FROM distinct_cache('cpu')",
expected: &[
"+---------+------+",
"| region | host |",
@ -507,12 +508,12 @@ mod tests {
"| us-west | d |",
"+---------+------+",
],
explain_contains: "MetaCacheExec: projection=[region@0, host@1] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[region@0, host@1] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
_desc: "eq predicate on region",
sql: "SELECT * FROM meta_cache('cpu') WHERE region = 'us-east'",
sql: "SELECT * FROM distinct_cache('cpu') WHERE region = 'us-east'",
expected: &[
"+---------+------+",
"| region | host |",
@ -521,12 +522,12 @@ mod tests {
"| us-east | b |",
"+---------+------+",
],
explain_contains: "MetaCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
_desc: "eq predicate on region and host",
sql: "SELECT * FROM meta_cache('cpu') \
sql: "SELECT * FROM distinct_cache('cpu') \
WHERE region = 'us-east' AND host = 'a'",
expected: &[
"+---------+------+",
@ -535,12 +536,12 @@ mod tests {
"| us-east | a |",
"+---------+------+",
],
explain_contains: "MetaCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)], [host@1 IN (a)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)], [host@1 IN (a)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
_desc: "eq predicate on region; in predicate on host",
sql: "SELECT * FROM meta_cache('cpu') \
sql: "SELECT * FROM distinct_cache('cpu') \
WHERE region = 'us-east' AND host IN ('a', 'b')",
expected: &[
"+---------+------+",
@ -550,12 +551,12 @@ mod tests {
"| us-east | b |",
"+---------+------+",
],
explain_contains: "MetaCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)], [host@1 IN (a,b)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)], [host@1 IN (a,b)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
_desc: "eq predicate on region; not in predicate on host",
sql: "SELECT * FROM meta_cache('cpu') \
sql: "SELECT * FROM distinct_cache('cpu') \
WHERE region = 'us-east' AND host != 'a'",
expected: &[
"+---------+------+",
@ -564,12 +565,12 @@ mod tests {
"| us-east | b |",
"+---------+------+",
],
explain_contains: "MetaCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)], [host@1 NOT IN (a)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)], [host@1 NOT IN (a)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
_desc: "in predicate on region",
sql: "SELECT * FROM meta_cache('cpu') \
sql: "SELECT * FROM distinct_cache('cpu') \
WHERE region IN ('ca-cent', 'ca-east', 'us-east', 'us-west')",
expected: &[
"+---------+------+",
@ -583,12 +584,12 @@ mod tests {
"| us-west | d |",
"+---------+------+",
],
explain_contains: "MetaCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (ca-cent,ca-east,us-east,us-west)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (ca-cent,ca-east,us-east,us-west)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
_desc: "not in predicate on region",
sql: "SELECT * FROM meta_cache('cpu') \
sql: "SELECT * FROM distinct_cache('cpu') \
WHERE region NOT IN ('ca-cent', 'ca-east', 'us-east', 'us-west')",
expected: &[
"+---------+------+",
@ -602,12 +603,12 @@ mod tests {
"| eu-west | l |",
"+---------+------+",
],
explain_contains: "MetaCacheExec: projection=[region@0, host@1] predicates=[[region@0 NOT IN (ca-cent,ca-east,us-east,us-west)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 NOT IN (ca-cent,ca-east,us-east,us-west)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
_desc: "or eq predicates on region",
sql: "SELECT * FROM meta_cache('cpu') \
sql: "SELECT * FROM distinct_cache('cpu') \
WHERE region = 'us-east' OR region = 'ca-east'",
expected: &[
"+---------+------+",
@ -618,12 +619,12 @@ mod tests {
"| us-east | b |",
"+---------+------+",
],
explain_contains: "MetaCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (ca-east,us-east)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (ca-east,us-east)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
_desc: "or eq predicate on host",
sql: "SELECT * FROM meta_cache('cpu') \
sql: "SELECT * FROM distinct_cache('cpu') \
WHERE host = 'd' OR host = 'e'",
expected: &[
"+---------+------+",
@ -633,12 +634,12 @@ mod tests {
"| us-west | d |",
"+---------+------+",
],
explain_contains: "MetaCacheExec: projection=[region@0, host@1] predicates=[[host@1 IN (d,e)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[host@1 IN (d,e)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
_desc: "un-grouped host conditions are not handled in predicate pushdown",
sql: "SELECT * FROM meta_cache('cpu') \
sql: "SELECT * FROM distinct_cache('cpu') \
WHERE region = 'us-east' AND host = 'a' OR host = 'b'",
expected: &[
"+---------+------+",
@ -648,12 +649,12 @@ mod tests {
"| us-east | b |",
"+---------+------+",
],
explain_contains: "MetaCacheExec: projection=[region@0, host@1] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[region@0, host@1] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
_desc: "grouped host conditions are handled in predicate pushdown",
sql: "SELECT * FROM meta_cache('cpu') \
sql: "SELECT * FROM distinct_cache('cpu') \
WHERE region = 'us-east' AND (host = 'a' OR host = 'b')",
expected: &[
"+---------+------+",
@ -663,12 +664,12 @@ mod tests {
"| us-east | b |",
"+---------+------+",
],
explain_contains: "MetaCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)], [host@1 IN (a,b)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)], [host@1 IN (a,b)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
_desc: "project region column",
sql: "SELECT region FROM meta_cache('cpu')",
sql: "SELECT region FROM distinct_cache('cpu')",
expected: &[
"+---------+",
"| region |",
@ -682,12 +683,12 @@ mod tests {
"| us-west |",
"+---------+",
],
explain_contains: "MetaCacheExec: projection=[region@0] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[region@0] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
_desc: "project region column taking distinct",
sql: "SELECT DISTINCT(region) FROM meta_cache('cpu')",
sql: "SELECT DISTINCT(region) FROM distinct_cache('cpu')",
expected: &[
"+---------+",
"| region |",
@ -701,13 +702,13 @@ mod tests {
"| us-west |",
"+---------+",
],
explain_contains: "MetaCacheExec: projection=[region@0] inner=MemoryExec: partitions=1, partition_sizes=[1",
explain_contains: "DistinctCacheExec: projection=[region@0] inner=MemoryExec: partitions=1, partition_sizes=[1",
// it seems that DISTINCT changes around the order of results
use_sorted_assert: true,
},
TestCase {
_desc: "project host column",
sql: "SELECT host FROM meta_cache('cpu')",
sql: "SELECT host FROM distinct_cache('cpu')",
expected: &[
"+------+", // commenting for no new line
"| host |", // commenting for no new line
@ -726,7 +727,7 @@ mod tests {
"| l |", // commenting for no new line
"+------+", // commenting for no new line
],
explain_contains: "MetaCacheExec: projection=[host@1] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[host@1] inner=MemoryExec: partitions=1, partition_sizes=[1]",
// this column will not be sorted since the order of elements depends on the next level
// up in the cache, so the `region` column is iterated over in order, but the nested
// `host` values, although sorted within `region`s, will not be globally sorted.
@ -734,7 +735,7 @@ mod tests {
},
TestCase {
_desc: "project host column",
sql: "SELECT host FROM meta_cache('cpu') WHERE region = 'ca-cent'",
sql: "SELECT host FROM distinct_cache('cpu') WHERE region = 'ca-cent'",
expected: &[
"+------+", // commenting for no new line
"| host |", // commenting for no new line
@ -742,12 +743,12 @@ mod tests {
"| f |", // commenting for no new line
"+------+", // commenting for no new line
],
explain_contains: "MetaCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (ca-cent)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (ca-cent)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
_desc: "limit clause",
sql: "SELECT * FROM meta_cache('cpu') LIMIT 8",
sql: "SELECT * FROM distinct_cache('cpu') LIMIT 8",
expected: &[
"+---------+------+",
"| region | host |",
@ -762,12 +763,12 @@ mod tests {
"| eu-west | l |",
"+---------+------+",
],
explain_contains: "MetaCacheExec: projection=[region@0, host@1] limit=8 inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[region@0, host@1] limit=8 inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
_desc: "limit and offset",
sql: "SELECT * FROM meta_cache('cpu') LIMIT 8 OFFSET 8",
sql: "SELECT * FROM distinct_cache('cpu') LIMIT 8 OFFSET 8",
expected: &[
"+---------+------+",
"| region | host |",
@ -778,12 +779,12 @@ mod tests {
"| us-west | d |",
"+---------+------+",
],
explain_contains: "MetaCacheExec: projection=[region@0, host@1] limit=16 inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[region@0, host@1] limit=16 inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
_desc: "like clause",
sql: "SELECT * FROM meta_cache('cpu') \
sql: "SELECT * FROM distinct_cache('cpu') \
WHERE region LIKE 'u%'",
expected: &[
"+---------+------+",
@ -795,7 +796,7 @@ mod tests {
"| us-west | d |",
"+---------+------+",
],
explain_contains: "MetaCacheExec: projection=[region@0, host@1] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "DistinctCacheExec: projection=[region@0, host@1] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
];
@ -820,7 +821,7 @@ mod tests {
// NOTE(hiltontj): this probably can be done a better way?
// The EXPLAIN output will have two columns, the one we are interested in that contains
// the details of the MetaCacheExec is called `plan`...
// the details of the DistinctCacheExec is called `plan`...
assert!(
explain
.column_by_name("plan")

View File

@ -4,14 +4,14 @@ use anyhow::Context;
use arrow::datatypes::SchemaRef;
use influxdb3_catalog::catalog::{Catalog, TableDefinition};
use influxdb3_id::{DbId, TableId};
use influxdb3_wal::{MetaCacheDefinition, WalContents, WalOp};
use influxdb3_wal::{DistinctCacheDefinition, WalContents, WalOp};
use iox_time::TimeProvider;
use parking_lot::RwLock;
use crate::meta_cache::cache::{MaxAge, MaxCardinality};
use crate::distinct_cache::cache::{MaxAge, MaxCardinality};
use super::{
cache::{CreateMetaCacheArgs, MetaCache},
cache::{CreateDistinctCacheArgs, DistinctCache},
CacheError,
};
@ -25,40 +25,40 @@ pub enum ProviderError {
Unexpected(#[from] anyhow::Error),
}
/// Triple nested map for storing a multiple metadata caches per table.
/// Triple nested map for storing multiple distinct value caches per table.
///
/// That is, the map nesting is `database -> table -> cache name`
type CacheMap = RwLock<HashMap<DbId, HashMap<TableId, HashMap<Arc<str>, MetaCache>>>>;
type CacheMap = RwLock<HashMap<DbId, HashMap<TableId, HashMap<Arc<str>, DistinctCache>>>>;
/// Provides the metadata caches for the running instance of InfluxDB
/// Provides the distinct value caches for the running instance of InfluxDB
#[derive(Debug)]
pub struct MetaCacheProvider {
pub struct DistinctCacheProvider {
pub(crate) time_provider: Arc<dyn TimeProvider>,
pub(crate) catalog: Arc<Catalog>,
pub(crate) cache_map: CacheMap,
}
impl MetaCacheProvider {
/// Initialize a [`MetaCacheProvider`] from a [`Catalog`], populating the provider's
impl DistinctCacheProvider {
/// Initialize a [`DistinctCacheProvider`] from a [`Catalog`], populating the provider's
/// `cache_map` from the definitions in the catalog.
pub fn new_from_catalog(
time_provider: Arc<dyn TimeProvider>,
catalog: Arc<Catalog>,
) -> Result<Arc<Self>, ProviderError> {
let provider = Arc::new(MetaCacheProvider {
let provider = Arc::new(DistinctCacheProvider {
time_provider,
catalog: Arc::clone(&catalog),
cache_map: Default::default(),
});
for db_schema in catalog.list_db_schema() {
for table_def in db_schema.tables() {
for (cache_name, cache_def) in table_def.meta_caches() {
for (cache_name, cache_def) in table_def.distinct_caches() {
assert!(
provider
.create_cache(
db_schema.id,
Some(cache_name),
CreateMetaCacheArgs {
CreateDistinctCacheArgs {
table_def: Arc::clone(&table_def),
max_cardinality: MaxCardinality::try_from(
cache_def.max_cardinality
@ -78,10 +78,10 @@ impl MetaCacheProvider {
Ok(provider)
}
/// Initialize a [`MetaCacheProvider`] from a [`Catalog`], populating the provider's
/// Initialize a [`DistinctCacheProvider`] from a [`Catalog`], populating the provider's
/// `cache_map` from the definitions in the catalog. This starts a background process that
/// runs on the provided `eviction_interval` to perform eviction on all of the caches
/// in the created [`MetaCacheProvider`]'s `cache_map`.
/// in the created [`DistinctCacheProvider`]'s `cache_map`.
pub fn new_from_catalog_with_background_eviction(
time_provider: Arc<dyn TimeProvider>,
catalog: Arc<Catalog>,
@ -126,8 +126,8 @@ impl MetaCacheProvider {
})
}
/// Get a list of [`MetaCacheDefinition`]s for the given database
pub fn get_cache_definitions_for_db(&self, db_id: &DbId) -> Vec<MetaCacheDefinition> {
/// Get a list of [`DistinctCacheDefinition`]s for the given database
pub fn get_cache_definitions_for_db(&self, db_id: &DbId) -> Vec<DistinctCacheDefinition> {
let db_schema = self
.catalog
.db_schema_by_id(db_id)
@ -154,34 +154,34 @@ impl MetaCacheProvider {
.unwrap_or_default()
}
/// Create a new entry in the metadata cache for a given database and parameters.
/// Create a new entry in the distinct cache for a given database and parameters.
///
/// If a new cache is created, this will return the [`MetaCacheDefinition`] for the created
/// If a new cache is created, this will return the [`DistinctCacheDefinition`] for the created
/// cache; otherwise, if the provided arguments are identical to an existing cache, along with
/// any defaults, then `None` will be returned. It is an error to attempt to create a cache that
/// would overwrite an existing one with different parameters.
///
/// The cache name is optional; if not provided, it will be of the form:
/// ```text
/// <table_name>_<column_names>_meta_cache
/// <table_name>_<column_names>_distinct_cache
/// ```
/// Where `<column_names>` is an `_`-separated list of the column names used in the cache.
pub fn create_cache(
&self,
db_id: DbId,
cache_name: Option<Arc<str>>,
CreateMetaCacheArgs {
CreateDistinctCacheArgs {
table_def,
max_cardinality,
max_age,
column_ids,
}: CreateMetaCacheArgs,
) -> Result<Option<MetaCacheDefinition>, ProviderError> {
}: CreateDistinctCacheArgs,
) -> Result<Option<DistinctCacheDefinition>, ProviderError> {
let cache_name = if let Some(cache_name) = cache_name {
cache_name
} else {
format!(
"{table_name}_{cols}_meta_cache",
"{table_name}_{cols}_distinct_cache",
table_name = table_def.table_name,
cols = column_ids
.iter()
@ -196,9 +196,9 @@ impl MetaCacheProvider {
.into()
};
let new_cache = MetaCache::new(
let new_cache = DistinctCache::new(
Arc::clone(&self.time_provider),
CreateMetaCacheArgs {
CreateDistinctCacheArgs {
table_def: Arc::clone(&table_def),
max_cardinality,
max_age,
@ -224,7 +224,7 @@ impl MetaCacheProvider {
.or_default()
.insert(Arc::clone(&cache_name), new_cache);
Ok(Some(MetaCacheDefinition {
Ok(Some(DistinctCacheDefinition {
table_id: table_def.table_id,
table_name: Arc::clone(&table_def.table_name),
cache_name,
@ -240,11 +240,11 @@ impl MetaCacheProvider {
&self,
db_id: DbId,
table_def: Arc<TableDefinition>,
definition: &MetaCacheDefinition,
definition: &DistinctCacheDefinition,
) {
let meta_cache = MetaCache::new(
let distinct_cache = DistinctCache::new(
Arc::clone(&self.time_provider),
CreateMetaCacheArgs {
CreateDistinctCacheArgs {
table_def,
max_cardinality: definition
.max_cardinality
@ -261,7 +261,7 @@ impl MetaCacheProvider {
.or_default()
.entry(definition.table_id)
.or_default()
.insert(Arc::clone(&definition.cache_name), meta_cache);
.insert(Arc::clone(&definition.cache_name), distinct_cache);
}
/// Delete a cache from the provider
@ -350,7 +350,7 @@ impl MetaCacheProvider {
}
fn background_eviction_process(
provider: Arc<MetaCacheProvider>,
provider: Arc<DistinctCacheProvider>,
eviction_interval: Duration,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {

View File

@ -20,31 +20,31 @@ use indexmap::IndexMap;
use influxdb3_catalog::catalog::TableDefinition;
use influxdb3_id::{ColumnId, DbId};
use super::{cache::Predicate, MetaCacheProvider};
use super::{cache::Predicate, DistinctCacheProvider};
/// The name used to call the metadata cache in SQL queries
pub const META_CACHE_UDTF_NAME: &str = "meta_cache";
/// The name used to call the distinct value cache in SQL queries
pub const DISTINCT_CACHE_UDTF_NAME: &str = "distinct_cache";
/// Implementor of the [`TableProvider`] trait that is produced a call to the [`MetaCacheFunction`]
/// Implementor of the [`TableProvider`] trait produced by a call to the [`DistinctCacheFunction`]
#[derive(Debug)]
struct MetaCacheFunctionProvider {
/// Reference to the [`MetaCache`][super::cache::MetaCache] being queried's schema
struct DistinctCacheFunctionProvider {
/// Reference to the schema of the [`DistinctCache`][super::cache::DistinctCache] being queried
schema: SchemaRef,
/// Forwarded ref to the [`MetaCacheProvider`] which is used to get the
/// [`MetaCache`][super::cache::MetaCache] for the query, along with the `db_id` and
/// `table_def`. This is done instead of passing forward a reference to the `MetaCache`
/// Forwarded ref to the [`DistinctCacheProvider`] which is used to get the
/// [`DistinctCache`][super::cache::DistinctCache] for the query, along with the `db_id` and
/// `table_def`. This is done instead of passing forward a reference to the `DistinctCache`
/// directly because doing so is not easy or possible with the Rust borrow checker.
provider: Arc<MetaCacheProvider>,
provider: Arc<DistinctCacheProvider>,
/// The database ID that the called cache is related to
db_id: DbId,
/// The table definition that the called cache is related to
table_def: Arc<TableDefinition>,
/// The name of the cache, which is determined when calling the `meta_cache` function
/// The name of the cache, which is determined when calling the `distinct_cache` function
cache_name: Arc<str>,
}
#[async_trait]
impl TableProvider for MetaCacheFunctionProvider {
impl TableProvider for DistinctCacheFunctionProvider {
fn as_any(&self) -> &dyn Any {
self as &dyn Any
}
@ -98,7 +98,7 @@ impl TableProvider for MetaCacheFunctionProvider {
(vec![], None)
};
let mut meta_exec = MetaCacheExec::try_new(
let mut distinct_exec = DistinctCacheExec::try_new(
predicates,
Arc::clone(&self.table_def),
&[batches],
@ -108,9 +108,9 @@ impl TableProvider for MetaCacheFunctionProvider {
)?;
let show_sizes = ctx.config_options().explain.show_sizes;
meta_exec = meta_exec.with_show_sizes(show_sizes);
distinct_exec = distinct_exec.with_show_sizes(show_sizes);
Ok(Arc::new(meta_exec))
Ok(Arc::new(distinct_exec))
}
}
@ -161,7 +161,7 @@ fn convert_filter_exprs(
// and analyze it using DataFusion's `LiteralGuarantee`.
//
// This will distill the provided set of `Expr`s down to either an IN list, or a NOT IN list
// which we can convert to the `Predicate` type for the metadata cache.
// which we can convert to the `Predicate` type for the distinct value cache.
//
// The main caveat is that if for some reason there are multiple `Expr`s that apply predicates
// on a given column, i.e., leading to multiple `LiteralGuarantee`s on a specific column, we
@ -217,18 +217,18 @@ fn convert_filter_exprs(
/// Implementor of the [`TableFunctionImpl`] trait, to be registered as a user-defined table function
/// in the Datafusion `SessionContext`.
#[derive(Debug)]
pub struct MetaCacheFunction {
pub struct DistinctCacheFunction {
db_id: DbId,
provider: Arc<MetaCacheProvider>,
provider: Arc<DistinctCacheProvider>,
}
impl MetaCacheFunction {
pub fn new(db_id: DbId, provider: Arc<MetaCacheProvider>) -> Self {
impl DistinctCacheFunction {
pub fn new(db_id: DbId, provider: Arc<DistinctCacheProvider>) -> Self {
Self { db_id, provider }
}
}
impl TableFunctionImpl for MetaCacheFunction {
impl TableFunctionImpl for DistinctCacheFunction {
fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
let Some(Expr::Literal(ScalarValue::Utf8(Some(table_name)))) = args.first() else {
return plan_err!("first argument must be the table name as a string");
@ -254,9 +254,9 @@ impl TableFunctionImpl for MetaCacheFunction {
table_def.table_id,
cache_name.map(|n| n.as_str()),
) else {
return plan_err!("could not find meta cache for the given arguments");
return plan_err!("could not find distinct value cache for the given arguments");
};
Ok(Arc::new(MetaCacheFunctionProvider {
Ok(Arc::new(DistinctCacheFunctionProvider {
schema,
provider: Arc::clone(&self.provider),
db_id: self.db_id,
@ -266,7 +266,7 @@ impl TableFunctionImpl for MetaCacheFunction {
}
}
/// Custom implementor of the [`ExecutionPlan`] trait for use by the metadata cache
/// Custom implementor of the [`ExecutionPlan`] trait for use by the distinct value cache
///
/// Wraps a [`MemoryExec`] from DataFusion, and mostly re-uses that. The special functionality
/// provided by this type is to track the predicates that are pushed down to the underlying cache
@ -275,20 +275,20 @@ impl TableFunctionImpl for MetaCacheFunction {
/// # Example
///
/// For a query that does not provide any predicates, or one that does provide predicates, but they
/// do no get pushed down, the `EXPLAIN` for said query will contain a line for the `MetaCacheExec`
/// do not get pushed down, the `EXPLAIN` for said query will contain a line for the `DistinctCacheExec`
/// with no predicates, including what is emitted by the inner `MemoryExec`:
///
/// ```text
/// MetaCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[1]
/// DistinctCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[1]
/// ```
///
/// For queries that do have predicates that get pushed down, the output will include them, e.g.:
///
/// ```text
/// MetaCacheExec: predicates=[[0 IN (us-east)], [1 IN (a,b)]] inner=MemoryExec: partitions=1, partition_sizes=[1]
/// DistinctCacheExec: predicates=[[0 IN (us-east)], [1 IN (a,b)]] inner=MemoryExec: partitions=1, partition_sizes=[1]
/// ```
#[derive(Debug)]
struct MetaCacheExec {
struct DistinctCacheExec {
inner: MemoryExec,
table_def: Arc<TableDefinition>,
predicates: Option<IndexMap<ColumnId, Predicate>>,
@ -296,7 +296,7 @@ struct MetaCacheExec {
limit: Option<usize>,
}
impl MetaCacheExec {
impl DistinctCacheExec {
fn try_new(
predicates: Option<IndexMap<ColumnId, Predicate>>,
table_def: Arc<TableDefinition>,
@ -323,11 +323,11 @@ impl MetaCacheExec {
}
}
impl DisplayAs for MetaCacheExec {
impl DisplayAs for DistinctCacheExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "MetaCacheExec:")?;
write!(f, "DistinctCacheExec:")?;
if let Some(projection) = &self.projection {
write!(f, " projection=[")?;
let schema = self.schema();
@ -363,9 +363,9 @@ impl DisplayAs for MetaCacheExec {
}
}
impl ExecutionPlan for MetaCacheExec {
impl ExecutionPlan for DistinctCacheExec {
fn name(&self) -> &str {
"MetaCacheExec"
"DistinctCacheExec"
}
fn as_any(&self) -> &dyn Any {
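To illustrate the IN / NOT IN predicates that `convert_filter_exprs` distills from DataFusion's `LiteralGuarantee`s (as described in the doc comments above), here is a simplified stand-in; the names and shape are assumptions for illustration, not the crate's actual `Predicate` type.

```rust
use std::collections::BTreeSet;

// Simplified stand-in for a pushed-down predicate on one cached column:
// either an IN list or a NOT IN list of literal string values.
enum Predicate {
    In(BTreeSet<String>),
    NotIn(BTreeSet<String>),
}

impl Predicate {
    fn matches(&self, value: &str) -> bool {
        match self {
            Predicate::In(set) => set.contains(value),
            Predicate::NotIn(set) => !set.contains(value),
        }
    }
}

fn main() {
    // e.g. WHERE region = 'us-east' OR region = 'ca-east' becomes IN (ca-east, us-east):
    let p = Predicate::In(["us-east".to_string(), "ca-east".to_string()].into());
    assert!(p.matches("us-east"));
    assert!(!p.matches("eu-west"));
}
```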

View File

@ -1,7 +1,7 @@
//! Crate holding the various cache implementations used by InfluxDB 3
pub mod distinct_cache;
pub mod last_cache;
pub mod meta_cache;
pub mod parquet_cache;
#[cfg(test)]

View File

@ -10,9 +10,9 @@ use indexmap::IndexMap;
use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
use influxdb3_wal::{
CatalogBatch, CatalogOp, DeleteDatabaseDefinition, DeletePluginDefinition,
DeleteTableDefinition, DeleteTriggerDefinition, FieldAdditions, FieldDefinition,
LastCacheDefinition, LastCacheDelete, MetaCacheDefinition, MetaCacheDelete,
OrderedCatalogBatch, PluginDefinition, TriggerDefinition, TriggerIdentifier,
DeleteTableDefinition, DeleteTriggerDefinition, DistinctCacheDefinition, DistinctCacheDelete,
FieldAdditions, FieldDefinition, LastCacheDefinition, LastCacheDelete, OrderedCatalogBatch,
PluginDefinition, TriggerDefinition, TriggerIdentifier,
};
use influxdb_line_protocol::FieldValue;
use iox_time::Time;
@ -707,11 +707,11 @@ impl UpdateDatabaseSchema for CatalogOp {
CatalogOp::CreateDatabase(_) => Ok(schema),
CatalogOp::CreateTable(create_table) => create_table.update_schema(schema),
CatalogOp::AddFields(field_additions) => field_additions.update_schema(schema),
CatalogOp::CreateMetaCache(meta_cache_definition) => {
meta_cache_definition.update_schema(schema)
CatalogOp::CreateDistinctCache(distinct_cache_definition) => {
distinct_cache_definition.update_schema(schema)
}
CatalogOp::DeleteMetaCache(delete_meta_cache) => {
delete_meta_cache.update_schema(schema)
CatalogOp::DeleteDistinctCache(delete_distinct_cache) => {
delete_distinct_cache.update_schema(schema)
}
CatalogOp::CreateLastCache(create_last_cache) => {
create_last_cache.update_schema(schema)
@ -963,7 +963,7 @@ pub struct TableDefinition {
pub series_key: Vec<ColumnId>,
pub series_key_names: Vec<Arc<str>>,
pub last_caches: HashMap<Arc<str>, LastCacheDefinition>,
pub meta_caches: HashMap<Arc<str>, MetaCacheDefinition>,
pub distinct_caches: HashMap<Arc<str>, DistinctCacheDefinition>,
pub deleted: bool,
}
@ -1024,7 +1024,7 @@ impl TableDefinition {
series_key,
series_key_names,
last_caches: HashMap::new(),
meta_caches: HashMap::new(),
distinct_caches: HashMap::new(),
deleted: false,
})
}
@ -1173,15 +1173,15 @@ impl TableDefinition {
.map(|def| def.data_type)
}
/// Add the given [`MetaCacheDefinition`] to this table
pub fn add_meta_cache(&mut self, meta_cache: MetaCacheDefinition) {
self.meta_caches
.insert(Arc::clone(&meta_cache.cache_name), meta_cache);
/// Add the given [`DistinctCacheDefinition`] to this table
pub fn add_distinct_cache(&mut self, distinct_cache: DistinctCacheDefinition) {
self.distinct_caches
.insert(Arc::clone(&distinct_cache.cache_name), distinct_cache);
}
/// Remove the meta cache with the given name
pub fn remove_meta_cache(&mut self, cache_name: &str) {
self.meta_caches.remove(cache_name);
/// Remove the distinct cache with the given name
pub fn remove_distinct_cache(&mut self, cache_name: &str) {
self.distinct_caches.remove(cache_name);
}
/// Add a new last cache to this table definition
@ -1201,8 +1201,8 @@ impl TableDefinition {
.map(|(name, def)| (Arc::clone(name), def))
}
pub fn meta_caches(&self) -> impl Iterator<Item = (Arc<str>, &MetaCacheDefinition)> {
self.meta_caches
pub fn distinct_caches(&self) -> impl Iterator<Item = (Arc<str>, &DistinctCacheDefinition)> {
self.distinct_caches
.iter()
.map(|(name, def)| (Arc::clone(name), def))
}
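
As a quick orientation for the renamed catalog helpers, here is a minimal sketch (not part of this commit) that registers, lists, and removes a distinct value cache on a `TableDefinition`. The `table_def` and `def` values are assumed to come from elsewhere, and the import paths are assumptions; only methods and fields visible in this diff are exercised.

```rust
// Minimal sketch (not from this commit): exercise the renamed helpers.
use influxdb3_catalog::catalog::TableDefinition; // assumed path
use influxdb3_wal::DistinctCacheDefinition;

fn register_and_list(table_def: &mut TableDefinition, def: DistinctCacheDefinition) {
    // Register the cache definition under its cache_name (renamed from add_meta_cache).
    table_def.add_distinct_cache(def);

    // List every distinct value cache now tracked by the table.
    for (name, cache) in table_def.distinct_caches() {
        println!("cache {name} tracks columns {:?}", cache.column_ids);
    }

    // Remove a cache by name; removing an unknown name is harmless.
    table_def.remove_distinct_cache("region_host_cache");
}
```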
@ -1300,7 +1300,7 @@ impl TableUpdate for FieldAdditions {
}
}
impl TableUpdate for MetaCacheDefinition {
impl TableUpdate for DistinctCacheDefinition {
fn table_id(&self) -> TableId {
self.table_id
}
@ -1311,14 +1311,14 @@ impl TableUpdate for MetaCacheDefinition {
&self,
mut table: Cow<'a, TableDefinition>,
) -> Result<Cow<'a, TableDefinition>> {
if !table.meta_caches.contains_key(&self.cache_name) {
table.to_mut().add_meta_cache(self.clone());
if !table.distinct_caches.contains_key(&self.cache_name) {
table.to_mut().add_distinct_cache(self.clone());
}
Ok(table)
}
}
impl TableUpdate for MetaCacheDelete {
impl TableUpdate for DistinctCacheDelete {
fn table_id(&self) -> TableId {
self.table_id
}
@ -1329,8 +1329,8 @@ impl TableUpdate for MetaCacheDelete {
&self,
mut table: Cow<'a, TableDefinition>,
) -> Result<Cow<'a, TableDefinition>> {
if table.meta_caches.contains_key(&self.cache_name) {
table.to_mut().meta_caches.remove(&self.cache_name);
if table.distinct_caches.contains_key(&self.cache_name) {
table.to_mut().distinct_caches.remove(&self.cache_name);
}
Ok(table)
}

View File

@ -264,7 +264,7 @@ impl Client {
}
}
/// Compose a request to the `POST /api/v3/configure/meta_cache` API
/// Compose a request to the `POST /api/v3/configure/distinct_cache` API
///
/// # Example
/// ```no_run
@ -275,33 +275,33 @@ impl Client {
/// # async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
/// let client = Client::new("http://localhost:8181")?;
/// let resp = client
/// .api_v3_configure_meta_cache_create("db_name", "table_name", ["col1", "col2"])
/// .api_v3_configure_distinct_cache_create("db_name", "table_name", ["col1", "col2"])
/// .name("cache_name")
/// .max_cardinality(NonZeroUsize::new(1_000).unwrap())
/// .max_age(Duration::from_secs(3_600))
/// .send()
/// .await
/// .expect("send create meta cache request");
/// .expect("send create distinct cache request");
/// # Ok(())
/// # }
/// ```
pub fn api_v3_configure_meta_cache_create(
pub fn api_v3_configure_distinct_cache_create(
&self,
db: impl Into<String>,
table: impl Into<String>,
columns: impl IntoIterator<Item: Into<String>>,
) -> CreateMetaCacheRequestBuilder<'_> {
CreateMetaCacheRequestBuilder::new(self, db, table, columns)
) -> CreateDistinctCacheRequestBuilder<'_> {
CreateDistinctCacheRequestBuilder::new(self, db, table, columns)
}
/// Make a request to the `DELETE /api/v3/configure/meta_cache` API
pub async fn api_v3_configure_meta_cache_delete(
/// Make a request to the `DELETE /api/v3/configure/distinct_cache` API
pub async fn api_v3_configure_distinct_cache_delete(
&self,
db: impl Into<String> + Send,
table: impl Into<String> + Send,
name: impl Into<String> + Send,
) -> Result<()> {
let url = self.base_url.join("/api/v3/configure/meta_cache")?;
let url = self.base_url.join("/api/v3/configure/distinct_cache")?;
#[derive(Serialize)]
struct Req {
db: String,
@ -317,7 +317,7 @@ impl Client {
req = req.bearer_auth(token.expose_secret());
}
let resp = req.send().await.map_err(|src| {
Error::request_send(Method::DELETE, "/api/v3/configure/meta_cache", src)
Error::request_send(Method::DELETE, "/api/v3/configure/distinct_cache", src)
})?;
let status = resp.status();
match status {
@ -1255,10 +1255,10 @@ pub enum LastCacheValueColumnsDef {
AllNonKeyColumns,
}
/// Type for composing requests to the `POST /api/v3/configure/meta_cache` API created by the
/// [`Client::api_v3_configure_meta_cache_create`] method
/// Type for composing requests to the `POST /api/v3/configure/distinct_cache` API created by the
/// [`Client::api_v3_configure_distinct_cache_create`] method
#[derive(Debug, Serialize)]
pub struct CreateMetaCacheRequestBuilder<'c> {
pub struct CreateDistinctCacheRequestBuilder<'c> {
#[serde(skip_serializing)]
client: &'c Client,
db: String,
@ -1272,7 +1272,7 @@ pub struct CreateMetaCacheRequestBuilder<'c> {
max_age: Option<u64>,
}
impl<'c> CreateMetaCacheRequestBuilder<'c> {
impl<'c> CreateDistinctCacheRequestBuilder<'c> {
fn new(
client: &'c Client,
db: impl Into<String>,
@ -1309,20 +1309,23 @@ impl<'c> CreateMetaCacheRequestBuilder<'c> {
}
/// Send the create cache request
pub async fn send(self) -> Result<Option<MetaCacheCreatedResponse>> {
let url = self.client.base_url.join("/api/v3/configure/meta_cache")?;
pub async fn send(self) -> Result<Option<DistinctCacheCreatedResponse>> {
let url = self
.client
.base_url
.join("/api/v3/configure/distinct_cache")?;
let mut req = self.client.http_client.post(url).json(&self);
if let Some(token) = &self.client.auth_token {
req = req.bearer_auth(token.expose_secret());
}
let resp = req.send().await.map_err(|src| {
Error::request_send(Method::POST, "/api/v3/configure/meta_cache", src)
Error::request_send(Method::POST, "/api/v3/configure/distinct_cache", src)
})?;
let status = resp.status();
match status {
StatusCode::CREATED => {
let content = resp
.json::<MetaCacheCreatedResponse>()
.json::<DistinctCacheCreatedResponse>()
.await
.map_err(Error::Json)?;
Ok(Some(content))
@ -1337,7 +1340,7 @@ impl<'c> CreateMetaCacheRequestBuilder<'c> {
}
#[derive(Debug, Serialize, Deserialize)]
pub struct MetaCacheCreatedResponse {
pub struct DistinctCacheCreatedResponse {
/// The id of the table the cache was created on
pub table_id: u32,
/// The name of the table the cache was created on
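
The create builder already has a doc example above; as a companion, here is a hedged sketch of removing a cache again through the renamed delete method. The crate path, host URL, and names are placeholders, not taken from this commit.

```rust
// Minimal sketch (not from this commit): delete a distinct value cache with
// the renamed client method. The crate path and all names are placeholders.
use influxdb3_client::Client; // assumed crate path

async fn drop_cache() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let client = Client::new("http://localhost:8181")?;
    client
        .api_v3_configure_distinct_cache_delete("db_name", "table_name", "cache_name")
        .await?;
    println!("distinct cache deleted");
    Ok(())
}
```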

View File

@ -403,8 +403,8 @@ mod tests {
use crate::ProcessingEngineManagerImpl;
use data_types::NamespaceName;
use datafusion_util::config::register_iox_object_store;
use influxdb3_cache::distinct_cache::DistinctCacheProvider;
use influxdb3_cache::last_cache::LastCacheProvider;
use influxdb3_cache::meta_cache::MetaCacheProvider;
use influxdb3_catalog::catalog;
use influxdb3_internal_api::query_executor::UnimplementedQueryExecutor;
use influxdb3_wal::{
@ -803,14 +803,16 @@ mod tests {
let persister = Arc::new(Persister::new(Arc::clone(&object_store), "test_host"));
let catalog = Arc::new(persister.load_or_create_catalog().await.unwrap());
let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap();
let meta_cache =
MetaCacheProvider::new_from_catalog(Arc::clone(&time_provider), Arc::clone(&catalog))
.unwrap();
let distinct_cache = DistinctCacheProvider::new_from_catalog(
Arc::clone(&time_provider),
Arc::clone(&catalog),
)
.unwrap();
let wbuf = WriteBufferImpl::new(WriteBufferImplArgs {
persister,
catalog: Arc::clone(&catalog),
last_cache,
meta_cache,
distinct_cache,
time_provider: Arc::clone(&time_provider),
executor: make_exec(),
wal_config,

View File

@ -19,8 +19,8 @@ use hyper::header::CONTENT_TYPE;
use hyper::http::HeaderValue;
use hyper::HeaderMap;
use hyper::{Body, Method, Request, Response, StatusCode};
use influxdb3_cache::distinct_cache::{self, CreateDistinctCacheArgs, MaxAge, MaxCardinality};
use influxdb3_cache::last_cache;
use influxdb3_cache::meta_cache::{self, CreateMetaCacheArgs, MaxAge, MaxCardinality};
use influxdb3_catalog::catalog::Error as CatalogError;
use influxdb3_internal_api::query_executor::{QueryExecutor, QueryExecutorError, QueryKind};
use influxdb3_process::{INFLUXDB3_GIT_HASH_SHORT, INFLUXDB3_VERSION};
@ -322,24 +322,26 @@ impl Error {
.body(Body::from(self.to_string()))
.unwrap(),
},
Self::WriteBuffer(WriteBufferError::MetaCacheError(ref mc_err)) => match mc_err {
meta_cache::ProviderError::Cache(ref cache_err) => match cache_err {
meta_cache::CacheError::EmptyColumnSet
| meta_cache::CacheError::NonTagOrStringColumn { .. }
| meta_cache::CacheError::ConfigurationMismatch { .. } => Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::from(mc_err.to_string()))
.unwrap(),
meta_cache::CacheError::Unexpected(_) => Response::builder()
Self::WriteBuffer(WriteBufferError::DistinctCacheError(ref mc_err)) => match mc_err {
distinct_cache::ProviderError::Cache(ref cache_err) => match cache_err {
distinct_cache::CacheError::EmptyColumnSet
| distinct_cache::CacheError::NonTagOrStringColumn { .. }
| distinct_cache::CacheError::ConfigurationMismatch { .. } => {
Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::from(mc_err.to_string()))
.unwrap()
}
distinct_cache::CacheError::Unexpected(_) => Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(mc_err.to_string()))
.unwrap(),
},
meta_cache::ProviderError::CacheNotFound { .. } => Response::builder()
distinct_cache::ProviderError::CacheNotFound { .. } => Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::from(mc_err.to_string()))
.unwrap(),
meta_cache::ProviderError::Unexpected(_) => Response::builder()
distinct_cache::ProviderError::Unexpected(_) => Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(mc_err.to_string()))
.unwrap(),
@ -776,16 +778,16 @@ where
.map_err(Into::into)
}
/// Create a new metadata cache given the [`MetaCacheCreateRequest`] arguments in the request
/// Create a new distinct value cache given the [`DistinctCacheCreateRequest`] arguments in the request
/// body.
///
/// If the result is to create a cache that already exists, with the same configuration, this
/// will respond with a 204 NO CONTENT. If an existing cache would be overwritten with a
/// different configuration, that is a 400 BAD REQUEST.
async fn configure_meta_cache_create(&self, req: Request<Body>) -> Result<Response<Body>> {
async fn configure_distinct_cache_create(&self, req: Request<Body>) -> Result<Response<Body>> {
let args = self.read_body_json(req).await?;
info!(?args, "create metadata cache request");
let MetaCacheCreateRequest {
info!(?args, "create distinct value cache request");
let DistinctCacheCreateRequest {
db,
table,
name,
@ -817,10 +819,10 @@ where
let max_cardinality = max_cardinality.unwrap_or_default();
match self
.write_buffer
.create_meta_cache(
.create_distinct_cache(
db_schema,
name,
CreateMetaCacheArgs {
CreateDistinctCacheArgs {
table_def,
max_cardinality,
max_age,
@ -841,11 +843,12 @@ where
}
}
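
To make the create handler's contract concrete, the sketch below builds a JSON body of the shape `DistinctCacheCreateRequest` appears to accept. Only `db`, `table`, and `max_age` are visible in the request struct further down; the other field names are inferred from the CLI and client flags in this PR and should be treated as assumptions.

```rust
// Illustrative only: a request body of the shape the create endpoint appears
// to accept. Field names other than db/table/max_age are assumptions.
fn example_create_body() -> serde_json::Value {
    serde_json::json!({
        "db": "db_name",
        "table": "table_name",
        // Fields below are inferred from the CLI/client flags, not the visible struct.
        "name": "cache_name",
        "columns": ["region", "host"],
        "max_cardinality": 1000,
        "max_age": 3600 // seconds
    })
}
```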
/// Delete a metadata cache entry with the given [`MetaCacheDeleteRequest`] parameters
/// Delete a distinct value cache entry with the given [`DistinctCacheDeleteRequest`] parameters
///
/// The parameters must be passed in either the query string or the body of the request as JSON.
async fn configure_meta_cache_delete(&self, req: Request<Body>) -> Result<Response<Body>> {
let MetaCacheDeleteRequest { db, table, name } = if let Some(query) = req.uri().query() {
async fn configure_distinct_cache_delete(&self, req: Request<Body>) -> Result<Response<Body>> {
let DistinctCacheDeleteRequest { db, table, name } = if let Some(query) = req.uri().query()
{
serde_urlencoded::from_str(query)?
} else {
self.read_body_json(req).await?
@ -865,7 +868,7 @@ where
}
})?;
self.write_buffer
.delete_meta_cache(&db_id, &table_id, &name)
.delete_distinct_cache(&db_id, &table_id, &name)
.await?;
Ok(Response::builder()
.status(StatusCode::OK)
@ -1534,9 +1537,9 @@ impl From<iox_http::write::WriteParams> for WriteParams {
}
}
/// Request definition for the `POST /api/v3/configure/meta_cache` API
/// Request definition for the `POST /api/v3/configure/distinct_cache` API
#[derive(Debug, Deserialize)]
struct MetaCacheCreateRequest {
struct DistinctCacheCreateRequest {
/// The name of the database associated with the cache
db: String,
/// The name of the table associated with the cache
@ -1555,9 +1558,9 @@ struct MetaCacheCreateRequest {
max_age: Option<u64>,
}
/// Request definition for the `DELETE /api/v3/configure/meta_cache` API
/// Request definition for the `DELETE /api/v3/configure/distinct_cache` API
#[derive(Debug, Deserialize)]
struct MetaCacheDeleteRequest {
struct DistinctCacheDeleteRequest {
db: String,
table: String,
name: String,
@ -1727,11 +1730,11 @@ pub(crate) async fn route_request<T: TimeProvider>(
(Method::GET, "/health" | "/api/v1/health") => http_server.health(),
(Method::GET | Method::POST, "/ping") => http_server.ping(),
(Method::GET, "/metrics") => http_server.handle_metrics(),
(Method::POST, "/api/v3/configure/meta_cache") => {
http_server.configure_meta_cache_create(req).await
(Method::POST, "/api/v3/configure/distinct_cache") => {
http_server.configure_distinct_cache_create(req).await
}
(Method::DELETE, "/api/v3/configure/meta_cache") => {
http_server.configure_meta_cache_delete(req).await
(Method::DELETE, "/api/v3/configure/distinct_cache") => {
http_server.configure_distinct_cache_delete(req).await
}
(Method::POST, "/api/v3/configure/last_cache") => {
http_server.configure_last_cache_create(req).await

View File

@ -216,8 +216,8 @@ mod tests {
use crate::serve;
use datafusion::parquet::data_type::AsBytes;
use hyper::{body, Body, Client, Request, Response, StatusCode};
use influxdb3_cache::distinct_cache::DistinctCacheProvider;
use influxdb3_cache::last_cache::LastCacheProvider;
use influxdb3_cache::meta_cache::MetaCacheProvider;
use influxdb3_cache::parquet_cache::test_cached_obj_store_and_oracle;
use influxdb3_catalog::catalog::Catalog;
use influxdb3_id::{DbId, TableId};
@ -768,7 +768,7 @@ mod tests {
persister: Arc::clone(&persister),
catalog: Arc::clone(&catalog),
last_cache: LastCacheProvider::new_from_catalog(Arc::clone(&catalog)).unwrap(),
meta_cache: MetaCacheProvider::new_from_catalog(
distinct_cache: DistinctCacheProvider::new_from_catalog(
Arc::clone(&time_provider) as _,
Arc::clone(&catalog),
)

View File

@ -18,8 +18,8 @@ use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::Expr;
use datafusion_util::config::DEFAULT_SCHEMA;
use datafusion_util::MemoryStream;
use influxdb3_cache::distinct_cache::{DistinctCacheFunction, DISTINCT_CACHE_UDTF_NAME};
use influxdb3_cache::last_cache::{LastCacheFunction, LAST_CACHE_UDTF_NAME};
use influxdb3_cache::meta_cache::{MetaCacheFunction, META_CACHE_UDTF_NAME};
use influxdb3_catalog::catalog::{Catalog, DatabaseSchema};
use influxdb3_internal_api::query_executor::{QueryExecutor, QueryExecutorError, QueryKind};
use influxdb3_sys_events::SysEventStore;
@ -485,10 +485,10 @@ impl QueryNamespace for Database {
)),
);
ctx.inner().register_udtf(
META_CACHE_UDTF_NAME,
Arc::new(MetaCacheFunction::new(
DISTINCT_CACHE_UDTF_NAME,
Arc::new(DistinctCacheFunction::new(
self.db_schema.id,
self.write_buffer.meta_cache_provider(),
self.write_buffer.distinct_cache_provider(),
)),
);
ctx
@ -636,7 +636,7 @@ mod tests {
use datafusion::assert_batches_sorted_eq;
use futures::TryStreamExt;
use influxdb3_cache::{
last_cache::LastCacheProvider, meta_cache::MetaCacheProvider,
distinct_cache::DistinctCacheProvider, last_cache::LastCacheProvider,
parquet_cache::test_cached_obj_store_and_oracle,
};
use influxdb3_catalog::catalog::Catalog;
@ -698,7 +698,7 @@ mod tests {
persister,
catalog: Arc::clone(&catalog),
last_cache: LastCacheProvider::new_from_catalog(Arc::clone(&catalog)).unwrap(),
meta_cache: MetaCacheProvider::new_from_catalog(
distinct_cache: DistinctCacheProvider::new_from_catalog(
Arc::<MockProvider>::clone(&time_provider),
Arc::clone(&catalog),
)

View File

@ -4,29 +4,32 @@ use arrow::array::{GenericListBuilder, StringViewBuilder, UInt32Builder, UInt64B
use arrow_array::{ArrayRef, RecordBatch};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion::{error::DataFusionError, prelude::Expr};
use influxdb3_cache::meta_cache::MetaCacheProvider;
use influxdb3_cache::distinct_cache::DistinctCacheProvider;
use influxdb3_catalog::catalog::DatabaseSchema;
use influxdb3_wal::MetaCacheDefinition;
use influxdb3_wal::DistinctCacheDefinition;
use iox_system_tables::IoxSystemTable;
#[derive(Debug)]
pub(super) struct MetaCachesTable {
pub(super) struct DistinctCachesTable {
db_schema: Arc<DatabaseSchema>,
schema: SchemaRef,
provider: Arc<MetaCacheProvider>,
provider: Arc<DistinctCacheProvider>,
}
impl MetaCachesTable {
pub(super) fn new(db_schema: Arc<DatabaseSchema>, provider: Arc<MetaCacheProvider>) -> Self {
impl DistinctCachesTable {
pub(super) fn new(
db_schema: Arc<DatabaseSchema>,
provider: Arc<DistinctCacheProvider>,
) -> Self {
Self {
db_schema,
schema: meta_caches_schema(),
schema: distinct_caches_schema(),
provider,
}
}
}
fn meta_caches_schema() -> SchemaRef {
fn distinct_caches_schema() -> SchemaRef {
let columns = vec![
Field::new("table", DataType::Utf8View, false),
Field::new("name", DataType::Utf8View, false),
@ -47,7 +50,7 @@ fn meta_caches_schema() -> SchemaRef {
}
#[async_trait::async_trait]
impl IoxSystemTable for MetaCachesTable {
impl IoxSystemTable for DistinctCachesTable {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
@ -60,14 +63,14 @@ impl IoxSystemTable for MetaCachesTable {
let caches = self
.provider
.get_cache_definitions_for_db(&self.db_schema.id);
from_meta_cache_definitions(&self.db_schema, self.schema(), &caches)
from_distinct_cache_definitions(&self.db_schema, self.schema(), &caches)
}
}
fn from_meta_cache_definitions(
fn from_distinct_cache_definitions(
db_schema: &DatabaseSchema,
sys_table_schema: SchemaRef,
cache_definitions: &[MetaCacheDefinition],
cache_definitions: &[DistinctCacheDefinition],
) -> Result<RecordBatch, DataFusionError> {
let mut table_name_arr = StringViewBuilder::with_capacity(cache_definitions.len());
let mut cache_name_arr = StringViewBuilder::with_capacity(cache_definitions.len());
@ -90,7 +93,7 @@ fn from_meta_cache_definitions(
for cache in cache_definitions {
let table_def = db_schema
.table_definition_by_id(&cache.table_id)
.expect("table should exist for metadata cache");
.expect("table should exist for distinct value cache");
table_name_arr.append_value(&cache.table_name);
cache_name_arr.append_value(&cache.cache_name);

View File

@ -7,19 +7,19 @@ use datafusion::{
logical_expr::{col, BinaryExpr, Expr, Operator},
scalar::ScalarValue,
};
use distinct_caches::DistinctCachesTable;
use influxdb3_catalog::catalog::DatabaseSchema;
use influxdb3_sys_events::SysEventStore;
use influxdb3_write::WriteBuffer;
use iox_query::query_log::QueryLog;
use iox_system_tables::SystemTableProvider;
use meta_caches::MetaCachesTable;
use parquet_files::ParquetFilesTable;
use tonic::async_trait;
use self::{last_caches::LastCachesTable, queries::QueriesTable};
mod distinct_caches;
mod last_caches;
mod meta_caches;
mod parquet_files;
use crate::system_tables::python_call::{
ProcessingEnginePluginTable, ProcessingEngineTriggerTable,
@ -33,7 +33,7 @@ pub const TABLE_NAME_PREDICATE: &str = "table_name";
pub(crate) const QUERIES_TABLE_NAME: &str = "queries";
pub(crate) const LAST_CACHES_TABLE_NAME: &str = "last_caches";
pub(crate) const META_CACHES_TABLE_NAME: &str = "meta_caches";
pub(crate) const DISTINCT_CACHES_TABLE_NAME: &str = "distinct_caches";
pub(crate) const PARQUET_FILES_TABLE_NAME: &str = "parquet_files";
const PROCESSING_ENGINE_PLUGINS_TABLE_NAME: &str = "processing_engine_plugins";
@ -102,11 +102,10 @@ impl AllSystemSchemaTablesProvider {
buffer.last_cache_provider(),
))));
tables.insert(LAST_CACHES_TABLE_NAME, last_caches);
let meta_caches = Arc::new(SystemTableProvider::new(Arc::new(MetaCachesTable::new(
Arc::clone(&db_schema),
buffer.meta_cache_provider(),
))));
tables.insert(META_CACHES_TABLE_NAME, meta_caches);
let distinct_caches = Arc::new(SystemTableProvider::new(Arc::new(
DistinctCachesTable::new(Arc::clone(&db_schema), buffer.distinct_cache_provider()),
)));
tables.insert(DISTINCT_CACHES_TABLE_NAME, distinct_caches);
let parquet_files = Arc::new(SystemTableProvider::new(Arc::new(ParquetFilesTable::new(
db_schema.id,
buffer,

View File

@ -321,8 +321,8 @@ pub enum CatalogOp {
CreateDatabase(DatabaseDefinition),
CreateTable(TableDefinition),
AddFields(FieldAdditions),
CreateMetaCache(MetaCacheDefinition),
DeleteMetaCache(MetaCacheDelete),
CreateDistinctCache(DistinctCacheDefinition),
DeleteDistinctCache(DistinctCacheDelete),
CreateLastCache(LastCacheDefinition),
DeleteLastCache(LastCacheDelete),
DeleteDatabase(DeleteDatabaseDefinition),
@ -586,16 +586,16 @@ pub struct LastCacheDelete {
pub name: Arc<str>,
}
/// Defines a metadata cache in a given table and database
/// Defines a distinct value cache in a given table and database
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
pub struct MetaCacheDefinition {
pub struct DistinctCacheDefinition {
/// The id of the associated table
pub table_id: TableId,
/// The name of the associated table
pub table_name: Arc<str>,
/// The name of the cache, is unique within the associated table
pub cache_name: Arc<str>,
/// The ids of columns tracked by this metadata cache, in the defined order
/// The ids of columns tracked by this distinct value cache, in the defined order
pub column_ids: Vec<ColumnId>,
/// The maximum number of distinct value combinations the cache will hold
pub max_cardinality: usize,
@ -604,7 +604,7 @@ pub struct MetaCacheDefinition {
}
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct MetaCacheDelete {
pub struct DistinctCacheDelete {
pub table_name: Arc<str>,
pub table_id: TableId,
pub cache_name: Arc<str>,
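
A brief sketch of how these renamed WAL types compose, assuming a `DistinctCacheDefinition` built elsewhere; only variants and fields visible in this diff are used.

```rust
// Minimal sketch (not from this commit): wrap an existing definition in the
// renamed catalog ops.
use influxdb3_wal::{CatalogOp, DistinctCacheDefinition, DistinctCacheDelete};

fn cache_catalog_ops(def: DistinctCacheDefinition) -> Vec<CatalogOp> {
    vec![
        // Recorded in the WAL when a distinct value cache is created.
        CatalogOp::CreateDistinctCache(def.clone()),
        // Recorded when that same cache is later deleted.
        CatalogOp::DeleteDistinctCache(DistinctCacheDelete {
            table_name: def.table_name,
            table_id: def.table_id,
            cache_name: def.cache_name,
        }),
    ]
}
```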

View File

@ -11,27 +11,21 @@ pub mod write_buffer;
use async_trait::async_trait;
use data_types::{NamespaceName, TimestampMinMax};
use datafusion::catalog::Session;
use datafusion::error::DataFusionError;
use datafusion::prelude::Expr;
use influxdb3_cache::last_cache::LastCacheProvider;
use influxdb3_cache::meta_cache::CreateMetaCacheArgs;
use influxdb3_cache::meta_cache::MetaCacheProvider;
use influxdb3_catalog::catalog::Catalog;
use influxdb3_catalog::catalog::CatalogSequenceNumber;
use influxdb3_catalog::catalog::DatabaseSchema;
use influxdb3_id::ParquetFileId;
use influxdb3_id::SerdeVecMap;
use influxdb3_id::TableId;
use influxdb3_id::{ColumnId, DbId};
use influxdb3_wal::{LastCacheDefinition, SnapshotSequenceNumber, WalFileSequenceNumber};
use influxdb3_wal::{MetaCacheDefinition, Wal};
use datafusion::{catalog::Session, error::DataFusionError, prelude::Expr};
use influxdb3_cache::{
distinct_cache::{CreateDistinctCacheArgs, DistinctCacheProvider},
last_cache::LastCacheProvider,
};
use influxdb3_catalog::catalog::{Catalog, CatalogSequenceNumber, DatabaseSchema};
use influxdb3_id::{ColumnId, DbId, ParquetFileId, SerdeVecMap, TableId};
use influxdb3_wal::{
DistinctCacheDefinition, LastCacheDefinition, SnapshotSequenceNumber, Wal,
WalFileSequenceNumber,
};
use iox_query::QueryChunk;
use iox_time::Time;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
use std::{fmt::Debug, sync::Arc, time::Duration};
use thiserror::Error;
#[derive(Debug, Error)]
@ -49,7 +43,7 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
pub trait WriteBuffer:
Bufferer + ChunkContainer + MetaCacheManager + LastCacheManager + DatabaseManager
Bufferer + ChunkContainer + DistinctCacheManager + LastCacheManager + DatabaseManager
{
}
@ -112,21 +106,23 @@ pub trait ChunkContainer: Debug + Send + Sync + 'static {
) -> Result<Vec<Arc<dyn QueryChunk>>, DataFusionError>;
}
/// [`MetaCacheManager`] is used to manage interaction with a [`MetaCacheProvider`]. This enables
/// [`DistinctCacheManager`] is used to manage interaction with a [`DistinctCacheProvider`]. This enables
/// cache creation, deletion, and getting access to existing caches.
#[async_trait::async_trait]
pub trait MetaCacheManager: Debug + Send + Sync + 'static {
/// Get a reference to the metadata cache provider
fn meta_cache_provider(&self) -> Arc<MetaCacheProvider>;
pub trait DistinctCacheManager: Debug + Send + Sync + 'static {
/// Get a reference to the distinct value cache provider
fn distinct_cache_provider(&self) -> Arc<DistinctCacheProvider>;
async fn create_meta_cache(
/// Create a new distinct value cache
async fn create_distinct_cache(
&self,
db_schema: Arc<DatabaseSchema>,
cache_name: Option<String>,
args: CreateMetaCacheArgs,
) -> Result<Option<MetaCacheDefinition>, write_buffer::Error>;
args: CreateDistinctCacheArgs,
) -> Result<Option<DistinctCacheDefinition>, write_buffer::Error>;
async fn delete_meta_cache(
/// Delete a distinct value cache
async fn delete_distinct_cache(
&self,
db_id: &DbId,
tbl_id: &TableId,
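
To show the renamed trait surface end to end, here is a hedged sketch that creates and then deletes a cache through any `DistinctCacheManager` implementation. Import paths and the final `&str` name parameter of `delete_distinct_cache` are assumptions; the schema and args are assumed to come from elsewhere.

```rust
// Minimal sketch (not from this commit): drive the renamed trait methods.
use std::sync::Arc;

use influxdb3_cache::distinct_cache::CreateDistinctCacheArgs;
use influxdb3_catalog::catalog::DatabaseSchema;
use influxdb3_write::{write_buffer, DistinctCacheManager}; // assumed crate path

async fn roundtrip(
    buffer: &impl DistinctCacheManager,
    db_schema: Arc<DatabaseSchema>,
    args: CreateDistinctCacheArgs,
) -> Result<(), write_buffer::Error> {
    let db_id = db_schema.id;
    // Returns Some(definition) when a new cache was created, or None when an
    // identical cache already existed.
    if let Some(def) = buffer
        .create_distinct_cache(db_schema, Some("my_cache".to_string()), args)
        .await?
    {
        // Tear the cache down again through the same trait.
        buffer
            .delete_distinct_cache(&db_id, &def.table_id, "my_cache")
            .await?;
    }
    Ok(())
}
```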

View File

@ -12,7 +12,7 @@ use crate::write_buffer::queryable_buffer::QueryableBuffer;
use crate::write_buffer::validator::WriteValidator;
use crate::{chunk::ParquetChunk, DatabaseManager};
use crate::{
BufferedWriteRequest, Bufferer, ChunkContainer, LastCacheManager, MetaCacheManager,
BufferedWriteRequest, Bufferer, ChunkContainer, DistinctCacheManager, LastCacheManager,
ParquetFile, PersistedSnapshot, Precision, WriteBuffer, WriteLineError,
};
use async_trait::async_trait;
@ -24,8 +24,8 @@ use datafusion::catalog::Session;
use datafusion::common::DataFusionError;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::logical_expr::Expr;
use influxdb3_cache::distinct_cache::{self, CreateDistinctCacheArgs, DistinctCacheProvider};
use influxdb3_cache::last_cache::{self, LastCacheProvider};
use influxdb3_cache::meta_cache::{self, CreateMetaCacheArgs, MetaCacheProvider};
use influxdb3_cache::parquet_cache::ParquetCacheOracle;
use influxdb3_catalog::catalog::{Catalog, DatabaseSchema};
use influxdb3_id::{ColumnId, DbId, TableId};
@ -33,8 +33,8 @@ use influxdb3_wal::FieldDataType;
use influxdb3_wal::TableDefinition;
use influxdb3_wal::{object_store::WalObjectStore, DeleteDatabaseDefinition};
use influxdb3_wal::{
CatalogBatch, CatalogOp, LastCacheDefinition, LastCacheDelete, LastCacheSize,
MetaCacheDefinition, MetaCacheDelete, Wal, WalConfig, WalFileNotifier, WalOp,
CatalogBatch, CatalogOp, DistinctCacheDefinition, DistinctCacheDelete, LastCacheDefinition,
LastCacheDelete, LastCacheSize, Wal, WalConfig, WalFileNotifier, WalOp,
};
use influxdb3_wal::{CatalogOp::CreateLastCache, DeleteTableDefinition};
use influxdb3_wal::{DatabaseDefinition, FieldDefinition};
@ -114,8 +114,8 @@ pub enum Error {
#[error("cannot write to a read-only server")]
NoWriteInReadOnly,
#[error("error in metadata cache: {0}")]
MetaCacheError(#[from] meta_cache::ProviderError),
#[error("error in distinct value cache: {0}")]
DistinctCacheError(#[from] distinct_cache::ProviderError),
#[error("error: {0}")]
AnyhowError(#[from] anyhow::Error),
@ -144,7 +144,7 @@ pub struct WriteBufferImpl {
wal: Arc<dyn Wal>,
time_provider: Arc<dyn TimeProvider>,
metrics: WriteMetrics,
meta_cache: Arc<MetaCacheProvider>,
distinct_cache: Arc<DistinctCacheProvider>,
last_cache: Arc<LastCacheProvider>,
}
@ -156,7 +156,7 @@ pub struct WriteBufferImplArgs {
pub persister: Arc<Persister>,
pub catalog: Arc<Catalog>,
pub last_cache: Arc<LastCacheProvider>,
pub meta_cache: Arc<MetaCacheProvider>,
pub distinct_cache: Arc<DistinctCacheProvider>,
pub time_provider: Arc<dyn TimeProvider>,
pub executor: Arc<iox_query::exec::Executor>,
pub wal_config: WalConfig,
@ -170,7 +170,7 @@ impl WriteBufferImpl {
persister,
catalog,
last_cache,
meta_cache,
distinct_cache,
time_provider,
executor,
wal_config,
@ -204,7 +204,7 @@ impl WriteBufferImpl {
catalog: Arc::clone(&catalog),
persister: Arc::clone(&persister),
last_cache_provider: Arc::clone(&last_cache),
meta_cache_provider: Arc::clone(&meta_cache),
distinct_cache_provider: Arc::clone(&distinct_cache),
persisted_files: Arc::clone(&persisted_files),
parquet_cache: parquet_cache.clone(),
}));
@ -229,7 +229,7 @@ impl WriteBufferImpl {
wal_config,
wal,
time_provider,
meta_cache,
distinct_cache,
last_cache,
persisted_files,
buffer: queryable_buffer,
@ -445,23 +445,23 @@ impl ChunkContainer for WriteBufferImpl {
}
#[async_trait::async_trait]
impl MetaCacheManager for WriteBufferImpl {
fn meta_cache_provider(&self) -> Arc<MetaCacheProvider> {
Arc::clone(&self.meta_cache)
impl DistinctCacheManager for WriteBufferImpl {
fn distinct_cache_provider(&self) -> Arc<DistinctCacheProvider> {
Arc::clone(&self.distinct_cache)
}
async fn create_meta_cache(
async fn create_distinct_cache(
&self,
db_schema: Arc<DatabaseSchema>,
cache_name: Option<String>,
args: CreateMetaCacheArgs,
) -> Result<Option<MetaCacheDefinition>, Error> {
args: CreateDistinctCacheArgs,
) -> Result<Option<DistinctCacheDefinition>, Error> {
if let Some(new_cache_definition) = self
.meta_cache
.distinct_cache
.create_cache(db_schema.id, cache_name.map(Into::into), args)
.map_err(Error::MetaCacheError)?
.map_err(Error::DistinctCacheError)?
{
let catalog_op = CatalogOp::CreateMetaCache(new_cache_definition.clone());
let catalog_op = CatalogOp::CreateDistinctCache(new_cache_definition.clone());
let catalog_batch = CatalogBatch {
database_id: db_schema.id,
database_name: db_schema.name.clone(),
@ -479,7 +479,7 @@ impl MetaCacheManager for WriteBufferImpl {
}
}
async fn delete_meta_cache(
async fn delete_distinct_cache(
&self,
db_id: &DbId,
tbl_id: &TableId,
@ -487,12 +487,13 @@ impl MetaCacheManager for WriteBufferImpl {
) -> Result<(), Error> {
let catalog = self.catalog();
let db_schema = catalog.db_schema_by_id(db_id).expect("db should exist");
self.meta_cache.delete_cache(db_id, tbl_id, cache_name)?;
self.distinct_cache
.delete_cache(db_id, tbl_id, cache_name)?;
let catalog_batch = CatalogBatch {
database_id: *db_id,
database_name: Arc::clone(&db_schema.name),
time_ns: self.time_provider.now().timestamp_nanos(),
ops: vec![CatalogOp::DeleteMetaCache(MetaCacheDelete {
ops: vec![CatalogOp::DeleteDistinctCache(DistinctCacheDelete {
table_name: db_schema.table_id_to_name(tbl_id).expect("table exists"),
table_id: *tbl_id,
cache_name: cache_name.into(),
@ -917,14 +918,16 @@ mod tests {
let persister = Arc::new(Persister::new(Arc::clone(&object_store), "test_host"));
let catalog = Arc::new(persister.load_or_create_catalog().await.unwrap());
let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap();
let meta_cache =
MetaCacheProvider::new_from_catalog(Arc::clone(&time_provider), Arc::clone(&catalog))
.unwrap();
let distinct_cache = DistinctCacheProvider::new_from_catalog(
Arc::clone(&time_provider),
Arc::clone(&catalog),
)
.unwrap();
let write_buffer = WriteBufferImpl::new(WriteBufferImplArgs {
persister: Arc::clone(&persister),
catalog,
last_cache,
meta_cache,
distinct_cache,
time_provider: Arc::clone(&time_provider),
executor: crate::test_help::make_exec(),
wal_config: WalConfig::test_config(),
@ -996,14 +999,16 @@ mod tests {
// now load a new buffer from object storage
let catalog = Arc::new(persister.load_or_create_catalog().await.unwrap());
let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap();
let meta_cache =
MetaCacheProvider::new_from_catalog(Arc::clone(&time_provider), Arc::clone(&catalog))
.unwrap();
let distinct_cache = DistinctCacheProvider::new_from_catalog(
Arc::clone(&time_provider),
Arc::clone(&catalog),
)
.unwrap();
let write_buffer = WriteBufferImpl::new(WriteBufferImplArgs {
persister,
catalog,
last_cache,
meta_cache,
distinct_cache,
time_provider,
executor: crate::test_help::make_exec(),
wal_config: WalConfig {
@ -1062,7 +1067,7 @@ mod tests {
let catalog = Arc::new(persister.load_or_create_catalog().await.unwrap());
let last_cache =
LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap();
let meta_cache = MetaCacheProvider::new_from_catalog(
let distinct_cache = DistinctCacheProvider::new_from_catalog(
Arc::clone(&time_provider),
Arc::clone(&catalog),
)
@ -1071,7 +1076,7 @@ mod tests {
persister: Arc::clone(&wbuf.persister),
catalog,
last_cache,
meta_cache,
distinct_cache,
time_provider,
executor: Arc::clone(&wbuf.buffer.executor),
wal_config: WalConfig {
@ -1291,14 +1296,16 @@ mod tests {
.unwrap(),
);
let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap();
let meta_cache =
MetaCacheProvider::new_from_catalog(Arc::clone(&time_provider), Arc::clone(&catalog))
.unwrap();
let distinct_cache = DistinctCacheProvider::new_from_catalog(
Arc::clone(&time_provider),
Arc::clone(&catalog),
)
.unwrap();
let write_buffer = WriteBufferImpl::new(WriteBufferImplArgs {
persister: Arc::clone(&write_buffer.persister),
catalog,
last_cache,
meta_cache,
distinct_cache,
time_provider: Arc::clone(&write_buffer.time_provider),
executor: Arc::clone(&write_buffer.buffer.executor),
wal_config: WalConfig {
@ -2581,14 +2588,16 @@ mod tests {
let persister = Arc::new(Persister::new(Arc::clone(&object_store), "test_host"));
let catalog = Arc::new(persister.load_or_create_catalog().await.unwrap());
let last_cache = LastCacheProvider::new_from_catalog(Arc::clone(&catalog) as _).unwrap();
let meta_cache =
MetaCacheProvider::new_from_catalog(Arc::clone(&time_provider), Arc::clone(&catalog))
.unwrap();
let distinct_cache = DistinctCacheProvider::new_from_catalog(
Arc::clone(&time_provider),
Arc::clone(&catalog),
)
.unwrap();
let wbuf = WriteBufferImpl::new(WriteBufferImplArgs {
persister,
catalog,
last_cache,
meta_cache,
distinct_cache,
time_provider: Arc::clone(&time_provider),
executor: crate::test_help::make_exec(),
wal_config,

View File

@ -16,8 +16,8 @@ use datafusion::common::DataFusionError;
use datafusion::logical_expr::Expr;
use datafusion_util::stream_from_batches;
use hashbrown::HashMap;
use influxdb3_cache::distinct_cache::DistinctCacheProvider;
use influxdb3_cache::last_cache::LastCacheProvider;
use influxdb3_cache::meta_cache::MetaCacheProvider;
use influxdb3_cache::parquet_cache::{CacheRequest, ParquetCacheOracle};
use influxdb3_catalog::catalog::{Catalog, DatabaseSchema};
use influxdb3_id::{DbId, TableId};
@ -41,7 +41,7 @@ use tokio::sync::oneshot::{self, Receiver};
pub struct QueryableBuffer {
pub(crate) executor: Arc<Executor>,
catalog: Arc<Catalog>,
meta_cache_provider: Arc<MetaCacheProvider>,
distinct_cache_provider: Arc<DistinctCacheProvider>,
last_cache_provider: Arc<LastCacheProvider>,
persister: Arc<Persister>,
persisted_files: Arc<PersistedFiles>,
@ -57,7 +57,7 @@ pub struct QueryableBufferArgs {
pub catalog: Arc<Catalog>,
pub persister: Arc<Persister>,
pub last_cache_provider: Arc<LastCacheProvider>,
pub meta_cache_provider: Arc<MetaCacheProvider>,
pub distinct_cache_provider: Arc<DistinctCacheProvider>,
pub persisted_files: Arc<PersistedFiles>,
pub parquet_cache: Option<Arc<dyn ParquetCacheOracle>>,
}
@ -69,7 +69,7 @@ impl QueryableBuffer {
catalog,
persister,
last_cache_provider,
meta_cache_provider,
distinct_cache_provider,
persisted_files,
parquet_cache,
}: QueryableBufferArgs,
@ -81,7 +81,7 @@ impl QueryableBuffer {
executor,
catalog,
last_cache_provider,
meta_cache_provider,
distinct_cache_provider,
persister,
persisted_files,
buffer,
@ -148,7 +148,8 @@ impl QueryableBuffer {
/// Update the caches managed by the database
fn write_wal_contents_to_caches(&self, write: &WalContents) {
self.last_cache_provider.write_wal_contents_to_cache(write);
self.meta_cache_provider.write_wal_contents_to_cache(write);
self.distinct_cache_provider
.write_wal_contents_to_cache(write);
}
/// Called when the wal has persisted a new file. Buffer the contents in memory and update the
@ -159,7 +160,7 @@ impl QueryableBuffer {
buffer.buffer_ops(
&write.ops,
&self.last_cache_provider,
&self.meta_cache_provider,
&self.distinct_cache_provider,
);
}
@ -181,7 +182,7 @@ impl QueryableBuffer {
buffer.buffer_ops(
&write.ops,
&self.last_cache_provider,
&self.meta_cache_provider,
&self.distinct_cache_provider,
);
let mut persisting_chunks = vec![];
@ -430,7 +431,7 @@ impl BufferState {
&mut self,
ops: &[WalOp],
last_cache_provider: &LastCacheProvider,
meta_cache_provider: &MetaCacheProvider,
distinct_cache_provider: &DistinctCacheProvider,
) {
for op in ops {
match op {
@ -452,20 +453,20 @@ impl BufferState {
// eg. creating or deleting last cache itself
for op in catalog_batch.ops {
match op {
CatalogOp::CreateMetaCache(definition) => {
CatalogOp::CreateDistinctCache(definition) => {
let table_def = db_schema
.table_definition_by_id(&definition.table_id)
.expect("table should exist");
meta_cache_provider.create_from_definition(
distinct_cache_provider.create_from_definition(
db_schema.id,
table_def,
&definition,
);
}
CatalogOp::DeleteMetaCache(cache) => {
CatalogOp::DeleteDistinctCache(cache) => {
// this only fails if the db/table/cache do not exist, so we ignore
// the error if it happens.
let _ = meta_cache_provider.delete_cache(
let _ = distinct_cache_provider.delete_cache(
&db_schema.id,
&cache.table_id,
&cache.cache_name,
@ -497,7 +498,7 @@ impl BufferState {
self.db_to_table.remove(&db_definition.database_id);
last_cache_provider
.delete_caches_for_db(&db_definition.database_id);
meta_cache_provider
distinct_cache_provider
.delete_caches_for_db(&db_definition.database_id);
}
CatalogOp::DeleteTable(table_definition) => {
@ -505,7 +506,7 @@ impl BufferState {
&table_definition.database_id,
&table_definition.table_id,
);
meta_cache_provider.delete_caches_for_db_and_table(
distinct_cache_provider.delete_caches_for_db_and_table(
&table_definition.database_id,
&table_definition.table_id,
);
@ -748,7 +749,7 @@ mod tests {
catalog: Arc::clone(&catalog),
persister: Arc::clone(&persister),
last_cache_provider: LastCacheProvider::new_from_catalog(Arc::clone(&catalog)).unwrap(),
meta_cache_provider: MetaCacheProvider::new_from_catalog(
distinct_cache_provider: DistinctCacheProvider::new_from_catalog(
Arc::clone(&time_provider),
Arc::clone(&catalog),
)