feat: Add decoded payload type and size to querier <--> ingester tracing (#7870)
* feat: Add decoded payload type and size to querier <--> ingester tracing * feat: add aggregate sizes --------- Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
7bcde3c544
commit
c1a448e930
|
@ -311,7 +311,10 @@ where
|
||||||
let res = self.inner.next_message().await;
|
let res = self.inner.next_message().await;
|
||||||
|
|
||||||
match &res {
|
match &res {
|
||||||
Ok(_) => span_recorder.ok("ok"),
|
Ok(res) => {
|
||||||
|
span_recorder.ok("ok");
|
||||||
|
self.record_metadata(&mut span_recorder, res.as_ref())
|
||||||
|
}
|
||||||
Err(e) => span_recorder.error(e.to_string()),
|
Err(e) => span_recorder.error(e.to_string()),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -319,6 +322,35 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<T> QueryDataTracer<T>
|
||||||
|
where
|
||||||
|
T: QueryData,
|
||||||
|
{
|
||||||
|
/// Record additional metadata on the
|
||||||
|
fn record_metadata(
|
||||||
|
&self,
|
||||||
|
span_recorder: &mut SpanRecorder,
|
||||||
|
res: Option<&(DecodedPayload, proto::IngesterQueryResponseMetadata)>,
|
||||||
|
) {
|
||||||
|
let Some((payload, _metadata)) = res else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
match payload {
|
||||||
|
DecodedPayload::None => {
|
||||||
|
span_recorder.set_metadata("payload_type", "none");
|
||||||
|
}
|
||||||
|
DecodedPayload::Schema(_) => {
|
||||||
|
span_recorder.set_metadata("payload_type", "schema");
|
||||||
|
}
|
||||||
|
DecodedPayload::RecordBatch(batch) => {
|
||||||
|
span_recorder.set_metadata("payload_type", "batch");
|
||||||
|
span_recorder.set_metadata("num_rows", batch.num_rows() as i64);
|
||||||
|
span_recorder.set_metadata("mem_bytes", batch.get_array_memory_size() as i64);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
struct CachedConnection {
|
struct CachedConnection {
|
||||||
ingester_address: Arc<str>,
|
ingester_address: Arc<str>,
|
||||||
|
|
|
@ -220,6 +220,8 @@ struct IngesterResponseOk {
|
||||||
n_partitions: usize,
|
n_partitions: usize,
|
||||||
n_chunks: usize,
|
n_chunks: usize,
|
||||||
n_rows: usize,
|
n_rows: usize,
|
||||||
|
/// Estimated number of bytes this batch requires in memory
|
||||||
|
memory_bytes: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Helper to observe a single ingester request.
|
/// Helper to observe a single ingester request.
|
||||||
|
@ -257,6 +259,15 @@ impl<'a> ObserveIngesterRequest<'a> {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn set_ok(mut self, ok_status: IngesterResponseOk) {
|
fn set_ok(mut self, ok_status: IngesterResponseOk) {
|
||||||
|
self.span_recorder
|
||||||
|
.set_metadata("n_partitions", ok_status.n_partitions as i64);
|
||||||
|
self.span_recorder
|
||||||
|
.set_metadata("num_chunks", ok_status.n_chunks as i64);
|
||||||
|
self.span_recorder
|
||||||
|
.set_metadata("num_rows", ok_status.n_rows as i64);
|
||||||
|
self.span_recorder
|
||||||
|
.set_metadata("mem_bytes", ok_status.memory_bytes as i64);
|
||||||
|
|
||||||
self.res = Some(Ok(ok_status));
|
self.res = Some(Ok(ok_status));
|
||||||
self.span_recorder.ok("done");
|
self.span_recorder.ok("done");
|
||||||
}
|
}
|
||||||
|
@ -695,6 +706,7 @@ impl IngesterConnection for IngesterConnectionImpl {
|
||||||
for c in p.chunks() {
|
for c in p.chunks() {
|
||||||
status.n_chunks += 1;
|
status.n_chunks += 1;
|
||||||
status.n_rows += c.rows();
|
status.n_rows += c.rows();
|
||||||
|
status.memory_bytes += c.estimate_size()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue