From c2a8baf824c49707c7c93d184a97e4fe1d242f8c Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Thu, 25 Nov 2021 14:18:25 +0100
Subject: [PATCH 1/4] refactor: clean up write buffer end2end tests

Remove the ones that are covered by `write_pb.rs` or that are obsolete
(i.e. they rely on behavior that only exists in the legacy router code).
Migrate the remaining ones to use an actual router.
---
 influxdb_iox/tests/common/server_fixture.rs |   7 +
 .../tests/end_to_end_cases/scenario.rs      |  84 ++--------
 .../tests/end_to_end_cases/write_buffer.rs  | 147 +++---------------
 3 files changed, 44 insertions(+), 194 deletions(-)

diff --git a/influxdb_iox/tests/common/server_fixture.rs b/influxdb_iox/tests/common/server_fixture.rs
index 4ffd88ba3e..34aedf1b6f 100644
--- a/influxdb_iox/tests/common/server_fixture.rs
+++ b/influxdb_iox/tests/common/server_fixture.rs
@@ -369,6 +369,13 @@
             server_type,
         }
     }
+
+    // change server type
+    pub fn with_server_type(mut self, server_type: ServerType) -> Self {
+        self.server_type = server_type;
+        self
+    }
+
     // add a name=value environment variable when starting the server
     pub fn with_env(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
        self.env.push((name.into(), value.into()));
diff --git a/influxdb_iox/tests/end_to_end_cases/scenario.rs b/influxdb_iox/tests/end_to_end_cases/scenario.rs
index c371b6c0a4..6622cb058c 100644
--- a/influxdb_iox/tests/end_to_end_cases/scenario.rs
+++ b/influxdb_iox/tests/end_to_end_cases/scenario.rs
@@ -1,5 +1,4 @@
 use std::collections::HashMap;
-use std::iter::once;
 use std::num::NonZeroU32;
 use std::path::Path;
 use std::time::Duration;
@@ -25,13 +24,9 @@ use tempfile::TempDir;
 use test_helpers::assert_contains;
 
 use data_types::{names::org_and_bucket_to_database, DatabaseName};
-use database_rules::RoutingRules;
 use generated_types::google::protobuf::Empty;
 use generated_types::{
-    influxdata::iox::{
-        management::v1::{self as management, *},
-        write_buffer::v1::WriteBufferCreationConfig,
-    },
+    influxdata::iox::{management::v1::*, write_buffer::v1::WriteBufferCreationConfig},
     ReadSource, TimestampRange,
 };
 use influxdb_iox_client::{connection::Connection, flight::PerformQuery};
@@ -312,7 +307,6 @@ pub struct DatabaseBuilder {
     partition_template: PartitionTemplate,
     lifecycle_rules: LifecycleRules,
     write_buffer: Option<WriteBufferConnection>,
-    table_whitelist: Option<Vec<String>>,
 }
 
 impl DatabaseBuilder {
@@ -331,7 +325,6 @@ impl DatabaseBuilder {
                 ..Default::default()
             },
             write_buffer: None,
-            table_whitelist: None,
         }
     }
 
@@ -375,11 +368,6 @@ impl DatabaseBuilder {
         self
     }
 
-    pub fn write_buffer_table_whitelist(mut self, whitelist: Vec<String>) -> Self {
-        self.table_whitelist = Some(whitelist);
-        self
-    }
-
     pub fn worker_backoff_millis(mut self, millis: u64) -> Self {
         self.lifecycle_rules.worker_backoff_millis = millis;
         self
@@ -389,58 +377,13 @@ impl DatabaseBuilder {
     pub async fn try_build(self, channel: Connection) -> Result<(), CreateDatabaseError> {
         let mut management_client = influxdb_iox_client::management::Client::new(channel);
 
-        let routing_rules = if self.write_buffer.is_some() {
-            const KAFKA_PRODUCER_SINK_ID: u32 = 0;
-            let kafka_producer_sink = management::Sink {
-                sink: Some(management::sink::Sink::Kafka(KafkaProducer {})),
-            };
-            const DEV_NULL_SINK_ID: u32 = 1;
-            let dev_null_sink = management::Sink {
-                sink: Some(management::sink::Sink::DevNull(DevNull {})),
-            };
-
-            let to_shard = |shard: u32| {
-                Box::new(move |i: String| MatcherToShard {
-                    matcher: Some(Matcher {
-                        table_name_regex: format!("^{}$", i),
-                    }),
-                    shard,
-                })
-            };
-
-            if let 
Some(table_whitelist) = self.table_whitelist { - Some(RoutingRules::ShardConfig(ShardConfig { - specific_targets: table_whitelist - .into_iter() - .map(to_shard(KAFKA_PRODUCER_SINK_ID)) - .chain(once(to_shard(DEV_NULL_SINK_ID)(".*".to_string()))) - .collect(), - shards: vec![ - (KAFKA_PRODUCER_SINK_ID, kafka_producer_sink), - (DEV_NULL_SINK_ID, dev_null_sink), - ] - .into_iter() - .collect(), - ..Default::default() - })) - } else { - Some(RoutingRules::RoutingConfig(RoutingConfig { - sink: Some(management::Sink { - sink: Some(management::sink::Sink::Kafka(KafkaProducer {})), - }), - })) - } - } else { - None - }; - management_client .create_database(DatabaseRules { name: self.name, partition_template: Some(self.partition_template), lifecycle_rules: Some(self.lifecycle_rules), worker_cleanup_avg_sleep: None, - routing_rules, + routing_rules: None, write_buffer_connection: self.write_buffer, }) .await?; @@ -769,27 +712,25 @@ pub async fn fixture_replay_broken(db_name: &str, write_buffer_path: &Path) -> S fixture } -pub async fn create_router_to_write_buffer( - fixture: &ServerFixture, +pub fn wildcard_router_config( db_name: &str, -) -> (TempDir, Box) { + write_buffer_path: &Path, +) -> influxdb_iox_client::router::generated_types::Router { use influxdb_iox_client::router::generated_types::{ write_sink::Sink, Matcher, MatcherToShard, Router, ShardConfig, WriteSink, WriteSinkSet, }; - let write_buffer_dir = TempDir::new().unwrap(); - let write_buffer_connection = WriteBufferConnection { direction: write_buffer_connection::Direction::Write.into(), r#type: "file".to_string(), - connection: write_buffer_dir.path().display().to_string(), + connection: write_buffer_path.display().to_string(), creation_config: Some(WriteBufferCreationConfig { n_sequencers: 1, ..Default::default() }), ..Default::default() }; - let router_cfg = Router { + Router { name: db_name.to_string(), write_sharder: Some(ShardConfig { specific_targets: vec![MatcherToShard { @@ -810,7 +751,16 @@ pub async fn create_router_to_write_buffer( }, )]), query_sinks: Default::default(), - }; + } +} + +pub async fn create_router_to_write_buffer( + fixture: &ServerFixture, + db_name: &str, +) -> (TempDir, Box) { + let write_buffer_dir = TempDir::new().unwrap(); + + let router_cfg = wildcard_router_config(db_name, write_buffer_dir.path()); fixture .router_client() .update_router(router_cfg) diff --git a/influxdb_iox/tests/end_to_end_cases/write_buffer.rs b/influxdb_iox/tests/end_to_end_cases/write_buffer.rs index 7ab245951c..8e5a39586b 100644 --- a/influxdb_iox/tests/end_to_end_cases/write_buffer.rs +++ b/influxdb_iox/tests/end_to_end_cases/write_buffer.rs @@ -3,11 +3,9 @@ use crate::{ server_fixture::{ServerFixture, ServerType, TestConfig}, udp_listener::UdpCapture, }, - end_to_end_cases::scenario::{rand_name, DatabaseBuilder}, + end_to_end_cases::scenario::{rand_name, wildcard_router_config, DatabaseBuilder}, }; use arrow_util::assert_batches_sorted_eq; -use dml::DmlOperation; -use futures::StreamExt; use generated_types::influxdata::iox::write_buffer::v1::{ write_buffer_connection::Direction as WriteBufferDirection, WriteBufferConnection, }; @@ -19,112 +17,7 @@ use std::{num::NonZeroU32, sync::Arc}; use tempfile::TempDir; use test_helpers::assert_contains; use time::SystemProvider; -use write_buffer::{ - core::{WriteBufferReading, WriteBufferWriting}, - file::{FileBufferConsumer, FileBufferProducer}, -}; - -#[tokio::test] -async fn writes_go_to_write_buffer() { - let write_buffer_dir = TempDir::new().unwrap(); - - // set up a database 
with a write buffer pointing at write buffer - let server = ServerFixture::create_shared(ServerType::Database).await; - let db_name = rand_name(); - let write_buffer_connection = WriteBufferConnection { - direction: WriteBufferDirection::Write.into(), - r#type: "file".to_string(), - connection: write_buffer_dir.path().display().to_string(), - creation_config: Some(WriteBufferCreationConfig { - n_sequencers: 1, - ..Default::default() - }), - ..Default::default() - }; - - DatabaseBuilder::new(db_name.clone()) - .write_buffer(write_buffer_connection.clone()) - .build(server.grpc_channel()) - .await; - - // write some points - let mut write_client = server.write_client(); - - let lp_lines = [ - "cpu,region=west user=23.2 100", - "cpu,region=west user=21.0 150", - "disk,region=east bytes=99i 200", - ]; - - let num_lines_written = write_client - .write_lp(&db_name, lp_lines.join("\n"), 0) - .await - .expect("cannot write"); - assert_eq!(num_lines_written, 3); - - // check the data is in write buffer - let mut consumer = - FileBufferConsumer::new(write_buffer_dir.path(), &db_name, Default::default(), None) - .await - .unwrap(); - let (_, mut stream) = consumer.streams().into_iter().next().unwrap(); - match stream.stream.next().await.unwrap().unwrap() { - DmlOperation::Write(write) => assert_eq!(write.table_count(), 2), - a => panic!("unexpected operation: {:?}", a), - } -} - -#[tokio::test] -async fn writes_go_to_write_buffer_whitelist() { - let write_buffer_dir = TempDir::new().unwrap(); - - // set up a database with a write buffer pointing at write buffer - let server = ServerFixture::create_shared(ServerType::Database).await; - let db_name = rand_name(); - let write_buffer_connection = WriteBufferConnection { - direction: WriteBufferDirection::Write.into(), - r#type: "file".to_string(), - connection: write_buffer_dir.path().display().to_string(), - creation_config: Some(WriteBufferCreationConfig { - n_sequencers: 1, - ..Default::default() - }), - ..Default::default() - }; - - DatabaseBuilder::new(db_name.clone()) - .write_buffer(write_buffer_connection) - .write_buffer_table_whitelist(vec!["cpu".to_string()]) - .build(server.grpc_channel()) - .await; - - // write some points - let mut write_client = server.write_client(); - - let lp_lines = [ - "cpu,region=west user=23.2 100", - "cpu,region=west user=21.0 150", - "disk,region=east bytes=99i 200", - "mem,region=east bytes=123 250", - ]; - - let num_lines_written = write_client - .write_lp(&db_name, lp_lines.join("\n"), 0) - .await - .expect("cannot write"); - assert_eq!(num_lines_written, 4); - - // check the data is in write buffer - let mut consumer = - FileBufferConsumer::new(write_buffer_dir.path(), &db_name, Default::default(), None) - .await - .unwrap(); - let (_, mut stream) = consumer.streams().into_iter().next().unwrap(); - match stream.stream.next().await.unwrap().unwrap() { - DmlOperation::Write(write) => assert_eq!(write.table_count(), 1), - a => panic!("unexpected operation: {:?}", a), - } -} +use write_buffer::{core::WriteBufferWriting, file::FileBufferProducer}; #[tokio::test] async fn reads_come_from_write_buffer() { @@ -300,7 +193,7 @@ pub async fn test_cross_write_buffer_tracing() { // setup tracing let udp_capture = UdpCapture::new().await; - let test_config = TestConfig::new(ServerType::Database) + let test_config = TestConfig::new(ServerType::Router) .with_env("TRACES_EXPORTER", "jaeger") .with_env("TRACES_EXPORTER_JAEGER_AGENT_HOST", udp_capture.ip()) .with_env("TRACES_EXPORTER_JAEGER_AGENT_PORT", udp_capture.port()) @@ 
-316,24 +209,18 @@ pub async fn test_cross_write_buffer_tracing() { .update_server_id(NonZeroU32::new(1).unwrap()) .await .unwrap(); - server_write.wait_server_initialized().await; - let conn_write = WriteBufferConnection { - direction: WriteBufferDirection::Write.into(), - r#type: "file".to_string(), - connection: write_buffer_dir.path().display().to_string(), - creation_config: Some(WriteBufferCreationConfig { - n_sequencers: 1, - ..Default::default() - }), - ..Default::default() - }; - DatabaseBuilder::new(db_name.clone()) - .write_buffer(conn_write.clone()) - .build(server_write.grpc_channel()) - .await; + let router_cfg = wildcard_router_config(&db_name, write_buffer_dir.path()); + server_write + .router_client() + .update_router(router_cfg) + .await + .unwrap(); // create consumer DB - let server_read = ServerFixture::create_single_use_with_config(test_config).await; + let server_read = ServerFixture::create_single_use_with_config( + test_config.with_server_type(ServerType::Database), + ) + .await; server_read .deployment_client() .update_server_id(NonZeroU32::new(2).unwrap()) @@ -342,7 +229,13 @@ pub async fn test_cross_write_buffer_tracing() { server_read.wait_server_initialized().await; let conn_read = WriteBufferConnection { direction: WriteBufferDirection::Read.into(), - ..conn_write + r#type: "file".to_string(), + connection: write_buffer_dir.path().display().to_string(), + creation_config: Some(WriteBufferCreationConfig { + n_sequencers: 1, + ..Default::default() + }), + ..Default::default() }; DatabaseBuilder::new(db_name.clone()) .write_buffer(conn_read) From 1722704077d59d78b2b6d2d3711cfc9d6b0cdb74 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Thu, 25 Nov 2021 17:07:02 +0000 Subject: [PATCH 2/4] fix: don't project on schema metadata mismatch (#3213) * fix: don't project on schema metadata mismatch * chore: add test --- query/src/provider.rs | 61 +++++++++++++++++++++++++++++++++++++++++-- query/src/test.rs | 10 +++++++ 2 files changed, 69 insertions(+), 2 deletions(-) diff --git a/query/src/provider.rs b/query/src/provider.rs index 7286b42f78..da08e91326 100644 --- a/query/src/provider.rs +++ b/query/src/provider.rs @@ -708,8 +708,15 @@ impl Deduplicater { let input_schema = input.schema(); let output_schema = output_schema.as_arrow(); - // If the schemas are the same, nothing to do - if input_schema == output_schema { + // If columns are the same, nothing to do + let same_columns = input_schema.fields().len() == output_schema.fields().len() + && input_schema + .fields() + .iter() + .zip(output_schema.fields()) + .all(|(a, b)| a.name() == b.name()); + + if same_columns { return Ok(input); } @@ -2442,6 +2449,56 @@ mod test { assert_batches_eq!(&expected, &batch); } + #[tokio::test] + async fn test_sorted_metadata() { + let mut key = SortKey::default(); + key.push("time", Default::default()); + + let chunk = Arc::new( + TestChunk::new("t") + .with_id(1) + .with_time_column() + .with_i64_field_column("field_int") + .with_one_row_of_data() + .with_sort_key(&key), + ); + + let schema = chunk.schema(); + assert!(schema.sort_key().is_some()); + + let mut provider = ProviderBuilder::new("t", Arc::clone(&schema)) + .add_no_op_pruner() + .add_chunk(chunk) + .build() + .unwrap(); + + provider.ensure_pk_sort(); + + let plan = provider.scan(&None, 1024, &[], None).await.unwrap(); + let batches = collect(plan).await.unwrap(); + + for batch in &batches { + // TODO: schema output lacks sort key (#3214) + //assert_eq!(batch.schema(), 
schema.as_arrow())
+
+            let schema: Schema = batch.schema().try_into().unwrap();
+            for field_idx in 0..schema.len() {
+                assert!(schema.field(field_idx).0.is_some());
+            }
+        }
+
+        assert_batches_eq!(
+            &[
+                "+-----------+-----------------------------+",
+                "| field_int | time                        |",
+                "+-----------+-----------------------------+",
+                "| 1000      | 1970-01-01T00:00:00.000001Z |",
+                "+-----------+-----------------------------+",
+            ],
+            &batches
+        );
+    }
+
     fn chunk_ids(group: &[Arc<TestChunk>]) -> String {
         let ids = group
             .iter()
diff --git a/query/src/test.rs b/query/src/test.rs
index 224c42ea44..727e82d4e4 100644
--- a/query/src/test.rs
+++ b/query/src/test.rs
@@ -781,6 +781,16 @@ impl TestChunk {
         self
     }
 
+    /// Set the sort key for this chunk
+    pub fn with_sort_key(mut self, sort_key: &SortKey<'_>) -> Self {
+        let mut merger = SchemaMerger::new();
+        merger = merger
+            .merge(self.schema.as_ref())
+            .expect("merging was successful");
+        self.schema = Arc::new(merger.build_with_sort_key(sort_key));
+        self
+    }
+
     /// Returns all columns of the table
     pub fn all_column_names(&self) -> StringSet {
         self.schema

From 0e06026fbdf68275a8f584d8ee5e7a56d56f395b Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Fri, 26 Nov 2021 09:26:41 +0100
Subject: [PATCH 3/4] refactor: remove write routing logic from `Server`

API users should use `Database` instead; there is no longer any need to
mirror this API in `Server`. Note that `Database` is a better fit than
`Db` here, because the former can also check if we're writing from a
write buffer and can easily reject unsequenced writes.

The `pub` modifiers were adjusted to make it impossible to write through
`Db` directly.
---
 .../server_type/database/http.rs            |  21 +-
 .../server_type/database/rpc/error.rs       |  62 +--
 .../server_type/database/rpc/write_pb.rs    |  20 +-
 .../tests/end_to_end_cases/write_buffer.rs  |   8 +-
 server/src/db.rs                            |   6 +-
 server/src/lib.rs                           | 445 +-----------------
 server/src/resolver.rs                      |  37 --
 7 files changed, 76 insertions(+), 523 deletions(-)

diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/http.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/http.rs
index d4488497aa..7fa70d09c1 100644
--- a/influxdb_iox/src/influxdb_ioxd/server_type/database/http.rs
+++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/http.rs
@@ -181,17 +181,18 @@ where
     ) -> Result<(), InnerDmlError> {
         match op {
             DmlOperation::Write(write) => {
-                self.server
-                    .write(db_name, write)
+                let database = self.server.active_database(db_name).map_err(|_| {
+                    InnerDmlError::DatabaseNotFound {
+                        db_name: db_name.to_string(),
+                    }
+                })?;
+
+                database
+                    .route_operation(&DmlOperation::Write(write))
                     .await
-                    .map_err(|e| match e {
-                        server::Error::DatabaseNotFound { .. 
} => InnerDmlError::DatabaseNotFound { - db_name: db_name.to_string(), - }, - e => InnerDmlError::InternalError { - db_name: db_name.to_string(), - source: Box::new(e), - }, + .map_err(|e| InnerDmlError::InternalError { + db_name: db_name.to_string(), + source: Box::new(e), }) } DmlOperation::Delete(delete) => { diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/error.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/error.rs index c3b8bbe418..6794ec37e8 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/error.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/error.rs @@ -41,39 +41,10 @@ pub fn default_server_error_handler(error: server::Error) -> tonic::Status { description: source.to_string(), } .into(), - Error::HardLimitReached {} => QuotaFailure { - subject: "influxdata.com/iox/buffer".to_string(), - description: "hard buffer limit reached".to_string(), - } - .into(), - source @ Error::WritingOnlyAllowedThroughWriteBuffer { .. } - | source @ Error::ShardWrite { .. } => { - tonic::Status::failed_precondition(source.to_string()) - } - Error::NoRemoteConfigured { node_group } => NotFound { - resource_type: "remote".to_string(), - resource_name: format!("{:?}", node_group), - ..Default::default() - } - .into(), - Error::RemoteError { source } => tonic::Status::unavailable(source.to_string()), Error::WipePreservedCatalog { source } => default_database_error_handler(source), - Error::DeleteExpression { - start_time, - stop_time, - predicate, - } => FieldViolation { - field: format!( - "time range: [{}, {}], predicate: {}", - start_time, stop_time, predicate - ), - description: "Invalid time range or predicate".to_string(), - } - .into(), Error::DatabaseInit { source } => { tonic::Status::invalid_argument(format!("Cannot initialize database: {}", source)) } - Error::StoreWriteErrors { .. } => tonic::Status::invalid_argument(error.to_string()), Error::DatabaseAlreadyExists { .. } | Error::DatabaseAlreadyOwnedByThisServer { .. } => { tonic::Status::already_exists(error.to_string()) } @@ -185,9 +156,42 @@ pub fn default_db_error_handler(error: server::db::Error) -> tonic::Status { } .into(), Error::CatalogError { source } => default_catalog_error_handler(source), + Error::HardLimitReached {} => QuotaFailure { + subject: "influxdata.com/iox/buffer".to_string(), + description: "hard buffer limit reached".to_string(), + } + .into(), + Error::StoreWriteErrors { .. } => tonic::Status::invalid_argument(error.to_string()), error => { error!(?error, "Unexpected error"); InternalError {}.into() } } } + +/// map common [`database::WriteError`](server::database::WriteError) errors to the appropriate tonic Status +pub fn default_database_write_error_handler(error: server::database::WriteError) -> tonic::Status { + use server::database::WriteError; + + match error { + WriteError::HardLimitReached {} => QuotaFailure { + subject: "influxdata.com/iox/buffer".to_string(), + description: "hard buffer limit reached".to_string(), + } + .into(), + WriteError::DbError { source } => default_db_error_handler(source), + source @ WriteError::WritingOnlyAllowedThroughWriteBuffer => { + tonic::Status::failed_precondition(source.to_string()) + } + WriteError::NotInitialized { state } => { + tonic::Status::unavailable(format!("Database is not yet initialized: {}", state)) + } + error @ WriteError::StoreWriteErrors { .. 
} => { + tonic::Status::invalid_argument(error.to_string()) + } + error => { + error!(?error, "Unexpected write error"); + InternalError {}.into() + } + } +} diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/write_pb.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/write_pb.rs index 1ce859f185..d285b65b0e 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/write_pb.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/write_pb.rs @@ -1,12 +1,12 @@ use data_types::DatabaseName; -use dml::{DmlMeta, DmlWrite}; +use dml::{DmlMeta, DmlOperation, DmlWrite}; use generated_types::google::{FieldViolation, FieldViolationExt}; use generated_types::influxdata::pbdata::v1::*; use server::{connection::ConnectionManager, Server}; use std::fmt::Debug; use std::sync::Arc; -use super::error::default_server_error_handler; +use super::error::{default_database_write_error_handler, default_server_error_handler}; struct PBWriteService { server: Arc>, @@ -27,9 +27,6 @@ where .database_batch .ok_or_else(|| FieldViolation::required("database_batch"))?; - let db_name = DatabaseName::new(&database_batch.database_name) - .scope("database_batch.database_name")?; - let tables = mutable_batch_pb::decode::decode_database_batch(&database_batch).map_err(|e| { FieldViolation { @@ -40,11 +37,18 @@ where let write = DmlWrite::new(tables, DmlMeta::unsequenced(span_ctx)); - self.server - .write(&db_name, write) - .await + let db_name = DatabaseName::new(&database_batch.database_name) + .scope("database_batch.database_name")?; + let database = self + .server + .active_database(&db_name) .map_err(default_server_error_handler)?; + database + .route_operation(&DmlOperation::Write(write)) + .await + .map_err(default_database_write_error_handler)?; + Ok(tonic::Response::new(WriteResponse {})) } } diff --git a/influxdb_iox/tests/end_to_end_cases/write_buffer.rs b/influxdb_iox/tests/end_to_end_cases/write_buffer.rs index 8e5a39586b..a3ba682cc7 100644 --- a/influxdb_iox/tests/end_to_end_cases/write_buffer.rs +++ b/influxdb_iox/tests/end_to_end_cases/write_buffer.rs @@ -151,13 +151,7 @@ async fn cant_write_to_db_reading_from_write_buffer() { .await .expect_err("expected write to fail"); - assert_contains!( - err.to_string(), - format!( - r#"Cannot write to database {}, it's configured to only read from the write buffer"#, - db_name - ) - ); + assert_contains!(err.to_string(), "only allowed through write buffer"); assert!(matches!(dbg!(err), WriteError::ServerError(_))); } diff --git a/server/src/db.rs b/server/src/db.rs index 1ba344a4fa..d899a19fa4 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -973,7 +973,7 @@ impl Db { /// Stores the write on this [`Db`] and/or routes it to the write buffer /// /// TODO: Remove this method (#2243) - pub async fn route_operation(&self, operation: &DmlOperation) -> Result<()> { + pub(crate) async fn route_operation(&self, operation: &DmlOperation) -> Result<()> { let immutable = { let rules = self.rules.read(); rules.lifecycle_rules.immutable @@ -1023,12 +1023,12 @@ impl Db { } /// Writes the provided [`DmlWrite`] to this database - pub fn store_write(&self, db_write: &DmlWrite) -> Result<()> { + pub(crate) fn store_write(&self, db_write: &DmlWrite) -> Result<()> { self.store_filtered_write(db_write, WriteFilterNone::default()) } /// Writes the provided [`DmlWrite`] to this database with the provided [`WriteFilter`] - pub fn store_filtered_write( + pub(crate) fn store_filtered_write( &self, db_write: &DmlWrite, filter: impl WriteFilter, 
diff --git a/server/src/lib.rs b/server/src/lib.rs index ddfd2f47c0..f544543803 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -70,10 +70,9 @@ use ::lifecycle::{LockableChunk, LockablePartition}; use async_trait::async_trait; -use connection::{ConnectionManager, RemoteServer}; +use connection::ConnectionManager; use data_types::{ chunk_metadata::ChunkId, - database_rules::{NodeGroup, RoutingRules, Sink}, detailed_database::ActiveDatabase, error::ErrorLogger, job::Job, @@ -88,7 +87,6 @@ use internal_types::freezable::Freezable; use iox_object_store::IoxObjectStore; use observability_deps::tracing::{error, info, warn}; use parking_lot::RwLock; -use rand::seq::SliceRandom; use resolver::Resolver; use snafu::{ensure, OptionExt, ResultExt, Snafu}; use std::sync::Arc; @@ -99,7 +97,6 @@ use uuid::Uuid; pub use application::ApplicationState; pub use db::Db; -use dml::{DmlOperation, DmlWrite}; pub use job::JobRegistry; pub use resolver::{GrpcConnectionString, RemoteTemplate}; @@ -182,9 +179,6 @@ pub enum Error { current: Uuid, }, - #[snafu(display("Server error: {}", source))] - ServerError { source: std::io::Error }, - #[snafu(display("invalid database: {}", source))] InvalidDatabaseName { source: DatabaseNameError }, @@ -209,61 +203,11 @@ pub enum Error { table: String, }, - #[snafu(display("hard buffer limit reached"))] - HardLimitReached {}, - - #[snafu(display( - "Storing database write failed with the following error(s), and possibly more: {}", - errors.iter().map(ToString::to_string).collect::>().join(", ") - ))] - StoreWriteErrors { errors: Vec }, - - #[snafu(display( - "Cannot write to database {}, it's configured to only read from the write buffer", - db_name - ))] - WritingOnlyAllowedThroughWriteBuffer { db_name: String }, - - #[snafu(display("Cannot write to write buffer: {}", source))] - WriteBuffer { - source: Box, - }, - - #[snafu(display("no remote configured for node group: {:?}", node_group))] - NoRemoteConfigured { node_group: NodeGroup }, - - #[snafu(display("all remotes failed connecting: {:?}", errors))] - NoRemoteReachable { - errors: HashMap, - }, - - #[snafu(display("remote error: {}", source))] - RemoteError { - source: connection::ConnectionManagerError, - }, - #[snafu(display("database failed to initialize: {}", source))] DatabaseInit { source: Arc }, - #[snafu(display( - "Either invalid time range [{}, {}] or invalid delete expression {}", - start_time, - stop_time, - predicate - ))] - DeleteExpression { - start_time: String, - stop_time: String, - predicate: String, - }, - #[snafu(display("error persisting server config to object storage: {}", source))] PersistServerConfig { source: object_store::Error }, - - #[snafu(display("Error sharding write: {}", source))] - ShardWrite { - source: data_types::database_rules::Error, - }, } pub type Result = std::result::Result; @@ -901,141 +845,6 @@ where Ok(()) } - /// `write_lines` takes in raw line protocol and converts it to a collection - /// of ShardedEntry which are then sent to other IOx servers based on - /// the ShardConfig or sent to the local database for buffering in the - /// WriteBuffer and/or the MutableBuffer if configured. - /// - /// The provided `default_time` is nanoseconds since the epoch and will be assigned - /// to any lines that don't have a timestamp. 
- /// - /// TODO: Replace with dedicated router in terms of MutableBatch - pub async fn write(&self, db_name: &DatabaseName<'_>, write: DmlWrite) -> Result<()> { - let db = self.db(db_name)?; - let rules = db.rules(); - - let sharded_writes = match &rules.routing_rules { - Some(RoutingRules::ShardConfig(shard_config)) => { - let sharded_writes = write.shard(shard_config).context(ShardWrite)?; - itertools::Either::Left(sharded_writes.into_iter().map(|(s, w)| (Some(s), w))) - } - _ => itertools::Either::Right(std::iter::once((None, write))), - }; - - // Write to all shards in parallel; as soon as one fails return error - // immediately to the client and abort all other outstanding requests. - futures_util::future::try_join_all(sharded_writes.map(|(shard, write)| { - let sink = match &rules.routing_rules { - Some(RoutingRules::ShardConfig(shard_config)) => { - let id = shard.expect("sharded entry"); - Some(shard_config.shards.get(&id).expect("valid shard")) - } - Some(RoutingRules::RoutingConfig(config)) => Some(&config.sink), - None => None, - }; - - async move { - match sink { - Some(sink) => self.write_sink(db_name, sink, write).await, - None => self.write_local(db_name, &DmlOperation::Write(write)).await, - } - } - })) - .await?; - Ok(()) - } - - async fn write_sink( - &self, - db_name: &DatabaseName<'_>, - sink: &Sink, - write: DmlWrite, - ) -> Result<()> { - match sink { - Sink::Iox(node_group) => self.write_downstream(db_name, node_group, &write).await, - Sink::Kafka(_) => { - // The write buffer write path is currently implemented in "db", so confusingly we - // need to invoke write_entry_local. - // TODO(mkm): tracked in #2134 - self.write_local(db_name, &DmlOperation::Write(write)).await - } - Sink::DevNull => { - // write is silently ignored, as requested by the configuration. - Ok(()) - } - } - } - - async fn write_downstream( - &self, - db_name: &str, - node_group: &[ServerId], - write: &DmlWrite, - ) -> Result<()> { - // Return an error if this server is not yet ready - self.shared.state.read().initialized()?; - - let addrs: Vec<_> = { - let resolver = self.resolver.read(); - node_group - .iter() - .filter_map(|&node| resolver.resolve_remote(node)) - .collect() - }; - - if addrs.is_empty() { - return NoRemoteConfigured { node_group }.fail(); - } - - let mut errors = HashMap::new(); - // this needs to be in its own statement because rand::thread_rng is not Send and the loop below is async. - // braces around the expression would work but clippy don't know that and complains the braces are useless. - let random_addrs_iter = addrs.choose_multiple(&mut rand::thread_rng(), addrs.len()); - for addr in random_addrs_iter { - match self.connection_manager.remote_server(addr).await { - Err(err) => { - info!("error obtaining remote for {}: {}", addr, err); - errors.insert(addr.to_owned(), err); - } - Ok(remote) => return remote.write(db_name, write).await.context(RemoteError), - }; - } - NoRemoteReachable { errors }.fail() - } - - /// Write an entry to the local `Db` - /// - /// TODO: Remove this and migrate callers to `Database::route_write` - async fn write_local( - &self, - db_name: &DatabaseName<'_>, - operation: &DmlOperation, - ) -> Result<()> { - use database::WriteError; - - self.active_database(db_name)? - .route_operation(operation) - .await - .map_err(|e| match e { - WriteError::NotInitialized { .. 
} => Error::DatabaseNotInitialized { - db_name: db_name.to_string(), - }, - WriteError::WriteBuffer { source } => Error::WriteBuffer { source }, - WriteError::WritingOnlyAllowedThroughWriteBuffer => { - Error::WritingOnlyAllowedThroughWriteBuffer { - db_name: db_name.to_string(), - } - } - WriteError::DbError { source } => Error::UnknownDatabaseError { - source: Box::new(source), - }, - WriteError::HardLimitReached { .. } => Error::HardLimitReached {}, - WriteError::StoreWriteErrors { errors } => Error::StoreWriteErrors { - errors: errors.into_iter().map(|e| Box::new(e) as _).collect(), - }, - }) - } - /// Update database rules and save on success. pub async fn update_db_rules( &self, @@ -1450,18 +1259,13 @@ mod tests { test_utils::{make_application, make_server}, *, }; - use arrow::record_batch::RecordBatch; - use arrow_util::assert_batches_eq; use bytes::Bytes; - use connection::test_helpers::{TestConnectionManager, TestRemoteServer}; use data_types::{ chunk_metadata::{ChunkAddr, ChunkStorage}, - database_rules::{ - DatabaseRules, HashRing, LifecycleRules, PartitionTemplate, ShardConfig, ShardId, - TemplatePart, - }, + database_rules::{DatabaseRules, LifecycleRules, PartitionTemplate, TemplatePart}, write_buffer::{WriteBufferConnection, WriteBufferDirection}, }; + use dml::DmlWrite; use iox_object_store::IoxObjectStore; use mutable_batch_lp::lines_to_batches; use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi}; @@ -1469,13 +1273,10 @@ mod tests { core::{PreservedCatalog, PreservedCatalogConfig}, test_helpers::{load_ok, new_empty}, }; - use query::{exec::ExecutionContextProvider, frontend::sql::SqlQueryPlanner, QueryDatabase}; + use query::QueryDatabase; use std::{ convert::TryFrom, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, + sync::Arc, time::{Duration, Instant}, }; use test_helpers::{assert_contains, assert_error}; @@ -1484,12 +1285,7 @@ mod tests { async fn server_api_calls_return_error_with_no_id_set() { let server = make_server(make_application()); - let tables = lines_to_batches("cpu foo=1 10", 0).unwrap(); - let write = DmlWrite::new(tables, Default::default()); - let resp = server - .write(&DatabaseName::new("foo").unwrap(), write) - .await - .unwrap_err(); + let resp = server.db(&DatabaseName::new("foo").unwrap()).unwrap_err(); assert!(matches!(resp, Error::IdNotSet)); } @@ -1984,136 +1780,6 @@ mod tests { assert_eq!(names, db_names_sorted); } - #[tokio::test] - async fn writes_local() { - let server = make_server(make_application()); - server.set_id(ServerId::try_from(1).unwrap()).unwrap(); - server.wait_for_init().await.unwrap(); - - let db_name = DatabaseName::new("foo".to_string()).unwrap(); - server - .create_database(default_rules(db_name.clone())) - .await - .unwrap(); - - let tables = lines_to_batches("cpu bar=1 10", 0).unwrap(); - let write = DmlWrite::new(tables, Default::default()); - server.write(&db_name, write).await.unwrap(); - - let db_name = DatabaseName::new("foo").unwrap(); - let db = server.db(&db_name).unwrap(); - let batches = run_query(db, "select * from cpu").await; - - let expected = vec![ - "+-----+--------------------------------+", - "| bar | time |", - "+-----+--------------------------------+", - "| 1 | 1970-01-01T00:00:00.000000010Z |", - "+-----+--------------------------------+", - ]; - assert_batches_eq!(expected, &batches); - } - - // This tests sets up a database with a sharding config which defines exactly one shard - // backed by 3 remote nodes. 
One of the nodes is modeled to be "down", while the other two - // can record write entry events. - // This tests goes through a few trivial error cases before checking that the both working - // mock remote servers actually receive write entry events. - // - // This test is theoretically flaky, low probability though (in the order of 1e-30) - #[tokio::test] - async fn write_entry_downstream() { - const TEST_SHARD_ID: ShardId = 1; - const GOOD_REMOTE_ADDR_1: &str = "http://localhost:111"; - const GOOD_REMOTE_ADDR_2: &str = "http://localhost:222"; - const BAD_REMOTE_ADDR: &str = "http://localhost:666"; - - let good_remote_id_1 = ServerId::try_from(1).unwrap(); - let good_remote_id_2 = ServerId::try_from(2).unwrap(); - let bad_remote_id = ServerId::try_from(666).unwrap(); - - let mut manager = TestConnectionManager::new(); - let written_1 = Arc::new(AtomicBool::new(false)); - manager.remotes.insert( - GOOD_REMOTE_ADDR_1.to_owned(), - Arc::new(TestRemoteServer { - written: Arc::clone(&written_1), - }), - ); - let written_2 = Arc::new(AtomicBool::new(false)); - manager.remotes.insert( - GOOD_REMOTE_ADDR_2.to_owned(), - Arc::new(TestRemoteServer { - written: Arc::clone(&written_2), - }), - ); - - let server = Server::new(manager, make_application(), Default::default()); - server.set_id(ServerId::try_from(1).unwrap()).unwrap(); - server.wait_for_init().await.unwrap(); - - let db_name = DatabaseName::new("foo").unwrap(); - server - .create_database(default_rules(db_name.clone())) - .await - .unwrap(); - - let remote_ids = vec![bad_remote_id, good_remote_id_1, good_remote_id_2]; - let db = server.db(&db_name).unwrap(); - - let shard_config = ShardConfig { - hash_ring: Some(HashRing { - shards: vec![TEST_SHARD_ID].into(), - }), - shards: vec![(TEST_SHARD_ID, Sink::Iox(remote_ids.clone()))] - .into_iter() - .collect(), - ..Default::default() - }; - - let mut rules = db.rules().as_ref().clone(); - rules.routing_rules = Some(RoutingRules::ShardConfig(shard_config)); - let rules = Arc::new(rules); - - db.update_rules(rules); - - let tables = lines_to_batches("cpu bar=1 10", 0).unwrap(); - let write = DmlWrite::new(tables, Default::default()); - let err = server.write(&db_name, write.clone()).await.unwrap_err(); - assert!( - matches!(err, Error::NoRemoteConfigured { node_group } if node_group == remote_ids) - ); - - // one remote is configured but it's down and we'll get connection error - server.update_remote(bad_remote_id, BAD_REMOTE_ADDR.into()); - let err = server.write(&db_name, write.clone()).await.unwrap_err(); - assert!(matches!( - err, - Error::NoRemoteReachable { errors } if matches!( - errors[BAD_REMOTE_ADDR], - connection::ConnectionManagerError::RemoteServerConnectError {..} - ) - )); - assert!(!written_1.load(Ordering::Relaxed)); - assert!(!written_2.load(Ordering::Relaxed)); - - // We configure the address for the other remote, this time connection will succeed - // despite the bad remote failing to connect. - server.update_remote(good_remote_id_1, GOOD_REMOTE_ADDR_1.into()); - server.update_remote(good_remote_id_2, GOOD_REMOTE_ADDR_2.into()); - - // Remotes are tried in random order, so we need to repeat the test a few times to have a reasonable - // probability both the remotes will get hit. 
- for _ in 0..100 { - server - .write(&db_name, write.clone()) - .await - .expect("cannot write lines"); - } - assert!(written_1.load(Ordering::Relaxed)); - assert!(written_2.load(Ordering::Relaxed)); - } - #[tokio::test] async fn close_chunk() { test_helpers::maybe_start_logging(); @@ -2129,11 +1795,11 @@ mod tests { .unwrap(); let tables = lines_to_batches("cpu bar=1 10", 0).unwrap(); + let db = server.db(&db_name).unwrap(); let write = DmlWrite::new(tables, Default::default()); - server.write(&db_name, write).await.unwrap(); + db.store_write(&write).unwrap(); // get chunk ID - let db = server.db(&db_name).unwrap(); let chunks = db.chunk_summaries().unwrap(); assert_eq!(chunks.len(), 1); let chunk_id = chunks[0].id; @@ -2191,39 +1857,6 @@ mod tests { server.join().await.unwrap(); } - #[tokio::test] - async fn hard_buffer_limit() { - let server = make_server(make_application()); - server.set_id(ServerId::try_from(1).unwrap()).unwrap(); - server.wait_for_init().await.unwrap(); - - let name = DatabaseName::new("foo").unwrap(); - server - .create_database(default_rules(name.clone())) - .await - .unwrap(); - - let db = server.db(&name).unwrap(); - - let mut rules: DatabaseRules = db.rules().as_ref().clone(); - - rules.lifecycle_rules.buffer_size_hard = Some(std::num::NonZeroUsize::new(10).unwrap()); - - let rules = Arc::new(rules); - db.update_rules(Arc::clone(&rules)); - - // inserting first line does not trigger hard buffer limit - let tables = lines_to_batches("cpu bar=1 10", 0).unwrap(); - let write = DmlWrite::new(tables, Default::default()); - server.write(&name, write).await.unwrap(); - - // inserting second line will - let tables = lines_to_batches("cpu bar=2 20", 0).unwrap(); - let write = DmlWrite::new(tables, Default::default()); - let res = server.write(&name, write).await.unwrap_err(); - assert!(matches!(res, super::Error::HardLimitReached {})); - } - #[tokio::test] async fn cannot_create_db_until_server_is_initialized() { let server = make_server(make_application()); @@ -2372,9 +2005,13 @@ mod tests { // can only write to successfully created DBs let tables = lines_to_batches("cpu foo=1 10", 0).unwrap(); let write = DmlWrite::new(tables, Default::default()); - server.write(&foo_db_name, write.clone()).await.unwrap(); + server + .db(&foo_db_name) + .unwrap() + .store_write(&write) + .unwrap(); - let err = server.write(&bar_db_name, write).await.unwrap_err(); + let err = server.db(&bar_db_name).unwrap_err(); assert!(matches!(err, Error::DatabaseNotInitialized { .. })); // creating failed DBs does not work @@ -2766,10 +2403,10 @@ mod tests { ); assert!(database.init_error().is_none()); - assert!(server.db(&db_name_catalog_broken).is_ok()); + let db = server.db(&db_name_catalog_broken).unwrap(); let tables = lines_to_batches("cpu bar=1 10", 0).unwrap(); let write = DmlWrite::new(tables, Default::default()); - server.write(&db_name_catalog_broken, write).await.unwrap(); + db.store_write(&write).unwrap(); // 5. 
cannot wipe if DB was just created let created = server @@ -2792,56 +2429,6 @@ mod tests { ); } - #[tokio::test] - async fn write_buffer_errors_propagate() { - let application = make_application(); - - application - .write_buffer_factory() - .register_always_fail_mock("my_mock".to_string()); - - let server = make_server(application); - server.set_id(ServerId::try_from(1).unwrap()).unwrap(); - server.wait_for_init().await.unwrap(); - - let db_name = DatabaseName::new("my_db").unwrap(); - let rules = DatabaseRules { - name: db_name.clone(), - partition_template: PartitionTemplate { - parts: vec![TemplatePart::TimeFormat("YYYY-MM".to_string())], - }, - lifecycle_rules: Default::default(), - routing_rules: None, - worker_cleanup_avg_sleep: Duration::from_secs(2), - write_buffer_connection: Some(WriteBufferConnection { - direction: WriteBufferDirection::Write, - type_: "mock".to_string(), - connection: "my_mock".to_string(), - ..Default::default() - }), - }; - server - .create_database(make_provided_rules(rules)) - .await - .unwrap(); - - let tables = lines_to_batches("cpu bar=1 10", 0).unwrap(); - let write = DmlWrite::new(tables, Default::default()); - assert_error!( - server.write(&db_name, write).await, - Error::WriteBuffer { .. }, - ); - } - - // run a sql query against the database, returning the results as record batches - async fn run_query(db: Arc, query: &str) -> Vec { - let planner = SqlQueryPlanner::default(); - let ctx = db.new_query_context(None); - - let physical_plan = planner.query(query, &ctx).await.unwrap(); - ctx.collect(physical_plan).await.unwrap() - } - fn default_rules(db_name: DatabaseName<'static>) -> ProvidedDatabaseRules { make_provided_rules(DatabaseRules::new(db_name)) } diff --git a/server/src/resolver.rs b/server/src/resolver.rs index 19ed62ac97..e77ea9bf41 100644 --- a/server/src/resolver.rs +++ b/server/src/resolver.rs @@ -14,10 +14,6 @@ impl RemoteTemplate { let template = template.into(); Self { template } } - - fn get(&self, id: &ServerId) -> GrpcConnectionString { - self.template.replace("{id}", &format!("{}", id.get_u32())) - } } /// A gRPC connection string. @@ -55,37 +51,4 @@ impl Resolver { pub fn delete_remote(&mut self, id: ServerId) -> Option { self.remotes.remove(&id) } - - /// Get remote server by ID. 
- pub fn resolve_remote(&self, id: ServerId) -> Option { - self.remotes - .get(&id) - .cloned() - .or_else(|| self.remote_template.as_ref().map(|t| t.get(&id))) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::num::NonZeroU32; - - #[test] - fn resolve_remote() { - let resolver = Resolver::new(Some(RemoteTemplate::new("http://iox-query-{id}:8082"))); - - let server_id = ServerId::new(NonZeroU32::new(42).unwrap()); - let remote = resolver.resolve_remote(server_id); - assert_eq!( - remote, - Some(GrpcConnectionString::from("http://iox-query-42:8082")) - ); - - let server_id = ServerId::new(NonZeroU32::new(24).unwrap()); - let remote = resolver.resolve_remote(server_id); - assert_eq!( - remote, - Some(GrpcConnectionString::from("http://iox-query-24:8082")) - ); - } } From 22936abb2372802dd3e74f10d0e35f07735324f1 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 24 Nov 2021 18:22:31 +0100 Subject: [PATCH 4/4] refactor: remove connection manager and resolver from `Server` --- .../server_type/database/http.rs | 45 ++--- .../influxdb_ioxd/server_type/database/mod.rs | 27 +-- .../server_type/database/rpc/delete.rs | 21 +- .../server_type/database/rpc/deployment.rs | 22 +-- .../server_type/database/rpc/flight.rs | 16 +- .../server_type/database/rpc/management.rs | 24 +-- .../server_type/database/rpc/mod.rs | 11 +- .../server_type/database/rpc/write_pb.rs | 21 +- .../server_type/database/setup.rs | 13 +- server/src/connection.rs | 185 ------------------ server/src/lib.rs | 77 ++------ server/src/resolver.rs | 54 ----- server/tests/write_buffer_delete.rs | 3 +- 13 files changed, 77 insertions(+), 442 deletions(-) delete mode 100644 server/src/connection.rs delete mode 100644 server/src/resolver.rs diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/http.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/http.rs index 7fa70d09c1..163b4b6e17 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/http.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/http.rs @@ -14,7 +14,7 @@ use data_types::{names::OrgBucketMappingError, DatabaseName}; use influxdb_iox_client::format::QueryOutputFormat; use query::exec::ExecutionContextProvider; -use server::{connection::ConnectionManager, Error}; +use server::Error; // External crates use async_trait::async_trait; @@ -162,10 +162,7 @@ impl From for ApplicationError { } #[async_trait] -impl HttpDrivenDml for DatabaseServerType -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ +impl HttpDrivenDml for DatabaseServerType { fn max_request_size(&self) -> usize { self.max_request_size } @@ -222,13 +219,10 @@ where } } -pub async fn route_request( - server_type: &DatabaseServerType, +pub async fn route_request( + server_type: &DatabaseServerType, req: Request, -) -> Result, ApplicationError> -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ +) -> Result, ApplicationError> { match server_type .route_dml_http_request(req) .await @@ -266,9 +260,9 @@ fn default_format() -> String { QueryOutputFormat::default().to_string() } -async fn query( +async fn query( req: Request, - server_type: &DatabaseServerType, + server_type: &DatabaseServerType, ) -> Result, ApplicationError> { let server = &server_type.server; @@ -340,10 +334,7 @@ mod tests { use data_types::{database_rules::DatabaseRules, server_id::ServerId, DatabaseName}; use object_store::ObjectStore; use schema::selection::Selection; - use server::{ - connection::ConnectionManagerImpl, db::Db, rules::ProvidedDatabaseRules, 
ApplicationState, - Server, - }; + use server::{db::Db, rules::ProvidedDatabaseRules, ApplicationState, Server}; use trace::RingBufferTraceCollector; fn make_application() -> Arc { @@ -354,12 +345,8 @@ mod tests { )) } - fn make_server(application: Arc) -> Arc> { - Arc::new(Server::new( - ConnectionManagerImpl::new(), - application, - Default::default(), - )) + fn make_server(application: Arc) -> Arc { + Arc::new(Server::new(application, Default::default())) } #[tokio::test] @@ -377,10 +364,7 @@ mod tests { assert_tracing(setup_server().await).await; } - async fn assert_dbwrite( - test_server: TestServer>, - write: DmlWrite, - ) { + async fn assert_dbwrite(test_server: TestServer, write: DmlWrite) { let (table_name, mutable_batch) = write.tables().next().unwrap(); let test_db = test_server @@ -529,10 +513,7 @@ mod tests { /// Sets up a test database with some data for testing the query endpoint /// returns a client for communicating with the server, and the server /// endpoint - async fn setup_test_data() -> ( - Client, - TestServer>, - ) { + async fn setup_test_data() -> (Client, TestServer) { let test_server = setup_server().await; let client = Client::new(); @@ -689,7 +670,7 @@ mod tests { } /// return a test server and the url to contact it for `MyOrg_MyBucket` - async fn setup_server() -> TestServer> { + async fn setup_server() -> TestServer { let application = make_application(); let app_server = make_server(Arc::clone(&application)); diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/mod.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/mod.rs index ed3daa4e7a..4e2577e0b7 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/mod.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/mod.rs @@ -5,7 +5,7 @@ use futures::{future::FusedFuture, FutureExt}; use hyper::{Body, Request, Response}; use metric::Registry; use observability_deps::tracing::{error, info}; -use server::{connection::ConnectionManager, ApplicationState, Server}; +use server::{ApplicationState, Server}; use tokio_util::sync::CancellationToken; use trace::TraceCollector; @@ -25,25 +25,19 @@ pub use self::http::ApplicationError; use super::common_state::CommonServerState; #[derive(Debug)] -pub struct DatabaseServerType -where - M: ConnectionManager + std::fmt::Debug + Send + Sync + 'static, -{ +pub struct DatabaseServerType { pub application: Arc, - pub server: Arc>, + pub server: Arc, pub lp_metrics: Arc, pub max_request_size: usize, pub serving_readiness: ServingReadiness, shutdown: CancellationToken, } -impl DatabaseServerType -where - M: ConnectionManager + std::fmt::Debug + Send + Sync + 'static, -{ +impl DatabaseServerType { pub fn new( application: Arc, - server: Arc>, + server: Arc, common_state: &CommonServerState, ) -> Self { let lp_metrics = Arc::new(LineProtocolMetrics::new( @@ -62,10 +56,7 @@ where } #[async_trait] -impl ServerType for DatabaseServerType -where - M: ConnectionManager + std::fmt::Debug + Send + Sync + 'static, -{ +impl ServerType for DatabaseServerType { type RouteError = ApplicationError; fn metric_registry(&self) -> Arc { @@ -132,7 +123,7 @@ mod tests { use data_types::{database_rules::DatabaseRules, DatabaseName}; use futures::pin_mut; use influxdb_iox_client::{connection::Connection, flight::PerformQuery}; - use server::{connection::ConnectionManagerImpl, rules::ProvidedDatabaseRules}; + use server::rules::ProvidedDatabaseRules; use std::{convert::TryInto, net::SocketAddr, num::NonZeroU64}; use structopt::StructOpt; use tokio::task::JoinHandle; @@ 
-157,7 +148,7 @@ mod tests { async fn test_serve( config: RunConfig, application: Arc, - server: Arc>, + server: Arc, ) { let grpc_listener = grpc_listener(config.grpc_bind_address.into()) .await @@ -348,7 +339,7 @@ mod tests { collector: &Arc, ) -> ( SocketAddr, - Arc>, + Arc, JoinHandle>, ) { let config = test_config(Some(23)); diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/delete.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/delete.rs index 90b8005b57..85133af32e 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/delete.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/delete.rs @@ -1,4 +1,3 @@ -use std::fmt::Debug; use std::sync::Arc; use data_types::non_empty::NonEmptyString; @@ -6,20 +5,17 @@ use data_types::DatabaseName; use dml::{DmlDelete, DmlMeta}; use generated_types::google::{FieldViolationExt, FromOptionalField, OptionalField}; use generated_types::influxdata::iox::delete::v1::*; -use server::{connection::ConnectionManager, Server}; +use server::Server; use tonic::Response; -struct DeleteService { - server: Arc>, +struct DeleteService { + server: Arc, } use super::error::{default_db_error_handler, default_server_error_handler}; #[tonic::async_trait] -impl delete_service_server::DeleteService for DeleteService -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ +impl delete_service_server::DeleteService for DeleteService { async fn delete( &self, request: tonic::Request, @@ -50,11 +46,8 @@ where } } -pub fn make_server( - server: Arc>, -) -> delete_service_server::DeleteServiceServer -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ +pub fn make_server( + server: Arc, +) -> delete_service_server::DeleteServiceServer { delete_service_server::DeleteServiceServer::new(DeleteService { server }) } diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/deployment.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/deployment.rs index 92e180e1ec..08780dcfe2 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/deployment.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/deployment.rs @@ -3,12 +3,12 @@ use generated_types::{ google::{FieldViolation, NotFound}, influxdata::iox::deployment::v1::*, }; -use server::{connection::ConnectionManager, Error, Server}; -use std::{convert::TryFrom, fmt::Debug, sync::Arc}; +use server::{Error, Server}; +use std::{convert::TryFrom, sync::Arc}; use tonic::{Request, Response, Status}; -struct DeploymentService { - server: Arc>, +struct DeploymentService { + server: Arc, serving_readiness: ServingReadiness, } @@ -16,10 +16,7 @@ use super::error::default_server_error_handler; use crate::influxdb_ioxd::serving_readiness::ServingReadiness; #[tonic::async_trait] -impl deployment_service_server::DeploymentService for DeploymentService -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ +impl deployment_service_server::DeploymentService for DeploymentService { async fn get_server_id( &self, _: Request, @@ -69,15 +66,12 @@ where } } -pub fn make_server( - server: Arc>, +pub fn make_server( + server: Arc, serving_readiness: ServingReadiness, ) -> deployment_service_server::DeploymentServiceServer< impl deployment_service_server::DeploymentService, -> -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ +> { deployment_service_server::DeploymentServiceServer::new(DeploymentService { server, serving_readiness, diff --git 
a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/flight.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/flight.rs index f8151b11ee..08ea5a3ec4 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/flight.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/flight.rs @@ -25,7 +25,7 @@ use tonic::{Request, Response, Streaming}; use data_types::{DatabaseName, DatabaseNameError}; use observability_deps::tracing::{info, warn}; use query::exec::{ExecutionContextProvider, IOxExecutionContext}; -use server::{connection::ConnectionManager, Server}; +use server::Server; use super::error::default_server_error_handler; use crate::influxdb_ioxd::planner::Planner; @@ -125,22 +125,16 @@ struct ReadInfo { /// Concrete implementation of the gRPC Arrow Flight Service API #[derive(Debug)] -struct FlightService { - server: Arc>, +struct FlightService { + server: Arc, } -pub fn make_server(server: Arc>) -> FlightServer -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ +pub fn make_server(server: Arc) -> FlightServer { FlightServer::new(FlightService { server }) } #[tonic::async_trait] -impl Flight for FlightService -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ +impl Flight for FlightService { type HandshakeStream = TonicStream; type ListFlightsStream = TonicStream; type DoGetStream = TonicStream; diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs index a3f6c5e3b4..eb65507578 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs @@ -4,16 +4,14 @@ use generated_types::{ influxdata::iox::management::v1::{Error as ProtobufError, *}, }; use query::QueryDatabase; -use server::{ - connection::ConnectionManager, rules::ProvidedDatabaseRules, ApplicationState, Error, Server, -}; -use std::{convert::TryFrom, fmt::Debug, sync::Arc}; +use server::{rules::ProvidedDatabaseRules, ApplicationState, Error, Server}; +use std::{convert::TryFrom, sync::Arc}; use tonic::{Request, Response, Status}; use uuid::Uuid; -struct ManagementService { +struct ManagementService { application: Arc, - server: Arc>, + server: Arc, } use super::error::{ @@ -21,10 +19,7 @@ use super::error::{ }; #[tonic::async_trait] -impl management_service_server::ManagementService for ManagementService -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ +impl management_service_server::ManagementService for ManagementService { async fn list_databases( &self, request: Request, @@ -523,15 +518,12 @@ fn format_rules(provided_rules: Arc, omit_defaults: bool) } } -pub fn make_server( +pub fn make_server( application: Arc, - server: Arc>, + server: Arc, ) -> management_service_server::ManagementServiceServer< impl management_service_server::ManagementService, -> -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ +> { management_service_server::ManagementServiceServer::new(ManagementService { application, server, diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/mod.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/mod.rs index c2322b0703..9ecbc86646 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/mod.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/mod.rs @@ -1,7 +1,5 @@ use std::sync::Arc; -use server::connection::ConnectionManager; - use crate::influxdb_ioxd::{ 
rpc::{add_gated_service, add_service, serve_builder, setup_builder, RpcBuilderInput}, server_type::{database::DatabaseServerType, RpcError}, @@ -16,13 +14,10 @@ mod operations; mod storage; mod write_pb; -pub async fn server_grpc( - server_type: Arc>, +pub async fn server_grpc( + server_type: Arc, builder_input: RpcBuilderInput, -) -> Result<(), RpcError> -where - M: ConnectionManager + std::fmt::Debug + Send + Sync + 'static, -{ +) -> Result<(), RpcError> { let builder = setup_builder!(builder_input, server_type); add_gated_service!( diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/write_pb.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/write_pb.rs index d285b65b0e..69f861a959 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/write_pb.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/write_pb.rs @@ -2,21 +2,17 @@ use data_types::DatabaseName; use dml::{DmlMeta, DmlOperation, DmlWrite}; use generated_types::google::{FieldViolation, FieldViolationExt}; use generated_types::influxdata::pbdata::v1::*; -use server::{connection::ConnectionManager, Server}; -use std::fmt::Debug; +use server::Server; use std::sync::Arc; use super::error::{default_database_write_error_handler, default_server_error_handler}; -struct PBWriteService { - server: Arc>, +struct PBWriteService { + server: Arc, } #[tonic::async_trait] -impl write_service_server::WriteService for PBWriteService -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ +impl write_service_server::WriteService for PBWriteService { async fn write( &self, request: tonic::Request, @@ -53,11 +49,8 @@ where } } -pub fn make_server( - server: Arc>, -) -> write_service_server::WriteServiceServer -where - M: ConnectionManager + Send + Sync + Debug + 'static, -{ +pub fn make_server( + server: Arc, +) -> write_service_server::WriteServiceServer { write_service_server::WriteServiceServer::new(PBWriteService { server }) } diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/setup.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/setup.rs index c38ccef35e..7a4dc8d4c4 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/setup.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/setup.rs @@ -2,9 +2,7 @@ use std::sync::Arc; use object_store::ObjectStore; use observability_deps::tracing::warn; -use server::{ - connection::ConnectionManagerImpl, ApplicationState, RemoteTemplate, Server, ServerConfig, -}; +use server::{ApplicationState, Server, ServerConfig}; use snafu::{ResultExt, Snafu}; use trace::TraceCollector; @@ -47,18 +45,13 @@ pub async fn make_application( ))) } -pub fn make_server( - application: Arc, - config: &Config, -) -> Arc> { +pub fn make_server(application: Arc, config: &Config) -> Arc { let server_config = ServerConfig { - remote_template: config.remote_template.clone().map(RemoteTemplate::new), wipe_catalog_on_error: config.wipe_catalog_on_error.into(), skip_replay_and_seek_instead: config.skip_replay_and_seek_instead.into(), }; - let connection_manager = ConnectionManagerImpl::new(); - let app_server = Arc::new(Server::new(connection_manager, application, server_config)); + let app_server = Arc::new(Server::new(application, server_config)); // if this ID isn't set the server won't be usable until this is set via an API // call diff --git a/server/src/connection.rs b/server/src/connection.rs deleted file mode 100644 index c5d0d0030e..0000000000 --- a/server/src/connection.rs +++ /dev/null @@ -1,185 +0,0 @@ -use std::sync::Arc; 
-
-use async_trait::async_trait;
-use cache_loader_async::cache_api::LoadingCache;
-use snafu::{ResultExt, Snafu};
-
-use dml::DmlWrite;
-use generated_types::influxdata::pbdata::v1::WriteRequest;
-use influxdb_iox_client::{connection::Builder, write};
-use observability_deps::tracing::debug;
-
-type RemoteServerError = Box<dyn std::error::Error + Send + Sync + 'static>;
-
-#[derive(Debug, Snafu)]
-pub enum ConnectionManagerError {
-    #[snafu(display("cannot connect to remote: {}", source))]
-    RemoteServerConnectError { source: RemoteServerError },
-    #[snafu(display("cannot write to remote: {}", source))]
-    RemoteServerWriteError { source: write::WriteError },
-}
-
-/// The `Server` will ask the `ConnectionManager` for connections to a specific
-/// remote server. These connections can be used to communicate with other
-/// servers. This is implemented as a trait for dependency injection in testing.
-#[async_trait]
-pub trait ConnectionManager {
-    type RemoteServer: RemoteServer + Send + Sync + 'static;
-
-    async fn remote_server(
-        &self,
-        connect: &str,
-    ) -> Result<Arc<Self::RemoteServer>, ConnectionManagerError>;
-}
-
-/// The `RemoteServer` represents the API for replicating, subscribing, and
-/// querying other servers.
-#[async_trait]
-pub trait RemoteServer {
-    /// Sends a [`DmlWrite`] to the remote server. An IOx server acting as a
-    /// router/sharder will call this method to send entries to remotes.
-    async fn write(&self, db: &str, write: &DmlWrite) -> Result<(), ConnectionManagerError>;
-}
-
-/// The connection manager maps a host identifier to a remote server.
-#[derive(Debug)]
-pub struct ConnectionManagerImpl {
-    cache: LoadingCache<String, Arc<RemoteServerImpl>, CacheFillError>,
-}
-
-// Error must be Clone because LoadingCache requires so.
-#[derive(Debug, Snafu, Clone)]
-pub enum CacheFillError {
-    #[snafu(display("gRPC error: {}", source))]
-    GrpcError {
-        source: Arc<dyn std::error::Error + Send + Sync + 'static>,
-    },
-}
-
-impl ConnectionManagerImpl {
-    pub fn new() -> Self {
-        let cache = LoadingCache::new(Self::cached_remote_server);
-        Self { cache }
-    }
-
-    async fn cached_remote_server(
-        connect: String,
-    ) -> Result<Arc<RemoteServerImpl>, CacheFillError> {
-        let connection = Builder::default()
-            .build(&connect)
-            .await
-            .map_err(|e| Arc::new(e) as _)
-            .context(GrpcError)?;
-        let client = write::Client::new(connection);
-        Ok(Arc::new(RemoteServerImpl { client }))
-    }
-}
-
-impl Default for ConnectionManagerImpl {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-#[async_trait]
-impl ConnectionManager for ConnectionManagerImpl {
-    type RemoteServer = RemoteServerImpl;
-
-    async fn remote_server(
-        &self,
-        connect: &str,
-    ) -> Result<Arc<Self::RemoteServer>, ConnectionManagerError> {
-        let ret = self
-            .cache
-            .get_with_meta(connect.to_string())
-            .await
-            .map_err(|e| Box::new(e) as _)
-            .context(RemoteServerConnectError)?;
-        debug!(was_cached=%ret.cached, %connect, "getting remote connection");
-        Ok(ret.result)
-    }
-}
-
-/// An implementation for communicating with other IOx servers. This should
-/// be moved into and implemented in an influxdb_iox_client create at a later
-/// date.
-#[derive(Debug)]
-pub struct RemoteServerImpl {
-    client: write::Client,
-}
-
-#[async_trait]
-impl RemoteServer for RemoteServerImpl {
-    /// Sends a write to the remote server. An IOx server acting as a
-    /// router/sharder will call this method to send entries to remotes.
-    async fn write(&self, db_name: &str, write: &DmlWrite) -> Result<(), ConnectionManagerError> {
-        let data = mutable_batch_pb::encode::encode_write(db_name, write);
-        self.client
-            .clone() // cheap, see https://docs.rs/tonic/0.4.2/tonic/client/index.html#concurrent-usage
-            .write_pb(WriteRequest {
-                database_batch: Some(data),
-            })
-            .await
-            .context(RemoteServerWriteError)
-    }
-}
-
-pub mod test_helpers {
-    use std::sync::atomic::{AtomicBool, Ordering};
-
-    use super::*;
-    use std::collections::BTreeMap;
-
-    #[derive(Debug)]
-    pub struct TestConnectionManager {
-        pub remotes: BTreeMap<String, Arc<TestRemoteServer>>,
-    }
-
-    impl Default for TestConnectionManager {
-        fn default() -> Self {
-            Self::new()
-        }
-    }
-
-    impl TestConnectionManager {
-        pub fn new() -> Self {
-            Self {
-                remotes: BTreeMap::new(),
-            }
-        }
-    }
-
-    #[async_trait]
-    impl ConnectionManager for TestConnectionManager {
-        type RemoteServer = TestRemoteServer;
-
-        async fn remote_server(
-            &self,
-            id: &str,
-        ) -> Result<Arc<TestRemoteServer>, ConnectionManagerError> {
-            #[derive(Debug, Snafu)]
-            enum TestRemoteError {
-                #[snafu(display("remote not found"))]
-                NotFound,
-            }
-            Ok(Arc::clone(self.remotes.get(id).ok_or_else(|| {
-                ConnectionManagerError::RemoteServerConnectError {
-                    source: Box::new(TestRemoteError::NotFound),
-                }
-            })?))
-        }
-    }
-
-    #[derive(Debug)]
-    pub struct TestRemoteServer {
-        pub written: Arc<AtomicBool>,
-    }
-
-    #[async_trait]
-    impl<'a> RemoteServer for TestRemoteServer {
-        async fn write(&self, _db: &str, _write: &DmlWrite) -> Result<(), ConnectionManagerError> {
-            self.written.store(true, Ordering::Relaxed);
-            Ok(())
-        }
-    }
-}
diff --git a/server/src/lib.rs b/server/src/lib.rs
index f544543803..414a2aabe4 100644
--- a/server/src/lib.rs
+++ b/server/src/lib.rs
@@ -70,7 +70,6 @@ use ::lifecycle::{LockableChunk, LockablePartition};
 use async_trait::async_trait;
-use connection::ConnectionManager;
 use data_types::{
     chunk_metadata::ChunkId,
     detailed_database::ActiveDatabase,
@@ -87,7 +86,6 @@ use internal_types::freezable::Freezable;
 use iox_object_store::IoxObjectStore;
 use observability_deps::tracing::{error, info, warn};
 use parking_lot::RwLock;
-use resolver::Resolver;
 use snafu::{ensure, OptionExt, ResultExt, Snafu};
 use std::sync::Arc;
 use tokio::{sync::Notify, task::JoinError};
@@ -98,14 +96,11 @@ use uuid::Uuid;
 pub use application::ApplicationState;
 pub use db::Db;
 pub use job::JobRegistry;
-pub use resolver::{GrpcConnectionString, RemoteTemplate};
 
 mod application;
-pub mod connection;
 pub mod database;
 pub mod db;
 mod job;
-mod resolver;
 pub mod rules;
 
 use rules::{PersistedDatabaseRules, ProvidedDatabaseRules};
@@ -233,8 +228,6 @@ pub trait DatabaseStore: std::fmt::Debug + Send + Sync {
 /// Configuration options for `Server`
 #[derive(Debug)]
 pub struct ServerConfig {
-    pub remote_template: Option<RemoteTemplate>,
-
     pub wipe_catalog_on_error: bool,
 
     pub skip_replay_and_seek_instead: bool,
@@ -243,7 +236,6 @@ impl Default for ServerConfig {
     fn default() -> Self {
         Self {
-            remote_template: None,
             wipe_catalog_on_error: false,
             skip_replay_and_seek_instead: false,
         }
@@ -254,20 +246,15 @@
 /// well as how they communicate with other servers. Each server will have one
 /// of these structs, which keeps track of all replication and query rules.
 #[derive(Debug)]
-pub struct Server<M: ConnectionManager> {
-    connection_manager: Arc<M>,
-
+pub struct Server {
     /// Future that resolves when the background worker exits
     join: Shared<BoxFuture<'static, Result<(), Arc<JoinError>>>>,
 
-    /// Resolver for mapping ServerId to gRPC connection strings
-    resolver: RwLock<Resolver>,
-
     /// State shared with the background worker
     shared: Arc<ServerShared>,
 }
 
-impl<M: ConnectionManager> Drop for Server<M> {
+impl Drop for Server {
     fn drop(&mut self) {
         if !self.shared.shutdown.is_cancelled() {
             warn!("server dropped without calling shutdown()");
@@ -466,15 +453,8 @@ impl ServerStateInitialized {
     }
 }
 
-impl<M> Server<M>
-where
-    M: ConnectionManager + Send + Sync,
-{
-    pub fn new(
-        connection_manager: M,
-        application: Arc<ApplicationState>,
-        config: ServerConfig,
-    ) -> Self {
+impl Server {
+    pub fn new(application: Arc<ApplicationState>, config: ServerConfig) -> Self {
         let shared = Arc::new(ServerShared {
             shutdown: Default::default(),
             application,
@@ -488,12 +468,7 @@ where
         let handle = tokio::spawn(background_worker(Arc::clone(&shared)));
         let join = handle.map_err(Arc::new).boxed().shared();
 
-        Self {
-            shared,
-            join,
-            connection_manager: Arc::new(connection_manager),
-            resolver: RwLock::new(Resolver::new(config.remote_template)),
-        }
+        Self { shared, join }
     }
 
     /// sets the id of the server, which is used for replication and the base
@@ -860,18 +835,6 @@ where
             .context(CanNotUpdateRules { db_name })?)
     }
 
-    pub fn remotes_sorted(&self) -> Vec<(ServerId, String)> {
-        self.resolver.read().remotes_sorted()
-    }
-
-    pub fn update_remote(&self, id: ServerId, addr: GrpcConnectionString) {
-        self.resolver.write().update_remote(id, addr)
-    }
-
-    pub fn delete_remote(&self, id: ServerId) -> Option<GrpcConnectionString> {
-        self.resolver.write().delete_remote(id)
-    }
-
     /// Closes a chunk and starts moving its data to the read buffer, as a
     /// background job, dropping when complete.
     pub fn close_chunk(
@@ -1115,10 +1078,7 @@ async fn maybe_initialize_server(shared: &ServerShared) {
 
 /// TODO: Revisit this trait's API
 #[async_trait]
-impl<M> DatabaseStore for Server<M>
-where
-    M: ConnectionManager + std::fmt::Debug + Send + Sync,
-{
+impl DatabaseStore for Server {
     type Database = Db;
     type Error = Error;
 
@@ -1148,10 +1108,7 @@ where
 }
 
 #[cfg(test)]
-impl<M> Server<M>
-where
-    M: ConnectionManager + Send + Sync,
-{
+impl Server {
     /// For tests: list of database names in this server, regardless
     /// of their initialization state
     fn db_names_sorted(&self) -> Vec<String> {
@@ -1218,7 +1175,6 @@ async fn database_name_from_rules_file(
 
 pub mod test_utils {
     use super::*;
-    use crate::connection::test_helpers::TestConnectionManager;
     use object_store::ObjectStore;
 
     /// Create a new [`ApplicationState`] with an in-memory object store
@@ -1231,12 +1187,8 @@ pub mod test_utils {
     }
 
     /// Creates a new server with the provided [`ApplicationState`]
-    pub fn make_server(application: Arc<ApplicationState>) -> Arc<Server<TestConnectionManager>> {
-        Arc::new(Server::new(
-            TestConnectionManager::new(),
-            application,
-            Default::default(),
-        ))
+    pub fn make_server(application: Arc<ApplicationState>) -> Arc<Server> {
+        Arc::new(Server::new(application, Default::default()))
     }
 
     /// Creates a new server with the provided [`ApplicationState`]
@@ -1245,7 +1197,7 @@
     pub async fn make_initialized_server(
         server_id: ServerId,
        application: Arc<ApplicationState>,
-    ) -> Arc<Server<TestConnectionManager>> {
+    ) -> Arc<Server> {
        let server = make_server(application);
        server.set_id(server_id).unwrap();
        server.wait_for_init().await.unwrap();
@@ -1457,13 +1409,10 @@ mod tests {
         }
     }
 
-    async fn create_simple_database<M>(
-        server: &Server<M>,
+    async fn create_simple_database(
+        server: &Server,
         name: impl Into<String> + Send,
-    ) -> Result<Arc<Db>>
-    where
-        M: ConnectionManager + Send + Sync,
-    {
+    ) -> Result<Arc<Db>> {
         let name = DatabaseName::new(name.into()).unwrap();
 
         let rules = DatabaseRules {
diff --git a/server/src/resolver.rs b/server/src/resolver.rs
deleted file mode 100644
index e77ea9bf41..0000000000
--- a/server/src/resolver.rs
+++ /dev/null
@@ -1,54 +0,0 @@
-use data_types::server_id::ServerId;
-use std::collections::BTreeMap;
-
-/// A RemoteTemplate string is a remote connection template string.
-/// Occurrences of the substring "{id}" in the template will be replaced
-/// by the server ID.
-#[derive(Debug)]
-pub struct RemoteTemplate {
-    template: String,
-}
-
-impl RemoteTemplate {
-    pub fn new(template: impl Into<String>) -> Self {
-        let template = template.into();
-        Self { template }
-    }
-}
-
-/// A gRPC connection string.
-pub type GrpcConnectionString = String;
-
-/// The Resolver provides a mapping between ServerId and GRpcConnectionString
-#[derive(Debug)]
-pub struct Resolver {
-    /// Map between remote IOx server IDs and management API connection strings.
-    remotes: BTreeMap<ServerId, GrpcConnectionString>,
-
-    /// Static map between remote server IDs and hostnames based on a template
-    remote_template: Option<RemoteTemplate>,
-}
-
-impl Resolver {
-    pub fn new(remote_template: Option<RemoteTemplate>) -> Self {
-        Self {
-            remotes: Default::default(),
-            remote_template,
-        }
-    }
-
-    /// Get all registered remote servers.
-    pub fn remotes_sorted(&self) -> Vec<(ServerId, String)> {
-        self.remotes.iter().map(|(&a, b)| (a, b.clone())).collect()
-    }
-
-    /// Update given remote server.
-    pub fn update_remote(&mut self, id: ServerId, addr: GrpcConnectionString) {
-        self.remotes.insert(id, addr);
-    }
-
-    /// Delete remote server by ID.
-    pub fn delete_remote(&mut self, id: ServerId) -> Option<GrpcConnectionString> {
-        self.remotes.remove(&id)
-    }
-}
diff --git a/server/tests/write_buffer_delete.rs b/server/tests/write_buffer_delete.rs
index c09be30957..d4ccd198e4 100644
--- a/server/tests/write_buffer_delete.rs
+++ b/server/tests/write_buffer_delete.rs
@@ -23,7 +23,6 @@ use query::frontend::sql::SqlQueryPlanner;
 use regex::Regex;
 use router::router::Router;
 use router::server::RouterServer;
-use server::connection::test_helpers::TestConnectionManager;
 use server::rules::ProvidedDatabaseRules;
 use server::test_utils::{make_application, make_initialized_server};
 use server::{Db, Server};
@@ -44,7 +43,7 @@ use write_buffer::mock::MockBufferSharedState;
 
 struct DistributedTest {
     router: Arc<Router>,
-    consumer: Arc<Server<TestConnectionManager>>,
+    consumer: Arc<Server>,
     consumer_db: Arc<Db>,
 }
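
The `ConnectionManager` trait deleted above existed only as a dependency-injection seam: production code plugged in `ConnectionManagerImpl`, tests plugged in `TestConnectionManager`, and the `M` type parameter consequently leaked into `Server`, `PBWriteService`, `DatabaseServerType`, and every signature this patch touches. A minimal sketch of that trade-off, using hypothetical names (`Remote`, `GenericServer`, `PlainServer`) that are not part of this codebase:

// Illustrative only -- none of these names exist in the repository. The
// sketch mirrors the shape of the removed abstraction: a trait seam lets
// tests inject a fake remote, but the price is a type parameter on every
// type and function that touches the server.
use std::sync::Arc;

trait Remote {
    fn send(&self, payload: &str);
}

// Before: the seam is a generic parameter, so it propagates outward.
struct GenericServer<R: Remote> {
    remote: Arc<R>,
}

fn generic_handler<R: Remote>(server: &GenericServer<R>, payload: &str) {
    server.remote.send(payload); // every caller must carry `R` along
}

// After: no seam, no parameter; a plain `&PlainServer` suffices, which is
// the simplification visible in the hunks above.
struct PlainServer;

fn plain_handler(_server: &PlainServer, _payload: &str) {}

fn main() {
    struct NoopRemote;
    impl Remote for NoopRemote {
        fn send(&self, _payload: &str) {}
    }

    let generic = GenericServer { remote: Arc::new(NoopRemote) };
    generic_handler(&generic, "cpu,host=a usage=1 100");
    plain_handler(&PlainServer, "cpu,host=a usage=1 100");
}

Once write forwarding lives in the dedicated router, the seam has no production user left, so removing it deletes the type parameter everywhere at once.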
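Likewise, the deleted `resolver.rs` documents that a `RemoteTemplate` replaces every occurrence of the substring `{id}` with the server ID; the expansion site itself is not in the deleted hunk. A standalone sketch of those documented semantics, with a hypothetical function name and a plain `u32` standing in for `ServerId`:

// Illustrative only: the documented `RemoteTemplate` behavior, i.e. "{id}"
// in the template is replaced by the server ID. The function name is made
// up; the deleted file only stores the template string.
fn expand_remote_template(template: &str, server_id: u32) -> String {
    template.replace("{id}", &server_id.to_string())
}

fn main() {
    // e.g. server 7 resolves to a concrete gRPC connection string
    let addr = expand_remote_template("http://iox-{id}.example.com:8082", 7);
    assert_eq!(addr, "http://iox-7.example.com:8082");
}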