Merge pull request #8095 from influxdata/jw-cache-race2

Fix race in WALEntry.Encode and Values.Deduplicate
pull/8094/head
Jason Wilder 2017-03-06 10:12:44 -07:00 committed by GitHub
commit 80f9a29d7f
4 changed files with 155 additions and 5 deletions

View File

@ -20,7 +20,7 @@
- [#8078](https://github.com/influxdata/influxdb/issues/8078): Map types correctly when selecting a field with multiple measurements where one of the measurements is empty.
- [#8080](https://github.com/influxdata/influxdb/issues/8080): Point.UnmarshalBinary() bounds check
- [#8085](https://github.com/influxdata/influxdb/issues/8085): panic: interface conversion: tsm1.Value is tsm1.IntegerValue, not tsm1.FloatValue.
- [#8095](https://github.com/influxdata/influxdb/pull/8095): Fix race in WALEntry.Encode and Values.Deduplicate
## v1.2.0 [2017-01-24]

View File

@ -53,12 +53,12 @@ func newEntryValues(values []Value, hint int) (*entry, error) {
}
e := &entry{}
if len(values) >= hint {
e.values = values
if len(values) > hint {
e.values = make(Values, 0, len(values))
} else {
e.values = make(Values, 0, hint)
e.values = append(e.values, values...)
}
e.values = append(e.values, values...)
// No values, don't check types and ordering
if len(values) == 0 {

View File

@ -816,11 +816,15 @@ func (e *Engine) DeleteSeriesRange(seriesKeys []string, min, max int64) error {
// DeleteMeasurement deletes a measurement and all related series.
func (e *Engine) DeleteMeasurement(name string, seriesKeys []string) error {
if err := e.DeleteSeries(seriesKeys); err != nil {
return err
}
e.fieldsMu.Lock()
delete(e.measurementFields, name)
e.fieldsMu.Unlock()
return e.DeleteSeries(seriesKeys)
return nil
}
// SeriesCount returns the number of series buckets on the shard.

View File

@ -437,6 +437,152 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) {
wg.Wait()
}
func TestShard_WritePoints_FieldConflictConcurrentQuery(t *testing.T) {
if testing.Short() {
t.Skip()
}
tmpDir, _ := ioutil.TempDir("", "shard_test")
defer os.RemoveAll(tmpDir)
tmpShard := path.Join(tmpDir, "shard")
tmpWal := path.Join(tmpDir, "wal")
index := tsdb.NewDatabaseIndex("db")
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error opening shard: %s", err.Error())
}
defer sh.Close()
// Spin up two goroutines that write points with different field types in reverse
// order concurrently. After writing them, query them back.
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
// Write 250 floats and then ints to the same field
points := make([]models.Point, 0, 500)
for i := 0; i < cap(points); i++ {
if i < 250 {
points = append(points, models.MustNewPoint(
"cpu",
models.NewTags(map[string]string{"host": "server"}),
map[string]interface{}{"value": 1.0},
time.Unix(int64(i), 0),
))
} else {
points = append(points, models.MustNewPoint(
"cpu",
models.NewTags(map[string]string{"host": "server"}),
map[string]interface{}{"value": int64(1)},
time.Unix(int64(i), 0),
))
}
}
for i := 0; i < 500; i++ {
if err := sh.DeleteMeasurement("cpu", []string{"cpu,host=server"}); err != nil {
t.Fatalf(err.Error())
}
sh.WritePoints(points)
iter, err := sh.CreateIterator("cpu", influxql.IteratorOptions{
Expr: influxql.MustParseExpr(`value`),
Aux: []influxql.VarRef{{Val: "value"}},
Dimensions: []string{},
Ascending: true,
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
})
if err != nil {
t.Fatalf(err.Error())
}
switch itr := iter.(type) {
case influxql.IntegerIterator:
p, err := itr.Next()
for p != nil && err == nil {
p, err = itr.Next()
}
iter.Close()
case influxql.FloatIterator:
p, err := itr.Next()
for p != nil && err == nil {
p, err = itr.Next()
}
iter.Close()
}
}
}()
go func() {
defer wg.Done()
// Write 250 ints and then floats to the same field
points := make([]models.Point, 0, 500)
for i := 0; i < cap(points); i++ {
if i < 250 {
points = append(points, models.MustNewPoint(
"cpu",
models.NewTags(map[string]string{"host": "server"}),
map[string]interface{}{"value": int64(1)},
time.Unix(int64(i), 0),
))
} else {
points = append(points, models.MustNewPoint(
"cpu",
models.NewTags(map[string]string{"host": "server"}),
map[string]interface{}{"value": 1.0},
time.Unix(int64(i), 0),
))
}
}
for i := 0; i < 500; i++ {
if err := sh.DeleteMeasurement("cpu", []string{"cpu,host=server"}); err != nil {
t.Fatalf(err.Error())
}
sh.WritePoints(points)
iter, err := sh.CreateIterator("cpu", influxql.IteratorOptions{
Expr: influxql.MustParseExpr(`value`),
Aux: []influxql.VarRef{{Val: "value"}},
Dimensions: []string{},
Ascending: true,
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
})
if err != nil {
t.Fatalf(err.Error())
}
switch itr := iter.(type) {
case influxql.IntegerIterator:
p, err := itr.Next()
for p != nil && err == nil {
p, err = itr.Next()
}
iter.Close()
case influxql.FloatIterator:
p, err := itr.Next()
for p != nil && err == nil {
p, err = itr.Next()
}
iter.Close()
}
}
}()
wg.Wait()
}
// Ensures that when a shard is closed, it removes any series meta-data
// from the index.
func TestShard_Close_RemoveIndex(t *testing.T) {