From 369c2237f68f56d9dd01674f7f75ccfdeca8a943 Mon Sep 17 00:00:00 2001
From: Marko Mikulicic
Date: Thu, 10 Jun 2021 23:26:25 +0200
Subject: [PATCH 1/2] fix: Expose jaeger knobs and default max-packet-size to
 something that works everywhere

`--traces-exporter-jaeger-max-packet-size` matters even when the Jaeger
collector runs on "localhost" via `docker run jaegertracing/all-in-one ....`:
on macOS that container is not really on the local loopback but sits behind a
few hops of tunneling interfaces, so oversized UDP packets get mysteriously
dropped, which can easily drive you to doubt your own sanity on an otherwise
calm Thursday evening.
---
 src/commands/run.rs     | 26 ++++++++++++++++++++++++++
 src/commands/tracing.rs | 22 +++++++++++++++++-----
 2 files changed, 43 insertions(+), 5 deletions(-)

diff --git a/src/commands/run.rs b/src/commands/run.rs
index 1a07df842b..3ff95c3df3 100644
--- a/src/commands/run.rs
+++ b/src/commands/run.rs
@@ -245,6 +245,32 @@ pub struct Config {
     )]
     pub traces_exporter_jaeger_agent_port: NonZeroU16,

+    /// Tracing: Jaeger service name.
+    ///
+    /// Only used if `--traces-exporter` is "jaeger".
+    #[structopt(
+        long = "--traces-exporter-jaeger-service-name",
+        env = "TRACES_EXPORTER_JAEGER_SERVICE_NAME",
+        default_value = "iox"
+    )]
+    pub traces_exporter_jaeger_service_name: String,
+
+    /// Tracing: Jaeger max UDP packet size.
+    ///
+    /// Defaults to 1300, which is a safe MTU.
+    ///
+    /// You can increase it to 65000 if the target is a Jaeger collector
+    /// on localhost. If so, the batching exporter will be enabled for
+    /// extra efficiency. Otherwise a UDP packet will be sent for each exported span.
+    ///
+    /// Only used if `--traces-exporter` is "jaeger".
+    #[structopt(
+        long = "--traces-exporter-jaeger-max-packet-size",
+        env = "TRACES_EXPORTER_JAEGER_MAX_PACKET_SIZE",
+        default_value = "1300"
+    )]
+    pub traces_exporter_jaeger_max_packet_size: usize,
+
     /// The identifier for the server.
     ///
     /// Used for writing to object storage and as an identifier that is added to
diff --git a/src/commands/tracing.rs b/src/commands/tracing.rs
index f5976d9333..43a4b1835f 100644
--- a/src/commands/tracing.rs
+++ b/src/commands/tracing.rs
@@ -160,13 +160,25 @@ fn construct_opentelemetry_tracer(config: &crate::commands::run::Config) -> Opti
                 config.traces_exporter_jaeger_agent_port
             );
             opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
-            Some(
-                opentelemetry_jaeger::new_pipeline()
+            Some({
+                let builder = opentelemetry_jaeger::new_pipeline()
                     .with_trace_config(trace_config)
                     .with_agent_endpoint(agent_endpoint)
-                    .install_batch(opentelemetry::runtime::Tokio)
-                    .unwrap(),
-            )
+                    .with_service_name(&config.traces_exporter_jaeger_service_name)
+                    .with_max_packet_size(config.traces_exporter_jaeger_max_packet_size);
+
+                // Batching is hard to tune because the max batch size
+                // is not currently exposed as a tunable from the trace config, and even then
+                // it's defined in terms of the max number of spans, not their size in bytes.
+                // Thus we enable batching only when the max packet size is at least 65000,
+                // the value suggested by Jaeger when exporting to localhost.
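+                // (For context: a typical Ethernet MTU is around 1500 bytes, which is
+                // why the 1300-byte default is safe almost everywhere, while 65000
+                // assumes a loopback-sized MTU, i.e. a collector on the same host.)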
+                if config.traces_exporter_jaeger_max_packet_size >= 65_000 {
+                    builder.install_batch(opentelemetry::runtime::Tokio)
+                } else {
+                    builder.install_simple()
+                }
+                .unwrap()
+            })
         }

         TracesExporter::Otlp => {

From 0cbe74dbde5b7044ed7cff8ff44a1da339135021 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 11 Jun 2021 06:55:40 -0400
Subject: [PATCH 2/2] fix: persistence to parquet by swapping order of
 arguments (#1687)

* fix: fix order of arguments

* test: for persistence
---
 server/src/db/lifecycle.rs            |  4 +-
 tests/end_to_end_cases/mod.rs         |  1 +
 tests/end_to_end_cases/persistence.rs | 65 +++++++++++++++++++++++++++
 tests/end_to_end_cases/scenario.rs    | 37 +++++++++++++++
 4 files changed, 105 insertions(+), 2 deletions(-)
 create mode 100644 tests/end_to_end_cases/persistence.rs

diff --git a/server/src/db/lifecycle.rs b/server/src/db/lifecycle.rs
index 629b5221b0..d23f9dda55 100644
--- a/server/src/db/lifecycle.rs
+++ b/server/src/db/lifecycle.rs
@@ -326,8 +326,8 @@ impl ChunkMover for LifecycleManager {

     fn write_to_object_store(
         &mut self,
-        partition_key: String,
         table_name: String,
+        partition_key: String,
         chunk_id: u32,
     ) -> TaskTracker {
         info!(%partition_key, %chunk_id, "write chunk to object store");
@@ -338,7 +338,7 @@ impl ChunkMover for LifecycleManager {
         tracker
     }

-    fn drop_chunk(&mut self, partition_key: String, table_name: String, chunk_id: u32) {
+    fn drop_chunk(&mut self, table_name: String, partition_key: String, chunk_id: u32) {
         info!(%partition_key, %chunk_id, "dropping chunk");
         let _ = self
             .db
diff --git a/tests/end_to_end_cases/mod.rs b/tests/end_to_end_cases/mod.rs
index 44a933247a..8e3c3216af 100644
--- a/tests/end_to_end_cases/mod.rs
+++ b/tests/end_to_end_cases/mod.rs
@@ -4,6 +4,7 @@ pub mod management_api;
 pub mod management_cli;
 pub mod operations_api;
 pub mod operations_cli;
+mod persistence;
 pub mod preservation;
 pub mod read_api;
 pub mod read_cli;
diff --git a/tests/end_to_end_cases/persistence.rs b/tests/end_to_end_cases/persistence.rs
new file mode 100644
index 0000000000..cc35af4937
--- /dev/null
+++ b/tests/end_to_end_cases/persistence.rs
@@ -0,0 +1,65 @@
+use std::{
+    convert::TryInto,
+    time::{Duration, Instant},
+};
+
+use data_types::chunk_metadata::ChunkSummary;
+
+use crate::common::server_fixture::ServerFixture;
+
+use super::scenario::{create_quickly_persisting_database, rand_name};
+
+#[tokio::test]
+async fn test_persistence() {
+    let fixture = ServerFixture::create_shared().await;
+    let mut write_client = fixture.write_client();
+    let mut management_client = fixture.management_client();
+
+    let db_name = rand_name();
+    create_quickly_persisting_database(&db_name, fixture.grpc_channel()).await;
+
+    // Stream in a write that should exceed the limit
+    let lp_lines: Vec<_> = (0..1_000)
+        .map(|i| format!("data,tag1=val{} x={} {}", i, i * 10, i))
+        .collect();
+
+    let num_lines_written = write_client
+        .write(&db_name, lp_lines.join("\n"))
+        .await
+        .expect("successful write");
+    assert_eq!(num_lines_written, 1000);
+
+    // wait for the chunk to be written to object store
+    let deadline = Instant::now() + Duration::from_secs(5);
+    let mut chunks = vec![];
+    loop {
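+        // Fail (rather than hang forever) if the chunk never reaches object
+        // store, and dump the observed chunks so the failure is diagnosable.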
Chunks were: {:#?}", + chunks + ); + + chunks = management_client + .list_chunks(&db_name) + .await + .expect("listing chunks"); + + let storage_string = chunks + .iter() + .map(|c| { + let c: ChunkSummary = c.clone().try_into().unwrap(); + format!("{:?}", c.storage) + }) + .collect::>() + .join(","); + + // Found a persisted chunk, all good + if storage_string == "ReadBufferAndObjectStore" { + return; + } + + // keep looking + println!("Current chunk storage: {:#?}", storage_string); + tokio::time::sleep(Duration::from_millis(200)).await + } +} diff --git a/tests/end_to_end_cases/scenario.rs b/tests/end_to_end_cases/scenario.rs index 7b602786ad..360832f5a1 100644 --- a/tests/end_to_end_cases/scenario.rs +++ b/tests/end_to_end_cases/scenario.rs @@ -316,6 +316,43 @@ pub async fn create_readable_database( .expect("create database failed"); } +/// given a channel to talk with the management api, create a new +/// database with the specified name that will aggressively try and +/// persist all data quickly +pub async fn create_quickly_persisting_database( + db_name: impl Into, + channel: tonic::transport::Channel, +) { + let db_name = db_name.into(); + + let mut management_client = influxdb_iox_client::management::Client::new(channel); + let rules = DatabaseRules { + name: db_name.clone(), + partition_template: Some(PartitionTemplate { + parts: vec![partition_template::Part { + part: Some(partition_template::part::Part::Time( + "%Y-%m-%d %H:00:00".into(), + )), + }], + }), + lifecycle_rules: Some(LifecycleRules { + mutable_linger_seconds: 1, + mutable_size_threshold: 100, + buffer_size_soft: 1024 * 1024, + buffer_size_hard: 1024 * 1024, + persist: true, + ..Default::default() + }), + ..Default::default() + }; + + management_client + .create_database(rules.clone()) + .await + .expect("create database failed"); + println!("Created quickly persisting database {}", db_name); +} + /// given a channel to talk with the managment api, create a new /// database with no mutable buffer configured, no partitioning rules pub async fn create_unreadable_database(