Merge branch 'master' into amh-8789

pull/8845/head
Andrew Hare 2017-11-28 17:05:42 -07:00 committed by GitHub
commit 28ec02a7c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
39 changed files with 1549 additions and 327 deletions

View File

@ -6,13 +6,18 @@
### Features
- [#8495](https://github.com/influxdata/influxdb/pull/8495): Improve CLI connection warnings
- [#9084](https://github.com/influxdata/influxdb/pull/9084): Handle high cardinality deletes in TSM engine
- [#9162](https://github.com/influxdata/influxdb/pull/9162): Improve inmem index startup performance for high cardinality.
### Bugfixes
- [#9065](https://github.com/influxdata/influxdb/pull/9065): Refuse extra arguments to influx CLI
- [#9058](https://github.com/influxdata/influxdb/issues/9058): Fix space required after regex operator. Thanks @stop-start!
- [#8789](https://github.com/influxdata/influxdb/issues/8789): Fix CLI to allow quoted database names in use statement
- [#9109](https://github.com/influxdata/influxdb/issues/9109): Fix: panic: sync: WaitGroup is reused before previous Wait has returned
- [#9163](https://github.com/influxdata/influxdb/pull/9163): Fix race condition in the merge iterator close method.
- [#9144](https://github.com/influxdata/influxdb/issues/9144): Fix query compilation so multiple nested distinct calls are allowed
## v1.4.2 [2017-11-15]

2
Godeps
View File

@ -15,6 +15,7 @@ github.com/influxdata/usage-client 6d3895376368aa52a3a81d2a16e90f0f52371967
github.com/influxdata/yamux 1f58ded512de5feabbe30b60c7d33a7a896c5f16
github.com/influxdata/yarpc 036268cdec22b7074cd6d50cc6d7315c667063c7
github.com/jwilder/encoding 27894731927e49b0a9023f00312be26733744815
github.com/opentracing/opentracing-go 1361b9cd60be79c4c3a7fa9841b3c132e40066a7
github.com/paulbellamy/ratecounter 5a11f585a31379765c190c033b6ad39956584447
github.com/peterh/liner 88609521dc4b6c858fd4c98b628147da928ce4ac
github.com/philhofer/fwd 1612a298117663d7bc9a760ae20d383413859798
@ -26,5 +27,6 @@ go.uber.org/atomic 54f72d32435d760d5604f17a82e2435b28dc4ba5
go.uber.org/multierr fb7d312c2c04c34f0ad621048bbb953b168f9ff6
go.uber.org/zap 35aad584952c3e7020db7b839f6b102de6271f89
golang.org/x/crypto 9477e0b78b9ac3d0b03822fd95422e2fe07627cd
golang.org/x/net 9dfe39835686865bff950a07b394c12a98ddc811
golang.org/x/sys 062cd7e4e68206d8bab9b18396626e855c992658
golang.org/x/text a71fd10341b064c10f4a81ceac72bcf70f26ea34

View File

@ -187,7 +187,12 @@ func (c *CommandLine) Run() error {
c.Line.SetMultiLineMode(true)
fmt.Printf("Connected to %s version %s\n", c.Client.Addr(), c.ServerVersion)
if len(c.ServerVersion) == 0 {
fmt.Printf("WARN: Connected to %s, but found no server version.\n", c.Client.Addr())
fmt.Printf("Are you sure an InfluxDB server is listening at the given address?\n")
} else {
fmt.Printf("Connected to %s version %s\n", c.Client.Addr(), c.ServerVersion)
}
c.Version()

View File

@ -5,10 +5,10 @@ import (
"flag"
"fmt"
"os"
"strings"
"github.com/influxdata/influxdb/client"
"github.com/influxdata/influxdb/cmd/influx/cli"
"strings"
)
// These variables are populated via the Go linker.

20
pkg/file/file_unix.go Normal file
View File

@ -0,0 +1,20 @@
// +build !windows
package file
import "os"
func SyncDir(dirName string) error {
// fsync the dir to flush the rename
dir, err := os.OpenFile(dirName, os.O_RDONLY, os.ModeDir)
if err != nil {
return err
}
defer dir.Close()
return dir.Sync()
}
// RenameFile will rename the source to target using os function.
func RenameFile(oldpath, newpath string) error {
return os.Rename(oldpath, newpath)
}

18
pkg/file/file_windows.go Normal file
View File

@ -0,0 +1,18 @@
package file
import "os"
// SyncDir is a no-op in the Windows build and always reports success.
func SyncDir(dirName string) error {
	return nil
}
// RenameFile will rename the source to target using os function. If target exists it will be removed before renaming.
func RenameFile(oldpath, newpath string) error {
if _, err := os.Stat(newpath); err == nil {
if err = os.Remove(newpath); nil != err {
return err
}
}
return os.Rename(oldpath, newpath)
}

View File

@ -250,7 +250,7 @@ func (c *compiledField) compileExpr(expr influxql.Expr) error {
case "sample":
return c.compileSample(expr.Args)
case "distinct":
return c.compileDistinct(expr.Args)
return c.compileDistinct(expr.Args, false)
case "top", "bottom":
return c.compileTopBottom(expr)
case "derivative", "non_negative_derivative":
@ -276,7 +276,7 @@ func (c *compiledField) compileExpr(expr influxql.Expr) error {
case *influxql.Distinct:
call := expr.NewCall()
c.global.FunctionCalls = append(c.global.FunctionCalls, call)
return c.compileDistinct(call.Args)
return c.compileDistinct(call.Args, false)
case *influxql.BinaryExpr:
// Disallow wildcards in binary expressions. RewriteFields, which expands
// wildcards, is too complicated if we allow wildcards inside of expressions.
@ -349,10 +349,10 @@ func (c *compiledField) compileFunction(expr *influxql.Call) error {
if expr.Name == "count" {
// If we have count(), the argument may be a distinct() call.
if arg0, ok := expr.Args[0].(*influxql.Call); ok && arg0.Name == "distinct" {
return c.compileDistinct(arg0.Args)
return c.compileDistinct(arg0.Args, true)
} else if arg0, ok := expr.Args[0].(*influxql.Distinct); ok {
call := arg0.NewCall()
return c.compileDistinct(call.Args)
return c.compileDistinct(call.Args, true)
}
}
return c.compileSymbol(expr.Name, expr.Args[0])
@ -591,7 +591,7 @@ func (c *compiledField) compileHoltWinters(args []influxql.Expr, withFit bool) e
return c.compileExpr(call)
}
func (c *compiledField) compileDistinct(args []influxql.Expr) error {
func (c *compiledField) compileDistinct(args []influxql.Expr, nested bool) error {
if len(args) == 0 {
return errors.New("distinct function requires at least one argument")
} else if len(args) != 1 {
@ -601,7 +601,9 @@ func (c *compiledField) compileDistinct(args []influxql.Expr) error {
if _, ok := args[0].(*influxql.VarRef); !ok {
return errors.New("expected field argument in distinct()")
}
c.global.HasDistinct = true
if !nested {
c.global.HasDistinct = true
}
c.global.OnlySelectors = false
return nil
}

View File

@ -80,6 +80,7 @@ func TestCompile_Success(t *testing.T) {
`SELECT max(value) FROM (SELECT value + total FROM cpu) WHERE time >= now() - 1m GROUP BY time(10s)`,
`SELECT value FROM cpu WHERE time >= '2000-01-01T00:00:00Z' AND time <= '2000-01-01T01:00:00Z'`,
`SELECT value FROM (SELECT value FROM cpu) ORDER BY time DESC`,
`SELECT count(distinct(value)), max(value) FROM cpu`,
} {
t.Run(tt, func(t *testing.T) {
stmt, err := influxql.ParseStatement(tt)
@ -121,7 +122,6 @@ func TestCompile_Failures(t *testing.T) {
{s: `SELECT mean() FROM cpu`, err: `invalid number of arguments for mean, expected 1, got 0`},
{s: `SELECT mean(value, host) FROM cpu`, err: `invalid number of arguments for mean, expected 1, got 2`},
{s: `SELECT distinct(value), max(value) FROM cpu`, err: `aggregate function distinct() cannot be combined with other functions or fields`},
{s: `SELECT count(distinct(value)), max(value) FROM cpu`, err: `aggregate function distinct() cannot be combined with other functions or fields`},
{s: `SELECT count(distinct()) FROM cpu`, err: `distinct function requires at least one argument`},
{s: `SELECT count(distinct(value, host)) FROM cpu`, err: `distinct function can only have one argument`},
{s: `SELECT count(distinct(2)) FROM cpu`, err: `expected field argument in distinct()`},

View File

@ -112,6 +112,9 @@ type floatMergeIterator struct {
heap *floatMergeHeap
init bool
closed bool
mu sync.RWMutex
// Current iterator and window.
curr *floatMergeHeapItem
window struct {
@ -140,6 +143,7 @@ func newFloatMergeIterator(inputs []FloatIterator, opt IteratorOptions) *floatMe
// Append to the heap.
itr.heap.items = append(itr.heap.items, &floatMergeHeapItem{itr: bufInput})
}
return itr
}
@ -154,17 +158,27 @@ func (itr *floatMergeIterator) Stats() IteratorStats {
// Close closes the underlying iterators.
//
// The write lock is held for the full teardown so Close cannot run
// concurrently with Next, which reads the same fields under an RLock.
func (itr *floatMergeIterator) Close() error {
itr.mu.Lock()
defer itr.mu.Unlock()
// Close every input, then drop all references so the inputs and heap
// can be reclaimed even if the merge iterator itself is retained.
for _, input := range itr.inputs {
input.Close()
}
itr.curr = nil
itr.inputs = nil
itr.heap.items = nil
// Mark closed; Next checks this flag and returns nil once it is set.
itr.closed = true
return nil
}
// Next returns the next point from the iterator.
func (itr *floatMergeIterator) Next() (*FloatPoint, error) {
itr.mu.RLock()
defer itr.mu.RUnlock()
if itr.closed {
return nil, nil
}
// Initialize the heap. This needs to be done lazily on the first call to this iterator
// so that iterator initialization done through the Select() call returns quickly.
// Queries can only be interrupted after the Select() call completes so any operations
@ -3514,6 +3528,9 @@ type integerMergeIterator struct {
heap *integerMergeHeap
init bool
closed bool
mu sync.RWMutex
// Current iterator and window.
curr *integerMergeHeapItem
window struct {
@ -3557,17 +3574,27 @@ func (itr *integerMergeIterator) Stats() IteratorStats {
// Close closes the underlying iterators.
//
// The write lock is held for the full teardown so Close cannot run
// concurrently with Next, which reads the same fields under an RLock.
func (itr *integerMergeIterator) Close() error {
itr.mu.Lock()
defer itr.mu.Unlock()
// Close every input, then drop all references so the inputs and heap
// can be reclaimed even if the merge iterator itself is retained.
for _, input := range itr.inputs {
input.Close()
}
itr.curr = nil
itr.inputs = nil
itr.heap.items = nil
// Mark closed; Next checks this flag and returns nil once it is set.
itr.closed = true
return nil
}
// Next returns the next point from the iterator.
func (itr *integerMergeIterator) Next() (*IntegerPoint, error) {
itr.mu.RLock()
defer itr.mu.RUnlock()
if itr.closed {
return nil, nil
}
// Initialize the heap. This needs to be done lazily on the first call to this iterator
// so that iterator initialization done through the Select() call returns quickly.
// Queries can only be interrupted after the Select() call completes so any operations
@ -6914,6 +6941,9 @@ type unsignedMergeIterator struct {
heap *unsignedMergeHeap
init bool
closed bool
mu sync.RWMutex
// Current iterator and window.
curr *unsignedMergeHeapItem
window struct {
@ -6957,17 +6987,27 @@ func (itr *unsignedMergeIterator) Stats() IteratorStats {
// Close closes the underlying iterators.
//
// The write lock is held for the full teardown so Close cannot run
// concurrently with Next, which reads the same fields under an RLock.
func (itr *unsignedMergeIterator) Close() error {
itr.mu.Lock()
defer itr.mu.Unlock()
// Close every input, then drop all references so the inputs and heap
// can be reclaimed even if the merge iterator itself is retained.
for _, input := range itr.inputs {
input.Close()
}
itr.curr = nil
itr.inputs = nil
itr.heap.items = nil
// Mark closed; Next checks this flag and returns nil once it is set.
itr.closed = true
return nil
}
// Next returns the next point from the iterator.
func (itr *unsignedMergeIterator) Next() (*UnsignedPoint, error) {
itr.mu.RLock()
defer itr.mu.RUnlock()
if itr.closed {
return nil, nil
}
// Initialize the heap. This needs to be done lazily on the first call to this iterator
// so that iterator initialization done through the Select() call returns quickly.
// Queries can only be interrupted after the Select() call completes so any operations
@ -10314,6 +10354,9 @@ type stringMergeIterator struct {
heap *stringMergeHeap
init bool
closed bool
mu sync.RWMutex
// Current iterator and window.
curr *stringMergeHeapItem
window struct {
@ -10357,17 +10400,27 @@ func (itr *stringMergeIterator) Stats() IteratorStats {
// Close closes the underlying iterators.
//
// The write lock is held for the full teardown so Close cannot run
// concurrently with Next, which reads the same fields under an RLock.
func (itr *stringMergeIterator) Close() error {
itr.mu.Lock()
defer itr.mu.Unlock()
// Close every input, then drop all references so the inputs and heap
// can be reclaimed even if the merge iterator itself is retained.
for _, input := range itr.inputs {
input.Close()
}
itr.curr = nil
itr.inputs = nil
itr.heap.items = nil
// Mark closed; Next checks this flag and returns nil once it is set.
itr.closed = true
return nil
}
// Next returns the next point from the iterator.
func (itr *stringMergeIterator) Next() (*StringPoint, error) {
itr.mu.RLock()
defer itr.mu.RUnlock()
if itr.closed {
return nil, nil
}
// Initialize the heap. This needs to be done lazily on the first call to this iterator
// so that iterator initialization done through the Select() call returns quickly.
// Queries can only be interrupted after the Select() call completes so any operations
@ -13700,6 +13753,9 @@ type booleanMergeIterator struct {
heap *booleanMergeHeap
init bool
closed bool
mu sync.RWMutex
// Current iterator and window.
curr *booleanMergeHeapItem
window struct {
@ -13743,17 +13799,27 @@ func (itr *booleanMergeIterator) Stats() IteratorStats {
// Close closes the underlying iterators.
//
// The write lock is held for the full teardown so Close cannot run
// concurrently with Next, which reads the same fields under an RLock.
func (itr *booleanMergeIterator) Close() error {
itr.mu.Lock()
defer itr.mu.Unlock()
// Close every input, then drop all references so the inputs and heap
// can be reclaimed even if the merge iterator itself is retained.
for _, input := range itr.inputs {
input.Close()
}
itr.curr = nil
itr.inputs = nil
itr.heap.items = nil
// Mark closed; Next checks this flag and returns nil once it is set.
itr.closed = true
return nil
}
// Next returns the next point from the iterator.
func (itr *booleanMergeIterator) Next() (*BooleanPoint, error) {
itr.mu.RLock()
defer itr.mu.RUnlock()
if itr.closed {
return nil, nil
}
// Initialize the heap. This needs to be done lazily on the first call to this iterator
// so that iterator initialization done through the Select() call returns quickly.
// Queries can only be interrupted after the Select() call completes so any operations

View File

@ -110,6 +110,9 @@ type {{$k.name}}MergeIterator struct {
heap *{{$k.name}}MergeHeap
init bool
closed bool
mu sync.RWMutex
// Current iterator and window.
curr *{{$k.name}}MergeHeapItem
window struct {
@ -153,17 +156,27 @@ func (itr *{{$k.name}}MergeIterator) Stats() IteratorStats {
// Close closes the underlying iterators.
//
// The write lock is held for the full teardown so Close cannot run
// concurrently with Next, which reads the same fields under an RLock.
// This template generates the concrete per-type Close methods above.
func (itr *{{$k.name}}MergeIterator) Close() error {
itr.mu.Lock()
defer itr.mu.Unlock()
// Close every input, then drop all references so the inputs and heap
// can be reclaimed even if the merge iterator itself is retained.
for _, input := range itr.inputs {
input.Close()
}
itr.curr = nil
itr.inputs = nil
itr.heap.items = nil
// Mark closed; Next checks this flag and returns nil once it is set.
itr.closed = true
return nil
}
// Next returns the next point from the iterator.
func (itr *{{$k.name}}MergeIterator) Next() (*{{$k.Name}}Point, error) {
itr.mu.RLock()
defer itr.mu.RUnlock()
if itr.closed {
return nil, nil
}
// Initialize the heap. This needs to be done lazily on the first call to this iterator
// so that iterator initialization done through the Select() call returns quickly.
// Queries can only be interrupted after the Select() call completes so any operations

View File

@ -50,13 +50,13 @@ func (w *responseWriter) streamFloatPoints(cur tsdb.FloatBatchCursor) {
cur.Close()
seriesValueCount += b
w.vc += seriesValueCount
if seriesValueCount == 0 {
w.sz -= w.sf.Size()
// no points collected, strip series frame
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.flushFrames()
w.vc += seriesValueCount
}
}
@ -100,13 +100,13 @@ func (w *responseWriter) streamIntegerPoints(cur tsdb.IntegerBatchCursor) {
cur.Close()
seriesValueCount += b
w.vc += seriesValueCount
if seriesValueCount == 0 {
w.sz -= w.sf.Size()
// no points collected, strip series frame
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.flushFrames()
w.vc += seriesValueCount
}
}
@ -150,13 +150,13 @@ func (w *responseWriter) streamUnsignedPoints(cur tsdb.UnsignedBatchCursor) {
cur.Close()
seriesValueCount += b
w.vc += seriesValueCount
if seriesValueCount == 0 {
w.sz -= w.sf.Size()
// no points collected, strip series frame
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.flushFrames()
w.vc += seriesValueCount
}
}
@ -200,13 +200,13 @@ func (w *responseWriter) streamStringPoints(cur tsdb.StringBatchCursor) {
cur.Close()
seriesValueCount += b
w.vc += seriesValueCount
if seriesValueCount == 0 {
w.sz -= w.sf.Size()
// no points collected, strip series frame
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.flushFrames()
w.vc += seriesValueCount
}
}
@ -250,12 +250,12 @@ func (w *responseWriter) streamBooleanPoints(cur tsdb.BooleanBatchCursor) {
cur.Close()
seriesValueCount += b
w.vc += seriesValueCount
if seriesValueCount == 0 {
w.sz -= w.sf.Size()
// no points collected, strip series frame
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.flushFrames()
w.vc += seriesValueCount
}
}

View File

@ -47,13 +47,13 @@ func (w *responseWriter) stream{{.Name}}Points(cur tsdb.{{.Name}}BatchCursor) {
cur.Close()
seriesValueCount += b
w.vc += seriesValueCount
if seriesValueCount == 0 {
w.sz -= w.sf.Size()
// no points collected, strip series frame
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.flushFrames()
w.vc += seriesValueCount
}
}
{{end}}

View File

@ -8,7 +8,11 @@ import (
"strings"
"github.com/gogo/protobuf/types"
"github.com/influxdata/influxdb/pkg/metrics"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"go.uber.org/zap"
)
@ -16,6 +20,12 @@ import (
//go:generate tmpl -data=@batch_cursor.gen.go.tmpldata batch_cursor.gen.go.tmpl
//go:generate tmpl -data=@batch_cursor.gen.go.tmpldata response_writer.gen.go.tmpl
const (
batchSize = 1000
frameCount = 50
writeSize = 64 << 10 // 64k
)
type rpcService struct {
loggingEnabled bool
@ -31,26 +41,58 @@ func (r *rpcService) Hints(context.Context, *types.Empty) (*HintsResponse, error
return nil, errors.New("not implemented")
}
const (
batchSize = 1000
frameCount = 50
writeSize = 64 << 10 // 64k
)
func (r *rpcService) Read(req *ReadRequest, stream Storage_ReadServer) error {
// TODO(sgc): implement frameWriter that handles the details of streaming frames
var err error
var wire opentracing.SpanContext
if len(req.Trace) > 0 {
wire, err = opentracing.GlobalTracer().Extract(opentracing.TextMap, opentracing.TextMapCarrier(req.Trace))
if err != nil {
// TODO(sgc): log it?
}
}
span := opentracing.StartSpan("storage.read", ext.RPCServerOption(wire))
defer span.Finish()
ext.DBInstance.Set(span, req.Database)
// TODO(sgc): use yarpc stream.Context() once implemented
ctx := context.Background()
ctx = opentracing.ContextWithSpan(ctx, span)
// TODO(sgc): this should be available via a generic API, such as tsdb.Store
ctx = tsm1.NewContextWithMetricsGroup(ctx)
var agg Aggregate_AggregateType
if req.Aggregate != nil {
agg = req.Aggregate.Type
}
pred := truncateString(PredicateToExprString(req.Predicate))
groupKeys := truncateString(strings.Join(req.Grouping, ","))
span.
SetTag("predicate", pred).
SetTag("series_limit", req.SeriesLimit).
SetTag("series_offset", req.SeriesOffset).
SetTag("points_limit", req.PointsLimit).
SetTag("start", req.TimestampRange.Start).
SetTag("end", req.TimestampRange.End).
SetTag("desc", req.Descending).
SetTag("group_keys", groupKeys).
SetTag("aggregate", agg.String())
if r.loggingEnabled {
r.Logger.Info("request",
zap.String("database", req.Database),
zap.String("predicate", PredicateToExprString(req.Predicate)),
zap.String("predicate", pred),
zap.Uint64("series_limit", req.SeriesLimit),
zap.Uint64("series_offset", req.SeriesOffset),
zap.Uint64("points_limit", req.PointsLimit),
zap.Int64("start", req.TimestampRange.Start),
zap.Int64("end", req.TimestampRange.End),
zap.Bool("desc", req.Descending),
zap.String("grouping", strings.Join(req.Grouping, ",")),
zap.String("group_keys", groupKeys),
zap.String("aggregate", agg.String()),
)
}
@ -58,8 +100,7 @@ func (r *rpcService) Read(req *ReadRequest, stream Storage_ReadServer) error {
req.PointsLimit = math.MaxUint64
}
// TODO(sgc): use yarpc stream.Context() once implemented
rs, err := r.Store.Read(context.Background(), req)
rs, err := r.Store.Read(ctx, req)
if err != nil {
r.Logger.Error("Store.Read failed", zap.Error(err))
return err
@ -107,5 +148,14 @@ func (r *rpcService) Read(req *ReadRequest, stream Storage_ReadServer) error {
w.flushFrames()
span.SetTag("num_values", w.vc)
grp := tsm1.MetricsGroupFromContext(ctx)
grp.ForEach(func(v metrics.Metric) {
switch m := v.(type) {
case *metrics.Counter:
span.SetTag(m.Name(), m.Value())
}
})
return nil
}

View File

@ -10,6 +10,7 @@ import (
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxql"
"github.com/opentracing/opentracing-go"
)
var (
@ -53,6 +54,12 @@ type indexSeriesCursor struct {
}
func newIndexSeriesCursor(ctx context.Context, req *ReadRequest, shards []*tsdb.Shard) (*indexSeriesCursor, error) {
span := opentracing.SpanFromContext(ctx)
if span != nil {
span = opentracing.StartSpan("index_cursor.create", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
opt := query.IteratorOptions{
Aux: []influxql.VarRef{{Val: "key"}},
Authorizer: query.OpenAuthorizer,
@ -83,10 +90,10 @@ func newIndexSeriesCursor(ctx context.Context, req *ReadRequest, shards []*tsdb.
}
}
// TODO(sgc): tsdb.Store or tsdb.ShardGroup should provide an API to enumerate series efficiently
sg := tsdb.Shards(shards)
var itr query.Iterator
if itr, err = sg.CreateIterator(ctx, &influxql.Measurement{SystemIterator: "_series"}, opt); itr != nil && err == nil {
// TODO(sgc): need to rethink how we enumerate series across shards; dedupe is inefficient
itr = query.NewDedupeIterator(itr)
if p.sitr, err = toFloatIterator(itr); err != nil {
@ -217,13 +224,14 @@ func (c *limitSeriesCursor) Next() *seriesRow {
type groupSeriesCursor struct {
seriesCursor
ctx context.Context
rows []seriesRow
keys [][]byte
f bool
}
func newGroupSeriesCursor(ctx context.Context, cur seriesCursor, keys []string) *groupSeriesCursor {
g := &groupSeriesCursor{seriesCursor: cur}
g := &groupSeriesCursor{seriesCursor: cur, ctx: ctx}
g.keys = make([][]byte, 0, len(keys))
for _, k := range keys {
@ -248,6 +256,12 @@ func (c *groupSeriesCursor) Next() *seriesRow {
}
func (c *groupSeriesCursor) sort() {
span := opentracing.SpanFromContext(c.ctx)
if span != nil {
span = opentracing.StartSpan("group_series_cursor.sort", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
var rows []seriesRow
row := c.seriesCursor.Next()
for row != nil {
@ -269,6 +283,10 @@ func (c *groupSeriesCursor) sort() {
return false
})
if span != nil {
span.SetTag("rows", len(rows))
}
c.rows = rows
// free early

View File

@ -142,6 +142,8 @@ type ReadRequest struct {
// PointsLimit determines the maximum number of values per series to be returned for the request.
// Specify 0 for no limit.
PointsLimit uint64 `protobuf:"varint,8,opt,name=points_limit,json=pointsLimit,proto3" json:"points_limit,omitempty"`
// Trace contains opaque data if a trace is active.
Trace map[string]string `protobuf:"bytes,10,rep,name=trace" json:"trace,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
}
func (m *ReadRequest) Reset() { *m = ReadRequest{} }
@ -635,6 +637,23 @@ func (m *ReadRequest) MarshalTo(dAtA []byte) (int, error) {
}
i += n3
}
if len(m.Trace) > 0 {
for k, _ := range m.Trace {
dAtA[i] = 0x52
i++
v := m.Trace[k]
mapSize := 1 + len(k) + sovStorage(uint64(len(k))) + 1 + len(v) + sovStorage(uint64(len(v)))
i = encodeVarintStorage(dAtA, i, uint64(mapSize))
dAtA[i] = 0xa
i++
i = encodeVarintStorage(dAtA, i, uint64(len(k)))
i += copy(dAtA[i:], k)
dAtA[i] = 0x12
i++
i = encodeVarintStorage(dAtA, i, uint64(len(v)))
i += copy(dAtA[i:], v)
}
}
return i, nil
}
@ -1300,6 +1319,14 @@ func (m *ReadRequest) Size() (n int) {
l = m.Aggregate.Size()
n += 1 + l + sovStorage(uint64(l))
}
if len(m.Trace) > 0 {
for k, v := range m.Trace {
_ = k
_ = v
mapEntrySize := 1 + len(k) + sovStorage(uint64(len(k))) + 1 + len(v) + sovStorage(uint64(len(v)))
n += mapEntrySize + 1 + sovStorage(uint64(mapEntrySize))
}
}
return n
}
@ -1792,6 +1819,124 @@ func (m *ReadRequest) Unmarshal(dAtA []byte) error {
return err
}
iNdEx = postIndex
case 10:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Trace", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowStorage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthStorage
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Trace == nil {
m.Trace = make(map[string]string)
}
var mapkey string
var mapvalue string
for iNdEx < postIndex {
entryPreIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowStorage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
if fieldNum == 1 {
var stringLenmapkey uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowStorage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLenmapkey |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLenmapkey := int(stringLenmapkey)
if intStringLenmapkey < 0 {
return ErrInvalidLengthStorage
}
postStringIndexmapkey := iNdEx + intStringLenmapkey
if postStringIndexmapkey > l {
return io.ErrUnexpectedEOF
}
mapkey = string(dAtA[iNdEx:postStringIndexmapkey])
iNdEx = postStringIndexmapkey
} else if fieldNum == 2 {
var stringLenmapvalue uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowStorage
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLenmapvalue |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLenmapvalue := int(stringLenmapvalue)
if intStringLenmapvalue < 0 {
return ErrInvalidLengthStorage
}
postStringIndexmapvalue := iNdEx + intStringLenmapvalue
if postStringIndexmapvalue > l {
return io.ErrUnexpectedEOF
}
mapvalue = string(dAtA[iNdEx:postStringIndexmapvalue])
iNdEx = postStringIndexmapvalue
} else {
iNdEx = entryPreIndex
skippy, err := skipStorage(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthStorage
}
if (iNdEx + skippy) > postIndex {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
m.Trace[mapkey] = mapvalue
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipStorage(dAtA[iNdEx:])
@ -3646,78 +3791,81 @@ var (
func init() { proto.RegisterFile("storage.proto", fileDescriptorStorage) }
var fileDescriptorStorage = []byte{
// 1168 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x56, 0xcf, 0x8f, 0xda, 0xc6,
0x17, 0xb7, 0xb1, 0x61, 0xe1, 0xf1, 0xcb, 0x3b, 0xd9, 0xec, 0x17, 0x39, 0xdf, 0x80, 0xc3, 0x21,
0xa5, 0x87, 0x90, 0x88, 0xb6, 0x6a, 0xda, 0xa8, 0x87, 0x90, 0x90, 0x40, 0xb3, 0x81, 0xd5, 0xc0,
0x4a, 0x3d, 0x54, 0xda, 0x9a, 0x65, 0x70, 0xac, 0x82, 0xed, 0xda, 0xa6, 0x0a, 0xb7, 0x1e, 0x2b,
0xd4, 0x43, 0x0f, 0xbd, 0x72, 0xea, 0xdf, 0xd0, 0x5e, 0x7a, 0xcb, 0x69, 0x8f, 0x3d, 0xf6, 0xb4,
0x6a, 0xe9, 0x3f, 0x52, 0xcd, 0x8c, 0x6d, 0xec, 0x5d, 0x36, 0xd2, 0x5e, 0xac, 0x79, 0xbf, 0x3e,
0xef, 0xbd, 0x79, 0x3f, 0x3c, 0x50, 0xf4, 0x7c, 0xdb, 0xd5, 0x0d, 0xd2, 0x74, 0x5c, 0xdb, 0xb7,
0xd1, 0x5e, 0x40, 0xaa, 0x0f, 0x0c, 0xd3, 0x7f, 0xb3, 0x18, 0x37, 0xcf, 0xec, 0xf9, 0x43, 0xc3,
0x36, 0xec, 0x87, 0x4c, 0x3e, 0x5e, 0x4c, 0x19, 0xc5, 0x08, 0x76, 0xe2, 0x76, 0xea, 0x1d, 0xc3,
0xb6, 0x8d, 0x19, 0xd9, 0x6a, 0x91, 0xb9, 0xe3, 0x2f, 0x03, 0x61, 0x2b, 0x86, 0x65, 0x5a, 0xd3,
0xd9, 0xe2, 0xed, 0x44, 0xf7, 0xf5, 0x87, 0x4b, 0xdd, 0x75, 0xce, 0xf8, 0x97, 0xe3, 0xb1, 0x63,
0x60, 0x53, 0x76, 0x5c, 0x32, 0x31, 0xcf, 0x74, 0x3f, 0x88, 0xac, 0xfe, 0x4e, 0x82, 0x3c, 0x26,
0xfa, 0x04, 0x93, 0xef, 0x16, 0xc4, 0xf3, 0x91, 0x0a, 0x59, 0x8a, 0x32, 0xd6, 0x3d, 0x52, 0x11,
0x35, 0xb1, 0x91, 0xc3, 0x11, 0x8d, 0xbe, 0x82, 0xb2, 0x6f, 0xce, 0x89, 0xe7, 0xeb, 0x73, 0xe7,
0xd4, 0xd5, 0x2d, 0x83, 0x54, 0x52, 0x9a, 0xd8, 0xc8, 0xb7, 0xfe, 0xd7, 0x0c, 0xd3, 0x1d, 0x85,
0x72, 0x4c, 0xc5, 0xed, 0xc3, 0xf3, 0x8b, 0x9a, 0xb0, 0xb9, 0xa8, 0x95, 0x92, 0x7c, 0x5c, 0xf2,
0x13, 0x34, 0xaa, 0x02, 0x4c, 0x88, 0x77, 0x46, 0xac, 0x89, 0x69, 0x19, 0x15, 0x49, 0x13, 0x1b,
0x59, 0x1c, 0xe3, 0xd0, 0xa8, 0x0c, 0xd7, 0x5e, 0x38, 0x54, 0x2a, 0x6b, 0x12, 0x8d, 0x2a, 0xa4,
0xd1, 0x23, 0xc8, 0x45, 0x49, 0x55, 0xd2, 0x2c, 0x1e, 0x14, 0xc5, 0x73, 0x1c, 0x4a, 0xf0, 0x56,
0x09, 0xb5, 0xa0, 0xe0, 0x11, 0xd7, 0x24, 0xde, 0xe9, 0xcc, 0x9c, 0x9b, 0x7e, 0x25, 0xa3, 0x89,
0x0d, 0xb9, 0x5d, 0xde, 0x5c, 0xd4, 0xf2, 0x43, 0xc6, 0x3f, 0xa2, 0x6c, 0x9c, 0xf7, 0xb6, 0x04,
0xfa, 0x04, 0x8a, 0x81, 0x8d, 0x3d, 0x9d, 0x7a, 0xc4, 0xaf, 0xec, 0x31, 0x23, 0x65, 0x73, 0x51,
0x2b, 0x70, 0xa3, 0x01, 0xe3, 0xe3, 0x00, 0x9a, 0x53, 0xd4, 0x95, 0x63, 0x9b, 0x96, 0x1f, 0xba,
0xca, 0x6e, 0x5d, 0x1d, 0x33, 0x7e, 0xe0, 0xca, 0xd9, 0x12, 0x34, 0x21, 0xdd, 0x30, 0x5c, 0x62,
0xd0, 0x84, 0x72, 0x97, 0x12, 0x7a, 0x1a, 0x4a, 0xf0, 0x56, 0xa9, 0xfe, 0x87, 0x08, 0xb9, 0x48,
0x80, 0x3e, 0x06, 0xd9, 0x5f, 0x3a, 0xbc, 0x7c, 0xa5, 0x96, 0x76, 0xd5, 0x74, 0x7b, 0x1a, 0x2d,
0x1d, 0x82, 0x99, 0x76, 0xfd, 0x2d, 0x14, 0x13, 0x6c, 0x54, 0x03, 0xb9, 0x3f, 0xe8, 0x77, 0x14,
0x41, 0xbd, 0xbd, 0x5a, 0x6b, 0xfb, 0x09, 0x61, 0xdf, 0xb6, 0x08, 0xba, 0x0b, 0xd2, 0xf0, 0xe4,
0xb5, 0x22, 0xaa, 0x07, 0xab, 0xb5, 0xa6, 0x24, 0xe4, 0xc3, 0xc5, 0x1c, 0xdd, 0x83, 0xf4, 0xb3,
0xc1, 0x49, 0x7f, 0xa4, 0xa4, 0xd4, 0xc3, 0xd5, 0x5a, 0x43, 0x09, 0x85, 0x67, 0xf6, 0xc2, 0xf2,
0x55, 0xf9, 0xc7, 0x5f, 0xab, 0x42, 0xfd, 0x01, 0x48, 0x23, 0xdd, 0x40, 0x0a, 0x48, 0xdf, 0x92,
0x25, 0x8b, 0xba, 0x80, 0xe9, 0x11, 0x1d, 0x40, 0xfa, 0x7b, 0x7d, 0xb6, 0xe0, 0x5d, 0x56, 0xc0,
0x9c, 0xa8, 0xff, 0x92, 0x87, 0x02, 0xef, 0x58, 0xcf, 0xb1, 0x2d, 0x8f, 0xa0, 0xcf, 0x20, 0x33,
0x75, 0xf5, 0x39, 0xf1, 0x2a, 0xa2, 0x26, 0x35, 0xf2, 0xad, 0x3b, 0x51, 0xc6, 0x71, 0xb5, 0xe6,
0x0b, 0xaa, 0xd3, 0x96, 0x69, 0x47, 0xe2, 0xc0, 0x40, 0x7d, 0x27, 0x43, 0x9a, 0xf1, 0xd1, 0x13,
0xc8, 0xf0, 0xc2, 0xb1, 0x00, 0xf2, 0xad, 0x7b, 0xbb, 0x41, 0x78, 0xa9, 0x99, 0x49, 0x57, 0xc0,
0x81, 0x09, 0xfa, 0x1a, 0x0a, 0xd3, 0x99, 0xad, 0xfb, 0xa7, 0xbc, 0x8c, 0xc1, 0x54, 0xdc, 0xbf,
0x26, 0x0e, 0xaa, 0xc9, 0x8b, 0xcf, 0x43, 0x62, 0xdd, 0x10, 0xe3, 0x76, 0x05, 0x9c, 0x9f, 0x6e,
0x49, 0x34, 0x81, 0x92, 0x69, 0xf9, 0xc4, 0x20, 0x6e, 0x88, 0x2f, 0x31, 0xfc, 0xc6, 0x6e, 0xfc,
0x1e, 0xd7, 0x8d, 0x7b, 0xd8, 0xdf, 0x5c, 0xd4, 0x8a, 0x09, 0x7e, 0x57, 0xc0, 0x45, 0x33, 0xce,
0x40, 0x6f, 0xa0, 0xbc, 0xb0, 0x3c, 0xd3, 0xb0, 0xc8, 0x24, 0x74, 0x23, 0x33, 0x37, 0x1f, 0xee,
0x76, 0x73, 0x12, 0x28, 0xc7, 0xfd, 0x20, 0x3a, 0xea, 0x49, 0x41, 0x57, 0xc0, 0xa5, 0x45, 0x82,
0x43, 0xf3, 0x19, 0xdb, 0xf6, 0x8c, 0xe8, 0x56, 0xe8, 0x28, 0xfd, 0xbe, 0x7c, 0xda, 0x5c, 0xf7,
0x4a, 0x3e, 0x09, 0x3e, 0xcd, 0x67, 0x1c, 0x67, 0xa0, 0x6f, 0xe8, 0x0e, 0x76, 0x4d, 0xcb, 0x08,
0x9d, 0x64, 0x98, 0x93, 0x0f, 0xae, 0xa9, 0x2b, 0x53, 0x8d, 0xfb, 0xe0, 0x93, 0x1d, 0x63, 0x77,
0x05, 0x5c, 0xf0, 0x62, 0x74, 0x3b, 0x03, 0x32, 0x5d, 0x8d, 0xaa, 0x0b, 0xf9, 0x58, 0x5b, 0xa0,
0xfb, 0x20, 0xfb, 0xba, 0x11, 0x36, 0x63, 0x61, 0xbb, 0x1a, 0x75, 0x23, 0xe8, 0x3e, 0x26, 0x47,
0x4f, 0x20, 0x47, 0xcd, 0x4f, 0xd9, 0xac, 0xa6, 0xd8, 0xac, 0x56, 0x77, 0x07, 0xf7, 0x5c, 0xf7,
0x75, 0x36, 0xa9, 0x6c, 0x15, 0xd3, 0x93, 0xfa, 0x25, 0x28, 0x97, 0xfb, 0x88, 0x2e, 0xd1, 0x68,
0xad, 0x72, 0xf7, 0x0a, 0x8e, 0x71, 0xd0, 0x21, 0x64, 0xd8, 0x04, 0xd1, 0xfe, 0x94, 0x1a, 0x22,
0x0e, 0x28, 0xf5, 0x08, 0xd0, 0xd5, 0x9e, 0xb9, 0x21, 0x9a, 0x14, 0xa1, 0xbd, 0x86, 0x5b, 0x3b,
0x5a, 0xe3, 0x86, 0x70, 0x72, 0x3c, 0xb8, 0xab, 0x0d, 0x70, 0x43, 0xb4, 0x6c, 0x84, 0xf6, 0x0a,
0xf6, 0xaf, 0x54, 0xfa, 0x86, 0x60, 0xb9, 0x10, 0xac, 0x3e, 0x84, 0x1c, 0x03, 0x08, 0xb6, 0x65,
0x66, 0xd8, 0xc1, 0xbd, 0xce, 0x50, 0x11, 0xd4, 0x5b, 0xab, 0xb5, 0x56, 0x8e, 0x44, 0xbc, 0x37,
0xa8, 0xc2, 0xf1, 0xa0, 0xd7, 0x1f, 0x0d, 0x15, 0xf1, 0x92, 0x02, 0x8f, 0x25, 0x58, 0x86, 0xbf,
0x8b, 0x90, 0x0d, 0xeb, 0x8d, 0xfe, 0x0f, 0xe9, 0x17, 0x47, 0x83, 0xa7, 0x23, 0x45, 0x50, 0xf7,
0x57, 0x6b, 0xad, 0x18, 0x0a, 0x58, 0xe9, 0x91, 0x06, 0x7b, 0xbd, 0xfe, 0xa8, 0xf3, 0xb2, 0x83,
0x43, 0xc8, 0x50, 0x1e, 0x94, 0x13, 0xd5, 0x21, 0x7b, 0xd2, 0x1f, 0xf6, 0x5e, 0xf6, 0x3b, 0xcf,
0x95, 0x14, 0x5f, 0xd3, 0xa1, 0x4a, 0x58, 0x23, 0x8a, 0xd2, 0x1e, 0x0c, 0x8e, 0x3a, 0x4f, 0xfb,
0x8a, 0x94, 0x44, 0x09, 0xee, 0x1d, 0x55, 0x21, 0x33, 0x1c, 0xe1, 0x5e, 0xff, 0xa5, 0x22, 0xab,
0x68, 0xb5, 0xd6, 0x4a, 0xa1, 0x02, 0xbf, 0xca, 0x20, 0xf0, 0x9f, 0x44, 0x38, 0x78, 0xa6, 0x3b,
0xfa, 0xd8, 0x9c, 0x99, 0xbe, 0x49, 0xbc, 0x68, 0x3d, 0x3f, 0x01, 0xf9, 0x4c, 0x77, 0xc2, 0x79,
0xd8, 0xce, 0xdf, 0x2e, 0x65, 0xca, 0xf4, 0x3a, 0x96, 0xef, 0x2e, 0x31, 0x33, 0x52, 0x3f, 0x85,
0x5c, 0xc4, 0x8a, 0xff, 0x21, 0x72, 0x3b, 0xfe, 0x10, 0xb9, 0xe0, 0x0f, 0xf1, 0x79, 0xea, 0xb1,
0x58, 0x2f, 0x43, 0xb1, 0x4b, 0xaf, 0x35, 0x44, 0xae, 0x3f, 0x86, 0x4b, 0x8f, 0x10, 0x6a, 0xec,
0xf9, 0xba, 0xeb, 0x33, 0x40, 0x09, 0x73, 0x82, 0x3a, 0x21, 0xd6, 0x84, 0x01, 0x4a, 0x98, 0x1e,
0x5b, 0x7f, 0x89, 0xb0, 0x37, 0xe4, 0x41, 0xd3, 0x64, 0xe8, 0x68, 0xa2, 0x83, 0x4b, 0x93, 0xca,
0x1e, 0x4f, 0xea, 0xed, 0x9d, 0xf3, 0x5b, 0x97, 0x7f, 0xf8, 0xad, 0x22, 0x3c, 0x12, 0xd1, 0x2b,
0x28, 0xc4, 0x93, 0x46, 0x87, 0x4d, 0xfe, 0xbc, 0x6b, 0x86, 0xcf, 0xbb, 0x66, 0x87, 0x3e, 0xef,
0xd4, 0xbb, 0xef, 0xbd, 0x23, 0x06, 0x27, 0xa2, 0x2f, 0x20, 0xcd, 0x12, 0xbc, 0x16, 0xe5, 0x30,
0x42, 0x49, 0x5e, 0x04, 0x35, 0x4f, 0xa9, 0x2c, 0xa6, 0xf6, 0xc1, 0xf9, 0x3f, 0x55, 0xe1, 0x7c,
0x53, 0x15, 0xff, 0xdc, 0x54, 0xc5, 0xbf, 0x37, 0x55, 0xf1, 0xe7, 0x7f, 0xab, 0xc2, 0x38, 0xc3,
0x90, 0x3e, 0xfa, 0x2f, 0x00, 0x00, 0xff, 0xff, 0xf8, 0x50, 0xa4, 0x8e, 0xc5, 0x0a, 0x00, 0x00,
// 1206 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x56, 0x41, 0x8f, 0xdb, 0x44,
0x14, 0xb6, 0xd7, 0x4e, 0x76, 0xf3, 0x92, 0xec, 0x7a, 0xa7, 0xdb, 0x25, 0x72, 0x69, 0xe2, 0xe6,
0x50, 0xc2, 0xa1, 0x69, 0x15, 0x40, 0x14, 0x2a, 0x24, 0x9a, 0x36, 0xed, 0x2e, 0xdd, 0x26, 0xd5,
0x24, 0x2b, 0x71, 0x40, 0x5a, 0x26, 0x9b, 0x89, 0x6b, 0x91, 0xd8, 0xc6, 0x9e, 0xa0, 0xee, 0x8d,
0x23, 0x5a, 0x71, 0xe0, 0xc0, 0x35, 0x27, 0x7e, 0x03, 0x5c, 0x90, 0x38, 0x70, 0xea, 0x91, 0x23,
0xa7, 0x08, 0xc2, 0x1f, 0x41, 0x33, 0x63, 0x3b, 0xf6, 0x6e, 0x5a, 0x69, 0x2f, 0xd1, 0xbc, 0xf7,
0xbe, 0xf7, 0xbd, 0xf7, 0x66, 0xde, 0x7b, 0x31, 0x94, 0x43, 0xe6, 0x05, 0xc4, 0xa6, 0x4d, 0x3f,
0xf0, 0x98, 0x87, 0x36, 0x23, 0xd1, 0xbc, 0x63, 0x3b, 0xec, 0xe5, 0x6c, 0xd8, 0x3c, 0xf5, 0xa6,
0x77, 0x6d, 0xcf, 0xf6, 0xee, 0x0a, 0xfb, 0x70, 0x36, 0x16, 0x92, 0x10, 0xc4, 0x49, 0xfa, 0x99,
0x37, 0x6c, 0xcf, 0xb3, 0x27, 0x74, 0x85, 0xa2, 0x53, 0x9f, 0x9d, 0x45, 0xc6, 0x56, 0x8a, 0xcb,
0x71, 0xc7, 0x93, 0xd9, 0xab, 0x11, 0x61, 0xe4, 0xee, 0x19, 0x09, 0xfc, 0x53, 0xf9, 0x2b, 0xf9,
0xc4, 0x31, 0xf2, 0xd9, 0xf1, 0x03, 0x3a, 0x72, 0x4e, 0x09, 0x8b, 0x32, 0xab, 0xff, 0xa1, 0x43,
0x11, 0x53, 0x32, 0xc2, 0xf4, 0xdb, 0x19, 0x0d, 0x19, 0x32, 0x61, 0x8b, 0xb3, 0x0c, 0x49, 0x48,
0x2b, 0xaa, 0xa5, 0x36, 0x0a, 0x38, 0x91, 0xd1, 0x97, 0xb0, 0xc3, 0x9c, 0x29, 0x0d, 0x19, 0x99,
0xfa, 0x27, 0x01, 0x71, 0x6d, 0x5a, 0xd9, 0xb0, 0xd4, 0x46, 0xb1, 0xf5, 0x4e, 0x33, 0x2e, 0x77,
0x10, 0xdb, 0x31, 0x37, 0xb7, 0xf7, 0x5f, 0x2f, 0x6a, 0xca, 0x72, 0x51, 0xdb, 0xce, 0xea, 0xf1,
0x36, 0xcb, 0xc8, 0xa8, 0x0a, 0x30, 0xa2, 0xe1, 0x29, 0x75, 0x47, 0x8e, 0x6b, 0x57, 0x34, 0x4b,
0x6d, 0x6c, 0xe1, 0x94, 0x86, 0x67, 0x65, 0x07, 0xde, 0xcc, 0xe7, 0x56, 0xdd, 0xd2, 0x78, 0x56,
0xb1, 0x8c, 0xee, 0x41, 0x21, 0x29, 0xaa, 0x92, 0x13, 0xf9, 0xa0, 0x24, 0x9f, 0x17, 0xb1, 0x05,
0xaf, 0x40, 0xa8, 0x05, 0xa5, 0x90, 0x06, 0x0e, 0x0d, 0x4f, 0x26, 0xce, 0xd4, 0x61, 0x95, 0xbc,
0xa5, 0x36, 0xf4, 0xf6, 0xce, 0x72, 0x51, 0x2b, 0xf6, 0x85, 0xfe, 0x88, 0xab, 0x71, 0x31, 0x5c,
0x09, 0xe8, 0x23, 0x28, 0x47, 0x3e, 0xde, 0x78, 0x1c, 0x52, 0x56, 0xd9, 0x14, 0x4e, 0xc6, 0x72,
0x51, 0x2b, 0x49, 0xa7, 0x9e, 0xd0, 0xe3, 0x88, 0x5a, 0x4a, 0x3c, 0x94, 0xef, 0x39, 0x2e, 0x8b,
0x43, 0x6d, 0xad, 0x42, 0xbd, 0x10, 0xfa, 0x28, 0x94, 0xbf, 0x12, 0x78, 0x41, 0xc4, 0xb6, 0x03,
0x6a, 0xf3, 0x82, 0x0a, 0x17, 0x0a, 0x7a, 0x18, 0x5b, 0xf0, 0x0a, 0x84, 0x3e, 0x87, 0x1c, 0x0b,
0xc8, 0x29, 0xad, 0x80, 0xa5, 0x35, 0x8a, 0xad, 0x5a, 0x82, 0x4e, 0xbd, 0x6c, 0x73, 0xc0, 0x11,
0x1d, 0x97, 0x05, 0x67, 0xed, 0xc2, 0x72, 0x51, 0xcb, 0x09, 0x19, 0x4b, 0x47, 0xf3, 0x3e, 0xc0,
0xca, 0x8e, 0x0c, 0xd0, 0xbe, 0xa1, 0x67, 0xd1, 0xfb, 0xf3, 0x23, 0xda, 0x83, 0xdc, 0x77, 0x64,
0x32, 0x93, 0x0f, 0x5e, 0xc0, 0x52, 0xf8, 0x74, 0xe3, 0xbe, 0x5a, 0xff, 0x5d, 0x85, 0x42, 0x92,
0x14, 0xfa, 0x10, 0x74, 0x76, 0xe6, 0xcb, 0xd6, 0xd9, 0x6e, 0x59, 0x97, 0xd3, 0x5e, 0x9d, 0x06,
0x67, 0x3e, 0xc5, 0x02, 0x5d, 0x7f, 0x05, 0xe5, 0x8c, 0x1a, 0xd5, 0x40, 0xef, 0xf6, 0xba, 0x1d,
0x43, 0x31, 0xaf, 0x9f, 0xcf, 0xad, 0xdd, 0x8c, 0xb1, 0xeb, 0xb9, 0x14, 0xdd, 0x04, 0xad, 0x7f,
0xfc, 0xdc, 0x50, 0xcd, 0xbd, 0xf3, 0xb9, 0x65, 0x64, 0xec, 0xfd, 0xd9, 0x14, 0xdd, 0x82, 0xdc,
0xa3, 0xde, 0x71, 0x77, 0x60, 0x6c, 0x98, 0xfb, 0xe7, 0x73, 0x0b, 0x65, 0x00, 0x8f, 0xbc, 0x99,
0xcb, 0x4c, 0xfd, 0x87, 0x5f, 0xaa, 0x4a, 0xfd, 0x0e, 0x68, 0x03, 0x62, 0xa7, 0x0b, 0x2e, 0xad,
0x29, 0xb8, 0x14, 0x15, 0x5c, 0xff, 0xb9, 0x08, 0x25, 0x79, 0xa7, 0xa1, 0xef, 0xb9, 0x21, 0x45,
0x9f, 0x40, 0x7e, 0x1c, 0x90, 0x29, 0x0d, 0x2b, 0xaa, 0xb8, 0xfa, 0x1b, 0x17, 0xae, 0x5e, 0xc2,
0x9a, 0x4f, 0x38, 0xa6, 0xad, 0xf3, 0x69, 0xc0, 0x91, 0x83, 0xf9, 0xa7, 0x0e, 0x39, 0xa1, 0x47,
0x0f, 0x20, 0x2f, 0x9b, 0x46, 0x24, 0x50, 0x6c, 0xdd, 0x5a, 0x4f, 0x22, 0xdb, 0x4c, 0xb8, 0x1c,
0x28, 0x38, 0x72, 0x41, 0x5f, 0x41, 0x69, 0x3c, 0xf1, 0x08, 0x3b, 0x91, 0x2d, 0x14, 0x4d, 0xe4,
0xed, 0x37, 0xe4, 0xc1, 0x91, 0xb2, 0xf1, 0x64, 0x4a, 0xa2, 0x13, 0x53, 0xda, 0x03, 0x05, 0x17,
0xc7, 0x2b, 0x11, 0x8d, 0x60, 0xdb, 0x71, 0x19, 0xb5, 0x69, 0x10, 0xf3, 0x6b, 0x82, 0xbf, 0xb1,
0x9e, 0xff, 0x50, 0x62, 0xd3, 0x11, 0x76, 0x97, 0x8b, 0x5a, 0x39, 0xa3, 0x3f, 0x50, 0x70, 0xd9,
0x49, 0x2b, 0xd0, 0x4b, 0xd8, 0x99, 0xb9, 0xa1, 0x63, 0xbb, 0x74, 0x14, 0x87, 0xd1, 0x45, 0x98,
0xf7, 0xd7, 0x87, 0x39, 0x8e, 0xc0, 0xe9, 0x38, 0x88, 0xaf, 0x99, 0xac, 0xe1, 0x40, 0xc1, 0xdb,
0xb3, 0x8c, 0x86, 0xd7, 0x33, 0xf4, 0xbc, 0x09, 0x25, 0x6e, 0x1c, 0x28, 0xf7, 0xb6, 0x7a, 0xda,
0x12, 0x7b, 0xa9, 0x9e, 0x8c, 0x9e, 0xd7, 0x33, 0x4c, 0x2b, 0xd0, 0xd7, 0x7c, 0xff, 0x07, 0x8e,
0x6b, 0xc7, 0x41, 0xf2, 0x22, 0xc8, 0x7b, 0x6f, 0x78, 0x57, 0x01, 0x4d, 0xc7, 0x90, 0x5b, 0x25,
0xa5, 0x3e, 0x50, 0x70, 0x29, 0x4c, 0xc9, 0xed, 0x3c, 0xe8, 0x7c, 0x2d, 0x9b, 0x01, 0x14, 0x53,
0x6d, 0x81, 0x6e, 0x83, 0xce, 0x88, 0x1d, 0x37, 0x63, 0x69, 0xb5, 0x96, 0x89, 0x1d, 0x75, 0x9f,
0xb0, 0xa3, 0x07, 0x50, 0xe0, 0xee, 0x27, 0x62, 0x56, 0x37, 0xc4, 0xac, 0x56, 0xd7, 0x27, 0xf7,
0x98, 0x30, 0x22, 0x26, 0x55, 0xfc, 0x0d, 0xf0, 0x93, 0xf9, 0x05, 0x18, 0x17, 0xfb, 0x88, 0x2f,
0xf0, 0x64, 0xa5, 0xcb, 0xf0, 0x06, 0x4e, 0x69, 0xd0, 0x3e, 0xe4, 0xc5, 0x04, 0xf1, 0xfe, 0xd4,
0x1a, 0x2a, 0x8e, 0x24, 0xf3, 0x08, 0xd0, 0xe5, 0x9e, 0xb9, 0x22, 0x9b, 0x96, 0xb0, 0x3d, 0x87,
0x6b, 0x6b, 0x5a, 0xe3, 0x8a, 0x74, 0x7a, 0x3a, 0xb9, 0xcb, 0x0d, 0x70, 0x45, 0xb6, 0xad, 0x84,
0xed, 0x19, 0xec, 0x5e, 0x7a, 0xe9, 0x2b, 0x92, 0x15, 0x62, 0xb2, 0x7a, 0x1f, 0x0a, 0x82, 0x20,
0xda, 0x96, 0xf9, 0x7e, 0x07, 0x1f, 0x76, 0xfa, 0x86, 0x62, 0x5e, 0x3b, 0x9f, 0x5b, 0x3b, 0x89,
0x49, 0xf6, 0x06, 0x07, 0xbc, 0xe8, 0x1d, 0x76, 0x07, 0x7d, 0x43, 0xbd, 0x00, 0x90, 0xb9, 0x44,
0xcb, 0xf0, 0x37, 0x15, 0xb6, 0xe2, 0xf7, 0x46, 0xef, 0x42, 0xee, 0xc9, 0x51, 0xef, 0xe1, 0xc0,
0x50, 0xcc, 0xdd, 0xf3, 0xb9, 0x55, 0x8e, 0x0d, 0xe2, 0xe9, 0x91, 0x05, 0x9b, 0x87, 0xdd, 0x41,
0xe7, 0x69, 0x07, 0xc7, 0x94, 0xb1, 0x3d, 0x7a, 0x4e, 0x54, 0x87, 0xad, 0xe3, 0x6e, 0xff, 0xf0,
0x69, 0xb7, 0xf3, 0xd8, 0xd8, 0x90, 0x6b, 0x3a, 0x86, 0xc4, 0x6f, 0xc4, 0x59, 0xda, 0xbd, 0xde,
0x51, 0xe7, 0x61, 0xd7, 0xd0, 0xb2, 0x2c, 0xd1, 0xbd, 0xa3, 0x2a, 0xe4, 0xfb, 0x03, 0x7c, 0xd8,
0x7d, 0x6a, 0xe8, 0x26, 0x3a, 0x9f, 0x5b, 0xdb, 0x31, 0x40, 0x5e, 0x65, 0x94, 0xf8, 0x8f, 0x2a,
0xec, 0x3d, 0x22, 0x3e, 0x19, 0x3a, 0x13, 0x87, 0x39, 0x34, 0x4c, 0xd6, 0xf3, 0x03, 0xd0, 0x4f,
0x89, 0x1f, 0xcf, 0xc3, 0x6a, 0xfe, 0xd6, 0x81, 0xb9, 0x32, 0x14, 0xff, 0x7f, 0x58, 0x38, 0x99,
0x1f, 0x43, 0x21, 0x51, 0x5d, 0xe9, 0x2f, 0x71, 0x07, 0xca, 0x07, 0xfc, 0x5a, 0x63, 0xe6, 0xfa,
0x7d, 0xb8, 0xf0, 0x01, 0xc4, 0x9d, 0x43, 0x46, 0x02, 0x26, 0x08, 0x35, 0x2c, 0x05, 0x1e, 0x84,
0xba, 0x23, 0x41, 0xa8, 0x61, 0x7e, 0x6c, 0xfd, 0xad, 0xc2, 0x66, 0x5f, 0x26, 0xcd, 0x8b, 0xe1,
0xa3, 0x89, 0xf6, 0xd6, 0xfd, 0xbd, 0x9b, 0xd7, 0xd7, 0xce, 0x6f, 0x5d, 0xff, 0xfe, 0xd7, 0x8a,
0x72, 0x4f, 0x45, 0xcf, 0xa0, 0x94, 0x2e, 0x1a, 0xed, 0x37, 0xe5, 0xa7, 0x65, 0x33, 0xfe, 0xb4,
0x6c, 0x76, 0xf8, 0xa7, 0xa5, 0x79, 0xf3, 0xad, 0x77, 0x24, 0xe8, 0x54, 0xf4, 0x19, 0xe4, 0x44,
0x81, 0x6f, 0x64, 0xd9, 0x4f, 0x58, 0xb2, 0x17, 0xc1, 0xdd, 0x37, 0x4c, 0x91, 0x53, 0x7b, 0xef,
0xf5, 0xbf, 0x55, 0xe5, 0xf5, 0xb2, 0xaa, 0xfe, 0xb5, 0xac, 0xaa, 0xff, 0x2c, 0xab, 0xea, 0x4f,
0xff, 0x55, 0x95, 0x61, 0x5e, 0x30, 0x7d, 0xf0, 0x7f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x91, 0xdc,
0x3a, 0xb6, 0x41, 0x0b, 0x00, 0x00,
}

View File

@ -61,6 +61,9 @@ message ReadRequest {
// PointsLimit determines the maximum number of values per series to be returned for the request.
// Specify 0 for no limit.
uint64 points_limit = 8 [(gogoproto.customname) = "PointsLimit"];
// Trace contains opaque data if a trace is active.
map<string, string> trace = 10 [(gogoproto.customname) = "Trace"];
}
message Aggregate {

View File

@ -0,0 +1,16 @@
package storage
const (
	// maxAnnotationLength is the max length of byte array or string allowed in the annotations
	maxAnnotationLength = 256
)

// truncateString caps value at maxAnnotationLength bytes, returning it
// unchanged when it already fits.
func truncateString(value string) string {
	// We ignore the problem of utf8 runes possibly being sliced in the middle,
	// as it is rather expensive to iterate through each tag just to find rune
	// boundaries.
	if len(value) <= maxAnnotationLength {
		return value
	}
	return value[:maxAnnotationLength]
}

View File

@ -338,6 +338,11 @@ func (s *LocalServer) Reset() error {
// WritePoints forwards a batch of points to the server's PointsWriter,
// refusing the write once the server has been closed.
func (s *LocalServer) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error {
	s.mu.RLock()
	defer s.mu.RUnlock()
	// PointsWriter is nil after shutdown; return an error instead of
	// panicking on a nil dereference.
	if s.PointsWriter == nil {
		return fmt.Errorf("server closed")
	}
	return s.PointsWriter.WritePoints(database, retentionPolicy, consistencyLevel, user, points)
}

View File

@ -64,11 +64,12 @@ type Engine interface {
MeasurementFields(measurement []byte) *MeasurementFields
ForEachMeasurementName(fn func(name []byte) error) error
DeleteMeasurement(name []byte) error
MeasurementFieldSet() *MeasurementFieldSet
// TagKeys(name []byte) ([][]byte, error)
HasTagKey(name, key []byte) (bool, error)
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, key []string, expr influxql.Expr, keysSorted bool) ([][]string, error)
TagKeyHasAuthorizedSeries(auth query.Authorizer, name []byte, key string) bool
ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error
TagKeyCardinality(name, key []byte) int

View File

@ -1,5 +0,0 @@
{
"files": [
"00000001.tsl"
]
}

View File

@ -61,6 +61,19 @@ var (
planningTimer = metrics.MustRegisterTimer("planning_time", metrics.WithGroup(tsmGroup))
)
// NewContextWithMetricsGroup creates a new context with a tsm1 metrics.Group for tracking
// various metrics when accessing TSM data.
func NewContextWithMetricsGroup(ctx context.Context) context.Context {
	return metrics.NewContextWithGroup(ctx, metrics.NewGroup(tsmGroup))
}
// MetricsGroupFromContext returns the tsm1 metrics.Group associated with the context
// or nil if no group has been assigned.
func MetricsGroupFromContext(ctx context.Context) *metrics.Group {
	// Pure delegation to the generic metrics context helper.
	return metrics.GroupFromContext(ctx)
}
const (
// keyFieldSeparator separates the series key from the field name in the composite key
// that identifies a specific field in series
@ -189,8 +202,6 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string,
traceLogger: logger,
traceLogging: opt.Config.TraceLoggingEnabled,
fieldset: tsdb.NewMeasurementFieldSet(),
WAL: w,
Cache: cache,
@ -206,9 +217,6 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string,
scheduler: newScheduler(stats, opt.CompactionLimiter.Capacity()),
}
// Attach fieldset to index.
e.index.SetFieldSet(e.fieldset)
if e.traceLogging {
fs.enableTraceLogging(true)
w.enableTraceLogging(true)
@ -282,17 +290,44 @@ func (e *Engine) disableLevelCompactions(wait bool) {
e.levelWorkers += 1
}
// Hold onto the current done channel so we can wait on it if necessary
waitCh := e.done
if old == 0 && e.done != nil {
// It's possible we have closed the done channel and released the lock and another
// goroutine has attempted to disable compactions. We're current in the process of
// disabling them so check for this and wait until the original completes.
select {
case <-e.done:
e.mu.Unlock()
return
default:
}
// Prevent new compactions from starting
e.Compactor.DisableCompactions()
// Stop all background compaction goroutines
close(e.done)
e.done = nil
e.mu.Unlock()
e.wg.Wait()
// Signal that all goroutines have exited.
e.mu.Lock()
e.done = nil
e.mu.Unlock()
return
}
e.mu.Unlock()
// Compaction were already disabled.
if waitCh == nil {
return
}
e.mu.Unlock()
// We were not the first caller to disable compactions and they were in the process
// of being disabled. Wait for them to complete before returning.
<-waitCh
e.wg.Wait()
}
@ -323,15 +358,34 @@ func (e *Engine) enableSnapshotCompactions() {
func (e *Engine) disableSnapshotCompactions() {
e.mu.Lock()
var wait bool
if e.snapDone != nil {
// We may be in the process of stopping snapshots. See if the channel
// was closed.
select {
case <-e.snapDone:
e.mu.Unlock()
return
default:
}
close(e.snapDone)
e.snapDone = nil
e.Compactor.DisableSnapshots()
wait = true
}
e.mu.Unlock()
// Wait for the snapshot goroutine to exit.
if wait {
e.snapWG.Wait()
}
// Signal that the goroutines are exit and everything is stopped by setting
// snapDone to nil.
e.mu.Lock()
e.snapDone = nil
e.mu.Unlock()
e.snapWG.Wait()
// If the cache is empty, free up its resources as well.
if e.Cache.Size() == 0 {
@ -383,6 +437,10 @@ func (e *Engine) MeasurementFields(measurement []byte) *tsdb.MeasurementFields {
return e.fieldset.CreateFieldsIfNotExists(measurement)
}
// MeasurementFieldSet returns the engine's measurement field set.
func (e *Engine) MeasurementFieldSet() *tsdb.MeasurementFieldSet {
	return e.fieldset
}
func (e *Engine) HasTagKey(name, key []byte) (bool, error) {
return e.index.HasTagKey(name, key)
}
@ -391,6 +449,12 @@ func (e *Engine) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[
return e.index.MeasurementTagKeysByExpr(name, expr)
}
// TagKeyHasAuthorizedSeries determines if there exist authorized series for the
// provided measurement name and tag key.
func (e *Engine) TagKeyHasAuthorizedSeries(auth query.Authorizer, name []byte, key string) bool {
	// The authorization check is delegated entirely to the underlying index.
	return e.index.TagKeyHasAuthorizedSeries(auth, name, key)
}
// MeasurementTagKeyValuesByExpr returns a set of tag values filtered by an expression.
//
// MeasurementTagKeyValuesByExpr relies on the provided tag keys being sorted.
@ -528,6 +592,17 @@ func (e *Engine) Open() error {
return err
}
fields, err := tsdb.NewMeasurementFieldSet(filepath.Join(e.path, "fields.idx"))
if err != nil {
return err
}
e.mu.Lock()
e.fieldset = fields
e.mu.Unlock()
e.index.SetFieldSet(fields)
if err := e.WAL.Open(); err != nil {
return err
}
@ -583,6 +658,12 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error {
// Save reference to index for iterator creation.
e.index = index
// If we have the cached fields index on disk and we're using TSI, we
// can skip scanning all the TSM files.
if e.index.Type() != inmem.IndexName && !e.fieldset.IsEmpty() {
return nil
}
if err := e.FileStore.WalkKeys(nil, func(key []byte, typ byte) error {
fieldType, err := tsmFieldTypeToInfluxQLDataType(typ)
if err != nil {
@ -612,6 +693,11 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error {
return err
}
// Save the field set index so we don't have to rebuild it next time
if err := e.fieldset.Save(); err != nil {
return err
}
e.traceLogger.Info(fmt.Sprintf("Meta data index for shard %d loaded in %v", shardID, time.Since(now)))
return nil
}
@ -872,7 +958,7 @@ func (e *Engine) addToIndexFromKey(key []byte, fieldType influxql.DataType) erro
name := tsdb.MeasurementFromSeriesKey(seriesKey)
mf := e.fieldset.CreateFieldsIfNotExists(name)
if err := mf.CreateFieldIfNotExists(field, fieldType, false); err != nil {
if err := mf.CreateFieldIfNotExists(field, fieldType); err != nil {
return err
}
@ -1098,6 +1184,9 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
return nil
})
// Sort the series keys because ApplyEntryFn iterates over the keys randomly.
bytesutil.Sort(deleteKeys)
e.Cache.DeleteRange(deleteKeys, min, max)
// delete from the WAL
@ -1229,7 +1318,7 @@ func (e *Engine) DeleteMeasurement(name []byte) error {
return err
}
return nil
return e.fieldset.Save()
}
// DeleteMeasurement deletes a measurement and all related series.

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/influxdata/influxdb/pkg/metrics"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxql"
@ -23,6 +24,10 @@ func (e *Engine) CreateCursor(ctx context.Context, r *tsdb.CursorRequest) (tsdb.
return nil, nil
}
if grp := metrics.GroupFromContext(ctx); grp != nil {
grp.GetCounter(numberOfRefCursorsCounter).Add(1)
}
var opt query.IteratorOptions
opt.Ascending = r.Ascending
opt.StartTime = r.StartTime

View File

@ -261,7 +261,7 @@ func TestEngine_CreateIterator_Cache_Ascending(t *testing.T) {
defer e.Close()
// e.CreateMeasurement("cpu")
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float, false)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
if err := e.WritePointsString(
@ -313,7 +313,7 @@ func TestEngine_CreateIterator_Cache_Descending(t *testing.T) {
e := MustOpenDefaultEngine()
defer e.Close()
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float, false)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
if err := e.WritePointsString(
@ -365,7 +365,7 @@ func TestEngine_CreateIterator_TSM_Ascending(t *testing.T) {
e := MustOpenDefaultEngine()
defer e.Close()
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float, false)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
if err := e.WritePointsString(
@ -418,7 +418,7 @@ func TestEngine_CreateIterator_TSM_Descending(t *testing.T) {
e := MustOpenDefaultEngine()
defer e.Close()
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float, false)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
if err := e.WritePointsString(
@ -471,8 +471,8 @@ func TestEngine_CreateIterator_Aux(t *testing.T) {
e := MustOpenDefaultEngine()
defer e.Close()
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float, false)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("F"), influxql.Float, false)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("F"), influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
if err := e.WritePointsString(
@ -527,9 +527,9 @@ func TestEngine_CreateIterator_Condition(t *testing.T) {
e := MustOpenDefaultEngine()
defer e.Close()
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float, false)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("X"), influxql.Float, false)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("Y"), influxql.Float, false)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("X"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("Y"), influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
e.SetFieldName([]byte("cpu"), "X")
e.SetFieldName([]byte("cpu"), "Y")
@ -725,7 +725,7 @@ func TestEngine_CreateCursor_Ascending(t *testing.T) {
e := MustOpenDefaultEngine()
defer e.Close()
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float, false)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
if err := e.WritePointsString(
@ -774,7 +774,7 @@ func TestEngine_CreateCursor_Descending(t *testing.T) {
e := MustOpenDefaultEngine()
defer e.Close()
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float, false)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
if err := e.WritePointsString(
@ -816,6 +816,35 @@ func TestEngine_CreateCursor_Descending(t *testing.T) {
}
}
// This test ensures that "sync: WaitGroup is reused before previous Wait has
// returned" is not raised.
func TestEngine_DisableEnableCompactions_Concurrent(t *testing.T) {
	t.Parallel()

	e := MustOpenDefaultEngine()
	defer e.Close()

	// Toggle compactions from two goroutines in opposite orders so the
	// enable and disable paths race against each other.
	toggle := func(first bool) {
		for i := 0; i < 1000; i++ {
			e.SetCompactionsEnabled(first)
			e.SetCompactionsEnabled(!first)
		}
	}

	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		toggle(true)
	}()
	go func() {
		defer wg.Done()
		toggle(false)
	}()
	wg.Wait()
}
func BenchmarkEngine_CreateIterator_Count_1K(b *testing.B) {
benchmarkEngineCreateIteratorCount(b, 1000)
}
@ -890,7 +919,7 @@ func BenchmarkEngine_WritePoints(b *testing.B) {
for _, sz := range batchSizes {
for _, index := range tsdb.RegisteredIndexes() {
e := MustOpenEngine(index)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float, false)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
pp := make([]models.Point, 0, sz)
for i := 0; i < sz; i++ {
p := MustParsePointString(fmt.Sprintf("cpu,host=%d value=1.2", i))
@ -916,7 +945,7 @@ func BenchmarkEngine_WritePoints_Parallel(b *testing.B) {
for _, sz := range batchSizes {
for _, index := range tsdb.RegisteredIndexes() {
e := MustOpenEngine(index)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float, false)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
cpus := runtime.GOMAXPROCS(0)
pp := make([]models.Point, 0, sz*cpus)
@ -1015,7 +1044,7 @@ func MustInitDefaultBenchmarkEngine(pointN int) *Engine {
e := MustOpenEngine(tsdb.DefaultIndex)
// Initialize metadata.
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float, false)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
// Generate time ascending points with jitterred time & value.

View File

@ -303,49 +303,16 @@ func (f *FileStore) WalkKeys(seek []byte, fn func(key []byte, typ byte) error) e
return nil
}
readers := make([]chan seriesKey, 0, len(f.files))
done := make(chan struct{})
for _, f := range f.files {
ch := make(chan seriesKey, 1)
readers = append(readers, ch)
go func(c chan seriesKey, r TSMFile) {
start := 0
if len(seek) > 0 {
start = r.Seek(seek)
}
n := r.KeyCount()
for i := start; i < n; i++ {
key, typ := r.KeyAt(i)
select {
case <-done:
// Abort iteration
break
case c <- seriesKey{key, typ}:
}
}
close(ch)
}(ch, f)
}
ki := newMergeKeyIterator(f.files, seek)
f.mu.RUnlock()
merged := merge(readers...)
var err error
for v := range merged {
// Drain the remaing values so goroutines can exit
if err != nil {
continue
}
if err = fn(v.key, v.typ); err != nil {
// Signal that we should stop iterating
close(done)
for ki.Next() {
key, typ := ki.Read()
if err := fn(key, typ); err != nil {
return err
}
}
return err
return nil
}
// Keys returns all keys and types for all files in the file store.

View File

@ -0,0 +1,112 @@
package tsm1
import (
"bytes"
"container/heap"
)
// keyIterator walks the series keys of a single TSM file in index order.
type keyIterator struct {
	f   TSMFile
	c   int    // current key index
	n   int    // key count
	key []byte // most recently read key
	typ byte   // block type of the most recently read key
}
// newKeyIterator returns an iterator positioned at the first key of f that is
// >= seek (or at the first key when seek is empty). It returns nil when the
// file has no keys at or past the seek position.
func newKeyIterator(f TSMFile, seek []byte) *keyIterator {
	total := f.KeyCount()
	start := 0
	if len(seek) > 0 {
		start = f.Seek(seek)
	}
	if start >= total {
		return nil
	}
	it := &keyIterator{f: f, c: start, n: total}
	it.next() // prime key/typ with the first entry
	return it
}
// next advances to the following key, reporting whether one was available.
func (k *keyIterator) next() bool {
	if k.c >= k.n {
		return false
	}
	k.key, k.typ = k.f.KeyAt(k.c)
	k.c++
	return true
}
// mergeKeyIterator merges the sorted key streams of several TSM files into a
// single sorted, deduplicated stream.
type mergeKeyIterator struct {
	itrs keyIterators // min-heap of per-file iterators, ordered by current key
	key  []byte       // current merged key
	typ  byte         // block type of the current key
}
// newMergeKeyIterator builds a merge iterator over files, dropping any file
// whose keys all precede seek.
func newMergeKeyIterator(files []TSMFile, seek []byte) *mergeKeyIterator {
	itrs := make(keyIterators, 0, len(files))
	for _, f := range files {
		ki := newKeyIterator(f, seek)
		if ki == nil {
			continue
		}
		itrs = append(itrs, ki)
	}
	heap.Init(&itrs)
	return &mergeKeyIterator{itrs: itrs}
}
// Next advances the merged stream to the next distinct key, returning false
// once every underlying iterator is exhausted. A key that appears in more
// than one file is reported only once.
func (m *mergeKeyIterator) Next() bool {
	// Deduplication against the previously returned key is only needed when
	// more than one file is being merged.
	merging := len(m.itrs) > 1

RETRY:
	if len(m.itrs) == 0 {
		return false
	}

	key, typ := m.itrs[0].key, m.itrs[0].typ
	more := m.itrs[0].next()

	switch {
	case len(m.itrs) > 1:
		if !more {
			// The file at the heap root is exhausted; drop it.
			heap.Pop(&m.itrs)
		} else {
			// The root's current key changed; restore the heap invariant.
			heap.Fix(&m.itrs, 0)
		}

	case len(m.itrs) == 1:
		if !more {
			m.itrs = nil
		}
	}

	if merging && bytes.Equal(m.key, key) {
		// same as previous key, keep iterating
		goto RETRY
	}

	m.key, m.typ = key, typ

	return true
}
// Read returns the key and block type the iterator is currently positioned at.
func (m *mergeKeyIterator) Read() ([]byte, byte) { return m.key, m.typ }
// keyIterators implements heap.Interface, keeping the iterator with the
// smallest current key at the root of the heap.
type keyIterators []*keyIterator

func (k keyIterators) Len() int { return len(k) }

// Less orders iterators by their current key; bytes.Compare returns a
// negative value when k[i].key sorts before k[j].key (comparing with < 0 is
// the canonical form rather than == -1).
func (k keyIterators) Less(i, j int) bool { return bytes.Compare(k[i].key, k[j].key) < 0 }

func (k keyIterators) Swap(i, j int) { k[i], k[j] = k[j], k[i] }

func (k *keyIterators) Push(x interface{}) { *k = append(*k, x.(*keyIterator)) }

func (k *keyIterators) Pop() interface{} {
	old := *k
	n := len(old)
	x := old[n-1]
	*k = old[:n-1]
	return x
}

View File

@ -0,0 +1,198 @@
package tsm1
import (
"sort"
"testing"
"github.com/google/go-cmp/cmp"
)
// TestNewMergeKeyIterator verifies that mergeKeyIterator yields the union of
// keys across files in sorted order without duplicates, and that a seek key
// skips everything before it.
func TestNewMergeKeyIterator(t *testing.T) {
	cases := []struct {
		name  string
		seek  string
		files []TSMFile
		exp   []string // expected keys, in order
	}{
		{
			name: "mixed",
			files: newTSMFiles(
				[]string{"aaaa", "bbbb", "cccc", "dddd"},
				[]string{"aaaa", "cccc", "dddd"},
				[]string{"eeee", "ffff", "gggg"},
				[]string{"aaaa"},
				[]string{"dddd"},
			),
			exp: []string{"aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff", "gggg"},
		},
		{
			name: "similar keys",
			files: newTSMFiles(
				[]string{"a", "aaa"},
				[]string{"aa", "aaaa"},
			),
			exp: []string{"a", "aa", "aaa", "aaaa"},
		},
		{
			name: "seek skips some files",
			seek: "eeee",
			files: newTSMFiles(
				[]string{"aaaa", "bbbb", "cccc", "dddd"},
				[]string{"aaaa", "cccc", "dddd"},
				[]string{"eeee", "ffff", "gggg"},
				[]string{"aaaa"},
				[]string{"dddd"},
			),
			exp: []string{"eeee", "ffff", "gggg"},
		},
		{
			name: "keys same across all files",
			files: newTSMFiles(
				[]string{"aaaa", "bbbb", "cccc", "dddd"},
				[]string{"aaaa", "bbbb", "cccc", "dddd"},
				[]string{"aaaa", "bbbb", "cccc", "dddd"},
			),
			exp: []string{"aaaa", "bbbb", "cccc", "dddd"},
		},
		{
			name: "keys same across all files with extra",
			files: newTSMFiles(
				[]string{"aaaa", "bbbb", "cccc", "dddd"},
				[]string{"aaaa", "bbbb", "cccc", "dddd"},
				[]string{"aaaa", "bbbb", "cccc", "dddd", "eeee"},
			),
			exp: []string{"aaaa", "bbbb", "cccc", "dddd", "eeee"},
		},
		{
			name: "seek skips all files",
			seek: "eeee",
			files: newTSMFiles(
				[]string{"aaaa", "bbbb", "cccc", "dddd"},
				[]string{"aaaa", "bbbb", "cccc", "dddd"},
				[]string{"aaaa", "bbbb", "cccc", "dddd"},
			),
			exp: nil,
		},
		{
			name: "keys sequential across all files",
			files: newTSMFiles(
				[]string{"a", "b", "c", "d"},
				[]string{"e", "f", "g", "h"},
				[]string{"i", "j", "k", "l"},
			),
			exp: []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l"},
		},
		{
			name: "seek past one file",
			seek: "e",
			files: newTSMFiles(
				[]string{"a", "b", "c", "d"},
				[]string{"e", "f", "g", "h"},
				[]string{"i", "j", "k", "l"},
			),
			exp: []string{"e", "f", "g", "h", "i", "j", "k", "l"},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			ki := newMergeKeyIterator(tc.files, []byte(tc.seek))
			var act []string
			// Drain the iterator and compare the observed key order.
			for ki.Next() {
				key, _ := ki.Read()
				act = append(act, string(key))
			}
			if !cmp.Equal(tc.exp, act) {
				t.Error(cmp.Diff(tc.exp, act))
			}
		})
	}
}
// newTSMFiles builds one mock TSM file per key slice, for driving the
// merge-key iterator tests.
func newTSMFiles(keys ...[]string) []TSMFile {
	// The result length is known up front, so preallocate instead of growing.
	files := make([]TSMFile, 0, len(keys))
	for _, k := range keys {
		files = append(files, newMockTSMFile(k...))
	}
	return files
}
// mockTSMFile implements just enough of TSMFile to exercise key iteration;
// all other methods panic.
type mockTSMFile struct {
	keys []string // sorted series keys
}

func newMockTSMFile(keys ...string) *mockTSMFile {
	// NOTE(review): when invoked with slice expansion (k...), this sorts the
	// caller's backing array in place — harmless for these tests, but worth
	// knowing if the helper is reused.
	sort.Strings(keys)
	return &mockTSMFile{keys: keys}
}
// KeyCount reports how many keys the mock file holds.
func (t *mockTSMFile) KeyCount() int { return len(t.keys) }

// Seek returns the index of the first key >= the given key.
func (t *mockTSMFile) Seek(key []byte) int {
	return sort.SearchStrings(t.keys, string(key))
}

// KeyAt returns the key at idx; every mock entry is typed as a float block.
func (t *mockTSMFile) KeyAt(idx int) ([]byte, byte) {
	return []byte(t.keys[idx]), BlockFloat64
}
// The remaining TSMFile methods are not exercised by the key-iterator tests;
// each panics so an unexpected call fails loudly rather than misbehaving.
func (*mockTSMFile) Path() string { panic("implement me") }
func (*mockTSMFile) Read(key []byte, t int64) ([]Value, error) { panic("implement me") }
func (*mockTSMFile) ReadAt(entry *IndexEntry, values []Value) ([]Value, error) { panic("implement me") }
func (*mockTSMFile) Entries(key []byte) []IndexEntry { panic("implement me") }
func (*mockTSMFile) ReadEntries(key []byte, entries *[]IndexEntry) []IndexEntry { panic("implement me") }
func (*mockTSMFile) ContainsValue(key []byte, t int64) bool { panic("implement me") }
func (*mockTSMFile) Contains(key []byte) bool { panic("implement me") }
func (*mockTSMFile) OverlapsTimeRange(min, max int64) bool { panic("implement me") }
func (*mockTSMFile) OverlapsKeyRange(min, max []byte) bool { panic("implement me") }
func (*mockTSMFile) TimeRange() (int64, int64) { panic("implement me") }
func (*mockTSMFile) TombstoneRange(key []byte) []TimeRange { panic("implement me") }
func (*mockTSMFile) KeyRange() ([]byte, []byte) { panic("implement me") }
func (*mockTSMFile) Type(key []byte) (byte, error) { panic("implement me") }
func (*mockTSMFile) BatchDelete() BatchDeleter { panic("implement me") }
func (*mockTSMFile) Delete(keys [][]byte) error { panic("implement me") }
func (*mockTSMFile) DeleteRange(keys [][]byte, min, max int64) error { panic("implement me") }
func (*mockTSMFile) HasTombstones() bool { panic("implement me") }
func (*mockTSMFile) TombstoneFiles() []FileStat { panic("implement me") }
func (*mockTSMFile) Close() error { panic("implement me") }
func (*mockTSMFile) Size() uint32 { panic("implement me") }
func (*mockTSMFile) Rename(path string) error { panic("implement me") }
func (*mockTSMFile) Remove() error { panic("implement me") }
func (*mockTSMFile) InUse() bool { panic("implement me") }
func (*mockTSMFile) Ref() { panic("implement me") }
func (*mockTSMFile) Unref() { panic("implement me") }
func (*mockTSMFile) Stats() FileStat { panic("implement me") }
func (*mockTSMFile) BlockIterator() *BlockIterator { panic("implement me") }
func (*mockTSMFile) Free() error { panic("implement me") }

func (*mockTSMFile) ReadFloatBlockAt(*IndexEntry, *[]FloatValue) ([]FloatValue, error) {
	panic("implement me")
}

func (*mockTSMFile) ReadIntegerBlockAt(*IndexEntry, *[]IntegerValue) ([]IntegerValue, error) {
	panic("implement me")
}

func (*mockTSMFile) ReadUnsignedBlockAt(*IndexEntry, *[]UnsignedValue) ([]UnsignedValue, error) {
	panic("implement me")
}

func (*mockTSMFile) ReadStringBlockAt(*IndexEntry, *[]StringValue) ([]StringValue, error) {
	panic("implement me")
}

func (*mockTSMFile) ReadBooleanBlockAt(*IndexEntry, *[]BooleanValue) ([]BooleanValue, error) {
	panic("implement me")
}

View File

@ -135,9 +135,10 @@ func newFloatFinalizerIterator(inner query.FloatIterator, logger *zap.Logger) *f
}
func (itr *floatFinalizerIterator) closeGC() {
runtime.SetFinalizer(itr, nil)
itr.logger.Error("FloatIterator finalized by GC")
itr.Close()
go func() {
itr.logger.Error("FloatIterator finalized by GC")
itr.Close()
}()
}
func (itr *floatFinalizerIterator) Close() error {
@ -598,9 +599,10 @@ func newIntegerFinalizerIterator(inner query.IntegerIterator, logger *zap.Logger
}
func (itr *integerFinalizerIterator) closeGC() {
runtime.SetFinalizer(itr, nil)
itr.logger.Error("IntegerIterator finalized by GC")
itr.Close()
go func() {
itr.logger.Error("IntegerIterator finalized by GC")
itr.Close()
}()
}
func (itr *integerFinalizerIterator) Close() error {
@ -1061,9 +1063,10 @@ func newUnsignedFinalizerIterator(inner query.UnsignedIterator, logger *zap.Logg
}
func (itr *unsignedFinalizerIterator) closeGC() {
runtime.SetFinalizer(itr, nil)
itr.logger.Error("UnsignedIterator finalized by GC")
itr.Close()
go func() {
itr.logger.Error("UnsignedIterator finalized by GC")
itr.Close()
}()
}
func (itr *unsignedFinalizerIterator) Close() error {
@ -1524,9 +1527,10 @@ func newStringFinalizerIterator(inner query.StringIterator, logger *zap.Logger)
}
// closeGC runs as the iterator's GC finalizer. It clears the finalizer and
// performs the logging and Close on a separate goroutine so that a slow Close
// cannot block the runtime's single finalizer goroutine. The Error log flags
// the leak: callers should have closed the iterator explicitly.
//
// NOTE: duplicated synchronous log/Close lines left over from a bad diff
// merge (which would log and Close twice) have been removed.
func (itr *stringFinalizerIterator) closeGC() {
	runtime.SetFinalizer(itr, nil)
	go func() {
		itr.logger.Error("StringIterator finalized by GC")
		itr.Close()
	}()
}
func (itr *stringFinalizerIterator) Close() error {
@ -1987,9 +1991,10 @@ func newBooleanFinalizerIterator(inner query.BooleanIterator, logger *zap.Logger
}
// closeGC runs as the iterator's GC finalizer. It clears the finalizer and
// performs the logging and Close on a separate goroutine so that a slow Close
// cannot block the runtime's single finalizer goroutine. The Error log flags
// the leak: callers should have closed the iterator explicitly.
//
// NOTE: duplicated synchronous log/Close lines left over from a bad diff
// merge (which would log and Close twice) have been removed.
func (itr *booleanFinalizerIterator) closeGC() {
	runtime.SetFinalizer(itr, nil)
	go func() {
		itr.logger.Error("BooleanIterator finalized by GC")
		itr.Close()
	}()
}
func (itr *booleanFinalizerIterator) Close() error {

View File

@ -131,9 +131,10 @@ func new{{.Name}}FinalizerIterator(inner query.{{.Name}}Iterator, logger *zap.Lo
}
// closeGC runs as the iterator's GC finalizer. It clears the finalizer and
// performs the logging and Close on a separate goroutine so that a slow Close
// cannot block the runtime's single finalizer goroutine. The Error log flags
// the leak: callers should have closed the iterator explicitly.
//
// NOTE: duplicated synchronous log/Close lines left over from a bad diff
// merge (which would log and Close twice) have been removed.
func (itr *{{.name}}FinalizerIterator) closeGC() {
	runtime.SetFinalizer(itr, nil)
	go func() {
		itr.logger.Error("{{.Name}}Iterator finalized by GC")
		itr.Close()
	}()
}
func (itr *{{.name}}FinalizerIterator) Close() error {

View File

@ -1,42 +0,0 @@
package tsm1
import (
"testing"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxql"
)
// BenchmarkIntegerIterator_Next measures the per-call cost of Next on an
// integer iterator carrying four auxiliary fields of mixed types (string,
// int64, float64, bool).
func BenchmarkIntegerIterator_Next(b *testing.B) {
	opt := query.IteratorOptions{
		Aux: []influxql.VarRef{{Val: "f1"}, {Val: "f1"}, {Val: "f1"}, {Val: "f1"}},
	}

	aux := []cursorAt{
		&literalValueCursor{value: "foo bar"},
		&literalValueCursor{value: int64(1e3)},
		&literalValueCursor{value: float64(1e3)},
		&literalValueCursor{value: true},
	}
	cur := newIntegerIterator("m0", query.Tags{}, opt, &infiniteIntegerCursor{}, aux, nil, nil)

	b.ResetTimer()
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		cur.Next()
	}
}

// infiniteIntegerCursor never terminates: every call returns the zero
// timestamp and value, so the benchmark can iterate indefinitely.
type infiniteIntegerCursor struct{}

// close is a no-op; there are no resources to release.
func (*infiniteIntegerCursor) close() error {
	return nil
}

// next returns a constant zero timestamp/value pair.
func (*infiniteIntegerCursor) next() (t int64, v interface{}) {
	return 0, 0
}

// nextInteger returns a constant zero timestamp/value pair.
func (*infiniteIntegerCursor) nextInteger() (t int64, v int64) {
	return 0, 0
}

View File

@ -0,0 +1,148 @@
package tsm1
import (
"os"
"runtime"
"testing"
"time"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxql"
)
// BenchmarkIntegerIterator_Next measures the per-call cost of Next on an
// integer iterator carrying four auxiliary fields of mixed types (string,
// int64, float64, bool).
func BenchmarkIntegerIterator_Next(b *testing.B) {
	opt := query.IteratorOptions{
		Aux: []influxql.VarRef{{Val: "f1"}, {Val: "f1"}, {Val: "f1"}, {Val: "f1"}},
	}

	aux := []cursorAt{
		&literalValueCursor{value: "foo bar"},
		&literalValueCursor{value: int64(1e3)},
		&literalValueCursor{value: float64(1e3)},
		&literalValueCursor{value: true},
	}
	cur := newIntegerIterator("m0", query.Tags{}, opt, &infiniteIntegerCursor{}, aux, nil, nil)

	b.ResetTimer()
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		cur.Next()
	}
}

// infiniteIntegerCursor never terminates: every call returns the zero
// timestamp and value, so the benchmark can iterate indefinitely.
type infiniteIntegerCursor struct{}

// close is a no-op; there are no resources to release.
func (*infiniteIntegerCursor) close() error {
	return nil
}

// next returns a constant zero timestamp/value pair.
func (*infiniteIntegerCursor) next() (t int64, v interface{}) {
	return 0, 0
}

// nextInteger returns a constant zero timestamp/value pair.
func (*infiniteIntegerCursor) nextInteger() (t int64, v int64) {
	return 0, 0
}
// testFinalizerIterator is a float iterator whose Close behavior is injected
// through OnClose, letting tests simulate a slow or blocking cleanup.
type testFinalizerIterator struct {
	OnClose func()
}

// Next always returns a nil point; the iterator produces no data.
func (itr *testFinalizerIterator) Next() (*query.FloatPoint, error) {
	return nil, nil
}

func (itr *testFinalizerIterator) Close() error {
	// Act as if this is a slow finalizer and ensure that it doesn't block
	// the finalizer background thread.
	itr.OnClose()
	return nil
}

// Stats returns empty iterator statistics.
func (itr *testFinalizerIterator) Stats() query.IteratorStats {
	return query.IteratorStats{}
}
// TestFinalizerIterator verifies that an iterator abandoned without an
// explicit Close is eventually closed by its GC finalizer, and that a slow
// Close does not block the runtime's finalizer goroutine: a second, unrelated
// finalizer must still run while the iterator's Close is blocked.
func TestFinalizerIterator(t *testing.T) {
	var (
		step1 = make(chan struct{})
		step2 = make(chan struct{})
		step3 = make(chan struct{})
	)

	l := logger.New(os.Stderr)
	done := make(chan struct{})
	func() {
		itr := &testFinalizerIterator{
			OnClose: func() {
				// Simulate a slow closing iterator by waiting for the done channel
				// to be closed. The done channel is closed by a later finalizer.
				close(step1)
				<-done
				close(step3)
			},
		}
		// The wrapped iterator is dropped on the floor inside this function
		// literal so it becomes unreachable and eligible for finalization.
		newFinalizerIterator(itr, l)
	}()

	// Force several collections: finalizers are only queued once the object
	// has been observed unreachable by a GC cycle.
	for i := 0; i < 100; i++ {
		runtime.GC()
	}

	timer := time.NewTimer(100 * time.Millisecond)
	select {
	case <-timer.C:
		t.Fatal("The finalizer for the iterator did not run")
		close(done) // unreachable: t.Fatal stops this goroutine via runtime.Goexit
	case <-step1:
		// The finalizer has successfully run, but should not have completed yet.
		timer.Stop()
	}

	// Non-blocking check: Close is still parked on <-done, so step3 must not
	// have been closed yet.
	select {
	case <-step3:
		t.Fatal("The finalizer should not have finished yet")
	default:
	}

	// Use a fake value that will be collected by the garbage collector and have
	// the finalizer close the channel. This finalizer should run after the iterator's
	// finalizer.
	value := func() int {
		foo := &struct {
			value int
		}{value: 1}
		runtime.SetFinalizer(foo, func(value interface{}) {
			close(done)
			close(step2)
		})
		return foo.value + 2
	}()
	// Use value so the compiler cannot discard foo before its finalizer is set.
	if value < 2 {
		t.Log("This should never be output")
	}

	for i := 0; i < 100; i++ {
		runtime.GC()
	}

	timer.Reset(100 * time.Millisecond)
	select {
	case <-timer.C:
		t.Fatal("The second finalizer did not run")
	case <-step2:
		// The finalizer has successfully run and should have
		// closed the done channel.
		timer.Stop()
	}

	// Wait for step3 to finish where the closed value should be set.
	timer.Reset(100 * time.Millisecond)
	select {
	case <-timer.C:
		t.Fatal("The iterator was not finalized")
	case <-step3:
		timer.Stop()
	}
}

View File

@ -37,6 +37,7 @@ type Index interface {
TagSets(name []byte, options query.IteratorOptions) ([]*query.TagSet, error)
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error)
TagKeyHasAuthorizedSeries(auth query.Authorizer, name []byte, key string) bool
ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error
TagKeyCardinality(name, key []byte) int

View File

@ -274,6 +274,48 @@ func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[s
return mm.TagKeysByExpr(expr)
}
// TagKeyHasAuthorizedSeries determines if there exists an authorized series for
// the provided measurement name and tag key.
//
// With a nil or open authorizer, the mere existence of any tag value under the
// key is sufficient; otherwise every series under each tag value is checked
// until one is readable by auth.
func (i *Index) TagKeyHasAuthorizedSeries(auth query.Authorizer, name []byte, key string) bool {
	i.mu.RLock()
	mm := i.measurements[string(name)]
	i.mu.RUnlock()

	if mm == nil {
		return false
	}

	// TODO(edd): This looks like it's inefficient. Since a series can have multiple
	// tag key/value pairs on it, it's possible that the same unauthorised series
	// will be checked multiple times. It would be more efficient if it were
	// possible to get the set of unique series IDs for a given measurement name
	// and tag key.
	var authorized bool
	mm.SeriesByTagKeyValue(key).Range(func(_ string, seriesIDs SeriesIDs) bool {
		// A nil/open authorizer grants everything: any tag value existing
		// under this key is enough.
		if auth == nil || auth == query.OpenAuthorizer {
			authorized = true
			return false // stop iterating tag values
		}

		for _, id := range seriesIDs {
			s := mm.SeriesByID(id)
			if s == nil {
				// ID with no series — presumably dropped concurrently; skip it.
				continue
			}

			if auth.AuthorizeSeriesRead(i.database, mm.name, s.Tags()) {
				authorized = true
				return false // stop iterating tag values
			}
		}

		// This tag key/value combination doesn't have any authorised series, so
		// keep checking other tag values.
		return true
	})
	return authorized
}
// MeasurementTagKeyValuesByExpr returns a set of tag values filtered by an expression.
//
// See tsm1.Engine.MeasurementTagKeyValuesByExpr for a fuller description of this

View File

@ -1351,15 +1351,15 @@ func (t *TagKeyValue) LoadByte(value []byte) SeriesIDs {
// TagKeyValue is a no-op.
//
// If f returns false then iteration over any remaining keys or values will cease.
func (t *TagKeyValue) Range(f func(k string, a SeriesIDs) bool) {
func (t *TagKeyValue) Range(f func(tagValue string, a SeriesIDs) bool) {
if t == nil {
return
}
t.mu.RLock()
defer t.mu.RUnlock()
for k, a := range t.valueIDs {
if !f(k, a) {
for tagValue, a := range t.valueIDs {
if !f(tagValue, a) {
return
}
}

View File

@ -642,6 +642,29 @@ func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[s
return fs.MeasurementTagKeysByExpr(name, expr)
}
// TagKeyHasAuthorizedSeries determines if there exist authorized series for the
// provided measurement name and tag key.
//
// With a nil or open authorizer, the existence of any tag value under the key
// is sufficient; otherwise each series under each tag value is checked until
// one is readable by auth.
func (i *Index) TagKeyHasAuthorizedSeries(auth query.Authorizer, name []byte, key string) bool {
	fs := i.RetainFileSet()
	defer fs.Release()

	itr := fs.TagValueIterator(name, []byte(key))
	if itr == nil {
		// No tag values exist for this measurement/key combination.
		// (Guard added: the merged iterators presumably return nil when the
		// key is absent — confirm against FileSet.TagValueIterator.)
		return false
	}

	for val := itr.Next(); val != nil; val = itr.Next() {
		if auth == nil || auth == query.OpenAuthorizer {
			return true
		}

		// Identify an authorized series.
		si := fs.TagValueSeriesIterator(name, []byte(key), val.Value())
		if si == nil {
			continue
		}
		for se := si.Next(); se != nil; se = si.Next() {
			if auth.AuthorizeSeriesRead(i.Database, se.Name(), se.Tags()) {
				return true
			}
		}
	}
	return false
}
// MeasurementTagKeyValuesByExpr returns a set of tag values filtered by an expression.
//
// See tsm1.Engine.MeasurementTagKeyValuesByExpr for a fuller description of this

View File

@ -1,9 +1,8 @@
// Code generated by protoc-gen-gogo.
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: internal/meta.proto
// DO NOT EDIT!
/*
Package meta is a generated protocol buffer package.
Package tsdb is a generated protocol buffer package.
It is generated from these files:
internal/meta.proto
@ -13,8 +12,9 @@ It has these top-level messages:
Tag
MeasurementFields
Field
MeasurementFieldSet
*/
package meta
package tsdb
import proto "github.com/gogo/protobuf/proto"
import fmt "fmt"
@ -32,9 +32,8 @@ var _ = math.Inf
const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
type Series struct {
Key *string `protobuf:"bytes,1,req,name=Key" json:"Key,omitempty"`
Tags []*Tag `protobuf:"bytes,2,rep,name=Tags" json:"Tags,omitempty"`
XXX_unrecognized []byte `json:"-"`
Key string `protobuf:"bytes,1,opt,name=Key,proto3" json:"Key,omitempty"`
Tags []*Tag `protobuf:"bytes,2,rep,name=Tags" json:"Tags,omitempty"`
}
func (m *Series) Reset() { *m = Series{} }
@ -43,8 +42,8 @@ func (*Series) ProtoMessage() {}
func (*Series) Descriptor() ([]byte, []int) { return fileDescriptorMeta, []int{0} }
func (m *Series) GetKey() string {
if m != nil && m.Key != nil {
return *m.Key
if m != nil {
return m.Key
}
return ""
}
@ -57,9 +56,8 @@ func (m *Series) GetTags() []*Tag {
}
type Tag struct {
Key *string `protobuf:"bytes,1,req,name=Key" json:"Key,omitempty"`
Value *string `protobuf:"bytes,2,req,name=Value" json:"Value,omitempty"`
XXX_unrecognized []byte `json:"-"`
Key string `protobuf:"bytes,1,opt,name=Key,proto3" json:"Key,omitempty"`
Value string `protobuf:"bytes,2,opt,name=Value,proto3" json:"Value,omitempty"`
}
func (m *Tag) Reset() { *m = Tag{} }
@ -68,22 +66,22 @@ func (*Tag) ProtoMessage() {}
func (*Tag) Descriptor() ([]byte, []int) { return fileDescriptorMeta, []int{1} }
func (m *Tag) GetKey() string {
if m != nil && m.Key != nil {
return *m.Key
if m != nil {
return m.Key
}
return ""
}
func (m *Tag) GetValue() string {
if m != nil && m.Value != nil {
return *m.Value
if m != nil {
return m.Value
}
return ""
}
type MeasurementFields struct {
Fields []*Field `protobuf:"bytes,1,rep,name=Fields" json:"Fields,omitempty"`
XXX_unrecognized []byte `json:"-"`
Name string `protobuf:"bytes,1,opt,name=Name,proto3" json:"Name,omitempty"`
Fields []*Field `protobuf:"bytes,2,rep,name=Fields" json:"Fields,omitempty"`
}
func (m *MeasurementFields) Reset() { *m = MeasurementFields{} }
@ -91,6 +89,13 @@ func (m *MeasurementFields) String() string { return proto.CompactTex
func (*MeasurementFields) ProtoMessage() {}
func (*MeasurementFields) Descriptor() ([]byte, []int) { return fileDescriptorMeta, []int{2} }
func (m *MeasurementFields) GetName() string {
if m != nil {
return m.Name
}
return ""
}
func (m *MeasurementFields) GetFields() []*Field {
if m != nil {
return m.Fields
@ -99,10 +104,9 @@ func (m *MeasurementFields) GetFields() []*Field {
}
type Field struct {
ID *int32 `protobuf:"varint,1,req,name=ID" json:"ID,omitempty"`
Name *string `protobuf:"bytes,2,req,name=Name" json:"Name,omitempty"`
Type *int32 `protobuf:"varint,3,req,name=Type" json:"Type,omitempty"`
XXX_unrecognized []byte `json:"-"`
ID int32 `protobuf:"varint,1,opt,name=ID,proto3" json:"ID,omitempty"`
Name string `protobuf:"bytes,2,opt,name=Name,proto3" json:"Name,omitempty"`
Type int32 `protobuf:"varint,3,opt,name=Type,proto3" json:"Type,omitempty"`
}
func (m *Field) Reset() { *m = Field{} }
@ -111,47 +115,68 @@ func (*Field) ProtoMessage() {}
func (*Field) Descriptor() ([]byte, []int) { return fileDescriptorMeta, []int{3} }
func (m *Field) GetID() int32 {
if m != nil && m.ID != nil {
return *m.ID
if m != nil {
return m.ID
}
return 0
}
func (m *Field) GetName() string {
if m != nil && m.Name != nil {
return *m.Name
if m != nil {
return m.Name
}
return ""
}
func (m *Field) GetType() int32 {
if m != nil && m.Type != nil {
return *m.Type
if m != nil {
return m.Type
}
return 0
}
type MeasurementFieldSet struct {
Measurements []*MeasurementFields `protobuf:"bytes,1,rep,name=Measurements" json:"Measurements,omitempty"`
}
func (m *MeasurementFieldSet) Reset() { *m = MeasurementFieldSet{} }
func (m *MeasurementFieldSet) String() string { return proto.CompactTextString(m) }
func (*MeasurementFieldSet) ProtoMessage() {}
func (*MeasurementFieldSet) Descriptor() ([]byte, []int) { return fileDescriptorMeta, []int{4} }
func (m *MeasurementFieldSet) GetMeasurements() []*MeasurementFields {
if m != nil {
return m.Measurements
}
return nil
}
func init() {
proto.RegisterType((*Series)(nil), "meta.Series")
proto.RegisterType((*Tag)(nil), "meta.Tag")
proto.RegisterType((*MeasurementFields)(nil), "meta.MeasurementFields")
proto.RegisterType((*Field)(nil), "meta.Field")
proto.RegisterType((*Series)(nil), "tsdb.Series")
proto.RegisterType((*Tag)(nil), "tsdb.Tag")
proto.RegisterType((*MeasurementFields)(nil), "tsdb.MeasurementFields")
proto.RegisterType((*Field)(nil), "tsdb.Field")
proto.RegisterType((*MeasurementFieldSet)(nil), "tsdb.MeasurementFieldSet")
}
func init() { proto.RegisterFile("internal/meta.proto", fileDescriptorMeta) }
var fileDescriptorMeta = []byte{
// 180 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x54, 0x8c, 0xbd, 0xca, 0xc2, 0x30,
0x14, 0x40, 0x69, 0xd2, 0x16, 0x7a, 0xfb, 0x7d, 0x83, 0x71, 0x30, 0xe0, 0x52, 0x33, 0x75, 0x6a,
0xc5, 0x67, 0x10, 0x41, 0x44, 0x17, 0x83, 0xfb, 0x05, 0x2f, 0xa5, 0xd0, 0x3f, 0x92, 0x74, 0xe8,
0xdb, 0x4b, 0x52, 0x17, 0xb7, 0x73, 0xee, 0xcf, 0x81, 0x6d, 0x3b, 0x38, 0x32, 0x03, 0x76, 0x75,
0x4f, 0x0e, 0xab, 0xc9, 0x8c, 0x6e, 0x14, 0xb1, 0x67, 0x55, 0x41, 0xfa, 0x24, 0xd3, 0x92, 0x15,
0x39, 0xf0, 0x1b, 0x2d, 0x32, 0x2a, 0x58, 0x99, 0x89, 0x1d, 0xc4, 0x1a, 0x1b, 0x2b, 0x59, 0xc1,
0xcb, 0xfc, 0x94, 0x55, 0xe1, 0x4f, 0x63, 0xa3, 0x0e, 0xc0, 0x35, 0x36, 0xbf, 0xc7, 0xff, 0x90,
0xbc, 0xb0, 0x9b, 0x49, 0x32, 0xaf, 0xea, 0x08, 0x9b, 0x3b, 0xa1, 0x9d, 0x0d, 0xf5, 0x34, 0xb8,
0x4b, 0x4b, 0xdd, 0xdb, 0x8a, 0x3d, 0xa4, 0x2b, 0xc9, 0x28, 0x24, 0xf3, 0x35, 0x19, 0x66, 0xaa,
0x86, 0x24, 0x80, 0x00, 0x60, 0xd7, 0x73, 0xa8, 0x26, 0xe2, 0x0f, 0xe2, 0x07, 0xf6, 0xdf, 0xa8,
0x37, 0xbd, 0x4c, 0x24, 0xb9, 0xdf, 0x7d, 0x02, 0x00, 0x00, 0xff, 0xff, 0x04, 0x3d, 0x58, 0x4a,
0xd1, 0x00, 0x00, 0x00,
// 242 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x90, 0xbd, 0x4b, 0x03, 0x41,
0x10, 0xc5, 0xb9, 0xbd, 0x0f, 0xc8, 0x44, 0x44, 0x27, 0x82, 0xdb, 0x08, 0x61, 0x6d, 0xd2, 0x78,
0x82, 0x56, 0x62, 0x61, 0x13, 0x84, 0xe0, 0x47, 0xb1, 0x39, 0xec, 0x27, 0x64, 0x38, 0x0e, 0xee,
0x2e, 0x61, 0x77, 0x53, 0xe4, 0xbf, 0x97, 0xcc, 0x1e, 0x12, 0x35, 0xdd, 0xdb, 0x37, 0xf3, 0xe6,
0xfd, 0x58, 0x98, 0x34, 0x7d, 0x60, 0xd7, 0x53, 0x7b, 0xdf, 0x71, 0xa0, 0x72, 0xeb, 0x36, 0x61,
0x83, 0x59, 0xf0, 0xeb, 0x95, 0x79, 0x82, 0x62, 0xc9, 0xae, 0x61, 0x8f, 0x17, 0x90, 0xbe, 0xf1,
0x5e, 0x27, 0xd3, 0x64, 0x36, 0xb2, 0x07, 0x89, 0x37, 0x90, 0x55, 0x54, 0x7b, 0xad, 0xa6, 0xe9,
0x6c, 0xfc, 0x30, 0x2a, 0x0f, 0x81, 0xb2, 0xa2, 0xda, 0x8a, 0x6d, 0xee, 0x20, 0xad, 0xa8, 0x3e,
0x91, 0xbb, 0x82, 0xfc, 0x8b, 0xda, 0x1d, 0x6b, 0x25, 0x5e, 0x7c, 0x98, 0x77, 0xb8, 0xfc, 0x60,
0xf2, 0x3b, 0xc7, 0x1d, 0xf7, 0xe1, 0xb5, 0xe1, 0x76, 0xed, 0x11, 0x21, 0xfb, 0xa4, 0x8e, 0x87,
0xb4, 0x68, 0xbc, 0x85, 0x22, 0x4e, 0x87, 0xe2, 0x71, 0x2c, 0x16, 0xcf, 0x0e, 0x23, 0xf3, 0x02,
0xb9, 0x28, 0x3c, 0x07, 0xb5, 0x98, 0x4b, 0x3e, 0xb7, 0x6a, 0x31, 0xff, 0xb9, 0xa8, 0x8e, 0x2e,
0x22, 0x64, 0xd5, 0x7e, 0xcb, 0x3a, 0x95, 0x2d, 0xd1, 0xc6, 0xc2, 0xe4, 0x2f, 0xce, 0x92, 0x03,
0x3e, 0xc3, 0xd9, 0x91, 0xed, 0x75, 0x22, 0x08, 0xd7, 0x11, 0xe1, 0x1f, 0xbf, 0xfd, 0xb5, 0xbc,
0x2a, 0xe4, 0x67, 0x1f, 0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0xac, 0xee, 0x08, 0x52, 0x70, 0x01,
0x00, 0x00,
}

View File

@ -1,4 +1,6 @@
package meta;
syntax = "proto3";
package tsdb;
//========================================================================
//
@ -7,21 +9,25 @@ package meta;
//========================================================================
message Series {
required string Key = 1;
string Key = 1;
repeated Tag Tags = 2;
}
message Tag {
required string Key = 1;
required string Value = 2;
string Key = 1;
string Value = 2;
}
message MeasurementFields {
repeated Field Fields = 1;
string Name = 1;
repeated Field Fields = 2;
}
message Field {
required int32 ID = 1;
required string Name = 2;
required int32 Type = 3;
}
string Name = 1;
int32 Type = 2;
}
message MeasurementFieldSet {
repeated MeasurementFields Measurements = 1;
}

View File

@ -6,6 +6,8 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"regexp"
"runtime"
@ -18,6 +20,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/estimator"
"github.com/influxdata/influxdb/pkg/file"
"github.com/influxdata/influxdb/pkg/limiter"
"github.com/influxdata/influxdb/query"
internal "github.com/influxdata/influxdb/tsdb/internal"
@ -63,6 +66,13 @@ var (
// ErrShardDisabled is returned when a the shard is not available for
// queries or writes.
ErrShardDisabled = errors.New("shard is disabled")
// ErrUnknownFieldsFormat is returned when the fields index file is not identifiable by
// the file's magic number.
ErrUnknownFieldsFormat = errors.New("unknown field index format")
// fieldsIndexMagicNumber is the file magic number for the fields index file.
fieldsIndexMagicNumber = []byte{0, 6, 1, 3}
)
var (
@ -684,13 +694,17 @@ func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) error
// add fields
for _, f := range fieldsToCreate {
mf := engine.MeasurementFields(f.Measurement)
if err := mf.CreateFieldIfNotExists([]byte(f.Field.Name), f.Field.Type, false); err != nil {
if err := mf.CreateFieldIfNotExists([]byte(f.Field.Name), f.Field.Type); err != nil {
return err
}
s.index.SetFieldName(f.Measurement, f.Field.Name)
}
if len(fieldsToCreate) > 0 {
return engine.MeasurementFieldSet().Save()
}
return nil
}
@ -776,6 +790,16 @@ func (s *Shard) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr) ([]
return engine.MeasurementSeriesKeysByExpr(name, expr)
}
// TagKeyHasAuthorizedSeries determines if there exists an authorised series on
// the provided measurement with the provided tag key.
//
// If the shard's engine is unavailable (e.g. closed or disabled — see the
// error variables above) the error is swallowed and false is returned,
// treating the shard as containing no authorized series.
func (s *Shard) TagKeyHasAuthorizedSeries(auth query.Authorizer, name []byte, key string) bool {
	engine, err := s.engine()
	if err != nil {
		return false
	}
	return engine.TagKeyHasAuthorizedSeries(auth, name, key)
}
// MeasurementTagKeysByExpr returns all the tag keys for the provided expression.
func (s *Shard) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) {
engine, err := s.engine()
@ -1392,41 +1416,10 @@ func (m *MeasurementFields) FieldKeys() []string {
return a
}
// MarshalBinary encodes the object to a binary format.
//
// The snapshot is taken under the read lock so concurrent field creation
// cannot corrupt the serialized form.
func (m *MeasurementFields) MarshalBinary() ([]byte, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	var pb internal.MeasurementFields
	for _, f := range m.fields {
		// Copies are taken because the protobuf struct stores pointers and the
		// loop variables are reused across iterations.
		id := int32(f.ID)
		name := f.Name
		t := int32(f.Type)
		pb.Fields = append(pb.Fields, &internal.Field{ID: &id, Name: &name, Type: &t})
	}
	return proto.Marshal(&pb)
}
// UnmarshalBinary decodes the object from a binary format.
//
// The existing field map is replaced wholesale under the write lock.
func (m *MeasurementFields) UnmarshalBinary(buf []byte) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	var pb internal.MeasurementFields
	if err := proto.Unmarshal(buf, &pb); err != nil {
		return err
	}
	m.fields = make(map[string]*Field, len(pb.Fields))
	for _, f := range pb.Fields {
		m.fields[f.GetName()] = &Field{ID: uint8(f.GetID()), Name: f.GetName(), Type: influxql.DataType(f.GetType())}
	}
	return nil
}
// CreateFieldIfNotExists creates a new field with an autoincrementing ID.
// Returns an error if 255 fields have already been created on the measurement or
// the fields already exists with a different type.
func (m *MeasurementFields) CreateFieldIfNotExists(name []byte, typ influxql.DataType, limitCount bool) error {
func (m *MeasurementFields) CreateFieldIfNotExists(name []byte, typ influxql.DataType) error {
m.mu.RLock()
// Ignore if the field already exists.
@ -1507,6 +1500,16 @@ func (m *MeasurementFields) FieldSet() map[string]influxql.DataType {
return fields
}
// ForEachField invokes fn once per field with the field's name and data type.
// Iteration stops early when fn returns false. The read lock is held for the
// duration of the call, so fn must not call back into methods that take the
// same lock. Iteration order over the map is unspecified.
func (m *MeasurementFields) ForEachField(fn func(name string, typ influxql.DataType) bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	for name, f := range m.fields {
		if !fn(name, f.Type) {
			return
		}
	}
}
// Clone returns copy of the MeasurementFields
func (m *MeasurementFields) Clone() *MeasurementFields {
m.mu.RLock()
@ -1525,13 +1528,21 @@ func (m *MeasurementFields) Clone() *MeasurementFields {
type MeasurementFieldSet struct {
mu sync.RWMutex
fields map[string]*MeasurementFields
// path is the location to persist field sets
path string
}
// NewMeasurementFieldSet returns a new instance of MeasurementFieldSet.
func NewMeasurementFieldSet() *MeasurementFieldSet {
return &MeasurementFieldSet{
func NewMeasurementFieldSet(path string) (*MeasurementFieldSet, error) {
fs := &MeasurementFieldSet{
fields: make(map[string]*MeasurementFields),
path: path,
}
if err := fs.load(); err != nil {
return nil, err
}
return fs, nil
}
// Fields returns fields for a measurement by name.
@ -1582,6 +1593,119 @@ func (fs *MeasurementFieldSet) DeleteWithLock(name string, fn func() error) erro
return nil
}
// IsEmpty reports whether the field set contains no measurements.
func (fs *MeasurementFieldSet) IsEmpty() bool {
	fs.mu.RLock()
	defer fs.mu.RUnlock()
	return len(fs.fields) == 0
}
// Save persists the field set to disk, serializing writers via the write lock.
func (fs *MeasurementFieldSet) Save() error {
	fs.mu.Lock()
	defer fs.mu.Unlock()
	return fs.saveNoLock()
}
// saveNoLock persists the field set to fs.path. Callers must hold fs.mu.
//
// The index is written to a temporary file and atomically renamed into place
// only after a successful Sync, so readers never observe a partial file. When
// no fields remain, the index file is removed instead.
//
// Fixes over the previous version: the protobuf message variable no longer
// shadows the receiver fs, and the file descriptor is closed on every error
// path via defer (it previously leaked until GC on early returns).
func (fs *MeasurementFieldSet) saveNoLock() error {
	// No fields left, remove the fields index file.
	if len(fs.fields) == 0 {
		return os.RemoveAll(fs.path)
	}

	// Write the new index to a temp file and rename when it's sync'd.
	path := fs.path + ".tmp"
	fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666)
	if err != nil {
		return err
	}
	// Deferred LIFO: fd.Close runs first, then the temp file is removed.
	// After a successful rename both are harmless no-ops (Close errors on an
	// already-closed fd are intentionally ignored).
	defer os.RemoveAll(path)
	defer fd.Close()

	if _, err := fd.Write(fieldsIndexMagicNumber); err != nil {
		return err
	}

	pb := internal.MeasurementFieldSet{
		Measurements: make([]*internal.MeasurementFields, 0, len(fs.fields)),
	}
	for name, mf := range fs.fields {
		// imf (not fs) so the receiver is not shadowed inside the loop.
		imf := &internal.MeasurementFields{
			Name:   name,
			Fields: make([]*internal.Field, 0, mf.FieldN()),
		}
		mf.ForEachField(func(field string, typ influxql.DataType) bool {
			imf.Fields = append(imf.Fields, &internal.Field{Name: field, Type: int32(typ)})
			return true
		})
		pb.Measurements = append(pb.Measurements, imf)
	}

	b, err := proto.Marshal(&pb)
	if err != nil {
		return err
	}

	if _, err := fd.Write(b); err != nil {
		return err
	}

	if err = fd.Sync(); err != nil {
		return err
	}

	// Close file handle before renaming to support Windows.
	if err = fd.Close(); err != nil {
		return err
	}

	return file.RenameFile(path, fs.path)
}
// load reads the persisted field set from fs.path into memory. A missing
// index file is not an error: the field set simply starts empty.
//
// Fix: the magic number is now read with io.ReadFull instead of a bare
// fd.Read, which may legally return fewer than 4 bytes without an error. A
// file shorter than the magic (EOF/ErrUnexpectedEOF) is classified as
// ErrUnknownFieldsFormat, preserving the behavior expected for truncated
// files (see TestMeasurementFieldSet_InvalidFormat).
func (fs *MeasurementFieldSet) load() error {
	fs.mu.Lock()
	defer fs.mu.Unlock()

	fd, err := os.Open(fs.path)
	if os.IsNotExist(err) {
		return nil
	} else if err != nil {
		return err
	}
	defer fd.Close()

	var magic [4]byte
	if _, err := io.ReadFull(fd, magic[:]); err == io.EOF || err == io.ErrUnexpectedEOF {
		// File too short to contain a magic number: unknown format.
		return ErrUnknownFieldsFormat
	} else if err != nil {
		return err
	}
	if !bytes.Equal(magic[:], fieldsIndexMagicNumber) {
		return ErrUnknownFieldsFormat
	}

	var pb internal.MeasurementFieldSet
	b, err := ioutil.ReadAll(fd)
	if err != nil {
		return err
	}

	if err := proto.Unmarshal(b, &pb); err != nil {
		return err
	}

	fs.fields = make(map[string]*MeasurementFields, len(pb.GetMeasurements()))
	for _, measurement := range pb.GetMeasurements() {
		set := &MeasurementFields{
			fields: make(map[string]*Field, len(measurement.GetFields())),
		}
		for _, field := range measurement.GetFields() {
			set.fields[field.GetName()] = &Field{Name: field.GetName(), Type: influxql.DataType(field.GetType())}
		}
		fs.fields[measurement.GetName()] = set
	}
	return nil
}
// Field represents a series field.
type Field struct {
ID uint8 `json:"id,omitempty"`

View File

@ -1402,6 +1402,102 @@ _reserved,region=uswest value="foo" 0
}
}
// TestMeasurementFieldSet_SaveLoad verifies that a field set written via Save
// round-trips through disk: a fresh MeasurementFieldSet opened on the same
// path must contain the field with its original type.
func TestMeasurementFieldSet_SaveLoad(t *testing.T) {
	dir, cleanup := MustTempDir()
	defer cleanup()
	path := filepath.Join(dir, "fields.idx")

	mf, err := tsdb.NewMeasurementFieldSet(path)
	if err != nil {
		t.Fatalf("NewMeasurementFieldSet error: %v", err)
	}
	fields := mf.CreateFieldsIfNotExists([]byte("cpu"))
	if err := fields.CreateFieldIfNotExists([]byte("value"), influxql.Float); err != nil {
		t.Fatalf("create field error: %v", err)
	}

	if err := mf.Save(); err != nil {
		t.Fatalf("save error: %v", err)
	}

	// Re-open from disk to prove the persisted form is complete.
	mf, err = tsdb.NewMeasurementFieldSet(path)
	if err != nil {
		t.Fatalf("NewMeasurementFieldSet error: %v", err)
	}

	fields = mf.Fields("cpu")
	field := fields.Field("value")
	if field == nil {
		t.Fatalf("field is null")
	}

	if got, exp := field.Type, influxql.Float; got != exp {
		t.Fatalf("field type mismatch: got %v, exp %v", got, exp)
	}
}
// TestMeasurementFieldSet_DeleteEmpty verifies that saving a field set after
// its last measurement is deleted removes the on-disk index file entirely.
func TestMeasurementFieldSet_DeleteEmpty(t *testing.T) {
	dir, cleanup := MustTempDir()
	defer cleanup()
	path := filepath.Join(dir, "fields.idx")

	mf, err := tsdb.NewMeasurementFieldSet(path)
	if err != nil {
		t.Fatalf("NewMeasurementFieldSet error: %v", err)
	}
	fields := mf.CreateFieldsIfNotExists([]byte("cpu"))
	if err := fields.CreateFieldIfNotExists([]byte("value"), influxql.Float); err != nil {
		t.Fatalf("create field error: %v", err)
	}

	if err := mf.Save(); err != nil {
		t.Fatalf("save error: %v", err)
	}

	// Re-open from disk to confirm the field persisted before deleting it.
	mf, err = tsdb.NewMeasurementFieldSet(path)
	if err != nil {
		t.Fatalf("NewMeasurementFieldSet error: %v", err)
	}

	fields = mf.Fields("cpu")
	field := fields.Field("value")
	if field == nil {
		t.Fatalf("field is null")
	}

	if got, exp := field.Type, influxql.Float; got != exp {
		t.Fatalf("field type mismatch: got %v, exp %v", got, exp)
	}

	mf.Delete("cpu")

	if err := mf.Save(); err != nil {
		t.Fatalf("save after delete error: %v", err)
	}

	// An empty field set is persisted as the absence of the index file.
	if _, err := os.Stat(path); !os.IsNotExist(err) {
		t.Fatalf("got %v, not exist err", err)
	}
}
// TestMeasurementFieldSet_InvalidFormat verifies that an index file lacking
// the expected magic number (here: a 2-byte file) is rejected with
// ErrUnknownFieldsFormat when the field set is opened.
func TestMeasurementFieldSet_InvalidFormat(t *testing.T) {
	dir, cleanup := MustTempDir()
	defer cleanup()

	path := filepath.Join(dir, "fields.idx")

	if err := ioutil.WriteFile(path, []byte{0, 0}, 0666); err != nil {
		t.Fatalf("error writing fields.index: %v", err)
	}

	_, err := tsdb.NewMeasurementFieldSet(path)
	if err != tsdb.ErrUnknownFieldsFormat {
		t.Fatalf("unexpected error: got %v, exp %v", err, tsdb.ErrUnknownFieldsFormat)
	}
}
// Write benchmarks at increasing series counts; the numeric arguments select
// the cardinality parameters passed to benchmarkWritePoints — see its
// definition for their exact meaning.
func BenchmarkWritePoints_NewSeries_1K(b *testing.B)   { benchmarkWritePoints(b, 38, 3, 3, 1) }
func BenchmarkWritePoints_NewSeries_100K(b *testing.B) { benchmarkWritePoints(b, 32, 5, 5, 1) }
func BenchmarkWritePoints_NewSeries_250K(b *testing.B) { benchmarkWritePoints(b, 80, 5, 5, 1) }
@ -1693,3 +1789,11 @@ func (sh *Shard) MustWritePointsString(s string) {
panic(err)
}
}
func MustTempDir() (string, func()) {
dir, err := ioutil.TempDir("", "shard-test")
if err != nil {
panic(fmt.Sprintf("failed to create temp dir: %v", err))
}
return dir, func() { os.RemoveAll(dir) }
}

View File

@ -1096,7 +1096,7 @@ func (s *Store) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.
var results []TagKeys
for _, name := range names {
// Build keyset over all shards for measurement.
keySet := make(map[string]struct{})
keySet := map[string]struct{}{}
for _, sh := range shards {
shardKeySet, err := sh.MeasurementTagKeysByExpr([]byte(name), nil)
if err != nil {
@ -1105,6 +1105,21 @@ func (s *Store) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.
continue
}
// If no tag value filter is present then all the tag keys can be returned
// If they have authorized series associated with them.
if filterExpr == nil {
for tagKey := range shardKeySet {
if sh.TagKeyHasAuthorizedSeries(auth, []byte(name), tagKey) {
keySet[tagKey] = struct{}{}
}
}
continue
}
// A tag value condition has been supplied. For each tag key filter
// the set of tag values by the condition. Only tag keys with remaining
// tag values will be included in the result set.
// Sort the tag keys.
shardKeys := make([]string, 0, len(shardKeySet))
for k := range shardKeySet {
@ -1112,7 +1127,10 @@ func (s *Store) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.
}
sort.Strings(shardKeys)
// Filter against tag values, skip if no values exist.
// TODO(edd): This is very expensive. We're materialising all unfiltered
// tag values for all required tag keys, only to see if we have any.
// Then we're throwing them all away as we only care about the tag
// keys in the result set.
shardValues, err := sh.MeasurementTagKeyValuesByExpr(auth, []byte(name), shardKeys, filterExpr, true)
if err != nil {
return nil, err