Merge branch 'main' into crepererum/inline_parquet_table_struct
commit
71e2a8fbaa
|
@ -326,8 +326,8 @@ impl ChunkMover for LifecycleManager {
|
|||
|
||||
fn write_to_object_store(
|
||||
&mut self,
|
||||
partition_key: String,
|
||||
table_name: String,
|
||||
partition_key: String,
|
||||
chunk_id: u32,
|
||||
) -> TaskTracker<Self::Job> {
|
||||
info!(%partition_key, %chunk_id, "write chunk to object store");
|
||||
|
@ -338,7 +338,7 @@ impl ChunkMover for LifecycleManager {
|
|||
tracker
|
||||
}
|
||||
|
||||
fn drop_chunk(&mut self, partition_key: String, table_name: String, chunk_id: u32) {
|
||||
fn drop_chunk(&mut self, table_name: String, partition_key: String, chunk_id: u32) {
|
||||
info!(%partition_key, %chunk_id, "dropping chunk");
|
||||
let _ = self
|
||||
.db
|
||||
|
|
|
@ -245,6 +245,32 @@ pub struct Config {
|
|||
)]
|
||||
pub traces_exporter_jaeger_agent_port: NonZeroU16,
|
||||
|
||||
/// Tracing: Jaeger service name.
|
||||
///
|
||||
/// Only used if `--traces-exporter` is "jaeger".
|
||||
#[structopt(
|
||||
long = "--traces-exporter-jaeger-service-name",
|
||||
env = "TRACES_EXPORTER_JAEGER_SERVICE_NAME",
|
||||
default_value = "iox"
|
||||
)]
|
||||
pub traces_exporter_jaeger_service_name: String,
|
||||
|
||||
/// Tracing: Jaeger max UDP packet size
|
||||
///
|
||||
/// Default to 1300, which is a safe MTU.
|
||||
///
|
||||
/// You can increase it to 65000 if the target is a jaeger collector
|
||||
/// on localhost. If so, the batching exporter will be enabled for
|
||||
/// extra efficiency. Otherwise an UDP packet will be sent for each exported span.
|
||||
///
|
||||
/// Only used if `--traces-exporter` is "jaeger".
|
||||
#[structopt(
|
||||
long = "--traces-exporter-jaeger-max-packet-size",
|
||||
env = "TRACES_EXPORTER_JAEGER_MAX_PACKET_SIZE",
|
||||
default_value = "1300"
|
||||
)]
|
||||
pub traces_exporter_jaeger_max_packet_size: usize,
|
||||
|
||||
/// The identifier for the server.
|
||||
///
|
||||
/// Used for writing to object storage and as an identifier that is added to
|
||||
|
|
|
@ -160,13 +160,25 @@ fn construct_opentelemetry_tracer(config: &crate::commands::run::Config) -> Opti
|
|||
config.traces_exporter_jaeger_agent_port
|
||||
);
|
||||
opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
|
||||
Some(
|
||||
opentelemetry_jaeger::new_pipeline()
|
||||
Some({
|
||||
let builder = opentelemetry_jaeger::new_pipeline()
|
||||
.with_trace_config(trace_config)
|
||||
.with_agent_endpoint(agent_endpoint)
|
||||
.install_batch(opentelemetry::runtime::Tokio)
|
||||
.unwrap(),
|
||||
)
|
||||
.with_service_name(&config.traces_exporter_jaeger_service_name)
|
||||
.with_max_packet_size(config.traces_exporter_jaeger_max_packet_size);
|
||||
|
||||
// Batching is hard to tune because the max batch size
|
||||
// is not currently exposed as a tunable from the trace config, and even then
|
||||
// it's defined in terms of max number of spans, and not their size in bytes.
|
||||
// Thus we enable batching only when the MTU size is 65000 which is the value suggested
|
||||
// by jaeger when exporting to localhost.
|
||||
if config.traces_exporter_jaeger_max_packet_size >= 65_000 {
|
||||
builder.install_batch(opentelemetry::runtime::Tokio)
|
||||
} else {
|
||||
builder.install_simple()
|
||||
}
|
||||
.unwrap()
|
||||
})
|
||||
}
|
||||
|
||||
TracesExporter::Otlp => {
|
||||
|
|
|
@ -4,6 +4,7 @@ pub mod management_api;
|
|||
pub mod management_cli;
|
||||
pub mod operations_api;
|
||||
pub mod operations_cli;
|
||||
mod persistence;
|
||||
pub mod preservation;
|
||||
pub mod read_api;
|
||||
pub mod read_cli;
|
||||
|
|
|
@ -0,0 +1,65 @@
|
|||
use std::{
|
||||
convert::TryInto,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use data_types::chunk_metadata::ChunkSummary;
|
||||
|
||||
use crate::common::server_fixture::ServerFixture;
|
||||
|
||||
use super::scenario::{create_quickly_persisting_database, rand_name};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_persistence() {
|
||||
let fixture = ServerFixture::create_shared().await;
|
||||
let mut write_client = fixture.write_client();
|
||||
let mut management_client = fixture.management_client();
|
||||
|
||||
let db_name = rand_name();
|
||||
create_quickly_persisting_database(&db_name, fixture.grpc_channel()).await;
|
||||
|
||||
// Stream in a write that should exceed the limit
|
||||
let lp_lines: Vec<_> = (0..1_000)
|
||||
.map(|i| format!("data,tag1=val{} x={} {}", i, i * 10, i))
|
||||
.collect();
|
||||
|
||||
let num_lines_written = write_client
|
||||
.write(&db_name, lp_lines.join("\n"))
|
||||
.await
|
||||
.expect("successful write");
|
||||
assert_eq!(num_lines_written, 1000);
|
||||
|
||||
// wait for the chunk to be written to object store
|
||||
let deadline = Instant::now() + Duration::from_secs(5);
|
||||
let mut chunks = vec![];
|
||||
loop {
|
||||
assert!(
|
||||
Instant::now() < deadline,
|
||||
"Chunk did not persist in time. Chunks were: {:#?}",
|
||||
chunks
|
||||
);
|
||||
|
||||
chunks = management_client
|
||||
.list_chunks(&db_name)
|
||||
.await
|
||||
.expect("listing chunks");
|
||||
|
||||
let storage_string = chunks
|
||||
.iter()
|
||||
.map(|c| {
|
||||
let c: ChunkSummary = c.clone().try_into().unwrap();
|
||||
format!("{:?}", c.storage)
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
.join(",");
|
||||
|
||||
// Found a persisted chunk, all good
|
||||
if storage_string == "ReadBufferAndObjectStore" {
|
||||
return;
|
||||
}
|
||||
|
||||
// keep looking
|
||||
println!("Current chunk storage: {:#?}", storage_string);
|
||||
tokio::time::sleep(Duration::from_millis(200)).await
|
||||
}
|
||||
}
|
|
@ -316,6 +316,43 @@ pub async fn create_readable_database(
|
|||
.expect("create database failed");
|
||||
}
|
||||
|
||||
/// given a channel to talk with the management api, create a new
|
||||
/// database with the specified name that will aggressively try and
|
||||
/// persist all data quickly
|
||||
pub async fn create_quickly_persisting_database(
|
||||
db_name: impl Into<String>,
|
||||
channel: tonic::transport::Channel,
|
||||
) {
|
||||
let db_name = db_name.into();
|
||||
|
||||
let mut management_client = influxdb_iox_client::management::Client::new(channel);
|
||||
let rules = DatabaseRules {
|
||||
name: db_name.clone(),
|
||||
partition_template: Some(PartitionTemplate {
|
||||
parts: vec![partition_template::Part {
|
||||
part: Some(partition_template::part::Part::Time(
|
||||
"%Y-%m-%d %H:00:00".into(),
|
||||
)),
|
||||
}],
|
||||
}),
|
||||
lifecycle_rules: Some(LifecycleRules {
|
||||
mutable_linger_seconds: 1,
|
||||
mutable_size_threshold: 100,
|
||||
buffer_size_soft: 1024 * 1024,
|
||||
buffer_size_hard: 1024 * 1024,
|
||||
persist: true,
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
management_client
|
||||
.create_database(rules.clone())
|
||||
.await
|
||||
.expect("create database failed");
|
||||
println!("Created quickly persisting database {}", db_name);
|
||||
}
|
||||
|
||||
/// given a channel to talk with the managment api, create a new
|
||||
/// database with no mutable buffer configured, no partitioning rules
|
||||
pub async fn create_unreadable_database(
|
||||
|
|
Loading…
Reference in New Issue