fix: prevent differing field types in the same shard (#26025)

* fix: lock MeasurementFields while validating (#25998)

There was a window in which concurrent writes with
differing types for the same field could both pass
validation. Lock the MeasurementFields struct during
field validation to avoid this.

closes https://github.com/influxdata/influxdb/issues/23756

(cherry picked from commit 5a20a835a5)

helps https://github.com/influxdata/influxdb/issues/26001

* fix: switch MeasurementFields from atomic.Value to sync.Map (#26022)

Simplify and speed up synchronization for
MeasurementFields structures by switching
from a mutex and atomic.Value to a sync.Map

(cherry picked from commit b617eb24a7)

closes https://github.com/influxdata/influxdb/issues/26001
pull/25770/merge
davidby-influx 2025-02-14 12:28:10 -08:00 committed by GitHub
parent 982ae57f22
commit 8711e2d6cc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 326 additions and 169 deletions

View File

@ -0,0 +1,47 @@
package gensyncmap
import "sync"
// Map is a type-safe generic wrapper around sync.Map: keys are K and
// values are V, with all type assertions confined to this type.
// The zero value is an empty map ready for use. Concurrency semantics
// are exactly those of sync.Map.
type Map[K comparable, V any] struct {
	inner sync.Map
}

// Delete removes the entry for key, if any.
func (m *Map[K, V]) Delete(key K) { m.inner.Delete(key) }

// Load returns the value stored for key. ok reports whether the key was
// present; when it is false, value is the zero value of V.
func (m *Map[K, V]) Load(key K) (value V, ok bool) {
	if v, found := m.inner.Load(key); found {
		return v.(V), true
	}
	return value, false
}

// LoadAndDelete removes the entry for key, returning the previous value
// if any. loaded reports whether the key was present.
func (m *Map[K, V]) LoadAndDelete(key K) (value V, loaded bool) {
	if v, found := m.inner.LoadAndDelete(key); found {
		return v.(V), true
	}
	return value, false
}

// LoadOrStore returns the existing value for key if present; otherwise it
// stores and returns value. loaded is true if the value was already
// present, false if value was stored.
func (m *Map[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) {
	v, loaded := m.inner.LoadOrStore(key, value)
	return v.(V), loaded
}

// Range calls f sequentially for each key/value pair in the map, stopping
// early if f returns false. Visit-order and snapshot semantics match
// sync.Map.Range.
func (m *Map[K, V]) Range(f func(key K, value V) bool) {
	m.inner.Range(func(k, v any) bool {
		return f(k.(K), v.(V))
	})
}

// Store sets the value for key, replacing any existing entry.
func (m *Map[K, V]) Store(key K, value V) {
	m.inner.Store(key, value)
}

// Len reports the number of entries by walking the map; it is O(n) and
// only a point-in-time snapshot under concurrent modification.
func (m *Map[K, V]) Len() int {
	count := 0
	m.inner.Range(func(_, _ any) bool {
		count++
		return true
	})
	return count
}

View File

@ -1264,7 +1264,7 @@ func (e *Engine) addToIndexFromKey(keys [][]byte, fieldTypes []influxql.DataType
keys[i], field = SeriesAndFieldFromCompositeKey(keys[i])
name := models.ParseName(keys[i])
mf := e.fieldset.CreateFieldsIfNotExists(name)
if err := mf.CreateFieldIfNotExists(field, fieldTypes[i]); err != nil {
if _, err := mf.CreateFieldIfNotExists(field, fieldTypes[i]); err != nil {
return err
}

View File

@ -13,9 +13,11 @@ const MaxFieldValueLength = 1048576
// ValidateFields will return a PartialWriteError if:
// - the point has inconsistent fields, or
// - the point has fields that are too long
func ValidateFields(mf *MeasurementFields, point models.Point, skipSizeValidation bool) error {
func ValidateFields(mf *MeasurementFields, point models.Point, skipSizeValidation bool) ([]*FieldCreate, error) {
pointSize := point.StringSize()
iter := point.FieldIterator()
var fieldsToCreate []*FieldCreate
for iter.Next() {
if !skipSizeValidation {
// Check for size of field too large. Note it is much cheaper to check the whole point size
@ -23,7 +25,7 @@ func ValidateFields(mf *MeasurementFields, point models.Point, skipSizeValidatio
// unescape the string, and must at least parse the string)
if pointSize > MaxFieldValueLength && iter.Type() == models.String {
if sz := len(iter.StringValue()); sz > MaxFieldValueLength {
return PartialWriteError{
return nil, PartialWriteError{
Reason: fmt.Sprintf(
"input field \"%s\" on measurement \"%s\" is too long, %d > %d",
iter.FieldKey(), point.Name(), sz, MaxFieldValueLength),
@ -33,14 +35,9 @@ func ValidateFields(mf *MeasurementFields, point models.Point, skipSizeValidatio
}
}
fieldKey := iter.FieldKey()
// Skip fields name "time", they are illegal.
if bytes.Equal(iter.FieldKey(), timeBytes) {
continue
}
// If the fields is not present, there cannot be a conflict.
f := mf.FieldBytes(iter.FieldKey())
if f == nil {
if bytes.Equal(fieldKey, timeBytes) {
continue
}
@ -49,18 +46,26 @@ func ValidateFields(mf *MeasurementFields, point models.Point, skipSizeValidatio
continue
}
// If the types are not the same, there is a conflict.
if f.Type != dataType {
return PartialWriteError{
// If the field is not present, remember to create it.
f := mf.FieldBytes(fieldKey)
if f == nil {
fieldsToCreate = append(fieldsToCreate, &FieldCreate{
Measurement: point.Name(),
Field: &Field{
Name: string(fieldKey),
Type: dataType,
}})
} else if f.Type != dataType {
// If the types are not the same, there is a conflict.
return nil, PartialWriteError{
Reason: fmt.Sprintf(
"%s: input field \"%s\" on measurement \"%s\" is type %s, already exists as type %s",
ErrFieldTypeConflict, iter.FieldKey(), point.Name(), dataType, f.Type),
ErrFieldTypeConflict, fieldKey, point.Name(), dataType, f.Type),
Dropped: 1,
}
}
}
return nil
return fieldsToCreate, nil
}
// dataTypeFromModelsFieldType returns the influxql.DataType that corresponds to the

View File

@ -14,7 +14,6 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"unicode"
"unsafe"
@ -23,6 +22,7 @@ import (
"github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/pkg/bytesutil"
"github.com/influxdata/influxdb/v2/pkg/data/gensyncmap"
errors2 "github.com/influxdata/influxdb/v2/pkg/errors"
"github.com/influxdata/influxdb/v2/pkg/estimator"
"github.com/influxdata/influxdb/v2/pkg/file"
@ -704,16 +704,17 @@ func (s *Shard) WritePoints(ctx context.Context, points []models.Point) (rErr er
// to the caller, but continue on writing the remaining points.
writeError = err
}
s.stats.fieldsCreated.Add(float64(len(fieldsToCreate)))
// add any new fields and keep track of what needs to be saved
if err := s.createFieldsAndMeasurements(fieldsToCreate); err != nil {
if numFieldsCreated, err := s.createFieldsAndMeasurements(fieldsToCreate); err != nil {
return err
} else {
s.stats.fieldsCreated.Add(float64(numFieldsCreated))
}
// Write to the engine.
if err := engine.WritePoints(ctx, points); err != nil {
return fmt.Errorf("engine: %s", err)
return fmt.Errorf("engine: %w", err)
}
return writeError
@ -815,61 +816,42 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
continue
}
// Skip any points whos keys have been dropped. Dropped has already been incremented for them.
// Skip any points whose keys have been dropped. Dropped has already been incremented for them.
if len(droppedKeys) > 0 && bytesutil.Contains(droppedKeys, keys[i]) {
continue
}
name := p.Name()
mf := engine.MeasurementFields(name)
// Check with the field validator.
if err := ValidateFields(mf, p, s.options.Config.SkipFieldSizeValidation); err != nil {
switch err := err.(type) {
case PartialWriteError:
if reason == "" {
reason = err.Reason
err := func(p models.Point, iter models.FieldIterator) error {
var newFields []*FieldCreate
var validateErr error
name := p.Name()
mf := engine.MeasurementFields(name)
// Check with the field validator.
if newFields, validateErr = ValidateFields(mf, p, s.options.Config.SkipFieldSizeValidation); validateErr != nil {
var err PartialWriteError
switch {
case errors.As(validateErr, &err):
// This will turn into an error later, outside this lambda
if reason == "" {
reason = err.Reason
}
dropped += err.Dropped
s.stats.writesDropped.Add(float64(err.Dropped))
default:
return err
}
dropped += err.Dropped
s.stats.writesDropped.Add(float64(err.Dropped))
default:
return nil, nil, err
}
continue
}
points[j] = points[i]
j++
// Create any fields that are missing.
iter.Reset()
for iter.Next() {
fieldKey := iter.FieldKey()
// Skip fields named "time". They are illegal.
if bytes.Equal(fieldKey, timeBytes) {
continue
return nil
}
if mf.FieldBytes(fieldKey) != nil {
continue
}
dataType := dataTypeFromModelsFieldType(iter.Type())
if dataType == influxql.Unknown {
continue
}
fieldsToCreate = append(fieldsToCreate, &FieldCreate{
Measurement: name,
Field: &Field{
Name: string(fieldKey),
Type: dataType,
},
})
points[j] = points[i]
j++
fieldsToCreate = append(fieldsToCreate, newFields...)
return nil
}(p, iter)
if err != nil {
return nil, nil, err
}
}
if dropped > 0 {
err = PartialWriteError{Reason: reason, Dropped: dropped}
}
@ -899,30 +881,32 @@ func makePrintable(s string) string {
return b.String()
}
func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) error {
func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) (int, error) {
if len(fieldsToCreate) == 0 {
return nil
return 0, nil
}
engine, err := s.engineNoLock()
if err != nil {
return err
return 0, err
}
numCreated := 0
// add fields
changes := make([]*FieldChange, 0, len(fieldsToCreate))
for _, f := range fieldsToCreate {
mf := engine.MeasurementFields(f.Measurement)
if err := mf.CreateFieldIfNotExists([]byte(f.Field.Name), f.Field.Type); err != nil {
return err
if created, err := mf.CreateFieldIfNotExists([]byte(f.Field.Name), f.Field.Type); err != nil {
return 0, err
} else if created {
numCreated++
changes = append(changes, &FieldChange{
FieldCreate: *f,
ChangeType: AddMeasurementField,
})
}
changes = append(changes, &FieldChange{
FieldCreate: *f,
ChangeType: AddMeasurementField,
})
}
return engine.MeasurementFieldSet().Save(changes)
return numCreated, engine.MeasurementFieldSet().Save(changes)
}
// DeleteSeriesRange deletes all values from for seriesKeys between min and max (inclusive)
@ -1837,25 +1821,21 @@ func (a Shards) ExpandSources(sources influxql.Sources) (influxql.Sources, error
// MeasurementFields holds the fields of a measurement and their codec.
type MeasurementFields struct {
mu sync.Mutex
fields atomic.Value // map[string]*Field
fields gensyncmap.Map[string, *Field]
}
// NewMeasurementFields returns an initialised *MeasurementFields value.
func NewMeasurementFields() *MeasurementFields {
fields := make(map[string]*Field)
mf := &MeasurementFields{}
mf.fields.Store(fields)
return mf
return &MeasurementFields{fields: gensyncmap.Map[string, *Field]{}}
}
func (m *MeasurementFields) FieldKeys() []string {
fields := m.fields.Load().(map[string]*Field)
a := make([]string, 0, len(fields))
for key := range fields {
a = append(a, key)
}
var a []string
m.fields.Range(func(k string, _ *Field) bool {
a = append(a, k)
return true
})
sort.Strings(a)
return a
}
@ -1863,66 +1843,38 @@ func (m *MeasurementFields) FieldKeys() []string {
// bytes estimates the memory footprint of this MeasurementFields, in bytes.
func (m *MeasurementFields) bytes() int {
var b int
b += 24 // mu RWMutex is 24 bytes
fields := m.fields.Load().(map[string]*Field)
b += int(unsafe.Sizeof(fields))
for k, v := range fields {
b += int(unsafe.Sizeof(m.fields))
m.fields.Range(func(k string, v *Field) bool {
b += int(unsafe.Sizeof(k)) + len(k)
b += int(unsafe.Sizeof(v)+unsafe.Sizeof(*v)) + len(v.Name)
}
return true
})
return b
}
// CreateFieldIfNotExists creates a new field with an autoincrementing ID.
// Returns an error if 255 fields have already been created on the measurement or
// the fields already exists with a different type.
func (m *MeasurementFields) CreateFieldIfNotExists(name []byte, typ influxql.DataType) error {
fields := m.fields.Load().(map[string]*Field)
// Ignore if the field already exists.
if f := fields[string(name)]; f != nil {
if f.Type != typ {
return ErrFieldTypeConflict
}
return nil
}
m.mu.Lock()
defer m.mu.Unlock()
fields = m.fields.Load().(map[string]*Field)
// Re-check field and type under write lock.
if f := fields[string(name)]; f != nil {
if f.Type != typ {
return ErrFieldTypeConflict
}
return nil
}
fieldsUpdate := make(map[string]*Field, len(fields)+1)
for k, v := range fields {
fieldsUpdate[k] = v
}
// Create and append a new field.
f := &Field{
ID: uint8(len(fields) + 1),
// CreateFieldIfNotExists creates a new field with the given name and type.
// Returns an error if the field already exists with a different type.
func (m *MeasurementFields) CreateFieldIfNotExists(name []byte, typ influxql.DataType) (bool, error) {
newField := &Field{
Name: string(name),
Type: typ,
}
fieldsUpdate[string(name)] = f
m.fields.Store(fieldsUpdate)
return nil
if f, loaded := m.fields.LoadOrStore(newField.Name, newField); loaded {
if f.Type != typ {
return false, ErrFieldTypeConflict
}
return false, nil
}
return true, nil
}
func (m *MeasurementFields) FieldN() int {
n := len(m.fields.Load().(map[string]*Field))
return n
return m.fields.Len()
}
// Field returns the field for name, or nil if there is no field for name.
func (m *MeasurementFields) Field(name string) *Field {
f := m.fields.Load().(map[string]*Field)[name]
f, _ := m.fields.Load(name)
return f
}
@ -1930,8 +1882,8 @@ func (m *MeasurementFields) HasField(name string) bool {
if m == nil {
return false
}
f := m.fields.Load().(map[string]*Field)[name]
return f != nil
_, ok := m.fields.Load(name)
return ok
}
// FieldBytes returns the field for name, or nil if there is no field for name.
@ -1939,27 +1891,23 @@ func (m *MeasurementFields) HasField(name string) bool {
// it avoids a string allocation, which can't be avoided if the caller converts
// the []byte to a string and calls Field.
func (m *MeasurementFields) FieldBytes(name []byte) *Field {
f := m.fields.Load().(map[string]*Field)[string(name)]
return f
return m.Field(string(name))
}
// FieldSet returns the set of fields and their types for the measurement.
func (m *MeasurementFields) FieldSet() map[string]influxql.DataType {
fields := m.fields.Load().(map[string]*Field)
fieldTypes := make(map[string]influxql.DataType)
for name, f := range fields {
fieldTypes[name] = f.Type
}
m.fields.Range(func(k string, v *Field) bool {
fieldTypes[k] = v.Type
return true
})
return fieldTypes
}
func (m *MeasurementFields) ForEachField(fn func(name string, typ influxql.DataType) bool) {
fields := m.fields.Load().(map[string]*Field)
for name, f := range fields {
if !fn(name, f.Type) {
return
}
}
m.fields.Range(func(k string, v *Field) bool {
return fn(k, v.Type)
})
}
type FieldChanges []*FieldChange
@ -2438,12 +2386,11 @@ func (fs *MeasurementFieldSet) load() (rErr error) {
}
fs.fields = make(map[string]*MeasurementFields, len(pb.GetMeasurements()))
for _, measurement := range pb.GetMeasurements() {
fields := make(map[string]*Field, len(measurement.GetFields()))
set := NewMeasurementFields()
for _, field := range measurement.GetFields() {
fields[string(field.GetName())] = &Field{Name: string(field.GetName()), Type: influxql.DataType(field.GetType())}
name := string(field.GetName())
set.fields.Store(name, &Field{Name: name, Type: influxql.DataType(field.GetType())})
}
set := &MeasurementFields{}
set.fields.Store(fields)
fs.fields[string(measurement.GetName())] = set
}
return nil
@ -2557,7 +2504,6 @@ func (fscm *measurementFieldSetChangeMgr) loadFieldChangeSet(r io.Reader) (Field
FieldCreate: FieldCreate{
Measurement: fc.Measurement,
Field: &Field{
ID: 0,
Name: string(fc.Field.Name),
Type: influxql.DataType(fc.Field.Type),
},
@ -2585,7 +2531,7 @@ func (fs *MeasurementFieldSet) ApplyChanges() error {
fs.Delete(string(fc.Measurement))
} else {
mf := fs.CreateFieldsIfNotExists(fc.Measurement)
if err := mf.CreateFieldIfNotExists([]byte(fc.Field.Name), fc.Field.Type); err != nil {
if _, err := mf.CreateFieldIfNotExists([]byte(fc.Field.Name), fc.Field.Type); err != nil {
err = fmt.Errorf("failed creating %q.%q: %w", fc.Measurement, fc.Field.Name, err)
log.Error("field creation", zap.Error(err))
return err
@ -2598,7 +2544,6 @@ func (fs *MeasurementFieldSet) ApplyChanges() error {
// Field represents a series field. All of the fields must be hashable.
type Field struct {
ID uint8 `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Type influxql.DataType `json:"type,omitempty"`
}

View File

@ -7,6 +7,7 @@ import (
"fmt"
"math"
"os"
"path"
"path/filepath"
"reflect"
"regexp"
@ -14,12 +15,10 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
assert2 "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/davecgh/go-spew/spew"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
@ -30,8 +29,11 @@ import (
"github.com/influxdata/influxdb/v2/pkg/testing/assert"
"github.com/influxdata/influxdb/v2/tsdb"
_ "github.com/influxdata/influxdb/v2/tsdb/engine"
"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
_ "github.com/influxdata/influxdb/v2/tsdb/index"
"github.com/influxdata/influxql"
assert2 "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestShardWriteAndIndex(t *testing.T) {
@ -1602,13 +1604,13 @@ func TestMeasurementFieldSet_SaveLoad(t *testing.T) {
}
defer checkMeasurementFieldSetClose(t, mf)
fields := mf.CreateFieldsIfNotExists([]byte(measurement))
if err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil {
if _, err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil {
t.Fatalf("create field error: %v", err)
}
change := tsdb.FieldChange{
FieldCreate: tsdb.FieldCreate{
Measurement: []byte(measurement),
Field: &tsdb.Field{ID: 0, Name: fieldName, Type: influxql.Float},
Field: &tsdb.Field{Name: fieldName, Type: influxql.Float},
},
ChangeType: tsdb.AddMeasurementField,
}
@ -1653,13 +1655,13 @@ func TestMeasurementFieldSet_Corrupt(t *testing.T) {
measurement := []byte("cpu")
fields := mf.CreateFieldsIfNotExists(measurement)
fieldName := "value"
if err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil {
if _, err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil {
t.Fatalf("create field error: %v", err)
}
change := tsdb.FieldChange{
FieldCreate: tsdb.FieldCreate{
Measurement: []byte(measurement),
Field: &tsdb.Field{ID: 0, Name: fieldName, Type: influxql.Float},
Field: &tsdb.Field{Name: fieldName, Type: influxql.Float},
},
ChangeType: tsdb.AddMeasurementField,
}
@ -1723,13 +1725,13 @@ func TestMeasurementFieldSet_CorruptChangeFile(t *testing.T) {
defer checkMeasurementFieldSetClose(t, mf)
for _, f := range testFields {
fields := mf.CreateFieldsIfNotExists([]byte(f.Measurement))
if err := fields.CreateFieldIfNotExists([]byte(f.Field), f.FieldType); err != nil {
if _, err := fields.CreateFieldIfNotExists([]byte(f.Field), f.FieldType); err != nil {
t.Fatalf("create field error: %v", err)
}
change := tsdb.FieldChange{
FieldCreate: tsdb.FieldCreate{
Measurement: []byte(f.Measurement),
Field: &tsdb.Field{ID: 0, Name: f.Field, Type: f.FieldType},
Field: &tsdb.Field{Name: f.Field, Type: f.FieldType},
},
ChangeType: tsdb.AddMeasurementField,
}
@ -1784,14 +1786,14 @@ func TestMeasurementFieldSet_DeleteEmpty(t *testing.T) {
defer checkMeasurementFieldSetClose(t, mf)
fields := mf.CreateFieldsIfNotExists([]byte(measurement))
if err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil {
if _, err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil {
t.Fatalf("create field error: %v", err)
}
change := tsdb.FieldChange{
FieldCreate: tsdb.FieldCreate{
Measurement: []byte(measurement),
Field: &tsdb.Field{ID: 0, Name: fieldName, Type: influxql.Float},
Field: &tsdb.Field{Name: fieldName, Type: influxql.Float},
},
ChangeType: tsdb.AddMeasurementField,
}
@ -1933,14 +1935,15 @@ func testFieldMaker(t *testing.T, wg *sync.WaitGroup, mf *tsdb.MeasurementFieldS
fields := mf.CreateFieldsIfNotExists([]byte(measurement))
for _, fieldName := range fieldNames {
if err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil {
t.Errorf("create field error: %v", err)
if _, err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil {
t.Logf("create field error: %v", err)
t.Fail()
return
}
change := tsdb.FieldChange{
FieldCreate: tsdb.FieldCreate{
Measurement: []byte(measurement),
Field: &tsdb.Field{ID: 0, Name: fieldName, Type: influxql.Float},
Field: &tsdb.Field{Name: fieldName, Type: influxql.Float},
},
ChangeType: tsdb.AddMeasurementField,
}
@ -2582,3 +2585,160 @@ func (a seriesIDSets) ForEach(f func(ids *tsdb.SeriesIDSet)) error {
}
return nil
}
// Regression test: concurrent writes of conflicting field types to the
// same shard previously could panic when the shard was snapshotted to
// TSM files. The scenario is repeated many times to widen the race window.
func TestShard_WritePoints_ForceFieldConflictConcurrent(t *testing.T) {
	const Runs = 50

	if runtime.GOOS == "windows" || testing.Short() {
		t.Skip("Skipping on short or windows")
	}
	for run := 0; run < Runs; run++ {
		conflictShard(t, run)
	}
}
// conflictShard exercises one shard with concurrent writes of the same
// measurement/field using four different value types (float, int, bool,
// string), then snapshots and verifies every TSM file agrees on a single
// type per series key. run is only used to label failure messages.
// NOTE(review): depends on test helpers defined elsewhere in this
// package (MustOpenSeriesFile, seriesIDSets).
func conflictShard(t *testing.T, run int) {
	const measurement = "cpu"
	const field = "value"
	const numTypes = 4 // float, int, bool, string
	const pointCopies = 10
	const trialsPerShard = 10
	// Scratch directories for the shard and its WAL. The MkdirTemp error
	// is deliberately ignored; a failure surfaces when the shard opens.
	tmpDir, _ := os.MkdirTemp("", "shard_test")
	defer func() {
		require.NoError(t, os.RemoveAll(tmpDir), "removing %s", tmpDir)
	}()
	tmpShard := filepath.Join(tmpDir, "shard")
	tmpWal := filepath.Join(tmpDir, "wal")
	sfile := MustOpenSeriesFile(t)
	defer func() {
		require.NoError(t, sfile.Close(), "closing series file")
		require.NoError(t, os.RemoveAll(sfile.Path()), "removing series file %s", sfile.Path())
	}()
	opts := tsdb.NewEngineOptions()
	opts.Config.WALDir = tmpWal
	opts.SeriesIDSets = seriesIDSets([]*tsdb.SeriesIDSet{})
	sh := tsdb.NewShard(1, tmpShard, tmpWal, sfile.SeriesFile, opts)
	require.NoError(t, sh.Open(context.Background()), "opening shard: %s", sh.Path())
	defer func() {
		require.NoError(t, sh.Close(), "closing shard %s", tmpShard)
	}()
	var wg sync.WaitGroup
	// mu acts as a start barrier: writer goroutines block on RLock until
	// the main goroutine releases Lock, maximizing write overlap.
	mu := sync.RWMutex{}
	// maxConcurrency records the peak number of simultaneously running
	// writers, for the (commented-out) diagnostic log at the end.
	maxConcurrency := atomic.Int64{}
	currentTime := time.Now()
	// Build pointCopies timestamps with one point per value type, all on
	// the same series key, so every write contends on one field.
	points := make([]models.Point, 0, pointCopies*numTypes)
	for i := 0; i < pointCopies; i++ {
		points = append(points, models.MustNewPoint(
			measurement,
			models.NewTags(map[string]string{"host": "server"}),
			map[string]interface{}{field: 1.0},
			currentTime.Add(time.Duration(i)*time.Second),
		))
		points = append(points, models.MustNewPoint(
			measurement,
			models.NewTags(map[string]string{"host": "server"}),
			map[string]interface{}{field: int64(1)},
			currentTime.Add(time.Duration(i)*time.Second),
		))
		points = append(points, models.MustNewPoint(
			measurement,
			models.NewTags(map[string]string{"host": "server"}),
			map[string]interface{}{field: "one"},
			currentTime.Add(time.Duration(i)*time.Second),
		))
		points = append(points, models.MustNewPoint(
			measurement,
			models.NewTags(map[string]string{"host": "server"}),
			map[string]interface{}{field: true},
			currentTime.Add(time.Duration(i)*time.Second),
		))
	}
	concurrency := atomic.Int64{}
	for i := 0; i < trialsPerShard; i++ {
		// Hold the write lock while launching so no writer starts early.
		mu.Lock()
		wg.Add(len(points))
		// Write points concurrently
		for i := 0; i < pointCopies; i++ {
			for j := 0; j < numTypes; j++ {
				concurrency.Add(1)
				go func(mp models.Point) {
					mu.RLock()
					defer concurrency.Add(-1)
					defer mu.RUnlock()
					defer wg.Done()
					// A successful write must leave the field's registered
					// type equal to the written value's type; a rejected
					// write must fail with a field-type conflict.
					if err := sh.WritePoints(context.Background(), []models.Point{mp}); err == nil {
						fs, err := mp.Fields()
						require.NoError(t, err, "getting fields")
						require.Equal(t,
							sh.MeasurementFields([]byte(measurement)).Field(field).Type,
							influxql.InspectDataType(fs[field]),
							"field types mismatch on run %d: types exp: %s, got: %s", run+1, sh.MeasurementFields([]byte(measurement)).Field(field).Type.String(), influxql.InspectDataType(fs[field]).String())
					} else {
						require.ErrorContains(t, err, tsdb.ErrFieldTypeConflict.Error(), "unexpected error")
					}
					// Best-effort high-water mark; racy between Load and
					// Store, which is acceptable for a diagnostic value.
					if c := concurrency.Load(); maxConcurrency.Load() < c {
						maxConcurrency.Store(c)
					}
				}(points[i*numTypes+j])
			}
		}
		// Release the barrier, wait for all writers, then force TSM files
		// to be written — this is where the original bug panicked.
		mu.Unlock()
		wg.Wait()
		dir, err := sh.CreateSnapshot(false)
		require.NoError(t, err, "creating snapshot: %s", sh.Path())
		require.NoError(t, os.RemoveAll(dir), "removing snapshot directory %s", dir)
	}
	// Verify on-disk consistency: every TSM file in the shard must store
	// the same block type for a given series key.
	keyType := map[string]byte{}
	files, err := os.ReadDir(tmpShard)
	require.NoError(t, err, "reading shard directory %s", tmpShard)
	for i, file := range files {
		if !strings.HasSuffix(path.Ext(file.Name()), tsm1.TSMFileExtension) {
			continue
		}
		ffile := path.Join(tmpShard, file.Name())
		fh, err := os.Open(ffile)
		require.NoError(t, err, "opening snapshot file %s", ffile)
		tr, err := tsm1.NewTSMReader(fh)
		require.NoError(t, err, "creating TSM reader for %s", ffile)
		// Only the first key is checked; all points share one series key.
		key, typ := tr.KeyAt(0)
		if oldTyp, ok := keyType[string(key)]; ok {
			require.Equal(t, oldTyp, typ,
				"field type mismatch in run %d TSM file %d -- %q in %s\nfirst seen: %s, newest: %s, field type: %s",
				run+1,
				i+1,
				string(key),
				ffile,
				blockTypeString(oldTyp),
				blockTypeString(typ),
				sh.MeasurementFields([]byte(measurement)).Field(field).Type.String())
		} else {
			keyType[string(key)] = typ
		}
		// Must close after all uses of key (mapped memory)
		require.NoError(t, tr.Close(), "closing TSM reader")
	}
	// t.Logf("Type %s wins run %d with concurrency: %d", sh.MeasurementFields([]byte(measurement)).Field(field).Type.String(), run+1, maxConcurrency.Load())
}
// blockTypeString converts a TSM block-type byte into a human-readable
// name for use in test failure messages.
func blockTypeString(typ byte) string {
	names := map[byte]string{
		tsm1.BlockFloat64: "float64",
		tsm1.BlockInteger: "int64",
		tsm1.BlockBoolean: "bool",
		tsm1.BlockString:  "string",
	}
	if name, ok := names[typ]; ok {
		return name
	}
	return "unknown"
}