From 9fb04a2636f6bf7e1030c0595dcce3ffcfb029c8 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Wed, 24 Dec 2014 18:44:53 -0500 Subject: [PATCH] Add per measurement tag index. * Add tag index inside the measurement index * Add initial table tests for getting series ids by filters * Add comments for some existing methods --- tag_index.go | 133 ++++++++++++++++++++++++++++++++++++++++++---- tag_index_test.go | 104 +++++++++++++++++++++++++++++++++++- 2 files changed, 226 insertions(+), 11 deletions(-) diff --git a/tag_index.go b/tag_index.go index b835182c71..ae0c443c86 100644 --- a/tag_index.go +++ b/tag_index.go @@ -32,18 +32,26 @@ type Filter struct { Regex regexp.Regexp } +type Filters []*Filter + +func (f Filters) String() string { + return string(mustMarshalJSON(f)) +} + +// SeriesIDs is a convenience type for sorting, checking equality, and doing union and +// intersection of collections of series ids. type SeriesIDs []uint32 func (p SeriesIDs) Len() int { return len(p) } func (p SeriesIDs) Less(i, j int) bool { return p[i] < p[j] } func (p SeriesIDs) Swap(i, j int) { p[i], p[j] = p[j], p[i] } func (p SeriesIDs) Sort() { sort.Sort(p) } + +// Equals assumes that both are sorted. This is by design, no touchy! func (p SeriesIDs) Equals(s SeriesIDs) bool { if len(p) != len(s) { return false } - p.Sort() - s.Sort() for i, pp := range p { if s[i] != pp { return false @@ -52,10 +60,48 @@ func (p SeriesIDs) Equals(s SeriesIDs) bool { return true } +// Intersect returns a new collection of series ids that is the intersection of the two. +// The two collections must already be sorted. +func (s SeriesIDs) Intersect(a SeriesIDs) SeriesIDs { + l := s + r := a + + // we want to iterate through the shortest one and stop + if len(a) < len(s) { + l = a + r = s + } + + // they're in sorted order so advance the counter as needed. 
+ // That is, don't run comparisons against lower values that we've already passed + var i, j int + + ids := make([]uint32, 0, len(l)) + for i < len(l) { + if l[i] == r[j] { + ids = append(ids, l[i]) + i += 1 + j += 1 + } else if l[i] < r[j] { + i += 1 + } else { + j += 1 + } + } + + return SeriesIDs(ids) +} + +// Convenience method to output something during tests +func (s SeriesIDs) String() string { + return string(mustMarshalJSON(s)) +} + // Keeps a mapping of the series in a measurement type measurementIndex struct { - series map[string]*Series // sorted tag string to the series object - measurement *Measurement + series map[string]*Series // sorted tag string to the series object + measurement *Measurement + tagsToSeries map[string]map[string]SeriesIDs // map from tag key to value to sorted set of series ids } // addSeries will add a series to the measurementIndex. Returns false if already present @@ -68,6 +114,7 @@ func (m measurementIndex) addSeries(s *Series) bool { return true } +// seriesByTags returns the Series that matches the given tagset. 
func (m measurementIndex) seriesByTags(tags map[string]string) *Series { return m.series[string(tagsToBytes(tags))] } @@ -85,41 +132,103 @@ func (t *TagIndex) AddSeries(name string, s *Series) bool { // get or create the measurement index idx := t.measurementIndex[name] if idx == nil { - idx = &measurementIndex{series: make(map[string]*Series), measurement: NewMeasurement(name)} + idx = &measurementIndex{ + series: make(map[string]*Series), + measurement: NewMeasurement(name), + tagsToSeries: make(map[string]map[string]SeriesIDs), + } t.measurementIndex[name] = idx } idx.measurement.Series = append(idx.measurement.Series, s) t.seriesToMeasurement[s.ID] = idx.measurement t.series[s.ID] = s + // add this series id to the tag index on the measurement + for k, v := range s.Tags { + valueMap := idx.tagsToSeries[k] + if valueMap == nil { + valueMap = make(map[string]SeriesIDs) + idx.tagsToSeries[k] = valueMap + } + ids := valueMap[v] + if ids == nil { + ids = make([]uint32, 0) + } + ids = append(ids, s.ID) + ids.Sort() + valueMap[v] = ids + } + + // TODO: add this series to the global tag index + return idx.addSeries(s) } // AddField adds a field to the measurement name. 
Returns false if already present func (t *TagIndex) AddField(name string, f *Field) bool { + panic("not implemented") return false } // SeriesIDs returns an array of series ids for the given measurements and filters to be applied to all -func (t *TagIndex) SeriesIDs(names []string, filters []*Filter) SeriesIDs { +func (t *TagIndex) SeriesIDs(names []string, filters Filters) SeriesIDs { t.mu.RLock() defer t.mu.RUnlock() - ids := make([]uint32, 0) - for _, idx := range t.measurementIndex { - for _, s := range idx.series { - ids = append(ids, s.ID) + // they want all ids if no filters are specified + if len(filters) == 0 { + ids := SeriesIDs(make([]uint32, 0)) + for _, idx := range t.measurementIndex { + for _, s := range idx.series { + ids = append(ids, s.ID) + } } + ids.Sort() + return ids } + + ids := SeriesIDs(make([]uint32, 0)) + for _, n := range names { + ids = append(ids, t.seriesIDsForName(n, filters)...) + } + ids.Sort() return ids } +func (t *TagIndex) seriesIDsForName(name string, filters Filters) SeriesIDs { + idx := t.measurementIndex[name] + if idx == nil { + return nil + } + + // process the filters one at a time to get the list of ids they return + idsPerFilter := make([]SeriesIDs, len(filters), len(filters)) + for i, filter := range filters { + var ids SeriesIDs + values := idx.tagsToSeries[filter.Key] + if values != nil { + ids = SeriesIDs(values[filter.Value]) + } + idsPerFilter[i] = ids + } + + // collapse the set of ids + allIDs := idsPerFilter[0] + for i := 1; i < len(filters); i++ { + allIDs = allIDs.Intersect(idsPerFilter[i]) + } + + return allIDs +} + +// MeasurementBySeriesID returns the Measurement that is the parent of the given series id. func (t *TagIndex) MeasurementBySeriesID(id uint32) *Measurement { t.mu.RLock() defer t.mu.RUnlock() return t.seriesToMeasurement[id] } +// MeasurementAndSeries returns the Measurement and the Series for a given measurement name and tag set. 
func (t *TagIndex) MeasurementAndSeries(name string, tags map[string]string) (*Measurement, *Series) { t.mu.RLock() defer t.mu.RUnlock() @@ -130,10 +239,12 @@ func (t *TagIndex) MeasurementAndSeries(name string, tags map[string]string) (*M return idx.measurement, idx.seriesByTags(tags) } +// SeriesByID returns the Series that has the given id. func (t *TagIndex) SeriesByID(id uint32) *Series { return t.series[id] } +// Measurements returns all measurements that match the given filters. func (t *TagIndex) Measurements(filters []*Filter) []*Measurement { measurements := make([]*Measurement, 0, len(t.measurementIndex)) for _, idx := range t.measurementIndex { @@ -142,10 +253,12 @@ func (t *TagIndex) Measurements(filters []*Filter) []*Measurement { return measurements } +// DropSeries will clear the index of all references to a series. func (t *TagIndex) DropSeries(id uint32) { panic("not implemented") } +// DropMeasurement will clear the index of all references to a measurement and its child series. 
func (t *TagIndex) DropMeasurement(name string) { panic("not implemented") } diff --git a/tag_index_test.go b/tag_index_test.go index 5dfa3a70ec..49488b5291 100644 --- a/tag_index_test.go +++ b/tag_index_test.go @@ -143,7 +143,54 @@ func TestTagIndex_SeriesIDs(t *testing.T) { } func TestTagIndex_SeriesIDsWhereFilter(t *testing.T) { - t.Skip("pending") + idx := indexWithFixtureData() + + var tests = []struct { + names []string + filters []*influxdb.Filter + result []uint32 + }{ + // match against no tags + { + names: []string{"cpu_load", "redis"}, + result: []uint32{uint32(1), uint32(2), uint32(3), uint32(4), uint32(5)}, + }, + + // match against all tags + { + names: []string{"cpu_load"}, + filters: []*influxdb.Filter{ + &influxdb.Filter{Key: "host", Value: "servera.influx.com"}, + &influxdb.Filter{Key: "region", Value: "uswest"}, + }, + result: []uint32{uint32(1)}, + }, + + // match against one tag + { + names: []string{"cpu_load"}, + filters: []*influxdb.Filter{ + &influxdb.Filter{Key: "region", Value: "uswest"}, + }, + result: []uint32{uint32(1), uint32(2)}, + }, + + // partial match against one tag + + // partial match against two tags + + // query against tag key that doesn't exist returns empty + + // query against tag value that doesn't exist returns empty + } + + for i, tt := range tests { + r := idx.SeriesIDs(tt.names, tt.filters) + expectedIDs := influxdb.SeriesIDs(tt.result) + if !r.Equals(expectedIDs) { + t.Fatalf("%d: filters: %s: result mismatch:\n exp=%s\n got=%s", i, influxdb.Filters(tt.filters), expectedIDs, r) + } + } } func TestTagIndex_SeriesIDsWhereFilterMultiple(t *testing.T) { @@ -186,6 +233,10 @@ func TestTagIndex_TagValuesWhereFilterAndNot(t *testing.T) { t.Skip("pending") } +func TestTagIndex_MeasurementsWhereFilter(t *testing.T) { + t.Skip("pending") +} + func TestTagIndex_DropSeries(t *testing.T) { t.Skip("pending") } @@ -193,3 +244,54 @@ func TestTagIndex_DropSeries(t *testing.T) { func TestTagIndex_DropMeasurement(t *testing.T) { 
t.Skip("pending") } + +// indexWithFixtureData returns a populated TagIndex for use in many of the filtering tests +func indexWithFixtureData() *influxdb.TagIndex { + idx := influxdb.NewTagIndex() + s := &influxdb.Series{ + ID: uint32(1), + Tags: map[string]string{"host": "servera.influx.com", "region": "uswest"}} + + added := idx.AddSeries("cpu_load", s) + if !added { + return nil + } + + s = &influxdb.Series{ + ID: uint32(2), + Tags: map[string]string{"host": "serverb.influx.com", "region": "uswest"}} + + added = idx.AddSeries("cpu_load", s) + if !added { + return nil + } + + s = &influxdb.Series{ + ID: uint32(3), + Tags: map[string]string{"host": "serverc.influx.com", "region": "uswest", "service": "redis"}} + + added = idx.AddSeries("key_count", s) + if !added { + return nil + } + + s = &influxdb.Series{ + ID: uint32(4), + Tags: map[string]string{"host": "serverd.influx.com", "region": "useast", "service": "redis"}} + + added = idx.AddSeries("key_count", s) + if !added { + return nil + } + + s = &influxdb.Series{ + ID: uint32(5), + Tags: map[string]string{"name": "high priority"}} + + added = idx.AddSeries("queue_depth", s) + if !added { + return nil + } + + return idx +}