diff --git a/query/src/lib.rs b/query/src/lib.rs index f0ed162286..535549df53 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -8,13 +8,12 @@ clippy::future_not_send )] -use async_trait::async_trait; use data_types::{ chunk_metadata::ChunkSummary, partition_metadata::{InfluxDbType, TableSummary}, }; use datafusion::physical_plan::SendableRecordBatchStream; -use exec::{stringset::StringSet, Executor}; +use exec::stringset::StringSet; use internal_types::{ schema::{sort::SortKey, Schema, TIME_COLUMN_NAME}, selection::Selection, @@ -55,9 +54,6 @@ pub trait QueryChunkMeta: Sized { /// /// Databases store data organized by partitions and each partition stores /// data in Chunks. -/// -/// TODO: Move all Query and Line Protocol specific things out of this -/// trait and into the various query planners. pub trait QueryDatabase: Debug + Send + Sync { type Error: std::error::Error + Send + Sync + 'static; type Chunk: QueryChunk; @@ -153,31 +149,6 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync { fn chunk_type(&self) -> &str; } -#[async_trait] -/// Storage for `Databases` which can be retrieved by name -pub trait DatabaseStore: Debug + Send + Sync { - /// The type of database that is stored by this DatabaseStore - type Database: QueryDatabase; - - /// The type of error this DataBase store generates - type Error: std::error::Error + Send + Sync + 'static; - - /// List the database names. - fn db_names_sorted(&self) -> Vec; - - /// Retrieve the database specified by `name` returning None if no - /// such database exists - fn db(&self, name: &str) -> Option>; - - /// Retrieve the database specified by `name`, creating it if it - /// doesn't exist. - async fn db_or_create(&self, name: &str) -> Result, Self::Error>; - - /// Provide a query executor to use for running queries on - /// databases in this `DatabaseStore` - fn executor(&self) -> Arc; -} - /// Implement ChunkMeta for something wrapped in an Arc (like Chunks often are) impl

QueryChunkMeta for Arc

where diff --git a/query/src/test.rs b/query/src/test.rs index c3cc33b150..c30d8e89a1 100644 --- a/query/src/test.rs +++ b/query/src/test.rs @@ -1,19 +1,17 @@ -//! This module provides a reference implementaton of `query::DatabaseSource` -//! and `query::Database` for use in testing. +//! This module provides a reference implementaton of +//! [`QueryDatabase`] for use in testing. //! //! AKA it is a Mock -use crate::exec::Executor; use crate::{ exec::stringset::{StringSet, StringSetRef}, - DatabaseStore, Predicate, PredicateMatch, QueryChunk, QueryChunkMeta, QueryDatabase, + Predicate, PredicateMatch, QueryChunk, QueryChunkMeta, QueryDatabase, }; use arrow::{ array::{ArrayRef, DictionaryArray, Int64Array, StringArray, TimestampNanosecondArray}, datatypes::{DataType, Int32Type, TimeUnit}, record_batch::RecordBatch, }; -use async_trait::async_trait; use data_types::{ chunk_metadata::ChunkSummary, partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary}, @@ -887,67 +885,6 @@ impl QueryChunkMeta for TestChunk { } } -#[derive(Debug)] -pub struct TestDatabaseStore { - databases: Mutex>>, - executor: Arc, - pub metrics_registry: metrics::TestMetricRegistry, -} - -impl TestDatabaseStore { - pub fn new() -> Self { - Self::default() - } -} - -impl Default for TestDatabaseStore { - fn default() -> Self { - Self { - databases: Mutex::new(BTreeMap::new()), - executor: Arc::new(Executor::new(1)), - metrics_registry: metrics::TestMetricRegistry::default(), - } - } -} - -#[async_trait] -impl DatabaseStore for TestDatabaseStore { - type Database = TestDatabase; - type Error = TestError; - - /// List the database names. - fn db_names_sorted(&self) -> Vec { - let databases = self.databases.lock(); - - databases.keys().cloned().collect() - } - - /// Retrieve the database specified name - fn db(&self, name: &str) -> Option> { - let databases = self.databases.lock(); - - databases.get(name).cloned() - } - - /// Retrieve the database specified by name, creating it if it - /// doesn't exist. - async fn db_or_create(&self, name: &str) -> Result, Self::Error> { - let mut databases = self.databases.lock(); - - if let Some(db) = databases.get(name) { - Ok(Arc::clone(db)) - } else { - let new_db = Arc::new(TestDatabase::new()); - databases.insert(name.to_string(), Arc::clone(&new_db)); - Ok(new_db) - } - } - - fn executor(&self) -> Arc { - Arc::clone(&self.executor) - } -} - /// Return the raw data from the list of chunks pub async fn raw_data(chunks: &[Arc]) -> Vec { let mut batches = vec![]; diff --git a/server/src/database.rs b/server/src/database.rs index 91babde556..942a3626c9 100644 --- a/server/src/database.rs +++ b/server/src/database.rs @@ -316,6 +316,8 @@ async fn initialize_database(shared: &DatabaseShared) { } }; + info!(%db_name, %state, "attempting to advance database initialization state"); + // Try to advance to the next state let next_state = match state { DatabaseState::Known(state) => match state.advance(shared).await { @@ -336,6 +338,8 @@ async fn initialize_database(shared: &DatabaseShared) { // Commit the next state { let mut state = shared.state.write(); + info!(%db_name, from=%state, to=%next_state, "database initialization state transition"); + *state.unfreeze(handle) = next_state; shared.state_notify.notify_waiters(); } diff --git a/server/src/lib.rs b/server/src/lib.rs index 3af797c12e..60f206bf27 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -92,7 +92,7 @@ use metrics::{KeyValue, MetricObserverBuilder}; use object_store::{ObjectStore, ObjectStoreApi}; use observability_deps::tracing::{error, info, warn}; use parking_lot::RwLock; -use query::{exec::Executor, DatabaseStore}; +use query::exec::Executor; use rand::seq::SliceRandom; use resolver::Resolver; use snafu::{OptionExt, ResultExt, Snafu}; @@ -223,6 +223,31 @@ pub enum Error { pub type Result = std::result::Result; +/// Storage for `Databases` which can be retrieved by name +#[async_trait] +pub trait DatabaseStore: std::fmt::Debug + Send + Sync { + /// The type of database that is stored by this DatabaseStore + type Database: query::QueryDatabase; + + /// The type of error this DataBase store generates + type Error: std::error::Error + Send + Sync + 'static; + + /// List the database names. + fn db_names_sorted(&self) -> Vec; + + /// Retrieve the database specified by `name` returning None if no + /// such database exists + fn db(&self, name: &str) -> Option>; + + /// Retrieve the database specified by `name`, creating it if it + /// doesn't exist. + async fn db_or_create(&self, name: &str) -> Result, Self::Error>; + + /// Provide a query executor to use for running queries on + /// databases in this `DatabaseStore` + fn executor(&self) -> Arc; +} + /// A collection of metrics used to instrument the Server. #[derive(Debug)] pub struct ServerMetrics { diff --git a/src/influxdb_ioxd/rpc/management.rs b/src/influxdb_ioxd/rpc/management.rs index fb0906018c..3f483b02e8 100644 --- a/src/influxdb_ioxd/rpc/management.rs +++ b/src/influxdb_ioxd/rpc/management.rs @@ -8,8 +8,8 @@ use generated_types::google::{ }; use generated_types::influxdata::iox::management::v1::{Error as ProtobufError, *}; use observability_deps::tracing::info; -use query::{DatabaseStore, QueryDatabase}; -use server::{ApplicationState, ConnectionManager, Error, Server}; +use query::QueryDatabase; +use server::{ApplicationState, ConnectionManager, DatabaseStore, Error, Server}; use tonic::{Request, Response, Status}; struct ManagementService { diff --git a/src/influxdb_ioxd/rpc/storage.rs b/src/influxdb_ioxd/rpc/storage.rs index e65c0bcedc..d2df4057a1 100644 --- a/src/influxdb_ioxd/rpc/storage.rs +++ b/src/influxdb_ioxd/rpc/storage.rs @@ -16,7 +16,7 @@ pub mod service; use generated_types::storage_server::{Storage, StorageServer}; use metrics::{MetricRegistry, RedMetric}; -use query::DatabaseStore; +use server::DatabaseStore; use std::sync::Arc; /// Concrete implementation of the gRPC InfluxDB Storage Service API diff --git a/src/influxdb_ioxd/rpc/storage/service.rs b/src/influxdb_ioxd/rpc/storage/service.rs index 100fb119a6..071e0be14b 100644 --- a/src/influxdb_ioxd/rpc/storage/service.rs +++ b/src/influxdb_ioxd/rpc/storage/service.rs @@ -1,6 +1,6 @@ //! This module contains implementations for the storage gRPC service -//! implemented in terms of the `query::Database` and -//! `query::DatabaseStore` +//! implemented in terms of the [`QueryDatabase`](query::QueryDatabase) and +//! [`DatabaseStore`] use crate::influxdb_ioxd::{ planner::Planner, @@ -27,8 +27,9 @@ use metrics::KeyValue; use observability_deps::tracing::{error, info}; use query::{ exec::fieldlist::FieldList, exec::seriesset::Error as SeriesSetError, - predicate::PredicateBuilder, DatabaseStore, + predicate::PredicateBuilder, }; +use server::DatabaseStore; use snafu::{OptionExt, ResultExt, Snafu}; use std::{collections::HashMap, sync::Arc}; use tokio::sync::mpsc; @@ -1166,8 +1167,14 @@ mod tests { use super::*; use datafusion::logical_plan::{col, lit, Expr}; use panic_logging::SendPanicsToTracing; - use query::{predicate::PredicateMatch, test::TestChunk, test::TestDatabaseStore}; + use parking_lot::Mutex; + use query::{ + exec::Executor, + predicate::PredicateMatch, + test::{TestChunk, TestDatabase, TestError}, + }; use std::{ + collections::BTreeMap, convert::TryFrom, net::{IpAddr, Ipv4Addr, SocketAddr}, time::Duration, @@ -2865,4 +2872,65 @@ mod tests { Self::connect(addr).await } } + + #[derive(Debug)] + pub struct TestDatabaseStore { + databases: Mutex>>, + executor: Arc, + pub metrics_registry: metrics::TestMetricRegistry, + } + + impl TestDatabaseStore { + pub fn new() -> Self { + Self::default() + } + } + + impl Default for TestDatabaseStore { + fn default() -> Self { + Self { + databases: Mutex::new(BTreeMap::new()), + executor: Arc::new(Executor::new(1)), + metrics_registry: metrics::TestMetricRegistry::default(), + } + } + } + + #[tonic::async_trait] + impl DatabaseStore for TestDatabaseStore { + type Database = TestDatabase; + type Error = TestError; + + /// List the database names. + fn db_names_sorted(&self) -> Vec { + let databases = self.databases.lock(); + + databases.keys().cloned().collect() + } + + /// Retrieve the database specified name + fn db(&self, name: &str) -> Option> { + let databases = self.databases.lock(); + + databases.get(name).cloned() + } + + /// Retrieve the database specified by name, creating it if it + /// doesn't exist. + async fn db_or_create(&self, name: &str) -> Result, Self::Error> { + let mut databases = self.databases.lock(); + + if let Some(db) = databases.get(name) { + Ok(Arc::clone(db)) + } else { + let new_db = Arc::new(TestDatabase::new()); + databases.insert(name.to_string(), Arc::clone(&new_db)); + Ok(new_db) + } + } + + fn executor(&self) -> Arc { + Arc::clone(&self.executor) + } + } }