Add per measurement tag index.

* Add tag index inside the measurement index
* Add initial table tests for getting series ids by filters
* Add comments for some existing methods
pull/1264/head
Paul Dix 2014-12-24 18:44:53 -05:00
parent bbe859c981
commit 9fb04a2636
2 changed files with 226 additions and 11 deletions

View File

@ -32,18 +32,26 @@ type Filter struct {
Regex regexp.Regexp Regex regexp.Regexp
} }
type Filters []*Filter
func (f Filters) String() string {
return string(mustMarshalJSON(f))
}
// SeriesIDs is a convenience type for sorting, checking equality, and doing union and
// intersection of collections of series ids.
type SeriesIDs []uint32 type SeriesIDs []uint32
func (p SeriesIDs) Len() int { return len(p) } func (p SeriesIDs) Len() int { return len(p) }
func (p SeriesIDs) Less(i, j int) bool { return p[i] < p[j] } func (p SeriesIDs) Less(i, j int) bool { return p[i] < p[j] }
func (p SeriesIDs) Swap(i, j int) { p[i], p[j] = p[j], p[i] } func (p SeriesIDs) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p SeriesIDs) Sort() { sort.Sort(p) } func (p SeriesIDs) Sort() { sort.Sort(p) }
// Equals assumes that both are sorted. This is by design, no touchy!
func (p SeriesIDs) Equals(s SeriesIDs) bool { func (p SeriesIDs) Equals(s SeriesIDs) bool {
if len(p) != len(s) { if len(p) != len(s) {
return false return false
} }
p.Sort()
s.Sort()
for i, pp := range p { for i, pp := range p {
if s[i] != pp { if s[i] != pp {
return false return false
@ -52,10 +60,48 @@ func (p SeriesIDs) Equals(s SeriesIDs) bool {
return true return true
} }
// Intersect returns a new collection of series ids that is the intersection of the two.
// The two collections must already be sorted.
func (s SeriesIDs) Intersect(a SeriesIDs) SeriesIDs {
l := s
r := a
// we want to iterate through the shortest one and stop
if len(a) < len(s) {
l = a
r = s
}
// they're in sorted order so advance the counter as needed.
// That is, don't run comparisons against lower values that we've already passed
var i, j int
ids := make([]uint32, 0, len(l))
for i < len(l) {
if l[i] == r[j] {
ids = append(ids, l[i])
i += 1
j += 1
} else if l[i] < r[j] {
i += 1
} else {
j += 1
}
}
return SeriesIDs(ids)
}
// Convenience method to output something during tests
func (s SeriesIDs) String() string {
return string(mustMarshalJSON(s))
}
// Keeps a mapping of the series in a measurement // Keeps a mapping of the series in a measurement
type measurementIndex struct { type measurementIndex struct {
series map[string]*Series // sorted tag string to the series object series map[string]*Series // sorted tag string to the series object
measurement *Measurement measurement *Measurement
tagsToSeries map[string]map[string]SeriesIDs // map from tag key to value to sorted set of series ids
} }
// addSeries will add a series to the measurementIndex. Returns false if already present // addSeries will add a series to the measurementIndex. Returns false if already present
@ -68,6 +114,7 @@ func (m measurementIndex) addSeries(s *Series) bool {
return true return true
} }
// seriesByTags returns the Series that matches the given tagset.
func (m measurementIndex) seriesByTags(tags map[string]string) *Series { func (m measurementIndex) seriesByTags(tags map[string]string) *Series {
return m.series[string(tagsToBytes(tags))] return m.series[string(tagsToBytes(tags))]
} }
@ -85,41 +132,103 @@ func (t *TagIndex) AddSeries(name string, s *Series) bool {
// get or create the measurement index // get or create the measurement index
idx := t.measurementIndex[name] idx := t.measurementIndex[name]
if idx == nil { if idx == nil {
idx = &measurementIndex{series: make(map[string]*Series), measurement: NewMeasurement(name)} idx = &measurementIndex{
series: make(map[string]*Series),
measurement: NewMeasurement(name),
tagsToSeries: make(map[string]map[string]SeriesIDs),
}
t.measurementIndex[name] = idx t.measurementIndex[name] = idx
} }
idx.measurement.Series = append(idx.measurement.Series, s) idx.measurement.Series = append(idx.measurement.Series, s)
t.seriesToMeasurement[s.ID] = idx.measurement t.seriesToMeasurement[s.ID] = idx.measurement
t.series[s.ID] = s t.series[s.ID] = s
// add this series id to the tag index on the measurement
for k, v := range s.Tags {
valueMap := idx.tagsToSeries[k]
if valueMap == nil {
valueMap = make(map[string]SeriesIDs)
idx.tagsToSeries[k] = valueMap
}
ids := valueMap[v]
if ids == nil {
ids = make([]uint32, 0)
}
ids = append(ids, s.ID)
ids.Sort()
valueMap[v] = ids
}
// TODO: add this series to the global tag index
return idx.addSeries(s) return idx.addSeries(s)
} }
// AddField adds a field to the measurement name. Returns false if already present // AddField adds a field to the measurement name. Returns false if already present
func (t *TagIndex) AddField(name string, f *Field) bool { func (t *TagIndex) AddField(name string, f *Field) bool {
panic("not implemented")
return false return false
} }
// SeriesIDs returns an array of series ids for the given measurements and filters to be applied to all // SeriesIDs returns an array of series ids for the given measurements and filters to be applied to all
func (t *TagIndex) SeriesIDs(names []string, filters []*Filter) SeriesIDs { func (t *TagIndex) SeriesIDs(names []string, filters Filters) SeriesIDs {
t.mu.RLock() t.mu.RLock()
defer t.mu.RUnlock() defer t.mu.RUnlock()
ids := make([]uint32, 0) // they want all ids if no filters are specified
for _, idx := range t.measurementIndex { if len(filters) == 0 {
for _, s := range idx.series { ids := SeriesIDs(make([]uint32, 0))
ids = append(ids, s.ID) for _, idx := range t.measurementIndex {
for _, s := range idx.series {
ids = append(ids, s.ID)
}
} }
ids.Sort()
return ids
} }
ids := SeriesIDs(make([]uint32, 0))
for _, n := range names {
ids = append(ids, t.seriesIDsForName(n, filters)...)
}
ids.Sort()
return ids return ids
} }
func (t *TagIndex) seriesIDsForName(name string, filters Filters) SeriesIDs {
idx := t.measurementIndex[name]
if idx == nil {
return nil
}
// process the filters one at a time to get the list of ids they return
idsPerFilter := make([]SeriesIDs, len(filters), len(filters))
for i, filter := range filters {
var ids SeriesIDs
values := idx.tagsToSeries[filter.Key]
if values != nil {
ids = SeriesIDs(values[filter.Value])
}
idsPerFilter[i] = ids
}
// collapse the set of ids
allIDs := idsPerFilter[0]
for i := 1; i < len(filters); i++ {
allIDs = allIDs.Intersect(idsPerFilter[i])
}
return allIDs
}
// MeasurementBySeriesID returns the Measurement that is the parent of the given series id.
func (t *TagIndex) MeasurementBySeriesID(id uint32) *Measurement { func (t *TagIndex) MeasurementBySeriesID(id uint32) *Measurement {
t.mu.RLock() t.mu.RLock()
defer t.mu.RUnlock() defer t.mu.RUnlock()
return t.seriesToMeasurement[id] return t.seriesToMeasurement[id]
} }
// MeasurementAndSeries returns the Measurement and the Series for a given measurement name and tag set.
func (t *TagIndex) MeasurementAndSeries(name string, tags map[string]string) (*Measurement, *Series) { func (t *TagIndex) MeasurementAndSeries(name string, tags map[string]string) (*Measurement, *Series) {
t.mu.RLock() t.mu.RLock()
defer t.mu.RUnlock() defer t.mu.RUnlock()
@ -130,10 +239,12 @@ func (t *TagIndex) MeasurementAndSeries(name string, tags map[string]string) (*M
return idx.measurement, idx.seriesByTags(tags) return idx.measurement, idx.seriesByTags(tags)
} }
// SereiesByID returns the Series that has the given id.
func (t *TagIndex) SeriesByID(id uint32) *Series { func (t *TagIndex) SeriesByID(id uint32) *Series {
return t.series[id] return t.series[id]
} }
// Measurements returns all measurements that match the given filters.
func (t *TagIndex) Measurements(filters []*Filter) []*Measurement { func (t *TagIndex) Measurements(filters []*Filter) []*Measurement {
measurements := make([]*Measurement, 0, len(t.measurementIndex)) measurements := make([]*Measurement, 0, len(t.measurementIndex))
for _, idx := range t.measurementIndex { for _, idx := range t.measurementIndex {
@ -142,10 +253,12 @@ func (t *TagIndex) Measurements(filters []*Filter) []*Measurement {
return measurements return measurements
} }
// DropSeries will clear the index of all references to a series.
func (t *TagIndex) DropSeries(id uint32) { func (t *TagIndex) DropSeries(id uint32) {
panic("not implemented") panic("not implemented")
} }
// DropMeasurement will clear the index of all references to a measurement and its child series.
func (t *TagIndex) DropMeasurement(name string) { func (t *TagIndex) DropMeasurement(name string) {
panic("not implemented") panic("not implemented")
} }

View File

@ -143,7 +143,54 @@ func TestTagIndex_SeriesIDs(t *testing.T) {
} }
func TestTagIndex_SeriesIDsWhereFilter(t *testing.T) { func TestTagIndex_SeriesIDsWhereFilter(t *testing.T) {
t.Skip("pending") idx := indexWithFixtureData()
var tests = []struct {
names []string
filters []*influxdb.Filter
result []uint32
}{
// match against no tags
{
names: []string{"cpu_load", "redis"},
result: []uint32{uint32(1), uint32(2), uint32(3), uint32(4), uint32(5)},
},
// match against all tags
{
names: []string{"cpu_load"},
filters: []*influxdb.Filter{
&influxdb.Filter{Key: "host", Value: "servera.influx.com"},
&influxdb.Filter{Key: "region", Value: "uswest"},
},
result: []uint32{uint32(1)},
},
// match against one tag
{
names: []string{"cpu_load"},
filters: []*influxdb.Filter{
&influxdb.Filter{Key: "region", Value: "uswest"},
},
result: []uint32{uint32(1), uint32(2)},
},
// partial match against one tag
// partial match against two tags
// query against tag key that doesn't exist returns empty
// query against tag value that doesn't exist returns empty
}
for i, tt := range tests {
r := idx.SeriesIDs(tt.names, tt.filters)
expectedIDs := influxdb.SeriesIDs(tt.result)
if !r.Equals(expectedIDs) {
t.Fatalf("%d: filters: %s: result mismatch:\n exp=%s\n got=%s", i, influxdb.Filters(tt.filters), expectedIDs, r)
}
}
} }
func TestTagIndex_SeriesIDsWhereFilterMultiple(t *testing.T) { func TestTagIndex_SeriesIDsWhereFilterMultiple(t *testing.T) {
@ -186,6 +233,10 @@ func TestTagIndex_TagValuesWhereFilterAndNot(t *testing.T) {
t.Skip("pending") t.Skip("pending")
} }
func TestTagIndex_MeasurementsWhereFilter(t *testing.T) {
t.Skip("pending")
}
func TestTagIndex_DropSeries(t *testing.T) { func TestTagIndex_DropSeries(t *testing.T) {
t.Skip("pending") t.Skip("pending")
} }
@ -193,3 +244,54 @@ func TestTagIndex_DropSeries(t *testing.T) {
func TestTagIndex_DropMeasurement(t *testing.T) { func TestTagIndex_DropMeasurement(t *testing.T) {
t.Skip("pending") t.Skip("pending")
} }
// indexWithFixtureData returns a populated TagIndex for use in many of the filtering tests
func indexWithFixtureData() *influxdb.TagIndex {
idx := influxdb.NewTagIndex()
s := &influxdb.Series{
ID: uint32(1),
Tags: map[string]string{"host": "servera.influx.com", "region": "uswest"}}
added := idx.AddSeries("cpu_load", s)
if !added {
return nil
}
s = &influxdb.Series{
ID: uint32(2),
Tags: map[string]string{"host": "serverb.influx.com", "region": "uswest"}}
added = idx.AddSeries("cpu_load", s)
if !added {
return nil
}
s = &influxdb.Series{
ID: uint32(3),
Tags: map[string]string{"host": "serverc.influx.com", "region": "uswest", "service": "redis"}}
added = idx.AddSeries("key_count", s)
if !added {
return nil
}
s = &influxdb.Series{
ID: uint32(4),
Tags: map[string]string{"host": "serverd.influx.com", "region": "useast", "service": "redis"}}
added = idx.AddSeries("key_count", s)
if !added {
return nil
}
s = &influxdb.Series{
ID: uint32(5),
Tags: map[string]string{"name": "high priority"}}
added = idx.AddSeries("queue_depth", s)
if !added {
return nil
}
return idx
}