diff --git a/CHANGELOG.md b/CHANGELOG.md index 9eab4b7007..7db2d2f986 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ - [#3255](https://github.com/influxdb/influxdb/pull/3255): Flush WAL on start-up as soon as possible. - [#3289](https://github.com/influxdb/influxdb/issues/3289): InfluxDB crashes on floats without decimal - [#3298](https://github.com/influxdb/influxdb/pull/3298): Corrected WAL & flush parameters in default config. Thanks @jhorwit2 +- [#3152](https://github.com/influxdb/influxdb/issues/3159): High CPU Usage with unsorted writes ## v0.9.1 [2015-07-02] diff --git a/tsdb/shard.go b/tsdb/shard.go index 4d0204e160..c3bc48ca86 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -317,6 +317,9 @@ func (s *Shard) WritePoints(points []Point) error { s.mu.Lock() defer s.mu.Unlock() + // tracks which in-memory caches need to be resorted + resorts := map[uint8]map[string]struct{}{} + for _, p := range points { // Generate in-memory cache entry of . key, data := p.Key(), p.Data() @@ -332,9 +335,14 @@ func (s *Shard) WritePoints(points []Point) error { // Append to cache list. a = append(a, v) - // Sort by timestamp if not appending. + // If not appending, keep track of cache lists that need to be resorted. if !appending { - sort.Sort(byteSlices(a)) + series := resorts[partitionID] + if series == nil { + series = map[string]struct{}{} + resorts[partitionID] = series + } + series[string(key)] = struct{}{} } s.cache[partitionID][string(key)] = a @@ -343,6 +351,13 @@ func (s *Shard) WritePoints(points []Point) error { s.walSize += len(key) + len(v) } + // Sort by timestamp if not appending. + for partitionID, cache := range resorts { + for key, _ := range cache { + sort.Sort(byteSlices(s.cache[partitionID][key])) + } + } + // Check for flush threshold. s.triggerAutoFlush()