fix: Remove server

pull/24376/head
Carol (Nichols || Goulding) 2022-05-04 09:06:15 -04:00
parent e0bc1801ac
commit b88d071ce7
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
16 changed files with 0 additions and 5900 deletions

Cargo.lock generated

@@ -2224,7 +2224,6 @@ dependencies = [
"serde",
"serde_json",
"serde_urlencoded",
"server",
"snafu",
"tempfile",
"test_helpers",
@@ -5213,46 +5212,6 @@ dependencies = [
"yaml-rust",
]
[[package]]
name = "server"
version = "0.1.0"
dependencies = [
"arrow_util",
"async-trait",
"bytes",
"chrono",
"crc32fast",
"data_types",
"dml",
"futures",
"futures-util",
"generated_types",
"hashbrown 0.12.0",
"influxdb_line_protocol",
"iox_object_store",
"iox_time",
"job_registry",
"metric",
"mutable_batch_lp",
"num_cpus",
"object_store",
"observability_deps",
"parking_lot 0.12.0",
"query",
"rand",
"regex",
"service_common",
"snafu",
"test_helpers",
"tokio",
"tokio-util 0.7.1",
"trace",
"tracker",
"uuid 0.8.2",
"workspace-hack",
"write_buffer",
]
[[package]]
name = "service_common"
version = "0.1.0"


@@ -58,7 +58,6 @@ members = [
"read_buffer",
"router2",
"schema",
"server",
"service_common",
"service_grpc_influxrpc",
"service_grpc_flight",


@@ -44,7 +44,6 @@ query = { path = "../query" }
read_buffer = { path = "../read_buffer" }
router2 = { path = "../router2" }
schema = { path = "../schema" }
server = { path = "../server" }
iox_time = { path = "../iox_time" }
trace = { path = "../trace" }
trace_exporters = { path = "../trace_exporters" }


@@ -1,43 +0,0 @@
[package]
name = "server"
version = "0.1.0"
authors = ["pauldix <paul@pauldix.net>"]
edition = "2021"
[dependencies] # In alphabetical order
arrow_util = { path = "../arrow_util" }
async-trait = "0.1"
bytes = "1.0"
chrono = { version = "0.4", default-features = false }
crc32fast = "1.3.2"
data_types = { path = "../data_types" }
futures = "0.3"
generated_types = { path = "../generated_types" }
hashbrown = "0.12"
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
iox_object_store = { path = "../iox_object_store" }
job_registry = { path = "../job_registry" }
metric = { path = "../metric" }
mutable_batch_lp = { path = "../mutable_batch_lp" }
num_cpus = "1.13.0"
object_store = { path = "../object_store" }
observability_deps = { path = "../observability_deps" }
parking_lot = "0.12"
query = { path = "../query" }
rand = "0.8.3"
service_common = { path = "../service_common" }
snafu = "0.7"
iox_time = { path = "../iox_time" }
trace = { path = "../trace" }
tokio = { version = "1.18", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] }
tokio-util = { version = "0.7.1" }
tracker = { path = "../tracker" }
uuid = { version = "0.8", features = ["v4"] }
write_buffer = { path = "../write_buffer" }
workspace-hack = { path = "../workspace-hack"}
[dev-dependencies] # In alphabetical order
dml = { path = "../dml" }
futures-util = { version = "0.3" }
regex = "1"
test_helpers = { path = "../test_helpers" }


@@ -1,104 +0,0 @@
use crate::config::{object_store::ConfigProviderObjectStorage, ConfigProvider};
use iox_time::TimeProvider;
use job_registry::JobRegistry;
use object_store::DynObjectStore;
use observability_deps::tracing::info;
use query::exec::Executor;
use std::sync::Arc;
use trace::TraceCollector;
use write_buffer::config::WriteBufferConfigFactory;
/// A container for application-global resources
/// shared between the server and all database instances
#[derive(Debug, Clone)]
pub struct ApplicationState {
object_store: Arc<DynObjectStore>,
write_buffer_factory: Arc<WriteBufferConfigFactory>,
executor: Arc<Executor>,
job_registry: Arc<JobRegistry>,
metric_registry: Arc<metric::Registry>,
time_provider: Arc<dyn TimeProvider>,
trace_collector: Option<Arc<dyn TraceCollector>>,
config_provider: Arc<dyn ConfigProvider>,
}
impl ApplicationState {
/// Creates a new `ApplicationState`
///
/// Uses the number of CPUs in the system if `num_worker_threads` is not set
pub fn new(
object_store: Arc<DynObjectStore>,
num_worker_threads: Option<usize>,
trace_collector: Option<Arc<dyn TraceCollector>>,
config_provider: Option<Arc<dyn ConfigProvider>>,
) -> Self {
let num_threads = num_worker_threads.unwrap_or_else(num_cpus::get);
info!(%num_threads, "using specified number of threads per thread pool");
let metric_registry = Arc::new(metric::Registry::new());
let time_provider: Arc<dyn TimeProvider> = Arc::new(iox_time::SystemProvider::new());
let job_registry = Arc::new(JobRegistry::new(
Arc::clone(&metric_registry),
Arc::clone(&time_provider),
));
let write_buffer_factory = Arc::new(WriteBufferConfigFactory::new(
Arc::clone(&time_provider),
Arc::clone(&metric_registry),
));
let config_provider = config_provider.unwrap_or_else(|| {
Arc::new(ConfigProviderObjectStorage::new(
Arc::clone(&object_store),
Arc::clone(&time_provider),
))
});
Self {
object_store,
write_buffer_factory,
executor: Arc::new(Executor::new(num_threads)),
job_registry,
metric_registry,
time_provider,
trace_collector,
config_provider,
}
}
pub fn object_store(&self) -> Arc<DynObjectStore> {
Arc::clone(&self.object_store)
}
pub fn write_buffer_factory(&self) -> &Arc<WriteBufferConfigFactory> {
&self.write_buffer_factory
}
pub fn job_registry(&self) -> &Arc<JobRegistry> {
&self.job_registry
}
pub fn metric_registry(&self) -> &Arc<metric::Registry> {
&self.metric_registry
}
pub fn time_provider(&self) -> &Arc<dyn TimeProvider> {
&self.time_provider
}
pub fn trace_collector(&self) -> &Option<Arc<dyn TraceCollector>> {
&self.trace_collector
}
pub fn config_provider(&self) -> &Arc<dyn ConfigProvider> {
&self.config_provider
}
pub fn executor(&self) -> &Arc<Executor> {
&self.executor
}
pub async fn join(&self) {
self.executor.join().await;
}
}
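
As an aside (not part of this commit), a minimal sketch of how the `ApplicationState` above was typically wired up; the caller supplies whichever `DynObjectStore` implementation the build provides, and `None` for the remaining arguments picks the defaults described in the doc comments:

// Sketch only; assumes the imports already present in this file.
fn make_application_sketch(object_store: Arc<DynObjectStore>) -> Arc<ApplicationState> {
    // None for num_worker_threads: fall back to the number of CPUs.
    // None for the trace collector: tracing is disabled.
    // None for the config provider: the object-store-backed
    // ConfigProviderObjectStorage is constructed internally.
    Arc::new(ApplicationState::new(object_store, None, None, None))
}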


@@ -1,47 +0,0 @@
use crate::ProvidedDatabaseRules;
use async_trait::async_trait;
use data_types::server_id::ServerId;
use generated_types::influxdata::iox::management::v1::OwnerInfo;
use uuid::Uuid;
pub mod object_store;
mod owner;
/// A generic opaque error returned by [`ConfigProvider`]
pub type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
/// Result type returned by [`ConfigProvider`]
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// A generic trait for interacting with the configuration
/// of a database server
#[async_trait]
pub trait ConfigProvider: std::fmt::Debug + Send + Sync {
/// Returns a list of database name and UUID pairs
async fn fetch_server_config(&self, server_id: ServerId) -> Result<Vec<(String, Uuid)>>;
/// Persists a list of database names and UUID pairs overwriting any
/// pre-existing persisted server configuration
async fn store_server_config(
&self,
server_id: ServerId,
config: &[(String, Uuid)],
) -> Result<()>;
/// Returns the configuration for the database with the given `uuid`
async fn fetch_rules(&self, uuid: Uuid) -> Result<ProvidedDatabaseRules>;
/// Persists the configuration for the database with the given `uuid`
async fn store_rules(&self, uuid: Uuid, rules: &ProvidedDatabaseRules) -> Result<()>;
/// Returns the owner information for the database with the given `uuid`
async fn fetch_owner_info(&self, server_id: ServerId, uuid: Uuid) -> Result<OwnerInfo>;
/// Updates the owner information for the database with the given `uuid`
/// and records it as owned by `server_id`
async fn update_owner_info(&self, server_id: Option<ServerId>, uuid: Uuid) -> Result<()>;
/// Creates the owner information for the database with the given `uuid`
/// and records it as owned by `server_id`
async fn create_owner_info(&self, server_id: ServerId, uuid: Uuid) -> Result<()>;
}
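
Editor's sketch (not code from this commit) of how a caller might use the trait above to load every database configuration a server owns; error handling is left to the `?` operator and the trait's boxed error type:

async fn load_all_rules_sketch(
    provider: &dyn ConfigProvider,
    server_id: ServerId,
) -> Result<Vec<ProvidedDatabaseRules>> {
    let mut all_rules = Vec::new();
    // The server config is a list of (database name, UUID) pairs ...
    for (_name, uuid) in provider.fetch_server_config(server_id).await? {
        // ... and each database's rules are stored and fetched by UUID.
        all_rules.push(provider.fetch_rules(uuid).await?);
    }
    Ok(all_rules)
}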


@@ -1,256 +0,0 @@
use super::Result as ConfigResult;
use crate::{
config::{
owner::{
create_owner_info, fetch_owner_info, update_owner_info, OwnerInfoCreateError,
OwnerInfoFetchError, OwnerInfoUpdateError,
},
ConfigProvider,
},
PersistedDatabaseRules, ProvidedDatabaseRules,
};
use async_trait::async_trait;
use data_types::server_id::ServerId;
use generated_types::database_rules::encode_persisted_database_rules;
use generated_types::google::FieldViolation;
use generated_types::influxdata::iox::management;
use generated_types::influxdata::iox::management::v1::OwnerInfo;
use iox_object_store::IoxObjectStore;
use iox_time::TimeProvider;
use object_store::DynObjectStore;
use snafu::{ensure, ResultExt, Snafu};
use std::sync::Arc;
use uuid::Uuid;
/// Error enumeration for [`ConfigProviderObjectStorage`]
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("error saving server config to object storage: {}", source))]
StoreServer { source: object_store::Error },
#[snafu(display("error getting server config from object storage: {}", source))]
FetchServer { source: object_store::Error },
#[snafu(display("error deserializing server config: {}", source))]
DeserializeServer {
source: generated_types::DecodeError,
},
#[snafu(display("error serializing server config: {}", source))]
SerializeServer {
source: generated_types::EncodeError,
},
#[snafu(display(
"UUID mismatch reading server config from object storage, expected {}, got {}",
expected,
actual
))]
UuidMismatch { expected: Uuid, actual: Uuid },
#[snafu(display(
"invalid database uuid in server config while finding location: {}",
source
))]
InvalidDatabaseLocation { source: uuid::Error },
#[snafu(display("Error saving rules for {}: {}", db_name, source))]
StoreRules {
db_name: String,
source: object_store::Error,
},
#[snafu(display("error getting database rules from object storage: {}", source))]
RulesFetch { source: object_store::Error },
#[snafu(display("error deserializing database rules: {}", source))]
DeserializeRules {
source: generated_types::DecodeError,
},
#[snafu(display("error serializing database rules: {}", source))]
SerializeRules {
source: generated_types::EncodeError,
},
#[snafu(display("error converting to database rules: {}", source))]
ConvertingRules { source: FieldViolation },
#[snafu(display("error creating database owner info: {}", source))]
CreatingOwnerInfo { source: OwnerInfoCreateError },
#[snafu(display("error getting database owner info: {}", source))]
FetchingOwnerInfo { source: OwnerInfoFetchError },
#[snafu(display("error updating database owner info: {}", source))]
UpdatingOwnerInfo { source: OwnerInfoUpdateError },
}
type Result<T, E = Error> = std::result::Result<T, E>;
/// Parse the UUID from an object storage path
///
/// TODO: Encode this data directly in server config
fn parse_location(location: &str) -> Result<Uuid> {
// Strip trailing / if any
let location = location.strip_suffix('/').unwrap_or(location);
let uuid = location.rsplit('/').next().unwrap();
std::str::FromStr::from_str(uuid).context(InvalidDatabaseLocationSnafu)
}
#[derive(Debug)]
pub struct ConfigProviderObjectStorage {
object_store: Arc<DynObjectStore>,
time_provider: Arc<dyn TimeProvider>,
}
impl ConfigProviderObjectStorage {
pub fn new(object_store: Arc<DynObjectStore>, time_provider: Arc<dyn TimeProvider>) -> Self {
Self {
object_store,
time_provider,
}
}
fn iox_object_store(&self, uuid: Uuid) -> IoxObjectStore {
let root_path = IoxObjectStore::root_path_for(&*self.object_store, uuid);
IoxObjectStore::existing(Arc::clone(&self.object_store), root_path)
}
}
#[async_trait]
impl ConfigProvider for ConfigProviderObjectStorage {
async fn fetch_server_config(&self, server_id: ServerId) -> ConfigResult<Vec<(String, Uuid)>> {
let fetch_result =
IoxObjectStore::get_server_config_file(&*self.object_store, server_id).await;
let server_config_bytes = match fetch_result {
Ok(bytes) => bytes,
// If this is the first time starting up this server and there is no config file yet,
// this isn't a problem. Start an empty server config.
Err(object_store::Error::NotFound { .. }) => bytes::Bytes::new(),
Err(source) => return Err(Error::FetchServer { source }.into()),
};
let server_config =
generated_types::server_config::decode_persisted_server_config(server_config_bytes)
.context(DeserializeServerSnafu)?;
let config = server_config
.databases
.into_iter()
.map(|(name, location)| Ok((name, parse_location(&location)?)))
.collect::<Result<Vec<_>>>()?;
self.store_server_config(server_id, &config).await?;
Ok(config)
}
async fn store_server_config(
&self,
server_id: ServerId,
config: &[(String, Uuid)],
) -> ConfigResult<()> {
let databases = config
.iter()
.map(|(name, database)| {
(
name.to_string(),
IoxObjectStore::root_path_for(&*self.object_store, *database).to_string(),
)
})
.collect();
let data = management::v1::ServerConfig { databases };
let mut encoded = bytes::BytesMut::new();
generated_types::server_config::encode_persisted_server_config(&data, &mut encoded)
.context(SerializeServerSnafu)?;
let bytes = encoded.freeze();
IoxObjectStore::put_server_config_file(&*self.object_store, server_id, bytes)
.await
.context(StoreServerSnafu)?;
Ok(())
}
async fn fetch_rules(&self, uuid: Uuid) -> ConfigResult<ProvidedDatabaseRules> {
let bytes = IoxObjectStore::load_database_rules(Arc::clone(&self.object_store), uuid)
.await
.context(RulesFetchSnafu)?;
let proto: management::v1::PersistedDatabaseRules =
generated_types::database_rules::decode_persisted_database_rules(bytes)
.context(DeserializeRulesSnafu)?;
let rules: PersistedDatabaseRules = proto.try_into().context(ConvertingRulesSnafu)?;
ensure!(
uuid == rules.uuid(),
UuidMismatchSnafu {
expected: uuid,
actual: rules.uuid()
}
);
Ok(rules.into_inner().1)
}
async fn store_rules(&self, uuid: Uuid, rules: &ProvidedDatabaseRules) -> ConfigResult<()> {
let persisted_database_rules = management::v1::PersistedDatabaseRules {
uuid: uuid.as_bytes().to_vec(),
// Note we save the original version
rules: Some(rules.original().clone()),
};
let mut data = bytes::BytesMut::new();
encode_persisted_database_rules(&persisted_database_rules, &mut data)
.context(SerializeRulesSnafu)?;
self.iox_object_store(uuid)
.put_database_rules_file(data.freeze())
.await
.context(StoreRulesSnafu {
db_name: rules.db_name(),
})?;
Ok(())
}
async fn fetch_owner_info(&self, _server_id: ServerId, uuid: Uuid) -> ConfigResult<OwnerInfo> {
let config = fetch_owner_info(&self.iox_object_store(uuid))
.await
.context(FetchingOwnerInfoSnafu)?;
Ok(config)
}
async fn update_owner_info(&self, server_id: Option<ServerId>, uuid: Uuid) -> ConfigResult<()> {
let path = server_id.map(|server_id| {
IoxObjectStore::server_config_path(&*self.object_store, server_id).to_string()
});
update_owner_info(
server_id,
path,
self.time_provider.now(),
&self.iox_object_store(uuid),
)
.await
.context(UpdatingOwnerInfoSnafu)?;
Ok(())
}
async fn create_owner_info(&self, server_id: ServerId, uuid: Uuid) -> ConfigResult<()> {
let path = IoxObjectStore::server_config_path(&*self.object_store, server_id).to_string();
create_owner_info(server_id, path, &self.iox_object_store(uuid))
.await
.context(CreatingOwnerInfoSnafu)?;
Ok(())
}
}
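
For illustration (not part of the original file): the `parse_location` helper near the top of this file strips a trailing slash and treats the last path segment as the database UUID. The location string below is made up.

#[test]
fn parse_location_takes_last_segment() {
    let loc = "1/dbs/3f1e2c4a-0000-4000-8000-000000000001/";
    let uuid = parse_location(loc).unwrap();
    assert_eq!(uuid.to_string(), "3f1e2c4a-0000-4000-8000-000000000001");
}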


@@ -1,128 +0,0 @@
//! Code related to managing ownership information in owner.pb
use data_types::server_id::ServerId;
use generated_types::influxdata::iox::management;
use iox_object_store::IoxObjectStore;
use iox_time::Time;
use snafu::{ensure, ResultExt, Snafu};
#[derive(Debug, Snafu)]
pub enum OwnerInfoFetchError {
#[snafu(display("error loading owner info: {}", source))]
Loading { source: object_store::Error },
#[snafu(display("error decoding owner info: {}", source))]
Decoding {
source: generated_types::DecodeError,
},
}
pub(crate) async fn fetch_owner_info(
iox_object_store: &IoxObjectStore,
) -> Result<management::v1::OwnerInfo, OwnerInfoFetchError> {
let raw_owner_info = iox_object_store
.get_owner_file()
.await
.context(LoadingSnafu)?;
generated_types::server_config::decode_database_owner_info(raw_owner_info)
.context(DecodingSnafu)
}
#[derive(Debug, Snafu)]
pub enum OwnerInfoCreateError {
#[snafu(display("could not create new owner info file; it already exists"))]
OwnerFileAlreadyExists,
#[snafu(display("error creating database owner info file: {}", source))]
CreatingOwnerFile { source: Box<object_store::Error> },
}
/// Create a new owner info file for this database. Existing content at this location in object
/// storage is an error.
pub(crate) async fn create_owner_info(
server_id: ServerId,
server_location: String,
iox_object_store: &IoxObjectStore,
) -> Result<(), OwnerInfoCreateError> {
ensure!(
matches!(
iox_object_store.get_owner_file().await,
Err(object_store::Error::NotFound { .. })
),
OwnerFileAlreadyExistsSnafu,
);
let owner_info = management::v1::OwnerInfo {
id: server_id.get_u32(),
location: server_location,
transactions: vec![],
};
let mut encoded = bytes::BytesMut::new();
generated_types::server_config::encode_database_owner_info(&owner_info, &mut encoded)
.expect("owner info serialization should be valid");
let encoded = encoded.freeze();
iox_object_store
.put_owner_file(encoded)
.await
.map_err(Box::new)
.context(CreatingOwnerFileSnafu)?;
Ok(())
}
#[derive(Debug, Snafu)]
pub enum OwnerInfoUpdateError {
#[snafu(display("could not fetch existing owner info: {}", source))]
CouldNotFetch { source: OwnerInfoFetchError },
#[snafu(display("error updating database owner info file: {}", source))]
UpdatingOwnerFile { source: object_store::Error },
}
/// Fetch existing owner info, set the `id` and `location`, insert a new entry into the transaction
/// history, and overwrite the contents of the owner file. Errors if the owner info file does NOT
/// currently exist.
pub(crate) async fn update_owner_info(
new_server_id: Option<ServerId>,
new_server_location: Option<String>,
timestamp: Time,
iox_object_store: &IoxObjectStore,
) -> Result<(), OwnerInfoUpdateError> {
let management::v1::OwnerInfo {
id,
location,
mut transactions,
} = fetch_owner_info(iox_object_store)
.await
.context(CouldNotFetchSnafu)?;
let new_transaction = management::v1::OwnershipTransaction {
id,
location,
timestamp: Some(timestamp.date_time().into()),
};
transactions.push(new_transaction);
// TODO: only save latest 100 transactions
let new_owner_info = management::v1::OwnerInfo {
// 0 is not a valid server ID, so it indicates "unowned".
id: new_server_id.map(|s| s.get_u32()).unwrap_or_default(),
// Owner location is empty when the database is unowned.
location: new_server_location.unwrap_or_default(),
transactions,
};
let mut encoded = bytes::BytesMut::new();
generated_types::server_config::encode_database_owner_info(&new_owner_info, &mut encoded)
.expect("owner info serialization should be valid");
let encoded = encoded.freeze();
iox_object_store
.put_owner_file(encoded)
.await
.context(UpdatingOwnerFileSnafu)?;
Ok(())
}
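
A small editor's sketch (not in the original file) of the "unowned" convention described above: passing `None` for both the new server ID and location records a transaction and leaves the database marked as unowned (id 0, empty location).

pub(crate) async fn release_ownership_sketch(
    iox_object_store: &IoxObjectStore,
    now: Time,
) -> Result<(), OwnerInfoUpdateError> {
    // None / None => the id defaults to 0 and the location to "", i.e. unowned.
    update_owner_info(None, None, now, iox_object_store).await
}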

File diff suppressed because it is too large


@@ -1,731 +0,0 @@
//! Database initialization / creation logic
use crate::{rules::ProvidedDatabaseRules, ApplicationState};
use data_types::{server_id::ServerId, DatabaseName};
use db::{
load::{create_preserved_catalog, load_or_create_preserved_catalog},
write_buffer::WriteBufferConsumer,
DatabaseToCommit, Db, LifecycleWorker,
};
use generated_types::{
database_state::DatabaseState as DatabaseStateCode, influxdata::iox::management,
};
use iox_object_store::IoxObjectStore;
use observability_deps::tracing::{error, info, warn};
use persistence_windows::checkpoint::ReplayPlan;
use rand::{thread_rng, Rng};
use snafu::{ResultExt, Snafu};
use std::{sync::Arc, time::Duration};
use tokio::task::JoinError;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
use super::state::DatabaseShared;
/// Errors encountered during initialization of a database
#[derive(Debug, Snafu)]
pub enum InitError {
#[snafu(display("error finding database directory in object storage: {}", source))]
DatabaseObjectStoreLookup {
source: iox_object_store::IoxObjectStoreError,
},
#[snafu(display(
"Database name in deserialized rules ({}) does not match expected value ({})",
actual,
expected
))]
RulesDatabaseNameMismatch { actual: String, expected: String },
#[snafu(display("error loading catalog: {}", source))]
CatalogLoad { source: db::load::Error },
#[snafu(display("error creating write buffer: {}", source))]
CreateWriteBuffer {
source: write_buffer::core::WriteBufferError,
},
#[snafu(display("error during replay: {}", source))]
Replay { source: db::Error },
#[snafu(display("error creating database owner info: {}", source))]
CreatingOwnerInfo { source: crate::config::Error },
#[snafu(display("error getting database owner info: {}", source))]
FetchingOwnerInfo { source: crate::config::Error },
#[snafu(display("error updating database owner info: {}", source))]
UpdatingOwnerInfo { source: crate::config::Error },
#[snafu(display(
"Server ID in the database's owner info file ({}) does not match this server's ID ({})",
actual,
expected
))]
DatabaseOwnerMismatch { actual: u32, expected: u32 },
#[snafu(display(
"The database with UUID `{}` is already owned by the server with ID {}",
uuid,
server_id
))]
CantClaimDatabaseCurrentlyOwned { uuid: Uuid, server_id: u32 },
#[snafu(display("error saving database rules: {}", source))]
SavingRules { source: crate::config::Error },
#[snafu(display("error loading database rules: {}", source))]
LoadingRules { source: crate::config::Error },
#[snafu(display("{}", source))]
IoxObjectStoreError {
source: iox_object_store::IoxObjectStoreError,
},
#[snafu(display("The database with UUID `{}` named `{}` is already active", uuid, name))]
AlreadyActive { name: String, uuid: Uuid },
#[snafu(display("cannot create preserved catalog: {}", source))]
CannotCreatePreservedCatalog { source: db::load::Error },
#[snafu(display("database is not running"))]
Shutdown,
}
/// The Database startup state machine
///
/// ```text
/// (start)
/// |
/// |----------------------------o o-o
/// V V V |
/// [Known]-------------->[OwnerInfoLoadError]
/// | |
/// +---------------------------o
/// |
/// | o-o
/// V V |
/// [OwnerInfoLoaded]----------->[RulesLoadError]
/// | |
/// +---------------------------o
/// |
/// | o-o
/// V V |
/// [RulesLoaded]-------------->[CatalogLoadError]
/// | |
/// +---------------------------o
/// |
/// | o-o
/// V V |
/// [CatalogLoaded]---------->[WriteBufferCreationError]
/// | | | |
/// | | | | o-o
/// | | | V V |
/// | o---------------|-->[ReplayError]
/// | | |
/// +--------------------+-------o
/// |
/// |
/// V
/// [Initialized]
///
/// |
/// V
/// [Shutdown]
/// ```
///
/// A Database starts in [`DatabaseState::Known`] and advances through the
/// non-error states in sequential order until either:
///
/// 1. It reaches [`DatabaseState::Initialized`]: Database is initialized
/// 2. An error is encountered, in which case it transitions to one of
/// the error states. We try to recover from all of them. For all except [`DatabaseState::ReplayError`] this is a
/// rather cheap operation since we can just retry the actual operation. For [`DatabaseState::ReplayError`] we need
/// to dump the potentially half-modified in-memory catalog before retrying.
#[derive(Debug, Clone)]
pub(crate) enum DatabaseState {
// Database not running, with an optional shutdown error
Shutdown(Option<Arc<JoinError>>),
// Basic initialization sequence states:
Known(DatabaseStateKnown),
OwnerInfoLoaded(DatabaseStateOwnerInfoLoaded),
RulesLoaded(DatabaseStateRulesLoaded),
CatalogLoaded(DatabaseStateCatalogLoaded),
// Terminal state (success)
Initialized(DatabaseStateInitialized),
// Error states, we'll try to recover from them
OwnerInfoLoadError(DatabaseStateKnown, Arc<InitError>),
RulesLoadError(DatabaseStateOwnerInfoLoaded, Arc<InitError>),
CatalogLoadError(DatabaseStateRulesLoaded, Arc<InitError>),
WriteBufferCreationError(DatabaseStateCatalogLoaded, Arc<InitError>),
ReplayError(DatabaseStateCatalogLoaded, Arc<InitError>),
}
impl std::fmt::Display for DatabaseState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.state_code().fmt(f)
}
}
impl DatabaseState {
// Construct the start state of the database machine
pub fn new_known() -> Self {
Self::Known(DatabaseStateKnown {})
}
pub(crate) fn state_code(&self) -> DatabaseStateCode {
match self {
DatabaseState::Shutdown(_) => DatabaseStateCode::Shutdown,
DatabaseState::Known(_) => DatabaseStateCode::Known,
DatabaseState::OwnerInfoLoaded(_) => DatabaseStateCode::OwnerInfoLoaded,
DatabaseState::RulesLoaded(_) => DatabaseStateCode::RulesLoaded,
DatabaseState::CatalogLoaded(_) => DatabaseStateCode::CatalogLoaded,
DatabaseState::Initialized(_) => DatabaseStateCode::Initialized,
DatabaseState::OwnerInfoLoadError(_, _) => DatabaseStateCode::OwnerInfoLoadError,
DatabaseState::RulesLoadError(_, _) => DatabaseStateCode::RulesLoadError,
DatabaseState::CatalogLoadError(_, _) => DatabaseStateCode::CatalogLoadError,
DatabaseState::WriteBufferCreationError(_, _) => {
DatabaseStateCode::WriteBufferCreationError
}
DatabaseState::ReplayError(_, _) => DatabaseStateCode::ReplayError,
}
}
pub(crate) fn error(&self) -> Option<&Arc<InitError>> {
match self {
DatabaseState::Known(_)
| DatabaseState::Shutdown(_)
| DatabaseState::OwnerInfoLoaded(_)
| DatabaseState::RulesLoaded(_)
| DatabaseState::CatalogLoaded(_)
| DatabaseState::Initialized(_) => None,
DatabaseState::OwnerInfoLoadError(_, e)
| DatabaseState::RulesLoadError(_, e)
| DatabaseState::CatalogLoadError(_, e)
| DatabaseState::WriteBufferCreationError(_, e)
| DatabaseState::ReplayError(_, e) => Some(e),
}
}
pub(crate) fn provided_rules(&self) -> Option<Arc<ProvidedDatabaseRules>> {
match self {
DatabaseState::Known(_)
| DatabaseState::Shutdown(_)
| DatabaseState::OwnerInfoLoaded(_)
| DatabaseState::OwnerInfoLoadError(_, _)
| DatabaseState::RulesLoadError(_, _) => None,
DatabaseState::RulesLoaded(state) | DatabaseState::CatalogLoadError(state, _) => {
Some(Arc::clone(&state.provided_rules))
}
DatabaseState::CatalogLoaded(state)
| DatabaseState::WriteBufferCreationError(state, _)
| DatabaseState::ReplayError(state, _) => Some(Arc::clone(&state.provided_rules)),
DatabaseState::Initialized(state) => Some(Arc::clone(&state.provided_rules)),
}
}
pub(crate) fn owner_info(&self) -> Option<management::v1::OwnerInfo> {
match self {
DatabaseState::Known(_)
| DatabaseState::Shutdown(_)
| DatabaseState::OwnerInfoLoadError(_, _)
| DatabaseState::RulesLoadError(_, _) => None,
DatabaseState::OwnerInfoLoaded(state) => Some(state.owner_info.clone()),
DatabaseState::RulesLoaded(state) | DatabaseState::CatalogLoadError(state, _) => {
Some(state.owner_info.clone())
}
DatabaseState::CatalogLoaded(state)
| DatabaseState::WriteBufferCreationError(state, _)
| DatabaseState::ReplayError(state, _) => Some(state.owner_info.clone()),
DatabaseState::Initialized(state) => Some(state.owner_info.clone()),
}
}
/// Whether this is shutdown
pub(crate) fn is_shutdown(&self) -> bool {
matches!(self, DatabaseState::Shutdown(_))
}
pub(crate) fn get_initialized(&self) -> Option<&DatabaseStateInitialized> {
match self {
DatabaseState::Initialized(state) => Some(state),
_ => None,
}
}
/// Try to advance to the next state
///
/// # Panic
///
/// Panics if the database cannot be advanced (already initialized or shutdown)
async fn advance(self, shared: &DatabaseShared) -> Self {
match self {
Self::Known(state) | Self::OwnerInfoLoadError(state, _) => {
match state.advance(shared).await {
Ok(state) => Self::OwnerInfoLoaded(state),
Err(e) => Self::OwnerInfoLoadError(state, Arc::new(e)),
}
}
Self::OwnerInfoLoaded(state) | Self::RulesLoadError(state, _) => {
match state.advance(shared).await {
Ok(state) => Self::RulesLoaded(state),
Err(e) => Self::RulesLoadError(state, Arc::new(e)),
}
}
Self::RulesLoaded(state) | Self::CatalogLoadError(state, _) => {
match state.advance(shared).await {
Ok(state) => Self::CatalogLoaded(state),
Err(e) => Self::CatalogLoadError(state, Arc::new(e)),
}
}
Self::CatalogLoaded(state) | Self::WriteBufferCreationError(state, _) => {
match state.advance(shared).await {
Ok(state) => Self::Initialized(state),
Err(e @ InitError::CreateWriteBuffer { .. }) => {
Self::WriteBufferCreationError(state, Arc::new(e))
}
Err(e) => Self::ReplayError(state, Arc::new(e)),
}
}
Self::ReplayError(state, _) => {
let state2 = state.rollback();
match state2.advance(shared).await {
Ok(state2) => match state2.advance(shared).await {
Ok(state2) => Self::Initialized(state2),
Err(e) => Self::ReplayError(state, Arc::new(e)),
},
Err(e) => Self::ReplayError(state, Arc::new(e)),
}
}
Self::Initialized(_) => unreachable!(),
Self::Shutdown(_) => unreachable!(),
}
}
}
#[derive(Debug, Clone)]
pub(crate) struct DatabaseStateKnown {}
impl DatabaseStateKnown {
/// Load owner info from object storage and verify it matches the current owner
async fn advance(
&self,
shared: &DatabaseShared,
) -> Result<DatabaseStateOwnerInfoLoaded, InitError> {
let (server_id, uuid) = {
let config = shared.config.read();
(config.server_id, config.database_uuid)
};
let owner_info = shared
.application
.config_provider()
.fetch_owner_info(server_id, uuid)
.await
.context(FetchingOwnerInfoSnafu)?;
if owner_info.id != server_id.get_u32() {
return DatabaseOwnerMismatchSnafu {
actual: owner_info.id,
expected: server_id.get_u32(),
}
.fail();
}
Ok(DatabaseStateOwnerInfoLoaded { owner_info })
}
}
#[derive(Debug, Clone)]
pub(crate) struct DatabaseStateOwnerInfoLoaded {
owner_info: management::v1::OwnerInfo,
}
impl DatabaseStateOwnerInfoLoaded {
/// Load database rules from object storage
async fn advance(
&self,
shared: &DatabaseShared,
) -> Result<DatabaseStateRulesLoaded, InitError> {
let uuid = shared.config.read().database_uuid;
let provided_rules = shared
.application
.config_provider()
.fetch_rules(uuid)
.await
.context(LoadingRulesSnafu)?;
let db_name = shared.config.read().name.clone();
if provided_rules.db_name() != &db_name {
return RulesDatabaseNameMismatchSnafu {
actual: provided_rules.db_name(),
expected: db_name.as_str(),
}
.fail();
}
Ok(DatabaseStateRulesLoaded {
provided_rules: Arc::new(provided_rules),
owner_info: self.owner_info.clone(),
})
}
}
#[derive(Debug, Clone)]
pub(crate) struct DatabaseStateRulesLoaded {
provided_rules: Arc<ProvidedDatabaseRules>,
owner_info: management::v1::OwnerInfo,
}
impl DatabaseStateRulesLoaded {
/// Load catalog from object storage
async fn advance(
&self,
shared: &DatabaseShared,
) -> Result<DatabaseStateCatalogLoaded, InitError> {
let (db_name, wipe_catalog_on_error, skip_replay, server_id) = {
let config = shared.config.read();
(
config.name.clone(),
config.wipe_catalog_on_error,
config.skip_replay,
config.server_id,
)
};
let (preserved_catalog, catalog, replay_plan) = load_or_create_preserved_catalog(
db_name.as_str(),
Arc::clone(&shared.iox_object_store),
Arc::clone(shared.application.metric_registry()),
Arc::clone(shared.application.time_provider()),
wipe_catalog_on_error,
skip_replay,
)
.await
.context(CatalogLoadSnafu)?;
let database_to_commit = DatabaseToCommit {
server_id,
iox_object_store: Arc::clone(&shared.iox_object_store),
exec: Arc::clone(shared.application.executor()),
rules: Arc::clone(self.provided_rules.rules()),
preserved_catalog,
catalog,
metric_registry: Arc::clone(shared.application.metric_registry()),
time_provider: Arc::clone(shared.application.time_provider()),
};
let db = Arc::new(Db::new(
database_to_commit,
Arc::clone(shared.application.job_registry()),
));
let lifecycle_worker = Arc::new(LifecycleWorker::new(Arc::clone(&db)));
Ok(DatabaseStateCatalogLoaded {
db,
lifecycle_worker,
replay_plan: Arc::new(replay_plan),
provided_rules: Arc::clone(&self.provided_rules),
owner_info: self.owner_info.clone(),
})
}
}
#[derive(Debug, Clone)]
pub(crate) struct DatabaseStateCatalogLoaded {
db: Arc<Db>,
lifecycle_worker: Arc<LifecycleWorker>,
replay_plan: Arc<Option<ReplayPlan>>,
provided_rules: Arc<ProvidedDatabaseRules>,
owner_info: management::v1::OwnerInfo,
}
impl DatabaseStateCatalogLoaded {
/// Perform replay
async fn advance(
&self,
shared: &DatabaseShared,
) -> Result<DatabaseStateInitialized, InitError> {
let db = Arc::clone(&self.db);
let rules = self.provided_rules.rules();
let trace_collector = shared.application.trace_collector();
let write_buffer_factory = shared.application.write_buffer_factory();
let (db_name, skip_replay) = {
let config = shared.config.read();
(config.name.clone(), config.skip_replay)
};
let write_buffer_consumer = match rules.write_buffer_connection.as_ref() {
Some(connection) => {
let consumer = write_buffer_factory
.new_config_read(db_name.as_str(), trace_collector.as_ref(), connection)
.await
.context(CreateWriteBufferSnafu)?;
let replay_plan = if skip_replay {
None
} else {
self.replay_plan.as_ref().as_ref()
};
let streams = db
.perform_replay(replay_plan, Arc::clone(&consumer))
.await
.context(ReplaySnafu)?;
Some(Arc::new(WriteBufferConsumer::new(
consumer,
streams,
Arc::clone(&db),
shared.application.metric_registry().as_ref(),
)))
}
_ => None,
};
self.lifecycle_worker.unsuppress_persistence();
Ok(DatabaseStateInitialized {
db,
write_buffer_consumer,
lifecycle_worker: Arc::clone(&self.lifecycle_worker),
provided_rules: Arc::clone(&self.provided_rules),
owner_info: self.owner_info.clone(),
})
}
/// Rolls back state to an unloaded catalog.
pub(crate) fn rollback(&self) -> DatabaseStateRulesLoaded {
warn!(db_name=%self.db.name(), "throwing away loaded catalog to recover from replay error");
DatabaseStateRulesLoaded {
provided_rules: Arc::clone(&self.provided_rules),
owner_info: self.owner_info.clone(),
}
}
}
#[derive(Debug, Clone)]
pub(crate) struct DatabaseStateInitialized {
db: Arc<Db>,
write_buffer_consumer: Option<Arc<WriteBufferConsumer>>,
lifecycle_worker: Arc<LifecycleWorker>,
provided_rules: Arc<ProvidedDatabaseRules>,
owner_info: management::v1::OwnerInfo,
}
impl DatabaseStateInitialized {
pub fn db(&self) -> &Arc<Db> {
&self.db
}
pub fn write_buffer_consumer(&self) -> Option<&Arc<WriteBufferConsumer>> {
self.write_buffer_consumer.as_ref()
}
pub fn set_provided_rules(&mut self, provided_rules: Arc<ProvidedDatabaseRules>) {
self.provided_rules = provided_rules
}
/// Get a reference to the database state initialized's lifecycle worker.
pub(crate) fn lifecycle_worker(&self) -> &Arc<LifecycleWorker> {
&self.lifecycle_worker
}
}
const INIT_BACKOFF: Duration = Duration::from_secs(1);
const MAX_BACKOFF: Duration = Duration::from_secs(500);
/// Try to drive the database to `DatabaseState::Initialized`; returns when
/// this is achieved or the shutdown signal is triggered
pub(crate) async fn initialize_database(shared: &DatabaseShared, shutdown: CancellationToken) {
let db_name = shared.config.read().name.clone();
info!(%db_name, "database initialization started");
// A backoff duration for retrying errors; it is adjusted over the course of repeated errors
let mut backoff = INIT_BACKOFF;
while !shutdown.is_cancelled() {
let handle = shared.state.read().freeze();
let handle = handle.await;
// Re-acquire read lock to avoid holding lock across await point
let state = DatabaseState::clone(&shared.state.read());
info!(%db_name, %state, "attempting to advance database initialization state");
match &state {
DatabaseState::Initialized(_) => break,
DatabaseState::Shutdown(_) => {
info!(%db_name, "database in shutdown - aborting initialization");
shutdown.cancel();
return;
}
_ => {}
}
// Try to advance to the next state
let next_state = tokio::select! {
next_state = state.advance(shared) => next_state,
_ = shutdown.cancelled() => {
info!(%db_name, "initialization aborted by shutdown");
return
}
};
let state_code = next_state.state_code();
let maybe_error = next_state.error().cloned();
// Commit the next state
{
let mut state = shared.state.write();
info!(%db_name, from=%state, to=%next_state, "database initialization state transition");
*state.unfreeze(handle) = next_state;
shared.state_notify.notify_waiters();
}
match maybe_error {
Some(error) => {
// exponential backoff w/ jitter, decorrelated
// see https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
backoff = Duration::from_secs_f64(
MAX_BACKOFF.as_secs_f64().min(
thread_rng()
.gen_range(INIT_BACKOFF.as_secs_f64()..(backoff.as_secs_f64() * 3.0)),
),
);
error!(
%db_name,
%error,
state=%state_code,
backoff_secs = backoff.as_secs_f64(),
"database in error state - backing off initialization"
);
// Wait for timeout or shutdown signal
tokio::select! {
_ = tokio::time::sleep(backoff) => {},
_ = shutdown.cancelled() => {}
}
}
None => {
// reset backoff
backoff = INIT_BACKOFF;
}
}
}
}
/// Create a fresh database without any state. Returns its location in object storage
/// for saving in the server config file.
pub async fn create_empty_db_in_object_store(
application: Arc<ApplicationState>,
uuid: Uuid,
provided_rules: ProvidedDatabaseRules,
server_id: ServerId,
) -> Result<String, InitError> {
let db_name = provided_rules.db_name().clone();
let iox_object_store = Arc::new(
match IoxObjectStore::create(Arc::clone(&application.object_store()), uuid).await {
Ok(ios) => ios,
Err(source) => return Err(InitError::IoxObjectStoreError { source }),
},
);
let database_location = iox_object_store.root_path();
application
.config_provider()
.create_owner_info(server_id, uuid)
.await
.context(CreatingOwnerInfoSnafu)?;
application
.config_provider()
.store_rules(uuid, &provided_rules)
.await
.context(SavingRulesSnafu)?;
create_preserved_catalog(
&db_name,
iox_object_store,
Arc::clone(application.metric_registry()),
Arc::clone(application.time_provider()),
true,
)
.await
.context(CannotCreatePreservedCatalogSnafu)?;
Ok(database_location)
}
/// Create a claimed database without any state. Returns its
/// location in object storage for saving in the server config
/// file.
///
/// If `force` is true, missing owner info or owner info that is
/// for the wrong server ID is ignored (does not cause an error)
pub async fn claim_database_in_object_store(
application: Arc<ApplicationState>,
db_name: &DatabaseName<'static>,
uuid: Uuid,
server_id: ServerId,
force: bool,
) -> Result<String, InitError> {
info!(%db_name, %uuid, %force, "claiming database");
let iox_object_store = IoxObjectStore::load(Arc::clone(&application.object_store()), uuid)
.await
.context(IoxObjectStoreSnafu)?;
let owner_info = application
.config_provider()
.fetch_owner_info(server_id, uuid)
.await
.context(FetchingOwnerInfoSnafu);
// try to recreate owner_info if force is specified
let owner_info = match owner_info {
Err(_) if force => {
warn!("Attempting to recreate missing owner info due to force");
application
.config_provider()
.create_owner_info(server_id, uuid)
.await
.context(CreatingOwnerInfoSnafu)?;
application
.config_provider()
.fetch_owner_info(server_id, uuid)
.await
.context(FetchingOwnerInfoSnafu)
}
t => t,
}?;
if owner_info.id != 0 {
if !force {
return CantClaimDatabaseCurrentlyOwnedSnafu {
uuid,
server_id: owner_info.id,
}
.fail();
} else {
warn!(
owner_id = owner_info.id,
"Ignoring owner info mismatch due to force"
);
}
}
let database_location = iox_object_store.root_path();
application
.config_provider()
.update_owner_info(Some(server_id), uuid)
.await
.context(UpdatingOwnerInfoSnafu)?;
Ok(database_location)
}
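
The retry loop in `initialize_database` above backs off using decorrelated jitter (see the linked AWS post). A standalone sketch of just that calculation, for illustration; it mirrors the expression in the loop rather than adding anything new:

fn next_backoff_sketch(base: Duration, max: Duration, previous: Duration) -> Duration {
    // Sleep somewhere between the base backoff and three times the previous
    // sleep, capped at `max`; a successful state transition resets `previous` to `base`.
    let secs = thread_rng()
        .gen_range(base.as_secs_f64()..(previous.as_secs_f64() * 3.0))
        .min(max.as_secs_f64());
    Duration::from_secs_f64(secs)
}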


@@ -1,43 +0,0 @@
//! Database initialization states
use crate::ApplicationState;
use data_types::{server_id::ServerId, DatabaseName};
use internal_types::freezable::Freezable;
use iox_object_store::IoxObjectStore;
use parking_lot::RwLock;
use std::sync::Arc;
use tokio::sync::Notify;
use uuid::Uuid;
use super::init::DatabaseState;
#[derive(Debug, Clone)]
/// Information about where a database is located on object store,
/// and how to perform startup activities.
pub struct DatabaseConfig {
pub name: DatabaseName<'static>,
pub server_id: ServerId,
pub database_uuid: Uuid,
pub wipe_catalog_on_error: bool,
pub skip_replay: bool,
}
/// State shared with the `Database` background worker
#[derive(Debug)]
pub(crate) struct DatabaseShared {
/// Configuration provided to the database at startup
pub(crate) config: RwLock<DatabaseConfig>,
/// Application-global state
pub(crate) application: Arc<ApplicationState>,
/// Database object store
pub(crate) iox_object_store: Arc<IoxObjectStore>,
/// The initialization state of the `Database`, wrapped in a
/// `Freezable` to ensure there is only one task with an
/// outstanding intent to write at any time.
pub(crate) state: RwLock<Freezable<DatabaseState>>,
/// Notify that the database state has changed
pub(crate) state_notify: Notify,
}

File diff suppressed because it is too large


@@ -1,134 +0,0 @@
use data_types::{database_rules::DatabaseRules, DatabaseName};
use generated_types::{
google::{FieldViolation, FieldViolationExt},
influxdata::iox::management,
};
use std::{
convert::{TryFrom, TryInto},
sync::Arc,
};
use uuid::Uuid;
/// The configuration ([`DatabaseRules`]) used to create and update
/// databases, both in original and "materialized" (with defaults filled in) form.
///
/// The rationale for storing both the rules as they were provided
/// *and* the materialized form is to provide the property that, if the
/// same rules that were previously sent are sent to a database again,
/// the database will still be running the same configuration. If the
/// materialized configuration was stored, and then the defaults were
/// changed in a new version of the software, the required property
/// would not hold.
///
/// While this may sound like an esoteric corner case with little
/// real-world impact, it has non-trivial real-world implications for
/// keeping the configurations of fleets of IOx servers in sync. See
/// <https://github.com/influxdata/influxdb_iox/issues/2409> for
/// further gory details.
///
/// A design goal is to keep the notion of "what the user provided" as
/// isolated as possible, so that only the server crate worries
/// about what the user actually provided and the rest of the system
/// can use `data_types::database_rules::PersistedDatabaseRules` in
/// blissful ignorance of such subtleties.
#[derive(Debug, Clone)]
pub struct ProvidedDatabaseRules {
/// Full database rules, with all fields set. Derived from
/// `original` by applying default values.
full: Arc<DatabaseRules>,
/// Encoded database rules, as provided by the user and as stored
/// in the object store (may not have all fields set).
original: management::v1::DatabaseRules,
}
impl ProvidedDatabaseRules {
// Create new, empty rules for the given database name (all other fields use defaults)
pub fn new_empty(db_name: DatabaseName<'static>) -> Self {
let original = management::v1::DatabaseRules {
name: db_name.to_string(),
..Default::default()
};
// Should always be able to create a DBRules with default values
let full = Arc::new(original.clone().try_into().expect("creating empty rules"));
Self { full, original }
}
pub fn new_rules(original: management::v1::DatabaseRules) -> Result<Self, FieldViolation> {
let full = Arc::new(original.clone().try_into()?);
Ok(Self { full, original })
}
/// returns the name of the database in the rules
pub fn db_name(&self) -> &DatabaseName<'static> {
&self.full.name
}
/// Return the full database rules
pub fn rules(&self) -> &Arc<DatabaseRules> {
&self.full
}
/// Return the original rules provided to this
pub fn original(&self) -> &management::v1::DatabaseRules {
&self.original
}
}
#[derive(Debug, Clone)]
pub struct PersistedDatabaseRules {
uuid: Uuid,
provided: ProvidedDatabaseRules,
}
impl PersistedDatabaseRules {
pub fn new(uuid: Uuid, provided: ProvidedDatabaseRules) -> Self {
Self { uuid, provided }
}
pub fn uuid(&self) -> Uuid {
self.uuid
}
pub fn db_name(&self) -> &DatabaseName<'static> {
&self.provided.full.name
}
/// Return the full database rules
pub fn rules(&self) -> &Arc<DatabaseRules> {
&self.provided.full
}
/// Return the original rules provided to this
pub fn original(&self) -> &management::v1::DatabaseRules {
&self.provided.original
}
/// Convert to its inner representation
pub fn into_inner(self) -> (Uuid, ProvidedDatabaseRules) {
(self.uuid, self.provided)
}
}
impl TryFrom<management::v1::PersistedDatabaseRules> for PersistedDatabaseRules {
type Error = FieldViolation;
/// Create a new PersistedDatabaseRules from a grpc message
fn try_from(proto: management::v1::PersistedDatabaseRules) -> Result<Self, Self::Error> {
let original: management::v1::DatabaseRules = proto
.rules
.ok_or_else(|| FieldViolation::required("rules"))?;
let full = Arc::new(original.clone().try_into()?);
let uuid = Uuid::from_slice(&proto.uuid).scope("uuid")?;
Ok(Self {
uuid,
provided: ProvidedDatabaseRules { full, original },
})
}
}
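
To make the doc comment on `ProvidedDatabaseRules` concrete, an editor's sketch (not from this commit): `original()` stays exactly what the user sent, while `rules()` is the materialized form with defaults applied.

fn provided_vs_materialized_sketch() -> Result<(), FieldViolation> {
    // Only the name is set; every other field is left to its default.
    let user_provided = management::v1::DatabaseRules {
        name: "my_db".to_string(),
        ..Default::default()
    };
    let rules = ProvidedDatabaseRules::new_rules(user_provided.clone())?;
    // The original is preserved byte-for-byte ...
    assert_eq!(rules.original(), &user_provided);
    // ... while the full rules carry the materialized defaults.
    assert_eq!(rules.db_name().as_str(), "my_db");
    Ok(())
}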


@@ -1,255 +0,0 @@
use arrow_util::assert_batches_sorted_eq;
use data_types::{
chunk_metadata::ChunkStorage,
database_rules::{DatabaseRules, LifecycleRules, PartitionTemplate, TemplatePart},
delete_predicate::{DeleteExpr, DeletePredicate},
server_id::ServerId,
timestamp::TimestampRange,
DatabaseName,
};
use db::{
test_helpers::{run_query, write_lp},
Db,
};
use futures::TryStreamExt;
use query::{QueryChunk, QueryDatabase};
use server::{
rules::ProvidedDatabaseRules,
test_utils::{make_application, make_initialized_server},
};
use std::{
num::{NonZeroU32, NonZeroU64},
sync::Arc,
time::{Duration, Instant},
};
use test_helpers::maybe_start_logging;
#[tokio::test]
async fn delete_predicate_preservation() {
maybe_start_logging();
// ==================== setup ====================
let server_id = ServerId::new(NonZeroU32::new(1).unwrap());
let db_name = DatabaseName::new("delete_predicate_preservation_test").unwrap();
let application = make_application();
let server = make_initialized_server(server_id, Arc::clone(&application)).await;
// Test that delete predicates are stored within the preserved catalog
// ==================== do: create DB ====================
// Create a DB given a server id, an object store and a db name
let rules = DatabaseRules {
partition_template: PartitionTemplate {
parts: vec![TemplatePart::Column("part".to_string())],
},
lifecycle_rules: LifecycleRules {
catalog_transactions_until_checkpoint: NonZeroU64::new(1).unwrap(),
// do not prune transaction files because this test relies on them
catalog_transaction_prune_age: Duration::from_secs(1_000),
late_arrive_window_seconds: NonZeroU32::new(1).unwrap(),
..Default::default()
},
..DatabaseRules::new(db_name.clone())
};
let database = server
.create_database(ProvidedDatabaseRules::new_rules(rules.clone().into()).unwrap())
.await
.unwrap();
let db = database.initialized_db().unwrap();
// ==================== do: create chunks ====================
let table_name = "cpu";
// 1: preserved
let partition_key = "part_a";
write_lp(&db, "cpu,part=a row=10,selector=0i 10");
write_lp(&db, "cpu,part=a row=11,selector=1i 11");
db.persist_partition(table_name, partition_key, true)
.await
.unwrap();
// 2: RUB
let partition_key = "part_b";
write_lp(&db, "cpu,part=b row=20,selector=0i 20");
write_lp(&db, "cpu,part=b row=21,selector=1i 21");
db.compact_partition(table_name, partition_key)
.await
.unwrap();
// 3: MUB
let _partition_key = "part_c";
write_lp(&db, "cpu,part=c row=30,selector=0i 30");
write_lp(&db, "cpu,part=c row=31,selector=1i 31");
// 4: preserved and unloaded
let partition_key = "part_d";
write_lp(&db, "cpu,part=d row=40,selector=0i 40");
write_lp(&db, "cpu,part=d row=41,selector=1i 41");
let chunk_id = db
.persist_partition(table_name, partition_key, true)
.await
.unwrap()
.unwrap()
.id();
db.unload_read_buffer(table_name, partition_key, chunk_id)
.unwrap();
// ==================== do: delete ====================
let pred = Arc::new(DeletePredicate {
range: TimestampRange::new(0, 1_000),
exprs: vec![DeleteExpr::new(
"selector".to_string(),
data_types::delete_predicate::Op::Eq,
data_types::delete_predicate::Scalar::I64(1),
)],
});
db.delete("cpu", Arc::clone(&pred)).unwrap();
// ==================== do: preserve another partition ====================
let partition_key = "part_b";
db.persist_partition(table_name, partition_key, true)
.await
.unwrap();
// ==================== do: use background worker for a short while ====================
let iters_start = db.worker_iterations_delete_predicate_preservation();
// time_provider.inc(rules.lifecycle_rules.late_arrive_window());
let t_0 = Instant::now();
loop {
let did_delete_predicate_preservation =
db.worker_iterations_delete_predicate_preservation() > iters_start;
let did_compaction = db.chunk_summaries().into_iter().any(|summary| {
(summary.partition_key.as_ref() == "part_c")
&& (summary.storage == ChunkStorage::ReadBuffer)
});
if did_delete_predicate_preservation && did_compaction {
break;
}
assert!(t_0.elapsed() < Duration::from_secs(10));
tokio::time::sleep(Duration::from_millis(100)).await;
}
// ==================== check: delete predicates ====================
let closure_check_delete_predicates = |db: &Arc<Db>| {
let db = Arc::clone(db);
let pred = pred.clone();
async move {
let chunks = db
.chunks(table_name, &Default::default())
.await
.expect("error getting chunks");
for chunk in chunks {
let addr = chunk.addr();
let partition_key = addr.partition_key.as_ref();
if partition_key == "part_b" {
// Strictly speaking not required because the chunk was persisted AFTER the delete predicate was
// registered so we can get away with materializing it during persistence.
continue;
}
if partition_key == "part_c" {
// This partition was compacted, so the delete predicates were materialized.
continue;
}
let predicates = chunk.delete_predicates();
assert_eq!(predicates.len(), 1);
assert_eq!(predicates[0].as_ref(), pred.as_ref());
}
}
};
closure_check_delete_predicates(&db).await;
// ==================== check: query ====================
let expected = vec![
"+------+-----+----------+--------------------------------+",
"| part | row | selector | time |",
"+------+-----+----------+--------------------------------+",
"| a | 10 | 0 | 1970-01-01T00:00:00.000000010Z |",
"| b | 20 | 0 | 1970-01-01T00:00:00.000000020Z |",
"| c | 30 | 0 | 1970-01-01T00:00:00.000000030Z |",
"| d | 40 | 0 | 1970-01-01T00:00:00.000000040Z |",
"+------+-----+----------+--------------------------------+",
];
let batches = run_query(Arc::clone(&db), "select * from cpu order by time").await;
assert_batches_sorted_eq!(&expected, &batches);
// ==================== do: re-load DB ====================
// Re-create database with same store, serverID, and DB name
database.restart().await.unwrap();
let db = database.initialized_db().unwrap();
// ==================== check: delete predicates ====================
closure_check_delete_predicates(&db).await;
// ==================== check: query ====================
// NOTE: partition "c" is gone here because it was not written to object store
let expected = vec![
"+------+-----+----------+--------------------------------+",
"| part | row | selector | time |",
"+------+-----+----------+--------------------------------+",
"| a | 10 | 0 | 1970-01-01T00:00:00.000000010Z |",
"| b | 20 | 0 | 1970-01-01T00:00:00.000000020Z |",
"| d | 40 | 0 | 1970-01-01T00:00:00.000000040Z |",
"+------+-----+----------+--------------------------------+",
];
let batches = run_query(Arc::clone(&db), "select * from cpu order by time").await;
assert_batches_sorted_eq!(&expected, &batches);
database.restart().await.unwrap();
// ==================== do: remove checkpoint files ====================
let iox_object_store = database.iox_object_store();
let files = iox_object_store
.catalog_transaction_files()
.await
.unwrap()
.try_concat()
.await
.unwrap();
let mut deleted_one = false;
for file in files {
if file.is_checkpoint() {
iox_object_store
.delete_catalog_transaction_file(&file)
.await
.unwrap();
deleted_one = true;
}
}
assert!(deleted_one);
// ==================== do: re-load DB ====================
// Re-create database with same store, serverID, and DB name
database.restart().await.unwrap();
let db = database.initialized_db().unwrap();
// ==================== check: delete predicates ====================
closure_check_delete_predicates(&db).await;
// ==================== check: query ====================
// NOTE: partition "c" is gone here because it was not written to object store
let _expected = vec![
"+------+-----+----------+--------------------------------+",
"| part | row | selector | time |",
"+------+-----+----------+--------------------------------+",
"| a | 10 | 0 | 1970-01-01T00:00:00.000000010Z |",
"| b | 20 | 0 | 1970-01-01T00:00:00.000000020Z |",
"| d | 40 | 0 | 1970-01-01T00:00:00.000000040Z |",
"+------+-----+----------+--------------------------------+",
];
let batches = run_query(Arc::clone(&db), "select * from cpu order by time").await;
assert_batches_sorted_eq!(&expected, &batches);
server.shutdown();
server.join().await.unwrap();
}


@@ -1,230 +0,0 @@
use arrow_util::assert_batches_eq;
use data_types::{
delete_predicate::{DeleteExpr, DeletePredicate, Op, Scalar},
router::{
Matcher, MatcherToShard, QuerySinks, Router as RouterConfig, ShardConfig, ShardId,
WriteSink, WriteSinkSet, WriteSinkVariant,
},
server_id::ServerId,
timestamp::TimestampRange,
DatabaseName,
};
use db::{test_helpers::wait_for_tables, Db};
use dml::{DmlDelete, DmlOperation, DmlWrite};
use generated_types::influxdata::iox::{
management::v1::DatabaseRules, write_buffer::v1::WriteBufferConnection,
};
use mutable_batch_lp::lines_to_batches;
use query::{exec::ExecutionContextProvider, frontend::sql::SqlQueryPlanner};
use regex::Regex;
use router::{router::Router, server::RouterServer};
use server::{
rules::ProvidedDatabaseRules,
test_utils::{make_application, make_initialized_server},
Server,
};
use std::{collections::BTreeMap, num::NonZeroU32, sync::Arc};
use write_buffer::mock::MockBufferSharedState;
/// A distributed IOx topology consisting of a router and a database, separated by a write buffer
///
/// There is some overlap with `ReplayTest` in `server` and `ServerFixture` in the end-to-end
/// tests. The former is primarily concerned with the interaction of replay and persistence,
/// whilst the latter is concerned with the behaviour of the process as a whole.
///
/// `DistributedTest` sits somewhere in the middle: it is not concerned with the details of
/// persistence or replay, but is still at a low enough level that it can manipulate the server
/// APIs directly and is not restricted to what is exposed over gRPC.
///
/// It primarily exists to test the routing logic.
///
struct DistributedTest {
router: Arc<Router>,
consumer: Arc<Server>,
consumer_db: Arc<Db>,
}
impl DistributedTest {
/// Create a new DistributedTest
pub async fn new(db_name: &DatabaseName<'static>) -> Self {
let write_buffer_state =
MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::new(1).unwrap());
let application = make_application();
application
.write_buffer_factory()
.register_mock("my_mock".to_string(), write_buffer_state);
let write_buffer_connection = WriteBufferConnection {
r#type: "mock".to_string(),
connection: "my_mock".to_string(),
connection_config: Default::default(),
creation_config: None,
};
// Create a router
let router_server = RouterServer::for_testing(
None,
None,
Arc::clone(application.time_provider()),
Some(Arc::clone(application.write_buffer_factory())),
)
.await;
let router_id = ServerId::new(NonZeroU32::new(1).unwrap());
router_server.set_server_id(router_id).unwrap();
router_server.update_router(RouterConfig {
name: db_name.to_string(),
write_sharder: ShardConfig {
specific_targets: vec![MatcherToShard {
matcher: Matcher {
table_name_regex: Some(Regex::new(".*").unwrap()),
},
shard: ShardId::new(1),
}],
hash_ring: None,
},
write_sinks: BTreeMap::from([(
ShardId::new(1),
WriteSinkSet {
sinks: vec![WriteSink {
sink: WriteSinkVariant::WriteBuffer(
write_buffer_connection.clone().try_into().unwrap(),
),
ignore_errors: false,
}],
},
)]),
query_sinks: QuerySinks::default(),
});
let router = router_server.router(db_name).unwrap();
// Create a consumer
let consumer_id = ServerId::new(NonZeroU32::new(2).unwrap());
let consumer = make_initialized_server(consumer_id, Arc::clone(&application)).await;
let consumer_db = consumer
.create_database(
ProvidedDatabaseRules::new_rules(DatabaseRules {
name: db_name.to_string(),
write_buffer_connection: Some(write_buffer_connection.clone()),
..Default::default()
})
.unwrap(),
)
.await
.unwrap()
.initialized_db()
.unwrap();
Self {
router,
consumer,
consumer_db,
}
}
/// Wait for the consumer to have the following tables
pub async fn wait_for_tables(&self, expected_tables: &[&str]) {
wait_for_tables(&self.consumer_db, expected_tables).await
}
/// Write line protocol
pub async fn write(&self, lp: &str) {
self.router
.write(DmlOperation::Write(DmlWrite::new(
self.consumer_db.name().as_ref(),
lines_to_batches(lp, 0).unwrap(),
Default::default(),
)))
.await
.unwrap();
}
pub async fn delete(&self, delete: DmlDelete) {
// TODO: Write to router not Db (#2980)
self.router
.write(DmlOperation::Delete(delete))
.await
.unwrap();
}
/// Perform a query and assert the result
pub async fn query(&self, query: &str, expected: &[&'static str]) {
let ctx = self.consumer_db.new_query_context(None);
let physical_plan = SqlQueryPlanner::new().query(query, &ctx).await.unwrap();
let batches = ctx.collect(physical_plan).await.unwrap();
assert_batches_eq!(expected, &batches);
}
/// Shuts down the fixture and waits for the consumer server to exit
pub async fn drain(&self) {
self.consumer.shutdown();
self.consumer.join().await.unwrap();
}
}
#[tokio::test]
async fn write_buffer_deletes() {
let db_name = DatabaseName::new("distributed").unwrap();
let fixture = DistributedTest::new(&db_name).await;
// Write some data
fixture.write("foo x=1 1").await;
fixture.write("foo x=3 2").await;
// Send a delete over the write buffer
fixture
.delete(DmlDelete::new(
db_name.as_str(),
DeletePredicate {
range: TimestampRange::new(0, 20),
exprs: vec![DeleteExpr {
column: "x".to_string(),
op: Op::Eq,
scalar: Scalar::I64(1),
}],
},
None,
Default::default(),
))
.await;
// Use a write to a different table to signal that consumption has completed, by waiting
// for this new table to exist in the consumer database
fixture.write("bar x=2 1").await;
// Wait for consumer to catch up
fixture.wait_for_tables(&["bar", "foo"]).await;
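// The row matching the delete predicate (x=1) should be gone from `foo`, leaving only x=3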
fixture
.query(
"select * from foo;",
&[
"+--------------------------------+---+",
"| time | x |",
"+--------------------------------+---+",
"| 1970-01-01T00:00:00.000000002Z | 3 |",
"+--------------------------------+---+",
],
)
.await;
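// The write to `bar` should be unaffected by the delete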
fixture
.query(
"select * from bar;",
&[
"+--------------------------------+---+",
"| time | x |",
"+--------------------------------+---+",
"| 1970-01-01T00:00:00.000000001Z | 2 |",
"+--------------------------------+---+",
],
)
.await;
fixture.drain().await;
}

View File

@ -1,274 +0,0 @@
use arrow_util::assert_batches_eq;
use data_types::{
chunk_metadata::ChunkStorage,
database_rules::{DatabaseRules, LifecycleRules, PartitionTemplate, TemplatePart},
sequence::Sequence,
server_id::ServerId,
write_buffer::WriteBufferConnection,
DatabaseName,
};
use db::{
test_helpers::{run_query, wait_for_tables},
Db,
};
use futures_util::FutureExt;
use server::{
rules::ProvidedDatabaseRules,
test_utils::{make_application, make_initialized_server},
};
use std::{
num::{NonZeroU32, NonZeroUsize},
sync::Arc,
time::{Duration, Instant},
};
use test_helpers::{assert_contains, tracing::TracingCapture};
use write_buffer::mock::MockBufferSharedState;
#[tokio::test]
async fn write_buffer_lifecycle() {
// Test the interaction between the write buffer and the lifecycle
let tracing_capture = TracingCapture::new();
// ==================== setup ====================
let server_id = ServerId::new(NonZeroU32::new(1).unwrap());
let db_name = DatabaseName::new("delete_predicate_preservation_test").unwrap();
let application = make_application();
let mock_shared_state =
MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::new(1).unwrap());
// The writes are split into two groups to allow "pausing replay" by playing back from
// a MockBufferSharedState with only the first set of writes
write_group1(&mock_shared_state);
write_group2(&mock_shared_state);
application
.write_buffer_factory()
.register_mock("my_mock".to_string(), mock_shared_state.clone());
let server = make_initialized_server(server_id, Arc::clone(&application)).await;
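// Partition by the `tag_partition_by` tag column so writes land in a single, predictable partition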
let partition_template = PartitionTemplate {
parts: vec![TemplatePart::Column("tag_partition_by".to_string())],
};
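// Point the database at the mock write buffer registered above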
let write_buffer_connection = WriteBufferConnection {
type_: "mock".to_string(),
connection: "my_mock".to_string(),
..Default::default()
};
//
// Phase 1: Verify that consuming from a write buffer will wait for compaction in the event
// the hard limit is exceeded
//
// create DB
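// The hard buffer limit and MUB row threshold are deliberately small so the
// hard limit is hit while consuming from the write buffer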
let rules = DatabaseRules {
partition_template: partition_template.clone(),
lifecycle_rules: LifecycleRules {
buffer_size_hard: Some(NonZeroUsize::new(10_000).unwrap()),
mub_row_threshold: NonZeroUsize::new(10).unwrap(),
..Default::default()
},
write_buffer_connection: Some(write_buffer_connection.clone()),
..DatabaseRules::new(db_name.clone())
};
let database = server
.create_database(ProvidedDatabaseRules::new_rules(rules.into()).unwrap())
.await
.unwrap();
let db = database.initialized_db().unwrap();
// after a while the tables should exist
wait_for_tables(&db, &["table_1", "table_2"]).await;
// no rows should be dropped
let batches = run_query(Arc::clone(&db), "select sum(bar) as n from table_1").await;
let expected = vec!["+----+", "| n |", "+----+", "| 25 |", "+----+"];
assert_batches_eq!(expected, &batches);
// check that the hard buffer limit was actually hit (otherwise this test is pointless/outdated)
assert_contains!(
tracing_capture.to_string(),
"Hard limit reached while reading from write buffer, waiting for compaction to catch up"
);
// Persist the final write; this ensures that the data for table_1 will have to be replayed
db.persist_partition("table_2", "tag_partition_by_a", true)
.await
.unwrap();
// Only table_2 should be persisted
assert_eq!(count_persisted_chunks(&db), 1);
// Shutdown server
server.shutdown();
server.join().await.unwrap();
// Drop so they don't contribute to metrics
std::mem::drop(server);
std::mem::drop(database);
std::mem::drop(db);
std::mem::drop(tracing_capture);
//
// Phase 2: Verify that replaying from a write buffer will wait for compaction in the event
// the hard limit is exceeded
//
// Recreate server
let tracing_capture = TracingCapture::new();
let server = make_initialized_server(server_id, Arc::clone(&application)).await;
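// The restarted server should rediscover the single database created above and replay it from the write buffer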
let databases = server.databases().unwrap();
assert_eq!(databases.len(), 1);
let database = databases.into_iter().next().unwrap();
database.wait_for_init().await.unwrap();
let database_uuid = database.uuid();
let db = database.initialized_db().unwrap();
let batches = run_query(Arc::clone(&db), "select sum(bar) as n from table_1").await;
let expected = vec!["+----+", "| n |", "+----+", "| 25 |", "+----+"];
assert_batches_eq!(expected, &batches);
assert_contains!(
tracing_capture.to_string(),
"Hard limit reached while replaying, waiting for compaction to catch up"
);
// Only table_2 should be persisted
assert_eq!(count_persisted_chunks(&db), 1);
server.shutdown();
server.join().await.unwrap();
//
// Phase 3: Verify that persistence is disabled during replay
//
// Override rules to set persist row threshold lower on restart
let rules = ProvidedDatabaseRules::new_rules(
DatabaseRules {
partition_template,
lifecycle_rules: LifecycleRules {
persist: true,
late_arrive_window_seconds: NonZeroU32::new(1).unwrap(),
persist_row_threshold: NonZeroUsize::new(5).unwrap(),
..Default::default()
},
write_buffer_connection: Some(write_buffer_connection),
..DatabaseRules::new(db_name.clone())
}
.into(),
)
.unwrap();
application
.config_provider()
.store_rules(database_uuid, &rules)
.await
.unwrap();
std::mem::drop(server);
std::mem::drop(database);
std::mem::drop(db);
std::mem::drop(tracing_capture);
// Clear the write buffer and write back in only the first group of writes
mock_shared_state.clear_messages(0);
write_group1(&mock_shared_state);
// Restart server - this will load new rules written above
let server = make_initialized_server(server_id, Arc::clone(&application)).await;
let databases = server.databases().unwrap();
assert_eq!(databases.len(), 1);
let database = databases.into_iter().next().unwrap();
// Sleep for a bit to give the lifecycle policy a chance to run
//
// During this time replay should still be running, as there are not enough
// writes within the write buffer to "complete" replay.
//
// However, there are sufficient rows to exceed the persist row threshold.
//
// Therefore, if persist were not disabled by replay, the lifecycle would try
// to persist without the full set of writes. This would in turn result
// in two separate chunks being persisted for table_1
tokio::time::sleep(Duration::from_secs(1)).await;
assert!(
database.wait_for_init().now_or_never().is_none(),
"replay shouldn't have finished as insufficient data"
);
// Write in the remainder of the data to allow replay to finish
write_group2(&mock_shared_state);
database.wait_for_init().await.unwrap();
let db = database.initialized_db().unwrap();
let start = Instant::now();
loop {
if count_persisted_chunks(&db) > 1 {
// As soon as replay finishes the lifecycle should have persisted everything in
// table_1 into a single chunk. We should therefore have two chunks, one for
// each of table_1 and table_2
assert_eq!(db.chunk_summaries().len(), 2, "persisted during replay!");
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
assert!(
start.elapsed() < Duration::from_secs(10),
"failed to persist chunk"
)
}
server.shutdown();
server.join().await.unwrap();
}
/// The first set of writes for the write buffer
fn write_group1(write_buffer_state: &MockBufferSharedState) {
// setup write buffer
// these numbers are hand-tuned to trigger hard buffer limits without making the test too big
let n_entries = 50u64;
for sequence_number in 0..n_entries {
let lp = format!(
"table_1,tag_partition_by=a foo=\"hello\",bar=1 {}",
sequence_number / 2
);
write_buffer_state.push_lp(Sequence::new(0, sequence_number), &lp);
}
}
/// The second set of writes for the write buffer
fn write_group2(write_buffer_state: &MockBufferSharedState) {
// Write a line with timestamp 0 - this forces all prior writes to be persisted
// once the server has read this line
write_buffer_state.push_lp(
Sequence::new(0, 100),
"table_1,tag_partition_by=a foo=\"hello\",bar=1 0",
);
write_buffer_state.push_lp(Sequence::new(0, 101), "table_2,tag_partition_by=a foo=1 0");
}
fn count_persisted_chunks(db: &Db) -> usize {
db.chunk_summaries()
.into_iter()
.filter(|x| {
matches!(
x.storage,
ChunkStorage::ObjectStoreOnly | ChunkStorage::ReadBufferAndObjectStore
)
})
.count()
}