Merge pull request #11970 from influxdata/jmw-shard-epoch-races

Fix some more shard epoch races
pull/12745/head
Jeff Wendling 2019-02-19 10:40:09 -07:00 committed by GitHub
commit 8e56b3ff1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 14 additions and 2 deletions

View File

@ -493,6 +493,16 @@ func (s *Store) Close() error {
return nil
}
// epochsForShards returns a copy of the epoch trackers only including what is necessary
// for the provided shards. Must be called under the lock.
func (s *Store) epochsForShards(shards []*Shard) map[uint64]*epochTracker {
out := make(map[uint64]*epochTracker)
for _, sh := range shards {
out[sh.id] = s.epochs[sh.id]
}
return out
}
// openSeriesFile either returns or creates a series file for the provided
// database. It must be called under a full lock.
func (s *Store) openSeriesFile(database string) (*SeriesFile, error) {
@ -917,6 +927,7 @@ func (s *Store) DeleteMeasurement(database, name string) error {
return ErrMultipleIndexTypes
}
shards := s.filterShards(byDatabase(database))
epochs := s.epochsForShards(shards)
s.mu.RUnlock()
// Limit to 1 delete for each shard since expanding the measurement into the list
@ -929,7 +940,7 @@ func (s *Store) DeleteMeasurement(database, name string) error {
// install our guard and wait for any prior deletes to finish. the
// guard ensures future deletes that could conflict wait for us.
guard := newGuard(influxql.MinTime, influxql.MaxTime, []string{name}, nil)
waiter := s.epochs[sh.id].WaitDelete(guard)
waiter := epochs[sh.id].WaitDelete(guard)
waiter.Wait()
defer waiter.Done()
@ -1289,6 +1300,7 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi
return nil
}
shards := s.filterShards(byDatabase(database))
epochs := s.epochsForShards(shards)
s.mu.RUnlock()
// Limit to 1 delete for each shard since expanding the measurement into the list
@ -1318,7 +1330,7 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi
// install our guard and wait for any prior deletes to finish. the
// guard ensures future deletes that could conflict wait for us.
waiter := s.epochs[sh.id].WaitDelete(newGuard(min, max, names, condition))
waiter := epochs[sh.id].WaitDelete(newGuard(min, max, names, condition))
waiter.Wait()
defer waiter.Done()