fix: be more conservative w/ "request canceled" messages
parent
ee92d28dfd
commit
d2399764c5
|
@ -14,6 +14,7 @@
|
|||
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
|
@ -217,8 +218,8 @@ where
|
|||
Poll::Ready(Ok(response.map(|body| TracedBody {
|
||||
request_ctx,
|
||||
span_recorder,
|
||||
was_done_data: false,
|
||||
was_ready_trailers: false,
|
||||
was_done_data: AtomicBool::new(false),
|
||||
was_ready_trailers: AtomicBool::new(false),
|
||||
inner: body,
|
||||
metrics_recorder,
|
||||
})))
|
||||
|
@ -235,8 +236,8 @@ pub struct TracedBody<B> {
|
|||
request_ctx: Option<RequestLogContext>,
|
||||
span_recorder: SpanRecorder,
|
||||
metrics_recorder: MetricsRecorder,
|
||||
was_done_data: bool,
|
||||
was_ready_trailers: bool,
|
||||
was_done_data: AtomicBool,
|
||||
was_ready_trailers: AtomicBool,
|
||||
#[pin]
|
||||
inner: B,
|
||||
}
|
||||
|
@ -244,14 +245,14 @@ pub struct TracedBody<B> {
|
|||
#[pinned_drop]
|
||||
impl<B> PinnedDrop for TracedBody<B> {
|
||||
fn drop(self: Pin<&mut Self>) {
|
||||
if !self.was_done_data {
|
||||
if !self.was_done_data.load(Ordering::SeqCst) {
|
||||
let trace = self.request_ctx.format_jaeger();
|
||||
warn!(
|
||||
%trace,
|
||||
when="before fully returning body data",
|
||||
"request cancelled",
|
||||
);
|
||||
} else if !self.was_ready_trailers {
|
||||
} else if !self.was_ready_trailers.load(Ordering::SeqCst) {
|
||||
let trace = self.request_ctx.format_jaeger();
|
||||
warn!(
|
||||
%trace,
|
||||
|
@ -274,7 +275,10 @@ impl<B: http_body::Body> http_body::Body for TracedBody<B> {
|
|||
let result = match maybe_result {
|
||||
Some(result) => result,
|
||||
None => {
|
||||
*self.as_mut().project().was_done_data = true;
|
||||
self.as_mut()
|
||||
.project()
|
||||
.was_done_data
|
||||
.store(true, Ordering::SeqCst);
|
||||
return Poll::Ready(None);
|
||||
}
|
||||
};
|
||||
|
@ -286,13 +290,17 @@ impl<B: http_body::Body> http_body::Body for TracedBody<B> {
|
|||
Ok(_) => match projected.inner.is_end_stream() {
|
||||
true => {
|
||||
metrics_recorder.set_classification(Classification::Ok);
|
||||
span_recorder.ok("returned body data and no trailers")
|
||||
span_recorder.ok("returned body data and no trailers");
|
||||
projected.was_done_data.store(true, Ordering::SeqCst);
|
||||
projected.was_ready_trailers.store(true, Ordering::SeqCst);
|
||||
}
|
||||
false => span_recorder.event("returned body data"),
|
||||
},
|
||||
Err(_) => {
|
||||
metrics_recorder.set_classification(Classification::ServerErr);
|
||||
span_recorder.error("error getting body");
|
||||
projected.was_done_data.store(true, Ordering::SeqCst);
|
||||
projected.was_ready_trailers.store(true, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
Poll::Ready(Some(result))
|
||||
|
@ -306,7 +314,10 @@ impl<B: http_body::Body> http_body::Body for TracedBody<B> {
|
|||
ready!(self.as_mut().project().inner.poll_trailers(cx));
|
||||
|
||||
let projected = self.as_mut().project();
|
||||
*projected.was_ready_trailers = true;
|
||||
|
||||
projected.was_done_data.store(true, Ordering::SeqCst);
|
||||
projected.was_ready_trailers.store(true, Ordering::SeqCst);
|
||||
|
||||
let span_recorder = projected.span_recorder;
|
||||
let metrics_recorder = projected.metrics_recorder;
|
||||
match &result {
|
||||
|
@ -330,7 +341,12 @@ impl<B: http_body::Body> http_body::Body for TracedBody<B> {
|
|||
}
|
||||
|
||||
fn is_end_stream(&self) -> bool {
|
||||
self.inner.is_end_stream()
|
||||
let res = self.inner.is_end_stream();
|
||||
if res {
|
||||
self.was_done_data.store(true, Ordering::SeqCst);
|
||||
self.was_ready_trailers.store(true, Ordering::SeqCst);
|
||||
}
|
||||
res
|
||||
}
|
||||
|
||||
fn size_hint(&self) -> SizeHint {
|
||||
|
|
Loading…
Reference in New Issue