Hide series type map behind feature flag
The performance is not good enough to enable by default so this allows the functionality to be merged while performance is improved.pull/9748/head
parent
ec3f5c353c
commit
aea9bf3464
|
@ -240,7 +240,12 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string,
|
|||
compactionLimiter: opt.CompactionLimiter,
|
||||
scheduler: newScheduler(stats, opt.CompactionLimiter.Capacity()),
|
||||
seriesIDSets: opt.SeriesIDSets,
|
||||
seriesTypeMap: radix.New(),
|
||||
}
|
||||
|
||||
// Feature flag to enable per-series type checking, by default this is off and
|
||||
// e.seriesTypeMap will be nil.
|
||||
if os.Getenv("INFLUXDB_SERIES_TYPE_CHECK_ENABLED") != "" {
|
||||
e.seriesTypeMap = radix.New()
|
||||
}
|
||||
|
||||
if e.traceLogging {
|
||||
|
@ -1213,32 +1218,34 @@ func (e *Engine) WritePoints(points []models.Point) error {
|
|||
keyBuf = append(keyBuf[:baseLen], iter.FieldKey()...)
|
||||
kstr := string(keyBuf)
|
||||
|
||||
// Fast-path check to see if the field for the series already exists.
|
||||
if v, ok := e.seriesTypeMap.Get(kstr); !ok {
|
||||
if typ, err := e.Type(keyBuf); err != nil {
|
||||
// Field type is unknown, we can try to add it.
|
||||
} else if typ != iter.Type() {
|
||||
// Existing type is different from what was passed in, we need to drop
|
||||
// this write and refresh the series type map.
|
||||
seriesErr = tsdb.ErrFieldTypeConflict
|
||||
e.seriesTypeMap.Insert(kstr, int(typ))
|
||||
continue
|
||||
}
|
||||
if e.seriesTypeMap != nil {
|
||||
// Fast-path check to see if the field for the series already exists.
|
||||
if v, ok := e.seriesTypeMap.Get(kstr); !ok {
|
||||
if typ, err := e.Type(keyBuf); err != nil {
|
||||
// Field type is unknown, we can try to add it.
|
||||
} else if typ != iter.Type() {
|
||||
// Existing type is different from what was passed in, we need to drop
|
||||
// this write and refresh the series type map.
|
||||
seriesErr = tsdb.ErrFieldTypeConflict
|
||||
e.seriesTypeMap.Insert(kstr, int(typ))
|
||||
continue
|
||||
}
|
||||
|
||||
// Doesn't exsts, so try to insert
|
||||
vv, ok := e.seriesTypeMap.Insert(kstr, int(iter.Type()))
|
||||
// Doesn't exsts, so try to insert
|
||||
vv, ok := e.seriesTypeMap.Insert(kstr, int(iter.Type()))
|
||||
|
||||
// We didn't insert and the type that exists isn't what we tried to insert, so
|
||||
// we have a conflict and must drop this field/series.
|
||||
if !ok || vv != int(iter.Type()) {
|
||||
// We didn't insert and the type that exists isn't what we tried to insert, so
|
||||
// we have a conflict and must drop this field/series.
|
||||
if !ok || vv != int(iter.Type()) {
|
||||
seriesErr = tsdb.ErrFieldTypeConflict
|
||||
continue
|
||||
}
|
||||
} else if v != int(iter.Type()) {
|
||||
// The series already exists, but with a different type. This is also a type conflict
|
||||
// and we need to drop this field/series.
|
||||
seriesErr = tsdb.ErrFieldTypeConflict
|
||||
continue
|
||||
}
|
||||
} else if v != int(iter.Type()) {
|
||||
// The series already exists, but with a different type. This is also a type conflict
|
||||
// and we need to drop this field/series.
|
||||
seriesErr = tsdb.ErrFieldTypeConflict
|
||||
continue
|
||||
}
|
||||
|
||||
var v Value
|
||||
|
|
|
@ -1893,7 +1893,8 @@ func TestEngine_DisableEnableCompactions_Concurrent(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEngine_WritePoints_TypeConflict(t *testing.T) {
|
||||
t.Parallel()
|
||||
os.Setenv("INFLUXDB_SERIES_TYPE_CHECK_ENABLED", "1")
|
||||
defer os.Unsetenv("INFLUXDB_SERIES_TYPE_CHECK_ENABLED")
|
||||
|
||||
for _, index := range tsdb.RegisteredIndexes() {
|
||||
t.Run(index, func(t *testing.T) {
|
||||
|
@ -1929,7 +1930,7 @@ func TestEngine_WritePoints_TypeConflict(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestEngine_WritePoints_Reload(t *testing.T) {
|
||||
t.Parallel()
|
||||
t.Skip("Disabled until INFLUXDB_SERIES_TYPE_CHECK_ENABLED is enabled by default")
|
||||
|
||||
for _, index := range tsdb.RegisteredIndexes() {
|
||||
t.Run(index, func(t *testing.T) {
|
||||
|
@ -1957,7 +1958,6 @@ func TestEngine_WritePoints_Reload(t *testing.T) {
|
|||
t.Fatalf("unexpected error writing snapshot: %v", err)
|
||||
}
|
||||
|
||||
println("repopen")
|
||||
if err := e.Reopen(); err != nil {
|
||||
t.Fatalf("unexpected error reopning engine: %v", err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue