From d44b583c4d762911550aa45c1d0742178feec188 Mon Sep 17 00:00:00 2001 From: Jeff Wendling Date: Mon, 24 Sep 2018 21:51:11 -0600 Subject: [PATCH] remove code as reported by the unused tool --- tsdb/config.go | 12 - tsdb/cursor.go | 14 - tsdb/engine.go | 11 - tsdb/errors.go | 18 - tsdb/field.go | 36 -- tsdb/index.go | 1053 ------------------------------------- tsdb/series_collection.go | 1 - tsdb/series_cursor.go | 16 - tsdb/series_file.go | 36 -- tsdb/series_id.go | 12 +- tsdb/series_index.go | 22 - tsdb/series_partition.go | 36 -- tsdb/series_segment.go | 26 - tsdb/series_set.go | 20 - tsdb/shard.go | 9 - 15 files changed, 5 insertions(+), 1317 deletions(-) diff --git a/tsdb/config.go b/tsdb/config.go index c5156aa591..c980b4a270 100644 --- a/tsdb/config.go +++ b/tsdb/config.go @@ -74,14 +74,6 @@ type Config struct { Engine string `toml:"-"` Index string `toml:"index-version"` - // General WAL configuration options - WALDir string `toml:"wal-dir"` - - // WALFsyncDelay is the amount of time that a write will wait before fsyncing. A duration - // greater than 0 can be used to batch up multiple fsync calls. This is useful for slower - // disks or when WAL write contention is seen. A value of 0 fsyncs every write to the WAL. - WALFsyncDelay toml.Duration `toml:"wal-fsync-delay"` - // Enables unicode validation on series keys on write. ValidateKeys bool `toml:"validate-keys"` @@ -159,8 +151,6 @@ func NewConfig() Config { func (c *Config) Validate() error { if c.Dir == "" { return errors.New("Data.Dir must be specified") - } else if c.WALDir == "" { - return errors.New("Data.WALDir must be specified") } if c.MaxConcurrentCompactions < 0 { @@ -196,8 +186,6 @@ func (c *Config) Validate() error { func (c Config) Diagnostics() (*diagnostics.Diagnostics, error) { return diagnostics.RowFromMap(map[string]interface{}{ "dir": c.Dir, - "wal-dir": c.WALDir, - "wal-fsync-delay": c.WALFsyncDelay, "cache-max-memory-size": c.CacheMaxMemorySize, "cache-snapshot-memory-size": c.CacheSnapshotMemorySize, "cache-snapshot-write-cold-duration": c.CacheSnapshotWriteColdDuration, diff --git a/tsdb/cursor.go b/tsdb/cursor.go index 5cd3843833..a445d50097 100644 --- a/tsdb/cursor.go +++ b/tsdb/cursor.go @@ -55,17 +55,3 @@ type CursorIterator interface { } type CursorIterators []CursorIterator - -func CreateCursorIterators(ctx context.Context, shards []*Shard) (CursorIterators, error) { - q := make(CursorIterators, 0, len(shards)) - for _, s := range shards { - // possible errors are ErrEngineClosed or ErrShardDisabled, so we can safely skip those shards - if cq, err := s.CreateCursorIterator(ctx); cq != nil && err == nil { - q = append(q, cq) - } - } - if len(q) == 0 { - return nil, nil - } - return q, nil -} diff --git a/tsdb/engine.go b/tsdb/engine.go index c891620886..ff097732d0 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -20,9 +20,6 @@ import ( ) var ( - // ErrFormatNotFound is returned when no format can be determined from a path. - ErrFormatNotFound = errors.New("format not found") - // ErrUnknownEngineFormat is returned when the engine format is // unknown. ErrUnknownEngineFormat is currently returned if a format // other than tsm1 is encountered. @@ -89,14 +86,6 @@ type SeriesIDSets interface { ForEach(f func(ids *SeriesIDSet)) error } -// EngineFormat represents the format for an engine. -type EngineFormat int - -const ( - // TSM1Format is the format used by the tsm1 engine. - TSM1Format EngineFormat = 2 -) - // NewEngineFunc creates a new engine. type NewEngineFunc func(id uint64, i Index, path string, walPath string, sfile *SeriesFile, options EngineOptions) Engine diff --git a/tsdb/errors.go b/tsdb/errors.go index 887e72399d..59208b3d95 100644 --- a/tsdb/errors.go +++ b/tsdb/errors.go @@ -6,19 +6,9 @@ import ( ) var ( - // ErrFieldOverflow is returned when too many fields are created on a measurement. - ErrFieldOverflow = errors.New("field overflow") - // ErrFieldTypeConflict is returned when a new field already exists with a different type. ErrFieldTypeConflict = errors.New("field type conflict") - // ErrFieldNotFound is returned when a field cannot be found. - ErrFieldNotFound = errors.New("field not found") - - // ErrFieldUnmappedID is returned when the system is presented, during decode, with a field ID - // there is no mapping for. - ErrFieldUnmappedID = errors.New("field ID not mapped") - // ErrEngineClosed is returned when a caller attempts indirectly to // access the shard's underlying engine. ErrEngineClosed = errors.New("engine is closed") @@ -27,16 +17,8 @@ var ( // queries or writes. ErrShardDisabled = errors.New("shard is disabled") - // ErrUnknownFieldsFormat is returned when the fields index file is not identifiable by - // the file's magic number. - ErrUnknownFieldsFormat = errors.New("unknown field index format") - // ErrUnknownFieldType is returned when the type of a field cannot be determined. ErrUnknownFieldType = errors.New("unknown field type") - - // ErrShardNotIdle is returned when an operation requring the shard to be idle/cold is - // attempted on a hot shard. - ErrShardNotIdle = errors.New("shard not idle") ) // A ShardError implements the error interface, and contains extra diff --git a/tsdb/field.go b/tsdb/field.go index 686f25231b..3cc1086ca5 100644 --- a/tsdb/field.go +++ b/tsdb/field.go @@ -220,13 +220,6 @@ func (m *MeasurementFields) CreateFieldIfNotExists(name []byte, typ influxql.Dat return nil } -func (m *MeasurementFields) FieldN() int { - m.mu.RLock() - n := len(m.fields) - m.mu.RUnlock() - return n -} - // Field returns the field for name, or nil if there is no field for name. func (m *MeasurementFields) Field(name string) *Field { m.mu.RLock() @@ -256,28 +249,6 @@ func (m *MeasurementFields) FieldBytes(name []byte) *Field { return f } -// FieldSet returns the set of fields and their types for the measurement. -func (m *MeasurementFields) FieldSet() map[string]influxql.DataType { - m.mu.RLock() - defer m.mu.RUnlock() - - fields := make(map[string]influxql.DataType) - for name, f := range m.fields { - fields[name] = f.Type - } - return fields -} - -func (m *MeasurementFields) ForEachField(fn func(name string, typ influxql.DataType) bool) { - m.mu.RLock() - defer m.mu.RUnlock() - for name, f := range m.fields { - if !fn(name, f.Type) { - return - } - } -} - // Clone returns copy of the MeasurementFields func (m *MeasurementFields) Clone() *MeasurementFields { m.mu.RLock() @@ -364,13 +335,6 @@ func (fs *MeasurementFieldSet) CreateFieldsIfNotExists(name []byte) *Measurement return mf } -// Delete removes a field set for a measurement. -func (fs *MeasurementFieldSet) Delete(name string) { - fs.mu.Lock() - delete(fs.fields, name) - fs.mu.Unlock() -} - // DeleteWithLock executes fn and removes a field set from a measurement under lock. func (fs *MeasurementFieldSet) DeleteWithLock(name string, fn func() error) error { fs.mu.Lock() diff --git a/tsdb/index.go b/tsdb/index.go index 4a42673f2e..f039af73d5 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -2,17 +2,13 @@ package tsdb import ( "bytes" - "errors" "fmt" "os" "regexp" "sort" - "sync" "github.com/influxdata/influxdb/models" - "github.com/influxdata/influxdb/pkg/bytesutil" "github.com/influxdata/influxdb/pkg/estimator" - "github.com/influxdata/influxdb/pkg/slices" "github.com/influxdata/influxdb/query" "github.com/influxdata/influxql" "go.uber.org/zap" @@ -151,13 +147,6 @@ type SeriesIDElem struct { Expr influxql.Expr } -// SeriesIDElems represents a list of series id elements. -type SeriesIDElems []SeriesIDElem - -func (a SeriesIDElems) Len() int { return len(a) } -func (a SeriesIDElems) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a SeriesIDElems) Less(i, j int) bool { return a[i].SeriesID.Less(a[j].SeriesID) } - // SeriesIDIterator represents a iterator over a list of series ids. type SeriesIDIterator interface { Next() (SeriesIDElem, error) @@ -211,25 +200,6 @@ func NewSeriesIDSetIterators(itrs []SeriesIDIterator) []SeriesIDSetIterator { return a } -// ReadAllSeriesIDIterator returns all ids from the iterator. -func ReadAllSeriesIDIterator(itr SeriesIDIterator) ([]SeriesID, error) { - if itr == nil { - return nil, nil - } - - var a []SeriesID - for { - e, err := itr.Next() - if err != nil { - return nil, err - } else if e.SeriesID.IsZero() { - break - } - a = append(a, e.SeriesID) - } - return a, nil -} - // NewSeriesIDSliceIterator returns a SeriesIDIterator that iterates over a slice. func NewSeriesIDSliceIterator(ids []SeriesID) *SeriesIDSliceIterator { return &SeriesIDSliceIterator{ids: ids} @@ -272,73 +242,6 @@ func (a SeriesIDIterators) Close() (err error) { return err } -// seriesQueryAdapterIterator adapts SeriesIDIterator to an influxql.Iterator. -type seriesQueryAdapterIterator struct { - once sync.Once - sfile *SeriesFile - itr SeriesIDIterator - fieldset *MeasurementFieldSet - opt query.IteratorOptions - - point query.FloatPoint // reusable point -} - -// NewSeriesQueryAdapterIterator returns a new instance of SeriesQueryAdapterIterator. -func NewSeriesQueryAdapterIterator(sfile *SeriesFile, itr SeriesIDIterator, fieldset *MeasurementFieldSet, opt query.IteratorOptions) query.Iterator { - return &seriesQueryAdapterIterator{ - sfile: sfile, - itr: itr, - fieldset: fieldset, - point: query.FloatPoint{ - Aux: make([]interface{}, len(opt.Aux)), - }, - opt: opt, - } -} - -// Stats returns stats about the points processed. -func (itr *seriesQueryAdapterIterator) Stats() query.IteratorStats { return query.IteratorStats{} } - -// Close closes the iterator. -func (itr *seriesQueryAdapterIterator) Close() error { - itr.once.Do(func() { - itr.itr.Close() - }) - return nil -} - -// Next emits the next point in the iterator. -func (itr *seriesQueryAdapterIterator) Next() (*query.FloatPoint, error) { - for { - // Read next series element. - e, err := itr.itr.Next() - if err != nil { - return nil, err - } else if e.SeriesID.IsZero() { - return nil, nil - } - - // Skip if key has been tombstoned. - seriesKey := itr.sfile.SeriesKey(e.SeriesID) - if len(seriesKey) == 0 { - continue - } - - // Convert to a key. - name, tags := ParseSeriesKey(seriesKey) - key := string(models.MakeKey(name, tags)) - - // Write auxiliary fields. - for i, f := range itr.opt.Aux { - switch f.Val { - case "key": - itr.point.Aux[i] = key - } - } - return &itr.point, nil - } -} - // filterUndeletedSeriesIDIterator returns all series which are not deleted. type filterUndeletedSeriesIDIterator struct { sfile *SeriesFile @@ -739,148 +642,6 @@ func (itr *seriesIDDifferenceIterator) Next() (_ SeriesIDElem, err error) { } } -// seriesPointIterator adapts SeriesIterator to an influxql.Iterator. -type seriesPointIterator struct { - once sync.Once - indexSet IndexSet - mitr MeasurementIterator - keys [][]byte - opt query.IteratorOptions - - point query.FloatPoint // reusable point -} - -// newSeriesPointIterator returns a new instance of seriesPointIterator. -func NewSeriesPointIterator(indexSet IndexSet, opt query.IteratorOptions) (_ query.Iterator, err error) { - // Only equality operators are allowed. - influxql.WalkFunc(opt.Condition, func(n influxql.Node) { - switch n := n.(type) { - case *influxql.BinaryExpr: - switch n.Op { - case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX, - influxql.OR, influxql.AND: - default: - err = errors.New("invalid tag comparison operator") - } - } - }) - if err != nil { - return nil, err - } - - mitr, err := indexSet.MeasurementIterator() - if err != nil { - return nil, err - } - - return &seriesPointIterator{ - indexSet: indexSet, - mitr: mitr, - point: query.FloatPoint{ - Aux: make([]interface{}, len(opt.Aux)), - }, - opt: opt, - }, nil -} - -// Stats returns stats about the points processed. -func (itr *seriesPointIterator) Stats() query.IteratorStats { return query.IteratorStats{} } - -// Close closes the iterator. -func (itr *seriesPointIterator) Close() (err error) { - itr.once.Do(func() { - if itr.mitr != nil { - err = itr.mitr.Close() - } - }) - return err -} - -// Next emits the next point in the iterator. -func (itr *seriesPointIterator) Next() (*query.FloatPoint, error) { - for { - // Read series keys for next measurement if no more keys remaining. - // Exit if there are no measurements remaining. - if len(itr.keys) == 0 { - m, err := itr.mitr.Next() - if err != nil { - return nil, err - } else if m == nil { - return nil, nil - } - - if err := itr.readSeriesKeys(m); err != nil { - return nil, err - } - continue - } - - name, tags := ParseSeriesKey(itr.keys[0]) - itr.keys = itr.keys[1:] - - // TODO(edd): It seems to me like this authorisation check should be - // further down in the index. At this point we're going to be filtering - // series that have already been materialised in the LogFiles and - // IndexFiles. - if itr.opt.Authorizer != nil && !itr.opt.Authorizer.AuthorizeSeriesRead(itr.indexSet.Database(), name, tags) { - continue - } - - // Convert to a key. - key := string(models.MakeKey(name, tags)) - - // Write auxiliary fields. - for i, f := range itr.opt.Aux { - switch f.Val { - case "key": - itr.point.Aux[i] = key - } - } - - return &itr.point, nil - } -} - -func (itr *seriesPointIterator) readSeriesKeys(name []byte) error { - sitr, err := itr.indexSet.MeasurementSeriesByExprIterator(name, itr.opt.Condition) - if err != nil { - return err - } else if sitr == nil { - return nil - } - defer sitr.Close() - - // Slurp all series keys. - itr.keys = itr.keys[:0] - for i := 0; ; i++ { - elem, err := sitr.Next() - if err != nil { - return err - } else if elem.SeriesID.IsZero() { - break - } - - // Periodically check for interrupt. - if i&0xFF == 0xFF { - select { - case <-itr.opt.InterruptCh: - return itr.Close() - default: - } - } - - key := itr.indexSet.SeriesFile.SeriesKey(elem.SeriesID) - if len(key) == 0 { - continue - } - itr.keys = append(itr.keys, key) - } - - // Sort keys. - sort.Sort(seriesKeys(itr.keys)) - return nil -} - // MeasurementIterator represents a iterator over a list of measurements. type MeasurementIterator interface { Close() error @@ -993,17 +754,6 @@ type TagKeyIterator interface { Next() ([]byte, error) } -type TagKeyIterators []TagKeyIterator - -func (a TagKeyIterators) Close() (err error) { - for i := range a { - if e := a[i].Close(); e != nil && err == nil { - err = e - } - } - return err -} - // NewTagKeySliceIterator returns a TagKeyIterator that iterates over a slice. func NewTagKeySliceIterator(keys [][]byte) *tagKeySliceIterator { return &tagKeySliceIterator{keys: keys} @@ -1204,16 +954,6 @@ type IndexSet struct { fieldSets []*MeasurementFieldSet // field sets for _all_ indexes in this set's DB. } -// HasInmemIndex returns true if any in-memory index is in use. -func (is IndexSet) HasInmemIndex() bool { - for _, idx := range is.Indexes { - if idx.Type() == InmemIndexName { - return true - } - } - return false -} - // Database returns the database name of the first index. func (is IndexSet) Database() string { if len(is.Indexes) == 0 { @@ -1245,361 +985,6 @@ func (is IndexSet) HasField(measurement []byte, field string) bool { return false } -// DedupeInmemIndexes returns an index set which removes duplicate indexes. -// Useful because inmem indexes are shared by shards per database. -func (is IndexSet) DedupeInmemIndexes() IndexSet { - other := IndexSet{ - Indexes: make([]Index, 0, len(is.Indexes)), - SeriesFile: is.SeriesFile, - fieldSets: make([]*MeasurementFieldSet, 0, len(is.Indexes)), - } - - uniqueIndexes := make(map[uintptr]Index) - for _, idx := range is.Indexes { - uniqueIndexes[idx.UniqueReferenceID()] = idx - } - - for _, idx := range uniqueIndexes { - other.Indexes = append(other.Indexes, idx) - other.fieldSets = append(other.fieldSets, idx.FieldSet()) - } - - return other -} - -// MeasurementNamesByExpr returns a slice of measurement names matching the -// provided condition. If no condition is provided then all names are returned. -func (is IndexSet) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) { - release := is.SeriesFile.Retain() - defer release() - - // Return filtered list if expression exists. - if expr != nil { - return is.measurementNamesByExpr(auth, expr) - } - - itr, err := is.measurementIterator() - if err != nil { - return nil, err - } else if itr == nil { - return nil, nil - } - defer itr.Close() - - // Iterate over all measurements if no condition exists. - var names [][]byte - for { - e, err := itr.Next() - if err != nil { - return nil, err - } else if e == nil { - break - } - - // Determine if there exists at least one authorised series for the - // measurement name. - if is.measurementAuthorizedSeries(auth, e) { - names = append(names, e) - } - } - return slices.CopyChunkedByteSlices(names, 1000), nil -} - -func (is IndexSet) measurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) { - if expr == nil { - return nil, nil - } - - switch e := expr.(type) { - case *influxql.BinaryExpr: - switch e.Op { - case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: - tag, ok := e.LHS.(*influxql.VarRef) - if !ok { - return nil, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String()) - } - - // Retrieve value or regex expression from RHS. - var value string - var regex *regexp.Regexp - if influxql.IsRegexOp(e.Op) { - re, ok := e.RHS.(*influxql.RegexLiteral) - if !ok { - return nil, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String()) - } - regex = re.Val - } else { - s, ok := e.RHS.(*influxql.StringLiteral) - if !ok { - return nil, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String()) - } - value = s.Val - } - - // Match on name, if specified. - if tag.Val == "_name" { - return is.measurementNamesByNameFilter(auth, e.Op, value, regex) - } else if influxql.IsSystemName(tag.Val) { - return nil, nil - } - return is.measurementNamesByTagFilter(auth, e.Op, tag.Val, value, regex) - - case influxql.OR, influxql.AND: - lhs, err := is.measurementNamesByExpr(auth, e.LHS) - if err != nil { - return nil, err - } - - rhs, err := is.measurementNamesByExpr(auth, e.RHS) - if err != nil { - return nil, err - } - - if e.Op == influxql.OR { - return bytesutil.Union(lhs, rhs), nil - } - return bytesutil.Intersect(lhs, rhs), nil - - default: - return nil, fmt.Errorf("invalid tag comparison operator") - } - - case *influxql.ParenExpr: - return is.measurementNamesByExpr(auth, e.Expr) - default: - return nil, fmt.Errorf("%#v", expr) - } -} - -// measurementNamesByNameFilter returns matching measurement names in sorted order. -func (is IndexSet) measurementNamesByNameFilter(auth query.Authorizer, op influxql.Token, val string, regex *regexp.Regexp) ([][]byte, error) { - itr, err := is.measurementIterator() - if err != nil { - return nil, err - } else if itr == nil { - return nil, nil - } - defer itr.Close() - - var names [][]byte - for { - e, err := itr.Next() - if err != nil { - return nil, err - } else if e == nil { - break - } - - var matched bool - switch op { - case influxql.EQ: - matched = string(e) == val - case influxql.NEQ: - matched = string(e) != val - case influxql.EQREGEX: - matched = regex.Match(e) - case influxql.NEQREGEX: - matched = !regex.Match(e) - } - - if matched && is.measurementAuthorizedSeries(auth, e) { - names = append(names, e) - } - } - bytesutil.Sort(names) - return names, nil -} - -func (is IndexSet) measurementNamesByTagFilter(auth query.Authorizer, op influxql.Token, key, val string, regex *regexp.Regexp) ([][]byte, error) { - var names [][]byte - - mitr, err := is.measurementIterator() - if err != nil { - return nil, err - } else if mitr == nil { - return nil, nil - } - defer mitr.Close() - - // valEqual determines if the provided []byte is equal to the tag value - // to be filtered on. - valEqual := regex.Match - if op == influxql.EQ || op == influxql.NEQ { - vb := []byte(val) - valEqual = func(b []byte) bool { return bytes.Equal(vb, b) } - } - - var tagMatch bool - var authorized bool - for { - me, err := mitr.Next() - if err != nil { - return nil, err - } else if me == nil { - break - } - // If the measurement doesn't have the tag key, then it won't be considered. - if ok, err := is.hasTagKey(me, []byte(key)); err != nil { - return nil, err - } else if !ok { - continue - } - tagMatch = false - // Authorization must be explicitly granted when an authorizer is present. - authorized = query.AuthorizerIsOpen(auth) - - vitr, err := is.tagValueIterator(me, []byte(key)) - if err != nil { - return nil, err - } - - if vitr != nil { - defer vitr.Close() - for { - ve, err := vitr.Next() - if err != nil { - return nil, err - } else if ve == nil { - break - } - if !valEqual(ve) { - continue - } - - tagMatch = true - if query.AuthorizerIsOpen(auth) { - break - } - - // When an authorizer is present, the measurement should be - // included only if one of it's series is authorized. - sitr, err := is.tagValueSeriesIDIterator(me, []byte(key), ve) - if err != nil { - return nil, err - } else if sitr == nil { - continue - } - defer sitr.Close() - sitr = FilterUndeletedSeriesIDIterator(is.SeriesFile, sitr) - - // Locate a series with this matching tag value that's authorized. - for { - se, err := sitr.Next() - if err != nil { - return nil, err - } - - if se.SeriesID.IsZero() { - break - } - - name, tags := is.SeriesFile.Series(se.SeriesID) - if auth.AuthorizeSeriesRead(is.Database(), name, tags) { - authorized = true - break - } - } - - if err := sitr.Close(); err != nil { - return nil, err - } - - if tagMatch && authorized { - // The measurement can definitely be included or rejected. - break - } - } - if err := vitr.Close(); err != nil { - return nil, err - } - } - - // For negation operators, to determine if the measurement is authorized, - // an authorized series belonging to the measurement must be located. - // Then, the measurement can be added iff !tagMatch && authorized. - if (op == influxql.NEQ || op == influxql.NEQREGEX) && !tagMatch { - authorized = is.measurementAuthorizedSeries(auth, me) - } - - // tags match | operation is EQ | measurement matches - // -------------------------------------------------- - // True | True | True - // True | False | False - // False | True | False - // False | False | True - if tagMatch == (op == influxql.EQ || op == influxql.EQREGEX) && authorized { - names = append(names, me) - continue - } - } - - bytesutil.Sort(names) - return names, nil -} - -// measurementAuthorizedSeries determines if the measurement contains a series -// that is authorized to be read. -func (is IndexSet) measurementAuthorizedSeries(auth query.Authorizer, name []byte) bool { - if query.AuthorizerIsOpen(auth) { - return true - } - - sitr, err := is.measurementSeriesIDIterator(name) - if err != nil || sitr == nil { - return false - } - defer sitr.Close() - sitr = FilterUndeletedSeriesIDIterator(is.SeriesFile, sitr) - - for { - series, err := sitr.Next() - if err != nil { - return false - } - - if series.SeriesID.IsZero() { - return false // End of iterator - } - - name, tags := is.SeriesFile.Series(series.SeriesID) - if auth.AuthorizeSeriesRead(is.Database(), name, tags) { - return true - } - } -} - -// HasTagKey returns true if the tag key exists in any index for the provided -// measurement. -func (is IndexSet) HasTagKey(name, key []byte) (bool, error) { - return is.hasTagKey(name, key) -} - -// hasTagKey returns true if the tag key exists in any index for the provided -// measurement, and guarantees to never take a lock on the series file. -func (is IndexSet) hasTagKey(name, key []byte) (bool, error) { - for _, idx := range is.Indexes { - if ok, err := idx.HasTagKey(name, key); err != nil { - return false, err - } else if ok { - return true, nil - } - } - return false, nil -} - -// HasTagValue returns true if the tag value exists in any index for the provided -// measurement and tag key. -func (is IndexSet) HasTagValue(name, key, value []byte) (bool, error) { - for _, idx := range is.Indexes { - if ok, err := idx.HasTagValue(name, key, value); err != nil { - return false, err - } else if ok { - return true, nil - } - } - return false, nil -} - // MeasurementIterator returns an iterator over all measurements in the index. func (is IndexSet) MeasurementIterator() (MeasurementIterator, error) { return is.measurementIterator() @@ -1621,32 +1006,6 @@ func (is IndexSet) measurementIterator() (MeasurementIterator, error) { return MergeMeasurementIterators(a...), nil } -// TagKeyIterator returns a key iterator for a measurement. -func (is IndexSet) TagKeyIterator(name []byte) (TagKeyIterator, error) { - return is.tagKeyIterator(name) -} - -// tagKeyIterator returns a key iterator for a measurement. It guarantees to never -// take any locks on the underlying series file. -func (is IndexSet) tagKeyIterator(name []byte) (TagKeyIterator, error) { - a := make([]TagKeyIterator, 0, len(is.Indexes)) - for _, idx := range is.Indexes { - itr, err := idx.TagKeyIterator(name) - if err != nil { - TagKeyIterators(a).Close() - return nil, err - } else if itr != nil { - a = append(a, itr) - } - } - return MergeTagKeyIterators(a...), nil -} - -// TagValueIterator returns a value iterator for a tag key. -func (is IndexSet) TagValueIterator(name, key []byte) (TagValueIterator, error) { - return is.tagValueIterator(name, key) -} - // tagValueIterator returns a value iterator for a tag key. It guarantees to never // take any locks on the underlying series file. func (is IndexSet) tagValueIterator(name, key []byte) (TagValueIterator, error) { @@ -1663,59 +1022,6 @@ func (is IndexSet) tagValueIterator(name, key []byte) (TagValueIterator, error) return MergeTagValueIterators(a...), nil } -// TagKeyHasAuthorizedSeries determines if there exists an authorized series for -// the provided measurement name and tag key. -func (is IndexSet) TagKeyHasAuthorizedSeries(auth query.Authorizer, name, tagKey []byte) (bool, error) { - if !is.HasInmemIndex() && query.AuthorizerIsOpen(auth) { - return true, nil - } - - release := is.SeriesFile.Retain() - defer release() - - itr, err := is.tagKeySeriesIDIterator(name, tagKey) - if err != nil { - return false, err - } else if itr == nil { - return false, nil - } - defer itr.Close() - itr = FilterUndeletedSeriesIDIterator(is.SeriesFile, itr) - - for { - e, err := itr.Next() - if err != nil { - return false, err - } - - if e.SeriesID.IsZero() { - return false, nil - } - - if query.AuthorizerIsOpen(auth) { - return true, nil - } - - name, tags := is.SeriesFile.Series(e.SeriesID) - if auth.AuthorizeSeriesRead(is.Database(), name, tags) { - return true, nil - } - } -} - -// MeasurementSeriesIDIterator returns an iterator over all non-tombstoned series -// for the provided measurement. -func (is IndexSet) MeasurementSeriesIDIterator(name []byte) (SeriesIDIterator, error) { - release := is.SeriesFile.Retain() - defer release() - - itr, err := is.measurementSeriesIDIterator(name) - if err != nil { - return nil, err - } - return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil -} - // measurementSeriesIDIterator does not provide any locking on the Series file. // // See MeasurementSeriesIDIterator for more details. @@ -1733,64 +1039,6 @@ func (is IndexSet) measurementSeriesIDIterator(name []byte) (SeriesIDIterator, e return MergeSeriesIDIterators(a...), nil } -// ForEachMeasurementTagKey iterates over all tag keys in a measurement and applies -// the provided function. -func (is IndexSet) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error { - release := is.SeriesFile.Retain() - defer release() - - itr, err := is.tagKeyIterator(name) - if err != nil { - return err - } else if itr == nil { - return nil - } - defer itr.Close() - - for { - key, err := itr.Next() - if err != nil { - return err - } else if key == nil { - return nil - } - - if err := fn(key); err != nil { - return err - } - } -} - -// MeasurementTagKeysByExpr extracts the tag keys wanted by the expression. -func (is IndexSet) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) { - release := is.SeriesFile.Retain() - defer release() - - keys := make(map[string]struct{}) - for _, idx := range is.Indexes { - m, err := idx.MeasurementTagKeysByExpr(name, expr) - if err != nil { - return nil, err - } - for k := range m { - keys[k] = struct{}{} - } - } - return keys, nil -} - -// TagKeySeriesIDIterator returns a series iterator for all values across a single key. -func (is IndexSet) TagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, error) { - release := is.SeriesFile.Retain() - defer release() - - itr, err := is.tagKeySeriesIDIterator(name, key) - if err != nil { - return nil, err - } - return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil -} - // tagKeySeriesIDIterator returns a series iterator for all values across a // single key. // @@ -1809,18 +1057,6 @@ func (is IndexSet) tagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, e return MergeSeriesIDIterators(a...), nil } -// TagValueSeriesIDIterator returns a series iterator for a single tag value. -func (is IndexSet) TagValueSeriesIDIterator(name, key, value []byte) (SeriesIDIterator, error) { - release := is.SeriesFile.Retain() - defer release() - - itr, err := is.tagValueSeriesIDIterator(name, key, value) - if err != nil { - return nil, err - } - return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil -} - // tagValueSeriesIDIterator does not provide any locking on the Series File. // // See TagValueSeriesIDIterator for more details. @@ -1869,54 +1105,6 @@ func (is IndexSet) measurementSeriesByExprIterator(name []byte, expr influxql.Ex return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil } -// MeasurementSeriesKeysByExpr returns a list of series keys matching expr. -func (is IndexSet) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) { - release := is.SeriesFile.Retain() - defer release() - - // Create iterator for all matching series. - itr, err := is.measurementSeriesByExprIterator(name, expr) - if err != nil { - return nil, err - } else if itr == nil { - return nil, nil - } - defer itr.Close() - - // measurementSeriesByExprIterator filters deleted series; no need to do so here. - - // Iterate over all series and generate keys. - var keys [][]byte - for { - e, err := itr.Next() - if err != nil { - return nil, err - } else if e.SeriesID.IsZero() { - break - } - - // Check for unsupported field filters. - // Any remaining filters means there were fields (e.g., `WHERE value = 1.2`). - if e.Expr != nil { - if v, ok := e.Expr.(*influxql.BooleanLiteral); !ok || !v.Val { - return nil, errors.New("fields not supported in WHERE clause during deletion") - } - } - - seriesKey := is.SeriesFile.SeriesKey(e.SeriesID) - if len(seriesKey) == 0 { - continue - } - - name, tags := ParseSeriesKey(seriesKey) - keys = append(keys, models.MakeKey(name, tags)) - } - - bytesutil.Sort(keys) - - return keys, nil -} - func (is IndexSet) seriesByExprIterator(name []byte, expr influxql.Expr) (SeriesIDIterator, error) { switch expr := expr.(type) { case *influxql.BinaryExpr: @@ -2125,18 +1313,6 @@ func (is IndexSet) seriesByBinaryExprVarRefIterator(name, key []byte, value *inf return DifferenceSeriesIDIterators(itr0, itr1), nil } -// MatchTagValueSeriesIDIterator returns a series iterator for tags which match value. -// If matches is false, returns iterators which do not match value. -func (is IndexSet) MatchTagValueSeriesIDIterator(name, key []byte, value *regexp.Regexp, matches bool) (SeriesIDIterator, error) { - release := is.SeriesFile.Retain() - defer release() - itr, err := is.matchTagValueSeriesIDIterator(name, key, value, matches) - if err != nil { - return nil, err - } - return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil -} - // matchTagValueSeriesIDIterator returns a series iterator for tags which match // value. See MatchTagValueSeriesIDIterator for more details. // @@ -2300,214 +1476,6 @@ func (is IndexSet) matchTagValueNotEqualNotEmptySeriesIDIterator(name, key []byt return DifferenceSeriesIDIterators(mitr, MergeSeriesIDIterators(itrs...)), nil } -// TagValuesByKeyAndExpr retrieves tag values for the provided tag keys. -// -// TagValuesByKeyAndExpr returns sets of values for each key, indexable by the -// position of the tag key in the keys argument. -// -// N.B tagValuesByKeyAndExpr relies on keys being sorted in ascending -// lexicographic order. -func (is IndexSet) TagValuesByKeyAndExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, fieldset *MeasurementFieldSet) ([]map[string]struct{}, error) { - release := is.SeriesFile.Retain() - defer release() - return is.tagValuesByKeyAndExpr(auth, name, keys, expr) -} - -// tagValuesByKeyAndExpr retrieves tag values for the provided tag keys. See -// TagValuesByKeyAndExpr for more details. -// -// tagValuesByKeyAndExpr guarantees to never take any locks on the underlying -// series file. -func (is IndexSet) tagValuesByKeyAndExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr) ([]map[string]struct{}, error) { - database := is.Database() - - valueExpr := influxql.CloneExpr(expr) - valueExpr = influxql.Reduce(influxql.RewriteExpr(valueExpr, func(e influxql.Expr) influxql.Expr { - switch e := e.(type) { - case *influxql.BinaryExpr: - switch e.Op { - case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: - tag, ok := e.LHS.(*influxql.VarRef) - if !ok || tag.Val != "value" { - return nil - } - } - } - return e - }), nil) - - itr, err := is.seriesByExprIterator(name, expr) - if err != nil { - return nil, err - } else if itr == nil { - return nil, nil - } - itr = FilterUndeletedSeriesIDIterator(is.SeriesFile, itr) - defer itr.Close() - - keyIdxs := make(map[string]int, len(keys)) - for ki, key := range keys { - keyIdxs[key] = ki - - // Check that keys are in order. - if ki > 0 && key < keys[ki-1] { - return nil, fmt.Errorf("keys %v are not in ascending order", keys) - } - } - - resultSet := make([]map[string]struct{}, len(keys)) - for i := 0; i < len(resultSet); i++ { - resultSet[i] = make(map[string]struct{}) - } - - // Iterate all series to collect tag values. - for { - e, err := itr.Next() - if err != nil { - return nil, err - } else if e.SeriesID.IsZero() { - break - } - - buf := is.SeriesFile.SeriesKey(e.SeriesID) - if len(buf) == 0 { - continue - } - - if auth != nil { - name, tags := ParseSeriesKey(buf) - if !auth.AuthorizeSeriesRead(database, name, tags) { - continue - } - } - - _, buf = ReadSeriesKeyLen(buf) - _, buf = ReadSeriesKeyMeasurement(buf) - tagN, buf := ReadSeriesKeyTagN(buf) - for i := 0; i < tagN; i++ { - var key, value []byte - key, value, buf = ReadSeriesKeyTag(buf) - if valueExpr != nil { - if !influxql.EvalBool(valueExpr, map[string]interface{}{"value": string(value)}) { - continue - } - } - - if idx, ok := keyIdxs[string(key)]; ok { - resultSet[idx][string(value)] = struct{}{} - } else if string(key) > keys[len(keys)-1] { - // The tag key is > the largest key we're interested in. - break - } - } - } - return resultSet, nil -} - -// MeasurementTagKeyValuesByExpr returns a set of tag values filtered by an expression. -func (is IndexSet) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) { - if len(keys) == 0 { - return nil, nil - } - - results := make([][]string, len(keys)) - // If the keys are not sorted, then sort them. - if !keysSorted { - sort.Strings(keys) - } - - release := is.SeriesFile.Retain() - defer release() - - // No expression means that the values shouldn't be filtered; so fetch them - // all. - if expr == nil { - for ki, key := range keys { - vitr, err := is.tagValueIterator(name, []byte(key)) - if err != nil { - return nil, err - } else if vitr == nil { - break - } - defer vitr.Close() - - // If no authorizer present then return all values. - if query.AuthorizerIsOpen(auth) { - for { - val, err := vitr.Next() - if err != nil { - return nil, err - } else if val == nil { - break - } - results[ki] = append(results[ki], string(val)) - } - continue - } - - // Authorization is present — check all series with matching tag values - // and measurements for the presence of an authorized series. - for { - val, err := vitr.Next() - if err != nil { - return nil, err - } else if val == nil { - break - } - - sitr, err := is.tagValueSeriesIDIterator(name, []byte(key), val) - if err != nil { - return nil, err - } else if sitr == nil { - continue - } - defer sitr.Close() - sitr = FilterUndeletedSeriesIDIterator(is.SeriesFile, sitr) - - for { - se, err := sitr.Next() - if err != nil { - return nil, err - } - - if se.SeriesID.IsZero() { - break - } - - name, tags := is.SeriesFile.Series(se.SeriesID) - if auth.AuthorizeSeriesRead(is.Database(), name, tags) { - results[ki] = append(results[ki], string(val)) - break - } - } - if err := sitr.Close(); err != nil { - return nil, err - } - } - } - return results, nil - } - - // This is the case where we have filtered series by some WHERE condition. - // We only care about the tag values for the keys given the - // filtered set of series ids. - resultSet, err := is.tagValuesByKeyAndExpr(auth, name, keys, expr) - if err != nil { - return nil, err - } - - // Convert result sets into []string - for i, s := range resultSet { - values := make([]string, 0, len(s)) - for v := range s { - values = append(values, v) - } - sort.Strings(values) - results[i] = values - } - return results, nil -} - // TagSets returns an ordered list of tag sets for a measurement by dimension // and filtered by an optional conditional expression. func (is IndexSet) TagSets(sfile *SeriesFile, name []byte, opt query.IteratorOptions) ([]*query.TagSet, error) { @@ -2625,17 +1593,6 @@ func (is IndexSet) TagSets(sfile *SeriesFile, name []byte, opt query.IteratorOpt return sortedTagsSets, nil } -// IndexFormat represents the format for an index. -type IndexFormat int - -const ( - // InMemFormat is the format used by the original in-memory shared index. - InMemFormat IndexFormat = 1 - - // TSI1Format is the format used by the tsi1 index. - TSI1Format IndexFormat = 2 -) - // NewIndexFunc creates a new index. type NewIndexFunc func(id uint64, database, path string, seriesIDSet *SeriesIDSet, sfile *SeriesFile, options EngineOptions) Index @@ -2685,16 +1642,6 @@ func NewIndex(id uint64, path string, seriesIDSet *SeriesIDSet, sfile *SeriesFil return fn(id, "", path, seriesIDSet, sfile, options), nil } -func MustOpenIndex(id uint64, path string, seriesIDSet *SeriesIDSet, sfile *SeriesFile, options EngineOptions) Index { - idx, err := NewIndex(id, path, seriesIDSet, sfile, options) - if err != nil { - panic(err) - } else if err := idx.Open(); err != nil { - panic(err) - } - return idx -} - // assert will panic with a given formatted message if the given condition is false. func assert(condition bool, msg string, v ...interface{}) { if !condition { diff --git a/tsdb/series_collection.go b/tsdb/series_collection.go index cfb697c9c0..371ebef819 100644 --- a/tsdb/series_collection.go +++ b/tsdb/series_collection.go @@ -316,7 +316,6 @@ func (i *SeriesCollectionIterator) Next() bool { // Helpers that return the current state of the iterator. func (i SeriesCollectionIterator) Index() int { return i.index } -func (i SeriesCollectionIterator) Length() int { return i.length } func (i SeriesCollectionIterator) Point() models.Point { return i.s.Points[i.index] } func (i SeriesCollectionIterator) Key() []byte { return i.s.Keys[i.index] } func (i SeriesCollectionIterator) SeriesKey() []byte { return i.s.SeriesKeys[i.index] } diff --git a/tsdb/series_cursor.go b/tsdb/series_cursor.go index d69515ebad..6a7c722535 100644 --- a/tsdb/series_cursor.go +++ b/tsdb/series_cursor.go @@ -1,7 +1,6 @@ package tsdb import ( - "bytes" "errors" "sort" "sync" @@ -35,21 +34,6 @@ type SeriesCursorRow struct { Tags models.Tags } -func (r *SeriesCursorRow) Compare(other *SeriesCursorRow) int { - if r == other { - return 0 - } else if r == nil { - return -1 - } else if other == nil { - return 1 - } - cmp := bytes.Compare(r.Name, other.Name) - if cmp != 0 { - return cmp - } - return models.CompareTags(r.Tags, other.Tags) -} - // newSeriesCursor returns a new instance of SeriesCursor. func newSeriesCursor(req SeriesCursorRequest, indexSet IndexSet, cond influxql.Expr) (_ SeriesCursor, err error) { // Only equality operators are allowed. diff --git a/tsdb/series_file.go b/tsdb/series_file.go index dfbfba0f4d..7bb2796435 100644 --- a/tsdb/series_file.go +++ b/tsdb/series_file.go @@ -7,7 +7,6 @@ import ( "fmt" "os" "path/filepath" - "sort" "sync" "github.com/cespare/xxhash" @@ -20,13 +19,9 @@ import ( const SeriesFileDirectory = "_series" var ( - ErrSeriesFileClosed = errors.New("tsdb: series file closed") ErrInvalidSeriesPartitionID = errors.New("tsdb: invalid series partition id") ) -// SeriesIDSize is the size in bytes of a series key ID. -const SeriesIDSize = 8 - const ( // SeriesFilePartitionN is the number of partitions a series file is split into. SeriesFilePartitionN = 8 @@ -99,9 +94,6 @@ func (f *SeriesFile) SeriesPartitionPath(i int) string { return filepath.Join(f.path, fmt.Sprintf("%02x", i)) } -// Partitions returns all partitions. -func (f *SeriesFile) Partitions() []*SeriesPartition { return f.partitions } - // Retain adds a reference count to the file. It returns a release func. func (f *SeriesFile) Retain() func() { if f != nil { @@ -188,15 +180,6 @@ func (f *SeriesFile) SeriesKey(id SeriesID) []byte { return p.SeriesKey(id) } -// SeriesKeys returns a list of series keys from a list of ids. -func (f *SeriesFile) SeriesKeys(ids []SeriesID) [][]byte { - keys := make([][]byte, len(ids)) - for i := range ids { - keys[i] = f.SeriesKey(ids[i]) - } - return keys -} - // Series returns the parsed series name and tags for an offset. func (f *SeriesFile) Series(id SeriesID) ([]byte, models.Tags) { key := f.SeriesKey(id) @@ -221,25 +204,6 @@ func (f *SeriesFile) HasSeries(name []byte, tags models.Tags, buf []byte) bool { return !f.SeriesID(name, tags, buf).IsZero() } -// SeriesCount returns the number of series. -func (f *SeriesFile) SeriesCount() uint64 { - var n uint64 - for _, p := range f.partitions { - n += p.SeriesCount() - } - return n -} - -// SeriesIterator returns an iterator over all the series. -func (f *SeriesFile) SeriesIDIterator() SeriesIDIterator { - var ids []SeriesID - for _, p := range f.partitions { - ids = p.AppendSeriesIDs(ids) - } - sort.Slice(ids, func(i, j int) bool { return ids[i].Less(ids[j]) }) - return NewSeriesIDSliceIterator(ids) -} - func (f *SeriesFile) SeriesIDPartitionID(id SeriesID) int { return int((id.RawID() - 1) % SeriesFilePartitionN) } diff --git a/tsdb/series_id.go b/tsdb/series_id.go index 341268f8ab..90632e9752 100644 --- a/tsdb/series_id.go +++ b/tsdb/series_id.go @@ -12,11 +12,9 @@ const ( seriesIDValueMask = 0xFFFFFFFF // series ids numerically are 32 bits seriesIDTypeShift = 32 // we put the type right after the value info seriesIDTypeMask = 0xFF << seriesIDTypeShift // a mask for the type byte + seriesIDSize = 8 ) -// SeriesIDHasType returns if the raw id contains type information. -func SeriesIDHasType(id uint64) bool { return id&seriesIDTypeFlag > 0 } - // SeriesID is the type of a series id. It is logically a uint64, but encoded as a struct so // that we gain more type checking when changing operations on it. The field is exported only // so that tests that use reflection based comparisons still work; no one should use the field @@ -72,9 +70,9 @@ func (s SeriesIDTyped) Type() models.FieldType { type ( // some static assertions that the SeriesIDSize matches the structs we defined. // if the values are not the same, at least one will be negative causing a compilation failure - _ [SeriesIDSize - unsafe.Sizeof(SeriesID{})]byte - _ [unsafe.Sizeof(SeriesID{}) - SeriesIDSize]byte + _ [seriesIDSize - unsafe.Sizeof(SeriesID{})]byte + _ [unsafe.Sizeof(SeriesID{}) - seriesIDSize]byte - _ [SeriesIDSize - unsafe.Sizeof(SeriesIDTyped{})]byte - _ [unsafe.Sizeof(SeriesIDTyped{}) - SeriesIDSize]byte + _ [seriesIDSize - unsafe.Sizeof(SeriesIDTyped{})]byte + _ [unsafe.Sizeof(SeriesIDTyped{}) - seriesIDSize]byte ) diff --git a/tsdb/series_index.go b/tsdb/series_index.go index bfaccb17d5..46f5b6fa07 100644 --- a/tsdb/series_index.go +++ b/tsdb/series_index.go @@ -7,7 +7,6 @@ import ( "io" "os" - "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/pkg/mmap" "github.com/influxdata/influxdb/pkg/rhh" ) @@ -215,27 +214,6 @@ func (idx *SeriesIndex) FindIDBySeriesKey(segments []*SeriesSegment, key []byte) } } -func (idx *SeriesIndex) FindIDByNameTags(segments []*SeriesSegment, name []byte, tags models.Tags, buf []byte) SeriesIDTyped { - id := idx.FindIDBySeriesKey(segments, AppendSeriesKey(buf[:0], name, tags)) - if _, ok := idx.tombstones[id.SeriesID()]; ok { - return SeriesIDTyped{} - } - return id -} - -func (idx *SeriesIndex) FindIDListByNameTags(segments []*SeriesSegment, names [][]byte, tagsSlice []models.Tags, buf []byte) (ids []SeriesIDTyped, ok bool) { - ids, ok = make([]SeriesIDTyped, len(names)), true - for i := range names { - id := idx.FindIDByNameTags(segments, names[i], tagsSlice[i], buf) - if id.IsZero() { - ok = false - continue - } - ids[i] = id - } - return ids, ok -} - func (idx *SeriesIndex) FindOffsetByID(id SeriesID) int64 { if offset := idx.idOffsetMap[id]; offset != 0 { return offset diff --git a/tsdb/series_partition.go b/tsdb/series_partition.go index e0ac0106d0..25c639cdf5 100644 --- a/tsdb/series_partition.go +++ b/tsdb/series_partition.go @@ -311,13 +311,6 @@ func (p *SeriesPartition) CreateSeriesListIfNotExists(collection *SeriesCollecti return nil } -// Compacting returns if the SeriesPartition is currently compacting. -func (p *SeriesPartition) Compacting() bool { - p.mu.RLock() - defer p.mu.RUnlock() - return p.compacting -} - // DeleteSeriesID flags a series as permanently deleted. // If the series is reintroduced later then it must create a new id. func (p *SeriesPartition) DeleteSeriesID(id SeriesID) error { @@ -372,15 +365,6 @@ func (p *SeriesPartition) SeriesKey(id SeriesID) []byte { return key } -// Series returns the parsed series name and tags for an offset. -func (p *SeriesPartition) Series(id SeriesID) ([]byte, models.Tags) { - key := p.SeriesKey(id) - if key == nil { - return nil, nil - } - return ParseSeriesKey(key) -} - // FindIDBySeriesKey return the series id for the series key. func (p *SeriesPartition) FindIDBySeriesKey(key []byte) SeriesID { p.mu.RLock() @@ -393,18 +377,6 @@ func (p *SeriesPartition) FindIDBySeriesKey(key []byte) SeriesID { return id.SeriesID() } -// SeriesCount returns the number of series. -func (p *SeriesPartition) SeriesCount() uint64 { - p.mu.RLock() - if p.closed { - p.mu.RUnlock() - return 0 - } - n := p.index.Count() - p.mu.RUnlock() - return n -} - func (p *SeriesPartition) DisableCompactions() { p.mu.Lock() defer p.mu.Unlock() @@ -425,14 +397,6 @@ func (p *SeriesPartition) compactionsEnabled() bool { return p.compactionsDisabled == 0 } -// AppendSeriesIDs returns a list of all series ids. -func (p *SeriesPartition) AppendSeriesIDs(a []SeriesID) []SeriesID { - for _, segment := range p.segments { - a = segment.AppendSeriesIDs(a) - } - return a -} - // activeSegment returns the last segment. func (p *SeriesPartition) activeSegment() *SeriesSegment { if len(p.segments) == 0 { diff --git a/tsdb/series_segment.go b/tsdb/series_segment.go index 43247b9b6a..7820ed7fc5 100644 --- a/tsdb/series_segment.go +++ b/tsdb/series_segment.go @@ -8,7 +8,6 @@ import ( "fmt" "io" "os" - "regexp" "strconv" "github.com/influxdata/influxdb/pkg/mmap" @@ -168,16 +167,9 @@ func (s *SeriesSegment) CloseForWrite() (err error) { return err } -// Data returns the raw data. -func (s *SeriesSegment) Data() []byte { return s.data } - // ID returns the id the segment was initialized with. func (s *SeriesSegment) ID() uint16 { return s.id } -// Size returns the size of the data in the segment. -// This is only populated once InitForWrite() is called. -func (s *SeriesSegment) Size() int64 { return int64(s.size) } - // Slice returns a byte slice starting at pos. func (s *SeriesSegment) Slice(pos uint32) []byte { return s.data[pos:] } @@ -210,17 +202,6 @@ func (s *SeriesSegment) Flush() error { return s.w.Flush() } -// AppendSeriesIDs appends all the segments ids to a slice. Returns the new slice. -func (s *SeriesSegment) AppendSeriesIDs(a []SeriesID) []SeriesID { - s.ForEachEntry(func(flag uint8, id SeriesIDTyped, _ int64, _ []byte) error { - if flag == SeriesEntryInsertFlag { - a = append(a, id.SeriesID()) - } - return nil - }) - return a -} - // MaxSeriesID returns the highest series id in the segment. func (s *SeriesSegment) MaxSeriesID() SeriesID { var max SeriesID @@ -302,19 +283,12 @@ func SplitSeriesOffset(offset int64) (segmentID uint16, pos uint32) { return uint16((offset >> 32) & 0xFFFF), uint32(offset & 0xFFFFFFFF) } -// IsValidSeriesSegmentFilename returns true if filename is a 4-character lowercase hexidecimal number. -func IsValidSeriesSegmentFilename(filename string) bool { - return seriesSegmentFilenameRegex.MatchString(filename) -} - // ParseSeriesSegmentFilename returns the id represented by the hexidecimal filename. func ParseSeriesSegmentFilename(filename string) (uint16, error) { i, err := strconv.ParseUint(filename, 16, 32) return uint16(i), err } -var seriesSegmentFilenameRegex = regexp.MustCompile(`^[0-9a-f]{4}$`) - // SeriesSegmentSize returns the maximum size of the segment. // The size goes up by powers of 2 starting from 4MB and reaching 256MB. func SeriesSegmentSize(id uint16) uint32 { diff --git a/tsdb/series_set.go b/tsdb/series_set.go index 881c42d8de..b8bf99f44f 100644 --- a/tsdb/series_set.go +++ b/tsdb/series_set.go @@ -108,19 +108,6 @@ func (s *SeriesIDSet) Merge(others ...*SeriesIDSet) { s.Unlock() } -// Equals returns true if other and s are the same set of ids. -func (s *SeriesIDSet) Equals(other *SeriesIDSet) bool { - if s == other { - return true - } - - s.RLock() - defer s.RUnlock() - other.RLock() - defer other.RUnlock() - return s.bitmap.Equals(other.bitmap) -} - // And returns a new SeriesIDSet containing elements that were present in s and other. func (s *SeriesIDSet) And(other *SeriesIDSet) *SeriesIDSet { s.RLock() @@ -176,13 +163,6 @@ func (s *SeriesIDSet) Diff(other *SeriesIDSet) { s.bitmap = roaring.AndNot(s.bitmap, other.bitmap) } -// Clone returns a new SeriesIDSet with a deep copy of the underlying bitmap. -func (s *SeriesIDSet) Clone() *SeriesIDSet { - s.RLock() - defer s.RUnlock() - return s.CloneNoLock() -} - // CloneNoLock calls Clone without taking a lock. func (s *SeriesIDSet) CloneNoLock() *SeriesIDSet { new := NewSeriesIDSet() diff --git a/tsdb/shard.go b/tsdb/shard.go index 846a610e7e..9e6f7829b1 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -86,15 +86,6 @@ func (s *Shard) SetEnabled(enabled bool) { s.mu.Unlock() } -// ScheduleFullCompaction forces a full compaction to be schedule on the shard. -func (s *Shard) ScheduleFullCompaction() error { - engine, err := s.Engine() - if err != nil { - return err - } - return engine.ScheduleFullCompaction() -} - // ID returns the shards ID. func (s *Shard) ID() uint64 { return s.id