From 873189e0c23841323090dc74c8de96d2369abada Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Wed, 26 Oct 2016 11:01:44 -0600 Subject: [PATCH] Fix panic: interface conversion: tsm1.Value is *tsm1.FloatValue, not *tsm1.StringValue If concurrent writes to the same shard occur, it's possible for different types to be added to the cache for the same series. The way the measurementFields map on the shard is updated is racy in this scenario which would normally prevent this from occurring. When this occurs, the snapshot compaction panics because it can't encode different types in the same series. To prevent this, we have the cache return an error a different type is added to existing values in the cache. Fixes #7498 --- tsdb/engine/tsm1/cache.go | 78 ++++++++++++++++++++++++++++------ tsdb/engine/tsm1/cache_test.go | 43 +++++++++++++++++++ tsdb/shard_test.go | 75 ++++++++++++++++++++++++++++++++ 3 files changed, 183 insertions(+), 13 deletions(-) diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go index cd267a09ca..d511b33bd5 100644 --- a/tsdb/engine/tsm1/cache.go +++ b/tsdb/engine/tsm1/cache.go @@ -12,6 +12,7 @@ import ( "time" "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/tsdb" ) var ( @@ -34,24 +35,35 @@ func newEntry() *entry { } } -// newEntryValues returns a new instance of entry with the given values -func newEntryValues(values []Value) *entry { +// newEntryValues returns a new instance of entry with the given values. If the +// values are not valid, an error is returned. +func newEntryValues(values []Value) (*entry, error) { e := &entry{values: values} + // No values, don't check types and ordering + if len(values) == 0 { + return e, nil + } + var prevTime int64 + et := valueType(values[0]) for _, v := range values { if v.UnixNano() <= prevTime { e.needSort = true - break } prevTime = v.UnixNano() + + // Make sure all the values are the same type + if et != valueType(v) { + return nil, tsdb.ErrFieldTypeConflict + } } - return e + return e, nil } // add adds the given values to the entry. -func (e *entry) add(values []Value) { +func (e *entry) add(values []Value) error { // See if the new values are sorted or contain duplicate timestamps var ( prevTime int64 @@ -75,6 +87,14 @@ func (e *entry) add(values []Value) { if len(e.values) == 0 { e.values = values } else { + // Make sure the new values are the same type as the exiting values + et := valueType(e.values[0]) + for _, v := range values { + if et != valueType(v) { + e.mu.Unlock() + return tsdb.ErrFieldTypeConflict + } + } l := len(e.values) lastValTime := e.values[l-1].UnixNano() if lastValTime >= values[0].UnixNano() { @@ -83,6 +103,7 @@ func (e *entry) add(values []Value) { e.values = append(e.values, values...) } e.mu.Unlock() + return nil } // deduplicate sorts and orders the entry's values. If values are already deduped and @@ -219,7 +240,10 @@ func (c *Cache) Write(key string, values []Value) error { return ErrCacheMemoryExceeded } - c.write(key, values) + if err := c.write(key, values); err != nil { + c.mu.Unlock() + return err + } c.size += addedSize c.mu.Unlock() @@ -231,7 +255,9 @@ func (c *Cache) Write(key string, values []Value) error { } // WriteMulti writes the map of keys and associated values to the cache. This function is goroutine-safe. -// It returns an error if the cache will exceeded its max size by adding the new values. +// It returns an error if the cache will exceeded its max size by adding the new values. The write attempts +// to write as many values as possible. If one key fails, the others can still succeed and an error will +// be returned. func (c *Cache) WriteMulti(values map[string][]Value) error { var totalSz uint64 for _, v := range values { @@ -246,8 +272,14 @@ func (c *Cache) WriteMulti(values map[string][]Value) error { return ErrCacheMemoryExceeded } + var werr error for k, v := range values { - c.write(k, v) + if err := c.write(k, v); err != nil { + // write failed, hold onto the error and adjust + // the size delta + werr = err + totalSz -= uint64(Values(v).Size()) + } } c.size += totalSz c.mu.Unlock() @@ -256,7 +288,7 @@ func (c *Cache) WriteMulti(values map[string][]Value) error { c.updateMemSize(int64(totalSz)) atomic.AddInt64(&c.stats.WriteOK, 1) - return nil + return werr } // Snapshot will take a snapshot of the current cache, add it to the slice of caches that @@ -491,13 +523,18 @@ func (c *Cache) values(key string) Values { // write writes the set of values for the key to the cache. This function assumes // the lock has been taken and does not enforce the cache size limits. -func (c *Cache) write(key string, values []Value) { +func (c *Cache) write(key string, values []Value) error { e, ok := c.store[key] if !ok { - c.store[key] = newEntryValues(values) - return + var err error + e, err = newEntryValues(values) + if err != nil { + return err + } + c.store[key] = e + return nil } - e.add(values) + return e.add(values) } func (c *Cache) entry(key string) *entry { @@ -624,6 +661,21 @@ func (c *Cache) updateMemSize(b int64) { atomic.AddInt64(&c.stats.MemSizeBytes, b) } +func valueType(v Value) int { + switch v.(type) { + case *FloatValue: + return 1 + case *IntegerValue: + return 2 + case *StringValue: + return 3 + case *BooleanValue: + return 4 + default: + return 0 + } +} + // Update the snapshotsCount and the diskSize levels func (c *Cache) updateSnapshots() { // Update disk stats diff --git a/tsdb/engine/tsm1/cache_test.go b/tsdb/engine/tsm1/cache_test.go index 87d9cac808..7f7c5fafad 100644 --- a/tsdb/engine/tsm1/cache_test.go +++ b/tsdb/engine/tsm1/cache_test.go @@ -56,6 +56,27 @@ func TestCache_CacheWrite(t *testing.T) { } } +func TestCache_CacheWrite_TypeConflict(t *testing.T) { + v0 := NewValue(1, 1.0) + v1 := NewValue(2, int(64)) + values := Values{v0, v1} + valuesSize := v0.Size() + v1.Size() + + c := NewCache(uint64(2*valuesSize), "") + + if err := c.Write("foo", values[:1]); err != nil { + t.Fatalf("failed to write key foo to cache: %s", err.Error()) + } + + if err := c.Write("foo", values[1:]); err == nil { + t.Fatalf("expected field type conflict") + } + + if exp, got := uint64(v0.Size()), c.Size(); exp != got { + t.Fatalf("cache size incorrect after 2 writes, exp %d, got %d", exp, got) + } +} + func TestCache_CacheWriteMulti(t *testing.T) { v0 := NewValue(1, 1.0) v1 := NewValue(2, 2.0) @@ -77,6 +98,28 @@ func TestCache_CacheWriteMulti(t *testing.T) { } } +func TestCache_CacheWriteMulti_TypeConflict(t *testing.T) { + v0 := NewValue(1, 1.0) + v1 := NewValue(2, 2.0) + v2 := NewValue(3, int64(3)) + values := Values{v0, v1, v2} + valuesSize := uint64(v0.Size() + v1.Size() + v2.Size()) + + c := NewCache(3*valuesSize, "") + + if err := c.WriteMulti(map[string][]Value{"foo": values[:1], "bar": values[1:]}); err == nil { + t.Fatalf(" expected field type conflict") + } + + if exp, got := uint64(v0.Size()), c.Size(); exp != got { + t.Fatalf("cache size incorrect after 2 writes, exp %d, got %d", exp, got) + } + + if exp, keys := []string{"foo"}, c.Keys(); !reflect.DeepEqual(keys, exp) { + t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys) + } +} + func TestCache_Cache_DeleteRange(t *testing.T) { v0 := NewValue(1, 1.0) v1 := NewValue(2, 2.0) diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index 02bf7f2b29..5724e3e45f 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -9,6 +9,7 @@ import ( "path/filepath" "reflect" "strings" + "sync" "testing" "time" @@ -361,6 +362,80 @@ func TestShardWriteAddNewField(t *testing.T) { } } +// Tests concurrently writing to the same shard with different field types which +// can trigger a panic when the shard is snapshotted to TSM files. +func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) { + if testing.Short() { + t.Skip() + } + tmpDir, _ := ioutil.TempDir("", "shard_test") + defer os.RemoveAll(tmpDir) + tmpShard := path.Join(tmpDir, "shard") + tmpWal := path.Join(tmpDir, "wal") + + index := tsdb.NewDatabaseIndex("db") + opts := tsdb.NewEngineOptions() + opts.Config.WALDir = filepath.Join(tmpDir, "wal") + + sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts) + if err := sh.Open(); err != nil { + t.Fatalf("error opening shard: %s", err.Error()) + } + defer sh.Close() + + points := make([]models.Point, 0, 1000) + for i := 0; i < cap(points); i++ { + if i < 500 { + points = append(points, models.MustNewPoint( + "cpu", + models.NewTags(map[string]string{"host": "server"}), + map[string]interface{}{"value": 1.0}, + time.Unix(int64(i), 0), + )) + } else { + points = append(points, models.MustNewPoint( + "cpu", + models.NewTags(map[string]string{"host": "server"}), + map[string]interface{}{"value": int64(1)}, + time.Unix(int64(i), 0), + )) + } + } + + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + for i := 0; i < 50; i++ { + if err := sh.DeleteMeasurement("cpu", []string{"cpu,host=server"}); err != nil { + t.Fatalf(err.Error()) + } + + _ = sh.WritePoints(points[:500]) + if f, err := sh.CreateSnapshot(); err == nil { + os.RemoveAll(f) + } + + } + }() + + go func() { + defer wg.Done() + for i := 0; i < 50; i++ { + if err := sh.DeleteMeasurement("cpu", []string{"cpu,host=server"}); err != nil { + t.Fatalf(err.Error()) + } + + _ = sh.WritePoints(points[500:]) + if f, err := sh.CreateSnapshot(); err == nil { + os.RemoveAll(f) + } + } + }() + + wg.Wait() +} + // Ensures that when a shard is closed, it removes any series meta-data // from the index. func TestShard_Close_RemoveIndex(t *testing.T) {