Sort points after appending if needed

Writing points that were not sorted by time could cause very high
CPU usages and increased latencies because each point inserted would
cause the in-memory cache to be resorted.  The worst case would be
writing a large batch of N points in reverse time order which would
invoke N sorts of the slice.

This patch keeps track of which slices need to be sorted and sorts
them once at the end.  In the previous example, the N sorts becomes
one.  There is still a pathalogical case that would require N/2 sorts.
For example, 10000 points split across 5000 series.  Each series has two
points that are in reverse time order.  This would incur 5000 sorts still.

Fixes #3159
pull/3293/head
Jason Wilder 2015-06-29 10:12:15 -06:00
parent 7b815fe8aa
commit f4f0373579
2 changed files with 18 additions and 2 deletions

View File

@ -16,6 +16,7 @@
- [#3255](https://github.com/influxdb/influxdb/pull/3255): Flush WAL on start-up as soon as possible.
- [#3289](https://github.com/influxdb/influxdb/issues/3289): InfluxDB crashes on floats without decimal
- [#3298](https://github.com/influxdb/influxdb/pull/3298): Corrected WAL & flush parameters in default config. Thanks @jhorwit2
- [#3152](https://github.com/influxdb/influxdb/issues/3159): High CPU Usage with unsorted writes
## v0.9.1 [2015-07-02]

View File

@ -317,6 +317,9 @@ func (s *Shard) WritePoints(points []Point) error {
s.mu.Lock()
defer s.mu.Unlock()
// tracks which in-memory caches need to be resorted
resorts := map[uint8]map[string]struct{}{}
for _, p := range points {
// Generate in-memory cache entry of <timestamp,data>.
key, data := p.Key(), p.Data()
@ -332,9 +335,14 @@ func (s *Shard) WritePoints(points []Point) error {
// Append to cache list.
a = append(a, v)
// Sort by timestamp if not appending.
// If not appending, keep track of cache lists that need to be resorted.
if !appending {
sort.Sort(byteSlices(a))
series := resorts[partitionID]
if series == nil {
series = map[string]struct{}{}
resorts[partitionID] = series
}
series[string(key)] = struct{}{}
}
s.cache[partitionID][string(key)] = a
@ -343,6 +351,13 @@ func (s *Shard) WritePoints(points []Point) error {
s.walSize += len(key) + len(v)
}
// Sort by timestamp if not appending.
for partitionID, cache := range resorts {
for key, _ := range cache {
sort.Sort(byteSlices(s.cache[partitionID][key]))
}
}
// Check for flush threshold.
s.triggerAutoFlush()