Merge pull request #14266 from influxdata/er-fix-fields
fix(storage): Fix issue where fields re-appearpull/14297/head
commit
0ff7fb96b1
|
@ -281,6 +281,43 @@ func TestServer_Insert_Delete_1515777603585914810(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// This test reproduces the issue identified in https://github.com/influxdata/influxdb/issues/10052
|
||||
func TestServer_Insert_Delete_10052(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
s := OpenDefaultServer(NewConfig())
|
||||
defer s.Close()
|
||||
|
||||
mustWrite(s,
|
||||
"ping,server=ping a=1,b=2,c=3,d=4,e=5 1",
|
||||
"ping,server=ping a=1,b=2,c=3,d=4,e=5 2",
|
||||
"ping,server=ping a=1,b=2,c=3,d=4,e=5 3",
|
||||
"ping,server=ping a=1,b=2,c=3,d=4,e=5 4",
|
||||
"ping,server=ping a=1,b=2,c=3,d=4,e=5 5",
|
||||
"ping,server=ping a=1,b=2,c=3,d=4,e=5 6",
|
||||
)
|
||||
|
||||
mustDropMeasurement(s, "ping")
|
||||
gotSeries := mustGetSeries(s)
|
||||
expectedSeries := []string(nil)
|
||||
if !reflect.DeepEqual(gotSeries, expectedSeries) {
|
||||
t.Fatalf("got series %v, expected %v", gotSeries, expectedSeries)
|
||||
}
|
||||
|
||||
mustWrite(s, "ping v=1 1")
|
||||
gotSeries = mustGetSeries(s)
|
||||
expectedSeries = []string{"ping"}
|
||||
if !reflect.DeepEqual(gotSeries, expectedSeries) {
|
||||
t.Fatalf("got series %v, expected %v", gotSeries, expectedSeries)
|
||||
}
|
||||
|
||||
gotSeries = mustGetFieldKeys(s)
|
||||
expectedSeries = []string{"v"}
|
||||
if !reflect.DeepEqual(gotSeries, expectedSeries) {
|
||||
t.Fatalf("got series %v, expected %v", gotSeries, expectedSeries)
|
||||
}
|
||||
}
|
||||
|
||||
func mustGetSeries(s Server) []string {
|
||||
// Compare series left in index.
|
||||
result, err := s.QueryWithParams("SHOW SERIES", url.Values{"db": []string{"db0"}})
|
||||
|
@ -288,7 +325,21 @@ func mustGetSeries(s Server) []string {
|
|||
panic(err)
|
||||
}
|
||||
|
||||
gotSeries, err := seriesFromShowSeries(result)
|
||||
gotSeries, err := valuesFromShowQuery(result)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return gotSeries
|
||||
}
|
||||
|
||||
func mustGetFieldKeys(s Server) []string {
|
||||
// Compare series left in index.
|
||||
result, err := s.QueryWithParams("SHOW FIELD KEYS", url.Values{"db": []string{"db0"}})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
gotSeries, err := valuesFromShowQuery(result)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -318,6 +369,13 @@ func mustDelete(s Server, name string, min, max int64) {
|
|||
}
|
||||
}
|
||||
|
||||
func mustDropMeasurement(s Server, name string) {
|
||||
query := fmt.Sprintf("DROP MEASUREMENT %q", name)
|
||||
if _, err := s.QueryWithParams(query, url.Values{"db": []string{db}}); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
// SeriesTracker is a lockable tracker of which shards should own which series.
|
||||
type SeriesTracker struct {
|
||||
sync.RWMutex
|
||||
|
@ -544,7 +602,7 @@ func (s *SeriesTracker) Verify() error {
|
|||
}
|
||||
|
||||
// Get all series...
|
||||
gotSeries, err := seriesFromShowSeries(res)
|
||||
gotSeries, err := valuesFromShowQuery(res)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -561,9 +619,9 @@ func (s *SeriesTracker) Verify() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// seriesFromShowSeries extracts a lexicographically sorted set of series keys
|
||||
// valuesFromShowQuery extracts a lexicographically sorted set of series keys
|
||||
// from a SHOW SERIES query.
|
||||
func seriesFromShowSeries(result string) ([]string, error) {
|
||||
func valuesFromShowQuery(result string) ([]string, error) {
|
||||
// Get all series...
|
||||
var results struct {
|
||||
Results []struct {
|
||||
|
|
|
@ -266,7 +266,10 @@ func (i *Index) CreateSeriesListIfNotExists(seriesIDSet *tsdb.SeriesIDSet, measu
|
|||
}
|
||||
|
||||
for j, key := range keys {
|
||||
if seriesList[j] != nil {
|
||||
// Note, keys may contain duplicates (e.g., because of points for the same series
|
||||
// in the same batch). If the duplicate series are new, the index must
|
||||
// be rechecked on each iteration.
|
||||
if seriesList[j] != nil || i.series[string(key)] != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
|
@ -491,6 +491,7 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestShard_WritePoints_FieldConflictConcurrentQuery(t *testing.T) {
|
||||
t.Skip("https://github.com/influxdata/influxdb/issues/14267")
|
||||
if testing.Short() {
|
||||
t.Skip()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue