Merge branch 'main' into crepererum/writer_buffer_seek

pull/24376/head
kodiakhq[bot] 2021-07-20 12:29:18 +00:00 committed by GitHub
commit 58dd7e9532
21 changed files with 1603 additions and 1594 deletions

View File

@ -22,3 +22,4 @@ pub mod names;
pub mod partition_metadata;
pub mod server_id;
pub mod timestamp;
pub mod write_summary;

View File

@ -0,0 +1,20 @@
use chrono::{DateTime, Utc};
/// A description of a set of writes
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct WriteSummary {
/// The wall clock timestamp of the first write in this summary
pub time_of_first_write: DateTime<Utc>,
/// The wall clock timestamp of the last write in this summary
pub time_of_last_write: DateTime<Utc>,
/// The minimum row timestamp for data in this summary
pub min_timestamp: DateTime<Utc>,
/// The maximum row timestamp value for data in this summary
pub max_timestamp: DateTime<Utc>,
/// The number of rows in this summary
pub row_count: usize,
}
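For illustration only, a minimal sketch of constructing a WriteSummary by hand from a dependent crate (the values are hypothetical, not taken from this diff):

use chrono::{TimeZone, Utc};
use data_types::write_summary::WriteSummary;

// Hypothetical values purely for illustration.
let summary = WriteSummary {
    time_of_first_write: Utc.timestamp_nanos(1_000),
    time_of_last_write: Utc.timestamp_nanos(2_000),
    min_timestamp: Utc.timestamp_nanos(10),
    max_timestamp: Utc.timestamp_nanos(340),
    row_count: 21,
};
assert!(summary.time_of_first_write <= summary.time_of_last_write);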

View File

@ -786,12 +786,12 @@ macro_rules! assert_column_eq {
#[cfg(test)]
mod test {
use arrow::compute::SortOptions;
use InfluxColumnType::*;
use InfluxFieldType::*;
use super::{builder::SchemaBuilder, *};
use crate::schema::merge::SchemaMerger;
use crate::schema::sort::SortOptions;
fn make_field(
name: &str,

View File

@ -1,5 +1,6 @@
use std::{fmt::Display, str::FromStr};
use arrow::compute::SortOptions;
use indexmap::{map::Iter, IndexMap};
use itertools::Itertools;
use snafu::Snafu;
@ -23,24 +24,6 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Temporary - <https://github.com/apache/arrow-rs/pull/425>
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct SortOptions {
/// Whether to sort in descending order
pub descending: bool,
/// Whether to sort nulls first
pub nulls_first: bool,
}
impl Default for SortOptions {
fn default() -> Self {
Self {
descending: false,
nulls_first: true,
}
}
}
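The local SortOptions defined above is removed in favor of the upstream arrow::compute::SortOptions (tracked in the arrow-rs PR linked above). A minimal usage sketch of the upstream type, for illustration only:

use arrow::compute::SortOptions;

// Set both flags explicitly rather than relying on Default, so the
// behaviour matches the removed local definition above.
let opts = SortOptions {
    descending: false,
    nulls_first: true,
};
assert!(!opts.descending && opts.nulls_first);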
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct ColumnSort {
/// Position of this column in the sort key

View File

@ -1399,6 +1399,7 @@ mod tests {
let rules = LifecycleRules {
late_arrive_window_seconds: NonZeroU32::new(10).unwrap(),
persist_row_threshold: NonZeroUsize::new(1_000).unwrap(),
max_active_compactions: NonZeroU32::new(10).unwrap(),
..Default::default()
};
@ -1538,6 +1539,7 @@ mod tests {
persist_row_threshold: NonZeroUsize::new(1_000).unwrap(),
late_arrive_window_seconds: NonZeroU32::new(10).unwrap(),
persist_age_threshold_seconds: NonZeroU32::new(10).unwrap(),
max_active_compactions: NonZeroU32::new(10).unwrap(),
..Default::default()
};
let now = Instant::now();

View File

@ -7,7 +7,7 @@ use std::{
use chrono::{DateTime, TimeZone, Utc};
use data_types::partition_metadata::PartitionAddr;
use data_types::{partition_metadata::PartitionAddr, write_summary::WriteSummary};
use entry::Sequence;
use internal_types::guard::{ReadGuard, ReadLock};
@ -45,6 +45,16 @@ pub struct PersistenceWindows {
late_arrival_period: Duration,
closed_window_period: Duration,
/// The datetime this PersistenceWindows was created
///
/// `PersistenceWindows` internally uses the monotonic `Instant`, which
/// cannot be rendered as a wall clock time. To provide a stable wall clock
/// rendering, a single timestamp is recorded at creation time
created_at_time: DateTime<Utc>,
/// The instant this PersistenceWindows was created
created_at_instant: Instant,
/// The last instant passed to PersistenceWindows::add_range
last_instant: Instant,
@ -106,6 +116,9 @@ impl PersistenceWindows {
let closed_window_count = late_arrival_seconds / closed_window_seconds;
let created_at_time = Utc::now();
let created_at_instant = Instant::now();
Self {
persistable: ReadLock::new(None),
closed: VecDeque::with_capacity(closed_window_count as usize),
@ -113,7 +126,9 @@ impl PersistenceWindows {
addr,
late_arrival_period,
closed_window_period,
last_instant: Instant::now(),
created_at_time,
created_at_instant,
last_instant: created_at_instant,
max_sequence_numbers: Default::default(),
}
}
@ -165,7 +180,7 @@ impl PersistenceWindows {
self.rotate(received_at);
match self.open.as_mut() {
Some(w) => w.add_range(sequence, row_count, min_time, max_time),
Some(w) => w.add_range(sequence, row_count, min_time, max_time, received_at),
None => {
self.open = Some(Window::new(
received_at,
@ -335,6 +350,34 @@ impl PersistenceWindows {
self.windows().next()
}
/// Returns approximate summaries of the unpersisted writes
/// recorded by this `PersistenceWindows` instance
///
/// These are approximate because persistence may partially flush a window, which will
/// update the min row timestamp but not the row count
pub fn summaries(&self) -> impl Iterator<Item = WriteSummary> + '_ {
self.windows().map(move |window| {
let window_age = chrono::Duration::from_std(
window.created_at.duration_since(self.created_at_instant),
)
.expect("duration overflow");
let time_of_first_write = self.created_at_time + window_age;
let window_duration =
chrono::Duration::from_std(window.last_instant.duration_since(window.created_at))
.expect("duration overflow");
WriteSummary {
time_of_first_write,
time_of_last_write: time_of_first_write + window_duration,
min_timestamp: window.min_time,
max_timestamp: window.max_time,
row_count: window.row_count,
}
})
}
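The mapping above projects monotonic Instant offsets onto the wall clock captured when the PersistenceWindows was created. A standalone sketch of the same idea (the helper name is illustrative, not part of this change):

use std::time::{Duration, Instant};
use chrono::{DateTime, Utc};

// Hypothetical helper mirroring the conversion in `summaries()`.
fn to_wall_clock(
    created_at_time: DateTime<Utc>,
    created_at_instant: Instant,
    instant: Instant,
) -> DateTime<Utc> {
    let elapsed = instant.duration_since(created_at_instant);
    created_at_time + chrono::Duration::from_std(elapsed).expect("duration overflow")
}

let created_at_time = Utc::now();
let created_at_instant = Instant::now();
let later = created_at_instant + Duration::from_millis(50);
assert!(to_wall_clock(created_at_time, created_at_instant, later) >= created_at_time);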
/// Returns true if this PersistenceWindows instance is empty
pub fn is_empty(&self) -> bool {
self.minimum_window().is_none()
@ -374,9 +417,14 @@ struct Window {
/// The server time when this window was created. Used to determine how long data in this
/// window has been sitting in memory.
created_at: Instant,
/// The server time of the last write to this window
last_instant: Instant,
/// The number of rows in the window
row_count: usize,
min_time: DateTime<Utc>, // min time value for data in the window
max_time: DateTime<Utc>, // max time value for data in the window
/// min time value for data in the window
min_time: DateTime<Utc>,
/// max time value for data in the window
max_time: DateTime<Utc>,
/// maps sequencer_id to the minimum and maximum sequence numbers seen
sequencer_numbers: BTreeMap<u32, MinMaxSequence>,
}
@ -399,6 +447,7 @@ impl Window {
Self {
created_at,
last_instant: created_at,
row_count,
min_time,
max_time,
@ -414,7 +463,11 @@ impl Window {
row_count: usize,
min_time: DateTime<Utc>,
max_time: DateTime<Utc>,
instant: Instant,
) {
assert!(self.created_at <= instant);
self.last_instant = instant;
self.row_count += row_count;
if self.min_time > min_time {
self.min_time = min_time;
@ -1265,4 +1318,89 @@ mod tests {
assert_eq!(w.closed[1].max_time, start + chrono::Duration::seconds(2));
assert_eq!(w.closed[1].row_count, 11);
}
#[test]
fn test_summaries() {
let mut w = make_windows(Duration::from_secs(100));
let instant = w.created_at_instant;
// Window 1
w.add_range(
Some(&Sequence { id: 1, number: 1 }),
11,
Utc.timestamp_nanos(10),
Utc.timestamp_nanos(11),
instant + Duration::from_millis(1),
);
w.add_range(
Some(&Sequence { id: 1, number: 2 }),
4,
Utc.timestamp_nanos(10),
Utc.timestamp_nanos(340),
instant + Duration::from_millis(30),
);
w.add_range(
Some(&Sequence { id: 1, number: 3 }),
6,
Utc.timestamp_nanos(1),
Utc.timestamp_nanos(5),
instant + Duration::from_millis(50),
);
// More than DEFAULT_CLOSED_WINDOW_PERIOD after start of Window 1 => Window 2
w.add_range(
Some(&Sequence { id: 1, number: 4 }),
3,
Utc.timestamp_nanos(89),
Utc.timestamp_nanos(90),
instant + DEFAULT_CLOSED_WINDOW_PERIOD + Duration::from_millis(1),
);
// More than DEFAULT_CLOSED_WINDOW_PERIOD after start of Window 2 => Window 3
w.add_range(
Some(&Sequence { id: 1, number: 5 }),
8,
Utc.timestamp_nanos(3),
Utc.timestamp_nanos(4),
instant + DEFAULT_CLOSED_WINDOW_PERIOD * 3,
);
let closed_duration = chrono::Duration::from_std(DEFAULT_CLOSED_WINDOW_PERIOD).unwrap();
let summaries: Vec<_> = w.summaries().collect();
assert_eq!(summaries.len(), 3);
assert_eq!(
summaries,
vec![
WriteSummary {
time_of_first_write: w.created_at_time + chrono::Duration::milliseconds(1),
time_of_last_write: w.created_at_time + chrono::Duration::milliseconds(50),
min_timestamp: Utc.timestamp_nanos(1),
max_timestamp: Utc.timestamp_nanos(340),
row_count: 21
},
WriteSummary {
time_of_first_write: w.created_at_time
+ closed_duration
+ chrono::Duration::milliseconds(1),
time_of_last_write: w.created_at_time
+ closed_duration
+ chrono::Duration::milliseconds(1),
min_timestamp: Utc.timestamp_nanos(89),
max_timestamp: Utc.timestamp_nanos(90),
row_count: 3
},
WriteSummary {
time_of_first_write: w.created_at_time + closed_duration * 3,
time_of_last_write: w.created_at_time + closed_duration * 3,
min_timestamp: Utc.timestamp_nanos(3),
max_timestamp: Utc.timestamp_nanos(4),
row_count: 8
},
]
)
}
}

View File

@ -268,8 +268,9 @@ struct ScanPlan<C: QueryChunk + 'static> {
#[cfg(test)]
mod test {
use arrow::compute::SortOptions;
use arrow_util::assert_batches_eq;
use internal_types::schema::{merge::SchemaMerger, sort::SortOptions};
use internal_types::schema::merge::SchemaMerger;
use crate::{
exec::{Executor, ExecutorType},

View File

@ -339,21 +339,12 @@ impl RecordBatchDeduplicator {
}
/// Create a new record batch from offset --> len
///
/// <https://github.com/apache/arrow-rs/issues/460> for adding this upstream
fn slice_record_batch(
batch: &RecordBatch,
offset: usize,
len: usize,
) -> ArrowResult<RecordBatch> {
let schema = batch.schema();
let new_columns: Vec<_> = batch
.columns()
.iter()
.map(|old_column| old_column.slice(offset, len))
.collect();
let batch = RecordBatch::try_new(schema, new_columns)?;
let batch = batch.slice(offset, len);
// At time of writing, `concat_batches` concatenates the
// contents of dictionaries as well; Do a post pass to remove the

View File

@ -8,7 +8,7 @@ use data_types::{
DatabaseName,
};
use metrics::MetricRegistry;
use object_store::{path::ObjectStorePath, ObjectStore};
use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi};
use parquet_file::catalog::PreservedCatalog;
use query::exec::Executor;
use write_buffer::config::WriteBufferConfig;
@ -20,6 +20,7 @@ use crate::{
InvalidDatabaseStateTransition, JobRegistry, Result, RulesDatabaseNameMismatch,
ServerShuttingDown,
};
use object_store::path::Path;
use observability_deps::tracing::{self, error, info, warn, Instrument};
use snafu::{ensure, OptionExt};
use tokio::task::JoinHandle;
@ -37,10 +38,14 @@ pub(crate) const DB_RULES_FILE_NAME: &str = "rules.pb";
/// run to completion if the tokio runtime is dropped
#[derive(Debug)]
pub(crate) struct Config {
shutdown: CancellationToken,
jobs: Arc<JobRegistry>,
state: RwLock<ConfigState>,
object_store: Arc<ObjectStore>,
exec: Arc<Executor>,
server_id: ServerId,
metric_registry: Arc<MetricRegistry>,
shutdown: CancellationToken,
state: RwLock<ConfigState>,
}
pub(crate) enum UpdateError<E> {
@ -58,14 +63,20 @@ impl Config {
/// Create new empty config.
pub(crate) fn new(
jobs: Arc<JobRegistry>,
object_store: Arc<ObjectStore>,
exec: Arc<Executor>,
server_id: ServerId,
metric_registry: Arc<MetricRegistry>,
remote_template: Option<RemoteTemplate>,
) -> Self {
Self {
jobs,
object_store,
exec,
server_id,
metric_registry,
shutdown: Default::default(),
state: RwLock::new(ConfigState::new(remote_template)),
jobs,
metric_registry,
}
}
@ -80,13 +91,7 @@ impl Config {
/// This only works if the database is not yet known. To recover a database out of an uninitialized state, see
/// [`recover_db`](Self::recover_db). To do maintenance work on data linked to the database (e.g. the catalog)
/// without initializing it, see [`block_db`](Self::block_db).
pub(crate) fn create_db(
&self,
object_store: Arc<ObjectStore>,
exec: Arc<Executor>,
server_id: ServerId,
db_name: DatabaseName<'static>,
) -> Result<DatabaseHandle<'_>> {
pub(crate) fn create_db(&self, db_name: DatabaseName<'static>) -> Result<DatabaseHandle<'_>> {
let mut state = self.state.write().expect("mutex poisoned");
ensure!(
!state.reservations.contains(&db_name),
@ -99,12 +104,7 @@ impl Config {
state.reservations.insert(db_name.clone());
Ok(DatabaseHandle {
state: Some(Arc::new(DatabaseState::Known {
object_store,
exec,
server_id,
db_name,
})),
state: Some(Arc::new(DatabaseState::Known { db_name })),
config: &self,
})
}
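With the object store, executor, and server id now owned by Config, reserving a database only needs its name; a brief usage sketch matching the updated tests further down (the database name is hypothetical):

// Assumes a `config: Config` built as in `make_config()` in the tests below.
let name = DatabaseName::new("example_db").unwrap();
let handle = config.create_db(name).unwrap();
// The handle now pulls object store, executor, and server id from `config`.
handle.abort();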
@ -116,7 +116,7 @@ impl Config {
/// While the handle is held, no other operations for the given database can be executed.
///
/// This only works if the database is known but is uninitialized. To create a new database that is not yet known,
/// see [`create_db`](Self::create_db). To do maintainance work on data linked to the database (e.g. the catalog)
/// see [`create_db`](Self::create_db). To do maintenance work on data linked to the database (e.g. the catalog)
/// without initializing it, see [`block_db`](Self::block_db).
pub(crate) fn recover_db(&self, db_name: DatabaseName<'static>) -> Result<DatabaseHandle<'_>> {
let mut state = self.state.write().expect("mutex poisoned");
@ -303,6 +303,24 @@ impl Config {
pub fn metrics_registry(&self) -> Arc<MetricRegistry> {
Arc::clone(&self.metric_registry)
}
/// Returns the object store of this server
pub fn object_store(&self) -> Arc<ObjectStore> {
Arc::clone(&self.object_store)
}
/// Returns the server id of this server
pub fn server_id(&self) -> ServerId {
self.server_id
}
/// Base location in object store for this server.
pub fn root_path(&self) -> Path {
let id = self.server_id.get();
let mut path = self.object_store.new_path();
path.push_dir(format!("{}", id));
path
}
}
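For reference, a short usage sketch of root_path (assumes a Config with server id 1, as in make_config() in the tests below):

let root = config.root_path();
// `root` has a single directory component, the server id rendered as a
// string ("1" here); per-database config files are stored beneath it.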
/// Get object store path for the database config under the given root (= path under which the server with the current ID
@ -365,41 +383,14 @@ impl RemoteTemplate {
}
/// Internal representation of the different database states.
///
/// # Shared Data During Transitions
/// The following elements can safely be shared between states because they won't be poisoned by any half-done
/// transition (e.g. starting a transition and then failing due to an IO error):
/// - `object_store`
/// - `exec`
///
/// The following elements can trivially be copied from one state to the next:
/// - `server_id`
/// - `db_name`
///
/// The following elements MUST be copied from one state to the next because partial modifications are not allowed:
/// - `rules`
///
/// Exceptions to the above rules are the following states:
/// - [`Replay`](Self::Replay): replaying twice should (apart from some performance penalties) not do much harm
/// - [`Initialized`](Self::Initialized): the final state is not advanced to anything else
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
enum DatabaseState {
/// Database is known but nothing is loaded.
Known {
object_store: Arc<ObjectStore>,
exec: Arc<Executor>,
server_id: ServerId,
db_name: DatabaseName<'static>,
},
Known { db_name: DatabaseName<'static> },
/// Rules are loaded
RulesLoaded {
object_store: Arc<ObjectStore>,
exec: Arc<Executor>,
server_id: ServerId,
rules: Arc<DatabaseRules>,
},
RulesLoaded { rules: Arc<DatabaseRules> },
/// Catalog is loaded but data from sequencers / write buffers is not yet replayed.
Replay { db: Arc<Db> },
@ -457,24 +448,6 @@ impl DatabaseState {
}
}
fn object_store(&self) -> Arc<ObjectStore> {
match self {
DatabaseState::Known { object_store, .. } => Arc::clone(object_store),
DatabaseState::RulesLoaded { object_store, .. } => Arc::clone(object_store),
DatabaseState::Replay { db, .. } => Arc::clone(&db.store),
DatabaseState::Initialized { db, .. } => Arc::clone(&db.store),
}
}
fn server_id(&self) -> ServerId {
match self {
DatabaseState::Known { server_id, .. } => *server_id,
DatabaseState::RulesLoaded { server_id, .. } => *server_id,
DatabaseState::Replay { db, .. } => db.server_id,
DatabaseState::Initialized { db, .. } => db.server_id,
}
}
fn rules(&self) -> Option<Arc<DatabaseRules>> {
match self {
DatabaseState::Known { .. } => None,
@ -540,12 +513,12 @@ impl<'a> DatabaseHandle<'a> {
/// Get object store.
pub fn object_store(&self) -> Arc<ObjectStore> {
self.state().object_store()
Arc::clone(&self.config.object_store)
}
/// Get server ID.
pub fn server_id(&self) -> ServerId {
self.state().server_id()
self.config.server_id
}
/// Get metrics registry.
@ -584,12 +557,7 @@ impl<'a> DatabaseHandle<'a> {
/// Advance database state to [`RulesLoaded`](DatabaseStateCode::RulesLoaded).
pub fn advance_rules_loaded(&mut self, rules: DatabaseRules) -> Result<()> {
match self.state().as_ref() {
DatabaseState::Known {
object_store,
exec,
server_id,
db_name,
} => {
DatabaseState::Known { db_name } => {
ensure!(
db_name == &rules.name,
RulesDatabaseNameMismatch {
@ -599,9 +567,6 @@ impl<'a> DatabaseHandle<'a> {
);
self.state = Some(Arc::new(DatabaseState::RulesLoaded {
object_store: Arc::clone(&object_store),
exec: Arc::clone(&exec),
server_id: *server_id,
rules: Arc::new(rules),
}));
@ -623,16 +588,11 @@ impl<'a> DatabaseHandle<'a> {
write_buffer: Option<WriteBufferConfig>,
) -> Result<()> {
match self.state().as_ref() {
DatabaseState::RulesLoaded {
object_store,
exec,
server_id,
rules,
} => {
DatabaseState::RulesLoaded { rules } => {
let database_to_commit = DatabaseToCommit {
server_id: *server_id,
object_store: Arc::clone(&object_store),
exec: Arc::clone(&exec),
server_id: self.config.server_id,
object_store: Arc::clone(&self.config.object_store),
exec: Arc::clone(&self.config.exec),
preserved_catalog,
catalog,
rules: Arc::clone(&rules),
@ -726,40 +686,32 @@ mod test {
use super::*;
use std::num::NonZeroU32;
fn make_config(remote_template: Option<RemoteTemplate>) -> Config {
let store = Arc::new(ObjectStore::new_in_memory());
let server_id = ServerId::try_from(1).unwrap();
let metric_registry = Arc::new(metrics::MetricRegistry::new());
Config::new(
Arc::new(JobRegistry::new()),
Arc::clone(&store),
Arc::new(Executor::new(1)),
server_id,
Arc::clone(&metric_registry),
remote_template,
)
}
#[tokio::test]
async fn create_db() {
// setup
let name = DatabaseName::new("foo").unwrap();
let store = Arc::new(ObjectStore::new_in_memory());
let exec = Arc::new(Executor::new(1));
let server_id = ServerId::try_from(1).unwrap();
let metric_registry = Arc::new(metrics::MetricRegistry::new());
let config = Config::new(
Arc::new(JobRegistry::new()),
Arc::clone(&metric_registry),
None,
);
let config = make_config(None);
let rules = DatabaseRules::new(name.clone());
// getting handle while DB is reserved => fails
{
let _db_reservation = config
.create_db(
Arc::clone(&store),
Arc::clone(&exec),
server_id,
name.clone(),
)
.unwrap();
let _db_reservation = config.create_db(name.clone()).unwrap();
let err = config
.create_db(
Arc::clone(&store),
Arc::clone(&exec),
server_id,
name.clone(),
)
.unwrap_err();
let err = config.create_db(name.clone()).unwrap_err();
assert!(matches!(err, Error::DatabaseReserved { .. }));
let err = config.block_db(name.clone()).unwrap_err();
@ -771,14 +723,7 @@ mod test {
// name in rules must match reserved name
{
let mut db_reservation = config
.create_db(
Arc::clone(&store),
Arc::clone(&exec),
server_id,
DatabaseName::new("bar").unwrap(),
)
.unwrap();
let mut db_reservation = config.create_db(DatabaseName::new("bar").unwrap()).unwrap();
let err = db_reservation
.advance_rules_loaded(rules.clone())
@ -791,14 +736,7 @@ mod test {
// handle.abort just works (aka does not mess up the transaction afterwards)
{
let db_reservation = config
.create_db(
Arc::clone(&store),
Arc::clone(&exec),
server_id,
DatabaseName::new("bar").unwrap(),
)
.unwrap();
let db_reservation = config.create_db(DatabaseName::new("bar").unwrap()).unwrap();
db_reservation.abort();
}
@ -808,21 +746,14 @@ mod test {
// create DB successfully
{
let mut db_reservation = config
.create_db(
Arc::clone(&store),
Arc::clone(&exec),
server_id,
name.clone(),
)
.unwrap();
let mut db_reservation = config.create_db(name.clone()).unwrap();
db_reservation.advance_rules_loaded(rules).unwrap();
let (preserved_catalog, catalog) = load_or_create_preserved_catalog(
&name,
Arc::clone(&store),
server_id,
config.object_store(),
config.server_id(),
config.metrics_registry(),
false,
)
@ -862,14 +793,7 @@ mod test {
assert!(matches!(err, Error::DatabaseAlreadyExists { .. }));
// create DB as second time => fail
let err = config
.create_db(
Arc::clone(&store),
Arc::clone(&exec),
server_id,
name.clone(),
)
.unwrap_err();
let err = config.create_db(name.clone()).unwrap_err();
assert!(matches!(err, Error::DatabaseAlreadyExists { .. }));
// block fully initialized DB => fail
@ -884,40 +808,18 @@ mod test {
async fn recover_db() {
// setup
let name = DatabaseName::new("foo").unwrap();
let store = Arc::new(ObjectStore::new_in_memory());
let exec = Arc::new(Executor::new(1));
let server_id = ServerId::try_from(1).unwrap();
let metric_registry = Arc::new(metrics::MetricRegistry::new());
let config = Config::new(
Arc::new(JobRegistry::new()),
Arc::clone(&metric_registry),
None,
);
let config = make_config(None);
let rules = DatabaseRules::new(name.clone());
// create DB but don't continue with rules loaded (e.g. because the rules file is broken)
{
let db_reservation = config
.create_db(
Arc::clone(&store),
Arc::clone(&exec),
server_id,
name.clone(),
)
.unwrap();
let db_reservation = config.create_db(name.clone()).unwrap();
db_reservation.commit();
}
assert!(config.has_uninitialized_database(&name));
// create DB while it is uninitialized => fail
let err = config
.create_db(
Arc::clone(&store),
Arc::clone(&exec),
server_id,
name.clone(),
)
.unwrap_err();
let err = config.create_db(name.clone()).unwrap_err();
assert!(matches!(err, Error::DatabaseAlreadyExists { .. }));
// recover an unknown DB => fail
@ -931,19 +833,19 @@ mod test {
let mut db_reservation = config.recover_db(name.clone()).unwrap();
assert_eq!(db_reservation.state_code(), DatabaseStateCode::Known);
assert_eq!(db_reservation.db_name(), name);
assert_eq!(db_reservation.server_id(), server_id);
assert_eq!(db_reservation.server_id(), config.server_id());
assert!(db_reservation.rules().is_none());
db_reservation.advance_rules_loaded(rules).unwrap();
assert_eq!(db_reservation.state_code(), DatabaseStateCode::RulesLoaded);
assert_eq!(db_reservation.db_name(), name);
assert_eq!(db_reservation.server_id(), server_id);
assert_eq!(db_reservation.server_id(), config.server_id());
assert!(db_reservation.rules().is_some());
let (preserved_catalog, catalog) = load_or_create_preserved_catalog(
&name,
Arc::clone(&store),
server_id,
config.object_store(),
config.server_id(),
config.metrics_registry(),
false,
)
@ -954,13 +856,13 @@ mod test {
.unwrap();
assert_eq!(db_reservation.state_code(), DatabaseStateCode::Replay);
assert_eq!(db_reservation.db_name(), name);
assert_eq!(db_reservation.server_id(), server_id);
assert_eq!(db_reservation.server_id(), config.server_id());
assert!(db_reservation.rules().is_some());
db_reservation.advance_init().unwrap();
assert_eq!(db_reservation.state_code(), DatabaseStateCode::Initialized);
assert_eq!(db_reservation.db_name(), name);
assert_eq!(db_reservation.server_id(), server_id);
assert_eq!(db_reservation.server_id(), config.server_id());
assert!(db_reservation.rules().is_some());
db_reservation.commit();
@ -974,14 +876,7 @@ mod test {
assert!(matches!(err, Error::DatabaseAlreadyExists { .. }));
// create recovered DB => fail
let err = config
.create_db(
Arc::clone(&store),
Arc::clone(&exec),
server_id,
name.clone(),
)
.unwrap_err();
let err = config.create_db(name.clone()).unwrap_err();
assert!(matches!(err, Error::DatabaseAlreadyExists { .. }));
// block recovered DB => fail
@ -996,28 +891,13 @@ mod test {
async fn block_db() {
// setup
let name = DatabaseName::new("foo").unwrap();
let store = Arc::new(ObjectStore::new_in_memory());
let exec = Arc::new(Executor::new(1));
let server_id = ServerId::try_from(1).unwrap();
let metric_registry = Arc::new(metrics::MetricRegistry::new());
let config = Config::new(
Arc::new(JobRegistry::new()),
Arc::clone(&metric_registry),
None,
);
let config = make_config(None);
// block DB
let handle = config.block_db(name.clone()).unwrap();
// create while blocked => fail
let err = config
.create_db(
Arc::clone(&store),
Arc::clone(&exec),
server_id,
name.clone(),
)
.unwrap_err();
let err = config.create_db(name.clone()).unwrap_err();
assert!(matches!(err, Error::DatabaseReserved { .. }));
// recover while blocked => fail
@ -1030,14 +910,7 @@ mod test {
// unblock => DB can be created
drop(handle);
config
.create_db(
Arc::clone(&store),
Arc::clone(&exec),
server_id,
name.clone(),
)
.unwrap();
config.create_db(name.clone()).unwrap();
// cleanup
config.drain().await
@ -1047,20 +920,12 @@ mod test {
async fn test_db_drop() {
// setup
let name = DatabaseName::new("foo").unwrap();
let store = Arc::new(ObjectStore::new_in_memory());
let exec = Arc::new(Executor::new(1));
let server_id = ServerId::try_from(1).unwrap();
let metric_registry = Arc::new(metrics::MetricRegistry::new());
let config = Config::new(
Arc::new(JobRegistry::new()),
Arc::clone(&metric_registry),
None,
);
let config = make_config(None);
let rules = DatabaseRules::new(name.clone());
let (preserved_catalog, catalog) = load_or_create_preserved_catalog(
&name,
Arc::clone(&store),
server_id,
config.object_store(),
config.server_id(),
config.metrics_registry(),
false,
)
@ -1068,14 +933,7 @@ mod test {
.unwrap();
// create DB
let mut db_reservation = config
.create_db(
Arc::clone(&store),
Arc::clone(&exec),
server_id,
name.clone(),
)
.unwrap();
let mut db_reservation = config.create_db(name.clone()).unwrap();
db_reservation.advance_rules_loaded(rules).unwrap();
db_reservation
.advance_replay(preserved_catalog, catalog, None)
@ -1122,12 +980,7 @@ mod test {
#[test]
fn resolve_remote() {
let metric_registry = Arc::new(metrics::MetricRegistry::new());
let config = Config::new(
Arc::new(JobRegistry::new()),
Arc::clone(&metric_registry),
Some(RemoteTemplate::new("http://iox-query-{id}:8082")),
);
let config = make_config(Some(RemoteTemplate::new("http://iox-query-{id}:8082")));
let server_id = ServerId::new(NonZeroU32::new(42).unwrap());
let remote = config.resolve_remote(server_id);

View File

@ -203,10 +203,10 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
pub struct Db {
rules: RwLock<Arc<DatabaseRules>>,
pub server_id: ServerId, // this is also the Query Server ID
server_id: ServerId, // this is also the Query Server ID
/// Interface to use for persistence
pub store: Arc<ObjectStore>,
store: Arc<ObjectStore>,
/// Executor for running queries
exec: Arc<Executor>,

View File

@ -7,38 +7,30 @@
//!
//! For example `SELECT * FROM system.chunks`
use std::convert::AsRef;
use std::any::Any;
use std::sync::Arc;
use std::{any::Any, collections::HashMap};
use chrono::{DateTime, Utc};
use arrow::{
array::{
ArrayRef, StringArray, StringBuilder, Time64NanosecondArray, TimestampNanosecondArray,
UInt32Array, UInt32Builder, UInt64Array, UInt64Builder,
},
datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit},
datatypes::{Field, Schema, SchemaRef},
error::Result,
record_batch::RecordBatch,
};
use data_types::{
chunk_metadata::{ChunkSummary, DetailedChunkSummary},
error::ErrorLogger,
job::Job,
partition_metadata::PartitionSummary,
};
use chrono::{DateTime, Utc};
use datafusion::{
catalog::schema::SchemaProvider,
datasource::{datasource::Statistics, TableProvider},
error::{DataFusionError, Result as DataFusionResult},
physical_plan::{memory::MemoryExec, ExecutionPlan},
};
use tracker::TaskTracker;
use crate::JobRegistry;
use super::catalog::Catalog;
use crate::JobRegistry;
use data_types::partition_metadata::TableSummary;
mod chunks;
mod columns;
mod operations;
// The IOx system schema
pub const SYSTEM_SCHEMA: &str = "system";
@ -67,16 +59,16 @@ impl SystemSchemaProvider {
pub fn new(db_name: impl Into<String>, catalog: Arc<Catalog>, jobs: Arc<JobRegistry>) -> Self {
let db_name = db_name.into();
let chunks = Arc::new(SystemTableProvider {
inner: ChunksTable::new(Arc::clone(&catalog)),
inner: chunks::ChunksTable::new(Arc::clone(&catalog)),
});
let columns = Arc::new(SystemTableProvider {
inner: ColumnsTable::new(Arc::clone(&catalog)),
inner: columns::ColumnsTable::new(Arc::clone(&catalog)),
});
let chunk_columns = Arc::new(SystemTableProvider {
inner: ChunkColumnsTable::new(catalog),
inner: columns::ChunkColumnsTable::new(catalog),
});
let operations = Arc::new(SystemTableProvider {
inner: OperationsTable::new(db_name, jobs),
inner: operations::OperationsTable::new(db_name, jobs),
});
Self {
chunks,
@ -162,407 +154,6 @@ fn time_to_ts(time: Option<DateTime<Utc>>) -> Option<i64> {
time.map(|ts| ts.timestamp_nanos())
}
/// Implementation of system.chunks table
#[derive(Debug)]
struct ChunksTable {
schema: SchemaRef,
catalog: Arc<Catalog>,
}
impl ChunksTable {
fn new(catalog: Arc<Catalog>) -> Self {
Self {
schema: chunk_summaries_schema(),
catalog,
}
}
}
impl IoxSystemTable for ChunksTable {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
fn batch(&self) -> Result<RecordBatch> {
from_chunk_summaries(self.schema(), self.catalog.chunk_summaries())
.log_if_error("system.chunks table")
}
}
fn chunk_summaries_schema() -> SchemaRef {
let ts = DataType::Timestamp(TimeUnit::Nanosecond, None);
Arc::new(Schema::new(vec![
Field::new("id", DataType::UInt32, false),
Field::new("partition_key", DataType::Utf8, false),
Field::new("table_name", DataType::Utf8, false),
Field::new("storage", DataType::Utf8, false),
Field::new("lifecycle_action", DataType::Utf8, true),
Field::new("memory_bytes", DataType::UInt64, false),
Field::new("object_store_bytes", DataType::UInt64, false),
Field::new("row_count", DataType::UInt64, false),
Field::new("time_of_first_write", ts.clone(), true),
Field::new("time_of_last_write", ts.clone(), true),
Field::new("time_closed", ts, true),
]))
}
fn from_chunk_summaries(schema: SchemaRef, chunks: Vec<ChunkSummary>) -> Result<RecordBatch> {
let id = chunks.iter().map(|c| Some(c.id)).collect::<UInt32Array>();
let partition_key = chunks
.iter()
.map(|c| Some(c.partition_key.as_ref()))
.collect::<StringArray>();
let table_name = chunks
.iter()
.map(|c| Some(c.table_name.as_ref()))
.collect::<StringArray>();
let storage = chunks
.iter()
.map(|c| Some(c.storage.as_str()))
.collect::<StringArray>();
let lifecycle_action = chunks
.iter()
.map(|c| c.lifecycle_action.map(|a| a.name()))
.collect::<StringArray>();
let memory_bytes = chunks
.iter()
.map(|c| Some(c.memory_bytes as u64))
.collect::<UInt64Array>();
let object_store_bytes = chunks
.iter()
.map(|c| Some(c.object_store_bytes as u64).filter(|&v| v > 0))
.collect::<UInt64Array>();
let row_counts = chunks
.iter()
.map(|c| Some(c.row_count as u64))
.collect::<UInt64Array>();
let time_of_first_write = chunks
.iter()
.map(|c| c.time_of_first_write)
.map(time_to_ts)
.collect::<TimestampNanosecondArray>();
let time_of_last_write = chunks
.iter()
.map(|c| c.time_of_last_write)
.map(time_to_ts)
.collect::<TimestampNanosecondArray>();
let time_closed = chunks
.iter()
.map(|c| c.time_closed)
.map(time_to_ts)
.collect::<TimestampNanosecondArray>();
RecordBatch::try_new(
schema,
vec![
Arc::new(id),
Arc::new(partition_key),
Arc::new(table_name),
Arc::new(storage),
Arc::new(lifecycle_action),
Arc::new(memory_bytes),
Arc::new(object_store_bytes),
Arc::new(row_counts),
Arc::new(time_of_first_write),
Arc::new(time_of_last_write),
Arc::new(time_closed),
],
)
}
/// Implementation of `system.columns` system table
#[derive(Debug)]
struct ColumnsTable {
schema: SchemaRef,
catalog: Arc<Catalog>,
}
impl ColumnsTable {
fn new(catalog: Arc<Catalog>) -> Self {
Self {
schema: partition_summaries_schema(),
catalog,
}
}
}
impl IoxSystemTable for ColumnsTable {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
fn batch(&self) -> Result<RecordBatch> {
from_partition_summaries(self.schema(), self.catalog.partition_summaries())
.log_if_error("system.columns table")
}
}
fn partition_summaries_schema() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("partition_key", DataType::Utf8, false),
Field::new("table_name", DataType::Utf8, false),
Field::new("column_name", DataType::Utf8, false),
Field::new("column_type", DataType::Utf8, false),
Field::new("influxdb_type", DataType::Utf8, true),
]))
}
fn from_partition_summaries(
schema: SchemaRef,
partitions: Vec<PartitionSummary>,
) -> Result<RecordBatch> {
// Assume each partition has roughly 5 tables with 5 columns
let row_estimate = partitions.len() * 25;
let mut partition_key = StringBuilder::new(row_estimate);
let mut table_name = StringBuilder::new(row_estimate);
let mut column_name = StringBuilder::new(row_estimate);
let mut column_type = StringBuilder::new(row_estimate);
let mut influxdb_type = StringBuilder::new(row_estimate);
// Note no rows are produced for partitions with no tables, or
// tables with no columns: There are other tables to list tables
// and columns
for partition in partitions {
let table = partition.table;
for column in table.columns {
partition_key.append_value(&partition.key)?;
table_name.append_value(&table.name)?;
column_name.append_value(&column.name)?;
column_type.append_value(column.type_name())?;
if let Some(t) = &column.influxdb_type {
influxdb_type.append_value(t.as_str())?;
} else {
influxdb_type.append_null()?;
}
}
}
RecordBatch::try_new(
schema,
vec![
Arc::new(partition_key.finish()) as ArrayRef,
Arc::new(table_name.finish()),
Arc::new(column_name.finish()),
Arc::new(column_type.finish()),
Arc::new(influxdb_type.finish()),
],
)
}
/// Implementation of system.column_chunks table
#[derive(Debug)]
struct ChunkColumnsTable {
schema: SchemaRef,
catalog: Arc<Catalog>,
}
impl ChunkColumnsTable {
fn new(catalog: Arc<Catalog>) -> Self {
Self {
schema: chunk_columns_schema(),
catalog,
}
}
}
impl IoxSystemTable for ChunkColumnsTable {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
fn batch(&self) -> Result<RecordBatch> {
assemble_chunk_columns(self.schema(), self.catalog.detailed_chunk_summaries())
.log_if_error("system.column_chunks table")
}
}
fn chunk_columns_schema() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("partition_key", DataType::Utf8, false),
Field::new("chunk_id", DataType::UInt32, false),
Field::new("table_name", DataType::Utf8, false),
Field::new("column_name", DataType::Utf8, false),
Field::new("storage", DataType::Utf8, false),
Field::new("row_count", DataType::UInt64, true),
Field::new("min_value", DataType::Utf8, true),
Field::new("max_value", DataType::Utf8, true),
Field::new("memory_bytes", DataType::UInt64, true),
]))
}
fn assemble_chunk_columns(
schema: SchemaRef,
chunk_summaries: Vec<(Arc<TableSummary>, DetailedChunkSummary)>,
) -> Result<RecordBatch> {
/// Builds an index from column_name -> size
fn make_column_index(summary: &DetailedChunkSummary) -> HashMap<&str, u64> {
summary
.columns
.iter()
.map(|column_summary| {
(
column_summary.name.as_ref(),
column_summary.memory_bytes as u64,
)
})
.collect()
}
// Assume each chunk has roughly 5 columns
let row_estimate = chunk_summaries.len() * 5;
let mut partition_key = StringBuilder::new(row_estimate);
let mut chunk_id = UInt32Builder::new(row_estimate);
let mut table_name = StringBuilder::new(row_estimate);
let mut column_name = StringBuilder::new(row_estimate);
let mut storage = StringBuilder::new(row_estimate);
let mut row_count = UInt64Builder::new(row_estimate);
let mut min_values = StringBuilder::new(row_estimate);
let mut max_values = StringBuilder::new(row_estimate);
let mut memory_bytes = UInt64Builder::new(row_estimate);
// Note no rows are produced for partitions with no chunks, or
// tables with no partitions: There are other tables to list tables
// and columns
for (table_summary, chunk_summary) in chunk_summaries {
let mut column_index = make_column_index(&chunk_summary);
let storage_value = chunk_summary.inner.storage.as_str();
for column in &table_summary.columns {
partition_key.append_value(chunk_summary.inner.partition_key.as_ref())?;
chunk_id.append_value(chunk_summary.inner.id)?;
table_name.append_value(&chunk_summary.inner.table_name)?;
column_name.append_value(&column.name)?;
storage.append_value(storage_value)?;
row_count.append_value(column.count())?;
if let Some(v) = column.stats.min_as_str() {
min_values.append_value(v)?;
} else {
min_values.append(false)?;
}
if let Some(v) = column.stats.max_as_str() {
max_values.append_value(v)?;
} else {
max_values.append(false)?;
}
let size = column_index.remove(column.name.as_str());
memory_bytes.append_option(size)?;
}
}
RecordBatch::try_new(
schema,
vec![
Arc::new(partition_key.finish()) as ArrayRef,
Arc::new(chunk_id.finish()),
Arc::new(table_name.finish()),
Arc::new(column_name.finish()),
Arc::new(storage.finish()),
Arc::new(row_count.finish()),
Arc::new(min_values.finish()),
Arc::new(max_values.finish()),
Arc::new(memory_bytes.finish()),
],
)
}
/// Implementation of system.operations table
#[derive(Debug)]
struct OperationsTable {
schema: SchemaRef,
db_name: String,
jobs: Arc<JobRegistry>,
}
impl OperationsTable {
fn new(db_name: String, jobs: Arc<JobRegistry>) -> Self {
Self {
schema: operations_schema(),
db_name,
jobs,
}
}
}
impl IoxSystemTable for OperationsTable {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
fn batch(&self) -> Result<RecordBatch> {
from_task_trackers(self.schema(), &self.db_name, self.jobs.tracked())
.log_if_error("system.operations table")
}
}
fn operations_schema() -> SchemaRef {
let ts = DataType::Time64(TimeUnit::Nanosecond);
Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("status", DataType::Utf8, true),
Field::new("cpu_time_used", ts.clone(), true),
Field::new("wall_time_used", ts, true),
Field::new("partition_key", DataType::Utf8, true),
Field::new("chunk_id", DataType::UInt32, true),
Field::new("description", DataType::Utf8, true),
]))
}
fn from_task_trackers(
schema: SchemaRef,
db_name: &str,
jobs: Vec<TaskTracker<Job>>,
) -> Result<RecordBatch> {
let jobs = jobs
.into_iter()
.filter(|job| job.metadata().db_name() == Some(db_name))
.collect::<Vec<_>>();
let ids = jobs
.iter()
.map(|job| Some(job.id().to_string()))
.collect::<StringArray>();
let statuses = jobs
.iter()
.map(|job| Some(job.get_status().name()))
.collect::<StringArray>();
let cpu_time_used = jobs
.iter()
.map(|job| job.get_status().cpu_nanos().map(|n| n as i64))
.collect::<Time64NanosecondArray>();
let wall_time_used = jobs
.iter()
.map(|job| job.get_status().wall_nanos().map(|n| n as i64))
.collect::<Time64NanosecondArray>();
let partition_keys = jobs
.iter()
.map(|job| job.metadata().partition_key())
.collect::<StringArray>();
let chunk_ids = jobs
.iter()
.map(|job| job.metadata().chunk_id())
.collect::<UInt32Array>();
let descriptions = jobs
.iter()
.map(|job| Some(job.metadata().description()))
.collect::<StringArray>();
RecordBatch::try_new(
schema,
vec![
Arc::new(ids) as ArrayRef,
Arc::new(statuses),
Arc::new(cpu_time_used),
Arc::new(wall_time_used),
Arc::new(partition_keys),
Arc::new(chunk_ids),
Arc::new(descriptions),
],
)
}
/// Creates a DataFusion ExecutionPlan node that scans a single batch
/// of records.
fn scan_batch(
@ -605,141 +196,10 @@ fn scan_batch(
#[cfg(test)]
mod tests {
use super::*;
use arrow::array::{ArrayRef, UInt64Array};
use arrow_util::assert_batches_eq;
use chrono::NaiveDateTime;
use data_types::{
chunk_metadata::{ChunkColumnSummary, ChunkLifecycleAction, ChunkStorage},
partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
};
#[test]
fn test_from_chunk_summaries() {
let chunks = vec![
ChunkSummary {
partition_key: Arc::from("p1"),
table_name: Arc::from("table1"),
id: 0,
storage: ChunkStorage::OpenMutableBuffer,
lifecycle_action: None,
memory_bytes: 23754,
object_store_bytes: 0,
row_count: 11,
time_of_first_write: Some(DateTime::from_utc(
NaiveDateTime::from_timestamp(10, 0),
Utc,
)),
time_of_last_write: None,
time_closed: None,
},
ChunkSummary {
partition_key: Arc::from("p1"),
table_name: Arc::from("table1"),
id: 1,
storage: ChunkStorage::OpenMutableBuffer,
lifecycle_action: Some(ChunkLifecycleAction::Persisting),
memory_bytes: 23455,
object_store_bytes: 0,
row_count: 22,
time_of_first_write: None,
time_of_last_write: Some(DateTime::from_utc(
NaiveDateTime::from_timestamp(80, 0),
Utc,
)),
time_closed: None,
},
ChunkSummary {
partition_key: Arc::from("p1"),
table_name: Arc::from("table1"),
id: 2,
storage: ChunkStorage::ObjectStoreOnly,
lifecycle_action: None,
memory_bytes: 1234,
object_store_bytes: 5678,
row_count: 33,
time_of_first_write: Some(DateTime::from_utc(
NaiveDateTime::from_timestamp(100, 0),
Utc,
)),
time_of_last_write: Some(DateTime::from_utc(
NaiveDateTime::from_timestamp(200, 0),
Utc,
)),
time_closed: None,
},
];
let expected = vec![
"+----+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+---------------------+---------------------+-------------+",
"| id | partition_key | table_name | storage | lifecycle_action | memory_bytes | object_store_bytes | row_count | time_of_first_write | time_of_last_write | time_closed |",
"+----+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+---------------------+---------------------+-------------+",
"| 0 | p1 | table1 | OpenMutableBuffer | | 23754 | | 11 | 1970-01-01 00:00:10 | | |",
"| 1 | p1 | table1 | OpenMutableBuffer | Persisting to Object Storage | 23455 | | 22 | | 1970-01-01 00:01:20 | |",
"| 2 | p1 | table1 | ObjectStoreOnly | | 1234 | 5678 | 33 | 1970-01-01 00:01:40 | 1970-01-01 00:03:20 | |",
"+----+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+---------------------+---------------------+-------------+",
];
let schema = chunk_summaries_schema();
let batch = from_chunk_summaries(schema, chunks).unwrap();
assert_batches_eq!(&expected, &[batch]);
}
#[test]
fn test_from_partition_summaries() {
let partitions = vec![
PartitionSummary {
key: "p1".to_string(),
table: TableSummary {
name: "t1".to_string(),
columns: vec![
ColumnSummary {
name: "c1".to_string(),
influxdb_type: Some(InfluxDbType::Tag),
stats: Statistics::I64(StatValues::new_with_value(23)),
},
ColumnSummary {
name: "c2".to_string(),
influxdb_type: Some(InfluxDbType::Field),
stats: Statistics::I64(StatValues::new_with_value(43)),
},
ColumnSummary {
name: "c3".to_string(),
influxdb_type: None,
stats: Statistics::String(StatValues::new_with_value(
"foo".to_string(),
)),
},
ColumnSummary {
name: "time".to_string(),
influxdb_type: Some(InfluxDbType::Timestamp),
stats: Statistics::I64(StatValues::new_with_value(43)),
},
],
},
},
PartitionSummary {
key: "p3".to_string(),
table: TableSummary {
name: "t1".to_string(),
columns: vec![],
},
},
];
let expected = vec![
"+---------------+------------+-------------+-------------+---------------+",
"| partition_key | table_name | column_name | column_type | influxdb_type |",
"+---------------+------------+-------------+-------------+---------------+",
"| p1 | t1 | c1 | I64 | Tag |",
"| p1 | t1 | c2 | I64 | Field |",
"| p1 | t1 | c3 | String | |",
"| p1 | t1 | time | I64 | Timestamp |",
"+---------------+------------+-------------+-------------+---------------+",
];
let batch = from_partition_summaries(partition_summaries_schema(), partitions).unwrap();
assert_batches_eq!(&expected, &[batch]);
}
use super::*;
fn seq_array(start: u64, end: u64) -> ArrayRef {
Arc::new(UInt64Array::from_iter_values(start..end))
@ -820,130 +280,4 @@ mod tests {
err_string
);
}
#[test]
fn test_assemble_chunk_columns() {
let lifecycle_action = None;
let summaries = vec![
(
Arc::new(TableSummary {
name: "t1".to_string(),
columns: vec![
ColumnSummary {
name: "c1".to_string(),
influxdb_type: Some(InfluxDbType::Field),
stats: Statistics::String(StatValues::new(
Some("bar".to_string()),
Some("foo".to_string()),
55,
)),
},
ColumnSummary {
name: "c2".to_string(),
influxdb_type: Some(InfluxDbType::Field),
stats: Statistics::F64(StatValues::new(Some(11.0), Some(43.0), 66)),
},
],
}),
DetailedChunkSummary {
inner: ChunkSummary {
partition_key: "p1".into(),
table_name: "t1".into(),
id: 42,
storage: ChunkStorage::ReadBuffer,
lifecycle_action,
memory_bytes: 23754,
object_store_bytes: 0,
row_count: 11,
time_of_first_write: None,
time_of_last_write: None,
time_closed: None,
},
columns: vec![
ChunkColumnSummary {
name: "c1".into(),
memory_bytes: 11,
},
ChunkColumnSummary {
name: "c2".into(),
memory_bytes: 12,
},
],
},
),
(
Arc::new(TableSummary {
name: "t1".to_string(),
columns: vec![ColumnSummary {
name: "c1".to_string(),
influxdb_type: Some(InfluxDbType::Field),
stats: Statistics::F64(StatValues::new(Some(110.0), Some(430.0), 667)),
}],
}),
DetailedChunkSummary {
inner: ChunkSummary {
partition_key: "p2".into(),
table_name: "t1".into(),
id: 43,
storage: ChunkStorage::OpenMutableBuffer,
lifecycle_action,
memory_bytes: 23754,
object_store_bytes: 0,
row_count: 11,
time_of_first_write: None,
time_of_last_write: None,
time_closed: None,
},
columns: vec![ChunkColumnSummary {
name: "c1".into(),
memory_bytes: 100,
}],
},
),
(
Arc::new(TableSummary {
name: "t2".to_string(),
columns: vec![ColumnSummary {
name: "c3".to_string(),
influxdb_type: Some(InfluxDbType::Field),
stats: Statistics::F64(StatValues::new(Some(-1.0), Some(2.0), 4)),
}],
}),
DetailedChunkSummary {
inner: ChunkSummary {
partition_key: "p2".into(),
table_name: "t2".into(),
id: 44,
storage: ChunkStorage::OpenMutableBuffer,
lifecycle_action,
memory_bytes: 23754,
object_store_bytes: 0,
row_count: 11,
time_of_first_write: None,
time_of_last_write: None,
time_closed: None,
},
columns: vec![ChunkColumnSummary {
name: "c3".into(),
memory_bytes: 200,
}],
},
),
];
let expected = vec![
"+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+--------------+",
"| partition_key | chunk_id | table_name | column_name | storage | row_count | min_value | max_value | memory_bytes |",
"+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+--------------+",
"| p1 | 42 | t1 | c1 | ReadBuffer | 55 | bar | foo | 11 |",
"| p1 | 42 | t1 | c2 | ReadBuffer | 66 | 11 | 43 | 12 |",
"| p2 | 43 | t1 | c1 | OpenMutableBuffer | 667 | 110 | 430 | 100 |",
"| p2 | 44 | t2 | c3 | OpenMutableBuffer | 4 | -1 | 2 | 200 |",
"+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+--------------+",
];
let batch = assemble_chunk_columns(chunk_columns_schema(), summaries).unwrap();
assert_batches_eq!(&expected, &[batch]);
}
}

View File

@ -0,0 +1,201 @@
use std::sync::Arc;
use arrow::array::{StringArray, TimestampNanosecondArray, UInt32Array, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use arrow::error::Result;
use arrow::record_batch::RecordBatch;
use data_types::chunk_metadata::ChunkSummary;
use data_types::error::ErrorLogger;
use crate::db::catalog::Catalog;
use crate::db::system_tables::{time_to_ts, IoxSystemTable};
/// Implementation of system.chunks table
#[derive(Debug)]
pub(super) struct ChunksTable {
schema: SchemaRef,
catalog: Arc<Catalog>,
}
impl ChunksTable {
pub(super) fn new(catalog: Arc<Catalog>) -> Self {
Self {
schema: chunk_summaries_schema(),
catalog,
}
}
}
impl IoxSystemTable for ChunksTable {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
fn batch(&self) -> Result<RecordBatch> {
from_chunk_summaries(self.schema(), self.catalog.chunk_summaries())
.log_if_error("system.chunks table")
}
}
fn chunk_summaries_schema() -> SchemaRef {
let ts = DataType::Timestamp(TimeUnit::Nanosecond, None);
Arc::new(Schema::new(vec![
Field::new("id", DataType::UInt32, false),
Field::new("partition_key", DataType::Utf8, false),
Field::new("table_name", DataType::Utf8, false),
Field::new("storage", DataType::Utf8, false),
Field::new("lifecycle_action", DataType::Utf8, true),
Field::new("memory_bytes", DataType::UInt64, false),
Field::new("object_store_bytes", DataType::UInt64, false),
Field::new("row_count", DataType::UInt64, false),
Field::new("time_of_first_write", ts.clone(), true),
Field::new("time_of_last_write", ts.clone(), true),
Field::new("time_closed", ts, true),
]))
}
fn from_chunk_summaries(schema: SchemaRef, chunks: Vec<ChunkSummary>) -> Result<RecordBatch> {
let id = chunks.iter().map(|c| Some(c.id)).collect::<UInt32Array>();
let partition_key = chunks
.iter()
.map(|c| Some(c.partition_key.as_ref()))
.collect::<StringArray>();
let table_name = chunks
.iter()
.map(|c| Some(c.table_name.as_ref()))
.collect::<StringArray>();
let storage = chunks
.iter()
.map(|c| Some(c.storage.as_str()))
.collect::<StringArray>();
let lifecycle_action = chunks
.iter()
.map(|c| c.lifecycle_action.map(|a| a.name()))
.collect::<StringArray>();
let memory_bytes = chunks
.iter()
.map(|c| Some(c.memory_bytes as u64))
.collect::<UInt64Array>();
let object_store_bytes = chunks
.iter()
.map(|c| Some(c.object_store_bytes as u64).filter(|&v| v > 0))
.collect::<UInt64Array>();
let row_counts = chunks
.iter()
.map(|c| Some(c.row_count as u64))
.collect::<UInt64Array>();
let time_of_first_write = chunks
.iter()
.map(|c| c.time_of_first_write)
.map(time_to_ts)
.collect::<TimestampNanosecondArray>();
let time_of_last_write = chunks
.iter()
.map(|c| c.time_of_last_write)
.map(time_to_ts)
.collect::<TimestampNanosecondArray>();
let time_closed = chunks
.iter()
.map(|c| c.time_closed)
.map(time_to_ts)
.collect::<TimestampNanosecondArray>();
RecordBatch::try_new(
schema,
vec![
Arc::new(id),
Arc::new(partition_key),
Arc::new(table_name),
Arc::new(storage),
Arc::new(lifecycle_action),
Arc::new(memory_bytes),
Arc::new(object_store_bytes),
Arc::new(row_counts),
Arc::new(time_of_first_write),
Arc::new(time_of_last_write),
Arc::new(time_closed),
],
)
}
#[cfg(test)]
mod tests {
use chrono::{DateTime, NaiveDateTime, Utc};
use arrow_util::assert_batches_eq;
use data_types::chunk_metadata::{ChunkLifecycleAction, ChunkStorage};
use super::*;
#[test]
fn test_from_chunk_summaries() {
let chunks = vec![
ChunkSummary {
partition_key: Arc::from("p1"),
table_name: Arc::from("table1"),
id: 0,
storage: ChunkStorage::OpenMutableBuffer,
lifecycle_action: None,
memory_bytes: 23754,
object_store_bytes: 0,
row_count: 11,
time_of_first_write: Some(DateTime::from_utc(
NaiveDateTime::from_timestamp(10, 0),
Utc,
)),
time_of_last_write: None,
time_closed: None,
},
ChunkSummary {
partition_key: Arc::from("p1"),
table_name: Arc::from("table1"),
id: 1,
storage: ChunkStorage::OpenMutableBuffer,
lifecycle_action: Some(ChunkLifecycleAction::Persisting),
memory_bytes: 23455,
object_store_bytes: 0,
row_count: 22,
time_of_first_write: None,
time_of_last_write: Some(DateTime::from_utc(
NaiveDateTime::from_timestamp(80, 0),
Utc,
)),
time_closed: None,
},
ChunkSummary {
partition_key: Arc::from("p1"),
table_name: Arc::from("table1"),
id: 2,
storage: ChunkStorage::ObjectStoreOnly,
lifecycle_action: None,
memory_bytes: 1234,
object_store_bytes: 5678,
row_count: 33,
time_of_first_write: Some(DateTime::from_utc(
NaiveDateTime::from_timestamp(100, 0),
Utc,
)),
time_of_last_write: Some(DateTime::from_utc(
NaiveDateTime::from_timestamp(200, 0),
Utc,
)),
time_closed: None,
},
];
let expected = vec![
"+----+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+---------------------+---------------------+-------------+",
"| id | partition_key | table_name | storage | lifecycle_action | memory_bytes | object_store_bytes | row_count | time_of_first_write | time_of_last_write | time_closed |",
"+----+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+---------------------+---------------------+-------------+",
"| 0 | p1 | table1 | OpenMutableBuffer | | 23754 | | 11 | 1970-01-01 00:00:10 | | |",
"| 1 | p1 | table1 | OpenMutableBuffer | Persisting to Object Storage | 23455 | | 22 | | 1970-01-01 00:01:20 | |",
"| 2 | p1 | table1 | ObjectStoreOnly | | 1234 | 5678 | 33 | 1970-01-01 00:01:40 | 1970-01-01 00:03:20 | |",
"+----+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+---------------------+---------------------+-------------+",
];
let schema = chunk_summaries_schema();
let batch = from_chunk_summaries(schema, chunks).unwrap();
assert_batches_eq!(&expected, &[batch]);
}
}

View File

@ -0,0 +1,404 @@
use std::collections::HashMap;
use std::sync::Arc;
use arrow::array::{ArrayRef, StringBuilder, UInt32Builder, UInt64Builder};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::error::Result;
use arrow::record_batch::RecordBatch;
use data_types::chunk_metadata::DetailedChunkSummary;
use data_types::error::ErrorLogger;
use data_types::partition_metadata::{PartitionSummary, TableSummary};
use crate::db::catalog::Catalog;
use crate::db::system_tables::IoxSystemTable;
/// Implementation of `system.columns` system table
#[derive(Debug)]
pub(super) struct ColumnsTable {
schema: SchemaRef,
catalog: Arc<Catalog>,
}
impl ColumnsTable {
pub(super) fn new(catalog: Arc<Catalog>) -> Self {
Self {
schema: partition_summaries_schema(),
catalog,
}
}
}
impl IoxSystemTable for ColumnsTable {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
fn batch(&self) -> Result<RecordBatch> {
from_partition_summaries(self.schema(), self.catalog.partition_summaries())
.log_if_error("system.columns table")
}
}
fn partition_summaries_schema() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("partition_key", DataType::Utf8, false),
Field::new("table_name", DataType::Utf8, false),
Field::new("column_name", DataType::Utf8, false),
Field::new("column_type", DataType::Utf8, false),
Field::new("influxdb_type", DataType::Utf8, true),
]))
}
fn from_partition_summaries(
schema: SchemaRef,
partitions: Vec<PartitionSummary>,
) -> Result<RecordBatch> {
// Assume each partition has roughly 5 tables with 5 columns
let row_estimate = partitions.len() * 25;
let mut partition_key = StringBuilder::new(row_estimate);
let mut table_name = StringBuilder::new(row_estimate);
let mut column_name = StringBuilder::new(row_estimate);
let mut column_type = StringBuilder::new(row_estimate);
let mut influxdb_type = StringBuilder::new(row_estimate);
// Note no rows are produced for partitions with no tables, or
// tables with no columns: There are other tables to list tables
// and columns
for partition in partitions {
let table = partition.table;
for column in table.columns {
partition_key.append_value(&partition.key)?;
table_name.append_value(&table.name)?;
column_name.append_value(&column.name)?;
column_type.append_value(column.type_name())?;
if let Some(t) = &column.influxdb_type {
influxdb_type.append_value(t.as_str())?;
} else {
influxdb_type.append_null()?;
}
}
}
RecordBatch::try_new(
schema,
vec![
Arc::new(partition_key.finish()) as ArrayRef,
Arc::new(table_name.finish()),
Arc::new(column_name.finish()),
Arc::new(column_type.finish()),
Arc::new(influxdb_type.finish()),
],
)
}
/// Implementation of system.column_chunks table
#[derive(Debug)]
pub(super) struct ChunkColumnsTable {
schema: SchemaRef,
catalog: Arc<Catalog>,
}
impl ChunkColumnsTable {
pub(super) fn new(catalog: Arc<Catalog>) -> Self {
Self {
schema: chunk_columns_schema(),
catalog,
}
}
}
impl IoxSystemTable for ChunkColumnsTable {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
fn batch(&self) -> Result<RecordBatch> {
assemble_chunk_columns(self.schema(), self.catalog.detailed_chunk_summaries())
.log_if_error("system.column_chunks table")
}
}
fn chunk_columns_schema() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("partition_key", DataType::Utf8, false),
Field::new("chunk_id", DataType::UInt32, false),
Field::new("table_name", DataType::Utf8, false),
Field::new("column_name", DataType::Utf8, false),
Field::new("storage", DataType::Utf8, false),
Field::new("row_count", DataType::UInt64, true),
Field::new("min_value", DataType::Utf8, true),
Field::new("max_value", DataType::Utf8, true),
Field::new("memory_bytes", DataType::UInt64, true),
]))
}
fn assemble_chunk_columns(
schema: SchemaRef,
chunk_summaries: Vec<(Arc<TableSummary>, DetailedChunkSummary)>,
) -> Result<RecordBatch> {
/// Builds an index from column_name -> size
fn make_column_index(summary: &DetailedChunkSummary) -> HashMap<&str, u64> {
summary
.columns
.iter()
.map(|column_summary| {
(
column_summary.name.as_ref(),
column_summary.memory_bytes as u64,
)
})
.collect()
}
// Assume each chunk has roughly 5 columns
let row_estimate = chunk_summaries.len() * 5;
let mut partition_key = StringBuilder::new(row_estimate);
let mut chunk_id = UInt32Builder::new(row_estimate);
let mut table_name = StringBuilder::new(row_estimate);
let mut column_name = StringBuilder::new(row_estimate);
let mut storage = StringBuilder::new(row_estimate);
let mut row_count = UInt64Builder::new(row_estimate);
let mut min_values = StringBuilder::new(row_estimate);
let mut max_values = StringBuilder::new(row_estimate);
let mut memory_bytes = UInt64Builder::new(row_estimate);
// Note no rows are produced for partitions with no chunks, or
// chunks with no columns: there are other system tables to list tables
// and columns
for (table_summary, chunk_summary) in chunk_summaries {
let mut column_index = make_column_index(&chunk_summary);
let storage_value = chunk_summary.inner.storage.as_str();
for column in &table_summary.columns {
partition_key.append_value(chunk_summary.inner.partition_key.as_ref())?;
chunk_id.append_value(chunk_summary.inner.id)?;
table_name.append_value(&chunk_summary.inner.table_name)?;
column_name.append_value(&column.name)?;
storage.append_value(storage_value)?;
row_count.append_value(column.count())?;
if let Some(v) = column.stats.min_as_str() {
min_values.append_value(v)?;
} else {
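// `append(false)` marks the slot invalid, i.e. appends a null (same effect as `append_null()` above)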
min_values.append(false)?;
}
if let Some(v) = column.stats.max_as_str() {
max_values.append_value(v)?;
} else {
max_values.append(false)?;
}
let size = column_index.remove(column.name.as_str());
memory_bytes.append_option(size)?;
}
}
RecordBatch::try_new(
schema,
vec![
Arc::new(partition_key.finish()) as ArrayRef,
Arc::new(chunk_id.finish()),
Arc::new(table_name.finish()),
Arc::new(column_name.finish()),
Arc::new(storage.finish()),
Arc::new(row_count.finish()),
Arc::new(min_values.finish()),
Arc::new(max_values.finish()),
Arc::new(memory_bytes.finish()),
],
)
}
#[cfg(test)]
mod tests {
use arrow_util::assert_batches_eq;
use data_types::chunk_metadata::{ChunkColumnSummary, ChunkStorage, ChunkSummary};
use data_types::partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics};
use super::*;
#[test]
fn test_from_partition_summaries() {
let partitions = vec![
PartitionSummary {
key: "p1".to_string(),
table: TableSummary {
name: "t1".to_string(),
columns: vec![
ColumnSummary {
name: "c1".to_string(),
influxdb_type: Some(InfluxDbType::Tag),
stats: Statistics::I64(StatValues::new_with_value(23)),
},
ColumnSummary {
name: "c2".to_string(),
influxdb_type: Some(InfluxDbType::Field),
stats: Statistics::I64(StatValues::new_with_value(43)),
},
ColumnSummary {
name: "c3".to_string(),
influxdb_type: None,
stats: Statistics::String(StatValues::new_with_value(
"foo".to_string(),
)),
},
ColumnSummary {
name: "time".to_string(),
influxdb_type: Some(InfluxDbType::Timestamp),
stats: Statistics::I64(StatValues::new_with_value(43)),
},
],
},
},
PartitionSummary {
key: "p3".to_string(),
table: TableSummary {
name: "t1".to_string(),
columns: vec![],
},
},
];
let expected = vec![
"+---------------+------------+-------------+-------------+---------------+",
"| partition_key | table_name | column_name | column_type | influxdb_type |",
"+---------------+------------+-------------+-------------+---------------+",
"| p1 | t1 | c1 | I64 | Tag |",
"| p1 | t1 | c2 | I64 | Field |",
"| p1 | t1 | c3 | String | |",
"| p1 | t1 | time | I64 | Timestamp |",
"+---------------+------------+-------------+-------------+---------------+",
];
let batch = from_partition_summaries(partition_summaries_schema(), partitions).unwrap();
assert_batches_eq!(&expected, &[batch]);
}
#[test]
fn test_assemble_chunk_columns() {
let lifecycle_action = None;
let summaries = vec![
(
Arc::new(TableSummary {
name: "t1".to_string(),
columns: vec![
ColumnSummary {
name: "c1".to_string(),
influxdb_type: Some(InfluxDbType::Field),
stats: Statistics::String(StatValues::new(
Some("bar".to_string()),
Some("foo".to_string()),
55,
)),
},
ColumnSummary {
name: "c2".to_string(),
influxdb_type: Some(InfluxDbType::Field),
stats: Statistics::F64(StatValues::new(Some(11.0), Some(43.0), 66)),
},
],
}),
DetailedChunkSummary {
inner: ChunkSummary {
partition_key: "p1".into(),
table_name: "t1".into(),
id: 42,
storage: ChunkStorage::ReadBuffer,
lifecycle_action,
memory_bytes: 23754,
object_store_bytes: 0,
row_count: 11,
time_of_first_write: None,
time_of_last_write: None,
time_closed: None,
},
columns: vec![
ChunkColumnSummary {
name: "c1".into(),
memory_bytes: 11,
},
ChunkColumnSummary {
name: "c2".into(),
memory_bytes: 12,
},
],
},
),
(
Arc::new(TableSummary {
name: "t1".to_string(),
columns: vec![ColumnSummary {
name: "c1".to_string(),
influxdb_type: Some(InfluxDbType::Field),
stats: Statistics::F64(StatValues::new(Some(110.0), Some(430.0), 667)),
}],
}),
DetailedChunkSummary {
inner: ChunkSummary {
partition_key: "p2".into(),
table_name: "t1".into(),
id: 43,
storage: ChunkStorage::OpenMutableBuffer,
lifecycle_action,
memory_bytes: 23754,
object_store_bytes: 0,
row_count: 11,
time_of_first_write: None,
time_of_last_write: None,
time_closed: None,
},
columns: vec![ChunkColumnSummary {
name: "c1".into(),
memory_bytes: 100,
}],
},
),
(
Arc::new(TableSummary {
name: "t2".to_string(),
columns: vec![ColumnSummary {
name: "c3".to_string(),
influxdb_type: Some(InfluxDbType::Field),
stats: Statistics::F64(StatValues::new(Some(-1.0), Some(2.0), 4)),
}],
}),
DetailedChunkSummary {
inner: ChunkSummary {
partition_key: "p2".into(),
table_name: "t2".into(),
id: 44,
storage: ChunkStorage::OpenMutableBuffer,
lifecycle_action,
memory_bytes: 23754,
object_store_bytes: 0,
row_count: 11,
time_of_first_write: None,
time_of_last_write: None,
time_closed: None,
},
columns: vec![ChunkColumnSummary {
name: "c3".into(),
memory_bytes: 200,
}],
},
),
];
let expected = vec![
"+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+--------------+",
"| partition_key | chunk_id | table_name | column_name | storage | row_count | min_value | max_value | memory_bytes |",
"+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+--------------+",
"| p1 | 42 | t1 | c1 | ReadBuffer | 55 | bar | foo | 11 |",
"| p1 | 42 | t1 | c2 | ReadBuffer | 66 | 11 | 43 | 12 |",
"| p2 | 43 | t1 | c1 | OpenMutableBuffer | 667 | 110 | 430 | 100 |",
"| p2 | 44 | t2 | c3 | OpenMutableBuffer | 4 | -1 | 2 | 200 |",
"+---------------+----------+------------+-------------+-------------------+-----------+-----------+-----------+--------------+",
];
let batch = assemble_chunk_columns(chunk_columns_schema(), summaries).unwrap();
assert_batches_eq!(&expected, &[batch]);
}
}

View File

@ -0,0 +1,108 @@
use std::sync::Arc;
use arrow::array::{ArrayRef, StringArray, Time64NanosecondArray, UInt32Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use arrow::error::Result;
use arrow::record_batch::RecordBatch;
use data_types::error::ErrorLogger;
use data_types::job::Job;
use tracker::TaskTracker;
use crate::db::system_tables::IoxSystemTable;
use crate::JobRegistry;
/// Implementation of system.operations table
#[derive(Debug)]
pub(super) struct OperationsTable {
schema: SchemaRef,
db_name: String,
jobs: Arc<JobRegistry>,
}
impl OperationsTable {
pub(super) fn new(db_name: String, jobs: Arc<JobRegistry>) -> Self {
Self {
schema: operations_schema(),
db_name,
jobs,
}
}
}
impl IoxSystemTable for OperationsTable {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
fn batch(&self) -> Result<RecordBatch> {
from_task_trackers(self.schema(), &self.db_name, self.jobs.tracked())
.log_if_error("system.operations table")
}
}
fn operations_schema() -> SchemaRef {
let ts = DataType::Time64(TimeUnit::Nanosecond);
Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("status", DataType::Utf8, true),
Field::new("cpu_time_used", ts.clone(), true),
Field::new("wall_time_used", ts, true),
Field::new("partition_key", DataType::Utf8, true),
Field::new("chunk_id", DataType::UInt32, true),
Field::new("description", DataType::Utf8, true),
]))
}
fn from_task_trackers(
schema: SchemaRef,
db_name: &str,
jobs: Vec<TaskTracker<Job>>,
) -> Result<RecordBatch> {
let jobs = jobs
.into_iter()
.filter(|job| job.metadata().db_name() == Some(db_name))
.collect::<Vec<_>>();
let ids = jobs
.iter()
.map(|job| Some(job.id().to_string()))
.collect::<StringArray>();
let statuses = jobs
.iter()
.map(|job| Some(job.get_status().name()))
.collect::<StringArray>();
let cpu_time_used = jobs
.iter()
.map(|job| job.get_status().cpu_nanos().map(|n| n as i64))
.collect::<Time64NanosecondArray>();
let wall_time_used = jobs
.iter()
.map(|job| job.get_status().wall_nanos().map(|n| n as i64))
.collect::<Time64NanosecondArray>();
let partition_keys = jobs
.iter()
.map(|job| job.metadata().partition_key())
.collect::<StringArray>();
let chunk_ids = jobs
.iter()
.map(|job| job.metadata().chunk_id())
.collect::<UInt32Array>();
let descriptions = jobs
.iter()
.map(|job| Some(job.metadata().description()))
.collect::<StringArray>();
RecordBatch::try_new(
schema,
vec![
Arc::new(ids) as ArrayRef,
Arc::new(statuses),
Arc::new(cpu_time_used),
Arc::new(wall_time_used),
Arc::new(partition_keys),
Arc::new(chunk_ids),
Arc::new(descriptions),
],
)
}
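A brief aside on the `collect` pattern used above (a minimal standalone sketch, not part of this commit): Arrow arrays implement `FromIterator` over `Option` values, so a `None` returned by e.g. `job.metadata().partition_key()` simply becomes a null slot in the resulting column.
#[test]
fn option_values_become_nulls() {
    use arrow::array::{Array, StringArray};

    // Collecting Options: `Some` values become entries, `None` becomes a null slot
    let keys: StringArray = vec![Some("p1"), None, Some("p2")].into_iter().collect();
    assert_eq!(keys.len(), 3);
    assert_eq!(keys.null_count(), 1); // the `None` became a null entry
}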

View File

@ -2,29 +2,19 @@
use data_types::{
database_rules::{DatabaseRules, WriteBufferConnection},
database_state::DatabaseStateCode,
server_id::ServerId,
error::ErrorLogger,
DatabaseName,
};
use futures::TryStreamExt;
use generated_types::database_rules::decode_database_rules;
use internal_types::once::OnceNonZeroU32;
use object_store::{
path::{parsed::DirsAndFileName, ObjectStorePath, Path},
ObjectStore, ObjectStoreApi,
};
use observability_deps::tracing::{debug, error, info, warn};
use parking_lot::Mutex;
use observability_deps::tracing::{error, info, warn};
use parquet_file::catalog::PreservedCatalog;
use query::exec::Executor;
use snafu::{OptionExt, ResultExt, Snafu};
use std::{
collections::HashMap,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use tokio::sync::Semaphore;
use snafu::{ResultExt, Snafu};
use std::sync::Arc;
use write_buffer::config::WriteBufferConfig;
use crate::{
@ -45,9 +35,6 @@ pub enum Error {
source: generated_types::database_rules::DecodeError,
},
#[snafu(display("id already set"))]
IdAlreadySet { id: ServerId },
#[snafu(display("unable to use server until id is set"))]
IdNotSet,
@ -97,472 +84,254 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Default)]
pub struct CurrentServerId(OnceNonZeroU32);
/// Loads the database configurations based on the databases in the
/// object store. Any databases already in the config won't be
/// replaced.
///
/// Returns a Vec containing the results of loading the contained databases
pub(crate) async fn initialize_server(
config: Arc<Config>,
wipe_on_error: bool,
) -> Result<Vec<(DatabaseName<'static>, Result<()>)>> {
let root = config.root_path();
impl CurrentServerId {
pub fn set(&self, id: ServerId) -> Result<()> {
let id = id.get();
// get the database names from the object store prefixes
// TODO: update object store to pull back all common prefixes by
// following the next tokens.
let list_result = config
.object_store()
.list_with_delimiter(&root)
.await
.context(StoreError)?;
match self.0.set(id) {
Ok(()) => {
info!(server_id = id, "server ID set");
Ok(())
}
Err(id) => Err(Error::IdAlreadySet {
id: ServerId::new(id),
}),
}
}
let handles: Vec<_> = list_result
.common_prefixes
.into_iter()
.filter_map(|mut path| {
let config = Arc::clone(&config);
let root = root.clone();
path.set_file_name(DB_RULES_FILE_NAME);
let db_name = db_name_from_rules_path(&path)
.log_if_error("invalid database path")
.ok()?;
pub fn get(&self) -> Result<ServerId> {
self.0.get().map(ServerId::new).context(IdNotSet)
}
}
#[derive(Debug)]
pub struct InitStatus {
pub server_id: CurrentServerId,
/// Flags that databases are loaded and server is ready to read/write data.
initialized: AtomicBool,
/// Semaphore that limits the number of jobs that load DBs when the serverID is set.
///
/// Note that this semaphore is more of a "lock" than an arbitrary semaphore. All the other sync structures (mutex,
/// rwlock) require something to be wrapped, which we don't have in our case, so we're using a semaphore here. We
/// want exactly one background worker to mess with the server init / DB loading, otherwise everything in the critical
/// section (in [`maybe_initialize_server`](Self::maybe_initialize_server)) will break apart. So this semaphore
/// cannot be configured.
initialize_semaphore: Semaphore,
/// Error occurred during generic server init (e.g. listing store content).
error_generic: Mutex<Option<Arc<Error>>>,
/// Errors that occurred during some DB init.
errors_databases: Arc<Mutex<HashMap<String, Arc<Error>>>>,
/// Automatic wipe-on-error recovery
///
/// See <https://github.com/influxdata/influxdb_iox/issues/1522>
pub(crate) wipe_on_error: AtomicBool,
}
impl InitStatus {
/// Create new "not initialized" status.
pub fn new() -> Self {
Self {
server_id: Default::default(),
initialized: AtomicBool::new(false),
// Always set semaphore permits to `1`, see design comments in `Server::initialize_semaphore`.
initialize_semaphore: Semaphore::new(1),
error_generic: Default::default(),
errors_databases: Default::default(),
wipe_on_error: AtomicBool::new(true),
}
}
/// Base location in object store for this writer.
pub fn root_path(&self, store: &ObjectStore) -> Result<Path> {
let id = self.server_id.get()?;
let mut path = store.new_path();
path.push_dir(format!("{}", id));
Ok(path)
}
/// Check if server is loaded. Databases are loaded and server is ready to read/write.
pub fn initialized(&self) -> bool {
// Need `Acquire` ordering because IF we see a `true` here, this thread will likely also read data that
// `maybe_initialize_server` wrote before toggling the flag with `Release`. The `Acquire` ordering here ensures that
// every data access AFTER the following line will also stay AFTER this line.
self.initialized.load(Ordering::Acquire)
}
/// Error occurred during generic server init (e.g. listing store content).
pub fn error_generic(&self) -> Option<Arc<Error>> {
let guard = self.error_generic.lock();
guard.clone()
}
/// List all databases with errors in sorted order.
pub fn databases_with_errors(&self) -> Vec<String> {
let guard = self.errors_databases.lock();
let mut names: Vec<_> = guard.keys().cloned().collect();
names.sort();
names
}
/// Error that occurred during initialization of a specific database.
pub fn error_database(&self, db_name: &str) -> Option<Arc<Error>> {
let guard = self.errors_databases.lock();
guard.get(db_name).cloned()
}
/// Loads the database configurations based on the databases in the
/// object store. Any databases already in the config won't be
/// replaced.
///
/// This requires the serverID to be set (will be a no-op otherwise).
///
/// It will be a no-op if the configs are already loaded and the server is ready.
pub(crate) async fn maybe_initialize_server(
&self,
store: Arc<ObjectStore>,
config: Arc<Config>,
exec: Arc<Executor>,
) {
let server_id = match self.server_id.get() {
Ok(id) => id,
Err(e) => {
debug!(%e, "cannot initialize server because cannot get serverID");
return;
}
};
let _guard = self
.initialize_semaphore
.acquire()
.await
.expect("semaphore should not be closed");
// Note that we use Acquire-Release ordering for the atomic within the semaphore to ensure that another thread
// that enters this semaphore after we've left actually sees the correct `is_ready` flag.
if self.initialized.load(Ordering::Acquire) {
// already loaded, so do nothing
return;
}
// Check if there was a previous failed attempt
if self.error_generic().is_some() {
return;
}
match self
.maybe_initialize_server_inner(store, config, exec, server_id)
.await
{
Ok(_) => {
// mark as ready (use correct ordering for Acquire-Release)
self.initialized.store(true, Ordering::Release);
info!("loaded databases, server is initalized");
}
Err(e) => {
error!(%e, "error during server init");
let mut guard = self.error_generic.lock();
*guard = Some(Arc::new(e));
}
}
}
async fn maybe_initialize_server_inner(
&self,
store: Arc<ObjectStore>,
config: Arc<Config>,
exec: Arc<Executor>,
server_id: ServerId,
) -> Result<()> {
let root = self.root_path(&store)?;
// get the database names from the object store prefixes
// TODO: update object store to pull back all common prefixes by
// following the next tokens.
let list_result = store.list_with_delimiter(&root).await.context(StoreError)?;
let handles: Vec<_> = list_result
.common_prefixes
.into_iter()
.filter_map(|mut path| {
let store = Arc::clone(&store);
let config = Arc::clone(&config);
let exec = Arc::clone(&exec);
let errors_databases = Arc::clone(&self.errors_databases);
let wipe_on_error = self.wipe_on_error.load(Ordering::Relaxed);
let root = root.clone();
path.set_file_name(DB_RULES_FILE_NAME);
match db_name_from_rules_path(&path) {
Ok(db_name) => {
let handle = tokio::task::spawn(async move {
match Self::initialize_database(
server_id,
store,
config,
exec,
root,
db_name.clone(),
wipe_on_error,
)
.await
{
Ok(()) => {
info!(%db_name, "database initialized");
}
Err(e) => {
error!(%e, %db_name, "cannot load database");
let mut guard = errors_databases.lock();
guard.insert(db_name.to_string(), Arc::new(e));
}
}
});
Some(handle)
}
Err(e) => {
error!(%e, "invalid database path");
None
}
}
Some(async move {
let result =
initialize_database(config, root, db_name.clone(), wipe_on_error).await;
(db_name, result)
})
.collect();
})
.collect();
futures::future::join_all(handles).await;
Ok(futures::future::join_all(handles).await)
}
async fn initialize_database(
config: Arc<Config>,
root: Path,
db_name: DatabaseName<'static>,
wipe_on_error: bool,
) -> Result<()> {
// Reserve name before expensive IO (e.g. loading the preserved catalog)
let mut handle = config
.create_db(db_name)
.map_err(Box::new)
.context(InitDbError)?;
match try_advance_database_init_process_until_complete(&mut handle, &root, wipe_on_error).await
{
Ok(true) => {
// finished init and keep DB
handle.commit();
Ok(())
}
Ok(false) => {
// finished but do not keep DB
handle.abort();
Ok(())
}
Err(e) => {
// encountered some error, still commit intermediate result
handle.commit();
Err(e)
}
}
}
async fn load_database_rules(store: Arc<ObjectStore>, path: Path) -> Result<Option<DatabaseRules>> {
let serialized_rules = loop {
match get_database_config_bytes(&path, &store).await {
Ok(data) => break data,
Err(e) => {
if let Error::NoDatabaseConfigError { location } = &e {
warn!(?location, "{}", e);
return Ok(None);
}
error!(
"error getting database config {:?} from object store: {}",
path, e
);
tokio::time::sleep(tokio::time::Duration::from_secs(STORE_ERROR_PAUSE_SECONDS))
.await;
}
}
};
let rules = decode_database_rules(serialized_rules.freeze())
.context(ErrorDeserializingRulesProtobuf)?;
Ok(Some(rules))
}
pub(crate) async fn wipe_preserved_catalog_and_maybe_recover(
config: Arc<Config>,
db_name: &DatabaseName<'static>,
) -> Result<()> {
let store = config.object_store();
if config.has_uninitialized_database(db_name) {
let mut handle = config
.recover_db(db_name.clone())
.map_err(|e| Arc::new(e) as _)
.context(RecoverDbError)?;
if !((handle.state_code() == DatabaseStateCode::Known)
|| (handle.state_code() == DatabaseStateCode::RulesLoaded))
{
// cannot wipe because init state is already too far
return Err(Error::DbPartiallyInitialized {
db_name: db_name.to_string(),
});
}
// wipe while holding handle so no other init/wipe process can interact with the catalog
PreservedCatalog::wipe(&store, handle.server_id(), db_name)
.await
.map_err(Box::new)
.context(PreservedCatalogWipeError)?;
let root = config.root_path();
let result =
try_advance_database_init_process_until_complete(&mut handle, &root, true).await;
// Commit changes even if failed
handle.commit();
result.map(|_| ())
} else {
let handle = config
.block_db(db_name.clone())
.map_err(|e| Arc::new(e) as _)
.context(RecoverDbError)?;
PreservedCatalog::wipe(&store, config.server_id(), db_name)
.await
.map_err(Box::new)
.context(PreservedCatalogWipeError)?;
drop(handle);
info!(%db_name, "wiped preserved catalog of non-registered database");
Ok(())
}
}
async fn initialize_database(
server_id: ServerId,
store: Arc<ObjectStore>,
config: Arc<Config>,
exec: Arc<Executor>,
root: Path,
db_name: DatabaseName<'static>,
wipe_on_error: bool,
) -> Result<()> {
// Reserve name before expensive IO (e.g. loading the preserved catalog)
let mut handle = config
.create_db(store, exec, server_id, db_name)
.map_err(Box::new)
.context(InitDbError)?;
match Self::try_advance_database_init_process_until_complete(
&mut handle,
&root,
wipe_on_error,
)
.await
{
Ok(true) => {
// finished init and keep DB
handle.commit();
Ok(())
/// Try to make as much progress as possible with DB init.
///
/// Returns an error if there was an error along the way (in which case the handle should still be committed to save
/// the intermediate result). Returns `Ok(true)` if DB init is finished and `Ok(false)` if the DB can be forgotten
/// (e.g. because no rules file is present).
async fn try_advance_database_init_process_until_complete(
handle: &mut DatabaseHandle<'_>,
root: &Path,
wipe_on_error: bool,
) -> Result<bool> {
loop {
match try_advance_database_init_process(handle, root, wipe_on_error).await? {
InitProgress::Unfinished => {}
InitProgress::Done => {
return Ok(true);
}
Ok(false) => {
// finished but do not keep DB
handle.abort();
Ok(())
}
Err(e) => {
// encountered some error, still commit intermediate result
handle.commit();
Err(e)
InitProgress::Forget => {
return Ok(false);
}
}
}
}
async fn load_database_rules(
store: Arc<ObjectStore>,
path: Path,
) -> Result<Option<DatabaseRules>> {
let serialized_rules = loop {
match get_database_config_bytes(&path, &store).await {
Ok(data) => break data,
Err(e) => {
if let Error::NoDatabaseConfigError { location } = &e {
warn!(?location, "{}", e);
return Ok(None);
}
error!(
"error getting database config {:?} from object store: {}",
path, e
);
tokio::time::sleep(tokio::time::Duration::from_secs(STORE_ERROR_PAUSE_SECONDS))
.await;
/// Try to make some progress in the DB init.
async fn try_advance_database_init_process(
handle: &mut DatabaseHandle<'_>,
root: &Path,
wipe_on_error: bool,
) -> Result<InitProgress> {
match handle.state_code() {
DatabaseStateCode::Known => {
// known => load DB rules
let path = object_store_path_for_database_config(root, &handle.db_name());
match load_database_rules(handle.object_store(), path).await? {
Some(rules) => {
handle
.advance_rules_loaded(rules)
.map_err(Box::new)
.context(InitDbError)?;
// there is still more work to do for this DB
Ok(InitProgress::Unfinished)
}
None => {
// no rules file present, advise to forget this DB
Ok(InitProgress::Forget)
}
}
};
let rules = decode_database_rules(serialized_rules.freeze())
.context(ErrorDeserializingRulesProtobuf)?;
Ok(Some(rules))
}
pub(crate) async fn wipe_preserved_catalog_and_maybe_recover(
&self,
store: Arc<ObjectStore>,
config: Arc<Config>,
server_id: ServerId,
db_name: DatabaseName<'static>,
) -> Result<()> {
if config.has_uninitialized_database(&db_name) {
let mut handle = config
.recover_db(db_name.clone())
.map_err(|e| Arc::new(e) as _)
.context(RecoverDbError)?;
if !((handle.state_code() == DatabaseStateCode::Known)
|| (handle.state_code() == DatabaseStateCode::RulesLoaded))
{
// cannot wipe because init state is already too far
return Err(Error::DbPartiallyInitialized {
db_name: db_name.to_string(),
});
}
// wipe while holding handle so no other init/wipe process can interact with the catalog
PreservedCatalog::wipe(&store, handle.server_id(), &db_name)
.await
.map_err(Box::new)
.context(PreservedCatalogWipeError)?;
let root = self.root_path(&store)?;
let wipe_on_error = self.wipe_on_error.load(Ordering::Relaxed);
match Self::try_advance_database_init_process_until_complete(
&mut handle,
&root,
}
DatabaseStateCode::RulesLoaded => {
// rules already loaded => continue with loading preserved catalog
let (preserved_catalog, catalog) = load_or_create_preserved_catalog(
&handle.db_name(),
handle.object_store(),
handle.server_id(),
handle.metrics_registry(),
wipe_on_error,
)
.await
{
Ok(_) => {
// yeah, recovered DB
handle.commit();
.map_err(|e| Box::new(e) as _)
.context(CatalogLoadError)?;
let mut guard = self.errors_databases.lock();
guard.remove(&db_name.to_string());
info!(%db_name, "wiped preserved catalog of registered database and recovered");
Ok(())
}
Err(e) => {
// could not recover, but still keep new result
handle.commit();
let mut guard = self.errors_databases.lock();
let e = Arc::new(e);
guard.insert(db_name.to_string(), Arc::clone(&e));
warn!(%db_name, %e, "wiped preserved catalog of registered database but still cannot recover");
Err(Error::RecoverDbError { source: e })
}
}
} else {
let handle = config
.block_db(db_name.clone())
.map_err(|e| Arc::new(e) as _)
.context(RecoverDbError)?;
PreservedCatalog::wipe(&store, server_id, &db_name)
let rules = handle
.rules()
.expect("in this state rules should be loaded");
let write_buffer = WriteBufferConfig::new(handle.server_id(), &rules)
.await
.context(CreateWriteBuffer {
config: rules.write_buffer_connection.clone(),
})?;
info!(write_buffer_enabled=?write_buffer.is_some(), db_name=rules.db_name(), "write buffer config");
handle
.advance_replay(preserved_catalog, catalog, write_buffer)
.map_err(Box::new)
.context(PreservedCatalogWipeError)?;
.context(InitDbError)?;
drop(handle);
info!(%db_name, "wiped preserved catalog of non-registered database");
Ok(())
// there is still more work to do for this DB
Ok(InitProgress::Unfinished)
}
}
DatabaseStateCode::Replay => {
let db = handle
.db_any_state()
.expect("DB should be available in this state");
db.perform_replay().await;
/// Try to make as much progress as possible with DB init.
///
/// Returns an error if there was an error along the way (in which case the handle should still be committed to save
/// the intermediate result). Returns `Ok(true)` if DB init is finished and `Ok(false)` if the DB can be forgotten
/// (e.g. because no rules file is present).
async fn try_advance_database_init_process_until_complete(
handle: &mut DatabaseHandle<'_>,
root: &Path,
wipe_on_error: bool,
) -> Result<bool> {
loop {
match Self::try_advance_database_init_process(handle, root, wipe_on_error).await? {
InitProgress::Unfinished => {}
InitProgress::Done => {
return Ok(true);
}
InitProgress::Forget => {
return Ok(false);
}
}
handle
.advance_init()
.map_err(Box::new)
.context(InitDbError)?;
// there is still more work to do for this DB
Ok(InitProgress::Unfinished)
}
}
/// Try to make some progress in the DB init.
async fn try_advance_database_init_process(
handle: &mut DatabaseHandle<'_>,
root: &Path,
wipe_on_error: bool,
) -> Result<InitProgress> {
match handle.state_code() {
DatabaseStateCode::Known => {
// known => load DB rules
let path = object_store_path_for_database_config(root, &handle.db_name());
match Self::load_database_rules(handle.object_store(), path).await? {
Some(rules) => {
handle
.advance_rules_loaded(rules)
.map_err(Box::new)
.context(InitDbError)?;
// there is still more work to do for this DB
Ok(InitProgress::Unfinished)
}
None => {
// no rules file present, advise to forget this DB
Ok(InitProgress::Forget)
}
}
}
DatabaseStateCode::RulesLoaded => {
// rules already loaded => continue with loading preserved catalog
let (preserved_catalog, catalog) = load_or_create_preserved_catalog(
&handle.db_name(),
handle.object_store(),
handle.server_id(),
handle.metrics_registry(),
wipe_on_error,
)
.await
.map_err(|e| Box::new(e) as _)
.context(CatalogLoadError)?;
let rules = handle
.rules()
.expect("in this state rules should be loaded");
let write_buffer = WriteBufferConfig::new(handle.server_id(), &rules)
.await
.context(CreateWriteBuffer {
config: rules.write_buffer_connection.clone(),
})?;
info!(write_buffer_enabled=?write_buffer.is_some(), db_name=rules.db_name(), "write buffer config");
handle
.advance_replay(preserved_catalog, catalog, write_buffer)
.map_err(Box::new)
.context(InitDbError)?;
// there is still more work to do for this DB
Ok(InitProgress::Unfinished)
}
DatabaseStateCode::Replay => {
let db = handle
.db_any_state()
.expect("DB should be available in this state");
db.perform_replay().await;
handle
.advance_init()
.map_err(Box::new)
.context(InitDbError)?;
// there is still more work to do for this DB
Ok(InitProgress::Unfinished)
}
DatabaseStateCode::Initialized => {
// database fully initialized => nothing to do
Ok(InitProgress::Done)
}
DatabaseStateCode::Initialized => {
// database fully initialized => nothing to do
Ok(InitProgress::Done)
}
}
}

View File

@ -74,9 +74,8 @@ use std::sync::Arc;
use async_trait::async_trait;
use bytes::BytesMut;
use db::load::create_preserved_catalog;
use init::InitStatus;
use observability_deps::tracing::{debug, info, warn};
use parking_lot::Mutex;
use observability_deps::tracing::{debug, error, info, warn};
use parking_lot::{Mutex, RwLockUpgradableReadGuard};
use snafu::{OptionExt, ResultExt, Snafu};
use data_types::{
@ -93,6 +92,7 @@ use generated_types::influxdata::transfer::column::v1 as pb;
use influxdb_line_protocol::ParsedLine;
use metrics::{KeyValue, MetricObserverBuilder, MetricRegistry};
use object_store::{ObjectStore, ObjectStoreApi};
use parking_lot::RwLock;
use query::{exec::Executor, DatabaseStore};
use tracker::{TaskId, TaskRegistration, TaskRegistryWithHistory, TaskTracker, TrackedFutureExt};
use write_buffer::config::WriteBufferConfig;
@ -220,11 +220,11 @@ pub enum Error {
#[snafu(display("cannot create preserved catalog: {}", source))]
CannotCreatePreservedCatalog { source: DatabaseError },
#[snafu(display("cannot set id: {}", source))]
SetIdError { source: crate::init::Error },
#[snafu(display("id already set"))]
IdAlreadySet,
#[snafu(display("cannot get id: {}", source))]
GetIdError { source: crate::init::Error },
#[snafu(display("id not set"))]
IdNotSet,
#[snafu(display(
"cannot create write buffer with config: {:?}, error: {}",
@ -297,6 +297,8 @@ pub struct ServerConfig {
metric_registry: Arc<MetricRegistry>,
remote_template: Option<RemoteTemplate>,
wipe_catalog_on_error: bool,
}
impl ServerConfig {
@ -311,6 +313,7 @@ impl ServerConfig {
object_store,
metric_registry,
remote_template,
wipe_catalog_on_error: true,
}
}
@ -414,7 +417,6 @@ impl ServerMetrics {
/// of these structs, which keeps track of all replication and query rules.
#[derive(Debug)]
pub struct Server<M: ConnectionManager> {
config: Arc<Config>,
connection_manager: Arc<M>,
pub store: Arc<ObjectStore>,
exec: Arc<Executor>,
@ -426,7 +428,50 @@ pub struct Server<M: ConnectionManager> {
/// and populates the endpoint with this data.
pub registry: Arc<metrics::MetricRegistry>,
init_status: Arc<InitStatus>,
/// The state machine for server startup
stage: Arc<RwLock<ServerStage>>,
}
/// The stage of the server in the startup process
///
/// The progression is linear: Startup -> InitReady -> Initializing -> Initialized,
/// with the sole exception that on failure Initializing -> InitReady
///
/// Errors encountered on server init will be retried; errors encountered
/// during database init will require operator intervention
///
/// These errors are exposed via `Server::error_generic` and `Server::error_database` respectively
///
/// They do not impact the state machine's progression, but instead are exposed to the
/// gRPC management API to allow an operator to assess the state of the system
#[derive(Debug)]
enum ServerStage {
/// Server has started but doesn't have a server id yet
Startup {
remote_template: Option<RemoteTemplate>,
wipe_catalog_on_error: bool,
},
/// Server can be initialized
InitReady {
wipe_catalog_on_error: bool,
config: Arc<Config>,
last_error: Option<Arc<init::Error>>,
},
/// Server has a server id and has started loading
Initializing {
wipe_catalog_on_error: bool,
config: Arc<Config>,
last_error: Option<Arc<init::Error>>,
},
/// Server has finished initializing, possibly with errors
Initialized {
config: Arc<Config>,
/// Errors that occurred during some DB init.
database_errors: HashMap<String, Arc<init::Error>>,
},
}
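To make the progression described above concrete, here is a minimal sketch (hypothetical, not part of this diff; `next_stage_name` and `init_ok` are illustrative names only) of the allowed transitions:
// Illustrative only: names the stage each variant can move to next.
fn next_stage_name(stage: &ServerStage, init_ok: bool) -> &'static str {
    match stage {
        ServerStage::Startup { .. } => "InitReady (once the server id is set)",
        ServerStage::InitReady { .. } => "Initializing",
        ServerStage::Initializing { .. } if init_ok => "Initialized",
        ServerStage::Initializing { .. } => "InitReady (init failed, retried later)",
        ServerStage::Initialized { .. } => "Initialized (terminal)",
    }
}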
#[derive(Debug)]
@ -454,22 +499,23 @@ where
// to test the metrics provide a different registry to the `ServerConfig`.
metric_registry,
remote_template,
wipe_catalog_on_error,
} = config;
let num_worker_threads = num_worker_threads.unwrap_or_else(num_cpus::get);
let exec = Arc::new(Executor::new(num_worker_threads));
Self {
config: Arc::new(Config::new(
Arc::clone(&jobs),
Arc::clone(&metric_registry),
remote_template,
)),
store: object_store,
connection_manager: Arc::new(connection_manager),
exec: Arc::new(Executor::new(num_worker_threads)),
exec,
jobs,
metrics: Arc::new(ServerMetrics::new(Arc::clone(&metric_registry))),
registry: Arc::clone(&metric_registry),
init_status: Arc::new(InitStatus::new()),
stage: Arc::new(RwLock::new(ServerStage::Startup {
remote_template,
wipe_catalog_on_error,
})),
}
}
@ -478,68 +524,112 @@ where
///
/// A valid server ID must be non-zero.
pub fn set_id(&self, id: ServerId) -> Result<()> {
self.init_status.server_id.set(id).context(SetIdError)
}
let mut stage = self.stage.write();
match &mut *stage {
ServerStage::Startup {
remote_template,
wipe_catalog_on_error,
} => {
let remote_template = remote_template.take();
/// Returns the current server ID, or an error if not yet set.
pub fn require_id(&self) -> Result<ServerId> {
self.init_status.server_id.get().context(GetIdError)
*stage = ServerStage::InitReady {
wipe_catalog_on_error: *wipe_catalog_on_error,
config: Arc::new(Config::new(
Arc::clone(&self.jobs),
Arc::clone(&self.store),
Arc::clone(&self.exec),
id,
Arc::clone(&self.registry),
remote_template,
)),
last_error: None,
};
Ok(())
}
_ => Err(Error::IdAlreadySet),
}
}
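As a quick illustration of the new behaviour (a hypothetical usage sketch, not part of this diff; it assumes a constructed `server`), a second `set_id` call now fails with `Error::IdAlreadySet` instead of the old `SetIdError` wrapper:
let id = ServerId::try_from(42).unwrap();
server.set_id(id).unwrap(); // Startup -> InitReady
assert!(matches!(server.set_id(id), Err(Error::IdAlreadySet)));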
/// Check if server is loaded. Databases are loaded and server is ready to read/write.
pub fn initialized(&self) -> bool {
self.init_status.initialized()
matches!(&*self.stage.read(), ServerStage::Initialized { .. })
}
/// Require that server is loaded. Databases are loaded and server is ready to read/write.
fn require_initialized(&self) -> Result<Arc<Config>> {
match &*self.stage.read() {
ServerStage::Startup { .. } => Err(Error::IdNotSet),
ServerStage::InitReady { config, .. } | ServerStage::Initializing { config, .. } => {
Err(Error::ServerNotInitialized {
server_id: config.server_id(),
})
}
ServerStage::Initialized { config, .. } => Ok(Arc::clone(&config)),
}
}
/// Returns the config for this server if server id has been set
fn config(&self) -> Result<Arc<Config>> {
let stage = self.stage.read();
match &*stage {
ServerStage::Startup { .. } => Err(Error::IdNotSet),
ServerStage::InitReady { config, .. }
| ServerStage::Initializing { config, .. }
| ServerStage::Initialized { config, .. } => Ok(Arc::clone(&config)),
}
}
/// Returns the server id for this server if set
pub fn server_id(&self) -> Option<ServerId> {
self.config().map(|x| x.server_id()).ok()
}
/// Error occurred during generic server init (e.g. listing store content).
pub fn error_generic(&self) -> Option<Arc<crate::init::Error>> {
self.init_status.error_generic()
let stage = self.stage.read();
match &*stage {
ServerStage::InitReady { last_error, .. } => last_error.clone(),
ServerStage::Initializing { last_error, .. } => last_error.clone(),
_ => None,
}
}
/// List all databases with errors in sorted order.
pub fn databases_with_errors(&self) -> Vec<String> {
self.init_status.databases_with_errors()
let stage = self.stage.read();
match &*stage {
ServerStage::Initialized {
database_errors, ..
} => database_errors.keys().cloned().collect(),
_ => Default::default(),
}
}
/// Error that occurred during initialization of a specific database.
pub fn error_database(&self, db_name: &str) -> Option<Arc<crate::init::Error>> {
self.init_status.error_database(db_name)
let stage = self.stage.read();
match &*stage {
ServerStage::Initialized {
database_errors, ..
} => database_errors.get(db_name).cloned(),
_ => None,
}
}
/// Current database init state.
pub fn database_state(&self, name: &str) -> Option<DatabaseStateCode> {
if let Ok(name) = DatabaseName::new(name) {
self.config.db_state(&name)
} else {
None
}
}
/// Require that server is loaded. Databases are loaded and server is ready to read/write.
fn require_initialized(&self) -> Result<ServerId> {
// since a server ID is the prerequisite for init, check this first
let server_id = self.require_id()?;
// ordering here isn't that important since this method is not used to check-and-modify the flag
if self.initialized() {
Ok(server_id)
} else {
Err(Error::ServerNotInitialized { server_id })
}
let db_name = DatabaseName::new(name).ok()?;
let config = self.config().ok()?;
config.db_state(&db_name)
}
/// Tells the server the set of rules for a database.
pub async fn create_database(&self, rules: DatabaseRules) -> Result<()> {
// Return an error if this server is not yet ready
let server_id = self.require_initialized()?;
let config = self.require_initialized()?;
// Reserve name before expensive IO (e.g. loading the preserved catalog)
let mut db_reservation = self.config.create_db(
Arc::clone(&self.store),
Arc::clone(&self.exec),
server_id,
rules.name.clone(),
)?;
let mut db_reservation = config.create_db(rules.name.clone())?;
// register rules
db_reservation.advance_rules_loaded(rules.clone())?;
@ -548,14 +638,14 @@ where
let (preserved_catalog, catalog) = create_preserved_catalog(
rules.db_name(),
Arc::clone(&self.store),
server_id,
self.config.metrics_registry(),
config.server_id(),
config.metrics_registry(),
)
.await
.map_err(|e| Box::new(e) as _)
.context(CannotCreatePreservedCatalog)?;
let write_buffer = WriteBufferConfig::new(server_id, &rules)
let write_buffer = WriteBufferConfig::new(config.server_id(), &rules)
.await
.map_err(|e| Error::CreatingWriteBuffer {
config: rules.write_buffer_connection.clone(),
@ -575,13 +665,8 @@ where
}
pub async fn persist_database_rules<'a>(&self, rules: DatabaseRules) -> Result<()> {
let location = object_store_path_for_database_config(
&self
.init_status
.root_path(&self.store)
.context(GetIdError)?,
&rules.name,
);
let config = self.config()?;
let location = object_store_path_for_database_config(&config.root_path(), &rules.name);
let mut data = BytesMut::new();
encode_database_rules(rules, &mut data).context(ErrorSerializingRulesProtobuf)?;
@ -604,15 +689,62 @@ where
/// object store. Any databases already in the config won't be
/// replaced.
///
/// This requires the serverID to be set. It will be a no-op if the configs are already loaded and the server is ready.
/// This requires the serverID to be set.
///
/// It will be a no-op if the configs are already loaded and the server is ready.
pub async fn maybe_initialize_server(&self) {
self.init_status
.maybe_initialize_server(
Arc::clone(&self.store),
Arc::clone(&self.config),
Arc::clone(&self.exec),
)
.await;
// Explicit scope to help async generator
let (wipe_catalog_on_error, config) = {
let state = self.stage.upgradable_read();
match &*state {
ServerStage::InitReady {
wipe_catalog_on_error,
config,
last_error,
} => {
let config = Arc::clone(config);
let last_error = last_error.clone();
let wipe_catalog_on_error = *wipe_catalog_on_error;
// Mark the server as initializing and drop lock
let mut state = RwLockUpgradableReadGuard::upgrade(state);
*state = ServerStage::Initializing {
config: Arc::clone(&config),
wipe_catalog_on_error,
last_error,
};
(wipe_catalog_on_error, config)
}
_ => return,
}
};
let init_result = init::initialize_server(Arc::clone(&config), wipe_catalog_on_error).await;
let new_stage = match init_result {
// Success -> move to next stage
Ok(results) => {
info!(server_id=%config.server_id(), "server initialized");
ServerStage::Initialized {
config,
database_errors: results
.into_iter()
.filter_map(|(name, res)| Some((name.to_string(), Arc::new(res.err()?))))
.collect(),
}
}
// Error -> return to InitReady
Err(err) => {
error!(%err, "error during server init");
ServerStage::InitReady {
wipe_catalog_on_error,
config,
last_error: Some(Arc::new(err)),
}
}
};
*self.stage.write() = new_stage;
}
pub async fn write_pb(&self, database_batch: pb::DatabaseBatch) -> Result<()> {
@ -640,11 +772,10 @@ where
default_time: i64,
) -> Result<()> {
// Return an error if this server is not yet ready
self.require_initialized()?;
let config = self.require_initialized()?;
let db_name = DatabaseName::new(db_name).context(InvalidDatabaseName)?;
let db = self
.config
let db = config
.db_initialized(&db_name)
.context(DatabaseNotFound { db_name: &*db_name })?;
@ -744,9 +875,12 @@ where
node_group: &[ServerId],
entry: Entry,
) -> Result<()> {
// Return an error if this server is not yet ready
let config = self.config()?;
let addrs: Vec<_> = node_group
.iter()
.filter_map(|&node| self.config.resolve_remote(node))
.filter_map(|&node| config.resolve_remote(node))
.collect();
if addrs.is_empty() {
return NoRemoteConfigured { node_group }.fail();
@ -775,11 +909,10 @@ where
pub async fn write_entry(&self, db_name: &str, entry_bytes: Vec<u8>) -> Result<()> {
// Return an error if this server is not yet ready
self.require_initialized()?;
let config = self.require_initialized()?;
let db_name = DatabaseName::new(db_name).context(InvalidDatabaseName)?;
let db = self
.config
let db = config
.db_initialized(&db_name)
.context(DatabaseNotFound { db_name: &*db_name })?;
@ -825,11 +958,11 @@ where
}
pub fn db(&self, name: &DatabaseName<'_>) -> Option<Arc<Db>> {
self.config.db_initialized(name)
self.config().ok()?.db_initialized(name)
}
pub fn db_rules(&self, name: &DatabaseName<'_>) -> Option<Arc<DatabaseRules>> {
self.config.db_initialized(name).map(|d| d.rules())
self.db(name).map(|d| d.rules())
}
// Update database rules and save on success.
@ -841,8 +974,8 @@ where
where
F: FnOnce(DatabaseRules) -> Result<DatabaseRules, E> + Send,
{
let rules = self
.config
let config = self.config()?;
let rules = config
.update_db_rules(db_name, update)
.map_err(|e| match e {
crate::config::UpdateError::Closure(e) => UpdateError::Closure(e),
@ -854,16 +987,23 @@ where
Ok(rules)
}
pub fn remotes_sorted(&self) -> Vec<(ServerId, String)> {
self.config.remotes_sorted()
pub fn remotes_sorted(&self) -> Result<Vec<(ServerId, String)>> {
// TODO: Should these be on ConnectionManager and not Config
let config = self.config()?;
Ok(config.remotes_sorted())
}
pub fn update_remote(&self, id: ServerId, addr: GRpcConnectionString) {
self.config.update_remote(id, addr)
pub fn update_remote(&self, id: ServerId, addr: GRpcConnectionString) -> Result<()> {
// TODO: Should these be on ConnectionManager and not Config
let config = self.config()?;
config.update_remote(id, addr);
Ok(())
}
pub fn delete_remote(&self, id: ServerId) -> Option<GRpcConnectionString> {
self.config.delete_remote(id)
pub fn delete_remote(&self, id: ServerId) -> Result<Option<GRpcConnectionString>> {
// TODO: Should these be on ConnectionManager and not Config
let config = self.config()?;
Ok(config.delete_remote(id))
}
pub fn spawn_dummy_job(&self, nanos: Vec<u64>) -> TaskTracker<Job> {
@ -893,14 +1033,15 @@ where
partition_key: impl Into<String>,
chunk_id: u32,
) -> Result<TaskTracker<Job>> {
let config = self.require_initialized()?;
let db_name = db_name.to_string();
let name = DatabaseName::new(&db_name).context(InvalidDatabaseName)?;
let partition_key = partition_key.into();
let table_name = table_name.into();
let db = self
.config
let db = config
.db_initialized(&name)
.context(DatabaseNotFound { db_name: &db_name })?;
@ -921,25 +1062,62 @@ where
/// DB jobs and this command.
pub fn wipe_preserved_catalog(
&self,
db_name: DatabaseName<'static>,
db_name: &DatabaseName<'static>,
) -> Result<TaskTracker<Job>> {
if self.config.db_initialized(&db_name).is_some() {
return Err(Error::DatabaseAlreadyExists {
db_name: db_name.to_string(),
});
}
// Can only wipe catalog of database that failed to initialize
let config = match &*self.stage.read() {
ServerStage::Initialized {
config,
database_errors,
} => {
if config.db_initialized(db_name).is_some() {
return Err(Error::DatabaseAlreadyExists {
db_name: db_name.to_string(),
});
}
if !database_errors.contains_key(db_name.as_str()) {
// TODO: Should this be an error? Some end-to-end tests assume it is non-fatal
warn!(%db_name, "wiping database not present at startup");
}
Arc::clone(config)
}
ServerStage::Startup { .. } => return Err(Error::IdNotSet),
ServerStage::Initializing { config, .. } | ServerStage::InitReady { config, .. } => {
return Err(Error::ServerNotInitialized {
server_id: config.server_id(),
})
}
};
let (tracker, registration) = self.jobs.register(Job::WipePreservedCatalog {
db_name: db_name.to_string(),
});
let object_store = Arc::clone(&self.store);
let config = Arc::clone(&self.config);
let server_id = self.require_id()?;
let init_status = Arc::clone(&self.init_status);
let state = Arc::clone(&self.stage);
let db_name = db_name.clone();
let task = async move {
init_status
.wipe_preserved_catalog_and_maybe_recover(object_store, config, server_id, db_name)
.await
let result = init::wipe_preserved_catalog_and_maybe_recover(config, &db_name).await;
match &mut *state.write() {
ServerStage::Initialized {
database_errors, ..
} => match result {
Ok(_) => {
info!(%db_name, "wiped preserved catalog of registered database and recovered");
database_errors.remove(db_name.as_str());
Ok(())
}
Err(e) => {
warn!(%db_name, %e, "wiped preserved catalog of registered database but still cannot recover");
let e = Arc::new(e);
database_errors.insert(db_name.to_string(), Arc::clone(&e));
Err(e)
}
},
_ => unreachable!("server cannot become uninitialized"),
}
};
tokio::spawn(task.track(registration));
@ -973,7 +1151,9 @@ where
}
info!("shutting down background workers");
self.config.drain().await;
if let Ok(config) = self.config() {
config.drain().await;
}
info!("draining tracker registry");
@ -999,11 +1179,15 @@ where
type Error = Error;
fn db_names_sorted(&self) -> Vec<String> {
self.config
.db_names_sorted()
.iter()
.map(|i| i.clone().into())
.collect()
self.config()
.map(|config| {
config
.db_names_sorted()
.iter()
.map(ToString::to_string)
.collect()
})
.unwrap_or_default()
}
fn db(&self, name: &str) -> Option<Arc<Self::Database>> {
@ -1214,25 +1398,15 @@ mod tests {
let manager = TestConnectionManager::new();
let server = Server::new(manager, config());
let resp = server.require_id().unwrap_err();
assert!(matches!(
resp,
Error::GetIdError {
source: crate::init::Error::IdNotSet
}
));
let resp = server.config().unwrap_err();
assert!(matches!(resp, Error::IdNotSet));
let lines = parsed_lines("cpu foo=1 10");
let resp = server
.write_lines("foo", &lines, ARBITRARY_DEFAULT_TIME)
.await
.unwrap_err();
assert!(matches!(
resp,
Error::GetIdError {
source: crate::init::Error::IdNotSet
}
));
assert!(matches!(resp, Error::IdNotSet));
}
#[tokio::test]
@ -1589,7 +1763,9 @@ mod tests {
);
// one remote is configured but it's down and we'll get connection error
server.update_remote(bad_remote_id, BAD_REMOTE_ADDR.into());
server
.update_remote(bad_remote_id, BAD_REMOTE_ADDR.into())
.unwrap();
let err = server
.write_lines(&db_name, &lines, ARBITRARY_DEFAULT_TIME)
.await
@ -1606,8 +1782,12 @@ mod tests {
// We configure the address for the other remote, this time connection will succeed
// despite the bad remote failing to connect.
server.update_remote(good_remote_id_1, GOOD_REMOTE_ADDR_1.into());
server.update_remote(good_remote_id_2, GOOD_REMOTE_ADDR_2.into());
server
.update_remote(good_remote_id_1, GOOD_REMOTE_ADDR_1.into())
.unwrap();
server
.update_remote(good_remote_id_2, GOOD_REMOTE_ADDR_2.into())
.unwrap();
// Remotes are tried in random order, so we need to repeat the test a few times to have a reasonable
// probability that both remotes will get hit.
@ -1844,12 +2024,7 @@ mod tests {
let err = create_simple_database(&server, "bananas")
.await
.unwrap_err();
assert!(matches!(
err,
Error::GetIdError {
source: crate::init::Error::IdNotSet
}
));
assert!(matches!(err, Error::IdNotSet));
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
// do NOT call `server.maybe_load_database_configs` so DBs are not loaded and server is not ready
@ -1873,7 +2048,7 @@ mod tests {
let t_0 = Instant::now();
loop {
if server.require_initialized().is_ok() {
if server.config().is_ok() {
break;
}
assert!(t_0.elapsed() < Duration::from_secs(10));
@ -1916,9 +2091,12 @@ mod tests {
create_simple_database(&server, "foo")
.await
.expect("failed to create database");
let root = server.init_status.root_path(&store).unwrap();
server.config.drain().await;
let config = server.require_initialized().unwrap();
let root = config.root_path();
config.drain().await;
drop(server);
drop(config);
// tamper store
let path = object_store_path_for_database_config(&root, &DatabaseName::new("bar").unwrap());
@ -2003,18 +2181,24 @@ mod tests {
let server = Server::new(manager, config);
server.set_id(server_id).unwrap();
server.maybe_initialize_server().await;
create_simple_database(&server, db_name_existing.clone())
.await
.expect("failed to create database");
create_simple_database(&server, db_name_rules_broken.clone())
.await
.expect("failed to create database");
create_simple_database(&server, db_name_catalog_broken.clone())
.await
.expect("failed to create database");
let root = server.init_status.root_path(&store).unwrap();
server.config.drain().await;
let config = server.require_initialized().unwrap();
let root = config.root_path();
config.drain().await;
drop(server);
drop(config);
// tamper store to break one database
let path = object_store_path_for_database_config(&root, &db_name_rules_broken);
@ -2045,22 +2229,18 @@ mod tests {
let store = Arc::try_unwrap(store).unwrap();
store.get(&path).await.unwrap();
let manager = TestConnectionManager::new();
let config = config_with_store(store);
let server = Server::new(manager, config);
// need to disable auto-wipe for this test
server
.init_status
.wipe_on_error
.store(false, std::sync::atomic::Ordering::Relaxed);
let mut config = config_with_store(store);
config.wipe_catalog_on_error = false;
let server = Server::new(manager, config);
// cannot wipe if server ID is not set
assert_eq!(
server
.wipe_preserved_catalog(db_name_non_existing.clone())
.wipe_preserved_catalog(&db_name_non_existing)
.unwrap_err()
.to_string(),
"cannot get id: unable to use server until id is set"
"id not set"
);
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
@ -2069,31 +2249,29 @@ mod tests {
// 1. cannot wipe if DB exists
assert_eq!(
server
.wipe_preserved_catalog(db_name_existing.clone())
.wipe_preserved_catalog(&db_name_existing)
.unwrap_err()
.to_string(),
"database already exists: db_existing"
);
assert!(PreservedCatalog::exists(
&server.store,
server.require_id().unwrap(),
&db_name_existing.to_string()
)
.await
.unwrap());
assert!(
PreservedCatalog::exists(&server.store, server_id, db_name_existing.as_str())
.await
.unwrap()
);
// 2. wiping a non-existing DB just works, but won't bring DB into existence
assert!(server.error_database(&db_name_non_existing).is_none());
PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&server.store),
server.require_id().unwrap(),
server_id,
db_name_non_existing.to_string(),
(),
)
.await
.unwrap();
let tracker = server
.wipe_preserved_catalog(db_name_non_existing.clone())
.wipe_preserved_catalog(&db_name_non_existing)
.unwrap();
let metadata = tracker.metadata();
let expected_metadata = Job::WipePreservedCatalog {
@ -2103,7 +2281,7 @@ mod tests {
tracker.join().await;
assert!(!PreservedCatalog::exists(
&server.store,
server.require_id().unwrap(),
server_id,
&db_name_non_existing.to_string()
)
.await
@ -2114,7 +2292,7 @@ mod tests {
// 3. wipe DB with broken rules file, this won't bring DB back to life
assert!(server.error_database(&db_name_rules_broken).is_some());
let tracker = server
.wipe_preserved_catalog(db_name_rules_broken.clone())
.wipe_preserved_catalog(&db_name_rules_broken)
.unwrap();
let metadata = tracker.metadata();
let expected_metadata = Job::WipePreservedCatalog {
@ -2124,7 +2302,7 @@ mod tests {
tracker.join().await;
assert!(!PreservedCatalog::exists(
&server.store,
server.require_id().unwrap(),
server_id,
&db_name_rules_broken.to_string()
)
.await
@ -2135,7 +2313,7 @@ mod tests {
// 4. wipe DB with broken catalog, this will bring the DB back to life
assert!(server.error_database(&db_name_catalog_broken).is_some());
let tracker = server
.wipe_preserved_catalog(db_name_catalog_broken.clone())
.wipe_preserved_catalog(&db_name_catalog_broken)
.unwrap();
let metadata = tracker.metadata();
let expected_metadata = Job::WipePreservedCatalog {
@ -2145,7 +2323,7 @@ mod tests {
tracker.join().await;
assert!(PreservedCatalog::exists(
&server.store,
server.require_id().unwrap(),
server_id,
&db_name_catalog_broken.to_string()
)
.await
@ -2166,18 +2344,16 @@ mod tests {
.unwrap();
assert_eq!(
server
.wipe_preserved_catalog(db_name_created.clone())
.wipe_preserved_catalog(&db_name_created)
.unwrap_err()
.to_string(),
"database already exists: db_created"
);
assert!(PreservedCatalog::exists(
&server.store,
server.require_id().unwrap(),
&db_name_created.to_string()
)
.await
.unwrap());
assert!(
PreservedCatalog::exists(&server.store, server_id, &db_name_created.to_string())
.await
.unwrap()
);
}
#[tokio::test]

View File

@ -8,7 +8,7 @@ pub fn default_server_error_handler(error: server::Error) -> tonic::Status {
use server::Error;
match error {
Error::GetIdError { .. } => PreconditionViolation {
Error::IdNotSet => PreconditionViolation {
category: "Writer ID".to_string(),
subject: "influxdata.com/iox".to_string(),
description: "Writer ID must be set".to_string(),

View File

@ -56,7 +56,7 @@ where
&self,
_: Request<GetServerIdRequest>,
) -> Result<Response<GetServerIdResponse>, Status> {
match self.server.require_id().ok() {
match self.server.server_id() {
Some(id) => Ok(Response::new(GetServerIdResponse { id: id.get_u32() })),
None => return Err(NotFound::default().into()),
}
@ -71,7 +71,7 @@ where
match self.server.set_id(id) {
Ok(_) => Ok(Response::new(UpdateServerIdResponse {})),
Err(e @ Error::SetIdError { .. }) => {
Err(e @ Error::IdAlreadySet) => {
return Err(FieldViolation {
field: "id".to_string(),
description: e.to_string(),
@ -199,15 +199,18 @@ where
&self,
_: Request<ListRemotesRequest>,
) -> Result<Response<ListRemotesResponse>, Status> {
let remotes = self
.server
.remotes_sorted()
.into_iter()
.map(|(id, connection_string)| Remote {
id: id.get_u32(),
connection_string,
})
.collect();
let result = self.server.remotes_sorted();
let remotes = match result {
Ok(remotes) => remotes
.into_iter()
.map(|(id, connection_string)| Remote {
id: id.get_u32(),
connection_string,
})
.collect(),
Err(e) => return Err(default_server_error_handler(e)),
};
Ok(Response::new(ListRemotesResponse { remotes }))
}
@ -221,8 +224,16 @@ where
.ok_or_else(|| FieldViolation::required("remote"))?;
let remote_id = ServerId::try_from(remote.id)
.map_err(|_| FieldViolation::required("id").scope("remote"))?;
self.server
let result = self
.server
.update_remote(remote_id, remote.connection_string);
match result {
Ok(_) => {}
Err(e) => return Err(default_server_error_handler(e)),
}
Ok(Response::new(UpdateRemoteResponse {}))
}
@ -233,9 +244,12 @@ where
let request = request.into_inner();
let remote_id =
ServerId::try_from(request.id).map_err(|_| FieldViolation::required("id"))?;
self.server
.delete_remote(remote_id)
.ok_or_else(NotFound::default)?;
match self.server.delete_remote(remote_id) {
Ok(Some(_)) => {}
Ok(None) => return Err(NotFound::default().into()),
Err(e) => return Err(default_server_error_handler(e)),
}
Ok(Response::new(DeleteRemoteResponse {}))
}
@ -455,7 +469,7 @@ where
let tracker = self
.server
.wipe_preserved_catalog(db_name)
.wipe_preserved_catalog(&db_name)
.map_err(|e| match e {
Error::DatabaseAlreadyExists { db_name } => AlreadyExists {
resource_type: "database".to_string(),

View File

@ -65,6 +65,8 @@ async fn test_list_update_remotes() {
const TEST_REMOTE_ADDR_2: &str = "4.3.2.1:4321";
const TEST_REMOTE_ADDR_2_UPDATED: &str = "40.30.20.10:4321";
client.update_server_id(123).await.unwrap();
let res = client.list_remotes().await.expect("list remotes failed");
assert_eq!(res.len(), 0);

View File

@ -244,6 +244,18 @@ async fn test_list_chunks_error() {
async fn test_remotes() {
let server_fixture = ServerFixture::create_single_use().await;
let addr = server_fixture.grpc_base();
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("server")
.arg("set")
.arg("32")
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(predicate::str::contains("Ok"));
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("server")

View File

@ -79,8 +79,8 @@ impl KafkaBufferProducer {
let mut cfg = ClientConfig::new();
cfg.set("bootstrap.servers", &conn);
cfg.set("message.timeout.ms", "5000");
cfg.set("message.max.bytes", "10000000");
cfg.set("queue.buffering.max.kbytes", "10485760");
cfg.set("message.max.bytes", "31457280");
cfg.set("queue.buffering.max.kbytes", "31457280");
cfg.set("request.required.acks", "all"); // equivalent to acks=-1
let producer: FutureProducer = cfg.create()?;