Remove time comparisons from the inner sections of the storage engine
parent
8bd04ebe39
commit
697759613c
|
@ -4248,28 +4248,6 @@ func HasTimeExpr(expr Expr) bool {
|
|||
}
|
||||
}
|
||||
|
||||
// OnlyTimeExpr returns true if the expression only has time constraints.
|
||||
func OnlyTimeExpr(expr Expr) bool {
|
||||
if expr == nil {
|
||||
return false
|
||||
}
|
||||
switch n := expr.(type) {
|
||||
case *BinaryExpr:
|
||||
if n.Op == AND || n.Op == OR {
|
||||
return OnlyTimeExpr(n.LHS) && OnlyTimeExpr(n.RHS)
|
||||
}
|
||||
if ref, ok := n.LHS.(*VarRef); ok && strings.ToLower(ref.Val) == "time" {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
case *ParenExpr:
|
||||
// walk down the tree
|
||||
return OnlyTimeExpr(n.Expr)
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// Visitor can be called by Walk to traverse an AST hierarchy.
|
||||
// The Visit() function is called once per node.
|
||||
type Visitor interface {
|
||||
|
|
|
@ -949,46 +949,6 @@ func TestConditionExpr(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure that we see if a where clause has only time limitations
|
||||
func TestOnlyTimeExpr(t *testing.T) {
|
||||
var tests = []struct {
|
||||
stmt string
|
||||
exp bool
|
||||
}{
|
||||
{
|
||||
stmt: `SELECT value FROM myseries WHERE value > 1`,
|
||||
exp: false,
|
||||
},
|
||||
{
|
||||
stmt: `SELECT value FROM foo WHERE time >= '2000-01-01T00:00:05Z'`,
|
||||
exp: true,
|
||||
},
|
||||
{
|
||||
stmt: `SELECT value FROM foo WHERE time >= '2000-01-01T00:00:05Z' AND time < '2000-01-01T00:00:05Z'`,
|
||||
exp: true,
|
||||
},
|
||||
{
|
||||
stmt: `SELECT value FROM foo WHERE time >= '2000-01-01T00:00:05Z' AND asdf = 'bar'`,
|
||||
exp: false,
|
||||
},
|
||||
{
|
||||
stmt: `SELECT value FROM foo WHERE asdf = 'jkl' AND (time >= '2000-01-01T00:00:05Z' AND time < '2000-01-01T00:00:05Z')`,
|
||||
exp: false,
|
||||
},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
// Parse statement.
|
||||
stmt, err := influxql.NewParser(strings.NewReader(tt.stmt)).ParseStatement()
|
||||
if err != nil {
|
||||
t.Fatalf("invalid statement: %q: %s", tt.stmt, err)
|
||||
}
|
||||
if influxql.OnlyTimeExpr(stmt.(*influxql.SelectStatement).Condition) != tt.exp {
|
||||
t.Fatalf("%d. expected statement to return only time dimension to be %t: %s", i, tt.exp, tt.stmt)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure an AST node can be rewritten.
|
||||
func TestRewrite(t *testing.T) {
|
||||
expr := MustParseExpr(`time > 1 OR foo = 2`)
|
||||
|
|
|
@ -70,7 +70,6 @@ type Engine interface {
|
|||
|
||||
// InfluxQL iterators
|
||||
MeasurementSeriesKeysByExpr(name []byte, condition influxql.Expr) ([][]byte, error)
|
||||
ForEachMeasurementSeriesByExpr(name []byte, expr influxql.Expr, fn func(tags models.Tags) error) error
|
||||
SeriesPointIterator(opt query.IteratorOptions) (query.Iterator, error)
|
||||
|
||||
// Statistics will return statistics relevant to this engine.
|
||||
|
|
|
@ -337,10 +337,6 @@ func (e *Engine) MeasurementFields(measurement []byte) *tsdb.MeasurementFields {
|
|||
return e.fieldset.CreateFieldsIfNotExists(measurement)
|
||||
}
|
||||
|
||||
func (e *Engine) ForEachMeasurementSeriesByExpr(name []byte, condition influxql.Expr, fn func(tags models.Tags) error) error {
|
||||
return e.index.ForEachMeasurementSeriesByExpr(name, condition, fn)
|
||||
}
|
||||
|
||||
func (e *Engine) HasTagKey(name, key []byte) (bool, error) {
|
||||
return e.index.HasTagKey(name, key)
|
||||
}
|
||||
|
|
|
@ -43,7 +43,6 @@ type Index interface {
|
|||
|
||||
// InfluxQL system iterators
|
||||
MeasurementSeriesKeysByExpr(name []byte, condition influxql.Expr) ([][]byte, error)
|
||||
ForEachMeasurementSeriesByExpr(name []byte, expr influxql.Expr, fn func(tags models.Tags) error) error
|
||||
SeriesPointIterator(opt query.IteratorOptions) (query.Iterator, error)
|
||||
|
||||
// Sets a shared fieldset from the engine.
|
||||
|
|
|
@ -614,23 +614,6 @@ func (i *Index) DropSeries(key []byte) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// ForEachMeasurementSeriesByExpr iterates over all series in a measurement filtered by an expression.
|
||||
func (i *Index) ForEachMeasurementSeriesByExpr(name []byte, expr influxql.Expr, fn func(tags models.Tags) error) error {
|
||||
i.mu.RLock()
|
||||
mm := i.measurements[string(name)]
|
||||
i.mu.RUnlock()
|
||||
|
||||
if mm == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := mm.ForEachSeriesByExpr(expr, fn); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// TagSets returns a list of tag sets.
|
||||
func (i *Index) TagSets(shardID uint64, name []byte, opt query.IteratorOptions) ([]*query.TagSet, error) {
|
||||
i.mu.RLock()
|
||||
|
|
|
@ -276,7 +276,7 @@ func (m *Measurement) DropSeries(series *Series) {
|
|||
// filters walks the where clause of a select statement and returns a map with all series ids
|
||||
// matching the where clause and any filter expression that should be applied to each
|
||||
func (m *Measurement) filters(condition influxql.Expr) ([]uint64, map[uint64]influxql.Expr, error) {
|
||||
if condition == nil || influxql.OnlyTimeExpr(condition) {
|
||||
if condition == nil {
|
||||
return m.SeriesIDs(), nil, nil
|
||||
}
|
||||
return m.WalkWhereForSeriesIds(condition)
|
||||
|
@ -557,11 +557,6 @@ func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (SeriesIDs, influxql.Ex
|
|||
value = n.LHS
|
||||
}
|
||||
|
||||
// For time literals, return all series IDs and "true" as the filter.
|
||||
if _, ok := value.(*influxql.TimeLiteral); ok || name.Val == "time" {
|
||||
return m.SeriesIDs(), &influxql.BooleanLiteral{Val: true}, nil
|
||||
}
|
||||
|
||||
// For fields, return all series IDs from this measurement and return
|
||||
// the expression passed in, as the filter.
|
||||
if name.Val != "_name" && ((name.Type == influxql.Unknown && m.HasField(name.Val)) || name.Type == influxql.AnyField || (name.Type != influxql.Tag && name.Type != influxql.Unknown)) {
|
||||
|
|
|
@ -138,27 +138,6 @@ func TestMeasurement_TagsSet_Deadlock(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestMeasurement_ForEachSeriesByExpr_Deadlock(t *testing.T) {
|
||||
m := inmem.NewMeasurement("foo", "cpu")
|
||||
s1 := inmem.NewSeries([]byte("cpu,host=foo"), models.Tags{models.NewTag([]byte("host"), []byte("foo"))})
|
||||
s1.ID = 1
|
||||
m.AddSeries(s1)
|
||||
|
||||
s2 := inmem.NewSeries([]byte("cpu,host=bar"), models.Tags{models.NewTag([]byte("host"), []byte("bar"))})
|
||||
s2.ID = 2
|
||||
m.AddSeries(s2)
|
||||
|
||||
m.DropSeries(s1)
|
||||
|
||||
// This was deadlocking
|
||||
m.ForEachSeriesByExpr(nil, func(tags models.Tags) error {
|
||||
return nil
|
||||
})
|
||||
if got, exp := len(m.SeriesIDs()), 1; got != exp {
|
||||
t.Fatalf("series count mismatch: got %v, exp %v", got, exp)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkMeasurement_SeriesIDForExp_EQRegex(b *testing.B) {
|
||||
m := inmem.NewMeasurement("foo", "cpu")
|
||||
for i := 0; i < 100000; i++ {
|
||||
|
|
|
@ -736,7 +736,7 @@ func (fs *FileSet) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, e
|
|||
// call is equivalent to MeasurementSeriesIterator().
|
||||
func (fs *FileSet) MeasurementSeriesByExprIterator(name []byte, expr influxql.Expr, fieldset *tsdb.MeasurementFieldSet) (SeriesIterator, error) {
|
||||
// Return all series for the measurement if there are no tag expressions.
|
||||
if expr == nil || influxql.OnlyTimeExpr(expr) {
|
||||
if expr == nil {
|
||||
return fs.MeasurementSeriesIterator(name), nil
|
||||
}
|
||||
return fs.seriesByExprIterator(name, expr, fieldset.CreateFieldsIfNotExists(name))
|
||||
|
@ -824,11 +824,6 @@ func (fs *FileSet) seriesByBinaryExprIterator(name []byte, n *influxql.BinaryExp
|
|||
value = n.LHS
|
||||
}
|
||||
|
||||
// For time literals, return all series and "true" as the filter.
|
||||
if _, ok := value.(*influxql.TimeLiteral); ok || key.Val == "time" {
|
||||
return newSeriesExprIterator(fs.MeasurementSeriesIterator(name), &influxql.BooleanLiteral{Val: true}), nil
|
||||
}
|
||||
|
||||
// For fields, return all series from this measurement.
|
||||
if key.Val != "_name" && ((key.Type == influxql.Unknown && mf.HasField(key.Val)) || key.Type == influxql.AnyField || (key.Type != influxql.Tag && key.Type != influxql.Unknown)) {
|
||||
return newSeriesExprIterator(fs.MeasurementSeriesIterator(name), n), nil
|
||||
|
|
|
@ -670,27 +670,6 @@ func (i *Index) MeasurementTagKeyValuesByExpr(name []byte, keys []string, expr i
|
|||
return results, nil
|
||||
}
|
||||
|
||||
// ForEachMeasurementSeriesByExpr iterates over all series in a measurement filtered by an expression.
|
||||
func (i *Index) ForEachMeasurementSeriesByExpr(name []byte, condition influxql.Expr, fn func(tags models.Tags) error) error {
|
||||
fs := i.RetainFileSet()
|
||||
defer fs.Release()
|
||||
|
||||
itr, err := fs.MeasurementSeriesByExprIterator(name, condition, i.fieldset)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if itr == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
for e := itr.Next(); e != nil; e = itr.Next() {
|
||||
if err := fn(e.Tags()); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ForEachMeasurementTagKey iterates over all tag keys in a measurement.
|
||||
func (i *Index) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error {
|
||||
fs := i.RetainFileSet()
|
||||
|
|
|
@ -829,7 +829,7 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi
|
|||
sources = a
|
||||
|
||||
// Determine deletion time range.
|
||||
_, timeRange, err := influxql.ConditionExpr(condition, nil)
|
||||
condition, timeRange, err := influxql.ConditionExpr(condition, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue