Re-add shared in-memory index.

pull/7913/head
Ben Johnson 2016-12-19 09:57:05 -07:00
parent 0f9b2bfe6a
commit f9efcb3365
No known key found for this signature in database
GPG Key ID: 81741CD251883081
8 changed files with 208 additions and 18 deletions

View File

@ -148,3 +148,6 @@ func NewEngineOptions() EngineOptions {
Config: NewConfig(),
}
}
// NewInmemIndex returns a new "inmem" index type.
var NewInmemIndex func(name string) (interface{}, error)

View File

@ -823,6 +823,7 @@ func (e *Engine) DeleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
for k, exists := range existing {
if !exists {
e.index.UnassignShard(k, e.id)
e.index.DropSeries([][]byte{[]byte(k)})
}
}

View File

@ -44,6 +44,9 @@ type Index interface {
// To be removed w/ tsi1.
SetFieldName(measurement, name string)
AssignShard(k string, shardID uint64)
UnassignShard(k string, shardID uint64)
RemoveShard(shardID uint64)
}
// IndexFormat represents the format for an index.

View File

@ -29,23 +29,19 @@ import (
)
func init() {
tsdb.NewInmemIndex = func(name string) (interface{}, error) { return NewIndex(), nil }
tsdb.RegisterIndex("inmem", func(id uint64, path string, opt tsdb.EngineOptions) tsdb.Index {
return NewIndex(id, path, opt)
return NewShardIndex(id, path, opt)
})
}
// Ensure index implements interface.
var _ tsdb.Index = &Index{}
// Index is the in memory index of a collection of measurements, time
// series, and their tags. Exported functions are goroutine safe while
// un-exported functions assume the caller will use the appropriate locks.
type Index struct {
mu sync.RWMutex
id uint64 // shard id
opt tsdb.EngineOptions
// In-memory metadata index, built on load and updated when new series come in
measurements map[string]*tsdb.Measurement // measurement name to object and index
series map[string]*tsdb.Series // map series key to the Series object
@ -56,11 +52,8 @@ type Index struct {
}
// NewIndex returns a new initialized Index.
func NewIndex(id uint64, path string, opt tsdb.EngineOptions) *Index {
func NewIndex() *Index {
index := &Index{
id: id,
opt: opt,
measurements: make(map[string]*tsdb.Measurement),
series: make(map[string]*tsdb.Series),
}
@ -128,11 +121,12 @@ func (i *Index) MeasurementsByName(names [][]byte) ([]*tsdb.Measurement, error)
// CreateSeriesIfNotExists adds the series for the given measurement to the
// index and sets its ID or returns the existing series object
func (i *Index) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error {
func (i *Index) CreateSeriesIfNotExists(shardID uint64, key, name []byte, tags models.Tags, opt *tsdb.EngineOptions) error {
i.mu.RLock()
// if there is a series for this id, it's already been added
ss := i.series[string(key)]
if ss != nil {
ss.AssignShard(shardID)
i.mu.RUnlock()
return nil
}
@ -145,6 +139,7 @@ func (i *Index) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) erro
// Check for the series again under a write lock
ss = i.series[string(key)]
if ss != nil {
ss.AssignShard(shardID)
i.mu.Unlock()
return nil
}
@ -158,6 +153,7 @@ func (i *Index) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) erro
i.series[string(key)] = series
m.AddSeries(series)
series.AssignShard(shardID)
// Add the series to the series sketch.
i.seriesSketch.Add(key)
@ -504,7 +500,7 @@ func (i *Index) ForEachMeasurementSeriesByExpr(name []byte, expr influxql.Expr,
}
// TagSets returns a list of tag sets.
func (i *Index) TagSets(name []byte, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) {
func (i *Index) TagSets(shardID uint64, name []byte, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) {
i.mu.RLock()
defer i.mu.RUnlock()
@ -513,7 +509,7 @@ func (i *Index) TagSets(name []byte, dimensions []string, condition influxql.Exp
return nil, nil
}
tagSets, err := mm.TagSets(dimensions, condition)
tagSets, err := mm.TagSets(shardID, dimensions, condition)
if err != nil {
return nil, err
}
@ -610,6 +606,85 @@ func (i *Index) SeriesPointIterator(opt influxql.IteratorOptions) (influxql.Iter
}, nil
}
// AssignShard update the index to indicate that series k exists in the given shardID.
func (i *Index) AssignShard(k string, shardID uint64) {
ss, _ := i.Series([]byte(k))
if ss != nil {
ss.AssignShard(shardID)
}
}
// UnassignShard updates the index to indicate that series k does not exist in
// the given shardID.
func (i *Index) UnassignShard(k string, shardID uint64) {
ss, _ := i.Series([]byte(k))
if ss != nil {
if ss.Assigned(shardID) {
// Remove the shard from any series
ss.UnassignShard(shardID)
// If this series no longer has shards assigned, remove the series
if ss.ShardN() == 0 {
// Remove the series the measurements
ss.Measurement().DropSeries(ss)
// If the measurement no longer has any series, remove it as well
if !ss.Measurement().HasSeries() {
i.mu.Lock()
i.dropMeasurement(ss.Measurement().Name)
i.mu.Unlock()
}
// Remove the series key from the series index
i.mu.Lock()
delete(i.series, k)
// atomic.AddInt64(&i.stats.NumSeries, -1)
i.mu.Unlock()
}
}
}
}
// RemoveShard removes all references to shardID from any series or measurements
// in the index. If the shard was the only owner of data for the series, the series
// is removed from the index.
func (i *Index) RemoveShard(shardID uint64) {
for _, k := range i.SeriesKeys() {
i.UnassignShard(k, shardID)
}
}
// Ensure index implements interface.
var _ tsdb.Index = &ShardIndex{}
// ShardIndex represents a shim between the TSDB index interface and the shared
// in-memory index. This is required because per-shard in-memory indexes will
// grow the heap size too large.
type ShardIndex struct {
*Index
id uint64 // shard id
opt tsdb.EngineOptions
}
func (i *ShardIndex) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error {
return i.Index.CreateSeriesIfNotExists(i.id, key, name, tags, &i.opt)
}
// TagSets returns a list of tag sets based on series filtering.
func (i *ShardIndex) TagSets(name []byte, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) {
return i.Index.TagSets(i.id, name, dimensions, condition)
}
// NewShardIndex returns a new index for a shard.
func NewShardIndex(id uint64, path string, opt tsdb.EngineOptions) tsdb.Index {
return &ShardIndex{
Index: opt.InmemIndex.(*Index),
id: id,
opt: opt,
}
}
// seriesPointIterator emits series as influxql points.
type seriesPointIterator struct {
mms tsdb.Measurements

View File

@ -278,7 +278,7 @@ func (m *Measurement) ForEachSeriesByExpr(condition influxql.Expr, fn func(tags
// This will also populate the TagSet objects with the series IDs that match each tagset and any
// influx filter expression that goes with the series
// TODO: this shouldn't be exported. However, until tx.go and the engine get refactored into tsdb, we need it.
func (m *Measurement) TagSets(dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) {
func (m *Measurement) TagSets(shardID uint64, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) {
m.mu.RLock()
// get the unique set of series ids and the filters that should be applied to each
@ -294,6 +294,9 @@ func (m *Measurement) TagSets(dimensions []string, condition influxql.Expr) ([]*
tagSets := make(map[string]*influxql.TagSet, 64)
for _, id := range ids {
s := m.seriesByID[id]
if !s.Assigned(shardID) {
continue
}
tags := make(map[string]string, len(dimensions))
// Build the TagSet for this series.
@ -1056,6 +1059,7 @@ type Series struct {
Tags models.Tags
ID uint64
measurement *Measurement
shardIDs []uint64 // shards that have this series defined
}
// NewSeries returns an initialized series struct
@ -1066,6 +1070,45 @@ func NewSeries(key []byte, tags models.Tags) *Series {
}
}
func (s *Series) AssignShard(shardID uint64) {
s.mu.Lock()
if !s.assigned(shardID) {
s.shardIDs = append(s.shardIDs, shardID)
sort.Sort(uint64Slice(s.shardIDs))
}
s.mu.Unlock()
}
func (s *Series) UnassignShard(shardID uint64) {
s.mu.Lock()
for i, v := range s.shardIDs {
if v == shardID {
s.shardIDs = append(s.shardIDs[:i], s.shardIDs[i+1:]...)
break
}
}
s.mu.Unlock()
}
func (s *Series) Assigned(shardID uint64) bool {
s.mu.RLock()
b := s.assigned(shardID)
s.mu.RUnlock()
return b
}
func (s *Series) assigned(shardID uint64) bool {
i := sort.Search(len(s.shardIDs), func(i int) bool { return s.shardIDs[i] >= shardID })
return i < len(s.shardIDs) && s.shardIDs[i] == shardID
}
func (s *Series) ShardN() int {
s.mu.RLock()
n := len(s.shardIDs)
s.mu.RUnlock()
return n
}
// Measurement returns the measurement on the series.
func (s *Series) Measurement() *Measurement {
return s.measurement

View File

@ -340,6 +340,8 @@ func (s *Shard) close() error {
close(s.closing)
}
s.UnloadIndex()
err := s.engine.Close()
if err == nil {
s.engine = nil
@ -374,6 +376,11 @@ func (s *Shard) LastModified() time.Time {
return s.engine.LastModified()
}
// UnloadIndex removes all references to this shard from the DatabaseIndex
func (s *Shard) UnloadIndex() {
s.index.RemoveShard(s.id)
}
// DiskSize returns the size on disk of this shard
func (s *Shard) DiskSize() (int64, error) {
var size int64
@ -481,7 +488,6 @@ func (s *Shard) DeleteMeasurement(name []byte) error {
if err := s.ready(); err != nil {
return err
}
println("S.DM", string(name))
return s.engine.DeleteMeasurement(name)
}

View File

@ -19,6 +19,7 @@ import (
"github.com/influxdata/influxdb/tsdb"
_ "github.com/influxdata/influxdb/tsdb/engine"
_ "github.com/influxdata/influxdb/tsdb/index"
"github.com/influxdata/influxdb/tsdb/index/inmem"
"go.uber.org/zap"
)
@ -33,6 +34,7 @@ func TestShardWriteAndIndex(t *testing.T) {
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
opts.InmemIndex = inmem.NewIndex()
sh := tsdb.NewShard(1, tmpShard, tmpWal, opts)
@ -158,6 +160,7 @@ func TestShard_MaxTagValuesLimit(t *testing.T) {
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
opts.Config.MaxValuesPerTag = 1000
opts.InmemIndex = inmem.NewIndex()
sh := tsdb.NewShard(1, tmpShard, tmpWal, opts)
@ -209,6 +212,7 @@ func TestWriteTimeTag(t *testing.T) {
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
opts.InmemIndex = inmem.NewIndex()
sh := tsdb.NewShard(1, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
@ -264,6 +268,7 @@ func TestWriteTimeField(t *testing.T) {
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
opts.InmemIndex = inmem.NewIndex()
sh := tsdb.NewShard(1, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
@ -295,6 +300,7 @@ func TestShardWriteAddNewField(t *testing.T) {
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
opts.InmemIndex = inmem.NewIndex()
sh := tsdb.NewShard(1, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
@ -344,6 +350,7 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) {
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
opts.InmemIndex = inmem.NewIndex()
sh := tsdb.NewShard(1, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
@ -414,6 +421,7 @@ func TestShard_Close_RemoveIndex(t *testing.T) {
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
opts.InmemIndex = inmem.NewIndex()
sh := tsdb.NewShard(1, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
@ -898,6 +906,7 @@ func NewShard() *Shard {
// Build engine options.
opt := tsdb.NewEngineOptions()
opt.Config.WALDir = filepath.Join(path, "wal")
opt.InmemIndex = inmem.NewIndex()
return &Shard{
Shard: tsdb.NewShard(0,

View File

@ -43,6 +43,9 @@ type Store struct {
path string
// shared per-database indexes, only if using "inmem".
indexes map[string]interface{}
// shards is a map of shard IDs to the associated Shard.
shards map[uint64]*Shard
@ -64,6 +67,7 @@ func NewStore(path string) *Store {
return &Store{
databases: make(map[string]struct{}),
path: path,
indexes: make(map[string]interface{}),
EngineOptions: NewEngineOptions(),
Logger: logger,
baseLogger: logger,
@ -170,6 +174,12 @@ func (s *Store) loadShards() error {
continue
}
// Retrieve database index.
idx, err := s.createIndexIfNotExists(db.Name())
if err != nil {
return err
}
// Load each retention policy within the database directory.
rpDirs, err := ioutil.ReadDir(filepath.Join(s.path, db.Name()))
if err != nil {
@ -203,8 +213,12 @@ func (s *Store) loadShards() error {
return
}
// Copy options and assign shared index.
opt := s.EngineOptions
opt.InmemIndex = idx
// Open engine.
shard := NewShard(shardID, path, walPath, s.EngineOptions)
shard := NewShard(shardID, path, walPath, opt)
shard.WithLogger(s.baseLogger)
err = shard.Open()
@ -259,6 +273,27 @@ func (s *Store) Close() error {
return nil
}
// CreateIndexIfNotExists returns an in-memory index for a database.
func (s *Store) CreateIndexIfNotExists(name string) (interface{}, error) {
s.mu.Lock()
defer s.mu.Unlock()
return s.createIndexIfNotExists(name)
}
func (s *Store) createIndexIfNotExists(name string) (interface{}, error) {
if idx := s.indexes[name]; idx != nil {
return idx, nil
}
idx, err := NewInmemIndex(name)
if err != nil {
return nil, err
}
s.indexes[name] = idx
return idx, nil
}
// Shard returns a shard by id.
func (s *Store) Shard(id uint64) *Shard {
s.mu.RLock()
@ -319,8 +354,18 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, en
return err
}
// Retrieve shared index, if needed.
idx, err := s.createIndexIfNotExists(database)
if err != nil {
return err
}
// Copy index options and pass in shared index.
opt := s.EngineOptions
opt.InmemIndex = idx
path := filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10))
shard := NewShard(shardID, path, walPath, s.EngineOptions)
shard := NewShard(shardID, path, walPath, opt)
shard.WithLogger(s.baseLogger)
shard.EnableOnOpen = enabled
@ -362,6 +407,11 @@ func (s *Store) DeleteShard(shardID uint64) error {
return nil
}
// Remove the shard from the database indexes before closing the shard.
// Closing the shard will do this as well, but it will unload it while
// the shard is locked which can block stats collection and other calls.
sh.UnloadIndex()
if err := sh.Close(); err != nil {
return err
}