Merge pull request #3207 from influxdata/crepererum/remove_routing_from_database_mode_3
refactor: remove write routing logic from `Server`
pull/24376/head
commit
6d9d84ff29
|
@ -181,17 +181,18 @@ where
|
|||
) -> Result<(), InnerDmlError> {
|
||||
match op {
|
||||
DmlOperation::Write(write) => {
|
||||
self.server
|
||||
.write(db_name, write)
|
||||
let database = self.server.active_database(db_name).map_err(|_| {
|
||||
InnerDmlError::DatabaseNotFound {
|
||||
db_name: db_name.to_string(),
|
||||
}
|
||||
})?;
|
||||
|
||||
database
|
||||
.route_operation(&DmlOperation::Write(write))
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
server::Error::DatabaseNotFound { .. } => InnerDmlError::DatabaseNotFound {
|
||||
db_name: db_name.to_string(),
|
||||
},
|
||||
e => InnerDmlError::InternalError {
|
||||
db_name: db_name.to_string(),
|
||||
source: Box::new(e),
|
||||
},
|
||||
.map_err(|e| InnerDmlError::InternalError {
|
||||
db_name: db_name.to_string(),
|
||||
source: Box::new(e),
|
||||
})
|
||||
}
|
||||
DmlOperation::Delete(delete) => {
|
||||
|
|
|
@ -41,39 +41,10 @@ pub fn default_server_error_handler(error: server::Error) -> tonic::Status {
|
|||
description: source.to_string(),
|
||||
}
|
||||
.into(),
|
||||
Error::HardLimitReached {} => QuotaFailure {
|
||||
subject: "influxdata.com/iox/buffer".to_string(),
|
||||
description: "hard buffer limit reached".to_string(),
|
||||
}
|
||||
.into(),
|
||||
source @ Error::WritingOnlyAllowedThroughWriteBuffer { .. }
|
||||
| source @ Error::ShardWrite { .. } => {
|
||||
tonic::Status::failed_precondition(source.to_string())
|
||||
}
|
||||
Error::NoRemoteConfigured { node_group } => NotFound {
|
||||
resource_type: "remote".to_string(),
|
||||
resource_name: format!("{:?}", node_group),
|
||||
..Default::default()
|
||||
}
|
||||
.into(),
|
||||
Error::RemoteError { source } => tonic::Status::unavailable(source.to_string()),
|
||||
Error::WipePreservedCatalog { source } => default_database_error_handler(source),
|
||||
Error::DeleteExpression {
|
||||
start_time,
|
||||
stop_time,
|
||||
predicate,
|
||||
} => FieldViolation {
|
||||
field: format!(
|
||||
"time range: [{}, {}], predicate: {}",
|
||||
start_time, stop_time, predicate
|
||||
),
|
||||
description: "Invalid time range or predicate".to_string(),
|
||||
}
|
||||
.into(),
|
||||
Error::DatabaseInit { source } => {
|
||||
tonic::Status::invalid_argument(format!("Cannot initialize database: {}", source))
|
||||
}
|
||||
Error::StoreWriteErrors { .. } => tonic::Status::invalid_argument(error.to_string()),
|
||||
Error::DatabaseAlreadyExists { .. } | Error::DatabaseAlreadyOwnedByThisServer { .. } => {
|
||||
tonic::Status::already_exists(error.to_string())
|
||||
}
|
||||
|
@ -185,9 +156,42 @@ pub fn default_db_error_handler(error: server::db::Error) -> tonic::Status {
|
|||
}
|
||||
.into(),
|
||||
Error::CatalogError { source } => default_catalog_error_handler(source),
|
||||
Error::HardLimitReached {} => QuotaFailure {
|
||||
subject: "influxdata.com/iox/buffer".to_string(),
|
||||
description: "hard buffer limit reached".to_string(),
|
||||
}
|
||||
.into(),
|
||||
Error::StoreWriteErrors { .. } => tonic::Status::invalid_argument(error.to_string()),
|
||||
error => {
|
||||
error!(?error, "Unexpected error");
|
||||
InternalError {}.into()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// map common [`database::WriteError`](server::database::WriteError) errors to the appropriate tonic Status
|
||||
pub fn default_database_write_error_handler(error: server::database::WriteError) -> tonic::Status {
|
||||
use server::database::WriteError;
|
||||
|
||||
match error {
|
||||
WriteError::HardLimitReached {} => QuotaFailure {
|
||||
subject: "influxdata.com/iox/buffer".to_string(),
|
||||
description: "hard buffer limit reached".to_string(),
|
||||
}
|
||||
.into(),
|
||||
WriteError::DbError { source } => default_db_error_handler(source),
|
||||
source @ WriteError::WritingOnlyAllowedThroughWriteBuffer => {
|
||||
tonic::Status::failed_precondition(source.to_string())
|
||||
}
|
||||
WriteError::NotInitialized { state } => {
|
||||
tonic::Status::unavailable(format!("Database is not yet initialized: {}", state))
|
||||
}
|
||||
error @ WriteError::StoreWriteErrors { .. } => {
|
||||
tonic::Status::invalid_argument(error.to_string())
|
||||
}
|
||||
error => {
|
||||
error!(?error, "Unexpected write error");
|
||||
InternalError {}.into()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,12 +1,12 @@
|
|||
use data_types::DatabaseName;
|
||||
use dml::{DmlMeta, DmlWrite};
|
||||
use dml::{DmlMeta, DmlOperation, DmlWrite};
|
||||
use generated_types::google::{FieldViolation, FieldViolationExt};
|
||||
use generated_types::influxdata::pbdata::v1::*;
|
||||
use server::{connection::ConnectionManager, Server};
|
||||
use std::fmt::Debug;
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::error::default_server_error_handler;
|
||||
use super::error::{default_database_write_error_handler, default_server_error_handler};
|
||||
|
||||
struct PBWriteService<M: ConnectionManager> {
|
||||
server: Arc<Server<M>>,
|
||||
|
@ -27,9 +27,6 @@ where
|
|||
.database_batch
|
||||
.ok_or_else(|| FieldViolation::required("database_batch"))?;
|
||||
|
||||
let db_name = DatabaseName::new(&database_batch.database_name)
|
||||
.scope("database_batch.database_name")?;
|
||||
|
||||
let tables =
|
||||
mutable_batch_pb::decode::decode_database_batch(&database_batch).map_err(|e| {
|
||||
FieldViolation {
|
||||
|
@ -40,11 +37,18 @@ where
|
|||
|
||||
let write = DmlWrite::new(tables, DmlMeta::unsequenced(span_ctx));
|
||||
|
||||
self.server
|
||||
.write(&db_name, write)
|
||||
.await
|
||||
let db_name = DatabaseName::new(&database_batch.database_name)
|
||||
.scope("database_batch.database_name")?;
|
||||
let database = self
|
||||
.server
|
||||
.active_database(&db_name)
|
||||
.map_err(default_server_error_handler)?;
|
||||
|
||||
database
|
||||
.route_operation(&DmlOperation::Write(write))
|
||||
.await
|
||||
.map_err(default_database_write_error_handler)?;
|
||||
|
||||
Ok(tonic::Response::new(WriteResponse {}))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -151,13 +151,7 @@ async fn cant_write_to_db_reading_from_write_buffer() {
|
|||
.await
|
||||
.expect_err("expected write to fail");
|
||||
|
||||
assert_contains!(
|
||||
err.to_string(),
|
||||
format!(
|
||||
r#"Cannot write to database {}, it's configured to only read from the write buffer"#,
|
||||
db_name
|
||||
)
|
||||
);
|
||||
assert_contains!(err.to_string(), "only allowed through write buffer");
|
||||
assert!(matches!(dbg!(err), WriteError::ServerError(_)));
|
||||
}
|
||||
|
||||
|
|
|
@ -973,7 +973,7 @@ impl Db {
|
|||
/// Stores the write on this [`Db`] and/or routes it to the write buffer
|
||||
///
|
||||
/// TODO: Remove this method (#2243)
|
||||
pub async fn route_operation(&self, operation: &DmlOperation) -> Result<()> {
|
||||
pub(crate) async fn route_operation(&self, operation: &DmlOperation) -> Result<()> {
|
||||
let immutable = {
|
||||
let rules = self.rules.read();
|
||||
rules.lifecycle_rules.immutable
|
||||
|
@ -1023,12 +1023,12 @@ impl Db {
|
|||
}
|
||||
|
||||
/// Writes the provided [`DmlWrite`] to this database
|
||||
pub fn store_write(&self, db_write: &DmlWrite) -> Result<()> {
|
||||
pub(crate) fn store_write(&self, db_write: &DmlWrite) -> Result<()> {
|
||||
self.store_filtered_write(db_write, WriteFilterNone::default())
|
||||
}
|
||||
|
||||
/// Writes the provided [`DmlWrite`] to this database with the provided [`WriteFilter`]
|
||||
pub fn store_filtered_write(
|
||||
pub(crate) fn store_filtered_write(
|
||||
&self,
|
||||
db_write: &DmlWrite,
|
||||
filter: impl WriteFilter,
|
||||
|
|
|
@ -70,10 +70,9 @@
|
|||
|
||||
use ::lifecycle::{LockableChunk, LockablePartition};
|
||||
use async_trait::async_trait;
|
||||
use connection::{ConnectionManager, RemoteServer};
|
||||
use connection::ConnectionManager;
|
||||
use data_types::{
|
||||
chunk_metadata::ChunkId,
|
||||
database_rules::{NodeGroup, RoutingRules, Sink},
|
||||
detailed_database::ActiveDatabase,
|
||||
error::ErrorLogger,
|
||||
job::Job,
|
||||
|
@ -88,7 +87,6 @@ use internal_types::freezable::Freezable;
|
|||
use iox_object_store::IoxObjectStore;
|
||||
use observability_deps::tracing::{error, info, warn};
|
||||
use parking_lot::RwLock;
|
||||
use rand::seq::SliceRandom;
|
||||
use resolver::Resolver;
|
||||
use snafu::{ensure, OptionExt, ResultExt, Snafu};
|
||||
use std::sync::Arc;
|
||||
|
@ -99,7 +97,6 @@ use uuid::Uuid;
|
|||
|
||||
pub use application::ApplicationState;
|
||||
pub use db::Db;
|
||||
use dml::{DmlOperation, DmlWrite};
|
||||
pub use job::JobRegistry;
|
||||
pub use resolver::{GrpcConnectionString, RemoteTemplate};
|
||||
|
||||
|
@ -182,9 +179,6 @@ pub enum Error {
|
|||
current: Uuid,
|
||||
},
|
||||
|
||||
#[snafu(display("Server error: {}", source))]
|
||||
ServerError { source: std::io::Error },
|
||||
|
||||
#[snafu(display("invalid database: {}", source))]
|
||||
InvalidDatabaseName { source: DatabaseNameError },
|
||||
|
||||
|
@ -209,61 +203,11 @@ pub enum Error {
|
|||
table: String,
|
||||
},
|
||||
|
||||
#[snafu(display("hard buffer limit reached"))]
|
||||
HardLimitReached {},
|
||||
|
||||
#[snafu(display(
|
||||
"Storing database write failed with the following error(s), and possibly more: {}",
|
||||
errors.iter().map(ToString::to_string).collect::<Vec<_>>().join(", ")
|
||||
))]
|
||||
StoreWriteErrors { errors: Vec<DatabaseError> },
|
||||
|
||||
#[snafu(display(
|
||||
"Cannot write to database {}, it's configured to only read from the write buffer",
|
||||
db_name
|
||||
))]
|
||||
WritingOnlyAllowedThroughWriteBuffer { db_name: String },
|
||||
|
||||
#[snafu(display("Cannot write to write buffer: {}", source))]
|
||||
WriteBuffer {
|
||||
source: Box<dyn std::error::Error + Sync + Send>,
|
||||
},
|
||||
|
||||
#[snafu(display("no remote configured for node group: {:?}", node_group))]
|
||||
NoRemoteConfigured { node_group: NodeGroup },
|
||||
|
||||
#[snafu(display("all remotes failed connecting: {:?}", errors))]
|
||||
NoRemoteReachable {
|
||||
errors: HashMap<GrpcConnectionString, connection::ConnectionManagerError>,
|
||||
},
|
||||
|
||||
#[snafu(display("remote error: {}", source))]
|
||||
RemoteError {
|
||||
source: connection::ConnectionManagerError,
|
||||
},
|
||||
|
||||
#[snafu(display("database failed to initialize: {}", source))]
|
||||
DatabaseInit { source: Arc<database::InitError> },
|
||||
|
||||
#[snafu(display(
|
||||
"Either invalid time range [{}, {}] or invalid delete expression {}",
|
||||
start_time,
|
||||
stop_time,
|
||||
predicate
|
||||
))]
|
||||
DeleteExpression {
|
||||
start_time: String,
|
||||
stop_time: String,
|
||||
predicate: String,
|
||||
},
|
||||
|
||||
#[snafu(display("error persisting server config to object storage: {}", source))]
|
||||
PersistServerConfig { source: object_store::Error },
|
||||
|
||||
#[snafu(display("Error sharding write: {}", source))]
|
||||
ShardWrite {
|
||||
source: data_types::database_rules::Error,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
@ -901,141 +845,6 @@ where
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// `write_lines` takes in raw line protocol and converts it to a collection
|
||||
/// of ShardedEntry which are then sent to other IOx servers based on
|
||||
/// the ShardConfig or sent to the local database for buffering in the
|
||||
/// WriteBuffer and/or the MutableBuffer if configured.
|
||||
///
|
||||
/// The provided `default_time` is nanoseconds since the epoch and will be assigned
|
||||
/// to any lines that don't have a timestamp.
|
||||
///
|
||||
/// TODO: Replace with dedicated router in terms of MutableBatch
|
||||
pub async fn write(&self, db_name: &DatabaseName<'_>, write: DmlWrite) -> Result<()> {
|
||||
let db = self.db(db_name)?;
|
||||
let rules = db.rules();
|
||||
|
||||
let sharded_writes = match &rules.routing_rules {
|
||||
Some(RoutingRules::ShardConfig(shard_config)) => {
|
||||
let sharded_writes = write.shard(shard_config).context(ShardWrite)?;
|
||||
itertools::Either::Left(sharded_writes.into_iter().map(|(s, w)| (Some(s), w)))
|
||||
}
|
||||
_ => itertools::Either::Right(std::iter::once((None, write))),
|
||||
};
|
||||
|
||||
// Write to all shards in parallel; as soon as one fails return error
|
||||
// immediately to the client and abort all other outstanding requests.
|
||||
futures_util::future::try_join_all(sharded_writes.map(|(shard, write)| {
|
||||
let sink = match &rules.routing_rules {
|
||||
Some(RoutingRules::ShardConfig(shard_config)) => {
|
||||
let id = shard.expect("sharded entry");
|
||||
Some(shard_config.shards.get(&id).expect("valid shard"))
|
||||
}
|
||||
Some(RoutingRules::RoutingConfig(config)) => Some(&config.sink),
|
||||
None => None,
|
||||
};
|
||||
|
||||
async move {
|
||||
match sink {
|
||||
Some(sink) => self.write_sink(db_name, sink, write).await,
|
||||
None => self.write_local(db_name, &DmlOperation::Write(write)).await,
|
||||
}
|
||||
}
|
||||
}))
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn write_sink(
|
||||
&self,
|
||||
db_name: &DatabaseName<'_>,
|
||||
sink: &Sink,
|
||||
write: DmlWrite,
|
||||
) -> Result<()> {
|
||||
match sink {
|
||||
Sink::Iox(node_group) => self.write_downstream(db_name, node_group, &write).await,
|
||||
Sink::Kafka(_) => {
|
||||
// The write buffer write path is currently implemented in "db", so confusingly we
|
||||
// need to invoke write_local.
|
||||
// TODO(mkm): tracked in #2134
|
||||
self.write_local(db_name, &DmlOperation::Write(write)).await
|
||||
}
|
||||
Sink::DevNull => {
|
||||
// write is silently ignored, as requested by the configuration.
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn write_downstream(
|
||||
&self,
|
||||
db_name: &str,
|
||||
node_group: &[ServerId],
|
||||
write: &DmlWrite,
|
||||
) -> Result<()> {
|
||||
// Return an error if this server is not yet ready
|
||||
self.shared.state.read().initialized()?;
|
||||
|
||||
let addrs: Vec<_> = {
|
||||
let resolver = self.resolver.read();
|
||||
node_group
|
||||
.iter()
|
||||
.filter_map(|&node| resolver.resolve_remote(node))
|
||||
.collect()
|
||||
};
|
||||
|
||||
if addrs.is_empty() {
|
||||
return NoRemoteConfigured { node_group }.fail();
|
||||
}
|
||||
|
||||
let mut errors = HashMap::new();
|
||||
// this needs to be in its own statement because rand::thread_rng is not Send and the loop below is async.
|
||||
// braces around the expression would work but clippy doesn't know that and complains that the braces are useless.
|
||||
let random_addrs_iter = addrs.choose_multiple(&mut rand::thread_rng(), addrs.len());
|
||||
for addr in random_addrs_iter {
|
||||
match self.connection_manager.remote_server(addr).await {
|
||||
Err(err) => {
|
||||
info!("error obtaining remote for {}: {}", addr, err);
|
||||
errors.insert(addr.to_owned(), err);
|
||||
}
|
||||
Ok(remote) => return remote.write(db_name, write).await.context(RemoteError),
|
||||
};
|
||||
}
|
||||
NoRemoteReachable { errors }.fail()
|
||||
}
|
||||
|
||||
/// Write an entry to the local `Db`
|
||||
///
|
||||
/// TODO: Remove this and migrate callers to `Database::route_operation`
|
||||
async fn write_local(
|
||||
&self,
|
||||
db_name: &DatabaseName<'_>,
|
||||
operation: &DmlOperation,
|
||||
) -> Result<()> {
|
||||
use database::WriteError;
|
||||
|
||||
self.active_database(db_name)?
|
||||
.route_operation(operation)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
WriteError::NotInitialized { .. } => Error::DatabaseNotInitialized {
|
||||
db_name: db_name.to_string(),
|
||||
},
|
||||
WriteError::WriteBuffer { source } => Error::WriteBuffer { source },
|
||||
WriteError::WritingOnlyAllowedThroughWriteBuffer => {
|
||||
Error::WritingOnlyAllowedThroughWriteBuffer {
|
||||
db_name: db_name.to_string(),
|
||||
}
|
||||
}
|
||||
WriteError::DbError { source } => Error::UnknownDatabaseError {
|
||||
source: Box::new(source),
|
||||
},
|
||||
WriteError::HardLimitReached { .. } => Error::HardLimitReached {},
|
||||
WriteError::StoreWriteErrors { errors } => Error::StoreWriteErrors {
|
||||
errors: errors.into_iter().map(|e| Box::new(e) as _).collect(),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
/// Update database rules and save on success.
|
||||
pub async fn update_db_rules(
|
||||
&self,
|
||||
|
@ -1450,18 +1259,13 @@ mod tests {
|
|||
test_utils::{make_application, make_server},
|
||||
*,
|
||||
};
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use arrow_util::assert_batches_eq;
|
||||
use bytes::Bytes;
|
||||
use connection::test_helpers::{TestConnectionManager, TestRemoteServer};
|
||||
use data_types::{
|
||||
chunk_metadata::{ChunkAddr, ChunkStorage},
|
||||
database_rules::{
|
||||
DatabaseRules, HashRing, LifecycleRules, PartitionTemplate, ShardConfig, ShardId,
|
||||
TemplatePart,
|
||||
},
|
||||
database_rules::{DatabaseRules, LifecycleRules, PartitionTemplate, TemplatePart},
|
||||
write_buffer::{WriteBufferConnection, WriteBufferDirection},
|
||||
};
|
||||
use dml::DmlWrite;
|
||||
use iox_object_store::IoxObjectStore;
|
||||
use mutable_batch_lp::lines_to_batches;
|
||||
use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi};
|
||||
|
@ -1469,13 +1273,10 @@ mod tests {
|
|||
core::{PreservedCatalog, PreservedCatalogConfig},
|
||||
test_helpers::{load_ok, new_empty},
|
||||
};
|
||||
use query::{exec::ExecutionContextProvider, frontend::sql::SqlQueryPlanner, QueryDatabase};
|
||||
use query::QueryDatabase;
|
||||
use std::{
|
||||
convert::TryFrom,
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
},
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use test_helpers::{assert_contains, assert_error};
|
||||
|
@ -1484,12 +1285,7 @@ mod tests {
|
|||
async fn server_api_calls_return_error_with_no_id_set() {
|
||||
let server = make_server(make_application());
|
||||
|
||||
let tables = lines_to_batches("cpu foo=1 10", 0).unwrap();
|
||||
let write = DmlWrite::new(tables, Default::default());
|
||||
let resp = server
|
||||
.write(&DatabaseName::new("foo").unwrap(), write)
|
||||
.await
|
||||
.unwrap_err();
|
||||
let resp = server.db(&DatabaseName::new("foo").unwrap()).unwrap_err();
|
||||
assert!(matches!(resp, Error::IdNotSet));
|
||||
}
|
||||
|
||||
|
@ -1984,136 +1780,6 @@ mod tests {
|
|||
assert_eq!(names, db_names_sorted);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn writes_local() {
|
||||
let server = make_server(make_application());
|
||||
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||
server.wait_for_init().await.unwrap();
|
||||
|
||||
let db_name = DatabaseName::new("foo".to_string()).unwrap();
|
||||
server
|
||||
.create_database(default_rules(db_name.clone()))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
|
||||
let write = DmlWrite::new(tables, Default::default());
|
||||
server.write(&db_name, write).await.unwrap();
|
||||
|
||||
let db_name = DatabaseName::new("foo").unwrap();
|
||||
let db = server.db(&db_name).unwrap();
|
||||
let batches = run_query(db, "select * from cpu").await;
|
||||
|
||||
let expected = vec![
|
||||
"+-----+--------------------------------+",
|
||||
"| bar | time |",
|
||||
"+-----+--------------------------------+",
|
||||
"| 1 | 1970-01-01T00:00:00.000000010Z |",
|
||||
"+-----+--------------------------------+",
|
||||
];
|
||||
assert_batches_eq!(expected, &batches);
|
||||
}
|
||||
|
||||
// This test sets up a database with a sharding config which defines exactly one shard
|
||||
// backed by 3 remote nodes. One of the nodes is modeled to be "down", while the other two
|
||||
// can record write entry events.
|
||||
// This test goes through a few trivial error cases before checking that both working
|
||||
// mock remote servers actually receive write entry events.
|
||||
//
|
||||
// This test is theoretically flaky, low probability though (in the order of 1e-30)
|
||||
#[tokio::test]
|
||||
async fn write_entry_downstream() {
|
||||
const TEST_SHARD_ID: ShardId = 1;
|
||||
const GOOD_REMOTE_ADDR_1: &str = "http://localhost:111";
|
||||
const GOOD_REMOTE_ADDR_2: &str = "http://localhost:222";
|
||||
const BAD_REMOTE_ADDR: &str = "http://localhost:666";
|
||||
|
||||
let good_remote_id_1 = ServerId::try_from(1).unwrap();
|
||||
let good_remote_id_2 = ServerId::try_from(2).unwrap();
|
||||
let bad_remote_id = ServerId::try_from(666).unwrap();
|
||||
|
||||
let mut manager = TestConnectionManager::new();
|
||||
let written_1 = Arc::new(AtomicBool::new(false));
|
||||
manager.remotes.insert(
|
||||
GOOD_REMOTE_ADDR_1.to_owned(),
|
||||
Arc::new(TestRemoteServer {
|
||||
written: Arc::clone(&written_1),
|
||||
}),
|
||||
);
|
||||
let written_2 = Arc::new(AtomicBool::new(false));
|
||||
manager.remotes.insert(
|
||||
GOOD_REMOTE_ADDR_2.to_owned(),
|
||||
Arc::new(TestRemoteServer {
|
||||
written: Arc::clone(&written_2),
|
||||
}),
|
||||
);
|
||||
|
||||
let server = Server::new(manager, make_application(), Default::default());
|
||||
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||
server.wait_for_init().await.unwrap();
|
||||
|
||||
let db_name = DatabaseName::new("foo").unwrap();
|
||||
server
|
||||
.create_database(default_rules(db_name.clone()))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let remote_ids = vec![bad_remote_id, good_remote_id_1, good_remote_id_2];
|
||||
let db = server.db(&db_name).unwrap();
|
||||
|
||||
let shard_config = ShardConfig {
|
||||
hash_ring: Some(HashRing {
|
||||
shards: vec![TEST_SHARD_ID].into(),
|
||||
}),
|
||||
shards: vec![(TEST_SHARD_ID, Sink::Iox(remote_ids.clone()))]
|
||||
.into_iter()
|
||||
.collect(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let mut rules = db.rules().as_ref().clone();
|
||||
rules.routing_rules = Some(RoutingRules::ShardConfig(shard_config));
|
||||
let rules = Arc::new(rules);
|
||||
|
||||
db.update_rules(rules);
|
||||
|
||||
let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
|
||||
let write = DmlWrite::new(tables, Default::default());
|
||||
let err = server.write(&db_name, write.clone()).await.unwrap_err();
|
||||
assert!(
|
||||
matches!(err, Error::NoRemoteConfigured { node_group } if node_group == remote_ids)
|
||||
);
|
||||
|
||||
// one remote is configured but it's down and we'll get connection error
|
||||
server.update_remote(bad_remote_id, BAD_REMOTE_ADDR.into());
|
||||
let err = server.write(&db_name, write.clone()).await.unwrap_err();
|
||||
assert!(matches!(
|
||||
err,
|
||||
Error::NoRemoteReachable { errors } if matches!(
|
||||
errors[BAD_REMOTE_ADDR],
|
||||
connection::ConnectionManagerError::RemoteServerConnectError {..}
|
||||
)
|
||||
));
|
||||
assert!(!written_1.load(Ordering::Relaxed));
|
||||
assert!(!written_2.load(Ordering::Relaxed));
|
||||
|
||||
// We configure the address for the other remote, this time connection will succeed
|
||||
// despite the bad remote failing to connect.
|
||||
server.update_remote(good_remote_id_1, GOOD_REMOTE_ADDR_1.into());
|
||||
server.update_remote(good_remote_id_2, GOOD_REMOTE_ADDR_2.into());
|
||||
|
||||
// Remotes are tried in random order, so we need to repeat the test a few times to have a reasonable
|
||||
// probability both the remotes will get hit.
|
||||
for _ in 0..100 {
|
||||
server
|
||||
.write(&db_name, write.clone())
|
||||
.await
|
||||
.expect("cannot write lines");
|
||||
}
|
||||
assert!(written_1.load(Ordering::Relaxed));
|
||||
assert!(written_2.load(Ordering::Relaxed));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn close_chunk() {
|
||||
test_helpers::maybe_start_logging();
|
||||
|
@ -2129,11 +1795,11 @@ mod tests {
|
|||
.unwrap();
|
||||
|
||||
let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
|
||||
let db = server.db(&db_name).unwrap();
|
||||
let write = DmlWrite::new(tables, Default::default());
|
||||
server.write(&db_name, write).await.unwrap();
|
||||
db.store_write(&write).unwrap();
|
||||
|
||||
// get chunk ID
|
||||
let db = server.db(&db_name).unwrap();
|
||||
let chunks = db.chunk_summaries().unwrap();
|
||||
assert_eq!(chunks.len(), 1);
|
||||
let chunk_id = chunks[0].id;
|
||||
|
@ -2191,39 +1857,6 @@ mod tests {
|
|||
server.join().await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn hard_buffer_limit() {
|
||||
let server = make_server(make_application());
|
||||
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||
server.wait_for_init().await.unwrap();
|
||||
|
||||
let name = DatabaseName::new("foo").unwrap();
|
||||
server
|
||||
.create_database(default_rules(name.clone()))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let db = server.db(&name).unwrap();
|
||||
|
||||
let mut rules: DatabaseRules = db.rules().as_ref().clone();
|
||||
|
||||
rules.lifecycle_rules.buffer_size_hard = Some(std::num::NonZeroUsize::new(10).unwrap());
|
||||
|
||||
let rules = Arc::new(rules);
|
||||
db.update_rules(Arc::clone(&rules));
|
||||
|
||||
// inserting first line does not trigger hard buffer limit
|
||||
let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
|
||||
let write = DmlWrite::new(tables, Default::default());
|
||||
server.write(&name, write).await.unwrap();
|
||||
|
||||
// inserting second line will
|
||||
let tables = lines_to_batches("cpu bar=2 20", 0).unwrap();
|
||||
let write = DmlWrite::new(tables, Default::default());
|
||||
let res = server.write(&name, write).await.unwrap_err();
|
||||
assert!(matches!(res, super::Error::HardLimitReached {}));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn cannot_create_db_until_server_is_initialized() {
|
||||
let server = make_server(make_application());
|
||||
|
@ -2372,9 +2005,13 @@ mod tests {
|
|||
// can only write to successfully created DBs
|
||||
let tables = lines_to_batches("cpu foo=1 10", 0).unwrap();
|
||||
let write = DmlWrite::new(tables, Default::default());
|
||||
server.write(&foo_db_name, write.clone()).await.unwrap();
|
||||
server
|
||||
.db(&foo_db_name)
|
||||
.unwrap()
|
||||
.store_write(&write)
|
||||
.unwrap();
|
||||
|
||||
let err = server.write(&bar_db_name, write).await.unwrap_err();
|
||||
let err = server.db(&bar_db_name).unwrap_err();
|
||||
assert!(matches!(err, Error::DatabaseNotInitialized { .. }));
|
||||
|
||||
// creating failed DBs does not work
|
||||
|
@ -2766,10 +2403,10 @@ mod tests {
|
|||
);
|
||||
assert!(database.init_error().is_none());
|
||||
|
||||
assert!(server.db(&db_name_catalog_broken).is_ok());
|
||||
let db = server.db(&db_name_catalog_broken).unwrap();
|
||||
let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
|
||||
let write = DmlWrite::new(tables, Default::default());
|
||||
server.write(&db_name_catalog_broken, write).await.unwrap();
|
||||
db.store_write(&write).unwrap();
|
||||
|
||||
// 5. cannot wipe if DB was just created
|
||||
let created = server
|
||||
|
@ -2792,56 +2429,6 @@ mod tests {
|
|||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn write_buffer_errors_propagate() {
|
||||
let application = make_application();
|
||||
|
||||
application
|
||||
.write_buffer_factory()
|
||||
.register_always_fail_mock("my_mock".to_string());
|
||||
|
||||
let server = make_server(application);
|
||||
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||
server.wait_for_init().await.unwrap();
|
||||
|
||||
let db_name = DatabaseName::new("my_db").unwrap();
|
||||
let rules = DatabaseRules {
|
||||
name: db_name.clone(),
|
||||
partition_template: PartitionTemplate {
|
||||
parts: vec![TemplatePart::TimeFormat("YYYY-MM".to_string())],
|
||||
},
|
||||
lifecycle_rules: Default::default(),
|
||||
routing_rules: None,
|
||||
worker_cleanup_avg_sleep: Duration::from_secs(2),
|
||||
write_buffer_connection: Some(WriteBufferConnection {
|
||||
direction: WriteBufferDirection::Write,
|
||||
type_: "mock".to_string(),
|
||||
connection: "my_mock".to_string(),
|
||||
..Default::default()
|
||||
}),
|
||||
};
|
||||
server
|
||||
.create_database(make_provided_rules(rules))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let tables = lines_to_batches("cpu bar=1 10", 0).unwrap();
|
||||
let write = DmlWrite::new(tables, Default::default());
|
||||
assert_error!(
|
||||
server.write(&db_name, write).await,
|
||||
Error::WriteBuffer { .. },
|
||||
);
|
||||
}
|
||||
|
||||
// run a sql query against the database, returning the results as record batches
|
||||
async fn run_query(db: Arc<Db>, query: &str) -> Vec<RecordBatch> {
|
||||
let planner = SqlQueryPlanner::default();
|
||||
let ctx = db.new_query_context(None);
|
||||
|
||||
let physical_plan = planner.query(query, &ctx).await.unwrap();
|
||||
ctx.collect(physical_plan).await.unwrap()
|
||||
}
|
||||
|
||||
fn default_rules(db_name: DatabaseName<'static>) -> ProvidedDatabaseRules {
|
||||
make_provided_rules(DatabaseRules::new(db_name))
|
||||
}
|
||||
|
|
|
@ -14,10 +14,6 @@ impl RemoteTemplate {
|
|||
let template = template.into();
|
||||
Self { template }
|
||||
}
|
||||
|
||||
fn get(&self, id: &ServerId) -> GrpcConnectionString {
|
||||
self.template.replace("{id}", &format!("{}", id.get_u32()))
|
||||
}
|
||||
}
|
||||
|
||||
/// A gRPC connection string.
|
||||
|
@ -55,37 +51,4 @@ impl Resolver {
|
|||
pub fn delete_remote(&mut self, id: ServerId) -> Option<GrpcConnectionString> {
|
||||
self.remotes.remove(&id)
|
||||
}
|
||||
|
||||
/// Get remote server by ID.
|
||||
pub fn resolve_remote(&self, id: ServerId) -> Option<GrpcConnectionString> {
|
||||
self.remotes
|
||||
.get(&id)
|
||||
.cloned()
|
||||
.or_else(|| self.remote_template.as_ref().map(|t| t.get(&id)))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::num::NonZeroU32;
|
||||
|
||||
#[test]
|
||||
fn resolve_remote() {
|
||||
let resolver = Resolver::new(Some(RemoteTemplate::new("http://iox-query-{id}:8082")));
|
||||
|
||||
let server_id = ServerId::new(NonZeroU32::new(42).unwrap());
|
||||
let remote = resolver.resolve_remote(server_id);
|
||||
assert_eq!(
|
||||
remote,
|
||||
Some(GrpcConnectionString::from("http://iox-query-42:8082"))
|
||||
);
|
||||
|
||||
let server_id = ServerId::new(NonZeroU32::new(24).unwrap());
|
||||
let remote = resolver.resolve_remote(server_id);
|
||||
assert_eq!(
|
||||
remote,
|
||||
Some(GrpcConnectionString::from("http://iox-query-24:8082"))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue