Merge branch 'main' into feature-label

pull/24376/head
Jeivardan 2021-04-09 03:50:08 +00:00 committed by GitHub
commit af72581658
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
39 changed files with 1330 additions and 11 deletions

2
Cargo.lock generated
View File

@ -1437,6 +1437,7 @@ dependencies = [
"snafu",
"test_helpers",
"tokio",
"url",
]
[[package]]
@ -1466,6 +1467,7 @@ dependencies = [
"influxdb_tsm",
"ingest",
"internal_types",
"itertools 0.9.0",
"logfmt",
"mem_qe",
"mutable_buffer",

View File

@ -76,6 +76,7 @@ http = "0.2.0"
hyper = "0.14"
once_cell = { version = "1.4.0", features = ["parking_lot"] }
parking_lot = "0.11.1"
itertools = "0.9.0"
# used by arrow/datafusion anyway
prettytable-rs = "0.8"
prost = "0.7"

View File

@ -4,6 +4,7 @@ build:
- generated_types/protos/
lint:
allow_comment_ignores: true
ignore:
- google
- grpc

View File

@ -26,7 +26,7 @@ where
/// An extension trait that adds the methods `optional` and `required` to any
/// Option containing a type implementing `TryInto<U, Error = FieldViolation>`
pub(crate) trait FromFieldOpt<T> {
pub trait FromFieldOpt<T> {
/// Try to convert inner type, if any, using TryInto calling
/// `FieldViolation::scope` on any error encountered
///

View File

@ -23,4 +23,4 @@ pub mod timestamp;
pub mod wal;
mod database_name;
pub(crate) mod field_validation;
pub mod field_validation;

View File

@ -5,3 +5,6 @@ When updating the version of the [flatbuffers](https://crates.io/crates/flatbuff
To update the generated code, edit `generated_types/regenerate-flatbuffers.sh` and set the `FB_COMMIT` variable at the top of the file to the commit SHA of the same commit in the [flatbuffers repository](https://github.com/google/flatbuffers) where the `flatbuffers` Rust crate version was updated. This ensures we'll be [using the same version of `flatc` that the crate was tested with](https://github.com/google/flatbuffers/issues/6199#issuecomment-714562121).
Then run the `generated_types/regenerate-flatbuffers.sh` script and check in any changes. Check the whole project builds.
`generated_types/regenerate-flatbuffers.sh` will build `flatc` from source if it cannot be found.
In order to do that your system will require `bazel`; you can likely install this with your favourite package manager.

View File

@ -1,7 +1,9 @@
syntax = "proto3";
package influxdata.iox.management.v1;
import "google/longrunning/operations.proto";
import "google/protobuf/field_mask.proto";
import "influxdata/iox/management/v1/database_rules.proto";
import "influxdata/iox/management/v1/chunk.proto";
import "influxdata/iox/management/v1/partition.proto";
@ -17,6 +19,11 @@ service ManagementService {
rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse);
// Update a database.
//
// Roughly follows the https://google.aip.dev/134 pattern, except we wrap the response
rpc UpdateDatabase(UpdateDatabaseRequest) returns (UpdateDatabaseResponse);
// List chunks available on this database
rpc ListChunks(ListChunksRequest) returns (ListChunksResponse);
@ -52,7 +59,6 @@ service ManagementService {
// Close a chunk and move it to the read buffer
rpc ClosePartitionChunk(ClosePartitionChunkRequest) returns (ClosePartitionChunkResponse);
}
message GetWriterIdRequest {}
@ -87,6 +93,16 @@ message CreateDatabaseRequest {
message CreateDatabaseResponse {}
// Update a database.
message UpdateDatabaseRequest {
// The rule's `name` field is used to identify the database rules to be updated.
DatabaseRules rules = 1;
}
message UpdateDatabaseResponse {
DatabaseRules rules = 1;
}
message ListChunksRequest {
// the name of the database
string db_name = 1;

View File

@ -3,7 +3,7 @@
# The commit where the Rust `flatbuffers` crate version was changed to the version in `Cargo.lock`
# Update this, rerun this script, and check in the changes in the generated code when the
# `flatbuffers` crate version is updated.
FB_COMMIT="86401e078d0746d2381735415f8c2dfe849f3f52"
FB_COMMIT="261cf3b20473abdf95fc34da0827e4986f065c39"
# Change to the generated_types crate directory, where this script is located
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"

View File

@ -11,6 +11,7 @@ reqwest = { version = "0.11", features = ["stream", "json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.44"
snafu = "0.6.6"
url = "2.1.1"
[dev-dependencies] # In alphabetical order
mockito = "0.26.0"

View File

@ -0,0 +1,26 @@
use influxdb2_client::models::{LanguageRequest, Query};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let influx_url = "http://localhost:8086";
let token = "some-token";
let client = influxdb2_client::Client::new(influx_url, token);
client.query_suggestions().await?;
client.query_suggestions_name("some-name").await?;
client
.query("some-org", Some(Query::new("some-query".to_string())))
.await?;
client
.query_analyze(Some(Query::new("some-query".to_string())))
.await?;
client
.query_ast(Some(LanguageRequest::new("some-query".to_string())))
.await?;
Ok(())
}

View File

@ -1,4 +1,5 @@
//! InfluxDB v2.0 Client API
pub mod label;
pub mod query;
pub mod ready;
pub mod setup;
pub mod setup;

View File

@ -0,0 +1,318 @@
//! Query
//!
//! Query InfluxDB using InfluxQL or Flux Query
use crate::{Client, Http, RequestError, ReqwestProcessing, Serializing};
use reqwest::{Method, StatusCode};
use snafu::ResultExt;
use crate::models::{
AnalyzeQueryResponse, AstResponse, FluxSuggestion, FluxSuggestions, LanguageRequest, Query,
};
impl Client {
/// Get Query Suggestions
pub async fn query_suggestions(&self) -> Result<FluxSuggestions, RequestError> {
let req_url = format!("{}/api/v2/query/suggestions", self.url);
let response = self
.request(Method::GET, &req_url)
.send()
.await
.context(ReqwestProcessing)?;
match response.status() {
StatusCode::OK => Ok(response
.json::<FluxSuggestions>()
.await
.context(ReqwestProcessing)?),
status => {
let text = response.text().await.context(ReqwestProcessing)?;
Http { status, text }.fail()?
}
}
}
/// Query Suggestions with name
pub async fn query_suggestions_name(&self, name: &str) -> Result<FluxSuggestion, RequestError> {
let req_url = format!(
"{}/api/v2/query/suggestions/{name}",
self.url,
name = crate::common::urlencode(name),
);
let response = self
.request(Method::GET, &req_url)
.send()
.await
.context(ReqwestProcessing)?;
match response.status() {
StatusCode::OK => Ok(response
.json::<FluxSuggestion>()
.await
.context(ReqwestProcessing)?),
status => {
let text = response.text().await.context(ReqwestProcessing)?;
Http { status, text }.fail()?
}
}
}
/// Query
pub async fn query(&self, org: &str, query: Option<Query>) -> Result<String, RequestError> {
let req_url = format!("{}/api/v2/query", self.url);
let response = self
.request(Method::POST, &req_url)
.header("Accepting-Encoding", "identity")
.header("Content-Type", "application/json")
.query(&[("org", &org)])
.body(serde_json::to_string(&query.unwrap_or_default()).context(Serializing)?)
.send()
.await
.context(ReqwestProcessing)?;
match response.status() {
StatusCode::OK => Ok(response.json::<String>().await.context(ReqwestProcessing)?),
status => {
let text = response.text().await.context(ReqwestProcessing)?;
Http { status, text }.fail()?
}
}
}
/// Analyze Query
pub async fn query_analyze(
&self,
query: Option<Query>,
) -> Result<AnalyzeQueryResponse, RequestError> {
let req_url = format!("{}/api/v2/query/analyze", self.url);
let response = self
.request(Method::POST, &req_url)
.header("Content-Type", "application/json")
.body(serde_json::to_string(&query.unwrap_or_default()).context(Serializing)?)
.send()
.await
.context(ReqwestProcessing)?;
match response.status() {
StatusCode::OK => Ok(response
.json::<AnalyzeQueryResponse>()
.await
.context(ReqwestProcessing)?),
status => {
let text = response.text().await.context(ReqwestProcessing)?;
Http { status, text }.fail()?
}
}
}
/// Get Query AST Repsonse
pub async fn query_ast(
&self,
language_request: Option<LanguageRequest>,
) -> Result<AstResponse, RequestError> {
let req_url = format!("{}/api/v2/query/ast", self.url);
let response = self
.request(Method::POST, &req_url)
.header("Content-Type", "application/json")
.body(
serde_json::to_string(&language_request.unwrap_or_default())
.context(Serializing)?,
)
.send()
.await
.context(ReqwestProcessing)?;
match response.status() {
StatusCode::OK => Ok(response
.json::<AstResponse>()
.await
.context(ReqwestProcessing)?),
status => {
let text = response.text().await.context(ReqwestProcessing)?;
Http { status, text }.fail()?
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use mockito::{mock, Matcher};
#[tokio::test]
async fn query_suggestions() {
let token = "some-token";
let mock_server = mock("GET", "/api/v2/query/suggestions")
.match_header("Authorization", format!("Token {}", token).as_str())
.create();
let client = Client::new(&mockito::server_url(), token);
let _result = client.query_suggestions().await;
mock_server.assert();
}
#[tokio::test]
async fn query_suggestions_name() {
let token = "some-token";
let suggestion_name = "some-name";
let mock_server = mock(
"GET",
format!(
"/api/v2/query/suggestions/{name}",
name = crate::common::urlencode(suggestion_name)
)
.as_str(),
)
.match_header("Authorization", format!("Token {}", token).as_str())
.create();
let client = Client::new(&mockito::server_url(), token);
let _result = client.query_suggestions_name(&suggestion_name).await;
mock_server.assert();
}
#[tokio::test]
async fn query() {
let token = "some-token";
let org = "some-org";
let query: Option<Query> = Some(Query::new("some-influx-query-string".to_string()));
let mock_server = mock("POST", "/api/v2/query")
.match_header("Authorization", format!("Token {}", token).as_str())
.match_header("Accepting-Encoding", "identity")
.match_header("Content-Type", "application/json")
.match_query(Matcher::UrlEncoded("org".into(), org.into()))
.match_body(
serde_json::to_string(&query.clone().unwrap_or_default())
.unwrap()
.as_str(),
)
.create();
let client = Client::new(&mockito::server_url(), token);
let _result = client.query(org, query).await;
mock_server.assert();
}
#[tokio::test]
async fn query_opt() {
let token = "some-token";
let org = "some-org";
let query: Option<Query> = None;
let mock_server = mock("POST", "/api/v2/query")
.match_header("Authorization", format!("Token {}", token).as_str())
.match_header("Accepting-Encoding", "identity")
.match_header("Content-Type", "application/json")
.match_query(Matcher::UrlEncoded("org".into(), org.into()))
.match_body(
serde_json::to_string(&query.unwrap_or_default())
.unwrap()
.as_str(),
)
.create();
let client = Client::new(&mockito::server_url(), token);
let _result = client.query(org, None).await;
mock_server.assert();
}
#[tokio::test]
async fn query_analyze() {
let token = "some-token";
let query: Option<Query> = Some(Query::new("some-influx-query-string".to_string()));
let mock_server = mock("POST", "/api/v2/query/analyze")
.match_header("Authorization", format!("Token {}", token).as_str())
.match_header("Content-Type", "application/json")
.match_body(
serde_json::to_string(&query.clone().unwrap_or_default())
.unwrap()
.as_str(),
)
.create();
let client = Client::new(&mockito::server_url(), token);
let _result = client.query_analyze(query).await;
mock_server.assert();
}
#[tokio::test]
async fn query_analyze_opt() {
let token = "some-token";
let query: Option<Query> = None;
let mock_server = mock("POST", "/api/v2/query/analyze")
.match_header("Authorization", format!("Token {}", token).as_str())
.match_header("Content-Type", "application/json")
.match_body(
serde_json::to_string(&query.clone().unwrap_or_default())
.unwrap()
.as_str(),
)
.create();
let client = Client::new(&mockito::server_url(), token);
let _result = client.query_analyze(query).await;
mock_server.assert();
}
#[tokio::test]
async fn query_ast() {
let token = "some-token";
let language_request: Option<LanguageRequest> =
Some(LanguageRequest::new("some-influx-query-string".to_string()));
let mock_server = mock("POST", "/api/v2/query/ast")
.match_header("Authorization", format!("Token {}", token).as_str())
.match_header("Content-Type", "application/json")
.match_body(
serde_json::to_string(&language_request.clone().unwrap_or_default())
.unwrap()
.as_str(),
)
.create();
let client = Client::new(&mockito::server_url(), token);
let _result = client.query_ast(language_request).await;
mock_server.assert();
}
#[tokio::test]
async fn query_ast_opt() {
let token = "some-token";
let language_request: Option<LanguageRequest> = None;
let mock_server = mock("POST", "/api/v2/query/ast")
.match_header("Authorization", format!("Token {}", token).as_str())
.match_header("Content-Type", "application/json")
.match_body(
serde_json::to_string(&language_request.clone().unwrap_or_default())
.unwrap()
.as_str(),
)
.create();
let client = Client::new(&mockito::server_url(), token);
let _result = client.query_ast(language_request).await;
mock_server.assert();
}
}

View File

@ -0,0 +1,8 @@
//! Common
//!
//! Collection of helper functions
/// Serialize to application/x-www-form-urlencoded syntax
pub fn urlencode<T: AsRef<str>>(s: T) -> String {
::url::form_urlencoded::byte_serialize(s.as_ref().as_bytes()).collect()
}

View File

@ -308,5 +308,7 @@ cpu,host=server01,region=us-west usage=0.87
}
}
pub mod common;
pub mod api;
pub mod models;

View File

@ -0,0 +1,24 @@
//! CallExpression
use serde::{Deserialize, Serialize};
/// Represents a function call
#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)]
pub struct CallExpression {
/// Type of AST node
#[serde(rename = "type", skip_serializing_if = "Option::is_none")]
pub r#type: Option<String>,
/// Callee
#[serde(skip_serializing_if = "Option::is_none")]
pub callee: Option<Box<crate::models::ast::Expression>>,
/// Function arguments
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub arguments: Vec<crate::models::ast::Expression>,
}
impl CallExpression {
/// Represents a function call
pub fn new() -> Self {
Self::default()
}
}

View File

@ -0,0 +1,52 @@
//! Dialect
use serde::{Deserialize, Serialize};
/// Dialect are options to change the default CSV output format; https://www.w3.org/TR/2015/REC-tabular-metadata-20151217/#dialect-descriptions
#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Dialect {
/// If true, the results will contain a header row
#[serde(skip_serializing_if = "Option::is_none")]
pub header: Option<bool>,
/// Separator between cells; the default is ,
#[serde(skip_serializing_if = "Option::is_none")]
pub delimiter: Option<String>,
/// https://www.w3.org/TR/2015/REC-tabular-data-model-20151217/#columns
#[serde(skip_serializing_if = "Option::is_none")]
pub annotations: Option<Annotations>,
/// Character prefixed to comment strings
#[serde(skip_serializing_if = "Option::is_none")]
pub comment_prefix: Option<String>,
/// Format of timestamps
#[serde(skip_serializing_if = "Option::is_none")]
pub date_time_format: Option<DateTimeFormat>,
}
impl Dialect {
/// Dialect are options to change the default CSV output format; https://www.w3.org/TR/2015/REC-tabular-metadata-20151217/#dialect-descriptions
pub fn new() -> Self {
Self::default()
}
}
/// https://www.w3.org/TR/2015/REC-tabular-data-model-20151217/#columns
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum Annotations {
/// Group Annotation
Group,
/// Datatype Annotation
Datatype,
/// Default Annotation
Default,
}
/// Timestamp Format
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
pub enum DateTimeFormat {
/// RFC3339
RFC3339,
/// RFC3339Nano
RFC3339Nano,
}

View File

@ -0,0 +1,24 @@
//! DictItem
use serde::{Deserialize, Serialize};
/// A key/value pair in a dictionary
#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)]
pub struct DictItem {
/// Type of AST node
#[serde(rename = "type", skip_serializing_if = "Option::is_none")]
pub r#type: Option<String>,
/// Key
#[serde(skip_serializing_if = "Option::is_none")]
pub key: Option<crate::models::ast::Expression>,
/// Value
#[serde(skip_serializing_if = "Option::is_none")]
pub val: Option<crate::models::ast::Expression>,
}
impl DictItem {
/// A key/value pair in a dictionary
pub fn new() -> Self {
Self::default()
}
}

View File

@ -0,0 +1,27 @@
//! Duration
use serde::{Deserialize, Serialize};
/// Duration : A pair consisting of length of time and the unit of time
/// measured. It is the atomic unit from which all duration literals are
/// composed.
#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)]
pub struct Duration {
/// Type of AST node
#[serde(rename = "type", skip_serializing_if = "Option::is_none")]
pub r#type: Option<String>,
/// Duration Magnitude
#[serde(skip_serializing_if = "Option::is_none")]
pub magnitude: Option<i32>,
/// Duration unit
#[serde(skip_serializing_if = "Option::is_none")]
pub unit: Option<String>,
}
impl Duration {
/// A pair consisting of length of time and the unit of time measured. It is
/// the atomic unit from which all duration literals are composed.
pub fn new() -> Self {
Self::default()
}
}

View File

@ -0,0 +1,84 @@
//! Expression
use serde::{Deserialize, Serialize};
/// Expression AST
#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)]
pub struct Expression {
/// Type of AST node
#[serde(rename = "type", skip_serializing_if = "Option::is_none")]
pub r#type: Option<String>,
/// Elements of the dictionary
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub elements: Vec<crate::models::ast::DictItem>,
/// Function parameters
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub params: Vec<crate::models::ast::Property>,
/// Node
#[serde(skip_serializing_if = "Option::is_none")]
pub body: Option<crate::models::ast::Node>,
/// Operator
#[serde(skip_serializing_if = "Option::is_none")]
pub operator: Option<String>,
/// Left leaf
#[serde(skip_serializing_if = "Option::is_none")]
pub left: Option<Box<crate::models::ast::Expression>>,
/// Right leaf
#[serde(skip_serializing_if = "Option::is_none")]
pub right: Option<Box<crate::models::ast::Expression>>,
/// Parent Expression
#[serde(skip_serializing_if = "Option::is_none")]
pub callee: Option<Box<crate::models::ast::Expression>>,
/// Function arguments
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub arguments: Vec<crate::models::ast::Expression>,
/// Test Expr
#[serde(skip_serializing_if = "Option::is_none")]
pub test: Option<Box<crate::models::ast::Expression>>,
/// Alternate Expr
#[serde(skip_serializing_if = "Option::is_none")]
pub alternate: Option<Box<crate::models::ast::Expression>>,
/// Consequent Expr
#[serde(skip_serializing_if = "Option::is_none")]
pub consequent: Option<Box<crate::models::ast::Expression>>,
/// Object Expr
#[serde(skip_serializing_if = "Option::is_none")]
pub object: Option<Box<crate::models::ast::Expression>>,
/// PropertyKey Expr
#[serde(skip_serializing_if = "Option::is_none")]
pub property: Option<Box<crate::models::ast::PropertyKey>>,
/// Array Expr
#[serde(skip_serializing_if = "Option::is_none")]
pub array: Option<Box<crate::models::ast::Expression>>,
/// Index Expr
#[serde(skip_serializing_if = "Option::is_none")]
pub index: Option<Box<crate::models::ast::Expression>>,
/// Properties
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub properties: Vec<crate::models::ast::Property>,
/// Expression
#[serde(skip_serializing_if = "Option::is_none")]
pub expression: Option<Box<crate::models::ast::Expression>>,
/// Argument
#[serde(skip_serializing_if = "Option::is_none")]
pub argument: Option<Box<crate::models::ast::Expression>>,
/// Call Expr
#[serde(skip_serializing_if = "Option::is_none")]
pub call: Option<crate::models::ast::CallExpression>,
/// Expression Value
#[serde(skip_serializing_if = "Option::is_none")]
pub value: Option<String>,
/// Duration values
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub values: Vec<crate::models::ast::Duration>,
/// Expression Name
#[serde(skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
}
impl Expression {
/// Return instance of expression
pub fn new() -> Self {
Self::default()
}
}

View File

@ -0,0 +1,21 @@
//! Idendifier
use serde::{Deserialize, Serialize};
/// A valid Flux identifier
#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)]
pub struct Identifier {
/// Type of AST node
#[serde(rename = "type", skip_serializing_if = "Option::is_none")]
pub r#type: Option<String>,
/// Identifier Name
#[serde(skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
}
impl Identifier {
/// A valid Flux identifier
pub fn new() -> Self {
Self::default()
}
}

View File

@ -0,0 +1,24 @@
//! ImportDeclaration
use serde::{Deserialize, Serialize};
/// Declares a package import
#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)]
pub struct ImportDeclaration {
/// Type of AST node
#[serde(rename = "type", skip_serializing_if = "Option::is_none")]
pub r#type: Option<String>,
/// Import Identifier
#[serde(rename = "as", skip_serializing_if = "Option::is_none")]
pub r#as: Option<crate::models::ast::Identifier>,
/// Import Path
#[serde(skip_serializing_if = "Option::is_none")]
pub path: Option<crate::models::ast::StringLiteral>,
}
impl ImportDeclaration {
/// Declares a package import
pub fn new() -> Self {
Self::default()
}
}

View File

@ -0,0 +1,24 @@
//! MemberExpression
use serde::{Deserialize, Serialize};
/// Represents accessing a property of an object
#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)]
pub struct MemberExpression {
/// Type of AST node
#[serde(rename = "type", skip_serializing_if = "Option::is_none")]
pub r#type: Option<String>,
/// Member object
#[serde(skip_serializing_if = "Option::is_none")]
pub object: Option<crate::models::ast::Expression>,
/// Member Property
#[serde(skip_serializing_if = "Option::is_none")]
pub property: Option<crate::models::ast::PropertyKey>,
}
impl MemberExpression {
/// Represents accessing a property of an object
pub fn new() -> Self {
Self::default()
}
}

View File

@ -0,0 +1,34 @@
//! Query AST models
pub mod identifier;
pub use self::identifier::Identifier;
pub mod statement;
pub use self::statement::Statement;
pub mod expression;
pub use self::expression::Expression;
pub mod call_expression;
pub use self::call_expression::CallExpression;
pub mod member_expression;
pub use self::member_expression::MemberExpression;
pub mod string_literal;
pub use self::string_literal::StringLiteral;
pub mod dict_item;
pub use self::dict_item::DictItem;
pub mod variable_assignment;
pub use self::variable_assignment::VariableAssignment;
pub mod node;
pub use self::node::Node;
pub mod property;
pub use self::property::Property;
pub mod property_key;
pub use self::property_key::PropertyKey;
pub mod dialect;
pub use self::dialect::Dialect;
pub mod import_declaration;
pub use self::import_declaration::ImportDeclaration;
pub mod package;
pub use self::package::Package;
pub mod package_clause;
pub use self::package_clause::PackageClause;
pub mod duration;
pub use self::duration::Duration;

View File

@ -0,0 +1,84 @@
//! Node
use serde::{Deserialize, Serialize};
/// Node
#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)]
pub struct Node {
/// Type of AST node
#[serde(rename = "type", skip_serializing_if = "Option::is_none")]
pub r#type: Option<String>,
/// Elements of the dictionary
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub elements: Vec<crate::models::ast::DictItem>,
/// Function parameters
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub params: Vec<crate::models::ast::Property>,
/// Block body
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub body: Vec<crate::models::ast::Statement>,
/// Node Operator
#[serde(skip_serializing_if = "Option::is_none")]
pub operator: Option<String>,
/// Left left node
#[serde(skip_serializing_if = "Option::is_none")]
pub left: Option<Box<crate::models::ast::Expression>>,
/// Right right node
#[serde(skip_serializing_if = "Option::is_none")]
pub right: Option<Box<crate::models::ast::Expression>>,
/// Parent node
#[serde(skip_serializing_if = "Option::is_none")]
pub callee: Option<Box<crate::models::ast::Expression>>,
/// Function arguments
#[serde(skip_serializing_if = "Vec::is_empty")]
pub arguments: Vec<crate::models::ast::Expression>,
/// Test Expr
#[serde(skip_serializing_if = "Option::is_none")]
pub test: Option<Box<crate::models::ast::Expression>>,
/// Alternate Expr
#[serde(skip_serializing_if = "Option::is_none")]
pub alternate: Option<Box<crate::models::ast::Expression>>,
/// Consequent Expr
#[serde(skip_serializing_if = "Option::is_none")]
pub consequent: Option<Box<crate::models::ast::Expression>>,
/// Object Expr
#[serde(skip_serializing_if = "Option::is_none")]
pub object: Option<Box<crate::models::ast::Expression>>,
/// PropertyKey
#[serde(skip_serializing_if = "Option::is_none")]
pub property: Option<crate::models::ast::PropertyKey>,
/// Array Expr
#[serde(skip_serializing_if = "Option::is_none")]
pub array: Option<Box<crate::models::ast::Expression>>,
/// Index Expr
#[serde(skip_serializing_if = "Option::is_none")]
pub index: Option<Box<crate::models::ast::Expression>>,
/// Object properties
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub properties: Vec<crate::models::ast::Property>,
/// Expression
#[serde(skip_serializing_if = "Option::is_none")]
pub expression: Option<Box<crate::models::ast::Expression>>,
/// Node arguments
#[serde(skip_serializing_if = "Option::is_none")]
pub argument: Option<Box<crate::models::ast::Expression>>,
/// Call Expr
#[serde(skip_serializing_if = "Option::is_none")]
pub call: Option<crate::models::ast::CallExpression>,
/// Node Value
#[serde(skip_serializing_if = "Option::is_none")]
pub value: Option<String>,
/// Duration values
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub values: Vec<crate::models::ast::Duration>,
/// Node name
#[serde(skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
}
impl Node {
/// Return instance of Node
pub fn new() -> Self {
Self::default()
}
}

View File

@ -0,0 +1,28 @@
//! Package
use crate::models::File;
use serde::{Deserialize, Serialize};
/// Represents a complete package source tree.
#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)]
pub struct Package {
/// Type of AST node
#[serde(rename = "type", skip_serializing_if = "Option::is_none")]
pub r#type: Option<String>,
/// Package import path
#[serde(skip_serializing_if = "Option::is_none")]
pub path: Option<String>,
/// Package name
#[serde(skip_serializing_if = "Option::is_none")]
pub package: Option<String>,
/// Package files
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub files: Vec<File>,
}
impl Package {
/// Represents a complete package source tree.
pub fn new() -> Self {
Self::default()
}
}

View File

@ -0,0 +1,21 @@
//! PackageClause
use serde::{Deserialize, Serialize};
/// Defines a package identifier
#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)]
pub struct PackageClause {
/// Type of AST node
#[serde(rename = "type", skip_serializing_if = "Option::is_none")]
pub r#type: Option<String>,
/// Package name
#[serde(skip_serializing_if = "Option::is_none")]
pub name: Option<crate::models::ast::Identifier>,
}
impl PackageClause {
/// Defines a package identifier
pub fn new() -> Self {
Self::default()
}
}

View File

@ -0,0 +1,24 @@
//! Property
use serde::{Deserialize, Serialize};
/// The value associated with a key
#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)]
pub struct Property {
/// Type of AST node
#[serde(rename = "type", skip_serializing_if = "Option::is_none")]
pub r#type: Option<String>,
/// Property Key
#[serde(skip_serializing_if = "Option::is_none")]
pub key: Option<crate::models::ast::PropertyKey>,
/// Property Value
#[serde(skip_serializing_if = "Option::is_none")]
pub value: Option<crate::models::ast::Expression>,
}
impl Property {
/// The value associated with a key
pub fn new() -> Self {
Self::default()
}
}

View File

@ -0,0 +1,24 @@
//! PropertyKey
use serde::{Deserialize, Serialize};
/// Key value pair
#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)]
pub struct PropertyKey {
/// Type of AST node
#[serde(rename = "type", skip_serializing_if = "Option::is_none")]
pub r#type: Option<String>,
/// PropertyKey name
#[serde(skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
/// PropertyKey value
#[serde(skip_serializing_if = "Option::is_none")]
pub value: Option<String>,
}
impl PropertyKey {
/// Returns an instance of PropertyKey
pub fn new() -> Self {
Self::default()
}
}

View File

@ -0,0 +1,39 @@
//! Statement
use serde::{Deserialize, Serialize};
/// Expression Statement
#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)]
pub struct Statement {
/// Type of AST node
#[serde(rename = "type", skip_serializing_if = "Option::is_none")]
pub r#type: Option<String>,
/// Raw source text
#[serde(skip_serializing_if = "Option::is_none")]
pub text: Option<String>,
/// Statement identitfier
#[serde(skip_serializing_if = "Option::is_none")]
pub id: Option<crate::models::ast::Identifier>,
/// Initial Value
#[serde(skip_serializing_if = "Option::is_none")]
pub init: Option<crate::models::ast::Expression>,
/// Member
#[serde(skip_serializing_if = "Option::is_none")]
pub member: Option<crate::models::ast::MemberExpression>,
/// Expression
#[serde(skip_serializing_if = "Option::is_none")]
pub expression: Option<crate::models::ast::Expression>,
/// Argument
#[serde(skip_serializing_if = "Option::is_none")]
pub argument: Option<crate::models::ast::Expression>,
/// Assignment
#[serde(skip_serializing_if = "Option::is_none")]
pub assignment: Option<crate::models::ast::VariableAssignment>,
}
impl Statement {
/// Returns an instance of Statement
pub fn new() -> Self {
Self::default()
}
}

View File

@ -0,0 +1,21 @@
//! StringLiteral
use serde::{Deserialize, Serialize};
/// Expressions begin and end with double quote marks
#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)]
pub struct StringLiteral {
/// Type of AST node
#[serde(rename = "type", skip_serializing_if = "Option::is_none")]
pub r#type: Option<String>,
/// StringLiteral Value
#[serde(skip_serializing_if = "Option::is_none")]
pub value: Option<String>,
}
impl StringLiteral {
/// Expressions begin and end with double quote marks
pub fn new() -> Self {
Self::default()
}
}

View File

@ -0,0 +1,24 @@
//! VariableAssignment
use serde::{Deserialize, Serialize};
/// Represents the declaration of a variable
#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)]
pub struct VariableAssignment {
/// Type of AST node
#[serde(rename = "type", skip_serializing_if = "Option::is_none")]
pub r#type: Option<String>,
/// Variable Identifier
#[serde(skip_serializing_if = "Option::is_none")]
pub id: Option<crate::models::ast::Identifier>,
/// Variable initial value
#[serde(skip_serializing_if = "Option::is_none")]
pub init: Option<crate::models::ast::Expression>,
}
impl VariableAssignment {
/// Represents the declaration of a variable
pub fn new() -> Self {
Self::default()
}
}

View File

@ -0,0 +1,30 @@
//! File
use serde::{Deserialize, Serialize};
/// Represents a source from a single file
#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)]
pub struct File {
/// Type of AST node
#[serde(rename = "type", skip_serializing_if = "Option::is_none")]
pub r#type: Option<String>,
/// The name of the file.
#[serde(skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
/// PackageClause
#[serde(skip_serializing_if = "Option::is_none")]
pub package: Option<crate::models::ast::PackageClause>,
/// A list of package imports
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub imports: Vec<crate::models::ast::ImportDeclaration>,
/// List of Flux statements
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub body: Vec<crate::models::ast::Statement>,
}
impl File {
/// Represents a source from a single file
pub fn new() -> Self {
Self::default()
}
}

View File

@ -2,6 +2,8 @@
//!
//! Roughly follows the OpenAPI specification
pub mod ast;
pub mod user;
pub use self::user::{User, UserLinks, Users, UsersLinks};
pub mod organization;
@ -22,3 +24,10 @@ pub mod resource;
pub use self::resource::Resource;
pub mod retention_rule;
pub use self::retention_rule::RetentionRule;
pub mod query;
pub use self::query::{
AnalyzeQueryResponse, AnalyzeQueryResponseErrors, AstResponse, FluxSuggestion, FluxSuggestions,
LanguageRequest, Query,
};
pub mod file;
pub use self::file::File;

View File

@ -0,0 +1,145 @@
//! Query
use crate::models::ast::Package;
use crate::models::File;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Query influx using the Flux language
#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)]
pub struct Query {
/// Query Script
#[serde(rename = "extern", skip_serializing_if = "Option::is_none")]
pub r#extern: Option<File>,
/// Query script to execute.
pub query: String,
/// The type of query. Must be \"flux\".
#[serde(rename = "type", skip_serializing_if = "Option::is_none")]
pub r#type: Option<Type>,
/// Dialect
#[serde(skip_serializing_if = "Option::is_none")]
pub dialect: Option<crate::models::ast::Dialect>,
/// Specifies the time that should be reported as "now" in the query.
/// Default is the server's now time.
#[serde(skip_serializing_if = "Option::is_none")]
pub now: Option<String>,
}
impl Query {
/// Query influx using the Flux language
pub fn new(query: String) -> Self {
Self {
query,
..Default::default()
}
}
}
/// The type of query. Must be \"flux\".
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum Type {
/// Query Type
Flux,
}
#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)]
/// Flux Query Suggestion
pub struct FluxSuggestion {
/// Suggestion Name
#[serde(skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
/// Suggestion Params
#[serde(skip_serializing_if = "Option::is_none")]
pub params: Option<HashMap<String, String>>,
}
impl FluxSuggestion {
/// Returns an instance FluxSuggestion
pub fn new() -> Self {
Self::default()
}
}
/// FluxSuggestions
#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)]
pub struct FluxSuggestions {
/// List of Flux Suggestions
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub funcs: Vec<crate::models::FluxSuggestion>,
}
impl FluxSuggestions {
/// Return an instance of FluxSuggestions
pub fn new() -> Self {
Self::default()
}
}
/// AnalyzeQueryResponse
#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)]
pub struct AnalyzeQueryResponse {
/// List of QueryResponseErrors
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub errors: Vec<AnalyzeQueryResponseErrors>,
}
impl AnalyzeQueryResponse {
/// Return an instance of AnanlyzeQueryResponse
pub fn new() -> Self {
Self::default()
}
}
/// AnalyzeQueryResponseErrors
#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)]
pub struct AnalyzeQueryResponseErrors {
/// Error line
#[serde(skip_serializing_if = "Option::is_none")]
pub line: Option<i32>,
/// Error column
#[serde(skip_serializing_if = "Option::is_none")]
pub column: Option<i32>,
/// Error char
#[serde(skip_serializing_if = "Option::is_none")]
pub character: Option<i32>,
/// Error message
#[serde(skip_serializing_if = "Option::is_none")]
pub message: Option<String>,
}
impl AnalyzeQueryResponseErrors {
/// Return an instance of AnalyzeQueryResponseErrors
pub fn new() -> Self {
Self::default()
}
}
/// AstResponse : Contains the AST for the supplied Flux query
#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)]
pub struct AstResponse {
/// AST of Flux query
#[serde(skip_serializing_if = "Option::is_none")]
pub ast: Option<Package>,
}
impl AstResponse {
/// Contains the AST for the supplied Flux query
pub fn new() -> Self {
Self::default()
}
}
/// LanguageRequest : Flux query to be analyzed.
#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)]
pub struct LanguageRequest {
/// Flux query script to be analyzed
pub query: String,
}
impl LanguageRequest {
/// Flux query to be analyzed.
pub fn new(query: String) -> Self {
Self { query }
}
}

View File

@ -53,6 +53,26 @@ pub enum CreateDatabaseError {
ServerError(tonic::Status),
}
/// Errors returned by Client::update_database
#[derive(Debug, Error)]
pub enum UpdateDatabaseError {
/// Writer ID is not set
#[error("Writer ID not set")]
NoWriterId,
/// Database not found
#[error("Database not found")]
DatabaseNotFound,
/// Server returned an invalid argument error
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
InvalidArgument(tonic::Status),
/// Client received an unexpected error from the server
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
ServerError(tonic::Status),
}
/// Errors returned by Client::list_databases
#[derive(Debug, Error)]
pub enum ListDatabaseError {
@ -271,6 +291,25 @@ impl Client {
Ok(())
}
/// Updates the configuration for a database.
pub async fn update_database(
&mut self,
rules: DatabaseRules,
) -> Result<DatabaseRules, UpdateDatabaseError> {
let response = self
.inner
.update_database(UpdateDatabaseRequest { rules: Some(rules) })
.await
.map_err(|status| match status.code() {
tonic::Code::NotFound => UpdateDatabaseError::DatabaseNotFound,
tonic::Code::FailedPrecondition => UpdateDatabaseError::NoWriterId,
tonic::Code::InvalidArgument => UpdateDatabaseError::InvalidArgument(status),
_ => UpdateDatabaseError::ServerError(status),
})?;
Ok(response.into_inner().rules.unwrap())
}
/// List databases.
pub async fn list_databases(&mut self) -> Result<Vec<String>, ListDatabaseError> {
let response = self

View File

@ -33,6 +33,17 @@ pub(crate) struct Config {
state: RwLock<ConfigState>,
}
pub(crate) enum UpdateError<E> {
Update(Error),
Closure(E),
}
impl<E> From<Error> for UpdateError<E> {
fn from(e: Error) -> Self {
Self::Update(e)
}
}
impl Config {
pub(crate) fn new(jobs: Arc<JobRegistry>) -> Self {
Self {
@ -67,6 +78,27 @@ impl Config {
state.databases.keys().cloned().collect()
}
pub(crate) fn update_db_rules<F, E>(
&self,
db_name: &DatabaseName<'static>,
update: F,
) -> std::result::Result<DatabaseRules, UpdateError<E>>
where
F: FnOnce(DatabaseRules) -> std::result::Result<DatabaseRules, E>,
{
let state = self.state.read().expect("mutex poisoned");
let db_state = state
.databases
.get(db_name)
.ok_or_else(|| Error::DatabaseNotFound {
db_name: db_name.to_string(),
})?;
let mut rules = db_state.db.rules.write();
*rules = update(rules.clone()).map_err(UpdateError::Closure)?;
Ok(rules.clone())
}
pub(crate) fn remotes_sorted(&self) -> Vec<(WriterId, String)> {
let state = self.state.read().expect("mutex poisoned");
state.remotes.iter().map(|(&a, b)| (a, b.clone())).collect()

View File

@ -192,6 +192,18 @@ pub struct Server<M: ConnectionManager> {
jobs: Arc<JobRegistry>,
}
#[derive(Debug)]
pub enum UpdateError<E> {
Update(Error),
Closure(E),
}
impl<E> From<Error> for UpdateError<E> {
fn from(e: Error) -> Self {
Self::Update(e)
}
}
impl<M: ConnectionManager> Server<M> {
pub fn new(connection_manager: M, store: Arc<ObjectStore>) -> Self {
let jobs = Arc::new(JobRegistry::new());
@ -406,6 +418,26 @@ impl<M: ConnectionManager> Server<M> {
self.config.db(name).map(|d| d.rules.read().clone())
}
// Update database rules and save on success.
pub async fn update_db_rules<F, E>(
&self,
db_name: &DatabaseName<'static>,
update: F,
) -> std::result::Result<DatabaseRules, UpdateError<E>>
where
F: FnOnce(DatabaseRules) -> Result<DatabaseRules, E>,
{
let rules = self
.config
.update_db_rules(db_name, update)
.map_err(|e| match e {
crate::config::UpdateError::Closure(e) => UpdateError::Closure(e),
crate::config::UpdateError::Update(e) => UpdateError::Update(e),
})?;
self.persist_database_rules(rules.clone()).await?;
Ok(rules)
}
pub fn remotes_sorted(&self) -> Vec<(WriterId, String)> {
self.config.remotes_sorted()
}

View File

@ -3,9 +3,12 @@ use std::fmt::Debug;
use std::sync::Arc;
use data_types::database_rules::DatabaseRules;
use data_types::DatabaseName;
use generated_types::google::{AlreadyExists, FieldViolation, FieldViolationExt, NotFound};
use data_types::{field_validation::FromFieldOpt, DatabaseName};
use generated_types::google::{
AlreadyExists, FieldViolation, FieldViolationExt, InternalError, NotFound,
};
use generated_types::influxdata::iox::management::v1::*;
use observability_deps::tracing::info;
use query::{Database, DatabaseStore};
use server::{ConnectionManager, Error, Server};
use tonic::{Request, Response, Status};
@ -17,6 +20,33 @@ struct ManagementService<M: ConnectionManager> {
use super::error::{default_db_error_handler, default_server_error_handler};
use std::num::NonZeroU32;
#[derive(Debug)]
enum UpdateError {
Update(server::Error),
Closure(tonic::Status),
}
impl From<UpdateError> for Status {
fn from(error: UpdateError) -> Self {
match error {
UpdateError::Update(error) => {
info!(?error, "Update error");
InternalError {}.into()
}
UpdateError::Closure(error) => error,
}
}
}
impl From<server::UpdateError<Status>> for UpdateError {
fn from(error: server::UpdateError<Status>) -> Self {
match error {
server::UpdateError::Update(error) => Self::Update(error),
server::UpdateError::Closure(error) => Self::Closure(error),
}
}
}
#[tonic::async_trait]
impl<M> management_service_server::ManagementService for ManagementService<M>
where
@ -116,6 +146,24 @@ where
}
}
async fn update_database(
&self,
request: Request<UpdateDatabaseRequest>,
) -> Result<Response<UpdateDatabaseResponse>, Status> {
let request = request.into_inner();
let rules: DatabaseRules = request.rules.required("rules")?;
let db_name = rules.name.clone();
let updated_rules = self
.server
.update_db_rules(&db_name, |_orig| Ok(rules))
.await
.map_err(UpdateError::from)?;
Ok(Response::new(UpdateDatabaseResponse {
rules: Some(updated_rules.into()),
}))
}
async fn list_chunks(
&self,
request: Request<ListChunksRequest>,

View File

@ -5,6 +5,7 @@ use generated_types::{
influxdata::iox::management::v1::*,
};
use influxdb_iox_client::{management::CreateDatabaseError, operations};
use test_helpers::assert_contains;
use super::scenario::{
@ -156,7 +157,7 @@ async fn test_list_databases() {
}
#[tokio::test]
async fn test_create_get_database() {
async fn test_create_get_update_database() {
let server_fixture = ServerFixture::create_shared().await;
let mut client = server_fixture.management_client();
@ -164,7 +165,7 @@ async fn test_create_get_database() {
// Specify everything to allow direct comparison between request and response
// Otherwise would expect difference due to server-side defaulting
let rules = DatabaseRules {
let mut rules = DatabaseRules {
name: db_name.clone(),
partition_template: Some(PartitionTemplate {
parts: vec![partition_template::Part {
@ -198,11 +199,36 @@ async fn test_create_get_database() {
.expect("create database failed");
let response = client
.get_database(db_name)
.get_database(&db_name)
.await
.expect("get database failed");
assert_eq!(response, rules);
assert_eq!(response.shard_config, None);
rules.shard_config = Some(ShardConfig {
ignore_errors: true,
..Default::default()
});
let updated_rules = client
.update_database(rules.clone())
.await
.expect("update database failed");
assert_eq!(updated_rules, rules);
let response = client
.get_database(&db_name)
.await
.expect("get database failed");
assert_eq!(
response
.shard_config
.expect("shard config missing")
.ignore_errors,
true
);
}
#[tokio::test]