reduce memory allocations in index

This commit changes the index to point to index data in the shards
instead of keeping it in memory on the heap.
Branch: pull/7138/head
Author: Ben Johnson
Date:   2016-06-30 10:49:53 -06:00
Parent: 35f2fdafcb
Commit: 8aa224b22d
38 changed files with 666 additions and 347 deletions
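
The heart of the diff is in the models package: Tags changes from a map[string]string to a sorted slice of Tag structs whose keys and values are []byte. The idea, per the commit message, is that tag keys and values can then reference bytes the shards already hold (for example, mmap'd index data) instead of forcing a heap-allocated string copy per entry. A minimal sketch of the shape change:

// Before: each key and value is its own heap-allocated string,
// plus the map's per-entry overhead.
//
//	type Tags map[string]string

// After: a flat, sorted slice whose Key/Value fields can point at
// existing buffers without copying.
type Tag struct {
	Key   []byte
	Value []byte
}

type Tags []Tag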


@@ -562,7 +562,7 @@ func (p *Point) MarshalJSON() ([]byte, error) {
 // MarshalString renders string representation of a Point with specified
 // precision. The default precision is nanoseconds.
 func (p *Point) MarshalString() string {
-	pt, err := models.NewPoint(p.Measurement, p.Tags, p.Fields, p.Time)
+	pt, err := models.NewPoint(p.Measurement, models.NewTags(p.Tags), p.Fields, p.Time)
 	if err != nil {
 		return "# ERROR: " + err.Error() + " " + p.Measurement
 	}


@@ -356,7 +356,7 @@ func NewPoint(
 		T = t[0]
 	}
-	pt, err := models.NewPoint(name, tags, fields, T)
+	pt, err := models.NewPoint(name, models.NewTags(tags), fields, T)
 	if err != nil {
 		return nil, err
 	}
@@ -382,7 +382,7 @@ func (p *Point) Name() string {
 // Tags returns the tags associated with the point
 func (p *Point) Tags() map[string]string {
-	return p.pt.Tags()
+	return p.pt.Tags().Map()
 }

 // Time return the timestamp for the point


@@ -132,7 +132,7 @@ func (c *cmdExport) writeFiles() error {
 	for i := 0; i < reader.KeyCount(); i++ {
 		var pairs string
 		key, typ := reader.KeyAt(i)
-		values, _ := reader.ReadAll(key)
+		values, _ := reader.ReadAll(string(key))
 		measurement, field := tsm1.SeriesAndFieldFromCompositeKey(key)
 		for _, value := range values {


@@ -78,7 +78,7 @@ func cmdReport(opts *reportOpts) {
 		totalSeries.Add([]byte(key))

 		if opts.detailed {
-			sep := strings.Index(key, "#!~#")
+			sep := strings.Index(string(key), "#!~#")
 			seriesKey, field := key[:sep], key[sep+4:]
 			measurement, tags, _ := models.ParseKey(seriesKey)
@@ -96,13 +96,13 @@ func cmdReport(opts *reportOpts) {
 			}

 			fieldCount.Add([]byte(field))
-			for t, v := range tags {
-				tagCount, ok := tagCardialities[t]
+			for _, t := range tags {
+				tagCount, ok := tagCardialities[string(t.Key)]
 				if !ok {
 					tagCount = hllpp.New()
-					tagCardialities[t] = tagCount
+					tagCardialities[string(t.Key)] = tagCount
 				}
-				tagCount.Add([]byte(v))
+				tagCount.Add(t.Value)
 			}
 		}
 	}


@@ -121,9 +121,9 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) {
 	var pos int
 	for i := 0; i < keyCount; i++ {
 		key, _ := r.KeyAt(i)
-		for _, e := range r.Entries(key) {
+		for _, e := range r.Entries(string(key)) {
 			pos++
-			split := strings.Split(key, "#!~#")
+			split := strings.Split(string(key), "#!~#")

 			// We dont' know know if we have fields so use an informative default
 			var measurement, field string = "UNKNOWN", "UNKNOWN"
@@ -132,7 +132,7 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) {
 			measurement = split[0]
 			field = split[1]

-			if opts.filterKey != "" && !strings.Contains(key, opts.filterKey) {
+			if opts.filterKey != "" && !strings.Contains(string(key), opts.filterKey) {
 				continue
 			}
 			fmt.Fprintln(tw, "  "+strings.Join([]string{
@@ -160,7 +160,7 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) {
 	// Start at the beginning and read every block
 	for j := 0; j < keyCount; j++ {
 		key, _ := r.KeyAt(j)
-		for _, e := range r.Entries(key) {
+		for _, e := range r.Entries(string(key)) {
 			f.Seek(int64(e.Offset), 0)
 			f.Read(b[:4])
@@ -172,7 +172,7 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) {
 			blockSize += int64(e.Size)

-			if opts.filterKey != "" && !strings.Contains(key, opts.filterKey) {
+			if opts.filterKey != "" && !strings.Contains(string(key), opts.filterKey) {
 				i += blockSize
 				blockCount++
 				continue


@@ -83,7 +83,7 @@ type WritePointsRequest struct {
 // AddPoint adds a point to the WritePointRequest with field key 'value'
 func (w *WritePointsRequest) AddPoint(name string, value interface{}, timestamp time.Time, tags map[string]string) {
 	pt, err := models.NewPoint(
-		name, tags, map[string]interface{}{"value": value}, timestamp,
+		name, models.NewTags(tags), map[string]interface{}{"value": value}, timestamp,
 	)
 	if err != nil {
 		return
@@ -176,7 +176,7 @@ type WriteStatistics struct {
 func (w *PointsWriter) Statistics(tags map[string]string) []models.Statistic {
 	return []models.Statistic{{
 		Name: "write",
-		Tags: tags,
+		Tags: models.NewTags(tags),
 		Values: map[string]interface{}{
 			statWriteReq:      atomic.LoadInt64(&w.stats.WriteReq),
 			statPointWriteReq: atomic.LoadInt64(&w.stats.PointWriteReq),


@@ -818,7 +818,7 @@ func (e *StatementExecutor) executeShowStatsStatement(stmt *influxql.ShowStatsSt
 		if stmt.Module != "" && stat.Name != stmt.Module {
 			continue
 		}
-		row := &models.Row{Name: stat.Name, Tags: stat.Tags}
+		row := &models.Row{Name: stat.Name, Tags: stat.Tags.Map()}

 		values := make([]interface{}, 0, len(stat.Values))
 		for _, k := range stat.ValueNames() {
@@ -1055,7 +1055,7 @@ func convertRowToPoints(measurementName string, row *models.Row) ([]models.Point
 		}
 	}

-	p, err := models.NewPoint(measurementName, row.Tags, vals, v[timeIndex].(time.Time))
+	p, err := models.NewPoint(measurementName, models.NewTags(row.Tags), vals, v[timeIndex].(time.Time))
 	if err != nil {
 		// Drop points that can't be stored
 		continue


@@ -148,7 +148,7 @@ type QueryStatistics struct {
 func (e *QueryExecutor) Statistics(tags map[string]string) []models.Statistic {
 	return []models.Statistic{{
 		Name: "queryExecutor",
-		Tags: tags,
+		Tags: models.NewTags(tags),
 		Values: map[string]interface{}{
 			statQueriesActive:   atomic.LoadInt64(&e.stats.ActiveQueries),
 			statQueriesExecuted: atomic.LoadInt64(&e.stats.ExecutedQueries),


@@ -139,14 +139,14 @@ func ParsePointsString(buf string) ([]Point, error) {
 }

 // ParseKey returns the measurement name and tags from a point.
-func ParseKey(buf string) (string, Tags, error) {
+func ParseKey(buf []byte) (string, Tags, error) {
 	// Ignore the error because scanMeasurement returns "missing fields" which we ignore
 	// when just parsing a key
-	state, i, _ := scanMeasurement([]byte(buf), 0)
+	state, i, _ := scanMeasurement(buf, 0)

 	var tags Tags
 	if state == tagKeyState {
-		tags = parseTags([]byte(buf))
+		tags = parseTags(buf)
 		// scanMeasurement returns the location of the comma if there are tags, strip that off
 		return string(buf[:i-1]), tags, nil
 	}
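
Callers of ParseKey now hand over raw key bytes and get the slice-based Tags back. A hypothetical call (assuming the post-rename github.com/influxdata/influxdb import path; GetString is defined further down in this diff):

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/models"
)

func main() {
	// Hypothetical input; in practice the key bytes would come straight
	// from a TSM index entry rather than a fresh literal.
	name, tags, err := models.ParseKey([]byte("cpu,host=serverA,region=us-west"))
	if err != nil {
		panic(err)
	}
	fmt.Println(name)                   // cpu
	fmt.Println(tags.GetString("host")) // serverA
}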
@@ -1225,39 +1225,42 @@ func (p *point) Tags() Tags {
 }

 func parseTags(buf []byte) Tags {
-	tags := make(map[string]string, bytes.Count(buf, []byte(",")))
+	if len(buf) == 0 {
+		return nil
+	}
+
+	pos, name := scanTo(buf, 0, ',')
+
+	// it's an empty key, so there are no tags
+	if len(name) == 0 {
+		return nil
+	}
+
+	tags := make(Tags, 0, bytes.Count(buf, []byte(",")))
 	hasEscape := bytes.IndexByte(buf, '\\') != -1

-	if len(buf) != 0 {
-		pos, name := scanTo(buf, 0, ',')
-
-		// it's an empyt key, so there are no tags
-		if len(name) == 0 {
-			return tags
-		}
-
-		i := pos + 1
-		var key, value []byte
-		for {
-			if i >= len(buf) {
-				break
-			}
-			i, key = scanTo(buf, i, '=')
-			i, value = scanTagValue(buf, i+1)
-
-			if len(value) == 0 {
-				continue
-			}
-
-			if hasEscape {
-				tags[string(unescapeTag(key))] = string(unescapeTag(value))
-			} else {
-				tags[string(key)] = string(value)
-			}
-
-			i++
-		}
+	i := pos + 1
+	var key, value []byte
+	for {
+		if i >= len(buf) {
+			break
+		}
+		i, key = scanTo(buf, i, '=')
+		i, value = scanTagValue(buf, i+1)
+
+		if len(value) == 0 {
+			continue
+		}
+
+		if hasEscape {
+			tags = append(tags, Tag{Key: unescapeTag(key), Value: unescapeTag(value)})
+		} else {
+			tags = append(tags, Tag{Key: key, Value: value})
+		}
+
+		i++
 	}
+
 	return tags
 }
@@ -1276,7 +1279,8 @@ func (p *point) SetTags(tags Tags) {
 // AddTag adds or replaces a tag value for a point
 func (p *point) AddTag(key, value string) {
 	tags := p.Tags()
-	tags[key] = value
+	tags = append(tags, Tag{Key: []byte(key), Value: []byte(value)})
+	sort.Sort(tags)
 	p.key = MakeKey([]byte(p.Name()), tags)
 }
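
One behavioral nuance worth noting: the slice-based AddTag appends and re-sorts rather than replacing in place, so it matches the old map assignment only when the key is not already present. A small illustration of the slice semantics (hypothetical values):

package main

import (
	"fmt"
	"sort"

	"github.com/influxdata/influxdb/models"
)

func main() {
	tags := models.NewTags(map[string]string{"host": "a"})

	// With the old map type, tags["host"] = "b" would overwrite.
	// Appending the same key to the slice keeps both entries.
	tags = append(tags, models.Tag{Key: []byte("host"), Value: []byte("b")})
	sort.Sort(tags)

	fmt.Println(len(tags)) // 2; callers must not rely on AddTag to deduplicate
}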
@@ -1386,64 +1390,137 @@ func (p *point) UnixNano() int64 {
 	return p.Time().UnixNano()
 }

-// Tags represents a mapping between a Point's tag names and their
-// values.
-type Tags map[string]string
+// Tag represents a single key/value tag pair.
+type Tag struct {
+	Key   []byte
+	Value []byte
+}
+
+// Tags represents a sorted list of tags.
+type Tags []Tag
+
+// NewTags returns a new Tags from a map.
+func NewTags(m map[string]string) Tags {
+	a := make(Tags, 0, len(m))
+	for k, v := range m {
+		a = append(a, Tag{Key: []byte(k), Value: []byte(v)})
+	}
+	sort.Sort(a)
+	return a
+}
+
+func (a Tags) Len() int           { return len(a) }
+func (a Tags) Less(i, j int) bool { return bytes.Compare(a[i].Key, a[j].Key) == -1 }
+func (a Tags) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
+
+// Get returns the value for a key.
+func (a Tags) Get(key []byte) []byte {
+	// OPTIMIZE: Use sort.Search if tagset is large.
+	for _, t := range a {
+		if bytes.Equal(t.Key, key) {
+			return t.Value
+		}
+	}
+	return nil
+}
+
+// GetString returns the string value for a string key.
+func (a Tags) GetString(key string) string {
+	return string(a.Get([]byte(key)))
+}
+
+// Set sets the value for a key.
+func (a *Tags) Set(key, value []byte) {
+	for _, t := range *a {
+		if bytes.Equal(t.Key, key) {
+			t.Value = value
+			return
+		}
+	}
+	*a = append(*a, Tag{Key: key, Value: value})
+	sort.Sort(*a)
+}
+
+// SetString sets the string value for a string key.
+func (a *Tags) SetString(key, value string) {
+	a.Set([]byte(key), []byte(value))
+}
+
+// Delete removes a tag by key.
+func (a *Tags) Delete(key []byte) {
+	for i, t := range *a {
+		if bytes.Equal(t.Key, key) {
+			copy((*a)[i:], (*a)[i+1:])
+			(*a)[len(*a)-1] = Tag{}
+			*a = (*a)[:len(*a)-1]
+			return
+		}
+	}
+}
+
+// Map returns a map representation of the tags.
+func (a Tags) Map() map[string]string {
+	m := make(map[string]string, len(a))
+	for _, t := range a {
+		m[string(t.Key)] = string(t.Value)
+	}
+	return m
+}

 // Merge merges the tags combining the two. If both define a tag with the
 // same key, the merged value overwrites the old value.
 // A new map is returned.
-func (t Tags) Merge(other map[string]string) Tags {
-	merged := make(map[string]string, len(t)+len(other))
-	for k, v := range t {
-		merged[k] = v
+func (a Tags) Merge(other map[string]string) Tags {
+	merged := make(map[string]string, len(a)+len(other))
+	for _, t := range a {
+		merged[string(t.Key)] = string(t.Value)
 	}
 	for k, v := range other {
 		merged[k] = v
 	}
-	return Tags(merged)
+	return NewTags(merged)
 }

 // HashKey hashes all of a tag's keys.
-func (t Tags) HashKey() []byte {
+func (a Tags) HashKey() []byte {
 	// Empty maps marshal to empty bytes.
-	if len(t) == 0 {
+	if len(a) == 0 {
 		return nil
 	}

-	escaped := Tags{}
-	for k, v := range t {
-		ek := escapeTag([]byte(k))
-		ev := escapeTag([]byte(v))
+	escaped := make(Tags, 0, len(a))
+	for _, t := range a {
+		ek := escapeTag(t.Key)
+		ev := escapeTag(t.Value)

 		if len(ev) > 0 {
-			escaped[string(ek)] = string(ev)
+			escaped = append(escaped, Tag{Key: ek, Value: ev})
 		}
 	}

 	// Extract keys and determine final size.
 	sz := len(escaped) + (len(escaped) * 2) // separators
-	keys := make([]string, len(escaped)+1)
-	i := 0
-	for k, v := range escaped {
-		keys[i] = k
-		i++
-		sz += len(k) + len(v)
+	keys := make([][]byte, len(escaped)+1)
+	for i, t := range escaped {
+		keys[i] = t.Key
+		sz += len(t.Key) + len(t.Value)
 	}
-	keys = keys[:i]
-	sort.Strings(keys)
+	keys = keys[:len(escaped)]
+	sort.Sort(byteSlices(keys))

 	// Generate marshaled bytes.
 	b := make([]byte, sz)
 	buf := b
 	idx := 0
-	for _, k := range keys {
+	for i, k := range keys {
 		buf[idx] = ','
 		idx++
 		copy(buf[idx:idx+len(k)], k)
 		idx += len(k)
 		buf[idx] = '='
 		idx++
-		v := escaped[k]
+		v := escaped[i].Value
 		copy(buf[idx:idx+len(v)], v)
 		idx += len(v)
 	}
@@ -1594,3 +1671,9 @@ func (p Fields) MarshalBinary() []byte {
 	}
 	return b
 }
+
+type byteSlices [][]byte
+
+func (a byteSlices) Len() int           { return len(a) }
+func (a byteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 }
+func (a byteSlices) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
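
Taken together, the new type trades map indexing for explicit accessors. A short sketch of the API in use (hypothetical values; again assuming the github.com/influxdata/influxdb/models import path):

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/models"
)

func main() {
	tags := models.NewTags(map[string]string{"region": "us-east", "host": "serverA"})

	// Lookups scan the sorted slice instead of hashing.
	fmt.Println(tags.GetString("host")) // serverA

	// Set keeps the slice sorted; Map converts back for code that still
	// needs a map[string]string at an API boundary.
	tags.SetString("dc", "ny1")
	fmt.Println(tags.Map()) // map[dc:ny1 host:serverA region:us-east] (map order may vary)

	// HashKey marshals escaped tags as ",key=value" pairs in key order.
	fmt.Printf("%s\n", tags.HashKey()) // ,dc=ny1,host=serverA,region=us-east
}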


@@ -15,7 +15,7 @@ import (
 )

 var (
-	tags       = models.Tags{"foo": "bar", "apple": "orange", "host": "serverA", "region": "uswest"}
+	tags       = models.NewTags(map[string]string{"foo": "bar", "apple": "orange", "host": "serverA", "region": "uswest"})
 	maxFloat64 = strconv.FormatFloat(math.MaxFloat64, 'f', 1, 64)
 	minFloat64 = strconv.FormatFloat(-math.MaxFloat64, 'f', 1, 64)
 )
@@ -115,7 +115,7 @@ func BenchmarkParsePointsTagsUnSorted10(b *testing.B) {
 func BenchmarkParseKey(b *testing.B) {
 	line := `cpu,region=us-west,host=serverA,env=prod,target=servers,zone=1c,tag1=value1,tag2=value2,tag3=value3,tag4=value4,tag5=value5`
 	for i := 0; i < b.N; i++ {
-		models.ParseKey(line)
+		models.ParseKey([]byte(line))
 	}
 }
@@ -163,9 +163,9 @@ func test(t *testing.T, line string, point TestPoint) {
 		t.Errorf(`ParsePoints("%s") tags mismatch. got %v, exp %v`, line, pts[0].Tags(), exp)
 	}

-	for tag, value := range pts[0].Tags() {
-		if value != point.RawTags[tag] {
-			t.Errorf(`ParsePoints("%s") tags mismatch. got %v, exp %v`, line, value, point.RawTags[tag])
+	for _, tag := range pts[0].Tags() {
+		if !bytes.Equal(tag.Value, point.RawTags.Get(tag.Key)) {
+			t.Errorf(`ParsePoints("%s") tags mismatch. got %s, exp %s`, line, tag.Value, point.RawTags.Get(tag.Key))
 		}
 	}
@@ -639,7 +639,7 @@ func TestParsePointUnescape(t *testing.T) {
 	test(t, `foo\,bar value=1i`,
 		NewTestPoint(
 			"foo,bar", // comma in the name
-			models.Tags{},
+			models.NewTags(map[string]string{}),
 			models.Fields{
 				"value": int64(1),
 			},
@@ -649,9 +649,9 @@ func TestParsePointUnescape(t *testing.T) {
 	test(t, `cpu\,main,regions=east value=1.0`,
 		NewTestPoint(
 			"cpu,main", // comma in the name
-			models.Tags{
+			models.NewTags(map[string]string{
 				"regions": "east",
-			},
+			}),
 			models.Fields{
 				"value": 1.0,
 			},
@@ -661,9 +661,9 @@ func TestParsePointUnescape(t *testing.T) {
 	test(t, `cpu\ load,region=east value=1.0`,
 		NewTestPoint(
 			"cpu load", // space in the name
-			models.Tags{
+			models.NewTags(map[string]string{
 				"region": "east",
-			},
+			}),
 			models.Fields{
 				"value": 1.0,
 			},
@@ -673,9 +673,9 @@ func TestParsePointUnescape(t *testing.T) {
 	test(t, `cpu\=load,region=east value=1.0`,
 		NewTestPoint(
 			`cpu\=load`, // backslash is literal
-			models.Tags{
+			models.NewTags(map[string]string{
 				"region": "east",
-			},
+			}),
 			models.Fields{
 				"value": 1.0,
 			},
@@ -685,9 +685,9 @@ func TestParsePointUnescape(t *testing.T) {
 	test(t, `cpu=load,region=east value=1.0`,
 		NewTestPoint(
 			`cpu=load`, // literal equals is fine in measurement name
-			models.Tags{
+			models.NewTags(map[string]string{
 				"region": "east",
-			},
+			}),
 			models.Fields{
 				"value": 1.0,
 			},
@@ -696,9 +696,9 @@ func TestParsePointUnescape(t *testing.T) {
 	// commas in tag names
 	test(t, `cpu,region\,zone=east value=1.0`,
 		NewTestPoint("cpu",
-			models.Tags{
+			models.NewTags(map[string]string{
 				"region,zone": "east", // comma in the tag key
-			},
+			}),
 			models.Fields{
 				"value": 1.0,
 			},
@@ -707,9 +707,9 @@ func TestParsePointUnescape(t *testing.T) {
 	// spaces in tag name
 	test(t, `cpu,region\ zone=east value=1.0`,
 		NewTestPoint("cpu",
-			models.Tags{
+			models.NewTags(map[string]string{
 				"region zone": "east", // space in the tag name
-			},
+			}),
 			models.Fields{
 				"value": 1.0,
 			},
@@ -718,9 +718,9 @@ func TestParsePointUnescape(t *testing.T) {
 	// backslash with escaped equals in tag name
 	test(t, `cpu,reg\\=ion=east value=1.0`,
 		NewTestPoint("cpu",
-			models.Tags{
+			models.NewTags(map[string]string{
 				`reg\=ion`: "east",
-			},
+			}),
 			models.Fields{
 				"value": 1.0,
 			},
@@ -729,9 +729,9 @@ func TestParsePointUnescape(t *testing.T) {
 	// space is tag name
 	test(t, `cpu,\ =east value=1.0`,
 		NewTestPoint("cpu",
-			models.Tags{
+			models.NewTags(map[string]string{
 				" ": "east", // tag name is single space
-			},
+			}),
 			models.Fields{
 				"value": 1.0,
 			},
@@ -740,9 +740,9 @@ func TestParsePointUnescape(t *testing.T) {
 	// commas in tag values
 	test(t, `cpu,regions=east\,west value=1.0`,
 		NewTestPoint("cpu",
-			models.Tags{
+			models.NewTags(map[string]string{
 				"regions": "east,west", // comma in the tag value
-			},
+			}),
 			models.Fields{
 				"value": 1.0,
 			},
@@ -752,9 +752,9 @@ func TestParsePointUnescape(t *testing.T) {
 	test(t, `cpu,regions=\\ east value=1.0`,
 		NewTestPoint(
 			"cpu",
-			models.Tags{
+			models.NewTags(map[string]string{
 				"regions": `\ east`,
-			},
+			}),
 			models.Fields{
 				"value": 1.0,
 			},
@@ -764,9 +764,9 @@ func TestParsePointUnescape(t *testing.T) {
 	test(t, `cpu,regions=eas\\ t value=1.0`,
 		NewTestPoint(
 			"cpu",
-			models.Tags{
+			models.NewTags(map[string]string{
 				"regions": `eas\ t`,
-			},
+			}),
 			models.Fields{
 				"value": 1.0,
 			},
@@ -776,9 +776,9 @@ func TestParsePointUnescape(t *testing.T) {
 	test(t, `cpu,regions=east\\ value=1.0`,
 		NewTestPoint(
 			"cpu",
-			models.Tags{
+			models.NewTags(map[string]string{
 				"regions": `east\ `,
-			},
+			}),
 			models.Fields{
 				"value": 1.0,
 			},
@@ -787,9 +787,9 @@ func TestParsePointUnescape(t *testing.T) {
 	// spaces in tag values
 	test(t, `cpu,regions=east\ west value=1.0`,
 		NewTestPoint("cpu",
-			models.Tags{
+			models.NewTags(map[string]string{
 				"regions": "east west", // comma in the tag value
-			},
+			}),
 			models.Fields{
 				"value": 1.0,
 			},
@@ -798,9 +798,9 @@ func TestParsePointUnescape(t *testing.T) {
 	// commas in field keys
 	test(t, `cpu,regions=east value\,ms=1.0`,
 		NewTestPoint("cpu",
-			models.Tags{
+			models.NewTags(map[string]string{
 				"regions": "east",
-			},
+			}),
 			models.Fields{
 				"value,ms": 1.0, // comma in the field keys
 			},
@@ -809,9 +809,9 @@ func TestParsePointUnescape(t *testing.T) {
 	// spaces in field keys
 	test(t, `cpu,regions=east value\ ms=1.0`,
 		NewTestPoint("cpu",
-			models.Tags{
+			models.NewTags(map[string]string{
 				"regions": "east",
-			},
+			}),
 			models.Fields{
 				"value ms": 1.0, // comma in the field keys
 			},
@@ -820,10 +820,10 @@ func TestParsePointUnescape(t *testing.T) {
 	// tag with no value
 	test(t, `cpu,regions=east value="1"`,
 		NewTestPoint("cpu",
-			models.Tags{
+			models.NewTags(map[string]string{
 				"regions": "east",
 				"foobar":  "",
-			},
+			}),
 			models.Fields{
 				"value": "1",
 			},
@@ -832,9 +832,9 @@ func TestParsePointUnescape(t *testing.T) {
 	// commas in field values
 	test(t, `cpu,regions=east value="1,0"`,
 		NewTestPoint("cpu",
-			models.Tags{
+			models.NewTags(map[string]string{
 				"regions": "east",
-			},
+			}),
 			models.Fields{
 				"value": "1,0", // comma in the field value
 			},
@@ -844,9 +844,9 @@ func TestParsePointUnescape(t *testing.T) {
 	test(t, `cpu,regions=eas\t value=1.0`,
 		NewTestPoint(
 			"cpu",
-			models.Tags{
+			models.NewTags(map[string]string{
 				"regions": "eas\\t",
-			},
+			}),
 			models.Fields{
 				"value": 1.0,
 			},
@@ -856,9 +856,9 @@ func TestParsePointUnescape(t *testing.T) {
 	test(t, `cpu,regions=\\,\,\=east value=1.0`,
 		NewTestPoint(
 			"cpu",
-			models.Tags{
+			models.NewTags(map[string]string{
 				"regions": `\,,=east`,
-			},
+			}),
 			models.Fields{
 				"value": 1.0,
 			},
@@ -868,7 +868,7 @@ func TestParsePointUnescape(t *testing.T) {
 	test(t, `cpu \a=1i`,
 		NewTestPoint(
 			"cpu",
-			models.Tags{},
+			models.NewTags(map[string]string{}),
 			models.Fields{
 				"\\a": int64(1), // Left as parsed since it's not a known escape sequence.
 			},
@@ -878,9 +878,9 @@ func TestParsePointUnescape(t *testing.T) {
 	test(t, `cpu=load,equals\=foo=tag\=value value=1i`,
 		NewTestPoint(
 			"cpu=load", // Not escaped
-			models.Tags{
+			models.NewTags(map[string]string{
 				"equals=foo": "tag=value", // Tag and value unescaped
-			},
+			}),
 			models.Fields{
 				"value": int64(1),
 			},
@@ -892,7 +892,7 @@ func TestParsePointWithTags(t *testing.T) {
 	test(t,
 		"cpu,host=serverA,region=us-east value=1.0 1000000000",
 		NewTestPoint("cpu",
-			models.Tags{"host": "serverA", "region": "us-east"},
+			models.NewTags(map[string]string{"host": "serverA", "region": "us-east"}),
 			models.Fields{"value": 1.0}, time.Unix(1, 0)))
 }
@@ -924,10 +924,10 @@ func TestParsePointWithDuplicateTags(t *testing.T) {
 func TestParsePointWithStringField(t *testing.T) {
 	test(t, `cpu,host=serverA,region=us-east value=1.0,str="foo",str2="bar" 1000000000`,
 		NewTestPoint("cpu",
-			models.Tags{
+			models.NewTags(map[string]string{
 				"host":   "serverA",
 				"region": "us-east",
-			},
+			}),
 			models.Fields{
 				"value": 1.0,
 				"str":   "foo",
@@ -938,10 +938,10 @@ func TestParsePointWithStringField(t *testing.T) {
 	test(t, `cpu,host=serverA,region=us-east str="foo \" bar" 1000000000`,
 		NewTestPoint("cpu",
-			models.Tags{
+			models.NewTags(map[string]string{
 				"host":   "serverA",
 				"region": "us-east",
-			},
+			}),
 			models.Fields{
 				"str": `foo " bar`,
 			},
@@ -954,10 +954,10 @@ func TestParsePointWithStringWithSpaces(t *testing.T) {
 	test(t, `cpu,host=serverA,region=us-east value=1.0,str="foo bar" 1000000000`,
 		NewTestPoint(
 			"cpu",
-			models.Tags{
+			models.NewTags(map[string]string{
 				"host":   "serverA",
 				"region": "us-east",
-			},
+			}),
 			models.Fields{
 				"value": 1.0,
 				"str":   "foo bar", // spaces in string value
@@ -970,10 +970,10 @@ func TestParsePointWithStringWithNewline(t *testing.T) {
 	test(t, "cpu,host=serverA,region=us-east value=1.0,str=\"foo\nbar\" 1000000000",
 		NewTestPoint(
 			"cpu",
-			models.Tags{
+			models.NewTags(map[string]string{
 				"host":   "serverA",
 				"region": "us-east",
-			},
+			}),
 			models.Fields{
 				"value": 1.0,
 				"str":   "foo\nbar", // newline in string value
@@ -987,10 +987,10 @@ func TestParsePointWithStringWithCommas(t *testing.T) {
 	test(t, `cpu,host=serverA,region=us-east value=1.0,str="foo\,bar" 1000000000`,
 		NewTestPoint(
 			"cpu",
-			models.Tags{
+			models.NewTags(map[string]string{
 				"host":   "serverA",
 				"region": "us-east",
-			},
+			}),
 			models.Fields{
 				"value": 1.0,
 				"str":   `foo\,bar`, // commas in string value
@@ -1002,10 +1002,10 @@ func TestParsePointWithStringWithCommas(t *testing.T) {
 	test(t, `cpu,host=serverA,region=us-east value=1.0,str="foo,bar" 1000000000`,
 		NewTestPoint(
 			"cpu",
-			models.Tags{
+			models.NewTags(map[string]string{
 				"host":   "serverA",
 				"region": "us-east",
-			},
+			}),
 			models.Fields{
 				"value": 1.0,
 				"str":   "foo,bar", // commas in string value
@@ -1019,10 +1019,10 @@ func TestParsePointQuotedMeasurement(t *testing.T) {
 	test(t, `"cpu",host=serverA,region=us-east value=1.0 1000000000`,
 		NewTestPoint(
 			`"cpu"`,
-			models.Tags{
+			models.NewTags(map[string]string{
 				"host":   "serverA",
 				"region": "us-east",
-			},
+			}),
 			models.Fields{
 				"value": 1.0,
 			},
@@ -1034,10 +1034,10 @@ func TestParsePointQuotedTags(t *testing.T) {
 	test(t, `cpu,"host"="serverA",region=us-east value=1.0 1000000000`,
 		NewTestPoint(
 			"cpu",
-			models.Tags{
+			models.NewTags(map[string]string{
 				`"host"`: `"serverA"`,
 				"region": "us-east",
-			},
+			}),
 			models.Fields{
 				"value": 1.0,
 			},
@@ -1056,7 +1056,7 @@ func TestParsePointsUnbalancedQuotedTags(t *testing.T) {
 	}

 	// Expected " in the tag value
-	exp := models.MustNewPoint("baz", models.Tags{"mytag": `"a`},
+	exp := models.MustNewPoint("baz", models.NewTags(map[string]string{"mytag": `"a`}),
 		models.Fields{"x": float64(1)}, time.Unix(0, 1441103862125))

 	if pts[0].String() != exp.String() {
@@ -1064,7 +1064,7 @@ func TestParsePointsUnbalancedQuotedTags(t *testing.T) {
 	}

 	// Expected two points to ensure we did not overscan the line
-	exp = models.MustNewPoint("baz", models.Tags{"mytag": `a`},
+	exp = models.MustNewPoint("baz", models.NewTags(map[string]string{"mytag": `a`}),
 		models.Fields{"z": float64(1)}, time.Unix(0, 1441103862126))

 	if pts[1].String() != exp.String() {
@@ -1078,10 +1078,10 @@ func TestParsePointEscapedStringsAndCommas(t *testing.T) {
 	test(t, `cpu,host=serverA,region=us-east value="{Hello\"{,}\" World}" 1000000000`,
 		NewTestPoint(
 			"cpu",
-			models.Tags{
+			models.NewTags(map[string]string{
 				"host":   "serverA",
 				"region": "us-east",
-			},
+			}),
 			models.Fields{
 				"value": `{Hello"{,}" World}`,
 			},
@@ -1092,10 +1092,10 @@ func TestParsePointEscapedStringsAndCommas(t *testing.T) {
 	test(t, `cpu,host=serverA,region=us-east value="{Hello\"{\,}\" World}" 1000000000`,
 		NewTestPoint(
 			"cpu",
-			models.Tags{
+			models.NewTags(map[string]string{
 				"host":   "serverA",
 				"region": "us-east",
-			},
+			}),
 			models.Fields{
 				"value": `{Hello"{\,}" World}`,
 			},
@@ -1107,10 +1107,10 @@ func TestParsePointWithStringWithEquals(t *testing.T) {
 	test(t, `cpu,host=serverA,region=us-east str="foo=bar",value=1.0 1000000000`,
 		NewTestPoint(
 			"cpu",
-			models.Tags{
+			models.NewTags(map[string]string{
 				"host":   "serverA",
 				"region": "us-east",
-			},
+			}),
 			models.Fields{
 				"value": 1.0,
 				"str":   "foo=bar", // spaces in string value
@@ -1123,7 +1123,7 @@ func TestParsePointWithStringWithBackslash(t *testing.T) {
 	test(t, `cpu value="test\\\"" 1000000000`,
 		NewTestPoint(
 			"cpu",
-			models.Tags{},
+			models.NewTags(map[string]string{}),
 			models.Fields{
 				"value": `test\"`,
 			},
@@ -1133,7 +1133,7 @@ func TestParsePointWithStringWithBackslash(t *testing.T) {
 	test(t, `cpu value="test\\" 1000000000`,
 		NewTestPoint(
 			"cpu",
-			models.Tags{},
+			models.NewTags(map[string]string{}),
 			models.Fields{
 				"value": `test\`,
 			},
@@ -1143,7 +1143,7 @@ func TestParsePointWithStringWithBackslash(t *testing.T) {
 	test(t, `cpu value="test\\\"" 1000000000`,
 		NewTestPoint(
 			"cpu",
-			models.Tags{},
+			models.NewTags(map[string]string{}),
 			models.Fields{
 				"value": `test\"`,
 			},
@@ -1153,7 +1153,7 @@ func TestParsePointWithStringWithBackslash(t *testing.T) {
 	test(t, `cpu value="test\"" 1000000000`,
 		NewTestPoint(
 			"cpu",
-			models.Tags{},
+			models.NewTags(map[string]string{}),
 			models.Fields{
 				"value": `test"`,
 			},
@@ -1165,10 +1165,10 @@ func TestParsePointWithBoolField(t *testing.T) {
 	test(t, `cpu,host=serverA,region=us-east true=true,t=t,T=T,TRUE=TRUE,True=True,false=false,f=f,F=F,FALSE=FALSE,False=False 1000000000`,
 		NewTestPoint(
 			"cpu",
-			models.Tags{
+			models.NewTags(map[string]string{
 				"host":   "serverA",
 				"region": "us-east",
-			},
+			}),
 			models.Fields{
 				"t": true,
 				"T": true,
@@ -1189,10 +1189,10 @@ func TestParsePointUnicodeString(t *testing.T) {
 	test(t, `cpu,host=serverA,region=us-east value="wè" 1000000000`,
 		NewTestPoint(
 			"cpu",
-			models.Tags{
+			models.NewTags(map[string]string{
 				"host":   "serverA",
 				"region": "us-east",
-			},
+			}),
 			models.Fields{
 				"value": "wè",
 			},
@@ -1204,7 +1204,7 @@ func TestParsePointNegativeTimestamp(t *testing.T) {
 	test(t, `cpu value=1 -1`,
 		NewTestPoint(
 			"cpu",
-			models.Tags{},
+			models.NewTags(map[string]string{}),
 			models.Fields{
 				"value": 1.0,
 			},
@@ -1216,7 +1216,7 @@ func TestParsePointMaxTimestamp(t *testing.T) {
 	test(t, fmt.Sprintf(`cpu value=1 %d`, models.MaxNanoTime),
 		NewTestPoint(
 			"cpu",
-			models.Tags{},
+			models.NewTags(map[string]string{}),
 			models.Fields{
 				"value": 1.0,
 			},
@@ -1228,7 +1228,7 @@ func TestParsePointMinTimestamp(t *testing.T) {
 	test(t, `cpu value=1 -9223372036854775808`,
 		NewTestPoint(
 			"cpu",
-			models.Tags{},
+			models.NewTags(map[string]string{}),
 			models.Fields{
 				"value": 1.0,
 			},
@@ -1259,7 +1259,7 @@ func TestNewPointFloatWithoutDecimal(t *testing.T) {
 	test(t, `cpu value=1 1000000000`,
 		NewTestPoint(
 			"cpu",
-			models.Tags{},
+			models.NewTags(map[string]string{}),
 			models.Fields{
 				"value": 1.0,
 			},
@@ -1270,7 +1270,7 @@ func TestNewPointNegativeFloat(t *testing.T) {
 	test(t, `cpu value=-0.64 1000000000`,
 		NewTestPoint(
 			"cpu",
-			models.Tags{},
+			models.NewTags(map[string]string{}),
 			models.Fields{
 				"value": -0.64,
 			},
@@ -1282,7 +1282,7 @@ func TestNewPointFloatNoDecimal(t *testing.T) {
 	test(t, `cpu value=1. 1000000000`,
 		NewTestPoint(
 			"cpu",
-			models.Tags{},
+			models.NewTags(map[string]string{}),
 			models.Fields{
 				"value": 1.0,
 			},
@@ -1294,7 +1294,7 @@ func TestNewPointFloatScientific(t *testing.T) {
 	test(t, `cpu value=6.632243e+06 1000000000`,
 		NewTestPoint(
 			"cpu",
-			models.Tags{},
+			models.NewTags(map[string]string{}),
 			models.Fields{
 				"value": float64(6632243),
 			},
@@ -1306,7 +1306,7 @@ func TestNewPointLargeInteger(t *testing.T) {
 	test(t, `cpu value=6632243i 1000000000`,
 		NewTestPoint(
 			"cpu",
-			models.Tags{},
+			models.NewTags(map[string]string{}),
 			models.Fields{
 				"value": int64(6632243), // if incorrectly encoded as a float, it would show up as 6.632243e+06
 			},
@@ -1403,7 +1403,7 @@ func TestParsePointToString(t *testing.T) {
 		t.Errorf("ParsePoint() to string mismatch:\n got %v\n exp %v", got, line)
 	}

-	pt = models.MustNewPoint("cpu", models.Tags{"host": "serverA", "region": "us-east"},
+	pt = models.MustNewPoint("cpu", models.NewTags(map[string]string{"host": "serverA", "region": "us-east"}),
 		models.Fields{"int": 10, "float": float64(11.0), "float2": float64(12.123), "bool": false, "str": "string val"},
 		time.Unix(1, 0))
@@ -1600,26 +1600,26 @@ cpu,host=serverA,region=us-east value=1.0 946730096789012345`,
 func TestNewPointEscaped(t *testing.T) {
 	// commas
-	pt := models.MustNewPoint("cpu,main", models.Tags{"tag,bar": "value"}, models.Fields{"name,bar": 1.0}, time.Unix(0, 0))
+	pt := models.MustNewPoint("cpu,main", models.NewTags(map[string]string{"tag,bar": "value"}), models.Fields{"name,bar": 1.0}, time.Unix(0, 0))
 	if exp := `cpu\,main,tag\,bar=value name\,bar=1 0`; pt.String() != exp {
 		t.Errorf("NewPoint().String() mismatch.\ngot %v\nexp %v", pt.String(), exp)
 	}

 	// spaces
-	pt = models.MustNewPoint("cpu main", models.Tags{"tag bar": "value"}, models.Fields{"name bar": 1.0}, time.Unix(0, 0))
+	pt = models.MustNewPoint("cpu main", models.NewTags(map[string]string{"tag bar": "value"}), models.Fields{"name bar": 1.0}, time.Unix(0, 0))
 	if exp := `cpu\ main,tag\ bar=value name\ bar=1 0`; pt.String() != exp {
 		t.Errorf("NewPoint().String() mismatch.\ngot %v\nexp %v", pt.String(), exp)
 	}

 	// equals
-	pt = models.MustNewPoint("cpu=main", models.Tags{"tag=bar": "value=foo"}, models.Fields{"name=bar": 1.0}, time.Unix(0, 0))
+	pt = models.MustNewPoint("cpu=main", models.NewTags(map[string]string{"tag=bar": "value=foo"}), models.Fields{"name=bar": 1.0}, time.Unix(0, 0))
 	if exp := `cpu=main,tag\=bar=value\=foo name\=bar=1 0`; pt.String() != exp {
 		t.Errorf("NewPoint().String() mismatch.\ngot %v\nexp %v", pt.String(), exp)
 	}
 }

 func TestNewPointWithoutField(t *testing.T) {
-	_, err := models.NewPoint("cpu", models.Tags{"tag": "bar"}, models.Fields{}, time.Unix(0, 0))
+	_, err := models.NewPoint("cpu", models.NewTags(map[string]string{"tag": "bar"}), models.Fields{}, time.Unix(0, 0))
 	if err == nil {
 		t.Fatalf(`NewPoint() expected error. got nil`)
 	}
@@ -1645,19 +1645,19 @@ func TestNewPointUnhandledType(t *testing.T) {
 }

 func TestMakeKeyEscaped(t *testing.T) {
-	if exp, got := `cpu\ load`, models.MakeKey([]byte(`cpu\ load`), models.Tags{}); string(got) != exp {
+	if exp, got := `cpu\ load`, models.MakeKey([]byte(`cpu\ load`), models.NewTags(map[string]string{})); string(got) != exp {
 		t.Errorf("MakeKey() mismatch.\ngot %v\nexp %v", got, exp)
 	}

-	if exp, got := `cpu\ load`, models.MakeKey([]byte(`cpu load`), models.Tags{}); string(got) != exp {
+	if exp, got := `cpu\ load`, models.MakeKey([]byte(`cpu load`), models.NewTags(map[string]string{})); string(got) != exp {
 		t.Errorf("MakeKey() mismatch.\ngot %v\nexp %v", got, exp)
 	}

-	if exp, got := `cpu\,load`, models.MakeKey([]byte(`cpu\,load`), models.Tags{}); string(got) != exp {
+	if exp, got := `cpu\,load`, models.MakeKey([]byte(`cpu\,load`), models.NewTags(map[string]string{})); string(got) != exp {
 		t.Errorf("MakeKey() mismatch.\ngot %v\nexp %v", got, exp)
 	}

-	if exp, got := `cpu\,load`, models.MakeKey([]byte(`cpu,load`), models.Tags{}); string(got) != exp {
+	if exp, got := `cpu\,load`, models.MakeKey([]byte(`cpu,load`), models.NewTags(map[string]string{})); string(got) != exp {
 		t.Errorf("MakeKey() mismatch.\ngot %v\nexp %v", got, exp)
 	}
@@ -1866,7 +1866,7 @@ func TestNewPointsRejectsMaxKey(t *testing.T) {
 }

 func TestParseKeyEmpty(t *testing.T) {
-	if _, _, err := models.ParseKey(""); err != nil {
+	if _, _, err := models.ParseKey(nil); err != nil {
 		t.Fatalf("unexpected error: %v", err)
 	}
 }


@@ -212,14 +212,13 @@ func (m *Monitor) Statistics(tags map[string]string) ([]*Statistic, error) {
 		statistic := &Statistic{
 			Statistic: models.Statistic{
-				Tags:   make(map[string]string),
 				Values: make(map[string]interface{}),
 			},
 		}

 		// Add any supplied tags.
 		for k, v := range tags {
-			statistic.Tags[k] = v
+			statistic.Tags.SetString(k, v)
 		}

 		// Every other top-level expvar value is a map.
@@ -242,7 +241,7 @@ func (m *Monitor) Statistics(tags map[string]string) ([]*Statistic, error) {
 				if err != nil {
 					return
 				}
-				statistic.Tags[t.Key] = u
+				statistic.Tags.SetString(t.Key, u)
 			})
 		case "values":
 			// string-interface map.
@@ -281,14 +280,13 @@ func (m *Monitor) Statistics(tags map[string]string) ([]*Statistic, error) {
 	statistic := &Statistic{
 		Statistic: models.Statistic{
 			Name:   "runtime",
-			Tags:   make(map[string]string),
 			Values: make(map[string]interface{}),
 		},
 	}

 	// Add any supplied tags to Go memstats
 	for k, v := range tags {
-		statistic.Tags[k] = v
+		statistic.Tags.SetString(k, v)
 	}

 	var rt runtime.MemStats

@@ -70,7 +70,7 @@ func NewService(c Config) *Service {
 		Logger:   log.New(os.Stderr, "[collectd] ", log.LstdFlags),
 		err:      make(chan error),
 		stats:    &Statistics{},
-		statTags: map[string]string{"bind": c.BindAddress},
+		statTags: models.NewTags(map[string]string{"bind": c.BindAddress}),
 	}

 	return &s
@@ -360,8 +360,9 @@ func (s *Service) UnmarshalCollectd(packet *gollectd.Packet) []models.Point {
 		if packet.TypeInstance != "" {
 			tags["type_instance"] = packet.TypeInstance
 		}
-		p, err := models.NewPoint(name, tags, fields, timestamp)
+
 		// Drop invalid points
+		p, err := models.NewPoint(name, models.NewTags(tags), fields, timestamp)
 		if err != nil {
 			s.Logger.Printf("Dropping point %v: %v", name, err)
 			atomic.AddInt64(&s.stats.InvalidDroppedPoints, 1)


@@ -147,7 +147,7 @@ type Statistics struct {
 func (s *Service) Statistics(tags map[string]string) []models.Statistic {
 	return []models.Statistic{{
 		Name: "cq",
-		Tags: tags,
+		Tags: models.NewTags(tags),
 		Values: map[string]interface{}{
 			statQueryOK:   atomic.LoadInt64(&s.stats.QueryOK),
 			statQueryFail: atomic.LoadInt64(&s.stats.QueryFail),


@@ -116,12 +116,12 @@ func (c *Config) WithDefaults() *Config {
 // DefaultTags returns the config's tags.
 func (c *Config) DefaultTags() models.Tags {
-	tags := models.Tags{}
+	m := make(map[string]string, len(c.Tags))
 	for _, t := range c.Tags {
 		parts := strings.Split(t, "=")
-		tags[parts[0]] = parts[1]
+		m[parts[0]] = parts[1]
 	}
-	return tags
+	return models.NewTags(m)
 }

 // Validate validates the config's templates and tags.


@@ -64,12 +64,12 @@ func NewParserWithOptions(options Options) (*Parser, error) {
 	}

 	// Parse out the default tags specific to this template
-	tags := models.Tags{}
+	var tags models.Tags
 	if strings.Contains(parts[len(parts)-1], "=") {
 		tagStrs := strings.Split(parts[len(parts)-1], ",")
 		for _, kv := range tagStrs {
 			parts := strings.Split(kv, "=")
-			tags[parts[0]] = parts[1]
+			tags.SetString(parts[0], parts[1])
 		}
 	}
@@ -151,12 +151,12 @@ func (p *Parser) Parse(line string) (models.Point, error) {
 	}

 	// Set the default tags on the point if they are not already set
-	for k, v := range p.tags {
-		if _, ok := tags[k]; !ok {
-			tags[k] = v
+	for _, t := range p.tags {
+		if _, ok := tags[string(t.Key)]; !ok {
+			tags[string(t.Key)] = string(t.Value)
 		}
 	}

-	return models.NewPoint(measurement, tags, fieldValues, timestamp)
+	return models.NewPoint(measurement, models.NewTags(tags), fieldValues, timestamp)
 }

 // ApplyTemplate extracts the template fields from the given line and
@@ -171,9 +171,9 @@ func (p *Parser) ApplyTemplate(line string) (string, map[string]string, string,
 	template := p.matcher.Match(fields[0])
 	name, tags, field, err := template.Apply(fields[0])

 	// Set the default tags on the point if they are not already set
-	for k, v := range p.tags {
-		if _, ok := tags[k]; !ok {
-			tags[k] = v
+	for _, t := range p.tags {
+		if _, ok := tags[string(t.Key)]; !ok {
+			tags[string(t.Key)] = string(t.Value)
 		}
 	}

 	return name, tags, field, err
@@ -223,8 +223,8 @@ func (t *template) Apply(line string) (string, map[string]string, string, error)
 	)

 	// Set any default tags
-	for k, v := range t.defaultTags {
-		tags[k] = append(tags[k], v)
+	for _, t := range t.defaultTags {
+		tags[string(t.Key)] = append(tags[string(t.Key)], string(t.Value))
 	}

 	// See if an invalid combination has been specified in the template:


@@ -261,7 +261,7 @@ func TestFilterMatchDefault(t *testing.T) {
 	}

 	exp := models.MustNewPoint("miss.servers.localhost.cpu_load",
-		models.Tags{},
+		models.NewTags(map[string]string{}),
 		models.Fields{"value": float64(11)},
 		time.Unix(1435077219, 0))
@@ -282,7 +282,7 @@ func TestFilterMatchMultipleMeasurement(t *testing.T) {
 	}

 	exp := models.MustNewPoint("cpu.cpu_load.10",
-		models.Tags{"host": "localhost"},
+		models.NewTags(map[string]string{"host": "localhost"}),
 		models.Fields{"value": float64(11)},
 		time.Unix(1435077219, 0))
@@ -306,7 +306,7 @@ func TestFilterMatchMultipleMeasurementSeparator(t *testing.T) {
 	}

 	exp := models.MustNewPoint("cpu_cpu_load_10",
-		models.Tags{"host": "localhost"},
+		models.NewTags(map[string]string{"host": "localhost"}),
 		models.Fields{"value": float64(11)},
 		time.Unix(1435077219, 0))
@@ -327,7 +327,7 @@ func TestFilterMatchSingle(t *testing.T) {
 	}

 	exp := models.MustNewPoint("cpu_load",
-		models.Tags{"host": "localhost"},
+		models.NewTags(map[string]string{"host": "localhost"}),
 		models.Fields{"value": float64(11)},
 		time.Unix(1435077219, 0))
@@ -348,7 +348,7 @@ func TestParseNoMatch(t *testing.T) {
 	}

 	exp := models.MustNewPoint("servers.localhost.memory.VmallocChunk",
-		models.Tags{},
+		models.NewTags(map[string]string{}),
 		models.Fields{"value": float64(11)},
 		time.Unix(1435077219, 0))
@@ -369,7 +369,7 @@ func TestFilterMatchWildcard(t *testing.T) {
 	}

 	exp := models.MustNewPoint("cpu_load",
-		models.Tags{"host": "localhost"},
+		models.NewTags(map[string]string{"host": "localhost"}),
 		models.Fields{"value": float64(11)},
 		time.Unix(1435077219, 0))
@@ -392,7 +392,7 @@ func TestFilterMatchExactBeforeWildcard(t *testing.T) {
 	}

 	exp := models.MustNewPoint("cpu_load",
-		models.Tags{"host": "localhost"},
+		models.NewTags(map[string]string{"host": "localhost"}),
 		models.Fields{"value": float64(11)},
 		time.Unix(1435077219, 0))
@@ -420,7 +420,7 @@ func TestFilterMatchMostLongestFilter(t *testing.T) {
 	}

 	exp := models.MustNewPoint("cpu_load",
-		models.Tags{"host": "localhost", "resource": "cpu"},
+		models.NewTags(map[string]string{"host": "localhost", "resource": "cpu"}),
 		models.Fields{"value": float64(11)},
 		time.Unix(1435077219, 0))
@@ -447,7 +447,7 @@ func TestFilterMatchMultipleWildcards(t *testing.T) {
 	}

 	exp := models.MustNewPoint("cpu_load",
-		models.Tags{"host": "server01"},
+		models.NewTags(map[string]string{"host": "server01"}),
 		models.Fields{"value": float64(11)},
 		time.Unix(1435077219, 0))
@@ -462,17 +462,17 @@ func TestFilterMatchMultipleWildcards(t *testing.T) {
 }

 func TestParseDefaultTags(t *testing.T) {
-	p, err := graphite.NewParser([]string{"servers.localhost .host.measurement*"}, models.Tags{
+	p, err := graphite.NewParser([]string{"servers.localhost .host.measurement*"}, models.NewTags(map[string]string{
 		"region": "us-east",
 		"zone":   "1c",
 		"host":   "should not set",
-	})
+	}))
 	if err != nil {
 		t.Fatalf("unexpected error creating parser, got %v", err)
 	}

 	exp := models.MustNewPoint("cpu_load",
-		models.Tags{"host": "localhost", "region": "us-east", "zone": "1c"},
+		models.NewTags(map[string]string{"host": "localhost", "region": "us-east", "zone": "1c"}),
 		models.Fields{"value": float64(11)},
 		time.Unix(1435077219, 0))
@@ -487,16 +487,16 @@ func TestParseDefaultTags(t *testing.T) {
 }

 func TestParseDefaultTemplateTags(t *testing.T) {
-	p, err := graphite.NewParser([]string{"servers.localhost .host.measurement* zone=1c"}, models.Tags{
+	p, err := graphite.NewParser([]string{"servers.localhost .host.measurement* zone=1c"}, models.NewTags(map[string]string{
 		"region": "us-east",
 		"host":   "should not set",
-	})
+	}))
 	if err != nil {
 		t.Fatalf("unexpected error creating parser, got %v", err)
 	}

 	exp := models.MustNewPoint("cpu_load",
-		models.Tags{"host": "localhost", "region": "us-east", "zone": "1c"},
+		models.NewTags(map[string]string{"host": "localhost", "region": "us-east", "zone": "1c"}),
 		models.Fields{"value": float64(11)},
 		time.Unix(1435077219, 0))
@@ -511,16 +511,16 @@ func TestParseDefaultTemplateTags(t *testing.T) {
 }

 func TestParseDefaultTemplateTagsOverridGlobal(t *testing.T) {
-	p, err := graphite.NewParser([]string{"servers.localhost .host.measurement* zone=1c,region=us-east"}, models.Tags{
+	p, err := graphite.NewParser([]string{"servers.localhost .host.measurement* zone=1c,region=us-east"}, models.NewTags(map[string]string{
 		"region": "shot not be set",
 		"host":   "should not set",
-	})
+	}))
 	if err != nil {
 		t.Fatalf("unexpected error creating parser, got %v", err)
 	}

 	exp := models.MustNewPoint("cpu_load",
-		models.Tags{"host": "localhost", "region": "us-east", "zone": "1c"},
+		models.NewTags(map[string]string{"host": "localhost", "region": "us-east", "zone": "1c"}),
 		models.Fields{"value": float64(11)},
 		time.Unix(1435077219, 0))
@@ -535,16 +535,16 @@ func TestParseDefaultTemplateTagsOverridGlobal(t *testing.T) {
 }

 func TestParseTemplateWhitespace(t *testing.T) {
-	p, err := graphite.NewParser([]string{"servers.localhost .host.measurement* zone=1c"}, models.Tags{
+	p, err := graphite.NewParser([]string{"servers.localhost .host.measurement* zone=1c"}, models.NewTags(map[string]string{
 		"region": "us-east",
 		"host":   "should not set",
-	})
+	}))
 	if err != nil {
 		t.Fatalf("unexpected error creating parser, got %v", err)
 	}

 	exp := models.MustNewPoint("cpu_load",
-		models.Tags{"host": "localhost", "region": "us-east", "zone": "1c"},
+		models.NewTags(map[string]string{"host": "localhost", "region": "us-east", "zone": "1c"}),
 		models.Fields{"value": float64(11)},
 		time.Unix(1435077219, 0))


@@ -106,7 +106,7 @@ func NewService(c Config) (*Service, error) {
 		batchTimeout:    time.Duration(d.BatchTimeout),
 		logger:          log.New(os.Stderr, fmt.Sprintf("[graphite] %s ", d.BindAddress), log.LstdFlags),
 		stats:           &Statistics{},
-		statTags:        map[string]string{"proto": d.Protocol, "bind": d.BindAddress},
+		statTags:        models.NewTags(map[string]string{"proto": d.Protocol, "bind": d.BindAddress}),
 		tcpConnections:  make(map[string]*tcpConnection),
 		done:            make(chan struct{}),
 		diagsKey:        strings.Join([]string{"graphite", d.Protocol, d.BindAddress}, ":"),

View File

@ -39,7 +39,7 @@ func Test_ServerGraphiteTCP(t *testing.T) {
pt, _ := models.NewPoint( pt, _ := models.NewPoint(
"cpu", "cpu",
map[string]string{}, models.NewTags(map[string]string{}),
map[string]interface{}{"value": 23.456}, map[string]interface{}{"value": 23.456},
time.Unix(now.Unix(), 0)) time.Unix(now.Unix(), 0))
@ -115,7 +115,7 @@ func Test_ServerGraphiteUDP(t *testing.T) {
pt, _ := models.NewPoint( pt, _ := models.NewPoint(
"cpu", "cpu",
map[string]string{}, models.NewTags(map[string]string{}),
map[string]interface{}{"value": 23.456}, map[string]interface{}{"value": 23.456},
time.Unix(now.Unix(), 0)) time.Unix(now.Unix(), 0))
if database != "graphitedb" { if database != "graphitedb" {

View File

@ -179,7 +179,7 @@ type Statistics struct {
func (h *Handler) Statistics(tags map[string]string) []models.Statistic { func (h *Handler) Statistics(tags map[string]string) []models.Statistic {
return []models.Statistic{{ return []models.Statistic{{
Name: "httpd", Name: "httpd",
Tags: tags, Tags: models.NewTags(tags),
Values: map[string]interface{}{ Values: map[string]interface{}{
statRequest: atomic.LoadInt64(&h.stats.Requests), statRequest: atomic.LoadInt64(&h.stats.Requests),
statCQRequest: atomic.LoadInt64(&h.stats.CQRequests), statCQRequest: atomic.LoadInt64(&h.stats.CQRequests),
@ -737,24 +737,24 @@ func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) {
for _, s := range stats { for _, s := range stats {
// Very hackily create a unique key. // Very hackily create a unique key.
buf := bytes.NewBufferString(s.Name) buf := bytes.NewBufferString(s.Name)
if path, ok := s.Tags["path"]; ok { if path := s.Tags.Get([]byte("path")); path != nil {
fmt.Fprintf(buf, ":%s", path) fmt.Fprintf(buf, ":%s", path)
if id, ok := s.Tags["id"]; ok { if id := s.Tags.Get([]byte("id")); id != nil {
fmt.Fprintf(buf, ":%s", id) fmt.Fprintf(buf, ":%s", id)
} }
} else if bind, ok := s.Tags["bind"]; ok { } else if bind := s.Tags.Get([]byte("bind")); bind != nil {
if proto, ok := s.Tags["proto"]; ok { if proto := s.Tags.Get([]byte("proto")); proto != nil {
fmt.Fprintf(buf, ":%s", proto) fmt.Fprintf(buf, ":%s", proto)
} }
fmt.Fprintf(buf, ":%s", bind) fmt.Fprintf(buf, ":%s", bind)
} else if database, ok := s.Tags["database"]; ok { } else if database := s.Tags.Get([]byte("database")); database != nil {
fmt.Fprintf(buf, ":%s", database) fmt.Fprintf(buf, ":%s", database)
if rp, ok := s.Tags["retention_policy"]; ok { if rp := s.Tags.Get([]byte("retention_policy")); rp != nil {
fmt.Fprintf(buf, ":%s", rp) fmt.Fprintf(buf, ":%s", rp)
if name, ok := s.Tags["name"]; ok { if name := s.Tags.Get([]byte("name")); name != nil {
fmt.Fprintf(buf, ":%s", name) fmt.Fprintf(buf, ":%s", name)
} }
if dest, ok := s.Tags["destination"]; ok { if dest := s.Tags.Get([]byte("destination")); dest != nil {
fmt.Fprintf(buf, ":%s", dest) fmt.Fprintf(buf, ":%s", dest)
} }
} }
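
The expvar handler now probes tags with s.Tags.Get([]byte("path")) and tests the result against nil, replacing the map's comma-ok idiom. A self-contained sketch of how such a Get can work over a sorted tag slice (same simplified stand-in types as the earlier sketch; binary search avoids both a map and per-lookup allocations):

package main

import (
	"bytes"
	"fmt"
	"sort"
)

type Tag struct {
	Key   []byte
	Value []byte
}

// Tags must be kept sorted by key for Get to work.
type Tags []Tag

// Get returns the value for key, or nil if the key is absent.
// Returning nil instead of a (value, ok) pair is what enables the
// chained `if v := s.Tags.Get(k); v != nil` style in the handler.
func (a Tags) Get(key []byte) []byte {
	i := sort.Search(len(a), func(i int) bool {
		return bytes.Compare(a[i].Key, key) >= 0
	})
	if i < len(a) && bytes.Equal(a[i].Key, key) {
		return a[i].Value
	}
	return nil
}

func main() {
	tags := Tags{
		{Key: []byte("bind"), Value: []byte(":8086")},
		{Key: []byte("proto"), Value: []byte("tcp")},
	}
	fmt.Println(string(tags.Get([]byte("proto"))), tags.Get([]byte("path")) == nil)
}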

View File

@ -114,7 +114,7 @@ func (w *csvResponseWriter) WriteResponse(resp Response) (n int, err error) {
for _, row := range result.Series { for _, row := range result.Series {
w.columns[0] = row.Name w.columns[0] = row.Name
if len(row.Tags) > 0 { if len(row.Tags) > 0 {
w.columns[1] = string(models.Tags(row.Tags).HashKey()[1:]) w.columns[1] = string(models.NewTags(row.Tags).HashKey()[1:])
} else { } else {
w.columns[1] = "" w.columns[1] = ""
} }

View File

@ -192,7 +192,7 @@ func (s *Service) Addr() net.Addr {
// Statistics returns statistics for periodic monitoring. // Statistics returns statistics for periodic monitoring.
func (s *Service) Statistics(tags map[string]string) []models.Statistic { func (s *Service) Statistics(tags map[string]string) []models.Statistic {
return s.Handler.Statistics(models.Tags{"bind": s.addr}.Merge(tags)) return s.Handler.Statistics(models.NewTags(map[string]string{"bind": s.addr}).Merge(tags).Map())
} }
// serveTCP serves the handler from the TCP listener. // serveTCP serves the handler from the TCP listener.

View File

@ -111,7 +111,7 @@ func (h *Handler) servePut(w http.ResponseWriter, r *http.Request) {
ts = time.Unix(p.Time/1000, (p.Time%1000)*1000) ts = time.Unix(p.Time/1000, (p.Time%1000)*1000)
} }
pt, err := models.NewPoint(p.Metric, p.Tags, map[string]interface{}{"value": p.Value}, ts) pt, err := models.NewPoint(p.Metric, models.NewTags(p.Tags), map[string]interface{}{"value": p.Value}, ts)
if err != nil { if err != nil {
h.Logger.Printf("Dropping point %v: %v", p.Metric, err) h.Logger.Printf("Dropping point %v: %v", p.Metric, err)
if h.stats != nil { if h.stats != nil {

View File

@ -96,7 +96,7 @@ func NewService(c Config) (*Service, error) {
Logger: log.New(os.Stderr, "[opentsdb] ", log.LstdFlags), Logger: log.New(os.Stderr, "[opentsdb] ", log.LstdFlags),
LogPointErrors: d.LogPointErrors, LogPointErrors: d.LogPointErrors,
stats: &Statistics{}, stats: &Statistics{},
statTags: map[string]string{"bind": d.BindAddress}, statTags: models.NewTags(map[string]string{"bind": d.BindAddress}),
} }
return s, nil return s, nil
} }
@ -381,7 +381,7 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
} }
fields["value"] = fv fields["value"] = fv
pt, err := models.NewPoint(measurement, tags, fields, t) pt, err := models.NewPoint(measurement, models.NewTags(tags), fields, t)
if err != nil { if err != nil {
atomic.AddInt64(&s.stats.TelnetBadFloat, 1) atomic.AddInt64(&s.stats.TelnetBadFloat, 1)
if s.LogPointErrors { if s.LogPointErrors {

View File

@ -39,7 +39,7 @@ func TestService_Telnet(t *testing.T) {
} else if !reflect.DeepEqual(points, []models.Point{ } else if !reflect.DeepEqual(points, []models.Point{
models.MustNewPoint( models.MustNewPoint(
"sys.cpu.user", "sys.cpu.user",
map[string]string{"host": "webserver01", "cpu": "0"}, models.NewTags(map[string]string{"host": "webserver01", "cpu": "0"}),
map[string]interface{}{"value": 42.5}, map[string]interface{}{"value": 42.5},
time.Unix(1356998400, 0), time.Unix(1356998400, 0),
), ),
@ -101,7 +101,7 @@ func TestService_HTTP(t *testing.T) {
} else if !reflect.DeepEqual(points, []models.Point{ } else if !reflect.DeepEqual(points, []models.Point{
models.MustNewPoint( models.MustNewPoint(
"sys.cpu.nice", "sys.cpu.nice",
map[string]string{"dc": "lga", "host": "web01"}, models.NewTags(map[string]string{"dc": "lga", "host": "web01"}),
map[string]interface{}{"value": 18.0}, map[string]interface{}{"value": 18.0},
time.Unix(1346846400, 0), time.Unix(1346846400, 0),
), ),

View File

@ -130,7 +130,7 @@ type Statistics struct {
func (s *Service) Statistics(tags map[string]string) []models.Statistic { func (s *Service) Statistics(tags map[string]string) []models.Statistic {
statistics := []models.Statistic{{ statistics := []models.Statistic{{
Name: "subscriber", Name: "subscriber",
Tags: tags, Tags: models.NewTags(tags),
Values: map[string]interface{}{ Values: map[string]interface{}{
statPointsWritten: atomic.LoadInt64(&s.stats.PointsWritten), statPointsWritten: atomic.LoadInt64(&s.stats.PointsWritten),
statWriteFailures: atomic.LoadInt64(&s.stats.WriteFailures), statWriteFailures: atomic.LoadInt64(&s.stats.WriteFailures),
@ -415,13 +415,13 @@ func (b *balancewriter) WritePoints(p *coordinator.WritePointsRequest) error {
// Statistics returns statistics for periodic monitoring. // Statistics returns statistics for periodic monitoring.
func (b *balancewriter) Statistics(tags map[string]string) []models.Statistic { func (b *balancewriter) Statistics(tags map[string]string) []models.Statistic {
tags = models.Tags(tags).Merge(b.tags) tags = models.NewTags(tags).Merge(b.tags).Map()
statistics := make([]models.Statistic, len(b.stats)) statistics := make([]models.Statistic, len(b.stats))
for i := range b.stats { for i := range b.stats {
statistics[i] = models.Statistic{ statistics[i] = models.Statistic{
Name: "subscriber", Name: "subscriber",
Tags: models.Tags(tags).Merge(map[string]string{"destination": b.stats[i].dest}), Tags: models.NewTags(tags).Merge(map[string]string{"destination": b.stats[i].dest}),
Values: map[string]interface{}{ Values: map[string]interface{}{
statPointsWritten: atomic.LoadInt64(&b.stats[i].pointsWritten), statPointsWritten: atomic.LoadInt64(&b.stats[i].pointsWritten),
statWriteFailures: atomic.LoadInt64(&b.stats[i].failures), statWriteFailures: atomic.LoadInt64(&b.stats[i].failures),
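
The balancewriter round-trips through models.NewTags(tags).Merge(b.tags).Map(): lift the caller's map into Tags, overlay the writer's defaults, then flatten back to a map for the Statistic. Assuming Merge keeps the override-on-collision behaviour of the old map-based Tags, the net effect is just this plain-map sketch (mergeTags is a stand-in name):

package main

import "fmt"

// mergeTags mirrors the NewTags(tags).Merge(other).Map() round trip
// as a plain-map operation: lift, overlay, flatten. Values from
// other win on key collisions.
func mergeTags(base, other map[string]string) map[string]string {
	out := make(map[string]string, len(base)+len(other))
	for k, v := range base {
		out[k] = v
	}
	for k, v := range other {
		out[k] = v
	}
	return out
}

func main() {
	fmt.Println(mergeTags(
		map[string]string{"bind": ":8089"},
		map[string]string{"destination": "db0"},
	))
}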

View File

@ -71,7 +71,7 @@ func NewService(c Config) *Service {
batcher: tsdb.NewPointBatcher(d.BatchSize, d.BatchPending, time.Duration(d.BatchTimeout)), batcher: tsdb.NewPointBatcher(d.BatchSize, d.BatchPending, time.Duration(d.BatchTimeout)),
Logger: log.New(os.Stderr, "[udp] ", log.LstdFlags), Logger: log.New(os.Stderr, "[udp] ", log.LstdFlags),
stats: &Statistics{}, stats: &Statistics{},
statTags: map[string]string{"bind": d.BindAddress}, statTags: models.NewTags(map[string]string{"bind": d.BindAddress}),
} }
} }

View File

@ -12,8 +12,8 @@ func TestCommunePoint(t *testing.T) {
if point.Name() != "write" { if point.Name() != "write" {
t.Errorf("expected: write\ngot: %v", point.Name()) t.Errorf("expected: write\ngot: %v", point.Name())
} }
if point.Tags()["tag"] != "tagVal" { if point.Tags().GetString("tag") != "tagVal" {
t.Errorf("expected: tagVal\ngot: %v", point.Tags()["tag"]) t.Errorf("expected: tagVal\ngot: %v", point.Tags().GetString("tag"))
} }
if int(point.Fields()["fooField"].(float64)) != 5 { if int(point.Fields()["fooField"].(float64)) != 5 {
t.Errorf("expected: 5\ngot: %v\n", point.Fields()["fooField"]) t.Errorf("expected: 5\ngot: %v\n", point.Fields()["fooField"])
@ -24,8 +24,8 @@ func TestCommunePoint(t *testing.T) {
if point.Name() != "write" { if point.Name() != "write" {
t.Errorf("expected: write\ngot: %v", point.Name()) t.Errorf("expected: write\ngot: %v", point.Name())
} }
if point.Tags()["tag"] != "tagVal" { if point.Tags().GetString("tag") != "tagVal" {
t.Errorf("expected: tagVal\ngot: %v", point.Tags()["tag"]) t.Errorf("expected: tagVal\ngot: %v", point.Tags().GetString("tag"))
} }
if int(point.Fields()["fooField"].(float64)) != 5 { if int(point.Fields()["fooField"].(float64)) != 5 {
t.Errorf("expected: 5\ngot: %v\n", point.Fields()["fooField"]) t.Errorf("expected: 5\ngot: %v\n", point.Fields()["fooField"])
@ -40,8 +40,8 @@ func TestSetCommune(t *testing.T) {
if pt.Name() != "write" { if pt.Name() != "write" {
t.Errorf("expected: write\ngot: %v", pt.Name()) t.Errorf("expected: write\ngot: %v", pt.Name())
} }
if pt.Tags()["tag"] != "tagVal" { if pt.Tags().GetString("tag") != "tagVal" {
t.Errorf("expected: tagVal\ngot: %v", pt.Tags()["tag"]) t.Errorf("expected: tagVal\ngot: %v", pt.Tags().GetString("tag"))
} }
if int(pt.Fields()["fooField"].(float64)) != 5 { if int(pt.Fields()["fooField"].(float64)) != 5 {
t.Errorf("expected: 5\ngot: %v\n", pt.Fields()["fooField"]) t.Errorf("expected: 5\ngot: %v\n", pt.Fields()["fooField"])

View File

@ -171,7 +171,7 @@ type CacheStatistics struct {
func (c *Cache) Statistics(tags map[string]string) []models.Statistic { func (c *Cache) Statistics(tags map[string]string) []models.Statistic {
return []models.Statistic{{ return []models.Statistic{{
Name: "tsm1_cache", Name: "tsm1_cache",
Tags: tags, Tags: models.NewTags(tags),
Values: map[string]interface{}{ Values: map[string]interface{}{
statCacheMemoryBytes: atomic.LoadInt64(&c.stats.MemSizeBytes), statCacheMemoryBytes: atomic.LoadInt64(&c.stats.MemSizeBytes),
statCacheDiskBytes: atomic.LoadInt64(&c.stats.DiskSizeBytes), statCacheDiskBytes: atomic.LoadInt64(&c.stats.DiskSizeBytes),

View File

@ -2,6 +2,7 @@ package tsm1 // import "github.com/influxdata/influxdb/tsdb/engine/tsm1"
import ( import (
"archive/tar" "archive/tar"
"bytes"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
@ -248,7 +249,7 @@ func (e *Engine) Statistics(tags map[string]string) []models.Statistic {
statistics := make([]models.Statistic, 0, 4) statistics := make([]models.Statistic, 0, 4)
statistics = append(statistics, models.Statistic{ statistics = append(statistics, models.Statistic{
Name: "tsm1_engine", Name: "tsm1_engine",
Tags: tags, Tags: models.NewTags(tags),
Values: map[string]interface{}{ Values: map[string]interface{}{
statCacheCompactions: atomic.LoadInt64(&e.stats.CacheCompactions), statCacheCompactions: atomic.LoadInt64(&e.stats.CacheCompactions),
statCacheCompactionDuration: atomic.LoadInt64(&e.stats.CacheCompactionDuration), statCacheCompactionDuration: atomic.LoadInt64(&e.stats.CacheCompactionDuration),
@ -338,8 +339,9 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index *tsdb.DatabaseIndex) er
// Save reference to index for iterator creation. // Save reference to index for iterator creation.
e.index = index e.index = index
e.FileStore.dereferencer = index
if err := e.FileStore.WalkKeys(func(key string, typ byte) error { if err := e.FileStore.WalkKeys(func(key []byte, typ byte) error {
fieldType, err := tsmFieldTypeToInfluxQLDataType(typ) fieldType, err := tsmFieldTypeToInfluxQLDataType(typ)
if err != nil { if err != nil {
return err return err
@ -365,7 +367,7 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index *tsdb.DatabaseIndex) er
continue continue
} }
if err := e.addToIndexFromKey(shardID, key, fieldType, index); err != nil { if err := e.addToIndexFromKey(shardID, []byte(key), fieldType, index); err != nil {
return err return err
} }
} }
@ -522,9 +524,9 @@ func (e *Engine) readFileFromBackup(tr *tar.Reader, shardRelativePath string) er
// addToIndexFromKey will pull the measurement name, series key, and field name from a composite key and add it to the // addToIndexFromKey will pull the measurement name, series key, and field name from a composite key and add it to the
// database index and measurement fields // database index and measurement fields
func (e *Engine) addToIndexFromKey(shardID uint64, key string, fieldType influxql.DataType, index *tsdb.DatabaseIndex) error { func (e *Engine) addToIndexFromKey(shardID uint64, key []byte, fieldType influxql.DataType, index *tsdb.DatabaseIndex) error {
seriesKey, field := SeriesAndFieldFromCompositeKey(key) seriesKey, field := SeriesAndFieldFromCompositeKey(key)
measurement := tsdb.MeasurementFromSeriesKey(seriesKey) measurement := tsdb.MeasurementFromSeriesKey(string(seriesKey))
m := index.CreateMeasurementIndexIfNotExists(measurement) m := index.CreateMeasurementIndexIfNotExists(measurement)
m.SetFieldName(field) m.SetFieldName(field)
@ -540,7 +542,7 @@ func (e *Engine) addToIndexFromKey(shardID uint64, key string, fieldType influxq
} }
// Have we already indexed this series? // Have we already indexed this series?
ss := index.Series(seriesKey) ss := index.SeriesBytes(seriesKey)
if ss != nil { if ss != nil {
// Add this shard to the existing series // Add this shard to the existing series
ss.AssignShard(shardID) ss.AssignShard(shardID)
@ -551,7 +553,7 @@ func (e *Engine) addToIndexFromKey(shardID uint64, key string, fieldType influxq
// fields (in line protocol format) in the series key // fields (in line protocol format) in the series key
_, tags, _ := models.ParseKey(seriesKey) _, tags, _ := models.ParseKey(seriesKey)
s := tsdb.NewSeries(seriesKey, tags) s := tsdb.NewSeries(string(seriesKey), tags)
index.CreateSeriesIndexIfNotExists(measurement, s) index.CreateSeriesIndexIfNotExists(measurement, s)
s.AssignShard(shardID) s.AssignShard(shardID)
@ -593,14 +595,14 @@ func (e *Engine) ContainsSeries(keys []string) (map[string]bool, error) {
} }
for _, k := range e.Cache.Keys() { for _, k := range e.Cache.Keys() {
seriesKey, _ := SeriesAndFieldFromCompositeKey(k) seriesKey, _ := SeriesAndFieldFromCompositeKey([]byte(k))
keyMap[seriesKey] = true keyMap[string(seriesKey)] = true
} }
if err := e.FileStore.WalkKeys(func(k string, _ byte) error { if err := e.FileStore.WalkKeys(func(k []byte, _ byte) error {
seriesKey, _ := SeriesAndFieldFromCompositeKey(k) seriesKey, _ := SeriesAndFieldFromCompositeKey(k)
if _, ok := keyMap[seriesKey]; ok { if _, ok := keyMap[string(seriesKey)]; ok {
keyMap[seriesKey] = true keyMap[string(seriesKey)] = true
} }
return nil return nil
}); err != nil { }); err != nil {
@ -638,14 +640,14 @@ func (e *Engine) DeleteSeriesRange(seriesKeys []string, min, max int64) error {
deleteKeys := make([]string, 0, len(seriesKeys)) deleteKeys := make([]string, 0, len(seriesKeys))
// go through the keys in the file store // go through the keys in the file store
if err := e.FileStore.WalkKeys(func(k string, _ byte) error { if err := e.FileStore.WalkKeys(func(k []byte, _ byte) error {
seriesKey, _ := SeriesAndFieldFromCompositeKey(k) seriesKey, _ := SeriesAndFieldFromCompositeKey(k)
// Keep track of whether we've added this key, since WalkKeys can return keys // Keep track of whether we've added this key, since WalkKeys can return keys
// we've seen before // we've seen before
if v, ok := keyMap[seriesKey]; ok && v == 0 { if v, ok := keyMap[string(seriesKey)]; ok && v == 0 {
deleteKeys = append(deleteKeys, k) deleteKeys = append(deleteKeys, string(k))
keyMap[seriesKey] += 1 keyMap[string(seriesKey)] += 1
} }
return nil return nil
}); err != nil { }); err != nil {
@ -665,8 +667,8 @@ func (e *Engine) DeleteSeriesRange(seriesKeys []string, min, max int64) error {
e.Cache.RLock() e.Cache.RLock()
s := e.Cache.Store() s := e.Cache.Store()
for k, _ := range s { for k, _ := range s {
seriesKey, _ := SeriesAndFieldFromCompositeKey(k) seriesKey, _ := SeriesAndFieldFromCompositeKey([]byte(k))
if _, ok := keyMap[seriesKey]; ok { if _, ok := keyMap[string(seriesKey)]; ok {
walKeys = append(walKeys, k) walKeys = append(walKeys, k)
} }
} }
@ -1277,7 +1279,7 @@ func (e *Engine) createTagSetGroupIterators(ref *influxql.VarRef, mm *tsdb.Measu
// createVarRefSeriesIterator creates an iterator for a variable reference for a series. // createVarRefSeriesIterator creates an iterator for a variable reference for a series.
func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, mm *tsdb.Measurement, seriesKey string, t *influxql.TagSet, filter influxql.Expr, conditionFields []influxql.VarRef, opt influxql.IteratorOptions) (influxql.Iterator, error) { func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, mm *tsdb.Measurement, seriesKey string, t *influxql.TagSet, filter influxql.Expr, conditionFields []influxql.VarRef, opt influxql.IteratorOptions) (influxql.Iterator, error) {
tags := influxql.NewTags(e.index.TagsForSeries(seriesKey)) tags := influxql.NewTags(e.index.TagsForSeries(seriesKey).Map())
// Create options specific for this series. // Create options specific for this series.
itrOpt := opt itrOpt := opt
@ -1496,11 +1498,11 @@ func tsmFieldTypeToInfluxQLDataType(typ byte) (influxql.DataType, error) {
} }
} }
func SeriesAndFieldFromCompositeKey(key string) (string, string) { func SeriesAndFieldFromCompositeKey(key []byte) ([]byte, string) {
sep := strings.Index(key, keyFieldSeparator) sep := bytes.Index(key, []byte(keyFieldSeparator))
if sep == -1 { if sep == -1 {
// No field??? // No field???
return key, "" return key, ""
} }
return key[:sep], key[sep+len(keyFieldSeparator):] return key[:sep], string(key[sep+len(keyFieldSeparator):])
} }
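
SeriesAndFieldFromCompositeKey now takes and returns byte slices, so the series-key half is a sub-slice of the input rather than a freshly allocated string; only the field name is materialized. A small, runnable demonstration of the split, reusing the "#!~#" separator visible elsewhere in this diff (seriesAndField is a stand-in name):

package main

import (
	"bytes"
	"fmt"
)

const keyFieldSeparator = "#!~#"

// seriesAndField splits a composite TSM key into its series-key and
// field halves. The returned series key aliases the input slice, so
// nothing is allocated unless the caller converts it to a string.
func seriesAndField(key []byte) ([]byte, string) {
	sep := bytes.Index(key, []byte(keyFieldSeparator))
	if sep == -1 {
		return key, ""
	}
	return key[:sep], string(key[sep+len(keyFieldSeparator):])
}

func main() {
	series, field := seriesAndField([]byte("cpu,host=A#!~#value"))
	fmt.Printf("%s %s\n", series, field)
}

Callers that need a map key, like the keyMap bookkeeping above, convert with string(seriesKey) at the last moment.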

View File

@ -44,7 +44,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) {
// Verify index is correct. // Verify index is correct.
if m := index.Measurement("cpu"); m == nil { if m := index.Measurement("cpu"); m == nil {
t.Fatal("measurement not found") t.Fatal("measurement not found")
} else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, map[string]string{"host": "A"}) { } else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, models.NewTags(map[string]string{"host": "A"})) {
t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags) t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags)
} }
@ -67,7 +67,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) {
// Verify index is correct. // Verify index is correct.
if m := index.Measurement("cpu"); m == nil { if m := index.Measurement("cpu"); m == nil {
t.Fatal("measurement not found") t.Fatal("measurement not found")
} else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, map[string]string{"host": "A"}) { } else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, models.NewTags(map[string]string{"host": "A"})) {
t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags) t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags)
} }
@ -92,9 +92,9 @@ func TestEngine_LoadMetadataIndex(t *testing.T) {
// Verify index is correct. // Verify index is correct.
if m := index.Measurement("cpu"); m == nil { if m := index.Measurement("cpu"); m == nil {
t.Fatal("measurement not found") t.Fatal("measurement not found")
} else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, map[string]string{"host": "A"}) { } else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, models.NewTags(map[string]string{"host": "A"})) {
t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags) t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags)
} else if s := m.SeriesByID(2); s.Key != "cpu,host=B" || !reflect.DeepEqual(s.Tags, map[string]string{"host": "B"}) { } else if s := m.SeriesByID(2); s.Key != "cpu,host=B" || !reflect.DeepEqual(s.Tags, models.NewTags(map[string]string{"host": "B"})) {
t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags) t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags)
} }
} }
@ -222,7 +222,7 @@ func TestEngine_CreateIterator_Cache_Ascending(t *testing.T) {
e.Index().CreateMeasurementIndexIfNotExists("cpu") e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false) e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", map[string]string{"host": "A"})) e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
if err := e.WritePointsString( if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`, `cpu,host=A value=1.1 1000000000`,
`cpu,host=A value=1.2 2000000000`, `cpu,host=A value=1.2 2000000000`,
@ -275,7 +275,7 @@ func TestEngine_CreateIterator_Cache_Descending(t *testing.T) {
e.Index().CreateMeasurementIndexIfNotExists("cpu") e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false) e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", map[string]string{"host": "A"})) e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
if err := e.WritePointsString( if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`, `cpu,host=A value=1.1 1000000000`,
`cpu,host=A value=1.2 2000000000`, `cpu,host=A value=1.2 2000000000`,
@ -328,7 +328,7 @@ func TestEngine_CreateIterator_TSM_Ascending(t *testing.T) {
e.Index().CreateMeasurementIndexIfNotExists("cpu") e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false) e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", map[string]string{"host": "A"})) e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
if err := e.WritePointsString( if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`, `cpu,host=A value=1.1 1000000000`,
`cpu,host=A value=1.2 2000000000`, `cpu,host=A value=1.2 2000000000`,
@ -382,7 +382,7 @@ func TestEngine_CreateIterator_TSM_Descending(t *testing.T) {
e.Index().CreateMeasurementIndexIfNotExists("cpu") e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false) e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", map[string]string{"host": "A"})) e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
if err := e.WritePointsString( if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`, `cpu,host=A value=1.1 1000000000`,
`cpu,host=A value=1.2 2000000000`, `cpu,host=A value=1.2 2000000000`,
@ -437,7 +437,7 @@ func TestEngine_CreateIterator_Aux(t *testing.T) {
e.Index().CreateMeasurementIndexIfNotExists("cpu") e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false) e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.MeasurementFields("cpu").CreateFieldIfNotExists("F", influxql.Float, false) e.MeasurementFields("cpu").CreateFieldIfNotExists("F", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", map[string]string{"host": "A"})) e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
if err := e.WritePointsString( if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`, `cpu,host=A value=1.1 1000000000`,
`cpu,host=A F=100 1000000000`, `cpu,host=A F=100 1000000000`,
@ -497,7 +497,7 @@ func TestEngine_CreateIterator_Condition(t *testing.T) {
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false) e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.MeasurementFields("cpu").CreateFieldIfNotExists("X", influxql.Float, false) e.MeasurementFields("cpu").CreateFieldIfNotExists("X", influxql.Float, false)
e.MeasurementFields("cpu").CreateFieldIfNotExists("Y", influxql.Float, false) e.MeasurementFields("cpu").CreateFieldIfNotExists("Y", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", map[string]string{"host": "A"})) e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
if err := e.WritePointsString( if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`, `cpu,host=A value=1.1 1000000000`,
`cpu,host=A X=10 1000000000`, `cpu,host=A X=10 1000000000`,
@ -663,7 +663,7 @@ func MustInitBenchmarkEngine(pointN int) *Engine {
// Initialize metadata. // Initialize metadata.
e.Index().CreateMeasurementIndexIfNotExists("cpu") e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false) e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", map[string]string{"host": "A"})) e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
// Generate time ascending points with jitterred time & value. // Generate time ascending points with jitterred time & value.
rand := rand.New(rand.NewSource(0)) rand := rand.New(rand.NewSource(0))

View File

@ -58,7 +58,7 @@ type TSMFile interface {
KeyCount() int KeyCount() int
// KeyAt returns the key located at index position idx // KeyAt returns the key located at index position idx
KeyAt(idx int) (string, byte) KeyAt(idx int) ([]byte, byte)
// Type returns the block type of the values stored for the key. Returns one of // Type returns the block type of the values stored for the key. Returns one of
// BlockFloat64, BlockInt64, BlockBoolean, BlockString. If key does not exist, // BlockFloat64, BlockInt64, BlockBoolean, BlockString. If key does not exist,
@ -106,6 +106,13 @@ type TSMFile interface {
// BlockIterator returns an iterator pointing to the first block in the file and // BlockIterator returns an iterator pointing to the first block in the file and
// allows sequential iteration to every block. // allows sequential iteration to every block.
BlockIterator() *BlockIterator BlockIterator() *BlockIterator
// Removes mmap references held by another object.
deref(dereferencer)
}
type dereferencer interface {
Dereference([]byte)
} }
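
The unexported dereferencer interface is what lets a TSMFile hand mmap-backed bytes back to whichever index still references them, without the file store knowing about index types. A common way to pin down the intended implementation is a compile-time assertion; the following is a sketch of that idiom with stand-in types (index substitutes for tsdb.DatabaseIndex), not a line from this commit:

package main

// dereferencer mirrors the interface in the hunk above.
type dereferencer interface {
	Dereference([]byte)
}

// index is a stand-in for the database index.
type index struct{}

func (ix *index) Dereference(b []byte) {}

// Compile-time check: the build breaks here if index ever stops
// satisfying dereferencer, instead of at a distant call site.
var _ dereferencer = (*index)(nil)

func main() {}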
// Statistics gathered by the FileStore. // Statistics gathered by the FileStore.
@ -132,6 +139,8 @@ type FileStore struct {
purger *purger purger *purger
currentTempDirID int currentTempDirID int
dereferencer dereferencer
} }
type FileStat struct { type FileStat struct {
@ -157,7 +166,7 @@ func (f FileStat) ContainsKey(key string) bool {
func NewFileStore(dir string) *FileStore { func NewFileStore(dir string) *FileStore {
logger := log.New(os.Stderr, "[filestore] ", log.LstdFlags) logger := log.New(os.Stderr, "[filestore] ", log.LstdFlags)
return &FileStore{ fs := &FileStore{
dir: dir, dir: dir,
lastModified: time.Now(), lastModified: time.Now(),
logger: logger, logger: logger,
@ -169,6 +178,8 @@ func NewFileStore(dir string) *FileStore {
logger: logger, logger: logger,
}, },
} }
fs.purger.fileStore = fs
return fs
} }
// enableTraceLogging must be called before the FileStore is opened. // enableTraceLogging must be called before the FileStore is opened.
@ -204,7 +215,7 @@ type FileStoreStatistics struct {
func (f *FileStore) Statistics(tags map[string]string) []models.Statistic { func (f *FileStore) Statistics(tags map[string]string) []models.Statistic {
return []models.Statistic{{ return []models.Statistic{{
Name: "tsm1_filestore", Name: "tsm1_filestore",
Tags: tags, Tags: models.NewTags(tags),
Values: map[string]interface{}{ Values: map[string]interface{}{
statFileStoreBytes: atomic.LoadInt64(&f.stats.DiskBytes), statFileStoreBytes: atomic.LoadInt64(&f.stats.DiskBytes),
statFileStoreCount: atomic.LoadInt64(&f.stats.FileCount), statFileStoreCount: atomic.LoadInt64(&f.stats.FileCount),
@ -281,7 +292,7 @@ func (f *FileStore) Remove(paths ...string) {
// WalkKeys calls fn for every key in every TSM file known to the FileStore. If the key // WalkKeys calls fn for every key in every TSM file known to the FileStore. If the key
// exists in multiple files, it will be invoked for each file. // exists in multiple files, it will be invoked for each file.
func (f *FileStore) WalkKeys(fn func(key string, typ byte) error) error { func (f *FileStore) WalkKeys(fn func(key []byte, typ byte) error) error {
f.mu.RLock() f.mu.RLock()
defer f.mu.RUnlock() defer f.mu.RUnlock()
@ -305,7 +316,7 @@ func (f *FileStore) Keys() map[string]byte {
for _, f := range f.files { for _, f := range f.files {
for i := 0; i < f.KeyCount(); i++ { for i := 0; i < f.KeyCount(); i++ {
key, typ := f.KeyAt(i) key, typ := f.KeyAt(i)
uniqueKeys[key] = typ uniqueKeys[string(key)] = typ
} }
} }
@ -556,6 +567,11 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error {
continue continue
} }
// Remove any mmap references held by the index.
if f.dereferencer != nil {
file.deref(f.dereferencer)
}
if err := file.Close(); err != nil { if err := file.Close(); err != nil {
return err return err
} }
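
Ordering matters in Replace and in purge below: the index must copy out any bytes it still references before the file is closed and its mapping released, or those slices would point at unmapped memory. A runnable miniature of the sequence, with index standing in for the database index and a plain byte slice standing in for the mmap; unlike the real deref, this version copies unconditionally, which is safe if over-eager:

package main

import "fmt"

// index holds slices that may alias a file's mapped buffer.
type index struct{ vals [][]byte }

// Dereference copies every held value onto the heap.
func (ix *index) Dereference(buf []byte) {
	for i, v := range ix.vals {
		cp := make([]byte, len(v))
		copy(cp, v)
		ix.vals[i] = cp
	}
	_ = buf
}

func main() {
	file := []byte("cpu,host=A") // stands in for an mmapped file
	ix := &index{vals: [][]byte{file[:3]}}

	// Close sequence: dereference first, then release the mapping.
	ix.Dereference(file)
	for i := range file {
		file[i] = 0 // simulate the unmap; aliases would now be garbage
	}

	fmt.Println(string(ix.vals[0])) // still "cpu"
}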
@ -1088,9 +1104,10 @@ func (c *KeyCursor) filterBooleanValues(tombstones []TimeRange, values BooleanVa
} }
type purger struct { type purger struct {
mu sync.RWMutex mu sync.RWMutex
files map[string]TSMFile fileStore *FileStore
running bool files map[string]TSMFile
running bool
logger *log.Logger logger *log.Logger
} }
@ -1118,6 +1135,11 @@ func (p *purger) purge() {
p.mu.Lock() p.mu.Lock()
for k, v := range p.files { for k, v := range p.files {
if !v.InUse() { if !v.InUse() {
// Remove any mmap references held by the index.
if p.fileStore.dereferencer != nil {
v.deref(p.fileStore.dereferencer)
}
if err := v.Close(); err != nil { if err := v.Close(); err != nil {
p.logger.Printf("purge: close file: %v", err) p.logger.Printf("purge: close file: %v", err)
continue continue

View File

@ -68,7 +68,7 @@ type TSMIndex interface {
Key(index int) (string, []IndexEntry) Key(index int) (string, []IndexEntry)
// KeyAt returns the key in the index at the given position. // KeyAt returns the key in the index at the given position.
KeyAt(index int) (string, byte) KeyAt(index int) ([]byte, byte)
// KeyCount returns the count of unique keys in the index. // KeyCount returns the count of unique keys in the index.
KeyCount() int KeyCount() int
@ -116,7 +116,7 @@ func (b *BlockIterator) PeekNext() string {
return b.key return b.key
} else if b.n-b.i > 1 { } else if b.n-b.i > 1 {
key, _ := b.r.KeyAt(b.i + 1) key, _ := b.r.KeyAt(b.i + 1)
return key return string(key)
} }
return "" return ""
} }
@ -243,7 +243,7 @@ func (t *TSMReader) Key(index int) (string, []IndexEntry) {
} }
// KeyAt returns the key and key type at position idx in the index. // KeyAt returns the key and key type at position idx in the index.
func (t *TSMReader) KeyAt(idx int) (string, byte) { func (t *TSMReader) KeyAt(idx int) ([]byte, byte) {
return t.index.KeyAt(idx) return t.index.KeyAt(idx)
} }
@ -489,6 +489,13 @@ func (t *TSMReader) BlockIterator() *BlockIterator {
} }
} }
// deref removes mmap references held by another object.
func (t *TSMReader) deref(d dereferencer) {
if acc, ok := t.accessor.(*mmapAccessor); ok {
d.Dereference(acc.b)
}
}
// indirectIndex is a TSMIndex that uses a raw byte slice representation of an index. This // indirectIndex is a TSMIndex that uses a raw byte slice representation of an index. This
// implementation can be used for indexes that may be MMAPed into memory. // implementation can be used for indexes that may be MMAPed into memory.
type indirectIndex struct { type indirectIndex struct {
@ -667,15 +674,15 @@ func (d *indirectIndex) Key(idx int) (string, []IndexEntry) {
return string(key), entries.entries return string(key), entries.entries
} }
func (d *indirectIndex) KeyAt(idx int) (string, byte) { func (d *indirectIndex) KeyAt(idx int) ([]byte, byte) {
d.mu.RLock() d.mu.RLock()
defer d.mu.RUnlock() defer d.mu.RUnlock()
if idx < 0 || idx >= len(d.offsets) { if idx < 0 || idx >= len(d.offsets) {
return "", 0 return nil, 0
} }
n, key, _ := readKey(d.b[d.offsets[idx]:]) n, key, _ := readKey(d.b[d.offsets[idx]:])
return string(key), d.b[d.offsets[idx]+int32(n)] return key, d.b[d.offsets[idx]+int32(n)]
} }
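
indirectIndex.KeyAt now returns the raw key bytes straight out of the mapped index block d.b instead of allocating a string per call. The contract changes with it: the returned slice is valid only while the file stays mapped, so callers that retain a key must copy it, which is what the string(key) conversions at the call sites above do. A runnable miniature of that caller-side rule (keyAt is a stand-in):

package main

import "fmt"

// keyAt simulates a zero-copy key lookup: the returned slice aliases
// the backing buffer, standing in for the mmapped index block.
func keyAt(block []byte, off, n int) []byte {
	return block[off : off+n]
}

func main() {
	block := []byte("cpu,host=A#!~#value")

	k := keyAt(block, 0, 10) // valid only while the block stays mapped
	retained := string(k)    // explicit copy: safe to keep afterwards

	block[0] = 'X' // the alias observes mutation, the copy does not
	fmt.Println(string(k), retained)
}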
func (d *indirectIndex) KeyCount() int { func (d *indirectIndex) KeyCount() int {

View File

@ -144,7 +144,7 @@ type WALStatistics struct {
func (l *WAL) Statistics(tags map[string]string) []models.Statistic { func (l *WAL) Statistics(tags map[string]string) []models.Statistic {
return []models.Statistic{{ return []models.Statistic{{
Name: "tsm1_wal", Name: "tsm1_wal",
Tags: tags, Tags: models.NewTags(tags),
Values: map[string]interface{}{ Values: map[string]interface{}{
statWALOldBytes: atomic.LoadInt64(&l.stats.OldBytes), statWALOldBytes: atomic.LoadInt64(&l.stats.OldBytes),
statWALCurrentBytes: atomic.LoadInt64(&l.stats.CurrentBytes), statWALCurrentBytes: atomic.LoadInt64(&l.stats.CurrentBytes),

View File

@ -6,6 +6,7 @@ import (
"sort" "sort"
"sync" "sync"
"sync/atomic" "sync/atomic"
"unsafe"
"github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/models"
@ -56,7 +57,7 @@ type IndexStatistics struct {
func (d *DatabaseIndex) Statistics(tags map[string]string) []models.Statistic { func (d *DatabaseIndex) Statistics(tags map[string]string) []models.Statistic {
return []models.Statistic{{ return []models.Statistic{{
Name: "database", Name: "database",
Tags: models.Tags(map[string]string{"database": d.name}).Merge(tags), Tags: models.NewTags(map[string]string{"database": d.name}).Merge(tags),
Values: map[string]interface{}{ Values: map[string]interface{}{
statDatabaseSeries: atomic.LoadInt64(&d.stats.NumSeries), statDatabaseSeries: atomic.LoadInt64(&d.stats.NumSeries),
statDatabaseMeasurements: atomic.LoadInt64(&d.stats.NumMeasurements), statDatabaseMeasurements: atomic.LoadInt64(&d.stats.NumMeasurements),
@ -72,6 +73,14 @@ func (d *DatabaseIndex) Series(key string) *Series {
return s return s
} }
// SeriesBytes returns a series by key.
func (d *DatabaseIndex) SeriesBytes(key []byte) *Series {
d.mu.RLock()
s := d.series[string(key)]
d.mu.RUnlock()
return s
}
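
SeriesBytes exists so byte-slice callers can look up a series without first building a string: the Go compiler recognizes the m[string(b)] form in a map read and skips the allocation. The optimization applies only to the conversion written directly in the index expression, which the snippet below makes measurable with testing.AllocsPerRun:

package main

import (
	"fmt"
	"testing"
)

func main() {
	m := map[string]int{"cpu,host=A": 1}
	key := []byte("cpu,host=A")

	direct := testing.AllocsPerRun(1000, func() {
		_ = m[string(key)] // conversion inside the index: no allocation
	})
	viaVar := testing.AllocsPerRun(1000, func() {
		s := string(key) // conversion stored in a variable: allocates
		_ = m[s]
	})
	fmt.Println(direct, viaVar) // typically prints: 0 1
}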
func (d *DatabaseIndex) SeriesKeys() []string { func (d *DatabaseIndex) SeriesKeys() []string {
d.mu.RLock() d.mu.RLock()
s := make([]string, 0, len(d.series)) s := make([]string, 0, len(d.series))
@ -249,7 +258,7 @@ func (d *DatabaseIndex) RemoveShard(shardID uint64) {
} }
// TagsForSeries returns the tag map for the passed in series // TagsForSeries returns the tag map for the passed in series
func (d *DatabaseIndex) TagsForSeries(key string) map[string]string { func (d *DatabaseIndex) TagsForSeries(key string) models.Tags {
d.mu.RLock() d.mu.RLock()
defer d.mu.RUnlock() defer d.mu.RUnlock()
@ -515,6 +524,16 @@ func (d *DatabaseIndex) DropSeries(keys []string) {
atomic.AddInt64(&d.stats.NumSeries, -nDeleted) atomic.AddInt64(&d.stats.NumSeries, -nDeleted)
} }
// Dereference removes all references to data within b and moves them to the heap.
func (d *DatabaseIndex) Dereference(b []byte) {
d.mu.RLock()
defer d.mu.RUnlock()
for _, s := range d.series {
s.Dereference(b)
}
}
// Measurement represents a collection of time series in a database. It also contains in memory // Measurement represents a collection of time series in a database. It also contains in memory
// structures for indexing tags. Exported functions are goroutine safe while un-exported functions // structures for indexing tags. Exported functions are goroutine safe while un-exported functions
// assume the caller will use the appropriate locks // assume the caller will use the appropriate locks
@ -647,13 +666,13 @@ func (m *Measurement) AddSeries(s *Series) bool {
} }
// add this series id to the tag index on the measurement // add this series id to the tag index on the measurement
for k, v := range s.Tags { for _, t := range s.Tags {
valueMap := m.seriesByTagKeyValue[k] valueMap := m.seriesByTagKeyValue[string(t.Key)]
if valueMap == nil { if valueMap == nil {
valueMap = make(map[string]SeriesIDs) valueMap = make(map[string]SeriesIDs)
m.seriesByTagKeyValue[k] = valueMap m.seriesByTagKeyValue[string(t.Key)] = valueMap
} }
ids := valueMap[v] ids := valueMap[string(t.Value)]
ids = append(ids, s.id) ids = append(ids, s.id)
// most of the time the series ID will be higher than all others because it's a new // most of the time the series ID will be higher than all others because it's a new
@ -661,7 +680,7 @@ func (m *Measurement) AddSeries(s *Series) bool {
if len(ids) > 1 && ids[len(ids)-1] < ids[len(ids)-2] { if len(ids) > 1 && ids[len(ids)-1] < ids[len(ids)-2] {
sort.Sort(ids) sort.Sort(ids)
} }
valueMap[v] = ids valueMap[string(t.Value)] = ids
} }
return true return true
@ -683,19 +702,19 @@ func (m *Measurement) DropSeries(series *Series) {
// remove this series id from the tag index on the measurement // remove this series id from the tag index on the measurement
// s.seriesByTagKeyValue is defined as map[string]map[string]SeriesIDs // s.seriesByTagKeyValue is defined as map[string]map[string]SeriesIDs
for k, v := range series.Tags { for _, t := range series.Tags {
values := m.seriesByTagKeyValue[k][v] values := m.seriesByTagKeyValue[string(t.Key)][string(t.Value)]
ids := filter(values, seriesID) ids := filter(values, seriesID)
// Check to see if we have any ids, if not, remove the key // Check to see if we have any ids, if not, remove the key
if len(ids) == 0 { if len(ids) == 0 {
delete(m.seriesByTagKeyValue[k], v) delete(m.seriesByTagKeyValue[string(t.Key)], string(t.Value))
} else { } else {
m.seriesByTagKeyValue[k][v] = ids m.seriesByTagKeyValue[string(t.Key)][string(t.Value)] = ids
} }
// If we have no values, then we delete the key // If we have no values, then we delete the key
if len(m.seriesByTagKeyValue[k]) == 0 { if len(m.seriesByTagKeyValue[string(t.Key)]) == 0 {
delete(m.seriesByTagKeyValue, k) delete(m.seriesByTagKeyValue, string(t.Key))
} }
} }
@ -758,7 +777,7 @@ func (m *Measurement) TagSets(dimensions []string, condition influxql.Expr) ([]*
// Build the TagSet for this series. // Build the TagSet for this series.
for _, dim := range dimensions { for _, dim := range dimensions {
tags[dim] = s.Tags[dim] tags[dim] = s.Tags.GetString(dim)
} }
// Convert the TagSet to a string, so it can be added to a map allowing TagSets to be handled // Convert the TagSet to a string, so it can be added to a map allowing TagSets to be handled
@ -1423,40 +1442,52 @@ func (a Measurements) union(other Measurements) Measurements {
type Series struct { type Series struct {
mu sync.RWMutex mu sync.RWMutex
Key string Key string
Tags map[string]string Tags models.Tags
id uint64 id uint64
measurement *Measurement measurement *Measurement
shardIDs map[uint64]bool // shards that have this series defined shardIDs []uint64 // shards that have this series defined
} }
// NewSeries returns an initialized series struct // NewSeries returns an initialized series struct
func NewSeries(key string, tags map[string]string) *Series { func NewSeries(key string, tags models.Tags) *Series {
return &Series{ return &Series{
Key: key, Key: key,
Tags: tags, Tags: tags,
shardIDs: make(map[uint64]bool),
} }
} }
func (s *Series) AssignShard(shardID uint64) { func (s *Series) AssignShard(shardID uint64) {
s.mu.Lock() s.mu.Lock()
s.shardIDs[shardID] = true if !s.assigned(shardID) {
s.shardIDs = append(s.shardIDs, shardID)
sort.Sort(uint64Slice(s.shardIDs))
}
s.mu.Unlock() s.mu.Unlock()
} }
func (s *Series) UnassignShard(shardID uint64) { func (s *Series) UnassignShard(shardID uint64) {
s.mu.Lock() s.mu.Lock()
delete(s.shardIDs, shardID) for i, v := range s.shardIDs {
if v == shardID {
s.shardIDs = append(s.shardIDs[:i], s.shardIDs[i+1:]...)
break
}
}
s.mu.Unlock() s.mu.Unlock()
} }
func (s *Series) Assigned(shardID uint64) bool { func (s *Series) Assigned(shardID uint64) bool {
s.mu.RLock() s.mu.RLock()
b := s.shardIDs[shardID] b := s.assigned(shardID)
s.mu.RUnlock() s.mu.RUnlock()
return b return b
} }
func (s *Series) assigned(shardID uint64) bool {
i := sort.Search(len(s.shardIDs), func(i int) bool { return s.shardIDs[i] >= shardID })
return i < len(s.shardIDs) && s.shardIDs[i] == shardID
}
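
shardIDs moves from a map[uint64]bool to a sorted []uint64: a short sorted slice is much smaller than a map, and sort.Search makes membership checks cheap. A self-contained sketch of the membership/insert pattern, reusing the uint64Slice helper this diff adds at the bottom of the file (contains is a stand-in for Series.assigned):

package main

import (
	"fmt"
	"sort"
)

type uint64Slice []uint64

func (a uint64Slice) Len() int           { return len(a) }
func (a uint64Slice) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a uint64Slice) Less(i, j int) bool { return a[i] < a[j] }

// contains reports whether id is present in the sorted slice a.
func contains(a []uint64, id uint64) bool {
	i := sort.Search(len(a), func(i int) bool { return a[i] >= id })
	return i < len(a) && a[i] == id
}

func main() {
	var ids []uint64
	for _, id := range []uint64{3, 1, 2, 2} {
		if !contains(ids, id) { // AssignShard's dedup check
			ids = append(ids, id)
			sort.Sort(uint64Slice(ids)) // keep sorted for the next search
		}
	}
	fmt.Println(ids) // [1 2 3]
}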
func (s *Series) ShardN() int { func (s *Series) ShardN() int {
s.mu.RLock() s.mu.RLock()
n := len(s.shardIDs) n := len(s.shardIDs)
@ -1464,6 +1495,36 @@ func (s *Series) ShardN() int {
return n return n
} }
// Dereference removes references to a byte slice.
func (s *Series) Dereference(b []byte) {
s.mu.Lock()
min := uintptr(unsafe.Pointer(&b[0]))
max := min + uintptr(len(b))
for _, t := range s.Tags {
deref(&t.Key, min, max)
deref(&t.Value, min, max)
}
s.mu.Unlock()
}
func deref(v *[]byte, min, max uintptr) {
vv := *v
// Ignore if value is not within range.
ptr := uintptr(unsafe.Pointer(&vv[0]))
if ptr < min || ptr > max {
return
}
// Otherwise copy to the heap.
buf := make([]byte, len(vv))
copy(buf, vv)
*v = buf
}
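
deref is the heart of the commit: a tag's bytes are copied to the heap only if their backing array lies inside the address range of the region being released; values already on the heap are left untouched. A runnable miniature of the same pointer-range test (derefRange is a stand-in; it adds an empty-slice guard but otherwise mirrors the helper above):

package main

import (
	"fmt"
	"unsafe"
)

// derefRange copies *v to the heap iff its backing array lies inside
// [min, max], the address range of the region being released; heap
// values pass through untouched.
func derefRange(v *[]byte, min, max uintptr) {
	vv := *v
	if len(vv) == 0 {
		return
	}
	ptr := uintptr(unsafe.Pointer(&vv[0]))
	if ptr < min || ptr > max {
		return // not backed by the region being released
	}
	buf := make([]byte, len(vv))
	copy(buf, vv)
	*v = buf
}

func main() {
	region := []byte("cpu,host=A") // stands in for an mmapped file
	min := uintptr(unsafe.Pointer(&region[0]))
	max := min + uintptr(len(region))

	inside := region[4:8]     // aliases the region: will be copied
	outside := []byte("heap") // already on the heap: left alone

	derefRange(&inside, min, max)
	derefRange(&outside, min, max)

	region[4] = 'X' // mutating the region no longer affects `inside`
	fmt.Println(string(inside), string(outside)) // host heap
}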
// MarshalBinary encodes the object to a binary format. // MarshalBinary encodes the object to a binary format.
func (s *Series) MarshalBinary() ([]byte, error) { func (s *Series) MarshalBinary() ([]byte, error) {
s.mu.RLock() s.mu.RLock()
@ -1471,10 +1532,8 @@ func (s *Series) MarshalBinary() ([]byte, error) {
var pb internal.Series var pb internal.Series
pb.Key = &s.Key pb.Key = &s.Key
for k, v := range s.Tags { for _, t := range s.Tags {
key := k pb.Tags = append(pb.Tags, &internal.Tag{Key: proto.String(string(t.Key)), Value: proto.String(string(t.Value))})
value := v
pb.Tags = append(pb.Tags, &internal.Tag{Key: &key, Value: &value})
} }
return proto.Marshal(&pb) return proto.Marshal(&pb)
} }
@ -1489,9 +1548,9 @@ func (s *Series) UnmarshalBinary(buf []byte) error {
return err return err
} }
s.Key = pb.GetKey() s.Key = pb.GetKey()
s.Tags = make(map[string]string, len(pb.Tags)) s.Tags = make(models.Tags, len(pb.Tags))
for _, t := range pb.Tags { for i, t := range pb.Tags {
s.Tags[t.GetKey()] = t.GetValue() s.Tags[i] = models.Tag{Key: []byte(t.GetKey()), Value: []byte(t.GetValue())}
} }
return nil return nil
} }
@ -1721,7 +1780,7 @@ func (m *Measurement) tagValuesByKeyAndSeriesID(tagKeys []string, ids SeriesIDs)
// Iterate the tag keys we're interested in and collect values // Iterate the tag keys we're interested in and collect values
// from this series, if they exist. // from this series, if they exist.
for _, tagKey := range tagKeys { for _, tagKey := range tagKeys {
if tagVal, ok := s.Tags[tagKey]; ok { if tagVal := s.Tags.GetString(tagKey); tagVal != "" {
if _, ok = tagValues[tagKey]; !ok { if _, ok = tagValues[tagKey]; !ok {
tagValues[tagKey] = newStringSet() tagValues[tagKey] = newStringSet()
} }
@ -1810,6 +1869,12 @@ func filter(a []uint64, v uint64) []uint64 {
// contains a measurement name. // contains a measurement name.
func MeasurementFromSeriesKey(key string) string { func MeasurementFromSeriesKey(key string) string {
// Ignoring the error because the func returns "missing fields" // Ignoring the error because the func returns "missing fields"
k, _, _ := models.ParseKey(key) k, _, _ := models.ParseKey([]byte(key))
return escape.UnescapeString(k) return escape.UnescapeString(k)
} }
type uint64Slice []uint64
func (a uint64Slice) Len() int { return len(a) }
func (a uint64Slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a uint64Slice) Less(i, j int) bool { return a[i] < a[j] }

View File

@ -6,6 +6,7 @@ import (
"testing" "testing"
"github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxdb/tsdb"
) )
@ -182,7 +183,7 @@ func genTestSeries(mCnt, tCnt, vCnt int) []*TestSeries {
for _, ts := range tagSets { for _, ts := range tagSets {
series = append(series, &TestSeries{ series = append(series, &TestSeries{
Measurement: m, Measurement: m,
Series: tsdb.NewSeries(fmt.Sprintf("%s:%s", m, string(tsdb.MarshalTags(ts))), ts), Series: tsdb.NewSeries(fmt.Sprintf("%s:%s", m, string(tsdb.MarshalTags(ts))), models.NewTags(ts)),
}) })
} }
} }

View File

@ -8,6 +8,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"sort" "sort"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -117,12 +118,12 @@ func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, opti
closing: make(chan struct{}), closing: make(chan struct{}),
stats: &ShardStatistics{}, stats: &ShardStatistics{},
statTags: map[string]string{ statTags: models.NewTags(map[string]string{
"path": path, "path": path,
"id": fmt.Sprintf("%d", id), "id": fmt.Sprintf("%d", id),
"database": db, "database": db,
"retentionPolicy": rp, "retentionPolicy": rp,
}, }),
database: db, database: db,
retentionPolicy: rp, retentionPolicy: rp,
@ -177,10 +178,10 @@ func (s *Shard) Statistics(tags map[string]string) []models.Statistic {
return nil return nil
} }
tags = s.statTags.Merge(tags) tags = s.statTags.Merge(tags).Map()
statistics := []models.Statistic{{ statistics := []models.Statistic{{
Name: "shard", Name: "shard",
Tags: models.Tags(tags).Merge(map[string]string{"engine": s.options.EngineVersion}), Tags: models.NewTags(tags).Merge(map[string]string{"engine": s.options.EngineVersion}),
Values: map[string]interface{}{ Values: map[string]interface{}{
statWriteReq: atomic.LoadInt64(&s.stats.WriteReq), statWriteReq: atomic.LoadInt64(&s.stats.WriteReq),
statSeriesCreate: atomic.LoadInt64(&s.stats.SeriesCreated), statSeriesCreate: atomic.LoadInt64(&s.stats.SeriesCreated),
@ -449,9 +450,9 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]*FieldCreate,
for _, p := range points { for _, p := range points {
// verify the tags and fields // verify the tags and fields
tags := p.Tags() tags := p.Tags()
if _, ok := tags["time"]; ok { if v := tags.Get([]byte("time")); v != nil {
s.logger.Printf("dropping tag 'time' from '%s'\n", p.PrecisionString("")) s.logger.Printf("dropping tag 'time' from '%s'\n", p.PrecisionString(""))
delete(tags, "time") tags.Delete([]byte("time"))
p.SetTags(tags) p.SetTags(tags)
} }
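
Shard validation now asks the Tags value directly: Get([]byte("time")) to detect the reserved tag, Delete to drop it. A sketch of a Delete over the sorted-slice representation, again with the simplified stand-in types rather than the actual models package:

package main

import (
	"bytes"
	"fmt"
)

type Tag struct {
	Key   []byte
	Value []byte
}

type Tags []Tag

// Delete removes the tag with the given key, preserving order.
// A linear scan is fine: points rarely carry many tags.
func (a *Tags) Delete(key []byte) {
	t := *a
	for i := range t {
		if bytes.Equal(t[i].Key, key) {
			*a = append(t[:i], t[i+1:]...)
			return
		}
	}
}

func main() {
	tags := Tags{
		{Key: []byte("host"), Value: []byte("server")},
		{Key: []byte("time"), Value: []byte("now")},
	}
	tags.Delete([]byte("time"))
	fmt.Println(len(tags)) // 1
}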
@ -1052,6 +1053,145 @@ func NewTagKeysIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Itera
return newMeasurementKeysIterator(sh, fn, opt) return newMeasurementKeysIterator(sh, fn, opt)
} }
// tagValuesIterator emits key/tag values
type tagValuesIterator struct {
series []*Series // remaining series
keys []string // tag keys to select from a series
fields []string // fields to emit (key or value)
buf struct {
s *Series // current series
keys []string // current tag's keys
}
}
// NewTagValuesIterator returns a new instance of TagValuesIterator.
func NewTagValuesIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) {
if opt.Condition == nil {
return nil, errors.New("a condition is required")
}
measurementExpr := influxql.CloneExpr(opt.Condition)
measurementExpr = influxql.Reduce(influxql.RewriteExpr(measurementExpr, func(e influxql.Expr) influxql.Expr {
switch e := e.(type) {
case *influxql.BinaryExpr:
switch e.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
tag, ok := e.LHS.(*influxql.VarRef)
if !ok || tag.Val != "_name" {
return nil
}
}
}
return e
}), nil)
mms, ok, err := sh.index.measurementsByExpr(measurementExpr)
if err != nil {
return nil, err
} else if !ok {
mms = sh.index.Measurements()
sort.Sort(mms)
}
// If there are no measurements, return immediately.
if len(mms) == 0 {
return &tagValuesIterator{}, nil
}
filterExpr := influxql.CloneExpr(opt.Condition)
filterExpr = influxql.Reduce(influxql.RewriteExpr(filterExpr, func(e influxql.Expr) influxql.Expr {
switch e := e.(type) {
case *influxql.BinaryExpr:
switch e.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
tag, ok := e.LHS.(*influxql.VarRef)
if !ok || strings.HasPrefix(tag.Val, "_") {
return nil
}
}
}
return e
}), nil)
var series []*Series
keys := newStringSet()
for _, mm := range mms {
ss, ok, err := mm.TagKeysByExpr(opt.Condition)
if err != nil {
return nil, err
} else if !ok {
keys.add(mm.TagKeys()...)
} else {
keys = keys.union(ss)
}
ids, err := mm.seriesIDsAllOrByExpr(filterExpr)
if err != nil {
return nil, err
}
for _, id := range ids {
series = append(series, mm.SeriesByID(id))
}
}
return &tagValuesIterator{
series: series,
keys: keys.list(),
fields: influxql.VarRefs(opt.Aux).Strings(),
}, nil
}
// Stats returns stats about the points processed.
func (itr *tagValuesIterator) Stats() influxql.IteratorStats { return influxql.IteratorStats{} }
// Close closes the iterator.
func (itr *tagValuesIterator) Close() error { return nil }
// Next emits the next point in the iterator.
func (itr *tagValuesIterator) Next() (*influxql.FloatPoint, error) {
for {
// If there are no more values then move to the next key.
if len(itr.buf.keys) == 0 {
if len(itr.series) == 0 {
return nil, nil
}
itr.buf.s = itr.series[0]
itr.buf.keys = itr.keys
itr.series = itr.series[1:]
continue
}
key := itr.buf.keys[0]
value := itr.buf.s.Tags.GetString(key)
if value == "" {
itr.buf.keys = itr.buf.keys[1:]
continue
}
// Prepare auxiliary fields.
auxFields := make([]interface{}, len(itr.fields))
for i, f := range itr.fields {
switch f {
case "_tagKey":
auxFields[i] = key
case "value":
auxFields[i] = value
}
}
// Return next key.
p := &influxql.FloatPoint{
Name: itr.buf.s.measurement.Name,
Aux: auxFields,
}
itr.buf.keys = itr.buf.keys[1:]
return p, nil
}
}
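
tagValuesIterator follows the iterator contract visible in Next: the next influxql.FloatPoint, or a nil point with a nil error once exhausted. A self-contained sketch of how a consumer drains such an iterator, with local stand-ins (floatPoint, floatIterator) mirroring only the methods this hunk uses:

package main

import "fmt"

// floatPoint and floatIterator are local stand-ins for the influxql
// types used by tagValuesIterator.Next.
type floatPoint struct {
	Name string
	Aux  []interface{}
}

type floatIterator interface {
	Next() (*floatPoint, error)
	Close() error
}

// sliceIterator is a trivial iterator over a fixed set of points.
type sliceIterator struct{ points []*floatPoint }

func (itr *sliceIterator) Next() (*floatPoint, error) {
	if len(itr.points) == 0 {
		return nil, nil // exhausted: nil point, nil error
	}
	p := itr.points[0]
	itr.points = itr.points[1:]
	return p, nil
}

func (itr *sliceIterator) Close() error { return nil }

// drain consumes an iterator until Next signals exhaustion.
func drain(itr floatIterator) error {
	defer itr.Close()
	for {
		p, err := itr.Next()
		if err != nil {
			return err
		} else if p == nil {
			return nil
		}
		fmt.Println(p.Name, p.Aux)
	}
}

func main() {
	_ = drain(&sliceIterator{points: []*floatPoint{
		{Name: "cpu", Aux: []interface{}{"host", "server01"}},
	}})
}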
// measurementKeyFunc is the function called by measurementKeysIterator. // measurementKeyFunc is the function called by measurementKeysIterator.
type measurementKeyFunc func(m *Measurement) []string type measurementKeyFunc func(m *Measurement) []string

View File

@ -47,7 +47,7 @@ func TestShardWriteAndIndex(t *testing.T) {
pt := models.MustNewPoint( pt := models.MustNewPoint(
"cpu", "cpu",
map[string]string{"host": "server"}, models.Tags{{Key: []byte("host"), Value: []byte("server")}},
map[string]interface{}{"value": 1.0}, map[string]interface{}{"value": 1.0},
time.Unix(1, 2), time.Unix(1, 2),
) )
@ -69,7 +69,7 @@ func TestShardWriteAndIndex(t *testing.T) {
} }
seriesTags := index.Series(string(pt.Key())).Tags seriesTags := index.Series(string(pt.Key())).Tags
if len(seriesTags) != len(pt.Tags()) || pt.Tags()["host"] != seriesTags["host"] { if len(seriesTags) != len(pt.Tags()) || pt.Tags().GetString("host") != seriesTags.GetString("host") {
t.Fatalf("tags weren't properly saved to series index: %v, %v", pt.Tags(), seriesTags) t.Fatalf("tags weren't properly saved to series index: %v, %v", pt.Tags(), seriesTags)
} }
if !reflect.DeepEqual(index.Measurement("cpu").TagKeys(), []string{"host"}) { if !reflect.DeepEqual(index.Measurement("cpu").TagKeys(), []string{"host"}) {
@ -121,7 +121,7 @@ func TestMaxSeriesLimit(t *testing.T) {
for i := 0; i < 1000; i++ { for i := 0; i < 1000; i++ {
pt := models.MustNewPoint( pt := models.MustNewPoint(
"cpu", "cpu",
map[string]string{"host": fmt.Sprintf("server%d", i)}, models.Tags{{Key: []byte("host"), Value: []byte(fmt.Sprintf("server%d", i))}},
map[string]interface{}{"value": 1.0}, map[string]interface{}{"value": 1.0},
time.Unix(1, 2), time.Unix(1, 2),
) )
@ -136,7 +136,7 @@ func TestMaxSeriesLimit(t *testing.T) {
// Writing one more series should exceed the series limit. // Writing one more series should exceed the series limit.
pt := models.MustNewPoint( pt := models.MustNewPoint(
"cpu", "cpu",
map[string]string{"host": "server9999"}, models.Tags{{Key: []byte("host"), Value: []byte("server9999")}},
map[string]interface{}{"value": 1.0}, map[string]interface{}{"value": 1.0},
time.Unix(1, 2), time.Unix(1, 2),
) )
@ -169,7 +169,7 @@ func TestWriteTimeTag(t *testing.T) {
pt := models.MustNewPoint( pt := models.MustNewPoint(
"cpu", "cpu",
map[string]string{}, models.NewTags(map[string]string{}),
map[string]interface{}{"time": 1.0}, map[string]interface{}{"time": 1.0},
time.Unix(1, 2), time.Unix(1, 2),
) )
@ -189,7 +189,7 @@ func TestWriteTimeTag(t *testing.T) {
pt = models.MustNewPoint( pt = models.MustNewPoint(
"cpu", "cpu",
map[string]string{}, models.NewTags(map[string]string{}),
map[string]interface{}{"value": 1.0, "time": 1.0}, map[string]interface{}{"value": 1.0, "time": 1.0},
time.Unix(1, 2), time.Unix(1, 2),
) )
@ -230,7 +230,7 @@ func TestWriteTimeField(t *testing.T) {
pt := models.MustNewPoint( pt := models.MustNewPoint(
"cpu", "cpu",
map[string]string{"time": "now"}, models.NewTags(map[string]string{"time": "now"}),
map[string]interface{}{"value": 1.0}, map[string]interface{}{"value": 1.0},
time.Unix(1, 2), time.Unix(1, 2),
) )
@ -270,7 +270,7 @@ func TestShardWriteAddNewField(t *testing.T) {
pt := models.MustNewPoint( pt := models.MustNewPoint(
"cpu", "cpu",
map[string]string{"host": "server"}, models.NewTags(map[string]string{"host": "server"}),
map[string]interface{}{"value": 1.0}, map[string]interface{}{"value": 1.0},
time.Unix(1, 2), time.Unix(1, 2),
) )
@ -282,7 +282,7 @@ func TestShardWriteAddNewField(t *testing.T) {
pt = models.MustNewPoint( pt = models.MustNewPoint(
"cpu", "cpu",
map[string]string{"host": "server"}, models.NewTags(map[string]string{"host": "server"}),
map[string]interface{}{"value": 1.0, "value2": 2.0}, map[string]interface{}{"value": 1.0, "value2": 2.0},
time.Unix(1, 2), time.Unix(1, 2),
) )
@ -296,7 +296,7 @@ func TestShardWriteAddNewField(t *testing.T) {
t.Fatalf("series wasn't in index") t.Fatalf("series wasn't in index")
} }
seriesTags := index.Series(string(pt.Key())).Tags seriesTags := index.Series(string(pt.Key())).Tags
if len(seriesTags) != len(pt.Tags()) || pt.Tags()["host"] != seriesTags["host"] { if len(seriesTags) != len(pt.Tags()) || pt.Tags().GetString("host") != seriesTags.GetString("host") {
t.Fatalf("tags weren't properly saved to series index: %v, %v", pt.Tags(), seriesTags) t.Fatalf("tags weren't properly saved to series index: %v, %v", pt.Tags(), seriesTags)
} }
if !reflect.DeepEqual(index.Measurement("cpu").TagKeys(), []string{"host"}) { if !reflect.DeepEqual(index.Measurement("cpu").TagKeys(), []string{"host"}) {
@ -328,7 +328,7 @@ func TestShard_Close_RemoveIndex(t *testing.T) {
pt := models.MustNewPoint( pt := models.MustNewPoint(
"cpu", "cpu",
map[string]string{"host": "server"}, models.NewTags(map[string]string{"host": "server"}),
map[string]interface{}{"value": 1.0}, map[string]interface{}{"value": 1.0},
time.Unix(1, 2), time.Unix(1, 2),
) )
@ -521,7 +521,7 @@ func TestShard_Disabled_WriteQuery(t *testing.T) {
pt := models.MustNewPoint( pt := models.MustNewPoint(
"cpu", "cpu",
map[string]string{"host": "server"}, models.NewTags(map[string]string{"host": "server"}),
map[string]interface{}{"value": 1.0}, map[string]interface{}{"value": 1.0},
time.Unix(1, 2), time.Unix(1, 2),
) )

View File

@ -929,13 +929,13 @@ func (s *Store) TagValues(database string, cond influxql.Expr) ([]TagValues, err
// Loop over all keys for each series. // Loop over all keys for each series.
m := make(map[KeyValue]struct{}, len(ss)) m := make(map[KeyValue]struct{}, len(ss))
for _, series := range ss { for _, series := range ss {
for key, value := range series.Tags { for _, t := range series.Tags {
if !ok { if !ok {
// nop // nop
} else if _, exists := keySet[key]; !exists { } else if _, exists := keySet[string(t.Key)]; !exists {
continue continue
} }
m[KeyValue{key, value}] = struct{}{} m[KeyValue{string(t.Key), string(t.Value)}] = struct{}{}
} }
} }
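
TagValues collects distinct key/value pairs by keying a map with a comparable struct, which dedupes pairs across all matching series in one pass. A standalone sketch of the pattern:

package main

import "fmt"

// KeyValue is comparable, so it can be used directly as a map key;
// that is what lets TagValues dedupe pairs across all series.
type KeyValue struct {
	Key, Value string
}

func main() {
	pairs := []KeyValue{
		{"host", "server01"},
		{"region", "us-east"},
		{"host", "server01"}, // same pair seen on another series
	}

	set := make(map[KeyValue]struct{}, len(pairs))
	for _, kv := range pairs {
		set[kv] = struct{}{} // empty struct values occupy no space
	}
	fmt.Println(len(set)) // 2 distinct pairs
}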