diff --git a/tests/server_delete_test.go b/tests/server_delete_test.go index c2d447da1c..8cdd56cd2b 100644 --- a/tests/server_delete_test.go +++ b/tests/server_delete_test.go @@ -281,6 +281,43 @@ func TestServer_Insert_Delete_1515777603585914810(t *testing.T) { } } +// This test reproduces the issue identified in https://github.com/influxdata/influxdb/issues/10052 +func TestServer_Insert_Delete_10052(t *testing.T) { + t.Parallel() + + s := OpenDefaultServer(NewConfig()) + defer s.Close() + + mustWrite(s, + "ping,server=ping a=1,b=2,c=3,d=4,e=5 1", + "ping,server=ping a=1,b=2,c=3,d=4,e=5 2", + "ping,server=ping a=1,b=2,c=3,d=4,e=5 3", + "ping,server=ping a=1,b=2,c=3,d=4,e=5 4", + "ping,server=ping a=1,b=2,c=3,d=4,e=5 5", + "ping,server=ping a=1,b=2,c=3,d=4,e=5 6", + ) + + mustDropMeasurement(s, "ping") + gotSeries := mustGetSeries(s) + expectedSeries := []string(nil) + if !reflect.DeepEqual(gotSeries, expectedSeries) { + t.Fatalf("got series %v, expected %v", gotSeries, expectedSeries) + } + + mustWrite(s, "ping v=1 1") + gotSeries = mustGetSeries(s) + expectedSeries = []string{"ping"} + if !reflect.DeepEqual(gotSeries, expectedSeries) { + t.Fatalf("got series %v, expected %v", gotSeries, expectedSeries) + } + + gotSeries = mustGetFieldKeys(s) + expectedSeries = []string{"v"} + if !reflect.DeepEqual(gotSeries, expectedSeries) { + t.Fatalf("got series %v, expected %v", gotSeries, expectedSeries) + } +} + func mustGetSeries(s Server) []string { // Compare series left in index. result, err := s.QueryWithParams("SHOW SERIES", url.Values{"db": []string{"db0"}}) @@ -288,7 +325,21 @@ func mustGetSeries(s Server) []string { panic(err) } - gotSeries, err := seriesFromShowSeries(result) + gotSeries, err := valuesFromShowQuery(result) + if err != nil { + panic(err) + } + return gotSeries +} + +func mustGetFieldKeys(s Server) []string { + // Compare series left in index. + result, err := s.QueryWithParams("SHOW FIELD KEYS", url.Values{"db": []string{"db0"}}) + if err != nil { + panic(err) + } + + gotSeries, err := valuesFromShowQuery(result) if err != nil { panic(err) } @@ -318,6 +369,13 @@ func mustDelete(s Server, name string, min, max int64) { } } +func mustDropMeasurement(s Server, name string) { + query := fmt.Sprintf("DROP MEASUREMENT %q", name) + if _, err := s.QueryWithParams(query, url.Values{"db": []string{db}}); err != nil { + panic(err) + } +} + // SeriesTracker is a lockable tracker of which shards should own which series. type SeriesTracker struct { sync.RWMutex @@ -544,7 +602,7 @@ func (s *SeriesTracker) Verify() error { } // Get all series... - gotSeries, err := seriesFromShowSeries(res) + gotSeries, err := valuesFromShowQuery(res) if err != nil { return err } @@ -561,9 +619,9 @@ func (s *SeriesTracker) Verify() error { return nil } -// seriesFromShowSeries extracts a lexicographically sorted set of series keys +// valuesFromShowQuery extracts a lexicographically sorted set of series keys // from a SHOW SERIES query. -func seriesFromShowSeries(result string) ([]string, error) { +func valuesFromShowQuery(result string) ([]string, error) { // Get all series... var results struct { Results []struct { diff --git a/tsdb/index/inmem/inmem.go b/tsdb/index/inmem/inmem.go index bad6580215..ba7fea251e 100644 --- a/tsdb/index/inmem/inmem.go +++ b/tsdb/index/inmem/inmem.go @@ -266,7 +266,10 @@ func (i *Index) CreateSeriesListIfNotExists(seriesIDSet *tsdb.SeriesIDSet, measu } for j, key := range keys { - if seriesList[j] != nil { + // Note, keys may contain duplicates (e.g., because of points for the same series + // in the same batch). If the duplicate series are new, the index must + // be rechecked on each iteration. + if seriesList[j] != nil || i.series[string(key)] != nil { continue } diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index 90ef9e16ca..e8eeb9736c 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -491,6 +491,7 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) { } func TestShard_WritePoints_FieldConflictConcurrentQuery(t *testing.T) { + t.Skip("https://github.com/influxdata/influxdb/issues/14267") if testing.Short() { t.Skip() }