diff --git a/server/src/connection.rs b/server/src/connection.rs index 7f1f081642..94b04beca8 100644 --- a/server/src/connection.rs +++ b/server/src/connection.rs @@ -120,8 +120,7 @@ impl RemoteServer for RemoteServerImpl { } } -#[cfg(test)] -pub(crate) mod test_helpers { +pub mod test_helpers { use std::sync::atomic::{AtomicBool, Ordering}; use super::*; @@ -132,6 +131,12 @@ pub(crate) mod test_helpers { pub remotes: BTreeMap>, } + impl Default for TestConnectionManager { + fn default() -> Self { + Self::new() + } + } + impl TestConnectionManager { pub fn new() -> Self { Self { diff --git a/server/src/db.rs b/server/src/db.rs index 12bb8c6711..b6235efc2f 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -1398,16 +1398,14 @@ mod tests { use arrow::record_batch::RecordBatch; use bytes::Bytes; use futures::{stream, StreamExt, TryStreamExt}; - use predicate::delete_expr::DeleteExpr; use tokio_util::sync::CancellationToken; - use ::test_helpers::{assert_contains, maybe_start_logging}; + use ::test_helpers::assert_contains; use arrow_util::{assert_batches_eq, assert_batches_sorted_eq}; use data_types::{ chunk_metadata::{ChunkAddr, ChunkStorage}, database_rules::{LifecycleRules, PartitionTemplate, TemplatePart}, partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary}, - timestamp::TimestampRange, write_summary::TimestampSummary, }; use entry::test_helpers::lp_to_entry; @@ -3563,233 +3561,6 @@ mod tests { join_handle.await.unwrap(); } - #[tokio::test] - async fn delete_predicate_preservation() { - // Test that delete predicates are stored within the preserved catalog - maybe_start_logging(); - - // ==================== setup ==================== - let object_store = Arc::new(ObjectStore::new_in_memory()); - let server_id = ServerId::try_from(1).unwrap(); - let db_name = "delete_predicate_preservation_test"; - - // ==================== do: create DB ==================== - // Create a DB given a server id, an object store and a db name - let test_db = TestDb::builder() - .object_store(Arc::clone(&object_store)) - .server_id(server_id) - .db_name(db_name) - .lifecycle_rules(LifecycleRules { - catalog_transactions_until_checkpoint: NonZeroU64::try_from(1).unwrap(), - // do not prune transactions files because this tests relies on them - catalog_transaction_prune_age: Duration::from_secs(1_000), - late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(), - ..Default::default() - }) - .partition_template(PartitionTemplate { - parts: vec![TemplatePart::Column("part".to_string())], - }) - .build() - .await; - let db = Arc::new(test_db.db); - - // ==================== do: create chunks ==================== - let table_name = "cpu"; - - // 1: preserved - let partition_key = "part_a"; - write_lp(&db, "cpu,part=a row=10,selector=0i 10").await; - write_lp(&db, "cpu,part=a row=11,selector=1i 11").await; - db.persist_partition(table_name, partition_key, true) - .await - .unwrap(); - - // 2: RUB - let partition_key = "part_b"; - write_lp(&db, "cpu,part=b row=20,selector=0i 20").await; - write_lp(&db, "cpu,part=b row=21,selector=1i 21").await; - db.compact_partition(table_name, partition_key) - .await - .unwrap(); - - // 3: MUB - let _partition_key = "part_c"; - write_lp(&db, "cpu,part=c row=30,selector=0i 30").await; - write_lp(&db, "cpu,part=c row=31,selector=1i 31").await; - - // 4: preserved and unloaded - let partition_key = "part_d"; - write_lp(&db, "cpu,part=d row=40,selector=0i 40").await; - write_lp(&db, "cpu,part=d row=41,selector=1i 41").await; - - let chunk_id = db - .persist_partition(table_name, partition_key, true) - .await - .unwrap() - .unwrap() - .id(); - - db.unload_read_buffer(table_name, partition_key, chunk_id) - .unwrap(); - - // ==================== do: delete ==================== - let pred = Arc::new(DeletePredicate { - range: TimestampRange { - start: 0, - end: 1_000, - }, - exprs: vec![DeleteExpr::new( - "selector".to_string(), - predicate::delete_expr::Op::Eq, - predicate::delete_expr::Scalar::I64(1), - )], - }); - db.delete("cpu", Arc::clone(&pred)).await.unwrap(); - - // ==================== do: preserve another partition ==================== - let partition_key = "part_b"; - db.persist_partition(table_name, partition_key, true) - .await - .unwrap(); - - // ==================== do: use background worker for a short while ==================== - let iters_start = db.worker_iterations_delete_predicate_preservation(); - let shutdown: CancellationToken = Default::default(); - let shutdown_captured = shutdown.clone(); - let db_captured = Arc::clone(&db); - let join_handle = - tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await }); - - let t_0 = Instant::now(); - loop { - let did_delete_predicate_preservation = - db.worker_iterations_delete_predicate_preservation() > iters_start; - let did_compaction = db.chunk_summaries().unwrap().into_iter().any(|summary| { - (summary.partition_key.as_ref() == "part_c") - && (summary.storage == ChunkStorage::ReadBuffer) - }); - if did_delete_predicate_preservation && did_compaction { - break; - } - assert!(t_0.elapsed() < Duration::from_secs(10)); - tokio::time::sleep(Duration::from_millis(100)).await; - } - - shutdown.cancel(); - join_handle.await.unwrap(); - - // ==================== check: delete predicates ==================== - let closure_check_delete_predicates = |db: &Db| { - for chunk in db.catalog.chunks() { - let chunk = chunk.read(); - if chunk.addr().partition_key.as_ref() == "part_b" { - // Strictly speaking not required because the chunk was persisted AFTER the delete predicate was - // registered so we can get away with materializing it during persistence. - continue; - } - if chunk.addr().partition_key.as_ref() == "part_c" { - // This partition was compacted, so the delete predicates were materialized. - continue; - } - let predicates = chunk.delete_predicates(); - assert_eq!(predicates.len(), 1); - assert_eq!(predicates[0].as_ref(), pred.as_ref()); - } - }; - closure_check_delete_predicates(&db); - - // ==================== check: query ==================== - let expected = vec![ - "+------+-----+----------+--------------------------------+", - "| part | row | selector | time |", - "+------+-----+----------+--------------------------------+", - "| a | 10 | 0 | 1970-01-01T00:00:00.000000010Z |", - "| b | 20 | 0 | 1970-01-01T00:00:00.000000020Z |", - "| c | 30 | 0 | 1970-01-01T00:00:00.000000030Z |", - "| d | 40 | 0 | 1970-01-01T00:00:00.000000040Z |", - "+------+-----+----------+--------------------------------+", - ]; - let batches = run_query(Arc::clone(&db), "select * from cpu order by time").await; - assert_batches_sorted_eq!(&expected, &batches); - - // ==================== do: re-load DB ==================== - // Re-create database with same store, serverID, and DB name - drop(db); - let test_db = TestDb::builder() - .object_store(Arc::clone(&object_store)) - .server_id(server_id) - .db_name(db_name) - .build() - .await; - let db = Arc::new(test_db.db); - - // ==================== check: delete predicates ==================== - closure_check_delete_predicates(&db); - - // ==================== check: query ==================== - // NOTE: partition "c" is gone here because it was not written to object store - let expected = vec![ - "+------+-----+----------+--------------------------------+", - "| part | row | selector | time |", - "+------+-----+----------+--------------------------------+", - "| a | 10 | 0 | 1970-01-01T00:00:00.000000010Z |", - "| b | 20 | 0 | 1970-01-01T00:00:00.000000020Z |", - "| d | 40 | 0 | 1970-01-01T00:00:00.000000040Z |", - "+------+-----+----------+--------------------------------+", - ]; - let batches = run_query(Arc::clone(&db), "select * from cpu order by time").await; - assert_batches_sorted_eq!(&expected, &batches); - - // ==================== do: remove checkpoint files ==================== - let files = db - .iox_object_store - .catalog_transaction_files() - .await - .unwrap() - .try_concat() - .await - .unwrap(); - let mut deleted_one = false; - for file in files { - if file.is_checkpoint() { - db.iox_object_store - .delete_catalog_transaction_file(&file) - .await - .unwrap(); - deleted_one = true; - } - } - assert!(deleted_one); - - // ==================== do: re-load DB ==================== - // Re-create database with same store, serverID, and DB name - drop(db); - let test_db = TestDb::builder() - .object_store(Arc::clone(&object_store)) - .server_id(server_id) - .db_name(db_name) - .build() - .await; - let db = Arc::new(test_db.db); - - // ==================== check: delete predicates ==================== - closure_check_delete_predicates(&db); - - // ==================== check: query ==================== - // NOTE: partition "c" is gone here because it was not written to object store - let _expected = vec![ - "+------+-----+----------+--------------------------------+", - "| part | row | selector | time |", - "+------+-----+----------+--------------------------------+", - "| a | 10 | 0 | 1970-01-01T00:00:00.000000010Z |", - "| b | 20 | 0 | 1970-01-01T00:00:00.000000020Z |", - "| d | 40 | 0 | 1970-01-01T00:00:00.000000040Z |", - "+------+-----+----------+--------------------------------+", - ]; - let batches = run_query(Arc::clone(&db), "select * from cpu order by time").await; - assert_batches_sorted_eq!(&expected, &batches); - } - #[tokio::test] async fn table_wide_schema_enforcement() { // need a table with a partition template that uses a tag column, so that we can easily write to different partitions diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index 25bc4a01da..a3791c8041 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -1,6 +1,7 @@ use super::{ catalog::chunk::ChunkMetadata, pred::to_read_buffer_predicate, streams::ReadFilterResultsStream, }; +use data_types::chunk_metadata::ChunkAddr; use data_types::{ chunk_metadata::{ChunkId, ChunkOrder}, partition_metadata, @@ -76,8 +77,7 @@ pub type Result = std::result::Result; /// MutableBuffer, ReadBuffer, or a ParquetFile #[derive(Debug)] pub struct DbChunk { - id: ChunkId, - table_name: Arc, + addr: ChunkAddr, access_recorder: AccessRecorder, state: State, meta: Arc, @@ -88,22 +88,15 @@ pub struct DbChunk { #[derive(Debug)] enum State { - MutableBuffer { - chunk: Arc, - }, - ReadBuffer { - chunk: Arc, - partition_key: Arc, - }, - ParquetFile { - chunk: Arc, - }, + MutableBuffer { chunk: Arc }, + ReadBuffer { chunk: Arc }, + ParquetFile { chunk: Arc }, } impl DbChunk { /// Create a DBChunk snapshot of the catalog chunk pub fn snapshot(chunk: &super::catalog::chunk::CatalogChunk) -> Arc { - let partition_key = Arc::from(chunk.key()); + let addr = chunk.addr().clone(); use super::catalog::chunk::{ChunkStage, ChunkStageFrozenRepr}; @@ -137,7 +130,6 @@ impl DbChunk { }, ChunkStageFrozenRepr::ReadBuffer(repr) => State::ReadBuffer { chunk: Arc::clone(repr), - partition_key, }, }; (state, Arc::clone(meta)) @@ -151,7 +143,6 @@ impl DbChunk { let state = if let Some(read_buffer) = &read_buffer { State::ReadBuffer { chunk: Arc::clone(read_buffer), - partition_key, } } else { State::ParquetFile { @@ -163,8 +154,7 @@ impl DbChunk { }; Arc::new(Self { - id: chunk.id(), - table_name: chunk.table_name(), + addr, access_recorder: chunk.access_recorder().clone(), state, meta, @@ -193,8 +183,7 @@ impl DbChunk { } }; Arc::new(Self { - id: chunk.id(), - table_name: chunk.table_name(), + addr: chunk.addr().clone(), meta, state, access_recorder: chunk.access_recorder().clone(), @@ -213,9 +202,14 @@ impl DbChunk { } } + /// Return the address of this chunk + pub fn addr(&self) -> &ChunkAddr { + &self.addr + } + /// Return the name of the table in this chunk pub fn table_name(&self) -> &Arc { - &self.table_name + &self.addr.table_name } pub fn time_of_first_write(&self) -> Time { @@ -249,11 +243,11 @@ impl QueryChunk for DbChunk { type Error = Error; fn id(&self) -> ChunkId { - self.id + self.addr.chunk_id } fn table_name(&self) -> &str { - self.table_name.as_ref() + self.addr.table_name.as_ref() } fn may_contain_pk_duplicates(&self) -> bool { @@ -663,7 +657,7 @@ mod tests { .await .unwrap() .unwrap() - .id; + .id(); db.unload_read_buffer("cpu", "1970-01-01T00", id).unwrap(); diff --git a/server/src/lib.rs b/server/src/lib.rs index 202d6217a0..cc61533d99 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -68,7 +68,9 @@ clippy::future_not_send )] +use ::lifecycle::{LockableChunk, LockablePartition}; use async_trait::async_trait; +use connection::{ConnectionManager, RemoteServer}; use data_types::{ chunk_metadata::ChunkId, database_rules::{NodeGroup, RoutingRules, ShardId, Sink}, @@ -86,7 +88,6 @@ use hashbrown::HashMap; use influxdb_line_protocol::ParsedLine; use internal_types::freezable::Freezable; use iox_object_store::IoxObjectStore; -use lifecycle::{LockableChunk, LockablePartition}; use observability_deps::tracing::{error, info, warn}; use parking_lot::RwLock; use rand::seq::SliceRandom; @@ -100,13 +101,12 @@ use tracker::{TaskTracker, TrackedFutureExt}; use uuid::Uuid; pub use application::ApplicationState; -pub use connection::{ConnectionManager, ConnectionManagerImpl, RemoteServer}; pub use db::Db; pub use job::JobRegistry; pub use resolver::{GrpcConnectionString, RemoteTemplate}; mod application; -mod connection; +pub mod connection; pub mod database; pub mod db; mod job; @@ -1233,9 +1233,8 @@ mod tests { use arrow_util::assert_batches_eq; use bytes::Bytes; use connection::test_helpers::{TestConnectionManager, TestRemoteServer}; - use data_types::chunk_metadata::ChunkStorage; use data_types::{ - chunk_metadata::ChunkAddr, + chunk_metadata::{ChunkAddr, ChunkStorage}, database_rules::{ DatabaseRules, HashRing, LifecycleRules, PartitionTemplate, ShardConfig, TemplatePart, WriteBufferConnection, WriteBufferDirection, NO_SHARD_CONFIG, diff --git a/server/tests/delete.rs b/server/tests/delete.rs new file mode 100644 index 0000000000..478aa05566 --- /dev/null +++ b/server/tests/delete.rs @@ -0,0 +1,274 @@ +use futures::TryStreamExt; +use std::{ + num::{NonZeroU32, NonZeroU64}, + sync::Arc, + time::{Duration, Instant}, +}; + +use arrow_util::assert_batches_sorted_eq; +use data_types::{ + chunk_metadata::ChunkStorage, + database_rules::{DatabaseRules, LifecycleRules, PartitionTemplate, TemplatePart}, + server_id::ServerId, + timestamp::TimestampRange, + DatabaseName, +}; +use iox_object_store::IoxObjectStore; +use object_store::ObjectStore; +use predicate::{delete_expr::DeleteExpr, delete_predicate::DeletePredicate}; +use query::{QueryChunk, QueryChunkMeta, QueryDatabase}; +use server::{ + connection::test_helpers::TestConnectionManager, + db::test_helpers::{run_query, write_lp}, + rules::ProvidedDatabaseRules, + ApplicationState, Db, Server, +}; +use test_helpers::maybe_start_logging; + +async fn start_server( + server_id: ServerId, + application: Arc, +) -> Arc> { + let server = Arc::new(Server::new( + TestConnectionManager::new(), + application, + Default::default(), + )); + server.set_id(server_id).unwrap(); + server.wait_for_init().await.unwrap(); + server +} + +#[tokio::test] +async fn delete_predicate_preservation() { + maybe_start_logging(); + + // ==================== setup ==================== + let object_store = Arc::new(ObjectStore::new_in_memory()); + let server_id = ServerId::new(NonZeroU32::new(1).unwrap()); + let db_name = DatabaseName::new("delete_predicate_preservation_test").unwrap(); + + let application = Arc::new(ApplicationState::new(Arc::clone(&object_store), None)); + let server = start_server(server_id, Arc::clone(&application)).await; + + // Test that delete predicates are stored within the preserved catalog + + // ==================== do: create DB ==================== + // Create a DB given a server id, an object store and a db name + + let rules = DatabaseRules { + partition_template: PartitionTemplate { + parts: vec![TemplatePart::Column("part".to_string())], + }, + lifecycle_rules: LifecycleRules { + catalog_transactions_until_checkpoint: NonZeroU64::new(1).unwrap(), + // do not prune transactions files because this tests relies on them + catalog_transaction_prune_age: Duration::from_secs(1_000), + late_arrive_window_seconds: NonZeroU32::new(1).unwrap(), + ..Default::default() + }, + ..DatabaseRules::new(db_name.clone()) + }; + + let database = server + .create_database(ProvidedDatabaseRules::new_rules(rules.clone().into()).unwrap()) + .await + .unwrap(); + let db = database.initialized_db().unwrap(); + + // ==================== do: create chunks ==================== + let table_name = "cpu"; + + // 1: preserved + let partition_key = "part_a"; + write_lp(&db, "cpu,part=a row=10,selector=0i 10").await; + write_lp(&db, "cpu,part=a row=11,selector=1i 11").await; + db.persist_partition(table_name, partition_key, true) + .await + .unwrap(); + + // 2: RUB + let partition_key = "part_b"; + write_lp(&db, "cpu,part=b row=20,selector=0i 20").await; + write_lp(&db, "cpu,part=b row=21,selector=1i 21").await; + db.compact_partition(table_name, partition_key) + .await + .unwrap(); + + // 3: MUB + let _partition_key = "part_c"; + write_lp(&db, "cpu,part=c row=30,selector=0i 30").await; + write_lp(&db, "cpu,part=c row=31,selector=1i 31").await; + + // 4: preserved and unloaded + let partition_key = "part_d"; + write_lp(&db, "cpu,part=d row=40,selector=0i 40").await; + write_lp(&db, "cpu,part=d row=41,selector=1i 41").await; + + let chunk_id = db + .persist_partition(table_name, partition_key, true) + .await + .unwrap() + .unwrap() + .id(); + + db.unload_read_buffer(table_name, partition_key, chunk_id) + .unwrap(); + + // ==================== do: delete ==================== + let pred = Arc::new(DeletePredicate { + range: TimestampRange { + start: 0, + end: 1_000, + }, + exprs: vec![DeleteExpr::new( + "selector".to_string(), + predicate::delete_expr::Op::Eq, + predicate::delete_expr::Scalar::I64(1), + )], + }); + db.delete("cpu", Arc::clone(&pred)).await.unwrap(); + + // ==================== do: preserve another partition ==================== + let partition_key = "part_b"; + db.persist_partition(table_name, partition_key, true) + .await + .unwrap(); + + // ==================== do: use background worker for a short while ==================== + let iters_start = db.worker_iterations_delete_predicate_preservation(); + // time_provider.inc(rules.lifecycle_rules.late_arrive_window()); + + let t_0 = Instant::now(); + loop { + let did_delete_predicate_preservation = + db.worker_iterations_delete_predicate_preservation() > iters_start; + let did_compaction = db.chunk_summaries().unwrap().into_iter().any(|summary| { + (summary.partition_key.as_ref() == "part_c") + && (summary.storage == ChunkStorage::ReadBuffer) + }); + if did_delete_predicate_preservation && did_compaction { + break; + } + assert!(t_0.elapsed() < Duration::from_secs(10)); + tokio::time::sleep(Duration::from_millis(100)).await; + } + + // ==================== check: delete predicates ==================== + + let closure_check_delete_predicates = |db: &Db| { + for chunk in db.chunks(&Default::default()) { + let partition_key = chunk.addr().partition_key.as_ref(); + if partition_key == "part_b" { + // Strictly speaking not required because the chunk was persisted AFTER the delete predicate was + // registered so we can get away with materializing it during persistence. + continue; + } + if partition_key == "part_c" { + // This partition was compacted, so the delete predicates were materialized. + continue; + } + let predicates = chunk.delete_predicates(); + assert_eq!(predicates.len(), 1); + assert_eq!(predicates[0].as_ref(), pred.as_ref()); + } + }; + closure_check_delete_predicates(&db); + + // ==================== check: query ==================== + let expected = vec![ + "+------+-----+----------+--------------------------------+", + "| part | row | selector | time |", + "+------+-----+----------+--------------------------------+", + "| a | 10 | 0 | 1970-01-01T00:00:00.000000010Z |", + "| b | 20 | 0 | 1970-01-01T00:00:00.000000020Z |", + "| c | 30 | 0 | 1970-01-01T00:00:00.000000030Z |", + "| d | 40 | 0 | 1970-01-01T00:00:00.000000040Z |", + "+------+-----+----------+--------------------------------+", + ]; + let batches = run_query(Arc::clone(&db), "select * from cpu order by time").await; + assert_batches_sorted_eq!(&expected, &batches); + + // ==================== do: re-load DB ==================== + // Re-create database with same store, serverID, and DB name + server.shutdown(); + server.join().await.unwrap(); + + drop(db); + drop(server); + let server = start_server(server_id, Arc::clone(&application)).await; + let db = server.db(&db_name).unwrap(); + + // ==================== check: delete predicates ==================== + closure_check_delete_predicates(&db); + + // ==================== check: query ==================== + // NOTE: partition "c" is gone here because it was not written to object store + let expected = vec![ + "+------+-----+----------+--------------------------------+", + "| part | row | selector | time |", + "+------+-----+----------+--------------------------------+", + "| a | 10 | 0 | 1970-01-01T00:00:00.000000010Z |", + "| b | 20 | 0 | 1970-01-01T00:00:00.000000020Z |", + "| d | 40 | 0 | 1970-01-01T00:00:00.000000040Z |", + "+------+-----+----------+--------------------------------+", + ]; + + let batches = run_query(Arc::clone(&db), "select * from cpu order by time").await; + assert_batches_sorted_eq!(&expected, &batches); + + server.shutdown(); + server.join().await.unwrap(); + + drop(db); + drop(server); + + // ==================== do: remove checkpoint files ==================== + let iox_object_store = + IoxObjectStore::find_existing(Arc::clone(application.object_store()), server_id, &db_name) + .await + .unwrap() + .unwrap(); + + let files = iox_object_store + .catalog_transaction_files() + .await + .unwrap() + .try_concat() + .await + .unwrap(); + + let mut deleted_one = false; + for file in files { + if file.is_checkpoint() { + iox_object_store + .delete_catalog_transaction_file(&file) + .await + .unwrap(); + deleted_one = true; + } + } + assert!(deleted_one); + + // ==================== do: re-load DB ==================== + // Re-create database with same store, serverID, and DB name + let server = start_server(server_id, Arc::clone(&application)).await; + let db = server.db(&db_name).unwrap(); + + // ==================== check: delete predicates ==================== + closure_check_delete_predicates(&db); + + // ==================== check: query ==================== + // NOTE: partition "c" is gone here because it was not written to object store + let _expected = vec![ + "+------+-----+----------+--------------------------------+", + "| part | row | selector | time |", + "+------+-----+----------+--------------------------------+", + "| a | 10 | 0 | 1970-01-01T00:00:00.000000010Z |", + "| b | 20 | 0 | 1970-01-01T00:00:00.000000020Z |", + "| d | 40 | 0 | 1970-01-01T00:00:00.000000040Z |", + "+------+-----+----------+--------------------------------+", + ]; + let batches = run_query(Arc::clone(&db), "select * from cpu order by time").await; + assert_batches_sorted_eq!(&expected, &batches); +} diff --git a/src/influxdb_ioxd.rs b/src/influxdb_ioxd.rs index a6175d75e9..c562a485a7 100644 --- a/src/influxdb_ioxd.rs +++ b/src/influxdb_ioxd.rs @@ -8,7 +8,7 @@ use object_store::{self, ObjectStore}; use observability_deps::tracing::{error, info, warn}; use panic_logging::SendPanicsToTracing; use server::{ - ApplicationState, ConnectionManagerImpl as ConnectionManager, RemoteTemplate, + connection::ConnectionManagerImpl as ConnectionManager, ApplicationState, RemoteTemplate, Server as AppServer, ServerConfig, }; use snafu::{ResultExt, Snafu}; diff --git a/src/influxdb_ioxd/http.rs b/src/influxdb_ioxd/http.rs index 80946712ec..96fdd7767f 100644 --- a/src/influxdb_ioxd/http.rs +++ b/src/influxdb_ioxd/http.rs @@ -28,7 +28,7 @@ use influxdb_iox_client::format::QueryOutputFormat; use influxdb_line_protocol::parse_lines; use predicate::delete_predicate::{parse_delete, DeletePredicate}; use query::exec::ExecutionContextProvider; -use server::{ApplicationState, ConnectionManager, Error, Server as AppServer}; +use server::{connection::ConnectionManager, ApplicationState, Error, Server as AppServer}; // External crates use bytes::{Bytes, BytesMut}; @@ -910,7 +910,9 @@ mod tests { use metric::{Attributes, DurationHistogram, Metric, U64Counter}; use object_store::ObjectStore; use serde::de::DeserializeOwned; - use server::{db::Db, rules::ProvidedDatabaseRules, ApplicationState, ConnectionManagerImpl}; + use server::{ + connection::ConnectionManagerImpl, db::Db, rules::ProvidedDatabaseRules, ApplicationState, + }; use tokio_stream::wrappers::ReceiverStream; use trace::RingBufferTraceCollector; diff --git a/src/influxdb_ioxd/rpc.rs b/src/influxdb_ioxd/rpc.rs index 2fc55a6352..226341fd85 100644 --- a/src/influxdb_ioxd/rpc.rs +++ b/src/influxdb_ioxd/rpc.rs @@ -10,7 +10,7 @@ use tonic::transport::NamedService; use trace_http::ctx::TraceHeaderParser; use crate::influxdb_ioxd::serving_readiness::ServingReadiness; -use server::{ApplicationState, ConnectionManager, Server}; +use server::{connection::ConnectionManager, ApplicationState, Server}; use trace::TraceCollector; pub mod error; diff --git a/src/influxdb_ioxd/rpc/flight.rs b/src/influxdb_ioxd/rpc/flight.rs index bb1df4dd8b..81926a9911 100644 --- a/src/influxdb_ioxd/rpc/flight.rs +++ b/src/influxdb_ioxd/rpc/flight.rs @@ -21,7 +21,7 @@ use tonic::{Request, Response, Streaming}; use data_types::{DatabaseName, DatabaseNameError}; use observability_deps::tracing::{info, warn}; use query::exec::ExecutionContextProvider; -use server::{ConnectionManager, Server}; +use server::{connection::ConnectionManager, Server}; use crate::influxdb_ioxd::rpc::error::default_server_error_handler; diff --git a/src/influxdb_ioxd/rpc/management.rs b/src/influxdb_ioxd/rpc/management.rs index 03485dbbc3..c62de5d9a8 100644 --- a/src/influxdb_ioxd/rpc/management.rs +++ b/src/influxdb_ioxd/rpc/management.rs @@ -9,7 +9,7 @@ use generated_types::influxdata::iox::management::v1::{Error as ProtobufError, * use predicate::delete_predicate::DeletePredicate; use query::QueryDatabase; use server::rules::ProvidedDatabaseRules; -use server::{ApplicationState, ConnectionManager, Error, Server}; +use server::{connection::ConnectionManager, ApplicationState, Error, Server}; use tonic::{Request, Response, Status}; struct ManagementService { diff --git a/src/influxdb_ioxd/rpc/write.rs b/src/influxdb_ioxd/rpc/write.rs index 515cd2d8ab..77b888891a 100644 --- a/src/influxdb_ioxd/rpc/write.rs +++ b/src/influxdb_ioxd/rpc/write.rs @@ -12,7 +12,7 @@ use generated_types::{ }; use influxdb_line_protocol::parse_lines; use observability_deps::tracing::debug; -use server::{ConnectionManager, Server}; +use server::{connection::ConnectionManager, Server}; use super::error::default_server_error_handler; diff --git a/src/influxdb_ioxd/rpc/write_pb.rs b/src/influxdb_ioxd/rpc/write_pb.rs index be37c00f28..f57e1a6ae9 100644 --- a/src/influxdb_ioxd/rpc/write_pb.rs +++ b/src/influxdb_ioxd/rpc/write_pb.rs @@ -1,7 +1,7 @@ use super::error::default_server_error_handler; use generated_types::google::FieldViolation; use generated_types::influxdata::pbdata::v1::*; -use server::{ConnectionManager, Server}; +use server::{connection::ConnectionManager, Server}; use std::fmt::Debug; use std::sync::Arc;