Ensure deleted series are not returned via iterators
parent 7080ffcaaa
commit 9e3b17fd09

@@ -295,6 +295,12 @@ func init() {
 			exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-01T00:00:00Z","serverB","uswest",23.2],["2000-01-03T00:00:00Z","serverA","uswest",200]]}]}]}`,
 			params: url.Values{"db": []string{"db0"}},
 		},
+		&Query{
+			name: "Make sure other points are deleted",
+			command: `SELECT COUNT(val) FROM cpu WHERE "host" = 'serverA'`,
+			exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","count"],"values":[["1970-01-01T00:00:00Z",1]]}]}]}`,
+			params: url.Values{"db": []string{"db0"}},
+		},
 		&Query{
 			name: "Make sure data wasn't deleted from other database.",
 			command: `SELECT * FROM cpu`,

@@ -1164,6 +1164,7 @@ func (e *Engine) DeleteSeriesRange(itr tsdb.SeriesIterator, min, max int64) erro
 		} else if elem == nil {
 			break
 		}
+
 		if elem.Expr() != nil {
 			if v, ok := elem.Expr().(*influxql.BooleanLiteral); !ok || !v.Val {
 				return errors.New("fields not supported in WHERE clause during deletion")

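For readers skimming the hunk above: DeleteSeriesRange walks the series iterator and only accepts elements whose residual expression is absent or a literal true; anything else means the WHERE clause referenced a field, which deletion cannot evaluate. A standalone sketch of that guard (the helper name is hypothetical, not part of this change):

// rejectFieldConditions is a hypothetical helper mirroring the guard above.
// A nil expression means the whole series matched; a literal TRUE means the
// condition was fully resolved by tags alone. Anything else implies a field
// reference, which cannot be evaluated during deletion.
func rejectFieldConditions(expr influxql.Expr) error {
	if expr == nil {
		return nil
	}
	if v, ok := expr.(*influxql.BooleanLiteral); !ok || !v.Val {
		return errors.New("fields not supported in WHERE clause during deletion")
	}
	return nil
}
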
@@ -1307,7 +1308,7 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
 	// exists now. To reconcile the index, we walk the series keys that still exists
 	// on disk and cross out any keys that match the passed in series. Any series
 	// left in the slice at the end do not exist and can be deleted from the index.
-	// Note: this is inherently racy if writes are occuring to the same measurement/series are
+	// Note: this is inherently racy if writes are occurring to the same measurement/series are
 	// being removed. A write could occur and exist in the cache at this point, but we
 	// would delete it from the index.
 	minKey := seriesKeys[0]

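The comment above describes a reconciliation pass: keys still present on disk are crossed out of the candidate slice, and whatever survives has no data left and can be removed from the index. A rough sketch of that idea, with the source of on-disk keys left as an assumption:

// Hypothetical sketch of the cross-out walk described in the comment above.
// keysOnDisk is assumed to come from scanning the TSM files and the cache.
remaining := make([][]byte, len(seriesKeys))
copy(remaining, seriesKeys)

for _, onDisk := range keysOnDisk {
	for i, k := range remaining {
		if k != nil && bytes.Equal(k, onDisk) {
			remaining[i] = nil // still has data on disk; keep it in the index
		}
	}
}

for _, k := range remaining {
	if k == nil {
		continue
	}
	// No data left anywhere for this key, so it is safe to drop from the index.
}
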
@@ -885,7 +885,8 @@ func TestEngine_DeleteSeriesRange(t *testing.T) {
 	}

 	// Check that the series still exists in the index
-	iter, err := e.index.MeasurementSeriesIDIterator([]byte("cpu"))
+	indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
+	iter, err := indexSet.MeasurementSeriesIDIterator([]byte("cpu"))
 	if err != nil {
 		t.Fatalf("iterator error: %v", err)
 	}

@@ -1504,7 +1504,7 @@ func (is IndexSet) MeasurementSeriesIDIterator(name []byte) (SeriesIDIterator, e
 			a = append(a, itr)
 		}
 	}
-	return MergeSeriesIDIterators(a...), nil
+	return FilterUndeletedSeriesIDIterator(is.SeriesFile, MergeSeriesIDIterators(a...)), nil
 }

 // ForEachMeasurementTagKey iterates over all tag keys in a measurement and applies

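This and the next two hunks wrap each merged iterator in FilterUndeletedSeriesIDIterator so tombstoned series never reach callers. The filter itself is not shown in this diff; a minimal sketch of what such a wrapper could look like, assuming the series file exposes an IsDeleted check, is:

// filterUndeletedIterator is a sketch, not the actual implementation: it pulls
// IDs from the wrapped iterator and skips any the series file reports deleted.
type filterUndeletedIterator struct {
	sfile *SeriesFile
	itr   SeriesIDIterator
}

func (f *filterUndeletedIterator) Close() error { return f.itr.Close() }

func (f *filterUndeletedIterator) Next() (SeriesIDElem, error) {
	for {
		e, err := f.itr.Next()
		if err != nil || e.SeriesID == 0 {
			// Error, or the underlying iterator is exhausted.
			return e, err
		}
		if f.sfile.IsDeleted(e.SeriesID) {
			continue // tombstoned series; hide it from the caller
		}
		return e, nil
	}
}
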
@@ -1559,7 +1559,7 @@ func (is IndexSet) TagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, e
 			a = append(a, itr)
 		}
 	}
-	return MergeSeriesIDIterators(a...), nil
+	return FilterUndeletedSeriesIDIterator(is.SeriesFile, MergeSeriesIDIterators(a...)), nil
 }

 // TagValueSeriesIDIterator returns a series iterator for a single tag value.

@@ -1574,7 +1574,7 @@ func (is IndexSet) TagValueSeriesIDIterator(name, key, value []byte) (SeriesIDIt
 			a = append(a, itr)
 		}
 	}
-	return MergeSeriesIDIterators(a...), nil
+	return FilterUndeletedSeriesIDIterator(is.SeriesFile, MergeSeriesIDIterators(a...)), nil
 }

 // MeasurementSeriesByExprIterator returns a series iterator for a measurement

@@ -1586,7 +1586,12 @@ func (is IndexSet) MeasurementSeriesByExprIterator(name []byte, expr influxql.Ex
 		return is.MeasurementSeriesIDIterator(name)
 	}
 	fieldset := is.FieldSet()
-	return is.seriesByExprIterator(name, expr, fieldset.CreateFieldsIfNotExists(name))
+
+	itr, err := is.seriesByExprIterator(name, expr, fieldset.CreateFieldsIfNotExists(name))
+	if err != nil {
+		return nil, err
+	}
+	return FilterUndeletedSeriesIDIterator(is.SeriesFile, itr), nil
 }

 // MeasurementSeriesKeysByExpr returns a list of series keys matching expr.

@@ -1997,6 +2002,7 @@ func (is IndexSet) TagValuesByKeyAndExpr(auth query.Authorizer, name []byte, key
 	} else if itr == nil {
 		return nil, nil
 	}
+	itr = FilterUndeletedSeriesIDIterator(is.SeriesFile, itr)
 	defer itr.Close()

 	keyIdxs := make(map[string]int, len(keys))

@@ -457,6 +457,9 @@ func (i *Index) TagsForSeries(key string) (models.Tags, error) {

 // MeasurementNamesByExpr takes an expression containing only tags and returns a
 // list of matching measurement names.
+//
+// TODO(edd): Remove authorisation from these methods. There shouldn't need to
+// be any auth passed down into the index.
 func (i *Index) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) {
 	i.mu.RLock()
 	defer i.mu.RUnlock()

@@ -603,7 +606,14 @@ func (i *Index) measurementNamesByTagFilters(auth query.Authorizer, filter *TagF
 			// Is there a series with this matching tag value that is
 			// authorized to be read?
 			for _, sid := range seriesIDs {
-				if s := m.SeriesByID(sid); s != nil && auth.AuthorizeSeriesRead(i.database, m.name, s.Tags()) {
+				s := m.SeriesByID(sid)
+
+				// If the series is deleted then it can't be used to authorise against.
+				if s != nil && s.Deleted() {
+					continue
+				}
+
+				if s != nil && auth.AuthorizeSeriesRead(i.database, m.name, s.Tags()) {
 					// The Range call can return early as a matching
 					// tag value with an authorized series has been found.
 					authorized = true

@@ -705,7 +715,6 @@ func (i *Index) DropSeries(key []byte, ts int64) error {

 	// Remove the measurement's reference.
 	series.Measurement().DropSeries(series)
-
 	// Mark the series as deleted.
 	series.Delete(ts)

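The inmem DropSeries above marks the series as deleted via series.Delete(ts), and the auth paths earlier in this diff consult s.Deleted(). The Series type itself isn't shown here; a sketch of the shape those calls imply (field names are assumptions):

// Sketch only: the tombstone flag implied by the Delete/Deleted calls above.
type Series struct {
	mu           sync.RWMutex
	deleted      bool
	lastModified int64
	// ... key, tags, measurement reference, etc.
}

// Delete tombstones the series; readers are expected to skip it from now on.
func (s *Series) Delete(ts int64) {
	s.mu.Lock()
	s.deleted = true
	s.lastModified = ts
	s.mu.Unlock()
}

// Deleted reports whether the series has been tombstoned.
func (s *Series) Deleted() bool {
	s.mu.RLock()
	v := s.deleted
	s.mu.RUnlock()
	return v
}
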
@@ -55,12 +55,8 @@ func NewMeasurement(database, name string) *Measurement {

 // Authorized determines if this Measurement is authorized to be read, according
 // to the provided Authorizer. A measurement is authorized to be read if at
-// least one series from the measurement is authorized to be read.
+// least one undeleted series from the measurement is authorized to be read.
 func (m *Measurement) Authorized(auth query.Authorizer) bool {
-	if auth == nil {
-		return true
-	}
-
 	// Note(edd): the cost of this check scales linearly with the number of series
 	// belonging to a measurement, which means it may become expensive when there
 	// are large numbers of series on a measurement.

@@ -68,7 +64,11 @@ func (m *Measurement) Authorized(auth query.Authorizer) bool {
 	// In the future we might want to push the set of series down into the
 	// authorizer, but that will require an API change.
 	for _, s := range m.SeriesByIDMap() {
-		if auth.AuthorizeSeriesRead(m.database, m.name, s.tags) {
+		if s != nil && s.Deleted() {
+			continue
+		}
+
+		if auth == nil || auth.AuthorizeSeriesRead(m.database, m.name, s.tags) {
 			return true
 		}
 	}

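Authorized now treats a nil authorizer as open access while still skipping tombstoned series. The shard and store tests later in this diff exercise the same path with an authorizer that hides series carrying a secret tag; a hedged sketch of such an authorizer, embedding query.Authorizer so only the series check needs overriding (the embedded fallback is an assumption):

// secretHidingAuthorizer is a sketch of the kind of authorizer used in the
// tests: every series is readable except those tagged with "secret".
type secretHidingAuthorizer struct {
	query.Authorizer // assumed fallback for the remaining interface methods
}

func (a secretHidingAuthorizer) AuthorizeSeriesRead(database string, measurement []byte, tags models.Tags) bool {
	// Hide any series that carries a "secret" tag, regardless of its value.
	return len(tags.Get([]byte("secret"))) == 0
}
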
@@ -540,7 +540,6 @@ func (i *Partition) DropSeries(key []byte, ts int64) error {

 	mname := []byte(name)
 	seriesID := i.sfile.Offset(mname, tags, nil)
-
 	if err := i.sfile.DeleteSeriesID(seriesID); err != nil {
 		return err
 	}

@@ -550,7 +549,7 @@ func (i *Partition) DropSeries(key []byte, ts int64) error {
 		return err
 	}

-	// Swap log file, if necesssary.
+	// Swap log file, if necessary.
 	if err := i.CheckLogFile(); err != nil {
 		return err
 	}

@@ -720,11 +719,6 @@ func (i *Partition) SetFieldName(measurement []byte, name string) {}
 func (i *Partition) RemoveShard(shardID uint64) {}
 func (i *Partition) AssignShard(k string, shardID uint64) {}

-func (i *Partition) UnassignShard(k string, shardID uint64, ts int64) error {
-	// This can be called directly once inmem is gone.
-	return i.DropSeries([]byte(k), ts)
-}
-
 // Compact requests a compaction of log files.
 func (i *Partition) Compact() {
 	i.mu.Lock()

@@ -890,6 +890,53 @@ cpu,secret=foo value=100 0
 		if gotCount != expCount {
 			return fmt.Errorf("got %d series, expected %d", gotCount, expCount)
 		}
+
+		// Delete series cpu,host=serverA,region=uswest
+		if err := sh.Index().DropSeries([]byte("cpu,host=serverA,region=uswest"), time.Now().UnixNano()); err != nil {
+			return err
+		}
+
+		if itr, err = sh.CreateIterator(context.Background(), v.m, query.IteratorOptions{
+			Aux: v.aux,
+			Ascending: true,
+			StartTime: influxql.MinTime,
+			EndTime: influxql.MaxTime,
+			Authorizer: seriesAuthorizer,
+		}); err != nil {
+			return err
+		}
+
+		if itr == nil {
+			return fmt.Errorf("iterator is nil")
+		}
+		defer itr.Close()
+
+		fitr = itr.(query.FloatIterator)
+		defer fitr.Close()
+		expCount = 1
+		gotCount = 0
+		for {
+			f, err := fitr.Next()
+			if err != nil {
+				return err
+			}
+
+			if f == nil {
+				break
+			}
+
+			if got := f.Aux[0].(string); strings.Contains(got, "secret") {
+				return fmt.Errorf("got a series %q that should be filtered", got)
+			} else if got := f.Aux[0].(string); strings.Contains(got, "serverA") {
+				return fmt.Errorf("got a series %q that should be filtered", got)
+			}
+			gotCount++
+		}
+
+		if gotCount != expCount {
+			return fmt.Errorf("got %d series, expected %d", gotCount, expCount)
+		}
+
 		return nil
 	}

@@ -1845,3 +1892,30 @@ func MustTempDir() (string, func()) {
 	}
 	return dir, func() { os.RemoveAll(dir) }
 }
+
+type seriesIterator struct {
+	keys [][]byte
+}
+
+type series struct {
+	name []byte
+	tags models.Tags
+	deleted bool
+}
+
+func (s series) Name() []byte { return s.name }
+func (s series) Tags() models.Tags { return s.tags }
+func (s series) Deleted() bool { return s.deleted }
+func (s series) Expr() influxql.Expr { return nil }
+
+func (itr *seriesIterator) Close() error { return nil }
+
+func (itr *seriesIterator) Next() (tsdb.SeriesElem, error) {
+	if len(itr.keys) == 0 {
+		return nil, nil
+	}
+	name, tags := models.ParseKeyBytes(itr.keys[0])
+	s := series{name: name, tags: tags}
+	itr.keys = itr.keys[1:]
+	return s, nil
+}

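The seriesIterator and series helpers added above give the tests a hand-rolled tsdb.SeriesIterator. A hypothetical usage sketch, assuming a shard fixture sh (Shard.DeleteSeriesRange taking a tsdb.SeriesIterator is visible in the Store.DeleteSeries hunk that follows), with math.MinInt64/MaxInt64 standing in for an unbounded time range:

// Delete everything for one series by feeding the shard a single-key iterator.
itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A")}}
if err := sh.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64); err != nil {
	t.Fatalf("delete series range: %v", err)
}
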
@@ -986,7 +986,6 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi
 			continue
 		}
 		defer itr.Close()
-
 		if err := sh.DeleteSeriesRange(NewSeriesIteratorAdapter(sfile, itr), min, max); err != nil {
 			return err
 		}

@@ -1010,6 +1010,36 @@ func TestStore_Measurements_Auth(t *testing.T) {
 		if gotNames != expNames {
 			return fmt.Errorf("got %d measurements, but expected %d", gotNames, expNames)
 		}
+
+		// Now delete all of the cpu series.
+		cond, err := influxql.ParseExpr("host = 'serverA' OR region = 'west'")
+		if err != nil {
+			return err
+		}
+
+		if err := s.DeleteSeries("db0", nil, cond); err != nil {
+			return err
+		}
+
+		if names, err = s.MeasurementNames(authorizer, "db0", nil); err != nil {
+			return err
+		}
+
+		// names should not contain any measurements where none of the associated
+		// series are authorised for reads.
+		expNames = 1
+		gotNames = 0
+		for _, name := range names {
+			if string(name) == "mem" || string(name) == "cpu" {
+				return fmt.Errorf("after delete got measurement %q but it should be filtered.", name)
+			}
+			gotNames++
+		}
+
+		if gotNames != expNames {
+			return fmt.Errorf("after delete got %d measurements, but expected %d", gotNames, expNames)
+		}
+
 		return nil
 	}

@@ -1020,6 +1050,7 @@ func TestStore_Measurements_Auth(t *testing.T) {
 			}
 		})
 	}
+
 }

 func TestStore_TagKeys_Auth(t *testing.T) {

@@ -1072,6 +1103,41 @@ func TestStore_TagKeys_Auth(t *testing.T) {
 		if gotKeys != expKeys {
 			return fmt.Errorf("got %d keys, but expected %d", gotKeys, expKeys)
 		}
+
+		// Delete the series with region = west
+		cond, err := influxql.ParseExpr("region = 'west'")
+		if err != nil {
+			return err
+		}
+		if err := s.DeleteSeries("db0", nil, cond); err != nil {
+			return err
+		}
+
+		if keys, err = s.TagKeys(authorizer, []uint64{0}, nil); err != nil {
+			return err
+		}
+
+		// keys should not contain any tag keys associated with a series containing
+		// a secret tag or the deleted series
+		expKeys = 2
+		gotKeys = 0
+		for _, tk := range keys {
+			if got, exp := tk.Measurement, "cpu"; got != exp {
+				return fmt.Errorf("got measurement %q, expected %q", got, exp)
+			}
+
+			for _, key := range tk.Keys {
+				if key == "secret" || key == "machine" || key == "region" {
+					return fmt.Errorf("got tag key %q but it should be filtered.", key)
+				}
+				gotKeys++
+			}
+		}
+
+		if gotKeys != expKeys {
+			return fmt.Errorf("got %d keys, but expected %d", gotKeys, expKeys)
+		}
+
 		return nil
 	}

@@ -1082,6 +1148,7 @@ func TestStore_TagKeys_Auth(t *testing.T) {
 			}
 		})
 	}
+
 }

 func TestStore_TagValues_Auth(t *testing.T) {

@@ -1136,6 +1203,48 @@ func TestStore_TagValues_Auth(t *testing.T) {
 			}
 		}

+		if gotValues != expValues {
+			return fmt.Errorf("got %d tags, but expected %d", gotValues, expValues)
+		}
+
+		// Delete the series with values serverA
+		cond, err := influxql.ParseExpr("host = 'serverA'")
+		if err != nil {
+			return err
+		}
+		if err := s.DeleteSeries("db0", nil, cond); err != nil {
+			return err
+		}
+
+		values, err = s.TagValues(authorizer, []uint64{0}, &influxql.BinaryExpr{
+			Op: influxql.EQ,
+			LHS: &influxql.VarRef{Val: "_tagKey"},
+			RHS: &influxql.StringLiteral{Val: "host"},
+		})
+
+		if err != nil {
+			return err
+		}
+
+		// values should not contain any tag values associated with a series containing
+		// a secret tag.
+		expValues = 1
+		gotValues = 0
+		for _, tv := range values {
+			if got, exp := tv.Measurement, "cpu"; got != exp {
+				return fmt.Errorf("got measurement %q, expected %q", got, exp)
+			}
+
+			for _, v := range tv.Values {
+				if got, exp := v.Value, "serverD"; got == exp {
+					return fmt.Errorf("got tag value %q but it should be filtered.", got)
+				} else if got, exp := v.Value, "serverA"; got == exp {
+					return fmt.Errorf("got tag value %q but it should be filtered.", got)
+				}
+				gotValues++
+			}
+		}
+
 		if gotValues != expValues {
 			return fmt.Errorf("got %d tags, but expected %d", gotValues, expValues)
 		}
