Merge branch 'main' into cn/last-available

pull/24376/head
kodiakhq[bot] 2022-05-23 13:08:19 +00:00 committed by GitHub
commit a06746c715
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 146 additions and 66 deletions

View File

@ -131,6 +131,7 @@ jobs:
doc: doc:
docker: docker:
- image: quay.io/influxdb/rust:ci - image: quay.io/influxdb/rust:ci
resource_class: medium+ # use of a smaller executor runs out of memory
environment: environment:
# Disable incremental compilation to avoid overhead. We are not preserving these files anyway. # Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
CARGO_INCREMENTAL: "0" CARGO_INCREMENTAL: "0"

12
Cargo.lock generated
View File

@ -2008,9 +2008,9 @@ dependencies = [
[[package]] [[package]]
name = "http-body" name = "http-body"
version = "0.4.4" version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6" checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1"
dependencies = [ dependencies = [
"bytes", "bytes",
"http", "http",
@ -4416,9 +4416,9 @@ dependencies = [
[[package]] [[package]]
name = "regex" name = "regex"
version = "1.5.5" version = "1.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a11647b6b25ff05a515cb92c365cec08801e83423a235b51e231e1808747286" checksum = "d83f127d94bdbcda4c8cc2e50f6f84f4b611f69c902699ca385a39c3a75f9ff1"
dependencies = [ dependencies = [
"aho-corasick", "aho-corasick",
"memchr", "memchr",
@ -4436,9 +4436,9 @@ dependencies = [
[[package]] [[package]]
name = "regex-syntax" name = "regex-syntax"
version = "0.6.25" version = "0.6.26"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b" checksum = "49b3de9ec5dc0a3417da371aab17d729997c15010e7fd24ff707773a33bddb64"
[[package]] [[package]]
name = "remove_dir_all" name = "remove_dir_all"

View File

@ -1887,8 +1887,8 @@ mod tests {
// They should be 2 groups // They should be 2 groups
assert_eq!(groups.len(), 2, "There should have been two group"); assert_eq!(groups.len(), 2, "There should have been two group");
groups[0].parquet_files.contains(&pf1); assert!(groups[0].parquet_files.contains(&pf1));
groups[1].parquet_files.contains(&pf2); assert!(groups[1].parquet_files.contains(&pf2));
} }
#[test] #[test]

View File

@ -1,10 +1,11 @@
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)] #![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
// `clippy::use_self` is deliberately excluded from the lints this crate uses.
// See <https://github.com/rust-lang/rust-clippy/issues/6902>.
#![warn( #![warn(
missing_copy_implementations, missing_copy_implementations,
missing_debug_implementations, missing_debug_implementations,
missing_docs, missing_docs,
clippy::explicit_iter_loop, clippy::explicit_iter_loop,
clippy::use_self,
clippy::clone_on_ref_ptr, clippy::clone_on_ref_ptr,
clippy::future_not_send clippy::future_not_send
)] )]

View File

@ -71,10 +71,13 @@ impl ServerFixture {
let shared_server = SHARED_SERVER.get_or_init(|| parking_lot::Mutex::new(Weak::new())); let shared_server = SHARED_SERVER.get_or_init(|| parking_lot::Mutex::new(Weak::new()));
let mut shared_server = shared_server.lock(); let shared_upgraded = {
let locked = shared_server.lock();
locked.upgrade()
};
// is a shared server already present? // is a shared server already present?
let server = match shared_server.upgrade() { let server = match shared_upgraded {
Some(server) => server, Some(server) => server,
None => { None => {
// if not, create one // if not, create one
@ -86,11 +89,11 @@ impl ServerFixture {
// save a reference for other threads that may want to // save a reference for other threads that may want to
// use this server, but don't prevent it from being // use this server, but don't prevent it from being
// destroyed when going out of scope // destroyed when going out of scope
let mut shared_server = shared_server.lock();
*shared_server = Arc::downgrade(&server); *shared_server = Arc::downgrade(&server);
server server
} }
}; };
std::mem::drop(shared_server);
Self { server } Self { server }
} }

View File

@ -2206,8 +2206,8 @@ mod tests {
.await .await
.unwrap(); .unwrap();
{ {
let tables = data.tables.read(); let table_data = data.table_data("mem").unwrap();
let table = tables.get("mem").unwrap().read().await; let table = table_data.read().await;
let p = table.partition_data.get("1970-01-01").unwrap(); let p = table.partition_data.get("1970-01-01").unwrap();
assert_eq!( assert_eq!(
p.data.max_persisted_sequence_number, p.data.max_persisted_sequence_number,
@ -2229,8 +2229,8 @@ mod tests {
.await .await
.unwrap(); .unwrap();
let tables = data.tables.read(); let table_data = data.table_data("mem").unwrap();
let table = tables.get("mem").unwrap().read().await; let table = table_data.read().await;
let partition = table.partition_data.get("1970-01-01").unwrap(); let partition = table.partition_data.get("1970-01-01").unwrap();
assert_eq!( assert_eq!(
partition.data.buffer.as_ref().unwrap().min_sequence_number, partition.data.buffer.as_ref().unwrap().min_sequence_number,

View File

@ -392,7 +392,7 @@ mod test {
fn quote_not_printable() { fn quote_not_printable() {
assert_eq!(quote_and_escape("foo\nbar"), r#""foo\nbar""#); assert_eq!(quote_and_escape("foo\nbar"), r#""foo\nbar""#);
assert_eq!(quote_and_escape("foo\r\nbar"), r#""foo\r\nbar""#); assert_eq!(quote_and_escape("foo\r\nbar"), r#""foo\r\nbar""#);
assert_eq!(quote_and_escape("foo\0bar"), r#""foo\u{0}bar""#); assert_eq!(quote_and_escape("foo\0bar"), r#""foo\0bar""#);
} }
#[test] #[test]

View File

@ -13,7 +13,7 @@ itertools = "0.10.2"
lazy_static = "1.4.0" lazy_static = "1.4.0"
observability_deps = { path = "../observability_deps" } observability_deps = { path = "../observability_deps" }
regex = "1" regex = "1"
regex-syntax = "0.6.25" regex-syntax = "0.6.26"
schema = { path = "../schema" } schema = { path = "../schema" }
snafu = "0.7" snafu = "0.7"
workspace-hack = { path = "../workspace-hack"} workspace-hack = { path = "../workspace-hack"}

View File

@ -0,0 +1,8 @@
-- Test Setup: TwoChunksMissingColumns
-- SQL: SELECT * from "table" order by time;
+--------+--------+--------+------+------+------+--------------------------------+
| field1 | field2 | field3 | tag1 | tag2 | tag3 | time |
+--------+--------+--------+------+------+------+--------------------------------+
| 10 | 11 | | a | b | | 1970-01-01T00:00:00.000000100Z |
| 20 | | 22 | a | | c | 1970-01-01T00:00:00.000000200Z |
+--------+--------+--------+------+------+------+--------------------------------+

View File

@ -0,0 +1,5 @@
-- Basic query tests
-- IOX_SETUP: TwoChunksMissingColumns
-- query data
SELECT * from "table" order by time;

View File

@ -2,7 +2,7 @@
//! This file is auto generated by query_tests/generate. //! This file is auto generated by query_tests/generate.
//! Do not edit manually --> will result in sadness //! Do not edit manually --> will result in sadness
use std::path::Path; use std::path::Path;
use crate::runner::{Runner, make_output_path, read_file}; use crate::runner::Runner;
#[tokio::test] #[tokio::test]
// Tests from "basic.sql", // Tests from "basic.sql",
@ -117,9 +117,9 @@ async fn test_cases_new_sql_system_tables_sql() {
} }
#[tokio::test] #[tokio::test]
// Tests from "two_chunks.sql", // Tests from "pushdown.sql",
async fn test_cases_two_sql() { async fn test_cases_pushdown_sql() {
let input_path = Path::new("cases").join("in").join("two_chunks.sql"); let input_path = Path::new("cases").join("in").join("pushdown.sql");
let mut runner = Runner::new(); let mut runner = Runner::new();
runner runner
.run(input_path) .run(input_path)
@ -134,28 +134,6 @@ async fn test_cases_two_sql() {
// Tests from "several_chunks.sql", // Tests from "several_chunks.sql",
async fn test_cases_several_chunks_sql() { async fn test_cases_several_chunks_sql() {
let input_path = Path::new("cases").join("in").join("several_chunks.sql"); let input_path = Path::new("cases").join("in").join("several_chunks.sql");
let output_path = make_output_path(&input_path).unwrap();
let expected_path = input_path.with_extension("expected");
let mut runner = Runner::new();
let result = runner
.run(input_path)
.await;
if result.is_err() {
let output_contents = read_file(&output_path);
let expected_contents = read_file(&expected_path);
pretty_assertions::assert_eq!(expected_contents, output_contents);
} else {
runner
.flush()
.expect("flush worked");
}
}
#[tokio::test]
// Tests from "pushdown.sql",
async fn test_cases_pushdown_sql() {
let input_path = Path::new("cases").join("in").join("pushdown.sql");
let mut runner = Runner::new(); let mut runner = Runner::new();
runner runner
.run(input_path) .run(input_path)
@ -193,3 +171,31 @@ async fn test_cases_timestamps_sql() {
.flush() .flush()
.expect("flush worked"); .expect("flush worked");
} }
#[tokio::test]
// Tests from "two_chunks.sql",
async fn test_cases_two_chunks_sql() {
let input_path = Path::new("cases").join("in").join("two_chunks.sql");
let mut runner = Runner::new();
runner
.run(input_path)
.await
.expect("test failed");
runner
.flush()
.expect("flush worked");
}
#[tokio::test]
// Tests from "two_chunks_missing_columns.sql",
async fn test_cases_two_chunks_missing_columns_sql() {
let input_path = Path::new("cases").join("in").join("two_chunks_missing_columns.sql");
let mut runner = Runner::new();
runner
.run(input_path)
.await
.expect("test failed");
runner
.flush()
.expect("flush worked");
}

View File

@ -61,6 +61,7 @@ pub fn get_all_setups() -> &'static HashMap<String, Arc<dyn DbSetup>> {
register_setup!(OneMeasurementRealisticTimes), register_setup!(OneMeasurementRealisticTimes),
register_setup!(TwoMeasurementsManyFieldsTwoChunks), register_setup!(TwoMeasurementsManyFieldsTwoChunks),
register_setup!(ManyFieldsSeveralChunks), register_setup!(ManyFieldsSeveralChunks),
register_setup!(TwoChunksMissingColumns),
] ]
.into_iter() .into_iter()
.map(|(name, setup)| (name.to_string(), setup as Arc<dyn DbSetup>)) .map(|(name, setup)| (name.to_string(), setup as Arc<dyn DbSetup>))

View File

@ -1322,3 +1322,30 @@ impl DbSetup for MeasurementForDefect2890 {
all_scenarios_for_one_chunk(vec![], vec![], lp, "mm", partition_key).await all_scenarios_for_one_chunk(vec![], vec![], lp, "mm", partition_key).await
} }
} }
#[derive(Debug)]
pub struct TwoChunksMissingColumns {}
#[async_trait]
impl DbSetup for TwoChunksMissingColumns {
async fn make(&self) -> Vec<DbScenario> {
let partition_key1 = "a";
let partition_key2 = "b";
let lp_lines1 = vec!["table,tag1=a,tag2=b field1=10,field2=11 100"];
let lp_lines2 = vec!["table,tag1=a,tag3=c field1=20,field3=22 200"];
make_n_chunks_scenario(&[
ChunkData {
lp_lines: lp_lines1,
partition_key: partition_key1,
..Default::default()
},
ChunkData {
lp_lines: lp_lines2,
partition_key: partition_key2,
..Default::default()
},
])
.await
}
}

View File

@ -94,7 +94,7 @@ impl<'a, 'b> ChunkData<'a, 'b> {
} }
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ChunkStage { pub enum ChunkStage {
/// In parquet file. /// In parquet file.
Parquet, Parquet,
@ -383,25 +383,17 @@ pub async fn make_n_chunks_scenario(chunks: &[ChunkData<'_, '_>]) -> Vec<DbScena
let mut scenarios = vec![]; let mut scenarios = vec![];
for stages in ChunkStage::all() 'stage_combinations: for stages in ChunkStage::all()
.into_iter() .into_iter()
.combinations_with_replacement(n_stages_unset) .combinations_with_replacement(n_stages_unset)
.flat_map(|v| v.into_iter().permutations(n_stages_unset))
.unique()
{ {
// filter out unordered stages // combine stages w/ chunks
if !stages.windows(2).all(|stages| { let chunks_orig = chunks;
stages[0] let mut chunks = Vec::with_capacity(chunks.len());
.partial_cmp(&stages[1])
.map(|o| o.is_le())
.unwrap_or_default()
}) {
continue;
}
let mut scenario_name = format!("{} chunks:", chunks.len());
let mut stages_it = stages.iter(); let mut stages_it = stages.iter();
let mut mock_ingester = MockIngester::new().await; for chunk_data in chunks_orig {
for chunk_data in chunks {
let mut chunk_data = chunk_data.clone(); let mut chunk_data = chunk_data.clone();
if chunk_data.chunk_stage.is_none() { if chunk_data.chunk_stage.is_none() {
@ -411,13 +403,43 @@ pub async fn make_n_chunks_scenario(chunks: &[ChunkData<'_, '_>]) -> Vec<DbScena
let chunk_data = chunk_data.replace_begin_and_end_delete_times(); let chunk_data = chunk_data.replace_begin_and_end_delete_times();
chunks.push(chunk_data);
}
assert!(stages_it.next().is_none(), "generated too many stages");
// filter out unordered stages
let mut stage_by_partition = HashMap::<&str, Vec<ChunkStage>>::new();
for chunk_data in &chunks {
stage_by_partition
.entry(chunk_data.partition_key)
.or_default()
.push(
chunk_data
.chunk_stage
.expect("Stage should be initialized by now"),
);
}
for stages in stage_by_partition.values() {
if !stages.windows(2).all(|stages| {
stages[0]
.partial_cmp(&stages[1])
.map(|o| o.is_le())
.unwrap_or_default()
}) {
continue 'stage_combinations;
}
}
// build scenario
let mut scenario_name = format!("{} chunks:", chunks.len());
let mut mock_ingester = MockIngester::new().await;
for chunk_data in chunks {
let name = make_chunk(&mut mock_ingester, chunk_data).await; let name = make_chunk(&mut mock_ingester, chunk_data).await;
write!(&mut scenario_name, ", {}", name).unwrap(); write!(&mut scenario_name, ", {}", name).unwrap();
} }
assert!(stages_it.next().is_none(), "generated too many stages");
let db = mock_ingester.into_query_namespace().await; let db = mock_ingester.into_query_namespace().await;
scenarios.push(DbScenario { scenario_name, db }); scenarios.push(DbScenario { scenario_name, db });
} }
@ -550,7 +572,10 @@ async fn make_chunk(mock_ingester: &mut MockIngester, chunk: ChunkData<'_, '_>)
} }
} }
let mut name = format!("Chunk {}", chunk_stage); let mut name = format!(
"Chunk stage={} partition={}",
chunk_stage, chunk.partition_key
);
let n_preds = chunk.preds.len(); let n_preds = chunk.preds.len();
if n_preds > 0 { if n_preds > 0 {
let delete_names: Vec<_> = chunk let delete_names: Vec<_> = chunk

View File

@ -1,3 +1,3 @@
[toolchain] [toolchain]
channel = "1.60" channel = "1.61"
components = [ "rustfmt", "clippy" ] components = [ "rustfmt", "clippy" ]

View File

@ -20,7 +20,7 @@ arrow = { version = "14.0.0", features = ["prettyprint"] }
async-trait = "0.1" async-trait = "0.1"
futures = "0.3" futures = "0.3"
prost = "0.10" prost = "0.10"
regex = "1.5.4" regex = "1.5.6"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.81" serde_json = "1.0.81"
snafu = "0.7" snafu = "0.7"

View File

@ -294,9 +294,12 @@ unsafe impl<R: lock_api::RawRwLockUpgrade + Sized> lock_api::RawRwLockUpgrade
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::time::Duration; // Clippy isn't recognizing the explicit drops; none of these locks are actually being held
// across await points. See <https://github.com/rust-lang/rust-clippy/issues/6446>
#![allow(clippy::await_holding_lock)]
use super::*; use super::*;
use std::time::Duration;
#[test] #[test]
fn test_counts() { fn test_counts() {