test: Add an e2e test of just the ingester's API

commit 9e27709c84 (pull/24376/head)
parent 13ce6da3df
Carol (Nichols || Goulding), 2023-02-13 16:36:59 -05:00
5 changed files with 69 additions and 6 deletions

Cargo.lock (generated)

@@ -5766,11 +5766,14 @@ dependencies = [
  "assert_cmd",
  "bytes",
  "data_types",
+ "dml",
  "futures",
  "generated_types",
  "http",
  "hyper",
  "influxdb_iox_client",
+ "mutable_batch_lp",
+ "mutable_batch_pb",
  "nix 0.26.2",
  "observability_deps",
  "once_cell",


@@ -87,11 +87,15 @@ async fn ingester_flight_api() {
     // Don't use a shared cluster because the ingester is going to be restarted
     let mut cluster = MiniCluster::create_non_shared2(database_url).await;

-    // Write some data into the v2 HTTP API ==============
+    // Write some data into the v2 HTTP API to set up the namespace and schema ==============
     let lp = format!("{table_name},tag1=A,tag2=B val=42i 123456");
     let response = cluster.write_to_router(lp).await;
     assert_eq!(response.status(), StatusCode::NO_CONTENT);
+
+    // Write some data directly into the ingester through its gRPC API
+    let lp = format!("{table_name},tag1=B,tag2=A val=84i 1234567");
+    cluster.write_to_ingester(lp, table_name).await;

     // query the ingester
     let query = IngesterQueryRequest::new(
         cluster.namespace_id().await,
@@ -112,6 +116,7 @@ async fn ingester_flight_api() {
         "| tag1 | tag2 | time                           | val |",
         "+------+------+--------------------------------+-----+",
         "| A    | B    | 1970-01-01T00:00:00.000123456Z | 42  |",
+        "| B    | A    | 1970-01-01T00:00:00.001234567Z | 84  |",
         "+------+------+--------------------------------+-----+",
     ];
     assert_batches_sorted_eq!(&expected, &ingester_response.record_batches);
@@ -136,8 +141,7 @@ async fn ingester_flight_api() {
     // Populate the ingester with some data so it returns a successful
     // response containing the UUID.
     let lp = format!("{table_name},tag1=A,tag2=B val=42i 123456");
-    let response = cluster.write_to_router(lp).await;
-    assert_eq!(response.status(), StatusCode::NO_CONTENT);
+    cluster.write_to_ingester(lp, table_name).await;

     // Query for the new UUID and assert it has changed.
     let ingester_response = cluster.query_ingester(query).await.unwrap();


@@ -12,11 +12,14 @@ arrow_util = { path = "../arrow_util" }
 assert_cmd = "2.0.8"
 bytes = "1.4"
 data_types = { path = "../data_types" }
+dml = { path = "../dml" }
 futures = "0.3"
 generated_types = { path = "../generated_types" }
 http = "0.2.8"
 hyper = "0.14"
 influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format"] }
+mutable_batch_lp = { path = "../mutable_batch_lp" }
+mutable_batch_pb = { path = "../mutable_batch_pb" }
 nix = "0.26"
 observability_deps = { path = "../observability_deps" }
 once_cell = { version = "1.17", features = ["parking_lot"] }


@@ -1,9 +1,17 @@
 //! Client helpers for writing end to end ng tests
 use arrow::record_batch::RecordBatch;
+use data_types::{NamespaceId, TableId};
+use dml::{DmlMeta, DmlWrite};
 use futures::TryStreamExt;
 use http::Response;
 use hyper::{Body, Client, Request};
-use influxdb_iox_client::connection::Connection;
+use influxdb_iox_client::{
+    connection::Connection,
+    ingester::generated_types::{write_service_client::WriteServiceClient, WriteRequest},
+};
+use mutable_batch_lp::lines_to_batches;
+use mutable_batch_pb::encode::encode_write;
+use tonic::IntoRequest;

 /// Writes the line protocol to the write_base/api/v2/write endpoint (typically on the router)
 pub async fn write_to_router(
@@ -32,6 +40,40 @@ pub async fn write_to_router(
         .expect("http error sending write")
 }

+/// Writes the line protocol to the WriteService endpoint (typically on the ingester)
+pub async fn write_to_ingester(
+    line_protocol: impl Into<String>,
+    namespace_id: NamespaceId,
+    table_id: TableId,
+    ingester_connection: Connection,
+) {
+    let line_protocol = line_protocol.into();
+    let writes = lines_to_batches(&line_protocol, 0).unwrap();
+    let writes = writes
+        .into_iter()
+        .map(|(_name, data)| (table_id, data))
+        .collect();
+
+    let mut client = WriteServiceClient::new(ingester_connection.into_grpc_connection());
+
+    let op = DmlWrite::new(
+        namespace_id,
+        writes,
+        "1970-01-01".into(),
+        DmlMeta::unsequenced(None),
+    );
+
+    client
+        .write(
+            tonic::Request::new(WriteRequest {
+                payload: Some(encode_write(namespace_id.get(), &op)),
+            })
+            .into_request(),
+        )
+        .await
+        .unwrap();
+}
+
 /// Runs a SQL query using the flight API on the specified connection.
 pub async fn try_run_sql(
     sql_query: impl Into<String>,
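A quick usage note: the new helper takes line protocol plus explicit catalog IDs, so a test that already knows its namespace and table IDs can hit the ingester's WriteService without involving a router at all. Below is a minimal, hypothetical call site; the `namespace_id`, `table_id`, and `ingester_connection` values are assumed to come from existing test setup, and the `MiniCluster::write_to_ingester` wrapper in the next file packages exactly this pattern.

    // Hypothetical call site for the new helper (not part of this commit);
    // all three inputs are assumed to be provided by the surrounding harness.
    let lp = format!("{table_name},tag1=B,tag2=A val=84i 1234567");
    write_to_ingester(lp, namespace_id, table_id, ingester_connection).await;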


@@ -1,6 +1,6 @@
 use crate::{
-    dump_log_to_stdout, log_command, rand_id, write_to_router, ServerFixture, TestConfig,
-    TestServer,
+    dump_log_to_stdout, log_command, rand_id, write_to_ingester, write_to_router, ServerFixture,
+    TestConfig, TestServer,
 };
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
 use arrow_flight::{
@@ -372,6 +372,17 @@ impl MiniCluster {
             .await
     }

+    /// Write to the ingester using the gRPC interface directly, rather than through a router.
+    pub async fn write_to_ingester(&self, line_protocol: impl Into<String>, table_name: &str) {
+        write_to_ingester(
+            line_protocol,
+            self.namespace_id().await,
+            self.table_id(table_name).await,
+            self.ingester().ingester_grpc_connection(),
+        )
+        .await;
+    }
+
     /// Query the ingester using flight directly, rather than through a querier.
     pub async fn query_ingester(
         &self,
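Taken together, the intended end-to-end flow is: one write through the router to create the namespace and schema, any number of writes straight to the ingester's gRPC WriteService, then a Flight query against the ingester. A condensed sketch of that flow, mirroring the `ingester_flight_api` test above (the `query` construction and the expected-batch assertion are elided; see the test diff for the full version):

    // Condensed from the ingester_flight_api test in this commit; not a complete test.
    let mut cluster = MiniCluster::create_non_shared2(database_url).await;

    // One write through the router's v2 HTTP API sets up the namespace and schema.
    let lp = format!("{table_name},tag1=A,tag2=B val=42i 123456");
    let response = cluster.write_to_router(lp).await;
    assert_eq!(response.status(), StatusCode::NO_CONTENT);

    // Later writes can go straight to the ingester's WriteService gRPC API.
    let lp = format!("{table_name},tag1=B,tag2=A val=84i 1234567");
    cluster.write_to_ingester(lp, table_name).await;

    // Querying the ingester over Flight now returns both rows
    // (`query` is built as in the test above).
    let ingester_response = cluster.query_ingester(query).await.unwrap();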