add node id to execution options

This commit changes the `ExecutionOptions` and `SelectOptions` to
allow a `NodeID` for specifying an exact node to query against.
pull/6820/head
Ben Johnson 2016-06-10 09:14:21 -06:00
parent c8e90fa4ee
commit 7d4bea7153
No known key found for this signature in database
GPG Key ID: 780E98C6BEDA0915
7 changed files with 20 additions and 7 deletions

View File

@ -23,6 +23,7 @@
- [#6713](https://github.com/influxdata/influxdb/pull/6713): Reduce allocations during query parsing. - [#6713](https://github.com/influxdata/influxdb/pull/6713): Reduce allocations during query parsing.
- [#3733](https://github.com/influxdata/influxdb/issues/3733): Modify the default retention policy name and make it configurable. - [#3733](https://github.com/influxdata/influxdb/issues/3733): Modify the default retention policy name and make it configurable.
- [#5655](https://github.com/influxdata/influxdb/issues/5655): Support specifying a retention policy for the graphite service. - [#5655](https://github.com/influxdata/influxdb/issues/5655): Support specifying a retention policy for the graphite service.
- [#6820](https://github.com/influxdata/influxdb/issues/6820): Add NodeID to execution options
### Bugfixes ### Bugfixes

View File

@ -415,7 +415,10 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen
// It is important to "stamp" this time so that everywhere we evaluate `now()` in the statement is EXACTLY the same `now` // It is important to "stamp" this time so that everywhere we evaluate `now()` in the statement is EXACTLY the same `now`
now := time.Now().UTC() now := time.Now().UTC()
opt := influxql.SelectOptions{InterruptCh: ctx.InterruptCh} opt := influxql.SelectOptions{
InterruptCh: ctx.InterruptCh,
NodeID: ctx.ExecutionOptions.NodeID,
}
// Replace instances of "now()" with the current time, and check the resultant times. // Replace instances of "now()" with the current time, and check the resultant times.
nowValuer := influxql.NowValuer{Now: now} nowValuer := influxql.NowValuer{Now: now}
@ -604,7 +607,7 @@ func (e *StatementExecutor) iteratorCreator(stmt *influxql.SelectStatement, opt
if err != nil { if err != nil {
return nil, err return nil, err
} }
return e.TSDBStore.IteratorCreator(shards) return e.TSDBStore.IteratorCreator(shards, opt)
} }
func (e *StatementExecutor) executeShowTagValues(stmt *influxql.SelectStatement, ctx *influxql.ExecutionContext, store LocalTSDBStore) error { func (e *StatementExecutor) executeShowTagValues(stmt *influxql.SelectStatement, ctx *influxql.ExecutionContext, store LocalTSDBStore) error {
@ -1142,19 +1145,19 @@ type TSDBStore interface {
DeleteRetentionPolicy(database, name string) error DeleteRetentionPolicy(database, name string) error
DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
DeleteShard(id uint64) error DeleteShard(id uint64) error
IteratorCreator(shards []meta.ShardInfo) (influxql.IteratorCreator, error) IteratorCreator(shards []meta.ShardInfo, opt *influxql.SelectOptions) (influxql.IteratorCreator, error)
} }
type LocalTSDBStore struct { type LocalTSDBStore struct {
*tsdb.Store *tsdb.Store
} }
func (s LocalTSDBStore) IteratorCreator(shards []meta.ShardInfo) (influxql.IteratorCreator, error) { func (s LocalTSDBStore) IteratorCreator(shards []meta.ShardInfo, opt *influxql.SelectOptions) (influxql.IteratorCreator, error) {
shardIDs := make([]uint64, len(shards)) shardIDs := make([]uint64, len(shards))
for i, sh := range shards { for i, sh := range shards {
shardIDs[i] = sh.ID shardIDs[i] = sh.ID
} }
return s.Store.IteratorCreator(shardIDs) return s.Store.IteratorCreator(shardIDs, opt)
} }
// ShardIteratorCreator is an interface for creating an IteratorCreator to access a specific shard. // ShardIteratorCreator is an interface for creating an IteratorCreator to access a specific shard.

View File

@ -247,7 +247,7 @@ func (s *TSDBStore) DeleteSeries(database string, sources []influxql.Source, con
return s.DeleteSeriesFn(database, sources, condition) return s.DeleteSeriesFn(database, sources, condition)
} }
func (s *TSDBStore) IteratorCreator(shards []meta.ShardInfo) (influxql.IteratorCreator, error) { func (s *TSDBStore) IteratorCreator(shards []meta.ShardInfo, opt *influxql.SelectOptions) (influxql.IteratorCreator, error) {
// Generate iterators for each node. // Generate iterators for each node.
ics := make([]influxql.IteratorCreator, 0) ics := make([]influxql.IteratorCreator, 0)
if err := func() error { if err := func() error {

View File

@ -70,6 +70,9 @@ type ExecutionOptions struct {
// If this query is being executed in a read-only context. // If this query is being executed in a read-only context.
ReadOnly bool ReadOnly bool
// Node to execute on.
NodeID uint64
} }
// ExecutionContext contains state that the query is currently executing with. // ExecutionContext contains state that the query is currently executing with.

View File

@ -15,6 +15,10 @@ type SelectOptions struct {
// The upper bound for a select call. // The upper bound for a select call.
MaxTime time.Time MaxTime time.Time
// Node to exclusively read from.
// If zero, all nodes are used.
NodeID uint64
// An optional channel that, if closed, signals that the select should be // An optional channel that, if closed, signals that the select should be
// interrupted. // interrupted.
InterruptCh <-chan struct{} InterruptCh <-chan struct{}

View File

@ -267,6 +267,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
}(time.Now()) }(time.Now())
pretty := r.FormValue("pretty") == "true" pretty := r.FormValue("pretty") == "true"
nodeID, _ := strconv.ParseUint(r.FormValue("node_id"), 10, 64)
qp := strings.TrimSpace(r.FormValue("q")) qp := strings.TrimSpace(r.FormValue("q"))
if qp == "" { if qp == "" {
@ -370,6 +371,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
Database: db, Database: db,
ChunkSize: chunkSize, ChunkSize: chunkSize,
ReadOnly: r.Method == "GET", ReadOnly: r.Method == "GET",
NodeID: nodeID,
}, closing) }, closing)
// if we're not chunking, this will be the in memory buffer for all results before sending to client // if we're not chunking, this will be the in memory buffer for all results before sending to client

View File

@ -709,7 +709,7 @@ func (s *Store) IteratorCreators() influxql.IteratorCreators {
return a return a
} }
func (s *Store) IteratorCreator(shards []uint64) (influxql.IteratorCreator, error) { func (s *Store) IteratorCreator(shards []uint64, opt *influxql.SelectOptions) (influxql.IteratorCreator, error) {
// Generate iterators for each node. // Generate iterators for each node.
ics := make([]influxql.IteratorCreator, 0) ics := make([]influxql.IteratorCreator, 0)
if err := func() error { if err := func() error {