Merge branch 'main' into dom/meta-remove-row-count

pull/24376/head
Dom 2022-05-23 16:39:38 +01:00 committed by GitHub
commit 9cd1286051
16 changed files with 358 additions and 21 deletions

Cargo.lock

@@ -1754,6 +1754,7 @@ dependencies = [
name = "generated_types"
version = "0.1.0"
dependencies = [
"base64 0.13.0",
"bytes",
"data_types",
"datafusion 0.1.0",
@@ -1766,6 +1767,7 @@ dependencies = [
"prost-build",
"query_functions",
"serde",
"snafu",
"tonic",
"tonic-build",
"workspace-hack",


@@ -70,4 +70,14 @@ pub struct IngesterConfig {
env = "INFLUXDB_IOX_SKIP_TO_OLDEST_AVAILABLE"
)]
pub skip_to_oldest_available: bool,
/// Sets how many `do_get` flight requests should panic for testing purposes.
///
/// The first N requests will panic. Requests after this will just pass.
#[clap(
long = "--test-flight-do-get-panic",
env = "INFLUXDB_IOX_FLIGHT_DO_GET_PANIC",
default_value = "0"
)]
pub test_flight_do_get_panic: u64,
}

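For reference, a minimal standalone sketch of how a flag/env pair like the one above behaves, assuming clap 3 with the `derive` and `env` features enabled (the `Demo` struct and binary name are illustrative, not part of this change):

use clap::Parser;

#[derive(Debug, Parser)]
struct Demo {
    /// Mirrors `IngesterConfig::test_flight_do_get_panic` above.
    #[clap(
        long = "test-flight-do-get-panic",
        env = "INFLUXDB_IOX_FLIGHT_DO_GET_PANIC",
        default_value = "0"
    )]
    test_flight_do_get_panic: u64,
}

fn main() {
    // `demo --test-flight-do-get-panic 2` and
    // `INFLUXDB_IOX_FLIGHT_DO_GET_PANIC=2 demo` both set the value to 2;
    // with neither given, the default of 0 disables the panics entirely.
    let demo = Demo::parse();
    println!("first {} do_get requests will panic", demo.test_flight_do_get_panic);
}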

@@ -5,6 +5,7 @@ authors = ["Paul Dix <paul@pauldix.net>"]
edition = "2021"
[dependencies] # In alphabetical order
base64 = "0.13"
bytes = "1.0"
data_types = { path = "../data_types", optional = true }
datafusion = { path = "../datafusion", optional = true }
@@ -15,6 +16,7 @@ predicate = { path = "../predicate", optional = true }
prost = "0.10"
query_functions = { path = "../query_functions" }
serde = { version = "1.0", features = ["derive"] }
snafu = "0.7"
tonic = "0.7"
workspace-hack = { path = "../workspace-hack"}
@@ -23,6 +25,11 @@ tonic-build = "0.7"
prost-build = "0.10"
pbjson-build = "0.3"
[dev-dependencies]
data_types = { path = "../data_types" }
datafusion = { path = "../datafusion" }
predicate = { path = "../predicate" }
[features]
default = ["data_types_conversions"]
data_types_conversions = ["data_types", "datafusion", "predicate"]


@@ -4,6 +4,8 @@ use datafusion::{
common::DataFusionError, datafusion_proto::bytes::Serializeable, logical_plan::Expr,
};
use predicate::{Predicate, ValueExpr};
use prost::Message;
use snafu::{ResultExt, Snafu};
fn expr_to_bytes_violation(field: impl Into<String>, e: DataFusionError) -> FieldViolation {
FieldViolation {
@@ -194,8 +196,42 @@ impl TryFrom<ValueExpr> for proto::ValueExpr {
}
}
#[derive(Debug, Snafu)]
pub enum EncodeProtoPredicateFromBase64Error {
#[snafu(display("Cannot encode protobuf: {source}"))]
ProtobufEncode { source: prost::EncodeError },
}
/// Encodes [`proto::Predicate`] as base64.
pub fn encode_proto_predicate_as_base64(
predicate: &proto::Predicate,
) -> Result<String, EncodeProtoPredicateFromBase64Error> {
let mut buf = vec![];
predicate.encode(&mut buf).context(ProtobufEncodeSnafu)?;
Ok(base64::encode(&buf))
}
#[derive(Debug, Snafu)]
pub enum DecodeProtoPredicateFromBase64Error {
#[snafu(display("Cannot decode base64: {source}"))]
Base64Decode { source: base64::DecodeError },
#[snafu(display("Cannot decode protobuf: {source}"))]
ProtobufDecode { source: prost::DecodeError },
}
/// Decodes [`proto::Predicate`] from base64 string.
pub fn decode_proto_predicate_from_base64(
s: &str,
) -> Result<proto::Predicate, DecodeProtoPredicateFromBase64Error> {
let predicate_binary = base64::decode(s).context(Base64DecodeSnafu)?;
proto::Predicate::decode(predicate_binary.as_slice()).context(ProtobufDecodeSnafu)
}
#[cfg(test)]
mod tests {
use std::collections::BTreeSet;
use super::*;
use datafusion::prelude::*;
@@ -220,4 +256,19 @@ mod tests {
assert_eq!(rust_query, rust_query_converted);
}
#[test]
fn predicate_proto_base64_roundtrip() {
let predicate = Predicate {
field_columns: Some(BTreeSet::from([String::from("foo"), String::from("bar")])),
partition_key: Some(String::from("pkey")),
range: Some(TimestampRange::new(13, 42)),
exprs: vec![Expr::Wildcard],
value_expr: vec![col("_value").eq(lit("bar")).try_into().unwrap()],
};
let predicate: proto::Predicate = predicate.try_into().unwrap();
let base64 = encode_proto_predicate_as_base64(&predicate).unwrap();
let predicate2 = decode_proto_predicate_from_base64(&base64).unwrap();
assert_eq!(predicate, predicate2);
}
}


@@ -1,3 +1,6 @@
use generated_types::ingester::{
decode_proto_predicate_from_base64, DecodeProtoPredicateFromBase64Error,
};
use influxdb_iox_client::{connection::Connection, flight, format::QueryOutputFormat};
use std::str::FromStr;
use thiserror::Error;
@@ -9,6 +12,9 @@ pub enum Error {
#[error("Error querying: {0}")]
Query(#[from] influxdb_iox_client::flight::Error),
#[error("Error decoding base64-encoded predicate from argument: {0}")]
PredicateFromBase64(#[from] DecodeProtoPredicateFromBase64Error),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -27,8 +33,19 @@ pub struct Config {
table: String,
/// The columns to request
#[clap(
long = "--columns",
default_value = "",
multiple_values = true,
use_value_delimiter = true
)]
columns: Vec<String>,
/// Predicate in base64-encoded protobuf form
/// (as logged by the querier on error)
#[clap(long = "--predicate-base64")]
predicate_base64: Option<String>,
/// Optional format ('pretty', 'json', or 'csv')
#[clap(short, long, default_value = "pretty")]
format: String,
@@ -41,12 +58,16 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
format,
table,
columns,
predicate_base64,
} = config;
let format = QueryOutputFormat::from_str(&format)?;
// TODO it might be cool to parse / provide a predicate too
let predicate = None;
let predicate = if let Some(predicate_base64) = predicate_base64 {
Some(decode_proto_predicate_from_base64(&predicate_base64)?)
} else {
None
};
let request = flight::generated_types::IngesterQueryRequest {
table,

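For reference, a hedged sketch of producing a value for the new `--predicate-base64` argument by hand, using the encoder added to `generated_types` above (the empty predicate is just a placeholder; a real caller would convert an actual `Predicate` first):

use generated_types::{
    influxdata::iox::ingester::v1 as proto,
    ingester::encode_proto_predicate_as_base64,
};

fn example_predicate_arg() -> String {
    // Any `proto::Predicate` can be encoded; an empty one keeps the sketch short.
    let predicate = proto::Predicate::default();
    encode_proto_predicate_as_base64(&predicate)
        .expect("encoding into a Vec<u8> buffer does not fail")
}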

@@ -347,6 +347,7 @@ impl Config {
persist_partition_age_threshold_seconds,
persist_partition_cold_threshold_seconds,
skip_to_oldest_available,
test_flight_do_get_panic: 0,
};
// create a CompactorConfig for the all in one server based on


@@ -310,6 +310,35 @@ async fn query_ingester() {
}
.boxed()
})),
Step::Custom(Box::new(|state: &mut StepTestState| {
async {
let ingester_addr = state.cluster().ingester().ingester_grpc_base().to_string();
let expected = [
"+------+-----+",
"| tag1 | val |",
"+------+-----+",
"| A | 42 |",
"+------+-----+",
]
.join("\n");
// Validate the output of the query
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("-h")
.arg(&ingester_addr)
.arg("query-ingester")
.arg(state.cluster().namespace())
.arg("my_awesome_table2")
.arg("--columns")
.arg("tag1,val")
.assert()
.success()
.stdout(predicate::str::contains(&expected));
}
.boxed()
})),
],
)
.run()


@@ -1,9 +1,11 @@
pub(crate) mod influxrpc;
mod multi_ingester;
use assert_cmd::Command;
use futures::FutureExt;
use predicates::prelude::*;
use test_helpers_end_to_end::{
maybe_skip_integration, MiniCluster, Step, StepTest, StepTestState, TestConfig,
maybe_skip_integration, try_run_query, MiniCluster, Step, StepTest, StepTestState, TestConfig,
};
#[tokio::test]
@@ -159,3 +161,103 @@ async fn table_not_found_on_ingester() {
.run()
.await
}
#[tokio::test]
async fn ingester_panic() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let table_name = "the_table";
// Set up the cluster ====================================
let router_config = TestConfig::new_router(&database_url);
// can't use the standard mini cluster here as we set up the ingester to panic
let ingester_config =
TestConfig::new_ingester(&router_config).with_ingester_flight_do_get_panic(2);
let querier_config = TestConfig::new_querier(&ingester_config).with_json_logs();
let mut cluster = MiniCluster::new()
.with_router(router_config)
.await
.with_ingester(ingester_config)
.await
.with_querier(querier_config)
.await;
StepTest::new(
&mut cluster,
vec![
Step::WriteLineProtocol(format!(
"{},tag1=A,tag2=B val=42i 123456\n\
{},tag1=A,tag2=C val=43i 123457",
table_name, table_name
)),
Step::WaitForReadable,
Step::AssertNotPersisted,
Step::Custom(Box::new(move |state: &mut StepTestState| {
async move {
// SQL query fails, error is propagated
let sql = format!("select * from {} where tag2='B'", table_name);
let err = try_run_query(
sql,
state.cluster().namespace(),
state.cluster().querier().querier_grpc_connection(),
)
.await
.unwrap_err();
if let influxdb_iox_client::flight::Error::GrpcError(status) = err {
assert_eq!(status.code(), tonic::Code::Internal);
} else {
panic!("wrong error type");
}
// find relevant log line for debugging
let querier_logs =
std::fs::read_to_string(state.cluster().querier().log_path().await)
.unwrap();
let log_line = querier_logs
.split('\n')
.find(|s| s.contains("Failed to perform ingester query"))
.unwrap();
let log_data: serde_json::Value = serde_json::from_str(log_line).unwrap();
let log_data = log_data.as_object().unwrap();
let log_data = log_data["fields"].as_object().unwrap();
// query ingester using debug information
for i in 0..2 {
let assert = Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("-h")
.arg(log_data["ingester_address"].as_str().unwrap())
.arg("query-ingester")
.arg(log_data["namespace"].as_str().unwrap())
.arg(log_data["table"].as_str().unwrap())
.arg("--columns")
.arg(log_data["columns"].as_str().unwrap())
.arg("--predicate-base64")
.arg(log_data["predicate_binary"].as_str().unwrap())
.assert();
// The ingester is configured to fail 2 times, once for the original query and once during
// debugging. The 2nd debug query should work and should only return data for `tag2=B` (not for
// `tag2=C`).
if i == 0 {
assert.failure().stderr(predicate::str::contains(
"Error querying: status: Unknown, message: \"transport error\"",
));
} else {
assert.success().stdout(
predicate::str::contains(
"| A | B | 1970-01-01T00:00:00.000123456Z | 42 |",
)
.and(predicate::str::contains("C").not()),
);
}
}
}
.boxed()
})),
],
)
.run()
.await
}


@@ -27,7 +27,7 @@ use rand::Rng;
/// Re-export generated_types
pub mod generated_types {
pub use generated_types::influxdata::iox::{
ingester::v1::{IngesterQueryRequest, IngesterQueryResponseMetadata},
ingester::v1::{IngesterQueryRequest, IngesterQueryResponseMetadata, Predicate},
querier::v1::*,
};
}


@@ -21,7 +21,14 @@ use observability_deps::tracing::{info, warn};
use pin_project::{pin_project, pinned_drop};
use prost::Message;
use snafu::{ResultExt, Snafu};
use std::{pin::Pin, sync::Arc, task::Poll};
use std::{
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
task::Poll,
};
use tokio::task::JoinHandle;
use tonic::{Request, Response, Streaming};
use trace::ctx::SpanContext;
@@ -32,19 +39,28 @@ use write_summary::WriteSummary;
#[derive(Debug, Default)]
pub struct GrpcDelegate<I: IngestHandler> {
ingest_handler: Arc<I>,
/// How many `do_get` flight requests should panic for testing purposes.
///
/// Every panic will decrease the counter until it reaches zero. At zero, no panics will occur.
test_flight_do_get_panic: Arc<AtomicU64>,
}
impl<I: IngestHandler + Send + Sync + 'static> GrpcDelegate<I> {
/// Initialise a new [`GrpcDelegate`] passing valid requests to the
/// specified `ingest_handler`.
pub fn new(ingest_handler: Arc<I>) -> Self {
Self { ingest_handler }
pub fn new(ingest_handler: Arc<I>, test_flight_do_get_panic: Arc<AtomicU64>) -> Self {
Self {
ingest_handler,
test_flight_do_get_panic,
}
}
/// Acquire an Arrow Flight gRPC service implementation.
pub fn flight_service(&self) -> FlightServer<impl Flight> {
FlightServer::new(FlightService {
ingest_handler: Arc::clone(&self.ingest_handler),
test_flight_do_get_panic: Arc::clone(&self.test_flight_do_get_panic),
})
}
@@ -200,6 +216,34 @@ impl Error {
#[derive(Debug)]
struct FlightService<I: IngestHandler + Send + Sync + 'static> {
ingest_handler: Arc<I>,
/// How many `do_get` flight requests should panic for testing purposes.
///
/// Every panic will decrease the counter until it reaches zero. At zero, no panics will occur.
test_flight_do_get_panic: Arc<AtomicU64>,
}
impl<I> FlightService<I>
where
I: IngestHandler + Send + Sync + 'static,
{
fn maybe_panic_in_flight_do_get(&self) {
    // Atomically decrement the panic budget, retrying if another request
    // races us between the load and the compare-exchange.
    loop {
        let current = self.test_flight_do_get_panic.load(Ordering::SeqCst);
        if current == 0 {
            // Budget exhausted: this and all further requests pass.
            return;
        }
        if self
            .test_flight_do_get_panic
            .compare_exchange(current, current - 1, Ordering::SeqCst, Ordering::SeqCst)
            .is_ok()
        {
            break;
        }
    }
    panic!("Panicking in `do_get` for testing purposes.");
}
}
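The loop above is the standard lock-free "decrement if positive" pattern. As a design note, a sketch of an equivalent, more compact formulation using the standard library's `AtomicU64::fetch_update` (which wraps exactly this load/compare-exchange retry loop; not what the PR ships, just an illustration):

use std::sync::atomic::{AtomicU64, Ordering};

fn maybe_panic(budget: &AtomicU64) {
    // `checked_sub` returns `None` once the counter reaches zero, so the
    // update fails and the request proceeds without panicking.
    if budget
        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |n| n.checked_sub(1))
        .is_ok()
    {
        panic!("Panicking in `do_get` for testing purposes.");
    }
}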
type TonicStream<T> = Pin<Box<dyn Stream<Item = Result<T, tonic::Status>> + Send + Sync + 'static>>;
@@ -235,6 +279,8 @@ impl<I: IngestHandler + Send + Sync + 'static> Flight for FlightService<I> {
let query_request = proto_query_request.try_into().context(InvalidQuerySnafu)?;
self.maybe_panic_in_flight_do_get();
let query_response =
self.ingest_handler
.query(query_request)


@@ -22,7 +22,7 @@ use object_store::DynObjectStore;
use std::{
collections::BTreeMap,
fmt::{Debug, Display},
sync::Arc,
sync::{atomic::AtomicU64, Arc},
time::Duration,
};
use thiserror::Error;
@@ -198,7 +198,10 @@ pub async fn create_ingester_server_type(
.await?,
);
let http = HttpDelegate::new(Arc::clone(&ingest_handler));
let grpc = GrpcDelegate::new(Arc::clone(&ingest_handler));
let grpc = GrpcDelegate::new(
Arc::clone(&ingest_handler),
Arc::new(AtomicU64::new(ingester_config.test_flight_do_get_panic)),
);
let ingester = IngesterServer::new(metric_registry, http, grpc, ingest_handler);
let server_type = Arc::new(IngesterServerType::new(ingester, common_state));


@@ -13,7 +13,8 @@ use data_types::{
use datafusion_util::MemoryStream;
use futures::{stream::FuturesUnordered, TryStreamExt};
use generated_types::{
influxdata::iox::ingester::v1::GetWriteInfoResponse, ingester::IngesterQueryRequest,
influxdata::iox::ingester::v1::GetWriteInfoResponse,
ingester::{encode_proto_predicate_as_base64, IngesterQueryRequest},
write_info::merge_responses,
};
use iox_query::{
@@ -21,7 +22,7 @@ use iox_query::{
util::compute_timenanosecond_min_max,
QueryChunk, QueryChunkError, QueryChunkMeta,
};
use observability_deps::tracing::{debug, trace};
use observability_deps::tracing::{debug, trace, warn};
use predicate::{Predicate, PredicateMatch};
use schema::{selection::Selection, sort::SortKey, InfluxColumnType, InfluxFieldType, Schema};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
@@ -247,9 +248,25 @@ async fn execute(request: GetPartitionForIngester<'_>) -> Result<Vec<Arc<Ingeste
return Ok(vec![]);
}
}
let mut perform_query = query_res.context(RemoteQuerySnafu {
ingester_address: ingester_address.as_ref(),
})?;
let mut perform_query = query_res
.context(RemoteQuerySnafu {
ingester_address: ingester_address.as_ref(),
})
.map_err(|e| {
// log a warning with enough information to replicate the request using CLI tooling
warn!(
ingester_address=ingester_address.as_ref(),
namespace=namespace_name.as_ref(),
table=table_name.as_ref(),
columns=columns.join(",").as_str(),
predicate_str=%predicate,
predicate_binary=encode_predicate_as_base64(predicate).as_str(),
"Failed to perform ingester query",
);
// need to return error until https://github.com/rust-lang/rust/issues/91345 is stable
e
})?;
// read unpersisted partitions
// map partition_id -> (PartitionMetadata, Vec<PartitionData>))
@@ -322,6 +339,22 @@ async fn execute(request: GetPartitionForIngester<'_>) -> Result<Vec<Arc<Ingeste
Ok(ingester_partitions)
}
fn encode_predicate_as_base64(predicate: &Predicate) -> String {
use generated_types::influxdata::iox::ingester::v1::Predicate as ProtoPredicate;
let predicate = match ProtoPredicate::try_from(predicate.clone()) {
Ok(predicate) => predicate,
Err(_) => {
return String::from("<invalid>");
}
};
match encode_proto_predicate_as_base64(&predicate) {
Ok(s) => s,
Err(_) => String::from("<encoding-error>"),
}
}
#[async_trait]
impl IngesterConnection for IngesterConnectionImpl {
/// Retrieve chunks from the ingester for the particular table and


@@ -113,6 +113,9 @@ impl Error {
Self::Query { .. } => Status::internal(self.to_string()),
Self::InvalidDatabaseName { .. } => Status::invalid_argument(self.to_string()),
Self::InvalidRecordBatch { .. } => Status::internal(self.to_string()),
Self::Planning {
source: service_common::planner::Error::External(_),
} => Status::internal(self.to_string()),
Self::Planning { .. } => Status::invalid_argument(self.to_string()),
Self::DictionaryError { .. } => Status::internal(self.to_string()),
Self::Serialization { .. } => Status::internal(self.to_string()),


@@ -222,27 +222,41 @@ pub fn all_persisted(res: &GetWriteInfoResponse) -> bool {
.all(|info| matches!(info.status(), KafkaPartitionStatus::Persisted))
}
/// Runs a query using the flight API on the specified connection
pub async fn run_query(
/// Runs a query using the flight API on the specified connection.
///
/// This is similar to [`run_query`] but does NOT unwrap the result.
pub async fn try_run_query(
sql: impl Into<String>,
namespace: impl Into<String>,
querier_connection: Connection,
) -> Vec<RecordBatch> {
) -> Result<Vec<RecordBatch>, influxdb_iox_client::flight::Error> {
let namespace = namespace.into();
let sql = sql.into();
let mut client = influxdb_iox_client::flight::Client::new(querier_connection);
// This does nothing except test the client handshake implementation.
client.handshake().await.unwrap();
client.handshake().await?;
let mut response = client
.perform_query(ReadInfo {
namespace_name: namespace,
sql_query: sql,
})
.await
.expect("Error performing query");
.await?;
response.collect().await.expect("Error executing query")
response.collect().await
}
/// Runs a query using the flight API on the specified connection.
///
/// Use [`try_run_query`] if you want to check the error manually.
pub async fn run_query(
sql: impl Into<String>,
namespace: impl Into<String>,
querier_connection: Connection,
) -> Vec<RecordBatch> {
try_run_query(sql, namespace, querier_connection)
.await
.expect("Error executing query")
}


@@ -241,6 +241,16 @@ impl TestConfig {
.copy_env("INFLUXDB_IOX_DB_DIR", other)
}
/// Configures ingester to panic in flight `do_get` requests.
pub fn with_ingester_flight_do_get_panic(self, times: u64) -> Self {
self.with_env("INFLUXDB_IOX_FLIGHT_DO_GET_PANIC", times.to_string())
}
/// Changes the log to JSON for easier parsing.
pub fn with_json_logs(self) -> Self {
self.with_env("LOG_FORMAT", "json")
}
/// Get the test config's server type.
#[must_use]
pub fn server_type(&self) -> ServerType {


@@ -111,6 +111,11 @@ impl ServerFixture {
self.server.addrs().querier_grpc_api().client_base()
}
/// Return log path for server process.
pub async fn log_path(&self) -> Box<Path> {
self.server.server_process.lock().await.log_path.clone()
}
/// Get a weak reference to the underlying `TestServer`
pub(crate) fn weak(&self) -> Weak<TestServer> {
Arc::downgrade(&self.server)