From f4f0373579e73c4816db84e121f66277c7caee40 Mon Sep 17 00:00:00 2001
From: Jason Wilder
Date: Mon, 29 Jun 2015 10:12:15 -0600
Subject: [PATCH] Sort points after appending if needed

Writing points that were not sorted by time could cause very high CPU
usage and increased latencies, because each out-of-order point inserted
forced the in-memory cache to be resorted. The worst case is writing a
large batch of N points in reverse time order, which invokes N sorts of
the slice.

This patch keeps track of which slices need to be sorted and sorts them
once at the end, so the N sorts in the previous example become one.

There is still a pathological case that requires N/2 sorts: for example,
10,000 points split across 5,000 series, where each series has two
points in reverse time order, would still incur 5,000 sorts.

Fixes #3159
---
 CHANGELOG.md  |  1 +
 tsdb/shard.go | 19 +++++++++++++++++--
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9eab4b7007..7db2d2f986 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,7 @@
 - [#3255](https://github.com/influxdb/influxdb/pull/3255): Flush WAL on start-up as soon as possible.
 - [#3289](https://github.com/influxdb/influxdb/issues/3289): InfluxDB crashes on floats without decimal
 - [#3298](https://github.com/influxdb/influxdb/pull/3298): Corrected WAL & flush parameters in default config. Thanks @jhorwit2
+- [#3159](https://github.com/influxdb/influxdb/issues/3159): High CPU usage with unsorted writes
 
 ## v0.9.1 [2015-07-02]
 
diff --git a/tsdb/shard.go b/tsdb/shard.go
index 4d0204e160..c3bc48ca86 100644
--- a/tsdb/shard.go
+++ b/tsdb/shard.go
@@ -317,6 +317,9 @@ func (s *Shard) WritePoints(points []Point) error {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
+	// Track which in-memory cache lists need to be resorted.
+	resorts := map[uint8]map[string]struct{}{}
+
 	for _, p := range points {
 		// Generate in-memory cache entry of <timestamp,data>.
 		key, data := p.Key(), p.Data()
@@ -332,9 +335,14 @@ func (s *Shard) WritePoints(points []Point) error {
 		// Append to cache list.
 		a = append(a, v)
 
-		// Sort by timestamp if not appending.
+		// If not appending, keep track of cache lists that need to be resorted.
 		if !appending {
-			sort.Sort(byteSlices(a))
+			series := resorts[partitionID]
+			if series == nil {
+				series = map[string]struct{}{}
+				resorts[partitionID] = series
+			}
+			series[string(key)] = struct{}{}
 		}
 
 		s.cache[partitionID][string(key)] = a
@@ -343,6 +351,13 @@ func (s *Shard) WritePoints(points []Point) error {
 		s.walSize += len(key) + len(v)
 	}
 
+	// Resort any cache lists that saw out-of-order appends, once per list.
+	for partitionID, cache := range resorts {
+		for key := range cache {
+			sort.Sort(byteSlices(s.cache[partitionID][key]))
+		}
+	}
+
 	// Check for flush threshold.
 	s.triggerAutoFlush()
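
Note: the sketch below is not part of the patch; it is a minimal, standalone illustration of the deferred-resort pattern the change introduces, using hypothetical point and writeBatch names rather than the shard's real types. Out-of-order appends only mark the affected key, and each marked cache list is sorted exactly once after the whole batch has been processed.

package main

import (
	"fmt"
	"sort"
)

// point stands in for the timestamp-prefixed cache entries the shard keeps
// per series key; only the ordering logic matters for this sketch.
type point struct {
	ts    int64
	value string
}

// writeBatch appends points to a per-key cache and defers sorting: instead of
// resorting a cache list on every out-of-order append, it records which keys
// went out of order and sorts each of those lists once at the end.
func writeBatch(cache map[string][]point, batch map[string][]point) {
	needsSort := map[string]struct{}{}

	for key, pts := range batch {
		for _, p := range pts {
			list := cache[key]
			// The append is out of order if the new point is older than the tail.
			if n := len(list); n > 0 && p.ts < list[n-1].ts {
				needsSort[key] = struct{}{}
			}
			cache[key] = append(list, p)
		}
	}

	// One sort per affected key, after all appends are done.
	for key := range needsSort {
		list := cache[key]
		sort.Slice(list, func(i, j int) bool { return list[i].ts < list[j].ts })
	}
}

func main() {
	cache := map[string][]point{}
	// A reverse-time batch: previously this would have triggered a sort per point.
	writeBatch(cache, map[string][]point{
		"cpu": {{3, "c"}, {2, "b"}, {1, "a"}},
	})
	fmt.Println(cache["cpu"]) // [{1 a} {2 b} {3 c}]
}

With this structure, a batch of N reverse-ordered points for a single key costs one sort instead of N, matching the commit message; a batch where every key receives exactly two out-of-order points still costs one sort per key, which is the N/2 pathological case described above.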