Adjust Tags cloning
This change delays Tag cloning until a new series is found, and will only clone Tags acquired from `ParsePoints...` and not those referencing the mmap-ed files (TSM) that are created on startup.pull/7832/head
parent
7964a87310
commit
cd00085e9e
|
@ -1323,6 +1323,11 @@ func (p *point) Tags() Tags {
|
|||
return p.cachedTags
|
||||
}
|
||||
p.cachedTags = parseTags(p.key)
|
||||
|
||||
for i := range p.cachedTags {
|
||||
p.cachedTags[i].shouldCopy = true
|
||||
}
|
||||
|
||||
return p.cachedTags
|
||||
}
|
||||
|
||||
|
@ -1621,6 +1626,9 @@ func (p *point) Split(size int) []Point {
|
|||
type Tag struct {
|
||||
Key []byte
|
||||
Value []byte
|
||||
|
||||
// shouldCopy returns whether or not a tag should be copied when Clone-ing
|
||||
shouldCopy bool
|
||||
}
|
||||
|
||||
// Clone returns a shallow copy of Tag.
|
||||
|
@ -1628,6 +1636,10 @@ type Tag struct {
|
|||
// Tags associated with a Point created by ParsePointsWithPrecision will hold references to the byte slice that was parsed.
|
||||
// Use Clone to create a Tag with new byte slices that do not refer to the argument to ParsePointsWithPrecision.
|
||||
func (t Tag) Clone() Tag {
|
||||
if !t.shouldCopy {
|
||||
return t
|
||||
}
|
||||
|
||||
other := Tag{
|
||||
Key: make([]byte, len(t.Key)),
|
||||
Value: make([]byte, len(t.Value)),
|
||||
|
@ -1655,11 +1667,24 @@ func NewTags(m map[string]string) Tags {
|
|||
return a
|
||||
}
|
||||
|
||||
// Clone returns a shallow copy of Tags.
|
||||
// Clone returns a copy of the slice where the elements are a result of calling `Clone` on the original elements
|
||||
//
|
||||
// Tags associated with a Point created by ParsePointsWithPrecision will hold references to the byte slice that was parsed.
|
||||
// Use Clone to create Tags with new byte slices that do not refer to the argument to ParsePointsWithPrecision.
|
||||
func (a Tags) Clone() Tags {
|
||||
if len(a) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
needsClone := false
|
||||
for i := 0; i < len(a) && !needsClone; i++ {
|
||||
needsClone = a[i].shouldCopy
|
||||
}
|
||||
|
||||
if !needsClone {
|
||||
return a
|
||||
}
|
||||
|
||||
others := make(Tags, len(a))
|
||||
for i := range a {
|
||||
others[i] = a[i].Clone()
|
||||
|
|
|
@ -172,6 +172,9 @@ func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, ser
|
|||
d.lastID++
|
||||
|
||||
series.measurement = m
|
||||
|
||||
// Clone the tags to dereference any short-term buffers
|
||||
series.Tags = series.Tags.Clone()
|
||||
d.series[series.Key] = series
|
||||
|
||||
m.AddSeries(series)
|
||||
|
|
|
@ -580,10 +580,6 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
|
|||
continue
|
||||
}
|
||||
|
||||
// If the tags were created by models.ParsePointsWithPrecision,
|
||||
// they refer to subslices of the buffer containing line protocol.
|
||||
// To ensure we don't refer to that buffer, preventing that buffer from being garbage collected, clone the tags.
|
||||
tags = tags.Clone()
|
||||
ss = s.index.CreateSeriesIndexIfNotExists(p.Name(), NewSeries(string(p.Key()), tags))
|
||||
atomic.AddInt64(&s.stats.SeriesCreated, 1)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue