Merge branch 'main' into dom/remove-dbg

pull/24376/head
Dom 2022-09-29 12:14:19 +01:00 committed by GitHub
commit 8dbeef25ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 450 additions and 388 deletions

8
Cargo.lock generated
View File

@ -658,6 +658,8 @@ name = "client_util"
version = "0.1.0"
dependencies = [
"http",
"mockito",
"reqwest",
"thiserror",
"tokio",
"tonic",
@ -2045,7 +2047,6 @@ dependencies = [
"http",
"humantime",
"import",
"influxdb2_client",
"influxdb_iox_client",
"influxdb_storage_client",
"influxrpc_parser",
@ -2103,13 +2104,12 @@ dependencies = [
"arrow_util",
"bytes",
"client_util",
"dml",
"futures-util",
"generated_types",
"mutable_batch_lp",
"mutable_batch_pb",
"mockito",
"prost 0.11.0",
"rand",
"reqwest",
"thiserror",
"tokio",
"tonic",

View File

@ -7,6 +7,7 @@ edition = "2021"
[dependencies]
http = "0.2.8"
reqwest = { version = "0.11", default-features = false, features = ["stream", "rustls-tls"] }
thiserror = "1.0.37"
tonic = { version = "0.8" }
tower = "0.4"
@ -14,3 +15,4 @@ workspace-hack = { path = "../workspace-hack"}
[dev-dependencies]
tokio = { version = "1.21", features = ["macros", "parking_lot", "rt-multi-thread"] }
mockito = "0.31"

View File

@ -1,5 +1,6 @@
use crate::tower::{SetRequestHeadersLayer, SetRequestHeadersService};
use http::header::HeaderName;
use http::HeaderMap;
use http::{uri::InvalidUri, HeaderValue, Uri};
use std::convert::TryInto;
use std::time::Duration;
@ -7,8 +8,63 @@ use thiserror::Error;
use tonic::transport::{Channel, Endpoint};
use tower::make::MakeConnection;
/// The connection type used for clients
pub type Connection = SetRequestHeadersService<tonic::transport::Channel>;
/// The connection type used for clients. Use [`Builder`] to create
/// instances of [`Connection`] objects
///
/// Bundles the two transports a client may need: a tonic (gRPC)
/// channel with request headers applied, and a plain HTTP client
/// pointed at the same endpoint.
#[derive(Debug, Clone)]
pub struct Connection {
    /// gRPC channel (with request headers applied) for tonic clients
    grpc_connection: GrpcConnection,
    /// HTTP client plus base URI for the /api/v2 REST endpoints
    http_connection: HttpConnection,
}

impl Connection {
    /// Create a new Connection from its two halves
    fn new(grpc_connection: GrpcConnection, http_connection: HttpConnection) -> Self {
        Self {
            grpc_connection,
            http_connection,
        }
    }

    /// Consume `self` and return a [`GrpcConnection`] (suitable for use in
    /// tonic clients)
    pub fn into_grpc_connection(self) -> GrpcConnection {
        self.grpc_connection
    }

    /// Consume `self` and return a [`HttpConnection`] (suitable for making
    /// calls to /api/v2 endpoints)
    pub fn into_http_connection(self) -> HttpConnection {
        self.http_connection
    }
}
/// The type used to make tonic (gRPC) requests
pub type GrpcConnection = SetRequestHeadersService<tonic::transport::Channel>;
/// The type used to make raw HTTP requests
#[derive(Debug, Clone)]
pub struct HttpConnection {
    /// The base uri of the IOx http API endpoint
    uri: Uri,
    /// http client connection
    http_client: reqwest::Client,
}

impl HttpConnection {
    /// Create a new HttpConnection from a base uri and a pre-configured client
    fn new(uri: Uri, http_client: reqwest::Client) -> Self {
        Self { uri, http_client }
    }

    /// Return a reference to the underlying http client
    pub fn client(&self) -> &reqwest::Client {
        &self.http_client
    }

    /// Return a reference to the base uri of the IOx http API endpoint
    pub fn uri(&self) -> &Uri {
        &self.uri
    }
}
/// The default User-Agent header sent by the HTTP client.
pub const USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
@ -100,7 +156,7 @@ impl Builder {
{
let endpoint = self.create_endpoint(dst)?;
let channel = endpoint.connect().await?;
Ok(self.compose_middleware(channel))
Ok(self.compose_middleware(channel, endpoint))
}
/// Construct the [`Connection`] instance using the specified base URL and custom connector.
@ -114,7 +170,7 @@ impl Builder {
{
let endpoint = self.create_endpoint(dst)?;
let channel = endpoint.connect_with_connector(connector).await?;
Ok(self.compose_middleware(channel))
Ok(self.compose_middleware(channel, endpoint))
}
fn create_endpoint<D>(&self, dst: D) -> Result<Endpoint>
@ -128,11 +184,23 @@ impl Builder {
Ok(endpoint)
}
fn compose_middleware(self, channel: Channel) -> Connection {
fn compose_middleware(self, channel: Channel, endpoint: Endpoint) -> Connection {
let headers_map: HeaderMap = self.headers.iter().cloned().collect();
// Compose channel with new tower middleware stack
tower::ServiceBuilder::new()
let grpc_connection = tower::ServiceBuilder::new()
.layer(SetRequestHeadersLayer::new(self.headers))
.service(channel)
.service(channel);
let http_client = reqwest::Client::builder()
.connection_verbose(true)
.default_headers(headers_map)
.build()
.expect("reqwest::Client should have built");
let http_connection = HttpConnection::new(endpoint.uri().clone(), http_client);
Connection::new(grpc_connection, http_connection)
}
/// Set the `User-Agent` header sent by this client.
@ -180,6 +248,7 @@ impl Builder {
#[cfg(test)]
mod tests {
use super::*;
use reqwest::Method;
#[test]
fn test_builder_cloneable() {
@ -187,4 +256,37 @@ mod tests {
fn assert_clone<T: Clone>(_t: T) {}
assert_clone(Builder::default())
}
#[tokio::test]
async fn headers_are_set() {
    let url = mockito::server_url();

    // Build a Connection with a custom `foo: bar` header and keep only
    // its HTTP half — this test only exercises the reqwest path.
    let http_connection = Builder::new()
        .header(
            HeaderName::from_static("foo"),
            HeaderValue::from_static("bar"),
        )
        .build(&url)
        .await
        .unwrap()
        .into_http_connection();

    let url = format!("{}/the_api", url);
    println!("Sending to {url}");

    // Mock endpoint that only matches when the custom header was
    // forwarded (header-name matching is case-insensitive, hence "FOO").
    let m = mockito::mock("POST", "/the_api")
        .with_status(201)
        .with_body("world")
        .match_header("FOO", "bar")
        .create();

    http_connection
        .client()
        .request(Method::POST, &url)
        .send()
        .await
        .expect("Error making http request");

    // Panics unless the mock (and thus the header match) was hit
    m.assert();
}
}

View File

@ -21,3 +21,6 @@
pub mod connection;
mod tower;
/// Namespace <--> org/bucket utilities
pub mod namespace_translation;

View File

@ -0,0 +1,90 @@
//! Contains logic to map a namespace back and forth to an org/bucket pair
use thiserror::Error;
/// Errors returned by namespace parsing
#[allow(missing_docs)]
#[derive(Debug, Error)]
pub enum Error {
    /// The namespace string could not be split into org/bucket parts;
    /// `reason` explains which rule was violated.
    #[error("Invalid namespace '{namespace}': {reason}")]
    InvalidNamespace { namespace: String, reason: String },
}

impl Error {
    /// Convenience constructor for [`Error::InvalidNamespace`]
    fn new(namespace: impl Into<String>, reason: impl Into<String>) -> Self {
        Self::InvalidNamespace {
            namespace: namespace.into(),
            reason: reason.into(),
        }
    }
}
/// Splits up the namespace name into org_id and bucket_id
pub fn split_namespace(namespace: &str) -> Result<(&str, &str), Error> {
let mut iter = namespace.split('_');
let org_id = iter.next().ok_or_else(|| Error::new(namespace, "empty"))?;
if org_id.is_empty() {
return Err(Error::new(namespace, "No org_id found"));
}
let bucket_id = iter
.next()
.ok_or_else(|| Error::new(namespace, "Could not find '_'"))?;
if bucket_id.is_empty() {
return Err(Error::new(namespace, "No bucket_id found"));
}
if iter.next().is_some() {
return Err(Error::new(namespace, "More than one '_'"));
}
Ok((org_id, bucket_id))
}
#[cfg(test)]
mod test {
    use super::*;

    /// Happy path: exactly one '_' with non-empty parts on each side
    #[test]
    fn split_good() {
        assert_eq!(split_namespace("foo_bar").unwrap(), ("foo", "bar"));
    }

    /// Empty input is rejected as a missing org_id
    #[test]
    #[should_panic(expected = "No org_id found")]
    fn split_bad_empty() {
        split_namespace("").unwrap();
    }

    /// A lone separator has an empty org_id (checked before bucket_id)
    #[test]
    #[should_panic(expected = "No org_id found")]
    fn split_bad_only_underscore() {
        split_namespace("_").unwrap();
    }

    /// Leading '_' means the org_id part is empty
    #[test]
    #[should_panic(expected = "No org_id found")]
    fn split_bad_empty_org_id() {
        split_namespace("_ff").unwrap();
    }

    /// Trailing '_' means the bucket_id part is empty
    #[test]
    #[should_panic(expected = "No bucket_id found")]
    fn split_bad_empty_bucket_id() {
        split_namespace("ff_").unwrap();
    }

    /// Two separators — even with an empty tail — are rejected
    #[test]
    #[should_panic(expected = "More than one '_'")]
    fn split_too_many() {
        split_namespace("ff_bf_").unwrap();
    }

    /// Many separators are rejected with the same error
    #[test]
    #[should_panic(expected = "More than one '_'")]
    fn split_way_too_many() {
        split_namespace("ff_bf_dfd_3_f").unwrap();
    }
}

View File

@ -5,7 +5,7 @@ use data_types::{
org_and_bucket_to_database, ColumnType, Namespace, NamespaceSchema, OrgBucketMappingError,
Partition, PartitionKey, QueryPoolId, ShardId, TableSchema, TopicId,
};
use influxdb_iox_client::connection::Connection;
use influxdb_iox_client::connection::{Connection, GrpcConnection};
use iox_catalog::interface::{get_schema_by_name, Catalog, ColumnUpsertRequest, RepoCollection};
use schema::{
sort::{adjust_sort_key_columns, SortKey, SortKeyBuilder},
@ -98,7 +98,7 @@ pub async fn update_iox_catalog<'a>(
// initialise a client of the shard service in the router. we will use it to find out which
// shard a table/namespace combo would shard to, without exposing the implementation
// details of the sharding
let mut shard_client = ShardServiceClient::new(connection);
let mut shard_client = ShardServiceClient::new(connection.into_grpc_connection());
update_catalog_schema_with_merged(
namespace_name.as_str(),
iox_schema,
@ -176,7 +176,7 @@ async fn update_catalog_schema_with_merged<R>(
iox_schema: NamespaceSchema,
merged_tsm_schema: &AggregateTSMSchema,
repos: &mut R,
shard_client: &mut ShardServiceClient<Connection>,
shard_client: &mut ShardServiceClient<GrpcConnection>,
) -> Result<(), UpdateCatalogError>
where
R: RepoCollection + ?Sized,

View File

@ -13,8 +13,7 @@ data_types = { path = "../data_types" }
datafusion = { path = "../datafusion" }
generated_types = { path = "../generated_types" }
import = { path = "../import" }
influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format", "write_lp"] }
influxdb2_client = { path = "../influxdb2_client" }
influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format"] }
influxdb_storage_client = { path = "../influxdb_storage_client" }
influxrpc_parser = { path = "../influxrpc_parser"}
iox_catalog = { path = "../iox_catalog" }

View File

@ -1,6 +1,5 @@
use influxdb2_client::RequestError;
use observability_deps::tracing::debug;
use snafu::{OptionExt, ResultExt, Snafu};
use influxdb_iox_client::{connection::Connection, write};
use snafu::{ResultExt, Snafu};
use std::{fs::File, io::Read, path::PathBuf};
#[allow(clippy::enum_variant_names)]
@ -13,10 +12,9 @@ pub enum Error {
},
#[snafu(display("Client error: {source}"))]
ClientError { source: RequestError },
#[snafu(display("Invalid namespace '{namespace}': {reason}"))]
InvalidNamespace { namespace: String, reason: String },
ClientError {
source: influxdb_iox_client::error::Error,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -33,7 +31,7 @@ pub struct Config {
file_name: PathBuf,
}
pub async fn command(url: String, config: Config) -> Result<()> {
pub async fn command(connection: Connection, config: Config) -> Result<()> {
let Config {
namespace,
file_name,
@ -46,19 +44,10 @@ pub async fn command(url: String, config: Config) -> Result<()> {
file.read_to_string(&mut lp_data)
.context(ReadingFileSnafu { file_name })?;
let total_bytes = lp_data.len();
let mut client = write::Client::new(connection);
// split a namespace name ("foo_bar") into org_bucket
let (org_id, bucket_id) = split_namespace(&namespace)?;
debug!(url, total_bytes, org_id, bucket_id, "Writing data");
// IOx's v2 api doesn't validate auth tokens so pass an empty one
let auth_token = "";
let client = influxdb2_client::Client::new(url, auth_token);
client
.write_line_protocol(org_id, bucket_id, lp_data)
let total_bytes = client
.write_lp(namespace, lp_data)
.await
.context(ClientSnafu)?;
@ -66,89 +55,3 @@ pub async fn command(url: String, config: Config) -> Result<()> {
Ok(())
}
/// Splits up the strings into org_id and bucket_id
fn split_namespace(namespace: &str) -> Result<(&str, &str)> {
let mut iter = namespace.split('_');
let org_id = iter.next().context(InvalidNamespaceSnafu {
namespace,
reason: "empty",
})?;
if org_id.is_empty() {
return InvalidNamespaceSnafu {
namespace,
reason: "No org_id found",
}
.fail();
}
let bucket_id = iter.next().context(InvalidNamespaceSnafu {
namespace,
reason: "Could not find '_'",
})?;
if bucket_id.is_empty() {
return InvalidNamespaceSnafu {
namespace,
reason: "No bucket_id found",
}
.fail();
}
if iter.next().is_some() {
return InvalidNamespaceSnafu {
namespace,
reason: "More than one '_'",
}
.fail();
}
Ok((org_id, bucket_id))
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn split_good() {
assert_eq!(split_namespace("foo_bar").unwrap(), ("foo", "bar"));
}
#[test]
#[should_panic(expected = "No org_id found")]
fn split_bad_empty() {
split_namespace("").unwrap();
}
#[test]
#[should_panic(expected = "No org_id found")]
fn split_bad_only_underscore() {
split_namespace("_").unwrap();
}
#[test]
#[should_panic(expected = "No org_id found")]
fn split_bad_empty_org_id() {
split_namespace("_ff").unwrap();
}
#[test]
#[should_panic(expected = "No bucket_id found")]
fn split_bad_empty_bucket_id() {
split_namespace("ff_").unwrap();
}
#[test]
#[should_panic(expected = "More than one '_'")]
fn split_too_many() {
split_namespace("ff_bf_").unwrap();
}
#[test]
#[should_panic(expected = "More than one '_'")]
fn split_way_too_many() {
split_namespace("ff_bf_dfd_3_f").unwrap();
}
}

View File

@ -209,7 +209,6 @@ fn main() -> Result<(), std::io::Error> {
let log_verbose_count = config.all_in_one_config.logging_config.log_verbose_count;
let rpc_timeout = config.rpc_timeout;
let host_captured = host.clone();
let connection = || async move {
let mut builder = headers.into_iter().fold(Builder::default(), |builder, kv| {
builder.header(kv.key, kv.value)
@ -230,10 +229,10 @@ fn main() -> Result<(), std::io::Error> {
println!("Trace ID set to {}", trace_id);
}
match builder.build(&host_captured).await {
match builder.build(&host).await {
Ok(connection) => connection,
Err(e) => {
eprintln!("Error connecting to {}: {}", host_captured, e);
eprintln!("Error connecting to {}: {}", host, e);
std::process::exit(ReturnCode::Failure as _)
}
}
@ -312,7 +311,8 @@ fn main() -> Result<(), std::io::Error> {
}
Some(Command::Write(config)) => {
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
if let Err(e) = commands::write::command(host, config).await {
let connection = connection().await;
if let Err(e) = commands::write::command(connection, config).await {
eprintln!("{}", e);
std::process::exit(ReturnCode::Failure as _)
}

View File

@ -11,6 +11,5 @@ mod logging;
mod metrics;
mod namespace;
mod querier;
mod router;
mod schema;
mod tracing;

View File

@ -2,7 +2,7 @@ use super::{run_data_test, run_no_data_test};
use futures::{prelude::*, FutureExt};
use generated_types::{
google::protobuf::Empty, measurement_fields_response::FieldType,
offsets_response::PartitionOffsetResponse, storage_client::StorageClient, OffsetsResponse,
offsets_response::PartitionOffsetResponse, OffsetsResponse,
};
use influxdb_storage_client::tag_key_bytes_to_strings;
use std::sync::Arc;
@ -13,8 +13,7 @@ use test_helpers_end_to_end::{DataGenerator, GrpcRequestBuilder, StepTestState};
async fn capabilities() {
run_no_data_test(Box::new(|state: &mut StepTestState| {
async move {
let mut storage_client =
StorageClient::new(state.cluster().querier().querier_grpc_connection());
let mut storage_client = state.cluster().querier_storage_client();
let capabilities_response = storage_client
.capabilities(Empty {})
.await
@ -37,8 +36,7 @@ async fn capabilities() {
async fn offsets() {
run_no_data_test(Box::new(|state: &mut StepTestState| {
async move {
let mut storage_client =
StorageClient::new(state.cluster().querier().querier_grpc_connection());
let mut storage_client = state.cluster().querier_storage_client();
let offsets_response = storage_client.offsets(Empty {}).await.unwrap().into_inner();
let expected = OffsetsResponse {
partitions: vec![PartitionOffsetResponse { id: 0, offset: 1 }],
@ -57,8 +55,7 @@ async fn tag_keys() {
Arc::clone(&generator),
Box::new(move |state: &mut StepTestState| {
async move {
let mut storage_client =
StorageClient::new(state.cluster().querier().querier_grpc_connection());
let mut storage_client = state.cluster().querier_storage_client();
let tag_keys_request = GrpcRequestBuilder::new()
.source(state.cluster())
@ -90,8 +87,7 @@ async fn tag_values() {
Arc::clone(&generator),
Box::new(move |state: &mut StepTestState| {
async move {
let mut storage_client =
StorageClient::new(state.cluster().querier().querier_grpc_connection());
let mut storage_client = state.cluster().querier_storage_client();
let tag_values_request = GrpcRequestBuilder::new()
.source(state.cluster())
@ -128,8 +124,7 @@ async fn measurement_names() {
Arc::clone(&generator),
Box::new(move |state: &mut StepTestState| {
async move {
let mut storage_client =
StorageClient::new(state.cluster().querier().querier_grpc_connection());
let mut storage_client = state.cluster().querier_storage_client();
let measurement_names_request = GrpcRequestBuilder::new()
.source(state.cluster())
@ -170,8 +165,7 @@ async fn measurement_tag_keys() {
Arc::clone(&generator),
Box::new(move |state: &mut StepTestState| {
async move {
let mut storage_client =
StorageClient::new(state.cluster().querier().querier_grpc_connection());
let mut storage_client = state.cluster().querier_storage_client();
let measurement_tag_keys_request = GrpcRequestBuilder::new()
.source(state.cluster())
@ -210,8 +204,7 @@ async fn measurement_tag_values() {
Arc::clone(&generator),
Box::new(move |state: &mut StepTestState| {
async move {
let mut storage_client =
StorageClient::new(state.cluster().querier().querier_grpc_connection());
let mut storage_client = state.cluster().querier_storage_client();
let measurement_tag_values_request = GrpcRequestBuilder::new()
.source(state.cluster())
@ -250,8 +243,7 @@ async fn measurement_fields() {
Arc::clone(&generator),
Box::new(move |state: &mut StepTestState| {
async move {
let mut storage_client =
StorageClient::new(state.cluster().querier().querier_grpc_connection());
let mut storage_client = state.cluster().querier_storage_client();
let measurement_fields_request = GrpcRequestBuilder::new()
.source(state.cluster())

View File

@ -3,7 +3,7 @@ use futures::{prelude::*, FutureExt};
use generated_types::{
read_response::frame::Data, storage_client::StorageClient, ReadFilterRequest,
};
use influxdb_iox_client::connection::Connection;
use influxdb_iox_client::connection::GrpcConnection;
use std::sync::Arc;
use test_helpers_end_to_end::{
maybe_skip_integration, DataGenerator, GrpcRequestBuilder, MiniCluster, Step, StepTest,
@ -15,8 +15,7 @@ async fn read_filter() {
let generator = Arc::new(DataGenerator::new());
run_data_test(Arc::clone(&generator), Box::new(move |state: &mut StepTestState| {
async move {
let mut storage_client =
StorageClient::new(state.cluster().querier().querier_grpc_connection());
let mut storage_client = state.cluster().querier_storage_client();
let read_filter_request = GrpcRequestBuilder::new()
.source(state.cluster())
@ -155,8 +154,7 @@ async fn do_read_filter_test(
Step::WaitForReadable,
Step::Custom(Box::new(move |state: &mut StepTestState| {
async move {
let mut storage_client =
StorageClient::new(state.cluster().querier().querier_grpc_connection());
let mut storage_client = state.cluster().querier_storage_client();
println!("Sending read_filter request with {:#?}", request_builder);
@ -178,7 +176,7 @@ async fn do_read_filter_test(
/// Make a read_group request and returns the results in a comparable format
async fn do_read_filter_request(
storage_client: &mut StorageClient<Connection>,
storage_client: &mut StorageClient<GrpcConnection>,
request: tonic::Request<ReadFilterRequest>,
) -> Vec<String> {
let read_filter_response = storage_client

View File

@ -1,7 +1,7 @@
use super::{dump::dump_data_frames, read_group_data};
use futures::{prelude::*, FutureExt};
use generated_types::storage_client::StorageClient;
use influxdb_iox_client::connection::Connection;
use influxdb_iox_client::connection::GrpcConnection;
use test_helpers_end_to_end::{
maybe_skip_integration, GrpcRequestBuilder, MiniCluster, Step, StepTest, StepTestState,
};
@ -202,8 +202,12 @@ async fn do_read_group_test(
Step::WaitForReadable,
Step::Custom(Box::new(move |state: &mut StepTestState| {
async move {
let mut storage_client =
StorageClient::new(state.cluster().querier().querier_grpc_connection());
let grpc_connection = state
.cluster()
.querier()
.querier_grpc_connection()
.into_grpc_connection();
let mut storage_client = StorageClient::new(grpc_connection);
println!("Sending read_group request with {:#?}", request_builder);
@ -225,7 +229,7 @@ async fn do_read_group_test(
/// Make a read_group request and returns the results in a comparable format
async fn do_read_group_request(
storage_client: &mut StorageClient<Connection>,
storage_client: &mut StorageClient<GrpcConnection>,
request: tonic::Request<ReadGroupRequest>,
) -> Vec<String> {
let read_group_response = storage_client

View File

@ -1,6 +1,6 @@
use super::dump::dump_data_frames;
use futures::{prelude::*, FutureExt};
use generated_types::{aggregate::AggregateType, storage_client::StorageClient};
use generated_types::aggregate::AggregateType;
use test_helpers_end_to_end::{
maybe_skip_integration, GrpcRequestBuilder, MiniCluster, Step, StepTest, StepTestState,
};
@ -40,8 +40,7 @@ pub async fn read_window_aggregate_test() {
Step::WaitForReadable,
Step::Custom(Box::new(move |state: &mut StepTestState| {
async move {
let mut storage_client =
StorageClient::new(state.cluster().querier().querier_grpc_connection());
let mut storage_client = state.cluster().querier_storage_client();
let request = GrpcRequestBuilder::new()

View File

@ -1,101 +0,0 @@
use influxdb_iox_client::write::generated_types::{column, Column, TableBatch};
use test_helpers_end_to_end::{maybe_skip_integration, MiniCluster, Step, StepTest};
#[tokio::test]
async fn write_via_grpc() {
    test_helpers::maybe_start_logging();
    let database_url = maybe_skip_integration!();

    let table_name = "the_table";

    // One batch with a single row: time=123456, val=42i, tag1=A, tag2=B —
    // the protobuf equivalent of the line-protocol string in the comment
    // inside the StepTest below.
    let table_batches = vec![TableBatch {
        table_name: table_name.to_string(),
        columns: vec![
            Column {
                column_name: "time".to_string(),
                semantic_type: column::SemanticType::Time as i32,
                values: Some(column::Values {
                    i64_values: vec![123456],
                    f64_values: vec![],
                    u64_values: vec![],
                    string_values: vec![],
                    bool_values: vec![],
                    bytes_values: vec![],
                    packed_string_values: None,
                    interned_string_values: None,
                }),
                null_mask: vec![],
            },
            Column {
                column_name: "val".to_string(),
                semantic_type: column::SemanticType::Field as i32,
                values: Some(column::Values {
                    i64_values: vec![42],
                    f64_values: vec![],
                    u64_values: vec![],
                    string_values: vec![],
                    bool_values: vec![],
                    bytes_values: vec![],
                    packed_string_values: None,
                    interned_string_values: None,
                }),
                null_mask: vec![],
            },
            Column {
                column_name: "tag1".to_string(),
                semantic_type: column::SemanticType::Tag as i32,
                values: Some(column::Values {
                    i64_values: vec![],
                    f64_values: vec![],
                    u64_values: vec![],
                    string_values: vec!["A".into()],
                    bool_values: vec![],
                    bytes_values: vec![],
                    packed_string_values: None,
                    interned_string_values: None,
                }),
                null_mask: vec![],
            },
            Column {
                column_name: "tag2".to_string(),
                semantic_type: column::SemanticType::Tag as i32,
                values: Some(column::Values {
                    i64_values: vec![],
                    f64_values: vec![],
                    u64_values: vec![],
                    string_values: vec!["B".into()],
                    bool_values: vec![],
                    bytes_values: vec![],
                    packed_string_values: None,
                    interned_string_values: None,
                }),
                null_mask: vec![],
            },
        ],
        row_count: 1,
    }];

    // Set up the cluster ====================================
    let mut cluster = MiniCluster::create_shared(database_url).await;

    StepTest::new(
        &mut cluster,
        vec![
            // Write via the gRPC WriteService rather than line protocol
            Step::WriteTableBatches(table_batches),
            // format!("{},tag1=A,tag2=B val=42i 123456", table_name)),
            // Wait for data to be persisted to parquet
            Step::WaitForPersisted,
            // Read the row back via SQL and compare the rendered table
            Step::Query {
                sql: format!("select * from {}", table_name),
                expected: vec![
                    "+------+------+--------------------------------+-----+",
                    "| tag1 | tag2 | time | val |",
                    "+------+------+--------------------------------+-----+",
                    "| A | B | 1970-01-01T00:00:00.000123456Z | 42 |",
                    "+------+------+--------------------------------+-----+",
                ],
            },
        ],
    )
    .run()
    .await
}

View File

@ -1,5 +1,4 @@
use futures::{prelude::*, FutureExt};
use generated_types::storage_client::StorageClient;
use test_helpers_end_to_end::{
maybe_skip_integration, GrpcRequestBuilder, MiniCluster, Step, StepTest, StepTestState,
TestConfig, UdpCapture,
@ -70,8 +69,7 @@ pub async fn test_tracing_storage_api() {
Step::WaitForReadable,
Step::Custom(Box::new(move |state: &mut StepTestState| {
let cluster = state.cluster();
let mut storage_client =
StorageClient::new(cluster.querier().querier_grpc_connection());
let mut storage_client = cluster.querier_storage_client();
let read_filter_request = GrpcRequestBuilder::new()
.source(state.cluster())

View File

@ -5,19 +5,15 @@ authors = ["Dom Dwyer <dom@itsallbroken.com>"]
edition = "2021"
[features]
default = ["flight", "format", "write_lp"]
default = ["flight", "format"]
flight = ["arrow", "arrow-flight", "arrow_util", "futures-util"]
format = ["arrow", "arrow_util"]
write_lp = ["dml", "mutable_batch_lp", "mutable_batch_pb"]
[dependencies]
# Workspace dependencies, in alphabetical order
arrow_util = { path = "../arrow_util", optional = true }
client_util = { path = "../client_util" }
dml = { path = "../dml", optional = true }
generated_types = { path = "../generated_types", default-features = false }
mutable_batch_lp = { path = "../mutable_batch_lp", optional = true }
mutable_batch_pb = { path = "../mutable_batch_pb", optional = true }
generated_types = { path = "../generated_types", default-features = false, features = ["data_types_conversions"] }
# Crates.io dependencies, in alphabetical order
arrow = { version = "23.0.0", optional = true }
@ -26,8 +22,10 @@ bytes = "1.2"
futures-util = { version = "0.3", optional = true }
prost = "0.11"
rand = "0.8.3"
reqwest = { version = "0.11", default-features = false, features = ["stream", "rustls-tls"] }
thiserror = "1.0.37"
tonic = { version = "0.8" }
[dev-dependencies] # In alphabetical order
tokio = { version = "1.21", features = ["macros", "parking_lot", "rt-multi-thread"] }
mockito = "0.31"

View File

@ -1,3 +1,5 @@
use client_util::connection::GrpcConnection;
use self::generated_types::{catalog_service_client::CatalogServiceClient, *};
use crate::connection::Connection;
@ -11,14 +13,14 @@ pub mod generated_types {
/// A basic client for interacting with a remote catalog.
#[derive(Debug, Clone)]
pub struct Client {
inner: CatalogServiceClient<Connection>,
inner: CatalogServiceClient<GrpcConnection>,
}
impl Client {
/// Creates a new client with the provided connection
pub fn new(channel: Connection) -> Self {
pub fn new(connection: Connection) -> Self {
Self {
inner: CatalogServiceClient::new(channel),
inner: CatalogServiceClient::new(connection.into_grpc_connection()),
}
}

View File

@ -1,3 +1,5 @@
use client_util::connection::GrpcConnection;
use self::generated_types::{delete_service_client::DeleteServiceClient, *};
use crate::connection::Connection;
@ -60,14 +62,14 @@ pub mod generated_types {
/// ```
#[derive(Debug, Clone)]
pub struct Client {
inner: DeleteServiceClient<Connection>,
inner: DeleteServiceClient<GrpcConnection>,
}
impl Client {
/// Creates a new client with the provided connection
pub fn new(channel: Connection) -> Self {
pub fn new(connection: Connection) -> Self {
Self {
inner: DeleteServiceClient::new(channel),
inner: DeleteServiceClient::new(connection.into_grpc_connection()),
}
}

View File

@ -139,3 +139,68 @@ impl From<tonic::Status> for Error {
}
}
}
impl Error {
    /// Return an [`Error::Unknown`] variant with the specified message
    pub(crate) fn unknown(message: impl Into<String>) -> Self {
        Self::Unknown(ServerError {
            message: message.into(),
            details: None,
        })
    }

    /// Return an [`Error::Internal`] variant with the specified message
    pub(crate) fn internal(message: impl Into<String>) -> Self {
        Self::Internal(ServerError {
            message: message.into(),
            details: None,
        })
    }

    /// Return an [`Error::Client`] variant wrapping the given error
    pub(crate) fn client<E: std::error::Error + Send + Sync + 'static>(e: E) -> Self {
        Self::Client(Box::new(e))
    }

    /// Return [`Error::InvalidArgument`] specifying an error in `field_name`
    pub(crate) fn invalid_argument(
        field_name: impl Into<String>,
        description: impl Into<String>,
    ) -> Self {
        let field_name = field_name.into();
        let description = description.into();
        Self::InvalidArgument(ServerError {
            message: format!("Invalid argument for '{}': {}", field_name, description),
            // also surface the structured violation so callers can
            // inspect which field was at fault
            details: Some(FieldViolation {
                field: field_name,
                description,
            }),
        })
    }
}
/// Translates a reqwest response to an Error
pub(crate) async fn translate_response(response: reqwest::Response) -> Result<(), Error> {
let status = response.status();
if status.is_success() {
Ok(())
} else if status.is_server_error() {
Err(Error::internal(response_description(response).await))
} else {
// todo would be nice to check for 404, etc and return more specific errors
Err(Error::unknown(response_description(response).await))
}
}
/// Makes as detailed an error message as possible
///
/// Includes the response body text when it can be read; otherwise
/// falls back to just the HTTP status code.
async fn response_description(response: reqwest::Response) -> String {
    // Capture the status first: reading the body consumes the response.
    let status = response.status();

    if let Ok(body) = response.text().await {
        format!("(status {status}): {body}")
    } else {
        format!("status: {status}")
    }
}

View File

@ -21,6 +21,7 @@ use ::generated_types::influxdata::iox::{
ingester::v1::{IngesterQueryRequest, IngesterQueryResponseMetadata},
querier::v1::{AppMetadata, ReadInfo},
};
use client_util::connection::{Connection, GrpcConnection};
use futures_util::stream;
use futures_util::stream::StreamExt;
use prost::Message;
@ -39,7 +40,6 @@ use arrow_flight::{
};
use super::Error;
use crate::connection::Connection;
use rand::Rng;
/// Metadata that can be send during flight requests.
@ -67,7 +67,7 @@ pub struct Client<T>
where
T: ClientMetadata,
{
inner: FlightServiceClient<Connection>,
inner: FlightServiceClient<GrpcConnection>,
_phantom: PhantomData<T>,
}
@ -76,9 +76,9 @@ where
T: ClientMetadata,
{
/// Creates a new client with the provided connection
pub fn new(channel: Connection) -> Self {
pub fn new(connection: Connection) -> Self {
Self {
inner: FlightServiceClient::new(channel),
inner: FlightServiceClient::new(connection.into_grpc_connection()),
_phantom: PhantomData::default(),
}
}

View File

@ -118,9 +118,9 @@ pub struct Client {
impl Client {
/// Creates a new client with the provided connection
pub fn new(channel: Connection) -> Self {
pub fn new(connection: Connection) -> Self {
Self {
inner: LowLevelClient::new(channel),
inner: LowLevelClient::new(connection),
}
}

View File

@ -2,7 +2,7 @@ use generated_types::google::FieldViolation;
use generated_types::grpc::health::v1::*;
use crate::connection::Connection;
use crate::connection::{Connection, GrpcConnection};
use crate::error::Error;
/// A client for the gRPC health checking API
@ -10,14 +10,14 @@ use crate::error::Error;
/// Allows checking the status of a given service
#[derive(Debug)]
pub struct Client {
inner: health_client::HealthClient<Connection>,
inner: health_client::HealthClient<GrpcConnection>,
}
impl Client {
/// Creates a new client with the provided connection
pub fn new(channel: Connection) -> Self {
Self {
inner: health_client::HealthClient::new(channel),
inner: health_client::HealthClient::new(channel.into_grpc_connection()),
}
}

View File

@ -1,3 +1,5 @@
use client_util::connection::GrpcConnection;
use self::generated_types::{namespace_service_client::NamespaceServiceClient, *};
use crate::connection::Connection;
use crate::error::Error;
@ -10,14 +12,14 @@ pub mod generated_types {
/// A basic client for fetching the Schema for a Namespace.
#[derive(Debug, Clone)]
pub struct Client {
inner: NamespaceServiceClient<Connection>,
inner: NamespaceServiceClient<GrpcConnection>,
}
impl Client {
/// Creates a new client with the provided connection
pub fn new(channel: Connection) -> Self {
pub fn new(connection: Connection) -> Self {
Self {
inner: NamespaceServiceClient::new(channel),
inner: NamespaceServiceClient::new(connection.into_grpc_connection()),
}
}

View File

@ -1,5 +1,6 @@
use self::generated_types::{schema_service_client::SchemaServiceClient, *};
use ::generated_types::google::OptionalField;
use client_util::connection::GrpcConnection;
use crate::connection::Connection;
use crate::error::Error;
@ -12,14 +13,14 @@ pub mod generated_types {
/// A basic client for fetching the Schema for a Namespace.
#[derive(Debug, Clone)]
pub struct Client {
inner: SchemaServiceClient<Connection>,
inner: SchemaServiceClient<GrpcConnection>,
}
impl Client {
/// Creates a new client with the provided connection
pub fn new(channel: Connection) -> Self {
pub fn new(connection: Connection) -> Self {
Self {
inner: SchemaServiceClient::new(channel),
inner: SchemaServiceClient::new(connection.into_grpc_connection()),
}
}

View File

@ -3,6 +3,7 @@ use self::generated_types::{object_store_service_client::ObjectStoreServiceClien
use crate::connection::Connection;
use crate::error::Error;
use client_util::connection::GrpcConnection;
use futures_util::stream::BoxStream;
use tonic::Status;
@ -14,14 +15,14 @@ pub mod generated_types {
/// A basic client for interacting the a remote catalog.
#[derive(Debug, Clone)]
pub struct Client {
inner: ObjectStoreServiceClient<Connection>,
inner: ObjectStoreServiceClient<GrpcConnection>,
}
impl Client {
/// Creates a new client with the provided connection
pub fn new(channel: Connection) -> Self {
pub fn new(connection: Connection) -> Self {
Self {
inner: ObjectStoreServiceClient::new(channel),
inner: ObjectStoreServiceClient::new(connection.into_grpc_connection()),
}
}

View File

@ -1,3 +1,4 @@
use client_util::connection::GrpcConnection;
/// Re-export generated_types
use generated_types::{i_ox_testing_client::IOxTestingClient, TestErrorRequest};
@ -30,14 +31,14 @@ use crate::error::Error;
/// ```
#[derive(Debug, Clone)]
pub struct Client {
inner: IOxTestingClient<Connection>,
inner: IOxTestingClient<GrpcConnection>,
}
impl Client {
/// Creates a new client with the provided connection
pub fn new(channel: Connection) -> Self {
pub fn new(connection: Connection) -> Self {
Self {
inner: IOxTestingClient::new(channel),
inner: IOxTestingClient::new(connection.into_grpc_connection()),
}
}

View File

@ -3,10 +3,13 @@ pub mod generated_types {
pub use generated_types::influxdata::pbdata::v1::*;
}
use self::generated_types::write_service_client::WriteServiceClient;
use client_util::{connection::HttpConnection, namespace_translation::split_namespace};
use crate::connection::Connection;
use crate::error::Error;
use crate::{
connection::Connection,
error::{translate_response, Error},
};
use reqwest::Method;
/// An IOx Write API client.
///
@ -19,7 +22,7 @@ use crate::error::Error;
/// };
///
/// let mut connection = Builder::default()
/// .build("http://127.0.0.1:8082")
/// .build("http://127.0.0.1:8080")
/// .await
/// .unwrap();
///
@ -27,60 +30,88 @@ use crate::error::Error;
///
/// // write a line of line procol data
/// client
/// .write_lp("bananas", "cpu,region=west user=23.2 100",0)
/// .write_lp("bananas", "cpu,region=west user=23.2 100")
/// .await
/// .expect("failed to write to IOx");
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct Client {
inner: WriteServiceClient<Connection>,
inner: HttpConnection,
}
impl Client {
/// Creates a new client with the provided connection
pub fn new(channel: Connection) -> Self {
pub fn new(connection: Connection) -> Self {
Self {
inner: WriteServiceClient::new(channel),
inner: connection.into_http_connection(),
}
}
/// Write the [LineProtocol] formatted data in `lp_data` to
/// database `name`. Lines without a timestamp will be assigned `default_time`
/// namespace `namespace`.
///
/// Returns the number of lines which were parsed and written to the database
/// Returns the number of bytes which were written to the database
///
/// [LineProtocol]: https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format
#[cfg(feature = "write_lp")]
pub async fn write_lp(
&mut self,
db_name: impl AsRef<str> + Send,
lp_data: impl AsRef<str> + Send,
default_time: i64,
namespace: impl AsRef<str> + Send,
lp_data: impl Into<String> + Send,
) -> Result<usize, Error> {
let tables = mutable_batch_lp::lines_to_batches(lp_data.as_ref(), default_time)
.map_err(|e| Error::Client(Box::new(e)))?;
let lp_data = lp_data.into();
let data_len = lp_data.len();
let meta = dml::DmlMeta::unsequenced(None);
let write = dml::DmlWrite::new(db_name.as_ref().to_string(), tables, None, meta);
let lines = write.tables().map(|(_, table)| table.rows()).sum();
let write_url = format!("{}api/v2/write", self.inner.uri());
let database_batch = mutable_batch_pb::encode::encode_write(db_name.as_ref(), &write);
let (org_id, bucket_id) = split_namespace(namespace.as_ref()).map_err(|e| {
Error::invalid_argument(
"namespace",
format!("Could not find valid org_id and bucket_id: {}", e),
)
})?;
self.inner
.write(generated_types::WriteRequest {
database_batch: Some(database_batch),
})
.await?;
let response = self
.inner
.client()
.request(Method::POST, &write_url)
.query(&[("bucket", bucket_id), ("org", org_id)])
.body(lp_data)
.send()
.await
.map_err(Error::client)?;
Ok(lines)
}
translate_response(response).await?;
/// Write a protobuf batch.
pub async fn write_pb(
&mut self,
write_request: generated_types::WriteRequest,
) -> Result<tonic::Response<generated_types::WriteResponse>, Error> {
Ok(self.inner.write(write_request).await?)
Ok(data_len)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::connection::Builder;
#[tokio::test]
/// Ensure the basic plumbing is hooked up correctly
async fn basic() {
let url = mockito::server_url();
let connection = Builder::new().build(&url).await.unwrap();
let namespace = "orgname_bucketname";
let data = "m,t=foo f=4";
let m = mockito::mock("POST", "/api/v2/write?bucket=bucketname&org=orgname")
.with_status(201)
.match_body(data)
.create();
let res = Client::new(connection).write_lp(namespace, data).await;
m.assert();
let num_bytes = res.expect("Error making write request");
assert_eq!(num_bytes, 11);
}
}

View File

@ -1,3 +1,5 @@
use client_util::connection::GrpcConnection;
use self::generated_types::{write_info_service_client::WriteInfoServiceClient, *};
use crate::connection::Connection;
@ -22,14 +24,14 @@ pub mod generated_types {
/// <https://github.com/influxdata/influxdb_iox/issues/4354>
#[derive(Debug, Clone)]
pub struct Client {
inner: WriteInfoServiceClient<Connection>,
inner: WriteInfoServiceClient<GrpcConnection>,
}
impl Client {
/// Creates a new client with the provided connection
pub fn new(channel: Connection) -> Self {
pub fn new(connection: Connection) -> Self {
Self {
inner: WriteInfoServiceClient::new(channel),
inner: WriteInfoServiceClient::new(connection.into_grpc_connection()),
}
}

View File

@ -22,6 +22,7 @@ pub use generated_types::{google, protobuf_type_url, protobuf_type_url_eq};
pub use client::*;
pub use client_util::connection;
pub use client_util::namespace_translation;
#[cfg(feature = "format")]
/// Output formatting utilities

View File

@ -17,6 +17,7 @@
)]
#![allow(clippy::missing_docs_in_private_items)]
use client_util::connection::GrpcConnection;
use futures_util::TryStreamExt;
use prost::Message;
use std::collections::HashMap;
@ -75,14 +76,14 @@ impl OrgAndBucket {
/// A client for the InfluxDB gRPC storage API
#[derive(Debug)]
pub struct Client {
inner: storage_client::StorageClient<Connection>,
inner: storage_client::StorageClient<GrpcConnection>,
}
impl Client {
/// Creates a new client with the provided connection
pub fn new(channel: Connection) -> Self {
pub fn new(connection: Connection) -> Self {
Self {
inner: storage_client::StorageClient::new(channel),
inner: storage_client::StorageClient::new(connection.into_grpc_connection()),
}
}

View File

@ -1614,7 +1614,7 @@ mod tests {
use futures::Future;
use generated_types::{i_ox_testing_client::IOxTestingClient, tag_key_predicate::Value};
use influxdb_storage_client::{
connection::{Builder as ConnectionBuilder, Connection},
connection::{Builder as ConnectionBuilder, GrpcConnection},
generated_types::*,
Client as StorageClient, OrgAndBucket,
};
@ -3482,7 +3482,7 @@ mod tests {
// Wrapper around raw clients and test database
struct Fixture {
iox_client: IOxTestingClient<Connection>,
iox_client: IOxTestingClient<GrpcConnection>,
storage_client: StorageClient,
test_storage: Arc<TestDatabaseStore>,
join_handle: JoinHandle<()>,
@ -3542,7 +3542,7 @@ mod tests {
.await
.unwrap();
let iox_client = IOxTestingClient::new(conn.clone());
let iox_client = IOxTestingClient::new(conn.clone().into_grpc_connection());
let storage_client = StorageClient::new(conn);

View File

@ -13,7 +13,7 @@ futures = "0.3"
generated_types = { path = "../generated_types" }
http = "0.2.8"
hyper = "0.14"
influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format", "write_lp"] }
influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format"] }
nix = "0.25"
observability_deps = { path = "../observability_deps" }
once_cell = { version = "1.15.0", features = ["parking_lot"] }

View File

@ -6,7 +6,7 @@ use hyper::{Body, Client, Request};
use influxdb_iox_client::{
connection::Connection,
flight::generated_types::ReadInfo,
write::generated_types::{DatabaseBatch, TableBatch, WriteRequest, WriteResponse},
write::generated_types::WriteResponse,
write_info::generated_types::{merge_responses, GetWriteInfoResponse, ShardStatus},
};
use observability_deps::tracing::info;
@ -39,27 +39,6 @@ pub async fn write_to_router(
.expect("http error sending write")
}
/// Writes the table batch to the gRPC write API on the router into the org/bucket (typically on
/// the router)
pub async fn write_to_router_grpc(
table_batches: Vec<TableBatch>,
namespace: impl Into<String>,
router_connection: Connection,
) -> tonic::Response<WriteResponse> {
let request = WriteRequest {
database_batch: Some(DatabaseBatch {
database_name: namespace.into(),
table_batches,
partition_key: Default::default(),
}),
};
influxdb_iox_client::write::Client::new(router_connection)
.write_pb(request)
.await
.expect("grpc error sending write")
}
/// Extracts the write token from the specified response (to the /api/v2/write api)
pub fn get_write_token(response: &Response<Body>) -> String {
let message = format!("no write token in {:?}", response);

View File

@ -1,12 +1,12 @@
use crate::{
dump_log_to_stdout, log_command, rand_id, write_to_router, write_to_router_grpc, ServerFixture,
TestConfig, TestServer,
dump_log_to_stdout, log_command, rand_id, write_to_router, ServerFixture, TestConfig,
TestServer,
};
use assert_cmd::prelude::*;
use futures::{stream::FuturesOrdered, StreamExt};
use http::Response;
use hyper::Body;
use influxdb_iox_client::write::generated_types::{TableBatch, WriteResponse};
use influxdb_iox_client::connection::GrpcConnection;
use observability_deps::tracing::{debug, info};
use once_cell::sync::Lazy;
use std::{
@ -248,19 +248,6 @@ impl MiniCluster {
.await
}
/// Writes the table batch to the gRPC write API on the router into the org/bucket
pub async fn write_to_router_grpc(
&self,
table_batches: Vec<TableBatch>,
) -> tonic::Response<WriteResponse> {
write_to_router_grpc(
table_batches,
&self.namespace,
self.router().router_grpc_connection(),
)
.await
}
/// Get a reference to the mini cluster's other servers.
pub fn other_servers(&self) -> &[ServerFixture] {
self.other_servers.as_ref()
@ -311,6 +298,18 @@ impl MiniCluster {
command.ok().unwrap();
dump_log_to_stdout("compactor run-once", &log_path);
}
/// Create a storage client connected to the querier member of the cluster
pub fn querier_storage_client(
&self,
) -> generated_types::storage_client::StorageClient<GrpcConnection> {
let grpc_connection = self
.querier()
.querier_grpc_connection()
.into_grpc_connection();
generated_types::storage_client::StorageClient::new(grpc_connection)
}
}
/// holds shared server processes to share across tests

View File

@ -1,12 +1,11 @@
use crate::{
get_write_token, get_write_token_from_grpc, run_query, token_is_persisted, wait_for_persisted,
wait_for_readable, MiniCluster,
get_write_token, run_query, token_is_persisted, wait_for_persisted, wait_for_readable,
MiniCluster,
};
use arrow::record_batch::RecordBatch;
use arrow_util::assert_batches_sorted_eq;
use futures::future::BoxFuture;
use http::StatusCode;
use influxdb_iox_client::write::generated_types::TableBatch;
use observability_deps::tracing::info;
/// Test harness for end to end tests that are comprised of several steps
@ -75,9 +74,6 @@ pub enum Step {
/// endpoint, assert the data was written successfully
WriteLineProtocol(String),
/// Writes the specified `TableBatch`es to the gRPC write API
WriteTableBatches(Vec<TableBatch>),
/// Wait for all previously written data to be readable
WaitForReadable,
@ -155,13 +151,6 @@ impl<'a> StepTest<'a> {
info!("====Done writing line protocol, got token {}", write_token);
state.write_tokens.push(write_token);
}
Step::WriteTableBatches(table_batches) => {
info!("====Begin writing TableBatches to gRPC API");
let response = state.cluster.write_to_router_grpc(table_batches).await;
let write_token = get_write_token_from_grpc(&response);
info!("====Done writing TableBatches, got token {}", write_token);
state.write_tokens.push(write_token);
}
Step::WaitForReadable => {
info!("====Begin waiting for all write tokens to be readable");
let querier_grpc_connection =