Merge branch 'main' into crepererum/fix_checkpoint_ordering3

pull/24376/head
kodiakhq[bot] 2021-08-09 12:27:20 +00:00 committed by GitHub
commit bf15e50ce7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 109 additions and 104 deletions

View File

@ -8,13 +8,12 @@
clippy::future_not_send
)]
use async_trait::async_trait;
use data_types::{
chunk_metadata::ChunkSummary,
partition_metadata::{InfluxDbType, TableSummary},
};
use datafusion::physical_plan::SendableRecordBatchStream;
use exec::{stringset::StringSet, Executor};
use exec::stringset::StringSet;
use internal_types::{
schema::{sort::SortKey, Schema, TIME_COLUMN_NAME},
selection::Selection,
@ -55,9 +54,6 @@ pub trait QueryChunkMeta: Sized {
///
/// Databases store data organized by partitions and each partition stores
/// data in Chunks.
///
/// TODO: Move all Query and Line Protocol specific things out of this
/// trait and into the various query planners.
pub trait QueryDatabase: Debug + Send + Sync {
type Error: std::error::Error + Send + Sync + 'static;
type Chunk: QueryChunk;
@ -153,31 +149,6 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync {
fn chunk_type(&self) -> &str;
}
#[async_trait]
/// Storage for `Databases` which can be retrieved by name
///
/// Implementors choose the concrete database type they hand out
/// (`Database`) and the error type reported by fallible operations
/// (`Error`). Databases are shared with callers as `Arc`s.
pub trait DatabaseStore: Debug + Send + Sync {
/// The type of database that is stored by this `DatabaseStore`
type Database: QueryDatabase;
/// The type of error this `DatabaseStore` generates
type Error: std::error::Error + Send + Sync + 'static;
/// List the database names.
///
/// NOTE(review): the method name promises sorted output, but the
/// signature cannot enforce it; each implementor must uphold it.
fn db_names_sorted(&self) -> Vec<String>;
/// Retrieve the database specified by `name` returning None if no
/// such database exists
fn db(&self, name: &str) -> Option<Arc<Self::Database>>;
/// Retrieve the database specified by `name`, creating it if it
/// doesn't exist.
///
/// Returns `Self::Error` if the implementation fails to look up or
/// create the database.
async fn db_or_create(&self, name: &str) -> Result<Arc<Self::Database>, Self::Error>;
/// Provide a query executor to use for running queries on
/// databases in this `DatabaseStore`
fn executor(&self) -> Arc<Executor>;
}
/// Implement ChunkMeta for something wrapped in an Arc (like Chunks often are)
impl<P> QueryChunkMeta for Arc<P>
where

View File

@ -1,19 +1,17 @@
//! This module provides a reference implementation of `query::DatabaseSource`
//! and `query::Database` for use in testing.
//! This module provides a reference implementation of
//! [`QueryDatabase`] for use in testing.
//!
//! AKA it is a Mock
use crate::exec::Executor;
use crate::{
exec::stringset::{StringSet, StringSetRef},
DatabaseStore, Predicate, PredicateMatch, QueryChunk, QueryChunkMeta, QueryDatabase,
Predicate, PredicateMatch, QueryChunk, QueryChunkMeta, QueryDatabase,
};
use arrow::{
array::{ArrayRef, DictionaryArray, Int64Array, StringArray, TimestampNanosecondArray},
datatypes::{DataType, Int32Type, TimeUnit},
record_batch::RecordBatch,
};
use async_trait::async_trait;
use data_types::{
chunk_metadata::ChunkSummary,
partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
@ -887,67 +885,6 @@ impl QueryChunkMeta for TestChunk {
}
}
/// In-memory [`DatabaseStore`] for tests that hands out [`TestDatabase`]
/// instances keyed by name.
#[derive(Debug)]
pub struct TestDatabaseStore {
/// Databases keyed by name. A `BTreeMap` iterates its keys in ascending
/// order, which is what `db_names_sorted` returns.
databases: Mutex<BTreeMap<String, Arc<TestDatabase>>>,
/// Executor shared with callers through `executor()`.
executor: Arc<Executor>,
/// Metrics registry for this store.
/// NOTE(review): presumably public so tests can inspect recorded
/// metrics — confirm against callers.
pub metrics_registry: metrics::TestMetricRegistry,
}
impl TestDatabaseStore {
    /// Creates a store in its default state; equivalent to
    /// `TestDatabaseStore::default()`.
    pub fn new() -> Self {
        Default::default()
    }
}
impl Default for TestDatabaseStore {
    /// Builds a store with no databases, a fresh executor and a default
    /// test metrics registry.
    fn default() -> Self {
        let databases = Mutex::new(BTreeMap::new());
        // NOTE(review): the `1` is presumably the executor's worker thread
        // count — confirm against `Executor::new`.
        let executor = Arc::new(Executor::new(1));
        let metrics_registry = metrics::TestMetricRegistry::default();

        Self {
            databases,
            executor,
            metrics_registry,
        }
    }
}
#[async_trait]
impl DatabaseStore for TestDatabaseStore {
    type Database = TestDatabase;
    type Error = TestError;

    /// List the names of all databases currently in the store.
    fn db_names_sorted(&self) -> Vec<String> {
        // The backing `BTreeMap` yields its keys in ascending order, so no
        // explicit sort is needed.
        self.databases.lock().keys().cloned().collect()
    }

    /// Retrieve the database specified by `name`, or `None` if the store
    /// holds no database under that name.
    fn db(&self, name: &str) -> Option<Arc<Self::Database>> {
        self.databases.lock().get(name).map(Arc::clone)
    }

    /// Retrieve the database specified by `name`, creating and registering
    /// a new `TestDatabase` if it doesn't exist yet.
    async fn db_or_create(&self, name: &str) -> Result<Arc<Self::Database>, Self::Error> {
        // One lock is held for the whole lookup-then-insert sequence so two
        // callers cannot both create a database for the same name.
        let mut registry = self.databases.lock();

        if let Some(existing) = registry.get(name) {
            return Ok(Arc::clone(existing));
        }

        let created = Arc::new(TestDatabase::new());
        registry.insert(name.to_string(), Arc::clone(&created));
        Ok(created)
    }

    /// Hand out a shared handle to this store's query executor.
    fn executor(&self) -> Arc<Executor> {
        Arc::clone(&self.executor)
    }
}
/// Return the raw data from the list of chunks
pub async fn raw_data(chunks: &[Arc<TestChunk>]) -> Vec<RecordBatch> {
let mut batches = vec![];

View File

@ -316,6 +316,8 @@ async fn initialize_database(shared: &DatabaseShared) {
}
};
info!(%db_name, %state, "attempting to advance database initialization state");
// Try to advance to the next state
let next_state = match state {
DatabaseState::Known(state) => match state.advance(shared).await {
@ -336,6 +338,8 @@ async fn initialize_database(shared: &DatabaseShared) {
// Commit the next state
{
let mut state = shared.state.write();
info!(%db_name, from=%state, to=%next_state, "database initialization state transition");
*state.unfreeze(handle) = next_state;
shared.state_notify.notify_waiters();
}

View File

@ -92,7 +92,7 @@ use metrics::{KeyValue, MetricObserverBuilder};
use object_store::{ObjectStore, ObjectStoreApi};
use observability_deps::tracing::{error, info, warn};
use parking_lot::RwLock;
use query::{exec::Executor, DatabaseStore};
use query::exec::Executor;
use rand::seq::SliceRandom;
use resolver::Resolver;
use snafu::{OptionExt, ResultExt, Snafu};
@ -223,6 +223,31 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Storage for `Databases` which can be retrieved by name
///
/// Implementors choose the concrete database type they hand out
/// (`Database`) and the error type reported by fallible operations
/// (`Error`). Databases are shared with callers as `Arc`s.
#[async_trait]
pub trait DatabaseStore: std::fmt::Debug + Send + Sync {
/// The type of database that is stored by this `DatabaseStore`
type Database: query::QueryDatabase;
/// The type of error this `DatabaseStore` generates
type Error: std::error::Error + Send + Sync + 'static;
/// List the database names.
///
/// NOTE(review): the method name promises sorted output, but the
/// signature cannot enforce it; each implementor must uphold it.
fn db_names_sorted(&self) -> Vec<String>;
/// Retrieve the database specified by `name` returning None if no
/// such database exists
fn db(&self, name: &str) -> Option<Arc<Self::Database>>;
/// Retrieve the database specified by `name`, creating it if it
/// doesn't exist.
///
/// Returns `Self::Error` if the implementation fails to look up or
/// create the database.
async fn db_or_create(&self, name: &str) -> Result<Arc<Self::Database>, Self::Error>;
/// Provide a query executor to use for running queries on
/// databases in this `DatabaseStore`
fn executor(&self) -> Arc<Executor>;
}
/// A collection of metrics used to instrument the Server.
#[derive(Debug)]
pub struct ServerMetrics {

View File

@ -8,8 +8,8 @@ use generated_types::google::{
};
use generated_types::influxdata::iox::management::v1::{Error as ProtobufError, *};
use observability_deps::tracing::info;
use query::{DatabaseStore, QueryDatabase};
use server::{ApplicationState, ConnectionManager, Error, Server};
use query::QueryDatabase;
use server::{ApplicationState, ConnectionManager, DatabaseStore, Error, Server};
use tonic::{Request, Response, Status};
struct ManagementService<M: ConnectionManager> {

View File

@ -16,7 +16,7 @@ pub mod service;
use generated_types::storage_server::{Storage, StorageServer};
use metrics::{MetricRegistry, RedMetric};
use query::DatabaseStore;
use server::DatabaseStore;
use std::sync::Arc;
/// Concrete implementation of the gRPC InfluxDB Storage Service API

View File

@ -1,6 +1,6 @@
//! This module contains implementations for the storage gRPC service
//! implemented in terms of the `query::Database` and
//! `query::DatabaseStore`
//! implemented in terms of the [`QueryDatabase`](query::QueryDatabase) and
//! [`DatabaseStore`]
use crate::influxdb_ioxd::{
planner::Planner,
@ -27,8 +27,9 @@ use metrics::KeyValue;
use observability_deps::tracing::{error, info};
use query::{
exec::fieldlist::FieldList, exec::seriesset::Error as SeriesSetError,
predicate::PredicateBuilder, DatabaseStore,
predicate::PredicateBuilder,
};
use server::DatabaseStore;
use snafu::{OptionExt, ResultExt, Snafu};
use std::{collections::HashMap, sync::Arc};
use tokio::sync::mpsc;
@ -1166,8 +1167,14 @@ mod tests {
use super::*;
use datafusion::logical_plan::{col, lit, Expr};
use panic_logging::SendPanicsToTracing;
use query::{predicate::PredicateMatch, test::TestChunk, test::TestDatabaseStore};
use parking_lot::Mutex;
use query::{
exec::Executor,
predicate::PredicateMatch,
test::{TestChunk, TestDatabase, TestError},
};
use std::{
collections::BTreeMap,
convert::TryFrom,
net::{IpAddr, Ipv4Addr, SocketAddr},
time::Duration,
@ -2865,4 +2872,65 @@ mod tests {
Self::connect(addr).await
}
}
/// In-memory [`DatabaseStore`] for tests that hands out [`TestDatabase`]
/// instances keyed by name.
#[derive(Debug)]
pub struct TestDatabaseStore {
/// Databases keyed by name. A `BTreeMap` iterates its keys in ascending
/// order, which is what `db_names_sorted` returns.
databases: Mutex<BTreeMap<String, Arc<TestDatabase>>>,
/// Executor shared with callers through `executor()`.
executor: Arc<Executor>,
/// Metrics registry for this store.
/// NOTE(review): presumably public so tests can inspect recorded
/// metrics — confirm against callers.
pub metrics_registry: metrics::TestMetricRegistry,
}
impl TestDatabaseStore {
    /// Creates a store in its default state; equivalent to
    /// `TestDatabaseStore::default()`.
    pub fn new() -> Self {
        Default::default()
    }
}
impl Default for TestDatabaseStore {
    /// Builds a store with no databases, a fresh executor and a default
    /// test metrics registry.
    fn default() -> Self {
        let databases = Mutex::new(BTreeMap::new());
        // NOTE(review): the `1` is presumably the executor's worker thread
        // count — confirm against `Executor::new`.
        let executor = Arc::new(Executor::new(1));
        let metrics_registry = metrics::TestMetricRegistry::default();

        Self {
            databases,
            executor,
            metrics_registry,
        }
    }
}
#[tonic::async_trait]
impl DatabaseStore for TestDatabaseStore {
    type Database = TestDatabase;
    type Error = TestError;

    /// List the names of all databases currently in the store.
    fn db_names_sorted(&self) -> Vec<String> {
        // The backing `BTreeMap` yields its keys in ascending order, so no
        // explicit sort is needed.
        self.databases.lock().keys().cloned().collect()
    }

    /// Retrieve the database specified by `name`, or `None` if the store
    /// holds no database under that name.
    fn db(&self, name: &str) -> Option<Arc<Self::Database>> {
        self.databases.lock().get(name).map(Arc::clone)
    }

    /// Retrieve the database specified by `name`, creating and registering
    /// a new `TestDatabase` if it doesn't exist yet.
    async fn db_or_create(&self, name: &str) -> Result<Arc<Self::Database>, Self::Error> {
        // One lock is held for the whole lookup-then-insert sequence so two
        // callers cannot both create a database for the same name.
        let mut registry = self.databases.lock();

        if let Some(existing) = registry.get(name) {
            return Ok(Arc::clone(existing));
        }

        let created = Arc::new(TestDatabase::new());
        registry.insert(name.to_string(), Arc::clone(&created));
        Ok(created)
    }

    /// Hand out a shared handle to this store's query executor.
    fn executor(&self) -> Arc<Executor> {
        Arc::clone(&self.executor)
    }
}
}