Initial index interface

pull/7913/head
Edd Robinson 2016-09-14 15:15:23 +01:00 committed by Ben Johnson
parent 05bc4dec00
commit 2b8efefef4
No known key found for this signature in database
GPG Key ID: 81741CD251883081
4 changed files with 106 additions and 71 deletions

View File

@ -99,9 +99,7 @@ type Engine struct {
traceLogger zap.Logger // Logger to be used when trace-logging is on.
traceLogging bool
// TODO(benbjohnson): Index needs to be moved entirely into engine.
index *tsdb.DatabaseIndex
index tsdb.Index
fieldsMu sync.RWMutex
measurementFields map[string]*tsdb.MeasurementFields
@ -283,11 +281,11 @@ func (e *Engine) disableSnapshotCompactions() {
func (e *Engine) Path() string { return e.path }
func (e *Engine) Measurement(name string) (*tsdb.Measurement, error) {
return e.index.Measurement(name), nil
return e.index.Measurement(name)
}
func (e *Engine) Measurements() (tsdb.Measurements, error) {
return e.index.Measurements(), nil
return e.index.Measurements()
}
func (e *Engine) MeasurementCardinality() (int64, error) {
@ -299,7 +297,7 @@ func (e *Engine) MeasurementsByExpr(expr influxql.Expr) (tsdb.Measurements, bool
}
func (e *Engine) MeasurementsByRegex(re *regexp.Regexp) (tsdb.Measurements, error) {
return e.index.MeasurementsByRegex(re), nil
return e.index.MeasurementsByRegex(re)
}
// MeasurementFields returns the measurement fields for a measurement.
@ -434,6 +432,12 @@ func (e *Engine) Close() error {
defer e.mu.Unlock()
e.done = nil // Ensures that the channel will not be closed again.
if e.index != nil {
if err := e.index.Close(); err != nil {
return err
}
}
if err := e.FileStore.Close(); err != nil {
return err
}
@ -644,7 +648,7 @@ func (e *Engine) addToIndexFromKey(shardID uint64, key []byte, fieldType influxq
seriesKey, field := SeriesAndFieldFromCompositeKey(key)
measurement := tsdb.MeasurementFromSeriesKey(string(seriesKey))
m := index.CreateMeasurementIndexIfNotExists(measurement)
m, _ := index.CreateMeasurementIndexIfNotExists(measurement)
m.SetFieldName(field)
e.fieldsMu.Lock()
@ -660,8 +664,10 @@ func (e *Engine) addToIndexFromKey(shardID uint64, key []byte, fieldType influxq
}
// Have we already indexed this series?
ss := index.SeriesBytes(seriesKey)
if ss != nil {
ss, err := index.Series(string(seriesKey))
if err != nil {
return err
} else if ss != nil {
return nil
}
@ -844,14 +850,12 @@ func (e *Engine) DeleteSeriesRange(seriesKeys []string, min, max int64) error {
toDelete = append(toDelete, k)
}
}
e.index.DropSeries(toDelete)
return nil
return e.index.DropSeries(toDelete)
}
// CreateMeasurement creates a measurement on the index.
func (e *Engine) CreateMeasurement(name string) (*tsdb.Measurement, error) {
return e.index.CreateMeasurementIndexIfNotExists(name), nil
return e.index.CreateMeasurementIndexIfNotExists(name)
}
// DeleteMeasurement deletes a measurement and all related series.
@ -865,17 +869,16 @@ func (e *Engine) DeleteMeasurement(name string, seriesKeys []string) error {
}
// Remove the measurement from the index.
e.index.DropMeasurement(name)
return nil
return e.index.DropMeasurement(name)
}
func (e *Engine) CreateSeries(measurment string, series *tsdb.Series) (*tsdb.Series, error) {
return e.index.CreateSeriesIndexIfNotExists(measurment, series), nil
return e.index.CreateSeriesIndexIfNotExists(measurment, series)
}
// Series returns a series from the index.
func (e *Engine) Series(key string) (*tsdb.Series, error) {
return e.index.Series(key), nil
return e.index.Series(key)
}
// WriteTo is not implemented.
@ -1338,7 +1341,11 @@ func (e *Engine) createVarRefIterator(opt influxql.IteratorOptions, aggregate bo
var itrs []influxql.Iterator
if err := func() error {
mms := tsdb.Measurements(e.index.MeasurementsByName(influxql.Sources(opt.Sources).Names()))
mByName, err := e.index.MeasurementsByName(influxql.Sources(opt.Sources).Names())
if err != nil {
return err
}
mms := tsdb.Measurements(mByName)
for _, mm := range mms {
// Determine tagsets for this measurement based on dimensions and filters.
@ -1468,7 +1475,12 @@ func (e *Engine) createTagSetGroupIterators(ref *influxql.VarRef, mm *tsdb.Measu
// createVarRefSeriesIterator creates an iterator for a variable reference for a series.
func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, mm *tsdb.Measurement, seriesKey string, t *influxql.TagSet, filter influxql.Expr, conditionFields []influxql.VarRef, opt influxql.IteratorOptions) (influxql.Iterator, error) {
tags := influxql.NewTags(e.index.TagsForSeries(seriesKey).Map())
tfs, err := e.index.TagsForSeries(seriesKey)
if err != nil {
return nil, err
}
tags := influxql.NewTags(tfs.Map())
// Create options specific for this series.
itrOpt := opt

View File

@ -44,7 +44,10 @@ func TestEngine_LoadMetadataIndex(t *testing.T) {
}
// Verify index is correct.
if m := index.Measurement("cpu"); m == nil {
m, err := index.Measurement("cpu")
if err != nil {
t.Fatal(err)
} else if m == nil {
t.Fatal("measurement not found")
} else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, models.NewTags(map[string]string{"host": "A"})) {
t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags)
@ -67,7 +70,9 @@ func TestEngine_LoadMetadataIndex(t *testing.T) {
}
// Verify index is correct.
if m := index.Measurement("cpu"); m == nil {
if m, err = index.Measurement("cpu"); err != nil {
t.Fatal(err)
} else if m == nil {
t.Fatal("measurement not found")
} else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, models.NewTags(map[string]string{"host": "A"})) {
t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags)
@ -92,7 +97,9 @@ func TestEngine_LoadMetadataIndex(t *testing.T) {
}
// Verify index is correct.
if m := index.Measurement("cpu"); m == nil {
if m, err = index.Measurement("cpu"); err != nil {
t.Fatal(err)
} else if m == nil {
t.Fatal("measurement not found")
} else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, models.NewTags(map[string]string{"host": "A"})) {
t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags)

30
tsdb/index.go Normal file
View File

@ -0,0 +1,30 @@
package tsdb
import (
"regexp"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
)
type Index interface {
Open() error
Close() error
CreateMeasurementIndexIfNotExists(name string) (*Measurement, error)
Measurement(name string) (*Measurement, error)
Measurements() (Measurements, error)
MeasurementsByExpr(expr influxql.Expr) (Measurements, bool, error)
MeasurementsByName(names []string) ([]*Measurement, error)
MeasurementsByRegex(re *regexp.Regexp) (Measurements, error)
DropMeasurement(name string) error
CreateSeriesIndexIfNotExists(measurment string, series *Series) (*Series, error)
Series(key string) (*Series, error)
DropSeries(keys []string) error
SeriesN() (int64, error)
Statistics(tags map[string]string) []models.Statistic
TagsForSeries(key string) (models.Tags, error)
}

View File

@ -50,6 +50,9 @@ func NewDatabaseIndex(name string) *DatabaseIndex {
}
}
func (d *DatabaseIndex) Open() error { return nil }
func (d *DatabaseIndex) Close() error { return nil }
// IndexStatistics maintains statistics for the index.
type IndexStatistics struct {
NumSeries int64
@ -69,48 +72,29 @@ func (d *DatabaseIndex) Statistics(tags map[string]string) []models.Statistic {
}
// Series returns a series by key.
func (d *DatabaseIndex) Series(key string) *Series {
func (d *DatabaseIndex) Series(key string) (*Series, error) {
d.mu.RLock()
s := d.series[key]
d.mu.RUnlock()
return s
}
// SeriesBytes returns a series by key.
func (d *DatabaseIndex) SeriesBytes(key []byte) *Series {
d.mu.RLock()
s := d.series[string(key)]
d.mu.RUnlock()
return s
}
// SeriesKeys returns a sorted slice of strings indicating all the series keys in the index.
func (d *DatabaseIndex) SeriesKeys() []string {
d.mu.RLock()
s := make([]string, 0, len(d.series))
for k := range d.series {
s = append(s, k)
}
d.mu.RUnlock()
return s
return s, nil
}
// SeriesN returns the number of series.
func (d *DatabaseIndex) SeriesN() int {
func (d *DatabaseIndex) SeriesN() (int64, error) {
d.mu.RLock()
defer d.mu.RUnlock()
return len(d.series)
return int64(len(d.series)), nil
}
// Measurement returns the measurement object from the index by the name.
func (d *DatabaseIndex) Measurement(name string) *Measurement {
// Measurement returns the measurement object from the index by the name
func (d *DatabaseIndex) Measurement(name string) (*Measurement, error) {
d.mu.RLock()
defer d.mu.RUnlock()
return d.measurements[name]
return d.measurements[name], nil
}
// MeasurementsByName returns a list of all the measurements in the index that match any entry in names.
func (d *DatabaseIndex) MeasurementsByName(names []string) []*Measurement {
// MeasurementsByName returns a list of measurements.
func (d *DatabaseIndex) MeasurementsByName(names []string) ([]*Measurement, error) {
d.mu.RLock()
defer d.mu.RUnlock()
@ -120,7 +104,7 @@ func (d *DatabaseIndex) MeasurementsByName(names []string) []*Measurement {
a = append(a, m)
}
}
return a
return a, nil
}
// MeasurementSeriesCounts returns the number of measurements and series currently indexed by the database.
@ -147,25 +131,25 @@ func (d *DatabaseIndex) SeriesShardN(shardID uint64) int {
// CreateSeriesIndexIfNotExists adds the series for the given measurement to the
// index and sets its ID or returns the existing series object
func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, series *Series) *Series {
func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, series *Series) (*Series, error) {
d.mu.RLock()
// if there is a measurement for this id, it's already been added
ss := d.series[series.Key]
if ss != nil {
d.mu.RUnlock()
return ss
return ss, nil
}
d.mu.RUnlock()
// get or create the measurement index
m := d.CreateMeasurementIndexIfNotExists(measurementName)
m, _ := d.CreateMeasurementIndexIfNotExists(measurementName)
d.mu.Lock()
// Check for the series again under a write lock
ss = d.series[series.Key]
if ss != nil {
d.mu.Unlock()
return ss
return ss, nil
}
// set the in memory ID for query processing on this shard
@ -180,12 +164,12 @@ func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, ser
atomic.AddInt64(&d.stats.NumSeries, 1)
d.mu.Unlock()
return series
return series, nil
}
// CreateMeasurementIndexIfNotExists creates or retrieves an in memory index
// object for the measurement
func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) *Measurement {
func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) (*Measurement, error) {
name = escape.UnescapeString(name)
// See if the measurement exists using a read-lock
@ -193,7 +177,7 @@ func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) *Measurem
m := d.measurements[name]
if m != nil {
d.mu.RUnlock()
return m
return m, nil
}
d.mu.RUnlock()
@ -209,19 +193,19 @@ func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) *Measurem
d.measurements[name] = m
atomic.AddInt64(&d.stats.NumMeasurements, 1)
}
return m
return m, nil
}
// TagsForSeries returns the tag map for the passed in series
func (d *DatabaseIndex) TagsForSeries(key string) models.Tags {
func (d *DatabaseIndex) TagsForSeries(key string) (models.Tags, error) {
d.mu.RLock()
defer d.mu.RUnlock()
ss := d.series[key]
if ss == nil {
return nil
return nil, nil
}
return ss.Tags
return ss.Tags, nil
}
// MeasurementsByExpr takes an expression containing only tags and returns a
@ -399,7 +383,7 @@ func (d *DatabaseIndex) measurementsByTagFilters(filters []*TagFilter) Measureme
}
// MeasurementsByRegex returns the measurements that match the regex.
func (d *DatabaseIndex) MeasurementsByRegex(re *regexp.Regexp) Measurements {
func (d *DatabaseIndex) MeasurementsByRegex(re *regexp.Regexp) (Measurements, error) {
d.mu.RLock()
defer d.mu.RUnlock()
@ -409,11 +393,11 @@ func (d *DatabaseIndex) MeasurementsByRegex(re *regexp.Regexp) Measurements {
matches = append(matches, m)
}
}
return matches
return matches, nil
}
// Measurements returns a list of all measurements.
func (d *DatabaseIndex) Measurements() Measurements {
func (d *DatabaseIndex) Measurements() (Measurements, error) {
d.mu.RLock()
measurements := make(Measurements, 0, len(d.measurements))
for _, m := range d.measurements {
@ -421,15 +405,16 @@ func (d *DatabaseIndex) Measurements() Measurements {
}
d.mu.RUnlock()
return measurements
return measurements, nil
}
// DropMeasurement removes the measurement and all of its underlying
// series from the database index.
func (d *DatabaseIndex) DropMeasurement(name string) {
// series from the database index
func (d *DatabaseIndex) DropMeasurement(name string) error {
d.mu.Lock()
defer d.mu.Unlock()
d.dropMeasurement(name)
return nil
}
func (d *DatabaseIndex) dropMeasurement(name string) {
@ -447,10 +432,10 @@ func (d *DatabaseIndex) dropMeasurement(name string) {
atomic.AddInt64(&d.stats.NumMeasurements, -1)
}
// DropSeries removes the series keys and their tags from the index.
func (d *DatabaseIndex) DropSeries(keys []string) {
// DropSeries removes the series keys and their tags from the index
func (d *DatabaseIndex) DropSeries(keys []string) error {
if len(keys) == 0 {
return
return nil
}
d.mu.Lock()
@ -481,6 +466,7 @@ func (d *DatabaseIndex) DropSeries(keys []string) {
d.dropMeasurement(mname)
}
atomic.AddInt64(&d.stats.NumSeries, -nDeleted)
return nil
}
// Dereference removes all references to data within b and moves them to the heap.