feat: support `SHOW RETENTION POLICIES` (#24729)

feat: support SHOW RETENTION POLICIES

Added support through the influxdb3 Query Executor to perform
SHOW RETENTION POLICIES queries, both on a specific database as well
as accross all databases.

Test cases were added to check this functionality.
pull/24737/head
Trevor Hilton 2024-03-05 15:40:58 -05:00 committed by GitHub
parent 423308dcd4
commit fb4f09d675
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 182 additions and 25 deletions

37
Cargo.lock generated
View File

@ -760,6 +760,20 @@ name = "bytemuck"
version = "1.14.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2ef034f05691a48569bd920a96c81b9d91bbad1ab5ac7c4616c1f6ef36cb79f"
dependencies = [
"bytemuck_derive",
]
[[package]]
name = "bytemuck_derive"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "965ab7eb5f8f97d2a083c799f3a1b994fc397b2fe2da5d1da1626ce15a39f2b1"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.52",
]
[[package]]
name = "byteorder"
@ -2146,6 +2160,7 @@ version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5eceaaeec696539ddaf7b333340f1af35a5aa87ae3e4f3ead0532f72affab2e"
dependencies = [
"bytemuck",
"cfg-if",
"crunchy",
"num-traits",
@ -2546,6 +2561,7 @@ dependencies = [
"pretty_assertions",
"schema",
"serde",
"serde_arrow",
"serde_json",
"serde_urlencoded",
"service_common",
@ -3710,9 +3726,9 @@ dependencies = [
[[package]]
name = "object_store"
version = "0.9.0"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d139f545f64630e2e3688fd9f81c470888ab01edeb72d13b4e86c566f1130000"
checksum = "b8718f8b65fdf67a45108d1548347d4af7d71fb81ce727bbf9e3b2535e079db3"
dependencies = [
"async-trait",
"base64",
@ -3722,6 +3738,7 @@ dependencies = [
"humantime",
"hyper",
"itertools 0.12.1",
"md-5",
"parking_lot",
"percent-encoding",
"quick-xml 0.31.0",
@ -4936,6 +4953,22 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_arrow"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99e774f158723ad8fe05b884fd2f89355934599a11817edd0dc854cb1ef0e42a"
dependencies = [
"arrow-array",
"arrow-buffer",
"arrow-data",
"arrow-schema",
"bytemuck",
"chrono",
"half",
"serde",
]
[[package]]
name = "serde_derive"
version = "1.0.197"

View File

@ -73,6 +73,7 @@ prost-types = "0.12.3"
reqwest = { version = "0.11.24", default-features = false, features = ["rustls-tls"] }
secrecy = "0.8.0"
serde = { version = "1.0", features = ["derive"] }
serde_arrow = { version = "0.10", features = ["arrow-50"] }
serde_json = "1.0"
serde_urlencoded = "0.7.0"
sha2 = "0.10.8"

View File

@ -66,6 +66,21 @@ async fn api_v3_query_influxql() {
.await
.unwrap();
// write to another database for `SHOW DATABASES` and `SHOW RETENTION POLICIES`
server
.write_lp_to_db(
"bar",
"cpu,host=s1,region=us-east usage=0.9 1\n\
cpu,host=s1,region=us-east usage=0.89 2\n\
cpu,host=s1,region=us-east usage=0.85 3\n\
mem,host=s1,region=us-east usage=0.5 4\n\
mem,host=s1,region=us-east usage=0.6 5\n\
mem,host=s1,region=us-east usage=0.7 6",
Precision::Nanosecond,
)
.await
.unwrap();
struct TestCase<'a> {
database: Option<&'a str>,
query: &'a str,
@ -202,9 +217,38 @@ async fn api_v3_query_influxql() {
expected: "+---------------+\n\
| iox::database |\n\
+---------------+\n\
| bar |\n\
| foo |\n\
+---------------+",
},
TestCase {
database: None,
query: "SHOW RETENTION POLICIES",
expected: "+---------------+---------+----------+\n\
| iox::database | name | duration |\n\
+---------------+---------+----------+\n\
| bar | autogen | |\n\
| foo | autogen | |\n\
+---------------+---------+----------+",
},
TestCase {
database: None,
query: "SHOW RETENTION POLICIES ON foo",
expected: "+---------------+---------+----------+\n\
| iox::database | name | duration |\n\
+---------------+---------+----------+\n\
| foo | autogen | |\n\
+---------------+---------+----------+",
},
TestCase {
database: Some("foo"),
query: "SHOW RETENTION POLICIES",
expected: "+---------------+---------+----------+\n\
| iox::database | name | duration |\n\
+---------------+---------+----------+\n\
| foo | autogen | |\n\
+---------------+---------+----------+",
},
];
for t in test_cases {

View File

@ -49,6 +49,7 @@ object_store.workspace = true
parking_lot.workspace = true
pin-project-lite.workspace = true
serde.workspace = true
serde_arrow.workspace = true
serde_json.workspace = true
serde_urlencoded.workspace = true
sha2.workspace = true

View File

@ -177,17 +177,14 @@ pub enum Error {
#[error("must provide only one InfluxQl statement per query")]
InfluxqlSingleStatement,
#[error(
"must specify a 'database' parameter, or provide the database\
in the InfluxQL query, for queries that support ON clauses"
)]
#[error("must specify a 'db' parameter, or provide the database in the InfluxQL query")]
InfluxqlNoDatabase,
#[error(
"provided a database in both the parameters ({param_db}) and\
query string ({query_db}) that do not match, if providing a query\
with an ON clause, you can omit the 'database' parameter from your\
request"
"provided a database in both the parameters ({param_db}) and \
query string ({query_db}) that do not match, if providing a query \
that specifies the database, you can omit the 'database' parameter \
from your request"
)]
InfluxqlDatabaseMismatch { param_db: String, query_db: String },
}
@ -372,25 +369,32 @@ where
if statements.len() != 1 {
return Err(Error::InfluxqlSingleStatement);
}
let statement = statements.pop().unwrap();
let database = match (database, statement.resolve_dbrp()) {
(None, None) => None,
(None, Some(db)) | (Some(db), None) => Some(db),
(Some(p), Some(q)) => {
if p == q {
Some(p)
} else {
return Err(Error::InfluxqlDatabaseMismatch {
param_db: p,
query_db: q,
});
}
}
};
let stream = if statement.statement().is_show_databases() {
self.query_executor.show_databases()?
} else if statement.statement().is_show_retention_policies() {
self.query_executor
.show_retention_policies(database.as_deref(), None)
.await?
} else {
let database = match (database, statement.resolve_dbrp()) {
(None, None) => return Err(Error::InfluxqlNoDatabase),
(None, Some(s)) | (Some(s), None) => s,
(Some(p), Some(q)) => {
if p == q {
p
} else {
return Err(Error::InfluxqlDatabaseMismatch {
param_db: p,
query_db: q,
});
}
}
let Some(database) = database else {
return Err(Error::InfluxqlNoDatabase);
};
self.query_executor

View File

@ -137,6 +137,12 @@ pub trait QueryExecutor: QueryNamespaceProvider + Debug + Send + Sync + 'static
) -> Result<SendableRecordBatchStream, Self::Error>;
fn show_databases(&self) -> Result<SendableRecordBatchStream, Self::Error>;
async fn show_retention_policies(
&self,
database: Option<&str>,
span_ctx: Option<SpanContext>,
) -> Result<SendableRecordBatchStream, Self::Error>;
}
#[derive(Debug)]

View File

@ -38,6 +38,8 @@ use metric::Registry;
use observability_deps::tracing::{debug, info, trace};
use schema::sort::SortKey;
use schema::Schema;
use serde::{Deserialize, Serialize};
use serde_arrow::schema::SchemaLike;
use std::any::Any;
use std::collections::HashMap;
use std::fmt::Debug;
@ -149,13 +151,75 @@ impl<W: WriteBuffer> QueryExecutor for QueryExecutorImpl<W> {
}
fn show_databases(&self) -> Result<SendableRecordBatchStream, Self::Error> {
let databases = StringArray::from(self.catalog.list_databases());
let mut databases = self.catalog.list_databases();
// sort them to ensure consistent order:
databases.sort_unstable();
let databases = StringArray::from(databases);
let schema =
DatafusionSchema::new(vec![Field::new("iox::database", DataType::Utf8, false)]);
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(databases)])
.map_err(Error::DatabasesToRecordBatch)?;
Ok(Box::pin(MemoryStream::new(vec![batch])))
}
async fn show_retention_policies(
&self,
database: Option<&str>,
span_ctx: Option<SpanContext>,
) -> Result<SendableRecordBatchStream, Self::Error> {
let mut databases = if let Some(db) = database {
vec![db.to_owned()]
} else {
self.catalog.list_databases()
};
// sort them to ensure consistent order:
databases.sort_unstable();
let mut records = Vec::with_capacity(databases.len());
for database in databases {
let db = self
.db(&database, span_ctx.child_span("get database"), false)
.await
.map_err(|_| Error::DatabaseNotFound {
db_name: database.to_string(),
})?
.ok_or_else(|| Error::DatabaseNotFound {
db_name: database.to_string(),
})?;
let duration = db.retention_time_ns();
let (db_name, rp_name) = split_database_name(&database);
records.push(RetentionPolicyRecord {
database: db_name,
name: rp_name,
duration,
});
}
let fields = Vec::<Field>::from_type::<RetentionPolicyRecord>(Default::default())?;
let arrays = serde_arrow::to_arrow(&fields, &records)?;
let schema = DatafusionSchema::new(fields);
let batch = RecordBatch::try_new(Arc::new(schema), arrays)
.map_err(Error::RetentionPoliciesToRecordBatch)?;
Ok(Box::pin(MemoryStream::new(vec![batch])))
}
}
#[derive(Debug, Serialize, Deserialize)]
struct RetentionPolicyRecord {
#[serde(rename = "iox::database")]
database: String,
name: String,
duration: Option<i64>,
}
const AUTOGEN_RETENTION_POLICY: &str = "autogen";
fn split_database_name(db_name: &str) -> (String, String) {
let mut split = db_name.split('/');
(
split.next().unwrap().to_owned(),
split.next().unwrap_or(AUTOGEN_RETENTION_POLICY).to_owned(),
)
}
#[derive(Debug, thiserror::Error)]
@ -168,6 +232,10 @@ pub enum Error {
ExecuteStream(#[source] DataFusionError),
#[error("unable to compose record batches from databases: {0}")]
DatabasesToRecordBatch(#[source] ArrowError),
#[error("unable to compose record batches from retention policies: {0}")]
RetentionPoliciesToRecordBatch(#[source] ArrowError),
#[error("serde_arrow error: {0}")]
SerdeArrow(#[from] serde_arrow::Error),
}
// This implementation is for the Flight service