Remove in-memory index from Shard and Store
parent
2171d9471b
commit
c535e3899a
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"regexp"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
|
@ -36,12 +37,22 @@ type Engine interface {
|
|||
|
||||
CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error)
|
||||
WritePoints(points []models.Point) error
|
||||
|
||||
CreateSeries(measurment string, series *Series) (*Series, error)
|
||||
Series(key string) (*Series, error)
|
||||
ContainsSeries(keys []string) (map[string]bool, error)
|
||||
DeleteSeries(keys []string) error
|
||||
DeleteSeriesRange(keys []string, min, max int64) error
|
||||
DeleteMeasurement(name string, seriesKeys []string) error
|
||||
SeriesCount() (n int, err error)
|
||||
|
||||
CreateMeasurement(name string) (*Measurement, error)
|
||||
Measurement(name string) (*Measurement, error)
|
||||
Measurements() (Measurements, error)
|
||||
MeasurementsByExpr(expr influxql.Expr) (Measurements, bool, error)
|
||||
MeasurementsByRegex(re *regexp.Regexp) (Measurements, error)
|
||||
MeasurementFields(measurement string) *MeasurementFields
|
||||
DeleteMeasurement(name string, seriesKeys []string) error
|
||||
|
||||
CreateSnapshot() (string, error)
|
||||
SetEnabled(enabled bool)
|
||||
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"runtime"
|
||||
"sort"
|
||||
"strings"
|
||||
|
@ -288,6 +289,22 @@ func (e *Engine) Index() *tsdb.DatabaseIndex {
|
|||
return e.index
|
||||
}
|
||||
|
||||
func (e *Engine) Measurement(name string) (*tsdb.Measurement, error) {
|
||||
return e.index.Measurement(name), nil
|
||||
}
|
||||
|
||||
func (e *Engine) Measurements() (tsdb.Measurements, error) {
|
||||
return e.index.Measurements(), nil
|
||||
}
|
||||
|
||||
func (e *Engine) MeasurementsByExpr(expr influxql.Expr) (tsdb.Measurements, bool, error) {
|
||||
return e.index.MeasurementsByExpr(expr)
|
||||
}
|
||||
|
||||
func (e *Engine) MeasurementsByRegex(re *regexp.Regexp) (tsdb.Measurements, error) {
|
||||
return e.index.MeasurementsByRegex(re), nil
|
||||
}
|
||||
|
||||
// MeasurementFields returns the measurement fields for a measurement.
|
||||
func (e *Engine) MeasurementFields(measurement string) *tsdb.MeasurementFields {
|
||||
e.fieldsMu.RLock()
|
||||
|
@ -374,6 +391,7 @@ func (e *Engine) Statistics(tags map[string]string) []models.Statistic {
|
|||
statTSMFullCompactionDuration: atomic.LoadInt64(&e.stats.TSMFullCompactionDuration),
|
||||
},
|
||||
})
|
||||
statistics = append(statistics, e.index.Statistics(tags)...)
|
||||
statistics = append(statistics, e.Cache.Statistics(tags)...)
|
||||
statistics = append(statistics, e.FileStore.Statistics(tags)...)
|
||||
statistics = append(statistics, e.WAL.Statistics(tags)...)
|
||||
|
@ -648,8 +666,6 @@ func (e *Engine) addToIndexFromKey(shardID uint64, key []byte, fieldType influxq
|
|||
// Have we already indexed this series?
|
||||
ss := index.SeriesBytes(seriesKey)
|
||||
if ss != nil {
|
||||
// Add this shard to the existing series
|
||||
ss.AssignShard(shardID)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -657,9 +673,8 @@ func (e *Engine) addToIndexFromKey(shardID uint64, key []byte, fieldType influxq
|
|||
// fields (in line protocol format) in the series key
|
||||
_, tags, _ := models.ParseKey(seriesKey)
|
||||
|
||||
s := tsdb.NewSeries(string(seriesKey), tags)
|
||||
s := tsdb.NewSeries(seriesKey, tags)
|
||||
index.CreateSeriesIndexIfNotExists(measurement, s)
|
||||
s.AssignShard(shardID)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -744,6 +759,7 @@ func (e *Engine) ContainsSeries(keys []string) (map[string]bool, error) {
|
|||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return keyMap, nil
|
||||
}
|
||||
|
||||
|
@ -815,9 +831,31 @@ func (e *Engine) DeleteSeriesRange(seriesKeys []string, min, max int64) error {
|
|||
e.Cache.DeleteRange(walKeys, min, max)
|
||||
|
||||
// delete from the WAL
|
||||
_, err := e.WAL.DeleteRange(walKeys, min, max)
|
||||
if _, err := e.WAL.DeleteRange(walKeys, min, max); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return err
|
||||
// Have we deleted all points for the series? If so, we need to remove
|
||||
// the series from the index.
|
||||
existing, err := e.ContainsSeries(seriesKeys)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var toDelete []string
|
||||
for k, exists := range existing {
|
||||
if !exists {
|
||||
toDelete = append(toDelete, k)
|
||||
}
|
||||
}
|
||||
e.index.DropSeries(toDelete)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// CreateMeasurement creates a measurement on the index.
|
||||
func (e *Engine) CreateMeasurement(name string) (*tsdb.Measurement, error) {
|
||||
return e.index.CreateMeasurementIndexIfNotExists(name), nil
|
||||
}
|
||||
|
||||
// DeleteMeasurement deletes a measurement and all related series.
|
||||
|
@ -826,7 +864,13 @@ func (e *Engine) DeleteMeasurement(name string, seriesKeys []string) error {
|
|||
delete(e.measurementFields, name)
|
||||
e.fieldsMu.Unlock()
|
||||
|
||||
return e.DeleteSeries(seriesKeys)
|
||||
if err := e.DeleteSeries(seriesKeys); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Remove the measurement from the index.
|
||||
e.index.DropMeasurement(name)
|
||||
return nil
|
||||
}
|
||||
|
||||
// SeriesCount returns the number of series buckets on the shard.
|
||||
|
@ -846,6 +890,15 @@ func (e *Engine) LastModified() time.Time {
|
|||
return fsTime
|
||||
}
|
||||
|
||||
func (e *Engine) CreateSeries(measurment string, series *tsdb.Series) (*tsdb.Series, error) {
|
||||
return e.index.CreateSeriesIndexIfNotExists(measurment, series), nil
|
||||
}
|
||||
|
||||
// Series returns a series from the index.
|
||||
func (e *Engine) Series(key string) (*tsdb.Series, error) {
|
||||
return e.index.Series(key), nil
|
||||
}
|
||||
|
||||
// WriteTo is not implemented.
|
||||
func (e *Engine) WriteTo(w io.Writer) (n int64, err error) { panic("not implemented") }
|
||||
|
||||
|
|
|
@ -573,7 +573,8 @@ func TestEngine_DeleteSeries(t *testing.T) {
|
|||
p3 := MustParsePointString("cpu,host=A sum=1.3 3000000000")
|
||||
|
||||
// Write those points to the engine.
|
||||
e := tsm1.NewEngine(1, f.Name(), walPath, tsdb.NewEngineOptions()).(*tsm1.Engine)
|
||||
e := tsm1.NewEngine(f.Name(), walPath, tsdb.NewEngineOptions()).(*tsm1.Engine)
|
||||
e.LoadMetadataIndex(1, tsdb.NewDatabaseIndex("db0")) // Initialise an index
|
||||
|
||||
// mock the planner so compactions don't run during the test
|
||||
e.CompactionPlan = &mockPlanner{}
|
||||
|
|
179
tsdb/meta.go
179
tsdb/meta.go
|
@ -145,7 +145,8 @@ func (d *DatabaseIndex) SeriesShardN(shardID uint64) int {
|
|||
return n
|
||||
}
|
||||
|
||||
// CreateSeriesIndexIfNotExists adds the series for the given measurement to the index and sets its ID or returns the existing series object.
|
||||
// CreateSeriesIndexIfNotExists adds the series for the given measurement to the
|
||||
// index and sets its ID or returns the existing series object
|
||||
func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, series *Series) *Series {
|
||||
d.mu.RLock()
|
||||
// if there is a measurement for this id, it's already been added
|
||||
|
@ -182,7 +183,8 @@ func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, ser
|
|||
return series
|
||||
}
|
||||
|
||||
// CreateMeasurementIndexIfNotExists creates or retrieves an in-memory index object for the measurement.
|
||||
// CreateMeasurementIndexIfNotExists creates or retrieves an in memory index
|
||||
// object for the measurement
|
||||
func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) *Measurement {
|
||||
name = escape.UnescapeString(name)
|
||||
|
||||
|
@ -210,56 +212,6 @@ func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) *Measurem
|
|||
return m
|
||||
}
|
||||
|
||||
// AssignShard updates the index to indicate that series k exists in
|
||||
// the given shardID.
|
||||
func (d *DatabaseIndex) AssignShard(k string, shardID uint64) {
|
||||
ss := d.Series(k)
|
||||
if ss != nil {
|
||||
ss.AssignShard(shardID)
|
||||
}
|
||||
}
|
||||
|
||||
// UnassignShard updates the index to indicate that series k does not exist in
|
||||
// the given shardID.
|
||||
func (d *DatabaseIndex) UnassignShard(k string, shardID uint64) {
|
||||
ss := d.Series(k)
|
||||
if ss != nil {
|
||||
if ss.Assigned(shardID) {
|
||||
// Remove the shard from any series
|
||||
ss.UnassignShard(shardID)
|
||||
|
||||
// If this series no longer has shards assigned, remove the series
|
||||
if ss.ShardN() == 0 {
|
||||
|
||||
// Remove the series the measurements
|
||||
ss.measurement.DropSeries(ss)
|
||||
|
||||
// If the measurement no longer has any series, remove it as well
|
||||
if !ss.measurement.HasSeries() {
|
||||
d.mu.Lock()
|
||||
d.dropMeasurement(ss.measurement.Name)
|
||||
d.mu.Unlock()
|
||||
}
|
||||
|
||||
// Remove the series key from the series index
|
||||
d.mu.Lock()
|
||||
delete(d.series, k)
|
||||
atomic.AddInt64(&d.stats.NumSeries, -1)
|
||||
d.mu.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// RemoveShard removes all references to shardID from any series or measurements
|
||||
// in the index. If the shard was the only owner of data for the series, the series
|
||||
// is removed from the index.
|
||||
func (d *DatabaseIndex) RemoveShard(shardID uint64) {
|
||||
for _, k := range d.SeriesKeys() {
|
||||
d.UnassignShard(k, shardID)
|
||||
}
|
||||
}
|
||||
|
||||
// TagsForSeries returns the tag map for the passed in series
|
||||
func (d *DatabaseIndex) TagsForSeries(key string) models.Tags {
|
||||
d.mu.RLock()
|
||||
|
@ -497,6 +449,10 @@ func (d *DatabaseIndex) dropMeasurement(name string) {
|
|||
|
||||
// DropSeries removes the series keys and their tags from the index.
|
||||
func (d *DatabaseIndex) DropSeries(keys []string) {
|
||||
if len(keys) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
|
||||
|
@ -537,9 +493,10 @@ func (d *DatabaseIndex) Dereference(b []byte) {
|
|||
}
|
||||
}
|
||||
|
||||
// Measurement represents a collection of time series in a database. It also contains in-memory
|
||||
// structures for indexing tags. Exported functions are goroutine safe while un-exported functions
|
||||
// assume the caller will use the appropriate locks.
|
||||
// Measurement represents a collection of time series in a database. It also
|
||||
// contains in memory structures for indexing tags. Exported functions are
|
||||
// goroutine safe while un-exported functions assume the caller will use the
|
||||
// appropriate locks.
|
||||
type Measurement struct {
|
||||
mu sync.RWMutex
|
||||
Name string `json:"name,omitempty"`
|
||||
|
@ -563,14 +520,6 @@ func NewMeasurement(name string) *Measurement {
|
|||
}
|
||||
}
|
||||
|
||||
// HasField returns true if the measurement has a field by the given name.
|
||||
func (m *Measurement) HasField(name string) bool {
|
||||
m.mu.RLock()
|
||||
hasField := m.hasField(name)
|
||||
m.mu.RUnlock()
|
||||
return hasField
|
||||
}
|
||||
|
||||
func (m *Measurement) hasField(name string) bool {
|
||||
_, hasField := m.fieldNames[name]
|
||||
return hasField
|
||||
|
@ -617,20 +566,7 @@ func (m *Measurement) SeriesKeys() []string {
|
|||
return keys
|
||||
}
|
||||
|
||||
// ValidateGroupBy ensures that the GROUP BY is not a field.
|
||||
func (m *Measurement) ValidateGroupBy(stmt *influxql.SelectStatement) error {
|
||||
for _, d := range stmt.Dimensions {
|
||||
switch e := d.Expr.(type) {
|
||||
case *influxql.VarRef:
|
||||
if m.HasField(e.Val) {
|
||||
return fmt.Errorf("can not use field in GROUP BY clause: %s", e.Val)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// HasTagKey returns true if at least one series in this measurement has written a value for the passed in tag key.
|
||||
// HasTagKey returns true if at least one series in this measurement has written a value for the passed in tag key
|
||||
func (m *Measurement) HasTagKey(k string) bool {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
@ -766,15 +702,18 @@ func (m *Measurement) filters(condition influxql.Expr) ([]uint64, map[uint64]inf
|
|||
return m.walkWhereForSeriesIds(condition)
|
||||
}
|
||||
|
||||
// TagSets returns the unique tag sets that exist for the given tag keys. This is used to determine
|
||||
// what composite series will be created by a group by. i.e. "group by region" should return:
|
||||
// {"region":"uswest"}, {"region":"useast"}
|
||||
// or region, service returns
|
||||
// {"region": "uswest", "service": "redis"}, {"region": "uswest", "service": "mysql"}, ...
|
||||
// This will also populate the TagSet objects with the series IDs that match each tagset and any
|
||||
// influx filter expression that goes with the series
|
||||
// TODO: this shouldn't be exported. However, until tx.go and the engine get refactored into tsdb, we need it.
|
||||
func (m *Measurement) TagSets(shardID uint64, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) {
|
||||
// TagSets returns the unique tag sets that exist for the given tag keys. This
|
||||
// is used to determine what composite series will be created by a group by.
|
||||
//
|
||||
// i.e. "group by region" should return: {"region":"uswest"},
|
||||
// {"region":"useast"} or region, service returns {"region": "uswest",
|
||||
// "service": "redis"}, {"region": "uswest", "service": "mysql"}, etc...
|
||||
//
|
||||
// This will also populate the TagSet objects with the series IDs that match
|
||||
// each tagset and any influx filter expression that goes with the series TODO:
|
||||
// this shouldn't be exported. However, until tx.go and the engine get
|
||||
// refactored into tsdb, we need it.
|
||||
func (m *Measurement) TagSets(dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) {
|
||||
m.mu.RLock()
|
||||
|
||||
// get the unique set of series ids and the filters that should be applied to each
|
||||
|
@ -1161,9 +1100,10 @@ func (fe FilterExprs) Len() int {
|
|||
return len(fe)
|
||||
}
|
||||
|
||||
// walkWhereForSeriesIds recursively walks the WHERE clause and returns an ordered set of series IDs and
|
||||
// a map from those series IDs to filter expressions that should be used to limit points returned in
|
||||
// the final query result.
|
||||
// walkWhereForSeriesIds recursively walks the WHERE clause and returns an
|
||||
// ordered set of series IDs and a map from those series IDs to filter
|
||||
// expressions that should be used to limit points returned in the final query
|
||||
// result.
|
||||
func (m *Measurement) walkWhereForSeriesIds(expr influxql.Expr) (SeriesIDs, FilterExprs, error) {
|
||||
switch n := expr.(type) {
|
||||
case *influxql.BinaryExpr:
|
||||
|
@ -1226,7 +1166,8 @@ func (m *Measurement) walkWhereForSeriesIds(expr influxql.Expr) (SeriesIDs, Filt
|
|||
}
|
||||
}
|
||||
|
||||
// expandExpr returns a list of expressions expanded by all possible tag combinations.
|
||||
// expandExpr returns a list of expressions expanded by all possible tag
|
||||
// combinations.
|
||||
func (m *Measurement) expandExpr(expr influxql.Expr) []tagSetExpr {
|
||||
// Retrieve list of unique values for each tag.
|
||||
valuesByTagKey := m.uniqueTagValues(expr)
|
||||
|
@ -1483,7 +1424,7 @@ func (m *Measurement) uniqueTagValues(expr influxql.Expr) map[string][]string {
|
|||
return out
|
||||
}
|
||||
|
||||
// Measurements represents a list of *Measurement.
|
||||
// Measurements represents a set of *Measurement.
|
||||
type Measurements []*Measurement
|
||||
|
||||
// Len implements sort.Interface.
|
||||
|
@ -1559,60 +1500,16 @@ type Series struct {
|
|||
Tags models.Tags
|
||||
ID uint64
|
||||
measurement *Measurement
|
||||
shardIDs []uint64 // shards that have this series defined
|
||||
}
|
||||
|
||||
// NewSeries returns an initialized series struct.
|
||||
func NewSeries(key string, tags models.Tags) *Series {
|
||||
// NewSeries returns an initialized series struct
|
||||
func NewSeries(key []byte, tags models.Tags) *Series {
|
||||
return &Series{
|
||||
Key: key,
|
||||
Key: string(key),
|
||||
Tags: tags,
|
||||
}
|
||||
}
|
||||
|
||||
// AssignShard adds shardID to the list of shards this series is assigned to.
|
||||
func (s *Series) AssignShard(shardID uint64) {
|
||||
s.mu.Lock()
|
||||
if !s.assigned(shardID) {
|
||||
s.shardIDs = append(s.shardIDs, shardID)
|
||||
sort.Sort(uint64Slice(s.shardIDs))
|
||||
}
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
// UnassignShard removes the shardID from the list of shards this series is assigned to.
|
||||
func (s *Series) UnassignShard(shardID uint64) {
|
||||
s.mu.Lock()
|
||||
for i, v := range s.shardIDs {
|
||||
if v == shardID {
|
||||
s.shardIDs = append(s.shardIDs[:i], s.shardIDs[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
// Assigned returns whether this series is assigned to the given shard.
|
||||
func (s *Series) Assigned(shardID uint64) bool {
|
||||
s.mu.RLock()
|
||||
b := s.assigned(shardID)
|
||||
s.mu.RUnlock()
|
||||
return b
|
||||
}
|
||||
|
||||
func (s *Series) assigned(shardID uint64) bool {
|
||||
i := sort.Search(len(s.shardIDs), func(i int) bool { return s.shardIDs[i] >= shardID })
|
||||
return i < len(s.shardIDs) && s.shardIDs[i] == shardID
|
||||
}
|
||||
|
||||
// ShardN returns the number of shards this series is assigned to.
|
||||
func (s *Series) ShardN() int {
|
||||
s.mu.RLock()
|
||||
n := len(s.shardIDs)
|
||||
s.mu.RUnlock()
|
||||
return n
|
||||
}
|
||||
|
||||
// Dereference removes references to a byte slice.
|
||||
func (s *Series) Dereference(b []byte) {
|
||||
s.mu.Lock()
|
||||
|
@ -2085,12 +1982,6 @@ func MeasurementFromSeriesKey(key string) string {
|
|||
return escape.UnescapeString(k)
|
||||
}
|
||||
|
||||
type uint64Slice []uint64
|
||||
|
||||
func (a uint64Slice) Len() int { return len(a) }
|
||||
func (a uint64Slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a uint64Slice) Less(i, j int) bool { return a[i] < a[j] }
|
||||
|
||||
type byTagKey []*influxql.TagSet
|
||||
|
||||
func (t byTagKey) Len() int { return len(t) }
|
||||
|
|
118
tsdb/shard.go
118
tsdb/shard.go
|
@ -101,7 +101,6 @@ func (e PartialWriteError) Error() string {
|
|||
// Data can be split across many shards. The query engine in TSDB is responsible
|
||||
// for combining the output of many shards into a single query result.
|
||||
type Shard struct {
|
||||
index *DatabaseIndex
|
||||
path string
|
||||
walPath string
|
||||
id uint64
|
||||
|
@ -219,7 +218,6 @@ func (s *Shard) Statistics(tags map[string]string) []models.Statistic {
|
|||
}}
|
||||
|
||||
// Add the index and engine statistics.
|
||||
statistics = append(statistics, s.index.Statistics(tags)...)
|
||||
statistics = append(statistics, s.engine.Statistics(tags)...)
|
||||
return statistics
|
||||
}
|
||||
|
@ -257,15 +255,18 @@ func (s *Shard) Open() error {
|
|||
|
||||
// Load metadata index.
|
||||
start := time.Now()
|
||||
if err := e.LoadMetadataIndex(s.id, s.index); err != nil {
|
||||
if err := e.LoadMetadataIndex(s.id, NewDatabaseIndex(s.database)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
count := s.index.SeriesShardN(s.id)
|
||||
atomic.AddInt64(&s.stats.SeriesCreated, int64(count))
|
||||
|
||||
s.engine = e
|
||||
|
||||
count, err := s.engine.SeriesCount()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
atomic.AddInt64(&s.stats.SeriesCreated, int64(count))
|
||||
|
||||
s.logger.Info(fmt.Sprintf("%s database index loaded in %s", s.path, time.Now().Sub(start)))
|
||||
|
||||
go s.monitor()
|
||||
|
@ -303,9 +304,6 @@ func (s *Shard) close() error {
|
|||
close(s.closing)
|
||||
}
|
||||
|
||||
// Wipe out our index.
|
||||
s.index = NewDatabaseIndex(s.database)
|
||||
|
||||
err := s.engine.Close()
|
||||
if err == nil {
|
||||
s.engine = nil
|
||||
|
@ -462,19 +460,17 @@ func (s *Shard) DeleteMeasurement(name string) error {
|
|||
}
|
||||
|
||||
// Attempt to find the series keys.
|
||||
m := s.index.Measurement(name)
|
||||
m, err := s.engine.Measurement(name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if m == nil {
|
||||
return influxql.ErrMeasurementNotFound(name)
|
||||
}
|
||||
|
||||
// Remove the measurement from the engine.
|
||||
if err := s.engine.DeleteMeasurement(name, m.SeriesKeys()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Remove the measurement from the index.
|
||||
s.index.DropMeasurement(name)
|
||||
return nil
|
||||
return s.engine.DeleteMeasurement(name, m.SeriesKeys())
|
||||
}
|
||||
|
||||
func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) error {
|
||||
|
@ -492,7 +488,10 @@ func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) error
|
|||
}
|
||||
|
||||
// ensure the measurement is in the index and the field is there
|
||||
measurement := s.index.CreateMeasurementIndexIfNotExists(f.Measurement)
|
||||
measurement, err := s.engine.CreateMeasurement(f.Measurement)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
measurement.SetFieldName(f.Field.Name)
|
||||
}
|
||||
|
||||
|
@ -512,7 +511,7 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
|
|||
// and record why/increment counters
|
||||
for i, p := range points {
|
||||
tags := p.Tags()
|
||||
m := s.index.Measurement(p.Name())
|
||||
m := s.Measurement(p.Name())
|
||||
|
||||
// Measurement doesn't exist yet, can't check the limit
|
||||
if m != nil {
|
||||
|
@ -574,22 +573,31 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
|
|||
iter.Reset()
|
||||
|
||||
// see if the series should be added to the index
|
||||
ss := s.index.SeriesBytes(p.Key())
|
||||
key := string(p.Key())
|
||||
ss, err := s.engine.Series(key)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if ss == nil {
|
||||
if s.options.Config.MaxSeriesPerDatabase > 0 && s.index.SeriesN()+1 > s.options.Config.MaxSeriesPerDatabase {
|
||||
cnt, err := s.engine.SeriesN()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if s.options.Config.MaxSeriesPerDatabase > 0 && cnt+1 > s.options.Config.MaxSeriesPerDatabase {
|
||||
atomic.AddInt64(&s.stats.WritePointsDropped, 1)
|
||||
dropped++
|
||||
reason = fmt.Sprintf("max-series-per-database limit exceeded: db=%s (%d/%d)",
|
||||
s.database, s.index.SeriesN(), s.options.Config.MaxSeriesPerDatabase)
|
||||
reason = fmt.Sprintf("db %s max series limit reached: (%d/%d)", s.database, cnt, s.options.Config.MaxSeriesPerDatabase)
|
||||
continue
|
||||
}
|
||||
|
||||
ss = s.index.CreateSeriesIndexIfNotExists(p.Name(), NewSeries(string(p.Key()), tags))
|
||||
ss = NewSeries(p.Key(), tags)
|
||||
atomic.AddInt64(&s.stats.SeriesCreated, 1)
|
||||
}
|
||||
|
||||
if !ss.Assigned(s.id) {
|
||||
ss.AssignShard(s.id)
|
||||
if ss, err = s.engine.CreateSeries(p.Name(), ss); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// see if the field definitions need to be saved to the shard
|
||||
|
@ -657,18 +665,20 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
|
|||
|
||||
// Measurement returns the named measurement from the index.
|
||||
func (s *Shard) Measurement(name string) *Measurement {
|
||||
return s.index.Measurement(name)
|
||||
m, _ := s.engine.Measurement(name)
|
||||
return m
|
||||
}
|
||||
|
||||
// Measurements returns a slice of all measurements from the index.
|
||||
func (s *Shard) Measurements() []*Measurement {
|
||||
return s.index.Measurements()
|
||||
m, _ := s.engine.Measurements()
|
||||
return m
|
||||
}
|
||||
|
||||
// MeasurementsByExpr takes an expression containing only tags and returns a
|
||||
// slice of matching measurements.
|
||||
func (s *Shard) MeasurementsByExpr(cond influxql.Expr) (Measurements, bool, error) {
|
||||
return s.index.MeasurementsByExpr(cond)
|
||||
return s.engine.MeasurementsByExpr(cond)
|
||||
}
|
||||
|
||||
// SeriesCount returns the number of series buckets on the shard.
|
||||
|
@ -681,7 +691,8 @@ func (s *Shard) SeriesCount() (int, error) {
|
|||
|
||||
// Series returns a series by key.
|
||||
func (s *Shard) Series(key string) *Series {
|
||||
return s.index.Series(key)
|
||||
series, _ := s.engine.Series(key)
|
||||
return series
|
||||
}
|
||||
|
||||
// WriteTo writes the shard's data to w.
|
||||
|
@ -767,7 +778,10 @@ func (s *Shard) FieldDimensions(sources influxql.Sources) (fields map[string]inf
|
|||
switch m := src.(type) {
|
||||
case *influxql.Measurement:
|
||||
// Retrieve measurement.
|
||||
mm := s.index.Measurement(m.Name)
|
||||
mm, err := s.engine.Measurement(m.Name)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if mm == nil {
|
||||
continue
|
||||
}
|
||||
|
@ -785,7 +799,7 @@ func (s *Shard) FieldDimensions(sources influxql.Sources) (fields map[string]inf
|
|||
}
|
||||
}
|
||||
|
||||
return
|
||||
return fields, dimensions, nil
|
||||
}
|
||||
|
||||
// ExpandSources expands regex sources and removes duplicates.
|
||||
|
@ -805,7 +819,12 @@ func (s *Shard) ExpandSources(sources influxql.Sources) (influxql.Sources, error
|
|||
}
|
||||
|
||||
// Loop over matching measurements.
|
||||
for _, m := range s.index.MeasurementsByRegex(src.Regex.Val) {
|
||||
measurements, err := s.engine.MeasurementsByRegex(src.Regex.Val)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, m := range measurements {
|
||||
other := &influxql.Measurement{
|
||||
Database: src.Database,
|
||||
RetentionPolicy: src.RetentionPolicy,
|
||||
|
@ -886,8 +905,8 @@ func (s *Shard) monitor() {
|
|||
continue
|
||||
}
|
||||
|
||||
for _, m := range s.index.Measurements() {
|
||||
m.WalkTagKeys(func(k string) {
|
||||
for _, m := range s.Measurements() {
|
||||
for _, k := range m.TagKeys() {
|
||||
n := m.Cardinality(k)
|
||||
perc := int(float64(n) / float64(s.options.Config.MaxValuesPerTag) * 100)
|
||||
if perc > 100 {
|
||||
|
@ -1072,15 +1091,16 @@ func (ic *shardIteratorCreator) ExpandSources(sources influxql.Sources) (influxq
|
|||
func NewFieldKeysIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) {
|
||||
itr := &fieldKeysIterator{sh: sh}
|
||||
|
||||
var err error
|
||||
// Retrieve measurements from shard. Filter if condition specified.
|
||||
if opt.Condition == nil {
|
||||
itr.mms = sh.index.Measurements()
|
||||
} else {
|
||||
mms, _, err := sh.index.measurementsByExpr(opt.Condition)
|
||||
if err != nil {
|
||||
if itr.mms, err = sh.engine.Measurements(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
if itr.mms, _, err = sh.engine.MeasurementsByExpr(opt.Condition); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
itr.mms = mms
|
||||
}
|
||||
|
||||
// Sort measurements by name.
|
||||
|
@ -1182,7 +1202,10 @@ func NewSeriesIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterat
|
|||
}
|
||||
|
||||
// Read and sort all measurements.
|
||||
mms := sh.index.Measurements()
|
||||
mms, err := sh.engine.Measurements()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sort.Sort(mms)
|
||||
|
||||
return &seriesIterator{
|
||||
|
@ -1295,11 +1318,13 @@ func NewTagValuesIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Ite
|
|||
return e
|
||||
}), nil)
|
||||
|
||||
mms, ok, err := sh.index.measurementsByExpr(measurementExpr)
|
||||
mms, ok, err := sh.engine.MeasurementsByExpr(measurementExpr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if !ok {
|
||||
mms = sh.index.Measurements()
|
||||
if mms, err = sh.engine.Measurements(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sort.Sort(mms)
|
||||
}
|
||||
|
||||
|
@ -1408,11 +1433,14 @@ type measurementKeyFunc func(m *Measurement) []string
|
|||
func newMeasurementKeysIterator(sh *Shard, fn measurementKeyFunc, opt influxql.IteratorOptions) (*measurementKeysIterator, error) {
|
||||
itr := &measurementKeysIterator{fn: fn}
|
||||
|
||||
var err error
|
||||
// Retrieve measurements from shard. Filter if condition specified.
|
||||
if opt.Condition == nil {
|
||||
itr.mms = sh.index.Measurements()
|
||||
if itr.mms, err = sh.engine.Measurements(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
mms, _, err := sh.index.measurementsByExpr(opt.Condition)
|
||||
mms, _, err := sh.engine.MeasurementsByExpr(opt.Condition)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -329,11 +329,6 @@ func (s *Store) DeleteShard(shardID uint64) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Remove the shard from the database indexes before closing the shard.
|
||||
// Closing the shard will do this as well, but it will unload it while
|
||||
// the shard is locked which can block stats collection and other calls.
|
||||
sh.UnloadIndex()
|
||||
|
||||
if err := sh.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -683,28 +678,9 @@ func (s *Store) deleteSeries(database string, seriesKeys []string, min, max int6
|
|||
s.mu.RUnlock()
|
||||
|
||||
return s.walkShards(shards, func(sh *Shard) error {
|
||||
if sh.database != database {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := sh.DeleteSeriesRange(seriesKeys, min, max); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// The keys we passed in may be fully deleted from the shard, if so,
|
||||
// we need to remove the shard from all the meta data indices.
|
||||
existing, err := sh.ContainsSeries(seriesKeys)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var toDelete []string
|
||||
for k, exists := range existing {
|
||||
if !exists {
|
||||
toDelete = append(toDelete, k)
|
||||
}
|
||||
}
|
||||
sh.index.DropSeries(toDelete)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue