Further optimisations and a race fix
parent
05ec6ad9ad
commit
ec27c57127
|
@ -34,6 +34,10 @@ type entry struct {
|
|||
mu sync.RWMutex
|
||||
values Values // All stored values.
|
||||
needSort bool // true if the values are out of order and require deduping.
|
||||
|
||||
// The type of values stored. Read only so doesn't need to be protected by
|
||||
// mu.
|
||||
vtype int
|
||||
}
|
||||
|
||||
// newEntryValues returns a new instance of entry with the given values. If the
|
||||
|
@ -74,6 +78,9 @@ func newEntryValues(values []Value, hint int) (*entry, error) {
|
|||
}
|
||||
}
|
||||
|
||||
// Set the type of values stored.
|
||||
e.vtype = et
|
||||
|
||||
return e, nil
|
||||
}
|
||||
|
||||
|
@ -89,8 +96,12 @@ func (e *entry) add(values []Value) error {
|
|||
return nil // Nothing to do.
|
||||
}
|
||||
|
||||
// Are any of the new values out of order?
|
||||
// Are any of the new values out of order or the wrong type?
|
||||
for _, v := range values {
|
||||
if e.vtype != valueType(v) {
|
||||
return tsdb.ErrFieldTypeConflict
|
||||
}
|
||||
|
||||
if v.UnixNano() <= prevTime {
|
||||
needSort = true
|
||||
break
|
||||
|
@ -99,11 +110,8 @@ func (e *entry) add(values []Value) error {
|
|||
}
|
||||
|
||||
// entry currently has no values, so add the new ones and we're done.
|
||||
e.mu.RLock()
|
||||
empty := len(e.values) == 0
|
||||
e.mu.RUnlock()
|
||||
if empty {
|
||||
e.mu.Lock()
|
||||
e.mu.Lock()
|
||||
if len(e.values) == 0 {
|
||||
// Do the values need sorting?
|
||||
if needSort {
|
||||
e.needSort = needSort
|
||||
|
@ -120,25 +128,13 @@ func (e *entry) add(values []Value) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
e.mu.RLock()
|
||||
// What's the type of the values in the entry?
|
||||
vtype := valueType(e.values[0])
|
||||
// Are the new values occurring after the existing ones?
|
||||
if !needSort && e.values[len(e.values)-1].UnixNano() >= values[0].UnixNano() {
|
||||
// The new values occurring after the existing ones?
|
||||
needSort = true
|
||||
}
|
||||
e.mu.RUnlock()
|
||||
|
||||
// Make sure the new values are the same type as the exiting values.
|
||||
for _, v := range values {
|
||||
if vtype != valueType(v) {
|
||||
return tsdb.ErrFieldTypeConflict
|
||||
}
|
||||
}
|
||||
|
||||
// Append the new values to the existing ones...
|
||||
|
||||
e.mu.Lock()
|
||||
// Do the values need sorting?
|
||||
if needSort {
|
||||
e.needSort = true
|
||||
|
@ -410,12 +406,7 @@ func (c *Cache) Snapshot() (*Cache, error) {
|
|||
|
||||
snapshotSize := c.Size() // record the number of bytes written into a snapshot
|
||||
|
||||
var err error
|
||||
if c.store, err = newring(ringShards); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Reset the cache store.
|
||||
// Reset the cache's store.
|
||||
c.store.reset()
|
||||
atomic.StoreUint64(&c.size, 0)
|
||||
c.lastSnapshot = time.Now()
|
||||
|
@ -450,7 +441,12 @@ func (c *Cache) ClearSnapshot(success bool) {
|
|||
if success {
|
||||
c.snapshotAttempts = 0
|
||||
c.snapshotSize = 0
|
||||
c.snapshot = nil
|
||||
|
||||
// Reset the snapshot's store, and reset the snapshot to a fresh Cache.
|
||||
c.snapshot.store.reset()
|
||||
c.snapshot = &Cache{
|
||||
store: c.snapshot.store,
|
||||
}
|
||||
|
||||
c.updateSnapshots()
|
||||
}
|
||||
|
|
|
@ -86,6 +86,7 @@ func (r *ring) reset() {
|
|||
for _, partition := range r.partitions {
|
||||
partition.reset()
|
||||
}
|
||||
r.keysHint = 0
|
||||
}
|
||||
|
||||
// getPartition retrieves the hash ring partition associated with the provided
|
||||
|
@ -230,19 +231,22 @@ func (p *partition) write(key string, values Values) error {
|
|||
return e.add(values)
|
||||
}
|
||||
|
||||
// Create a new entry using a preallocated size if we have a hint available.
|
||||
p.mu.RLock()
|
||||
hint, _ := p.entrySizeHints[xxhash.Sum64([]byte(key))]
|
||||
p.mu.RUnlock()
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
// Check again.
|
||||
if e, ok = p.store[key]; ok {
|
||||
return e.add(values)
|
||||
}
|
||||
|
||||
// Create a new entry using a preallocated size if we have a hint available.
|
||||
hint, _ := p.entrySizeHints[xxhash.Sum64([]byte(key))]
|
||||
e, err := newEntryValues(values, hint)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
p.mu.Lock()
|
||||
p.store[key] = e
|
||||
p.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -265,7 +269,7 @@ func (p *partition) remove(key string) {
|
|||
func (p *partition) keys() []string {
|
||||
p.mu.RLock()
|
||||
keys := make([]string, 0, len(p.store))
|
||||
for k, _ := range p.store {
|
||||
for k := range p.store {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
p.mu.RUnlock()
|
||||
|
|
Loading…
Reference in New Issue