Merge branch 'main' into dom/router2-metrics

pull/24376/head
kodiakhq[bot] 2022-02-17 15:16:39 +00:00 committed by GitHub
commit c89fa3701e
8 changed files with 64 additions and 22 deletions

Cargo.lock (generated)
View File

@ -4166,8 +4166,8 @@ dependencies = [
[[package]]
name = "rskafka"
-version = "0.1.0"
-source = "git+https://github.com/influxdata/rskafka.git?rev=7c74d43f50c517c9cbdb912dc5169066330f2528#7c74d43f50c517c9cbdb912dc5169066330f2528"
+version = "0.2.0"
+source = "git+https://github.com/influxdata/rskafka.git?rev=3af1939fd47f8680d40074dc3fd2e2a4a0da6b8c#3af1939fd47f8680d40074dc3fd2e2a4a0da6b8c"
dependencies = [
"async-trait",
"bytes",

View File

@ -21,6 +21,12 @@ message IngesterQueryRequest {
// Optionally only return rows with a sequence number greater than this
optional uint64 greater_than_sequence_number = 6;
+// Namespace to search
+string namespace = 7;
+// Sequencer to search
+int32 sequencer_id = 8;
}
// Serialization of `predicate::predicate::Predicate` that contains DataFusion `Expr`s
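
Note: with the two new fields above, an ingester query is scoped to an explicit namespace and sequencer rather than placeholders. A minimal sketch (not part of this diff, with a hypothetical helper name and illustrative values) of populating the prost-generated request:

use generated_types::influxdata::iox::ingester::v1 as proto;

// Illustrative values only; all other fields keep their prost defaults.
fn example_request() -> proto::IngesterQueryRequest {
    proto::IngesterQueryRequest {
        namespace: "mydb".into(),
        sequencer_id: 5,
        ..Default::default()
    }
}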

View File

@ -14,7 +14,12 @@ use dotenv::dotenv;
use influxdb_iox_client::connection::Builder;
use observability_deps::tracing::warn;
use once_cell::sync::Lazy;
-use std::str::FromStr;
+use std::{
+collections::hash_map::DefaultHasher,
+hash::{Hash, Hasher},
+str::FromStr,
+};
use time::{SystemProvider, TimeProvider};
use tokio::runtime::Runtime;
mod commands {
@ -135,6 +140,11 @@ struct Config {
#[clap(long, global = true)]
header: Vec<KeyValue<http::header::HeaderName, http::HeaderValue>>,
+#[clap(long, global = true)]
+/// Automatically generate an uber-trace-id header. The generated trace ID
+/// will be emitted at the beginning of the response.
+gen_trace_id: bool,
#[clap(long)]
/// Set the maximum number of threads to use. Defaults to the number of
/// cores on the system
@ -190,10 +200,23 @@ fn main() -> Result<(), std::io::Error> {
let log_verbose_count = config.log_verbose_count;
let connection = || async move {
-let builder = headers.into_iter().fold(Builder::default(), |builder, kv| {
+let mut builder = headers.into_iter().fold(Builder::default(), |builder, kv| {
builder.header(kv.key, kv.value)
});
+if config.gen_trace_id {
+let key = http::header::HeaderName::from_str(
+trace_exporters::DEFAULT_JAEGER_TRACE_CONTEXT_HEADER_NAME,
+)
+.unwrap();
+let trace_id = gen_trace_id();
+let value = http::header::HeaderValue::from_str(trace_id.as_str()).unwrap();
+builder = builder.header(key, value);
+// Emit trace id information
+println!("Trace ID set to {}", trace_id);
+}
match builder.build(&host).await {
Ok(connection) => connection,
Err(e) => {
@ -290,6 +313,15 @@ fn main() -> Result<(), std::io::Error> {
Ok(())
}
+// Generates a compatible header value for a Jaeger trace context header.
+fn gen_trace_id() -> String {
+let now = SystemProvider::new().now();
+let mut hasher = DefaultHasher::new();
+now.timestamp_nanos().hash(&mut hasher);
+format!("{:x}:1112223334445:0:1", hasher.finish())
+}
/// Creates the tokio runtime for executing IOx
///
/// if nthreads is none, uses the default scheduler

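Note: the value produced by gen_trace_id above follows the Jaeger uber-trace-id layout {trace-id}:{span-id}:{parent-span-id}:{flags}, where a flags value of 1 marks the trace as sampled. A minimal sanity-check sketch (not part of this diff, assuming gen_trace_id is in scope):

#[test]
fn generated_trace_id_matches_uber_trace_id_layout() {
    let value = gen_trace_id();
    let parts: Vec<&str> = value.split(':').collect();
    assert_eq!(parts.len(), 4);
    // The trace id is the hex-encoded hash of the current timestamp.
    assert!(u64::from_str_radix(parts[0], 16).is_ok());
    // Parent span id and sampled flag are fixed by the format string above.
    assert_eq!(parts[2], "0");
    assert_eq!(parts[3], "1");
}
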
View File

@ -8,7 +8,10 @@ use async_trait::async_trait;
use chrono::{format::StrftimeItems, TimeZone, Utc};
use data_types::delete_predicate::DeletePredicate;
use dml::DmlOperation;
-use generated_types::{google::FieldViolation, influxdata::iox::ingester::v1 as proto};
+use generated_types::{
+google::{FieldViolation, FieldViolationExt},
+influxdata::iox::ingester::v1 as proto,
+};
use iox_catalog::interface::{
Catalog, KafkaPartition, NamespaceId, PartitionId, PartitionInfo, SequenceNumber, SequencerId,
TableId, Timestamp, Tombstone,
@ -949,6 +952,8 @@ impl TryFrom<proto::IngesterQueryRequest> for IngesterQueryRequest {
fn try_from(proto: proto::IngesterQueryRequest) -> Result<Self, Self::Error> {
let proto::IngesterQueryRequest {
+namespace,
+sequencer_id,
table,
columns,
min_time,
@ -958,10 +963,11 @@ impl TryFrom<proto::IngesterQueryRequest> for IngesterQueryRequest {
} = proto;
let predicate = predicate.map(TryInto::try_into).transpose()?;
+let sequencer_id: i16 = sequencer_id.try_into().scope("sequencer_id")?;
Ok(Self::new(
"todo_namespace".to_string(), // todo #3753
SequencerId::new(0), // todo #3573
namespace,
SequencerId::new(sequencer_id),
table,
columns,
min_time,
@ -1050,8 +1056,8 @@ mod tests {
};
let rust_query = IngesterQueryRequest::new(
"todo_namespace".to_string(),
SequencerId::new(0),
"mydb".into(),
SequencerId::new(5),
"cpu".into(),
vec!["usage".into(), "time".into()],
1,
@ -1061,6 +1067,8 @@ mod tests {
);
let proto_query = proto::IngesterQueryRequest {
+namespace: "mydb".into(),
+sequencer_id: 5,
table: "cpu".into(),
columns: vec!["usage".into(), "time".into()],
min_time: 1,

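Note: because the proto field is an i32 while SequencerId wraps an i16, the conversion above can now fail instead of silently falling back to SequencerId::new(0). A hedged sketch (not part of this diff) of the out-of-range case, assuming the test module's existing imports and the prost-generated Default impl:

#[test]
fn sequencer_id_out_of_i16_range_is_rejected() {
    // A sequencer_id that does not fit in an i16 should now produce a
    // FieldViolation scoped to "sequencer_id" instead of being ignored.
    let proto_query = proto::IngesterQueryRequest {
        sequencer_id: i32::from(i16::MAX) + 1,
        ..Default::default()
    };
    assert!(IngesterQueryRequest::try_from(proto_query).is_err());
}
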
View File

@ -275,7 +275,7 @@ impl Dictionary {
}
}
}
-_ => unreachable!(format!("operator {:?} not supported for row_ids_equal", op)),
+_ => unreachable!("operator {:?} not supported for row_ids_equal", op),
}
} else if let cmp::Operator::NotEqual = op {
// special case - the column does not contain the value in the
@ -410,7 +410,7 @@ impl Dictionary {
self.row_ids_encoded_cmp(id, &cmp::Operator::LT, dst)
}
_ => {
-unreachable!(format!("operator {:?} not supported for row_ids_cmp", op));
+unreachable!("operator {:?} not supported for row_ids_cmp", op);
}
},
}

View File

@ -35,6 +35,8 @@ mod thrift {
pub mod jaeger;
}
+pub const DEFAULT_JAEGER_TRACE_CONTEXT_HEADER_NAME: &str = "uber-trace-id";
/// CLI config for distributed tracing options
#[derive(Debug, Clone, clap::Parser)]
pub struct TracingConfig {
@ -88,7 +90,7 @@ pub struct TracingConfig {
#[clap(
long = "--traces-exporter-jaeger-trace-context-header-name",
env = "TRACES_EXPORTER_JAEGER_TRACE_CONTEXT_HEADER_NAME",
-default_value = "uber-trace-id"
+default_value = DEFAULT_JAEGER_TRACE_CONTEXT_HEADER_NAME
)]
pub traces_jaeger_trace_context_header_name: String,

View File

@ -21,7 +21,7 @@ observability_deps = { path = "../observability_deps" }
parking_lot = "0.12"
pin-project = "1.0"
prost = "0.9"
-rskafka = { git = "https://github.com/influxdata/rskafka.git", rev="7c74d43f50c517c9cbdb912dc5169066330f2528", default-features = false, features = ["compression-snappy"] }
+rskafka = { git = "https://github.com/influxdata/rskafka.git", rev="3af1939fd47f8680d40074dc3fd2e2a4a0da6b8c", default-features = false, features = ["compression-snappy"] }
schema = { path = "../schema" }
time = { path = "../time" }
tokio = { version = "1.13", features = ["fs", "macros", "parking_lot", "rt", "sync", "time"] }

View File

@ -12,7 +12,7 @@ use dml::{DmlMeta, DmlOperation};
use futures::{stream::BoxStream, StreamExt};
use parking_lot::Mutex;
use rskafka::client::{
-consumer::StreamConsumerBuilder,
+consumer::{StartOffset, StreamConsumerBuilder},
error::{Error as RSKafkaError, ProtocolError},
partition::{OffsetAt, PartitionClient},
producer::{BatchProducer, BatchProducerBuilder},
@ -145,14 +145,8 @@ impl WriteBufferStreamHandler for RSKafkaStreamHandler {
*next_offset.lock()
};
let start_offset = match start_offset {
-Some(x) => x,
-None => {
-// try to guess next offset from upstream
-self.partition_client
-.get_offset(OffsetAt::Earliest)
-.await
-.unwrap_or_default()
-}
+Some(x) => StartOffset::At(x),
+None => StartOffset::Earliest,
};
let mut stream_builder =