From cdbdd156f3b70024f2f512446c5ba3f5fdfc431f Mon Sep 17 00:00:00 2001 From: Mark Rushakoff Date: Thu, 12 Jan 2017 16:16:54 -0800 Subject: [PATCH] Fix memory leak of retained HTTP write payloads This leak seems to have been introduced in 8aa224b22dc007e, present in 1.1.0 and 1.1.1. When points were parsed from HTTP payloads, their tags and fields referred to subslices of the request body; if any tag set introduced a new series, then those tags then were stored in the in-memory series index objects, preventing the HTTP body from being garbage collected. If there were no new series in the payload, then the request body would be garbage collected as usual. Now, we clone the tags before we store them in the index. This is an imperfect fix because the Point still holds references to the original tags, and the Point's field iterator also refers to the payload buffer. However, the current write code path does not retain references to the Point or its fields; and this change will likely be obsoleted when TSI is introduced. This change likely fixes #7827, #7810, #7778, and perhaps others. --- models/points.go | 35 +++++++++++++++++++++++++++++++++++ models/points_test.go | 32 ++++++++++++++++++++++++++++++++ tsdb/shard.go | 4 ++++ 3 files changed, 71 insertions(+) diff --git a/models/points.go b/models/points.go index 537a051fbe..a1643c78ed 100644 --- a/models/points.go +++ b/models/points.go @@ -233,6 +233,9 @@ func ParsePointsString(buf string) ([]Point, error) { } // ParseKey returns the measurement name and tags from a point. +// +// NOTE: to minimize heap allocations, the returned Tags will refer to subslices of buf. +// This can have the unintended effect preventing buf from being garbage collected. func ParseKey(buf []byte) (string, Tags, error) { // Ignore the error because scanMeasurement returns "missing fields" which we ignore // when just parsing a key @@ -249,6 +252,9 @@ func ParseKey(buf []byte) (string, Tags, error) { // ParsePointsWithPrecision is similar to ParsePoints, but allows the // caller to provide a precision for time. +// +// NOTE: to minimize heap allocations, the returned Points will refer to subslices of buf. +// This can have the unintended effect preventing buf from being garbage collected. func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision string) ([]Point, error) { points := make([]Point, 0, bytes.Count(buf, []byte{'\n'})+1) var ( @@ -1617,6 +1623,22 @@ type Tag struct { Value []byte } +// Clone returns a shallow copy of Tag. +// +// Tags associated with a Point created by ParsePointsWithPrecision will hold references to the byte slice that was parsed. +// Use Clone to create a Tag with new byte slices that do not refer to the argument to ParsePointsWithPrecision. +func (t Tag) Clone() Tag { + other := Tag{ + Key: make([]byte, len(t.Key)), + Value: make([]byte, len(t.Value)), + } + + copy(other.Key, t.Key) + copy(other.Value, t.Value) + + return other +} + // Tags represents a sorted list of tags. type Tags []Tag @@ -1633,6 +1655,19 @@ func NewTags(m map[string]string) Tags { return a } +// Clone returns a shallow copy of Tags. +// +// Tags associated with a Point created by ParsePointsWithPrecision will hold references to the byte slice that was parsed. +// Use Clone to create Tags with new byte slices that do not refer to the argument to ParsePointsWithPrecision. +func (a Tags) Clone() Tags { + others := make(Tags, len(a)) + for i := range a { + others[i] = a[i].Clone() + } + + return others +} + // Len implements sort.Interface. func (a Tags) Len() int { return len(a) } diff --git a/models/points_test.go b/models/points_test.go index 67f717a7aa..26977b19af 100644 --- a/models/points_test.go +++ b/models/points_test.go @@ -89,6 +89,38 @@ func testPoint_cube(t *testing.T, f func(p models.Point)) { } } +func TestTag_Clone(t *testing.T) { + tag := models.Tag{Key: []byte("key"), Value: []byte("value")} + + c := tag.Clone() + + if &c.Key == &tag.Key || !bytes.Equal(c.Key, tag.Key) { + t.Fatalf("key %s should have been a clone of %s", c.Key, tag.Key) + } + + if &c.Value == &tag.Value || !bytes.Equal(c.Value, tag.Value) { + t.Fatalf("value %s should have been a clone of %s", c.Value, tag.Value) + } +} + +func TestTags_Clone(t *testing.T) { + tags := models.NewTags(map[string]string{"k1": "v1", "k2": "v2", "k3": "v3"}) + + clone := tags.Clone() + + for i := range tags { + tag := tags[i] + c := clone[i] + if &c.Key == &tag.Key || !bytes.Equal(c.Key, tag.Key) { + t.Fatalf("key %s should have been a clone of %s", c.Key, tag.Key) + } + + if &c.Value == &tag.Value || !bytes.Equal(c.Value, tag.Value) { + t.Fatalf("value %s should have been a clone of %s", c.Value, tag.Value) + } + } +} + var p models.Point func BenchmarkNewPoint(b *testing.B) { diff --git a/tsdb/shard.go b/tsdb/shard.go index 418558cf49..9f9857ac3c 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -580,6 +580,10 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point, continue } + // If the tags were created by models.ParsePointsWithPrecision, + // they refer to subslices of the buffer containing line protocol. + // To ensure we don't refer to that buffer, preventing that buffer from being garbage collected, clone the tags. + tags = tags.Clone() ss = s.index.CreateSeriesIndexIfNotExists(p.Name(), NewSeries(string(p.Key()), tags)) atomic.AddInt64(&s.stats.SeriesCreated, 1) }