* fix(error): SELECT INTO doesn't return error with unsupported value (#20429)
When a SELECT INTO query generates an illegal value that cannot be inserted,
like +/- Inf, it should return an error, rather than failing silently.
This adds a boolean parameter to the [data] section of influxdb.conf:
* strict-error-handling
When false, the default, the old behavior is preserved. When true,
unsupported values will return an error from SELECT INTO queries
Fixes https://github.com/influxdata/influxdb/issues/20426
(cherry picked from commit 9e33be2619
)
Fixes https://github.com/influxdata/influxdb/issues/20427
pull/20303/head^2
v1.8.4rc3
parent
ab020f74ab
commit
b2cb862484
|
@ -13,6 +13,7 @@ v1.8.4 [unreleased]
|
|||
- [#20277](https://github.com/influxdata/influxdb/pull/20277): fix(query): Group By queries with offset that crosses a DST boundary can fail.
|
||||
- [#20295](https://github.com/influxdata/influxdb/pull/20295): fix: cp.Mux.Serve() closes all net.Listener instances silently on error.
|
||||
- [#19832](https://github.com/influxdata/influxdb/pull/19832): fix(prometheus): regexp handling should comply with PromQL.
|
||||
- [#20432](https://github.com/influxdata/influxdb/pull/20432): fix(error): SELECT INTO doesn't return error with unsupported value
|
||||
|
||||
v1.8.3 [2020-09-30]
|
||||
-------------------
|
||||
|
|
|
@ -215,11 +215,12 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
|
|||
MetaClient: s.MetaClient,
|
||||
TSDBStore: coordinator.LocalTSDBStore{Store: s.TSDBStore},
|
||||
},
|
||||
Monitor: s.Monitor,
|
||||
PointsWriter: s.PointsWriter,
|
||||
MaxSelectPointN: c.Coordinator.MaxSelectPointN,
|
||||
MaxSelectSeriesN: c.Coordinator.MaxSelectSeriesN,
|
||||
MaxSelectBucketsN: c.Coordinator.MaxSelectBucketsN,
|
||||
StrictErrorHandling: s.TSDBStore.EngineOptions.Config.StrictErrorHandling,
|
||||
Monitor: s.Monitor,
|
||||
PointsWriter: s.PointsWriter,
|
||||
MaxSelectPointN: c.Coordinator.MaxSelectPointN,
|
||||
MaxSelectSeriesN: c.Coordinator.MaxSelectSeriesN,
|
||||
MaxSelectBucketsN: c.Coordinator.MaxSelectBucketsN,
|
||||
}
|
||||
s.QueryExecutor.TaskManager.QueryTimeout = time.Duration(c.Coordinator.QueryTimeout)
|
||||
s.QueryExecutor.TaskManager.LogQueriesAfter = time.Duration(c.Coordinator.LogQueriesAfter)
|
||||
|
|
|
@ -51,6 +51,9 @@ type StatementExecutor struct {
|
|||
WritePointsInto(*IntoWriteRequest) error
|
||||
}
|
||||
|
||||
// Disallow INF values in SELECT INTO and other previously ignored errors
|
||||
StrictErrorHandling bool
|
||||
|
||||
// Select statement limits
|
||||
MaxSelectPointN int
|
||||
MaxSelectSeriesN int
|
||||
|
@ -573,7 +576,7 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen
|
|||
|
||||
// Write points back into system for INTO statements.
|
||||
if stmt.Target != nil {
|
||||
n, err := e.writeInto(pointsWriter, stmt, row)
|
||||
n, err := e.writeInto(pointsWriter, stmt, row, e.StrictErrorHandling)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1188,7 +1191,7 @@ func (w *BufferedPointsWriter) Len() int { return len(w.buf) }
|
|||
// Cap returns the capacity (in points) of the buffer.
|
||||
func (w *BufferedPointsWriter) Cap() int { return cap(w.buf) }
|
||||
|
||||
func (e *StatementExecutor) writeInto(w pointsWriter, stmt *influxql.SelectStatement, row *models.Row) (n int64, err error) {
|
||||
func (e *StatementExecutor) writeInto(w pointsWriter, stmt *influxql.SelectStatement, row *models.Row, strictErrorHandling bool) (n int64, err error) {
|
||||
if stmt.Target.Measurement.Database == "" {
|
||||
return 0, errNoDatabaseInTarget
|
||||
}
|
||||
|
@ -1205,7 +1208,7 @@ func (e *StatementExecutor) writeInto(w pointsWriter, stmt *influxql.SelectState
|
|||
name = row.Name
|
||||
}
|
||||
|
||||
points, err := convertRowToPoints(name, row)
|
||||
points, err := convertRowToPoints(name, row, strictErrorHandling)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
@ -1224,7 +1227,7 @@ func (e *StatementExecutor) writeInto(w pointsWriter, stmt *influxql.SelectState
|
|||
var errNoDatabaseInTarget = errors.New("no database in target")
|
||||
|
||||
// convertRowToPoints will convert a query result Row into Points that can be written back in.
|
||||
func convertRowToPoints(measurementName string, row *models.Row) ([]models.Point, error) {
|
||||
func convertRowToPoints(measurementName string, row *models.Row, strictErrorHandling bool) ([]models.Point, error) {
|
||||
// figure out which parts of the result are the time and which are the fields
|
||||
timeIndex := -1
|
||||
fieldIndexes := make(map[string]int)
|
||||
|
@ -1256,13 +1259,16 @@ func convertRowToPoints(measurementName string, row *models.Row) ([]models.Point
|
|||
|
||||
p, err := models.NewPoint(measurementName, models.NewTags(row.Tags), vals, v[timeIndex].(time.Time))
|
||||
if err != nil {
|
||||
// Drop points that can't be stored
|
||||
continue
|
||||
if !strictErrorHandling {
|
||||
// Drop points that can't be stored
|
||||
continue
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
points = append(points, p)
|
||||
}
|
||||
|
||||
return points, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -67,6 +67,10 @@
|
|||
# log any sensitive data contained within a query.
|
||||
# query-log-enabled = true
|
||||
|
||||
# Provides more error checking. For example, SELECT INTO will err out inserting an +/-Inf value
|
||||
# rather than silently failing.
|
||||
# strict-error-handling = false
|
||||
|
||||
# Validates incoming writes to ensure keys only have valid unicode characters.
|
||||
# This setting will incur a small overhead because every key must be checked.
|
||||
# validate-keys = false
|
||||
|
|
|
@ -93,6 +93,9 @@ type Config struct {
|
|||
// Enables unicode validation on series keys on write.
|
||||
ValidateKeys bool `toml:"validate-keys"`
|
||||
|
||||
// Enables strict error handling. For example, forces SELECT INTO to err out on INF values.
|
||||
StrictErrorHandling bool `toml:"strict-error-handling"`
|
||||
|
||||
// Query logging
|
||||
QueryLogEnabled bool `toml:"query-log-enabled"`
|
||||
|
||||
|
@ -155,7 +158,8 @@ func NewConfig() Config {
|
|||
Engine: DefaultEngine,
|
||||
Index: DefaultIndex,
|
||||
|
||||
QueryLogEnabled: true,
|
||||
StrictErrorHandling: false,
|
||||
QueryLogEnabled: true,
|
||||
|
||||
CacheMaxMemorySize: toml.Size(DefaultCacheMaxMemorySize),
|
||||
CacheSnapshotMemorySize: toml.Size(DefaultCacheSnapshotMemorySize),
|
||||
|
@ -229,6 +233,7 @@ func (c Config) Diagnostics() (*diagnostics.Diagnostics, error) {
|
|||
"dir": c.Dir,
|
||||
"wal-dir": c.WALDir,
|
||||
"wal-fsync-delay": c.WALFsyncDelay,
|
||||
"strict-error-handling": c.StrictErrorHandling,
|
||||
"cache-max-memory-size": c.CacheMaxMemorySize,
|
||||
"cache-snapshot-memory-size": c.CacheSnapshotMemorySize,
|
||||
"cache-snapshot-write-cold-duration": c.CacheSnapshotWriteColdDuration,
|
||||
|
|
Loading…
Reference in New Issue