Merge pull request #1170 from zhulongcheng/rm-index
refactor(tsdb): remove tsdb.Index and tsdb.IndexSet (branch ref: pull/10616/head)

commit 46a7b8155a
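Note: this commit collapses the tsdb.Index interface and the tsdb.IndexSet wrapper onto the one concrete implementation, tsi1. A condensed before/after of the calling convention, lifted from the hunks below (identifiers exactly as they appear there):

    // Before: callers built a one-element IndexSet around the engine's index.
    indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
    tagSets, err := indexSet.TagSets(e.sfile, []byte(measurement), opt)

    // After: the engine's index is a *tsi1.Index and is called directly.
    tagSets, err := e.index.TagSets([]byte(measurement), opt)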
@@ -115,7 +115,7 @@ func (cmd *Command) run(dataDir, walDir string) error {
 func (cmd *Command) processDatabase(dbName, dataDir, walDir string) error {
     cmd.Logger.Info("Rebuilding database", zap.String("name", dbName))
 
-    sfile := tsdb.NewSeriesFile(filepath.Join(dataDir, tsdb.SeriesFileDirectory))
+    sfile := tsdb.NewSeriesFile(filepath.Join(dataDir, tsdb.DefaultSeriesFileDirectory))
     sfile.Logger = cmd.Logger
     if err := sfile.Open(); err != nil {
         return err
@@ -131,7 +131,7 @@ func (cmd *Command) processDatabase(dbName, dataDir, walDir string) error {
         rpName := fi.Name()
         if !fi.IsDir() {
             continue
-        } else if rpName == tsdb.SeriesFileDirectory {
+        } else if rpName == tsdb.DefaultSeriesFileDirectory {
             continue
         } else if cmd.retentionFilter != "" && rpName != cmd.retentionFilter {
             continue
@@ -98,7 +98,7 @@ func NewEngine(path string, c Config, options ...Option) *Engine {
     e := &Engine{
         config: c,
         path:   path,
-        sfile:  tsdb.NewSeriesFile(filepath.Join(path, tsdb.SeriesFileDirectory)),
+        sfile:  tsdb.NewSeriesFile(filepath.Join(path, tsdb.DefaultSeriesFileDirectory)),
         logger: zap.NewNop(),
     }
 
@@ -110,7 +110,7 @@ func NewEngine(path string, c Config, options ...Option) *Engine {
 
     // Initialise Engine
     // TODO(edd): should just be able to use the config values for data/wal.
-    engine := tsm1.NewEngine(0, tsdb.Index(e.index), filepath.Join(path, "data"), filepath.Join(path, "wal"), e.sfile, c.EngineOptions)
+    engine := tsm1.NewEngine(0, e.index, filepath.Join(path, "data"), filepath.Join(path, "wal"), e.sfile, c.EngineOptions)
 
     // TODO(edd): Once the tsdb.Engine abstraction is gone, this won't be needed.
     e.engine = engine.(*tsm1.Engine)
@@ -257,8 +257,7 @@ func (e *Engine) CreateSeriesCursor(ctx context.Context, req SeriesCursorRequest
     if e.closing == nil {
         return nil, ErrEngineClosed
     }
-    // TODO(edd): remove IndexSet
-    return newSeriesCursor(req, tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}, cond)
+    return newSeriesCursor(req, e.index, cond)
 }
 
 func (e *Engine) CreateCursorIterator(ctx context.Context) (tsdb.CursorIterator, error) {
@@ -8,6 +8,7 @@ import (
     "github.com/influxdata/influxql"
     "github.com/influxdata/platform/models"
     "github.com/influxdata/platform/tsdb"
+    "github.com/influxdata/platform/tsdb/tsi1"
 )
 
 type SeriesCursor interface {
@@ -19,10 +20,10 @@ type SeriesCursorRequest struct {
     Measurements tsdb.MeasurementIterator
 }
 
-// seriesCursor is an implementation of SeriesCursor over an IndexSet.
+// seriesCursor is an implementation of SeriesCursor over an tsi1.Index.
 type seriesCursor struct {
     once     sync.Once
-    indexSet tsdb.IndexSet
+    index    *tsi1.Index
     mitr     tsdb.MeasurementIterator
     keys     [][]byte
     ofs      int
@@ -36,7 +37,7 @@ type SeriesCursorRow struct {
 }
 
 // newSeriesCursor returns a new instance of SeriesCursor.
-func newSeriesCursor(req SeriesCursorRequest, indexSet tsdb.IndexSet, cond influxql.Expr) (_ SeriesCursor, err error) {
+func newSeriesCursor(req SeriesCursorRequest, index *tsi1.Index, cond influxql.Expr) (_ SeriesCursor, err error) {
     // Only equality operators are allowed.
     influxql.WalkFunc(cond, func(node influxql.Node) {
         switch n := node.(type) {
@@ -54,14 +55,14 @@ func newSeriesCursor(req SeriesCursorRequest, indexSet tsdb.IndexSet, cond influ
 
     mitr := req.Measurements
     if mitr == nil {
-        mitr, err = indexSet.MeasurementIterator()
+        mitr, err = index.MeasurementIterator()
         if err != nil {
             return nil, err
         }
     }
 
     return &seriesCursor{
-        indexSet: indexSet,
+        index: index,
         mitr:     mitr,
         cond:     cond,
     }, nil
@@ -103,7 +104,7 @@ func (cur *seriesCursor) Next() (*SeriesCursorRow, error) {
 }
 
 func (cur *seriesCursor) readSeriesKeys(name []byte) error {
-    sitr, err := cur.indexSet.MeasurementSeriesByExprIterator(name, cur.cond)
+    sitr, err := cur.index.MeasurementSeriesByExprIterator(name, cur.cond)
     if err != nil {
         return err
     } else if sitr == nil {
@@ -122,7 +123,7 @@ func (cur *seriesCursor) readSeriesKeys(name []byte) error {
             break
         }
 
-        key := cur.indexSet.SeriesFile.SeriesKey(elem.SeriesID)
+        key := cur.index.SeriesFile().SeriesKey(elem.SeriesID)
         if len(key) == 0 {
             continue
         }
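Note: the cursor's key-reading path after this change works like the following sketch (a hypothetical standalone helper mirroring readSeriesKeys above; idx stands in for an opened *tsi1.Index):

    package example

    import (
        "github.com/influxdata/platform/tsdb/tsi1"
    )

    // readSeriesKeysSketch: pull a series-ID iterator from the tsi1 index,
    // then resolve each live ID to its key through the index's series file.
    func readSeriesKeysSketch(idx *tsi1.Index, name []byte) ([][]byte, error) {
        sitr, err := idx.MeasurementSeriesByExprIterator(name, nil)
        if err != nil || sitr == nil {
            return nil, err
        }
        defer sitr.Close()

        var keys [][]byte
        for {
            elem, err := sitr.Next()
            if err != nil {
                return nil, err
            } else if elem.SeriesID.IsZero() {
                break
            }
            // A zero-length key means the series was tombstoned; skip it.
            if key := idx.SeriesFile().SeriesKey(elem.SeriesID); len(key) != 0 {
                keys = append(keys, key)
            }
        }
        return keys, nil
    }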
@@ -26,6 +26,7 @@ const ( // See the defaults package for explanations of what these mean
     DefaultCompactThroughputBurst   = defaults.DefaultCompactThroughputBurst
     DefaultMaxPointsPerBlock        = defaults.DefaultMaxPointsPerBlock
     DefaultMaxConcurrentCompactions = defaults.DefaultMaxConcurrentCompactions
+    DefaultSeriesFileDirectory      = defaults.DefaultSeriesFileDirectory
 )
 
 // Config holds the configuration for the tsbd package.
@@ -105,17 +106,6 @@ func (c *Config) Validate() error {
     }
 
-    valid := false
-    for _, e := range RegisteredEngines() {
-        if e == c.Engine {
-            valid = true
-            break
-        }
-    }
-    if !valid {
-        return fmt.Errorf("unrecognized engine %s", c.Engine)
-    }
-
     valid = false
     for _, e := range RegisteredIndexes() {
         if e == c.Index {
             valid = true
@@ -48,4 +48,9 @@ const (
     // DefaultMaxConcurrentCompactions is the maximum number of concurrent full and level compactions
     // that can run at one time. A value of 0 results in 50% of runtime.GOMAXPROCS(0) used at runtime.
     DefaultMaxConcurrentCompactions = 0
+
+    // DefaultSeriesFileDirectory is the name of the directory containing series files for
+    // a database.
+    DefaultSeriesFileDirectory = "_series"
+
 )
@@ -3,12 +3,9 @@ package tsdb
 import (
     "context"
     "errors"
-    "fmt"
     "io"
-    "os"
     "regexp"
     "runtime"
-    "sort"
     "time"
 
     "github.com/influxdata/influxql"
@@ -79,67 +76,6 @@ type SeriesIDSets interface {
     ForEach(f func(ids *SeriesIDSet)) error
 }
 
-// NewEngineFunc creates a new engine.
-type NewEngineFunc func(id uint64, i Index, path string, walPath string, sfile *SeriesFile, options EngineOptions) Engine
-
-// newEngineFuncs is a lookup of engine constructors by name.
-var newEngineFuncs = make(map[string]NewEngineFunc)
-
-// RegisterEngine registers a storage engine initializer by name.
-func RegisterEngine(name string, fn NewEngineFunc) {
-    if _, ok := newEngineFuncs[name]; ok {
-        panic("engine already registered: " + name)
-    }
-    newEngineFuncs[name] = fn
-}
-
-// RegisteredEngines returns the slice of currently registered engines.
-func RegisteredEngines() []string {
-    a := make([]string, 0, len(newEngineFuncs))
-    for k := range newEngineFuncs {
-        a = append(a, k)
-    }
-    sort.Strings(a)
-    return a
-}
-
-// NewEngine returns an instance of an engine based on its format.
-// If the path does not exist then the DefaultFormat is used.
-func NewEngine(id uint64, i Index, path string, sfile *SeriesFile, options EngineOptions) (Engine, error) {
-    // Create a new engine
-    if _, err := os.Stat(path); os.IsNotExist(err) {
-        // TODO(jeff): remove walPath argument
-        engine := newEngineFuncs[options.EngineVersion](id, i, path, "", sfile, options)
-        if options.OnNewEngine != nil {
-            options.OnNewEngine(engine)
-        }
-        return engine, nil
-    }
-
-    // If it's a dir then it's a tsm1 engine
-    format := DefaultEngine
-    if fi, err := os.Stat(path); err != nil {
-        return nil, err
-    } else if !fi.Mode().IsDir() {
-        return nil, ErrUnknownEngineFormat
-    } else {
-        format = "tsm1"
-    }
-
-    // Lookup engine by format.
-    fn := newEngineFuncs[format]
-    if fn == nil {
-        return nil, fmt.Errorf("invalid engine format: %q", format)
-    }
-
-    // TODO(jeff): remove walPath argument
-    engine := fn(id, i, path, "", sfile, options)
-    if options.OnNewEngine != nil {
-        options.OnNewEngine(engine)
-    }
-    return engine, nil
-}
-
 // EngineOptions represents the options used to initialize the engine.
 type EngineOptions struct {
     EngineVersion string
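Note: with the registry gone there is no longer a string-keyed lookup from engine name to constructor; the one remaining engine is constructed directly, as the storage/engine hunk earlier in this diff already does:

    // Direct construction (copied from the storage hunk above); previously this
    // went through tsdb.NewEngine and the newEngineFuncs registry.
    engine := tsm1.NewEngine(0, e.index, filepath.Join(path, "data"), filepath.Join(path, "wal"), e.sfile, c.EngineOptions)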
tsdb/index.go (1466 changes): file diff suppressed because it is too large.
@@ -14,7 +14,6 @@ import (
     "github.com/influxdata/influxql"
     "github.com/influxdata/platform/logger"
     "github.com/influxdata/platform/models"
-    "github.com/influxdata/platform/pkg/slices"
     "github.com/influxdata/platform/tsdb"
     "github.com/influxdata/platform/tsdb/tsi1"
 )
@@ -62,51 +61,6 @@ func TestMergeSeriesIDIterators(t *testing.T) {
     }
 }
 
-func TestIndexSet_MeasurementNamesByExpr(t *testing.T) {
-    // Setup indexes
-    indexes := map[string]*Index{}
-    for _, name := range tsdb.RegisteredIndexes() {
-        idx := MustOpenNewIndex(tsi1.NewConfig())
-        idx.AddSeries("cpu", map[string]string{"region": "east"}, models.Integer)
-        idx.AddSeries("cpu", map[string]string{"region": "west", "secret": "foo"}, models.Integer)
-        idx.AddSeries("disk", map[string]string{"secret": "foo"}, models.Integer)
-        idx.AddSeries("mem", map[string]string{"region": "west"}, models.Integer)
-        idx.AddSeries("gpu", map[string]string{"region": "east"}, models.Integer)
-        idx.AddSeries("pci", map[string]string{"region": "east", "secret": "foo"}, models.Integer)
-        indexes[name] = idx
-        defer idx.Close()
-    }
-
-    type example struct {
-        name     string
-        expr     influxql.Expr
-        expected [][]byte
-    }
-
-    examples := []example{
-        {name: "all", expected: slices.StringsToBytes("cpu", "disk", "gpu", "mem", "pci")},
-        {name: "EQ", expr: influxql.MustParseExpr(`region = 'west'`), expected: slices.StringsToBytes("cpu", "mem")},
-        {name: "NEQ", expr: influxql.MustParseExpr(`region != 'west'`), expected: slices.StringsToBytes("gpu", "pci")},
-        {name: "EQREGEX", expr: influxql.MustParseExpr(`region =~ /.*st/`), expected: slices.StringsToBytes("cpu", "gpu", "mem", "pci")},
-        {name: "NEQREGEX", expr: influxql.MustParseExpr(`region !~ /.*est/`), expected: slices.StringsToBytes("gpu", "pci")},
-    }
-
-    for _, idx := range tsdb.RegisteredIndexes() {
-        t.Run(idx, func(t *testing.T) {
-            for _, example := range examples {
-                t.Run(example.name, func(t *testing.T) {
-                    names, err := indexes[idx].IndexSet().MeasurementNamesByExpr(nil, example.expr)
-                    if err != nil {
-                        t.Fatal(err)
-                    } else if !reflect.DeepEqual(names, example.expected) {
-                        t.Fatalf("got names: %v, expected %v", slices.BytesToStrings(names), slices.BytesToStrings(example.expected))
-                    }
-                })
-            }
-        })
-    }
-}
-
 // Index wraps a series file and index.
 type Index struct {
     rootPath string
@@ -126,7 +80,7 @@ func MustNewIndex(c tsi1.Config) *Index {
     panic(err)
     }
 
-    seriesPath, err := ioutil.TempDir(rootPath, tsdb.SeriesFileDirectory)
+    seriesPath, err := ioutil.TempDir(rootPath, tsdb.DefaultSeriesFileDirectory)
     if err != nil {
         panic(err)
     }
@@ -166,10 +120,6 @@ func (i *Index) MustOpen() {
     }
 }
 
-func (idx *Index) IndexSet() *tsdb.IndexSet {
-    return &tsdb.IndexSet{Indexes: []tsdb.Index{idx.Index}, SeriesFile: idx.sfile}
-}
-
 func (idx *Index) AddSeries(name string, tags map[string]string, typ models.FieldType) error {
     t := models.NewTags(tags)
     key := fmt.Sprintf("%s,%s", name, t.HashKey())
@@ -215,8 +165,8 @@ func (i *Index) Close() error {
 //
 // Typical results on an i7 laptop.
 //
-// BenchmarkIndexSet_TagSets/1M_series/tsi1-8  100  18995530 ns/op  5221180 B/op  20379 allocs/op
-func BenchmarkIndexSet_TagSets(b *testing.B) {
+// BenchmarkIndex_TagSets/1M_series/tsi1-8  100  18995530 ns/op  5221180 B/op  20379 allocs/op
+func BenchmarkIndex_TagSets(b *testing.B) {
     // Read line-protocol and coerce into tsdb format.
     // 1M series generated with:
     // $inch -b 10000 -c 1 -t 10,10,10,10,10,10 -f 1 -m 5 -p 1
@@ -269,14 +219,10 @@ func BenchmarkIndexSet_TagSets(b *testing.B) {
 
     name := []byte("m4")
     opt := query.IteratorOptions{Condition: influxql.MustParseExpr(`"tag5"::tag = 'value0'`)}
-    indexSet := tsdb.IndexSet{
-        SeriesFile: idx.sfile,
-        Indexes:    []tsdb.Index{idx.Index},
-    } // For TSI implementation
 
     var ts func() ([]*query.TagSet, error)
     ts = func() ([]*query.TagSet, error) {
-        return indexSet.TagSets(idx.sfile, name, opt)
+        return idx.Index.TagSets(name, opt)
     }
 
     b.Run(indexType, func(b *testing.B) {
tsdb/shard.go (file deleted, 305 lines):

@@ -1,305 +0,0 @@
-package tsdb
-
-import (
-    "context"
-    "fmt"
-    "path/filepath"
-    "sync"
-
-    "github.com/influxdata/platform/models"
-    "go.uber.org/zap"
-)
-
-// SeriesFileDirectory is the name of the directory containing series files for
-// a database.
-const SeriesFileDirectory = "_series"
-
-var (
-    // Static objects to prevent small allocs.
-    timeBytes = []byte("time")
-)
-
-// Shard represents a self-contained time series database. An inverted index of
-// the measurement and tag data is kept along with the raw time series data.
-// Data can be split across many shards. The query engine in TSDB is responsible
-// for combining the output of many shards into a single query result.
-type Shard struct {
-    path string
-    id   uint64
-
-    sfile   *SeriesFile
-    options EngineOptions
-
-    mu      sync.RWMutex
-    _engine Engine
-    index   Index
-    enabled bool
-
-    baseLogger *zap.Logger
-    logger     *zap.Logger
-
-    EnableOnOpen bool
-
-    // CompactionDisabled specifies the shard should not schedule compactions.
-    // This option is intended for offline tooling.
-    CompactionDisabled bool
-}
-
-// NewShard returns a new initialized Shard.
-func NewShard(id uint64, path string, sfile *SeriesFile, opt EngineOptions) *Shard {
-    logger := zap.NewNop()
-
-    s := &Shard{
-        id:      id,
-        path:    path,
-        sfile:   sfile,
-        options: opt,
-
-        logger:       logger,
-        baseLogger:   logger,
-        EnableOnOpen: true,
-    }
-    return s
-}
-
-// WithLogger sets the logger on the shard. It must be called before Open.
-func (s *Shard) WithLogger(log *zap.Logger) {
-    s.baseLogger = log
-    engine, err := s.Engine()
-    if err == nil {
-        engine.WithLogger(s.baseLogger)
-        s.index.WithLogger(s.baseLogger)
-    }
-    s.logger = s.baseLogger.With(zap.String("service", "shard"))
-}
-
-// SetEnabled enables the shard for queries and write. When disabled, all
-// writes and queries return an error and compactions are stopped for the shard.
-func (s *Shard) SetEnabled(enabled bool) {
-    s.mu.Lock()
-    // Prevent writes and queries
-    s.enabled = enabled
-    if s._engine != nil && !s.CompactionDisabled {
-        // Disable background compactions and snapshotting
-        s._engine.SetEnabled(enabled)
-    }
-    s.mu.Unlock()
-}
-
-// ID returns the shards ID.
-func (s *Shard) ID() uint64 {
-    return s.id
-}
-
-// Path returns the path set on the shard when it was created.
-func (s *Shard) Path() string { return s.path }
-
-// Open initializes and opens the shard's store.
-func (s *Shard) Open() error {
-    if err := func() error {
-        s.mu.Lock()
-        defer s.mu.Unlock()
-
-        // Return if the shard is already open
-        if s._engine != nil {
-            return nil
-        }
-
-        seriesIDSet := NewSeriesIDSet()
-
-        // Initialize underlying index.
-        ipath := filepath.Join(s.path, "index")
-        idx, err := NewIndex(s.id, "remove-me", ipath, seriesIDSet, s.sfile, s.options)
-        if err != nil {
-            return err
-        }
-
-        idx.WithLogger(s.baseLogger)
-
-        // Open index.
-        if err := idx.Open(); err != nil {
-            return err
-        }
-        s.index = idx
-
-        // Initialize underlying engine.
-        e, err := NewEngine(s.id, idx, s.path, s.sfile, s.options)
-        if err != nil {
-            return err
-        }
-
-        // Set log output on the engine.
-        e.WithLogger(s.baseLogger)
-
-        // Disable compactions while loading the index
-        e.SetEnabled(false)
-
-        // Open engine.
-        if err := e.Open(); err != nil {
-            return err
-        }
-
-        s._engine = e
-
-        return nil
-    }(); err != nil {
-        s.close()
-        return NewShardError(s.id, err)
-    }
-
-    if s.EnableOnOpen {
-        // enable writes, queries and compactions
-        s.SetEnabled(true)
-    }
-
-    return nil
-}
-
-// Close shuts down the shard's store.
-func (s *Shard) Close() error {
-    s.mu.Lock()
-    defer s.mu.Unlock()
-    return s.close()
-}
-
-// close closes the shard an removes reference to the shard from associated
-// indexes, unless clean is false.
-func (s *Shard) close() error {
-    if s._engine == nil {
-        return nil
-    }
-
-    err := s._engine.Close()
-    if err == nil {
-        s._engine = nil
-    }
-
-    if e := s.index.Close(); e == nil {
-        s.index = nil
-    }
-    return err
-}
-
-// ready determines if the Shard is ready for queries or writes.
-// It returns nil if ready, otherwise ErrShardClosed or ErrShardDisabled
-func (s *Shard) ready() error {
-    return nil // TODO(edd) remove
-}
-
-// Engine returns a reference to the currently loaded engine.
-func (s *Shard) Engine() (Engine, error) {
-    s.mu.RLock()
-    defer s.mu.RUnlock()
-    return s.engineNoLock()
-}
-
-// engineNoLock is similar to calling Engine(), but the caller must guarantee
-// that they already hold an appropriate lock.
-func (s *Shard) engineNoLock() (Engine, error) {
-    if err := s.ready(); err != nil {
-        return nil, err
-    }
-    return s._engine, nil
-}
-
-// Index returns a reference to the underlying index. It returns an error if
-// the index is nil.
-func (s *Shard) Index() (Index, error) {
-    s.mu.RLock()
-    defer s.mu.RUnlock()
-    if err := s.ready(); err != nil {
-        return nil, err
-    }
-    return s.index, nil
-}
-
-// WritePoints will write the raw data points and any new metadata to the index in the shard.
-func (s *Shard) WritePoints(points []models.Point) error {
-    collection := NewSeriesCollection(points)
-
-    j := 0
-    for iter := collection.Iterator(); iter.Next(); {
-        tags := iter.Tags()
-
-        // Filter out any tags with key equal to "time": they are invalid.
-        if tags.Get(timeBytes) != nil {
-            if collection.Reason == "" {
-                collection.Reason = fmt.Sprintf(
-                    "invalid tag key: input tag %q on measurement %q is invalid",
-                    timeBytes, iter.Name())
-            }
-            collection.Dropped++
-            collection.DroppedKeys = append(collection.DroppedKeys, iter.Key())
-            continue
-        }
-
-        // Drop any series with invalid unicode characters in the key.
-        if s.options.Config.ValidateKeys && !models.ValidKeyTokens(string(iter.Name()), tags) {
-            if collection.Reason == "" {
-                collection.Reason = fmt.Sprintf(
-                    "key contains invalid unicode: %q",
-                    iter.Key())
-            }
-            collection.Dropped++
-            collection.DroppedKeys = append(collection.DroppedKeys, iter.Key())
-            continue
-        }
-
-        // TODO(jeff): do we have to filter entries where the only field is "time"?
-
-        collection.Copy(j, iter.Index())
-        j++
-    }
-    collection.Truncate(j)
-
-    s.mu.RLock()
-    defer s.mu.RUnlock()
-
-    engine, err := s.engineNoLock()
-    if err != nil {
-        return err
-    }
-
-    // make sure the series exist
-    if err := engine.CreateSeriesListIfNotExists(collection); err != nil {
-        // ignore PartialWriteErrors. The collection captures it.
-        if _, ok := err.(PartialWriteError); !ok {
-            return err
-        }
-    }
-
-    // Write to the engine.
-    if err := engine.WritePoints(collection.Points); err != nil {
-        return fmt.Errorf("engine: %s", err)
-    }
-
-    return collection.PartialWriteError()
-}
-
-// DeleteSeriesRangeWithPredicate deletes all values from for seriesKeys between min and max (inclusive)
-// for which predicate() returns true. If predicate() is nil, then all values in range are deleted.
-func (s *Shard) DeleteSeriesRangeWithPredicate(itr SeriesIterator, predicate func(name []byte, tags models.Tags) (int64, int64, bool)) error {
-    engine, err := s.Engine()
-    if err != nil {
-        return err
-    }
-    return engine.DeleteSeriesRangeWithPredicate(itr, predicate)
-}
-
-// SeriesN returns the unique number of series in the shard.
-func (s *Shard) SeriesN() int64 {
-    engine, err := s.Engine()
-    if err != nil {
-        return 0
-    }
-    return engine.SeriesN()
-}
-
-// CreateCursorIterator creates a CursorIterator for the shard.
-func (s *Shard) CreateCursorIterator(ctx context.Context) (CursorIterator, error) {
-    engine, err := s.Engine()
-    if err != nil {
-        return nil, err
-    }
-    return engine.CreateCursorIterator(ctx)
-}
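Note: deleting tsdb/shard.go also deletes the SeriesFileDirectory constant that lived there; it survives as DefaultSeriesFileDirectory, backed by the defaults package (see the config hunks above), so call sites throughout this diff become:

    // From the hunks above: series-file paths are now built from the defaults-backed constant.
    sfile := tsdb.NewSeriesFile(filepath.Join(dataDir, tsdb.DefaultSeriesFileDirectory))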
@@ -13,7 +13,11 @@ import (
     "sync/atomic"
     "unsafe"
 
+    "bytes"
+    "sort"
+
     "github.com/cespare/xxhash"
+    "github.com/influxdata/influxdb/query"
     "github.com/influxdata/influxql"
     "github.com/influxdata/platform/models"
     "github.com/influxdata/platform/pkg/estimator"
@@ -534,8 +538,52 @@ func (i *Index) MeasurementIterator() (tsdb.MeasurementIterator, error) {
     return tsdb.MergeMeasurementIterators(itrs...), nil
 }
 
+// MeasurementSeriesIDIterator returns an iterator over all series in a measurement.
+func (i *Index) MeasurementSeriesByExprIterator(name []byte, expr influxql.Expr) (tsdb.SeriesIDIterator, error) {
+    return i.measurementSeriesByExprIterator(name, expr)
+}
+
+// measurementSeriesByExprIterator returns a series iterator for a measurement
+// that is filtered by expr. See MeasurementSeriesByExprIterator for more details.
+//
+// measurementSeriesByExprIterator guarantees to never take any locks on the
+// series file.
+func (i *Index) measurementSeriesByExprIterator(name []byte, expr influxql.Expr) (tsdb.SeriesIDIterator, error) {
+    // Return all series for the measurement if there are no tag expressions.
+
+    release := i.sfile.Retain()
+    defer release()
+
+    if expr == nil {
+        itr, err := i.measurementSeriesIDIterator(name)
+        if err != nil {
+            return nil, err
+        }
+        return tsdb.FilterUndeletedSeriesIDIterator(i.sfile, itr), nil
+    }
+
+    itr, err := i.seriesByExprIterator(name, expr)
+    if err != nil {
+        return nil, err
+    }
+
+    return tsdb.FilterUndeletedSeriesIDIterator(i.sfile, itr), nil
+}
+
+// MeasurementSeriesIDIterator returns an iterator over all non-tombstoned series
+// for the provided measurement.
+func (i *Index) MeasurementSeriesIDIterator(name []byte) (tsdb.SeriesIDIterator, error) {
+    itr, err := i.measurementSeriesIDIterator(name)
+    if err != nil {
+        return nil, err
+    }
+
+    release := i.sfile.Retain()
+    defer release()
+    return tsdb.FilterUndeletedSeriesIDIterator(i.sfile, itr), nil
+}
+
+// measurementSeriesIDIterator returns an iterator over all series in a measurement.
 func (i *Index) measurementSeriesIDIterator(name []byte) (tsdb.SeriesIDIterator, error) {
     itrs := make([]tsdb.SeriesIDIterator, 0, len(i.partitions))
     for _, p := range i.partitions {
         itr, err := p.MeasurementSeriesIDIterator(name)
@@ -945,6 +993,18 @@ func (i *Index) TagValueIterator(name, key []byte) (tsdb.TagValueIterator, error
 
 // TagKeySeriesIDIterator returns a series iterator for all values across a single key.
 func (i *Index) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator, error) {
+    release := i.sfile.Retain()
+    defer release()
+
+    itr, err := i.tagKeySeriesIDIterator(name, key)
+    if err != nil {
+        return nil, err
+    }
+    return tsdb.FilterUndeletedSeriesIDIterator(i.sfile, itr), nil
+}
+
+// tagKeySeriesIDIterator returns a series iterator for all values across a single key.
+func (i *Index) tagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator, error) {
     a := make([]tsdb.SeriesIDIterator, 0, len(i.partitions))
     for _, p := range i.partitions {
         itr := p.TagKeySeriesIDIterator(name, key)
@@ -957,6 +1017,18 @@ func (i *Index) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator,
 
 // TagValueSeriesIDIterator returns a series iterator for a single tag value.
 func (i *Index) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) {
+    release := i.sfile.Retain()
+    defer release()
+
+    itr, err := i.tagValueSeriesIDIterator(name, key, value)
+    if err != nil {
+        return nil, err
+    }
+    return tsdb.FilterUndeletedSeriesIDIterator(i.sfile, itr), nil
+}
+
+// tagValueSeriesIDIterator returns a series iterator for a single tag value.
+func (i *Index) tagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) {
     // Check series ID set cache...
     if EnableBitsetCache {
         if ss := i.tagValueCache.Get(name, key, value); ss != nil {
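Note: the exported iterator methods added above all follow one pattern: pin the series file so series cannot be deleted out from under the iterator, delegate to the lock-free private variant, and filter tombstoned series before handing the iterator back. Restating the added code with comments (not a new API):

    func (i *Index) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator, error) {
        release := i.sfile.Retain() // hold a reference for the duration of the call
        defer release()

        itr, err := i.tagKeySeriesIDIterator(name, key) // private variant: no series-file locks
        if err != nil {
            return nil, err
        }
        // Hide series that were deleted but not yet compacted away.
        return tsdb.FilterUndeletedSeriesIDIterator(i.sfile, itr), nil
    }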
@@ -989,6 +1061,114 @@ func (i *Index) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesID
     return itr, nil
 }
 
+func (i *Index) TagSets(name []byte, opt query.IteratorOptions) ([]*query.TagSet, error) {
+    release := i.sfile.Retain()
+    defer release()
+
+    itr, err := i.MeasurementSeriesByExprIterator(name, opt.Condition)
+    if err != nil {
+        return nil, err
+    } else if itr == nil {
+        return nil, nil
+    }
+    defer itr.Close()
+    // measurementSeriesByExprIterator filters deleted series IDs; no need to
+    // do so here.
+
+    var dims []string
+    if len(opt.Dimensions) > 0 {
+        dims = make([]string, len(opt.Dimensions))
+        copy(dims, opt.Dimensions)
+        sort.Strings(dims)
+    }
+
+    // For every series, get the tag values for the requested tag keys i.e.
+    // dimensions. This is the TagSet for that series. Series with the same
+    // TagSet are then grouped together, because for the purpose of GROUP BY
+    // they are part of the same composite series.
+    tagSets := make(map[string]*query.TagSet, 64)
+    var seriesN, maxSeriesN int
+
+    if opt.MaxSeriesN > 0 {
+        maxSeriesN = opt.MaxSeriesN
+    } else {
+        maxSeriesN = int(^uint(0) >> 1)
+    }
+
+    // The tag sets require a string for each series key in the set, The series
+    // file formatted keys need to be parsed into models format. Since they will
+    // end up as strings we can re-use an intermediate buffer for this process.
+    var keyBuf []byte
+    var tagsBuf models.Tags // Buffer for tags. Tags are not needed outside of each loop iteration.
+    for {
+        se, err := itr.Next()
+        if err != nil {
+            return nil, err
+        } else if se.SeriesID.IsZero() {
+            break
+        }
+
+        // Skip if the series has been tombstoned.
+        key := i.sfile.SeriesKey(se.SeriesID)
+        if len(key) == 0 {
+            continue
+        }
+
+        if seriesN&0x3fff == 0x3fff {
+            // check every 16384 series if the query has been canceled
+            select {
+            case <-opt.InterruptCh:
+                return nil, query.ErrQueryInterrupted
+            default:
+            }
+        }
+
+        if seriesN > maxSeriesN {
+            return nil, fmt.Errorf("max-select-series limit exceeded: (%d/%d)", seriesN, opt.MaxSeriesN)
+        }
+
+        // NOTE - must not escape this loop iteration.
+        _, tagsBuf = tsdb.ParseSeriesKeyInto(key, tagsBuf)
+        var tagsAsKey []byte
+        if len(dims) > 0 {
+            tagsAsKey = tsdb.MakeTagsKey(dims, tagsBuf)
+        }
+
+        tagSet, ok := tagSets[string(tagsAsKey)]
+        if !ok {
+            // This TagSet is new, create a new entry for it.
+            tagSet = &query.TagSet{
+                Tags: nil,
+                Key:  tagsAsKey,
+            }
+        }
+
+        // Associate the series and filter with the Tagset.
+        keyBuf = models.AppendMakeKey(keyBuf, name, tagsBuf)
+        tagSet.AddFilter(string(keyBuf), se.Expr)
+        keyBuf = keyBuf[:0]
+
+        // Ensure it's back in the map.
+        tagSets[string(tagsAsKey)] = tagSet
+        seriesN++
+    }
+
+    // Sort the series in each tag set.
+    for _, t := range tagSets {
+        sort.Sort(t)
+    }
+
+    // The TagSets have been created, as a map of TagSets. Just send
+    // the values back as a slice, sorting for consistency.
+    sortedTagsSets := make([]*query.TagSet, 0, len(tagSets))
+    for _, v := range tagSets {
+        sortedTagsSets = append(sortedTagsSets, v)
+    }
+    sort.Sort(tsdb.ByTagKey(sortedTagsSets))
+
+    return sortedTagsSets, nil
+}
+
 // MeasurementTagKeysByExpr extracts the tag keys wanted by the expression.
 func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) {
     n := i.availableThreads()
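Note: usage of the new TagSets method matches the updated benchmark later in this diff: hand it the measurement name and iterator options carrying the GROUP BY condition. A sketch, assuming an opened *tsi1.Index named idx holding the benchmark's data (measurement "m4", tag "tag5"; the condition string is taken from that benchmark, and fmt/query/influxql imports are assumed):

    opt := query.IteratorOptions{Condition: influxql.MustParseExpr(`"tag5"::tag = 'value0'`)}
    tagSets, err := idx.TagSets([]byte("m4"), opt)
    if err != nil {
        return err
    }
    for _, ts := range tagSets {
        // ts.Key is the encoded GROUP BY tag combination; Len() counts its series.
        fmt.Printf("group %q: %d series\n", ts.Key, ts.Len())
    }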
@@ -1092,6 +1272,390 @@ func (i *Index) MeasurementCardinalityStats() MeasurementCardinalityStats {
     return stats
 }
 
+func (i *Index) seriesByExprIterator(name []byte, expr influxql.Expr) (tsdb.SeriesIDIterator, error) {
+    switch expr := expr.(type) {
+    case *influxql.BinaryExpr:
+        switch expr.Op {
+        case influxql.AND, influxql.OR:
+            // Get the series IDs and filter expressions for the LHS.
+            litr, err := i.seriesByExprIterator(name, expr.LHS)
+            if err != nil {
+                return nil, err
+            }
+
+            // Get the series IDs and filter expressions for the RHS.
+            ritr, err := i.seriesByExprIterator(name, expr.RHS)
+            if err != nil {
+                if litr != nil {
+                    litr.Close()
+                }
+                return nil, err
+            }
+
+            // Intersect iterators if expression is "AND".
+            if expr.Op == influxql.AND {
+                return tsdb.IntersectSeriesIDIterators(litr, ritr), nil
+            }
+
+            // Union iterators if expression is "OR".
+            return tsdb.UnionSeriesIDIterators(litr, ritr), nil
+
+        default:
+            return i.seriesByBinaryExprIterator(name, expr)
+        }
+
+    case *influxql.ParenExpr:
+        return i.seriesByExprIterator(name, expr.Expr)
+
+    case *influxql.BooleanLiteral:
+        if expr.Val {
+            return i.measurementSeriesIDIterator(name)
+        }
+        return nil, nil
+
+    default:
+        return nil, nil
+    }
+}
+
+// seriesByBinaryExprIterator returns a series iterator and a filtering expression.
+func (i *Index) seriesByBinaryExprIterator(name []byte, n *influxql.BinaryExpr) (tsdb.SeriesIDIterator, error) {
+    // If this binary expression has another binary expression, then this
+    // is some expression math and we should just pass it to the underlying query.
+    if _, ok := n.LHS.(*influxql.BinaryExpr); ok {
+        itr, err := i.measurementSeriesIDIterator(name)
+        if err != nil {
+            return nil, err
+        }
+        return tsdb.NewSeriesIDExprIterator(itr, n), nil
+    } else if _, ok := n.RHS.(*influxql.BinaryExpr); ok {
+        itr, err := i.measurementSeriesIDIterator(name)
+        if err != nil {
+            return nil, err
+        }
+        return tsdb.NewSeriesIDExprIterator(itr, n), nil
+    }
+
+    // Retrieve the variable reference from the correct side of the expression.
+    key, ok := n.LHS.(*influxql.VarRef)
+    value := n.RHS
+    if !ok {
+        key, ok = n.RHS.(*influxql.VarRef)
+        if !ok {
+            // This is an expression we do not know how to evaluate. Let the
+            // query engine take care of this.
+            itr, err := i.measurementSeriesIDIterator(name)
+            if err != nil {
+                return nil, err
+            }
+            return tsdb.NewSeriesIDExprIterator(itr, n), nil
+        }
+        value = n.LHS
+    }
+
+    // For fields, return all series from this measurement.
+    if key.Val != "_name" && (key.Type == influxql.AnyField || (key.Type != influxql.Tag && key.Type != influxql.Unknown)) {
+        itr, err := i.measurementSeriesIDIterator(name)
+        if err != nil {
+            return nil, err
+        }
+        return tsdb.NewSeriesIDExprIterator(itr, n), nil
+    } else if value, ok := value.(*influxql.VarRef); ok {
+        // Check if the RHS is a variable and if it is a field.
+        if value.Val != "_name" && (key.Type == influxql.AnyField || (value.Type != influxql.Tag && value.Type != influxql.Unknown)) {
+            itr, err := i.measurementSeriesIDIterator(name)
+            if err != nil {
+                return nil, err
+            }
+            return tsdb.NewSeriesIDExprIterator(itr, n), nil
+        }
+    }
+
+    // Create iterator based on value type.
+    switch value := value.(type) {
+    case *influxql.StringLiteral:
+        return i.seriesByBinaryExprStringIterator(name, []byte(key.Val), []byte(value.Val), n.Op)
+    case *influxql.RegexLiteral:
+        return i.seriesByBinaryExprRegexIterator(name, []byte(key.Val), value.Val, n.Op)
+    case *influxql.VarRef:
+        return i.seriesByBinaryExprVarRefIterator(name, []byte(key.Val), value, n.Op)
+    default:
+        // We do not know how to evaluate this expression so pass it
+        // on to the query engine.
+        itr, err := i.measurementSeriesIDIterator(name)
+        if err != nil {
+            return nil, err
+        }
+        return tsdb.NewSeriesIDExprIterator(itr, n), nil
+    }
+}
+
+func (i *Index) seriesByBinaryExprStringIterator(name, key, value []byte, op influxql.Token) (tsdb.SeriesIDIterator, error) {
+    // Special handling for "_name" to match measurement name.
+    if bytes.Equal(key, []byte("_name")) {
+        if (op == influxql.EQ && bytes.Equal(value, name)) || (op == influxql.NEQ && !bytes.Equal(value, name)) {
+            return i.measurementSeriesIDIterator(name)
+        }
+        return nil, nil
+    }
+
+    if op == influxql.EQ {
+        // Match a specific value.
+        if len(value) != 0 {
+            return i.tagValueSeriesIDIterator(name, key, value)
+        }
+
+        mitr, err := i.measurementSeriesIDIterator(name)
+        if err != nil {
+            return nil, err
+        }
+
+        kitr, err := i.tagKeySeriesIDIterator(name, key)
+        if err != nil {
+            if mitr != nil {
+                mitr.Close()
+            }
+            return nil, err
+        }
+
+        // Return all measurement series that have no values from this tag key.
+        return tsdb.DifferenceSeriesIDIterators(mitr, kitr), nil
+    }
+
+    // Return all measurement series without this tag value.
+    if len(value) != 0 {
+        mitr, err := i.measurementSeriesIDIterator(name)
+        if err != nil {
+            return nil, err
+        }
+
+        vitr, err := i.tagValueSeriesIDIterator(name, key, value)
+        if err != nil {
+            if mitr != nil {
+                mitr.Close()
+            }
+            return nil, err
+        }
+
+        return tsdb.DifferenceSeriesIDIterators(mitr, vitr), nil
+    }
+
+    // Return all series across all values of this tag key.
+    return i.tagKeySeriesIDIterator(name, key)
+}
+
+func (i *Index) seriesByBinaryExprRegexIterator(name, key []byte, value *regexp.Regexp, op influxql.Token) (tsdb.SeriesIDIterator, error) {
+    // Special handling for "_name" to match measurement name.
+    if bytes.Equal(key, []byte("_name")) {
+        match := value.Match(name)
+        if (op == influxql.EQREGEX && match) || (op == influxql.NEQREGEX && !match) {
+            mitr, err := i.measurementSeriesIDIterator(name)
+            if err != nil {
+                return nil, err
+            }
+            return tsdb.NewSeriesIDExprIterator(mitr, &influxql.BooleanLiteral{Val: true}), nil
+        }
+        return nil, nil
+    }
+    return i.matchTagValueSeriesIDIterator(name, key, value, op == influxql.EQREGEX)
+}
+
+func (i *Index) seriesByBinaryExprVarRefIterator(name, key []byte, value *influxql.VarRef, op influxql.Token) (tsdb.SeriesIDIterator, error) {
+    itr0, err := i.tagKeySeriesIDIterator(name, key)
+    if err != nil {
+        return nil, err
+    }
+
+    itr1, err := i.tagKeySeriesIDIterator(name, []byte(value.Val))
+    if err != nil {
+        if itr0 != nil {
+            itr0.Close()
+        }
+        return nil, err
+    }
+
+    if op == influxql.EQ {
+        return tsdb.IntersectSeriesIDIterators(itr0, itr1), nil
+    }
+    return tsdb.DifferenceSeriesIDIterators(itr0, itr1), nil
+}
+
+// MatchTagValueSeriesIDIterator returns a series iterator for tags which match value.
+// If matches is false, returns iterators which do not match value.
+func (i *Index) MatchTagValueSeriesIDIterator(name, key []byte, value *regexp.Regexp, matches bool) (tsdb.SeriesIDIterator, error) {
+    release := i.sfile.Retain()
+    defer release()
+
+    itr, err := i.matchTagValueSeriesIDIterator(name, key, value, matches)
+    if err != nil {
+        return nil, err
+    }
+    return tsdb.FilterUndeletedSeriesIDIterator(i.sfile, itr), nil
+}
+
+// matchTagValueSeriesIDIterator returns a series iterator for tags which match
+// value. See MatchTagValueSeriesIDIterator for more details.
+//
+// It guarantees to never take any locks on the underlying series file.
+func (i *Index) matchTagValueSeriesIDIterator(name, key []byte, value *regexp.Regexp, matches bool) (tsdb.SeriesIDIterator, error) {
+    matchEmpty := value.MatchString("")
+    if matches {
+        if matchEmpty {
+            return i.matchTagValueEqualEmptySeriesIDIterator(name, key, value)
+        }
+        return i.matchTagValueEqualNotEmptySeriesIDIterator(name, key, value)
+    }
+
+    if matchEmpty {
+        return i.matchTagValueNotEqualEmptySeriesIDIterator(name, key, value)
+    }
+    return i.matchTagValueNotEqualNotEmptySeriesIDIterator(name, key, value)
+}
+
+func (i *Index) matchTagValueEqualEmptySeriesIDIterator(name, key []byte, value *regexp.Regexp) (tsdb.SeriesIDIterator, error) {
+    vitr, err := i.TagValueIterator(name, key)
+    if err != nil {
+        return nil, err
+    } else if vitr == nil {
+        return i.measurementSeriesIDIterator(name)
+    }
+    defer vitr.Close()
+
+    var itrs []tsdb.SeriesIDIterator
+    if err := func() error {
+        for {
+            e, err := vitr.Next()
+            if err != nil {
+                return err
+            } else if e == nil {
+                break
+            }
+
+            if !value.Match(e) {
+                itr, err := i.tagValueSeriesIDIterator(name, key, e)
+                if err != nil {
+                    return err
+                } else if itr != nil {
+                    itrs = append(itrs, itr)
+                }
+            }
+        }
+        return nil
+    }(); err != nil {
+        tsdb.SeriesIDIterators(itrs).Close()
+        return nil, err
+    }
+
+    mitr, err := i.measurementSeriesIDIterator(name)
+    if err != nil {
+        tsdb.SeriesIDIterators(itrs).Close()
+        return nil, err
+    }
+
+    return tsdb.DifferenceSeriesIDIterators(mitr, tsdb.MergeSeriesIDIterators(itrs...)), nil
+}
+
+func (i *Index) matchTagValueEqualNotEmptySeriesIDIterator(name, key []byte, value *regexp.Regexp) (tsdb.SeriesIDIterator, error) {
+    vitr, err := i.TagValueIterator(name, key)
+    if err != nil {
+        return nil, err
+    } else if vitr == nil {
+        return nil, nil
+    }
+    defer vitr.Close()
+
+    var itrs []tsdb.SeriesIDIterator
+    for {
+        e, err := vitr.Next()
+        if err != nil {
+            tsdb.SeriesIDIterators(itrs).Close()
+            return nil, err
+        } else if e == nil {
+            break
+        }
+
+        if value.Match(e) {
+            itr, err := i.tagValueSeriesIDIterator(name, key, e)
+            if err != nil {
+                tsdb.SeriesIDIterators(itrs).Close()
+                return nil, err
+            } else if itr != nil {
+                itrs = append(itrs, itr)
+            }
+        }
+    }
+    return tsdb.MergeSeriesIDIterators(itrs...), nil
+}
+
+func (i *Index) matchTagValueNotEqualEmptySeriesIDIterator(name, key []byte, value *regexp.Regexp) (tsdb.SeriesIDIterator, error) {
+    vitr, err := i.TagValueIterator(name, key)
+    if err != nil {
+        return nil, err
+    } else if vitr == nil {
+        return nil, nil
+    }
+    defer vitr.Close()
+
+    var itrs []tsdb.SeriesIDIterator
+    for {
+        e, err := vitr.Next()
+        if err != nil {
+            tsdb.SeriesIDIterators(itrs).Close()
+            return nil, err
+        } else if e == nil {
+            break
+        }
+
+        if !value.Match(e) {
+            itr, err := i.tagValueSeriesIDIterator(name, key, e)
+            if err != nil {
+                tsdb.SeriesIDIterators(itrs).Close()
+                return nil, err
+            } else if itr != nil {
+                itrs = append(itrs, itr)
+            }
+        }
+    }
+    return tsdb.MergeSeriesIDIterators(itrs...), nil
+}
+
+func (i *Index) matchTagValueNotEqualNotEmptySeriesIDIterator(name, key []byte, value *regexp.Regexp) (tsdb.SeriesIDIterator, error) {
+    vitr, err := i.TagValueIterator(name, key)
+    if err != nil {
+        return nil, err
+    } else if vitr == nil {
+        return i.measurementSeriesIDIterator(name)
+    }
+    defer vitr.Close()
+
+    var itrs []tsdb.SeriesIDIterator
+    for {
+        e, err := vitr.Next()
+        if err != nil {
+            tsdb.SeriesIDIterators(itrs).Close()
+            return nil, err
+        } else if e == nil {
+            break
+        }
+        if value.Match(e) {
+            itr, err := i.tagValueSeriesIDIterator(name, key, e)
+            if err != nil {
+                tsdb.SeriesIDIterators(itrs).Close()
+                return nil, err
+            } else if itr != nil {
+                itrs = append(itrs, itr)
+            }
+        }
+    }
+
+    mitr, err := i.measurementSeriesIDIterator(name)
+    if err != nil {
+        tsdb.SeriesIDIterators(itrs).Close()
+        return nil, err
+    }
+    return tsdb.DifferenceSeriesIDIterators(mitr, tsdb.MergeSeriesIDIterators(itrs...)), nil
+}
+
 // IsIndexDir returns true if directory contains at least one partition directory.
 func IsIndexDir(path string) (bool, error) {
     fis, err := ioutil.ReadDir(path)
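Note: the matchTagValue* quartet above covers the four combinations of (do we want matches?) x (can the regex match the empty string?). When the pattern matches "", series that lack the tag key entirely count as matches, so those variants take the difference against the full measurement set; the not-empty variants simply merge the iterators of each matching tag value. As a package-internal illustration of the expression walker (error handling elided, not code from the commit), a filter like region = 'west' AND host != 'a' decomposes roughly as:

    litr, _ := i.tagValueSeriesIDIterator(name, []byte("region"), []byte("west")) // region = 'west'
    mitr, _ := i.measurementSeriesIDIterator(name)                                // all series of the measurement
    vitr, _ := i.tagValueSeriesIDIterator(name, []byte("host"), []byte("a"))
    ritr := tsdb.DifferenceSeriesIDIterators(mitr, vitr)                          // host != 'a'
    itr := tsdb.IntersectSeriesIDIterators(litr, ritr)                            // AND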
@@ -40,10 +40,6 @@ import (
 //go:generate env GO111MODULE=on go run github.com/benbjohnson/tmpl -data=@compact.gen.go.tmpldata compact.gen.go.tmpl
 //go:generate env GO111MODULE=on go run github.com/benbjohnson/tmpl -data=@reader.gen.go.tmpldata reader.gen.go.tmpl
 
-func init() {
-    tsdb.RegisterEngine("tsm1", NewEngine)
-}
-
 var (
     // Ensure Engine implements the interface.
     _ tsdb.Engine = &Engine{}
@@ -121,7 +117,7 @@ const (
 type Engine struct {
     mu sync.RWMutex
 
-    index tsdb.Index
+    index *tsi1.Index
 
     // The following group of fields is used to track the state of level compactions within the
     // Engine. The WaitGroup is used to monitor the compaction goroutines, the 'done' channel is
@@ -177,7 +173,7 @@ type Engine struct {
 }
 
 // NewEngine returns a new instance of Engine.
-func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *tsdb.SeriesFile, opt tsdb.EngineOptions) tsdb.Engine {
+func NewEngine(id uint64, idx *tsi1.Index, path string, walPath string, sfile *tsdb.SeriesFile, opt tsdb.EngineOptions) tsdb.Engine {
     fs := NewFileStore(path)
     fs.openLimiter = opt.OpenLimiter
     if opt.FileStoreObserver != nil {
@@ -1193,17 +1189,16 @@ func (e *Engine) DeleteSeriesRangeWithPredicate(itr tsdb.SeriesIterator, predica
 
     // Ensure that the index does not compact away the measurement or series we're
     // going to delete before we're done with them.
-    if tsiIndex, ok := e.index.(*tsi1.Index); ok {
-        tsiIndex.DisableCompactions()
-        defer tsiIndex.EnableCompactions()
-        tsiIndex.Wait()
+    e.index.DisableCompactions()
+    defer e.index.EnableCompactions()
+    e.index.Wait()
 
-        fs, err := tsiIndex.RetainFileSet()
-        if err != nil {
-            return err
-        }
-        defer fs.Release()
-    }
+    fs, err := e.index.RetainFileSet()
+    if err != nil {
+        return err
+    }
+    defer fs.Release()
 
     var (
         sz int
@@ -1514,8 +1509,7 @@ func (e *Engine) DeleteMeasurement(name []byte) error {
 // DeleteMeasurement deletes a measurement and all related series.
 func (e *Engine) deleteMeasurement(name []byte) error {
     // Attempt to find the series keys.
-    indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
-    itr, err := indexSet.MeasurementSeriesByExprIterator(name, nil)
+    itr, err := e.index.MeasurementSeriesIDIterator(name)
     if err != nil {
         return err
     } else if itr == nil {
@@ -2049,9 +2043,7 @@ func (e *Engine) IteratorCost(measurement string, opt query.IteratorOptions) (qu
     return query.IteratorCost{}, nil
     }
 
-    // Determine all of the tag sets for this query.
-    indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
-    tagSets, err := indexSet.TagSets(e.sfile, []byte(measurement), opt)
+    tagSets, err := e.index.TagSets([]byte(measurement), opt)
     if err != nil {
         return query.IteratorCost{}, err
     }
@@ -753,8 +753,7 @@ func TestEngine_DeleteSeriesRange(t *testing.T) {
     }
 
     // Check that the series still exists in the index
-    indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
-    iter, err := indexSet.MeasurementSeriesIDIterator([]byte("cpu"))
+    iter, err := e.index.MeasurementSeriesIDIterator([]byte("cpu"))
     if err != nil {
         t.Fatalf("iterator error: %v", err)
     }
@@ -785,8 +784,7 @@ func TestEngine_DeleteSeriesRange(t *testing.T) {
         t.Fatalf("failed to delete series: %v", err)
     }
 
-    indexSet = tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
-    if iter, err = indexSet.MeasurementSeriesIDIterator([]byte("cpu")); err != nil {
+    if iter, err = e.index.MeasurementSeriesIDIterator([]byte("cpu")); err != nil {
         t.Fatalf("iterator error: %v", err)
     }
     if iter == nil {
@@ -878,8 +876,7 @@ func TestEngine_DeleteSeriesRangeWithPredicate(t *testing.T) {
     }
 
     // Check that the series still exists in the index
-    indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
-    iter, err := indexSet.MeasurementSeriesIDIterator([]byte("cpu"))
+    iter, err := e.index.MeasurementSeriesIDIterator([]byte("cpu"))
     if err != nil {
         t.Fatalf("iterator error: %v", err)
     }
@@ -910,8 +907,7 @@ func TestEngine_DeleteSeriesRangeWithPredicate(t *testing.T) {
         t.Fatalf("failed to delete series: %v", err)
     }
 
-    indexSet = tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
-    if iter, err = indexSet.MeasurementSeriesIDIterator([]byte("cpu")); err != nil {
+    if iter, err = e.index.MeasurementSeriesIDIterator([]byte("cpu")); err != nil {
         t.Fatalf("iterator error: %v", err)
     }
     if iter == nil {
@@ -984,8 +980,7 @@ func TestEngine_DeleteSeriesRangeWithPredicate_Nil(t *testing.T) {
     }
 
     // Check that the series still exists in the index
-    indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
-    iter, err := indexSet.MeasurementSeriesIDIterator([]byte("cpu"))
+    iter, err := e.index.MeasurementSeriesIDIterator([]byte("cpu"))
     if err != nil {
         t.Fatalf("iterator error: %v", err)
     } else if iter == nil {
@@ -1000,7 +995,7 @@ func TestEngine_DeleteSeriesRangeWithPredicate_Nil(t *testing.T) {
     }
 
     // Check that disk series still exists
-    iter, err = indexSet.MeasurementSeriesIDIterator([]byte("disk"))
+    iter, err = e.index.MeasurementSeriesIDIterator([]byte("disk"))
     if err != nil {
         t.Fatalf("iterator error: %v", err)
     } else if iter == nil {
@@ -1091,8 +1086,7 @@ func TestEngine_DeleteSeriesRangeWithPredicate_FlushBatch(t *testing.T) {
     }
 
     // Check that the series still exists in the index
-    indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
-    iter, err := indexSet.MeasurementSeriesIDIterator([]byte("cpu"))
+    iter, err := e.index.MeasurementSeriesIDIterator([]byte("cpu"))
     if err != nil {
         t.Fatalf("iterator error: %v", err)
     }
@@ -1123,8 +1117,7 @@ func TestEngine_DeleteSeriesRangeWithPredicate_FlushBatch(t *testing.T) {
        t.Fatalf("failed to delete series: %v", err)
     }
 
-    indexSet = tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
-    if iter, err = indexSet.MeasurementSeriesIDIterator([]byte("cpu")); err != nil {
+    if iter, err = e.index.MeasurementSeriesIDIterator([]byte("cpu")); err != nil {
        t.Fatalf("iterator error: %v", err)
     }
     if iter == nil {
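Note: the test updates above are mechanical; every one-element tsdb.IndexSet construction collapses to a direct call on e.index. The checking pattern the tests use, sketched (assuming an opened test Engine e as in the surrounding hunks; the nil-check message is illustrative):

    iter, err := e.index.MeasurementSeriesIDIterator([]byte("cpu"))
    if err != nil {
        t.Fatalf("iterator error: %v", err)
    } else if iter == nil {
        t.Fatal("expected iterator")
    }
    defer iter.Close()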
@@ -1504,7 +1497,7 @@ type Engine struct {
     *tsm1.Engine
     root      string
     indexPath string
-    index     tsdb.Index
+    index     *tsi1.Index
     sfile     *tsdb.SeriesFile
 }
 
@@ -1523,7 +1516,7 @@ func NewEngine() (*Engine, error) {
     }
 
     // Setup series file.
-    sfile := tsdb.NewSeriesFile(filepath.Join(dbPath, tsdb.SeriesFileDirectory))
+    sfile := tsdb.NewSeriesFile(filepath.Join(dbPath, tsdb.DefaultSeriesFileDirectory))
     sfile.Logger = logger.New(os.Stdout)
     if err = sfile.Open(); err != nil {
         return nil, err
@@ -1676,7 +1669,7 @@ func (e *Engine) MustWriteSnapshot() {
     }
 }
 
-func MustOpenIndex(id uint64, database, path string, seriesIDSet *tsdb.SeriesIDSet, sfile *tsdb.SeriesFile, options tsdb.EngineOptions) tsdb.Index {
+func MustOpenIndex(id uint64, database, path string, seriesIDSet *tsdb.SeriesIDSet, sfile *tsdb.SeriesFile, options tsdb.EngineOptions) *tsi1.Index {
    idx := tsi1.NewIndex(sfile, database, tsi1.NewConfig(), tsi1.WithPath(path))
    if err := idx.Open(); err != nil {
        panic(err)