Merge pull request #7533 from influxdata/jw-7498

Fix panic: interface conversion: tsm1.Value is *tsm1.FloatValue, not *tsm1.StringValue
pull/7538/head
Jason Wilder 2016-10-28 12:53:08 -06:00 committed by GitHub
commit bcb48a8ff2
4 changed files with 201 additions and 18 deletions
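For context, the panic in the title is a failed Go type assertion: once a series key holds cached values of mixed types, code that later assumes a single concrete value type blows up. A minimal, hypothetical sketch of that failure mode (illustrative only, not code from this commit), using the exported tsm1 API:

package main

import "github.com/influxdata/influxdb/tsdb/engine/tsm1"

func main() {
    // A value written as a float is stored as a *tsm1.FloatValue behind the
    // tsm1.Value interface.
    var v tsm1.Value = tsm1.NewValue(1, 1.0)

    // Code that later assumes the field is a string does an assertion like
    // this one, which panics with the message in the title:
    // interface conversion: tsm1.Value is *tsm1.FloatValue, not *tsm1.StringValue
    _ = v.(*tsm1.StringValue)
}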


@@ -12,6 +12,7 @@ import (
"time"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
)
var (
@@ -34,24 +35,35 @@ func newEntry() *entry {
}
}
// newEntryValues returns a new instance of entry with the given values
func newEntryValues(values []Value) *entry {
// newEntryValues returns a new instance of entry with the given values. If the
// values are not valid, an error is returned.
func newEntryValues(values []Value) (*entry, error) {
e := &entry{values: values}
// No values, don't check types and ordering
if len(values) == 0 {
return e, nil
}
var prevTime int64
et := valueType(values[0])
for _, v := range values {
if v.UnixNano() <= prevTime {
e.needSort = true
break
}
prevTime = v.UnixNano()
// Make sure all the values are the same type
if et != valueType(v) {
return nil, tsdb.ErrFieldTypeConflict
}
}
return e
return e, nil
}
// add adds the given values to the entry.
func (e *entry) add(values []Value) {
func (e *entry) add(values []Value) error {
// See if the new values are sorted or contain duplicate timestamps
var (
prevTime int64
@@ -75,6 +87,14 @@ func (e *entry) add(values []Value) {
if len(e.values) == 0 {
e.values = values
} else {
// Make sure the new values are the same type as the existing values
et := valueType(e.values[0])
for _, v := range values {
if et != valueType(v) {
e.mu.Unlock()
return tsdb.ErrFieldTypeConflict
}
}
l := len(e.values)
lastValTime := e.values[l-1].UnixNano()
if lastValTime >= values[0].UnixNano() {
@@ -83,6 +103,7 @@ func (e *entry) add(values []Value) {
e.values = append(e.values, values...)
}
e.mu.Unlock()
return nil
}
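To make the new entry-level contract concrete, here is an illustrative test in the style of the cache tests further down; it is not part of this change and assumes it lives in the tsm1 package's cache_test.go (so it can call the unexported newEntryValues and entry.add) with the tsdb package imported:

func TestEntry_TypeConflict_Sketch(t *testing.T) {
    // Creating an entry from values of a single type succeeds.
    e, err := newEntryValues([]Value{NewValue(1, 1.0)})
    if err != nil {
        t.Fatalf("unexpected error: %v", err)
    }

    // Appending a value of a different type is now rejected with
    // tsdb.ErrFieldTypeConflict instead of silently creating a mixed-type entry.
    if err := e.add([]Value{NewValue(2, "a string")}); err != tsdb.ErrFieldTypeConflict {
        t.Fatalf("expected tsdb.ErrFieldTypeConflict, got %v", err)
    }
}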
// deduplicate sorts and orders the entry's values. If values are already deduped and
@@ -135,8 +156,9 @@ const (
statCachedBytes = "cachedBytes" // counter: Total number of bytes written into snapshots.
statWALCompactionTimeMs = "WALCompactionTimeMs" // counter: Total number of milliseconds spent compacting snapshots
writeOK = "writeOk"
writeErr = "writeErr"
statCacheWriteOK = "writeOk"
statCacheWriteErr = "writeErr"
statCacheWriteDropped = "writeDropped"
)
// Cache maintains an in-memory store of Values for a set of keys.
@@ -188,6 +210,7 @@ type CacheStatistics struct {
WALCompactionTimeMs int64
WriteOK int64
WriteErr int64
WriteDropped int64
}
// Statistics returns statistics for periodic monitoring.
@@ -202,6 +225,9 @@ func (c *Cache) Statistics(tags map[string]string) []models.Statistic {
statCacheAgeMs: atomic.LoadInt64(&c.stats.CacheAgeMs),
statCachedBytes: atomic.LoadInt64(&c.stats.CachedBytes),
statWALCompactionTimeMs: atomic.LoadInt64(&c.stats.WALCompactionTimeMs),
statCacheWriteOK: atomic.LoadInt64(&c.stats.WriteOK),
statCacheWriteErr: atomic.LoadInt64(&c.stats.WriteErr),
statCacheWriteDropped: atomic.LoadInt64(&c.stats.WriteDropped),
},
}}
}
@@ -219,7 +245,11 @@ func (c *Cache) Write(key string, values []Value) error {
return ErrCacheMemoryExceeded
}
c.write(key, values)
if err := c.write(key, values); err != nil {
c.mu.Unlock()
atomic.AddInt64(&c.stats.WriteErr, 1)
return err
}
c.size += addedSize
c.mu.Unlock()
@@ -231,7 +261,9 @@ func (c *Cache) Write(key string, values []Value) error {
}
// WriteMulti writes the map of keys and associated values to the cache. This function is goroutine-safe.
// It returns an error if the cache will exceed its max size by adding the new values.
// It returns an error if the cache will exceed its max size by adding the new values. The write attempts
// to write as many values as possible. If one key fails, the others can still succeed and an error will
// be returned.
func (c *Cache) WriteMulti(values map[string][]Value) error {
var totalSz uint64
for _, v := range values {
@@ -246,17 +278,30 @@ func (c *Cache) WriteMulti(values map[string][]Value) error {
return ErrCacheMemoryExceeded
}
var werr error
for k, v := range values {
c.write(k, v)
if err := c.write(k, v); err != nil {
// write failed, hold onto the error and adjust
// the size delta
werr = err
totalSz -= uint64(Values(v).Size())
}
}
c.size += totalSz
c.mu.Unlock()
// Some points in the batch were dropped. An error is returned so
// error stat is incremented as well.
if werr != nil {
atomic.AddInt64(&c.stats.WriteDropped, 1)
atomic.AddInt64(&c.stats.WriteErr, 1)
}
// Update the memory size stat
c.updateMemSize(int64(totalSz))
atomic.AddInt64(&c.stats.WriteOK, 1)
return nil
return werr
}
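As a usage sketch of the partial-write behaviour documented on WriteMulti above (illustrative only, not part of the commit; the keys and sizes are made up):

package main

import (
    "fmt"

    "github.com/influxdata/influxdb/tsdb"
    "github.com/influxdata/influxdb/tsdb/engine/tsm1"
)

func main() {
    c := tsm1.NewCache(1024*1024, "")

    // First batch: both keys are written as floats.
    _ = c.WriteMulti(map[string][]tsm1.Value{
        "foo": {tsm1.NewValue(1, 1.0)},
        "bar": {tsm1.NewValue(1, 2.0)},
    })

    // Second batch: "foo" now conflicts (integer vs. existing float), "bar" does not.
    err := c.WriteMulti(map[string][]tsm1.Value{
        "foo": {tsm1.NewValue(2, int64(2))},
        "bar": {tsm1.NewValue(2, 3.0)},
    })

    // The conflicting key is dropped and reported, but the write to "bar"
    // still succeeded and the cache size only accounts for what was stored.
    fmt.Println(err == tsdb.ErrFieldTypeConflict) // true
    fmt.Println(c.Keys())                         // both keys are still present
}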
// Snapshot will take a snapshot of the current cache, add it to the slice of caches that
@@ -491,13 +536,18 @@ func (c *Cache) values(key string) Values {
// write writes the set of values for the key to the cache. This function assumes
// the lock has been taken and does not enforce the cache size limits.
func (c *Cache) write(key string, values []Value) {
func (c *Cache) write(key string, values []Value) error {
e, ok := c.store[key]
if !ok {
c.store[key] = newEntryValues(values)
return
var err error
e, err = newEntryValues(values)
if err != nil {
return err
}
c.store[key] = e
return nil
}
e.add(values)
return e.add(values)
}
func (c *Cache) entry(key string) *entry {
@@ -624,6 +674,21 @@ func (c *Cache) updateMemSize(b int64) {
atomic.AddInt64(&c.stats.MemSizeBytes, b)
}
func valueType(v Value) int {
switch v.(type) {
case *FloatValue:
return 1
case *IntegerValue:
return 2
case *StringValue:
return 3
case *BooleanValue:
return 4
default:
return 0
}
}
// Update the snapshotsCount and the diskSize levels
func (c *Cache) updateSnapshots() {
// Update disk stats


@@ -56,6 +56,27 @@ func TestCache_CacheWrite(t *testing.T) {
}
}
func TestCache_CacheWrite_TypeConflict(t *testing.T) {
v0 := NewValue(1, 1.0)
v1 := NewValue(2, int(64))
values := Values{v0, v1}
valuesSize := v0.Size() + v1.Size()
c := NewCache(uint64(2*valuesSize), "")
if err := c.Write("foo", values[:1]); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
}
if err := c.Write("foo", values[1:]); err == nil {
t.Fatalf("expected field type conflict")
}
if exp, got := uint64(v0.Size()), c.Size(); exp != got {
t.Fatalf("cache size incorrect after 2 writes, exp %d, got %d", exp, got)
}
}
func TestCache_CacheWriteMulti(t *testing.T) {
v0 := NewValue(1, 1.0)
v1 := NewValue(2, 2.0)
@@ -77,6 +98,28 @@ func TestCache_CacheWriteMulti(t *testing.T) {
}
}
func TestCache_CacheWriteMulti_TypeConflict(t *testing.T) {
v0 := NewValue(1, 1.0)
v1 := NewValue(2, 2.0)
v2 := NewValue(3, int64(3))
values := Values{v0, v1, v2}
valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())
c := NewCache(3*valuesSize, "")
if err := c.WriteMulti(map[string][]Value{"foo": values[:1], "bar": values[1:]}); err == nil {
t.Fatalf(" expected field type conflict")
}
if exp, got := uint64(v0.Size()), c.Size(); exp != got {
t.Fatalf("cache size incorrect after 2 writes, exp %d, got %d", exp, got)
}
if exp, keys := []string{"foo"}, c.Keys(); !reflect.DeepEqual(keys, exp) {
t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys)
}
}
func TestCache_Cache_DeleteRange(t *testing.T) {
v0 := NewValue(1, 1.0)
v1 := NewValue(2, 2.0)


@@ -261,7 +261,7 @@ func (f *FileStore) Add(files ...TSMFile) {
for _, file := range files {
atomic.AddInt64(&f.stats.DiskBytes, int64(file.Size()))
}
f.lastFileStats = f.lastFileStats[:0] // Will need to be recalculated on next call to Stats.
f.lastFileStats = nil
f.files = append(f.files, files...)
sort.Sort(tsmReaders(f.files))
atomic.StoreInt64(&f.stats.FileCount, int64(len(f.files)))
@@ -289,7 +289,7 @@ func (f *FileStore) Remove(paths ...string) {
atomic.AddInt64(&f.stats.DiskBytes, -int64(file.Size()))
}
}
f.lastFileStats = f.lastFileStats[:0] // Will need to be recalculated on next call to Stats.
f.lastFileStats = nil
f.files = active
sort.Sort(tsmReaders(f.files))
atomic.StoreInt64(&f.stats.FileCount, int64(len(f.files)))
@@ -618,7 +618,7 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error {
// Tell the purger about our in-use files we need to remove
f.purger.add(inuse)
f.lastFileStats = f.lastFileStats[:0] // Will need to be recalculated on next call to Stats.
f.lastFileStats = nil
f.files = active
sort.Sort(tsmReaders(f.files))
atomic.StoreInt64(&f.stats.FileCount, int64(len(f.files)))

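The FileStore hunks above swap the lastFileStats reset from f.lastFileStats[:0] to nil. Presumably the point is that re-slicing with [:0] keeps the old backing array, so rebuilding the cached stats with append can overwrite a slice that an earlier Stats() caller still holds, while nil forces a fresh allocation on the next rebuild. A standalone sketch of that hazard (plain ints standing in for the cached file stats):

package main

import "fmt"

func main() {
    // cached stands in for the FileStore's cached stats slice.
    cached := []int{1, 2, 3}

    // A caller keeps the slice it was handed, as a Stats() caller might.
    held := cached

    // Resetting with [:0] reuses the backing array, so rebuilding the cache
    // with append scribbles over the data the caller still holds.
    cached = cached[:0]
    cached = append(cached, 7, 8, 9)
    fmt.Println(held) // [7 8 9] - the caller's view was silently rewritten

    // Resetting to nil instead makes append allocate a new array, leaving
    // previously returned slices untouched on the next rebuild.
    cached = nil
    cached = append(cached, 4, 5, 6)
    fmt.Println(held) // still [7 8 9]
}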

@@ -9,6 +9,7 @@ import (
"path/filepath"
"reflect"
"strings"
"sync"
"testing"
"time"
@@ -361,6 +362,80 @@ func TestShardWriteAddNewField(t *testing.T) {
}
}
// Tests concurrently writing to the same shard with different field types which
// can trigger a panic when the shard is snapshotted to TSM files.
func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) {
if testing.Short() {
t.Skip()
}
tmpDir, _ := ioutil.TempDir("", "shard_test")
defer os.RemoveAll(tmpDir)
tmpShard := path.Join(tmpDir, "shard")
tmpWal := path.Join(tmpDir, "wal")
index := tsdb.NewDatabaseIndex("db")
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error opening shard: %s", err.Error())
}
defer sh.Close()
points := make([]models.Point, 0, 1000)
for i := 0; i < cap(points); i++ {
if i < 500 {
points = append(points, models.MustNewPoint(
"cpu",
models.NewTags(map[string]string{"host": "server"}),
map[string]interface{}{"value": 1.0},
time.Unix(int64(i), 0),
))
} else {
points = append(points, models.MustNewPoint(
"cpu",
models.NewTags(map[string]string{"host": "server"}),
map[string]interface{}{"value": int64(1)},
time.Unix(int64(i), 0),
))
}
}
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
for i := 0; i < 50; i++ {
if err := sh.DeleteMeasurement("cpu", []string{"cpu,host=server"}); err != nil {
t.Fatalf(err.Error())
}
_ = sh.WritePoints(points[:500])
if f, err := sh.CreateSnapshot(); err == nil {
os.RemoveAll(f)
}
}
}()
go func() {
defer wg.Done()
for i := 0; i < 50; i++ {
if err := sh.DeleteMeasurement("cpu", []string{"cpu,host=server"}); err != nil {
t.Fatalf(err.Error())
}
_ = sh.WritePoints(points[500:])
if f, err := sh.CreateSnapshot(); err == nil {
os.RemoveAll(f)
}
}
}()
wg.Wait()
}
// Ensures that when a shard is closed, it removes any series meta-data
// from the index.
func TestShard_Close_RemoveIndex(t *testing.T) {