diff --git a/index.go b/index.go index 9316cd89db..cdaade4c05 100644 --- a/index.go +++ b/index.go @@ -11,15 +11,15 @@ import ( // and series within a database. type Index struct { mu sync.RWMutex - measurementIndex map[string]*measurementIndex // map measurement name to its tag index - seriesToMeasurement map[uint32]*Measurement // map series id to its measurement - series map[uint32]*Series // map series id to the Series object - names []string // sorted list of the measurement names + measurements map[string]*Measurement // measurement name to object and index + seriesToMeasurement map[uint32]*Measurement // map series id to its measurement + series map[uint32]*Series // map series id to the Series object + names []string // sorted list of the measurement names } func NewIndex() *Index { return &Index{ - measurementIndex: make(map[string]*measurementIndex), + measurements: make(map[string]*Measurement), seriesToMeasurement: make(map[uint32]*Measurement), series: make(map[uint32]*Series), names: make([]string, 0), @@ -144,101 +144,6 @@ func (l SeriesIDs) Reject(r SeriesIDs) SeriesIDs { return SeriesIDs(ids) } -// Keeps a mapping of the series in a measurement -type measurementIndex struct { - series map[string]*Series // sorted tag string to the series object - measurement *Measurement - seriesByTagset map[string]map[string]SeriesIDs // map from tag key to value to sorted set of series ids - ids SeriesIDs // sorted list of series IDs in this measurement -} - -// addSeries will add a series to the measurementIndex. Returns false if already present -func (m *measurementIndex) addSeries(s *Series) bool { - tagset := string(marshalTags(s.Tags)) - if _, ok := m.series[tagset]; ok { - return false - } - m.series[tagset] = s - m.ids = append(m.ids, s.ID) - // the series ID should always be higher than all others because it's a new - // series. So don't do the sort if we don't have to. - if len(m.ids) > 1 && m.ids[len(m.ids)-1] < m.ids[len(m.ids)-2] { - sort.Sort(m.ids) - } - - // add this series id to the tag index on the measurement - for k, v := range s.Tags { - valueMap := m.seriesByTagset[k] - if valueMap == nil { - valueMap = make(map[string]SeriesIDs) - m.seriesByTagset[k] = valueMap - } - ids := valueMap[v] - ids = append(ids, s.ID) - - // most of the time the series ID will be higher than all others because it's a new - // series. So don't do the sort if we don't have to. - if len(ids) > 1 && ids[len(ids)-1] < ids[len(ids)-2] { - sort.Sort(ids) - } - valueMap[v] = ids - } - - return true -} - -// seriesByTags returns the Series that matches the given tagset. -func (m *measurementIndex) seriesByTags(tags map[string]string) *Series { - return m.series[string(marshalTags(tags))] -} - -// sereisIDs returns the series ids for a given filter -func (m measurementIndex) seriesIDs(filter *Filter) (ids SeriesIDs) { - values := m.seriesByTagset[filter.Key] - if values == nil { - return - } - - // hanlde regex filters - if filter.Regex != nil { - for k, v := range values { - if filter.Regex.MatchString(k) { - if ids == nil { - ids = v - } else { - ids = ids.Union(v) - } - } - } - if filter.Not { - ids = m.ids.Reject(ids) - } - return - } - - // this is for the value is not null query - if filter.Not && filter.Value == "" { - for _, v := range values { - if ids == nil { - ids = v - } else { - ids.Intersect(v) - } - } - return - } - - // get the ids that have the given key/value tag pair - ids = SeriesIDs(values[filter.Value]) - - // filter out these ids from the entire set if it's a not query - if filter.Not { - ids = m.ids.Reject(ids) - } - - return -} - // AddSeries adds the series for the given measurement to the index. Returns false if already present func (t *Index) AddSeries(name string, s *Series) bool { t.mu.Lock() @@ -249,27 +154,27 @@ func (t *Index) AddSeries(name string, s *Series) bool { return false } - // get or create the measurement index - idx := t.measurementIndex[name] - if idx == nil { - idx = &measurementIndex{ - series: make(map[string]*Series), - measurement: NewMeasurement(name), - seriesByTagset: make(map[string]map[string]SeriesIDs), - ids: SeriesIDs(make([]uint32, 0)), - } - t.measurementIndex[name] = idx - t.names = append(t.names, name) - sort.Strings(t.names) - } - idx.measurement.Series = append(idx.measurement.Series, s) - t.seriesToMeasurement[s.ID] = idx.measurement + // get or create the measurement index and index it globally and in the measurement + idx := t.createMeasurementIfNotExists(name) + + t.seriesToMeasurement[s.ID] = idx t.series[s.ID] = s // TODO: add this series to the global tag index - b := idx.addSeries(s) - return b + return idx.addSeries(s) +} + +// createMeasurementIfNotExists will either add a measurement object to the index or return the existing one. +func (t *Index) createMeasurementIfNotExists(name string) *Measurement { + idx := t.measurements[name] + if idx == nil { + idx = NewMeasurement(name) + t.measurements[name] = idx + t.names = append(t.names, name) + sort.Strings(t.names) + } + return idx } // AddField adds a field to the measurement name. Returns false if already present @@ -288,7 +193,7 @@ func (t *Index) SeriesIDs(names []string, filters Filters) SeriesIDs { // they want all ids if no filters are specified if len(filters) == 0 { ids := SeriesIDs(make([]uint32, 0)) - for _, idx := range t.measurementIndex { + for _, idx := range t.measurements { ids = ids.Union(idx.ids) } return ids @@ -313,9 +218,9 @@ func (t *Index) TagKeys(names []string) []string { keys := make(map[string]bool) for _, n := range names { - idx := t.measurementIndex[n] + idx := t.measurements[n] if idx != nil { - for k, _ := range idx.seriesByTagset { + for k, _ := range idx.seriesByTagKeyValue { keys[k] = true } } @@ -332,7 +237,7 @@ func (t *Index) TagKeys(names []string) []string { //seriesIDsForName is the same as SeriesIDs, but for a specific measurement. func (t *Index) seriesIDsForName(name string, filters Filters) SeriesIDs { - idx := t.measurementIndex[name] + idx := t.measurements[name] if idx == nil { return nil } @@ -363,11 +268,11 @@ func (t *Index) MeasurementBySeriesID(id uint32) *Measurement { func (t *Index) MeasurementAndSeries(name string, tags map[string]string) (*Measurement, *Series) { t.mu.RLock() defer t.mu.RUnlock() - idx := t.measurementIndex[name] + idx := t.measurements[name] if idx == nil { return nil, nil } - return idx.measurement, idx.seriesByTags(tags) + return idx, idx.seriesByTags(tags) } // SereiesByID returns the Series that has the given id. @@ -377,8 +282,8 @@ func (t *Index) SeriesByID(id uint32) *Series { // Measurements returns all measurements that match the given filters. func (t *Index) Measurements(filters []*Filter) []*Measurement { - measurements := make([]*Measurement, 0, len(t.measurementIndex)) - for _, idx := range t.measurementIndex { + measurements := make([]*Measurement, 0, len(t.measurements)) + for _, idx := range t.measurements { measurements = append(measurements, idx.measurement) } return measurements diff --git a/index_test.go b/index_test.go index a644410eab..2d16e770e3 100644 --- a/index_test.go +++ b/index_test.go @@ -24,23 +24,23 @@ func TestIndex_MeasurementBySeriesID(t *testing.T) { idx := influxdb.NewIndex() m := &influxdb.Measurement{ Name: "cpu_load", - Series: []*influxdb.Series{ - &influxdb.Series{ - ID: uint32(1), - Tags: map[string]string{"host": "servera.influx.com", "region": "uswest"}}}} + } + s := &influxdb.Series{ + ID: uint32(1), + Tags: map[string]string{"host": "servera.influx.com", "region": "uswest"}, + } // add it and see if we can look it up - idx.AddSeries(m.Name, m.Series[0]) + idx.AddSeries(m.Name, s) mm := idx.MeasurementBySeriesID(uint32(1)) if mustMarshalJSON(m) != mustMarshalJSON(mm) { t.Fatalf("mesurement not equal:\n%s\n%s", m, mm) } // now test that we can add another - s := &influxdb.Series{ + s = &influxdb.Series{ ID: uint32(2), Tags: map[string]string{"host": "serverb.influx.com", "region": "uswest"}} - m.Series = append(m.Series, s) idx.AddSeries(m.Name, s) mm = idx.MeasurementBySeriesID(uint32(2)) @@ -80,11 +80,11 @@ func TestIndex_MeasurementAndSeries(t *testing.T) { idx := influxdb.NewIndex() m := &influxdb.Measurement{ Name: "cpu_load", - Series: []*influxdb.Series{ - &influxdb.Series{ - ID: uint32(1), - Tags: map[string]string{"host": "servera.influx.com", "region": "uswest"}}}} - s := m.Series[0] + } + s := &influxdb.Series{ + ID: uint32(1), + Tags: map[string]string{"host": "servera.influx.com", "region": "uswest"}, + } // add it and see if we can look it up by name and tags idx.AddSeries(m.Name, s) @@ -99,7 +99,6 @@ func TestIndex_MeasurementAndSeries(t *testing.T) { s = &influxdb.Series{ ID: uint32(2), Tags: map[string]string{"host": "serverb.influx.com", "region": "uswest"}} - m.Series = append(m.Series, s) idx.AddSeries(m.Name, s) mm, ss = idx.MeasurementAndSeries(m.Name, s.Tags) diff --git a/metastore.go b/metastore.go index d6ff74d0c4..cd81199203 100644 --- a/metastore.go +++ b/metastore.go @@ -157,24 +157,26 @@ func (tx *metatx) createSeries(database, name string, tags map[string]string) (* return s, nil } -// series returns all the measurements and series in a database -func (tx *metatx) measurements(database string) []*Measurement { +// loops through all the measurements and series in a database +func (tx *metatx) measurementIndex(database string) *Index { // get the bucket that holds series data for the database b := tx.Bucket([]byte("Databases")).Bucket([]byte(database)).Bucket([]byte("Series")) - - measurements := make([]*Measurement, 0) c := b.Cursor() + + // create the index and populate it from the series data + idx := NewIndex() + for k, _ := c.First(); k != nil; k, _ = c.Next() { mc := b.Bucket(k).Cursor() - m := &Measurement{Name: string(k), Series: make([]*Series, 0)} + name := string(k) for id, v := mc.First(); id != nil; id, v = mc.Next() { var s *Series mustUnmarshalJSON(v, &s) - m.Series = append(m.Series, s) + idx.AddSeries(name, s) } - measurements = append(measurements, m) } - return measurements + + return idx } // user returns a user from the metastore by name. diff --git a/server.go b/server.go index fa0fcacacc..e384772067 100644 --- a/server.go +++ b/server.go @@ -192,21 +192,15 @@ func (s *Server) load() error { } for db := range s.databases { log.Printf("Loading metadata index for %d\n", db) - var measurements []*Measurement + var idx *Index err := s.meta.view(func(tx *metatx) error { - measurements = tx.measurements(db) + idx = tx.measurementIndex(db) return nil }) if err != nil { return err } - idx := NewIndex() s.metaIndexes[db] = idx - for _, m := range measurements { - for _, ss := range m.Series { - idx.AddSeries(m.Name, ss) - } - } } return nil } @@ -962,12 +956,20 @@ func (s *Server) createSeriesIfNotExists(database, name string, tags map[string] return series.ID, nil } -func (s *Server) Measurements(database string) (a Measurements) { +func (s *Server) MeasurementNames(database string) []string { s.mu.RLock() idx := s.metaIndexes[database] s.mu.RUnlock() - return idx.Measurements(nil) + return idx.names +} + +func (s *Server) MeasurementSeriesIDs(database, measurement string) SeriesIDs { + s.mu.RLock() + idx := s.metaIndexes[database] + s.mu.RUnlock() + + return idx.SeriesIDs([]string{measurement}, nil) } // processor runs in a separate goroutine and processes all incoming broker messages. @@ -1124,18 +1126,113 @@ type databaseJSON struct { // Measurement represents a collection of time series in a database type Measurement struct { Name string `json:"name,omitempty"` - Series []*Series `json:"series,omitempty"` Fields []*Fields `json:"fields,omitempty"` + + // in memory index fields + series map[string]*Series // sorted tagset string to the series object + measurement *Measurement + seriesByTagKeyValue map[string]map[string]SeriesIDs // map from tag key to value to sorted set of series ids + ids SeriesIDs // sorted list of series IDs in this measurement } func NewMeasurement(name string) *Measurement { return &Measurement{ Name: name, - Series: make([]*Series, 0), Fields: make([]*Fields, 0), + + series: make(map[string]*Series), + seriesByTagKeyValue: make(map[string]map[string]SeriesIDs), + ids: SeriesIDs(make([]uint32, 0)), } } +// addSeries will add a series to the measurementIndex. Returns false if already present +func (m *Measurement) addSeries(s *Series) bool { + tagset := string(marshalTags(s.Tags)) + if _, ok := m.series[tagset]; ok { + return false + } + m.series[tagset] = s + m.ids = append(m.ids, s.ID) + // the series ID should always be higher than all others because it's a new + // series. So don't do the sort if we don't have to. + if len(m.ids) > 1 && m.ids[len(m.ids)-1] < m.ids[len(m.ids)-2] { + sort.Sort(m.ids) + } + + // add this series id to the tag index on the measurement + for k, v := range s.Tags { + valueMap := m.seriesByTagKeyValue[k] + if valueMap == nil { + valueMap = make(map[string]SeriesIDs) + m.seriesByTagKeyValue[k] = valueMap + } + ids := valueMap[v] + ids = append(ids, s.ID) + + // most of the time the series ID will be higher than all others because it's a new + // series. So don't do the sort if we don't have to. + if len(ids) > 1 && ids[len(ids)-1] < ids[len(ids)-2] { + sort.Sort(ids) + } + valueMap[v] = ids + } + + return true +} + +// seriesByTags returns the Series that matches the given tagset. +func (m *Measurement) seriesByTags(tags map[string]string) *Series { + return m.series[string(marshalTags(tags))] +} + +// sereisIDs returns the series ids for a given filter +func (m *Measurement) seriesIDs(filter *Filter) (ids SeriesIDs) { + values := m.seriesByTagKeyValue[filter.Key] + if values == nil { + return + } + + // hanlde regex filters + if filter.Regex != nil { + for k, v := range values { + if filter.Regex.MatchString(k) { + if ids == nil { + ids = v + } else { + ids = ids.Union(v) + } + } + } + if filter.Not { + ids = m.ids.Reject(ids) + } + return + } + + // this is for the value is not null query + if filter.Not && filter.Value == "" { + for _, v := range values { + if ids == nil { + ids = v + } else { + ids.Intersect(v) + } + } + return + } + + // get the ids that have the given key/value tag pair + ids = SeriesIDs(values[filter.Value]) + + // filter out these ids from the entire set if it's a not query + if filter.Not { + ids = m.ids.Reject(ids) + } + + return +} + type Measurements []*Measurement func (m Measurement) String() string { return string(mustMarshalJSON(m)) } diff --git a/server_test.go b/server_test.go index 8d5e734a80..a51276c14a 100644 --- a/server_test.go +++ b/server_test.go @@ -429,22 +429,26 @@ func TestServer_Measurements(t *testing.T) { t.Fatal(err) } - r := s.Measurements("foo") - m := []*influxdb.Measurement{ - &influxdb.Measurement{ - Name: "cpu_load", - Series: []*influxdb.Series{ - &influxdb.Series{ - ID: uint32(1), - Tags: map[string]string{"host": "servera.influx.com", "region": "uswest"}}}}} - if !measurementsEqual(r, m) { - t.Fatalf("Mesurements not the same:\n%s\n%s", r, m) + expectedMeasurementNames := []string{"foo"} + expectedSeriesIDs := influxdb.SeriesIDs([]uint32{uint32(1)}) + names := s.MeasurementNames("foo") + if !reflect.DeepEqual(names, expectedMeasurementNames) { + t.Fatalf("Mesurements not the same:\n exp: %s\n got: %s", expectedMeasurementNames, names) + } + ids := s.MeasurementSeriesIDs("foo", "foo") + if !ids.Equals(expectedSeriesIDs) { + t.Fatalf("Series IDs not the same:\n exp: %s\n got: %s", expectedSeriesIDs, ids) } s.Restart() - r = s.Measurements("foo") - if !measurementsEqual(r, m) { - t.Fatalf("Mesurements not the same:\n%s\n%s", r, m) + + names = s.MeasurementNames("foo") + if !reflect.DeepEqual(names, expectedMeasurementNames) { + t.Fatalf("Mesurements not the same:\n exp: %s\n got: %s", expectedMeasurementNames, names) + } + ids = s.MeasurementSeriesIDs("foo", "foo") + if !ids.Equals(expectedSeriesIDs) { + t.Fatalf("Series IDs not the same:\n exp: %s\n got: %s", expectedSeriesIDs, ids) } }