From cd00085e9e7eb3308c2784e04e0b326ba513c0d8 Mon Sep 17 00:00:00 2001 From: Joe LeGasse Date: Fri, 13 Jan 2017 13:11:09 -0500 Subject: [PATCH] Adjust Tags cloning This change delays Tag cloning until a new series is found, and will only clone Tags acquired from `ParsePoints...` and not those referencing the mmap-ed files (TSM) that are created on startup. --- models/points.go | 27 ++++++++++++++++++++++++++- tsdb/meta.go | 3 +++ tsdb/shard.go | 4 ---- 3 files changed, 29 insertions(+), 5 deletions(-) diff --git a/models/points.go b/models/points.go index a1643c78ed..3f7cd5e639 100644 --- a/models/points.go +++ b/models/points.go @@ -1323,6 +1323,11 @@ func (p *point) Tags() Tags { return p.cachedTags } p.cachedTags = parseTags(p.key) + + for i := range p.cachedTags { + p.cachedTags[i].shouldCopy = true + } + return p.cachedTags } @@ -1621,6 +1626,9 @@ func (p *point) Split(size int) []Point { type Tag struct { Key []byte Value []byte + + // shouldCopy returns whether or not a tag should be copied when Clone-ing + shouldCopy bool } // Clone returns a shallow copy of Tag. @@ -1628,6 +1636,10 @@ type Tag struct { // Tags associated with a Point created by ParsePointsWithPrecision will hold references to the byte slice that was parsed. // Use Clone to create a Tag with new byte slices that do not refer to the argument to ParsePointsWithPrecision. func (t Tag) Clone() Tag { + if !t.shouldCopy { + return t + } + other := Tag{ Key: make([]byte, len(t.Key)), Value: make([]byte, len(t.Value)), @@ -1655,11 +1667,24 @@ func NewTags(m map[string]string) Tags { return a } -// Clone returns a shallow copy of Tags. +// Clone returns a copy of the slice where the elements are a result of calling `Clone` on the original elements // // Tags associated with a Point created by ParsePointsWithPrecision will hold references to the byte slice that was parsed. // Use Clone to create Tags with new byte slices that do not refer to the argument to ParsePointsWithPrecision. func (a Tags) Clone() Tags { + if len(a) == 0 { + return nil + } + + needsClone := false + for i := 0; i < len(a) && !needsClone; i++ { + needsClone = a[i].shouldCopy + } + + if !needsClone { + return a + } + others := make(Tags, len(a)) for i := range a { others[i] = a[i].Clone() diff --git a/tsdb/meta.go b/tsdb/meta.go index c22b486334..dccb6ae5ac 100644 --- a/tsdb/meta.go +++ b/tsdb/meta.go @@ -172,6 +172,9 @@ func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, ser d.lastID++ series.measurement = m + + // Clone the tags to dereference any short-term buffers + series.Tags = series.Tags.Clone() d.series[series.Key] = series m.AddSeries(series) diff --git a/tsdb/shard.go b/tsdb/shard.go index 9f9857ac3c..418558cf49 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -580,10 +580,6 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point, continue } - // If the tags were created by models.ParsePointsWithPrecision, - // they refer to subslices of the buffer containing line protocol. - // To ensure we don't refer to that buffer, preventing that buffer from being garbage collected, clone the tags. - tags = tags.Clone() ss = s.index.CreateSeriesIndexIfNotExists(p.Name(), NewSeries(string(p.Key()), tags)) atomic.AddInt64(&s.stats.SeriesCreated, 1) }