Wire up TagValues on index.
parent
0f64a29d78
commit
3aa46c6b8a
69
index.go
69
index.go
|
@ -235,6 +235,75 @@ func (t *Index) TagKeys(names []string) []string {
|
||||||
return sortedKeys
|
return sortedKeys
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TagValues returns a map of unique tag values for the given measurements and key with the given filters applied.
|
||||||
|
// Call .ToSlice() on the result to convert it into a sorted slice of strings.
|
||||||
|
// Filters are equivalent to and AND operation. If you want to do an OR, get the tag values for one set,
|
||||||
|
// then get the tag values for another set and do a union of the two.
|
||||||
|
func (t *Index) TagValues(names []string, key string, filters []*Filter) TagValues {
|
||||||
|
t.mu.RLock()
|
||||||
|
defer t.mu.RUnlock()
|
||||||
|
|
||||||
|
values := TagValues(make(map[string]bool))
|
||||||
|
|
||||||
|
// see if they just want all the tag values for this key
|
||||||
|
if len(filters) == 0 {
|
||||||
|
for _, n := range names {
|
||||||
|
idx := t.measurements[n]
|
||||||
|
if idx != nil {
|
||||||
|
values.Union(idx.tagValues(key))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return values
|
||||||
|
}
|
||||||
|
|
||||||
|
// they have filters so just get a set of series ids matching them and then get the tag values from those
|
||||||
|
seriesIDs := t.SeriesIDs(names, filters)
|
||||||
|
return t.tagValuesForSeries(key, seriesIDs)
|
||||||
|
}
|
||||||
|
|
||||||
|
// tagValuesForSeries will return a TagValues map of all the unique tag values for a collection of series.
|
||||||
|
func (t *Index) tagValuesForSeries(key string, seriesIDs SeriesIDs) TagValues {
|
||||||
|
values := make(map[string]bool)
|
||||||
|
for _, id := range seriesIDs {
|
||||||
|
s := t.series[id]
|
||||||
|
if s == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if v, ok := s.Tags[key]; ok {
|
||||||
|
values[v] = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return TagValues(values)
|
||||||
|
}
|
||||||
|
|
||||||
|
type TagValues map[string]bool
|
||||||
|
|
||||||
|
// ToSlice returns a sorted slice of the TagValues
|
||||||
|
func (t TagValues) ToSlice() []string {
|
||||||
|
a := make([]string, 0, len(t))
|
||||||
|
for v, _ := range t {
|
||||||
|
a = append(a, v)
|
||||||
|
}
|
||||||
|
sort.Strings(a)
|
||||||
|
return a
|
||||||
|
}
|
||||||
|
|
||||||
|
// Union will modify the receiver by merging in the passed in values.
|
||||||
|
func (l TagValues) Union(r TagValues) {
|
||||||
|
for v, _ := range r {
|
||||||
|
l[v] = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Intersect will modify the receiver by keeping only the keys that exist in the passed in values
|
||||||
|
func (l TagValues) Intersect(r TagValues) {
|
||||||
|
for v, _ := range l {
|
||||||
|
if _, ok := r[v]; !ok {
|
||||||
|
delete(l, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//seriesIDsForName is the same as SeriesIDs, but for a specific measurement.
|
//seriesIDsForName is the same as SeriesIDs, but for a specific measurement.
|
||||||
func (t *Index) seriesIDsForName(name string, filters Filters) SeriesIDs {
|
func (t *Index) seriesIDsForName(name string, filters Filters) SeriesIDs {
|
||||||
idx := t.measurements[name]
|
idx := t.measurements[name]
|
||||||
|
|
|
@ -338,7 +338,81 @@ func TestIndex_TagKeys(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestIndex_TagValuesWhereFilter(t *testing.T) {
|
func TestIndex_TagValuesWhereFilter(t *testing.T) {
|
||||||
t.Skip("pending")
|
idx := indexWithFixtureData()
|
||||||
|
|
||||||
|
var tests = []struct {
|
||||||
|
names []string
|
||||||
|
key string
|
||||||
|
filters []*influxdb.Filter
|
||||||
|
result []string
|
||||||
|
}{
|
||||||
|
// get the tag values across multiple measurements
|
||||||
|
|
||||||
|
// get the tag values for a single measurement
|
||||||
|
{
|
||||||
|
names: []string{"key_count"},
|
||||||
|
key: "region",
|
||||||
|
result: []string{"useast", "uswest"},
|
||||||
|
},
|
||||||
|
|
||||||
|
// get the tag values for a single measurement with where filter
|
||||||
|
{
|
||||||
|
names: []string{"key_count"},
|
||||||
|
key: "region",
|
||||||
|
filters: []*influxdb.Filter{
|
||||||
|
&influxdb.Filter{Key: "host", Value: "serverc.influx.com"},
|
||||||
|
},
|
||||||
|
result: []string{"uswest"},
|
||||||
|
},
|
||||||
|
|
||||||
|
// get the tag values for a single measurement with a not where filter
|
||||||
|
{
|
||||||
|
names: []string{"key_count"},
|
||||||
|
key: "region",
|
||||||
|
filters: []*influxdb.Filter{
|
||||||
|
&influxdb.Filter{Key: "host", Value: "serverc.influx.com", Not: true},
|
||||||
|
},
|
||||||
|
result: []string{"useast"},
|
||||||
|
},
|
||||||
|
|
||||||
|
// get the tag values for a single measurement with multiple where filters
|
||||||
|
{
|
||||||
|
names: []string{"key_count"},
|
||||||
|
key: "region",
|
||||||
|
filters: []*influxdb.Filter{
|
||||||
|
&influxdb.Filter{Key: "host", Value: "serverc.influx.com"},
|
||||||
|
&influxdb.Filter{Key: "service", Value: "redis"},
|
||||||
|
},
|
||||||
|
result: []string{"uswest"},
|
||||||
|
},
|
||||||
|
|
||||||
|
// get the tag values for a single measurement with regex filter
|
||||||
|
{
|
||||||
|
names: []string{"queue_depth"},
|
||||||
|
key: "name",
|
||||||
|
filters: []*influxdb.Filter{
|
||||||
|
&influxdb.Filter{Key: "app", Regex: regexp.MustCompile("paul.*")},
|
||||||
|
},
|
||||||
|
result: []string{"high priority"},
|
||||||
|
},
|
||||||
|
|
||||||
|
// get the tag values for a single measurement with a not regex filter
|
||||||
|
{
|
||||||
|
names: []string{"key_count"},
|
||||||
|
key: "region",
|
||||||
|
filters: []*influxdb.Filter{
|
||||||
|
&influxdb.Filter{Key: "host", Regex: regexp.MustCompile("serverd.*"), Not: true},
|
||||||
|
},
|
||||||
|
result: []string{"uswest"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, tt := range tests {
|
||||||
|
r := idx.TagValues(tt.names, tt.key, tt.filters).ToSlice()
|
||||||
|
if !reflect.DeepEqual(r, tt.result) {
|
||||||
|
t.Fatalf("%d: filters: %s: result mismatch:\n exp=%s\n got=%s", i, mustMarshalJSON(tt.filters), tt.result, r)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestIndex_TagValuesWhereFilterMultiple(t *testing.T) {
|
func TestIndex_TagValuesWhereFilterMultiple(t *testing.T) {
|
||||||
|
|
24
server.go
24
server.go
|
@ -1264,13 +1264,17 @@ type databaseJSON struct {
|
||||||
Shards []*Shard `json:"shards,omitempty"`
|
Shards []*Shard `json:"shards,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Measurement represents a collection of time series in a database
|
// Measurement represents a collection of time series in a database. It also contains in memory
|
||||||
|
// structures for indexing tags. These structures are accessed through private methods on the Measurement
|
||||||
|
// object. Generally these methods are only accessed from Index, which is responsible for ensuring
|
||||||
|
// go routine safe access.
|
||||||
type Measurement struct {
|
type Measurement struct {
|
||||||
Name string `json:"name,omitempty"`
|
Name string `json:"name,omitempty"`
|
||||||
Fields []*Fields `json:"fields,omitempty"`
|
Fields []*Fields `json:"fields,omitempty"`
|
||||||
|
|
||||||
// in memory index fields
|
// in memory index fields
|
||||||
series map[string]*Series // sorted tagset string to the series object
|
series map[string]*Series // sorted tagset string to the series object
|
||||||
|
seriesByID map[uint32]*Series // lookup table for series by their id
|
||||||
measurement *Measurement
|
measurement *Measurement
|
||||||
seriesByTagKeyValue map[string]map[string]SeriesIDs // map from tag key to value to sorted set of series ids
|
seriesByTagKeyValue map[string]map[string]SeriesIDs // map from tag key to value to sorted set of series ids
|
||||||
ids SeriesIDs // sorted list of series IDs in this measurement
|
ids SeriesIDs // sorted list of series IDs in this measurement
|
||||||
|
@ -1282,6 +1286,7 @@ func NewMeasurement(name string) *Measurement {
|
||||||
Fields: make([]*Fields, 0),
|
Fields: make([]*Fields, 0),
|
||||||
|
|
||||||
series: make(map[string]*Series),
|
series: make(map[string]*Series),
|
||||||
|
seriesByID: make(map[uint32]*Series),
|
||||||
seriesByTagKeyValue: make(map[string]map[string]SeriesIDs),
|
seriesByTagKeyValue: make(map[string]map[string]SeriesIDs),
|
||||||
ids: SeriesIDs(make([]uint32, 0)),
|
ids: SeriesIDs(make([]uint32, 0)),
|
||||||
}
|
}
|
||||||
|
@ -1289,10 +1294,11 @@ func NewMeasurement(name string) *Measurement {
|
||||||
|
|
||||||
// addSeries will add a series to the measurementIndex. Returns false if already present
|
// addSeries will add a series to the measurementIndex. Returns false if already present
|
||||||
func (m *Measurement) addSeries(s *Series) bool {
|
func (m *Measurement) addSeries(s *Series) bool {
|
||||||
tagset := string(marshalTags(s.Tags))
|
if _, ok := m.seriesByID[s.ID]; ok {
|
||||||
if _, ok := m.series[tagset]; ok {
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
m.seriesByID[s.ID] = s
|
||||||
|
tagset := string(marshalTags(s.Tags))
|
||||||
m.series[tagset] = s
|
m.series[tagset] = s
|
||||||
m.ids = append(m.ids, s.ID)
|
m.ids = append(m.ids, s.ID)
|
||||||
// the series ID should always be higher than all others because it's a new
|
// the series ID should always be higher than all others because it's a new
|
||||||
|
@ -1374,9 +1380,17 @@ func (m *Measurement) seriesIDs(filter *Filter) (ids SeriesIDs) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
type Measurements []*Measurement
|
// tagValues returns an array of unique tag values for the given key
|
||||||
|
func (m *Measurement) tagValues(key string) TagValues {
|
||||||
|
tags := m.seriesByTagKeyValue[key]
|
||||||
|
values := make(map[string]bool, len(tags))
|
||||||
|
for k, _ := range tags {
|
||||||
|
values[k] = true
|
||||||
|
}
|
||||||
|
return TagValues(values)
|
||||||
|
}
|
||||||
|
|
||||||
func (m Measurement) String() string { return string(mustMarshalJSON(m)) }
|
type Measurements []*Measurement
|
||||||
|
|
||||||
// Field represents a series field.
|
// Field represents a series field.
|
||||||
type Field struct {
|
type Field struct {
|
||||||
|
|
Loading…
Reference in New Issue