commit 1ac067e53b
parent fda84955ea

    intermediate
@@ -12,6 +12,9 @@ const (
 	// DefaultEngine is the default engine for new shards
 	DefaultEngine = "tsm1"
 
+	// DefaultIndex is the default index for new shards
+	DefaultIndex = "inmem"
+
 	// tsdb/engine/wal configuration options
 
 	// Default settings for TSM
@@ -48,6 +51,7 @@ const (
 type Config struct {
 	Dir    string `toml:"dir"`
 	Engine string `toml:"-"`
+	Index  string `toml:"-"`
 
 	// General WAL configuration options
 	WALDir string `toml:"wal-dir"`
@@ -80,6 +84,7 @@ type Config struct {
 func NewConfig() Config {
 	return Config{
 		Engine: DefaultEngine,
+		Index:  DefaultIndex,
 
 		QueryLogEnabled: true,
 
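The net effect of the three hunks above is that a freshly constructed Config carries an index implementation name alongside the engine name, and neither field is read from the TOML file (both use the `toml:"-"` tag). A minimal sketch of the resulting defaults, assuming only the influxdb tsdb package as shown in this diff:

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/tsdb"
)

func main() {
	c := tsdb.NewConfig()
	// Engine and Index are set in code, not parsed from TOML (`toml:"-"`).
	fmt.Println(c.Engine) // "tsm1"
	fmt.Println(c.Index)  // "inmem"
}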
@ -0,0 +1,484 @@
|
||||||
|
package inmem
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"regexp"
|
||||||
|
"sort"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/influxdata/influxdb/influxql"
|
||||||
|
"github.com/influxdata/influxdb/models"
|
||||||
|
"github.com/influxdata/influxdb/pkg/escape"
|
||||||
|
"github.com/influxdata/influxdb/pkg/estimator"
|
||||||
|
"github.com/influxdata/influxdb/pkg/estimator/hll"
|
||||||
|
"github.com/influxdata/influxdb/tsdb"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Ensure index implements interface.
|
||||||
|
var _ tsdb.Index = &Index{}
|
||||||
|
|
||||||
|
// Index is the in memory index of a collection of measurements, time
|
||||||
|
// series, and their tags. Exported functions are goroutine safe while
|
||||||
|
// un-exported functions assume the caller will use the appropriate locks.
|
||||||
|
type Index struct {
|
||||||
|
// In-memory metadata index, built on load and updated when new series come in
|
||||||
|
mu sync.RWMutex
|
||||||
|
measurements map[string]*tsdb.Measurement // measurement name to object and index
|
||||||
|
series map[string]*tsdb.Series // map series key to the Series object
|
||||||
|
lastID uint64 // last used series ID. They're in memory only for this shard
|
||||||
|
|
||||||
|
seriesSketch, seriesTSSketch *hll.Plus
|
||||||
|
measurementsSketch, measurementsTSSketch *hll.Plus
|
||||||
|
|
||||||
|
name string // name of the database represented by this index
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewIndex returns a new initialized Index.
|
||||||
|
func NewIndex(name string) (index *Index, err error) {
|
||||||
|
index = &Index{
|
||||||
|
measurements: make(map[string]*tsdb.Measurement),
|
||||||
|
series: make(map[string]*tsdb.Series),
|
||||||
|
name: name,
|
||||||
|
}
|
||||||
|
|
||||||
|
if index.seriesSketch, err = hll.NewPlus(16); err != nil {
|
||||||
|
return nil, err
|
||||||
|
} else if index.seriesTSSketch, err = hll.NewPlus(16); err != nil {
|
||||||
|
return nil, err
|
||||||
|
} else if index.measurementsSketch, err = hll.NewPlus(16); err != nil {
|
||||||
|
return nil, err
|
||||||
|
} else if index.measurementsTSSketch, err = hll.NewPlus(16); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return index, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *Index) Open() (err error) { return nil }
|
||||||
|
func (i *Index) Close() error { return nil }
|
||||||
|
|
||||||
|
// Series returns a series by key.
|
||||||
|
func (i *Index) Series(key []byte) (*tsdb.Series, error) {
|
||||||
|
i.mu.RLock()
|
||||||
|
s := i.series[string(key)]
|
||||||
|
i.mu.RUnlock()
|
||||||
|
return s, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SeriesN returns the exact number of series in the index.
|
||||||
|
func (i *Index) SeriesN() (uint64, error) {
|
||||||
|
i.mu.RLock()
|
||||||
|
defer i.mu.RUnlock()
|
||||||
|
return uint64(len(i.series)), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateSeriesIfNotExists creates a series if it doesn't already exist.
|
||||||
|
func (i *Index) CreateSeriesIfNotExists(name []byte, tags models.Tags) error {
|
||||||
|
panic("TODO")
|
||||||
|
}
|
||||||
|
|
||||||
|
// SeriesSketch returns the sketch for the series.
|
||||||
|
func (i *Index) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
|
||||||
|
i.mu.RLock()
|
||||||
|
defer i.mu.RUnlock()
|
||||||
|
return i.seriesSketch, i.seriesTSSketch, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Measurement returns the measurement object from the index by the name
|
||||||
|
func (i *Index) Measurement(name []byte) (*tsdb.Measurement, error) {
|
||||||
|
i.mu.RLock()
|
||||||
|
defer i.mu.RUnlock()
|
||||||
|
return i.measurements[string(name)], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// MeasurementsSketch returns the sketch for the series.
|
||||||
|
func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
|
||||||
|
i.mu.RLock()
|
||||||
|
defer i.mu.RUnlock()
|
||||||
|
return i.measurementsSketch, i.measurementsTSSketch, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// MeasurementsByName returns a list of measurements.
|
||||||
|
func (i *Index) MeasurementsByName(names [][]byte) ([]*tsdb.Measurement, error) {
|
||||||
|
i.mu.RLock()
|
||||||
|
defer i.mu.RUnlock()
|
||||||
|
|
||||||
|
a := make([]*tsdb.Measurement, 0, len(names))
|
||||||
|
for _, name := range names {
|
||||||
|
if m := i.measurements[string(name)]; m != nil {
|
||||||
|
a = append(a, m)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return a, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateSeriesIndexIfNotExists adds the series for the given measurement to the
|
||||||
|
// index and sets its ID or returns the existing series object
|
||||||
|
func (i *Index) CreateSeriesIndexIfNotExists(measurementName string, series *tsdb.Series) (*tsdb.Series, error) {
|
||||||
|
i.mu.RLock()
|
||||||
|
// if there is a measurement for this id, it's already been added
|
||||||
|
ss := i.series[series.Key]
|
||||||
|
if ss != nil {
|
||||||
|
i.mu.RUnlock()
|
||||||
|
return ss, nil
|
||||||
|
}
|
||||||
|
i.mu.RUnlock()
|
||||||
|
|
||||||
|
// get or create the measurement index
|
||||||
|
m, err := i.CreateMeasurementIndexIfNotExists(measurementName)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
i.mu.Lock()
|
||||||
|
// Check for the series again under a write lock
|
||||||
|
ss = i.series[series.Key]
|
||||||
|
if ss != nil {
|
||||||
|
i.mu.Unlock()
|
||||||
|
return ss, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// set the in memory ID for query processing on this shard
|
||||||
|
series.ID = i.lastID + 1
|
||||||
|
i.lastID++
|
||||||
|
|
||||||
|
series.SetMeasurement(m)
|
||||||
|
i.series[series.Key] = series
|
||||||
|
|
||||||
|
m.AddSeries(series)
|
||||||
|
|
||||||
|
// Add the series to the series sketch.
|
||||||
|
i.seriesSketch.Add([]byte(series.Key))
|
||||||
|
i.mu.Unlock()
|
||||||
|
|
||||||
|
return series, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateMeasurementIndexIfNotExists creates or retrieves an in memory index
|
||||||
|
// object for the measurement
|
||||||
|
func (i *Index) CreateMeasurementIndexIfNotExists(name string) (*tsdb.Measurement, error) {
|
||||||
|
name = escape.UnescapeString(name)
|
||||||
|
|
||||||
|
// See if the measurement exists using a read-lock
|
||||||
|
i.mu.RLock()
|
||||||
|
m := i.measurements[name]
|
||||||
|
if m != nil {
|
||||||
|
i.mu.RUnlock()
|
||||||
|
return m, nil
|
||||||
|
}
|
||||||
|
i.mu.RUnlock()
|
||||||
|
|
||||||
|
// Doesn't exist, so lock the index to create it
|
||||||
|
i.mu.Lock()
|
||||||
|
defer i.mu.Unlock()
|
||||||
|
|
||||||
|
// Make sure it was created in between the time we released our read-lock
|
||||||
|
// and acquire the write lock
|
||||||
|
m = i.measurements[name]
|
||||||
|
if m == nil {
|
||||||
|
m = tsdb.NewMeasurement(name)
|
||||||
|
i.measurements[name] = m
|
||||||
|
|
||||||
|
// Add the measurement to the measurements sketch.
|
||||||
|
i.measurementsSketch.Add([]byte(name))
|
||||||
|
}
|
||||||
|
return m, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// TagsForSeries returns the tag map for the passed in series
|
||||||
|
func (i *Index) TagsForSeries(key string) (models.Tags, error) {
|
||||||
|
i.mu.RLock()
|
||||||
|
defer i.mu.RUnlock()
|
||||||
|
|
||||||
|
ss := i.series[key]
|
||||||
|
if ss == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
return ss.Tags, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// MeasurementsByExpr takes an expression containing only tags and returns a
|
||||||
|
// list of matching *tsdb.Measurement. The bool return argument returns if the
|
||||||
|
// expression was a measurement expression. It is used to differentiate a list
|
||||||
|
// of no measurements because all measurements were filtered out (when the bool
|
||||||
|
// is true) against when there are no measurements because the expression
|
||||||
|
// wasn't evaluated (when the bool is false).
|
||||||
|
func (i *Index) MeasurementsByExpr(expr influxql.Expr) (tsdb.Measurements, bool, error) {
|
||||||
|
i.mu.RLock()
|
||||||
|
defer i.mu.RUnlock()
|
||||||
|
return i.measurementsByExpr(expr)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *Index) measurementsByExpr(expr influxql.Expr) (tsdb.Measurements, bool, error) {
|
||||||
|
if expr == nil {
|
||||||
|
return nil, false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
switch e := expr.(type) {
|
||||||
|
case *influxql.BinaryExpr:
|
||||||
|
switch e.Op {
|
||||||
|
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
|
||||||
|
tag, ok := e.LHS.(*influxql.VarRef)
|
||||||
|
if !ok {
|
||||||
|
return nil, false, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
tf := &tsdb.TagFilter{
|
||||||
|
Op: e.Op,
|
||||||
|
Key: tag.Val,
|
||||||
|
}
|
||||||
|
|
||||||
|
if influxql.IsRegexOp(e.Op) {
|
||||||
|
re, ok := e.RHS.(*influxql.RegexLiteral)
|
||||||
|
if !ok {
|
||||||
|
return nil, false, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String())
|
||||||
|
}
|
||||||
|
tf.Regex = re.Val
|
||||||
|
} else {
|
||||||
|
s, ok := e.RHS.(*influxql.StringLiteral)
|
||||||
|
if !ok {
|
||||||
|
return nil, false, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String())
|
||||||
|
}
|
||||||
|
tf.Value = s.Val
|
||||||
|
}
|
||||||
|
|
||||||
|
// Match on name, if specified.
|
||||||
|
if tag.Val == "_name" {
|
||||||
|
return i.measurementsByNameFilter(tf.Op, tf.Value, tf.Regex), true, nil
|
||||||
|
} else if influxql.IsSystemName(tag.Val) {
|
||||||
|
return nil, false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return i.measurementsByTagFilters([]*tsdb.TagFilter{tf}), true, nil
|
||||||
|
case influxql.OR, influxql.AND:
|
||||||
|
lhsIDs, lhsOk, err := i.measurementsByExpr(e.LHS)
|
||||||
|
if err != nil {
|
||||||
|
return nil, false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
rhsIDs, rhsOk, err := i.measurementsByExpr(e.RHS)
|
||||||
|
if err != nil {
|
||||||
|
return nil, false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if lhsOk && rhsOk {
|
||||||
|
if e.Op == influxql.OR {
|
||||||
|
return lhsIDs.Union(rhsIDs), true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return lhsIDs.Intersect(rhsIDs), true, nil
|
||||||
|
} else if lhsOk {
|
||||||
|
return lhsIDs, true, nil
|
||||||
|
} else if rhsOk {
|
||||||
|
return rhsIDs, true, nil
|
||||||
|
}
|
||||||
|
return nil, false, nil
|
||||||
|
default:
|
||||||
|
return nil, false, fmt.Errorf("invalid tag comparison operator")
|
||||||
|
}
|
||||||
|
case *influxql.ParenExpr:
|
||||||
|
return i.measurementsByExpr(e.Expr)
|
||||||
|
}
|
||||||
|
return nil, false, fmt.Errorf("%#v", expr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// measurementsByNameFilter returns the sorted measurements matching a name.
|
||||||
|
func (i *Index) measurementsByNameFilter(op influxql.Token, val string, regex *regexp.Regexp) tsdb.Measurements {
|
||||||
|
var measurements tsdb.Measurements
|
||||||
|
for _, m := range i.measurements {
|
||||||
|
var matched bool
|
||||||
|
switch op {
|
||||||
|
case influxql.EQ:
|
||||||
|
matched = m.Name == val
|
||||||
|
case influxql.NEQ:
|
||||||
|
matched = m.Name != val
|
||||||
|
case influxql.EQREGEX:
|
||||||
|
matched = regex.MatchString(m.Name)
|
||||||
|
case influxql.NEQREGEX:
|
||||||
|
matched = !regex.MatchString(m.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !matched {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
measurements = append(measurements, m)
|
||||||
|
}
|
||||||
|
sort.Sort(measurements)
|
||||||
|
return measurements
|
||||||
|
}
|
||||||
|
|
||||||
|
// measurementsByTagFilters returns the sorted measurements matching the filters on tag values.
|
||||||
|
func (i *Index) measurementsByTagFilters(filters []*tsdb.TagFilter) tsdb.Measurements {
|
||||||
|
// If no filters, then return all measurements.
|
||||||
|
if len(filters) == 0 {
|
||||||
|
measurements := make(tsdb.Measurements, 0, len(i.measurements))
|
||||||
|
for _, m := range i.measurements {
|
||||||
|
measurements = append(measurements, m)
|
||||||
|
}
|
||||||
|
return measurements
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build a list of measurements matching the filters.
|
||||||
|
var measurements tsdb.Measurements
|
||||||
|
var tagMatch bool
|
||||||
|
|
||||||
|
// Iterate through all measurements in the database.
|
||||||
|
for _, m := range i.measurements {
|
||||||
|
// Iterate filters seeing if the measurement has a matching tag.
|
||||||
|
for _, f := range filters {
|
||||||
|
tagVals := m.SeriesByTagKeyValue(f.Key)
|
||||||
|
if tagVals == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
tagMatch = false
|
||||||
|
|
||||||
|
// If the operator is non-regex, only check the specified value.
|
||||||
|
if f.Op == influxql.EQ || f.Op == influxql.NEQ {
|
||||||
|
if _, ok := tagVals[f.Value]; ok {
|
||||||
|
tagMatch = true
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Else, the operator is a regex and we have to check all tag
|
||||||
|
// values against the regular expression.
|
||||||
|
for tagVal := range tagVals {
|
||||||
|
if f.Regex.MatchString(tagVal) {
|
||||||
|
tagMatch = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
isEQ := (f.Op == influxql.EQ || f.Op == influxql.EQREGEX)
|
||||||
|
|
||||||
|
//
|
||||||
|
// XNOR gate
|
||||||
|
//
|
||||||
|
// tags match | operation is EQ | measurement matches
|
||||||
|
// --------------------------------------------------
|
||||||
|
// True | True | True
|
||||||
|
// True | False | False
|
||||||
|
// False | True | False
|
||||||
|
// False | False | True
|
||||||
|
|
||||||
|
if tagMatch == isEQ {
|
||||||
|
measurements = append(measurements, m)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Sort(measurements)
|
||||||
|
return measurements
|
||||||
|
}
|
||||||
|
|
||||||
|
// MeasurementNamesByRegex returns the measurements that match the regex.
|
||||||
|
func (i *Index) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
|
||||||
|
i.mu.RLock()
|
||||||
|
defer i.mu.RUnlock()
|
||||||
|
|
||||||
|
var matches [][]byte
|
||||||
|
for _, m := range i.measurements {
|
||||||
|
if re.MatchString(m.Name) {
|
||||||
|
matches = append(matches, []byte(m.Name))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return matches, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Measurements returns a list of all measurements.
|
||||||
|
func (i *Index) Measurements() (tsdb.Measurements, error) {
|
||||||
|
i.mu.RLock()
|
||||||
|
measurements := make(tsdb.Measurements, 0, len(i.measurements))
|
||||||
|
for _, m := range i.measurements {
|
||||||
|
measurements = append(measurements, m)
|
||||||
|
}
|
||||||
|
i.mu.RUnlock()
|
||||||
|
|
||||||
|
return measurements, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// DropMeasurement removes the measurement and all of its underlying
|
||||||
|
// series from the database index
|
||||||
|
func (i *Index) DropMeasurement(name []byte) error {
|
||||||
|
i.mu.Lock()
|
||||||
|
defer i.mu.Unlock()
|
||||||
|
return i.dropMeasurement(string(name))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *Index) dropMeasurement(name string) error {
|
||||||
|
// Update the tombstone sketch.
|
||||||
|
i.measurementsTSSketch.Add([]byte(name))
|
||||||
|
|
||||||
|
m := i.measurements[name]
|
||||||
|
if m == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(i.measurements, name)
|
||||||
|
for _, s := range m.SeriesByIDMap() {
|
||||||
|
delete(i.series, s.Key)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// DropSeries removes the series keys and their tags from the index
|
||||||
|
func (i *Index) DropSeries(keys [][]byte) error {
|
||||||
|
if len(keys) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
i.mu.Lock()
|
||||||
|
defer i.mu.Unlock()
|
||||||
|
|
||||||
|
var (
|
||||||
|
mToDelete = map[string]struct{}{}
|
||||||
|
nDeleted int64
|
||||||
|
)
|
||||||
|
|
||||||
|
for _, k := range keys {
|
||||||
|
// Update the tombstone sketch.
|
||||||
|
i.seriesTSSketch.Add(k)
|
||||||
|
|
||||||
|
series := i.series[string(k)]
|
||||||
|
if series == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
series.Measurement().DropSeries(series)
|
||||||
|
delete(i.series, string(k))
|
||||||
|
nDeleted++
|
||||||
|
|
||||||
|
// If there are no more series in the measurement then we'll
|
||||||
|
// remove it.
|
||||||
|
if len(series.Measurement().SeriesByIDMap()) == 0 {
|
||||||
|
mToDelete[series.Measurement().Name] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for mname := range mToDelete {
|
||||||
|
i.dropMeasurement(mname)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Dereference removes all references to data within b and moves them to the heap.
|
||||||
|
func (i *Index) Dereference(b []byte) {
|
||||||
|
i.mu.RLock()
|
||||||
|
defer i.mu.RUnlock()
|
||||||
|
|
||||||
|
for _, s := range i.series {
|
||||||
|
s.Dereference(b)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TagSets returns a list of tag sets.
|
||||||
|
func (i *Index) TagSets(shardID uint64, name []byte, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) {
|
||||||
|
i.mu.RLock()
|
||||||
|
defer i.mu.RUnlock()
|
||||||
|
|
||||||
|
mm := i.measurements[string(name)]
|
||||||
|
if mm == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
return mm.TagSets(shardID, dimensions, condition)
|
||||||
|
}
|
|
@@ -503,7 +503,7 @@ func (i *Index) MatchTagValueSeriesIterator(name, key []byte, value *regexp.Rege
 // TagSets returns an ordered list of tag sets for a measurement by dimension
 // and filtered by an optional conditional expression.
-func (i *Index) TagSets(name []byte, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) {
+func (i *Index) TagSets(shardID uint64, name []byte, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) {
 	itr, err := i.MeasurementSeriesByExprIterator(name, condition)
 	if err != nil {
 		return nil, err
@@ -1341,7 +1341,7 @@ func (e *Engine) createVarRefIterator(opt influxql.IteratorOptions, aggregate bo
 	if err := func() error {
 		for _, name := range influxql.Sources(opt.Sources).Names() {
 			// Generate tag sets from index.
-			tagSets, err := e.index.TagSets([]byte(name), opt.Dimensions, opt.Condition)
+			tagSets, err := e.index.TagSets(e.id, []byte(name), opt.Dimensions, opt.Condition)
 			if err != nil {
 				return err
 			}
@@ -28,5 +28,5 @@ type Index interface {
 
 	Dereference(b []byte)
 
-	TagSets(name []byte, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error)
+	TagSets(shardID uint64, name []byte, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error)
 }
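With this change, every Index implementation receives the calling shard's ID, mirroring the call-site change in the engine hunk above. A hedged sketch of a caller, with illustrative names ("cpu", "host") that are not part of this patch:

package main

import (
	"github.com/influxdata/influxdb/influxql"
	"github.com/influxdata/influxdb/tsdb"
)

// groupByHost is illustrative only: given any tsdb.Index implementation and
// the ID of the shard performing the query, it builds GROUP BY tag sets for
// the "cpu" measurement over the "host" dimension with no WHERE condition.
func groupByHost(idx tsdb.Index, shardID uint64) ([]*influxql.TagSet, error) {
	return idx.TagSets(shardID, []byte("cpu"), []string{"host"}, nil)
}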
tsdb/meta.go
@@ -1,6 +1,7 @@
 package tsdb
 
 import (
+	"bytes"
 	"fmt"
 	"regexp"
 	"sort"
|
@ -17,479 +18,6 @@ import (
|
||||||
|
|
||||||
//go:generate protoc --gogo_out=. internal/meta.proto
|
//go:generate protoc --gogo_out=. internal/meta.proto
|
||||||
|
|
||||||
/*
|
|
||||||
// DatabaseIndex is the in memory index of a collection of measurements, time series, and their tags.
|
|
||||||
// Exported functions are goroutine safe while un-exported functions assume the caller will use the appropriate locks.
|
|
||||||
type DatabaseIndex struct {
|
|
||||||
// in memory metadata index, built on load and updated when new series come in
|
|
||||||
mu sync.RWMutex
|
|
||||||
measurements map[string]*Measurement // measurement name to object and index
|
|
||||||
series map[string]*Series // map series key to the Series object
|
|
||||||
lastID uint64 // last used series ID. They're in memory only for this shard
|
|
||||||
|
|
||||||
seriesSketch, seriesTSSketch *hll.Plus
|
|
||||||
measurementsSketch, measurementsTSSketch *hll.Plus
|
|
||||||
|
|
||||||
name string // name of the database represented by this index
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewDatabaseIndex returns a new initialized DatabaseIndex.
|
|
||||||
func NewDatabaseIndex(name string) (index *DatabaseIndex, err error) {
|
|
||||||
index = &DatabaseIndex{
|
|
||||||
measurements: make(map[string]*Measurement),
|
|
||||||
series: make(map[string]*Series),
|
|
||||||
name: name,
|
|
||||||
}
|
|
||||||
|
|
||||||
if index.seriesSketch, err = hll.NewPlus(16); err != nil {
|
|
||||||
return nil, err
|
|
||||||
} else if index.seriesTSSketch, err = hll.NewPlus(16); err != nil {
|
|
||||||
return nil, err
|
|
||||||
} else if index.measurementsSketch, err = hll.NewPlus(16); err != nil {
|
|
||||||
return nil, err
|
|
||||||
} else if index.measurementsTSSketch, err = hll.NewPlus(16); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return index, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *DatabaseIndex) Open() (err error) { return nil }
|
|
||||||
func (d *DatabaseIndex) Close() error { return nil }
|
|
||||||
|
|
||||||
// Series returns a series by key.
|
|
||||||
func (d *DatabaseIndex) Series(key []byte) (*Series, error) {
|
|
||||||
d.mu.RLock()
|
|
||||||
s := d.series[string(key)]
|
|
||||||
d.mu.RUnlock()
|
|
||||||
return s, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// SeriesN returns the exact number of series in the index.
|
|
||||||
func (d *DatabaseIndex) SeriesN() (uint64, error) {
|
|
||||||
d.mu.RLock()
|
|
||||||
defer d.mu.RUnlock()
|
|
||||||
return uint64(len(d.series)), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// SeriesSketch returns the sketch for the series.
|
|
||||||
func (d *DatabaseIndex) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
|
|
||||||
d.mu.RLock()
|
|
||||||
defer d.mu.RUnlock()
|
|
||||||
return d.seriesSketch, d.seriesTSSketch, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Measurement returns the measurement object from the index by the name
|
|
||||||
func (d *DatabaseIndex) Measurement(name []byte) (*Measurement, error) {
|
|
||||||
d.mu.RLock()
|
|
||||||
defer d.mu.RUnlock()
|
|
||||||
return d.measurements[string(name)], nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// MeasurementsSketch returns the sketch for the series.
|
|
||||||
func (d *DatabaseIndex) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
|
|
||||||
d.mu.RLock()
|
|
||||||
defer d.mu.RUnlock()
|
|
||||||
return d.measurementsSketch, d.measurementsTSSketch, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// MeasurementsByName returns a list of measurements.
|
|
||||||
func (d *DatabaseIndex) MeasurementsByName(names []string) ([]*Measurement, error) {
|
|
||||||
d.mu.RLock()
|
|
||||||
defer d.mu.RUnlock()
|
|
||||||
|
|
||||||
a := make([]*Measurement, 0, len(names))
|
|
||||||
for _, name := range names {
|
|
||||||
if m := d.measurements[name]; m != nil {
|
|
||||||
a = append(a, m)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return a, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// MeasurementSeriesCounts returns the number of measurements and series currently indexed by the database.
|
|
||||||
// Useful for reporting and monitoring.
|
|
||||||
func (d *DatabaseIndex) MeasurementSeriesCounts() (nMeasurements int, nSeries int) {
|
|
||||||
d.mu.RLock()
|
|
||||||
defer d.mu.RUnlock()
|
|
||||||
nMeasurements, nSeries = len(d.measurements), len(d.series)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// SeriesShardN returns the series count for a shard.
|
|
||||||
func (d *DatabaseIndex) SeriesShardN(shardID uint64) int {
|
|
||||||
var n int
|
|
||||||
d.mu.RLock()
|
|
||||||
for _, s := range d.series {
|
|
||||||
if s.Assigned(shardID) {
|
|
||||||
n++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
d.mu.RUnlock()
|
|
||||||
return n
|
|
||||||
}
|
|
||||||
|
|
||||||
// CreateSeriesIndexIfNotExists adds the series for the given measurement to the
|
|
||||||
// index and sets its ID or returns the existing series object
|
|
||||||
func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, series *Series) (*Series, error) {
|
|
||||||
d.mu.RLock()
|
|
||||||
// if there is a measurement for this id, it's already been added
|
|
||||||
ss := d.series[series.Key]
|
|
||||||
if ss != nil {
|
|
||||||
d.mu.RUnlock()
|
|
||||||
return ss, nil
|
|
||||||
}
|
|
||||||
d.mu.RUnlock()
|
|
||||||
|
|
||||||
// get or create the measurement index
|
|
||||||
m, err := d.CreateMeasurementIndexIfNotExists(measurementName)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
d.mu.Lock()
|
|
||||||
// Check for the series again under a write lock
|
|
||||||
ss = d.series[series.Key]
|
|
||||||
if ss != nil {
|
|
||||||
d.mu.Unlock()
|
|
||||||
return ss, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// set the in memory ID for query processing on this shard
|
|
||||||
series.ID = d.lastID + 1
|
|
||||||
d.lastID++
|
|
||||||
|
|
||||||
series.measurement = m
|
|
||||||
d.series[series.Key] = series
|
|
||||||
|
|
||||||
m.AddSeries(series)
|
|
||||||
|
|
||||||
// Add the series to the series sketch.
|
|
||||||
d.seriesSketch.Add([]byte(series.Key))
|
|
||||||
d.mu.Unlock()
|
|
||||||
|
|
||||||
return series, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// CreateMeasurementIndexIfNotExists creates or retrieves an in memory index
|
|
||||||
// object for the measurement
|
|
||||||
func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) (*Measurement, error) {
|
|
||||||
name = escape.UnescapeString(name)
|
|
||||||
|
|
||||||
// See if the measurement exists using a read-lock
|
|
||||||
d.mu.RLock()
|
|
||||||
m := d.measurements[name]
|
|
||||||
if m != nil {
|
|
||||||
d.mu.RUnlock()
|
|
||||||
return m, nil
|
|
||||||
}
|
|
||||||
d.mu.RUnlock()
|
|
||||||
|
|
||||||
// Doesn't exist, so lock the index to create it
|
|
||||||
d.mu.Lock()
|
|
||||||
defer d.mu.Unlock()
|
|
||||||
|
|
||||||
// Make sure it was created in between the time we released our read-lock
|
|
||||||
// and acquire the write lock
|
|
||||||
m = d.measurements[name]
|
|
||||||
if m == nil {
|
|
||||||
m = NewMeasurement(name)
|
|
||||||
d.measurements[name] = m
|
|
||||||
|
|
||||||
// Add the measurement to the measurements sketch.
|
|
||||||
d.measurementsSketch.Add([]byte(name))
|
|
||||||
}
|
|
||||||
return m, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// TagsForSeries returns the tag map for the passed in series
|
|
||||||
func (d *DatabaseIndex) TagsForSeries(key string) (models.Tags, error) {
|
|
||||||
d.mu.RLock()
|
|
||||||
defer d.mu.RUnlock()
|
|
||||||
|
|
||||||
ss := d.series[key]
|
|
||||||
if ss == nil {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
return ss.Tags, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// MeasurementsByExpr takes an expression containing only tags and returns a
|
|
||||||
// list of matching *Measurement. The bool return argument returns if the
|
|
||||||
// expression was a measurement expression. It is used to differentiate a list
|
|
||||||
// of no measurements because all measurements were filtered out (when the bool
|
|
||||||
// is true) against when there are no measurements because the expression
|
|
||||||
// wasn't evaluated (when the bool is false).
|
|
||||||
func (d *DatabaseIndex) MeasurementsByExpr(expr influxql.Expr) (Measurements, bool, error) {
|
|
||||||
d.mu.RLock()
|
|
||||||
defer d.mu.RUnlock()
|
|
||||||
return d.measurementsByExpr(expr)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *DatabaseIndex) measurementsByExpr(expr influxql.Expr) (Measurements, bool, error) {
|
|
||||||
if expr == nil {
|
|
||||||
return nil, false, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
switch e := expr.(type) {
|
|
||||||
case *influxql.BinaryExpr:
|
|
||||||
switch e.Op {
|
|
||||||
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
|
|
||||||
tag, ok := e.LHS.(*influxql.VarRef)
|
|
||||||
if !ok {
|
|
||||||
return nil, false, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String())
|
|
||||||
}
|
|
||||||
|
|
||||||
tf := &TagFilter{
|
|
||||||
Op: e.Op,
|
|
||||||
Key: tag.Val,
|
|
||||||
}
|
|
||||||
|
|
||||||
if influxql.IsRegexOp(e.Op) {
|
|
||||||
re, ok := e.RHS.(*influxql.RegexLiteral)
|
|
||||||
if !ok {
|
|
||||||
return nil, false, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String())
|
|
||||||
}
|
|
||||||
tf.Regex = re.Val
|
|
||||||
} else {
|
|
||||||
s, ok := e.RHS.(*influxql.StringLiteral)
|
|
||||||
if !ok {
|
|
||||||
return nil, false, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String())
|
|
||||||
}
|
|
||||||
tf.Value = s.Val
|
|
||||||
}
|
|
||||||
|
|
||||||
// Match on name, if specified.
|
|
||||||
if tag.Val == "_name" {
|
|
||||||
return d.measurementsByNameFilter(tf.Op, tf.Value, tf.Regex), true, nil
|
|
||||||
} else if influxql.IsSystemName(tag.Val) {
|
|
||||||
return nil, false, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return d.measurementsByTagFilters([]*TagFilter{tf}), true, nil
|
|
||||||
case influxql.OR, influxql.AND:
|
|
||||||
lhsIDs, lhsOk, err := d.measurementsByExpr(e.LHS)
|
|
||||||
if err != nil {
|
|
||||||
return nil, false, err
|
|
||||||
}
|
|
||||||
|
|
||||||
rhsIDs, rhsOk, err := d.measurementsByExpr(e.RHS)
|
|
||||||
if err != nil {
|
|
||||||
return nil, false, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if lhsOk && rhsOk {
|
|
||||||
if e.Op == influxql.OR {
|
|
||||||
return lhsIDs.union(rhsIDs), true, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return lhsIDs.intersect(rhsIDs), true, nil
|
|
||||||
} else if lhsOk {
|
|
||||||
return lhsIDs, true, nil
|
|
||||||
} else if rhsOk {
|
|
||||||
return rhsIDs, true, nil
|
|
||||||
}
|
|
||||||
return nil, false, nil
|
|
||||||
default:
|
|
||||||
return nil, false, fmt.Errorf("invalid tag comparison operator")
|
|
||||||
}
|
|
||||||
case *influxql.ParenExpr:
|
|
||||||
return d.measurementsByExpr(e.Expr)
|
|
||||||
}
|
|
||||||
return nil, false, fmt.Errorf("%#v", expr)
|
|
||||||
}
|
|
||||||
|
|
||||||
// measurementsByNameFilter returns the sorted measurements matching a name.
|
|
||||||
func (d *DatabaseIndex) measurementsByNameFilter(op influxql.Token, val string, regex *regexp.Regexp) Measurements {
|
|
||||||
var measurements Measurements
|
|
||||||
for _, m := range d.measurements {
|
|
||||||
var matched bool
|
|
||||||
switch op {
|
|
||||||
case influxql.EQ:
|
|
||||||
matched = m.Name == val
|
|
||||||
case influxql.NEQ:
|
|
||||||
matched = m.Name != val
|
|
||||||
case influxql.EQREGEX:
|
|
||||||
matched = regex.MatchString(m.Name)
|
|
||||||
case influxql.NEQREGEX:
|
|
||||||
matched = !regex.MatchString(m.Name)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !matched {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
measurements = append(measurements, m)
|
|
||||||
}
|
|
||||||
sort.Sort(measurements)
|
|
||||||
return measurements
|
|
||||||
}
|
|
||||||
|
|
||||||
// measurementsByTagFilters returns the sorted measurements matching the filters on tag values.
|
|
||||||
func (d *DatabaseIndex) measurementsByTagFilters(filters []*TagFilter) Measurements {
|
|
||||||
// If no filters, then return all measurements.
|
|
||||||
if len(filters) == 0 {
|
|
||||||
measurements := make(Measurements, 0, len(d.measurements))
|
|
||||||
for _, m := range d.measurements {
|
|
||||||
measurements = append(measurements, m)
|
|
||||||
}
|
|
||||||
return measurements
|
|
||||||
}
|
|
||||||
|
|
||||||
// Build a list of measurements matching the filters.
|
|
||||||
var measurements Measurements
|
|
||||||
var tagMatch bool
|
|
||||||
|
|
||||||
// Iterate through all measurements in the database.
|
|
||||||
for _, m := range d.measurements {
|
|
||||||
// Iterate filters seeing if the measurement has a matching tag.
|
|
||||||
for _, f := range filters {
|
|
||||||
m.mu.RLock()
|
|
||||||
tagVals, ok := m.seriesByTagKeyValue[f.Key]
|
|
||||||
m.mu.RUnlock()
|
|
||||||
if !ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
tagMatch = false
|
|
||||||
|
|
||||||
// If the operator is non-regex, only check the specified value.
|
|
||||||
if f.Op == influxql.EQ || f.Op == influxql.NEQ {
|
|
||||||
if _, ok := tagVals[f.Value]; ok {
|
|
||||||
tagMatch = true
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Else, the operator is a regex and we have to check all tag
|
|
||||||
// values against the regular expression.
|
|
||||||
for tagVal := range tagVals {
|
|
||||||
if f.Regex.MatchString(tagVal) {
|
|
||||||
tagMatch = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
isEQ := (f.Op == influxql.EQ || f.Op == influxql.EQREGEX)
|
|
||||||
|
|
||||||
//
|
|
||||||
// XNOR gate
|
|
||||||
//
|
|
||||||
// tags match | operation is EQ | measurement matches
|
|
||||||
// --------------------------------------------------
|
|
||||||
// True | True | True
|
|
||||||
// True | False | False
|
|
||||||
// False | True | False
|
|
||||||
// False | False | True
|
|
||||||
|
|
||||||
if tagMatch == isEQ {
|
|
||||||
measurements = append(measurements, m)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
sort.Sort(measurements)
|
|
||||||
return measurements
|
|
||||||
}
|
|
||||||
|
|
||||||
// MeasurementNamesByRegex returns the measurements that match the regex.
|
|
||||||
func (d *DatabaseIndex) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
|
|
||||||
d.mu.RLock()
|
|
||||||
defer d.mu.RUnlock()
|
|
||||||
|
|
||||||
var matches [][]byte
|
|
||||||
for _, m := range d.measurements {
|
|
||||||
if re.MatchString(m.Name) {
|
|
||||||
matches = append(matches,[]byte(m.Name))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return matches, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Measurements returns a list of all measurements.
|
|
||||||
func (d *DatabaseIndex) Measurements() (Measurements, error) {
|
|
||||||
d.mu.RLock()
|
|
||||||
measurements := make(Measurements, 0, len(d.measurements))
|
|
||||||
for _, m := range d.measurements {
|
|
||||||
measurements = append(measurements, m)
|
|
||||||
}
|
|
||||||
d.mu.RUnlock()
|
|
||||||
|
|
||||||
return measurements, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// DropMeasurement removes the measurement and all of its underlying
|
|
||||||
// series from the database index
|
|
||||||
func (d *DatabaseIndex) DropMeasurement(name []byte) error {
|
|
||||||
d.mu.Lock()
|
|
||||||
defer d.mu.Unlock()
|
|
||||||
return d.dropMeasurement(string(name))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *DatabaseIndex) dropMeasurement(name string) error {
|
|
||||||
// Update the tombstone sketch.
|
|
||||||
d.measurementsTSSketch.Add([]byte(name))
|
|
||||||
|
|
||||||
m := d.measurements[name]
|
|
||||||
if m == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
delete(d.measurements, name)
|
|
||||||
for _, s := range m.seriesByID {
|
|
||||||
delete(d.series, s.Key)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// DropSeries removes the series keys and their tags from the index
|
|
||||||
func (d *DatabaseIndex) DropSeries(keys []string) error {
|
|
||||||
if len(keys) == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
d.mu.Lock()
|
|
||||||
defer d.mu.Unlock()
|
|
||||||
|
|
||||||
var (
|
|
||||||
mToDelete = map[string]struct{}{}
|
|
||||||
nDeleted int64
|
|
||||||
)
|
|
||||||
|
|
||||||
for _, k := range keys {
|
|
||||||
// Update the tombstone sketch.
|
|
||||||
d.seriesTSSketch.Add([]byte(k))
|
|
||||||
|
|
||||||
series := d.series[k]
|
|
||||||
if series == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
series.measurement.DropSeries(series)
|
|
||||||
delete(d.series, k)
|
|
||||||
nDeleted++
|
|
||||||
|
|
||||||
// If there are no more series in the measurement then we'll
|
|
||||||
// remove it.
|
|
||||||
if len(series.measurement.seriesByID) == 0 {
|
|
||||||
mToDelete[series.measurement.Name] = struct{}{}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for mname := range mToDelete {
|
|
||||||
d.dropMeasurement(mname)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Dereference removes all references to data within b and moves them to the heap.
|
|
||||||
func (d *DatabaseIndex) Dereference(b []byte) {
|
|
||||||
d.mu.RLock()
|
|
||||||
defer d.mu.RUnlock()
|
|
||||||
|
|
||||||
for _, s := range d.series {
|
|
||||||
s.Dereference(b)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
 // Measurement represents a collection of time series in a database. It also
 // contains in memory structures for indexing tags. Exported functions are
@@ -530,6 +58,13 @@ func (m *Measurement) SeriesByID(id uint64) *Series {
 	return m.seriesByID[id]
 }
 
+// SeriesByIDMap returns the internal seriesByID map.
+func (m *Measurement) SeriesByIDMap() map[uint64]*Series {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	return m.seriesByID
+}
+
 // SeriesByIDSlice returns a list of series by identifiers.
 func (m *Measurement) SeriesByIDSlice(ids []uint64) []*Series {
 	m.mu.RLock()
@@ -572,7 +107,17 @@ func (m *Measurement) HasTagKey(k string) bool {
 	return hasTag
 }
 
+<<<<<<< HEAD
 // HasTagKeyValue returns true if at least one series in this measurement has written a value the given tag key and tag value.
+=======
+func (m *Measurement) SeriesByTagValue(key string) map[string]SeriesIDs {
+	m.mu.RLock()
+	tagVals := m.seriesByTagKeyValue[key]
+	m.mu.RUnlock()
+	return tagVals
+}
+
+>>>>>>> ee54c3e... intermediate
 func (m *Measurement) HasTagKeyValue(k, v []byte) bool {
 	m.mu.RLock()
 	if vals, ok := m.seriesByTagKeyValue[string(k)]; ok {
@@ -691,19 +236,215 @@ func (m *Measurement) DropSeries(series *Series) {
 	return
 }
 
-// TagSets returns the unique tag sets that exist for the given tag keys. This
-// is used to determine what composite series will be created by a group by.
-//
-// i.e. "group by region" should return: {"region":"uswest"},
-// {"region":"useast"} or region, service returns {"region": "uswest",
-// "service": "redis"}, {"region": "uswest", "service": "mysql"}, etc...
-//
-// This will also populate the TagSet objects with the series IDs that match
-// each tagset and any influx filter expression that goes with the series TODO:
-// this shouldn't be exported. However, until tx.go and the engine get
-// refactored into tsdb, we need it.
-func (m *Measurement) TagSets(dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) {
-	panic("MOVED")
+// filters walks the where clause of a select statement and returns a map with all series ids
+// matching the where clause and any filter expression that should be applied to each
+func (m *Measurement) filters(condition influxql.Expr) ([]uint64, map[uint64]influxql.Expr, error) {
+	if condition == nil || influxql.OnlyTimeExpr(condition) {
+		return m.seriesIDs, nil, nil
+	}
+	return m.walkWhereForSeriesIds(condition)
+}
+
+// TagSets returns the unique tag sets that exist for the given tag keys. This is used to determine
+// what composite series will be created by a group by. i.e. "group by region" should return:
+// {"region":"uswest"}, {"region":"useast"}
+// or region, service returns
+// {"region": "uswest", "service": "redis"}, {"region": "uswest", "service": "mysql"}, etc...
+// This will also populate the TagSet objects with the series IDs that match each tagset and any
+// influx filter expression that goes with the series
+// TODO: this shouldn't be exported. However, until tx.go and the engine get refactored into tsdb, we need it.
+func (m *Measurement) TagSets(shardID uint64, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) {
+	m.mu.RLock()
+
+	// get the unique set of series ids and the filters that should be applied to each
+	ids, filters, err := m.filters(condition)
+	if err != nil {
+		m.mu.RUnlock()
+		return nil, err
+	}
+
+	// For every series, get the tag values for the requested tag keys i.e. dimensions. This is the
+	// TagSet for that series. Series with the same TagSet are then grouped together, because for the
+	// purpose of GROUP BY they are part of the same composite series.
+	tagSets := make(map[string]*influxql.TagSet, 64)
+	for _, id := range ids {
+		s := m.seriesByID[id]
+		if !s.Assigned(shardID) {
+			continue
+		}
+		tags := make(map[string]string, len(dimensions))
+
+		// Build the TagSet for this series.
+		for _, dim := range dimensions {
+			tags[dim] = s.Tags.GetString(dim)
+		}
+		// Convert the TagSet to a string, so it can be added to a map allowing TagSets to be handled
+		// as a set.
+		tagsAsKey := MarshalTags(tags)
+		tagSet, ok := tagSets[string(tagsAsKey)]
+		if !ok {
+			// This TagSet is new, create a new entry for it.
+			tagSet = &influxql.TagSet{
+				Tags: tags,
+				Key:  tagsAsKey,
+			}
+		}
+		// Associate the series and filter with the Tagset.
+		tagSet.AddFilter(m.seriesByID[id].Key, filters[id])
+
+		// Ensure it's back in the map.
+		tagSets[string(tagsAsKey)] = tagSet
+	}
+	// Release the lock while we sort all the tags
+	m.mu.RUnlock()
+
+	// Sort the series in each tag set.
+	for _, t := range tagSets {
+		sort.Sort(t)
+	}
+
+	// The TagSets have been created, as a map of TagSets. Just send
+	// the values back as a slice, sorting for consistency.
+	sortedTagsSets := make([]*influxql.TagSet, 0, len(tagSets))
+	for _, v := range tagSets {
+		sortedTagsSets = append(sortedTagsSets, v)
+	}
+	sort.Sort(byTagKey(sortedTagsSets))
+
+	return sortedTagsSets, nil
+}
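To make the new shard-aware grouping concrete, here is a minimal sketch of a caller of the rewritten Measurement.TagSets. The measurement, shard ID, and "region" dimension are illustrative and not part of this patch; the important point is that series not assigned to shardID are skipped via Series.Assigned before the GROUP BY tag sets are built.

package main

import (
	"github.com/influxdata/influxdb/influxql"
	"github.com/influxdata/influxdb/tsdb"
)

// regionTagSets is illustrative only: it groups the series of a measurement
// that are assigned to shardID by their "region" tag value, using the
// shard-aware TagSets signature introduced in this diff. A nil condition
// means there is no WHERE clause, so every series ID is a candidate.
func regionTagSets(m *tsdb.Measurement, shardID uint64) ([]*influxql.TagSet, error) {
	return m.TagSets(shardID, []string{"region"}, nil)
}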
|
|
||||||
|
// intersectSeriesFilters performs an intersection for two sets of ids and filter expressions.
|
||||||
|
func intersectSeriesFilters(lids, rids SeriesIDs, lfilters, rfilters FilterExprs) (SeriesIDs, FilterExprs) {
|
||||||
|
// We only want to allocate a slice and map of the smaller size.
|
||||||
|
var ids []uint64
|
||||||
|
if len(lids) > len(rids) {
|
||||||
|
ids = make([]uint64, 0, len(rids))
|
||||||
|
} else {
|
||||||
|
ids = make([]uint64, 0, len(lids))
|
||||||
|
}
|
||||||
|
|
||||||
|
var filters FilterExprs
|
||||||
|
if len(lfilters) > len(rfilters) {
|
||||||
|
filters = make(FilterExprs, len(rfilters))
|
||||||
|
} else {
|
||||||
|
filters = make(FilterExprs, len(lfilters))
|
||||||
|
}
|
||||||
|
|
||||||
|
// They're in sorted order so advance the counter as needed.
|
||||||
|
// This is, don't run comparisons against lower values that we've already passed.
|
||||||
|
for len(lids) > 0 && len(rids) > 0 {
|
||||||
|
lid, rid := lids[0], rids[0]
|
||||||
|
if lid == rid {
|
||||||
|
ids = append(ids, lid)
|
||||||
|
|
||||||
|
var expr influxql.Expr
|
||||||
|
lfilter := lfilters[lid]
|
||||||
|
rfilter := rfilters[rid]
|
||||||
|
|
||||||
|
if lfilter != nil && rfilter != nil {
|
||||||
|
be := &influxql.BinaryExpr{
|
||||||
|
Op: influxql.AND,
|
||||||
|
LHS: lfilter,
|
||||||
|
RHS: rfilter,
|
||||||
|
}
|
||||||
|
expr = influxql.Reduce(be, nil)
|
||||||
|
} else if lfilter != nil {
|
||||||
|
expr = lfilter
|
||||||
|
} else if rfilter != nil {
|
||||||
|
expr = rfilter
|
||||||
|
}
|
||||||
|
|
||||||
|
if expr != nil {
|
||||||
|
filters[lid] = expr
|
||||||
|
}
|
||||||
|
lids, rids = lids[1:], rids[1:]
|
||||||
|
} else if lid < rid {
|
||||||
|
lids = lids[1:]
|
||||||
|
} else {
|
||||||
|
rids = rids[1:]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ids, filters
|
||||||
|
}
|
||||||
|
|
||||||
|
// unionSeriesFilters performs a union for two sets of ids and filter expressions.
|
||||||
|
func unionSeriesFilters(lids, rids SeriesIDs, lfilters, rfilters FilterExprs) (SeriesIDs, FilterExprs) {
|
||||||
|
ids := make([]uint64, 0, len(lids)+len(rids))
|
||||||
|
|
||||||
|
// Setup the filters with the smallest size since we will discard filters
|
||||||
|
// that do not have a match on the other side.
|
||||||
|
var filters FilterExprs
|
||||||
|
if len(lfilters) < len(rfilters) {
|
||||||
|
filters = make(FilterExprs, len(lfilters))
|
||||||
|
} else {
|
||||||
|
filters = make(FilterExprs, len(rfilters))
|
||||||
|
}
|
||||||
|
|
||||||
|
for len(lids) > 0 && len(rids) > 0 {
|
||||||
|
lid, rid := lids[0], rids[0]
|
||||||
|
if lid == rid {
|
||||||
|
ids = append(ids, lid)
|
||||||
|
|
||||||
|
// If one side does not have a filter, then the series has been
|
||||||
|
// included on one side of the OR with no condition. Eliminate the
|
||||||
|
// filter in this case.
|
||||||
|
var expr influxql.Expr
|
||||||
|
lfilter := lfilters[lid]
|
||||||
|
rfilter := rfilters[rid]
|
||||||
|
if lfilter != nil && rfilter != nil {
|
||||||
|
be := &influxql.BinaryExpr{
|
||||||
|
Op: influxql.OR,
|
||||||
|
LHS: lfilter,
|
||||||
|
RHS: rfilter,
|
||||||
|
}
|
||||||
|
expr = influxql.Reduce(be, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
if expr != nil {
|
||||||
|
filters[lid] = expr
|
||||||
|
}
|
||||||
|
lids, rids = lids[1:], rids[1:]
|
||||||
|
} else if lid < rid {
|
||||||
|
ids = append(ids, lid)
|
||||||
|
|
||||||
|
filter := lfilters[lid]
|
||||||
|
if filter != nil {
|
||||||
|
filters[lid] = filter
|
||||||
|
}
|
||||||
|
lids = lids[1:]
|
||||||
|
} else {
|
||||||
|
ids = append(ids, rid)
|
||||||
|
|
||||||
|
filter := rfilters[rid]
|
||||||
|
if filter != nil {
|
||||||
|
filters[rid] = filter
|
||||||
|
}
|
||||||
|
rids = rids[1:]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now append the remainder.
|
||||||
|
if len(lids) > 0 {
|
||||||
|
for i := 0; i < len(lids); i++ {
|
||||||
|
ids = append(ids, lids[i])
|
||||||
|
|
||||||
|
filter := lfilters[lids[i]]
|
||||||
|
if filter != nil {
|
||||||
|
filters[lids[i]] = filter
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if len(rids) > 0 {
|
||||||
|
for i := 0; i < len(rids); i++ {
|
||||||
|
ids = append(ids, rids[i])
|
||||||
|
|
||||||
|
filter := rfilters[rids[i]]
|
||||||
|
if filter != nil {
|
||||||
|
filters[rids[i]] = filter
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ids, filters
|
||||||
}
|
}
|
||||||
|
|
||||||
// IDsForExpr returns the series IDs that are candidates to match the given expression.
|
// IDsForExpr returns the series IDs that are candidates to match the given expression.
|
||||||
|
@ -899,6 +640,71 @@ func (fe FilterExprs) Len() int {
|
||||||
return len(fe)
|
return len(fe)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// walkWhereForSeriesIds recursively walks the WHERE clause and returns an ordered set of series IDs and
|
||||||
|
// a map from those series IDs to filter expressions that should be used to limit points returned in
|
||||||
|
// the final query result.
|
||||||
|
func (m *Measurement) walkWhereForSeriesIds(expr influxql.Expr) (SeriesIDs, FilterExprs, error) {
|
||||||
|
switch n := expr.(type) {
|
||||||
|
case *influxql.BinaryExpr:
|
||||||
|
switch n.Op {
|
||||||
|
case influxql.EQ, influxql.NEQ, influxql.LT, influxql.LTE, influxql.GT, influxql.GTE, influxql.EQREGEX, influxql.NEQREGEX:
|
||||||
|
// Get the series IDs and filter expression for the tag or field comparison.
|
||||||
|
ids, expr, err := m.idsForExpr(n)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(ids) == 0 {
|
||||||
|
return ids, nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the expression is a boolean literal that is true, ignore it.
|
||||||
|
if b, ok := expr.(*influxql.BooleanLiteral); ok && b.Val {
|
||||||
|
expr = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var filters FilterExprs
|
||||||
|
if expr != nil {
|
||||||
|
filters = make(FilterExprs, len(ids))
|
||||||
|
for _, id := range ids {
|
||||||
|
filters[id] = expr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ids, filters, nil
|
||||||
|
case influxql.AND, influxql.OR:
|
||||||
|
// Get the series IDs and filter expressions for the LHS.
|
||||||
|
lids, lfilters, err := m.walkWhereForSeriesIds(n.LHS)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the series IDs and filter expressions for the RHS.
|
||||||
|
rids, rfilters, err := m.walkWhereForSeriesIds(n.RHS)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Combine the series IDs from the LHS and RHS.
|
||||||
|
if n.Op == influxql.AND {
|
||||||
|
ids, filters := intersectSeriesFilters(lids, rids, lfilters, rfilters)
|
||||||
|
return ids, filters, nil
|
||||||
|
} else {
|
||||||
|
ids, filters := unionSeriesFilters(lids, rids, lfilters, rfilters)
|
||||||
|
return ids, filters, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ids, _, err := m.idsForExpr(n)
|
||||||
|
return ids, nil, err
|
||||||
|
case *influxql.ParenExpr:
|
||||||
|
// walk down the tree
|
||||||
|
return m.walkWhereForSeriesIds(n.Expr)
|
||||||
|
default:
|
||||||
|
return nil, nil, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
 // expandExpr returns a list of expressions expanded by all possible tag
 // combinations.
 func (m *Measurement) expandExpr(expr influxql.Expr) []tagSetExpr {
@@ -1207,6 +1013,7 @@ type Series struct {
 	Tags        models.Tags
 	ID          uint64
 	measurement *Measurement
+	shardIDs    []uint64 // shards that have this series defined
 }
 
 // NewSeries returns an initialized series struct
@@ -1217,6 +1024,48 @@ func NewSeries(key []byte, tags models.Tags) *Series {
 	}
 }
 
+func (s *Series) AssignShard(shardID uint64) {
+	s.mu.Lock()
+	if !s.assigned(shardID) {
+		s.shardIDs = append(s.shardIDs, shardID)
+		sort.Sort(uint64Slice(s.shardIDs))
+	}
+	s.mu.Unlock()
+}
+
+func (s *Series) UnassignShard(shardID uint64) {
+	s.mu.Lock()
+	for i, v := range s.shardIDs {
+		if v == shardID {
+			s.shardIDs = append(s.shardIDs[:i], s.shardIDs[i+1:]...)
+			break
+		}
+	}
+	s.mu.Unlock()
+}
+
+func (s *Series) Assigned(shardID uint64) bool {
+	s.mu.RLock()
+	b := s.assigned(shardID)
+	s.mu.RUnlock()
+	return b
+}
+
+func (s *Series) assigned(shardID uint64) bool {
+	i := sort.Search(len(s.shardIDs), func(i int) bool { return s.shardIDs[i] >= shardID })
+	return i < len(s.shardIDs) && s.shardIDs[i] == shardID
+}
+
+// Measurement returns the measurement on the series.
+func (s *Series) Measurement() *Measurement {
+	return s.measurement
+}
+
+// SetMeasurement sets the measurement on the series.
+func (s *Series) SetMeasurement(m *Measurement) {
+	s.measurement = m
+}
+
 // Dereference removes references to a byte slice.
 func (s *Series) Dereference(b []byte) {
 	s.mu.Lock()
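The shardIDs slice is kept sorted so the unexported assigned check can use a binary search. A small illustrative sketch of the lifecycle; the series key is made up and tag parsing is skipped (nil tags) since only shard assignment is being exercised:

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/tsdb"
)

func main() {
	s := tsdb.NewSeries([]byte("cpu,host=server01"), nil)

	s.AssignShard(3)
	s.AssignShard(1) // shardIDs stays sorted: [1 3]

	fmt.Println(s.Assigned(1)) // true
	fmt.Println(s.Assigned(2)) // false

	s.UnassignShard(3)
	fmt.Println(s.Assigned(3)) // false
}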
@@ -1695,3 +1544,15 @@ func MeasurementFromSeriesKey(key string) string {
 	k, _, _ := models.ParseKey([]byte(key))
 	return escape.UnescapeString(k)
 }
+
+type uint64Slice []uint64
+
+func (a uint64Slice) Len() int           { return len(a) }
+func (a uint64Slice) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
+func (a uint64Slice) Less(i, j int) bool { return a[i] < a[j] }
+
+type byTagKey []*influxql.TagSet
+
+func (t byTagKey) Len() int           { return len(t) }
+func (t byTagKey) Less(i, j int) bool { return bytes.Compare(t[i].Key, t[j].Key) < 0 }
+func (t byTagKey) Swap(i, j int)      { t[i], t[j] = t[j], t[i] }
@@ -346,11 +346,10 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) {
 	tmpShard := path.Join(tmpDir, "shard")
 	tmpWal := path.Join(tmpDir, "wal")
 
-	index := tsdb.NewDatabaseIndex("db")
 	opts := tsdb.NewEngineOptions()
 	opts.Config.WALDir = filepath.Join(tmpDir, "wal")
 
-	sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts)
+	sh := tsdb.NewShard(1, tmpShard, tmpWal, opts)
 	if err := sh.Open(); err != nil {
 		t.Fatalf("error opening shard: %s", err.Error())
 	}
@@ -380,7 +379,7 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) {
 	go func() {
 		defer wg.Done()
 		for i := 0; i < 50; i++ {
-			if err := sh.DeleteMeasurement("cpu", []string{"cpu,host=server"}); err != nil {
+			if err := sh.DeleteMeasurement([]byte("cpu")); err != nil {
 				t.Fatalf(err.Error())
 			}
@@ -395,7 +394,7 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) {
 	go func() {
 		defer wg.Done()
 		for i := 0; i < 50; i++ {
-			if err := sh.DeleteMeasurement("cpu", []string{"cpu,host=server"}); err != nil {
+			if err := sh.DeleteMeasurement([]byte("cpu")); err != nil {
 				t.Fatalf(err.Error())
 			}
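Taken together, the test no longer builds a database index up front: the shard owns its index internally, and DeleteMeasurement takes only the measurement name. A compact sketch of the updated setup inside the existing test body; it assumes the test's own tmpDir, tmpShard, tmpWal, and t variables, with the repeated loop and goroutine scaffolding trimmed:

// Sketch of the new-style shard setup and measurement drop after this change.
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")

sh := tsdb.NewShard(1, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
	t.Fatalf("error opening shard: %s", err.Error())
}
if err := sh.DeleteMeasurement([]byte("cpu")); err != nil {
	t.Fatalf(err.Error())
}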