influxdb/tsdb/index.go

3177 lines
82 KiB
Go
Raw Normal View History

2016-09-14 14:15:23 +00:00
package tsdb
import (
2017-11-29 18:20:18 +00:00
"bytes"
"errors"
2016-11-15 16:20:00 +00:00
"fmt"
"io"
2016-11-15 16:20:00 +00:00
"os"
2016-09-14 14:15:23 +00:00
"regexp"
2016-11-15 16:20:00 +00:00
"sort"
2017-11-27 14:52:18 +00:00
"sync"
2016-09-14 14:15:23 +00:00
"github.com/influxdata/influxdb/models"
2017-11-29 18:20:18 +00:00
"github.com/influxdata/influxdb/pkg/bytesutil"
2016-09-21 15:04:37 +00:00
"github.com/influxdata/influxdb/pkg/estimator"
"github.com/influxdata/influxdb/pkg/slices"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxql"
"go.uber.org/zap"
2016-09-14 14:15:23 +00:00
)
2018-08-21 13:32:30 +00:00
// Available index types.
const (
	InmemIndexName = "inmem"
	TSI1IndexName  = "tsi1"
)

// ErrIndexClosing can be returned by an Index method if the index is currently closing.
var ErrIndexClosing = errors.New("index is closing")
2016-09-14 14:15:23 +00:00
// Index abstracts the series/measurement index used by a shard. Known
// implementations are the in-memory index ("inmem") and the TSI1 disk-based
// index ("tsi1").
type Index interface {
	Open() error
	Close() error
	WithLogger(*zap.Logger)

	// Database returns the name of the database the index belongs to.
	Database() string

	MeasurementExists(name []byte) (bool, error)
	MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
	DropMeasurement(name []byte) error
	ForEachMeasurementName(fn func(name []byte) error) error

	InitializeSeries(keys, names [][]byte, tags []models.Tags) error
	CreateSeriesIfNotExists(key, name []byte, tags models.Tags, tracker StatsTracker) error
	CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags, tracker StatsTracker) error
	DropSeries(seriesID uint64, key []byte, cascade bool) error
	DropSeriesList(seriesID []uint64, key [][]byte, cascade bool) error
	DropMeasurementIfSeriesNotExist(name []byte) (bool, error)

	// Used to clean up series in inmem index that were dropped with a shard.
	DropSeriesGlobal(key []byte) error

	MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
	SeriesN() int64
	SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
	SeriesIDSet() *SeriesIDSet

	HasTagKey(name, key []byte) (bool, error)
	HasTagValue(name, key, value []byte) (bool, error)

	MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)

	TagKeyCardinality(name, key []byte) int

	// InfluxQL system iterators
	MeasurementIterator() (MeasurementIterator, error)
	TagKeyIterator(name []byte) (TagKeyIterator, error)
	TagValueIterator(name, key []byte) (TagValueIterator, error)
	MeasurementSeriesIDIterator(name []byte) (SeriesIDIterator, error)
	TagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, error)
	TagValueSeriesIDIterator(name, key, value []byte) (SeriesIDIterator, error)

	// Sets a shared fieldset from the engine.
	FieldSet() *MeasurementFieldSet
	SetFieldSet(fs *MeasurementFieldSet)

	// Size of the index on disk, if applicable.
	DiskSizeBytes() int64

	// Bytes estimates the memory footprint of this Index, in bytes.
	Bytes() int

	// To be removed w/ tsi1.
	SetFieldName(measurement []byte, name string)

	Type() string

	// Returns a unique reference ID to the index instance.
	// For inmem, returns a reference to the backing Index, not ShardIndex.
	UniqueReferenceID() uintptr

	Rebuild()
}
2016-11-15 16:20:00 +00:00
// SeriesElem represents a generic series element.
type SeriesElem interface {
	Name() []byte
	Tags() models.Tags
	Deleted() bool

	// InfluxQL expression associated with series during filtering.
	Expr() influxql.Expr
}

// SeriesIterator represents an iterator over a list of series.
// A nil element signals the end of iteration.
type SeriesIterator interface {
	Close() error
	Next() (SeriesElem, error)
}

// NewSeriesIteratorAdapter returns an adapter for converting series ids to series.
func NewSeriesIteratorAdapter(sfile *SeriesFile, itr SeriesIDIterator) SeriesIterator {
	return &seriesIteratorAdapter{
		sfile: sfile,
		itr:   itr,
	}
}

// seriesIteratorAdapter wraps a SeriesIDIterator, resolving each id to its
// key via the series file and exposing the result as a SeriesElem.
type seriesIteratorAdapter struct {
	sfile *SeriesFile
	itr   SeriesIDIterator
}

// Close closes the underlying id iterator.
func (itr *seriesIteratorAdapter) Close() error { return itr.itr.Close() }

// Next returns the next series element, or nil when the iterator is exhausted.
func (itr *seriesIteratorAdapter) Next() (SeriesElem, error) {
	for {
		elem, err := itr.itr.Next()
		if err != nil {
			return nil, err
		} else if elem.SeriesID == 0 {
			return nil, nil
		}

		// Skip if this key has been tombstoned.
		key := itr.sfile.SeriesKey(elem.SeriesID)
		if len(key) == 0 {
			continue
		}

		name, tags := ParseSeriesKey(key)
		deleted := itr.sfile.IsDeleted(elem.SeriesID)
		return &seriesElemAdapter{
			name:    name,
			tags:    tags,
			deleted: deleted,
			expr:    elem.Expr,
		}, nil
	}
}

// seriesElemAdapter is the concrete SeriesElem produced by seriesIteratorAdapter.
type seriesElemAdapter struct {
	name    []byte
	tags    models.Tags
	deleted bool
	expr    influxql.Expr
}

func (e *seriesElemAdapter) Name() []byte        { return e.name }
func (e *seriesElemAdapter) Tags() models.Tags   { return e.tags }
func (e *seriesElemAdapter) Deleted() bool       { return e.deleted }
func (e *seriesElemAdapter) Expr() influxql.Expr { return e.expr }

// SeriesIDElem represents a single series and optional expression.
type SeriesIDElem struct {
	SeriesID uint64
	Expr     influxql.Expr
}

// SeriesIDElems represents a list of series id elements, sortable by id.
type SeriesIDElems []SeriesIDElem

func (a SeriesIDElems) Len() int           { return len(a) }
func (a SeriesIDElems) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a SeriesIDElems) Less(i, j int) bool { return a[i].SeriesID < a[j].SeriesID }

// SeriesIDIterator represents an iterator over a list of series ids.
// An element with a zero SeriesID signals the end of iteration.
type SeriesIDIterator interface {
	Next() (SeriesIDElem, error)
	Close() error
}

// SeriesKeyIterator represents an iterator over a list of SeriesKeys
type SeriesKeyIterator interface {
	Next() ([]byte, error)
	Close() error
}
2018-07-04 17:53:13 +00:00
// SeriesIDSetIterator represents an iterator that can produce a SeriesIDSet.
type SeriesIDSetIterator interface {
	SeriesIDIterator
	SeriesIDSet() *SeriesIDSet
}

// seriesIDSetIterator iterates over the ids in a SeriesIDSet, optionally
// closing an attached resource when the iterator itself is closed.
type seriesIDSetIterator struct {
	ss     *SeriesIDSet
	itr    SeriesIDSetIterable
	closer io.Closer // optional; may be nil
}

// NewSeriesIDSetIterator returns an iterator over ss, or nil if ss is nil or
// has no backing bitmap.
func NewSeriesIDSetIterator(ss *SeriesIDSet) SeriesIDSetIterator {
	if ss == nil || ss.bitmap == nil {
		return nil
	}
	return &seriesIDSetIterator{ss: ss, itr: ss.Iterator()}
}

// NewSeriesIDSetIteratorWithCloser is like NewSeriesIDSetIterator but also
// closes closer when the returned iterator is closed.
func NewSeriesIDSetIteratorWithCloser(ss *SeriesIDSet, closer io.Closer) SeriesIDSetIterator {
	if ss == nil || ss.bitmap == nil {
		return nil
	}
	return &seriesIDSetIterator{ss: ss, itr: ss.Iterator(), closer: closer}
}
2018-07-04 17:53:13 +00:00
func (itr *seriesIDSetIterator) Next() (SeriesIDElem, error) {
if !itr.itr.HasNext() {
return SeriesIDElem{}, nil
}
return SeriesIDElem{SeriesID: uint64(itr.itr.Next())}, nil
}
func (itr *seriesIDSetIterator) Close() error {
if itr.closer != nil {
return itr.closer.Close()
}
return nil
}
2018-07-04 17:53:13 +00:00
func (itr *seriesIDSetIterator) SeriesIDSet() *SeriesIDSet { return itr.ss }
// SeriesIDSetIterators represents a list of SeriesIDSetIterator.
type SeriesIDSetIterators []SeriesIDSetIterator

// Close closes all iterators, returning the first error encountered.
func (a SeriesIDSetIterators) Close() (err error) {
	for i := range a {
		if e := a[i].Close(); e != nil && err == nil {
			err = e
		}
	}
	return err
}
2018-07-04 17:53:13 +00:00
// NewSeriesIDSetIterators returns a slice of SeriesIDSetIterator if all itrs
// can be type casted. Otherwise returns nil.
func NewSeriesIDSetIterators(itrs []SeriesIDIterator) []SeriesIDSetIterator {
	if len(itrs) == 0 {
		return nil
	}

	out := make([]SeriesIDSetIterator, 0, len(itrs))
	for _, itr := range itrs {
		setItr, ok := itr.(SeriesIDSetIterator)
		if !ok {
			// One iterator cannot produce a set; the optimization is unusable.
			return nil
		}
		out = append(out, setItr)
	}
	return out
}
// ReadAllSeriesIDIterator returns all ids from the iterator.
func ReadAllSeriesIDIterator(itr SeriesIDIterator) ([]uint64, error) {
	if itr == nil {
		return nil, nil
	}

	var ids []uint64
	for {
		elem, err := itr.Next()
		if err != nil {
			return nil, err
		}
		if elem.SeriesID == 0 {
			// Zero id marks the end of iteration.
			return ids, nil
		}
		ids = append(ids, elem.SeriesID)
	}
}
2017-11-15 23:09:25 +00:00
// NewSeriesIDSliceIterator returns a SeriesIDIterator that iterates over a slice.
func NewSeriesIDSliceIterator(ids []uint64) *SeriesIDSliceIterator {
	return &SeriesIDSliceIterator{ids: ids}
}

// SeriesIDSliceIterator iterates over a slice of series ids.
type SeriesIDSliceIterator struct {
	ids []uint64
}

// Next returns the next series id in the slice, or a zero element when exhausted.
func (itr *SeriesIDSliceIterator) Next() (SeriesIDElem, error) {
	if len(itr.ids) == 0 {
		return SeriesIDElem{}, nil
	}
	id := itr.ids[0]
	itr.ids = itr.ids[1:]
	return SeriesIDElem{SeriesID: id}, nil
}

// Close is a no-op; the ids are held in memory.
func (itr *SeriesIDSliceIterator) Close() error { return nil }

// SeriesIDSet returns a set of all remaining ids.
func (itr *SeriesIDSliceIterator) SeriesIDSet() *SeriesIDSet {
	s := NewSeriesIDSet()
	for _, id := range itr.ids {
		s.AddNoLock(id)
	}
	return s
}
2017-11-29 18:20:18 +00:00
// SeriesIDIterators represents a list of SeriesIDIterator.
type SeriesIDIterators []SeriesIDIterator

// Close closes every iterator, returning the first error encountered.
func (a SeriesIDIterators) Close() (err error) {
	for _, itr := range a {
		if e := itr.Close(); e != nil && err == nil {
			err = e
		}
	}
	return err
}

// filterNonNil returns a copy of a with nil iterators removed.
func (a SeriesIDIterators) filterNonNil() []SeriesIDIterator {
	filtered := make([]SeriesIDIterator, 0, len(a))
	for _, itr := range a {
		if itr != nil {
			filtered = append(filtered, itr)
		}
	}
	return filtered
}
2017-11-27 14:52:18 +00:00
// seriesQueryAdapterIterator adapts SeriesIDIterator to an influxql.Iterator.
type seriesQueryAdapterIterator struct {
	once     sync.Once // guards Close
	sfile    *SeriesFile
	itr      SeriesIDIterator
	fieldset *MeasurementFieldSet
	opt      query.IteratorOptions

	point query.FloatPoint // reusable point
}

// NewSeriesQueryAdapterIterator returns a new instance of seriesQueryAdapterIterator.
func NewSeriesQueryAdapterIterator(sfile *SeriesFile, itr SeriesIDIterator, fieldset *MeasurementFieldSet, opt query.IteratorOptions) query.Iterator {
	return &seriesQueryAdapterIterator{
		sfile:    sfile,
		itr:      itr,
		fieldset: fieldset,
		point: query.FloatPoint{
			Aux: make([]interface{}, len(opt.Aux)),
		},
		opt: opt,
	}
}

// Stats returns stats about the points processed.
func (itr *seriesQueryAdapterIterator) Stats() query.IteratorStats { return query.IteratorStats{} }

// Close closes the iterator. Safe to call multiple times.
func (itr *seriesQueryAdapterIterator) Close() error {
	itr.once.Do(func() {
		itr.itr.Close()
	})
	return nil
}
// Next emits the next point in the iterator.
func (itr *seriesQueryAdapterIterator) Next() (*query.FloatPoint, error) {
for {
// Read next series element.
e, err := itr.itr.Next()
if err != nil {
return nil, err
} else if e.SeriesID == 0 {
return nil, nil
}
// Skip if key has been tombstoned.
seriesKey := itr.sfile.SeriesKey(e.SeriesID)
if len(seriesKey) == 0 {
continue
}
2017-11-29 18:20:18 +00:00
// Convert to a key.
name, tags := ParseSeriesKey(seriesKey)
key := string(models.MakeKey(name, tags))
2017-11-29 18:20:18 +00:00
// Write auxiliary fields.
for i, f := range itr.opt.Aux {
switch f.Val {
case "key":
itr.point.Aux[i] = key
}
2017-11-29 18:20:18 +00:00
}
return &itr.point, nil
2017-11-29 18:20:18 +00:00
}
}
// filterUndeletedSeriesIDIterator returns all series which are not deleted.
type filterUndeletedSeriesIDIterator struct {
	sfile *SeriesFile
	itr   SeriesIDIterator
}

// FilterUndeletedSeriesIDIterator returns an iterator which filters all deleted series.
func FilterUndeletedSeriesIDIterator(sfile *SeriesFile, itr SeriesIDIterator) SeriesIDIterator {
	if itr == nil {
		return nil
	}
	return &filterUndeletedSeriesIDIterator{sfile: sfile, itr: itr}
}

// Close closes the underlying iterator.
func (itr *filterUndeletedSeriesIDIterator) Close() error {
	return itr.itr.Close()
}
// Next returns the next series id that has not been deleted.
func (itr *filterUndeletedSeriesIDIterator) Next() (SeriesIDElem, error) {
	for {
		e, err := itr.itr.Next()
		switch {
		case err != nil:
			return SeriesIDElem{}, err
		case e.SeriesID == 0:
			return SeriesIDElem{}, nil // end of iteration
		case itr.sfile.IsDeleted(e.SeriesID):
			continue // tombstoned; skip
		}
		return e, nil
	}
}
// seriesIDExprIterator is an iterator that attaches an associated expression.
type seriesIDExprIterator struct {
	itr  SeriesIDIterator
	expr influxql.Expr // attached to every element returned by Next
}

// newSeriesIDExprIterator returns a new instance of seriesIDExprIterator.
func newSeriesIDExprIterator(itr SeriesIDIterator, expr influxql.Expr) SeriesIDIterator {
	if itr == nil {
		return nil
	}

	return &seriesIDExprIterator{
		itr:  itr,
		expr: expr,
	}
}

// Close closes the underlying iterator.
func (itr *seriesIDExprIterator) Close() error {
	return itr.itr.Close()
}
// Next returns the next element in the iterator with the iterator's
// expression attached.
func (itr *seriesIDExprIterator) Next() (SeriesIDElem, error) {
	elem, err := itr.itr.Next()
	if err != nil || elem.SeriesID == 0 {
		// Error or end of iteration; err is nil in the EOF case.
		return SeriesIDElem{}, err
	}
	elem.Expr = itr.expr
	return elem, nil
}
// MergeSeriesIDIterators returns an iterator that merges a set of iterators.
// Iterators that are first in the list take precedence and a deletion by those
// early iterators will invalidate elements by later iterators.
func MergeSeriesIDIterators(itrs ...SeriesIDIterator) SeriesIDIterator {
	if n := len(itrs); n == 0 {
		return nil
	} else if n == 1 {
		return itrs[0]
	}
	itrs = SeriesIDIterators(itrs).filterNonNil()

	// Merge as series id sets, if available.
	if a := NewSeriesIDSetIterators(itrs); a != nil {
		sets := make([]*SeriesIDSet, len(a))
		for i := range a {
			sets[i] = a[i].SeriesIDSet()
		}

		ss := NewSeriesIDSet()
		ss.Merge(sets...)
		// Attach underlying iterators as the closer
		return NewSeriesIDSetIteratorWithCloser(ss, SeriesIDSetIterators(a))
	}

	// Fall back to an element-by-element merge.
	return &seriesIDMergeIterator{
		buf:  make([]SeriesIDElem, len(itrs)),
		itrs: itrs,
	}
}

// seriesIDMergeIterator is an iterator that merges multiple iterators together.
type seriesIDMergeIterator struct {
	buf  []SeriesIDElem // per-iterator lookahead; zero SeriesID means empty
	itrs []SeriesIDIterator
}
func (itr *seriesIDMergeIterator) Close() error {
2017-12-02 23:52:34 +00:00
SeriesIDIterators(itr.itrs).Close()
2017-11-29 18:20:18 +00:00
return nil
}
// Next returns the element with the next lowest id across the iterators.
func (itr *seriesIDMergeIterator) Next() (SeriesIDElem, error) {
	// Find next lowest id amongst the buffers.
	var elem SeriesIDElem
	for i := range itr.buf {
		buf := &itr.buf[i]

		// Fill buffer.
		if buf.SeriesID == 0 {
			next, err := itr.itrs[i].Next()
			if err != nil {
				// Propagate the error. Previously it was silently dropped
				// (returning a nil error), making a failing iterator
				// indistinguishable from EOF.
				return SeriesIDElem{}, err
			} else if next.SeriesID == 0 {
				continue // this input is exhausted
			}
			itr.buf[i] = next
		}

		// Track the lowest buffered id seen so far.
		if elem.SeriesID == 0 || buf.SeriesID < elem.SeriesID {
			elem = *buf
		}
	}

	// Return EOF if no elements remaining.
	if elem.SeriesID == 0 {
		return SeriesIDElem{}, nil
	}

	// Clear matching buffers so duplicates across iterators are emitted once.
	for i := range itr.buf {
		if itr.buf[i].SeriesID == elem.SeriesID {
			itr.buf[i].SeriesID = 0
		}
	}
	return elem, nil
}
// IntersectSeriesIDIterators returns an iterator that only returns series which
// occur in both iterators. If both series have associated expressions then
// they are combined together.
func IntersectSeriesIDIterators(itr0, itr1 SeriesIDIterator) SeriesIDIterator {
	if itr0 == nil || itr1 == nil {
		// Intersection with nothing is empty; close whichever side exists.
		if itr0 != nil {
			itr0.Close()
		}
		if itr1 != nil {
			itr1.Close()
		}
		return nil
	}

	// Create series id set, if available.
	if a := NewSeriesIDSetIterators([]SeriesIDIterator{itr0, itr1}); a != nil {
		return NewSeriesIDSetIteratorWithCloser(a[0].SeriesIDSet().And(a[1].SeriesIDSet()), SeriesIDSetIterators(a))
	}

	return &seriesIDIntersectIterator{itrs: [2]SeriesIDIterator{itr0, itr1}}
}

// seriesIDIntersectIterator is an iterator that intersects two iterators.
type seriesIDIntersectIterator struct {
	buf  [2]SeriesIDElem // lookahead per input; zero SeriesID means empty
	itrs [2]SeriesIDIterator
}

// Close closes both underlying iterators, returning the first error.
func (itr *seriesIDIntersectIterator) Close() (err error) {
	if e := itr.itrs[0].Close(); e != nil && err == nil {
		err = e
	}
	if e := itr.itrs[1].Close(); e != nil && err == nil {
		err = e
	}
	return err
}
// Next returns the next element which occurs in both iterators.
func (itr *seriesIDIntersectIterator) Next() (_ SeriesIDElem, err error) {
	for {
		// Fill buffers.
		if itr.buf[0].SeriesID == 0 {
			if itr.buf[0], err = itr.itrs[0].Next(); err != nil {
				return SeriesIDElem{}, err
			}
		}
		if itr.buf[1].SeriesID == 0 {
			if itr.buf[1], err = itr.itrs[1].Next(); err != nil {
				return SeriesIDElem{}, err
			}
		}

		// Exit if either buffer is still empty; nothing more can intersect.
		if itr.buf[0].SeriesID == 0 || itr.buf[1].SeriesID == 0 {
			return SeriesIDElem{}, nil
		}

		// Skip if both series are not equal: advance the side with the lower id.
		if a, b := itr.buf[0].SeriesID, itr.buf[1].SeriesID; a < b {
			itr.buf[0].SeriesID = 0
			continue
		} else if a > b {
			itr.buf[1].SeriesID = 0
			continue
		}

		// Merge series together if equal.
		elem := itr.buf[0]

		// Attach expression: AND the two when both exist, otherwise keep the non-nil one.
		expr0 := itr.buf[0].Expr
		expr1 := itr.buf[1].Expr
		if expr0 == nil {
			elem.Expr = expr1
		} else if expr1 == nil {
			elem.Expr = expr0
		} else {
			elem.Expr = influxql.Reduce(&influxql.BinaryExpr{
				Op:  influxql.AND,
				LHS: expr0,
				RHS: expr1,
			}, nil)
		}

		itr.buf[0].SeriesID, itr.buf[1].SeriesID = 0, 0
		return elem, nil
	}
}
// UnionSeriesIDIterators returns an iterator that returns series from both
// iterators. If both series have associated expressions then they are
// combined together.
func UnionSeriesIDIterators(itr0, itr1 SeriesIDIterator) SeriesIDIterator {
	// Return other iterator if either one is nil.
	if itr0 == nil {
		return itr1
	} else if itr1 == nil {
		return itr0
	}

	// Create series id set, if available.
	if a := NewSeriesIDSetIterators([]SeriesIDIterator{itr0, itr1}); a != nil {
		ss := NewSeriesIDSet()
		ss.Merge(a[0].SeriesIDSet(), a[1].SeriesIDSet())
		return NewSeriesIDSetIteratorWithCloser(ss, SeriesIDSetIterators(a))
	}

	return &seriesIDUnionIterator{itrs: [2]SeriesIDIterator{itr0, itr1}}
}

// seriesIDUnionIterator is an iterator that unions two iterators together.
type seriesIDUnionIterator struct {
	buf  [2]SeriesIDElem // lookahead per input; zero SeriesID means empty
	itrs [2]SeriesIDIterator
}

// Close closes both underlying iterators, returning the first error.
func (itr *seriesIDUnionIterator) Close() (err error) {
	if e := itr.itrs[0].Close(); e != nil && err == nil {
		err = e
	}
	if e := itr.itrs[1].Close(); e != nil && err == nil {
		err = e
	}
	return err
}

// Next returns the next element from either iterator. Elements present in
// both inputs are emitted once, with their expressions OR'd together.
func (itr *seriesIDUnionIterator) Next() (_ SeriesIDElem, err error) {
	// Fill buffers.
	if itr.buf[0].SeriesID == 0 {
		if itr.buf[0], err = itr.itrs[0].Next(); err != nil {
			return SeriesIDElem{}, err
		}
	}
	if itr.buf[1].SeriesID == 0 {
		if itr.buf[1], err = itr.itrs[1].Next(); err != nil {
			return SeriesIDElem{}, err
		}
	}

	// Return non-zero or lesser series.
	if a, b := itr.buf[0].SeriesID, itr.buf[1].SeriesID; a == 0 && b == 0 {
		return SeriesIDElem{}, nil
	} else if b == 0 || (a != 0 && a < b) {
		elem := itr.buf[0]
		itr.buf[0].SeriesID = 0
		return elem, nil
	} else if a == 0 || (b != 0 && a > b) {
		elem := itr.buf[1]
		itr.buf[1].SeriesID = 0
		return elem, nil
	}

	// Both ids are equal; emit once.
	elem := itr.buf[0]

	// Attach expression: OR when both exist, otherwise none.
	expr0 := itr.buf[0].Expr
	expr1 := itr.buf[1].Expr
	if expr0 != nil && expr1 != nil {
		elem.Expr = influxql.Reduce(&influxql.BinaryExpr{
			Op:  influxql.OR,
			LHS: expr0,
			RHS: expr1,
		}, nil)
	} else {
		elem.Expr = nil
	}

	itr.buf[0].SeriesID, itr.buf[1].SeriesID = 0, 0
	return elem, nil
}
// DifferenceSeriesIDIterators returns an iterator that only returns series which
// occur in the first iterator but not the second iterator.
func DifferenceSeriesIDIterators(itr0, itr1 SeriesIDIterator) SeriesIDIterator {
	if itr0 == nil && itr1 == nil {
		return nil
	} else if itr1 == nil {
		// Nothing to subtract.
		return itr0
	} else if itr0 == nil {
		// Empty minus anything is empty.
		itr1.Close()
		return nil
	}

	// Create series id set, if available.
	if a := NewSeriesIDSetIterators([]SeriesIDIterator{itr0, itr1}); a != nil {
		return NewSeriesIDSetIteratorWithCloser(a[0].SeriesIDSet().AndNot(a[1].SeriesIDSet()), SeriesIDSetIterators(a))
	}

	return &seriesIDDifferenceIterator{itrs: [2]SeriesIDIterator{itr0, itr1}}
}

// seriesIDDifferenceIterator is an iterator that computes the set difference
// of two iterators.
type seriesIDDifferenceIterator struct {
	buf  [2]SeriesIDElem // lookahead per input; zero SeriesID means empty
	itrs [2]SeriesIDIterator
}

// Close closes both underlying iterators, returning the first error.
func (itr *seriesIDDifferenceIterator) Close() (err error) {
	if e := itr.itrs[0].Close(); e != nil && err == nil {
		err = e
	}
	if e := itr.itrs[1].Close(); e != nil && err == nil {
		err = e
	}
	return err
}

// Next returns the next element which occurs only in the first iterator.
func (itr *seriesIDDifferenceIterator) Next() (_ SeriesIDElem, err error) {
	for {
		// Fill buffers.
		if itr.buf[0].SeriesID == 0 {
			if itr.buf[0], err = itr.itrs[0].Next(); err != nil {
				return SeriesIDElem{}, err
			}
		}
		if itr.buf[1].SeriesID == 0 {
			if itr.buf[1], err = itr.itrs[1].Next(); err != nil {
				return SeriesIDElem{}, err
			}
		}

		// Exit if first buffer is still empty.
		if itr.buf[0].SeriesID == 0 {
			return SeriesIDElem{}, nil
		} else if itr.buf[1].SeriesID == 0 {
			// Second side exhausted; everything left on the first side passes.
			elem := itr.buf[0]
			itr.buf[0].SeriesID = 0
			return elem, nil
		}

		// Return first series if it's less.
		// If second series is less then skip it.
		// If both series are equal then skip both.
		if a, b := itr.buf[0].SeriesID, itr.buf[1].SeriesID; a < b {
			elem := itr.buf[0]
			itr.buf[0].SeriesID = 0
			return elem, nil
		} else if a > b {
			itr.buf[1].SeriesID = 0
			continue
		} else {
			itr.buf[0].SeriesID, itr.buf[1].SeriesID = 0, 0
			continue
		}
	}
}
// seriesPointIterator adapts SeriesIterator to an influxql.Iterator.
type seriesPointIterator struct {
	once     sync.Once // guards Close
	indexSet IndexSet
	mitr     MeasurementIterator
	keys     [][]byte // remaining series keys for the current measurement, sorted

	opt   query.IteratorOptions
	point query.FloatPoint // reusable point
}

// NewSeriesPointIterator returns a new instance of seriesPointIterator.
func NewSeriesPointIterator(indexSet IndexSet, opt query.IteratorOptions) (_ query.Iterator, err error) {
	// Only equality operators are allowed.
	// Note: err is the named return; the walk closure assigns directly to it.
	influxql.WalkFunc(opt.Condition, func(n influxql.Node) {
		switch n := n.(type) {
		case *influxql.BinaryExpr:
			switch n.Op {
			case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX,
				influxql.OR, influxql.AND:
			default:
				err = errors.New("invalid tag comparison operator")
			}
		}
	})
	if err != nil {
		return nil, err
	}

	mitr, err := indexSet.MeasurementIterator()
	if err != nil {
		return nil, err
	}

	return &seriesPointIterator{
		indexSet: indexSet,
		mitr:     mitr,
		point: query.FloatPoint{
			Aux: make([]interface{}, len(opt.Aux)),
		},
		opt: opt,
	}, nil
}

// Stats returns stats about the points processed.
func (itr *seriesPointIterator) Stats() query.IteratorStats { return query.IteratorStats{} }

// Close closes the iterator. Safe to call multiple times.
func (itr *seriesPointIterator) Close() (err error) {
	itr.once.Do(func() {
		if itr.mitr != nil {
			err = itr.mitr.Close()
		}
	})
	return err
}
// Next emits the next point in the iterator.
func (itr *seriesPointIterator) Next() (*query.FloatPoint, error) {
	for {
		// Read series keys for next measurement if no more keys remaining.
		// Exit if there are no measurements remaining.
		if len(itr.keys) == 0 {
			m, err := itr.mitr.Next()
			if err != nil {
				return nil, err
			} else if m == nil {
				return nil, nil
			}

			if err := itr.readSeriesKeys(m); err != nil {
				return nil, err
			}
			continue
		}

		// Consume the next series key for the current measurement.
		name, tags := ParseSeriesKey(itr.keys[0])
		itr.keys = itr.keys[1:]

		// TODO(edd): It seems to me like this authorisation check should be
		// further down in the index. At this point we're going to be filtering
		// series that have already been materialised in the LogFiles and
		// IndexFiles.
		if itr.opt.Authorizer != nil && !itr.opt.Authorizer.AuthorizeSeriesRead(itr.indexSet.Database(), name, tags) {
			continue
		}

		// Convert to a key.
		key := string(models.MakeKey(name, tags))

		// Write auxiliary fields.
		for i, f := range itr.opt.Aux {
			switch f.Val {
			case "key":
				itr.point.Aux[i] = key
			}
		}

		return &itr.point, nil
	}
}
2017-12-05 17:49:58 +00:00
// readSeriesKeys loads all series keys for the given measurement that match
// the iterator's condition into itr.keys, sorted ascending.
func (itr *seriesPointIterator) readSeriesKeys(name []byte) error {
	sitr, err := itr.indexSet.MeasurementSeriesByExprIterator(name, itr.opt.Condition)
	if err != nil {
		return err
	} else if sitr == nil {
		return nil
	}
	defer sitr.Close()

	// Slurp all series keys.
	itr.keys = itr.keys[:0]
	for i := 0; ; i++ {
		elem, err := sitr.Next()
		if err != nil {
			return err
		} else if elem.SeriesID == 0 {
			break
		}

		// Periodically check for interrupt (every 256 iterations).
		if i&0xFF == 0xFF {
			select {
			case <-itr.opt.InterruptCh:
				return itr.Close()
			default:
			}
		}

		key := itr.indexSet.SeriesFile.SeriesKey(elem.SeriesID)
		if len(key) == 0 {
			// Skip tombstoned series.
			continue
		}
		itr.keys = append(itr.keys, key)
	}

	// Sort keys.
	sort.Sort(seriesKeys(itr.keys))
	return nil
}
2017-11-29 18:20:18 +00:00
// MeasurementIterator represents an iterator over a list of measurements.
// A nil name signals the end of iteration.
type MeasurementIterator interface {
	Close() error
	Next() ([]byte, error)
}

// MeasurementIterators represents a list of MeasurementIterator.
type MeasurementIterators []MeasurementIterator

// Close closes all iterators, returning the first error encountered.
func (a MeasurementIterators) Close() (err error) {
	for i := range a {
		if e := a[i].Close(); e != nil && err == nil {
			err = e
		}
	}
	return err
}

// measurementSliceIterator iterates over a slice of in-memory measurement names.
type measurementSliceIterator struct {
	names [][]byte
}

// NewMeasurementSliceIterator returns an iterator over a slice of in-memory measurement names.
func NewMeasurementSliceIterator(names [][]byte) *measurementSliceIterator {
	return &measurementSliceIterator{names: names}
}
// Close is a no-op; the names are held in memory.
func (itr *measurementSliceIterator) Close() (err error) { return nil }

// Next returns the next measurement name, or nil when exhausted.
func (itr *measurementSliceIterator) Next() (name []byte, err error) {
	if len(itr.names) == 0 {
		return nil, nil
	}
	next := itr.names[0]
	itr.names = itr.names[1:]
	return next, nil
}
// MergeMeasurementIterators returns an iterator that merges a set of iterators.
2019-02-03 20:27:43 +00:00
// Iterators that are first in the list take precedence and a deletion by those
2017-11-29 18:20:18 +00:00
// early iterators will invalidate elements by later iterators.
func MergeMeasurementIterators(itrs ...MeasurementIterator) MeasurementIterator {
if len(itrs) == 0 {
return nil
} else if len(itrs) == 1 {
return itrs[0]
}
return &measurementMergeIterator{
buf: make([][]byte, len(itrs)),
itrs: itrs,
}
}
// measurementMergeIterator merges multiple measurement iterators, emitting
// names in ascending order with duplicates collapsed.
type measurementMergeIterator struct {
	buf  [][]byte // lookahead per iterator; nil means empty
	itrs []MeasurementIterator
}

// Close closes all underlying iterators, returning the first error encountered.
func (itr *measurementMergeIterator) Close() (err error) {
	for i := range itr.itrs {
		if e := itr.itrs[i].Close(); e != nil && err == nil {
			err = e
		}
	}
	return err
}

// Next returns the element with the next lowest name across the iterators.
//
// If multiple iterators contain the same name then the first is returned
// and the remaining ones are skipped.
func (itr *measurementMergeIterator) Next() (_ []byte, err error) {
	// Find next lowest name amongst the buffers.
	var name []byte
	for i, buf := range itr.buf {
		// Fill buffer if empty.
		if buf == nil {
			if buf, err = itr.itrs[i].Next(); err != nil {
				return nil, err
			} else if buf != nil {
				itr.buf[i] = buf
			} else {
				// This input is exhausted.
				continue
			}
		}

		// Find next lowest name.
		if name == nil || bytes.Compare(itr.buf[i], name) == -1 {
			name = itr.buf[i]
		}
	}

	// Return nil if no elements remaining.
	if name == nil {
		return nil, nil
	}

	// Clear every buffer holding the emitted name so duplicates are skipped.
	for i, buf := range itr.buf {
		if buf == nil || !bytes.Equal(buf, name) {
			continue
		}
		itr.buf[i] = nil
	}
	return name, nil
}
2017-12-05 17:49:58 +00:00
// TagKeyIterator represents an iterator over a list of tag keys.
type TagKeyIterator interface {
	// Close releases any resources held by the iterator.
	Close() error
	// Next returns the next tag key, or nil when the iterator is exhausted.
	Next() ([]byte, error)
}
// TagKeyIterators is a collection of TagKeyIterator values that can be
// closed as a group.
type TagKeyIterators []TagKeyIterator

// Close closes every iterator in the collection. All iterators are closed
// even if one fails; the first error encountered is returned.
func (a TagKeyIterators) Close() (err error) {
	for _, itr := range a {
		if cerr := itr.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}
	return err
}
// NewTagKeySliceIterator returns a TagKeyIterator that iterates over a slice.
func NewTagKeySliceIterator(keys [][]byte) *tagKeySliceIterator {
	return &tagKeySliceIterator{keys: keys}
}

// tagKeySliceIterator iterates over a slice of tag keys.
type tagKeySliceIterator struct {
	keys [][]byte
}

// Next returns the next tag key in the slice, or nil once exhausted.
func (itr *tagKeySliceIterator) Next() ([]byte, error) {
	if len(itr.keys) > 0 {
		key := itr.keys[0]
		itr.keys = itr.keys[1:]
		return key, nil
	}
	return nil, nil
}

// Close is a no-op; the iterator holds no external resources.
func (itr *tagKeySliceIterator) Close() error { return nil }
// MergeTagKeyIterators returns an iterator that merges a set of iterators.
func MergeTagKeyIterators(itrs ...TagKeyIterator) TagKeyIterator {
	switch len(itrs) {
	case 0:
		return nil
	case 1:
		// A single iterator needs no merging.
		return itrs[0]
	default:
		return &tagKeyMergeIterator{
			buf:  make([][]byte, len(itrs)),
			itrs: itrs,
		}
	}
}

// tagKeyMergeIterator merges tag keys from multiple iterators, returning the
// lowest buffered key on each call and collapsing duplicates across sources.
type tagKeyMergeIterator struct {
	buf  [][]byte         // one lookahead key per source iterator
	itrs []TagKeyIterator // underlying iterators, in precedence order
}
// Close closes all underlying iterators.
//
// All iterators are closed even when one fails, and the first error
// encountered is returned. Previously any Close error was silently
// discarded, which was inconsistent with measurementMergeIterator.Close
// and TagKeyIterators.Close.
func (itr *tagKeyMergeIterator) Close() (err error) {
	for i := range itr.itrs {
		if e := itr.itrs[i].Close(); e != nil && err == nil {
			err = e
		}
	}
	return err
}
// Next returns the element with the next lowest key across the iterators.
//
// If multiple iterators contain the same key then the first is returned
// and the remaining ones are skipped.
func (itr *tagKeyMergeIterator) Next() ([]byte, error) {
	// Top up each empty buffer slot and track the lowest buffered key.
	var lowest []byte
	for i := range itr.buf {
		if itr.buf[i] == nil {
			next, err := itr.itrs[i].Next()
			if err != nil {
				return nil, err
			}
			itr.buf[i] = next // stays nil when the source is exhausted
		}
		if itr.buf[i] != nil && (lowest == nil || bytes.Compare(itr.buf[i], lowest) < 0) {
			lowest = itr.buf[i]
		}
	}

	// All sources exhausted.
	if lowest == nil {
		return nil, nil
	}

	// Consume the chosen key from every buffer that holds it so duplicates
	// across sources are emitted only once.
	for i, buf := range itr.buf {
		if buf != nil && bytes.Equal(buf, lowest) {
			itr.buf[i] = nil
		}
	}
	return lowest, nil
}
2017-11-29 18:20:18 +00:00
// TagValueIterator represents an iterator over a list of tag values.
type TagValueIterator interface {
	// Close releases any resources held by the iterator.
	Close() error
	// Next returns the next tag value, or nil when the iterator is exhausted.
	Next() ([]byte, error)
}
// TagValueIterators is a collection of TagValueIterator values that can be
// closed as a group.
type TagValueIterators []TagValueIterator

// Close closes every iterator in the collection. All iterators are closed
// even if one fails; the first error encountered is returned.
func (a TagValueIterators) Close() (err error) {
	for _, itr := range a {
		if cerr := itr.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}
	return err
}
// NewTagValueSliceIterator returns a TagValueIterator that iterates over a slice.
func NewTagValueSliceIterator(values [][]byte) *tagValueSliceIterator {
	return &tagValueSliceIterator{values: values}
}

// tagValueSliceIterator iterates over a slice of tag values.
type tagValueSliceIterator struct {
	values [][]byte
}

// Next returns the next tag value in the slice, or nil once exhausted.
func (itr *tagValueSliceIterator) Next() ([]byte, error) {
	if len(itr.values) > 0 {
		value := itr.values[0]
		itr.values = itr.values[1:]
		return value, nil
	}
	return nil, nil
}

// Close is a no-op; the iterator holds no external resources.
func (itr *tagValueSliceIterator) Close() error { return nil }
// MergeTagValueIterators returns an iterator that merges a set of iterators.
func MergeTagValueIterators(itrs ...TagValueIterator) TagValueIterator {
	switch len(itrs) {
	case 0:
		return nil
	case 1:
		// A single iterator needs no merging.
		return itrs[0]
	default:
		return &tagValueMergeIterator{
			buf:  make([][]byte, len(itrs)),
			itrs: itrs,
		}
	}
}

// tagValueMergeIterator merges tag values from multiple iterators, returning
// the lowest buffered value on each call and collapsing duplicates across
// sources.
type tagValueMergeIterator struct {
	buf  [][]byte           // one lookahead value per source iterator
	itrs []TagValueIterator // underlying iterators, in precedence order
}
// Close closes all underlying iterators.
//
// All iterators are closed even when one fails, and the first error
// encountered is returned. Previously any Close error was silently
// discarded, which was inconsistent with measurementMergeIterator.Close
// and TagValueIterators.Close.
func (itr *tagValueMergeIterator) Close() (err error) {
	for i := range itr.itrs {
		if e := itr.itrs[i].Close(); e != nil && err == nil {
			err = e
		}
	}
	return err
}
// Next returns the element with the next lowest value across the iterators.
//
// If multiple iterators contain the same value then the first is returned
// and the remaining ones are skipped.
func (itr *tagValueMergeIterator) Next() ([]byte, error) {
	// Top up each empty buffer slot and track the lowest buffered value.
	var lowest []byte
	for i := range itr.buf {
		if itr.buf[i] == nil {
			next, err := itr.itrs[i].Next()
			if err != nil {
				return nil, err
			}
			itr.buf[i] = next // stays nil when the source is exhausted
		}
		if itr.buf[i] != nil && (lowest == nil || bytes.Compare(itr.buf[i], lowest) < 0) {
			lowest = itr.buf[i]
		}
	}

	// All sources exhausted.
	if lowest == nil {
		return nil, nil
	}

	// Consume the chosen value from every buffer that holds it so duplicates
	// across sources are emitted only once.
	for i, buf := range itr.buf {
		if buf != nil && bytes.Equal(buf, lowest) {
			itr.buf[i] = nil
		}
	}
	return lowest, nil
}
// IndexSet represents a list of indexes, all belonging to one database.
2017-12-12 21:22:42 +00:00
type IndexSet struct {
Indexes []Index // The set of indexes comprising this IndexSet.
SeriesFile *SeriesFile // The Series File associated with the db for this set.
fieldSets []*MeasurementFieldSet // field sets for _all_ indexes in this set's DB.
2017-12-12 21:22:42 +00:00
}
2017-11-29 18:20:18 +00:00
// HasInmemIndex returns true if any in-memory index is in use.
func (is IndexSet) HasInmemIndex() bool {
for _, idx := range is.Indexes {
2018-08-21 13:32:30 +00:00
if idx.Type() == InmemIndexName {
return true
}
}
return false
}
2017-11-29 18:20:18 +00:00
// Database returns the database name of the first index.
func (is IndexSet) Database() string {
2017-12-12 21:22:42 +00:00
if len(is.Indexes) == 0 {
2017-11-29 18:20:18 +00:00
return ""
}
2017-12-12 21:22:42 +00:00
return is.Indexes[0].Database()
2017-11-29 18:20:18 +00:00
}
// HasField determines if any of the field sets on the set of indexes in the
// IndexSet have the provided field for the provided measurement.
func (is IndexSet) HasField(measurement []byte, field string) bool {
2017-12-12 21:22:42 +00:00
if len(is.Indexes) == 0 {
return false
}
if len(is.fieldSets) == 0 {
// field sets may not have been initialised yet.
is.fieldSets = make([]*MeasurementFieldSet, 0, len(is.Indexes))
for _, idx := range is.Indexes {
is.fieldSets = append(is.fieldSets, idx.FieldSet())
}
2017-11-29 18:20:18 +00:00
}
for _, fs := range is.fieldSets {
if fs.Fields(measurement).HasField(field) {
return true
}
}
return false
2017-11-29 18:20:18 +00:00
}
// DedupeInmemIndexes returns an index set which removes duplicate indexes.
// Useful because inmem indexes are shared by shards per database.
2017-12-05 17:49:58 +00:00
func (is IndexSet) DedupeInmemIndexes() IndexSet {
other := IndexSet{
Indexes: make([]Index, 0, len(is.Indexes)),
SeriesFile: is.SeriesFile,
fieldSets: make([]*MeasurementFieldSet, 0, len(is.Indexes)),
}
2017-12-05 17:49:58 +00:00
uniqueIndexes := make(map[uintptr]Index)
2017-12-12 21:22:42 +00:00
for _, idx := range is.Indexes {
uniqueIndexes[idx.UniqueReferenceID()] = idx
}
for _, idx := range uniqueIndexes {
2017-12-12 21:22:42 +00:00
other.Indexes = append(other.Indexes, idx)
other.fieldSets = append(other.fieldSets, idx.FieldSet())
2017-12-05 17:49:58 +00:00
}
2017-12-05 17:49:58 +00:00
return other
}
// MeasurementNamesByExpr returns a slice of measurement names matching the
// provided condition. If no condition is provided then all names are returned.
2017-12-08 17:11:07 +00:00
func (is IndexSet) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) {
release := is.SeriesFile.Retain()
defer release()
2017-12-05 17:49:58 +00:00
// Return filtered list if expression exists.
if expr != nil {
names, err := is.measurementNamesByExpr(auth, expr)
if err != nil {
return nil, err
}
return slices.CopyChunkedByteSlices(names, 1000), nil
2017-12-05 17:49:58 +00:00
}
itr, err := is.measurementIterator()
2017-12-05 17:49:58 +00:00
if err != nil {
return nil, err
} else if itr == nil {
return nil, nil
}
defer itr.Close()
// Iterate over all measurements if no condition exists.
var names [][]byte
for {
e, err := itr.Next()
if err != nil {
return nil, err
} else if e == nil {
break
}
2017-12-12 21:22:42 +00:00
// Determine if there exists at least one authorised series for the
// measurement name.
fix(services/storage): multi measurement queries return all applicable series (#19592) (#20934) This fixes multi measurement queries that go through the storage service to correctly pick up all series that apply with the filter. Previously, negative queries such as `!=`, `!~`, and predicates attempting to match empty tags did not work correctly with the storage service when multiple measurements or `OR` conditions were included. This was because these predicates would be categorized as "multiple measurements" and then it would attempt to use the field keys iterator to find the fields for each measurement. The meta queries for these did not correctly account for negative equality operators or empty tags when finding appropriate measurements and those could not be changed because it would cause a breaking change to influxql too. This modifies the storage service to use new methods that correctly account for the above situations rather than the field keys iterator. Some queries that appeared to be single measurement queries also get considered as multiple measurement queries. Any query with an `OR` condition will be considered a multiple measurement query. This bug did not apply to single measurement queries where one measurement was selected and all of the logical operators were `AND` values. This is because it used a different code path that correctly handled these situations. Backport of #19566. (cherry picked from commit ceead88bd5b1cf0d808031f02ba6a05ac1343eb9) Co-authored-by: Jonathan A. Sternberg <jonathan@influxdata.com>
2021-03-12 21:34:14 +00:00
if is.measurementAuthorizedSeries(auth, e, nil) {
2017-12-12 21:22:42 +00:00
names = append(names, e)
}
2017-12-05 17:49:58 +00:00
}
return slices.CopyChunkedByteSlices(names, 1000), nil
2017-12-05 17:49:58 +00:00
}
2017-12-12 21:22:42 +00:00
func (is IndexSet) measurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) {
2017-12-05 17:49:58 +00:00
if expr == nil {
return nil, nil
}
switch e := expr.(type) {
case *influxql.BinaryExpr:
switch e.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
tag, ok := e.LHS.(*influxql.VarRef)
if !ok {
return nil, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String())
}
// Retrieve value or regex expression from RHS.
var value string
var regex *regexp.Regexp
if influxql.IsRegexOp(e.Op) {
re, ok := e.RHS.(*influxql.RegexLiteral)
if !ok {
return nil, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String())
}
regex = re.Val
} else {
s, ok := e.RHS.(*influxql.StringLiteral)
if !ok {
return nil, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String())
}
value = s.Val
}
// Match on name, if specified.
if tag.Val == "_name" {
2017-12-12 21:22:42 +00:00
return is.measurementNamesByNameFilter(auth, e.Op, value, regex)
2017-12-05 17:49:58 +00:00
} else if influxql.IsSystemName(tag.Val) {
return nil, nil
}
2017-12-12 21:22:42 +00:00
return is.measurementNamesByTagFilter(auth, e.Op, tag.Val, value, regex)
2017-12-05 17:49:58 +00:00
case influxql.OR, influxql.AND:
2017-12-12 21:22:42 +00:00
lhs, err := is.measurementNamesByExpr(auth, e.LHS)
2017-12-05 17:49:58 +00:00
if err != nil {
return nil, err
}
2017-12-12 21:22:42 +00:00
rhs, err := is.measurementNamesByExpr(auth, e.RHS)
2017-12-05 17:49:58 +00:00
if err != nil {
return nil, err
}
if e.Op == influxql.OR {
return bytesutil.Union(lhs, rhs), nil
}
return bytesutil.Intersect(lhs, rhs), nil
default:
return nil, fmt.Errorf("invalid tag comparison operator")
}
case *influxql.ParenExpr:
2017-12-12 21:22:42 +00:00
return is.measurementNamesByExpr(auth, e.Expr)
2017-12-05 17:49:58 +00:00
default:
return nil, fmt.Errorf("Invalid measurement expression %#v", expr)
2017-12-05 17:49:58 +00:00
}
}
// measurementNamesByNameFilter returns matching measurement names in sorted order.
2017-12-12 21:22:42 +00:00
func (is IndexSet) measurementNamesByNameFilter(auth query.Authorizer, op influxql.Token, val string, regex *regexp.Regexp) ([][]byte, error) {
itr, err := is.measurementIterator()
2017-12-05 17:49:58 +00:00
if err != nil {
return nil, err
} else if itr == nil {
return nil, nil
}
defer itr.Close()
var names [][]byte
for {
e, err := itr.Next()
if err != nil {
return nil, err
} else if e == nil {
break
}
var matched bool
switch op {
case influxql.EQ:
matched = string(e) == val
case influxql.NEQ:
matched = string(e) != val
case influxql.EQREGEX:
matched = regex.Match(e)
case influxql.NEQREGEX:
matched = !regex.Match(e)
}
fix(services/storage): multi measurement queries return all applicable series (#19592) (#20934) This fixes multi measurement queries that go through the storage service to correctly pick up all series that apply with the filter. Previously, negative queries such as `!=`, `!~`, and predicates attempting to match empty tags did not work correctly with the storage service when multiple measurements or `OR` conditions were included. This was because these predicates would be categorized as "multiple measurements" and then it would attempt to use the field keys iterator to find the fields for each measurement. The meta queries for these did not correctly account for negative equality operators or empty tags when finding appropriate measurements and those could not be changed because it would cause a breaking change to influxql too. This modifies the storage service to use new methods that correctly account for the above situations rather than the field keys iterator. Some queries that appeared to be single measurement queries also get considered as multiple measurement queries. Any query with an `OR` condition will be considered a multiple measurement query. This bug did not apply to single measurement queries where one measurement was selected and all of the logical operators were `AND` values. This is because it used a different code path that correctly handled these situations. Backport of #19566. (cherry picked from commit ceead88bd5b1cf0d808031f02ba6a05ac1343eb9) Co-authored-by: Jonathan A. Sternberg <jonathan@influxdata.com>
2021-03-12 21:34:14 +00:00
if matched && is.measurementAuthorizedSeries(auth, e, nil) {
2017-12-05 17:49:58 +00:00
names = append(names, e)
}
}
bytesutil.Sort(names)
return names, nil
}
fix(services/storage): multi measurement queries return all applicable series (#19592) (#20934) This fixes multi measurement queries that go through the storage service to correctly pick up all series that apply with the filter. Previously, negative queries such as `!=`, `!~`, and predicates attempting to match empty tags did not work correctly with the storage service when multiple measurements or `OR` conditions were included. This was because these predicates would be categorized as "multiple measurements" and then it would attempt to use the field keys iterator to find the fields for each measurement. The meta queries for these did not correctly account for negative equality operators or empty tags when finding appropriate measurements and those could not be changed because it would cause a breaking change to influxql too. This modifies the storage service to use new methods that correctly account for the above situations rather than the field keys iterator. Some queries that appeared to be single measurement queries also get considered as multiple measurement queries. Any query with an `OR` condition will be considered a multiple measurement query. This bug did not apply to single measurement queries where one measurement was selected and all of the logical operators were `AND` values. This is because it used a different code path that correctly handled these situations. Backport of #19566. (cherry picked from commit ceead88bd5b1cf0d808031f02ba6a05ac1343eb9) Co-authored-by: Jonathan A. Sternberg <jonathan@influxdata.com>
2021-03-12 21:34:14 +00:00
// MeasurementNamesByPredicate returns a slice of measurement names matching the
// provided condition. If no condition is provided then all names are returned.
// This behaves differently from MeasurementNamesByExpr because it will
// return measurements using flux predicates.
func (is IndexSet) MeasurementNamesByPredicate(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) {
	release := is.SeriesFile.Retain()
	defer release()

	// With a predicate, delegate to the predicate evaluator.
	if expr != nil {
		names, err := is.measurementNamesByPredicate(auth, expr)
		if err != nil {
			return nil, err
		}
		return slices.CopyChunkedByteSlices(names, 1000), nil
	}

	// No condition: walk every measurement in the set.
	itr, err := is.measurementIterator()
	if err != nil {
		return nil, err
	}
	if itr == nil {
		return nil, nil
	}
	defer itr.Close()

	var names [][]byte
	for {
		name, err := itr.Next()
		if err != nil {
			return nil, err
		}
		if name == nil {
			break
		}
		// Include the measurement only when at least one of its series is
		// readable under the supplied authorizer.
		if is.measurementAuthorizedSeries(auth, name, nil) {
			names = append(names, name)
		}
	}
	return slices.CopyChunkedByteSlices(names, 1000), nil
}
// measurementNamesByPredicate evaluates a flux-style predicate expression
// against measurement names. It mirrors measurementNamesByExpr but dispatches
// tag comparisons to measurementNamesByTagPredicate, whose semantics differ
// for negated operators.
func (is IndexSet) measurementNamesByPredicate(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) {
	if expr == nil {
		return nil, nil
	}

	switch e := expr.(type) {
	case *influxql.BinaryExpr:
		switch e.Op {
		case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
			tag, ok := e.LHS.(*influxql.VarRef)
			if !ok {
				return nil, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String())
			}

			// Retrieve value or regex expression from RHS.
			var value string
			var regex *regexp.Regexp
			if influxql.IsRegexOp(e.Op) {
				re, ok := e.RHS.(*influxql.RegexLiteral)
				if !ok {
					return nil, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String())
				}
				regex = re.Val
			} else {
				s, ok := e.RHS.(*influxql.StringLiteral)
				if !ok {
					return nil, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String())
				}
				value = s.Val
			}

			// Match on name, if specified.
			if tag.Val == "_name" {
				return is.measurementNamesByNameFilter(auth, e.Op, value, regex)
			} else if influxql.IsSystemName(tag.Val) {
				return nil, nil
			}
			return is.measurementNamesByTagPredicate(auth, e.Op, tag.Val, value, regex)

		case influxql.OR, influxql.AND:
			lhs, err := is.measurementNamesByPredicate(auth, e.LHS)
			if err != nil {
				return nil, err
			}
			rhs, err := is.measurementNamesByPredicate(auth, e.RHS)
			if err != nil {
				return nil, err
			}
			if e.Op == influxql.OR {
				return bytesutil.Union(lhs, rhs), nil
			}
			return bytesutil.Intersect(lhs, rhs), nil

		default:
			return nil, fmt.Errorf("invalid tag comparison operator")
		}

	case *influxql.ParenExpr:
		return is.measurementNamesByPredicate(auth, e.Expr)
	default:
		// Previously this returned fmt.Errorf("%#v", expr), an error with no
		// context; align the message with measurementNamesByExpr.
		return nil, fmt.Errorf("invalid measurement expression %#v", expr)
	}
}
2017-12-12 21:22:42 +00:00
// measurementNamesByTagFilter returns, in sorted order, the names of
// measurements matching a single tag comparison (EQ/NEQ/EQREGEX/NEQREGEX)
// against tag key `key`. When an authorizer is present, a measurement is
// only returned if at least one of its undeleted series is authorized.
func (is IndexSet) measurementNamesByTagFilter(auth query.Authorizer, op influxql.Token, key, val string, regex *regexp.Regexp) ([][]byte, error) {
	var names [][]byte

	mitr, err := is.measurementIterator()
	if err != nil {
		return nil, err
	} else if mitr == nil {
		return nil, nil
	}
	defer mitr.Close()

	// valEqual determines if the provided []byte is equal to the tag value
	// to be filtered on. For the non-regex operators it is an exact byte
	// comparison against val; otherwise the regex is used.
	valEqual := regex.Match
	if op == influxql.EQ || op == influxql.NEQ {
		vb := []byte(val)
		valEqual = func(b []byte) bool { return bytes.Equal(vb, b) }
	}

	var tagMatch bool
	var authorized bool
	for {
		me, err := mitr.Next()
		if err != nil {
			return nil, err
		} else if me == nil {
			break
		}
		// If the measurement doesn't have the tag key, then it won't be considered.
		if ok, err := is.hasTagKey(me, []byte(key)); err != nil {
			return nil, err
		} else if !ok {
			continue
		}
		tagMatch = false
		// Authorization must be explicitly granted when an authorizer is present.
		authorized = query.AuthorizerIsOpen(auth)

		vitr, err := is.tagValueIterator(me, []byte(key))
		if err != nil {
			return nil, err
		}

		if vitr != nil {
			// NOTE(review): this defer sits inside the measurement loop, so
			// deferred Closes accumulate until the function returns, and
			// Close is also called explicitly below — the iterator's Close
			// is presumably safe to call twice; confirm before changing.
			defer vitr.Close()
			for {
				ve, err := vitr.Next()
				if err != nil {
					return nil, err
				} else if ve == nil {
					break
				}
				if !valEqual(ve) {
					continue
				}

				tagMatch = true
				// With an open authorizer the first matching value decides.
				if query.AuthorizerIsOpen(auth) {
					break
				}

				// When an authorizer is present, the measurement should be
				// included only if one of it's series is authorized.
				sitr, err := is.tagValueSeriesIDIterator(me, []byte(key), ve)
				if err != nil {
					return nil, err
				} else if sitr == nil {
					continue
				}
				// NOTE(review): defer-in-loop plus the explicit Close below;
				// same double-Close assumption as vitr above.
				defer sitr.Close()
				sitr = FilterUndeletedSeriesIDIterator(is.SeriesFile, sitr)

				// Locate a series with this matching tag value that's authorized.
				for {
					se, err := sitr.Next()
					if err != nil {
						return nil, err
					}

					// SeriesID == 0 marks the end of the iterator.
					if se.SeriesID == 0 {
						break
					}

					name, tags := is.SeriesFile.Series(se.SeriesID)
					if auth.AuthorizeSeriesRead(is.Database(), name, tags) {
						authorized = true
						break
					}
				}

				if err := sitr.Close(); err != nil {
					return nil, err
				}

				if tagMatch && authorized {
					// The measurement can definitely be included or rejected.
					break
				}
			}
			if err := vitr.Close(); err != nil {
				return nil, err
			}
		}

		// For negation operators, to determine if the measurement is authorized,
		// an authorized series belonging to the measurement must be located.
		// Then, the measurement can be added iff !tagMatch && authorized.
		if (op == influxql.NEQ || op == influxql.NEQREGEX) && !tagMatch {
			authorized = is.measurementAuthorizedSeries(auth, me, nil)
		}

		// tags match | operation is EQ | measurement matches
		// --------------------------------------------------
		//     True   |       True      |      True
		//     True   |       False     |      False
		//     False  |       True      |      False
		//     False  |       False     |      True
		if tagMatch == (op == influxql.EQ || op == influxql.EQREGEX) && authorized {
			names = append(names, me)
			continue
		}
	}

	bytesutil.Sort(names)
	return names, nil
}
fix(services/storage): multi measurement queries return all applicable series (#19592) (#20934) This fixes multi measurement queries that go through the storage service to correctly pick up all series that apply with the filter. Previously, negative queries such as `!=`, `!~`, and predicates attempting to match empty tags did not work correctly with the storage service when multiple measurements or `OR` conditions were included. This was because these predicates would be categorized as "multiple measurements" and then it would attempt to use the field keys iterator to find the fields for each measurement. The meta queries for these did not correctly account for negative equality operators or empty tags when finding appropriate measurements and those could not be changed because it would cause a breaking change to influxql too. This modifies the storage service to use new methods that correctly account for the above situations rather than the field keys iterator. Some queries that appeared to be single measurement queries also get considered as multiple measurement queries. Any query with an `OR` condition will be considered a multiple measurement query. This bug did not apply to single measurement queries where one measurement was selected and all of the logical operators were `AND` values. This is because it used a different code path that correctly handled these situations. Backport of #19566. (cherry picked from commit ceead88bd5b1cf0d808031f02ba6a05ac1343eb9) Co-authored-by: Jonathan A. Sternberg <jonathan@influxdata.com>
2021-03-12 21:34:14 +00:00
// measurementNamesByTagPredicate returns, in sorted order, the names of
// measurements satisfying a single tag comparison under flux predicate
// semantics: negated operators (NEQ, NEQREGEX) match a measurement when it
// has an authorized series that does NOT carry the matching tag key/value.
func (is IndexSet) measurementNamesByTagPredicate(auth query.Authorizer, op influxql.Token, key, val string, regex *regexp.Regexp) ([][]byte, error) {
	var names [][]byte

	mitr, err := is.measurementIterator()
	if err != nil {
		return nil, err
	} else if mitr == nil {
		return nil, nil
	}
	defer mitr.Close()

	// checkMeasurement decides, per operator, whether a measurement qualifies.
	var checkMeasurement func(auth query.Authorizer, me []byte) (bool, error)
	switch op {
	case influxql.EQ:
		// Measurement has the exact tag value (with an authorized series).
		checkMeasurement = func(auth query.Authorizer, me []byte) (bool, error) {
			return is.measurementHasTagValue(auth, me, []byte(key), []byte(val))
		}
	case influxql.NEQ:
		checkMeasurement = func(auth query.Authorizer, me []byte) (bool, error) {
			// If there is an authorized series in this measurement and that series
			// does not contain the tag key/value.
			ok := is.measurementAuthorizedSeries(auth, me, func(tags models.Tags) bool {
				return tags.GetString(key) == val
			})
			return ok, nil
		}
	case influxql.EQREGEX:
		// Measurement has a tag value matching the regex (with an authorized series).
		checkMeasurement = func(auth query.Authorizer, me []byte) (bool, error) {
			return is.measurementHasTagValueRegex(auth, me, []byte(key), regex)
		}
	case influxql.NEQREGEX:
		checkMeasurement = func(auth query.Authorizer, me []byte) (bool, error) {
			// If there is an authorized series in this measurement and that series
			// does not contain the tag key/value.
			ok := is.measurementAuthorizedSeries(auth, me, func(tags models.Tags) bool {
				return regex.MatchString(tags.GetString(key))
			})
			return ok, nil
		}
	default:
		// Callers only pass comparison operators; anything else is a bug.
		return nil, fmt.Errorf("unsupported operand: %s", op)
	}

	for {
		me, err := mitr.Next()
		if err != nil {
			return nil, err
		} else if me == nil {
			break
		}

		ok, err := checkMeasurement(auth, me)
		if err != nil {
			return nil, err
		} else if ok {
			names = append(names, me)
		}
	}

	bytesutil.Sort(names)
	return names, nil
}
2017-12-12 21:22:42 +00:00
// measurementAuthorizedSeries determines if the measurement contains a series
// that is authorized to be read.
fix(services/storage): multi measurement queries return all applicable series (#19592) (#20934) This fixes multi measurement queries that go through the storage service to correctly pick up all series that apply with the filter. Previously, negative queries such as `!=`, `!~`, and predicates attempting to match empty tags did not work correctly with the storage service when multiple measurements or `OR` conditions were included. This was because these predicates would be categorized as "multiple measurements" and then it would attempt to use the field keys iterator to find the fields for each measurement. The meta queries for these did not correctly account for negative equality operators or empty tags when finding appropriate measurements and those could not be changed because it would cause a breaking change to influxql too. This modifies the storage service to use new methods that correctly account for the above situations rather than the field keys iterator. Some queries that appeared to be single measurement queries also get considered as multiple measurement queries. Any query with an `OR` condition will be considered a multiple measurement query. This bug did not apply to single measurement queries where one measurement was selected and all of the logical operators were `AND` values. This is because it used a different code path that correctly handled these situations. Backport of #19566. (cherry picked from commit ceead88bd5b1cf0d808031f02ba6a05ac1343eb9) Co-authored-by: Jonathan A. Sternberg <jonathan@influxdata.com>
2021-03-12 21:34:14 +00:00
func (is IndexSet) measurementAuthorizedSeries(auth query.Authorizer, name []byte, exclude func(tags models.Tags) bool) bool {
if query.AuthorizerIsOpen(auth) && exclude == nil {
2017-12-12 21:22:42 +00:00
return true
}
fix(services/storage): multi measurement queries return all applicable series (#19592) (#20934) This fixes multi measurement queries that go through the storage service to correctly pick up all series that apply with the filter. Previously, negative queries such as `!=`, `!~`, and predicates attempting to match empty tags did not work correctly with the storage service when multiple measurements or `OR` conditions were included. This was because these predicates would be categorized as "multiple measurements" and then it would attempt to use the field keys iterator to find the fields for each measurement. The meta queries for these did not correctly account for negative equality operators or empty tags when finding appropriate measurements and those could not be changed because it would cause a breaking change to influxql too. This modifies the storage service to use new methods that correctly account for the above situations rather than the field keys iterator. Some queries that appeared to be single measurement queries also get considered as multiple measurement queries. Any query with an `OR` condition will be considered a multiple measurement query. This bug did not apply to single measurement queries where one measurement was selected and all of the logical operators were `AND` values. This is because it used a different code path that correctly handled these situations. Backport of #19566. (cherry picked from commit ceead88bd5b1cf0d808031f02ba6a05ac1343eb9) Co-authored-by: Jonathan A. Sternberg <jonathan@influxdata.com>
2021-03-12 21:34:14 +00:00
if auth == nil {
auth = query.OpenAuthorizer
}
sitr, err := is.measurementSeriesIDIterator(name)
if err != nil || sitr == nil {
return false
}
2017-12-12 21:22:42 +00:00
defer sitr.Close()
sitr = FilterUndeletedSeriesIDIterator(is.SeriesFile, sitr)
2017-12-12 21:22:42 +00:00
for {
series, err := sitr.Next()
if err != nil {
return false
}
if series.SeriesID == 0 {
return false // End of iterator
}
name, tags := is.SeriesFile.Series(series.SeriesID)
if auth.AuthorizeSeriesRead(is.Database(), name, tags) {
fix(services/storage): multi measurement queries return all applicable series (#19592) (#20934) This fixes multi measurement queries that go through the storage service to correctly pick up all series that apply with the filter. Previously, negative queries such as `!=`, `!~`, and predicates attempting to match empty tags did not work correctly with the storage service when multiple measurements or `OR` conditions were included. This was because these predicates would be categorized as "multiple measurements" and then it would attempt to use the field keys iterator to find the fields for each measurement. The meta queries for these did not correctly account for negative equality operators or empty tags when finding appropriate measurements and those could not be changed because it would cause a breaking change to influxql too. This modifies the storage service to use new methods that correctly account for the above situations rather than the field keys iterator. Some queries that appeared to be single measurement queries also get considered as multiple measurement queries. Any query with an `OR` condition will be considered a multiple measurement query. This bug did not apply to single measurement queries where one measurement was selected and all of the logical operators were `AND` values. This is because it used a different code path that correctly handled these situations. Backport of #19566. (cherry picked from commit ceead88bd5b1cf0d808031f02ba6a05ac1343eb9) Co-authored-by: Jonathan A. Sternberg <jonathan@influxdata.com>
2021-03-12 21:34:14 +00:00
if exclude != nil && exclude(tags) {
continue
}
2017-12-12 21:22:42 +00:00
return true
}
}
}
fix(services/storage): multi measurement queries return all applicable series (#19592) (#20934) This fixes multi measurement queries that go through the storage service to correctly pick up all series that apply with the filter. Previously, negative queries such as `!=`, `!~`, and predicates attempting to match empty tags did not work correctly with the storage service when multiple measurements or `OR` conditions were included. This was because these predicates would be categorized as "multiple measurements" and then it would attempt to use the field keys iterator to find the fields for each measurement. The meta queries for these did not correctly account for negative equality operators or empty tags when finding appropriate measurements and those could not be changed because it would cause a breaking change to influxql too. This modifies the storage service to use new methods that correctly account for the above situations rather than the field keys iterator. Some queries that appeared to be single measurement queries also get considered as multiple measurement queries. Any query with an `OR` condition will be considered a multiple measurement query. This bug did not apply to single measurement queries where one measurement was selected and all of the logical operators were `AND` values. This is because it used a different code path that correctly handled these situations. Backport of #19566. (cherry picked from commit ceead88bd5b1cf0d808031f02ba6a05ac1343eb9) Co-authored-by: Jonathan A. Sternberg <jonathan@influxdata.com>
2021-03-12 21:34:14 +00:00
// measurementHasTagValue determines whether the measurement me includes the
// exact tag key/value pair, restricted to series readable under auth. An
// empty value is delegated to measurementHasEmptyTagValue, which has
// different semantics (absence of the key counts as an empty value).
func (is IndexSet) measurementHasTagValue(auth query.Authorizer, me, key, value []byte) (bool, error) {
	if len(value) == 0 {
		return is.measurementHasEmptyTagValue(auth, me, key)
	}

	hasTagValue, err := is.HasTagValue(me, key, value)
	if err != nil || !hasTagValue {
		return false, err
	}

	// If the authorizer is open, return true.
	if query.AuthorizerIsOpen(auth) {
		return true, nil
	}

	// When an authorizer is present, the measurement should be
	// included only if one of its series is authorized.
	sitr, err := is.tagValueSeriesIDIterator(me, key, value)
	if err != nil || sitr == nil {
		return false, err
	}
	defer sitr.Close()
	// Wrap the iterator so tombstoned series are skipped.
	sitr = FilterUndeletedSeriesIDIterator(is.SeriesFile, sitr)

	// Locate a series with this matching tag value that's authorized.
	for {
		se, err := sitr.Next()
		// SeriesID == 0 marks iterator exhaustion: no authorized series found.
		if err != nil || se.SeriesID == 0 {
			return false, err
		}

		name, tags := is.SeriesFile.Series(se.SeriesID)
		if auth.AuthorizeSeriesRead(is.Database(), name, tags) {
			return true, nil
		}
	}
}
// measurementHasEmptyTagValue reports whether the measurement me has at least
// one readable series that lacks the tag key entirely (which is what an
// "empty tag value" means here).
func (is IndexSet) measurementHasEmptyTagValue(auth query.Authorizer, me, key []byte) (bool, error) {
	// Any series that does not have a tag key
	// has an empty tag value for that key.
	// Iterate through all of the series to find one
	// series that does not have the tag key.
	sitr, err := is.measurementSeriesIDIterator(me)
	if err != nil || sitr == nil {
		return false, err
	}
	defer sitr.Close()
	// Wrap the iterator so tombstoned series are skipped.
	sitr = FilterUndeletedSeriesIDIterator(is.SeriesFile, sitr)

	for {
		series, err := sitr.Next()
		// SeriesID == 0 marks exhaustion: every series carried the tag key.
		if err != nil || series.SeriesID == 0 {
			return false, err
		}

		name, tags := is.SeriesFile.Series(series.SeriesID)
		if len(tags.Get(key)) > 0 {
			// The tag key exists in this series. We need
			// at least one series that does not have the tag
			// keys.
			continue
		}

		// Verify that we can see this series.
		if query.AuthorizerIsOpen(auth) {
			return true, nil
		} else if auth.AuthorizeSeriesRead(is.Database(), name, tags) {
			return true, nil
		}
	}
}
// measurementHasTagValueRegex reports whether the measurement me has an
// authorized series whose value for key matches the regex. A regex matching
// the empty string additionally considers series missing the key entirely.
func (is IndexSet) measurementHasTagValueRegex(auth query.Authorizer, me, key []byte, value *regexp.Regexp) (bool, error) {
	// If the regex matches the empty string, do a special check to see
	// if we have an empty tag value.
	if matchEmpty := value.MatchString(""); matchEmpty {
		if ok, err := is.measurementHasEmptyTagValue(auth, me, key); err != nil {
			return false, err
		} else if ok {
			return true, nil
		}
	}

	// Iterate over the tag values and find one that matches the value.
	vitr, err := is.tagValueIterator(me, key)
	if err != nil || vitr == nil {
		return false, err
	}
	defer vitr.Close()
	for {
		ve, err := vitr.Next()
		// A nil value marks iterator exhaustion: no matching value found.
		if err != nil || ve == nil {
			return false, err
		}

		if !value.Match(ve) {
			// The regex does not match this tag value.
			continue
		}

		// If the authorizer is open, then we have found a suitable tag value.
		if query.AuthorizerIsOpen(auth) {
			return true, nil
		}

		// When an authorizer is present, the measurement should only be included
		// if one of the series is authorized.
		// The closure scopes the per-value series iterator so its deferred
		// Close runs before the next tag value is examined.
		if authorized, err := func() (bool, error) {
			sitr, err := is.tagValueSeriesIDIterator(me, key, ve)
			if err != nil || sitr == nil {
				return false, err
			}
			defer sitr.Close()
			sitr = FilterUndeletedSeriesIDIterator(is.SeriesFile, sitr)

			// Locate an authorized series.
			for {
				se, err := sitr.Next()
				if err != nil || se.SeriesID == 0 {
					return false, err
				}

				name, tags := is.SeriesFile.Series(se.SeriesID)
				if auth.AuthorizeSeriesRead(is.Database(), name, tags) {
					return true, nil
				}
			}
		}(); err != nil {
			return false, err
		} else if authorized {
			return true, nil
		}
	}
}
2017-12-12 21:22:42 +00:00
// HasTagKey returns true if the tag key exists in any index for the provided
// measurement.
func (is IndexSet) HasTagKey(name, key []byte) (bool, error) {
return is.hasTagKey(name, key)
}
// hasTagKey returns true if the tag key exists in any index for the provided
// measurement, and guarantees to never take a lock on the series file.
func (is IndexSet) hasTagKey(name, key []byte) (bool, error) {
2017-12-12 21:22:42 +00:00
for _, idx := range is.Indexes {
if ok, err := idx.HasTagKey(name, key); err != nil {
return false, err
} else if ok {
return true, nil
}
}
return false, nil
}
// HasTagValue returns true if the tag value exists in any index for the provided
// measurement and tag key.
2017-12-05 17:49:58 +00:00
func (is IndexSet) HasTagValue(name, key, value []byte) (bool, error) {
2017-12-12 21:22:42 +00:00
for _, idx := range is.Indexes {
2017-12-05 17:49:58 +00:00
if ok, err := idx.HasTagValue(name, key, value); err != nil {
return false, err
} else if ok {
return true, nil
}
}
return false, nil
}
2017-11-29 18:20:18 +00:00
// MeasurementIterator returns an iterator over all measurements in the index.
func (is IndexSet) MeasurementIterator() (MeasurementIterator, error) {
return is.measurementIterator()
}
// measurementIterator returns an iterator over all measurements in the index.
// It guarantees to never take any locks on the underlying series file.
func (is IndexSet) measurementIterator() (MeasurementIterator, error) {
2017-12-12 21:22:42 +00:00
a := make([]MeasurementIterator, 0, len(is.Indexes))
for _, idx := range is.Indexes {
2017-11-29 18:20:18 +00:00
itr, err := idx.MeasurementIterator()
if err != nil {
MeasurementIterators(a).Close()
return nil, err
} else if itr != nil {
a = append(a, itr)
}
}
return MergeMeasurementIterators(a...), nil
}
2017-12-05 17:49:58 +00:00
// TagKeyIterator returns a key iterator for a measurement.
func (is IndexSet) TagKeyIterator(name []byte) (TagKeyIterator, error) {
return is.tagKeyIterator(name)
}
// tagKeyIterator returns a key iterator for a measurement. It guarantees to never
// take any locks on the underlying series file.
func (is IndexSet) tagKeyIterator(name []byte) (TagKeyIterator, error) {
2017-12-12 21:22:42 +00:00
a := make([]TagKeyIterator, 0, len(is.Indexes))
for _, idx := range is.Indexes {
2017-12-05 17:49:58 +00:00
itr, err := idx.TagKeyIterator(name)
if err != nil {
TagKeyIterators(a).Close()
return nil, err
} else if itr != nil {
a = append(a, itr)
}
}
return MergeTagKeyIterators(a...), nil
}
2017-11-29 18:20:18 +00:00
// TagValueIterator returns a value iterator for a tag key.
2017-12-12 21:22:42 +00:00
func (is IndexSet) TagValueIterator(name, key []byte) (TagValueIterator, error) {
return is.tagValueIterator(name, key)
}
// tagValueIterator returns a value iterator for a tag key. It guarantees to never
// take any locks on the underlying series file.
func (is IndexSet) tagValueIterator(name, key []byte) (TagValueIterator, error) {
2017-12-12 21:22:42 +00:00
a := make([]TagValueIterator, 0, len(is.Indexes))
for _, idx := range is.Indexes {
itr, err := idx.TagValueIterator(name, key)
2017-11-29 18:20:18 +00:00
if err != nil {
TagValueIterators(a).Close()
return nil, err
} else if itr != nil {
a = append(a, itr)
}
}
return MergeTagValueIterators(a...), nil
}
2017-12-08 17:11:07 +00:00
// TagKeyHasAuthorizedSeries determines if there exists an authorized series for
// the provided measurement name and tag key.
2017-12-12 21:22:42 +00:00
func (is IndexSet) TagKeyHasAuthorizedSeries(auth query.Authorizer, name, tagKey []byte) (bool, error) {
if !is.HasInmemIndex() && query.AuthorizerIsOpen(auth) {
return true, nil
}
release := is.SeriesFile.Retain()
defer release()
itr, err := is.tagKeySeriesIDIterator(name, tagKey)
2017-12-12 21:22:42 +00:00
if err != nil {
return false, err
} else if itr == nil {
return false, nil
}
defer itr.Close()
itr = FilterUndeletedSeriesIDIterator(is.SeriesFile, itr)
2017-12-12 21:22:42 +00:00
for {
e, err := itr.Next()
if err != nil {
return false, err
}
if e.SeriesID == 0 {
return false, nil
}
2018-01-17 14:00:24 +00:00
if query.AuthorizerIsOpen(auth) {
2017-12-12 21:22:42 +00:00
return true, nil
}
name, tags := is.SeriesFile.Series(e.SeriesID)
if auth.AuthorizeSeriesRead(is.Database(), name, tags) {
return true, nil
}
}
2017-12-08 17:11:07 +00:00
}
2017-11-29 18:20:18 +00:00
// MeasurementSeriesIDIterator returns an iterator over all non-tombstoned series
// for the provided measurement.
func (is IndexSet) MeasurementSeriesIDIterator(name []byte) (SeriesIDIterator, error) {
release := is.SeriesFile.Retain()
defer release()
itr, err := is.measurementSeriesIDIterator(name)
if err != nil {
return nil, err
}
return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil
}
// measurementSeriesIDIterator does not provide any locking on the Series file.
//
// See MeasurementSeriesIDIterator for more details.
func (is IndexSet) measurementSeriesIDIterator(name []byte) (SeriesIDIterator, error) {
2017-12-12 21:22:42 +00:00
a := make([]SeriesIDIterator, 0, len(is.Indexes))
for _, idx := range is.Indexes {
2017-11-29 18:20:18 +00:00
itr, err := idx.MeasurementSeriesIDIterator(name)
if err != nil {
SeriesIDIterators(a).Close()
return nil, err
} else if itr != nil {
a = append(a, itr)
}
}
2018-06-27 16:05:51 +00:00
return MergeSeriesIDIterators(a...), nil
2017-11-29 18:20:18 +00:00
}
2017-12-05 17:49:58 +00:00
// ForEachMeasurementTagKey iterates over all tag keys in a measurement and applies
// the provided function.
func (is IndexSet) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error {
release := is.SeriesFile.Retain()
defer release()
itr, err := is.tagKeyIterator(name)
2017-12-05 17:49:58 +00:00
if err != nil {
return err
} else if itr == nil {
return nil
}
2017-12-06 16:09:41 +00:00
defer itr.Close()
2017-12-05 17:49:58 +00:00
for {
key, err := itr.Next()
if err != nil {
return err
} else if key == nil {
return nil
}
if err := fn(key); err != nil {
return err
}
}
}
// MeasurementTagKeysByExpr extracts the tag keys wanted by the expression.
func (is IndexSet) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) {
release := is.SeriesFile.Retain()
defer release()
2017-12-05 17:49:58 +00:00
keys := make(map[string]struct{})
2017-12-12 21:22:42 +00:00
for _, idx := range is.Indexes {
2017-12-05 17:49:58 +00:00
m, err := idx.MeasurementTagKeysByExpr(name, expr)
if err != nil {
return nil, err
}
for k := range m {
keys[k] = struct{}{}
}
}
return keys, nil
}
2017-11-29 18:20:18 +00:00
// TagKeySeriesIDIterator returns a series iterator for all values across a single key.
func (is IndexSet) TagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, error) {
release := is.SeriesFile.Retain()
defer release()
itr, err := is.tagKeySeriesIDIterator(name, key)
if err != nil {
return nil, err
}
return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil
}
// tagKeySeriesIDIterator returns a series iterator for all values across a
// single key.
//
// It guarantees to never take any locks on the series file.
func (is IndexSet) tagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, error) {
2017-12-12 21:22:42 +00:00
a := make([]SeriesIDIterator, 0, len(is.Indexes))
for _, idx := range is.Indexes {
2017-11-29 18:20:18 +00:00
itr, err := idx.TagKeySeriesIDIterator(name, key)
if err != nil {
SeriesIDIterators(a).Close()
return nil, err
} else if itr != nil {
a = append(a, itr)
}
}
2018-06-27 16:05:51 +00:00
return MergeSeriesIDIterators(a...), nil
2017-11-29 18:20:18 +00:00
}
// TagValueSeriesIDIterator returns a series iterator for a single tag value.
func (is IndexSet) TagValueSeriesIDIterator(name, key, value []byte) (SeriesIDIterator, error) {
	// Keep the series file referenced while the merged iterator is assembled.
	releaseSeriesFile := is.SeriesFile.Retain()
	defer releaseSeriesFile()

	rawItr, err := is.tagValueSeriesIDIterator(name, key, value)
	if err != nil {
		return nil, err
	}

	// Callers must never observe tombstoned series.
	return FilterUndeletedSeriesIDIterator(is.SeriesFile, rawItr), nil
}
// tagValueSeriesIDIterator does not provide any locking on the Series File.
//
// See TagValueSeriesIDIterator for more details.
func (is IndexSet) tagValueSeriesIDIterator(name, key, value []byte) (SeriesIDIterator, error) {
2017-12-12 21:22:42 +00:00
a := make([]SeriesIDIterator, 0, len(is.Indexes))
for _, idx := range is.Indexes {
2017-11-29 18:20:18 +00:00
itr, err := idx.TagValueSeriesIDIterator(name, key, value)
if err != nil {
SeriesIDIterators(a).Close()
return nil, err
} else if itr != nil {
a = append(a, itr)
}
}
2018-06-27 16:05:51 +00:00
return MergeSeriesIDIterators(a...), nil
2017-11-29 18:20:18 +00:00
}
// MeasurementSeriesByExprIterator returns a series iterator for a measurement
// that is filtered by expr. If expr only contains time expressions then this
// call is equivalent to MeasurementSeriesIDIterator().
func (is IndexSet) MeasurementSeriesByExprIterator(name []byte, expr influxql.Expr) (SeriesIDIterator, error) {
	// Pin the series file for the duration of the call, then delegate to the
	// lock-free implementation.
	releaseSeriesFile := is.SeriesFile.Retain()
	defer releaseSeriesFile()

	return is.measurementSeriesByExprIterator(name, expr)
}
// measurementSeriesByExprIterator returns a series iterator for a measurement
// that is filtered by expr. See MeasurementSeriesByExprIterator for more details.
//
// measurementSeriesByExprIterator guarantees to never take any locks on the
// series file.
func (is IndexSet) measurementSeriesByExprIterator(name []byte, expr influxql.Expr) (SeriesIDIterator, error) {
2017-11-29 18:20:18 +00:00
// Return all series for the measurement if there are no tag expressions.
if expr == nil {
itr, err := is.measurementSeriesIDIterator(name)
if err != nil {
return nil, err
}
return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil
2017-11-29 18:20:18 +00:00
}
itr, err := is.seriesByExprIterator(name, expr)
if err != nil {
return nil, err
}
return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil
2017-11-29 18:20:18 +00:00
}
// measurementSeriesKeyByExprIterator adapts a SeriesIDIterator into a
// SeriesKeyIterator, applying leftover expression filters and authorization.
type measurementSeriesKeyByExprIterator struct {
	ids      SeriesIDIterator // matching series IDs; nil means "no matches"
	is       IndexSet
	auth     query.Authorizer
	once     sync.Once // ensures releaser runs at most once
	releaser func()    // releases the series-file reference held for ids
}
// Next returns the next authorized series key passing any leftover filter
// expression, or (nil, nil) once the iterator is exhausted.
func (itr *measurementSeriesKeyByExprIterator) Next() ([]byte, error) {
	if itr.ids == nil {
		return nil, nil
	}
	for {
		e, err := itr.ids.Next()
		if err != nil {
			return nil, err
		} else if e.SeriesID == 0 {
			// Underlying ID iterator exhausted.
			return nil, nil
		}

		seriesKey := itr.is.SeriesFile.SeriesKey(e.SeriesID)
		if len(seriesKey) == 0 {
			// Key missing from the series file; skip.
			continue
		}

		name, tags := ParseSeriesKey(seriesKey)

		// Check leftover filters. All fields that might be filtered default to zero values
		if e.Expr != nil {
			if v, ok := e.Expr.(*influxql.BooleanLiteral); ok {
				if !v.Val {
					continue
				}
			} else {
				// Evaluate the residual expression against the tag values.
				values := make(map[string]interface{}, len(tags))
				for _, t := range tags {
					values[string(t.Key)] = string(t.Value)
				}
				if !influxql.EvalBool(e.Expr, values) {
					continue
				}
			}
		}

		if itr.auth != nil && !itr.auth.AuthorizeSeriesRead(itr.is.Database(), name, tags) {
			continue
		}

		out := models.MakeKey(name, tags)
		// ensure nil is only returned when we are done (or for errors)
		if out == nil {
			out = []byte{}
		}
		return out, nil
	}
}
// Close releases the retained series-file reference (at most once) and closes
// the underlying series ID iterator.
func (itr *measurementSeriesKeyByExprIterator) Close() error {
	// A nil ids iterator means no reference was retained - see
	// MeasurementSeriesKeyByExprIterator.
	if itr.ids == nil {
		return nil
	}

	itr.once.Do(itr.releaser)
	return itr.ids.Close()
}
// MeasurementSeriesKeyByExprIterator iterates through series, filtered by an expression on the tags.
// Any non-tag expressions will be filtered as if the field had the zero value.
func (is IndexSet) MeasurementSeriesKeyByExprIterator(name []byte, expr influxql.Expr, auth query.Authorizer) (SeriesKeyIterator, error) {
	// Retain the series file; released by the returned iterator's Close.
	release := is.SeriesFile.Retain()
	// Create iterator for all matching series.
	ids, err := is.measurementSeriesByExprIterator(name, expr)
	if err != nil {
		release()
		return nil, err
	}
	if ids == nil {
		// No matches: release now; the iterator treats a nil ids iterator as
		// "nothing retained" and never invokes the (nil) releaser.
		release()
		release = nil
	}
	return &measurementSeriesKeyByExprIterator{
		ids:      ids,
		releaser: release,
		auth:     auth,
		is:       is,
	}, nil
}
2017-11-29 18:20:18 +00:00
// MeasurementSeriesKeysByExpr returns a list of series keys matching expr.
2017-12-12 21:22:42 +00:00
func (is IndexSet) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) {
release := is.SeriesFile.Retain()
defer release()
2017-11-29 18:20:18 +00:00
// Create iterator for all matching series.
itr, err := is.measurementSeriesByExprIterator(name, expr)
2017-11-29 18:20:18 +00:00
if err != nil {
return nil, err
} else if itr == nil {
return nil, nil
}
2017-12-02 23:52:34 +00:00
defer itr.Close()
2017-11-29 18:20:18 +00:00
// measurementSeriesByExprIterator filters deleted series; no need to do so here.
2017-11-29 18:20:18 +00:00
// Iterate over all series and generate keys.
var keys [][]byte
for {
e, err := itr.Next()
if err != nil {
return nil, err
} else if e.SeriesID == 0 {
break
}
// Check for unsupported field filters.
// Any remaining filters means there were fields (e.g., `WHERE value = 1.2`).
if e.Expr != nil {
if v, ok := e.Expr.(*influxql.BooleanLiteral); !ok || !v.Val {
return nil, errors.New("fields not supported in WHERE clause during deletion")
}
}
2017-12-12 21:22:42 +00:00
seriesKey := is.SeriesFile.SeriesKey(e.SeriesID)
if len(seriesKey) == 0 {
continue
}
2017-11-29 18:20:18 +00:00
name, tags := ParseSeriesKey(seriesKey)
keys = append(keys, models.MakeKey(name, tags))
}
bytesutil.Sort(keys)
return keys, nil
2017-11-27 14:52:18 +00:00
}
func (is IndexSet) seriesByExprIterator(name []byte, expr influxql.Expr) (SeriesIDIterator, error) {
2017-11-29 18:20:18 +00:00
switch expr := expr.(type) {
case *influxql.BinaryExpr:
switch expr.Op {
case influxql.AND, influxql.OR:
// Get the series IDs and filter expressions for the LHS.
litr, err := is.seriesByExprIterator(name, expr.LHS)
2017-11-29 18:20:18 +00:00
if err != nil {
return nil, err
}
// Get the series IDs and filter expressions for the RHS.
ritr, err := is.seriesByExprIterator(name, expr.RHS)
2017-11-29 18:20:18 +00:00
if err != nil {
2017-12-02 23:52:34 +00:00
if litr != nil {
litr.Close()
}
2017-11-29 18:20:18 +00:00
return nil, err
}
// Intersect iterators if expression is "AND".
if expr.Op == influxql.AND {
return IntersectSeriesIDIterators(litr, ritr), nil
}
// Union iterators if expression is "OR".
return UnionSeriesIDIterators(litr, ritr), nil
default:
return is.seriesByBinaryExprIterator(name, expr)
2017-11-29 18:20:18 +00:00
}
case *influxql.ParenExpr:
return is.seriesByExprIterator(name, expr.Expr)
2017-11-29 18:20:18 +00:00
case *influxql.BooleanLiteral:
if expr.Val {
return is.measurementSeriesIDIterator(name)
}
return nil, nil
2017-11-29 18:20:18 +00:00
default:
return nil, nil
2017-11-27 14:52:18 +00:00
}
}
2017-11-29 18:20:18 +00:00
// seriesByBinaryExprIterator returns a series iterator and a filtering expression.
func (is IndexSet) seriesByBinaryExprIterator(name []byte, n *influxql.BinaryExpr) (SeriesIDIterator, error) {
2017-11-29 18:20:18 +00:00
// If this binary expression has another binary expression, then this
// is some expression math and we should just pass it to the underlying query.
if _, ok := n.LHS.(*influxql.BinaryExpr); ok {
itr, err := is.measurementSeriesIDIterator(name)
2017-11-29 18:20:18 +00:00
if err != nil {
return nil, err
}
return newSeriesIDExprIterator(itr, n), nil
} else if _, ok := n.RHS.(*influxql.BinaryExpr); ok {
itr, err := is.measurementSeriesIDIterator(name)
2017-11-29 18:20:18 +00:00
if err != nil {
return nil, err
}
return newSeriesIDExprIterator(itr, n), nil
}
2017-11-27 14:52:18 +00:00
2017-11-29 18:20:18 +00:00
// Retrieve the variable reference from the correct side of the expression.
key, ok := n.LHS.(*influxql.VarRef)
value := n.RHS
if !ok {
key, ok = n.RHS.(*influxql.VarRef)
if !ok {
// This is an expression we do not know how to evaluate. Let the
// query engine take care of this.
itr, err := is.measurementSeriesIDIterator(name)
if err != nil {
return nil, err
}
return newSeriesIDExprIterator(itr, n), nil
2017-11-29 18:20:18 +00:00
}
value = n.LHS
}
// For fields, return all series from this measurement.
if key.Val != "_name" && ((key.Type == influxql.Unknown && is.HasField(name, key.Val)) || key.Type == influxql.AnyField || (key.Type != influxql.Tag && key.Type != influxql.Unknown)) {
itr, err := is.measurementSeriesIDIterator(name)
2017-11-29 18:20:18 +00:00
if err != nil {
return nil, err
}
return newSeriesIDExprIterator(itr, n), nil
} else if value, ok := value.(*influxql.VarRef); ok {
// Check if the RHS is a variable and if it is a field.
if value.Val != "_name" && ((value.Type == influxql.Unknown && is.HasField(name, value.Val)) || key.Type == influxql.AnyField || (value.Type != influxql.Tag && value.Type != influxql.Unknown)) {
itr, err := is.measurementSeriesIDIterator(name)
2017-11-29 18:20:18 +00:00
if err != nil {
return nil, err
}
return newSeriesIDExprIterator(itr, n), nil
}
}
// Create iterator based on value type.
switch value := value.(type) {
case *influxql.StringLiteral:
return is.seriesByBinaryExprStringIterator(name, []byte(key.Val), []byte(value.Val), n.Op)
case *influxql.RegexLiteral:
return is.seriesByBinaryExprRegexIterator(name, []byte(key.Val), value.Val, n.Op)
case *influxql.VarRef:
return is.seriesByBinaryExprVarRefIterator(name, []byte(key.Val), value, n.Op)
default:
// We do not know how to evaluate this expression so pass it
// on to the query engine.
itr, err := is.measurementSeriesIDIterator(name)
if err != nil {
return nil, err
2017-11-29 18:20:18 +00:00
}
return newSeriesIDExprIterator(itr, n), nil
2017-11-29 18:20:18 +00:00
}
2017-11-27 14:52:18 +00:00
}
2017-11-29 18:20:18 +00:00
func (is IndexSet) seriesByBinaryExprStringIterator(name, key, value []byte, op influxql.Token) (SeriesIDIterator, error) {
// Special handling for "_name" to match measurement name.
if bytes.Equal(key, []byte("_name")) {
if (op == influxql.EQ && bytes.Equal(value, name)) || (op == influxql.NEQ && !bytes.Equal(value, name)) {
return is.measurementSeriesIDIterator(name)
2017-11-29 18:20:18 +00:00
}
return nil, nil
}
if op == influxql.EQ {
// Match a specific value.
if len(value) != 0 {
return is.tagValueSeriesIDIterator(name, key, value)
2017-11-29 18:20:18 +00:00
}
mitr, err := is.measurementSeriesIDIterator(name)
2017-11-29 18:20:18 +00:00
if err != nil {
return nil, err
}
kitr, err := is.tagKeySeriesIDIterator(name, key)
2017-11-29 18:20:18 +00:00
if err != nil {
2017-12-06 16:09:41 +00:00
if mitr != nil {
mitr.Close()
}
2017-11-29 18:20:18 +00:00
return nil, err
}
// Return all measurement series that have no values from this tag key.
return DifferenceSeriesIDIterators(mitr, kitr), nil
}
// Return all measurement series without this tag value.
if len(value) != 0 {
mitr, err := is.measurementSeriesIDIterator(name)
2017-11-29 18:20:18 +00:00
if err != nil {
return nil, err
}
vitr, err := is.tagValueSeriesIDIterator(name, key, value)
2017-11-29 18:20:18 +00:00
if err != nil {
2017-12-06 16:09:41 +00:00
if mitr != nil {
mitr.Close()
}
2017-11-29 18:20:18 +00:00
return nil, err
}
return DifferenceSeriesIDIterators(mitr, vitr), nil
}
// Return all series across all values of this tag key.
return is.tagKeySeriesIDIterator(name, key)
2017-11-29 18:20:18 +00:00
}
func (is IndexSet) seriesByBinaryExprRegexIterator(name, key []byte, value *regexp.Regexp, op influxql.Token) (SeriesIDIterator, error) {
// Special handling for "_name" to match measurement name.
if bytes.Equal(key, []byte("_name")) {
match := value.Match(name)
if (op == influxql.EQREGEX && match) || (op == influxql.NEQREGEX && !match) {
mitr, err := is.measurementSeriesIDIterator(name)
2017-11-29 18:20:18 +00:00
if err != nil {
return nil, err
}
return newSeriesIDExprIterator(mitr, &influxql.BooleanLiteral{Val: true}), nil
}
return nil, nil
}
return is.matchTagValueSeriesIDIterator(name, key, value, op == influxql.EQREGEX)
2017-11-29 18:20:18 +00:00
}
func (is IndexSet) seriesByBinaryExprVarRefIterator(name, key []byte, value *influxql.VarRef, op influxql.Token) (SeriesIDIterator, error) {
itr0, err := is.tagKeySeriesIDIterator(name, key)
2017-11-29 18:20:18 +00:00
if err != nil {
return nil, err
}
itr1, err := is.tagKeySeriesIDIterator(name, []byte(value.Val))
2017-11-29 18:20:18 +00:00
if err != nil {
2017-12-06 16:09:41 +00:00
if itr0 != nil {
itr0.Close()
}
2017-11-29 18:20:18 +00:00
return nil, err
}
if op == influxql.EQ {
return IntersectSeriesIDIterators(itr0, itr1), nil
}
return DifferenceSeriesIDIterators(itr0, itr1), nil
}
// MatchTagValueSeriesIDIterator returns a series iterator for tags which match value.
// If matches is false, returns iterators which do not match value.
func (is IndexSet) MatchTagValueSeriesIDIterator(name, key []byte, value *regexp.Regexp, matches bool) (SeriesIDIterator, error) {
	// Pin the series file while the iterator is assembled.
	releaseSeriesFile := is.SeriesFile.Retain()
	defer releaseSeriesFile()

	rawItr, err := is.matchTagValueSeriesIDIterator(name, key, value, matches)
	if err != nil {
		return nil, err
	}

	// Callers must never observe tombstoned series.
	return FilterUndeletedSeriesIDIterator(is.SeriesFile, rawItr), nil
}
// matchTagValueSeriesIDIterator returns a series iterator for tags which match
// value. See MatchTagValueSeriesIDIterator for more details.
//
// It guarantees to never take any locks on the underlying series file.
func (is IndexSet) matchTagValueSeriesIDIterator(name, key []byte, value *regexp.Regexp, matches bool) (SeriesIDIterator, error) {
2017-11-29 18:20:18 +00:00
matchEmpty := value.MatchString("")
if matches {
if matchEmpty {
return is.matchTagValueEqualEmptySeriesIDIterator(name, key, value)
}
return is.matchTagValueEqualNotEmptySeriesIDIterator(name, key, value)
}
if matchEmpty {
return is.matchTagValueNotEqualEmptySeriesIDIterator(name, key, value)
}
return is.matchTagValueNotEqualNotEmptySeriesIDIterator(name, key, value)
}
func (is IndexSet) matchTagValueEqualEmptySeriesIDIterator(name, key []byte, value *regexp.Regexp) (SeriesIDIterator, error) {
vitr, err := is.tagValueIterator(name, key)
2017-11-29 18:20:18 +00:00
if err != nil {
return nil, err
} else if vitr == nil {
return is.measurementSeriesIDIterator(name)
2017-11-29 18:20:18 +00:00
}
defer vitr.Close()
var itrs []SeriesIDIterator
2017-12-02 23:52:34 +00:00
if err := func() error {
for {
e, err := vitr.Next()
2017-11-29 18:20:18 +00:00
if err != nil {
2017-12-02 23:52:34 +00:00
return err
2017-12-04 17:29:04 +00:00
} else if e == nil {
2017-12-02 23:52:34 +00:00
break
}
if !value.Match(e) {
itr, err := is.tagValueSeriesIDIterator(name, key, e)
2017-12-02 23:52:34 +00:00
if err != nil {
return err
} else if itr != nil {
itrs = append(itrs, itr)
2017-12-02 23:52:34 +00:00
}
2017-11-29 18:20:18 +00:00
}
}
2017-12-02 23:52:34 +00:00
return nil
}(); err != nil {
SeriesIDIterators(itrs).Close()
return nil, err
2017-11-29 18:20:18 +00:00
}
mitr, err := is.measurementSeriesIDIterator(name)
2017-11-29 18:20:18 +00:00
if err != nil {
SeriesIDIterators(itrs).Close()
return nil, err
}
return DifferenceSeriesIDIterators(mitr, MergeSeriesIDIterators(itrs...)), nil
}
func (is IndexSet) matchTagValueEqualNotEmptySeriesIDIterator(name, key []byte, value *regexp.Regexp) (SeriesIDIterator, error) {
vitr, err := is.tagValueIterator(name, key)
2017-11-29 18:20:18 +00:00
if err != nil {
return nil, err
} else if vitr == nil {
return nil, nil
}
defer vitr.Close()
var itrs []SeriesIDIterator
for {
e, err := vitr.Next()
if err != nil {
SeriesIDIterators(itrs).Close()
return nil, err
2017-12-04 17:29:04 +00:00
} else if e == nil {
2017-11-29 18:20:18 +00:00
break
}
if value.Match(e) {
itr, err := is.tagValueSeriesIDIterator(name, key, e)
2017-11-29 18:20:18 +00:00
if err != nil {
SeriesIDIterators(itrs).Close()
return nil, err
} else if itr != nil {
itrs = append(itrs, itr)
2017-11-29 18:20:18 +00:00
}
}
}
return MergeSeriesIDIterators(itrs...), nil
}
func (is IndexSet) matchTagValueNotEqualEmptySeriesIDIterator(name, key []byte, value *regexp.Regexp) (SeriesIDIterator, error) {
vitr, err := is.tagValueIterator(name, key)
2017-11-29 18:20:18 +00:00
if err != nil {
return nil, err
} else if vitr == nil {
return nil, nil
}
defer vitr.Close()
var itrs []SeriesIDIterator
for {
e, err := vitr.Next()
if err != nil {
SeriesIDIterators(itrs).Close()
return nil, err
2017-12-04 17:29:04 +00:00
} else if e == nil {
2017-11-29 18:20:18 +00:00
break
}
if !value.Match(e) {
itr, err := is.tagValueSeriesIDIterator(name, key, e)
2017-11-29 18:20:18 +00:00
if err != nil {
SeriesIDIterators(itrs).Close()
return nil, err
} else if itr != nil {
itrs = append(itrs, itr)
2017-11-29 18:20:18 +00:00
}
}
}
return MergeSeriesIDIterators(itrs...), nil
}
func (is IndexSet) matchTagValueNotEqualNotEmptySeriesIDIterator(name, key []byte, value *regexp.Regexp) (SeriesIDIterator, error) {
vitr, err := is.tagValueIterator(name, key)
2017-11-29 18:20:18 +00:00
if err != nil {
return nil, err
} else if vitr == nil {
return is.measurementSeriesIDIterator(name)
2017-11-29 18:20:18 +00:00
}
defer vitr.Close()
var itrs []SeriesIDIterator
for {
e, err := vitr.Next()
2017-11-27 14:52:18 +00:00
if err != nil {
2017-11-29 18:20:18 +00:00
SeriesIDIterators(itrs).Close()
2017-11-27 14:52:18 +00:00
return nil, err
2017-12-04 17:29:04 +00:00
} else if e == nil {
2017-11-29 18:20:18 +00:00
break
}
if value.Match(e) {
itr, err := is.tagValueSeriesIDIterator(name, key, e)
2017-11-29 18:20:18 +00:00
if err != nil {
SeriesIDIterators(itrs).Close()
return nil, err
} else if itr != nil {
itrs = append(itrs, itr)
2017-11-29 18:20:18 +00:00
}
}
}
mitr, err := is.measurementSeriesIDIterator(name)
2017-11-29 18:20:18 +00:00
if err != nil {
SeriesIDIterators(itrs).Close()
return nil, err
}
return DifferenceSeriesIDIterators(mitr, MergeSeriesIDIterators(itrs...)), nil
}
// tagValuesByKeyAndExpr retrieves tag values for the provided tag keys.
2017-11-29 18:20:18 +00:00
//
// tagValuesByKeyAndExpr returns sets of values for each key, indexable by the
2017-11-29 18:20:18 +00:00
// position of the tag key in the keys argument.
//
// N.B tagValuesByKeyAndExpr relies on keys being sorted in ascending
// lexicographic order.
//
// tagValuesByKeyAndExpr guarantees to never take any locks on the underlying
// series file.
func (is IndexSet) tagValuesByKeyAndExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr) ([]map[string]struct{}, error) {
2017-11-29 18:20:18 +00:00
database := is.Database()
valueExpr, remainingExpr, err := influxql.PartitionExpr(influxql.CloneExpr(expr), func(e influxql.Expr) (bool, error) {
switch e := e.(type) {
case *influxql.BinaryExpr:
switch e.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
tag, ok := e.LHS.(*influxql.VarRef)
if ok && tag.Val == "value" {
return true, nil
}
}
}
return false, nil
})
if err != nil {
return nil, err
}
if remainingExpr == nil {
remainingExpr = &influxql.BooleanLiteral{Val: true}
}
itr, err := is.seriesByExprIterator(name, remainingExpr)
2017-11-29 18:20:18 +00:00
if err != nil {
2017-12-12 21:22:42 +00:00
return nil, err
2017-11-29 18:20:18 +00:00
} else if itr == nil {
2017-12-12 21:22:42 +00:00
return nil, nil
2017-11-29 18:20:18 +00:00
}
itr = FilterUndeletedSeriesIDIterator(is.SeriesFile, itr)
2017-12-02 23:52:34 +00:00
defer itr.Close()
2017-11-29 18:20:18 +00:00
keyIdxs := make(map[string]int, len(keys))
for ki, key := range keys {
keyIdxs[key] = ki
// Check that keys are in order.
if ki > 0 && key < keys[ki-1] {
2017-12-12 21:22:42 +00:00
return nil, fmt.Errorf("keys %v are not in ascending order", keys)
2017-11-29 18:20:18 +00:00
}
}
2017-12-12 21:22:42 +00:00
resultSet := make([]map[string]struct{}, len(keys))
for i := 0; i < len(resultSet); i++ {
resultSet[i] = make(map[string]struct{})
}
2017-11-29 18:20:18 +00:00
// Iterate all series to collect tag values.
for {
e, err := itr.Next()
if err != nil {
2017-12-12 21:22:42 +00:00
return nil, err
2017-11-27 14:52:18 +00:00
} else if e.SeriesID == 0 {
2017-11-29 18:20:18 +00:00
break
2017-11-27 14:52:18 +00:00
}
if e.Expr != nil {
// We don't yet have code that correctly processes expressions that
// seriesByExprIterator doesn't handle
lit, ok := e.Expr.(*influxql.BooleanLiteral)
if !ok {
return nil, fmt.Errorf("Expression too complex for metaquery: %v", e.Expr)
}
if !lit.Val {
continue
}
}
2017-12-12 21:22:42 +00:00
buf := is.SeriesFile.SeriesKey(e.SeriesID)
if len(buf) == 0 {
2017-11-29 18:20:18 +00:00
continue
}
2017-11-27 14:52:18 +00:00
2017-11-29 18:20:18 +00:00
if auth != nil {
name, tags := ParseSeriesKey(buf)
if !auth.AuthorizeSeriesRead(database, name, tags) {
continue
2017-11-27 14:52:18 +00:00
}
}
2017-11-29 18:20:18 +00:00
_, buf = ReadSeriesKeyLen(buf)
_, buf = ReadSeriesKeyMeasurement(buf)
tagN, buf := ReadSeriesKeyTagN(buf)
for i := 0; i < tagN; i++ {
var key, value []byte
key, value, buf = ReadSeriesKeyTag(buf)
if valueExpr != nil {
if !influxql.EvalBool(valueExpr, map[string]interface{}{"value": string(value)}) {
continue
}
}
2017-11-29 18:20:18 +00:00
if idx, ok := keyIdxs[string(key)]; ok {
resultSet[idx][string(value)] = struct{}{}
} else if string(key) > keys[len(keys)-1] {
// The tag key is > the largest key we're interested in.
break
}
}
}
2017-12-12 21:22:42 +00:00
return resultSet, nil
2017-11-29 18:20:18 +00:00
}
// MeasurementTagKeyValuesByExpr returns a set of tag values filtered by an expression.
//
// The returned slice is parallel to keys: results[i] holds the values found for
// keys[i]. When expr is nil, all values for each key are returned (subject to
// authorization); otherwise the series are first filtered through
// tagValuesByKeyAndExpr and only values present on matching series are kept.
//
// NOTE(review): when keysSorted is false this sorts the caller's keys slice in
// place — callers must not rely on the original ordering of keys.
func (is IndexSet) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) {
	if len(keys) == 0 {
		return nil, nil
	}

	results := make([][]string, len(keys))

	// If the keys are not sorted, then sort them.
	if !keysSorted {
		sort.Strings(keys)
	}

	// Hold a reference to the series file for the duration of the call so it
	// cannot be closed/compacted out from under the iterators below.
	release := is.SeriesFile.Retain()
	defer release()

	// No expression means that the values shouldn't be filtered; so fetch them
	// all.
	if expr == nil {
		for ki, key := range keys {
			vitr, err := is.tagValueIterator(name, []byte(key))
			if err != nil {
				return nil, err
			} else if vitr == nil {
				break
			}
			// NOTE(review): this defer is inside the loop, so one open value
			// iterator per key accumulates until the function returns.
			defer vitr.Close()

			// If no authorizer present then return all values.
			if query.AuthorizerIsOpen(auth) {
				for {
					val, err := vitr.Next()
					if err != nil {
						return nil, err
					} else if val == nil {
						break
					}
					results[ki] = append(results[ki], string(val))
				}
				continue
			}

			// Authorization is present — check all series with matching tag values
			// and measurements for the presence of an authorized series.
			for {
				val, err := vitr.Next()
				if err != nil {
					return nil, err
				} else if val == nil {
					break
				}

				sitr, err := is.tagValueSeriesIDIterator(name, []byte(key), val)
				if err != nil {
					return nil, err
				} else if sitr == nil {
					continue
				}
				// NOTE(review): the defer binds the unwrapped iterator (the
				// receiver is evaluated at defer time), while the explicit
				// Close below closes the filtering wrapper. Presumably both
				// Closes are safe on the same underlying iterator — confirm.
				defer sitr.Close()
				sitr = FilterUndeletedSeriesIDIterator(is.SeriesFile, sitr)

				for {
					se, err := sitr.Next()
					if err != nil {
						return nil, err
					}

					if se.SeriesID == 0 {
						break
					}

					// One authorized series is enough to expose this value.
					name, tags := is.SeriesFile.Series(se.SeriesID)
					if auth.AuthorizeSeriesRead(is.Database(), name, tags) {
						results[ki] = append(results[ki], string(val))
						break
					}
				}
				if err := sitr.Close(); err != nil {
					return nil, err
				}
			}
		}
		return results, nil
	}

	// This is the case where we have filtered series by some WHERE condition.
	// We only care about the tag values for the keys given the
	// filtered set of series ids.
	resultSet, err := is.tagValuesByKeyAndExpr(auth, name, keys, expr)
	if err != nil {
		return nil, err
	}

	// Convert result sets into []string. Values in each set are emitted in
	// sorted order for deterministic output.
	for i, s := range resultSet {
		values := make([]string, 0, len(s))
		for v := range s {
			values = append(values, v)
		}
		sort.Strings(values)
		results[i] = values
	}
	return results, nil
}
// TagSets returns an ordered list of tag sets for a measurement by dimension
// and filtered by an optional conditional expression.
//
// Series are grouped by the values of the tag keys listed in opt.Dimensions;
// series sharing the same dimension values form one composite TagSet (the
// GROUP BY grouping). The result is sorted by tag-set key for consistency.
func (is IndexSet) TagSets(sfile *SeriesFile, name []byte, opt query.IteratorOptions) ([]*query.TagSet, error) {
	// Pin the series file so series keys remain readable while iterating.
	release := is.SeriesFile.Retain()
	defer release()

	itr, err := is.measurementSeriesByExprIterator(name, opt.Condition)
	if err != nil {
		return nil, err
	} else if itr == nil {
		return nil, nil
	}
	defer itr.Close()
	// measurementSeriesByExprIterator filters deleted series IDs; no need to
	// do so here.

	// Copy + sort the dimensions so the map key built from them is canonical.
	var dims []string
	if len(opt.Dimensions) > 0 {
		dims = make([]string, len(opt.Dimensions))
		copy(dims, opt.Dimensions)
		sort.Strings(dims)
	}

	// For every series, get the tag values for the requested tag keys i.e.
	// dimensions. This is the TagSet for that series. Series with the same
	// TagSet are then grouped together, because for the purpose of GROUP BY
	// they are part of the same composite series.
	tagSets := make(map[string]*query.TagSet, 64)
	var (
		seriesN, maxSeriesN int
		db                  = is.Database()
	)

	// A MaxSeriesN of zero (or less) means no limit; use the max int value.
	if opt.MaxSeriesN > 0 {
		maxSeriesN = opt.MaxSeriesN
	} else {
		maxSeriesN = int(^uint(0) >> 1)
	}

	// The tag sets require a string for each series key in the set, The series
	// file formatted keys need to be parsed into models format. Since they will
	// end up as strings we can re-use an intermediate buffer for this process.
	var keyBuf []byte
	var tagsBuf models.Tags // Buffer for tags. Tags are not needed outside of each loop iteration.

	for {
		se, err := itr.Next()
		if err != nil {
			return nil, err
		} else if se.SeriesID == 0 {
			break
		}

		// Skip if the series has been tombstoned.
		key := sfile.SeriesKey(se.SeriesID)
		if len(key) == 0 {
			continue
		}

		if seriesN&0x3fff == 0x3fff {
			// check every 16384 series if the query has been canceled
			select {
			case <-opt.InterruptCh:
				return nil, query.ErrQueryInterrupted
			default:
			}
		}

		if seriesN > maxSeriesN {
			return nil, fmt.Errorf("max-select-series limit exceeded: (%d/%d)", seriesN, opt.MaxSeriesN)
		}

		// NOTE - must not escape this loop iteration.
		_, tagsBuf = ParseSeriesKeyInto(key, tagsBuf)
		if opt.Authorizer != nil && !opt.Authorizer.AuthorizeSeriesRead(db, name, tagsBuf) {
			continue
		}

		var tagsAsKey []byte
		if len(dims) > 0 {
			tagsAsKey = MakeTagsKey(dims, tagsBuf)
		}

		tagSet, ok := tagSets[string(tagsAsKey)]
		if !ok {
			// This TagSet is new, create a new entry for it.
			tagSet = &query.TagSet{
				Tags: nil,
				Key:  tagsAsKey,
			}
		}

		// Associate the series and filter with the Tagset.
		// keyBuf is reused across iterations; AddFilter receives a fresh
		// string conversion, so truncating the buffer afterwards is safe.
		keyBuf = models.AppendMakeKey(keyBuf, name, tagsBuf)
		tagSet.AddFilter(string(keyBuf), se.Expr)
		keyBuf = keyBuf[:0]

		// Ensure it's back in the map.
		tagSets[string(tagsAsKey)] = tagSet
		seriesN++
	}

	// Sort the series in each tag set.
	for _, t := range tagSets {
		sort.Sort(t)
	}

	// The TagSets have been created, as a map of TagSets. Just send
	// the values back as a slice, sorting for consistency.
	sortedTagsSets := make([]*query.TagSet, 0, len(tagSets))
	for _, v := range tagSets {
		sortedTagsSets = append(sortedTagsSets, v)
	}
	sort.Sort(byTagKey(sortedTagsSets))

	return sortedTagsSets, nil
}
// IndexFormat represents the on-disk format identifier for an index.
type IndexFormat int

const (
	// InMemFormat is the format used by the original in-memory shared index.
	InMemFormat IndexFormat = iota + 1

	// TSI1Format is the format used by the tsi1 index.
	TSI1Format
)
// NewIndexFunc creates a new index. Implementations are registered under an
// index name via RegisterIndex and looked up by NewIndex.
type NewIndexFunc func(id uint64, database, path string, seriesIDSet *SeriesIDSet, sfile *SeriesFile, options EngineOptions) Index

// newIndexFuncs is a lookup of index constructors by name.
// It is populated by RegisterIndex; registration is expected to happen at
// package init time, as the map is not guarded by a lock.
var newIndexFuncs = make(map[string]NewIndexFunc)
// RegisterIndex registers a storage index initializer by name.
// It panics if a constructor was already registered under the same name.
func RegisterIndex(name string, fn NewIndexFunc) {
	if _, dup := newIndexFuncs[name]; dup {
		panic("index already registered: " + name)
	}
	newIndexFuncs[name] = fn
}
// RegisteredIndexes returns the slice of currently registered indexes.
2016-11-15 16:20:00 +00:00
func RegisteredIndexes() []string {
a := make([]string, 0, len(newIndexFuncs))
for k := range newIndexFuncs {
a = append(a, k)
}
sort.Strings(a)
return a
}
// NewIndex returns an instance of an index based on its format.
// If the path does not exist then the DefaultFormat is used.
func NewIndex(id uint64, database, path string, seriesIDSet *SeriesIDSet, sfile *SeriesFile, options EngineOptions) (Index, error) {
format := options.IndexVersion
// Use default format unless existing directory exists.
_, err := os.Stat(path)
if os.IsNotExist(err) {
// nop, use default
} else if err != nil {
return nil, err
} else if err == nil {
2018-08-21 13:32:30 +00:00
format = TSI1IndexName
2016-11-15 16:20:00 +00:00
}
// Lookup index by format.
fn := newIndexFuncs[format]
if fn == nil {
return nil, fmt.Errorf("invalid index format: %q", format)
}
return fn(id, database, path, seriesIDSet, sfile, options), nil
2016-11-15 16:20:00 +00:00
}
func MustOpenIndex(id uint64, database, path string, seriesIDSet *SeriesIDSet, sfile *SeriesFile, options EngineOptions) Index {
idx, err := NewIndex(id, database, path, seriesIDSet, sfile, options)
2016-11-17 16:20:39 +00:00
if err != nil {
panic(err)
2017-03-21 18:21:48 +00:00
} else if err := idx.Open(); err != nil {
panic(err)
2016-11-17 16:20:39 +00:00
}
return idx
}
// assert panics with the given formatted message, prefixed with
// "assert failed: ", when condition is false. It is a no-op otherwise.
func assert(condition bool, msg string, v ...interface{}) {
	if condition {
		return
	}
	panic(fmt.Sprintf("assert failed: "+msg, v...))
}
// byTagKey implements sort.Interface over TagSets, ordering them
// lexicographically by their Key bytes.
type byTagKey []*query.TagSet

func (a byTagKey) Len() int { return len(a) }

func (a byTagKey) Less(i, j int) bool {
	return bytes.Compare(a[i].Key, a[j].Key) < 0
}

func (a byTagKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }