From d78ca1a0f37e2a676a3f1d0a9911286e1aa8eaa7 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Wed, 16 Nov 2016 16:46:22 +0000 Subject: [PATCH] Fix some races --- tsdb/engine/tsm1/cache.go | 22 +++++++++++++++++----- tsdb/engine/tsm1/engine.go | 4 ++-- tsdb/engine/tsm1/ring.go | 24 +++++++++++++++++++++--- 3 files changed, 40 insertions(+), 10 deletions(-) diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go index 2b777d0ea9..e30df74c8a 100644 --- a/tsdb/engine/tsm1/cache.go +++ b/tsdb/engine/tsm1/cache.go @@ -99,8 +99,10 @@ func (e *entry) add(values []Value) error { } // entry currently has no values, so add the new ones and we're done. - // TODO(edd): I think this branch is unreachable. Need to verify. - if len(e.values) == 0 { + e.mu.RLock() + empty := len(e.values) == 0 + e.mu.RUnlock() + if empty { e.mu.Lock() // Do the values need sorting? if needSort { @@ -121,7 +123,7 @@ func (e *entry) add(values []Value) error { e.mu.RLock() // What's the type of the values in the entry? vtype := valueType(e.values[0]) - // Are the new values occuring after the existing ones? + // Are the new values occurring after the existing ones? if !needSort && e.values[len(e.values)-1].UnixNano() >= values[0].UnixNano() { needSort = true } @@ -383,8 +385,11 @@ func (c *Cache) Snapshot() (*Cache, error) { } } - // Append the current cache values to the snapshot. - if err := c.store.apply(func(k string, e *entry) error { + // Append the current cache values to the snapshot. Because we're accessing + // the Cache we need to call f on each partition in serial. 
+ if err := c.store.applySerial(func(k string, e *entry) error { + e.mu.RLock() + defer e.mu.RUnlock() snapshotEntry, ok := c.snapshot.store.entry(k) if ok { if err := snapshotEntry.add(e.values); err != nil { @@ -615,6 +620,13 @@ func (c *Cache) ApplyEntryFn(f func(key string, entry *entry) error) error { return store.apply(f) } +func (c *Cache) ApplySerialEntryFn(f func(key string, entry *entry) error) error { + c.mu.RLock() + store := c.store + c.mu.RUnlock() + return store.applySerial(f) +} + // CacheLoader processes a set of WAL segment files, and loads a cache with the data // contained within those files. Processing of the supplied files take place in the // order they exist in the files slice. diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 17ec71b911..2cd78e07d7 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -784,8 +784,8 @@ func (e *Engine) DeleteSeriesRange(seriesKeys []string, min, max int64) error { // find the keys in the cache and remove them walKeys := deleteKeys[:0] - // ApplyEntryFn cannot return an error in this invocation. - _ = e.Cache.ApplyEntryFn(func(k string, _ *entry) error { + // ApplySerialEntryFn cannot return an error in this invocation. + _ = e.Cache.ApplySerialEntryFn(func(k string, _ *entry) error { seriesKey, _ := SeriesAndFieldFromCompositeKey([]byte(k)) if _, ok := keyMap[string(seriesKey)]; ok { walKeys = append(walKeys, k) diff --git a/tsdb/engine/tsm1/ring.go b/tsdb/engine/tsm1/ring.go index ef39c2494b..385bb8322f 100644 --- a/tsdb/engine/tsm1/ring.go +++ b/tsdb/engine/tsm1/ring.go @@ -137,9 +137,10 @@ func (r *ring) keys(sorted bool) []string { } // apply applies the provided function to every entry in the ring under a read -// lock. The provided function will be called with each key and the -// corresponding entry. The first error encountered will be returned, if any. -// apply is safe for use by multiple goroutines. 
+// lock using a separate goroutine for each partition. The provided function +// will be called with each key and the corresponding entry. The first error +// encountered will be returned, if any. apply is safe for use by multiple +// goroutines. func (r *ring) apply(f func(string, *entry) error) error { var ( @@ -179,6 +180,23 @@ func (r *ring) apply(f func(string, *entry) error) error { return nil } +// applySerial is similar to apply, but invokes f on each partition in the same +// goroutine. +// applySerial is safe for use by multiple goroutines. +func (r *ring) applySerial(f func(string, *entry) error) error { + for _, p := range r.partitions { + p.mu.RLock() + for k, e := range p.store { + if err := f(k, e); err != nil { + p.mu.RUnlock() + return err + } + } + p.mu.RUnlock() + } + return nil +} + // partition provides safe access to a map of series keys to entries. type partition struct { mu sync.RWMutex