fix: do not send non-UTF-8 characters to subscriptions (#21558)
Added a check for valid UTF-8 strings in measurement names, tag names, tag values, and field names when writing to subscriptions. Points that fail the check are not sent to subscribers, and the errors are logged when debug-level logging is enabled. Closes https://github.com/influxdata/influxdb/issues/21557 pull/21614/head
parent
87fbb3fec9
commit
a08b69098e
|
@ -100,6 +100,9 @@ type Point interface {
|
|||
// HasTag returns true if the tag exists for the point.
|
||||
HasTag(tag []byte) bool
|
||||
|
||||
// ForEachField iterates over each field invoking fn. if fn returns false, iteration stops.
|
||||
ForEachField(fn func(k, v []byte) bool) error
|
||||
|
||||
// Fields returns the fields for the point.
|
||||
Fields() (Fields, error)
|
||||
|
||||
|
@ -1574,6 +1577,10 @@ func walkTags(buf []byte, fn func(key, value []byte) bool) {
|
|||
}
|
||||
}
|
||||
|
||||
func (p *point) ForEachField(fn func(k, v []byte) bool) error {
|
||||
return walkFields(p.fields, fn)
|
||||
}
|
||||
|
||||
// walkFields walks each field key and value via fn. If fn returns false, the iteration
|
||||
// is stopped. The values are the raw byte slices and not the converted types.
|
||||
func walkFields(buf []byte, fn func(key, value []byte) bool) error {
|
||||
|
@ -2546,3 +2553,43 @@ func ValidKeyTokens(name string, tags Tags) bool {
|
|||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// ValidPointStrings validates the measurement name, tage names and values, and field names in a point
|
||||
func ValidPointStrings(p Point) (err error) {
|
||||
if !ValidKeyToken(string(p.Name())) {
|
||||
return fmt.Errorf("invalid or unprintable UTF-8 characters in measurement name: %q", p.Name())
|
||||
}
|
||||
|
||||
validTag := func(k []byte, v []byte) bool {
|
||||
if !ValidKeyToken(string(k)) {
|
||||
err = fmt.Errorf("invalid or unprintable UTF-8 characters in tag key: %q", k)
|
||||
return false
|
||||
} else if !ValidKeyToken(string(v)) {
|
||||
err = fmt.Errorf("invalid or unprintable UTF-8 characters in tag value: %q", v)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
p.ForEachTag(validTag)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
validField := func(k, v []byte) bool {
|
||||
if !ValidKeyToken(string(k)) {
|
||||
err = fmt.Errorf("invalid or unprintable UTF-8 in field name: %q", k)
|
||||
return false
|
||||
} else {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
if e := p.ForEachField(validField); e != nil {
|
||||
return e
|
||||
} else if err != nil {
|
||||
return err
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
|
|
@ -245,6 +245,8 @@ func (s *Service) run() {
|
|||
s.close(&wg)
|
||||
return
|
||||
}
|
||||
|
||||
p = s.removeBadPoints(p)
|
||||
for se, cw := range s.subs {
|
||||
if p.Database == se.db && p.RetentionPolicy == se.rp {
|
||||
select {
|
||||
|
@ -258,6 +260,44 @@ func (s *Service) run() {
|
|||
}
|
||||
}
|
||||
|
||||
// removeBadPoints - if any non-UTF8 strings are found in the points in the WritePointRequest,
|
||||
// make a copy without those points
|
||||
func (s *Service) removeBadPoints(p *coordinator.WritePointsRequest) *coordinator.WritePointsRequest {
|
||||
log := s.Logger.With(zap.String("database", p.Database), zap.String("retention_policy", p.RetentionPolicy))
|
||||
|
||||
firstBad, err := func() (int, error) {
|
||||
for i, point := range p.Points {
|
||||
if err := models.ValidPointStrings(point); err != nil {
|
||||
atomic.AddInt64(&s.stats.WriteFailures, 1)
|
||||
log.Debug("discarding point", zap.Error(err))
|
||||
return i, err
|
||||
}
|
||||
}
|
||||
return -1, nil
|
||||
}()
|
||||
if err != nil {
|
||||
wrq := &coordinator.WritePointsRequest{
|
||||
Database: p.Database,
|
||||
RetentionPolicy: p.RetentionPolicy,
|
||||
Points: make([]models.Point, 0, len(p.Points)-1),
|
||||
}
|
||||
|
||||
// Copy all the points up to the first bad one.
|
||||
wrq.Points = append(wrq.Points, p.Points[:firstBad]...)
|
||||
for _, point := range p.Points[firstBad+1:] {
|
||||
if err := models.ValidPointStrings(point); err != nil {
|
||||
// Log and omit this point from subscription writes
|
||||
atomic.AddInt64(&s.stats.WriteFailures, 1)
|
||||
log.Debug("discarding point", zap.Error(err))
|
||||
} else {
|
||||
wrq.Points = append(wrq.Points, point)
|
||||
}
|
||||
}
|
||||
p = wrq
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
// close closes the existing channel writers.
|
||||
func (s *Service) close(wg *sync.WaitGroup) {
|
||||
s.subMu.Lock()
|
||||
|
|
|
@ -1,11 +1,13 @@
|
|||
package subscriber_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/coordinator"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/services/meta"
|
||||
"github.com/influxdata/influxdb/services/subscriber"
|
||||
)
|
||||
|
@ -443,3 +445,156 @@ func TestService_WaitForDataChanged(t *testing.T) {
|
|||
|
||||
close(dataChanged)
|
||||
}
|
||||
|
||||
func TestService_BadUTF8(t *testing.T) {
|
||||
dataChanged := make(chan struct{})
|
||||
ms := MetaClient{}
|
||||
ms.WaitForDataChangedFn = func() chan struct{} {
|
||||
return dataChanged
|
||||
}
|
||||
ms.DatabasesFn = func() []meta.DatabaseInfo {
|
||||
return []meta.DatabaseInfo{
|
||||
{
|
||||
Name: "db0",
|
||||
RetentionPolicies: []meta.RetentionPolicyInfo{
|
||||
{
|
||||
Name: "rp0",
|
||||
Subscriptions: []meta.SubscriptionInfo{
|
||||
{Name: "s0", Mode: "ALL", Destinations: []string{"udp://h0:9093", "udp://h1:9093"}},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
prs := make(chan *coordinator.WritePointsRequest, 2)
|
||||
urls := make(chan url.URL, 2)
|
||||
newPointsWriter := func(u url.URL) (subscriber.PointsWriter, error) {
|
||||
sub := Subscription{}
|
||||
sub.WritePointsFn = func(p *coordinator.WritePointsRequest) error {
|
||||
prs <- p
|
||||
return nil
|
||||
}
|
||||
urls <- u
|
||||
return sub, nil
|
||||
}
|
||||
|
||||
s := subscriber.NewService(subscriber.NewConfig())
|
||||
s.MetaClient = ms
|
||||
s.NewPointsWriter = newPointsWriter
|
||||
s.Open()
|
||||
defer s.Close()
|
||||
|
||||
// Signal that data has changed
|
||||
dataChanged <- struct{}{}
|
||||
|
||||
for _, expURLStr := range []string{"udp://h0:9093", "udp://h1:9093"} {
|
||||
var u url.URL
|
||||
expURL, _ := url.Parse(expURLStr)
|
||||
select {
|
||||
case u = <-urls:
|
||||
case <-time.After(testTimeout):
|
||||
t.Fatal("expected urls")
|
||||
}
|
||||
if expURL.String() != u.String() {
|
||||
t.Fatalf("unexpected url: got %s exp %s", u.String(), expURL.String())
|
||||
}
|
||||
}
|
||||
|
||||
badOne := []byte{255, 112, 114, 111, 99} // A known invalid UTF-8 string
|
||||
badTwo := []byte{255, 110, 101, 116} // A known invalid UTF-8 string
|
||||
// bad measurement
|
||||
// all good
|
||||
// bad field name
|
||||
// bad tag name
|
||||
// bad tag value
|
||||
// all good
|
||||
fmtString :=
|
||||
`%s,tagname=A fieldname=1.1,stringfield="bathyscape1" 6000000000
|
||||
measurementname,tagname=a fieldname=1.1,stringfield="bathyscape5" 6000000001
|
||||
measurementname,tagname=A %s=1.2,stringfield="bathyscape2" 6000000002
|
||||
measurementname,%s=A fieldname=1.3,stringfield="bathyscape3" 6000000003
|
||||
measurementname,tagname=%s fieldname=1.4,stringfield="bathyscape4" 6000000004
|
||||
measurementname,tagname=a fieldname=1.6,stringfield="bathyscape5" 6000000006`
|
||||
pointString := fmt.Sprintf(fmtString, badOne, badTwo, badOne, badTwo)
|
||||
verifyNonUTF8Removal(t, pointString, s, prs, []int{1, 5}, "2 good, 4 bad")
|
||||
|
||||
// All points are bad
|
||||
fmtString = `measurementname,tagname=A %s=1.2,stringfield="bathyscape2" 6000000002
|
||||
measurementname,tagname=%s fieldname=1.4,stringfield="bathyscape4" 6000000003`
|
||||
pointString = fmt.Sprintf(fmtString, badTwo, badOne)
|
||||
verifyNonUTF8Removal(t, pointString, s, prs, []int{}, "All 2 bad")
|
||||
|
||||
// First point is bad
|
||||
fmtString = `measurementname,tagname=A %s=1.2,stringfield="bathyscape2" 6000000004
|
||||
measurementname,tagname=a fieldname=1.1,stringfield="bathyscape5" 6000000005
|
||||
measurementname,tagname=b fieldname=1.2,stringfield="bathyscape6" 6000000006`
|
||||
pointString = fmt.Sprintf(fmtString, badTwo)
|
||||
verifyNonUTF8Removal(t, pointString, s, prs, []int{1, 2}, "First of 3 bad")
|
||||
|
||||
// last point is bad
|
||||
fmtString = `measurementname,tagname=a fieldname=1.1,stringfield="bathyscape5" 6000000006
|
||||
measurementname,tagname=b %s=1.2,stringfield="bathyscape2" 6000000007`
|
||||
pointString = fmt.Sprintf(fmtString, badOne)
|
||||
verifyNonUTF8Removal(t, pointString, s, prs, []int{0}, "Last of 2 bad")
|
||||
|
||||
// only point is bad
|
||||
fmtString = `measurementname,tagname=b %s=1.2,stringfield="bathyscape2" 6000000007`
|
||||
pointString = fmt.Sprintf(fmtString, badOne)
|
||||
verifyNonUTF8Removal(t, pointString, s, prs, []int{}, "1 bad, 0 good")
|
||||
|
||||
// last 2 points are bad
|
||||
fmtString = `measurementname,tagname=a fieldname=1.1,stringfield="bathyscape5" 6000000006
|
||||
measurementname,tagname=b %s=1.2,stringfield="bathyscape2" 6000000007
|
||||
%s,tagname=A fieldname=1.1,stringfield="bathyscape1" 6000000008`
|
||||
pointString = fmt.Sprintf(fmtString, badTwo, badOne)
|
||||
verifyNonUTF8Removal(t, pointString, s, prs, []int{0}, "1 good, 2 bad")
|
||||
|
||||
// first 2 points are bad
|
||||
fmtString = `measurementname,tagname=b %s=1.2,stringfield="bathyscape2" 6000000007
|
||||
%s,tagname=A fieldname=1.1,stringfield="bathyscape1" 6000000008
|
||||
measurementname,tagname=a fieldname=1.1,stringfield="bathyscape5" 6000000009`
|
||||
pointString = fmt.Sprintf(fmtString, badTwo, badOne)
|
||||
verifyNonUTF8Removal(t, pointString, s, prs, []int{2}, "2 bad, 1 good")
|
||||
|
||||
close(dataChanged)
|
||||
}
|
||||
|
||||
func verifyNonUTF8Removal(t *testing.T, pointString string, s *subscriber.Service, prs chan *coordinator.WritePointsRequest, goodLines []int, trialMessage string) {
|
||||
points, err := models.ParsePointsString(pointString)
|
||||
if err != nil {
|
||||
t.Fatalf("%s: %v", trialMessage, err)
|
||||
}
|
||||
goodPoints := make([]string, 0, len(goodLines))
|
||||
|
||||
for _, line := range goodLines {
|
||||
goodPoints = append(goodPoints, points[line].String())
|
||||
}
|
||||
|
||||
// Write points that match subscription with mode ALL
|
||||
expPR := &coordinator.WritePointsRequest{
|
||||
Database: "db0",
|
||||
RetentionPolicy: "rp0",
|
||||
Points: points,
|
||||
}
|
||||
s.Points() <- expPR
|
||||
|
||||
// Should get pr back twice
|
||||
for i := 0; i < 2; i++ {
|
||||
var pr *coordinator.WritePointsRequest
|
||||
select {
|
||||
case pr = <-prs:
|
||||
if len(pr.Points) != len(goodPoints) {
|
||||
t.Fatalf("%s expected %d points: got %d for %q", trialMessage, len(goodPoints), len(pr.Points), pointString)
|
||||
}
|
||||
for i, p := range pr.Points {
|
||||
if p.String() != goodPoints[i] {
|
||||
t.Fatalf("%s expected %q: got %q for %q", trialMessage, goodPoints[i], p.String(), pointString)
|
||||
}
|
||||
}
|
||||
case <-time.After(testTimeout):
|
||||
t.Fatalf("%s expected points request: got %d exp 2 for %q", trialMessage, i, pointString)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue