influxdb/influxdb3/tests/server/system_tables.rs

437 lines
17 KiB
Rust

use arrow_util::assert_batches_sorted_eq;
use influxdb3_client::Precision;
use serde_json::json;
use crate::server::{collect_stream, TestServer};
#[tokio::test]
async fn queries_table() {
let server = TestServer::spawn().await;
server
.write_lp_to_db(
"foo",
"cpu,host=s1,region=us-east usage=0.9 2998574931\n\
cpu,host=s1,region=us-east usage=0.89 2998574932\n\
cpu,host=s1,region=us-east usage=0.85 2998574933",
Precision::Second,
)
.await
.expect("write some lp");
let mut client = server.flight_sql_client("foo").await;
// Check queries table for completed queries, will be empty:
{
let response = client
.query("SELECT COUNT(*) FROM system.queries WHERE running = false")
.await
.unwrap();
let batches = collect_stream(response).await;
assert_batches_sorted_eq!(
[
"+----------+",
"| count(*) |",
"+----------+",
"| 0 |",
"+----------+",
],
&batches
);
}
// Do some queries, to produce some query logs:
{
let queries = [
"SELECT * FROM cpu", // valid
"SELECT * FROM mem", // not valid table, will fail, and not be logged
"SELECT usage, time FROM cpu", // specific columns
];
for q in queries {
let response = client.query(q).await;
// collect the stream to make sure the query completes:
if let Ok(stream) = response {
let _batches = collect_stream(stream).await;
}
}
}
// Now check the log:
{
// A sub-set of columns is selected in this query, because the queries
// table contains may columns whose values change in susequent test runs
let response = client
.query(
"SELECT \
phase, \
query_type, \
query_text, \
success, \
running, \
cancelled \
FROM system.queries \
WHERE success = true",
)
.await
.unwrap();
let batches = collect_stream(response).await;
assert_batches_sorted_eq!(
[
"+---------+------------+--------------------------------------------------------------------------------+---------+---------+-----------+",
"| phase | query_type | query_text | success | running | cancelled |",
"+---------+------------+--------------------------------------------------------------------------------+---------+---------+-----------+",
"| success | flightsql | CommandStatementQuerySELECT * FROM cpu | true | false | false |",
"| success | flightsql | CommandStatementQuerySELECT COUNT(*) FROM system.queries WHERE running = false | true | false | false |",
"| success | flightsql | CommandStatementQuerySELECT usage, time FROM cpu | true | false | false |",
"+---------+------------+--------------------------------------------------------------------------------+---------+---------+-----------+",
],
&batches
);
}
}
#[tokio::test]
async fn last_caches_table() {
let server = TestServer::spawn().await;
let db1_name = "foo";
let db2_name = "bar";
// Write some LP to initialize the catalog
server
.write_lp_to_db(
db1_name,
"\
cpu,region=us,host=a,cpu=1 usage=90\n\
mem,region=us,host=a usage=500\n\
",
Precision::Second,
)
.await
.expect("write to db");
server
.write_lp_to_db(
db2_name,
"\
cpu,region=us,host=a,cpu=1 usage=90\n\
mem,region=us,host=a usage=500\n\
",
Precision::Second,
)
.await
.expect("write to db");
// Check that there are no last caches:
{
let resp = server
.flight_sql_client(db1_name)
.await
.query("SELECT * FROM system.last_caches")
.await
.unwrap();
let batches = collect_stream(resp).await;
assert_batches_sorted_eq!(["++", "++",], &batches);
}
{
let resp = server
.flight_sql_client(db2_name)
.await
.query("SELECT * FROM system.last_caches")
.await
.unwrap();
let batches = collect_stream(resp).await;
assert_batches_sorted_eq!(["++", "++",], &batches);
}
// Create some last caches, two on DB1 and one on DB2:
assert!(server
.api_v3_configure_last_cache_create(&json!({
"db": db1_name,
"table": "cpu",
"key_columns": ["host"],
}))
.await
.status()
.is_success());
assert!(server
.api_v3_configure_last_cache_create(&json!({
"db": db1_name,
"table": "mem",
"name": "mem_last_cache",
"value_columns": ["usage"],
"ttl": 60
}))
.await
.status()
.is_success());
assert!(server
.api_v3_configure_last_cache_create(&json!({
"db": db2_name,
"table": "cpu",
"count": 5
}))
.await
.status()
.is_success());
// Check the system table for each DB:
{
let resp = server
.flight_sql_client(db1_name)
.await
.query("SELECT * FROM system.last_caches")
.await
.unwrap();
let batches = collect_stream(resp).await;
assert_batches_sorted_eq!(
[
"+-------+---------------------+----------------+------------------+------------------+--------------------+-------+-------+",
"| table | name | key_column_ids | key_column_names | value_column_ids | value_column_names | count | ttl |",
"+-------+---------------------+----------------+------------------+------------------+--------------------+-------+-------+",
"| cpu | cpu_host_last_cache | [1] | [host] | | | 1 | 14400 |",
"| mem | mem_last_cache | [5, 6] | [region, host] | [7, 8] | [usage, time] | 1 | 60 |",
"+-------+---------------------+----------------+------------------+------------------+--------------------+-------+-------+",
],
&batches
);
}
{
let resp = server
.flight_sql_client(db2_name)
.await
.query("SELECT * FROM system.last_caches")
.await
.unwrap();
let batches = collect_stream(resp).await;
assert_batches_sorted_eq!([
"+-------+--------------------------------+----------------+---------------------+------------------+--------------------+-------+-------+",
"| table | name | key_column_ids | key_column_names | value_column_ids | value_column_names | count | ttl |",
"+-------+--------------------------------+----------------+---------------------+------------------+--------------------+-------+-------+",
"| cpu | cpu_region_host_cpu_last_cache | [9, 10, 11] | [region, host, cpu] | | | 5 | 14400 |",
"+-------+--------------------------------+----------------+---------------------+------------------+--------------------+-------+-------+",
],
&batches
);
}
// Make some changes to the caches and check the system table
// Delete one of the caches:
{
assert!(server
.api_v3_configure_last_cache_delete(&json!({
"db": db1_name,
"table": "cpu",
"name": "cpu_host_last_cache",
}))
.await
.status()
.is_success());
let resp = server
.flight_sql_client(db1_name)
.await
.query("SELECT * FROM system.last_caches")
.await
.unwrap();
let batches = collect_stream(resp).await;
assert_batches_sorted_eq!(
[
"+-------+----------------+----------------+------------------+------------------+--------------------+-------+-----+",
"| table | name | key_column_ids | key_column_names | value_column_ids | value_column_names | count | ttl |",
"+-------+----------------+----------------+------------------+------------------+--------------------+-------+-----+",
"| mem | mem_last_cache | [5, 6] | [region, host] | [7, 8] | [usage, time] | 1 | 60 |",
"+-------+----------------+----------------+------------------+------------------+--------------------+-------+-----+",
],
&batches
);
}
// Add fields to one of the caches, in this case, the `temp` field will get added to the
// value columns for the respective cache, but since this cache accepts new fields, the value
// columns are not shown in the system table result:
{
server
.write_lp_to_db(
db2_name,
"cpu,region=us,host=a,cpu=2 usage=40,temp=95",
Precision::Second,
)
.await
.unwrap();
let resp = server
.flight_sql_client(db2_name)
.await
.query("SELECT * FROM system.last_caches")
.await
.unwrap();
let batches = collect_stream(resp).await;
assert_batches_sorted_eq!([
"+-------+--------------------------------+----------------+---------------------+------------------+--------------------+-------+-------+",
"| table | name | key_column_ids | key_column_names | value_column_ids | value_column_names | count | ttl |",
"+-------+--------------------------------+----------------+---------------------+------------------+--------------------+-------+-------+",
"| cpu | cpu_region_host_cpu_last_cache | [9, 10, 11] | [region, host, cpu] | | | 5 | 14400 |",
"+-------+--------------------------------+----------------+---------------------+------------------+--------------------+-------+-------+",
],
&batches
);
}
}
#[tokio::test]
async fn distinct_caches_table() {
let server = TestServer::spawn().await;
let db_1_name = "foo";
let db_2_name = "bar";
// write some lp to both db's to initialize the catalog:
server
.write_lp_to_db(
db_1_name,
"\
cpu,region=us-east,host=a usage=90\n\
mem,region=us-east,host=a usage=90\n\
",
Precision::Second,
)
.await
.unwrap();
server
.write_lp_to_db(
db_2_name,
"\
cpu,region=us-east,host=a usage=90\n\
mem,region=us-east,host=a usage=90\n\
",
Precision::Second,
)
.await
.unwrap();
// check that there are no distinct caches:
for db_name in [db_1_name, db_2_name] {
let response_stream = server
.flight_sql_client(db_name)
.await
.query("SELECT * FROM system.distinct_caches")
.await
.unwrap();
let batches = collect_stream(response_stream).await;
assert_batches_sorted_eq!(["++", "++",], &batches);
}
// create some distinct caches on the two databases:
assert!(server
.api_v3_configure_distinct_cache_create(&json!({
"db": db_1_name,
"table": "cpu",
"columns": ["region", "host"],
}))
.await
.status()
.is_success());
assert!(server
.api_v3_configure_distinct_cache_create(&json!({
"db": db_1_name,
"table": "mem",
"columns": ["region", "host"],
"max_cardinality": 1_000,
}))
.await
.status()
.is_success());
assert!(server
.api_v3_configure_distinct_cache_create(&json!({
"db": db_2_name,
"table": "cpu",
"columns": ["host"],
"max_age": 1_000,
}))
.await
.status()
.is_success());
// check the system table query for each db:
{
let response_stream = server
.flight_sql_client(db_1_name)
.await
.query("SELECT * FROM system.distinct_caches")
.await
.unwrap();
let batches = collect_stream(response_stream).await;
assert_batches_sorted_eq!([
"+-------+--------------------------------+------------+----------------+-----------------+-----------------+",
"| table | name | column_ids | column_names | max_cardinality | max_age_seconds |",
"+-------+--------------------------------+------------+----------------+-----------------+-----------------+",
"| cpu | cpu_region_host_distinct_cache | [0, 1] | [region, host] | 100000 | 86400 |",
"| mem | mem_region_host_distinct_cache | [4, 5] | [region, host] | 1000 | 86400 |",
"+-------+--------------------------------+------------+----------------+-----------------+-----------------+",
], &batches);
}
{
let response_stream = server
.flight_sql_client(db_2_name)
.await
.query("SELECT * FROM system.distinct_caches")
.await
.unwrap();
let batches = collect_stream(response_stream).await;
assert_batches_sorted_eq!([
"+-------+-------------------------+------------+--------------+-----------------+-----------------+",
"| table | name | column_ids | column_names | max_cardinality | max_age_seconds |",
"+-------+-------------------------+------------+--------------+-----------------+-----------------+",
"| cpu | cpu_host_distinct_cache | [9] | [host] | 100000 | 1000 |",
"+-------+-------------------------+------------+--------------+-----------------+-----------------+",
], &batches);
}
// delete caches and check that the system tables reflect those changes:
assert!(server
.api_v3_configure_distinct_cache_delete(&json!({
"db": db_1_name,
"table": "cpu",
"name": "cpu_region_host_distinct_cache",
}))
.await
.status()
.is_success());
assert!(server
.api_v3_configure_distinct_cache_delete(&json!({
"db": db_2_name,
"table": "cpu",
"name": "cpu_host_distinct_cache",
}))
.await
.status()
.is_success());
// check the system tables again:
{
let response_stream = server
.flight_sql_client(db_1_name)
.await
.query("SELECT * FROM system.distinct_caches")
.await
.unwrap();
let batches = collect_stream(response_stream).await;
assert_batches_sorted_eq!([
"+-------+--------------------------------+------------+----------------+-----------------+-----------------+",
"| table | name | column_ids | column_names | max_cardinality | max_age_seconds |",
"+-------+--------------------------------+------------+----------------+-----------------+-----------------+",
"| mem | mem_region_host_distinct_cache | [4, 5] | [region, host] | 1000 | 86400 |",
"+-------+--------------------------------+------------+----------------+-----------------+-----------------+",
], &batches);
}
{
let response_stream = server
.flight_sql_client(db_2_name)
.await
.query("SELECT * FROM system.distinct_caches")
.await
.unwrap();
let batches = collect_stream(response_stream).await;
assert_batches_sorted_eq!(["++", "++",], &batches);
}
}