test: random ports for storage fixture server
Rather than having to specify unique ports for test server instances, have the kernel randomly assign ports and configure the storage gRPC client to use them.pull/24376/head
parent
0eaa90e89d
commit
49fe88fced
|
@ -103,7 +103,11 @@ pub async fn main() -> Result<()> {
|
|||
}
|
||||
};
|
||||
|
||||
let grpc_server = storage::make_server(grpc_bind_addr, storage.clone(), executor);
|
||||
let socket = tokio::net::TcpListener::bind(grpc_bind_addr)
|
||||
.await
|
||||
.expect("failed to bind server");
|
||||
|
||||
let grpc_server = storage::make_server(socket, storage.clone(), executor);
|
||||
|
||||
info!("gRPC server listening on http://{}", grpc_bind_addr);
|
||||
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
//! implemented in terms of the `query::Database` and
|
||||
//! `query::DatabaseStore`
|
||||
|
||||
use std::{collections::HashMap, net::SocketAddr, sync::Arc};
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use generated_types::{
|
||||
i_ox_server::{IOx, IOxServer},
|
||||
|
@ -39,7 +39,7 @@ use query::{
|
|||
|
||||
use snafu::{OptionExt, ResultExt, Snafu};
|
||||
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::{net::TcpListener, sync::mpsc};
|
||||
use tonic::Status;
|
||||
use tracing::{info, warn};
|
||||
|
||||
|
@ -1128,7 +1128,7 @@ where
|
|||
/// underlying hyper server instance. Resolves when the server has
|
||||
/// shutdown.
|
||||
pub async fn make_server<T>(
|
||||
bind_addr: SocketAddr,
|
||||
socket: TcpListener,
|
||||
storage: Arc<T>,
|
||||
executor: Arc<QueryExecutor>,
|
||||
) -> Result<()>
|
||||
|
@ -1144,7 +1144,7 @@ where
|
|||
storage.clone(),
|
||||
executor.clone(),
|
||||
)))
|
||||
.serve(bind_addr)
|
||||
.serve_with_incoming(socket)
|
||||
.await
|
||||
.context(ServerError {})
|
||||
.log_if_error("Running Tonic Server")
|
||||
|
@ -1190,9 +1190,7 @@ mod tests {
|
|||
|
||||
#[tokio::test]
|
||||
async fn test_influxdb_iox_rpc() -> Result<()> {
|
||||
let mut fixture = Fixture::new(11807)
|
||||
.await
|
||||
.expect("Connecting to test server");
|
||||
let mut fixture = Fixture::new().await.expect("Connecting to test server");
|
||||
|
||||
let org = Organization {
|
||||
id: 1337,
|
||||
|
@ -1222,10 +1220,8 @@ mod tests {
|
|||
|
||||
#[tokio::test]
|
||||
async fn test_storage_rpc_capabilities() -> Result<(), tonic::Status> {
|
||||
// Note we use a unique port. TODO: let the OS pick the port
|
||||
let mut fixture = Fixture::new(11808)
|
||||
.await
|
||||
.expect("Connecting to test server");
|
||||
// Start a test gRPC server on a randomally allocated port
|
||||
let mut fixture = Fixture::new().await.expect("Connecting to test server");
|
||||
|
||||
// Test response from storage server
|
||||
let mut expected_capabilities: HashMap<String, Vec<String>> = HashMap::new();
|
||||
|
@ -1244,10 +1240,8 @@ mod tests {
|
|||
|
||||
#[tokio::test]
|
||||
async fn test_storage_rpc_measurement_names() -> Result<(), tonic::Status> {
|
||||
// Note we use a unique port. TODO: let the OS pick the port
|
||||
let mut fixture = Fixture::new(11809)
|
||||
.await
|
||||
.expect("Connecting to test server");
|
||||
// Start a test gRPC server on a randomally allocated port
|
||||
let mut fixture = Fixture::new().await.expect("Connecting to test server");
|
||||
|
||||
let db_info = OrgAndBucket::new(123, 456);
|
||||
let partition_id = 1;
|
||||
|
@ -1299,10 +1293,8 @@ mod tests {
|
|||
/// and that the returned values are sent back via gRPC.
|
||||
#[tokio::test]
|
||||
async fn test_storage_rpc_tag_keys() -> Result<(), tonic::Status> {
|
||||
// Note we use a unique port. TODO: let the OS pick the port
|
||||
let mut fixture = Fixture::new(11810)
|
||||
.await
|
||||
.expect("Connecting to test server");
|
||||
// Start a test gRPC server on a randomally allocated port
|
||||
let mut fixture = Fixture::new().await.expect("Connecting to test server");
|
||||
|
||||
let db_info = OrgAndBucket::new(123, 456);
|
||||
let partition_id = 1;
|
||||
|
@ -1380,10 +1372,8 @@ mod tests {
|
|||
/// and that the returned values are sent back via gRPC.
|
||||
#[tokio::test]
|
||||
async fn test_storage_rpc_measurement_tag_keys() -> Result<(), tonic::Status> {
|
||||
// Note we use a unique port. TODO: let the OS pick the port
|
||||
let mut fixture = Fixture::new(11811)
|
||||
.await
|
||||
.expect("Connecting to test server");
|
||||
// Start a test gRPC server on a randomally allocated port
|
||||
let mut fixture = Fixture::new().await.expect("Connecting to test server");
|
||||
|
||||
let db_info = OrgAndBucket::new(123, 456);
|
||||
let partition_id = 1;
|
||||
|
@ -1476,10 +1466,8 @@ mod tests {
|
|||
/// and that the returned values are sent back via gRPC.
|
||||
#[tokio::test]
|
||||
async fn test_storage_rpc_tag_values() -> Result<(), tonic::Status> {
|
||||
// Note we use a unique port. TODO: let the OS pick the port
|
||||
let mut fixture = Fixture::new(11812)
|
||||
.await
|
||||
.expect("Connecting to test server");
|
||||
// Start a test gRPC server on a randomally allocated port
|
||||
let mut fixture = Fixture::new().await.expect("Connecting to test server");
|
||||
|
||||
let db_info = OrgAndBucket::new(123, 456);
|
||||
let partition_id = 1;
|
||||
|
@ -1582,10 +1570,8 @@ mod tests {
|
|||
/// and that the returned values are sent back via gRPC.
|
||||
#[tokio::test]
|
||||
async fn test_storage_rpc_measurement_tag_values() -> Result<(), tonic::Status> {
|
||||
// Note we use a unique port. TODO: let the OS pick the port
|
||||
let mut fixture = Fixture::new(11813)
|
||||
.await
|
||||
.expect("Connecting to test server");
|
||||
// Start a test gRPC server on a randomally allocated port
|
||||
let mut fixture = Fixture::new().await.expect("Connecting to test server");
|
||||
|
||||
let db_info = OrgAndBucket::new(123, 456);
|
||||
let partition_id = 1;
|
||||
|
@ -1677,10 +1663,8 @@ mod tests {
|
|||
// capture all tracing messages
|
||||
let tracing_capture = TracingCapture::new();
|
||||
|
||||
// Note we use a unique port. TODO: let the OS pick the port
|
||||
let mut fixture = Fixture::new(11900)
|
||||
.await
|
||||
.expect("Connecting to test server");
|
||||
// Start a test gRPC server on a randomally allocated port
|
||||
let mut fixture = Fixture::new().await.expect("Connecting to test server");
|
||||
|
||||
let request = TestErrorRequest {};
|
||||
|
||||
|
@ -1738,10 +1722,8 @@ mod tests {
|
|||
|
||||
#[tokio::test]
|
||||
async fn test_read_filter() -> Result<(), tonic::Status> {
|
||||
// Note we use a unique port. TODO: let the OS pick the port
|
||||
let mut fixture = Fixture::new(11901)
|
||||
.await
|
||||
.expect("Connecting to test server");
|
||||
// Start a test gRPC server on a randomally allocated port
|
||||
let mut fixture = Fixture::new().await.expect("Connecting to test server");
|
||||
|
||||
let db_info = OrgAndBucket::new(123, 456);
|
||||
let partition_id = 1;
|
||||
|
@ -1817,10 +1799,8 @@ mod tests {
|
|||
|
||||
#[tokio::test]
|
||||
async fn test_read_group() -> Result<(), tonic::Status> {
|
||||
// Note we use a unique port. TODO: let the OS pick the port
|
||||
let mut fixture = Fixture::new(11902)
|
||||
.await
|
||||
.expect("Connecting to test server");
|
||||
// Start a test gRPC server on a randomally allocated port
|
||||
let mut fixture = Fixture::new().await.expect("Connecting to test server");
|
||||
|
||||
let db_info = OrgAndBucket::new(123, 456);
|
||||
let partition_id = 1;
|
||||
|
@ -1917,10 +1897,8 @@ mod tests {
|
|||
|
||||
#[tokio::test]
|
||||
async fn test_read_window_aggegate() -> Result<(), tonic::Status> {
|
||||
// Note we use a unique port. TODO: let the OS pick the port
|
||||
let mut fixture = Fixture::new(11903)
|
||||
.await
|
||||
.expect("Connecting to test server");
|
||||
// Start a test gRPC server on a randomally allocated port
|
||||
let mut fixture = Fixture::new().await.expect("Connecting to test server");
|
||||
|
||||
let db_info = OrgAndBucket::new(123, 456);
|
||||
let partition_id = 1;
|
||||
|
@ -2078,10 +2056,8 @@ mod tests {
|
|||
|
||||
#[tokio::test]
|
||||
async fn test_measurement_fields() -> Result<(), tonic::Status> {
|
||||
// Note we use a unique port. TODO: let the OS pick the port
|
||||
let mut fixture = Fixture::new(11904)
|
||||
.await
|
||||
.expect("Connecting to test server");
|
||||
// Start a test gRPC server on a randomally allocated port
|
||||
let mut fixture = Fixture::new().await.expect("Connecting to test server");
|
||||
|
||||
let db_info = OrgAndBucket::new(123, 456);
|
||||
let partition_id = 1;
|
||||
|
@ -2503,6 +2479,15 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum FixtureError {
|
||||
#[snafu(display("Error binding fixture server: {}", source))]
|
||||
Bind { source: std::io::Error },
|
||||
|
||||
#[snafu(display("Error creating fixture: {}", source))]
|
||||
Tonic { source: tonic::transport::Error },
|
||||
}
|
||||
|
||||
// Wrapper around raw clients and test database
|
||||
struct Fixture {
|
||||
iox_client: IOxClient,
|
||||
|
@ -2514,22 +2499,32 @@ mod tests {
|
|||
impl Fixture {
|
||||
/// Start up a test rpc server listening on `port`, returning
|
||||
/// a fixture with the test server and clients
|
||||
async fn new(port: u16) -> Result<Self, tonic::transport::Error> {
|
||||
async fn new() -> Result<Self, FixtureError> {
|
||||
let test_storage = Arc::new(TestDatabaseStore::new());
|
||||
let test_executor = Arc::new(QueryExecutor::default());
|
||||
|
||||
// TODO: specify port 0 to let the OS pick the port (need to
|
||||
// figure out how to get access to the actual addr from tonic)
|
||||
let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);
|
||||
// Get a random port from the kernel by asking for port 0.
|
||||
let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
|
||||
let socket = tokio::net::TcpListener::bind(bind_addr)
|
||||
.await
|
||||
.context(Bind)?;
|
||||
|
||||
// Pull the assigned port out of the socket
|
||||
let bind_addr = socket.local_addr().unwrap();
|
||||
|
||||
println!("Starting InfluxDB IOx rpc test server on {:?}", bind_addr);
|
||||
|
||||
let server = make_server(bind_addr, test_storage.clone(), test_executor.clone());
|
||||
let server = make_server(socket, test_storage.clone(), test_executor.clone());
|
||||
tokio::task::spawn(server);
|
||||
|
||||
let iox_client = connect_to_server::<IOxClient>(bind_addr).await?;
|
||||
let storage_client =
|
||||
StorageClientWrapper::new(connect_to_server::<StorageClient>(bind_addr).await?);
|
||||
let iox_client = connect_to_server::<IOxClient>(bind_addr)
|
||||
.await
|
||||
.context(Tonic)?;
|
||||
let storage_client = StorageClientWrapper::new(
|
||||
connect_to_server::<StorageClient>(bind_addr)
|
||||
.await
|
||||
.context(Tonic)?,
|
||||
);
|
||||
|
||||
Ok(Self {
|
||||
iox_client,
|
||||
|
|
Loading…
Reference in New Issue