refactor: remove `procspawn` (#7990)

`procspawn` indirectly pulls in many outdated dependencies. It was
introduced in #7850 to work around a flaky test. Isolating the test into
its own binary has the same affect and requires less dependencies.
pull/24376/head
Marco Neumann 2023-06-14 13:09:37 +02:00 committed by GitHub
parent c4d475cb86
commit 1762172321
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 325 additions and 553 deletions

483
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -13,9 +13,12 @@ datafusion_util = { path = "../datafusion_util" }
generated_types = { path = "../generated_types" }
observability_deps = { path = "../observability_deps" }
predicate = { path = "../predicate" }
influxdb_storage_client = { path = "../influxdb_storage_client", optional = true }
iox_query = { path = "../iox_query" }
query_functions = { path = "../query_functions"}
service_common = { path = "../service_common" }
service_grpc_testing = { path = "../service_grpc_testing", optional = true }
tokio = { version = "1.28", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"], optional = true }
trace = { path = "../trace"}
trace_http = { path = "../trace_http"}
tracker = { path = "../tracker" }
@ -39,7 +42,6 @@ datafusion_util = { path = "../datafusion_util" }
influxdb_storage_client = { path = "../influxdb_storage_client" }
metric = { path = "../metric" }
panic_logging = { path = "../panic_logging" }
procspawn = { version = "0.10", default-features = false, features = ["test-support", "safe-shared-libraries"] }
schema = { path = "../schema" }
service_grpc_testing = { path = "../service_grpc_testing" }
test_helpers = { path = "../test_helpers" }
@ -48,3 +50,10 @@ trace_http = { path = "../trace_http" }
# Crates.io dependencies, in alphabetical order
parking_lot = "0.12"
tokio = { version = "1.28", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
[features]
test-util = ["influxdb_storage_client", "service_grpc_testing", "tokio"]
[[test]]
name = "log_on_panic"
required-features = ["test-util"]

View File

@ -18,6 +18,10 @@
// Workaround for "unused crate" lint false positives.
use workspace_hack as _;
// rustc doesn't seem to understand test-only requirements
#[cfg(test)]
use panic_logging as _;
/// `[0x00]` is the magic value that that the storage gRPC layer uses to
/// encode a tag_key that means "measurement name"
pub(crate) const TAG_KEY_MEASUREMENT: &[u8] = &[0];
@ -35,6 +39,9 @@ mod query_completed_token;
mod response_chunking;
pub mod service;
#[cfg(any(test, feature = "test-util"))]
pub mod test_util;
use generated_types::storage_server::{Storage, StorageServer};
use service_common::QueryNamespaceProvider;
use std::sync::Arc;

View File

@ -1717,30 +1717,18 @@ where
#[cfg(test)]
mod tests {
use crate::test_util::Fixture;
use super::*;
use futures::Future;
use generated_types::{
google::rpc::Status as GrpcStatus, i_ox_testing_client::IOxTestingClient,
tag_key_predicate::Value,
};
use influxdb_storage_client::{
connection::{Builder as ConnectionBuilder, Connection, GrpcConnection},
generated_types::*,
Client as StorageClient, OrgAndBucket,
};
use generated_types::{google::rpc::Status as GrpcStatus, tag_key_predicate::Value};
use influxdb_storage_client::{generated_types::*, Client as StorageClient, OrgAndBucket};
use iox_query::test::TestChunk;
use metric::{Attributes, Metric, U64Counter, U64Gauge};
use panic_logging::SendPanicsToTracing;
use service_common::test_util::TestDatabaseStore;
use std::{
any::Any,
net::{IpAddr, Ipv4Addr, SocketAddr},
num::NonZeroU64,
sync::Arc,
};
use test_helpers::{assert_contains, maybe_start_logging, tracing::TracingCapture};
use tokio::{pin, task::JoinHandle};
use tokio_stream::wrappers::TcpListenerStream;
use std::{any::Any, num::NonZeroU64, sync::Arc};
use test_helpers::{assert_contains, maybe_start_logging};
use tokio::pin;
fn to_str_vec(s: &[&str]) -> Vec<String> {
s.iter().map(|s| s.to_string()).collect()
@ -2608,84 +2596,6 @@ mod tests {
grpc_request_metric_has_count(&fixture, "MeasurementTagValues", "server_error", 1);
}
procspawn::enable_test_support!();
#[test]
fn test_log_on_panic() {
// libtest (i.e. the standard library test fixture) sets panic hooks. This will race w/ our own panic hooks. To
// prevent that, we spawn a dedicated process with its own panic hooks that is isolated from the remaining
// tests.
procspawn::spawn((), |_| {
// do NOT write to stdout (default behavior)
std::panic::set_hook(Box::new(|_| {}));
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(test_log_on_panic_inner());
})
.join()
.unwrap();
}
async fn test_log_on_panic_inner() {
// Send a message to a route that causes a panic and ensure:
// 1. We don't use up all executors 2. The panic message
// message ends up in the log system
// Normally, the global panic logger is set at program start
let _f = SendPanicsToTracing::new();
// capture all tracing messages
let tracing_capture = TracingCapture::new();
// Start a test gRPC server on a randomally allocated port
let mut fixture = Fixture::new().await.expect("Connecting to test server");
let request = TestErrorRequest {};
// Test response from storage server
let response = fixture.iox_client.test_error(request).await;
match &response {
Ok(_) => {
panic!("Unexpected success: {response:?}");
}
Err(status) => {
assert_eq!(status.code(), tonic::Code::Cancelled);
assert_contains!(
status.message(),
"http2 error: stream error received: stream no longer needed"
);
}
};
// Ensure that the logs captured the panic
let captured_logs = tracing_capture.to_string();
// Note we don't include the actual line / column in the
// expected panic message to avoid needing to update the test
// whenever the source code file changed.
let expected_error = "'This is a test panic', service_grpc_testing/src/lib.rs:";
assert_contains!(captured_logs, expected_error);
// Ensure that panics don't exhaust the tokio executor by
// running 100 times (success is if we can make a successful
// call after this)
for _ in 0usize..100 {
let request = TestErrorRequest {};
// Test response from storage server
let response = fixture.iox_client.test_error(request).await;
assert!(response.is_err(), "Got an error response: {response:?}");
}
// Ensure there are still threads to answer actual client queries
let caps = fixture.storage_client.capabilities().await.unwrap();
assert!(!caps.is_empty(), "Caps: {caps:?}");
}
#[tokio::test]
async fn test_read_filter() {
test_helpers::maybe_start_logging();
@ -3736,97 +3646,6 @@ mod tests {
v.iter().map(|s| s.to_string()).collect()
}
#[derive(Debug, Snafu)]
pub enum FixtureError {
#[snafu(display("Error binding fixture server: {}", source))]
Bind { source: std::io::Error },
#[snafu(display("Error creating fixture: {}", source))]
Tonic { source: tonic::transport::Error },
}
// Wrapper around raw clients and test database
struct Fixture {
client_connection: Connection,
iox_client: IOxTestingClient<GrpcConnection>,
storage_client: StorageClient,
test_storage: Arc<TestDatabaseStore>,
join_handle: JoinHandle<()>,
}
impl Fixture {
/// Start up a test storage server listening on `port`, returning
/// a fixture with the test server and clients
async fn new() -> Result<Self, FixtureError> {
Self::new_with_semaphore_size(u16::MAX as usize).await
}
async fn new_with_semaphore_size(semaphore_size: usize) -> Result<Self, FixtureError> {
let test_storage = Arc::new(TestDatabaseStore::new_with_semaphore_size(semaphore_size));
// Get a random port from the kernel by asking for port 0.
let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
let socket = tokio::net::TcpListener::bind(bind_addr)
.await
.context(BindSnafu)?;
// Pull the assigned port out of the socket
let bind_addr = socket.local_addr().unwrap();
println!("Starting InfluxDB IOx storage test server on {bind_addr:?}");
let trace_header_parser = trace_http::ctx::TraceHeaderParser::new();
let router = tonic::transport::Server::builder()
.layer(trace_http::tower::TraceLayer::new(
trace_header_parser,
Arc::clone(&test_storage.metric_registry),
None,
true,
"test server",
))
.add_service(service_grpc_testing::make_server())
.add_service(crate::make_server(Arc::clone(&test_storage)));
let server = async move {
let stream = TcpListenerStream::new(socket);
router
.serve_with_incoming(stream)
.await
.log_if_error("Running Tonic Server")
.ok();
};
let join_handle = tokio::task::spawn(server);
let client_connection = ConnectionBuilder::default()
.connect_timeout(std::time::Duration::from_secs(30))
.build(format!("http://{bind_addr}"))
.await
.unwrap();
let iox_client =
IOxTestingClient::new(client_connection.clone().into_grpc_connection());
let storage_client = StorageClient::new(client_connection.clone());
Ok(Self {
client_connection,
iox_client,
storage_client,
test_storage,
join_handle,
})
}
}
impl Drop for Fixture {
fn drop(&mut self) {
self.join_handle.abort();
}
}
/// Assert that given future is pending.
///
/// This will try to poll the future a bit to ensure that it is not stuck in tokios task preemption.

View File

@ -0,0 +1,107 @@
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::Arc,
};
use generated_types::i_ox_testing_client::IOxTestingClient;
use influxdb_storage_client::{
connection::{Builder as ConnectionBuilder, Connection, GrpcConnection},
Client as StorageClient,
};
use service_common::test_util::TestDatabaseStore;
use snafu::{ResultExt, Snafu};
use tokio::task::JoinHandle;
use tokio_stream::wrappers::TcpListenerStream;
use crate::service::ErrorLogger;
#[derive(Debug, Snafu)]
pub enum FixtureError {
#[snafu(display("Error binding fixture server: {}", source))]
Bind { source: std::io::Error },
#[snafu(display("Error creating fixture: {}", source))]
Tonic { source: tonic::transport::Error },
}
/// Wrapper around raw clients and test database
#[derive(Debug)]
pub struct Fixture {
pub client_connection: Connection,
pub iox_client: IOxTestingClient<GrpcConnection>,
pub storage_client: StorageClient,
pub test_storage: Arc<TestDatabaseStore>,
pub join_handle: JoinHandle<()>,
}
impl Fixture {
/// Start up a test storage server listening on `port`, returning
/// a fixture with the test server and clients
pub async fn new() -> Result<Self, FixtureError> {
Self::new_with_semaphore_size(u16::MAX as usize).await
}
pub async fn new_with_semaphore_size(semaphore_size: usize) -> Result<Self, FixtureError> {
let test_storage = Arc::new(TestDatabaseStore::new_with_semaphore_size(semaphore_size));
// Get a random port from the kernel by asking for port 0.
let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
let socket = tokio::net::TcpListener::bind(bind_addr)
.await
.context(BindSnafu)?;
// Pull the assigned port out of the socket
let bind_addr = socket.local_addr().unwrap();
println!("Starting InfluxDB IOx storage test server on {bind_addr:?}");
let trace_header_parser = trace_http::ctx::TraceHeaderParser::new();
let router = tonic::transport::Server::builder()
.layer(trace_http::tower::TraceLayer::new(
trace_header_parser,
Arc::clone(&test_storage.metric_registry),
None,
true,
"test server",
))
.add_service(service_grpc_testing::make_server())
.add_service(crate::make_server(Arc::clone(&test_storage)));
let server = async move {
let stream = TcpListenerStream::new(socket);
router
.serve_with_incoming(stream)
.await
.log_if_error("Running Tonic Server")
.ok();
};
let join_handle = tokio::task::spawn(server);
let client_connection = ConnectionBuilder::default()
.connect_timeout(std::time::Duration::from_secs(30))
.build(format!("http://{bind_addr}"))
.await
.unwrap();
let iox_client = IOxTestingClient::new(client_connection.clone().into_grpc_connection());
let storage_client = StorageClient::new(client_connection.clone());
Ok(Self {
client_connection,
iox_client,
storage_client,
test_storage,
join_handle,
})
}
}
impl Drop for Fixture {
fn drop(&mut self) {
self.join_handle.abort();
}
}

View File

@ -0,0 +1,71 @@
use generated_types::TestErrorRequest;
use panic_logging::SendPanicsToTracing;
use service_grpc_influxrpc::test_util::Fixture;
use test_helpers::{assert_contains, tracing::TracingCapture};
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
// !!!!!!!!!!!!!!!!!!!! IMPORTANT !!!!!!!!!!!!!!!!!!!!
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
//
// This file MUST only contain a single test, otherwise
// libtest's panic hooks will interfer with our custom
// panic hook.
//
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
#[tokio::test]
async fn test() {
// Send a message to a route that causes a panic and ensure:
// 1. We don't use up all executors 2. The panic message
// message ends up in the log system
// Normally, the global panic logger is set at program start
let _f = SendPanicsToTracing::new();
// capture all tracing messages
let tracing_capture = TracingCapture::new();
// Start a test gRPC server on a randomally allocated port
let mut fixture = Fixture::new().await.expect("Connecting to test server");
let request = TestErrorRequest {};
// Test response from storage server
let response = fixture.iox_client.test_error(request).await;
match &response {
Ok(_) => {
panic!("Unexpected success: {response:?}");
}
Err(status) => {
assert_eq!(status.code(), tonic::Code::Cancelled);
assert_contains!(
status.message(),
"http2 error: stream error received: stream no longer needed"
);
}
};
// Ensure that the logs captured the panic
let captured_logs = tracing_capture.to_string();
// Note we don't include the actual line / column in the
// expected panic message to avoid needing to update the test
// whenever the source code file changed.
let expected_error = "'This is a test panic', service_grpc_testing/src/lib.rs:";
assert_contains!(captured_logs, expected_error);
// Ensure that panics don't exhaust the tokio executor by
// running 100 times (success is if we can make a successful
// call after this)
for _ in 0usize..100 {
let request = TestErrorRequest {};
// Test response from storage server
let response = fixture.iox_client.test_error(request).await;
assert!(response.is_err(), "Got an error response: {response:?}");
}
// Ensure there are still threads to answer actual client queries
let caps = fixture.storage_client.capabilities().await.unwrap();
assert!(!caps.is_empty(), "Caps: {caps:?}");
}

View File

@ -204,7 +204,7 @@ once_cell = { version = "1", default-features = false, features = ["unstable"] }
rustls = { version = "0.21", features = ["dangerous_configuration"] }
scopeguard = { version = "1" }
webpki = { version = "0.22", default-features = false, features = ["std"] }
winapi = { version = "0.3", default-features = false, features = ["basetsd", "consoleapi", "errhandlingapi", "fileapi", "handleapi", "impl-debug", "impl-default", "ioapiset", "knownfolders", "libloaderapi", "memoryapi", "minwinbase", "minwindef", "namedpipeapi", "ntsecapi", "ntstatus", "objbase", "processenv", "processthreadsapi", "psapi", "shellapi", "shlobj", "std", "stringapiset", "synchapi", "sysinfoapi", "timezoneapi", "winbase", "wincon", "winerror", "winnt", "winreg", "winuser", "ws2ipdef", "ws2tcpip", "wtypesbase"] }
winapi = { version = "0.3", default-features = false, features = ["basetsd", "consoleapi", "errhandlingapi", "fileapi", "handleapi", "impl-debug", "impl-default", "knownfolders", "minwinbase", "minwindef", "ntsecapi", "ntstatus", "objbase", "processenv", "shellapi", "shlobj", "std", "stringapiset", "synchapi", "sysinfoapi", "timezoneapi", "winbase", "wincon", "winerror", "winnt", "winreg", "winuser", "ws2ipdef", "ws2tcpip", "wtypesbase"] }
windows-sys-53888c27b7ba5cf4 = { package = "windows-sys", version = "0.45", features = ["Win32_Foundation", "Win32_Networking_WinSock", "Win32_Security", "Win32_Storage_FileSystem", "Win32_System_Console", "Win32_System_IO", "Win32_System_LibraryLoader", "Win32_System_Pipes", "Win32_System_SystemServices", "Win32_System_WindowsProgramming", "Win32_UI_Input_KeyboardAndMouse"] }
windows-sys-c8eced492e86ede7 = { package = "windows-sys", version = "0.48", features = ["Win32_Foundation", "Win32_Networking_WinSock", "Win32_Security", "Win32_Storage_FileSystem", "Win32_System_Console", "Win32_System_IO", "Win32_System_Pipes", "Win32_System_SystemServices", "Win32_System_Threading", "Win32_System_WindowsProgramming", "Win32_UI_Shell"] }
@ -212,7 +212,7 @@ windows-sys-c8eced492e86ede7 = { package = "windows-sys", version = "0.48", feat
once_cell = { version = "1", default-features = false, features = ["unstable"] }
scopeguard = { version = "1" }
webpki = { version = "0.22", default-features = false, features = ["std"] }
winapi = { version = "0.3", default-features = false, features = ["basetsd", "consoleapi", "errhandlingapi", "fileapi", "handleapi", "impl-debug", "impl-default", "ioapiset", "knownfolders", "libloaderapi", "memoryapi", "minwinbase", "minwindef", "namedpipeapi", "ntsecapi", "ntstatus", "objbase", "processenv", "processthreadsapi", "psapi", "shellapi", "shlobj", "std", "stringapiset", "synchapi", "sysinfoapi", "timezoneapi", "winbase", "wincon", "winerror", "winnt", "winreg", "winuser", "ws2ipdef", "ws2tcpip", "wtypesbase"] }
winapi = { version = "0.3", default-features = false, features = ["basetsd", "consoleapi", "errhandlingapi", "fileapi", "handleapi", "impl-debug", "impl-default", "knownfolders", "minwinbase", "minwindef", "ntsecapi", "ntstatus", "objbase", "processenv", "shellapi", "shlobj", "std", "stringapiset", "synchapi", "sysinfoapi", "timezoneapi", "winbase", "wincon", "winerror", "winnt", "winreg", "winuser", "ws2ipdef", "ws2tcpip", "wtypesbase"] }
windows-sys-53888c27b7ba5cf4 = { package = "windows-sys", version = "0.45", features = ["Win32_Foundation", "Win32_Networking_WinSock", "Win32_Security", "Win32_Storage_FileSystem", "Win32_System_Console", "Win32_System_IO", "Win32_System_LibraryLoader", "Win32_System_Pipes", "Win32_System_SystemServices", "Win32_System_WindowsProgramming", "Win32_UI_Input_KeyboardAndMouse"] }
windows-sys-c8eced492e86ede7 = { package = "windows-sys", version = "0.48", features = ["Win32_Foundation", "Win32_Networking_WinSock", "Win32_Security", "Win32_Storage_FileSystem", "Win32_System_Console", "Win32_System_IO", "Win32_System_Pipes", "Win32_System_SystemServices", "Win32_System_Threading", "Win32_System_WindowsProgramming", "Win32_UI_Shell"] }