From a63eb53ac50e83fcfb5a15985b4a92570f7a4d17 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Thu, 2 Sep 2021 16:52:52 +0200 Subject: [PATCH 1/3] feat: forward connection config to Kafka write buffer --- tests/end_to_end_cases/scenario.rs | 3 ++- write_buffer/src/config.rs | 12 +++++++++--- write_buffer/src/kafka.rs | 23 ++++++++++++++++++----- 3 files changed, 29 insertions(+), 9 deletions(-) diff --git a/tests/end_to_end_cases/scenario.rs b/tests/end_to_end_cases/scenario.rs index 069a716eed..0f9613fdaa 100644 --- a/tests/end_to_end_cases/scenario.rs +++ b/tests/end_to_end_cases/scenario.rs @@ -705,7 +705,8 @@ pub async fn fixture_replay_broken(db_name: &str, kafka_connection: &str) -> Ser .unwrap(); // ingest data as mixed throughput - let producer = KafkaBufferProducer::new(kafka_connection, db_name).unwrap(); + let producer = + KafkaBufferProducer::new(kafka_connection, db_name, &Default::default()).unwrap(); producer .store_entry( &lp_to_entries("table_1,partition_by=a foo=1 10", &partition_template) diff --git a/write_buffer/src/config.rs b/write_buffer/src/config.rs index 7f22c76092..3e40fcf576 100644 --- a/write_buffer/src/config.rs +++ b/write_buffer/src/config.rs @@ -109,7 +109,8 @@ impl WriteBufferConfigFactory { let writer = match &cfg.type_[..] { "kafka" => { - let kafka_buffer = KafkaBufferProducer::new(&cfg.connection, db_name)?; + let kafka_buffer = + KafkaBufferProducer::new(&cfg.connection, db_name, &cfg.connection_config)?; Arc::new(kafka_buffer) as _ } "mock" => match self.get_mock(&cfg.connection)? { @@ -140,8 +141,13 @@ impl WriteBufferConfigFactory { let reader = match &cfg.type_[..] { "kafka" => { - let kafka_buffer = - KafkaBufferConsumer::new(&cfg.connection, server_id, db_name).await?; + let kafka_buffer = KafkaBufferConsumer::new( + &cfg.connection, + server_id, + db_name, + &cfg.connection_config, + ) + .await?; Box::new(kafka_buffer) as _ } "mock" => match self.get_mock(&cfg.connection)? { diff --git a/write_buffer/src/kafka.rs b/write_buffer/src/kafka.rs index feff39ce6a..fd66eb6986 100644 --- a/write_buffer/src/kafka.rs +++ b/write_buffer/src/kafka.rs @@ -1,5 +1,5 @@ use std::{ - collections::BTreeMap, + collections::{BTreeMap, HashMap}, convert::{TryFrom, TryInto}, sync::Arc, time::Duration, @@ -90,6 +90,7 @@ impl KafkaBufferProducer { pub fn new( conn: impl Into, database_name: impl Into, + connection_config: &HashMap, ) -> Result { let conn = conn.into(); let database_name = database_name.into(); @@ -102,6 +103,9 @@ impl KafkaBufferProducer { cfg.set("request.required.acks", "all"); // equivalent to acks=-1 cfg.set("compression.type", "snappy"); cfg.set("statistics.interval.ms", "15000"); + for (k, v) in connection_config { + cfg.set(k, v); + } let producer: FutureProducer = cfg.create()?; @@ -246,6 +250,7 @@ impl KafkaBufferConsumer { conn: impl Into + Send + Sync, server_id: ServerId, database_name: impl Into + Send + Sync, + connection_config: &HashMap, ) -> Result { let conn = conn.into(); let database_name = database_name.into(); @@ -256,6 +261,9 @@ impl KafkaBufferConsumer { cfg.set("enable.auto.commit", "false"); cfg.set("statistics.interval.ms", "15000"); cfg.set("queued.max.messages.kbytes", "10000"); + for (k, v) in connection_config { + cfg.set(k, v); + } // Create a unique group ID for this database's consumer as we don't want to create // consumer groups. 
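// Illustrative sketch (not part of this patch): how a caller might populate the
// new `connection_config` argument that both constructors now accept. It assumes
// the parameter is a map of string key/value pairs, as implied by the
// `cfg.set(k, v)` loops above; the option names and the helper function below are
// ordinary librdkafka properties and a hypothetical name chosen purely for
// illustration.
use std::collections::HashMap;

fn example_connection_config() -> HashMap<String, String> {
    let mut cfg = HashMap::new();
    // every entry is forwarded verbatim into the rdkafka ClientConfig
    cfg.insert("session.timeout.ms".to_string(), "6000".to_string());
    cfg.insert("message.max.bytes".to_string(), "10485760".to_string());
    cfg
}

// e.g.: KafkaBufferProducer::new("kafka:9092", "my_db", &example_connection_config())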
@@ -484,15 +492,20 @@ mod tests { type Reading = KafkaBufferConsumer; fn writing(&self) -> Self::Writing { - KafkaBufferProducer::new(&self.conn, &self.database_name).unwrap() + KafkaBufferProducer::new(&self.conn, &self.database_name, &Default::default()).unwrap() } async fn reading(&self) -> Self::Reading { let server_id = self.server_id_counter.fetch_add(1, Ordering::SeqCst); let server_id = ServerId::try_from(server_id).unwrap(); - KafkaBufferConsumer::new(&self.conn, server_id, &self.database_name) - .await - .unwrap() + KafkaBufferConsumer::new( + &self.conn, + server_id, + &self.database_name, + &Default::default(), + ) + .await + .unwrap() } } From 3c968ac092cb63d02ce4736449adc43d247b4c6f Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Fri, 3 Sep 2021 09:15:49 +0200 Subject: [PATCH 2/3] feat: correctly account MUB sizes Fixes #1565. --- arrow_util/src/string.rs | 2 +- mutable_buffer/src/chunk.rs | 56 +++++++++---- mutable_buffer/src/column.rs | 23 +++-- query_tests/src/sql.rs | 22 ++--- server/src/db.rs | 31 ++++--- server/src/db/catalog/chunk.rs | 14 ++-- server/src/db/catalog/metrics.rs | 102 ++++++++++++++++------- server/src/db/chunk.rs | 8 +- server/src/db/replay.rs | 2 +- tests/end_to_end_cases/management_api.rs | 6 +- tests/end_to_end_cases/management_cli.rs | 2 +- 11 files changed, 171 insertions(+), 97 deletions(-) diff --git a/arrow_util/src/string.rs b/arrow_util/src/string.rs index 33a9c9b67d..600b60c4fe 100644 --- a/arrow_util/src/string.rs +++ b/arrow_util/src/string.rs @@ -93,7 +93,7 @@ impl + FromPrimitive + Zero> PackedStringArray { /// Return the amount of memory in bytes taken up by this array pub fn size(&self) -> usize { - self.storage.len() + self.offsets.len() * std::mem::size_of::() + self.storage.capacity() + self.offsets.capacity() * std::mem::size_of::() } pub fn into_inner(self) -> (Vec, String) { diff --git a/mutable_buffer/src/chunk.rs b/mutable_buffer/src/chunk.rs index 6f23c8ff46..554f39e52a 100644 --- a/mutable_buffer/src/chunk.rs +++ b/mutable_buffer/src/chunk.rs @@ -126,23 +126,23 @@ impl MBChunk { Ok(()) } - /// Returns a queryable snapshot of this chunk + /// Returns a queryable snapshot of this chunk and an indicator if the snapshot was just cached. #[cfg(not(feature = "nocache"))] - pub fn snapshot(&self) -> Arc { + pub fn snapshot(&self) -> (Arc, bool) { let mut guard = self.snapshot.lock(); if let Some(snapshot) = &*guard { - return Arc::clone(snapshot); + return (Arc::clone(snapshot), false); } let snapshot = Arc::new(ChunkSnapshot::new(self)); *guard = Some(Arc::clone(&snapshot)); - snapshot + (snapshot, true) } - /// Returns a queryable snapshot of this chunk + /// Returns a queryable snapshot of this chunk and an indicator if the snapshot was just cached. #[cfg(feature = "nocache")] - pub fn snapshot(&self) -> Arc { - Arc::new(ChunkSnapshot::new(self)) + pub fn snapshot(&self) -> (Arc, bool) { + (Arc::new(ChunkSnapshot::new(self)), false) } /// Return the name of the table in this chunk @@ -227,14 +227,26 @@ impl MBChunk { /// Return the approximate memory size of the chunk, in bytes including the /// dictionary, tables, and their rows. /// + /// This includes the size of `self`. 
+ /// /// Note: This does not include the size of any cached ChunkSnapshot pub fn size(&self) -> usize { - // TODO: Better accounting of non-column data (#1565) - self.columns + let size_self = std::mem::size_of::(); + + let size_columns = self + .columns .iter() - .map(|(k, v)| k.len() + v.size()) - .sum::() - + self.table_name.len() + .map(|(k, v)| k.capacity() + v.size()) + .sum::(); + + let size_table_name = self.table_name.len(); + + let snapshot_size = { + let guard = self.snapshot.lock(); + guard.as_ref().map(|snapshot| snapshot.size()).unwrap_or(0) + }; + + size_self + size_columns + size_table_name + snapshot_size } /// Returns an iterator over (column_name, estimated_size) for all @@ -814,12 +826,16 @@ mod tests { let lp = vec!["cpu,host=a val=23 1", "cpu,host=b val=2 1"].join("\n"); let mut chunk = write_lp_to_new_chunk(&lp).unwrap(); - let s1 = chunk.snapshot(); - let s2 = chunk.snapshot(); + let (s1, c1) = chunk.snapshot(); + assert!(c1); + let (s2, c2) = chunk.snapshot(); + assert!(!c2); write_lp_to_chunk(&lp, &mut chunk).unwrap(); - let s3 = chunk.snapshot(); - let s4 = chunk.snapshot(); + let (s3, c3) = chunk.snapshot(); + assert!(c3); + let (s4, c4) = chunk.snapshot(); + assert!(!c4); assert_eq!(Arc::as_ptr(&s1), Arc::as_ptr(&s2)); assert_ne!(Arc::as_ptr(&s1), Arc::as_ptr(&s3)); @@ -846,8 +862,12 @@ mod tests { write_lp_to_chunk(&lp, &mut chunk).unwrap(); let s3 = chunk.size(); - // Should increase by a constant amount each time - assert_eq!(s2 - s1, s3 - s2); + // Should increase or stay identical (if array capacities are sufficient) each time + assert!(s2 >= s1); + assert!(s3 >= s2); + + // also assume that we wrote enough data to bump the capacity at least once + assert!(s3 > s1); } #[test] diff --git a/mutable_buffer/src/column.rs b/mutable_buffer/src/column.rs index c433a0fef3..c398f1161b 100644 --- a/mutable_buffer/src/column.rs +++ b/mutable_buffer/src/column.rs @@ -319,24 +319,29 @@ impl Column { } } - /// The approximate memory size of the data in the column. Note that - /// the space taken for the tag string values is represented in - /// the dictionary size in the chunk that holds the table that has this - /// column. The size returned here is only for their identifiers. + /// The approximate memory size of the data in the column. + /// + /// This includes the size of `self`. 
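    // Note on the accounting below: the estimate is derived from Vec *capacity*
    // plus the size of the struct itself, not from element count alone. For a
    // Vec<f64>-backed column the figure is roughly
    //     mem::size_of::<f64>() * v.capacity() + mem::size_of::<Self>()
    // so reserved-but-unused allocation is included. This is also why the
    // expected `memory_bytes` constants later in this patch grow from tens of
    // bytes to hundreds of bytes per column (and to roughly a kilobyte per
    // chunk).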
pub fn size(&self) -> usize { let data_size = match &self.data { - ColumnData::F64(v, stats) => mem::size_of::() * v.len() + mem::size_of_val(&stats), - ColumnData::I64(v, stats) => mem::size_of::() * v.len() + mem::size_of_val(&stats), - ColumnData::U64(v, stats) => mem::size_of::() * v.len() + mem::size_of_val(&stats), + ColumnData::F64(v, stats) => { + mem::size_of::() * v.capacity() + mem::size_of_val(&stats) + } + ColumnData::I64(v, stats) => { + mem::size_of::() * v.capacity() + mem::size_of_val(&stats) + } + ColumnData::U64(v, stats) => { + mem::size_of::() * v.capacity() + mem::size_of_val(&stats) + } ColumnData::Bool(v, stats) => v.byte_len() + mem::size_of_val(&stats), ColumnData::Tag(v, dictionary, stats) => { - mem::size_of::() * v.len() + dictionary.size() + mem::size_of_val(&stats) + mem::size_of::() * v.capacity() + dictionary.size() + mem::size_of_val(&stats) } ColumnData::String(v, stats) => { v.size() + mem::size_of_val(&stats) + stats.string_size() } }; - data_size + self.valid.byte_len() + mem::size_of::() + data_size + self.valid.byte_len() } pub fn to_arrow(&self) -> Result { diff --git a/query_tests/src/sql.rs b/query_tests/src/sql.rs index 17d2807e62..47c95f1f24 100644 --- a/query_tests/src/sql.rs +++ b/query_tests/src/sql.rs @@ -312,8 +312,8 @@ async fn sql_select_from_system_chunks() { "+----+---------------+------------+-------------------+--------------+-----------+", "| id | partition_key | table_name | storage | memory_bytes | row_count |", "+----+---------------+------------+-------------------+--------------+-----------+", - "| 0 | 1970-01-01T00 | h2o | OpenMutableBuffer | 213 | 3 |", - "| 0 | 1970-01-01T00 | o2 | OpenMutableBuffer | 177 | 2 |", + "| 0 | 1970-01-01T00 | h2o | OpenMutableBuffer | 1639 | 3 |", + "| 0 | 1970-01-01T00 | o2 | OpenMutableBuffer | 1635 | 2 |", "+----+---------------+------------+-------------------+--------------+-----------+", ]; run_sql_test_case!( @@ -368,15 +368,15 @@ async fn sql_select_from_system_chunk_columns() { "| 1970-01-01T00 | 0 | h2o | state | ReadBuffer | 2 | 0 | MA | MA | 347 |", "| 1970-01-01T00 | 0 | h2o | temp | ReadBuffer | 2 | 1 | 70.4 | 70.4 | 471 |", "| 1970-01-01T00 | 0 | h2o | time | ReadBuffer | 2 | 0 | 50 | 250 | 110 |", - "| 1970-01-01T00 | 0 | o2 | city | OpenMutableBuffer | 2 | 1 | Boston | Boston | 35 |", - "| 1970-01-01T00 | 0 | o2 | reading | OpenMutableBuffer | 2 | 1 | 51 | 51 | 25 |", - "| 1970-01-01T00 | 0 | o2 | state | OpenMutableBuffer | 2 | 0 | CA | MA | 41 |", - "| 1970-01-01T00 | 0 | o2 | temp | OpenMutableBuffer | 2 | 0 | 53.4 | 79 | 25 |", - "| 1970-01-01T00 | 0 | o2 | time | OpenMutableBuffer | 2 | 0 | 50 | 300 | 25 |", - "| 1970-01-01T00 | 1 | h2o | city | OpenMutableBuffer | 1 | 0 | Boston | Boston | 31 |", - "| 1970-01-01T00 | 1 | h2o | other_temp | OpenMutableBuffer | 1 | 0 | 72.4 | 72.4 | 17 |", - "| 1970-01-01T00 | 1 | h2o | state | OpenMutableBuffer | 1 | 0 | CA | CA | 27 |", - "| 1970-01-01T00 | 1 | h2o | time | OpenMutableBuffer | 1 | 0 | 350 | 350 | 17 |", + "| 1970-01-01T00 | 0 | o2 | city | OpenMutableBuffer | 2 | 1 | Boston | Boston | 309 |", + "| 1970-01-01T00 | 0 | o2 | reading | OpenMutableBuffer | 2 | 1 | 51 | 51 | 297 |", + "| 1970-01-01T00 | 0 | o2 | state | OpenMutableBuffer | 2 | 0 | CA | MA | 313 |", + "| 1970-01-01T00 | 0 | o2 | temp | OpenMutableBuffer | 2 | 0 | 53.4 | 79 | 297 |", + "| 1970-01-01T00 | 0 | o2 | time | OpenMutableBuffer | 2 | 0 | 50 | 300 | 297 |", + "| 1970-01-01T00 | 1 | h2o | city | OpenMutableBuffer | 1 | 0 | Boston | Boston | 309 |", + "| 
1970-01-01T00 | 1 | h2o | other_temp | OpenMutableBuffer | 1 | 0 | 72.4 | 72.4 | 297 |", + "| 1970-01-01T00 | 1 | h2o | state | OpenMutableBuffer | 1 | 0 | CA | CA | 309 |", + "| 1970-01-01T00 | 1 | h2o | time | OpenMutableBuffer | 1 | 0 | 350 | 350 | 297 |", "+---------------+----------+------------+-------------+-------------------+-----------+------------+-----------+-----------+--------------+", ]; run_sql_test_case!( diff --git a/server/src/db.rs b/server/src/db.rs index 6016217168..20e2bf8fc0 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -2107,13 +2107,18 @@ mod tests { assert_metric("catalog_loaded_rows", "object_store", 0.0); // verify chunk size updated - catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 44).unwrap(); + catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 700) + .unwrap(); // write into same chunk again. write_lp(db.as_ref(), "cpu bar=2 20").await; + write_lp(db.as_ref(), "cpu bar=3 30").await; + write_lp(db.as_ref(), "cpu bar=4 40").await; + write_lp(db.as_ref(), "cpu bar=5 50").await; // verify chunk size updated - catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 60).unwrap(); + catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 764) + .unwrap(); // Still only one chunk open test_db @@ -2131,7 +2136,7 @@ mod tests { assert_metric("catalog_loaded_chunks", "mutable_buffer", 1.0); assert_metric("catalog_loaded_chunks", "read_buffer", 0.0); assert_metric("catalog_loaded_chunks", "object_store", 0.0); - assert_metric("catalog_loaded_rows", "mutable_buffer", 2.0); + assert_metric("catalog_loaded_rows", "mutable_buffer", 5.0); assert_metric("catalog_loaded_rows", "read_buffer", 0.0); assert_metric("catalog_loaded_rows", "object_store", 0.0); @@ -2153,7 +2158,7 @@ mod tests { assert_metric("catalog_loaded_chunks", "mutable_buffer", 1.0); assert_metric("catalog_loaded_chunks", "read_buffer", 0.0); assert_metric("catalog_loaded_chunks", "object_store", 0.0); - assert_metric("catalog_loaded_rows", "mutable_buffer", 2.0); + assert_metric("catalog_loaded_rows", "mutable_buffer", 5.0); assert_metric("catalog_loaded_rows", "read_buffer", 0.0); assert_metric("catalog_loaded_rows", "object_store", 0.0); @@ -2181,12 +2186,12 @@ mod tests { assert_metric("catalog_loaded_chunks", "read_buffer", 1.0); assert_metric("catalog_loaded_chunks", "object_store", 0.0); assert_metric("catalog_loaded_rows", "mutable_buffer", 0.0); - assert_metric("catalog_loaded_rows", "read_buffer", 2.0); + assert_metric("catalog_loaded_rows", "read_buffer", 5.0); assert_metric("catalog_loaded_rows", "object_store", 0.0); // verify chunk size updated (chunk moved from closing to moving to moved) catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 0).unwrap(); - let expected_read_buffer_size = 1916; + let expected_read_buffer_size = 1922; catalog_chunk_size_bytes_metric_eq( &test_db.metric_registry, "read_buffer", @@ -2234,8 +2239,8 @@ mod tests { assert_metric("catalog_loaded_chunks", "read_buffer", 1.0); assert_metric("catalog_loaded_chunks", "object_store", 1.0); assert_metric("catalog_loaded_rows", "mutable_buffer", 0.0); - assert_metric("catalog_loaded_rows", "read_buffer", 2.0); - assert_metric("catalog_loaded_rows", "object_store", 2.0); + assert_metric("catalog_loaded_rows", "read_buffer", 5.0); + assert_metric("catalog_loaded_rows", "object_store", 5.0); db.unload_read_buffer("cpu", "1970-01-01T00", 1).unwrap(); @@ -2253,7 +2258,7 @@ mod tests { 
assert_metric("catalog_loaded_chunks", "object_store", 1.0); assert_metric("catalog_loaded_rows", "mutable_buffer", 0.0); assert_metric("catalog_loaded_rows", "read_buffer", 0.0); - assert_metric("catalog_loaded_rows", "object_store", 2.0); + assert_metric("catalog_loaded_rows", "object_store", 5.0); // verify chunk size not increased for OS (it was in OS before unload) catalog_chunk_size_bytes_metric_eq( @@ -2574,7 +2579,7 @@ mod tests { ("svr_id", "1"), ]) .histogram() - .sample_sum_eq(280.0) + .sample_sum_eq(5085.0) .unwrap(); // RB chunk size @@ -3161,7 +3166,7 @@ mod tests { id: 0, storage: ChunkStorage::OpenMutableBuffer, lifecycle_action: None, - memory_bytes: 70, // memory_size + memory_bytes: 1006, // memory_size object_store_bytes: 0, // os_size row_count: 1, time_of_last_access: None, @@ -3479,7 +3484,7 @@ mod tests { id: 1, storage: ChunkStorage::OpenMutableBuffer, lifecycle_action, - memory_bytes: 87, + memory_bytes: 1303, object_store_bytes: 0, // no OS chunks row_count: 1, time_of_last_access: None, @@ -3501,7 +3506,7 @@ mod tests { ); } - assert_eq!(db.catalog.metrics().memory().mutable_buffer(), 2486 + 87); + assert_eq!(db.catalog.metrics().memory().mutable_buffer(), 2486 + 1303); assert_eq!(db.catalog.metrics().memory().read_buffer(), 2766); assert_eq!(db.catalog.metrics().memory().object_store(), 2007); } diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs index 897f4c5153..74dbba9820 100644 --- a/server/src/db/catalog/chunk.rs +++ b/server/src/db/catalog/chunk.rs @@ -276,7 +276,7 @@ impl CatalogChunk { .state .inc_with_attributes(&[KeyValue::new("state", "open")]); - let mut chunk = Self { + let chunk = Self { addr, stage, lifecycle_action: None, @@ -313,7 +313,7 @@ impl CatalogChunk { .state .inc_with_attributes(&[KeyValue::new("state", "compacted")]); - let mut chunk = Self { + let chunk = Self { addr, stage, lifecycle_action: None, @@ -350,7 +350,7 @@ impl CatalogChunk { meta, }; - let mut chunk = Self { + let chunk = Self { addr, stage, lifecycle_action: None, @@ -412,7 +412,7 @@ impl CatalogChunk { } /// Updates `self.metrics` to match the contents of `self.stage` - fn update_metrics(&mut self) { + pub fn update_metrics(&self) { match &self.stage { ChunkStage::Open { mb_chunk } => { self.metrics.memory_metrics.set_mub_only(mb_chunk.size()); @@ -627,7 +627,7 @@ impl CatalogChunk { assert!(self.time_closed.is_none()); self.time_closed = Some(Utc::now()); - let s = mb_chunk.snapshot(); + let (s, _) = mb_chunk.snapshot(); self.metrics .state .inc_with_attributes(&[KeyValue::new("state", "closed")]); @@ -880,9 +880,7 @@ impl CatalogChunk { self.set_lifecycle_action(ChunkLifecycleAction::Dropping, registration)?; // set memory metrics to 0 to stop accounting for this chunk within the catalog - self.metrics.memory_metrics.mutable_buffer.set(0); - self.metrics.memory_metrics.read_buffer.set(0); - self.metrics.memory_metrics.object_store.set(0); + self.metrics.memory_metrics.set_to_zero(); Ok(()) } diff --git a/server/src/db/catalog/metrics.rs b/server/src/db/catalog/metrics.rs index 48df34c355..d85f4ce150 100644 --- a/server/src/db/catalog/metrics.rs +++ b/server/src/db/catalog/metrics.rs @@ -214,82 +214,122 @@ impl PartitionMetrics { /// /// This can then be used within each `CatalogChunk` to record its observations for /// the different storages -#[derive(Debug)] pub struct StorageGauge { - pub(super) mutable_buffer: GaugeValue, - pub(super) read_buffer: GaugeValue, - pub(super) object_store: GaugeValue, + inner: Mutex, +} + +impl std::fmt::Debug 
for StorageGauge { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("StorageGauge").finish_non_exhaustive() + } +} + +struct StorageGaugeInner { + mutable_buffer: GaugeValue, + read_buffer: GaugeValue, + object_store: GaugeValue, } impl StorageGauge { pub(super) fn new_unregistered() -> Self { - Self { + let inner = StorageGaugeInner { mutable_buffer: GaugeValue::new_unregistered(), read_buffer: GaugeValue::new_unregistered(), object_store: GaugeValue::new_unregistered(), + }; + Self { + inner: Mutex::new(inner), } } pub(super) fn new(gauge: &Gauge) -> Self { - Self { + let inner = StorageGaugeInner { mutable_buffer: gauge.gauge_value(&[KeyValue::new("location", "mutable_buffer")]), read_buffer: gauge.gauge_value(&[KeyValue::new("location", "read_buffer")]), object_store: gauge.gauge_value(&[KeyValue::new("location", "object_store")]), + }; + Self { + inner: Mutex::new(inner), } } - pub(super) fn set_mub_only(&mut self, value: usize) { - self.mutable_buffer.set(value); - self.read_buffer.set(0); - self.object_store.set(0); + pub(super) fn set_mub_only(&self, value: usize) { + let mut guard = self.inner.lock(); + + guard.mutable_buffer.set(value); + guard.read_buffer.set(0); + guard.object_store.set(0); } - pub(super) fn set_rub_only(&mut self, value: usize) { - self.mutable_buffer.set(0); - self.read_buffer.set(value); - self.object_store.set(0); + pub(super) fn set_rub_only(&self, value: usize) { + let mut guard = self.inner.lock(); + + guard.mutable_buffer.set(0); + guard.read_buffer.set(value); + guard.object_store.set(0); } - pub(super) fn set_rub_and_object_store_only(&mut self, rub: usize, parquet: usize) { - self.mutable_buffer.set(0); - self.read_buffer.set(rub); - self.object_store.set(parquet); + pub(super) fn set_rub_and_object_store_only(&self, rub: usize, parquet: usize) { + let mut guard = self.inner.lock(); + + guard.mutable_buffer.set(0); + guard.read_buffer.set(rub); + guard.object_store.set(parquet); } - pub(super) fn set_object_store_only(&mut self, value: usize) { - self.mutable_buffer.set(0); - self.read_buffer.set(0); - self.object_store.set(value); + pub(super) fn set_object_store_only(&self, value: usize) { + let mut guard = self.inner.lock(); + + guard.mutable_buffer.set(0); + guard.read_buffer.set(0); + guard.object_store.set(value); + } + + pub(super) fn set_to_zero(&self) { + let mut guard = self.inner.lock(); + + guard.mutable_buffer.set(0); + guard.read_buffer.set(0); + guard.object_store.set(0); } fn clone_empty(&self) -> Self { + let guard = self.inner.lock(); + + let inner = StorageGaugeInner { + mutable_buffer: guard.mutable_buffer.clone_empty(), + read_buffer: guard.read_buffer.clone_empty(), + object_store: guard.object_store.clone_empty(), + }; Self { - mutable_buffer: self.mutable_buffer.clone_empty(), - read_buffer: self.read_buffer.clone_empty(), - object_store: self.object_store.clone_empty(), + inner: Mutex::new(inner), } } /// Returns the total for the mutable buffer pub fn mutable_buffer(&self) -> usize { - self.mutable_buffer.get_total() + let guard = self.inner.lock(); + guard.mutable_buffer.get_total() } /// Returns the total for the read buffer pub fn read_buffer(&self) -> usize { - self.read_buffer.get_total() + let guard = self.inner.lock(); + guard.read_buffer.get_total() } /// Returns the total for object storage pub fn object_store(&self) -> usize { - self.object_store.get_total() + let guard = self.inner.lock(); + guard.object_store.get_total() } /// Returns the total over all storages pub fn 
total(&self) -> usize { - self.mutable_buffer.get_total() - + self.read_buffer.get_total() - + self.object_store.get_total() + let guard = self.inner.lock(); + guard.mutable_buffer.get_total() + + guard.read_buffer.get_total() + + guard.object_store.get_total() } } diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index 881d88a290..1b216e7bfa 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -110,7 +110,13 @@ impl DbChunk { let (state, meta) = match chunk.stage() { ChunkStage::Open { mb_chunk, .. } => { - let snapshot = mb_chunk.snapshot(); + let (snapshot, just_cached) = mb_chunk.snapshot(); + + // the snapshot might be cached, so we need to update the chunk metrics + if just_cached { + chunk.update_metrics(); + } + let state = State::MutableBuffer { chunk: Arc::clone(&snapshot), }; diff --git a/server/src/db/replay.rs b/server/src/db/replay.rs index cbd40a33aa..28bff534f2 100644 --- a/server/src/db/replay.rs +++ b/server/src/db/replay.rs @@ -742,7 +742,7 @@ mod tests { tokio::sync::Mutex::new(Box::new(write_buffer) as _), ))) .lifecycle_rules(data_types::database_rules::LifecycleRules { - buffer_size_hard: Some(NonZeroUsize::new(10_000).unwrap()), + buffer_size_hard: Some(NonZeroUsize::new(12_000).unwrap()), late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(), catalog_transactions_until_checkpoint, mub_row_threshold: NonZeroUsize::new(10).unwrap(), diff --git a/tests/end_to_end_cases/management_api.rs b/tests/end_to_end_cases/management_api.rs index 0d30eaff7b..bcc939898d 100644 --- a/tests/end_to_end_cases/management_api.rs +++ b/tests/end_to_end_cases/management_api.rs @@ -501,7 +501,7 @@ async fn test_chunk_get() { id: 0, storage: ChunkStorage::OpenMutableBuffer.into(), lifecycle_action, - memory_bytes: 100, + memory_bytes: 1016, object_store_bytes: 0, row_count: 2, time_of_last_access: None, @@ -515,7 +515,7 @@ async fn test_chunk_get() { id: 0, storage: ChunkStorage::OpenMutableBuffer.into(), lifecycle_action, - memory_bytes: 82, + memory_bytes: 1018, object_store_bytes: 0, row_count: 1, time_of_last_access: None, @@ -686,7 +686,7 @@ async fn test_list_partition_chunks() { id: 0, storage: ChunkStorage::OpenMutableBuffer.into(), lifecycle_action: ChunkLifecycleAction::Unspecified.into(), - memory_bytes: 100, + memory_bytes: 1016, object_store_bytes: 0, row_count: 2, time_of_last_access: None, diff --git a/tests/end_to_end_cases/management_cli.rs b/tests/end_to_end_cases/management_cli.rs index 964a3f4e7a..3367863f95 100644 --- a/tests/end_to_end_cases/management_cli.rs +++ b/tests/end_to_end_cases/management_cli.rs @@ -271,7 +271,7 @@ async fn test_get_chunks() { .and(predicate::str::contains( r#""storage": "OpenMutableBuffer","#, )) - .and(predicate::str::contains(r#""memory_bytes": 100"#)) + .and(predicate::str::contains(r#""memory_bytes": 1016"#)) // Check for a non empty timestamp such as // "time_of_first_write": "2021-03-30T17:11:10.723866Z", .and(predicate::str::contains(r#""time_of_first_write": "20"#)); From fdfec8fa4f1c91653631a7f0ff016ddf7f0bb64c Mon Sep 17 00:00:00 2001 From: Jacob Marble Date: Fri, 3 Sep 2021 06:20:45 -0700 Subject: [PATCH 3/3] feat: create initial performance test (#2358) * feat: introduce perf/perf.py: performance tests * fix: use Python requirements.txt for dependency requirements * chore: call ManagementService.GetServerStatus directly * chore: s/decode()/text Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- iox_data_generator/README.md | 6 +- perf/.gitignore | 2 + 
perf/README.md | 134 ++++++++ perf/battery-0/datagen.toml | 18 + perf/battery-0/foo.csv | 2 + perf/battery-0/queries.toml | 16 + perf/perf.py | 633 +++++++++++++++++++++++++++++++++++ perf/requirements.txt | 9 + 8 files changed, 817 insertions(+), 3 deletions(-) create mode 100644 perf/.gitignore create mode 100644 perf/README.md create mode 100644 perf/battery-0/datagen.toml create mode 100644 perf/battery-0/foo.csv create mode 100644 perf/battery-0/queries.toml create mode 100755 perf/perf.py create mode 100644 perf/requirements.txt diff --git a/iox_data_generator/README.md b/iox_data_generator/README.md index a6b1f717fb..21da13fdcd 100644 --- a/iox_data_generator/README.md +++ b/iox_data_generator/README.md @@ -67,7 +67,7 @@ instance to write to Kafka and the database in the "reader" IOx instance to read you run it with: ``` -cargo run -p iox_data_generator --bin create_database -- --writer 127.0.0.1:8082 --reader 127.0.0.1:8086 mlb_pirates +cargo run --release -p iox_data_generator --bin create_database -- --writer 127.0.0.1:8082 --reader 127.0.0.1:8086 mlb_pirates ``` This script adds 3 rows to a `writer_test` table because [this issue with the Kafka Consumer @@ -81,7 +81,7 @@ from an existing schema as a guide. In this example, we're going to use Next, run the data generation tool as follows: ``` -cargo run -p iox_data_generator -- --spec iox_data_generator/schemas/cap-write.toml --continue --host 127.0.0.1:8080 --token arbitrary --org mlb --bucket pirates +cargo run --release -p iox_data_generator -- --spec iox_data_generator/schemas/cap-write.toml --continue --host 127.0.0.1:8080 --token arbitrary --org mlb --bucket pirates ``` - `--spec iox_data_generator/schemas/cap-write.toml` sets the schema you want to use to generate the data @@ -100,7 +100,7 @@ is, ``` # in your influxdb_iox checkout -cargo run -- sql -h http://127.0.0.1:8086 +cargo run --release -- sql -h http://127.0.0.1:8086 ``` Connecting to the writer instance won't show any data. diff --git a/perf/.gitignore b/perf/.gitignore new file mode 100644 index 0000000000..a787156826 --- /dev/null +++ b/perf/.gitignore @@ -0,0 +1,2 @@ +logs/ +volumes/ diff --git a/perf/README.md b/perf/README.md new file mode 100644 index 0000000000..de6aa4d313 --- /dev/null +++ b/perf/README.md @@ -0,0 +1,134 @@ +# Performance Tests + +This tool starts a complete test environment: + +- Kafka (docker) +- Minio (docker) +- Jaeger (docker) +- IOx router (local process) +- IOx writer (local process) +- test battery: + - generate data with iox_data_generator + - query data and benchmark + +Logs live in `perf/logs`. +As long as perf.py is running, this works: `tail -f logs/iox_router.log` +After perf.py exits, log files are closed. +When perf.py is run again, old log files are deleted. + +Persistence volumes live in `perf/volumes`. +Similar to log files, these data remain after perf.py exits, and are deleted when perf.py is run again. + +## Test Batteries + +A test battery is composed of: +- a directory, named for the battery name + - data generator spec, in file `datagen.toml` + - query tests, in file `queries.toml` + - SQL query + - (optional) name of query test + - (optional) expected results, as a string or as a file + +The data generator spec format is that defined by `iox_data_generator`. +[Read about that here](../iox_data_generator/README.md). 
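A minimal spec, assuming the same shape as the `battery-0/datagen.toml` shipped with this change (three agents, each writing a `cpu` measurement with a `host` tag and a `usage_user` field), looks like this:
```toml
name = "example"

[[agents]]
name = "cap_write_{{agent_id}}"
count = 3
sampling_interval = "10s"

[[agents.measurements]]
name = "cpu"

[[agents.measurements.tags]]
name = "host"
value = "host-{{agent_id}}"

[[agents.measurements.fields]]
name = "usage_user"
f64_range = [0.0, 100.0]
```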
+ +The query tests file looks like this: +```toml +[[queries]] +name = "example query, no expected result" +sql = "select count(*) from cpu" + +[[queries]] +name = "example query, expected result in string" +sql = "select count(*) from cpu" +expect = """ +COUNT(Uint8(1)) +3 +""" + +[[queries]] +name = "example query, expected result in file" +sql = "select count(*) from cpu" +expect_filename = "foo.csv" +``` + +## Usage + +Help: +```console +perf/perf.py --help +``` + +Run all test batteries: +```console +perf/perf.py all +``` + +Run test batteries `battery-0` and `battery-1`: +```console +perf/perf.py battery-0 battery-1 +``` + +Keep all processes running after test batteries have completed: +```console +perf/perf.py battery-0 --hold +``` + +Do not run any tests, just create an empty playground environment: +```console +perf/perf.py --hold +``` + +Do not build IOx: +```console +perf/perf.py --skip-build +``` + +Use debug binaries (`target/debug`) rather than release binaries: +```console +perf/perf.py --debug +``` + +Use Kafka/Zookeeper instead of Redpanda: +```console +perf/perf.py --kafka-zookeeper +``` + +Use in-memory object store implementation, instead of S3/Minio: +```console +perf/perf.py --object-store memory +``` + +Use file object store implementation, instead of S3/Minio: +```console +perf/perf.py --object-store file +``` + +Just delete docker containers and network, then exit. +In the future, this will also detect orphaned IOx processes generated by perf, and delete those too: +```console +perf/perf.py --cleanup +``` + +## Install + +Install Docker: +https://www.docker.com/products/docker-desktop + +Install python3: + +```console +brew install python3 +``` + +or: + +```console +apt install python3 python3-pip +``` + +Install the required Python packages: + +```console +python3 -m pip install -r perf/requirements.txt +``` diff --git a/perf/battery-0/datagen.toml b/perf/battery-0/datagen.toml new file mode 100644 index 0000000000..f34fd747f1 --- /dev/null +++ b/perf/battery-0/datagen.toml @@ -0,0 +1,18 @@ +name = "example" + +[[agents]] +name = "cap_write_{{agent_id}}" +count = 3 +sampling_interval = "10s" + +[[agents.measurements]] +name = "cpu" + +[[agents.measurements.tags]] +name = "host" +value = "host-{{agent_id}}" + +[[agents.measurements.fields]] +name = "usage_user" +f64_range = [0.0, 100.0] + diff --git a/perf/battery-0/foo.csv b/perf/battery-0/foo.csv new file mode 100644 index 0000000000..aeec46b5c4 --- /dev/null +++ b/perf/battery-0/foo.csv @@ -0,0 +1,2 @@ +COUNT(Uint8(1)) +3 diff --git a/perf/battery-0/queries.toml b/perf/battery-0/queries.toml new file mode 100644 index 0000000000..a22c1f40bb --- /dev/null +++ b/perf/battery-0/queries.toml @@ -0,0 +1,16 @@ +[[queries]] +name = "example query, no expected result" +sql = "select count(*) from cpu" + +[[queries]] +name = "example query, expected result in string" +sql = "select count(*) from cpu" +expect = """ +COUNT(Uint8(1)) +3 +""" + +[[queries]] +name = "example query, expected result in file" +sql = "select count(*) from cpu" +expect_filename = "foo.csv" diff --git a/perf/perf.py b/perf/perf.py new file mode 100755 index 0000000000..510f2586b6 --- /dev/null +++ b/perf/perf.py @@ -0,0 +1,633 @@ +#!/usr/bin/env python3 + +import argparse +import glob +import logging +import math +import os +import pathlib +import shutil +import signal +import socket +import subprocess +import threading +import time + +import docker +import grpc_requests +import minio +import requests +import toml +import urllib3 + +ioxperf_name = 
"ioxperf" +ioxperf_labels = {ioxperf_name: None} +ioxperf_filters = {'label': ioxperf_name} +org_name = 'myorg' +bucket_name = 'mybucket' +db_name = '%s_%s' % (org_name, bucket_name) + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument('--skip-build', help='do not build IOx, execute existing binaries', action='store_true') + parser.add_argument('--debug', help='build/execute debug IOx binaries instead of release', action='store_true') + parser.add_argument('--object-store', help='object store type', default='s3', choices=('memory', 's3', 'file')) + parser.add_argument('--kafka-zookeeper', help='use Kafka/ZooKeeper instead of Redpanda', action='store_true') + parser.add_argument('--hold', help='keep all services running after tests complete', action='store_true') + parser.add_argument('--cleanup', help='remove Docker assets and exit (TODO terminate IOx processes)', action='store_true') + parser.add_argument('batteries', help='name of directories containing test batteries, or "all"', nargs='*') + args = parser.parse_args() + + os.chdir(os.path.dirname(os.path.abspath(__file__))) + + dc = docker.from_env() + if args.cleanup: + docker_cleanup_resources(dc) + return + cleanup_logs_and_volumes(dc) + + batteries = args.batteries + if batteries == ['all']: + batteries = ( + p.relative_to(os.getcwd()) + for p + in pathlib.Path(os.getcwd()).iterdir() + if p.joinpath('datagen.toml').is_file() + ) + else: + for battery in batteries: + p = pathlib.Path(os.getcwd()).joinpath(battery, 'datagen.toml') + if not p.is_file(): + print('invalid battery "%s" - does not contain datagen.toml' % battery) + exit(1) + + processes = {} + + try: + if not args.skip_build: + cargo_build_iox(args.debug) + + docker_create_network(dc) + if args.kafka_zookeeper: + docker_run_zookeeper(dc) + docker_run_kafka(dc) + else: + docker_run_redpanda(dc) + if args.object_store == 's3': + docker_run_minio(dc) + docker_run_jaeger(dc) + processes['iox_router'] = exec_iox(1, 'iox_router', debug=args.debug, object_store=args.object_store) + processes['iox_writer'] = exec_iox(2, 'iox_writer', debug=args.debug, object_store=args.object_store) + grpc_create_database(1, 2) + + print('-' * 40) + for battery in batteries: + run_test_battery(battery, 1, 2, debug=args.debug) + print('-' * 40) + + except Exception as e: + print(e) + + if args.hold: + print('subprocesses are still running, ctrl-C to terminate and exit') + try: + signal.pause() + except KeyboardInterrupt: + pass + print('-' * 40) + + for service_name, process in processes.items(): + if process is None: + continue + print('%s <- SIGTERM' % service_name) + process.send_signal(signal.SIGTERM) + exit_code = process.wait(1.0) + if exit_code is None: + print('%s <- SIGKILL' % service_name) + process.send_signal(signal.SIGKILL) + if exit_code != 0: + print('%s exited with %d' % (service_name, exit_code)) + docker_cleanup_resources(dc) + + +def docker_cleanup_resources(dc): + containers = dc.containers.list(all=True, filters=ioxperf_filters) + if len(containers) > 0: + print('removing containers: %s' % ', '.join((c.name for c in containers))) + for container in containers: + container.remove(v=True, force=True) + + networks = dc.networks.list(filters=ioxperf_filters) + if len(networks) > 0: + print('removing networks: %s' % ', '.join((n.name for n in networks))) + for network in networks: + network.remove() + + +def cleanup_logs_and_volumes(dc): + docker_cleanup_resources(dc) + + volume_paths = glob.glob(os.path.join(os.getcwd(), 'volumes', '*')) + if 
len(volume_paths) > 0: + print('removing volume contents: %s' % ', '.join((os.path.relpath(p) for p in volume_paths))) + for path in volume_paths: + shutil.rmtree(path) + + log_paths = glob.glob(os.path.join(os.getcwd(), 'logs', '*')) + if len(log_paths) > 0: + print('removing logs: %s' % ', '.join((os.path.relpath(p) for p in log_paths))) + for path in log_paths: + os.unlink(path) + + +def docker_create_network(dc): + dc.networks.create(name=ioxperf_name, driver='bridge', check_duplicate=True, scope='local', + labels=ioxperf_labels) + + +def docker_pull_image_if_needed(dc, image): + try: + dc.images.get(image) + except docker.errors.ImageNotFound: + print("pulling image '%s'..." % image) + dc.images.pull(image) + + +def docker_wait_container_running(container): + while True: + container.reload() + if container.status == 'running': + print("container '%s' has started" % container.name) + return + elif container.status == 'created': + print("waiting for container '%s' to start" % container.name) + time.sleep(0.1) + raise Exception("container '%s' status '%s' unexpected" % (container.name, container.status)) + + +def pipe_container_logs_to_file(container, log_filename): + with pathlib.Path(os.path.join(os.getcwd(), 'logs')) as dir_path: + if not dir_path.exists(): + os.mkdir(dir_path, mode=0o777) + + logs = container.logs(stdout=True, stderr=True, stream=True, follow=True) + f = open(file=os.path.join(os.getcwd(), 'logs', log_filename), mode='wb', buffering=0) + + def thread_function(): + for entry in logs: + f.write(entry) + f.flush() + f.close() + + threading.Thread(target=thread_function, daemon=True).start() + + +def check_port_open(addr, port): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + port_open = sock.connect_ex((addr, port)) == 0 + sock.close() + return port_open + + +def docker_run_redpanda(dc): + image = 'vectorized/redpanda:v21.7.6' + command = ['redpanda', 'start', + '--overprovisioned', '--smp 1', '--memory 128M', '--reserve-memory', '0M', '--node-id', '0', + '--check=false', '--kafka-addr', 'CLIENT://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093', + '--advertise-kafka-addr', 'CLIENT://kafka:9092,EXTERNAL://localhost:9093'] + name = '%s-%s' % (ioxperf_name, 'redpanda') + ports = {'9093/tcp': 9093} + volumes = {os.path.join(os.getcwd(), 'volumes/redpanda'): { + 'bind': '/var/lib/redpanda/data', + 'mode': 'rw', + }} + docker_pull_image_if_needed(dc, image) + container = dc.containers.run(image=image, command=command, detach=True, name=name, hostname='kafka', + labels=ioxperf_labels, network=ioxperf_name, ports=ports, volumes=volumes) + docker_wait_container_running(container) + + while True: + if check_port_open('127.0.0.1', 9093): + break + print('waiting for Redpanda to become ready') + time.sleep(0.1) + + pipe_container_logs_to_file(container, 'redpanda.log') + print('Redpanda service is ready') + + return container + + +def docker_run_zookeeper(dc): + image = 'docker.io/bitnami/zookeeper:3' + name = '%s-%s' % (ioxperf_name, 'zookeeper') + ports = {'2181/tcp': 2181} + env = { + 'ALLOW_ANONYMOUS_LOGIN': 'yes', + } + volumes = {os.path.join(os.getcwd(), 'volumes/zookeeper'): { + 'bind': '/bitnami/zookeeper', + 'mode': 'rw', + }} + docker_pull_image_if_needed(dc, image) + container = dc.containers.run(image=image, detach=True, environment=env, name=name, hostname='zookeeper', + labels=ioxperf_labels, network=ioxperf_name, ports=ports, volumes=volumes) + docker_wait_container_running(container) + + while True: + if check_port_open('127.0.0.1', 2181): + break + 
print('waiting for ZooKeeper to become ready') + time.sleep(0.1) + + pipe_container_logs_to_file(container, 'zookeeper.log') + print('ZooKeeper service is ready') + + return container + + +def docker_run_kafka(dc): + image = 'docker.io/bitnami/kafka:2' + name = '%s-%s' % (ioxperf_name, 'kafka') + ports = {'9093/tcp': 9093} + env = { + 'KAFKA_CFG_ZOOKEEPER_CONNECT': 'zookeeper:2181', + 'ALLOW_PLAINTEXT_LISTENER': 'yes', + 'KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP': 'CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT', + 'KAFKA_CFG_LISTENERS': 'CLIENT://:9092,EXTERNAL://:9093', + 'KAFKA_CFG_ADVERTISED_LISTENERS': 'CLIENT://kafka:9092,EXTERNAL://localhost:9093', + 'KAFKA_INTER_BROKER_LISTENER_NAME': 'CLIENT', + 'KAFKA_CFG_LOG_RETENTION_CHECK_INTERVAL_MS': '100', + } + volumes = {os.path.join(os.getcwd(), 'volumes/kafka'): { + 'bind': '/bitname/kafka', + 'mode': 'rw', + }} + docker_pull_image_if_needed(dc, image) + container = dc.containers.run(image=image, detach=True, environment=env, name=name, hostname='kafka', + labels=ioxperf_labels, network=ioxperf_name, ports=ports, volumes=volumes) + docker_wait_container_running(container) + + while True: + if check_port_open('127.0.0.1', 9093): + break + print('waiting for Kafka to become ready') + time.sleep(0.1) + + pipe_container_logs_to_file(container, 'kafka.log') + print('Kafka service is ready') + + return container + + +def docker_run_minio(dc): + image = 'minio/minio:RELEASE.2021-08-05T22-01-19Z' + command = 'server --address 0.0.0.0:9000 --console-address 0.0.0.0:9001 /data' + name = '%s-%s' % (ioxperf_name, 'minio') + ports = {'9000/tcp': 9000, '9001/tcp': 9001} + volumes = {os.path.join(os.getcwd(), 'volumes/minio'): { + 'bind': '/data', + 'mode': 'rw', + }} + env = { + 'MINIO_ROOT_USER': 'minio', + 'MINIO_ROOT_PASSWORD': 'miniominio', + 'MINIO_PROMETHEUS_AUTH_TYPE': 'public', + 'MINIO_HTTP_TRACE': '/dev/stdout', + } + docker_pull_image_if_needed(dc, image) + container = dc.containers.run(image=image, command=command, detach=True, environment=env, name=name, + hostname='minio', labels=ioxperf_labels, network=ioxperf_name, ports=ports, + volumes=volumes) + docker_wait_container_running(container) + + while True: + timeout = urllib3.util.Timeout(connect=0.1, read=0.1) + http_client = urllib3.PoolManager(num_pools=1, timeout=timeout, retries=False) + try: + mc = minio.Minio(endpoint='127.0.0.1:9000', access_key='minio', secret_key='miniominio', secure=False, + http_client=http_client) + if not mc.bucket_exists('iox1'): + mc.make_bucket('iox1') + if not mc.bucket_exists('iox2'): + mc.make_bucket('iox2') + break + except (urllib3.exceptions.ProtocolError, urllib3.exceptions.TimeoutError, minio.error.S3Error): + pass + print('waiting for Minio to become ready') + time.sleep(0.5) + + pipe_container_logs_to_file(container, 'minio.log') + print('Minio service ready') + + return container + + +def docker_run_jaeger(dc): + image = 'jaegertracing/all-in-one:1.25.0' + name = '%s-%s' % (ioxperf_name, 'jaeger') + ports = {'16686/tcp': 16686} + docker_pull_image_if_needed(dc, image) + container = dc.containers.run(image=image, detach=True, name=name, hostname='jaeger', labels=ioxperf_labels, + network=ioxperf_name, ports=ports) + docker_wait_container_running(container) + + while True: + try: + if requests.get(url='http://127.0.0.1:16686/search', timeout=0.1).status_code / 100 == 2: + break + except (requests.exceptions.ConnectionError, requests.exceptions.Timeout): + pass + print('waiting for Jaeger to become ready') + time.sleep(0.1) + + 
pipe_container_logs_to_file(container, 'jaeger.log') + print('Jaeger service ready') + + return container + + +def cargo_build_iox(debug=False): + t = time.time() + print('building IOx') + + args = ['cargo', 'build'] + if not debug: + args += ['--release'] + args += ['--package', 'influxdb_iox', '--features', 'aws,jaeger', '--bin', 'influxdb_iox'] + args += ['--package', 'iox_data_generator', '--bin', 'iox_data_generator', '--bin', 'create_database'] + subprocess.run(args=args) + + print('building IOx finished in %.2fs' % (time.time() - t)) + + +def exec_iox(id, service_name, debug=False, object_store='memory', print_only=False): + http_addr = 'localhost:%d' % (id * 10000 + 8080) + grpc_addr = 'localhost:%d' % (id * 10000 + 8082) + + if debug: + iox_path = os.path.abspath(os.path.join(os.getcwd(), '../target/debug/influxdb_iox')) + else: + iox_path = os.path.abspath(os.path.join(os.getcwd(), '../target/release/influxdb_iox')) + args = [iox_path, 'run'] + env = { + 'INFLUXDB_IOX_ID': str(id), + 'INFLUXDB_IOX_BIND_ADDR': http_addr, + 'INFLUXDB_IOX_GRPC_BIND_ADDR': grpc_addr, + 'INFLUXDB_IOX_BUCKET': 'iox%d' % id, + 'LOG_DESTINATION': 'stdout', + 'LOG_FORMAT': 'full', + 'TRACES_EXPORTER': 'jaeger', + 'TRACES_EXPORTER_JAEGER_AGENT_HOST': 'localhost', + 'TRACES_EXPORTER_JAEGER_AGENT_PORT': '6831', + 'TRACES_EXPORTER_JAEGER_SERVICE_NAME': service_name, + 'TRACES_SAMPLER': 'always_on', + 'RUST_BACKTRACE': '1', + 'LOG_FILTER': 'debug,lifecycle=info,rusoto_core=warn,hyper=warn,h2=warn', + } + + if object_store == 'memory': + env['INFLUXDB_IOX_OBJECT_STORE'] = 'memory' + elif object_store == 's3': + env['INFLUXDB_IOX_OBJECT_STORE'] = 's3' + env['AWS_ACCESS_KEY_ID'] = 'minio' + env['AWS_SECRET_ACCESS_KEY'] = 'miniominio' + env['AWS_ENDPOINT'] = 'http://localhost:9000' + elif object_store == 'file': + env['INFLUXDB_IOX_OBJECT_STORE'] = 'file' + env['INFLUXDB_IOX_DB_DIR'] = 'volumes/%s' % service_name + else: + raise ValueError('invalid object_store value "%s"' % object_store) + + if print_only: + print() + for k in sorted(env.keys()): + print('%s=%s' % (k, env[k])) + print(' '.join(args)) + print() + return None + + log_file = open('logs/%s.log' % service_name, mode='w') + process = subprocess.Popen(args=args, env=env, stdout=log_file, stderr=log_file) + + while True: + if process.poll() is not None: + raise ChildProcessError('service %s stopped unexpectedly, check %s' % (service_name, log_file.name)) + router = grpc_requests.Client(grpc_addr, lazy=True) + while True: + try: + router.register_service('influxdata.iox.management.v1.ManagementService') + break + except: + # fall through to retry + pass + try: + server_status_response = router.request('influxdata.iox.management.v1.ManagementService', 'GetServerStatus', None) + if 'server_status' in server_status_response and server_status_response['server_status']['initialized'] is True: + break + except: + # fall through to retry + pass + + print('waiting for %s to become ready' % service_name) + time.sleep(0.1) + + print('%s service ready' % service_name) + + return process + + +def grpc_create_database(router_id, writer_id): + print('creating database "%s" on both IOx servers' % db_name) + + router_db_rules = { + 'rules': { + 'name': db_name, + 'partition_template': { + 'parts': [ + {'time': '%Y-%m-%d %H:00:00'}, + ], + }, + 'lifecycle_rules': { + 'immutable': True, + 'worker_backoff_millis': '1000', + 'catalog_transactions_until_checkpoint': '100', + 'late_arrive_window_seconds': 300, + 'persist_row_threshold': '1000000', + 
'persist_age_threshold_seconds': 1800, + 'mub_row_threshold': '100000', + 'max_active_compactions_cpu_fraction': 1.0, + }, + 'routing_config': {'sink': {'kafka': {}}}, + 'worker_cleanup_avg_sleep': '500s', + 'writing': '127.0.0.1:9093', + }, + } + + writer_db_rules = { + 'rules': { + 'name': db_name, + 'partition_template': { + 'parts': [ + {'time': '%Y-%m-%d %H:00:00'} + ], + }, + 'lifecycle_rules': { + 'buffer_size_soft': 1024 * 1024 * 1024, + 'buffer_size_hard': 1024 * 1024 * 1024 * 2, + 'worker_backoff_millis': 100, + 'max_active_compactions': 1, + 'persist': True, + 'persist_row_threshold': 10000000, + 'catalog_transactions_until_checkpoint': 100, + 'late_arrive_window_seconds': 300, + 'persist_age_threshold_seconds': 1800, + 'mub_row_threshold': 100000, + }, + 'routing_config': {'sink': {'kafka': {}}}, + 'worker_cleanup_avg_sleep': '500s', + 'reading': '127.0.0.1:9093', + }, + } + + if router_id is not None: + router_grpc_addr = 'localhost:%d' % (router_id * 10000 + 8082) + router = grpc_requests.Client(router_grpc_addr, lazy=True) + router.register_service('influxdata.iox.management.v1.ManagementService') + router.request('influxdata.iox.management.v1.ManagementService', 'CreateDatabase', router_db_rules) + + router_http_addr = 'localhost:%d' % (router_id * 10000 + 8080) + router_write_url = 'http://%s/api/v2/write?org=%s&bucket=%s' % (router_http_addr, org_name, bucket_name) + lp = "sentinel,source=perf.py f=1i" + response = requests.post(url=router_write_url, data=lp, timeout=10) + if not response.ok: + print('failed to write to router') + print(response.reason) + print(response.content) + return + + else: + print() + print(router_db_rules) + print() + + if writer_id is not None: + writer_grpc_addr = 'localhost:%d' % (writer_id * 10000 + 8082) + writer = grpc_requests.Client(writer_grpc_addr, lazy=True) + writer.register_service('influxdata.iox.management.v1.ManagementService') + writer.request('influxdata.iox.management.v1.ManagementService', 'CreateDatabase', writer_db_rules) + + writer_http_addr = 'localhost:%d' % (writer_id * 10000 + 8080) + writer_query_url = 'http://%s/iox/api/v1/databases/%s/query' % (writer_http_addr, db_name) + writer_query_params = {'q': 'select count(1) from sentinel'} + + response = requests.get(url=writer_query_url, params=writer_query_params, timeout=10) + for i in range(20): + if response.ok: + break + print('waiting for round trip test to succeed') + time.sleep(0.5) + response = requests.get(url=writer_query_url, params=writer_query_params, timeout=10) + + if not response.ok: + print(response.reason) + print(response.content) + return + + else: + print() + print(writer_db_rules) + print() + + print('created database "%s" on both IOx servers' % db_name) + + +def run_test_battery(battery_name, router_id, writer_id, debug=False): + print('starting test battery "%s"' % battery_name) + + # Write + + battery_dir = os.path.join(os.getcwd(), battery_name) + datagen_filename = os.path.join(battery_dir, 'datagen.toml') + if debug: + iox_data_generator_path = os.path.abspath(os.path.join(os.getcwd(), '../target/debug/iox_data_generator')) + else: + iox_data_generator_path = os.path.abspath(os.path.join(os.getcwd(), '../target/release/iox_data_generator')) + + router_http_addr = 'localhost:%d' % (router_id * 10000 + 8080) + args = [iox_data_generator_path, + '--host', router_http_addr, '--token', 'arbitrary', + '--org', org_name, '--bucket', bucket_name, + '--spec', datagen_filename] + env = { + 'RUST_BACKTRACE': '0', + } + log_file = open('logs/test.log', 
mode='w') + if subprocess.run(args=args, stdout=log_file, stderr=log_file, env=env).returncode != 0: + raise ChildProcessError( + 'failed to run iox_data_generator for battery "%s", check %s' % (battery_name, log_file.name)) + + # Query + + writer_http_addr = 'localhost:%d' % (writer_id * 10000 + 8080) + query_url = 'http://%s/iox/api/v1/databases/%s/query' % (writer_http_addr, db_name) + queries_filename = os.path.join(battery_dir, 'queries.toml') + queries = toml.load(open(queries_filename)) + + for query in queries['queries']: + if 'sql' not in query: + print('query missing SQL query') + print(query) + print() + continue + sql = query['sql'] + name = query['name'] + if name is None: + name = sql + + print('running test "%s"' % name) + time_start = time.time() + params = {'q': sql, 'format': 'csv'} + response = requests.get(url=query_url, params=params) + time_delta = '%dms' % math.floor((time.time() - time_start) * 1000) + + if not response.ok: + print(response.reason) + print(response.content.text) + print() + continue + + got = response.content.text.strip() + print('time: %s' % time_delta) + if 'expect' in query: + expect = query['expect'].strip() + if expect != got: + print('expected: %s' % expect) + print('got: %s' % got) + else: + print('OK') + + elif 'expect_filename' in query: + path = pathlib.Path(os.path.join(battery_dir, query['expect_filename'])) + if not path.is_file(): + print('file "%s" not found' % path) + print() + continue + expect = open(path).read().strip() + if expect != got: + print('expected: %s' % expect) + print('got: %s' % got) + else: + print('OK') + else: + print('OK') + + print() + + print('completed test battery "%s"' % battery_name) + + +if __name__ == "__main__": + logging.getLogger('grpc_requests.client').setLevel(logging.ERROR) + main() diff --git a/perf/requirements.txt b/perf/requirements.txt new file mode 100644 index 0000000000..c638092166 --- /dev/null +++ b/perf/requirements.txt @@ -0,0 +1,9 @@ +docker>=5.0,<6 +grpc-requests>=0.1,<0.2 +grpcio>=1.39,<1.40 +grpcio-reflection>=1.39,<1.40 +protobuf>=3.17,<3.18 +minio +requests +toml +urllib3