diff --git a/tsdb/config.go b/tsdb/config.go index 4b08e389ff..e25108915c 100644 --- a/tsdb/config.go +++ b/tsdb/config.go @@ -12,6 +12,9 @@ const ( // DefaultEngine is the default engine for new shards DefaultEngine = "tsm1" + // DefaultIndex is the default index for new shards + DefaultIndex = "inmem" + // tsdb/engine/wal configuration options // Default settings for TSM @@ -48,6 +51,7 @@ const ( type Config struct { Dir string `toml:"dir"` Engine string `toml:"-"` + Index string `toml:"-"` // General WAL configuration options WALDir string `toml:"wal-dir"` @@ -80,6 +84,7 @@ type Config struct { func NewConfig() Config { return Config{ Engine: DefaultEngine, + Index: DefaultIndex, QueryLogEnabled: true, diff --git a/tsdb/engine/inmem/inmem.go b/tsdb/engine/inmem/inmem.go new file mode 100644 index 0000000000..b187eb8f72 --- /dev/null +++ b/tsdb/engine/inmem/inmem.go @@ -0,0 +1,484 @@ +package inmem + +import ( + "fmt" + "regexp" + "sort" + "sync" + + "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/pkg/escape" + "github.com/influxdata/influxdb/pkg/estimator" + "github.com/influxdata/influxdb/pkg/estimator/hll" + "github.com/influxdata/influxdb/tsdb" +) + +// Ensure index implements interface. +var _ tsdb.Index = &Index{} + +// Index is the in memory index of a collection of measurements, time +// series, and their tags. Exported functions are goroutine safe while +// un-exported functions assume the caller will use the appropriate locks. +type Index struct { + // In-memory metadata index, built on load and updated when new series come in + mu sync.RWMutex + measurements map[string]*tsdb.Measurement // measurement name to object and index + series map[string]*tsdb.Series // map series key to the Series object + lastID uint64 // last used series ID. They're in memory only for this shard + + seriesSketch, seriesTSSketch *hll.Plus + measurementsSketch, measurementsTSSketch *hll.Plus + + name string // name of the database represented by this index +} + +// NewIndex returns a new initialized Index. +func NewIndex(name string) (index *Index, err error) { + index = &Index{ + measurements: make(map[string]*tsdb.Measurement), + series: make(map[string]*tsdb.Series), + name: name, + } + + if index.seriesSketch, err = hll.NewPlus(16); err != nil { + return nil, err + } else if index.seriesTSSketch, err = hll.NewPlus(16); err != nil { + return nil, err + } else if index.measurementsSketch, err = hll.NewPlus(16); err != nil { + return nil, err + } else if index.measurementsTSSketch, err = hll.NewPlus(16); err != nil { + return nil, err + } + + return index, nil +} + +func (i *Index) Open() (err error) { return nil } +func (i *Index) Close() error { return nil } + +// Series returns a series by key. +func (i *Index) Series(key []byte) (*tsdb.Series, error) { + i.mu.RLock() + s := i.series[string(key)] + i.mu.RUnlock() + return s, nil +} + +// SeriesN returns the exact number of series in the index. +func (i *Index) SeriesN() (uint64, error) { + i.mu.RLock() + defer i.mu.RUnlock() + return uint64(len(i.series)), nil +} + +// CreateSeriesIfNotExists creates a series if it doesn't already exist. +func (i *Index) CreateSeriesIfNotExists(name []byte, tags models.Tags) error { + panic("TODO") +} + +// SeriesSketch returns the sketch for the series. +func (i *Index) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) { + i.mu.RLock() + defer i.mu.RUnlock() + return i.seriesSketch, i.seriesTSSketch, nil +} + +// Measurement returns the measurement object from the index by the name +func (i *Index) Measurement(name []byte) (*tsdb.Measurement, error) { + i.mu.RLock() + defer i.mu.RUnlock() + return i.measurements[string(name)], nil +} + +// MeasurementsSketch returns the sketch for the series. +func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) { + i.mu.RLock() + defer i.mu.RUnlock() + return i.measurementsSketch, i.measurementsTSSketch, nil +} + +// MeasurementsByName returns a list of measurements. +func (i *Index) MeasurementsByName(names [][]byte) ([]*tsdb.Measurement, error) { + i.mu.RLock() + defer i.mu.RUnlock() + + a := make([]*tsdb.Measurement, 0, len(names)) + for _, name := range names { + if m := i.measurements[string(name)]; m != nil { + a = append(a, m) + } + } + return a, nil +} + +// CreateSeriesIndexIfNotExists adds the series for the given measurement to the +// index and sets its ID or returns the existing series object +func (i *Index) CreateSeriesIndexIfNotExists(measurementName string, series *tsdb.Series) (*tsdb.Series, error) { + i.mu.RLock() + // if there is a measurement for this id, it's already been added + ss := i.series[series.Key] + if ss != nil { + i.mu.RUnlock() + return ss, nil + } + i.mu.RUnlock() + + // get or create the measurement index + m, err := i.CreateMeasurementIndexIfNotExists(measurementName) + if err != nil { + return nil, err + } + + i.mu.Lock() + // Check for the series again under a write lock + ss = i.series[series.Key] + if ss != nil { + i.mu.Unlock() + return ss, nil + } + + // set the in memory ID for query processing on this shard + series.ID = i.lastID + 1 + i.lastID++ + + series.SetMeasurement(m) + i.series[series.Key] = series + + m.AddSeries(series) + + // Add the series to the series sketch. + i.seriesSketch.Add([]byte(series.Key)) + i.mu.Unlock() + + return series, nil +} + +// CreateMeasurementIndexIfNotExists creates or retrieves an in memory index +// object for the measurement +func (i *Index) CreateMeasurementIndexIfNotExists(name string) (*tsdb.Measurement, error) { + name = escape.UnescapeString(name) + + // See if the measurement exists using a read-lock + i.mu.RLock() + m := i.measurements[name] + if m != nil { + i.mu.RUnlock() + return m, nil + } + i.mu.RUnlock() + + // Doesn't exist, so lock the index to create it + i.mu.Lock() + defer i.mu.Unlock() + + // Make sure it was created in between the time we released our read-lock + // and acquire the write lock + m = i.measurements[name] + if m == nil { + m = tsdb.NewMeasurement(name) + i.measurements[name] = m + + // Add the measurement to the measurements sketch. + i.measurementsSketch.Add([]byte(name)) + } + return m, nil +} + +// TagsForSeries returns the tag map for the passed in series +func (i *Index) TagsForSeries(key string) (models.Tags, error) { + i.mu.RLock() + defer i.mu.RUnlock() + + ss := i.series[key] + if ss == nil { + return nil, nil + } + return ss.Tags, nil +} + +// MeasurementsByExpr takes an expression containing only tags and returns a +// list of matching *tsdb.Measurement. The bool return argument returns if the +// expression was a measurement expression. It is used to differentiate a list +// of no measurements because all measurements were filtered out (when the bool +// is true) against when there are no measurements because the expression +// wasn't evaluated (when the bool is false). +func (i *Index) MeasurementsByExpr(expr influxql.Expr) (tsdb.Measurements, bool, error) { + i.mu.RLock() + defer i.mu.RUnlock() + return i.measurementsByExpr(expr) +} + +func (i *Index) measurementsByExpr(expr influxql.Expr) (tsdb.Measurements, bool, error) { + if expr == nil { + return nil, false, nil + } + + switch e := expr.(type) { + case *influxql.BinaryExpr: + switch e.Op { + case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: + tag, ok := e.LHS.(*influxql.VarRef) + if !ok { + return nil, false, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String()) + } + + tf := &tsdb.TagFilter{ + Op: e.Op, + Key: tag.Val, + } + + if influxql.IsRegexOp(e.Op) { + re, ok := e.RHS.(*influxql.RegexLiteral) + if !ok { + return nil, false, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String()) + } + tf.Regex = re.Val + } else { + s, ok := e.RHS.(*influxql.StringLiteral) + if !ok { + return nil, false, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String()) + } + tf.Value = s.Val + } + + // Match on name, if specified. + if tag.Val == "_name" { + return i.measurementsByNameFilter(tf.Op, tf.Value, tf.Regex), true, nil + } else if influxql.IsSystemName(tag.Val) { + return nil, false, nil + } + + return i.measurementsByTagFilters([]*tsdb.TagFilter{tf}), true, nil + case influxql.OR, influxql.AND: + lhsIDs, lhsOk, err := i.measurementsByExpr(e.LHS) + if err != nil { + return nil, false, err + } + + rhsIDs, rhsOk, err := i.measurementsByExpr(e.RHS) + if err != nil { + return nil, false, err + } + + if lhsOk && rhsOk { + if e.Op == influxql.OR { + return lhsIDs.Union(rhsIDs), true, nil + } + + return lhsIDs.Intersect(rhsIDs), true, nil + } else if lhsOk { + return lhsIDs, true, nil + } else if rhsOk { + return rhsIDs, true, nil + } + return nil, false, nil + default: + return nil, false, fmt.Errorf("invalid tag comparison operator") + } + case *influxql.ParenExpr: + return i.measurementsByExpr(e.Expr) + } + return nil, false, fmt.Errorf("%#v", expr) +} + +// measurementsByNameFilter returns the sorted measurements matching a name. +func (i *Index) measurementsByNameFilter(op influxql.Token, val string, regex *regexp.Regexp) tsdb.Measurements { + var measurements tsdb.Measurements + for _, m := range i.measurements { + var matched bool + switch op { + case influxql.EQ: + matched = m.Name == val + case influxql.NEQ: + matched = m.Name != val + case influxql.EQREGEX: + matched = regex.MatchString(m.Name) + case influxql.NEQREGEX: + matched = !regex.MatchString(m.Name) + } + + if !matched { + continue + } + measurements = append(measurements, m) + } + sort.Sort(measurements) + return measurements +} + +// measurementsByTagFilters returns the sorted measurements matching the filters on tag values. +func (i *Index) measurementsByTagFilters(filters []*tsdb.TagFilter) tsdb.Measurements { + // If no filters, then return all measurements. + if len(filters) == 0 { + measurements := make(tsdb.Measurements, 0, len(i.measurements)) + for _, m := range i.measurements { + measurements = append(measurements, m) + } + return measurements + } + + // Build a list of measurements matching the filters. + var measurements tsdb.Measurements + var tagMatch bool + + // Iterate through all measurements in the database. + for _, m := range i.measurements { + // Iterate filters seeing if the measurement has a matching tag. + for _, f := range filters { + tagVals := m.SeriesByTagKeyValue(f.Key) + if tagVals == nil { + continue + } + + tagMatch = false + + // If the operator is non-regex, only check the specified value. + if f.Op == influxql.EQ || f.Op == influxql.NEQ { + if _, ok := tagVals[f.Value]; ok { + tagMatch = true + } + } else { + // Else, the operator is a regex and we have to check all tag + // values against the regular expression. + for tagVal := range tagVals { + if f.Regex.MatchString(tagVal) { + tagMatch = true + break + } + } + } + + isEQ := (f.Op == influxql.EQ || f.Op == influxql.EQREGEX) + + // + // XNOR gate + // + // tags match | operation is EQ | measurement matches + // -------------------------------------------------- + // True | True | True + // True | False | False + // False | True | False + // False | False | True + + if tagMatch == isEQ { + measurements = append(measurements, m) + break + } + } + } + + sort.Sort(measurements) + return measurements +} + +// MeasurementNamesByRegex returns the measurements that match the regex. +func (i *Index) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) { + i.mu.RLock() + defer i.mu.RUnlock() + + var matches [][]byte + for _, m := range i.measurements { + if re.MatchString(m.Name) { + matches = append(matches, []byte(m.Name)) + } + } + return matches, nil +} + +// Measurements returns a list of all measurements. +func (i *Index) Measurements() (tsdb.Measurements, error) { + i.mu.RLock() + measurements := make(tsdb.Measurements, 0, len(i.measurements)) + for _, m := range i.measurements { + measurements = append(measurements, m) + } + i.mu.RUnlock() + + return measurements, nil +} + +// DropMeasurement removes the measurement and all of its underlying +// series from the database index +func (i *Index) DropMeasurement(name []byte) error { + i.mu.Lock() + defer i.mu.Unlock() + return i.dropMeasurement(string(name)) +} + +func (i *Index) dropMeasurement(name string) error { + // Update the tombstone sketch. + i.measurementsTSSketch.Add([]byte(name)) + + m := i.measurements[name] + if m == nil { + return nil + } + + delete(i.measurements, name) + for _, s := range m.SeriesByIDMap() { + delete(i.series, s.Key) + } + return nil +} + +// DropSeries removes the series keys and their tags from the index +func (i *Index) DropSeries(keys [][]byte) error { + if len(keys) == 0 { + return nil + } + + i.mu.Lock() + defer i.mu.Unlock() + + var ( + mToDelete = map[string]struct{}{} + nDeleted int64 + ) + + for _, k := range keys { + // Update the tombstone sketch. + i.seriesTSSketch.Add(k) + + series := i.series[string(k)] + if series == nil { + continue + } + series.Measurement().DropSeries(series) + delete(i.series, string(k)) + nDeleted++ + + // If there are no more series in the measurement then we'll + // remove it. + if len(series.Measurement().SeriesByIDMap()) == 0 { + mToDelete[series.Measurement().Name] = struct{}{} + } + } + + for mname := range mToDelete { + i.dropMeasurement(mname) + } + return nil +} + +// Dereference removes all references to data within b and moves them to the heap. +func (i *Index) Dereference(b []byte) { + i.mu.RLock() + defer i.mu.RUnlock() + + for _, s := range i.series { + s.Dereference(b) + } +} + +// TagSets returns a list of tag sets. +func (i *Index) TagSets(shardID uint64, name []byte, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) { + i.mu.RLock() + defer i.mu.RUnlock() + + mm := i.measurements[string(name)] + if mm == nil { + return nil, nil + } + return mm.TagSets(shardID, dimensions, condition) +} diff --git a/tsdb/engine/tsi1/index.go b/tsdb/engine/tsi1/index.go index d6dbddb273..7ee04cf1fc 100644 --- a/tsdb/engine/tsi1/index.go +++ b/tsdb/engine/tsi1/index.go @@ -503,7 +503,7 @@ func (i *Index) MatchTagValueSeriesIterator(name, key []byte, value *regexp.Rege // TagSets returns an ordered list of tag sets for a measurement by dimension // and filtered by an optional conditional expression. -func (i *Index) TagSets(name []byte, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) { +func (i *Index) TagSets(shardID uint64, name []byte, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) { itr, err := i.MeasurementSeriesByExprIterator(name, condition) if err != nil { return nil, err diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index e3dd21298d..6c2cd27ecf 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -1341,7 +1341,7 @@ func (e *Engine) createVarRefIterator(opt influxql.IteratorOptions, aggregate bo if err := func() error { for _, name := range influxql.Sources(opt.Sources).Names() { // Generate tag sets from index. - tagSets, err := e.index.TagSets([]byte(name), opt.Dimensions, opt.Condition) + tagSets, err := e.index.TagSets(e.id, []byte(name), opt.Dimensions, opt.Condition) if err != nil { return err } diff --git a/tsdb/index.go b/tsdb/index.go index 55e2533668..8920d86501 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -28,5 +28,5 @@ type Index interface { Dereference(b []byte) - TagSets(name []byte, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) + TagSets(shardID uint64, name []byte, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) } diff --git a/tsdb/meta.go b/tsdb/meta.go index 4e2c5872db..a8e25b8203 100644 --- a/tsdb/meta.go +++ b/tsdb/meta.go @@ -1,6 +1,7 @@ package tsdb import ( + "bytes" "fmt" "regexp" "sort" @@ -17,479 +18,6 @@ import ( //go:generate protoc --gogo_out=. internal/meta.proto -/* -// DatabaseIndex is the in memory index of a collection of measurements, time series, and their tags. -// Exported functions are goroutine safe while un-exported functions assume the caller will use the appropriate locks. -type DatabaseIndex struct { - // in memory metadata index, built on load and updated when new series come in - mu sync.RWMutex - measurements map[string]*Measurement // measurement name to object and index - series map[string]*Series // map series key to the Series object - lastID uint64 // last used series ID. They're in memory only for this shard - - seriesSketch, seriesTSSketch *hll.Plus - measurementsSketch, measurementsTSSketch *hll.Plus - - name string // name of the database represented by this index -} - -// NewDatabaseIndex returns a new initialized DatabaseIndex. -func NewDatabaseIndex(name string) (index *DatabaseIndex, err error) { - index = &DatabaseIndex{ - measurements: make(map[string]*Measurement), - series: make(map[string]*Series), - name: name, - } - - if index.seriesSketch, err = hll.NewPlus(16); err != nil { - return nil, err - } else if index.seriesTSSketch, err = hll.NewPlus(16); err != nil { - return nil, err - } else if index.measurementsSketch, err = hll.NewPlus(16); err != nil { - return nil, err - } else if index.measurementsTSSketch, err = hll.NewPlus(16); err != nil { - return nil, err - } - - return index, nil -} - -func (d *DatabaseIndex) Open() (err error) { return nil } -func (d *DatabaseIndex) Close() error { return nil } - -// Series returns a series by key. -func (d *DatabaseIndex) Series(key []byte) (*Series, error) { - d.mu.RLock() - s := d.series[string(key)] - d.mu.RUnlock() - return s, nil -} - -// SeriesN returns the exact number of series in the index. -func (d *DatabaseIndex) SeriesN() (uint64, error) { - d.mu.RLock() - defer d.mu.RUnlock() - return uint64(len(d.series)), nil -} - -// SeriesSketch returns the sketch for the series. -func (d *DatabaseIndex) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) { - d.mu.RLock() - defer d.mu.RUnlock() - return d.seriesSketch, d.seriesTSSketch, nil -} - -// Measurement returns the measurement object from the index by the name -func (d *DatabaseIndex) Measurement(name []byte) (*Measurement, error) { - d.mu.RLock() - defer d.mu.RUnlock() - return d.measurements[string(name)], nil -} - -// MeasurementsSketch returns the sketch for the series. -func (d *DatabaseIndex) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) { - d.mu.RLock() - defer d.mu.RUnlock() - return d.measurementsSketch, d.measurementsTSSketch, nil -} - -// MeasurementsByName returns a list of measurements. -func (d *DatabaseIndex) MeasurementsByName(names []string) ([]*Measurement, error) { - d.mu.RLock() - defer d.mu.RUnlock() - - a := make([]*Measurement, 0, len(names)) - for _, name := range names { - if m := d.measurements[name]; m != nil { - a = append(a, m) - } - } - return a, nil -} - -// MeasurementSeriesCounts returns the number of measurements and series currently indexed by the database. -// Useful for reporting and monitoring. -func (d *DatabaseIndex) MeasurementSeriesCounts() (nMeasurements int, nSeries int) { - d.mu.RLock() - defer d.mu.RUnlock() - nMeasurements, nSeries = len(d.measurements), len(d.series) - return -} - -// SeriesShardN returns the series count for a shard. -func (d *DatabaseIndex) SeriesShardN(shardID uint64) int { - var n int - d.mu.RLock() - for _, s := range d.series { - if s.Assigned(shardID) { - n++ - } - } - d.mu.RUnlock() - return n -} - -// CreateSeriesIndexIfNotExists adds the series for the given measurement to the -// index and sets its ID or returns the existing series object -func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, series *Series) (*Series, error) { - d.mu.RLock() - // if there is a measurement for this id, it's already been added - ss := d.series[series.Key] - if ss != nil { - d.mu.RUnlock() - return ss, nil - } - d.mu.RUnlock() - - // get or create the measurement index - m, err := d.CreateMeasurementIndexIfNotExists(measurementName) - if err != nil { - return nil, err - } - - d.mu.Lock() - // Check for the series again under a write lock - ss = d.series[series.Key] - if ss != nil { - d.mu.Unlock() - return ss, nil - } - - // set the in memory ID for query processing on this shard - series.ID = d.lastID + 1 - d.lastID++ - - series.measurement = m - d.series[series.Key] = series - - m.AddSeries(series) - - // Add the series to the series sketch. - d.seriesSketch.Add([]byte(series.Key)) - d.mu.Unlock() - - return series, nil -} - -// CreateMeasurementIndexIfNotExists creates or retrieves an in memory index -// object for the measurement -func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) (*Measurement, error) { - name = escape.UnescapeString(name) - - // See if the measurement exists using a read-lock - d.mu.RLock() - m := d.measurements[name] - if m != nil { - d.mu.RUnlock() - return m, nil - } - d.mu.RUnlock() - - // Doesn't exist, so lock the index to create it - d.mu.Lock() - defer d.mu.Unlock() - - // Make sure it was created in between the time we released our read-lock - // and acquire the write lock - m = d.measurements[name] - if m == nil { - m = NewMeasurement(name) - d.measurements[name] = m - - // Add the measurement to the measurements sketch. - d.measurementsSketch.Add([]byte(name)) - } - return m, nil -} - -// TagsForSeries returns the tag map for the passed in series -func (d *DatabaseIndex) TagsForSeries(key string) (models.Tags, error) { - d.mu.RLock() - defer d.mu.RUnlock() - - ss := d.series[key] - if ss == nil { - return nil, nil - } - return ss.Tags, nil -} - -// MeasurementsByExpr takes an expression containing only tags and returns a -// list of matching *Measurement. The bool return argument returns if the -// expression was a measurement expression. It is used to differentiate a list -// of no measurements because all measurements were filtered out (when the bool -// is true) against when there are no measurements because the expression -// wasn't evaluated (when the bool is false). -func (d *DatabaseIndex) MeasurementsByExpr(expr influxql.Expr) (Measurements, bool, error) { - d.mu.RLock() - defer d.mu.RUnlock() - return d.measurementsByExpr(expr) -} - -func (d *DatabaseIndex) measurementsByExpr(expr influxql.Expr) (Measurements, bool, error) { - if expr == nil { - return nil, false, nil - } - - switch e := expr.(type) { - case *influxql.BinaryExpr: - switch e.Op { - case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: - tag, ok := e.LHS.(*influxql.VarRef) - if !ok { - return nil, false, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String()) - } - - tf := &TagFilter{ - Op: e.Op, - Key: tag.Val, - } - - if influxql.IsRegexOp(e.Op) { - re, ok := e.RHS.(*influxql.RegexLiteral) - if !ok { - return nil, false, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String()) - } - tf.Regex = re.Val - } else { - s, ok := e.RHS.(*influxql.StringLiteral) - if !ok { - return nil, false, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String()) - } - tf.Value = s.Val - } - - // Match on name, if specified. - if tag.Val == "_name" { - return d.measurementsByNameFilter(tf.Op, tf.Value, tf.Regex), true, nil - } else if influxql.IsSystemName(tag.Val) { - return nil, false, nil - } - - return d.measurementsByTagFilters([]*TagFilter{tf}), true, nil - case influxql.OR, influxql.AND: - lhsIDs, lhsOk, err := d.measurementsByExpr(e.LHS) - if err != nil { - return nil, false, err - } - - rhsIDs, rhsOk, err := d.measurementsByExpr(e.RHS) - if err != nil { - return nil, false, err - } - - if lhsOk && rhsOk { - if e.Op == influxql.OR { - return lhsIDs.union(rhsIDs), true, nil - } - - return lhsIDs.intersect(rhsIDs), true, nil - } else if lhsOk { - return lhsIDs, true, nil - } else if rhsOk { - return rhsIDs, true, nil - } - return nil, false, nil - default: - return nil, false, fmt.Errorf("invalid tag comparison operator") - } - case *influxql.ParenExpr: - return d.measurementsByExpr(e.Expr) - } - return nil, false, fmt.Errorf("%#v", expr) -} - -// measurementsByNameFilter returns the sorted measurements matching a name. -func (d *DatabaseIndex) measurementsByNameFilter(op influxql.Token, val string, regex *regexp.Regexp) Measurements { - var measurements Measurements - for _, m := range d.measurements { - var matched bool - switch op { - case influxql.EQ: - matched = m.Name == val - case influxql.NEQ: - matched = m.Name != val - case influxql.EQREGEX: - matched = regex.MatchString(m.Name) - case influxql.NEQREGEX: - matched = !regex.MatchString(m.Name) - } - - if !matched { - continue - } - measurements = append(measurements, m) - } - sort.Sort(measurements) - return measurements -} - -// measurementsByTagFilters returns the sorted measurements matching the filters on tag values. -func (d *DatabaseIndex) measurementsByTagFilters(filters []*TagFilter) Measurements { - // If no filters, then return all measurements. - if len(filters) == 0 { - measurements := make(Measurements, 0, len(d.measurements)) - for _, m := range d.measurements { - measurements = append(measurements, m) - } - return measurements - } - - // Build a list of measurements matching the filters. - var measurements Measurements - var tagMatch bool - - // Iterate through all measurements in the database. - for _, m := range d.measurements { - // Iterate filters seeing if the measurement has a matching tag. - for _, f := range filters { - m.mu.RLock() - tagVals, ok := m.seriesByTagKeyValue[f.Key] - m.mu.RUnlock() - if !ok { - continue - } - - tagMatch = false - - // If the operator is non-regex, only check the specified value. - if f.Op == influxql.EQ || f.Op == influxql.NEQ { - if _, ok := tagVals[f.Value]; ok { - tagMatch = true - } - } else { - // Else, the operator is a regex and we have to check all tag - // values against the regular expression. - for tagVal := range tagVals { - if f.Regex.MatchString(tagVal) { - tagMatch = true - break - } - } - } - - isEQ := (f.Op == influxql.EQ || f.Op == influxql.EQREGEX) - - // - // XNOR gate - // - // tags match | operation is EQ | measurement matches - // -------------------------------------------------- - // True | True | True - // True | False | False - // False | True | False - // False | False | True - - if tagMatch == isEQ { - measurements = append(measurements, m) - break - } - } - } - - sort.Sort(measurements) - return measurements -} - -// MeasurementNamesByRegex returns the measurements that match the regex. -func (d *DatabaseIndex) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) { - d.mu.RLock() - defer d.mu.RUnlock() - - var matches [][]byte - for _, m := range d.measurements { - if re.MatchString(m.Name) { - matches = append(matches,[]byte(m.Name)) - } - } - return matches, nil -} - -// Measurements returns a list of all measurements. -func (d *DatabaseIndex) Measurements() (Measurements, error) { - d.mu.RLock() - measurements := make(Measurements, 0, len(d.measurements)) - for _, m := range d.measurements { - measurements = append(measurements, m) - } - d.mu.RUnlock() - - return measurements, nil -} - -// DropMeasurement removes the measurement and all of its underlying -// series from the database index -func (d *DatabaseIndex) DropMeasurement(name []byte) error { - d.mu.Lock() - defer d.mu.Unlock() - return d.dropMeasurement(string(name)) -} - -func (d *DatabaseIndex) dropMeasurement(name string) error { - // Update the tombstone sketch. - d.measurementsTSSketch.Add([]byte(name)) - - m := d.measurements[name] - if m == nil { - return nil - } - - delete(d.measurements, name) - for _, s := range m.seriesByID { - delete(d.series, s.Key) - } - return nil -} - -// DropSeries removes the series keys and their tags from the index -func (d *DatabaseIndex) DropSeries(keys []string) error { - if len(keys) == 0 { - return nil - } - - d.mu.Lock() - defer d.mu.Unlock() - - var ( - mToDelete = map[string]struct{}{} - nDeleted int64 - ) - - for _, k := range keys { - // Update the tombstone sketch. - d.seriesTSSketch.Add([]byte(k)) - - series := d.series[k] - if series == nil { - continue - } - series.measurement.DropSeries(series) - delete(d.series, k) - nDeleted++ - - // If there are no more series in the measurement then we'll - // remove it. - if len(series.measurement.seriesByID) == 0 { - mToDelete[series.measurement.Name] = struct{}{} - } - } - - for mname := range mToDelete { - d.dropMeasurement(mname) - } - return nil -} - -// Dereference removes all references to data within b and moves them to the heap. -func (d *DatabaseIndex) Dereference(b []byte) { - d.mu.RLock() - defer d.mu.RUnlock() - - for _, s := range d.series { - s.Dereference(b) - } -} -*/ // Measurement represents a collection of time series in a database. It also // contains in memory structures for indexing tags. Exported functions are @@ -530,6 +58,13 @@ func (m *Measurement) SeriesByID(id uint64) *Series { return m.seriesByID[id] } +// SeriesByIDMap returns the internal seriesByID map. +func (m *Measurement) SeriesByIDMap() map[uint64]*Series { + m.mu.RLock() + defer m.mu.RUnlock() + return m.seriesByID +} + // SeriesByIDSlice returns a list of series by identifiers. func (m *Measurement) SeriesByIDSlice(ids []uint64) []*Series { m.mu.RLock() @@ -572,7 +107,17 @@ func (m *Measurement) HasTagKey(k string) bool { return hasTag } +<<<<<<< HEAD // HasTagKeyValue returns true if at least one series in this measurement has written a value the given tag key and tag value. +======= +func (m *Measurement) SeriesByTagValue(key string) map[string]SeriesIDs { + m.mu.RLock() + tagVals := m.seriesByTagKeyValue[key] + m.mu.RUnlock() + return tagVals +} + +>>>>>>> ee54c3e... intermediate func (m *Measurement) HasTagKeyValue(k, v []byte) bool { m.mu.RLock() if vals, ok := m.seriesByTagKeyValue[string(k)]; ok { @@ -691,19 +236,215 @@ func (m *Measurement) DropSeries(series *Series) { return } -// TagSets returns the unique tag sets that exist for the given tag keys. This -// is used to determine what composite series will be created by a group by. -// -// i.e. "group by region" should return: {"region":"uswest"}, -// {"region":"useast"} or region, service returns {"region": "uswest", -// "service": "redis"}, {"region": "uswest", "service": "mysql"}, etc... -// -// This will also populate the TagSet objects with the series IDs that match -// each tagset and any influx filter expression that goes with the series TODO: -// this shouldn't be exported. However, until tx.go and the engine get -// refactored into tsdb, we need it. -func (m *Measurement) TagSets(dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) { - panic("MOVED") +// filters walks the where clause of a select statement and returns a map with all series ids +// matching the where clause and any filter expression that should be applied to each +func (m *Measurement) filters(condition influxql.Expr) ([]uint64, map[uint64]influxql.Expr, error) { + if condition == nil || influxql.OnlyTimeExpr(condition) { + return m.seriesIDs, nil, nil + } + return m.walkWhereForSeriesIds(condition) +} + +// TagSets returns the unique tag sets that exist for the given tag keys. This is used to determine +// what composite series will be created by a group by. i.e. "group by region" should return: +// {"region":"uswest"}, {"region":"useast"} +// or region, service returns +// {"region": "uswest", "service": "redis"}, {"region": "uswest", "service": "mysql"}, etc... +// This will also populate the TagSet objects with the series IDs that match each tagset and any +// influx filter expression that goes with the series +// TODO: this shouldn't be exported. However, until tx.go and the engine get refactored into tsdb, we need it. +func (m *Measurement) TagSets(shardID uint64, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) { + m.mu.RLock() + + // get the unique set of series ids and the filters that should be applied to each + ids, filters, err := m.filters(condition) + if err != nil { + m.mu.RUnlock() + return nil, err + } + + // For every series, get the tag values for the requested tag keys i.e. dimensions. This is the + // TagSet for that series. Series with the same TagSet are then grouped together, because for the + // purpose of GROUP BY they are part of the same composite series. + tagSets := make(map[string]*influxql.TagSet, 64) + for _, id := range ids { + s := m.seriesByID[id] + if !s.Assigned(shardID) { + continue + } + tags := make(map[string]string, len(dimensions)) + + // Build the TagSet for this series. + for _, dim := range dimensions { + tags[dim] = s.Tags.GetString(dim) + } + // Convert the TagSet to a string, so it can be added to a map allowing TagSets to be handled + // as a set. + tagsAsKey := MarshalTags(tags) + tagSet, ok := tagSets[string(tagsAsKey)] + if !ok { + // This TagSet is new, create a new entry for it. + tagSet = &influxql.TagSet{ + Tags: tags, + Key: tagsAsKey, + } + } + // Associate the series and filter with the Tagset. + tagSet.AddFilter(m.seriesByID[id].Key, filters[id]) + + // Ensure it's back in the map. + tagSets[string(tagsAsKey)] = tagSet + } + // Release the lock while we sort all the tags + m.mu.RUnlock() + + // Sort the series in each tag set. + for _, t := range tagSets { + sort.Sort(t) + } + + // The TagSets have been created, as a map of TagSets. Just send + // the values back as a slice, sorting for consistency. + sortedTagsSets := make([]*influxql.TagSet, 0, len(tagSets)) + for _, v := range tagSets { + sortedTagsSets = append(sortedTagsSets, v) + } + sort.Sort(byTagKey(sortedTagsSets)) + + return sortedTagsSets, nil +} + +// intersectSeriesFilters performs an intersection for two sets of ids and filter expressions. +func intersectSeriesFilters(lids, rids SeriesIDs, lfilters, rfilters FilterExprs) (SeriesIDs, FilterExprs) { + // We only want to allocate a slice and map of the smaller size. + var ids []uint64 + if len(lids) > len(rids) { + ids = make([]uint64, 0, len(rids)) + } else { + ids = make([]uint64, 0, len(lids)) + } + + var filters FilterExprs + if len(lfilters) > len(rfilters) { + filters = make(FilterExprs, len(rfilters)) + } else { + filters = make(FilterExprs, len(lfilters)) + } + + // They're in sorted order so advance the counter as needed. + // This is, don't run comparisons against lower values that we've already passed. + for len(lids) > 0 && len(rids) > 0 { + lid, rid := lids[0], rids[0] + if lid == rid { + ids = append(ids, lid) + + var expr influxql.Expr + lfilter := lfilters[lid] + rfilter := rfilters[rid] + + if lfilter != nil && rfilter != nil { + be := &influxql.BinaryExpr{ + Op: influxql.AND, + LHS: lfilter, + RHS: rfilter, + } + expr = influxql.Reduce(be, nil) + } else if lfilter != nil { + expr = lfilter + } else if rfilter != nil { + expr = rfilter + } + + if expr != nil { + filters[lid] = expr + } + lids, rids = lids[1:], rids[1:] + } else if lid < rid { + lids = lids[1:] + } else { + rids = rids[1:] + } + } + return ids, filters +} + +// unionSeriesFilters performs a union for two sets of ids and filter expressions. +func unionSeriesFilters(lids, rids SeriesIDs, lfilters, rfilters FilterExprs) (SeriesIDs, FilterExprs) { + ids := make([]uint64, 0, len(lids)+len(rids)) + + // Setup the filters with the smallest size since we will discard filters + // that do not have a match on the other side. + var filters FilterExprs + if len(lfilters) < len(rfilters) { + filters = make(FilterExprs, len(lfilters)) + } else { + filters = make(FilterExprs, len(rfilters)) + } + + for len(lids) > 0 && len(rids) > 0 { + lid, rid := lids[0], rids[0] + if lid == rid { + ids = append(ids, lid) + + // If one side does not have a filter, then the series has been + // included on one side of the OR with no condition. Eliminate the + // filter in this case. + var expr influxql.Expr + lfilter := lfilters[lid] + rfilter := rfilters[rid] + if lfilter != nil && rfilter != nil { + be := &influxql.BinaryExpr{ + Op: influxql.OR, + LHS: lfilter, + RHS: rfilter, + } + expr = influxql.Reduce(be, nil) + } + + if expr != nil { + filters[lid] = expr + } + lids, rids = lids[1:], rids[1:] + } else if lid < rid { + ids = append(ids, lid) + + filter := lfilters[lid] + if filter != nil { + filters[lid] = filter + } + lids = lids[1:] + } else { + ids = append(ids, rid) + + filter := rfilters[rid] + if filter != nil { + filters[rid] = filter + } + rids = rids[1:] + } + } + + // Now append the remainder. + if len(lids) > 0 { + for i := 0; i < len(lids); i++ { + ids = append(ids, lids[i]) + + filter := lfilters[lids[i]] + if filter != nil { + filters[lids[i]] = filter + } + } + } else if len(rids) > 0 { + for i := 0; i < len(rids); i++ { + ids = append(ids, rids[i]) + + filter := rfilters[rids[i]] + if filter != nil { + filters[rids[i]] = filter + } + } + } + return ids, filters } // IDsForExpr returns the series IDs that are candidates to match the given expression. @@ -899,6 +640,71 @@ func (fe FilterExprs) Len() int { return len(fe) } +// walkWhereForSeriesIds recursively walks the WHERE clause and returns an ordered set of series IDs and +// a map from those series IDs to filter expressions that should be used to limit points returned in +// the final query result. +func (m *Measurement) walkWhereForSeriesIds(expr influxql.Expr) (SeriesIDs, FilterExprs, error) { + switch n := expr.(type) { + case *influxql.BinaryExpr: + switch n.Op { + case influxql.EQ, influxql.NEQ, influxql.LT, influxql.LTE, influxql.GT, influxql.GTE, influxql.EQREGEX, influxql.NEQREGEX: + // Get the series IDs and filter expression for the tag or field comparison. + ids, expr, err := m.idsForExpr(n) + if err != nil { + return nil, nil, err + } + + if len(ids) == 0 { + return ids, nil, nil + } + + // If the expression is a boolean literal that is true, ignore it. + if b, ok := expr.(*influxql.BooleanLiteral); ok && b.Val { + expr = nil + } + + var filters FilterExprs + if expr != nil { + filters = make(FilterExprs, len(ids)) + for _, id := range ids { + filters[id] = expr + } + } + + return ids, filters, nil + case influxql.AND, influxql.OR: + // Get the series IDs and filter expressions for the LHS. + lids, lfilters, err := m.walkWhereForSeriesIds(n.LHS) + if err != nil { + return nil, nil, err + } + + // Get the series IDs and filter expressions for the RHS. + rids, rfilters, err := m.walkWhereForSeriesIds(n.RHS) + if err != nil { + return nil, nil, err + } + + // Combine the series IDs from the LHS and RHS. + if n.Op == influxql.AND { + ids, filters := intersectSeriesFilters(lids, rids, lfilters, rfilters) + return ids, filters, nil + } else { + ids, filters := unionSeriesFilters(lids, rids, lfilters, rfilters) + return ids, filters, nil + } + } + + ids, _, err := m.idsForExpr(n) + return ids, nil, err + case *influxql.ParenExpr: + // walk down the tree + return m.walkWhereForSeriesIds(n.Expr) + default: + return nil, nil, nil + } +} + // expandExpr returns a list of expressions expanded by all possible tag // combinations. func (m *Measurement) expandExpr(expr influxql.Expr) []tagSetExpr { @@ -1207,6 +1013,7 @@ type Series struct { Tags models.Tags ID uint64 measurement *Measurement + shardIDs []uint64 // shards that have this series defined } // NewSeries returns an initialized series struct @@ -1217,6 +1024,48 @@ func NewSeries(key []byte, tags models.Tags) *Series { } } +func (s *Series) AssignShard(shardID uint64) { + s.mu.Lock() + if !s.assigned(shardID) { + s.shardIDs = append(s.shardIDs, shardID) + sort.Sort(uint64Slice(s.shardIDs)) + } + s.mu.Unlock() +} + +func (s *Series) UnassignShard(shardID uint64) { + s.mu.Lock() + for i, v := range s.shardIDs { + if v == shardID { + s.shardIDs = append(s.shardIDs[:i], s.shardIDs[i+1:]...) + break + } + } + s.mu.Unlock() +} + +func (s *Series) Assigned(shardID uint64) bool { + s.mu.RLock() + b := s.assigned(shardID) + s.mu.RUnlock() + return b +} + +func (s *Series) assigned(shardID uint64) bool { + i := sort.Search(len(s.shardIDs), func(i int) bool { return s.shardIDs[i] >= shardID }) + return i < len(s.shardIDs) && s.shardIDs[i] == shardID +} + +// Measurement returns the measurement on the series. +func (s *Series) Measurement() *Measurement { + return s.measurement +} + +// SetMeasurement sets the measurement on the series. +func (s *Series) SetMeasurement(m *Measurement) { + s.measurement = m +} + // Dereference removes references to a byte slice. func (s *Series) Dereference(b []byte) { s.mu.Lock() @@ -1695,3 +1544,15 @@ func MeasurementFromSeriesKey(key string) string { k, _, _ := models.ParseKey([]byte(key)) return escape.UnescapeString(k) } + +type uint64Slice []uint64 + +func (a uint64Slice) Len() int { return len(a) } +func (a uint64Slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a uint64Slice) Less(i, j int) bool { return a[i] < a[j] } + +type byTagKey []*influxql.TagSet + +func (t byTagKey) Len() int { return len(t) } +func (t byTagKey) Less(i, j int) bool { return bytes.Compare(t[i].Key, t[j].Key) < 0 } +func (t byTagKey) Swap(i, j int) { t[i], t[j] = t[j], t[i] } diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index 8a9ad4e8ad..3bd7aa6730 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -346,11 +346,10 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) { tmpShard := path.Join(tmpDir, "shard") tmpWal := path.Join(tmpDir, "wal") - index := tsdb.NewDatabaseIndex("db") opts := tsdb.NewEngineOptions() opts.Config.WALDir = filepath.Join(tmpDir, "wal") - sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts) + sh := tsdb.NewShard(1, tmpShard, tmpWal, opts) if err := sh.Open(); err != nil { t.Fatalf("error opening shard: %s", err.Error()) } @@ -380,7 +379,7 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) { go func() { defer wg.Done() for i := 0; i < 50; i++ { - if err := sh.DeleteMeasurement("cpu", []string{"cpu,host=server"}); err != nil { + if err := sh.DeleteMeasurement([]byte("cpu")); err != nil { t.Fatalf(err.Error()) } @@ -395,7 +394,7 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) { go func() { defer wg.Done() for i := 0; i < 50; i++ { - if err := sh.DeleteMeasurement("cpu", []string{"cpu,host=server"}); err != nil { + if err := sh.DeleteMeasurement([]byte("cpu")); err != nil { t.Fatalf(err.Error()) }