diff --git a/Cargo.lock b/Cargo.lock index 1c2594cf4b..f5614258be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1437,6 +1437,7 @@ dependencies = [ "snafu", "test_helpers", "tokio", + "url", ] [[package]] @@ -1466,6 +1467,7 @@ dependencies = [ "influxdb_tsm", "ingest", "internal_types", + "itertools 0.9.0", "logfmt", "mem_qe", "mutable_buffer", diff --git a/Cargo.toml b/Cargo.toml index 7ec57c6902..bee78699c3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,6 +76,7 @@ http = "0.2.0" hyper = "0.14" once_cell = { version = "1.4.0", features = ["parking_lot"] } parking_lot = "0.11.1" +itertools = "0.9.0" # used by arrow/datafusion anyway prettytable-rs = "0.8" prost = "0.7" diff --git a/buf.yaml b/buf.yaml index e728e5f408..db3cfad8bd 100644 --- a/buf.yaml +++ b/buf.yaml @@ -4,6 +4,7 @@ build: - generated_types/protos/ lint: + allow_comment_ignores: true ignore: - google - grpc diff --git a/data_types/src/field_validation.rs b/data_types/src/field_validation.rs index 49ae635a6d..4ebffb0d52 100644 --- a/data_types/src/field_validation.rs +++ b/data_types/src/field_validation.rs @@ -26,7 +26,7 @@ where /// An extension trait that adds the methods `optional` and `required` to any /// Option containing a type implementing `TryInto` -pub(crate) trait FromFieldOpt { +pub trait FromFieldOpt { /// Try to convert inner type, if any, using TryInto calling /// `FieldViolation::scope` on any error encountered /// diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index 1d0333bf34..eea1d75d61 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -23,4 +23,4 @@ pub mod timestamp; pub mod wal; mod database_name; -pub(crate) mod field_validation; +pub mod field_validation; diff --git a/docs/regenerating_flatbuffers.md b/docs/regenerating_flatbuffers.md index 1c5f8a07f5..f01bdede18 100644 --- a/docs/regenerating_flatbuffers.md +++ b/docs/regenerating_flatbuffers.md @@ -5,3 +5,6 @@ When updating the version of the [flatbuffers](https://crates.io/crates/flatbuff To update the generated code, edit `generated_types/regenerate-flatbuffers.sh` and set the `FB_COMMIT` variable at the top of the file to the commit SHA of the same commit in the [flatbuffers repository](https://github.com/google/flatbuffers) where the `flatbuffers` Rust crate version was updated. This ensures we'll be [using the same version of `flatc` that the crate was tested with](https://github.com/google/flatbuffers/issues/6199#issuecomment-714562121). Then run the `generated_types/regenerate-flatbuffers.sh` script and check in any changes. Check the whole project builds. + +`generated_types/regenerate-flatbuffers.sh` will build `flatc` from source if it cannot be found. +In order to do that your system will require `bazel`; you can likely install this with your favourite package manager. 
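Review note on the `data_types::field_validation` visibility change above: making `FromFieldOpt` public lets gRPC handlers outside the crate convert an optional proto field into a required domain type. A minimal sketch of that pattern follows, mirroring the `request.rules.required("rules")?` call that appears in the management service change further down; the helper function itself is hypothetical and not part of this diff.

```rust
use data_types::database_rules::DatabaseRules;
use data_types::field_validation::FromFieldOpt; // now `pub` as of this diff
use generated_types::google::FieldViolation;
use generated_types::influxdata::iox::management::v1 as pb;

// Hypothetical helper: turn the optional proto field into the domain type,
// scoping any conversion error to the "rules" field name.
fn rules_from_request(rules: Option<pb::DatabaseRules>) -> Result<DatabaseRules, FieldViolation> {
    // `required` reports a FieldViolation naming the field when it is
    // missing or when the TryInto conversion fails.
    rules.required("rules")
}
```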
diff --git a/generated_types/protos/influxdata/iox/management/v1/service.proto b/generated_types/protos/influxdata/iox/management/v1/service.proto index d02999bad3..94e6b273ce 100644 --- a/generated_types/protos/influxdata/iox/management/v1/service.proto +++ b/generated_types/protos/influxdata/iox/management/v1/service.proto @@ -1,7 +1,9 @@ syntax = "proto3"; package influxdata.iox.management.v1; + import "google/longrunning/operations.proto"; +import "google/protobuf/field_mask.proto"; import "influxdata/iox/management/v1/database_rules.proto"; import "influxdata/iox/management/v1/chunk.proto"; import "influxdata/iox/management/v1/partition.proto"; @@ -17,6 +19,11 @@ service ManagementService { rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse); + // Update a database. + // + // Roughly follows the https://google.aip.dev/134 pattern, except we wrap the response + rpc UpdateDatabase(UpdateDatabaseRequest) returns (UpdateDatabaseResponse); + // List chunks available on this database rpc ListChunks(ListChunksRequest) returns (ListChunksResponse); @@ -52,7 +59,6 @@ service ManagementService { // Close a chunk and move it to the read buffer rpc ClosePartitionChunk(ClosePartitionChunkRequest) returns (ClosePartitionChunkResponse); - } message GetWriterIdRequest {} @@ -87,6 +93,16 @@ message CreateDatabaseRequest { message CreateDatabaseResponse {} +// Update a database. +message UpdateDatabaseRequest { + // The rule's `name` field is used to identify the database rules to be updated. + DatabaseRules rules = 1; +} + +message UpdateDatabaseResponse { + DatabaseRules rules = 1; +} + message ListChunksRequest { // the name of the database string db_name = 1; diff --git a/generated_types/regenerate-flatbuffers.sh b/generated_types/regenerate-flatbuffers.sh index 56f0db2ccd..f0a601008a 100755 --- a/generated_types/regenerate-flatbuffers.sh +++ b/generated_types/regenerate-flatbuffers.sh @@ -3,7 +3,7 @@ # The commit where the Rust `flatbuffers` crate version was changed to the version in `Cargo.lock` # Update this, rerun this script, and check in the changes in the generated code when the # `flatbuffers` crate version is updated. 
-FB_COMMIT="86401e078d0746d2381735415f8c2dfe849f3f52" +FB_COMMIT="261cf3b20473abdf95fc34da0827e4986f065c39" # Change to the generated_types crate directory, where this script is located DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" diff --git a/influxdb2_client/Cargo.toml b/influxdb2_client/Cargo.toml index d18a43bfbe..ef7148fa3e 100644 --- a/influxdb2_client/Cargo.toml +++ b/influxdb2_client/Cargo.toml @@ -11,6 +11,7 @@ reqwest = { version = "0.11", features = ["stream", "json"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0.44" snafu = "0.6.6" +url = "2.1.1" [dev-dependencies] # In alphabetical order mockito = "0.26.0" diff --git a/influxdb2_client/examples/query.rs b/influxdb2_client/examples/query.rs new file mode 100644 index 0000000000..10cdde64af --- /dev/null +++ b/influxdb2_client/examples/query.rs @@ -0,0 +1,26 @@ +use influxdb2_client::models::{LanguageRequest, Query}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let influx_url = "http://localhost:8086"; + let token = "some-token"; + + let client = influxdb2_client::Client::new(influx_url, token); + + client.query_suggestions().await?; + client.query_suggestions_name("some-name").await?; + + client + .query("some-org", Some(Query::new("some-query".to_string()))) + .await?; + + client + .query_analyze(Some(Query::new("some-query".to_string()))) + .await?; + + client + .query_ast(Some(LanguageRequest::new("some-query".to_string()))) + .await?; + + Ok(()) +} diff --git a/influxdb2_client/src/api/mod.rs b/influxdb2_client/src/api/mod.rs index 234742170e..ab175958b8 100644 --- a/influxdb2_client/src/api/mod.rs +++ b/influxdb2_client/src/api/mod.rs @@ -1,4 +1,5 @@ //! InfluxDB v2.0 Client API pub mod label; +pub mod query; pub mod ready; -pub mod setup; +pub mod setup; \ No newline at end of file diff --git a/influxdb2_client/src/api/query.rs b/influxdb2_client/src/api/query.rs new file mode 100644 index 0000000000..96d127f4bc --- /dev/null +++ b/influxdb2_client/src/api/query.rs @@ -0,0 +1,318 @@ +//! Query +//! +//! Query InfluxDB using InfluxQL or Flux Query + +use crate::{Client, Http, RequestError, ReqwestProcessing, Serializing}; +use reqwest::{Method, StatusCode}; +use snafu::ResultExt; + +use crate::models::{ + AnalyzeQueryResponse, AstResponse, FluxSuggestion, FluxSuggestions, LanguageRequest, Query, +}; + +impl Client { + /// Get Query Suggestions + pub async fn query_suggestions(&self) -> Result { + let req_url = format!("{}/api/v2/query/suggestions", self.url); + let response = self + .request(Method::GET, &req_url) + .send() + .await + .context(ReqwestProcessing)?; + + match response.status() { + StatusCode::OK => Ok(response + .json::() + .await + .context(ReqwestProcessing)?), + status => { + let text = response.text().await.context(ReqwestProcessing)?; + Http { status, text }.fail()? + } + } + } + + /// Query Suggestions with name + pub async fn query_suggestions_name(&self, name: &str) -> Result { + let req_url = format!( + "{}/api/v2/query/suggestions/{name}", + self.url, + name = crate::common::urlencode(name), + ); + + let response = self + .request(Method::GET, &req_url) + .send() + .await + .context(ReqwestProcessing)?; + + match response.status() { + StatusCode::OK => Ok(response + .json::() + .await + .context(ReqwestProcessing)?), + status => { + let text = response.text().await.context(ReqwestProcessing)?; + Http { status, text }.fail()? 
+ } + } + } + + /// Query + pub async fn query(&self, org: &str, query: Option) -> Result { + let req_url = format!("{}/api/v2/query", self.url); + + let response = self + .request(Method::POST, &req_url) + .header("Accepting-Encoding", "identity") + .header("Content-Type", "application/json") + .query(&[("org", &org)]) + .body(serde_json::to_string(&query.unwrap_or_default()).context(Serializing)?) + .send() + .await + .context(ReqwestProcessing)?; + + match response.status() { + StatusCode::OK => Ok(response.json::().await.context(ReqwestProcessing)?), + status => { + let text = response.text().await.context(ReqwestProcessing)?; + Http { status, text }.fail()? + } + } + } + + /// Analyze Query + pub async fn query_analyze( + &self, + query: Option, + ) -> Result { + let req_url = format!("{}/api/v2/query/analyze", self.url); + + let response = self + .request(Method::POST, &req_url) + .header("Content-Type", "application/json") + .body(serde_json::to_string(&query.unwrap_or_default()).context(Serializing)?) + .send() + .await + .context(ReqwestProcessing)?; + + match response.status() { + StatusCode::OK => Ok(response + .json::() + .await + .context(ReqwestProcessing)?), + status => { + let text = response.text().await.context(ReqwestProcessing)?; + Http { status, text }.fail()? + } + } + } + + /// Get Query AST Repsonse + pub async fn query_ast( + &self, + language_request: Option, + ) -> Result { + let req_url = format!("{}/api/v2/query/ast", self.url); + + let response = self + .request(Method::POST, &req_url) + .header("Content-Type", "application/json") + .body( + serde_json::to_string(&language_request.unwrap_or_default()) + .context(Serializing)?, + ) + .send() + .await + .context(ReqwestProcessing)?; + + match response.status() { + StatusCode::OK => Ok(response + .json::() + .await + .context(ReqwestProcessing)?), + status => { + let text = response.text().await.context(ReqwestProcessing)?; + Http { status, text }.fail()? 
+ } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use mockito::{mock, Matcher}; + + #[tokio::test] + async fn query_suggestions() { + let token = "some-token"; + + let mock_server = mock("GET", "/api/v2/query/suggestions") + .match_header("Authorization", format!("Token {}", token).as_str()) + .create(); + + let client = Client::new(&mockito::server_url(), token); + + let _result = client.query_suggestions().await; + + mock_server.assert(); + } + + #[tokio::test] + async fn query_suggestions_name() { + let token = "some-token"; + let suggestion_name = "some-name"; + + let mock_server = mock( + "GET", + format!( + "/api/v2/query/suggestions/{name}", + name = crate::common::urlencode(suggestion_name) + ) + .as_str(), + ) + .match_header("Authorization", format!("Token {}", token).as_str()) + .create(); + + let client = Client::new(&mockito::server_url(), token); + + let _result = client.query_suggestions_name(&suggestion_name).await; + + mock_server.assert(); + } + + #[tokio::test] + async fn query() { + let token = "some-token"; + let org = "some-org"; + let query: Option = Some(Query::new("some-influx-query-string".to_string())); + let mock_server = mock("POST", "/api/v2/query") + .match_header("Authorization", format!("Token {}", token).as_str()) + .match_header("Accepting-Encoding", "identity") + .match_header("Content-Type", "application/json") + .match_query(Matcher::UrlEncoded("org".into(), org.into())) + .match_body( + serde_json::to_string(&query.clone().unwrap_or_default()) + .unwrap() + .as_str(), + ) + .create(); + + let client = Client::new(&mockito::server_url(), token); + + let _result = client.query(org, query).await; + + mock_server.assert(); + } + + #[tokio::test] + async fn query_opt() { + let token = "some-token"; + let org = "some-org"; + let query: Option = None; + + let mock_server = mock("POST", "/api/v2/query") + .match_header("Authorization", format!("Token {}", token).as_str()) + .match_header("Accepting-Encoding", "identity") + .match_header("Content-Type", "application/json") + .match_query(Matcher::UrlEncoded("org".into(), org.into())) + .match_body( + serde_json::to_string(&query.unwrap_or_default()) + .unwrap() + .as_str(), + ) + .create(); + + let client = Client::new(&mockito::server_url(), token); + + let _result = client.query(org, None).await; + + mock_server.assert(); + } + + #[tokio::test] + async fn query_analyze() { + let token = "some-token"; + let query: Option = Some(Query::new("some-influx-query-string".to_string())); + let mock_server = mock("POST", "/api/v2/query/analyze") + .match_header("Authorization", format!("Token {}", token).as_str()) + .match_header("Content-Type", "application/json") + .match_body( + serde_json::to_string(&query.clone().unwrap_or_default()) + .unwrap() + .as_str(), + ) + .create(); + + let client = Client::new(&mockito::server_url(), token); + + let _result = client.query_analyze(query).await; + + mock_server.assert(); + } + + #[tokio::test] + async fn query_analyze_opt() { + let token = "some-token"; + let query: Option = None; + let mock_server = mock("POST", "/api/v2/query/analyze") + .match_header("Authorization", format!("Token {}", token).as_str()) + .match_header("Content-Type", "application/json") + .match_body( + serde_json::to_string(&query.clone().unwrap_or_default()) + .unwrap() + .as_str(), + ) + .create(); + + let client = Client::new(&mockito::server_url(), token); + + let _result = client.query_analyze(query).await; + + mock_server.assert(); + } + + #[tokio::test] + async fn query_ast() { + 
let token = "some-token"; + let language_request: Option = + Some(LanguageRequest::new("some-influx-query-string".to_string())); + let mock_server = mock("POST", "/api/v2/query/ast") + .match_header("Authorization", format!("Token {}", token).as_str()) + .match_header("Content-Type", "application/json") + .match_body( + serde_json::to_string(&language_request.clone().unwrap_or_default()) + .unwrap() + .as_str(), + ) + .create(); + + let client = Client::new(&mockito::server_url(), token); + + let _result = client.query_ast(language_request).await; + + mock_server.assert(); + } + + #[tokio::test] + async fn query_ast_opt() { + let token = "some-token"; + let language_request: Option = None; + let mock_server = mock("POST", "/api/v2/query/ast") + .match_header("Authorization", format!("Token {}", token).as_str()) + .match_header("Content-Type", "application/json") + .match_body( + serde_json::to_string(&language_request.clone().unwrap_or_default()) + .unwrap() + .as_str(), + ) + .create(); + + let client = Client::new(&mockito::server_url(), token); + + let _result = client.query_ast(language_request).await; + + mock_server.assert(); + } +} diff --git a/influxdb2_client/src/common.rs b/influxdb2_client/src/common.rs new file mode 100644 index 0000000000..d51ea203f8 --- /dev/null +++ b/influxdb2_client/src/common.rs @@ -0,0 +1,8 @@ +//! Common +//! +//! Collection of helper functions + +/// Serialize to application/x-www-form-urlencoded syntax +pub fn urlencode>(s: T) -> String { + ::url::form_urlencoded::byte_serialize(s.as_ref().as_bytes()).collect() +} diff --git a/influxdb2_client/src/lib.rs b/influxdb2_client/src/lib.rs index 0fe80f80c7..96bb717f43 100644 --- a/influxdb2_client/src/lib.rs +++ b/influxdb2_client/src/lib.rs @@ -308,5 +308,7 @@ cpu,host=server01,region=us-west usage=0.87 } } +pub mod common; + pub mod api; pub mod models; diff --git a/influxdb2_client/src/models/ast/call_expression.rs b/influxdb2_client/src/models/ast/call_expression.rs new file mode 100644 index 0000000000..b50771f30b --- /dev/null +++ b/influxdb2_client/src/models/ast/call_expression.rs @@ -0,0 +1,24 @@ +//! CallExpression + +use serde::{Deserialize, Serialize}; + +/// Represents a function call +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct CallExpression { + /// Type of AST node + #[serde(rename = "type", skip_serializing_if = "Option::is_none")] + pub r#type: Option, + /// Callee + #[serde(skip_serializing_if = "Option::is_none")] + pub callee: Option>, + /// Function arguments + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub arguments: Vec, +} + +impl CallExpression { + /// Represents a function call + pub fn new() -> Self { + Self::default() + } +} diff --git a/influxdb2_client/src/models/ast/dialect.rs b/influxdb2_client/src/models/ast/dialect.rs new file mode 100644 index 0000000000..e17253bc08 --- /dev/null +++ b/influxdb2_client/src/models/ast/dialect.rs @@ -0,0 +1,52 @@ +//! 
Dialect + +use serde::{Deserialize, Serialize}; + +/// Dialect are options to change the default CSV output format; https://www.w3.org/TR/2015/REC-tabular-metadata-20151217/#dialect-descriptions +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Dialect { + /// If true, the results will contain a header row + #[serde(skip_serializing_if = "Option::is_none")] + pub header: Option, + /// Separator between cells; the default is , + #[serde(skip_serializing_if = "Option::is_none")] + pub delimiter: Option, + /// https://www.w3.org/TR/2015/REC-tabular-data-model-20151217/#columns + #[serde(skip_serializing_if = "Option::is_none")] + pub annotations: Option, + /// Character prefixed to comment strings + #[serde(skip_serializing_if = "Option::is_none")] + pub comment_prefix: Option, + /// Format of timestamps + #[serde(skip_serializing_if = "Option::is_none")] + pub date_time_format: Option, +} + +impl Dialect { + /// Dialect are options to change the default CSV output format; https://www.w3.org/TR/2015/REC-tabular-metadata-20151217/#dialect-descriptions + pub fn new() -> Self { + Self::default() + } +} + +/// https://www.w3.org/TR/2015/REC-tabular-data-model-20151217/#columns +#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum Annotations { + /// Group Annotation + Group, + /// Datatype Annotation + Datatype, + /// Default Annotation + Default, +} + +/// Timestamp Format +#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)] +pub enum DateTimeFormat { + /// RFC3339 + RFC3339, + /// RFC3339Nano + RFC3339Nano, +} diff --git a/influxdb2_client/src/models/ast/dict_item.rs b/influxdb2_client/src/models/ast/dict_item.rs new file mode 100644 index 0000000000..56de76eabd --- /dev/null +++ b/influxdb2_client/src/models/ast/dict_item.rs @@ -0,0 +1,24 @@ +//! DictItem + +use serde::{Deserialize, Serialize}; + +/// A key/value pair in a dictionary +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct DictItem { + /// Type of AST node + #[serde(rename = "type", skip_serializing_if = "Option::is_none")] + pub r#type: Option, + /// Key + #[serde(skip_serializing_if = "Option::is_none")] + pub key: Option, + /// Value + #[serde(skip_serializing_if = "Option::is_none")] + pub val: Option, +} + +impl DictItem { + /// A key/value pair in a dictionary + pub fn new() -> Self { + Self::default() + } +} diff --git a/influxdb2_client/src/models/ast/duration.rs b/influxdb2_client/src/models/ast/duration.rs new file mode 100644 index 0000000000..83f1c19ad9 --- /dev/null +++ b/influxdb2_client/src/models/ast/duration.rs @@ -0,0 +1,27 @@ +//! Duration + +use serde::{Deserialize, Serialize}; + +/// Duration : A pair consisting of length of time and the unit of time +/// measured. It is the atomic unit from which all duration literals are +/// composed. +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct Duration { + /// Type of AST node + #[serde(rename = "type", skip_serializing_if = "Option::is_none")] + pub r#type: Option, + /// Duration Magnitude + #[serde(skip_serializing_if = "Option::is_none")] + pub magnitude: Option, + /// Duration unit + #[serde(skip_serializing_if = "Option::is_none")] + pub unit: Option, +} + +impl Duration { + /// A pair consisting of length of time and the unit of time measured. 
It is + /// the atomic unit from which all duration literals are composed. + pub fn new() -> Self { + Self::default() + } +} diff --git a/influxdb2_client/src/models/ast/expression.rs b/influxdb2_client/src/models/ast/expression.rs new file mode 100644 index 0000000000..87c6bb8e73 --- /dev/null +++ b/influxdb2_client/src/models/ast/expression.rs @@ -0,0 +1,84 @@ +//! Expression + +use serde::{Deserialize, Serialize}; + +/// Expression AST +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct Expression { + /// Type of AST node + #[serde(rename = "type", skip_serializing_if = "Option::is_none")] + pub r#type: Option, + /// Elements of the dictionary + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub elements: Vec, + /// Function parameters + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub params: Vec, + /// Node + #[serde(skip_serializing_if = "Option::is_none")] + pub body: Option, + /// Operator + #[serde(skip_serializing_if = "Option::is_none")] + pub operator: Option, + /// Left leaf + #[serde(skip_serializing_if = "Option::is_none")] + pub left: Option>, + /// Right leaf + #[serde(skip_serializing_if = "Option::is_none")] + pub right: Option>, + /// Parent Expression + #[serde(skip_serializing_if = "Option::is_none")] + pub callee: Option>, + /// Function arguments + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub arguments: Vec, + /// Test Expr + #[serde(skip_serializing_if = "Option::is_none")] + pub test: Option>, + /// Alternate Expr + #[serde(skip_serializing_if = "Option::is_none")] + pub alternate: Option>, + /// Consequent Expr + #[serde(skip_serializing_if = "Option::is_none")] + pub consequent: Option>, + /// Object Expr + #[serde(skip_serializing_if = "Option::is_none")] + pub object: Option>, + /// PropertyKey Expr + #[serde(skip_serializing_if = "Option::is_none")] + pub property: Option>, + /// Array Expr + #[serde(skip_serializing_if = "Option::is_none")] + pub array: Option>, + /// Index Expr + #[serde(skip_serializing_if = "Option::is_none")] + pub index: Option>, + /// Properties + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub properties: Vec, + /// Expression + #[serde(skip_serializing_if = "Option::is_none")] + pub expression: Option>, + /// Argument + #[serde(skip_serializing_if = "Option::is_none")] + pub argument: Option>, + /// Call Expr + #[serde(skip_serializing_if = "Option::is_none")] + pub call: Option, + /// Expression Value + #[serde(skip_serializing_if = "Option::is_none")] + pub value: Option, + /// Duration values + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub values: Vec, + /// Expression Name + #[serde(skip_serializing_if = "Option::is_none")] + pub name: Option, +} + +impl Expression { + /// Return instance of expression + pub fn new() -> Self { + Self::default() + } +} diff --git a/influxdb2_client/src/models/ast/identifier.rs b/influxdb2_client/src/models/ast/identifier.rs new file mode 100644 index 0000000000..d604aae6ca --- /dev/null +++ b/influxdb2_client/src/models/ast/identifier.rs @@ -0,0 +1,21 @@ +//! 
Idendifier + +use serde::{Deserialize, Serialize}; + +/// A valid Flux identifier +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct Identifier { + /// Type of AST node + #[serde(rename = "type", skip_serializing_if = "Option::is_none")] + pub r#type: Option, + /// Identifier Name + #[serde(skip_serializing_if = "Option::is_none")] + pub name: Option, +} + +impl Identifier { + /// A valid Flux identifier + pub fn new() -> Self { + Self::default() + } +} diff --git a/influxdb2_client/src/models/ast/import_declaration.rs b/influxdb2_client/src/models/ast/import_declaration.rs new file mode 100644 index 0000000000..617fd300f4 --- /dev/null +++ b/influxdb2_client/src/models/ast/import_declaration.rs @@ -0,0 +1,24 @@ +//! ImportDeclaration + +use serde::{Deserialize, Serialize}; + +/// Declares a package import +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct ImportDeclaration { + /// Type of AST node + #[serde(rename = "type", skip_serializing_if = "Option::is_none")] + pub r#type: Option, + /// Import Identifier + #[serde(rename = "as", skip_serializing_if = "Option::is_none")] + pub r#as: Option, + /// Import Path + #[serde(skip_serializing_if = "Option::is_none")] + pub path: Option, +} + +impl ImportDeclaration { + /// Declares a package import + pub fn new() -> Self { + Self::default() + } +} diff --git a/influxdb2_client/src/models/ast/member_expression.rs b/influxdb2_client/src/models/ast/member_expression.rs new file mode 100644 index 0000000000..b44aa43938 --- /dev/null +++ b/influxdb2_client/src/models/ast/member_expression.rs @@ -0,0 +1,24 @@ +//! MemberExpression + +use serde::{Deserialize, Serialize}; + +/// Represents accessing a property of an object +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct MemberExpression { + /// Type of AST node + #[serde(rename = "type", skip_serializing_if = "Option::is_none")] + pub r#type: Option, + /// Member object + #[serde(skip_serializing_if = "Option::is_none")] + pub object: Option, + /// Member Property + #[serde(skip_serializing_if = "Option::is_none")] + pub property: Option, +} + +impl MemberExpression { + /// Represents accessing a property of an object + pub fn new() -> Self { + Self::default() + } +} diff --git a/influxdb2_client/src/models/ast/mod.rs b/influxdb2_client/src/models/ast/mod.rs new file mode 100644 index 0000000000..f170a857e1 --- /dev/null +++ b/influxdb2_client/src/models/ast/mod.rs @@ -0,0 +1,34 @@ +//! 
Query AST models + +pub mod identifier; +pub use self::identifier::Identifier; +pub mod statement; +pub use self::statement::Statement; +pub mod expression; +pub use self::expression::Expression; +pub mod call_expression; +pub use self::call_expression::CallExpression; +pub mod member_expression; +pub use self::member_expression::MemberExpression; +pub mod string_literal; +pub use self::string_literal::StringLiteral; +pub mod dict_item; +pub use self::dict_item::DictItem; +pub mod variable_assignment; +pub use self::variable_assignment::VariableAssignment; +pub mod node; +pub use self::node::Node; +pub mod property; +pub use self::property::Property; +pub mod property_key; +pub use self::property_key::PropertyKey; +pub mod dialect; +pub use self::dialect::Dialect; +pub mod import_declaration; +pub use self::import_declaration::ImportDeclaration; +pub mod package; +pub use self::package::Package; +pub mod package_clause; +pub use self::package_clause::PackageClause; +pub mod duration; +pub use self::duration::Duration; diff --git a/influxdb2_client/src/models/ast/node.rs b/influxdb2_client/src/models/ast/node.rs new file mode 100644 index 0000000000..33b76362cc --- /dev/null +++ b/influxdb2_client/src/models/ast/node.rs @@ -0,0 +1,84 @@ +//! Node + +use serde::{Deserialize, Serialize}; + +/// Node +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct Node { + /// Type of AST node + #[serde(rename = "type", skip_serializing_if = "Option::is_none")] + pub r#type: Option, + /// Elements of the dictionary + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub elements: Vec, + /// Function parameters + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub params: Vec, + /// Block body + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub body: Vec, + /// Node Operator + #[serde(skip_serializing_if = "Option::is_none")] + pub operator: Option, + /// Left left node + #[serde(skip_serializing_if = "Option::is_none")] + pub left: Option>, + /// Right right node + #[serde(skip_serializing_if = "Option::is_none")] + pub right: Option>, + /// Parent node + #[serde(skip_serializing_if = "Option::is_none")] + pub callee: Option>, + /// Function arguments + #[serde(skip_serializing_if = "Vec::is_empty")] + pub arguments: Vec, + /// Test Expr + #[serde(skip_serializing_if = "Option::is_none")] + pub test: Option>, + /// Alternate Expr + #[serde(skip_serializing_if = "Option::is_none")] + pub alternate: Option>, + /// Consequent Expr + #[serde(skip_serializing_if = "Option::is_none")] + pub consequent: Option>, + /// Object Expr + #[serde(skip_serializing_if = "Option::is_none")] + pub object: Option>, + /// PropertyKey + #[serde(skip_serializing_if = "Option::is_none")] + pub property: Option, + /// Array Expr + #[serde(skip_serializing_if = "Option::is_none")] + pub array: Option>, + /// Index Expr + #[serde(skip_serializing_if = "Option::is_none")] + pub index: Option>, + /// Object properties + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub properties: Vec, + /// Expression + #[serde(skip_serializing_if = "Option::is_none")] + pub expression: Option>, + /// Node arguments + #[serde(skip_serializing_if = "Option::is_none")] + pub argument: Option>, + /// Call Expr + #[serde(skip_serializing_if = "Option::is_none")] + pub call: Option, + /// Node Value + #[serde(skip_serializing_if = "Option::is_none")] + pub value: Option, + /// Duration values + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub values: Vec, + /// Node 
name + #[serde(skip_serializing_if = "Option::is_none")] + pub name: Option, +} + +impl Node { + /// Return instance of Node + pub fn new() -> Self { + Self::default() + } +} diff --git a/influxdb2_client/src/models/ast/package.rs b/influxdb2_client/src/models/ast/package.rs new file mode 100644 index 0000000000..b0adda6ed8 --- /dev/null +++ b/influxdb2_client/src/models/ast/package.rs @@ -0,0 +1,28 @@ +//! Package + +use crate::models::File; +use serde::{Deserialize, Serialize}; + +/// Represents a complete package source tree. +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct Package { + /// Type of AST node + #[serde(rename = "type", skip_serializing_if = "Option::is_none")] + pub r#type: Option, + /// Package import path + #[serde(skip_serializing_if = "Option::is_none")] + pub path: Option, + /// Package name + #[serde(skip_serializing_if = "Option::is_none")] + pub package: Option, + /// Package files + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub files: Vec, +} + +impl Package { + /// Represents a complete package source tree. + pub fn new() -> Self { + Self::default() + } +} diff --git a/influxdb2_client/src/models/ast/package_clause.rs b/influxdb2_client/src/models/ast/package_clause.rs new file mode 100644 index 0000000000..80909dab82 --- /dev/null +++ b/influxdb2_client/src/models/ast/package_clause.rs @@ -0,0 +1,21 @@ +//! PackageClause + +use serde::{Deserialize, Serialize}; + +/// Defines a package identifier +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct PackageClause { + /// Type of AST node + #[serde(rename = "type", skip_serializing_if = "Option::is_none")] + pub r#type: Option, + /// Package name + #[serde(skip_serializing_if = "Option::is_none")] + pub name: Option, +} + +impl PackageClause { + /// Defines a package identifier + pub fn new() -> Self { + Self::default() + } +} diff --git a/influxdb2_client/src/models/ast/property.rs b/influxdb2_client/src/models/ast/property.rs new file mode 100644 index 0000000000..5458595114 --- /dev/null +++ b/influxdb2_client/src/models/ast/property.rs @@ -0,0 +1,24 @@ +//! Property + +use serde::{Deserialize, Serialize}; + +/// The value associated with a key +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct Property { + /// Type of AST node + #[serde(rename = "type", skip_serializing_if = "Option::is_none")] + pub r#type: Option, + /// Property Key + #[serde(skip_serializing_if = "Option::is_none")] + pub key: Option, + /// Property Value + #[serde(skip_serializing_if = "Option::is_none")] + pub value: Option, +} + +impl Property { + /// The value associated with a key + pub fn new() -> Self { + Self::default() + } +} diff --git a/influxdb2_client/src/models/ast/property_key.rs b/influxdb2_client/src/models/ast/property_key.rs new file mode 100644 index 0000000000..d5dfe6c800 --- /dev/null +++ b/influxdb2_client/src/models/ast/property_key.rs @@ -0,0 +1,24 @@ +//! 
PropertyKey + +use serde::{Deserialize, Serialize}; + +/// Key value pair +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct PropertyKey { + /// Type of AST node + #[serde(rename = "type", skip_serializing_if = "Option::is_none")] + pub r#type: Option, + /// PropertyKey name + #[serde(skip_serializing_if = "Option::is_none")] + pub name: Option, + /// PropertyKey value + #[serde(skip_serializing_if = "Option::is_none")] + pub value: Option, +} + +impl PropertyKey { + /// Returns an instance of PropertyKey + pub fn new() -> Self { + Self::default() + } +} diff --git a/influxdb2_client/src/models/ast/statement.rs b/influxdb2_client/src/models/ast/statement.rs new file mode 100644 index 0000000000..70a18c46e0 --- /dev/null +++ b/influxdb2_client/src/models/ast/statement.rs @@ -0,0 +1,39 @@ +//! Statement + +use serde::{Deserialize, Serialize}; + +/// Expression Statement +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct Statement { + /// Type of AST node + #[serde(rename = "type", skip_serializing_if = "Option::is_none")] + pub r#type: Option, + /// Raw source text + #[serde(skip_serializing_if = "Option::is_none")] + pub text: Option, + /// Statement identitfier + #[serde(skip_serializing_if = "Option::is_none")] + pub id: Option, + /// Initial Value + #[serde(skip_serializing_if = "Option::is_none")] + pub init: Option, + /// Member + #[serde(skip_serializing_if = "Option::is_none")] + pub member: Option, + /// Expression + #[serde(skip_serializing_if = "Option::is_none")] + pub expression: Option, + /// Argument + #[serde(skip_serializing_if = "Option::is_none")] + pub argument: Option, + /// Assignment + #[serde(skip_serializing_if = "Option::is_none")] + pub assignment: Option, +} + +impl Statement { + /// Returns an instance of Statement + pub fn new() -> Self { + Self::default() + } +} diff --git a/influxdb2_client/src/models/ast/string_literal.rs b/influxdb2_client/src/models/ast/string_literal.rs new file mode 100644 index 0000000000..606b9cb550 --- /dev/null +++ b/influxdb2_client/src/models/ast/string_literal.rs @@ -0,0 +1,21 @@ +//! StringLiteral + +use serde::{Deserialize, Serialize}; + +/// Expressions begin and end with double quote marks +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct StringLiteral { + /// Type of AST node + #[serde(rename = "type", skip_serializing_if = "Option::is_none")] + pub r#type: Option, + /// StringLiteral Value + #[serde(skip_serializing_if = "Option::is_none")] + pub value: Option, +} + +impl StringLiteral { + /// Expressions begin and end with double quote marks + pub fn new() -> Self { + Self::default() + } +} diff --git a/influxdb2_client/src/models/ast/variable_assignment.rs b/influxdb2_client/src/models/ast/variable_assignment.rs new file mode 100644 index 0000000000..c2260fdad4 --- /dev/null +++ b/influxdb2_client/src/models/ast/variable_assignment.rs @@ -0,0 +1,24 @@ +//! 
VariableAssignment + +use serde::{Deserialize, Serialize}; + +/// Represents the declaration of a variable +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct VariableAssignment { + /// Type of AST node + #[serde(rename = "type", skip_serializing_if = "Option::is_none")] + pub r#type: Option, + /// Variable Identifier + #[serde(skip_serializing_if = "Option::is_none")] + pub id: Option, + /// Variable initial value + #[serde(skip_serializing_if = "Option::is_none")] + pub init: Option, +} + +impl VariableAssignment { + /// Represents the declaration of a variable + pub fn new() -> Self { + Self::default() + } +} diff --git a/influxdb2_client/src/models/file.rs b/influxdb2_client/src/models/file.rs new file mode 100644 index 0000000000..98c6071daa --- /dev/null +++ b/influxdb2_client/src/models/file.rs @@ -0,0 +1,30 @@ +//! File + +use serde::{Deserialize, Serialize}; + +/// Represents a source from a single file +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct File { + /// Type of AST node + #[serde(rename = "type", skip_serializing_if = "Option::is_none")] + pub r#type: Option, + /// The name of the file. + #[serde(skip_serializing_if = "Option::is_none")] + pub name: Option, + /// PackageClause + #[serde(skip_serializing_if = "Option::is_none")] + pub package: Option, + /// A list of package imports + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub imports: Vec, + /// List of Flux statements + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub body: Vec, +} + +impl File { + /// Represents a source from a single file + pub fn new() -> Self { + Self::default() + } +} diff --git a/influxdb2_client/src/models/mod.rs b/influxdb2_client/src/models/mod.rs index 8e56de2c47..e2f78bba13 100644 --- a/influxdb2_client/src/models/mod.rs +++ b/influxdb2_client/src/models/mod.rs @@ -2,6 +2,8 @@ //! //! Roughly follows the OpenAPI specification +pub mod ast; + pub mod user; pub use self::user::{User, UserLinks, Users, UsersLinks}; pub mod organization; @@ -22,3 +24,10 @@ pub mod resource; pub use self::resource::Resource; pub mod retention_rule; pub use self::retention_rule::RetentionRule; +pub mod query; +pub use self::query::{ + AnalyzeQueryResponse, AnalyzeQueryResponseErrors, AstResponse, FluxSuggestion, FluxSuggestions, + LanguageRequest, Query, +}; +pub mod file; +pub use self::file::File; diff --git a/influxdb2_client/src/models/query.rs b/influxdb2_client/src/models/query.rs new file mode 100644 index 0000000000..ce37174ac7 --- /dev/null +++ b/influxdb2_client/src/models/query.rs @@ -0,0 +1,145 @@ +//! Query + +use crate::models::ast::Package; +use crate::models::File; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +/// Query influx using the Flux language +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct Query { + /// Query Script + #[serde(rename = "extern", skip_serializing_if = "Option::is_none")] + pub r#extern: Option, + /// Query script to execute. + pub query: String, + /// The type of query. Must be \"flux\". + #[serde(rename = "type", skip_serializing_if = "Option::is_none")] + pub r#type: Option, + /// Dialect + #[serde(skip_serializing_if = "Option::is_none")] + pub dialect: Option, + /// Specifies the time that should be reported as "now" in the query. + /// Default is the server's now time. 
+ #[serde(skip_serializing_if = "Option::is_none")] + pub now: Option, +} + +impl Query { + /// Query influx using the Flux language + pub fn new(query: String) -> Self { + Self { + query, + ..Default::default() + } + } +} + +/// The type of query. Must be \"flux\". +#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum Type { + /// Query Type + Flux, +} + +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +/// Flux Query Suggestion +pub struct FluxSuggestion { + /// Suggestion Name + #[serde(skip_serializing_if = "Option::is_none")] + pub name: Option, + /// Suggestion Params + #[serde(skip_serializing_if = "Option::is_none")] + pub params: Option>, +} + +impl FluxSuggestion { + /// Returns an instance FluxSuggestion + pub fn new() -> Self { + Self::default() + } +} + +/// FluxSuggestions +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct FluxSuggestions { + /// List of Flux Suggestions + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub funcs: Vec, +} + +impl FluxSuggestions { + /// Return an instance of FluxSuggestions + pub fn new() -> Self { + Self::default() + } +} + +/// AnalyzeQueryResponse +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct AnalyzeQueryResponse { + /// List of QueryResponseErrors + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub errors: Vec, +} + +impl AnalyzeQueryResponse { + /// Return an instance of AnanlyzeQueryResponse + pub fn new() -> Self { + Self::default() + } +} + +/// AnalyzeQueryResponseErrors +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct AnalyzeQueryResponseErrors { + /// Error line + #[serde(skip_serializing_if = "Option::is_none")] + pub line: Option, + /// Error column + #[serde(skip_serializing_if = "Option::is_none")] + pub column: Option, + /// Error char + #[serde(skip_serializing_if = "Option::is_none")] + pub character: Option, + /// Error message + #[serde(skip_serializing_if = "Option::is_none")] + pub message: Option, +} + +impl AnalyzeQueryResponseErrors { + /// Return an instance of AnalyzeQueryResponseErrors + pub fn new() -> Self { + Self::default() + } +} + +/// AstResponse : Contains the AST for the supplied Flux query +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct AstResponse { + /// AST of Flux query + #[serde(skip_serializing_if = "Option::is_none")] + pub ast: Option, +} + +impl AstResponse { + /// Contains the AST for the supplied Flux query + pub fn new() -> Self { + Self::default() + } +} + +/// LanguageRequest : Flux query to be analyzed. +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct LanguageRequest { + /// Flux query script to be analyzed + pub query: String, +} + +impl LanguageRequest { + /// Flux query to be analyzed. 
+ pub fn new(query: String) -> Self { + Self { query } + } +} diff --git a/influxdb_iox_client/src/client/management.rs b/influxdb_iox_client/src/client/management.rs index 29a786c8ef..d46e249691 100644 --- a/influxdb_iox_client/src/client/management.rs +++ b/influxdb_iox_client/src/client/management.rs @@ -53,6 +53,26 @@ pub enum CreateDatabaseError { ServerError(tonic::Status), } +/// Errors returned by Client::update_database +#[derive(Debug, Error)] +pub enum UpdateDatabaseError { + /// Writer ID is not set + #[error("Writer ID not set")] + NoWriterId, + + /// Database not found + #[error("Database not found")] + DatabaseNotFound, + + /// Server returned an invalid argument error + #[error("Unexpected server error: {}: {}", .0.code(), .0.message())] + InvalidArgument(tonic::Status), + + /// Client received an unexpected error from the server + #[error("Unexpected server error: {}: {}", .0.code(), .0.message())] + ServerError(tonic::Status), +} + /// Errors returned by Client::list_databases #[derive(Debug, Error)] pub enum ListDatabaseError { @@ -271,6 +291,25 @@ impl Client { Ok(()) } + /// Updates the configuration for a database. + pub async fn update_database( + &mut self, + rules: DatabaseRules, + ) -> Result { + let response = self + .inner + .update_database(UpdateDatabaseRequest { rules: Some(rules) }) + .await + .map_err(|status| match status.code() { + tonic::Code::NotFound => UpdateDatabaseError::DatabaseNotFound, + tonic::Code::FailedPrecondition => UpdateDatabaseError::NoWriterId, + tonic::Code::InvalidArgument => UpdateDatabaseError::InvalidArgument(status), + _ => UpdateDatabaseError::ServerError(status), + })?; + + Ok(response.into_inner().rules.unwrap()) + } + /// List databases. pub async fn list_databases(&mut self) -> Result, ListDatabaseError> { let response = self diff --git a/server/src/config.rs b/server/src/config.rs index fa37ea59d0..d662109920 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -33,6 +33,17 @@ pub(crate) struct Config { state: RwLock, } +pub(crate) enum UpdateError { + Update(Error), + Closure(E), +} + +impl From for UpdateError { + fn from(e: Error) -> Self { + Self::Update(e) + } +} + impl Config { pub(crate) fn new(jobs: Arc) -> Self { Self { @@ -67,6 +78,27 @@ impl Config { state.databases.keys().cloned().collect() } + pub(crate) fn update_db_rules( + &self, + db_name: &DatabaseName<'static>, + update: F, + ) -> std::result::Result> + where + F: FnOnce(DatabaseRules) -> std::result::Result, + { + let state = self.state.read().expect("mutex poisoned"); + let db_state = state + .databases + .get(db_name) + .ok_or_else(|| Error::DatabaseNotFound { + db_name: db_name.to_string(), + })?; + + let mut rules = db_state.db.rules.write(); + *rules = update(rules.clone()).map_err(UpdateError::Closure)?; + Ok(rules.clone()) + } + pub(crate) fn remotes_sorted(&self) -> Vec<(WriterId, String)> { let state = self.state.read().expect("mutex poisoned"); state.remotes.iter().map(|(&a, b)| (a, b.clone())).collect() diff --git a/server/src/lib.rs b/server/src/lib.rs index 3046d18639..5f981f5f4e 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -192,6 +192,18 @@ pub struct Server { jobs: Arc, } +#[derive(Debug)] +pub enum UpdateError { + Update(Error), + Closure(E), +} + +impl From for UpdateError { + fn from(e: Error) -> Self { + Self::Update(e) + } +} + impl Server { pub fn new(connection_manager: M, store: Arc) -> Self { let jobs = Arc::new(JobRegistry::new()); @@ -406,6 +418,26 @@ impl Server { self.config.db(name).map(|d| 
d.rules.read().clone()) } + // Update database rules and save on success. + pub async fn update_db_rules( + &self, + db_name: &DatabaseName<'static>, + update: F, + ) -> std::result::Result> + where + F: FnOnce(DatabaseRules) -> Result, + { + let rules = self + .config + .update_db_rules(db_name, update) + .map_err(|e| match e { + crate::config::UpdateError::Closure(e) => UpdateError::Closure(e), + crate::config::UpdateError::Update(e) => UpdateError::Update(e), + })?; + self.persist_database_rules(rules.clone()).await?; + Ok(rules) + } + pub fn remotes_sorted(&self) -> Vec<(WriterId, String)> { self.config.remotes_sorted() } diff --git a/src/influxdb_ioxd/rpc/management.rs b/src/influxdb_ioxd/rpc/management.rs index 49af054052..ef51b8de99 100644 --- a/src/influxdb_ioxd/rpc/management.rs +++ b/src/influxdb_ioxd/rpc/management.rs @@ -3,9 +3,12 @@ use std::fmt::Debug; use std::sync::Arc; use data_types::database_rules::DatabaseRules; -use data_types::DatabaseName; -use generated_types::google::{AlreadyExists, FieldViolation, FieldViolationExt, NotFound}; +use data_types::{field_validation::FromFieldOpt, DatabaseName}; +use generated_types::google::{ + AlreadyExists, FieldViolation, FieldViolationExt, InternalError, NotFound, +}; use generated_types::influxdata::iox::management::v1::*; +use observability_deps::tracing::info; use query::{Database, DatabaseStore}; use server::{ConnectionManager, Error, Server}; use tonic::{Request, Response, Status}; @@ -17,6 +20,33 @@ struct ManagementService { use super::error::{default_db_error_handler, default_server_error_handler}; use std::num::NonZeroU32; +#[derive(Debug)] +enum UpdateError { + Update(server::Error), + Closure(tonic::Status), +} + +impl From for Status { + fn from(error: UpdateError) -> Self { + match error { + UpdateError::Update(error) => { + info!(?error, "Update error"); + InternalError {}.into() + } + UpdateError::Closure(error) => error, + } + } +} + +impl From> for UpdateError { + fn from(error: server::UpdateError) -> Self { + match error { + server::UpdateError::Update(error) => Self::Update(error), + server::UpdateError::Closure(error) => Self::Closure(error), + } + } +} + #[tonic::async_trait] impl management_service_server::ManagementService for ManagementService where @@ -116,6 +146,24 @@ where } } + async fn update_database( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + let rules: DatabaseRules = request.rules.required("rules")?; + let db_name = rules.name.clone(); + let updated_rules = self + .server + .update_db_rules(&db_name, |_orig| Ok(rules)) + .await + .map_err(UpdateError::from)?; + + Ok(Response::new(UpdateDatabaseResponse { + rules: Some(updated_rules.into()), + })) + } + async fn list_chunks( &self, request: Request, diff --git a/tests/end_to_end_cases/management_api.rs b/tests/end_to_end_cases/management_api.rs index 28e1a6ed12..0ea2b48b14 100644 --- a/tests/end_to_end_cases/management_api.rs +++ b/tests/end_to_end_cases/management_api.rs @@ -5,6 +5,7 @@ use generated_types::{ influxdata::iox::management::v1::*, }; use influxdb_iox_client::{management::CreateDatabaseError, operations}; + use test_helpers::assert_contains; use super::scenario::{ @@ -156,7 +157,7 @@ async fn test_list_databases() { } #[tokio::test] -async fn test_create_get_database() { +async fn test_create_get_update_database() { let server_fixture = ServerFixture::create_shared().await; let mut client = server_fixture.management_client(); @@ -164,7 +165,7 @@ async fn test_create_get_database() { 
// Specify everything to allow direct comparison between request and response // Otherwise would expect difference due to server-side defaulting - let rules = DatabaseRules { + let mut rules = DatabaseRules { name: db_name.clone(), partition_template: Some(PartitionTemplate { parts: vec![partition_template::Part { @@ -198,11 +199,36 @@ async fn test_create_get_database() { .expect("create database failed"); let response = client - .get_database(db_name) + .get_database(&db_name) .await .expect("get database failed"); - assert_eq!(response, rules); + assert_eq!(response.shard_config, None); + + rules.shard_config = Some(ShardConfig { + ignore_errors: true, + ..Default::default() + }); + + let updated_rules = client + .update_database(rules.clone()) + .await + .expect("update database failed"); + + assert_eq!(updated_rules, rules); + + let response = client + .get_database(&db_name) + .await + .expect("get database failed"); + + assert_eq!( + response + .shard_config + .expect("shard config missing") + .ignore_errors, + true + ); } #[tokio::test]
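For reviewers trying the new RPC end to end: a minimal, illustrative sketch of the client-side `update_database` call added in this diff, following the same shape as `test_create_get_update_database` above. The helper function and the assumption that you already hold a connected management `Client` are illustrative only, not part of this change.

```rust
use generated_types::influxdata::iox::management::v1::{DatabaseRules, ShardConfig};
use influxdb_iox_client::management::{Client, UpdateDatabaseError};

// Hypothetical helper: flip `ignore_errors` on the shard config and persist
// the change via the new UpdateDatabase RPC. The `name` field inside `rules`
// identifies which database is updated (see UpdateDatabaseRequest above).
async fn enable_ignore_errors(
    client: &mut Client,
    mut rules: DatabaseRules,
) -> Result<DatabaseRules, UpdateDatabaseError> {
    rules.shard_config = Some(ShardConfig {
        ignore_errors: true,
        ..Default::default()
    });

    // The server echoes back the rules as stored; a missing database maps to
    // UpdateDatabaseError::DatabaseNotFound per the client change in this diff.
    client.update_database(rules).await
}
```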