Reduce contention when adding entries
parent
98f0392ca6
commit
f2b5c7f5be
|
@ -71,6 +71,11 @@ func (e *entry) add(values []Value) error {
|
|||
needSort bool
|
||||
)
|
||||
|
||||
if len(values) == 0 {
|
||||
return nil // Nothing to do.
|
||||
}
|
||||
|
||||
// Are any of the new values out of order?
|
||||
for _, v := range values {
|
||||
if v.UnixNano() <= prevTime {
|
||||
needSort = true
|
||||
|
@ -79,30 +84,42 @@ func (e *entry) add(values []Value) error {
|
|||
prevTime = v.UnixNano()
|
||||
}
|
||||
|
||||
// if there are existing values make sure they're all less than the first of
|
||||
// the new values being added
|
||||
e.mu.Lock()
|
||||
if needSort {
|
||||
e.needSort = needSort
|
||||
}
|
||||
// entry currently has no values, so add the new ones and we're done.
|
||||
if len(e.values) == 0 {
|
||||
e.mu.Lock()
|
||||
// Do the values need sorting?
|
||||
if needSort {
|
||||
e.needSort = needSort
|
||||
}
|
||||
e.values = values
|
||||
} else {
|
||||
// Make sure the new values are the same type as the exiting values
|
||||
et := valueType(e.values[0])
|
||||
for _, v := range values {
|
||||
if et != valueType(v) {
|
||||
e.mu.Unlock()
|
||||
return tsdb.ErrFieldTypeConflict
|
||||
}
|
||||
}
|
||||
l := len(e.values)
|
||||
lastValTime := e.values[l-1].UnixNano()
|
||||
if lastValTime >= values[0].UnixNano() {
|
||||
e.needSort = true
|
||||
}
|
||||
e.values = append(e.values, values...)
|
||||
e.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
e.mu.RLock()
|
||||
// What's the type of the values in the entry?
|
||||
vtype := valueType(e.values[0])
|
||||
// Are the new values occuring after the existing ones?
|
||||
if !needSort && e.values[len(e.values)-1].UnixNano() >= values[0].UnixNano() {
|
||||
needSort = true
|
||||
}
|
||||
e.mu.RUnlock()
|
||||
|
||||
// Make sure the new values are the same type as the exiting values.
|
||||
for _, v := range values {
|
||||
if vtype != valueType(v) {
|
||||
return tsdb.ErrFieldTypeConflict
|
||||
}
|
||||
}
|
||||
|
||||
// Append the new values to the existing ones...
|
||||
|
||||
e.mu.Lock()
|
||||
// Do the values need sorting?
|
||||
if needSort {
|
||||
e.needSort = true
|
||||
}
|
||||
e.values = append(e.values, values...)
|
||||
e.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -741,3 +741,30 @@ func BenchmarkCacheParallelFloatEntries(b *testing.B) {
|
|||
}
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkEntry_add(b *testing.B) {
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
b.StopTimer()
|
||||
values := make([]Value, 10)
|
||||
for i := 0; i < 10; i++ {
|
||||
values[i] = NewValue(int64(i+1), float64(i))
|
||||
}
|
||||
|
||||
otherValues := make([]Value, 10)
|
||||
for i := 0; i < 10; i++ {
|
||||
otherValues[i] = NewValue(1, float64(i))
|
||||
}
|
||||
|
||||
entry, err := newEntryValues(values)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
||||
b.StartTimer()
|
||||
if err := entry.add(otherValues); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue