diff --git a/Cargo.lock b/Cargo.lock index 7ec324c304..c6d0f63e1a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2593,6 +2593,7 @@ dependencies = [ "influxdb-line-protocol", "influxdb3_write", "iox_catalog", + "iox_http", "iox_query", "iox_query_influxql", "iox_query_influxql_rewrite", @@ -2650,6 +2651,7 @@ dependencies = [ "hex", "influxdb-line-protocol", "iox_catalog", + "iox_http", "iox_query", "iox_time", "object_store", @@ -2823,6 +2825,22 @@ dependencies = [ "workspace-hack", ] +[[package]] +name = "iox_http" +version = "0.1.0" +source = "git+https://github.com/influxdata/influxdb3_core?rev=fd238811b995ddf949a4c7546f4c59f25bd451cf#fd238811b995ddf949a4c7546f4c59f25bd451cf" +dependencies = [ + "async-trait", + "authz", + "data_types", + "hyper", + "parking_lot", + "serde", + "serde_urlencoded", + "thiserror", + "workspace-hack", +] + [[package]] name = "iox_query" version = "0.1.0" diff --git a/influxdb3/tests/server/auth.rs b/influxdb3/tests/server/auth.rs index 00b5dc6b81..2d8a7516f6 100644 --- a/influxdb3/tests/server/auth.rs +++ b/influxdb3/tests/server/auth.rs @@ -295,31 +295,31 @@ async fn v1_password_parameter() { StatusCode::OK, ); - // TODO - The following assertions will break when the write API gets implemented, - // so will need to revisit these at that time. Right now, they just assert - // that the returned status code is 404 Not Found, as that would indicate - // the request made it past the authorize step in the HTTP router. + let valid_write_body = "cpu,host=val usage=0.5"; // Send request to write API with the token in the v1 `p` parameter: assert_eq!( client - .get(&write_url) - .query(&[("p", TOKEN)]) + .post(&write_url) + .query(&[("p", TOKEN), ("db", "foo")]) + .body(valid_write_body) .send() .await .expect("send request") .status(), - StatusCode::NOT_FOUND, + StatusCode::OK, ); // Send request to write API with the token in auth header: assert_eq!( client - .get(&write_url) + .post(&write_url) .bearer_auth(TOKEN) + .query(&[("db", "foo")]) + .body(valid_write_body) .send() .await .expect("send request") .status(), - StatusCode::NOT_FOUND, + StatusCode::OK, ); } diff --git a/influxdb3/tests/server/main.rs b/influxdb3/tests/server/main.rs index 58a86aa1fd..739af9188b 100644 --- a/influxdb3/tests/server/main.rs +++ b/influxdb3/tests/server/main.rs @@ -16,6 +16,7 @@ mod auth; mod flight; mod limits; mod query; +mod write; /// Configuration for a [`TestServer`] #[derive(Debug, Default)] diff --git a/influxdb3/tests/server/write.rs b/influxdb3/tests/server/write.rs new file mode 100644 index 0000000000..43c4d9d79a --- /dev/null +++ b/influxdb3/tests/server/write.rs @@ -0,0 +1,268 @@ +use hyper::StatusCode; +use pretty_assertions::assert_eq; + +use crate::TestServer; + +#[tokio::test] +async fn api_v1_write_request_parsing() { + let server = TestServer::spawn().await; + let client = reqwest::Client::new(); + let write_url = format!("{base}/write", base = server.client_addr()); + let write_body = "cpu,host=a usage=0.5"; + + #[derive(Debug)] + struct TestCase { + db: Option<&'static str>, + precision: Option<&'static str>, + rp: Option<&'static str>, + expected: StatusCode, + } + + let test_cases = [ + TestCase { + db: None, + precision: None, + rp: None, + expected: StatusCode::BAD_REQUEST, + }, + TestCase { + db: Some("foo"), + precision: None, + rp: None, + expected: StatusCode::OK, + }, + TestCase { + db: Some("foo"), + precision: Some("s"), + rp: None, + expected: StatusCode::OK, + }, + TestCase { + db: Some("foo"), + precision: Some("ms"), + rp: None, + expected: StatusCode::OK, + }, + TestCase { + db: Some("foo"), + precision: Some("us"), + rp: None, + expected: StatusCode::OK, + }, + TestCase { + db: Some("foo"), + precision: Some("ns"), + rp: None, + expected: StatusCode::OK, + }, + TestCase { + db: Some("foo"), + precision: Some("invalid"), + rp: None, + expected: StatusCode::BAD_REQUEST, + }, + TestCase { + db: Some("foo"), + precision: None, + rp: Some("bar"), + expected: StatusCode::OK, + }, + TestCase { + db: Some("foo"), + precision: None, + rp: Some("default"), + expected: StatusCode::OK, + }, + TestCase { + db: Some("foo"), + precision: None, + rp: Some("autogen"), + expected: StatusCode::OK, + }, + ]; + + for t in test_cases { + println!("Test Case: {t:?}"); + let mut params = vec![]; + if let Some(db) = t.db { + params.push(("db", db)); + } + if let Some(rp) = t.rp { + params.push(("rp", rp)); + } + if let Some(precision) = t.precision { + params.push(("precision", precision)); + } + let resp = client + .post(&write_url) + .query(¶ms) + .body(write_body) + .send() + .await + .expect("send /write request"); + let status = resp.status(); + let body = resp.text().await.expect("response body as text"); + println!("Response [{status}]:\n{body}"); + assert_eq!(t.expected, status); + } +} + +#[tokio::test] +async fn api_v1_write_round_trip() { + let server = TestServer::spawn().await; + let client = reqwest::Client::new(); + let write_url = format!("{base}/write", base = server.client_addr()); + + client + .post(write_url) + .query(&[("db", "foo")]) + .body( + "cpu,host=a usage=0.5 1 + cpu,host=a usage=0.6 2 + cpu,host=a usage=0.7 3", + ) + .send() + .await + .expect("send /write request"); + + let resp = server + .api_v3_query_influxql(&[("q", "SELECT * FROM foo.autogen.cpu"), ("format", "pretty")]) + .await + .text() + .await + .unwrap(); + + assert_eq!( + resp, + "+------------------+-------------------------------+------+-------+\n\ + | iox::measurement | time | host | usage |\n\ + +------------------+-------------------------------+------+-------+\n\ + | cpu | 1970-01-01T00:00:00.000000001 | a | 0.5 |\n\ + | cpu | 1970-01-01T00:00:00.000000002 | a | 0.6 |\n\ + | cpu | 1970-01-01T00:00:00.000000003 | a | 0.7 |\n\ + +------------------+-------------------------------+------+-------+" + ); +} + +#[tokio::test] +async fn api_v2_write_request_parsing() { + let server = TestServer::spawn().await; + let client = reqwest::Client::new(); + let write_url = format!("{base}/api/v2/write", base = server.client_addr()); + let write_body = "cpu,host=a usage=0.5"; + + #[derive(Debug)] + struct TestCase { + org: Option<&'static str>, + bucket: Option<&'static str>, + precision: Option<&'static str>, + expected: StatusCode, + } + + let test_cases = [ + TestCase { + org: None, + bucket: None, + precision: None, + expected: StatusCode::BAD_REQUEST, + }, + TestCase { + org: None, + bucket: Some("foo"), + precision: None, + expected: StatusCode::OK, + }, + TestCase { + org: Some("bar"), + bucket: Some("foo"), + precision: None, + expected: StatusCode::OK, + }, + TestCase { + org: None, + bucket: Some("foo"), + precision: Some("s"), + expected: StatusCode::OK, + }, + TestCase { + org: None, + bucket: Some("foo"), + precision: Some("ms"), + expected: StatusCode::OK, + }, + TestCase { + org: None, + bucket: Some("foo"), + precision: Some("us"), + expected: StatusCode::OK, + }, + TestCase { + org: None, + bucket: Some("foo"), + precision: Some("ns"), + expected: StatusCode::OK, + }, + ]; + + for t in test_cases { + println!("Test Case: {t:?}"); + let mut params = vec![]; + if let Some(bucket) = t.bucket { + params.push(("bucket", bucket)); + } + if let Some(org) = t.org { + params.push(("org", org)); + } + if let Some(precision) = t.precision { + params.push(("precision", precision)); + } + let resp = client + .post(&write_url) + .query(¶ms) + .body(write_body) + .send() + .await + .expect("send /write request"); + let status = resp.status(); + let body = resp.text().await.expect("response body as text"); + println!("Response [{status}]:\n{body}"); + assert_eq!(t.expected, status); + } +} + +#[tokio::test] +async fn api_v2_write_round_trip() { + let server = TestServer::spawn().await; + let client = reqwest::Client::new(); + let write_url = format!("{base}/api/v2/write", base = server.client_addr()); + + client + .post(write_url) + .query(&[("bucket", "foo")]) + .body( + "cpu,host=a usage=0.5 1 + cpu,host=a usage=0.6 2 + cpu,host=a usage=0.7 3", + ) + .send() + .await + .expect("send /write request"); + + let resp = server + .api_v3_query_influxql(&[("q", "SELECT * FROM foo.autogen.cpu"), ("format", "pretty")]) + .await + .text() + .await + .unwrap(); + + assert_eq!( + resp, + "+------------------+-------------------------------+------+-------+\n\ + | iox::measurement | time | host | usage |\n\ + +------------------+-------------------------------+------+-------+\n\ + | cpu | 1970-01-01T00:00:00.000000001 | a | 0.5 |\n\ + | cpu | 1970-01-01T00:00:00.000000002 | a | 0.6 |\n\ + | cpu | 1970-01-01T00:00:00.000000003 | a | 0.7 |\n\ + +------------------+-------------------------------+------+-------+" + ); +} diff --git a/influxdb3_client/src/lib.rs b/influxdb3_client/src/lib.rs index 0c3aba909e..6303caf4fb 100644 --- a/influxdb3_client/src/lib.rs +++ b/influxdb3_client/src/lib.rs @@ -207,7 +207,7 @@ impl<'a, B> From<&'a WriteRequestBuilder<'a, B>> for WriteParams<'a> { // TODO - this should re-use a type defined in the server code, or a separate crate, // central to both. #[derive(Debug, Copy, Clone, Serialize)] -#[serde(rename_all = "snake_case")] +#[serde(rename_all = "lowercase")] pub enum Precision { Second, Millisecond, diff --git a/influxdb3_server/Cargo.toml b/influxdb3_server/Cargo.toml index 19bb125ee3..56c1f31506 100644 --- a/influxdb3_server/Cargo.toml +++ b/influxdb3_server/Cargo.toml @@ -12,6 +12,7 @@ data_types.workspace = true datafusion_util.workspace = true influxdb-line-protocol.workspace = true iox_catalog.workspace = true +iox_http.workspace = true iox_query.workspace = true iox_query_params.workspace = true iox_query_influxql.workspace = true diff --git a/influxdb3_server/src/http.rs b/influxdb3_server/src/http.rs index c88cfc809f..983f5e2599 100644 --- a/influxdb3_server/src/http.rs +++ b/influxdb3_server/src/http.rs @@ -4,6 +4,7 @@ use crate::{query_executor, QueryKind}; use crate::{CommonServerState, QueryExecutor}; use arrow::record_batch::RecordBatch; use arrow::util::pretty; +use authz::http::AuthorizationHeaderExtension; use authz::Authorizer; use bytes::{Bytes, BytesMut}; use data_types::NamespaceName; @@ -25,6 +26,9 @@ use influxdb3_write::write_buffer::Error as WriteBufferError; use influxdb3_write::BufferedWriteRequest; use influxdb3_write::Precision; use influxdb3_write::WriteBuffer; +use iox_http::write::single_tenant::SingleTenantRequestUnifier; +use iox_http::write::v1::V1_NAMESPACE_RP_SEPARATOR; +use iox_http::write::{WriteParseError, WriteRequestUnifier}; use iox_query_influxql_rewrite as rewrite; use iox_query_params::StatementParams; use iox_time::TimeProvider; @@ -167,13 +171,8 @@ pub enum Error { #[error("query error: {0}")] Query(#[from] query_executor::Error), - // Invalid Start Character for a Database Name - #[error("db name did not start with a number or letter")] - DbNameInvalidStartChar, - - // Invalid Character for a Database Name - #[error("db name must use ASCII letters, numbers, underscores and hyphens only")] - DbNameInvalidChar, + #[error(transparent)] + DbName(#[from] ValidateDbNameError), #[error("partial write of line protocol occurred")] PartialLpWrite(BufferedWriteRequest), @@ -211,13 +210,15 @@ pub enum AuthorizationError { ToStr(#[from] hyper::header::ToStrError), } +#[derive(Debug, Serialize)] +struct ErrorMessage { + error: String, + data: Option, +} + impl Error { - fn response(self) -> Response { - #[derive(Debug, Serialize)] - struct ErrorMessage { - error: String, - data: Option, - } + /// Convert this error into an HTTP [`Response`] + fn into_response(self) -> Response { match self { Self::WriteBuffer(WriteBufferError::CatalogUpdateError( err @ (CatalogError::TooManyDbs @@ -247,9 +248,9 @@ impl Error { .body(body) .unwrap() } - Self::DbNameInvalidStartChar | Self::DbNameInvalidChar => { + Self::DbName(e) => { let err: ErrorMessage<()> = ErrorMessage { - error: self.to_string(), + error: e.to_string(), data: None, }; let serialized = serde_json::to_string(&err).unwrap(); @@ -304,6 +305,7 @@ pub(crate) struct HttpApi { pub(crate) query_executor: Arc, max_request_bytes: usize, authorizer: Arc, + legacy_write_param_unifier: SingleTenantRequestUnifier, } impl HttpApi { @@ -315,6 +317,7 @@ impl HttpApi { max_request_bytes: usize, authorizer: Arc, ) -> Self { + let legacy_write_param_unifier = SingleTenantRequestUnifier::new(Arc::clone(&authorizer)); Self { common_state, time_provider, @@ -322,6 +325,7 @@ impl HttpApi { query_executor, max_request_bytes, authorizer, + legacy_write_param_unifier, } } } @@ -336,7 +340,16 @@ where async fn write_lp(&self, req: Request) -> Result> { let query = req.uri().query().ok_or(Error::MissingWriteParams)?; let params: WriteParams = serde_urlencoded::from_str(query)?; - validate_db_name(¶ms.db)?; + self.write_lp_inner(params, req, false).await + } + + async fn write_lp_inner( + &self, + params: WriteParams, + req: Request, + accept_rp: bool, + ) -> Result> { + validate_db_name(¶ms.db, accept_rp)?; info!("write_lp to {}", params.db); let body = self.read_body(req).await?; @@ -478,6 +491,12 @@ where } async fn authorize_request(&self, req: &mut Request) -> Result<(), AuthorizationError> { + // Extend the request with the authorization token; this is used downstream in some + // APIs, such as write, that need the full header value to authorize a request. + let auth_header = req.headers().get(AUTHORIZATION).cloned(); + req.extensions_mut() + .insert(AuthorizationHeaderExtension::new(auth_header)); + let auth = if let Some(p) = extract_v1_auth_token(req) { Some(p) } else { @@ -646,33 +665,76 @@ impl From for AuthorizationError { } } +/// Validate a database name +/// /// A valid name: /// - Starts with a letter or a number /// - Is ASCII not UTF-8 /// - Contains only letters, numbers, underscores or hyphens -fn validate_db_name(name: &str) -> Result<()> { +/// - if `accept_rp` is true, then a single slash ('/') is allowed, separating the +/// the database name from the retention policy name, e.g., '/' +fn validate_db_name(name: &str, accept_rp: bool) -> Result<(), ValidateDbNameError> { + if name.is_empty() { + return Err(ValidateDbNameError::Empty); + } let mut is_first_char = true; + let mut rp_seperator_found = false; + let mut last_char = None; for grapheme in name.graphemes(true) { if grapheme.as_bytes().len() > 1 { // In the case of a unicode we need to handle multibyte chars - return Err(Error::DbNameInvalidChar); + return Err(ValidateDbNameError::InvalidChar); } let char = grapheme.as_bytes()[0] as char; if !is_first_char { - if !(char.is_ascii_alphanumeric() || char == '_' || char == '-') { - return Err(Error::DbNameInvalidChar); + match (accept_rp, rp_seperator_found, char) { + (true, true, V1_NAMESPACE_RP_SEPARATOR) => { + return Err(ValidateDbNameError::InvalidRetentionPolicy) + } + (true, false, V1_NAMESPACE_RP_SEPARATOR) => { + rp_seperator_found = true; + } + (false, _, char) + if !(char.is_ascii_alphanumeric() || char == '_' || char == '-') => + { + return Err(ValidateDbNameError::InvalidChar) + } + _ => (), } } else { if !char.is_ascii_alphanumeric() { - return Err(Error::DbNameInvalidStartChar); + return Err(ValidateDbNameError::InvalidStartChar); } is_first_char = false; } + last_char.replace(char); + } + + if last_char.is_some_and(|c| c == '/') { + return Err(ValidateDbNameError::InvalidRetentionPolicy); } Ok(()) } +#[derive(Debug, thiserror::Error)] +pub enum ValidateDbNameError { + #[error( + "invalid character in database name: must be ASCII, \ + containing only letters, numbers, underscores, or hyphens" + )] + InvalidChar, + #[error("db name did not start with a number or letter")] + InvalidStartChar, + #[error( + "db name with invalid retention policy, if providing a \ + retention policy name, must be of form '/'" + )] + InvalidRetentionPolicy, + #[error("db name cannot be empty")] + Empty, +} + #[derive(Debug, Deserialize)] pub(crate) struct QueryRequest { #[serde(rename = "db")] @@ -795,6 +857,17 @@ pub(crate) struct WriteParams { pub(crate) precision: Precision, } +impl From for WriteParams { + fn from(legacy: iox_http::write::WriteParams) -> Self { + Self { + db: legacy.namespace.to_string(), + // legacy behaviour was to not accept partial: + accept_partial: false, + precision: legacy.precision.into(), + } + } +} + pub(crate) async fn route_request( http_server: Arc>, mut req: Request, @@ -843,6 +916,22 @@ where let content_length = req.headers().get("content-length").cloned(); let response = match (method.clone(), uri.path()) { + (Method::POST, "/write") => { + let params = match http_server.legacy_write_param_unifier.parse_v1(&req).await { + Ok(p) => p.into(), + Err(e) => return Ok(legacy_write_error_to_response(e)), + }; + + http_server.write_lp_inner(params, req, true).await + } + (Method::POST, "/api/v2/write") => { + let params = match http_server.legacy_write_param_unifier.parse_v2(&req).await { + Ok(p) => p.into(), + Err(e) => return Ok(legacy_write_error_to_response(e)), + }; + + http_server.write_lp_inner(params, req, false).await + } (Method::POST, "/api/v3/write_lp") => http_server.write_lp(req).await, (Method::GET | Method::POST, "/api/v3/query_sql") => http_server.query_sql(req).await, (Method::GET | Method::POST, "/api/v3/query_influxql") => { @@ -871,11 +960,26 @@ where } Err(error) => { error!(%error, %method, %uri, ?content_length, "Error while handling request"); - Ok(error.response()) + Ok(error.into_response()) } } } +fn legacy_write_error_to_response(e: WriteParseError) -> Response { + let err: ErrorMessage<()> = ErrorMessage { + error: e.to_string(), + data: None, + }; + let serialized = serde_json::to_string(&err).unwrap(); + let body = Body::from(serialized); + let status = match e { + WriteParseError::NotImplemented => StatusCode::NOT_FOUND, + WriteParseError::SingleTenantError(e) => StatusCode::from(&e), + WriteParseError::MultiTenantError(e) => StatusCode::from(&e), + }; + Response::builder().status(status).body(body).unwrap() +} + async fn pprof_home(req: Request) -> Result> { let default_host = HeaderValue::from_static("localhost"); let host = req @@ -1031,3 +1135,36 @@ async fn pprof_heappy_profile(req: Request) -> Result> { async fn pprof_heappy_profile(_req: Request) -> Result> { Err(Error::HeappyIsNotCompiled) } + +#[cfg(test)] +mod tests { + use super::validate_db_name; + use super::ValidateDbNameError; + + macro_rules! assert_validate_db_name { + ($name:literal, $accept_rp:literal, $expected:pat) => { + let actual = validate_db_name($name, $accept_rp); + assert!(matches!(&actual, $expected), "got: {actual:?}",); + }; + } + + #[test] + fn test_validate_db_name() { + assert_validate_db_name!("foo/bar", false, Err(ValidateDbNameError::InvalidChar)); + assert!(validate_db_name("foo/bar", true).is_ok()); + assert_validate_db_name!( + "foo/bar/baz", + true, + Err(ValidateDbNameError::InvalidRetentionPolicy) + ); + assert_validate_db_name!( + "foo/", + true, + Err(ValidateDbNameError::InvalidRetentionPolicy) + ); + assert_validate_db_name!("foo/bar", false, Err(ValidateDbNameError::InvalidChar)); + assert_validate_db_name!("foo/bar/baz", false, Err(ValidateDbNameError::InvalidChar)); + assert_validate_db_name!("_foo", false, Err(ValidateDbNameError::InvalidStartChar)); + assert_validate_db_name!("", false, Err(ValidateDbNameError::Empty)); + } +} diff --git a/influxdb3_server/src/lib.rs b/influxdb3_server/src/lib.rs index b99d707661..a3997f6911 100644 --- a/influxdb3_server/src/lib.rs +++ b/influxdb3_server/src/lib.rs @@ -556,7 +556,7 @@ mod tests { assert_eq!( body, "{\ - \"error\":\"db name must use ASCII letters, numbers, underscores and hyphens only\",\ + \"error\":\"invalid character in database name: must be ASCII, containing only letters, numbers, underscores, or hyphens\",\ \"data\":null\ }" ); @@ -584,6 +584,29 @@ mod tests { }" ); + let resp = write_lp( + &server, + "", + "cpu,host=b val=2 155\n", + None, + true, + "nanosecond", + ) + .await; + + let status = resp.status(); + let body = + String::from_utf8(body::to_bytes(resp.into_body()).await.unwrap().to_vec()).unwrap(); + + assert_eq!(status, StatusCode::BAD_REQUEST); + assert_eq!( + body, + "{\ + \"error\":\"db name cannot be empty\",\ + \"data\":null\ + }" + ); + shutdown.cancel(); } diff --git a/influxdb3_write/Cargo.toml b/influxdb3_write/Cargo.toml index 0aa806dc38..d5bf0d9941 100644 --- a/influxdb3_write/Cargo.toml +++ b/influxdb3_write/Cargo.toml @@ -11,6 +11,7 @@ data_types.workspace = true datafusion_util.workspace = true influxdb-line-protocol.workspace = true iox_catalog.workspace = true +iox_http.workspace = true iox_query.workspace = true iox_time.workspace = true parquet_file.workspace = true diff --git a/influxdb3_write/src/lib.rs b/influxdb3_write/src/lib.rs index 60301f6cb7..955d8757f4 100644 --- a/influxdb3_write/src/lib.rs +++ b/influxdb3_write/src/lib.rs @@ -541,6 +541,17 @@ impl Default for Precision { } } +impl From for Precision { + fn from(legacy: iox_http::write::Precision) -> Self { + match legacy { + iox_http::write::Precision::Second => Precision::Second, + iox_http::write::Precision::Millisecond => Precision::Millisecond, + iox_http::write::Precision::Microsecond => Precision::Microsecond, + iox_http::write::Precision::Nanosecond => Precision::Nanosecond, + } + } +} + /// Guess precision based off of a given timestamp. // Note that this will fail in June 2128, but that's not our problem pub(crate) fn guess_precision(timestamp: i64) -> Precision {