Merge branch 'main' into crepererum/issue2292

pull/24376/head
kodiakhq[bot] 2021-08-17 11:29:24 +00:00 committed by GitHub
commit e6ae36722a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 96 additions and 85 deletions

View File

@ -286,11 +286,20 @@ jobs:
# Disabling for now, and tracked further investigations
# in https://github.com/influxdata/k8s-idpe/issues/3038
docker_layer_caching: false
version: 19.03.14
- run: |
sudo apt-get update
sudo apt-get install -y docker.io
- run: |
echo "$QUAY_PASS" | docker login quay.io --username $QUAY_USER --password-stdin
- run:
# Docker has functionality to support per-Dockerfile .dockerignore
# This was added in https://github.com/moby/buildkit/pull/901
# And included in 19.03 - https://docs.docker.com/engine/release-notes/19.03/#19030
# Unfortunately CircleCI only seems to understand a root-level .dockerignore
# So we need to move it into position for it to not send ~10GB of build context
name: Fudge CircleCI Docker Context
command: mv docker/Dockerfile.iox.dockerignore .dockerignore
- run: |
BRANCH=$(git rev-parse --abbrev-ref HEAD | tr '/' '.')
echo sha256sum after build is

View File

@ -555,11 +555,11 @@ mod tests {
)
.header(
HeaderName::from_static("x-b3-traceid"),
HeaderValue::from_static("999999999"),
HeaderValue::from_static("fea24902"),
)
.header(
HeaderName::from_static("x-b3-spanid"),
HeaderValue::from_static("111111"),
HeaderValue::from_static("ab3409"),
)
.build(format!("http://{}", addr))
.await
@ -573,7 +573,7 @@ mod tests {
let jaeger_tracing_client = influxdb_iox_client::connection::Builder::default()
.header(
HeaderName::from_static("uber-trace-id"),
HeaderValue::from_static("3459495:30434:0:1"),
HeaderValue::from_static("34f9495:30e34:0:1"),
)
.build(format!("http://{}", addr))
.await
@ -593,20 +593,20 @@ mod tests {
.collect();
assert_eq!(spans[0].name, "IOx");
assert_eq!(spans[0].ctx.parent_span_id.unwrap().0.get(), 111111);
assert_eq!(spans[0].ctx.trace_id.0.get(), 999999999);
assert_eq!(spans[0].ctx.parent_span_id.unwrap().0.get(), 0xab3409);
assert_eq!(spans[0].ctx.trace_id.0.get(), 0xfea24902);
assert!(spans[0].start.is_some());
assert!(spans[0].end.is_some());
assert_eq!(spans[1].name, "IOx");
assert_eq!(spans[1].ctx.parent_span_id.unwrap().0.get(), 111111);
assert_eq!(spans[1].ctx.trace_id.0.get(), 999999999);
assert_eq!(spans[1].ctx.parent_span_id.unwrap().0.get(), 0xab3409);
assert_eq!(spans[1].ctx.trace_id.0.get(), 0xfea24902);
assert!(spans[1].start.is_some());
assert!(spans[1].end.is_some());
assert_eq!(spans[2].name, "IOx");
assert_eq!(spans[2].ctx.parent_span_id.unwrap().0.get(), 30434);
assert_eq!(spans[2].ctx.trace_id.0.get(), 3459495);
assert_eq!(spans[2].ctx.parent_span_id.unwrap().0.get(), 0x30e34);
assert_eq!(spans[2].ctx.trace_id.0.get(), 0x34f9495);
assert!(spans[2].start.is_some());
assert!(spans[2].end.is_some());

View File

@ -9,15 +9,15 @@ description = "Distributed tracing support within IOx"
chrono = { version = "0.4", features = ["serde"] }
futures = "0.3"
http = "0.2.0"
http = "0.2"
http-body = "0.4"
itertools = "0.10"
observability_deps = { path = "../observability_deps" }
parking_lot = "0.11"
pin-project = "1.0"
rand = "0.8.3"
rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.44"
serde_json = "1.0"
snafu = "0.6"
tower = "0.4"

View File

@ -69,7 +69,7 @@ impl<'a> FromStr for TraceId {
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(Self(
NonZeroU128::new(s.parse()?).ok_or(DecodeError::ZeroError)?,
NonZeroU128::new(u128::from_str_radix(s, 16)?).ok_or(DecodeError::ZeroError)?,
))
}
}
@ -89,7 +89,7 @@ impl<'a> FromStr for SpanId {
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(Self(
NonZeroU64::new(s.parse()?).ok_or(DecodeError::ZeroError)?,
NonZeroU64::new(u64::from_str_radix(s, 16)?).ok_or(DecodeError::ZeroError)?,
))
}
}
@ -219,7 +219,7 @@ impl FromStr for JaegerCtx {
"0" => None,
_ => Some(parent_span_id.parse()?),
};
let flags = flags.parse()?;
let flags = u8::from_str_radix(flags, 16)?;
Ok(Self {
trace_id,
@ -308,7 +308,7 @@ mod tests {
.unwrap()
.is_none());
headers.insert(B3_TRACE_ID_HEADER, HeaderValue::from_static("99999999"));
headers.insert(B3_TRACE_ID_HEADER, HeaderValue::from_static("ee25f"));
headers.insert(B3_SAMPLED_HEADER, HeaderValue::from_static("0"));
// Not sampled
@ -326,14 +326,14 @@ mod tests {
"header 'X-B3-SpanId' not found"
);
headers.insert(B3_SPAN_ID_HEADER, HeaderValue::from_static("69559"));
headers.insert(B3_SPAN_ID_HEADER, HeaderValue::from_static("34e"));
let span = SpanContext::from_headers(&collector, &headers)
.unwrap()
.unwrap();
assert_eq!(span.span_id.0.get(), 69559);
assert_eq!(span.trace_id.0.get(), 99999999);
assert_eq!(span.span_id.0.get(), 0x34e);
assert_eq!(span.trace_id.0.get(), 0xee25f);
assert!(span.parent_span_id.is_none());
headers.insert(
@ -345,9 +345,9 @@ mod tests {
.unwrap()
.unwrap();
assert_eq!(span.span_id.0.get(), 69559);
assert_eq!(span.trace_id.0.get(), 99999999);
assert_eq!(span.parent_span_id.unwrap().0.get(), 4595945);
assert_eq!(span.span_id.0.get(), 0x34e);
assert_eq!(span.trace_id.0.get(), 0xee25f);
assert_eq!(span.parent_span_id.unwrap().0.get(), 0x4595945);
headers.insert(B3_SPAN_ID_HEADER, HeaderValue::from_static("not a number"));
@ -394,28 +394,28 @@ mod tests {
// Sampled
headers.insert(
JAEGER_TRACE_HEADER,
HeaderValue::from_static("343:4325345:0:1"),
HeaderValue::from_static("3a43:432e345:0:1"),
);
let span = SpanContext::from_headers(&collector, &headers)
.unwrap()
.unwrap();
assert_eq!(span.trace_id.0.get(), 343);
assert_eq!(span.span_id.0.get(), 4325345);
assert_eq!(span.trace_id.0.get(), 0x3a43);
assert_eq!(span.span_id.0.get(), 0x432e345);
assert!(span.parent_span_id.is_none());
// Parent span
headers.insert(
JAEGER_TRACE_HEADER,
HeaderValue::from_static("343:4325345:3434:1"),
HeaderValue::from_static("343:4325345:3434:F"),
);
let span = SpanContext::from_headers(&collector, &headers)
.unwrap()
.unwrap();
assert_eq!(span.trace_id.0.get(), 343);
assert_eq!(span.span_id.0.get(), 4325345);
assert_eq!(span.parent_span_id.unwrap().0.get(), 3434);
assert_eq!(span.trace_id.0.get(), 0x343);
assert_eq!(span.span_id.0.get(), 0x4325345);
assert_eq!(span.parent_span_id.unwrap().0.get(), 0x3434);
// Invalid trace id
headers.insert(

View File

@ -101,6 +101,7 @@ impl KafkaBufferProducer {
cfg.set("queue.buffering.max.kbytes", "31457280");
cfg.set("request.required.acks", "all"); // equivalent to acks=-1
cfg.set("compression.type", "snappy");
cfg.set("statistics.interval.ms", "15000");
let producer: FutureProducer = cfg.create()?;
@ -253,6 +254,7 @@ impl KafkaBufferConsumer {
cfg.set("bootstrap.servers", &conn);
cfg.set("session.timeout.ms", "6000");
cfg.set("enable.auto.commit", "false");
cfg.set("statistics.interval.ms", "15000");
// Create a unique group ID for this database's consumer as we don't want to create
// consumer groups.