Merge branch 'main' into cn/ingester-persist-tick

pull/24376/head
Dom 2023-01-13 12:31:45 +00:00 committed by GitHub
commit f7ff877582
30 changed files with 764 additions and 328 deletions

Cargo.lock (generated)

@ -2681,9 +2681,9 @@ checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02"
[[package]]
name = "io-lifetimes"
version = "1.0.3"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46112a93252b123d31a119a8d1a1ac19deac4fac6e0e8b0df58f0d4e5870e63c"
checksum = "e7d6c6f8c91b4b9ed43484ad1a938e393caf35960fce7f82a040497207bd8e9e"
dependencies = [
"libc",
"windows-sys",
@ -5565,6 +5565,7 @@ dependencies = [
"http",
"hyper",
"influxdb_iox_client",
"iox_arrow_flight",
"nix 0.26.1",
"observability_deps",
"once_cell",


@ -17,6 +17,7 @@ use once_cell::sync::Lazy;
use parking_lot::Mutex;
use pin_project::{pin_project, pinned_drop};
use std::{
panic::AssertUnwindSafe,
pin::Pin,
sync::{
atomic::{AtomicUsize, Ordering},
@ -28,7 +29,7 @@ use tokio_util::sync::CancellationToken;
use futures::{
future::{BoxFuture, Shared},
Future, FutureExt, TryFutureExt,
ready, Future, FutureExt, TryFutureExt,
};
use observability_deps::tracing::warn;
@ -57,7 +58,7 @@ impl Task {
}
/// The type of error that is returned from tasks in this module
pub type Error = tokio::sync::oneshot::error::RecvError;
pub type Error = String;
/// Job within the executor.
///
@ -68,7 +69,7 @@ pub struct Job<T> {
cancel: CancellationToken,
detached: bool,
#[pin]
rx: Receiver<T>,
rx: Receiver<Result<T, String>>,
}
impl<T> Job<T> {
@ -89,7 +90,12 @@ impl<T> Future for Job<T> {
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let this = self.project();
this.rx.poll(cx)
match ready!(this.rx.poll(cx)) {
Ok(res) => std::task::Poll::Ready(res),
Err(_) => std::task::Poll::Ready(Err(String::from(
"Worker thread gone, executor was likely shut down",
))),
}
}
}
@ -278,7 +284,16 @@ impl DedicatedExecutor {
let (tx, rx) = tokio::sync::oneshot::channel();
let fut = Box::pin(async move {
let task_output = task.await;
let task_output = AssertUnwindSafe(task).catch_unwind().await.map_err(|e| {
if let Some(s) = e.downcast_ref::<String>() {
s.clone()
} else if let Some(s) = e.downcast_ref::<&str>() {
s.to_string()
} else {
"unknown internal error".to_string()
}
});
if tx.send(task_output).is_err() {
warn!("Spawned task output ignored: receiver dropped")
}
@ -372,6 +387,7 @@ fn set_current_thread_priority(prio: i32) {
mod tests {
use super::*;
use std::{
panic::panic_any,
sync::{Arc, Barrier},
time::Duration,
};
@ -512,7 +528,7 @@ mod tests {
}
#[tokio::test]
async fn panic_on_executor() {
async fn panic_on_executor_str() {
let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);
let dedicated_task = exec.spawn(async move {
if true {
@ -523,7 +539,47 @@ mod tests {
});
// should not be able to get the result
dedicated_task.await.unwrap_err();
let err = dedicated_task.await.unwrap_err();
assert_eq!(
err.to_string(),
"At the disco, on the dedicated task scheduler",
);
exec.join().await;
}
#[tokio::test]
async fn panic_on_executor_string() {
let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);
let dedicated_task = exec.spawn(async move {
if true {
panic!("{} {}", 1, 2);
} else {
42
}
});
// should not be able to get the result
let err = dedicated_task.await.unwrap_err();
assert_eq!(err.to_string(), "1 2",);
exec.join().await;
}
#[tokio::test]
async fn panic_on_executor_other() {
let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);
let dedicated_task = exec.spawn(async move {
if true {
panic_any(1)
} else {
42
}
});
// should not be able to get the result
let err = dedicated_task.await.unwrap_err();
assert_eq!(err.to_string(), "unknown internal error",);
exec.join().await;
}
@ -558,7 +614,11 @@ mod tests {
let dedicated_task = exec.spawn(async { 11 });
// task should complete, but return an error
dedicated_task.await.unwrap_err();
let err = dedicated_task.await.unwrap_err();
assert_eq!(
err.to_string(),
"Worker thread gone, executor was likely shut down"
);
exec.join().await;
}
@ -574,7 +634,11 @@ mod tests {
let dedicated_task = exec.spawn(async { 11 });
// task should complete, but return an error
dedicated_task.await.unwrap_err();
let err = dedicated_task.await.unwrap_err();
assert_eq!(
err.to_string(),
"Worker thread gone, executor was likely shut down"
);
exec.join().await;
}
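The `catch_unwind` change above recovers the panic message by downcasting the payload: `panic!` with format arguments carries a `String`, a bare string literal carries `&str`, and anything else (e.g. `panic_any`) falls through to a placeholder. A self-contained sketch of that pattern, using only `std` and independent of `DedicatedExecutor`:

use std::any::Any;
use std::panic::{self, panic_any, AssertUnwindSafe};

// Mirrors the executor's payload handling above.
fn panic_message(err: Box<dyn Any + Send>) -> String {
    if let Some(s) = err.downcast_ref::<String>() {
        s.clone()
    } else if let Some(s) = err.downcast_ref::<&str>() {
        s.to_string()
    } else {
        "unknown internal error".to_string()
    }
}

fn main() {
    // Silence the default hook so the expected panics don't spam stderr.
    panic::set_hook(Box::new(|_| {}));

    // `panic!` with format arguments produces a `String` payload.
    let err = panic::catch_unwind(AssertUnwindSafe(|| panic!("{} {}", 1, 2))).unwrap_err();
    assert_eq!(panic_message(err), "1 2");

    // `panic_any` may carry an arbitrary payload; non-strings hit the fallback.
    let err = panic::catch_unwind(AssertUnwindSafe(|| panic_any(1))).unwrap_err();
    assert_eq!(panic_message(err), "unknown internal error");
}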


@ -13,3 +13,11 @@ message WriteRequest {
}
message WriteResponse {}
service PersistService {
rpc Persist(PersistRequest) returns (PersistResponse);
}
message PersistRequest {}
message PersistResponse {}
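For orientation, tonic expands this definition into prost message structs and an async service trait. A hedged sketch of the generated shapes (approximate, not the literal codegen output):

use tonic::{Request, Response, Status};

// Prost generates plain structs for the empty messages...
#[derive(Debug, Default)]
pub struct PersistRequest {}

#[derive(Debug, Default)]
pub struct PersistResponse {}

// ...and tonic generates an async trait for the service; the real output also
// includes client and server wrapper types.
#[tonic::async_trait]
pub trait PersistService: Send + Sync + 'static {
    async fn persist(
        &self,
        request: Request<PersistRequest>,
    ) -> Result<Response<PersistResponse>, Status>;
}

The ingester-side implementation of this trait (`PersistHandler`) and the client-side wrapper in `influxdb_iox_client` both appear later in this diff.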


@ -7,6 +7,8 @@ pub use conditional::*;
pub mod arithmetic;
/// Provides conditional expression parsing.
pub mod conditional;
/// Provides APIs to traverse an expression tree using closures.
pub mod walk;
#[cfg(test)]
mod test_util;


@ -15,7 +15,7 @@ use nom::sequence::{delimited, preceded, tuple};
use std::fmt;
use std::fmt::{Display, Formatter, Write};
/// Represents on of the conditional operators supported by [`ConditionalExpression::Binary`].
/// Represents one of the conditional operators supported by [`ConditionalExpression::Binary`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConditionalOperator {
/// Represents the `=` operator.
@ -233,7 +233,7 @@ impl ArithmeticParsers for ConditionalExpression {
}
/// Parse an arithmetic expression used by conditional expressions.
fn arithmetic_expression(i: &str) -> ParseResult<&str, Expr> {
pub(crate) fn arithmetic_expression(i: &str) -> ParseResult<&str, Expr> {
arithmetic::<ConditionalExpression>(i)
}


@ -0,0 +1,7 @@
---
source: influxdb_influxql_parser/src/expression/walk.rs
expression: "walk_expr(\"now() + 1h\")"
---
0: Call { name: "now", args: [] }
1: Literal(Duration(Duration(3600000000000)))
2: Binary { lhs: Call { name: "now", args: [] }, op: Add, rhs: Literal(Duration(Duration(3600000000000))) }


@ -0,0 +1,7 @@
---
source: influxdb_influxql_parser/src/expression/walk.rs
expression: "walk_expr(\"5 + 6\")"
---
0: Literal(Unsigned(5))
1: Literal(Unsigned(6))
2: Binary { lhs: Literal(Unsigned(5)), op: Add, rhs: Literal(Unsigned(6)) }


@ -0,0 +1,7 @@
---
source: influxdb_influxql_parser/src/expression/walk.rs
expression: "walk_expr_mut(\"now() + 1h\")"
---
0: Call { name: "now", args: [] }
1: Literal(Duration(Duration(3600000000000)))
2: Binary { lhs: Call { name: "now", args: [] }, op: Add, rhs: Literal(Duration(Duration(3600000000000))) }


@ -0,0 +1,7 @@
---
source: influxdb_influxql_parser/src/expression/walk.rs
expression: "walk_expr_mut(\"5 + 6\")"
---
0: Literal(Unsigned(5))
1: Literal(Unsigned(6))
2: Binary { lhs: Literal(Unsigned(5)), op: Add, rhs: Literal(Unsigned(6)) }


@ -0,0 +1,11 @@
---
source: influxdb_influxql_parser/src/expression/walk.rs
expression: "walk_expression(\"time > now() + 1h\")"
---
0: Arithmetic(VarRef { name: Identifier("time"), data_type: None })
1: Conditional(Expr(VarRef { name: Identifier("time"), data_type: None }))
2: Arithmetic(Call { name: "now", args: [] })
3: Arithmetic(Literal(Duration(Duration(3600000000000))))
4: Arithmetic(Binary { lhs: Call { name: "now", args: [] }, op: Add, rhs: Literal(Duration(Duration(3600000000000))) })
5: Conditional(Expr(Binary { lhs: Call { name: "now", args: [] }, op: Add, rhs: Literal(Duration(Duration(3600000000000))) }))
6: Conditional(Binary { lhs: Expr(VarRef { name: Identifier("time"), data_type: None }), op: Gt, rhs: Expr(Binary { lhs: Call { name: "now", args: [] }, op: Add, rhs: Literal(Duration(Duration(3600000000000))) }) })


@ -0,0 +1,13 @@
---
source: influxdb_influxql_parser/src/expression/walk.rs
expression: "walk_expression(\"5 + 6 = 2 + 9\")"
---
0: Arithmetic(Literal(Unsigned(5)))
1: Arithmetic(Literal(Unsigned(6)))
2: Arithmetic(Binary { lhs: Literal(Unsigned(5)), op: Add, rhs: Literal(Unsigned(6)) })
3: Conditional(Expr(Binary { lhs: Literal(Unsigned(5)), op: Add, rhs: Literal(Unsigned(6)) }))
4: Arithmetic(Literal(Unsigned(2)))
5: Arithmetic(Literal(Unsigned(9)))
6: Arithmetic(Binary { lhs: Literal(Unsigned(2)), op: Add, rhs: Literal(Unsigned(9)) })
7: Conditional(Expr(Binary { lhs: Literal(Unsigned(2)), op: Add, rhs: Literal(Unsigned(9)) }))
8: Conditional(Binary { lhs: Expr(Binary { lhs: Literal(Unsigned(5)), op: Add, rhs: Literal(Unsigned(6)) }), op: Eq, rhs: Expr(Binary { lhs: Literal(Unsigned(2)), op: Add, rhs: Literal(Unsigned(9)) }) })


@ -0,0 +1,207 @@
use crate::expression::{ConditionalExpression, Expr};
/// Expression distinguishes between InfluxQL [`ConditionalExpression`] and [`Expr`]
/// nodes when visiting a [`ConditionalExpression`] tree. See [`walk_expression`].
#[derive(Debug)]
pub enum Expression<'a> {
/// Specifies a conditional expression.
Conditional(&'a ConditionalExpression),
/// Specifies an arithmetic expression.
Arithmetic(&'a Expr),
}
/// ExpressionMut is the same as [`Expression`] with the exception that
/// it provides mutable access to the nodes of the tree.
#[derive(Debug)]
pub enum ExpressionMut<'a> {
/// Specifies a conditional expression.
Conditional(&'a mut ConditionalExpression),
/// Specifies an arithmetic expression.
Arithmetic(&'a mut Expr),
}
/// Perform a depth-first traversal of an expression tree.
pub fn walk_expression<B>(
node: &ConditionalExpression,
visit: &mut impl FnMut(Expression<'_>) -> std::ops::ControlFlow<B>,
) -> std::ops::ControlFlow<B> {
match node {
ConditionalExpression::Expr(n) => walk_expr(n, &mut |n| visit(Expression::Arithmetic(n)))?,
ConditionalExpression::Binary { lhs, rhs, .. } => {
walk_expression(lhs, visit)?;
walk_expression(rhs, visit)?;
}
ConditionalExpression::Grouped(n) => walk_expression(n, visit)?,
}
visit(Expression::Conditional(node))
}
/// Perform a depth-first traversal of a mutable arithmetic or conditional expression tree.
pub fn walk_expression_mut<B>(
node: &mut ConditionalExpression,
visit: &mut impl FnMut(ExpressionMut<'_>) -> std::ops::ControlFlow<B>,
) -> std::ops::ControlFlow<B> {
match node {
ConditionalExpression::Expr(n) => {
walk_expr_mut(n, &mut |n| visit(ExpressionMut::Arithmetic(n)))?
}
ConditionalExpression::Binary { lhs, rhs, .. } => {
walk_expression_mut(lhs, visit)?;
walk_expression_mut(rhs, visit)?;
}
ConditionalExpression::Grouped(n) => walk_expression_mut(n, visit)?,
}
visit(ExpressionMut::Conditional(node))
}
/// Perform a depth-first traversal of the arithmetic expression tree.
pub fn walk_expr<B>(
expr: &Expr,
visit: &mut impl FnMut(&Expr) -> std::ops::ControlFlow<B>,
) -> std::ops::ControlFlow<B> {
match expr {
Expr::Binary { lhs, rhs, .. } => {
walk_expr(lhs, visit)?;
walk_expr(rhs, visit)?;
}
Expr::UnaryOp(_, n) => walk_expr(n, visit)?,
Expr::Nested(n) => walk_expr(n, visit)?,
Expr::Call { args, .. } => {
args.iter().try_for_each(|n| walk_expr(n, visit))?;
}
Expr::VarRef { .. }
| Expr::BindParameter(_)
| Expr::Literal(_)
| Expr::Wildcard(_)
| Expr::Distinct(_) => {}
}
visit(expr)
}
/// Perform a depth-first traversal of a mutable arithmetic expression tree.
pub fn walk_expr_mut<B>(
expr: &mut Expr,
visit: &mut impl FnMut(&mut Expr) -> std::ops::ControlFlow<B>,
) -> std::ops::ControlFlow<B> {
match expr {
Expr::Binary { lhs, rhs, .. } => {
walk_expr_mut(lhs, visit)?;
walk_expr_mut(rhs, visit)?;
}
Expr::UnaryOp(_, n) => walk_expr_mut(n, visit)?,
Expr::Nested(n) => walk_expr_mut(n, visit)?,
Expr::Call { args, .. } => {
args.iter_mut().try_for_each(|n| walk_expr_mut(n, visit))?;
}
Expr::VarRef { .. }
| Expr::BindParameter(_)
| Expr::Literal(_)
| Expr::Wildcard(_)
| Expr::Distinct(_) => {}
}
visit(expr)
}
#[cfg(test)]
mod test {
use crate::expression::walk::{walk_expr_mut, walk_expression_mut, ExpressionMut};
use crate::expression::{
arithmetic_expression, conditional_expression, ConditionalExpression, ConditionalOperator,
Expr,
};
use crate::literal::Literal;
#[test]
fn test_walk_expression() {
fn walk_expression(s: &str) -> String {
let (_, ref expr) = conditional_expression(s).unwrap();
let mut calls = Vec::new();
let mut call_no = 0;
super::walk_expression::<()>(expr, &mut |n| {
calls.push(format!("{}: {:?}", call_no, n));
call_no += 1;
std::ops::ControlFlow::Continue(())
});
calls.join("\n")
}
insta::assert_display_snapshot!(walk_expression("5 + 6 = 2 + 9"));
insta::assert_display_snapshot!(walk_expression("time > now() + 1h"));
}
#[test]
fn test_walk_expression_mut_modify() {
let (_, ref mut expr) = conditional_expression("foo + bar + 5 =~ /str/").unwrap();
walk_expression_mut::<()>(expr, &mut |e| {
match e {
ExpressionMut::Arithmetic(n) => match n {
Expr::VarRef { name, .. } => *name = format!("c_{}", name).into(),
Expr::Literal(Literal::Unsigned(v)) => *v *= 10,
Expr::Literal(Literal::Regex(v)) => *v = format!("c_{}", v.0).into(),
_ => {}
},
ExpressionMut::Conditional(n) => {
if let ConditionalExpression::Binary { op, .. } = n {
*op = ConditionalOperator::NotEqRegex
}
}
}
std::ops::ControlFlow::Continue(())
});
assert_eq!(format!("{}", expr), "c_foo + c_bar + 50 !~ /c_str/")
}
#[test]
fn test_walk_expr() {
fn walk_expr(s: &str) -> String {
let (_, expr) = arithmetic_expression(s).unwrap();
let mut calls = Vec::new();
let mut call_no = 0;
super::walk_expr::<()>(&expr, &mut |n| {
calls.push(format!("{}: {:?}", call_no, n));
call_no += 1;
std::ops::ControlFlow::Continue(())
});
calls.join("\n")
}
insta::assert_display_snapshot!(walk_expr("5 + 6"));
insta::assert_display_snapshot!(walk_expr("now() + 1h"));
}
#[test]
fn test_walk_expr_mut() {
fn walk_expr_mut(s: &str) -> String {
let (_, mut expr) = arithmetic_expression(s).unwrap();
let mut calls = Vec::new();
let mut call_no = 0;
super::walk_expr_mut::<()>(&mut expr, &mut |n| {
calls.push(format!("{}: {:?}", call_no, n));
call_no += 1;
std::ops::ControlFlow::Continue(())
});
calls.join("\n")
}
insta::assert_display_snapshot!(walk_expr_mut("5 + 6"));
insta::assert_display_snapshot!(walk_expr_mut("now() + 1h"));
}
#[test]
fn test_walk_expr_mut_modify() {
let (_, mut expr) = arithmetic_expression("foo + bar + 5").unwrap();
walk_expr_mut::<()>(&mut expr, &mut |e| {
match e {
Expr::VarRef { name, .. } => *name = format!("c_{}", name).into(),
Expr::Literal(Literal::Unsigned(v)) => *v *= 10,
_ => {}
}
std::ops::ControlFlow::Continue(())
});
assert_eq!(format!("{}", expr), "c_foo + c_bar + 50")
}
}
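Because the visitors return `ControlFlow`, traversals can short-circuit rather than visit every node; this is the same property the planner's rewriter exploits further down in this diff. A sketch (the `contains_wildcard` helper is hypothetical, not part of the crate):

use influxdb_influxql_parser::expression::walk::walk_expr;
use influxdb_influxql_parser::expression::Expr;
use std::ops::ControlFlow;

// Hypothetical helper: stop at the first wildcard instead of walking the
// whole tree, then report whether the traversal broke early.
fn contains_wildcard(expr: &Expr) -> bool {
    walk_expr(expr, &mut |e| {
        if matches!(e, Expr::Wildcard(_)) {
            ControlFlow::Break(())
        } else {
            ControlFlow::Continue(())
        }
    })
    .is_break()
}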


@ -1,13 +1,13 @@
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use arrow_util::assert_batches_sorted_eq;
use data_types::{NamespaceId, TableId};
use futures::StreamExt;
use futures::FutureExt;
use generated_types::{influxdata::iox::ingester::v1 as proto, ingester::IngesterQueryRequest};
use http::StatusCode;
use influxdb_iox_client::flight::generated_types::IngesterQueryResponseMetadata;
use iox_arrow_flight::{prost::Message, DecodedFlightData, DecodedPayload, FlightDataStream};
use iox_arrow_flight::prost::Message;
use test_helpers_end_to_end::{
get_write_token, maybe_skip_integration, wait_for_readable, MiniCluster,
get_write_token, maybe_skip_integration, wait_for_readable, MiniCluster, Step, StepTest,
StepTestState,
};
/// Temporary duplication: These tests should be kept in sync (as far as what they're logically
@ -37,31 +37,19 @@ mod with_kafka {
let write_token = get_write_token(&response);
wait_for_readable(write_token, cluster.ingester().ingester_grpc_connection()).await;
let querier_flight =
influxdb_iox_client::flight::Client::new(cluster.ingester().ingester_grpc_connection());
// query the ingester
let query = IngesterQueryRequest::new(
cluster.namespace_id().await,
cluster.table_id(table_name).await,
vec![],
Some(::predicate::EMPTY_PREDICATE),
);
let query: proto::IngesterQueryRequest = query.try_into().unwrap();
let ingester_response = cluster.query_ingester(query).await.unwrap();
let mut performed_query = querier_flight
.into_inner()
.do_get(query.encode_to_vec())
.await
.unwrap()
.into_inner();
let (msg, app_metadata) = next_message(&mut performed_query).await.unwrap();
assert!(matches!(msg, DecodedPayload::None), "{:?}", msg);
let partition_id = app_metadata.partition_id;
let partition_id = ingester_response.app_metadata.partition_id;
assert_eq!(
app_metadata,
ingester_response.app_metadata,
IngesterQueryResponseMetadata {
partition_id,
status: Some(proto::PartitionStatus {
@ -72,14 +60,7 @@ mod with_kafka {
},
);
let (msg, _) = next_message(&mut performed_query).await.unwrap();
let schema = unwrap_schema(msg);
let mut query_results = vec![];
while let Some((msg, _md)) = next_message(&mut performed_query).await {
let batch = unwrap_record_batch(msg);
query_results.push(batch);
}
let schema = ingester_response.schema.unwrap();
let expected = [
"+------+------+--------------------------------+-----+",
@ -88,18 +69,22 @@ mod with_kafka {
"| A | B | 1970-01-01T00:00:00.000123456Z | 42 |",
"+------+------+--------------------------------+-----+",
];
assert_batches_sorted_eq!(&expected, &query_results);
assert_batches_sorted_eq!(&expected, &ingester_response.record_batches);
// Also ensure that the schema of the batches matches what is
// reported by the performed_query.
query_results.iter().enumerate().for_each(|(i, b)| {
assert_eq!(
schema,
b.schema(),
"Schema mismatch for returned batch {}",
i
);
});
ingester_response
.record_batches
.iter()
.enumerate()
.for_each(|(i, b)| {
assert_eq!(
schema,
b.schema(),
"Schema mismatch for returned batch {}",
i
);
});
}
#[tokio::test]
@ -110,10 +95,7 @@ mod with_kafka {
// Set up cluster
let cluster = MiniCluster::create_shared(database_url).await;
let mut querier_flight =
influxdb_iox_client::flight::Client::new(cluster.ingester().ingester_grpc_connection())
.into_inner();
// query the ingester
let query = IngesterQueryRequest::new(
NamespaceId::new(i64::MAX),
TableId::new(42),
@ -121,11 +103,8 @@ mod with_kafka {
Some(::predicate::EMPTY_PREDICATE),
);
let query: proto::IngesterQueryRequest = query.try_into().unwrap();
let err = cluster.query_ingester(query).await.unwrap_err();
let err = querier_flight
.do_get(query.encode_to_vec())
.await
.unwrap_err();
if let iox_arrow_flight::FlightError::Tonic(status) = err {
assert_eq!(status.code(), tonic::Code::NotFound);
} else {
@ -150,10 +129,6 @@ mod with_kafka {
let write_token = get_write_token(&response);
wait_for_readable(write_token, cluster.ingester().ingester_grpc_connection()).await;
let mut querier_flight =
influxdb_iox_client::flight::Client::new(cluster.ingester().ingester_grpc_connection())
.into_inner();
let query = IngesterQueryRequest::new(
cluster.namespace_id().await,
TableId::new(i64::MAX),
@ -161,11 +136,8 @@ mod with_kafka {
Some(::predicate::EMPTY_PREDICATE),
);
let query: proto::IngesterQueryRequest = query.try_into().unwrap();
let err = cluster.query_ingester(query).await.unwrap_err();
let err = querier_flight
.do_get(query.encode_to_vec())
.await
.unwrap_err();
if let iox_arrow_flight::FlightError::Tonic(status) = err {
assert_eq!(status.code(), tonic::Code::NotFound);
} else {
@ -183,6 +155,76 @@ mod with_kafka {
mod kafkaless_rpc_write {
use super::*;
#[tokio::test]
async fn persist_on_demand() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let table_name = "mytable";
let mut cluster = MiniCluster::create_shared2_never_persist(database_url).await;
StepTest::new(
&mut cluster,
vec![
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(format!("{table_name},tag1=A,tag2=B val=42i 123456")),
Step::Custom(Box::new(move |state: &mut StepTestState| {
async move {
// query the ingester
let query = IngesterQueryRequest::new(
state.cluster().namespace_id().await,
state.cluster().table_id(table_name).await,
vec![],
Some(::predicate::EMPTY_PREDICATE),
);
let query: proto::IngesterQueryRequest = query.try_into().unwrap();
let ingester_response =
state.cluster().query_ingester(query).await.unwrap();
let ingester_uuid = ingester_response.app_metadata.ingester_uuid.clone();
assert!(!ingester_uuid.is_empty());
let expected = [
"+------+------+--------------------------------+-----+",
"| tag1 | tag2 | time | val |",
"+------+------+--------------------------------+-----+",
"| A | B | 1970-01-01T00:00:00.000123456Z | 42 |",
"+------+------+--------------------------------+-----+",
];
assert_batches_sorted_eq!(&expected, &ingester_response.record_batches);
}
.boxed()
})),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
// Ensure the ingester responds with the correct file count to tell the querier
// it needs to expire its catalog cache
Step::Custom(Box::new(move |state: &mut StepTestState| {
async move {
let query = IngesterQueryRequest::new(
state.cluster().namespace_id().await,
state.cluster().table_id(table_name).await,
vec![],
Some(::predicate::EMPTY_PREDICATE),
);
let query: proto::IngesterQueryRequest = query.try_into().unwrap();
let ingester_response =
state.cluster().query_ingester(query.clone()).await.unwrap();
let num_files_persisted =
ingester_response.app_metadata.completed_persistence_count;
assert_eq!(num_files_persisted, 1);
}
.boxed()
})),
],
)
.run()
.await
}
#[tokio::test]
async fn ingester_flight_api() {
test_helpers::maybe_start_logging();
@ -199,40 +241,20 @@ mod kafkaless_rpc_write {
let response = cluster.write_to_router(lp).await;
assert_eq!(response.status(), StatusCode::NO_CONTENT);
let mut querier_flight =
influxdb_iox_client::flight::Client::new(cluster.ingester().ingester_grpc_connection())
.into_inner();
// query the ingester
let query = IngesterQueryRequest::new(
cluster.namespace_id().await,
cluster.table_id(table_name).await,
vec![],
Some(::predicate::EMPTY_PREDICATE),
);
let query: proto::IngesterQueryRequest = query.try_into().unwrap();
let query = query.encode_to_vec();
let ingester_response = cluster.query_ingester(query.clone()).await.unwrap();
let mut performed_query = querier_flight
.do_get(query.clone())
.await
.unwrap()
.into_inner();
let (msg, app_metadata) = next_message(&mut performed_query).await.unwrap();
assert!(matches!(msg, DecodedPayload::None), "{:?}", msg);
let ingester_uuid = app_metadata.ingester_uuid.clone();
let ingester_uuid = ingester_response.app_metadata.ingester_uuid.clone();
assert!(!ingester_uuid.is_empty());
let (msg, _) = next_message(&mut performed_query).await.unwrap();
let schema = unwrap_schema(msg);
let mut query_results = vec![];
while let Some((msg, _md)) = next_message(&mut performed_query).await {
let batch = unwrap_record_batch(msg);
query_results.push(batch);
}
let schema = ingester_response.schema.unwrap();
let expected = [
"+------+------+--------------------------------+-----+",
@ -241,31 +263,28 @@ mod kafkaless_rpc_write {
"| A | B | 1970-01-01T00:00:00.000123456Z | 42 |",
"+------+------+--------------------------------+-----+",
];
assert_batches_sorted_eq!(&expected, &query_results);
assert_batches_sorted_eq!(&expected, &ingester_response.record_batches);
// Also ensure that the schema of the batches matches what is
// reported by the performed_query.
query_results.iter().enumerate().for_each(|(i, b)| {
assert_eq!(
schema,
b.schema(),
"Schema mismatch for returned batch {}",
i
);
});
ingester_response
.record_batches
.iter()
.enumerate()
.for_each(|(i, b)| {
assert_eq!(
schema,
b.schema(),
"Schema mismatch for returned batch {}",
i
);
});
// Ensure the ingester UUID is the same in the next query
let mut performed_query = querier_flight
.do_get(query.clone())
.await
.unwrap()
.into_inner();
let ingester_response = cluster.query_ingester(query.clone()).await.unwrap();
assert_eq!(ingester_response.app_metadata.ingester_uuid, ingester_uuid);
let (msg, app_metadata) = next_message(&mut performed_query).await.unwrap();
assert!(matches!(msg, DecodedPayload::None), "{:?}", msg);
assert_eq!(app_metadata.ingester_uuid, ingester_uuid);
// Restart the ingester and ensure it gets a new UUID
// Restart the ingester
cluster.restart_ingester().await;
// Populate the ingester with some data so it returns a successful
@ -275,10 +294,8 @@ mod kafkaless_rpc_write {
assert_eq!(response.status(), StatusCode::NO_CONTENT);
// Query for the new UUID and assert it has changed.
let mut performed_query = querier_flight.do_get(query).await.unwrap().into_inner();
let (msg, app_metadata) = next_message(&mut performed_query).await.unwrap();
assert!(matches!(msg, DecodedPayload::None), "{:?}", msg);
assert_ne!(app_metadata.ingester_uuid, ingester_uuid);
let ingester_response = cluster.query_ingester(query).await.unwrap();
assert_ne!(ingester_response.app_metadata.ingester_uuid, ingester_uuid);
}
#[tokio::test]
@ -289,10 +306,7 @@ mod kafkaless_rpc_write {
// Set up cluster
let cluster = MiniCluster::create_shared2(database_url).await;
let mut querier_flight =
influxdb_iox_client::flight::Client::new(cluster.ingester().ingester_grpc_connection())
.into_inner();
// query the ingester
let query = IngesterQueryRequest::new(
NamespaceId::new(i64::MAX),
TableId::new(42),
@ -300,11 +314,8 @@ mod kafkaless_rpc_write {
Some(::predicate::EMPTY_PREDICATE),
);
let query: proto::IngesterQueryRequest = query.try_into().unwrap();
let err = cluster.query_ingester(query).await.unwrap_err();
let err = querier_flight
.do_get(query.encode_to_vec())
.await
.unwrap_err();
if let iox_arrow_flight::FlightError::Tonic(status) = err {
assert_eq!(status.code(), tonic::Code::NotFound);
} else {
@ -348,29 +359,3 @@ mod kafkaless_rpc_write {
}
}
}
async fn next_message(
performed_query: &mut FlightDataStream,
) -> Option<(DecodedPayload, proto::IngesterQueryResponseMetadata)> {
let DecodedFlightData { inner, payload } = performed_query.next().await.transpose().unwrap()?;
// extract the metadata from the underlying FlightData structure
let app_metadata = &inner.app_metadata[..];
let app_metadata: proto::IngesterQueryResponseMetadata = Message::decode(app_metadata).unwrap();
Some((payload, app_metadata))
}
fn unwrap_schema(msg: DecodedPayload) -> SchemaRef {
match msg {
DecodedPayload::Schema(s) => s,
_ => panic!("Unexpected message type: {:?}", msg),
}
}
fn unwrap_record_batch(msg: DecodedPayload) -> RecordBatch {
match msg {
DecodedPayload::RecordBatch(b) => b,
_ => panic!("Unexpected message type: {:?}", msg),
}
}


@ -17,6 +17,9 @@ pub mod flight;
/// Client for health checking API
pub mod health;
/// Client for the ingester API
pub mod ingester;
/// Client for namespace API
pub mod namespace;


@ -0,0 +1,32 @@
use self::generated_types::{persist_service_client::PersistServiceClient, *};
use crate::{connection::Connection, error::Error};
use client_util::connection::GrpcConnection;
/// Re-export generated_types
pub mod generated_types {
pub use generated_types::influxdata::iox::ingester::v1::*;
}
/// A basic client for interacting with the ingester persist service.
#[derive(Debug, Clone)]
pub struct Client {
inner: PersistServiceClient<GrpcConnection>,
}
impl Client {
/// Creates a new client with the provided connection
pub fn new(connection: Connection) -> Self {
Self {
inner: PersistServiceClient::new(connection.into_grpc_connection()),
}
}
/// Instruct the ingester to persist its data to Parquet. Will block until the data has
/// persisted, which is useful in tests asserting on persisted data. May behave in unexpected
/// ways if used concurrently with writes and ingester WAL rotations.
pub async fn persist(&mut self) -> Result<(), Error> {
self.inner.persist(PersistRequest {}).await?;
Ok(())
}
}
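A possible call site for this client; the connection-builder usage and the address are illustrative assumptions, not taken from this diff:

use influxdb_iox_client::{connection::Builder, ingester::Client};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical address; point this at a running ingester's gRPC endpoint.
    let connection = Builder::default().build("http://127.0.0.1:8083").await?;

    // Blocks until the ingester reports that persistence completed.
    Client::new(connection).persist().await?;
    Ok(())
}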


@ -11,7 +11,10 @@ use backoff::BackoffConfig;
use futures::{future::Shared, Future, FutureExt};
use generated_types::influxdata::iox::{
catalog::v1::catalog_service_server::{CatalogService, CatalogServiceServer},
ingester::v1::write_service_server::{WriteService, WriteServiceServer},
ingester::v1::{
persist_service_server::{PersistService, PersistServiceServer},
write_service_server::{WriteService, WriteServiceServer},
},
};
use iox_arrow_flight::flight_service_server::{FlightService, FlightServiceServer};
use iox_catalog::interface::Catalog;
@ -54,6 +57,8 @@ pub trait IngesterRpcInterface: Send + Sync + std::fmt::Debug {
type CatalogHandler: CatalogService;
/// The type of the [`WriteService`] implementation.
type WriteHandler: WriteService;
/// The type of the [`PersistService`] implementation.
type PersistHandler: PersistService;
/// The type of the [`FlightService`] implementation.
type FlightHandler: FlightService;
@ -65,6 +70,10 @@ pub trait IngesterRpcInterface: Send + Sync + std::fmt::Debug {
/// handler implementation.
fn write_service(&self) -> WriteServiceServer<Self::WriteHandler>;
/// Acquire an opaque handle to the Ingester's [`PersistService`] RPC
/// handler implementation.
fn persist_service(&self) -> PersistServiceServer<Self::PersistHandler>;
/// Acquire an opaque handle to the Ingester's Arrow Flight
/// [`FlightService`] RPC handler implementation, allowing at most
/// `max_simultaneous_requests` queries to be running at any one time.
@ -334,18 +343,20 @@ where
shutdown_tx,
Arc::clone(&ingest_state),
Arc::clone(&buffer),
persist_handle,
Arc::clone(&persist_handle),
wal,
));
Ok(IngesterGuard {
rpc: GrpcDelegate::new(
Arc::new(write_path),
buffer,
Arc::clone(&buffer),
timestamp,
ingest_state,
catalog,
metrics,
buffer,
persist_handle,
),
rotation_task,
graceful_shutdown_handler: shutdown_task,


@ -1,5 +1,6 @@
//! gRPC service implementations for `ingester`.
mod persist;
mod query;
mod rpc_write;
@ -7,7 +8,9 @@ use std::{fmt::Debug, sync::Arc};
use generated_types::influxdata::iox::{
catalog::v1::catalog_service_server::CatalogServiceServer,
ingester::v1::write_service_server::WriteServiceServer,
ingester::v1::{
persist_service_server::PersistServiceServer, write_service_server::WriteServiceServer,
},
};
use iox_arrow_flight::flight_service_server::FlightServiceServer;
use iox_catalog::interface::Catalog;
@ -17,11 +20,13 @@ use crate::{
dml_sink::DmlSink,
ingest_state::IngestState,
init::IngesterRpcInterface,
partition_iter::PartitionIter,
persist::queue::PersistQueue,
query::{response::QueryResponse, QueryExec},
timestamp_oracle::TimestampOracle,
};
use self::rpc_write::RpcWrite;
use self::{persist::PersistHandler, rpc_write::RpcWrite};
/// This type is responsible for injecting internal dependencies that SHOULD NOT
/// leak outside of the ingester crate into public gRPC handlers.
@ -29,21 +34,26 @@ use self::rpc_write::RpcWrite;
/// Configuration and external dependencies SHOULD be injected through the
/// respective gRPC handler constructor method.
#[derive(Debug)]
pub(crate) struct GrpcDelegate<D, Q> {
pub(crate) struct GrpcDelegate<D, Q, T, P> {
dml_sink: Arc<D>,
query_exec: Arc<Q>,
timestamp: Arc<TimestampOracle>,
ingest_state: Arc<IngestState>,
catalog: Arc<dyn Catalog>,
metrics: Arc<metric::Registry>,
buffer: Arc<T>,
persist_handle: Arc<P>,
}
impl<D, Q> GrpcDelegate<D, Q>
impl<D, Q, T, P> GrpcDelegate<D, Q, T, P>
where
D: DmlSink + 'static,
Q: QueryExec<Response = QueryResponse> + 'static,
T: PartitionIter + Sync + 'static,
P: PersistQueue + Sync + 'static,
{
/// Initialise a new [`GrpcDelegate`].
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
dml_sink: Arc<D>,
query_exec: Arc<Q>,
@ -51,6 +61,8 @@ where
ingest_state: Arc<IngestState>,
catalog: Arc<dyn Catalog>,
metrics: Arc<metric::Registry>,
buffer: Arc<T>,
persist_handle: Arc<P>,
) -> Self {
Self {
dml_sink,
@ -59,19 +71,24 @@ where
ingest_state,
catalog,
metrics,
buffer,
persist_handle,
}
}
}
/// Implement the type-erasure trait to hide internal types from crate-external
/// callers.
impl<D, Q> IngesterRpcInterface for GrpcDelegate<D, Q>
impl<D, Q, T, P> IngesterRpcInterface for GrpcDelegate<D, Q, T, P>
where
D: DmlSink + 'static,
Q: QueryExec<Response = QueryResponse> + 'static,
T: PartitionIter + Sync + 'static,
P: PersistQueue + Sync + 'static,
{
type CatalogHandler = CatalogService;
type WriteHandler = RpcWrite<Arc<D>>;
type PersistHandler = PersistHandler<Arc<T>, Arc<P>>;
type FlightHandler = query::FlightService<Arc<Q>>;
/// Acquire a [`CatalogService`] gRPC service implementation.
@ -83,7 +100,7 @@ where
/// Return a [`WriteService`] gRPC implementation.
///
/// [`WriteService`]: generated_types::influxdata::iox::catalog::v1::write_service_server::WriteService.
/// [`WriteService`]: generated_types::influxdata::iox::ingester::v1::write_service_server::WriteService.
fn write_service(&self) -> WriteServiceServer<Self::WriteHandler> {
WriteServiceServer::new(RpcWrite::new(
Arc::clone(&self.dml_sink),
@ -92,6 +109,16 @@ where
))
}
/// Return a [`PersistService`] gRPC implementation.
///
/// [`PersistService`]: generated_types::influxdata::iox::ingester::v1::persist_service_server::PersistService.
fn persist_service(&self) -> PersistServiceServer<Self::PersistHandler> {
PersistServiceServer::new(PersistHandler::new(
Arc::clone(&self.buffer),
Arc::clone(&self.persist_handle),
))
}
/// Return an Arrow [`FlightService`] gRPC implementation.
///
/// [`FlightService`]: iox_arrow_flight::flight_service_server::FlightService
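The `IngesterRpcInterface` impl above is the type-erasure seam: crate-external callers name only the trait and its associated types, while the `D, Q, T, P` generics stay inside the ingester crate. The pattern in miniature (all names here are illustrative stand-ins, not the ingester's real types):

use std::fmt::Debug;
use std::sync::Arc;

// Stand-in for a real bound such as `PersistQueue`.
trait Queue: Debug {}

// Callers program against this trait; the associated type hides the generic.
trait RpcInterface {
    type PersistHandler: Debug;
    fn persist_service(&self) -> Self::PersistHandler;
}

#[derive(Debug)]
struct Delegate<P> {
    persist_handle: Arc<P>,
}

impl<P: Queue + 'static> RpcInterface for Delegate<P> {
    // The concrete generic surfaces only here, on the impl...
    type PersistHandler = Arc<P>;

    fn persist_service(&self) -> Self::PersistHandler {
        // ...so code outside the crate never has to spell out `P`.
        Arc::clone(&self.persist_handle)
    }
}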


@ -0,0 +1,46 @@
use crate::{
partition_iter::PartitionIter,
persist::{drain_buffer::persist_partitions, queue::PersistQueue},
};
use generated_types::influxdata::iox::ingester::v1::{
self as proto, persist_service_server::PersistService,
};
use tonic::{Request, Response};
#[derive(Debug)]
pub(crate) struct PersistHandler<T, P> {
buffer: T,
persist_handle: P,
}
impl<T, P> PersistHandler<T, P>
where
T: PartitionIter + Sync + 'static,
P: PersistQueue + Clone + Sync + 'static,
{
pub(crate) fn new(buffer: T, persist_handle: P) -> Self {
Self {
buffer,
persist_handle,
}
}
}
#[tonic::async_trait]
impl<T, P> PersistService for PersistHandler<T, P>
where
T: PartitionIter + Sync + 'static,
P: PersistQueue + Clone + Sync + 'static,
{
/// Handle the RPC request to persist immediately. Will block until the data has persisted,
/// which is useful in tests asserting on persisted data. May behave in unexpected ways if used
/// concurrently with writes and ingester WAL rotations.
async fn persist(
&self,
_request: Request<proto::PersistRequest>,
) -> Result<Response<proto::PersistResponse>, tonic::Status> {
persist_partitions(self.buffer.partition_iter(), &self.persist_handle).await;
Ok(Response::new(proto::PersistResponse {}))
}
}


@ -611,7 +611,7 @@ impl IOxSessionContext {
exec.spawn(fut).await.unwrap_or_else(|e| {
Err(Error::Context(
"Join Error".to_string(),
Box::new(Error::External(Box::new(e))),
Box::new(Error::External(e.into())),
))
})
}
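The `e.into()` works because the executor's error type is now `String` (see the `DedicatedExecutor` change above) and `std` provides a blanket `From<String>` conversion into a boxed error. A minimal demonstration:

// `String` converts directly into `Box<dyn std::error::Error + Send + Sync>`,
// which is what lets the executor's `Error = String` feed `Error::External`.
fn to_external(e: String) -> Box<dyn std::error::Error + Send + Sync> {
    e.into()
}

fn main() {
    let boxed = to_external("Worker thread gone, executor was likely shut down".into());
    assert_eq!(
        boxed.to_string(),
        "Worker thread gone, executor was likely shut down"
    );
}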


@ -5,6 +5,7 @@ use crate::plan::influxql::field::field_name;
use crate::plan::influxql::field_mapper::{field_and_dimensions, FieldTypeMap, TagSet};
use datafusion::common::{DataFusionError, Result};
use influxdb_influxql_parser::common::{MeasurementName, QualifiedMeasurementName};
use influxdb_influxql_parser::expression::walk::{walk_expr, walk_expr_mut};
use influxdb_influxql_parser::expression::{Expr, VarRefDataType, WildcardType};
use influxdb_influxql_parser::identifier::Identifier;
use influxdb_influxql_parser::literal::Literal;
@ -19,7 +20,7 @@ use predicate::rpc_predicate::QueryNamespaceMeta;
use query_functions::clean_non_meta_escapes;
use std::borrow::Borrow;
use std::collections::{HashMap, HashSet};
use std::ops::Deref;
use std::ops::{ControlFlow, Deref};
fn parse_regex(re: &Regex) -> Result<regex::Regex> {
let pattern = clean_non_meta_escapes(re.as_str());
@ -184,50 +185,6 @@ fn has_wildcards(stmt: &SelectStatement) -> (bool, bool) {
(res.0, res.1)
}
/// Perform a depth-first traversal of the expression tree.
fn walk_expr_mut(expr: &mut Expr, visit: &mut impl FnMut(&mut Expr) -> Result<()>) -> Result<()> {
match expr {
Expr::Binary { lhs, rhs, .. } => {
walk_expr_mut(lhs, visit)?;
walk_expr_mut(rhs, visit)?;
}
Expr::UnaryOp(_, expr) => walk_expr_mut(expr, visit)?,
Expr::Nested(expr) => walk_expr_mut(expr, visit)?,
Expr::Call { args, .. } => {
args.iter_mut().try_for_each(|n| walk_expr_mut(n, visit))?;
}
Expr::VarRef { .. }
| Expr::BindParameter(_)
| Expr::Literal(_)
| Expr::Wildcard(_)
| Expr::Distinct(_) => {}
}
visit(expr)
}
/// Perform a depth-first traversal of the expression tree.
pub(crate) fn walk_expr(expr: &Expr, visit: &mut impl FnMut(&Expr) -> Result<()>) -> Result<()> {
match expr {
Expr::Binary { lhs, rhs, .. } => {
walk_expr(lhs, visit)?;
walk_expr(rhs, visit)?;
}
Expr::UnaryOp(_, expr) => walk_expr(expr, visit)?,
Expr::Nested(expr) => walk_expr(expr, visit)?,
Expr::Call { args, .. } => {
args.iter().try_for_each(|n| walk_expr(n, visit))?;
}
Expr::VarRef { .. }
| Expr::BindParameter(_)
| Expr::Literal(_)
| Expr::Wildcard(_)
| Expr::Distinct(_) => {}
}
visit(expr)
}
/// Rewrite the projection list and GROUP BY of the specified `SELECT` statement.
///
/// Wildcards and regular expressions in the `SELECT` projection list and `GROUP BY` are expanded.
@ -248,18 +205,23 @@ fn rewrite_field_list(
// Attempt to rewrite all variable references in the fields with their types, if one
// hasn't been specified.
stmt.fields.iter_mut().try_for_each(|f| {
walk_expr_mut(&mut f.expr, &mut |e| {
if let ControlFlow::Break(e) = stmt.fields.iter_mut().try_for_each(|f| {
walk_expr_mut::<DataFusionError>(&mut f.expr, &mut |e| {
if matches!(e, Expr::VarRef { .. }) {
let new_type = evaluate_type(namespace, e.borrow(), &stmt.from)?;
let new_type = match evaluate_type(namespace, e.borrow(), &stmt.from) {
Err(e) => ControlFlow::Break(e)?,
Ok(v) => v,
};
if let Expr::VarRef { data_type, .. } = e {
*data_type = new_type;
}
}
Ok(())
ControlFlow::Continue(())
})
})?;
}) {
return Err(e);
}
let (has_field_wildcard, has_group_by_wildcard) = has_wildcards(stmt);
if (has_field_wildcard, has_group_by_wildcard) == (false, false) {
@ -421,17 +383,16 @@ fn rewrite_field_list(
}
Expr::Binary { .. } => {
let mut has_wildcard = false;
walk_expr(&f.expr, &mut |e| {
let has_wildcard = walk_expr(&f.expr, &mut |e| {
match e {
Expr::Wildcard(_) | Expr::Literal(Literal::Regex(_)) => {
has_wildcard = true
return ControlFlow::Break(())
}
_ => {}
}
Ok(())
})?;
ControlFlow::Continue(())
})
.is_break();
if has_wildcard {
return Err(DataFusionError::External(
@ -539,10 +500,8 @@ pub(crate) fn rewrite_statement(
#[cfg(test)]
mod test {
use crate::plan::influxql::rewriter::{has_wildcards, rewrite_statement, walk_expr_mut};
use crate::plan::influxql::test_utils::{get_first_field, MockNamespace};
use influxdb_influxql_parser::expression::Expr;
use influxdb_influxql_parser::literal::Literal;
use crate::plan::influxql::rewriter::{has_wildcards, rewrite_statement};
use crate::plan::influxql::test_utils::MockNamespace;
use influxdb_influxql_parser::parse_statements;
use influxdb_influxql_parser::select::SelectStatement;
use influxdb_influxql_parser::statement::Statement;
@ -851,59 +810,4 @@ mod test {
assert!(!res.0);
assert!(!res.1);
}
#[test]
fn test_walk_expr() {
fn walk_expr(s: &str) -> String {
let expr = get_first_field(format!("SELECT {} FROM f", s).as_str()).expr;
let mut calls = Vec::new();
let mut call_no = 0;
super::walk_expr(&expr, &mut |n| {
calls.push(format!("{}: {}", call_no, n));
call_no += 1;
Ok(())
})
.unwrap();
calls.join("\n")
}
insta::assert_display_snapshot!(walk_expr("5 + 6"));
insta::assert_display_snapshot!(walk_expr("count(5, foo + 7)"));
insta::assert_display_snapshot!(walk_expr("count(5, foo + 7) + sum(bar)"));
}
#[test]
fn test_walk_expr_mut() {
fn walk_expr_mut(s: &str) -> String {
let mut expr = get_first_field(format!("SELECT {} FROM f", s).as_str()).expr;
let mut calls = Vec::new();
let mut call_no = 0;
super::walk_expr_mut(&mut expr, &mut |n| {
calls.push(format!("{}: {}", call_no, n));
call_no += 1;
Ok(())
})
.unwrap();
calls.join("\n")
}
insta::assert_display_snapshot!(walk_expr_mut("5 + 6"));
insta::assert_display_snapshot!(walk_expr_mut("count(5, foo + 7)"));
insta::assert_display_snapshot!(walk_expr_mut("count(5, foo + 7) + sum(bar)"));
}
#[test]
fn test_walk_expr_mut_modify() {
let mut expr = get_first_field("SELECT foo + bar + 5 FROM f").expr;
walk_expr_mut(&mut expr, &mut |e| {
match e {
Expr::VarRef { name, .. } => *name = format!("c_{}", name).into(),
Expr::Literal(Literal::Unsigned(v)) => *v *= 10,
_ => {}
}
Ok(())
})
.unwrap();
assert_eq!(format!("{}", expr), "c_foo + c_bar + 50")
}
}
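With the shared walkers returning `ControlFlow` instead of `Result`, the rewriter above smuggles the first `DataFusionError` out of the traversal through `ControlFlow::Break` and converts it back into a `Result` at the boundary. The pattern in isolation (a toy fallible visitor, not the IOx types):

use std::ops::ControlFlow;

// Break out of the traversal with the first error, then turn the
// ControlFlow back into a Result at the call boundary.
fn check_all(values: &[i64]) -> Result<(), String> {
    let flow = values.iter().try_for_each(|&v| {
        if v < 0 {
            ControlFlow::Break(format!("negative value: {v}"))
        } else {
            ControlFlow::Continue(())
        }
    });

    match flow {
        ControlFlow::Break(e) => Err(e),
        ControlFlow::Continue(()) => Ok(()),
    }
}

fn main() {
    assert!(check_all(&[1, 2, 3]).is_ok());
    assert_eq!(check_all(&[1, -2, 3]).unwrap_err(), "negative value: -2");
}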


@ -1,9 +0,0 @@
---
source: iox_query/src/plan/influxql/rewriter.rs
expression: "walk_expr(\"count(5, foo + 7)\")"
---
0: 5
1: foo
2: 7
3: foo + 7
4: count(5, foo + 7)


@ -1,12 +0,0 @@
---
source: iox_query/src/plan/influxql/rewriter.rs
expression: "walk_expr(\"count(5, foo + 7) + sum(bar)\")"
---
0: 5
1: foo
2: 7
3: foo + 7
4: count(5, foo + 7)
5: bar
6: sum(bar)
7: count(5, foo + 7) + sum(bar)


@ -1,7 +0,0 @@
---
source: iox_query/src/plan/influxql/rewriter.rs
expression: "walk_expr(\"5 + 6\")"
---
0: 5
1: 6
2: 5 + 6


@ -1,9 +0,0 @@
---
source: iox_query/src/plan/influxql/rewriter.rs
expression: "walk_expr_mut(\"count(5, foo + 7)\")"
---
0: 5
1: foo
2: 7
3: foo + 7
4: count(5, foo + 7)


@ -1,12 +0,0 @@
---
source: iox_query/src/plan/influxql/rewriter.rs
expression: "walk_expr_mut(\"count(5, foo + 7) + sum(bar)\")"
---
0: 5
1: foo
2: 7
3: foo + 7
4: count(5, foo + 7)
5: bar
6: sum(bar)
7: count(5, foo + 7) + sum(bar)


@ -1,7 +0,0 @@
---
source: iox_query/src/plan/influxql/rewriter.rs
expression: "walk_expr_mut(\"5 + 6\")"
---
0: 5
1: 6
2: 5 + 6


@ -91,6 +91,7 @@ impl<I: IngesterRpcInterface + Sync + Send + Debug + 'static> ServerType for Ing
add_service!(builder, self.server.rpc().catalog_service());
add_service!(builder, self.server.rpc().write_service());
add_service!(builder, self.server.rpc().persist_service());
add_service!(
builder,
self.server


@ -16,6 +16,7 @@ generated_types = { path = "../generated_types" }
http = "0.2.8"
hyper = "0.14"
influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format"] }
iox_arrow_flight = { path = "../iox_arrow_flight" }
nix = "0.26"
observability_deps = { path = "../observability_deps" }
once_cell = { version = "1.17", features = ["parking_lot"] }


@ -2,6 +2,7 @@ use crate::{
dump_log_to_stdout, log_command, rand_id, write_to_router, ServerFixture, TestConfig,
TestServer,
};
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use assert_cmd::prelude::*;
use data_types::{NamespaceId, TableId};
use futures::{stream::FuturesOrdered, StreamExt};
@ -9,8 +10,12 @@ use http::Response;
use hyper::Body;
use influxdb_iox_client::{
connection::GrpcConnection,
flight::generated_types::{IngesterQueryRequest, IngesterQueryResponseMetadata},
schema::generated_types::{schema_service_client::SchemaServiceClient, GetSchemaRequest},
};
use iox_arrow_flight::{
prost::Message, DecodedFlightData, DecodedPayload, FlightDataStream, FlightError,
};
use observability_deps::tracing::{debug, info};
use once_cell::sync::Lazy;
use std::{
@ -172,6 +177,47 @@ impl MiniCluster {
new_cluster
}
/// Create a shared "version 2" MiniCluster that has a router, ingester set to essentially
/// never persist data (except on-demand), and querier (but no
/// compactor as that should be run on-demand in tests).
///
/// Note: Because the underlying server processes are shared across multiple tests, all users
/// of this `MiniCluster` instance should only modify their own unique namespace.
pub async fn create_shared2_never_persist(database_url: String) -> Self {
let start = Instant::now();
let mut shared_servers = GLOBAL_SHARED_SERVERS2_NEVER_PERSIST.lock().await;
debug!(mutex_wait=?start.elapsed(), "creating standard2 cluster");
// try to reuse existing server processes
if let Some(shared) = shared_servers.take() {
if let Some(cluster) = shared.creatable_cluster().await {
debug!("Reusing existing cluster");
// Put the server back
*shared_servers = Some(shared);
let start = Instant::now();
// drop the lock prior to calling `create()` to allow others to proceed
std::mem::drop(shared_servers);
let new_self = cluster.create().await;
info!(
total_wait=?start.elapsed(),
"created new mini cluster2 from existing cluster"
);
return new_self;
} else {
info!("some server proceses of previous cluster2 have already returned");
}
}
// Have to make a new one
info!("Create a new server2 set to never persist");
let new_cluster = Self::create_non_shared2_never_persist(database_url).await;
// Update the shared servers to point at the newly created server processes
*shared_servers = Some(SharedServers::new(&new_cluster));
new_cluster
}
/// Create a non shared "standard" MiniCluster that has a router, ingester, querier. Save
/// config for a compactor, but the compactor should be run on-demand in tests using `compactor
/// run-once` rather than using `run compactor`.
@ -208,6 +254,23 @@ impl MiniCluster {
.await
}
/// Create a non-shared "version 2" MiniCluster that has a router, ingester set to essentially
/// never persist data (except on-demand), and querier.
pub async fn create_non_shared2_never_persist(database_url: String) -> Self {
let ingester_config = TestConfig::new_ingester2_never_persist(&database_url);
let router_config = TestConfig::new_router2(&ingester_config);
let querier_config = TestConfig::new_querier2(&ingester_config);
// Set up the cluster ====================================
Self::new()
.with_ingester(ingester_config)
.await
.with_router(router_config)
.await
.with_querier(querier_config)
.await
}
/// Create an all-(minus compactor)-in-one server with the specified configuration
pub async fn create_all_in_one(test_config: TestConfig) -> Self {
Self::new()
@ -364,6 +427,48 @@ impl MiniCluster {
.await
}
/// Query the ingester using flight directly, rather than through a querier.
pub async fn query_ingester(
&self,
query: IngesterQueryRequest,
) -> Result<IngesterResponse, FlightError> {
let querier_flight =
influxdb_iox_client::flight::Client::new(self.ingester().ingester_grpc_connection());
let mut performed_query = querier_flight
.into_inner()
.do_get(query.encode_to_vec())
.await?
.into_inner();
let (msg, app_metadata) = next_message(&mut performed_query).await.unwrap();
assert!(matches!(msg, DecodedPayload::None), "{:?}", msg);
let schema = next_message(&mut performed_query)
.await
.map(|(msg, _)| unwrap_schema(msg));
let mut record_batches = vec![];
while let Some((msg, _md)) = next_message(&mut performed_query).await {
let batch = unwrap_record_batch(msg);
record_batches.push(batch);
}
Ok(IngesterResponse {
app_metadata,
schema,
record_batches,
})
}
/// Ask the ingester to persist its data.
pub async fn persist_ingester(&self) {
let mut ingester_client =
influxdb_iox_client::ingester::Client::new(self.ingester().ingester_grpc_connection());
ingester_client.persist().await.unwrap();
}
/// Get a reference to the mini cluster's other servers.
pub fn other_servers(&self) -> &[ServerFixture] {
self.other_servers.as_ref()
@ -428,6 +533,14 @@ impl MiniCluster {
}
}
/// Gathers data from ingester Flight queries
#[derive(Debug)]
pub struct IngesterResponse {
pub app_metadata: IngesterQueryResponseMetadata,
pub schema: Option<SchemaRef>,
pub record_batches: Vec<RecordBatch>,
}
/// holds shared server processes to share across tests
#[derive(Clone)]
struct SharedServers {
@ -532,3 +645,31 @@ static GLOBAL_SHARED_SERVERS: Lazy<Mutex<Option<SharedServers>>> = Lazy::new(||
// For the new server versions. `GLOBAL_SHARED_SERVERS` can be removed and this can be renamed
// when the migration to router2/etc is complete.
static GLOBAL_SHARED_SERVERS2: Lazy<Mutex<Option<SharedServers>>> = Lazy::new(|| Mutex::new(None));
static GLOBAL_SHARED_SERVERS2_NEVER_PERSIST: Lazy<Mutex<Option<SharedServers>>> =
Lazy::new(|| Mutex::new(None));
async fn next_message(
performed_query: &mut FlightDataStream,
) -> Option<(DecodedPayload, IngesterQueryResponseMetadata)> {
let DecodedFlightData { inner, payload } = performed_query.next().await.transpose().unwrap()?;
// extract the metadata from the underlying FlightData structure
let app_metadata = &inner.app_metadata[..];
let app_metadata: IngesterQueryResponseMetadata = Message::decode(app_metadata).unwrap();
Some((payload, app_metadata))
}
fn unwrap_schema(msg: DecodedPayload) -> SchemaRef {
match msg {
DecodedPayload::Schema(s) => s,
_ => panic!("Unexpected message type: {:?}", msg),
}
}
fn unwrap_record_batch(msg: DecodedPayload) -> RecordBatch {
match msg {
DecodedPayload::RecordBatch(b) => b,
_ => panic!("Unexpected message type: {:?}", msg),
}
}


@ -161,6 +161,9 @@ pub enum Step {
/// files from the value this step recorded.
RecordNumParquetFiles,
/// Ask the ingester to persist immediately through the persist service gRPC API
Persist,
/// Wait for all previously written data to be persisted by observing an increase in the number
/// of Parquet files in the catalog as specified for this cluster's namespace. Needed for
/// router2/ingester2/querier2.
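Combined with `RecordNumParquetFiles` and `WaitForPersisted2`, this enables the compact persist-then-verify sequence that the `persist_on_demand` test above is built on. A trimmed sketch (the test name is illustrative; see `persist_on_demand` for the full version with ingester queries between steps):

use test_helpers_end_to_end::{maybe_skip_integration, MiniCluster, Step, StepTest};

#[tokio::test]
async fn persist_via_step() {
    let database_url = maybe_skip_integration!();
    let mut cluster = MiniCluster::create_shared2_never_persist(database_url).await;

    StepTest::new(
        &mut cluster,
        vec![
            Step::RecordNumParquetFiles,
            Step::WriteLineProtocol("mytable,tag1=A val=42i 123456".into()),
            Step::Persist,
            Step::WaitForPersisted2 {
                expected_increase: 1,
            },
        ],
    )
    .run()
    .await;
}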
@ -282,6 +285,10 @@ impl<'a> StepTest<'a> {
Step::RecordNumParquetFiles => {
state.record_num_parquet_files().await;
}
// Ask the ingester to persist immediately through the persist service gRPC API
Step::Persist => {
state.cluster().persist_ingester().await;
}
Step::WaitForPersisted2 { expected_increase } => {
info!("====Begin waiting for a change in the number of Parquet files");
state