fix(storage/flux): fix a race condition in the tags cache (#17977)
The tags cache was not thread safe when called from multiple goroutines at the same time. It was intended to be, but the locking was done incorrectly and was more complicated than it needed to be: it assumed the LRU would only be updated from a single thread, which was not true. The tags cache now includes test cases that check for race conditions and data validity, and the cache itself has been changed to follow a simpler algorithm (a minimal sketch of the pattern appears after the commit metadata below):

1. Obtain a read lock.
2. Check if the cached array can be used.
3. Release the read lock.
4. If the cached array was unusable or did not exist, create an array for the tag.
5. Obtain a write lock.
6. Check if the cached array should be replaced, and replace it if needed.
7. Move the entry to the front of the LRU.
8. Release the write lock.

This simpler algorithm should ensure the code is correct while still creating the array outside of the lock, since creating the array is the most expensive of the operations above.
parent 8bf2d34bd7
commit 7379e784ab
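The eight steps above translate almost directly into code. Here is a minimal sketch of the pattern, assuming a hypothetical cache guarded by a sync.RWMutex and keyed by string; the names and the []int64 payload are illustrative, not the actual influxdb types:

	package main

	import "sync"

	// entryCache is a hypothetical cache following the algorithm in the
	// commit message: read-lock to check, build outside any lock, then
	// write-lock to publish (and, in the real code, touch the LRU).
	type entryCache struct {
		mu      sync.RWMutex
		entries map[string][]int64
	}

	func (c *entryCache) Get(key string, l int) []int64 {
		// Steps 1-3: obtain the read lock, check for a usable cached
		// array, release the read lock.
		c.mu.RLock()
		arr, ok := c.entries[key]
		c.mu.RUnlock()
		if ok && len(arr) >= l {
			return arr[:l]
		}

		// Step 4: the expensive build happens outside of any lock, so
		// other readers are never blocked behind it.
		arr = make([]int64, l)

		// Steps 5-8: obtain the write lock, replace the entry only if
		// it is still missing or shorter than ours, release the lock.
		c.mu.Lock()
		if cur, ok := c.entries[key]; !ok || len(cur) < len(arr) {
			c.entries[key] = arr
		}
		c.mu.Unlock()
		return arr
	}

	func main() {
		c := &entryCache{entries: map[string][]int64{}}
		_ = c.Get("t0", 8)
	}

Two goroutines may race to build the same array, but each returns its own valid copy and the longer one wins in the cache, which is exactly the trade the commit message describes.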
@@ -62,48 +62,80 @@ func (c *tagsCache) GetBounds(b execute.Bounds, l int, mem memory.Allocator) (st
 		return start, stop
 	}

-	if c.startColumn != nil {
-		start = c.getOrReplaceBounds(&c.startColumn, b.Start, l, mem)
-	} else {
-		start = c.createBounds(b.Start, l, mem)
-		start.Retain()
-		c.startColumn = start
-	}
-
-	if c.stopColumn != nil {
-		stop = c.getOrReplaceBounds(&c.stopColumn, b.Stop, l, mem)
-	} else {
-		stop = c.createBounds(b.Stop, l, mem)
-		stop.Retain()
-		c.stopColumn = stop
-	}
+	// Retrieve the columns from the cache if they exist.
+	c.mu.RLock()
+	start, _ = c.getBoundsFromCache(c.startColumn, l)
+	stop, _ = c.getBoundsFromCache(c.stopColumn, l)
+	c.mu.RUnlock()
+
+	// If we could not retrieve an array from the cache,
+	// create one here outside of the lock.
+	// Record that we will need to replace the values in
+	// the cache.
+	replace := false
+	if start == nil {
+		start, replace = c.createBounds(b.Start, l, mem), true
+	}
+	if stop == nil {
+		stop, replace = c.createBounds(b.Stop, l, mem), true
+	}
+	if !replace {
+		// No need to retrieve the write lock.
+		// Return now since we retrieved all values from
+		// the cache.
+		return start, stop
+	}
+
+	c.mu.Lock()
+	c.replaceBounds(&c.startColumn, start)
+	c.replaceBounds(&c.stopColumn, stop)
+	c.mu.Unlock()
 	return start, stop
 }

-// getOrReplaceBounds will get or replace an array of timestamps
-// and return a new reference to it.
-func (c *tagsCache) getOrReplaceBounds(arr **array.Int64, ts execute.Time, l int, mem memory.Allocator) *array.Int64 {
-	if (*arr).Len() < l {
-		(*arr).Release()
-		*arr = c.createBounds(ts, l, mem)
-		(*arr).Retain()
-		return *arr
-	} else if (*arr).Len() == l {
-		(*arr).Retain()
-		return *arr
+// getBoundsFromCache will return an array of values
+// if the array in the cache is of the appropriate size.
+// This must be called from inside of a lock.
+func (c *tagsCache) getBoundsFromCache(arr *array.Int64, l int) (*array.Int64, bool) {
+	if arr == nil || arr.Len() < l {
+		return nil, false
+	} else if arr.Len() == l {
+		arr.Retain()
+		return arr, true
 	}

 	// If the lengths do not match, but the cached array is less
 	// than the desired array, then we can use slice.
 	// NewSlice will automatically create a new reference to the
 	// passed in array so we do not need to manually retain.
-	data := array.NewSliceData((*arr).Data(), 0, int64(l))
+	data := array.NewSliceData(arr.Data(), 0, int64(l))
 	vs := array.NewInt64Data(data)
 	data.Release()
-	return vs
+	return vs, true
 }

+// replaceBounds will examine the array and replace it if
+// the length of the array is greater than the current array
+// or if there isn't an array in the cache.
+// This must be called from inside of a write lock.
+func (c *tagsCache) replaceBounds(cache **array.Int64, arr *array.Int64) {
+	if *cache != nil {
+		if (*cache).Len() >= arr.Len() {
+			// The cached value is longer so just keep it.
+			return
+		}
+		(*cache).Release()
+	}
+	arr.Retain()
+	*cache = arr
+}
+
 // createBounds will create an array of times for the given time with
 // the given length.
 //
 // DO NOT CALL THIS METHOD IN A LOCK. It is slow and will probably
 // cause lock contention.
 func (c *tagsCache) createBounds(ts execute.Time, l int, mem memory.Allocator) *array.Int64 {
 	b := array.NewInt64Builder(mem)
 	b.Resize(l)
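Both getBoundsFromCache above and getTagFromCache below reuse an oversized cached array by taking a zero-copy slice of it. A standalone sketch of that trick using the same arrow/go calls as the diff; the constant value 42 and the lengths are made up for illustration:

	package main

	import (
		"fmt"

		"github.com/apache/arrow/go/arrow/array"
		"github.com/apache/arrow/go/arrow/memory"
	)

	func main() {
		mem := memory.NewGoAllocator()
		b := array.NewInt64Builder(mem)
		defer b.Release()
		for i := 0; i < 8; i++ {
			b.Append(42) // a constant column, like the cached bounds arrays
		}
		arr := b.NewInt64Array()
		defer arr.Release()

		// Take a zero-copy, length-5 view of the cached 8-element array.
		// NewSliceData adds a reference to the underlying buffers, so the
		// view stays valid independently of the original array.
		data := array.NewSliceData(arr.Data(), 0, 5)
		vs := array.NewInt64Data(data)
		data.Release()
		defer vs.Release()

		fmt.Println(vs.Len(), vs.Value(0)) // 5 42
	}

This is why the cache only replaces an entry when the requested length is longer than the cached one: any shorter request is served from the existing buffers without copying.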
@@ -119,49 +151,39 @@ func (c *tagsCache) createBounds(ts execute.Time, l int, mem memory.Allocator) *
 // a reference to the data will be retained and returned.
 // Otherwise, the allocator will be used to construct a new column.
 func (c *tagsCache) GetTag(value string, l int, mem memory.Allocator) *array.Binary {
-	if l == 0 {
+	if l == 0 || c == nil {
 		return c.createTag(value, l, mem)
 	}

-	c.mu.RLock()
-	elem, ok := c.tags[value]
-	c.mu.RUnlock()
-	if ok {
-		return c.getOrReplaceTag(elem, value, l, mem)
-	}
-
-	arr := c.createTag(value, l, mem)
-	if c.lru == nil {
-		c.lru = list.New()
-	}
-	c.mu.Lock()
-	if c.tags == nil {
-		c.tags = make(map[string]*list.Element)
-	}
-	c.tags[value] = c.lru.PushFront(arr)
-	c.mu.Unlock()
-	c.maintainLRU()
-	arr.Retain()
+	// Attempt to retrieve the array from the cache.
+	arr, ok := c.getTagFromCache(value, l)
+	if !ok {
+		// The array is not in the cache so create it.
+		arr = c.createTag(value, l, mem)
+	}
+	c.touchOrReplaceTag(arr)
 	return arr
 }

-func (c *tagsCache) getOrReplaceTag(elem *list.Element, value string, l int, mem memory.Allocator) *array.Binary {
-	// Move this element to the front of the lru.
-	c.lru.MoveBefore(elem, c.lru.Front())
-
+// getTagFromCache will return an array of values with the
+// specified value at the specified length. If there is no
+// cache entry or the entry is not large enough for the
+// specified length, then this returns false.
+func (c *tagsCache) getTagFromCache(value string, l int) (*array.Binary, bool) {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+
+	elem, ok := c.tags[value]
+	if !ok {
+		return nil, false
+	}
+
 	// Determine if the array can be reused.
 	arr := elem.Value.(*array.Binary)
-	if arr.Len() < l {
-		// Create a new array with the appropriate length since
-		// this one cannot be reused here.
-		arr.Release()
-		arr = c.createTag(value, l, mem)
-		elem.Value = arr
-		arr.Retain()
-		return arr
-	} else if arr.Len() == l {
-		arr.Retain()
-		return arr
+	if arr.Len() == l {
+		arr.Retain()
+		return arr, true
+	} else if arr.Len() < l {
+		return nil, false
 	}

 	// If the lengths do not match, but the cached array is less
@@ -171,12 +193,52 @@ func (c *tagsCache) getOrReplaceTag(elem *list.Element, value string, l int, mem
 	data := array.NewSliceData(arr.Data(), 0, int64(l))
 	vs := array.NewBinaryData(data)
 	data.Release()
-	return vs
+	return vs, true
 }

+// touchOrReplaceTag will update the LRU cache to have
+// the value specified by the array as the most recently
+// used entry. If the cache entry does not exist or the
+// current array in the cache is shorter than this one,
+// it will replace the array.
+func (c *tagsCache) touchOrReplaceTag(arr *array.Binary) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if c.lru == nil {
+		c.lru = list.New()
+	}
+	if c.tags == nil {
+		c.tags = make(map[string]*list.Element)
+	}
+
+	value := arr.ValueString(0)
+	if elem, ok := c.tags[value]; ok {
+		// If the array in the cache is longer than or
+		// equal to the current tag, then do not touch it.
+		carr := elem.Value.(*array.Binary)
+		if carr.Len() < arr.Len() {
+			// Retain this array again and release our
+			// previous reference to the other array.
+			arr.Retain()
+			elem.Value = arr
+			carr.Release()
+		}
+
+		// Move this element to the front of the lru.
+		c.lru.MoveBefore(elem, c.lru.Front())
+	} else {
+		arr.Retain()
+		c.tags[value] = c.lru.PushFront(arr)
+	}
+	c.maintainLRU()
+}
+
 // maintainLRU will ensure the lru cache maintains the appropriate
 // length by ejecting the least recently used value from the cache
 // until the cache is the appropriate size.
 //
 // This function must be called from inside of a lock.
 func (c *tagsCache) maintainLRU() {
 	max := c.maxLength
 	if max == 0 {
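touchOrReplaceTag and maintainLRU together implement the usual map-plus-container/list LRU: the map gives O(1) lookup while the list tracks recency. A minimal sketch of that shape, with plain strings standing in for the cached Arrow arrays:

	package main

	import (
		"container/list"
		"fmt"
	)

	func main() {
		lru := list.New()
		index := map[string]*list.Element{}

		for _, v := range []string{"a", "b", "c"} {
			index[v] = lru.PushFront(v)
		}

		// Touch "a": move it to the front, as touchOrReplaceTag does
		// with lru.MoveBefore(elem, lru.Front()).
		lru.MoveBefore(index["a"], lru.Front())

		// Evict from the back until the cache is within its size limit,
		// as maintainLRU does.
		const max = 2
		for lru.Len() > max {
			back := lru.Remove(lru.Back()).(string)
			delete(index, back)
		}

		for e := lru.Front(); e != nil; e = e.Next() {
			fmt.Println(e.Value) // a, then c
		}
	}

Because maintainLRU is now only ever called with the write lock already held, its own locking could be removed, as the next hunk shows.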
@@ -187,12 +249,15 @@ func (c *tagsCache) maintainLRU() {
 	}
 	arr := c.lru.Remove(c.lru.Back()).(*array.Binary)
 	value := arr.ValueString(0)
-	c.mu.Lock()
 	delete(c.tags, value)
-	c.mu.Unlock()
 	arr.Release()
 }

 // createTag will create a new array for a tag with the given
 // length.
 //
 // DO NOT CALL THIS METHOD IN A LOCK. It is slow and will probably
 // cause lock contention.
 func (c *tagsCache) createTag(value string, l int, mem memory.Allocator) *array.Binary {
 	b := array.NewBinaryBuilder(mem, arrow.BinaryTypes.String)
 	b.Resize(l)
@@ -205,6 +270,9 @@ func (c *tagsCache) createTag(value string, l int, mem memory.Allocator) *array.

 // Release will release all references to cached tag columns.
 func (c *tagsCache) Release() {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
 	if c.startColumn != nil {
 		c.startColumn.Release()
 		c.startColumn = nil
@@ -215,11 +283,9 @@ func (c *tagsCache) Release() {
 		c.stopColumn = nil
 	}

-	c.mu.Lock()
 	for _, elem := range c.tags {
 		elem.Value.(*array.Binary).Release()
 	}
 	c.tags = nil
-	c.mu.Unlock()
 	c.lru = nil
 }
@@ -0,0 +1,130 @@
+package storageflux
+
+import (
+	"fmt"
+	"math/rand"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/apache/arrow/go/arrow/memory"
+	"github.com/influxdata/flux/execute"
+)
+
+func TestTagsCache_GetBounds_Concurrency(t *testing.T) {
+	// Concurrently use the tags cache by retrieving
+	// a tag of random sizes and then iterating over the
+	// retrieved tag. The test should exceed the cache's
+	// size so we get values being evicted.
+	cache := newTagsCache(4)
+	bounds := execute.Bounds{
+		Start: execute.Time(time.Second),
+		Stop:  execute.Time(2 * time.Second),
+	}
+	mem := NewCheckedAllocator(memory.DefaultAllocator)
+	defer mem.AssertSize(t, 0)
+
+	var wg sync.WaitGroup
+	for i := 0; i < 32; i++ {
+		wg.Add(1)
+		go func(i int) {
+			defer wg.Done()
+
+			for j := 0; j < 128; j++ {
+				l := rand.Intn(128) + 1
+				start, stop := cache.GetBounds(bounds, l, mem)
+				for i := 0; i < l; i++ {
+					if want, got := int64(bounds.Start), start.Value(i); want != got {
+						t.Errorf("unexpected value in start array: %d != %d", want, got)
+						start.Release()
+						stop.Release()
+						return
+					}
+					if want, got := int64(bounds.Stop), stop.Value(i); want != got {
+						t.Errorf("unexpected value in stop array: %d != %d", want, got)
+						start.Release()
+						stop.Release()
+						return
+					}
+				}
+				start.Release()
+				stop.Release()
+			}
+		}(i)
+	}
+
+	wg.Wait()
+	cache.Release()
+}
+
+func TestTagsCache_GetTags_Concurrency(t *testing.T) {
+	// Concurrently use the tags cache by retrieving
+	// a tag of random sizes and then iterating over the
+	// retrieved tag. The test should exceed the cache's
+	// size so we get values being evicted.
+	cache := newTagsCache(4)
+	mem := NewCheckedAllocator(memory.DefaultAllocator)
+	defer mem.AssertSize(t, 0)
+
+	var wg sync.WaitGroup
+	for i := 0; i < 32; i++ {
+		wg.Add(1)
+		go func(i int) {
+			defer wg.Done()
+
+			// Cardinality of 8 so it exceeds the cache size
+			// but we also reuse tags across different goroutines.
+			value := fmt.Sprintf("t%d", i%8)
+			for j := 0; j < 128; j++ {
+				l := rand.Intn(128) + 1
+				vs := cache.GetTag(value, l, mem)
+				for i := 0; i < l; i++ {
+					if want, got := value, vs.ValueString(i); want != got {
+						t.Errorf("unexpected value in array: %s != %s", want, got)
+						vs.Release()
+						return
+					}
+				}
+				vs.Release()
+			}
+		}(i)
+	}
+
+	wg.Wait()
+	cache.Release()
+}
+
+type CheckedAllocator struct {
+	mem *memory.CheckedAllocator
+	mu  sync.Mutex
+}
+
+func NewCheckedAllocator(mem memory.Allocator) *CheckedAllocator {
+	return &CheckedAllocator{
+		mem: memory.NewCheckedAllocator(mem),
+	}
+}
+
+func (c *CheckedAllocator) Allocate(size int) []byte {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	return c.mem.Allocate(size)
+}
+
+func (c *CheckedAllocator) Reallocate(size int, b []byte) []byte {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	return c.mem.Reallocate(size, b)
+}
+
+func (c *CheckedAllocator) Free(b []byte) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.mem.Free(b)
+}
+
+func (c *CheckedAllocator) AssertSize(t memory.TestingT, sz int) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.mem.AssertSize(t, sz)
+}
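One note on the test helper: the mutex-guarded CheckedAllocator wrapper exists presumably because arrow's memory.CheckedAllocator does not itself synchronize concurrent calls, so the tests serialize every Allocate, Reallocate, and Free before sharing the allocator across 32 goroutines while still getting its leak accounting via AssertSize. These tests are most useful under the race detector (go test -race), which is what surfaces the kind of unsynchronized LRU access this commit fixes.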