feat(storage/reads): add scanned values and bytes metadata to the query (#12156)

This updates influxdb to use the new arbitrary metadata that can be
attached by a source and adds metadata entries for the cursor
statistics.
pull/12165/head
Jonathan A. Sternberg 2019-02-25 14:44:18 -06:00 committed by GitHub
parent 26e7f641b7
commit 70507670c3
10 changed files with 93 additions and 84 deletions
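
Before the file-by-file diff, a minimal standalone sketch of the shape of the change, using stand-in types rather than the real flux and influxdb packages: a storage source accumulates cursor statistics while it reads, and reports the totals as the two metadata entries, influxdb/scanned-bytes and influxdb/scanned-values, that the source.Metadata() hunk below constructs.

package main

import "fmt"

// metadata stands in for flux.Metadata, which the diff below builds as a
// map[string][]interface{}.
type metadata map[string][]interface{}

// cursorStats stands in for cursors.CursorStats.
type cursorStats struct {
	ScannedValues int
	ScannedBytes  int
}

type source struct {
	stats cursorStats
}

// Metadata mirrors the new (s *source) Metadata() in the diff: the accumulated
// cursor statistics become two metadata entries attached to the query result.
func (s *source) Metadata() metadata {
	return metadata{
		"influxdb/scanned-bytes":  []interface{}{s.stats.ScannedBytes},
		"influxdb/scanned-values": []interface{}{s.stats.ScannedValues},
	}
}

func main() {
	s := &source{stats: cursorStats{ScannedValues: 42, ScannedBytes: 1024}}
	fmt.Println(s.Metadata())
}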

go.mod

@@ -62,7 +62,7 @@ require (
github.com/hashicorp/vault v0.11.5
github.com/hashicorp/vault-plugin-secrets-kv v0.0.0-20181106190520-2236f141171e // indirect
github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d // indirect
github.com/influxdata/flux v0.20.1-0.20190220204516-5138168746ee
github.com/influxdata/flux v0.20.1-0.20190225195205-f2ede3cf3a5d
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6
github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368
github.com/jefferai/jsonx v0.0.0-20160721235117-9cc31c3135ee // indirect

go.sum

@@ -224,8 +224,8 @@ github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28=
github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/flux v0.20.1-0.20190220204516-5138168746ee h1:Mlg8OS1IHpAHOHlrsb0oi/Rn6yVkI68IrqCbkWbXWBQ=
github.com/influxdata/flux v0.20.1-0.20190220204516-5138168746ee/go.mod h1:0f5Yrm4VPSd/Ne6jIVOVtPo0MFe6jpLCr6vdaZYp7wY=
github.com/influxdata/flux v0.20.1-0.20190225195205-f2ede3cf3a5d h1:dfWOLAnFR1TTDsCsKXSfm6GVU1AQPAC2fBfjnteA29A=
github.com/influxdata/flux v0.20.1-0.20190225195205-f2ede3cf3a5d/go.mod h1:0f5Yrm4VPSd/Ne6jIVOVtPo0MFe6jpLCr6vdaZYp7wY=
github.com/influxdata/goreleaser v0.97.0-influx/go.mod h1:MnjA0e0Uq6ISqjG1WxxMAl+3VS1QYjILSWVnMYDxasE=
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6 h1:CFx+pP90q/qg3spoiZjf8donE4WpAdjeJfPOcoNqkWo=
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6/go.mod h1:KpVI7okXjK6PRi3Z5B+mtKZli+R1DnZgb3N+tzevNgo=


@@ -71,10 +71,7 @@ func (h *QueryHandler) handlePostQuery(w http.ResponseWriter, r *http.Request) {
defer results.Release()
// Setup headers
stats, hasStats := results.(flux.Statisticser)
if hasStats {
w.Header().Set("Trailer", queryStatisticsTrailer)
}
w.Header().Set("Trailer", queryStatisticsTrailer)
// NOTE: We do not write out the headers here.
// It is possible that if the encoding step fails
@@ -102,15 +99,13 @@ func (h *QueryHandler) handlePostQuery(w http.ResponseWriter, r *http.Request) {
}
}
if hasStats {
data, err := json.Marshal(stats.Statistics())
if err != nil {
h.Logger.Info("Failed to encode statistics", zap.Error(err))
return
}
// Write statistics trailer
w.Header().Set(queryStatisticsTrailer, string(data))
data, err := json.Marshal(results.Statistics())
if err != nil {
h.Logger.Info("Failed to encode statistics", zap.Error(err))
return
}
// Write statistics trailer
w.Header().Set(queryStatisticsTrailer, string(data))
}
// PrometheusCollectors satisfies the prom.PrometheusCollector interface.
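
For context only, a hedged client-side sketch of reading the Influx-Query-Statistics trailer that the handler now always announces. HTTP trailers are only populated once the response body has been fully consumed, which is why the handler declares the Trailer header before encoding and fills in the value afterwards. The endpoint URL and request payload below are placeholders, not the exact influxdb API.

package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

func main() {
	// Placeholder endpoint and body; substitute the real query endpoint.
	resp, err := http.Post("http://localhost:9999/api/v2/query", "application/json",
		strings.NewReader(`{"query": "from(bucket: \"example\") |> range(start: -1m)"}`))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Trailers become visible only after the body is read to EOF.
	if _, err := io.Copy(io.Discard, resp.Body); err != nil {
		panic(err)
	}
	fmt.Println("statistics trailer:", resp.Trailer.Get("Influx-Query-Statistics"))
}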


@@ -36,11 +36,8 @@ func (b ProxyQueryServiceBridge) Query(ctx context.Context, w io.Writer, req *Pr
defer results.Release()
// Setup headers
stats, hasStats := results.(flux.Statisticser)
if hasStats {
if w, ok := w.(http.ResponseWriter); ok {
w.Header().Set("Trailer", "Influx-Query-Statistics")
}
if w, ok := w.(http.ResponseWriter); ok {
w.Header().Set("Trailer", "Influx-Query-Statistics")
}
encoder := req.Dialect.Encoder()
@@ -50,10 +47,8 @@ func (b ProxyQueryServiceBridge) Query(ctx context.Context, w io.Writer, req *Pr
}
if w, ok := w.(http.ResponseWriter); ok {
if hasStats {
data, _ := json.Marshal(stats.Statistics())
w.Header().Set("Influx-Query-Statistics", string(data))
}
data, _ := json.Marshal(results.Statistics())
w.Header().Set("Influx-Query-Statistics", string(data))
}
return n, nil


@@ -42,12 +42,10 @@ func (s *LoggingServiceBridge) Query(ctx context.Context, w io.Writer, req *Prox
}
// Check if this result iterator reports stats. We call this defer before cancel because
// the query needs to be finished before it will have valid statistics.
if s, ok := results.(flux.Statisticser); ok {
defer func() {
stats = s.Statistics()
}()
}
defer results.Release()
defer func() {
results.Release()
stats = results.Statistics()
}()
encoder := req.Dialect.Encoder()
n, err = encoder.Encode(w, results)
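
The old code relied on defer's last-in, first-out ordering: the statistics capture was registered before defer results.Release(), so it still ran after Release, once the statistics were valid. The new code makes that ordering explicit inside a single deferred function. A standalone sketch of the behaviour this depends on, not influxdb code:

package main

import "fmt"

type results struct{ released bool }

func (r *results) Release() { r.released = true }

func (r *results) Statistics() string {
	if r.released {
		return "valid statistics"
	}
	return "statistics not final yet"
}

func query() (stats string) {
	r := &results{}

	// Old pattern: two defers. This one is registered first, so it runs last,
	// i.e. after Release below.
	defer func() { stats = r.Statistics() }()
	defer r.Release()

	// The new pattern from the diff collapses both into one defer:
	//   defer func() { r.Release(); stats = r.Statistics() }()
	return
}

func main() {
	fmt.Println(query()) // prints "valid statistics"
}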


@@ -10,6 +10,7 @@ import (
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/semantic"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/tsdb/cursors"
"github.com/pkg/errors"
)
@@ -76,7 +77,7 @@ type source struct {
currentTime execute.Time
overflow bool
stats flux.Statistics
stats cursors.CursorStats
}
func NewSource(id execute.DatasetID, r Reader, readSpec ReadSpec, bounds execute.Bounds, w execute.Window, currentTime execute.Time) execute.Source {
@@ -101,6 +102,13 @@ func (s *source) Run(ctx context.Context) {
}
}
func (s *source) Metadata() flux.Metadata {
return flux.Metadata{
"influxdb/scanned-bytes": []interface{}{s.stats.ScannedBytes},
"influxdb/scanned-values": []interface{}{s.stats.ScannedValues},
}
}
func (s *source) run(ctx context.Context) error {
//TODO(nathanielc): Pass through context to actual network I/O.
for tables, mark, ok := s.next(ctx); ok; tables, mark, ok = s.next(ctx) {
@@ -120,7 +128,12 @@ func (s *source) run(ctx context.Context) error {
if err != nil {
return err
}
s.stats = s.stats.Add(tables.Statistics())
// Track the number of bytes and values scanned.
stats := tables.Statistics()
s.stats.ScannedValues += stats.ScannedValues
s.stats.ScannedBytes += stats.ScannedBytes
for _, t := range s.ts {
if err := t.UpdateWatermark(s.id, mark); err != nil {
return err
@@ -130,7 +143,7 @@ func (s *source) run(ctx context.Context) error {
return nil
}
func (s *source) next(ctx context.Context) (flux.TableIterator, execute.Time, bool) {
func (s *source) next(ctx context.Context) (TableIterator, execute.Time, bool) {
if s.overflow {
return nil, 0, false
}
@@ -163,10 +176,6 @@ func (s *source) next(ctx context.Context) (flux.TableIterator, execute.Time, bo
return bi, stop, true
}
func (s *source) Statistics() flux.Statistics {
return s.stats
}
type GroupMode int
const (
@@ -228,6 +237,12 @@ type ReadSpec struct {
}
type Reader interface {
Read(ctx context.Context, rs ReadSpec, start, stop execute.Time) (flux.TableIterator, error)
Read(ctx context.Context, rs ReadSpec, start, stop execute.Time) (TableIterator, error)
Close()
}
// TableIterator is a table iterator that also keeps track of cursor statistics from the storage engine.
type TableIterator interface {
flux.TableIterator
Statistics() cursors.CursorStats
}
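
A hedged sketch of the new TableIterator contract using invented stand-in types (table, tableIterator and CursorStats mirror the pattern in the diff, not the real flux or influxdb types): each consumed table contributes its cursor statistics to the iterator's running totals, which the source then surfaces through Metadata().

package main

import "fmt"

// CursorStats mirrors cursors.CursorStats from the diff.
type CursorStats struct {
	ScannedValues int
	ScannedBytes  int
}

// table is a placeholder for flux.Table plus the Statistics method the
// storage tables gain in this change.
type table struct {
	stats CursorStats
}

func (t table) Statistics() CursorStats { return t.stats }

// tableIterator follows the accumulation pattern in the diff: fold each
// table's statistics into a running total as the tables are consumed.
type tableIterator struct {
	tables []table
	stats  CursorStats
}

func (bi *tableIterator) Statistics() CursorStats { return bi.stats }

func (bi *tableIterator) Do(f func(table) error) error {
	for _, t := range bi.tables {
		if err := f(t); err != nil {
			return err
		}
		stats := t.Statistics()
		bi.stats.ScannedValues += stats.ScannedValues
		bi.stats.ScannedBytes += stats.ScannedBytes
	}
	return nil
}

func main() {
	bi := &tableIterator{tables: []table{
		{stats: CursorStats{ScannedValues: 10, ScannedBytes: 80}},
		{stats: CursorStats{ScannedValues: 5, ScannedBytes: 40}},
	}}
	_ = bi.Do(func(table) error { return nil })
	fmt.Printf("%+v\n", bi.Statistics()) // {ScannedValues:15 ScannedBytes:120}
}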


@@ -20,6 +20,7 @@ type storageTable interface {
flux.Table
Close()
Cancel()
Statistics() cursors.CursorStats
}
type storeReader struct {
@@ -30,7 +31,7 @@ func NewReader(s Store) influxdb.Reader {
return &storeReader{s: s}
}
func (r *storeReader) Read(ctx context.Context, rs influxdb.ReadSpec, start, stop execute.Time) (flux.TableIterator, error) {
func (r *storeReader) Read(ctx context.Context, rs influxdb.ReadSpec, start, stop execute.Time) (influxdb.TableIterator, error) {
var predicate *datatypes.Predicate
if rs.Predicate != nil {
p, err := toStoragePredicate(rs.Predicate)
@@ -57,10 +58,10 @@ type tableIterator struct {
s Store
readSpec influxdb.ReadSpec
predicate *datatypes.Predicate
stats flux.Statistics
stats cursors.CursorStats
}
func (bi *tableIterator) Statistics() flux.Statistics { return bi.stats }
func (bi *tableIterator) Statistics() cursors.CursorStats { return bi.stats }
func (bi *tableIterator) Do(f func(flux.Table) error) error {
src, err := bi.s.GetSource(bi.readSpec)
@@ -192,7 +193,9 @@ READ:
}
table.Close()
bi.stats = bi.stats.Add(table.Statistics())
stats := table.Statistics()
bi.stats.ScannedValues += stats.ScannedValues
bi.stats.ScannedBytes += stats.ScannedBytes
table = nil
}
return rs.Err()
@@ -316,6 +319,9 @@ READ:
}
table.Close()
stats := table.Statistics()
bi.stats.ScannedValues += stats.ScannedValues
bi.stats.ScannedBytes += stats.ScannedBytes
table = nil
gc = rs.Next()


@@ -7,12 +7,12 @@
package reads
import (
"github.com/influxdata/flux/arrow"
"github.com/influxdata/flux/memory"
"sync"
"github.com/influxdata/flux"
"github.com/influxdata/flux/arrow"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/memory"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb/cursors"
"github.com/pkg/errors"
@@ -57,15 +57,15 @@ func (t *floatTable) Close() {
t.mu.Unlock()
}
func (t *floatTable) Statistics() flux.Statistics {
func (t *floatTable) Statistics() cursors.CursorStats {
t.mu.Lock()
defer t.mu.Unlock()
cur := t.cur
if cur == nil {
return flux.Statistics{}
return cursors.CursorStats{}
}
cs := cur.Stats()
return flux.Statistics{
return cursors.CursorStats{
ScannedValues: cs.ScannedValues,
ScannedBytes: cs.ScannedBytes,
}
@@ -233,12 +233,12 @@ func (t *floatGroupTable) advanceCursor() bool {
return false
}
func (t *floatGroupTable) Statistics() flux.Statistics {
func (t *floatGroupTable) Statistics() cursors.CursorStats {
if t.cur == nil {
return flux.Statistics{}
return cursors.CursorStats{}
}
cs := t.cur.Stats()
return flux.Statistics{
return cursors.CursorStats{
ScannedValues: cs.ScannedValues,
ScannedBytes: cs.ScannedBytes,
}
@@ -283,15 +283,15 @@ func (t *integerTable) Close() {
t.mu.Unlock()
}
func (t *integerTable) Statistics() flux.Statistics {
func (t *integerTable) Statistics() cursors.CursorStats {
t.mu.Lock()
defer t.mu.Unlock()
cur := t.cur
if cur == nil {
return flux.Statistics{}
return cursors.CursorStats{}
}
cs := cur.Stats()
return flux.Statistics{
return cursors.CursorStats{
ScannedValues: cs.ScannedValues,
ScannedBytes: cs.ScannedBytes,
}
@@ -459,12 +459,12 @@ func (t *integerGroupTable) advanceCursor() bool {
return false
}
func (t *integerGroupTable) Statistics() flux.Statistics {
func (t *integerGroupTable) Statistics() cursors.CursorStats {
if t.cur == nil {
return flux.Statistics{}
return cursors.CursorStats{}
}
cs := t.cur.Stats()
return flux.Statistics{
return cursors.CursorStats{
ScannedValues: cs.ScannedValues,
ScannedBytes: cs.ScannedBytes,
}
@@ -509,15 +509,15 @@ func (t *unsignedTable) Close() {
t.mu.Unlock()
}
func (t *unsignedTable) Statistics() flux.Statistics {
func (t *unsignedTable) Statistics() cursors.CursorStats {
t.mu.Lock()
defer t.mu.Unlock()
cur := t.cur
if cur == nil {
return flux.Statistics{}
return cursors.CursorStats{}
}
cs := cur.Stats()
return flux.Statistics{
return cursors.CursorStats{
ScannedValues: cs.ScannedValues,
ScannedBytes: cs.ScannedBytes,
}
@@ -685,12 +685,12 @@ func (t *unsignedGroupTable) advanceCursor() bool {
return false
}
func (t *unsignedGroupTable) Statistics() flux.Statistics {
func (t *unsignedGroupTable) Statistics() cursors.CursorStats {
if t.cur == nil {
return flux.Statistics{}
return cursors.CursorStats{}
}
cs := t.cur.Stats()
return flux.Statistics{
return cursors.CursorStats{
ScannedValues: cs.ScannedValues,
ScannedBytes: cs.ScannedBytes,
}
@@ -735,15 +735,15 @@ func (t *stringTable) Close() {
t.mu.Unlock()
}
func (t *stringTable) Statistics() flux.Statistics {
func (t *stringTable) Statistics() cursors.CursorStats {
t.mu.Lock()
defer t.mu.Unlock()
cur := t.cur
if cur == nil {
return flux.Statistics{}
return cursors.CursorStats{}
}
cs := cur.Stats()
return flux.Statistics{
return cursors.CursorStats{
ScannedValues: cs.ScannedValues,
ScannedBytes: cs.ScannedBytes,
}
@@ -911,12 +911,12 @@ func (t *stringGroupTable) advanceCursor() bool {
return false
}
func (t *stringGroupTable) Statistics() flux.Statistics {
func (t *stringGroupTable) Statistics() cursors.CursorStats {
if t.cur == nil {
return flux.Statistics{}
return cursors.CursorStats{}
}
cs := t.cur.Stats()
return flux.Statistics{
return cursors.CursorStats{
ScannedValues: cs.ScannedValues,
ScannedBytes: cs.ScannedBytes,
}
@@ -961,15 +961,15 @@ func (t *booleanTable) Close() {
t.mu.Unlock()
}
func (t *booleanTable) Statistics() flux.Statistics {
func (t *booleanTable) Statistics() cursors.CursorStats {
t.mu.Lock()
defer t.mu.Unlock()
cur := t.cur
if cur == nil {
return flux.Statistics{}
return cursors.CursorStats{}
}
cs := cur.Stats()
return flux.Statistics{
return cursors.CursorStats{
ScannedValues: cs.ScannedValues,
ScannedBytes: cs.ScannedBytes,
}
@@ -1137,12 +1137,12 @@ func (t *booleanGroupTable) advanceCursor() bool {
return false
}
func (t *booleanGroupTable) Statistics() flux.Statistics {
func (t *booleanGroupTable) Statistics() cursors.CursorStats {
if t.cur == nil {
return flux.Statistics{}
return cursors.CursorStats{}
}
cs := t.cur.Stats()
return flux.Statistics{
return cursors.CursorStats{
ScannedValues: cs.ScannedValues,
ScannedBytes: cs.ScannedBytes,
}


@@ -1,12 +1,12 @@
package reads
import (
"github.com/influxdata/flux/arrow"
"github.com/influxdata/flux/memory"
"sync"
"github.com/influxdata/flux"
"github.com/influxdata/flux/arrow"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/memory"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb/cursors"
"github.com/pkg/errors"
@@ -51,15 +51,15 @@ func (t *{{.name}}Table) Close() {
t.mu.Unlock()
}
func (t *{{.name}}Table) Statistics() flux.Statistics {
func (t *{{.name}}Table) Statistics() cursors.CursorStats {
t.mu.Lock()
defer t.mu.Unlock()
cur := t.cur
if cur == nil {
return flux.Statistics{}
return cursors.CursorStats{}
}
cs := cur.Stats()
return flux.Statistics{
return cursors.CursorStats{
ScannedValues: cs.ScannedValues,
ScannedBytes: cs.ScannedBytes,
}
@@ -227,12 +227,12 @@ func (t *{{.name}}GroupTable) advanceCursor() bool {
return false
}
func (t *{{.name}}GroupTable) Statistics() flux.Statistics {
func (t *{{.name}}GroupTable) Statistics() cursors.CursorStats {
if t.cur == nil {
return flux.Statistics{}
return cursors.CursorStats{}
}
cs := t.cur.Stats()
return flux.Statistics{
return cursors.CursorStats{
ScannedValues: cs.ScannedValues,
ScannedBytes: cs.ScannedBytes,
}
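
This .tmpl file is the code-generation template behind the typed tables in the generated file above; each {{.name}} placeholder is expanded once per supported value type. A hedged illustration of that expansion using only the standard text/template package; the template text and the loop here are illustrative, not the project's actual generator tooling.

package main

import (
	"os"
	"text/template"
)

// Illustrative template: the real .tmpl above contains the full table
// implementations, this only shows how {{.name}} is substituted.
const tableTmpl = `func (t *{{.name}}Table) Statistics() cursors.CursorStats {
	// ... generated per-type body ...
}

`

func main() {
	t := template.Must(template.New("table").Parse(tableTmpl))
	for _, typ := range []map[string]string{
		{"name": "float"}, {"name": "integer"}, {"name": "unsigned"},
		{"name": "string"}, {"name": "boolean"},
	} {
		if err := t.Execute(os.Stdout, typ); err != nil {
			panic(err)
		}
	}
}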


@@ -211,7 +211,7 @@ func newTableNoPoints(
func (t *tableNoPoints) Close() {}
func (t *tableNoPoints) Statistics() flux.Statistics { return flux.Statistics{} }
func (t *tableNoPoints) Statistics() cursors.CursorStats { return cursors.CursorStats{} }
func (t *tableNoPoints) Do(f func(flux.ColReader) error) error {
if t.isCancelled() {
@@ -251,7 +251,7 @@ func (t *groupTableNoPoints) Do(f func(flux.ColReader) error) error {
return t.err
}
func (t *groupTableNoPoints) Statistics() flux.Statistics { return flux.Statistics{} }
func (t *groupTableNoPoints) Statistics() cursors.CursorStats { return cursors.CursorStats{} }
func (t *floatTable) toArrowBuffer(vs []float64) *array.Float64 {
return arrow.NewFloat(vs, &memory.Allocator{})