reduce memory allocations in index
This commit changes the index to point to index data in the shards instead of keeping it in-memory on the heap.pull/7138/head
parent
35f2fdafcb
commit
8aa224b22d
|
@ -562,7 +562,7 @@ func (p *Point) MarshalJSON() ([]byte, error) {
|
|||
// MarshalString renders string representation of a Point with specified
|
||||
// precision. The default precision is nanoseconds.
|
||||
func (p *Point) MarshalString() string {
|
||||
pt, err := models.NewPoint(p.Measurement, p.Tags, p.Fields, p.Time)
|
||||
pt, err := models.NewPoint(p.Measurement, models.NewTags(p.Tags), p.Fields, p.Time)
|
||||
if err != nil {
|
||||
return "# ERROR: " + err.Error() + " " + p.Measurement
|
||||
}
|
||||
|
|
|
@ -356,7 +356,7 @@ func NewPoint(
|
|||
T = t[0]
|
||||
}
|
||||
|
||||
pt, err := models.NewPoint(name, tags, fields, T)
|
||||
pt, err := models.NewPoint(name, models.NewTags(tags), fields, T)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -382,7 +382,7 @@ func (p *Point) Name() string {
|
|||
|
||||
// Tags returns the tags associated with the point
|
||||
func (p *Point) Tags() map[string]string {
|
||||
return p.pt.Tags()
|
||||
return p.pt.Tags().Map()
|
||||
}
|
||||
|
||||
// Time return the timestamp for the point
|
||||
|
|
|
@ -132,7 +132,7 @@ func (c *cmdExport) writeFiles() error {
|
|||
for i := 0; i < reader.KeyCount(); i++ {
|
||||
var pairs string
|
||||
key, typ := reader.KeyAt(i)
|
||||
values, _ := reader.ReadAll(key)
|
||||
values, _ := reader.ReadAll(string(key))
|
||||
measurement, field := tsm1.SeriesAndFieldFromCompositeKey(key)
|
||||
|
||||
for _, value := range values {
|
||||
|
|
|
@ -78,7 +78,7 @@ func cmdReport(opts *reportOpts) {
|
|||
totalSeries.Add([]byte(key))
|
||||
|
||||
if opts.detailed {
|
||||
sep := strings.Index(key, "#!~#")
|
||||
sep := strings.Index(string(key), "#!~#")
|
||||
seriesKey, field := key[:sep], key[sep+4:]
|
||||
measurement, tags, _ := models.ParseKey(seriesKey)
|
||||
|
||||
|
@ -96,13 +96,13 @@ func cmdReport(opts *reportOpts) {
|
|||
}
|
||||
fieldCount.Add([]byte(field))
|
||||
|
||||
for t, v := range tags {
|
||||
tagCount, ok := tagCardialities[t]
|
||||
for _, t := range tags {
|
||||
tagCount, ok := tagCardialities[string(t.Key)]
|
||||
if !ok {
|
||||
tagCount = hllpp.New()
|
||||
tagCardialities[t] = tagCount
|
||||
tagCardialities[string(t.Key)] = tagCount
|
||||
}
|
||||
tagCount.Add([]byte(v))
|
||||
tagCount.Add(t.Value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -121,9 +121,9 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) {
|
|||
var pos int
|
||||
for i := 0; i < keyCount; i++ {
|
||||
key, _ := r.KeyAt(i)
|
||||
for _, e := range r.Entries(key) {
|
||||
for _, e := range r.Entries(string(key)) {
|
||||
pos++
|
||||
split := strings.Split(key, "#!~#")
|
||||
split := strings.Split(string(key), "#!~#")
|
||||
|
||||
// We dont' know know if we have fields so use an informative default
|
||||
var measurement, field string = "UNKNOWN", "UNKNOWN"
|
||||
|
@ -132,7 +132,7 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) {
|
|||
measurement = split[0]
|
||||
field = split[1]
|
||||
|
||||
if opts.filterKey != "" && !strings.Contains(key, opts.filterKey) {
|
||||
if opts.filterKey != "" && !strings.Contains(string(key), opts.filterKey) {
|
||||
continue
|
||||
}
|
||||
fmt.Fprintln(tw, " "+strings.Join([]string{
|
||||
|
@ -160,7 +160,7 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) {
|
|||
// Start at the beginning and read every block
|
||||
for j := 0; j < keyCount; j++ {
|
||||
key, _ := r.KeyAt(j)
|
||||
for _, e := range r.Entries(key) {
|
||||
for _, e := range r.Entries(string(key)) {
|
||||
|
||||
f.Seek(int64(e.Offset), 0)
|
||||
f.Read(b[:4])
|
||||
|
@ -172,7 +172,7 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) {
|
|||
|
||||
blockSize += int64(e.Size)
|
||||
|
||||
if opts.filterKey != "" && !strings.Contains(key, opts.filterKey) {
|
||||
if opts.filterKey != "" && !strings.Contains(string(key), opts.filterKey) {
|
||||
i += blockSize
|
||||
blockCount++
|
||||
continue
|
||||
|
|
|
@ -83,7 +83,7 @@ type WritePointsRequest struct {
|
|||
// AddPoint adds a point to the WritePointRequest with field key 'value'
|
||||
func (w *WritePointsRequest) AddPoint(name string, value interface{}, timestamp time.Time, tags map[string]string) {
|
||||
pt, err := models.NewPoint(
|
||||
name, tags, map[string]interface{}{"value": value}, timestamp,
|
||||
name, models.NewTags(tags), map[string]interface{}{"value": value}, timestamp,
|
||||
)
|
||||
if err != nil {
|
||||
return
|
||||
|
@ -176,7 +176,7 @@ type WriteStatistics struct {
|
|||
func (w *PointsWriter) Statistics(tags map[string]string) []models.Statistic {
|
||||
return []models.Statistic{{
|
||||
Name: "write",
|
||||
Tags: tags,
|
||||
Tags: models.NewTags(tags),
|
||||
Values: map[string]interface{}{
|
||||
statWriteReq: atomic.LoadInt64(&w.stats.WriteReq),
|
||||
statPointWriteReq: atomic.LoadInt64(&w.stats.PointWriteReq),
|
||||
|
|
|
@ -818,7 +818,7 @@ func (e *StatementExecutor) executeShowStatsStatement(stmt *influxql.ShowStatsSt
|
|||
if stmt.Module != "" && stat.Name != stmt.Module {
|
||||
continue
|
||||
}
|
||||
row := &models.Row{Name: stat.Name, Tags: stat.Tags}
|
||||
row := &models.Row{Name: stat.Name, Tags: stat.Tags.Map()}
|
||||
|
||||
values := make([]interface{}, 0, len(stat.Values))
|
||||
for _, k := range stat.ValueNames() {
|
||||
|
@ -1055,7 +1055,7 @@ func convertRowToPoints(measurementName string, row *models.Row) ([]models.Point
|
|||
}
|
||||
}
|
||||
|
||||
p, err := models.NewPoint(measurementName, row.Tags, vals, v[timeIndex].(time.Time))
|
||||
p, err := models.NewPoint(measurementName, models.NewTags(row.Tags), vals, v[timeIndex].(time.Time))
|
||||
if err != nil {
|
||||
// Drop points that can't be stored
|
||||
continue
|
||||
|
|
|
@ -148,7 +148,7 @@ type QueryStatistics struct {
|
|||
func (e *QueryExecutor) Statistics(tags map[string]string) []models.Statistic {
|
||||
return []models.Statistic{{
|
||||
Name: "queryExecutor",
|
||||
Tags: tags,
|
||||
Tags: models.NewTags(tags),
|
||||
Values: map[string]interface{}{
|
||||
statQueriesActive: atomic.LoadInt64(&e.stats.ActiveQueries),
|
||||
statQueriesExecuted: atomic.LoadInt64(&e.stats.ExecutedQueries),
|
||||
|
|
193
models/points.go
193
models/points.go
|
@ -139,14 +139,14 @@ func ParsePointsString(buf string) ([]Point, error) {
|
|||
}
|
||||
|
||||
// ParseKey returns the measurement name and tags from a point.
|
||||
func ParseKey(buf string) (string, Tags, error) {
|
||||
func ParseKey(buf []byte) (string, Tags, error) {
|
||||
// Ignore the error because scanMeasurement returns "missing fields" which we ignore
|
||||
// when just parsing a key
|
||||
state, i, _ := scanMeasurement([]byte(buf), 0)
|
||||
state, i, _ := scanMeasurement(buf, 0)
|
||||
|
||||
var tags Tags
|
||||
if state == tagKeyState {
|
||||
tags = parseTags([]byte(buf))
|
||||
tags = parseTags(buf)
|
||||
// scanMeasurement returns the location of the comma if there are tags, strip that off
|
||||
return string(buf[:i-1]), tags, nil
|
||||
}
|
||||
|
@ -1225,39 +1225,42 @@ func (p *point) Tags() Tags {
|
|||
}
|
||||
|
||||
func parseTags(buf []byte) Tags {
|
||||
tags := make(map[string]string, bytes.Count(buf, []byte(",")))
|
||||
if len(buf) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
pos, name := scanTo(buf, 0, ',')
|
||||
|
||||
// it's an empty key, so there are no tags
|
||||
if len(name) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
tags := make(Tags, 0, bytes.Count(buf, []byte(",")))
|
||||
hasEscape := bytes.IndexByte(buf, '\\') != -1
|
||||
|
||||
if len(buf) != 0 {
|
||||
pos, name := scanTo(buf, 0, ',')
|
||||
i := pos + 1
|
||||
var key, value []byte
|
||||
for {
|
||||
if i >= len(buf) {
|
||||
break
|
||||
}
|
||||
i, key = scanTo(buf, i, '=')
|
||||
i, value = scanTagValue(buf, i+1)
|
||||
|
||||
// it's an empyt key, so there are no tags
|
||||
if len(name) == 0 {
|
||||
return tags
|
||||
if len(value) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
i := pos + 1
|
||||
var key, value []byte
|
||||
for {
|
||||
if i >= len(buf) {
|
||||
break
|
||||
}
|
||||
i, key = scanTo(buf, i, '=')
|
||||
i, value = scanTagValue(buf, i+1)
|
||||
|
||||
if len(value) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
if hasEscape {
|
||||
tags[string(unescapeTag(key))] = string(unescapeTag(value))
|
||||
} else {
|
||||
tags[string(key)] = string(value)
|
||||
}
|
||||
|
||||
i++
|
||||
if hasEscape {
|
||||
tags = append(tags, Tag{Key: unescapeTag(key), Value: unescapeTag(value)})
|
||||
} else {
|
||||
tags = append(tags, Tag{Key: key, Value: value})
|
||||
}
|
||||
|
||||
i++
|
||||
}
|
||||
|
||||
return tags
|
||||
}
|
||||
|
||||
|
@ -1276,7 +1279,8 @@ func (p *point) SetTags(tags Tags) {
|
|||
// AddTag adds or replaces a tag value for a point
|
||||
func (p *point) AddTag(key, value string) {
|
||||
tags := p.Tags()
|
||||
tags[key] = value
|
||||
tags = append(tags, Tag{Key: []byte(key), Value: []byte(value)})
|
||||
sort.Sort(tags)
|
||||
p.key = MakeKey([]byte(p.Name()), tags)
|
||||
}
|
||||
|
||||
|
@ -1386,64 +1390,137 @@ func (p *point) UnixNano() int64 {
|
|||
return p.Time().UnixNano()
|
||||
}
|
||||
|
||||
// Tags represents a mapping between a Point's tag names and their
|
||||
// values.
|
||||
type Tags map[string]string
|
||||
// Tag represents a single key/value tag pair.
|
||||
type Tag struct {
|
||||
Key []byte
|
||||
Value []byte
|
||||
}
|
||||
|
||||
// Tags represents a sorted list of tags.
|
||||
type Tags []Tag
|
||||
|
||||
// NewTags returns a new Tags from a map.
|
||||
func NewTags(m map[string]string) Tags {
|
||||
a := make(Tags, 0, len(m))
|
||||
for k, v := range m {
|
||||
a = append(a, Tag{Key: []byte(k), Value: []byte(v)})
|
||||
}
|
||||
sort.Sort(a)
|
||||
return a
|
||||
}
|
||||
|
||||
func (a Tags) Len() int { return len(a) }
|
||||
func (a Tags) Less(i, j int) bool { return bytes.Compare(a[i].Key, a[j].Key) == -1 }
|
||||
func (a Tags) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
|
||||
// Get returns the value for a key.
|
||||
func (a Tags) Get(key []byte) []byte {
|
||||
// OPTIMIZE: Use sort.Search if tagset is large.
|
||||
|
||||
for _, t := range a {
|
||||
if bytes.Equal(t.Key, key) {
|
||||
return t.Value
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetString returns the string value for a string key.
|
||||
func (a Tags) GetString(key string) string {
|
||||
return string(a.Get([]byte(key)))
|
||||
}
|
||||
|
||||
// Set sets the value for a key.
|
||||
func (a *Tags) Set(key, value []byte) {
|
||||
for _, t := range *a {
|
||||
if bytes.Equal(t.Key, key) {
|
||||
t.Value = value
|
||||
return
|
||||
}
|
||||
}
|
||||
*a = append(*a, Tag{Key: key, Value: value})
|
||||
sort.Sort(*a)
|
||||
}
|
||||
|
||||
// SetString sets the string value for a string key.
|
||||
func (a *Tags) SetString(key, value string) {
|
||||
a.Set([]byte(key), []byte(value))
|
||||
}
|
||||
|
||||
// Delete removes a tag by key.
|
||||
func (a *Tags) Delete(key []byte) {
|
||||
for i, t := range *a {
|
||||
if bytes.Equal(t.Key, key) {
|
||||
copy((*a)[i:], (*a)[i+1:])
|
||||
(*a)[len(*a)-1] = Tag{}
|
||||
*a = (*a)[:len(*a)-1]
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Map returns a map representation of the tags.
|
||||
func (a Tags) Map() map[string]string {
|
||||
m := make(map[string]string, len(a))
|
||||
for _, t := range a {
|
||||
m[string(t.Key)] = string(t.Value)
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
// Merge merges the tags combining the two. If both define a tag with the
|
||||
// same key, the merged value overwrites the old value.
|
||||
// A new map is returned.
|
||||
func (t Tags) Merge(other map[string]string) Tags {
|
||||
merged := make(map[string]string, len(t)+len(other))
|
||||
for k, v := range t {
|
||||
merged[k] = v
|
||||
func (a Tags) Merge(other map[string]string) Tags {
|
||||
merged := make(map[string]string, len(a)+len(other))
|
||||
for _, t := range a {
|
||||
merged[string(t.Key)] = string(t.Value)
|
||||
}
|
||||
for k, v := range other {
|
||||
merged[k] = v
|
||||
}
|
||||
return Tags(merged)
|
||||
return NewTags(merged)
|
||||
}
|
||||
|
||||
// HashKey hashes all of a tag's keys.
|
||||
func (t Tags) HashKey() []byte {
|
||||
func (a Tags) HashKey() []byte {
|
||||
// Empty maps marshal to empty bytes.
|
||||
if len(t) == 0 {
|
||||
if len(a) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
escaped := Tags{}
|
||||
for k, v := range t {
|
||||
ek := escapeTag([]byte(k))
|
||||
ev := escapeTag([]byte(v))
|
||||
escaped := make(Tags, 0, len(a))
|
||||
for _, t := range a {
|
||||
ek := escapeTag(t.Key)
|
||||
ev := escapeTag(t.Value)
|
||||
|
||||
if len(ev) > 0 {
|
||||
escaped[string(ek)] = string(ev)
|
||||
escaped = append(escaped, Tag{Key: ek, Value: ev})
|
||||
}
|
||||
}
|
||||
|
||||
// Extract keys and determine final size.
|
||||
sz := len(escaped) + (len(escaped) * 2) // separators
|
||||
keys := make([]string, len(escaped)+1)
|
||||
i := 0
|
||||
for k, v := range escaped {
|
||||
keys[i] = k
|
||||
i++
|
||||
sz += len(k) + len(v)
|
||||
keys := make([][]byte, len(escaped)+1)
|
||||
for i, t := range escaped {
|
||||
keys[i] = t.Key
|
||||
sz += len(t.Key) + len(t.Value)
|
||||
}
|
||||
keys = keys[:i]
|
||||
sort.Strings(keys)
|
||||
keys = keys[:len(escaped)]
|
||||
sort.Sort(byteSlices(keys))
|
||||
|
||||
// Generate marshaled bytes.
|
||||
b := make([]byte, sz)
|
||||
buf := b
|
||||
idx := 0
|
||||
for _, k := range keys {
|
||||
for i, k := range keys {
|
||||
buf[idx] = ','
|
||||
idx++
|
||||
copy(buf[idx:idx+len(k)], k)
|
||||
idx += len(k)
|
||||
buf[idx] = '='
|
||||
idx++
|
||||
v := escaped[k]
|
||||
v := escaped[i].Value
|
||||
copy(buf[idx:idx+len(v)], v)
|
||||
idx += len(v)
|
||||
}
|
||||
|
@ -1594,3 +1671,9 @@ func (p Fields) MarshalBinary() []byte {
|
|||
}
|
||||
return b
|
||||
}
|
||||
|
||||
type byteSlices [][]byte
|
||||
|
||||
func (a byteSlices) Len() int { return len(a) }
|
||||
func (a byteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 }
|
||||
func (a byteSlices) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
|
|
|
@ -15,7 +15,7 @@ import (
|
|||
)
|
||||
|
||||
var (
|
||||
tags = models.Tags{"foo": "bar", "apple": "orange", "host": "serverA", "region": "uswest"}
|
||||
tags = models.NewTags(map[string]string{"foo": "bar", "apple": "orange", "host": "serverA", "region": "uswest"})
|
||||
maxFloat64 = strconv.FormatFloat(math.MaxFloat64, 'f', 1, 64)
|
||||
minFloat64 = strconv.FormatFloat(-math.MaxFloat64, 'f', 1, 64)
|
||||
)
|
||||
|
@ -115,7 +115,7 @@ func BenchmarkParsePointsTagsUnSorted10(b *testing.B) {
|
|||
func BenchmarkParseKey(b *testing.B) {
|
||||
line := `cpu,region=us-west,host=serverA,env=prod,target=servers,zone=1c,tag1=value1,tag2=value2,tag3=value3,tag4=value4,tag5=value5`
|
||||
for i := 0; i < b.N; i++ {
|
||||
models.ParseKey(line)
|
||||
models.ParseKey([]byte(line))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -163,9 +163,9 @@ func test(t *testing.T, line string, point TestPoint) {
|
|||
t.Errorf(`ParsePoints("%s") tags mismatch. got %v, exp %v`, line, pts[0].Tags(), exp)
|
||||
}
|
||||
|
||||
for tag, value := range pts[0].Tags() {
|
||||
if value != point.RawTags[tag] {
|
||||
t.Errorf(`ParsePoints("%s") tags mismatch. got %v, exp %v`, line, value, point.RawTags[tag])
|
||||
for _, tag := range pts[0].Tags() {
|
||||
if !bytes.Equal(tag.Value, point.RawTags.Get(tag.Key)) {
|
||||
t.Errorf(`ParsePoints("%s") tags mismatch. got %s, exp %s`, line, tag.Value, point.RawTags.Get(tag.Key))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -639,7 +639,7 @@ func TestParsePointUnescape(t *testing.T) {
|
|||
test(t, `foo\,bar value=1i`,
|
||||
NewTestPoint(
|
||||
"foo,bar", // comma in the name
|
||||
models.Tags{},
|
||||
models.NewTags(map[string]string{}),
|
||||
models.Fields{
|
||||
"value": int64(1),
|
||||
},
|
||||
|
@ -649,9 +649,9 @@ func TestParsePointUnescape(t *testing.T) {
|
|||
test(t, `cpu\,main,regions=east value=1.0`,
|
||||
NewTestPoint(
|
||||
"cpu,main", // comma in the name
|
||||
models.Tags{
|
||||
models.NewTags(map[string]string{
|
||||
"regions": "east",
|
||||
},
|
||||
}),
|
||||
models.Fields{
|
||||
"value": 1.0,
|
||||
},
|
||||
|
@ -661,9 +661,9 @@ func TestParsePointUnescape(t *testing.T) {
|
|||
test(t, `cpu\ load,region=east value=1.0`,
|
||||
NewTestPoint(
|
||||
"cpu load", // space in the name
|
||||
models.Tags{
|
||||
models.NewTags(map[string]string{
|
||||
"region": "east",
|
||||
},
|
||||
}),
|
||||
models.Fields{
|
||||
"value": 1.0,
|
||||
},
|
||||
|
@ -673,9 +673,9 @@ func TestParsePointUnescape(t *testing.T) {
|
|||
test(t, `cpu\=load,region=east value=1.0`,
|
||||
NewTestPoint(
|
||||
`cpu\=load`, // backslash is literal
|
||||
models.Tags{
|
||||
models.NewTags(map[string]string{
|
||||
"region": "east",
|
||||
},
|
||||
}),
|
||||
models.Fields{
|
||||
"value": 1.0,
|
||||
},
|
||||
|
@ -685,9 +685,9 @@ func TestParsePointUnescape(t *testing.T) {
|
|||
test(t, `cpu=load,region=east value=1.0`,
|
||||
NewTestPoint(
|
||||
`cpu=load`, // literal equals is fine in measurement name
|
||||
models.Tags{
|
||||
models.NewTags(map[string]string{
|
||||
"region": "east",
|
||||
},
|
||||
}),
|
||||
models.Fields{
|
||||
"value": 1.0,
|
||||
},
|
||||
|
@ -696,9 +696,9 @@ func TestParsePointUnescape(t *testing.T) {
|
|||
// commas in tag names
|
||||
test(t, `cpu,region\,zone=east value=1.0`,
|
||||
NewTestPoint("cpu",
|
||||
models.Tags{
|
||||
models.NewTags(map[string]string{
|
||||
"region,zone": "east", // comma in the tag key
|
||||
},
|
||||
}),
|
||||
models.Fields{
|
||||
"value": 1.0,
|
||||
},
|
||||
|
@ -707,9 +707,9 @@ func TestParsePointUnescape(t *testing.T) {
|
|||
// spaces in tag name
|
||||
test(t, `cpu,region\ zone=east value=1.0`,
|
||||
NewTestPoint("cpu",
|
||||
models.Tags{
|
||||
models.NewTags(map[string]string{
|
||||
"region zone": "east", // space in the tag name
|
||||
},
|
||||
}),
|
||||
models.Fields{
|
||||
"value": 1.0,
|
||||
},
|
||||
|
@ -718,9 +718,9 @@ func TestParsePointUnescape(t *testing.T) {
|
|||
// backslash with escaped equals in tag name
|
||||
test(t, `cpu,reg\\=ion=east value=1.0`,
|
||||
NewTestPoint("cpu",
|
||||
models.Tags{
|
||||
models.NewTags(map[string]string{
|
||||
`reg\=ion`: "east",
|
||||
},
|
||||
}),
|
||||
models.Fields{
|
||||
"value": 1.0,
|
||||
},
|
||||
|
@ -729,9 +729,9 @@ func TestParsePointUnescape(t *testing.T) {
|
|||
// space is tag name
|
||||
test(t, `cpu,\ =east value=1.0`,
|
||||
NewTestPoint("cpu",
|
||||
models.Tags{
|
||||
models.NewTags(map[string]string{
|
||||
" ": "east", // tag name is single space
|
||||
},
|
||||
}),
|
||||
models.Fields{
|
||||
"value": 1.0,
|
||||
},
|
||||
|
@ -740,9 +740,9 @@ func TestParsePointUnescape(t *testing.T) {
|
|||
// commas in tag values
|
||||
test(t, `cpu,regions=east\,west value=1.0`,
|
||||
NewTestPoint("cpu",
|
||||
models.Tags{
|
||||
models.NewTags(map[string]string{
|
||||
"regions": "east,west", // comma in the tag value
|
||||
},
|
||||
}),
|
||||
models.Fields{
|
||||
"value": 1.0,
|
||||
},
|
||||
|
@ -752,9 +752,9 @@ func TestParsePointUnescape(t *testing.T) {
|
|||
test(t, `cpu,regions=\\ east value=1.0`,
|
||||
NewTestPoint(
|
||||
"cpu",
|
||||
models.Tags{
|
||||
models.NewTags(map[string]string{
|
||||
"regions": `\ east`,
|
||||
},
|
||||
}),
|
||||
models.Fields{
|
||||
"value": 1.0,
|
||||
},
|
||||
|
@ -764,9 +764,9 @@ func TestParsePointUnescape(t *testing.T) {
|
|||
test(t, `cpu,regions=eas\\ t value=1.0`,
|
||||
NewTestPoint(
|
||||
"cpu",
|
||||
models.Tags{
|
||||
models.NewTags(map[string]string{
|
||||
"regions": `eas\ t`,
|
||||
},
|
||||
}),
|
||||
models.Fields{
|
||||
"value": 1.0,
|
||||
},
|
||||
|
@ -776,9 +776,9 @@ func TestParsePointUnescape(t *testing.T) {
|
|||
test(t, `cpu,regions=east\\ value=1.0`,
|
||||
NewTestPoint(
|
||||
"cpu",
|
||||
models.Tags{
|
||||
models.NewTags(map[string]string{
|
||||
"regions": `east\ `,
|
||||
},
|
||||
}),
|
||||
models.Fields{
|
||||
"value": 1.0,
|
||||
},
|
||||
|
@ -787,9 +787,9 @@ func TestParsePointUnescape(t *testing.T) {
|
|||
// spaces in tag values
|
||||
test(t, `cpu,regions=east\ west value=1.0`,
|
||||
NewTestPoint("cpu",
|
||||
models.Tags{
|
||||
models.NewTags(map[string]string{
|
||||
"regions": "east west", // comma in the tag value
|
||||
},
|
||||
}),
|
||||
models.Fields{
|
||||
"value": 1.0,
|
||||
},
|
||||
|
@ -798,9 +798,9 @@ func TestParsePointUnescape(t *testing.T) {
|
|||
// commas in field keys
|
||||
test(t, `cpu,regions=east value\,ms=1.0`,
|
||||
NewTestPoint("cpu",
|
||||
models.Tags{
|
||||
models.NewTags(map[string]string{
|
||||
"regions": "east",
|
||||
},
|
||||
}),
|
||||
models.Fields{
|
||||
"value,ms": 1.0, // comma in the field keys
|
||||
},
|
||||
|
@ -809,9 +809,9 @@ func TestParsePointUnescape(t *testing.T) {
|
|||
// spaces in field keys
|
||||
test(t, `cpu,regions=east value\ ms=1.0`,
|
||||
NewTestPoint("cpu",
|
||||
models.Tags{
|
||||
models.NewTags(map[string]string{
|
||||
"regions": "east",
|
||||
},
|
||||
}),
|
||||
models.Fields{
|
||||
"value ms": 1.0, // comma in the field keys
|
||||
},
|
||||
|
@ -820,10 +820,10 @@ func TestParsePointUnescape(t *testing.T) {
|
|||
// tag with no value
|
||||
test(t, `cpu,regions=east value="1"`,
|
||||
NewTestPoint("cpu",
|
||||
models.Tags{
|
||||
models.NewTags(map[string]string{
|
||||
"regions": "east",
|
||||
"foobar": "",
|
||||
},
|
||||
}),
|
||||
models.Fields{
|
||||
"value": "1",
|
||||
},
|
||||
|
@ -832,9 +832,9 @@ func TestParsePointUnescape(t *testing.T) {
|
|||
// commas in field values
|
||||
test(t, `cpu,regions=east value="1,0"`,
|
||||
NewTestPoint("cpu",
|
||||
models.Tags{
|
||||
models.NewTags(map[string]string{
|
||||
"regions": "east",
|
||||
},
|
||||
}),
|
||||
models.Fields{
|
||||
"value": "1,0", // comma in the field value
|
||||
},
|
||||
|
@ -844,9 +844,9 @@ func TestParsePointUnescape(t *testing.T) {
|
|||
test(t, `cpu,regions=eas\t value=1.0`,
|
||||
NewTestPoint(
|
||||
"cpu",
|
||||
models.Tags{
|
||||
models.NewTags(map[string]string{
|
||||
"regions": "eas\\t",
|
||||
},
|
||||
}),
|
||||
models.Fields{
|
||||
"value": 1.0,
|
||||
},
|
||||
|
@ -856,9 +856,9 @@ func TestParsePointUnescape(t *testing.T) {
|
|||
test(t, `cpu,regions=\\,\,\=east value=1.0`,
|
||||
NewTestPoint(
|
||||
"cpu",
|
||||
models.Tags{
|
||||
models.NewTags(map[string]string{
|
||||
"regions": `\,,=east`,
|
||||
},
|
||||
}),
|
||||
models.Fields{
|
||||
"value": 1.0,
|
||||
},
|
||||
|
@ -868,7 +868,7 @@ func TestParsePointUnescape(t *testing.T) {
|
|||
test(t, `cpu \a=1i`,
|
||||
NewTestPoint(
|
||||
"cpu",
|
||||
models.Tags{},
|
||||
models.NewTags(map[string]string{}),
|
||||
models.Fields{
|
||||
"\\a": int64(1), // Left as parsed since it's not a known escape sequence.
|
||||
},
|
||||
|
@ -878,9 +878,9 @@ func TestParsePointUnescape(t *testing.T) {
|
|||
test(t, `cpu=load,equals\=foo=tag\=value value=1i`,
|
||||
NewTestPoint(
|
||||
"cpu=load", // Not escaped
|
||||
models.Tags{
|
||||
models.NewTags(map[string]string{
|
||||
"equals=foo": "tag=value", // Tag and value unescaped
|
||||
},
|
||||
}),
|
||||
models.Fields{
|
||||
"value": int64(1),
|
||||
},
|
||||
|
@ -892,7 +892,7 @@ func TestParsePointWithTags(t *testing.T) {
|
|||
test(t,
|
||||
"cpu,host=serverA,region=us-east value=1.0 1000000000",
|
||||
NewTestPoint("cpu",
|
||||
models.Tags{"host": "serverA", "region": "us-east"},
|
||||
models.NewTags(map[string]string{"host": "serverA", "region": "us-east"}),
|
||||
models.Fields{"value": 1.0}, time.Unix(1, 0)))
|
||||
}
|
||||
|
||||
|
@ -924,10 +924,10 @@ func TestParsePointWithDuplicateTags(t *testing.T) {
|
|||
func TestParsePointWithStringField(t *testing.T) {
|
||||
test(t, `cpu,host=serverA,region=us-east value=1.0,str="foo",str2="bar" 1000000000`,
|
||||
NewTestPoint("cpu",
|
||||
models.Tags{
|
||||
models.NewTags(map[string]string{
|
||||
"host": "serverA",
|
||||
"region": "us-east",
|
||||
},
|
||||
}),
|
||||
models.Fields{
|
||||
"value": 1.0,
|
||||
"str": "foo",
|
||||
|
@ -938,10 +938,10 @@ func TestParsePointWithStringField(t *testing.T) {
|
|||
|
||||
test(t, `cpu,host=serverA,region=us-east str="foo \" bar" 1000000000`,
|
||||
NewTestPoint("cpu",
|
||||
models.Tags{
|
||||
models.NewTags(map[string]string{
|
||||
"host": "serverA",
|
||||
"region": "us-east",
|
||||
},
|
||||
}),
|
||||
models.Fields{
|
||||
"str": `foo " bar`,
|
||||
},
|
||||
|
@ -954,10 +954,10 @@ func TestParsePointWithStringWithSpaces(t *testing.T) {
|
|||
test(t, `cpu,host=serverA,region=us-east value=1.0,str="foo bar" 1000000000`,
|
||||
NewTestPoint(
|
||||
"cpu",
|
||||
models.Tags{
|
||||
models.NewTags(map[string]string{
|
||||
"host": "serverA",
|
||||
"region": "us-east",
|
||||
},
|
||||
}),
|
||||
models.Fields{
|
||||
"value": 1.0,
|
||||
"str": "foo bar", // spaces in string value
|
||||
|
@ -970,10 +970,10 @@ func TestParsePointWithStringWithNewline(t *testing.T) {
|
|||
test(t, "cpu,host=serverA,region=us-east value=1.0,str=\"foo\nbar\" 1000000000",
|
||||
NewTestPoint(
|
||||
"cpu",
|
||||
models.Tags{
|
||||
models.NewTags(map[string]string{
|
||||
"host": "serverA",
|
||||
"region": "us-east",
|
||||
},
|
||||
}),
|
||||
models.Fields{
|
||||
"value": 1.0,
|
||||
"str": "foo\nbar", // newline in string value
|
||||
|
@ -987,10 +987,10 @@ func TestParsePointWithStringWithCommas(t *testing.T) {
|
|||
test(t, `cpu,host=serverA,region=us-east value=1.0,str="foo\,bar" 1000000000`,
|
||||
NewTestPoint(
|
||||
"cpu",
|
||||
models.Tags{
|
||||
models.NewTags(map[string]string{
|
||||
"host": "serverA",
|
||||
"region": "us-east",
|
||||
},
|
||||
}),
|
||||
models.Fields{
|
||||
"value": 1.0,
|
||||
"str": `foo\,bar`, // commas in string value
|
||||
|
@ -1002,10 +1002,10 @@ func TestParsePointWithStringWithCommas(t *testing.T) {
|
|||
test(t, `cpu,host=serverA,region=us-east value=1.0,str="foo,bar" 1000000000`,
|
||||
NewTestPoint(
|
||||
"cpu",
|
||||
models.Tags{
|
||||
models.NewTags(map[string]string{
|
||||
"host": "serverA",
|
||||
"region": "us-east",
|
||||
},
|
||||
}),
|
||||
models.Fields{
|
||||
"value": 1.0,
|
||||
"str": "foo,bar", // commas in string value
|
||||
|
@ -1019,10 +1019,10 @@ func TestParsePointQuotedMeasurement(t *testing.T) {
|
|||
test(t, `"cpu",host=serverA,region=us-east value=1.0 1000000000`,
|
||||
NewTestPoint(
|
||||
`"cpu"`,
|
||||
models.Tags{
|
||||
models.NewTags(map[string]string{
|
||||
"host": "serverA",
|
||||
"region": "us-east",
|
||||
},
|
||||
}),
|
||||
models.Fields{
|
||||
"value": 1.0,
|
||||
},
|
||||
|
@ -1034,10 +1034,10 @@ func TestParsePointQuotedTags(t *testing.T) {
|
|||
test(t, `cpu,"host"="serverA",region=us-east value=1.0 1000000000`,
|
||||
NewTestPoint(
|
||||
"cpu",
|
||||
models.Tags{
|
||||
models.NewTags(map[string]string{
|
||||
`"host"`: `"serverA"`,
|
||||
"region": "us-east",
|
||||
},
|
||||
}),
|
||||
models.Fields{
|
||||
"value": 1.0,
|
||||
},
|
||||
|
@ -1056,7 +1056,7 @@ func TestParsePointsUnbalancedQuotedTags(t *testing.T) {
|
|||
}
|
||||
|
||||
// Expected " in the tag value
|
||||
exp := models.MustNewPoint("baz", models.Tags{"mytag": `"a`},
|
||||
exp := models.MustNewPoint("baz", models.NewTags(map[string]string{"mytag": `"a`}),
|
||||
models.Fields{"x": float64(1)}, time.Unix(0, 1441103862125))
|
||||
|
||||
if pts[0].String() != exp.String() {
|
||||
|
@ -1064,7 +1064,7 @@ func TestParsePointsUnbalancedQuotedTags(t *testing.T) {
|
|||
}
|
||||
|
||||
// Expected two points to ensure we did not overscan the line
|
||||
exp = models.MustNewPoint("baz", models.Tags{"mytag": `a`},
|
||||
exp = models.MustNewPoint("baz", models.NewTags(map[string]string{"mytag": `a`}),
|
||||
models.Fields{"z": float64(1)}, time.Unix(0, 1441103862126))
|
||||
|
||||
if pts[1].String() != exp.String() {
|
||||
|
@ -1078,10 +1078,10 @@ func TestParsePointEscapedStringsAndCommas(t *testing.T) {
|
|||
test(t, `cpu,host=serverA,region=us-east value="{Hello\"{,}\" World}" 1000000000`,
|
||||
NewTestPoint(
|
||||
"cpu",
|
||||
models.Tags{
|
||||
models.NewTags(map[string]string{
|
||||
"host": "serverA",
|
||||
"region": "us-east",
|
||||
},
|
||||
}),
|
||||
models.Fields{
|
||||
"value": `{Hello"{,}" World}`,
|
||||
},
|
||||
|
@ -1092,10 +1092,10 @@ func TestParsePointEscapedStringsAndCommas(t *testing.T) {
|
|||
test(t, `cpu,host=serverA,region=us-east value="{Hello\"{\,}\" World}" 1000000000`,
|
||||
NewTestPoint(
|
||||
"cpu",
|
||||
models.Tags{
|
||||
models.NewTags(map[string]string{
|
||||
"host": "serverA",
|
||||
"region": "us-east",
|
||||
},
|
||||
}),
|
||||
models.Fields{
|
||||
"value": `{Hello"{\,}" World}`,
|
||||
},
|
||||
|
@ -1107,10 +1107,10 @@ func TestParsePointWithStringWithEquals(t *testing.T) {
|
|||
test(t, `cpu,host=serverA,region=us-east str="foo=bar",value=1.0 1000000000`,
|
||||
NewTestPoint(
|
||||
"cpu",
|
||||
models.Tags{
|
||||
models.NewTags(map[string]string{
|
||||
"host": "serverA",
|
||||
"region": "us-east",
|
||||
},
|
||||
}),
|
||||
models.Fields{
|
||||
"value": 1.0,
|
||||
"str": "foo=bar", // spaces in string value
|
||||
|
@ -1123,7 +1123,7 @@ func TestParsePointWithStringWithBackslash(t *testing.T) {
|
|||
test(t, `cpu value="test\\\"" 1000000000`,
|
||||
NewTestPoint(
|
||||
"cpu",
|
||||
models.Tags{},
|
||||
models.NewTags(map[string]string{}),
|
||||
models.Fields{
|
||||
"value": `test\"`,
|
||||
},
|
||||
|
@ -1133,7 +1133,7 @@ func TestParsePointWithStringWithBackslash(t *testing.T) {
|
|||
test(t, `cpu value="test\\" 1000000000`,
|
||||
NewTestPoint(
|
||||
"cpu",
|
||||
models.Tags{},
|
||||
models.NewTags(map[string]string{}),
|
||||
models.Fields{
|
||||
"value": `test\`,
|
||||
},
|
||||
|
@ -1143,7 +1143,7 @@ func TestParsePointWithStringWithBackslash(t *testing.T) {
|
|||
test(t, `cpu value="test\\\"" 1000000000`,
|
||||
NewTestPoint(
|
||||
"cpu",
|
||||
models.Tags{},
|
||||
models.NewTags(map[string]string{}),
|
||||
models.Fields{
|
||||
"value": `test\"`,
|
||||
},
|
||||
|
@ -1153,7 +1153,7 @@ func TestParsePointWithStringWithBackslash(t *testing.T) {
|
|||
test(t, `cpu value="test\"" 1000000000`,
|
||||
NewTestPoint(
|
||||
"cpu",
|
||||
models.Tags{},
|
||||
models.NewTags(map[string]string{}),
|
||||
models.Fields{
|
||||
"value": `test"`,
|
||||
},
|
||||
|
@ -1165,10 +1165,10 @@ func TestParsePointWithBoolField(t *testing.T) {
|
|||
test(t, `cpu,host=serverA,region=us-east true=true,t=t,T=T,TRUE=TRUE,True=True,false=false,f=f,F=F,FALSE=FALSE,False=False 1000000000`,
|
||||
NewTestPoint(
|
||||
"cpu",
|
||||
models.Tags{
|
||||
models.NewTags(map[string]string{
|
||||
"host": "serverA",
|
||||
"region": "us-east",
|
||||
},
|
||||
}),
|
||||
models.Fields{
|
||||
"t": true,
|
||||
"T": true,
|
||||
|
@ -1189,10 +1189,10 @@ func TestParsePointUnicodeString(t *testing.T) {
|
|||
test(t, `cpu,host=serverA,region=us-east value="wè" 1000000000`,
|
||||
NewTestPoint(
|
||||
"cpu",
|
||||
models.Tags{
|
||||
models.NewTags(map[string]string{
|
||||
"host": "serverA",
|
||||
"region": "us-east",
|
||||
},
|
||||
}),
|
||||
models.Fields{
|
||||
"value": "wè",
|
||||
},
|
||||
|
@ -1204,7 +1204,7 @@ func TestParsePointNegativeTimestamp(t *testing.T) {
|
|||
test(t, `cpu value=1 -1`,
|
||||
NewTestPoint(
|
||||
"cpu",
|
||||
models.Tags{},
|
||||
models.NewTags(map[string]string{}),
|
||||
models.Fields{
|
||||
"value": 1.0,
|
||||
},
|
||||
|
@ -1216,7 +1216,7 @@ func TestParsePointMaxTimestamp(t *testing.T) {
|
|||
test(t, fmt.Sprintf(`cpu value=1 %d`, models.MaxNanoTime),
|
||||
NewTestPoint(
|
||||
"cpu",
|
||||
models.Tags{},
|
||||
models.NewTags(map[string]string{}),
|
||||
models.Fields{
|
||||
"value": 1.0,
|
||||
},
|
||||
|
@ -1228,7 +1228,7 @@ func TestParsePointMinTimestamp(t *testing.T) {
|
|||
test(t, `cpu value=1 -9223372036854775808`,
|
||||
NewTestPoint(
|
||||
"cpu",
|
||||
models.Tags{},
|
||||
models.NewTags(map[string]string{}),
|
||||
models.Fields{
|
||||
"value": 1.0,
|
||||
},
|
||||
|
@ -1259,7 +1259,7 @@ func TestNewPointFloatWithoutDecimal(t *testing.T) {
|
|||
test(t, `cpu value=1 1000000000`,
|
||||
NewTestPoint(
|
||||
"cpu",
|
||||
models.Tags{},
|
||||
models.NewTags(map[string]string{}),
|
||||
models.Fields{
|
||||
"value": 1.0,
|
||||
},
|
||||
|
@ -1270,7 +1270,7 @@ func TestNewPointNegativeFloat(t *testing.T) {
|
|||
test(t, `cpu value=-0.64 1000000000`,
|
||||
NewTestPoint(
|
||||
"cpu",
|
||||
models.Tags{},
|
||||
models.NewTags(map[string]string{}),
|
||||
models.Fields{
|
||||
"value": -0.64,
|
||||
},
|
||||
|
@ -1282,7 +1282,7 @@ func TestNewPointFloatNoDecimal(t *testing.T) {
|
|||
test(t, `cpu value=1. 1000000000`,
|
||||
NewTestPoint(
|
||||
"cpu",
|
||||
models.Tags{},
|
||||
models.NewTags(map[string]string{}),
|
||||
models.Fields{
|
||||
"value": 1.0,
|
||||
},
|
||||
|
@ -1294,7 +1294,7 @@ func TestNewPointFloatScientific(t *testing.T) {
|
|||
test(t, `cpu value=6.632243e+06 1000000000`,
|
||||
NewTestPoint(
|
||||
"cpu",
|
||||
models.Tags{},
|
||||
models.NewTags(map[string]string{}),
|
||||
models.Fields{
|
||||
"value": float64(6632243),
|
||||
},
|
||||
|
@ -1306,7 +1306,7 @@ func TestNewPointLargeInteger(t *testing.T) {
|
|||
test(t, `cpu value=6632243i 1000000000`,
|
||||
NewTestPoint(
|
||||
"cpu",
|
||||
models.Tags{},
|
||||
models.NewTags(map[string]string{}),
|
||||
models.Fields{
|
||||
"value": int64(6632243), // if incorrectly encoded as a float, it would show up as 6.632243e+06
|
||||
},
|
||||
|
@ -1403,7 +1403,7 @@ func TestParsePointToString(t *testing.T) {
|
|||
t.Errorf("ParsePoint() to string mismatch:\n got %v\n exp %v", got, line)
|
||||
}
|
||||
|
||||
pt = models.MustNewPoint("cpu", models.Tags{"host": "serverA", "region": "us-east"},
|
||||
pt = models.MustNewPoint("cpu", models.NewTags(map[string]string{"host": "serverA", "region": "us-east"}),
|
||||
models.Fields{"int": 10, "float": float64(11.0), "float2": float64(12.123), "bool": false, "str": "string val"},
|
||||
time.Unix(1, 0))
|
||||
|
||||
|
@ -1600,26 +1600,26 @@ cpu,host=serverA,region=us-east value=1.0 946730096789012345`,
|
|||
|
||||
func TestNewPointEscaped(t *testing.T) {
|
||||
// commas
|
||||
pt := models.MustNewPoint("cpu,main", models.Tags{"tag,bar": "value"}, models.Fields{"name,bar": 1.0}, time.Unix(0, 0))
|
||||
pt := models.MustNewPoint("cpu,main", models.NewTags(map[string]string{"tag,bar": "value"}), models.Fields{"name,bar": 1.0}, time.Unix(0, 0))
|
||||
if exp := `cpu\,main,tag\,bar=value name\,bar=1 0`; pt.String() != exp {
|
||||
t.Errorf("NewPoint().String() mismatch.\ngot %v\nexp %v", pt.String(), exp)
|
||||
}
|
||||
|
||||
// spaces
|
||||
pt = models.MustNewPoint("cpu main", models.Tags{"tag bar": "value"}, models.Fields{"name bar": 1.0}, time.Unix(0, 0))
|
||||
pt = models.MustNewPoint("cpu main", models.NewTags(map[string]string{"tag bar": "value"}), models.Fields{"name bar": 1.0}, time.Unix(0, 0))
|
||||
if exp := `cpu\ main,tag\ bar=value name\ bar=1 0`; pt.String() != exp {
|
||||
t.Errorf("NewPoint().String() mismatch.\ngot %v\nexp %v", pt.String(), exp)
|
||||
}
|
||||
|
||||
// equals
|
||||
pt = models.MustNewPoint("cpu=main", models.Tags{"tag=bar": "value=foo"}, models.Fields{"name=bar": 1.0}, time.Unix(0, 0))
|
||||
pt = models.MustNewPoint("cpu=main", models.NewTags(map[string]string{"tag=bar": "value=foo"}), models.Fields{"name=bar": 1.0}, time.Unix(0, 0))
|
||||
if exp := `cpu=main,tag\=bar=value\=foo name\=bar=1 0`; pt.String() != exp {
|
||||
t.Errorf("NewPoint().String() mismatch.\ngot %v\nexp %v", pt.String(), exp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewPointWithoutField(t *testing.T) {
|
||||
_, err := models.NewPoint("cpu", models.Tags{"tag": "bar"}, models.Fields{}, time.Unix(0, 0))
|
||||
_, err := models.NewPoint("cpu", models.NewTags(map[string]string{"tag": "bar"}), models.Fields{}, time.Unix(0, 0))
|
||||
if err == nil {
|
||||
t.Fatalf(`NewPoint() expected error. got nil`)
|
||||
}
|
||||
|
@ -1645,19 +1645,19 @@ func TestNewPointUnhandledType(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestMakeKeyEscaped(t *testing.T) {
|
||||
if exp, got := `cpu\ load`, models.MakeKey([]byte(`cpu\ load`), models.Tags{}); string(got) != exp {
|
||||
if exp, got := `cpu\ load`, models.MakeKey([]byte(`cpu\ load`), models.NewTags(map[string]string{})); string(got) != exp {
|
||||
t.Errorf("MakeKey() mismatch.\ngot %v\nexp %v", got, exp)
|
||||
}
|
||||
|
||||
if exp, got := `cpu\ load`, models.MakeKey([]byte(`cpu load`), models.Tags{}); string(got) != exp {
|
||||
if exp, got := `cpu\ load`, models.MakeKey([]byte(`cpu load`), models.NewTags(map[string]string{})); string(got) != exp {
|
||||
t.Errorf("MakeKey() mismatch.\ngot %v\nexp %v", got, exp)
|
||||
}
|
||||
|
||||
if exp, got := `cpu\,load`, models.MakeKey([]byte(`cpu\,load`), models.Tags{}); string(got) != exp {
|
||||
if exp, got := `cpu\,load`, models.MakeKey([]byte(`cpu\,load`), models.NewTags(map[string]string{})); string(got) != exp {
|
||||
t.Errorf("MakeKey() mismatch.\ngot %v\nexp %v", got, exp)
|
||||
}
|
||||
|
||||
if exp, got := `cpu\,load`, models.MakeKey([]byte(`cpu,load`), models.Tags{}); string(got) != exp {
|
||||
if exp, got := `cpu\,load`, models.MakeKey([]byte(`cpu,load`), models.NewTags(map[string]string{})); string(got) != exp {
|
||||
t.Errorf("MakeKey() mismatch.\ngot %v\nexp %v", got, exp)
|
||||
}
|
||||
|
||||
|
@ -1866,7 +1866,7 @@ func TestNewPointsRejectsMaxKey(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestParseKeyEmpty(t *testing.T) {
|
||||
if _, _, err := models.ParseKey(""); err != nil {
|
||||
if _, _, err := models.ParseKey(nil); err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -212,14 +212,13 @@ func (m *Monitor) Statistics(tags map[string]string) ([]*Statistic, error) {
|
|||
|
||||
statistic := &Statistic{
|
||||
Statistic: models.Statistic{
|
||||
Tags: make(map[string]string),
|
||||
Values: make(map[string]interface{}),
|
||||
},
|
||||
}
|
||||
|
||||
// Add any supplied tags.
|
||||
for k, v := range tags {
|
||||
statistic.Tags[k] = v
|
||||
statistic.Tags.SetString(k, v)
|
||||
}
|
||||
|
||||
// Every other top-level expvar value is a map.
|
||||
|
@ -242,7 +241,7 @@ func (m *Monitor) Statistics(tags map[string]string) ([]*Statistic, error) {
|
|||
if err != nil {
|
||||
return
|
||||
}
|
||||
statistic.Tags[t.Key] = u
|
||||
statistic.Tags.SetString(t.Key, u)
|
||||
})
|
||||
case "values":
|
||||
// string-interface map.
|
||||
|
@ -281,14 +280,13 @@ func (m *Monitor) Statistics(tags map[string]string) ([]*Statistic, error) {
|
|||
statistic := &Statistic{
|
||||
Statistic: models.Statistic{
|
||||
Name: "runtime",
|
||||
Tags: make(map[string]string),
|
||||
Values: make(map[string]interface{}),
|
||||
},
|
||||
}
|
||||
|
||||
// Add any supplied tags to Go memstats
|
||||
for k, v := range tags {
|
||||
statistic.Tags[k] = v
|
||||
statistic.Tags.SetString(k, v)
|
||||
}
|
||||
|
||||
var rt runtime.MemStats
|
||||
|
|
|
@ -70,7 +70,7 @@ func NewService(c Config) *Service {
|
|||
Logger: log.New(os.Stderr, "[collectd] ", log.LstdFlags),
|
||||
err: make(chan error),
|
||||
stats: &Statistics{},
|
||||
statTags: map[string]string{"bind": c.BindAddress},
|
||||
statTags: models.NewTags(map[string]string{"bind": c.BindAddress}),
|
||||
}
|
||||
|
||||
return &s
|
||||
|
@ -360,8 +360,9 @@ func (s *Service) UnmarshalCollectd(packet *gollectd.Packet) []models.Point {
|
|||
if packet.TypeInstance != "" {
|
||||
tags["type_instance"] = packet.TypeInstance
|
||||
}
|
||||
p, err := models.NewPoint(name, tags, fields, timestamp)
|
||||
|
||||
// Drop invalid points
|
||||
p, err := models.NewPoint(name, models.NewTags(tags), fields, timestamp)
|
||||
if err != nil {
|
||||
s.Logger.Printf("Dropping point %v: %v", name, err)
|
||||
atomic.AddInt64(&s.stats.InvalidDroppedPoints, 1)
|
||||
|
|
|
@ -147,7 +147,7 @@ type Statistics struct {
|
|||
func (s *Service) Statistics(tags map[string]string) []models.Statistic {
|
||||
return []models.Statistic{{
|
||||
Name: "cq",
|
||||
Tags: tags,
|
||||
Tags: models.NewTags(tags),
|
||||
Values: map[string]interface{}{
|
||||
statQueryOK: atomic.LoadInt64(&s.stats.QueryOK),
|
||||
statQueryFail: atomic.LoadInt64(&s.stats.QueryFail),
|
||||
|
|
|
@ -116,12 +116,12 @@ func (c *Config) WithDefaults() *Config {
|
|||
|
||||
// DefaultTags returns the config's tags.
|
||||
func (c *Config) DefaultTags() models.Tags {
|
||||
tags := models.Tags{}
|
||||
m := make(map[string]string, len(c.Tags))
|
||||
for _, t := range c.Tags {
|
||||
parts := strings.Split(t, "=")
|
||||
tags[parts[0]] = parts[1]
|
||||
m[parts[0]] = parts[1]
|
||||
}
|
||||
return tags
|
||||
return models.NewTags(m)
|
||||
}
|
||||
|
||||
// Validate validates the config's templates and tags.
|
||||
|
|
|
@ -64,12 +64,12 @@ func NewParserWithOptions(options Options) (*Parser, error) {
|
|||
}
|
||||
|
||||
// Parse out the default tags specific to this template
|
||||
tags := models.Tags{}
|
||||
var tags models.Tags
|
||||
if strings.Contains(parts[len(parts)-1], "=") {
|
||||
tagStrs := strings.Split(parts[len(parts)-1], ",")
|
||||
for _, kv := range tagStrs {
|
||||
parts := strings.Split(kv, "=")
|
||||
tags[parts[0]] = parts[1]
|
||||
tags.SetString(parts[0], parts[1])
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -151,12 +151,12 @@ func (p *Parser) Parse(line string) (models.Point, error) {
|
|||
}
|
||||
|
||||
// Set the default tags on the point if they are not already set
|
||||
for k, v := range p.tags {
|
||||
if _, ok := tags[k]; !ok {
|
||||
tags[k] = v
|
||||
for _, t := range p.tags {
|
||||
if _, ok := tags[string(t.Key)]; !ok {
|
||||
tags[string(t.Key)] = string(t.Value)
|
||||
}
|
||||
}
|
||||
return models.NewPoint(measurement, tags, fieldValues, timestamp)
|
||||
return models.NewPoint(measurement, models.NewTags(tags), fieldValues, timestamp)
|
||||
}
|
||||
|
||||
// ApplyTemplate extracts the template fields from the given line and
|
||||
|
@ -171,9 +171,9 @@ func (p *Parser) ApplyTemplate(line string) (string, map[string]string, string,
|
|||
template := p.matcher.Match(fields[0])
|
||||
name, tags, field, err := template.Apply(fields[0])
|
||||
// Set the default tags on the point if they are not already set
|
||||
for k, v := range p.tags {
|
||||
if _, ok := tags[k]; !ok {
|
||||
tags[k] = v
|
||||
for _, t := range p.tags {
|
||||
if _, ok := tags[string(t.Key)]; !ok {
|
||||
tags[string(t.Key)] = string(t.Value)
|
||||
}
|
||||
}
|
||||
return name, tags, field, err
|
||||
|
@ -223,8 +223,8 @@ func (t *template) Apply(line string) (string, map[string]string, string, error)
|
|||
)
|
||||
|
||||
// Set any default tags
|
||||
for k, v := range t.defaultTags {
|
||||
tags[k] = append(tags[k], v)
|
||||
for _, t := range t.defaultTags {
|
||||
tags[string(t.Key)] = append(tags[string(t.Key)], string(t.Value))
|
||||
}
|
||||
|
||||
// See if an invalid combination has been specified in the template:
|
||||
|
|
|
@ -261,7 +261,7 @@ func TestFilterMatchDefault(t *testing.T) {
|
|||
}
|
||||
|
||||
exp := models.MustNewPoint("miss.servers.localhost.cpu_load",
|
||||
models.Tags{},
|
||||
models.NewTags(map[string]string{}),
|
||||
models.Fields{"value": float64(11)},
|
||||
time.Unix(1435077219, 0))
|
||||
|
||||
|
@ -282,7 +282,7 @@ func TestFilterMatchMultipleMeasurement(t *testing.T) {
|
|||
}
|
||||
|
||||
exp := models.MustNewPoint("cpu.cpu_load.10",
|
||||
models.Tags{"host": "localhost"},
|
||||
models.NewTags(map[string]string{"host": "localhost"}),
|
||||
models.Fields{"value": float64(11)},
|
||||
time.Unix(1435077219, 0))
|
||||
|
||||
|
@ -306,7 +306,7 @@ func TestFilterMatchMultipleMeasurementSeparator(t *testing.T) {
|
|||
}
|
||||
|
||||
exp := models.MustNewPoint("cpu_cpu_load_10",
|
||||
models.Tags{"host": "localhost"},
|
||||
models.NewTags(map[string]string{"host": "localhost"}),
|
||||
models.Fields{"value": float64(11)},
|
||||
time.Unix(1435077219, 0))
|
||||
|
||||
|
@ -327,7 +327,7 @@ func TestFilterMatchSingle(t *testing.T) {
|
|||
}
|
||||
|
||||
exp := models.MustNewPoint("cpu_load",
|
||||
models.Tags{"host": "localhost"},
|
||||
models.NewTags(map[string]string{"host": "localhost"}),
|
||||
models.Fields{"value": float64(11)},
|
||||
time.Unix(1435077219, 0))
|
||||
|
||||
|
@ -348,7 +348,7 @@ func TestParseNoMatch(t *testing.T) {
|
|||
}
|
||||
|
||||
exp := models.MustNewPoint("servers.localhost.memory.VmallocChunk",
|
||||
models.Tags{},
|
||||
models.NewTags(map[string]string{}),
|
||||
models.Fields{"value": float64(11)},
|
||||
time.Unix(1435077219, 0))
|
||||
|
||||
|
@ -369,7 +369,7 @@ func TestFilterMatchWildcard(t *testing.T) {
|
|||
}
|
||||
|
||||
exp := models.MustNewPoint("cpu_load",
|
||||
models.Tags{"host": "localhost"},
|
||||
models.NewTags(map[string]string{"host": "localhost"}),
|
||||
models.Fields{"value": float64(11)},
|
||||
time.Unix(1435077219, 0))
|
||||
|
||||
|
@ -392,7 +392,7 @@ func TestFilterMatchExactBeforeWildcard(t *testing.T) {
|
|||
}
|
||||
|
||||
exp := models.MustNewPoint("cpu_load",
|
||||
models.Tags{"host": "localhost"},
|
||||
models.NewTags(map[string]string{"host": "localhost"}),
|
||||
models.Fields{"value": float64(11)},
|
||||
time.Unix(1435077219, 0))
|
||||
|
||||
|
@ -420,7 +420,7 @@ func TestFilterMatchMostLongestFilter(t *testing.T) {
|
|||
}
|
||||
|
||||
exp := models.MustNewPoint("cpu_load",
|
||||
models.Tags{"host": "localhost", "resource": "cpu"},
|
||||
models.NewTags(map[string]string{"host": "localhost", "resource": "cpu"}),
|
||||
models.Fields{"value": float64(11)},
|
||||
time.Unix(1435077219, 0))
|
||||
|
||||
|
@ -447,7 +447,7 @@ func TestFilterMatchMultipleWildcards(t *testing.T) {
|
|||
}
|
||||
|
||||
exp := models.MustNewPoint("cpu_load",
|
||||
models.Tags{"host": "server01"},
|
||||
models.NewTags(map[string]string{"host": "server01"}),
|
||||
models.Fields{"value": float64(11)},
|
||||
time.Unix(1435077219, 0))
|
||||
|
||||
|
@ -462,17 +462,17 @@ func TestFilterMatchMultipleWildcards(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestParseDefaultTags(t *testing.T) {
|
||||
p, err := graphite.NewParser([]string{"servers.localhost .host.measurement*"}, models.Tags{
|
||||
p, err := graphite.NewParser([]string{"servers.localhost .host.measurement*"}, models.NewTags(map[string]string{
|
||||
"region": "us-east",
|
||||
"zone": "1c",
|
||||
"host": "should not set",
|
||||
})
|
||||
}))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating parser, got %v", err)
|
||||
}
|
||||
|
||||
exp := models.MustNewPoint("cpu_load",
|
||||
models.Tags{"host": "localhost", "region": "us-east", "zone": "1c"},
|
||||
models.NewTags(map[string]string{"host": "localhost", "region": "us-east", "zone": "1c"}),
|
||||
models.Fields{"value": float64(11)},
|
||||
time.Unix(1435077219, 0))
|
||||
|
||||
|
@ -487,16 +487,16 @@ func TestParseDefaultTags(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestParseDefaultTemplateTags(t *testing.T) {
|
||||
p, err := graphite.NewParser([]string{"servers.localhost .host.measurement* zone=1c"}, models.Tags{
|
||||
p, err := graphite.NewParser([]string{"servers.localhost .host.measurement* zone=1c"}, models.NewTags(map[string]string{
|
||||
"region": "us-east",
|
||||
"host": "should not set",
|
||||
})
|
||||
}))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating parser, got %v", err)
|
||||
}
|
||||
|
||||
exp := models.MustNewPoint("cpu_load",
|
||||
models.Tags{"host": "localhost", "region": "us-east", "zone": "1c"},
|
||||
models.NewTags(map[string]string{"host": "localhost", "region": "us-east", "zone": "1c"}),
|
||||
models.Fields{"value": float64(11)},
|
||||
time.Unix(1435077219, 0))
|
||||
|
||||
|
@ -511,16 +511,16 @@ func TestParseDefaultTemplateTags(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestParseDefaultTemplateTagsOverridGlobal(t *testing.T) {
|
||||
p, err := graphite.NewParser([]string{"servers.localhost .host.measurement* zone=1c,region=us-east"}, models.Tags{
|
||||
p, err := graphite.NewParser([]string{"servers.localhost .host.measurement* zone=1c,region=us-east"}, models.NewTags(map[string]string{
|
||||
"region": "shot not be set",
|
||||
"host": "should not set",
|
||||
})
|
||||
}))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating parser, got %v", err)
|
||||
}
|
||||
|
||||
exp := models.MustNewPoint("cpu_load",
|
||||
models.Tags{"host": "localhost", "region": "us-east", "zone": "1c"},
|
||||
models.NewTags(map[string]string{"host": "localhost", "region": "us-east", "zone": "1c"}),
|
||||
models.Fields{"value": float64(11)},
|
||||
time.Unix(1435077219, 0))
|
||||
|
||||
|
@ -535,16 +535,16 @@ func TestParseDefaultTemplateTagsOverridGlobal(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestParseTemplateWhitespace(t *testing.T) {
|
||||
p, err := graphite.NewParser([]string{"servers.localhost .host.measurement* zone=1c"}, models.Tags{
|
||||
p, err := graphite.NewParser([]string{"servers.localhost .host.measurement* zone=1c"}, models.NewTags(map[string]string{
|
||||
"region": "us-east",
|
||||
"host": "should not set",
|
||||
})
|
||||
}))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating parser, got %v", err)
|
||||
}
|
||||
|
||||
exp := models.MustNewPoint("cpu_load",
|
||||
models.Tags{"host": "localhost", "region": "us-east", "zone": "1c"},
|
||||
models.NewTags(map[string]string{"host": "localhost", "region": "us-east", "zone": "1c"}),
|
||||
models.Fields{"value": float64(11)},
|
||||
time.Unix(1435077219, 0))
|
||||
|
||||
|
|
|
@ -106,7 +106,7 @@ func NewService(c Config) (*Service, error) {
|
|||
batchTimeout: time.Duration(d.BatchTimeout),
|
||||
logger: log.New(os.Stderr, fmt.Sprintf("[graphite] %s ", d.BindAddress), log.LstdFlags),
|
||||
stats: &Statistics{},
|
||||
statTags: map[string]string{"proto": d.Protocol, "bind": d.BindAddress},
|
||||
statTags: models.NewTags(map[string]string{"proto": d.Protocol, "bind": d.BindAddress}),
|
||||
tcpConnections: make(map[string]*tcpConnection),
|
||||
done: make(chan struct{}),
|
||||
diagsKey: strings.Join([]string{"graphite", d.Protocol, d.BindAddress}, ":"),
|
||||
|
|
|
@ -39,7 +39,7 @@ func Test_ServerGraphiteTCP(t *testing.T) {
|
|||
|
||||
pt, _ := models.NewPoint(
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
models.NewTags(map[string]string{}),
|
||||
map[string]interface{}{"value": 23.456},
|
||||
time.Unix(now.Unix(), 0))
|
||||
|
||||
|
@ -115,7 +115,7 @@ func Test_ServerGraphiteUDP(t *testing.T) {
|
|||
|
||||
pt, _ := models.NewPoint(
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
models.NewTags(map[string]string{}),
|
||||
map[string]interface{}{"value": 23.456},
|
||||
time.Unix(now.Unix(), 0))
|
||||
if database != "graphitedb" {
|
||||
|
|
|
@ -179,7 +179,7 @@ type Statistics struct {
|
|||
func (h *Handler) Statistics(tags map[string]string) []models.Statistic {
|
||||
return []models.Statistic{{
|
||||
Name: "httpd",
|
||||
Tags: tags,
|
||||
Tags: models.NewTags(tags),
|
||||
Values: map[string]interface{}{
|
||||
statRequest: atomic.LoadInt64(&h.stats.Requests),
|
||||
statCQRequest: atomic.LoadInt64(&h.stats.CQRequests),
|
||||
|
@ -737,24 +737,24 @@ func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) {
|
|||
for _, s := range stats {
|
||||
// Very hackily create a unique key.
|
||||
buf := bytes.NewBufferString(s.Name)
|
||||
if path, ok := s.Tags["path"]; ok {
|
||||
if path := s.Tags.Get([]byte("path")); path != nil {
|
||||
fmt.Fprintf(buf, ":%s", path)
|
||||
if id, ok := s.Tags["id"]; ok {
|
||||
if id := s.Tags.Get([]byte("id")); id != nil {
|
||||
fmt.Fprintf(buf, ":%s", id)
|
||||
}
|
||||
} else if bind, ok := s.Tags["bind"]; ok {
|
||||
if proto, ok := s.Tags["proto"]; ok {
|
||||
} else if bind := s.Tags.Get([]byte("bind")); bind != nil {
|
||||
if proto := s.Tags.Get([]byte("proto")); proto != nil {
|
||||
fmt.Fprintf(buf, ":%s", proto)
|
||||
}
|
||||
fmt.Fprintf(buf, ":%s", bind)
|
||||
} else if database, ok := s.Tags["database"]; ok {
|
||||
} else if database := s.Tags.Get([]byte("database")); database != nil {
|
||||
fmt.Fprintf(buf, ":%s", database)
|
||||
if rp, ok := s.Tags["retention_policy"]; ok {
|
||||
if rp := s.Tags.Get([]byte("retention_policy")); rp != nil {
|
||||
fmt.Fprintf(buf, ":%s", rp)
|
||||
if name, ok := s.Tags["name"]; ok {
|
||||
if name := s.Tags.Get([]byte("name")); name != nil {
|
||||
fmt.Fprintf(buf, ":%s", name)
|
||||
}
|
||||
if dest, ok := s.Tags["destination"]; ok {
|
||||
if dest := s.Tags.Get([]byte("destination")); dest != nil {
|
||||
fmt.Fprintf(buf, ":%s", dest)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -114,7 +114,7 @@ func (w *csvResponseWriter) WriteResponse(resp Response) (n int, err error) {
|
|||
for _, row := range result.Series {
|
||||
w.columns[0] = row.Name
|
||||
if len(row.Tags) > 0 {
|
||||
w.columns[1] = string(models.Tags(row.Tags).HashKey()[1:])
|
||||
w.columns[1] = string(models.NewTags(row.Tags).HashKey()[1:])
|
||||
} else {
|
||||
w.columns[1] = ""
|
||||
}
|
||||
|
|
|
@ -192,7 +192,7 @@ func (s *Service) Addr() net.Addr {
|
|||
|
||||
// Statistics returns statistics for periodic monitoring.
|
||||
func (s *Service) Statistics(tags map[string]string) []models.Statistic {
|
||||
return s.Handler.Statistics(models.Tags{"bind": s.addr}.Merge(tags))
|
||||
return s.Handler.Statistics(models.NewTags(map[string]string{"bind": s.addr}).Merge(tags).Map())
|
||||
}
|
||||
|
||||
// serveTCP serves the handler from the TCP listener.
|
||||
|
|
|
@ -111,7 +111,7 @@ func (h *Handler) servePut(w http.ResponseWriter, r *http.Request) {
|
|||
ts = time.Unix(p.Time/1000, (p.Time%1000)*1000)
|
||||
}
|
||||
|
||||
pt, err := models.NewPoint(p.Metric, p.Tags, map[string]interface{}{"value": p.Value}, ts)
|
||||
pt, err := models.NewPoint(p.Metric, models.NewTags(p.Tags), map[string]interface{}{"value": p.Value}, ts)
|
||||
if err != nil {
|
||||
h.Logger.Printf("Dropping point %v: %v", p.Metric, err)
|
||||
if h.stats != nil {
|
||||
|
|
|
@ -96,7 +96,7 @@ func NewService(c Config) (*Service, error) {
|
|||
Logger: log.New(os.Stderr, "[opentsdb] ", log.LstdFlags),
|
||||
LogPointErrors: d.LogPointErrors,
|
||||
stats: &Statistics{},
|
||||
statTags: map[string]string{"bind": d.BindAddress},
|
||||
statTags: models.NewTags(map[string]string{"bind": d.BindAddress}),
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
@ -381,7 +381,7 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
|
|||
}
|
||||
fields["value"] = fv
|
||||
|
||||
pt, err := models.NewPoint(measurement, tags, fields, t)
|
||||
pt, err := models.NewPoint(measurement, models.NewTags(tags), fields, t)
|
||||
if err != nil {
|
||||
atomic.AddInt64(&s.stats.TelnetBadFloat, 1)
|
||||
if s.LogPointErrors {
|
||||
|
|
|
@ -39,7 +39,7 @@ func TestService_Telnet(t *testing.T) {
|
|||
} else if !reflect.DeepEqual(points, []models.Point{
|
||||
models.MustNewPoint(
|
||||
"sys.cpu.user",
|
||||
map[string]string{"host": "webserver01", "cpu": "0"},
|
||||
models.NewTags(map[string]string{"host": "webserver01", "cpu": "0"}),
|
||||
map[string]interface{}{"value": 42.5},
|
||||
time.Unix(1356998400, 0),
|
||||
),
|
||||
|
@ -101,7 +101,7 @@ func TestService_HTTP(t *testing.T) {
|
|||
} else if !reflect.DeepEqual(points, []models.Point{
|
||||
models.MustNewPoint(
|
||||
"sys.cpu.nice",
|
||||
map[string]string{"dc": "lga", "host": "web01"},
|
||||
models.NewTags(map[string]string{"dc": "lga", "host": "web01"}),
|
||||
map[string]interface{}{"value": 18.0},
|
||||
time.Unix(1346846400, 0),
|
||||
),
|
||||
|
|
|
@ -130,7 +130,7 @@ type Statistics struct {
|
|||
func (s *Service) Statistics(tags map[string]string) []models.Statistic {
|
||||
statistics := []models.Statistic{{
|
||||
Name: "subscriber",
|
||||
Tags: tags,
|
||||
Tags: models.NewTags(tags),
|
||||
Values: map[string]interface{}{
|
||||
statPointsWritten: atomic.LoadInt64(&s.stats.PointsWritten),
|
||||
statWriteFailures: atomic.LoadInt64(&s.stats.WriteFailures),
|
||||
|
@ -415,13 +415,13 @@ func (b *balancewriter) WritePoints(p *coordinator.WritePointsRequest) error {
|
|||
|
||||
// Statistics returns statistics for periodic monitoring.
|
||||
func (b *balancewriter) Statistics(tags map[string]string) []models.Statistic {
|
||||
tags = models.Tags(tags).Merge(b.tags)
|
||||
tags = models.NewTags(tags).Merge(b.tags).Map()
|
||||
|
||||
statistics := make([]models.Statistic, len(b.stats))
|
||||
for i := range b.stats {
|
||||
statistics[i] = models.Statistic{
|
||||
Name: "subscriber",
|
||||
Tags: models.Tags(tags).Merge(map[string]string{"destination": b.stats[i].dest}),
|
||||
Tags: models.NewTags(tags).Merge(map[string]string{"destination": b.stats[i].dest}),
|
||||
Values: map[string]interface{}{
|
||||
statPointsWritten: atomic.LoadInt64(&b.stats[i].pointsWritten),
|
||||
statWriteFailures: atomic.LoadInt64(&b.stats[i].failures),
|
||||
|
|
|
@ -71,7 +71,7 @@ func NewService(c Config) *Service {
|
|||
batcher: tsdb.NewPointBatcher(d.BatchSize, d.BatchPending, time.Duration(d.BatchTimeout)),
|
||||
Logger: log.New(os.Stderr, "[udp] ", log.LstdFlags),
|
||||
stats: &Statistics{},
|
||||
statTags: map[string]string{"bind": d.BindAddress},
|
||||
statTags: models.NewTags(map[string]string{"bind": d.BindAddress}),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -12,8 +12,8 @@ func TestCommunePoint(t *testing.T) {
|
|||
if point.Name() != "write" {
|
||||
t.Errorf("expected: write\ngot: %v", point.Name())
|
||||
}
|
||||
if point.Tags()["tag"] != "tagVal" {
|
||||
t.Errorf("expected: tagVal\ngot: %v", point.Tags()["tag"])
|
||||
if point.Tags().GetString("tag") != "tagVal" {
|
||||
t.Errorf("expected: tagVal\ngot: %v", point.Tags().GetString("tag"))
|
||||
}
|
||||
if int(point.Fields()["fooField"].(float64)) != 5 {
|
||||
t.Errorf("expected: 5\ngot: %v\n", point.Fields()["fooField"])
|
||||
|
@ -24,8 +24,8 @@ func TestCommunePoint(t *testing.T) {
|
|||
if point.Name() != "write" {
|
||||
t.Errorf("expected: write\ngot: %v", point.Name())
|
||||
}
|
||||
if point.Tags()["tag"] != "tagVal" {
|
||||
t.Errorf("expected: tagVal\ngot: %v", point.Tags()["tag"])
|
||||
if point.Tags().GetString("tag") != "tagVal" {
|
||||
t.Errorf("expected: tagVal\ngot: %v", point.Tags().GetString("tag"))
|
||||
}
|
||||
if int(point.Fields()["fooField"].(float64)) != 5 {
|
||||
t.Errorf("expected: 5\ngot: %v\n", point.Fields()["fooField"])
|
||||
|
@ -40,8 +40,8 @@ func TestSetCommune(t *testing.T) {
|
|||
if pt.Name() != "write" {
|
||||
t.Errorf("expected: write\ngot: %v", pt.Name())
|
||||
}
|
||||
if pt.Tags()["tag"] != "tagVal" {
|
||||
t.Errorf("expected: tagVal\ngot: %v", pt.Tags()["tag"])
|
||||
if pt.Tags().GetString("tag") != "tagVal" {
|
||||
t.Errorf("expected: tagVal\ngot: %v", pt.Tags().GetString("tag"))
|
||||
}
|
||||
if int(pt.Fields()["fooField"].(float64)) != 5 {
|
||||
t.Errorf("expected: 5\ngot: %v\n", pt.Fields()["fooField"])
|
||||
|
|
|
@ -171,7 +171,7 @@ type CacheStatistics struct {
|
|||
func (c *Cache) Statistics(tags map[string]string) []models.Statistic {
|
||||
return []models.Statistic{{
|
||||
Name: "tsm1_cache",
|
||||
Tags: tags,
|
||||
Tags: models.NewTags(tags),
|
||||
Values: map[string]interface{}{
|
||||
statCacheMemoryBytes: atomic.LoadInt64(&c.stats.MemSizeBytes),
|
||||
statCacheDiskBytes: atomic.LoadInt64(&c.stats.DiskSizeBytes),
|
||||
|
|
|
@ -2,6 +2,7 @@ package tsm1 // import "github.com/influxdata/influxdb/tsdb/engine/tsm1"
|
|||
|
||||
import (
|
||||
"archive/tar"
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
|
@ -248,7 +249,7 @@ func (e *Engine) Statistics(tags map[string]string) []models.Statistic {
|
|||
statistics := make([]models.Statistic, 0, 4)
|
||||
statistics = append(statistics, models.Statistic{
|
||||
Name: "tsm1_engine",
|
||||
Tags: tags,
|
||||
Tags: models.NewTags(tags),
|
||||
Values: map[string]interface{}{
|
||||
statCacheCompactions: atomic.LoadInt64(&e.stats.CacheCompactions),
|
||||
statCacheCompactionDuration: atomic.LoadInt64(&e.stats.CacheCompactionDuration),
|
||||
|
@ -338,8 +339,9 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index *tsdb.DatabaseIndex) er
|
|||
|
||||
// Save reference to index for iterator creation.
|
||||
e.index = index
|
||||
e.FileStore.dereferencer = index
|
||||
|
||||
if err := e.FileStore.WalkKeys(func(key string, typ byte) error {
|
||||
if err := e.FileStore.WalkKeys(func(key []byte, typ byte) error {
|
||||
fieldType, err := tsmFieldTypeToInfluxQLDataType(typ)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -365,7 +367,7 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index *tsdb.DatabaseIndex) er
|
|||
continue
|
||||
}
|
||||
|
||||
if err := e.addToIndexFromKey(shardID, key, fieldType, index); err != nil {
|
||||
if err := e.addToIndexFromKey(shardID, []byte(key), fieldType, index); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -522,9 +524,9 @@ func (e *Engine) readFileFromBackup(tr *tar.Reader, shardRelativePath string) er
|
|||
|
||||
// addToIndexFromKey will pull the measurement name, series key, and field name from a composite key and add it to the
|
||||
// database index and measurement fields
|
||||
func (e *Engine) addToIndexFromKey(shardID uint64, key string, fieldType influxql.DataType, index *tsdb.DatabaseIndex) error {
|
||||
func (e *Engine) addToIndexFromKey(shardID uint64, key []byte, fieldType influxql.DataType, index *tsdb.DatabaseIndex) error {
|
||||
seriesKey, field := SeriesAndFieldFromCompositeKey(key)
|
||||
measurement := tsdb.MeasurementFromSeriesKey(seriesKey)
|
||||
measurement := tsdb.MeasurementFromSeriesKey(string(seriesKey))
|
||||
|
||||
m := index.CreateMeasurementIndexIfNotExists(measurement)
|
||||
m.SetFieldName(field)
|
||||
|
@ -540,7 +542,7 @@ func (e *Engine) addToIndexFromKey(shardID uint64, key string, fieldType influxq
|
|||
}
|
||||
|
||||
// Have we already indexed this series?
|
||||
ss := index.Series(seriesKey)
|
||||
ss := index.SeriesBytes(seriesKey)
|
||||
if ss != nil {
|
||||
// Add this shard to the existing series
|
||||
ss.AssignShard(shardID)
|
||||
|
@ -551,7 +553,7 @@ func (e *Engine) addToIndexFromKey(shardID uint64, key string, fieldType influxq
|
|||
// fields (in line protocol format) in the series key
|
||||
_, tags, _ := models.ParseKey(seriesKey)
|
||||
|
||||
s := tsdb.NewSeries(seriesKey, tags)
|
||||
s := tsdb.NewSeries(string(seriesKey), tags)
|
||||
index.CreateSeriesIndexIfNotExists(measurement, s)
|
||||
s.AssignShard(shardID)
|
||||
|
||||
|
@ -593,14 +595,14 @@ func (e *Engine) ContainsSeries(keys []string) (map[string]bool, error) {
|
|||
}
|
||||
|
||||
for _, k := range e.Cache.Keys() {
|
||||
seriesKey, _ := SeriesAndFieldFromCompositeKey(k)
|
||||
keyMap[seriesKey] = true
|
||||
seriesKey, _ := SeriesAndFieldFromCompositeKey([]byte(k))
|
||||
keyMap[string(seriesKey)] = true
|
||||
}
|
||||
|
||||
if err := e.FileStore.WalkKeys(func(k string, _ byte) error {
|
||||
if err := e.FileStore.WalkKeys(func(k []byte, _ byte) error {
|
||||
seriesKey, _ := SeriesAndFieldFromCompositeKey(k)
|
||||
if _, ok := keyMap[seriesKey]; ok {
|
||||
keyMap[seriesKey] = true
|
||||
if _, ok := keyMap[string(seriesKey)]; ok {
|
||||
keyMap[string(seriesKey)] = true
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
|
@ -638,14 +640,14 @@ func (e *Engine) DeleteSeriesRange(seriesKeys []string, min, max int64) error {
|
|||
|
||||
deleteKeys := make([]string, 0, len(seriesKeys))
|
||||
// go through the keys in the file store
|
||||
if err := e.FileStore.WalkKeys(func(k string, _ byte) error {
|
||||
if err := e.FileStore.WalkKeys(func(k []byte, _ byte) error {
|
||||
seriesKey, _ := SeriesAndFieldFromCompositeKey(k)
|
||||
|
||||
// Keep track if we've added this key since WalkKeys can return keys
|
||||
// we've seen before
|
||||
if v, ok := keyMap[seriesKey]; ok && v == 0 {
|
||||
deleteKeys = append(deleteKeys, k)
|
||||
keyMap[seriesKey] += 1
|
||||
if v, ok := keyMap[string(seriesKey)]; ok && v == 0 {
|
||||
deleteKeys = append(deleteKeys, string(k))
|
||||
keyMap[string(seriesKey)] += 1
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
|
@ -665,8 +667,8 @@ func (e *Engine) DeleteSeriesRange(seriesKeys []string, min, max int64) error {
|
|||
e.Cache.RLock()
|
||||
s := e.Cache.Store()
|
||||
for k, _ := range s {
|
||||
seriesKey, _ := SeriesAndFieldFromCompositeKey(k)
|
||||
if _, ok := keyMap[seriesKey]; ok {
|
||||
seriesKey, _ := SeriesAndFieldFromCompositeKey([]byte(k))
|
||||
if _, ok := keyMap[string(seriesKey)]; ok {
|
||||
walKeys = append(walKeys, k)
|
||||
}
|
||||
}
|
||||
|
@ -1277,7 +1279,7 @@ func (e *Engine) createTagSetGroupIterators(ref *influxql.VarRef, mm *tsdb.Measu
|
|||
|
||||
// createVarRefSeriesIterator creates an iterator for a variable reference for a series.
|
||||
func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, mm *tsdb.Measurement, seriesKey string, t *influxql.TagSet, filter influxql.Expr, conditionFields []influxql.VarRef, opt influxql.IteratorOptions) (influxql.Iterator, error) {
|
||||
tags := influxql.NewTags(e.index.TagsForSeries(seriesKey))
|
||||
tags := influxql.NewTags(e.index.TagsForSeries(seriesKey).Map())
|
||||
|
||||
// Create options specific for this series.
|
||||
itrOpt := opt
|
||||
|
@ -1496,11 +1498,11 @@ func tsmFieldTypeToInfluxQLDataType(typ byte) (influxql.DataType, error) {
|
|||
}
|
||||
}
|
||||
|
||||
func SeriesAndFieldFromCompositeKey(key string) (string, string) {
|
||||
sep := strings.Index(key, keyFieldSeparator)
|
||||
func SeriesAndFieldFromCompositeKey(key []byte) ([]byte, string) {
|
||||
sep := bytes.Index(key, []byte(keyFieldSeparator))
|
||||
if sep == -1 {
|
||||
// No field???
|
||||
return key, ""
|
||||
}
|
||||
return key[:sep], key[sep+len(keyFieldSeparator):]
|
||||
return key[:sep], string(key[sep+len(keyFieldSeparator):])
|
||||
}
|
||||
|
|
|
@ -44,7 +44,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) {
|
|||
// Verify index is correct.
|
||||
if m := index.Measurement("cpu"); m == nil {
|
||||
t.Fatal("measurement not found")
|
||||
} else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, map[string]string{"host": "A"}) {
|
||||
} else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, models.NewTags(map[string]string{"host": "A"})) {
|
||||
t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags)
|
||||
}
|
||||
|
||||
|
@ -67,7 +67,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) {
|
|||
// Verify index is correct.
|
||||
if m := index.Measurement("cpu"); m == nil {
|
||||
t.Fatal("measurement not found")
|
||||
} else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, map[string]string{"host": "A"}) {
|
||||
} else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, models.NewTags(map[string]string{"host": "A"})) {
|
||||
t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags)
|
||||
}
|
||||
|
||||
|
@ -92,9 +92,9 @@ func TestEngine_LoadMetadataIndex(t *testing.T) {
|
|||
// Verify index is correct.
|
||||
if m := index.Measurement("cpu"); m == nil {
|
||||
t.Fatal("measurement not found")
|
||||
} else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, map[string]string{"host": "A"}) {
|
||||
} else if s := m.SeriesByID(1); s.Key != "cpu,host=A" || !reflect.DeepEqual(s.Tags, models.NewTags(map[string]string{"host": "A"})) {
|
||||
t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags)
|
||||
} else if s := m.SeriesByID(2); s.Key != "cpu,host=B" || !reflect.DeepEqual(s.Tags, map[string]string{"host": "B"}) {
|
||||
} else if s := m.SeriesByID(2); s.Key != "cpu,host=B" || !reflect.DeepEqual(s.Tags, models.NewTags(map[string]string{"host": "B"})) {
|
||||
t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags)
|
||||
}
|
||||
}
|
||||
|
@ -222,7 +222,7 @@ func TestEngine_CreateIterator_Cache_Ascending(t *testing.T) {
|
|||
|
||||
e.Index().CreateMeasurementIndexIfNotExists("cpu")
|
||||
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
|
||||
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", map[string]string{"host": "A"}))
|
||||
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
|
||||
if err := e.WritePointsString(
|
||||
`cpu,host=A value=1.1 1000000000`,
|
||||
`cpu,host=A value=1.2 2000000000`,
|
||||
|
@ -275,7 +275,7 @@ func TestEngine_CreateIterator_Cache_Descending(t *testing.T) {
|
|||
|
||||
e.Index().CreateMeasurementIndexIfNotExists("cpu")
|
||||
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
|
||||
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", map[string]string{"host": "A"}))
|
||||
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
|
||||
if err := e.WritePointsString(
|
||||
`cpu,host=A value=1.1 1000000000`,
|
||||
`cpu,host=A value=1.2 2000000000`,
|
||||
|
@ -328,7 +328,7 @@ func TestEngine_CreateIterator_TSM_Ascending(t *testing.T) {
|
|||
|
||||
e.Index().CreateMeasurementIndexIfNotExists("cpu")
|
||||
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
|
||||
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", map[string]string{"host": "A"}))
|
||||
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
|
||||
if err := e.WritePointsString(
|
||||
`cpu,host=A value=1.1 1000000000`,
|
||||
`cpu,host=A value=1.2 2000000000`,
|
||||
|
@ -382,7 +382,7 @@ func TestEngine_CreateIterator_TSM_Descending(t *testing.T) {
|
|||
|
||||
e.Index().CreateMeasurementIndexIfNotExists("cpu")
|
||||
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
|
||||
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", map[string]string{"host": "A"}))
|
||||
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
|
||||
if err := e.WritePointsString(
|
||||
`cpu,host=A value=1.1 1000000000`,
|
||||
`cpu,host=A value=1.2 2000000000`,
|
||||
|
@ -437,7 +437,7 @@ func TestEngine_CreateIterator_Aux(t *testing.T) {
|
|||
e.Index().CreateMeasurementIndexIfNotExists("cpu")
|
||||
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
|
||||
e.MeasurementFields("cpu").CreateFieldIfNotExists("F", influxql.Float, false)
|
||||
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", map[string]string{"host": "A"}))
|
||||
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
|
||||
if err := e.WritePointsString(
|
||||
`cpu,host=A value=1.1 1000000000`,
|
||||
`cpu,host=A F=100 1000000000`,
|
||||
|
@ -497,7 +497,7 @@ func TestEngine_CreateIterator_Condition(t *testing.T) {
|
|||
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
|
||||
e.MeasurementFields("cpu").CreateFieldIfNotExists("X", influxql.Float, false)
|
||||
e.MeasurementFields("cpu").CreateFieldIfNotExists("Y", influxql.Float, false)
|
||||
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", map[string]string{"host": "A"}))
|
||||
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
|
||||
if err := e.WritePointsString(
|
||||
`cpu,host=A value=1.1 1000000000`,
|
||||
`cpu,host=A X=10 1000000000`,
|
||||
|
@ -663,7 +663,7 @@ func MustInitBenchmarkEngine(pointN int) *Engine {
|
|||
// Initialize metadata.
|
||||
e.Index().CreateMeasurementIndexIfNotExists("cpu")
|
||||
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
|
||||
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", map[string]string{"host": "A"}))
|
||||
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
|
||||
|
||||
// Generate time ascending points with jitterred time & value.
|
||||
rand := rand.New(rand.NewSource(0))
|
||||
|
|
|
@ -58,7 +58,7 @@ type TSMFile interface {
|
|||
KeyCount() int
|
||||
|
||||
// KeyAt returns the key located at index position idx
|
||||
KeyAt(idx int) (string, byte)
|
||||
KeyAt(idx int) ([]byte, byte)
|
||||
|
||||
// Type returns the block type of the values stored for the key. Returns one of
|
||||
// BlockFloat64, BlockInt64, BlockBoolean, BlockString. If key does not exist,
|
||||
|
@ -106,6 +106,13 @@ type TSMFile interface {
|
|||
// BlockIterator returns an iterator pointing to the first block in the file and
|
||||
// allows sequential iteration to each every block.
|
||||
BlockIterator() *BlockIterator
|
||||
|
||||
// Removes mmap references held by another object.
|
||||
deref(dereferencer)
|
||||
}
|
||||
|
||||
type dereferencer interface {
|
||||
Dereference([]byte)
|
||||
}
|
||||
|
||||
// Statistics gathered by the FileStore.
|
||||
|
@ -132,6 +139,8 @@ type FileStore struct {
|
|||
purger *purger
|
||||
|
||||
currentTempDirID int
|
||||
|
||||
dereferencer dereferencer
|
||||
}
|
||||
|
||||
type FileStat struct {
|
||||
|
@ -157,7 +166,7 @@ func (f FileStat) ContainsKey(key string) bool {
|
|||
|
||||
func NewFileStore(dir string) *FileStore {
|
||||
logger := log.New(os.Stderr, "[filestore] ", log.LstdFlags)
|
||||
return &FileStore{
|
||||
fs := &FileStore{
|
||||
dir: dir,
|
||||
lastModified: time.Now(),
|
||||
logger: logger,
|
||||
|
@ -169,6 +178,8 @@ func NewFileStore(dir string) *FileStore {
|
|||
logger: logger,
|
||||
},
|
||||
}
|
||||
fs.purger.fileStore = fs
|
||||
return fs
|
||||
}
|
||||
|
||||
// enableTraceLogging must be called before the FileStore is opened.
|
||||
|
@ -204,7 +215,7 @@ type FileStoreStatistics struct {
|
|||
func (f *FileStore) Statistics(tags map[string]string) []models.Statistic {
|
||||
return []models.Statistic{{
|
||||
Name: "tsm1_filestore",
|
||||
Tags: tags,
|
||||
Tags: models.NewTags(tags),
|
||||
Values: map[string]interface{}{
|
||||
statFileStoreBytes: atomic.LoadInt64(&f.stats.DiskBytes),
|
||||
statFileStoreCount: atomic.LoadInt64(&f.stats.FileCount),
|
||||
|
@ -281,7 +292,7 @@ func (f *FileStore) Remove(paths ...string) {
|
|||
|
||||
// WalkKeys calls fn for every key in every TSM file known to the FileStore. If the key
|
||||
// exists in multiple files, it will be invoked for each file.
|
||||
func (f *FileStore) WalkKeys(fn func(key string, typ byte) error) error {
|
||||
func (f *FileStore) WalkKeys(fn func(key []byte, typ byte) error) error {
|
||||
f.mu.RLock()
|
||||
defer f.mu.RUnlock()
|
||||
|
||||
|
@ -305,7 +316,7 @@ func (f *FileStore) Keys() map[string]byte {
|
|||
for _, f := range f.files {
|
||||
for i := 0; i < f.KeyCount(); i++ {
|
||||
key, typ := f.KeyAt(i)
|
||||
uniqueKeys[key] = typ
|
||||
uniqueKeys[string(key)] = typ
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -556,6 +567,11 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error {
|
|||
continue
|
||||
}
|
||||
|
||||
// Remove any mmap references held by the index.
|
||||
if f.dereferencer != nil {
|
||||
file.deref(f.dereferencer)
|
||||
}
|
||||
|
||||
if err := file.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1088,9 +1104,10 @@ func (c *KeyCursor) filterBooleanValues(tombstones []TimeRange, values BooleanVa
|
|||
}
|
||||
|
||||
type purger struct {
|
||||
mu sync.RWMutex
|
||||
files map[string]TSMFile
|
||||
running bool
|
||||
mu sync.RWMutex
|
||||
fileStore *FileStore
|
||||
files map[string]TSMFile
|
||||
running bool
|
||||
|
||||
logger *log.Logger
|
||||
}
|
||||
|
@ -1118,6 +1135,11 @@ func (p *purger) purge() {
|
|||
p.mu.Lock()
|
||||
for k, v := range p.files {
|
||||
if !v.InUse() {
|
||||
// Remove any mmap references held by the index.
|
||||
if p.fileStore.dereferencer != nil {
|
||||
v.deref(p.fileStore.dereferencer)
|
||||
}
|
||||
|
||||
if err := v.Close(); err != nil {
|
||||
p.logger.Printf("purge: close file: %v", err)
|
||||
continue
|
||||
|
|
|
@ -68,7 +68,7 @@ type TSMIndex interface {
|
|||
Key(index int) (string, []IndexEntry)
|
||||
|
||||
// KeyAt returns the key in the index at the given postion.
|
||||
KeyAt(index int) (string, byte)
|
||||
KeyAt(index int) ([]byte, byte)
|
||||
|
||||
// KeyCount returns the count of unique keys in the index.
|
||||
KeyCount() int
|
||||
|
@ -116,7 +116,7 @@ func (b *BlockIterator) PeekNext() string {
|
|||
return b.key
|
||||
} else if b.n-b.i > 1 {
|
||||
key, _ := b.r.KeyAt(b.i + 1)
|
||||
return key
|
||||
return string(key)
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
@ -243,7 +243,7 @@ func (t *TSMReader) Key(index int) (string, []IndexEntry) {
|
|||
}
|
||||
|
||||
// KeyAt returns the key and key type at position idx in the index.
|
||||
func (t *TSMReader) KeyAt(idx int) (string, byte) {
|
||||
func (t *TSMReader) KeyAt(idx int) ([]byte, byte) {
|
||||
return t.index.KeyAt(idx)
|
||||
}
|
||||
|
||||
|
@ -489,6 +489,13 @@ func (t *TSMReader) BlockIterator() *BlockIterator {
|
|||
}
|
||||
}
|
||||
|
||||
// deref removes mmap references held by another object.
|
||||
func (t *TSMReader) deref(d dereferencer) {
|
||||
if acc, ok := t.accessor.(*mmapAccessor); ok {
|
||||
d.Dereference(acc.b)
|
||||
}
|
||||
}
|
||||
|
||||
// indirectIndex is a TSMIndex that uses a raw byte slice representation of an index. This
|
||||
// implementation can be used for indexes that may be MMAPed into memory.
|
||||
type indirectIndex struct {
|
||||
|
@ -667,15 +674,15 @@ func (d *indirectIndex) Key(idx int) (string, []IndexEntry) {
|
|||
return string(key), entries.entries
|
||||
}
|
||||
|
||||
func (d *indirectIndex) KeyAt(idx int) (string, byte) {
|
||||
func (d *indirectIndex) KeyAt(idx int) ([]byte, byte) {
|
||||
d.mu.RLock()
|
||||
defer d.mu.RUnlock()
|
||||
|
||||
if idx < 0 || idx >= len(d.offsets) {
|
||||
return "", 0
|
||||
return nil, 0
|
||||
}
|
||||
n, key, _ := readKey(d.b[d.offsets[idx]:])
|
||||
return string(key), d.b[d.offsets[idx]+int32(n)]
|
||||
return key, d.b[d.offsets[idx]+int32(n)]
|
||||
}
|
||||
|
||||
func (d *indirectIndex) KeyCount() int {
|
||||
|
|
|
@ -144,7 +144,7 @@ type WALStatistics struct {
|
|||
func (l *WAL) Statistics(tags map[string]string) []models.Statistic {
|
||||
return []models.Statistic{{
|
||||
Name: "tsm1_wal",
|
||||
Tags: tags,
|
||||
Tags: models.NewTags(tags),
|
||||
Values: map[string]interface{}{
|
||||
statWALOldBytes: atomic.LoadInt64(&l.stats.OldBytes),
|
||||
statWALCurrentBytes: atomic.LoadInt64(&l.stats.CurrentBytes),
|
||||
|
|
129
tsdb/meta.go
129
tsdb/meta.go
|
@ -6,6 +6,7 @@ import (
|
|||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"unsafe"
|
||||
|
||||
"github.com/influxdata/influxdb/influxql"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
|
@ -56,7 +57,7 @@ type IndexStatistics struct {
|
|||
func (d *DatabaseIndex) Statistics(tags map[string]string) []models.Statistic {
|
||||
return []models.Statistic{{
|
||||
Name: "database",
|
||||
Tags: models.Tags(map[string]string{"database": d.name}).Merge(tags),
|
||||
Tags: models.NewTags(map[string]string{"database": d.name}).Merge(tags),
|
||||
Values: map[string]interface{}{
|
||||
statDatabaseSeries: atomic.LoadInt64(&d.stats.NumSeries),
|
||||
statDatabaseMeasurements: atomic.LoadInt64(&d.stats.NumMeasurements),
|
||||
|
@ -72,6 +73,14 @@ func (d *DatabaseIndex) Series(key string) *Series {
|
|||
return s
|
||||
}
|
||||
|
||||
// SeriesBytes returns a series by key.
|
||||
func (d *DatabaseIndex) SeriesBytes(key []byte) *Series {
|
||||
d.mu.RLock()
|
||||
s := d.series[string(key)]
|
||||
d.mu.RUnlock()
|
||||
return s
|
||||
}
|
||||
|
||||
func (d *DatabaseIndex) SeriesKeys() []string {
|
||||
d.mu.RLock()
|
||||
s := make([]string, 0, len(d.series))
|
||||
|
@ -249,7 +258,7 @@ func (d *DatabaseIndex) RemoveShard(shardID uint64) {
|
|||
}
|
||||
|
||||
// TagsForSeries returns the tag map for the passed in series
|
||||
func (d *DatabaseIndex) TagsForSeries(key string) map[string]string {
|
||||
func (d *DatabaseIndex) TagsForSeries(key string) models.Tags {
|
||||
d.mu.RLock()
|
||||
defer d.mu.RUnlock()
|
||||
|
||||
|
@ -515,6 +524,16 @@ func (d *DatabaseIndex) DropSeries(keys []string) {
|
|||
atomic.AddInt64(&d.stats.NumSeries, -nDeleted)
|
||||
}
|
||||
|
||||
// Dereference removes all references to data within b and moves them to the heap.
|
||||
func (d *DatabaseIndex) Dereference(b []byte) {
|
||||
d.mu.RLock()
|
||||
defer d.mu.RUnlock()
|
||||
|
||||
for _, s := range d.series {
|
||||
s.Dereference(b)
|
||||
}
|
||||
}
|
||||
|
||||
// Measurement represents a collection of time series in a database. It also contains in memory
|
||||
// structures for indexing tags. Exported functions are goroutine safe while un-exported functions
|
||||
// assume the caller will use the appropriate locks
|
||||
|
@ -647,13 +666,13 @@ func (m *Measurement) AddSeries(s *Series) bool {
|
|||
}
|
||||
|
||||
// add this series id to the tag index on the measurement
|
||||
for k, v := range s.Tags {
|
||||
valueMap := m.seriesByTagKeyValue[k]
|
||||
for _, t := range s.Tags {
|
||||
valueMap := m.seriesByTagKeyValue[string(t.Key)]
|
||||
if valueMap == nil {
|
||||
valueMap = make(map[string]SeriesIDs)
|
||||
m.seriesByTagKeyValue[k] = valueMap
|
||||
m.seriesByTagKeyValue[string(t.Key)] = valueMap
|
||||
}
|
||||
ids := valueMap[v]
|
||||
ids := valueMap[string(t.Value)]
|
||||
ids = append(ids, s.id)
|
||||
|
||||
// most of the time the series ID will be higher than all others because it's a new
|
||||
|
@ -661,7 +680,7 @@ func (m *Measurement) AddSeries(s *Series) bool {
|
|||
if len(ids) > 1 && ids[len(ids)-1] < ids[len(ids)-2] {
|
||||
sort.Sort(ids)
|
||||
}
|
||||
valueMap[v] = ids
|
||||
valueMap[string(t.Value)] = ids
|
||||
}
|
||||
|
||||
return true
|
||||
|
@ -683,19 +702,19 @@ func (m *Measurement) DropSeries(series *Series) {
|
|||
|
||||
// remove this series id from the tag index on the measurement
|
||||
// s.seriesByTagKeyValue is defined as map[string]map[string]SeriesIDs
|
||||
for k, v := range series.Tags {
|
||||
values := m.seriesByTagKeyValue[k][v]
|
||||
for _, t := range series.Tags {
|
||||
values := m.seriesByTagKeyValue[string(t.Key)][string(t.Value)]
|
||||
ids := filter(values, seriesID)
|
||||
// Check to see if we have any ids, if not, remove the key
|
||||
if len(ids) == 0 {
|
||||
delete(m.seriesByTagKeyValue[k], v)
|
||||
delete(m.seriesByTagKeyValue[string(t.Key)], string(t.Value))
|
||||
} else {
|
||||
m.seriesByTagKeyValue[k][v] = ids
|
||||
m.seriesByTagKeyValue[string(t.Key)][string(t.Value)] = ids
|
||||
}
|
||||
|
||||
// If we have no values, then we delete the key
|
||||
if len(m.seriesByTagKeyValue[k]) == 0 {
|
||||
delete(m.seriesByTagKeyValue, k)
|
||||
if len(m.seriesByTagKeyValue[string(t.Key)]) == 0 {
|
||||
delete(m.seriesByTagKeyValue, string(t.Key))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -758,7 +777,7 @@ func (m *Measurement) TagSets(dimensions []string, condition influxql.Expr) ([]*
|
|||
|
||||
// Build the TagSet for this series.
|
||||
for _, dim := range dimensions {
|
||||
tags[dim] = s.Tags[dim]
|
||||
tags[dim] = s.Tags.GetString(dim)
|
||||
}
|
||||
|
||||
// Convert the TagSet to a string, so it can be added to a map allowing TagSets to be handled
|
||||
|
@ -1423,40 +1442,52 @@ func (a Measurements) union(other Measurements) Measurements {
|
|||
type Series struct {
|
||||
mu sync.RWMutex
|
||||
Key string
|
||||
Tags map[string]string
|
||||
Tags models.Tags
|
||||
id uint64
|
||||
measurement *Measurement
|
||||
shardIDs map[uint64]bool // shards that have this series defined
|
||||
shardIDs []uint64 // shards that have this series defined
|
||||
}
|
||||
|
||||
// NewSeries returns an initialized series struct
|
||||
func NewSeries(key string, tags map[string]string) *Series {
|
||||
func NewSeries(key string, tags models.Tags) *Series {
|
||||
return &Series{
|
||||
Key: key,
|
||||
Tags: tags,
|
||||
shardIDs: make(map[uint64]bool),
|
||||
Key: key,
|
||||
Tags: tags,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Series) AssignShard(shardID uint64) {
|
||||
s.mu.Lock()
|
||||
s.shardIDs[shardID] = true
|
||||
if !s.assigned(shardID) {
|
||||
s.shardIDs = append(s.shardIDs, shardID)
|
||||
sort.Sort(uint64Slice(s.shardIDs))
|
||||
}
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *Series) UnassignShard(shardID uint64) {
|
||||
s.mu.Lock()
|
||||
delete(s.shardIDs, shardID)
|
||||
for i, v := range s.shardIDs {
|
||||
if v == shardID {
|
||||
s.shardIDs = append(s.shardIDs[:i], s.shardIDs[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *Series) Assigned(shardID uint64) bool {
|
||||
s.mu.RLock()
|
||||
b := s.shardIDs[shardID]
|
||||
b := s.assigned(shardID)
|
||||
s.mu.RUnlock()
|
||||
return b
|
||||
}
|
||||
|
||||
func (s *Series) assigned(shardID uint64) bool {
|
||||
i := sort.Search(len(s.shardIDs), func(i int) bool { return s.shardIDs[i] >= shardID })
|
||||
return i < len(s.shardIDs) && s.shardIDs[i] == shardID
|
||||
}
|
||||
|
||||
func (s *Series) ShardN() int {
|
||||
s.mu.RLock()
|
||||
n := len(s.shardIDs)
|
||||
|
@ -1464,6 +1495,36 @@ func (s *Series) ShardN() int {
|
|||
return n
|
||||
}
|
||||
|
||||
// Dereference removes references to a byte slice.
|
||||
func (s *Series) Dereference(b []byte) {
|
||||
s.mu.Lock()
|
||||
|
||||
min := uintptr(unsafe.Pointer(&b[0]))
|
||||
max := min + uintptr(len(b))
|
||||
|
||||
for _, t := range s.Tags {
|
||||
deref(&t.Key, min, max)
|
||||
deref(&t.Value, min, max)
|
||||
}
|
||||
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func deref(v *[]byte, min, max uintptr) {
|
||||
vv := *v
|
||||
|
||||
// Ignore if value is not within range.
|
||||
ptr := uintptr(unsafe.Pointer(&vv[0]))
|
||||
if ptr < min || ptr > max {
|
||||
return
|
||||
}
|
||||
|
||||
// Otherwise copy to the heap.
|
||||
buf := make([]byte, len(vv))
|
||||
copy(buf, vv)
|
||||
*v = buf
|
||||
}
|
||||
|
||||
// MarshalBinary encodes the object to a binary format.
|
||||
func (s *Series) MarshalBinary() ([]byte, error) {
|
||||
s.mu.RLock()
|
||||
|
@ -1471,10 +1532,8 @@ func (s *Series) MarshalBinary() ([]byte, error) {
|
|||
|
||||
var pb internal.Series
|
||||
pb.Key = &s.Key
|
||||
for k, v := range s.Tags {
|
||||
key := k
|
||||
value := v
|
||||
pb.Tags = append(pb.Tags, &internal.Tag{Key: &key, Value: &value})
|
||||
for _, t := range s.Tags {
|
||||
pb.Tags = append(pb.Tags, &internal.Tag{Key: proto.String(string(t.Key)), Value: proto.String(string(t.Value))})
|
||||
}
|
||||
return proto.Marshal(&pb)
|
||||
}
|
||||
|
@ -1489,9 +1548,9 @@ func (s *Series) UnmarshalBinary(buf []byte) error {
|
|||
return err
|
||||
}
|
||||
s.Key = pb.GetKey()
|
||||
s.Tags = make(map[string]string, len(pb.Tags))
|
||||
for _, t := range pb.Tags {
|
||||
s.Tags[t.GetKey()] = t.GetValue()
|
||||
s.Tags = make(models.Tags, len(pb.Tags))
|
||||
for i, t := range pb.Tags {
|
||||
s.Tags[i] = models.Tag{Key: []byte(t.GetKey()), Value: []byte(t.GetValue())}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -1721,7 +1780,7 @@ func (m *Measurement) tagValuesByKeyAndSeriesID(tagKeys []string, ids SeriesIDs)
|
|||
// Iterate the tag keys we're interested in and collect values
|
||||
// from this series, if they exist.
|
||||
for _, tagKey := range tagKeys {
|
||||
if tagVal, ok := s.Tags[tagKey]; ok {
|
||||
if tagVal := s.Tags.GetString(tagKey); tagVal != "" {
|
||||
if _, ok = tagValues[tagKey]; !ok {
|
||||
tagValues[tagKey] = newStringSet()
|
||||
}
|
||||
|
@ -1810,6 +1869,12 @@ func filter(a []uint64, v uint64) []uint64 {
|
|||
// contains a measurement name.
|
||||
func MeasurementFromSeriesKey(key string) string {
|
||||
// Ignoring the error because the func returns "missing fields"
|
||||
k, _, _ := models.ParseKey(key)
|
||||
k, _, _ := models.ParseKey([]byte(key))
|
||||
return escape.UnescapeString(k)
|
||||
}
|
||||
|
||||
type uint64Slice []uint64
|
||||
|
||||
func (a uint64Slice) Len() int { return len(a) }
|
||||
func (a uint64Slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a uint64Slice) Less(i, j int) bool { return a[i] < a[j] }
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/influxdata/influxdb/influxql"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
)
|
||||
|
||||
|
@ -182,7 +183,7 @@ func genTestSeries(mCnt, tCnt, vCnt int) []*TestSeries {
|
|||
for _, ts := range tagSets {
|
||||
series = append(series, &TestSeries{
|
||||
Measurement: m,
|
||||
Series: tsdb.NewSeries(fmt.Sprintf("%s:%s", m, string(tsdb.MarshalTags(ts))), ts),
|
||||
Series: tsdb.NewSeries(fmt.Sprintf("%s:%s", m, string(tsdb.MarshalTags(ts))), models.NewTags(ts)),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
152
tsdb/shard.go
152
tsdb/shard.go
|
@ -8,6 +8,7 @@ import (
|
|||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
@ -117,12 +118,12 @@ func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, opti
|
|||
closing: make(chan struct{}),
|
||||
|
||||
stats: &ShardStatistics{},
|
||||
statTags: map[string]string{
|
||||
statTags: models.NewTags(map[string]string{
|
||||
"path": path,
|
||||
"id": fmt.Sprintf("%d", id),
|
||||
"database": db,
|
||||
"retentionPolicy": rp,
|
||||
},
|
||||
}),
|
||||
|
||||
database: db,
|
||||
retentionPolicy: rp,
|
||||
|
@ -177,10 +178,10 @@ func (s *Shard) Statistics(tags map[string]string) []models.Statistic {
|
|||
return nil
|
||||
}
|
||||
|
||||
tags = s.statTags.Merge(tags)
|
||||
tags = s.statTags.Merge(tags).Map()
|
||||
statistics := []models.Statistic{{
|
||||
Name: "shard",
|
||||
Tags: models.Tags(tags).Merge(map[string]string{"engine": s.options.EngineVersion}),
|
||||
Tags: models.NewTags(tags).Merge(map[string]string{"engine": s.options.EngineVersion}),
|
||||
Values: map[string]interface{}{
|
||||
statWriteReq: atomic.LoadInt64(&s.stats.WriteReq),
|
||||
statSeriesCreate: atomic.LoadInt64(&s.stats.SeriesCreated),
|
||||
|
@ -449,9 +450,9 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]*FieldCreate,
|
|||
for _, p := range points {
|
||||
// verify the tags and fields
|
||||
tags := p.Tags()
|
||||
if _, ok := tags["time"]; ok {
|
||||
if v := tags.Get([]byte("time")); v != nil {
|
||||
s.logger.Printf("dropping tag 'time' from '%s'\n", p.PrecisionString(""))
|
||||
delete(tags, "time")
|
||||
tags.Delete([]byte("time"))
|
||||
p.SetTags(tags)
|
||||
}
|
||||
|
||||
|
@ -1052,6 +1053,145 @@ func NewTagKeysIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Itera
|
|||
return newMeasurementKeysIterator(sh, fn, opt)
|
||||
}
|
||||
|
||||
// tagValuesIterator emits key/tag values
|
||||
type tagValuesIterator struct {
|
||||
series []*Series // remaining series
|
||||
keys []string // tag keys to select from a series
|
||||
fields []string // fields to emit (key or value)
|
||||
buf struct {
|
||||
s *Series // current series
|
||||
keys []string // current tag's keys
|
||||
}
|
||||
}
|
||||
|
||||
// NewTagValuesIterator returns a new instance of TagValuesIterator.
|
||||
func NewTagValuesIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) {
|
||||
if opt.Condition == nil {
|
||||
return nil, errors.New("a condition is required")
|
||||
}
|
||||
|
||||
measurementExpr := influxql.CloneExpr(opt.Condition)
|
||||
measurementExpr = influxql.Reduce(influxql.RewriteExpr(measurementExpr, func(e influxql.Expr) influxql.Expr {
|
||||
switch e := e.(type) {
|
||||
case *influxql.BinaryExpr:
|
||||
switch e.Op {
|
||||
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
|
||||
tag, ok := e.LHS.(*influxql.VarRef)
|
||||
if !ok || tag.Val != "_name" {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return e
|
||||
}), nil)
|
||||
|
||||
mms, ok, err := sh.index.measurementsByExpr(measurementExpr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if !ok {
|
||||
mms = sh.index.Measurements()
|
||||
sort.Sort(mms)
|
||||
}
|
||||
|
||||
// If there are no measurements, return immediately.
|
||||
if len(mms) == 0 {
|
||||
return &tagValuesIterator{}, nil
|
||||
}
|
||||
|
||||
filterExpr := influxql.CloneExpr(opt.Condition)
|
||||
filterExpr = influxql.Reduce(influxql.RewriteExpr(filterExpr, func(e influxql.Expr) influxql.Expr {
|
||||
switch e := e.(type) {
|
||||
case *influxql.BinaryExpr:
|
||||
switch e.Op {
|
||||
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
|
||||
tag, ok := e.LHS.(*influxql.VarRef)
|
||||
if !ok || strings.HasPrefix(tag.Val, "_") {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return e
|
||||
}), nil)
|
||||
|
||||
var series []*Series
|
||||
keys := newStringSet()
|
||||
for _, mm := range mms {
|
||||
ss, ok, err := mm.TagKeysByExpr(opt.Condition)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if !ok {
|
||||
keys.add(mm.TagKeys()...)
|
||||
} else {
|
||||
keys = keys.union(ss)
|
||||
}
|
||||
|
||||
ids, err := mm.seriesIDsAllOrByExpr(filterExpr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, id := range ids {
|
||||
series = append(series, mm.SeriesByID(id))
|
||||
}
|
||||
}
|
||||
|
||||
return &tagValuesIterator{
|
||||
series: series,
|
||||
keys: keys.list(),
|
||||
fields: influxql.VarRefs(opt.Aux).Strings(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Stats returns stats about the points processed.
|
||||
func (itr *tagValuesIterator) Stats() influxql.IteratorStats { return influxql.IteratorStats{} }
|
||||
|
||||
// Close closes the iterator.
|
||||
func (itr *tagValuesIterator) Close() error { return nil }
|
||||
|
||||
// Next emits the next point in the iterator.
|
||||
func (itr *tagValuesIterator) Next() (*influxql.FloatPoint, error) {
|
||||
for {
|
||||
// If there are no more values then move to the next key.
|
||||
if len(itr.buf.keys) == 0 {
|
||||
if len(itr.series) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
itr.buf.s = itr.series[0]
|
||||
itr.buf.keys = itr.keys
|
||||
itr.series = itr.series[1:]
|
||||
continue
|
||||
}
|
||||
|
||||
key := itr.buf.keys[0]
|
||||
value := itr.buf.s.Tags.GetString(key)
|
||||
if value == "" {
|
||||
itr.buf.keys = itr.buf.keys[1:]
|
||||
continue
|
||||
}
|
||||
|
||||
// Prepare auxiliary fields.
|
||||
auxFields := make([]interface{}, len(itr.fields))
|
||||
for i, f := range itr.fields {
|
||||
switch f {
|
||||
case "_tagKey":
|
||||
auxFields[i] = key
|
||||
case "value":
|
||||
auxFields[i] = value
|
||||
}
|
||||
}
|
||||
|
||||
// Return next key.
|
||||
p := &influxql.FloatPoint{
|
||||
Name: itr.buf.s.measurement.Name,
|
||||
Aux: auxFields,
|
||||
}
|
||||
itr.buf.keys = itr.buf.keys[1:]
|
||||
|
||||
return p, nil
|
||||
}
|
||||
}
|
||||
|
||||
// measurementKeyFunc is the function called by measurementKeysIterator.
|
||||
type measurementKeyFunc func(m *Measurement) []string
|
||||
|
||||
|
|
|
@ -47,7 +47,7 @@ func TestShardWriteAndIndex(t *testing.T) {
|
|||
|
||||
pt := models.MustNewPoint(
|
||||
"cpu",
|
||||
map[string]string{"host": "server"},
|
||||
models.Tags{{Key: []byte("host"), Value: []byte("server")}},
|
||||
map[string]interface{}{"value": 1.0},
|
||||
time.Unix(1, 2),
|
||||
)
|
||||
|
@ -69,7 +69,7 @@ func TestShardWriteAndIndex(t *testing.T) {
|
|||
}
|
||||
|
||||
seriesTags := index.Series(string(pt.Key())).Tags
|
||||
if len(seriesTags) != len(pt.Tags()) || pt.Tags()["host"] != seriesTags["host"] {
|
||||
if len(seriesTags) != len(pt.Tags()) || pt.Tags().GetString("host") != seriesTags.GetString("host") {
|
||||
t.Fatalf("tags weren't properly saved to series index: %v, %v", pt.Tags(), seriesTags)
|
||||
}
|
||||
if !reflect.DeepEqual(index.Measurement("cpu").TagKeys(), []string{"host"}) {
|
||||
|
@ -121,7 +121,7 @@ func TestMaxSeriesLimit(t *testing.T) {
|
|||
for i := 0; i < 1000; i++ {
|
||||
pt := models.MustNewPoint(
|
||||
"cpu",
|
||||
map[string]string{"host": fmt.Sprintf("server%d", i)},
|
||||
models.Tags{{Key: []byte("host"), Value: []byte(fmt.Sprintf("server%d", i))}},
|
||||
map[string]interface{}{"value": 1.0},
|
||||
time.Unix(1, 2),
|
||||
)
|
||||
|
@ -136,7 +136,7 @@ func TestMaxSeriesLimit(t *testing.T) {
|
|||
// Writing one more series should exceed the series limit.
|
||||
pt := models.MustNewPoint(
|
||||
"cpu",
|
||||
map[string]string{"host": "server9999"},
|
||||
models.Tags{{Key: []byte("host"), Value: []byte("server9999")}},
|
||||
map[string]interface{}{"value": 1.0},
|
||||
time.Unix(1, 2),
|
||||
)
|
||||
|
@ -169,7 +169,7 @@ func TestWriteTimeTag(t *testing.T) {
|
|||
|
||||
pt := models.MustNewPoint(
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
models.NewTags(map[string]string{}),
|
||||
map[string]interface{}{"time": 1.0},
|
||||
time.Unix(1, 2),
|
||||
)
|
||||
|
@ -189,7 +189,7 @@ func TestWriteTimeTag(t *testing.T) {
|
|||
|
||||
pt = models.MustNewPoint(
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
models.NewTags(map[string]string{}),
|
||||
map[string]interface{}{"value": 1.0, "time": 1.0},
|
||||
time.Unix(1, 2),
|
||||
)
|
||||
|
@ -230,7 +230,7 @@ func TestWriteTimeField(t *testing.T) {
|
|||
|
||||
pt := models.MustNewPoint(
|
||||
"cpu",
|
||||
map[string]string{"time": "now"},
|
||||
models.NewTags(map[string]string{"time": "now"}),
|
||||
map[string]interface{}{"value": 1.0},
|
||||
time.Unix(1, 2),
|
||||
)
|
||||
|
@ -270,7 +270,7 @@ func TestShardWriteAddNewField(t *testing.T) {
|
|||
|
||||
pt := models.MustNewPoint(
|
||||
"cpu",
|
||||
map[string]string{"host": "server"},
|
||||
models.NewTags(map[string]string{"host": "server"}),
|
||||
map[string]interface{}{"value": 1.0},
|
||||
time.Unix(1, 2),
|
||||
)
|
||||
|
@ -282,7 +282,7 @@ func TestShardWriteAddNewField(t *testing.T) {
|
|||
|
||||
pt = models.MustNewPoint(
|
||||
"cpu",
|
||||
map[string]string{"host": "server"},
|
||||
models.NewTags(map[string]string{"host": "server"}),
|
||||
map[string]interface{}{"value": 1.0, "value2": 2.0},
|
||||
time.Unix(1, 2),
|
||||
)
|
||||
|
@ -296,7 +296,7 @@ func TestShardWriteAddNewField(t *testing.T) {
|
|||
t.Fatalf("series wasn't in index")
|
||||
}
|
||||
seriesTags := index.Series(string(pt.Key())).Tags
|
||||
if len(seriesTags) != len(pt.Tags()) || pt.Tags()["host"] != seriesTags["host"] {
|
||||
if len(seriesTags) != len(pt.Tags()) || pt.Tags().GetString("host") != seriesTags.GetString("host") {
|
||||
t.Fatalf("tags weren't properly saved to series index: %v, %v", pt.Tags(), seriesTags)
|
||||
}
|
||||
if !reflect.DeepEqual(index.Measurement("cpu").TagKeys(), []string{"host"}) {
|
||||
|
@ -328,7 +328,7 @@ func TestShard_Close_RemoveIndex(t *testing.T) {
|
|||
|
||||
pt := models.MustNewPoint(
|
||||
"cpu",
|
||||
map[string]string{"host": "server"},
|
||||
models.NewTags(map[string]string{"host": "server"}),
|
||||
map[string]interface{}{"value": 1.0},
|
||||
time.Unix(1, 2),
|
||||
)
|
||||
|
@ -521,7 +521,7 @@ func TestShard_Disabled_WriteQuery(t *testing.T) {
|
|||
|
||||
pt := models.MustNewPoint(
|
||||
"cpu",
|
||||
map[string]string{"host": "server"},
|
||||
models.NewTags(map[string]string{"host": "server"}),
|
||||
map[string]interface{}{"value": 1.0},
|
||||
time.Unix(1, 2),
|
||||
)
|
||||
|
|
|
@ -929,13 +929,13 @@ func (s *Store) TagValues(database string, cond influxql.Expr) ([]TagValues, err
|
|||
// Loop over all keys for each series.
|
||||
m := make(map[KeyValue]struct{}, len(ss))
|
||||
for _, series := range ss {
|
||||
for key, value := range series.Tags {
|
||||
for _, t := range series.Tags {
|
||||
if !ok {
|
||||
// nop
|
||||
} else if _, exists := keySet[key]; !exists {
|
||||
} else if _, exists := keySet[string(t.Key)]; !exists {
|
||||
continue
|
||||
}
|
||||
m[KeyValue{key, value}] = struct{}{}
|
||||
m[KeyValue{string(t.Key), string(t.Value)}] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue