shared in-memory index

pull/7913/head
Ben Johnson 2016-11-16 11:57:55 -07:00
parent a812502ea3
commit 409b0165f5
No known key found for this signature in database
GPG Key ID: 81741CD251883081
14 changed files with 468 additions and 351 deletions

View File

@ -32,8 +32,10 @@ import (
"github.com/influxdata/influxdb/tsdb"
client "github.com/influxdata/usage-client/v1"
"go.uber.org/zap"
// Initialize the engine packages
// Initialize the engine & index packages
_ "github.com/influxdata/influxdb/tsdb/engine"
_ "github.com/influxdata/influxdb/tsdb/index"
)
var startTime time.Time

View File

@ -5684,7 +5684,7 @@ func TestServer_Query_DropAndRecreateMeasurement(t *testing.T) {
&Query{
name: "Drop non-existant measurement",
command: `DROP MEASUREMENT doesntexist`,
exp: `{"results":[{"statement_id":0,"error":"shard 1: measurement not found: doesntexist"}]}`,
exp: `{"results":[{}]}`,
params: url.Values{"db": []string{"db0"}},
},
}...)

View File

@ -236,7 +236,7 @@ type TSDBStore struct {
DeleteRetentionPolicyFn func(database, name string) error
DeleteShardFn func(id uint64) error
DeleteSeriesFn func(database string, sources []influxql.Source, condition influxql.Expr) error
DatabaseIndexFn func(name string) *tsdb.DatabaseIndex
DatabaseIndexFn func(name string) tsdb.Index
ShardIteratorCreatorFn func(id uint64) influxql.IteratorCreator
}
@ -304,7 +304,7 @@ func (s *TSDBStore) ShardIteratorCreator(id uint64) influxql.IteratorCreator {
return s.ShardIteratorCreatorFn(id)
}
func (s *TSDBStore) DatabaseIndex(name string) *tsdb.DatabaseIndex {
func (s *TSDBStore) DatabaseIndex(name string) tsdb.Index {
return s.DatabaseIndexFn(name)
}

View File

@ -45,9 +45,6 @@ const (
// ErrDatabaseNotFound returns a database not found error for the given database name.
func ErrDatabaseNotFound(name string) error { return fmt.Errorf("database not found: %s", name) }
// ErrMeasurementNotFound returns a measurement not found error for the given measurement name.
func ErrMeasurementNotFound(name string) error { return fmt.Errorf("measurement not found: %s", name) }
// ErrMaxSelectPointsLimitExceeded is an error when a query hits the maximum number of points.
func ErrMaxSelectPointsLimitExceeded(n, limit int) error {
return fmt.Errorf("max-select-point limit exceeed: (%d/%d)", n, limit)

View File

@ -48,8 +48,7 @@ type Engine interface {
SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
// CreateMeasurement(name string) (*Measurement, error)
DeleteMeasurement(name []byte, seriesKeys [][]byte) error
DeleteMeasurement(name []byte) error
Measurement(name []byte) (*Measurement, error)
Measurements() (Measurements, error)
MeasurementsByExpr(expr influxql.Expr) (Measurements, bool, error)
@ -125,7 +124,9 @@ func NewEngine(id uint64, i Index, path string, walPath string, options EngineOp
// EngineOptions represents the options used to initialize the engine.
type EngineOptions struct {
EngineVersion string
IndexVersion string
ShardID uint64
InmemIndex interface{} // shared in-memory index
Config Config
}
@ -134,6 +135,7 @@ type EngineOptions struct {
func NewEngineOptions() EngineOptions {
return EngineOptions{
EngineVersion: DefaultEngine,
IndexVersion: DefaultIndex,
Config: NewConfig(),
}
}

View File

@ -665,6 +665,11 @@ func (e *Engine) addToIndexFromKey(key []byte, fieldType influxql.DataType, inde
if err := mf.CreateFieldIfNotExists(field, fieldType, false); err != nil {
return err
}
_, tags, _ := models.ParseKey(key)
if err := e.index.CreateSeriesIfNotExists([]byte(name), tags); err != nil {
return err
}
return nil
}
@ -831,27 +836,37 @@ func (e *Engine) DeleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
return err
}
var toDelete [][]byte
for k, exists := range existing {
if !exists {
toDelete = append(toDelete, []byte(k))
e.index.UnassignShard(k, e.id)
}
}
return e.index.DropSeries(toDelete)
return nil
}
// DeleteMeasurement deletes a measurement and all related series.
func (e *Engine) DeleteMeasurement(name []byte, seriesKeys [][]byte) error {
func (e *Engine) DeleteMeasurement(name []byte) error {
e.mu.Lock()
delete(e.measurementFields, string(name))
e.mu.Unlock()
if err := e.deleteSeries(seriesKeys); err != nil {
// Attempt to find the series keys.
m, err := e.Measurement(name)
if err != nil {
return err
} else if m != nil {
if err := e.deleteSeries(m.SeriesKeys()); err != nil {
return err
}
return nil
}
// Remove the measurement from the index.
return e.index.DropMeasurement(name)
if err := e.index.DropMeasurement(name); err != nil {
return err
}
return nil
}
func (e *Engine) CreateSeriesIfNotExists(name []byte, tags models.Tags) error {

View File

@ -21,6 +21,7 @@ import (
"github.com/influxdata/influxdb/pkg/deep"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
"github.com/influxdata/influxdb/tsdb/index/inmem"
)
/*
@ -157,7 +158,7 @@ func TestEngine_Backup(t *testing.T) {
p3 := MustParsePointString("cpu,host=C value=1.3 3000000000")
// Write those points to the engine.
e := tsm1.NewEngine(1, f.Name(), walPath, tsdb.NewEngineOptions()).(*tsm1.Engine)
e := tsm1.NewEngine(1, NewIndex(), f.Name(), walPath, tsdb.NewEngineOptions()).(*tsm1.Engine)
// mock the planner so compactions don't run during the test
e.CompactionPlan = &mockPlanner{}
@ -231,10 +232,9 @@ func TestEngine_CreateIterator_Cache_Ascending(t *testing.T) {
e := MustOpenEngine()
defer e.Close()
e.CreateMeasurement("cpu")
// e.CreateMeasurement("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
si, _ := e.CreateSeries("cpu", tsdb.NewSeries([]byte("cpu,host=A"), models.NewTags(map[string]string{"host": "A"})))
si.AssignShard(1)
e.CreateSeriesIfNotExists([]byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A value=1.2 2000000000`,
@ -285,9 +285,8 @@ func TestEngine_CreateIterator_Cache_Descending(t *testing.T) {
e := MustOpenEngine()
defer e.Close()
e.CreateMeasurement("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.CreateSeries("cpu", tsdb.NewSeries([]byte("cpu,host=A"), models.NewTags(map[string]string{"host": "A"})))
e.CreateSeriesIfNotExists([]byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A value=1.2 2000000000`,
@ -338,9 +337,8 @@ func TestEngine_CreateIterator_TSM_Ascending(t *testing.T) {
e := MustOpenEngine()
defer e.Close()
e.CreateMeasurement("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.CreateSeries("cpu", tsdb.NewSeries([]byte("cpu,host=A"), models.NewTags(map[string]string{"host": "A"})))
e.CreateSeriesIfNotExists([]byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A value=1.2 2000000000`,
@ -392,9 +390,8 @@ func TestEngine_CreateIterator_TSM_Descending(t *testing.T) {
e := MustOpenEngine()
defer e.Close()
e.CreateMeasurement("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.CreateSeries("cpu", tsdb.NewSeries([]byte("cpu,host=A"), models.NewTags(map[string]string{"host": "A"})))
e.CreateSeriesIfNotExists([]byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A value=1.2 2000000000`,
@ -446,10 +443,9 @@ func TestEngine_CreateIterator_Aux(t *testing.T) {
e := MustOpenEngine()
defer e.Close()
e.CreateMeasurement("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.MeasurementFields("cpu").CreateFieldIfNotExists("F", influxql.Float, false)
e.CreateSeries("cpu", tsdb.NewSeries([]byte("cpu,host=A"), models.NewTags(map[string]string{"host": "A"})))
e.CreateSeriesIfNotExists([]byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A F=100 1000000000`,
@ -503,14 +499,13 @@ func TestEngine_CreateIterator_Condition(t *testing.T) {
e := MustOpenEngine()
defer e.Close()
e.CreateMeasurement("cpu")
e.MustMeasurement("cpu").SetFieldName("X")
e.MustMeasurement("cpu").SetFieldName("Y")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.MeasurementFields("cpu").CreateFieldIfNotExists("X", influxql.Float, false)
e.MeasurementFields("cpu").CreateFieldIfNotExists("Y", influxql.Float, false)
e.CreateSeries("cpu", tsdb.NewSeries([]byte("cpu,host=A"), models.NewTags(map[string]string{"host": "A"})))
e.CreateSeriesIfNotExists([]byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A X=10 1000000000`,
@ -573,7 +568,7 @@ func TestEngine_DeleteSeries(t *testing.T) {
p3 := MustParsePointString("cpu,host=A sum=1.3 3000000000")
// Write those points to the engine.
e := tsm1.NewEngine(f.Name(), walPath, tsdb.NewEngineOptions()).(*tsm1.Engine)
e := tsm1.NewEngine(1, NewIndex(), f.Name(), walPath, tsdb.NewEngineOptions()).(*tsm1.Engine)
// e.LoadMetadataIndex(1, MustNewDatabaseIndex("db0")) // Initialise an index
// mock the planner so compactions don't run during the test
@ -760,7 +755,6 @@ func benchmarkEngine_WritePoints(b *testing.B, batchSize int) {
e := MustOpenEngine()
defer e.Close()
e.CreateMeasurement("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
pp := make([]models.Point, 0, batchSize)
@ -902,9 +896,8 @@ func MustInitBenchmarkEngine(pointN int) *Engine {
e := MustOpenEngine()
// Initialize metadata.
e.CreateMeasurement("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.CreateSeries("cpu", tsdb.NewSeries([]byte("cpu,host=A"), models.NewTags(map[string]string{"host": "A"})))
e.CreateSeriesIfNotExists([]byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
// Generate time ascending points with jitterred time & value.
rand := rand.New(rand.NewSource(0))
@ -954,6 +947,7 @@ func NewEngine() *Engine {
}
return &Engine{
Engine: tsm1.NewEngine(1,
NewIndex(),
filepath.Join(root, "data"),
filepath.Join(root, "wal"),
tsdb.NewEngineOptions()).(*tsm1.Engine),
@ -986,6 +980,7 @@ func (e *Engine) Reopen() error {
}
e.Engine = tsm1.NewEngine(1,
NewIndex(),
filepath.Join(e.root, "data"),
filepath.Join(e.root, "wal"),
tsdb.NewEngineOptions()).(*tsm1.Engine)
@ -1057,3 +1052,12 @@ func ParseTags(s string) influxql.Tags {
}
return influxql.NewTags(m)
}
func NewIndex() tsdb.Index {
idx, err := inmem.NewIndex("db")
if err != nil {
panic(err)
}
opt := tsdb.NewEngineOptions()
return &inmem.NewShardIndex(1, "", opt)
}

View File

@ -32,6 +32,12 @@ type Index interface {
Dereference(b []byte)
TagSets(name []byte, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error)
// To be removed w/ tsi1.
SetFieldName(measurement, name string)
AssignShard(k string, shardID uint64)
UnassignShard(k string, shardID uint64)
RemoveShard(shardID uint64)
}
// IndexFormat represents the format for an index.
@ -46,7 +52,7 @@ const (
)
// NewIndexFunc creates a new index.
type NewIndexFunc func(id uint64, path string, options IndexOptions) Index
type NewIndexFunc func(id uint64, path string, options EngineOptions) Index
// newIndexFuncs is a lookup of index constructors by name.
var newIndexFuncs = make(map[string]NewIndexFunc)
@ -71,7 +77,7 @@ func RegisteredIndexes() []string {
// NewIndex returns an instance of an index based on its format.
// If the path does not exist then the DefaultFormat is used.
func NewIndex(id uint64, path string, options IndexOptions) (Index, error) {
func NewIndex(id uint64, path string, options EngineOptions) (Index, error) {
// Create a new index.
if _, err := os.Stat(path); os.IsNotExist(err) {
return newIndexFuncs[options.IndexVersion](id, path, options), nil
@ -89,22 +95,5 @@ func NewIndex(id uint64, path string, options IndexOptions) (Index, error) {
return fn(id, path, options), nil
}
// IndexOptions represents the options used to initialize the index.
type IndexOptions struct {
IndexVersion string
ShardID uint64
InmemIndex interface{} // shared in-memory index
Config Config
}
// NewIndexOptions returns the default options.
func NewIndexOptions() IndexOptions {
return IndexOptions{
IndexVersion: DefaultIndex,
Config: NewConfig(),
}
}
// NewInmemIndex returns a new "inmem" index type.
var NewInmemIndex func(name string) (interface{}, error)

View File

@ -16,6 +16,7 @@ import (
"regexp"
"sort"
"sync"
// "sync/atomic"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
@ -143,17 +144,14 @@ func (i *Index) CreateSeriesIfNotExists(shardID uint64, name []byte, tags models
if err != nil {
return err
}
if opt.Config.MaxSeriesPerDatabase > 0 && sn+1 > uint64(opt.Config.MaxSeriesPerDatabase) {
if opt.Config.MaxSeriesPerDatabase > 0 && n+1 > uint64(opt.Config.MaxSeriesPerDatabase) {
return &tsdb.LimitError{
Reason: fmt.Sprintf("max series limit reached: (%d/%d)", sn, opt.Config.MaxSeriesPerDatabase),
Reason: fmt.Sprintf("max-series-per-database limit exceeded: (%d/%d)", n, opt.Config.MaxSeriesPerDatabase),
}
}
// get or create the measurement index
m, err := i.CreateMeasurementIndexIfNotExists(string(name))
if err != nil {
return err
}
m := i.CreateMeasurementIndexIfNotExists(string(name))
i.mu.Lock()
// Check for the series again under a write lock
@ -184,7 +182,7 @@ func (i *Index) CreateSeriesIfNotExists(shardID uint64, name []byte, tags models
// CreateMeasurementIndexIfNotExists creates or retrieves an in memory index
// object for the measurement
func (i *Index) CreateMeasurementIndexIfNotExists(name string) (*tsdb.Measurement, error) {
func (i *Index) CreateMeasurementIndexIfNotExists(name string) *tsdb.Measurement {
name = escape.UnescapeString(name)
// See if the measurement exists using a read-lock
@ -192,7 +190,7 @@ func (i *Index) CreateMeasurementIndexIfNotExists(name string) (*tsdb.Measuremen
m := i.measurements[name]
if m != nil {
i.mu.RUnlock()
return m, nil
return m
}
i.mu.RUnlock()
@ -210,7 +208,7 @@ func (i *Index) CreateMeasurementIndexIfNotExists(name string) (*tsdb.Measuremen
// Add the measurement to the measurements sketch.
i.measurementsSketch.Add([]byte(name))
}
return m, nil
return m
}
// TagsForSeries returns the tag map for the passed in series
@ -517,17 +515,82 @@ func (i *Index) TagSets(shardID uint64, name []byte, dimensions []string, condit
return tagSets, nil
}
// AssignShard update the index to indicate that series k exists in the given shardID.
func (i *Index) AssignShard(k string, shardID uint64) {
ss, _ := i.Series([]byte(k))
if ss != nil {
ss.AssignShard(shardID)
}
}
// UnassignShard updates the index to indicate that series k does not exist in
// the given shardID.
func (i *Index) UnassignShard(k string, shardID uint64) {
ss, _ := i.Series([]byte(k))
if ss != nil {
if ss.Assigned(shardID) {
// Remove the shard from any series
ss.UnassignShard(shardID)
// If this series no longer has shards assigned, remove the series
if ss.ShardN() == 0 {
// Remove the series the measurements
ss.Measurement().DropSeries(ss)
// If the measurement no longer has any series, remove it as well
if !ss.Measurement().HasSeries() {
i.mu.Lock()
i.dropMeasurement(ss.Measurement().Name)
i.mu.Unlock()
}
// Remove the series key from the series index
i.mu.Lock()
delete(i.series, k)
// atomic.AddInt64(&i.stats.NumSeries, -1)
i.mu.Unlock()
}
}
}
}
func (i *Index) SeriesKeys() []string {
i.mu.RLock()
s := make([]string, 0, len(i.series))
for k := range i.series {
s = append(s, k)
}
i.mu.RUnlock()
return s
}
// SetFieldName adds a field name to a measurement.
func (i *Index) SetFieldName(measurement, name string) {
m := i.CreateMeasurementIndexIfNotExists(measurement)
m.SetFieldName(name)
}
// RemoveShard removes all references to shardID from any series or measurements
// in the index. If the shard was the only owner of data for the series, the series
// is removed from the index.
func (i *Index) RemoveShard(shardID uint64) {
for _, k := range i.SeriesKeys() {
i.UnassignShard(k, shardID)
}
}
// Ensure index implements interface.
var _ tsdb.Index = &ShardIndex{}
// ShardIndex represents a wrapper around the shared Index.
type ShardIndex struct {
*Index
id uint64
id uint64
opt tsdb.EngineOptions
}
func (i *ShardIndex) CreateSeriesIfNotExists(name []byte, tags models.Tags) error {
return i.Index.CreateSeriesIfNotExists(i.id, name, tags)
return i.Index.CreateSeriesIfNotExists(i.id, name, tags, &i.opt)
}
// TagSets returns a list of tag sets based on series filtering.
@ -536,9 +599,10 @@ func (i *ShardIndex) TagSets(name []byte, dimensions []string, condition influxq
}
// NewShardIndex returns a new index for a shard.
func NewShardIndex(id uint64, path string, options tsdb.IndexOptions) tsdb.Index {
func NewShardIndex(id uint64, path string, opt tsdb.EngineOptions) tsdb.Index {
return &ShardIndex{
Index: options.InmemIndex.(*Index),
Index: opt.InmemIndex.(*Index),
id: id,
opt: opt,
}
}

View File

@ -722,6 +722,11 @@ func (i *Index) seriesByBinaryExprVarRefIterator(name, key []byte, value *influx
), nil
}
func (i *Index) SetFieldName(measurement, name string) {}
func (i *Index) RemoveShard(shardID uint64) {}
func (i *Index) AssignShard(k string, shardID uint64) {}
func (i *Index) UnassignShard(k string, shardID uint64) {}
// File represents a log or index file.
type File interface {
Series(name []byte, tags models.Tags) SeriesElem

View File

@ -751,6 +751,32 @@ func expandExprWithValues(expr influxql.Expr, keys []string, tagExprs []tagExpr,
return exprs
}
// SeriesIDsAllOrByExpr walks an expressions for matching series IDs
// or, if no expressions is given, returns all series IDs for the measurement.
func (m *Measurement) SeriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error) {
m.mu.RLock()
defer m.mu.RUnlock()
return m.seriesIDsAllOrByExpr(expr)
}
func (m *Measurement) seriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error) {
// If no expression given or the measurement has no series,
// we can take just return the ids or nil accordingly.
if expr == nil {
return m.seriesIDs, nil
} else if len(m.seriesIDs) == 0 {
return nil, nil
}
// Get series IDs that match the WHERE clause.
ids, _, err := m.walkWhereForSeriesIds(expr)
if err != nil {
return nil, err
}
return ids, nil
}
// tagKeysByExpr extracts the tag keys wanted by the expression.
func (m *Measurement) TagKeysByExpr(expr influxql.Expr) (stringSet, bool, error) {
switch e := expr.(type) {
@ -1044,6 +1070,13 @@ func (s *Series) assigned(shardID uint64) bool {
return i < len(s.shardIDs) && s.shardIDs[i] == shardID
}
func (s *Series) ShardN() int {
s.mu.RLock()
n := len(s.shardIDs)
s.mu.RUnlock()
return n
}
// Measurement returns the measurement on the series.
func (s *Series) Measurement() *Measurement {
return s.measurement

View File

@ -109,8 +109,7 @@ type Shard struct {
database string
retentionPolicy string
engineOptions EngineOptions
indexOptions IndexOptions
options EngineOptions
mu sync.RWMutex
engine Engine
@ -129,17 +128,16 @@ type Shard struct {
}
// NewShard returns a new initialized Shard. walPath doesn't apply to the b1 type index
func NewShard(id uint64, path string, walPath string, engineOptions EngineOptions, indexOptions IndexOptions) *Shard {
func NewShard(id uint64, path string, walPath string, opt EngineOptions) *Shard {
db, rp := decodeStorePath(path)
logger := zap.New(zap.NullEncoder())
s := &Shard{
id: id,
path: path,
walPath: walPath,
engineOptions: engineOptions,
indexOptions: indexOptions,
closing: make(chan struct{}),
id: id,
path: path,
walPath: walPath,
options: opt,
closing: make(chan struct{}),
stats: &ShardStatistics{},
defaultTags: models.StatisticTags{
@ -148,7 +146,7 @@ func NewShard(id uint64, path string, walPath string, engineOptions EngineOption
"id": fmt.Sprintf("%d", id),
"database": db,
"retentionPolicy": rp,
"engine": engineOptions.EngineVersion,
"engine": opt.EngineVersion,
},
database: db,
@ -247,7 +245,7 @@ func (s *Shard) Open() error {
// Initialize underlying index.
ipath := filepath.Join(s.path, "index")
idx, err := NewIndex(s.id, ipath, s.indexOptions)
idx, err := NewIndex(s.id, ipath, s.options)
if err != nil {
return err
}
@ -259,7 +257,7 @@ func (s *Shard) Open() error {
s.index = idx
// Initialize underlying engine.
e, err := NewEngine(s.id, idx, s.path, s.walPath, s.engineOptions)
e, err := NewEngine(s.id, idx, s.path, s.walPath, s.options)
if err != nil {
return err
}
@ -274,9 +272,22 @@ func (s *Shard) Open() error {
if err := e.Open(); err != nil {
return err
}
// Load metadata index.
start := time.Now()
if err := e.LoadMetadataIndex(s.id, s.index); err != nil {
return err
}
// TODO(benbjohnson):
// count := s.index.SeriesShardN(s.id)
// atomic.AddInt64(&s.stats.SeriesCreated, int64(count))
s.engine = e
s.logger.Info(fmt.Sprintf("%s database index loaded in %s", s.path, time.Now().Sub(start)))
s.logger.Printf("%s database index loaded in %s", s.path, time.Now().Sub(start))
go s.monitor()
return nil
@ -312,6 +323,8 @@ func (s *Shard) close() error {
close(s.closing)
}
s.UnloadIndex()
err := s.engine.Close()
if err == nil {
s.engine = nil
@ -346,7 +359,12 @@ func (s *Shard) LastModified() time.Time {
return s.engine.LastModified()
}
// DiskSize returns the size on disk of this shard.
// UnloadIndex removes all references to this shard from the DatabaseIndex
func (s *Shard) UnloadIndex() {
s.index.RemoveShard(s.id)
}
// DiskSize returns the size on disk of this shard
func (s *Shard) DiskSize() (int64, error) {
var size int64
err := filepath.Walk(s.path, func(_ string, fi os.FileInfo, err error) error {
@ -453,19 +471,7 @@ func (s *Shard) DeleteMeasurement(name []byte) error {
if err := s.ready(); err != nil {
return err
}
// Attempt to find the series keys.
m, err := s.engine.Measurement(name)
if err != nil {
return err
}
if m == nil {
return influxql.ErrMeasurementNotFound(string(name))
}
// Remove the measurement from the engine.
return s.engine.DeleteMeasurement(name, m.SeriesKeys())
return s.engine.DeleteMeasurement(name)
}
func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) error {
@ -479,6 +485,8 @@ func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) error
if err := mf.CreateFieldIfNotExists(f.Field.Name, f.Field.Type, false); err != nil {
return err
}
s.index.SetFieldName(f.Measurement, f.Field.Name)
}
return nil
@ -493,7 +501,7 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
reason string
)
if s.engineOptions.Config.MaxValuesPerTag > 0 {
if s.options.Config.MaxValuesPerTag > 0 {
// Validate that all the new points would not exceed any limits, if so, we drop them
// and record why/increment counters
for i, p := range points {
@ -510,10 +518,10 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
}
n := m.CardinalityBytes(tag.Key)
if n >= s.engineOptions.Config.MaxValuesPerTag {
if n >= s.options.Config.MaxValuesPerTag {
dropPoint = true
reason = fmt.Sprintf("max-values-per-tag limit exceeded (%d/%d): measurement=%q tag=%q value=%q",
n, s.engineOptions.Config.MaxValuesPerTag, m.Name, string(tag.Key), string(tag.Key))
n, s.options.Config.MaxValuesPerTag, m.Name, string(tag.Key), string(tag.Key))
break
}
}
@ -563,7 +571,7 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
if err, ok := err.(*LimitError); ok {
atomic.AddInt64(&s.stats.WritePointsDropped, 1)
dropped += 1
reason = err.Reason
reason = fmt.Sprintf("db=%s: %s", s.database, err.Reason)
continue
}
return nil, nil, err
@ -650,6 +658,11 @@ func (s *Shard) MeasurementsByExpr(cond influxql.Expr) (Measurements, bool, erro
return s.engine.MeasurementsByExpr(cond)
}
// MeasurementFields returns fields for a measurement.
func (s *Shard) MeasurementFields(name []byte) *MeasurementFields {
return s.engine.MeasurementFields(string(name))
}
// SeriesN returns the exact number of series in the shard.
func (s *Shard) SeriesN() (uint64, error) {
if err := s.ready(); err != nil {
@ -864,14 +877,14 @@ func (s *Shard) monitor() {
}
atomic.StoreInt64(&s.stats.DiskBytes, size)
case <-t2.C:
if s.engineOptions.Config.MaxValuesPerTag == 0 {
if s.options.Config.MaxValuesPerTag == 0 {
continue
}
for _, m := range s.Measurements() {
for _, k := range m.TagKeys() {
n := m.Cardinality(k)
perc := int(float64(n) / float64(s.engineOptions.Config.MaxValuesPerTag) * 100)
perc := int(float64(n) / float64(s.options.Config.MaxValuesPerTag) * 100)
if perc > 100 {
perc = 100
}
@ -879,7 +892,7 @@ func (s *Shard) monitor() {
// Log at 80, 85, 90-100% levels
if perc == 80 || perc == 85 || perc >= 90 {
s.logger.Info(fmt.Sprintf("WARN: %d%% of max-values-per-tag limit exceeded: (%d/%d), db=%s shard=%d measurement=%s tag=%s",
perc, n, s.engineOptions.Config.MaxValuesPerTag, s.database, s.id, m.Name, k))
perc, n, s.options.Config.MaxValuesPerTag, s.database, s.id, m.Name, k))
}
})
}
@ -976,6 +989,13 @@ func (m *MeasurementFields) CreateFieldIfNotExists(name string, typ influxql.Dat
return nil
}
func (m *MeasurementFields) FieldN() int {
m.mu.RLock()
n := len(m.fields)
m.mu.RUnlock()
return n
}
// Field returns the field for name, or nil if there is no field for name.
func (m *MeasurementFields) Field(name string) *Field {
m.mu.RLock()
@ -1216,33 +1236,29 @@ func (itr *seriesIterator) Next() (*influxql.FloatPoint, error) {
// nextKeys reads all keys for the next measurement.
func (itr *seriesIterator) nextKeys() error {
panic("MOVE TO TSI")
/*
for {
// Ensure previous keys are cleared out.
itr.keys.i, itr.keys.buf = 0, itr.keys.buf[:0]
// Read next measurement.
if len(itr.mms) == 0 {
return nil
}
mm := itr.mms[0]
itr.mms = itr.mms[1:]
// Read all series keys.
ids, err := mm.seriesIDsAllOrByExpr(itr.opt.Condition)
if err != nil {
return err
} else if len(ids) == 0 {
continue
}
itr.keys.buf = mm.AppendSeriesKeysByID(itr.keys.buf, ids)
sort.Strings(itr.keys.buf)
for {
// Ensure previous keys are cleared out.
itr.keys.i, itr.keys.buf = 0, itr.keys.buf[:0]
// Read next measurement.
if len(itr.mms) == 0 {
return nil
}
*/
mm := itr.mms[0]
itr.mms = itr.mms[1:]
// Read all series keys.
ids, err := mm.seriesIDsAllOrByExpr(itr.opt.Condition)
if err != nil {
return err
} else if len(ids) == 0 {
continue
}
itr.keys.buf = mm.AppendSeriesKeysByID(itr.keys.buf, ids)
sort.Strings(itr.keys.buf)
return nil
}
}
// NewTagKeysIterator returns a new instance of TagKeysIterator.

View File

@ -30,13 +30,11 @@ func TestShardWriteAndIndex(t *testing.T) {
tmpShard := path.Join(tmpDir, "shard")
tmpWal := path.Join(tmpDir, "wal")
eopts := tsdb.NewEngineOptions()
eopts.Config.WALDir = filepath.Join(tmpDir, "wal")
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
opts.InmemIndex = MustNewInmemIndex("db")
iopts := tsdb.NewIndexOptions()
iopts.InmemIndex = MustNewInmemIndex("db")
sh := tsdb.NewShard(1, tmpShard, tmpWal, eopts, iopts)
sh := tsdb.NewShard(1, tmpShard, tmpWal, opts)
// Calling WritePoints when the engine is not open will return
// ErrEngineClosed.
@ -80,7 +78,7 @@ func TestShardWriteAndIndex(t *testing.T) {
// ensure the index gets loaded after closing and opening the shard
sh.Close()
sh = tsdb.NewShard(1, tmpShard, tmpWal, eopts, iopts)
sh = tsdb.NewShard(1, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error opening shard: %s", err.Error())
}
@ -101,13 +99,12 @@ func TestMaxSeriesLimit(t *testing.T) {
tmpShard := path.Join(tmpDir, "db", "rp", "1")
tmpWal := path.Join(tmpDir, "wal")
eopts := tsdb.NewEngineOptions()
eopts.Config.WALDir = filepath.Join(tmpDir, "wal")
eopts.Config.MaxSeriesPerDatabase = 1000
iopts := tsdb.NewIndexOptions()
iopts.InmemIndex = MustNewInmemIndex("db")
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
opts.Config.MaxSeriesPerDatabase = 1000
opts.InmemIndex = MustNewInmemIndex("db")
sh := tsdb.NewShard(1, tmpShard, tmpWal, eopts, iopts)
sh := tsdb.NewShard(1, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error opening shard: %s", err.Error())
@ -142,7 +139,7 @@ func TestMaxSeriesLimit(t *testing.T) {
err = sh.WritePoints([]models.Point{pt})
if err == nil {
t.Fatal("expected error")
} else if exp, got := `max-series-per-database limit exceeded: db=db (1000/1000) dropped=1`, err.Error(); exp != got {
} else if exp, got := `db=db: max-series-per-database limit exceeded: (1000/1000) dropped=1`, err.Error(); exp != got {
t.Fatalf("unexpected error message:\n\texp = %s\n\tgot = %s", exp, got)
}
@ -155,13 +152,12 @@ func TestShard_MaxTagValuesLimit(t *testing.T) {
tmpShard := path.Join(tmpDir, "db", "rp", "1")
tmpWal := path.Join(tmpDir, "wal")
eopts := tsdb.NewEngineOptions()
eopts.Config.WALDir = filepath.Join(tmpDir, "wal")
eopts.Config.MaxValuesPerTag = 1000
iopts := tsdb.NewIndexOptions()
iopts.InmemIndex = MustNewInmemIndex("db")
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
opts.Config.MaxValuesPerTag = 1000
opts.InmemIndex = MustNewInmemIndex("db")
sh := tsdb.NewShard(1, tmpShard, tmpWal, eopts, iopts)
sh := tsdb.NewShard(1, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error opening shard: %s", err.Error())
@ -209,12 +205,11 @@ func TestWriteTimeTag(t *testing.T) {
tmpShard := path.Join(tmpDir, "shard")
tmpWal := path.Join(tmpDir, "wal")
eopts := tsdb.NewEngineOptions()
eopts.Config.WALDir = filepath.Join(tmpDir, "wal")
iopts := tsdb.NewIndexOptions()
iopts.InmemIndex = MustNewInmemIndex("db")
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
opts.InmemIndex = MustNewInmemIndex("db")
sh := tsdb.NewShard(1, tmpShard, tmpWal, eopts, iopts)
sh := tsdb.NewShard(1, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error opening shard: %s", err.Error())
}
@ -255,12 +250,12 @@ func TestWriteTimeTag(t *testing.T) {
t.Fatalf("unexpected log message: %s", strings.TrimSpace(got))
}
m = sh.Measurement([]byte("cpu"))
if m == nil {
t.Fatal("expected cpu measurement")
mf := sh.MeasurementFields([]byte("cpu"))
if mf == nil {
t.Fatal("expected cpu measurement fields")
}
if got, exp := len(m.FieldNames()), 1; got != exp {
if got, exp := mf.FieldN(), 1; got != exp {
t.Fatalf("invalid number of field names: got=%v exp=%v", got, exp)
}
}
@ -271,12 +266,11 @@ func TestWriteTimeField(t *testing.T) {
tmpShard := path.Join(tmpDir, "shard")
tmpWal := path.Join(tmpDir, "wal")
eopts := tsdb.NewEngineOptions()
eopts.Config.WALDir = filepath.Join(tmpDir, "wal")
iopts := tsdb.NewIndexOptions()
iopts.InmemIndex = MustNewInmemIndex("db")
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
opts.InmemIndex = MustNewInmemIndex("db")
sh := tsdb.NewShard(1, tmpShard, tmpWal, eopts, iopts)
sh := tsdb.NewShard(1, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error opening shard: %s", err.Error())
}
@ -304,12 +298,11 @@ func TestShardWriteAddNewField(t *testing.T) {
tmpShard := path.Join(tmpDir, "shard")
tmpWal := path.Join(tmpDir, "wal")
eopts := tsdb.NewEngineOptions()
eopts.Config.WALDir = filepath.Join(tmpDir, "wal")
iopts := tsdb.NewIndexOptions()
iopts.InmemIndex = MustNewInmemIndex("db")
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
opts.InmemIndex = MustNewInmemIndex("db")
sh := tsdb.NewShard(1, tmpShard, tmpWal, eopts, iopts)
sh := tsdb.NewShard(1, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error opening shard: %s", err.Error())
}
@ -359,12 +352,11 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) {
tmpShard := path.Join(tmpDir, "shard")
tmpWal := path.Join(tmpDir, "wal")
eopts := tsdb.NewEngineOptions()
eopts.Config.WALDir = filepath.Join(tmpDir, "wal")
iopts := tsdb.NewIndexOptions()
iopts.InmemIndex = MustNewInmemIndex("db")
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
opts.InmemIndex = MustNewInmemIndex("db")
sh := tsdb.NewShard(1, tmpShard, tmpWal, eopts, iopts)
sh := tsdb.NewShard(1, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error opening shard: %s", err.Error())
}
@ -431,12 +423,11 @@ func TestShard_Close_RemoveIndex(t *testing.T) {
tmpShard := path.Join(tmpDir, "shard")
tmpWal := path.Join(tmpDir, "wal")
eopts := tsdb.NewEngineOptions()
eopts.Config.WALDir = filepath.Join(tmpDir, "wal")
iopts := tsdb.NewIndexOptions()
iopts.InmemIndex = MustNewInmemIndex("db")
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
opts.InmemIndex = MustNewInmemIndex("db")
sh := tsdb.NewShard(1, tmpShard, tmpWal, eopts, iopts)
sh := tsdb.NewShard(1, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error opening shard: %s", err.Error())
}
@ -835,7 +826,7 @@ func benchmarkWritePoints(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) {
tmpDir, _ := ioutil.TempDir("", "shard_test")
tmpShard := path.Join(tmpDir, "shard")
tmpWal := path.Join(tmpDir, "wal")
shard := tsdb.NewShard(1, tmpShard, tmpWal, tsdb.NewEngineOptions(), tsdb.NewIndexOptions())
shard := tsdb.NewShard(1, tmpShard, tmpWal, tsdb.NewEngineOptions())
shard.Open()
b.StartTimer()
@ -869,7 +860,7 @@ func benchmarkWritePointsExistingSeries(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt
defer os.RemoveAll(tmpDir)
tmpShard := path.Join(tmpDir, "shard")
tmpWal := path.Join(tmpDir, "wal")
shard := tsdb.NewShard(1, tmpShard, tmpWal, tsdb.NewEngineOptions(), tsdb.NewIndexOptions())
shard := tsdb.NewShard(1, tmpShard, tmpWal, tsdb.NewEngineOptions())
shard.Open()
defer shard.Close()
chunkedWrite(shard, points)
@ -925,16 +916,15 @@ func NewShard() *Shard {
}
// Build engine options.
eopt := tsdb.NewEngineOptions()
eopt.Config.WALDir = filepath.Join(path, "wal")
iopt := tsdb.NewIndexOptions()
iopt.InmemIndex = MustNewInmemIndex("db")
opt := tsdb.NewEngineOptions()
opt.Config.WALDir = filepath.Join(path, "wal")
opt.InmemIndex = MustNewInmemIndex("db")
return &Shard{
Shard: tsdb.NewShard(0,
filepath.Join(path, "data", "db0", "rp0", "1"),
filepath.Join(path, "wal", "db0", "rp0", "1"),
eopt, iopt,
opt,
),
path: path,
}

View File

@ -10,6 +10,7 @@ import (
"runtime"
"sort"
"strconv"
"strings"
"sync"
"time"
@ -48,7 +49,6 @@ type Store struct {
shards map[uint64]*Shard
EngineOptions EngineOptions
IndexOptions IndexOptions
Logger *log.Logger
baseLogger zap.Logger
@ -68,7 +68,6 @@ func NewStore(path string) *Store {
path: path,
indexes: make(map[string]interface{}),
EngineOptions: NewEngineOptions(),
IndexOptions: NewIndexOptions(),
Logger: logger,
baseLogger: logger,
}
@ -175,7 +174,7 @@ func (s *Store) loadShards() error {
}
// Retrieve database index.
idx, err := s.CreateIndexIfNotExists(db.Name())
idx, err := s.createIndexIfNotExists(db.Name())
if err != nil {
return err
}
@ -213,12 +212,12 @@ func (s *Store) loadShards() error {
return
}
// Copy index options and assign shared index.
idxopt := s.IndexOptions
idxopt.InmemIndex = idx
// Copy options and assign shared index.
opt := s.EngineOptions
opt.InmemIndex = idx
// Open engine.
shard := NewShard(shardID, path, walPath, s.EngineOptions, idxopt)
shard := NewShard(shardID, path, walPath, opt)
shard.WithLogger(s.baseLogger)
err = shard.Open()
@ -277,6 +276,10 @@ func (s *Store) Close() error {
func (s *Store) CreateIndexIfNotExists(name string) (interface{}, error) {
s.mu.Lock()
defer s.mu.Unlock()
return s.createIndexIfNotExists(name)
}
func (s *Store) createIndexIfNotExists(name string) (interface{}, error) {
if idx := s.indexes[name]; idx != nil {
return idx, nil
}
@ -351,17 +354,17 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, en
}
// Retrieve shared index, if needed.
idx, err := s.CreateIndexIfNotExists(database)
idx, err := s.createIndexIfNotExists(database)
if err != nil {
return err
}
// Copy index options and pass in shared index.
idxopt := s.IndexOptions
idxopt.InmemIndex = idx
opt := s.EngineOptions
opt.InmemIndex = idx
path := filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10))
shard := NewShard(shardID, path, walPath, s.EngineOptions, s.IndexOptions)
shard := NewShard(shardID, path, walPath, opt)
shard.WithLogger(s.baseLogger)
shard.EnableOnOpen = enabled
@ -403,6 +406,11 @@ func (s *Store) DeleteShard(shardID uint64) error {
return nil
}
// Remove the shard from the database indexes before closing the shard.
// Closing the shard will do this as well, but it will unload it while
// the shard is locked which can block stats collection and other calls.
sh.UnloadIndex()
if err := sh.Close(); err != nil {
return err
}
@ -739,82 +747,78 @@ func (s *Store) ShardRelativePath(id uint64) (string, error) {
// DeleteSeries loops through the local shards and deletes the series data for
// the passed in series keys.
func (s *Store) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error {
panic("MOVE TO TSI")
// Expand regex expressions in the FROM clause.
a, err := s.ExpandSources(sources)
if err != nil {
return err
} else if sources != nil && len(sources) != 0 && len(a) == 0 {
return nil
}
sources = a
/*
// Expand regex expressions in the FROM clause.
a, err := s.ExpandSources(sources)
if err != nil {
return err
} else if sources != nil && len(sources) != 0 && len(a) == 0 {
return nil
// Determine deletion time range.
min, max, err := influxql.TimeRangeAsEpochNano(condition)
if err != nil {
return err
}
s.mu.RLock()
shards := s.filterShards(byDatabase(database))
s.mu.RUnlock()
mMap := make(map[string]*Measurement)
for _, shard := range shards {
shardMeasures := shard.Measurements()
for _, m := range shardMeasures {
mMap[m.Name] = m
}
sources = a
}
// Determine deletion time range.
min, max, err := influxql.TimeRangeAsEpochNano(condition)
if err != nil {
return err
}
s.mu.RLock()
defer s.mu.RUnlock()
s.mu.RLock()
shards := s.filterShards(byDatabase(database))
s.mu.RUnlock()
measurements, err := measurementsFromSourcesOrDB(mMap, sources...)
if err != nil {
return err
}
mMap := make(map[string]*Measurement)
for _, shard := range shards {
shardMeasures := shard.Measurements()
for _, m := range shardMeasures {
mMap[m.Name] = m
}
}
s.mu.RLock()
defer s.mu.RUnlock()
measurements, err := measurementsFromSourcesOrDB(mMap, sources...)
if err != nil {
return err
}
var seriesKeys [][]byte
for _, m := range measurements {
var ids SeriesIDs
var filters FilterExprs
if condition != nil {
// Get series IDs that match the WHERE clause.
ids, filters, err = m.walkWhereForSeriesIds(condition)
if err != nil {
return err
}
// Delete boolean literal true filter expressions.
// These are returned for `WHERE tagKey = 'tagVal'` type expressions and are okay.
filters.DeleteBoolLiteralTrues()
// Check for unsupported field filters.
// Any remaining filters means there were fields (e.g., `WHERE value = 1.2`).
if filters.Len() > 0 {
return errors.New("fields not supported in WHERE clause during deletion")
}
} else {
// No WHERE clause so get all series IDs for this measurement.
ids = m.seriesIDs
}
for _, id := range ids {
seriesKeys = append(seriesKeys, []byte(m.seriesByID[id].Key))
}
}
// delete the raw series data.
return s.walkShards(shards, func(sh *Shard) error {
if err := sh.DeleteSeriesRange(seriesKeys, min, max); err != nil {
var seriesKeys [][]byte
for _, m := range measurements {
var ids SeriesIDs
var filters FilterExprs
if condition != nil {
// Get series IDs that match the WHERE clause.
ids, filters, err = m.walkWhereForSeriesIds(condition)
if err != nil {
return err
}
return nil
})
*/
// Delete boolean literal true filter expressions.
// These are returned for `WHERE tagKey = 'tagVal'` type expressions and are okay.
filters.DeleteBoolLiteralTrues()
// Check for unsupported field filters.
// Any remaining filters means there were fields (e.g., `WHERE value = 1.2`).
if filters.Len() > 0 {
return errors.New("fields not supported in WHERE clause during deletion")
}
} else {
// No WHERE clause so get all series IDs for this measurement.
ids = m.seriesIDs
}
for _, id := range ids {
seriesKeys = append(seriesKeys, []byte(m.seriesByID[id].Key))
}
}
// delete the raw series data.
return s.walkShards(shards, func(sh *Shard) error {
if err := sh.DeleteSeriesRange(seriesKeys, min, max); err != nil {
return err
}
return nil
})
}
// ExpandSources expands sources against all local shards.
@ -927,114 +931,110 @@ type TagValues struct {
// TagValues returns the tag keys and values in the given database, matching the condition.
func (s *Store) TagValues(database string, cond influxql.Expr) ([]TagValues, error) {
panic("MOVE TO TSI")
if cond == nil {
return nil, errors.New("a condition is required")
}
/*
if cond == nil {
return nil, errors.New("a condition is required")
}
measurementExpr := influxql.CloneExpr(cond)
measurementExpr = influxql.Reduce(influxql.RewriteExpr(measurementExpr, func(e influxql.Expr) influxql.Expr {
switch e := e.(type) {
case *influxql.BinaryExpr:
switch e.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
tag, ok := e.LHS.(*influxql.VarRef)
if !ok || tag.Val != "_name" {
return nil
}
measurementExpr := influxql.CloneExpr(cond)
measurementExpr = influxql.Reduce(influxql.RewriteExpr(measurementExpr, func(e influxql.Expr) influxql.Expr {
switch e := e.(type) {
case *influxql.BinaryExpr:
switch e.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
tag, ok := e.LHS.(*influxql.VarRef)
if !ok || tag.Val != "_name" {
return nil
}
}
return e
}), nil)
}
return e
}), nil)
// Get all measurements for the shards we're interested in.
s.mu.RLock()
shards := s.filterShards(byDatabase(database))
s.mu.RUnlock()
// Get all measurements for the shards we're interested in.
s.mu.RLock()
shards := s.filterShards(byDatabase(database))
s.mu.RUnlock()
var measures Measurements
for _, sh := range shards {
mms, ok, err := sh.MeasurementsByExpr(measurementExpr)
if err != nil {
return nil, err
} else if !ok {
// TODO(edd): can we simplify this so we don't have to check the
// ok value, and we can call sh.measurements with a shard filter
// instead?
mms = sh.Measurements()
}
measures = append(measures, mms...)
var measures Measurements
for _, sh := range shards {
mms, ok, err := sh.MeasurementsByExpr(measurementExpr)
if err != nil {
return nil, err
} else if !ok {
// TODO(edd): can we simplify this so we don't have to check the
// ok value, and we can call sh.measurements with a shard filter
// instead?
mms = sh.Measurements()
}
// If there are no measurements, return immediately.
if len(measures) == 0 {
return nil, nil
}
sort.Sort(measures)
measures = append(measures, mms...)
}
filterExpr := influxql.CloneExpr(cond)
filterExpr = influxql.Reduce(influxql.RewriteExpr(filterExpr, func(e influxql.Expr) influxql.Expr {
switch e := e.(type) {
case *influxql.BinaryExpr:
switch e.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
tag, ok := e.LHS.(*influxql.VarRef)
if !ok || strings.HasPrefix(tag.Val, "_") {
return nil
}
// If there are no measurements, return immediately.
if len(measures) == 0 {
return nil, nil
}
sort.Sort(measures)
filterExpr := influxql.CloneExpr(cond)
filterExpr = influxql.Reduce(influxql.RewriteExpr(filterExpr, func(e influxql.Expr) influxql.Expr {
switch e := e.(type) {
case *influxql.BinaryExpr:
switch e.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
tag, ok := e.LHS.(*influxql.VarRef)
if !ok || strings.HasPrefix(tag.Val, "_") {
return nil
}
}
return e
}), nil)
}
return e
}), nil)
tagValues := make([]TagValues, len(measures))
for i, mm := range measures {
tagValues[i].Measurement = mm.Name
tagValues := make([]TagValues, len(measures))
for i, mm := range measures {
tagValues[i].Measurement = mm.Name
ids, err := mm.SeriesIDsAllOrByExpr(filterExpr)
if err != nil {
return nil, err
}
ss := mm.SeriesByIDSlice(ids)
ids, err := mm.SeriesIDsAllOrByExpr(filterExpr)
if err != nil {
return nil, err
}
ss := mm.SeriesByIDSlice(ids)
// Determine a list of keys from condition.
keySet, ok, err := mm.TagKeysByExpr(cond)
if err != nil {
return nil, err
}
// Loop over all keys for each series.
m := make(map[KeyValue]struct{}, len(ss))
for _, series := range ss {
for _, t := range series.Tags {
if !ok {
// nop
} else if _, exists := keySet[string(t.Key)]; !exists {
continue
}
m[KeyValue{string(t.Key), string(t.Value)}] = struct{}{}
}
}
// Return an empty slice if there are no key/value matches.
if len(m) == 0 {
continue
}
// Sort key/value set.
a := make([]KeyValue, 0, len(m))
for kv := range m {
a = append(a, kv)
}
sort.Sort(KeyValues(a))
tagValues[i].Values = a
// Determine a list of keys from condition.
keySet, ok, err := mm.TagKeysByExpr(cond)
if err != nil {
return nil, err
}
return tagValues, nil
*/
// Loop over all keys for each series.
m := make(map[KeyValue]struct{}, len(ss))
for _, series := range ss {
for _, t := range series.Tags {
if !ok {
// nop
} else if _, exists := keySet[string(t.Key)]; !exists {
continue
}
m[KeyValue{string(t.Key), string(t.Value)}] = struct{}{}
}
}
// Return an empty slice if there are no key/value matches.
if len(m) == 0 {
continue
}
// Sort key/value set.
a := make([]KeyValue, 0, len(m))
for kv := range m {
a = append(a, kv)
}
sort.Sort(KeyValues(a))
tagValues[i].Values = a
}
return tagValues, nil
}
// KeyValue holds a string key and a string value.