Merge branch 'main' into pd/add-data-generator-schemas

pull/24376/head
kodiakhq[bot] 2021-12-13 15:07:06 +00:00 committed by GitHub
commit 967a41f7a4
6 changed files with 208 additions and 13 deletions


@@ -16,8 +16,9 @@ We hold monthly Tech Talks that explain the project's technical underpinnings. Y
* May 2021: Catalogs - Turning a Set of Parquet Files into a Data Set [recording](https://www.youtube.com/watch?v=Zaei3l3qk0c), [slides](https://www.slideshare.net/influxdata/catalogs-turning-a-set-of-parquet-files-into-a-data-set)
* June 2021: Performance Profiling in Rust [recording](https://www.youtube.com/watch?v=_ZNcg-nAVTM), [slides](https://www.slideshare.net/influxdata/performance-profiling-in-rust)
* July 2021: Impacts of Sharding, Partitioning, Encoding & Sorting on Distributed Query Performance [recording](https://www.youtube.com/watch?v=VHYMpItvBZQ), [slides](https://www.slideshare.net/influxdata/impacts-of-sharding-partitioning-encoding-and-sorting-on-distributed-query-performance)
* September 2021: Observability of InfluxDB IOx Tracing, Metrics and System Tables [recording](https://www.youtube.com/watch?v=tB-umdJCJQc)
* October 2021: Query Processing in InfluxDB IOx [recording](https://www.youtube.com/watch?v=9DYkWuM8xco)
* September 2021: Observability of InfluxDB IOx Tracing, Metrics and System Tables [recording](https://www.youtube.com/watch?v=tB-umdJCJQc), [slides](https://www.slideshare.net/influxdata/observability-of-influxdb-iox-tracing-metrics-and-system-tables)
* October 2021: Query Processing in InfluxDB IOx [recording](https://www.youtube.com/watch?v=9DYkWuM8xco), [slides](https://www.slideshare.net/influxdata/influxdb-iox-tech-talks-query-processing-in-influxdb-iox)
* November 2021: The Impossible Dream: Easy-to-Use, Super Fast Software and Simple Implementation [recording](https://www.youtube.com/watch?v=kK_7t24dQ-Q&list=PLYt2jfZorkDp-PKBS05kf2Yx2NrRyPAAz&index=2&t=122s), [slides](https://www.slideshare.net/influxdata/influxdb-iox-tech-talks-the-impossible-dream-easytouse-super-fast-software-and-simple-implementation)
## Table of Contents:


@@ -6,6 +6,8 @@ use iox_data_generator::{
tag_set::GeneratedTagSets,
write::PointsWriterBuilder,
};
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;
pub fn single_agent(c: &mut Criterion) {
@@ -178,6 +180,7 @@ agents = [{name = "foo", sampling_interval = "1s", count = 3}]
let agent = agents.first_mut().unwrap();
let expected_points = 30000;
let counter = Arc::new(AtomicU64::new(0));
let mut group = c.benchmark_group("agent_pre_generated");
group.measurement_time(std::time::Duration::from_secs(50));
group.throughput(Throughput::Elements(expected_points));
@@ -186,7 +189,7 @@ agents = [{name = "foo", sampling_interval = "1s", count = 3}]
b.iter(|| {
agent.reset_current_date_time(0);
let points_writer = points_writer.build_for_agent("foo", "foo", "foo").unwrap();
let r = block_on(agent.generate_all(points_writer, 1));
let r = block_on(agent.generate_all(points_writer, 1, Arc::clone(&counter)));
let n_points = r.expect("Could not generate data");
assert_eq!(n_points, expected_points as usize);
})
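The bench hunk above uses Criterion's throughput-group API (`benchmark_group`, `measurement_time`, `Throughput::Elements`). A rough, self-contained sketch of that pattern, with a hypothetical `generate_points` stand-in rather than the real agent:

```rust
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
use std::time::Duration;

use criterion::{criterion_group, criterion_main, Criterion, Throughput};

// Hypothetical stand-in for the data generator: bump a shared counter per batch.
fn generate_points(counter: &AtomicU64, n: u64) -> u64 {
    counter.fetch_add(n, Ordering::SeqCst);
    n
}

fn agent_pre_generated(c: &mut Criterion) {
    let expected_points = 30_000u64;
    let counter = Arc::new(AtomicU64::new(0));

    let mut group = c.benchmark_group("agent_pre_generated");
    group.measurement_time(Duration::from_secs(50));
    // Report results as elements/second rather than wall time alone.
    group.throughput(Throughput::Elements(expected_points));
    group.bench_function("generate", |b| {
        b.iter(|| {
            let n = generate_points(&counter, expected_points);
            assert_eq!(n, expected_points);
        })
    });
    group.finish();
}

criterion_group!(benches, agent_pre_generated);
criterion_main!(benches);
```

Setting the group's throughput to the expected point count is what lets Criterion print points/second for each run instead of only the per-iteration time.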


@@ -10,6 +10,10 @@ use crate::{
use crate::tag_set::GeneratedTagSets;
use serde_json::json;
use snafu::{ResultExt, Snafu};
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
use std::time::{Duration, Instant};
use tracing::{debug, info};
@@ -132,6 +136,7 @@ impl Agent {
&mut self,
mut points_writer: PointsWriter,
batch_size: usize,
counter: Arc<AtomicU64>,
) -> Result<usize> {
let mut points_this_batch = 1;
let mut total_points = 0;
@@ -161,13 +166,17 @@
.context(CouldNotWritePoints)?;
info!("wrote {} in {:?}", points_this_batch, batch_start.elapsed());
let total = counter.fetch_add(points_this_batch as u64, Ordering::SeqCst);
let secs = start.elapsed().as_secs();
if secs != 0 {
info!(
"written {} in {:?} for {}/sec",
"Agent {} written {} in {:?} for {}/sec. Aggregate {} in {}/sec",
self.id,
total_points,
start.elapsed(),
total_points / secs as usize
total_points / secs as usize,
total,
total / secs,
)
}
}
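The `Arc<AtomicU64>` threaded into `generate_all` above lets each agent's log line also report aggregate throughput across all agents, not just its own. A minimal sketch of that pattern using plain threads (hypothetical worker loop, not the actual `Agent` type):

```rust
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
use std::thread;
use std::time::Instant;

fn main() {
    let counter = Arc::new(AtomicU64::new(0));
    let start = Instant::now();

    let handles: Vec<_> = (0..4)
        .map(|id| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for batch in 0..10u64 {
                    let points_this_batch = 1_000;
                    // fetch_add returns the value *before* the add, so add the
                    // batch back in to get the running aggregate.
                    let total = counter.fetch_add(points_this_batch, Ordering::SeqCst)
                        + points_this_batch;
                    let secs = start.elapsed().as_secs().max(1);
                    println!(
                        "agent {} batch {}: aggregate {} points ({}/sec)",
                        id, batch, total, total / secs
                    );
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    println!("total points: {}", counter.load(Ordering::SeqCst));
}
```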


@@ -29,7 +29,7 @@
use crate::{agent::Agent, tag_set::GeneratedTagSets};
use snafu::{ResultExt, Snafu};
use std::sync::Arc;
use std::sync::{atomic::AtomicU64, Arc};
use std::{
convert::TryFrom,
time::{SystemTime, UNIX_EPOCH},
@@ -132,6 +132,7 @@ pub async fn generate(
let lock = Arc::new(tokio::sync::Mutex::new(()));
let start = std::time::Instant::now();
let total_rows = Arc::new(AtomicU64::new(0));
for database_assignments in &database_agents {
let (org, bucket) = org_and_bucket_from_database(database_assignments.database);
@@ -150,11 +151,12 @@
.context(CouldNotCreateAgent)?;
info!(
"Configuring {} agents of \"{}\" to write data to org {} and bucket {} (database {})", database_assignments.database,
"Configuring {} agents of \"{}\" to write data to org {} and bucket {} (database {})",
agent_assignment.count,
agent_assignment.spec.name,
org,
bucket,
database_assignments.database,
);
for mut agent in agents.into_iter() {
@@ -164,13 +166,18 @@
let lock_ref = Arc::clone(&lock);
let total_rows = Arc::clone(&total_rows);
handles.push(tokio::task::spawn(async move {
// Hack: serialize the agents behind the lock when requested, because otherwise their stdout output would be jumbled together
if one_agent_at_a_time {
let _l = lock_ref.lock().await;
agent.generate_all(agent_points_writer, batch_size).await
agent
.generate_all(agent_points_writer, batch_size, total_rows)
.await
} else {
agent.generate_all(agent_points_writer, batch_size).await
agent
.generate_all(agent_points_writer, batch_size, total_rows)
.await
}
}));
}
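`generate` above clones the same counter into every spawned agent task and, when `one_agent_at_a_time` is set, serializes the agents behind an async mutex so their output stays readable. A stripped-down sketch of that spawn loop, assuming tokio and a hypothetical `generate_all` free function in place of the `Agent` method:

```rust
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};

use tokio::sync::Mutex;

// Hypothetical stand-in for Agent::generate_all: record points in the shared counter.
async fn generate_all(agent_id: usize, counter: Arc<AtomicU64>) -> usize {
    let points = 100;
    counter.fetch_add(points as u64, Ordering::SeqCst);
    println!("agent {} wrote {} points", agent_id, points);
    points
}

#[tokio::main]
async fn main() {
    let one_agent_at_a_time = true;
    let lock = Arc::new(Mutex::new(()));
    let total_rows = Arc::new(AtomicU64::new(0));

    let mut handles = Vec::new();
    for agent_id in 0..3 {
        let lock_ref = Arc::clone(&lock);
        let total_rows = Arc::clone(&total_rows);
        handles.push(tokio::task::spawn(async move {
            if one_agent_at_a_time {
                // Hold the async lock so only one agent runs (and prints) at a time.
                let _l = lock_ref.lock().await;
                generate_all(agent_id, total_rows).await
            } else {
                generate_all(agent_id, total_rows).await
            }
        }));
    }

    let mut total = 0;
    for handle in handles {
        total += handle.await.unwrap();
    }
    println!(
        "all agents wrote {} points; shared counter: {}",
        total,
        total_rows.load(Ordering::SeqCst)
    );
}
```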


@@ -1027,8 +1027,4 @@ mod tests {
assert_eq!(summary_chunks[0].storage, ChunkStorage::OpenMutableBuffer);
assert_eq!(summary_chunks[0].row_count, 1);
}
// todo: add tests
// . compact with deletes happening during compaction
// . replay
}


@@ -521,6 +521,11 @@ mod tests {
/// The partitions are by table name and partition key.
Persist(Vec<(&'static str, &'static str)>),
/// Compact object store chunks of partitions.
///
/// The partitions are by table name and partition key.
CompactObjectStore(Vec<(&'static str, &'static str)>),
/// Drop partitions.
///
/// Note that this only works for fully persisted partitions if
@@ -691,6 +696,17 @@
}
}
}
Step::CompactObjectStore(partitions) => {
let db = &test_db.db;
for (table_name, partition_key) in partitions {
println!("Compact Object Store {}:{}", table_name, partition_key);
db.compact_object_store_partition(table_name, partition_key)
.unwrap()
.join()
.await;
}
}
Step::Drop(partitions) => {
let db = &test_db.db;
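For readers unfamiliar with the harness: `ReplayTest` drives each test from a list of `Step` values and one loop interprets them, which is why adding `CompactObjectStore` touches both the enum and the match above. A self-contained sketch of that step-driven pattern (a fake `FakeDb`, not the real test database):

```rust
// A simplified sketch of a step-driven test harness: the test body is just
// data, and a single loop interprets each step against a database.

enum Step {
    /// Ingest one line-protocol batch (here just a string).
    Ingest(&'static str),
    /// Compact object store chunks of the given (table, partition key) pairs.
    CompactObjectStore(Vec<(&'static str, &'static str)>),
    /// Assert an expected row count.
    AssertRowCount(usize),
}

#[derive(Default)]
struct FakeDb {
    rows: Vec<&'static str>,
}

impl FakeDb {
    fn ingest(&mut self, lp: &'static str) {
        self.rows.extend(lp.lines());
    }
    fn compact_object_store_partition(&mut self, table: &str, partition: &str) {
        // Stand-in for compaction: just log which partition was compacted.
        println!("Compact Object Store {}:{}", table, partition);
    }
}

fn run(steps: Vec<Step>) {
    let mut db = FakeDb::default();
    for step in steps {
        match step {
            Step::Ingest(lp) => db.ingest(lp),
            Step::CompactObjectStore(partitions) => {
                for (table_name, partition_key) in partitions {
                    db.compact_object_store_partition(table_name, partition_key);
                }
            }
            Step::AssertRowCount(expected) => assert_eq!(db.rows.len(), expected),
        }
    }
}

fn main() {
    run(vec![
        Step::Ingest("table_1,tag_partition_by=a bar=1 10"),
        Step::CompactObjectStore(vec![("table_1", "tag_partition_by_a")]),
        Step::AssertRowCount(1),
    ]);
}
```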
@@ -2731,6 +2747,169 @@
.await;
}
// This test compacts OS chunks with deletes and duplicates, then replays the preserved catalog
#[tokio::test]
async fn replay_delete_compact_os_chunks() {
ReplayTest {
steps: vec![
// --------------------------
// Create and persist chunk 1
Step::Ingest(vec![TestSequencedEntry {
sequencer_id: 0,
sequence_number: 0,
lp: "table_1,tag_partition_by=a,tag1=cupcake bar=1 10\ntable_1,tag_partition_by=a,tag1=cupcake bar=2 20",
}]),
Step::Await(vec![Check::Query(
"select * from table_1 order by bar",
vec![
"+-----+---------+------------------+--------------------------------+",
"| bar | tag1 | tag_partition_by | time |",
"+-----+---------+------------------+--------------------------------+",
"| 1 | cupcake | a | 1970-01-01T00:00:00.000000010Z |",
"| 2 | cupcake | a | 1970-01-01T00:00:00.000000020Z |",
"+-----+---------+------------------+--------------------------------+",
],
)]),
Step::MakeWritesPersistable,
Step::Persist(vec![("table_1", "tag_partition_by_a")]),
// --------------------------
// Create and persist chunk 2
// First row of chunk 2 is a duplicate of first row of chunk 1
Step::Ingest(vec![TestSequencedEntry {
sequencer_id: 0,
sequence_number: 1,
lp: "table_1,tag_partition_by=a,tag1=cupcake bar=100 10\ntable_1,tag_partition_by=a,tag1=cookie bar=4 30",
}]),
// There are 4 rows in two chunks but only 3 rows are returned due to duplicate elimination
Step::Await(vec![Check::Query(
"select * from table_1 order by bar",
vec![
"+-----+---------+------------------+--------------------------------+",
"| bar | tag1 | tag_partition_by | time |",
"+-----+---------+------------------+--------------------------------+",
"| 2 | cupcake | a | 1970-01-01T00:00:00.000000020Z |",
"| 4 | cookie | a | 1970-01-01T00:00:00.000000030Z |",
"| 100 | cupcake | a | 1970-01-01T00:00:00.000000010Z |",
"+-----+---------+------------------+--------------------------------+",
],
)]),
Step::MakeWritesPersistable,
Step::Persist(vec![("table_1", "tag_partition_by_a")]),
// --------------------------
// Verify chunks and rows in system table
Step::Assert(vec![
Check::Query(
"select storage, min_value, max_value, row_count from system.chunk_columns where table_name = 'table_1' and column_name = 'time' order by storage, max_value",
vec![
"+--------------------------+-----------+-----------+-----------+",
"| storage | min_value | max_value | row_count |",
"+--------------------------+-----------+-----------+-----------+",
"| ReadBufferAndObjectStore | 10 | 20 | 2 |",
"| ReadBufferAndObjectStore | 10 | 30 | 2 |",
"+--------------------------+-----------+-----------+-----------+",
],
),
]),
// --------------------------
// Delete second row of chunk 1
Step::Delete(vec![TestDelete {
sequencer_id: 0,
sequence_number: 2,
table_name: Some("table_1"),
predicate: DeletePredicate {
range: TimestampRange { start: 19, end: 21 },
exprs: vec![],
},
}]),
// There are 4 rows in two chunks but only 2 rows are returned due to duplicate and soft-delete elimination
Step::Await(vec![Check::Query(
"select * from table_1 order by bar",
vec![
"+-----+---------+------------------+--------------------------------+",
"| bar | tag1 | tag_partition_by | time |",
"+-----+---------+------------------+--------------------------------+",
"| 4 | cookie | a | 1970-01-01T00:00:00.000000030Z |",
"| 100 | cupcake | a | 1970-01-01T00:00:00.000000010Z |",
"+-----+---------+------------------+--------------------------------+",
],
)]),
// System table still includes 4 rows
Step::Assert(vec![
Check::Query(
"select storage, min_value, max_value, row_count from system.chunk_columns where table_name = 'table_1' and column_name = 'time' order by storage, max_value",
vec![
"+--------------------------+-----------+-----------+-----------+",
"| storage | min_value | max_value | row_count |",
"+--------------------------+-----------+-----------+-----------+",
"| ReadBufferAndObjectStore | 10 | 20 | 2 |",
"| ReadBufferAndObjectStore | 10 | 30 | 2 |",
"+--------------------------+-----------+-----------+-----------+",
],
),
]),
// --------------------------
// Compact those 2 chunks
Step::CompactObjectStore(vec![("table_1", "tag_partition_by_a")]),
// Still has 2 rows returned
Step::Await(vec![Check::Query(
"select * from table_1 order by bar",
vec![
"+-----+---------+------------------+--------------------------------+",
"| bar | tag1 | tag_partition_by | time |",
"+-----+---------+------------------+--------------------------------+",
"| 4 | cookie | a | 1970-01-01T00:00:00.000000030Z |",
"| 100 | cupcake | a | 1970-01-01T00:00:00.000000010Z |",
"+-----+---------+------------------+--------------------------------+",
],
)]),
// But only one OS-only chunk with 2 rows now. The duplicated and deleted rows were purged
Step::Assert(vec![
Check::Query(
"select storage, min_value, max_value, row_count from system.chunk_columns where table_name = 'table_1' and column_name = 'time' order by storage, max_value",
vec![
"+-----------------+-----------+-----------+-----------+",
"| storage | min_value | max_value | row_count |",
"+-----------------+-----------+-----------+-----------+",
"| ObjectStoreOnly | 10 | 30 | 2 |",
"+-----------------+-----------+-----------+-----------+",
],
),
]),
// --------------------------
// Restart and replay the preserved catalog
Step::Restart,
Step::Replay,
Step::Assert(vec![Check::Query(
"select * from table_1 order by bar",
vec![
"+-----+---------+------------------+--------------------------------+",
"| bar | tag1 | tag_partition_by | time |",
"+-----+---------+------------------+--------------------------------+",
"| 4 | cookie | a | 1970-01-01T00:00:00.000000030Z |",
"| 100 | cupcake | a | 1970-01-01T00:00:00.000000010Z |",
"+-----+---------+------------------+--------------------------------+",
],
)]),
// Should have one OS-only chunk
Step::Assert(vec![
Check::Query(
"select storage, min_value, max_value, row_count from system.chunk_columns where table_name = 'table_1' and column_name = 'time' order by storage, max_value",
vec![
"+-----------------+-----------+-----------+-----------+",
"| storage | min_value | max_value | row_count |",
"+-----------------+-----------+-----------+-----------+",
"| ObjectStoreOnly | 10 | 30 | 2 |",
"+-----------------+-----------+-----------+-----------+",
],
),
]),
],
..Default::default()
}
.run()
.await;
}
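The row counts this test asserts follow from two rules: duplicates on the primary key (`tag1`, `time`) collapse to the last write, and the delete predicate removes the row at time 20. An illustrative sketch of that arithmetic only, not IOx's compaction code, with the delete range treated as a hypothetical half-open interval (the exact boundary semantics don't matter for time 20):

```rust
use std::collections::BTreeMap;

// Rows are (tag1, time, bar) in write order, spanning both persisted chunks.
fn compact(rows: &[(&str, i64, i64)], delete_range: (i64, i64)) -> Vec<(String, i64, i64)> {
    // Deduplicate on the primary key (tag1, time), keeping the last write.
    let mut deduped: BTreeMap<(String, i64), i64> = BTreeMap::new();
    for (tag, time, bar) in rows {
        deduped.insert((tag.to_string(), *time), *bar);
    }
    // Apply the delete predicate, treated here as a half-open [start, end) range.
    deduped
        .into_iter()
        .filter(|((_, time), _)| !(delete_range.0 <= *time && *time < delete_range.1))
        .map(|((tag, time), bar)| (tag, time, bar))
        .collect()
}

fn main() {
    // Chunk 1: two cupcake rows; chunk 2: a duplicate of (cupcake, 10) plus a cookie row.
    let rows = [
        ("cupcake", 10, 1),
        ("cupcake", 20, 2),
        ("cupcake", 10, 100),
        ("cookie", 30, 4),
    ];
    // Delete predicate from the test: TimestampRange { start: 19, end: 21 } removes time 20.
    let remaining = compact(&rows, (19, 21));
    // 4 stored rows -> 3 after dedup -> 2 after the delete, matching the queries above.
    assert_eq!(remaining.len(), 2);
    println!("{:?}", remaining);
}
```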
#[tokio::test]
async fn replay_fail_sequencers_change() {
// create write buffer w/ sequencer 0 and 1