refactor: `QueryDatabaseProvider::db` should be async (#4143)
This is required to fetch querier namespaces on demand. Ref #4123.pull/24376/head
parent
4f9515ffba
commit
fb186c6733
|
@ -5153,6 +5153,7 @@ dependencies = [
|
|||
name = "service_common"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"datafusion 0.1.0",
|
||||
"predicate",
|
||||
"query",
|
||||
|
|
|
@ -6,6 +6,7 @@ use crate::{
|
|||
namespace::QuerierNamespace,
|
||||
poison::{PoisonCabinet, PoisonPill},
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use backoff::{Backoff, BackoffConfig};
|
||||
use data_types2::NamespaceId;
|
||||
use iox_catalog::interface::Catalog;
|
||||
|
@ -57,10 +58,11 @@ pub struct QuerierDatabase {
|
|||
exec: Arc<Executor>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl QueryDatabaseProvider for QuerierDatabase {
|
||||
type Db = QuerierNamespace;
|
||||
|
||||
fn db(&self, name: &str) -> Option<Arc<Self::Db>> {
|
||||
async fn db(&self, name: &str) -> Option<Arc<Self::Db>> {
|
||||
self.namespace(name)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -68,6 +68,7 @@
|
|||
clippy::future_not_send
|
||||
)]
|
||||
|
||||
use async_trait::async_trait;
|
||||
use data_types::{
|
||||
chunk_metadata::ChunkId,
|
||||
error::ErrorLogger,
|
||||
|
@ -242,10 +243,11 @@ impl Drop for Server {
|
|||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl service_common::QueryDatabaseProvider for Server {
|
||||
type Db = Db;
|
||||
|
||||
fn db(&self, name: &str) -> Option<Arc<Self::Db>> {
|
||||
async fn db(&self, name: &str) -> Option<Arc<Self::Db>> {
|
||||
DatabaseName::new(name)
|
||||
.ok()
|
||||
.and_then(|name| self.db(&name).ok())
|
||||
|
|
|
@ -13,3 +13,4 @@ query = { path = "../query" }
|
|||
workspace-hack = { path = "../workspace-hack"}
|
||||
|
||||
# Crates.io dependencies, in alphabetical order
|
||||
async-trait = "0.1.53"
|
||||
|
|
|
@ -4,16 +4,18 @@ pub mod planner;
|
|||
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use query::{exec::ExecutionContextProvider, QueryDatabase};
|
||||
|
||||
/// Trait that allows the query engine (which includes flight and storage/InfluxRPC) to access a virtual set of
|
||||
/// databases.
|
||||
///
|
||||
/// The query engine MUST ONLY use this trait to access the databases / catalogs.
|
||||
#[async_trait]
|
||||
pub trait QueryDatabaseProvider: std::fmt::Debug + Send + Sync + 'static {
|
||||
/// Abstract database.
|
||||
type Db: ExecutionContextProvider + QueryDatabase;
|
||||
|
||||
/// Get database if it exists.
|
||||
fn db(&self, name: &str) -> Option<Arc<Self::Db>>;
|
||||
async fn db(&self, name: &str) -> Option<Arc<Self::Db>>;
|
||||
}
|
||||
|
|
|
@ -175,10 +175,10 @@ where
|
|||
let database =
|
||||
DatabaseName::new(&read_info.database_name).context(InvalidDatabaseNameSnafu)?;
|
||||
|
||||
let db = self
|
||||
.server
|
||||
.db(&database)
|
||||
.ok_or_else(|| tonic::Status::not_found(format!("Unknown namespace: {database}")))?;
|
||||
let db =
|
||||
self.server.db(&database).await.ok_or_else(|| {
|
||||
tonic::Status::not_found(format!("Unknown namespace: {database}"))
|
||||
})?;
|
||||
|
||||
let ctx = db.new_query_context(span_ctx);
|
||||
let query_completed_token =
|
||||
|
|
|
@ -240,6 +240,7 @@ where
|
|||
let db = self
|
||||
.db_store
|
||||
.db(&db_name)
|
||||
.await
|
||||
.context(DatabaseNotFoundSnafu { db_name: &db_name })?;
|
||||
|
||||
let ctx = db.new_query_context(span_ctx);
|
||||
|
@ -271,6 +272,7 @@ where
|
|||
let db = self
|
||||
.db_store
|
||||
.db(&db_name)
|
||||
.await
|
||||
.context(DatabaseNotFoundSnafu { db_name: &db_name })?;
|
||||
|
||||
let ctx = db.new_query_context(span_ctx);
|
||||
|
@ -327,6 +329,7 @@ where
|
|||
let db = self
|
||||
.db_store
|
||||
.db(&db_name)
|
||||
.await
|
||||
.context(DatabaseNotFoundSnafu { db_name: &db_name })?;
|
||||
|
||||
let ctx = db.new_query_context(span_ctx);
|
||||
|
@ -382,6 +385,7 @@ where
|
|||
let db = self
|
||||
.db_store
|
||||
.db(&db_name)
|
||||
.await
|
||||
.context(DatabaseNotFoundSnafu { db_name: &db_name })?;
|
||||
|
||||
let ctx = db.new_query_context(span_ctx);
|
||||
|
@ -434,6 +438,7 @@ where
|
|||
let db = self
|
||||
.db_store
|
||||
.db(&db_name)
|
||||
.await
|
||||
.context(DatabaseNotFoundSnafu { db_name: &db_name })?;
|
||||
|
||||
let ctx = db.new_query_context(span_ctx);
|
||||
|
@ -519,6 +524,7 @@ where
|
|||
let db = self
|
||||
.db_store
|
||||
.db(&db_name)
|
||||
.await
|
||||
.context(DatabaseNotFoundSnafu { db_name: &db_name })?;
|
||||
|
||||
let ctx = db.new_query_context(span_ctx);
|
||||
|
@ -606,6 +612,7 @@ where
|
|||
let db = self
|
||||
.db_store
|
||||
.db(&db_name)
|
||||
.await
|
||||
.context(DatabaseNotFoundSnafu { db_name: &db_name })?;
|
||||
|
||||
let ctx = db.new_query_context(span_ctx);
|
||||
|
@ -650,6 +657,7 @@ where
|
|||
let db = self
|
||||
.db_store
|
||||
.db(&db_name)
|
||||
.await
|
||||
.context(DatabaseNotFoundSnafu { db_name: &db_name })?;
|
||||
|
||||
let ctx = db.new_query_context(span_ctx);
|
||||
|
@ -704,6 +712,7 @@ where
|
|||
let db = self
|
||||
.db_store
|
||||
.db(&db_name)
|
||||
.await
|
||||
.context(DatabaseNotFoundSnafu { db_name: &db_name })?;
|
||||
|
||||
let ctx = db.new_query_context(span_ctx);
|
||||
|
@ -760,6 +769,7 @@ where
|
|||
let db = self
|
||||
.db_store
|
||||
.db(&db_name)
|
||||
.await
|
||||
.context(DatabaseNotFoundSnafu { db_name: &db_name })?;
|
||||
|
||||
let ctx = db.new_query_context(span_ctx);
|
||||
|
@ -1364,6 +1374,7 @@ mod tests {
|
|||
sync::Arc,
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use data_types::chunk_metadata::ChunkId;
|
||||
use generated_types::{i_ox_testing_client::IOxTestingClient, tag_key_predicate::Value};
|
||||
use parking_lot::Mutex;
|
||||
|
@ -3023,11 +3034,12 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl QueryDatabaseProvider for TestDatabaseStore {
|
||||
type Db = TestDatabase;
|
||||
|
||||
/// Retrieve the database specified name
|
||||
fn db(&self, name: &str) -> Option<Arc<Self::Db>> {
|
||||
async fn db(&self, name: &str) -> Option<Arc<Self::Db>> {
|
||||
let databases = self.databases.lock();
|
||||
|
||||
databases.get(name).cloned()
|
||||
|
|
Loading…
Reference in New Issue