Add text protocol parsing and serialization for points

This changes the implementation of point to minimize the extra
processing needed to parse and marshal point data through the system.
pull/2696/head
Jason Wilder 2015-05-28 15:47:52 -06:00
parent 6bd781ccd3
commit 9a9bb736f7
4 changed files with 949 additions and 63 deletions

View File

@ -1,8 +1,12 @@
package tsdb package tsdb
import ( import (
"bytes"
"fmt"
"hash/fnv" "hash/fnv"
"sort" "sort"
"strconv"
"strings"
"time" "time"
) )
@ -15,7 +19,7 @@ type Point interface {
AddTag(key, value string) AddTag(key, value string)
SetTags(tags Tags) SetTags(tags Tags)
Fields() map[string]interface{} Fields() Fields
AddField(name string, value interface{}) AddField(name string, value interface{})
Time() time.Time Time() time.Time
@ -23,29 +27,431 @@ type Point interface {
UnixNano() uint64 UnixNano() uint64
HashID() uint64 HashID() uint64
Key() string Key() []byte
Data() []byte Data() []byte
SetData(buf []byte) SetData(buf []byte)
String() string
} }
// point is the default implementation of Point. // point is the default implementation of Point.
type point struct { type point struct {
name string time time.Time
tags Tags
time time.Time // text encoding of measurement and tags
fields map[string]interface{} // key must always be stored sorted by tags, if the original line was not sorted,
key string // we need to resort it
data []byte key []byte
// text encoding of field data
fields []byte
// text encoding of timestamp
ts []byte
// binary encoded field data
data []byte
}
var (
	// escapeCodes maps each byte that gets escaped in the text encoding to
	// the backslash-prefixed sequence that replaces it.
	escapeCodes = map[byte][]byte{
		',': []byte(`\,`),
		'"': []byte(`\"`),
	}

	// escapeCodesStr is the string-keyed counterpart of escapeCodes. It
	// starts empty and is filled in by init.
	escapeCodesStr = map[string]string{}
)

// init copies every entry of escapeCodes into escapeCodesStr.
func init() {
	for raw, escaped := range escapeCodes {
		escapeCodesStr[string(raw)] = string(escaped)
	}
}
// ParsePointsString converts buf to a byte slice and returns the result of
// calling ParsePoints on it.
func ParsePointsString(buf string) ([]Point, error) {
	raw := []byte(buf)
	return ParsePoints(raw)
}
// ParsePoints parses buf as a sequence of newline-separated points in the
// text protocol and returns them in input order.
//
// Each line is passed to ParsePoint. The first failure aborts the parse and is
// returned together with a nil slice. An empty line, including an empty buf,
// is reported as "missing measurement". A single newline after the last point
// is accepted.
func ParsePoints(buf []byte) ([]Point, error) {
	points := []Point{}
	var (
		pos   int
		block []byte
	)
	for {
		pos, block = scanTo(buf, pos, '\n')
		if len(block) == 0 {
			return nil, fmt.Errorf("missing measurement")
		}

		pt, err := ParsePoint(block)
		if err != nil {
			return nil, err
		}
		points = append(points, pt)

		// scanTo stops ON the newline. Step over it, otherwise the next
		// iteration would start on the delimiter, read an empty block and
		// reject every input that has more than one line.
		pos++
		if pos >= len(buf) {
			break
		}
	}
	return points, nil
}
// ParsePoint parses one line of the text protocol into a Point.
//
// The line is scanned as three consecutive sections: the key
// (measurement[,tag1=value1,...]), the fields (field1=value1[,field2=value2,...])
// and an optional integer timestamp. An empty key or an empty field section is
// rejected; any error from the section scanners is returned unchanged.
func ParsePoint(buf []byte) (Point, error) {
	// Section 1: measurement name plus optional tags. The name is mandatory.
	pos, key, err := scanKey(buf, 0)
	if err != nil {
		return nil, err
	}
	if len(key) == 0 {
		return nil, fmt.Errorf("missing measurement")
	}

	// Section 2: the field set. At least one field is mandatory.
	pos, fields, err := scanFields(buf, pos)
	if err != nil {
		return nil, err
	}
	if len(fields) == 0 {
		return nil, fmt.Errorf("missing fields")
	}

	// Section 3: the optional timestamp. Nothing follows it, so the end
	// position is not needed.
	_, ts, err := scanTime(buf, pos)
	if err != nil {
		return nil, err
	}

	return &point{key: key, fields: fields, ts: ts}, nil
}
// scanKey scans buf starting at i for the measurement and tag portion of a
// point (measurement[,tag1=value1,...]). Leading spaces and tabs are skipped
// and the key ends at the first unescaped space or at the end of buf.
//
// It returns the ending position, the key bytes and an error. When the tags
// are already sorted by name the returned key is a sub-slice of buf; otherwise
// a new slice holding the measurement followed by the tags in sorted order is
// returned. A tag without an '=' yields a "missing value" error and two tags
// with the same name yield a "duplicate tags" error; in both cases the bytes
// scanned so far are returned alongside the error.
func scanKey(buf []byte, i int) (int, []byte, error) {
	start := skipWhitespace(buf, i)
	i = start

	// indices[k] is the offset within buf of the first byte of tag k. For
	// 'cpu,host=a,region=b,zone=c' it ends up as [4, 11, 20]. It grows on
	// demand, so there is no upper bound on the number of tags.
	var indices []int

	// hasSeparator tracks whether an '=' was seen since the last comma.
	hasSeparator := false

scan:
	for i < len(buf) {
		switch buf[i] {
		case '\\':
			// Escaped character: skip it together with the backslash. A lone
			// trailing backslash must not move i beyond the end of buf.
			i += 2
			if i > len(buf) {
				i = len(buf)
			}
			continue
		case '=':
			hasSeparator = true
		case ',':
			// A comma closes the previous tag, which must have had an '='.
			// The very first comma closes the measurement and is exempt.
			if !hasSeparator && len(indices) > 0 {
				return i, buf[start:i], fmt.Errorf("missing value")
			}
			indices = append(indices, i+1)
			hasSeparator = false
		case ' ':
			// End of the key section; the fields section follows.
			break scan
		}
		i++
	}
	if !hasSeparator && len(indices) > 0 {
		return i, buf[start:i], fmt.Errorf("missing value")
	}
	if len(indices) == 0 {
		return i, buf[start:i], nil
	}

	// Compare every tag name with its successor. An equal pair is a duplicate;
	// a descending pair means the tags have to be re-ordered.
	sorted := true
	for j := 1; j < len(indices); j++ {
		_, prev := scanTo(buf, indices[j-1], '=')
		_, cur := scanTo(buf, indices[j], '=')
		switch c := bytes.Compare(prev, cur); {
		case c == 0:
			return i, buf[start:i], fmt.Errorf("duplicate tags")
		case c > 0:
			sorted = false
		}
	}
	if sorted {
		return i, buf[start:i], nil
	}

	// Grab the measurement before the offsets are permuted.
	measurement := buf[start : indices[0]-1]

	// Only the offsets are sorted; buf itself is left untouched.
	insertionSort(0, len(indices), buf, indices)

	// Duplicates that were not neighbours in the input are neighbours now.
	for j := 1; j < len(indices); j++ {
		_, prev := scanTo(buf, indices[j-1], '=')
		_, cur := scanTo(buf, indices[j], '=')
		if bytes.Equal(prev, cur) {
			return i, buf[start:i], fmt.Errorf("duplicate tags")
		}
	}

	// Rebuild the key from the measurement and the tags in sorted order.
	key := make([]byte, 0, i-start)
	key = append(key, measurement...)
	for _, s := range indices {
		// Find the end of this tag: the next unescaped comma or the end of
		// the key section. Escapes are honoured so that values containing
		// `\,` are copied whole.
		e := s
		for e < i && buf[e] != ',' {
			if buf[e] == '\\' {
				e++
			}
			e++
		}
		if e > i {
			e = i
		}
		key = append(key, ',')
		key = append(key, buf[s:e]...)
	}
	return i, key, nil
}
// insertionSort sorts indices[l:r] in place with an insertion sort, using less
// to decide the order of two entries. buf is only handed through to less.
func insertionSort(l, r int, buf []byte, indices []int) {
	for next := l + 1; next < r; next++ {
		// Sink the entry at next towards l until its predecessor is not
		// greater than it.
		pos := next
		for pos > l && less(buf, indices, pos, pos-1) {
			indices[pos-1], indices[pos] = indices[pos], indices[pos-1]
			pos--
		}
	}
}
// less reports whether the bytes of buf that start at indices[i] and run up to
// the next '=' (or the end of buf) order before the corresponding bytes that
// start at indices[j]. Whatever follows the '=' is ignored.
func less(buf []byte, indices []int, i, j int) bool {
	name := func(at int) []byte {
		rest := buf[at:]
		if eq := bytes.IndexByte(rest, '='); eq >= 0 {
			return rest[:eq]
		}
		return rest
	}
	return bytes.Compare(name(indices[i]), name(indices[j])) < 0
}
// scanFields scans buf starting at i for the fields section of a point.
// Leading spaces and tabs are skipped; the section ends at the first space
// that is outside double quotes, or at the end of buf. A backslash causes the
// following byte to be skipped without interpretation.
//
// It returns the ending position and the field bytes within buf. If the
// section ends inside a quoted value an "unbalanced quotes" error is returned
// along with the bytes scanned so far.
func scanFields(buf []byte, i int) (int, []byte, error) {
	start := skipWhitespace(buf, i)
	i = start
	quoted := false

scan:
	for i < len(buf) {
		switch buf[i] {
		case '\\':
			// Escaped character: skip it together with the backslash.
			i += 2
			continue
		case '"':
			quoted = !quoted
		case ' ':
			if !quoted {
				break scan
			}
		}
		i++
	}

	// A lone backslash at the very end moves i one past the buffer; pull it
	// back so the slice expressions below stay in range.
	if i > len(buf) {
		i = len(buf)
	}

	if quoted {
		return i, buf[start:i], fmt.Errorf("unbalanced quotes")
	}
	return i, buf[start:i], nil
}
// scanTime scans buf starting at i for the timestamp section of a point.
// Leading spaces and tabs are skipped; the timestamp runs to the next newline
// or to the end of buf and may be empty.
//
// It returns the ending position and the timestamp bytes within buf. Only the
// characters are validated here, so the number itself does not have to be
// parsed until it is needed: any byte other than an ASCII digit yields a
// "bad timestamp" error together with the bytes scanned so far.
func scanTime(buf []byte, i int) (int, []byte, error) {
	start := skipWhitespace(buf, i)
	i = start
	for i < len(buf) {
		// The end-of-block test has to come before the digit test; a newline
		// is not a digit and would otherwise always be reported as an error.
		if buf[i] == '\n' {
			break
		}
		if buf[i] < '0' || buf[i] > '9' {
			return i, buf[start:i], fmt.Errorf("bad timestamp")
		}
		i++
	}
	return i, buf[start:i], nil
}
// skipWhitespace returns the position of the first byte at or after i in buf
// that is neither a space nor a tab. If only spaces and tabs remain, or i is
// already at or past the end of buf, the position reached is returned.
func skipWhitespace(buf []byte, i int) int {
	for i < len(buf) {
		if c := buf[i]; c != ' ' && c != '\t' {
			break
		}
		i++
	}
	return i
}
// scanTo returns the position of the first occurrence of stop in buf at or
// after i, together with the bytes between i and that position. If stop does
// not occur, the position is len(buf) and the rest of buf is returned. The
// scan starts exactly at i; no whitespace is skipped and escapes are not
// interpreted.
func scanTo(buf []byte, i int, stop byte) (int, []byte) {
	begin := i
	for i < len(buf) && buf[i] != stop {
		i++
	}
	return i, buf[begin:i]
}
// scanToSpaceOr returns the position of the first byte in buf at or after i
// that is either stop or a space, together with the bytes between i and that
// position. If neither occurs, the position is len(buf) and the rest of buf
// is returned. The scan starts exactly at i and escapes are not interpreted.
func scanToSpaceOr(buf []byte, i int, stop byte) (int, []byte) {
	begin := i
	for i < len(buf) {
		if c := buf[i]; c == stop || c == ' ' {
			break
		}
		i++
	}
	return i, buf[begin:i]
}
// scanTagValue returns the position of the first unescaped comma in buf at or
// after i, together with the bytes between i and that position. A backslash
// causes the byte that follows it to be skipped, so `\,` does not end the
// value. If no unescaped comma occurs, the position is len(buf) and the rest
// of buf is returned.
func scanTagValue(buf []byte, i int) (int, []byte) {
	start := i
	for i < len(buf) && buf[i] != ',' {
		if buf[i] == '\\' {
			// Skip the escaped byte as well as the backslash.
			i++
		}
		i++
	}
	// A lone backslash at the very end moves i one past the buffer; pull it
	// back so the slice expression stays in range.
	if i > len(buf) {
		i = len(buf)
	}
	return i, buf[start:i]
}
// escape returns a copy of in where every byte that has an entry in
// escapeCodes is replaced by that entry's escaped sequence.
func escape(in []byte) []byte {
	out := in
	for raw, escaped := range escapeCodes {
		out = bytes.Replace(out, []byte{raw}, escaped, -1)
	}
	return out
}
// escapeString returns in with every substring that has an entry in
// escapeCodesStr replaced by that entry's escaped sequence.
func escapeString(in string) string {
	out := in
	for raw, escaped := range escapeCodesStr {
		out = strings.Replace(out, raw, escaped, -1)
	}
	return out
}
// unescape is the inverse of escape: it returns a copy of in where every
// escaped sequence listed in escapeCodes is replaced by the byte it stands for.
func unescape(in []byte) []byte {
	out := in
	for raw, escaped := range escapeCodes {
		out = bytes.Replace(out, escaped, []byte{raw}, -1)
	}
	return out
}
func unescapeString(in string) string {
for b, esc := range escapeCodesStr {
in = strings.Replace(in, esc, b, -1)
}
return in
} }
// NewPoint returns a new point with the given measurement name, tags, fiels and timestamp // NewPoint returns a new point with the given measurement name, tags, fiels and timestamp
func NewPoint(name string, tags Tags, fields map[string]interface{}, time time.Time) Point { func NewPoint(name string, tags Tags, fields Fields, time time.Time) Point {
return &point{ return &point{
name: name, key: makeKey([]byte(name), tags),
tags: tags,
time: time, time: time,
fields: fields, fields: fields.MarshalBinary(),
} }
} }
@ -57,25 +463,39 @@ func (p *point) SetData(b []byte) {
p.data = b p.data = b
} }
func (p *point) Key() string { func (p *point) Key() []byte {
if p.key == "" {
p.key = p.Name() + "," + string(p.tags.HashKey())
}
return p.key return p.key
} }
// name returns the measurement portion of the key, still in its escaped form:
// everything in front of the first unescaped comma. scanTagValue is used
// because it steps over backslash escapes, so an escaped comma that belongs to
// the measurement itself does not cut the name short.
func (p *point) name() []byte {
	_, name := scanTagValue(p.key, 0)
	return name
}
// Name return the measurement name for the point // Name return the measurement name for the point
func (p *point) Name() string { func (p *point) Name() string {
return p.name return string(unescape(p.name()))
} }
// SetName updates the measurement name for the point // SetName updates the measurement name for the point
func (p *point) SetName(name string) { func (p *point) SetName(name string) {
p.name = name p.key = makeKey([]byte(name), p.Tags())
} }
// Time return the timesteamp for the point // Time return the timesteamp for the point
func (p *point) Time() time.Time { func (p *point) Time() time.Time {
if !p.time.IsZero() {
return p.time
}
if len(p.ts) > 0 {
ts, err := strconv.ParseInt(string(p.ts), 10, 64)
if err != nil {
return p.time
}
p.time = time.Unix(0, ts)
}
return p.time return p.time
} }
@ -86,48 +506,75 @@ func (p *point) SetTime(t time.Time) {
// Tags returns the tag set for the point // Tags returns the tag set for the point
func (p *point) Tags() Tags { func (p *point) Tags() Tags {
return p.tags tags := map[string]string{}
if len(p.key) != 0 {
pos, name := scanTo(p.key, 0, ',')
// it's an empyt key, so there are no tags
if len(name) == 0 {
return tags
}
i := pos + 1
var key, value []byte
for {
if i >= len(p.key) {
break
}
i, key = scanTo(p.key, i, '=')
i, value = scanTagValue(p.key, i+1)
tags[string(key)] = string(unescape(value))
i += 1
}
}
return tags
}
func makeKey(name []byte, tags Tags) []byte {
return append(escape(name), tags.hashKey()...)
} }
// SetTags replaces the tags for the point // SetTags replaces the tags for the point
func (p *point) SetTags(tags Tags) { func (p *point) SetTags(tags Tags) {
p.tags = tags p.key = makeKey(p.name(), tags)
} }
// AddTag adds or replaces a tag value for a point // AddTag adds or replaces a tag value for a point
func (p *point) AddTag(key, value string) { func (p *point) AddTag(key, value string) {
p.tags[key] = value tags := p.Tags()
tags[key] = value
p.key = makeKey(p.name(), tags)
} }
// Fields returns the fiels for the point // Fields returns the fiels for the point
func (p *point) Fields() map[string]interface{} { func (p *point) Fields() Fields {
return p.fields return p.unmarshalBinary()
} }
// AddField adds or replaces a field value for a point // AddField adds or replaces a field value for a point
func (p *point) AddField(name string, value interface{}) { func (p *point) AddField(name string, value interface{}) {
p.fields[name] = value fields := p.Fields()
fields[name] = value
p.fields = fields.MarshalBinary()
}
// String renders the point as "<key> <fields>" and, when the point's time is
// not the zero time, appends " <UnixNano>".
func (p *point) String() string {
	if hasTime := !p.Time().IsZero(); hasTime {
		return fmt.Sprintf("%s %s %d", p.Key(), string(p.fields), p.UnixNano())
	}
	return fmt.Sprintf("%s %s", p.Key(), string(p.fields))
}
func (p *point) unmarshalBinary() Fields {
return newFieldsFromBinary(p.fields)
} }
func (p *point) HashID() uint64 { func (p *point) HashID() uint64 {
// <measurementName>|<tagKey>|<tagKey>|<tagValue>|<tagValue>
// cpu|host|servera
encodedTags := p.tags.HashKey()
size := len(p.Name()) + len(encodedTags)
if len(encodedTags) > 0 {
size++
}
b := make([]byte, 0, size)
b = append(b, p.Name()...)
if len(encodedTags) > 0 {
b = append(b, '|')
}
b = append(b, encodedTags...)
// TODO pick a better hashing that guarantees uniqueness
// TODO create a cash for faster lookup
h := fnv.New64a() h := fnv.New64a()
h.Write(b) h.Write(p.key)
sum := h.Sum64() sum := h.Sum64()
return sum return sum
} }
@ -138,17 +585,21 @@ func (p *point) UnixNano() uint64 {
type Tags map[string]string type Tags map[string]string
func (t Tags) HashKey() []byte { func (t Tags) hashKey() []byte {
// Empty maps marshal to empty bytes. // Empty maps marshal to empty bytes.
if len(t) == 0 { if len(t) == 0 {
return nil return nil
} }
// Extract keys and determine final size. // Extract keys and determine final size.
sz := (len(t) * 2) - 1 // separators sz := len(t) + (len(t) * 2) - 1 // separators
keys := make([]string, 0, len(t)) keys := make([]string, len(t))
i := 0
for k, v := range t { for k, v := range t {
keys = append(keys, k) v = escapeString(v)
t[k] = v
keys[i] = k
i += 1
sz += len(k) + len(v) sz += len(k) + len(v)
} }
sort.Strings(keys) sort.Strings(keys)
@ -156,18 +607,149 @@ func (t Tags) HashKey() []byte {
// Generate marshaled bytes. // Generate marshaled bytes.
b := make([]byte, sz) b := make([]byte, sz)
buf := b buf := b
idx := 0
for _, k := range keys { for _, k := range keys {
copy(buf, k) buf[idx] = ','
buf[len(k)] = '|' idx += 1
buf = buf[len(k)+1:] copy(buf[idx:idx+len(k)], k)
} idx += len(k)
for i, k := range keys { buf[idx] = '='
idx += 1
v := t[k] v := t[k]
copy(buf, v) copy(buf[idx:idx+len(v)], v)
if i < len(keys)-1 { idx += len(v)
buf[len(v)] = '|' }
buf = buf[len(v)+1:] return b[:idx]
}
// Fields maps a field name to its value. Values may be of any type.
type Fields map[string]interface{}
// parseNumber converts the text form of a numeric field value into a Go value.
// A value that contains a '.' is parsed as a float64; every other value is
// parsed as a base-10 int64. The error from strconv is returned unchanged when
// the text is not a valid number of the chosen kind.
func parseNumber(val []byte) (interface{}, error) {
	if bytes.IndexByte(val, '.') >= 0 {
		return strconv.ParseFloat(string(val), 64)
	}
	return strconv.ParseInt(string(val), 10, 64)
}
// newFieldsFromBinary decodes the text encoding of a field set
// (name=value[,name=value...]) into a Fields map.
//
// Values are typed by their first byte: a double quote starts a string, whose
// surrounding quotes are removed and whose content is passed through
// unescapeString; a digit, '-' or '.' starts a number, which is handed to
// parseNumber; anything else is parsed with strconv.ParseBool. An empty value
// is stored as nil and an entry with an empty name is skipped. Text after the
// last complete name=value pair that contains no '=' is ignored.
//
// It panics when a number or bool cannot be parsed.
func newFieldsFromBinary(buf []byte) Fields {
	fields := Fields{}
	var (
		i              int
		name, valueBuf []byte
		value          interface{}
		err            error
	)
	for i < len(buf) {
		i, name = scanTo(buf, i, '=')
		if i >= len(buf) {
			// No '=' left, so there is no value to read.
			break
		}

		// Read the value that follows the '=' and step over the comma that
		// ends it, so that every iteration makes progress even when the
		// name or the value turns out to be empty.
		i, valueBuf = scanFieldValue(buf, i+1)
		i++

		if len(name) == 0 {
			continue
		}
		if len(valueBuf) == 0 {
			fields[string(name)] = nil
			continue
		}

		switch c := valueBuf[0]; {
		case c == '"':
			// String: drop the opening quote and, if present, the closing one.
			inner := valueBuf[1:]
			if n := len(inner); n > 0 && inner[n-1] == '"' {
				inner = inner[:n-1]
			}
			value = unescapeString(string(inner))
		case (c >= '0' && c <= '9') || c == '-' || c == '.':
			value, err = parseNumber(valueBuf)
			if err != nil {
				panic(fmt.Sprintf("unable to parse float value '%v': %v", string(valueBuf), err))
			}
		default:
			value, err = strconv.ParseBool(string(valueBuf))
			if err != nil {
				panic(fmt.Sprintf("unable to parse bool value '%v': %v", string(valueBuf), err))
			}
		}
		fields[string(name)] = value
	}
	return fields
}

// scanFieldValue returns the end position and the bytes of the field value
// that starts at i. The value ends at the first comma that is neither preceded
// by a backslash nor inside double quotes, or at the end of buf.
func scanFieldValue(buf []byte, i int) (int, []byte) {
	start := i
	quoted := false

scan:
	for i < len(buf) {
		switch buf[i] {
		case '\\':
			// Escaped character: skip it together with the backslash.
			i += 2
			continue
		case '"':
			quoted = !quoted
		case ',':
			if !quoted {
				break scan
			}
		}
		i++
	}
	// A lone trailing backslash moves i one past the buffer.
	if i > len(buf) {
		i = len(buf)
	}
	return i, buf[start:i]
}
func (p Fields) MarshalBinary() []byte {
b := []byte{}
keys := make([]string, len(p))
i := 0
for k, _ := range p {
keys[i] = k
i += 1
}
sort.Strings(keys)
for _, k := range keys {
v := p[k]
b = append(b, []byte(k)...)
b = append(b, '=')
switch t := v.(type) {
case int:
b = append(b, []byte(strconv.FormatFloat(float64(t), 'g', -1, 64))...)
case int32:
b = append(b, []byte(strconv.FormatFloat(float64(t), 'g', -1, 64))...)
case int64:
b = append(b, []byte(strconv.FormatFloat(float64(t), 'g', -1, 64))...)
case float64:
// ensure there is a decimal in the encoded for
val := []byte(strconv.FormatFloat(t, 'f', -1, 64))
hasDecimal := t-float64(int64(t)) > 0
b = append(b, val...)
if !hasDecimal {
b = append(b, []byte(".0")...)
}
case bool:
b = append(b, []byte(strconv.FormatBool(t))...)
case []byte:
b = append(b, t...)
case string:
b = append(b, '"')
b = append(b, []byte(escapeString(t))...)
b = append(b, '"')
case nil:
// skip
default:
panic(fmt.Sprintf("unknown type: %T", v))
}
b = append(b, ',')
}
if len(b) > 0 {
return b[0 : len(b)-1]
} }
return b return b
} }
// indexedSlice pairs a byte buffer with a list of offsets into it and provides
// Less, Swap and Len over those offsets, so that the offsets can be ordered
// without moving any bytes of b.
type indexedSlice struct {
	indices []int
	b       []byte
}

// Less reports whether the bytes of b from indices[i] up to the next '='
// order before the bytes from indices[j] up to the next '='.
func (s *indexedSlice) Less(i, j int) bool {
	_, left := scanTo(s.b, s.indices[i], '=')
	_, right := scanTo(s.b, s.indices[j], '=')
	return bytes.Compare(left, right) == -1
}

// Swap exchanges the offsets at positions i and j.
func (s *indexedSlice) Swap(i, j int) {
	tmp := s.indices[i]
	s.indices[i] = s.indices[j]
	s.indices[j] = tmp
}

// Len returns the number of offsets.
func (s *indexedSlice) Len() int {
	return len(s.indices)
}

View File

@ -1,12 +1,17 @@
package tsdb package tsdb
import "testing" import (
"bytes"
"reflect"
"testing"
"time"
)
var tags = Tags{"foo": "bar", "apple": "orange", "host": "serverA", "region": "uswest"} var tags = Tags{"foo": "bar", "apple": "orange", "host": "serverA", "region": "uswest"}
func TestMarshal(t *testing.T) { func TestMarshal(t *testing.T) {
got := tags.HashKey() got := tags.hashKey()
if exp := "apple|foo|host|region|orange|bar|serverA|uswest"; string(got) != exp { if exp := ",apple=orange,foo=bar,host=serverA,region=uswest"; string(got) != exp {
t.Log("got: ", string(got)) t.Log("got: ", string(got))
t.Log("exp: ", exp) t.Log("exp: ", exp)
t.Error("invalid match") t.Error("invalid match")
@ -15,6 +20,305 @@ func TestMarshal(t *testing.T) {
func BenchmarkMarshal(b *testing.B) { func BenchmarkMarshal(b *testing.B) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
tags.HashKey() tags.hashKey()
}
}
func BenchmarkParsePointNoTags(b *testing.B) {
line := `cpu value=1 1000000000`
for i := 0; i < b.N; i++ {
ParsePoint([]byte(line))
b.SetBytes(int64(len(line)))
}
}
func BenchmarkParsePointTagsSorted2(b *testing.B) {
line := `cpu,host=serverA,region=us-west value=1 1000000000`
for i := 0; i < b.N; i++ {
ParsePoint([]byte(line))
b.SetBytes(int64(len(line)))
}
}
func BenchmarkParsePointTagsSorted5(b *testing.B) {
line := `cpu,env=prod,host=serverA,region=us-west,target=servers,zone=1c value=1 1000000000`
for i := 0; i < b.N; i++ {
ParsePoint([]byte(line))
b.SetBytes(int64(len(line)))
}
}
func BenchmarkParsePointTagsSorted10(b *testing.B) {
line := `cpu,env=prod,host=serverA,region=us-west,tag1=value1,tag2=value2,tag3=value3,tag4=value4,tag5=value5,target=servers,zone=1c value=1 1000000000`
for i := 0; i < b.N; i++ {
ParsePoint([]byte(line))
b.SetBytes(int64(len(line)))
}
}
func BenchmarkParsePointTagsUnSorted2(b *testing.B) {
line := `cpu,region=us-west,host=serverA value=1 1000000000`
for i := 0; i < b.N; i++ {
pt, _ := ParsePoint([]byte(line))
b.SetBytes(int64(len(line)))
pt.Key()
}
}
func BenchmarkParsePointTagsUnSorted5(b *testing.B) {
line := `cpu,region=us-west,host=serverA,env=prod,target=servers,zone=1c value=1 1000000000`
for i := 0; i < b.N; i++ {
pt, _ := ParsePoint([]byte(line))
b.SetBytes(int64(len(line)))
pt.Key()
}
}
func BenchmarkParsePointTagsUnSorted10(b *testing.B) {
line := `cpu,region=us-west,host=serverA,env=prod,target=servers,zone=1c,tag1=value1,tag2=value2,tag3=value3,tag4=value4,tag5=value5 value=1 1000000000`
for i := 0; i < b.N; i++ {
pt, _ := ParsePoint([]byte(line))
b.SetBytes(int64(len(line)))
pt.Key()
}
}
func test(t *testing.T, line string, point Point) {
pts, err := ParsePointsString(line)
if err != nil {
t.Fatalf(`ParsePoints("%s") mismatch. got %v, exp nil`, line, err)
}
if exp := 1; len(pts) != exp {
t.Fatalf(`ParsePoints("%s") len mismatch. got %d, exp %d`, line, len(pts), exp)
}
if exp := point.Key(); !bytes.Equal(pts[0].Key(), exp) {
t.Errorf("ParsePoints(\"%s\") key mismatch.\ngot %v\nexp %v", line, string(pts[0].Key()), string(exp))
}
if exp := len(point.Tags()); len(pts[0].Tags()) != exp {
t.Errorf(`ParsePoints("%s") tags mismatch. got %v, exp %v`, line, pts[0].Tags(), exp)
}
for tag, value := range point.Tags() {
if pts[0].Tags()[tag] != value {
t.Errorf(`ParsePoints("%s") tags mismatch. got %v, exp %v`, line, pts[0].Tags()[tag], value)
}
}
for name, value := range point.Fields() {
if !reflect.DeepEqual(pts[0].Fields()[name], value) {
t.Errorf(`ParsePoints("%s") field '%s' mismatch. got %v, exp %v`, line, name, pts[0].Fields()[name], value)
}
}
if !pts[0].Time().Equal(point.Time()) {
t.Errorf(`ParsePoints("%s") time mismatch. got %v, exp %v`, line, pts[0].Time(), point.Time())
}
if line != pts[0].String() {
t.Errorf("ParsePoints string mismatch.\ngot: %v\nexp: %v", pts[0].String(), line)
}
}
func TestParsePointNoValue(t *testing.T) {
_, err := ParsePointsString("")
if err == nil {
t.Errorf(`ParsePoints("%s") mismatch. got nil, exp error`, "")
}
}
func TestParsePointNoFields(t *testing.T) {
_, err := ParsePointsString("cpu")
if err == nil {
t.Errorf(`ParsePoints("%s") mismatch. got nil, exp error`, "cpu")
}
}
func TestParsePointNoTimestamp(t *testing.T) {
test(t, "cpu value=1", NewPoint("cpu", nil, nil, time.Time{}))
}
// TestParsePointMissingQuote checks that a string field value whose closing
// quote is missing is rejected.
func TestParsePointMissingQuote(t *testing.T) {
	line := `cpu,host=serverA value="test`
	_, err := ParsePointsString(line)
	if err == nil {
		// Report the line that was actually parsed.
		t.Errorf(`ParsePoints("%s") mismatch. got nil, exp error`, line)
	}
}
// TestParsePointMissingTagValue checks that a tag that has no "=value" part is
// rejected.
func TestParsePointMissingTagValue(t *testing.T) {
	line := `cpu,host=serverA,region value=1`
	_, err := ParsePointsString(line)
	if err == nil {
		// Report the line that was actually parsed.
		t.Errorf(`ParsePoints("%s") mismatch. got nil, exp error`, line)
	}
}
func TestParsePointUnescape(t *testing.T) {
// commas in tag values
test(t, `cpu,regions=east\,west value=1.0`,
NewPoint("cpu",
Tags{
"regions": "east,west", // comma in the tag value
},
Fields{
"value": 1.0,
},
time.Time{}))
// commas in measuremnt name
test(t, `cpu\,main,regions=east\,west value=1.0`,
NewPoint(
"cpu,main", // comma in the name
Tags{
"regions": "east,west",
},
Fields{
"value": 1.0,
},
time.Time{}))
// random character escaped
test(t, `cpu,regions=eas\t value=1.0`,
NewPoint(
"cpu",
Tags{
"regions": "eas\\t",
},
Fields{
"value": 1.0,
},
time.Time{}))
}
func TestParsePointWithTags(t *testing.T) {
test(t,
"cpu,host=serverA,region=us-east value=1.0 1000000000",
NewPoint("cpu",
Tags{"host": "serverA", "region": "us-east"},
Fields{"value": 1.0}, time.Unix(1, 0)))
}
func TestParsPointWithDuplicateTags(t *testing.T) {
_, err := ParsePoint([]byte(`cpu,host=serverA,host=serverB value=1 1000000000`))
if err == nil {
t.Fatalf(`ParsePoint() expected error. got nil`)
}
}
func TestParsePointWithStringField(t *testing.T) {
test(t, `cpu,host=serverA,region=us-east value=1.0,str="foo",str2="bar" 1000000000`,
NewPoint("cpu",
Tags{
"host": "serverA",
"region": "us-east",
},
Fields{
"value": 1.0,
"str": "foo",
"str2": "bar",
},
time.Unix(1, 0)),
)
test(t, `cpu,host=serverA,region=us-east str="foo \" bar" 1000000000`,
NewPoint("cpu",
Tags{
"host": "serverA",
"region": "us-east",
},
Fields{
"str": `foo " bar`,
},
time.Unix(1, 0)),
)
}
func TestParsePointWithStringWithSpaces(t *testing.T) {
test(t, `cpu,host=serverA,region=us-east value=1.0,str="foo bar" 1000000000`,
NewPoint(
"cpu",
Tags{
"host": "serverA",
"region": "us-east",
},
Fields{
"value": 1.0,
"str": "foo bar", // spaces in string value
},
time.Unix(1, 0)),
)
}
func TestParsePointWithBoolField(t *testing.T) {
test(t, `cpu,host=serverA,region=us-east bool=true,boolTrue=t,false=false,falseVal=f 1000000000`,
NewPoint(
"cpu",
Tags{
"host": "serverA",
"region": "us-east",
},
Fields{
"bool": true,
"boolTrue": true,
"false": false,
"falseVal": false,
},
time.Unix(1, 0)),
)
}
// TestParsePointIntsFloats checks the Go types that numeric field values are
// decoded to: a value without a decimal point must come back as int64 and a
// value with one as float64.
func TestParsePointIntsFloats(t *testing.T) {
	pt, err := ParsePoint([]byte(`cpu,host=serverA,region=us-east int=10,float=11.0,float2=12.1 1000000000`))
	if err != nil {
		t.Fatalf(`ParsePoints() failed. got %s`, err)
	}

	if _, ok := pt.Fields()["int"].(int64); !ok {
		t.Errorf("ParsePoint() int field mismatch: got %T, exp %T", pt.Fields()["int"], int64(10))
	}

	// The failure messages report the field that was checked.
	if _, ok := pt.Fields()["float"].(float64); !ok {
		t.Errorf("ParsePoint() float field mismatch: got %T, exp %T", pt.Fields()["float"], float64(11.0))
	}

	if _, ok := pt.Fields()["float2"].(float64); !ok {
		t.Errorf("ParsePoint() float field mismatch: got %T, exp %T", pt.Fields()["float2"], float64(12.1))
	}
}
func TestParsePointKeyUnsorted(t *testing.T) {
pt, err := ParsePoint([]byte("cpu,last=1,first=2 value=1"))
if err != nil {
t.Fatalf(`ParsePoints() failed. got %s`, err)
}
if exp := "cpu,first=2,last=1"; string(pt.Key()) != exp {
t.Errorf("ParsePoint key not sorted. got %v, exp %v", pt.Key(), exp)
}
}
func TestParsePointToString(t *testing.T) {
line := `cpu,host=serverA,region=us-east bool=false,float=11.0,float2=12.123,int=10,str="string val" 1000000000`
pt, err := ParsePoint([]byte(line))
if err != nil {
t.Fatalf(`ParsePoints() failed. got %s`, err)
}
got := pt.String()
if line != got {
t.Errorf("ParsePoint() to string mismatch:\n got %v\n exp %v", got, line)
}
pt = NewPoint("cpu", Tags{"host": "serverA", "region": "us-east"},
Fields{"int": 10, "float": float64(11.0), "float2": float64(12.123), "bool": false, "str": "string val"},
time.Unix(1, 0))
got = pt.String()
if line != got {
t.Errorf("NewPoint() to string mismatch:\n got %v\n exp %v", got, line)
} }
} }

View File

@ -146,7 +146,7 @@ func (s *Shard) WritePoints(points []Point) error {
// save the raw point data // save the raw point data
b := tx.Bucket([]byte("values")) b := tx.Bucket([]byte("values"))
for _, p := range points { for _, p := range points {
bp, err := b.CreateBucketIfNotExists([]byte(p.Key())) bp, err := b.CreateBucketIfNotExists(p.Key())
if err != nil { if err != nil {
return err return err
} }
@ -217,8 +217,8 @@ func (s *Shard) validateSeriesAndFields(points []Point) ([]*seriesCreate, []*fie
for _, p := range points { for _, p := range points {
// see if the series should be added to the index // see if the series should be added to the index
if ss := s.index.series[p.Key()]; ss == nil { if ss := s.index.series[string(p.Key())]; ss == nil {
series := &Series{Key: p.Key(), Tags: p.Tags()} series := &Series{Key: string(p.Key()), Tags: p.Tags()}
seriesToCreate = append(seriesToCreate, &seriesCreate{p.Name(), series}) seriesToCreate = append(seriesToCreate, &seriesCreate{p.Name(), series})
} }

View File

@ -44,9 +44,9 @@ func TestShardWriteAndIndex(t *testing.T) {
if len(index.series) != 1 { if len(index.series) != 1 {
t.Fatalf("series wasn't in index") t.Fatalf("series wasn't in index")
} }
seriesTags := index.series[pt.Key()].Tags seriesTags := index.series[string(pt.Key())].Tags
if len(seriesTags) != len(pt.Tags()) || pt.Tags()["host"] != seriesTags["host"] { if len(seriesTags) != len(pt.Tags()) || pt.Tags()["host"] != seriesTags["host"] {
t.Fatalf("tags weren't properly saved to series index: %v, %v", pt.Tags(), index.series[pt.Key()].Tags) t.Fatalf("tags weren't properly saved to series index: %v, %v", pt.Tags(), index.series[string(pt.Key())].Tags)
} }
if !reflect.DeepEqual(index.measurements["cpu"].tagKeys(), []string{"host"}) { if !reflect.DeepEqual(index.measurements["cpu"].tagKeys(), []string{"host"}) {
t.Fatalf("tag key wasn't saved to measurement index") t.Fatalf("tag key wasn't saved to measurement index")