Merge pull request #10509 from influxdata/jmw-tsdb-disallow-mixed-index
tsdb: don't allow deletes to a database in mixed index modepull/10525/head
commit
72283cd9c5
|
@ -33,6 +33,9 @@ var (
|
|||
ErrStoreClosed = fmt.Errorf("store is closed")
|
||||
// ErrShardDeletion is returned when trying to create a shard that is being deleted
|
||||
ErrShardDeletion = errors.New("shard is being deleted")
|
||||
// ErrMultipleIndexTypes is returned when trying to do deletes on a database with
|
||||
// multiple index types.
|
||||
ErrMultipleIndexTypes = errors.New("cannot delete data. DB contains shards using both inmem and tsi1 indexes. Please convert all shards to use the same index type to delete data.")
|
||||
)
|
||||
|
||||
// Statistics gathered by the store.
|
||||
|
@ -45,11 +48,35 @@ const (
|
|||
// a database.
|
||||
const SeriesFileDirectory = "_series"
|
||||
|
||||
// databaseState keeps track of the state of a database.
|
||||
type databaseState struct{ indexTypes map[string]int }
|
||||
|
||||
// addIndexType records that the database has a shard with the given index type.
|
||||
func (d *databaseState) addIndexType(indexType string) {
|
||||
if d.indexTypes == nil {
|
||||
d.indexTypes = make(map[string]int)
|
||||
}
|
||||
d.indexTypes[indexType]++
|
||||
}
|
||||
|
||||
// addIndexType records that the database no longer has a shard with the given index type.
|
||||
func (d *databaseState) removeIndexType(indexType string) {
|
||||
if d.indexTypes != nil {
|
||||
d.indexTypes[indexType]--
|
||||
if d.indexTypes[indexType] <= 0 {
|
||||
delete(d.indexTypes, indexType)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// hasMultipleIndexTypes returns true if the database has multiple index types.
|
||||
func (d *databaseState) hasMultipleIndexTypes() bool { return d != nil && len(d.indexTypes) > 1 }
|
||||
|
||||
// Store manages shards and indexes for databases.
|
||||
type Store struct {
|
||||
mu sync.RWMutex
|
||||
shards map[uint64]*Shard
|
||||
databases map[string]struct{}
|
||||
databases map[string]*databaseState
|
||||
sfiles map[string]*SeriesFile
|
||||
SeriesFileMaxSize int64 // Determines size of series file mmap. Can be altered in tests.
|
||||
path string
|
||||
|
@ -76,7 +103,7 @@ type Store struct {
|
|||
func NewStore(path string) *Store {
|
||||
logger := zap.NewNop()
|
||||
return &Store{
|
||||
databases: make(map[string]struct{}),
|
||||
databases: make(map[string]*databaseState),
|
||||
path: path,
|
||||
sfiles: make(map[string]*SeriesFile),
|
||||
indexes: make(map[string]interface{}),
|
||||
|
@ -377,10 +404,6 @@ func (s *Store) loadShards() error {
|
|||
}
|
||||
}
|
||||
|
||||
// indexVersions tracks counts of the number of different types of index
|
||||
// being used within each database.
|
||||
indexVersions := make(map[string]map[string]int)
|
||||
|
||||
// Gather results of opening shards concurrently, keeping track of how
|
||||
// many databases we are managing.
|
||||
for i := 0; i < n; i++ {
|
||||
|
@ -389,20 +412,18 @@ func (s *Store) loadShards() error {
|
|||
continue
|
||||
}
|
||||
s.shards[res.s.id] = res.s
|
||||
s.databases[res.s.database] = struct{}{}
|
||||
|
||||
if _, ok := indexVersions[res.s.database]; !ok {
|
||||
indexVersions[res.s.database] = make(map[string]int, 2)
|
||||
if _, ok := s.databases[res.s.database]; !ok {
|
||||
s.databases[res.s.database] = new(databaseState)
|
||||
}
|
||||
indexVersions[res.s.database][res.s.IndexType()]++
|
||||
s.databases[res.s.database].addIndexType(res.s.IndexType())
|
||||
}
|
||||
close(resC)
|
||||
|
||||
// Check if any databases are running multiple index types.
|
||||
for db, idxVersions := range indexVersions {
|
||||
if len(idxVersions) > 1 {
|
||||
for db, state := range s.databases {
|
||||
if state.hasMultipleIndexTypes() {
|
||||
var fields []zapcore.Field
|
||||
for idx, cnt := range idxVersions {
|
||||
for idx, cnt := range state.indexTypes {
|
||||
fields = append(fields, zap.Int(fmt.Sprintf("%s_count", idx), cnt))
|
||||
}
|
||||
s.Logger.Warn("Mixed shard index types", append(fields, logger.Database(db))...)
|
||||
|
@ -450,7 +471,7 @@ func (s *Store) Close() error {
|
|||
}
|
||||
}
|
||||
|
||||
s.databases = make(map[string]struct{})
|
||||
s.databases = make(map[string]*databaseState)
|
||||
s.sfiles = map[string]*SeriesFile{}
|
||||
s.indexes = make(map[string]interface{})
|
||||
s.pendingShardDeletes = make(map[uint64]struct{})
|
||||
|
@ -612,7 +633,17 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, en
|
|||
}
|
||||
|
||||
s.shards[shardID] = shard
|
||||
s.databases[database] = struct{}{} // Ensure we are tracking any new db.
|
||||
if _, ok := s.databases[database]; !ok {
|
||||
s.databases[database] = new(databaseState)
|
||||
}
|
||||
s.databases[database].addIndexType(shard.IndexType())
|
||||
if state := s.databases[database]; state.hasMultipleIndexTypes() {
|
||||
var fields []zapcore.Field
|
||||
for idx, cnt := range state.indexTypes {
|
||||
fields = append(fields, zap.Int(fmt.Sprintf("%s_count", idx), cnt))
|
||||
}
|
||||
s.Logger.Warn("Mixed shard index types", append(fields, logger.Database(database))...)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -672,6 +703,7 @@ func (s *Store) DeleteShard(shardID uint64) error {
|
|||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
delete(s.pendingShardDeletes, shardID)
|
||||
s.databases[db].removeIndexType(sh.IndexType())
|
||||
}()
|
||||
|
||||
// Get the shard's local bitset of series IDs.
|
||||
|
@ -853,8 +885,10 @@ func (s *Store) DeleteRetentionPolicy(database, name string) error {
|
|||
}
|
||||
|
||||
s.mu.Lock()
|
||||
state := s.databases[database]
|
||||
for _, sh := range shards {
|
||||
delete(s.shards, sh.id)
|
||||
state.removeIndexType(sh.IndexType())
|
||||
}
|
||||
s.mu.Unlock()
|
||||
return nil
|
||||
|
@ -863,6 +897,10 @@ func (s *Store) DeleteRetentionPolicy(database, name string) error {
|
|||
// DeleteMeasurement removes a measurement and all associated series from a database.
|
||||
func (s *Store) DeleteMeasurement(database, name string) error {
|
||||
s.mu.RLock()
|
||||
if s.databases[database].hasMultipleIndexTypes() {
|
||||
s.mu.RUnlock()
|
||||
return ErrMultipleIndexTypes
|
||||
}
|
||||
shards := s.filterShards(byDatabase(database))
|
||||
s.mu.RUnlock()
|
||||
|
||||
|
@ -1218,6 +1256,10 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi
|
|||
}
|
||||
|
||||
s.mu.RLock()
|
||||
if s.databases[database].hasMultipleIndexTypes() {
|
||||
s.mu.RUnlock()
|
||||
return ErrMultipleIndexTypes
|
||||
}
|
||||
sfile := s.sfiles[database]
|
||||
if sfile == nil {
|
||||
s.mu.RUnlock()
|
||||
|
|
Loading…
Reference in New Issue