Merge branch 'main' into ntran/no_use_stats
commit ea26a77217
@@ -13,5 +13,3 @@ updates:
       # Additionally the thrift-compiler version available in standard repos tends to lag
       # the latest release significantly, and so updating to the latest version adds friction
       - dependency-name: "thrift"
-      # https://github.com/influxdata/influxdb_iox/issues/2735
-      - dependency-name: "smallvec"
@@ -6,10 +6,9 @@ edition = "2018"

 [dependencies] # In alphabetical order
 nom = "7"
-# See https://github.com/influxdata/influxdb_iox/issues/2735 before bumping smallvec
-smallvec = "1.6.1"
+smallvec = "1.7.0"
 snafu = "0.6.2"
 observability_deps = { path = "../observability_deps" }

 [dev-dependencies] # In alphabetical order
 test_helpers = { path = "../test_helpers" }
@@ -104,8 +104,16 @@ pub struct SeriesSet {
 /// pending on what is required for the gRPC layer.
 #[derive(Debug)]
 pub struct GroupDescription {
-    /// key = value pairs that define the group
-    pub tags: Vec<(Arc<str>, Arc<str>)>,
+    /// the names of all tags (not just the tags used for grouping)
+    pub all_tags: Vec<Arc<str>>,
+
+    /// the values of the group tags that defined the group.
+    /// For example,
+    ///
+    /// If there were tags `t0`, `t1`, and `t2`, and the query had
+    /// group_keys of `[t1, t2]` then this list would have the values
+    /// of the t1 and t2 columns
+    pub gby_vals: Vec<Arc<str>>,
 }

 #[derive(Debug)]
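Aside (not part of the diff): a minimal, self-contained sketch of how the reworked `GroupDescription` is populated, using a local stand-in struct rather than the real type from the query crate. It mirrors the doc-comment example above: tags `t0`, `t1`, `t2` with `group_keys` of `[t1, t2]`.

```rust
use std::sync::Arc;

// Local stand-in for the GroupDescription in the hunk above (illustration only).
#[derive(Debug)]
struct GroupDescription {
    /// names of all tags in the series set, not just the grouping tags
    all_tags: Vec<Arc<str>>,
    /// values of the grouping tags that define this group
    gby_vals: Vec<Arc<str>>,
}

fn main() {
    // Tags t0, t1, t2; the query groups on [t1, t2], so only their values appear.
    let group = GroupDescription {
        all_tags: vec![Arc::from("t0"), Arc::from("t1"), Arc::from("t2")],
        gby_vals: vec![Arc::from("v1"), Arc::from("v2")],
    };
    assert_eq!(group.all_tags.len(), 3);
    assert_eq!(group.gby_vals.len(), 2);
    println!("{:?}", group);
}
```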
@@ -415,9 +423,18 @@ impl GroupGenerator {
         if need_group_start {
             let group_tags = series_set.tags[0..num_prefix_tag_group_columns].to_vec();

-            let group_desc = GroupDescription {
-                tags: group_tags.clone(),
-            };
+            let all_tags = series_set
+                .tags
+                .iter()
+                .map(|(tag, _value)| Arc::clone(tag))
+                .collect::<Vec<_>>();
+
+            let gby_vals = group_tags
+                .iter()
+                .map(|(_tag, value)| Arc::clone(value))
+                .collect::<Vec<_>>();
+
+            let group_desc = GroupDescription { all_tags, gby_vals };

             self.last_group_tags = Some(group_tags);
             return Some(group_desc);
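Aside (not part of the diff): the derivation above, reduced to a runnable sketch. `all_tags` collects every tag name from the series set, while `gby_vals` collects only the values of the leading group columns; the tag names and prefix count here are made up for illustration.

```rust
use std::sync::Arc;

fn main() {
    // A series set's tags as (name, value) pairs; grouping on the first column only.
    let tags: Vec<(Arc<str>, Arc<str>)> = vec![
        (Arc::from("tag_a"), Arc::from("one")),
        (Arc::from("tag_b"), Arc::from("ten")),
    ];
    let num_prefix_tag_group_columns = 1;

    let group_tags = tags[0..num_prefix_tag_group_columns].to_vec();

    // Every tag *name* is kept ...
    let all_tags: Vec<Arc<str>> = tags.iter().map(|(tag, _value)| Arc::clone(tag)).collect();
    // ... but only the *values* of the grouping columns.
    let gby_vals: Vec<Arc<str>> = group_tags
        .iter()
        .map(|(_tag, value)| Arc::clone(value))
        .collect();

    let expected_all: Vec<Arc<str>> = vec![Arc::from("tag_a"), Arc::from("tag_b")];
    let expected_vals: Vec<Arc<str>> = vec![Arc::from("one")];
    assert_eq!(all_tags, expected_all);
    assert_eq!(gby_vals, expected_vals);
}
```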
@@ -780,10 +797,10 @@ mod tests {
         .await;

         // expect the output to be
-        // Group1 (tag_a = one)
+        // Group1 (tags: tag_a, tag_b, vals = one)
         // Series1 (tag_a = one, tag_b = ten)
         // Series2 (tag_a = one, tag_b = ten)
-        // Group2 (tag_a = two)
+        // Group2 (tags: tag_a, tag_b, vals = two)
         // Series3 (tag_a = two, tag_b = eleven)

         assert_eq!(results.len(), 5, "results were\n{:#?}", results); // 3 series, two groups (one and two)
@@ -794,7 +811,8 @@ mod tests {
         let group_2 = extract_group(&results[3]);
         let series_set3 = extract_series_set(&results[4]);

-        assert_eq!(group_1.tags, str_pair_vec_to_vec(&[("tag_a", "one")]));
+        assert_eq!(group_1.all_tags, str_vec_to_arc_vec(&["tag_a", "tag_b"]));
+        assert_eq!(group_1.gby_vals, str_vec_to_arc_vec(&["one"]));

         assert_eq!(series_set1.table_name.as_ref(), "foo");
         assert_eq!(
@@ -812,7 +830,8 @@ mod tests {
         assert_eq!(series_set2.start_row, 1);
         assert_eq!(series_set2.num_rows, 1);

-        assert_eq!(group_2.tags, str_pair_vec_to_vec(&[("tag_a", "two")]));
+        assert_eq!(group_2.all_tags, str_vec_to_arc_vec(&["tag_a", "tag_b"]));
+        assert_eq!(group_2.gby_vals, str_vec_to_arc_vec(&["two"]));

         assert_eq!(series_set3.table_name.as_ref(), "foo");
         assert_eq!(
@@ -844,8 +863,8 @@ mod tests {
         let field_columns = ["float_field"];
         let results = convert_groups(table_name, &tag_columns, 0, &field_columns, input).await;

-        // expect the output to be
-        // Group1 (tag_a = one)
+        // expect the output to be (no vals, because no group columns are specified)
+        // Group1 (tags: tag_a, tag_b, vals = [])
         // Series1 (tag_a = one, tag_b = ten)
         // Series2 (tag_a = one, tag_b = ten)

@@ -855,7 +874,8 @@ mod tests {
         let series_set1 = extract_series_set(&results[1]);
         let series_set2 = extract_series_set(&results[2]);

-        assert_eq!(group_1.tags, &[]);
+        assert_eq!(group_1.all_tags, str_vec_to_arc_vec(&["tag_a", "tag_b"]));
+        assert_eq!(group_1.gby_vals, str_vec_to_arc_vec(&[]));

         assert_eq!(series_set1.table_name.as_ref(), "foo");
         assert_eq!(
@@ -898,7 +918,7 @@ mod tests {
         let mut converter = SeriesSetConverter::default();

         let table_name = Arc::from(table_name);
-        let tag_columns = str_vec_to_arc_vec(tag_columns);
+        let tag_columns = Arc::new(str_vec_to_arc_vec(tag_columns));
         let field_columns = FieldColumns::from(field_columns);

         converter
@@ -925,7 +945,7 @@ mod tests {
         let mut converter = SeriesSetConverter::default();

         let table_name = Arc::from(table_name);
-        let tag_columns = str_vec_to_arc_vec(tag_columns);
+        let tag_columns = Arc::new(str_vec_to_arc_vec(tag_columns));
         let field_columns = FieldColumns::from(field_columns);

         converter
server/src/db.rs
@@ -31,7 +31,7 @@ use entry::{Entry, Sequence, SequencedEntry, TableBatch};
 use internal_types::schema::Schema;
 use iox_object_store::IoxObjectStore;
 use mutable_buffer::chunk::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk};
-use observability_deps::tracing::{debug, error, info};
+use observability_deps::tracing::{debug, error, info, warn};
 use parquet_file::catalog::{
     cleanup::{delete_files as delete_parquet_files, get_unreferenced_parquet_files},
     core::PreservedCatalog,
@@ -277,6 +277,9 @@ pub struct Db {
     /// Number of iterations of the worker cleanup loop for this Db
     worker_iterations_cleanup: AtomicUsize,

+    /// Number of iterations of the worker delete predicate preservation loop for this Db
+    worker_iterations_delete_predicate_preservation: AtomicUsize,
+
     /// Optional write buffer producer
     /// TODO: Move onto Database
     write_buffer_producer: Option<Arc<dyn WriteBufferWriting>>,
@@ -299,6 +302,9 @@ pub struct Db {

     /// TESTING ONLY: Mocked `Instant::now()` for the background worker
     background_worker_now_override: Mutex<Option<Instant>>,
+
+    /// To-be-written delete predicates.
+    delete_predicates_mailbox: Mutex<Vec<(Arc<DeletePredicate>, Vec<ChunkAddrWithoutDatabase>)>>,
 }

 /// All the information needed to commit a database
@@ -347,10 +353,12 @@ impl Db {
             catalog_access,
             worker_iterations_lifecycle: AtomicUsize::new(0),
             worker_iterations_cleanup: AtomicUsize::new(0),
+            worker_iterations_delete_predicate_preservation: AtomicUsize::new(0),
             write_buffer_producer: database_to_commit.write_buffer_producer,
             cleanup_lock: Default::default(),
             lifecycle_policy: tokio::sync::Mutex::new(None),
             background_worker_now_override: Default::default(),
+            delete_predicates_mailbox: Default::default(),
         };
         let this = Arc::new(this);
         *this.lifecycle_policy.try_lock().expect("not used yet") = Some(
@@ -583,12 +591,8 @@ impl Db {
         }

         if !affected_persisted_chunks.is_empty() {
-            let mut transaction = self.preserved_catalog.open_transaction().await;
-            transaction.delete_predicate(&delete_predicate, &affected_persisted_chunks);
-            transaction
-                .commit()
-                .await
-                .context(CommitDeletePredicateError)?;
+            let mut guard = self.delete_predicates_mailbox.lock();
+            guard.push((delete_predicate, affected_persisted_chunks));
         }

         Ok(())
@@ -798,6 +802,12 @@ impl Db {
         self.worker_iterations_cleanup.load(Ordering::Relaxed)
     }

+    /// Returns the number of iterations of the background worker delete predicate preservation loop
+    pub fn worker_iterations_delete_predicate_preservation(&self) -> usize {
+        self.worker_iterations_delete_predicate_preservation
+            .load(Ordering::Relaxed)
+    }
+
     /// Perform sequencer-driven replay for this DB.
     ///
     /// When `replay_plan` is `None` then no real replay will be performed. Instead the write buffer streams will be set
@@ -887,11 +897,40 @@ impl Db {
             }
         };

+        // worker loop to persist delete predicates
+        let delete_predicate_persistence_loop = async {
+            loop {
+                let todo: Vec<_> = {
+                    let guard = self.delete_predicates_mailbox.lock();
+                    guard.clone()
+                };
+
+                if !todo.is_empty() {
+                    match self.preserve_delete_predicates(&todo).await {
+                        Ok(()) => {
+                            let mut guard = self.delete_predicates_mailbox.lock();
+                            // TODO: we could also run a de-duplication here once
+                            // https://github.com/influxdata/influxdb_iox/issues/2626 is implemented
+                            guard.drain(0..todo.len());
+                        }
+                        Err(e) => {
+                            error!(%e, "cannot preserve delete predicates");
+                        }
+                    }
+                }
+
+                self.worker_iterations_delete_predicate_preservation
+                    .fetch_add(1, Ordering::Relaxed);
+                tokio::time::sleep(Duration::from_secs(1)).await;
+            }
+        };
+
         // None of the futures need to perform drain logic on shutdown.
         // When the first one finishes, all of them are dropped
         tokio::select! {
             _ = lifecycle_loop => error!("lifecycle loop exited - db worker bailing out"),
-            _ = object_store_cleanup_loop => error!("object store cleanup exited - db worker bailing out"),
+            _ = object_store_cleanup_loop => error!("object store cleanup loop exited - db worker bailing out"),
+            _ = delete_predicate_persistence_loop => error!("delete predicate persistence loop exited - db worker bailing out"),
             _ = shutdown.cancelled() => info!("db worker shutting down"),
         }

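Aside (not part of the diff): a sketch of the mailbox pattern this loop relies on, with a simplified `Mailbox` type standing in for `Db`'s `delete_predicates_mailbox`. The worker snapshots the pending items, persists them outside the lock, and on success drains only the prefix it actually persisted, so anything pushed in the meantime stays queued for the next iteration.

```rust
use std::sync::Mutex;

// Simplified stand-in for the delete predicate mailbox (illustration only).
struct Mailbox<T>(Mutex<Vec<T>>);

impl<T: Clone> Mailbox<T> {
    fn new() -> Self {
        Self(Mutex::new(Vec::new()))
    }

    fn push(&self, item: T) {
        self.0.lock().unwrap().push(item);
    }

    /// Persist pending items via `persist`, draining only what was persisted.
    fn process<E>(&self, mut persist: impl FnMut(&[T]) -> Result<(), E>) -> Result<(), E> {
        // Snapshot under the lock, then release it while persisting.
        let todo: Vec<T> = self.0.lock().unwrap().clone();
        if todo.is_empty() {
            return Ok(());
        }
        persist(&todo)?;
        // Items pushed while persisting remain in the queue past `todo.len()`.
        self.0.lock().unwrap().drain(0..todo.len());
        Ok(())
    }
}

fn main() {
    let mailbox = Mailbox::new();
    mailbox.push("delete predicate 1");
    mailbox.push("delete predicate 2");
    mailbox
        .process(|batch| -> Result<(), ()> {
            println!("persisting {} predicates", batch.len());
            Ok(())
        })
        .unwrap();
    assert!(mailbox.0.lock().unwrap().is_empty());
}
```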
@@ -915,6 +954,44 @@ impl Db {
         delete_parquet_files(&self.preserved_catalog, &files).await
     }

+    async fn preserve_delete_predicates(
+        self: &Arc<Self>,
+        predicates: &[(Arc<DeletePredicate>, Vec<ChunkAddrWithoutDatabase>)],
+    ) -> Result<(), parquet_file::catalog::core::Error> {
+        let mut transaction = self.preserved_catalog.open_transaction().await;
+        for (predicate, chunks) in predicates {
+            transaction.delete_predicate(predicate, chunks);
+        }
+        let ckpt_handle = transaction.commit().await?;
+
+        let catalog_transactions_until_checkpoint = self
+            .rules
+            .read()
+            .lifecycle_rules
+            .catalog_transactions_until_checkpoint
+            .get();
+        let create_checkpoint =
+            ckpt_handle.revision_counter() % catalog_transactions_until_checkpoint == 0;
+        if create_checkpoint {
+            // Commit is already done, so we can just scan the catalog for the state.
+            //
+            // NOTE: There can only be a single transaction in this section because the checkpoint handle holds
+            //       transaction lock. Therefore we don't need to worry about concurrent modifications of
+            //       preserved chunks.
+            if let Err(e) = ckpt_handle
+                .create_checkpoint(checkpoint_data_from_catalog(&self.catalog))
+                .await
+            {
+                warn!(%e, "cannot create catalog checkpoint");
+
+                // That's somewhat OK. Don't fail the entire task, because the actual preservation was completed
+                // (both in-mem and within the preserved catalog).
+            }
+        }
+
+        Ok(())
+    }
+
     /// Stores an entry based on the configuration.
     pub async fn store_entry(&self, entry: Entry, time_of_write: DateTime<Utc>) -> Result<()> {
         let immutable = {
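Aside (not part of the diff): the checkpoint cadence used in `preserve_delete_predicates`, isolated as a tiny function; a checkpoint is attempted whenever the committed revision number is a multiple of the `catalog_transactions_until_checkpoint` lifecycle rule, and a failed checkpoint is logged rather than failing the task because the predicates were already preserved.

```rust
// Illustration only: mirrors the modulo check above.
fn should_checkpoint(revision_counter: u64, catalog_transactions_until_checkpoint: u64) -> bool {
    revision_counter % catalog_transactions_until_checkpoint == 0
}

fn main() {
    let every = 10; // e.g. a lifecycle rule of 10 transactions per checkpoint
    assert!(should_checkpoint(20, every));
    assert!(!should_checkpoint(21, every));
}
```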
@@ -3680,6 +3757,26 @@ mod tests {
         .await
         .unwrap();

+        // ==================== do: use background worker for a short while ====================
+        let iters_start = db.worker_iterations_delete_predicate_preservation();
+        let shutdown: CancellationToken = Default::default();
+        let shutdown_captured = shutdown.clone();
+        let db_captured = Arc::clone(&db);
+        let join_handle =
+            tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
+
+        let t_0 = Instant::now();
+        loop {
+            if db.worker_iterations_delete_predicate_preservation() > iters_start {
+                break;
+            }
+            assert!(t_0.elapsed() < Duration::from_secs(10));
+            tokio::time::sleep(Duration::from_millis(100)).await;
+        }
+
+        shutdown.cancel();
+        join_handle.await.unwrap();
+
         // ==================== check: delete predicates ====================
         let closure_check_delete_predicates = |db: &Db| {
             for chunk in db.catalog.chunks() {
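Aside (not part of the diff): the poll-with-deadline idiom used by the test above, as a synchronous stand-alone sketch; it waits for a counter to move past its starting value but gives up after a fixed timeout.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};

/// Poll `condition` until it is true or `timeout` elapses (illustration only).
fn wait_for(condition: impl Fn() -> bool, timeout: Duration) -> bool {
    let t_0 = Instant::now();
    while t_0.elapsed() < timeout {
        if condition() {
            return true;
        }
        std::thread::sleep(Duration::from_millis(10));
    }
    false
}

fn main() {
    let iterations = AtomicUsize::new(0);
    let iters_start = iterations.load(Ordering::Relaxed);

    // Pretend a background worker completed one loop iteration.
    iterations.fetch_add(1, Ordering::Relaxed);

    assert!(wait_for(
        || iterations.load(Ordering::Relaxed) > iters_start,
        Duration::from_secs(10),
    ));
}
```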
@@ -104,13 +104,13 @@ pub fn series_set_item_to_read_response(series_set_item: SeriesSetItem) -> Resul
     Ok(ReadResponse { frames })
 }

 /// Converts a [`GroupDescription`] into a storage gRPC `GroupFrame`
 /// format that can be returned to the client.
 fn group_description_to_frames(group_description: GroupDescription) -> Vec<Frame> {
-    // split key=value pairs into two separate vectors
-    let (tag_keys, partition_key_vals): (Vec<Vec<u8>>, Vec<Vec<u8>>) = group_description
-        .tags
-        .into_iter()
-        .map(|(k, v)| (k.bytes().collect(), v.bytes().collect()))
-        .unzip();
+    let GroupDescription { all_tags, gby_vals } = group_description;
+
+    let all_tags = all_tags.into_iter().map(|t| t.bytes().collect());

     // Flux expects there to be `_field` and `_measurement` as the
     // first two "tags". Note this means the lengths of tag_keys and
@@ -119,7 +124,12 @@ fn group_description_to_frames(group_description: GroupDescription) -> Vec<Frame
     // See https://github.com/influxdata/influxdb_iox/issues/2690 for gory details
     let tag_keys = vec![b"_field".to_vec(), b"_measurement".to_vec()]
         .into_iter()
-        .chain(tag_keys.into_iter())
+        .chain(all_tags)
         .collect::<Vec<_>>();

+    let partition_key_vals = gby_vals
+        .into_iter()
+        .map(|v| v.bytes().collect())
+        .collect::<Vec<_>>();
+
     let group_frame = GroupFrame {
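Aside (not part of the diff): a self-contained sketch of the resulting frame contents. `_field` and `_measurement` are prepended to every tag name from `all_tags`, and `partition_key_vals` is taken directly from `gby_vals`; plain byte vectors stand in for the protobuf `GroupFrame` fields.

```rust
use std::sync::Arc;

fn main() {
    let all_tags: Vec<Arc<str>> = vec![Arc::from("tag1"), Arc::from("tag2")];
    let gby_vals: Vec<Arc<str>> = vec![Arc::from("val1"), Arc::from("val2")];

    // `_field` and `_measurement` come first, then every tag name.
    let tag_keys: Vec<Vec<u8>> = vec![b"_field".to_vec(), b"_measurement".to_vec()]
        .into_iter()
        .chain(all_tags.into_iter().map(|t| t.bytes().collect()))
        .collect();

    // The partition key values are exactly the grouped-by values.
    let partition_key_vals: Vec<Vec<u8>> = gby_vals
        .into_iter()
        .map(|v| v.bytes().collect())
        .collect();

    assert_eq!(tag_keys.len(), 4); // _field, _measurement, tag1, tag2
    assert_eq!(partition_key_vals, vec![b"val1".to_vec(), b"val2".to_vec()]);
}
```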
@@ -541,10 +546,8 @@ mod tests {
     #[test]
     fn test_group_group_conversion() {
         let group_description = GroupDescription {
-            tags: vec![
-                (Arc::from("tag1"), Arc::from("val1")),
-                (Arc::from("tag2"), Arc::from("val2")),
-            ],
+            all_tags: vec![Arc::from("tag1"), Arc::from("tag2")],
+            gby_vals: vec![Arc::from("val1"), Arc::from("val2")],
         };

         let grouped_series_set_item = SeriesSetItem::GroupStart(group_description);
@@ -60,8 +60,8 @@ pub fn make_temp_file<C: AsRef<[u8]>>(contents: C) -> tempfile::NamedTempFile {
 }

 /// convert form that is easier to type in tests to what some code needs
-pub fn str_vec_to_arc_vec(str_vec: &[&str]) -> Arc<Vec<Arc<str>>> {
-    Arc::new(str_vec.iter().map(|s| Arc::from(*s)).collect())
+pub fn str_vec_to_arc_vec(str_vec: &[&str]) -> Vec<Arc<str>> {
+    str_vec.iter().map(|s| Arc::from(*s)).collect()
 }

 /// convert form that is easier to type in tests to what some code needs
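Aside (not part of the diff): the changed helper and how call sites adapt. It now returns a plain `Vec<Arc<str>>`, so assertions can compare against it directly, and the call sites that still need shared ownership wrap the result in `Arc::new` themselves.

```rust
use std::sync::Arc;

/// convert form that is easier to type in tests to what some code needs
fn str_vec_to_arc_vec(str_vec: &[&str]) -> Vec<Arc<str>> {
    str_vec.iter().map(|s| Arc::from(*s)).collect()
}

fn main() {
    // Direct comparison, as in the updated assertions.
    let tags = str_vec_to_arc_vec(&["tag_a", "tag_b"]);
    let expected: Vec<Arc<str>> = vec![Arc::from("tag_a"), Arc::from("tag_b")];
    assert_eq!(tags, expected);

    // Call sites that previously received Arc<Vec<Arc<str>>> add the wrapper.
    let tag_columns: Arc<Vec<Arc<str>>> = Arc::new(str_vec_to_arc_vec(&["tag_a"]));
    assert_eq!(tag_columns.len(), 1);
}
```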
@@ -213,6 +213,12 @@ impl ServerFixture {
         influxdb_iox_client::flight::Client::new(self.grpc_channel())
     }

+    /// Return a storage API client suitable for communicating with this
+    /// server
+    pub fn storage_client(&self) -> generated_types::storage_client::StorageClient<Connection> {
+        generated_types::storage_client::StorageClient::new(self.grpc_channel())
+    }
+
     /// Restart test server.
     ///
     /// This will break all currently connected clients!
@@ -4,7 +4,7 @@ use crate::common::server_fixture::ServerFixture;
 use futures::prelude::*;
 use generated_types::{
     aggregate::AggregateType,
-    google::protobuf::{Any, Empty},
+    google::protobuf::Empty,
     measurement_fields_response::FieldType,
     node::{Comparison, Type as NodeType, Value},
     read_group_request::Group,
@@ -335,24 +335,17 @@ pub async fn regex_operator_test() {
     );
 }

-#[tokio::test]
-pub async fn read_group_test() {
+/// Creates and loads the common data for read_group
+async fn read_group_setup() -> (ServerFixture, Scenario) {
     let fixture = ServerFixture::create_shared().await;
     let mut management = fixture.management_client();
-    let mut storage_client = StorageClient::new(fixture.grpc_channel());
     let influxdb2 = fixture.influxdb2_client();

     let scenario = Scenario::new();
     scenario.create_database(&mut management).await;

     load_read_group_data(&influxdb2, &scenario).await;

-    let read_source = scenario.read_source();
-
-    test_read_group_none_agg(&mut storage_client, &read_source).await;
-    test_read_group_none_agg_with_predicate(&mut storage_client, &read_source).await;
-    test_read_group_sum_agg(&mut storage_client, &read_source).await;
-    test_read_group_last_agg(&mut storage_client, &read_source).await;
+    (fixture, scenario)
 }

 async fn load_read_group_data(client: &influxdb2_client::Client, scenario: &Scenario) {
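Aside (not part of the diff): the shape of this refactor with stand-in types, shared setup in one function and each aggregate variant as its own test, so one failing variant no longer hides the others. The sketch uses plain `#[test]` functions to stay dependency-free; the real tests are `#[tokio::test]` and use the gRPC fixture.

```rust
// Stand-ins for the integration test fixture types (illustration only).
struct ServerFixture;
struct Scenario;

/// Creates and loads the common data for read_group (simplified).
fn read_group_setup() -> (ServerFixture, Scenario) {
    // create the database, write the cpu line protocol data, ...
    (ServerFixture, Scenario)
}

fn main() {
    let (_fixture, _scenario) = read_group_setup();
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_read_group_none_agg() {
        let (_fixture, _scenario) = read_group_setup();
        // build a ReadGroupRequest with no aggregate and compare frames ...
    }

    #[test]
    fn test_read_group_sum_agg() {
        let (_fixture, _scenario) = read_group_setup();
        // build a ReadGroupRequest with a Sum aggregate and compare frames ...
    }
}
```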
@@ -378,15 +371,16 @@ async fn load_read_group_data(client: &influxdb2_client::Client, scenario: &Scen
         .expect("Wrote cpu line protocol data");
 }

-// Standalone test for read_group with group keys and no aggregate
-// assumes that load_read_group_data has been previously run
-async fn test_read_group_none_agg(
-    storage_client: &mut StorageClient<Connection>,
-    read_source: &std::option::Option<Any>,
-) {
+/// Standalone test for read_group with group keys and no aggregate
+/// assumes that load_read_group_data has been previously run
+#[tokio::test]
+async fn test_read_group_none_agg() {
+    let (fixture, scenario) = read_group_setup().await;
+    let mut storage_client = fixture.storage_client();
+
     // read_group(group_keys: region, agg: None)
     let read_group_request = ReadGroupRequest {
-        read_source: read_source.clone(),
+        read_source: scenario.read_source(),
         range: Some(TimestampRange {
             start: 0,
             end: 2001, // include all data
@@ -401,7 +395,7 @@ async fn test_read_group_none_agg(
     };

     let expected_group_frames = vec![
-        "GroupFrame, tag_keys: _field,_measurement,cpu, partition_key_vals: cpu1",
+        "GroupFrame, tag_keys: _field,_measurement,cpu,host, partition_key_vals: cpu1",
         "SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
         "FloatPointsFrame, timestamps: [1000, 2000], values: \"20,21\"",
         "SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
@@ -410,7 +404,7 @@ async fn test_read_group_none_agg(
         "FloatPointsFrame, timestamps: [1000, 2000], values: \"10,11\"",
         "SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=foo, type: 0",
         "FloatPointsFrame, timestamps: [1000, 2000], values: \"71,72\"",
-        "GroupFrame, tag_keys: _field,_measurement,cpu, partition_key_vals: cpu2",
+        "GroupFrame, tag_keys: _field,_measurement,cpu,host, partition_key_vals: cpu2",
         "SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
         "FloatPointsFrame, timestamps: [1000, 2000], values: \"40,41\"",
         "SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
@@ -421,7 +415,7 @@ async fn test_read_group_none_agg(
         "FloatPointsFrame, timestamps: [1000, 2000], values: \"61,62\"",
     ];

-    let actual_group_frames = do_read_group_request(storage_client, read_group_request).await;
+    let actual_group_frames = do_read_group_request(&mut storage_client, read_group_request).await;

     assert_eq!(
         expected_group_frames, actual_group_frames,
@@ -431,12 +425,13 @@ async fn test_read_group_none_agg(
 }

 /// Test that predicates make it through
-async fn test_read_group_none_agg_with_predicate(
-    storage_client: &mut StorageClient<Connection>,
-    read_source: &std::option::Option<Any>,
-) {
+#[tokio::test]
+async fn test_read_group_none_agg_with_predicate() {
+    let (fixture, scenario) = read_group_setup().await;
+    let mut storage_client = fixture.storage_client();
+
     let read_group_request = ReadGroupRequest {
-        read_source: read_source.clone(),
+        read_source: scenario.read_source(),
         range: Some(TimestampRange {
             start: 0,
             end: 2000, // do not include data at timestamp 2000
@@ -451,19 +446,19 @@ async fn test_read_group_none_agg_with_predicate(
     };

     let expected_group_frames = vec![
-        "GroupFrame, tag_keys: _field,_measurement,cpu, partition_key_vals: cpu1",
+        "GroupFrame, tag_keys: _field,_measurement,cpu,host, partition_key_vals: cpu1",
         "SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
         "FloatPointsFrame, timestamps: [1000], values: \"20\"",
         "SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=foo, type: 0",
         "FloatPointsFrame, timestamps: [1000], values: \"10\"",
-        "GroupFrame, tag_keys: _field,_measurement,cpu, partition_key_vals: cpu2",
+        "GroupFrame, tag_keys: _field,_measurement,cpu,host, partition_key_vals: cpu2",
         "SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
         "FloatPointsFrame, timestamps: [1000], values: \"40\"",
         "SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=foo, type: 0",
         "FloatPointsFrame, timestamps: [1000], values: \"30\"",
     ];

-    let actual_group_frames = do_read_group_request(storage_client, read_group_request).await;
+    let actual_group_frames = do_read_group_request(&mut storage_client, read_group_request).await;

     assert_eq!(
         expected_group_frames, actual_group_frames,
@@ -475,13 +470,14 @@ async fn test_read_group_none_agg_with_predicate(
 // Standalone test for read_group with group keys and an actual
 // "aggregate" (not a "selector" style). assumes that
 // load_read_group_data has been previously run
-async fn test_read_group_sum_agg(
-    storage_client: &mut StorageClient<Connection>,
-    read_source: &std::option::Option<Any>,
-) {
+#[tokio::test]
+async fn test_read_group_sum_agg() {
+    let (fixture, scenario) = read_group_setup().await;
+    let mut storage_client = fixture.storage_client();
+
     // read_group(group_keys: region, agg: Sum)
     let read_group_request = ReadGroupRequest {
-        read_source: read_source.clone(),
+        read_source: scenario.read_source(),
         range: Some(TimestampRange {
             start: 0,
             end: 2001, // include all data
@@ -496,7 +492,7 @@ async fn test_read_group_sum_agg(
     };

     let expected_group_frames = vec![
-        "GroupFrame, tag_keys: _field,_measurement,cpu, partition_key_vals: cpu1",
+        "GroupFrame, tag_keys: _field,_measurement,cpu,host, partition_key_vals: cpu1",
         "SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
         "FloatPointsFrame, timestamps: [2000], values: \"41\"",
         "SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
@@ -505,7 +501,7 @@ async fn test_read_group_sum_agg(
         "FloatPointsFrame, timestamps: [2000], values: \"21\"",
         "SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=foo, type: 0",
         "FloatPointsFrame, timestamps: [2000], values: \"143\"",
-        "GroupFrame, tag_keys: _field,_measurement,cpu, partition_key_vals: cpu2",
+        "GroupFrame, tag_keys: _field,_measurement,cpu,host, partition_key_vals: cpu2",
         "SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
         "FloatPointsFrame, timestamps: [2000], values: \"81\"",
         "SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
@@ -516,7 +512,7 @@ async fn test_read_group_sum_agg(
         "FloatPointsFrame, timestamps: [2000], values: \"123\"",
     ];

-    let actual_group_frames = do_read_group_request(storage_client, read_group_request).await;
+    let actual_group_frames = do_read_group_request(&mut storage_client, read_group_request).await;

     assert_eq!(
         expected_group_frames, actual_group_frames,
@@ -528,13 +524,14 @@ async fn test_read_group_sum_agg(
 // Standalone test for read_group with group keys and an actual
 // "selector" function last. assumes that
 // load_read_group_data has been previously run
-async fn test_read_group_last_agg(
-    storage_client: &mut StorageClient<Connection>,
-    read_source: &std::option::Option<Any>,
-) {
+#[tokio::test]
+async fn test_read_group_last_agg() {
+    let (fixture, scenario) = read_group_setup().await;
+    let mut storage_client = fixture.storage_client();
+
     // read_group(group_keys: region, agg: Last)
     let read_group_request = ReadGroupRequest {
-        read_source: read_source.clone(),
+        read_source: scenario.read_source(),
         range: Some(TimestampRange {
             start: 0,
             end: 2001, // include all data
@@ -549,7 +546,7 @@ async fn test_read_group_last_agg(
     };

     let expected_group_frames = vec![
-        "GroupFrame, tag_keys: _field,_measurement,cpu, partition_key_vals: cpu1",
+        "GroupFrame, tag_keys: _field,_measurement,cpu,host, partition_key_vals: cpu1",
         "SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
         "FloatPointsFrame, timestamps: [2000], values: \"21\"",
         "SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
@@ -558,7 +555,7 @@ async fn test_read_group_last_agg(
         "FloatPointsFrame, timestamps: [2000], values: \"11\"",
         "SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=foo, type: 0",
         "FloatPointsFrame, timestamps: [2000], values: \"72\"",
-        "GroupFrame, tag_keys: _field,_measurement,cpu, partition_key_vals: cpu2",
+        "GroupFrame, tag_keys: _field,_measurement,cpu,host, partition_key_vals: cpu2",
         "SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
         "FloatPointsFrame, timestamps: [2000], values: \"41\"",
         "SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
@@ -569,7 +566,7 @@ async fn test_read_group_last_agg(
         "FloatPointsFrame, timestamps: [2000], values: \"62\"",
     ];

-    let actual_group_frames = do_read_group_request(storage_client, read_group_request).await;
+    let actual_group_frames = do_read_group_request(&mut storage_client, read_group_request).await;

     assert_eq!(
         expected_group_frames, actual_group_frames,