Merge pull request #17499 from influxdata/feature/move-prometheus-span
chore(tracing): Span with Prometheus Metrics moved to kit/tracingpull/17598/head
commit
47959b7436
|
@ -6,7 +6,9 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"runtime"
|
"runtime"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/uber/jaeger-client-go"
|
"github.com/uber/jaeger-client-go"
|
||||||
|
|
||||||
"github.com/influxdata/httprouter"
|
"github.com/influxdata/httprouter"
|
||||||
|
@ -75,6 +77,33 @@ func annotateSpan(span opentracing.Span, handlerName string, req *http.Request)
|
||||||
span.LogKV("path", req.URL.Path)
|
span.LogKV("path", req.URL.Path)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// span is a simple wrapper around opentracing.Span in order to
|
||||||
|
// get access to the duration of the span for metrics reporting.
|
||||||
|
type Span struct {
|
||||||
|
opentracing.Span
|
||||||
|
start time.Time
|
||||||
|
Duration time.Duration
|
||||||
|
hist prometheus.Observer
|
||||||
|
gauge prometheus.Gauge
|
||||||
|
}
|
||||||
|
|
||||||
|
func StartSpanFromContextWithPromMetrics(ctx context.Context, operationName string, hist prometheus.Observer, gauge prometheus.Gauge, opts ...opentracing.StartSpanOption) (*Span, context.Context) {
|
||||||
|
start := time.Now()
|
||||||
|
s, sctx := StartSpanFromContextWithOperationName(ctx, operationName, opentracing.StartTime(start))
|
||||||
|
gauge.Inc()
|
||||||
|
return &Span{s, start, 0, hist, gauge}, sctx
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Span) Finish() {
|
||||||
|
finish := time.Now()
|
||||||
|
s.Duration = finish.Sub(s.start)
|
||||||
|
s.Span.FinishWithOptions(opentracing.FinishOptions{
|
||||||
|
FinishTime: finish,
|
||||||
|
})
|
||||||
|
s.hist.Observe(s.Duration.Seconds())
|
||||||
|
s.gauge.Dec()
|
||||||
|
}
|
||||||
|
|
||||||
// StartSpanFromContext is an improved opentracing.StartSpanFromContext.
|
// StartSpanFromContext is an improved opentracing.StartSpanFromContext.
|
||||||
// Uses the calling function as the operation name, and logs the filename and line number.
|
// Uses the calling function as the operation name, and logs the filename and line number.
|
||||||
//
|
//
|
||||||
|
@ -100,7 +129,7 @@ func annotateSpan(span opentracing.Span, handlerName string, req *http.Request)
|
||||||
//
|
//
|
||||||
// // Sugar to create a child span
|
// // Sugar to create a child span
|
||||||
// span, ctx := opentracing.StartSpanFromContext(ctx, "operation name")
|
// span, ctx := opentracing.StartSpanFromContext(ctx, "operation name")
|
||||||
func StartSpanFromContext(ctx context.Context) (opentracing.Span, context.Context) {
|
func StartSpanFromContext(ctx context.Context, opts ...opentracing.StartSpanOption) (opentracing.Span, context.Context) {
|
||||||
if ctx == nil {
|
if ctx == nil {
|
||||||
panic("StartSpanFromContext called with nil context")
|
panic("StartSpanFromContext called with nil context")
|
||||||
}
|
}
|
||||||
|
@ -109,7 +138,7 @@ func StartSpanFromContext(ctx context.Context) (opentracing.Span, context.Contex
|
||||||
var pcs [1]uintptr
|
var pcs [1]uintptr
|
||||||
n := runtime.Callers(2, pcs[:])
|
n := runtime.Callers(2, pcs[:])
|
||||||
if n < 1 {
|
if n < 1 {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "unknown")
|
span, ctx := opentracing.StartSpanFromContext(ctx, "unknown", opts...)
|
||||||
span.LogFields(log.Error(errors.New("runtime.Callers failed")))
|
span.LogFields(log.Error(errors.New("runtime.Callers failed")))
|
||||||
return span, ctx
|
return span, ctx
|
||||||
}
|
}
|
||||||
|
@ -122,7 +151,8 @@ func StartSpanFromContext(ctx context.Context) (opentracing.Span, context.Contex
|
||||||
var span opentracing.Span
|
var span opentracing.Span
|
||||||
if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil {
|
if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil {
|
||||||
// Create a child span.
|
// Create a child span.
|
||||||
span = opentracing.StartSpan(name, opentracing.ChildOf(parentSpan.Context()))
|
opts = append(opts, opentracing.ChildOf(parentSpan.Context()))
|
||||||
|
span = opentracing.StartSpan(name, opts...)
|
||||||
} else {
|
} else {
|
||||||
// Create a root span.
|
// Create a root span.
|
||||||
span = opentracing.StartSpan(name)
|
span = opentracing.StartSpan(name)
|
||||||
|
@ -137,7 +167,7 @@ func StartSpanFromContext(ctx context.Context) (opentracing.Span, context.Contex
|
||||||
}
|
}
|
||||||
|
|
||||||
// StartSpanFromContextWithOperationName is like StartSpanFromContext, but the caller determines the operation name.
|
// StartSpanFromContextWithOperationName is like StartSpanFromContext, but the caller determines the operation name.
|
||||||
func StartSpanFromContextWithOperationName(ctx context.Context, operationName string) (opentracing.Span, context.Context) {
|
func StartSpanFromContextWithOperationName(ctx context.Context, operationName string, opts ...opentracing.StartSpanOption) (opentracing.Span, context.Context) {
|
||||||
if ctx == nil {
|
if ctx == nil {
|
||||||
panic("StartSpanFromContextWithOperationName called with nil context")
|
panic("StartSpanFromContextWithOperationName called with nil context")
|
||||||
}
|
}
|
||||||
|
@ -146,7 +176,7 @@ func StartSpanFromContextWithOperationName(ctx context.Context, operationName st
|
||||||
var pcs [1]uintptr
|
var pcs [1]uintptr
|
||||||
n := runtime.Callers(2, pcs[:])
|
n := runtime.Callers(2, pcs[:])
|
||||||
if n < 1 {
|
if n < 1 {
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, operationName)
|
span, ctx := opentracing.StartSpanFromContext(ctx, operationName, opts...)
|
||||||
span.LogFields(log.Error(errors.New("runtime.Callers failed")))
|
span.LogFields(log.Error(errors.New("runtime.Callers failed")))
|
||||||
return span, ctx
|
return span, ctx
|
||||||
}
|
}
|
||||||
|
@ -154,11 +184,12 @@ func StartSpanFromContextWithOperationName(ctx context.Context, operationName st
|
||||||
|
|
||||||
var span opentracing.Span
|
var span opentracing.Span
|
||||||
if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil {
|
if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil {
|
||||||
|
opts = append(opts, opentracing.ChildOf(parentSpan.Context()))
|
||||||
// Create a child span.
|
// Create a child span.
|
||||||
span = opentracing.StartSpan(operationName, opentracing.ChildOf(parentSpan.Context()))
|
span = opentracing.StartSpan(operationName, opts...)
|
||||||
} else {
|
} else {
|
||||||
// Create a root span.
|
// Create a root span.
|
||||||
span = opentracing.StartSpan(operationName)
|
span = opentracing.StartSpan(operationName, opts...)
|
||||||
}
|
}
|
||||||
// New context references this span, not the parent (if there was one).
|
// New context references this span, not the parent (if there was one).
|
||||||
ctx = opentracing.ContextWithSpan(ctx, span)
|
ctx = opentracing.ContextWithSpan(ctx, span)
|
||||||
|
|
|
@ -23,7 +23,6 @@ import (
|
||||||
"runtime/debug"
|
"runtime/debug"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/influxdata/flux"
|
"github.com/influxdata/flux"
|
||||||
"github.com/influxdata/flux/codes"
|
"github.com/influxdata/flux/codes"
|
||||||
|
@ -35,7 +34,6 @@ import (
|
||||||
"github.com/influxdata/influxdb/kit/tracing"
|
"github.com/influxdata/influxdb/kit/tracing"
|
||||||
influxlogger "github.com/influxdata/influxdb/logger"
|
influxlogger "github.com/influxdata/influxdb/logger"
|
||||||
"github.com/influxdata/influxdb/query"
|
"github.com/influxdata/influxdb/query"
|
||||||
"github.com/opentracing/opentracing-go"
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"go.uber.org/zap/zapcore"
|
"go.uber.org/zap/zapcore"
|
||||||
|
@ -265,7 +263,7 @@ func (c *Controller) createQuery(ctx context.Context, ct flux.CompilerType) (*Qu
|
||||||
compileLabelValues[len(compileLabelValues)-1] = string(ct)
|
compileLabelValues[len(compileLabelValues)-1] = string(ct)
|
||||||
|
|
||||||
cctx, cancel := context.WithCancel(ctx)
|
cctx, cancel := context.WithCancel(ctx)
|
||||||
parentSpan, parentCtx := StartSpanFromContext(
|
parentSpan, parentCtx := tracing.StartSpanFromContextWithPromMetrics(
|
||||||
cctx,
|
cctx,
|
||||||
"all",
|
"all",
|
||||||
c.metrics.allDur.WithLabelValues(labelValues...),
|
c.metrics.allDur.WithLabelValues(labelValues...),
|
||||||
|
@ -535,7 +533,7 @@ type Query struct {
|
||||||
cancel func()
|
cancel func()
|
||||||
|
|
||||||
parentCtx context.Context
|
parentCtx context.Context
|
||||||
parentSpan, currentSpan *span
|
parentSpan, currentSpan *tracing.Span
|
||||||
stats flux.Statistics
|
stats flux.Statistics
|
||||||
|
|
||||||
done sync.Once
|
done sync.Once
|
||||||
|
@ -762,7 +760,7 @@ TRANSITION:
|
||||||
return q.parentCtx, true
|
return q.parentCtx, true
|
||||||
}
|
}
|
||||||
var currentCtx context.Context
|
var currentCtx context.Context
|
||||||
q.currentSpan, currentCtx = StartSpanFromContext(
|
q.currentSpan, currentCtx = tracing.StartSpanFromContextWithPromMetrics(
|
||||||
q.parentCtx,
|
q.parentCtx,
|
||||||
newState.String(),
|
newState.String(),
|
||||||
dur.WithLabelValues(labelValues...),
|
dur.WithLabelValues(labelValues...),
|
||||||
|
@ -977,38 +975,6 @@ func isFinishedState(state State) bool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// span is a simple wrapper around opentracing.Span in order to
|
|
||||||
// get access to the duration of the span for metrics reporting.
|
|
||||||
type span struct {
|
|
||||||
s opentracing.Span
|
|
||||||
start time.Time
|
|
||||||
Duration time.Duration
|
|
||||||
hist prometheus.Observer
|
|
||||||
gauge prometheus.Gauge
|
|
||||||
}
|
|
||||||
|
|
||||||
func StartSpanFromContext(ctx context.Context, operationName string, hist prometheus.Observer, gauge prometheus.Gauge) (*span, context.Context) {
|
|
||||||
start := time.Now()
|
|
||||||
s, sctx := opentracing.StartSpanFromContext(ctx, operationName, opentracing.StartTime(start))
|
|
||||||
gauge.Inc()
|
|
||||||
return &span{
|
|
||||||
s: s,
|
|
||||||
start: start,
|
|
||||||
hist: hist,
|
|
||||||
gauge: gauge,
|
|
||||||
}, sctx
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *span) Finish() {
|
|
||||||
finish := time.Now()
|
|
||||||
s.Duration = finish.Sub(s.start)
|
|
||||||
s.s.FinishWithOptions(opentracing.FinishOptions{
|
|
||||||
FinishTime: finish,
|
|
||||||
})
|
|
||||||
s.hist.Observe(s.Duration.Seconds())
|
|
||||||
s.gauge.Dec()
|
|
||||||
}
|
|
||||||
|
|
||||||
// handleFluxError will take a flux.Error and convert it into an influxdb.Error.
|
// handleFluxError will take a flux.Error and convert it into an influxdb.Error.
|
||||||
// It will match certain codes to the equivalent in influxdb.
|
// It will match certain codes to the equivalent in influxdb.
|
||||||
//
|
//
|
||||||
|
|
Loading…
Reference in New Issue