feat: support v1 and v2 write APIs (#24793)
feat: support v1 and v2 write APIs This adds support for two APIs: /write and /api/v2/write. These implement the v1 and v2 write APIs, respectively. In general, the difference between these and the new /api/v3/write_lp API is in the request parsing. We leverage the WriteRequestUnifier trait from influxdb3_core to handle parsing of v1 and v2 HTTP requests, to keep the error handling at that level consistent with distributed versions of InfluxDB 3.0. Specifically, we use the SingleTenantRequestUnifier implementation of the trait. Changes: - Addition of two new routes to the route_request method in influxdb3_server::http to serve /write and /api/v2/write requests. - Database name validation was updated to handle cases where retention policies may be passed in /write requests, and to also reject empty names. A unit test was added to verify the validate_db_name function. - HTTP request authorization in the router will extract the full Authorization header value, and store it in the request extensions; this is used in the write request parsing from the core iox_http crate to authorize write requests. - E2E tests to verify correct HTTP request parsing / response behaviour for both /write and /api/v2/write APIs - E2E tests to check that data sent in through /write and /api/v2/write can be queried backpull/24851/head
parent
c79821b246
commit
7784749bca
|
@ -2593,6 +2593,7 @@ dependencies = [
|
|||
"influxdb-line-protocol",
|
||||
"influxdb3_write",
|
||||
"iox_catalog",
|
||||
"iox_http",
|
||||
"iox_query",
|
||||
"iox_query_influxql",
|
||||
"iox_query_influxql_rewrite",
|
||||
|
@ -2650,6 +2651,7 @@ dependencies = [
|
|||
"hex",
|
||||
"influxdb-line-protocol",
|
||||
"iox_catalog",
|
||||
"iox_http",
|
||||
"iox_query",
|
||||
"iox_time",
|
||||
"object_store",
|
||||
|
@ -2823,6 +2825,22 @@ dependencies = [
|
|||
"workspace-hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "iox_http"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/influxdata/influxdb3_core?rev=fd238811b995ddf949a4c7546f4c59f25bd451cf#fd238811b995ddf949a4c7546f4c59f25bd451cf"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"authz",
|
||||
"data_types",
|
||||
"hyper",
|
||||
"parking_lot",
|
||||
"serde",
|
||||
"serde_urlencoded",
|
||||
"thiserror",
|
||||
"workspace-hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "iox_query"
|
||||
version = "0.1.0"
|
||||
|
|
|
@ -295,31 +295,31 @@ async fn v1_password_parameter() {
|
|||
StatusCode::OK,
|
||||
);
|
||||
|
||||
// TODO - The following assertions will break when the write API gets implemented,
|
||||
// so will need to revisit these at that time. Right now, they just assert
|
||||
// that the returned status code is 404 Not Found, as that would indicate
|
||||
// the request made it past the authorize step in the HTTP router.
|
||||
let valid_write_body = "cpu,host=val usage=0.5";
|
||||
|
||||
// Send request to write API with the token in the v1 `p` parameter:
|
||||
assert_eq!(
|
||||
client
|
||||
.get(&write_url)
|
||||
.query(&[("p", TOKEN)])
|
||||
.post(&write_url)
|
||||
.query(&[("p", TOKEN), ("db", "foo")])
|
||||
.body(valid_write_body)
|
||||
.send()
|
||||
.await
|
||||
.expect("send request")
|
||||
.status(),
|
||||
StatusCode::NOT_FOUND,
|
||||
StatusCode::OK,
|
||||
);
|
||||
// Send request to write API with the token in auth header:
|
||||
assert_eq!(
|
||||
client
|
||||
.get(&write_url)
|
||||
.post(&write_url)
|
||||
.bearer_auth(TOKEN)
|
||||
.query(&[("db", "foo")])
|
||||
.body(valid_write_body)
|
||||
.send()
|
||||
.await
|
||||
.expect("send request")
|
||||
.status(),
|
||||
StatusCode::NOT_FOUND,
|
||||
StatusCode::OK,
|
||||
);
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@ mod auth;
|
|||
mod flight;
|
||||
mod limits;
|
||||
mod query;
|
||||
mod write;
|
||||
|
||||
/// Configuration for a [`TestServer`]
|
||||
#[derive(Debug, Default)]
|
||||
|
|
|
@ -0,0 +1,268 @@
|
|||
use hyper::StatusCode;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
use crate::TestServer;
|
||||
|
||||
#[tokio::test]
|
||||
async fn api_v1_write_request_parsing() {
|
||||
let server = TestServer::spawn().await;
|
||||
let client = reqwest::Client::new();
|
||||
let write_url = format!("{base}/write", base = server.client_addr());
|
||||
let write_body = "cpu,host=a usage=0.5";
|
||||
|
||||
#[derive(Debug)]
|
||||
struct TestCase {
|
||||
db: Option<&'static str>,
|
||||
precision: Option<&'static str>,
|
||||
rp: Option<&'static str>,
|
||||
expected: StatusCode,
|
||||
}
|
||||
|
||||
let test_cases = [
|
||||
TestCase {
|
||||
db: None,
|
||||
precision: None,
|
||||
rp: None,
|
||||
expected: StatusCode::BAD_REQUEST,
|
||||
},
|
||||
TestCase {
|
||||
db: Some("foo"),
|
||||
precision: None,
|
||||
rp: None,
|
||||
expected: StatusCode::OK,
|
||||
},
|
||||
TestCase {
|
||||
db: Some("foo"),
|
||||
precision: Some("s"),
|
||||
rp: None,
|
||||
expected: StatusCode::OK,
|
||||
},
|
||||
TestCase {
|
||||
db: Some("foo"),
|
||||
precision: Some("ms"),
|
||||
rp: None,
|
||||
expected: StatusCode::OK,
|
||||
},
|
||||
TestCase {
|
||||
db: Some("foo"),
|
||||
precision: Some("us"),
|
||||
rp: None,
|
||||
expected: StatusCode::OK,
|
||||
},
|
||||
TestCase {
|
||||
db: Some("foo"),
|
||||
precision: Some("ns"),
|
||||
rp: None,
|
||||
expected: StatusCode::OK,
|
||||
},
|
||||
TestCase {
|
||||
db: Some("foo"),
|
||||
precision: Some("invalid"),
|
||||
rp: None,
|
||||
expected: StatusCode::BAD_REQUEST,
|
||||
},
|
||||
TestCase {
|
||||
db: Some("foo"),
|
||||
precision: None,
|
||||
rp: Some("bar"),
|
||||
expected: StatusCode::OK,
|
||||
},
|
||||
TestCase {
|
||||
db: Some("foo"),
|
||||
precision: None,
|
||||
rp: Some("default"),
|
||||
expected: StatusCode::OK,
|
||||
},
|
||||
TestCase {
|
||||
db: Some("foo"),
|
||||
precision: None,
|
||||
rp: Some("autogen"),
|
||||
expected: StatusCode::OK,
|
||||
},
|
||||
];
|
||||
|
||||
for t in test_cases {
|
||||
println!("Test Case: {t:?}");
|
||||
let mut params = vec![];
|
||||
if let Some(db) = t.db {
|
||||
params.push(("db", db));
|
||||
}
|
||||
if let Some(rp) = t.rp {
|
||||
params.push(("rp", rp));
|
||||
}
|
||||
if let Some(precision) = t.precision {
|
||||
params.push(("precision", precision));
|
||||
}
|
||||
let resp = client
|
||||
.post(&write_url)
|
||||
.query(¶ms)
|
||||
.body(write_body)
|
||||
.send()
|
||||
.await
|
||||
.expect("send /write request");
|
||||
let status = resp.status();
|
||||
let body = resp.text().await.expect("response body as text");
|
||||
println!("Response [{status}]:\n{body}");
|
||||
assert_eq!(t.expected, status);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn api_v1_write_round_trip() {
|
||||
let server = TestServer::spawn().await;
|
||||
let client = reqwest::Client::new();
|
||||
let write_url = format!("{base}/write", base = server.client_addr());
|
||||
|
||||
client
|
||||
.post(write_url)
|
||||
.query(&[("db", "foo")])
|
||||
.body(
|
||||
"cpu,host=a usage=0.5 1
|
||||
cpu,host=a usage=0.6 2
|
||||
cpu,host=a usage=0.7 3",
|
||||
)
|
||||
.send()
|
||||
.await
|
||||
.expect("send /write request");
|
||||
|
||||
let resp = server
|
||||
.api_v3_query_influxql(&[("q", "SELECT * FROM foo.autogen.cpu"), ("format", "pretty")])
|
||||
.await
|
||||
.text()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
resp,
|
||||
"+------------------+-------------------------------+------+-------+\n\
|
||||
| iox::measurement | time | host | usage |\n\
|
||||
+------------------+-------------------------------+------+-------+\n\
|
||||
| cpu | 1970-01-01T00:00:00.000000001 | a | 0.5 |\n\
|
||||
| cpu | 1970-01-01T00:00:00.000000002 | a | 0.6 |\n\
|
||||
| cpu | 1970-01-01T00:00:00.000000003 | a | 0.7 |\n\
|
||||
+------------------+-------------------------------+------+-------+"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn api_v2_write_request_parsing() {
|
||||
let server = TestServer::spawn().await;
|
||||
let client = reqwest::Client::new();
|
||||
let write_url = format!("{base}/api/v2/write", base = server.client_addr());
|
||||
let write_body = "cpu,host=a usage=0.5";
|
||||
|
||||
#[derive(Debug)]
|
||||
struct TestCase {
|
||||
org: Option<&'static str>,
|
||||
bucket: Option<&'static str>,
|
||||
precision: Option<&'static str>,
|
||||
expected: StatusCode,
|
||||
}
|
||||
|
||||
let test_cases = [
|
||||
TestCase {
|
||||
org: None,
|
||||
bucket: None,
|
||||
precision: None,
|
||||
expected: StatusCode::BAD_REQUEST,
|
||||
},
|
||||
TestCase {
|
||||
org: None,
|
||||
bucket: Some("foo"),
|
||||
precision: None,
|
||||
expected: StatusCode::OK,
|
||||
},
|
||||
TestCase {
|
||||
org: Some("bar"),
|
||||
bucket: Some("foo"),
|
||||
precision: None,
|
||||
expected: StatusCode::OK,
|
||||
},
|
||||
TestCase {
|
||||
org: None,
|
||||
bucket: Some("foo"),
|
||||
precision: Some("s"),
|
||||
expected: StatusCode::OK,
|
||||
},
|
||||
TestCase {
|
||||
org: None,
|
||||
bucket: Some("foo"),
|
||||
precision: Some("ms"),
|
||||
expected: StatusCode::OK,
|
||||
},
|
||||
TestCase {
|
||||
org: None,
|
||||
bucket: Some("foo"),
|
||||
precision: Some("us"),
|
||||
expected: StatusCode::OK,
|
||||
},
|
||||
TestCase {
|
||||
org: None,
|
||||
bucket: Some("foo"),
|
||||
precision: Some("ns"),
|
||||
expected: StatusCode::OK,
|
||||
},
|
||||
];
|
||||
|
||||
for t in test_cases {
|
||||
println!("Test Case: {t:?}");
|
||||
let mut params = vec![];
|
||||
if let Some(bucket) = t.bucket {
|
||||
params.push(("bucket", bucket));
|
||||
}
|
||||
if let Some(org) = t.org {
|
||||
params.push(("org", org));
|
||||
}
|
||||
if let Some(precision) = t.precision {
|
||||
params.push(("precision", precision));
|
||||
}
|
||||
let resp = client
|
||||
.post(&write_url)
|
||||
.query(¶ms)
|
||||
.body(write_body)
|
||||
.send()
|
||||
.await
|
||||
.expect("send /write request");
|
||||
let status = resp.status();
|
||||
let body = resp.text().await.expect("response body as text");
|
||||
println!("Response [{status}]:\n{body}");
|
||||
assert_eq!(t.expected, status);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn api_v2_write_round_trip() {
|
||||
let server = TestServer::spawn().await;
|
||||
let client = reqwest::Client::new();
|
||||
let write_url = format!("{base}/api/v2/write", base = server.client_addr());
|
||||
|
||||
client
|
||||
.post(write_url)
|
||||
.query(&[("bucket", "foo")])
|
||||
.body(
|
||||
"cpu,host=a usage=0.5 1
|
||||
cpu,host=a usage=0.6 2
|
||||
cpu,host=a usage=0.7 3",
|
||||
)
|
||||
.send()
|
||||
.await
|
||||
.expect("send /write request");
|
||||
|
||||
let resp = server
|
||||
.api_v3_query_influxql(&[("q", "SELECT * FROM foo.autogen.cpu"), ("format", "pretty")])
|
||||
.await
|
||||
.text()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
resp,
|
||||
"+------------------+-------------------------------+------+-------+\n\
|
||||
| iox::measurement | time | host | usage |\n\
|
||||
+------------------+-------------------------------+------+-------+\n\
|
||||
| cpu | 1970-01-01T00:00:00.000000001 | a | 0.5 |\n\
|
||||
| cpu | 1970-01-01T00:00:00.000000002 | a | 0.6 |\n\
|
||||
| cpu | 1970-01-01T00:00:00.000000003 | a | 0.7 |\n\
|
||||
+------------------+-------------------------------+------+-------+"
|
||||
);
|
||||
}
|
|
@ -207,7 +207,7 @@ impl<'a, B> From<&'a WriteRequestBuilder<'a, B>> for WriteParams<'a> {
|
|||
// TODO - this should re-use a type defined in the server code, or a separate crate,
|
||||
// central to both.
|
||||
#[derive(Debug, Copy, Clone, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum Precision {
|
||||
Second,
|
||||
Millisecond,
|
||||
|
|
|
@ -12,6 +12,7 @@ data_types.workspace = true
|
|||
datafusion_util.workspace = true
|
||||
influxdb-line-protocol.workspace = true
|
||||
iox_catalog.workspace = true
|
||||
iox_http.workspace = true
|
||||
iox_query.workspace = true
|
||||
iox_query_params.workspace = true
|
||||
iox_query_influxql.workspace = true
|
||||
|
|
|
@ -4,6 +4,7 @@ use crate::{query_executor, QueryKind};
|
|||
use crate::{CommonServerState, QueryExecutor};
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use arrow::util::pretty;
|
||||
use authz::http::AuthorizationHeaderExtension;
|
||||
use authz::Authorizer;
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use data_types::NamespaceName;
|
||||
|
@ -25,6 +26,9 @@ use influxdb3_write::write_buffer::Error as WriteBufferError;
|
|||
use influxdb3_write::BufferedWriteRequest;
|
||||
use influxdb3_write::Precision;
|
||||
use influxdb3_write::WriteBuffer;
|
||||
use iox_http::write::single_tenant::SingleTenantRequestUnifier;
|
||||
use iox_http::write::v1::V1_NAMESPACE_RP_SEPARATOR;
|
||||
use iox_http::write::{WriteParseError, WriteRequestUnifier};
|
||||
use iox_query_influxql_rewrite as rewrite;
|
||||
use iox_query_params::StatementParams;
|
||||
use iox_time::TimeProvider;
|
||||
|
@ -167,13 +171,8 @@ pub enum Error {
|
|||
#[error("query error: {0}")]
|
||||
Query(#[from] query_executor::Error),
|
||||
|
||||
// Invalid Start Character for a Database Name
|
||||
#[error("db name did not start with a number or letter")]
|
||||
DbNameInvalidStartChar,
|
||||
|
||||
// Invalid Character for a Database Name
|
||||
#[error("db name must use ASCII letters, numbers, underscores and hyphens only")]
|
||||
DbNameInvalidChar,
|
||||
#[error(transparent)]
|
||||
DbName(#[from] ValidateDbNameError),
|
||||
|
||||
#[error("partial write of line protocol occurred")]
|
||||
PartialLpWrite(BufferedWriteRequest),
|
||||
|
@ -211,13 +210,15 @@ pub enum AuthorizationError {
|
|||
ToStr(#[from] hyper::header::ToStrError),
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct ErrorMessage<T: Serialize> {
|
||||
error: String,
|
||||
data: Option<T>,
|
||||
}
|
||||
|
||||
impl Error {
|
||||
fn response(self) -> Response<Body> {
|
||||
#[derive(Debug, Serialize)]
|
||||
struct ErrorMessage<T: Serialize> {
|
||||
error: String,
|
||||
data: Option<T>,
|
||||
}
|
||||
/// Convert this error into an HTTP [`Response`]
|
||||
fn into_response(self) -> Response<Body> {
|
||||
match self {
|
||||
Self::WriteBuffer(WriteBufferError::CatalogUpdateError(
|
||||
err @ (CatalogError::TooManyDbs
|
||||
|
@ -247,9 +248,9 @@ impl Error {
|
|||
.body(body)
|
||||
.unwrap()
|
||||
}
|
||||
Self::DbNameInvalidStartChar | Self::DbNameInvalidChar => {
|
||||
Self::DbName(e) => {
|
||||
let err: ErrorMessage<()> = ErrorMessage {
|
||||
error: self.to_string(),
|
||||
error: e.to_string(),
|
||||
data: None,
|
||||
};
|
||||
let serialized = serde_json::to_string(&err).unwrap();
|
||||
|
@ -304,6 +305,7 @@ pub(crate) struct HttpApi<W, Q, T> {
|
|||
pub(crate) query_executor: Arc<Q>,
|
||||
max_request_bytes: usize,
|
||||
authorizer: Arc<dyn Authorizer>,
|
||||
legacy_write_param_unifier: SingleTenantRequestUnifier,
|
||||
}
|
||||
|
||||
impl<W, Q, T> HttpApi<W, Q, T> {
|
||||
|
@ -315,6 +317,7 @@ impl<W, Q, T> HttpApi<W, Q, T> {
|
|||
max_request_bytes: usize,
|
||||
authorizer: Arc<dyn Authorizer>,
|
||||
) -> Self {
|
||||
let legacy_write_param_unifier = SingleTenantRequestUnifier::new(Arc::clone(&authorizer));
|
||||
Self {
|
||||
common_state,
|
||||
time_provider,
|
||||
|
@ -322,6 +325,7 @@ impl<W, Q, T> HttpApi<W, Q, T> {
|
|||
query_executor,
|
||||
max_request_bytes,
|
||||
authorizer,
|
||||
legacy_write_param_unifier,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -336,7 +340,16 @@ where
|
|||
async fn write_lp(&self, req: Request<Body>) -> Result<Response<Body>> {
|
||||
let query = req.uri().query().ok_or(Error::MissingWriteParams)?;
|
||||
let params: WriteParams = serde_urlencoded::from_str(query)?;
|
||||
validate_db_name(¶ms.db)?;
|
||||
self.write_lp_inner(params, req, false).await
|
||||
}
|
||||
|
||||
async fn write_lp_inner(
|
||||
&self,
|
||||
params: WriteParams,
|
||||
req: Request<Body>,
|
||||
accept_rp: bool,
|
||||
) -> Result<Response<Body>> {
|
||||
validate_db_name(¶ms.db, accept_rp)?;
|
||||
info!("write_lp to {}", params.db);
|
||||
|
||||
let body = self.read_body(req).await?;
|
||||
|
@ -478,6 +491,12 @@ where
|
|||
}
|
||||
|
||||
async fn authorize_request(&self, req: &mut Request<Body>) -> Result<(), AuthorizationError> {
|
||||
// Extend the request with the authorization token; this is used downstream in some
|
||||
// APIs, such as write, that need the full header value to authorize a request.
|
||||
let auth_header = req.headers().get(AUTHORIZATION).cloned();
|
||||
req.extensions_mut()
|
||||
.insert(AuthorizationHeaderExtension::new(auth_header));
|
||||
|
||||
let auth = if let Some(p) = extract_v1_auth_token(req) {
|
||||
Some(p)
|
||||
} else {
|
||||
|
@ -646,33 +665,76 @@ impl From<authz::Error> for AuthorizationError {
|
|||
}
|
||||
}
|
||||
|
||||
/// Validate a database name
|
||||
///
|
||||
/// A valid name:
|
||||
/// - Starts with a letter or a number
|
||||
/// - Is ASCII not UTF-8
|
||||
/// - Contains only letters, numbers, underscores or hyphens
|
||||
fn validate_db_name(name: &str) -> Result<()> {
|
||||
/// - if `accept_rp` is true, then a single slash ('/') is allowed, separating the
|
||||
/// the database name from the retention policy name, e.g., '<db_name>/<rp_name>'
|
||||
fn validate_db_name(name: &str, accept_rp: bool) -> Result<(), ValidateDbNameError> {
|
||||
if name.is_empty() {
|
||||
return Err(ValidateDbNameError::Empty);
|
||||
}
|
||||
let mut is_first_char = true;
|
||||
let mut rp_seperator_found = false;
|
||||
let mut last_char = None;
|
||||
for grapheme in name.graphemes(true) {
|
||||
if grapheme.as_bytes().len() > 1 {
|
||||
// In the case of a unicode we need to handle multibyte chars
|
||||
return Err(Error::DbNameInvalidChar);
|
||||
return Err(ValidateDbNameError::InvalidChar);
|
||||
}
|
||||
let char = grapheme.as_bytes()[0] as char;
|
||||
if !is_first_char {
|
||||
if !(char.is_ascii_alphanumeric() || char == '_' || char == '-') {
|
||||
return Err(Error::DbNameInvalidChar);
|
||||
match (accept_rp, rp_seperator_found, char) {
|
||||
(true, true, V1_NAMESPACE_RP_SEPARATOR) => {
|
||||
return Err(ValidateDbNameError::InvalidRetentionPolicy)
|
||||
}
|
||||
(true, false, V1_NAMESPACE_RP_SEPARATOR) => {
|
||||
rp_seperator_found = true;
|
||||
}
|
||||
(false, _, char)
|
||||
if !(char.is_ascii_alphanumeric() || char == '_' || char == '-') =>
|
||||
{
|
||||
return Err(ValidateDbNameError::InvalidChar)
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
} else {
|
||||
if !char.is_ascii_alphanumeric() {
|
||||
return Err(Error::DbNameInvalidStartChar);
|
||||
return Err(ValidateDbNameError::InvalidStartChar);
|
||||
}
|
||||
is_first_char = false;
|
||||
}
|
||||
last_char.replace(char);
|
||||
}
|
||||
|
||||
if last_char.is_some_and(|c| c == '/') {
|
||||
return Err(ValidateDbNameError::InvalidRetentionPolicy);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ValidateDbNameError {
|
||||
#[error(
|
||||
"invalid character in database name: must be ASCII, \
|
||||
containing only letters, numbers, underscores, or hyphens"
|
||||
)]
|
||||
InvalidChar,
|
||||
#[error("db name did not start with a number or letter")]
|
||||
InvalidStartChar,
|
||||
#[error(
|
||||
"db name with invalid retention policy, if providing a \
|
||||
retention policy name, must be of form '<db_name>/<rp_name>'"
|
||||
)]
|
||||
InvalidRetentionPolicy,
|
||||
#[error("db name cannot be empty")]
|
||||
Empty,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub(crate) struct QueryRequest<D, F, P> {
|
||||
#[serde(rename = "db")]
|
||||
|
@ -795,6 +857,17 @@ pub(crate) struct WriteParams {
|
|||
pub(crate) precision: Precision,
|
||||
}
|
||||
|
||||
impl From<iox_http::write::WriteParams> for WriteParams {
|
||||
fn from(legacy: iox_http::write::WriteParams) -> Self {
|
||||
Self {
|
||||
db: legacy.namespace.to_string(),
|
||||
// legacy behaviour was to not accept partial:
|
||||
accept_partial: false,
|
||||
precision: legacy.precision.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn route_request<W: WriteBuffer, Q: QueryExecutor, T: TimeProvider>(
|
||||
http_server: Arc<HttpApi<W, Q, T>>,
|
||||
mut req: Request<Body>,
|
||||
|
@ -843,6 +916,22 @@ where
|
|||
let content_length = req.headers().get("content-length").cloned();
|
||||
|
||||
let response = match (method.clone(), uri.path()) {
|
||||
(Method::POST, "/write") => {
|
||||
let params = match http_server.legacy_write_param_unifier.parse_v1(&req).await {
|
||||
Ok(p) => p.into(),
|
||||
Err(e) => return Ok(legacy_write_error_to_response(e)),
|
||||
};
|
||||
|
||||
http_server.write_lp_inner(params, req, true).await
|
||||
}
|
||||
(Method::POST, "/api/v2/write") => {
|
||||
let params = match http_server.legacy_write_param_unifier.parse_v2(&req).await {
|
||||
Ok(p) => p.into(),
|
||||
Err(e) => return Ok(legacy_write_error_to_response(e)),
|
||||
};
|
||||
|
||||
http_server.write_lp_inner(params, req, false).await
|
||||
}
|
||||
(Method::POST, "/api/v3/write_lp") => http_server.write_lp(req).await,
|
||||
(Method::GET | Method::POST, "/api/v3/query_sql") => http_server.query_sql(req).await,
|
||||
(Method::GET | Method::POST, "/api/v3/query_influxql") => {
|
||||
|
@ -871,11 +960,26 @@ where
|
|||
}
|
||||
Err(error) => {
|
||||
error!(%error, %method, %uri, ?content_length, "Error while handling request");
|
||||
Ok(error.response())
|
||||
Ok(error.into_response())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn legacy_write_error_to_response(e: WriteParseError) -> Response<Body> {
|
||||
let err: ErrorMessage<()> = ErrorMessage {
|
||||
error: e.to_string(),
|
||||
data: None,
|
||||
};
|
||||
let serialized = serde_json::to_string(&err).unwrap();
|
||||
let body = Body::from(serialized);
|
||||
let status = match e {
|
||||
WriteParseError::NotImplemented => StatusCode::NOT_FOUND,
|
||||
WriteParseError::SingleTenantError(e) => StatusCode::from(&e),
|
||||
WriteParseError::MultiTenantError(e) => StatusCode::from(&e),
|
||||
};
|
||||
Response::builder().status(status).body(body).unwrap()
|
||||
}
|
||||
|
||||
async fn pprof_home(req: Request<Body>) -> Result<Response<Body>> {
|
||||
let default_host = HeaderValue::from_static("localhost");
|
||||
let host = req
|
||||
|
@ -1031,3 +1135,36 @@ async fn pprof_heappy_profile(req: Request<Body>) -> Result<Response<Body>> {
|
|||
async fn pprof_heappy_profile(_req: Request<Body>) -> Result<Response<Body>> {
|
||||
Err(Error::HeappyIsNotCompiled)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::validate_db_name;
|
||||
use super::ValidateDbNameError;
|
||||
|
||||
macro_rules! assert_validate_db_name {
|
||||
($name:literal, $accept_rp:literal, $expected:pat) => {
|
||||
let actual = validate_db_name($name, $accept_rp);
|
||||
assert!(matches!(&actual, $expected), "got: {actual:?}",);
|
||||
};
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_validate_db_name() {
|
||||
assert_validate_db_name!("foo/bar", false, Err(ValidateDbNameError::InvalidChar));
|
||||
assert!(validate_db_name("foo/bar", true).is_ok());
|
||||
assert_validate_db_name!(
|
||||
"foo/bar/baz",
|
||||
true,
|
||||
Err(ValidateDbNameError::InvalidRetentionPolicy)
|
||||
);
|
||||
assert_validate_db_name!(
|
||||
"foo/",
|
||||
true,
|
||||
Err(ValidateDbNameError::InvalidRetentionPolicy)
|
||||
);
|
||||
assert_validate_db_name!("foo/bar", false, Err(ValidateDbNameError::InvalidChar));
|
||||
assert_validate_db_name!("foo/bar/baz", false, Err(ValidateDbNameError::InvalidChar));
|
||||
assert_validate_db_name!("_foo", false, Err(ValidateDbNameError::InvalidStartChar));
|
||||
assert_validate_db_name!("", false, Err(ValidateDbNameError::Empty));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -556,7 +556,7 @@ mod tests {
|
|||
assert_eq!(
|
||||
body,
|
||||
"{\
|
||||
\"error\":\"db name must use ASCII letters, numbers, underscores and hyphens only\",\
|
||||
\"error\":\"invalid character in database name: must be ASCII, containing only letters, numbers, underscores, or hyphens\",\
|
||||
\"data\":null\
|
||||
}"
|
||||
);
|
||||
|
@ -584,6 +584,29 @@ mod tests {
|
|||
}"
|
||||
);
|
||||
|
||||
let resp = write_lp(
|
||||
&server,
|
||||
"",
|
||||
"cpu,host=b val=2 155\n",
|
||||
None,
|
||||
true,
|
||||
"nanosecond",
|
||||
)
|
||||
.await;
|
||||
|
||||
let status = resp.status();
|
||||
let body =
|
||||
String::from_utf8(body::to_bytes(resp.into_body()).await.unwrap().to_vec()).unwrap();
|
||||
|
||||
assert_eq!(status, StatusCode::BAD_REQUEST);
|
||||
assert_eq!(
|
||||
body,
|
||||
"{\
|
||||
\"error\":\"db name cannot be empty\",\
|
||||
\"data\":null\
|
||||
}"
|
||||
);
|
||||
|
||||
shutdown.cancel();
|
||||
}
|
||||
|
||||
|
|
|
@ -11,6 +11,7 @@ data_types.workspace = true
|
|||
datafusion_util.workspace = true
|
||||
influxdb-line-protocol.workspace = true
|
||||
iox_catalog.workspace = true
|
||||
iox_http.workspace = true
|
||||
iox_query.workspace = true
|
||||
iox_time.workspace = true
|
||||
parquet_file.workspace = true
|
||||
|
|
|
@ -541,6 +541,17 @@ impl Default for Precision {
|
|||
}
|
||||
}
|
||||
|
||||
impl From<iox_http::write::Precision> for Precision {
|
||||
fn from(legacy: iox_http::write::Precision) -> Self {
|
||||
match legacy {
|
||||
iox_http::write::Precision::Second => Precision::Second,
|
||||
iox_http::write::Precision::Millisecond => Precision::Millisecond,
|
||||
iox_http::write::Precision::Microsecond => Precision::Microsecond,
|
||||
iox_http::write::Precision::Nanosecond => Precision::Nanosecond,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Guess precision based off of a given timestamp.
|
||||
// Note that this will fail in June 2128, but that's not our problem
|
||||
pub(crate) fn guess_precision(timestamp: i64) -> Precision {
|
||||
|
|
Loading…
Reference in New Issue