chore(7618): trace ingester response encoding v2 (#7820)

* test: integration test for tracing of queries to the ingester

* chore: add FlightFrameEncodeRecorder to record spans per each polling result

* refactor(trace): impl TraceCollector for Arc

Allow any Arc-wrapped TraceCollector implementation to be used as a
TraceCollector. This avoids needing to as_any() and downcast later.

* test: assert FlightFrameEncodeRecorder trace spans

This test exercises the FlightDataEncoder wrapped with the trace
decorator (FlightFrameEncodeRecorder) when executing against a data
source that yields data after varying numbers of Stream polls.

This test passing will validate the FlightFrameEncodeRecorder correctly
instruments the amount of time a client spends waiting on the
FlightDataEncoder to acquire or encode a protocol frame, but also
ensures the decorator correctly accounts for varying behaviours allowed
through the Stream abstraction. It does this by simulating a data source
that is not always immediately ready to provide data, such as a buffer
wrapped in a contended async mutex.

* refactor: move tracing decorator into separate mod

* fix: record spans

* refactor(test): update test

The frame encoder is not one-to-one - it emits two frames for the first
data payload, a schema and a payload. This commit updates the test to
account for it!

* refactor: remove unneeded mut ref, and use enum state method which panics when in a (should be unreachable) state

* chore: add more docs to FlightFrameEncodeRecorder and related

---------

Co-authored-by: Dom Dwyer <dom@itsallbroken.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
wiedld 2023-05-26 02:40:16 -07:00 committed by GitHub
parent eb0d77f354
commit 7bcde3c544
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 395 additions and 6 deletions

View File

@ -3,6 +3,7 @@ use test_helpers_end_to_end::{
maybe_skip_integration, GrpcRequestBuilder, MiniCluster, Step, StepTest, StepTestState,
TestConfig, UdpCapture,
};
use trace_exporters::DEFAULT_JAEGER_TRACE_CONTEXT_HEADER_NAME;
#[tokio::test]
pub async fn test_tracing_sql() {
@ -145,3 +146,51 @@ pub async fn test_tracing_create_trace() {
// wait for the UDP server to shutdown
udp_capture.stop().await;
}
/// End-to-end check that a SQL query against an all-in-one cluster produces
/// trace output mentioning the "frame encoding" span.
///
/// The test writes two line protocol rows, queries them back, and then waits
/// until the UDP trace capture has received a message containing the text
/// `frame encoding`.
///
/// NOTE(review): the cluster is configured with
/// `with_ingester_never_persist()`, presumably so the queried rows are served
/// from the ingester rather than from persisted files - confirm.
#[tokio::test]
pub async fn test_tracing_create_ingester_query_trace() {
let database_url = maybe_skip_integration!();
let table_name = "the_table";
// Captures the trace messages the cluster sends over UDP.
let udp_capture = UdpCapture::new().await;
let test_config = TestConfig::new_all_in_one(Some(database_url))
.with_tracing(&udp_capture)
// use the header attached with --gen-trace-id flag
.with_tracing_debug_name(DEFAULT_JAEGER_TRACE_CONTEXT_HEADER_NAME)
.with_ingester_never_persist();
let mut cluster = MiniCluster::create_all_in_one(test_config).await;
// Write two rows, then read them back with a SQL query.
StepTest::new(
&mut cluster,
vec![
Step::WriteLineProtocol(format!(
"{table_name},tag1=A,tag2=B val=42i 123456\n\
{table_name},tag1=A,tag2=C val=43i 123457"
)),
Step::Query {
sql: format!("select * from {table_name}"),
expected: vec![
"+------+------+--------------------------------+-----+",
"| tag1 | tag2 | time | val |",
"+------+------+--------------------------------+-----+",
"| A | B | 1970-01-01T00:00:00.000123456Z | 42 |",
"| A | C | 1970-01-01T00:00:00.000123457Z | 43 |",
"+------+------+--------------------------------+-----+",
],
},
],
)
.run()
.await;
// "shallow" packet inspection and verify the UDP server got something that had some expected
// results (maybe we could eventually verify the payload here too)
udp_capture
.wait_for(|m| m.to_string().contains("frame encoding"))
.await;
// debugging assistance
// println!("Traces received (1):\n\n{:#?}", udp_capture.messages());
// wait for the UDP server to shutdown
udp_capture.stop().await;
}

View File

@ -16,7 +16,13 @@ use prost::Message;
use thiserror::Error;
use tokio::sync::{Semaphore, TryAcquireError};
use tonic::{Request, Response, Streaming};
use trace::{ctx::SpanContext, span::SpanExt};
use trace::{
ctx::SpanContext,
span::{Span, SpanExt},
};
mod instrumentation;
use instrumentation::FlightFrameEncodeRecorder;
use crate::{
ingester_id::IngesterId,
@ -179,7 +185,7 @@ where
let response = match self
.query_handler
.query_exec(namespace_id, table_id, request.columns, span)
.query_exec(namespace_id, table_id, request.columns, span.clone())
.await
{
Ok(v) => v,
@ -194,7 +200,7 @@ where
}
};
let output = encode_response(response, self.ingester_id).map_err(tonic::Status::from);
let output = encode_response(response, self.ingester_id, span).map_err(tonic::Status::from);
Ok(Response::new(Box::pin(output) as Self::DoGetStream))
}
@ -302,6 +308,7 @@ fn build_none_flight_msg() -> Vec<u8> {
fn encode_response(
response: QueryResponse,
ingester_id: IngesterId,
span: Option<Span>,
) -> impl Stream<Item = Result<FlightData, FlightError>> {
response.into_partition_stream().flat_map(move |partition| {
let partition_id = partition.id();
@ -323,14 +330,15 @@ fn encode_response(
// While there are more batches to process.
while let Some(schema) = batch_iter.peek().map(|v| v.schema()) {
output.push(
output.push(FlightFrameEncodeRecorder::new(
FlightDataEncoderBuilder::new().build(futures::stream::iter(
// Take all the RecordBatch with a matching schema
std::iter::from_fn(|| batch_iter.next_if(|v| v.schema() == schema))
.map(Ok)
.collect::<Vec<Result<_, FlightError>>>(),
)),
)
span.clone(),
))
}
head.chain(futures::stream::iter(output).flatten())

View File

@ -0,0 +1,315 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use arrow_flight::{encode::FlightDataEncoder, error::Result as FlightResult, FlightData};
use futures::Stream;
use pin_project::pin_project;
use trace::span::Span;
use trace::span::SpanRecorder;
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
/// State machine nodes for the [`FlightFrameEncodeRecorder`]
///
/// The recorder moves from `Unpolled` to `Polled` when it is polled without
/// an open span, and back to `Unpolled` once the wrapped stream returns a
/// `Poll::Ready` result.
enum FrameEncodeRecorderState {
/// No span is currently open; the next poll creates one.
Unpolled,
/// A span is open and held by the contained [`SpanRecorder`] until the
/// wrapped stream yields a ready result.
Polled(SpanRecorder),
}
impl FrameEncodeRecorderState {
    /// Reset `self` to [`FrameEncodeRecorderState::Unpolled`] and hand back
    /// the [`SpanRecorder`] that was held by the previous state.
    ///
    /// # Panics
    ///
    /// Panics if `self` was not [`FrameEncodeRecorderState::Polled`] when
    /// called. The state is still reset to `Unpolled` before the panic fires.
    fn take_recorder(&mut self) -> SpanRecorder {
        // Swap the state out first so the recorder can be moved out by value.
        let previous = std::mem::replace(self, Self::Unpolled);

        match previous {
            Self::Polled(recorder) => recorder,
            Self::Unpolled => panic!("unwrapping incorrect state"),
        }
    }
}
#[pin_project]
/// Recorder wrapper for flight data requests.
///
/// Wraps a [`FlightDataEncoder`] and, when given a parent span, records a
/// child span named "frame encoding" for each ready result the encoder
/// yields. A span is opened on the first poll made while no span is open,
/// stays open across `Poll::Pending` results, and is closed when the encoder
/// returns `Poll::Ready`.
pub(crate) struct FlightFrameEncodeRecorder {
// The wrapped encoder; pinned so it can be polled through the projection.
#[pin]
inner: FlightDataEncoder,
// Parent of every span created by this recorder; `None` means the
// `SpanRecorder` is built without a span.
parent_span: Option<Span>,
// Whether a span is currently open (see `FrameEncodeRecorderState`).
state: FrameEncodeRecorderState,
}
impl FlightFrameEncodeRecorder {
    /// Wraps `inner` in a [`FlightFrameEncodeRecorder`].
    ///
    /// If `parent_span` is `Some`, the spans recorded while polling are
    /// created as children of it. The recorder starts with no span open.
    pub fn new(inner: FlightDataEncoder, parent_span: Option<Span>) -> Self {
        let state = FrameEncodeRecorderState::Unpolled;

        Self {
            inner,
            parent_span,
            state,
        }
    }
}
impl Stream for FlightFrameEncodeRecorder {
    type Item = FlightResult<FlightData>;

    /// Polls the wrapped encoder, recording how long it takes to reach a
    /// ready result.
    ///
    /// A "frame encoding" span is opened if none is open, kept open while the
    /// encoder returns `Poll::Pending`, and closed with a status matching the
    /// ready result. The encoder's poll result is returned unchanged.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();

        // No span in flight: start one now so that it covers every poll
        // needed to obtain the next ready result.
        if matches!(*this.state, FrameEncodeRecorderState::Unpolled) {
            let child = this
                .parent_span
                .as_ref()
                .map(|parent| parent.child("frame encoding"));
            *this.state = FrameEncodeRecorderState::Polled(SpanRecorder::new(child));
        }

        let outcome = this.inner.poll_next(cx);

        // A pending result leaves the span open; a ready result closes it.
        if let Poll::Ready(item) = &outcome {
            match item {
                None => {
                    this.state.take_recorder().ok("complete");
                }
                Some(Ok(_)) => {
                    this.state.take_recorder().ok("data emitted");
                }
                Some(Err(_)) => {
                    this.state.take_recorder().error("error");
                }
            }
        }

        outcome
    }
}
#[cfg(test)]
mod tests {
use std::{
sync::Arc,
time::{Duration, Instant},
};
use arrow::{array::Int64Array, record_batch::RecordBatch};
use arrow_flight::{encode::FlightDataEncoderBuilder, error::FlightError};
use assert_matches::assert_matches;
use futures::StreamExt;
use parking_lot::Mutex;
use trace::{ctx::SpanContext, RingBufferTraceCollector, TraceCollector};
use crate::make_batch;
use super::*;
/// The lower bound on how long a MockStream poll takes.
///
/// Actual time may be longer due to CPU scheduling.
const POLL_BLOCK_TIME: Duration = Duration::from_millis(100);
/// A manual implementation of the Stream trait that can be configured to
/// return pending/ready responses when polled.
///
/// It simulates [`POLL_BLOCK_TIME`] of computation occurring each time the
/// mock is polled by blocking the caller thread before returning.
struct MockStream {
// The scripted poll results, consumed one per call to `poll_next`. The
// iterator sits behind a mutex, which `poll_next` locks to advance it.
#[allow(clippy::type_complexity)]
ret: Mutex<
Box<dyn Iterator<Item = Poll<Option<Result<RecordBatch, FlightError>>>> + Send + Sync>,
>,
}
impl MockStream {
    /// Builds a mock that replays the poll results produced by `iter`, one
    /// per call to `poll_next`, in iteration order.
    fn new<I, T>(iter: I) -> Self
    where
        I: IntoIterator<IntoIter = T> + 'static,
        T: Iterator<Item = Poll<Option<Result<RecordBatch, FlightError>>>>
            + Send
            + Sync
            + 'static,
    {
        let scripted = iter.into_iter();

        Self {
            ret: Mutex::new(Box::new(scripted)),
        }
    }
}
impl Stream for MockStream {
    type Item = Result<RecordBatch, FlightError>;

    /// Blocks the calling thread for [`POLL_BLOCK_TIME`], wakes the task so
    /// it is polled again, and returns the next scripted poll result.
    ///
    /// # Panics
    ///
    /// Panics if the scripted results have been exhausted.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Stand-in for work performed while being polled.
        std::thread::sleep(POLL_BLOCK_TIME);

        // Request an immediate re-poll; nothing else will ever wake the task.
        cx.waker().wake_by_ref();

        // Pop the next scripted result.
        let scripted = self.ret.lock().next();
        scripted.expect("mock stream has no more values to yield")
    }
}
/// A test that validates the correctness of the FlightFrameEncodeRecorder
/// across all permissible Stream behaviours, by simulating a query and
/// asserting the resulting trace span durations approximately match the
/// desired measurements through bounds checking.
#[tokio::test]
async fn test_frame_encoding_latency_trace() {
    // A dummy batch of data.
    let (batch, _schema) = make_batch!(
        Int64Array("a" => vec![42, 42, 42, 42]),
    );

    // Configure the data source to respond to polls with the following
    // sequence:
    //
    //  1. (first poll) -> Pending
    //  2. Pending
    //  3. Ready(Some(data))
    //  4. Pending
    //  5. Ready(Some(data))
    //  6. Ready(Some(data))
    //  7. (end of stream) -> Ready(None)
    //
    // This simulates a data source that must be asynchronously driven to be
    // able to yield a RecordBatch (such as a contended async mutex), taking
    // at least 7 * POLL_BLOCK_TIME to yield all the data.
    let data_source = MockStream::new([
        Poll::Pending,
        Poll::Pending,
        Poll::Ready(Some(Ok(batch.clone()))),
        Poll::Pending,
        Poll::Ready(Some(Ok(batch.clone()))),
        Poll::Ready(Some(Ok(batch))),
        Poll::Ready(None),
    ]);

    // Initialise a tracing backend to capture the emitted traces.
    let trace_collector = Arc::new(RingBufferTraceCollector::new(5));
    let trace_observer: Arc<dyn TraceCollector> = Arc::new(Arc::clone(&trace_collector));
    let span_ctx = SpanContext::new(Arc::clone(&trace_observer));
    let query_span = span_ctx.child("query span");

    // Construct the frame encoder, providing it with the dummy data source,
    // and wrap the encoder in the tracing decorator.
    let call_chain = FlightFrameEncodeRecorder::new(
        FlightDataEncoderBuilder::new().build(data_source.boxed()),
        Some(query_span.clone()),
    );

    // Wait before using the encoder stack to simulate a delay between
    // construction, and usage.
    std::thread::sleep(2 * POLL_BLOCK_TIME);

    // Record the starting time.
    let started_at = Instant::now();

    // Drive the call chain by collecting all the encoded frames through the
    // stack.
    let encoded = call_chain.collect::<Vec<_>>().await;

    // Record the finish time.
    let ended_at = Instant::now();

    // Assert that at least three frames of encoded data were yielded through
    // the call chain (data frames + arbitrary protocol frames).
    assert!(encoded.len() >= 3);

    // Assert that the entire encoding call took AT LEAST as long as the
    // lower bound time.
    //
    // The encoding call will have required polling the mock data source at
    // least 7 times, causing 7 sleeps of POLL_BLOCK_TIME, plus additional
    // scheduling/execution overhead.
    let call_duration = ended_at.duration_since(started_at);
    assert!(call_duration >= (7 * POLL_BLOCK_TIME));

    // Look for five spans that capture the time a client had to wait for an
    // encoded frame; the first two spans cover this part of the sequence:
    //
    //  1. (first poll) -> Pending
    //  2. Pending
    //  3. Ready(Some(data))
    //
    // This initial data batch produces two protocol frames - a schema
    // frame, and a data payload frame, leading to a span each. Both
    // combined should cover a duration of at least 3 * POLL_BLOCK_TIME.
    //
    // The third span covers the second batch of data:
    //
    //  4. Pending
    //  5. Ready(Some(data))
    //
    // Which should take at least 2 * POLL_BLOCK_TIME.
    //
    // The fourth and last data span covers the final batch of data:
    //
    //  6. Ready(Some(data))
    //
    // Which should take at least POLL_BLOCK_TIME.
    //
    // The fifth and final span emitted covers the last poll which observes
    // the completion of the stream:
    //
    //  7. Ready(None)
    //
    // Which should take at least POLL_BLOCK_TIME.
    //
    let spans = trace_collector.spans();
    assert_matches!(spans.as_slice(), [schema_span, span1, span2, span3, end_span] => {
        // The first span should have a duration of at least 3 *
        // POLL_BLOCK_TIME, and is the schema descriptor protocol frame.
        assert!(span_duration(schema_span) >= (3 * POLL_BLOCK_TIME));
        assert_eq!(schema_span.name, "frame encoding");

        // The underlying encoder has access to the data from the above
        // poll, and is capable of emitting a data frame without polling the
        // data source again, so no sleep occurs and the next frame is
        // emitted quickly. No lower bound is asserted for span1.

        // The second data span should have a duration of at least 2 *
        // POLL_BLOCK_TIME
        assert!(span_duration(span2) >= (2 * POLL_BLOCK_TIME));
        assert_eq!(span2.name, "frame encoding");

        // The third data span should have a duration of at least
        // POLL_BLOCK_TIME
        assert!(span_duration(span3) >= POLL_BLOCK_TIME);
        assert_eq!(span3.name, "frame encoding");

        // The final span is emitted when the last poll yields None, which
        // should take at least one POLL_BLOCK_TIME to pull from the
        // underlying data source.
        assert_eq!(end_span.name, "frame encoding");
        assert!(span_duration(end_span) >= POLL_BLOCK_TIME);

        // All spans should be child spans of the input span.
        assert_eq!(schema_span.ctx.parent_span_id, Some(query_span.ctx.span_id));
        assert_eq!(span1.ctx.parent_span_id, Some(query_span.ctx.span_id));
        assert_eq!(span2.ctx.parent_span_id, Some(query_span.ctx.span_id));
        assert_eq!(span3.ctx.parent_span_id, Some(query_span.ctx.span_id));
        assert_eq!(end_span.ctx.parent_span_id, Some(query_span.ctx.span_id));

        // And when summed up, the duration of all spans (frame generation)
        // should be less than the duration of the entire call itself.
        let span_total = span_duration(schema_span)
            + span_duration(span1)
            + span_duration(span2)
            + span_duration(span3)
            + span_duration(end_span);
        assert!(span_total <= call_duration);
    });
}
/// Returns the length of time between the start and end timestamps of `s`.
///
/// # Panics
///
/// Panics if `s` is missing its start or end timestamp, or if the elapsed
/// time cannot be converted into a [`Duration`] (for example, an extremely
/// large time difference).
fn span_duration(s: &Span) -> Duration {
    let (started, finished) = (
        s.start.expect("no start timestamp"),
        s.end.expect("no end timestamp"),
    );

    let elapsed = finished.signed_duration_since(started);
    elapsed.to_std().unwrap()
}
}

View File

@ -191,6 +191,10 @@ impl TestConfig {
)
}
/// Sets `INFLUXDB_IOX_WAL_ROTATION_PERIOD_SECONDS` to `86400` (one day,
/// going by the unit in the variable name).
///
/// NOTE(review): presumably a rotation period this long means the ingester
/// never persists within the lifetime of a test - confirm against the
/// ingester's WAL rotation behaviour.
pub fn with_ingester_never_persist(self) -> Self {
    const WAL_ROTATION_PERIOD_VAR: &str = "INFLUXDB_IOX_WAL_ROTATION_PERIOD_SECONDS";
    const ONE_DAY_IN_SECONDS: &str = "86400";

    self.with_env(WAL_ROTATION_PERIOD_VAR, ONE_DAY_IN_SECONDS)
}
/// Configure the single tenancy mode, including the authorization server.
pub fn with_single_tenancy(self, addr: impl Into<String>) -> Self {
self.with_env("INFLUXDB_IOX_AUTHZ_ADDR", addr)

View File

@ -14,7 +14,7 @@
// Workaround for "unused crate" lint false positives.
use workspace_hack as _;
use std::{any::Any, collections::VecDeque};
use std::{any::Any, collections::VecDeque, sync::Arc};
use parking_lot::Mutex;
@ -91,3 +91,16 @@ impl TraceCollector for RingBufferTraceCollector {
self
}
}
/// Forwarding implementation that lets an [`Arc`]-wrapped collector be used
/// wherever a [`TraceCollector`] is expected.
///
/// Both methods delegate to the wrapped `T`; in particular `as_any` returns
/// the inner collector's `Any` view, not the `Arc`'s.
impl<T> TraceCollector for Arc<T>
where
    T: TraceCollector,
{
    fn export(&self, span: Span) {
        T::export(self, span)
    }

    fn as_any(&self) -> &dyn Any {
        T::as_any(self)
    }
}