refactor: remove DbCatalog newtype wrapper (#1067)

* refactor: remove DbCatalog newtype wrapper

* chore: review comments

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Raphael Taylor-Davies 2021-03-30 08:52:16 +01:00 committed by GitHub
parent c30eaffe94
commit 18f3124721
8 changed files with 118 additions and 119 deletions
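
The change collapses a layer of indirection: instead of a `DbCatalog(Arc<Db>)` newtype existing solely to implement DataFusion's `CatalogProvider`, `Db` implements `CatalogProvider` itself, and the shared `Catalog` (now held as `Arc<Catalog>`) implements `SchemaProvider`. A minimal, self-contained sketch of the pattern with toy stand-in traits (names simplified; the literal "iox" stands in for `DEFAULT_SCHEMA`, and these are not the real DataFusion or IOx types):

```rust
use std::sync::Arc;

// Toy stand-ins for DataFusion's traits, just to show the shape of the
// refactor; the real definitions live in datafusion::catalog.
trait SchemaProvider {
    fn table_names(&self) -> Vec<String>;
}

trait CatalogProvider {
    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>>;
}

struct Catalog;

impl SchemaProvider for Catalog {
    fn table_names(&self) -> Vec<String> {
        vec!["cpu".to_string()]
    }
}

struct Db {
    catalog: Arc<Catalog>, // was `catalog: Catalog` before this commit
}

// Before: `pub struct DbCatalog(Arc<Db>);` implemented CatalogProvider and
// Db implemented SchemaProvider. After: the traits sit on Db and Catalog
// directly, so the wrapper type disappears.
impl CatalogProvider for Db {
    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
        match name {
            "iox" => Some(Arc::clone(&self.catalog) as Arc<dyn SchemaProvider>),
            _ => None,
        }
    }
}

fn main() {
    let db = Db { catalog: Arc::new(Catalog) };
    // Call sites pass the database directly instead of Arc::new(DbCatalog::new(db)).
    let schema = db.schema("iox").expect("default schema exists");
    assert_eq!(schema.table_names(), vec!["cpu".to_string()]);
}
```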

View File

@@ -12,19 +12,12 @@ use parking_lot::Mutex;
 use snafu::{OptionExt, ResultExt, Snafu};
 use tracing::{debug, info};
 
-use arrow_deps::datafusion::{
-    catalog::{catalog::CatalogProvider, schema::SchemaProvider},
-    datasource::TableProvider,
-};
+use arrow_deps::datafusion::catalog::{catalog::CatalogProvider, schema::SchemaProvider};
 use catalog::{chunk::ChunkState, Catalog};
-use data_types::{chunk::ChunkSummary, database_rules::DatabaseRules, error::ErrorLogger};
+use data_types::{chunk::ChunkSummary, database_rules::DatabaseRules};
 use internal_types::{data::ReplicatedWrite, selection::Selection};
 use mutable_buffer::chunk::Chunk;
-use query::{
-    exec::stringset::StringSet,
-    provider::{self, ProviderBuilder},
-    Database, PartitionChunk, DEFAULT_SCHEMA,
-};
+use query::{Database, DEFAULT_SCHEMA};
 use read_buffer::Database as ReadBufferDb;
 
 pub(crate) use chunk::DBChunk;
@@ -99,7 +92,7 @@ pub enum Error {
     UnknownMutableBufferChunk { chunk_id: u32 },
 
     #[snafu(display("Cannot write to this database: no mutable buffer configured"))]
-    DatatbaseNotWriteable {},
+    DatabaseNotWriteable {},
 
     #[snafu(display("Internal error: cannot create partition in catalog: {}", source))]
     CreatingPartition {
@@ -155,7 +148,7 @@ pub struct Db {
     pub rules: DatabaseRules,
 
     /// The metadata catalog
-    catalog: Catalog,
+    catalog: Arc<Catalog>,
 
     /// The read buffer holds chunk data in an in-memory optimized
     /// format.
@@ -183,7 +176,7 @@ impl Db {
     ) -> Self {
         let wal_buffer = wal_buffer.map(Mutex::new);
         let read_buffer = Arc::new(read_buffer);
-        let catalog = Catalog::new();
+        let catalog = Arc::new(Catalog::new());
         Self {
             rules,
             catalog,
@@ -458,7 +451,7 @@ impl Database for Db {
     fn store_replicated_write(&self, write: &ReplicatedWrite) -> Result<(), Self::Error> {
         if !self.writeable() {
-            return DatatbaseNotWriteable {}.fail();
+            return DatabaseNotWriteable {}.fail();
         }
 
         let batch = if let Some(batch) = write.write_buffer_batch() {
@@ -513,16 +506,7 @@
     }
 
     fn partition_keys(&self) -> Result<Vec<String>, Self::Error> {
-        let partition_keys = self
-            .catalog
-            .partitions()
-            .map(|partition| {
-                let partition = partition.read();
-                partition.key().to_string()
-            })
-            .collect();
-
-        Ok(partition_keys)
+        Ok(self.catalog.partition_keys())
     }
 
     fn chunk_summaries(&self) -> Result<Vec<ChunkSummary>> {
@@ -536,19 +520,7 @@
     }
 }
 
-/// Temporary newtype Db wrapper to allow it to act as a CatalogProvider
-///
-/// TODO: Make Db implement CatalogProvider and Catalog implement SchemaProvider
-#[derive(Debug)]
-pub struct DbCatalog(Arc<Db>);
-
-impl DbCatalog {
-    pub fn new(db: Arc<Db>) -> Self {
-        Self(db)
-    }
-}
-
-impl CatalogProvider for DbCatalog {
+impl CatalogProvider for Db {
     fn as_any(&self) -> &dyn Any {
         self as &dyn Any
     }
@@ -560,60 +532,12 @@ impl CatalogProvider for DbCatalog {
     fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
         info!(%name, "using schema");
         match name {
-            DEFAULT_SCHEMA => Some(Arc::<Db>::clone(&self.0)),
+            DEFAULT_SCHEMA => Some(Arc::<Catalog>::clone(&self.catalog)),
             _ => None,
         }
     }
 }
 
-impl SchemaProvider for Db {
-    fn as_any(&self) -> &dyn Any {
-        self as &dyn Any
-    }
-
-    fn table_names(&self) -> Vec<String> {
-        let mut names = StringSet::new();
-
-        self.catalog.partitions().for_each(|partition| {
-            let partition = partition.read();
-            partition.chunks().for_each(|chunk| {
-                let chunk = chunk.read();
-                let db_chunk = DBChunk::snapshot(&chunk);
-                db_chunk.all_table_names(&mut names);
-            })
-        });
-
-        names.into_iter().collect::<Vec<_>>()
-    }
-
-    fn table(&self, table_name: &str) -> Option<Arc<dyn TableProvider>> {
-        let mut builder = ProviderBuilder::new(table_name);
-
-        for partition_key in self.partition_keys().expect("cannot fail") {
-            for chunk in self.chunks(&partition_key) {
-                if chunk.has_table(table_name) {
-                    // This should only fail if the table doesn't exist which isn't possible
-                    let schema = chunk
-                        .table_schema(table_name, Selection::All)
-                        .expect("cannot fail");
-
-                    // This is unfortunate - a table with incompatible chunks ceases to
-                    // be visible to the query engine
-                    builder = builder
-                        .add_chunk(chunk, schema)
-                        .log_if_error("Adding chunks to table")
-                        .ok()?
-                }
-            }
-        }
-
-        match builder.build() {
-            Ok(provider) => Some(Arc::new(provider)),
-            Err(provider::Error::InternalNoChunks { .. }) => None,
-            Err(e) => panic!("unexpected error: {:?}", e),
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use arrow_deps::{
@@ -907,10 +831,7 @@ mod tests {
         let planner = SQLQueryPlanner::default();
         let executor = Executor::new();
 
-        let physical_plan = planner
-            .query(Arc::new(DbCatalog::new(db)), query, &executor)
-            .await
-            .unwrap();
+        let physical_plan = planner.query(db, query, &executor).await.unwrap();
         collect(physical_plan).await.unwrap()
     }
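
With the wrapper gone, every call site in the remaining files shrinks from `planner.query(Arc::new(DbCatalog::new(db)), ...)` to `planner.query(db, ...)`. Presumably the planner ultimately hands that `Arc<Db>` to DataFusion as a catalog; a minimal sketch of such wiring, assuming DataFusion's `ExecutionContext::register_catalog` API from this era (the helper function and catalog name are illustrative, not taken from this commit):

```rust
use std::sync::Arc;

use arrow_deps::datafusion::catalog::catalog::CatalogProvider;
use arrow_deps::datafusion::execution::context::ExecutionContext;

// Hypothetical helper: expose any CatalogProvider (now including Db itself,
// no DbCatalog wrapper required) to a DataFusion context so SQL queries can
// resolve tables through it.
fn context_with_catalog(catalog: Arc<dyn CatalogProvider>) -> ExecutionContext {
    let mut ctx = ExecutionContext::new();
    ctx.register_catalog("iox", catalog);
    ctx
}
```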

View File

@@ -1,4 +1,5 @@
 //! This module contains the implementation of the InfluxDB IOx Metadata catalog
+use std::any::Any;
 use std::{
     collections::{btree_map::Entry, BTreeMap},
     sync::Arc,
@@ -7,11 +8,19 @@ use std::{
 use parking_lot::RwLock;
 use snafu::{OptionExt, Snafu};
 
+use arrow_deps::datafusion::{catalog::schema::SchemaProvider, datasource::TableProvider};
+use data_types::error::ErrorLogger;
+use internal_types::selection::Selection;
+use partition::Partition;
+use query::{
+    exec::stringset::StringSet,
+    provider::{self, ProviderBuilder},
+    PartitionChunk,
+};
 
 pub mod chunk;
 pub mod partition;
 
-use partition::Partition;
 
 #[derive(Debug, Snafu)]
 pub enum Error {
     #[snafu(display("unknown partition: {}", partition_key))]
@@ -70,22 +79,27 @@ impl Catalog {
         }
     }
 
-    // List all partitions in this database
+    /// List all partitions in this database
     pub fn partitions(&self) -> impl Iterator<Item = Arc<RwLock<Partition>>> {
         let partitions = self.partitions.read();
         partitions.values().cloned().collect::<Vec<_>>().into_iter()
     }
 
-    // Get a specific partition by name, returning `None` if there is no such
-    // partition
+    /// Get a specific partition by name, returning `None` if there is no such
+    /// partition
     pub fn partition(&self, partition_key: impl AsRef<str>) -> Option<Arc<RwLock<Partition>>> {
         let partition_key = partition_key.as_ref();
         let partitions = self.partitions.read();
         partitions.get(partition_key).cloned()
     }
 
-    // Create a new partition in the catalog and return a reference to
-    // it. Returns an error if the partition already exists
+    /// List all partition keys in this database
+    pub fn partition_keys(&self) -> Vec<String> {
+        self.partitions.read().keys().cloned().collect()
+    }
+
+    /// Create a new partition in the catalog and return a reference to
+    /// it. Returns an error if the partition already exists
     pub fn create_partition(
         &self,
         partition_key: impl Into<String>,
@@ -119,6 +133,59 @@
     }
 }
 
+impl SchemaProvider for Catalog {
+    fn as_any(&self) -> &dyn Any {
+        self as &dyn Any
+    }
+
+    fn table_names(&self) -> Vec<String> {
+        let mut names = StringSet::new();
+
+        self.partitions().for_each(|partition| {
+            let partition = partition.read();
+            partition.chunks().for_each(|chunk| {
+                chunk.read().table_names(&mut names);
+            })
+        });
+
+        names.into_iter().collect()
+    }
+
+    fn table(&self, table_name: &str) -> Option<Arc<dyn TableProvider>> {
+        let mut builder = ProviderBuilder::new(table_name);
+        let partitions = self.partitions.read();
+
+        for partition in partitions.values() {
+            let partition = partition.read();
+            for chunk in partition.chunks() {
+                let chunk = chunk.read();
+                if chunk.has_table(table_name) {
+                    let chunk = super::DBChunk::snapshot(&chunk);
+
+                    // This should only fail if the table doesn't exist which isn't possible
+                    let schema = chunk
+                        .table_schema(table_name, Selection::All)
+                        .expect("cannot fail");
+
+                    // This is unfortunate - a table with incompatible chunks ceases to
+                    // be visible to the query engine
+                    builder = builder
+                        .add_chunk(chunk, schema)
+                        .log_if_error("Adding chunks to table")
+                        .ok()?
+                }
+            }
+        }
+
+        match builder.build() {
+            Ok(provider) => Some(Arc::new(provider)),
+            Err(provider::Error::InternalNoChunks { .. }) => None,
+            Err(e) => panic!("unexpected error: {:?}", e),
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;

View File

@@ -1,6 +1,8 @@
+use std::collections::BTreeSet;
+use std::sync::Arc;
+
 use mutable_buffer::chunk::Chunk as MBChunk;
 use read_buffer::Database as ReadBufferDb;
-use std::sync::Arc;
 
 use super::{InternalChunkState, Result};
@@ -96,6 +98,30 @@ impl Chunk {
         &self.state
     }
 
+    /// Returns true if this chunk contains a table with the provided name
+    pub fn has_table(&self, table_name: &str) -> bool {
+        match &self.state {
+            ChunkState::None => false,
+            ChunkState::Open(chunk) | ChunkState::Closing(chunk) => chunk.has_table(table_name),
+            ChunkState::Moving(chunk) | ChunkState::Closed(chunk) => chunk.has_table(table_name),
+            ChunkState::Moved(db) => {
+                db.has_table(self.partition_key.as_str(), table_name, &[self.id])
+            }
+        }
+    }
+
+    /// Collects the chunk's table names into `names`
+    pub fn table_names(&self, names: &mut BTreeSet<String>) {
+        match &self.state {
+            ChunkState::None => {}
+            ChunkState::Open(chunk) | ChunkState::Closing(chunk) => chunk.all_table_names(names),
+            ChunkState::Moving(chunk) | ChunkState::Closed(chunk) => chunk.all_table_names(names),
+            ChunkState::Moved(db) => {
+                db.all_table_names(self.partition_key.as_str(), &[self.id], names)
+            }
+        }
+    }
+
     /// Returns a mutable reference to the mutable buffer storage for
     /// chunks in the Open or Closing state
     ///

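The new `Chunk::table_names` accumulates into a `BTreeSet` (the `StringSet` used by the catalog's `SchemaProvider` impl is a `BTreeSet<String>`), so names collected across many chunks come out deduplicated and sorted. A tiny self-contained illustration of that accumulation pattern, with toy data in place of real chunks:

```rust
use std::collections::BTreeSet;

fn main() {
    // Toy stand-in: two "chunks" that both contain the cpu table.
    let chunks = vec![vec!["cpu", "mem"], vec!["cpu", "disk"]];

    // Each chunk dumps its table names into one shared set, mirroring
    // Chunk::table_names(&self, names: &mut BTreeSet<String>).
    let mut names = BTreeSet::new();
    for chunk in &chunks {
        for table in chunk {
            names.insert(table.to_string());
        }
    }

    // The set deduplicates and orders for free.
    let names: Vec<String> = names.into_iter().collect();
    assert_eq!(names, vec!["cpu", "disk", "mem"]);
}
```
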
View File

@@ -683,7 +683,6 @@ mod tests {
     use crate::buffer::Segment;
 
     use super::*;
-    use crate::db::DbCatalog;
 
     type TestError = Box<dyn std::error::Error + Send + Sync + 'static>;
     type Result<T = (), E = TestError> = std::result::Result<T, E>;
@@ -860,11 +859,7 @@ mod tests {
         let planner = SQLQueryPlanner::default();
         let executor = server.executor();
 
         let physical_plan = planner
-            .query(
-                Arc::new(DbCatalog::new(db)),
-                "select * from cpu",
-                executor.as_ref(),
-            )
+            .query(db, "select * from cpu", executor.as_ref())
             .await
             .unwrap();

View File

@@ -4,7 +4,6 @@
 //! important SQL does not regress)
 
 use super::scenarios::*;
-use crate::db::DbCatalog;
 use arrow_deps::{
     arrow::record_batch::RecordBatch, assert_table_eq, datafusion::physical_plan::collect,
 };
@@ -29,7 +28,7 @@ macro_rules! run_sql_test_case {
             let executor = Executor::new();
 
             let physical_plan = planner
-                .query(Arc::new(DbCatalog::new(db)), &sql, &executor)
+                .query(db, &sql, &executor)
                 .await
                 .expect("built plan successfully");

View File

@@ -35,7 +35,6 @@ use tracing::{debug, error, info};
 use data_types::http::WalMetadataResponse;
 use hyper::server::conn::AddrIncoming;
-use server::db::DbCatalog;
 use std::{
     fmt::Debug,
     str::{self, FromStr},
@@ -506,7 +505,7 @@ async fn query<M: ConnectionManager + Send + Sync + Debug + 'static>(
     let executor = server.executor();
 
     let physical_plan = planner
-        .query(Arc::new(DbCatalog::new(db)), &q, executor.as_ref())
+        .query(db, &q, executor.as_ref())
         .await
         .context(PlanningSQLQuery { query: &q })?;
@@ -1195,10 +1194,7 @@ mod tests {
     async fn run_query(db: Arc<Db>, query: &str) -> Vec<RecordBatch> {
         let planner = SQLQueryPlanner::default();
         let executor = Executor::new();
 
-        let physical_plan = planner
-            .query(Arc::new(DbCatalog::new(db)), query, &executor)
-            .await
-            .unwrap();
+        let physical_plan = planner.query(db, query, &executor).await.unwrap();
         collect(physical_plan).await.unwrap()
     }

View File

@@ -66,7 +66,7 @@ pub fn default_db_error_handler(error: server::db::Error) -> tonic::Status {
             description: "Cannot read from database: no mutable buffer configured".to_string(),
         }
         .into(),
-        Error::DatatbaseNotWriteable {} => PreconditionViolation {
+        Error::DatabaseNotWriteable {} => PreconditionViolation {
            category: "database".to_string(),
            subject: "influxdata.com/iox".to_string(),
            description: "Cannot write to database: no mutable buffer configured".to_string(),

View File

@@ -18,7 +18,6 @@ use arrow_deps::{
 };
 use data_types::{DatabaseName, DatabaseNameError};
 use query::{frontend::sql::SQLQueryPlanner, DatabaseStore};
-use server::db::DbCatalog;
 use server::{ConnectionManager, Server};
 use std::fmt::Debug;
@@ -150,11 +149,7 @@
         let executor = self.server.executor();
 
         let physical_plan = planner
-            .query(
-                Arc::new(DbCatalog::new(db)),
-                &read_info.sql_query,
-                &executor,
-            )
+            .query(db, &read_info.sql_query, &executor)
             .await
             .context(PlanningSQLQuery {
                 query: &read_info.sql_query,