Update database style based on feedback

pull/1264/head
Paul Dix 2015-01-02 14:02:02 -05:00
parent d5548aa136
commit b42def229a
4 changed files with 88 additions and 90 deletions

View File

@ -40,12 +40,12 @@ func newDatabase() *database {
}
// shardByTimestamp returns a shard that owns a given timestamp.
func (db *database) shardByTimestamp(policy string, id uint32, timestamp time.Time) (*Shard, error) {
func (db *database) shardByTimestamp(policy string, seriesID uint32, timestamp time.Time) (*Shard, error) {
p := db.policies[policy]
if p == nil {
return nil, ErrRetentionPolicyNotFound
}
return p.shardByTimestamp(id, timestamp), nil
return p.shardByTimestamp(seriesID, timestamp), nil
}
// shardsByTimestamp returns all shards that own a given timestamp.
@ -182,13 +182,13 @@ func (m *Measurement) seriesByTags(tags map[string]string) *Series {
}
// sereisIDs returns the series ids for a given filter
func (m *Measurement) seriesIDs(filter *Filter) (ids SeriesIDs) {
func (m *Measurement) seriesIDs(filter *TagFilter) (ids SeriesIDs) {
values := m.seriesByTagKeyValue[filter.Key]
if values == nil {
return
}
// hanlde regex filters
// handle regex filters
if filter.Regex != nil {
for k, v := range values {
if filter.Regex.MatchString(k) {
@ -228,7 +228,7 @@ func (m *Measurement) seriesIDs(filter *Filter) (ids SeriesIDs) {
return
}
// tagValues returns an array of unique tag values for the given key
// tagValues returns a map of unique tag values for the given key
func (m *Measurement) tagValues(key string) TagValues {
tags := m.seriesByTagKeyValue[key]
values := make(map[string]bool, len(tags))
@ -292,10 +292,10 @@ func NewRetentionPolicy(name string) *RetentionPolicy {
// shardByTimestamp returns the shard in the space that owns a given timestamp for a given series id.
// Returns nil if the shard does not exist.
func (rp *RetentionPolicy) shardByTimestamp(id uint32, timestamp time.Time) *Shard {
func (rp *RetentionPolicy) shardByTimestamp(seriesID uint32, timestamp time.Time) *Shard {
shards := rp.shardsByTimestamp(timestamp)
if len(shards) > 0 {
return shards[int(id)%len(shards)]
return shards[int(seriesID)%len(shards)]
}
return nil
}
@ -359,16 +359,14 @@ func (rps RetentionPolicies) Shards() []*Shard {
return shards
}
// Filter represents a tag filter when looking up other tags or measurements.
type Filter struct {
// TagFilter represents a tag filter when looking up other tags or measurements.
type TagFilter struct {
Not bool
Key string
Value string
Regex *regexp.Regexp
}
type Filters []*Filter
// SeriesIDs is a convenience type for sorting, checking equality, and doing union and
// intersection of collections of series ids.
type SeriesIDs []uint32
@ -477,15 +475,15 @@ func (l SeriesIDs) Reject(r SeriesIDs) SeriesIDs {
return SeriesIDs(ids)
}
// AddSeries adds the series for the given measurement to the index. Returns false if already present
func (t *database) AddSeries(name string, s *Series) bool {
// addSeriesToIndex adds the series for the given measurement to the index. Returns false if already present
func (t *database) addSeriesToIndex(measurementName string, s *Series) bool {
// if there is a measurement for this id, it's already been added
if t.seriesToMeasurement[s.ID] != nil {
return false
}
// get or create the measurement index and index it globally and in the measurement
idx := t.createMeasurementIfNotExists(name)
idx := t.createMeasurementIfNotExists(measurementName)
t.seriesToMeasurement[s.ID] = idx
t.series[s.ID] = s
@ -513,8 +511,8 @@ func (t *database) AddField(name string, f *Field) bool {
return false
}
// MeasurementsForSeriesIDs returns a collection of unique Measurements for the passed in SeriesIDs.
func (t *database) MeasurementsForSeriesIDs(seriesIDs SeriesIDs) []*Measurement {
// MeasurementsBySeriesIDs returns a collection of unique Measurements for the passed in SeriesIDs.
func (t *database) MeasurementsBySeriesIDs(seriesIDs SeriesIDs) []*Measurement {
measurements := make(map[*Measurement]bool)
for _, id := range seriesIDs {
@ -532,7 +530,7 @@ func (t *database) MeasurementsForSeriesIDs(seriesIDs SeriesIDs) []*Measurement
// SeriesIDs returns an array of series ids for the given measurements and filters to be applied to all.
// Filters are equivalent to and AND operation. If you want to do an OR, get the series IDs for one set,
// then get the series IDs for another set and use the SeriesIDs.Union to combine the two.
func (t *database) SeriesIDs(names []string, filters Filters) SeriesIDs {
func (t *database) SeriesIDs(names []string, filters []*TagFilter) SeriesIDs {
// they want all ids if no filters are specified
if len(filters) == 0 {
ids := SeriesIDs(make([]uint32, 0))
@ -544,7 +542,7 @@ func (t *database) SeriesIDs(names []string, filters Filters) SeriesIDs {
ids := SeriesIDs(make([]uint32, 0))
for _, n := range names {
ids = ids.Union(t.seriesIDsForName(n, filters))
ids = ids.Union(t.seriesIDsByName(n, filters))
}
return ids
@ -579,7 +577,7 @@ func (t *database) TagKeys(names []string) []string {
// Call .ToSlice() on the result to convert it into a sorted slice of strings.
// Filters are equivalent to and AND operation. If you want to do an OR, get the tag values for one set,
// then get the tag values for another set and do a union of the two.
func (t *database) TagValues(names []string, key string, filters []*Filter) TagValues {
func (t *database) TagValues(names []string, key string, filters []*TagFilter) TagValues {
values := TagValues(make(map[string]bool))
// see if they just want all the tag values for this key
@ -595,11 +593,11 @@ func (t *database) TagValues(names []string, key string, filters []*Filter) TagV
// they have filters so just get a set of series ids matching them and then get the tag values from those
seriesIDs := t.SeriesIDs(names, filters)
return t.tagValuesForSeries(key, seriesIDs)
return t.tagValuesBySeries(key, seriesIDs)
}
// tagValuesForSeries will return a TagValues map of all the unique tag values for a collection of series.
func (t *database) tagValuesForSeries(key string, seriesIDs SeriesIDs) TagValues {
// tagValuesBySeries will return a TagValues map of all the unique tag values for a collection of series.
func (t *database) tagValuesBySeries(key string, seriesIDs SeriesIDs) TagValues {
values := make(map[string]bool)
for _, id := range seriesIDs {
s := t.series[id]
@ -641,8 +639,8 @@ func (l TagValues) Intersect(r TagValues) {
}
}
//seriesIDsForName is the same as SeriesIDs, but for a specific measurement.
func (t *database) seriesIDsForName(name string, filters Filters) SeriesIDs {
//seriesIDsByName is the same as SeriesIDs, but for a specific measurement.
func (t *database) seriesIDsByName(name string, filters []*TagFilter) SeriesIDs {
idx := t.measurements[name]
if idx == nil {
return nil
@ -683,7 +681,7 @@ func (t *database) SeriesByID(id uint32) *Series {
}
// Measurements returns all measurements that match the given filters.
func (t *database) Measurements(filters []*Filter) []*Measurement {
func (t *database) Measurements(filters []*TagFilter) []*Measurement {
measurements := make([]*Measurement, 0, len(t.measurements))
for _, idx := range t.measurements {
measurements = append(measurements, idx.measurement)

View File

@ -30,7 +30,7 @@ func TestDatabase_MeasurementBySeriesID(t *testing.T) {
}
// add it and see if we can look it up
idx.AddSeries(m.Name, s)
idx.addSeriesToIndex(m.Name, s)
mm := idx.MeasurementBySeriesID(uint32(1))
if string(mustMarshalJSON(m)) != string(mustMarshalJSON(mm)) {
t.Fatalf("mesurement not equal:\n%s\n%s", m, mm)
@ -41,7 +41,7 @@ func TestDatabase_MeasurementBySeriesID(t *testing.T) {
ID: uint32(2),
Tags: map[string]string{"host": "serverb.influx.com", "region": "uswest"}}
idx.AddSeries(m.Name, s)
idx.addSeriesToIndex(m.Name, s)
mm = idx.MeasurementBySeriesID(uint32(2))
if string(mustMarshalJSON(m)) != string(mustMarshalJSON(mm)) {
t.Fatalf("mesurement not equal:\n%s\n%s", m, mm)
@ -59,7 +59,7 @@ func TestDatabase_MeasurementsBySeriesIDs(t *testing.T) {
ids := SeriesIDs([]uint32{uint32(1), uint32(4)})
names := make([]string, 0)
for _, m := range idx.MeasurementsForSeriesIDs(ids) {
for _, m := range idx.MeasurementsBySeriesIDs(ids) {
names = append(names, m.Name)
}
sort.Strings(names)
@ -78,7 +78,7 @@ func TestDatabase_SeriesBySeriesID(t *testing.T) {
ID: uint32(2),
Tags: map[string]string{"host": "serverb.influx.com", "region": "uswest"}}
idx.AddSeries("foo", s)
idx.addSeriesToIndex("foo", s)
ss := idx.SeriesByID(uint32(2))
if string(mustMarshalJSON(s)) != string(mustMarshalJSON(ss)) {
t.Fatalf("series not equal:\n%s\n%s", s, ss)
@ -97,7 +97,7 @@ func TestDatabase_MeasurementAndSeries(t *testing.T) {
}
// add it and see if we can look it up by name and tags
idx.AddSeries(m.Name, s)
idx.addSeriesToIndex(m.Name, s)
mm, ss := idx.MeasurementAndSeries(m.Name, s.Tags)
if string(mustMarshalJSON(m)) != string(mustMarshalJSON(mm)) {
t.Fatalf("mesurement not equal:\n%s\n%s", m, mm)
@ -110,7 +110,7 @@ func TestDatabase_MeasurementAndSeries(t *testing.T) {
ID: uint32(2),
Tags: map[string]string{"host": "serverb.influx.com", "region": "uswest"}}
idx.AddSeries(m.Name, s)
idx.addSeriesToIndex(m.Name, s)
mm, ss = idx.MeasurementAndSeries(m.Name, s.Tags)
if string(mustMarshalJSON(m)) != string(mustMarshalJSON(mm)) {
t.Fatalf("mesurement not equal:\n%s\n%s", m, mm)
@ -127,13 +127,13 @@ func TestDatabase_SeriesIDs(t *testing.T) {
Tags: map[string]string{"host": "servera.influx.com", "region": "uswest"}}
// add it and see if we can look it up
added := idx.AddSeries("cpu_load", s)
added := idx.addSeriesToIndex("cpu_load", s)
if !added {
t.Fatal("couldn't add series")
}
// test that we can't add it again
added = idx.AddSeries("cpu_load", s)
added = idx.addSeriesToIndex("cpu_load", s)
if added {
t.Fatal("shoulnd't be able to add duplicate series")
}
@ -142,7 +142,7 @@ func TestDatabase_SeriesIDs(t *testing.T) {
s = &Series{
ID: uint32(2),
Tags: map[string]string{"host": "serverb.influx.com", "region": "uswest"}}
added = idx.AddSeries("cpu_load", s)
added = idx.addSeriesToIndex("cpu_load", s)
if !added {
t.Fatalf("couldn't add series")
}
@ -157,7 +157,7 @@ func TestDatabase_SeriesIDs(t *testing.T) {
s = &Series{
ID: uint32(3),
Tags: map[string]string{"host": "serverb.influx.com", "region": "uswest"}}
added = idx.AddSeries("network_in", s)
added = idx.addSeriesToIndex("network_in", s)
if !added {
t.Fatalf("couldn't add series")
}
@ -169,12 +169,12 @@ func TestDatabase_SeriesIDs(t *testing.T) {
}
}
func TestDatabase_SeriesIDsWhereFilter(t *testing.T) {
func TestDatabase_SeriesIDsWhereTagFilter(t *testing.T) {
idx := databaseWithFixtureData()
var tests = []struct {
names []string
filters []*Filter
filters []*TagFilter
result []uint32
}{
// match against no tags
@ -186,9 +186,9 @@ func TestDatabase_SeriesIDsWhereFilter(t *testing.T) {
// match against all tags
{
names: []string{"cpu_load"},
filters: []*Filter{
&Filter{Key: "host", Value: "servera.influx.com"},
&Filter{Key: "region", Value: "uswest"},
filters: []*TagFilter{
&TagFilter{Key: "host", Value: "servera.influx.com"},
&TagFilter{Key: "region", Value: "uswest"},
},
result: []uint32{uint32(1)},
},
@ -196,8 +196,8 @@ func TestDatabase_SeriesIDsWhereFilter(t *testing.T) {
// match against one tag
{
names: []string{"cpu_load"},
filters: []*Filter{
&Filter{Key: "region", Value: "uswest"},
filters: []*TagFilter{
&TagFilter{Key: "region", Value: "uswest"},
},
result: []uint32{uint32(1), uint32(2)},
},
@ -205,8 +205,8 @@ func TestDatabase_SeriesIDsWhereFilter(t *testing.T) {
// match against one tag, single result
{
names: []string{"cpu_load"},
filters: []*Filter{
&Filter{Key: "host", Value: "servera.influx.com"},
filters: []*TagFilter{
&TagFilter{Key: "host", Value: "servera.influx.com"},
},
result: []uint32{uint32(1)},
},
@ -214,8 +214,8 @@ func TestDatabase_SeriesIDsWhereFilter(t *testing.T) {
// query against tag key that doesn't exist returns empty
{
names: []string{"cpu_load"},
filters: []*Filter{
&Filter{Key: "foo", Value: "bar"},
filters: []*TagFilter{
&TagFilter{Key: "foo", Value: "bar"},
},
result: []uint32{},
},
@ -223,8 +223,8 @@ func TestDatabase_SeriesIDsWhereFilter(t *testing.T) {
// query against tag value that doesn't exist returns empty
{
names: []string{"cpu_load"},
filters: []*Filter{
&Filter{Key: "host", Value: "foo"},
filters: []*TagFilter{
&TagFilter{Key: "host", Value: "foo"},
},
result: []uint32{},
},
@ -232,8 +232,8 @@ func TestDatabase_SeriesIDsWhereFilter(t *testing.T) {
// query against a tag NOT value
{
names: []string{"key_count"},
filters: []*Filter{
&Filter{Key: "region", Value: "useast", Not: true},
filters: []*TagFilter{
&TagFilter{Key: "region", Value: "useast", Not: true},
},
result: []uint32{uint32(3)},
},
@ -241,8 +241,8 @@ func TestDatabase_SeriesIDsWhereFilter(t *testing.T) {
// query against a tag NOT null
{
names: []string{"queue_depth"},
filters: []*Filter{
&Filter{Key: "app", Value: "", Not: true},
filters: []*TagFilter{
&TagFilter{Key: "app", Value: "", Not: true},
},
result: []uint32{uint32(6)},
},
@ -250,9 +250,9 @@ func TestDatabase_SeriesIDsWhereFilter(t *testing.T) {
// query against a tag value and another tag NOT value
{
names: []string{"queue_depth"},
filters: []*Filter{
&Filter{Key: "name", Value: "high priority"},
&Filter{Key: "app", Value: "paultown", Not: true},
filters: []*TagFilter{
&TagFilter{Key: "name", Value: "high priority"},
&TagFilter{Key: "app", Value: "paultown", Not: true},
},
result: []uint32{uint32(5), uint32(7)},
},
@ -260,8 +260,8 @@ func TestDatabase_SeriesIDsWhereFilter(t *testing.T) {
// query against a tag value matching regex
{
names: []string{"queue_depth"},
filters: []*Filter{
&Filter{Key: "app", Regex: regexp.MustCompile("paul.*")},
filters: []*TagFilter{
&TagFilter{Key: "app", Regex: regexp.MustCompile("paul.*")},
},
result: []uint32{uint32(6), uint32(7)},
},
@ -269,9 +269,9 @@ func TestDatabase_SeriesIDsWhereFilter(t *testing.T) {
// query against a tag value matching regex and other tag value matching value
{
names: []string{"queue_depth"},
filters: []*Filter{
&Filter{Key: "name", Value: "high priority"},
&Filter{Key: "app", Regex: regexp.MustCompile("paul.*")},
filters: []*TagFilter{
&TagFilter{Key: "name", Value: "high priority"},
&TagFilter{Key: "app", Regex: regexp.MustCompile("paul.*")},
},
result: []uint32{uint32(6), uint32(7)},
},
@ -279,8 +279,8 @@ func TestDatabase_SeriesIDsWhereFilter(t *testing.T) {
// query against a tag value NOT matching regex
{
names: []string{"queue_depth"},
filters: []*Filter{
&Filter{Key: "app", Regex: regexp.MustCompile("paul.*"), Not: true},
filters: []*TagFilter{
&TagFilter{Key: "app", Regex: regexp.MustCompile("paul.*"), Not: true},
},
result: []uint32{uint32(5)},
},
@ -288,9 +288,9 @@ func TestDatabase_SeriesIDsWhereFilter(t *testing.T) {
// query against a tag value NOT matching regex and other tag value matching value
{
names: []string{"queue_depth"},
filters: []*Filter{
&Filter{Key: "app", Regex: regexp.MustCompile("paul.*"), Not: true},
&Filter{Key: "name", Value: "high priority"},
filters: []*TagFilter{
&TagFilter{Key: "app", Regex: regexp.MustCompile("paul.*"), Not: true},
&TagFilter{Key: "name", Value: "high priority"},
},
result: []uint32{uint32(5)},
},
@ -298,8 +298,8 @@ func TestDatabase_SeriesIDsWhereFilter(t *testing.T) {
// query against multiple measurements
{
names: []string{"cpu_load", "key_count"},
filters: []*Filter{
&Filter{Key: "region", Value: "uswest"},
filters: []*TagFilter{
&TagFilter{Key: "region", Value: "uswest"},
},
result: []uint32{uint32(1), uint32(2), uint32(3)},
},
@ -343,13 +343,13 @@ func TestDatabase_TagKeys(t *testing.T) {
}
}
func TestDatabase_TagValuesWhereFilter(t *testing.T) {
func TestDatabase_TagValuesWhereTagFilter(t *testing.T) {
idx := databaseWithFixtureData()
var tests = []struct {
names []string
key string
filters []*Filter
filters []*TagFilter
result []string
}{
// get the tag values across multiple measurements
@ -365,8 +365,8 @@ func TestDatabase_TagValuesWhereFilter(t *testing.T) {
{
names: []string{"key_count"},
key: "region",
filters: []*Filter{
&Filter{Key: "host", Value: "serverc.influx.com"},
filters: []*TagFilter{
&TagFilter{Key: "host", Value: "serverc.influx.com"},
},
result: []string{"uswest"},
},
@ -375,8 +375,8 @@ func TestDatabase_TagValuesWhereFilter(t *testing.T) {
{
names: []string{"key_count"},
key: "region",
filters: []*Filter{
&Filter{Key: "host", Value: "serverc.influx.com", Not: true},
filters: []*TagFilter{
&TagFilter{Key: "host", Value: "serverc.influx.com", Not: true},
},
result: []string{"useast"},
},
@ -385,9 +385,9 @@ func TestDatabase_TagValuesWhereFilter(t *testing.T) {
{
names: []string{"key_count"},
key: "region",
filters: []*Filter{
&Filter{Key: "host", Value: "serverc.influx.com"},
&Filter{Key: "service", Value: "redis"},
filters: []*TagFilter{
&TagFilter{Key: "host", Value: "serverc.influx.com"},
&TagFilter{Key: "service", Value: "redis"},
},
result: []string{"uswest"},
},
@ -396,8 +396,8 @@ func TestDatabase_TagValuesWhereFilter(t *testing.T) {
{
names: []string{"queue_depth"},
key: "name",
filters: []*Filter{
&Filter{Key: "app", Regex: regexp.MustCompile("paul.*")},
filters: []*TagFilter{
&TagFilter{Key: "app", Regex: regexp.MustCompile("paul.*")},
},
result: []string{"high priority"},
},
@ -406,8 +406,8 @@ func TestDatabase_TagValuesWhereFilter(t *testing.T) {
{
names: []string{"key_count"},
key: "region",
filters: []*Filter{
&Filter{Key: "host", Regex: regexp.MustCompile("serverd.*"), Not: true},
filters: []*TagFilter{
&TagFilter{Key: "host", Regex: regexp.MustCompile("serverd.*"), Not: true},
},
result: []string{"uswest"},
},
@ -440,7 +440,7 @@ func databaseWithFixtureData() *database {
ID: uint32(1),
Tags: map[string]string{"host": "servera.influx.com", "region": "uswest"}}
added := idx.AddSeries("cpu_load", s)
added := idx.addSeriesToIndex("cpu_load", s)
if !added {
return nil
}
@ -449,7 +449,7 @@ func databaseWithFixtureData() *database {
ID: uint32(2),
Tags: map[string]string{"host": "serverb.influx.com", "region": "uswest"}}
added = idx.AddSeries("cpu_load", s)
added = idx.addSeriesToIndex("cpu_load", s)
if !added {
return nil
}
@ -458,7 +458,7 @@ func databaseWithFixtureData() *database {
ID: uint32(3),
Tags: map[string]string{"host": "serverc.influx.com", "region": "uswest", "service": "redis"}}
added = idx.AddSeries("key_count", s)
added = idx.addSeriesToIndex("key_count", s)
if !added {
return nil
}
@ -467,7 +467,7 @@ func databaseWithFixtureData() *database {
ID: uint32(4),
Tags: map[string]string{"host": "serverd.influx.com", "region": "useast", "service": "redis"}}
added = idx.AddSeries("key_count", s)
added = idx.addSeriesToIndex("key_count", s)
if !added {
return nil
}
@ -476,7 +476,7 @@ func databaseWithFixtureData() *database {
ID: uint32(5),
Tags: map[string]string{"name": "high priority"}}
added = idx.AddSeries("queue_depth", s)
added = idx.addSeriesToIndex("queue_depth", s)
if !added {
return nil
}
@ -485,7 +485,7 @@ func databaseWithFixtureData() *database {
ID: uint32(6),
Tags: map[string]string{"name": "high priority", "app": "paultown"}}
added = idx.AddSeries("queue_depth", s)
added = idx.addSeriesToIndex("queue_depth", s)
if !added {
return nil
}
@ -494,7 +494,7 @@ func databaseWithFixtureData() *database {
ID: uint32(7),
Tags: map[string]string{"name": "high priority", "app": "paulcountry"}}
added = idx.AddSeries("queue_depth", s)
added = idx.addSeriesToIndex("queue_depth", s)
if !added {
return nil
}
@ -503,7 +503,7 @@ func databaseWithFixtureData() *database {
ID: uint32(8),
Tags: map[string]string{"a": "b"}}
added = idx.AddSeries("another_thing", s)
added = idx.addSeriesToIndex("another_thing", s)
if !added {
return nil
}

View File

@ -204,7 +204,7 @@ func (tx *metatx) indexDatabase(db *database) {
for id, v := mc.First(); id != nil; id, v = mc.Next() {
var s *Series
mustUnmarshalJSON(v, &s)
db.AddSeries(name, s)
db.addSeriesToIndex(name, s)
}
}
}

View File

@ -978,7 +978,7 @@ func (s *Server) applyCreateSeriesIfNotExists(m *messaging.Message) error {
if err != nil {
return err
}
db.AddSeries(c.Name, series)
db.addSeriesToIndex(c.Name, series)
return nil
}