Fix panic: interface conversion: tsm1.Value is *tsm1.FloatValue, not *tsm1.StringValue

If concurrent writes to the same shard occur, it's possible for values of different types to
be added to the cache for the same series. The measurementFields map on the shard would
normally prevent this, but the way it is updated is racy in this scenario. When it happens,
the snapshot compaction panics because it cannot encode different types in the same series.

To prevent this, we have the cache return an error when a value of a different type is added
to the existing values in the cache.

Fixes #7498
pull/7533/head
Jason Wilder 2016-10-26 11:01:44 -06:00
parent e388912b6c
commit 873189e0c2
3 changed files with 183 additions and 13 deletions
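
For illustration, a minimal sketch of the behavior this change introduces, assuming the tsm1 and tsdb package APIs shown in the diffs below (the cache size and series key here are arbitrary): writing a value of a different type to a series that already holds another type now fails with tsdb.ErrFieldTypeConflict instead of panicking later during snapshot compaction.

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/tsdb"
	"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)

func main() {
	// 1 MB cache with no snapshot path (sketch values only).
	c := tsm1.NewCache(1024*1024, "")

	// The first write establishes the series "foo" as float.
	if err := c.Write("foo", []tsm1.Value{tsm1.NewValue(1, 1.0)}); err != nil {
		fmt.Println("unexpected error:", err)
		return
	}

	// A later write with an integer value for the same series now fails up
	// front with tsdb.ErrFieldTypeConflict rather than corrupting the cache.
	if err := c.Write("foo", []tsm1.Value{tsm1.NewValue(2, int64(2))}); err == tsdb.ErrFieldTypeConflict {
		fmt.Println("got expected error:", err)
	}
}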

View File

@@ -12,6 +12,7 @@ import (
"time"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
)
var (
@@ -34,24 +35,35 @@ func newEntry() *entry {
}
}
// newEntryValues returns a new instance of entry with the given values
func newEntryValues(values []Value) *entry {
// newEntryValues returns a new instance of entry with the given values. If the
// values are not valid, an error is returned.
func newEntryValues(values []Value) (*entry, error) {
e := &entry{values: values}
// No values, don't check types and ordering
if len(values) == 0 {
return e, nil
}
var prevTime int64
et := valueType(values[0])
for _, v := range values {
if v.UnixNano() <= prevTime {
e.needSort = true
break
}
prevTime = v.UnixNano()
// Make sure all the values are the same type
if et != valueType(v) {
return nil, tsdb.ErrFieldTypeConflict
}
}
return e
return e, nil
}
// add adds the given values to the entry.
func (e *entry) add(values []Value) {
func (e *entry) add(values []Value) error {
// See if the new values are sorted or contain duplicate timestamps
var (
prevTime int64
@@ -75,6 +87,14 @@ func (e *entry) add(values []Value) {
if len(e.values) == 0 {
e.values = values
} else {
// Make sure the new values are the same type as the existing values
et := valueType(e.values[0])
for _, v := range values {
if et != valueType(v) {
e.mu.Unlock()
return tsdb.ErrFieldTypeConflict
}
}
l := len(e.values)
lastValTime := e.values[l-1].UnixNano()
if lastValTime >= values[0].UnixNano() {
@@ -83,6 +103,7 @@ func (e *entry) add(values []Value) {
e.values = append(e.values, values...)
}
e.mu.Unlock()
return nil
}
// deduplicate sorts and orders the entry's values. If values are already deduped and
@@ -219,7 +240,10 @@ func (c *Cache) Write(key string, values []Value) error {
return ErrCacheMemoryExceeded
}
c.write(key, values)
if err := c.write(key, values); err != nil {
c.mu.Unlock()
return err
}
c.size += addedSize
c.mu.Unlock()
@@ -231,7 +255,9 @@ func (c *Cache) Write(key string, values []Value) error {
}
// WriteMulti writes the map of keys and associated values to the cache. This function is goroutine-safe.
// It returns an error if the cache would exceed its max size by adding the new values.
// It returns an error if the cache would exceed its max size by adding the new values. The write attempts
// to write as many values as possible. If one key fails, the others can still succeed and an error will
// be returned.
func (c *Cache) WriteMulti(values map[string][]Value) error {
var totalSz uint64
for _, v := range values {
@@ -246,8 +272,14 @@ func (c *Cache) WriteMulti(values map[string][]Value) error {
return ErrCacheMemoryExceeded
}
var werr error
for k, v := range values {
c.write(k, v)
if err := c.write(k, v); err != nil {
// write failed, hold onto the error and adjust
// the size delta
werr = err
totalSz -= uint64(Values(v).Size())
}
}
c.size += totalSz
c.mu.Unlock()
@@ -256,7 +288,7 @@ func (c *Cache) WriteMulti(values map[string][]Value) error {
c.updateMemSize(int64(totalSz))
atomic.AddInt64(&c.stats.WriteOK, 1)
return nil
return werr
}
// Snapshot will take a snapshot of the current cache, add it to the slice of caches that
@@ -491,13 +523,18 @@ func (c *Cache) values(key string) Values {
// write writes the set of values for the key to the cache. This function assumes
// the lock has been taken and does not enforce the cache size limits.
func (c *Cache) write(key string, values []Value) {
func (c *Cache) write(key string, values []Value) error {
e, ok := c.store[key]
if !ok {
c.store[key] = newEntryValues(values)
return
var err error
e, err = newEntryValues(values)
if err != nil {
return err
}
c.store[key] = e
return nil
}
e.add(values)
return e.add(values)
}
func (c *Cache) entry(key string) *entry {
@@ -624,6 +661,21 @@ func (c *Cache) updateMemSize(b int64) {
atomic.AddInt64(&c.stats.MemSizeBytes, b)
}
func valueType(v Value) int {
switch v.(type) {
case *FloatValue:
return 1
case *IntegerValue:
return 2
case *StringValue:
return 3
case *BooleanValue:
return 4
default:
return 0
}
}
// Update the snapshotsCount and the diskSize levels
func (c *Cache) updateSnapshots() {
// Update disk stats
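
The WriteMulti change above gives the cache partial-write semantics: a key whose values conflict with the existing type is rejected and its size removed from the accounting, the remaining keys are still written, and the error is returned to the caller. A rough sketch of that behavior, again assuming the tsm1 API from this commit:

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)

func main() {
	c := tsm1.NewCache(1024*1024, "")

	// "foo" already holds float values.
	if err := c.Write("foo", []tsm1.Value{tsm1.NewValue(1, 1.0)}); err != nil {
		fmt.Println("unexpected error:", err)
		return
	}

	err := c.WriteMulti(map[string][]tsm1.Value{
		"foo": {tsm1.NewValue(2, int64(2))}, // conflicts with the existing float values, rejected
		"bar": {tsm1.NewValue(2, 2.0)},      // no conflict, still written
	})

	// err is tsdb.ErrFieldTypeConflict, c.Keys() now contains both "foo" and
	// "bar", and the cache size only accounts for the values that were accepted.
	fmt.Println("WriteMulti returned:", err, "keys:", c.Keys())
}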

View File

@@ -56,6 +56,27 @@ func TestCache_CacheWrite(t *testing.T) {
}
}
func TestCache_CacheWrite_TypeConflict(t *testing.T) {
v0 := NewValue(1, 1.0)
v1 := NewValue(2, int(64))
values := Values{v0, v1}
valuesSize := v0.Size() + v1.Size()
c := NewCache(uint64(2*valuesSize), "")
if err := c.Write("foo", values[:1]); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
}
if err := c.Write("foo", values[1:]); err == nil {
t.Fatalf("expected field type conflict")
}
if exp, got := uint64(v0.Size()), c.Size(); exp != got {
t.Fatalf("cache size incorrect after 2 writes, exp %d, got %d", exp, got)
}
}
func TestCache_CacheWriteMulti(t *testing.T) {
v0 := NewValue(1, 1.0)
v1 := NewValue(2, 2.0)
@@ -77,6 +98,28 @@ func TestCache_CacheWriteMulti(t *testing.T) {
}
}
func TestCache_CacheWriteMulti_TypeConflict(t *testing.T) {
v0 := NewValue(1, 1.0)
v1 := NewValue(2, 2.0)
v2 := NewValue(3, int64(3))
values := Values{v0, v1, v2}
valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())
c := NewCache(3*valuesSize, "")
if err := c.WriteMulti(map[string][]Value{"foo": values[:1], "bar": values[1:]}); err == nil {
t.Fatalf(" expected field type conflict")
}
if exp, got := uint64(v0.Size()), c.Size(); exp != got {
t.Fatalf("cache size incorrect after 2 writes, exp %d, got %d", exp, got)
}
if exp, keys := []string{"foo"}, c.Keys(); !reflect.DeepEqual(keys, exp) {
t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys)
}
}
func TestCache_Cache_DeleteRange(t *testing.T) {
v0 := NewValue(1, 1.0)
v1 := NewValue(2, 2.0)

View File

@@ -9,6 +9,7 @@ import (
"path/filepath"
"reflect"
"strings"
"sync"
"testing"
"time"
@@ -361,6 +362,80 @@ func TestShardWriteAddNewField(t *testing.T) {
}
}
// Tests concurrently writing to the same shard with different field types, which
// can trigger a panic when the shard is snapshotted to TSM files.
func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) {
if testing.Short() {
t.Skip()
}
tmpDir, _ := ioutil.TempDir("", "shard_test")
defer os.RemoveAll(tmpDir)
tmpShard := path.Join(tmpDir, "shard")
tmpWal := path.Join(tmpDir, "wal")
index := tsdb.NewDatabaseIndex("db")
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error opening shard: %s", err.Error())
}
defer sh.Close()
points := make([]models.Point, 0, 1000)
for i := 0; i < cap(points); i++ {
if i < 500 {
points = append(points, models.MustNewPoint(
"cpu",
models.NewTags(map[string]string{"host": "server"}),
map[string]interface{}{"value": 1.0},
time.Unix(int64(i), 0),
))
} else {
points = append(points, models.MustNewPoint(
"cpu",
models.NewTags(map[string]string{"host": "server"}),
map[string]interface{}{"value": int64(1)},
time.Unix(int64(i), 0),
))
}
}
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
for i := 0; i < 50; i++ {
if err := sh.DeleteMeasurement("cpu", []string{"cpu,host=server"}); err != nil {
t.Fatalf(err.Error())
}
_ = sh.WritePoints(points[:500])
if f, err := sh.CreateSnapshot(); err == nil {
os.RemoveAll(f)
}
}
}()
go func() {
defer wg.Done()
for i := 0; i < 50; i++ {
if err := sh.DeleteMeasurement("cpu", []string{"cpu,host=server"}); err != nil {
t.Fatalf(err.Error())
}
_ = sh.WritePoints(points[500:])
if f, err := sh.CreateSnapshot(); err == nil {
os.RemoveAll(f)
}
}
}()
wg.Wait()
}
// Ensures that when a shard is closed, it removes any series meta-data
// from the index.
func TestShard_Close_RemoveIndex(t *testing.T) {