package tsdb import ( "bytes" "context" "errors" "fmt" "io" "io/ioutil" "os" "path/filepath" "regexp" "runtime" "sort" "strings" "sync" "sync/atomic" "time" "unsafe" "github.com/gogo/protobuf/proto" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/pkg/bytesutil" "github.com/influxdata/influxdb/pkg/estimator" "github.com/influxdata/influxdb/pkg/file" "github.com/influxdata/influxdb/pkg/limiter" "github.com/influxdata/influxdb/query" internal "github.com/influxdata/influxdb/tsdb/internal" "github.com/influxdata/influxql" "go.uber.org/zap" ) const ( statWriteReq = "writeReq" statWriteReqOK = "writeReqOk" statWriteReqErr = "writeReqErr" statSeriesCreate = "seriesCreate" statFieldsCreate = "fieldsCreate" statWritePointsErr = "writePointsErr" statWritePointsDropped = "writePointsDropped" statWritePointsOK = "writePointsOk" statWriteBytes = "writeBytes" statDiskBytes = "diskBytes" ) var ( // ErrFieldOverflow is returned when too many fields are created on a measurement. ErrFieldOverflow = errors.New("field overflow") // ErrFieldTypeConflict is returned when a new field already exists with a different type. ErrFieldTypeConflict = errors.New("field type conflict") // ErrFieldNotFound is returned when a field cannot be found. ErrFieldNotFound = errors.New("field not found") // ErrFieldUnmappedID is returned when the system is presented, during decode, with a field ID // there is no mapping for. ErrFieldUnmappedID = errors.New("field ID not mapped") // ErrEngineClosed is returned when a caller attempts indirectly to // access the shard's underlying engine. ErrEngineClosed = errors.New("engine is closed") // ErrShardDisabled is returned when a the shard is not available for // queries or writes. ErrShardDisabled = errors.New("shard is disabled") // ErrUnknownFieldsFormat is returned when the fields index file is not identifiable by // the file's magic number. ErrUnknownFieldsFormat = errors.New("unknown field index format") // ErrUnknownFieldType is returned when the type of a field cannot be determined. ErrUnknownFieldType = errors.New("unknown field type") // ErrShardNotIdle is returned when an operation requring the shard to be idle/cold is // attempted on a hot shard. ErrShardNotIdle = errors.New("shard not idle") // fieldsIndexMagicNumber is the file magic number for the fields index file. fieldsIndexMagicNumber = []byte{0, 6, 1, 3} ) var ( // Static objects to prevent small allocs. timeBytes = []byte("time") ) // A ShardError implements the error interface, and contains extra // context about the shard that generated the error. type ShardError struct { id uint64 Err error } // NewShardError returns a new ShardError. func NewShardError(id uint64, err error) error { if err == nil { return nil } return ShardError{id: id, Err: err} } // Error returns the string representation of the error, to satisfy the error interface. func (e ShardError) Error() string { return fmt.Sprintf("[shard %d] %s", e.id, e.Err) } // PartialWriteError indicates a write request could only write a portion of the // requested values. type PartialWriteError struct { Reason string Dropped int // A sorted slice of series keys that were dropped. DroppedKeys [][]byte } func (e PartialWriteError) Error() string { return fmt.Sprintf("partial write: %s dropped=%d", e.Reason, e.Dropped) } // Shard represents a self-contained time series database. An inverted index of // the measurement and tag data is kept along with the raw time series data. // Data can be split across many shards. The query engine in TSDB is responsible // for combining the output of many shards into a single query result. type Shard struct { path string walPath string id uint64 database string retentionPolicy string sfile *SeriesFile options EngineOptions mu sync.RWMutex _engine Engine index Index enabled bool // expvar-based stats. stats *ShardStatistics defaultTags models.StatisticTags baseLogger *zap.Logger logger *zap.Logger EnableOnOpen bool // CompactionDisabled specifies the shard should not schedule compactions. // This option is intended for offline tooling. CompactionDisabled bool } // NewShard returns a new initialized Shard. walPath doesn't apply to the b1 type index func NewShard(id uint64, path string, walPath string, sfile *SeriesFile, opt EngineOptions) *Shard { db, rp := decodeStorePath(path) logger := zap.NewNop() if opt.FieldValidator == nil { opt.FieldValidator = defaultFieldValidator{} } s := &Shard{ id: id, path: path, walPath: walPath, sfile: sfile, options: opt, stats: &ShardStatistics{}, defaultTags: models.StatisticTags{ "path": path, "walPath": walPath, "id": fmt.Sprintf("%d", id), "database": db, "retentionPolicy": rp, "engine": opt.EngineVersion, }, database: db, retentionPolicy: rp, logger: logger, baseLogger: logger, EnableOnOpen: true, } return s } // WithLogger sets the logger on the shard. It must be called before Open. func (s *Shard) WithLogger(log *zap.Logger) { s.baseLogger = log engine, err := s.Engine() if err == nil { engine.WithLogger(s.baseLogger) s.index.WithLogger(s.baseLogger) } s.logger = s.baseLogger.With(zap.String("service", "shard")) } // SetEnabled enables the shard for queries and write. When disabled, all // writes and queries return an error and compactions are stopped for the shard. func (s *Shard) SetEnabled(enabled bool) { s.mu.Lock() // Prevent writes and queries s.enabled = enabled if s._engine != nil && !s.CompactionDisabled { // Disable background compactions and snapshotting s._engine.SetEnabled(enabled) } s.mu.Unlock() } // ScheduleFullCompaction forces a full compaction to be schedule on the shard. func (s *Shard) ScheduleFullCompaction() error { engine, err := s.Engine() if err != nil { return err } return engine.ScheduleFullCompaction() } // ID returns the shards ID. func (s *Shard) ID() uint64 { return s.id } // Database returns the database of the shard. func (s *Shard) Database() string { return s.database } // RetentionPolicy returns the retention policy of the shard. func (s *Shard) RetentionPolicy() string { return s.retentionPolicy } // ShardStatistics maintains statistics for a shard. type ShardStatistics struct { WriteReq int64 WriteReqOK int64 WriteReqErr int64 FieldsCreated int64 WritePointsErr int64 WritePointsDropped int64 WritePointsOK int64 BytesWritten int64 DiskBytes int64 } // Statistics returns statistics for periodic monitoring. func (s *Shard) Statistics(tags map[string]string) []models.Statistic { engine, err := s.Engine() if err != nil { return nil } // Refresh our disk size stat if _, err := s.DiskSize(); err != nil { return nil } seriesN := engine.SeriesN() tags = s.defaultTags.Merge(tags) statistics := []models.Statistic{{ Name: "shard", Tags: tags, Values: map[string]interface{}{ statWriteReq: atomic.LoadInt64(&s.stats.WriteReq), statWriteReqOK: atomic.LoadInt64(&s.stats.WriteReqOK), statWriteReqErr: atomic.LoadInt64(&s.stats.WriteReqErr), statSeriesCreate: seriesN, statFieldsCreate: atomic.LoadInt64(&s.stats.FieldsCreated), statWritePointsErr: atomic.LoadInt64(&s.stats.WritePointsErr), statWritePointsDropped: atomic.LoadInt64(&s.stats.WritePointsDropped), statWritePointsOK: atomic.LoadInt64(&s.stats.WritePointsOK), statWriteBytes: atomic.LoadInt64(&s.stats.BytesWritten), statDiskBytes: atomic.LoadInt64(&s.stats.DiskBytes), }, }} // Add the index and engine statistics. statistics = append(statistics, engine.Statistics(tags)...) return statistics } // Path returns the path set on the shard when it was created. func (s *Shard) Path() string { return s.path } // Open initializes and opens the shard's store. func (s *Shard) Open() error { if err := func() error { s.mu.Lock() defer s.mu.Unlock() // Return if the shard is already open if s._engine != nil { return nil } seriesIDSet := NewSeriesIDSet() // Initialize underlying index. ipath := filepath.Join(s.path, "index") idx, err := NewIndex(s.id, s.database, ipath, seriesIDSet, s.sfile, s.options) if err != nil { return err } idx.WithLogger(s.baseLogger) // Open index. if err := idx.Open(); err != nil { return err } s.index = idx // Initialize underlying engine. e, err := NewEngine(s.id, idx, s.path, s.walPath, s.sfile, s.options) if err != nil { return err } // Set log output on the engine. e.WithLogger(s.baseLogger) // Disable compactions while loading the index e.SetEnabled(false) // Open engine. if err := e.Open(); err != nil { return err } // Load metadata index for the inmem index only. if err := e.LoadMetadataIndex(s.id, s.index); err != nil { return err } s._engine = e return nil }(); err != nil { s.close() return NewShardError(s.id, err) } if s.EnableOnOpen { // enable writes, queries and compactions s.SetEnabled(true) } return nil } // Close shuts down the shard's store. func (s *Shard) Close() error { s.mu.Lock() defer s.mu.Unlock() return s.close() } // close closes the shard an removes reference to the shard from associated // indexes, unless clean is false. func (s *Shard) close() error { if s._engine == nil { return nil } err := s._engine.Close() if err == nil { s._engine = nil } if e := s.index.Close(); e == nil { s.index = nil } return err } // IndexType returns the index version being used for this shard. // // IndexType returns the empty string if it is called before the shard is opened, // since it is only that point that the underlying index type is known. func (s *Shard) IndexType() string { s.mu.RLock() defer s.mu.RUnlock() if s._engine == nil || s.index == nil { // Shard not open yet. return "" } return s.index.Type() } // ready determines if the Shard is ready for queries or writes. // It returns nil if ready, otherwise ErrShardClosed or ErrShardDisabled func (s *Shard) ready() error { var err error if s._engine == nil { err = ErrEngineClosed } else if !s.enabled { err = ErrShardDisabled } return err } // LastModified returns the time when this shard was last modified. func (s *Shard) LastModified() time.Time { engine, err := s.Engine() if err != nil { return time.Time{} } return engine.LastModified() } // Index returns a reference to the underlying index. It returns an error if // the index is nil. func (s *Shard) Index() (Index, error) { s.mu.RLock() defer s.mu.RUnlock() if err := s.ready(); err != nil { return nil, err } return s.index, nil } func (s *Shard) seriesFile() (*SeriesFile, error) { s.mu.RLock() defer s.mu.RUnlock() if err := s.ready(); err != nil { return nil, err } return s.sfile, nil } // IsIdle return true if the shard is not receiving writes and is fully compacted. func (s *Shard) IsIdle() bool { engine, err := s.Engine() if err != nil { return true } return engine.IsIdle() } func (s *Shard) Free() error { engine, err := s.Engine() if err != nil { return err } // Disable compactions to stop background goroutines s.SetCompactionsEnabled(false) return engine.Free() } // SetCompactionsEnabled enables or disable shard background compactions. func (s *Shard) SetCompactionsEnabled(enabled bool) { engine, err := s.Engine() if err != nil { return } engine.SetCompactionsEnabled(enabled) } // DiskSize returns the size on disk of this shard. func (s *Shard) DiskSize() (int64, error) { s.mu.RLock() defer s.mu.RUnlock() // We don't use engine() becuase we still want to report the shard's disk // size even if the shard has been disabled. if s._engine == nil { return 0, ErrEngineClosed } size := s._engine.DiskSize() atomic.StoreInt64(&s.stats.DiskBytes, size) return size, nil } // FieldCreate holds information for a field to create on a measurement. type FieldCreate struct { Measurement []byte Field *Field } // WritePoints will write the raw data points and any new metadata to the index in the shard. func (s *Shard) WritePoints(points []models.Point) error { s.mu.RLock() defer s.mu.RUnlock() engine, err := s.engineNoLock() if err != nil { return err } var writeError error atomic.AddInt64(&s.stats.WriteReq, 1) points, fieldsToCreate, err := s.validateSeriesAndFields(points) if err != nil { if _, ok := err.(PartialWriteError); !ok { return err } // There was a partial write (points dropped), hold onto the error to return // to the caller, but continue on writing the remaining points. writeError = err } atomic.AddInt64(&s.stats.FieldsCreated, int64(len(fieldsToCreate))) // add any new fields and keep track of what needs to be saved if err := s.createFieldsAndMeasurements(fieldsToCreate); err != nil { return err } // Write to the engine. if err := engine.WritePoints(points); err != nil { atomic.AddInt64(&s.stats.WritePointsErr, int64(len(points))) atomic.AddInt64(&s.stats.WriteReqErr, 1) return fmt.Errorf("engine: %s", err) } atomic.AddInt64(&s.stats.WritePointsOK, int64(len(points))) atomic.AddInt64(&s.stats.WriteReqOK, 1) return writeError } // validateSeriesAndFields checks which series and fields are new and whose metadata should be saved and indexed. func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point, []*FieldCreate, error) { var ( fieldsToCreate []*FieldCreate err error dropped int reason string // only first error reason is set unless returned from CreateSeriesListIfNotExists ) // Create all series against the index in bulk. keys := make([][]byte, len(points)) names := make([][]byte, len(points)) tagsSlice := make([]models.Tags, len(points)) var j int for i, p := range points { tags := p.Tags() // Drop any series w/ a "time" tag, these are illegal if v := tags.Get(timeBytes); v != nil { dropped++ if reason == "" { reason = fmt.Sprintf( "invalid tag key: input tag \"%s\" on measurement \"%s\" is invalid", "time", string(p.Name())) } continue } keys[j] = p.Key() names[j] = p.Name() tagsSlice[j] = tags points[j] = points[i] j++ } points, keys, names, tagsSlice = points[:j], keys[:j], names[:j], tagsSlice[:j] engine, err := s.engineNoLock() if err != nil { return nil, nil, err } // Add new series. Check for partial writes. var droppedKeys [][]byte if err := engine.CreateSeriesListIfNotExists(keys, names, tagsSlice); err != nil { switch err := err.(type) { // TODO(jmw): why is this a *PartialWriteError when everything else is not a pointer? // Maybe we can just change it to be consistent if we change it also in all // the places that construct it. case *PartialWriteError: reason = err.Reason dropped += err.Dropped droppedKeys = err.DroppedKeys atomic.AddInt64(&s.stats.WritePointsDropped, int64(err.Dropped)) default: return nil, nil, err } } // Create a MeasurementFields cache. mfCache := make(map[string]*MeasurementFields, 16) j = 0 for i, p := range points { // Skip any points with only invalid fields. iter := p.FieldIterator() validField := false for iter.Next() { if bytes.Equal(iter.FieldKey(), timeBytes) { continue } validField = true break } if !validField { if reason == "" { reason = fmt.Sprintf( "invalid field name: input field \"%s\" on measurement \"%s\" is invalid", "time", string(p.Name())) } dropped++ continue } // Skip any points whos keys have been dropped. Dropped has already been incremented for them. if len(droppedKeys) > 0 && bytesutil.Contains(droppedKeys, keys[i]) { continue } // Grab the MeasurementFields checking the local cache to avoid lock contention. name := p.Name() mf := mfCache[string(name)] if mf == nil { mf = engine.MeasurementFields(name).Clone() mfCache[string(name)] = mf } // Check with the field validator. if err := s.options.FieldValidator.Validate(mf, p); err != nil { switch err := err.(type) { case PartialWriteError: if reason == "" { reason = err.Reason } dropped += err.Dropped atomic.AddInt64(&s.stats.WritePointsDropped, int64(err.Dropped)) default: return nil, nil, err } continue } points[j] = points[i] j++ // Create any fields that are missing. iter.Reset() for iter.Next() { fieldKey := iter.FieldKey() // Skip fields named "time". They are illegal. if bytes.Equal(fieldKey, timeBytes) { continue } if mf.FieldBytes(fieldKey) != nil { continue } dataType := dataTypeFromModelsFieldType(iter.Type()) if dataType == influxql.Unknown { continue } fieldsToCreate = append(fieldsToCreate, &FieldCreate{ Measurement: name, Field: &Field{ Name: string(fieldKey), Type: dataType, }, }) } } if dropped > 0 { err = PartialWriteError{Reason: reason, Dropped: dropped} } return points[:j], fieldsToCreate, err } func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) error { if len(fieldsToCreate) == 0 { return nil } engine, err := s.engineNoLock() if err != nil { return err } // add fields for _, f := range fieldsToCreate { mf := engine.MeasurementFields(f.Measurement) if err := mf.CreateFieldIfNotExists([]byte(f.Field.Name), f.Field.Type); err != nil { return err } s.index.SetFieldName(f.Measurement, f.Field.Name) } if len(fieldsToCreate) > 0 { return engine.MeasurementFieldSet().Save() } return nil } // DeleteSeriesRange deletes all values from for seriesKeys between min and max (inclusive) func (s *Shard) DeleteSeriesRange(itr SeriesIterator, min, max int64) error { engine, err := s.Engine() if err != nil { return err } return engine.DeleteSeriesRange(itr, min, max) } // DeleteSeriesRangeWithPredicate deletes all values from for seriesKeys between min and max (inclusive) // for which predicate() returns true. If predicate() is nil, then all values in range are deleted. func (s *Shard) DeleteSeriesRangeWithPredicate(itr SeriesIterator, predicate func(name []byte, tags models.Tags) (int64, int64, bool)) error { engine, err := s.Engine() if err != nil { return err } return engine.DeleteSeriesRangeWithPredicate(itr, predicate) } // DeleteMeasurement deletes a measurement and all underlying series. func (s *Shard) DeleteMeasurement(name []byte) error { engine, err := s.Engine() if err != nil { return err } return engine.DeleteMeasurement(name) } // SeriesN returns the unique number of series in the shard. func (s *Shard) SeriesN() int64 { engine, err := s.Engine() if err != nil { return 0 } return engine.SeriesN() } // SeriesSketches returns the measurement sketches for the shard. func (s *Shard) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) { engine, err := s.Engine() if err != nil { return nil, nil, err } return engine.SeriesSketches() } // MeasurementsSketches returns the measurement sketches for the shard. func (s *Shard) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) { engine, err := s.Engine() if err != nil { return nil, nil, err } return engine.MeasurementsSketches() } // MeasurementNamesByRegex returns names of measurements matching the regular expression. func (s *Shard) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) { engine, err := s.Engine() if err != nil { return nil, err } return engine.MeasurementNamesByRegex(re) } // MeasurementTagKeysByExpr returns all the tag keys for the provided expression. func (s *Shard) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) { engine, err := s.Engine() if err != nil { return nil, err } return engine.MeasurementTagKeysByExpr(name, expr) } // MeasurementTagKeyValuesByExpr returns all the tag keys values for the // provided expression. func (s *Shard) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, key []string, expr influxql.Expr, keysSorted bool) ([][]string, error) { index, err := s.Index() if err != nil { return nil, err } indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: s.sfile} return indexSet.MeasurementTagKeyValuesByExpr(auth, name, key, expr, keysSorted) } // MeasurementFields returns fields for a measurement. // TODO(edd): This method is currently only being called from tests; do we // really need it? func (s *Shard) MeasurementFields(name []byte) *MeasurementFields { engine, err := s.Engine() if err != nil { return nil } return engine.MeasurementFields(name) } // MeasurementExists returns true if the shard contains name. // TODO(edd): This method is currently only being called from tests; do we // really need it? func (s *Shard) MeasurementExists(name []byte) (bool, error) { engine, err := s.Engine() if err != nil { return false, err } return engine.MeasurementExists(name) } // WriteTo writes the shard's data to w. func (s *Shard) WriteTo(w io.Writer) (int64, error) { engine, err := s.Engine() if err != nil { return 0, err } n, err := engine.WriteTo(w) atomic.AddInt64(&s.stats.BytesWritten, int64(n)) return n, err } // CreateIterator returns an iterator for the data in the shard. func (s *Shard) CreateIterator(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) { engine, err := s.Engine() if err != nil { return nil, err } switch m.SystemIterator { case "_fieldKeys": return NewFieldKeysIterator(s, opt) case "_series": // TODO(benbjohnson): Move up to the Shards.CreateIterator(). index, err := s.Index() if err != nil { return nil, err } indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: s.sfile} itr, err := NewSeriesPointIterator(indexSet, opt) if err != nil { return nil, err } return query.NewInterruptIterator(itr, opt.InterruptCh), nil case "_tagKeys": return NewTagKeysIterator(s, opt) } return engine.CreateIterator(ctx, m.Name, opt) } func (s *Shard) CreateSeriesCursor(ctx context.Context, req SeriesCursorRequest, cond influxql.Expr) (SeriesCursor, error) { index, err := s.Index() if err != nil { return nil, err } return newSeriesCursor(req, IndexSet{Indexes: []Index{index}, SeriesFile: s.sfile}, cond) } func (s *Shard) CreateCursorIterator(ctx context.Context) (CursorIterator, error) { engine, err := s.Engine() if err != nil { return nil, err } return engine.CreateCursorIterator(ctx) } // FieldDimensions returns unique sets of fields and dimensions across a list of sources. func (s *Shard) FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) { engine, err := s.Engine() if err != nil { return nil, nil, err } fields = make(map[string]influxql.DataType) dimensions = make(map[string]struct{}) index, err := s.Index() if err != nil { return nil, nil, err } for _, name := range measurements { // Handle system sources. if strings.HasPrefix(name, "_") { var keys []string switch name { case "_fieldKeys": keys = []string{"fieldKey", "fieldType"} case "_series": keys = []string{"key"} case "_tagKeys": keys = []string{"tagKey"} } if len(keys) > 0 { for _, k := range keys { if fields[k].LessThan(influxql.String) { fields[k] = influxql.String } } continue } // Unknown system source so default to looking for a measurement. } // Retrieve measurement. if exists, err := engine.MeasurementExists([]byte(name)); err != nil { return nil, nil, err } else if !exists { continue } // Append fields and dimensions. mf := engine.MeasurementFields([]byte(name)) if mf != nil { for k, typ := range mf.FieldSet() { if fields[k].LessThan(typ) { fields[k] = typ } } } indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: s.sfile} if err := indexSet.ForEachMeasurementTagKey([]byte(name), func(key []byte) error { dimensions[string(key)] = struct{}{} return nil }); err != nil { return nil, nil, err } } return fields, dimensions, nil } // mapType returns the data type for the field within the measurement. func (s *Shard) mapType(measurement, field string) (influxql.DataType, error) { engine, err := s.engineNoLock() if err != nil { return 0, err } switch field { case "_name", "_tagKey", "_tagValue", "_seriesKey": return influxql.String, nil } // Process system measurements. switch measurement { case "_fieldKeys": if field == "fieldKey" || field == "fieldType" { return influxql.String, nil } return influxql.Unknown, nil case "_series": if field == "key" { return influxql.String, nil } return influxql.Unknown, nil case "_tagKeys": if field == "tagKey" { return influxql.String, nil } return influxql.Unknown, nil } // Unknown system source so default to looking for a measurement. if exists, _ := engine.MeasurementExists([]byte(measurement)); !exists { return influxql.Unknown, nil } mf := engine.MeasurementFields([]byte(measurement)) if mf != nil { f := mf.Field(field) if f != nil { return f.Type, nil } } if exists, _ := engine.HasTagKey([]byte(measurement), []byte(field)); exists { return influxql.Tag, nil } return influxql.Unknown, nil } // expandSources expands regex sources and removes duplicates. // NOTE: sources must be normalized (db and rp set) before calling this function. func (s *Shard) expandSources(sources influxql.Sources) (influxql.Sources, error) { engine, err := s.engineNoLock() if err != nil { return nil, err } // Use a map as a set to prevent duplicates. set := map[string]influxql.Source{} // Iterate all sources, expanding regexes when they're found. for _, source := range sources { switch src := source.(type) { case *influxql.Measurement: // Add non-regex measurements directly to the set. if src.Regex == nil { set[src.String()] = src continue } // Loop over matching measurements. names, err := engine.MeasurementNamesByRegex(src.Regex.Val) if err != nil { return nil, err } for _, name := range names { other := &influxql.Measurement{ Database: src.Database, RetentionPolicy: src.RetentionPolicy, Name: string(name), } set[other.String()] = other } default: return nil, fmt.Errorf("expandSources: unsupported source type: %T", source) } } // Convert set to sorted slice. names := make([]string, 0, len(set)) for name := range set { names = append(names, name) } sort.Strings(names) // Convert set to a list of Sources. expanded := make(influxql.Sources, 0, len(set)) for _, name := range names { expanded = append(expanded, set[name]) } return expanded, nil } // Backup backs up the shard by creating a tar archive of all TSM files that // have been modified since the provided time. See Engine.Backup for more details. func (s *Shard) Backup(w io.Writer, basePath string, since time.Time) error { engine, err := s.Engine() if err != nil { return err } return engine.Backup(w, basePath, since) } func (s *Shard) Export(w io.Writer, basePath string, start time.Time, end time.Time) error { engine, err := s.Engine() if err != nil { return err } return engine.Export(w, basePath, start, end) } // Restore restores data to the underlying engine for the shard. // The shard is reopened after restore. func (s *Shard) Restore(r io.Reader, basePath string) error { if err := func() error { s.mu.Lock() defer s.mu.Unlock() // Special case - we can still restore to a disabled shard, so we should // only check if the engine is closed and not care if the shard is // disabled. if s._engine == nil { return ErrEngineClosed } // Restore to engine. return s._engine.Restore(r, basePath) }(); err != nil { return err } // Close shard. if err := s.Close(); err != nil { return err } // Reopen engine. return s.Open() } // Import imports data to the underlying engine for the shard. r should // be a reader from a backup created by Backup. func (s *Shard) Import(r io.Reader, basePath string) error { // Special case - we can still import to a disabled shard, so we should // only check if the engine is closed and not care if the shard is // disabled. s.mu.Lock() defer s.mu.Unlock() if s._engine == nil { return ErrEngineClosed } // Import to engine. return s._engine.Import(r, basePath) } // CreateSnapshot will return a path to a temp directory // containing hard links to the underlying shard files. func (s *Shard) CreateSnapshot() (string, error) { engine, err := s.Engine() if err != nil { return "", err } return engine.CreateSnapshot() } // ForEachMeasurementName iterates over each measurement in the shard. func (s *Shard) ForEachMeasurementName(fn func(name []byte) error) error { engine, err := s.Engine() if err != nil { return err } return engine.ForEachMeasurementName(fn) } func (s *Shard) TagKeyCardinality(name, key []byte) int { engine, err := s.Engine() if err != nil { return 0 } return engine.TagKeyCardinality(name, key) } // Digest returns a digest of the shard. func (s *Shard) Digest() (io.ReadCloser, int64, error) { engine, err := s.Engine() if err != nil { return nil, 0, err } // Make sure the shard is idle/cold. (No use creating a digest of a // hot shard that is rapidly changing.) if !engine.IsIdle() { return nil, 0, ErrShardNotIdle } return engine.Digest() } // engine safely (under an RLock) returns a reference to the shard's Engine, or // an error if the Engine is closed, or the shard is currently disabled. // // The shard's Engine should always be accessed via a call to engine(), rather // than directly referencing Shard.engine. // // If a caller needs an Engine reference but is already under a lock, then they // should use engineNoLock(). func (s *Shard) Engine() (Engine, error) { s.mu.RLock() defer s.mu.RUnlock() return s.engineNoLock() } // engineNoLock is similar to calling engine(), but the caller must guarantee // that they already hold an appropriate lock. func (s *Shard) engineNoLock() (Engine, error) { if err := s.ready(); err != nil { return nil, err } return s._engine, nil } type ShardGroup interface { MeasurementsByRegex(re *regexp.Regexp) []string FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) MapType(measurement, field string) influxql.DataType CreateIterator(ctx context.Context, measurement *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error) ExpandSources(sources influxql.Sources) (influxql.Sources, error) } // Shards represents a sortable list of shards. type Shards []*Shard // Len implements sort.Interface. func (a Shards) Len() int { return len(a) } // Less implements sort.Interface. func (a Shards) Less(i, j int) bool { return a[i].id < a[j].id } // Swap implements sort.Interface. func (a Shards) Swap(i, j int) { a[i], a[j] = a[j], a[i] } // MeasurementsByRegex returns the unique set of measurements matching the // provided regex, for all the shards. func (a Shards) MeasurementsByRegex(re *regexp.Regexp) []string { var m map[string]struct{} for _, sh := range a { names, err := sh.MeasurementNamesByRegex(re) if err != nil { continue // Skip this shard's results—previous behaviour. } if m == nil { m = make(map[string]struct{}, len(names)) } for _, name := range names { m[string(name)] = struct{}{} } } if len(m) == 0 { return nil } names := make([]string, 0, len(m)) for key := range m { names = append(names, key) } sort.Strings(names) return names } func (a Shards) FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) { fields = make(map[string]influxql.DataType) dimensions = make(map[string]struct{}) for _, sh := range a { f, d, err := sh.FieldDimensions(measurements) if err != nil { return nil, nil, err } for k, typ := range f { if fields[k].LessThan(typ) { fields[k] = typ } } for k := range d { dimensions[k] = struct{}{} } } return } func (a Shards) MapType(measurement, field string) influxql.DataType { var typ influxql.DataType for _, sh := range a { sh.mu.RLock() if t, err := sh.mapType(measurement, field); err == nil && typ.LessThan(t) { typ = t } sh.mu.RUnlock() } return typ } func (a Shards) CallType(name string, args []influxql.DataType) (influxql.DataType, error) { typmap := query.CallTypeMapper{} return typmap.CallType(name, args) } func (a Shards) CreateIterator(ctx context.Context, measurement *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) { switch measurement.SystemIterator { case "_series": return a.createSeriesIterator(ctx, opt) } itrs := make([]query.Iterator, 0, len(a)) for _, sh := range a { itr, err := sh.CreateIterator(ctx, measurement, opt) if err != nil { query.Iterators(itrs).Close() return nil, err } else if itr == nil { continue } itrs = append(itrs, itr) select { case <-opt.InterruptCh: query.Iterators(itrs).Close() return nil, query.ErrQueryInterrupted default: } // Enforce series limit at creation time. if opt.MaxSeriesN > 0 { stats := itr.Stats() if stats.SeriesN > opt.MaxSeriesN { query.Iterators(itrs).Close() return nil, fmt.Errorf("max-select-series limit exceeded: (%d/%d)", stats.SeriesN, opt.MaxSeriesN) } } } return query.Iterators(itrs).Merge(opt) } func (a Shards) createSeriesIterator(ctx context.Context, opt query.IteratorOptions) (_ query.Iterator, err error) { var ( idxs = make([]Index, 0, len(a)) sfile *SeriesFile ) for _, sh := range a { var idx Index if idx, err = sh.Index(); err == nil { idxs = append(idxs, idx) } if sfile == nil { sfile, _ = sh.seriesFile() } } if sfile == nil { return nil, nil } return NewSeriesPointIterator(IndexSet{Indexes: idxs, SeriesFile: sfile}, opt) } func (a Shards) IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error) { var costs query.IteratorCost var costerr error var mu sync.RWMutex setErr := func(err error) { mu.Lock() defer mu.Unlock() if costerr == nil { costerr = err } } limit := limiter.NewFixed(runtime.GOMAXPROCS(0)) var wg sync.WaitGroup for _, sh := range a { limit.Take() wg.Add(1) mu.RLock() if costerr != nil { mu.RUnlock() break } mu.RUnlock() go func(sh *Shard) { defer limit.Release() defer wg.Done() engine, err := sh.Engine() if err != nil { setErr(err) return } cost, err := engine.IteratorCost(measurement, opt) if err != nil { setErr(err) return } mu.Lock() costs = costs.Combine(cost) mu.Unlock() }(sh) } wg.Wait() return costs, costerr } func (a Shards) CreateSeriesCursor(ctx context.Context, req SeriesCursorRequest, cond influxql.Expr) (_ SeriesCursor, err error) { var ( idxs []Index sfile *SeriesFile ) for _, sh := range a { var idx Index if idx, err = sh.Index(); err == nil { idxs = append(idxs, idx) } if sfile == nil { sfile, _ = sh.seriesFile() } } if sfile == nil { return nil, errors.New("CreateSeriesCursor: no series file") } return newSeriesCursor(req, IndexSet{Indexes: idxs, SeriesFile: sfile}, cond) } func (a Shards) ExpandSources(sources influxql.Sources) (influxql.Sources, error) { // Use a map as a set to prevent duplicates. set := map[string]influxql.Source{} // Iterate through every shard and expand the sources. for _, sh := range a { sh.mu.RLock() expanded, err := sh.expandSources(sources) sh.mu.RUnlock() if err != nil { return nil, err } for _, src := range expanded { switch src := src.(type) { case *influxql.Measurement: set[src.String()] = src default: return nil, fmt.Errorf("Store.ExpandSources: unsupported source type: %T", src) } } } // Convert set to sorted slice. names := make([]string, 0, len(set)) for name := range set { names = append(names, name) } sort.Strings(names) // Convert set to a list of Sources. sorted := make([]influxql.Source, 0, len(set)) for _, name := range names { sorted = append(sorted, set[name]) } return sorted, nil } // MeasurementFields holds the fields of a measurement and their codec. type MeasurementFields struct { mu sync.RWMutex fields map[string]*Field } // NewMeasurementFields returns an initialised *MeasurementFields value. func NewMeasurementFields() *MeasurementFields { return &MeasurementFields{fields: make(map[string]*Field)} } func (m *MeasurementFields) FieldKeys() []string { m.mu.RLock() defer m.mu.RUnlock() a := make([]string, 0, len(m.fields)) for key := range m.fields { a = append(a, key) } sort.Strings(a) return a } // bytes estimates the memory footprint of this MeasurementFields, in bytes. func (m *MeasurementFields) bytes() int { var b int m.mu.RLock() b += 24 // mu RWMutex is 24 bytes b += int(unsafe.Sizeof(m.fields)) for k, v := range m.fields { b += int(unsafe.Sizeof(k)) + len(k) b += int(unsafe.Sizeof(v)+unsafe.Sizeof(*v)) + len(v.Name) } m.mu.RUnlock() return b } // CreateFieldIfNotExists creates a new field with an autoincrementing ID. // Returns an error if 255 fields have already been created on the measurement or // the fields already exists with a different type. func (m *MeasurementFields) CreateFieldIfNotExists(name []byte, typ influxql.DataType) error { m.mu.RLock() // Ignore if the field already exists. if f := m.fields[string(name)]; f != nil { if f.Type != typ { m.mu.RUnlock() return ErrFieldTypeConflict } m.mu.RUnlock() return nil } m.mu.RUnlock() m.mu.Lock() defer m.mu.Unlock() // Re-check field and type under write lock. if f := m.fields[string(name)]; f != nil { if f.Type != typ { return ErrFieldTypeConflict } return nil } // Create and append a new field. f := &Field{ ID: uint8(len(m.fields) + 1), Name: string(name), Type: typ, } m.fields[string(name)] = f return nil } func (m *MeasurementFields) FieldN() int { m.mu.RLock() n := len(m.fields) m.mu.RUnlock() return n } // Field returns the field for name, or nil if there is no field for name. func (m *MeasurementFields) Field(name string) *Field { m.mu.RLock() f := m.fields[name] m.mu.RUnlock() return f } func (m *MeasurementFields) HasField(name string) bool { if m == nil { return false } m.mu.RLock() f := m.fields[name] m.mu.RUnlock() return f != nil } // FieldBytes returns the field for name, or nil if there is no field for name. // FieldBytes should be preferred to Field when the caller has a []byte, because // it avoids a string allocation, which can't be avoided if the caller converts // the []byte to a string and calls Field. func (m *MeasurementFields) FieldBytes(name []byte) *Field { m.mu.RLock() f := m.fields[string(name)] m.mu.RUnlock() return f } // FieldSet returns the set of fields and their types for the measurement. func (m *MeasurementFields) FieldSet() map[string]influxql.DataType { m.mu.RLock() defer m.mu.RUnlock() fields := make(map[string]influxql.DataType) for name, f := range m.fields { fields[name] = f.Type } return fields } func (m *MeasurementFields) ForEachField(fn func(name string, typ influxql.DataType) bool) { m.mu.RLock() defer m.mu.RUnlock() for name, f := range m.fields { if !fn(name, f.Type) { return } } } // Clone returns copy of the MeasurementFields func (m *MeasurementFields) Clone() *MeasurementFields { m.mu.RLock() defer m.mu.RUnlock() fields := make(map[string]*Field, len(m.fields)) for key, field := range m.fields { fields[key] = field } return &MeasurementFields{ fields: fields, } } // MeasurementFieldSet represents a collection of fields by measurement. // This safe for concurrent use. type MeasurementFieldSet struct { mu sync.RWMutex fields map[string]*MeasurementFields // path is the location to persist field sets path string } // NewMeasurementFieldSet returns a new instance of MeasurementFieldSet. func NewMeasurementFieldSet(path string) (*MeasurementFieldSet, error) { fs := &MeasurementFieldSet{ fields: make(map[string]*MeasurementFields), path: path, } // If there is a load error, return the error and an empty set so // it can be rebuild manually. return fs, fs.load() } // Bytes estimates the memory footprint of this MeasurementFieldSet, in bytes. func (fs *MeasurementFieldSet) Bytes() int { var b int fs.mu.RLock() b += 24 // mu RWMutex is 24 bytes for k, v := range fs.fields { b += int(unsafe.Sizeof(k)) + len(k) b += int(unsafe.Sizeof(v)) + v.bytes() } b += int(unsafe.Sizeof(fs.fields)) b += int(unsafe.Sizeof(fs.path)) + len(fs.path) fs.mu.RUnlock() return b } // Fields returns fields for a measurement by name. func (fs *MeasurementFieldSet) Fields(name []byte) *MeasurementFields { fs.mu.RLock() mf := fs.fields[string(name)] fs.mu.RUnlock() return mf } // FieldsByString returns fields for a measurment by name. func (fs *MeasurementFieldSet) FieldsByString(name string) *MeasurementFields { fs.mu.RLock() mf := fs.fields[name] fs.mu.RUnlock() return mf } // CreateFieldsIfNotExists returns fields for a measurement by name. func (fs *MeasurementFieldSet) CreateFieldsIfNotExists(name []byte) *MeasurementFields { fs.mu.RLock() mf := fs.fields[string(name)] fs.mu.RUnlock() if mf != nil { return mf } fs.mu.Lock() mf = fs.fields[string(name)] if mf == nil { mf = NewMeasurementFields() fs.fields[string(name)] = mf } fs.mu.Unlock() return mf } // Delete removes a field set for a measurement. func (fs *MeasurementFieldSet) Delete(name string) { fs.mu.Lock() delete(fs.fields, name) fs.mu.Unlock() } // DeleteWithLock executes fn and removes a field set from a measurement under lock. func (fs *MeasurementFieldSet) DeleteWithLock(name string, fn func() error) error { fs.mu.Lock() defer fs.mu.Unlock() if err := fn(); err != nil { return err } delete(fs.fields, name) return nil } func (fs *MeasurementFieldSet) IsEmpty() bool { fs.mu.RLock() defer fs.mu.RUnlock() return len(fs.fields) == 0 } func (fs *MeasurementFieldSet) Save() error { fs.mu.Lock() defer fs.mu.Unlock() return fs.saveNoLock() } func (fs *MeasurementFieldSet) saveNoLock() error { // No fields left, remove the fields index file if len(fs.fields) == 0 { return os.RemoveAll(fs.path) } // Write the new index to a temp file and rename when it's sync'd path := fs.path + ".tmp" fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666) if err != nil { return err } defer os.RemoveAll(path) if _, err := fd.Write(fieldsIndexMagicNumber); err != nil { return err } pb := internal.MeasurementFieldSet{ Measurements: make([]*internal.MeasurementFields, 0, len(fs.fields)), } for name, mf := range fs.fields { fs := &internal.MeasurementFields{ Name: name, Fields: make([]*internal.Field, 0, mf.FieldN()), } mf.ForEachField(func(field string, typ influxql.DataType) bool { fs.Fields = append(fs.Fields, &internal.Field{Name: field, Type: int32(typ)}) return true }) pb.Measurements = append(pb.Measurements, fs) } b, err := proto.Marshal(&pb) if err != nil { return err } if _, err := fd.Write(b); err != nil { return err } if err = fd.Sync(); err != nil { return err } //close file handle before renaming to support Windows if err = fd.Close(); err != nil { return err } if err := file.RenameFile(path, fs.path); err != nil { return err } return file.SyncDir(filepath.Dir(fs.path)) } func (fs *MeasurementFieldSet) load() error { fs.mu.Lock() defer fs.mu.Unlock() fd, err := os.Open(fs.path) if os.IsNotExist(err) { return nil } else if err != nil { return err } defer fd.Close() var magic [4]byte if _, err := fd.Read(magic[:]); err != nil { return err } if !bytes.Equal(magic[:], fieldsIndexMagicNumber) { return ErrUnknownFieldsFormat } var pb internal.MeasurementFieldSet b, err := ioutil.ReadAll(fd) if err != nil { return err } if err := proto.Unmarshal(b, &pb); err != nil { return err } fs.fields = make(map[string]*MeasurementFields, len(pb.GetMeasurements())) for _, measurement := range pb.GetMeasurements() { set := &MeasurementFields{ fields: make(map[string]*Field, len(measurement.GetFields())), } for _, field := range measurement.GetFields() { set.fields[field.GetName()] = &Field{Name: field.GetName(), Type: influxql.DataType(field.GetType())} } fs.fields[measurement.GetName()] = set } return nil } // Field represents a series field. type Field struct { ID uint8 `json:"id,omitempty"` Name string `json:"name,omitempty"` Type influxql.DataType `json:"type,omitempty"` } // NewFieldKeysIterator returns an iterator that can be iterated over to // retrieve field keys. func NewFieldKeysIterator(sh *Shard, opt query.IteratorOptions) (query.Iterator, error) { itr := &fieldKeysIterator{shard: sh} index, err := sh.Index() if err != nil { return nil, err } // Retrieve measurements from shard. Filter if condition specified. // // FGA is currently not supported when retrieving field keys. indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: sh.sfile} names, err := indexSet.MeasurementNamesByExpr(query.OpenAuthorizer, opt.Condition) if err != nil { return nil, err } itr.names = names return itr, nil } // fieldKeysIterator iterates over measurements and gets field keys from each measurement. type fieldKeysIterator struct { shard *Shard names [][]byte // remaining measurement names buf struct { name []byte // current measurement name fields []Field // current measurement's fields } } // Stats returns stats about the points processed. func (itr *fieldKeysIterator) Stats() query.IteratorStats { return query.IteratorStats{} } // Close closes the iterator. func (itr *fieldKeysIterator) Close() error { return nil } // Next emits the next tag key name. func (itr *fieldKeysIterator) Next() (*query.FloatPoint, error) { for { // If there are no more keys then move to the next measurements. if len(itr.buf.fields) == 0 { if len(itr.names) == 0 { return nil, nil } itr.buf.name = itr.names[0] mf := itr.shard.MeasurementFields(itr.buf.name) if mf != nil { fset := mf.FieldSet() if len(fset) == 0 { itr.names = itr.names[1:] continue } keys := make([]string, 0, len(fset)) for k := range fset { keys = append(keys, k) } sort.Strings(keys) itr.buf.fields = make([]Field, len(keys)) for i, name := range keys { itr.buf.fields[i] = Field{Name: name, Type: fset[name]} } } itr.names = itr.names[1:] continue } // Return next key. field := itr.buf.fields[0] p := &query.FloatPoint{ Name: string(itr.buf.name), Aux: []interface{}{field.Name, field.Type.String()}, } itr.buf.fields = itr.buf.fields[1:] return p, nil } } // NewTagKeysIterator returns a new instance of TagKeysIterator. func NewTagKeysIterator(sh *Shard, opt query.IteratorOptions) (query.Iterator, error) { fn := func(name []byte) ([][]byte, error) { index, err := sh.Index() if err != nil { return nil, err } indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: sh.sfile} var keys [][]byte if err := indexSet.ForEachMeasurementTagKey(name, func(key []byte) error { keys = append(keys, key) return nil }); err != nil { return nil, err } return keys, nil } return newMeasurementKeysIterator(sh, fn, opt) } // measurementKeyFunc is the function called by measurementKeysIterator. type measurementKeyFunc func(name []byte) ([][]byte, error) func newMeasurementKeysIterator(sh *Shard, fn measurementKeyFunc, opt query.IteratorOptions) (*measurementKeysIterator, error) { index, err := sh.Index() if err != nil { return nil, err } indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: sh.sfile} itr := &measurementKeysIterator{fn: fn} names, err := indexSet.MeasurementNamesByExpr(opt.Authorizer, opt.Condition) if err != nil { return nil, err } itr.names = names return itr, nil } // measurementKeysIterator iterates over measurements and gets keys from each measurement. type measurementKeysIterator struct { names [][]byte // remaining measurement names buf struct { name []byte // current measurement name keys [][]byte // current measurement's keys } fn measurementKeyFunc } // Stats returns stats about the points processed. func (itr *measurementKeysIterator) Stats() query.IteratorStats { return query.IteratorStats{} } // Close closes the iterator. func (itr *measurementKeysIterator) Close() error { return nil } // Next emits the next tag key name. func (itr *measurementKeysIterator) Next() (*query.FloatPoint, error) { for { // If there are no more keys then move to the next measurements. if len(itr.buf.keys) == 0 { if len(itr.names) == 0 { return nil, nil } itr.buf.name, itr.names = itr.names[0], itr.names[1:] keys, err := itr.fn(itr.buf.name) if err != nil { return nil, err } itr.buf.keys = keys continue } // Return next key. p := &query.FloatPoint{ Name: string(itr.buf.name), Aux: []interface{}{string(itr.buf.keys[0])}, } itr.buf.keys = itr.buf.keys[1:] return p, nil } } // LimitError represents an error caused by a configurable limit. type LimitError struct { Reason string } func (e *LimitError) Error() string { return e.Reason }