Filter series IDs at the last possible moment

pull/10013/head
Edd Robinson 2018-06-27 17:48:48 +01:00
parent 609b980671
commit 6059db3d3a
1 changed files with 37 additions and 6 deletions

View File

@ -1387,6 +1387,7 @@ func (is IndexSet) measurementNamesByTagFilter(auth query.Authorizer, op influxq
continue
}
defer sitr.Close()
sitr = FilterUndeletedSeriesIDIterator(is.SeriesFile, sitr)
// Locate a series with this matching tag value that's authorized.
for {
@ -1455,6 +1456,7 @@ func (is IndexSet) measurementAuthorizedSeries(auth query.Authorizer, name []byt
return false
}
defer sitr.Close()
sitr = FilterUndeletedSeriesIDIterator(is.SeriesFile, sitr)
for {
series, err := sitr.Next()
@ -1585,6 +1587,7 @@ func (is IndexSet) TagKeyHasAuthorizedSeries(auth query.Authorizer, name, tagKey
return false, nil
}
defer itr.Close()
itr = FilterUndeletedSeriesIDIterator(is.SeriesFile, itr)
for {
e, err := itr.Next()
@ -1612,7 +1615,12 @@ func (is IndexSet) TagKeyHasAuthorizedSeries(auth query.Authorizer, name, tagKey
func (is IndexSet) MeasurementSeriesIDIterator(name []byte) (SeriesIDIterator, error) {
release := is.SeriesFile.Retain()
defer release()
return is.measurementSeriesIDIterator(name)
itr, err := is.measurementSeriesIDIterator(name)
if err != nil {
return nil, err
}
return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil
}
// measurementSeriesIDIterator does not provide any locking on the Series file.
@ -1682,7 +1690,12 @@ func (is IndexSet) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (ma
func (is IndexSet) TagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, error) {
release := is.SeriesFile.Retain()
defer release()
return is.tagKeySeriesIDIterator(name, key)
itr, err := is.tagKeySeriesIDIterator(name, key)
if err != nil {
return nil, err
}
return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil
}
// tagKeySeriesIDIterator returns a series iterator for all values across a
@ -1707,7 +1720,12 @@ func (is IndexSet) tagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, e
func (is IndexSet) TagValueSeriesIDIterator(name, key, value []byte) (SeriesIDIterator, error) {
release := is.SeriesFile.Retain()
defer release()
return is.tagValueSeriesIDIterator(name, key, value)
itr, err := is.tagValueSeriesIDIterator(name, key, value)
if err != nil {
return nil, err
}
return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil
}
// tagValueSeriesIDIterator does not provide any locking on the Series File.
@ -1744,14 +1762,18 @@ func (is IndexSet) MeasurementSeriesByExprIterator(name []byte, expr influxql.Ex
func (is IndexSet) measurementSeriesByExprIterator(name []byte, expr influxql.Expr) (SeriesIDIterator, error) {
// Return all series for the measurement if there are no tag expressions.
if expr == nil {
return is.measurementSeriesIDIterator(name)
itr, err := is.measurementSeriesIDIterator(name)
if err != nil {
return nil, err
}
return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil
}
itr, err := is.seriesByExprIterator(name, expr)
if err != nil {
return nil, err
}
return itr, nil
return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil
}
// MeasurementSeriesKeysByExpr returns a list of series keys matching expr.
@ -1768,6 +1790,8 @@ func (is IndexSet) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr)
}
defer itr.Close()
// measurementSeriesByExprIterator filters deleted series; no need to do so here.
// Iterate over all series and generate keys.
var keys [][]byte
for {
@ -2013,7 +2037,11 @@ func (is IndexSet) seriesByBinaryExprVarRefIterator(name, key []byte, value *inf
func (is IndexSet) MatchTagValueSeriesIDIterator(name, key []byte, value *regexp.Regexp, matches bool) (SeriesIDIterator, error) {
release := is.SeriesFile.Retain()
defer release()
return is.matchTagValueSeriesIDIterator(name, key, value, matches)
itr, err := is.matchTagValueSeriesIDIterator(name, key, value, matches)
if err != nil {
return nil, err
}
return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil
}
// matchTagValueSeriesIDIterator returns a series iterator for tags which match
@ -2337,6 +2365,7 @@ func (is IndexSet) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []b
continue
}
defer sitr.Close()
sitr = FilterUndeletedSeriesIDIterator(is.SeriesFile, sitr)
for {
se, err := sitr.Next()
@ -2395,6 +2424,8 @@ func (is IndexSet) TagSets(sfile *SeriesFile, name []byte, opt query.IteratorOpt
return nil, nil
}
defer itr.Close()
// measurementSeriesByExprIterator filters deleted series IDs; no need to
// do so here.
var dims []string
if len(opt.Dimensions) > 0 {