fix: Add locking during tsi iterator creation.

This commit fixes a locking issue that caused the `TestShard_WritePoints_FieldConflictConcurrent`
test to fail.
pull/20008/head
Ben Johnson 2020-11-12 06:57:29 -07:00
parent b9fc93eb85
commit edb5e56881
1 changed files with 11 additions and 4 deletions

View File

@ -368,7 +368,7 @@ func (f *LogFile) TagKeyIterator(name []byte) TagKeyIterator {
for _, k := range mm.tagSet { for _, k := range mm.tagSet {
a = append(a, k) a = append(a, k)
} }
return newLogTagKeyIterator(a) return newLogTagKeyIterator(f, a)
} }
// TagKey returns a tag key element. // TagKey returns a tag key element.
@ -766,6 +766,7 @@ func (f *LogFile) createMeasurementIfNotExists(name []byte) *logMeasurement {
mm := f.mms[string(name)] mm := f.mms[string(name)]
if mm == nil { if mm == nil {
mm = &logMeasurement{ mm = &logMeasurement{
f: f,
name: name, name: name,
tagSet: make(map[string]logTagKey), tagSet: make(map[string]logTagKey),
series: make(map[uint64]struct{}), series: make(map[uint64]struct{}),
@ -1204,6 +1205,7 @@ func (mms *logMeasurements) bytes() int {
} }
type logMeasurement struct { type logMeasurement struct {
f *LogFile
name []byte name []byte
tagSet map[string]logTagKey tagSet map[string]logTagKey
deleted bool deleted bool
@ -1304,7 +1306,7 @@ func (m *logMeasurement) Deleted() bool { return m.deleted }
func (m *logMeasurement) createTagSetIfNotExists(key []byte) logTagKey { func (m *logMeasurement) createTagSetIfNotExists(key []byte) logTagKey {
ts, ok := m.tagSet[string(key)] ts, ok := m.tagSet[string(key)]
if !ok { if !ok {
ts = logTagKey{name: key, tagValues: make(map[string]logTagValue)} ts = logTagKey{f: m.f, name: key, tagValues: make(map[string]logTagValue)}
} }
return ts return ts
} }
@ -1341,6 +1343,7 @@ func (itr *logMeasurementIterator) Next() (e MeasurementElem) {
} }
type logTagKey struct { type logTagKey struct {
f *LogFile
name []byte name []byte
deleted bool deleted bool
tagValues map[string]logTagValue tagValues map[string]logTagValue
@ -1362,10 +1365,13 @@ func (tk *logTagKey) Key() []byte { return tk.name }
func (tk *logTagKey) Deleted() bool { return tk.deleted } func (tk *logTagKey) Deleted() bool { return tk.deleted }
func (tk *logTagKey) TagValueIterator() TagValueIterator { func (tk *logTagKey) TagValueIterator() TagValueIterator {
tk.f.mu.RLock()
a := make([]logTagValue, 0, len(tk.tagValues)) a := make([]logTagValue, 0, len(tk.tagValues))
for _, v := range tk.tagValues { for _, v := range tk.tagValues {
a = append(a, v) a = append(a, v)
} }
tk.f.mu.RUnlock()
return newLogTagValueIterator(a) return newLogTagValueIterator(a)
} }
@ -1459,13 +1465,14 @@ func (a logTagValueSlice) Less(i, j int) bool { return bytes.Compare(a[i].name,
// logTagKeyIterator represents an iterator over a slice of tag keys. // logTagKeyIterator represents an iterator over a slice of tag keys.
type logTagKeyIterator struct { type logTagKeyIterator struct {
f *LogFile
a []logTagKey a []logTagKey
} }
// newLogTagKeyIterator returns a new instance of logTagKeyIterator. // newLogTagKeyIterator returns a new instance of logTagKeyIterator.
func newLogTagKeyIterator(a []logTagKey) *logTagKeyIterator { func newLogTagKeyIterator(f *LogFile, a []logTagKey) *logTagKeyIterator {
sort.Sort(logTagKeySlice(a)) sort.Sort(logTagKeySlice(a))
return &logTagKeyIterator{a: a} return &logTagKeyIterator{f: f, a: a}
} }
// Next returns the next element in the iterator. // Next returns the next element in the iterator.