influxdb/tsdb/index.go

294 lines
8.1 KiB
Go
Raw Normal View History

2016-09-14 14:15:23 +00:00
package tsdb
import (
2016-11-15 16:20:00 +00:00
"fmt"
"os"
2016-09-14 14:15:23 +00:00
"regexp"
2016-11-15 16:20:00 +00:00
"sort"
2017-11-27 14:52:18 +00:00
"sync"
2016-09-14 14:15:23 +00:00
"github.com/influxdata/influxdb/models"
2016-09-21 15:04:37 +00:00
"github.com/influxdata/influxdb/pkg/estimator"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxql"
"go.uber.org/zap"
2016-09-14 14:15:23 +00:00
)
type Index interface {
2016-10-21 15:48:00 +00:00
Open() error
Close() error
WithLogger(*zap.Logger)
2016-10-21 15:48:00 +00:00
2016-12-28 19:59:09 +00:00
MeasurementExists(name []byte) (bool, error)
2016-12-05 17:51:06 +00:00
MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error)
2016-11-11 16:25:53 +00:00
MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
2016-09-29 09:39:13 +00:00
DropMeasurement(name []byte) error
2016-12-15 15:31:18 +00:00
ForEachMeasurementName(fn func(name []byte) error) error
2016-09-14 14:15:23 +00:00
2017-03-24 22:27:16 +00:00
InitializeSeries(key, name []byte, tags models.Tags) error
2016-11-21 15:21:58 +00:00
CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
DropSeries(key []byte, ts int64) error
2016-09-14 14:15:23 +00:00
2016-09-23 13:33:47 +00:00
MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
2016-11-29 12:26:52 +00:00
SeriesN() int64
2016-09-14 14:15:23 +00:00
HasTagKey(name, key []byte) (bool, error)
TagSets(name []byte, options query.IteratorOptions) ([]*query.TagSet, error)
2016-12-06 17:30:41 +00:00
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error)
2016-12-05 17:51:06 +00:00
ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error
2017-03-24 15:48:10 +00:00
TagKeyCardinality(name, key []byte) int
2016-11-28 16:59:36 +00:00
// InfluxQL system iterators
MeasurementSeriesKeysByExprIterator(name []byte, condition influxql.Expr) (SeriesIDIterator, error)
2016-11-30 19:45:14 +00:00
MeasurementSeriesKeysByExpr(name []byte, condition influxql.Expr) ([][]byte, error)
2017-11-27 14:52:18 +00:00
SeriesIDIterator(opt query.IteratorOptions) (SeriesIDIterator, error)
2016-11-28 16:59:36 +00:00
// Sets a shared fieldset from the engine.
SetFieldSet(fs *MeasurementFieldSet)
2016-11-16 18:57:55 +00:00
2017-02-01 21:19:24 +00:00
// Creates hard links inside path for snapshotting.
SnapshotTo(path string) error
2016-11-16 18:57:55 +00:00
// To be removed w/ tsi1.
SetFieldName(measurement []byte, name string)
2016-12-19 16:57:05 +00:00
AssignShard(k string, shardID uint64)
UnassignShard(k string, shardID uint64, ts int64) error
2016-12-19 16:57:05 +00:00
RemoveShard(shardID uint64)
2017-02-09 17:59:14 +00:00
Type() string
Rebuild()
2016-09-14 14:15:23 +00:00
}
2016-11-15 16:20:00 +00:00
// SeriesElem represents a generic series element.
type SeriesElem interface {
Name() []byte
Tags() models.Tags
Deleted() bool
// InfluxQL expression associated with series during filtering.
Expr() influxql.Expr
}
// SeriesIterator represents a iterator over a list of series.
type SeriesIterator interface {
2017-11-27 14:52:18 +00:00
Next() (SeriesElem, error)
}
2017-11-15 23:09:25 +00:00
// NewSeriesIteratorAdapter returns an adapter for converting series ids to series.
func NewSeriesIteratorAdapter(sfile *SeriesFile, itr SeriesIDIterator) SeriesIterator {
return &seriesIteratorAdapter{
sfile: sfile,
itr: itr,
}
}
type seriesIteratorAdapter struct {
sfile *SeriesFile
itr SeriesIDIterator
}
2017-11-27 14:52:18 +00:00
func (itr *seriesIteratorAdapter) Next() (SeriesElem, error) {
elem, err := itr.itr.Next()
if err != nil {
return nil, err
} else if elem.SeriesID == 0 {
return nil, nil
2017-11-15 23:09:25 +00:00
}
name, tags := ParseSeriesKey(itr.sfile.SeriesKey(elem.SeriesID))
deleted := itr.sfile.IsDeleted(elem.SeriesID)
return &seriesElemAdapter{
name: name,
tags: tags,
deleted: deleted,
expr: elem.Expr,
2017-11-27 14:52:18 +00:00
}, nil
2017-11-15 23:09:25 +00:00
}
type seriesElemAdapter struct {
name []byte
tags models.Tags
deleted bool
expr influxql.Expr
}
func (e *seriesElemAdapter) Name() []byte { return e.name }
func (e *seriesElemAdapter) Tags() models.Tags { return e.tags }
func (e *seriesElemAdapter) Deleted() bool { return e.deleted }
func (e *seriesElemAdapter) Expr() influxql.Expr { return e.expr }
// SeriesIDElem represents a single series and optional expression.
type SeriesIDElem struct {
SeriesID uint64
Expr influxql.Expr
}
// SeriesIDElems represents a list of series id elements.
type SeriesIDElems []SeriesIDElem
func (a SeriesIDElems) Len() int { return len(a) }
func (a SeriesIDElems) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a SeriesIDElems) Less(i, j int) bool { return a[i].SeriesID < a[j].SeriesID }
// SeriesIDIterator represents a iterator over a list of series ids.
type SeriesIDIterator interface {
2017-11-27 14:52:18 +00:00
Next() (SeriesIDElem, error)
Close() error
}
2017-11-15 23:09:25 +00:00
// NewSeriesIDSliceIterator returns a SeriesIDIterator that iterates over a slice.
func NewSeriesIDSliceIterator(ids []uint64) *SeriesIDSliceIterator {
return &SeriesIDSliceIterator{ids: ids}
}
// SeriesIDSliceIterator iterates over a slice of series ids.
type SeriesIDSliceIterator struct {
ids []uint64
}
// Next returns the next series id in the slice.
2017-11-27 14:52:18 +00:00
func (itr *SeriesIDSliceIterator) Next() (SeriesIDElem, error) {
2017-11-15 23:09:25 +00:00
if len(itr.ids) == 0 {
2017-11-27 14:52:18 +00:00
return SeriesIDElem{}, nil
2017-11-15 23:09:25 +00:00
}
id := itr.ids[0]
itr.ids = itr.ids[1:]
2017-11-27 14:52:18 +00:00
return SeriesIDElem{SeriesID: id}, nil
}
func (itr *SeriesIDSliceIterator) Close() error { return nil }
// seriesQueryAdapterIterator adapts SeriesIDIterator to an influxql.Iterator.
type seriesQueryAdapterIterator struct {
once sync.Once
sfile *SeriesFile
itr SeriesIDIterator
fieldset *MeasurementFieldSet
opt query.IteratorOptions
point query.FloatPoint // reusable point
}
// NewSeriesQueryAdapterIterator returns a new instance of SeriesQueryAdapterIterator.
func NewSeriesQueryAdapterIterator(sfile *SeriesFile, itr SeriesIDIterator, fieldset *MeasurementFieldSet, opt query.IteratorOptions) query.Iterator {
return &seriesQueryAdapterIterator{
sfile: sfile,
itr: itr,
fieldset: fieldset,
point: query.FloatPoint{
Aux: make([]interface{}, len(opt.Aux)),
},
opt: opt,
}
}
// Stats returns stats about the points processed.
func (itr *seriesQueryAdapterIterator) Stats() query.IteratorStats { return query.IteratorStats{} }
// Close closes the iterator.
func (itr *seriesQueryAdapterIterator) Close() error {
itr.once.Do(func() {
itr.itr.Close()
})
return nil
}
// Next emits the next point in the iterator.
func (itr *seriesQueryAdapterIterator) Next() (*query.FloatPoint, error) {
for {
// Read next series element.
e, err := itr.itr.Next()
if err != nil {
return nil, err
} else if e.SeriesID == 0 {
return nil, nil
}
// Convert to a key.
name, tags := ParseSeriesKey(itr.sfile.SeriesKey(e.SeriesID))
key := string(models.MakeKey(name, tags))
// Write auxiliary fields.
for i, f := range itr.opt.Aux {
switch f.Val {
case "key":
itr.point.Aux[i] = key
}
}
return &itr.point, nil
}
2017-11-15 23:09:25 +00:00
}
2016-11-15 16:20:00 +00:00
// IndexFormat represents the format for an index.
type IndexFormat int
const (
// InMemFormat is the format used by the original in-memory shared index.
InMemFormat IndexFormat = 1
// TSI1Format is the format used by the tsi1 index.
TSI1Format IndexFormat = 2
)
// NewIndexFunc creates a new index.
2017-11-15 23:09:25 +00:00
type NewIndexFunc func(id uint64, database, path string, sfile *SeriesFile, options EngineOptions) Index
2016-11-15 16:20:00 +00:00
// newIndexFuncs is a lookup of index constructors by name.
var newIndexFuncs = make(map[string]NewIndexFunc)
// RegisterIndex registers a storage index initializer by name.
func RegisterIndex(name string, fn NewIndexFunc) {
if _, ok := newIndexFuncs[name]; ok {
panic("index already registered: " + name)
}
newIndexFuncs[name] = fn
}
// RegisteredIndexs returns the slice of currently registered indexes.
func RegisteredIndexes() []string {
a := make([]string, 0, len(newIndexFuncs))
for k := range newIndexFuncs {
a = append(a, k)
}
sort.Strings(a)
return a
}
// NewIndex returns an instance of an index based on its format.
// If the path does not exist then the DefaultFormat is used.
2017-11-15 23:09:25 +00:00
func NewIndex(id uint64, database, path string, sfile *SeriesFile, options EngineOptions) (Index, error) {
format := options.IndexVersion
// Use default format unless existing directory exists.
_, err := os.Stat(path)
if os.IsNotExist(err) {
// nop, use default
} else if err != nil {
return nil, err
} else if err == nil {
format = "tsi1"
2016-11-15 16:20:00 +00:00
}
// Lookup index by format.
fn := newIndexFuncs[format]
if fn == nil {
return nil, fmt.Errorf("invalid index format: %q", format)
}
2017-11-15 23:09:25 +00:00
return fn(id, database, path, sfile, options), nil
2016-11-15 16:20:00 +00:00
}
2017-11-15 23:09:25 +00:00
func MustOpenIndex(id uint64, database, path string, sfile *SeriesFile, options EngineOptions) Index {
idx, err := NewIndex(id, database, path, sfile, options)
2016-11-17 16:20:39 +00:00
if err != nil {
panic(err)
2017-03-21 18:21:48 +00:00
} else if err := idx.Open(); err != nil {
panic(err)
2016-11-17 16:20:39 +00:00
}
return idx
}