diff --git a/Cargo.lock b/Cargo.lock index 05bd65577a..46622a0e70 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -658,6 +658,8 @@ name = "client_util" version = "0.1.0" dependencies = [ "http", + "mockito", + "reqwest", "thiserror", "tokio", "tonic", @@ -2045,7 +2047,6 @@ dependencies = [ "http", "humantime", "import", - "influxdb2_client", "influxdb_iox_client", "influxdb_storage_client", "influxrpc_parser", @@ -2103,13 +2104,12 @@ dependencies = [ "arrow_util", "bytes", "client_util", - "dml", "futures-util", "generated_types", - "mutable_batch_lp", - "mutable_batch_pb", + "mockito", "prost 0.11.0", "rand", + "reqwest", "thiserror", "tokio", "tonic", diff --git a/client_util/Cargo.toml b/client_util/Cargo.toml index 462dff2a08..af1a7f6c4b 100644 --- a/client_util/Cargo.toml +++ b/client_util/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] http = "0.2.8" +reqwest = { version = "0.11", default-features = false, features = ["stream", "rustls-tls"] } thiserror = "1.0.37" tonic = { version = "0.8" } tower = "0.4" @@ -14,3 +15,4 @@ workspace-hack = { path = "../workspace-hack"} [dev-dependencies] tokio = { version = "1.21", features = ["macros", "parking_lot", "rt-multi-thread"] } +mockito = "0.31" \ No newline at end of file diff --git a/client_util/src/connection.rs b/client_util/src/connection.rs index fde94490db..429ec6b32f 100644 --- a/client_util/src/connection.rs +++ b/client_util/src/connection.rs @@ -1,5 +1,6 @@ use crate::tower::{SetRequestHeadersLayer, SetRequestHeadersService}; use http::header::HeaderName; +use http::HeaderMap; use http::{uri::InvalidUri, HeaderValue, Uri}; use std::convert::TryInto; use std::time::Duration; @@ -7,8 +8,63 @@ use thiserror::Error; use tonic::transport::{Channel, Endpoint}; use tower::make::MakeConnection; -/// The connection type used for clients -pub type Connection = SetRequestHeadersService; +/// The connection type used for clients. Use [`Builder`] to create +/// instances of [`Connection`] objects +#[derive(Debug, Clone)] +pub struct Connection { + grpc_connection: GrpcConnection, + http_connection: HttpConnection, +} + +impl Connection { + /// Create a new Connection + fn new(grpc_connection: GrpcConnection, http_connection: HttpConnection) -> Self { + Self { + grpc_connection, + http_connection, + } + } + + /// Consume `self` and return a [`GrpcConnection`] (suitable for use in + /// tonic clients) + pub fn into_grpc_connection(self) -> GrpcConnection { + self.grpc_connection + } + + /// Consume `self` and return a [`HttpConnection`] (suitable for making + /// calls to /api/v2 endpoints) + pub fn into_http_connection(self) -> HttpConnection { + self.http_connection + } +} + +/// The type used to make tonic (gRPC) requests +pub type GrpcConnection = SetRequestHeadersService; + +/// The type used to make raw http request +#[derive(Debug, Clone)] +pub struct HttpConnection { + /// The base uri of the IOx http API endpoint + uri: Uri, + /// http client connection + http_client: reqwest::Client, +} + +impl HttpConnection { + fn new(uri: Uri, http_client: reqwest::Client) -> Self { + Self { uri, http_client } + } + + /// Return a reference to the underyling http client + pub fn client(&self) -> &reqwest::Client { + &self.http_client + } + + /// Return a reference to the base uri of the IOx http API endpoint + pub fn uri(&self) -> &Uri { + &self.uri + } +} /// The default User-Agent header sent by the HTTP client. pub const USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")); @@ -100,7 +156,7 @@ impl Builder { { let endpoint = self.create_endpoint(dst)?; let channel = endpoint.connect().await?; - Ok(self.compose_middleware(channel)) + Ok(self.compose_middleware(channel, endpoint)) } /// Construct the [`Connection`] instance using the specified base URL and custom connector. @@ -114,7 +170,7 @@ impl Builder { { let endpoint = self.create_endpoint(dst)?; let channel = endpoint.connect_with_connector(connector).await?; - Ok(self.compose_middleware(channel)) + Ok(self.compose_middleware(channel, endpoint)) } fn create_endpoint(&self, dst: D) -> Result @@ -128,11 +184,23 @@ impl Builder { Ok(endpoint) } - fn compose_middleware(self, channel: Channel) -> Connection { + fn compose_middleware(self, channel: Channel, endpoint: Endpoint) -> Connection { + let headers_map: HeaderMap = self.headers.iter().cloned().collect(); + // Compose channel with new tower middleware stack - tower::ServiceBuilder::new() + let grpc_connection = tower::ServiceBuilder::new() .layer(SetRequestHeadersLayer::new(self.headers)) - .service(channel) + .service(channel); + + let http_client = reqwest::Client::builder() + .connection_verbose(true) + .default_headers(headers_map) + .build() + .expect("reqwest::Client should have built"); + + let http_connection = HttpConnection::new(endpoint.uri().clone(), http_client); + + Connection::new(grpc_connection, http_connection) } /// Set the `User-Agent` header sent by this client. @@ -180,6 +248,7 @@ impl Builder { #[cfg(test)] mod tests { use super::*; + use reqwest::Method; #[test] fn test_builder_cloneable() { @@ -187,4 +256,37 @@ mod tests { fn assert_clone(_t: T) {} assert_clone(Builder::default()) } + + #[tokio::test] + async fn headers_are_set() { + let url = mockito::server_url(); + + let http_connection = Builder::new() + .header( + HeaderName::from_static("foo"), + HeaderValue::from_static("bar"), + ) + .build(&url) + .await + .unwrap() + .into_http_connection(); + + let url = format!("{}/the_api", url); + println!("Sending to {url}"); + + let m = mockito::mock("POST", "/the_api") + .with_status(201) + .with_body("world") + .match_header("FOO", "bar") + .create(); + + http_connection + .client() + .request(Method::POST, &url) + .send() + .await + .expect("Error making http request"); + + m.assert(); + } } diff --git a/client_util/src/lib.rs b/client_util/src/lib.rs index 487680b699..ebfc52ea16 100644 --- a/client_util/src/lib.rs +++ b/client_util/src/lib.rs @@ -21,3 +21,6 @@ pub mod connection; mod tower; + +/// Namespace <--> org/bucket utilities +pub mod namespace_translation; diff --git a/client_util/src/namespace_translation.rs b/client_util/src/namespace_translation.rs new file mode 100644 index 0000000000..53f011e34f --- /dev/null +++ b/client_util/src/namespace_translation.rs @@ -0,0 +1,90 @@ +//! Contains logic to map namespace back/forth to org/bucket + +use thiserror::Error; + +/// Errors returned by namespace parsing +#[allow(missing_docs)] +#[derive(Debug, Error)] +pub enum Error { + #[error("Invalid namespace '{namespace}': {reason}")] + InvalidNamespace { namespace: String, reason: String }, +} + +impl Error { + fn new(namespace: impl Into, reason: impl Into) -> Self { + Self::InvalidNamespace { + namespace: namespace.into(), + reason: reason.into(), + } + } +} + +/// Splits up the namespace name into org_id and bucket_id +pub fn split_namespace(namespace: &str) -> Result<(&str, &str), Error> { + let mut iter = namespace.split('_'); + let org_id = iter.next().ok_or_else(|| Error::new(namespace, "empty"))?; + + if org_id.is_empty() { + return Err(Error::new(namespace, "No org_id found")); + } + + let bucket_id = iter + .next() + .ok_or_else(|| Error::new(namespace, "Could not find '_'"))?; + + if bucket_id.is_empty() { + return Err(Error::new(namespace, "No bucket_id found")); + } + + if iter.next().is_some() { + return Err(Error::new(namespace, "More than one '_'")); + } + + Ok((org_id, bucket_id)) +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn split_good() { + assert_eq!(split_namespace("foo_bar").unwrap(), ("foo", "bar")); + } + + #[test] + #[should_panic(expected = "No org_id found")] + fn split_bad_empty() { + split_namespace("").unwrap(); + } + + #[test] + #[should_panic(expected = "No org_id found")] + fn split_bad_only_underscore() { + split_namespace("_").unwrap(); + } + + #[test] + #[should_panic(expected = "No org_id found")] + fn split_bad_empty_org_id() { + split_namespace("_ff").unwrap(); + } + + #[test] + #[should_panic(expected = "No bucket_id found")] + fn split_bad_empty_bucket_id() { + split_namespace("ff_").unwrap(); + } + + #[test] + #[should_panic(expected = "More than one '_'")] + fn split_too_many() { + split_namespace("ff_bf_").unwrap(); + } + + #[test] + #[should_panic(expected = "More than one '_'")] + fn split_way_too_many() { + split_namespace("ff_bf_dfd_3_f").unwrap(); + } +} diff --git a/import/src/aggregate_tsm_schema/update_catalog.rs b/import/src/aggregate_tsm_schema/update_catalog.rs index 35e808b5cc..8243ec16c0 100644 --- a/import/src/aggregate_tsm_schema/update_catalog.rs +++ b/import/src/aggregate_tsm_schema/update_catalog.rs @@ -5,7 +5,7 @@ use data_types::{ org_and_bucket_to_database, ColumnType, Namespace, NamespaceSchema, OrgBucketMappingError, Partition, PartitionKey, QueryPoolId, ShardId, TableSchema, TopicId, }; -use influxdb_iox_client::connection::Connection; +use influxdb_iox_client::connection::{Connection, GrpcConnection}; use iox_catalog::interface::{get_schema_by_name, Catalog, ColumnUpsertRequest, RepoCollection}; use schema::{ sort::{adjust_sort_key_columns, SortKey, SortKeyBuilder}, @@ -98,7 +98,7 @@ pub async fn update_iox_catalog<'a>( // initialise a client of the shard service in the router. we will use it to find out which // shard a table/namespace combo would shard to, without exposing the implementation // details of the sharding - let mut shard_client = ShardServiceClient::new(connection); + let mut shard_client = ShardServiceClient::new(connection.into_grpc_connection()); update_catalog_schema_with_merged( namespace_name.as_str(), iox_schema, @@ -176,7 +176,7 @@ async fn update_catalog_schema_with_merged( iox_schema: NamespaceSchema, merged_tsm_schema: &AggregateTSMSchema, repos: &mut R, - shard_client: &mut ShardServiceClient, + shard_client: &mut ShardServiceClient, ) -> Result<(), UpdateCatalogError> where R: RepoCollection + ?Sized, diff --git a/influxdb_iox/Cargo.toml b/influxdb_iox/Cargo.toml index 06da5bb425..f2b400b07c 100644 --- a/influxdb_iox/Cargo.toml +++ b/influxdb_iox/Cargo.toml @@ -13,8 +13,7 @@ data_types = { path = "../data_types" } datafusion = { path = "../datafusion" } generated_types = { path = "../generated_types" } import = { path = "../import" } -influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format", "write_lp"] } -influxdb2_client = { path = "../influxdb2_client" } +influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format"] } influxdb_storage_client = { path = "../influxdb_storage_client" } influxrpc_parser = { path = "../influxrpc_parser"} iox_catalog = { path = "../iox_catalog" } diff --git a/influxdb_iox/src/commands/write.rs b/influxdb_iox/src/commands/write.rs index 1818285d45..e5aff6bd88 100644 --- a/influxdb_iox/src/commands/write.rs +++ b/influxdb_iox/src/commands/write.rs @@ -1,6 +1,5 @@ -use influxdb2_client::RequestError; -use observability_deps::tracing::debug; -use snafu::{OptionExt, ResultExt, Snafu}; +use influxdb_iox_client::{connection::Connection, write}; +use snafu::{ResultExt, Snafu}; use std::{fs::File, io::Read, path::PathBuf}; #[allow(clippy::enum_variant_names)] @@ -13,10 +12,9 @@ pub enum Error { }, #[snafu(display("Client error: {source}"))] - ClientError { source: RequestError }, - - #[snafu(display("Invalid namespace '{namespace}': {reason}"))] - InvalidNamespace { namespace: String, reason: String }, + ClientError { + source: influxdb_iox_client::error::Error, + }, } pub type Result = std::result::Result; @@ -33,7 +31,7 @@ pub struct Config { file_name: PathBuf, } -pub async fn command(url: String, config: Config) -> Result<()> { +pub async fn command(connection: Connection, config: Config) -> Result<()> { let Config { namespace, file_name, @@ -46,19 +44,10 @@ pub async fn command(url: String, config: Config) -> Result<()> { file.read_to_string(&mut lp_data) .context(ReadingFileSnafu { file_name })?; - let total_bytes = lp_data.len(); + let mut client = write::Client::new(connection); - // split a namespace name ("foo_bar") into org_bucket - let (org_id, bucket_id) = split_namespace(&namespace)?; - - debug!(url, total_bytes, org_id, bucket_id, "Writing data"); - - // IOx's v2 api doesn't validate auth tokens so pass an empty one - let auth_token = ""; - let client = influxdb2_client::Client::new(url, auth_token); - - client - .write_line_protocol(org_id, bucket_id, lp_data) + let total_bytes = client + .write_lp(namespace, lp_data) .await .context(ClientSnafu)?; @@ -66,89 +55,3 @@ pub async fn command(url: String, config: Config) -> Result<()> { Ok(()) } - -/// Splits up the strings into org_id and bucket_id -fn split_namespace(namespace: &str) -> Result<(&str, &str)> { - let mut iter = namespace.split('_'); - let org_id = iter.next().context(InvalidNamespaceSnafu { - namespace, - reason: "empty", - })?; - - if org_id.is_empty() { - return InvalidNamespaceSnafu { - namespace, - reason: "No org_id found", - } - .fail(); - } - - let bucket_id = iter.next().context(InvalidNamespaceSnafu { - namespace, - reason: "Could not find '_'", - })?; - - if bucket_id.is_empty() { - return InvalidNamespaceSnafu { - namespace, - reason: "No bucket_id found", - } - .fail(); - } - - if iter.next().is_some() { - return InvalidNamespaceSnafu { - namespace, - reason: "More than one '_'", - } - .fail(); - } - - Ok((org_id, bucket_id)) -} - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn split_good() { - assert_eq!(split_namespace("foo_bar").unwrap(), ("foo", "bar")); - } - - #[test] - #[should_panic(expected = "No org_id found")] - fn split_bad_empty() { - split_namespace("").unwrap(); - } - - #[test] - #[should_panic(expected = "No org_id found")] - fn split_bad_only_underscore() { - split_namespace("_").unwrap(); - } - - #[test] - #[should_panic(expected = "No org_id found")] - fn split_bad_empty_org_id() { - split_namespace("_ff").unwrap(); - } - - #[test] - #[should_panic(expected = "No bucket_id found")] - fn split_bad_empty_bucket_id() { - split_namespace("ff_").unwrap(); - } - - #[test] - #[should_panic(expected = "More than one '_'")] - fn split_too_many() { - split_namespace("ff_bf_").unwrap(); - } - - #[test] - #[should_panic(expected = "More than one '_'")] - fn split_way_too_many() { - split_namespace("ff_bf_dfd_3_f").unwrap(); - } -} diff --git a/influxdb_iox/src/main.rs b/influxdb_iox/src/main.rs index f384b1c164..5b20b559d9 100644 --- a/influxdb_iox/src/main.rs +++ b/influxdb_iox/src/main.rs @@ -209,7 +209,6 @@ fn main() -> Result<(), std::io::Error> { let log_verbose_count = config.all_in_one_config.logging_config.log_verbose_count; let rpc_timeout = config.rpc_timeout; - let host_captured = host.clone(); let connection = || async move { let mut builder = headers.into_iter().fold(Builder::default(), |builder, kv| { builder.header(kv.key, kv.value) @@ -230,10 +229,10 @@ fn main() -> Result<(), std::io::Error> { println!("Trace ID set to {}", trace_id); } - match builder.build(&host_captured).await { + match builder.build(&host).await { Ok(connection) => connection, Err(e) => { - eprintln!("Error connecting to {}: {}", host_captured, e); + eprintln!("Error connecting to {}: {}", host, e); std::process::exit(ReturnCode::Failure as _) } } @@ -312,7 +311,8 @@ fn main() -> Result<(), std::io::Error> { } Some(Command::Write(config)) => { let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count)); - if let Err(e) = commands::write::command(host, config).await { + let connection = connection().await; + if let Err(e) = commands::write::command(connection, config).await { eprintln!("{}", e); std::process::exit(ReturnCode::Failure as _) } diff --git a/influxdb_iox/tests/end_to_end_cases/mod.rs b/influxdb_iox/tests/end_to_end_cases/mod.rs index 99a35bec97..9bae0d7c8b 100644 --- a/influxdb_iox/tests/end_to_end_cases/mod.rs +++ b/influxdb_iox/tests/end_to_end_cases/mod.rs @@ -11,6 +11,5 @@ mod logging; mod metrics; mod namespace; mod querier; -mod router; mod schema; mod tracing; diff --git a/influxdb_iox/tests/end_to_end_cases/querier/influxrpc/metadata.rs b/influxdb_iox/tests/end_to_end_cases/querier/influxrpc/metadata.rs index 7f7174421b..e5041a416e 100644 --- a/influxdb_iox/tests/end_to_end_cases/querier/influxrpc/metadata.rs +++ b/influxdb_iox/tests/end_to_end_cases/querier/influxrpc/metadata.rs @@ -2,7 +2,7 @@ use super::{run_data_test, run_no_data_test}; use futures::{prelude::*, FutureExt}; use generated_types::{ google::protobuf::Empty, measurement_fields_response::FieldType, - offsets_response::PartitionOffsetResponse, storage_client::StorageClient, OffsetsResponse, + offsets_response::PartitionOffsetResponse, OffsetsResponse, }; use influxdb_storage_client::tag_key_bytes_to_strings; use std::sync::Arc; @@ -13,8 +13,7 @@ use test_helpers_end_to_end::{DataGenerator, GrpcRequestBuilder, StepTestState}; async fn capabilities() { run_no_data_test(Box::new(|state: &mut StepTestState| { async move { - let mut storage_client = - StorageClient::new(state.cluster().querier().querier_grpc_connection()); + let mut storage_client = state.cluster().querier_storage_client(); let capabilities_response = storage_client .capabilities(Empty {}) .await @@ -37,8 +36,7 @@ async fn capabilities() { async fn offsets() { run_no_data_test(Box::new(|state: &mut StepTestState| { async move { - let mut storage_client = - StorageClient::new(state.cluster().querier().querier_grpc_connection()); + let mut storage_client = state.cluster().querier_storage_client(); let offsets_response = storage_client.offsets(Empty {}).await.unwrap().into_inner(); let expected = OffsetsResponse { partitions: vec![PartitionOffsetResponse { id: 0, offset: 1 }], @@ -57,8 +55,7 @@ async fn tag_keys() { Arc::clone(&generator), Box::new(move |state: &mut StepTestState| { async move { - let mut storage_client = - StorageClient::new(state.cluster().querier().querier_grpc_connection()); + let mut storage_client = state.cluster().querier_storage_client(); let tag_keys_request = GrpcRequestBuilder::new() .source(state.cluster()) @@ -90,8 +87,7 @@ async fn tag_values() { Arc::clone(&generator), Box::new(move |state: &mut StepTestState| { async move { - let mut storage_client = - StorageClient::new(state.cluster().querier().querier_grpc_connection()); + let mut storage_client = state.cluster().querier_storage_client(); let tag_values_request = GrpcRequestBuilder::new() .source(state.cluster()) @@ -128,8 +124,7 @@ async fn measurement_names() { Arc::clone(&generator), Box::new(move |state: &mut StepTestState| { async move { - let mut storage_client = - StorageClient::new(state.cluster().querier().querier_grpc_connection()); + let mut storage_client = state.cluster().querier_storage_client(); let measurement_names_request = GrpcRequestBuilder::new() .source(state.cluster()) @@ -170,8 +165,7 @@ async fn measurement_tag_keys() { Arc::clone(&generator), Box::new(move |state: &mut StepTestState| { async move { - let mut storage_client = - StorageClient::new(state.cluster().querier().querier_grpc_connection()); + let mut storage_client = state.cluster().querier_storage_client(); let measurement_tag_keys_request = GrpcRequestBuilder::new() .source(state.cluster()) @@ -210,8 +204,7 @@ async fn measurement_tag_values() { Arc::clone(&generator), Box::new(move |state: &mut StepTestState| { async move { - let mut storage_client = - StorageClient::new(state.cluster().querier().querier_grpc_connection()); + let mut storage_client = state.cluster().querier_storage_client(); let measurement_tag_values_request = GrpcRequestBuilder::new() .source(state.cluster()) @@ -250,8 +243,7 @@ async fn measurement_fields() { Arc::clone(&generator), Box::new(move |state: &mut StepTestState| { async move { - let mut storage_client = - StorageClient::new(state.cluster().querier().querier_grpc_connection()); + let mut storage_client = state.cluster().querier_storage_client(); let measurement_fields_request = GrpcRequestBuilder::new() .source(state.cluster()) diff --git a/influxdb_iox/tests/end_to_end_cases/querier/influxrpc/read_filter.rs b/influxdb_iox/tests/end_to_end_cases/querier/influxrpc/read_filter.rs index c4e4b3b8ca..12a01b5750 100644 --- a/influxdb_iox/tests/end_to_end_cases/querier/influxrpc/read_filter.rs +++ b/influxdb_iox/tests/end_to_end_cases/querier/influxrpc/read_filter.rs @@ -3,7 +3,7 @@ use futures::{prelude::*, FutureExt}; use generated_types::{ read_response::frame::Data, storage_client::StorageClient, ReadFilterRequest, }; -use influxdb_iox_client::connection::Connection; +use influxdb_iox_client::connection::GrpcConnection; use std::sync::Arc; use test_helpers_end_to_end::{ maybe_skip_integration, DataGenerator, GrpcRequestBuilder, MiniCluster, Step, StepTest, @@ -15,8 +15,7 @@ async fn read_filter() { let generator = Arc::new(DataGenerator::new()); run_data_test(Arc::clone(&generator), Box::new(move |state: &mut StepTestState| { async move { - let mut storage_client = - StorageClient::new(state.cluster().querier().querier_grpc_connection()); + let mut storage_client = state.cluster().querier_storage_client(); let read_filter_request = GrpcRequestBuilder::new() .source(state.cluster()) @@ -155,8 +154,7 @@ async fn do_read_filter_test( Step::WaitForReadable, Step::Custom(Box::new(move |state: &mut StepTestState| { async move { - let mut storage_client = - StorageClient::new(state.cluster().querier().querier_grpc_connection()); + let mut storage_client = state.cluster().querier_storage_client(); println!("Sending read_filter request with {:#?}", request_builder); @@ -178,7 +176,7 @@ async fn do_read_filter_test( /// Make a read_group request and returns the results in a comparable format async fn do_read_filter_request( - storage_client: &mut StorageClient, + storage_client: &mut StorageClient, request: tonic::Request, ) -> Vec { let read_filter_response = storage_client diff --git a/influxdb_iox/tests/end_to_end_cases/querier/influxrpc/read_group.rs b/influxdb_iox/tests/end_to_end_cases/querier/influxrpc/read_group.rs index e997d15f28..269b39cf3d 100644 --- a/influxdb_iox/tests/end_to_end_cases/querier/influxrpc/read_group.rs +++ b/influxdb_iox/tests/end_to_end_cases/querier/influxrpc/read_group.rs @@ -1,7 +1,7 @@ use super::{dump::dump_data_frames, read_group_data}; use futures::{prelude::*, FutureExt}; use generated_types::storage_client::StorageClient; -use influxdb_iox_client::connection::Connection; +use influxdb_iox_client::connection::GrpcConnection; use test_helpers_end_to_end::{ maybe_skip_integration, GrpcRequestBuilder, MiniCluster, Step, StepTest, StepTestState, }; @@ -202,8 +202,12 @@ async fn do_read_group_test( Step::WaitForReadable, Step::Custom(Box::new(move |state: &mut StepTestState| { async move { - let mut storage_client = - StorageClient::new(state.cluster().querier().querier_grpc_connection()); + let grpc_connection = state + .cluster() + .querier() + .querier_grpc_connection() + .into_grpc_connection(); + let mut storage_client = StorageClient::new(grpc_connection); println!("Sending read_group request with {:#?}", request_builder); @@ -225,7 +229,7 @@ async fn do_read_group_test( /// Make a read_group request and returns the results in a comparable format async fn do_read_group_request( - storage_client: &mut StorageClient, + storage_client: &mut StorageClient, request: tonic::Request, ) -> Vec { let read_group_response = storage_client diff --git a/influxdb_iox/tests/end_to_end_cases/querier/influxrpc/read_window_aggregate.rs b/influxdb_iox/tests/end_to_end_cases/querier/influxrpc/read_window_aggregate.rs index 5b1b6b2d74..7fe1ea73b9 100644 --- a/influxdb_iox/tests/end_to_end_cases/querier/influxrpc/read_window_aggregate.rs +++ b/influxdb_iox/tests/end_to_end_cases/querier/influxrpc/read_window_aggregate.rs @@ -1,6 +1,6 @@ use super::dump::dump_data_frames; use futures::{prelude::*, FutureExt}; -use generated_types::{aggregate::AggregateType, storage_client::StorageClient}; +use generated_types::aggregate::AggregateType; use test_helpers_end_to_end::{ maybe_skip_integration, GrpcRequestBuilder, MiniCluster, Step, StepTest, StepTestState, }; @@ -40,8 +40,7 @@ pub async fn read_window_aggregate_test() { Step::WaitForReadable, Step::Custom(Box::new(move |state: &mut StepTestState| { async move { - let mut storage_client = - StorageClient::new(state.cluster().querier().querier_grpc_connection()); + let mut storage_client = state.cluster().querier_storage_client(); let request = GrpcRequestBuilder::new() diff --git a/influxdb_iox/tests/end_to_end_cases/router.rs b/influxdb_iox/tests/end_to_end_cases/router.rs deleted file mode 100644 index db993495ac..0000000000 --- a/influxdb_iox/tests/end_to_end_cases/router.rs +++ /dev/null @@ -1,101 +0,0 @@ -use influxdb_iox_client::write::generated_types::{column, Column, TableBatch}; -use test_helpers_end_to_end::{maybe_skip_integration, MiniCluster, Step, StepTest}; - -#[tokio::test] -async fn write_via_grpc() { - test_helpers::maybe_start_logging(); - let database_url = maybe_skip_integration!(); - - let table_name = "the_table"; - let table_batches = vec![TableBatch { - table_name: table_name.to_string(), - columns: vec![ - Column { - column_name: "time".to_string(), - semantic_type: column::SemanticType::Time as i32, - values: Some(column::Values { - i64_values: vec![123456], - f64_values: vec![], - u64_values: vec![], - string_values: vec![], - bool_values: vec![], - bytes_values: vec![], - packed_string_values: None, - interned_string_values: None, - }), - null_mask: vec![], - }, - Column { - column_name: "val".to_string(), - semantic_type: column::SemanticType::Field as i32, - values: Some(column::Values { - i64_values: vec![42], - f64_values: vec![], - u64_values: vec![], - string_values: vec![], - bool_values: vec![], - bytes_values: vec![], - packed_string_values: None, - interned_string_values: None, - }), - null_mask: vec![], - }, - Column { - column_name: "tag1".to_string(), - semantic_type: column::SemanticType::Tag as i32, - values: Some(column::Values { - i64_values: vec![], - f64_values: vec![], - u64_values: vec![], - string_values: vec!["A".into()], - bool_values: vec![], - bytes_values: vec![], - packed_string_values: None, - interned_string_values: None, - }), - null_mask: vec![], - }, - Column { - column_name: "tag2".to_string(), - semantic_type: column::SemanticType::Tag as i32, - values: Some(column::Values { - i64_values: vec![], - f64_values: vec![], - u64_values: vec![], - string_values: vec!["B".into()], - bool_values: vec![], - bytes_values: vec![], - packed_string_values: None, - interned_string_values: None, - }), - null_mask: vec![], - }, - ], - row_count: 1, - }]; - - // Set up the cluster ==================================== - let mut cluster = MiniCluster::create_shared(database_url).await; - - StepTest::new( - &mut cluster, - vec![ - Step::WriteTableBatches(table_batches), - // format!("{},tag1=A,tag2=B val=42i 123456", table_name)), - // Wait for data to be persisted to parquet - Step::WaitForPersisted, - Step::Query { - sql: format!("select * from {}", table_name), - expected: vec![ - "+------+------+--------------------------------+-----+", - "| tag1 | tag2 | time | val |", - "+------+------+--------------------------------+-----+", - "| A | B | 1970-01-01T00:00:00.000123456Z | 42 |", - "+------+------+--------------------------------+-----+", - ], - }, - ], - ) - .run() - .await -} diff --git a/influxdb_iox/tests/end_to_end_cases/tracing.rs b/influxdb_iox/tests/end_to_end_cases/tracing.rs index 8b8cea615c..53059fcba8 100644 --- a/influxdb_iox/tests/end_to_end_cases/tracing.rs +++ b/influxdb_iox/tests/end_to_end_cases/tracing.rs @@ -1,5 +1,4 @@ use futures::{prelude::*, FutureExt}; -use generated_types::storage_client::StorageClient; use test_helpers_end_to_end::{ maybe_skip_integration, GrpcRequestBuilder, MiniCluster, Step, StepTest, StepTestState, TestConfig, UdpCapture, @@ -70,8 +69,7 @@ pub async fn test_tracing_storage_api() { Step::WaitForReadable, Step::Custom(Box::new(move |state: &mut StepTestState| { let cluster = state.cluster(); - let mut storage_client = - StorageClient::new(cluster.querier().querier_grpc_connection()); + let mut storage_client = cluster.querier_storage_client(); let read_filter_request = GrpcRequestBuilder::new() .source(state.cluster()) diff --git a/influxdb_iox_client/Cargo.toml b/influxdb_iox_client/Cargo.toml index a75dc3b460..9b674c4a33 100644 --- a/influxdb_iox_client/Cargo.toml +++ b/influxdb_iox_client/Cargo.toml @@ -5,19 +5,15 @@ authors = ["Dom Dwyer "] edition = "2021" [features] -default = ["flight", "format", "write_lp"] +default = ["flight", "format"] flight = ["arrow", "arrow-flight", "arrow_util", "futures-util"] format = ["arrow", "arrow_util"] -write_lp = ["dml", "mutable_batch_lp", "mutable_batch_pb"] [dependencies] # Workspace dependencies, in alphabetical order arrow_util = { path = "../arrow_util", optional = true } client_util = { path = "../client_util" } -dml = { path = "../dml", optional = true } -generated_types = { path = "../generated_types", default-features = false } -mutable_batch_lp = { path = "../mutable_batch_lp", optional = true } -mutable_batch_pb = { path = "../mutable_batch_pb", optional = true } +generated_types = { path = "../generated_types", default-features = false, features = ["data_types_conversions"] } # Crates.io dependencies, in alphabetical order arrow = { version = "23.0.0", optional = true } @@ -26,8 +22,10 @@ bytes = "1.2" futures-util = { version = "0.3", optional = true } prost = "0.11" rand = "0.8.3" +reqwest = { version = "0.11", default-features = false, features = ["stream", "rustls-tls"] } thiserror = "1.0.37" tonic = { version = "0.8" } [dev-dependencies] # In alphabetical order tokio = { version = "1.21", features = ["macros", "parking_lot", "rt-multi-thread"] } +mockito = "0.31" \ No newline at end of file diff --git a/influxdb_iox_client/src/client/catalog.rs b/influxdb_iox_client/src/client/catalog.rs index 743e8ef8f1..39c331d6ec 100644 --- a/influxdb_iox_client/src/client/catalog.rs +++ b/influxdb_iox_client/src/client/catalog.rs @@ -1,3 +1,5 @@ +use client_util::connection::GrpcConnection; + use self::generated_types::{catalog_service_client::CatalogServiceClient, *}; use crate::connection::Connection; @@ -11,14 +13,14 @@ pub mod generated_types { /// A basic client for interacting the a remote catalog. #[derive(Debug, Clone)] pub struct Client { - inner: CatalogServiceClient, + inner: CatalogServiceClient, } impl Client { /// Creates a new client with the provided connection - pub fn new(channel: Connection) -> Self { + pub fn new(connection: Connection) -> Self { Self { - inner: CatalogServiceClient::new(channel), + inner: CatalogServiceClient::new(connection.into_grpc_connection()), } } diff --git a/influxdb_iox_client/src/client/delete.rs b/influxdb_iox_client/src/client/delete.rs index b6658ed61b..4b4e64d26a 100644 --- a/influxdb_iox_client/src/client/delete.rs +++ b/influxdb_iox_client/src/client/delete.rs @@ -1,3 +1,5 @@ +use client_util::connection::GrpcConnection; + use self::generated_types::{delete_service_client::DeleteServiceClient, *}; use crate::connection::Connection; @@ -60,14 +62,14 @@ pub mod generated_types { /// ``` #[derive(Debug, Clone)] pub struct Client { - inner: DeleteServiceClient, + inner: DeleteServiceClient, } impl Client { /// Creates a new client with the provided connection - pub fn new(channel: Connection) -> Self { + pub fn new(connection: Connection) -> Self { Self { - inner: DeleteServiceClient::new(channel), + inner: DeleteServiceClient::new(connection.into_grpc_connection()), } } diff --git a/influxdb_iox_client/src/client/error.rs b/influxdb_iox_client/src/client/error.rs index 7a96845213..6619ce1eb0 100644 --- a/influxdb_iox_client/src/client/error.rs +++ b/influxdb_iox_client/src/client/error.rs @@ -139,3 +139,68 @@ impl From for Error { } } } + +impl Error { + /// Return a `Error::Unknown` variant with the specified message + pub(crate) fn unknown(message: impl Into) -> Self { + Self::Unknown(ServerError { + message: message.into(), + details: None, + }) + } + + /// Return a `Error::Internal` variant with the specified message + pub(crate) fn internal(message: impl Into) -> Self { + Self::Internal(ServerError { + message: message.into(), + details: None, + }) + } + + /// Return a `Error::Client` variant with the specified message + pub(crate) fn client(e: E) -> Self { + Self::Client(Box::new(e)) + } + + /// Return `Error::InvalidArgument` specifing an error in `field_name` + pub(crate) fn invalid_argument( + field_name: impl Into, + description: impl Into, + ) -> Self { + let field_name = field_name.into(); + let description = description.into(); + + Self::InvalidArgument(ServerError { + message: format!("Invalid argument for '{}': {}", field_name, description), + details: Some(FieldViolation { + field: field_name, + description, + }), + }) + } +} + +/// Translates a reqwest response to an Error +pub(crate) async fn translate_response(response: reqwest::Response) -> Result<(), Error> { + let status = response.status(); + + if status.is_success() { + Ok(()) + } else if status.is_server_error() { + Err(Error::internal(response_description(response).await)) + } else { + // todo would be nice to check for 404, etc and return more specific errors + Err(Error::unknown(response_description(response).await)) + } +} + +/// Makes as detailed error message as possible +async fn response_description(response: reqwest::Response) -> String { + let status = response.status(); + + // see if the response has any text we can include + match response.text().await { + Ok(text) => format!("(status {status}): {text}"), + Err(_) => format!("status: {status}"), + } +} diff --git a/influxdb_iox_client/src/client/flight/low_level.rs b/influxdb_iox_client/src/client/flight/low_level.rs index 9ddf5d5d15..6e0860aa50 100644 --- a/influxdb_iox_client/src/client/flight/low_level.rs +++ b/influxdb_iox_client/src/client/flight/low_level.rs @@ -21,6 +21,7 @@ use ::generated_types::influxdata::iox::{ ingester::v1::{IngesterQueryRequest, IngesterQueryResponseMetadata}, querier::v1::{AppMetadata, ReadInfo}, }; +use client_util::connection::{Connection, GrpcConnection}; use futures_util::stream; use futures_util::stream::StreamExt; use prost::Message; @@ -39,7 +40,6 @@ use arrow_flight::{ }; use super::Error; -use crate::connection::Connection; use rand::Rng; /// Metadata that can be send during flight requests. @@ -67,7 +67,7 @@ pub struct Client where T: ClientMetadata, { - inner: FlightServiceClient, + inner: FlightServiceClient, _phantom: PhantomData, } @@ -76,9 +76,9 @@ where T: ClientMetadata, { /// Creates a new client with the provided connection - pub fn new(channel: Connection) -> Self { + pub fn new(connection: Connection) -> Self { Self { - inner: FlightServiceClient::new(channel), + inner: FlightServiceClient::new(connection.into_grpc_connection()), _phantom: PhantomData::default(), } } diff --git a/influxdb_iox_client/src/client/flight/mod.rs b/influxdb_iox_client/src/client/flight/mod.rs index a33464d0a6..588bbba855 100644 --- a/influxdb_iox_client/src/client/flight/mod.rs +++ b/influxdb_iox_client/src/client/flight/mod.rs @@ -118,9 +118,9 @@ pub struct Client { impl Client { /// Creates a new client with the provided connection - pub fn new(channel: Connection) -> Self { + pub fn new(connection: Connection) -> Self { Self { - inner: LowLevelClient::new(channel), + inner: LowLevelClient::new(connection), } } diff --git a/influxdb_iox_client/src/client/health.rs b/influxdb_iox_client/src/client/health.rs index c655ffb309..aa602e09db 100644 --- a/influxdb_iox_client/src/client/health.rs +++ b/influxdb_iox_client/src/client/health.rs @@ -2,7 +2,7 @@ use generated_types::google::FieldViolation; use generated_types::grpc::health::v1::*; -use crate::connection::Connection; +use crate::connection::{Connection, GrpcConnection}; use crate::error::Error; /// A client for the gRPC health checking API @@ -10,14 +10,14 @@ use crate::error::Error; /// Allows checking the status of a given service #[derive(Debug)] pub struct Client { - inner: health_client::HealthClient, + inner: health_client::HealthClient, } impl Client { /// Creates a new client with the provided connection pub fn new(channel: Connection) -> Self { Self { - inner: health_client::HealthClient::new(channel), + inner: health_client::HealthClient::new(channel.into_grpc_connection()), } } diff --git a/influxdb_iox_client/src/client/namespace.rs b/influxdb_iox_client/src/client/namespace.rs index 1b60143a67..6b25edc344 100644 --- a/influxdb_iox_client/src/client/namespace.rs +++ b/influxdb_iox_client/src/client/namespace.rs @@ -1,3 +1,5 @@ +use client_util::connection::GrpcConnection; + use self::generated_types::{namespace_service_client::NamespaceServiceClient, *}; use crate::connection::Connection; use crate::error::Error; @@ -10,14 +12,14 @@ pub mod generated_types { /// A basic client for fetching the Schema for a Namespace. #[derive(Debug, Clone)] pub struct Client { - inner: NamespaceServiceClient, + inner: NamespaceServiceClient, } impl Client { /// Creates a new client with the provided connection - pub fn new(channel: Connection) -> Self { + pub fn new(connection: Connection) -> Self { Self { - inner: NamespaceServiceClient::new(channel), + inner: NamespaceServiceClient::new(connection.into_grpc_connection()), } } diff --git a/influxdb_iox_client/src/client/schema.rs b/influxdb_iox_client/src/client/schema.rs index 48fae2f609..8f9e7bcd8b 100644 --- a/influxdb_iox_client/src/client/schema.rs +++ b/influxdb_iox_client/src/client/schema.rs @@ -1,5 +1,6 @@ use self::generated_types::{schema_service_client::SchemaServiceClient, *}; use ::generated_types::google::OptionalField; +use client_util::connection::GrpcConnection; use crate::connection::Connection; use crate::error::Error; @@ -12,14 +13,14 @@ pub mod generated_types { /// A basic client for fetching the Schema for a Namespace. #[derive(Debug, Clone)] pub struct Client { - inner: SchemaServiceClient, + inner: SchemaServiceClient, } impl Client { /// Creates a new client with the provided connection - pub fn new(channel: Connection) -> Self { + pub fn new(connection: Connection) -> Self { Self { - inner: SchemaServiceClient::new(channel), + inner: SchemaServiceClient::new(connection.into_grpc_connection()), } } diff --git a/influxdb_iox_client/src/client/store.rs b/influxdb_iox_client/src/client/store.rs index b57581dbf4..88fbaac57e 100644 --- a/influxdb_iox_client/src/client/store.rs +++ b/influxdb_iox_client/src/client/store.rs @@ -3,6 +3,7 @@ use self::generated_types::{object_store_service_client::ObjectStoreServiceClien use crate::connection::Connection; use crate::error::Error; +use client_util::connection::GrpcConnection; use futures_util::stream::BoxStream; use tonic::Status; @@ -14,14 +15,14 @@ pub mod generated_types { /// A basic client for interacting the a remote catalog. #[derive(Debug, Clone)] pub struct Client { - inner: ObjectStoreServiceClient, + inner: ObjectStoreServiceClient, } impl Client { /// Creates a new client with the provided connection - pub fn new(channel: Connection) -> Self { + pub fn new(connection: Connection) -> Self { Self { - inner: ObjectStoreServiceClient::new(channel), + inner: ObjectStoreServiceClient::new(connection.into_grpc_connection()), } } diff --git a/influxdb_iox_client/src/client/test.rs b/influxdb_iox_client/src/client/test.rs index 9ba7b2e3ae..ebc468b665 100644 --- a/influxdb_iox_client/src/client/test.rs +++ b/influxdb_iox_client/src/client/test.rs @@ -1,3 +1,4 @@ +use client_util::connection::GrpcConnection; /// Re-export generated_types use generated_types::{i_ox_testing_client::IOxTestingClient, TestErrorRequest}; @@ -30,14 +31,14 @@ use crate::error::Error; /// ``` #[derive(Debug, Clone)] pub struct Client { - inner: IOxTestingClient, + inner: IOxTestingClient, } impl Client { /// Creates a new client with the provided connection - pub fn new(channel: Connection) -> Self { + pub fn new(connection: Connection) -> Self { Self { - inner: IOxTestingClient::new(channel), + inner: IOxTestingClient::new(connection.into_grpc_connection()), } } diff --git a/influxdb_iox_client/src/client/write.rs b/influxdb_iox_client/src/client/write.rs index f4209762a7..1ee584d8a0 100644 --- a/influxdb_iox_client/src/client/write.rs +++ b/influxdb_iox_client/src/client/write.rs @@ -3,10 +3,13 @@ pub mod generated_types { pub use generated_types::influxdata::pbdata::v1::*; } -use self::generated_types::write_service_client::WriteServiceClient; +use client_util::{connection::HttpConnection, namespace_translation::split_namespace}; -use crate::connection::Connection; -use crate::error::Error; +use crate::{ + connection::Connection, + error::{translate_response, Error}, +}; +use reqwest::Method; /// An IOx Write API client. /// @@ -19,7 +22,7 @@ use crate::error::Error; /// }; /// /// let mut connection = Builder::default() -/// .build("http://127.0.0.1:8082") +/// .build("http://127.0.0.1:8080") /// .await /// .unwrap(); /// @@ -27,60 +30,88 @@ use crate::error::Error; /// /// // write a line of line procol data /// client -/// .write_lp("bananas", "cpu,region=west user=23.2 100",0) +/// .write_lp("bananas", "cpu,region=west user=23.2 100") /// .await /// .expect("failed to write to IOx"); /// # } /// ``` #[derive(Debug, Clone)] pub struct Client { - inner: WriteServiceClient, + inner: HttpConnection, } impl Client { /// Creates a new client with the provided connection - pub fn new(channel: Connection) -> Self { + pub fn new(connection: Connection) -> Self { Self { - inner: WriteServiceClient::new(channel), + inner: connection.into_http_connection(), } } /// Write the [LineProtocol] formatted data in `lp_data` to - /// database `name`. Lines without a timestamp will be assigned `default_time` + /// namespace `namespace`. /// - /// Returns the number of lines which were parsed and written to the database + /// Returns the number of bytes which were written to the database /// /// [LineProtocol]: https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format - #[cfg(feature = "write_lp")] pub async fn write_lp( &mut self, - db_name: impl AsRef + Send, - lp_data: impl AsRef + Send, - default_time: i64, + namespace: impl AsRef + Send, + lp_data: impl Into + Send, ) -> Result { - let tables = mutable_batch_lp::lines_to_batches(lp_data.as_ref(), default_time) - .map_err(|e| Error::Client(Box::new(e)))?; + let lp_data = lp_data.into(); + let data_len = lp_data.len(); - let meta = dml::DmlMeta::unsequenced(None); - let write = dml::DmlWrite::new(db_name.as_ref().to_string(), tables, None, meta); - let lines = write.tables().map(|(_, table)| table.rows()).sum(); + let write_url = format!("{}api/v2/write", self.inner.uri()); - let database_batch = mutable_batch_pb::encode::encode_write(db_name.as_ref(), &write); + let (org_id, bucket_id) = split_namespace(namespace.as_ref()).map_err(|e| { + Error::invalid_argument( + "namespace", + format!("Could not find valid org_id and bucket_id: {}", e), + ) + })?; - self.inner - .write(generated_types::WriteRequest { - database_batch: Some(database_batch), - }) - .await?; + let response = self + .inner + .client() + .request(Method::POST, &write_url) + .query(&[("bucket", bucket_id), ("org", org_id)]) + .body(lp_data) + .send() + .await + .map_err(Error::client)?; - Ok(lines) - } + translate_response(response).await?; - /// Write a protobuf batch. - pub async fn write_pb( - &mut self, - write_request: generated_types::WriteRequest, - ) -> Result, Error> { - Ok(self.inner.write(write_request).await?) + Ok(data_len) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::connection::Builder; + + #[tokio::test] + /// Ensure the basic plumbing is hooked up correctly + async fn basic() { + let url = mockito::server_url(); + + let connection = Builder::new().build(&url).await.unwrap(); + + let namespace = "orgname_bucketname"; + let data = "m,t=foo f=4"; + + let m = mockito::mock("POST", "/api/v2/write?bucket=bucketname&org=orgname") + .with_status(201) + .match_body(data) + .create(); + + let res = Client::new(connection).write_lp(namespace, data).await; + + m.assert(); + + let num_bytes = res.expect("Error making write request"); + assert_eq!(num_bytes, 11); } } diff --git a/influxdb_iox_client/src/client/write_info.rs b/influxdb_iox_client/src/client/write_info.rs index 73a9df1f55..1778d4be0c 100644 --- a/influxdb_iox_client/src/client/write_info.rs +++ b/influxdb_iox_client/src/client/write_info.rs @@ -1,3 +1,5 @@ +use client_util::connection::GrpcConnection; + use self::generated_types::{write_info_service_client::WriteInfoServiceClient, *}; use crate::connection::Connection; @@ -22,14 +24,14 @@ pub mod generated_types { /// #[derive(Debug, Clone)] pub struct Client { - inner: WriteInfoServiceClient, + inner: WriteInfoServiceClient, } impl Client { /// Creates a new client with the provided connection - pub fn new(channel: Connection) -> Self { + pub fn new(connection: Connection) -> Self { Self { - inner: WriteInfoServiceClient::new(channel), + inner: WriteInfoServiceClient::new(connection.into_grpc_connection()), } } diff --git a/influxdb_iox_client/src/lib.rs b/influxdb_iox_client/src/lib.rs index 2147cdda7b..4bb0181513 100644 --- a/influxdb_iox_client/src/lib.rs +++ b/influxdb_iox_client/src/lib.rs @@ -22,6 +22,7 @@ pub use generated_types::{google, protobuf_type_url, protobuf_type_url_eq}; pub use client::*; pub use client_util::connection; +pub use client_util::namespace_translation; #[cfg(feature = "format")] /// Output formatting utilities diff --git a/influxdb_storage_client/src/lib.rs b/influxdb_storage_client/src/lib.rs index 5c4b700731..d4dfffd84e 100644 --- a/influxdb_storage_client/src/lib.rs +++ b/influxdb_storage_client/src/lib.rs @@ -17,6 +17,7 @@ )] #![allow(clippy::missing_docs_in_private_items)] +use client_util::connection::GrpcConnection; use futures_util::TryStreamExt; use prost::Message; use std::collections::HashMap; @@ -75,14 +76,14 @@ impl OrgAndBucket { /// A client for the InfluxDB gRPC storage API #[derive(Debug)] pub struct Client { - inner: storage_client::StorageClient, + inner: storage_client::StorageClient, } impl Client { /// Creates a new client with the provided connection - pub fn new(channel: Connection) -> Self { + pub fn new(connection: Connection) -> Self { Self { - inner: storage_client::StorageClient::new(channel), + inner: storage_client::StorageClient::new(connection.into_grpc_connection()), } } diff --git a/service_grpc_influxrpc/src/service.rs b/service_grpc_influxrpc/src/service.rs index d83046390c..f8d54e2d05 100644 --- a/service_grpc_influxrpc/src/service.rs +++ b/service_grpc_influxrpc/src/service.rs @@ -1614,7 +1614,7 @@ mod tests { use futures::Future; use generated_types::{i_ox_testing_client::IOxTestingClient, tag_key_predicate::Value}; use influxdb_storage_client::{ - connection::{Builder as ConnectionBuilder, Connection}, + connection::{Builder as ConnectionBuilder, GrpcConnection}, generated_types::*, Client as StorageClient, OrgAndBucket, }; @@ -3482,7 +3482,7 @@ mod tests { // Wrapper around raw clients and test database struct Fixture { - iox_client: IOxTestingClient, + iox_client: IOxTestingClient, storage_client: StorageClient, test_storage: Arc, join_handle: JoinHandle<()>, @@ -3542,7 +3542,7 @@ mod tests { .await .unwrap(); - let iox_client = IOxTestingClient::new(conn.clone()); + let iox_client = IOxTestingClient::new(conn.clone().into_grpc_connection()); let storage_client = StorageClient::new(conn); diff --git a/test_helpers_end_to_end/Cargo.toml b/test_helpers_end_to_end/Cargo.toml index c54aeb940e..c525695798 100644 --- a/test_helpers_end_to_end/Cargo.toml +++ b/test_helpers_end_to_end/Cargo.toml @@ -13,7 +13,7 @@ futures = "0.3" generated_types = { path = "../generated_types" } http = "0.2.8" hyper = "0.14" -influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format", "write_lp"] } +influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format"] } nix = "0.25" observability_deps = { path = "../observability_deps" } once_cell = { version = "1.15.0", features = ["parking_lot"] } diff --git a/test_helpers_end_to_end/src/client.rs b/test_helpers_end_to_end/src/client.rs index 86fc9daa7d..0f4567a973 100644 --- a/test_helpers_end_to_end/src/client.rs +++ b/test_helpers_end_to_end/src/client.rs @@ -6,7 +6,7 @@ use hyper::{Body, Client, Request}; use influxdb_iox_client::{ connection::Connection, flight::generated_types::ReadInfo, - write::generated_types::{DatabaseBatch, TableBatch, WriteRequest, WriteResponse}, + write::generated_types::WriteResponse, write_info::generated_types::{merge_responses, GetWriteInfoResponse, ShardStatus}, }; use observability_deps::tracing::info; @@ -39,27 +39,6 @@ pub async fn write_to_router( .expect("http error sending write") } -/// Writes the table batch to the gRPC write API on the router into the org/bucket (typically on -/// the router) -pub async fn write_to_router_grpc( - table_batches: Vec, - namespace: impl Into, - router_connection: Connection, -) -> tonic::Response { - let request = WriteRequest { - database_batch: Some(DatabaseBatch { - database_name: namespace.into(), - table_batches, - partition_key: Default::default(), - }), - }; - - influxdb_iox_client::write::Client::new(router_connection) - .write_pb(request) - .await - .expect("grpc error sending write") -} - /// Extracts the write token from the specified response (to the /api/v2/write api) pub fn get_write_token(response: &Response) -> String { let message = format!("no write token in {:?}", response); diff --git a/test_helpers_end_to_end/src/mini_cluster.rs b/test_helpers_end_to_end/src/mini_cluster.rs index f0a5913545..7add89797a 100644 --- a/test_helpers_end_to_end/src/mini_cluster.rs +++ b/test_helpers_end_to_end/src/mini_cluster.rs @@ -1,12 +1,12 @@ use crate::{ - dump_log_to_stdout, log_command, rand_id, write_to_router, write_to_router_grpc, ServerFixture, - TestConfig, TestServer, + dump_log_to_stdout, log_command, rand_id, write_to_router, ServerFixture, TestConfig, + TestServer, }; use assert_cmd::prelude::*; use futures::{stream::FuturesOrdered, StreamExt}; use http::Response; use hyper::Body; -use influxdb_iox_client::write::generated_types::{TableBatch, WriteResponse}; +use influxdb_iox_client::connection::GrpcConnection; use observability_deps::tracing::{debug, info}; use once_cell::sync::Lazy; use std::{ @@ -248,19 +248,6 @@ impl MiniCluster { .await } - /// Writes the table batch to the gRPC write API on the router into the org/bucket - pub async fn write_to_router_grpc( - &self, - table_batches: Vec, - ) -> tonic::Response { - write_to_router_grpc( - table_batches, - &self.namespace, - self.router().router_grpc_connection(), - ) - .await - } - /// Get a reference to the mini cluster's other servers. pub fn other_servers(&self) -> &[ServerFixture] { self.other_servers.as_ref() @@ -311,6 +298,18 @@ impl MiniCluster { command.ok().unwrap(); dump_log_to_stdout("compactor run-once", &log_path); } + + /// Create a storage client connected to the querier member of the cluster + pub fn querier_storage_client( + &self, + ) -> generated_types::storage_client::StorageClient { + let grpc_connection = self + .querier() + .querier_grpc_connection() + .into_grpc_connection(); + + generated_types::storage_client::StorageClient::new(grpc_connection) + } } /// holds shared server processes to share across tests diff --git a/test_helpers_end_to_end/src/steps.rs b/test_helpers_end_to_end/src/steps.rs index 70397ffd94..26b580d8bb 100644 --- a/test_helpers_end_to_end/src/steps.rs +++ b/test_helpers_end_to_end/src/steps.rs @@ -1,12 +1,11 @@ use crate::{ - get_write_token, get_write_token_from_grpc, run_query, token_is_persisted, wait_for_persisted, - wait_for_readable, MiniCluster, + get_write_token, run_query, token_is_persisted, wait_for_persisted, wait_for_readable, + MiniCluster, }; use arrow::record_batch::RecordBatch; use arrow_util::assert_batches_sorted_eq; use futures::future::BoxFuture; use http::StatusCode; -use influxdb_iox_client::write::generated_types::TableBatch; use observability_deps::tracing::info; /// Test harness for end to end tests that are comprised of several steps @@ -75,9 +74,6 @@ pub enum Step { /// endpoint, assert the data was written successfully WriteLineProtocol(String), - /// Writes the specified `TableBatch`es to the gRPC write API - WriteTableBatches(Vec), - /// Wait for all previously written data to be readable WaitForReadable, @@ -155,13 +151,6 @@ impl<'a> StepTest<'a> { info!("====Done writing line protocol, got token {}", write_token); state.write_tokens.push(write_token); } - Step::WriteTableBatches(table_batches) => { - info!("====Begin writing TableBatches to gRPC API"); - let response = state.cluster.write_to_router_grpc(table_batches).await; - let write_token = get_write_token_from_grpc(&response); - info!("====Done writing TableBatches, got token {}", write_token); - state.write_tokens.push(write_token); - } Step::WaitForReadable => { info!("====Begin waiting for all write tokens to be readable"); let querier_grpc_connection =