Refactor measurementIndex into the measurement object.

* Updated megastore to build the per database index for the server object
* Changed Server to have MeasurementNames and MeasurementSeriesIDs methods instead of generic Measurement method.
pull/1264/head
Paul Dix 2014-12-30 16:45:58 -05:00
parent 4716e0c03d
commit f763060b68
5 changed files with 178 additions and 171 deletions

155
index.go
View File

@ -11,15 +11,15 @@ import (
// and series within a database.
type Index struct {
mu sync.RWMutex
measurementIndex map[string]*measurementIndex // map measurement name to its tag index
seriesToMeasurement map[uint32]*Measurement // map series id to its measurement
series map[uint32]*Series // map series id to the Series object
names []string // sorted list of the measurement names
measurements map[string]*Measurement // measurement name to object and index
seriesToMeasurement map[uint32]*Measurement // map series id to its measurement
series map[uint32]*Series // map series id to the Series object
names []string // sorted list of the measurement names
}
func NewIndex() *Index {
return &Index{
measurementIndex: make(map[string]*measurementIndex),
measurements: make(map[string]*Measurement),
seriesToMeasurement: make(map[uint32]*Measurement),
series: make(map[uint32]*Series),
names: make([]string, 0),
@ -144,101 +144,6 @@ func (l SeriesIDs) Reject(r SeriesIDs) SeriesIDs {
return SeriesIDs(ids)
}
// Keeps a mapping of the series in a measurement
type measurementIndex struct {
series map[string]*Series // sorted tag string to the series object
measurement *Measurement
seriesByTagset map[string]map[string]SeriesIDs // map from tag key to value to sorted set of series ids
ids SeriesIDs // sorted list of series IDs in this measurement
}
// addSeries will add a series to the measurementIndex. Returns false if already present
func (m *measurementIndex) addSeries(s *Series) bool {
tagset := string(marshalTags(s.Tags))
if _, ok := m.series[tagset]; ok {
return false
}
m.series[tagset] = s
m.ids = append(m.ids, s.ID)
// the series ID should always be higher than all others because it's a new
// series. So don't do the sort if we don't have to.
if len(m.ids) > 1 && m.ids[len(m.ids)-1] < m.ids[len(m.ids)-2] {
sort.Sort(m.ids)
}
// add this series id to the tag index on the measurement
for k, v := range s.Tags {
valueMap := m.seriesByTagset[k]
if valueMap == nil {
valueMap = make(map[string]SeriesIDs)
m.seriesByTagset[k] = valueMap
}
ids := valueMap[v]
ids = append(ids, s.ID)
// most of the time the series ID will be higher than all others because it's a new
// series. So don't do the sort if we don't have to.
if len(ids) > 1 && ids[len(ids)-1] < ids[len(ids)-2] {
sort.Sort(ids)
}
valueMap[v] = ids
}
return true
}
// seriesByTags returns the Series that matches the given tagset.
func (m *measurementIndex) seriesByTags(tags map[string]string) *Series {
return m.series[string(marshalTags(tags))]
}
// sereisIDs returns the series ids for a given filter
func (m measurementIndex) seriesIDs(filter *Filter) (ids SeriesIDs) {
values := m.seriesByTagset[filter.Key]
if values == nil {
return
}
// hanlde regex filters
if filter.Regex != nil {
for k, v := range values {
if filter.Regex.MatchString(k) {
if ids == nil {
ids = v
} else {
ids = ids.Union(v)
}
}
}
if filter.Not {
ids = m.ids.Reject(ids)
}
return
}
// this is for the value is not null query
if filter.Not && filter.Value == "" {
for _, v := range values {
if ids == nil {
ids = v
} else {
ids.Intersect(v)
}
}
return
}
// get the ids that have the given key/value tag pair
ids = SeriesIDs(values[filter.Value])
// filter out these ids from the entire set if it's a not query
if filter.Not {
ids = m.ids.Reject(ids)
}
return
}
// AddSeries adds the series for the given measurement to the index. Returns false if already present
func (t *Index) AddSeries(name string, s *Series) bool {
t.mu.Lock()
@ -249,27 +154,27 @@ func (t *Index) AddSeries(name string, s *Series) bool {
return false
}
// get or create the measurement index
idx := t.measurementIndex[name]
if idx == nil {
idx = &measurementIndex{
series: make(map[string]*Series),
measurement: NewMeasurement(name),
seriesByTagset: make(map[string]map[string]SeriesIDs),
ids: SeriesIDs(make([]uint32, 0)),
}
t.measurementIndex[name] = idx
t.names = append(t.names, name)
sort.Strings(t.names)
}
idx.measurement.Series = append(idx.measurement.Series, s)
t.seriesToMeasurement[s.ID] = idx.measurement
// get or create the measurement index and index it globally and in the measurement
idx := t.createMeasurementIfNotExists(name)
t.seriesToMeasurement[s.ID] = idx
t.series[s.ID] = s
// TODO: add this series to the global tag index
b := idx.addSeries(s)
return b
return idx.addSeries(s)
}
// createMeasurementIfNotExists will either add a measurement object to the index or return the existing one.
func (t *Index) createMeasurementIfNotExists(name string) *Measurement {
idx := t.measurements[name]
if idx == nil {
idx = NewMeasurement(name)
t.measurements[name] = idx
t.names = append(t.names, name)
sort.Strings(t.names)
}
return idx
}
// AddField adds a field to the measurement name. Returns false if already present
@ -288,7 +193,7 @@ func (t *Index) SeriesIDs(names []string, filters Filters) SeriesIDs {
// they want all ids if no filters are specified
if len(filters) == 0 {
ids := SeriesIDs(make([]uint32, 0))
for _, idx := range t.measurementIndex {
for _, idx := range t.measurements {
ids = ids.Union(idx.ids)
}
return ids
@ -313,9 +218,9 @@ func (t *Index) TagKeys(names []string) []string {
keys := make(map[string]bool)
for _, n := range names {
idx := t.measurementIndex[n]
idx := t.measurements[n]
if idx != nil {
for k, _ := range idx.seriesByTagset {
for k, _ := range idx.seriesByTagKeyValue {
keys[k] = true
}
}
@ -332,7 +237,7 @@ func (t *Index) TagKeys(names []string) []string {
//seriesIDsForName is the same as SeriesIDs, but for a specific measurement.
func (t *Index) seriesIDsForName(name string, filters Filters) SeriesIDs {
idx := t.measurementIndex[name]
idx := t.measurements[name]
if idx == nil {
return nil
}
@ -363,11 +268,11 @@ func (t *Index) MeasurementBySeriesID(id uint32) *Measurement {
func (t *Index) MeasurementAndSeries(name string, tags map[string]string) (*Measurement, *Series) {
t.mu.RLock()
defer t.mu.RUnlock()
idx := t.measurementIndex[name]
idx := t.measurements[name]
if idx == nil {
return nil, nil
}
return idx.measurement, idx.seriesByTags(tags)
return idx, idx.seriesByTags(tags)
}
// SereiesByID returns the Series that has the given id.
@ -377,8 +282,8 @@ func (t *Index) SeriesByID(id uint32) *Series {
// Measurements returns all measurements that match the given filters.
func (t *Index) Measurements(filters []*Filter) []*Measurement {
measurements := make([]*Measurement, 0, len(t.measurementIndex))
for _, idx := range t.measurementIndex {
measurements := make([]*Measurement, 0, len(t.measurements))
for _, idx := range t.measurements {
measurements = append(measurements, idx.measurement)
}
return measurements

View File

@ -24,23 +24,23 @@ func TestIndex_MeasurementBySeriesID(t *testing.T) {
idx := influxdb.NewIndex()
m := &influxdb.Measurement{
Name: "cpu_load",
Series: []*influxdb.Series{
&influxdb.Series{
ID: uint32(1),
Tags: map[string]string{"host": "servera.influx.com", "region": "uswest"}}}}
}
s := &influxdb.Series{
ID: uint32(1),
Tags: map[string]string{"host": "servera.influx.com", "region": "uswest"},
}
// add it and see if we can look it up
idx.AddSeries(m.Name, m.Series[0])
idx.AddSeries(m.Name, s)
mm := idx.MeasurementBySeriesID(uint32(1))
if mustMarshalJSON(m) != mustMarshalJSON(mm) {
t.Fatalf("mesurement not equal:\n%s\n%s", m, mm)
}
// now test that we can add another
s := &influxdb.Series{
s = &influxdb.Series{
ID: uint32(2),
Tags: map[string]string{"host": "serverb.influx.com", "region": "uswest"}}
m.Series = append(m.Series, s)
idx.AddSeries(m.Name, s)
mm = idx.MeasurementBySeriesID(uint32(2))
@ -80,11 +80,11 @@ func TestIndex_MeasurementAndSeries(t *testing.T) {
idx := influxdb.NewIndex()
m := &influxdb.Measurement{
Name: "cpu_load",
Series: []*influxdb.Series{
&influxdb.Series{
ID: uint32(1),
Tags: map[string]string{"host": "servera.influx.com", "region": "uswest"}}}}
s := m.Series[0]
}
s := &influxdb.Series{
ID: uint32(1),
Tags: map[string]string{"host": "servera.influx.com", "region": "uswest"},
}
// add it and see if we can look it up by name and tags
idx.AddSeries(m.Name, s)
@ -99,7 +99,6 @@ func TestIndex_MeasurementAndSeries(t *testing.T) {
s = &influxdb.Series{
ID: uint32(2),
Tags: map[string]string{"host": "serverb.influx.com", "region": "uswest"}}
m.Series = append(m.Series, s)
idx.AddSeries(m.Name, s)
mm, ss = idx.MeasurementAndSeries(m.Name, s.Tags)

View File

@ -157,24 +157,26 @@ func (tx *metatx) createSeries(database, name string, tags map[string]string) (*
return s, nil
}
// series returns all the measurements and series in a database
func (tx *metatx) measurements(database string) []*Measurement {
// loops through all the measurements and series in a database
func (tx *metatx) measurementIndex(database string) *Index {
// get the bucket that holds series data for the database
b := tx.Bucket([]byte("Databases")).Bucket([]byte(database)).Bucket([]byte("Series"))
measurements := make([]*Measurement, 0)
c := b.Cursor()
// create the index and populate it from the series data
idx := NewIndex()
for k, _ := c.First(); k != nil; k, _ = c.Next() {
mc := b.Bucket(k).Cursor()
m := &Measurement{Name: string(k), Series: make([]*Series, 0)}
name := string(k)
for id, v := mc.First(); id != nil; id, v = mc.Next() {
var s *Series
mustUnmarshalJSON(v, &s)
m.Series = append(m.Series, s)
idx.AddSeries(name, s)
}
measurements = append(measurements, m)
}
return measurements
return idx
}
// user returns a user from the metastore by name.

121
server.go
View File

@ -192,21 +192,15 @@ func (s *Server) load() error {
}
for db := range s.databases {
log.Printf("Loading metadata index for %d\n", db)
var measurements []*Measurement
var idx *Index
err := s.meta.view(func(tx *metatx) error {
measurements = tx.measurements(db)
idx = tx.measurementIndex(db)
return nil
})
if err != nil {
return err
}
idx := NewIndex()
s.metaIndexes[db] = idx
for _, m := range measurements {
for _, ss := range m.Series {
idx.AddSeries(m.Name, ss)
}
}
}
return nil
}
@ -962,12 +956,20 @@ func (s *Server) createSeriesIfNotExists(database, name string, tags map[string]
return series.ID, nil
}
func (s *Server) Measurements(database string) (a Measurements) {
func (s *Server) MeasurementNames(database string) []string {
s.mu.RLock()
idx := s.metaIndexes[database]
s.mu.RUnlock()
return idx.Measurements(nil)
return idx.names
}
func (s *Server) MeasurementSeriesIDs(database, measurement string) SeriesIDs {
s.mu.RLock()
idx := s.metaIndexes[database]
s.mu.RUnlock()
return idx.SeriesIDs([]string{measurement}, nil)
}
// processor runs in a separate goroutine and processes all incoming broker messages.
@ -1124,18 +1126,113 @@ type databaseJSON struct {
// Measurement represents a collection of time series in a database
type Measurement struct {
Name string `json:"name,omitempty"`
Series []*Series `json:"series,omitempty"`
Fields []*Fields `json:"fields,omitempty"`
// in memory index fields
series map[string]*Series // sorted tagset string to the series object
measurement *Measurement
seriesByTagKeyValue map[string]map[string]SeriesIDs // map from tag key to value to sorted set of series ids
ids SeriesIDs // sorted list of series IDs in this measurement
}
func NewMeasurement(name string) *Measurement {
return &Measurement{
Name: name,
Series: make([]*Series, 0),
Fields: make([]*Fields, 0),
series: make(map[string]*Series),
seriesByTagKeyValue: make(map[string]map[string]SeriesIDs),
ids: SeriesIDs(make([]uint32, 0)),
}
}
// addSeries will add a series to the measurementIndex. Returns false if already present
func (m *Measurement) addSeries(s *Series) bool {
tagset := string(marshalTags(s.Tags))
if _, ok := m.series[tagset]; ok {
return false
}
m.series[tagset] = s
m.ids = append(m.ids, s.ID)
// the series ID should always be higher than all others because it's a new
// series. So don't do the sort if we don't have to.
if len(m.ids) > 1 && m.ids[len(m.ids)-1] < m.ids[len(m.ids)-2] {
sort.Sort(m.ids)
}
// add this series id to the tag index on the measurement
for k, v := range s.Tags {
valueMap := m.seriesByTagKeyValue[k]
if valueMap == nil {
valueMap = make(map[string]SeriesIDs)
m.seriesByTagKeyValue[k] = valueMap
}
ids := valueMap[v]
ids = append(ids, s.ID)
// most of the time the series ID will be higher than all others because it's a new
// series. So don't do the sort if we don't have to.
if len(ids) > 1 && ids[len(ids)-1] < ids[len(ids)-2] {
sort.Sort(ids)
}
valueMap[v] = ids
}
return true
}
// seriesByTags returns the Series that matches the given tagset.
func (m *Measurement) seriesByTags(tags map[string]string) *Series {
return m.series[string(marshalTags(tags))]
}
// sereisIDs returns the series ids for a given filter
func (m *Measurement) seriesIDs(filter *Filter) (ids SeriesIDs) {
values := m.seriesByTagKeyValue[filter.Key]
if values == nil {
return
}
// hanlde regex filters
if filter.Regex != nil {
for k, v := range values {
if filter.Regex.MatchString(k) {
if ids == nil {
ids = v
} else {
ids = ids.Union(v)
}
}
}
if filter.Not {
ids = m.ids.Reject(ids)
}
return
}
// this is for the value is not null query
if filter.Not && filter.Value == "" {
for _, v := range values {
if ids == nil {
ids = v
} else {
ids.Intersect(v)
}
}
return
}
// get the ids that have the given key/value tag pair
ids = SeriesIDs(values[filter.Value])
// filter out these ids from the entire set if it's a not query
if filter.Not {
ids = m.ids.Reject(ids)
}
return
}
type Measurements []*Measurement
func (m Measurement) String() string { return string(mustMarshalJSON(m)) }

View File

@ -429,22 +429,26 @@ func TestServer_Measurements(t *testing.T) {
t.Fatal(err)
}
r := s.Measurements("foo")
m := []*influxdb.Measurement{
&influxdb.Measurement{
Name: "cpu_load",
Series: []*influxdb.Series{
&influxdb.Series{
ID: uint32(1),
Tags: map[string]string{"host": "servera.influx.com", "region": "uswest"}}}}}
if !measurementsEqual(r, m) {
t.Fatalf("Mesurements not the same:\n%s\n%s", r, m)
expectedMeasurementNames := []string{"foo"}
expectedSeriesIDs := influxdb.SeriesIDs([]uint32{uint32(1)})
names := s.MeasurementNames("foo")
if !reflect.DeepEqual(names, expectedMeasurementNames) {
t.Fatalf("Mesurements not the same:\n exp: %s\n got: %s", expectedMeasurementNames, names)
}
ids := s.MeasurementSeriesIDs("foo", "foo")
if !ids.Equals(expectedSeriesIDs) {
t.Fatalf("Series IDs not the same:\n exp: %s\n got: %s", expectedSeriesIDs, ids)
}
s.Restart()
r = s.Measurements("foo")
if !measurementsEqual(r, m) {
t.Fatalf("Mesurements not the same:\n%s\n%s", r, m)
names = s.MeasurementNames("foo")
if !reflect.DeepEqual(names, expectedMeasurementNames) {
t.Fatalf("Mesurements not the same:\n exp: %s\n got: %s", expectedMeasurementNames, names)
}
ids = s.MeasurementSeriesIDs("foo", "foo")
if !ids.Equals(expectedSeriesIDs) {
t.Fatalf("Series IDs not the same:\n exp: %s\n got: %s", expectedSeriesIDs, ids)
}
}