reduce memory allocations in index

This commit changes the index to point to index data in the shards
instead of keeping it in-memory on the heap.
pull/7138/head
Ben Johnson 2016-06-30 10:49:53 -06:00
parent 35f2fdafcb
commit 8aa224b22d
No known key found for this signature in database
GPG Key ID: 81741CD251883081
38 changed files with 666 additions and 347 deletions

View File

@ -562,7 +562,7 @@ func (p *Point) MarshalJSON() ([]byte, error) {
// MarshalString renders string representation of a Point with specified
// precision. The default precision is nanoseconds.
func (p *Point) MarshalString() string {
pt, err := models.NewPoint(p.Measurement, p.Tags, p.Fields, p.Time)
pt, err := models.NewPoint(p.Measurement, models.NewTags(p.Tags), p.Fields, p.Time)
if err != nil {
return "# ERROR: " + err.Error() + " " + p.Measurement
}

View File

@ -356,7 +356,7 @@ func NewPoint(
T = t[0]
}
pt, err := models.NewPoint(name, tags, fields, T)
pt, err := models.NewPoint(name, models.NewTags(tags), fields, T)
if err != nil {
return nil, err
}
@ -382,7 +382,7 @@ func (p *Point) Name() string {
// Tags returns the tags associated with the point
func (p *Point) Tags() map[string]string {
return p.pt.Tags()
return p.pt.Tags().Map()
}
// Time return the timestamp for the point

View File

@ -132,7 +132,7 @@ func (c *cmdExport) writeFiles() error {
for i := 0; i < reader.KeyCount(); i++ {
var pairs string
key, typ := reader.KeyAt(i)
values, _ := reader.ReadAll(key)
values, _ := reader.ReadAll(string(key))
measurement, field := tsm1.SeriesAndFieldFromCompositeKey(key)
for _, value := range values {

View File

@ -78,7 +78,7 @@ func cmdReport(opts *reportOpts) {
totalSeries.Add([]byte(key))
if opts.detailed {
sep := strings.Index(key, "#!~#")
sep := strings.Index(string(key), "#!~#")
seriesKey, field := key[:sep], key[sep+4:]
measurement, tags, _ := models.ParseKey(seriesKey)
@ -96,13 +96,13 @@ func cmdReport(opts *reportOpts) {
}
fieldCount.Add([]byte(field))
for t, v := range tags {
tagCount, ok := tagCardialities[t]
for _, t := range tags {
tagCount, ok := tagCardialities[string(t.Key)]
if !ok {
tagCount = hllpp.New()
tagCardialities[t] = tagCount
tagCardialities[string(t.Key)] = tagCount
}
tagCount.Add([]byte(v))
tagCount.Add(t.Value)
}
}
}

View File

@ -121,9 +121,9 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) {
var pos int
for i := 0; i < keyCount; i++ {
key, _ := r.KeyAt(i)
for _, e := range r.Entries(key) {
for _, e := range r.Entries(string(key)) {
pos++
split := strings.Split(key, "#!~#")
split := strings.Split(string(key), "#!~#")
// We dont' know know if we have fields so use an informative default
var measurement, field string = "UNKNOWN", "UNKNOWN"
@ -132,7 +132,7 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) {
measurement = split[0]
field = split[1]
if opts.filterKey != "" && !strings.Contains(key, opts.filterKey) {
if opts.filterKey != "" && !strings.Contains(string(key), opts.filterKey) {
continue
}
fmt.Fprintln(tw, " "+strings.Join([]string{
@ -160,7 +160,7 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) {
// Start at the beginning and read every block
for j := 0; j < keyCount; j++ {
key, _ := r.KeyAt(j)
for _, e := range r.Entries(key) {
for _, e := range r.Entries(string(key)) {
f.Seek(int64(e.Offset), 0)
f.Read(b[:4])
@ -172,7 +172,7 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) {
blockSize += int64(e.Size)
if opts.filterKey != "" && !strings.Contains(key, opts.filterKey) {
if opts.filterKey != "" && !strings.Contains(string(key), opts.filterKey) {
i += blockSize
blockCount++
continue

View File

@ -83,7 +83,7 @@ type WritePointsRequest struct {
// AddPoint adds a point to the WritePointRequest with field key 'value'
func (w *WritePointsRequest) AddPoint(name string, value interface{}, timestamp time.Time, tags map[string]string) {
pt, err := models.NewPoint(
name, tags, map[string]interface{}{"value": value}, timestamp,
name, models.NewTags(tags), map[string]interface{}{"value": value}, timestamp,
)
if err != nil {
return
@ -176,7 +176,7 @@ type WriteStatistics struct {
func (w *PointsWriter) Statistics(tags map[string]string) []models.Statistic {
return []models.Statistic{{
Name: "write",
Tags: tags,
Tags: models.NewTags(tags),
Values: map[string]interface{}{
statWriteReq: atomic.LoadInt64(&w.stats.WriteReq),
statPointWriteReq: atomic.LoadInt64(&w.stats.PointWriteReq),

View File

@ -818,7 +818,7 @@ func (e *StatementExecutor) executeShowStatsStatement(stmt *influxql.ShowStatsSt
if stmt.Module != "" && stat.Name != stmt.Module {
continue
}
row := &models.Row{Name: stat.Name, Tags: stat.Tags}
row := &models.Row{Name: stat.Name, Tags: stat.Tags.Map()}
values := make([]interface{}, 0, len(stat.Values))
for _, k := range stat.ValueNames() {
@ -1055,7 +1055,7 @@ func convertRowToPoints(measurementName string, row *models.Row) ([]models.Point
}
}
p, err := models.NewPoint(measurementName, row.Tags, vals, v[timeIndex].(time.Time))
p, err := models.NewPoint(measurementName, models.NewTags(row.Tags), vals, v[timeIndex].(time.Time))
if err != nil {
// Drop points that can't be stored
continue

View File

@ -148,7 +148,7 @@ type QueryStatistics struct {
func (e *QueryExecutor) Statistics(tags map[string]string) []models.Statistic {
return []models.Statistic{{
Name: "queryExecutor",
Tags: tags,
Tags: models.NewTags(tags),
Values: map[string]interface{}{
statQueriesActive: atomic.LoadInt64(&e.stats.ActiveQueries),
statQueriesExecuted: atomic.LoadInt64(&e.stats.ExecutedQueries),

View File

@ -139,14 +139,14 @@ func ParsePointsString(buf string) ([]Point, error) {
}
// ParseKey returns the measurement name and tags from a point.
func ParseKey(buf string) (string, Tags, error) {
func ParseKey(buf []byte) (string, Tags, error) {
// Ignore the error because scanMeasurement returns "missing fields" which we ignore
// when just parsing a key
state, i, _ := scanMeasurement([]byte(buf), 0)
state, i, _ := scanMeasurement(buf, 0)
var tags Tags
if state == tagKeyState {
tags = parseTags([]byte(buf))
tags = parseTags(buf)
// scanMeasurement returns the location of the comma if there are tags, strip that off
return string(buf[:i-1]), tags, nil
}
@ -1225,39 +1225,42 @@ func (p *point) Tags() Tags {
}
func parseTags(buf []byte) Tags {
tags := make(map[string]string, bytes.Count(buf, []byte(",")))
if len(buf) == 0 {
return nil
}
pos, name := scanTo(buf, 0, ',')
// it's an empty key, so there are no tags
if len(name) == 0 {
return nil
}
tags := make(Tags, 0, bytes.Count(buf, []byte(",")))
hasEscape := bytes.IndexByte(buf, '\\') != -1
if len(buf) != 0 {
pos, name := scanTo(buf, 0, ',')
i := pos + 1
var key, value []byte
for {
if i >= len(buf) {
break
}
i, key = scanTo(buf, i, '=')
i, value = scanTagValue(buf, i+1)
// it's an empyt key, so there are no tags
if len(name) == 0 {
return tags
if len(value) == 0 {
continue
}
i := pos + 1
var key, value []byte
for {
if i >= len(buf) {
break
}
i, key = scanTo(buf, i, '=')
i, value = scanTagValue(buf, i+1)
if len(value) == 0 {
continue
}
if hasEscape {
tags[string(unescapeTag(key))] = string(unescapeTag(value))
} else {
tags[string(key)] = string(value)
}
i++
if hasEscape {
tags = append(tags, Tag{Key: unescapeTag(key), Value: unescapeTag(value)})
} else {
tags = append(tags, Tag{Key: key, Value: value})
}
i++
}
return tags
}
@ -1276,7 +1279,8 @@ func (p *point) SetTags(tags Tags) {
// AddTag adds or replaces a tag value for a point
func (p *point) AddTag(key, value string) {
tags := p.Tags()
tags[key] = value
tags = append(tags, Tag{Key: []byte(key), Value: []byte(value)})
sort.Sort(tags)
p.key = MakeKey([]byte(p.Name()), tags)
}
@ -1386,64 +1390,137 @@ func (p *point) UnixNano() int64 {
return p.Time().UnixNano()
}
// Tags represents a mapping between a Point's tag names and their
// values.
type Tags map[string]string
// Tag represents a single key/value tag pair.
type Tag struct {
Key []byte
Value []byte
}
// Tags represents a sorted list of tags.
type Tags []Tag
// NewTags returns a new Tags from a map.
func NewTags(m map[string]string) Tags {
a := make(Tags, 0, len(m))
for k, v := range m {
a = append(a, Tag{Key: []byte(k), Value: []byte(v)})
}
sort.Sort(a)
return a
}
func (a Tags) Len() int { return len(a) }
func (a Tags) Less(i, j int) bool { return bytes.Compare(a[i].Key, a[j].Key) == -1 }
func (a Tags) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// Get returns the value for a key.
func (a Tags) Get(key []byte) []byte {
// OPTIMIZE: Use sort.Search if tagset is large.
for _, t := range a {
if bytes.Equal(t.Key, key) {
return t.Value
}
}
return nil
}
// GetString returns the string value for a string key.
func (a Tags) GetString(key string) string {
return string(a.Get([]byte(key)))
}
// Set sets the value for a key.
func (a *Tags) Set(key, value []byte) {
for _, t := range *a {
if bytes.Equal(t.Key, key) {
t.Value = value
return
}
}
*a = append(*a, Tag{Key: key, Value: value})
sort.Sort(*a)
}
// SetString sets the string value for a string key.
func (a *Tags) SetString(key, value string) {
a.Set([]byte(key), []byte(value))
}
// Delete removes a tag by key.
func (a *Tags) Delete(key []byte) {
for i, t := range *a {
if bytes.Equal(t.Key, key) {
copy((*a)[i:], (*a)[i+1:])
(*a)[len(*a)-1] = Tag{}
*a = (*a)[:len(*a)-1]
return
}
}
}
// Map returns a map representation of the tags.
func (a Tags) Map() map[string]string {
m := make(map[string]string, len(a))
for _, t := range a {
m[string(t.Key)] = string(t.Value)
}
return m
}
// Merge merges the tags combining the two. If both define a tag with the
// same key, the merged value overwrites the old value.
// A new map is returned.
func (t Tags) Merge(other map[string]string) Tags {
merged := make(map[string]string, len(t)+len(other))
for k, v := range t {
merged[k] = v
func (a Tags) Merge(other map[string]string) Tags {
merged := make(map[string]string, len(a)+len(other))
for _, t := range a {
merged[string(t.Key)] = string(t.Value)
}
for k, v := range other {
merged[k] = v
}
return Tags(merged)
return NewTags(merged)
}
// HashKey hashes all of a tag's keys.
func (t Tags) HashKey() []byte {
func (a Tags) HashKey() []byte {
// Empty maps marshal to empty bytes.
if len(t) == 0 {
if len(a) == 0 {
return nil
}
escaped := Tags{}
for k, v := range t {
ek := escapeTag([]byte(k))
ev := escapeTag([]byte(v))
escaped := make(Tags, 0, len(a))
for _, t := range a {
ek := escapeTag(t.Key)
ev := escapeTag(t.Value)
if len(ev) > 0 {
escaped[string(ek)] = string(ev)
escaped = append(escaped, Tag{Key: ek, Value: ev})
}
}
// Extract keys and determine final size.
sz := len(escaped) + (len(escaped) * 2) // separators
keys := make([]string, len(escaped)+1)
i := 0
for k, v := range escaped {
keys[i] = k
i++
sz += len(k) + len(v)
keys := make([][]byte, len(escaped)+1)
for i, t := range escaped {
keys[i] = t.Key
sz += len(t.Key) + len(t.Value)
}
keys = keys[:i]
sort.Strings(keys)
keys = keys[:len(escaped)]
sort.Sort(byteSlices(keys))
// Generate marshaled bytes.
b := make([]byte, sz)
buf := b
idx := 0
for _, k := range keys {
for i, k := range keys {
buf[idx] = ','
idx++
copy(buf[idx:idx+len(k)], k)
idx += len(k)
buf[idx] = '='
idx++
v := escaped[k]
v := escaped[i].Value
copy(buf[idx:idx+len(v)], v)
idx += len(v)
}
@ -1594,3 +1671,9 @@ func (p Fields) MarshalBinary() []byte {
}
return b
}
type byteSlices [][]byte
func (a byteSlices) Len() int { return len(a) }
func (a byteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 }
func (a byteSlices) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

View File

@ -15,7 +15,7 @@ import (
)
var (
tags = models.Tags{"foo": "bar", "apple": "orange", "host": "serverA", "region": "uswest"}
tags = models.NewTags(map[string]string{"foo": "bar", "apple": "orange", "host": "serverA", "region": "uswest"})
maxFloat64 = strconv.FormatFloat(math.MaxFloat64, 'f', 1, 64)
minFloat64 = strconv.FormatFloat(-math.MaxFloat64, 'f', 1, 64)
)
@ -115,7 +115,7 @@ func BenchmarkParsePointsTagsUnSorted10(b *testing.B) {
func BenchmarkParseKey(b *testing.B) {
line := `cpu,region=us-west,host=serverA,env=prod,target=servers,zone=1c,tag1=value1,tag2=value2,tag3=value3,tag4=value4,tag5=value5`
for i := 0; i < b.N; i++ {
models.ParseKey(line)
models.ParseKey([]byte(line))
}
}
@ -163,9 +163,9 @@ func test(t *testing.T, line string, point TestPoint) {
t.Errorf(`ParsePoints("%s") tags mismatch. got %v, exp %v`, line, pts[0].Tags(), exp)
}
for tag, value := range pts[0].Tags() {
if value != point.RawTags[tag] {
t.Errorf(`ParsePoints("%s") tags mismatch. got %v, exp %v`, line, value, point.RawTags[tag])
for _, tag := range pts[0].Tags() {
if !bytes.Equal(tag.Value, point.RawTags.Get(tag.Key)) {
t.Errorf(`ParsePoints("%s") tags mismatch. got %s, exp %s`, line, tag.Value, point.RawTags.Get(tag.Key))
}
}
@ -639,7 +639,7 @@ func TestParsePointUnescape(t *testing.T) {
test(t, `foo\,bar value=1i`,
NewTestPoint(
"foo,bar", // comma in the name
models.Tags{},
models.NewTags(map[string]string{}),
models.Fields{
"value": int64(1),
},
@ -649,9 +649,9 @@ func TestParsePointUnescape(t *testing.T) {
test(t, `cpu\,main,regions=east value=1.0`,
NewTestPoint(
"cpu,main", // comma in the name
models.Tags{
models.NewTags(map[string]string{
"regions": "east",
},
}),
models.Fields{
"value": 1.0,
},
@ -661,9 +661,9 @@ func TestParsePointUnescape(t *testing.T) {
test(t, `cpu\ load,region=east value=1.0`,
NewTestPoint(
"cpu load", // space in the name
models.Tags{
models.NewTags(map[string]string{
"region": "east",
},
}),
models.Fields{
"value": 1.0,
},
@ -673,9 +673,9 @@ func TestParsePointUnescape(t *testing.T) {
test(t, `cpu\=load,region=east value=1.0`,
NewTestPoint(
`cpu\=load`, // backslash is literal
models.Tags{
models.NewTags(map[string]string{
"region": "east",
},
}),
models.Fields{
"value": 1.0,
},
@ -685,9 +685,9 @@ func TestParsePointUnescape(t *testing.T) {
test(t, `cpu=load,region=east value=1.0`,
NewTestPoint(
`cpu=load`, // literal equals is fine in measurement name
models.Tags{
models.NewTags(map[string]string{
"region": "east",
},
}),
models.Fields{
"value": 1.0,
},
@ -696,9 +696,9 @@ func TestParsePointUnescape(t *testing.T) {
// commas in tag names
test(t, `cpu,region\,zone=east value=1.0`,
NewTestPoint("cpu",
models.Tags{
models.NewTags(map[string]string{
"region,zone": "east", // comma in the tag key
},
}),
models.Fields{
"value": 1.0,
},
@ -707,9 +707,9 @@ func TestParsePointUnescape(t *testing.T) {
// spaces in tag name
test(t, `cpu,region\ zone=east value=1.0`,
NewTestPoint("cpu",
models.Tags{
models.NewTags(map[string]string{
"region zone": "east", // space in the tag name
},
}),
models.Fields{
"value": 1.0,
},
@ -718,9 +718,9 @@ func TestParsePointUnescape(t *testing.T) {
// backslash with escaped equals in tag name
test(t, `cpu,reg\\=ion=east value=1.0`,
NewTestPoint("cpu",
models.Tags{
models.NewTags(map[string]string{
`reg\=ion`: "east",
},
}),
models.Fields{
"value": 1.0,
},
@ -729,9 +729,9 @@ func TestParsePointUnescape(t *testing.T) {
// space is tag name
test(t, `cpu,\ =east value=1.0`,
NewTestPoint("cpu",
models.Tags{
models.NewTags(map[string]string{
" ": "east", // tag name is single space
},
}),
models.Fields{
"value": 1.0,
},
@ -740,9 +740,9 @@ func TestParsePointUnescape(t *testing.T) {
// commas in tag values
test(t, `cpu,regions=east\,west value=1.0`,
NewTestPoint("cpu",
models.Tags{
models.NewTags(map[string]string{
"regions": "east,west", // comma in the tag value
},
}),
models.Fields{
"value": 1.0,
},
@ -752,9 +752,9 @@ func TestParsePointUnescape(t *testing.T) {
test(t, `cpu,regions=\\ east value=1.0`,
NewTestPoint(
"cpu",
models.Tags{
models.NewTags(map[string]string{
"regions": `\ east`,
},
}),
models.Fields{
"value": 1.0,
},
@ -764,9 +764,9 @@ func TestParsePointUnescape(t *testing.T) {
test(t, `cpu,regions=eas\\ t value=1.0`,
NewTestPoint(
"cpu",
models.Tags{
models.NewTags(map[string]string{
"regions": `eas\ t`,
},
}),
models.Fields{
"value": 1.0,
},
@ -776,9 +776,9 @@ func TestParsePointUnescape(t *testing.T) {
test(t, `cpu,regions=east\\ value=1.0`,
NewTestPoint(
"cpu",
models.Tags{
models.NewTags(map[string]string{
"regions": `east\ `,
},
}),
models.Fields{
"value": 1.0,
},
@ -787,9 +787,9 @@ func TestParsePointUnescape(t *testing.T) {
// spaces in tag values
test(t, `cpu,regions=east\ west value=1.0`,
NewTestPoint("cpu",
models.Tags{
models.NewTags(map[string]string{
"regions": "east west", // comma in the tag value
},
}),
models.Fields{
"value": 1.0,
},
@ -798,9 +798,9 @@ func TestParsePointUnescape(t *testing.T) {
// commas in field keys
test(t, `cpu,regions=east value\,ms=1.0`,
NewTestPoint("cpu",
models.Tags{
models.NewTags(map[string]string{
"regions": "east",
},
}),
models.Fields{
"value,ms": 1.0, // comma in the field keys
},
@ -809,9 +809,9 @@ func TestParsePointUnescape(t *testing.T) {
// spaces in field keys
test(t, `cpu,regions=east value\ ms=1.0`,
NewTestPoint("cpu",
models.Tags{
models.NewTags(map[string]string{
"regions": "east",
},
}),
models.Fields{
"value ms": 1.0, // comma in the field keys
},
@ -820,10 +820,10 @@ func TestParsePointUnescape(t *testing.T) {
// tag with no value
test(t, `cpu,regions=east value="1"`,
NewTestPoint("cpu",
models.Tags{
models.NewTags(map[string]string{
"regions": "east",
"foobar": "",
},
}),
models.Fields{
"value": "1",
},
@ -832,9 +832,9 @@ func TestParsePointUnescape(t *testing.T) {
// commas in field values
test(t, `cpu,regions=east value="1,0"`,
NewTestPoint("cpu",
models.Tags{
models.NewTags(map[string]string{
"regions": "east",
},
}),
models.Fields{
"value": "1,0", // comma in the field value
},
@ -844,9 +844,9 @@ func TestParsePointUnescape(t *testing.T) {
test(t, `cpu,regions=eas\t value=1.0`,
NewTestPoint(
"cpu",
models.Tags{
models.NewTags(map[string]string{
"regions": "eas\\t",
},
}),
models.Fields{
"value": 1.0,
},
@ -856,9 +856,9 @@ func TestParsePointUnescape(t *testing.T) {
test(t, `cpu,regions=\\,\,\=east value=1.0`,
NewTestPoint(
"cpu",
models.Tags{
models.NewTags(map[string]string{
"regions": `\,,=east`,
},
}),
models.Fields{
"value": 1.0,
},
@ -868,7 +868,7 @@ func TestParsePointUnescape(t *testing.T) {
test(t, `cpu \a=1i`,
NewTestPoint(
"cpu",
models.Tags{},
models.NewTags(map[string]string{}),
models.Fields{
"\\a": int64(1), // Left as parsed since it's not a known escape sequence.
},
@ -878,9 +878,9 @@ func TestParsePointUnescape(t *testing.T) {
test(t, `cpu=load,equals\=foo=tag\=value value=1i`,
NewTestPoint(
"cpu=load", // Not escaped
models.Tags{
models.NewTags(map[string]string{
"equals=foo": "tag=value", // Tag and value unescaped
},
}),
models.Fields{
"value": int64(1),
},
@ -892,7 +892,7 @@ func TestParsePointWithTags(t *testing.T) {
test(t,
"cpu,host=serverA,region=us-east value=1.0 1000000000",
NewTestPoint("cpu",
models.Tags{"host": "serverA", "region": "us-east"},
models.NewTags(map[string]string{"host": "serverA", "region": "us-east"}),
models.Fields{"value": 1.0}, time.Unix(1, 0)))
}
@ -924,10 +924,10 @@ func TestParsePointWithDuplicateTags(t *testing.T) {
func TestParsePointWithStringField(t *testing.T) {
test(t, `cpu,host=serverA,region=us-east value=1.0,str="foo",str2="bar" 1000000000`,
NewTestPoint("cpu",
models.Tags{
models.NewTags(map[string]string{
"host": "serverA",
"region": "us-east",
},
}),
models.Fields{
"value": 1.0,
"str": "foo",
@ -938,10 +938,10 @@ func TestParsePointWithStringField(t *testing.T) {
test(t, `cpu,host=serverA,region=us-east str="foo \" bar" 1000000000`,
NewTestPoint("cpu",
models.Tags{
models.NewTags(map[string]string{
"host": "serverA",
"region": "us-east",
},
}),
models.Fields{
"str": `foo " bar`,
},
@ -954,10 +954,10 @@ func TestParsePointWithStringWithSpaces(t *testing.T) {
test(t, `cpu,host=serverA,region=us-east value=1.0,str="foo bar" 1000000000`,
NewTestPoint(
"cpu",
models.Tags{
models.NewTags(map[string]string{
"host": "serverA",
"region": "us-east",
},
}),
models.Fields{
"value": 1.0,
"str": "foo bar", // spaces in string value
@ -970,10 +970,10 @@ func TestParsePointWithStringWithNewline(t *testing.T) {
test(t, "cpu,host=serverA,region=us-east value=1.0,str=\"foo\nbar\" 1000000000",
NewTestPoint(
"cpu",
models.Tags{
models.NewTags(map[string]string{
"host": "serverA",
"region": "us-east",
},
}),
models.Fields{
"value": 1.0,
"str": "foo\nbar", // newline in string value
@ -987,10 +987,10 @@ func TestParsePointWithStringWithCommas(t *testing.T) {
test(t, `cpu,host=serverA,region=us-east value=1.0,str="foo\,bar" 1000000000`,
NewTestPoint(
"cpu",
models.Tags{
models.NewTags(map[string]string{
"host": "serverA",
"region": "us-east",
},
}),
models.Fields{
"value": 1.0,
"str": `foo\,bar`, // commas in string value
@ -1002,10 +1002,10 @@ func TestParsePointWithStringWithCommas(t *testing.T) {
test(t, `cpu,host=serverA,region=us-east value=1.0,str="foo,bar" 1000000000`,
NewTestPoint(
"cpu",
models.Tags{
models.NewTags(map[string]string{
"host": "serverA",
"region": "us-east",
},
}),
models.Fields{
"value": 1.0,
"str": "foo,bar", // commas in string value
@ -1019,10 +1019,10 @@ func TestParsePointQuotedMeasurement(t *testing.T) {
test(t, `"cpu",host=serverA,region=us-east value=1.0 1000000000`,
NewTestPoint(
`"cpu"`,
models.Tags{
models.NewTags(map[string]string{
"host": "serverA",
"region": "us-east",
},
}),
models.Fields{
"value": 1.0,
},
@ -1034,10 +1034,10 @@ func TestParsePointQuotedTags(t *testing.T) {
test(t, `cpu,"host"="serverA",region=us-east value=1.0 1000000000`,
NewTestPoint(
"cpu",
models.Tags{
models.NewTags(map[string]string{
`"host"`: `"serverA"`,
"region": "us-east",
},
}),
models.Fields{
"value": 1.0,
},
@ -1056,7 +1056,7 @@ func TestParsePointsUnbalancedQuotedTags(t *testing.T) {
}
// Expected " in the tag value
exp := models.MustNewPoint("baz", models.Tags{"mytag": `"a`},
exp := models.MustNewPoint("baz", models.NewTags(map[string]string{"mytag": `"a`}),
models.Fields{"x": float64(1)}, time.Unix(0, 1441103862125))
if pts[0].String() != exp.String() {
@ -1064,7 +1064,7 @@ func TestParsePointsUnbalancedQuotedTags(t *testing.T) {
}
// Expected two points to ensure we did not overscan the line
exp = models.MustNewPoint("baz", models.Tags{"mytag": `a`},
exp = models.MustNewPoint("baz", models.NewTags(map[string]string{"mytag": `a`}),
models.Fields{"z": float64(1)}, time.Unix(0, 1441103862126))
if pts[1].String() != exp.String() {
@ -1078,10 +1078,10 @@ func TestParsePointEscapedStringsAndCommas(t *testing.T) {
test(t, `cpu,host=serverA,region=us-east value="{Hello\"{,}\" World}" 1000000000`,
NewTestPoint(
"cpu",
models.Tags{
models.NewTags(map[string]string{
"host": "serverA",
"region": "us-east",
},
}),
models.Fields{
"value": `{Hello"{,}" World}`,
},
@ -1092,10 +1092,10 @@ func TestParsePointEscapedStringsAndCommas(t *testing.T) {
test(t, `cpu,host=serverA,region=us-east value="{Hello\"{\,}\" World}" 1000000000`,
NewTestPoint(
"cpu",
models.Tags{
models.NewTags(map[string]string{
"host": "serverA",
"region": "us-east",
},
}),
models.Fields{
"value": `{Hello"{\,}" World}`,
},
@ -1107,10 +1107,10 @@ func TestParsePointWithStringWithEquals(t *testing.T) {
test(t, `cpu,host=serverA,region=us-east str="foo=bar",value=1.0 1000000000`,
NewTestPoint(
"cpu",
models.Tags{
models.NewTags(map[string]string{
"host": "serverA",
"region": "us-east",
},
}),
models.Fields{
"value": 1.0,
"str": "foo=bar", // spaces in string value
@ -1123,7 +1123,7 @@ func TestParsePointWithStringWithBackslash(t *testing.T) {
test(t, `cpu value="test\\\"" 1000000000`,
NewTestPoint(
"cpu",
models.Tags{},
models.NewTags(map[string]string{}),
models.Fields{
"value": `test\"`,
},
@ -1133,7 +1133,7 @@ func TestParsePointWithStringWithBackslash(t *testing.T) {
test(t, `cpu value="test\\" 1000000000`,
NewTestPoint(
"cpu",
models.Tags{},
models.NewTags(map[string]string{}),
models.Fields{
"value": `test\`,
},
@ -1143,7 +1143,7 @@ func TestParsePointWithStringWithBackslash(t *testing.T) {
test(t, `cpu value="test\\\"" 1000000000`,
NewTestPoint(
"cpu",
models.Tags{},
models.NewTags(map[string]string{}),
models.Fields{
"value": `test\"`,
},
@ -1153,7 +1153,7 @@ func TestParsePointWithStringWithBackslash(t *testing.T) {
test(t, `cpu value="test\"" 1000000000`,
NewTestPoint(
"cpu",
models.Tags{},
models.NewTags(map[string]string{}),
models.Fields{
"value": `test"`,
},
@ -1165,10 +1165,10 @@ func TestParsePointWithBoolField(t *testing.T) {
test(t, `cpu,host=serverA,region=us-east true=true,t=t,T=T,TRUE=TRUE,True=True,false=false,f=f,F=F,FALSE=FALSE,False=False 1000000000`,
NewTestPoint(
"cpu",
models.Tags{
models.NewTags(map[string]string{
"host": "serverA",
"region": "us-east",
},
}),
models.Fields{
"t": true,
"T": true,
@ -1189,10 +1189,10 @@ func TestParsePointUnicodeString(t *testing.T) {
test(t, `cpu,host=serverA,region=us-east value="wè" 1000000000`,
NewTestPoint(
"cpu",
models.Tags{
models.NewTags(map[string]string{
"host": "serverA",
"region": "us-east",
},
}),
models.Fields{
"value": "wè",
},
@ -1204,7 +1204,7 @@ func TestParsePointNegativeTimestamp(t *testing.T) {
test(t, `cpu value=1 -1`,
NewTestPoint(
"cpu",
models.Tags{},
models.NewTags(map[string]string{}),
models.Fields{
"value": 1.0,
},
@ -1216,7 +1216,7 @@ func TestParsePointMaxTimestamp(t *testing.T) {
test(t, fmt.Sprintf(`cpu value=1 %d`, models.MaxNanoTime),
NewTestPoint(
"cpu",
models.Tags{},
models.NewTags(map[string]string{}),
models.Fields{
"value": 1.0,
},
@ -1228,7 +1228,7 @@ func TestParsePointMinTimestamp(t *testing.T) {
test(t, `cpu value=1 -9223372036854775808`,
NewTestPoint(
"cpu",
models.Tags{},
models.NewTags(map[string]string{}),
models.Fields{
"value": 1.0,
},
@ -1259,7 +1259,7 @@ func TestNewPointFloatWithoutDecimal(t *testing.T) {
test(t, `cpu value=1 1000000000`,
NewTestPoint(
"cpu",
models.Tags{},
models.NewTags(map[string]string{}),
models.Fields{
"value": 1.0,
},
@ -1270,7 +1270,7 @@ func TestNewPointNegativeFloat(t *testing.T) {
test(t, `cpu value=-0.64 1000000000`,
NewTestPoint(
"cpu",
models.Tags{},
models.NewTags(map[string]string{}),
models.Fields{
"value": -0.64,
},
@ -1282,7 +1282,7 @@ func TestNewPointFloatNoDecimal(t *testing.T) {
test(t, `cpu value=1. 1000000000`,
NewTestPoint(
"cpu",
models.Tags{},
models.NewTags(map[string]string{}),
models.Fields{
"value": 1.0,
},
@ -1294,7 +1294,7 @@ func TestNewPointFloatScientific(t *testing.T) {
test(t, `cpu value=6.632243e+06 1000000000`,
NewTestPoint(
"cpu",
models.Tags{},
models.NewTags(map[string]string{}),
models.Fields{
"value": float64(6632243),
},
@ -1306,7 +1306,7 @@ func TestNewPointLargeInteger(t *testing.T) {
test(t, `cpu value=6632243i 1000000000`,
NewTestPoint(
"cpu",
models.Tags{},
models.NewTags(map[string]string{}),
models.Fields{
"value": int64(6632243), // if incorrectly encoded as a float, it would show up as 6.632243e+06
},
@ -1403,7 +1403,7 @@ func TestParsePointToString(t *testing.T) {
t.Errorf("ParsePoint() to string mismatch:\n got %v\n exp %v", got, line)
}
pt = models.MustNewPoint("cpu", models.Tags{"host": "serverA", "region": "us-east"},
pt = models.MustNewPoint("cpu", models.NewTags(map[string]string{"host": "serverA", "region": "us-east"}),
models.Fields{"int": 10, "float": float64(11.0), "float2": float64(12.123), "bool": false, "str": "string val"},
time.Unix(1, 0))
@ -1600,26 +1600,26 @@ cpu,host=serverA,region=us-east value=1.0 946730096789012345`,
func TestNewPointEscaped(t *testing.T) {
// commas
pt := models.MustNewPoint("cpu,main", models.Tags{"tag,bar": "value"}, models.Fields{"name,bar": 1.0}, time.Unix(0, 0))
pt := models.MustNewPoint("cpu,main", models.NewTags(map[string]string{"tag,bar": "value"}), models.Fields{"name,bar": 1.0}, time.Unix(0, 0))
if exp := `cpu\,main,tag\,bar=value name\,bar=1 0`; pt.String() != exp {
t.Errorf("NewPoint().String() mismatch.\ngot %v\nexp %v", pt.String(), exp)
}
// spaces
pt = models.MustNewPoint("cpu main", models.Tags{"tag bar": "value"}, models.Fields{"name bar": 1.0}, time.Unix(0, 0))
pt = models.MustNewPoint("cpu main", models.NewTags(map[string]string{"tag bar": "value"}), models.Fields{"name bar": 1.0}, time.Unix(0, 0))
if exp := `cpu\ main,tag\ bar=value name\ bar=1 0`; pt.String() != exp {
t.Errorf("NewPoint().String() mismatch.\ngot %v\nexp %v", pt.String(), exp)
}
// equals
pt = models.MustNewPoint("cpu=main", models.Tags{"tag=bar": "value=foo"}, models.Fields{"name=bar": 1.0}, time.Unix(0, 0))
pt = models.MustNewPoint("cpu=main", models.NewTags(map[string]string{"tag=bar": "value=foo"}), models.Fields{"name=bar": 1.0}, time.Unix(0, 0))
if exp := `cpu=main,tag\=bar=value\=foo name\=bar=1 0`; pt.String() != exp {
t.Errorf("NewPoint().String() mismatch.\ngot %v\nexp %v", pt.String(), exp)
}
}
func TestNewPointWithoutField(t *testing.T) {
_, err := models.NewPoint("cpu", models.Tags{"tag": "bar"}, models.Fields{}, time.Unix(0, 0))
_, err := models.NewPoint("cpu", models.NewTags(map[string]string{"tag": "bar"}), models.Fields{}, time.Unix(0, 0))
if err == nil {
t.Fatalf(`NewPoint() expected error. got nil`)
}
@ -1645,19 +1645,19 @@ func TestNewPointUnhandledType(t *testing.T) {
}
func TestMakeKeyEscaped(t *testing.T) {
if exp, got := `cpu\ load`, models.MakeKey([]byte(`cpu\ load`), models.Tags{}); string(got) != exp {
if exp, got := `cpu\ load`, models.MakeKey([]byte(`cpu\ load`), models.NewTags(map[string]string{})); string(got) != exp {
t.Errorf("MakeKey() mismatch.\ngot %v\nexp %v", got, exp)
}
if exp, got := `cpu\ load`, models.MakeKey([]byte(`cpu load`), models.Tags{}); string(got) != exp {
if exp, got := `cpu\ load`, models.MakeKey([]byte(`cpu load`), models.NewTags(map[string]string{})); string(got) != exp {
t.Errorf("MakeKey() mismatch.\ngot %v\nexp %v", got, exp)
}
if exp, got := `cpu\,load`, models.MakeKey([]byte(`cpu\,load`), models.Tags{}); string(got) != exp {
if exp, got := `cpu\,load`, models.MakeKey([]byte(`cpu\,load`), models.NewTags(map[string]string{})); string(got) != exp {
t.Errorf("MakeKey() mismatch.\ngot %v\nexp %v", got, exp)
}
if exp, got := `cpu\,load`, models.MakeKey([]byte(`cpu,load`), models.Tags{}); string(got) != exp {
if exp, got := `cpu\,load`, models.MakeKey([]byte(`cpu,load`), models.NewTags(map[string]string{})); string(got) != exp {
t.Errorf("MakeKey() mismatch.\ngot %v\nexp %v", got, exp)
}
@ -1866,7 +1866,7 @@ func TestNewPointsRejectsMaxKey(t *testing.T) {
}
func TestParseKeyEmpty(t *testing.T) {
if _, _, err := models.ParseKey(""); err != nil {
if _, _, err := models.ParseKey(nil); err != nil {
t.Fatalf("unexpected error: %v", err)
}
}

View File

@ -212,14 +212,13 @@ func (m *Monitor) Statistics(tags map[string]string) ([]*Statistic, error) {
statistic := &Statistic{
Statistic: models.Statistic{
Tags: make(map[string]string),
Values: make(map[string]interface{}),
},
}
// Add any supplied tags.
for k, v := range tags {
statistic.Tags[k] = v
statistic.Tags.SetString(k, v)
}
// Every other top-level expvar value is a map.
@ -242,7 +241,7 @@ func (m *Monitor) Statistics(tags map[string]string) ([]*Statistic, error) {
if err != nil {
return
}
statistic.Tags[t.Key] = u
statistic.Tags.SetString(t.Key, u)
})
case "values":
// string-interface map.
@ -281,14 +280,13 @@ func (m *Monitor) Statistics(tags map[string]string) ([]*Statistic, error) {
statistic := &Statistic{
Statistic: models.Statistic{
Name: "runtime",
Tags: make(map[string]string),
Values: make(map[string]interface{}),
},
}
// Add any supplied tags to Go memstats
for k, v := range tags {
statistic.Tags[k] = v
statistic.Tags.SetString(k, v)
}
var rt runtime.MemStats

View File

@ -70,7 +70,7 @@ func NewService(c Config) *Service {
Logger: log.New(os.Stderr, "[collectd] ", log.LstdFlags),
err: make(chan error),
stats: &Statistics{},
statTags: map[string]string{"bind": c.BindAddress},
statTags: models.NewTags(map[string]string{"bind": c.BindAddress}),
}
return &s
@ -360,8 +360,9 @@ func (s *Service) UnmarshalCollectd(packet *gollectd.Packet) []models.Point {
if packet.TypeInstance != "" {
tags["type_instance"] = packet.TypeInstance
}
p, err := models.NewPoint(name, tags, fields, timestamp)
// Drop invalid points
p, err := models.NewPoint(name, models.NewTags(tags), fields, timestamp)
if err != nil {
s.Logger.Printf("Dropping point %v: %v", name, err)
atomic.AddInt64(&s.stats.InvalidDroppedPoints, 1)

View File

@ -147,7 +147,7 @@ type Statistics struct {
func (s *Service) Statistics(tags map[string]string) []models.Statistic {
return []models.Statistic{{
Name: "cq",
Tags: tags,
Tags: models.NewTags(tags),
Values: map[string]interface{}{
statQueryOK: atomic.LoadInt64(&s.stats.QueryOK),
statQueryFail: atomic.LoadInt64(&s.stats.QueryFail),

View File

@ -116,12 +116,12 @@ func (c *Config) WithDefaults() *Config {
// DefaultTags returns the config's tags.
func (c *Config) DefaultTags() models.Tags {
tags := models.Tags{}
m := make(map[string]string, len(c.Tags))
for _, t := range c.Tags {
parts := strings.Split(t, "=")
tags[parts[0]] = parts[1]
m[parts[0]] = parts[1]
}
return tags
return models.NewTags(m)
}
// Validate validates the config's templates and tags.

View File

@ -64,12 +64,12 @@ func NewParserWithOptions(options Options) (*Parser, error) {
}
// Parse out the default tags specific to this template
tags := models.Tags{}
var tags models.Tags
if strings.Contains(parts[len(parts)-1], "=") {
tagStrs := strings.Split(parts[len(parts)-1], ",")
for _, kv := range tagStrs {
parts := strings.Split(kv, "=")
tags[parts[0]] = parts[1]
tags.SetString(parts[0], parts[1])
}
}
@ -151,12 +151,12 @@ func (p *Parser) Parse(line string) (models.Point, error) {
}
// Set the default tags on the point if they are not already set
for k, v := range p.tags {
if _, ok := tags[k]; !ok {
tags[k] = v
for _, t := range p.tags {
if _, ok := tags[string(t.Key)]; !ok {
tags[string(t.Key)] = string(t.Value)
}
}
return models.NewPoint(measurement, tags, fieldValues, timestamp)
return models.NewPoint(measurement, models.NewTags(tags), fieldValues, timestamp)
}
// ApplyTemplate extracts the template fields from the given line and
@ -171,9 +171,9 @@ func (p *Parser) ApplyTemplate(line string) (string, map[string]string, string,
template := p.matcher.Match(fields[0])
name, tags, field, err := template.Apply(fields[0])
// Set the default tags on the point if they are not already set
for k, v := range p.tags {
if _, ok := tags[k]; !ok {
tags[k] = v
for _, t := range p.tags {
if _, ok := tags[string(t.Key)]; !ok {
tags[string(t.Key)] = string(t.Value)
}
}
return name, tags, field, err
@ -223,8 +223,8 @@ func (t *template) Apply(line string) (string, map[string]string, string, error)
)
// Set any default tags
for k, v := range t.defaultTags {
tags[k] = append(tags[k], v)
for _, t := range t.defaultTags {
tags[string(t.Key)] = append(tags[string(t.Key)], string(t.Value))
}
// See if an invalid combination has been specified in the template:

View File

@ -261,7 +261,7 @@ func TestFilterMatchDefault(t *testing.T) {
}
exp := models.MustNewPoint("miss.servers.localhost.cpu_load",
models.Tags{},
models.NewTags(map[string]string{}),
models.Fields{"value": float64(11)},
time.Unix(1435077219, 0))
@ -282,7 +282,7 @@ func TestFilterMatchMultipleMeasurement(t *testing.T) {
}
exp := models.MustNewPoint("cpu.cpu_load.10",
models.Tags{"host": "localhost"},
models.NewTags(map[string]string{"host": "localhost"}),
models.Fields{"value": float64(11)},
time.Unix(1435077219, 0))
@ -306,7 +306,7 @@ func TestFilterMatchMultipleMeasurementSeparator(t *testing.T) {
}
exp := models.MustNewPoint("cpu_cpu_load_10",
models.Tags{"host": "localhost"},
models.NewTags(map[string]string{"host": "localhost"}),
models.Fields{"value": float64(11)},
time.Unix(1435077219, 0))
@ -327,7 +327,7 @@ func TestFilterMatchSingle(t *testing.T) {
}
exp := models.MustNewPoint("cpu_load",
models.Tags{"host": "localhost"},
models.NewTags(map[string]string{"host": "localhost"}),
models.Fields{"value": float64(11)},
time.Unix(1435077219, 0))
@ -348,7 +348,7 @@ func TestParseNoMatch(t *testing.T) {
}
exp := models.MustNewPoint("servers.localhost.memory.VmallocChunk",
models.Tags{},
models.NewTags(map[string]string{}),
models.Fields{"value": float64(11)},
time.Unix(1435077219, 0))
@ -369,7 +369,7 @@ func TestFilterMatchWildcard(t *testing.T) {
}
exp := models.MustNewPoint("cpu_load",
models.Tags{"host": "localhost"},
models.NewTags(map[string]string{"host": "localhost"}),
models.Fields{"value": float64(11)},
time.Unix(1435077219, 0))
@ -392,7 +392,7 @@ func TestFilterMatchExactBeforeWildcard(t *testing.T) {
}
exp := models.MustNewPoint("cpu_load",
models.Tags{"host": "localhost"},
models.NewTags(map[string]string{"host": "localhost"}),
models.Fields{"value": float64(11)},
time.Unix(1435077219, 0))
@ -420,7 +420,7 @@ func TestFilterMatchMostLongestFilter(t *testing.T) {
}
exp := models.MustNewPoint("cpu_load",
models.Tags{"host": "localhost", "resource": "cpu"},
models.NewTags(map[string]string{"host": "localhost", "resource": "cpu"}),
models.Fields{"value": float64(11)},
time.Unix(1435077219, 0))
@ -447,7 +447,7 @@ func TestFilterMatchMultipleWildcards(t *testing.T) {
}
exp := models.MustNewPoint("cpu_load",
models.Tags{"host": "server01"},
models.NewTags(map[string]string{"host": "server01"}),
models.Fields{"value": float64(11)},
time.Unix(1435077219, 0))
@ -462,17 +462,17 @@ func TestFilterMatchMultipleWildcards(t *testing.T) {
}
func TestParseDefaultTags(t *testing.T) {
p, err := graphite.NewParser([]string{"servers.localhost .host.measurement*"}, models.Tags{
p, err := graphite.NewParser([]string{"servers.localhost .host.measurement*"}, models.NewTags(map[string]string{
"region": "us-east",
"zone": "1c",
"host": "should not set",
})
}))
if err != nil {
t.Fatalf("unexpected error creating parser, got %v", err)
}
exp := models.MustNewPoint("cpu_load",
models.Tags{"host": "localhost", "region": "us-east", "zone": "1c"},
models.NewTags(map[string]string{"host": "localhost", "region": "us-east", "zone": "1c"}),
models.Fields{"value": float64(11)},
time.Unix(1435077219, 0))
@ -487,16 +487,16 @@ func TestParseDefaultTags(t *testing.T) {
}
func TestParseDefaultTemplateTags(t *testing.T) {
p, err := graphite.NewParser([]string{"servers.localhost .host.measurement* zone=1c"}, models.Tags{
p, err := graphite.NewParser([]string{"servers.localhost .host.measurement* zone=1c"}, models.NewTags(map[string]string{
"region": "us-east",
"host": "should not set",
})
}))
if err != nil {
t.Fatalf("unexpected error creating parser, got %v", err)
}
exp := models.MustNewPoint("cpu_load",
models.Tags{"host": "localhost", "region": "us-east", "zone": "1c"},
models.NewTags(map[string]string{"host": "localhost", "region": "us-east", "zone": "1c"}),
models.Fields{"value": float64(11)},
time.Unix(1435077219, 0))
@ -511,16 +511,16 @@ func TestParseDefaultTemplateTags(t *testing.T) {
}
func TestParseDefaultTemplateTagsOverridGlobal(t *testing.T) {
p, err := graphite.NewParser([]string{"servers.localhost .host.measurement* zone=1c,region=us-east"}, models.Tags{
p, err := graphite.NewParser([]string{"servers.localhost .host.measurement* zone=1c,region=us-east"}, models.NewTags(map[string]string{
"region": "shot not be set",
"host": "should not set",
})
}))
if err != nil {
t.Fatalf("unexpected error creating parser, got %v", err)
}
exp := models.MustNewPoint("cpu_load",
models.Tags{"host": "localhost", "region": "us-east", "zone": "1c"},
models.NewTags(map[string]string{"host": "localhost", "region": "us-east", "zone": "1c"}),
models.Fields{"value": float64(11)},
time.Unix(1435077219, 0))
@ -535,16 +535,16 @@ func TestParseDefaultTemplateTagsOverridGlobal(t *testing.T) {
}
func TestParseTemplateWhitespace(t *testing.T) {
p, err := graphite.NewParser([]string{"servers.localhost .host.measurement* zone=1c"}, models.Tags{
p, err := graphite.NewParser([]string{"servers.localhost .host.measurement* zone=1c"}, models.NewTags(map[string]string{
"region": "us-east",
"host": "should not set",
})
}))
if err != nil {
t.Fatalf("unexpected error creating parser, got %v", err)
}
exp := models.MustNewPoint("cpu_load",
models.Tags{"host": "localhost", "region": "us-east", "zone": "1c"},
models.NewTags(map[string]string{"host": "localhost", "region": "us-east", "zone": "1c"}),
models.Fields{"value": float64(11)},
time.Unix(1435077219, 0))

View File

@ -106,7 +106,7 @@ func NewService(c Config) (*Service, error) {
batchTimeout: time.Duration(d.BatchTimeout),
logger: log.New(os.Stderr, fmt.Sprintf("[graphite] %s ", d.BindAddress), log.LstdFlags),
stats: &Statistics{},
statTags: map[string]string{"proto": d.Protocol, "bind": d.BindAddress},
statTags: models.NewTags(map[string]string{"proto": d.Protocol, "bind": d.BindAddress}),
tcpConnections: make(map[string]*tcpConnection),
done: make(chan struct{}),
diagsKey: strings.Join([]string{"graphite", d.Protocol, d.BindAddress}, ":"),

View File

@ -39,7 +39,7 @@ func Test_ServerGraphiteTCP(t *testing.T) {
pt, _ := models.NewPoint(
"cpu",
map[string]string{},
models.NewTags(map[string]string{}),
map[string]interface{}{"value": 23.456},
time.Unix(now.Unix(), 0))
@ -115,7 +115,7 @@ func Test_ServerGraphiteUDP(t *testing.T) {
pt, _ := models.NewPoint(
"cpu",
map[string]string{},
models.NewTags(map[string]string{}),
map[string]interface{}{"value": 23.456},
time.Unix(now.Unix(), 0))
if database != "graphitedb" {

View File

@ -179,7 +179,7 @@ type Statistics struct {
func (h *Handler) Statistics(tags map[string]string) []models.Statistic {
return []models.Statistic{{
Name: "httpd",
Tags: tags,
Tags: models.NewTags(tags),
Values: map[string]interface{}{
statRequest: atomic.LoadInt64(&h.stats.Requests),
statCQRequest: atomic.LoadInt64(&h.stats.CQRequests),
@ -737,24 +737,24 @@ func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) {
for _, s := range stats {
// Very hackily create a unique key.
buf := bytes.NewBufferString(s.Name)
if path, ok := s.Tags["path"]; ok {
if path := s.Tags.Get([]byte("path")); path != nil {
fmt.Fprintf(buf, ":%s", path)
if id, ok := s.Tags["id"]; ok {
if id := s.Tags.Get([]byte("id")); id != nil {
fmt.Fprintf(buf, ":%s", id)
}
} else if bind, ok := s.Tags["bind"]; ok {
if proto, ok := s.Tags["proto"]; ok {
} else if bind := s.Tags.Get([]byte("bind")); bind != nil {
if proto := s.Tags.Get([]byte("proto")); proto != nil {
fmt.Fprintf(buf, ":%s", proto)
}
fmt.Fprintf(buf, ":%s", bind)
} else if database, ok := s.Tags["database"]; ok {
} else if database := s.Tags.Get([]byte("database")); database != nil {
fmt.Fprintf(buf, ":%s", database)
if rp, ok := s.Tags["retention_policy"]; ok {
if rp := s.Tags.Get([]byte("retention_policy")); rp != nil {
fmt.Fprintf(buf, ":%s", rp)
if name, ok := s.Tags["name"]; ok {
if name := s.Tags.Get([]byte("name")); name != nil {
fmt.Fprintf(buf, ":%s", name)
}
if dest, ok := s.Tags["destination"]; ok {
if dest := s.Tags.Get([]byte("destination")); dest != nil {
fmt.Fprintf(buf, ":%s", dest)
}
}

View File

@ -114,7 +114,7 @@ func (w *csvResponseWriter) WriteResponse(resp Response) (n int, err error) {
for _, row := range result.Series {
w.columns[0] = row.Name
if len(row.Tags) > 0 {
w.columns[1] = string(models.Tags(row.Tags).HashKey()[1:])
w.columns[1] = string(models.NewTags(row.Tags).HashKey()[1:])
} else {
w.columns[1] = ""
}

View File

@ -192,7 +192,7 @@ func (s *Service) Addr() net.Addr {
// Statistics returns statistics for periodic monitoring.
func (s *Service) Statistics(tags map[string]string) []models.Statistic {
return s.Handler.Statistics(models.Tags{"bind": s.addr}.Merge(tags))
return s.Handler.Statistics(models.NewTags(map[string]string{"bind": s.addr}).Merge(tags).Map())
}
// serveTCP serves the handler from the TCP listener.

View File

@ -111,7 +111,7 @@ func (h *Handler) servePut(w http.ResponseWriter, r *http.Request) {
ts = time.Unix(p.Time/1000, (p.Time%1000)*1000)
}
pt, err := models.NewPoint(p.Metric, p.Tags, map[string]interface{}{"value": p.Value}, ts)
pt, err := models.NewPoint(p.Metric, models.NewTags(p.Tags), map[string]interface{}{"value": p.Value}, ts)
if err != nil {
h.Logger.Printf("Dropping point %v: %v", p.Metric, err)
if h.stats != nil {

View File

@ -96,7 +96,7 @@ func NewService(c Config) (*Service, error) {
Logger: log.New(os.Stderr, "[opentsdb] ", log.LstdFlags),
LogPointErrors: d.LogPointErrors,
stats: &Statistics{},
statTags: map[string]string{"bind": d.BindAddress},
statTags: models.NewTags(map[string]string{"bind": d.BindAddress}),
}
return s, nil
}
@ -381,7 +381,7 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
}
fields["value"] = fv
pt, err := models.NewPoint(measurement, tags, fields, t)
pt, err := models.NewPoint(measurement, models.NewTags(tags), fields, t)
if err != nil {
atomic.AddInt64(&s.stats.TelnetBadFloat, 1)
if s.LogPointErrors {

View File

@ -39,7 +39,7 @@ func TestService_Telnet(t *testing.T) {
} else if !reflect.DeepEqual(points, []models.Point{
models.MustNewPoint(
"sys.cpu.user",
map[string]string{"host": "webserver01", "cpu": "0"},
models.NewTags(map[string]string{"host": "webserver01", "cpu": "0"}),
map[string]interface{}{"value": 42.5},
time.Unix(1356998400, 0),
),
@ -101,7 +101,7 @@ func TestService_HTTP(t *testing.T) {
} else if !reflect.DeepEqual(points, []models.Point{
models.MustNewPoint(
"sys.cpu.nice",
map[string]string{"dc": "lga", "host": "web01"},
models.NewTags(map[string]string{"dc": "lga", "host": "web01"}),
map[string]interface{}{"value": 18.0},
time.Unix(1346846400, 0),
),

View File

@ -130,7 +130,7 @@ type Statistics struct {
func (s *Service) Statistics(tags map[string]string) []models.Statistic {
statistics := []models.Statistic{{
Name: "subscriber",
Tags: tags,
Tags: models.NewTags(tags),
Values: map[string]interface{}{
statPointsWritten: atomic.LoadInt64(&s.stats.PointsWritten),
statWriteFailures: atomic.LoadInt64(&s.stats.WriteFailures),
@ -415,13 +415,13 @@ func (b *balancewriter) WritePoints(p *coordinator.WritePointsRequest) error {
// Statistics returns statistics for periodic monitoring.
func (b *balancewriter) Statistics(tags map[string]string) []models.Statistic {
tags = models.Tags(tags).Merge(b.tags)
tags = models.NewTags(tags).Merge(b.tags).Map()
statistics := make([]models.Statistic, len(b.stats))
for i := range b.stats {
statistics[i] = models.Statistic{
Name: "subscriber",
Tags: models.Tags(tags).Merge(map[string]string{"destination": b.stats[i].dest}),
Tags: models.NewTags(tags).Merge(map[string]string{"destination": b.stats[i].dest}),
Values: map[string]interface{}{
statPointsWritten: atomic.LoadInt64(&b.stats[i].pointsWritten),
statWriteFailures: atomic.LoadInt64(&b.stats[i].failures),

View File

@ -71,7 +71,7 @@ func NewService(c Config) *Service {
batcher: tsdb.NewPointBatcher(d.BatchSize, d.BatchPending, time.Duration(d.BatchTimeout)),
Logger: log.New(os.Stderr, "[udp] ", log.LstdFlags),
stats: &Statistics{},
statTags: map[string]string{"bind": d.BindAddress},
statTags: models.NewTags(map[string]string{"bind": d.BindAddress}),
}
}

View File

@ -12,8 +12,8 @@ func TestCommunePoint(t *testing.T) {
if point.Name() != "write" {
t.Errorf("expected: write\ngot: %v", point.Name())
}
if point.Tags()["tag"] != "tagVal" {
t.Errorf("expected: tagVal\ngot: %v", point.Tags()["tag"])
if point.Tags().GetString("tag") != "tagVal" {
t.Errorf("expected: tagVal\ngot: %v", point.Tags().GetString("tag"))
}
if int(point.Fields()["fooField"].(float64)) != 5 {
t.Errorf("expected: 5\ngot: %v\n", point.Fields()["fooField"])
@ -24,8 +24,8 @@ func TestCommunePoint(t *testing.T) {
if point.Name() != "write" {
t.Errorf("expected: write\ngot: %v", point.Name())
}
if point.Tags()["tag"] != "tagVal" {
t.Errorf("expected: tagVal\ngot: %v", point.Tags()["tag"])
if point.Tags().GetString("tag") != "tagVal" {
t.Errorf("expected: tagVal\ngot: %v", point.Tags().GetString("tag"))
}
if int(point.Fields()["fooField"].(float64)) != 5 {
t.Errorf("expected: 5\ngot: %v\n", point.Fields()["fooField"])
@ -40,8 +40,8 @@ func TestSetCommune(t *testing.T) {
if pt.Name() != "write" {
t.Errorf("expected: write\ngot: %v", pt.Name())
}
if pt.Tags()["tag"] != "tagVal" {
t.Errorf("expected: tagVal\ngot: %v", pt.Tags()["tag"])
if pt.Tags().GetString("tag") != "tagVal" {
t.Errorf("expected: tagVal\ngot: %v", pt.Tags().GetString("tag"))
}
if int(pt.Fields()["fooField"].(float64)) != 5 {
t.Errorf("expected: 5\ngot: %v\n", pt.Fields()["fooField"])

View File

@ -171,7 +171,7 @@ type CacheStatistics struct {
func (c *Cache) Statistics(tags map[string]string) []models.Statistic {
return []models.Statistic{{
Name: "tsm1_cache",
Tags: tags,
Tags: models.NewTags(tags),
Values: map[string]interface{}{
statCacheMemoryBytes: atomic.LoadInt64(&c.stats.MemSizeBytes),
statCacheDiskBytes: atomic.LoadInt64(&c.stats.DiskSizeBytes),

View File

@ -2,6 +2,7 @@ package tsm1 // import "github.com/influxdata/influxdb/tsdb/engine/tsm1"
import (
"archive/tar"
"bytes"
"fmt"
"io"
"io/ioutil"
@ -248,7 +249,7 @@ func (e *Engine) Statistics(tags map[string]string) []models.Statistic {
statistics := make([]models.Statistic, 0, 4)
statistics = append(statistics, models.Statistic{
Name: "tsm1_engine",
Tags: tags,
Tags: models.NewTags(tags),
Values: map[string]interface{}{
statCacheCompactions: atomic.LoadInt64(&e.stats.CacheCompactions),
statCacheCompactionDuration: atomic.LoadInt64(&e.stats.CacheCompactionDuration),
@ -338,8 +339,9 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index *tsdb.DatabaseIndex) er
// Save reference to index for iterator creation.
e.index = index
e.FileStore.dereferencer = index
if err := e.FileStore.WalkKeys(func(key string, typ byte) error {
if err := e.FileStore.WalkKeys(func(key []byte, typ byte) error {
fieldType, err := tsmFieldTypeToInfluxQLDataType(typ)
if err != nil {
return err
@ -365,7 +367,7 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index *tsdb.DatabaseIndex) er
continue
}
if err := e.addToIndexFromKey(shardID, key, fieldType, index); err != nil {
if err := e.addToIndexFromKey(shardID, []byte(key), fieldType, index); err != nil {
return err
}
}
@ -522,9 +524,9 @@ func (e *Engine) readFileFromBackup(tr *tar.Reader, shardRelativePath string) er
// addToIndexFromKey will pull the measurement name, series key, and field name from a composite key and add it to the
// database index and measurement fields
func (e *Engine) addToIndexFromKey(shardID uint64, key string, fieldType influxql.DataType, index *tsdb.DatabaseIndex) error {
func (e *Engine) addToIndexFromKey(shardID uint64, key []byte, fieldType influxql.DataType, index *tsdb.DatabaseIndex) error {
seriesKey, field := SeriesAndFieldFromCompositeKey(key)
measurement := tsdb.MeasurementFromSeriesKey(seriesKey)
measurement := tsdb.MeasurementFromSeriesKey(string(seriesKey))
m := index.CreateMeasurementIndexIfNotExists(measurement)
m.SetFieldName(field)
@ -540,7 +542,7 @@ func (e *Engine) addToIndexFromKey(shardID uint64, key string, fieldType influxq
}
// Have we already indexed this series?
ss := index.Series(seriesKey)
ss := index.SeriesBytes(seriesKey)
if ss != nil {
// Add this shard to the existing series
ss.AssignShard(shardID)
@ -551,7 +553,7 @@ func (e *Engine) addToIndexFromKey(shardID uint64, key string, fieldType influxq
// fields (in line protocol format) in the series key
_, tags, _ := models.ParseKey(seriesKey)
s := tsdb.NewSeries(seriesKey, tags)
s := tsdb.NewSeries(string(seriesKey), tags)
index.CreateSeriesIndexIfNotExists(measurement, s)
s.AssignShard(shardID)
@ -593,14 +595,14 @@ func (e *Engine) ContainsSeries(keys []string) (map[string]bool, error) {
}
for _, k := range e.Cache.Keys() {
seriesKey, _ := SeriesAndFieldFromCompositeKey(k)
keyMap[seriesKey] = true
seriesKey, _ := SeriesAndFieldFromCompositeKey([]byte(k))
keyMap[string(seriesKey)] = true
}
if err := e.FileStore.WalkKeys(func(k string, _ byte) error {
if err := e.FileStore.WalkKeys(func(k []byte, _ byte) error {
seriesKey, _ := SeriesAndFieldFromCompositeKey(k)
if _, ok := keyMap[seriesKey]; ok {
keyMap[seriesKey] = true
if _, ok := keyMap[string(seriesKey)]; ok {
keyMap[string(seriesKey)] = true
}
return nil
}); err != nil {
@ -638,14 +640,14 @@ func (e *Engine) DeleteSeriesRange(seriesKeys []string, min, max int64) error {
deleteKeys := make([]string, 0, len(seriesKeys))
// go through the keys in the file store
if err := e.FileStore.WalkKeys(func(k string, _ byte) error {
if err := e.FileStore.WalkKeys(func(k []byte, _ byte) error {
seriesKey, _ := SeriesAndFieldFromCompositeKey(k)
// Keep track if we've added this key since WalkKeys can return keys
// we've seen before
if v, ok := keyMap[seriesKey]; ok && v == 0 {
deleteKeys = append(deleteKeys, k)
keyMap[seriesKey] += 1
if v, ok := keyMap[string(seriesKey)]; ok && v == 0 {
deleteKeys = append(deleteKeys, string(k))
keyMap[string(seriesKey)] += 1
}
return nil
}); err != nil {
@ -665,8 +667,8 @@ func (e *Engine) DeleteSeriesRange(seriesKeys []string, min, max int64) error {
e.Cache.RLock()
s := e.Cache.Store()
for k, _ := range s {
seriesKey, _ := SeriesAndFieldFromCompositeKey(k)
if _, ok := keyMap[seriesKey]; ok {
seriesKey, _ := SeriesAndFieldFromCompositeKey([]byte(k))
if _, ok := keyMap[string(seriesKey)]; ok {
walKeys = append(walKeys, k)
}
}
@ -1277,7 +1279,7 @@ func (e *Engine) createTagSetGroupIterators(ref *influxql.VarRef, mm *tsdb.Measu
// createVarRefSeriesIterator creates an iterator for a variable reference for a series.
func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, mm *tsdb.Measurement, seriesKey string, t *influxql.TagSet, filter influxql.Expr, conditionFields []influxql.VarRef, opt influxql.IteratorOptions) (influxql.Iterator, error) {
tags := influxql.NewTags(e.index.TagsForSeries(seriesKey))
tags := influxql.NewTags(e.index.TagsForSeries(seriesKey).Map())
// Create options specific for this series.
itrOpt := opt
@ -1496,11 +1498,11 @@ func tsmFieldTypeToInfluxQLDataType(typ byte) (influxql.DataType, error) {
}
}
func SeriesAndFieldFromCompositeKey(key string) (string, string) {
sep := strings.Index(key, keyFieldSeparator)
func SeriesAndFieldFromCompositeKey(key []byte) ([]byte, string) {
sep := bytes.Index(key, []byte(keyFieldSeparator))
if sep == -1 {
// No field???
return key, ""
}
return key[:sep], key[sep+len(keyFieldSeparator):]
return key[:sep], string(key[sep+len(keyFieldSeparator):])
}

View File

@ -44,7 +44,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) {
// Verify index is correct.
if m := index.Measurement("cpu"); m == nil {
t.Fatal("measurement not found")
} else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, map[string]string{"host": "A"}) {
} else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, models.NewTags(map[string]string{"host": "A"})) {
t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags)
}
@ -67,7 +67,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) {
// Verify index is correct.
if m := index.Measurement("cpu"); m == nil {
t.Fatal("measurement not found")
} else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, map[string]string{"host": "A"}) {
} else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, models.NewTags(map[string]string{"host": "A"})) {
t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags)
}
@ -92,9 +92,9 @@ func TestEngine_LoadMetadataIndex(t *testing.T) {
// Verify index is correct.
if m := index.Measurement("cpu"); m == nil {
t.Fatal("measurement not found")
} else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, map[string]string{"host": "A"}) {
} else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, models.NewTags(map[string]string{"host": "A"})) {
t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags)
} else if s := m.SeriesByID(2); s.Key != "cpu,host=B" || !reflect.DeepEqual(s.Tags, map[string]string{"host": "B"}) {
} else if s := m.SeriesByID(2); s.Key != "cpu,host=B" || !reflect.DeepEqual(s.Tags, models.NewTags(map[string]string{"host": "B"})) {
t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags)
}
}
@ -222,7 +222,7 @@ func TestEngine_CreateIterator_Cache_Ascending(t *testing.T) {
e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", map[string]string{"host": "A"}))
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A value=1.2 2000000000`,
@ -275,7 +275,7 @@ func TestEngine_CreateIterator_Cache_Descending(t *testing.T) {
e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", map[string]string{"host": "A"}))
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A value=1.2 2000000000`,
@ -328,7 +328,7 @@ func TestEngine_CreateIterator_TSM_Ascending(t *testing.T) {
e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", map[string]string{"host": "A"}))
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A value=1.2 2000000000`,
@ -382,7 +382,7 @@ func TestEngine_CreateIterator_TSM_Descending(t *testing.T) {
e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", map[string]string{"host": "A"}))
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A value=1.2 2000000000`,
@ -437,7 +437,7 @@ func TestEngine_CreateIterator_Aux(t *testing.T) {
e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.MeasurementFields("cpu").CreateFieldIfNotExists("F", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", map[string]string{"host": "A"}))
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A F=100 1000000000`,
@ -497,7 +497,7 @@ func TestEngine_CreateIterator_Condition(t *testing.T) {
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.MeasurementFields("cpu").CreateFieldIfNotExists("X", influxql.Float, false)
e.MeasurementFields("cpu").CreateFieldIfNotExists("Y", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", map[string]string{"host": "A"}))
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A X=10 1000000000`,
@ -663,7 +663,7 @@ func MustInitBenchmarkEngine(pointN int) *Engine {
// Initialize metadata.
e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", map[string]string{"host": "A"}))
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
// Generate time ascending points with jitterred time & value.
rand := rand.New(rand.NewSource(0))

View File

@ -58,7 +58,7 @@ type TSMFile interface {
KeyCount() int
// KeyAt returns the key located at index position idx
KeyAt(idx int) (string, byte)
KeyAt(idx int) ([]byte, byte)
// Type returns the block type of the values stored for the key. Returns one of
// BlockFloat64, BlockInt64, BlockBoolean, BlockString. If key does not exist,
@ -106,6 +106,13 @@ type TSMFile interface {
// BlockIterator returns an iterator pointing to the first block in the file and
// allows sequential iteration to each every block.
BlockIterator() *BlockIterator
// Removes mmap references held by another object.
deref(dereferencer)
}
type dereferencer interface {
Dereference([]byte)
}
// Statistics gathered by the FileStore.
@ -132,6 +139,8 @@ type FileStore struct {
purger *purger
currentTempDirID int
dereferencer dereferencer
}
type FileStat struct {
@ -157,7 +166,7 @@ func (f FileStat) ContainsKey(key string) bool {
func NewFileStore(dir string) *FileStore {
logger := log.New(os.Stderr, "[filestore] ", log.LstdFlags)
return &FileStore{
fs := &FileStore{
dir: dir,
lastModified: time.Now(),
logger: logger,
@ -169,6 +178,8 @@ func NewFileStore(dir string) *FileStore {
logger: logger,
},
}
fs.purger.fileStore = fs
return fs
}
// enableTraceLogging must be called before the FileStore is opened.
@ -204,7 +215,7 @@ type FileStoreStatistics struct {
func (f *FileStore) Statistics(tags map[string]string) []models.Statistic {
return []models.Statistic{{
Name: "tsm1_filestore",
Tags: tags,
Tags: models.NewTags(tags),
Values: map[string]interface{}{
statFileStoreBytes: atomic.LoadInt64(&f.stats.DiskBytes),
statFileStoreCount: atomic.LoadInt64(&f.stats.FileCount),
@ -281,7 +292,7 @@ func (f *FileStore) Remove(paths ...string) {
// WalkKeys calls fn for every key in every TSM file known to the FileStore. If the key
// exists in multiple files, it will be invoked for each file.
func (f *FileStore) WalkKeys(fn func(key string, typ byte) error) error {
func (f *FileStore) WalkKeys(fn func(key []byte, typ byte) error) error {
f.mu.RLock()
defer f.mu.RUnlock()
@ -305,7 +316,7 @@ func (f *FileStore) Keys() map[string]byte {
for _, f := range f.files {
for i := 0; i < f.KeyCount(); i++ {
key, typ := f.KeyAt(i)
uniqueKeys[key] = typ
uniqueKeys[string(key)] = typ
}
}
@ -556,6 +567,11 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error {
continue
}
// Remove any mmap references held by the index.
if f.dereferencer != nil {
file.deref(f.dereferencer)
}
if err := file.Close(); err != nil {
return err
}
@ -1088,9 +1104,10 @@ func (c *KeyCursor) filterBooleanValues(tombstones []TimeRange, values BooleanVa
}
type purger struct {
mu sync.RWMutex
files map[string]TSMFile
running bool
mu sync.RWMutex
fileStore *FileStore
files map[string]TSMFile
running bool
logger *log.Logger
}
@ -1118,6 +1135,11 @@ func (p *purger) purge() {
p.mu.Lock()
for k, v := range p.files {
if !v.InUse() {
// Remove any mmap references held by the index.
if p.fileStore.dereferencer != nil {
v.deref(p.fileStore.dereferencer)
}
if err := v.Close(); err != nil {
p.logger.Printf("purge: close file: %v", err)
continue

View File

@ -68,7 +68,7 @@ type TSMIndex interface {
Key(index int) (string, []IndexEntry)
// KeyAt returns the key in the index at the given postion.
KeyAt(index int) (string, byte)
KeyAt(index int) ([]byte, byte)
// KeyCount returns the count of unique keys in the index.
KeyCount() int
@ -116,7 +116,7 @@ func (b *BlockIterator) PeekNext() string {
return b.key
} else if b.n-b.i > 1 {
key, _ := b.r.KeyAt(b.i + 1)
return key
return string(key)
}
return ""
}
@ -243,7 +243,7 @@ func (t *TSMReader) Key(index int) (string, []IndexEntry) {
}
// KeyAt returns the key and key type at position idx in the index.
func (t *TSMReader) KeyAt(idx int) (string, byte) {
func (t *TSMReader) KeyAt(idx int) ([]byte, byte) {
return t.index.KeyAt(idx)
}
@ -489,6 +489,13 @@ func (t *TSMReader) BlockIterator() *BlockIterator {
}
}
// deref removes mmap references held by another object.
func (t *TSMReader) deref(d dereferencer) {
if acc, ok := t.accessor.(*mmapAccessor); ok {
d.Dereference(acc.b)
}
}
// indirectIndex is a TSMIndex that uses a raw byte slice representation of an index. This
// implementation can be used for indexes that may be MMAPed into memory.
type indirectIndex struct {
@ -667,15 +674,15 @@ func (d *indirectIndex) Key(idx int) (string, []IndexEntry) {
return string(key), entries.entries
}
func (d *indirectIndex) KeyAt(idx int) (string, byte) {
func (d *indirectIndex) KeyAt(idx int) ([]byte, byte) {
d.mu.RLock()
defer d.mu.RUnlock()
if idx < 0 || idx >= len(d.offsets) {
return "", 0
return nil, 0
}
n, key, _ := readKey(d.b[d.offsets[idx]:])
return string(key), d.b[d.offsets[idx]+int32(n)]
return key, d.b[d.offsets[idx]+int32(n)]
}
func (d *indirectIndex) KeyCount() int {

View File

@ -144,7 +144,7 @@ type WALStatistics struct {
func (l *WAL) Statistics(tags map[string]string) []models.Statistic {
return []models.Statistic{{
Name: "tsm1_wal",
Tags: tags,
Tags: models.NewTags(tags),
Values: map[string]interface{}{
statWALOldBytes: atomic.LoadInt64(&l.stats.OldBytes),
statWALCurrentBytes: atomic.LoadInt64(&l.stats.CurrentBytes),

View File

@ -6,6 +6,7 @@ import (
"sort"
"sync"
"sync/atomic"
"unsafe"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
@ -56,7 +57,7 @@ type IndexStatistics struct {
func (d *DatabaseIndex) Statistics(tags map[string]string) []models.Statistic {
return []models.Statistic{{
Name: "database",
Tags: models.Tags(map[string]string{"database": d.name}).Merge(tags),
Tags: models.NewTags(map[string]string{"database": d.name}).Merge(tags),
Values: map[string]interface{}{
statDatabaseSeries: atomic.LoadInt64(&d.stats.NumSeries),
statDatabaseMeasurements: atomic.LoadInt64(&d.stats.NumMeasurements),
@ -72,6 +73,14 @@ func (d *DatabaseIndex) Series(key string) *Series {
return s
}
// SeriesBytes returns a series by key.
func (d *DatabaseIndex) SeriesBytes(key []byte) *Series {
d.mu.RLock()
s := d.series[string(key)]
d.mu.RUnlock()
return s
}
func (d *DatabaseIndex) SeriesKeys() []string {
d.mu.RLock()
s := make([]string, 0, len(d.series))
@ -249,7 +258,7 @@ func (d *DatabaseIndex) RemoveShard(shardID uint64) {
}
// TagsForSeries returns the tag map for the passed in series
func (d *DatabaseIndex) TagsForSeries(key string) map[string]string {
func (d *DatabaseIndex) TagsForSeries(key string) models.Tags {
d.mu.RLock()
defer d.mu.RUnlock()
@ -515,6 +524,16 @@ func (d *DatabaseIndex) DropSeries(keys []string) {
atomic.AddInt64(&d.stats.NumSeries, -nDeleted)
}
// Dereference removes all references to data within b and moves them to the heap.
func (d *DatabaseIndex) Dereference(b []byte) {
d.mu.RLock()
defer d.mu.RUnlock()
for _, s := range d.series {
s.Dereference(b)
}
}
// Measurement represents a collection of time series in a database. It also contains in memory
// structures for indexing tags. Exported functions are goroutine safe while un-exported functions
// assume the caller will use the appropriate locks
@ -647,13 +666,13 @@ func (m *Measurement) AddSeries(s *Series) bool {
}
// add this series id to the tag index on the measurement
for k, v := range s.Tags {
valueMap := m.seriesByTagKeyValue[k]
for _, t := range s.Tags {
valueMap := m.seriesByTagKeyValue[string(t.Key)]
if valueMap == nil {
valueMap = make(map[string]SeriesIDs)
m.seriesByTagKeyValue[k] = valueMap
m.seriesByTagKeyValue[string(t.Key)] = valueMap
}
ids := valueMap[v]
ids := valueMap[string(t.Value)]
ids = append(ids, s.id)
// most of the time the series ID will be higher than all others because it's a new
@ -661,7 +680,7 @@ func (m *Measurement) AddSeries(s *Series) bool {
if len(ids) > 1 && ids[len(ids)-1] < ids[len(ids)-2] {
sort.Sort(ids)
}
valueMap[v] = ids
valueMap[string(t.Value)] = ids
}
return true
@ -683,19 +702,19 @@ func (m *Measurement) DropSeries(series *Series) {
// remove this series id from the tag index on the measurement
// s.seriesByTagKeyValue is defined as map[string]map[string]SeriesIDs
for k, v := range series.Tags {
values := m.seriesByTagKeyValue[k][v]
for _, t := range series.Tags {
values := m.seriesByTagKeyValue[string(t.Key)][string(t.Value)]
ids := filter(values, seriesID)
// Check to see if we have any ids, if not, remove the key
if len(ids) == 0 {
delete(m.seriesByTagKeyValue[k], v)
delete(m.seriesByTagKeyValue[string(t.Key)], string(t.Value))
} else {
m.seriesByTagKeyValue[k][v] = ids
m.seriesByTagKeyValue[string(t.Key)][string(t.Value)] = ids
}
// If we have no values, then we delete the key
if len(m.seriesByTagKeyValue[k]) == 0 {
delete(m.seriesByTagKeyValue, k)
if len(m.seriesByTagKeyValue[string(t.Key)]) == 0 {
delete(m.seriesByTagKeyValue, string(t.Key))
}
}
@ -758,7 +777,7 @@ func (m *Measurement) TagSets(dimensions []string, condition influxql.Expr) ([]*
// Build the TagSet for this series.
for _, dim := range dimensions {
tags[dim] = s.Tags[dim]
tags[dim] = s.Tags.GetString(dim)
}
// Convert the TagSet to a string, so it can be added to a map allowing TagSets to be handled
@ -1423,40 +1442,52 @@ func (a Measurements) union(other Measurements) Measurements {
type Series struct {
mu sync.RWMutex
Key string
Tags map[string]string
Tags models.Tags
id uint64
measurement *Measurement
shardIDs map[uint64]bool // shards that have this series defined
shardIDs []uint64 // shards that have this series defined
}
// NewSeries returns an initialized series struct
func NewSeries(key string, tags map[string]string) *Series {
func NewSeries(key string, tags models.Tags) *Series {
return &Series{
Key: key,
Tags: tags,
shardIDs: make(map[uint64]bool),
Key: key,
Tags: tags,
}
}
func (s *Series) AssignShard(shardID uint64) {
s.mu.Lock()
s.shardIDs[shardID] = true
if !s.assigned(shardID) {
s.shardIDs = append(s.shardIDs, shardID)
sort.Sort(uint64Slice(s.shardIDs))
}
s.mu.Unlock()
}
func (s *Series) UnassignShard(shardID uint64) {
s.mu.Lock()
delete(s.shardIDs, shardID)
for i, v := range s.shardIDs {
if v == shardID {
s.shardIDs = append(s.shardIDs[:i], s.shardIDs[i+1:]...)
break
}
}
s.mu.Unlock()
}
func (s *Series) Assigned(shardID uint64) bool {
s.mu.RLock()
b := s.shardIDs[shardID]
b := s.assigned(shardID)
s.mu.RUnlock()
return b
}
func (s *Series) assigned(shardID uint64) bool {
i := sort.Search(len(s.shardIDs), func(i int) bool { return s.shardIDs[i] >= shardID })
return i < len(s.shardIDs) && s.shardIDs[i] == shardID
}
func (s *Series) ShardN() int {
s.mu.RLock()
n := len(s.shardIDs)
@ -1464,6 +1495,36 @@ func (s *Series) ShardN() int {
return n
}
// Dereference removes references to a byte slice.
func (s *Series) Dereference(b []byte) {
s.mu.Lock()
min := uintptr(unsafe.Pointer(&b[0]))
max := min + uintptr(len(b))
for _, t := range s.Tags {
deref(&t.Key, min, max)
deref(&t.Value, min, max)
}
s.mu.Unlock()
}
func deref(v *[]byte, min, max uintptr) {
vv := *v
// Ignore if value is not within range.
ptr := uintptr(unsafe.Pointer(&vv[0]))
if ptr < min || ptr > max {
return
}
// Otherwise copy to the heap.
buf := make([]byte, len(vv))
copy(buf, vv)
*v = buf
}
// MarshalBinary encodes the object to a binary format.
func (s *Series) MarshalBinary() ([]byte, error) {
s.mu.RLock()
@ -1471,10 +1532,8 @@ func (s *Series) MarshalBinary() ([]byte, error) {
var pb internal.Series
pb.Key = &s.Key
for k, v := range s.Tags {
key := k
value := v
pb.Tags = append(pb.Tags, &internal.Tag{Key: &key, Value: &value})
for _, t := range s.Tags {
pb.Tags = append(pb.Tags, &internal.Tag{Key: proto.String(string(t.Key)), Value: proto.String(string(t.Value))})
}
return proto.Marshal(&pb)
}
@ -1489,9 +1548,9 @@ func (s *Series) UnmarshalBinary(buf []byte) error {
return err
}
s.Key = pb.GetKey()
s.Tags = make(map[string]string, len(pb.Tags))
for _, t := range pb.Tags {
s.Tags[t.GetKey()] = t.GetValue()
s.Tags = make(models.Tags, len(pb.Tags))
for i, t := range pb.Tags {
s.Tags[i] = models.Tag{Key: []byte(t.GetKey()), Value: []byte(t.GetValue())}
}
return nil
}
@ -1721,7 +1780,7 @@ func (m *Measurement) tagValuesByKeyAndSeriesID(tagKeys []string, ids SeriesIDs)
// Iterate the tag keys we're interested in and collect values
// from this series, if they exist.
for _, tagKey := range tagKeys {
if tagVal, ok := s.Tags[tagKey]; ok {
if tagVal := s.Tags.GetString(tagKey); tagVal != "" {
if _, ok = tagValues[tagKey]; !ok {
tagValues[tagKey] = newStringSet()
}
@ -1810,6 +1869,12 @@ func filter(a []uint64, v uint64) []uint64 {
// contains a measurement name.
func MeasurementFromSeriesKey(key string) string {
// Ignoring the error because the func returns "missing fields"
k, _, _ := models.ParseKey(key)
k, _, _ := models.ParseKey([]byte(key))
return escape.UnescapeString(k)
}
type uint64Slice []uint64
func (a uint64Slice) Len() int { return len(a) }
func (a uint64Slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a uint64Slice) Less(i, j int) bool { return a[i] < a[j] }

View File

@ -6,6 +6,7 @@ import (
"testing"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
)
@ -182,7 +183,7 @@ func genTestSeries(mCnt, tCnt, vCnt int) []*TestSeries {
for _, ts := range tagSets {
series = append(series, &TestSeries{
Measurement: m,
Series: tsdb.NewSeries(fmt.Sprintf("%s:%s", m, string(tsdb.MarshalTags(ts))), ts),
Series: tsdb.NewSeries(fmt.Sprintf("%s:%s", m, string(tsdb.MarshalTags(ts))), models.NewTags(ts)),
})
}
}

View File

@ -8,6 +8,7 @@ import (
"os"
"path/filepath"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
@ -117,12 +118,12 @@ func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, opti
closing: make(chan struct{}),
stats: &ShardStatistics{},
statTags: map[string]string{
statTags: models.NewTags(map[string]string{
"path": path,
"id": fmt.Sprintf("%d", id),
"database": db,
"retentionPolicy": rp,
},
}),
database: db,
retentionPolicy: rp,
@ -177,10 +178,10 @@ func (s *Shard) Statistics(tags map[string]string) []models.Statistic {
return nil
}
tags = s.statTags.Merge(tags)
tags = s.statTags.Merge(tags).Map()
statistics := []models.Statistic{{
Name: "shard",
Tags: models.Tags(tags).Merge(map[string]string{"engine": s.options.EngineVersion}),
Tags: models.NewTags(tags).Merge(map[string]string{"engine": s.options.EngineVersion}),
Values: map[string]interface{}{
statWriteReq: atomic.LoadInt64(&s.stats.WriteReq),
statSeriesCreate: atomic.LoadInt64(&s.stats.SeriesCreated),
@ -449,9 +450,9 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]*FieldCreate,
for _, p := range points {
// verify the tags and fields
tags := p.Tags()
if _, ok := tags["time"]; ok {
if v := tags.Get([]byte("time")); v != nil {
s.logger.Printf("dropping tag 'time' from '%s'\n", p.PrecisionString(""))
delete(tags, "time")
tags.Delete([]byte("time"))
p.SetTags(tags)
}
@ -1052,6 +1053,145 @@ func NewTagKeysIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Itera
return newMeasurementKeysIterator(sh, fn, opt)
}
// tagValuesIterator emits key/tag values
type tagValuesIterator struct {
series []*Series // remaining series
keys []string // tag keys to select from a series
fields []string // fields to emit (key or value)
buf struct {
s *Series // current series
keys []string // current tag's keys
}
}
// NewTagValuesIterator returns a new instance of TagValuesIterator.
func NewTagValuesIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) {
if opt.Condition == nil {
return nil, errors.New("a condition is required")
}
measurementExpr := influxql.CloneExpr(opt.Condition)
measurementExpr = influxql.Reduce(influxql.RewriteExpr(measurementExpr, func(e influxql.Expr) influxql.Expr {
switch e := e.(type) {
case *influxql.BinaryExpr:
switch e.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
tag, ok := e.LHS.(*influxql.VarRef)
if !ok || tag.Val != "_name" {
return nil
}
}
}
return e
}), nil)
mms, ok, err := sh.index.measurementsByExpr(measurementExpr)
if err != nil {
return nil, err
} else if !ok {
mms = sh.index.Measurements()
sort.Sort(mms)
}
// If there are no measurements, return immediately.
if len(mms) == 0 {
return &tagValuesIterator{}, nil
}
filterExpr := influxql.CloneExpr(opt.Condition)
filterExpr = influxql.Reduce(influxql.RewriteExpr(filterExpr, func(e influxql.Expr) influxql.Expr {
switch e := e.(type) {
case *influxql.BinaryExpr:
switch e.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
tag, ok := e.LHS.(*influxql.VarRef)
if !ok || strings.HasPrefix(tag.Val, "_") {
return nil
}
}
}
return e
}), nil)
var series []*Series
keys := newStringSet()
for _, mm := range mms {
ss, ok, err := mm.TagKeysByExpr(opt.Condition)
if err != nil {
return nil, err
} else if !ok {
keys.add(mm.TagKeys()...)
} else {
keys = keys.union(ss)
}
ids, err := mm.seriesIDsAllOrByExpr(filterExpr)
if err != nil {
return nil, err
}
for _, id := range ids {
series = append(series, mm.SeriesByID(id))
}
}
return &tagValuesIterator{
series: series,
keys: keys.list(),
fields: influxql.VarRefs(opt.Aux).Strings(),
}, nil
}
// Stats returns stats about the points processed.
func (itr *tagValuesIterator) Stats() influxql.IteratorStats { return influxql.IteratorStats{} }
// Close closes the iterator.
func (itr *tagValuesIterator) Close() error { return nil }
// Next emits the next point in the iterator.
func (itr *tagValuesIterator) Next() (*influxql.FloatPoint, error) {
for {
// If there are no more values then move to the next key.
if len(itr.buf.keys) == 0 {
if len(itr.series) == 0 {
return nil, nil
}
itr.buf.s = itr.series[0]
itr.buf.keys = itr.keys
itr.series = itr.series[1:]
continue
}
key := itr.buf.keys[0]
value := itr.buf.s.Tags.GetString(key)
if value == "" {
itr.buf.keys = itr.buf.keys[1:]
continue
}
// Prepare auxiliary fields.
auxFields := make([]interface{}, len(itr.fields))
for i, f := range itr.fields {
switch f {
case "_tagKey":
auxFields[i] = key
case "value":
auxFields[i] = value
}
}
// Return next key.
p := &influxql.FloatPoint{
Name: itr.buf.s.measurement.Name,
Aux: auxFields,
}
itr.buf.keys = itr.buf.keys[1:]
return p, nil
}
}
// measurementKeyFunc is the function called by measurementKeysIterator.
type measurementKeyFunc func(m *Measurement) []string

View File

@ -47,7 +47,7 @@ func TestShardWriteAndIndex(t *testing.T) {
pt := models.MustNewPoint(
"cpu",
map[string]string{"host": "server"},
models.Tags{{Key: []byte("host"), Value: []byte("server")}},
map[string]interface{}{"value": 1.0},
time.Unix(1, 2),
)
@ -69,7 +69,7 @@ func TestShardWriteAndIndex(t *testing.T) {
}
seriesTags := index.Series(string(pt.Key())).Tags
if len(seriesTags) != len(pt.Tags()) || pt.Tags()["host"] != seriesTags["host"] {
if len(seriesTags) != len(pt.Tags()) || pt.Tags().GetString("host") != seriesTags.GetString("host") {
t.Fatalf("tags weren't properly saved to series index: %v, %v", pt.Tags(), seriesTags)
}
if !reflect.DeepEqual(index.Measurement("cpu").TagKeys(), []string{"host"}) {
@ -121,7 +121,7 @@ func TestMaxSeriesLimit(t *testing.T) {
for i := 0; i < 1000; i++ {
pt := models.MustNewPoint(
"cpu",
map[string]string{"host": fmt.Sprintf("server%d", i)},
models.Tags{{Key: []byte("host"), Value: []byte(fmt.Sprintf("server%d", i))}},
map[string]interface{}{"value": 1.0},
time.Unix(1, 2),
)
@ -136,7 +136,7 @@ func TestMaxSeriesLimit(t *testing.T) {
// Writing one more series should exceed the series limit.
pt := models.MustNewPoint(
"cpu",
map[string]string{"host": "server9999"},
models.Tags{{Key: []byte("host"), Value: []byte("server9999")}},
map[string]interface{}{"value": 1.0},
time.Unix(1, 2),
)
@ -169,7 +169,7 @@ func TestWriteTimeTag(t *testing.T) {
pt := models.MustNewPoint(
"cpu",
map[string]string{},
models.NewTags(map[string]string{}),
map[string]interface{}{"time": 1.0},
time.Unix(1, 2),
)
@ -189,7 +189,7 @@ func TestWriteTimeTag(t *testing.T) {
pt = models.MustNewPoint(
"cpu",
map[string]string{},
models.NewTags(map[string]string{}),
map[string]interface{}{"value": 1.0, "time": 1.0},
time.Unix(1, 2),
)
@ -230,7 +230,7 @@ func TestWriteTimeField(t *testing.T) {
pt := models.MustNewPoint(
"cpu",
map[string]string{"time": "now"},
models.NewTags(map[string]string{"time": "now"}),
map[string]interface{}{"value": 1.0},
time.Unix(1, 2),
)
@ -270,7 +270,7 @@ func TestShardWriteAddNewField(t *testing.T) {
pt := models.MustNewPoint(
"cpu",
map[string]string{"host": "server"},
models.NewTags(map[string]string{"host": "server"}),
map[string]interface{}{"value": 1.0},
time.Unix(1, 2),
)
@ -282,7 +282,7 @@ func TestShardWriteAddNewField(t *testing.T) {
pt = models.MustNewPoint(
"cpu",
map[string]string{"host": "server"},
models.NewTags(map[string]string{"host": "server"}),
map[string]interface{}{"value": 1.0, "value2": 2.0},
time.Unix(1, 2),
)
@ -296,7 +296,7 @@ func TestShardWriteAddNewField(t *testing.T) {
t.Fatalf("series wasn't in index")
}
seriesTags := index.Series(string(pt.Key())).Tags
if len(seriesTags) != len(pt.Tags()) || pt.Tags()["host"] != seriesTags["host"] {
if len(seriesTags) != len(pt.Tags()) || pt.Tags().GetString("host") != seriesTags.GetString("host") {
t.Fatalf("tags weren't properly saved to series index: %v, %v", pt.Tags(), seriesTags)
}
if !reflect.DeepEqual(index.Measurement("cpu").TagKeys(), []string{"host"}) {
@ -328,7 +328,7 @@ func TestShard_Close_RemoveIndex(t *testing.T) {
pt := models.MustNewPoint(
"cpu",
map[string]string{"host": "server"},
models.NewTags(map[string]string{"host": "server"}),
map[string]interface{}{"value": 1.0},
time.Unix(1, 2),
)
@ -521,7 +521,7 @@ func TestShard_Disabled_WriteQuery(t *testing.T) {
pt := models.MustNewPoint(
"cpu",
map[string]string{"host": "server"},
models.NewTags(map[string]string{"host": "server"}),
map[string]interface{}{"value": 1.0},
time.Unix(1, 2),
)

View File

@ -929,13 +929,13 @@ func (s *Store) TagValues(database string, cond influxql.Expr) ([]TagValues, err
// Loop over all keys for each series.
m := make(map[KeyValue]struct{}, len(ss))
for _, series := range ss {
for key, value := range series.Tags {
for _, t := range series.Tags {
if !ok {
// nop
} else if _, exists := keySet[key]; !exists {
} else if _, exists := keySet[string(t.Key)]; !exists {
continue
}
m[KeyValue{key, value}] = struct{}{}
m[KeyValue{string(t.Key), string(t.Value)}] = struct{}{}
}
}