Fix some races

pull/7633/head
Edd Robinson 2016-11-16 16:46:22 +00:00
parent d2923c7bf9
commit d78ca1a0f3
3 changed files with 40 additions and 10 deletions

View File

@ -99,8 +99,10 @@ func (e *entry) add(values []Value) error {
}
// entry currently has no values, so add the new ones and we're done.
// TODO(edd): I think this branch is unreachable. Need to verify.
if len(e.values) == 0 {
e.mu.RLock()
empty := len(e.values) == 0
e.mu.RUnlock()
if empty {
e.mu.Lock()
// Do the values need sorting?
if needSort {
@ -121,7 +123,7 @@ func (e *entry) add(values []Value) error {
e.mu.RLock()
// What's the type of the values in the entry?
vtype := valueType(e.values[0])
// Are the new values occuring after the existing ones?
// Are the new values occurring after the existing ones?
if !needSort && e.values[len(e.values)-1].UnixNano() >= values[0].UnixNano() {
needSort = true
}
@ -383,8 +385,11 @@ func (c *Cache) Snapshot() (*Cache, error) {
}
}
// Append the current cache values to the snapshot.
if err := c.store.apply(func(k string, e *entry) error {
// Append the current cache values to the snapshot. Because we're accessing
// the Cache we need to call f on each partition in serial.
if err := c.store.applySerial(func(k string, e *entry) error {
e.mu.RLock()
defer e.mu.RUnlock()
snapshotEntry, ok := c.snapshot.store.entry(k)
if ok {
if err := snapshotEntry.add(e.values); err != nil {
@ -615,6 +620,13 @@ func (c *Cache) ApplyEntryFn(f func(key string, entry *entry) error) error {
return store.apply(f)
}
func (c *Cache) ApplySerialEntryFn(f func(key string, entry *entry) error) error {
c.mu.RLock()
store := c.store
c.mu.RUnlock()
return store.applySerial(f)
}
// CacheLoader processes a set of WAL segment files, and loads a cache with the data
// contained within those files. Processing of the supplied files take place in the
// order they exist in the files slice.

View File

@ -784,8 +784,8 @@ func (e *Engine) DeleteSeriesRange(seriesKeys []string, min, max int64) error {
// find the keys in the cache and remove them
walKeys := deleteKeys[:0]
// ApplyEntryFn cannot return an error in this invocation.
_ = e.Cache.ApplyEntryFn(func(k string, _ *entry) error {
// ApplySerialEntryFn cannot return an error in this invocation.
_ = e.Cache.ApplySerialEntryFn(func(k string, _ *entry) error {
seriesKey, _ := SeriesAndFieldFromCompositeKey([]byte(k))
if _, ok := keyMap[string(seriesKey)]; ok {
walKeys = append(walKeys, k)

View File

@ -137,9 +137,10 @@ func (r *ring) keys(sorted bool) []string {
}
// apply applies the provided function to every entry in the ring under a read
// lock. The provided function will be called with each key and the
// corresponding entry. The first error encountered will be returned, if any.
// apply is safe for use by multiple goroutines.
// lock using a separate goroutine for each partition. The provided function
// will be called with each key and the corresponding entry. The first error
// encountered will be returned, if any. apply is safe for use by multiple
// goroutines.
func (r *ring) apply(f func(string, *entry) error) error {
var (
@ -179,6 +180,23 @@ func (r *ring) apply(f func(string, *entry) error) error {
return nil
}
// applySerial is similar to apply, but invokes f on each partition in the same
// goroutine.
// apply is safe for use by multiple goroutines.
func (r *ring) applySerial(f func(string, *entry) error) error {
for _, p := range r.partitions {
p.mu.RLock()
for k, e := range p.store {
if err := f(k, e); err != nil {
p.mu.RUnlock()
return err
}
}
p.mu.RUnlock()
}
return nil
}
// partition provides safe access to a map of series keys to entries.
type partition struct {
mu sync.RWMutex