add SHOW TAG KEYS support

pull/5196/head
Ben Johnson 2016-02-04 11:00:50 -07:00
parent 607750ab1b
commit 47c2bab74b
5 changed files with 127 additions and 366 deletions

View File

@ -585,22 +585,46 @@ func (q *QueryExecutor) planShowTagKeys(stmt *influxql.ShowTagKeysStatement, dat
return nil, errors.New("SHOW TAG KEYS doesn't support time in WHERE clause")
}
panic("FIXME: Implement SHOW TAG KEYS")
condition := stmt.Condition
if len(stmt.Sources) > 0 {
if source, ok := stmt.Sources[0].(*influxql.Measurement); ok {
var expr influxql.Expr
if source.Regex != nil {
expr = &influxql.BinaryExpr{
Op: influxql.EQREGEX,
LHS: &influxql.VarRef{Val: "name"},
RHS: &influxql.RegexLiteral{Val: source.Regex.Val},
}
} else if source.Name != "" {
expr = &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "name"},
RHS: &influxql.StringLiteral{Val: source.Name},
}
}
/*
return q.PlanSelect(&influxql.SelectStatement{
Fields: influxql.Fields{
{Expr: &influxql.VarRef{Val: "tagKey"}},
},
Sources: influxql.Sources{
&influxql.Measurement{Database: database, Name: "_tagkeys"},
},
Condition: stmt.Condition,
Offset: stmt.Offset,
Limit: stmt.Limit,
SortFields: stmt.SortFields,
}, chunkSize)
*/
// Set condition or "AND" together.
if condition == nil {
condition = expr
} else {
condition = &influxql.BinaryExpr{Op: influxql.AND, LHS: expr, RHS: condition}
}
}
}
return q.PlanSelect(&influxql.SelectStatement{
Fields: influxql.Fields{
{Expr: &influxql.VarRef{Val: "tagKey"}},
},
Sources: influxql.Sources{
&influxql.Measurement{Database: database, Name: "_tagKeys"},
},
Condition: condition,
Offset: stmt.Offset,
Limit: stmt.Limit,
SortFields: stmt.SortFields,
OmitTime: true,
}, chunkSize)
}
func (q *QueryExecutor) executeStatement(statementID int, stmt influxql.Statement, database string, results chan *influxql.Result, chunkSize int, closing chan struct{}) error {

View File

@ -88,26 +88,6 @@ func TestQueryExecutor_ExecuteQuery_Select_Empty(t *testing.T) {
}
}
*/
// Ensure the query executor can execute a DROP SERIES statement.
func TestQueryExecutor_ExecuteQuery_DropSeries(t *testing.T) {
e := NewQueryExecutor()
e.Store.DeleteSeriesFn = func(database string, sources influxql.Sources, condition influxql.Expr) error {
if database != `db0` {
t.Fatalf("unexpected database: %s", database)
} else if !reflect.DeepEqual(sources, influxql.Sources{&influxql.Measurement{Database: "db0", RetentionPolicy: "rp0", Name: "cpu"}}) {
t.Fatalf("unexpected sources: %s", spew.Sdump(sources))
} else if condition != nil {
t.Fatalf("unexpected condition: %s", spew.Sdump(condition))
}
return nil
}
res := e.MustExecuteQueryString("db0", `drop series from cpu`)
if s := MustMarshalJSON(res); s != `[{}]` {
t.Fatalf("unexpected results: %s", s)
}
}
// Ensure the query executor can execute a DROP MEASUREMENT statement.
func TestQueryExecutor_ExecuteQuery_DropMeasurement(t *testing.T) {
@ -359,13 +339,17 @@ func (e *QueryExecutor) MustExecuteQueryStringJSON(database string, s string) st
// QueryExecutorStore is a mockable implementation of QueryExecutor.Store.
// Each method on the type forwards to the function field of the same name
// with an "Fn" suffix, so a test supplies behavior by assigning those fields.
type QueryExecutorStore struct {
// DatabaseIndexFn backs DatabaseIndex.
DatabaseIndexFn func(name string) *tsdb.DatabaseIndex
// ShardsFn backs Shards.
ShardsFn func(ids []uint64) []*tsdb.Shard
ExpandSourcesFn func(sources influxql.Sources) (influxql.Sources, error)
DeleteDatabaseFn func(name string, shardIDs []uint64) error
// DeleteMeasurementFn backs DeleteMeasurement.
DeleteMeasurementFn func(database, name string) error
// NOTE(review): DeleteSeriesFn is listed twice with different signatures;
// this looks like the before/after of a signature change rendered without
// diff markers. Only one declaration can exist in compiled code — confirm
// which one applies.
DeleteSeriesFn func(database string, sources influxql.Sources, condition influxql.Expr) error
DeleteSeriesFn func(database string, seriesKeys []string) error
}
// DatabaseIndex returns whatever DatabaseIndexFn returns for name.
// DatabaseIndexFn must be set before this is called; invoking a nil func panics.
func (s *QueryExecutorStore) DatabaseIndex(name string) *tsdb.DatabaseIndex {
return s.DatabaseIndexFn(name)
}
// Shards returns whatever ShardsFn returns for ids.
// ShardsFn must be set before this is called; invoking a nil func panics.
func (s *QueryExecutorStore) Shards(ids []uint64) []*tsdb.Shard {
return s.ShardsFn(ids)
}
@ -378,8 +362,8 @@ func (s *QueryExecutorStore) DeleteDatabase(name string, shardIDs []uint64) erro
// DeleteMeasurement forwards database and name to DeleteMeasurementFn and
// returns its error. DeleteMeasurementFn must be set before this is called;
// invoking a nil func panics.
func (s *QueryExecutorStore) DeleteMeasurement(database, name string) error {
return s.DeleteMeasurementFn(database, name)
}
func (s *QueryExecutorStore) DeleteSeries(database string, sources influxql.Sources, condition influxql.Expr) error {
return s.DeleteSeriesFn(database, sources, condition)
func (s *QueryExecutorStore) DeleteSeries(database string, seriesKeys []string) error {
return s.DeleteSeriesFn(database, seriesKeys)
}
// DefaultStoreExpandSourcesFn returns the original sources unchanged.

View File

@ -463,6 +463,8 @@ func (a Shards) createSystemIterator(opt influxql.IteratorOptions) (influxql.Ite
switch m.Name {
case "_measurements":
return a.createMeasurementsIterator(opt)
case "_tagKeys":
return a.createTagKeysIterator(opt)
default:
return nil, fmt.Errorf("unknown system source: %s", m.Name)
}
@ -487,6 +489,25 @@ func (a Shards) createMeasurementsIterator(opt influxql.IteratorOptions) (influx
return influxql.NewMergeIterator(itrs, opt), nil
}
// createTagKeysIterator builds one TagKeysIterator per shard and merges them
// into a single iterator. If any shard's iterator cannot be created, the
// iterators built so far are closed and the error is returned.
func (a Shards) createTagKeysIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) {
	perShard := make([]influxql.Iterator, 0, len(a))
	for _, shard := range a {
		sub, err := NewTagKeysIterator(shard, opt)
		if err != nil {
			// Release everything opened before the failure.
			influxql.Iterators(perShard).Close()
			return nil, err
		}
		perShard = append(perShard, sub)
	}
	return influxql.NewMergeIterator(perShard, opt), nil
}
// FieldDimensions returns the unique fields and dimensions across a list of sources.
func (a Shards) FieldDimensions(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error) {
fields = make(map[string]struct{})
@ -880,6 +901,65 @@ func (itr *MeasurementIterator) Next() *influxql.FloatPoint {
}
}
// TagKeysIterator iterates over the tag keys of every measurement in a shard.
// Next emits one point per tag key: the point is named after the measurement
// and carries the key as its single Aux value.
type TagKeysIterator struct {
mms Measurements // measurements not yet visited; consumed from the front
buf struct {
mm *Measurement // measurement whose keys are currently being emitted
keys []string // keys of mm that have not been emitted yet
}
}
// NewTagKeysIterator creates a TagKeysIterator over the measurements of sh.
// When opt.Condition is set, only measurements matching it are included;
// otherwise every measurement in the shard's index is used. The measurements
// are sorted before the iterator is returned.
func NewTagKeysIterator(sh *Shard, opt influxql.IteratorOptions) (*TagKeysIterator, error) {
	var mms Measurements
	if cond := opt.Condition; cond != nil {
		// Restrict to the measurements selected by the condition.
		filtered, err := sh.index.measurementsByExpr(cond)
		if err != nil {
			return nil, err
		}
		mms = filtered
	} else {
		mms = sh.index.Measurements()
	}

	// Order the measurements using the Measurements sort implementation.
	sort.Sort(mms)

	return &TagKeysIterator{mms: mms}, nil
}
// Close always returns nil and performs no other work.
func (itr *TagKeysIterator) Close() error {
	return nil
}
// Next returns a point for the next tag key, or nil once every measurement
// has been exhausted. The point's Name is the owning measurement's name and
// its Aux holds the tag key as the only element.
func (itr *TagKeysIterator) Next() *influxql.FloatPoint {
	// Refill the key buffer from the remaining measurements. Measurements
	// without any tag keys are skipped.
	for len(itr.buf.keys) == 0 {
		if len(itr.mms) == 0 {
			return nil
		}
		itr.buf.mm, itr.mms = itr.mms[0], itr.mms[1:]
		itr.buf.keys = itr.buf.mm.TagKeys()
	}

	// Pop the first buffered key and emit it.
	key := itr.buf.keys[0]
	itr.buf.keys = itr.buf.keys[1:]
	return &influxql.FloatPoint{
		Name: itr.buf.mm.Name,
		Aux:  []interface{}{key},
	}
}
// IsNumeric returns whether a given aggregate can only be run on numeric fields.
func IsNumeric(c *influxql.Call) bool {
switch c.Name {

View File

@ -183,7 +183,7 @@ cpu,host=serverB,region=uswest value=25 0
Tags: influxql.NewTags(map[string]string{"host": "serverA"}),
Time: time.Unix(0, 0).UnixNano(),
Value: 100,
Aux: []interface{}{math.NaN()},
Aux: []interface{}{(*float64)(nil)},
}) {
t.Fatalf("unexpected point(0): %s", spew.Sdump(p))
}

View File

@ -1,327 +0,0 @@
package tsdb
/*
import (
"encoding/json"
"fmt"
"sort"
"time"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/models"
)
// ShowTagKeysExecutor implements the Executor interface for a SHOW TAG KEYS statement.
type ShowTagKeysExecutor struct {
stmt *influxql.ShowTagKeysStatement
mappers []Mapper
chunkSize int
}
// NewShowTagKeysExecutor returns a new ShowTagKeysExecutor.
func NewShowTagKeysExecutor(stmt *influxql.ShowTagKeysStatement, mappers []Mapper, chunkSize int) *ShowTagKeysExecutor {
return &ShowTagKeysExecutor{
stmt: stmt,
mappers: mappers,
chunkSize: chunkSize,
}
}
// Execute begins execution of the query and returns a channel to receive rows.
func (e *ShowTagKeysExecutor) Execute(closing <-chan struct{}) <-chan *models.Row {
// It's important that all resources are released when execution completes.
defer e.close()
// Create output channel and stream data in a separate goroutine.
out := make(chan *models.Row, 0)
go func() {
defer close(out)
// Open the mappers.
for _, m := range e.mappers {
if err := m.Open(); err != nil {
out <- &models.Row{Err: err}
return
}
}
// Create a map of measurement to tags keys.
set := map[string]map[string]struct{}{}
// Iterate through mappers collecting tag keys for each measurement.
for _, m := range e.mappers {
// Read all data from the mapper.
for {
c, err := m.NextChunk()
if err != nil {
out <- &models.Row{Err: err}
return
} else if c == nil {
// Mapper has been drained.
break
}
// Convert the mapper chunk to an array of measurements with tag keys.
mtks, ok := c.(MeasurementsTagKeys)
if !ok {
out <- &models.Row{Err: fmt.Errorf("show tag keys mapper returned invalid type: %T", c)}
return
}
// Merge mapper chunk with previous mapper outputs.
for _, mm := range mtks {
for _, key := range mm.TagKeys {
if set[mm.Measurement] == nil {
set[mm.Measurement] = map[string]struct{}{}
}
set[mm.Measurement][key] = struct{}{}
}
}
}
}
// All mappers are drained.
// Convert the set into an array of measurements and their tag keys.
mstks := make(MeasurementsTagKeys, 0)
for mm, tks := range set {
mtks := &MeasurementTagKeys{Measurement: mm}
for tk := range tks {
mtks.TagKeys = append(mtks.TagKeys, tk)
}
sort.Strings(mtks.TagKeys)
mstks = append(mstks, mtks)
}
// Sort by measurement name.
sort.Sort(mstks)
slim, soff := limitAndOffset(e.stmt.SLimit, e.stmt.SOffset, len(mstks))
// Send results.
for _, mtks := range mstks[soff:slim] {
lim, off := limitAndOffset(e.stmt.Limit, e.stmt.Offset, len(mtks.TagKeys))
row := &models.Row{
Name: mtks.Measurement,
Columns: []string{"tagKey"},
Values: make([][]interface{}, 0, lim-off),
}
for _, tk := range mtks.TagKeys[off:lim] {
v := []interface{}{tk}
row.Values = append(row.Values, v)
}
select {
case out <- row:
case <-closing:
out <- &models.Row{Err: fmt.Errorf("execute was closed by caller")}
break
case <-time.After(30 * time.Second):
// This should never happen, so if it does, it is a problem
out <- &models.Row{Err: fmt.Errorf("execute was closed by read timeout")}
break
}
}
}()
return out
}
// limitAndOffset calculates the limit and offset indexes for n things.
func limitAndOffset(lim, off, n int) (int, int) {
if off >= n {
return 0, 0
}
o := off
l := n
if lim > 0 && o+lim < l {
l = o + lim
}
if o > l {
return 0, 0
}
return l, o
}
// Close closes the executor such that all resources are released. Once closed,
// an executor may not be re-used.
func (e *ShowTagKeysExecutor) close() {
if e != nil {
for _, m := range e.mappers {
m.Close()
}
}
}
// ShowTagKeysMapper is a mapper for collecting the tag keys of each measurement in a shard.
type ShowTagKeysMapper struct {
remote Mapper
shard *Shard
stmt *influxql.ShowTagKeysStatement
chunkSize int
state interface{}
}
// NewShowTagKeysMapper returns a mapper for the given shard, which will return data for the meta statement.
func NewShowTagKeysMapper(shard *Shard, stmt *influxql.ShowTagKeysStatement, chunkSize int) *ShowTagKeysMapper {
return &ShowTagKeysMapper{
shard: shard,
stmt: stmt,
chunkSize: chunkSize,
}
}
// MeasurementTagKeys represents measurement tag keys.
type MeasurementTagKeys struct {
Measurement string `json:"measurement"`
TagKeys []string `json:"tagkeys"`
}
// MeasurementsTagKeys represents tag keys for multiple measurements.
type MeasurementsTagKeys []*MeasurementTagKeys
func (a MeasurementsTagKeys) Len() int { return len(a) }
func (a MeasurementsTagKeys) Less(i, j int) bool { return a[i].Measurement < a[j].Measurement }
func (a MeasurementsTagKeys) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// Size returns the total string length of measurement names & tag keys.
func (a MeasurementsTagKeys) Size() int {
n := 0
for _, m := range a {
n += len(m.Measurement)
for _, k := range m.TagKeys {
n += len(k)
}
}
return n
}
// Open opens the mapper for use.
func (m *ShowTagKeysMapper) Open() error {
if m.remote != nil {
return m.remote.Open()
}
// This can happen when a shard has been assigned to this node but we have not
// written to it so it may not exist yet.
if m.shard == nil {
return nil
}
sources := influxql.Sources{}
// Expand regex expressions in the FROM clause.
if m.stmt.Sources != nil {
var err error
sources, err = m.shard.index.ExpandSources(m.stmt.Sources)
if err != nil {
return err
}
}
// Get measurements from sources in the statement if provided or database if not.
measurements, err := measurementsFromSourcesOrDB(m.shard.index, sources...)
if err != nil {
return err
}
// If a WHERE clause was specified, filter the measurements.
if m.stmt.Condition != nil {
var err error
whereMs, err := m.shard.index.measurementsByExpr(m.stmt.Condition)
if err != nil {
return err
}
sort.Sort(whereMs)
measurements = measurements.intersect(whereMs)
}
// Create a channel to send measurement names on.
ch := make(chan *MeasurementTagKeys)
// Start a goroutine to send the names over the channel as needed.
go func() {
for _, mm := range measurements {
ch <- &MeasurementTagKeys{
Measurement: mm.Name,
TagKeys: mm.TagKeys(),
}
}
close(ch)
}()
// Store the channel as the state of the mapper.
m.state = ch
return nil
}
// SetRemote sets the remote mapper to use.
func (m *ShowTagKeysMapper) SetRemote(remote Mapper) error {
m.remote = remote
return nil
}
// TagSets is only implemented on this mapper to satisfy the Mapper interface.
func (m *ShowTagKeysMapper) TagSets() []string { return nil }
// Fields returns a list of field names for this mapper.
func (m *ShowTagKeysMapper) Fields() []string { return []string{"tagKey"} }
// NextChunk returns the next chunk of measurements and tag keys.
func (m *ShowTagKeysMapper) NextChunk() (interface{}, error) {
if m.remote != nil {
b, err := m.remote.NextChunk()
if err != nil {
return nil, err
} else if b == nil {
return nil, nil
}
mtks := []*MeasurementTagKeys{}
if err := json.Unmarshal(b.([]byte), &mtks); err != nil {
return nil, err
} else if len(mtks) == 0 {
// Mapper on other node sent 0 values so it's done.
return nil, nil
}
return mtks, nil
}
return m.nextChunk()
}
// nextChunk implements next chunk logic for a local shard.
func (m *ShowTagKeysMapper) nextChunk() (interface{}, error) {
// Get the channel of measurement tag keys from the state.
ch, ok := m.state.(chan *MeasurementTagKeys)
if !ok {
return nil, nil
}
// Allocate array to hold measurement names.
mtks := make(MeasurementsTagKeys, 0)
// Get the next chunk of tag keys.
for n := range ch {
mtks = append(mtks, n)
if mtks.Size() >= m.chunkSize {
break
}
}
// See if we've read all the names.
if len(mtks) == 0 {
return nil, nil
}
return mtks, nil
}
// Close closes the mapper.
func (m *ShowTagKeysMapper) Close() {
if m.remote != nil {
m.remote.Close()
}
}
*/