From d2399764c525321da745941658461dc47b306870 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 1 Nov 2022 17:08:04 +0100 Subject: [PATCH] fix: be more conservative w/ "request canceled" messages --- trace_http/src/tower.rs | 36 ++++++++++++++++++++++++++---------- 1 file changed, 26 insertions(+), 10 deletions(-) diff --git a/trace_http/src/tower.rs b/trace_http/src/tower.rs index 2b691e6672..3ef580103c 100644 --- a/trace_http/src/tower.rs +++ b/trace_http/src/tower.rs @@ -14,6 +14,7 @@ use std::future::Future; use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::task::{Context, Poll}; @@ -217,8 +218,8 @@ where Poll::Ready(Ok(response.map(|body| TracedBody { request_ctx, span_recorder, - was_done_data: false, - was_ready_trailers: false, + was_done_data: AtomicBool::new(false), + was_ready_trailers: AtomicBool::new(false), inner: body, metrics_recorder, }))) @@ -235,8 +236,8 @@ pub struct TracedBody { request_ctx: Option, span_recorder: SpanRecorder, metrics_recorder: MetricsRecorder, - was_done_data: bool, - was_ready_trailers: bool, + was_done_data: AtomicBool, + was_ready_trailers: AtomicBool, #[pin] inner: B, } @@ -244,14 +245,14 @@ pub struct TracedBody { #[pinned_drop] impl PinnedDrop for TracedBody { fn drop(self: Pin<&mut Self>) { - if !self.was_done_data { + if !self.was_done_data.load(Ordering::SeqCst) { let trace = self.request_ctx.format_jaeger(); warn!( %trace, when="before fully returning body data", "request cancelled", ); - } else if !self.was_ready_trailers { + } else if !self.was_ready_trailers.load(Ordering::SeqCst) { let trace = self.request_ctx.format_jaeger(); warn!( %trace, @@ -274,7 +275,10 @@ impl http_body::Body for TracedBody { let result = match maybe_result { Some(result) => result, None => { - *self.as_mut().project().was_done_data = true; + self.as_mut() + .project() + .was_done_data + .store(true, Ordering::SeqCst); return Poll::Ready(None); } }; @@ -286,13 +290,17 @@ impl http_body::Body for TracedBody { Ok(_) => match projected.inner.is_end_stream() { true => { metrics_recorder.set_classification(Classification::Ok); - span_recorder.ok("returned body data and no trailers") + span_recorder.ok("returned body data and no trailers"); + projected.was_done_data.store(true, Ordering::SeqCst); + projected.was_ready_trailers.store(true, Ordering::SeqCst); } false => span_recorder.event("returned body data"), }, Err(_) => { metrics_recorder.set_classification(Classification::ServerErr); span_recorder.error("error getting body"); + projected.was_done_data.store(true, Ordering::SeqCst); + projected.was_ready_trailers.store(true, Ordering::SeqCst); } } Poll::Ready(Some(result)) @@ -306,7 +314,10 @@ impl http_body::Body for TracedBody { ready!(self.as_mut().project().inner.poll_trailers(cx)); let projected = self.as_mut().project(); - *projected.was_ready_trailers = true; + + projected.was_done_data.store(true, Ordering::SeqCst); + projected.was_ready_trailers.store(true, Ordering::SeqCst); + let span_recorder = projected.span_recorder; let metrics_recorder = projected.metrics_recorder; match &result { @@ -330,7 +341,12 @@ impl http_body::Body for TracedBody { } fn is_end_stream(&self) -> bool { - self.inner.is_end_stream() + let res = self.inner.is_end_stream(); + if res { + self.was_done_data.store(true, Ordering::SeqCst); + self.was_ready_trailers.store(true, Ordering::SeqCst); + } + res } fn size_hint(&self) -> SizeHint {