feat: API to create last caches (#25147)

Closes #25096

- Adds a new HTTP API that allows the creation of a last cache; see the issue for details
- Adds an E2E test that checks the success/failure behaviour of the API
- Adds the mime crate for parsing request MIME types; it is currently only used in the code added here, but we may adopt it in other APIs / parts of the HTTP server in future PRs
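
As a rough sketch of how a client might exercise the new endpoint (the server address and the reqwest/tokio/serde_json dependencies are assumptions for illustration; only db and table are required, everything else falls back to the server-side defaults shown in the diff below):

use reqwest::StatusCode;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // Assumed address of a running influxdb3 server; substitute your own.
    let url = "http://localhost:8181/api/v3/configure/last_cache";

    // Only `db` and `table` are required; the remaining fields are optional
    // and default on the server (e.g. count = 1, ttl = 4 hours).
    let body = serde_json::json!({
        "db": "db",
        "table": "tbl",
        "name": "my_cache",          // optional cache name
        "key_columns": ["t1", "t2"], // optional key column hierarchy
        "value_columns": ["f1"],     // optional value columns
        "count": 1,                  // optional number of last values to keep
        "ttl": 14400                 // optional TTL in seconds
    });

    // `.json()` also sets the `Content-Type: application/json` header,
    // which the endpoint requires.
    let resp = reqwest::Client::new().post(url).json(&body).send().await?;
    let status = resp.status();
    match status {
        // 201: a new cache was created; the body holds its name as JSON.
        StatusCode::CREATED => println!("created: {}", resp.text().await?),
        // 204: an identical cache already exists, so nothing was done.
        StatusCode::NO_CONTENT => println!("cache already exists"),
        // 400: unknown db/table or invalid cache parameters.
        other => println!("request rejected: {other}"),
    }
    Ok(())
}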
Trevor Hilton 2024-07-16 10:32:26 -04:00 committed by GitHub
parent 0279461738
commit 56488592db
8 changed files with 341 additions and 20 deletions

Cargo.lock generated

@@ -2535,6 +2535,7 @@ dependencies = [
"iox_time",
"metric",
"metric_exporters",
"mime",
"object_store",
"observability_deps",
"parking_lot",


@@ -64,6 +64,7 @@ hyper = "0.14"
insta = { version = "1.39", features = ["json"] }
indexmap = { version = "2.2.6" }
libc = { version = "0.2" }
mime = "0.3.17"
mockito = { version = "1.4.0", default-features = false }
num_cpus = "1.16.0"
object_store = "0.10.1"


@@ -0,0 +1,171 @@
use hyper::StatusCode;
use crate::TestServer;
#[tokio::test]
async fn api_v3_configure_last_cache_create() {
let server = TestServer::spawn().await;
let client = reqwest::Client::new();
let url = format!(
"{base}/api/v3/configure/last_cache",
base = server.client_addr()
);
// Write some LP to the database to initialize the catalog:
let db_name = "db";
let tbl_name = "tbl";
server
.write_lp_to_db(
db_name,
format!("{tbl_name},t1=a,t2=b,t3=c f1=true,f2=\"hello\",f3=4i,f4=4u,f5=5 1000"),
influxdb3_client::Precision::Second,
)
.await
.expect("write to db");
#[derive(Default)]
struct TestCase {
// These attributes all map to parameters of the request body:
db: Option<&'static str>,
table: Option<&'static str>,
cache_name: Option<&'static str>,
count: Option<usize>,
ttl: Option<usize>,
key_cols: Option<&'static [&'static str]>,
val_cols: Option<&'static [&'static str]>,
// This is the status code expected in the response:
expected: StatusCode,
}
let test_cases = [
// No parameters specified:
TestCase {
expected: StatusCode::BAD_REQUEST,
..Default::default()
},
// Missing database name:
TestCase {
table: Some(tbl_name),
expected: StatusCode::BAD_REQUEST,
..Default::default()
},
// Missing table name:
TestCase {
db: Some(db_name),
expected: StatusCode::BAD_REQUEST,
..Default::default()
},
// Good, will use defaults for everything omitted, and get back a 201:
TestCase {
db: Some(db_name),
table: Some(tbl_name),
expected: StatusCode::CREATED,
..Default::default()
},
// Same as before, will be successful, but with 204:
TestCase {
db: Some(db_name),
table: Some(tbl_name),
expected: StatusCode::NO_CONTENT,
..Default::default()
},
// Using a specific cache name will succeed and create a new cache:
// NOTE: this will only differ from the previous cache in name, should this actually
// be an error?
TestCase {
db: Some(db_name),
table: Some(tbl_name),
cache_name: Some("my_cache"),
expected: StatusCode::CREATED,
..Default::default()
},
// Same as previous, but will get 204 because it does nothing:
TestCase {
db: Some(db_name),
table: Some(tbl_name),
cache_name: Some("my_cache"),
expected: StatusCode::NO_CONTENT,
..Default::default()
},
// Same as previous, but this time try to use different parameters, this will result in
// a bad request:
TestCase {
db: Some(db_name),
table: Some(tbl_name),
cache_name: Some("my_cache"),
// The default TTL that would have been used is 4 * 60 * 60 seconds (4 hours)
ttl: Some(666),
expected: StatusCode::BAD_REQUEST,
..Default::default()
},
// Will create new cache, because key columns are unique, and so will be the name:
TestCase {
db: Some(db_name),
table: Some(tbl_name),
key_cols: Some(&["t1", "t2"]),
expected: StatusCode::CREATED,
..Default::default()
},
// Same as previous, but will get 204 because nothing happens:
TestCase {
db: Some(db_name),
table: Some(tbl_name),
key_cols: Some(&["t1", "t2"]),
expected: StatusCode::NO_CONTENT,
..Default::default()
},
// Using an invalid key column (by name) is a bad request:
TestCase {
db: Some(db_name),
table: Some(tbl_name),
key_cols: Some(&["not_a_key_column"]),
expected: StatusCode::BAD_REQUEST,
..Default::default()
},
// Using an invalid key column (by type) is a bad request:
TestCase {
db: Some(db_name),
table: Some(tbl_name),
// f5 is a float, which is not supported as a key column:
key_cols: Some(&["f5"]),
expected: StatusCode::BAD_REQUEST,
..Default::default()
},
// Using an invalid value column is a bad request:
TestCase {
db: Some(db_name),
table: Some(tbl_name),
val_cols: Some(&["not_a_value_column"]),
expected: StatusCode::BAD_REQUEST,
..Default::default()
},
// Using an invalid cache size is a bad request:
TestCase {
db: Some(db_name),
table: Some(tbl_name),
count: Some(11),
expected: StatusCode::BAD_REQUEST,
..Default::default()
},
];
for (i, t) in test_cases.into_iter().enumerate() {
let body = serde_json::json!({
"db": t.db,
"table": t.table,
"name": t.cache_name,
"key_columns": t.key_cols,
"value_columns": t.val_cols,
"count": t.count,
"ttl": t.ttl,
});
let resp = client
.post(&url)
.json(&body)
.send()
.await
.expect("send /api/v3/configure/last_cache request");
let status = resp.status();
assert_eq!(t.expected, status, "test case ({i}) failed");
}
}


@@ -14,6 +14,7 @@ use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use reqwest::Response;
mod auth;
mod configure;
mod flight;
mod limits;
mod ping;


@@ -52,6 +52,7 @@ flate2.workspace = true
futures.workspace = true
hex.workspace = true
hyper.workspace = true
mime.workspace = true
object_store.workspace = true
parking_lot.workspace = true
pin-project-lite.workspace = true


@@ -22,6 +22,7 @@ use hyper::HeaderMap;
use hyper::{Body, Method, Request, Response, StatusCode};
use influxdb3_process::{INFLUXDB3_GIT_HASH_SHORT, INFLUXDB3_VERSION};
use influxdb3_write::catalog::Error as CatalogError;
use influxdb3_write::last_cache::{self, CreateCacheArguments};
use influxdb3_write::persister::TrackedMemoryArrowWriter;
use influxdb3_write::write_buffer::Error as WriteBufferError;
use influxdb3_write::BufferedWriteRequest;
@@ -43,6 +44,7 @@ use std::pin::Pin;
use std::str::Utf8Error;
use std::string::FromUtf8Error;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use unicode_segmentation::UnicodeSegmentation;
@@ -60,12 +62,20 @@ pub enum Error {
/// The `Content-Encoding` header is invalid and cannot be read.
#[error("invalid content-encoding header: {0}")]
NonUtf8ContentHeader(hyper::header::ToStrError),
NonUtf8ContentEncodingHeader(hyper::header::ToStrError),
/// The `Content-Type` header is invalid and cannot be read.
#[error("invalid content-type header: {0}")]
NonUtf8ContentTypeHeader(hyper::header::ToStrError),
/// The specified `Content-Encoding` is not acceptable.
#[error("unacceptable content-encoding: {0}")]
InvalidContentEncoding(String),
/// The specified `Content-Type` is not acceptable.
#[error("unacceptable content-type, expected: {expected}")]
InvalidContentType { expected: mime::Mime },
/// The client disconnected.
#[error("client disconnected")]
ClientHangup(hyper::Error),
@@ -187,6 +197,15 @@ pub enum Error {
#[error("v1 query API error: {0}")]
V1Query(#[from] v1::QueryError),
#[error("last cache error: {0}")]
LastCache(#[from] last_cache::Error),
#[error("provided database name does not exist")]
DatabaseDoesNotExist,
#[error("provided table name does not exist for database")]
TableDoesNotExist,
}
#[derive(Debug, Error)]
@@ -275,6 +294,27 @@ impl Error {
.body(body)
.unwrap()
}
Self::SerdeJson(_) => Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::from(self.to_string()))
.unwrap(),
Self::LastCache(ref lc_err) => match lc_err {
last_cache::Error::InvalidCacheSize
| last_cache::Error::CacheAlreadyExists { .. }
| last_cache::Error::KeyColumnDoesNotExist { .. }
| last_cache::Error::InvalidKeyColumn
| last_cache::Error::ValueColumnDoesNotExist { .. } => Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::from(lc_err.to_string()))
.unwrap(),
// This variant should not be encountered by the API, as it is thrown during
// query execution and would be captured there, but we avoid a catch-all arm here
// in case new variants are added to the enum:
last_cache::Error::CacheDoesNotExist => Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(self.to_string()))
.unwrap(),
},
_ => {
let body = Body::from(self.to_string());
Response::builder()
@@ -464,7 +504,7 @@ where
let encoding = req
.headers()
.get(&CONTENT_ENCODING)
.map(|v| v.to_str().map_err(Error::NonUtf8ContentHeader))
.map(|v| v.to_str().map_err(Error::NonUtf8ContentEncodingHeader))
.transpose()?;
let ungzip = match encoding {
None | Some("identity") => false,
@@ -634,6 +674,91 @@ where
}
.map_err(Into::into)
}
async fn config_last_cache_create(&self, req: Request<Body>) -> Result<Response<Body>> {
let LastCacheCreateRequest {
db,
table,
name,
key_columns,
value_columns,
count,
ttl,
} = self.read_body_json(req).await?;
let Some(db_schema) = self.write_buffer.catalog().db_schema(&db) else {
return Err(Error::DatabaseDoesNotExist);
};
let Some(tbl_schema) = db_schema.get_table_schema(&table) else {
return Err(Error::TableDoesNotExist);
};
match self
.write_buffer
.last_cache()
.create_cache(CreateCacheArguments {
db_name: db,
tbl_name: table,
schema: tbl_schema.clone(),
cache_name: name,
count,
ttl: ttl.map(Duration::from_secs),
key_columns,
value_columns,
})? {
Some(cache_name) => Response::builder()
.status(StatusCode::CREATED)
.header(CONTENT_TYPE, mime::APPLICATION_JSON.as_ref())
.body(Body::from(
serde_json::to_string(&LastCacheCreatedResponse { cache_name }).unwrap(),
))
.map_err(Into::into),
None => Response::builder()
.status(StatusCode::NO_CONTENT)
.body(Body::empty())
.map_err(Into::into),
}
}
async fn read_body_json<ReqBody: DeserializeOwned>(
&self,
req: hyper::Request<Body>,
) -> Result<ReqBody> {
if !json_content_type(req.headers()) {
return Err(Error::InvalidContentType {
expected: mime::APPLICATION_JSON,
});
}
let bytes = self.read_body(req).await?;
serde_json::from_slice(&bytes).map_err(Into::into)
}
}
/// Check that the content type is application/json
fn json_content_type(headers: &HeaderMap) -> bool {
let content_type = if let Some(content_type) = headers.get(CONTENT_TYPE) {
content_type
} else {
return false;
};
let content_type = if let Ok(content_type) = content_type.to_str() {
content_type
} else {
return false;
};
let mime = if let Ok(mime) = content_type.parse::<mime::Mime>() {
mime
} else {
return false;
};
let is_json_content_type = mime.type_() == "application"
&& (mime.subtype() == "json" || mime.suffix().map_or(false, |name| name == "json"));
is_json_content_type
}
#[derive(Debug, Deserialize)]
@@ -886,6 +1011,22 @@ impl From<iox_http::write::WriteParams> for WriteParams {
}
}
#[derive(Debug, Deserialize)]
struct LastCacheCreateRequest {
db: String,
table: String,
name: Option<String>,
key_columns: Option<Vec<String>>,
value_columns: Option<Vec<String>>,
count: Option<usize>,
ttl: Option<u64>,
}
#[derive(Debug, Serialize)]
struct LastCacheCreatedResponse {
cache_name: String,
}
pub(crate) async fn route_request<W: WriteBuffer, Q: QueryExecutor, T: TimeProvider>(
http_server: Arc<HttpApi<W, Q, T>>,
mut req: Request<Body>,
@@ -958,6 +1099,9 @@ where
(Method::GET, "/health" | "/api/v1/health") => http_server.health(),
(Method::GET | Method::POST, "/ping") => http_server.ping(),
(Method::GET, "/metrics") => http_server.handle_metrics(),
(Method::POST, "/api/v3/configure/last_cache") => {
http_server.config_last_cache_create(req).await
}
_ => {
let body = Body::from("not found");
Ok(Response::builder()


@@ -46,8 +46,6 @@ pub enum Error {
InvalidKeyColumn,
#[error("specified value column ({column_name}) does not exist in the table schema")]
ValueColumnDoesNotExist { column_name: String },
#[error("schema builder error: {0}")]
SchemaBuilder(#[from] schema::builder::Error),
#[error("requested last cache does not exist")]
CacheDoesNotExist,
}
@@ -78,35 +76,35 @@ impl std::fmt::Debug for LastCacheProvider {
const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(60 * 60 * 4);
/// Arguments to the [`LastCacheProvider::create_cache`] method
pub(crate) struct CreateCacheArguments {
pub struct CreateCacheArguments {
/// The name of the database to create the cache for
pub(crate) db_name: String,
pub db_name: String,
/// The name of the table in the database to create the cache for
pub(crate) tbl_name: String,
pub tbl_name: String,
/// The Influx Schema of the table
pub(crate) schema: Schema,
pub schema: Schema,
/// An optional name for the cache
///
/// The cache name will default to `<table_name>_<keys>_last_cache`
pub(crate) cache_name: Option<String>,
pub cache_name: Option<String>,
/// The number of values to hold in the created cache
///
/// This will default to 1.
pub(crate) count: Option<usize>,
pub count: Option<usize>,
/// The time-to-live (TTL) for the created cache
///
/// This will default to [`DEFAULT_CACHE_TTL`]
pub(crate) ttl: Option<Duration>,
pub ttl: Option<Duration>,
/// The key column names to use in the cache hierarchy
///
/// This will default to:
/// - the series key columns for a v3 table
/// - the lexicographically ordered tag set for a v1 table
pub(crate) key_columns: Option<Vec<String>>,
pub key_columns: Option<Vec<String>>,
/// The value columns to use in the cache
///
/// This will default to all non-key columns. The `time` column is always included.
pub(crate) value_columns: Option<Vec<String>>,
pub value_columns: Option<Vec<String>>,
}
impl LastCacheProvider {
@@ -147,7 +145,10 @@ impl LastCacheProvider {
/// Create a new entry in the last cache for a given database and table, along with the given
/// parameters.
pub(crate) fn create_cache(
///
/// If a new cache is created, it will return its name. If the provided arguments are identical
/// to an existing cache (along with any defaults), then `None` will be returned.
pub fn create_cache(
&self,
CreateCacheArguments {
db_name,
@@ -159,7 +160,7 @@ impl LastCacheProvider {
key_columns,
value_columns,
}: CreateCacheArguments,
) -> Result<String, Error> {
) -> Result<Option<String>, Error> {
let key_columns = if let Some(keys) = key_columns {
// validate the key columns specified to ensure correct type (string, int, unit, or bool)
// and that they exist in the table's schema.
@@ -276,7 +277,7 @@ impl LastCacheProvider {
.and_then(|db| db.get(&tbl_name))
.and_then(|tbl| tbl.get(&cache_name))
{
return lc.compare_config(&last_cache).map(|_| cache_name);
return lc.compare_config(&last_cache).map(|_| None);
}
// get the write lock and insert:
@@ -288,7 +289,7 @@ impl LastCacheProvider {
.or_default()
.insert(cache_name.clone(), last_cache);
Ok(cache_name)
Ok(Some(cache_name))
}
/// Write a batch from the buffer into the cache by iterating over its database and table batches
@@ -2693,7 +2694,7 @@ mod tests {
)
.expect("create last cache should have failed");
assert_eq!(wbuf.last_cache().size(), 2);
assert_eq!("tbl_t1_last_cache", name);
assert_eq!(Some("tbl_t1_last_cache"), name.as_deref());
// Specify different TTL:
wbuf.create_last_cache(


@@ -417,7 +417,8 @@ impl<W: Wal, T: TimeProvider> WriteBufferImpl<W, T> {
/// Create a new last-N-value cache in the specified database and table, along with the given
/// parameters.
///
/// Returns the name of the newly created cache.
/// Returns the name of the newly created cache, or `None` if a cache was not created, but the
/// provided parameters match those of an existing cache.
#[allow(clippy::too_many_arguments)]
pub fn create_last_cache(
&self,
@@ -428,7 +429,7 @@ impl<W: Wal, T: TimeProvider> WriteBufferImpl<W, T> {
ttl: Option<Duration>,
key_columns: Option<Vec<String>>,
value_columns: Option<Vec<String>>,
) -> Result<String, Error> {
) -> Result<Option<String>, Error> {
let db_name = db_name.into();
let tbl_name = tbl_name.into();
let cache_name = cache_name.map(Into::into);