From 7784749bca87b91c7bcaece06b0ae2aeb0046c47 Mon Sep 17 00:00:00 2001 From: Trevor Hilton Date: Thu, 28 Mar 2024 13:33:17 -0400 Subject: [PATCH] feat: support v1 and v2 write APIs (#24793) feat: support v1 and v2 write APIs This adds support for two APIs: /write and /api/v2/write. These implement the v1 and v2 write APIs, respectively. In general, the difference between these and the new /api/v3/write_lp API is in the request parsing. We leverage the WriteRequestUnifier trait from influxdb3_core to handle parsing of v1 and v2 HTTP requests, to keep the error handling at that level consistent with distributed versions of InfluxDB 3.0. Specifically, we use the SingleTenantRequestUnifier implementation of the trait. Changes: - Addition of two new routes to the route_request method in influxdb3_server::http to serve /write and /api/v2/write requests. - Database name validation was updated to handle cases where retention policies may be passed in /write requests, and to also reject empty names. A unit test was added to verify the validate_db_name function. - HTTP request authorization in the router will extract the full Authorization header value, and store it in the request extensions; this is used in the write request parsing from the core iox_http crate to authorize write requests. - E2E tests to verify correct HTTP request parsing / response behaviour for both /write and /api/v2/write APIs - E2E tests to check that data sent in through /write and /api/v2/write can be queried back --- Cargo.lock | 18 +++ influxdb3/tests/server/auth.rs | 18 +-- influxdb3/tests/server/main.rs | 1 + influxdb3/tests/server/write.rs | 268 ++++++++++++++++++++++++++++++++ influxdb3_client/src/lib.rs | 2 +- influxdb3_server/Cargo.toml | 1 + influxdb3_server/src/http.rs | 181 ++++++++++++++++++--- influxdb3_server/src/lib.rs | 25 ++- influxdb3_write/Cargo.toml | 1 + influxdb3_write/src/lib.rs | 11 ++ 10 files changed, 493 insertions(+), 33 deletions(-) create mode 100644 influxdb3/tests/server/write.rs diff --git a/Cargo.lock b/Cargo.lock index 7ec324c304..c6d0f63e1a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2593,6 +2593,7 @@ dependencies = [ "influxdb-line-protocol", "influxdb3_write", "iox_catalog", + "iox_http", "iox_query", "iox_query_influxql", "iox_query_influxql_rewrite", @@ -2650,6 +2651,7 @@ dependencies = [ "hex", "influxdb-line-protocol", "iox_catalog", + "iox_http", "iox_query", "iox_time", "object_store", @@ -2823,6 +2825,22 @@ dependencies = [ "workspace-hack", ] +[[package]] +name = "iox_http" +version = "0.1.0" +source = "git+https://github.com/influxdata/influxdb3_core?rev=fd238811b995ddf949a4c7546f4c59f25bd451cf#fd238811b995ddf949a4c7546f4c59f25bd451cf" +dependencies = [ + "async-trait", + "authz", + "data_types", + "hyper", + "parking_lot", + "serde", + "serde_urlencoded", + "thiserror", + "workspace-hack", +] + [[package]] name = "iox_query" version = "0.1.0" diff --git a/influxdb3/tests/server/auth.rs b/influxdb3/tests/server/auth.rs index 00b5dc6b81..2d8a7516f6 100644 --- a/influxdb3/tests/server/auth.rs +++ b/influxdb3/tests/server/auth.rs @@ -295,31 +295,31 @@ async fn v1_password_parameter() { StatusCode::OK, ); - // TODO - The following assertions will break when the write API gets implemented, - // so will need to revisit these at that time. Right now, they just assert - // that the returned status code is 404 Not Found, as that would indicate - // the request made it past the authorize step in the HTTP router. + let valid_write_body = "cpu,host=val usage=0.5"; // Send request to write API with the token in the v1 `p` parameter: assert_eq!( client - .get(&write_url) - .query(&[("p", TOKEN)]) + .post(&write_url) + .query(&[("p", TOKEN), ("db", "foo")]) + .body(valid_write_body) .send() .await .expect("send request") .status(), - StatusCode::NOT_FOUND, + StatusCode::OK, ); // Send request to write API with the token in auth header: assert_eq!( client - .get(&write_url) + .post(&write_url) .bearer_auth(TOKEN) + .query(&[("db", "foo")]) + .body(valid_write_body) .send() .await .expect("send request") .status(), - StatusCode::NOT_FOUND, + StatusCode::OK, ); } diff --git a/influxdb3/tests/server/main.rs b/influxdb3/tests/server/main.rs index 58a86aa1fd..739af9188b 100644 --- a/influxdb3/tests/server/main.rs +++ b/influxdb3/tests/server/main.rs @@ -16,6 +16,7 @@ mod auth; mod flight; mod limits; mod query; +mod write; /// Configuration for a [`TestServer`] #[derive(Debug, Default)] diff --git a/influxdb3/tests/server/write.rs b/influxdb3/tests/server/write.rs new file mode 100644 index 0000000000..43c4d9d79a --- /dev/null +++ b/influxdb3/tests/server/write.rs @@ -0,0 +1,268 @@ +use hyper::StatusCode; +use pretty_assertions::assert_eq; + +use crate::TestServer; + +#[tokio::test] +async fn api_v1_write_request_parsing() { + let server = TestServer::spawn().await; + let client = reqwest::Client::new(); + let write_url = format!("{base}/write", base = server.client_addr()); + let write_body = "cpu,host=a usage=0.5"; + + #[derive(Debug)] + struct TestCase { + db: Option<&'static str>, + precision: Option<&'static str>, + rp: Option<&'static str>, + expected: StatusCode, + } + + let test_cases = [ + TestCase { + db: None, + precision: None, + rp: None, + expected: StatusCode::BAD_REQUEST, + }, + TestCase { + db: Some("foo"), + precision: None, + rp: None, + expected: StatusCode::OK, + }, + TestCase { + db: Some("foo"), + precision: Some("s"), + rp: None, + expected: StatusCode::OK, + }, + TestCase { + db: Some("foo"), + precision: Some("ms"), + rp: None, + expected: StatusCode::OK, + }, + TestCase { + db: Some("foo"), + precision: Some("us"), + rp: None, + expected: StatusCode::OK, + }, + TestCase { + db: Some("foo"), + precision: Some("ns"), + rp: None, + expected: StatusCode::OK, + }, + TestCase { + db: Some("foo"), + precision: Some("invalid"), + rp: None, + expected: StatusCode::BAD_REQUEST, + }, + TestCase { + db: Some("foo"), + precision: None, + rp: Some("bar"), + expected: StatusCode::OK, + }, + TestCase { + db: Some("foo"), + precision: None, + rp: Some("default"), + expected: StatusCode::OK, + }, + TestCase { + db: Some("foo"), + precision: None, + rp: Some("autogen"), + expected: StatusCode::OK, + }, + ]; + + for t in test_cases { + println!("Test Case: {t:?}"); + let mut params = vec![]; + if let Some(db) = t.db { + params.push(("db", db)); + } + if let Some(rp) = t.rp { + params.push(("rp", rp)); + } + if let Some(precision) = t.precision { + params.push(("precision", precision)); + } + let resp = client + .post(&write_url) + .query(¶ms) + .body(write_body) + .send() + .await + .expect("send /write request"); + let status = resp.status(); + let body = resp.text().await.expect("response body as text"); + println!("Response [{status}]:\n{body}"); + assert_eq!(t.expected, status); + } +} + +#[tokio::test] +async fn api_v1_write_round_trip() { + let server = TestServer::spawn().await; + let client = reqwest::Client::new(); + let write_url = format!("{base}/write", base = server.client_addr()); + + client + .post(write_url) + .query(&[("db", "foo")]) + .body( + "cpu,host=a usage=0.5 1 + cpu,host=a usage=0.6 2 + cpu,host=a usage=0.7 3", + ) + .send() + .await + .expect("send /write request"); + + let resp = server + .api_v3_query_influxql(&[("q", "SELECT * FROM foo.autogen.cpu"), ("format", "pretty")]) + .await + .text() + .await + .unwrap(); + + assert_eq!( + resp, + "+------------------+-------------------------------+------+-------+\n\ + | iox::measurement | time | host | usage |\n\ + +------------------+-------------------------------+------+-------+\n\ + | cpu | 1970-01-01T00:00:00.000000001 | a | 0.5 |\n\ + | cpu | 1970-01-01T00:00:00.000000002 | a | 0.6 |\n\ + | cpu | 1970-01-01T00:00:00.000000003 | a | 0.7 |\n\ + +------------------+-------------------------------+------+-------+" + ); +} + +#[tokio::test] +async fn api_v2_write_request_parsing() { + let server = TestServer::spawn().await; + let client = reqwest::Client::new(); + let write_url = format!("{base}/api/v2/write", base = server.client_addr()); + let write_body = "cpu,host=a usage=0.5"; + + #[derive(Debug)] + struct TestCase { + org: Option<&'static str>, + bucket: Option<&'static str>, + precision: Option<&'static str>, + expected: StatusCode, + } + + let test_cases = [ + TestCase { + org: None, + bucket: None, + precision: None, + expected: StatusCode::BAD_REQUEST, + }, + TestCase { + org: None, + bucket: Some("foo"), + precision: None, + expected: StatusCode::OK, + }, + TestCase { + org: Some("bar"), + bucket: Some("foo"), + precision: None, + expected: StatusCode::OK, + }, + TestCase { + org: None, + bucket: Some("foo"), + precision: Some("s"), + expected: StatusCode::OK, + }, + TestCase { + org: None, + bucket: Some("foo"), + precision: Some("ms"), + expected: StatusCode::OK, + }, + TestCase { + org: None, + bucket: Some("foo"), + precision: Some("us"), + expected: StatusCode::OK, + }, + TestCase { + org: None, + bucket: Some("foo"), + precision: Some("ns"), + expected: StatusCode::OK, + }, + ]; + + for t in test_cases { + println!("Test Case: {t:?}"); + let mut params = vec![]; + if let Some(bucket) = t.bucket { + params.push(("bucket", bucket)); + } + if let Some(org) = t.org { + params.push(("org", org)); + } + if let Some(precision) = t.precision { + params.push(("precision", precision)); + } + let resp = client + .post(&write_url) + .query(¶ms) + .body(write_body) + .send() + .await + .expect("send /write request"); + let status = resp.status(); + let body = resp.text().await.expect("response body as text"); + println!("Response [{status}]:\n{body}"); + assert_eq!(t.expected, status); + } +} + +#[tokio::test] +async fn api_v2_write_round_trip() { + let server = TestServer::spawn().await; + let client = reqwest::Client::new(); + let write_url = format!("{base}/api/v2/write", base = server.client_addr()); + + client + .post(write_url) + .query(&[("bucket", "foo")]) + .body( + "cpu,host=a usage=0.5 1 + cpu,host=a usage=0.6 2 + cpu,host=a usage=0.7 3", + ) + .send() + .await + .expect("send /write request"); + + let resp = server + .api_v3_query_influxql(&[("q", "SELECT * FROM foo.autogen.cpu"), ("format", "pretty")]) + .await + .text() + .await + .unwrap(); + + assert_eq!( + resp, + "+------------------+-------------------------------+------+-------+\n\ + | iox::measurement | time | host | usage |\n\ + +------------------+-------------------------------+------+-------+\n\ + | cpu | 1970-01-01T00:00:00.000000001 | a | 0.5 |\n\ + | cpu | 1970-01-01T00:00:00.000000002 | a | 0.6 |\n\ + | cpu | 1970-01-01T00:00:00.000000003 | a | 0.7 |\n\ + +------------------+-------------------------------+------+-------+" + ); +} diff --git a/influxdb3_client/src/lib.rs b/influxdb3_client/src/lib.rs index 0c3aba909e..6303caf4fb 100644 --- a/influxdb3_client/src/lib.rs +++ b/influxdb3_client/src/lib.rs @@ -207,7 +207,7 @@ impl<'a, B> From<&'a WriteRequestBuilder<'a, B>> for WriteParams<'a> { // TODO - this should re-use a type defined in the server code, or a separate crate, // central to both. #[derive(Debug, Copy, Clone, Serialize)] -#[serde(rename_all = "snake_case")] +#[serde(rename_all = "lowercase")] pub enum Precision { Second, Millisecond, diff --git a/influxdb3_server/Cargo.toml b/influxdb3_server/Cargo.toml index 19bb125ee3..56c1f31506 100644 --- a/influxdb3_server/Cargo.toml +++ b/influxdb3_server/Cargo.toml @@ -12,6 +12,7 @@ data_types.workspace = true datafusion_util.workspace = true influxdb-line-protocol.workspace = true iox_catalog.workspace = true +iox_http.workspace = true iox_query.workspace = true iox_query_params.workspace = true iox_query_influxql.workspace = true diff --git a/influxdb3_server/src/http.rs b/influxdb3_server/src/http.rs index c88cfc809f..983f5e2599 100644 --- a/influxdb3_server/src/http.rs +++ b/influxdb3_server/src/http.rs @@ -4,6 +4,7 @@ use crate::{query_executor, QueryKind}; use crate::{CommonServerState, QueryExecutor}; use arrow::record_batch::RecordBatch; use arrow::util::pretty; +use authz::http::AuthorizationHeaderExtension; use authz::Authorizer; use bytes::{Bytes, BytesMut}; use data_types::NamespaceName; @@ -25,6 +26,9 @@ use influxdb3_write::write_buffer::Error as WriteBufferError; use influxdb3_write::BufferedWriteRequest; use influxdb3_write::Precision; use influxdb3_write::WriteBuffer; +use iox_http::write::single_tenant::SingleTenantRequestUnifier; +use iox_http::write::v1::V1_NAMESPACE_RP_SEPARATOR; +use iox_http::write::{WriteParseError, WriteRequestUnifier}; use iox_query_influxql_rewrite as rewrite; use iox_query_params::StatementParams; use iox_time::TimeProvider; @@ -167,13 +171,8 @@ pub enum Error { #[error("query error: {0}")] Query(#[from] query_executor::Error), - // Invalid Start Character for a Database Name - #[error("db name did not start with a number or letter")] - DbNameInvalidStartChar, - - // Invalid Character for a Database Name - #[error("db name must use ASCII letters, numbers, underscores and hyphens only")] - DbNameInvalidChar, + #[error(transparent)] + DbName(#[from] ValidateDbNameError), #[error("partial write of line protocol occurred")] PartialLpWrite(BufferedWriteRequest), @@ -211,13 +210,15 @@ pub enum AuthorizationError { ToStr(#[from] hyper::header::ToStrError), } +#[derive(Debug, Serialize)] +struct ErrorMessage { + error: String, + data: Option, +} + impl Error { - fn response(self) -> Response { - #[derive(Debug, Serialize)] - struct ErrorMessage { - error: String, - data: Option, - } + /// Convert this error into an HTTP [`Response`] + fn into_response(self) -> Response { match self { Self::WriteBuffer(WriteBufferError::CatalogUpdateError( err @ (CatalogError::TooManyDbs @@ -247,9 +248,9 @@ impl Error { .body(body) .unwrap() } - Self::DbNameInvalidStartChar | Self::DbNameInvalidChar => { + Self::DbName(e) => { let err: ErrorMessage<()> = ErrorMessage { - error: self.to_string(), + error: e.to_string(), data: None, }; let serialized = serde_json::to_string(&err).unwrap(); @@ -304,6 +305,7 @@ pub(crate) struct HttpApi { pub(crate) query_executor: Arc, max_request_bytes: usize, authorizer: Arc, + legacy_write_param_unifier: SingleTenantRequestUnifier, } impl HttpApi { @@ -315,6 +317,7 @@ impl HttpApi { max_request_bytes: usize, authorizer: Arc, ) -> Self { + let legacy_write_param_unifier = SingleTenantRequestUnifier::new(Arc::clone(&authorizer)); Self { common_state, time_provider, @@ -322,6 +325,7 @@ impl HttpApi { query_executor, max_request_bytes, authorizer, + legacy_write_param_unifier, } } } @@ -336,7 +340,16 @@ where async fn write_lp(&self, req: Request) -> Result> { let query = req.uri().query().ok_or(Error::MissingWriteParams)?; let params: WriteParams = serde_urlencoded::from_str(query)?; - validate_db_name(¶ms.db)?; + self.write_lp_inner(params, req, false).await + } + + async fn write_lp_inner( + &self, + params: WriteParams, + req: Request, + accept_rp: bool, + ) -> Result> { + validate_db_name(¶ms.db, accept_rp)?; info!("write_lp to {}", params.db); let body = self.read_body(req).await?; @@ -478,6 +491,12 @@ where } async fn authorize_request(&self, req: &mut Request) -> Result<(), AuthorizationError> { + // Extend the request with the authorization token; this is used downstream in some + // APIs, such as write, that need the full header value to authorize a request. + let auth_header = req.headers().get(AUTHORIZATION).cloned(); + req.extensions_mut() + .insert(AuthorizationHeaderExtension::new(auth_header)); + let auth = if let Some(p) = extract_v1_auth_token(req) { Some(p) } else { @@ -646,33 +665,76 @@ impl From for AuthorizationError { } } +/// Validate a database name +/// /// A valid name: /// - Starts with a letter or a number /// - Is ASCII not UTF-8 /// - Contains only letters, numbers, underscores or hyphens -fn validate_db_name(name: &str) -> Result<()> { +/// - if `accept_rp` is true, then a single slash ('/') is allowed, separating the +/// the database name from the retention policy name, e.g., '/' +fn validate_db_name(name: &str, accept_rp: bool) -> Result<(), ValidateDbNameError> { + if name.is_empty() { + return Err(ValidateDbNameError::Empty); + } let mut is_first_char = true; + let mut rp_seperator_found = false; + let mut last_char = None; for grapheme in name.graphemes(true) { if grapheme.as_bytes().len() > 1 { // In the case of a unicode we need to handle multibyte chars - return Err(Error::DbNameInvalidChar); + return Err(ValidateDbNameError::InvalidChar); } let char = grapheme.as_bytes()[0] as char; if !is_first_char { - if !(char.is_ascii_alphanumeric() || char == '_' || char == '-') { - return Err(Error::DbNameInvalidChar); + match (accept_rp, rp_seperator_found, char) { + (true, true, V1_NAMESPACE_RP_SEPARATOR) => { + return Err(ValidateDbNameError::InvalidRetentionPolicy) + } + (true, false, V1_NAMESPACE_RP_SEPARATOR) => { + rp_seperator_found = true; + } + (false, _, char) + if !(char.is_ascii_alphanumeric() || char == '_' || char == '-') => + { + return Err(ValidateDbNameError::InvalidChar) + } + _ => (), } } else { if !char.is_ascii_alphanumeric() { - return Err(Error::DbNameInvalidStartChar); + return Err(ValidateDbNameError::InvalidStartChar); } is_first_char = false; } + last_char.replace(char); + } + + if last_char.is_some_and(|c| c == '/') { + return Err(ValidateDbNameError::InvalidRetentionPolicy); } Ok(()) } +#[derive(Debug, thiserror::Error)] +pub enum ValidateDbNameError { + #[error( + "invalid character in database name: must be ASCII, \ + containing only letters, numbers, underscores, or hyphens" + )] + InvalidChar, + #[error("db name did not start with a number or letter")] + InvalidStartChar, + #[error( + "db name with invalid retention policy, if providing a \ + retention policy name, must be of form '/'" + )] + InvalidRetentionPolicy, + #[error("db name cannot be empty")] + Empty, +} + #[derive(Debug, Deserialize)] pub(crate) struct QueryRequest { #[serde(rename = "db")] @@ -795,6 +857,17 @@ pub(crate) struct WriteParams { pub(crate) precision: Precision, } +impl From for WriteParams { + fn from(legacy: iox_http::write::WriteParams) -> Self { + Self { + db: legacy.namespace.to_string(), + // legacy behaviour was to not accept partial: + accept_partial: false, + precision: legacy.precision.into(), + } + } +} + pub(crate) async fn route_request( http_server: Arc>, mut req: Request, @@ -843,6 +916,22 @@ where let content_length = req.headers().get("content-length").cloned(); let response = match (method.clone(), uri.path()) { + (Method::POST, "/write") => { + let params = match http_server.legacy_write_param_unifier.parse_v1(&req).await { + Ok(p) => p.into(), + Err(e) => return Ok(legacy_write_error_to_response(e)), + }; + + http_server.write_lp_inner(params, req, true).await + } + (Method::POST, "/api/v2/write") => { + let params = match http_server.legacy_write_param_unifier.parse_v2(&req).await { + Ok(p) => p.into(), + Err(e) => return Ok(legacy_write_error_to_response(e)), + }; + + http_server.write_lp_inner(params, req, false).await + } (Method::POST, "/api/v3/write_lp") => http_server.write_lp(req).await, (Method::GET | Method::POST, "/api/v3/query_sql") => http_server.query_sql(req).await, (Method::GET | Method::POST, "/api/v3/query_influxql") => { @@ -871,11 +960,26 @@ where } Err(error) => { error!(%error, %method, %uri, ?content_length, "Error while handling request"); - Ok(error.response()) + Ok(error.into_response()) } } } +fn legacy_write_error_to_response(e: WriteParseError) -> Response { + let err: ErrorMessage<()> = ErrorMessage { + error: e.to_string(), + data: None, + }; + let serialized = serde_json::to_string(&err).unwrap(); + let body = Body::from(serialized); + let status = match e { + WriteParseError::NotImplemented => StatusCode::NOT_FOUND, + WriteParseError::SingleTenantError(e) => StatusCode::from(&e), + WriteParseError::MultiTenantError(e) => StatusCode::from(&e), + }; + Response::builder().status(status).body(body).unwrap() +} + async fn pprof_home(req: Request) -> Result> { let default_host = HeaderValue::from_static("localhost"); let host = req @@ -1031,3 +1135,36 @@ async fn pprof_heappy_profile(req: Request) -> Result> { async fn pprof_heappy_profile(_req: Request) -> Result> { Err(Error::HeappyIsNotCompiled) } + +#[cfg(test)] +mod tests { + use super::validate_db_name; + use super::ValidateDbNameError; + + macro_rules! assert_validate_db_name { + ($name:literal, $accept_rp:literal, $expected:pat) => { + let actual = validate_db_name($name, $accept_rp); + assert!(matches!(&actual, $expected), "got: {actual:?}",); + }; + } + + #[test] + fn test_validate_db_name() { + assert_validate_db_name!("foo/bar", false, Err(ValidateDbNameError::InvalidChar)); + assert!(validate_db_name("foo/bar", true).is_ok()); + assert_validate_db_name!( + "foo/bar/baz", + true, + Err(ValidateDbNameError::InvalidRetentionPolicy) + ); + assert_validate_db_name!( + "foo/", + true, + Err(ValidateDbNameError::InvalidRetentionPolicy) + ); + assert_validate_db_name!("foo/bar", false, Err(ValidateDbNameError::InvalidChar)); + assert_validate_db_name!("foo/bar/baz", false, Err(ValidateDbNameError::InvalidChar)); + assert_validate_db_name!("_foo", false, Err(ValidateDbNameError::InvalidStartChar)); + assert_validate_db_name!("", false, Err(ValidateDbNameError::Empty)); + } +} diff --git a/influxdb3_server/src/lib.rs b/influxdb3_server/src/lib.rs index b99d707661..a3997f6911 100644 --- a/influxdb3_server/src/lib.rs +++ b/influxdb3_server/src/lib.rs @@ -556,7 +556,7 @@ mod tests { assert_eq!( body, "{\ - \"error\":\"db name must use ASCII letters, numbers, underscores and hyphens only\",\ + \"error\":\"invalid character in database name: must be ASCII, containing only letters, numbers, underscores, or hyphens\",\ \"data\":null\ }" ); @@ -584,6 +584,29 @@ mod tests { }" ); + let resp = write_lp( + &server, + "", + "cpu,host=b val=2 155\n", + None, + true, + "nanosecond", + ) + .await; + + let status = resp.status(); + let body = + String::from_utf8(body::to_bytes(resp.into_body()).await.unwrap().to_vec()).unwrap(); + + assert_eq!(status, StatusCode::BAD_REQUEST); + assert_eq!( + body, + "{\ + \"error\":\"db name cannot be empty\",\ + \"data\":null\ + }" + ); + shutdown.cancel(); } diff --git a/influxdb3_write/Cargo.toml b/influxdb3_write/Cargo.toml index 0aa806dc38..d5bf0d9941 100644 --- a/influxdb3_write/Cargo.toml +++ b/influxdb3_write/Cargo.toml @@ -11,6 +11,7 @@ data_types.workspace = true datafusion_util.workspace = true influxdb-line-protocol.workspace = true iox_catalog.workspace = true +iox_http.workspace = true iox_query.workspace = true iox_time.workspace = true parquet_file.workspace = true diff --git a/influxdb3_write/src/lib.rs b/influxdb3_write/src/lib.rs index 60301f6cb7..955d8757f4 100644 --- a/influxdb3_write/src/lib.rs +++ b/influxdb3_write/src/lib.rs @@ -541,6 +541,17 @@ impl Default for Precision { } } +impl From for Precision { + fn from(legacy: iox_http::write::Precision) -> Self { + match legacy { + iox_http::write::Precision::Second => Precision::Second, + iox_http::write::Precision::Millisecond => Precision::Millisecond, + iox_http::write::Precision::Microsecond => Precision::Microsecond, + iox_http::write::Precision::Nanosecond => Precision::Nanosecond, + } + } +} + /// Guess precision based off of a given timestamp. // Note that this will fail in June 2128, but that's not our problem pub(crate) fn guess_precision(timestamp: i64) -> Precision {