Merge pull request #7832 from influxdata/mr-fix-http-write-memory-leak

Fix memory leak of retained HTTP write payloads
pull/7840/head v1.2.0-rc1
Mark Rushakoff 2017-01-13 12:04:24 -08:00 committed by GitHub
commit 63e5bae7b8
4 changed files with 96 additions and 0 deletions

@@ -27,6 +27,7 @@ The stress tool `influx_stress` will be removed in a subsequent release. We reco
### Bugfixes
- [#7832](https://github.com/influxdata/influxdb/pull/7832): Fix memory leak when writing new series over HTTP.
- [#7786](https://github.com/influxdata/influxdb/pull/7786): Fix potential race condition in correctness of tsm1_cache memBytes statistic.
- [#7784](https://github.com/influxdata/influxdb/pull/7784): Fix broken error return on meta client's UpdateUser and DropContinuousQuery methods.
- [#7741](https://github.com/influxdata/influxdb/pull/7741): Fix string quoting and significantly improve performance of `influx_inspect export`.
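To illustrate the first entry above: a minimal sketch of the retention pattern behind the leak, assuming the models API shown in the diffs below. The program, variable names, and the sample line-protocol payload are illustrative, not taken from the repository.

package main

import (
	"fmt"
	"time"

	"github.com/influxdata/influxdb/models"
)

// retained stands in for any long-lived structure, such as the in-memory
// series index, that outlives an individual HTTP write request.
var retained []models.Tags

func main() {
	// body stands in for a (possibly multi-megabyte) HTTP write payload.
	body := []byte("cpu,host=serverA,region=us-east value=1.0 946730096789012345")

	pts, err := models.ParsePointsWithPrecision(body, time.Now().UTC(), "n")
	if err != nil {
		panic(err)
	}

	// Tags() yields Tag values whose Key and Value are subslices of body, so
	// appending them here keeps the whole payload reachable for as long as
	// retained lives. Calling Clone() (added in this PR) before storing the
	// tags detaches them from body and lets it be garbage collected.
	retained = append(retained, pts[0].Tags())
	fmt.Println(retained)
}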

@@ -233,6 +233,9 @@ func ParsePointsString(buf string) ([]Point, error) {
}
// ParseKey returns the measurement name and tags from a point.
//
// NOTE: to minimize heap allocations, the returned Tags will refer to subslices of buf.
// This can have the unintended effect of preventing buf from being garbage collected.
func ParseKey(buf []byte) (string, Tags, error) {
// Ignore the error because scanMeasurement returns "missing fields" which we ignore
// when just parsing a key
@@ -249,6 +252,9 @@ func ParseKey(buf []byte) (string, Tags, error) {
// ParsePointsWithPrecision is similar to ParsePoints, but allows the
// caller to provide a precision for time.
//
// NOTE: to minimize heap allocations, the returned Points will refer to subslices of buf.
// This can have the unintended effect of preventing buf from being garbage collected.
func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision string) ([]Point, error) {
points := make([]Point, 0, bytes.Count(buf, []byte{'\n'})+1)
var (
@@ -1317,6 +1323,11 @@ func (p *point) Tags() Tags {
return p.cachedTags
}
p.cachedTags = parseTags(p.key)
for i := range p.cachedTags {
p.cachedTags[i].shouldCopy = true
}
return p.cachedTags
}
@@ -1615,6 +1626,29 @@ func (p *point) Split(size int) []Point {
type Tag struct {
Key []byte
Value []byte
// shouldCopy indicates whether the Tag's Key and Value must be deep-copied when Clone is called.
shouldCopy bool
}
// Clone returns a copy of Tag; Key and Value are copied into new byte slices when they refer to a parsed buffer.
//
// Tags associated with a Point created by ParsePointsWithPrecision will hold references to the byte slice that was parsed.
// Use Clone to create a Tag with new byte slices that do not refer to the argument to ParsePointsWithPrecision.
func (t Tag) Clone() Tag {
if !t.shouldCopy {
return t
}
other := Tag{
Key: make([]byte, len(t.Key)),
Value: make([]byte, len(t.Value)),
}
copy(other.Key, t.Key)
copy(other.Value, t.Value)
return other
}
// Tags represents a sorted list of tags.
@@ -1633,6 +1667,32 @@ func NewTags(m map[string]string) Tags {
return a
}
// Clone returns a copy of the slice whose elements are the result of calling `Clone` on each original element.
//
// Tags associated with a Point created by ParsePointsWithPrecision will hold references to the byte slice that was parsed.
// Use Clone to create Tags with new byte slices that do not refer to the argument to ParsePointsWithPrecision.
func (a Tags) Clone() Tags {
if len(a) == 0 {
return nil
}
needsClone := false
for i := 0; i < len(a) && !needsClone; i++ {
needsClone = a[i].shouldCopy
}
if !needsClone {
return a
}
others := make(Tags, len(a))
for i := range a {
others[i] = a[i].Clone()
}
return others
}
// Len implements sort.Interface.
func (a Tags) Len() int { return len(a) }
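In short, the shouldCopy flag keeps Clone cheap on the common path: tags built programmatically come back unchanged, while tags read from a parsed point are deep-copied so they stop pinning the parse buffer. A rough usage sketch, with illustrative names and sample data (not code from this change):

package main

import (
	"fmt"
	"time"

	"github.com/influxdata/influxdb/models"
)

func main() {
	// Built with NewTags: shouldCopy is unset, so Clone returns the receiver
	// as-is and allocates nothing.
	manual := models.NewTags(map[string]string{"host": "serverA"})
	fmt.Println(manual.Clone())

	// Read from a parsed point: shouldCopy is set, so Clone allocates fresh
	// Key/Value slices that are independent of the parsed buffer.
	pts, err := models.ParsePointsWithPrecision(
		[]byte("cpu,host=serverA value=1 946730096789012345"),
		time.Now().UTC(), "n")
	if err != nil {
		panic(err)
	}
	fmt.Println(pts[0].Tags().Clone())
}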

@@ -89,6 +89,38 @@ func testPoint_cube(t *testing.T, f func(p models.Point)) {
}
}
func TestTag_Clone(t *testing.T) {
tag := models.Tag{Key: []byte("key"), Value: []byte("value")}
c := tag.Clone()
if &c.Key == &tag.Key || !bytes.Equal(c.Key, tag.Key) {
t.Fatalf("key %s should have been a clone of %s", c.Key, tag.Key)
}
if &c.Value == &tag.Value || !bytes.Equal(c.Value, tag.Value) {
t.Fatalf("value %s should have been a clone of %s", c.Value, tag.Value)
}
}
func TestTags_Clone(t *testing.T) {
tags := models.NewTags(map[string]string{"k1": "v1", "k2": "v2", "k3": "v3"})
clone := tags.Clone()
for i := range tags {
tag := tags[i]
c := clone[i]
if &c.Key == &tag.Key || !bytes.Equal(c.Key, tag.Key) {
t.Fatalf("key %s should have been a clone of %s", c.Key, tag.Key)
}
if &c.Value == &tag.Value || !bytes.Equal(c.Value, tag.Value) {
t.Fatalf("value %s should have been a clone of %s", c.Value, tag.Value)
}
}
}
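// NOTE (sketch, not part of this change): the tests above build tags with
// NewTags, where shouldCopy is unset and Clone may return the receiver
// unchanged. A further test along these lines could cover the parsed path,
// where Clone must allocate new backing arrays. It assumes the imports
// points_test.go already uses (bytes, testing, time, and the models package);
// the test name and the sample point are illustrative.
func TestTags_Clone_Parsed(t *testing.T) {
	buf := []byte(`cpu,host=serverA,region=us-east value=1.0 946730096789012345`)
	pts, err := models.ParsePointsWithPrecision(buf, time.Unix(0, 0), "n")
	if err != nil {
		t.Fatal(err)
	}
	tags := pts[0].Tags()
	clone := tags.Clone()
	for i := range tags {
		// Equal contents...
		if !bytes.Equal(clone[i].Key, tags[i].Key) || !bytes.Equal(clone[i].Value, tags[i].Value) {
			t.Fatalf("tag %d should have equal key/value after Clone", i)
		}
		// ...but independent backing arrays, so buf is no longer reachable
		// through the clone.
		if &clone[i].Key[0] == &tags[i].Key[0] || &clone[i].Value[0] == &tags[i].Value[0] {
			t.Fatalf("tag %d still refers to the parsed buffer after Clone", i)
		}
	}
}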
var p models.Point
func BenchmarkNewPoint(b *testing.B) {

@@ -172,6 +172,9 @@ func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, ser
d.lastID++
series.measurement = m
// Clone the tags to dereference any short-term buffers (such as the HTTP write payload the point was parsed from), so the long-lived index does not retain them
series.Tags = series.Tags.Clone()
d.series[series.Key] = series
m.AddSeries(series)
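The index is the long-lived side of the leak: a series stored in d.series stays there until it is dropped, so without the Clone above each newly created series would keep its originating write payload alive. The underlying pattern, as a sketch (package, function, and map names are illustrative, not repository code):

package sketch

import "github.com/influxdata/influxdb/models"

// indexTags deep-copies parser-derived tags before retaining them, so the
// long-lived map never pins the request buffer the tags were sliced from.
func indexTags(longLived map[string]models.Tags, seriesKey string, parsed models.Tags) {
	longLived[seriesKey] = parsed.Clone()
}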