Merge branch 'main' into crepererum/test-bench
commit
262bbd5c47
|
@ -14,7 +14,7 @@
|
||||||
use data_types::{names::OrgBucketMappingError, DatabaseName};
|
use data_types::{names::OrgBucketMappingError, DatabaseName};
|
||||||
use influxdb_iox_client::format::QueryOutputFormat;
|
use influxdb_iox_client::format::QueryOutputFormat;
|
||||||
use query::exec::ExecutionContextProvider;
|
use query::exec::ExecutionContextProvider;
|
||||||
use server::{connection::ConnectionManager, Error};
|
use server::Error;
|
||||||
|
|
||||||
// External crates
|
// External crates
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
@ -162,10 +162,7 @@ impl From<server::Error> for ApplicationError {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl<M> HttpDrivenDml for DatabaseServerType<M>
|
impl HttpDrivenDml for DatabaseServerType {
|
||||||
where
|
|
||||||
M: ConnectionManager + Send + Sync + Debug + 'static,
|
|
||||||
{
|
|
||||||
fn max_request_size(&self) -> usize {
|
fn max_request_size(&self) -> usize {
|
||||||
self.max_request_size
|
self.max_request_size
|
||||||
}
|
}
|
||||||
|
@ -181,17 +178,18 @@ where
|
||||||
) -> Result<(), InnerDmlError> {
|
) -> Result<(), InnerDmlError> {
|
||||||
match op {
|
match op {
|
||||||
DmlOperation::Write(write) => {
|
DmlOperation::Write(write) => {
|
||||||
self.server
|
let database = self.server.active_database(db_name).map_err(|_| {
|
||||||
.write(db_name, write)
|
InnerDmlError::DatabaseNotFound {
|
||||||
|
db_name: db_name.to_string(),
|
||||||
|
}
|
||||||
|
})?;
|
||||||
|
|
||||||
|
database
|
||||||
|
.route_operation(&DmlOperation::Write(write))
|
||||||
.await
|
.await
|
||||||
.map_err(|e| match e {
|
.map_err(|e| InnerDmlError::InternalError {
|
||||||
server::Error::DatabaseNotFound { .. } => InnerDmlError::DatabaseNotFound {
|
db_name: db_name.to_string(),
|
||||||
db_name: db_name.to_string(),
|
source: Box::new(e),
|
||||||
},
|
|
||||||
e => InnerDmlError::InternalError {
|
|
||||||
db_name: db_name.to_string(),
|
|
||||||
source: Box::new(e),
|
|
||||||
},
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
DmlOperation::Delete(delete) => {
|
DmlOperation::Delete(delete) => {
|
||||||
|
@ -221,13 +219,10 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn route_request<M>(
|
pub async fn route_request(
|
||||||
server_type: &DatabaseServerType<M>,
|
server_type: &DatabaseServerType,
|
||||||
req: Request<Body>,
|
req: Request<Body>,
|
||||||
) -> Result<Response<Body>, ApplicationError>
|
) -> Result<Response<Body>, ApplicationError> {
|
||||||
where
|
|
||||||
M: ConnectionManager + Send + Sync + Debug + 'static,
|
|
||||||
{
|
|
||||||
match server_type
|
match server_type
|
||||||
.route_dml_http_request(req)
|
.route_dml_http_request(req)
|
||||||
.await
|
.await
|
||||||
|
@ -265,9 +260,9 @@ fn default_format() -> String {
|
||||||
QueryOutputFormat::default().to_string()
|
QueryOutputFormat::default().to_string()
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn query<M: ConnectionManager + Send + Sync + Debug + 'static>(
|
async fn query(
|
||||||
req: Request<Body>,
|
req: Request<Body>,
|
||||||
server_type: &DatabaseServerType<M>,
|
server_type: &DatabaseServerType,
|
||||||
) -> Result<Response<Body>, ApplicationError> {
|
) -> Result<Response<Body>, ApplicationError> {
|
||||||
let server = &server_type.server;
|
let server = &server_type.server;
|
||||||
|
|
||||||
|
@ -339,10 +334,7 @@ mod tests {
|
||||||
use data_types::{database_rules::DatabaseRules, server_id::ServerId, DatabaseName};
|
use data_types::{database_rules::DatabaseRules, server_id::ServerId, DatabaseName};
|
||||||
use object_store::ObjectStore;
|
use object_store::ObjectStore;
|
||||||
use schema::selection::Selection;
|
use schema::selection::Selection;
|
||||||
use server::{
|
use server::{db::Db, rules::ProvidedDatabaseRules, ApplicationState, Server};
|
||||||
connection::ConnectionManagerImpl, db::Db, rules::ProvidedDatabaseRules, ApplicationState,
|
|
||||||
Server,
|
|
||||||
};
|
|
||||||
use trace::RingBufferTraceCollector;
|
use trace::RingBufferTraceCollector;
|
||||||
|
|
||||||
fn make_application() -> Arc<ApplicationState> {
|
fn make_application() -> Arc<ApplicationState> {
|
||||||
|
@ -353,12 +345,8 @@ mod tests {
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn make_server(application: Arc<ApplicationState>) -> Arc<Server<ConnectionManagerImpl>> {
|
fn make_server(application: Arc<ApplicationState>) -> Arc<Server> {
|
||||||
Arc::new(Server::new(
|
Arc::new(Server::new(application, Default::default()))
|
||||||
ConnectionManagerImpl::new(),
|
|
||||||
application,
|
|
||||||
Default::default(),
|
|
||||||
))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
@ -376,10 +364,7 @@ mod tests {
|
||||||
assert_tracing(setup_server().await).await;
|
assert_tracing(setup_server().await).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn assert_dbwrite(
|
async fn assert_dbwrite(test_server: TestServer<DatabaseServerType>, write: DmlWrite) {
|
||||||
test_server: TestServer<DatabaseServerType<ConnectionManagerImpl>>,
|
|
||||||
write: DmlWrite,
|
|
||||||
) {
|
|
||||||
let (table_name, mutable_batch) = write.tables().next().unwrap();
|
let (table_name, mutable_batch) = write.tables().next().unwrap();
|
||||||
|
|
||||||
let test_db = test_server
|
let test_db = test_server
|
||||||
|
@ -528,10 +513,7 @@ mod tests {
|
||||||
/// Sets up a test database with some data for testing the query endpoint
|
/// Sets up a test database with some data for testing the query endpoint
|
||||||
/// returns a client for communicating with the server, and the server
|
/// returns a client for communicating with the server, and the server
|
||||||
/// endpoint
|
/// endpoint
|
||||||
async fn setup_test_data() -> (
|
async fn setup_test_data() -> (Client, TestServer<DatabaseServerType>) {
|
||||||
Client,
|
|
||||||
TestServer<DatabaseServerType<ConnectionManagerImpl>>,
|
|
||||||
) {
|
|
||||||
let test_server = setup_server().await;
|
let test_server = setup_server().await;
|
||||||
|
|
||||||
let client = Client::new();
|
let client = Client::new();
|
||||||
|
@ -688,7 +670,7 @@ mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// return a test server and the url to contact it for `MyOrg_MyBucket`
|
/// return a test server and the url to contact it for `MyOrg_MyBucket`
|
||||||
async fn setup_server() -> TestServer<DatabaseServerType<ConnectionManagerImpl>> {
|
async fn setup_server() -> TestServer<DatabaseServerType> {
|
||||||
let application = make_application();
|
let application = make_application();
|
||||||
|
|
||||||
let app_server = make_server(Arc::clone(&application));
|
let app_server = make_server(Arc::clone(&application));
|
||||||
|
|
|
@ -5,7 +5,7 @@ use futures::{future::FusedFuture, FutureExt};
|
||||||
use hyper::{Body, Request, Response};
|
use hyper::{Body, Request, Response};
|
||||||
use metric::Registry;
|
use metric::Registry;
|
||||||
use observability_deps::tracing::{error, info};
|
use observability_deps::tracing::{error, info};
|
||||||
use server::{connection::ConnectionManager, ApplicationState, Server};
|
use server::{ApplicationState, Server};
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
use trace::TraceCollector;
|
use trace::TraceCollector;
|
||||||
|
|
||||||
|
@ -25,25 +25,19 @@ pub use self::http::ApplicationError;
|
||||||
use super::common_state::CommonServerState;
|
use super::common_state::CommonServerState;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct DatabaseServerType<M>
|
pub struct DatabaseServerType {
|
||||||
where
|
|
||||||
M: ConnectionManager + std::fmt::Debug + Send + Sync + 'static,
|
|
||||||
{
|
|
||||||
pub application: Arc<ApplicationState>,
|
pub application: Arc<ApplicationState>,
|
||||||
pub server: Arc<Server<M>>,
|
pub server: Arc<Server>,
|
||||||
pub lp_metrics: Arc<LineProtocolMetrics>,
|
pub lp_metrics: Arc<LineProtocolMetrics>,
|
||||||
pub max_request_size: usize,
|
pub max_request_size: usize,
|
||||||
pub serving_readiness: ServingReadiness,
|
pub serving_readiness: ServingReadiness,
|
||||||
shutdown: CancellationToken,
|
shutdown: CancellationToken,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M> DatabaseServerType<M>
|
impl DatabaseServerType {
|
||||||
where
|
|
||||||
M: ConnectionManager + std::fmt::Debug + Send + Sync + 'static,
|
|
||||||
{
|
|
||||||
pub fn new(
|
pub fn new(
|
||||||
application: Arc<ApplicationState>,
|
application: Arc<ApplicationState>,
|
||||||
server: Arc<Server<M>>,
|
server: Arc<Server>,
|
||||||
common_state: &CommonServerState,
|
common_state: &CommonServerState,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let lp_metrics = Arc::new(LineProtocolMetrics::new(
|
let lp_metrics = Arc::new(LineProtocolMetrics::new(
|
||||||
|
@ -62,10 +56,7 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl<M> ServerType for DatabaseServerType<M>
|
impl ServerType for DatabaseServerType {
|
||||||
where
|
|
||||||
M: ConnectionManager + std::fmt::Debug + Send + Sync + 'static,
|
|
||||||
{
|
|
||||||
type RouteError = ApplicationError;
|
type RouteError = ApplicationError;
|
||||||
|
|
||||||
fn metric_registry(&self) -> Arc<Registry> {
|
fn metric_registry(&self) -> Arc<Registry> {
|
||||||
|
@ -132,7 +123,7 @@ mod tests {
|
||||||
use data_types::{database_rules::DatabaseRules, DatabaseName};
|
use data_types::{database_rules::DatabaseRules, DatabaseName};
|
||||||
use futures::pin_mut;
|
use futures::pin_mut;
|
||||||
use influxdb_iox_client::{connection::Connection, flight::PerformQuery};
|
use influxdb_iox_client::{connection::Connection, flight::PerformQuery};
|
||||||
use server::{connection::ConnectionManagerImpl, rules::ProvidedDatabaseRules};
|
use server::rules::ProvidedDatabaseRules;
|
||||||
use std::{convert::TryInto, net::SocketAddr, num::NonZeroU64};
|
use std::{convert::TryInto, net::SocketAddr, num::NonZeroU64};
|
||||||
use structopt::StructOpt;
|
use structopt::StructOpt;
|
||||||
use tokio::task::JoinHandle;
|
use tokio::task::JoinHandle;
|
||||||
|
@ -157,7 +148,7 @@ mod tests {
|
||||||
async fn test_serve(
|
async fn test_serve(
|
||||||
config: RunConfig,
|
config: RunConfig,
|
||||||
application: Arc<ApplicationState>,
|
application: Arc<ApplicationState>,
|
||||||
server: Arc<Server<ConnectionManagerImpl>>,
|
server: Arc<Server>,
|
||||||
) {
|
) {
|
||||||
let grpc_listener = grpc_listener(config.grpc_bind_address.into())
|
let grpc_listener = grpc_listener(config.grpc_bind_address.into())
|
||||||
.await
|
.await
|
||||||
|
@ -348,7 +339,7 @@ mod tests {
|
||||||
collector: &Arc<T>,
|
collector: &Arc<T>,
|
||||||
) -> (
|
) -> (
|
||||||
SocketAddr,
|
SocketAddr,
|
||||||
Arc<Server<ConnectionManagerImpl>>,
|
Arc<Server>,
|
||||||
JoinHandle<crate::influxdb_ioxd::Result<()>>,
|
JoinHandle<crate::influxdb_ioxd::Result<()>>,
|
||||||
) {
|
) {
|
||||||
let config = test_config(Some(23));
|
let config = test_config(Some(23));
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
use std::fmt::Debug;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use data_types::non_empty::NonEmptyString;
|
use data_types::non_empty::NonEmptyString;
|
||||||
|
@ -6,20 +5,17 @@ use data_types::DatabaseName;
|
||||||
use dml::{DmlDelete, DmlMeta};
|
use dml::{DmlDelete, DmlMeta};
|
||||||
use generated_types::google::{FieldViolationExt, FromOptionalField, OptionalField};
|
use generated_types::google::{FieldViolationExt, FromOptionalField, OptionalField};
|
||||||
use generated_types::influxdata::iox::delete::v1::*;
|
use generated_types::influxdata::iox::delete::v1::*;
|
||||||
use server::{connection::ConnectionManager, Server};
|
use server::Server;
|
||||||
use tonic::Response;
|
use tonic::Response;
|
||||||
|
|
||||||
struct DeleteService<M: ConnectionManager> {
|
struct DeleteService {
|
||||||
server: Arc<Server<M>>,
|
server: Arc<Server>,
|
||||||
}
|
}
|
||||||
|
|
||||||
use super::error::{default_db_error_handler, default_server_error_handler};
|
use super::error::{default_db_error_handler, default_server_error_handler};
|
||||||
|
|
||||||
#[tonic::async_trait]
|
#[tonic::async_trait]
|
||||||
impl<M> delete_service_server::DeleteService for DeleteService<M>
|
impl delete_service_server::DeleteService for DeleteService {
|
||||||
where
|
|
||||||
M: ConnectionManager + Send + Sync + Debug + 'static,
|
|
||||||
{
|
|
||||||
async fn delete(
|
async fn delete(
|
||||||
&self,
|
&self,
|
||||||
request: tonic::Request<DeleteRequest>,
|
request: tonic::Request<DeleteRequest>,
|
||||||
|
@ -50,11 +46,8 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn make_server<M>(
|
pub fn make_server(
|
||||||
server: Arc<Server<M>>,
|
server: Arc<Server>,
|
||||||
) -> delete_service_server::DeleteServiceServer<impl delete_service_server::DeleteService>
|
) -> delete_service_server::DeleteServiceServer<impl delete_service_server::DeleteService> {
|
||||||
where
|
|
||||||
M: ConnectionManager + Send + Sync + Debug + 'static,
|
|
||||||
{
|
|
||||||
delete_service_server::DeleteServiceServer::new(DeleteService { server })
|
delete_service_server::DeleteServiceServer::new(DeleteService { server })
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,12 +3,12 @@ use generated_types::{
|
||||||
google::{FieldViolation, NotFound},
|
google::{FieldViolation, NotFound},
|
||||||
influxdata::iox::deployment::v1::*,
|
influxdata::iox::deployment::v1::*,
|
||||||
};
|
};
|
||||||
use server::{connection::ConnectionManager, Error, Server};
|
use server::{Error, Server};
|
||||||
use std::{convert::TryFrom, fmt::Debug, sync::Arc};
|
use std::{convert::TryFrom, sync::Arc};
|
||||||
use tonic::{Request, Response, Status};
|
use tonic::{Request, Response, Status};
|
||||||
|
|
||||||
struct DeploymentService<M: ConnectionManager> {
|
struct DeploymentService {
|
||||||
server: Arc<Server<M>>,
|
server: Arc<Server>,
|
||||||
serving_readiness: ServingReadiness,
|
serving_readiness: ServingReadiness,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -16,10 +16,7 @@ use super::error::default_server_error_handler;
|
||||||
use crate::influxdb_ioxd::serving_readiness::ServingReadiness;
|
use crate::influxdb_ioxd::serving_readiness::ServingReadiness;
|
||||||
|
|
||||||
#[tonic::async_trait]
|
#[tonic::async_trait]
|
||||||
impl<M> deployment_service_server::DeploymentService for DeploymentService<M>
|
impl deployment_service_server::DeploymentService for DeploymentService {
|
||||||
where
|
|
||||||
M: ConnectionManager + Send + Sync + Debug + 'static,
|
|
||||||
{
|
|
||||||
async fn get_server_id(
|
async fn get_server_id(
|
||||||
&self,
|
&self,
|
||||||
_: Request<GetServerIdRequest>,
|
_: Request<GetServerIdRequest>,
|
||||||
|
@ -69,15 +66,12 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn make_server<M>(
|
pub fn make_server(
|
||||||
server: Arc<Server<M>>,
|
server: Arc<Server>,
|
||||||
serving_readiness: ServingReadiness,
|
serving_readiness: ServingReadiness,
|
||||||
) -> deployment_service_server::DeploymentServiceServer<
|
) -> deployment_service_server::DeploymentServiceServer<
|
||||||
impl deployment_service_server::DeploymentService,
|
impl deployment_service_server::DeploymentService,
|
||||||
>
|
> {
|
||||||
where
|
|
||||||
M: ConnectionManager + Send + Sync + Debug + 'static,
|
|
||||||
{
|
|
||||||
deployment_service_server::DeploymentServiceServer::new(DeploymentService {
|
deployment_service_server::DeploymentServiceServer::new(DeploymentService {
|
||||||
server,
|
server,
|
||||||
serving_readiness,
|
serving_readiness,
|
||||||
|
|
|
@ -41,39 +41,10 @@ pub fn default_server_error_handler(error: server::Error) -> tonic::Status {
|
||||||
description: source.to_string(),
|
description: source.to_string(),
|
||||||
}
|
}
|
||||||
.into(),
|
.into(),
|
||||||
Error::HardLimitReached {} => QuotaFailure {
|
|
||||||
subject: "influxdata.com/iox/buffer".to_string(),
|
|
||||||
description: "hard buffer limit reached".to_string(),
|
|
||||||
}
|
|
||||||
.into(),
|
|
||||||
source @ Error::WritingOnlyAllowedThroughWriteBuffer { .. }
|
|
||||||
| source @ Error::ShardWrite { .. } => {
|
|
||||||
tonic::Status::failed_precondition(source.to_string())
|
|
||||||
}
|
|
||||||
Error::NoRemoteConfigured { node_group } => NotFound {
|
|
||||||
resource_type: "remote".to_string(),
|
|
||||||
resource_name: format!("{:?}", node_group),
|
|
||||||
..Default::default()
|
|
||||||
}
|
|
||||||
.into(),
|
|
||||||
Error::RemoteError { source } => tonic::Status::unavailable(source.to_string()),
|
|
||||||
Error::WipePreservedCatalog { source } => default_database_error_handler(source),
|
Error::WipePreservedCatalog { source } => default_database_error_handler(source),
|
||||||
Error::DeleteExpression {
|
|
||||||
start_time,
|
|
||||||
stop_time,
|
|
||||||
predicate,
|
|
||||||
} => FieldViolation {
|
|
||||||
field: format!(
|
|
||||||
"time range: [{}, {}], predicate: {}",
|
|
||||||
start_time, stop_time, predicate
|
|
||||||
),
|
|
||||||
description: "Invalid time range or predicate".to_string(),
|
|
||||||
}
|
|
||||||
.into(),
|
|
||||||
Error::DatabaseInit { source } => {
|
Error::DatabaseInit { source } => {
|
||||||
tonic::Status::invalid_argument(format!("Cannot initialize database: {}", source))
|
tonic::Status::invalid_argument(format!("Cannot initialize database: {}", source))
|
||||||
}
|
}
|
||||||
Error::StoreWriteErrors { .. } => tonic::Status::invalid_argument(error.to_string()),
|
|
||||||
Error::DatabaseAlreadyExists { .. } | Error::DatabaseAlreadyOwnedByThisServer { .. } => {
|
Error::DatabaseAlreadyExists { .. } | Error::DatabaseAlreadyOwnedByThisServer { .. } => {
|
||||||
tonic::Status::already_exists(error.to_string())
|
tonic::Status::already_exists(error.to_string())
|
||||||
}
|
}
|
||||||
|
@ -185,9 +156,42 @@ pub fn default_db_error_handler(error: server::db::Error) -> tonic::Status {
|
||||||
}
|
}
|
||||||
.into(),
|
.into(),
|
||||||
Error::CatalogError { source } => default_catalog_error_handler(source),
|
Error::CatalogError { source } => default_catalog_error_handler(source),
|
||||||
|
Error::HardLimitReached {} => QuotaFailure {
|
||||||
|
subject: "influxdata.com/iox/buffer".to_string(),
|
||||||
|
description: "hard buffer limit reached".to_string(),
|
||||||
|
}
|
||||||
|
.into(),
|
||||||
|
Error::StoreWriteErrors { .. } => tonic::Status::invalid_argument(error.to_string()),
|
||||||
error => {
|
error => {
|
||||||
error!(?error, "Unexpected error");
|
error!(?error, "Unexpected error");
|
||||||
InternalError {}.into()
|
InternalError {}.into()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// map common [`database::WriteError`](server::database::WriteError) errors to the appropriate tonic Status
|
||||||
|
pub fn default_database_write_error_handler(error: server::database::WriteError) -> tonic::Status {
|
||||||
|
use server::database::WriteError;
|
||||||
|
|
||||||
|
match error {
|
||||||
|
WriteError::HardLimitReached {} => QuotaFailure {
|
||||||
|
subject: "influxdata.com/iox/buffer".to_string(),
|
||||||
|
description: "hard buffer limit reached".to_string(),
|
||||||
|
}
|
||||||
|
.into(),
|
||||||
|
WriteError::DbError { source } => default_db_error_handler(source),
|
||||||
|
source @ WriteError::WritingOnlyAllowedThroughWriteBuffer => {
|
||||||
|
tonic::Status::failed_precondition(source.to_string())
|
||||||
|
}
|
||||||
|
WriteError::NotInitialized { state } => {
|
||||||
|
tonic::Status::unavailable(format!("Database is not yet initialized: {}", state))
|
||||||
|
}
|
||||||
|
error @ WriteError::StoreWriteErrors { .. } => {
|
||||||
|
tonic::Status::invalid_argument(error.to_string())
|
||||||
|
}
|
||||||
|
error => {
|
||||||
|
error!(?error, "Unexpected write error");
|
||||||
|
InternalError {}.into()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -25,7 +25,7 @@ use tonic::{Request, Response, Streaming};
|
||||||
use data_types::{DatabaseName, DatabaseNameError};
|
use data_types::{DatabaseName, DatabaseNameError};
|
||||||
use observability_deps::tracing::{info, warn};
|
use observability_deps::tracing::{info, warn};
|
||||||
use query::exec::{ExecutionContextProvider, IOxExecutionContext};
|
use query::exec::{ExecutionContextProvider, IOxExecutionContext};
|
||||||
use server::{connection::ConnectionManager, Server};
|
use server::Server;
|
||||||
|
|
||||||
use super::error::default_server_error_handler;
|
use super::error::default_server_error_handler;
|
||||||
use crate::influxdb_ioxd::planner::Planner;
|
use crate::influxdb_ioxd::planner::Planner;
|
||||||
|
@ -125,22 +125,16 @@ struct ReadInfo {
|
||||||
|
|
||||||
/// Concrete implementation of the gRPC Arrow Flight Service API
|
/// Concrete implementation of the gRPC Arrow Flight Service API
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct FlightService<M: ConnectionManager> {
|
struct FlightService {
|
||||||
server: Arc<Server<M>>,
|
server: Arc<Server>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn make_server<M>(server: Arc<Server<M>>) -> FlightServer<impl Flight>
|
pub fn make_server(server: Arc<Server>) -> FlightServer<impl Flight> {
|
||||||
where
|
|
||||||
M: ConnectionManager + Send + Sync + Debug + 'static,
|
|
||||||
{
|
|
||||||
FlightServer::new(FlightService { server })
|
FlightServer::new(FlightService { server })
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tonic::async_trait]
|
#[tonic::async_trait]
|
||||||
impl<M: ConnectionManager> Flight for FlightService<M>
|
impl Flight for FlightService {
|
||||||
where
|
|
||||||
M: ConnectionManager + Send + Sync + Debug + 'static,
|
|
||||||
{
|
|
||||||
type HandshakeStream = TonicStream<HandshakeResponse>;
|
type HandshakeStream = TonicStream<HandshakeResponse>;
|
||||||
type ListFlightsStream = TonicStream<FlightInfo>;
|
type ListFlightsStream = TonicStream<FlightInfo>;
|
||||||
type DoGetStream = TonicStream<FlightData>;
|
type DoGetStream = TonicStream<FlightData>;
|
||||||
|
|
|
@ -4,16 +4,14 @@ use generated_types::{
|
||||||
influxdata::iox::management::v1::{Error as ProtobufError, *},
|
influxdata::iox::management::v1::{Error as ProtobufError, *},
|
||||||
};
|
};
|
||||||
use query::QueryDatabase;
|
use query::QueryDatabase;
|
||||||
use server::{
|
use server::{rules::ProvidedDatabaseRules, ApplicationState, Error, Server};
|
||||||
connection::ConnectionManager, rules::ProvidedDatabaseRules, ApplicationState, Error, Server,
|
use std::{convert::TryFrom, sync::Arc};
|
||||||
};
|
|
||||||
use std::{convert::TryFrom, fmt::Debug, sync::Arc};
|
|
||||||
use tonic::{Request, Response, Status};
|
use tonic::{Request, Response, Status};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
struct ManagementService<M: ConnectionManager> {
|
struct ManagementService {
|
||||||
application: Arc<ApplicationState>,
|
application: Arc<ApplicationState>,
|
||||||
server: Arc<Server<M>>,
|
server: Arc<Server>,
|
||||||
}
|
}
|
||||||
|
|
||||||
use super::error::{
|
use super::error::{
|
||||||
|
@ -21,10 +19,7 @@ use super::error::{
|
||||||
};
|
};
|
||||||
|
|
||||||
#[tonic::async_trait]
|
#[tonic::async_trait]
|
||||||
impl<M> management_service_server::ManagementService for ManagementService<M>
|
impl management_service_server::ManagementService for ManagementService {
|
||||||
where
|
|
||||||
M: ConnectionManager + Send + Sync + Debug + 'static,
|
|
||||||
{
|
|
||||||
async fn list_databases(
|
async fn list_databases(
|
||||||
&self,
|
&self,
|
||||||
request: Request<ListDatabasesRequest>,
|
request: Request<ListDatabasesRequest>,
|
||||||
|
@ -523,15 +518,12 @@ fn format_rules(provided_rules: Arc<ProvidedDatabaseRules>, omit_defaults: bool)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn make_server<M>(
|
pub fn make_server(
|
||||||
application: Arc<ApplicationState>,
|
application: Arc<ApplicationState>,
|
||||||
server: Arc<Server<M>>,
|
server: Arc<Server>,
|
||||||
) -> management_service_server::ManagementServiceServer<
|
) -> management_service_server::ManagementServiceServer<
|
||||||
impl management_service_server::ManagementService,
|
impl management_service_server::ManagementService,
|
||||||
>
|
> {
|
||||||
where
|
|
||||||
M: ConnectionManager + Send + Sync + Debug + 'static,
|
|
||||||
{
|
|
||||||
management_service_server::ManagementServiceServer::new(ManagementService {
|
management_service_server::ManagementServiceServer::new(ManagementService {
|
||||||
application,
|
application,
|
||||||
server,
|
server,
|
||||||
|
|
|
@ -1,7 +1,5 @@
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use server::connection::ConnectionManager;
|
|
||||||
|
|
||||||
use crate::influxdb_ioxd::{
|
use crate::influxdb_ioxd::{
|
||||||
rpc::{add_gated_service, add_service, serve_builder, setup_builder, RpcBuilderInput},
|
rpc::{add_gated_service, add_service, serve_builder, setup_builder, RpcBuilderInput},
|
||||||
server_type::{database::DatabaseServerType, RpcError},
|
server_type::{database::DatabaseServerType, RpcError},
|
||||||
|
@ -16,13 +14,10 @@ mod operations;
|
||||||
mod storage;
|
mod storage;
|
||||||
mod write_pb;
|
mod write_pb;
|
||||||
|
|
||||||
pub async fn server_grpc<M>(
|
pub async fn server_grpc(
|
||||||
server_type: Arc<DatabaseServerType<M>>,
|
server_type: Arc<DatabaseServerType>,
|
||||||
builder_input: RpcBuilderInput,
|
builder_input: RpcBuilderInput,
|
||||||
) -> Result<(), RpcError>
|
) -> Result<(), RpcError> {
|
||||||
where
|
|
||||||
M: ConnectionManager + std::fmt::Debug + Send + Sync + 'static,
|
|
||||||
{
|
|
||||||
let builder = setup_builder!(builder_input, server_type);
|
let builder = setup_builder!(builder_input, server_type);
|
||||||
|
|
||||||
add_gated_service!(
|
add_gated_service!(
|
||||||
|
|
|
@ -1,22 +1,18 @@
|
||||||
use data_types::DatabaseName;
|
use data_types::DatabaseName;
|
||||||
use dml::{DmlMeta, DmlWrite};
|
use dml::{DmlMeta, DmlOperation, DmlWrite};
|
||||||
use generated_types::google::{FieldViolation, FieldViolationExt};
|
use generated_types::google::{FieldViolation, FieldViolationExt};
|
||||||
use generated_types::influxdata::pbdata::v1::*;
|
use generated_types::influxdata::pbdata::v1::*;
|
||||||
use server::{connection::ConnectionManager, Server};
|
use server::Server;
|
||||||
use std::fmt::Debug;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use super::error::default_server_error_handler;
|
use super::error::{default_database_write_error_handler, default_server_error_handler};
|
||||||
|
|
||||||
struct PBWriteService<M: ConnectionManager> {
|
struct PBWriteService {
|
||||||
server: Arc<Server<M>>,
|
server: Arc<Server>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tonic::async_trait]
|
#[tonic::async_trait]
|
||||||
impl<M> write_service_server::WriteService for PBWriteService<M>
|
impl write_service_server::WriteService for PBWriteService {
|
||||||
where
|
|
||||||
M: ConnectionManager + Send + Sync + Debug + 'static,
|
|
||||||
{
|
|
||||||
async fn write(
|
async fn write(
|
||||||
&self,
|
&self,
|
||||||
request: tonic::Request<WriteRequest>,
|
request: tonic::Request<WriteRequest>,
|
||||||
|
@ -27,9 +23,6 @@ where
|
||||||
.database_batch
|
.database_batch
|
||||||
.ok_or_else(|| FieldViolation::required("database_batch"))?;
|
.ok_or_else(|| FieldViolation::required("database_batch"))?;
|
||||||
|
|
||||||
let db_name = DatabaseName::new(&database_batch.database_name)
|
|
||||||
.scope("database_batch.database_name")?;
|
|
||||||
|
|
||||||
let tables =
|
let tables =
|
||||||
mutable_batch_pb::decode::decode_database_batch(&database_batch).map_err(|e| {
|
mutable_batch_pb::decode::decode_database_batch(&database_batch).map_err(|e| {
|
||||||
FieldViolation {
|
FieldViolation {
|
||||||
|
@ -40,20 +33,24 @@ where
|
||||||
|
|
||||||
let write = DmlWrite::new(tables, DmlMeta::unsequenced(span_ctx));
|
let write = DmlWrite::new(tables, DmlMeta::unsequenced(span_ctx));
|
||||||
|
|
||||||
self.server
|
let db_name = DatabaseName::new(&database_batch.database_name)
|
||||||
.write(&db_name, write)
|
.scope("database_batch.database_name")?;
|
||||||
.await
|
let database = self
|
||||||
|
.server
|
||||||
|
.active_database(&db_name)
|
||||||
.map_err(default_server_error_handler)?;
|
.map_err(default_server_error_handler)?;
|
||||||
|
|
||||||
|
database
|
||||||
|
.route_operation(&DmlOperation::Write(write))
|
||||||
|
.await
|
||||||
|
.map_err(default_database_write_error_handler)?;
|
||||||
|
|
||||||
Ok(tonic::Response::new(WriteResponse {}))
|
Ok(tonic::Response::new(WriteResponse {}))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn make_server<M>(
|
pub fn make_server(
|
||||||
server: Arc<Server<M>>,
|
server: Arc<Server>,
|
||||||
) -> write_service_server::WriteServiceServer<impl write_service_server::WriteService>
|
) -> write_service_server::WriteServiceServer<impl write_service_server::WriteService> {
|
||||||
where
|
|
||||||
M: ConnectionManager + Send + Sync + Debug + 'static,
|
|
||||||
{
|
|
||||||
write_service_server::WriteServiceServer::new(PBWriteService { server })
|
write_service_server::WriteServiceServer::new(PBWriteService { server })
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,9 +2,7 @@ use std::sync::Arc;
|
||||||
|
|
||||||
use object_store::ObjectStore;
|
use object_store::ObjectStore;
|
||||||
use observability_deps::tracing::warn;
|
use observability_deps::tracing::warn;
|
||||||
use server::{
|
use server::{ApplicationState, Server, ServerConfig};
|
||||||
connection::ConnectionManagerImpl, ApplicationState, RemoteTemplate, Server, ServerConfig,
|
|
||||||
};
|
|
||||||
use snafu::{ResultExt, Snafu};
|
use snafu::{ResultExt, Snafu};
|
||||||
use trace::TraceCollector;
|
use trace::TraceCollector;
|
||||||
|
|
||||||
|
@ -47,18 +45,13 @@ pub async fn make_application(
|
||||||
)))
|
)))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn make_server(
|
pub fn make_server(application: Arc<ApplicationState>, config: &Config) -> Arc<Server> {
|
||||||
application: Arc<ApplicationState>,
|
|
||||||
config: &Config,
|
|
||||||
) -> Arc<Server<ConnectionManagerImpl>> {
|
|
||||||
let server_config = ServerConfig {
|
let server_config = ServerConfig {
|
||||||
remote_template: config.remote_template.clone().map(RemoteTemplate::new),
|
|
||||||
wipe_catalog_on_error: config.wipe_catalog_on_error.into(),
|
wipe_catalog_on_error: config.wipe_catalog_on_error.into(),
|
||||||
skip_replay_and_seek_instead: config.skip_replay_and_seek_instead.into(),
|
skip_replay_and_seek_instead: config.skip_replay_and_seek_instead.into(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let connection_manager = ConnectionManagerImpl::new();
|
let app_server = Arc::new(Server::new(application, server_config));
|
||||||
let app_server = Arc::new(Server::new(connection_manager, application, server_config));
|
|
||||||
|
|
||||||
// if this ID isn't set the server won't be usable until this is set via an API
|
// if this ID isn't set the server won't be usable until this is set via an API
|
||||||
// call
|
// call
|
||||||
|
|
|
@ -369,6 +369,13 @@ impl TestConfig {
|
||||||
server_type,
|
server_type,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// change server type
|
||||||
|
pub fn with_server_type(mut self, server_type: ServerType) -> Self {
|
||||||
|
self.server_type = server_type;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
// add a name=value environment variable when starting the server
|
// add a name=value environment variable when starting the server
|
||||||
pub fn with_env(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
|
pub fn with_env(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
|
||||||
self.env.push((name.into(), value.into()));
|
self.env.push((name.into(), value.into()));
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::iter::once;
|
|
||||||
use std::num::NonZeroU32;
|
use std::num::NonZeroU32;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
@ -25,13 +24,9 @@ use tempfile::TempDir;
|
||||||
use test_helpers::assert_contains;
|
use test_helpers::assert_contains;
|
||||||
|
|
||||||
use data_types::{names::org_and_bucket_to_database, DatabaseName};
|
use data_types::{names::org_and_bucket_to_database, DatabaseName};
|
||||||
use database_rules::RoutingRules;
|
|
||||||
use generated_types::google::protobuf::Empty;
|
use generated_types::google::protobuf::Empty;
|
||||||
use generated_types::{
|
use generated_types::{
|
||||||
influxdata::iox::{
|
influxdata::iox::{management::v1::*, write_buffer::v1::WriteBufferCreationConfig},
|
||||||
management::v1::{self as management, *},
|
|
||||||
write_buffer::v1::WriteBufferCreationConfig,
|
|
||||||
},
|
|
||||||
ReadSource, TimestampRange,
|
ReadSource, TimestampRange,
|
||||||
};
|
};
|
||||||
use influxdb_iox_client::{connection::Connection, flight::PerformQuery};
|
use influxdb_iox_client::{connection::Connection, flight::PerformQuery};
|
||||||
|
@ -312,7 +307,6 @@ pub struct DatabaseBuilder {
|
||||||
partition_template: PartitionTemplate,
|
partition_template: PartitionTemplate,
|
||||||
lifecycle_rules: LifecycleRules,
|
lifecycle_rules: LifecycleRules,
|
||||||
write_buffer: Option<WriteBufferConnection>,
|
write_buffer: Option<WriteBufferConnection>,
|
||||||
table_whitelist: Option<Vec<String>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl DatabaseBuilder {
|
impl DatabaseBuilder {
|
||||||
|
@ -331,7 +325,6 @@ impl DatabaseBuilder {
|
||||||
..Default::default()
|
..Default::default()
|
||||||
},
|
},
|
||||||
write_buffer: None,
|
write_buffer: None,
|
||||||
table_whitelist: None,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -375,11 +368,6 @@ impl DatabaseBuilder {
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn write_buffer_table_whitelist(mut self, whitelist: Vec<String>) -> Self {
|
|
||||||
self.table_whitelist = Some(whitelist);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn worker_backoff_millis(mut self, millis: u64) -> Self {
|
pub fn worker_backoff_millis(mut self, millis: u64) -> Self {
|
||||||
self.lifecycle_rules.worker_backoff_millis = millis;
|
self.lifecycle_rules.worker_backoff_millis = millis;
|
||||||
self
|
self
|
||||||
|
@ -389,58 +377,13 @@ impl DatabaseBuilder {
|
||||||
pub async fn try_build(self, channel: Connection) -> Result<(), CreateDatabaseError> {
|
pub async fn try_build(self, channel: Connection) -> Result<(), CreateDatabaseError> {
|
||||||
let mut management_client = influxdb_iox_client::management::Client::new(channel);
|
let mut management_client = influxdb_iox_client::management::Client::new(channel);
|
||||||
|
|
||||||
let routing_rules = if self.write_buffer.is_some() {
|
|
||||||
const KAFKA_PRODUCER_SINK_ID: u32 = 0;
|
|
||||||
let kafka_producer_sink = management::Sink {
|
|
||||||
sink: Some(management::sink::Sink::Kafka(KafkaProducer {})),
|
|
||||||
};
|
|
||||||
const DEV_NULL_SINK_ID: u32 = 1;
|
|
||||||
let dev_null_sink = management::Sink {
|
|
||||||
sink: Some(management::sink::Sink::DevNull(DevNull {})),
|
|
||||||
};
|
|
||||||
|
|
||||||
let to_shard = |shard: u32| {
|
|
||||||
Box::new(move |i: String| MatcherToShard {
|
|
||||||
matcher: Some(Matcher {
|
|
||||||
table_name_regex: format!("^{}$", i),
|
|
||||||
}),
|
|
||||||
shard,
|
|
||||||
})
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(table_whitelist) = self.table_whitelist {
|
|
||||||
Some(RoutingRules::ShardConfig(ShardConfig {
|
|
||||||
specific_targets: table_whitelist
|
|
||||||
.into_iter()
|
|
||||||
.map(to_shard(KAFKA_PRODUCER_SINK_ID))
|
|
||||||
.chain(once(to_shard(DEV_NULL_SINK_ID)(".*".to_string())))
|
|
||||||
.collect(),
|
|
||||||
shards: vec![
|
|
||||||
(KAFKA_PRODUCER_SINK_ID, kafka_producer_sink),
|
|
||||||
(DEV_NULL_SINK_ID, dev_null_sink),
|
|
||||||
]
|
|
||||||
.into_iter()
|
|
||||||
.collect(),
|
|
||||||
..Default::default()
|
|
||||||
}))
|
|
||||||
} else {
|
|
||||||
Some(RoutingRules::RoutingConfig(RoutingConfig {
|
|
||||||
sink: Some(management::Sink {
|
|
||||||
sink: Some(management::sink::Sink::Kafka(KafkaProducer {})),
|
|
||||||
}),
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
|
||||||
|
|
||||||
management_client
|
management_client
|
||||||
.create_database(DatabaseRules {
|
.create_database(DatabaseRules {
|
||||||
name: self.name,
|
name: self.name,
|
||||||
partition_template: Some(self.partition_template),
|
partition_template: Some(self.partition_template),
|
||||||
lifecycle_rules: Some(self.lifecycle_rules),
|
lifecycle_rules: Some(self.lifecycle_rules),
|
||||||
worker_cleanup_avg_sleep: None,
|
worker_cleanup_avg_sleep: None,
|
||||||
routing_rules,
|
routing_rules: None,
|
||||||
write_buffer_connection: self.write_buffer,
|
write_buffer_connection: self.write_buffer,
|
||||||
})
|
})
|
||||||
.await?;
|
.await?;
|
||||||
|
@ -769,27 +712,25 @@ pub async fn fixture_replay_broken(db_name: &str, write_buffer_path: &Path) -> S
|
||||||
fixture
|
fixture
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn create_router_to_write_buffer(
|
pub fn wildcard_router_config(
|
||||||
fixture: &ServerFixture,
|
|
||||||
db_name: &str,
|
db_name: &str,
|
||||||
) -> (TempDir, Box<dyn WriteBufferReading>) {
|
write_buffer_path: &Path,
|
||||||
|
) -> influxdb_iox_client::router::generated_types::Router {
|
||||||
use influxdb_iox_client::router::generated_types::{
|
use influxdb_iox_client::router::generated_types::{
|
||||||
write_sink::Sink, Matcher, MatcherToShard, Router, ShardConfig, WriteSink, WriteSinkSet,
|
write_sink::Sink, Matcher, MatcherToShard, Router, ShardConfig, WriteSink, WriteSinkSet,
|
||||||
};
|
};
|
||||||
|
|
||||||
let write_buffer_dir = TempDir::new().unwrap();
|
|
||||||
|
|
||||||
let write_buffer_connection = WriteBufferConnection {
|
let write_buffer_connection = WriteBufferConnection {
|
||||||
direction: write_buffer_connection::Direction::Write.into(),
|
direction: write_buffer_connection::Direction::Write.into(),
|
||||||
r#type: "file".to_string(),
|
r#type: "file".to_string(),
|
||||||
connection: write_buffer_dir.path().display().to_string(),
|
connection: write_buffer_path.display().to_string(),
|
||||||
creation_config: Some(WriteBufferCreationConfig {
|
creation_config: Some(WriteBufferCreationConfig {
|
||||||
n_sequencers: 1,
|
n_sequencers: 1,
|
||||||
..Default::default()
|
..Default::default()
|
||||||
}),
|
}),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let router_cfg = Router {
|
Router {
|
||||||
name: db_name.to_string(),
|
name: db_name.to_string(),
|
||||||
write_sharder: Some(ShardConfig {
|
write_sharder: Some(ShardConfig {
|
||||||
specific_targets: vec![MatcherToShard {
|
specific_targets: vec![MatcherToShard {
|
||||||
|
@ -810,7 +751,16 @@ pub async fn create_router_to_write_buffer(
|
||||||
},
|
},
|
||||||
)]),
|
)]),
|
||||||
query_sinks: Default::default(),
|
query_sinks: Default::default(),
|
||||||
};
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn create_router_to_write_buffer(
|
||||||
|
fixture: &ServerFixture,
|
||||||
|
db_name: &str,
|
||||||
|
) -> (TempDir, Box<dyn WriteBufferReading>) {
|
||||||
|
let write_buffer_dir = TempDir::new().unwrap();
|
||||||
|
|
||||||
|
let router_cfg = wildcard_router_config(db_name, write_buffer_dir.path());
|
||||||
fixture
|
fixture
|
||||||
.router_client()
|
.router_client()
|
||||||
.update_router(router_cfg)
|
.update_router(router_cfg)
|
||||||
|
|
|
@ -3,11 +3,9 @@ use crate::{
|
||||||
server_fixture::{ServerFixture, ServerType, TestConfig},
|
server_fixture::{ServerFixture, ServerType, TestConfig},
|
||||||
udp_listener::UdpCapture,
|
udp_listener::UdpCapture,
|
||||||
},
|
},
|
||||||
end_to_end_cases::scenario::{rand_name, DatabaseBuilder},
|
end_to_end_cases::scenario::{rand_name, wildcard_router_config, DatabaseBuilder},
|
||||||
};
|
};
|
||||||
use arrow_util::assert_batches_sorted_eq;
|
use arrow_util::assert_batches_sorted_eq;
|
||||||
use dml::DmlOperation;
|
|
||||||
use futures::StreamExt;
|
|
||||||
use generated_types::influxdata::iox::write_buffer::v1::{
|
use generated_types::influxdata::iox::write_buffer::v1::{
|
||||||
write_buffer_connection::Direction as WriteBufferDirection, WriteBufferConnection,
|
write_buffer_connection::Direction as WriteBufferDirection, WriteBufferConnection,
|
||||||
};
|
};
|
||||||
|
@ -19,112 +17,7 @@ use std::{num::NonZeroU32, sync::Arc};
|
||||||
use tempfile::TempDir;
|
use tempfile::TempDir;
|
||||||
use test_helpers::assert_contains;
|
use test_helpers::assert_contains;
|
||||||
use time::SystemProvider;
|
use time::SystemProvider;
|
||||||
use write_buffer::{
|
use write_buffer::{core::WriteBufferWriting, file::FileBufferProducer};
|
||||||
core::{WriteBufferReading, WriteBufferWriting},
|
|
||||||
file::{FileBufferConsumer, FileBufferProducer},
|
|
||||||
};
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn writes_go_to_write_buffer() {
|
|
||||||
let write_buffer_dir = TempDir::new().unwrap();
|
|
||||||
|
|
||||||
// set up a database with a write buffer pointing at write buffer
|
|
||||||
let server = ServerFixture::create_shared(ServerType::Database).await;
|
|
||||||
let db_name = rand_name();
|
|
||||||
let write_buffer_connection = WriteBufferConnection {
|
|
||||||
direction: WriteBufferDirection::Write.into(),
|
|
||||||
r#type: "file".to_string(),
|
|
||||||
connection: write_buffer_dir.path().display().to_string(),
|
|
||||||
creation_config: Some(WriteBufferCreationConfig {
|
|
||||||
n_sequencers: 1,
|
|
||||||
..Default::default()
|
|
||||||
}),
|
|
||||||
..Default::default()
|
|
||||||
};
|
|
||||||
|
|
||||||
DatabaseBuilder::new(db_name.clone())
|
|
||||||
.write_buffer(write_buffer_connection.clone())
|
|
||||||
.build(server.grpc_channel())
|
|
||||||
.await;
|
|
||||||
|
|
||||||
// write some points
|
|
||||||
let mut write_client = server.write_client();
|
|
||||||
|
|
||||||
let lp_lines = [
|
|
||||||
"cpu,region=west user=23.2 100",
|
|
||||||
"cpu,region=west user=21.0 150",
|
|
||||||
"disk,region=east bytes=99i 200",
|
|
||||||
];
|
|
||||||
|
|
||||||
let num_lines_written = write_client
|
|
||||||
.write_lp(&db_name, lp_lines.join("\n"), 0)
|
|
||||||
.await
|
|
||||||
.expect("cannot write");
|
|
||||||
assert_eq!(num_lines_written, 3);
|
|
||||||
|
|
||||||
// check the data is in write buffer
|
|
||||||
let mut consumer =
|
|
||||||
FileBufferConsumer::new(write_buffer_dir.path(), &db_name, Default::default(), None)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let (_, mut stream) = consumer.streams().into_iter().next().unwrap();
|
|
||||||
match stream.stream.next().await.unwrap().unwrap() {
|
|
||||||
DmlOperation::Write(write) => assert_eq!(write.table_count(), 2),
|
|
||||||
a => panic!("unexpected operation: {:?}", a),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn writes_go_to_write_buffer_whitelist() {
|
|
||||||
let write_buffer_dir = TempDir::new().unwrap();
|
|
||||||
|
|
||||||
// set up a database with a write buffer pointing at write buffer
|
|
||||||
let server = ServerFixture::create_shared(ServerType::Database).await;
|
|
||||||
let db_name = rand_name();
|
|
||||||
let write_buffer_connection = WriteBufferConnection {
|
|
||||||
direction: WriteBufferDirection::Write.into(),
|
|
||||||
r#type: "file".to_string(),
|
|
||||||
connection: write_buffer_dir.path().display().to_string(),
|
|
||||||
creation_config: Some(WriteBufferCreationConfig {
|
|
||||||
n_sequencers: 1,
|
|
||||||
..Default::default()
|
|
||||||
}),
|
|
||||||
..Default::default()
|
|
||||||
};
|
|
||||||
|
|
||||||
DatabaseBuilder::new(db_name.clone())
|
|
||||||
.write_buffer(write_buffer_connection)
|
|
||||||
.write_buffer_table_whitelist(vec!["cpu".to_string()])
|
|
||||||
.build(server.grpc_channel())
|
|
||||||
.await;
|
|
||||||
|
|
||||||
// write some points
|
|
||||||
let mut write_client = server.write_client();
|
|
||||||
|
|
||||||
let lp_lines = [
|
|
||||||
"cpu,region=west user=23.2 100",
|
|
||||||
"cpu,region=west user=21.0 150",
|
|
||||||
"disk,region=east bytes=99i 200",
|
|
||||||
"mem,region=east bytes=123 250",
|
|
||||||
];
|
|
||||||
|
|
||||||
let num_lines_written = write_client
|
|
||||||
.write_lp(&db_name, lp_lines.join("\n"), 0)
|
|
||||||
.await
|
|
||||||
.expect("cannot write");
|
|
||||||
assert_eq!(num_lines_written, 4);
|
|
||||||
|
|
||||||
// check the data is in write buffer
|
|
||||||
let mut consumer =
|
|
||||||
FileBufferConsumer::new(write_buffer_dir.path(), &db_name, Default::default(), None)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let (_, mut stream) = consumer.streams().into_iter().next().unwrap();
|
|
||||||
match stream.stream.next().await.unwrap().unwrap() {
|
|
||||||
DmlOperation::Write(write) => assert_eq!(write.table_count(), 1),
|
|
||||||
a => panic!("unexpected operation: {:?}", a),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn reads_come_from_write_buffer() {
|
async fn reads_come_from_write_buffer() {
|
||||||
|
@ -258,13 +151,7 @@ async fn cant_write_to_db_reading_from_write_buffer() {
|
||||||
.await
|
.await
|
||||||
.expect_err("expected write to fail");
|
.expect_err("expected write to fail");
|
||||||
|
|
||||||
assert_contains!(
|
assert_contains!(err.to_string(), "only allowed through write buffer");
|
||||||
err.to_string(),
|
|
||||||
format!(
|
|
||||||
r#"Cannot write to database {}, it's configured to only read from the write buffer"#,
|
|
||||||
db_name
|
|
||||||
)
|
|
||||||
);
|
|
||||||
assert!(matches!(dbg!(err), WriteError::ServerError(_)));
|
assert!(matches!(dbg!(err), WriteError::ServerError(_)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -300,7 +187,7 @@ pub async fn test_cross_write_buffer_tracing() {
|
||||||
|
|
||||||
// setup tracing
|
// setup tracing
|
||||||
let udp_capture = UdpCapture::new().await;
|
let udp_capture = UdpCapture::new().await;
|
||||||
let test_config = TestConfig::new(ServerType::Database)
|
let test_config = TestConfig::new(ServerType::Router)
|
||||||
.with_env("TRACES_EXPORTER", "jaeger")
|
.with_env("TRACES_EXPORTER", "jaeger")
|
||||||
.with_env("TRACES_EXPORTER_JAEGER_AGENT_HOST", udp_capture.ip())
|
.with_env("TRACES_EXPORTER_JAEGER_AGENT_HOST", udp_capture.ip())
|
||||||
.with_env("TRACES_EXPORTER_JAEGER_AGENT_PORT", udp_capture.port())
|
.with_env("TRACES_EXPORTER_JAEGER_AGENT_PORT", udp_capture.port())
|
||||||
|
@ -316,24 +203,18 @@ pub async fn test_cross_write_buffer_tracing() {
|
||||||
.update_server_id(NonZeroU32::new(1).unwrap())
|
.update_server_id(NonZeroU32::new(1).unwrap())
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
server_write.wait_server_initialized().await;
|
let router_cfg = wildcard_router_config(&db_name, write_buffer_dir.path());
|
||||||
let conn_write = WriteBufferConnection {
|
server_write
|
||||||
direction: WriteBufferDirection::Write.into(),
|
.router_client()
|
||||||
r#type: "file".to_string(),
|
.update_router(router_cfg)
|
||||||
connection: write_buffer_dir.path().display().to_string(),
|
.await
|
||||||
creation_config: Some(WriteBufferCreationConfig {
|
.unwrap();
|
||||||
n_sequencers: 1,
|
|
||||||
..Default::default()
|
|
||||||
}),
|
|
||||||
..Default::default()
|
|
||||||
};
|
|
||||||
DatabaseBuilder::new(db_name.clone())
|
|
||||||
.write_buffer(conn_write.clone())
|
|
||||||
.build(server_write.grpc_channel())
|
|
||||||
.await;
|
|
||||||
|
|
||||||
// create consumer DB
|
// create consumer DB
|
||||||
let server_read = ServerFixture::create_single_use_with_config(test_config).await;
|
let server_read = ServerFixture::create_single_use_with_config(
|
||||||
|
test_config.with_server_type(ServerType::Database),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
server_read
|
server_read
|
||||||
.deployment_client()
|
.deployment_client()
|
||||||
.update_server_id(NonZeroU32::new(2).unwrap())
|
.update_server_id(NonZeroU32::new(2).unwrap())
|
||||||
|
@ -342,7 +223,13 @@ pub async fn test_cross_write_buffer_tracing() {
|
||||||
server_read.wait_server_initialized().await;
|
server_read.wait_server_initialized().await;
|
||||||
let conn_read = WriteBufferConnection {
|
let conn_read = WriteBufferConnection {
|
||||||
direction: WriteBufferDirection::Read.into(),
|
direction: WriteBufferDirection::Read.into(),
|
||||||
..conn_write
|
r#type: "file".to_string(),
|
||||||
|
connection: write_buffer_dir.path().display().to_string(),
|
||||||
|
creation_config: Some(WriteBufferCreationConfig {
|
||||||
|
n_sequencers: 1,
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
..Default::default()
|
||||||
};
|
};
|
||||||
DatabaseBuilder::new(db_name.clone())
|
DatabaseBuilder::new(db_name.clone())
|
||||||
.write_buffer(conn_read)
|
.write_buffer(conn_read)
|
||||||
|
|
|
@ -708,8 +708,15 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
|
||||||
let input_schema = input.schema();
|
let input_schema = input.schema();
|
||||||
let output_schema = output_schema.as_arrow();
|
let output_schema = output_schema.as_arrow();
|
||||||
|
|
||||||
// If the schemas are the same, nothing to do
|
// If columns are the same, nothing to do
|
||||||
if input_schema == output_schema {
|
let same_columns = input_schema.fields().len() == output_schema.fields().len()
|
||||||
|
&& input_schema
|
||||||
|
.fields()
|
||||||
|
.iter()
|
||||||
|
.zip(output_schema.fields())
|
||||||
|
.all(|(a, b)| a.name() == b.name());
|
||||||
|
|
||||||
|
if same_columns {
|
||||||
return Ok(input);
|
return Ok(input);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2442,6 +2449,56 @@ mod test {
|
||||||
assert_batches_eq!(&expected, &batch);
|
assert_batches_eq!(&expected, &batch);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_sorted_metadata() {
|
||||||
|
let mut key = SortKey::default();
|
||||||
|
key.push("time", Default::default());
|
||||||
|
|
||||||
|
let chunk = Arc::new(
|
||||||
|
TestChunk::new("t")
|
||||||
|
.with_id(1)
|
||||||
|
.with_time_column()
|
||||||
|
.with_i64_field_column("field_int")
|
||||||
|
.with_one_row_of_data()
|
||||||
|
.with_sort_key(&key),
|
||||||
|
);
|
||||||
|
|
||||||
|
let schema = chunk.schema();
|
||||||
|
assert!(schema.sort_key().is_some());
|
||||||
|
|
||||||
|
let mut provider = ProviderBuilder::new("t", Arc::clone(&schema))
|
||||||
|
.add_no_op_pruner()
|
||||||
|
.add_chunk(chunk)
|
||||||
|
.build()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
provider.ensure_pk_sort();
|
||||||
|
|
||||||
|
let plan = provider.scan(&None, 1024, &[], None).await.unwrap();
|
||||||
|
let batches = collect(plan).await.unwrap();
|
||||||
|
|
||||||
|
for batch in &batches {
|
||||||
|
// TODO: schema output lacks sort key (#3214)
|
||||||
|
//assert_eq!(batch.schema(), schema.as_arrow())
|
||||||
|
|
||||||
|
let schema: Schema = batch.schema().try_into().unwrap();
|
||||||
|
for field_idx in 0..schema.len() {
|
||||||
|
assert!(schema.field(field_idx).0.is_some());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assert_batches_eq!(
|
||||||
|
&[
|
||||||
|
"+-----------+-----------------------------+",
|
||||||
|
"| field_int | time |",
|
||||||
|
"+-----------+-----------------------------+",
|
||||||
|
"| 1000 | 1970-01-01T00:00:00.000001Z |",
|
||||||
|
"+-----------+-----------------------------+",
|
||||||
|
],
|
||||||
|
&batches
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
fn chunk_ids(group: &[Arc<TestChunk>]) -> String {
|
fn chunk_ids(group: &[Arc<TestChunk>]) -> String {
|
||||||
let ids = group
|
let ids = group
|
||||||
.iter()
|
.iter()
|
||||||
|
|
|
@ -781,6 +781,16 @@ impl TestChunk {
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Set the sort key for this chunk
|
||||||
|
pub fn with_sort_key(mut self, sort_key: &SortKey<'_>) -> Self {
|
||||||
|
let mut merger = SchemaMerger::new();
|
||||||
|
merger = merger
|
||||||
|
.merge(self.schema.as_ref())
|
||||||
|
.expect("merging was successful");
|
||||||
|
self.schema = Arc::new(merger.build_with_sort_key(sort_key));
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns all columns of the table
|
/// Returns all columns of the table
|
||||||
pub fn all_column_names(&self) -> StringSet {
|
pub fn all_column_names(&self) -> StringSet {
|
||||||
self.schema
|
self.schema
|
||||||
|
|
|
@ -1,185 +0,0 @@
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use cache_loader_async::cache_api::LoadingCache;
|
|
||||||
use snafu::{ResultExt, Snafu};
|
|
||||||
|
|
||||||
use dml::DmlWrite;
|
|
||||||
use generated_types::influxdata::pbdata::v1::WriteRequest;
|
|
||||||
use influxdb_iox_client::{connection::Builder, write};
|
|
||||||
use observability_deps::tracing::debug;
|
|
||||||
|
|
||||||
type RemoteServerError = Box<dyn std::error::Error + Send + Sync + 'static>;
|
|
||||||
|
|
||||||
#[derive(Debug, Snafu)]
|
|
||||||
pub enum ConnectionManagerError {
|
|
||||||
#[snafu(display("cannot connect to remote: {}", source))]
|
|
||||||
RemoteServerConnectError { source: RemoteServerError },
|
|
||||||
#[snafu(display("cannot write to remote: {}", source))]
|
|
||||||
RemoteServerWriteError { source: write::WriteError },
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The `Server` will ask the `ConnectionManager` for connections to a specific
|
|
||||||
/// remote server. These connections can be used to communicate with other
|
|
||||||
/// servers. This is implemented as a trait for dependency injection in testing.
|
|
||||||
#[async_trait]
|
|
||||||
pub trait ConnectionManager {
|
|
||||||
type RemoteServer: RemoteServer + Send + Sync + 'static;
|
|
||||||
|
|
||||||
async fn remote_server(
|
|
||||||
&self,
|
|
||||||
connect: &str,
|
|
||||||
) -> Result<Arc<Self::RemoteServer>, ConnectionManagerError>;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The `RemoteServer` represents the API for replicating, subscribing, and
|
|
||||||
/// querying other servers.
|
|
||||||
#[async_trait]
|
|
||||||
pub trait RemoteServer {
|
|
||||||
/// Sends a [`DmlWrite`] to the remote server. An IOx server acting as a
|
|
||||||
/// router/sharder will call this method to send entries to remotes.
|
|
||||||
async fn write(&self, db: &str, write: &DmlWrite) -> Result<(), ConnectionManagerError>;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The connection manager maps a host identifier to a remote server.
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct ConnectionManagerImpl {
|
|
||||||
cache: LoadingCache<String, Arc<RemoteServerImpl>, CacheFillError>,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Error must be Clone because LoadingCache requires so.
|
|
||||||
#[derive(Debug, Snafu, Clone)]
|
|
||||||
pub enum CacheFillError {
|
|
||||||
#[snafu(display("gRPC error: {}", source))]
|
|
||||||
GrpcError {
|
|
||||||
source: Arc<dyn std::error::Error + Send + Sync + 'static>,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ConnectionManagerImpl {
|
|
||||||
pub fn new() -> Self {
|
|
||||||
let cache = LoadingCache::new(Self::cached_remote_server);
|
|
||||||
Self { cache }
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn cached_remote_server(
|
|
||||||
connect: String,
|
|
||||||
) -> Result<Arc<RemoteServerImpl>, CacheFillError> {
|
|
||||||
let connection = Builder::default()
|
|
||||||
.build(&connect)
|
|
||||||
.await
|
|
||||||
.map_err(|e| Arc::new(e) as _)
|
|
||||||
.context(GrpcError)?;
|
|
||||||
let client = write::Client::new(connection);
|
|
||||||
Ok(Arc::new(RemoteServerImpl { client }))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for ConnectionManagerImpl {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self::new()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl ConnectionManager for ConnectionManagerImpl {
|
|
||||||
type RemoteServer = RemoteServerImpl;
|
|
||||||
|
|
||||||
async fn remote_server(
|
|
||||||
&self,
|
|
||||||
connect: &str,
|
|
||||||
) -> Result<Arc<Self::RemoteServer>, ConnectionManagerError> {
|
|
||||||
let ret = self
|
|
||||||
.cache
|
|
||||||
.get_with_meta(connect.to_string())
|
|
||||||
.await
|
|
||||||
.map_err(|e| Box::new(e) as _)
|
|
||||||
.context(RemoteServerConnectError)?;
|
|
||||||
debug!(was_cached=%ret.cached, %connect, "getting remote connection");
|
|
||||||
Ok(ret.result)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// An implementation for communicating with other IOx servers. This should
|
|
||||||
/// be moved into and implemented in an influxdb_iox_client create at a later
|
|
||||||
/// date.
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct RemoteServerImpl {
|
|
||||||
client: write::Client,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl RemoteServer for RemoteServerImpl {
|
|
||||||
/// Sends a write to the remote server. An IOx server acting as a
|
|
||||||
/// router/sharder will call this method to send entries to remotes.
|
|
||||||
async fn write(&self, db_name: &str, write: &DmlWrite) -> Result<(), ConnectionManagerError> {
|
|
||||||
let data = mutable_batch_pb::encode::encode_write(db_name, write);
|
|
||||||
self.client
|
|
||||||
.clone() // cheap, see https://docs.rs/tonic/0.4.2/tonic/client/index.html#concurrent-usage
|
|
||||||
.write_pb(WriteRequest {
|
|
||||||
database_batch: Some(data),
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.context(RemoteServerWriteError)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub mod test_helpers {
|
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
|
||||||
|
|
||||||
use super::*;
|
|
||||||
use std::collections::BTreeMap;
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct TestConnectionManager {
|
|
||||||
pub remotes: BTreeMap<String, Arc<TestRemoteServer>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for TestConnectionManager {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self::new()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TestConnectionManager {
|
|
||||||
pub fn new() -> Self {
|
|
||||||
Self {
|
|
||||||
remotes: BTreeMap::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl ConnectionManager for TestConnectionManager {
|
|
||||||
type RemoteServer = TestRemoteServer;
|
|
||||||
|
|
||||||
async fn remote_server(
|
|
||||||
&self,
|
|
||||||
id: &str,
|
|
||||||
) -> Result<Arc<TestRemoteServer>, ConnectionManagerError> {
|
|
||||||
#[derive(Debug, Snafu)]
|
|
||||||
enum TestRemoteError {
|
|
||||||
#[snafu(display("remote not found"))]
|
|
||||||
NotFound,
|
|
||||||
}
|
|
||||||
Ok(Arc::clone(self.remotes.get(id).ok_or_else(|| {
|
|
||||||
ConnectionManagerError::RemoteServerConnectError {
|
|
||||||
source: Box::new(TestRemoteError::NotFound),
|
|
||||||
}
|
|
||||||
})?))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct TestRemoteServer {
|
|
||||||
pub written: Arc<AtomicBool>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl<'a> RemoteServer for TestRemoteServer {
|
|
||||||
async fn write(&self, _db: &str, _write: &DmlWrite) -> Result<(), ConnectionManagerError> {
|
|
||||||
self.written.store(true, Ordering::Relaxed);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -973,7 +973,7 @@ impl Db {
|
||||||
/// Stores the write on this [`Db`] and/or routes it to the write buffer
|
/// Stores the write on this [`Db`] and/or routes it to the write buffer
|
||||||
///
|
///
|
||||||
/// TODO: Remove this method (#2243)
|
/// TODO: Remove this method (#2243)
|
||||||
pub async fn route_operation(&self, operation: &DmlOperation) -> Result<()> {
|
pub(crate) async fn route_operation(&self, operation: &DmlOperation) -> Result<()> {
|
||||||
let immutable = {
|
let immutable = {
|
||||||
let rules = self.rules.read();
|
let rules = self.rules.read();
|
||||||
rules.lifecycle_rules.immutable
|
rules.lifecycle_rules.immutable
|
||||||
|
@ -1023,12 +1023,12 @@ impl Db {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Writes the provided [`DmlWrite`] to this database
|
/// Writes the provided [`DmlWrite`] to this database
|
||||||
pub fn store_write(&self, db_write: &DmlWrite) -> Result<()> {
|
pub(crate) fn store_write(&self, db_write: &DmlWrite) -> Result<()> {
|
||||||
self.store_filtered_write(db_write, WriteFilterNone::default())
|
self.store_filtered_write(db_write, WriteFilterNone::default())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Writes the provided [`DmlWrite`] to this database with the provided [`WriteFilter`]
|
/// Writes the provided [`DmlWrite`] to this database with the provided [`WriteFilter`]
|
||||||
pub fn store_filtered_write(
|
pub(crate) fn store_filtered_write(
|
||||||
&self,
|
&self,
|
||||||
db_write: &DmlWrite,
|
db_write: &DmlWrite,
|
||||||
filter: impl WriteFilter,
|
filter: impl WriteFilter,
|
||||||
|
|
|
@ -70,10 +70,8 @@
|
||||||
|
|
||||||
use ::lifecycle::{LockableChunk, LockablePartition};
|
use ::lifecycle::{LockableChunk, LockablePartition};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use connection::{ConnectionManager, RemoteServer};
|
|
||||||
use data_types::{
|
use data_types::{
|
||||||
chunk_metadata::ChunkId,
|
chunk_metadata::ChunkId,
|
||||||
database_rules::{NodeGroup, RoutingRules, Sink},
|
|
||||||
detailed_database::ActiveDatabase,
|
detailed_database::ActiveDatabase,
|
||||||
error::ErrorLogger,
|
error::ErrorLogger,
|
||||||
job::Job,
|
job::Job,
|
||||||
|
@ -88,8 +86,6 @@ use internal_types::freezable::Freezable;
|
||||||
use iox_object_store::IoxObjectStore;
|
use iox_object_store::IoxObjectStore;
|
||||||
use observability_deps::tracing::{error, info, warn};
|
use observability_deps::tracing::{error, info, warn};
|
||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
use rand::seq::SliceRandom;
|
|
||||||
use resolver::Resolver;
|
|
||||||
use snafu::{ensure, OptionExt, ResultExt, Snafu};
|
use snafu::{ensure, OptionExt, ResultExt, Snafu};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::{sync::Notify, task::JoinError};
|
use tokio::{sync::Notify, task::JoinError};
|
||||||
|
@ -99,16 +95,12 @@ use uuid::Uuid;
|
||||||
|
|
||||||
pub use application::ApplicationState;
|
pub use application::ApplicationState;
|
||||||
pub use db::Db;
|
pub use db::Db;
|
||||||
use dml::{DmlOperation, DmlWrite};
|
|
||||||
pub use job::JobRegistry;
|
pub use job::JobRegistry;
|
||||||
pub use resolver::{GrpcConnectionString, RemoteTemplate};
|
|
||||||
|
|
||||||
mod application;
|
mod application;
|
||||||
pub mod connection;
|
|
||||||
pub mod database;
|
pub mod database;
|
||||||
pub mod db;
|
pub mod db;
|
||||||
mod job;
|
mod job;
|
||||||
mod resolver;
|
|
||||||
|
|
||||||
pub mod rules;
|
pub mod rules;
|
||||||
use rules::{PersistedDatabaseRules, ProvidedDatabaseRules};
|
use rules::{PersistedDatabaseRules, ProvidedDatabaseRules};
|
||||||
|
@ -182,9 +174,6 @@ pub enum Error {
|
||||||
current: Uuid,
|
current: Uuid,
|
||||||
},
|
},
|
||||||
|
|
||||||
#[snafu(display("Server error: {}", source))]
|
|
||||||
ServerError { source: std::io::Error },
|
|
||||||
|
|
||||||
#[snafu(display("invalid database: {}", source))]
|
#[snafu(display("invalid database: {}", source))]
|
||||||
InvalidDatabaseName { source: DatabaseNameError },
|
InvalidDatabaseName { source: DatabaseNameError },
|
||||||
|
|
||||||
|
@ -209,61 +198,11 @@ pub enum Error {
|
||||||
table: String,
|
table: String,
|
||||||
},
|
},
|
||||||
|
|
||||||
#[snafu(display("hard buffer limit reached"))]
|
|
||||||
HardLimitReached {},
|
|
||||||
|
|
||||||
#[snafu(display(
|
|
||||||
"Storing database write failed with the following error(s), and possibly more: {}",
|
|
||||||
errors.iter().map(ToString::to_string).collect::<Vec<_>>().join(", ")
|
|
||||||
))]
|
|
||||||
StoreWriteErrors { errors: Vec<DatabaseError> },
|
|
||||||
|
|
||||||
#[snafu(display(
|
|
||||||
"Cannot write to database {}, it's configured to only read from the write buffer",
|
|
||||||
db_name
|
|
||||||
))]
|
|
||||||
WritingOnlyAllowedThroughWriteBuffer { db_name: String },
|
|
||||||
|
|
||||||
#[snafu(display("Cannot write to write buffer: {}", source))]
|
|
||||||
WriteBuffer {
|
|
||||||
source: Box<dyn std::error::Error + Sync + Send>,
|
|
||||||
},
|
|
||||||
|
|
||||||
#[snafu(display("no remote configured for node group: {:?}", node_group))]
|
|
||||||
NoRemoteConfigured { node_group: NodeGroup },
|
|
||||||
|
|
||||||
#[snafu(display("all remotes failed connecting: {:?}", errors))]
|
|
||||||
NoRemoteReachable {
|
|
||||||
errors: HashMap<GrpcConnectionString, connection::ConnectionManagerError>,
|
|
||||||
},
|
|
||||||
|
|
||||||
#[snafu(display("remote error: {}", source))]
|
|
||||||
RemoteError {
|
|
||||||
source: connection::ConnectionManagerError,
|
|
||||||
},
|
|
||||||
|
|
||||||
#[snafu(display("database failed to initialize: {}", source))]
|
#[snafu(display("database failed to initialize: {}", source))]
|
||||||
DatabaseInit { source: Arc<database::InitError> },
|
DatabaseInit { source: Arc<database::InitError> },
|
||||||
|
|
||||||
#[snafu(display(
|
|
||||||
"Either invalid time range [{}, {}] or invalid delete expression {}",
|
|
||||||
start_time,
|
|
||||||
stop_time,
|
|
||||||
predicate
|
|
||||||
))]
|
|
||||||
DeleteExpression {
|
|
||||||
start_time: String,
|
|
||||||
stop_time: String,
|
|
||||||
predicate: String,
|
|
||||||
},
|
|
||||||
|
|
||||||
#[snafu(display("error persisting server config to object storage: {}", source))]
|
#[snafu(display("error persisting server config to object storage: {}", source))]
|
||||||
PersistServerConfig { source: object_store::Error },
|
PersistServerConfig { source: object_store::Error },
|
||||||
|
|
||||||
#[snafu(display("Error sharding write: {}", source))]
|
|
||||||
ShardWrite {
|
|
||||||
source: data_types::database_rules::Error,
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||||
|
@ -289,8 +228,6 @@ pub trait DatabaseStore: std::fmt::Debug + Send + Sync {
|
||||||
/// Configuration options for `Server`
|
/// Configuration options for `Server`
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct ServerConfig {
|
pub struct ServerConfig {
|
||||||
pub remote_template: Option<RemoteTemplate>,
|
|
||||||
|
|
||||||
pub wipe_catalog_on_error: bool,
|
pub wipe_catalog_on_error: bool,
|
||||||
|
|
||||||
pub skip_replay_and_seek_instead: bool,
|
pub skip_replay_and_seek_instead: bool,
|
||||||
|
@ -299,7 +236,6 @@ pub struct ServerConfig {
|
||||||
impl Default for ServerConfig {
|
impl Default for ServerConfig {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
remote_template: None,
|
|
||||||
wipe_catalog_on_error: false,
|
wipe_catalog_on_error: false,
|
||||||
skip_replay_and_seek_instead: false,
|
skip_replay_and_seek_instead: false,
|
||||||
}
|
}
|
||||||
|
@ -310,20 +246,15 @@ impl Default for ServerConfig {
|
||||||
/// well as how they communicate with other servers. Each server will have one
|
/// well as how they communicate with other servers. Each server will have one
|
||||||
/// of these structs, which keeps track of all replication and query rules.
|
/// of these structs, which keeps track of all replication and query rules.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Server<M: ConnectionManager> {
|
pub struct Server {
|
||||||
connection_manager: Arc<M>,
|
|
||||||
|
|
||||||
/// Future that resolves when the background worker exits
|
/// Future that resolves when the background worker exits
|
||||||
join: Shared<BoxFuture<'static, Result<(), Arc<JoinError>>>>,
|
join: Shared<BoxFuture<'static, Result<(), Arc<JoinError>>>>,
|
||||||
|
|
||||||
/// Resolver for mapping ServerId to gRPC connection strings
|
|
||||||
resolver: RwLock<Resolver>,
|
|
||||||
|
|
||||||
/// State shared with the background worker
|
/// State shared with the background worker
|
||||||
shared: Arc<ServerShared>,
|
shared: Arc<ServerShared>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M: ConnectionManager> Drop for Server<M> {
|
impl Drop for Server {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
if !self.shared.shutdown.is_cancelled() {
|
if !self.shared.shutdown.is_cancelled() {
|
||||||
warn!("server dropped without calling shutdown()");
|
warn!("server dropped without calling shutdown()");
|
||||||
|
@ -522,15 +453,8 @@ impl ServerStateInitialized {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M> Server<M>
|
impl Server {
|
||||||
where
|
pub fn new(application: Arc<ApplicationState>, config: ServerConfig) -> Self {
|
||||||
M: ConnectionManager + Send + Sync,
|
|
||||||
{
|
|
||||||
pub fn new(
|
|
||||||
connection_manager: M,
|
|
||||||
application: Arc<ApplicationState>,
|
|
||||||
config: ServerConfig,
|
|
||||||
) -> Self {
|
|
||||||
let shared = Arc::new(ServerShared {
|
let shared = Arc::new(ServerShared {
|
||||||
shutdown: Default::default(),
|
shutdown: Default::default(),
|
||||||
application,
|
application,
|
||||||
|
@ -544,12 +468,7 @@ where
|
||||||
let handle = tokio::spawn(background_worker(Arc::clone(&shared)));
|
let handle = tokio::spawn(background_worker(Arc::clone(&shared)));
|
||||||
let join = handle.map_err(Arc::new).boxed().shared();
|
let join = handle.map_err(Arc::new).boxed().shared();
|
||||||
|
|
||||||
Self {
|
Self { shared, join }
|
||||||
shared,
|
|
||||||
join,
|
|
||||||
connection_manager: Arc::new(connection_manager),
|
|
||||||
resolver: RwLock::new(Resolver::new(config.remote_template)),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// sets the id of the server, which is used for replication and the base
|
/// sets the id of the server, which is used for replication and the base
|
||||||
|
@ -901,141 +820,6 @@ where
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// `write_lines` takes in raw line protocol and converts it to a collection
|
|
||||||
/// of ShardedEntry which are then sent to other IOx servers based on
|
|
||||||
/// the ShardConfig or sent to the local database for buffering in the
|
|
||||||
/// WriteBuffer and/or the MutableBuffer if configured.
|
|
||||||
///
|
|
||||||
/// The provided `default_time` is nanoseconds since the epoch and will be assigned
|
|
||||||
/// to any lines that don't have a timestamp.
|
|
||||||
///
|
|
||||||
/// TODO: Replace with dedicated router in terms of MutableBatch
|
|
||||||
pub async fn write(&self, db_name: &DatabaseName<'_>, write: DmlWrite) -> Result<()> {
|
|
||||||
let db = self.db(db_name)?;
|
|
||||||
let rules = db.rules();
|
|
||||||
|
|
||||||
let sharded_writes = match &rules.routing_rules {
|
|
||||||
Some(RoutingRules::ShardConfig(shard_config)) => {
|
|
||||||
let sharded_writes = write.shard(shard_config).context(ShardWrite)?;
|
|
||||||
itertools::Either::Left(sharded_writes.into_iter().map(|(s, w)| (Some(s), w)))
|
|
||||||
}
|
|
||||||
_ => itertools::Either::Right(std::iter::once((None, write))),
|
|
||||||
};
|
|
||||||
|
|
||||||
// Write to all shards in parallel; as soon as one fails return error
|
|
||||||
// immediately to the client and abort all other outstanding requests.
|
|
||||||
futures_util::future::try_join_all(sharded_writes.map(|(shard, write)| {
|
|
||||||
let sink = match &rules.routing_rules {
|
|
||||||
Some(RoutingRules::ShardConfig(shard_config)) => {
|
|
||||||
let id = shard.expect("sharded entry");
|
|
||||||
Some(shard_config.shards.get(&id).expect("valid shard"))
|
|
||||||
}
|
|
||||||
Some(RoutingRules::RoutingConfig(config)) => Some(&config.sink),
|
|
||||||
None => None,
|
|
||||||
};
|
|
||||||
|
|
||||||
async move {
|
|
||||||
match sink {
|
|
||||||
Some(sink) => self.write_sink(db_name, sink, write).await,
|
|
||||||
None => self.write_local(db_name, &DmlOperation::Write(write)).await,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}))
|
|
||||||
.await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn write_sink(
|
|
||||||
&self,
|
|
||||||
db_name: &DatabaseName<'_>,
|
|
||||||
sink: &Sink,
|
|
||||||
write: DmlWrite,
|
|
||||||
) -> Result<()> {
|
|
||||||
match sink {
|
|
||||||
Sink::Iox(node_group) => self.write_downstream(db_name, node_group, &write).await,
|
|
||||||
Sink::Kafka(_) => {
|
|
||||||
// The write buffer write path is currently implemented in "db", so confusingly we
|
|
||||||
// need to invoke write_entry_local.
|
|
||||||
// TODO(mkm): tracked in #2134
|
|
||||||
self.write_local(db_name, &DmlOperation::Write(write)).await
|
|
||||||
}
|
|
||||||
Sink::DevNull => {
|
|
||||||
// write is silently ignored, as requested by the configuration.
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn write_downstream(
|
|
||||||
&self,
|
|
||||||
db_name: &str,
|
|
||||||
node_group: &[ServerId],
|
|
||||||
write: &DmlWrite,
|
|
||||||
) -> Result<()> {
|
|
||||||
// Return an error if this server is not yet ready
|
|
||||||
self.shared.state.read().initialized()?;
|
|
||||||
|
|
||||||
let addrs: Vec<_> = {
|
|
||||||
let resolver = self.resolver.read();
|
|
||||||
node_group
|
|
||||||
.iter()
|
|
||||||
.filter_map(|&node| resolver.resolve_remote(node))
|
|
||||||
.collect()
|
|
||||||
};
|
|
||||||
|
|
||||||
if addrs.is_empty() {
|
|
||||||
return NoRemoteConfigured { node_group }.fail();
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut errors = HashMap::new();
|
|
||||||
// this needs to be in its own statement because rand::thread_rng is not Send and the loop below is async.
|
|
||||||
// braces around the expression would work but clippy don't know that and complains the braces are useless.
|
|
||||||
let random_addrs_iter = addrs.choose_multiple(&mut rand::thread_rng(), addrs.len());
|
|
||||||
for addr in random_addrs_iter {
|
|
||||||
match self.connection_manager.remote_server(addr).await {
|
|
||||||
Err(err) => {
|
|
||||||
info!("error obtaining remote for {}: {}", addr, err);
|
|
||||||
errors.insert(addr.to_owned(), err);
|
|
||||||
}
|
|
||||||
Ok(remote) => return remote.write(db_name, write).await.context(RemoteError),
|
|
||||||
};
|
|
||||||
}
|
|
||||||
NoRemoteReachable { errors }.fail()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Write an entry to the local `Db`
|
|
||||||
///
|
|
||||||
/// TODO: Remove this and migrate callers to `Database::route_write`
|
|
||||||
async fn write_local(
|
|
||||||
&self,
|
|
||||||
db_name: &DatabaseName<'_>,
|
|
||||||
operation: &DmlOperation,
|
|
||||||
) -> Result<()> {
|
|
||||||
use database::WriteError;
|
|
||||||
|
|
||||||
self.active_database(db_name)?
|
|
||||||
.route_operation(operation)
|
|
||||||
.await
|
|
||||||
.map_err(|e| match e {
|
|
||||||
WriteError::NotInitialized { .. } => Error::DatabaseNotInitialized {
|
|
||||||
db_name: db_name.to_string(),
|
|
||||||
},
|
|
||||||
WriteError::WriteBuffer { source } => Error::WriteBuffer { source },
|
|
||||||
WriteError::WritingOnlyAllowedThroughWriteBuffer => {
|
|
||||||
Error::WritingOnlyAllowedThroughWriteBuffer {
|
|
||||||
db_name: db_name.to_string(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
WriteError::DbError { source } => Error::UnknownDatabaseError {
|
|
||||||
source: Box::new(source),
|
|
||||||
},
|
|
||||||
WriteError::HardLimitReached { .. } => Error::HardLimitReached {},
|
|
||||||
WriteError::StoreWriteErrors { errors } => Error::StoreWriteErrors {
|
|
||||||
errors: errors.into_iter().map(|e| Box::new(e) as _).collect(),
|
|
||||||
},
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Update database rules and save on success.
|
/// Update database rules and save on success.
|
||||||
pub async fn update_db_rules(
|
pub async fn update_db_rules(
|
||||||
&self,
|
&self,
|
||||||
|
@ -1051,18 +835,6 @@ where
|
||||||
.context(CanNotUpdateRules { db_name })?)
|
.context(CanNotUpdateRules { db_name })?)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn remotes_sorted(&self) -> Vec<(ServerId, String)> {
|
|
||||||
self.resolver.read().remotes_sorted()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn update_remote(&self, id: ServerId, addr: GrpcConnectionString) {
|
|
||||||
self.resolver.write().update_remote(id, addr)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn delete_remote(&self, id: ServerId) -> Option<GrpcConnectionString> {
|
|
||||||
self.resolver.write().delete_remote(id)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Closes a chunk and starts moving its data to the read buffer, as a
|
/// Closes a chunk and starts moving its data to the read buffer, as a
|
||||||
/// background job, dropping when complete.
|
/// background job, dropping when complete.
|
||||||
pub fn close_chunk(
|
pub fn close_chunk(
|
||||||
|
@ -1306,10 +1078,7 @@ async fn maybe_initialize_server(shared: &ServerShared) {
|
||||||
|
|
||||||
/// TODO: Revisit this trait's API
|
/// TODO: Revisit this trait's API
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl<M> DatabaseStore for Server<M>
|
impl DatabaseStore for Server {
|
||||||
where
|
|
||||||
M: ConnectionManager + std::fmt::Debug + Send + Sync,
|
|
||||||
{
|
|
||||||
type Database = Db;
|
type Database = Db;
|
||||||
type Error = Error;
|
type Error = Error;
|
||||||
|
|
||||||
|
@ -1339,10 +1108,7 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
impl<M> Server<M>
|
impl Server {
|
||||||
where
|
|
||||||
M: ConnectionManager + Send + Sync,
|
|
||||||
{
|
|
||||||
/// For tests: list of database names in this server, regardless
|
/// For tests: list of database names in this server, regardless
|
||||||
/// of their initialization state
|
/// of their initialization state
|
||||||
fn db_names_sorted(&self) -> Vec<String> {
|
fn db_names_sorted(&self) -> Vec<String> {
|
||||||
|
@ -1409,7 +1175,6 @@ async fn database_name_from_rules_file(
|
||||||
|
|
||||||
pub mod test_utils {
|
pub mod test_utils {
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::connection::test_helpers::TestConnectionManager;
|
|
||||||
use object_store::ObjectStore;
|
use object_store::ObjectStore;
|
||||||
|
|
||||||
/// Create a new [`ApplicationState`] with an in-memory object store
|
/// Create a new [`ApplicationState`] with an in-memory object store
|
||||||
|
@ -1422,12 +1187,8 @@ pub mod test_utils {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Creates a new server with the provided [`ApplicationState`]
|
/// Creates a new server with the provided [`ApplicationState`]
|
||||||
pub fn make_server(application: Arc<ApplicationState>) -> Arc<Server<TestConnectionManager>> {
|
pub fn make_server(application: Arc<ApplicationState>) -> Arc<Server> {
|
||||||
Arc::new(Server::new(
|
Arc::new(Server::new(application, Default::default()))
|
||||||
TestConnectionManager::new(),
|
|
||||||
application,
|
|
||||||
Default::default(),
|
|
||||||
))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Creates a new server with the provided [`ApplicationState`]
|
/// Creates a new server with the provided [`ApplicationState`]
|
||||||
|
@ -1436,7 +1197,7 @@ pub mod test_utils {
|
||||||
pub async fn make_initialized_server(
|
pub async fn make_initialized_server(
|
||||||
server_id: ServerId,
|
server_id: ServerId,
|
||||||
application: Arc<ApplicationState>,
|
application: Arc<ApplicationState>,
|
||||||
) -> Arc<Server<TestConnectionManager>> {
|
) -> Arc<Server> {
|
||||||
let server = make_server(application);
|
let server = make_server(application);
|
||||||
server.set_id(server_id).unwrap();
|
server.set_id(server_id).unwrap();
|
||||||
server.wait_for_init().await.unwrap();
|
server.wait_for_init().await.unwrap();
|
||||||
|
@ -1450,18 +1211,13 @@ mod tests {
|
||||||
test_utils::{make_application, make_server},
|
test_utils::{make_application, make_server},
|
||||||
*,
|
*,
|
||||||
};
|
};
|
||||||
use arrow::record_batch::RecordBatch;
|
|
||||||
use arrow_util::assert_batches_eq;
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use connection::test_helpers::{TestConnectionManager, TestRemoteServer};
|
|
||||||
use data_types::{
|
use data_types::{
|
||||||
chunk_metadata::{ChunkAddr, ChunkStorage},
|
chunk_metadata::{ChunkAddr, ChunkStorage},
|
||||||
database_rules::{
|
database_rules::{DatabaseRules, LifecycleRules, PartitionTemplate, TemplatePart},
|
||||||
DatabaseRules, HashRing, LifecycleRules, PartitionTemplate, ShardConfig, ShardId,
|
|
||||||
TemplatePart,
|
|
||||||
},
|
|
||||||
write_buffer::{WriteBufferConnection, WriteBufferDirection},
|
write_buffer::{WriteBufferConnection, WriteBufferDirection},
|
||||||
};
|
};
|
||||||
|
use dml::DmlWrite;
|
||||||
use iox_object_store::IoxObjectStore;
|
use iox_object_store::IoxObjectStore;
|
||||||
use mutable_batch_lp::lines_to_batches;
|
use mutable_batch_lp::lines_to_batches;
|
||||||
use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi};
|
use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi};
|
||||||
|
@ -1469,13 +1225,10 @@ mod tests {
|
||||||
core::{PreservedCatalog, PreservedCatalogConfig},
|
core::{PreservedCatalog, PreservedCatalogConfig},
|
||||||
test_helpers::{load_ok, new_empty},
|
test_helpers::{load_ok, new_empty},
|
||||||
};
|
};
|
||||||
use query::{exec::ExecutionContextProvider, frontend::sql::SqlQueryPlanner, QueryDatabase};
|
use query::QueryDatabase;
|
||||||
use std::{
|
use std::{
|
||||||
convert::TryFrom,
|
convert::TryFrom,
|
||||||
sync::{
|
sync::Arc,
|
||||||
atomic::{AtomicBool, Ordering},
|
|
||||||
Arc,
|
|
||||||
},
|
|
||||||
time::{Duration, Instant},
|
time::{Duration, Instant},
|
||||||
};
|
};
|
||||||
use test_helpers::{assert_contains, assert_error};
|
use test_helpers::{assert_contains, assert_error};
|
||||||
|
@ -1484,12 +1237,7 @@ mod tests {
|
||||||
async fn server_api_calls_return_error_with_no_id_set() {
|
async fn server_api_calls_return_error_with_no_id_set() {
|
||||||
let server = make_server(make_application());
|
let server = make_server(make_application());
|
||||||
|
|
||||||
let tables = lines_to_batches("cpu foo=1 10", 0).unwrap();
|
let resp = server.db(&DatabaseName::new("foo").unwrap()).unwrap_err();
|
||||||
let write = DmlWrite::new(tables, Default::default());
|
|
||||||
let resp = server
|
|
||||||
.write(&DatabaseName::new("foo").unwrap(), write)
|
|
||||||
.await
|
|
||||||
.unwrap_err();
|
|
||||||
assert!(matches!(resp, Error::IdNotSet));
|
assert!(matches!(resp, Error::IdNotSet));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1661,13 +1409,10 @@ mod tests {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn create_simple_database<M>(
|
async fn create_simple_database(
|
||||||
server: &Server<M>,
|
server: &Server,
|
||||||
name: impl Into<String> + Send,
|
name: impl Into<String> + Send,
|
||||||
) -> Result<Arc<Database>>
|
) -> Result<Arc<Database>> {
|
||||||
where
|
|
||||||
M: ConnectionManager + Send + Sync,
|
|
||||||
{
|
|
||||||
let name = DatabaseName::new(name.into()).unwrap();
|
let name = DatabaseName::new(name.into()).unwrap();
|
||||||
|
|
||||||
let rules = DatabaseRules {
|
let rules = DatabaseRules {
|
||||||
|
@ -1984,136 +1729,6 @@ mod tests {
|
||||||
assert_eq!(names, db_names_sorted);
|
assert_eq!(names, db_names_sorted);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn writes_local() {
|
|
||||||
let server = make_server(make_application());
|
|
||||||
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
|
||||||
server.wait_for_init().await.unwrap();
|
|
||||||
|
|
||||||
let db_name = DatabaseName::new("foo".to_string()).unwrap();
|
|
||||||
server
|
|
||||||
.create_database(default_rules(db_name.clone()))
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
|
|
||||||
let write = DmlWrite::new(tables, Default::default());
|
|
||||||
server.write(&db_name, write).await.unwrap();
|
|
||||||
|
|
||||||
let db_name = DatabaseName::new("foo").unwrap();
|
|
||||||
let db = server.db(&db_name).unwrap();
|
|
||||||
let batches = run_query(db, "select * from cpu").await;
|
|
||||||
|
|
||||||
let expected = vec![
|
|
||||||
"+-----+--------------------------------+",
|
|
||||||
"| bar | time |",
|
|
||||||
"+-----+--------------------------------+",
|
|
||||||
"| 1 | 1970-01-01T00:00:00.000000010Z |",
|
|
||||||
"+-----+--------------------------------+",
|
|
||||||
];
|
|
||||||
assert_batches_eq!(expected, &batches);
|
|
||||||
}
|
|
||||||
|
|
||||||
// This tests sets up a database with a sharding config which defines exactly one shard
|
|
||||||
// backed by 3 remote nodes. One of the nodes is modeled to be "down", while the other two
|
|
||||||
// can record write entry events.
|
|
||||||
// This tests goes through a few trivial error cases before checking that the both working
|
|
||||||
// mock remote servers actually receive write entry events.
|
|
||||||
//
|
|
||||||
// This test is theoretically flaky, low probability though (in the order of 1e-30)
|
|
||||||
#[tokio::test]
|
|
||||||
async fn write_entry_downstream() {
|
|
||||||
const TEST_SHARD_ID: ShardId = 1;
|
|
||||||
const GOOD_REMOTE_ADDR_1: &str = "http://localhost:111";
|
|
||||||
const GOOD_REMOTE_ADDR_2: &str = "http://localhost:222";
|
|
||||||
const BAD_REMOTE_ADDR: &str = "http://localhost:666";
|
|
||||||
|
|
||||||
let good_remote_id_1 = ServerId::try_from(1).unwrap();
|
|
||||||
let good_remote_id_2 = ServerId::try_from(2).unwrap();
|
|
||||||
let bad_remote_id = ServerId::try_from(666).unwrap();
|
|
||||||
|
|
||||||
let mut manager = TestConnectionManager::new();
|
|
||||||
let written_1 = Arc::new(AtomicBool::new(false));
|
|
||||||
manager.remotes.insert(
|
|
||||||
GOOD_REMOTE_ADDR_1.to_owned(),
|
|
||||||
Arc::new(TestRemoteServer {
|
|
||||||
written: Arc::clone(&written_1),
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
let written_2 = Arc::new(AtomicBool::new(false));
|
|
||||||
manager.remotes.insert(
|
|
||||||
GOOD_REMOTE_ADDR_2.to_owned(),
|
|
||||||
Arc::new(TestRemoteServer {
|
|
||||||
written: Arc::clone(&written_2),
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
|
|
||||||
let server = Server::new(manager, make_application(), Default::default());
|
|
||||||
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
|
||||||
server.wait_for_init().await.unwrap();
|
|
||||||
|
|
||||||
let db_name = DatabaseName::new("foo").unwrap();
|
|
||||||
server
|
|
||||||
.create_database(default_rules(db_name.clone()))
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let remote_ids = vec![bad_remote_id, good_remote_id_1, good_remote_id_2];
|
|
||||||
let db = server.db(&db_name).unwrap();
|
|
||||||
|
|
||||||
let shard_config = ShardConfig {
|
|
||||||
hash_ring: Some(HashRing {
|
|
||||||
shards: vec![TEST_SHARD_ID].into(),
|
|
||||||
}),
|
|
||||||
shards: vec![(TEST_SHARD_ID, Sink::Iox(remote_ids.clone()))]
|
|
||||||
.into_iter()
|
|
||||||
.collect(),
|
|
||||||
..Default::default()
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut rules = db.rules().as_ref().clone();
|
|
||||||
rules.routing_rules = Some(RoutingRules::ShardConfig(shard_config));
|
|
||||||
let rules = Arc::new(rules);
|
|
||||||
|
|
||||||
db.update_rules(rules);
|
|
||||||
|
|
||||||
let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
|
|
||||||
let write = DmlWrite::new(tables, Default::default());
|
|
||||||
let err = server.write(&db_name, write.clone()).await.unwrap_err();
|
|
||||||
assert!(
|
|
||||||
matches!(err, Error::NoRemoteConfigured { node_group } if node_group == remote_ids)
|
|
||||||
);
|
|
||||||
|
|
||||||
// one remote is configured but it's down and we'll get connection error
|
|
||||||
server.update_remote(bad_remote_id, BAD_REMOTE_ADDR.into());
|
|
||||||
let err = server.write(&db_name, write.clone()).await.unwrap_err();
|
|
||||||
assert!(matches!(
|
|
||||||
err,
|
|
||||||
Error::NoRemoteReachable { errors } if matches!(
|
|
||||||
errors[BAD_REMOTE_ADDR],
|
|
||||||
connection::ConnectionManagerError::RemoteServerConnectError {..}
|
|
||||||
)
|
|
||||||
));
|
|
||||||
assert!(!written_1.load(Ordering::Relaxed));
|
|
||||||
assert!(!written_2.load(Ordering::Relaxed));
|
|
||||||
|
|
||||||
// We configure the address for the other remote, this time connection will succeed
|
|
||||||
// despite the bad remote failing to connect.
|
|
||||||
server.update_remote(good_remote_id_1, GOOD_REMOTE_ADDR_1.into());
|
|
||||||
server.update_remote(good_remote_id_2, GOOD_REMOTE_ADDR_2.into());
|
|
||||||
|
|
||||||
// Remotes are tried in random order, so we need to repeat the test a few times to have a reasonable
|
|
||||||
// probability both the remotes will get hit.
|
|
||||||
for _ in 0..100 {
|
|
||||||
server
|
|
||||||
.write(&db_name, write.clone())
|
|
||||||
.await
|
|
||||||
.expect("cannot write lines");
|
|
||||||
}
|
|
||||||
assert!(written_1.load(Ordering::Relaxed));
|
|
||||||
assert!(written_2.load(Ordering::Relaxed));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn close_chunk() {
|
async fn close_chunk() {
|
||||||
test_helpers::maybe_start_logging();
|
test_helpers::maybe_start_logging();
|
||||||
|
@ -2129,11 +1744,11 @@ mod tests {
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
|
let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
|
||||||
|
let db = server.db(&db_name).unwrap();
|
||||||
let write = DmlWrite::new(tables, Default::default());
|
let write = DmlWrite::new(tables, Default::default());
|
||||||
server.write(&db_name, write).await.unwrap();
|
db.store_write(&write).unwrap();
|
||||||
|
|
||||||
// get chunk ID
|
// get chunk ID
|
||||||
let db = server.db(&db_name).unwrap();
|
|
||||||
let chunks = db.chunk_summaries().unwrap();
|
let chunks = db.chunk_summaries().unwrap();
|
||||||
assert_eq!(chunks.len(), 1);
|
assert_eq!(chunks.len(), 1);
|
||||||
let chunk_id = chunks[0].id;
|
let chunk_id = chunks[0].id;
|
||||||
|
@ -2191,39 +1806,6 @@ mod tests {
|
||||||
server.join().await.unwrap();
|
server.join().await.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn hard_buffer_limit() {
|
|
||||||
let server = make_server(make_application());
|
|
||||||
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
|
||||||
server.wait_for_init().await.unwrap();
|
|
||||||
|
|
||||||
let name = DatabaseName::new("foo").unwrap();
|
|
||||||
server
|
|
||||||
.create_database(default_rules(name.clone()))
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let db = server.db(&name).unwrap();
|
|
||||||
|
|
||||||
let mut rules: DatabaseRules = db.rules().as_ref().clone();
|
|
||||||
|
|
||||||
rules.lifecycle_rules.buffer_size_hard = Some(std::num::NonZeroUsize::new(10).unwrap());
|
|
||||||
|
|
||||||
let rules = Arc::new(rules);
|
|
||||||
db.update_rules(Arc::clone(&rules));
|
|
||||||
|
|
||||||
// inserting first line does not trigger hard buffer limit
|
|
||||||
let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
|
|
||||||
let write = DmlWrite::new(tables, Default::default());
|
|
||||||
server.write(&name, write).await.unwrap();
|
|
||||||
|
|
||||||
// inserting second line will
|
|
||||||
let tables = lines_to_batches("cpu bar=2 20", 0).unwrap();
|
|
||||||
let write = DmlWrite::new(tables, Default::default());
|
|
||||||
let res = server.write(&name, write).await.unwrap_err();
|
|
||||||
assert!(matches!(res, super::Error::HardLimitReached {}));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn cannot_create_db_until_server_is_initialized() {
|
async fn cannot_create_db_until_server_is_initialized() {
|
||||||
let server = make_server(make_application());
|
let server = make_server(make_application());
|
||||||
|
@ -2372,9 +1954,13 @@ mod tests {
|
||||||
// can only write to successfully created DBs
|
// can only write to successfully created DBs
|
||||||
let tables = lines_to_batches("cpu foo=1 10", 0).unwrap();
|
let tables = lines_to_batches("cpu foo=1 10", 0).unwrap();
|
||||||
let write = DmlWrite::new(tables, Default::default());
|
let write = DmlWrite::new(tables, Default::default());
|
||||||
server.write(&foo_db_name, write.clone()).await.unwrap();
|
server
|
||||||
|
.db(&foo_db_name)
|
||||||
|
.unwrap()
|
||||||
|
.store_write(&write)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
let err = server.write(&bar_db_name, write).await.unwrap_err();
|
let err = server.db(&bar_db_name).unwrap_err();
|
||||||
assert!(matches!(err, Error::DatabaseNotInitialized { .. }));
|
assert!(matches!(err, Error::DatabaseNotInitialized { .. }));
|
||||||
|
|
||||||
// creating failed DBs does not work
|
// creating failed DBs does not work
|
||||||
|
@ -2766,10 +2352,10 @@ mod tests {
|
||||||
);
|
);
|
||||||
assert!(database.init_error().is_none());
|
assert!(database.init_error().is_none());
|
||||||
|
|
||||||
assert!(server.db(&db_name_catalog_broken).is_ok());
|
let db = server.db(&db_name_catalog_broken).unwrap();
|
||||||
let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
|
let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
|
||||||
let write = DmlWrite::new(tables, Default::default());
|
let write = DmlWrite::new(tables, Default::default());
|
||||||
server.write(&db_name_catalog_broken, write).await.unwrap();
|
db.store_write(&write).unwrap();
|
||||||
|
|
||||||
// 5. cannot wipe if DB was just created
|
// 5. cannot wipe if DB was just created
|
||||||
let created = server
|
let created = server
|
||||||
|
@ -2792,56 +2378,6 @@ mod tests {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn write_buffer_errors_propagate() {
|
|
||||||
let application = make_application();
|
|
||||||
|
|
||||||
application
|
|
||||||
.write_buffer_factory()
|
|
||||||
.register_always_fail_mock("my_mock".to_string());
|
|
||||||
|
|
||||||
let server = make_server(application);
|
|
||||||
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
|
||||||
server.wait_for_init().await.unwrap();
|
|
||||||
|
|
||||||
let db_name = DatabaseName::new("my_db").unwrap();
|
|
||||||
let rules = DatabaseRules {
|
|
||||||
name: db_name.clone(),
|
|
||||||
partition_template: PartitionTemplate {
|
|
||||||
parts: vec![TemplatePart::TimeFormat("YYYY-MM".to_string())],
|
|
||||||
},
|
|
||||||
lifecycle_rules: Default::default(),
|
|
||||||
routing_rules: None,
|
|
||||||
worker_cleanup_avg_sleep: Duration::from_secs(2),
|
|
||||||
write_buffer_connection: Some(WriteBufferConnection {
|
|
||||||
direction: WriteBufferDirection::Write,
|
|
||||||
type_: "mock".to_string(),
|
|
||||||
connection: "my_mock".to_string(),
|
|
||||||
..Default::default()
|
|
||||||
}),
|
|
||||||
};
|
|
||||||
server
|
|
||||||
.create_database(make_provided_rules(rules))
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
|
|
||||||
let write = DmlWrite::new(tables, Default::default());
|
|
||||||
assert_error!(
|
|
||||||
server.write(&db_name, write).await,
|
|
||||||
Error::WriteBuffer { .. },
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
// run a sql query against the database, returning the results as record batches
|
|
||||||
async fn run_query(db: Arc<Db>, query: &str) -> Vec<RecordBatch> {
|
|
||||||
let planner = SqlQueryPlanner::default();
|
|
||||||
let ctx = db.new_query_context(None);
|
|
||||||
|
|
||||||
let physical_plan = planner.query(query, &ctx).await.unwrap();
|
|
||||||
ctx.collect(physical_plan).await.unwrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn default_rules(db_name: DatabaseName<'static>) -> ProvidedDatabaseRules {
|
fn default_rules(db_name: DatabaseName<'static>) -> ProvidedDatabaseRules {
|
||||||
make_provided_rules(DatabaseRules::new(db_name))
|
make_provided_rules(DatabaseRules::new(db_name))
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,91 +0,0 @@
|
||||||
use data_types::server_id::ServerId;
|
|
||||||
use std::collections::BTreeMap;
|
|
||||||
|
|
||||||
/// A RemoteTemplate string is a remote connection template string.
|
|
||||||
/// Occurrences of the substring "{id}" in the template will be replaced
|
|
||||||
/// by the server ID.
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct RemoteTemplate {
|
|
||||||
template: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl RemoteTemplate {
|
|
||||||
pub fn new(template: impl Into<String>) -> Self {
|
|
||||||
let template = template.into();
|
|
||||||
Self { template }
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get(&self, id: &ServerId) -> GrpcConnectionString {
|
|
||||||
self.template.replace("{id}", &format!("{}", id.get_u32()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A gRPC connection string.
|
|
||||||
pub type GrpcConnectionString = String;
|
|
||||||
|
|
||||||
/// The Resolver provides a mapping between ServerId and GRpcConnectionString
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct Resolver {
|
|
||||||
/// Map between remote IOx server IDs and management API connection strings.
|
|
||||||
remotes: BTreeMap<ServerId, GrpcConnectionString>,
|
|
||||||
|
|
||||||
/// Static map between remote server IDs and hostnames based on a template
|
|
||||||
remote_template: Option<RemoteTemplate>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Resolver {
|
|
||||||
pub fn new(remote_template: Option<RemoteTemplate>) -> Self {
|
|
||||||
Self {
|
|
||||||
remotes: Default::default(),
|
|
||||||
remote_template,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get all registered remote servers.
|
|
||||||
pub fn remotes_sorted(&self) -> Vec<(ServerId, String)> {
|
|
||||||
self.remotes.iter().map(|(&a, b)| (a, b.clone())).collect()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Update given remote server.
|
|
||||||
pub fn update_remote(&mut self, id: ServerId, addr: GrpcConnectionString) {
|
|
||||||
self.remotes.insert(id, addr);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Delete remote server by ID.
|
|
||||||
pub fn delete_remote(&mut self, id: ServerId) -> Option<GrpcConnectionString> {
|
|
||||||
self.remotes.remove(&id)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get remote server by ID.
|
|
||||||
pub fn resolve_remote(&self, id: ServerId) -> Option<GrpcConnectionString> {
|
|
||||||
self.remotes
|
|
||||||
.get(&id)
|
|
||||||
.cloned()
|
|
||||||
.or_else(|| self.remote_template.as_ref().map(|t| t.get(&id)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
use std::num::NonZeroU32;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn resolve_remote() {
|
|
||||||
let resolver = Resolver::new(Some(RemoteTemplate::new("http://iox-query-{id}:8082")));
|
|
||||||
|
|
||||||
let server_id = ServerId::new(NonZeroU32::new(42).unwrap());
|
|
||||||
let remote = resolver.resolve_remote(server_id);
|
|
||||||
assert_eq!(
|
|
||||||
remote,
|
|
||||||
Some(GrpcConnectionString::from("http://iox-query-42:8082"))
|
|
||||||
);
|
|
||||||
|
|
||||||
let server_id = ServerId::new(NonZeroU32::new(24).unwrap());
|
|
||||||
let remote = resolver.resolve_remote(server_id);
|
|
||||||
assert_eq!(
|
|
||||||
remote,
|
|
||||||
Some(GrpcConnectionString::from("http://iox-query-24:8082"))
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -23,7 +23,6 @@ use query::frontend::sql::SqlQueryPlanner;
|
||||||
use regex::Regex;
|
use regex::Regex;
|
||||||
use router::router::Router;
|
use router::router::Router;
|
||||||
use router::server::RouterServer;
|
use router::server::RouterServer;
|
||||||
use server::connection::test_helpers::TestConnectionManager;
|
|
||||||
use server::rules::ProvidedDatabaseRules;
|
use server::rules::ProvidedDatabaseRules;
|
||||||
use server::test_utils::{make_application, make_initialized_server};
|
use server::test_utils::{make_application, make_initialized_server};
|
||||||
use server::{Db, Server};
|
use server::{Db, Server};
|
||||||
|
@ -44,7 +43,7 @@ use write_buffer::mock::MockBufferSharedState;
|
||||||
struct DistributedTest {
|
struct DistributedTest {
|
||||||
router: Arc<Router>,
|
router: Arc<Router>,
|
||||||
|
|
||||||
consumer: Arc<Server<TestConnectionManager>>,
|
consumer: Arc<Server>,
|
||||||
consumer_db: Arc<Db>,
|
consumer_db: Arc<Db>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue