package tsi1

import (
	"bytes"
	"fmt"
	"os"
	"path/filepath"
	"regexp"
	"sort"
	"sync"

	"github.com/influxdata/influxdb/influxql"
	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/pkg/estimator"
	"github.com/influxdata/influxdb/tsdb"
)

func init() {
	tsdb.RegisterIndex("tsi1", func(id uint64, path string, opt tsdb.EngineOptions) tsdb.Index {
		return &Index{Path: path}
	})
}

// File extensions.
const (
	LogFileExt   = ".tsi.log"
	IndexFileExt = ".tsi"
)

// Ensure index implements the interface.
var _ tsdb.Index = &Index{}

// Index represents a collection of layered index files and WAL.
type Index struct {
	Path string

	mu         sync.RWMutex
	logFiles   []*LogFile
	indexFiles IndexFiles

	// Fieldset shared with engine.
	// TODO: Move field management into index.
	fieldset *tsdb.MeasurementFieldSet
}

// Open opens the index.
func (i *Index) Open() error {
	i.mu.Lock()
	defer i.mu.Unlock()

	// Open root index directory.
	f, err := os.Open(i.Path)
	if err != nil {
		return err
	}
	defer f.Close()

	// Open all log & index files.
	names, err := f.Readdirnames(-1)
	if err != nil {
		return err
	}
	for _, name := range names {
		switch filepath.Ext(name) {
		case LogFileExt:
			if err := i.openLogFile(name); err != nil {
				return err
			}
		case IndexFileExt:
			if err := i.openIndexFile(name); err != nil {
				return err
			}
		}
	}

	// Ensure at least one log file exists.
	if len(i.logFiles) == 0 {
		path := filepath.Join(i.Path, fmt.Sprintf("%08x%s", 0, LogFileExt))
		if err := i.openLogFile(path); err != nil {
			return err
		}
	}

	return nil
}

// openLogFile opens a log file and appends it to the index.
func (i *Index) openLogFile(path string) error {
	f := NewLogFile()
	f.Path = path
	if err := f.Open(); err != nil {
		return err
	}

	i.logFiles = append(i.logFiles, f)
	return nil
}

// openIndexFile opens an index file and appends it to the index.
func (i *Index) openIndexFile(path string) error {
	f := NewIndexFile()
	f.Path = path
	if err := f.Open(); err != nil {
		return err
	}

	i.indexFiles = append(i.indexFiles, f)
	return nil
}

// Close closes the index.
func (i *Index) Close() error {
	i.mu.Lock()
	defer i.mu.Unlock()

	// Close log files.
	for _, f := range i.logFiles {
		f.Close()
	}
	i.logFiles = nil

	// Close index files.
	for _, f := range i.indexFiles {
		f.Close()
	}
	i.indexFiles = nil

	return nil
}

// SetFieldSet sets a shared field set from the engine.
func (i *Index) SetFieldSet(fs *tsdb.MeasurementFieldSet) {
	i.mu.Lock()
	i.fieldset = fs
	i.mu.Unlock()
}

// SetLogFiles explicitly sets log files.
// TEMPORARY: For testing only.
func (i *Index) SetLogFiles(a ...*LogFile) { i.logFiles = a }

// SetIndexFiles explicitly sets index files.
// TEMPORARY: For testing only.
func (i *Index) SetIndexFiles(a ...*IndexFile) { i.indexFiles = IndexFiles(a) }

// FileN returns the number of log and index files within the index.
func (i *Index) FileN() int { return len(i.logFiles) + len(i.indexFiles) }

// files returns a list of all log & index files.
//
// OPTIMIZE(benbjohnson): Convert to an iterator to remove allocation.
func (i *Index) files() []File {
	a := make([]File, 0, len(i.logFiles)+len(i.indexFiles))
	for _, f := range i.logFiles {
		a = append(a, f)
	}
	for _, f := range i.indexFiles {
		a = append(a, f)
	}
	return a
}

// SeriesIterator returns an iterator over all series in the index.
func (i *Index) SeriesIterator() SeriesIterator {
	a := make([]SeriesIterator, 0, i.FileN())
	for _, f := range i.files() {
		itr := f.SeriesIterator()
		if itr == nil {
			continue
		}
		a = append(a, itr)
	}
	return MergeSeriesIterators(a...)
}

// Measurement retrieves a measurement by name.
func (i *Index) Measurement(name []byte) (*tsdb.Measurement, error) {
	return i.measurement(name), nil
}
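
// measurement builds an in-memory tsdb.Measurement by iterating over every
// series in the named measurement. It returns nil if the measurement has no
// series.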
func (i *Index) measurement(name []byte) *tsdb.Measurement {
	m := tsdb.NewMeasurement(string(name))

	// Iterate over measurement series.
	itr := i.MeasurementSeriesIterator(name)

	var id uint64 // TEMPORARY
	for e := itr.Next(); e != nil; e = itr.Next() {
		// TODO: Handle deleted series.

		// Append series to measurement.
		// TODO: Remove concept of series ids.
		m.AddSeries(&tsdb.Series{
			ID:   id,
			Key:  string(e.Name()),
			Tags: models.CopyTags(e.Tags()),
		})

		// TEMPORARY: Increment ID.
		id++
	}

	if !m.HasSeries() {
		return nil
	}
	return m
}

// MeasurementSeriesIterator returns an iterator over all series for a measurement.
func (i *Index) MeasurementSeriesIterator(name []byte) SeriesIterator {
	a := make([]SeriesIterator, 0, i.FileN())
	for _, f := range i.files() {
		itr := f.MeasurementSeriesIterator(name)
		if itr != nil {
			a = append(a, itr)
		}
	}
	return MergeSeriesIterators(a...)
}

// TagKeyIterator returns an iterator over all tag keys for a measurement.
func (i *Index) TagKeyIterator(name []byte) TagKeyIterator {
	a := make([]TagKeyIterator, 0, i.FileN())
	for _, f := range i.files() {
		itr := f.TagKeyIterator(name)
		if itr != nil {
			a = append(a, itr)
		}
	}
	return MergeTagKeyIterators(a...)
}

// Measurements returns a list of all measurements.
func (i *Index) Measurements() (tsdb.Measurements, error) {
	var mms tsdb.Measurements
	itr := i.MeasurementIterator()
	for e := itr.Next(); e != nil; e = itr.Next() {
		mms = append(mms, i.measurement(e.Name()))
	}
	return mms, nil
}

// MeasurementIterator returns an iterator over all measurements in the index.
func (i *Index) MeasurementIterator() MeasurementIterator {
	a := make([]MeasurementIterator, 0, i.FileN())
	for _, f := range i.logFiles {
		a = append(a, f.MeasurementIterator())
	}
	for _, f := range i.indexFiles {
		a = append(a, f.MeasurementIterator())
	}
	return MergeMeasurementIterators(a...)
}
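
// MeasurementsByExpr returns the sorted measurements matching expr. The
// boolean return value reports whether the expression could be applied as a
// measurement filter.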
func (i *Index) MeasurementsByExpr(expr influxql.Expr) (tsdb.Measurements, bool, error) {
	return i.measurementsByExpr(expr)
}

func (i *Index) measurementsByExpr(expr influxql.Expr) (tsdb.Measurements, bool, error) {
	if expr == nil {
		return nil, false, nil
	}

	switch e := expr.(type) {
	case *influxql.BinaryExpr:
		switch e.Op {
		case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
			tag, ok := e.LHS.(*influxql.VarRef)
			if !ok {
				return nil, false, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String())
			}

			// Retrieve value or regex expression from RHS.
			var value string
			var regex *regexp.Regexp
			if influxql.IsRegexOp(e.Op) {
				re, ok := e.RHS.(*influxql.RegexLiteral)
				if !ok {
					return nil, false, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String())
				}
				regex = re.Val
			} else {
				s, ok := e.RHS.(*influxql.StringLiteral)
				if !ok {
					return nil, false, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String())
				}
				value = s.Val
			}

			// Match on name, if specified.
			if tag.Val == "_name" {
				return i.measurementsByNameFilter(e.Op, value, regex), true, nil
			} else if influxql.IsSystemName(tag.Val) {
				return nil, false, nil
			}
			return i.measurementsByTagFilter(e.Op, tag.Val, value, regex), true, nil

		case influxql.OR, influxql.AND:
			lhsIDs, lhsOk, err := i.measurementsByExpr(e.LHS)
			if err != nil {
				return nil, false, err
			}

			rhsIDs, rhsOk, err := i.measurementsByExpr(e.RHS)
			if err != nil {
				return nil, false, err
			}

			if lhsOk && rhsOk {
				if e.Op == influxql.OR {
					return lhsIDs.Union(rhsIDs), true, nil
				}
				return lhsIDs.Intersect(rhsIDs), true, nil
			} else if lhsOk {
				return lhsIDs, true, nil
			} else if rhsOk {
				return rhsIDs, true, nil
			}
			return nil, false, nil

		default:
			return nil, false, fmt.Errorf("invalid tag comparison operator")
		}

	case *influxql.ParenExpr:
		return i.measurementsByExpr(e.Expr)
	default:
		return nil, false, fmt.Errorf("%#v", expr)
	}
}

// measurementsByNameFilter returns the sorted measurements matching a name.
func (i *Index) measurementsByNameFilter(op influxql.Token, val string, regex *regexp.Regexp) tsdb.Measurements {
	var mms tsdb.Measurements
	itr := i.MeasurementIterator()
	for e := itr.Next(); e != nil; e = itr.Next() {
		var matched bool
		switch op {
		case influxql.EQ:
			matched = string(e.Name()) == val
		case influxql.NEQ:
			matched = string(e.Name()) != val
		case influxql.EQREGEX:
			matched = regex.Match(e.Name())
		case influxql.NEQREGEX:
			matched = !regex.Match(e.Name())
		}

		if matched {
			mms = append(mms, i.measurement(e.Name()))
		}
	}
	sort.Sort(mms)
	return mms
}
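
// measurementsByTagFilter returns the sorted measurements with series matching
// a tag value filter.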
func (i *Index) measurementsByTagFilter(op influxql.Token, key, val string, regex *regexp.Regexp) tsdb.Measurements {
	var mms tsdb.Measurements
	itr := i.MeasurementIterator()
	for e := itr.Next(); e != nil; e = itr.Next() {
		mm := i.measurement(e.Name())

		tagVals := mm.SeriesByTagKeyValue(key)
		if tagVals == nil {
			continue
		}

		// If the operator is non-regex, only check the specified value.
		var tagMatch bool
		if op == influxql.EQ || op == influxql.NEQ {
			if _, ok := tagVals[val]; ok {
				tagMatch = true
			}
		} else {
			// Else, the operator is a regex and we have to check all tag
			// values against the regular expression.
			for tagVal := range tagVals {
				if regex.MatchString(tagVal) {
					tagMatch = true
					break
				}
			}
		}

		//
		// XNOR gate
		//
		// tags match | operation is EQ | measurement matches
		// --------------------------------------------------
		// True       | True            | True
		// True       | False           | False
		// False      | True            | False
		// False      | False           | True
		if tagMatch == (op == influxql.EQ || op == influxql.EQREGEX) {
			mms = append(mms, mm)
			break
		}
	}

	sort.Sort(mms)
	return mms
}
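
// MeasurementsByName returns the measurements whose names are listed in names.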
func (i *Index) MeasurementsByName(names [][]byte) ([]*tsdb.Measurement, error) {
	itr := i.MeasurementIterator()
	mms := make([]*tsdb.Measurement, 0, len(names))
	for e := itr.Next(); e != nil; e = itr.Next() {
		for _, name := range names {
			if bytes.Equal(e.Name(), name) {
				mms = append(mms, i.measurement(e.Name()))
				break
			}
		}
	}
	return mms, nil
}
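
// MeasurementNamesByRegex returns the measurement names matching the regular expression.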
func (i *Index) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
	itr := i.MeasurementIterator()
	var a [][]byte
	for e := itr.Next(); e != nil; e = itr.Next() {
		if re.Match(e.Name()) {
			a = append(a, e.Name())
		}
	}
	return a, nil
}

// DropMeasurement deletes a measurement from the index.
func (i *Index) DropMeasurement(name []byte) error {
	// Delete all keys and values.
	if kitr := i.TagKeyIterator(name); kitr != nil {
		for k := kitr.Next(); k != nil; k = kitr.Next() {
			// Delete key if not already deleted.
			if !k.Deleted() {
				if err := i.logFiles[0].DeleteTagKey(name, k.Key()); err != nil {
					return err
				}
			}

			// Delete each value in key.
			if vitr := k.TagValueIterator(); vitr != nil {
				for v := vitr.Next(); v != nil; v = vitr.Next() {
					if !v.Deleted() {
						if err := i.logFiles[0].DeleteTagValue(name, k.Key(), v.Value()); err != nil {
							return err
						}
					}
				}
			}
		}
	}

	// Delete all series in measurement.
	if sitr := i.MeasurementSeriesIterator(name); sitr != nil {
		for s := sitr.Next(); s != nil; s = sitr.Next() {
			if !s.Deleted() {
				if err := i.logFiles[0].DeleteSeries(s.Name(), s.Tags()); err != nil {
					return err
				}
			}
		}
	}

	// Mark measurement as deleted.
	return i.logFiles[0].DeleteMeasurement(name)
}

// CreateSeriesIfNotExists creates a series if it doesn't exist or is deleted.
func (i *Index) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error {
	if e := i.Series(name, tags); e != nil {
		return nil
	}
	return i.logFiles[0].AddSeries(name, tags)
}

// Series returns a series by name/tags.
func (i *Index) Series(name []byte, tags models.Tags) SeriesElem {
	for _, f := range i.files() {
		if e := f.Series(name, tags); e != nil && !e.Deleted() {
			return e
		}
	}
	return nil
}
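
// DropSeries removes the series listed in keys from the index.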
func (i *Index) DropSeries(keys [][]byte) error {
	for _, key := range keys {
		name, tags, err := models.ParseKey(key)
		if err != nil {
			return err
		}

		if err := i.logFiles[0].DeleteSeries([]byte(name), tags); err != nil {
			return err
		}
	}
	return nil
}
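
// SeriesN returns the number of series in the index.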
func (i *Index) SeriesN() (n uint64, err error) {
	// FIXME(edd): Use sketches.

	// HACK(benbjohnson): Use first log file until edd adds sketches.
	return i.logFiles[0].SeriesN(), nil
}
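
// SeriesSketches returns cardinality sketches for series. Not yet implemented.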
func (i *Index) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
	// FIXME(edd)
	return nil, nil, fmt.Errorf("SeriesSketches not implemented")
}
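
// MeasurementsSketches returns cardinality sketches for measurements. Not yet implemented.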
func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
	// FIXME(edd)
	return nil, nil, fmt.Errorf("MeasurementSketches not implemented")
}

// Dereference is a nop.
func (i *Index) Dereference([]byte) {}

// TagKeySeriesIterator returns a series iterator for all values across a single key.
func (i *Index) TagKeySeriesIterator(name, key []byte) SeriesIterator {
	a := make([]SeriesIterator, 0, i.FileN())
	for _, f := range i.files() {
		itr := f.TagKeySeriesIterator(name, key)
		if itr != nil {
			a = append(a, itr)
		}
	}
	return MergeSeriesIterators(a...)
}

// TagValueIterator returns a value iterator for a tag key.
func (i *Index) TagValueIterator(name, key []byte) TagValueIterator {
	a := make([]TagValueIterator, 0, i.FileN())
	for _, f := range i.files() {
		itr := f.TagValueIterator(name, key)
		if itr != nil {
			a = append(a, itr)
		}
	}
	return MergeTagValueIterators(a...)
}

// TagValueSeriesIterator returns a series iterator for a single tag value.
func (i *Index) TagValueSeriesIterator(name, key, value []byte) SeriesIterator {
	a := make([]SeriesIterator, 0, i.FileN())
	for _, f := range i.files() {
		itr := f.TagValueSeriesIterator(name, key, value)
		if itr != nil {
			a = append(a, itr)
		}
	}
	return MergeSeriesIterators(a...)
}

// MatchTagValueSeriesIterator returns a series iterator for tags which match value.
// If matches is false, returns iterators which do not match value.
func (i *Index) MatchTagValueSeriesIterator(name, key []byte, value *regexp.Regexp, matches bool) SeriesIterator {
	matchEmpty := value.MatchString("")

	if matches {
		if matchEmpty {
			return i.matchTagValueEqualEmptySeriesIterator(name, key, value)
		}
		return i.matchTagValueEqualNotEmptySeriesIterator(name, key, value)
	}

	if matchEmpty {
		return i.matchTagValueNotEqualEmptySeriesIterator(name, key, value)
	}
	return i.matchTagValueNotEqualNotEmptySeriesIterator(name, key, value)
}
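
// matchTagValueEqualEmptySeriesIterator handles a regex that matches the empty
// string: it returns every series in the measurement except those whose value
// for key exists but does not match the regex.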
func (i *Index) matchTagValueEqualEmptySeriesIterator(name, key []byte, value *regexp.Regexp) SeriesIterator {
	vitr := i.TagValueIterator(name, key)
	if vitr == nil {
		return i.MeasurementSeriesIterator(name)
	}

	var itrs []SeriesIterator
	for e := vitr.Next(); e != nil; e = vitr.Next() {
		if !value.Match(e.Value()) {
			itrs = append(itrs, i.TagValueSeriesIterator(name, key, e.Value()))
		}
	}

	return DifferenceSeriesIterators(
		i.MeasurementSeriesIterator(name),
		MergeSeriesIterators(itrs...),
	)
}
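
// matchTagValueEqualNotEmptySeriesIterator returns the series whose value for
// key matches the regex.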
func (i *Index) matchTagValueEqualNotEmptySeriesIterator(name, key []byte, value *regexp.Regexp) SeriesIterator {
	vitr := i.TagValueIterator(name, key)
	if vitr == nil {
		return nil
	}

	var itrs []SeriesIterator
	for e := vitr.Next(); e != nil; e = vitr.Next() {
		if value.Match(e.Value()) {
			itrs = append(itrs, i.TagValueSeriesIterator(name, key, e.Value()))
		}
	}
	return MergeSeriesIterators(itrs...)
}
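
// matchTagValueNotEqualEmptySeriesIterator returns the series whose value for
// key exists and does not match the regex.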
func (i *Index) matchTagValueNotEqualEmptySeriesIterator(name, key []byte, value *regexp.Regexp) SeriesIterator {
	vitr := i.TagValueIterator(name, key)
	if vitr == nil {
		return nil
	}

	var itrs []SeriesIterator
	for e := vitr.Next(); e != nil; e = vitr.Next() {
		if !value.Match(e.Value()) {
			itrs = append(itrs, i.TagValueSeriesIterator(name, key, e.Value()))
		}
	}
	return MergeSeriesIterators(itrs...)
}
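
// matchTagValueNotEqualNotEmptySeriesIterator returns every series in the
// measurement except those whose value for key matches the regex.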
func (i *Index) matchTagValueNotEqualNotEmptySeriesIterator(name, key []byte, value *regexp.Regexp) SeriesIterator {
	vitr := i.TagValueIterator(name, key)
	if vitr == nil {
		return i.MeasurementSeriesIterator(name)
	}

	var itrs []SeriesIterator
	for e := vitr.Next(); e != nil; e = vitr.Next() {
		if value.Match(e.Value()) {
			itrs = append(itrs, i.TagValueSeriesIterator(name, key, e.Value()))
		}
	}

	return DifferenceSeriesIterators(
		i.MeasurementSeriesIterator(name),
		MergeSeriesIterators(itrs...),
	)
}

// TagSets returns an ordered list of tag sets for a measurement by dimension
// and filtered by an optional conditional expression.
func (i *Index) TagSets(name []byte, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) {
	itr, err := i.MeasurementSeriesByExprIterator(name, condition)
	if err != nil {
		return nil, err
	} else if itr == nil {
		return nil, nil
	}

	// For every series, get the tag values for the requested tag keys i.e.
	// dimensions. This is the TagSet for that series. Series with the same
	// TagSet are then grouped together, because for the purpose of GROUP BY
	// they are part of the same composite series.
	tagSets := make(map[string]*influxql.TagSet, 64)

	for e := itr.Next(); e != nil; e = itr.Next() {
		tags := make(map[string]string, len(dimensions))

		// Build the TagSet for this series.
		for _, dim := range dimensions {
			tags[dim] = e.Tags().GetString(dim)
		}

		// Convert the TagSet to a string, so it can be added to a map
		// allowing TagSets to be handled as a set.
		tagsAsKey := tsdb.MarshalTags(tags)
		tagSet, ok := tagSets[string(tagsAsKey)]
		if !ok {
			// This TagSet is new, create a new entry for it.
			tagSet = &influxql.TagSet{
				Tags: tags,
				Key:  tagsAsKey,
			}
		}
		// Associate the series and filter with the Tagset.
		tagSet.AddFilter(string(SeriesElemKey(e)), e.Expr())

		// Ensure it's back in the map.
		tagSets[string(tagsAsKey)] = tagSet
	}

	// Sort the series in each tag set.
	for _, t := range tagSets {
		sort.Sort(t)
	}

	// The TagSets have been created, as a map of TagSets. Just send
	// the values back as a slice, sorting for consistency.
	sortedTagsSets := make([]*influxql.TagSet, 0, len(tagSets))
	for _, v := range tagSets {
		sortedTagsSets = append(sortedTagsSets, v)
	}
	sort.Sort(byTagKey(sortedTagsSets))

	return sortedTagsSets, nil
}

// MeasurementSeriesByExprIterator returns a series iterator for a measurement
// that is filtered by expr. If expr only contains time expressions then this
// call is equivalent to MeasurementSeriesIterator().
func (i *Index) MeasurementSeriesByExprIterator(name []byte, expr influxql.Expr) (SeriesIterator, error) {
	// Return all series for the measurement if there are no tag expressions.
	if expr == nil || influxql.OnlyTimeExpr(expr) {
		return i.MeasurementSeriesIterator(name), nil
	}
	return i.seriesByExprIterator(name, expr, i.fieldset.CreateFieldsIfNotExists(string(name)))
}
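
// seriesByExprIterator recursively evaluates expr and returns an iterator over
// the series matching it.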
func (i *Index) seriesByExprIterator(name []byte, expr influxql.Expr, mf *tsdb.MeasurementFields) (SeriesIterator, error) {
	switch expr := expr.(type) {
	case *influxql.BinaryExpr:
		switch expr.Op {
		case influxql.AND, influxql.OR:
			// Get the series IDs and filter expressions for the LHS.
			litr, err := i.seriesByExprIterator(name, expr.LHS, mf)
			if err != nil {
				return nil, err
			}

			// Get the series IDs and filter expressions for the RHS.
			ritr, err := i.seriesByExprIterator(name, expr.RHS, mf)
			if err != nil {
				return nil, err
			}

			// Intersect iterators if expression is "AND".
			if expr.Op == influxql.AND {
				return IntersectSeriesIterators(litr, ritr), nil
			}

			// Union iterators if expression is "OR".
			return UnionSeriesIterators(litr, ritr), nil

		default:
			return i.seriesByBinaryExprIterator(name, expr, mf)
		}

	case *influxql.ParenExpr:
		return i.seriesByExprIterator(name, expr.Expr, mf)

	default:
		return nil, nil
	}
}

// seriesByBinaryExprIterator returns a series iterator and a filtering expression.
func (i *Index) seriesByBinaryExprIterator(name []byte, n *influxql.BinaryExpr, mf *tsdb.MeasurementFields) (SeriesIterator, error) {
	// If this binary expression has another binary expression, then this
	// is some expression math and we should just pass it to the underlying query.
	if _, ok := n.LHS.(*influxql.BinaryExpr); ok {
		return newSeriesExprIterator(i.MeasurementSeriesIterator(name), n), nil
	} else if _, ok := n.RHS.(*influxql.BinaryExpr); ok {
		return newSeriesExprIterator(i.MeasurementSeriesIterator(name), n), nil
	}

	// Retrieve the variable reference from the correct side of the expression.
	key, ok := n.LHS.(*influxql.VarRef)
	value := n.RHS
	if !ok {
		key, ok = n.RHS.(*influxql.VarRef)
		if !ok {
			return nil, fmt.Errorf("invalid expression: %s", n.String())
		}
		value = n.LHS
	}

	// For time literals, return all series and "true" as the filter.
	if _, ok := value.(*influxql.TimeLiteral); ok || key.Val == "time" {
		return newSeriesExprIterator(i.MeasurementSeriesIterator(name), &influxql.BooleanLiteral{Val: true}), nil
	}

	// For fields, return all series from this measurement.
	if key.Val != "_name" && ((key.Type == influxql.Unknown && mf.HasField(key.Val)) || key.Type == influxql.AnyField || (key.Type != influxql.Tag && key.Type != influxql.Unknown)) {
		return newSeriesExprIterator(i.MeasurementSeriesIterator(name), n), nil
	} else if value, ok := value.(*influxql.VarRef); ok {
		// Check if the RHS is a variable and if it is a field.
		if value.Val != "_name" && ((value.Type == influxql.Unknown && mf.HasField(value.Val)) || key.Type == influxql.AnyField || (value.Type != influxql.Tag && value.Type != influxql.Unknown)) {
			return newSeriesExprIterator(i.MeasurementSeriesIterator(name), n), nil
		}
	}

	// Create iterator based on value type.
	switch value := value.(type) {
	case *influxql.StringLiteral:
		return i.seriesByBinaryExprStringIterator(name, []byte(key.Val), []byte(value.Val), n.Op)
	case *influxql.RegexLiteral:
		return i.seriesByBinaryExprRegexIterator(name, []byte(key.Val), value.Val, n.Op)
	case *influxql.VarRef:
		return i.seriesByBinaryExprVarRefIterator(name, []byte(key.Val), value, n.Op)
	default:
		if n.Op == influxql.NEQ || n.Op == influxql.NEQREGEX {
			return i.MeasurementSeriesIterator(name), nil
		}
		return nil, nil
	}
}
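
// seriesByBinaryExprStringIterator returns a series iterator for a string
// comparison against a tag key or the measurement name.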
func (i *Index) seriesByBinaryExprStringIterator(name, key, value []byte, op influxql.Token) (SeriesIterator, error) {
	// Special handling for "_name" to match measurement name.
	if bytes.Equal(key, []byte("_name")) {
		if (op == influxql.EQ && bytes.Equal(value, name)) || (op == influxql.NEQ && !bytes.Equal(value, name)) {
			return i.MeasurementSeriesIterator(name), nil
		}
		return nil, nil
	}

	if op == influxql.EQ {
		// Match a specific value.
		if len(value) != 0 {
			return i.TagValueSeriesIterator(name, key, value), nil
		}

		// Return all measurement series that have no values from this tag key.
		return DifferenceSeriesIterators(
			i.MeasurementSeriesIterator(name),
			i.TagKeySeriesIterator(name, key),
		), nil
	}

	// Return all measurement series without this tag value.
	if len(value) != 0 {
		return DifferenceSeriesIterators(
			i.MeasurementSeriesIterator(name),
			i.TagValueSeriesIterator(name, key, value),
		), nil
	}

	// Return all series across all values of this tag key.
	return i.TagKeySeriesIterator(name, key), nil
}
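
// seriesByBinaryExprRegexIterator returns a series iterator for a regex
// comparison against a tag key or the measurement name.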
func (i *Index) seriesByBinaryExprRegexIterator(name, key []byte, value *regexp.Regexp, op influxql.Token) (SeriesIterator, error) {
	// Special handling for "_name" to match measurement name.
	if bytes.Equal(key, []byte("_name")) {
		match := value.Match(name)
		if (op == influxql.EQREGEX && match) || (op == influxql.NEQREGEX && !match) {
			return newSeriesExprIterator(i.MeasurementSeriesIterator(name), &influxql.BooleanLiteral{Val: true}), nil
		}
		return nil, nil
	}
	return i.MatchTagValueSeriesIterator(name, key, value, op == influxql.EQREGEX), nil
}
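
// seriesByBinaryExprVarRefIterator returns a series iterator for a comparison
// between two tag keys.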
func (i *Index) seriesByBinaryExprVarRefIterator(name, key []byte, value *influxql.VarRef, op influxql.Token) (SeriesIterator, error) {
	if op == influxql.EQ {
		return IntersectSeriesIterators(
			i.TagKeySeriesIterator(name, key),
			i.TagKeySeriesIterator(name, []byte(value.Val)),
		), nil
	}

	return DifferenceSeriesIterators(
		i.TagKeySeriesIterator(name, key),
		i.TagKeySeriesIterator(name, []byte(value.Val)),
	), nil
}
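
// SetFieldName, RemoveShard, AssignShard, and UnassignShard are currently no-ops.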
func (i *Index) SetFieldName(measurement, name string)  {}
func (i *Index) RemoveShard(shardID uint64)             {}
func (i *Index) AssignShard(k string, shardID uint64)   {}
func (i *Index) UnassignShard(k string, shardID uint64) {}

// SeriesPointIterator returns an influxql iterator over all series.
func (i *Index) SeriesPointIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) {
	return newSeriesPointIterator(i, opt), nil
}

// File represents a log or index file.
type File interface {
	Measurement(name []byte) MeasurementElem
	Series(name []byte, tags models.Tags) SeriesElem

	// Tag key & value iteration.
	TagKeyIterator(name []byte) TagKeyIterator
	TagValueIterator(name, key []byte) TagValueIterator

	// Series iteration.
	SeriesIterator() SeriesIterator
	MeasurementSeriesIterator(name []byte) SeriesIterator
	TagKeySeriesIterator(name, key []byte) SeriesIterator
	TagValueSeriesIterator(name, key, value []byte) SeriesIterator
}

// FilterExprs represents a map of series IDs to filter expressions.
type FilterExprs map[uint64]influxql.Expr

// DeleteBoolLiteralTrues deletes all elements whose filter expression is a boolean literal true.
func (fe FilterExprs) DeleteBoolLiteralTrues() {
	for id, expr := range fe {
		if e, ok := expr.(*influxql.BooleanLiteral); ok && e.Val == true {
			delete(fe, id)
		}
	}
}

// Len returns the number of elements.
func (fe FilterExprs) Len() int {
	if fe == nil {
		return 0
	}
	return len(fe)
}

// seriesPointIterator adapts SeriesIterator to an influxql.Iterator.
type seriesPointIterator struct {
	index *Index
	mitr  MeasurementIterator
	sitr  SeriesIterator
	opt   influxql.IteratorOptions

	point influxql.FloatPoint // reusable point
}

// newSeriesPointIterator returns a new instance of seriesPointIterator.
func newSeriesPointIterator(index *Index, opt influxql.IteratorOptions) *seriesPointIterator {
	return &seriesPointIterator{
		index: index,
		mitr:  index.MeasurementIterator(),
		point: influxql.FloatPoint{
			Aux: make([]interface{}, len(opt.Aux)),
		},
		opt: opt,
	}
}

// Stats returns stats about the points processed.
func (itr *seriesPointIterator) Stats() influxql.IteratorStats { return influxql.IteratorStats{} }

// Close closes the iterator.
func (itr *seriesPointIterator) Close() error { return nil }

// Next emits the next point in the iterator.
func (itr *seriesPointIterator) Next() (*influxql.FloatPoint, error) {
	for {
		// Create new series iterator, if necessary.
		// Exit if there are no measurements remaining.
		if itr.sitr == nil {
			m := itr.mitr.Next()
			if m == nil {
				return nil, nil
			}

			sitr, err := itr.index.MeasurementSeriesByExprIterator(m.Name(), itr.opt.Condition)
			if err != nil {
				return nil, err
			} else if sitr == nil {
				continue
			}
			itr.sitr = sitr
		}

		// Read next series element.
		e := itr.sitr.Next()
		if e == nil {
			itr.sitr = nil
			continue
		}

		// Convert to a key.
		key := string(models.MakeKey(e.Name(), e.Tags()))

		// Write auxiliary fields.
		for i, f := range itr.opt.Aux {
			switch f.Val {
			case "key":
				itr.point.Aux[i] = key
			}
		}
		return &itr.point, nil
	}
}