Close the query shard group after the iterators are created

Now, the prepared statement keeps the open resource and closing the open
resource created from `Prepare` is the responsibility of the prepared
statement.

This also nils out the local shard mapping after it is closed to prevent
it from being used after it is closed.
pull/8750/head
Jonathan A. Sternberg 2017-08-28 09:34:36 -05:00
parent b61b030b8c
commit d2fcb893e1
3 changed files with 30 additions and 5 deletions

View File

@ -181,8 +181,9 @@ func (a *LocalShardMapping) CreateIterator(m *influxql.Measurement, opt query.It
return sg.CreateIterator(m.Name, opt)
}
// Close does nothing for a LocalShardMapping.
// Close clears out the list of mapped shards.
func (a *LocalShardMapping) Close() error {
a.ShardMap = nil
return nil
}

View File

@ -65,23 +65,25 @@ func (c *compiledStatement) Prepare(shardMapper ShardMapper, sopt SelectOptions)
if err != nil {
return nil, err
}
defer shards.Close()
// Rewrite wildcards, if any exist.
stmt, err := c.stmt.RewriteFields(shards)
if err != nil {
shards.Close()
return nil, err
}
// Determine base options for iterators.
opt, err := newIteratorOptionsStmt(stmt, sopt)
if err != nil {
shards.Close()
return nil, err
}
if sopt.MaxBucketsN > 0 && !stmt.IsRawQuery {
interval, err := stmt.GroupByInterval()
if err != nil {
shards.Close()
return nil, err
}
@ -93,6 +95,7 @@ func (c *compiledStatement) Prepare(shardMapper ShardMapper, sopt SelectOptions)
// Determine the number of buckets by finding the time span and dividing by the interval.
buckets := (last - first + int64(interval)) / int64(interval)
if int(buckets) > sopt.MaxBucketsN {
shards.Close()
return nil, fmt.Errorf("max-select-buckets limit exceeded: (%d/%d)", buckets, sopt.MaxBucketsN)
}
}

View File

@ -38,6 +38,13 @@ type ShardMapper interface {
// ShardGroup represents a shard or a collection of shards that can be accessed
// for creating iterators.
// When creating iterators, the resource used for reading the iterators should be
// separate from the resource used to map the shards. When the ShardGroup is closed,
// it should not close any resources associated with the created Iterator. Those
// resources belong to the Iterator and will be closed when the Iterator itself is
// closed.
// The query engine operates under this assumption and will close the shard group
// after creating the iterators, but before the iterators are actually read.
type ShardGroup interface {
IteratorCreator
influxql.FieldMapper
@ -48,6 +55,11 @@ type ShardGroup interface {
type PreparedStatement interface {
// Select creates the Iterators that will be used to read the query.
Select() ([]Iterator, []string, error)
// Close closes the resources associated with this prepared statement.
// This must be called as the mapped shards may hold open resources such
// as network connections.
Close() error
}
// Prepare will compile the statement with the default compile options and
@ -67,13 +79,18 @@ func Select(stmt *influxql.SelectStatement, shardMapper ShardMapper, opt SelectO
if err != nil {
return nil, nil, err
}
// Must be deferred so it runs after Select.
defer s.Close()
return s.Select()
}
type preparedStatement struct {
stmt *influxql.SelectStatement
opt IteratorOptions
ic IteratorCreator
stmt *influxql.SelectStatement
opt IteratorOptions
ic interface {
IteratorCreator
io.Closer
}
columns []string
}
@ -85,6 +102,10 @@ func (p *preparedStatement) Select() ([]Iterator, []string, error) {
return itrs, p.columns, nil
}
func (p *preparedStatement) Close() error {
return p.ic.Close()
}
func buildIterators(stmt *influxql.SelectStatement, ic IteratorCreator, opt IteratorOptions) ([]Iterator, error) {
// Retrieve refs for each call and var ref.
info := newSelectInfo(stmt)