package tsdb import ( "bytes" "context" "encoding/binary" "errors" "fmt" "io" "os" "path/filepath" "regexp" "runtime" "sort" "strings" "sync" "sync/atomic" "time" "unicode" "unsafe" "github.com/influxdata/influxdb/v2/influxql/query" "github.com/influxdata/influxdb/v2/logger" "github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/pkg/bytesutil" errors2 "github.com/influxdata/influxdb/v2/pkg/errors" "github.com/influxdata/influxdb/v2/pkg/estimator" "github.com/influxdata/influxdb/v2/pkg/file" "github.com/influxdata/influxdb/v2/pkg/limiter" "github.com/influxdata/influxdb/v2/pkg/slices" internal "github.com/influxdata/influxdb/v2/tsdb/internal" "github.com/influxdata/influxql" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" "google.golang.org/protobuf/proto" ) const ( measurementKey = "_name" DefaultMetricInterval = 10 * time.Second FieldsChangeFile = "fields.idxl" bytesInInt64 = 8 ) var ( // ErrFieldTypeConflict is returned when a new field already exists with a different type. ErrFieldTypeConflict = errors.New("field type conflict") // ErrEngineClosed is returned when a caller attempts indirectly to // access the shard's underlying engine. ErrEngineClosed = errors.New("engine is closed") // ErrShardDisabled is returned when a the shard is not available for // queries or writes. ErrShardDisabled = errors.New("shard is disabled") // ErrUnknownFieldsFormat is returned when the fields index file is not identifiable by // the file's magic number. ErrUnknownFieldsFormat = errors.New("unknown field index format") // ErrUnknownFieldType is returned when the type of a field cannot be determined. ErrUnknownFieldType = errors.New("unknown field type") // ErrShardNotIdle is returned when an operation requiring the shard to be idle/cold is // attempted on a hot shard. ErrShardNotIdle = errors.New("shard not idle") // fieldsIndexMagicNumber is the file magic number for the fields index file. 
fieldsIndexMagicNumber = []byte{0, 6, 1, 3} ) var ( // Static objects to prevent small allocs. timeBytes = []byte("time") ) // A ShardError implements the error interface, and contains extra // context about the shard that generated the error. type ShardError struct { id uint64 Err error } // NewShardError returns a new ShardError. func NewShardError(id uint64, err error) error { if err == nil { return nil } return ShardError{id: id, Err: err} } // Error returns the string representation of the error, to satisfy the error interface. func (e ShardError) Error() string { return fmt.Sprintf("[shard %d] %s", e.id, e.Err) } // PartialWriteError indicates a write request could only write a portion of the // requested values. type PartialWriteError struct { Reason string Dropped int // A sorted slice of series keys that were dropped. DroppedKeys [][]byte } func (e PartialWriteError) Error() string { return fmt.Sprintf("partial write: %s dropped=%d", e.Reason, e.Dropped) } // Shard represents a self-contained time series database. An inverted index of // the measurement and tag data is kept along with the raw time series data. // Data can be split across many shards. The query engine in TSDB is responsible // for combining the output of many shards into a single query result. type Shard struct { path string walPath string id uint64 database string retentionPolicy string sfile *SeriesFile options EngineOptions mu sync.RWMutex _engine Engine index Index enabled bool stats *ShardMetrics baseLogger *zap.Logger logger *zap.Logger metricUpdater *ticker EnableOnOpen bool // CompactionDisabled specifies the shard should not schedule compactions. // This option is intended for offline tooling. CompactionDisabled bool } // NewShard returns a new initialized Shard. 
walPath doesn't apply to the b1 type index func NewShard(id uint64, path string, walPath string, sfile *SeriesFile, opt EngineOptions) *Shard { db, rp := decodeStorePath(path) logger := zap.NewNop() engineTags := EngineTags{ Path: path, WalPath: walPath, Id: fmt.Sprintf("%d", id), Bucket: db, EngineVersion: opt.EngineVersion, } s := &Shard{ id: id, path: path, walPath: walPath, sfile: sfile, options: opt, stats: newShardMetrics(engineTags), database: db, retentionPolicy: rp, logger: logger, baseLogger: logger, EnableOnOpen: true, } return s } // WithLogger sets the logger on the shard. It must be called before Open. func (s *Shard) WithLogger(log *zap.Logger) { s.baseLogger = log engine, err := s.Engine() if err == nil { engine.WithLogger(s.baseLogger) s.index.WithLogger(s.baseLogger) } s.logger = s.baseLogger.With(zap.String("service", "shard")) } // SetEnabled enables the shard for queries and write. When disabled, all // writes and queries return an error and compactions are stopped for the shard. func (s *Shard) SetEnabled(enabled bool) { s.mu.Lock() s.setEnabledNoLock(enabled) s.mu.Unlock() } // ! setEnabledNoLock performs actual work of SetEnabled. Must hold s.mu before calling. func (s *Shard) setEnabledNoLock(enabled bool) { // Prevent writes and queries s.enabled = enabled if s._engine != nil && !s.CompactionDisabled { // Disable background compactions and snapshotting s._engine.SetEnabled(enabled) } } // ScheduleFullCompaction forces a full compaction to be schedule on the shard. func (s *Shard) ScheduleFullCompaction() error { engine, err := s.Engine() if err != nil { return err } return engine.ScheduleFullCompaction() } // ID returns the shards ID. func (s *Shard) ID() uint64 { return s.id } // Database returns the database of the shard. func (s *Shard) Database() string { return s.database } // RetentionPolicy returns the retention policy of the shard. 
func (s *Shard) RetentionPolicy() string { return s.retentionPolicy } var globalShardMetrics = newAllShardMetrics() type twoCounterObserver struct { count prometheus.Counter sum prometheus.Counter } func (t twoCounterObserver) Observe(f float64) { t.sum.Inc() t.count.Add(f) } var _ prometheus.Observer = twoCounterObserver{} type allShardMetrics struct { writes *prometheus.CounterVec writesSum *prometheus.CounterVec writesErr *prometheus.CounterVec writesErrSum *prometheus.CounterVec writesDropped *prometheus.CounterVec fieldsCreated *prometheus.CounterVec diskSize *prometheus.GaugeVec series *prometheus.GaugeVec } type ShardMetrics struct { writes prometheus.Observer writesErr prometheus.Observer writesDropped prometheus.Counter fieldsCreated prometheus.Counter diskSize prometheus.Gauge series prometheus.Gauge } const storageNamespace = "storage" const shardSubsystem = "shard" func newAllShardMetrics() *allShardMetrics { labels := EngineLabelNames() return &allShardMetrics{ writes: prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: storageNamespace, Subsystem: shardSubsystem, Name: "write_count", Help: "Count of the number of write requests", }, labels), writesSum: prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: storageNamespace, Subsystem: shardSubsystem, Name: "write_sum", Help: "Counter of the number of points for write requests", }, labels), writesErr: prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: storageNamespace, Subsystem: shardSubsystem, Name: "write_err_count", Help: "Count of the number of write requests with errors", }, labels), writesErrSum: prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: storageNamespace, Subsystem: shardSubsystem, Name: "write_err_sum", Help: "Counter of the number of points for write requests with errors", }, labels), writesDropped: prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: storageNamespace, Subsystem: shardSubsystem, Name: "write_dropped_sum", Help: "Counter of the 
number of points droppped", }, labels), fieldsCreated: prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: storageNamespace, Subsystem: shardSubsystem, Name: "fields_created", Help: "Counter of the number of fields created", }, labels), diskSize: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: storageNamespace, Subsystem: shardSubsystem, Name: "disk_size", Help: "Gauge of the disk size for the shard", }, labels), series: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: storageNamespace, Subsystem: shardSubsystem, Name: "series", Help: "Gauge of the number of series in the shard index", }, labels), } } func ShardCollectors() []prometheus.Collector { return []prometheus.Collector{ globalShardMetrics.writes, globalShardMetrics.writesSum, globalShardMetrics.writesErr, globalShardMetrics.writesErrSum, globalShardMetrics.writesDropped, globalShardMetrics.fieldsCreated, globalShardMetrics.diskSize, globalShardMetrics.series, } } func newShardMetrics(tags EngineTags) *ShardMetrics { labels := tags.GetLabels() return &ShardMetrics{ writes: twoCounterObserver{ count: globalShardMetrics.writes.With(labels), sum: globalShardMetrics.writesSum.With(labels), }, writesErr: twoCounterObserver{ count: globalShardMetrics.writesErr.With(labels), sum: globalShardMetrics.writesErrSum.With(labels), }, writesDropped: globalShardMetrics.writesDropped.With(labels), fieldsCreated: globalShardMetrics.fieldsCreated.With(labels), diskSize: globalShardMetrics.diskSize.With(labels), series: globalShardMetrics.series.With(labels), } } // ticker runs fn periodically, and stops when Stop() is called // // Stop waits for the last function run to finish if already running type ticker struct { wg sync.WaitGroup closing chan struct{} } // Stops the ticker and waits for the function to complete func (t *ticker) Stop() { close(t.closing) t.wg.Wait() } // Path returns the path set on the shard when it was created. 
func (s *Shard) Path() string { return s.path }

// Open initializes and opens the shard's store.
func (s *Shard) Open(ctx context.Context) error {
	s.mu.Lock()
	closeWaitNeeded, err := s.openNoLock(ctx)
	s.mu.Unlock()
	// closeWait must run without holding s.mu, so it happens here, after the
	// unlock, and only when openNoLock failed part-way through.
	if closeWaitNeeded {
		werr := s.closeWait()
		// We want the first error we get returned to the caller
		if err == nil {
			err = werr
		}
	}
	return err
}

// openNoLock performs work of Open. Must hold s.mu before calling. The first return
// value is true if the caller should call closeWait after unlocking s.mu in order
// to clean up a failed open operation.
func (s *Shard) openNoLock(ctx context.Context) (bool, error) {
	if err := func() error {
		// Return if the shard is already open
		if s._engine != nil {
			return nil
		}

		seriesIDSet := NewSeriesIDSet()

		// Initialize underlying index.
		ipath := filepath.Join(s.path, "index")
		idx, err := NewIndex(s.id, s.database, ipath, seriesIDSet, s.sfile, s.options)
		if err != nil {
			return err
		}

		idx.WithLogger(s.baseLogger)

		// Check if the index needs to be rebuilt before Open() initializes
		// its file system layout.
		var shouldReindex bool
		if _, err := os.Stat(ipath); os.IsNotExist(err) {
			shouldReindex = true
		}

		// Open index.
		if err := idx.Open(); err != nil {
			return err
		}
		s.index = idx

		// Initialize underlying engine.
		e, err := NewEngine(s.id, idx, s.path, s.walPath, s.sfile, s.options)
		if err != nil {
			return err
		}

		// Set log output on the engine.
		e.WithLogger(s.baseLogger)

		// Disable compactions while loading the index
		e.SetEnabled(false)

		// Open engine.
		if err := e.Open(ctx); err != nil {
			return err
		}

		// Rebuild the index from the engine's data if the index directory
		// did not exist before this open.
		if shouldReindex {
			if err := e.Reindex(); err != nil {
				return err
			}
		}

		if err := e.LoadMetadataIndex(s.id, s.index); err != nil {
			return err
		}
		s._engine = e

		// Set up metric collection
		metricUpdater := &ticker{
			closing: make(chan struct{}),
		}

		// We want a way to turn off the series and disk size metrics if they are suspected to cause issues
		// This corresponds to the top-level MetricsDisabled argument
		if !s.options.MetricsDisabled {
			metricUpdater.wg.Add(1)
			go func() {
				tick := time.NewTicker(DefaultMetricInterval)
				defer metricUpdater.wg.Done()
				defer tick.Stop()
				for {
					select {
					case <-tick.C:
						// Note this takes the engine lock, so we have to be careful not
						// to close metricUpdater.closing while holding the engine lock
						e, err := s.Engine()
						if err != nil {
							continue
						}
						s.stats.series.Set(float64(e.SeriesN()))
						s.stats.diskSize.Set(float64(e.DiskSize()))
					case <-metricUpdater.closing:
						return
					}
				}
			}()
		}
		s.metricUpdater = metricUpdater

		return nil
	}(); err != nil {
		// Any failure above tears the shard back down; the caller must then
		// invoke closeWait (hence the true return) once s.mu is released.
		s.closeNoLock()
		return true, NewShardError(s.id, err)
	}

	if s.EnableOnOpen {
		// enable writes, queries and compactions
		s.setEnabledNoLock(true)
	}

	return false, nil
}

// Close shuts down the shard's store.
func (s *Shard) Close() error {
	err := func() error {
		s.mu.Lock()
		defer s.mu.Unlock()
		return s.closeNoLock()
	}()

	// make sure not to hold a lock while waiting for close to finish
	werr := s.closeWait()
	if err != nil {
		return err
	}
	return werr
}

// closeNoLock closes the shard and removes references to the shard from associated
// indexes. The s.mu mutex must be held before calling closeNoLock. closeWait should always
// be called after calling closeNoLock.
func (s *Shard) closeNoLock() error {
	if s._engine == nil {
		return nil
	}

	// Signal the metric-updating goroutine to stop; closeWait (called later,
	// without s.mu held) joins it.
	if s.metricUpdater != nil {
		close(s.metricUpdater.closing)
	}

	err := s._engine.Close()
	// Only clear the engine reference on a clean close, so a failed close can
	// be retried/observed by the caller.
	if err == nil {
		s._engine = nil
	}

	if e := s.index.Close(); e == nil {
		s.index = nil
	}
	return err
}

// closeWait waits for goroutines and other background operations associated with this
// shard to complete after closeNoLock is called. Must only be called after calling
// closeNoLock. closeWait should always be called after calling closeNoLock.
// Public methods which close the shard should call closeWait after closeNoLock before
// returning. Must be called without holding shard locks to avoid deadlocking.
func (s *Shard) closeWait() error {
	if s.metricUpdater != nil {
		s.metricUpdater.wg.Wait()
	}
	return nil
}

// IndexType returns the index version being used for this shard.
//
// IndexType returns the empty string if it is called before the shard is opened,
// since it is only that point that the underlying index type is known.
func (s *Shard) IndexType() string {
	s.mu.RLock()
	defer s.mu.RUnlock()

	if s._engine == nil || s.index == nil { // Shard not open yet.
		return ""
	}
	return s.index.Type()
}

// ready determines if the Shard is ready for queries or writes.
// It returns nil if ready, otherwise ErrShardClosed or ErrShardDisabled
func (s *Shard) ready() error {
	var err error
	if s._engine == nil {
		err = ErrEngineClosed
	} else if !s.enabled {
		err = ErrShardDisabled
	}
	return err
}

// LastModified returns the time when this shard was last modified.
func (s *Shard) LastModified() time.Time {
	engine, err := s.Engine()
	if err != nil {
		// Closed/disabled shard: report the zero time.
		return time.Time{}
	}
	return engine.LastModified()
}

// Index returns a reference to the underlying index. It returns an error if
// the index is nil.
func (s *Shard) Index() (Index, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if err := s.ready(); err != nil {
		return nil, err
	}
	return s.index, nil
}

// SeriesFile returns a reference to the underlying series file.
If return an error // if the series file is nil. func (s *Shard) SeriesFile() (*SeriesFile, error) { s.mu.RLock() defer s.mu.RUnlock() if err := s.ready(); err != nil { return nil, err } return s.sfile, nil } // IsIdle return true if the shard is not receiving writes and is fully compacted. func (s *Shard) IsIdle() (state bool, reason string) { engine, err := s.Engine() if err != nil { return true, "" } return engine.IsIdle() } func (s *Shard) Free() error { engine, err := s.Engine() if err != nil { return err } // Disable compactions to stop background goroutines s.SetCompactionsEnabled(false) return engine.Free() } // SetCompactionsEnabled enables or disable shard background compactions. func (s *Shard) SetCompactionsEnabled(enabled bool) { engine, err := s.Engine() if err != nil { return } engine.SetCompactionsEnabled(enabled) } // DiskSize returns the size on disk of this shard. func (s *Shard) DiskSize() (int64, error) { s.mu.RLock() defer s.mu.RUnlock() // We don't use engine() because we still want to report the shard's disk // size even if the shard has been disabled. if s._engine == nil { return 0, ErrEngineClosed } size := s._engine.DiskSize() return size, nil } // FieldCreate holds information for a field to create on a measurement. type FieldCreate struct { Measurement []byte Field *Field } // WritePoints will write the raw data points and any new metadata to the index in the shard. 
func (s *Shard) WritePoints(ctx context.Context, points []models.Point) (rErr error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	engine, err := s.engineNoLock()
	if err != nil {
		return err
	}

	var writeError error
	s.stats.writes.Observe(float64(len(points)))
	// Named return rErr lets this deferred func see the final error value.
	defer func() {
		if rErr != nil {
			s.stats.writesErr.Observe(float64(len(points)))
		}
	}()

	points, fieldsToCreate, err := s.validateSeriesAndFields(points)
	if err != nil {
		if _, ok := err.(PartialWriteError); !ok {
			return err
		}
		// There was a partial write (points dropped), hold onto the error to return
		// to the caller, but continue on writing the remaining points.
		writeError = err
	}
	s.stats.fieldsCreated.Add(float64(len(fieldsToCreate)))

	// add any new fields and keep track of what needs to be saved
	if err := s.createFieldsAndMeasurements(fieldsToCreate); err != nil {
		return err
	}

	// Write to the engine.
	if err := engine.WritePoints(ctx, points); err != nil {
		return fmt.Errorf("engine: %s", err)
	}

	return writeError
}

// validateSeriesAndFields checks which series and fields are new and whose metadata should be saved and indexed.
// It returns the subset of points that may be written (invalid/dropped points
// filtered out in place), the fields that must be created, and a
// PartialWriteError when any points were dropped.
func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point, []*FieldCreate, error) {
	var (
		fieldsToCreate []*FieldCreate
		err            error
		dropped        int
		reason         string // only first error reason is set unless returned from CreateSeriesListIfNotExists
	)

	// Create all series against the index in bulk.
	keys := make([][]byte, len(points))
	names := make([][]byte, len(points))
	tagsSlice := make([]models.Tags, len(points))

	// Check if keys should be unicode validated.
	validateKeys := s.options.Config.ValidateKeys

	// First pass: filter points in place (compacting into index j) and
	// collect the parallel key/name/tags slices for the bulk index call.
	var j int
	for i, p := range points {
		tags := p.Tags()

		// Drop any series w/ a "time" tag, these are illegal
		if v := tags.Get(timeBytes); v != nil {
			dropped++
			if reason == "" {
				reason = fmt.Sprintf(
					"invalid tag key: input tag \"%s\" on measurement \"%s\" is invalid",
					"time", string(p.Name()))
			}
			continue
		}

		// Drop any series with invalid unicode characters in the key.
		if validateKeys && !models.ValidKeyTokens(string(p.Name()), tags) {
			dropped++
			if reason == "" {
				reason = fmt.Sprintf("key contains invalid unicode: %q", makePrintable(string(p.Key())))
			}
			continue
		}

		keys[j] = p.Key()
		names[j] = p.Name()
		tagsSlice[j] = tags
		points[j] = points[i]
		j++
	}
	points, keys, names, tagsSlice = points[:j], keys[:j], names[:j], tagsSlice[:j]

	engine, err := s.engineNoLock()
	if err != nil {
		return nil, nil, err
	}

	// Add new series. Check for partial writes.
	var droppedKeys [][]byte
	if err := engine.CreateSeriesListIfNotExists(keys, names, tagsSlice); err != nil {
		switch err := err.(type) {
		// (DSB) This was previously *PartialWriteError. Now catch pointer and value types.
		case *PartialWriteError:
			reason = err.Reason
			dropped += err.Dropped
			droppedKeys = err.DroppedKeys
			s.stats.writesDropped.Add(float64(err.Dropped))
		case PartialWriteError:
			reason = err.Reason
			dropped += err.Dropped
			droppedKeys = err.DroppedKeys
			s.stats.writesDropped.Add(float64(err.Dropped))
		default:
			return nil, nil, err
		}
	}

	// Second pass: drop points with no valid fields or whose series were
	// dropped by the index, and record any fields that must be created.
	j = 0
	for i, p := range points {
		// Skip any points with only invalid fields.
		iter := p.FieldIterator()
		validField := false
		for iter.Next() {
			if bytes.Equal(iter.FieldKey(), timeBytes) {
				continue
			}
			validField = true
			break
		}
		if !validField {
			if reason == "" {
				reason = fmt.Sprintf(
					"invalid field name: input field \"%s\" on measurement \"%s\" is invalid",
					"time", string(p.Name()))
			}
			dropped++
			continue
		}

		// Skip any points whos keys have been dropped. Dropped has already been incremented for them.
		if len(droppedKeys) > 0 && bytesutil.Contains(droppedKeys, keys[i]) {
			continue
		}

		name := p.Name()
		mf := engine.MeasurementFields(name)

		// Check with the field validator.
		if err := ValidateFields(mf, p, s.options.Config.SkipFieldSizeValidation); err != nil {
			switch err := err.(type) {
			case PartialWriteError:
				if reason == "" {
					reason = err.Reason
				}
				dropped += err.Dropped
				s.stats.writesDropped.Add(float64(err.Dropped))
			default:
				return nil, nil, err
			}
			continue
		}

		points[j] = points[i]
		j++

		// Create any fields that are missing.
		iter.Reset()
		for iter.Next() {
			fieldKey := iter.FieldKey()

			// Skip fields named "time". They are illegal.
			if bytes.Equal(fieldKey, timeBytes) {
				continue
			}

			if mf.FieldBytes(fieldKey) != nil {
				continue
			}

			dataType := dataTypeFromModelsFieldType(iter.Type())
			if dataType == influxql.Unknown {
				continue
			}

			fieldsToCreate = append(fieldsToCreate, &FieldCreate{
				Measurement: name,
				Field: &Field{
					Name: string(fieldKey),
					Type: dataType,
				},
			})
		}
	}

	if dropped > 0 {
		err = PartialWriteError{Reason: reason, Dropped: dropped}
	}

	return points[:j], fieldsToCreate, err
}

// unPrintReplRune replaces each non-printable rune in makePrintable.
const unPrintReplRune = '?'

// unPrintMaxReplRune caps how many consecutive replacement runes are emitted.
const unPrintMaxReplRune = 3

// makePrintable - replace invalid and non-printable unicode characters with a few '?'
runes func makePrintable(s string) string { b := strings.Builder{} b.Grow(len(s)) c := 0 for _, r := range strings.ToValidUTF8(s, string(unicode.ReplacementChar)) { if !unicode.IsPrint(r) || r == unicode.ReplacementChar { if c < unPrintMaxReplRune { b.WriteRune(unPrintReplRune) } c++ } else { b.WriteRune(r) c = 0 } } return b.String() } func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) error { if len(fieldsToCreate) == 0 { return nil } engine, err := s.engineNoLock() if err != nil { return err } // add fields changes := make([]*FieldChange, 0, len(fieldsToCreate)) for _, f := range fieldsToCreate { mf := engine.MeasurementFields(f.Measurement) if err := mf.CreateFieldIfNotExists([]byte(f.Field.Name), f.Field.Type); err != nil { return err } changes = append(changes, &FieldChange{ FieldCreate: *f, ChangeType: AddMeasurementField, }) } return engine.MeasurementFieldSet().Save(changes) } // DeleteSeriesRange deletes all values from for seriesKeys between min and max (inclusive) func (s *Shard) DeleteSeriesRange(ctx context.Context, itr SeriesIterator, min, max int64) error { engine, err := s.Engine() if err != nil { return err } return engine.DeleteSeriesRange(ctx, itr, min, max) } // DeleteSeriesRangeWithPredicate deletes all values from for seriesKeys between min and max (inclusive) // for which predicate() returns true. If predicate() is nil, then all values in range are deleted. func (s *Shard) DeleteSeriesRangeWithPredicate( ctx context.Context, itr SeriesIterator, predicate func(name []byte, tags models.Tags) (int64, int64, bool), ) error { engine, err := s.Engine() if err != nil { return err } return engine.DeleteSeriesRangeWithPredicate(ctx, itr, predicate) } // DeleteMeasurement deletes a measurement and all underlying series. 
func (s *Shard) DeleteMeasurement(ctx context.Context, name []byte) error { engine, err := s.Engine() if err != nil { return err } return engine.DeleteMeasurement(ctx, name) } // SeriesN returns the unique number of series in the shard. func (s *Shard) SeriesN() int64 { engine, err := s.Engine() if err != nil { return 0 } return engine.SeriesN() } // SeriesSketches returns the measurement sketches for the shard. func (s *Shard) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) { engine, err := s.Engine() if err != nil { return nil, nil, err } return engine.SeriesSketches() } // MeasurementsSketches returns the measurement sketches for the shard. func (s *Shard) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) { engine, err := s.Engine() if err != nil { return nil, nil, err } return engine.MeasurementsSketches() } // MeasurementNamesByRegex returns names of measurements matching the regular expression. func (s *Shard) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) { engine, err := s.Engine() if err != nil { return nil, err } return engine.MeasurementNamesByRegex(re) } // MeasurementNamesByPredicate returns fields for a measurement filtered by an expression. func (s *Shard) MeasurementNamesByPredicate(expr influxql.Expr) ([][]byte, error) { index, err := s.Index() if err != nil { return nil, err } indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: s.sfile} return indexSet.MeasurementNamesByPredicate(query.OpenAuthorizer, expr) } // MeasurementFields returns fields for a measurement. func (s *Shard) MeasurementFields(name []byte) *MeasurementFields { engine, err := s.Engine() if err != nil { return nil } return engine.MeasurementFields(name) } // MeasurementExists returns true if the shard contains name. // TODO(edd): This method is currently only being called from tests; do we // really need it? 
func (s *Shard) MeasurementExists(name []byte) (bool, error) {
	engine, err := s.Engine()
	if err != nil {
		return false, err
	}
	return engine.MeasurementExists(name)
}

// CreateIterator returns an iterator for the data in the shard.
func (s *Shard) CreateIterator(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) {
	engine, err := s.Engine()
	if err != nil {
		return nil, err
	}
	// System iterators ("_fieldKeys", "_series", "_tagKeys") are served from
	// shard metadata rather than the engine.
	switch m.SystemIterator {
	case "_fieldKeys":
		return NewFieldKeysIterator(s, opt)
	case "_series":
		// TODO(benbjohnson): Move up to the Shards.CreateIterator().
		index, err := s.Index()
		if err != nil {
			return nil, err
		}
		indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: s.sfile}
		itr, err := NewSeriesPointIterator(indexSet, opt)
		if err != nil {
			return nil, err
		}
		return query.NewInterruptIterator(itr, opt.InterruptCh), nil
	case "_tagKeys":
		return NewTagKeysIterator(s, opt)
	}
	return engine.CreateIterator(ctx, m.Name, opt)
}

// CreateSeriesCursor returns a cursor over the shard's series matching cond.
func (s *Shard) CreateSeriesCursor(ctx context.Context, req SeriesCursorRequest, cond influxql.Expr) (SeriesCursor, error) {
	index, err := s.Index()
	if err != nil {
		return nil, err
	}
	return newSeriesCursor(req, IndexSet{Indexes: []Index{index}, SeriesFile: s.sfile}, cond)
}

// CreateCursorIterator returns a cursor iterator from the underlying engine.
func (s *Shard) CreateCursorIterator(ctx context.Context) (CursorIterator, error) {
	engine, err := s.Engine()
	if err != nil {
		return nil, err
	}
	return engine.CreateCursorIterator(ctx)
}

// FieldDimensions returns unique sets of fields and dimensions across a list of sources.
func (s *Shard) FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) {
	engine, err := s.Engine()
	if err != nil {
		return nil, nil, err
	}

	fields = make(map[string]influxql.DataType)
	dimensions = make(map[string]struct{})

	index, err := s.Index()
	if err != nil {
		return nil, nil, err
	}
	for _, name := range measurements {
		// Handle system sources.
		if strings.HasPrefix(name, "_") {
			var keys []string
			switch name {
			case "_fieldKeys":
				keys = []string{"fieldKey", "fieldType"}
			case "_series":
				keys = []string{"key"}
			case "_tagKeys":
				keys = []string{"tagKey"}
			}

			if len(keys) > 0 {
				for _, k := range keys {
					// Only upgrade the recorded type; never downgrade an
					// already-seen wider type.
					if fields[k].LessThan(influxql.String) {
						fields[k] = influxql.String
					}
				}
				continue
			}
			// Unknown system source so default to looking for a measurement.
		}

		// Retrieve measurement.
		if exists, err := engine.MeasurementExists([]byte(name)); err != nil {
			return nil, nil, err
		} else if !exists {
			continue
		}

		// Append fields and dimensions.
		mf := engine.MeasurementFields([]byte(name))
		if mf != nil {
			for k, typ := range mf.FieldSet() {
				if fields[k].LessThan(typ) {
					fields[k] = typ
				}
			}
		}

		indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: s.sfile}
		if err := indexSet.ForEachMeasurementTagKey([]byte(name), func(key []byte) error {
			dimensions[string(key)] = struct{}{}
			return nil
		}); err != nil {
			return nil, nil, err
		}
	}

	return fields, dimensions, nil
}

// mapType returns the data type for the field within the measurement.
// NOTE(review): uses engineNoLock, so callers appear responsible for holding
// s.mu — confirm at call sites.
func (s *Shard) mapType(measurement, field string) (influxql.DataType, error) {
	engine, err := s.engineNoLock()
	if err != nil {
		return 0, err
	}

	switch field {
	case "_name", "_tagKey", "_tagValue", "_seriesKey":
		return influxql.String, nil
	}

	// Process system measurements.
	switch measurement {
	case "_fieldKeys":
		if field == "fieldKey" || field == "fieldType" {
			return influxql.String, nil
		}
		return influxql.Unknown, nil
	case "_series":
		if field == "key" {
			return influxql.String, nil
		}
		return influxql.Unknown, nil
	case "_tagKeys":
		if field == "tagKey" {
			return influxql.String, nil
		}
		return influxql.Unknown, nil
	}
	// Unknown system source so default to looking for a measurement.

	if exists, _ := engine.MeasurementExists([]byte(measurement)); !exists {
		return influxql.Unknown, nil
	}

	mf := engine.MeasurementFields([]byte(measurement))
	if mf != nil {
		f := mf.Field(field)
		if f != nil {
			return f.Type, nil
		}
	}

	if exists, _ := engine.HasTagKey([]byte(measurement), []byte(field)); exists {
		return influxql.Tag, nil
	}

	return influxql.Unknown, nil
}

// expandSources expands regex sources and removes duplicates.
// NOTE: sources must be normalized (db and rp set) before calling this function.
func (s *Shard) expandSources(sources influxql.Sources) (influxql.Sources, error) {
	engine, err := s.engineNoLock()
	if err != nil {
		return nil, err
	}

	// Use a map as a set to prevent duplicates.
	set := map[string]influxql.Source{}

	// Iterate all sources, expanding regexes when they're found.
	for _, source := range sources {
		switch src := source.(type) {
		case *influxql.Measurement:
			// Add non-regex measurements directly to the set.
			if src.Regex == nil {
				set[src.String()] = src
				continue
			}

			// Loop over matching measurements.
			names, err := engine.MeasurementNamesByRegex(src.Regex.Val)
			if err != nil {
				return nil, err
			}
			for _, name := range names {
				other := &influxql.Measurement{
					Database:        src.Database,
					RetentionPolicy: src.RetentionPolicy,
					Name:            string(name),
				}
				set[other.String()] = other
			}
		default:
			return nil, fmt.Errorf("expandSources: unsupported source type: %T", source)
		}
	}

	// Convert set to sorted slice.
	names := make([]string, 0, len(set))
	for name := range set {
		names = append(names, name)
	}
	sort.Strings(names)

	// Convert set to a list of Sources.
	expanded := make(influxql.Sources, 0, len(set))
	for _, name := range names {
		expanded = append(expanded, set[name])
	}

	return expanded, nil
}

// Backup backs up the shard by creating a tar archive of all TSM files that
// have been modified since the provided time. See Engine.Backup for more details.
func (s *Shard) Backup(w io.Writer, basePath string, since time.Time) error { engine, err := s.Engine() if err != nil { return err } return engine.Backup(w, basePath, since) } func (s *Shard) Export(w io.Writer, basePath string, start time.Time, end time.Time) error { engine, err := s.Engine() if err != nil { return err } return engine.Export(w, basePath, start, end) } // Restore restores data to the underlying engine for the shard. // The shard is reopened after restore. func (s *Shard) Restore(ctx context.Context, r io.Reader, basePath string) error { closeWaitNeeded, err := func() (bool, error) { s.mu.Lock() defer s.mu.Unlock() closeWaitNeeded := false // Special case - we can still restore to a disabled shard, so we should // only check if the engine is closed and not care if the shard is // disabled. if s._engine == nil { return closeWaitNeeded, ErrEngineClosed } // Restore to engine. if err := s._engine.Restore(r, basePath); err != nil { return closeWaitNeeded, nil } // Close shard. closeWaitNeeded = true // about to call closeNoLock, closeWait will be needed if err := s.closeNoLock(); err != nil { return closeWaitNeeded, err } return closeWaitNeeded, nil }() // Now that we've unlocked, we can call closeWait if needed if closeWaitNeeded { werr := s.closeWait() // Return the first error encountered to the caller if err == nil { err = werr } } if err != nil { return err } // Reopen engine. Need locked method since we had to unlock for closeWait. return s.Open(ctx) } // Import imports data to the underlying engine for the shard. r should // be a reader from a backup created by Backup. func (s *Shard) Import(r io.Reader, basePath string) error { // Special case - we can still import to a disabled shard, so we should // only check if the engine is closed and not care if the shard is // disabled. s.mu.Lock() defer s.mu.Unlock() if s._engine == nil { return ErrEngineClosed } // Import to engine. 
return s._engine.Import(r, basePath) } // CreateSnapshot will return a path to a temp directory // containing hard links to the underlying shard files. func (s *Shard) CreateSnapshot(skipCacheOk bool) (string, error) { engine, err := s.Engine() if err != nil { return "", err } return engine.CreateSnapshot(skipCacheOk) } // ForEachMeasurementName iterates over each measurement in the shard. func (s *Shard) ForEachMeasurementName(fn func(name []byte) error) error { engine, err := s.Engine() if err != nil { return err } return engine.ForEachMeasurementName(fn) } func (s *Shard) TagKeyCardinality(name, key []byte) int { engine, err := s.Engine() if err != nil { return 0 } return engine.TagKeyCardinality(name, key) } // Digest returns a digest of the shard. func (s *Shard) Digest() (io.ReadCloser, int64, string, error) { engine, err := s.Engine() if err != nil { return nil, 0, "", err } // Make sure the shard is idle/cold. (No use creating a digest of a // hot shard that is rapidly changing.) if isIdle, reason := engine.IsIdle(); !isIdle { return nil, 0, reason, ErrShardNotIdle } readCloser, size, err := engine.Digest() return readCloser, size, "", err } // engine safely (under an RLock) returns a reference to the shard's Engine, or // an error if the Engine is closed, or the shard is currently disabled. // // The shard's Engine should always be accessed via a call to engine(), rather // than directly referencing Shard.engine. // // If a caller needs an Engine reference but is already under a lock, then they // should use engineNoLock(). func (s *Shard) Engine() (Engine, error) { s.mu.RLock() defer s.mu.RUnlock() return s.engineNoLock() } // engineNoLock is similar to calling engine(), but the caller must guarantee // that they already hold an appropriate lock. 
func (s *Shard) engineNoLock() (Engine, error) { if err := s.ready(); err != nil { return nil, err } return s._engine, nil } type ShardGroup interface { MeasurementsByRegex(re *regexp.Regexp) []string FieldKeysByMeasurement(name []byte) []string FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) MapType(measurement, field string) influxql.DataType CreateIterator(ctx context.Context, measurement *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) IteratorCost(ctx context.Context, measurement string, opt query.IteratorOptions) (query.IteratorCost, error) ExpandSources(sources influxql.Sources) (influxql.Sources, error) } // Shards represents a sortable list of shards. type Shards []*Shard // Len implements sort.Interface. func (a Shards) Len() int { return len(a) } // Less implements sort.Interface. func (a Shards) Less(i, j int) bool { return a[i].id < a[j].id } // Swap implements sort.Interface. func (a Shards) Swap(i, j int) { a[i], a[j] = a[j], a[i] } // MeasurementsByRegex returns the unique set of measurements matching the // provided regex, for all the shards. func (a Shards) MeasurementsByRegex(re *regexp.Regexp) []string { var m map[string]struct{} for _, sh := range a { names, err := sh.MeasurementNamesByRegex(re) if err != nil { continue // Skip this shard's results—previous behaviour. } if m == nil { m = make(map[string]struct{}, len(names)) } for _, name := range names { m[string(name)] = struct{}{} } } if len(m) == 0 { return nil } names := make([]string, 0, len(m)) for key := range m { names = append(names, key) } sort.Strings(names) return names } // FieldKeysByMeasurement returns a de-duplicated, sorted, set of field keys for // the provided measurement name. 
func (a Shards) FieldKeysByMeasurement(name []byte) []string {
	// Single-shard fast path: no merge required.
	if len(a) == 1 {
		mf := a[0].MeasurementFields(name)
		if mf == nil {
			return nil
		}
		return mf.FieldKeys()
	}

	// Collect each shard's (already sorted) key list, then merge-dedupe.
	all := make([][]string, 0, len(a))
	for _, shard := range a {
		mf := shard.MeasurementFields(name)
		if mf == nil {
			continue
		}
		all = append(all, mf.FieldKeys())
	}
	return slices.MergeSortedStrings(all...)
}

// MeasurementNamesByPredicate returns the measurements that match the given predicate.
func (a Shards) MeasurementNamesByPredicate(expr influxql.Expr) ([][]byte, error) {
	// Single-shard fast path: no merge required.
	if len(a) == 1 {
		return a[0].MeasurementNamesByPredicate(expr)
	}

	all := make([][][]byte, len(a))
	for i, shard := range a {
		names, err := shard.MeasurementNamesByPredicate(expr)
		if err != nil {
			return nil, err
		}
		all[i] = names
	}
	return slices.MergeSortedBytes(all...), nil
}

// FieldKeysByPredicate returns the field keys for series that match
// the given predicate.
func (a Shards) FieldKeysByPredicate(expr influxql.Expr) (map[string][]string, error) {
	// Try to extract measurement names directly from the expression; fall
	// back to the (more expensive) predicate evaluation if that fails.
	names, ok := measurementOptimization(expr, measurementKey)
	if !ok {
		var err error
		if names, err = a.MeasurementNamesByPredicate(expr); err != nil {
			return nil, err
		}
	}

	all := make(map[string][]string, len(names))
	for _, name := range names {
		all[string(name)] = a.FieldKeysByMeasurement(name)
	}
	return all, nil
}

// consecutiveAndChildren finds all child nodes of consecutive
// influxql.BinaryExpr with AND operator nodes ("AND nodes") which are not
// themselves AND nodes. This may be the root of the tree if the root of the
// tree is not an AND node.
type consecutiveAndChildren struct {
	children []influxql.Node
}

// Visit implements influxql.Visitor. It descends through AND nodes and
// parens, recording every other node as a child and pruning that branch.
func (v *consecutiveAndChildren) Visit(node influxql.Node) influxql.Visitor {
	switch n := node.(type) {
	case *influxql.BinaryExpr:
		if n.Op == influxql.AND {
			return v
		}
	case *influxql.ParenExpr:
		// Parens are essentially a no-op and can be traversed through.
		return v
	}

	// If this wasn't a BinaryExpr with an AND operator or a Paren, record this
	// child node and stop the search for this branch.
	v.children = append(v.children, node)
	return nil
}

// orMeasurementTree determines if a tree (or subtree) represents a grouping of
// exclusively measurement names OR'd together with EQ operators for the
// measurements themselves. It collects the list of measurement names
// encountered and records the validity of the tree.
type orMeasurementTree struct {
	measurementKey   string
	measurementNames []string
	valid            bool
}

// Visit implements influxql.Visitor. Only OR nodes, parens, and
// "_measurement = name" EQ nodes are permitted; anything else marks the
// whole tree invalid.
func (v *orMeasurementTree) Visit(node influxql.Node) influxql.Visitor {
	// Return early if this tree has already been invalidated - no reason to
	// continue evaluating at that point.
	if !v.valid {
		return nil
	}

	switch n := node.(type) {
	case *influxql.BinaryExpr:
		// A BinaryExpr must have an operation of OR or EQ in a valid tree
		if n.Op == influxql.OR {
			return v
		} else if n.Op == influxql.EQ {
			// An EQ must be in the form of "v.measurementKey == measurementName" in a
			// valid tree
			if name, ok := measurementNameFromEqBinary(n, v.measurementKey); ok {
				v.measurementNames = append(v.measurementNames, name)
				// If a valid measurement key/value was found, there is no need to
				// continue evaluating the VarRef/StringLiteral child nodes of this
				// node.
				return nil
			}
		}
	case *influxql.ParenExpr:
		// Parens are essentially a no-op and can be traversed through.
		return v
	}

	// If the type switch didn't already return, this tree is invalid.
	v.valid = false
	return nil
}

// measurementOptimization reports whether expr can be reduced to a plain
// list of measurement names, returning those names when possible.
func measurementOptimization(expr influxql.Expr, key string) ([][]byte, bool) {
	// A measurement optimization is possible if the query contains a single group
	// of one or more measurements (in the form of _measurement = measName,
	// equality operator only) grouped together by OR operators, with the subtree
	// containing the OR'd measurements accessible from root of the tree either
	// directly (tree contains nothing but OR'd measurements) or by traversing AND
	// binary expression nodes.

	// Get a list of "candidate" measurement subtrees.
	v := consecutiveAndChildren{}
	influxql.Walk(&v, expr)
	possibleSubtrees := v.children

	// Evaluate the candidate subtrees to determine which measurement names they
	// contain, and to see if they are valid for the optimization.
	validSubtrees := []orMeasurementTree{}
	for _, h := range possibleSubtrees {
		t := orMeasurementTree{
			measurementKey: key,
			valid:          true,
		}
		influxql.Walk(&t, h)
		if t.valid {
			validSubtrees = append(validSubtrees, t)
		}
	}

	// There must be exactly one valid measurement subtree for this optimization
	// to be applied. Note: It may also be possible to have measurements in
	// multiple subtrees, as long as there are no measurements in invalid
	// subtrees, by determining an intersection of the measurement names across
	// all valid subtrees - this is not currently implemented.
	if len(validSubtrees) != 1 {
		return nil, false
	}

	return slices.StringsToBytes(validSubtrees[0].measurementNames...), true
}

// measurementNameFromEqBinary returns the name of a measurement from a binary
// expression if possible, and a boolean status indicating if the binary
// expression contained a measurement name. A measurement name will only be
// returned if the operator for the binary is EQ, and the measurement key is on
// the LHS with the measurement name on the RHS.
func measurementNameFromEqBinary(be *influxql.BinaryExpr, key string) (string, bool) {
	// LHS must be a variable reference to the measurement key.
	lhs, ok := be.LHS.(*influxql.VarRef)
	if !ok {
		return "", false
	} else if lhs.Val != key {
		return "", false
	}

	// RHS must be a string literal holding the measurement name.
	rhs, ok := be.RHS.(*influxql.StringLiteral)
	if !ok {
		return "", false
	}

	return rhs.Val, true
}

// FieldDimensions returns the merged set of field types and tag-key
// dimensions across all shards, keeping the widest type for each field.
func (a Shards) FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) {
	fields = make(map[string]influxql.DataType)
	dimensions = make(map[string]struct{})

	for _, sh := range a {
		f, d, err := sh.FieldDimensions(measurements)
		if err != nil {
			return nil, nil, err
		}
		for k, typ := range f {
			// Keep the widest type observed across shards.
			if fields[k].LessThan(typ) {
				fields[k] = typ
			}
		}
		for k := range d {
			dimensions[k] = struct{}{}
		}
	}
	return
}

// MapType returns the widest data type for field within measurement across
// all shards. Per-shard lookup errors are ignored.
func (a Shards) MapType(measurement, field string) influxql.DataType {
	var typ influxql.DataType
	for _, sh := range a {
		sh.mu.RLock()
		if t, err := sh.mapType(measurement, field); err == nil && typ.LessThan(t) {
			typ = t
		}
		sh.mu.RUnlock()
	}
	return typ
}

// CallType returns the return type of the given function call.
func (a Shards) CallType(name string, args []influxql.DataType) (influxql.DataType, error) {
	typmap := query.CallTypeMapper{}
	return typmap.CallType(name, args)
}

// CreateIterator returns a merged iterator over all shards for measurement,
// honoring interrupt and series-limit options as iterators are created.
func (a Shards) CreateIterator(ctx context.Context, measurement *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) {
	switch measurement.SystemIterator {
	case "_series":
		return a.createSeriesIterator(ctx, opt)
	}

	itrs := make([]query.Iterator, 0, len(a))
	for _, sh := range a {
		itr, err := sh.CreateIterator(ctx, measurement, opt)
		if err != nil {
			// Close any iterators already created before bailing out.
			query.Iterators(itrs).Close()
			return nil, err
		} else if itr == nil {
			continue
		}
		itrs = append(itrs, itr)

		// Abort if the query was interrupted while building iterators.
		select {
		case <-opt.InterruptCh:
			query.Iterators(itrs).Close()
			return nil, query.ErrQueryInterrupted
		default:
		}

		// Enforce series limit at creation time.
		if opt.MaxSeriesN > 0 {
			stats := itr.Stats()
			if stats.SeriesN > opt.MaxSeriesN {
				query.Iterators(itrs).Close()
				return nil, fmt.Errorf("max-select-series limit exceeded: (%d/%d)", stats.SeriesN, opt.MaxSeriesN)
			}
		}
	}
	return query.Iterators(itrs).Merge(opt)
}

// createSeriesIterator builds a point iterator over the series keys of all
// shards. Shards whose index cannot be obtained are silently skipped.
func (a Shards) createSeriesIterator(ctx context.Context, opt query.IteratorOptions) (_ query.Iterator, err error) {
	var (
		idxs  = make([]Index, 0, len(a))
		sfile *SeriesFile
	)
	for _, sh := range a {
		var idx Index
		if idx, err = sh.Index(); err == nil {
			idxs = append(idxs, idx)
		}
		// All shards of a database share one series file; take the first.
		if sfile == nil {
			sfile, _ = sh.SeriesFile()
		}
	}

	if sfile == nil {
		return nil, nil
	}

	return NewSeriesPointIterator(IndexSet{Indexes: idxs, SeriesFile: sfile}, opt)
}

// IteratorCost returns the combined cost of creating an iterator for
// measurement across all shards, computed concurrently with a worker limit
// of GOMAXPROCS.
func (a Shards) IteratorCost(ctx context.Context, measurement string, opt query.IteratorOptions) (query.IteratorCost, error) {
	var costs query.IteratorCost
	var costerr error
	var mu sync.RWMutex

	// setErr records the first error encountered by any worker.
	setErr := func(err error) {
		mu.Lock()
		defer mu.Unlock()
		if costerr == nil {
			costerr = err
		}
	}

	limit := limiter.NewFixed(runtime.GOMAXPROCS(0))
	var wg sync.WaitGroup
	for _, sh := range a {
		costerr = limit.Take(ctx)
		// NOTE(review): wg.Add(1) happens before the error check below; on
		// the break path no goroutine is launched to call wg.Done, which
		// looks like it would hang wg.Wait when Take fails — confirm.
		wg.Add(1)
		mu.RLock()
		if costerr != nil {
			limit.Release()
			mu.RUnlock()
			break
		}
		mu.RUnlock()
		go func(sh *Shard) {
			defer limit.Release()
			defer wg.Done()
			engine, err := sh.Engine()
			if err != nil {
				setErr(err)
				return
			}
			cost, err := engine.IteratorCost(measurement, opt)
			if err != nil {
				setErr(err)
				return
			}
			mu.Lock()
			costs = costs.Combine(cost)
			mu.Unlock()
		}(sh)
	}
	wg.Wait()
	return costs, costerr
}

// CreateSeriesCursor returns a cursor over series matching cond across all
// shards. Shards whose index cannot be obtained are silently skipped.
func (a Shards) CreateSeriesCursor(ctx context.Context, req SeriesCursorRequest, cond influxql.Expr) (_ SeriesCursor, err error) {
	var (
		idxs  []Index
		sfile *SeriesFile
	)
	for _, sh := range a {
		var idx Index
		if idx, err = sh.Index(); err == nil {
			idxs = append(idxs, idx)
		}
		// All shards of a database share one series file; take the first.
		if sfile == nil {
			sfile, _ = sh.SeriesFile()
		}
	}

	if sfile == nil {
		return nil, errors.New("CreateSeriesCursor: no series file")
	}

	return newSeriesCursor(req, IndexSet{Indexes: idxs, SeriesFile: sfile}, cond)
}

// ExpandSources expands regex sources across all shards and returns the
// de-duplicated union, sorted by source string.
func (a Shards) ExpandSources(sources influxql.Sources) (influxql.Sources, error) {
	// Use a map as a set to prevent duplicates.
	set := map[string]influxql.Source{}

	// Iterate through every shard and expand the sources.
	for _, sh := range a {
		sh.mu.RLock()
		expanded, err := sh.expandSources(sources)
		sh.mu.RUnlock()
		if err != nil {
			return nil, err
		}

		for _, src := range expanded {
			switch src := src.(type) {
			case *influxql.Measurement:
				set[src.String()] = src
			default:
				return nil, fmt.Errorf("Store.ExpandSources: unsupported source type: %T", src)
			}
		}
	}

	// Convert set to sorted slice.
	names := make([]string, 0, len(set))
	for name := range set {
		names = append(names, name)
	}
	sort.Strings(names)

	// Convert set to a list of Sources.
	sorted := make([]influxql.Source, 0, len(set))
	for _, name := range names {
		sorted = append(sorted, set[name])
	}

	return sorted, nil
}

// MeasurementFields holds the fields of a measurement and their codec.
// Readers access the field map lock-free via the atomic.Value; writers
// serialize on mu and replace the whole map (copy-on-write).
type MeasurementFields struct {
	mu sync.Mutex

	fields atomic.Value // map[string]*Field
}

// NewMeasurementFields returns an initialised *MeasurementFields value.
func NewMeasurementFields() *MeasurementFields {
	fields := make(map[string]*Field)
	mf := &MeasurementFields{}
	mf.fields.Store(fields)
	return mf
}

// FieldKeys returns the sorted list of field names for the measurement.
func (m *MeasurementFields) FieldKeys() []string {
	fields := m.fields.Load().(map[string]*Field)
	a := make([]string, 0, len(fields))
	for key := range fields {
		a = append(a, key)
	}
	sort.Strings(a)
	return a
}

// bytes estimates the memory footprint of this MeasurementFields, in bytes.
func (m *MeasurementFields) bytes() int {
	var b int
	// NOTE(review): 24 is the size of an RWMutex, but mu is declared as a
	// sync.Mutex (8 bytes) — confirm whether this overestimate is intended.
	b += 24 // mu RWMutex is 24 bytes
	fields := m.fields.Load().(map[string]*Field)
	b += int(unsafe.Sizeof(fields))
	for k, v := range fields {
		b += int(unsafe.Sizeof(k)) + len(k)
		b += int(unsafe.Sizeof(v)+unsafe.Sizeof(*v)) + len(v.Name)
	}
	return b
}

// CreateFieldIfNotExists creates a new field with an autoincrementing ID.
// Returns an error if 255 fields have already been created on the measurement or
// the field already exists with a different type.
func (m *MeasurementFields) CreateFieldIfNotExists(name []byte, typ influxql.DataType) error {
	// Fast path: lock-free read of the current field map.
	fields := m.fields.Load().(map[string]*Field)

	// Ignore if the field already exists.
	if f := fields[string(name)]; f != nil {
		if f.Type != typ {
			return ErrFieldTypeConflict
		}
		return nil
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	fields = m.fields.Load().(map[string]*Field)
	// Re-check field and type under write lock.
	if f := fields[string(name)]; f != nil {
		if f.Type != typ {
			return ErrFieldTypeConflict
		}
		return nil
	}

	// Copy-on-write: build a new map so concurrent lock-free readers never
	// observe a map being mutated.
	fieldsUpdate := make(map[string]*Field, len(fields)+1)
	for k, v := range fields {
		fieldsUpdate[k] = v
	}
	// Create and append a new field.
	f := &Field{
		ID:   uint8(len(fields) + 1),
		Name: string(name),
		Type: typ,
	}
	fieldsUpdate[string(name)] = f
	m.fields.Store(fieldsUpdate)

	return nil
}

// FieldN returns the number of fields in the measurement.
func (m *MeasurementFields) FieldN() int {
	n := len(m.fields.Load().(map[string]*Field))
	return n
}

// Field returns the field for name, or nil if there is no field for name.
func (m *MeasurementFields) Field(name string) *Field {
	f := m.fields.Load().(map[string]*Field)[name]
	return f
}

// HasField reports whether the measurement has a field named name.
// Safe to call on a nil receiver.
func (m *MeasurementFields) HasField(name string) bool {
	if m == nil {
		return false
	}
	f := m.fields.Load().(map[string]*Field)[name]
	return f != nil
}

// FieldBytes returns the field for name, or nil if there is no field for name.
// FieldBytes should be preferred to Field when the caller has a []byte, because
// it avoids a string allocation, which can't be avoided if the caller converts
// the []byte to a string and calls Field.
func (m *MeasurementFields) FieldBytes(name []byte) *Field {
	f := m.fields.Load().(map[string]*Field)[string(name)]
	return f
}

// FieldSet returns the set of fields and their types for the measurement.
func (m *MeasurementFields) FieldSet() map[string]influxql.DataType {
	fields := m.fields.Load().(map[string]*Field)
	fieldTypes := make(map[string]influxql.DataType)
	for name, f := range fields {
		fieldTypes[name] = f.Type
	}
	return fieldTypes
}

// ForEachField invokes fn for every field in the measurement, stopping
// early if fn returns false. Iteration order is unspecified (map order).
func (m *MeasurementFields) ForEachField(fn func(name string, typ influxql.DataType) bool) {
	fields := m.fields.Load().(map[string]*Field)
	for name, f := range fields {
		if !fn(name, f.Type) {
			return
		}
	}
}

// FieldChanges is a list of field additions / measurement deletions to be
// persisted in the change log.
type FieldChanges []*FieldChange

// MeasurementsToFieldChangeDeletions converts a list of measurement names
// into DeleteMeasurement change records (Field is nil for deletions).
func MeasurementsToFieldChangeDeletions(measurements []string) FieldChanges {
	fcs := make([]*FieldChange, 0, len(measurements))
	for _, m := range measurements {
		fcs = append(fcs, &FieldChange{
			FieldCreate: FieldCreate{
				Measurement: []byte(m),
				Field:       nil,
			},
			ChangeType: DeleteMeasurement,
		})
	}
	return fcs
}

// MeasurementFieldSet represents a collection of fields by measurement.
// This is safe for concurrent use.
type MeasurementFieldSet struct {
	mu     sync.RWMutex
	fields map[string]*MeasurementFields

	// path is the location to persist field sets
	path string
	// changeMgr serializes appends to the on-disk change log.
	changeMgr *measurementFieldSetChangeMgr
}

// NewMeasurementFieldSet returns a new instance of MeasurementFieldSet.
func NewMeasurementFieldSet(path string, logger *zap.Logger) (*MeasurementFieldSet, error) {
	const MaxCombinedWrites = 100
	fs := &MeasurementFieldSet{
		fields: make(map[string]*MeasurementFields),
		path:   path,
	}
	if nil == logger {
		logger = zap.NewNop()
	}
	// Start the background writer that batches change-log appends.
	fs.SetMeasurementFieldSetWriter(MaxCombinedWrites, logger)
	// If there is a load error, return the error and an empty set so
	// it can be rebuilt manually.
	return fs, fs.load()
}

// Close shuts down the background change writer and, if a change log file
// exists, flushes the in-memory index to disk. Safe on a nil receiver.
func (fs *MeasurementFieldSet) Close() error {
	if fs != nil && fs.changeMgr != nil {
		fs.changeMgr.Close()
		// If there is a change log file, save the in-memory version
		if _, err := os.Stat(fs.changeMgr.changeFilePath); err == nil {
			return fs.WriteToFile()
		} else if os.IsNotExist(err) {
			return nil
		} else {
			return fmt.Errorf("cannot get file information for %s: %w", fs.changeMgr.changeFilePath, err)
		}
	}
	return nil
}

// ChangesPath returns the location of the on-disk field change log.
func (fs *MeasurementFieldSet) ChangesPath() string { return fs.changeMgr.changeFilePath }

// Bytes estimates the memory footprint of this MeasurementFieldSet, in bytes.
func (fs *MeasurementFieldSet) Bytes() int {
	var b int
	fs.mu.RLock()
	b += 24 // mu RWMutex is 24 bytes
	for k, v := range fs.fields {
		b += int(unsafe.Sizeof(k)) + len(k)
		b += int(unsafe.Sizeof(v)) + v.bytes()
	}
	b += int(unsafe.Sizeof(fs.fields))
	b += int(unsafe.Sizeof(fs.path)) + len(fs.path)
	fs.mu.RUnlock()
	return b
}

// MeasurementNames returns the names of all of the measurements in the field set in
// lexicographical order.
func (fs *MeasurementFieldSet) MeasurementNames() []string {
	fs.mu.RLock()
	defer fs.mu.RUnlock()
	names := make([]string, 0, len(fs.fields))
	for name := range fs.fields {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

// Fields returns fields for a measurement by name.
func (fs *MeasurementFieldSet) Fields(name []byte) *MeasurementFields {
	fs.mu.RLock()
	mf := fs.fields[string(name)]
	fs.mu.RUnlock()
	return mf
}

// FieldsByString returns fields for a measurement by name.
func (fs *MeasurementFieldSet) FieldsByString(name string) *MeasurementFields {
	fs.mu.RLock()
	mf := fs.fields[name]
	fs.mu.RUnlock()
	return mf
}

// CreateFieldsIfNotExists returns fields for a measurement by name.
func (fs *MeasurementFieldSet) CreateFieldsIfNotExists(name []byte) *MeasurementFields { fs.mu.RLock() mf := fs.fields[string(name)] fs.mu.RUnlock() if mf != nil { return mf } fs.mu.Lock() mf = fs.fields[string(name)] if mf == nil { mf = NewMeasurementFields() fs.fields[string(name)] = mf } fs.mu.Unlock() return mf } // Delete removes a field set for a measurement. func (fs *MeasurementFieldSet) Delete(name string) { fs.mu.Lock() fs.deleteNoLock(name) fs.mu.Unlock() } // DeleteWithLock executes fn and removes a field set from a measurement under lock. func (fs *MeasurementFieldSet) DeleteWithLock(name string, fn func() error) error { fs.mu.Lock() defer fs.mu.Unlock() if err := fn(); err != nil { return err } fs.deleteNoLock(name) return nil } // deleteNoLock removes a field set for a measurement func (fs *MeasurementFieldSet) deleteNoLock(name string) { delete(fs.fields, name) } func (fs *MeasurementFieldSet) IsEmpty() bool { fs.mu.RLock() defer fs.mu.RUnlock() return len(fs.fields) == 0 } type errorChannel chan<- error type writeRequest struct { errorReturn chan<- error changes FieldChanges } type measurementFieldSetChangeMgr struct { mu sync.Mutex wg sync.WaitGroup writeRequests chan writeRequest changeFilePath string logger *zap.Logger changeFileSize int64 } // SetMeasurementFieldSetWriter - initialize the queue for write requests // and start the background write process func (fs *MeasurementFieldSet) SetMeasurementFieldSetWriter(queueLength int, logger *zap.Logger) { fs.mu.Lock() defer fs.mu.Unlock() fs.changeMgr = &measurementFieldSetChangeMgr{ writeRequests: make(chan writeRequest, queueLength), changeFilePath: filepath.Join(filepath.Dir(fs.path), FieldsChangeFile), logger: logger, changeFileSize: int64(0), } fs.changeMgr.wg.Add(1) go fs.changeMgr.SaveWriter() } func (fscm *measurementFieldSetChangeMgr) Close() { if fscm != nil { close(fscm.writeRequests) fscm.wg.Wait() } } func (fs *MeasurementFieldSet) Save(changes FieldChanges) error { return 
fs.changeMgr.RequestSave(changes) } func (fscm *measurementFieldSetChangeMgr) RequestSave(changes FieldChanges) error { done := make(chan error) fscm.writeRequests <- writeRequest{errorReturn: done, changes: changes} return <-done } func (fscm *measurementFieldSetChangeMgr) SaveWriter() { defer fscm.wg.Done() // Block until someone modifies the MeasurementFieldSet, and // it needs to be written to disk. Exit when the channel is closed for wr, ok := <-fscm.writeRequests; ok; wr, ok = <-fscm.writeRequests { fscm.appendToChangesFile(wr) } } // WriteToFile: Write the new index to a temp file and rename when it's sync'd // This locks the MeasurementFieldSet during the marshaling, the write, and the rename. func (fs *MeasurementFieldSet) WriteToFile() error { path := fs.path + ".tmp" // Open the temp file fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666) if err != nil { return fmt.Errorf("failed opening %s: %w", fs.path, err) } // Ensure temp file is cleaned up defer func() { if e := os.RemoveAll(path); err == nil && e != nil { err = fmt.Errorf("failed removing temporary file %s: %w", path, e) } if e := os.RemoveAll(fs.changeMgr.changeFilePath); err == nil && e != nil { err = fmt.Errorf("failed removing saved field changes - %s: %w", fs.changeMgr.changeFilePath, e) } }() fs.mu.RLock() defer fs.mu.RUnlock() isEmpty, err := func() (isEmpty bool, err error) { // ensure temp file closed before rename (for Windows) defer func() { if e := fd.Close(); err == nil && e != nil { err = fmt.Errorf("closing %s: %w", path, e) } }() if _, err = fd.Write(fieldsIndexMagicNumber); err != nil { return true, fmt.Errorf("failed writing magic number for %s: %w", path, err) } // Lock, copy, and marshal the in-memory index b, err := fs.marshalMeasurementFieldSetNoLock() if err != nil { return true, fmt.Errorf("failed marshaling fields for %s: %w", fs.path, err) } if b == nil { // No fields, file removed, all done return true, nil } if _, err := fd.Write(b); err != 
nil { return true, fmt.Errorf("failed saving fields to %s: %w", path, err) } return false, nil }() if err != nil { return err } else if isEmpty { // remove empty file if err = os.RemoveAll(fs.path); err != nil { return fmt.Errorf("cannot remove %s: %w", fs.path, err) } else { return nil } } return fs.renameFileNoLock(path) } // appendToChangesFile: Write a change file for fields.idx // Only called in one Go proc, so does not need locking. func (fscm *measurementFieldSetChangeMgr) appendToChangesFile(first writeRequest) { var err error = nil // Put the errorChannel on which we blocked into a slice to allow more invocations // to share the return code from the file write errorChannels := []errorChannel{first.errorReturn} changes := []FieldChanges{first.changes} // On return, send the error to every go proc that send changes defer func() { for _, c := range errorChannels { c <- err close(c) } }() log, end := logger.NewOperation(context.TODO(), fscm.logger, "saving field index changes", "MeasurementFieldSet") defer end() // Do some blocking IO operations before marshalling the changes, // to allow other changes to be queued up and be captured in one // write operation, in case we are under heavy field creation load fscm.mu.Lock() defer fscm.mu.Unlock() fd, err := os.OpenFile(fscm.changeFilePath, os.O_CREATE|os.O_APPEND|os.O_SYNC|os.O_WRONLY, 0666) if err != nil { err = fmt.Errorf("opening %s: %w", fscm.changeFilePath, err) log.Error("failed", zap.Error(err)) return } // ensure file closed defer errors2.Capture(&err, func() error { if e := fd.Close(); e != nil { e = fmt.Errorf("closing %s: %w", fd.Name(), e) log.Error("failed", zap.Error(e)) return e } else { return nil } })() var fi os.FileInfo if fi, err = fd.Stat(); err != nil { err = fmt.Errorf("unable to get size of %s: %w", fd.Name(), err) log.Error("failed", zap.Error(err)) return } else if fi.Size() > fscm.changeFileSize { // If we had a partial write last time, truncate the file to remove it. 
if err = fd.Truncate(fscm.changeFileSize); err != nil { err = fmt.Errorf("cannot truncate %s to last known good size of %d after incomplete write: %w", fd.Name(), fscm.changeFileSize, err) log.Error("failed", zap.Error(err)) return } } // Read all the pending field and measurement write or delete // requests for { select { case wr := <-fscm.writeRequests: changes = append(changes, wr.changes) errorChannels = append(errorChannels, wr.errorReturn) continue default: } break } // marshal the slice of slices of field changes in size-prefixed protobuf var b []byte b, err = marshalFieldChanges(changes...) if err != nil { err = fmt.Errorf("error marshaling changes for %s: %w", fd.Name(), err) log.Error("failed", zap.Error(err)) return } if _, err = fd.Write(b); err != nil { err = fmt.Errorf("failed writing to %s: %w", fd.Name(), err) log.Error("failed", zap.Error(err)) return } else if fi, err = fd.Stat(); err != nil { err = fmt.Errorf("unable to get final size of %s after appendation: %w", fd.Name(), err) log.Error("failed", zap.Error(err)) return } else { fscm.changeFileSize = fi.Size() } } func readSizePlusBuffer(r io.Reader, b []byte) ([]byte, error) { var numBuf [bytesInInt64]byte if _, err := r.Read(numBuf[:]); err != nil { return nil, err } size := int(binary.LittleEndian.Uint64(numBuf[:])) if cap(b) < size { b = make([]byte, size) } _, err := io.ReadAtLeast(r, b, size) if err != nil { return nil, err } return b, nil } func (fs *MeasurementFieldSet) renameFileNoLock(path string) error { if err := file.RenameFile(path, fs.path); err != nil { return fmt.Errorf("cannot rename %s to %s: %w", path, fs.path, err) } dir := filepath.Dir(fs.path) if err := file.SyncDir(dir); err != nil { return fmt.Errorf("cannot sync directory %s: %w", dir, err) } return nil } // marshalMeasurementFieldSetNoLock: remove the fields.idx file if no fields // otherwise, copy the in-memory version into a protobuf to write to // disk func (fs *MeasurementFieldSet) 
marshalMeasurementFieldSetNoLock() (marshalled []byte, err error) { if len(fs.fields) == 0 { // If no fields left, remove the fields index file return nil, nil } pb := internal.MeasurementFieldSet{ Measurements: make([]*internal.MeasurementFields, 0, len(fs.fields)), } for name, mf := range fs.fields { imf := &internal.MeasurementFields{ Name: []byte(name), Fields: make([]*internal.Field, 0, mf.FieldN()), } mf.ForEachField(func(field string, typ influxql.DataType) bool { imf.Fields = append(imf.Fields, &internal.Field{Name: []byte(field), Type: int32(typ)}) return true }) pb.Measurements = append(pb.Measurements, imf) } b, err := proto.Marshal(&pb) if err != nil { return nil, err } else { return b, nil } } func marshalFieldChanges(changeSet ...FieldChanges) ([]byte, error) { fcs := internal.FieldChangeSet{ Changes: nil, } for _, fc := range changeSet { for _, f := range fc { mfc := &internal.MeasurementFieldChange{ Measurement: f.Measurement, Change: internal.ChangeType(f.ChangeType), } if f.Field != nil { mfc.Field = &internal.Field{ Name: []byte(f.Field.Name), Type: int32(f.Field.Type), } fcs.Changes = append(fcs.Changes, mfc) } } } mo := proto.MarshalOptions{} var numBuf [bytesInInt64]byte b, err := mo.MarshalAppend(numBuf[:], &fcs) binary.LittleEndian.PutUint64(b[0:bytesInInt64], uint64(len(b)-bytesInInt64)) if err != nil { fields := make([]string, 0, len(fcs.Changes)) for _, fc := range changeSet { for _, f := range fc { fields = append(fields, fmt.Sprintf("%q.%q", f.Measurement, f.Field.Name)) } } return nil, fmt.Errorf("failed marshaling new fields - %s: %w", strings.Join(fields, ", "), err) } return b, nil } func (fs *MeasurementFieldSet) load() (rErr error) { err := func() error { fs.mu.Lock() defer fs.mu.Unlock() pb, err := fs.loadParseFieldIndexPB() if err != nil { return err } fs.fields = make(map[string]*MeasurementFields, len(pb.GetMeasurements())) for _, measurement := range pb.GetMeasurements() { fields := make(map[string]*Field, 
len(measurement.GetFields())) for _, field := range measurement.GetFields() { fields[string(field.GetName())] = &Field{Name: string(field.GetName()), Type: influxql.DataType(field.GetType())} } set := &MeasurementFields{} set.fields.Store(fields) fs.fields[string(measurement.GetName())] = set } return nil }() if err != nil { return fmt.Errorf("failed loading field indices: %w", err) } return fs.ApplyChanges() } func (fs *MeasurementFieldSet) loadParseFieldIndexPB() (pb *internal.MeasurementFieldSet, rErr error) { pb = &internal.MeasurementFieldSet{} fd, err := os.Open(fs.path) if os.IsNotExist(err) { return pb, nil } else if err != nil { err = fmt.Errorf("failed opening %s: %w", fs.path, err) return nil, err } defer errors2.Capture(&rErr, func() error { if e := fd.Close(); e != nil { return fmt.Errorf("failed closing %s: %w", fd.Name(), e) } else { return nil } })() var magic [4]byte if _, err := fd.Read(magic[:]); err != nil { err = fmt.Errorf("failed reading %s: %w", fs.path, err) return nil, err } if !bytes.Equal(magic[:], fieldsIndexMagicNumber) { return nil, fmt.Errorf("%q: %w", fs.path, ErrUnknownFieldsFormat) } b, err := io.ReadAll(fd) if err != nil { err = fmt.Errorf("failed reading %s: %w", fs.path, err) return nil, err } if err = proto.Unmarshal(b, pb); err != nil { err = fmt.Errorf("failed unmarshaling %s: %w", fs.path, err) return nil, err } return pb, err } func (fscm *measurementFieldSetChangeMgr) loadAllFieldChanges(log *zap.Logger) (changes []FieldChanges, rErr error) { var fcs FieldChanges fscm.mu.Lock() defer fscm.mu.Unlock() fd, err := os.Open(fscm.changeFilePath) if os.IsNotExist(err) { return nil, nil } if err != nil { err = fmt.Errorf("failed opening %s: %w", fscm.changeFilePath, err) log.Error("field index file of changes", zap.Error(err)) return nil, err } defer errors2.Capture(&rErr, func() error { if e := fd.Close(); e != nil { return fmt.Errorf("failed closing %s: %w", fd.Name(), e) } else { return nil } })() for fcs, err = 
fscm.loadFieldChangeSet(fd); err == nil; fcs, err = fscm.loadFieldChangeSet(fd) { changes = append(changes, fcs) } if errors.Is(err, io.EOF) { return changes, nil } else if errors.Is(err, io.ErrUnexpectedEOF) { log.Warn("last entry was an incomplete write", zap.Error(err)) return changes, nil } else { log.Error("field index file of changes", zap.Error(err)) return nil, err } } func (fscm *measurementFieldSetChangeMgr) loadFieldChangeSet(r io.Reader) (FieldChanges, error) { var pb internal.FieldChangeSet b, err := readSizePlusBuffer(r, nil) if err != nil { return nil, fmt.Errorf("failed reading %s: %w", fscm.changeFilePath, err) } if err := proto.Unmarshal(b, &pb); err != nil { return nil, fmt.Errorf("failed unmarshalling %s: %w", fscm.changeFilePath, err) } fcs := make([]*FieldChange, 0, len(pb.Changes)) for _, fc := range pb.Changes { fcs = append(fcs, &FieldChange{ FieldCreate: FieldCreate{ Measurement: fc.Measurement, Field: &Field{ ID: 0, Name: string(fc.Field.Name), Type: influxql.DataType(fc.Field.Type), }, }, ChangeType: ChangeType(fc.Change), }) } return fcs, nil } func (fs *MeasurementFieldSet) ApplyChanges() error { log, end := logger.NewOperation(context.TODO(), fs.changeMgr.logger, "loading changes", "field indices") defer end() changes, err := fs.changeMgr.loadAllFieldChanges(log) if err != nil { return err } if len(changes) <= 0 { return os.RemoveAll(fs.changeMgr.changeFilePath) } for _, fcs := range changes { for _, fc := range fcs { if fc.ChangeType == DeleteMeasurement { fs.Delete(string(fc.Measurement)) } else { mf := fs.CreateFieldsIfNotExists(fc.Measurement) if err := mf.CreateFieldIfNotExists([]byte(fc.Field.Name), fc.Field.Type); err != nil { err = fmt.Errorf("failed creating %q.%q: %w", fc.Measurement, fc.Field.Name, err) log.Error("field creation", zap.Error(err)) return err } } } } return fs.WriteToFile() } // Field represents a series field. All of the fields must be hashable. 
type Field struct { ID uint8 `json:"id,omitempty"` Name string `json:"name,omitempty"` Type influxql.DataType `json:"type,omitempty"` } type FieldChange struct { FieldCreate ChangeType ChangeType } type ChangeType int const ( AddMeasurementField = ChangeType(internal.ChangeType_AddMeasurementField) DeleteMeasurement = ChangeType(internal.ChangeType_DeleteMeasurement) ) // NewFieldKeysIterator returns an iterator that can be iterated over to // retrieve field keys. func NewFieldKeysIterator(sh *Shard, opt query.IteratorOptions) (query.Iterator, error) { itr := &fieldKeysIterator{shard: sh} index, err := sh.Index() if err != nil { return nil, err } // Retrieve measurements from shard. Filter if condition specified. // // FGA is currently not supported when retrieving field keys. indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: sh.sfile} names, err := indexSet.MeasurementNamesByExpr(query.OpenAuthorizer, opt.Condition) if err != nil { return nil, err } itr.names = names return itr, nil } // fieldKeysIterator iterates over measurements and gets field keys from each measurement. type fieldKeysIterator struct { shard *Shard names [][]byte // remaining measurement names buf struct { name []byte // current measurement name fields []Field // current measurement's fields } } // Stats returns stats about the points processed. func (itr *fieldKeysIterator) Stats() query.IteratorStats { return query.IteratorStats{} } // Close closes the iterator. func (itr *fieldKeysIterator) Close() error { return nil } // Next emits the next tag key name. func (itr *fieldKeysIterator) Next() (*query.FloatPoint, error) { for { // If there are no more keys then move to the next measurements. 
if len(itr.buf.fields) == 0 { if len(itr.names) == 0 { return nil, nil } itr.buf.name = itr.names[0] mf := itr.shard.MeasurementFields(itr.buf.name) if mf != nil { fset := mf.FieldSet() if len(fset) == 0 { itr.names = itr.names[1:] continue } keys := make([]string, 0, len(fset)) for k := range fset { keys = append(keys, k) } sort.Strings(keys) itr.buf.fields = make([]Field, len(keys)) for i, name := range keys { itr.buf.fields[i] = Field{Name: name, Type: fset[name]} } } itr.names = itr.names[1:] continue } // Return next key. field := itr.buf.fields[0] p := &query.FloatPoint{ Name: string(itr.buf.name), Aux: []interface{}{field.Name, field.Type.String()}, } itr.buf.fields = itr.buf.fields[1:] return p, nil } } // NewTagKeysIterator returns a new instance of TagKeysIterator. func NewTagKeysIterator(sh *Shard, opt query.IteratorOptions) (query.Iterator, error) { fn := func(name []byte) ([][]byte, error) { index, err := sh.Index() if err != nil { return nil, err } indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: sh.sfile} var keys [][]byte if err := indexSet.ForEachMeasurementTagKey(name, func(key []byte) error { keys = append(keys, key) return nil }); err != nil { return nil, err } return keys, nil } return newMeasurementKeysIterator(sh, fn, opt) } // measurementKeyFunc is the function called by measurementKeysIterator. type measurementKeyFunc func(name []byte) ([][]byte, error) func newMeasurementKeysIterator(sh *Shard, fn measurementKeyFunc, opt query.IteratorOptions) (*measurementKeysIterator, error) { index, err := sh.Index() if err != nil { return nil, err } indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: sh.sfile} itr := &measurementKeysIterator{fn: fn} names, err := indexSet.MeasurementNamesByExpr(opt.Authorizer, opt.Condition) if err != nil { return nil, err } itr.names = names return itr, nil } // measurementKeysIterator iterates over measurements and gets keys from each measurement. 
type measurementKeysIterator struct { names [][]byte // remaining measurement names buf struct { name []byte // current measurement name keys [][]byte // current measurement's keys } fn measurementKeyFunc } // Stats returns stats about the points processed. func (itr *measurementKeysIterator) Stats() query.IteratorStats { return query.IteratorStats{} } // Close closes the iterator. func (itr *measurementKeysIterator) Close() error { return nil } // Next emits the next tag key name. func (itr *measurementKeysIterator) Next() (*query.FloatPoint, error) { for { // If there are no more keys then move to the next measurements. if len(itr.buf.keys) == 0 { if len(itr.names) == 0 { return nil, nil } itr.buf.name, itr.names = itr.names[0], itr.names[1:] keys, err := itr.fn(itr.buf.name) if err != nil { return nil, err } itr.buf.keys = keys continue } // Return next key. p := &query.FloatPoint{ Name: string(itr.buf.name), Aux: []interface{}{string(itr.buf.keys[0])}, } itr.buf.keys = itr.buf.keys[1:] return p, nil } } // LimitError represents an error caused by a configurable limit. type LimitError struct { Reason string } func (e *LimitError) Error() string { return e.Reason }