Merge pull request #9308 from influxdata/jw-series-file-retain

Ensure series file is not closed while in use
pull/9311/head
Jason Wilder 2018-01-12 17:30:34 -07:00 committed by GitHub
commit 9418d373d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 70 additions and 0 deletions

View File

@ -1097,6 +1097,9 @@ func (is IndexSet) FieldSet() *MeasurementFieldSet {
// DedupeInmemIndexes returns an index set which removes duplicate in-memory indexes.
func (is IndexSet) DedupeInmemIndexes() IndexSet {
release := is.SeriesFile.Retain()
defer release()
other := IndexSet{Indexes: make([]Index, 0, len(is.Indexes)), SeriesFile: is.SeriesFile}
var hasInmem bool
@ -1114,6 +1117,9 @@ func (is IndexSet) DedupeInmemIndexes() IndexSet {
}
func (is IndexSet) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) {
release := is.SeriesFile.Retain()
defer release()
// Return filtered list if expression exists.
if expr != nil {
return is.measurementNamesByExpr(auth, expr)
@ -1482,6 +1488,9 @@ func (is IndexSet) TagValueIterator(name, key []byte) (TagValueIterator, error)
// TagKeyHasAuthorizedSeries determines if there exists an authorized series for
// the provided measurement name and tag key.
func (is IndexSet) TagKeyHasAuthorizedSeries(auth query.Authorizer, name, tagKey []byte) (bool, error) {
release := is.SeriesFile.Retain()
defer release()
itr, err := is.TagKeySeriesIDIterator(name, tagKey)
if err != nil {
return false, err
@ -1514,6 +1523,9 @@ func (is IndexSet) TagKeyHasAuthorizedSeries(auth query.Authorizer, name, tagKey
// MeasurementSeriesIDIterator returns an iterator over all non-tombstoned series
// for the provided measurement.
func (is IndexSet) MeasurementSeriesIDIterator(name []byte) (SeriesIDIterator, error) {
release := is.SeriesFile.Retain()
defer release()
a := make([]SeriesIDIterator, 0, len(is.Indexes))
for _, idx := range is.Indexes {
itr, err := idx.MeasurementSeriesIDIterator(name)
@ -1530,6 +1542,9 @@ func (is IndexSet) MeasurementSeriesIDIterator(name []byte) (SeriesIDIterator, e
// ForEachMeasurementTagKey iterates over all tag keys in a measurement and applies
// the provided function.
func (is IndexSet) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error {
release := is.SeriesFile.Retain()
defer release()
itr, err := is.TagKeyIterator(name)
if err != nil {
return err
@ -1554,6 +1569,9 @@ func (is IndexSet) ForEachMeasurementTagKey(name []byte, fn func(key []byte) err
// MeasurementTagKeysByExpr extracts the tag keys wanted by the expression.
func (is IndexSet) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) {
release := is.SeriesFile.Retain()
defer release()
keys := make(map[string]struct{})
for _, idx := range is.Indexes {
m, err := idx.MeasurementTagKeysByExpr(name, expr)
@ -1569,6 +1587,9 @@ func (is IndexSet) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (ma
// TagKeySeriesIDIterator returns a series iterator for all values across a single key.
func (is IndexSet) TagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, error) {
release := is.SeriesFile.Retain()
defer release()
a := make([]SeriesIDIterator, 0, len(is.Indexes))
for _, idx := range is.Indexes {
itr, err := idx.TagKeySeriesIDIterator(name, key)
@ -1584,6 +1605,9 @@ func (is IndexSet) TagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, e
// TagValueSeriesIDIterator returns a series iterator for a single tag value.
func (is IndexSet) TagValueSeriesIDIterator(name, key, value []byte) (SeriesIDIterator, error) {
release := is.SeriesFile.Retain()
defer release()
a := make([]SeriesIDIterator, 0, len(is.Indexes))
for _, idx := range is.Indexes {
itr, err := idx.TagValueSeriesIDIterator(name, key, value)
@ -1601,6 +1625,9 @@ func (is IndexSet) TagValueSeriesIDIterator(name, key, value []byte) (SeriesIDIt
// that is filtered by expr. If expr only contains time expressions then this
// call is equivalent to MeasurementSeriesIDIterator().
func (is IndexSet) MeasurementSeriesByExprIterator(name []byte, expr influxql.Expr) (SeriesIDIterator, error) {
release := is.SeriesFile.Retain()
defer release()
// Return all series for the measurement if there are no tag expressions.
if expr == nil {
return is.MeasurementSeriesIDIterator(name)
@ -1616,6 +1643,9 @@ func (is IndexSet) MeasurementSeriesByExprIterator(name []byte, expr influxql.Ex
// MeasurementSeriesKeysByExpr returns a list of series keys matching expr.
func (is IndexSet) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error) {
release := is.SeriesFile.Retain()
defer release()
// Create iterator for all matching series.
itr, err := is.MeasurementSeriesByExprIterator(name, expr)
if err != nil {
@ -1851,6 +1881,9 @@ func (is IndexSet) seriesByBinaryExprVarRefIterator(name, key []byte, value *inf
// MatchTagValueSeriesIDIterator returns a series iterator for tags which match value.
// If matches is false, returns iterators which do not match value.
func (is IndexSet) MatchTagValueSeriesIDIterator(name, key []byte, value *regexp.Regexp, matches bool) (SeriesIDIterator, error) {
release := is.SeriesFile.Retain()
defer release()
matchEmpty := value.MatchString("")
if matches {
@ -2014,6 +2047,9 @@ func (is IndexSet) matchTagValueNotEqualNotEmptySeriesIDIterator(name, key []byt
// N.B tagValuesByKeyAndExpr relies on keys being sorted in ascending
// lexicographic order.
func (is IndexSet) TagValuesByKeyAndExpr(auth query.Authorizer, name []byte, keys []string, expr influxql.Expr, fieldset *MeasurementFieldSet) ([]map[string]struct{}, error) {
release := is.SeriesFile.Retain()
defer release()
database := is.Database()
itr, err := is.seriesByExprIterator(name, expr, fieldset.Fields(string(name)))
@ -2091,6 +2127,9 @@ func (is IndexSet) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []b
sort.Sort(sort.StringSlice(keys))
}
release := is.SeriesFile.Retain()
defer release()
// No expression means that the values shouldn't be filtered; so fetch them
// all.
if expr == nil {
@ -2182,6 +2221,9 @@ func (is IndexSet) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []b
// TagSets returns an ordered list of tag sets for a measurement by dimension
// and filtered by an optional conditional expression.
func (is IndexSet) TagSets(sfile *SeriesFile, name []byte, opt query.IteratorOptions) ([]*query.TagSet, error) {
release := is.SeriesFile.Retain()
defer release()
itr, err := is.MeasurementSeriesByExprIterator(name, opt.Condition)
if err != nil {
return nil, err

View File

@ -8,6 +8,7 @@ import (
"os"
"path/filepath"
"sort"
"sync"
"github.com/cespare/xxhash"
"github.com/influxdata/influxdb/models"
@ -34,6 +35,8 @@ type SeriesFile struct {
path string
partitions []*SeriesPartition
refs sync.RWMutex // RWMutex to track references to the SeriesFile that are in use.
Logger *zap.Logger
}
@ -47,6 +50,10 @@ func NewSeriesFile(path string) *SeriesFile {
// Open memory maps the data file at the file's path.
func (f *SeriesFile) Open() error {
// Wait for all references to be released and prevent new ones from being acquired.
f.refs.Lock()
defer f.refs.Unlock()
// Create path if it doesn't exist.
if err := os.MkdirAll(filepath.Join(f.path), 0777); err != nil {
return err
@ -69,11 +76,16 @@ func (f *SeriesFile) Open() error {
// Close unmaps the data file.
func (f *SeriesFile) Close() (err error) {
// Wait for all references to be released and prevent new ones from being acquired.
f.refs.Lock()
defer f.refs.Unlock()
for _, p := range f.partitions {
if e := p.Close(); e != nil && err == nil {
err = e
}
}
return err
}
@ -88,6 +100,17 @@ func (f *SeriesFile) SeriesPartitionPath(i int) string {
// Partitions returns all partitions.
func (f *SeriesFile) Partitions() []*SeriesPartition { return f.partitions }
// Retain adds a reference count to the file. It returns a release func.
func (f *SeriesFile) Retain() func() {
if f != nil {
f.refs.RLock()
// Return the RUnlock func as the release func to be called when done.
return f.refs.RUnlock
}
return nop
}
// CreateSeriesListIfNotExists creates a list of series in bulk if they don't exist.
// The returned ids list returns values for new series and zero for existing series.
func (f *SeriesFile) CreateSeriesListIfNotExists(names [][]byte, tagsSlice []models.Tags, buf []byte) (ids []uint64, err error) {
@ -426,3 +449,5 @@ type uint64Slice []uint64
func (a uint64Slice) Len() int { return len(a) }
func (a uint64Slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a uint64Slice) Less(i, j int) bool { return a[i] < a[j] }
func nop() {}

View File

@ -1074,6 +1074,9 @@ func (s *Store) MeasurementNames(auth query.Authorizer, database string, cond in
return nil, nil
}
release := sfile.Retain()
defer release()
// Build indexset.
is := IndexSet{Indexes: make([]Index, 0, len(shards)), SeriesFile: sfile}
for _, sh := range shards {