Fix the point validation parser to identify and sort tags correctly

Fixes #6771.
pull/6801/head
Jonathan A. Sternberg 2016-06-13 09:45:08 -05:00
parent 256f57a4f4
commit 3bd9425edb
3 changed files with 51 additions and 18 deletions

View File

@ -56,6 +56,7 @@
- [#6685](https://github.com/influxdata/influxdb/issues/6685): Batch SELECT INTO / CQ writes
- [#6756](https://github.com/influxdata/influxdb/issues/6756): Set X-Influxdb-Version header on every request (even 404 requests).
- [#6760](https://github.com/influxdata/influxdb/issues/6760): Prevent panic in concurrent auth cache write
- [#6771](https://github.com/influxdata/influxdb/issues/6771): Fix the point validation parser to identify and sort tags correctly.
## v0.13.0 [2016-05-12]

View File

@ -324,24 +324,24 @@ func scanKey(buf []byte, i int) (int, []byte, error) {
}
}
// Now we know where the key region is within buf, and the locations of tags, we
// need to determine if duplicate tags exist and if the tags are sorted. This iterates
// 1/2 of the list comparing each end with each other, walking towards the center from
// both sides.
for j := 0; j < commas/2; j++ {
// Now we know where the key region is within buf, and the location of tags, we
// need to determine if duplicate tags exist and if the tags are sorted. This iterates
// over the list comparing each tag in the sequence with each other.
for j := 0; j < commas-1; j++ {
// get the left and right tags
_, left := scanTo(buf[indices[j]:indices[j+1]-1], 0, '=')
_, right := scanTo(buf[indices[commas-j-1]:indices[commas-j]-1], 0, '=')
_, right := scanTo(buf[indices[j+1]:indices[j+2]-1], 0, '=')
// If the tags are equal, then there are duplicate tags, and we should abort
if bytes.Equal(left, right) {
return i, buf[start:i], fmt.Errorf("duplicate tags")
}
// If left is greater than right, the tags are not sorted. We must continue
// since their could be duplicate tags still.
if bytes.Compare(left, right) > 0 {
// If left is greater than right, the tags are not sorted. We do not have to
// continue because the short path no longer works.
// If the tags are equal, then there are duplicate tags, and we should abort.
// If the tags are not sorted, this pass may not find duplicate tags and we
// need to do a more exhaustive search later.
if cmp := bytes.Compare(left, right); cmp > 0 {
sorted = false
break
} else if cmp == 0 {
return i, buf[start:i], fmt.Errorf("duplicate tags")
}
}
@ -367,6 +367,20 @@ func scanKey(buf []byte, i int) (int, []byte, error) {
pos += copy(b[pos:], v)
}
// Check again for duplicate tags now that the tags are sorted.
for j := 0; j < commas-1; j++ {
// get the left and right tags
_, left := scanTo(buf[indices[j]:], 0, '=')
_, right := scanTo(buf[indices[j+1]:], 0, '=')
// If the tags are equal, then there are duplicate tags, and we should abort.
// If the tags are not sorted, this pass may not find duplicate tags and we
// need to do a more exhaustive search later.
if bytes.Equal(left, right) {
return i, b, fmt.Errorf("duplicate tags")
}
}
return i, b, nil
}

View File

@ -896,10 +896,28 @@ func TestParsePointWithTags(t *testing.T) {
models.Fields{"value": 1.0}, time.Unix(1, 0)))
}
func TestParsPointWithDuplicateTags(t *testing.T) {
_, err := models.ParsePoints([]byte(`cpu,host=serverA,host=serverB value=1i 1000000000`))
if err == nil {
t.Fatalf(`ParsePoint() expected error. got nil`)
func TestParsePointWithDuplicateTags(t *testing.T) {
for i, tt := range []struct {
line string
err string
}{
{
line: `cpu,host=serverA,host=serverB value=1i 1000000000`,
err: `unable to parse 'cpu,host=serverA,host=serverB value=1i 1000000000': duplicate tags`,
},
{
line: `cpu,b=2,b=1,c=3 value=1i 1000000000`,
err: `unable to parse 'cpu,b=2,b=1,c=3 value=1i 1000000000': duplicate tags`,
},
{
line: `cpu,b=2,c=3,b=1 value=1i 1000000000`,
err: `unable to parse 'cpu,b=2,c=3,b=1 value=1i 1000000000': duplicate tags`,
},
} {
_, err := models.ParsePointsString(tt.line)
if err == nil || tt.err != err.Error() {
t.Errorf("%d. ParsePoint() expected error '%s'. got '%s'", i, tt.err, err)
}
}
}