Reduce lock contention when deleting high cardinality series
Deleting high cardinality series could take a very long time, cause write timeouts as well as dead lock the process. This fixes these issue to by changing the approach for cleaning up the indexes and reducing lock contention. The prior approach delete each series and updated every index (inmem) during the delete. This was very slow and cause the index to be locked while it items in a slice were removed one by one. This has been changed to mark series as deleted and then rebuild the index asynchronously which speeds up the process. There was also a dead lock that could occur when deleing the field set. Deleting the field set held a write lock and the function it invoked under the lock could try to take a read lock on the field set. This would then deadlock. This approach was also very slow and caused time out for writes. It now uses faster approach that checks for the existing of the measurment in the cache and filestore which does not take write locks.pull/8801/head
parent
e18425757d
commit
a8d9eeef36
|
@ -1156,7 +1156,7 @@ func scanFieldValue(buf []byte, i int) (int, []byte) {
|
|||
return i, buf[start:i]
|
||||
}
|
||||
|
||||
func escapeMeasurement(in []byte) []byte {
|
||||
func EscapeMeasurement(in []byte) []byte {
|
||||
for b, esc := range measurementEscapeCodes {
|
||||
in = bytes.Replace(in, []byte{b}, esc, -1)
|
||||
}
|
||||
|
@ -1498,7 +1498,7 @@ func parseTags(buf []byte) Tags {
|
|||
func MakeKey(name []byte, tags Tags) []byte {
|
||||
// unescape the name and then re-escape it to avoid double escaping.
|
||||
// The key should always be stored in escaped form.
|
||||
return append(escapeMeasurement(unescapeMeasurement(name)), tags.HashKey()...)
|
||||
return append(EscapeMeasurement(unescapeMeasurement(name)), tags.HashKey()...)
|
||||
}
|
||||
|
||||
// SetTags replaces the tags for the point.
|
||||
|
|
|
@ -707,6 +707,11 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
|
|||
if size <= 0 {
|
||||
size = tsdb.DefaultMaxPointsPerBlock
|
||||
}
|
||||
|
||||
c.mu.RLock()
|
||||
intC := c.compactionsInterrupt
|
||||
c.mu.RUnlock()
|
||||
|
||||
// The new compacted files need to added to the max generation in the
|
||||
// set. We need to find that max generation as well as the max sequence
|
||||
// number to ensure we write to the next unique location.
|
||||
|
@ -730,6 +735,12 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
|
|||
// For each TSM file, create a TSM reader
|
||||
var trs []*TSMReader
|
||||
for _, file := range tsmFiles {
|
||||
select {
|
||||
case <-intC:
|
||||
return nil, errCompactionAborted
|
||||
default:
|
||||
}
|
||||
|
||||
f, err := os.Open(file)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -747,10 +758,6 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
c.mu.RLock()
|
||||
intC := c.compactionsInterrupt
|
||||
c.mu.RUnlock()
|
||||
|
||||
tsm, err := NewTSMKeyIterator(size, fast, intC, trs...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -1008,6 +1008,7 @@ func (e *Engine) DeleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
|
|||
}
|
||||
}
|
||||
}
|
||||
go e.index.Rebuild()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -1019,13 +1020,38 @@ func (e *Engine) DeleteMeasurement(name []byte) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// Under lock, delete any series created deletion.
|
||||
// A sentinel error message to cause DeleteWithLock to not delete the measurement
|
||||
abortErr := fmt.Errorf("measurements still exist")
|
||||
|
||||
// Under write lock, delete the measurement if we no longer have any data stored for
|
||||
// the measurement. If data exists, we can't delete the field set yet as there
|
||||
// were writes to the measurement while we are deleting it.
|
||||
if err := e.fieldset.DeleteWithLock(string(name), func() error {
|
||||
return e.deleteMeasurement(name)
|
||||
encodedName := models.EscapeMeasurement(name)
|
||||
|
||||
// First scan the cache to see if any series exists for this measurement.
|
||||
if err := e.Cache.ApplyEntryFn(func(k []byte, _ *entry) error {
|
||||
if bytes.HasPrefix(k, encodedName) {
|
||||
return abortErr
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Check the filestore.
|
||||
return e.FileStore.WalkKeys(func(k []byte, typ byte) error {
|
||||
if bytes.HasPrefix(k, encodedName) {
|
||||
return abortErr
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
}); err != nil && err != abortErr {
|
||||
// Something else failed, return it
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -58,6 +58,8 @@ type Index interface {
|
|||
RemoveShard(shardID uint64)
|
||||
|
||||
Type() string
|
||||
|
||||
Rebuild()
|
||||
}
|
||||
|
||||
// IndexFormat represents the format for an index.
|
||||
|
|
|
@ -56,6 +56,9 @@ type Index struct {
|
|||
|
||||
seriesSketch, seriesTSSketch *hll.Plus
|
||||
measurementsSketch, measurementsTSSketch *hll.Plus
|
||||
|
||||
// Mutex to control rebuilds of the index
|
||||
rebuildQueue sync.Mutex
|
||||
}
|
||||
|
||||
// NewIndex returns a new initialized Index.
|
||||
|
@ -606,6 +609,9 @@ func (i *Index) DropSeries(key []byte) error {
|
|||
// Remove the measurement's reference.
|
||||
series.Measurement().DropSeries(series)
|
||||
|
||||
// Mark the series as deleted.
|
||||
series.Delete()
|
||||
|
||||
// If the measurement no longer has any series, remove it as well.
|
||||
if !series.Measurement().HasSeries() {
|
||||
i.dropMeasurement(series.Measurement().Name)
|
||||
|
@ -654,13 +660,12 @@ func (i *Index) SetFieldName(measurement []byte, name string) {
|
|||
// ForEachMeasurementName iterates over each measurement name.
|
||||
func (i *Index) ForEachMeasurementName(fn func(name []byte) error) error {
|
||||
i.mu.RLock()
|
||||
defer i.mu.RUnlock()
|
||||
|
||||
mms := make(Measurements, 0, len(i.measurements))
|
||||
for _, m := range i.measurements {
|
||||
mms = append(mms, m)
|
||||
}
|
||||
sort.Sort(mms)
|
||||
i.mu.RUnlock()
|
||||
|
||||
for _, m := range mms {
|
||||
if err := fn([]byte(m.Name)); err != nil {
|
||||
|
@ -752,6 +757,30 @@ func (i *Index) UnassignShard(k string, shardID uint64) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Rebuild recreates the measurement indexes to allow deleted series to be removed
|
||||
// and garbage collected.
|
||||
func (i *Index) Rebuild() {
|
||||
// Only allow one rebuild at a time. This will cause all subsequent rebuilds
|
||||
// to queue. The measurement rebuild is idempotent and will not be rebuilt if
|
||||
// it does not need to be.
|
||||
i.rebuildQueue.Lock()
|
||||
defer i.rebuildQueue.Unlock()
|
||||
|
||||
i.ForEachMeasurementName(func(name []byte) error {
|
||||
// Measurement never returns an error
|
||||
m, _ := i.Measurement(name)
|
||||
if m == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
nm := m.Rebuild()
|
||||
i.mu.Lock()
|
||||
i.measurements[string(name)] = nm
|
||||
i.mu.Unlock()
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// RemoveShard removes all references to shardID from any series or measurements
|
||||
// in the index. If the shard was the only owner of data for the series, the series
|
||||
// is removed from the index.
|
||||
|
|
|
@ -31,6 +31,10 @@ type Measurement struct {
|
|||
|
||||
// lazyily created sorted series IDs
|
||||
sortedSeriesIDs SeriesIDs // sorted list of series IDs in this measurement
|
||||
|
||||
// Indicates whether the seriesByTagKeyValueMap needs to be rebuilt as it contains deleted series
|
||||
// that waste memory.
|
||||
dirty bool
|
||||
}
|
||||
|
||||
// NewMeasurement allocates and initializes a new Measurement.
|
||||
|
@ -83,7 +87,7 @@ func (m *Measurement) AppendSeriesKeysByID(dst []string, ids []uint64) []string
|
|||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
for _, id := range ids {
|
||||
if s := m.seriesByID[id]; s != nil {
|
||||
if s := m.seriesByID[id]; s != nil && !s.Deleted() {
|
||||
dst = append(dst, s.Key)
|
||||
}
|
||||
}
|
||||
|
@ -97,7 +101,7 @@ func (m *Measurement) SeriesKeysByID(ids SeriesIDs) [][]byte {
|
|||
keys := make([][]byte, 0, len(ids))
|
||||
for _, id := range ids {
|
||||
s := m.seriesByID[id]
|
||||
if s == nil {
|
||||
if s == nil || s.Deleted() {
|
||||
continue
|
||||
}
|
||||
keys = append(keys, []byte(s.Key))
|
||||
|
@ -111,6 +115,9 @@ func (m *Measurement) SeriesKeys() [][]byte {
|
|||
defer m.mu.RUnlock()
|
||||
keys := make([][]byte, 0, len(m.seriesByID))
|
||||
for _, s := range m.seriesByID {
|
||||
if s.Deleted() {
|
||||
continue
|
||||
}
|
||||
keys = append(keys, []byte(s.Key))
|
||||
}
|
||||
return keys
|
||||
|
@ -137,7 +144,10 @@ func (m *Measurement) SeriesIDs() SeriesIDs {
|
|||
m.sortedSeriesIDs = make(SeriesIDs, 0, len(m.seriesByID))
|
||||
}
|
||||
|
||||
for k := range m.seriesByID {
|
||||
for k, v := range m.seriesByID {
|
||||
if v.Deleted() {
|
||||
continue
|
||||
}
|
||||
m.sortedSeriesIDs = append(m.sortedSeriesIDs, k)
|
||||
}
|
||||
sort.Sort(m.sortedSeriesIDs)
|
||||
|
@ -253,25 +263,32 @@ func (m *Measurement) DropSeries(series *Series) {
|
|||
// clear our lazily sorted set of ids
|
||||
m.sortedSeriesIDs = m.sortedSeriesIDs[:0]
|
||||
|
||||
// remove this series id from the tag index on the measurement
|
||||
// s.seriesByTagKeyValue is defined as map[string]map[string]SeriesIDs
|
||||
series.ForEachTag(func(t models.Tag) {
|
||||
values := m.seriesByTagKeyValue[string(t.Key)][string(t.Value)]
|
||||
ids := filter(values, seriesID)
|
||||
// Check to see if we have any ids, if not, remove the key
|
||||
if len(ids) == 0 {
|
||||
delete(m.seriesByTagKeyValue[string(t.Key)], string(t.Value))
|
||||
} else {
|
||||
m.seriesByTagKeyValue[string(t.Key)][string(t.Value)] = ids
|
||||
// Mark that this measurements tagValue map has stale entries that need to be rebuilt.
|
||||
m.dirty = true
|
||||
}
|
||||
|
||||
// If we have no values, then we delete the key
|
||||
if len(m.seriesByTagKeyValue[string(t.Key)]) == 0 {
|
||||
delete(m.seriesByTagKeyValue, string(t.Key))
|
||||
}
|
||||
})
|
||||
func (m *Measurement) Rebuild() *Measurement {
|
||||
m.mu.RLock()
|
||||
|
||||
return
|
||||
// Nothing needs to be rebuilt.
|
||||
if !m.dirty {
|
||||
m.mu.RUnlock()
|
||||
return m
|
||||
}
|
||||
|
||||
// Create a new measurement from the state of the existing measurement
|
||||
nm := NewMeasurement(m.database, string(m.name))
|
||||
nm.fieldNames = m.fieldNames
|
||||
m.mu.RUnlock()
|
||||
|
||||
// Re-add each series to allow the measurement indexes to get re-created. If there were
|
||||
// deletes, the existing measurment may have references to deleted series that need to be
|
||||
// expunged. Note: we're using SeriesIDs which returns the series in sorted order so that
|
||||
// re-adding does not incur a sort for each series added.
|
||||
for _, id := range m.SeriesIDs() {
|
||||
nm.AddSeries(m.SeriesByID(id))
|
||||
}
|
||||
return nm
|
||||
}
|
||||
|
||||
// filters walks the where clause of a select statement and returns a map with all series ids
|
||||
|
@ -345,7 +362,7 @@ func (m *Measurement) TagSets(shardID uint64, opt query.IteratorOptions) ([]*que
|
|||
}
|
||||
|
||||
s := m.seriesByID[id]
|
||||
if !s.Assigned(shardID) {
|
||||
if s == nil || s.Deleted() || !s.Assigned(shardID) {
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -1112,6 +1129,7 @@ type Series struct {
|
|||
ID uint64
|
||||
measurement *Measurement
|
||||
shardIDs map[uint64]struct{} // shards that have this series defined
|
||||
deleted bool
|
||||
}
|
||||
|
||||
// NewSeries returns an initialized series struct
|
||||
|
@ -1195,6 +1213,21 @@ func (s *Series) GetTagString(key string) string {
|
|||
return s.tags.GetString(key)
|
||||
}
|
||||
|
||||
// Delete marks this series as deleted. A deleted series should not be returned for queries.
|
||||
func (s *Series) Delete() {
|
||||
s.mu.Lock()
|
||||
s.deleted = true
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
// Deleted indicates if this was previously deleted.
|
||||
func (s *Series) Deleted() bool {
|
||||
s.mu.RLock()
|
||||
v := s.deleted
|
||||
s.mu.RUnlock()
|
||||
return v
|
||||
}
|
||||
|
||||
// SeriesIDs is a convenience type for sorting, checking equality, and doing
|
||||
// union and intersection of collections of series ids.
|
||||
type SeriesIDs []uint64
|
||||
|
|
|
@ -972,6 +972,8 @@ func (i *Index) compactToLevel(files []*IndexFile, level int) {
|
|||
}
|
||||
}
|
||||
|
||||
func (i *Index) Rebuild() {}
|
||||
|
||||
func (i *Index) CheckLogFile() error {
|
||||
// Check log file size under read lock.
|
||||
if size := func() int64 {
|
||||
|
|
Loading…
Reference in New Issue