diff --git a/coordinator/shard_mapper.go b/coordinator/shard_mapper.go index 6c0b402414..ff28ae326a 100644 --- a/coordinator/shard_mapper.go +++ b/coordinator/shard_mapper.go @@ -181,8 +181,9 @@ func (a *LocalShardMapping) CreateIterator(m *influxql.Measurement, opt query.It return sg.CreateIterator(m.Name, opt) } -// Close does nothing for a LocalShardMapping. +// Close clears out the list of mapped shards. func (a *LocalShardMapping) Close() error { + a.ShardMap = nil return nil } diff --git a/query/compile.go b/query/compile.go index b4d83e401d..c70387e5ec 100644 --- a/query/compile.go +++ b/query/compile.go @@ -65,23 +65,25 @@ func (c *compiledStatement) Prepare(shardMapper ShardMapper, sopt SelectOptions) if err != nil { return nil, err } - defer shards.Close() // Rewrite wildcards, if any exist. stmt, err := c.stmt.RewriteFields(shards) if err != nil { + shards.Close() return nil, err } // Determine base options for iterators. opt, err := newIteratorOptionsStmt(stmt, sopt) if err != nil { + shards.Close() return nil, err } if sopt.MaxBucketsN > 0 && !stmt.IsRawQuery { interval, err := stmt.GroupByInterval() if err != nil { + shards.Close() return nil, err } @@ -93,6 +95,7 @@ func (c *compiledStatement) Prepare(shardMapper ShardMapper, sopt SelectOptions) // Determine the number of buckets by finding the time span and dividing by the interval. buckets := (last - first + int64(interval)) / int64(interval) if int(buckets) > sopt.MaxBucketsN { + shards.Close() return nil, fmt.Errorf("max-select-buckets limit exceeded: (%d/%d)", buckets, sopt.MaxBucketsN) } } diff --git a/query/select.go b/query/select.go index 89ac933757..d35f6962f8 100644 --- a/query/select.go +++ b/query/select.go @@ -38,6 +38,13 @@ type ShardMapper interface { // ShardGroup represents a shard or a collection of shards that can be accessed // for creating iterators. +// When creating iterators, the resource used for reading the iterators should be +// separate from the resource used to map the shards. When the ShardGroup is closed, +// it should not close any resources associated with the created Iterator. Those +// resources belong to the Iterator and will be closed when the Iterator itself is +// closed. +// The query engine operates under this assumption and will close the shard group +// after creating the iterators, but before the iterators are actually read. type ShardGroup interface { IteratorCreator influxql.FieldMapper @@ -48,6 +55,11 @@ type ShardGroup interface { type PreparedStatement interface { // Select creates the Iterators that will be used to read the query. Select() ([]Iterator, []string, error) + + // Close closes the resources associated with this prepared statement. + // This must be called as the mapped shards may hold open resources such + // as network connections. + Close() error } // Prepare will compile the statement with the default compile options and @@ -67,13 +79,18 @@ func Select(stmt *influxql.SelectStatement, shardMapper ShardMapper, opt SelectO if err != nil { return nil, nil, err } + // Must be deferred so it runs after Select. + defer s.Close() return s.Select() } type preparedStatement struct { - stmt *influxql.SelectStatement - opt IteratorOptions - ic IteratorCreator + stmt *influxql.SelectStatement + opt IteratorOptions + ic interface { + IteratorCreator + io.Closer + } columns []string } @@ -85,6 +102,10 @@ func (p *preparedStatement) Select() ([]Iterator, []string, error) { return itrs, p.columns, nil } +func (p *preparedStatement) Close() error { + return p.ic.Close() +} + func buildIterators(stmt *influxql.SelectStatement, ic IteratorCreator, opt IteratorOptions) ([]Iterator, error) { // Retrieve refs for each call and var ref. info := newSelectInfo(stmt)