From 9e27709c84a96b9b376a61477e26f28ae98208d0 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 13 Feb 2023 16:36:59 -0500 Subject: [PATCH] test: Add an e2e test of just the ingester's API --- Cargo.lock | 3 ++ .../tests/end_to_end_cases/ingester.rs | 10 +++-- test_helpers_end_to_end/Cargo.toml | 3 ++ test_helpers_end_to_end/src/client.rs | 44 ++++++++++++++++++- test_helpers_end_to_end/src/mini_cluster.rs | 15 ++++++- 5 files changed, 69 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 18ae8dfbbf..daeacbaa91 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5766,11 +5766,14 @@ dependencies = [ "assert_cmd", "bytes", "data_types", + "dml", "futures", "generated_types", "http", "hyper", "influxdb_iox_client", + "mutable_batch_lp", + "mutable_batch_pb", "nix 0.26.2", "observability_deps", "once_cell", diff --git a/influxdb_iox/tests/end_to_end_cases/ingester.rs b/influxdb_iox/tests/end_to_end_cases/ingester.rs index bde89e437f..cf49418155 100644 --- a/influxdb_iox/tests/end_to_end_cases/ingester.rs +++ b/influxdb_iox/tests/end_to_end_cases/ingester.rs @@ -87,11 +87,15 @@ async fn ingester_flight_api() { // Don't use a shared cluster because the ingester is going to be restarted let mut cluster = MiniCluster::create_non_shared2(database_url).await; - // Write some data into the v2 HTTP API ============== + // Write some data into the v2 HTTP API to set up the namespace and schema ============== let lp = format!("{table_name},tag1=A,tag2=B val=42i 123456"); let response = cluster.write_to_router(lp).await; assert_eq!(response.status(), StatusCode::NO_CONTENT); + // Write some data directly into the ingester through its gRPC API + let lp = format!("{table_name},tag1=B,tag2=A val=84i 1234567"); + cluster.write_to_ingester(lp, table_name).await; + // query the ingester let query = IngesterQueryRequest::new( cluster.namespace_id().await, @@ -112,6 +116,7 @@ async fn ingester_flight_api() { "| tag1 | tag2 | time | val |", "+------+------+--------------------------------+-----+", "| A | B | 1970-01-01T00:00:00.000123456Z | 42 |", + "| B | A | 1970-01-01T00:00:00.001234567Z | 84 |", "+------+------+--------------------------------+-----+", ]; assert_batches_sorted_eq!(&expected, &ingester_response.record_batches); @@ -136,8 +141,7 @@ async fn ingester_flight_api() { // Populate the ingester with some data so it returns a successful // response containing the UUID. let lp = format!("{table_name},tag1=A,tag2=B val=42i 123456"); - let response = cluster.write_to_router(lp).await; - assert_eq!(response.status(), StatusCode::NO_CONTENT); + cluster.write_to_ingester(lp, table_name).await; // Query for the new UUID and assert it has changed. let ingester_response = cluster.query_ingester(query).await.unwrap(); diff --git a/test_helpers_end_to_end/Cargo.toml b/test_helpers_end_to_end/Cargo.toml index 8ef7949b63..970d36bdaf 100644 --- a/test_helpers_end_to_end/Cargo.toml +++ b/test_helpers_end_to_end/Cargo.toml @@ -12,11 +12,14 @@ arrow_util = { path = "../arrow_util" } assert_cmd = "2.0.8" bytes = "1.4" data_types = { path = "../data_types" } +dml = { path = "../dml" } futures = "0.3" generated_types = { path = "../generated_types" } http = "0.2.8" hyper = "0.14" influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format"] } +mutable_batch_lp = { path = "../mutable_batch_lp" } +mutable_batch_pb = { path = "../mutable_batch_pb" } nix = "0.26" observability_deps = { path = "../observability_deps" } once_cell = { version = "1.17", features = ["parking_lot"] } diff --git a/test_helpers_end_to_end/src/client.rs b/test_helpers_end_to_end/src/client.rs index aecd154405..818f1795cb 100644 --- a/test_helpers_end_to_end/src/client.rs +++ b/test_helpers_end_to_end/src/client.rs @@ -1,9 +1,17 @@ //! Client helpers for writing end to end ng tests use arrow::record_batch::RecordBatch; +use data_types::{NamespaceId, TableId}; +use dml::{DmlMeta, DmlWrite}; use futures::TryStreamExt; use http::Response; use hyper::{Body, Client, Request}; -use influxdb_iox_client::connection::Connection; +use influxdb_iox_client::{ + connection::Connection, + ingester::generated_types::{write_service_client::WriteServiceClient, WriteRequest}, +}; +use mutable_batch_lp::lines_to_batches; +use mutable_batch_pb::encode::encode_write; +use tonic::IntoRequest; /// Writes the line protocol to the write_base/api/v2/write endpoint (typically on the router) pub async fn write_to_router( @@ -32,6 +40,40 @@ pub async fn write_to_router( .expect("http error sending write") } +/// Writes the line protocol to the WriteService endpoint (typically on the ingester) +pub async fn write_to_ingester( + line_protocol: impl Into, + namespace_id: NamespaceId, + table_id: TableId, + ingester_connection: Connection, +) { + let line_protocol = line_protocol.into(); + let writes = lines_to_batches(&line_protocol, 0).unwrap(); + let writes = writes + .into_iter() + .map(|(_name, data)| (table_id, data)) + .collect(); + + let mut client = WriteServiceClient::new(ingester_connection.into_grpc_connection()); + + let op = DmlWrite::new( + namespace_id, + writes, + "1970-01-01".into(), + DmlMeta::unsequenced(None), + ); + + client + .write( + tonic::Request::new(WriteRequest { + payload: Some(encode_write(namespace_id.get(), &op)), + }) + .into_request(), + ) + .await + .unwrap(); +} + /// Runs a SQL query using the flight API on the specified connection. pub async fn try_run_sql( sql_query: impl Into, diff --git a/test_helpers_end_to_end/src/mini_cluster.rs b/test_helpers_end_to_end/src/mini_cluster.rs index 07b5cb1f0d..4ff27bf09b 100644 --- a/test_helpers_end_to_end/src/mini_cluster.rs +++ b/test_helpers_end_to_end/src/mini_cluster.rs @@ -1,6 +1,6 @@ use crate::{ - dump_log_to_stdout, log_command, rand_id, write_to_router, ServerFixture, TestConfig, - TestServer, + dump_log_to_stdout, log_command, rand_id, write_to_ingester, write_to_router, ServerFixture, + TestConfig, TestServer, }; use arrow::{datatypes::SchemaRef, record_batch::RecordBatch}; use arrow_flight::{ @@ -372,6 +372,17 @@ impl MiniCluster { .await } + /// Write to the ingester using the gRPC interface directly, rather than through a router. + pub async fn write_to_ingester(&self, line_protocol: impl Into, table_name: &str) { + write_to_ingester( + line_protocol, + self.namespace_id().await, + self.table_id(table_name).await, + self.ingester().ingester_grpc_connection(), + ) + .await; + } + /// Query the ingester using flight directly, rather than through a querier. pub async fn query_ingester( &self,