Merge pull request #18664 from influxdata/storage-engine-write-validation-enabled
feat(storage): Add option to disable WritePoints() validation.pull/18705/head
commit
50efc73210
|
@ -3230,3 +3230,11 @@ func BenchmarkParsePointsWithOptions(b *testing.B) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkValidToken(b *testing.B) {
|
||||
token := []byte("Hello世界")
|
||||
b.ReportAllocs()
|
||||
for i := 0; i < b.N; i++ {
|
||||
models.ValidToken(token)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -64,6 +64,8 @@ type Engine struct {
|
|||
|
||||
defaultMetricLabels prometheus.Labels
|
||||
|
||||
writePointsValidationEnabled bool
|
||||
|
||||
// Tracks all goroutines started by the Engine.
|
||||
wg sync.WaitGroup
|
||||
|
||||
|
@ -156,6 +158,13 @@ func WithCompactionSemaphore(s influxdb.Semaphore) Option {
|
|||
}
|
||||
}
|
||||
|
||||
// WithWritePointsValidationEnabled sets whether written points should be validated.
|
||||
func WithWritePointsValidationEnabled(v bool) Option {
|
||||
return func(e *Engine) {
|
||||
e.writePointsValidationEnabled = v
|
||||
}
|
||||
}
|
||||
|
||||
// NewEngine initialises a new storage engine, including a series file, index and
|
||||
// TSM engine.
|
||||
func NewEngine(path string, c Config, options ...Option) *Engine {
|
||||
|
@ -164,6 +173,8 @@ func NewEngine(path string, c Config, options ...Option) *Engine {
|
|||
path: path,
|
||||
defaultMetricLabels: prometheus.Labels{},
|
||||
logger: zap.NewNop(),
|
||||
|
||||
writePointsValidationEnabled: true,
|
||||
}
|
||||
|
||||
// Initialize series file.
|
||||
|
@ -482,45 +493,48 @@ func (e *Engine) WritePoints(ctx context.Context, points []models.Point) error {
|
|||
}
|
||||
|
||||
for iter := collection.Iterator(); iter.Next(); {
|
||||
tags := iter.Tags()
|
||||
// Skip validation if it has already been performed previously in the call stack.
|
||||
if e.writePointsValidationEnabled {
|
||||
tags := iter.Tags()
|
||||
|
||||
// Not enough tags present.
|
||||
if tags.Len() < 2 {
|
||||
dropPoint(iter.Key(), fmt.Sprintf("missing required tags: parsed tags: %q", tags))
|
||||
continue
|
||||
}
|
||||
// Not enough tags present.
|
||||
if tags.Len() < 2 {
|
||||
dropPoint(iter.Key(), fmt.Sprintf("missing required tags: parsed tags: %q", tags))
|
||||
continue
|
||||
}
|
||||
|
||||
// First tag key is not measurement tag.
|
||||
if !bytes.Equal(tags[0].Key, models.MeasurementTagKeyBytes) {
|
||||
dropPoint(iter.Key(), fmt.Sprintf("missing required measurement tag as first tag, got: %q", tags[0].Key))
|
||||
continue
|
||||
}
|
||||
// First tag key is not measurement tag.
|
||||
if !bytes.Equal(tags[0].Key, models.MeasurementTagKeyBytes) {
|
||||
dropPoint(iter.Key(), fmt.Sprintf("missing required measurement tag as first tag, got: %q", tags[0].Key))
|
||||
continue
|
||||
}
|
||||
|
||||
fkey, fval := tags[len(tags)-1].Key, tags[len(tags)-1].Value
|
||||
fkey, fval := tags[len(tags)-1].Key, tags[len(tags)-1].Value
|
||||
|
||||
// Last tag key is not field tag.
|
||||
if !bytes.Equal(fkey, models.FieldKeyTagKeyBytes) {
|
||||
dropPoint(iter.Key(), fmt.Sprintf("missing required field key tag as last tag, got: %q", tags[0].Key))
|
||||
continue
|
||||
}
|
||||
// Last tag key is not field tag.
|
||||
if !bytes.Equal(fkey, models.FieldKeyTagKeyBytes) {
|
||||
dropPoint(iter.Key(), fmt.Sprintf("missing required field key tag as last tag, got: %q", tags[0].Key))
|
||||
continue
|
||||
}
|
||||
|
||||
// The value representing the underlying field key is invalid if it's "time".
|
||||
if bytes.Equal(fval, timeBytes) {
|
||||
dropPoint(iter.Key(), fmt.Sprintf("invalid field key: input field %q is invalid", timeBytes))
|
||||
continue
|
||||
}
|
||||
// The value representing the underlying field key is invalid if it's "time".
|
||||
if bytes.Equal(fval, timeBytes) {
|
||||
dropPoint(iter.Key(), fmt.Sprintf("invalid field key: input field %q is invalid", timeBytes))
|
||||
continue
|
||||
}
|
||||
|
||||
// Filter out any tags with key equal to "time": they are invalid.
|
||||
if tags.Get(timeBytes) != nil {
|
||||
dropPoint(iter.Key(), fmt.Sprintf("invalid tag key: input tag %q on measurement %q is invalid", timeBytes, iter.Name()))
|
||||
continue
|
||||
}
|
||||
// Filter out any tags with key equal to "time": they are invalid.
|
||||
if tags.Get(timeBytes) != nil {
|
||||
dropPoint(iter.Key(), fmt.Sprintf("invalid tag key: input tag %q on measurement %q is invalid", timeBytes, iter.Name()))
|
||||
continue
|
||||
}
|
||||
|
||||
// Drop any point with invalid unicode characters in any of the tag keys or values.
|
||||
// This will also cover validating the value used to represent the field key.
|
||||
if !models.ValidTagTokens(tags) {
|
||||
dropPoint(iter.Key(), fmt.Sprintf("key contains invalid unicode: %q", iter.Key()))
|
||||
continue
|
||||
// Drop any point with invalid unicode characters in any of the tag keys or values.
|
||||
// This will also cover validating the value used to represent the field key.
|
||||
if !models.ValidTagTokens(tags) {
|
||||
dropPoint(iter.Key(), fmt.Sprintf("key contains invalid unicode: %q", iter.Key()))
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
collection.Copy(j, iter.Index())
|
||||
|
|
Loading…
Reference in New Issue