Wire up tag keys
* Add index for sorted measurement names * Add TagKeys methodpull/1264/head
parent
d0d36b73c1
commit
6481c94162
39
index.go
39
index.go
|
@ -14,6 +14,7 @@ type Index struct {
|
|||
measurementIndex map[string]*measurementIndex // map measurement name to its tag index
|
||||
seriesToMeasurement map[uint32]*Measurement // map series id to its measurement
|
||||
series map[uint32]*Series // map series id to the Series object
|
||||
names []string // sorted list of the measurement names
|
||||
}
|
||||
|
||||
func NewIndex() *Index {
|
||||
|
@ -21,6 +22,7 @@ func NewIndex() *Index {
|
|||
measurementIndex: make(map[string]*measurementIndex),
|
||||
seriesToMeasurement: make(map[uint32]*Measurement),
|
||||
series: make(map[uint32]*Series),
|
||||
names: make([]string, 0),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -273,6 +275,8 @@ func (t *Index) AddSeries(name string, s *Series) bool {
|
|||
ids: SeriesIDs(make([]uint32, 0)),
|
||||
}
|
||||
t.measurementIndex[name] = idx
|
||||
t.names = append(t.names, name)
|
||||
sort.Strings(t.names)
|
||||
}
|
||||
idx.measurement.Series = append(idx.measurement.Series, s)
|
||||
t.seriesToMeasurement[s.ID] = idx.measurement
|
||||
|
@ -314,6 +318,34 @@ func (t *Index) SeriesIDs(names []string, filters Filters) SeriesIDs {
|
|||
return ids
|
||||
}
|
||||
|
||||
// TagKeys returns a sorted array of unique tag keys for the given measurements.
|
||||
func (t *Index) TagKeys(names []string) []string {
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
|
||||
if len(names) == 0 {
|
||||
names = t.names
|
||||
}
|
||||
|
||||
keys := make(map[string]bool)
|
||||
for _, n := range names {
|
||||
idx := t.measurementIndex[n]
|
||||
if idx != nil {
|
||||
for k, _ := range idx.tagsToSeries {
|
||||
keys[k] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sortedKeys := make([]string, 0, len(keys))
|
||||
for k, _ := range keys {
|
||||
sortedKeys = append(sortedKeys, k)
|
||||
}
|
||||
sort.Strings(sortedKeys)
|
||||
|
||||
return sortedKeys
|
||||
}
|
||||
|
||||
//seriesIDsForName is the same as SeriesIDs, but for a specific measurement.
|
||||
func (t *Index) seriesIDsForName(name string, filters Filters) SeriesIDs {
|
||||
idx := t.measurementIndex[name]
|
||||
|
@ -368,6 +400,13 @@ func (t *Index) Measurements(filters []*Filter) []*Measurement {
|
|||
return measurements
|
||||
}
|
||||
|
||||
// Names returns all measuremet names in sorted order.
|
||||
func (t *Index) Names() []string {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
return t.names
|
||||
}
|
||||
|
||||
// DropSeries will clear the index of all references to a series.
|
||||
func (t *Index) DropSeries(id uint32) {
|
||||
panic("not implemented")
|
||||
|
|
|
@ -1,12 +1,24 @@
|
|||
package influxdb_test
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"regexp"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdb/influxdb"
|
||||
)
|
||||
|
||||
// Ensure that the index will return a sorted array of measurement names.
|
||||
func TestIndex_Names(t *testing.T) {
|
||||
idx := indexWithFixtureData()
|
||||
|
||||
r := idx.Names()
|
||||
exp := []string{"another_thing", "cpu_load", "key_count", "queue_depth"}
|
||||
if !reflect.DeepEqual(r, exp) {
|
||||
t.Fatalf("Names not equal:\n got: %s\n exp: %s", r, exp)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure that we can get the measurement by the series ID.
|
||||
func TestIndex_MeasurementBySeriesID(t *testing.T) {
|
||||
idx := influxdb.NewIndex()
|
||||
|
@ -159,7 +171,7 @@ func TestIndex_SeriesIDsWhereFilter(t *testing.T) {
|
|||
// match against no tags
|
||||
{
|
||||
names: []string{"cpu_load", "redis"},
|
||||
result: []uint32{uint32(1), uint32(2), uint32(3), uint32(4), uint32(5), uint32(6), uint32(7)},
|
||||
result: []uint32{uint32(1), uint32(2), uint32(3), uint32(4), uint32(5), uint32(6), uint32(7), uint32(8)},
|
||||
},
|
||||
|
||||
// match against all tags
|
||||
|
@ -298,11 +310,32 @@ func TestIndex_FieldKeys(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestIndex_TagKeys(t *testing.T) {
|
||||
t.Skip("pending")
|
||||
}
|
||||
idx := indexWithFixtureData()
|
||||
|
||||
func TestIndex_TagKeysForMeasurement(t *testing.T) {
|
||||
t.Skip("pending")
|
||||
var tests = []struct {
|
||||
names []string
|
||||
result []string
|
||||
}{
|
||||
{
|
||||
names: nil,
|
||||
result: []string{"a", "app", "host", "name", "region", "service"},
|
||||
},
|
||||
{
|
||||
names: []string{"cpu_load"},
|
||||
result: []string{"host", "region"},
|
||||
},
|
||||
{
|
||||
names: []string{"key_count", "queue_depth"},
|
||||
result: []string{"app", "host", "name", "region", "service"},
|
||||
},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
r := idx.TagKeys(tt.names)
|
||||
if !reflect.DeepEqual(r, tt.result) {
|
||||
t.Fatalf("%d: names: %s: result mismatch:\n exp=%s\n got=%s", i, tt.names, tt.result, r)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestIndex_TagValuesWhereFilter(t *testing.T) {
|
||||
|
@ -399,6 +432,15 @@ func indexWithFixtureData() *influxdb.Index {
|
|||
return nil
|
||||
}
|
||||
|
||||
s = &influxdb.Series{
|
||||
ID: uint32(8),
|
||||
Tags: map[string]string{"a": "b"}}
|
||||
|
||||
added = idx.AddSeries("another_thing", s)
|
||||
if !added {
|
||||
return nil
|
||||
}
|
||||
|
||||
return idx
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue