Merge branch 'main' into er/feat/read_buffer/float_int

pull/24376/head
Edd Robinson 2021-06-03 14:48:36 +01:00 committed by GitHub
commit e583e1fbda
14 changed files with 157 additions and 50 deletions

Cargo.lock

@ -3722,6 +3722,7 @@ dependencies = [
"parquet_file",
"query",
"rand 0.8.3",
"rand_distr",
"read_buffer",
"serde",
"serde_json",


@ -4,6 +4,7 @@ use influxdb_line_protocol::ParsedLine;
use regex::Regex;
use snafu::{OptionExt, Snafu};
use std::num::NonZeroU64;
use std::time::Duration;
use std::{
collections::HashMap,
hash::{Hash, Hasher},
@ -49,6 +50,10 @@ pub struct DatabaseRules {
/// An optional config to delegate data plane operations to one or more
/// remote servers.
pub routing_rules: Option<RoutingRules>,
/// Duration for which the cleanup loop should sleep on average.
/// Defaults to 500 seconds.
pub worker_cleanup_avg_sleep: Duration,
}
#[derive(Debug, Eq, PartialEq, Clone)]
@ -79,6 +84,7 @@ impl DatabaseRules {
write_buffer_config: None,
lifecycle_rules: Default::default(),
routing_rules: None,
worker_cleanup_avg_sleep: Duration::from_secs(500),
}
}
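
A small hedged sketch (not part of this commit) of how the new field is meant to be used: rules created through DatabaseRules::new carry the 500 second default, and callers can simply overwrite the field to change the cadence.

use std::time::Duration;

// Assumes `rules` was freshly produced by `DatabaseRules::new`, so the field
// still holds its 500 second default before we shorten it.
fn shorten_cleanup_cadence(rules: &mut DatabaseRules) {
    assert_eq!(rules.worker_cleanup_avg_sleep, Duration::from_secs(500));
    rules.worker_cleanup_avg_sleep = Duration::from_secs(60);
}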


@ -184,6 +184,10 @@ message DatabaseRules {
// Routing config
RoutingConfig routing_config = 9;
}
// Duration for which the cleanup loop should sleep on average.
// Defaults to 500 seconds.
google.protobuf.Duration worker_cleanup_avg_sleep = 10;
}
message RoutingConfig {


@ -1,4 +1,5 @@
use std::convert::{TryFrom, TryInto};
use std::time::Duration;
use thiserror::Error;
@ -23,6 +24,7 @@ impl From<DatabaseRules> for management::DatabaseRules {
write_buffer_config: rules.write_buffer_config.map(Into::into),
lifecycle_rules: Some(rules.lifecycle_rules.into()),
routing_rules: rules.routing_rules.map(Into::into),
worker_cleanup_avg_sleep: Some(rules.worker_cleanup_avg_sleep.into()),
}
}
}
@ -50,12 +52,18 @@ impl TryFrom<management::DatabaseRules> for DatabaseRules {
.optional("routing_rules")
.unwrap_or_default();
let worker_cleanup_avg_sleep = match proto.worker_cleanup_avg_sleep {
Some(d) => d.try_into().field("worker_cleanup_avg_sleep")?,
None => Duration::from_secs(500),
};
Ok(Self {
name,
partition_template,
write_buffer_config,
lifecycle_rules,
routing_rules,
worker_cleanup_avg_sleep,
})
}
}
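
As a dependency-free illustration of the fallback above (this stand-in skips the protobuf decoding and field-violation handling): an absent duration resolves to the 500 second default, an explicit one is used verbatim.

use std::time::Duration;

fn resolve_cleanup_sleep(decoded: Option<Duration>) -> Duration {
    // Mirrors the match above: missing field -> 500 second default.
    decoded.unwrap_or_else(|| Duration::from_secs(500))
}

fn main() {
    assert_eq!(resolve_cleanup_sleep(None), Duration::from_secs(500));
    assert_eq!(resolve_cleanup_sleep(Some(Duration::from_secs(2))), Duration::from_secs(2));
}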


@ -50,9 +50,6 @@ pub enum Error {
#[snafu(display("Unable to open file {}: {}", path.display(), source))]
UnableToOpenFile { source: io::Error, path: PathBuf },
#[snafu(display("Unable to process directory entry: {}", source))]
UnableToProcessEntry { source: walkdir::Error },
#[snafu(display("Unable to read data from file {}: {}", path.display(), source))]
UnableToReadBytes { source: io::Error, path: PathBuf },
@ -196,8 +193,7 @@ impl ObjectStoreApi for File {
let mut objects = Vec::new();
let root_path = self.root.to_raw();
for entry in walkdir {
let entry = entry.context(UnableToProcessEntry)?;
for entry in walkdir.into_iter().filter_map(Result::ok) {
let entry_location = FilePath::raw(entry.path(), false);
if entry_location.prefix_matches(&resolved_prefix) {
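
A standalone sketch of the walkdir idiom adopted here, separate from the object store code: entries that fail to resolve (for example because of permission errors) are skipped instead of aborting the whole listing.

use walkdir::WalkDir;

fn main() {
    // `filter_map(Result::ok)` drops unreadable entries rather than returning
    // an error for the entire walk.
    for entry in WalkDir::new(".").into_iter().filter_map(Result::ok) {
        println!("{}", entry.path().display());
    }
}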


@ -584,9 +584,11 @@ mod tests {
pub(crate) async fn list_with_delimiter(storage: &ObjectStore) -> Result<()> {
delete_fixtures(storage).await;
// ==================== check: store is empty ====================
let content_list = flatten_list_stream(storage, None).await?;
assert!(content_list.is_empty());
// ==================== do: create files ====================
let data = Bytes::from("arbitrary data");
let files: Vec<_> = [
@ -614,6 +616,7 @@ mod tests {
.unwrap();
}
// ==================== check: prefix-list `mydb/wb` (directory) ====================
let mut prefix = storage.new_path();
prefix.push_all_dirs(&["mydb", "wb"]);
@ -634,7 +637,7 @@ mod tests {
assert_eq!(object.location, expected_location);
assert_eq!(object.size, data.len());
// List with a prefix containing a partial "file name"
// ==================== check: prefix-list `mydb/wb/000/000/001` (partial filename) ====================
let mut prefix = storage.new_path();
prefix.push_all_dirs(&["mydb", "wb", "000", "000"]);
prefix.set_file_name("001");
@ -651,10 +654,20 @@ mod tests {
assert_eq!(object.location, expected_location);
// ==================== check: prefix-list `not_there` (non-existing prefix) ====================
let mut prefix = storage.new_path();
prefix.push_all_dirs(&["not_there"]);
let result = storage.list_with_delimiter(&prefix).await.unwrap();
assert!(result.common_prefixes.is_empty());
assert!(result.objects.is_empty());
// ==================== do: remove all files ====================
for f in &files {
storage.delete(f).await.unwrap();
}
// ==================== check: store is empty ====================
let content_list = flatten_list_stream(storage, None).await?;
assert!(content_list.is_empty());


@ -103,14 +103,14 @@ where
// now that the transaction lock is dropped, perform the actual (and potentially slow) delete operation
let n_files = to_remove.len();
info!("Found {} files to delete, start deletion.", n_files);
info!(%n_files, "Found files to delete, start deletion.");
for path in to_remove {
info!("Delete file: {}", path.display());
info!(path = %path.display(), "Delete file");
store.delete(&path).await.context(WriteError)?;
}
info!("Finished deletion, removed {} files.", n_files);
info!(%n_files, "Finished deletion, removed files.");
Ok(())
}
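
For readers unfamiliar with the syntax, a minimal sketch of the structured-logging style these lines switch to, written against the tracing crate directly (IOx pulls it in via observability_deps): the %field shorthand records a value's Display output as a named field instead of interpolating it into the message string.

use tracing::info;

fn log_deletion_progress(n_files: usize) {
    // `n_files` becomes a structured field on the event; the message text
    // stays constant, which makes the events easier to index and query.
    info!(%n_files, "Found files to delete, start deletion.");
}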


@ -35,6 +35,7 @@ parking_lot = "0.11.1"
parquet_file = { path = "../parquet_file" }
query = { path = "../query" }
rand = "0.8.3"
rand_distr = "0.4.0"
read_buffer = { path = "../read_buffer" }
serde = "1.0"
serde_json = "1.0"


@ -1,6 +1,8 @@
//! This module contains the main IOx Database object which has the
//! instances of the mutable buffer, read buffer, and object store
use self::catalog::TableNameFilter;
use super::{
buffer::{self, Buffer},
JobRegistry,
@ -39,8 +41,8 @@ use parquet_file::{
},
storage::Storage,
};
use query::predicate::{Predicate, PredicateBuilder};
use query::{exec::Executor, Database, DEFAULT_SCHEMA};
use query::{exec::Executor, predicate::Predicate, Database, DEFAULT_SCHEMA};
use rand_distr::{Distribution, Poisson};
use read_buffer::{Chunk as ReadBufferChunk, ChunkMetrics as ReadBufferChunkMetrics};
use snafu::{ResultExt, Snafu};
use std::{
@ -879,10 +881,11 @@ impl Db {
/// Return chunk summary information for all chunks in the specified
/// partition across all storage systems
pub fn partition_chunk_summaries(&self, partition_key: &str) -> Vec<ChunkSummary> {
self.catalog.state().filtered_chunks(
&PredicateBuilder::new().partition_key(partition_key).build(),
CatalogChunk::summary,
)
let partition_key = Some(partition_key);
let table_names = TableNameFilter::AllTables;
self.catalog
.state()
.filtered_chunks(partition_key, table_names, CatalogChunk::summary)
}
/// Return Summary information for all columns in all chunks in the
@ -953,10 +956,18 @@ impl Db {
.fetch_add(1, Ordering::Relaxed);
tokio::select! {
_ = async {
// Sleep for a duration drawn from a Poisson distribution to de-correlate workers.
// Perform this sleep BEFORE the actual clean-up so that we don't immediately run a clean-up
// on startup.
let avg_sleep_secs = self.rules.read().worker_cleanup_avg_sleep.as_secs_f32().max(1.0);
let dist = Poisson::new(avg_sleep_secs).expect("parameter should be positive and finite");
let duration = Duration::from_secs_f32(dist.sample(&mut rand::thread_rng()));
debug!(?duration, "cleanup worker sleeps");
tokio::time::sleep(duration).await;
if let Err(e) = cleanup_unreferenced_parquet_files(&self.catalog).await {
error!("error in background cleanup task: {:?}", e);
error!(%e, "error in background cleanup task");
}
tokio::time::sleep(Duration::from_secs(500)).await;
} => {},
_ = shutdown.cancelled() => break,
}
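
A self-contained sketch of the sleep sampling introduced above, using the same rand_distr Poisson API; avg_sleep_secs stands in for worker_cleanup_avg_sleep, and the clamp mirrors the .max(1.0) guard because Poisson::new rejects non-positive means.

use rand_distr::{Distribution, Poisson};
use std::time::Duration;

fn next_cleanup_sleep(avg_sleep_secs: f32) -> Duration {
    // Poisson::new requires a positive, finite mean, hence the clamp.
    let dist = Poisson::new(avg_sleep_secs.max(1.0))
        .expect("parameter should be positive and finite");
    Duration::from_secs_f32(dist.sample(&mut rand::thread_rng()))
}

fn main() {
    // Successive draws scatter around the configured average, de-correlating
    // the clean-up runs of independent workers.
    for _ in 0..3 {
        println!("next sleep: {:?}", next_cleanup_sleep(500.0));
    }
}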
@ -1107,9 +1118,11 @@ impl Database for Db {
/// Note there could/should be an error here (if the partition
/// doesn't exist... but the trait doesn't have an error)
fn chunks(&self, predicate: &Predicate) -> Vec<Arc<Self::Chunk>> {
let partition_key = predicate.partition_key.as_deref();
let table_names: TableNameFilter<'_> = predicate.table_names.as_ref().into();
self.catalog
.state()
.filtered_chunks(predicate, DbChunk::snapshot)
.filtered_chunks(partition_key, table_names, DbChunk::snapshot)
}
fn partition_keys(&self) -> Result<Vec<String>, Self::Error> {
@ -2668,6 +2681,8 @@ mod tests {
.server_id(server_id)
.object_store(Arc::clone(&object_store))
.db_name(db_name)
// "dispable" clean-up by setting it to a very long time to avoid interference with this test
.worker_cleanup_avg_sleep(Duration::from_secs(1_000))
.build()
.await;
@ -2750,7 +2765,7 @@ mod tests {
let _ = chunk_a.read();
});
// Hold lock for 100 seconds blocking background task
// Hold lock for 100 milliseconds blocking background task
std::thread::sleep(std::time::Duration::from_millis(100));
std::mem::drop(chunk_b);


@ -1,5 +1,5 @@
//! This module contains the implementation of the InfluxDB IOx Metadata catalog
use std::any::Any;
use std::{any::Any, collections::BTreeSet};
use std::{
collections::{btree_map::Entry, BTreeMap},
sync::Arc,
@ -21,7 +21,6 @@ use internal_types::selection::Selection;
use partition::Partition;
use query::{
exec::stringset::StringSet,
predicate::Predicate,
provider::{self, ProviderBuilder},
PartitionChunk,
};
@ -123,6 +122,34 @@ pub enum Error {
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Specify which tables are to be matched when filtering
/// catalog chunks
#[derive(Debug, Clone, Copy)]
pub enum TableNameFilter<'a> {
/// Include all tables
AllTables,
/// Only include tables that appear in the named set
NamedTables(&'a BTreeSet<String>),
}
impl<'a> From<Option<&'a BTreeSet<String>>> for TableNameFilter<'a> {
/// Creates a [`TableNameFilter`] from an [`Option`].
///
/// If the Option is `None`, all table names will be included in
/// the results.
///
/// If the Option is `Some(set)`, only table names which appear in
/// `set` will be included in the results.
///
/// Note `Some(empty set)` will not match anything.
fn from(v: Option<&'a BTreeSet<String>>) -> Self {
match v {
Some(names) => Self::NamedTables(names),
None => Self::AllTables,
}
}
}
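
A small usage sketch of the conversion above (the table names are placeholders): None keeps every table, while Some(set) restricts matching to the named set.

use std::collections::BTreeSet;

fn demo_table_name_filter() {
    let none: Option<&BTreeSet<String>> = None;
    let filter: TableNameFilter<'_> = none.into();
    // `None` selects all tables.
    assert!(matches!(filter, TableNameFilter::AllTables));

    let names: BTreeSet<String> = std::iter::once("cpu".to_string()).collect();
    let filter: TableNameFilter<'_> = Some(&names).into();
    // `Some(set)` only matches tables named in the set.
    assert!(matches!(filter, TableNameFilter::NamedTables(_)));
}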
/// InfluxDB IOx Metadata Catalog
///
/// The Catalog stores information such as which chunks exist, what
@ -233,11 +260,15 @@ impl Catalog {
}
pub fn chunk_summaries(&self) -> Vec<ChunkSummary> {
self.filtered_chunks(&Predicate::default(), Chunk::summary)
let partition_key = None;
let table_names = TableNameFilter::AllTables;
self.filtered_chunks(partition_key, table_names, Chunk::summary)
}
pub fn detailed_chunk_summaries(&self) -> Vec<DetailedChunkSummary> {
self.filtered_chunks(&Predicate::default(), Chunk::detailed_summary)
let partition_key = None;
let table_names = TableNameFilter::AllTables;
self.filtered_chunks(partition_key, table_names, Chunk::detailed_summary)
}
/// Returns all chunks within the catalog in an arbitrary order
@ -272,16 +303,25 @@ impl Catalog {
chunks
}
/// Calls `map` with every chunk matching `predicate` and returns a
/// collection of the results
pub fn filtered_chunks<F, C>(&self, predicate: &Predicate, map: F) -> Vec<C>
/// Calls `map` with every chunk and returns a collection of the results
///
/// If `partition_key` is Some(partition_key), only returns chunks
/// from the specified partition.
///
/// `table_names` specifies which tables to include
pub fn filtered_chunks<F, C>(
&self,
partition_key: Option<&str>,
table_names: TableNameFilter<'_>,
map: F,
) -> Vec<C>
where
F: Fn(&Chunk) -> C + Copy,
{
let mut chunks = Vec::new();
let partitions = self.partitions.read();
let partitions = match &predicate.partition_key {
let partitions = match partition_key {
None => itertools::Either::Left(partitions.values()),
Some(partition_key) => {
itertools::Either::Right(partitions.get(partition_key).into_iter())
@ -290,9 +330,8 @@ impl Catalog {
for partition in partitions {
let partition = partition.read();
chunks.extend(partition.filtered_chunks(predicate).map(|chunk| {
chunks.extend(partition.filtered_chunks(table_names).map(|chunk| {
let chunk = chunk.read();
// TODO: Filter chunks
map(&chunk)
}))
}
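
For context, a standalone sketch of the itertools::Either trick retained above: the two match arms produce different iterator types, and Either lets them share one variable that still implements Iterator. The map and key below are placeholders.

use std::collections::BTreeMap;

fn main() {
    let partitions: BTreeMap<&str, u32> = [("p1", 1), ("p2", 2)].into_iter().collect();
    let partition_key: Option<&str> = Some("p2");

    // Left arm: visit every partition; Right arm: visit zero or one partition.
    let selected = match partition_key {
        None => itertools::Either::Left(partitions.values()),
        Some(key) => itertools::Either::Right(partitions.get(key).into_iter()),
    };

    assert_eq!(selected.copied().sum::<u32>(), 2);
}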
@ -360,7 +399,6 @@ mod tests {
use super::*;
use data_types::server_id::ServerId;
use entry::{test_helpers::lp_to_entry, ClockValue};
use query::predicate::PredicateBuilder;
use std::convert::TryFrom;
fn create_open_chunk(partition: &Arc<RwLock<Partition>>, table: &str) {
@ -602,6 +640,7 @@ mod tests {
#[test]
fn filtered_chunks() {
use TableNameFilter::*;
let catalog = Catalog::test();
let p1 = catalog.get_or_create_partition("p1");
@ -610,23 +649,21 @@ mod tests {
create_open_chunk(&p1, "table2");
create_open_chunk(&p2, "table2");
let a = catalog.filtered_chunks(&Predicate::default(), |_| ());
let a = catalog.filtered_chunks(None, AllTables, |_| ());
let b = catalog.filtered_chunks(&PredicateBuilder::new().table("table1").build(), |_| ());
let b = catalog.filtered_chunks(None, NamedTables(&make_set("table1")), |_| ());
let c = catalog.filtered_chunks(&PredicateBuilder::new().table("table2").build(), |_| ());
let c = catalog.filtered_chunks(None, NamedTables(&make_set("table2")), |_| ());
let d = catalog.filtered_chunks(
&PredicateBuilder::new()
.table("table2")
.partition_key("p2")
.build(),
|_| (),
);
let d = catalog.filtered_chunks(Some("p2"), NamedTables(&make_set("table2")), |_| ());
assert_eq!(a.len(), 3);
assert_eq!(b.len(), 1);
assert_eq!(c.len(), 2);
assert_eq!(d.len(), 1);
}
fn make_set(s: impl Into<String>) -> BTreeSet<String> {
std::iter::once(s.into()).collect()
}
}


@ -9,14 +9,13 @@ use data_types::chunk_metadata::ChunkSummary;
use data_types::partition_metadata::{
PartitionSummary, UnaggregatedPartitionSummary, UnaggregatedTableSummary,
};
use query::predicate::Predicate;
use tracker::RwLock;
use crate::db::catalog::metrics::PartitionMetrics;
use super::{
chunk::{Chunk, ChunkStage},
Error, Result, UnknownChunk, UnknownTable,
Error, Result, TableNameFilter, UnknownChunk, UnknownTable,
};
/// IOx Catalog Partition
@ -231,16 +230,24 @@ impl Partition {
self.tables.values().flat_map(|table| table.chunks.values())
}
/// Return an iterator over chunks in this partition that
/// may pass the provided predicate
/// Return an iterator over chunks in this partition
///
/// `table_names` specifies which tables to include
pub fn filtered_chunks<'a>(
&'a self,
predicate: &'a Predicate,
table_names: TableNameFilter<'a>,
) -> impl Iterator<Item = &Arc<RwLock<Chunk>>> + 'a {
self.tables
.iter()
.filter(move |(table_name, _)| predicate.should_include_table(table_name))
.flat_map(|(_, table)| table.chunks.values())
.filter_map(
move |(partition_table_name, partition_table)| match table_names {
TableNameFilter::AllTables => Some(partition_table.chunks.values()),
TableNameFilter::NamedTables(table_names) => table_names
.contains(partition_table_name)
.then(|| partition_table.chunks.values()),
},
)
.flatten()
}
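
A standalone sketch of the bool::then idiom used in the filter_map above: the membership test yields an Option, so non-matching tables produce None and drop out. The set contents and chunk values are placeholders.

use std::collections::BTreeSet;

fn main() {
    let wanted: BTreeSet<&str> = ["table1", "table2"].into_iter().collect();

    // `contains(..).then(|| ..)` returns Some(chunks) only for wanted tables.
    let chunks_for = |name: &str| wanted.contains(name).then(|| vec![1, 2, 3]);

    assert_eq!(chunks_for("table1"), Some(vec![1, 2, 3]));
    assert_eq!(chunks_for("other"), None);
}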
/// Return the unaggregated chunk summary information for tables


@ -1112,7 +1112,7 @@ async fn get_database_config_bytes(
#[cfg(test)]
mod tests {
use std::{collections::BTreeMap, convert::TryFrom};
use std::{collections::BTreeMap, convert::TryFrom, time::Duration};
use async_trait::async_trait;
use futures::TryStreamExt;
@ -1206,6 +1206,7 @@ mod tests {
write_buffer_config: None,
lifecycle_rules: Default::default(),
routing_rules: None,
worker_cleanup_avg_sleep: Duration::from_secs(2),
};
// Create a database
@ -1302,6 +1303,7 @@ mod tests {
write_buffer_config: None,
lifecycle_rules: Default::default(),
routing_rules: None,
worker_cleanup_avg_sleep: Duration::from_secs(2),
};
// Create a database


@ -12,7 +12,7 @@ use crate::{
db::{load_or_create_preserved_catalog, Db},
JobRegistry,
};
use std::{borrow::Cow, convert::TryFrom, sync::Arc};
use std::{borrow::Cow, convert::TryFrom, sync::Arc, time::Duration};
// A wrapper around a Db and a metrics registry allowing for isolated testing
// of a Db and its metrics.
@ -34,6 +34,7 @@ pub struct TestDbBuilder {
object_store: Option<Arc<ObjectStore>>,
db_name: Option<DatabaseName<'static>>,
write_buffer: bool,
worker_cleanup_avg_sleep: Option<Duration>,
}
impl TestDbBuilder {
@ -77,10 +78,17 @@ impl TestDbBuilder {
.await
.unwrap();
let mut rules = DatabaseRules::new(db_name);
// make background loop spin a bit faster for tests
rules.worker_cleanup_avg_sleep = self
.worker_cleanup_avg_sleep
.unwrap_or_else(|| Duration::from_secs(1));
TestDb {
metric_registry: metrics::TestMetricRegistry::new(metrics_registry),
db: Db::new(
DatabaseRules::new(db_name),
rules,
server_id,
object_store,
exec,
@ -110,6 +118,11 @@ impl TestDbBuilder {
self.write_buffer = enabled;
self
}
pub fn worker_cleanup_avg_sleep(mut self, d: Duration) -> Self {
self.worker_cleanup_avg_sleep = Some(d);
self
}
}
/// Used for testing: create a Database with a local store


@ -223,6 +223,10 @@ async fn test_create_get_update_database() {
..Default::default()
}),
routing_rules: None,
worker_cleanup_avg_sleep: Some(Duration {
seconds: 2,
nanos: 0,
}),
};
client