fix: hard limit on field size while parsing line protocol (#22311)
Per https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#string we only support 64KB, but 1MB is a more realistic practical limit. Before this commit there was no enforcement of field value size.

Co-authored-by: Sam Arnold <sarnold@influxdata.com>

pull/22331/head
parent
df448c654b
commit
4dd2d7cc7f
|
@ -48,6 +48,7 @@ This release adds an embedded SQLite database for storing metadata required by t
|
||||||
1. [21938](https://github.com/influxdata/influxdb/pull/21938): Added route to delete individual secret.
|
1. [21938](https://github.com/influxdata/influxdb/pull/21938): Added route to delete individual secret.
|
||||||
1. [21972](https://github.com/influxdata/influxdb/pull/21972): Added support for notebooks and annotations.
|
1. [21972](https://github.com/influxdata/influxdb/pull/21972): Added support for notebooks and annotations.
|
||||||
1. [22135](https://github.com/influxdata/influxdb/pull/22135): Added route to return known resources.
|
1. [22135](https://github.com/influxdata/influxdb/pull/22135): Added route to return known resources.
|
||||||
|
1. [22311](https://github.com/influxdata/influxdb/pull/22311): Add `storage-no-validate-field-size` config to `influxd` to disable enforcement of max field size.
|
||||||
|
|
||||||
### Bug Fixes
|
### Bug Fixes
|
||||||
|
|
||||||
|
@ -57,6 +58,7 @@ This release adds an embedded SQLite database for storing metadata required by t
|
||||||
1. [22211](https://github.com/influxdata/influxdb/pull/22211): Prevent scheduling an inactivated task after updating it.
|
1. [22211](https://github.com/influxdata/influxdb/pull/22211): Prevent scheduling an inactivated task after updating it.
|
||||||
1. [22235](https://github.com/influxdata/influxdb/pull/22235): Avoid compaction queue stats flutter.
|
1. [22235](https://github.com/influxdata/influxdb/pull/22235): Avoid compaction queue stats flutter.
|
||||||
1. [22272](https://github.com/influxdata/influxdb/pull/22272): Requests to `/api/v2/authorizations` filter correctly on `org` and `user` parameters.
|
1. [22272](https://github.com/influxdata/influxdb/pull/22272): Requests to `/api/v2/authorizations` filter correctly on `org` and `user` parameters.
|
||||||
|
1. [22311](https://github.com/influxdata/influxdb/pull/22311): Enforce max field size while parsing line protocol.
|
||||||
|
|
||||||
## v2.0.8 [2021-08-13]
|
## v2.0.8 [2021-08-13]
|
||||||
|
|
||||||
|
|
|
@ -469,6 +469,11 @@ func (o *InfluxdOpts) BindCliOpts() []cli.Opt {
|
||||||
Flag: "storage-validate-keys",
|
Flag: "storage-validate-keys",
|
||||||
Desc: "Validates incoming writes to ensure keys only have valid unicode characters.",
|
Desc: "Validates incoming writes to ensure keys only have valid unicode characters.",
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
DestP: &o.StorageConfig.Data.SkipFieldSizeValidation,
|
||||||
|
Flag: "storage-no-validate-field-size",
|
||||||
|
Desc: "Skip field-size validation on incoming writes.",
|
||||||
|
},
|
||||||
{
|
{
|
||||||
DestP: &o.StorageConfig.Data.CacheMaxMemorySize,
|
DestP: &o.StorageConfig.Data.CacheMaxMemorySize,
|
||||||
Flag: "storage-cache-max-memory-size",
|
Flag: "storage-cache-max-memory-size",
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
nethttp "net/http"
|
nethttp "net/http"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -13,6 +14,7 @@ import (
|
||||||
"github.com/influxdata/influxdb/v2"
|
"github.com/influxdata/influxdb/v2"
|
||||||
"github.com/influxdata/influxdb/v2/cmd/influxd/launcher"
|
"github.com/influxdata/influxdb/v2/cmd/influxd/launcher"
|
||||||
"github.com/influxdata/influxdb/v2/http"
|
"github.com/influxdata/influxdb/v2/http"
|
||||||
|
"github.com/influxdata/influxdb/v2/tsdb"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -53,6 +55,60 @@ func TestStorage_WriteAndQuery(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ensure the server will write all points possible with exception of
|
||||||
|
// - field type conflict
|
||||||
|
// - field too large
|
||||||
|
func TestStorage_PartialWrite(t *testing.T) {
|
||||||
|
l := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
|
||||||
|
defer l.ShutdownOrFail(t, ctx)
|
||||||
|
|
||||||
|
// Initial write of integer.
|
||||||
|
l.WritePointsOrFail(t, `cpu value=1i 946684800000000000`)
|
||||||
|
|
||||||
|
// Write mixed-field types.
|
||||||
|
err := l.WritePoints("cpu value=2i 946684800000000001\ncpu value=3 946684800000000002\ncpu value=4i 946684800000000003")
|
||||||
|
require.Error(t, err)
|
||||||
|
|
||||||
|
// Write oversized field value.
|
||||||
|
err = l.WritePoints(fmt.Sprintf(`cpu str="%s" 946684800000000004`, strings.Repeat("a", tsdb.MaxFieldValueLength+1)))
|
||||||
|
require.Error(t, err)
|
||||||
|
|
||||||
|
// Write biggest field value.
|
||||||
|
l.WritePointsOrFail(t, fmt.Sprintf(`cpu str="%s" 946684800000000005`, strings.Repeat("a", tsdb.MaxFieldValueLength)))
|
||||||
|
|
||||||
|
// Ensure the valid points were written.
|
||||||
|
qs := `from(bucket:"BUCKET") |> range(start:2000-01-01T00:00:00Z,stop:2000-01-02T00:00:00Z) |> keep(columns: ["_time","_field"])`
|
||||||
|
|
||||||
|
exp := `,result,table,_time,_field` + "\r\n" +
|
||||||
|
`,_result,0,2000-01-01T00:00:00.000000005Z,str` + "\r\n" + // str=max-length string
|
||||||
|
`,_result,1,2000-01-01T00:00:00Z,value` + "\r\n" + // value=1
|
||||||
|
`,_result,1,2000-01-01T00:00:00.000000001Z,value` + "\r\n" + // value=2
|
||||||
|
`,_result,1,2000-01-01T00:00:00.000000003Z,value` + "\r\n\r\n" // value=4
|
||||||
|
|
||||||
|
buf, err := http.SimpleQuery(l.URL(), qs, l.Org.Name, l.Auth.Token)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, exp, string(buf))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStorage_DisableMaxFieldValueSize(t *testing.T) {
|
||||||
|
l := launcher.RunAndSetupNewLauncherOrFail(ctx, t, func(o *launcher.InfluxdOpts) {
|
||||||
|
o.StorageConfig.Data.SkipFieldSizeValidation = true
|
||||||
|
})
|
||||||
|
defer l.ShutdownOrFail(t, ctx)
|
||||||
|
|
||||||
|
// Write a normally-oversized field value.
|
||||||
|
l.WritePointsOrFail(t, fmt.Sprintf(`cpu str="%s" 946684800000000000`, strings.Repeat("a", tsdb.MaxFieldValueLength+1)))
|
||||||
|
|
||||||
|
// Check that the point can be queried.
|
||||||
|
qs := `from(bucket:"BUCKET") |> range(start:2000-01-01T00:00:00Z,stop:2000-01-02T00:00:00Z) |> keep(columns: ["_value"])`
|
||||||
|
exp := `,result,table,_value` + "\r\n" +
|
||||||
|
fmt.Sprintf(`,_result,0,%s`, strings.Repeat("a", tsdb.MaxFieldValueLength+1)) + "\r\n\r\n"
|
||||||
|
|
||||||
|
buf, err := http.SimpleQuery(l.URL(), qs, l.Org.Name, l.Auth.Token)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, exp, string(buf))
|
||||||
|
}
|
||||||
|
|
||||||
func TestLauncher_WriteAndQuery(t *testing.T) {
|
func TestLauncher_WriteAndQuery(t *testing.T) {
|
||||||
l := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
|
l := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
|
||||||
defer l.ShutdownOrFail(t, ctx)
|
defer l.ShutdownOrFail(t, ctx)
|
||||||
|
|
|
@ -98,6 +98,9 @@ type Config struct {
|
||||||
// Enables unicode validation on series keys on write.
|
// Enables unicode validation on series keys on write.
|
||||||
ValidateKeys bool `toml:"validate-keys"`
|
ValidateKeys bool `toml:"validate-keys"`
|
||||||
|
|
||||||
|
// When true, skips size validation on fields
|
||||||
|
SkipFieldSizeValidation bool `toml:"skip-field-size-validation"`
|
||||||
|
|
||||||
// Query logging
|
// Query logging
|
||||||
QueryLogEnabled bool `toml:"query-log-enabled"`
|
QueryLogEnabled bool `toml:"query-log-enabled"`
|
||||||
|
|
||||||
|
|
|
@ -183,9 +183,8 @@ type EngineOptions struct {
|
||||||
// nil will allow all combinations to pass.
|
// nil will allow all combinations to pass.
|
||||||
ShardFilter func(database, rp string, id uint64) bool
|
ShardFilter func(database, rp string, id uint64) bool
|
||||||
|
|
||||||
Config Config
|
Config Config
|
||||||
SeriesIDSets SeriesIDSets
|
SeriesIDSets SeriesIDSets
|
||||||
FieldValidator FieldValidator
|
|
||||||
|
|
||||||
OnNewEngine func(Engine)
|
OnNewEngine func(Engine)
|
||||||
|
|
||||||
|
|
|
@ -8,18 +8,31 @@ import (
|
||||||
"github.com/influxdata/influxql"
|
"github.com/influxdata/influxql"
|
||||||
)
|
)
|
||||||
|
|
||||||
// FieldValidator should return a PartialWriteError if the point should not be written.
|
const MaxFieldValueLength = 1048576
|
||||||
type FieldValidator interface {
|
|
||||||
Validate(mf *MeasurementFields, point models.Point) error
|
|
||||||
}
|
|
||||||
|
|
||||||
// defaultFieldValidator ensures that points do not use different types for fields that already exist.
|
// ValidateFields will return a PartialWriteError if:
|
||||||
type defaultFieldValidator struct{}
|
// - the point has inconsistent fields, or
|
||||||
|
// - the point has fields that are too long
|
||||||
// Validate will return a PartialWriteError if the point has inconsistent fields.
|
func ValidateFields(mf *MeasurementFields, point models.Point, skipSizeValidation bool) error {
|
||||||
func (defaultFieldValidator) Validate(mf *MeasurementFields, point models.Point) error {
|
pointSize := point.StringSize()
|
||||||
iter := point.FieldIterator()
|
iter := point.FieldIterator()
|
||||||
for iter.Next() {
|
for iter.Next() {
|
||||||
|
if !skipSizeValidation {
|
||||||
|
// Check for size of field too large. Note it is much cheaper to check the whole point size
|
||||||
|
// than checking the StringValue size (StringValue potentially takes an allocation if it must
|
||||||
|
// unescape the string, and must at least parse the string)
|
||||||
|
if pointSize > MaxFieldValueLength && iter.Type() == models.String {
|
||||||
|
if sz := len(iter.StringValue()); sz > MaxFieldValueLength {
|
||||||
|
return PartialWriteError{
|
||||||
|
Reason: fmt.Sprintf(
|
||||||
|
"input field \"%s\" on measurement \"%s\" is too long, %d > %d",
|
||||||
|
iter.FieldKey(), point.Name(), sz, MaxFieldValueLength),
|
||||||
|
Dropped: 1,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Skip fields name "time", they are illegal.
|
// Skip fields name "time", they are illegal.
|
||||||
if bytes.Equal(iter.FieldKey(), timeBytes) {
|
if bytes.Equal(iter.FieldKey(), timeBytes) {
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -158,9 +158,6 @@ type Shard struct {
|
||||||
func NewShard(id uint64, path string, walPath string, sfile *SeriesFile, opt EngineOptions) *Shard {
|
func NewShard(id uint64, path string, walPath string, sfile *SeriesFile, opt EngineOptions) *Shard {
|
||||||
db, rp := decodeStorePath(path)
|
db, rp := decodeStorePath(path)
|
||||||
logger := zap.NewNop()
|
logger := zap.NewNop()
|
||||||
if opt.FieldValidator == nil {
|
|
||||||
opt.FieldValidator = defaultFieldValidator{}
|
|
||||||
}
|
|
||||||
|
|
||||||
s := &Shard{
|
s := &Shard{
|
||||||
id: id,
|
id: id,
|
||||||
|
@ -645,7 +642,7 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
|
||||||
mf := engine.MeasurementFields(name)
|
mf := engine.MeasurementFields(name)
|
||||||
|
|
||||||
// Check with the field validator.
|
// Check with the field validator.
|
||||||
if err := s.options.FieldValidator.Validate(mf, p); err != nil {
|
if err := ValidateFields(mf, p, s.options.Config.SkipFieldSizeValidation); err != nil {
|
||||||
switch err := err.(type) {
|
switch err := err.(type) {
|
||||||
case PartialWriteError:
|
case PartialWriteError:
|
||||||
if reason == "" {
|
if reason == "" {
|
||||||
|
|
Loading…
Reference in New Issue