From 8aa224b22dc007ee31a176e053ceb78519899a73 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Thu, 30 Jun 2016 10:49:53 -0600 Subject: [PATCH] reduce memory allocations in index This commit changes the index to point to index data in the shards instead of keeping it in-memory on the heap. --- client/influxdb.go | 2 +- client/v2/client.go | 4 +- cmd/influx_inspect/export.go | 2 +- cmd/influx_inspect/report.go | 10 +- cmd/influx_inspect/tsm.go | 10 +- coordinator/points_writer.go | 4 +- coordinator/statement_executor.go | 4 +- influxql/query_executor.go | 2 +- models/points.go | 193 ++++++++++++++++------- models/points_test.go | 196 ++++++++++++------------ monitor/service.go | 8 +- services/collectd/service.go | 5 +- services/continuous_querier/service.go | 2 +- services/graphite/config.go | 6 +- services/graphite/parser.go | 22 +-- services/graphite/parser_test.go | 42 ++--- services/graphite/service.go | 2 +- services/graphite/service_test.go | 4 +- services/httpd/handler.go | 18 +-- services/httpd/response_writer.go | 2 +- services/httpd/service.go | 2 +- services/opentsdb/handler.go | 2 +- services/opentsdb/service.go | 4 +- services/opentsdb/service_test.go | 4 +- services/subscriber/service.go | 6 +- services/udp/service.go | 2 +- stress/v2/stress_client/commune_test.go | 12 +- tsdb/engine/tsm1/cache.go | 2 +- tsdb/engine/tsm1/engine.go | 46 +++--- tsdb/engine/tsm1/engine_test.go | 22 +-- tsdb/engine/tsm1/file_store.go | 38 ++++- tsdb/engine/tsm1/reader.go | 19 ++- tsdb/engine/tsm1/wal.go | 2 +- tsdb/meta.go | 129 ++++++++++++---- tsdb/meta_test.go | 3 +- tsdb/shard.go | 152 +++++++++++++++++- tsdb/shard_test.go | 24 +-- tsdb/store.go | 6 +- 38 files changed, 666 insertions(+), 347 deletions(-) diff --git a/client/influxdb.go b/client/influxdb.go index 90695b9ed2..77f451022a 100644 --- a/client/influxdb.go +++ b/client/influxdb.go @@ -562,7 +562,7 @@ func (p *Point) MarshalJSON() ([]byte, error) { // MarshalString renders string representation of a Point 
with specified // precision. The default precision is nanoseconds. func (p *Point) MarshalString() string { - pt, err := models.NewPoint(p.Measurement, p.Tags, p.Fields, p.Time) + pt, err := models.NewPoint(p.Measurement, models.NewTags(p.Tags), p.Fields, p.Time) if err != nil { return "# ERROR: " + err.Error() + " " + p.Measurement } diff --git a/client/v2/client.go b/client/v2/client.go index 1ed78e3156..fe08d29a8f 100644 --- a/client/v2/client.go +++ b/client/v2/client.go @@ -356,7 +356,7 @@ func NewPoint( T = t[0] } - pt, err := models.NewPoint(name, tags, fields, T) + pt, err := models.NewPoint(name, models.NewTags(tags), fields, T) if err != nil { return nil, err } @@ -382,7 +382,7 @@ func (p *Point) Name() string { // Tags returns the tags associated with the point func (p *Point) Tags() map[string]string { - return p.pt.Tags() + return p.pt.Tags().Map() } // Time return the timestamp for the point diff --git a/cmd/influx_inspect/export.go b/cmd/influx_inspect/export.go index df8114a7ea..53505bbde6 100644 --- a/cmd/influx_inspect/export.go +++ b/cmd/influx_inspect/export.go @@ -132,7 +132,7 @@ func (c *cmdExport) writeFiles() error { for i := 0; i < reader.KeyCount(); i++ { var pairs string key, typ := reader.KeyAt(i) - values, _ := reader.ReadAll(key) + values, _ := reader.ReadAll(string(key)) measurement, field := tsm1.SeriesAndFieldFromCompositeKey(key) for _, value := range values { diff --git a/cmd/influx_inspect/report.go b/cmd/influx_inspect/report.go index 9932f6655a..006454974f 100644 --- a/cmd/influx_inspect/report.go +++ b/cmd/influx_inspect/report.go @@ -78,7 +78,7 @@ func cmdReport(opts *reportOpts) { totalSeries.Add([]byte(key)) if opts.detailed { - sep := strings.Index(key, "#!~#") + sep := strings.Index(string(key), "#!~#") seriesKey, field := key[:sep], key[sep+4:] measurement, tags, _ := models.ParseKey(seriesKey) @@ -96,13 +96,13 @@ func cmdReport(opts *reportOpts) { } fieldCount.Add([]byte(field)) - for t, v := range tags { - tagCount, ok 
:= tagCardialities[t] + for _, t := range tags { + tagCount, ok := tagCardialities[string(t.Key)] if !ok { tagCount = hllpp.New() - tagCardialities[t] = tagCount + tagCardialities[string(t.Key)] = tagCount } - tagCount.Add([]byte(v)) + tagCount.Add(t.Value) } } } diff --git a/cmd/influx_inspect/tsm.go b/cmd/influx_inspect/tsm.go index 5fea93aed9..cd20759a57 100644 --- a/cmd/influx_inspect/tsm.go +++ b/cmd/influx_inspect/tsm.go @@ -121,9 +121,9 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) { var pos int for i := 0; i < keyCount; i++ { key, _ := r.KeyAt(i) - for _, e := range r.Entries(key) { + for _, e := range r.Entries(string(key)) { pos++ - split := strings.Split(key, "#!~#") + split := strings.Split(string(key), "#!~#") // We dont' know know if we have fields so use an informative default var measurement, field string = "UNKNOWN", "UNKNOWN" @@ -132,7 +132,7 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) { measurement = split[0] field = split[1] - if opts.filterKey != "" && !strings.Contains(key, opts.filterKey) { + if opts.filterKey != "" && !strings.Contains(string(key), opts.filterKey) { continue } fmt.Fprintln(tw, " "+strings.Join([]string{ @@ -160,7 +160,7 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) { // Start at the beginning and read every block for j := 0; j < keyCount; j++ { key, _ := r.KeyAt(j) - for _, e := range r.Entries(key) { + for _, e := range r.Entries(string(key)) { f.Seek(int64(e.Offset), 0) f.Read(b[:4]) @@ -172,7 +172,7 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) { blockSize += int64(e.Size) - if opts.filterKey != "" && !strings.Contains(key, opts.filterKey) { + if opts.filterKey != "" && !strings.Contains(string(key), opts.filterKey) { i += blockSize blockCount++ continue diff --git a/coordinator/points_writer.go b/coordinator/points_writer.go index 1a54ebf088..8f8e8d9e39 100644 --- a/coordinator/points_writer.go +++ b/coordinator/points_writer.go @@ -83,7 +83,7 @@ type WritePointsRequest struct { // AddPoint adds a point to the WritePointRequest with field key 
'value' func (w *WritePointsRequest) AddPoint(name string, value interface{}, timestamp time.Time, tags map[string]string) { pt, err := models.NewPoint( - name, tags, map[string]interface{}{"value": value}, timestamp, + name, models.NewTags(tags), map[string]interface{}{"value": value}, timestamp, ) if err != nil { return @@ -176,7 +176,7 @@ type WriteStatistics struct { func (w *PointsWriter) Statistics(tags map[string]string) []models.Statistic { return []models.Statistic{{ Name: "write", - Tags: tags, + Tags: models.NewTags(tags), Values: map[string]interface{}{ statWriteReq: atomic.LoadInt64(&w.stats.WriteReq), statPointWriteReq: atomic.LoadInt64(&w.stats.PointWriteReq), diff --git a/coordinator/statement_executor.go b/coordinator/statement_executor.go index c0040c6a9a..e18b5becc4 100644 --- a/coordinator/statement_executor.go +++ b/coordinator/statement_executor.go @@ -818,7 +818,7 @@ func (e *StatementExecutor) executeShowStatsStatement(stmt *influxql.ShowStatsSt if stmt.Module != "" && stat.Name != stmt.Module { continue } - row := &models.Row{Name: stat.Name, Tags: stat.Tags} + row := &models.Row{Name: stat.Name, Tags: stat.Tags.Map()} values := make([]interface{}, 0, len(stat.Values)) for _, k := range stat.ValueNames() { @@ -1055,7 +1055,7 @@ func convertRowToPoints(measurementName string, row *models.Row) ([]models.Point } } - p, err := models.NewPoint(measurementName, row.Tags, vals, v[timeIndex].(time.Time)) + p, err := models.NewPoint(measurementName, models.NewTags(row.Tags), vals, v[timeIndex].(time.Time)) if err != nil { // Drop points that can't be stored continue diff --git a/influxql/query_executor.go b/influxql/query_executor.go index 1ef0adbfc0..365bd97e13 100644 --- a/influxql/query_executor.go +++ b/influxql/query_executor.go @@ -148,7 +148,7 @@ type QueryStatistics struct { func (e *QueryExecutor) Statistics(tags map[string]string) []models.Statistic { return []models.Statistic{{ Name: "queryExecutor", - Tags: tags, + Tags: 
models.NewTags(tags), Values: map[string]interface{}{ statQueriesActive: atomic.LoadInt64(&e.stats.ActiveQueries), statQueriesExecuted: atomic.LoadInt64(&e.stats.ExecutedQueries), diff --git a/models/points.go b/models/points.go index fa678b9631..b9f76d4464 100644 --- a/models/points.go +++ b/models/points.go @@ -139,14 +139,14 @@ func ParsePointsString(buf string) ([]Point, error) { } // ParseKey returns the measurement name and tags from a point. -func ParseKey(buf string) (string, Tags, error) { +func ParseKey(buf []byte) (string, Tags, error) { // Ignore the error because scanMeasurement returns "missing fields" which we ignore // when just parsing a key - state, i, _ := scanMeasurement([]byte(buf), 0) + state, i, _ := scanMeasurement(buf, 0) var tags Tags if state == tagKeyState { - tags = parseTags([]byte(buf)) + tags = parseTags(buf) // scanMeasurement returns the location of the comma if there are tags, strip that off return string(buf[:i-1]), tags, nil } @@ -1225,39 +1225,42 @@ func (p *point) Tags() Tags { } func parseTags(buf []byte) Tags { - tags := make(map[string]string, bytes.Count(buf, []byte(","))) + if len(buf) == 0 { + return nil + } + + pos, name := scanTo(buf, 0, ',') + + // it's an empty key, so there are no tags + if len(name) == 0 { + return nil + } + + tags := make(Tags, 0, bytes.Count(buf, []byte(","))) hasEscape := bytes.IndexByte(buf, '\\') != -1 - if len(buf) != 0 { - pos, name := scanTo(buf, 0, ',') + i := pos + 1 + var key, value []byte + for { + if i >= len(buf) { + break + } + i, key = scanTo(buf, i, '=') + i, value = scanTagValue(buf, i+1) - // it's an empyt key, so there are no tags - if len(name) == 0 { - return tags + if len(value) == 0 { + continue } - i := pos + 1 - var key, value []byte - for { - if i >= len(buf) { - break - } - i, key = scanTo(buf, i, '=') - i, value = scanTagValue(buf, i+1) - - if len(value) == 0 { - continue - } - - if hasEscape { - tags[string(unescapeTag(key))] = string(unescapeTag(value)) - } else { - 
tags[string(key)] = string(value) - } - - i++ + if hasEscape { + tags = append(tags, Tag{Key: unescapeTag(key), Value: unescapeTag(value)}) + } else { + tags = append(tags, Tag{Key: key, Value: value}) } + + i++ } + return tags } @@ -1276,7 +1279,8 @@ func (p *point) SetTags(tags Tags) { // AddTag adds or replaces a tag value for a point func (p *point) AddTag(key, value string) { tags := p.Tags() - tags[key] = value + tags = append(tags, Tag{Key: []byte(key), Value: []byte(value)}) + sort.Sort(tags) p.key = MakeKey([]byte(p.Name()), tags) } @@ -1386,64 +1390,137 @@ func (p *point) UnixNano() int64 { return p.Time().UnixNano() } -// Tags represents a mapping between a Point's tag names and their -// values. -type Tags map[string]string +// Tag represents a single key/value tag pair. +type Tag struct { + Key []byte + Value []byte +} + +// Tags represents a sorted list of tags. +type Tags []Tag + +// NewTags returns a new Tags from a map. +func NewTags(m map[string]string) Tags { + a := make(Tags, 0, len(m)) + for k, v := range m { + a = append(a, Tag{Key: []byte(k), Value: []byte(v)}) + } + sort.Sort(a) + return a +} + +func (a Tags) Len() int { return len(a) } +func (a Tags) Less(i, j int) bool { return bytes.Compare(a[i].Key, a[j].Key) == -1 } +func (a Tags) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +// Get returns the value for a key. +func (a Tags) Get(key []byte) []byte { + // OPTIMIZE: Use sort.Search if tagset is large. + + for _, t := range a { + if bytes.Equal(t.Key, key) { + return t.Value + } + } + return nil +} + +// GetString returns the string value for a string key. +func (a Tags) GetString(key string) string { + return string(a.Get([]byte(key))) +} + +// Set sets the value for a key. +func (a *Tags) Set(key, value []byte) { + for _, t := range *a { + if bytes.Equal(t.Key, key) { + t.Value = value + return + } + } + *a = append(*a, Tag{Key: key, Value: value}) + sort.Sort(*a) +} + +// SetString sets the string value for a string key. 
+func (a *Tags) SetString(key, value string) { + a.Set([]byte(key), []byte(value)) +} + +// Delete removes a tag by key. +func (a *Tags) Delete(key []byte) { + for i, t := range *a { + if bytes.Equal(t.Key, key) { + copy((*a)[i:], (*a)[i+1:]) + (*a)[len(*a)-1] = Tag{} + *a = (*a)[:len(*a)-1] + return + } + } +} + +// Map returns a map representation of the tags. +func (a Tags) Map() map[string]string { + m := make(map[string]string, len(a)) + for _, t := range a { + m[string(t.Key)] = string(t.Value) + } + return m +} // Merge merges the tags combining the two. If both define a tag with the // same key, the merged value overwrites the old value. // A new map is returned. -func (t Tags) Merge(other map[string]string) Tags { - merged := make(map[string]string, len(t)+len(other)) - for k, v := range t { - merged[k] = v +func (a Tags) Merge(other map[string]string) Tags { + merged := make(map[string]string, len(a)+len(other)) + for _, t := range a { + merged[string(t.Key)] = string(t.Value) } for k, v := range other { merged[k] = v } - return Tags(merged) + return NewTags(merged) } // HashKey hashes all of a tag's keys. -func (t Tags) HashKey() []byte { +func (a Tags) HashKey() []byte { // Empty maps marshal to empty bytes. - if len(t) == 0 { + if len(a) == 0 { return nil } - escaped := Tags{} - for k, v := range t { - ek := escapeTag([]byte(k)) - ev := escapeTag([]byte(v)) + escaped := make(Tags, 0, len(a)) + for _, t := range a { + ek := escapeTag(t.Key) + ev := escapeTag(t.Value) if len(ev) > 0 { - escaped[string(ek)] = string(ev) + escaped = append(escaped, Tag{Key: ek, Value: ev}) } } // Extract keys and determine final size. 
sz := len(escaped) + (len(escaped) * 2) // separators - keys := make([]string, len(escaped)+1) - i := 0 - for k, v := range escaped { - keys[i] = k - i++ - sz += len(k) + len(v) + keys := make([][]byte, len(escaped)+1) + for i, t := range escaped { + keys[i] = t.Key + sz += len(t.Key) + len(t.Value) } - keys = keys[:i] - sort.Strings(keys) + keys = keys[:len(escaped)] + sort.Sort(byteSlices(keys)) + // Generate marshaled bytes. b := make([]byte, sz) buf := b idx := 0 - for _, k := range keys { + for i, k := range keys { buf[idx] = ',' idx++ copy(buf[idx:idx+len(k)], k) idx += len(k) buf[idx] = '=' idx++ - v := escaped[k] + v := escaped[i].Value copy(buf[idx:idx+len(v)], v) idx += len(v) } @@ -1594,3 +1671,9 @@ func (p Fields) MarshalBinary() []byte { } return b } + +type byteSlices [][]byte + +func (a byteSlices) Len() int { return len(a) } +func (a byteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 } +func (a byteSlices) Swap(i, j int) { a[i], a[j] = a[j], a[i] } diff --git a/models/points_test.go b/models/points_test.go index e7df0bfc09..c467b5e7cb 100644 --- a/models/points_test.go +++ b/models/points_test.go @@ -15,7 +15,7 @@ import ( ) var ( - tags = models.Tags{"foo": "bar", "apple": "orange", "host": "serverA", "region": "uswest"} + tags = models.NewTags(map[string]string{"foo": "bar", "apple": "orange", "host": "serverA", "region": "uswest"}) maxFloat64 = strconv.FormatFloat(math.MaxFloat64, 'f', 1, 64) minFloat64 = strconv.FormatFloat(-math.MaxFloat64, 'f', 1, 64) ) @@ -115,7 +115,7 @@ func BenchmarkParsePointsTagsUnSorted10(b *testing.B) { func BenchmarkParseKey(b *testing.B) { line := `cpu,region=us-west,host=serverA,env=prod,target=servers,zone=1c,tag1=value1,tag2=value2,tag3=value3,tag4=value4,tag5=value5` for i := 0; i < b.N; i++ { - models.ParseKey(line) + models.ParseKey([]byte(line)) } } @@ -163,9 +163,9 @@ func test(t *testing.T, line string, point TestPoint) { t.Errorf(`ParsePoints("%s") tags mismatch. 
got %v, exp %v`, line, pts[0].Tags(), exp) } - for tag, value := range pts[0].Tags() { - if value != point.RawTags[tag] { - t.Errorf(`ParsePoints("%s") tags mismatch. got %v, exp %v`, line, value, point.RawTags[tag]) + for _, tag := range pts[0].Tags() { + if !bytes.Equal(tag.Value, point.RawTags.Get(tag.Key)) { + t.Errorf(`ParsePoints("%s") tags mismatch. got %s, exp %s`, line, tag.Value, point.RawTags.Get(tag.Key)) } } @@ -639,7 +639,7 @@ func TestParsePointUnescape(t *testing.T) { test(t, `foo\,bar value=1i`, NewTestPoint( "foo,bar", // comma in the name - models.Tags{}, + models.NewTags(map[string]string{}), models.Fields{ "value": int64(1), }, @@ -649,9 +649,9 @@ func TestParsePointUnescape(t *testing.T) { test(t, `cpu\,main,regions=east value=1.0`, NewTestPoint( "cpu,main", // comma in the name - models.Tags{ + models.NewTags(map[string]string{ "regions": "east", - }, + }), models.Fields{ "value": 1.0, }, @@ -661,9 +661,9 @@ func TestParsePointUnescape(t *testing.T) { test(t, `cpu\ load,region=east value=1.0`, NewTestPoint( "cpu load", // space in the name - models.Tags{ + models.NewTags(map[string]string{ "region": "east", - }, + }), models.Fields{ "value": 1.0, }, @@ -673,9 +673,9 @@ func TestParsePointUnescape(t *testing.T) { test(t, `cpu\=load,region=east value=1.0`, NewTestPoint( `cpu\=load`, // backslash is literal - models.Tags{ + models.NewTags(map[string]string{ "region": "east", - }, + }), models.Fields{ "value": 1.0, }, @@ -685,9 +685,9 @@ func TestParsePointUnescape(t *testing.T) { test(t, `cpu=load,region=east value=1.0`, NewTestPoint( `cpu=load`, // literal equals is fine in measurement name - models.Tags{ + models.NewTags(map[string]string{ "region": "east", - }, + }), models.Fields{ "value": 1.0, }, @@ -696,9 +696,9 @@ func TestParsePointUnescape(t *testing.T) { // commas in tag names test(t, `cpu,region\,zone=east value=1.0`, NewTestPoint("cpu", - models.Tags{ + models.NewTags(map[string]string{ "region,zone": "east", // comma in the tag key 
- }, + }), models.Fields{ "value": 1.0, }, @@ -707,9 +707,9 @@ func TestParsePointUnescape(t *testing.T) { // spaces in tag name test(t, `cpu,region\ zone=east value=1.0`, NewTestPoint("cpu", - models.Tags{ + models.NewTags(map[string]string{ "region zone": "east", // space in the tag name - }, + }), models.Fields{ "value": 1.0, }, @@ -718,9 +718,9 @@ func TestParsePointUnescape(t *testing.T) { // backslash with escaped equals in tag name test(t, `cpu,reg\\=ion=east value=1.0`, NewTestPoint("cpu", - models.Tags{ + models.NewTags(map[string]string{ `reg\=ion`: "east", - }, + }), models.Fields{ "value": 1.0, }, @@ -729,9 +729,9 @@ func TestParsePointUnescape(t *testing.T) { // space is tag name test(t, `cpu,\ =east value=1.0`, NewTestPoint("cpu", - models.Tags{ + models.NewTags(map[string]string{ " ": "east", // tag name is single space - }, + }), models.Fields{ "value": 1.0, }, @@ -740,9 +740,9 @@ func TestParsePointUnescape(t *testing.T) { // commas in tag values test(t, `cpu,regions=east\,west value=1.0`, NewTestPoint("cpu", - models.Tags{ + models.NewTags(map[string]string{ "regions": "east,west", // comma in the tag value - }, + }), models.Fields{ "value": 1.0, }, @@ -752,9 +752,9 @@ func TestParsePointUnescape(t *testing.T) { test(t, `cpu,regions=\\ east value=1.0`, NewTestPoint( "cpu", - models.Tags{ + models.NewTags(map[string]string{ "regions": `\ east`, - }, + }), models.Fields{ "value": 1.0, }, @@ -764,9 +764,9 @@ func TestParsePointUnescape(t *testing.T) { test(t, `cpu,regions=eas\\ t value=1.0`, NewTestPoint( "cpu", - models.Tags{ + models.NewTags(map[string]string{ "regions": `eas\ t`, - }, + }), models.Fields{ "value": 1.0, }, @@ -776,9 +776,9 @@ func TestParsePointUnescape(t *testing.T) { test(t, `cpu,regions=east\\ value=1.0`, NewTestPoint( "cpu", - models.Tags{ + models.NewTags(map[string]string{ "regions": `east\ `, - }, + }), models.Fields{ "value": 1.0, }, @@ -787,9 +787,9 @@ func TestParsePointUnescape(t *testing.T) { // spaces in tag values 
test(t, `cpu,regions=east\ west value=1.0`, NewTestPoint("cpu", - models.Tags{ + models.NewTags(map[string]string{ "regions": "east west", // comma in the tag value - }, + }), models.Fields{ "value": 1.0, }, @@ -798,9 +798,9 @@ func TestParsePointUnescape(t *testing.T) { // commas in field keys test(t, `cpu,regions=east value\,ms=1.0`, NewTestPoint("cpu", - models.Tags{ + models.NewTags(map[string]string{ "regions": "east", - }, + }), models.Fields{ "value,ms": 1.0, // comma in the field keys }, @@ -809,9 +809,9 @@ func TestParsePointUnescape(t *testing.T) { // spaces in field keys test(t, `cpu,regions=east value\ ms=1.0`, NewTestPoint("cpu", - models.Tags{ + models.NewTags(map[string]string{ "regions": "east", - }, + }), models.Fields{ "value ms": 1.0, // comma in the field keys }, @@ -820,10 +820,10 @@ func TestParsePointUnescape(t *testing.T) { // tag with no value test(t, `cpu,regions=east value="1"`, NewTestPoint("cpu", - models.Tags{ + models.NewTags(map[string]string{ "regions": "east", "foobar": "", - }, + }), models.Fields{ "value": "1", }, @@ -832,9 +832,9 @@ func TestParsePointUnescape(t *testing.T) { // commas in field values test(t, `cpu,regions=east value="1,0"`, NewTestPoint("cpu", - models.Tags{ + models.NewTags(map[string]string{ "regions": "east", - }, + }), models.Fields{ "value": "1,0", // comma in the field value }, @@ -844,9 +844,9 @@ func TestParsePointUnescape(t *testing.T) { test(t, `cpu,regions=eas\t value=1.0`, NewTestPoint( "cpu", - models.Tags{ + models.NewTags(map[string]string{ "regions": "eas\\t", - }, + }), models.Fields{ "value": 1.0, }, @@ -856,9 +856,9 @@ func TestParsePointUnescape(t *testing.T) { test(t, `cpu,regions=\\,\,\=east value=1.0`, NewTestPoint( "cpu", - models.Tags{ + models.NewTags(map[string]string{ "regions": `\,,=east`, - }, + }), models.Fields{ "value": 1.0, }, @@ -868,7 +868,7 @@ func TestParsePointUnescape(t *testing.T) { test(t, `cpu \a=1i`, NewTestPoint( "cpu", - models.Tags{}, + 
models.NewTags(map[string]string{}), models.Fields{ "\\a": int64(1), // Left as parsed since it's not a known escape sequence. }, @@ -878,9 +878,9 @@ func TestParsePointUnescape(t *testing.T) { test(t, `cpu=load,equals\=foo=tag\=value value=1i`, NewTestPoint( "cpu=load", // Not escaped - models.Tags{ + models.NewTags(map[string]string{ "equals=foo": "tag=value", // Tag and value unescaped - }, + }), models.Fields{ "value": int64(1), }, @@ -892,7 +892,7 @@ func TestParsePointWithTags(t *testing.T) { test(t, "cpu,host=serverA,region=us-east value=1.0 1000000000", NewTestPoint("cpu", - models.Tags{"host": "serverA", "region": "us-east"}, + models.NewTags(map[string]string{"host": "serverA", "region": "us-east"}), models.Fields{"value": 1.0}, time.Unix(1, 0))) } @@ -924,10 +924,10 @@ func TestParsePointWithDuplicateTags(t *testing.T) { func TestParsePointWithStringField(t *testing.T) { test(t, `cpu,host=serverA,region=us-east value=1.0,str="foo",str2="bar" 1000000000`, NewTestPoint("cpu", - models.Tags{ + models.NewTags(map[string]string{ "host": "serverA", "region": "us-east", - }, + }), models.Fields{ "value": 1.0, "str": "foo", @@ -938,10 +938,10 @@ func TestParsePointWithStringField(t *testing.T) { test(t, `cpu,host=serverA,region=us-east str="foo \" bar" 1000000000`, NewTestPoint("cpu", - models.Tags{ + models.NewTags(map[string]string{ "host": "serverA", "region": "us-east", - }, + }), models.Fields{ "str": `foo " bar`, }, @@ -954,10 +954,10 @@ func TestParsePointWithStringWithSpaces(t *testing.T) { test(t, `cpu,host=serverA,region=us-east value=1.0,str="foo bar" 1000000000`, NewTestPoint( "cpu", - models.Tags{ + models.NewTags(map[string]string{ "host": "serverA", "region": "us-east", - }, + }), models.Fields{ "value": 1.0, "str": "foo bar", // spaces in string value @@ -970,10 +970,10 @@ func TestParsePointWithStringWithNewline(t *testing.T) { test(t, "cpu,host=serverA,region=us-east value=1.0,str=\"foo\nbar\" 1000000000", NewTestPoint( "cpu", - models.Tags{ + 
models.NewTags(map[string]string{ "host": "serverA", "region": "us-east", - }, + }), models.Fields{ "value": 1.0, "str": "foo\nbar", // newline in string value @@ -987,10 +987,10 @@ func TestParsePointWithStringWithCommas(t *testing.T) { test(t, `cpu,host=serverA,region=us-east value=1.0,str="foo\,bar" 1000000000`, NewTestPoint( "cpu", - models.Tags{ + models.NewTags(map[string]string{ "host": "serverA", "region": "us-east", - }, + }), models.Fields{ "value": 1.0, "str": `foo\,bar`, // commas in string value @@ -1002,10 +1002,10 @@ func TestParsePointWithStringWithCommas(t *testing.T) { test(t, `cpu,host=serverA,region=us-east value=1.0,str="foo,bar" 1000000000`, NewTestPoint( "cpu", - models.Tags{ + models.NewTags(map[string]string{ "host": "serverA", "region": "us-east", - }, + }), models.Fields{ "value": 1.0, "str": "foo,bar", // commas in string value @@ -1019,10 +1019,10 @@ func TestParsePointQuotedMeasurement(t *testing.T) { test(t, `"cpu",host=serverA,region=us-east value=1.0 1000000000`, NewTestPoint( `"cpu"`, - models.Tags{ + models.NewTags(map[string]string{ "host": "serverA", "region": "us-east", - }, + }), models.Fields{ "value": 1.0, }, @@ -1034,10 +1034,10 @@ func TestParsePointQuotedTags(t *testing.T) { test(t, `cpu,"host"="serverA",region=us-east value=1.0 1000000000`, NewTestPoint( "cpu", - models.Tags{ + models.NewTags(map[string]string{ `"host"`: `"serverA"`, "region": "us-east", - }, + }), models.Fields{ "value": 1.0, }, @@ -1056,7 +1056,7 @@ func TestParsePointsUnbalancedQuotedTags(t *testing.T) { } // Expected " in the tag value - exp := models.MustNewPoint("baz", models.Tags{"mytag": `"a`}, + exp := models.MustNewPoint("baz", models.NewTags(map[string]string{"mytag": `"a`}), models.Fields{"x": float64(1)}, time.Unix(0, 1441103862125)) if pts[0].String() != exp.String() { @@ -1064,7 +1064,7 @@ func TestParsePointsUnbalancedQuotedTags(t *testing.T) { } // Expected two points to ensure we did not overscan the line - exp = 
models.MustNewPoint("baz", models.Tags{"mytag": `a`}, + exp = models.MustNewPoint("baz", models.NewTags(map[string]string{"mytag": `a`}), models.Fields{"z": float64(1)}, time.Unix(0, 1441103862126)) if pts[1].String() != exp.String() { @@ -1078,10 +1078,10 @@ func TestParsePointEscapedStringsAndCommas(t *testing.T) { test(t, `cpu,host=serverA,region=us-east value="{Hello\"{,}\" World}" 1000000000`, NewTestPoint( "cpu", - models.Tags{ + models.NewTags(map[string]string{ "host": "serverA", "region": "us-east", - }, + }), models.Fields{ "value": `{Hello"{,}" World}`, }, @@ -1092,10 +1092,10 @@ func TestParsePointEscapedStringsAndCommas(t *testing.T) { test(t, `cpu,host=serverA,region=us-east value="{Hello\"{\,}\" World}" 1000000000`, NewTestPoint( "cpu", - models.Tags{ + models.NewTags(map[string]string{ "host": "serverA", "region": "us-east", - }, + }), models.Fields{ "value": `{Hello"{\,}" World}`, }, @@ -1107,10 +1107,10 @@ func TestParsePointWithStringWithEquals(t *testing.T) { test(t, `cpu,host=serverA,region=us-east str="foo=bar",value=1.0 1000000000`, NewTestPoint( "cpu", - models.Tags{ + models.NewTags(map[string]string{ "host": "serverA", "region": "us-east", - }, + }), models.Fields{ "value": 1.0, "str": "foo=bar", // spaces in string value @@ -1123,7 +1123,7 @@ func TestParsePointWithStringWithBackslash(t *testing.T) { test(t, `cpu value="test\\\"" 1000000000`, NewTestPoint( "cpu", - models.Tags{}, + models.NewTags(map[string]string{}), models.Fields{ "value": `test\"`, }, @@ -1133,7 +1133,7 @@ func TestParsePointWithStringWithBackslash(t *testing.T) { test(t, `cpu value="test\\" 1000000000`, NewTestPoint( "cpu", - models.Tags{}, + models.NewTags(map[string]string{}), models.Fields{ "value": `test\`, }, @@ -1143,7 +1143,7 @@ func TestParsePointWithStringWithBackslash(t *testing.T) { test(t, `cpu value="test\\\"" 1000000000`, NewTestPoint( "cpu", - models.Tags{}, + models.NewTags(map[string]string{}), models.Fields{ "value": `test\"`, }, @@ -1153,7 +1153,7 
@@ func TestParsePointWithStringWithBackslash(t *testing.T) { test(t, `cpu value="test\"" 1000000000`, NewTestPoint( "cpu", - models.Tags{}, + models.NewTags(map[string]string{}), models.Fields{ "value": `test"`, }, @@ -1165,10 +1165,10 @@ func TestParsePointWithBoolField(t *testing.T) { test(t, `cpu,host=serverA,region=us-east true=true,t=t,T=T,TRUE=TRUE,True=True,false=false,f=f,F=F,FALSE=FALSE,False=False 1000000000`, NewTestPoint( "cpu", - models.Tags{ + models.NewTags(map[string]string{ "host": "serverA", "region": "us-east", - }, + }), models.Fields{ "t": true, "T": true, @@ -1189,10 +1189,10 @@ func TestParsePointUnicodeString(t *testing.T) { test(t, `cpu,host=serverA,region=us-east value="wè" 1000000000`, NewTestPoint( "cpu", - models.Tags{ + models.NewTags(map[string]string{ "host": "serverA", "region": "us-east", - }, + }), models.Fields{ "value": "wè", }, @@ -1204,7 +1204,7 @@ func TestParsePointNegativeTimestamp(t *testing.T) { test(t, `cpu value=1 -1`, NewTestPoint( "cpu", - models.Tags{}, + models.NewTags(map[string]string{}), models.Fields{ "value": 1.0, }, @@ -1216,7 +1216,7 @@ func TestParsePointMaxTimestamp(t *testing.T) { test(t, fmt.Sprintf(`cpu value=1 %d`, models.MaxNanoTime), NewTestPoint( "cpu", - models.Tags{}, + models.NewTags(map[string]string{}), models.Fields{ "value": 1.0, }, @@ -1228,7 +1228,7 @@ func TestParsePointMinTimestamp(t *testing.T) { test(t, `cpu value=1 -9223372036854775808`, NewTestPoint( "cpu", - models.Tags{}, + models.NewTags(map[string]string{}), models.Fields{ "value": 1.0, }, @@ -1259,7 +1259,7 @@ func TestNewPointFloatWithoutDecimal(t *testing.T) { test(t, `cpu value=1 1000000000`, NewTestPoint( "cpu", - models.Tags{}, + models.NewTags(map[string]string{}), models.Fields{ "value": 1.0, }, @@ -1270,7 +1270,7 @@ func TestNewPointNegativeFloat(t *testing.T) { test(t, `cpu value=-0.64 1000000000`, NewTestPoint( "cpu", - models.Tags{}, + models.NewTags(map[string]string{}), models.Fields{ "value": -0.64, }, @@ -1282,7 
+1282,7 @@ func TestNewPointFloatNoDecimal(t *testing.T) { test(t, `cpu value=1. 1000000000`, NewTestPoint( "cpu", - models.Tags{}, + models.NewTags(map[string]string{}), models.Fields{ "value": 1.0, }, @@ -1294,7 +1294,7 @@ func TestNewPointFloatScientific(t *testing.T) { test(t, `cpu value=6.632243e+06 1000000000`, NewTestPoint( "cpu", - models.Tags{}, + models.NewTags(map[string]string{}), models.Fields{ "value": float64(6632243), }, @@ -1306,7 +1306,7 @@ func TestNewPointLargeInteger(t *testing.T) { test(t, `cpu value=6632243i 1000000000`, NewTestPoint( "cpu", - models.Tags{}, + models.NewTags(map[string]string{}), models.Fields{ "value": int64(6632243), // if incorrectly encoded as a float, it would show up as 6.632243e+06 }, @@ -1403,7 +1403,7 @@ func TestParsePointToString(t *testing.T) { t.Errorf("ParsePoint() to string mismatch:\n got %v\n exp %v", got, line) } - pt = models.MustNewPoint("cpu", models.Tags{"host": "serverA", "region": "us-east"}, + pt = models.MustNewPoint("cpu", models.NewTags(map[string]string{"host": "serverA", "region": "us-east"}), models.Fields{"int": 10, "float": float64(11.0), "float2": float64(12.123), "bool": false, "str": "string val"}, time.Unix(1, 0)) @@ -1600,26 +1600,26 @@ cpu,host=serverA,region=us-east value=1.0 946730096789012345`, func TestNewPointEscaped(t *testing.T) { // commas - pt := models.MustNewPoint("cpu,main", models.Tags{"tag,bar": "value"}, models.Fields{"name,bar": 1.0}, time.Unix(0, 0)) + pt := models.MustNewPoint("cpu,main", models.NewTags(map[string]string{"tag,bar": "value"}), models.Fields{"name,bar": 1.0}, time.Unix(0, 0)) if exp := `cpu\,main,tag\,bar=value name\,bar=1 0`; pt.String() != exp { t.Errorf("NewPoint().String() mismatch.\ngot %v\nexp %v", pt.String(), exp) } // spaces - pt = models.MustNewPoint("cpu main", models.Tags{"tag bar": "value"}, models.Fields{"name bar": 1.0}, time.Unix(0, 0)) + pt = models.MustNewPoint("cpu main", models.NewTags(map[string]string{"tag bar": "value"}), 
models.Fields{"name bar": 1.0}, time.Unix(0, 0)) if exp := `cpu\ main,tag\ bar=value name\ bar=1 0`; pt.String() != exp { t.Errorf("NewPoint().String() mismatch.\ngot %v\nexp %v", pt.String(), exp) } // equals - pt = models.MustNewPoint("cpu=main", models.Tags{"tag=bar": "value=foo"}, models.Fields{"name=bar": 1.0}, time.Unix(0, 0)) + pt = models.MustNewPoint("cpu=main", models.NewTags(map[string]string{"tag=bar": "value=foo"}), models.Fields{"name=bar": 1.0}, time.Unix(0, 0)) if exp := `cpu=main,tag\=bar=value\=foo name\=bar=1 0`; pt.String() != exp { t.Errorf("NewPoint().String() mismatch.\ngot %v\nexp %v", pt.String(), exp) } } func TestNewPointWithoutField(t *testing.T) { - _, err := models.NewPoint("cpu", models.Tags{"tag": "bar"}, models.Fields{}, time.Unix(0, 0)) + _, err := models.NewPoint("cpu", models.NewTags(map[string]string{"tag": "bar"}), models.Fields{}, time.Unix(0, 0)) if err == nil { t.Fatalf(`NewPoint() expected error. got nil`) } @@ -1645,19 +1645,19 @@ func TestNewPointUnhandledType(t *testing.T) { } func TestMakeKeyEscaped(t *testing.T) { - if exp, got := `cpu\ load`, models.MakeKey([]byte(`cpu\ load`), models.Tags{}); string(got) != exp { + if exp, got := `cpu\ load`, models.MakeKey([]byte(`cpu\ load`), models.NewTags(map[string]string{})); string(got) != exp { t.Errorf("MakeKey() mismatch.\ngot %v\nexp %v", got, exp) } - if exp, got := `cpu\ load`, models.MakeKey([]byte(`cpu load`), models.Tags{}); string(got) != exp { + if exp, got := `cpu\ load`, models.MakeKey([]byte(`cpu load`), models.NewTags(map[string]string{})); string(got) != exp { t.Errorf("MakeKey() mismatch.\ngot %v\nexp %v", got, exp) } - if exp, got := `cpu\,load`, models.MakeKey([]byte(`cpu\,load`), models.Tags{}); string(got) != exp { + if exp, got := `cpu\,load`, models.MakeKey([]byte(`cpu\,load`), models.NewTags(map[string]string{})); string(got) != exp { t.Errorf("MakeKey() mismatch.\ngot %v\nexp %v", got, exp) } - if exp, got := `cpu\,load`, 
models.MakeKey([]byte(`cpu,load`), models.Tags{}); string(got) != exp { + if exp, got := `cpu\,load`, models.MakeKey([]byte(`cpu,load`), models.NewTags(map[string]string{})); string(got) != exp { t.Errorf("MakeKey() mismatch.\ngot %v\nexp %v", got, exp) } @@ -1866,7 +1866,7 @@ func TestNewPointsRejectsMaxKey(t *testing.T) { } func TestParseKeyEmpty(t *testing.T) { - if _, _, err := models.ParseKey(""); err != nil { + if _, _, err := models.ParseKey(nil); err != nil { t.Fatalf("unexpected error: %v", err) } } diff --git a/monitor/service.go b/monitor/service.go index 75fab16865..871d47f393 100644 --- a/monitor/service.go +++ b/monitor/service.go @@ -212,14 +212,13 @@ func (m *Monitor) Statistics(tags map[string]string) ([]*Statistic, error) { statistic := &Statistic{ Statistic: models.Statistic{ - Tags: make(map[string]string), Values: make(map[string]interface{}), }, } // Add any supplied tags. for k, v := range tags { - statistic.Tags[k] = v + statistic.Tags.SetString(k, v) } // Every other top-level expvar value is a map. @@ -242,7 +241,7 @@ func (m *Monitor) Statistics(tags map[string]string) ([]*Statistic, error) { if err != nil { return } - statistic.Tags[t.Key] = u + statistic.Tags.SetString(t.Key, u) }) case "values": // string-interface map. 
@@ -281,14 +280,13 @@ func (m *Monitor) Statistics(tags map[string]string) ([]*Statistic, error) { statistic := &Statistic{ Statistic: models.Statistic{ Name: "runtime", - Tags: make(map[string]string), Values: make(map[string]interface{}), }, } // Add any supplied tags to Go memstats for k, v := range tags { - statistic.Tags[k] = v + statistic.Tags.SetString(k, v) } var rt runtime.MemStats diff --git a/services/collectd/service.go b/services/collectd/service.go index 24111876aa..5937238f18 100644 --- a/services/collectd/service.go +++ b/services/collectd/service.go @@ -70,7 +70,7 @@ func NewService(c Config) *Service { Logger: log.New(os.Stderr, "[collectd] ", log.LstdFlags), err: make(chan error), stats: &Statistics{}, - statTags: map[string]string{"bind": c.BindAddress}, + statTags: models.NewTags(map[string]string{"bind": c.BindAddress}), } return &s @@ -360,8 +360,9 @@ func (s *Service) UnmarshalCollectd(packet *gollectd.Packet) []models.Point { if packet.TypeInstance != "" { tags["type_instance"] = packet.TypeInstance } - p, err := models.NewPoint(name, tags, fields, timestamp) + // Drop invalid points + p, err := models.NewPoint(name, models.NewTags(tags), fields, timestamp) if err != nil { s.Logger.Printf("Dropping point %v: %v", name, err) atomic.AddInt64(&s.stats.InvalidDroppedPoints, 1) diff --git a/services/continuous_querier/service.go b/services/continuous_querier/service.go index a51150e0b6..a3e6328e3e 100644 --- a/services/continuous_querier/service.go +++ b/services/continuous_querier/service.go @@ -147,7 +147,7 @@ type Statistics struct { func (s *Service) Statistics(tags map[string]string) []models.Statistic { return []models.Statistic{{ Name: "cq", - Tags: tags, + Tags: models.NewTags(tags), Values: map[string]interface{}{ statQueryOK: atomic.LoadInt64(&s.stats.QueryOK), statQueryFail: atomic.LoadInt64(&s.stats.QueryFail), diff --git a/services/graphite/config.go b/services/graphite/config.go index 9261c3edca..ace3f6d805 100644 --- 
a/services/graphite/config.go +++ b/services/graphite/config.go @@ -116,12 +116,12 @@ func (c *Config) WithDefaults() *Config { // DefaultTags returns the config's tags. func (c *Config) DefaultTags() models.Tags { - tags := models.Tags{} + m := make(map[string]string, len(c.Tags)) for _, t := range c.Tags { parts := strings.Split(t, "=") - tags[parts[0]] = parts[1] + m[parts[0]] = parts[1] } - return tags + return models.NewTags(m) } // Validate validates the config's templates and tags. diff --git a/services/graphite/parser.go b/services/graphite/parser.go index 6bd63cbb43..2e576b64e1 100644 --- a/services/graphite/parser.go +++ b/services/graphite/parser.go @@ -64,12 +64,12 @@ func NewParserWithOptions(options Options) (*Parser, error) { } // Parse out the default tags specific to this template - tags := models.Tags{} + var tags models.Tags if strings.Contains(parts[len(parts)-1], "=") { tagStrs := strings.Split(parts[len(parts)-1], ",") for _, kv := range tagStrs { parts := strings.Split(kv, "=") - tags[parts[0]] = parts[1] + tags.SetString(parts[0], parts[1]) } } @@ -151,12 +151,12 @@ func (p *Parser) Parse(line string) (models.Point, error) { } // Set the default tags on the point if they are not already set - for k, v := range p.tags { - if _, ok := tags[k]; !ok { - tags[k] = v + for _, t := range p.tags { + if _, ok := tags[string(t.Key)]; !ok { + tags[string(t.Key)] = string(t.Value) } } - return models.NewPoint(measurement, tags, fieldValues, timestamp) + return models.NewPoint(measurement, models.NewTags(tags), fieldValues, timestamp) } // ApplyTemplate extracts the template fields from the given line and @@ -171,9 +171,9 @@ func (p *Parser) ApplyTemplate(line string) (string, map[string]string, string, template := p.matcher.Match(fields[0]) name, tags, field, err := template.Apply(fields[0]) // Set the default tags on the point if they are not already set - for k, v := range p.tags { - if _, ok := tags[k]; !ok { - tags[k] = v + for _, t := range p.tags 
{ + if _, ok := tags[string(t.Key)]; !ok { + tags[string(t.Key)] = string(t.Value) } } return name, tags, field, err @@ -223,8 +223,8 @@ func (t *template) Apply(line string) (string, map[string]string, string, error) ) // Set any default tags - for k, v := range t.defaultTags { - tags[k] = append(tags[k], v) + for _, t := range t.defaultTags { + tags[string(t.Key)] = append(tags[string(t.Key)], string(t.Value)) } // See if an invalid combination has been specified in the template: diff --git a/services/graphite/parser_test.go b/services/graphite/parser_test.go index 0c5581d232..85f2713dba 100644 --- a/services/graphite/parser_test.go +++ b/services/graphite/parser_test.go @@ -261,7 +261,7 @@ func TestFilterMatchDefault(t *testing.T) { } exp := models.MustNewPoint("miss.servers.localhost.cpu_load", - models.Tags{}, + models.NewTags(map[string]string{}), models.Fields{"value": float64(11)}, time.Unix(1435077219, 0)) @@ -282,7 +282,7 @@ func TestFilterMatchMultipleMeasurement(t *testing.T) { } exp := models.MustNewPoint("cpu.cpu_load.10", - models.Tags{"host": "localhost"}, + models.NewTags(map[string]string{"host": "localhost"}), models.Fields{"value": float64(11)}, time.Unix(1435077219, 0)) @@ -306,7 +306,7 @@ func TestFilterMatchMultipleMeasurementSeparator(t *testing.T) { } exp := models.MustNewPoint("cpu_cpu_load_10", - models.Tags{"host": "localhost"}, + models.NewTags(map[string]string{"host": "localhost"}), models.Fields{"value": float64(11)}, time.Unix(1435077219, 0)) @@ -327,7 +327,7 @@ func TestFilterMatchSingle(t *testing.T) { } exp := models.MustNewPoint("cpu_load", - models.Tags{"host": "localhost"}, + models.NewTags(map[string]string{"host": "localhost"}), models.Fields{"value": float64(11)}, time.Unix(1435077219, 0)) @@ -348,7 +348,7 @@ func TestParseNoMatch(t *testing.T) { } exp := models.MustNewPoint("servers.localhost.memory.VmallocChunk", - models.Tags{}, + models.NewTags(map[string]string{}), models.Fields{"value": float64(11)}, 
time.Unix(1435077219, 0)) @@ -369,7 +369,7 @@ func TestFilterMatchWildcard(t *testing.T) { } exp := models.MustNewPoint("cpu_load", - models.Tags{"host": "localhost"}, + models.NewTags(map[string]string{"host": "localhost"}), models.Fields{"value": float64(11)}, time.Unix(1435077219, 0)) @@ -392,7 +392,7 @@ func TestFilterMatchExactBeforeWildcard(t *testing.T) { } exp := models.MustNewPoint("cpu_load", - models.Tags{"host": "localhost"}, + models.NewTags(map[string]string{"host": "localhost"}), models.Fields{"value": float64(11)}, time.Unix(1435077219, 0)) @@ -420,7 +420,7 @@ func TestFilterMatchMostLongestFilter(t *testing.T) { } exp := models.MustNewPoint("cpu_load", - models.Tags{"host": "localhost", "resource": "cpu"}, + models.NewTags(map[string]string{"host": "localhost", "resource": "cpu"}), models.Fields{"value": float64(11)}, time.Unix(1435077219, 0)) @@ -447,7 +447,7 @@ func TestFilterMatchMultipleWildcards(t *testing.T) { } exp := models.MustNewPoint("cpu_load", - models.Tags{"host": "server01"}, + models.NewTags(map[string]string{"host": "server01"}), models.Fields{"value": float64(11)}, time.Unix(1435077219, 0)) @@ -462,17 +462,17 @@ func TestFilterMatchMultipleWildcards(t *testing.T) { } func TestParseDefaultTags(t *testing.T) { - p, err := graphite.NewParser([]string{"servers.localhost .host.measurement*"}, models.Tags{ + p, err := graphite.NewParser([]string{"servers.localhost .host.measurement*"}, models.NewTags(map[string]string{ "region": "us-east", "zone": "1c", "host": "should not set", - }) + })) if err != nil { t.Fatalf("unexpected error creating parser, got %v", err) } exp := models.MustNewPoint("cpu_load", - models.Tags{"host": "localhost", "region": "us-east", "zone": "1c"}, + models.NewTags(map[string]string{"host": "localhost", "region": "us-east", "zone": "1c"}), models.Fields{"value": float64(11)}, time.Unix(1435077219, 0)) @@ -487,16 +487,16 @@ func TestParseDefaultTags(t *testing.T) { } func TestParseDefaultTemplateTags(t *testing.T) 
{ - p, err := graphite.NewParser([]string{"servers.localhost .host.measurement* zone=1c"}, models.Tags{ + p, err := graphite.NewParser([]string{"servers.localhost .host.measurement* zone=1c"}, models.NewTags(map[string]string{ "region": "us-east", "host": "should not set", - }) + })) if err != nil { t.Fatalf("unexpected error creating parser, got %v", err) } exp := models.MustNewPoint("cpu_load", - models.Tags{"host": "localhost", "region": "us-east", "zone": "1c"}, + models.NewTags(map[string]string{"host": "localhost", "region": "us-east", "zone": "1c"}), models.Fields{"value": float64(11)}, time.Unix(1435077219, 0)) @@ -511,16 +511,16 @@ func TestParseDefaultTemplateTags(t *testing.T) { } func TestParseDefaultTemplateTagsOverridGlobal(t *testing.T) { - p, err := graphite.NewParser([]string{"servers.localhost .host.measurement* zone=1c,region=us-east"}, models.Tags{ + p, err := graphite.NewParser([]string{"servers.localhost .host.measurement* zone=1c,region=us-east"}, models.NewTags(map[string]string{ "region": "shot not be set", "host": "should not set", - }) + })) if err != nil { t.Fatalf("unexpected error creating parser, got %v", err) } exp := models.MustNewPoint("cpu_load", - models.Tags{"host": "localhost", "region": "us-east", "zone": "1c"}, + models.NewTags(map[string]string{"host": "localhost", "region": "us-east", "zone": "1c"}), models.Fields{"value": float64(11)}, time.Unix(1435077219, 0)) @@ -535,16 +535,16 @@ func TestParseDefaultTemplateTagsOverridGlobal(t *testing.T) { } func TestParseTemplateWhitespace(t *testing.T) { - p, err := graphite.NewParser([]string{"servers.localhost .host.measurement* zone=1c"}, models.Tags{ + p, err := graphite.NewParser([]string{"servers.localhost .host.measurement* zone=1c"}, models.NewTags(map[string]string{ "region": "us-east", "host": "should not set", - }) + })) if err != nil { t.Fatalf("unexpected error creating parser, got %v", err) } exp := models.MustNewPoint("cpu_load", - models.Tags{"host": "localhost", 
"region": "us-east", "zone": "1c"}, + models.NewTags(map[string]string{"host": "localhost", "region": "us-east", "zone": "1c"}), models.Fields{"value": float64(11)}, time.Unix(1435077219, 0)) diff --git a/services/graphite/service.go b/services/graphite/service.go index ab9bf94d1d..51f559bff2 100644 --- a/services/graphite/service.go +++ b/services/graphite/service.go @@ -106,7 +106,7 @@ func NewService(c Config) (*Service, error) { batchTimeout: time.Duration(d.BatchTimeout), logger: log.New(os.Stderr, fmt.Sprintf("[graphite] %s ", d.BindAddress), log.LstdFlags), stats: &Statistics{}, - statTags: map[string]string{"proto": d.Protocol, "bind": d.BindAddress}, + statTags: models.NewTags(map[string]string{"proto": d.Protocol, "bind": d.BindAddress}), tcpConnections: make(map[string]*tcpConnection), done: make(chan struct{}), diagsKey: strings.Join([]string{"graphite", d.Protocol, d.BindAddress}, ":"), diff --git a/services/graphite/service_test.go b/services/graphite/service_test.go index e523ae00be..47b373d3a6 100644 --- a/services/graphite/service_test.go +++ b/services/graphite/service_test.go @@ -39,7 +39,7 @@ func Test_ServerGraphiteTCP(t *testing.T) { pt, _ := models.NewPoint( "cpu", - map[string]string{}, + models.NewTags(map[string]string{}), map[string]interface{}{"value": 23.456}, time.Unix(now.Unix(), 0)) @@ -115,7 +115,7 @@ func Test_ServerGraphiteUDP(t *testing.T) { pt, _ := models.NewPoint( "cpu", - map[string]string{}, + models.NewTags(map[string]string{}), map[string]interface{}{"value": 23.456}, time.Unix(now.Unix(), 0)) if database != "graphitedb" { diff --git a/services/httpd/handler.go b/services/httpd/handler.go index fe5de82280..8ce5493c4d 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -179,7 +179,7 @@ type Statistics struct { func (h *Handler) Statistics(tags map[string]string) []models.Statistic { return []models.Statistic{{ Name: "httpd", - Tags: tags, + Tags: models.NewTags(tags), Values: map[string]interface{}{ 
statRequest: atomic.LoadInt64(&h.stats.Requests), statCQRequest: atomic.LoadInt64(&h.stats.CQRequests), @@ -737,24 +737,24 @@ func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) { for _, s := range stats { // Very hackily create a unique key. buf := bytes.NewBufferString(s.Name) - if path, ok := s.Tags["path"]; ok { + if path := s.Tags.Get([]byte("path")); path != nil { fmt.Fprintf(buf, ":%s", path) - if id, ok := s.Tags["id"]; ok { + if id := s.Tags.Get([]byte("id")); id != nil { fmt.Fprintf(buf, ":%s", id) } - } else if bind, ok := s.Tags["bind"]; ok { - if proto, ok := s.Tags["proto"]; ok { + } else if bind := s.Tags.Get([]byte("bind")); bind != nil { + if proto := s.Tags.Get([]byte("proto")); proto != nil { fmt.Fprintf(buf, ":%s", proto) } fmt.Fprintf(buf, ":%s", bind) - } else if database, ok := s.Tags["database"]; ok { + } else if database := s.Tags.Get([]byte("database")); database != nil { fmt.Fprintf(buf, ":%s", database) - if rp, ok := s.Tags["retention_policy"]; ok { + if rp := s.Tags.Get([]byte("retention_policy")); rp != nil { fmt.Fprintf(buf, ":%s", rp) - if name, ok := s.Tags["name"]; ok { + if name := s.Tags.Get([]byte("name")); name != nil { fmt.Fprintf(buf, ":%s", name) } - if dest, ok := s.Tags["destination"]; ok { + if dest := s.Tags.Get([]byte("destination")); dest != nil { fmt.Fprintf(buf, ":%s", dest) } } diff --git a/services/httpd/response_writer.go b/services/httpd/response_writer.go index 8bdd2926eb..5f7845bc47 100644 --- a/services/httpd/response_writer.go +++ b/services/httpd/response_writer.go @@ -114,7 +114,7 @@ func (w *csvResponseWriter) WriteResponse(resp Response) (n int, err error) { for _, row := range result.Series { w.columns[0] = row.Name if len(row.Tags) > 0 { - w.columns[1] = string(models.Tags(row.Tags).HashKey()[1:]) + w.columns[1] = string(models.NewTags(row.Tags).HashKey()[1:]) } else { w.columns[1] = "" } diff --git a/services/httpd/service.go b/services/httpd/service.go index 4e608f8d79..064bbb588d 
100644 --- a/services/httpd/service.go +++ b/services/httpd/service.go @@ -192,7 +192,7 @@ func (s *Service) Addr() net.Addr { // Statistics returns statistics for periodic monitoring. func (s *Service) Statistics(tags map[string]string) []models.Statistic { - return s.Handler.Statistics(models.Tags{"bind": s.addr}.Merge(tags)) + return s.Handler.Statistics(models.NewTags(map[string]string{"bind": s.addr}).Merge(tags).Map()) } // serveTCP serves the handler from the TCP listener. diff --git a/services/opentsdb/handler.go b/services/opentsdb/handler.go index 892314a489..7b3ccb0e6f 100644 --- a/services/opentsdb/handler.go +++ b/services/opentsdb/handler.go @@ -111,7 +111,7 @@ func (h *Handler) servePut(w http.ResponseWriter, r *http.Request) { ts = time.Unix(p.Time/1000, (p.Time%1000)*1000) } - pt, err := models.NewPoint(p.Metric, p.Tags, map[string]interface{}{"value": p.Value}, ts) + pt, err := models.NewPoint(p.Metric, models.NewTags(p.Tags), map[string]interface{}{"value": p.Value}, ts) if err != nil { h.Logger.Printf("Dropping point %v: %v", p.Metric, err) if h.stats != nil { diff --git a/services/opentsdb/service.go b/services/opentsdb/service.go index 43508bebdc..8de8d83fd8 100644 --- a/services/opentsdb/service.go +++ b/services/opentsdb/service.go @@ -96,7 +96,7 @@ func NewService(c Config) (*Service, error) { Logger: log.New(os.Stderr, "[opentsdb] ", log.LstdFlags), LogPointErrors: d.LogPointErrors, stats: &Statistics{}, - statTags: map[string]string{"bind": d.BindAddress}, + statTags: models.NewTags(map[string]string{"bind": d.BindAddress}), } return s, nil } @@ -381,7 +381,7 @@ func (s *Service) handleTelnetConn(conn net.Conn) { } fields["value"] = fv - pt, err := models.NewPoint(measurement, tags, fields, t) + pt, err := models.NewPoint(measurement, models.NewTags(tags), fields, t) if err != nil { atomic.AddInt64(&s.stats.TelnetBadFloat, 1) if s.LogPointErrors { diff --git a/services/opentsdb/service_test.go b/services/opentsdb/service_test.go index 
5a4240a36a..d75920a959 100644 --- a/services/opentsdb/service_test.go +++ b/services/opentsdb/service_test.go @@ -39,7 +39,7 @@ func TestService_Telnet(t *testing.T) { } else if !reflect.DeepEqual(points, []models.Point{ models.MustNewPoint( "sys.cpu.user", - map[string]string{"host": "webserver01", "cpu": "0"}, + models.NewTags(map[string]string{"host": "webserver01", "cpu": "0"}), map[string]interface{}{"value": 42.5}, time.Unix(1356998400, 0), ), @@ -101,7 +101,7 @@ func TestService_HTTP(t *testing.T) { } else if !reflect.DeepEqual(points, []models.Point{ models.MustNewPoint( "sys.cpu.nice", - map[string]string{"dc": "lga", "host": "web01"}, + models.NewTags(map[string]string{"dc": "lga", "host": "web01"}), map[string]interface{}{"value": 18.0}, time.Unix(1346846400, 0), ), diff --git a/services/subscriber/service.go b/services/subscriber/service.go index fae321f2cf..962fdb56a6 100644 --- a/services/subscriber/service.go +++ b/services/subscriber/service.go @@ -130,7 +130,7 @@ type Statistics struct { func (s *Service) Statistics(tags map[string]string) []models.Statistic { statistics := []models.Statistic{{ Name: "subscriber", - Tags: tags, + Tags: models.NewTags(tags), Values: map[string]interface{}{ statPointsWritten: atomic.LoadInt64(&s.stats.PointsWritten), statWriteFailures: atomic.LoadInt64(&s.stats.WriteFailures), @@ -415,13 +415,13 @@ func (b *balancewriter) WritePoints(p *coordinator.WritePointsRequest) error { // Statistics returns statistics for periodic monitoring. 
func (b *balancewriter) Statistics(tags map[string]string) []models.Statistic { - tags = models.Tags(tags).Merge(b.tags) + tags = models.NewTags(tags).Merge(b.tags).Map() statistics := make([]models.Statistic, len(b.stats)) for i := range b.stats { statistics[i] = models.Statistic{ Name: "subscriber", - Tags: models.Tags(tags).Merge(map[string]string{"destination": b.stats[i].dest}), + Tags: models.NewTags(tags).Merge(map[string]string{"destination": b.stats[i].dest}), Values: map[string]interface{}{ statPointsWritten: atomic.LoadInt64(&b.stats[i].pointsWritten), statWriteFailures: atomic.LoadInt64(&b.stats[i].failures), diff --git a/services/udp/service.go b/services/udp/service.go index e94c156118..9d0628754e 100644 --- a/services/udp/service.go +++ b/services/udp/service.go @@ -71,7 +71,7 @@ func NewService(c Config) *Service { batcher: tsdb.NewPointBatcher(d.BatchSize, d.BatchPending, time.Duration(d.BatchTimeout)), Logger: log.New(os.Stderr, "[udp] ", log.LstdFlags), stats: &Statistics{}, - statTags: map[string]string{"bind": d.BindAddress}, + statTags: models.NewTags(map[string]string{"bind": d.BindAddress}), } } diff --git a/stress/v2/stress_client/commune_test.go b/stress/v2/stress_client/commune_test.go index d28f1febd6..c392ea14d7 100644 --- a/stress/v2/stress_client/commune_test.go +++ b/stress/v2/stress_client/commune_test.go @@ -12,8 +12,8 @@ func TestCommunePoint(t *testing.T) { if point.Name() != "write" { t.Errorf("expected: write\ngot: %v", point.Name()) } - if point.Tags()["tag"] != "tagVal" { - t.Errorf("expected: tagVal\ngot: %v", point.Tags()["tag"]) + if point.Tags().GetString("tag") != "tagVal" { + t.Errorf("expected: tagVal\ngot: %v", point.Tags().GetString("tag")) } if int(point.Fields()["fooField"].(float64)) != 5 { t.Errorf("expected: 5\ngot: %v\n", point.Fields()["fooField"]) @@ -24,8 +24,8 @@ func TestCommunePoint(t *testing.T) { if point.Name() != "write" { t.Errorf("expected: write\ngot: %v", point.Name()) } - if point.Tags()["tag"] 
!= "tagVal" { - t.Errorf("expected: tagVal\ngot: %v", point.Tags()["tag"]) + if point.Tags().GetString("tag") != "tagVal" { + t.Errorf("expected: tagVal\ngot: %v", point.Tags().GetString("tag")) } if int(point.Fields()["fooField"].(float64)) != 5 { t.Errorf("expected: 5\ngot: %v\n", point.Fields()["fooField"]) @@ -40,8 +40,8 @@ func TestSetCommune(t *testing.T) { if pt.Name() != "write" { t.Errorf("expected: write\ngot: %v", pt.Name()) } - if pt.Tags()["tag"] != "tagVal" { - t.Errorf("expected: tagVal\ngot: %v", pt.Tags()["tag"]) + if pt.Tags().GetString("tag") != "tagVal" { + t.Errorf("expected: tagVal\ngot: %v", pt.Tags().GetString("tag")) } if int(pt.Fields()["fooField"].(float64)) != 5 { t.Errorf("expected: 5\ngot: %v\n", pt.Fields()["fooField"]) diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go index fe75017948..83d56d4bf9 100644 --- a/tsdb/engine/tsm1/cache.go +++ b/tsdb/engine/tsm1/cache.go @@ -171,7 +171,7 @@ type CacheStatistics struct { func (c *Cache) Statistics(tags map[string]string) []models.Statistic { return []models.Statistic{{ Name: "tsm1_cache", - Tags: tags, + Tags: models.NewTags(tags), Values: map[string]interface{}{ statCacheMemoryBytes: atomic.LoadInt64(&c.stats.MemSizeBytes), statCacheDiskBytes: atomic.LoadInt64(&c.stats.DiskSizeBytes), diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index d1235c088d..197a0e2aa1 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -2,6 +2,7 @@ package tsm1 // import "github.com/influxdata/influxdb/tsdb/engine/tsm1" import ( "archive/tar" + "bytes" "fmt" "io" "io/ioutil" @@ -248,7 +249,7 @@ func (e *Engine) Statistics(tags map[string]string) []models.Statistic { statistics := make([]models.Statistic, 0, 4) statistics = append(statistics, models.Statistic{ Name: "tsm1_engine", - Tags: tags, + Tags: models.NewTags(tags), Values: map[string]interface{}{ statCacheCompactions: atomic.LoadInt64(&e.stats.CacheCompactions), statCacheCompactionDuration: 
atomic.LoadInt64(&e.stats.CacheCompactionDuration), @@ -338,8 +339,9 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index *tsdb.DatabaseIndex) er // Save reference to index for iterator creation. e.index = index + e.FileStore.dereferencer = index - if err := e.FileStore.WalkKeys(func(key string, typ byte) error { + if err := e.FileStore.WalkKeys(func(key []byte, typ byte) error { fieldType, err := tsmFieldTypeToInfluxQLDataType(typ) if err != nil { return err @@ -365,7 +367,7 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index *tsdb.DatabaseIndex) er continue } - if err := e.addToIndexFromKey(shardID, key, fieldType, index); err != nil { + if err := e.addToIndexFromKey(shardID, []byte(key), fieldType, index); err != nil { return err } } @@ -522,9 +524,9 @@ func (e *Engine) readFileFromBackup(tr *tar.Reader, shardRelativePath string) er // addToIndexFromKey will pull the measurement name, series key, and field name from a composite key and add it to the // database index and measurement fields -func (e *Engine) addToIndexFromKey(shardID uint64, key string, fieldType influxql.DataType, index *tsdb.DatabaseIndex) error { +func (e *Engine) addToIndexFromKey(shardID uint64, key []byte, fieldType influxql.DataType, index *tsdb.DatabaseIndex) error { seriesKey, field := SeriesAndFieldFromCompositeKey(key) - measurement := tsdb.MeasurementFromSeriesKey(seriesKey) + measurement := tsdb.MeasurementFromSeriesKey(string(seriesKey)) m := index.CreateMeasurementIndexIfNotExists(measurement) m.SetFieldName(field) @@ -540,7 +542,7 @@ func (e *Engine) addToIndexFromKey(shardID uint64, key string, fieldType influxq } // Have we already indexed this series? 
- ss := index.Series(seriesKey) + ss := index.SeriesBytes(seriesKey) if ss != nil { // Add this shard to the existing series ss.AssignShard(shardID) @@ -551,7 +553,7 @@ func (e *Engine) addToIndexFromKey(shardID uint64, key string, fieldType influxq // fields (in line protocol format) in the series key _, tags, _ := models.ParseKey(seriesKey) - s := tsdb.NewSeries(seriesKey, tags) + s := tsdb.NewSeries(string(seriesKey), tags) index.CreateSeriesIndexIfNotExists(measurement, s) s.AssignShard(shardID) @@ -593,14 +595,14 @@ func (e *Engine) ContainsSeries(keys []string) (map[string]bool, error) { } for _, k := range e.Cache.Keys() { - seriesKey, _ := SeriesAndFieldFromCompositeKey(k) - keyMap[seriesKey] = true + seriesKey, _ := SeriesAndFieldFromCompositeKey([]byte(k)) + keyMap[string(seriesKey)] = true } - if err := e.FileStore.WalkKeys(func(k string, _ byte) error { + if err := e.FileStore.WalkKeys(func(k []byte, _ byte) error { seriesKey, _ := SeriesAndFieldFromCompositeKey(k) - if _, ok := keyMap[seriesKey]; ok { - keyMap[seriesKey] = true + if _, ok := keyMap[string(seriesKey)]; ok { + keyMap[string(seriesKey)] = true } return nil }); err != nil { @@ -638,14 +640,14 @@ func (e *Engine) DeleteSeriesRange(seriesKeys []string, min, max int64) error { deleteKeys := make([]string, 0, len(seriesKeys)) // go through the keys in the file store - if err := e.FileStore.WalkKeys(func(k string, _ byte) error { + if err := e.FileStore.WalkKeys(func(k []byte, _ byte) error { seriesKey, _ := SeriesAndFieldFromCompositeKey(k) // Keep track if we've added this key since WalkKeys can return keys // we've seen before - if v, ok := keyMap[seriesKey]; ok && v == 0 { - deleteKeys = append(deleteKeys, k) - keyMap[seriesKey] += 1 + if v, ok := keyMap[string(seriesKey)]; ok && v == 0 { + deleteKeys = append(deleteKeys, string(k)) + keyMap[string(seriesKey)] += 1 } return nil }); err != nil { @@ -665,8 +667,8 @@ func (e *Engine) DeleteSeriesRange(seriesKeys []string, min, max int64) error 
{ e.Cache.RLock() s := e.Cache.Store() for k, _ := range s { - seriesKey, _ := SeriesAndFieldFromCompositeKey(k) - if _, ok := keyMap[seriesKey]; ok { + seriesKey, _ := SeriesAndFieldFromCompositeKey([]byte(k)) + if _, ok := keyMap[string(seriesKey)]; ok { walKeys = append(walKeys, k) } } @@ -1277,7 +1279,7 @@ func (e *Engine) createTagSetGroupIterators(ref *influxql.VarRef, mm *tsdb.Measu // createVarRefSeriesIterator creates an iterator for a variable reference for a series. func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, mm *tsdb.Measurement, seriesKey string, t *influxql.TagSet, filter influxql.Expr, conditionFields []influxql.VarRef, opt influxql.IteratorOptions) (influxql.Iterator, error) { - tags := influxql.NewTags(e.index.TagsForSeries(seriesKey)) + tags := influxql.NewTags(e.index.TagsForSeries(seriesKey).Map()) // Create options specific for this series. itrOpt := opt @@ -1496,11 +1498,11 @@ func tsmFieldTypeToInfluxQLDataType(typ byte) (influxql.DataType, error) { } } -func SeriesAndFieldFromCompositeKey(key string) (string, string) { - sep := strings.Index(key, keyFieldSeparator) +func SeriesAndFieldFromCompositeKey(key []byte) ([]byte, string) { + sep := bytes.Index(key, []byte(keyFieldSeparator)) if sep == -1 { // No field??? return key, "" } - return key[:sep], key[sep+len(keyFieldSeparator):] + return key[:sep], string(key[sep+len(keyFieldSeparator):]) } diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index da12c0da65..012ddc6282 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -44,7 +44,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) { // Verify index is correct. 
if m := index.Measurement("cpu"); m == nil { t.Fatal("measurement not found") - } else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, map[string]string{"host": "A"}) { + } else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, models.NewTags(map[string]string{"host": "A"})) { t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags) } @@ -67,7 +67,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) { // Verify index is correct. if m := index.Measurement("cpu"); m == nil { t.Fatal("measurement not found") - } else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, map[string]string{"host": "A"}) { + } else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, models.NewTags(map[string]string{"host": "A"})) { t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags) } @@ -92,9 +92,9 @@ func TestEngine_LoadMetadataIndex(t *testing.T) { // Verify index is correct. if m := index.Measurement("cpu"); m == nil { t.Fatal("measurement not found") - } else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, map[string]string{"host": "A"}) { + } else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, models.NewTags(map[string]string{"host": "A"})) { t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags) - } else if s := m.SeriesByID(2); s.Key != "cpu,host=B" || !reflect.DeepEqual(s.Tags, map[string]string{"host": "B"}) { + } else if s := m.SeriesByID(2); s.Key != "cpu,host=B" || !reflect.DeepEqual(s.Tags, models.NewTags(map[string]string{"host": "B"})) { t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags) } } @@ -222,7 +222,7 @@ func TestEngine_CreateIterator_Cache_Ascending(t *testing.T) { e.Index().CreateMeasurementIndexIfNotExists("cpu") e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false) - e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", map[string]string{"host": 
"A"})) + e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) if err := e.WritePointsString( `cpu,host=A value=1.1 1000000000`, `cpu,host=A value=1.2 2000000000`, @@ -275,7 +275,7 @@ func TestEngine_CreateIterator_Cache_Descending(t *testing.T) { e.Index().CreateMeasurementIndexIfNotExists("cpu") e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false) - e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", map[string]string{"host": "A"})) + e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) if err := e.WritePointsString( `cpu,host=A value=1.1 1000000000`, `cpu,host=A value=1.2 2000000000`, @@ -328,7 +328,7 @@ func TestEngine_CreateIterator_TSM_Ascending(t *testing.T) { e.Index().CreateMeasurementIndexIfNotExists("cpu") e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false) - e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", map[string]string{"host": "A"})) + e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) if err := e.WritePointsString( `cpu,host=A value=1.1 1000000000`, `cpu,host=A value=1.2 2000000000`, @@ -382,7 +382,7 @@ func TestEngine_CreateIterator_TSM_Descending(t *testing.T) { e.Index().CreateMeasurementIndexIfNotExists("cpu") e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false) - e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", map[string]string{"host": "A"})) + e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) if err := e.WritePointsString( `cpu,host=A value=1.1 1000000000`, `cpu,host=A value=1.2 2000000000`, @@ -437,7 +437,7 @@ func TestEngine_CreateIterator_Aux(t *testing.T) { 
e.Index().CreateMeasurementIndexIfNotExists("cpu") e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false) e.MeasurementFields("cpu").CreateFieldIfNotExists("F", influxql.Float, false) - e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", map[string]string{"host": "A"})) + e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) if err := e.WritePointsString( `cpu,host=A value=1.1 1000000000`, `cpu,host=A F=100 1000000000`, @@ -497,7 +497,7 @@ func TestEngine_CreateIterator_Condition(t *testing.T) { e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false) e.MeasurementFields("cpu").CreateFieldIfNotExists("X", influxql.Float, false) e.MeasurementFields("cpu").CreateFieldIfNotExists("Y", influxql.Float, false) - e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", map[string]string{"host": "A"})) + e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) if err := e.WritePointsString( `cpu,host=A value=1.1 1000000000`, `cpu,host=A X=10 1000000000`, @@ -663,7 +663,7 @@ func MustInitBenchmarkEngine(pointN int) *Engine { // Initialize metadata. e.Index().CreateMeasurementIndexIfNotExists("cpu") e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false) - e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", map[string]string{"host": "A"})) + e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"}))) // Generate time ascending points with jitterred time & value. 
rand := rand.New(rand.NewSource(0)) diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index ccc0866839..3281168806 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -58,7 +58,7 @@ type TSMFile interface { KeyCount() int // KeyAt returns the key located at index position idx - KeyAt(idx int) (string, byte) + KeyAt(idx int) ([]byte, byte) // Type returns the block type of the values stored for the key. Returns one of // BlockFloat64, BlockInt64, BlockBoolean, BlockString. If key does not exist, @@ -106,6 +106,13 @@ type TSMFile interface { // BlockIterator returns an iterator pointing to the first block in the file and // allows sequential iteration to each every block. BlockIterator() *BlockIterator + + // Removes mmap references held by another object. + deref(dereferencer) +} + +type dereferencer interface { + Dereference([]byte) } // Statistics gathered by the FileStore. @@ -132,6 +139,8 @@ type FileStore struct { purger *purger currentTempDirID int + + dereferencer dereferencer } type FileStat struct { @@ -157,7 +166,7 @@ func (f FileStat) ContainsKey(key string) bool { func NewFileStore(dir string) *FileStore { logger := log.New(os.Stderr, "[filestore] ", log.LstdFlags) - return &FileStore{ + fs := &FileStore{ dir: dir, lastModified: time.Now(), logger: logger, @@ -169,6 +178,8 @@ func NewFileStore(dir string) *FileStore { logger: logger, }, } + fs.purger.fileStore = fs + return fs } // enableTraceLogging must be called before the FileStore is opened. 
@@ -204,7 +215,7 @@ type FileStoreStatistics struct { func (f *FileStore) Statistics(tags map[string]string) []models.Statistic { return []models.Statistic{{ Name: "tsm1_filestore", - Tags: tags, + Tags: models.NewTags(tags), Values: map[string]interface{}{ statFileStoreBytes: atomic.LoadInt64(&f.stats.DiskBytes), statFileStoreCount: atomic.LoadInt64(&f.stats.FileCount), @@ -281,7 +292,7 @@ func (f *FileStore) Remove(paths ...string) { // WalkKeys calls fn for every key in every TSM file known to the FileStore. If the key // exists in multiple files, it will be invoked for each file. -func (f *FileStore) WalkKeys(fn func(key string, typ byte) error) error { +func (f *FileStore) WalkKeys(fn func(key []byte, typ byte) error) error { f.mu.RLock() defer f.mu.RUnlock() @@ -305,7 +316,7 @@ func (f *FileStore) Keys() map[string]byte { for _, f := range f.files { for i := 0; i < f.KeyCount(); i++ { key, typ := f.KeyAt(i) - uniqueKeys[key] = typ + uniqueKeys[string(key)] = typ } } @@ -556,6 +567,11 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error { continue } + // Remove any mmap references held by the index. + if f.dereferencer != nil { + file.deref(f.dereferencer) + } + if err := file.Close(); err != nil { return err } @@ -1088,9 +1104,10 @@ func (c *KeyCursor) filterBooleanValues(tombstones []TimeRange, values BooleanVa } type purger struct { - mu sync.RWMutex - files map[string]TSMFile - running bool + mu sync.RWMutex + fileStore *FileStore + files map[string]TSMFile + running bool logger *log.Logger } @@ -1118,6 +1135,11 @@ func (p *purger) purge() { p.mu.Lock() for k, v := range p.files { if !v.InUse() { + // Remove any mmap references held by the index. 
+ if p.fileStore.dereferencer != nil { + v.deref(p.fileStore.dereferencer) + } + if err := v.Close(); err != nil { p.logger.Printf("purge: close file: %v", err) continue diff --git a/tsdb/engine/tsm1/reader.go b/tsdb/engine/tsm1/reader.go index ddeca7df12..0edddf2bac 100644 --- a/tsdb/engine/tsm1/reader.go +++ b/tsdb/engine/tsm1/reader.go @@ -68,7 +68,7 @@ type TSMIndex interface { Key(index int) (string, []IndexEntry) // KeyAt returns the key in the index at the given postion. - KeyAt(index int) (string, byte) + KeyAt(index int) ([]byte, byte) // KeyCount returns the count of unique keys in the index. KeyCount() int @@ -116,7 +116,7 @@ func (b *BlockIterator) PeekNext() string { return b.key } else if b.n-b.i > 1 { key, _ := b.r.KeyAt(b.i + 1) - return key + return string(key) } return "" } @@ -243,7 +243,7 @@ func (t *TSMReader) Key(index int) (string, []IndexEntry) { } // KeyAt returns the key and key type at position idx in the index. -func (t *TSMReader) KeyAt(idx int) (string, byte) { +func (t *TSMReader) KeyAt(idx int) ([]byte, byte) { return t.index.KeyAt(idx) } @@ -489,6 +489,13 @@ func (t *TSMReader) BlockIterator() *BlockIterator { } } +// deref removes mmap references held by another object. +func (t *TSMReader) deref(d dereferencer) { + if acc, ok := t.accessor.(*mmapAccessor); ok { + d.Dereference(acc.b) + } +} + // indirectIndex is a TSMIndex that uses a raw byte slice representation of an index. This // implementation can be used for indexes that may be MMAPed into memory. 
type indirectIndex struct { @@ -667,15 +674,15 @@ func (d *indirectIndex) Key(idx int) (string, []IndexEntry) { return string(key), entries.entries } -func (d *indirectIndex) KeyAt(idx int) (string, byte) { +func (d *indirectIndex) KeyAt(idx int) ([]byte, byte) { d.mu.RLock() defer d.mu.RUnlock() if idx < 0 || idx >= len(d.offsets) { - return "", 0 + return nil, 0 } n, key, _ := readKey(d.b[d.offsets[idx]:]) - return string(key), d.b[d.offsets[idx]+int32(n)] + return key, d.b[d.offsets[idx]+int32(n)] } func (d *indirectIndex) KeyCount() int { diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go index aa21116e49..0d4925d238 100644 --- a/tsdb/engine/tsm1/wal.go +++ b/tsdb/engine/tsm1/wal.go @@ -144,7 +144,7 @@ type WALStatistics struct { func (l *WAL) Statistics(tags map[string]string) []models.Statistic { return []models.Statistic{{ Name: "tsm1_wal", - Tags: tags, + Tags: models.NewTags(tags), Values: map[string]interface{}{ statWALOldBytes: atomic.LoadInt64(&l.stats.OldBytes), statWALCurrentBytes: atomic.LoadInt64(&l.stats.CurrentBytes), diff --git a/tsdb/meta.go b/tsdb/meta.go index 638997ce08..54ce5e2d16 100644 --- a/tsdb/meta.go +++ b/tsdb/meta.go @@ -6,6 +6,7 @@ import ( "sort" "sync" "sync/atomic" + "unsafe" "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" @@ -56,7 +57,7 @@ type IndexStatistics struct { func (d *DatabaseIndex) Statistics(tags map[string]string) []models.Statistic { return []models.Statistic{{ Name: "database", - Tags: models.Tags(map[string]string{"database": d.name}).Merge(tags), + Tags: models.NewTags(map[string]string{"database": d.name}).Merge(tags), Values: map[string]interface{}{ statDatabaseSeries: atomic.LoadInt64(&d.stats.NumSeries), statDatabaseMeasurements: atomic.LoadInt64(&d.stats.NumMeasurements), @@ -72,6 +73,14 @@ func (d *DatabaseIndex) Series(key string) *Series { return s } +// SeriesBytes returns a series by key. 
+func (d *DatabaseIndex) SeriesBytes(key []byte) *Series { + d.mu.RLock() + s := d.series[string(key)] + d.mu.RUnlock() + return s +} + func (d *DatabaseIndex) SeriesKeys() []string { d.mu.RLock() s := make([]string, 0, len(d.series)) @@ -249,7 +258,7 @@ func (d *DatabaseIndex) RemoveShard(shardID uint64) { } // TagsForSeries returns the tag map for the passed in series -func (d *DatabaseIndex) TagsForSeries(key string) map[string]string { +func (d *DatabaseIndex) TagsForSeries(key string) models.Tags { d.mu.RLock() defer d.mu.RUnlock() @@ -515,6 +524,16 @@ func (d *DatabaseIndex) DropSeries(keys []string) { atomic.AddInt64(&d.stats.NumSeries, -nDeleted) } +// Dereference removes all references to data within b and moves them to the heap. +func (d *DatabaseIndex) Dereference(b []byte) { + d.mu.RLock() + defer d.mu.RUnlock() + + for _, s := range d.series { + s.Dereference(b) + } +} + // Measurement represents a collection of time series in a database. It also contains in memory // structures for indexing tags. 
Exported functions are goroutine safe while un-exported functions // assume the caller will use the appropriate locks @@ -647,13 +666,13 @@ func (m *Measurement) AddSeries(s *Series) bool { } // add this series id to the tag index on the measurement - for k, v := range s.Tags { - valueMap := m.seriesByTagKeyValue[k] + for _, t := range s.Tags { + valueMap := m.seriesByTagKeyValue[string(t.Key)] if valueMap == nil { valueMap = make(map[string]SeriesIDs) - m.seriesByTagKeyValue[k] = valueMap + m.seriesByTagKeyValue[string(t.Key)] = valueMap } - ids := valueMap[v] + ids := valueMap[string(t.Value)] ids = append(ids, s.id) // most of the time the series ID will be higher than all others because it's a new @@ -661,7 +680,7 @@ func (m *Measurement) AddSeries(s *Series) bool { if len(ids) > 1 && ids[len(ids)-1] < ids[len(ids)-2] { sort.Sort(ids) } - valueMap[v] = ids + valueMap[string(t.Value)] = ids } return true @@ -683,19 +702,19 @@ func (m *Measurement) DropSeries(series *Series) { // remove this series id from the tag index on the measurement // s.seriesByTagKeyValue is defined as map[string]map[string]SeriesIDs - for k, v := range series.Tags { - values := m.seriesByTagKeyValue[k][v] + for _, t := range series.Tags { + values := m.seriesByTagKeyValue[string(t.Key)][string(t.Value)] ids := filter(values, seriesID) // Check to see if we have any ids, if not, remove the key if len(ids) == 0 { - delete(m.seriesByTagKeyValue[k], v) + delete(m.seriesByTagKeyValue[string(t.Key)], string(t.Value)) } else { - m.seriesByTagKeyValue[k][v] = ids + m.seriesByTagKeyValue[string(t.Key)][string(t.Value)] = ids } // If we have no values, then we delete the key - if len(m.seriesByTagKeyValue[k]) == 0 { - delete(m.seriesByTagKeyValue, k) + if len(m.seriesByTagKeyValue[string(t.Key)]) == 0 { + delete(m.seriesByTagKeyValue, string(t.Key)) } } @@ -758,7 +777,7 @@ func (m *Measurement) TagSets(dimensions []string, condition influxql.Expr) ([]* // Build the TagSet for this series. 
for _, dim := range dimensions { - tags[dim] = s.Tags[dim] + tags[dim] = s.Tags.GetString(dim) } // Convert the TagSet to a string, so it can be added to a map allowing TagSets to be handled @@ -1423,40 +1442,52 @@ func (a Measurements) union(other Measurements) Measurements { type Series struct { mu sync.RWMutex Key string - Tags map[string]string + Tags models.Tags id uint64 measurement *Measurement - shardIDs map[uint64]bool // shards that have this series defined + shardIDs []uint64 // shards that have this series defined } // NewSeries returns an initialized series struct -func NewSeries(key string, tags map[string]string) *Series { +func NewSeries(key string, tags models.Tags) *Series { return &Series{ - Key: key, - Tags: tags, - shardIDs: make(map[uint64]bool), + Key: key, + Tags: tags, } } func (s *Series) AssignShard(shardID uint64) { s.mu.Lock() - s.shardIDs[shardID] = true + if !s.assigned(shardID) { + s.shardIDs = append(s.shardIDs, shardID) + sort.Sort(uint64Slice(s.shardIDs)) + } s.mu.Unlock() } func (s *Series) UnassignShard(shardID uint64) { s.mu.Lock() - delete(s.shardIDs, shardID) + for i, v := range s.shardIDs { + if v == shardID { + s.shardIDs = append(s.shardIDs[:i], s.shardIDs[i+1:]...) + break + } + } s.mu.Unlock() } func (s *Series) Assigned(shardID uint64) bool { s.mu.RLock() - b := s.shardIDs[shardID] + b := s.assigned(shardID) s.mu.RUnlock() return b } +func (s *Series) assigned(shardID uint64) bool { + i := sort.Search(len(s.shardIDs), func(i int) bool { return s.shardIDs[i] >= shardID }) + return i < len(s.shardIDs) && s.shardIDs[i] == shardID +} + func (s *Series) ShardN() int { s.mu.RLock() n := len(s.shardIDs) @@ -1464,6 +1495,36 @@ func (s *Series) ShardN() int { return n } +// Dereference removes references to a byte slice. 
+func (s *Series) Dereference(b []byte) {
+	s.mu.Lock()
+
+	min := uintptr(unsafe.Pointer(&b[0]))
+	max := min + uintptr(len(b))
+
+	for i := range s.Tags {
+		deref(&s.Tags[i].Key, min, max)
+		deref(&s.Tags[i].Value, min, max)
+	}
+
+	s.mu.Unlock()
+}
+
+func deref(v *[]byte, min, max uintptr) {
+	vv := *v
+
+	// Ignore empty values and values that are not within range.
+	if len(vv) == 0 || uintptr(unsafe.Pointer(&vv[0])) < min ||
+		uintptr(unsafe.Pointer(&vv[0])) >= max {
+		return
+	}
+
+	// Otherwise copy to the heap.
+	buf := make([]byte, len(vv))
+	copy(buf, vv)
+	*v = buf
+}
+
 // MarshalBinary encodes the object to a binary format.
 func (s *Series) MarshalBinary() ([]byte, error) {
 	s.mu.RLock()
@@ -1471,10 +1532,8 @@
 	var pb internal.Series
 	pb.Key = &s.Key
-	for k, v := range s.Tags {
-		key := k
-		value := v
-		pb.Tags = append(pb.Tags, &internal.Tag{Key: &key, Value: &value})
+	for _, t := range s.Tags {
+		pb.Tags = append(pb.Tags, &internal.Tag{Key: proto.String(string(t.Key)), Value: proto.String(string(t.Value))})
 	}
 	return proto.Marshal(&pb)
 }
@@ -1489,9 +1548,9 @@ func (s *Series) UnmarshalBinary(buf []byte) error {
 		return err
 	}
 	s.Key = pb.GetKey()
-	s.Tags = make(map[string]string, len(pb.Tags))
-	for _, t := range pb.Tags {
-		s.Tags[t.GetKey()] = t.GetValue()
+	s.Tags = make(models.Tags, len(pb.Tags))
+	for i, t := range pb.Tags {
+		s.Tags[i] = models.Tag{Key: []byte(t.GetKey()), Value: []byte(t.GetValue())}
 	}
 	return nil
 }
@@ -1721,7 +1780,7 @@ func (m *Measurement) tagValuesByKeyAndSeriesID(tagKeys []string, ids SeriesIDs)
 		// Iterate the tag keys we're interested in and collect values
 		// from this series, if they exist.
 		for _, tagKey := range tagKeys {
-			if tagVal, ok := s.Tags[tagKey]; ok {
+			if tagVal := s.Tags.GetString(tagKey); tagVal != "" {
 				if _, ok = tagValues[tagKey]; !ok {
 					tagValues[tagKey] = newStringSet()
 				}
@@ -1810,6 +1869,12 @@ func filter(a []uint64, v uint64) []uint64 {
 // contains a measurement name.
func MeasurementFromSeriesKey(key string) string { // Ignoring the error because the func returns "missing fields" - k, _, _ := models.ParseKey(key) + k, _, _ := models.ParseKey([]byte(key)) return escape.UnescapeString(k) } + +type uint64Slice []uint64 + +func (a uint64Slice) Len() int { return len(a) } +func (a uint64Slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a uint64Slice) Less(i, j int) bool { return a[i] < a[j] } diff --git a/tsdb/meta_test.go b/tsdb/meta_test.go index df6bec89d6..e4f54d3499 100644 --- a/tsdb/meta_test.go +++ b/tsdb/meta_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/tsdb" ) @@ -182,7 +183,7 @@ func genTestSeries(mCnt, tCnt, vCnt int) []*TestSeries { for _, ts := range tagSets { series = append(series, &TestSeries{ Measurement: m, - Series: tsdb.NewSeries(fmt.Sprintf("%s:%s", m, string(tsdb.MarshalTags(ts))), ts), + Series: tsdb.NewSeries(fmt.Sprintf("%s:%s", m, string(tsdb.MarshalTags(ts))), models.NewTags(ts)), }) } } diff --git a/tsdb/shard.go b/tsdb/shard.go index 5882a03e41..406c559079 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -8,6 +8,7 @@ import ( "os" "path/filepath" "sort" + "strings" "sync" "sync/atomic" "time" @@ -117,12 +118,12 @@ func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, opti closing: make(chan struct{}), stats: &ShardStatistics{}, - statTags: map[string]string{ + statTags: models.NewTags(map[string]string{ "path": path, "id": fmt.Sprintf("%d", id), "database": db, "retentionPolicy": rp, - }, + }), database: db, retentionPolicy: rp, @@ -177,10 +178,10 @@ func (s *Shard) Statistics(tags map[string]string) []models.Statistic { return nil } - tags = s.statTags.Merge(tags) + tags = s.statTags.Merge(tags).Map() statistics := []models.Statistic{{ Name: "shard", - Tags: models.Tags(tags).Merge(map[string]string{"engine": s.options.EngineVersion}), + Tags: 
models.NewTags(tags).Merge(map[string]string{"engine": s.options.EngineVersion}), Values: map[string]interface{}{ statWriteReq: atomic.LoadInt64(&s.stats.WriteReq), statSeriesCreate: atomic.LoadInt64(&s.stats.SeriesCreated), @@ -449,9 +450,9 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]*FieldCreate, for _, p := range points { // verify the tags and fields tags := p.Tags() - if _, ok := tags["time"]; ok { + if v := tags.Get([]byte("time")); v != nil { s.logger.Printf("dropping tag 'time' from '%s'\n", p.PrecisionString("")) - delete(tags, "time") + tags.Delete([]byte("time")) p.SetTags(tags) } @@ -1052,6 +1053,145 @@ func NewTagKeysIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Itera return newMeasurementKeysIterator(sh, fn, opt) } +// tagValuesIterator emits key/tag values +type tagValuesIterator struct { + series []*Series // remaining series + keys []string // tag keys to select from a series + fields []string // fields to emit (key or value) + buf struct { + s *Series // current series + keys []string // current tag's keys + } +} + +// NewTagValuesIterator returns a new instance of TagValuesIterator. +func NewTagValuesIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) { + if opt.Condition == nil { + return nil, errors.New("a condition is required") + } + + measurementExpr := influxql.CloneExpr(opt.Condition) + measurementExpr = influxql.Reduce(influxql.RewriteExpr(measurementExpr, func(e influxql.Expr) influxql.Expr { + switch e := e.(type) { + case *influxql.BinaryExpr: + switch e.Op { + case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: + tag, ok := e.LHS.(*influxql.VarRef) + if !ok || tag.Val != "_name" { + return nil + } + } + } + return e + }), nil) + + mms, ok, err := sh.index.measurementsByExpr(measurementExpr) + if err != nil { + return nil, err + } else if !ok { + mms = sh.index.Measurements() + sort.Sort(mms) + } + + // If there are no measurements, return immediately. 
+ if len(mms) == 0 { + return &tagValuesIterator{}, nil + } + + filterExpr := influxql.CloneExpr(opt.Condition) + filterExpr = influxql.Reduce(influxql.RewriteExpr(filterExpr, func(e influxql.Expr) influxql.Expr { + switch e := e.(type) { + case *influxql.BinaryExpr: + switch e.Op { + case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: + tag, ok := e.LHS.(*influxql.VarRef) + if !ok || strings.HasPrefix(tag.Val, "_") { + return nil + } + } + } + return e + }), nil) + + var series []*Series + keys := newStringSet() + for _, mm := range mms { + ss, ok, err := mm.TagKeysByExpr(opt.Condition) + if err != nil { + return nil, err + } else if !ok { + keys.add(mm.TagKeys()...) + } else { + keys = keys.union(ss) + } + + ids, err := mm.seriesIDsAllOrByExpr(filterExpr) + if err != nil { + return nil, err + } + + for _, id := range ids { + series = append(series, mm.SeriesByID(id)) + } + } + + return &tagValuesIterator{ + series: series, + keys: keys.list(), + fields: influxql.VarRefs(opt.Aux).Strings(), + }, nil +} + +// Stats returns stats about the points processed. +func (itr *tagValuesIterator) Stats() influxql.IteratorStats { return influxql.IteratorStats{} } + +// Close closes the iterator. +func (itr *tagValuesIterator) Close() error { return nil } + +// Next emits the next point in the iterator. +func (itr *tagValuesIterator) Next() (*influxql.FloatPoint, error) { + for { + // If there are no more values then move to the next key. + if len(itr.buf.keys) == 0 { + if len(itr.series) == 0 { + return nil, nil + } + + itr.buf.s = itr.series[0] + itr.buf.keys = itr.keys + itr.series = itr.series[1:] + continue + } + + key := itr.buf.keys[0] + value := itr.buf.s.Tags.GetString(key) + if value == "" { + itr.buf.keys = itr.buf.keys[1:] + continue + } + + // Prepare auxiliary fields. 
+ auxFields := make([]interface{}, len(itr.fields)) + for i, f := range itr.fields { + switch f { + case "_tagKey": + auxFields[i] = key + case "value": + auxFields[i] = value + } + } + + // Return next key. + p := &influxql.FloatPoint{ + Name: itr.buf.s.measurement.Name, + Aux: auxFields, + } + itr.buf.keys = itr.buf.keys[1:] + + return p, nil + } +} + // measurementKeyFunc is the function called by measurementKeysIterator. type measurementKeyFunc func(m *Measurement) []string diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index ccf7b43573..9bc5f66d36 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -47,7 +47,7 @@ func TestShardWriteAndIndex(t *testing.T) { pt := models.MustNewPoint( "cpu", - map[string]string{"host": "server"}, + models.Tags{{Key: []byte("host"), Value: []byte("server")}}, map[string]interface{}{"value": 1.0}, time.Unix(1, 2), ) @@ -69,7 +69,7 @@ func TestShardWriteAndIndex(t *testing.T) { } seriesTags := index.Series(string(pt.Key())).Tags - if len(seriesTags) != len(pt.Tags()) || pt.Tags()["host"] != seriesTags["host"] { + if len(seriesTags) != len(pt.Tags()) || pt.Tags().GetString("host") != seriesTags.GetString("host") { t.Fatalf("tags weren't properly saved to series index: %v, %v", pt.Tags(), seriesTags) } if !reflect.DeepEqual(index.Measurement("cpu").TagKeys(), []string{"host"}) { @@ -121,7 +121,7 @@ func TestMaxSeriesLimit(t *testing.T) { for i := 0; i < 1000; i++ { pt := models.MustNewPoint( "cpu", - map[string]string{"host": fmt.Sprintf("server%d", i)}, + models.Tags{{Key: []byte("host"), Value: []byte(fmt.Sprintf("server%d", i))}}, map[string]interface{}{"value": 1.0}, time.Unix(1, 2), ) @@ -136,7 +136,7 @@ func TestMaxSeriesLimit(t *testing.T) { // Writing one more series should exceed the series limit. 
pt := models.MustNewPoint( "cpu", - map[string]string{"host": "server9999"}, + models.Tags{{Key: []byte("host"), Value: []byte("server9999")}}, map[string]interface{}{"value": 1.0}, time.Unix(1, 2), ) @@ -169,7 +169,7 @@ func TestWriteTimeTag(t *testing.T) { pt := models.MustNewPoint( "cpu", - map[string]string{}, + models.NewTags(map[string]string{}), map[string]interface{}{"time": 1.0}, time.Unix(1, 2), ) @@ -189,7 +189,7 @@ func TestWriteTimeTag(t *testing.T) { pt = models.MustNewPoint( "cpu", - map[string]string{}, + models.NewTags(map[string]string{}), map[string]interface{}{"value": 1.0, "time": 1.0}, time.Unix(1, 2), ) @@ -230,7 +230,7 @@ func TestWriteTimeField(t *testing.T) { pt := models.MustNewPoint( "cpu", - map[string]string{"time": "now"}, + models.NewTags(map[string]string{"time": "now"}), map[string]interface{}{"value": 1.0}, time.Unix(1, 2), ) @@ -270,7 +270,7 @@ func TestShardWriteAddNewField(t *testing.T) { pt := models.MustNewPoint( "cpu", - map[string]string{"host": "server"}, + models.NewTags(map[string]string{"host": "server"}), map[string]interface{}{"value": 1.0}, time.Unix(1, 2), ) @@ -282,7 +282,7 @@ func TestShardWriteAddNewField(t *testing.T) { pt = models.MustNewPoint( "cpu", - map[string]string{"host": "server"}, + models.NewTags(map[string]string{"host": "server"}), map[string]interface{}{"value": 1.0, "value2": 2.0}, time.Unix(1, 2), ) @@ -296,7 +296,7 @@ func TestShardWriteAddNewField(t *testing.T) { t.Fatalf("series wasn't in index") } seriesTags := index.Series(string(pt.Key())).Tags - if len(seriesTags) != len(pt.Tags()) || pt.Tags()["host"] != seriesTags["host"] { + if len(seriesTags) != len(pt.Tags()) || pt.Tags().GetString("host") != seriesTags.GetString("host") { t.Fatalf("tags weren't properly saved to series index: %v, %v", pt.Tags(), seriesTags) } if !reflect.DeepEqual(index.Measurement("cpu").TagKeys(), []string{"host"}) { @@ -328,7 +328,7 @@ func TestShard_Close_RemoveIndex(t *testing.T) { pt := models.MustNewPoint( 
"cpu", - map[string]string{"host": "server"}, + models.NewTags(map[string]string{"host": "server"}), map[string]interface{}{"value": 1.0}, time.Unix(1, 2), ) @@ -521,7 +521,7 @@ func TestShard_Disabled_WriteQuery(t *testing.T) { pt := models.MustNewPoint( "cpu", - map[string]string{"host": "server"}, + models.NewTags(map[string]string{"host": "server"}), map[string]interface{}{"value": 1.0}, time.Unix(1, 2), ) diff --git a/tsdb/store.go b/tsdb/store.go index e021715df5..7ae575d969 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -929,13 +929,13 @@ func (s *Store) TagValues(database string, cond influxql.Expr) ([]TagValues, err // Loop over all keys for each series. m := make(map[KeyValue]struct{}, len(ss)) for _, series := range ss { - for key, value := range series.Tags { + for _, t := range series.Tags { if !ok { // nop - } else if _, exists := keySet[key]; !exists { + } else if _, exists := keySet[string(t.Key)]; !exists { continue } - m[KeyValue{key, value}] = struct{}{} + m[KeyValue{string(t.Key), string(t.Value)}] = struct{}{} } }