diff --git a/query/compile.go b/query/compile.go index 8761632a98..0b2a746c44 100644 --- a/query/compile.go +++ b/query/compile.go @@ -878,5 +878,6 @@ func (c *compiledStatement) Prepare(shardMapper ShardMapper, sopt SelectOptions) opt: opt, ic: shards, columns: columns, + now: c.Options.Now, }, nil } diff --git a/query/iterator.go b/query/iterator.go index 4c6eb8624a..e549e08a5c 100644 --- a/query/iterator.go +++ b/query/iterator.go @@ -797,7 +797,7 @@ func newIteratorOptionsStmt(stmt *influxql.SelectStatement, sopt SelectOptions) return opt, nil } -func newIteratorOptionsSubstatement(stmt *influxql.SelectStatement, opt IteratorOptions) (IteratorOptions, error) { +func newIteratorOptionsSubstatement(ctx context.Context, stmt *influxql.SelectStatement, opt IteratorOptions) (IteratorOptions, error) { subOpt, err := newIteratorOptionsStmt(stmt, SelectOptions{}) if err != nil { return IteratorOptions{}, err @@ -809,6 +809,11 @@ func newIteratorOptionsSubstatement(stmt *influxql.SelectStatement, opt Iterator if subOpt.EndTime > opt.EndTime { subOpt.EndTime = opt.EndTime } + if !subOpt.Interval.IsZero() && subOpt.EndTime == influxql.MaxTime { + if now := ctx.Value("now"); now != nil { + subOpt.EndTime = now.(time.Time).UnixNano() + } + } // Propagate the dimensions to the inner subquery. subOpt.Dimensions = opt.Dimensions for d := range opt.GroupBy { diff --git a/query/select.go b/query/select.go index cf9f23ea81..c071410eca 100644 --- a/query/select.go +++ b/query/select.go @@ -7,6 +7,7 @@ import ( "io" "math" "sort" + "time" "github.com/influxdata/influxdb/pkg/tracing" "github.com/influxdata/influxql" @@ -97,9 +98,14 @@ type preparedStatement struct { io.Closer } columns []string + now time.Time } func (p *preparedStatement) Select(ctx context.Context) ([]Iterator, []string, error) { + // TODO(jsternberg): Remove this hacky method of propagating now. + // Each level of the query should use a time range discovered during + // compilation, but that requires too large of a refactor at the moment. + ctx = context.WithValue(ctx, "now", p.now) itrs, err := buildIterators(ctx, p.stmt, p.ic, p.opt) if err != nil { return nil, nil, err diff --git a/query/subquery.go b/query/subquery.go index b19e963530..4a6978b26a 100644 --- a/query/subquery.go +++ b/query/subquery.go @@ -24,7 +24,7 @@ func (b *subqueryBuilder) buildAuxIterator(ctx context.Context, opt IteratorOpti // Map the desired auxiliary fields from the substatement. indexes := b.mapAuxFields(auxFields) - subOpt, err := newIteratorOptionsSubstatement(b.stmt, opt) + subOpt, err := newIteratorOptionsSubstatement(ctx, b.stmt, opt) if err != nil { return nil, err } @@ -114,7 +114,7 @@ func (b *subqueryBuilder) buildVarRefIterator(ctx context.Context, expr *influxq // Map the auxiliary fields to their index in the subquery. indexes := b.mapAuxFields(auxFields) - subOpt, err := newIteratorOptionsSubstatement(b.stmt, opt) + subOpt, err := newIteratorOptionsSubstatement(ctx, b.stmt, opt) if err != nil { return nil, err }