test(tsdb/tsi1): test series id cache delete concurrently

Report the total number of gets, puts, and deletes at the end of the
test. I've found this kind of output to be a useful sanity check in
similar tests that exercise concurrency involving tasks.
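
(Illustration only, not code from this change: a minimal self-contained
sketch of that counting pattern, with a made-up test name. Each worker
bumps an atomic counter per operation, and the totals are logged once
every worker has stopped.)

    package example

    import (
        "sync"
        "sync/atomic"
        "testing"
    )

    func TestOpCounts(t *testing.T) {
        var ops int32
        var wg sync.WaitGroup
        for i := 0; i < 5; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                for j := 0; j < 1000; j++ {
                    atomic.AddInt32(&ops, 1) // one bump per simulated cache operation
                }
            }()
        }
        wg.Wait()
        t.Logf("total ops=%d", ops) // sanity check: expect 5 * 1000 = 5000
    }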

Use a local random source in each goroutine, so the goroutines don't
contend on the lock guarding the global source. Unscientific eyeballing
suggests that increases total operations by 5-10%.
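
(Again illustration only: a minimal sketch of the per-goroutine source
pattern; the worker function is made up. The package-level rand
functions serialize on one mutex-guarded source, while a local
*rand.Rand owned by a single goroutine needs no locking.)

    package example

    import "math/rand"

    func worker(done <-chan struct{}) {
        // Seed once from the (locked) global source, then draw from a
        // source this goroutine owns exclusively.
        rng := rand.New(rand.NewSource(rand.Int63()))
        for {
            select {
            case <-done:
                return
            default:
            }
            _ = rng.Intn(5) // no shared lock on this path
        }
    }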

Also call t.Parallel in a few more tests that involve disk access. This
shaves 1-2 seconds off the full tsi1 test suite on my machine.
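
(Sketch of that pattern, with a made-up test name: a test marked with
t.Parallel runs alongside the other parallel tests in its package, so
their waits on disk I/O overlap.)

    package example

    import "testing"

    func TestDiskHeavy(t *testing.T) {
        t.Parallel()
        // ... open a file on disk, write, read back, compare ...
    }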
pull/10616/head
Mark Rushakoff 2018-12-11 15:31:50 -08:00 committed by Mark Rushakoff
parent 5f781748c4
commit f383e8337a
3 changed files with 40 additions and 7 deletions


@@ -3,6 +3,7 @@ package tsi1
 import (
 	"math/rand"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
@@ -152,49 +153,70 @@ func TestTagValueSeriesIDCache_addToSet(t *testing.T) {
 	if !newSeriesIDSet(20).Equals(ss) {
 		t.Fatalf("series id set was %v", ss)
 	}
 }
 
-func TestTagValueSeriesIDCache_ConcurrentGetPut(t *testing.T) {
+func TestTagValueSeriesIDCache_ConcurrentGetPutDelete(t *testing.T) {
+	// Exercise concurrent operations against a series ID cache.
+	// This will catch any likely data races, when run with the race detector.
 	if testing.Short() {
 		t.Skip("Skipping long test")
 	}
 
-	a := []string{"a", "b", "c", "d", "e"}
-	rnd := func() []byte {
-		return []byte(a[rand.Intn(len(a)-1)])
+	t.Parallel()
+
+	const letters = "abcde"
+	rnd := func(rng *rand.Rand) []byte {
+		return []byte{letters[rng.Intn(len(letters)-1)]}
 	}
 
 	cache := TestCache{NewTagValueSeriesIDCache(100)}
 	done := make(chan struct{})
 	var wg sync.WaitGroup
 
+	var seriesIDCounter int32 // Atomic counter to ensure unique series IDs.
 	for i := 0; i < 5; i++ {
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
+			// Local rng to avoid lock contention.
+			rng := rand.New(rand.NewSource(rand.Int63()))
 			for {
 				select {
 				case <-done:
 					return
 				default:
 				}
-				cache.Put(rnd(), rnd(), rnd(), newSeriesIDSet())
+				nextID := int(atomic.AddInt32(&seriesIDCounter, 1))
+				cache.Put(rnd(rng), rnd(rng), rnd(rng), newSeriesIDSet(nextID))
 			}
 		}()
 	}
 
+	var gets, deletes int32
 	for i := 0; i < 5; i++ {
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
+			// Local rng to avoid lock contention.
+			rng := rand.New(rand.NewSource(rand.Int63()))
 			for {
 				select {
 				case <-done:
 					return
 				default:
 				}
-				_ = cache.Get(rnd(), rnd(), rnd())
+				name, key, value := rnd(rng), rnd(rng), rnd(rng)
+				if set := cache.Get(name, key, value); set != nil {
+					ids := set.Slice()
+					for _, id := range ids {
+						cache.Delete(name, key, value, tsdb.NewSeriesID(id))
+						atomic.AddInt32(&deletes, 1)
+					}
+				}
+				atomic.AddInt32(&gets, 1)
 			}
 		}()
 	}
@@ -202,6 +224,7 @@ func TestTagValueSeriesIDCache_ConcurrentGetPut(t *testing.T) {
 	time.Sleep(10 * time.Second)
 	close(done)
 	wg.Wait()
+	t.Logf("Concurrently executed against series ID cache: gets=%d puts=%d deletes=%d", gets, seriesIDCounter, deletes)
 }
 
 type TestCache struct {


@@ -23,6 +23,8 @@ import (
 // Ensure log file can append series.
 func TestLogFile_AddSeriesList(t *testing.T) {
+	t.Parallel()
+
 	sfile := MustOpenSeriesFile()
 	defer sfile.Close()
@@ -127,6 +129,8 @@ func TestLogFile_AddSeriesList(t *testing.T) {
 }
 
 func TestLogFile_SeriesStoredInOrder(t *testing.T) {
+	t.Parallel()
+
 	sfile := MustOpenSeriesFile()
 	defer sfile.Close()
@@ -189,6 +193,8 @@ func TestLogFile_SeriesStoredInOrder(t *testing.T) {
 // Ensure log file can delete an existing measurement.
 func TestLogFile_DeleteMeasurement(t *testing.T) {
+	t.Parallel()
+
 	sfile := MustOpenSeriesFile()
 	defer sfile.Close()
@@ -232,6 +238,8 @@ func TestLogFile_DeleteMeasurement(t *testing.T) {
 // Ensure log file can recover correctly.
 func TestLogFile_Open(t *testing.T) {
+	t.Parallel()
+
 	t.Run("Truncate", func(t *testing.T) {
 		sfile := MustOpenSeriesFile()
 		defer sfile.Close()


@@ -12,6 +12,8 @@ import (
 )
 
 func TestPartition_Open(t *testing.T) {
+	t.Parallel() // There's a bit of IO in this test.
+
 	sfile := MustOpenSeriesFile()
 	defer sfile.Close()