Fix memory leak of retained HTTP write payloads
This leak seems to have been introduced in 8aa224b22d, and is present in 1.1.0 and 1.1.1.
When points were parsed from HTTP payloads, their tags and fields
referred to subslices of the request body; if any tag set introduced a
new series, then those tags then were stored in the in-memory series
index objects, preventing the HTTP body from being garbage collected. If
there were no new series in the payload, then the request body would be
garbage collected as usual.
Now, we clone the tags before we store them in the index. This is an
imperfect fix because the Point still holds references to the original
tags, and the Point's field iterator also refers to the payload buffer.
However, the current write code path does not retain references to the
Point or its fields; and this change will likely be obsoleted when TSI
is introduced.
This change likely fixes #7827, #7810, #7778, and perhaps others.
pull/7832/head
parent
dcb379779a
commit
cdbdd156f3
|
@ -233,6 +233,9 @@ func ParsePointsString(buf string) ([]Point, error) {
|
|||
}
|
||||
|
||||
// ParseKey returns the measurement name and tags from a point.
|
||||
//
|
||||
// NOTE: to minimize heap allocations, the returned Tags will refer to subslices of buf.
|
||||
// This can have the unintended effect of preventing buf from being garbage collected.
|
||||
func ParseKey(buf []byte) (string, Tags, error) {
|
||||
// Ignore the error because scanMeasurement returns "missing fields" which we ignore
|
||||
// when just parsing a key
|
||||
|
@ -249,6 +252,9 @@ func ParseKey(buf []byte) (string, Tags, error) {
|
|||
|
||||
// ParsePointsWithPrecision is similar to ParsePoints, but allows the
|
||||
// caller to provide a precision for time.
|
||||
//
|
||||
// NOTE: to minimize heap allocations, the returned Points will refer to subslices of buf.
|
||||
// This can have the unintended effect of preventing buf from being garbage collected.
|
||||
func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision string) ([]Point, error) {
|
||||
points := make([]Point, 0, bytes.Count(buf, []byte{'\n'})+1)
|
||||
var (
|
||||
|
@ -1617,6 +1623,22 @@ type Tag struct {
|
|||
Value []byte
|
||||
}
|
||||
|
||||
// Clone returns a shallow copy of Tag.
|
||||
//
|
||||
// Tags associated with a Point created by ParsePointsWithPrecision will hold references to the byte slice that was parsed.
|
||||
// Use Clone to create a Tag with new byte slices that do not refer to the argument to ParsePointsWithPrecision.
|
||||
func (t Tag) Clone() Tag {
|
||||
other := Tag{
|
||||
Key: make([]byte, len(t.Key)),
|
||||
Value: make([]byte, len(t.Value)),
|
||||
}
|
||||
|
||||
copy(other.Key, t.Key)
|
||||
copy(other.Value, t.Value)
|
||||
|
||||
return other
|
||||
}
|
||||
|
||||
// Tags represents a sorted list of tags.
// NOTE(review): sorted order appears to be an invariant maintained by the
// constructors (e.g. NewTags) rather than enforced here — confirm callers
// never build an unsorted Tags directly.
type Tags []Tag
|
||||
|
||||
|
@ -1633,6 +1655,19 @@ func NewTags(m map[string]string) Tags {
|
|||
return a
|
||||
}
|
||||
|
||||
// Clone returns a shallow copy of Tags.
|
||||
//
|
||||
// Tags associated with a Point created by ParsePointsWithPrecision will hold references to the byte slice that was parsed.
|
||||
// Use Clone to create Tags with new byte slices that do not refer to the argument to ParsePointsWithPrecision.
|
||||
func (a Tags) Clone() Tags {
|
||||
others := make(Tags, len(a))
|
||||
for i := range a {
|
||||
others[i] = a[i].Clone()
|
||||
}
|
||||
|
||||
return others
|
||||
}
|
||||
|
||||
// Len implements sort.Interface.
|
||||
func (a Tags) Len() int { return len(a) }
|
||||
|
||||
|
|
|
@ -89,6 +89,38 @@ func testPoint_cube(t *testing.T, f func(p models.Point)) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestTag_Clone(t *testing.T) {
|
||||
tag := models.Tag{Key: []byte("key"), Value: []byte("value")}
|
||||
|
||||
c := tag.Clone()
|
||||
|
||||
if &c.Key == &tag.Key || !bytes.Equal(c.Key, tag.Key) {
|
||||
t.Fatalf("key %s should have been a clone of %s", c.Key, tag.Key)
|
||||
}
|
||||
|
||||
if &c.Value == &tag.Value || !bytes.Equal(c.Value, tag.Value) {
|
||||
t.Fatalf("value %s should have been a clone of %s", c.Value, tag.Value)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTags_Clone(t *testing.T) {
|
||||
tags := models.NewTags(map[string]string{"k1": "v1", "k2": "v2", "k3": "v3"})
|
||||
|
||||
clone := tags.Clone()
|
||||
|
||||
for i := range tags {
|
||||
tag := tags[i]
|
||||
c := clone[i]
|
||||
if &c.Key == &tag.Key || !bytes.Equal(c.Key, tag.Key) {
|
||||
t.Fatalf("key %s should have been a clone of %s", c.Key, tag.Key)
|
||||
}
|
||||
|
||||
if &c.Value == &tag.Value || !bytes.Equal(c.Value, tag.Value) {
|
||||
t.Fatalf("value %s should have been a clone of %s", c.Value, tag.Value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var p models.Point
|
||||
|
||||
func BenchmarkNewPoint(b *testing.B) {
|
||||
|
|
|
@ -580,6 +580,10 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
|
|||
continue
|
||||
}
|
||||
|
||||
// If the tags were created by models.ParsePointsWithPrecision,
|
||||
// they refer to subslices of the buffer containing line protocol.
|
||||
// To ensure we don't refer to that buffer, preventing that buffer from being garbage collected, clone the tags.
|
||||
tags = tags.Clone()
|
||||
ss = s.index.CreateSeriesIndexIfNotExists(p.Name(), NewSeries(string(p.Key()), tags))
|
||||
atomic.AddInt64(&s.stats.SeriesCreated, 1)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue