package coordinator import ( "context" "io" "time" "github.com/influxdata/influxdb/query" "github.com/influxdata/influxdb/services/meta" "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxql" ) // IteratorCreator is an interface that combines mapping fields and creating iterators. type IteratorCreator interface { query.IteratorCreator influxql.FieldMapper io.Closer } // LocalShardMapper implements a ShardMapper for local shards. type LocalShardMapper struct { MetaClient interface { ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error) } TSDBStore interface { ShardGroup(ids []uint64) tsdb.ShardGroup } } // MapShards maps the sources to the appropriate shards into an IteratorCreator. func (e *LocalShardMapper) MapShards(sources influxql.Sources, t influxql.TimeRange, opt query.SelectOptions) (query.ShardGroup, error) { a := &LocalShardMapping{ ShardMap: make(map[Source]tsdb.ShardGroup), } tmin := time.Unix(0, t.MinTimeNano()) tmax := time.Unix(0, t.MaxTimeNano()) if err := e.mapShards(a, sources, tmin, tmax); err != nil { return nil, err } a.MinTime, a.MaxTime = tmin, tmax return a, nil } func (e *LocalShardMapper) mapShards(a *LocalShardMapping, sources influxql.Sources, tmin, tmax time.Time) error { for _, s := range sources { switch s := s.(type) { case *influxql.Measurement: source := Source{ Database: s.Database, RetentionPolicy: s.RetentionPolicy, } // Retrieve the list of shards for this database. This list of // shards is always the same regardless of which measurement we are // using. if _, ok := a.ShardMap[source]; !ok { groups, err := e.MetaClient.ShardGroupsByTimeRange(s.Database, s.RetentionPolicy, tmin, tmax) if err != nil { return err } if len(groups) == 0 { a.ShardMap[source] = nil continue } shardIDs := make([]uint64, 0, len(groups[0].Shards)*len(groups)) for _, g := range groups { for _, si := range g.Shards { shardIDs = append(shardIDs, si.ID) } } a.ShardMap[source] = e.TSDBStore.ShardGroup(shardIDs) } case *influxql.SubQuery: if err := e.mapShards(a, s.Statement.Sources, tmin, tmax); err != nil { return err } } } return nil } // ShardMapper maps data sources to a list of shard information. type LocalShardMapping struct { ShardMap map[Source]tsdb.ShardGroup // MinTime is the minimum time that this shard mapper will allow. // Any attempt to use a time before this one will automatically result in using // this time instead. MinTime time.Time // MaxTime is the maximum time that this shard mapper will allow. // Any attempt to use a time after this one will automatically result in using // this time instead. MaxTime time.Time } func (a *LocalShardMapping) FieldDimensions(m *influxql.Measurement) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) { source := Source{ Database: m.Database, RetentionPolicy: m.RetentionPolicy, } sg := a.ShardMap[source] if sg == nil { return } fields = make(map[string]influxql.DataType) dimensions = make(map[string]struct{}) var measurements []string if m.Regex != nil { measurements = sg.MeasurementsByRegex(m.Regex.Val) } else { measurements = []string{m.Name} } f, d, err := sg.FieldDimensions(measurements) if err != nil { return nil, nil, err } for k, typ := range f { fields[k] = typ } for k := range d { dimensions[k] = struct{}{} } return } func (a *LocalShardMapping) MapType(m *influxql.Measurement, field string) influxql.DataType { source := Source{ Database: m.Database, RetentionPolicy: m.RetentionPolicy, } sg := a.ShardMap[source] if sg == nil { return influxql.Unknown } var names []string if m.Regex != nil { names = sg.MeasurementsByRegex(m.Regex.Val) } else { names = []string{m.Name} } var typ influxql.DataType for _, name := range names { if m.SystemIterator != "" { name = m.SystemIterator } t := sg.MapType(name, field) if typ.LessThan(t) { typ = t } } return typ } func (a *LocalShardMapping) CreateIterator(ctx context.Context, m *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) { source := Source{ Database: m.Database, RetentionPolicy: m.RetentionPolicy, } sg := a.ShardMap[source] if sg == nil { return nil, nil } // Override the time constraints if they don't match each other. if !a.MinTime.IsZero() && opt.StartTime < a.MinTime.UnixNano() { opt.StartTime = a.MinTime.UnixNano() } if !a.MaxTime.IsZero() && opt.EndTime > a.MaxTime.UnixNano() { opt.EndTime = a.MaxTime.UnixNano() } if m.Regex != nil { measurements := sg.MeasurementsByRegex(m.Regex.Val) inputs := make([]query.Iterator, 0, len(measurements)) if err := func() error { // Create a Measurement for each returned matching measurement value // from the regex. for _, measurement := range measurements { mm := m.Clone() mm.Name = measurement // Set the name to this matching regex value. input, err := sg.CreateIterator(ctx, mm, opt) if err != nil { return err } inputs = append(inputs, input) } return nil }(); err != nil { query.Iterators(inputs).Close() return nil, err } return query.Iterators(inputs).Merge(opt) } return sg.CreateIterator(ctx, m, opt) } func (a *LocalShardMapping) IteratorCost(m *influxql.Measurement, opt query.IteratorOptions) (query.IteratorCost, error) { source := Source{ Database: m.Database, RetentionPolicy: m.RetentionPolicy, } sg := a.ShardMap[source] if sg == nil { return query.IteratorCost{}, nil } // Override the time constraints if they don't match each other. if !a.MinTime.IsZero() && opt.StartTime < a.MinTime.UnixNano() { opt.StartTime = a.MinTime.UnixNano() } if !a.MaxTime.IsZero() && opt.EndTime > a.MaxTime.UnixNano() { opt.EndTime = a.MaxTime.UnixNano() } if m.Regex != nil { var costs query.IteratorCost measurements := sg.MeasurementsByRegex(m.Regex.Val) for _, measurement := range measurements { cost, err := sg.IteratorCost(measurement, opt) if err != nil { return query.IteratorCost{}, err } costs = costs.Combine(cost) } return costs, nil } return sg.IteratorCost(m.Name, opt) } // Close clears out the list of mapped shards. func (a *LocalShardMapping) Close() error { a.ShardMap = nil return nil } // Source contains the database and retention policy source for data. type Source struct { Database string RetentionPolicy string }