fix(storageflux): fix data race on tags map (#17702)
parent
b1ba20c79c
commit
80e71d6ee0
|
|
@ -2,6 +2,7 @@ package storageflux
|
|||
|
||||
import (
|
||||
"container/list"
|
||||
"sync"
|
||||
|
||||
"github.com/apache/arrow/go/arrow"
|
||||
"github.com/apache/arrow/go/arrow/array"
|
||||
|
|
@ -39,6 +40,7 @@ type tagsCache struct {
|
|||
// An array can get shared among two different tag keys that
|
||||
// have the same value.
|
||||
tags map[string]*list.Element
|
||||
mu sync.RWMutex
|
||||
lru *list.List
|
||||
maxLength int
|
||||
}
|
||||
|
|
@ -121,7 +123,10 @@ func (c *tagsCache) GetTag(value string, l int, mem memory.Allocator) *array.Bin
|
|||
return c.createTag(value, l, mem)
|
||||
}
|
||||
|
||||
if elem, ok := c.tags[value]; ok {
|
||||
c.mu.RLock()
|
||||
elem, ok := c.tags[value]
|
||||
c.mu.RUnlock()
|
||||
if ok {
|
||||
return c.getOrReplaceTag(elem, value, l, mem)
|
||||
}
|
||||
|
||||
|
|
@ -129,10 +134,12 @@ func (c *tagsCache) GetTag(value string, l int, mem memory.Allocator) *array.Bin
|
|||
if c.lru == nil {
|
||||
c.lru = list.New()
|
||||
}
|
||||
c.mu.Lock()
|
||||
if c.tags == nil {
|
||||
c.tags = make(map[string]*list.Element)
|
||||
}
|
||||
c.tags[value] = c.lru.PushFront(arr)
|
||||
c.mu.Unlock()
|
||||
c.maintainLRU()
|
||||
arr.Retain()
|
||||
return arr
|
||||
|
|
@ -180,7 +187,9 @@ func (c *tagsCache) maintainLRU() {
|
|||
}
|
||||
arr := c.lru.Remove(c.lru.Back()).(*array.Binary)
|
||||
value := arr.ValueString(0)
|
||||
c.mu.Lock()
|
||||
delete(c.tags, value)
|
||||
c.mu.Unlock()
|
||||
arr.Release()
|
||||
}
|
||||
|
||||
|
|
@ -206,9 +215,11 @@ func (c *tagsCache) Release() {
|
|||
c.stopColumn = nil
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
for _, elem := range c.tags {
|
||||
elem.Value.(*array.Binary).Release()
|
||||
}
|
||||
c.tags = nil
|
||||
c.mu.Unlock()
|
||||
c.lru = nil
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue