From edb5e56881585097adb8641f26746dd7ca532fec Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Thu, 12 Nov 2020 06:57:29 -0700 Subject: [PATCH] fix: Add locking during tsi iterator creation. This commit fixes a locking issue that caused the `TestShard_WritePoints_FieldConflictConcurrent` test to fail. --- tsdb/index/tsi1/log_file.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/tsdb/index/tsi1/log_file.go b/tsdb/index/tsi1/log_file.go index acf75457eb..b7f3476561 100644 --- a/tsdb/index/tsi1/log_file.go +++ b/tsdb/index/tsi1/log_file.go @@ -368,7 +368,7 @@ func (f *LogFile) TagKeyIterator(name []byte) TagKeyIterator { for _, k := range mm.tagSet { a = append(a, k) } - return newLogTagKeyIterator(a) + return newLogTagKeyIterator(f, a) } // TagKey returns a tag key element. @@ -766,6 +766,7 @@ func (f *LogFile) createMeasurementIfNotExists(name []byte) *logMeasurement { mm := f.mms[string(name)] if mm == nil { mm = &logMeasurement{ + f: f, name: name, tagSet: make(map[string]logTagKey), series: make(map[uint64]struct{}), @@ -1204,6 +1205,7 @@ func (mms *logMeasurements) bytes() int { } type logMeasurement struct { + f *LogFile name []byte tagSet map[string]logTagKey deleted bool @@ -1304,7 +1306,7 @@ func (m *logMeasurement) Deleted() bool { return m.deleted } func (m *logMeasurement) createTagSetIfNotExists(key []byte) logTagKey { ts, ok := m.tagSet[string(key)] if !ok { - ts = logTagKey{name: key, tagValues: make(map[string]logTagValue)} + ts = logTagKey{f: m.f, name: key, tagValues: make(map[string]logTagValue)} } return ts } @@ -1341,6 +1343,7 @@ func (itr *logMeasurementIterator) Next() (e MeasurementElem) { } type logTagKey struct { + f *LogFile name []byte deleted bool tagValues map[string]logTagValue @@ -1362,10 +1365,13 @@ func (tk *logTagKey) Key() []byte { return tk.name } func (tk *logTagKey) Deleted() bool { return tk.deleted } func (tk *logTagKey) TagValueIterator() TagValueIterator { + tk.f.mu.RLock() a := make([]logTagValue, 0, len(tk.tagValues)) for _, v := range tk.tagValues { a = append(a, v) } + tk.f.mu.RUnlock() + return newLogTagValueIterator(a) } @@ -1459,13 +1465,14 @@ func (a logTagValueSlice) Less(i, j int) bool { return bytes.Compare(a[i].name, // logTagKeyIterator represents an iterator over a slice of tag keys. type logTagKeyIterator struct { + f *LogFile a []logTagKey } // newLogTagKeyIterator returns a new instance of logTagKeyIterator. -func newLogTagKeyIterator(a []logTagKey) *logTagKeyIterator { +func newLogTagKeyIterator(f *LogFile, a []logTagKey) *logTagKeyIterator { sort.Sort(logTagKeySlice(a)) - return &logTagKeyIterator{a: a} + return &logTagKeyIterator{f: f, a: a} } // Next returns the next element in the iterator.