chore: Enable last SQL test, retention.sql (#6721)

Andrew Lamb 2023-01-31 13:46:50 +01:00 committed by GitHub
parent cff422b795
commit e8e50df692
8 changed files with 217 additions and 35 deletions

View File

@ -146,17 +146,15 @@ async fn pushdown() {
}
#[tokio::test]
#[ignore]
async fn retention() {
unimplemented!("See <https://github.com/influxdata/influxdb_iox/issues/6592>");
// test_helpers::maybe_start_logging();
//
// TestCase {
// input: "cases/in/retention.sql",
// chunk_stage: ChunkStage::Parquet,
// }
// .run()
// .await;
test_helpers::maybe_start_logging();
TestCase {
input: "cases/in/retention.sql",
chunk_stage: ChunkStage::Parquet,
}
.run()
.await;
}
#[tokio::test]

View File

@ -3,12 +3,13 @@
+------+------+----------------------+
| host | load | time |
+------+------+----------------------+
| a | 1 | 1970-01-01T00:00:00Z |
| b | 2 | 1970-01-01T00:00:00Z |
| bb | 21 | 1970-01-01T00:00:00Z |
| a | 1 | 2022-01-01T01:00:00Z |
| b | 2 | 2022-01-01T01:00:00Z |
| bb | 21 | 2022-01-01T01:00:00Z |
+------+------+----------------------+
-- SQL: EXPLAIN SELECT * FROM cpu order by host, load, time;
-- Results After Normalizing UUIDs
-- Results After Normalizing Filters
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
@ -23,10 +24,10 @@
| | SortPreservingMergeExec: [host@0 ASC,time@2 ASC] |
| | UnionExec |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: time@2 < -9223372036854775808 OR time@2 > -3600000000000 |
| | FilterExec: <REDACTED>
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[host@0 ASC, time@2 ASC], projection=[host, load, time] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: time@2 < -9223372036854775808 OR time@2 > -3600000000000 |
| | FilterExec: <REDACTED>
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, output_ordering=[host@0 ASC, time@2 ASC], projection=[host, load, time] |
| | |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
@ -34,11 +35,12 @@
+------+------+----------------------+
| host | load | time |
+------+------+----------------------+
| a | 1 | 1970-01-01T00:00:00Z |
| bb | 21 | 1970-01-01T00:00:00Z |
| a | 1 | 2022-01-01T01:00:00Z |
| bb | 21 | 2022-01-01T01:00:00Z |
+------+------+----------------------+
-- SQL: EXPLAIN SELECT * FROM cpu WHERE host != 'b' ORDER BY host,time;
-- Results After Normalizing UUIDs
-- Results After Normalizing Filters
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
@ -50,16 +52,16 @@
| | CoalescePartitionsExec |
| | ProjectionExec: expr=[host@0 as host, load@1 as load, time@2 as time] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: host@0 != b |
| | FilterExec: <REDACTED>
| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
| | DeduplicateExec: [host@0 ASC,time@2 ASC] |
| | SortPreservingMergeExec: [host@0 ASC,time@2 ASC] |
| | UnionExec |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: time@2 < -9223372036854775808 OR time@2 > -3600000000000 |
| | FilterExec: <REDACTED>
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=host != Dictionary(Int32, Utf8("b")), pruning_predicate=host_min@0 != b OR b != host_max@1, output_ordering=[host@0 ASC, time@2 ASC], projection=[host, load, time] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: time@2 < -9223372036854775808 OR time@2 > -3600000000000 |
| | FilterExec: <REDACTED>
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, predicate=host != Dictionary(Int32, Utf8("b")), pruning_predicate=host_min@0 != b OR b != host_max@1, output_ordering=[host@0 ASC, time@2 ASC], projection=[host, load, time] |
| | |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

View File

@ -5,12 +5,12 @@
SELECT * FROM cpu order by host, load, time;
-- should see only 2 chunks with predicate pushed down to ParquetExec
-- IOX_COMPARE: uuid
-- IOX_COMPARE: uuid, filters
EXPLAIN SELECT * FROM cpu order by host, load, time;
-- should return 2 rows
SELECT * FROM cpu WHERE host != 'b' ORDER BY host,time;
-- should see only 2 chunks with predicate pushed down to ParquetExec
-- IOX_COMPARE: uuid
-- IOX_COMPARE: uuid, filters
EXPLAIN SELECT * FROM cpu WHERE host != 'b' ORDER BY host,time;

View File

@ -5,6 +5,7 @@
//! -- IOX_SETUP: [test name]
//! ```
use iox_time::{SystemProvider, Time, TimeProvider};
use once_cell::sync::Lazy;
use std::collections::HashMap;
use test_helpers_end_to_end::Step;
@ -14,6 +15,9 @@ pub type SetupName = &'static str;
/// The steps that should be run when this setup is chosen.
pub type SetupSteps = Vec<Step>;
/// timestamps for the retention test
static RETENTION_SETUP: Lazy<RetentionSetup> = Lazy::new(RetentionSetup::new);
/// All possible setups for the [`TestCase`][crate::TestCase]s to use, indexed by name
pub static SETUPS: Lazy<HashMap<SetupName, SetupSteps>> = Lazy::new(|| {
HashMap::from([
@ -997,6 +1001,30 @@ pub static SETUPS: Lazy<HashMap<SetupName, SetupSteps>> = Lazy::new(|| {
},
],
),
(
"ThreeChunksWithRetention",
vec![
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(RETENTION_SETUP.lp_partially_inside.clone()),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(RETENTION_SETUP.lp_fully_inside.clone()),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(RETENTION_SETUP.lp_fully_outside.clone()),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
Step::SetRetention(Some(RETENTION_SETUP.retention_period_ns)),
],
),
(
// Test data to validate fix for
// <https://github.com/influxdata/influxdb_iox/issues/2890>
@ -1025,3 +1053,77 @@ pub static SETUPS: Lazy<HashMap<SetupName, SetupSteps>> = Lazy::new(|| {
),
])
});
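The `RETENTION_SETUP` static above is a `once_cell::sync::Lazy`, so `RetentionSetup::new()` runs exactly once, on first access; every step that clones line protocol from it, and the final `Step::SetRetention`, therefore sees the same timestamps and the same retention period. A minimal illustration of that behaviour (standalone sketch, not part of this diff):

```rust
use once_cell::sync::Lazy;

static ANSWER: Lazy<u64> = Lazy::new(|| {
    // Runs only on the first dereference of `ANSWER`.
    println!("initializing once");
    42
});

fn main() {
    assert_eq!(*ANSWER, 42); // initializer runs here
    assert_eq!(*ANSWER, 42); // cached value; the initializer does not run again
}
```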
/// Holds parameters for retention period. Tests based on this need to
/// be run within the hour of being created.
///
/// ```text (cut off) (now)
/// time: -----------------------|--------------------------|>
///
/// partially_inside: |-----------------|
///
/// fully_inside: +1h |-----------|
///
/// fully_outside: |---------|
/// ```
///
/// Note this setup is only good for 1 hour after it was created.
struct RetentionSetup {
/// The retention period, relative to `now()`, that the three data
/// chunks fall inside or outside of
retention_period_ns: i64,
/// lineprotocol data partially inside retention
lp_partially_inside: String,
/// lineprotocol data fully inside (included in query)
lp_fully_inside: String,
/// lineprotocol data fully outside (excluded from query)
lp_fully_outside: String,
}
impl RetentionSetup {
fn new() -> Self {
let retention_period_1_hour_ns = 3600 * 1_000_000_000;
// Data is relative to this particular time stamp
let cutoff = Time::from_rfc3339("2022-01-01T00:00:00+00:00")
.unwrap()
.timestamp_nanos();
// Timestamp 1 hour later than the cutoff, so the data will be retained for 1 hour
let inside_retention = cutoff + retention_period_1_hour_ns;
let outside_retention = cutoff - 10; // before retention
let lp_partially_inside = format!(
"cpu,host=a load=1 {inside_retention}\n\
cpu,host=aa load=11 {outside_retention}"
);
let lp_fully_inside = format!(
"cpu,host=b load=2 {inside_retention}\n\
cpu,host=bb load=21 {inside_retention}"
);
let lp_fully_outside = format!(
"cpu,host=z load=3 {outside_retention}\n\
cpu,host=zz load=31 {outside_retention}"
);
// Set retention period to be at the cutoff date. Note that
// since real world time advances, after 1 hour of real world
// time the data that is inside the retention interval will
// move outside (and thus not appear in queries).
//
// Thus this setup is only valid for 1 hour.
let retention_period_ns = SystemProvider::new().now().timestamp_nanos() - cutoff;
Self {
// translate the retention period to be relative to now
retention_period_ns,
lp_partially_inside,
lp_fully_inside,
lp_fully_outside,
}
}
}
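To make the "only good for 1 hour" caveat above concrete, here is a hedged, standalone sketch of the timestamp arithmetic (plain `i64` nanoseconds; `t0` is an illustrative stand-in for the wall-clock instant at which `RetentionSetup::new()` runs, not a value from this diff):

```rust
fn main() {
    let hour_ns: i64 = 3600 * 1_000_000_000;

    // Fixed cutoff used by the setup: 2022-01-01T00:00:00Z as epoch nanoseconds.
    let cutoff: i64 = 1_640_995_200 * 1_000_000_000;
    // "Inside" data is written at cutoff + 1h, i.e. 2022-01-01T01:00:00Z.
    let inside_retention = cutoff + hour_ns;
    // "Outside" data sits just before the cutoff.
    let outside_retention = cutoff - 10;

    // Assume the setup is created roughly 400 days after the cutoff.
    let t0 = cutoff + 400 * 24 * hour_ns;
    // The retention period is frozen at creation time so that the retention
    // boundary (now - retention_period) initially equals the cutoff.
    let retention_period_ns = t0 - cutoff;
    assert_eq!(t0 - retention_period_ns, cutoff);
    assert!(outside_retention < t0 - retention_period_ns); // always excluded

    // Queried within an hour of t0, the boundary is still below
    // 2022-01-01T01:00:00Z, so the "inside" rows are retained.
    let thirty_min_later = t0 + 30 * 60 * 1_000_000_000;
    assert!(inside_retention > thirty_min_later - retention_period_ns);

    // One hour after t0 the moving boundary overtakes the "inside" rows,
    // which is why the setup (and the retention.sql test) goes stale.
    let just_over_an_hour_later = t0 + hour_ns + 1;
    assert!(inside_retention < just_over_an_hour_later - retention_period_ns);
}
```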

View File

@ -32,6 +32,9 @@ impl Client {
}
/// Create a namespace
///
/// `retention_period_ns` is the retention period in nanoseconds, measured from `now()`.
/// `None` represents infinite retention (i.e. never drop data).
pub async fn create_namespace(
&mut self,
namespace: &str,
@ -49,6 +52,9 @@ impl Client {
}
/// Update retention for a namespace
///
/// `retention_period_ns` is the retention period in nanoseconds, measured from `now()`.
/// `None` represents infinite retention (i.e. never drop data).
pub async fn update_namespace_retention(
&mut self,
namespace: &str,

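A usage note for the two documented calls: a minimal sketch, assuming a gRPC `Connection` to the router has already been built (the namespace name and the surrounding function are illustrative, not part of this diff):

```rust
use influxdb_iox_client::{connection::Connection, namespace::Client};

async fn set_one_hour_retention(connection: Connection) {
    let mut client = Client::new(connection);

    // Keep data for one hour, measured from `now()`.
    client
        .update_namespace_retention("my_namespace", Some(3_600_000_000_000))
        .await
        .expect("updating retention period");

    // `None` means infinite retention, i.e. never drop data.
    client
        .update_namespace_retention("my_namespace", None)
        .await
        .expect("clearing retention period");
}
```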
View File

@ -5,7 +5,7 @@ use std::sync::Arc;
use data_types::{Namespace as CatalogNamespace, QueryPoolId, TopicId};
use generated_types::influxdata::iox::namespace::v1::*;
use iox_catalog::interface::Catalog;
use observability_deps::tracing::warn;
use observability_deps::tracing::{debug, warn};
use tonic::{Request, Response, Status};
/// Implementation of the gRPC namespace service
@ -59,18 +59,24 @@ impl namespace_service_server::NamespaceService for NamespaceService {
let mut repos = self.catalog.repositories().await;
let req = request.into_inner();
let CreateNamespaceRequest {
name: namespace_name,
retention_period_ns,
} = request.into_inner();
debug!(%namespace_name, ?retention_period_ns, "Creating namespace");
let namespace = repos
.namespaces()
.create(
&req.name,
req.retention_period_ns,
&namespace_name,
retention_period_ns,
self.topic_id.unwrap(),
self.query_id.unwrap(),
)
.await
.map_err(|e| {
warn!(error=%e, %req.name, "failed to create namespace");
warn!(error=%e, %namespace_name, "failed to create namespace");
Status::internal(e.to_string())
})?;
@ -85,17 +91,21 @@ impl namespace_service_server::NamespaceService for NamespaceService {
return Err(Status::invalid_argument("topic_id or query_id not set"));
}
let mut repos = self.catalog.repositories().await;
let req = request.into_inner();
let DeleteNamespaceRequest {
name: namespace_name,
} = request.into_inner();
debug!(%namespace_name, "Deleting namespace");
repos
.namespaces()
.delete(&req.name)
.delete(&namespace_name)
.await
.map_err(|e| match e {
iox_catalog::interface::Error::NamespaceNotFoundByName { name: _ } => {
Status::not_found(e.to_string())
}
_ => {
warn!(error=%e, %req.name, "failed to delete namespace");
warn!(error=%e, %namespace_name, "failed to delete namespace");
Status::internal(e.to_string())
}
})?;
@ -109,13 +119,19 @@ impl namespace_service_server::NamespaceService for NamespaceService {
) -> Result<Response<UpdateNamespaceRetentionResponse>, Status> {
let mut repos = self.catalog.repositories().await;
let req = request.into_inner();
let UpdateNamespaceRetentionRequest {
name: namespace_name,
retention_period_ns,
} = request.into_inner();
debug!(%namespace_name, ?retention_period_ns, "Updating namespace retention");
let namespace = repos
.namespaces()
.update_retention_period(&req.name, req.retention_period_ns)
.update_retention_period(&namespace_name, retention_period_ns)
.await
.map_err(|e| {
warn!(error=%e, %req.name, "failed to update namespace retention");
warn!(error=%e, %namespace_name, "failed to update namespace retention");
Status::not_found(e.to_string())
})?;
Ok(Response::new(UpdateNamespaceRetentionResponse {

View File

@ -76,6 +76,9 @@ pub async fn run(
if q.normalized_metrics() {
output.push("-- Results After Normalizing Metrics".into())
}
if q.normalized_filters() {
output.push("-- Results After Normalizing Filters".into())
}
let results = run_query(cluster, q).await?;
output.extend(results);
@ -264,6 +267,25 @@ async fn run_query(cluster: &MiniCluster, query: &Query) -> Result<Vec<String>>
.collect();
}
// normalize Filters, if requested
//
// Converts:
// FilterExec: time@2 < -9223372036854775808 OR time@2 > 1640995204240217000
//
// to
// FilterExec: <REDACTED>
if query.normalized_filters() {
let filter_regex = Regex::new("FilterExec: .*").expect("filter regex");
current_results = current_results
.into_iter()
.map(|s| {
filter_regex
.replace_all(&s, |_: &Captures| "FilterExec: <REDACTED>")
.to_string()
})
.collect();
}
Ok(current_results)
}
@ -277,9 +299,16 @@ pub struct Query {
/// If true, replace UUIDs with static placeholders.
normalized_uuids: bool,
/// If true, normalize timings in queries by replacing them with static placeholders.
/// If true, normalize timings in queries by replacing them with
/// static placeholders, for example:
///
/// `1s` -> `1.234ms`
normalized_metrics: bool,
/// If true, normalize filter predicates in explain plans to
/// `FilterExec: <REDACTED>`
normalized_filters: bool,
/// The SQL string
sql: String,
}
@ -292,6 +321,7 @@ impl Query {
sorted_compare: false,
normalized_uuids: false,
normalized_metrics: false,
normalized_filters: false,
sql,
}
}
@ -321,6 +351,11 @@ impl Query {
pub fn normalized_metrics(&self) -> bool {
self.normalized_metrics
}
/// Use normalized filter plans
pub fn normalized_filters(&self) -> bool {
self.normalized_filters
}
}
#[derive(Debug, Default)]
@ -353,6 +388,10 @@ impl QueryBuilder {
self.query.normalized_metrics = true;
}
fn normalize_filters(&mut self) {
self.query.normalized_filters = true;
}
fn is_empty(&self) -> bool {
self.query.sql.is_empty()
}
@ -396,6 +435,9 @@ impl TestQueries {
"metrics" => {
builder.normalize_metrics();
}
"filters" => {
builder.normalize_filters();
}
_ => {}
}
}
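The filter normalization added above is a plain regex rewrite. A small standalone illustration of its effect on one line of a rendered EXPLAIN plan (assumes the `regex` crate, which the harness already uses; the sample line is taken from the comment above):

```rust
use regex::{Captures, Regex};

fn main() {
    let filter_regex = Regex::new("FilterExec: .*").expect("filter regex");

    // One row of a rendered EXPLAIN plan, before normalization.
    let line =
        "|   | FilterExec: time@2 < -9223372036854775808 OR time@2 > 1640995204240217000 |";

    let normalized = filter_regex
        .replace_all(line, |_: &Captures| "FilterExec: <REDACTED>")
        .to_string();

    // `.*` runs to the end of the line, so the predicate and the trailing
    // table padding/border are replaced as well, which is why the redacted
    // lines in the expected output above have no closing `|`.
    assert_eq!(normalized, "|   | FilterExec: <REDACTED>");
}
```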

View File

@ -174,6 +174,11 @@ pub enum Step {
/// know about the ingester, so the test needs to ask the ingester directly.
WaitForPersistedAccordingToIngester,
/// Set the namespace retention interval to a retention period,
/// specified in ns relative to `now()`. `None` represents infinite retention
/// (i.e. never drop data).
SetRetention(Option<i64>),
/// Run one hot and one cold compaction operation and wait for it to finish.
Compact,
@ -358,6 +363,17 @@ where
state.cluster.run_compaction();
info!("====Done running compaction");
}
Step::SetRetention(retention_period_ns) => {
info!("====Begin setting retention period to {retention_period_ns:?}");
let namespace = state.cluster().namespace();
let router_connection = state.cluster().router().router_grpc_connection();
let mut client = influxdb_iox_client::namespace::Client::new(router_connection);
client
.update_namespace_retention(namespace, *retention_period_ns)
.await
.expect("Error updating retention period");
info!("====Done setting retention period");
}
Step::Query { sql, expected } => {
info!("====Begin running SQL query: {}", sql);
// run query