influxdb/tsdb/index.go

127 lines
3.6 KiB
Go
Raw Normal View History

2016-09-14 14:15:23 +00:00
package tsdb
import (
2016-11-15 16:20:00 +00:00
"fmt"
"os"
2016-09-14 14:15:23 +00:00
"regexp"
2016-11-15 16:20:00 +00:00
"sort"
2016-09-14 14:15:23 +00:00
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
2016-09-21 15:04:37 +00:00
"github.com/influxdata/influxdb/pkg/estimator"
2016-09-14 14:15:23 +00:00
)
type Index interface {
2016-10-21 15:48:00 +00:00
Open() error
Close() error
2016-12-28 19:59:09 +00:00
MeasurementExists(name []byte) (bool, error)
2016-12-05 17:51:06 +00:00
MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error)
2016-11-11 16:25:53 +00:00
MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
2016-09-29 09:39:13 +00:00
DropMeasurement(name []byte) error
2016-12-15 15:31:18 +00:00
ForEachMeasurementName(fn func(name []byte) error) error
2016-09-14 14:15:23 +00:00
2016-11-21 15:21:58 +00:00
CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
2017-02-01 18:51:29 +00:00
DropSeries(key []byte) error
2016-09-14 14:15:23 +00:00
2016-09-23 13:33:47 +00:00
SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
2016-11-29 12:26:52 +00:00
SeriesN() int64
2016-09-14 14:15:23 +00:00
2016-10-21 15:48:00 +00:00
Dereference(b []byte)
2016-11-10 15:45:27 +00:00
HasTagKey(name, key []byte) (bool, error)
2016-11-28 16:59:36 +00:00
TagSets(name []byte, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error)
2016-12-06 17:30:41 +00:00
MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
2016-12-05 17:51:06 +00:00
ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error
2017-03-24 15:48:10 +00:00
TagKeyCardinality(name, key []byte) int
2016-11-28 16:59:36 +00:00
// InfluxQL system iterators
2016-11-30 19:45:14 +00:00
MeasurementSeriesKeysByExpr(name []byte, condition influxql.Expr) ([][]byte, error)
2016-12-05 17:51:06 +00:00
ForEachMeasurementSeriesByExpr(name []byte, expr influxql.Expr, fn func(tags models.Tags) error) error
2016-11-28 16:59:36 +00:00
SeriesPointIterator(opt influxql.IteratorOptions) (influxql.Iterator, error)
// Sets a shared fieldset from the engine.
SetFieldSet(fs *MeasurementFieldSet)
2016-11-16 18:57:55 +00:00
2017-02-01 21:19:24 +00:00
// Creates hard links inside path for snapshotting.
SnapshotTo(path string) error
2016-11-16 18:57:55 +00:00
// To be removed w/ tsi1.
SetFieldName(measurement, name string)
2016-12-19 16:57:05 +00:00
AssignShard(k string, shardID uint64)
2016-12-20 15:14:15 +00:00
UnassignShard(k string, shardID uint64) error
2016-12-19 16:57:05 +00:00
RemoveShard(shardID uint64)
2017-02-09 17:59:14 +00:00
Type() string
2016-09-14 14:15:23 +00:00
}
2016-11-15 16:20:00 +00:00
// IndexFormat represents the format for an index.
type IndexFormat int
const (
// InMemFormat is the format used by the original in-memory shared index.
InMemFormat IndexFormat = 1
// TSI1Format is the format used by the tsi1 index.
TSI1Format IndexFormat = 2
)
// NewIndexFunc creates a new index.
2016-11-16 18:57:55 +00:00
type NewIndexFunc func(id uint64, path string, options EngineOptions) Index
2016-11-15 16:20:00 +00:00
// newIndexFuncs is a lookup of index constructors by name.
var newIndexFuncs = make(map[string]NewIndexFunc)
// RegisterIndex registers a storage index initializer by name.
func RegisterIndex(name string, fn NewIndexFunc) {
if _, ok := newIndexFuncs[name]; ok {
panic("index already registered: " + name)
}
newIndexFuncs[name] = fn
}
// RegisteredIndexs returns the slice of currently registered indexes.
func RegisteredIndexes() []string {
a := make([]string, 0, len(newIndexFuncs))
for k := range newIndexFuncs {
a = append(a, k)
}
sort.Strings(a)
return a
}
// NewIndex returns an instance of an index based on its format.
// If the path does not exist then the DefaultFormat is used.
2016-11-16 18:57:55 +00:00
func NewIndex(id uint64, path string, options EngineOptions) (Index, error) {
format := options.IndexVersion
// Use default format unless existing directory exists.
_, err := os.Stat(path)
if os.IsNotExist(err) {
// nop, use default
} else if err != nil {
return nil, err
} else if err == nil {
format = "tsi1"
2016-11-15 16:20:00 +00:00
}
// Lookup index by format.
fn := newIndexFuncs[format]
if fn == nil {
return nil, fmt.Errorf("invalid index format: %q", format)
}
return fn(id, path, options), nil
}
2017-03-21 18:21:48 +00:00
func MustOpenIndex(id uint64, path string, options EngineOptions) Index {
2016-11-17 16:20:39 +00:00
idx, err := NewIndex(id, path, options)
if err != nil {
panic(err)
2017-03-21 18:21:48 +00:00
} else if err := idx.Open(); err != nil {
panic(err)
2016-11-17 16:20:39 +00:00
}
return idx
}