fix: unknown namespace/table in querier<>ingester flight protocol (#4307)
* fix: return "not found" gRPC error instead of "internal" when ingester does not know table
* fix: properly handle "namespace not found" in ingester queries
* fix: make `initialize_db` work with async code
* test: add custom step for NG tests
* fix: handle "unknown table/namespace" resp. in querier
* docs: explain test setup

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

pull/24376/head
parent
136e052e33
commit
351b0d0c15
|
@ -85,3 +85,80 @@ async fn ingester_flight_api() {
|
|||
);
|
||||
});
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn ingester_flight_api_namespace_not_found() {
|
||||
let database_url = maybe_skip_integration!();
|
||||
|
||||
let table_name = "mytable";
|
||||
|
||||
// Set up cluster
|
||||
let router2_config = TestConfig::new_router2(&database_url);
|
||||
let ingester_config = TestConfig::new_ingester(&router2_config);
|
||||
let cluster = MiniCluster::new().with_ingester(ingester_config).await;
|
||||
|
||||
let mut querier_flight = influxdb_iox_client::flight::Client::<
|
||||
influxdb_iox_client::flight::generated_types::IngesterQueryRequest,
|
||||
>::new(cluster.ingester().ingester_grpc_connection());
|
||||
|
||||
let query = IngesterQueryRequest::new(
|
||||
String::from("does_not_exist"),
|
||||
table_name.into(),
|
||||
vec![],
|
||||
Some(::predicate::EMPTY_PREDICATE),
|
||||
);
|
||||
|
||||
let err = querier_flight
|
||||
.perform_query(query.try_into().unwrap())
|
||||
.await
|
||||
.unwrap_err();
|
||||
if let influxdb_iox_client::flight::Error::GrpcError(status) = err {
|
||||
assert_eq!(status.code(), tonic::Code::NotFound);
|
||||
} else {
|
||||
panic!("Wrong error variant: {err}")
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn ingester_flight_api_table_not_found() {
|
||||
let database_url = maybe_skip_integration!();
|
||||
|
||||
// Set up cluster
|
||||
let router2_config = TestConfig::new_router2(&database_url);
|
||||
let ingester_config = TestConfig::new_ingester(&router2_config);
|
||||
let cluster = MiniCluster::new()
|
||||
.with_router2(router2_config)
|
||||
.await
|
||||
.with_ingester(ingester_config)
|
||||
.await;
|
||||
|
||||
// Write some data into the v2 HTTP API ==============
|
||||
let lp = String::from("my_table,tag1=A,tag2=B val=42i 123456");
|
||||
let response = cluster.write_to_router(lp).await;
|
||||
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
// wait for the write to become visible
|
||||
let write_token = get_write_token(&response);
|
||||
wait_for_readable(write_token, cluster.ingester().ingester_grpc_connection()).await;
|
||||
|
||||
let mut querier_flight = influxdb_iox_client::flight::Client::<
|
||||
influxdb_iox_client::flight::generated_types::IngesterQueryRequest,
|
||||
>::new(cluster.ingester().ingester_grpc_connection());
|
||||
|
||||
let query = IngesterQueryRequest::new(
|
||||
cluster.namespace().to_string(),
|
||||
String::from("does_not_exist"),
|
||||
vec![],
|
||||
Some(::predicate::EMPTY_PREDICATE),
|
||||
);
|
||||
|
||||
let err = querier_flight
|
||||
.perform_query(query.try_into().unwrap())
|
||||
.await
|
||||
.unwrap_err();
|
||||
if let influxdb_iox_client::flight::Error::GrpcError(status) = err {
|
||||
assert_eq!(status.code(), tonic::Code::NotFound);
|
||||
} else {
|
||||
panic!("Wrong error variant: {err}")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
use futures::FutureExt;
|
||||
use test_helpers_end_to_end_ng::{maybe_skip_integration, MiniCluster, Step, StepTest, TestConfig};
|
||||
|
||||
#[tokio::test]
|
||||
|
@ -11,7 +12,7 @@ async fn basic_ingester() {
|
|||
let querier_config = TestConfig::new_querier(&ingester_config);
|
||||
|
||||
// Set up the cluster ====================================
|
||||
let cluster = MiniCluster::new()
|
||||
let mut cluster = MiniCluster::new()
|
||||
.with_router2(router2_config)
|
||||
.await
|
||||
.with_ingester(ingester_config)
|
||||
|
@ -20,7 +21,7 @@ async fn basic_ingester() {
|
|||
.await;
|
||||
|
||||
StepTest::new(
|
||||
&cluster,
|
||||
&mut cluster,
|
||||
vec![
|
||||
Step::WriteLineProtocol(format!(
|
||||
"{},tag1=A,tag2=B val=42i 123456\n\
|
||||
|
@ -58,7 +59,7 @@ async fn basic_on_parquet() {
|
|||
let querier_config = TestConfig::new_querier(&ingester_config);
|
||||
|
||||
// Set up the cluster ====================================
|
||||
let cluster = MiniCluster::new()
|
||||
let mut cluster = MiniCluster::new()
|
||||
.with_router2(router2_config)
|
||||
.await
|
||||
.with_ingester(ingester_config)
|
||||
|
@ -67,7 +68,7 @@ async fn basic_on_parquet() {
|
|||
.await;
|
||||
|
||||
StepTest::new(
|
||||
&cluster,
|
||||
&mut cluster,
|
||||
vec![
|
||||
Step::WriteLineProtocol(format!("{},tag1=A,tag2=B val=42i 123456", table_name)),
|
||||
// Wait for data to be persisted to parquet
|
||||
|
@ -102,7 +103,7 @@ async fn basic_no_ingster_connection() {
|
|||
let querier_config = TestConfig::new_querier_without_ingester(&ingester_config);
|
||||
|
||||
// Set up the cluster ====================================
|
||||
let cluster = MiniCluster::new()
|
||||
let mut cluster = MiniCluster::new()
|
||||
.with_router2(router2_config)
|
||||
.await
|
||||
.with_ingester(ingester_config)
|
||||
|
@ -112,7 +113,7 @@ async fn basic_no_ingster_connection() {
|
|||
|
||||
// Write some data into the v2 HTTP API ==============
|
||||
StepTest::new(
|
||||
&cluster,
|
||||
&mut cluster,
|
||||
vec![
|
||||
Step::WriteLineProtocol(format!("{},tag1=A,tag2=B val=42i 123456", table_name)),
|
||||
// Wait for data to be persisted to parquet
|
||||
|
@ -132,3 +133,51 @@ async fn basic_no_ingster_connection() {
|
|||
.run()
|
||||
.await
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn table_not_found_on_ingester() {
|
||||
let database_url = maybe_skip_integration!();
|
||||
|
||||
let table_name = "the_table";
|
||||
|
||||
let router2_config = TestConfig::new_router2(&database_url);
|
||||
// fast parquet
|
||||
let ingester_config = TestConfig::new_ingester(&router2_config).with_fast_parquet_generation();
|
||||
let querier_config = TestConfig::new_querier(&ingester_config);
|
||||
|
||||
// Set up the cluster ====================================
|
||||
let mut cluster = MiniCluster::new()
|
||||
.with_router2(router2_config)
|
||||
.await
|
||||
.with_ingester(ingester_config)
|
||||
.await
|
||||
.with_querier(querier_config)
|
||||
.await;
|
||||
|
||||
StepTest::new(
|
||||
&mut cluster,
|
||||
vec![
|
||||
Step::WriteLineProtocol(format!("{},tag1=A,tag2=B val=42i 123456", table_name)),
|
||||
Step::WaitForPersisted,
|
||||
Step::WriteLineProtocol(String::from("other_table,tag1=A,tag2=B val=42i 123456")),
|
||||
Step::WaitForPersisted,
|
||||
// Restart the ingester so that it does not have any table data in memory
|
||||
// and so will return "not found" to the querier
|
||||
Step::Custom(Box::new(|cluster: &mut MiniCluster| {
|
||||
cluster.restart_ingester().boxed()
|
||||
})),
|
||||
Step::Query {
|
||||
sql: format!("select * from {}", table_name),
|
||||
expected: vec![
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| tag1 | tag2 | time | val |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| A | B | 1970-01-01T00:00:00.000123456Z | 42 |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
],
|
||||
},
|
||||
],
|
||||
)
|
||||
.run()
|
||||
.await
|
||||
}
|
||||
|
|
|
@ -57,6 +57,12 @@ pub enum Error {
|
|||
#[snafu(display("Error collecting a stream of record batches: {}", source))]
|
||||
CollectStream { source: DataFusionError },
|
||||
|
||||
#[snafu(display(
|
||||
"No Namespace Data found for the given namespace name {}",
|
||||
namespace_name,
|
||||
))]
|
||||
NamespaceNotFound { namespace_name: String },
|
||||
|
||||
#[snafu(display(
|
||||
"No Table Data found for the given namespace name {}, table name {}",
|
||||
namespace_name,
|
||||
|
@ -87,13 +93,20 @@ pub async fn prepare_data_to_querier(
|
|||
) -> Result<IngesterQueryResponse> {
|
||||
let mut schema_merger = SchemaMerger::new();
|
||||
let mut unpersisted_partitions = BTreeMap::new();
|
||||
let mut found_namespace = false;
|
||||
let mut batches = vec![];
|
||||
for sequencer_data in ingest_data.sequencers.values() {
|
||||
let maybe_table_data = sequencer_data
|
||||
.namespace(&request.namespace)
|
||||
.and_then(|namespace_data| namespace_data.table_data(&request.table));
|
||||
let namespace_data = match sequencer_data.namespace(&request.namespace) {
|
||||
Some(namespace_data) => {
|
||||
found_namespace = true;
|
||||
namespace_data
|
||||
}
|
||||
None => {
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let table_data = match maybe_table_data {
|
||||
let table_data = match namespace_data.table_data(&request.table) {
|
||||
Some(table_data) => table_data,
|
||||
None => {
|
||||
continue;
|
||||
|
@ -124,12 +137,18 @@ pub async fn prepare_data_to_querier(
|
|||
}
|
||||
}
|
||||
|
||||
ensure!(
|
||||
found_namespace,
|
||||
NamespaceNotFoundSnafu {
|
||||
namespace_name: &request.namespace,
|
||||
},
|
||||
);
|
||||
ensure!(
|
||||
!unpersisted_partitions.is_empty(),
|
||||
TableNotFoundSnafu {
|
||||
namespace_name: &request.namespace,
|
||||
table_name: &request.table
|
||||
}
|
||||
},
|
||||
);
|
||||
let schema = schema_merger.build();
|
||||
|
||||
|
@ -336,6 +355,7 @@ mod tests {
|
|||
make_queryable_batch_with_deletes, DataLocation, TEST_NAMESPACE, TEST_TABLE,
|
||||
};
|
||||
use arrow_util::{assert_batches_eq, assert_batches_sorted_eq};
|
||||
use assert_matches::assert_matches;
|
||||
use datafusion::logical_plan::{col, lit};
|
||||
use predicate::PredicateBuilder;
|
||||
|
||||
|
@ -548,6 +568,26 @@ mod tests {
|
|||
assert_batches_sorted_eq!(&expected, &result);
|
||||
assert_batches_have_schema_metadata(&result);
|
||||
}
|
||||
|
||||
// test "table not found" handling
|
||||
request.table = String::from("table_does_not_exist");
|
||||
for (loc, scenario) in &scenarios {
|
||||
println!("Location: {loc:?}");
|
||||
let err = prepare_data_to_querier(scenario, &request)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert_matches!(err, Error::TableNotFound { .. });
|
||||
}
|
||||
|
||||
// test "namespace not found" handling
|
||||
request.namespace = String::from("namespace_does_not_exist");
|
||||
for (loc, scenario) in &scenarios {
|
||||
println!("Location: {loc:?}");
|
||||
let err = prepare_data_to_querier(scenario, &request)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert_matches!(err, Error::NamespaceNotFound { .. });
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
|
@ -138,6 +138,22 @@ pub enum Error {
|
|||
source: Box<crate::querier_handler::Error>,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"No Namespace Data found for the given namespace name {}",
|
||||
namespace_name,
|
||||
))]
|
||||
NamespaceNotFound { namespace_name: String },
|
||||
|
||||
#[snafu(display(
|
||||
"No Table Data found for the given namespace name {}, table name {}",
|
||||
namespace_name,
|
||||
table_name
|
||||
))]
|
||||
TableNotFound {
|
||||
namespace_name: String,
|
||||
table_name: String,
|
||||
},
|
||||
|
||||
#[snafu(display("Error while streaming query results: {}", source))]
|
||||
QueryStream { source: ArrowError },
|
||||
|
||||
|
@ -152,7 +168,11 @@ impl From<Error> for tonic::Status {
|
|||
// new error variants.
|
||||
let msg = "Error handling Flight gRPC request";
|
||||
match err {
|
||||
Error::InvalidTicket { .. } | Error::InvalidQuery { .. } | Error::Query { .. } => {
|
||||
Error::InvalidTicket { .. }
|
||||
| Error::InvalidQuery { .. }
|
||||
| Error::Query { .. }
|
||||
| Error::NamespaceNotFound { .. }
|
||||
| Error::TableNotFound { .. } => {
|
||||
// TODO(edd): this should be `debug`. Keeping at info whilst IOx still in early
|
||||
// development
|
||||
info!(?err, msg)
|
||||
|
@ -179,6 +199,9 @@ impl Error {
|
|||
| Self::Dictionary { .. }
|
||||
| Self::QueryStream { .. }
|
||||
| Self::Serialization { .. } => Status::internal(self.to_string()),
|
||||
Self::NamespaceNotFound { .. } | Self::TableNotFound { .. } => {
|
||||
Status::not_found(self.to_string())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -222,12 +245,25 @@ impl<I: IngestHandler + Send + Sync + 'static> Flight for FlightService<I> {
|
|||
|
||||
let query_request = proto_query_request.try_into().context(InvalidQuerySnafu)?;
|
||||
|
||||
let query_response = self
|
||||
.ingest_handler
|
||||
.query(query_request)
|
||||
.await
|
||||
.map_err(Box::new)
|
||||
.context(QuerySnafu)?;
|
||||
let query_response =
|
||||
self.ingest_handler
|
||||
.query(query_request)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
crate::querier_handler::Error::NamespaceNotFound { namespace_name } => {
|
||||
Error::NamespaceNotFound { namespace_name }
|
||||
}
|
||||
crate::querier_handler::Error::TableNotFound {
|
||||
namespace_name,
|
||||
table_name,
|
||||
} => Error::TableNotFound {
|
||||
namespace_name,
|
||||
table_name,
|
||||
},
|
||||
_ => Error::Query {
|
||||
source: Box::new(e),
|
||||
},
|
||||
})?;
|
||||
|
||||
let output = GetStream::new(query_response).await?;
|
||||
|
||||
|
|
|
@ -57,6 +57,12 @@ pub enum Error {
|
|||
ingester_address: String,
|
||||
source: FlightError,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed ingester query '{}': {}", ingester_address, source))]
|
||||
RemoteQuery {
|
||||
ingester_address: String,
|
||||
source: FlightError,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
@ -118,7 +124,12 @@ impl IngesterConnection for IngesterConnectionImpl {
|
|||
|
||||
// TODO make these requests in parallel
|
||||
for ingester_address in &self.ingester_addresses {
|
||||
debug!(%ingester_address, %table_name, "Connecting to ingester");
|
||||
debug!(
|
||||
%ingester_address,
|
||||
%namespace_name,
|
||||
%table_name,
|
||||
"Connecting to ingester",
|
||||
);
|
||||
// TODO maybe cache this connection
|
||||
let connection = connection::Builder::new()
|
||||
.build(ingester_address)
|
||||
|
@ -146,10 +157,19 @@ impl IngesterConnection for IngesterConnectionImpl {
|
|||
.try_into()
|
||||
.context(CreatingRequestSnafu)?;
|
||||
|
||||
let mut perform_query = client
|
||||
.perform_query(ingester_query_request)
|
||||
.await
|
||||
.expect("error performing query");
|
||||
let query_res = client.perform_query(ingester_query_request).await;
|
||||
if let Err(FlightError::GrpcError(status)) = &query_res {
|
||||
if status.code() == tonic::Code::NotFound {
|
||||
debug!(
|
||||
%ingester_address,
|
||||
%namespace_name,
|
||||
%table_name,
|
||||
"Ingester does not know namespace or table, skipping",
|
||||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
let mut perform_query = query_res.context(RemoteQuerySnafu { ingester_address })?;
|
||||
|
||||
// Gather up all the results (todo pass along partition information in metadata)
|
||||
let batches = perform_query.collect().await.expect("collecting");
|
||||
|
|
|
@ -3,15 +3,15 @@
|
|||
use assert_cmd::Command;
|
||||
use once_cell::sync::Lazy;
|
||||
use sqlx::{migrate::MigrateDatabase, Postgres};
|
||||
use std::{collections::BTreeSet, sync::Mutex};
|
||||
use std::collections::BTreeSet;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
// I really do want to block everything until the database is initialized...
|
||||
#[allow(clippy::await_holding_lock)]
|
||||
static DB_INITIALIZED: Lazy<Mutex<BTreeSet<String>>> = Lazy::new(|| Mutex::new(BTreeSet::new()));
|
||||
|
||||
/// Performs once-per-process database initialization, if necessary
|
||||
pub async fn initialize_db(dsn: &str, schema_name: &str) {
|
||||
let mut init = DB_INITIALIZED.lock().expect("Mutex poisoned");
|
||||
let mut init = DB_INITIALIZED.lock().await;
|
||||
|
||||
// already done
|
||||
if init.contains(schema_name) {
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
use futures::future::BoxFuture;
|
||||
use http::StatusCode;
|
||||
|
||||
use arrow_util::assert_batches_sorted_eq;
|
||||
|
@ -9,12 +10,17 @@ use crate::{
|
|||
|
||||
/// Test harness for end to end tests that are comprised of several steps
|
||||
pub struct StepTest<'a> {
|
||||
cluster: &'a MiniCluster,
|
||||
cluster: &'a mut MiniCluster,
|
||||
|
||||
/// The test steps to perform
|
||||
steps: Vec<Step>,
|
||||
}
|
||||
|
||||
/// Function used for custom [`Step`]s.
|
||||
///
|
||||
/// It is an async function that receives a mutable reference to [`MiniCluster`].
|
||||
pub type FCustom = Box<dyn for<'b> Fn(&'b mut MiniCluster) -> BoxFuture<'b, ()>>;
|
||||
|
||||
/// Possible test steps that a test can perform
|
||||
pub enum Step {
|
||||
/// Writes the specified line protocol to the `/api/v2/write`
|
||||
|
@ -35,12 +41,15 @@ pub enum Step {
|
|||
sql: String,
|
||||
expected: Vec<&'static str>,
|
||||
},
|
||||
|
||||
/// A custom step that can be used to implement special cases that are only used once.
|
||||
Custom(FCustom),
|
||||
}
|
||||
|
||||
impl<'a> StepTest<'a> {
|
||||
/// Create a new test that runs each `step`, in sequence, against
|
||||
/// `cluster` panic'ing if any step fails
|
||||
pub fn new(cluster: &'a MiniCluster, steps: Vec<Step>) -> Self {
|
||||
pub fn new(cluster: &'a mut MiniCluster, steps: Vec<Step>) -> Self {
|
||||
Self { cluster, steps }
|
||||
}
|
||||
|
||||
|
@ -102,6 +111,11 @@ impl<'a> StepTest<'a> {
|
|||
assert_batches_sorted_eq!(&expected, &batches);
|
||||
println!("====Done running");
|
||||
}
|
||||
Step::Custom(f) => {
|
||||
println!("====Begin custom step");
|
||||
f(cluster).await;
|
||||
println!("====Done custom step");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue