influxdb/coordinator/shard_mapper.go

221 lines
5.3 KiB
Go

package coordinator
import (
"io"
"time"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
)
// IteratorCreator is an interface that combines mapping fields and creating iterators.
type IteratorCreator interface {
query.IteratorCreator
influxql.FieldMapper
io.Closer
}
// LocalShardMapper implements a ShardMapper for local shards.
type LocalShardMapper struct {
MetaClient interface {
ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error)
}
TSDBStore interface {
ShardGroup(ids []uint64) tsdb.ShardGroup
}
}
// MapShards maps the sources to the appropriate shards into an IteratorCreator.
func (e *LocalShardMapper) MapShards(sources influxql.Sources, t influxql.TimeRange, opt query.SelectOptions) (query.ShardGroup, error) {
a := &LocalShardMapping{
ShardMap: make(map[Source]tsdb.ShardGroup),
}
tmin := time.Unix(0, t.MinTime())
tmax := time.Unix(0, t.MaxTime())
if err := e.mapShards(a, sources, tmin, tmax); err != nil {
return nil, err
}
return a, nil
}
func (e *LocalShardMapper) mapShards(a *LocalShardMapping, sources influxql.Sources, tmin, tmax time.Time) error {
for _, s := range sources {
switch s := s.(type) {
case *influxql.Measurement:
source := Source{
Database: s.Database,
RetentionPolicy: s.RetentionPolicy,
}
// Retrieve the list of shards for this database. This list of
// shards is always the same regardless of which measurement we are
// using.
if _, ok := a.ShardMap[source]; !ok {
groups, err := e.MetaClient.ShardGroupsByTimeRange(s.Database, s.RetentionPolicy, tmin, tmax)
if err != nil {
return err
}
if len(groups) == 0 {
a.ShardMap[source] = nil
continue
}
shardIDs := make([]uint64, 0, len(groups[0].Shards)*len(groups))
for _, g := range groups {
for _, si := range g.Shards {
shardIDs = append(shardIDs, si.ID)
}
}
a.ShardMap[source] = e.TSDBStore.ShardGroup(shardIDs)
}
case *influxql.SubQuery:
if err := e.mapShards(a, s.Statement.Sources, tmin, tmax); err != nil {
return err
}
}
}
return nil
}
// ShardMapper maps data sources to a list of shard information.
type LocalShardMapping struct {
ShardMap map[Source]tsdb.ShardGroup
}
func (a *LocalShardMapping) FieldDimensions(m *influxql.Measurement) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) {
source := Source{
Database: m.Database,
RetentionPolicy: m.RetentionPolicy,
}
sg := a.ShardMap[source]
if sg == nil {
return
}
fields = make(map[string]influxql.DataType)
dimensions = make(map[string]struct{})
var measurements []string
if m.Regex != nil {
measurements = sg.MeasurementsByRegex(m.Regex.Val)
} else {
measurements = []string{m.Name}
}
f, d, err := sg.FieldDimensions(measurements)
if err != nil {
return nil, nil, err
}
for k, typ := range f {
fields[k] = typ
}
for k := range d {
dimensions[k] = struct{}{}
}
return
}
func (a *LocalShardMapping) MapType(m *influxql.Measurement, field string) influxql.DataType {
source := Source{
Database: m.Database,
RetentionPolicy: m.RetentionPolicy,
}
sg := a.ShardMap[source]
if sg == nil {
return influxql.Unknown
}
var names []string
if m.Regex != nil {
names = sg.MeasurementsByRegex(m.Regex.Val)
} else {
names = []string{m.Name}
}
var typ influxql.DataType
for _, name := range names {
t := sg.MapType(name, field)
if typ.LessThan(t) {
typ = t
}
}
return typ
}
func (a *LocalShardMapping) CreateIterator(m *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) {
source := Source{
Database: m.Database,
RetentionPolicy: m.RetentionPolicy,
}
sg := a.ShardMap[source]
if sg == nil {
return nil, nil
}
if m.Regex != nil {
measurements := sg.MeasurementsByRegex(m.Regex.Val)
inputs := make([]query.Iterator, 0, len(measurements))
if err := func() error {
for _, measurement := range measurements {
input, err := sg.CreateIterator(measurement, opt)
if err != nil {
return err
}
inputs = append(inputs, input)
}
return nil
}(); err != nil {
query.Iterators(inputs).Close()
return nil, err
}
return query.Iterators(inputs).Merge(opt)
}
return sg.CreateIterator(m.Name, opt)
}
func (a *LocalShardMapping) IteratorCost(m *influxql.Measurement, opt query.IteratorOptions) (query.IteratorCost, error) {
source := Source{
Database: m.Database,
RetentionPolicy: m.RetentionPolicy,
}
sg := a.ShardMap[source]
if sg == nil {
return query.IteratorCost{}, nil
}
if m.Regex != nil {
var costs query.IteratorCost
measurements := sg.MeasurementsByRegex(m.Regex.Val)
for _, measurement := range measurements {
cost, err := sg.IteratorCost(measurement, opt)
if err != nil {
return query.IteratorCost{}, err
}
costs = costs.Combine(cost)
}
return costs, nil
}
return sg.IteratorCost(m.Name, opt)
}
// Close clears out the list of mapped shards.
func (a *LocalShardMapping) Close() error {
a.ShardMap = nil
return nil
}
// Source contains the database and retention policy source for data.
type Source struct {
Database string
RetentionPolicy string
}