diff --git a/tsdb/index.go b/tsdb/index.go index 05e4eec803..5983ae3031 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -1097,6 +1097,9 @@ func (is IndexSet) FieldSet() *MeasurementFieldSet { // DedupeInmemIndexes returns an index set which removes duplicate in-memory indexes. func (is IndexSet) DedupeInmemIndexes() IndexSet { + release := is.SeriesFile.Retain() + defer release() + other := IndexSet{Indexes: make([]Index, 0, len(is.Indexes)), SeriesFile: is.SeriesFile} var hasInmem bool @@ -1114,6 +1117,9 @@ func (is IndexSet) DedupeInmemIndexes() IndexSet { } func (is IndexSet) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) { + release := is.SeriesFile.Retain() + defer release() + // Return filtered list if expression exists. if expr != nil { return is.measurementNamesByExpr(auth, expr) @@ -1482,6 +1488,9 @@ func (is IndexSet) TagValueIterator(name, key []byte) (TagValueIterator, error) // TagKeyHasAuthorizedSeries determines if there exists an authorized series for // the provided measurement name and tag key. func (is IndexSet) TagKeyHasAuthorizedSeries(auth query.Authorizer, name, tagKey []byte) (bool, error) { + release := is.SeriesFile.Retain() + defer release() + itr, err := is.TagKeySeriesIDIterator(name, tagKey) if err != nil { return false, err @@ -1514,6 +1523,9 @@ func (is IndexSet) TagKeyHasAuthorizedSeries(auth query.Authorizer, name, tagKey // MeasurementSeriesIDIterator returns an iterator over all non-tombstoned series // for the provided measurement. func (is IndexSet) MeasurementSeriesIDIterator(name []byte) (SeriesIDIterator, error) { + release := is.SeriesFile.Retain() + defer release() + a := make([]SeriesIDIterator, 0, len(is.Indexes)) for _, idx := range is.Indexes { itr, err := idx.MeasurementSeriesIDIterator(name) @@ -1530,6 +1542,9 @@ func (is IndexSet) MeasurementSeriesIDIterator(name []byte) (SeriesIDIterator, e // ForEachMeasurementTagKey iterates over all tag keys in a measurement and applies // the provided function. func (is IndexSet) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error { + release := is.SeriesFile.Retain() + defer release() + itr, err := is.TagKeyIterator(name) if err != nil { return err @@ -1554,6 +1569,9 @@ func (is IndexSet) ForEachMeasurementTagKey(name []byte, fn func(key []byte) err // MeasurementTagKeysByExpr extracts the tag keys wanted by the expression. func (is IndexSet) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) { + release := is.SeriesFile.Retain() + defer release() + keys := make(map[string]struct{}) for _, idx := range is.Indexes { m, err := idx.MeasurementTagKeysByExpr(name, expr) @@ -1569,6 +1587,9 @@ func (is IndexSet) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (ma // TagKeySeriesIDIterator returns a series iterator for all values across a single key. func (is IndexSet) TagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, error) { + release := is.SeriesFile.Retain() + defer release() + a := make([]SeriesIDIterator, 0, len(is.Indexes)) for _, idx := range is.Indexes { itr, err := idx.TagKeySeriesIDIterator(name, key) @@ -1584,6 +1605,9 @@ func (is IndexSet) TagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, e // TagValueSeriesIDIterator returns a series iterator for a single tag value. func (is IndexSet) TagValueSeriesIDIterator(name, key, value []byte) (SeriesIDIterator, error) { + release := is.SeriesFile.Retain() + defer release() + a := make([]SeriesIDIterator, 0, len(is.Indexes)) for _, idx := range is.Indexes { itr, err := idx.TagValueSeriesIDIterator(name, key, value) @@ -1601,6 +1625,9 @@ func (is IndexSet) TagValueSeriesIDIterator(name, key, value []byte) (SeriesIDIt // that is filtered by expr. If expr only contains time expressions then this // call is equivalent to MeasurementSeriesIDIterator(). func (is IndexSet) MeasurementSeriesByExprIterator(name []byte, expr influxql.Expr) (SeriesIDIterator, error) { + release := is.SeriesFile.Retain() + defer release() + // Return all series for the measurement if there are no tag expressions. if expr == nil { return is.MeasurementSeriesIDIterator(name) @@ -1616,6 +1643,9 @@ func (is IndexSet) MeasurementSeriesByExprIterator(name []byte, expr influxql.Ex // MeasurementSeriesKeysByExpr returns a list of series keys matching expr. func (is IndexSet) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) { + release := is.SeriesFile.Retain() + defer release() + // Create iterator for all matching series. itr, err := is.MeasurementSeriesByExprIterator(name, expr) if err != nil { @@ -1851,6 +1881,9 @@ func (is IndexSet) seriesByBinaryExprVarRefIterator(name, key []byte, value *inf // MatchTagValueSeriesIDIterator returns a series iterator for tags which match value. // If matches is false, returns iterators which do not match value. func (is IndexSet) MatchTagValueSeriesIDIterator(name, key []byte, value *regexp.Regexp, matches bool) (SeriesIDIterator, error) { + release := is.SeriesFile.Retain() + defer release() + matchEmpty := value.MatchString("") if matches { @@ -2014,6 +2047,9 @@ func (is IndexSet) matchTagValueNotEqualNotEmptySeriesIDIterator(name, key []byt // N.B tagValuesByKeyAndExpr relies on keys being sorted in ascending // lexicographic order. func (is IndexSet) TagValuesByKeyAndExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, fieldset *MeasurementFieldSet) ([]map[string]struct{}, error) { + release := is.SeriesFile.Retain() + defer release() + database := is.Database() itr, err := is.seriesByExprIterator(name, expr, fieldset.Fields(string(name))) @@ -2091,6 +2127,9 @@ func (is IndexSet) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []b sort.Sort(sort.StringSlice(keys)) } + release := is.SeriesFile.Retain() + defer release() + // No expression means that the values shouldn't be filtered; so fetch them // all. if expr == nil { @@ -2182,6 +2221,9 @@ func (is IndexSet) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []b // TagSets returns an ordered list of tag sets for a measurement by dimension // and filtered by an optional conditional expression. func (is IndexSet) TagSets(sfile *SeriesFile, name []byte, opt query.IteratorOptions) ([]*query.TagSet, error) { + release := is.SeriesFile.Retain() + defer release() + itr, err := is.MeasurementSeriesByExprIterator(name, opt.Condition) if err != nil { return nil, err diff --git a/tsdb/series_file.go b/tsdb/series_file.go index 6c27cd7aeb..f72decc7b7 100644 --- a/tsdb/series_file.go +++ b/tsdb/series_file.go @@ -8,6 +8,7 @@ import ( "os" "path/filepath" "sort" + "sync" "github.com/cespare/xxhash" "github.com/influxdata/influxdb/models" @@ -34,6 +35,8 @@ type SeriesFile struct { path string partitions []*SeriesPartition + refs sync.RWMutex // RWMutex to track references to the SeriesFile that are in use. + Logger *zap.Logger } @@ -47,6 +50,10 @@ func NewSeriesFile(path string) *SeriesFile { // Open memory maps the data file at the file's path. func (f *SeriesFile) Open() error { + // Wait for all references to be released and prevent new ones from being acquired. + f.refs.Lock() + defer f.refs.Unlock() + // Create path if it doesn't exist. if err := os.MkdirAll(filepath.Join(f.path), 0777); err != nil { return err @@ -69,11 +76,16 @@ func (f *SeriesFile) Open() error { // Close unmaps the data file. func (f *SeriesFile) Close() (err error) { + // Wait for all references to be released and prevent new ones from being acquired. + f.refs.Lock() + defer f.refs.Unlock() + for _, p := range f.partitions { if e := p.Close(); e != nil && err == nil { err = e } } + return err } @@ -88,6 +100,17 @@ func (f *SeriesFile) SeriesPartitionPath(i int) string { // Partitions returns all partitions. func (f *SeriesFile) Partitions() []*SeriesPartition { return f.partitions } +// Retain adds a reference count to the file. It returns a release func. +func (f *SeriesFile) Retain() func() { + if f != nil { + f.refs.RLock() + + // Return the RUnlock func as the release func to be called when done. + return f.refs.RUnlock + } + return nop +} + // CreateSeriesListIfNotExists creates a list of series in bulk if they don't exist. // The returned ids list returns values for new series and zero for existing series. func (f *SeriesFile) CreateSeriesListIfNotExists(names [][]byte, tagsSlice []models.Tags, buf []byte) (ids []uint64, err error) { @@ -426,3 +449,5 @@ type uint64Slice []uint64 func (a uint64Slice) Len() int { return len(a) } func (a uint64Slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a uint64Slice) Less(i, j int) bool { return a[i] < a[j] } + +func nop() {} diff --git a/tsdb/store.go b/tsdb/store.go index 1e106a72eb..817c421627 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -1074,6 +1074,9 @@ func (s *Store) MeasurementNames(auth query.Authorizer, database string, cond in return nil, nil } + release := sfile.Retain() + defer release() + // Build indexset. is := IndexSet{Indexes: make([]Index, 0, len(shards)), SeriesFile: sfile} for _, sh := range shards {