Merge pull request #489 from influxdata/dom/storage-fixture-random-port

test: random ports for storage fixture server
pull/24376/head
Dom 2020-11-26 10:04:11 +00:00 committed by GitHub
commit 9c726530ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 59 additions and 60 deletions

View File

@ -103,7 +103,11 @@ pub async fn main() -> Result<()> {
} }
}; };
let grpc_server = storage::make_server(grpc_bind_addr, storage.clone(), executor); let socket = tokio::net::TcpListener::bind(grpc_bind_addr)
.await
.expect("failed to bind server");
let grpc_server = storage::make_server(socket, storage.clone(), executor);
info!("gRPC server listening on http://{}", grpc_bind_addr); info!("gRPC server listening on http://{}", grpc_bind_addr);

View File

@ -2,7 +2,7 @@
//! implemented in terms of the `query::Database` and //! implemented in terms of the `query::Database` and
//! `query::DatabaseStore` //! `query::DatabaseStore`
use std::{collections::HashMap, net::SocketAddr, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use generated_types::{ use generated_types::{
i_ox_server::{IOx, IOxServer}, i_ox_server::{IOx, IOxServer},
@ -39,7 +39,7 @@ use query::{
use snafu::{OptionExt, ResultExt, Snafu}; use snafu::{OptionExt, ResultExt, Snafu};
use tokio::sync::mpsc; use tokio::{net::TcpListener, sync::mpsc};
use tonic::Status; use tonic::Status;
use tracing::{info, warn}; use tracing::{info, warn};
@ -1128,7 +1128,7 @@ where
/// underlying hyper server instance. Resolves when the server has /// underlying hyper server instance. Resolves when the server has
/// shutdown. /// shutdown.
pub async fn make_server<T>( pub async fn make_server<T>(
bind_addr: SocketAddr, socket: TcpListener,
storage: Arc<T>, storage: Arc<T>,
executor: Arc<QueryExecutor>, executor: Arc<QueryExecutor>,
) -> Result<()> ) -> Result<()>
@ -1144,7 +1144,7 @@ where
storage.clone(), storage.clone(),
executor.clone(), executor.clone(),
))) )))
.serve(bind_addr) .serve_with_incoming(socket)
.await .await
.context(ServerError {}) .context(ServerError {})
.log_if_error("Running Tonic Server") .log_if_error("Running Tonic Server")
@ -1190,9 +1190,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_influxdb_iox_rpc() -> Result<()> { async fn test_influxdb_iox_rpc() -> Result<()> {
let mut fixture = Fixture::new(11807) let mut fixture = Fixture::new().await.expect("Connecting to test server");
.await
.expect("Connecting to test server");
let org = Organization { let org = Organization {
id: 1337, id: 1337,
@ -1222,10 +1220,8 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_storage_rpc_capabilities() -> Result<(), tonic::Status> { async fn test_storage_rpc_capabilities() -> Result<(), tonic::Status> {
// Note we use a unique port. TODO: let the OS pick the port // Start a test gRPC server on a randomally allocated port
let mut fixture = Fixture::new(11808) let mut fixture = Fixture::new().await.expect("Connecting to test server");
.await
.expect("Connecting to test server");
// Test response from storage server // Test response from storage server
let mut expected_capabilities: HashMap<String, Vec<String>> = HashMap::new(); let mut expected_capabilities: HashMap<String, Vec<String>> = HashMap::new();
@ -1244,10 +1240,8 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_storage_rpc_measurement_names() -> Result<(), tonic::Status> { async fn test_storage_rpc_measurement_names() -> Result<(), tonic::Status> {
// Note we use a unique port. TODO: let the OS pick the port // Start a test gRPC server on a randomally allocated port
let mut fixture = Fixture::new(11809) let mut fixture = Fixture::new().await.expect("Connecting to test server");
.await
.expect("Connecting to test server");
let db_info = OrgAndBucket::new(123, 456); let db_info = OrgAndBucket::new(123, 456);
let partition_id = 1; let partition_id = 1;
@ -1299,10 +1293,8 @@ mod tests {
/// and that the returned values are sent back via gRPC. /// and that the returned values are sent back via gRPC.
#[tokio::test] #[tokio::test]
async fn test_storage_rpc_tag_keys() -> Result<(), tonic::Status> { async fn test_storage_rpc_tag_keys() -> Result<(), tonic::Status> {
// Note we use a unique port. TODO: let the OS pick the port // Start a test gRPC server on a randomally allocated port
let mut fixture = Fixture::new(11810) let mut fixture = Fixture::new().await.expect("Connecting to test server");
.await
.expect("Connecting to test server");
let db_info = OrgAndBucket::new(123, 456); let db_info = OrgAndBucket::new(123, 456);
let partition_id = 1; let partition_id = 1;
@ -1380,10 +1372,8 @@ mod tests {
/// and that the returned values are sent back via gRPC. /// and that the returned values are sent back via gRPC.
#[tokio::test] #[tokio::test]
async fn test_storage_rpc_measurement_tag_keys() -> Result<(), tonic::Status> { async fn test_storage_rpc_measurement_tag_keys() -> Result<(), tonic::Status> {
// Note we use a unique port. TODO: let the OS pick the port // Start a test gRPC server on a randomally allocated port
let mut fixture = Fixture::new(11811) let mut fixture = Fixture::new().await.expect("Connecting to test server");
.await
.expect("Connecting to test server");
let db_info = OrgAndBucket::new(123, 456); let db_info = OrgAndBucket::new(123, 456);
let partition_id = 1; let partition_id = 1;
@ -1476,10 +1466,8 @@ mod tests {
/// and that the returned values are sent back via gRPC. /// and that the returned values are sent back via gRPC.
#[tokio::test] #[tokio::test]
async fn test_storage_rpc_tag_values() -> Result<(), tonic::Status> { async fn test_storage_rpc_tag_values() -> Result<(), tonic::Status> {
// Note we use a unique port. TODO: let the OS pick the port // Start a test gRPC server on a randomally allocated port
let mut fixture = Fixture::new(11812) let mut fixture = Fixture::new().await.expect("Connecting to test server");
.await
.expect("Connecting to test server");
let db_info = OrgAndBucket::new(123, 456); let db_info = OrgAndBucket::new(123, 456);
let partition_id = 1; let partition_id = 1;
@ -1582,10 +1570,8 @@ mod tests {
/// and that the returned values are sent back via gRPC. /// and that the returned values are sent back via gRPC.
#[tokio::test] #[tokio::test]
async fn test_storage_rpc_measurement_tag_values() -> Result<(), tonic::Status> { async fn test_storage_rpc_measurement_tag_values() -> Result<(), tonic::Status> {
// Note we use a unique port. TODO: let the OS pick the port // Start a test gRPC server on a randomally allocated port
let mut fixture = Fixture::new(11813) let mut fixture = Fixture::new().await.expect("Connecting to test server");
.await
.expect("Connecting to test server");
let db_info = OrgAndBucket::new(123, 456); let db_info = OrgAndBucket::new(123, 456);
let partition_id = 1; let partition_id = 1;
@ -1677,10 +1663,8 @@ mod tests {
// capture all tracing messages // capture all tracing messages
let tracing_capture = TracingCapture::new(); let tracing_capture = TracingCapture::new();
// Note we use a unique port. TODO: let the OS pick the port // Start a test gRPC server on a randomally allocated port
let mut fixture = Fixture::new(11900) let mut fixture = Fixture::new().await.expect("Connecting to test server");
.await
.expect("Connecting to test server");
let request = TestErrorRequest {}; let request = TestErrorRequest {};
@ -1738,10 +1722,8 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_read_filter() -> Result<(), tonic::Status> { async fn test_read_filter() -> Result<(), tonic::Status> {
// Note we use a unique port. TODO: let the OS pick the port // Start a test gRPC server on a randomally allocated port
let mut fixture = Fixture::new(11901) let mut fixture = Fixture::new().await.expect("Connecting to test server");
.await
.expect("Connecting to test server");
let db_info = OrgAndBucket::new(123, 456); let db_info = OrgAndBucket::new(123, 456);
let partition_id = 1; let partition_id = 1;
@ -1817,10 +1799,8 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_read_group() -> Result<(), tonic::Status> { async fn test_read_group() -> Result<(), tonic::Status> {
// Note we use a unique port. TODO: let the OS pick the port // Start a test gRPC server on a randomally allocated port
let mut fixture = Fixture::new(11902) let mut fixture = Fixture::new().await.expect("Connecting to test server");
.await
.expect("Connecting to test server");
let db_info = OrgAndBucket::new(123, 456); let db_info = OrgAndBucket::new(123, 456);
let partition_id = 1; let partition_id = 1;
@ -1917,10 +1897,8 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_read_window_aggegate() -> Result<(), tonic::Status> { async fn test_read_window_aggegate() -> Result<(), tonic::Status> {
// Note we use a unique port. TODO: let the OS pick the port // Start a test gRPC server on a randomally allocated port
let mut fixture = Fixture::new(11903) let mut fixture = Fixture::new().await.expect("Connecting to test server");
.await
.expect("Connecting to test server");
let db_info = OrgAndBucket::new(123, 456); let db_info = OrgAndBucket::new(123, 456);
let partition_id = 1; let partition_id = 1;
@ -2078,10 +2056,8 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_measurement_fields() -> Result<(), tonic::Status> { async fn test_measurement_fields() -> Result<(), tonic::Status> {
// Note we use a unique port. TODO: let the OS pick the port // Start a test gRPC server on a randomally allocated port
let mut fixture = Fixture::new(11904) let mut fixture = Fixture::new().await.expect("Connecting to test server");
.await
.expect("Connecting to test server");
let db_info = OrgAndBucket::new(123, 456); let db_info = OrgAndBucket::new(123, 456);
let partition_id = 1; let partition_id = 1;
@ -2503,6 +2479,15 @@ mod tests {
} }
} }
#[derive(Debug, Snafu)]
pub enum FixtureError {
#[snafu(display("Error binding fixture server: {}", source))]
Bind { source: std::io::Error },
#[snafu(display("Error creating fixture: {}", source))]
Tonic { source: tonic::transport::Error },
}
// Wrapper around raw clients and test database // Wrapper around raw clients and test database
struct Fixture { struct Fixture {
iox_client: IOxClient, iox_client: IOxClient,
@ -2514,22 +2499,32 @@ mod tests {
impl Fixture { impl Fixture {
/// Start up a test rpc server listening on `port`, returning /// Start up a test rpc server listening on `port`, returning
/// a fixture with the test server and clients /// a fixture with the test server and clients
async fn new(port: u16) -> Result<Self, tonic::transport::Error> { async fn new() -> Result<Self, FixtureError> {
let test_storage = Arc::new(TestDatabaseStore::new()); let test_storage = Arc::new(TestDatabaseStore::new());
let test_executor = Arc::new(QueryExecutor::default()); let test_executor = Arc::new(QueryExecutor::default());
// TODO: specify port 0 to let the OS pick the port (need to // Get a random port from the kernel by asking for port 0.
// figure out how to get access to the actual addr from tonic) let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port); let socket = tokio::net::TcpListener::bind(bind_addr)
.await
.context(Bind)?;
// Pull the assigned port out of the socket
let bind_addr = socket.local_addr().unwrap();
println!("Starting InfluxDB IOx rpc test server on {:?}", bind_addr); println!("Starting InfluxDB IOx rpc test server on {:?}", bind_addr);
let server = make_server(bind_addr, test_storage.clone(), test_executor.clone()); let server = make_server(socket, test_storage.clone(), test_executor.clone());
tokio::task::spawn(server); tokio::task::spawn(server);
let iox_client = connect_to_server::<IOxClient>(bind_addr).await?; let iox_client = connect_to_server::<IOxClient>(bind_addr)
let storage_client = .await
StorageClientWrapper::new(connect_to_server::<StorageClient>(bind_addr).await?); .context(Tonic)?;
let storage_client = StorageClientWrapper::new(
connect_to_server::<StorageClient>(bind_addr)
.await
.context(Tonic)?,
);
Ok(Self { Ok(Self {
iox_client, iox_client,