Merge branch 'main' into crepererum/issue1382-c

pull/24376/head
kodiakhq[bot] 2021-05-20 15:51:47 +00:00 committed by GitHub
commit f028a356f4
18 changed files with 1011 additions and 541 deletions

Cargo.lock generated
View File

@ -3164,6 +3164,7 @@ name = "read_buffer"
version = "0.1.0"
dependencies = [
"arrow 0.1.0",
"arrow_util",
"criterion",
"croaring",
"data_types",

View File

@ -21,6 +21,13 @@ impl BitSet {
Self::default()
}
/// Creates a new BitSet with `count` unset bits.
pub fn with_size(count: usize) -> Self {
let mut bitset = Self::default();
bitset.append_unset(count);
bitset
}
/// Appends `count` unset bits
pub fn append_unset(&mut self, count: usize) {
self.len += count;

View File

@ -12,6 +12,7 @@ edition = "2018"
[dependencies] # In alphabetical order
arrow = { path = "../arrow" }
arrow_util = { path = "../arrow_util" }
croaring = "0.4.5"
data_types = { path = "../data_types" }
datafusion = { path = "../datafusion" }
@ -46,5 +47,5 @@ name = "string"
harness = false
[[bench]]
name = "row_group"
name = "read"
harness = false
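With this bench target registered, the combined read benchmarks can be run through Criterion's standard Cargo harness, for example `cargo bench -p read_buffer --bench read` (invocation shown for illustration).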

View File

@ -0,0 +1,9 @@
use criterion::{criterion_group, criterion_main};
mod read_filter;
mod read_group;
use read_filter::read_filter;
use read_group::read_group;
criterion_group!(benches, read_filter, read_group);
criterion_main!(benches);

View File

@ -0,0 +1,314 @@
use criterion::{BenchmarkId, Criterion, Throughput};
use rand::distributions::Alphanumeric;
use rand::prelude::*;
use rand::Rng;
use rand_distr::{Distribution, Normal};
use internal_types::selection::Selection;
use packers::{sorter, Packers};
use read_buffer::{
benchmarks::{Column, ColumnType, RowGroup},
Chunk,
};
use read_buffer::{BinaryExpr, Predicate};
const ONE_MS: i64 = 1_000_000;
pub fn read_filter(c: &mut Criterion) {
let mut rng = rand::thread_rng();
let mut chunk = Chunk::new(read_buffer::ChunkMetrics::new_unregistered());
let row_group = generate_row_group(200_000, &mut rng);
read_buffer::benchmarks::upsert_table_with_row_group(&mut chunk, "table", row_group);
read_filter_no_pred_vary_proj(c, &chunk);
read_filter_with_pred_vary_proj(c, &chunk);
}
// These benchmarks track the performance of read_filter without any predicate,
// varying the cardinality of the single column projected
fn read_filter_no_pred_vary_proj(c: &mut Criterion, chunk: &Chunk) {
let mut group = c.benchmark_group("read_filter/no_pred");
// All these projections involve the same number of rows but with varying
// cardinalities.
let projections = vec![
(Selection::Some(&["user_id"]), 200_000),
(Selection::Some(&["node_id"]), 2_000),
(Selection::Some(&["cluster"]), 200),
(Selection::Some(&["env"]), 2),
];
for (projection, exp_card) in projections {
// benchmark measures the throughput of materialising the projected rows.
group.throughput(Throughput::Elements(200_000));
group.bench_with_input(
BenchmarkId::from_parameter(format!("cardinality_{:?}_rows_{:?}", exp_card, 200_000)),
&exp_card,
|b, _| {
b.iter(|| {
let result = chunk
.read_filter("table", Predicate::default(), projection)
.unwrap();
let rbs = result.collect::<Vec<_>>();
assert_eq!(rbs.len(), 1);
assert_eq!(rbs[0].num_rows(), 200_000);
assert_eq!(rbs[0].num_columns(), 1);
});
},
);
}
group.finish();
}
// These benchmarks track the performance of read_filter with different predicates
fn read_filter_with_pred_vary_proj(c: &mut Criterion, chunk: &Chunk) {
let mut group = c.benchmark_group("read_filter/with_pred");
// these predicates vary the number of rows returned
let predicates = vec![(
Predicate::with_time_range(
&[BinaryExpr::from(("env", "=", "env-1"))],
i64::MIN,
i64::MAX,
),
100_000,
)];
for (predicate, exp_rows) in predicates {
// benchmark measures the throughput of materialising the rows matching the predicate.
group.throughput(Throughput::Elements(exp_rows as u64));
group.bench_with_input(
BenchmarkId::from_parameter(format!("rows_{:?}", exp_rows)),
&exp_rows,
|b, _| {
b.iter(|| {
let result = chunk
.read_filter("table", predicate.clone(), Selection::All)
.unwrap();
let rbs = result.collect::<Vec<_>>();
assert_eq!(rbs.len(), 1);
assert!(rbs[0].num_rows() > 0); // data randomly generated so row numbers not exact
assert_eq!(rbs[0].num_columns(), 11);
});
},
);
}
group.finish();
}
// TODO(edd): figure out how to DRY this into a single place in `benches` "crate".
//
// This generates a `RowGroup` with a known schema, ~known column cardinalities
// and variable number of rows.
//
// The schema and cardinalities are in line with a tracing data use-case.
fn generate_row_group(rows: usize, rng: &mut ThreadRng) -> RowGroup {
let mut timestamp = 1351700038292387000_i64;
let spans_per_trace = 10;
let mut column_packers: Vec<Packers> = vec![
Packers::from(Vec::<Option<String>>::with_capacity(rows)), // env (card 2)
Packers::from(Vec::<Option<String>>::with_capacity(rows)), // data_centre (card 20)
Packers::from(Vec::<Option<String>>::with_capacity(rows)), // cluster (card 200)
Packers::from(Vec::<Option<String>>::with_capacity(rows)), // user_id (card 200,000)
Packers::from(Vec::<Option<String>>::with_capacity(rows)), // request_id (card 2,000,000)
Packers::from(Vec::<Option<String>>::with_capacity(rows)), // node_id (card 2,000)
Packers::from(Vec::<Option<String>>::with_capacity(rows)), // pod_id (card 20,000)
Packers::from(Vec::<Option<String>>::with_capacity(rows)), // trace_id (card "rows / 10")
Packers::from(Vec::<Option<String>>::with_capacity(rows)), // span_id (card "rows")
Packers::from(Vec::<Option<i64>>::with_capacity(rows)), // duration
Packers::from(Vec::<Option<i64>>::with_capacity(rows)), // time
];
let n = rows / spans_per_trace;
for _ in 0..n {
column_packers =
generate_trace_for_row_group(spans_per_trace, timestamp, column_packers, rng);
// next trace is ~10 seconds in the future
timestamp += 10_000 * ONE_MS;
}
// sort the packers from lowest to highest cardinality, excluding
// columns that are likely to be unique.
//
// - env, data_centre, cluster, node_id, pod_id, user_id, request_id, time
sorter::sort(&mut column_packers, &[0, 1, 2, 5, 6, 3, 4, 10]).unwrap();
// create columns
let columns = vec![
(
"env".to_string(),
ColumnType::Tag(Column::from(column_packers[0].str_packer().values())),
),
(
"data_centre".to_string(),
ColumnType::Tag(Column::from(column_packers[1].str_packer().values())),
),
(
"cluster".to_string(),
ColumnType::Tag(Column::from(column_packers[2].str_packer().values())),
),
(
"user_id".to_string(),
ColumnType::Tag(Column::from(column_packers[3].str_packer().values())),
),
(
"request_id".to_string(),
ColumnType::Tag(Column::from(column_packers[4].str_packer().values())),
),
(
"node_id".to_string(),
ColumnType::Tag(Column::from(column_packers[5].str_packer().values())),
),
(
"pod_id".to_string(),
ColumnType::Tag(Column::from(column_packers[6].str_packer().values())),
),
(
"trace_id".to_string(),
ColumnType::Tag(Column::from(column_packers[7].str_packer().values())),
),
(
"span_id".to_string(),
ColumnType::Tag(Column::from(column_packers[8].str_packer().values())),
),
(
"duration".to_string(),
ColumnType::Field(Column::from(
column_packers[9].i64_packer().some_values().as_slice(),
)),
),
(
"time".to_string(),
ColumnType::Time(Column::from(
column_packers[10].i64_packer().some_values().as_slice(),
)),
),
];
RowGroup::new(rows as u32, columns)
}
fn generate_trace_for_row_group(
spans_per_trace: usize,
timestamp: i64,
mut column_packers: Vec<Packers>,
rng: &mut ThreadRng,
) -> Vec<Packers> {
let env_idx = 0;
let data_centre_idx = 1;
let cluster_idx = 2;
let user_id_idx = 3;
let request_id_idx = 4;
let node_id_idx = 5;
let pod_id_idx = 6;
let trace_id_idx = 7;
let span_id_idx = 8;
let duration_idx = 9;
let time_idx = 10;
let env_value = rng.gen_range(0_u8..2);
let env = format!("env-{:?}", env_value); // cardinality of 2.
let data_centre_value = rng.gen_range(0_u8..10);
let data_centre = format!("data_centre-{:?}-{:?}", env_value, data_centre_value); // cardinality of 2 * 10 = 20
let cluster_value = rng.gen_range(0_u8..10);
let cluster = format!(
"cluster-{:?}-{:?}-{:?}",
env_value,
data_centre_value,
cluster_value // cardinality of 2 * 10 * 10 = 200
);
// user id is dependent on the cluster
let user_id_value = rng.gen_range(0_u32..1000);
let user_id = format!(
"uid-{:?}-{:?}-{:?}-{:?}",
env_value,
data_centre_value,
cluster_value,
user_id_value // cardinality of 2 * 10 * 10 * 1000 = 200,000
);
let request_id_value = rng.gen_range(0_u32..10);
let request_id = format!(
"rid-{:?}-{:?}-{:?}-{:?}-{:?}",
env_value,
data_centre_value,
cluster_value,
user_id_value,
request_id_value // cardinality of 2 * 10 * 10 * 1000 * 10 = 2,000,000
);
let trace_id = rng
.sample_iter(&Alphanumeric)
.map(char::from)
.take(8)
.collect::<String>();
// the trace should move across hosts, which in this setup would be nodes
// and pods.
let normal = Normal::new(10.0, 5.0).unwrap();
let node_id_prefix = format!("{}-{}-{}", env_value, data_centre_value, cluster_value,);
for _ in 0..spans_per_trace {
// these values are not the same for each span so they need to be
// generated separately.
let node_id = rng.gen_range(0..10); // cardinality is 2 * 10 * 10 * 10 = 2,000
column_packers[pod_id_idx].str_packer_mut().push(format!(
"pod_id-{}-{}-{}",
node_id_prefix,
node_id,
rng.gen_range(0..10) // cardinality is 2 * 10 * 10 * 10 * 10 = 20,000
));
column_packers[node_id_idx]
.str_packer_mut()
.push(format!("node_id-{}-{}", node_id_prefix, node_id));
// randomly generate a span_id
column_packers[span_id_idx].str_packer_mut().push(
rng.sample_iter(&Alphanumeric)
.map(char::from)
.take(8)
.collect::<String>(),
);
// randomly generate span durations (normally distributed, ~10ms),
// stored in nanoseconds.
column_packers[duration_idx].i64_packer_mut().push(
(normal.sample(rng) * ONE_MS as f64)
.max(ONE_MS as f64) // minimum duration is 1ms
.round() as i64,
);
}
column_packers[env_idx]
.str_packer_mut()
.fill_with(env, spans_per_trace);
column_packers[data_centre_idx]
.str_packer_mut()
.fill_with(data_centre, spans_per_trace);
column_packers[cluster_idx]
.str_packer_mut()
.fill_with(cluster, spans_per_trace);
column_packers[user_id_idx]
.str_packer_mut()
.fill_with(user_id, spans_per_trace);
column_packers[request_id_idx]
.str_packer_mut()
.fill_with(request_id, spans_per_trace);
column_packers[trace_id_idx]
.str_packer_mut()
.fill_with(trace_id, spans_per_trace);
column_packers[time_idx]
.i64_packer_mut()
.fill_with(timestamp, spans_per_trace);
column_packers
}

View File

@ -1,4 +1,4 @@
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use criterion::{BenchmarkId, Criterion, Throughput};
use rand::distributions::Alphanumeric;
use rand::prelude::*;
use rand::Rng;
@ -10,7 +10,7 @@ use read_buffer::{AggregateType, Predicate};
const ONE_MS: i64 = 1_000_000;
fn read_group(c: &mut Criterion) {
pub fn read_group(c: &mut Criterion) {
let mut rng = rand::thread_rng();
let row_group = generate_row_group(500_000, &mut rng);
@ -455,6 +455,3 @@ fn generate_trace_for_row_group(
column_packers
}
criterion_group!(benches, read_group);
criterion_main!(benches);

View File

@ -788,28 +788,35 @@ mod test {
let got_column = rb.column(rb.schema().index_of(col_name).unwrap());
match exp {
Values::String(exp_data) => match got_column.data_type() {
DataType::Utf8 => {
let arr = got_column.as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(&arr.iter().collect::<Vec<_>>(), exp_data);
}
Values::Dictionary(keys, values) => match got_column.data_type() {
DataType::Dictionary(key, value)
if key.as_ref() == &DataType::Int32 && value.as_ref() == &DataType::Utf8 =>
{
// Record batch stores keys as i32
let keys = keys
.iter()
.map(|&x| i32::try_from(x).unwrap())
.collect::<Vec<_>>();
let dictionary = got_column
.as_any()
.downcast_ref::<DictionaryArray<Int32Type>>()
.unwrap();
let values = dictionary.values();
let values = values.as_any().downcast_ref::<StringArray>().unwrap();
let rb_values = dictionary.values();
let rb_values = rb_values.as_any().downcast_ref::<StringArray>().unwrap();
let hydrated: Vec<_> = dictionary
.keys()
.iter()
.map(|key| key.map(|key| values.value(key as _)))
.collect();
// Ensure string values are same
assert!(rb_values.iter().zip(values.iter()).all(|(a, b)| &a == b));
assert_eq!(&hydrated, exp_data)
let rb_keys = dictionary.keys().values();
assert_eq!(rb_keys, keys.as_slice());
}
d => panic!("Unexpected type {:?}", d),
},
Values::String(exp_data) => match got_column.data_type() {
DataType::Utf8 => {
let arr = got_column.as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(&arr.iter().collect::<Vec<_>>(), exp_data);
}
d => panic!("Unexpected type {:?}", d),
},
@ -1278,11 +1285,12 @@ mod test {
.read_filter("Coolverine", predicate, Selection::All)
.unwrap();
let exp_env_values = Values::String(vec![Some("us-west")]);
let exp_region_values = Values::String(vec![Some("west")]);
let exp_env_values = Values::Dictionary(vec![0], vec![Some("us-west")]);
let exp_region_values = Values::Dictionary(vec![0], vec![Some("west")]);
let exp_counter_values = Values::F64(vec![1.2]);
let exp_sketchy_sensor_values = Values::I64N(vec![None]);
let exp_active_values = Values::Bool(vec![Some(true)]);
let exp_msg_values = Values::String(vec![Some("message a")]);
let first_row_group = itr.next().unwrap();
assert_rb_column_equals(&first_row_group, "env", &exp_env_values);
@ -1294,11 +1302,7 @@ mod test {
&exp_sketchy_sensor_values,
);
assert_rb_column_equals(&first_row_group, "active", &exp_active_values);
assert_rb_column_equals(
&first_row_group,
"msg",
&Values::String(vec![Some("message a")]),
);
assert_rb_column_equals(&first_row_group, "msg", &exp_msg_values);
assert_rb_column_equals(&first_row_group, "time", &Values::I64(vec![100])); // first row from first record batch
let second_row_group = itr.next().unwrap();

View File

@ -246,6 +246,26 @@ impl Column {
}
}
/// All values present at the provided logical row ids materialised in a
/// dictionary format.
///
/// # Panics
///
/// Panics if called on a non-string columnar encoding.
pub fn values_as_dictionary(&self, row_ids: &[u32]) -> Values<'_> {
assert!(
row_ids.len() as u32 <= self.num_rows(),
"too many row ids {:?} provided for column with {:?} rows",
row_ids.len(),
self.num_rows()
);
if let Self::String(_, data) = &self {
return data.values_as_dictionary(row_ids);
}
panic!("unsupported encoding type {}", self)
}
/// All logical values in the column.
pub fn all_values(&self) -> Values<'_> {
match &self {
@ -258,6 +278,18 @@ impl Column {
}
}
/// All logical values in the column returned in a dictionary encoded format.
///
/// # Panics
///
/// Panics if called on a non-string columnar encoding.
pub fn all_values_as_dictionary(&self) -> Values<'_> {
if let Self::String(_, data) = &self {
return data.all_values_as_dictionary();
}
panic!("unsupported encoding type {}", self)
}
/// The value present at the provided logical row id.
pub fn decode_id(&self, encoded_id: u32) -> Value<'_> {
match &self {
@ -1636,10 +1668,27 @@ mod test {
let col = Column::from(&[0.0, 1.1, 20.2, 22.3, 100.1324][..]);
assert_eq!(col.values(&[1, 3]), Values::F64(vec![1.1, 22.3]));
let col = Column::from(&[Some("a"), Some("b"), None, Some("c")][..]);
let col = Column::from(&[Some("a"), Some("b"), None, Some("c"), Some("b")][..]);
assert_eq!(
col.values(&[1, 2, 3]),
Values::String(vec![Some("b"), None, Some("c")])
col.values(&[1, 2, 3, 4]),
Values::String(vec![Some("b"), None, Some("c"), Some("b")])
);
}
#[test]
fn values_as_dictionary() {
let col = Column::from(&[Some("a"), Some("b"), None, Some("c"), Some("b")][..]);
//
// Stored in dictionary like:
//
// dict: {NULL: 0, a: 1, b: 2, c: 3}
// values: [1, 2, 0, 3, 2]
assert_eq!(
col.values_as_dictionary(&[1, 2, 3, 4]),
Values::Dictionary(
vec![1, 0, 2, 1], // encoded IDs for [b, NULL, c, b]
vec![None, Some("b"), Some("c")] // dictionary
)
);
}

View File

@ -5,7 +5,7 @@ use either::Either;
use super::cmp;
use super::encoding::string::{dictionary, rle};
use super::encoding::string::{Dictionary, Encoding, RLE};
use super::encoding::string::{Dictionary, Encoding, NULL_ID, RLE};
use crate::column::{RowIDs, Statistics, Value, Values};
// Edd's totally made up magic constant. This determines whether we would use
@ -152,7 +152,7 @@ impl StringEncoding {
}
}
/// All values present at the provided logical row ids.
/// All values present at the provided logical row IDs.
///
/// TODO(edd): perf - pooling of destination vectors.
pub fn values(&self, row_ids: &[u32]) -> Values<'_> {
@ -162,6 +162,75 @@ impl StringEncoding {
}
}
/// Returns all values present at the provided logical row IDs as a
/// dictionary encoded `Values` format.
pub fn values_as_dictionary(&self, row_ids: &[u32]) -> Values<'_> {
//
// Example:
//
// Suppose you have a column encoded like this:
//
// values: NULL, "alpha", "beta", "gamma"
// encoded: 1, 1, 2, 0, 3 (alpha, alpha, beta, NULL, gamma)
//
// And only the rows: {0, 1, 3, 4} are required.
//
// The column encoding will return the following encoded values
//
// encoded: 1, 1, 0, 3 (alpha, alpha, NULL, gamma)
//
// Because the dictionary has likely changed, the encoded values need
// to be transformed into a new domain `[0, dictionary.len())` so that
// they become:
//
// keys: [1, 1, 0, 2]
// values: [None, Some("alpha"), Some("gamma")]
let mut keys = self.encoded_values(row_ids, vec![]);
// build a mapping from encoded value to new ordinal position.
let mut ordinal_mapping = hashbrown::HashMap::new();
for key in &keys {
ordinal_mapping.insert(*key, u32::default()); // don't know final ordinal position yet
}
// create new ordinal offsets - the encoded values need to be shifted
// into a new domain `[0, ordinal_mapping.len())` which is the length
// of the new dictionary.
let mut ordinal_mapping_keys = ordinal_mapping
.keys()
.into_iter()
.cloned()
.collect::<Vec<_>>();
ordinal_mapping_keys.sort_unstable();
for (i, key) in ordinal_mapping_keys.iter().enumerate() {
// now we can record, for each encoded key, its new ordinal position
// in the final values vector.
ordinal_mapping.insert(*key, i as u32);
}
// Rewrite all the encoded values into the new domain.
for key in keys.iter_mut() {
*key = *ordinal_mapping.get(key).unwrap();
}
// now generate the values vector, which will contain the sorted set of
// string values
let mut values = match &self {
Self::RleDictionary(c) => ordinal_mapping_keys
.iter()
.map(|id| c.decode_id(*id))
.collect::<Vec<_>>(),
Self::Dictionary(c) => ordinal_mapping_keys
.iter()
.map(|id| c.decode_id(*id))
.collect::<Vec<_>>(),
};
values.sort_unstable();
Values::Dictionary(keys, values)
}
/// All values in the column.
///
/// TODO(edd): perf - pooling of destination vectors.
@ -172,6 +241,48 @@ impl StringEncoding {
}
}
/// Returns all values as a dictionary encoded `Values` format.
pub fn all_values_as_dictionary(&self) -> Values<'_> {
let mut keys = self.all_encoded_values(vec![]);
let values = if self.contains_null() {
// The column's ordered set of values including None because that is a
// reserved encoded key (`0`).
let mut values = vec![None];
match &self {
Self::RleDictionary(c) => {
values.extend(c.dictionary().into_iter().map(|s| Some(s.as_str())));
}
Self::Dictionary(c) => {
values.extend(c.dictionary().into_iter().map(|s| Some(s.as_str())));
}
};
values
} else {
// since the column doesn't contain NULL we need to shift all the
// encoded values down by one, because the reserved NULL id (`0`) is
// unused.
assert_eq!(NULL_ID, 0);
for key in keys.iter_mut() {
*key -= 1;
}
match &self {
Self::RleDictionary(c) => c
.dictionary()
.into_iter()
.map(|s| Some(s.as_str()))
.collect::<Vec<_>>(),
Self::Dictionary(c) => c
.dictionary()
.into_iter()
.map(|s| Some(s.as_str()))
.collect::<Vec<_>>(),
}
};
Values::Dictionary(keys, values)
}
/// Returns the logical value for the specified encoded representation.
pub fn decode_id(&self, encoded_id: u32) -> Value<'_> {
match &self {
@ -487,3 +598,156 @@ impl From<&[&str]> for StringEncoding {
}
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
// tests both `values_as_dictionary` and `all_values_as_dictionary`
fn values_as_dictionary() {
let set = vec!["apple", "beta", "orange", "pear"];
let data = vec![
Some("apple"),
Some("apple"),
Some("pear"),
None,
None,
Some("orange"),
Some("beta"),
];
let mut rle = RLE::with_dictionary(
set.iter()
.cloned()
.map(String::from)
.collect::<BTreeSet<String>>(),
);
for v in data.iter().map(|x| x.map(String::from)) {
rle.push_additional(v, 1);
}
let mut dict = Dictionary::with_dictionary(
set.into_iter()
.map(String::from)
.collect::<BTreeSet<String>>(),
);
for v in data.iter().map(|x| x.map(String::from)) {
dict.push_additional(v, 1);
}
let encodings = vec![
StringEncoding::RleDictionary(rle),
StringEncoding::Dictionary(dict),
];
for enc in encodings {
_values_as_dictionary(&enc);
_all_values_as_dictionary(&enc);
}
// example without NULL values
let data = vec![
Some("apple"),
Some("apple"),
Some("beta"),
Some("orange"),
Some("pear"),
];
let encodings = vec![
StringEncoding::RleDictionary(RLE::from(data.clone())),
StringEncoding::Dictionary(Dictionary::from(data)),
];
for enc in encodings {
let exp_keys = vec![0, 0, 1, 2, 3];
let exp_values = vec![Some("apple"), Some("beta"), Some("orange"), Some("pear")];
let values = enc.all_values_as_dictionary();
if let Values::Dictionary(got_keys, got_values) = values {
assert_eq!(got_keys, exp_keys, "key comparison for {} failed", enc);
assert_eq!(
got_values, exp_values,
"values comparison for {} failed",
enc
);
} else {
panic!("invalid Values format returned, got {:?}", values);
}
}
}
fn _values_as_dictionary(enc: &StringEncoding) {
// column is: [apple, apple, pear, NULL, NULL, orange, beta]
// Since the Read Buffer only accepts row IDs in order we only need to
// cover ascending rows in these tests.
let cases = vec![
(
&[0, 3, 4][..], // apple, NULL, NULL
(vec![1, 0, 0], vec![None, Some("apple")]),
),
(
&[6], // beta
(vec![0], vec![Some("beta")]),
),
(
&[0, 3, 5][..], // apple, NULL, orange
(vec![1, 0, 2], vec![None, Some("apple"), Some("orange")]),
),
(
&[0, 1, 2, 3, 4, 5, 6], // apple, apple, pear, NULL, NULL, orange, beta
(
vec![1, 1, 4, 0, 0, 3, 2],
vec![
None,
Some("apple"),
Some("beta"),
Some("orange"),
Some("pear"),
],
),
),
];
for (row_ids, (exp_keys, exp_values)) in cases {
let values = enc.values_as_dictionary(row_ids);
if let Values::Dictionary(got_keys, got_values) = values {
assert_eq!(got_keys, exp_keys, "key comparison for {} failed", enc);
assert_eq!(
got_values, exp_values,
"values comparison for {} failed",
enc
);
} else {
panic!("invalid Values format returned, got {:?}", values);
}
}
}
fn _all_values_as_dictionary(enc: &StringEncoding) {
// column is: [apple, apple, pear, NULL, NULL, orange, beta]
let exp_keys = vec![1, 1, 4, 0, 0, 3, 2];
let exp_values = vec![
None,
Some("apple"),
Some("beta"),
Some("orange"),
Some("pear"),
];
let values = enc.all_values_as_dictionary();
if let Values::Dictionary(got_keys, got_values) = values {
assert_eq!(got_keys, exp_keys, "key comparison for {} failed", enc);
assert_eq!(
got_values, exp_values,
"values comparison for {} failed",
enc
);
} else {
panic!("invalid Values format returned, got {:?}", values);
}
}
}

View File

@ -25,6 +25,15 @@ pub mod benchmarks {
cmp::Operator, encoding::scalar::Fixed, encoding::scalar::FixedNull, encoding::string,
Column, RowIDs,
};
pub use crate::row_group::{ColumnType, RowGroup};
use crate::Chunk;
// Allow external benchmarks to use this crate-only test method
pub fn upsert_table_with_row_group(
chunk: &mut Chunk,
table_name: impl Into<String>,
row_group: RowGroup,
) {
chunk.upsert_table_with_row_group(table_name, row_group)
}
}

View File

@ -7,7 +7,6 @@ use std::{
sync::Arc,
};
use arrow::array;
use hashbrown::{hash_map, HashMap};
use itertools::Itertools;
use snafu::{ResultExt, Snafu};
@ -19,6 +18,7 @@ use crate::value::{
AggregateVec, EncodedValues, OwnedValue, Scalar, Value, Values, ValuesIterator,
};
use arrow::{
array,
array::ArrayRef,
datatypes::{DataType, TimeUnit},
record_batch::RecordBatch,
@ -256,37 +256,40 @@ impl RowGroup {
// apply predicates to determine candidate rows.
let row_ids = self.row_ids_from_predicate(predicates);
let col_data = self.materialise_rows(columns, row_ids);
let col_data = self.materialise_rows(&schema, row_ids);
ReadFilterResult {
schema,
data: col_data,
}
}
fn materialise_rows(&self, names: &[ColumnName<'_>], row_ids: RowIDsOption) -> Vec<Values<'_>> {
let mut col_data = Vec::with_capacity(names.len());
fn materialise_rows(&self, schema: &ResultSchema, row_ids: RowIDsOption) -> Vec<Values<'_>> {
let mut col_data = Vec::with_capacity(schema.len());
match row_ids {
RowIDsOption::None(_) => col_data, // nothing to materialise
RowIDsOption::Some(row_ids) => {
// TODO(edd): causes an allocation. Implement a way to pass a
// pooled buffer to the croaring Bitmap API.
let row_ids = row_ids.to_vec();
for &name in names {
let (_, col) = self.column_name_and_column(name);
col_data.push(col.values(row_ids.as_slice()));
for (ct, _) in &schema.select_columns {
let (_, col) = self.column_name_and_column(ct.as_str());
if let schema::ColumnType::Tag(_) = ct {
col_data.push(col.values_as_dictionary(row_ids.as_slice()));
} else {
col_data.push(col.values(row_ids.as_slice()));
}
}
col_data
}
RowIDsOption::All(_) => {
// TODO(edd): Perf - add specialised method to get all
// materialised values from a column without having to
// materialise a vector of row ids.......
let row_ids = (0..self.rows()).collect::<Vec<_>>();
for &name in names {
let (_, col) = self.column_name_and_column(name);
col_data.push(col.values(row_ids.as_slice()));
for (ct, _) in &schema.select_columns {
let (_, col) = self.column_name_and_column(ct.as_str());
if let schema::ColumnType::Tag(_) = ct {
col_data.push(col.all_values_as_dictionary());
} else {
col_data.push(col.all_values());
}
}
col_data
}
@ -1684,27 +1687,6 @@ impl TryFrom<ReadFilterResult<'_>> for RecordBatch {
};
}
if let Some(InfluxColumnType::Tag) = schema.field(i).0 {
return match values {
Values::String(values) => {
// TODO: Preserve dictionary encoding
Ok(
Arc::new(
values
.into_iter()
.collect::<arrow::array::DictionaryArray<
arrow::datatypes::Int32Type,
>>(),
) as _,
)
}
t => UnsupportedOperation {
msg: format!("cannot convert {:?} to DictionaryArray", t),
}
.fail(),
};
}
Ok(arrow::array::ArrayRef::from(values))
})
.collect::<Result<Vec<_>, _>>()?;
@ -2473,6 +2455,55 @@ west,4
assert!(results.is_empty());
}
#[test]
fn read_filter_dictionaries() {
let mut columns = vec![];
let tc = ColumnType::Time(Column::from(&[1_i64, 2, 3, 4, 5, 6][..]));
columns.push(("time".to_string(), tc));
// Tag column that will be dictionary encoded when materialised
let rc = ColumnType::Tag(Column::from(
&["west", "west", "east", "west", "south", "north"][..],
));
columns.push(("region".to_string(), rc));
// Field column that will be stored as a string array when materialised
let mc = ColumnType::Field(Column::from(
&["GET", "POST", "POST", "POST", "PUT", "GET"][..],
));
columns.push(("method".to_string(), mc));
let row_group = RowGroup::new(6, columns);
let cases = vec![
(
vec!["method", "region", "time"],
Predicate::default(),
"method,region,time
GET,west,1
POST,west,2
POST,east,3
POST,west,4
PUT,south,5
GET,north,6
",
),
(
vec!["method", "region", "time"],
Predicate::with_time_range(&[], -1, 3),
"method,region,time
GET,west,1
POST,west,2
",
),
];
for (cols, predicates, expected) in cases {
let results = row_group.read_filter(&cols, &predicates);
assert_eq!(format!("{:?}", &results), expected);
}
}
#[test]
fn read_aggregate() {
let mut columns = vec![];

View File

@ -1349,6 +1349,11 @@ pub enum Values<'a> {
// UTF-8 valid unicode strings
String(Vec<Option<&'a str>>),
// A dictionary mapping between a vector of dictionary integer keys and the
// string values they refer to.
// NOTE: the string values are always sorted
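// For example (illustrative values only), keys `[1, 0, 2, 1]` with values
// `[None, Some("a"), Some("b")]` logically materialise as
// `[Some("a"), None, Some("b"), Some("a")]`.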
Dictionary(Vec<u32>, Vec<Option<&'a str>>),
// Scalar types
I64(Vec<i64>),
U64(Vec<u64>),
@ -1368,6 +1373,7 @@ impl<'a> Values<'a> {
pub fn len(&self) -> usize {
match &self {
Self::String(c) => c.len(),
Self::Dictionary(c, _) => c.len(),
Self::I64(c) => c.len(),
Self::U64(c) => c.len(),
Self::F64(c) => c.len(),
@ -1386,6 +1392,7 @@ impl<'a> Values<'a> {
pub fn is_null(&self, i: usize) -> bool {
match &self {
Self::String(c) => c[i].is_none(),
Self::Dictionary(keys, values) => values[keys[i] as usize].is_none(),
Self::F64(_) => false,
Self::I64(_) => false,
Self::U64(_) => false,
@ -1403,6 +1410,10 @@ impl<'a> Values<'a> {
Some(v) => Value::String(v),
None => Value::Null,
},
Self::Dictionary(keys, values) => match values[keys[i] as usize] {
Some(v) => Value::String(v),
None => Value::Null,
},
Self::F64(c) => Value::Scalar(Scalar::F64(c[i])),
Self::I64(c) => Value::Scalar(Scalar::I64(c[i])),
Self::U64(c) => Value::Scalar(Scalar::U64(c[i])),
@ -1460,6 +1471,7 @@ impl<'a> Values<'a> {
fn value_str(&self, i: usize) -> &'a str {
match &self {
Values::String(c) => c[i].unwrap(),
Values::Dictionary(keys, values) => values[keys[i] as usize].unwrap(),
_ => panic!("value cannot be returned as &str"),
}
}
@ -1481,11 +1493,71 @@ impl<'a> Values<'a> {
}
}
use arrow::{
array::{Array, ArrayDataBuilder, DictionaryArray},
buffer::Buffer,
datatypes::{DataType, Int32Type},
};
use arrow_util::bitset::BitSet;
use std::iter::FromIterator;
/// Moves ownership of Values into an arrow `ArrayRef`.
impl From<Values<'_>> for arrow::array::ArrayRef {
fn from(values: Values<'_>) -> Self {
match values {
Values::String(values) => Arc::new(arrow::array::StringArray::from(values)),
Values::Dictionary(mut keys, values) => {
// check for NULL values, setting null positions
// on the null bitmap if there is at least one NULL
// value.
let null_bitmap = if matches!(values.first(), Some(None)) {
let mut bitset = BitSet::with_size(keys.len());
for (i, v) in keys.iter_mut().enumerate() {
if *v as usize != 0 {
bitset.set(i); // valid value
}
// because Arrow Dictionary arrays do not maintain a
// None/NULL entry in the string values array we need to
// shift the encoded key down so it maps correctly to
// the values array. The encoded key for NULL entries is
// never used (it's undefined) so we can keep those
// encoded keys set to 0.
if *v > 0 {
*v -= 1;
}
}
Some(bitset)
} else {
None
};
// If there is a null bitmap we need to remove the None entry
// from the string values array since Arrow doesn't maintain
// NULL entries in a dictionary's value array.
let values_arr = if null_bitmap.is_some() {
// drop NULL value entry as this is not stored in Arrow's
// dictionary values array.
assert!(values[0].is_none());
arrow::array::StringArray::from_iter(values.into_iter().skip(1))
} else {
arrow::array::StringArray::from(values)
};
let mut builder = ArrayDataBuilder::new(DataType::Dictionary(
Box::new(DataType::Int32),
Box::new(DataType::Utf8),
))
.len(keys.len())
.add_buffer(Buffer::from_iter(keys))
.add_child_data(values_arr.data().clone());
if let Some(bm) = null_bitmap {
builder = builder.null_bit_buffer(bm.to_arrow());
}
Arc::new(DictionaryArray::<Int32Type>::from(builder.build()))
}
Values::I64(values) => Arc::new(arrow::array::Int64Array::from(values)),
Values::U64(values) => Arc::new(arrow::array::UInt64Array::from(values)),
Values::F64(values) => Arc::new(arrow::array::Float64Array::from(values)),
@ -1593,6 +1665,7 @@ impl EncodedValues {
#[cfg(test)]
mod test {
use super::*;
use arrow::array::ArrayRef;
#[test]
fn aggregate_vec_update() {
@ -1783,4 +1856,72 @@ mod test {
let v1 = OwnedValue::ByteArray(vec![2, 44, 252]);
assert_eq!(v1.size(), 35);
}
#[test]
fn from_dictionary_arrow() {
let values = Values::Dictionary(
vec![0, 1, 2, 0, 1, 2, 2],
vec![Some("bones"), Some("just"), Some("planet telex")],
);
let arr = ArrayRef::from(values);
// no null values in Arrow dictionary array
assert_eq!(arr.null_count(), 0);
assert!((0..7).into_iter().all(|i| !arr.is_null(i)));
// Should produce the same array as when created from an iterator
// of strings.
let exp_dict_arr = vec![
Some("bones"),
Some("just"),
Some("planet telex"),
Some("bones"),
Some("just"),
Some("planet telex"),
Some("planet telex"),
]
.into_iter()
.collect::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>();
let as_dict_arr = arr
.as_any()
.downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>()
.unwrap();
assert_eq!(as_dict_arr.keys(), exp_dict_arr.keys());
// Now let's try with some NULL entries.
let values = Values::Dictionary(
vec![0, 1, 2, 0, 1, 2, 2],
vec![None, Some("just"), Some("planet telex")],
);
let arr = ArrayRef::from(values);
assert_eq!(arr.null_count(), 2);
for (i, exp) in vec![true, false, false, true, false, false, false]
.iter()
.enumerate()
{
assert_eq!(arr.is_null(i), *exp);
}
// Should produce the same array as when created from an iterator
// of strings.
let exp_dict_arr = vec![
None,
Some("just"),
Some("planet telex"),
None,
Some("just"),
Some("planet telex"),
Some("planet telex"),
]
.into_iter()
.collect::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>();
let as_dict_arr = arr
.as_any()
.downcast_ref::<arrow::array::DictionaryArray<arrow::datatypes::Int32Type>>()
.unwrap();
assert_eq!(as_dict_arr.keys(), exp_dict_arr.keys());
}
}

View File

@ -48,10 +48,14 @@ impl<E> From<Error> for UpdateError<E> {
}
impl Config {
pub(crate) fn new(jobs: Arc<JobRegistry>, metric_registry: Arc<MetricRegistry>) -> Self {
pub(crate) fn new(
jobs: Arc<JobRegistry>,
metric_registry: Arc<MetricRegistry>,
remote_template: Option<RemoteTemplate>,
) -> Self {
Self {
shutdown: Default::default(),
state: Default::default(),
state: RwLock::new(ConfigState::new(remote_template)),
jobs,
metric_registry,
}
@ -120,7 +124,11 @@ impl Config {
pub(crate) fn resolve_remote(&self, id: ServerId) -> Option<GRpcConnectionString> {
let state = self.state.read().expect("mutex poisoned");
state.remotes.get(&id).cloned()
state
.remotes
.get(&id)
.cloned()
.or_else(|| state.remote_template.as_ref().map(|t| t.get(&id)))
}
fn commit(
@ -233,6 +241,36 @@ struct ConfigState {
databases: BTreeMap<DatabaseName<'static>, DatabaseState>,
/// Map between remote IOx server IDs and management API connection strings.
remotes: BTreeMap<ServerId, GRpcConnectionString>,
/// Optional template used to derive a remote hostname from a server ID when the `remotes` map has no entry
remote_template: Option<RemoteTemplate>,
}
impl ConfigState {
fn new(remote_template: Option<RemoteTemplate>) -> Self {
Self {
remote_template,
..Default::default()
}
}
}
/// A `RemoteTemplate` is a remote connection template string.
/// Occurrences of the substring "{id}" in the template will be replaced
/// by the server ID.
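/// For example, the template "http://node-{id}.example.com:8082" resolves to
/// "http://node-42.example.com:8082" for server ID 42 (hostname chosen purely
/// for illustration).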
#[derive(Debug)]
pub struct RemoteTemplate {
template: String,
}
impl RemoteTemplate {
pub fn new(template: impl Into<String>) -> Self {
let template = template.into();
Self { template }
}
fn get(&self, id: &ServerId) -> GRpcConnectionString {
self.template.replace("{id}", &format!("{}", id.get_u32()))
}
}
#[derive(Debug)]
@ -316,12 +354,17 @@ mod test {
use crate::db::load_or_create_preserved_catalog;
use super::*;
use std::num::NonZeroU32;
#[tokio::test]
async fn create_db() {
let name = DatabaseName::new("foo").unwrap();
let metric_registry = Arc::new(metrics::MetricRegistry::new());
let config = Config::new(Arc::new(JobRegistry::new()), Arc::clone(&metric_registry));
let config = Config::new(
Arc::new(JobRegistry::new()),
Arc::clone(&metric_registry),
None,
);
let rules = DatabaseRules::new(name.clone());
{
@ -363,7 +406,11 @@ mod test {
async fn test_db_drop() {
let name = DatabaseName::new("foo").unwrap();
let metric_registry = Arc::new(metrics::MetricRegistry::new());
let config = Config::new(Arc::new(JobRegistry::new()), Arc::clone(&metric_registry));
let config = Config::new(
Arc::new(JobRegistry::new()),
Arc::clone(&metric_registry),
None,
);
let rules = DatabaseRules::new(name.clone());
let db_reservation = config.create_db(rules).unwrap();
@ -412,4 +459,28 @@ mod test {
assert_eq!(rules_path, expected_path);
}
#[test]
fn resolve_remote() {
let metric_registry = Arc::new(metrics::MetricRegistry::new());
let config = Config::new(
Arc::new(JobRegistry::new()),
Arc::clone(&metric_registry),
Some(RemoteTemplate::new("http://iox-query-{id}:8082")),
);
let server_id = ServerId::new(NonZeroU32::new(42).unwrap());
let remote = config.resolve_remote(server_id);
assert_eq!(
remote,
Some(GRpcConnectionString::from("http://iox-query-42:8082"))
);
let server_id = ServerId::new(NonZeroU32::new(24).unwrap());
let remote = config.resolve_remote(server_id);
assert_eq!(
remote,
Some(GRpcConnectionString::from("http://iox-query-24:8082"))
);
}
}

View File

@ -75,7 +75,7 @@ use bytes::BytesMut;
use cached::proc_macro::cached;
use db::load_or_create_preserved_catalog;
use futures::stream::TryStreamExt;
use observability_deps::tracing::{error, info, warn};
use observability_deps::tracing::{debug, error, info, warn};
use parking_lot::Mutex;
use snafu::{OptionExt, ResultExt, Snafu};
@ -93,12 +93,14 @@ use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi};
use query::{exec::Executor, DatabaseStore};
use tracker::{TaskId, TaskRegistration, TaskRegistryWithHistory, TaskTracker, TrackedFutureExt};
pub use crate::config::RemoteTemplate;
use crate::{
config::{
object_store_path_for_database_config, Config, GRpcConnectionString, DB_RULES_FILE_NAME,
},
db::Db,
};
use cached::Return;
use data_types::database_rules::{NodeGroup, Shard, ShardId};
use generated_types::database_rules::{decode_database_rules, encode_database_rules};
use influxdb_iox_client::{connection::Builder, write};
@ -109,7 +111,6 @@ pub mod buffer;
mod config;
pub mod db;
mod query_tests;
pub mod snapshot;
// This module exposes `query_tests` outside of the crate so that it may be used
// in benchmarks. Do not import this module for non-benchmark purposes!
@ -228,15 +229,22 @@ pub struct ServerConfig {
object_store: Arc<ObjectStore>,
metric_registry: Arc<MetricRegistry>,
remote_template: Option<RemoteTemplate>,
}
impl ServerConfig {
/// Create a new config using the specified store.
pub fn new(object_store: Arc<ObjectStore>, metric_registry: Arc<MetricRegistry>) -> Self {
pub fn new(
object_store: Arc<ObjectStore>,
metric_registry: Arc<MetricRegistry>,
remote_template: Option<RemoteTemplate>,
) -> Self {
Self {
num_worker_threads: None,
object_store,
metric_registry,
remote_template,
}
}
@ -390,12 +398,17 @@ impl<M: ConnectionManager> Server<M> {
object_store,
// to test the metrics provide a different registry to the `ServerConfig`.
metric_registry,
remote_template,
} = config;
let num_worker_threads = num_worker_threads.unwrap_or_else(num_cpus::get);
Self {
id: Default::default(),
config: Arc::new(Config::new(Arc::clone(&jobs), Arc::clone(&metric_registry))),
config: Arc::new(Config::new(
Arc::clone(&jobs),
Arc::clone(&metric_registry),
remote_template,
)),
store: object_store,
connection_manager: Arc::new(connection_manager),
exec: Arc::new(Executor::new(num_worker_threads)),
@ -937,23 +950,25 @@ impl ConnectionManager for ConnectionManagerImpl {
&self,
connect: &str,
) -> Result<Arc<Self::RemoteServer>, ConnectionManagerError> {
cached_remote_server(connect.to_string()).await
let ret = cached_remote_server(connect.to_string()).await?;
debug!(was_cached=%ret.was_cached, %connect, "getting remote connection");
Ok(ret.value)
}
}
// cannot be an associated function
// arguments need to have a 'static lifetime because they become caching keys
#[cached(result = true)]
#[cached(result = true, with_cached_flag = true)]
async fn cached_remote_server(
connect: String,
) -> Result<Arc<RemoteServerImpl>, ConnectionManagerError> {
) -> Result<Return<Arc<RemoteServerImpl>>, ConnectionManagerError> {
let connection = Builder::default()
.build(&connect)
.await
.map_err(|e| Box::new(e) as _)
.context(RemoteServerConnectError)?;
let client = write::Client::new(connection);
Ok(Arc::new(RemoteServerImpl { client }))
Ok(Return::new(Arc::new(RemoteServerImpl { client })))
}
/// An implementation for communicating with other IOx servers. This should
@ -1055,11 +1070,7 @@ mod tests {
let test_registry = metrics::TestMetricRegistry::new(Arc::clone(&registry));
(
test_registry,
ServerConfig::new(
Arc::new(object_store),
registry, // new registry ensures test isolation of metrics
)
.with_num_worker_threads(1),
ServerConfig::new(Arc::new(object_store), registry, None).with_num_worker_threads(1),
)
}
@ -1158,8 +1169,8 @@ mod tests {
store.list_with_delimiter(&store.new_path()).await.unwrap();
let manager = TestConnectionManager::new();
let config2 =
ServerConfig::new(store, Arc::new(MetricRegistry::new())).with_num_worker_threads(1);
let config2 = ServerConfig::new(store, Arc::new(MetricRegistry::new()), Option::None)
.with_num_worker_threads(1);
let server2 = Server::new(manager, config2);
server2.set_id(ServerId::try_from(1).unwrap()).unwrap();
server2.load_database_configs().await.unwrap();

View File

@ -1,328 +0,0 @@
use data_types::partition_metadata::{PartitionSummary, TableSummary};
use internal_types::selection::Selection;
use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi};
use query::{predicate::EMPTY_PREDICATE, PartitionChunk};
use std::sync::Arc;
use bytes::Bytes;
use observability_deps::tracing::{error, info};
use parking_lot::Mutex;
use snafu::{ResultExt, Snafu};
use tokio::sync::oneshot;
use uuid::Uuid;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Partition error creating snapshot: {}", source))]
PartitionError {
source: Box<dyn std::error::Error + Send + Sync>,
},
#[snafu(display("Table position out of bounds: {}", position))]
TablePositionOutOfBounds { position: usize },
#[snafu(display("Error generating json response: {}", source))]
JsonGenerationError { source: serde_json::Error },
#[snafu(display("Error opening Parquet Writer: {}", source))]
ParquetStreamToByte {
source: parquet_file::storage::Error,
},
#[snafu(display("Error writing to object store: {}", source))]
WritingToObjectStore { source: object_store::Error },
#[snafu(display("Error reading batches while writing to '{}': {}", file_name, source))]
ReadingBatches {
file_name: String,
source: arrow::error::ArrowError,
},
#[snafu(display("Stopped early"))]
StoppedEarly,
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Code for snapshotting a database chunk to a Parquet
/// file in object storage.
#[derive(Debug)]
pub struct Snapshot<T>
where
T: Send + Sync + 'static + PartitionChunk,
{
pub id: Uuid,
pub partition_summary: PartitionSummary,
pub metadata_path: object_store::path::Path,
pub data_path: object_store::path::Path,
store: Arc<ObjectStore>,
chunk: Arc<T>,
status: Mutex<Status>,
}
impl<T> Snapshot<T>
where
T: Send + Sync + 'static + PartitionChunk,
{
fn new(
partition_key: impl Into<String>,
metadata_path: object_store::path::Path,
data_path: object_store::path::Path,
store: Arc<ObjectStore>,
partition: Arc<T>,
table: TableSummary,
) -> Self {
let status = Status::new(TableState::NotStarted);
Self {
id: Uuid::new_v4(),
partition_summary: PartitionSummary {
key: partition_key.into(),
tables: vec![table],
},
metadata_path,
data_path,
store,
chunk: partition,
status: Mutex::new(status),
}
}
fn mark_table_running(&self) {
let mut status = self.status.lock();
if status.table_state == TableState::NotStarted {
status.table_state = TableState::Running;
}
}
fn mark_table_finished(&self) {
let mut status = self.status.lock();
status.table_state = TableState::Finished;
}
fn mark_meta_written(&self) {
let mut status = self.status.lock();
status.meta_written = true;
}
pub fn finished(&self) -> bool {
let status = self.status.lock();
matches!(status.table_state, TableState::Finished)
}
fn should_stop(&self) -> bool {
let status = self.status.lock();
status.stop_on_next_update
}
async fn run(&self, notify: Option<oneshot::Sender<()>>) -> Result<()> {
self.mark_table_running();
// get all the data in this chunk:
let table_name = self.partition_summary.tables[0].name.as_ref();
let stream = self
.chunk
.read_filter(table_name, &EMPTY_PREDICATE, Selection::All)
.map_err(|e| Box::new(e) as _)
.context(PartitionError)?;
let schema = stream.schema();
let mut location = self.data_path.clone();
let file_name = format!("{}.parquet", table_name);
location.set_file_name(&file_name);
let data = parquet_file::storage::Storage::parquet_stream_to_bytes(stream, schema)
.await
.context(ParquetStreamToByte)?;
self.write_to_object_store(data, &location).await?;
self.mark_table_finished();
if self.should_stop() {
return StoppedEarly.fail();
}
let mut partition_meta_path = self.metadata_path.clone();
let key = format!("{}.json", &self.partition_summary.key);
partition_meta_path.set_file_name(&key);
let json_data = serde_json::to_vec(&self.partition_summary).context(JsonGenerationError)?;
let data = Bytes::from(json_data);
let len = data.len();
let stream_data = std::io::Result::Ok(data);
self.store
.put(
&partition_meta_path,
futures::stream::once(async move { stream_data }),
Some(len),
)
.await
.context(WritingToObjectStore)?;
self.mark_meta_written();
if let Some(notify) = notify {
if let Err(e) = notify.send(()) {
error!("error sending notify: {:?}", e);
}
}
Ok(())
}
async fn write_to_object_store(
&self,
data: Vec<u8>,
file_name: &object_store::path::Path,
) -> Result<()> {
let len = data.len();
let data = Bytes::from(data);
let stream_data = Result::Ok(data);
self.store
.put(
&file_name,
futures::stream::once(async move { stream_data }),
Some(len),
)
.await
.context(WritingToObjectStore)
}
fn set_error(&self, e: Error) {
let mut status = self.status.lock();
status.error = Some(e);
}
}
#[derive(Debug, PartialEq, Clone)]
pub enum TableState {
NotStarted,
Running,
Finished,
}
#[derive(Debug)]
pub struct Status {
table_state: TableState,
meta_written: bool,
stop_on_next_update: bool,
error: Option<Error>,
}
impl Status {
fn new(table_state: TableState) -> Self {
Self {
table_state,
meta_written: false,
stop_on_next_update: false,
error: None,
}
}
}
pub fn snapshot_chunk<T>(
metadata_path: object_store::path::Path,
data_path: object_store::path::Path,
store: Arc<ObjectStore>,
partition_key: &str,
chunk: Arc<T>,
table_stats: TableSummary,
notify: Option<oneshot::Sender<()>>,
) -> Result<Arc<Snapshot<T>>>
where
T: Send + Sync + 'static + PartitionChunk,
{
let snapshot = Snapshot::new(
partition_key.to_string(),
metadata_path,
data_path,
store,
chunk,
table_stats,
);
let snapshot = Arc::new(snapshot);
let return_snapshot = Arc::clone(&snapshot);
tokio::spawn(async move {
info!(
"starting snapshot of {} to {}",
&snapshot.partition_summary.key,
&snapshot.data_path.display()
);
if let Err(e) = snapshot.run(notify).await {
error!("error running snapshot: {:?}", e);
snapshot.set_error(e);
}
});
Ok(return_snapshot)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{db::test_helpers::write_lp, query_tests::utils::TestDb};
use futures::TryStreamExt;
use object_store::memory::InMemory;
use query::{predicate::Predicate, Database};
#[tokio::test]
async fn snapshot() {
let lp = r#"
cpu,host=A,region=west user=23.2,system=55.1 1
cpu,host=A,region=west user=3.2,system=50.1 10
cpu,host=B,region=east user=10.0,system=74.1 1
"#;
let db = TestDb::builder()
.object_store(Arc::new(ObjectStore::new_in_memory(InMemory::new())))
.build()
.await
.db;
write_lp(&db, &lp);
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let (tx, rx) = tokio::sync::oneshot::channel();
let mut metadata_path = store.new_path();
metadata_path.push_dir("meta");
let mut data_path = store.new_path();
data_path.push_dir("data");
let chunk = Arc::clone(&db.chunks(&Predicate::default())[0]);
let table_summary = db
.table_summary("1970-01-01T00", "cpu", chunk.id())
.unwrap();
let snapshot = snapshot_chunk(
metadata_path.clone(),
data_path,
Arc::clone(&store),
"testaroo",
chunk,
table_summary,
Some(tx),
)
.unwrap();
rx.await.unwrap();
let mut location = metadata_path;
location.set_file_name("testaroo.json");
let summary = store
.get(&location)
.await
.unwrap()
.map_ok(|b| bytes::BytesMut::from(&b[..]))
.try_concat()
.await
.unwrap();
let meta: PartitionSummary = serde_json::from_slice(&*summary).unwrap();
assert_eq!(meta, snapshot.partition_summary);
assert!(snapshot.finished());
}
}

View File

@ -397,6 +397,15 @@ Possible values (case insensitive):
/// environments.
#[structopt(long = "--azure-storage-access-key", env = "AZURE_STORAGE_ACCESS_KEY")]
pub azure_storage_access_key: Option<String>,
/// When IOx nodes need to talk to remote peers they consult an internal remote address
/// mapping. This mapping is populated via API calls. If the mapping doesn't produce
/// a result, this config entry allows a hostname to be generated from a template:
/// occurrences of the "{id}" substring will be replaced with the remote server ID.
///
/// Example: http://node-{id}.ioxmydomain.com:8082
#[structopt(long = "--remote-template", env = "INFLUXDB_IOX_REMOTE_TEMPLATE")]
pub remote_template: Option<String>,
}
pub async fn command(config: Config) -> Result<()> {

View File

@ -7,7 +7,7 @@ use object_store::{
use observability_deps::tracing::{self, error, info, warn, Instrument};
use panic_logging::SendPanicsToTracing;
use server::{
ConnectionManagerImpl as ConnectionManager, Server as AppServer,
ConnectionManagerImpl as ConnectionManager, RemoteTemplate, Server as AppServer,
ServerConfig as AppServerConfig,
};
use snafu::{ResultExt, Snafu};
@ -123,7 +123,8 @@ pub async fn main(config: Config) -> Result<()> {
let object_store = ObjectStore::try_from(&config)?;
let object_storage = Arc::new(object_store);
let metric_registry = Arc::new(metrics::MetricRegistry::new());
let server_config = AppServerConfig::new(object_storage, metric_registry);
let remote_template = config.remote_template.map(RemoteTemplate::new);
let server_config = AppServerConfig::new(object_storage, metric_registry, remote_template);
let server_config = if let Some(n) = config.num_worker_threads {
info!(

View File

@ -18,8 +18,7 @@ use data_types::{
};
use influxdb_iox_client::format::QueryOutputFormat;
use influxdb_line_protocol::parse_lines;
use object_store::ObjectStoreApi;
use query::{Database, PartitionChunk};
use query::Database;
use server::{ConnectionManager, Server as AppServer};
// External crates
@ -361,7 +360,6 @@ where
.get("/metrics", handle_metrics::<M>)
.get("/iox/api/v1/databases/:name/query", query::<M>)
.get("/api/v1/partitions", list_partitions::<M>)
.post("/api/v1/snapshot", snapshot_partition::<M>)
.get("/debug/pprof", pprof_home::<M>)
.get("/debug/pprof/profile", pprof_profile::<M>)
// Specify the error handler to handle any errors caused by
@ -737,78 +735,6 @@ struct SnapshotInfo {
table_name: String,
}
#[tracing::instrument(level = "debug")]
async fn snapshot_partition<M: ConnectionManager + Send + Sync + Debug + 'static>(
req: Request<Body>,
) -> Result<Response<Body>, ApplicationError> {
use object_store::path::ObjectStorePath;
let path = req.uri().path().to_string();
let server = Arc::clone(&req.data::<Arc<AppServer<M>>>().expect("server state"));
// TODO - catch error conditions
let obs = server.metrics.http_requests.observation();
let query = req.uri().query().context(ExpectedQueryString {})?;
let snapshot: SnapshotInfo = serde_urlencoded::from_str(query).context(InvalidQueryString {
query_string: query,
})?;
let db_name =
org_and_bucket_to_database(&snapshot.org, &snapshot.bucket).context(BucketMappingError)?;
let metric_kv = vec![
KeyValue::new("db_name", db_name.to_string()),
KeyValue::new("path", path),
];
// TODO: refactor the rest of this out of the http route and into the server
// crate.
let db = server.db(&db_name).context(BucketNotFound {
org: &snapshot.org,
bucket: &snapshot.bucket,
})?;
let store = Arc::clone(&server.store);
let mut metadata_path = store.new_path();
metadata_path.push_dir(&db_name.to_string());
let mut data_path = metadata_path.clone();
metadata_path.push_dir("meta");
data_path.push_all_dirs(&["data", &snapshot.partition]);
let partition_key = &snapshot.partition;
let table_name = &snapshot.table_name;
if let Some(chunk) = db
.rollover_partition(partition_key, table_name)
.await
.unwrap()
{
let table_stats = db
.table_summary(partition_key, table_name, chunk.id())
.unwrap();
let snapshot = server::snapshot::snapshot_chunk(
metadata_path,
data_path,
store,
partition_key,
chunk,
table_stats,
None,
)
.unwrap();
obs.ok_with_labels(&metric_kv);
let ret = format!("{}", snapshot.id);
Ok(Response::new(Body::from(ret)))
} else {
Err(ApplicationError::NoSnapshot {
db_name: db_name.to_string(),
partition: partition_key.to_string(),
table_name: table_name.to_string(),
})
}
}
#[tracing::instrument(level = "debug")]
async fn pprof_home<M: ConnectionManager + Send + Sync + Debug + 'static>(
req: Request<Body>,
@ -923,7 +849,6 @@ mod tests {
use std::{
convert::TryFrom,
net::{IpAddr, Ipv4Addr, SocketAddr},
num::NonZeroU32,
};
use arrow::record_batch::RecordBatch;
@ -943,6 +868,7 @@ mod tests {
AppServerConfig::new(
Arc::new(ObjectStore::new_in_memory(InMemory::new())),
registry,
None,
)
.with_num_worker_threads(1),
)
@ -1320,53 +1246,6 @@ mod tests {
.await;
}
#[tokio::test]
async fn test_snapshot() {
let (_, config) = config();
let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config));
app_server
.set_id(ServerId::new(NonZeroU32::new(1).unwrap()))
.unwrap();
app_server
.create_database(
DatabaseRules::new(DatabaseName::new("MyOrg_MyBucket").unwrap()),
app_server.require_id().unwrap(),
)
.await
.unwrap();
let server_url = test_server(Arc::clone(&app_server));
let client = Client::new();
let lp_data = "h2o_temperature,location=santa_monica,state=CA surface_degrees=65.2,bottom_degrees=50.4 1617286224000000000";
// send write data
let bucket_name = "MyBucket";
let org_name = "MyOrg";
let response = client
.post(&format!(
"{}/api/v2/write?bucket={}&org={}",
server_url, bucket_name, org_name
))
.body(lp_data)
.send()
.await;
check_response("write", response, StatusCode::NO_CONTENT, Some("")).await;
// issue first snapshot => OK
let url = format!(
"{}/api/v1/snapshot?bucket={}&org={}&partition=&table_name=h2o_temperature",
server_url, bucket_name, org_name
);
let response = client.post(&url).body(lp_data).send().await;
check_response("snapshot", response, StatusCode::OK, None).await;
// second snapshot results in "not modified"
let response = client.post(&url).body(lp_data).send().await;
check_response("snapshot", response, StatusCode::NOT_MODIFIED, None).await;
}
fn get_content_type(response: &Result<Response, reqwest::Error>) -> String {
if let Ok(response) = response {
response