From ec27c571275590a8d3c1c9d8856d6d33c842d863 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Fri, 9 Dec 2016 17:18:48 +0000 Subject: [PATCH] Further optimisations and a race fix --- tsdb/engine/tsm1/cache.go | 48 ++++++++++++++++++--------------------- tsdb/engine/tsm1/ring.go | 18 +++++++++------ 2 files changed, 33 insertions(+), 33 deletions(-) diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go index e918b20780..6faffb795f 100644 --- a/tsdb/engine/tsm1/cache.go +++ b/tsdb/engine/tsm1/cache.go @@ -34,6 +34,10 @@ type entry struct { mu sync.RWMutex values Values // All stored values. needSort bool // true if the values are out of order and require deduping. + + // The type of values stored. Read only so doesn't need to be protected by + // mu. + vtype int } // newEntryValues returns a new instance of entry with the given values. If the @@ -74,6 +78,9 @@ func newEntryValues(values []Value, hint int) (*entry, error) { } } + // Set the type of values stored. + e.vtype = et + return e, nil } @@ -89,8 +96,12 @@ func (e *entry) add(values []Value) error { return nil // Nothing to do. } - // Are any of the new values out of order? + // Are any of the new values out of order or the wrong type? for _, v := range values { + if e.vtype != valueType(v) { + return tsdb.ErrFieldTypeConflict + } + if v.UnixNano() <= prevTime { needSort = true break @@ -99,11 +110,8 @@ func (e *entry) add(values []Value) error { } // entry currently has no values, so add the new ones and we're done. - e.mu.RLock() - empty := len(e.values) == 0 - e.mu.RUnlock() - if empty { - e.mu.Lock() + e.mu.Lock() + if len(e.values) == 0 { // Do the values need sorting? if needSort { e.needSort = needSort @@ -120,25 +128,13 @@ func (e *entry) add(values []Value) error { return nil } - e.mu.RLock() - // What's the type of the values in the entry? - vtype := valueType(e.values[0]) - // Are the new values occurring after the existing ones? if !needSort && e.values[len(e.values)-1].UnixNano() >= values[0].UnixNano() { + // The new values occurring after the existing ones? needSort = true } - e.mu.RUnlock() - - // Make sure the new values are the same type as the exiting values. - for _, v := range values { - if vtype != valueType(v) { - return tsdb.ErrFieldTypeConflict - } - } // Append the new values to the existing ones... - e.mu.Lock() // Do the values need sorting? if needSort { e.needSort = true @@ -410,12 +406,7 @@ func (c *Cache) Snapshot() (*Cache, error) { snapshotSize := c.Size() // record the number of bytes written into a snapshot - var err error - if c.store, err = newring(ringShards); err != nil { - return nil, err - } - - // Reset the cache store. + // Reset the cache's store. c.store.reset() atomic.StoreUint64(&c.size, 0) c.lastSnapshot = time.Now() @@ -450,7 +441,12 @@ func (c *Cache) ClearSnapshot(success bool) { if success { c.snapshotAttempts = 0 c.snapshotSize = 0 - c.snapshot = nil + + // Reset the snapshot's store, and reset the snapshot to a fresh Cache. + c.snapshot.store.reset() + c.snapshot = &Cache{ + store: c.snapshot.store, + } c.updateSnapshots() } diff --git a/tsdb/engine/tsm1/ring.go b/tsdb/engine/tsm1/ring.go index 385bb8322f..5b8b833cf5 100644 --- a/tsdb/engine/tsm1/ring.go +++ b/tsdb/engine/tsm1/ring.go @@ -86,6 +86,7 @@ func (r *ring) reset() { for _, partition := range r.partitions { partition.reset() } + r.keysHint = 0 } // getPartition retrieves the hash ring partition associated with the provided @@ -230,19 +231,22 @@ func (p *partition) write(key string, values Values) error { return e.add(values) } - // Create a new entry using a preallocated size if we have a hint available. - p.mu.RLock() - hint, _ := p.entrySizeHints[xxhash.Sum64([]byte(key))] - p.mu.RUnlock() + p.mu.Lock() + defer p.mu.Unlock() + // Check again. + if e, ok = p.store[key]; ok { + return e.add(values) + } + + // Create a new entry using a preallocated size if we have a hint available. + hint, _ := p.entrySizeHints[xxhash.Sum64([]byte(key))] e, err := newEntryValues(values, hint) if err != nil { return err } - p.mu.Lock() p.store[key] = e - p.mu.Unlock() return nil } @@ -265,7 +269,7 @@ func (p *partition) remove(key string) { func (p *partition) keys() []string { p.mu.RLock() keys := make([]string, 0, len(p.store)) - for k, _ := range p.store { + for k := range p.store { keys = append(keys, k) } p.mu.RUnlock()