Move graphite parser to separate file

pull/3125/head
Jason Wilder 2015-06-22 14:53:51 -06:00
parent 2a383e6858
commit d539b23817
4 changed files with 346 additions and 297 deletions

View File

@ -0,0 +1,98 @@
package graphite
import (
"fmt"
"math"
"strconv"
"strings"
"time"
"github.com/influxdb/influxdb/tsdb"
)
// Parser encapsulates a Graphite Parser: it holds the settings used to split
// a Graphite metric name into a measurement name and a set of tags.
type Parser struct {
// Separator is the string on which metric names are split into parts.
Separator string
// FieldNames are the schema entries, matched to metric-name parts by
// position. The entry "measurement" marks the part used as the measurement
// name; every other entry is used as a tag key.
FieldNames []string
// IgnoreUnnamed controls metric names that have more parts than FieldNames:
// when true the extra parts are dropped, when false they are an error.
IgnoreUnnamed bool
}
// NewParser builds a Parser for the given schema. The schema is split on
// separator to obtain the ordered field names, separator is kept for
// splitting metric names later, and ignore is stored as IgnoreUnnamed.
func NewParser(schema string, separator string, ignore bool) *Parser {
	names := strings.Split(schema, separator)

	parser := new(Parser)
	parser.Separator = separator
	parser.FieldNames = names
	parser.IgnoreUnnamed = ignore
	return parser
}
// Parse performs Graphite parsing of a single line.
//
// A line must consist of exactly three whitespace-separated fields: the
// metric name, a floating-point value and a Unix timestamp in seconds, which
// may carry a fractional part. The name is decoded with DecodeNameAndTags and
// the value is stored in the point under the field key "value".
//
// An error is returned when the line does not have three fields, when the
// name cannot be decoded, when the value or timestamp is not a valid float,
// or when the timestamp is NaN or does not fit in an int64 number of seconds.
func (p *Parser) Parse(line string) (tsdb.Point, error) {
	// Break into 3 fields (name, value, timestamp).
	fields := strings.Fields(line)
	if len(fields) != 3 {
		return nil, fmt.Errorf("received %q which doesn't have three fields", line)
	}

	// Decode the name and tags.
	name, tags, err := p.DecodeNameAndTags(fields[0])
	if err != nil {
		return nil, err
	}

	// Parse value.
	v, err := strconv.ParseFloat(fields[1], 64)
	if err != nil {
		return nil, fmt.Errorf("field \"%s\" value: %s", fields[0], err)
	}
	fieldValues := map[string]interface{}{"value": v}

	// Parse timestamp.
	unixTime, err := strconv.ParseFloat(fields[2], 64)
	if err != nil {
		return nil, fmt.Errorf("field \"%s\" time: %s", fields[0], err)
	}

	// ParseFloat accepts "NaN" and "Inf" without error. Converting such a
	// value (or any value outside the int64 range) to int64 gives an
	// implementation-dependent result, so reject it instead of storing a
	// garbage timestamp.
	if math.IsNaN(unixTime) || unixTime < -(1<<63) || unixTime >= 1<<63 {
		return nil, fmt.Errorf("field \"%s\" time: %q is out of range", fields[0], fields[2])
	}

	// Split into whole and fractional seconds. Both parts are taken relative
	// to the floor so that negative fractional timestamps are not shifted by
	// one second (truncating -1.5 toward zero would yield -0.5s).
	sec := math.Floor(unixTime)
	nsec := (unixTime - sec) * float64(time.Second)
	timestamp := time.Unix(int64(sec), int64(nsec))

	point := tsdb.NewPoint(name, tags, fieldValues, timestamp)
	return point, nil
}
// DecodeNameAndTags splits the name field of a Graphite datum on the parser's
// separator and pairs each part, by position, with the parser's field names.
// The part paired with the field name "measurement" becomes the measurement;
// every other pair becomes a tag.
//
// When the name has more parts than there are field names, the extra parts
// are dropped if IgnoreUnnamed is set and are an error otherwise. An error is
// also returned when no measurement was found.
func (p *Parser) DecodeNameAndTags(nameField string) (string, map[string]string, error) {
	tags := make(map[string]string)
	parts := strings.Split(nameField, p.Separator)

	// Only positions that have both a part and a field name are decoded.
	limit := len(parts)
	if len(parts) > len(p.FieldNames) {
		if !p.IgnoreUnnamed {
			return "", tags, fmt.Errorf("received %q which contains unnamed field", nameField)
		}
		limit = len(p.FieldNames)
	}

	var measurement string
	for idx, key := range p.FieldNames[:limit] {
		switch key {
		case "measurement":
			measurement = parts[idx]
		default:
			tags[key] = parts[idx]
		}
	}

	if measurement == "" {
		return "", tags, fmt.Errorf("no measurement specified for metric. %q", nameField)
	}
	return measurement, tags, nil
}

View File

@ -0,0 +1,248 @@
package graphite_test
import (
"strconv"
"testing"
"time"
"github.com/influxdb/influxdb/services/graphite"
)
// TestDecodeNameAndTags runs Parser.DecodeNameAndTags over a table of metric
// names and schemas and checks the returned measurement, tags and error text.
func TestDecodeNameAndTags(t *testing.T) {
	type testCase struct {
		desc        string
		input       string
		measurement string
		tags        map[string]string
		schema      string
		separator   string
		ignore      bool
		err         string
	}

	cases := []testCase{
		{
			desc:        "metric only",
			input:       "cpu",
			measurement: "cpu",
			schema:      "measurement",
			ignore:      true,
		},
		{
			desc:        "metric with single series",
			input:       "cpu.server01",
			measurement: "cpu",
			ignore:      true,
			schema:      "measurement.hostname",
			tags:        map[string]string{"hostname": "server01"},
		},
		{
			desc:        "metric with multiple series",
			input:       "cpu.us-west.server01",
			measurement: "cpu",
			ignore:      true,
			schema:      "measurement.region.hostname",
			tags:        map[string]string{"hostname": "server01", "region": "us-west"},
		},
		{
			desc:   "no metric",
			tags:   map[string]string{},
			ignore: true,
			err:    `no measurement specified for metric. ""`,
		},
		{
			desc:        "ignore unnamed",
			input:       "foo.cpu",
			ignore:      true,
			schema:      "measurement",
			tags:        map[string]string{},
			measurement: "foo",
		},
		{
			desc:   "not ignore unnamed",
			input:  "foo.cpu",
			ignore: false,
			schema: "measurement",
			tags:   map[string]string{},
			err:    `received "foo.cpu" which contains unnamed field`,
		},
		{
			desc:        "name shorter than schema",
			input:       "foo",
			schema:      "measurement.A.B.C",
			ignore:      true,
			tags:        map[string]string{},
			measurement: "foo",
		},
	}

	for _, tc := range cases {
		t.Logf("testing %q...", tc.desc)

		// Fall back to "." when the case does not set a separator.
		sep := tc.separator
		if sep == "" {
			sep = "."
		}

		parser := graphite.NewParser(tc.schema, sep, tc.ignore)
		gotName, gotTags, err := parser.DecodeNameAndTags(tc.input)
		if got := errstr(err); got != tc.err {
			t.Fatalf("err does not match. expected %v, got %v", tc.err, err)
		}
		if err != nil {
			// The error was the expected one; there is no result to inspect.
			continue
		}

		if gotName != tc.measurement {
			t.Fatalf("name parse failer. expected %v, got %v", tc.measurement, gotName)
		}
		if want, got := len(tc.tags), len(gotTags); got != want {
			t.Fatalf("unexpected number of tags. expected %d, got %d", want, got)
		}
		for key, want := range tc.tags {
			if got := gotTags[key]; got != want {
				t.Fatalf("unexpected tag value for tags[%s]. expected %q, got %q", key, want, got)
			}
		}
	}
}
// TestParse runs Parser.Parse over a table of Graphite lines and checks the
// resulting point's name, tag count, value and time, or the error text for
// lines that are expected to be rejected.
func TestParse(t *testing.T) {
	// Timestamps in the test lines are whole seconds, so round the reference
	// time to match.
	testTime := time.Now().Round(time.Second)
	epochTime := testTime.Unix()
	strTime := strconv.FormatInt(epochTime, 10)

	var tests = []struct {
		test      string
		line      string
		name      string
		tags      map[string]string
		value     float64
		time      time.Time
		separator string
		schema    string
		ignore    bool
		err       string
	}{
		{
			test:   "normal case",
			line:   `cpu.foo.bar 50 ` + strTime,
			schema: "measurement.foo.bar",
			name:   "cpu",
			tags:   map[string]string{"foo": "foo", "bar": "bar"},
			value:  50,
			time:   testTime,
		},
		{
			test:   "DecodeNameAndTags returns error",
			line:   `cpu.foo.bar 50 ` + strTime,
			schema: "a.b.c",
			err:    `no measurement specified for metric. "cpu.foo.bar"`,
		},
		{
			test:   "separator is . by default",
			line:   `cpu.foo.bar 50 ` + strTime,
			name:   "cpu",
			schema: "measurement.foo.bar",
			tags:   map[string]string{"foo": "foo", "bar": "bar"},
			value:  50,
			time:   testTime,
		},
		{
			test:      "separator is . if specified",
			separator: ".",
			line:      `cpu.foo.bar 50 ` + strTime,
			name:      "cpu",
			schema:    "measurement.foo.bar",
			tags:      map[string]string{"foo": "foo", "bar": "bar"},
			value:     50,
			time:      testTime,
		},
		{
			test:      "separator is - if specified",
			separator: "-",
			line:      `cpu-foo-bar 50 ` + strTime,
			name:      "cpu",
			schema:    "measurement-foo-bar",
			tags:      map[string]string{"foo": "foo", "bar": "bar"},
			value:     50,
			time:      testTime,
		},
		{
			test:      "separator is boo if specified",
			separator: "boo",
			line:      `cpuboofooboobar 50 ` + strTime,
			name:      "cpu",
			schema:    "measurementboofooboobar",
			tags:      map[string]string{"foo": "foo", "bar": "bar"},
			value:     50,
			time:      testTime,
		},
		{
			test:   "metric only with float value",
			line:   `cpu 50.554 ` + strTime,
			name:   "cpu",
			schema: "measurement",
			value:  50.554,
			time:   testTime,
		},
		{
			test: "missing metric",
			line: `50.554 1419972457825`,
			err:  `received "50.554 1419972457825" which doesn't have three fields`,
		},
		{
			test:   "should error parsing invalid float",
			line:   `cpu 50.554z 1419972457825`,
			schema: "measurement",
			err:    `field "cpu" value: strconv.ParseFloat: parsing "50.554z": invalid syntax`,
		},
		{
			test:   "should error parsing invalid int",
			line:   `cpu 50z 1419972457825`,
			schema: "measurement",
			err:    `field "cpu" value: strconv.ParseFloat: parsing "50z": invalid syntax`,
		},
		{
			test:   "should error parsing invalid time",
			line:   `cpu 50.554 14199724z57825`,
			schema: "measurement",
			err:    `field "cpu" time: strconv.ParseFloat: parsing "14199724z57825": invalid syntax`,
		},
	}

	for _, test := range tests {
		t.Logf("testing %q...", test.test)
		if test.separator == "" {
			test.separator = "."
		}
		p := graphite.NewParser(test.schema, test.separator, test.ignore)
		point, err := p.Parse(test.line)
		if errstr(err) != test.err {
			t.Fatalf("err does not match. expected %v, got %v", test.err, err)
		}
		if err != nil {
			// If we erred out, it was intended and the following checks won't work.
			continue
		}
		if point.Name() != test.name {
			t.Fatalf("name parse failer. expected %v, got %v", test.name, point.Name())
		}
		if len(point.Tags()) != len(test.tags) {
			t.Fatalf("tags len mismatch. expected %d, got %d", len(test.tags), len(point.Tags()))
		}

		// Compare the parsed value against the expected one. Comparing the
		// field with its own type-asserted copy would always succeed.
		f, ok := point.Fields()["value"].(float64)
		if !ok {
			t.Fatalf("floatValue is not a float64. got %v", point.Fields()["value"])
		}
		if f != test.value {
			t.Fatalf("floatValue value mismatch. expected %v, got %v", test.value, f)
		}

		// Times are compared at millisecond resolution.
		if point.Time().UnixNano()/1000000 != test.time.UnixNano()/1000000 {
			t.Fatalf("time value mismatch. expected %v, got %v", test.time.UnixNano(), point.Time().UnixNano())
		}
	}
}

View File

@ -4,10 +4,8 @@ import (
"bufio"
"fmt"
"log"
"math"
"net"
"os"
"strconv"
"strings"
"sync"
"time"
@ -249,90 +247,3 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) {
}
}
}
// Parser encapsulates a Graphite Parser: it holds the settings used to split
// a Graphite metric name into a measurement name and a set of tags.
type Parser struct {
// Separator is the string on which metric names are split into parts.
Separator string
// FieldNames are the schema entries, matched to metric-name parts by
// position. The entry "measurement" marks the part used as the measurement
// name; every other entry is used as a tag key.
FieldNames []string
// IgnoreUnnamed controls metric names that have more parts than FieldNames:
// when true the extra parts are dropped, when false they are an error.
IgnoreUnnamed bool
}
// NewParser builds a Parser for the given schema. The schema is split on
// separator to obtain the ordered field names, separator is kept for
// splitting metric names later, and ignore is stored as IgnoreUnnamed.
func NewParser(schema string, separator string, ignore bool) *Parser {
	names := strings.Split(schema, separator)

	parser := new(Parser)
	parser.Separator = separator
	parser.FieldNames = names
	parser.IgnoreUnnamed = ignore
	return parser
}
// Parse performs Graphite parsing of a single line.
//
// A line must consist of exactly three whitespace-separated fields: the
// metric name, a floating-point value and a Unix timestamp in seconds, which
// may carry a fractional part. The name is decoded with DecodeNameAndTags and
// the value is stored in the point under the field key "value".
//
// An error is returned when the line does not have three fields, when the
// name cannot be decoded, when the value or timestamp is not a valid float,
// or when the timestamp is NaN or does not fit in an int64 number of seconds.
func (p *Parser) Parse(line string) (tsdb.Point, error) {
	// Break into 3 fields (name, value, timestamp).
	fields := strings.Fields(line)
	if len(fields) != 3 {
		return nil, fmt.Errorf("received %q which doesn't have three fields", line)
	}

	// Decode the name and tags.
	name, tags, err := p.DecodeNameAndTags(fields[0])
	if err != nil {
		return nil, err
	}

	// Parse value.
	v, err := strconv.ParseFloat(fields[1], 64)
	if err != nil {
		return nil, fmt.Errorf("field \"%s\" value: %s", fields[0], err)
	}
	fieldValues := map[string]interface{}{"value": v}

	// Parse timestamp.
	unixTime, err := strconv.ParseFloat(fields[2], 64)
	if err != nil {
		return nil, fmt.Errorf("field \"%s\" time: %s", fields[0], err)
	}

	// ParseFloat accepts "NaN" and "Inf" without error. Converting such a
	// value (or any value outside the int64 range) to int64 gives an
	// implementation-dependent result, so reject it instead of storing a
	// garbage timestamp.
	if math.IsNaN(unixTime) || unixTime < -(1<<63) || unixTime >= 1<<63 {
		return nil, fmt.Errorf("field \"%s\" time: %q is out of range", fields[0], fields[2])
	}

	// Split into whole and fractional seconds. Both parts are taken relative
	// to the floor so that negative fractional timestamps are not shifted by
	// one second (truncating -1.5 toward zero would yield -0.5s).
	sec := math.Floor(unixTime)
	nsec := (unixTime - sec) * float64(time.Second)
	timestamp := time.Unix(int64(sec), int64(nsec))

	point := tsdb.NewPoint(name, tags, fieldValues, timestamp)
	return point, nil
}
// DecodeNameAndTags splits the name field of a Graphite datum on the parser's
// separator and pairs each part, by position, with the parser's field names.
// The part paired with the field name "measurement" becomes the measurement;
// every other pair becomes a tag.
//
// When the name has more parts than there are field names, the extra parts
// are dropped if IgnoreUnnamed is set and are an error otherwise. An error is
// also returned when no measurement was found.
func (p *Parser) DecodeNameAndTags(nameField string) (string, map[string]string, error) {
	tags := make(map[string]string)
	parts := strings.Split(nameField, p.Separator)

	// Only positions that have both a part and a field name are decoded.
	limit := len(parts)
	if len(parts) > len(p.FieldNames) {
		if !p.IgnoreUnnamed {
			return "", tags, fmt.Errorf("received %q which contains unnamed field", nameField)
		}
		limit = len(p.FieldNames)
	}

	var measurement string
	for idx, key := range p.FieldNames[:limit] {
		switch key {
		case "measurement":
			measurement = parts[idx]
		default:
			tags[key] = parts[idx]
		}
	}

	if measurement == "" {
		return "", tags, fmt.Errorf("no measurement specified for metric. %q", nameField)
	}
	return measurement, tags, nil
}

View File

@ -4,7 +4,6 @@ import (
"fmt"
"net"
"reflect"
"strconv"
"sync"
"testing"
"time"
@ -17,213 +16,6 @@ import (
"github.com/influxdb/influxdb/tsdb"
)
// Test_DecodeNameAndTags runs Parser.DecodeNameAndTags over a table of metric
// names and schemas and checks the returned name, tags and error text.
func Test_DecodeNameAndTags(t *testing.T) {
	type testCase struct {
		desc      string
		input     string
		name      string
		tags      map[string]string
		schema    string
		separator string
		ignore    bool
		err       string
	}

	cases := []testCase{
		{
			desc:   "metric only",
			input:  "cpu",
			name:   "cpu",
			schema: "measurement",
			ignore: true,
		},
		{
			desc:   "metric with single series",
			input:  "cpu.server01",
			name:   "cpu",
			ignore: true,
			schema: "measurement.hostname",
			tags:   map[string]string{"hostname": "server01"},
		},
		{
			desc:   "metric with multiple series",
			input:  "cpu.us-west.server01",
			name:   "cpu",
			ignore: true,
			schema: "measurement.region.hostname",
			tags:   map[string]string{"hostname": "server01", "region": "us-west"},
		},
		{
			desc:   "no metric",
			tags:   map[string]string{},
			ignore: true,
			err:    `no measurement specified for metric. ""`,
		},
		{
			desc:   "ignore unnamed",
			input:  "foo.cpu",
			ignore: true,
			schema: "measurement",
			tags:   map[string]string{},
			name:   "foo",
		},
		{
			desc:   "not ignore unnamed",
			input:  "foo.cpu",
			ignore: false,
			schema: "measurement",
			tags:   map[string]string{},
			err:    `received "foo.cpu" which contains unnamed field`,
		},
		{
			desc:   "name shorter than schema",
			input:  "foo",
			schema: "measurement.A.B.C",
			ignore: true,
			tags:   map[string]string{},
			name:   "foo",
		},
	}

	for _, tc := range cases {
		t.Logf("testing %q...", tc.desc)

		// Fall back to "." when the case does not set a separator.
		sep := tc.separator
		if sep == "" {
			sep = "."
		}

		parser := graphite.NewParser(tc.schema, sep, tc.ignore)
		gotName, gotTags, err := parser.DecodeNameAndTags(tc.input)
		if got := errstr(err); got != tc.err {
			t.Fatalf("err does not match. expected %v, got %v", tc.err, err)
		}
		if err != nil {
			// The error was the expected one; there is no result to inspect.
			continue
		}

		if gotName != tc.name {
			t.Fatalf("name parse failer. expected %v, got %v", tc.name, gotName)
		}
		if want, got := len(tc.tags), len(gotTags); got != want {
			t.Fatalf("unexpected number of tags. expected %d, got %d", want, got)
		}
		for key, want := range tc.tags {
			if got := gotTags[key]; got != want {
				t.Fatalf("unexpected tag value for tags[%s]. expected %q, got %q", key, want, got)
			}
		}
	}
}
// Test_DecodeMetric runs Parser.Parse over a table of Graphite lines and
// checks the resulting point's name, tag count, value and time, or the error
// text for lines that are expected to be rejected.
func Test_DecodeMetric(t *testing.T) {
	// Timestamps in the test lines are whole seconds, so round the reference
	// time to match.
	testTime := time.Now().Round(time.Second)
	epochTime := testTime.Unix()
	strTime := strconv.FormatInt(epochTime, 10)

	var tests = []struct {
		test      string
		line      string
		name      string
		tags      map[string]string
		value     float64
		time      time.Time
		separator string
		schema    string
		ignore    bool
		err       string
	}{
		{
			test:   "normal case",
			line:   `cpu.foo.bar 50 ` + strTime,
			schema: "measurement.foo.bar",
			name:   "cpu",
			tags:   map[string]string{"foo": "foo", "bar": "bar"},
			value:  50,
			time:   testTime,
		},
		{
			test:   "DecodeNameAndTags returns error",
			line:   `cpu.foo.bar 50 ` + strTime,
			schema: "a.b.c",
			err:    `no measurement specified for metric. "cpu.foo.bar"`,
		},
		{
			test:   "separator is . by default",
			line:   `cpu.foo.bar 50 ` + strTime,
			name:   "cpu",
			schema: "measurement.foo.bar",
			tags:   map[string]string{"foo": "foo", "bar": "bar"},
			value:  50,
			time:   testTime,
		},
		{
			test:      "separator is . if specified",
			separator: ".",
			line:      `cpu.foo.bar 50 ` + strTime,
			name:      "cpu",
			schema:    "measurement.foo.bar",
			tags:      map[string]string{"foo": "foo", "bar": "bar"},
			value:     50,
			time:      testTime,
		},
		{
			test:      "separator is - if specified",
			separator: "-",
			line:      `cpu-foo-bar 50 ` + strTime,
			name:      "cpu",
			schema:    "measurement-foo-bar",
			tags:      map[string]string{"foo": "foo", "bar": "bar"},
			value:     50,
			time:      testTime,
		},
		{
			test:      "separator is boo if specified",
			separator: "boo",
			line:      `cpuboofooboobar 50 ` + strTime,
			name:      "cpu",
			schema:    "measurementboofooboobar",
			tags:      map[string]string{"foo": "foo", "bar": "bar"},
			value:     50,
			time:      testTime,
		},
		{
			test:   "metric only with float value",
			line:   `cpu 50.554 ` + strTime,
			name:   "cpu",
			schema: "measurement",
			value:  50.554,
			time:   testTime,
		},
		{
			test: "missing metric",
			line: `50.554 1419972457825`,
			err:  `received "50.554 1419972457825" which doesn't have three fields`,
		},
		{
			test:   "should error parsing invalid float",
			line:   `cpu 50.554z 1419972457825`,
			schema: "measurement",
			err:    `field "cpu" value: strconv.ParseFloat: parsing "50.554z": invalid syntax`,
		},
		{
			test:   "should error parsing invalid int",
			line:   `cpu 50z 1419972457825`,
			schema: "measurement",
			err:    `field "cpu" value: strconv.ParseFloat: parsing "50z": invalid syntax`,
		},
		{
			test:   "should error parsing invalid time",
			line:   `cpu 50.554 14199724z57825`,
			schema: "measurement",
			err:    `field "cpu" time: strconv.ParseFloat: parsing "14199724z57825": invalid syntax`,
		},
	}

	for _, test := range tests {
		t.Logf("testing %q...", test.test)
		if test.separator == "" {
			test.separator = "."
		}
		p := graphite.NewParser(test.schema, test.separator, test.ignore)
		point, err := p.Parse(test.line)
		if errstr(err) != test.err {
			t.Fatalf("err does not match. expected %v, got %v", test.err, err)
		}
		if err != nil {
			// If we erred out, it was intended and the following checks won't work.
			continue
		}
		if point.Name() != test.name {
			t.Fatalf("name parse failer. expected %v, got %v", test.name, point.Name())
		}
		if len(point.Tags()) != len(test.tags) {
			t.Fatalf("tags len mismatch. expected %d, got %d", len(test.tags), len(point.Tags()))
		}

		// Compare the parsed value against the expected one. Comparing the
		// field with its own type-asserted copy would always succeed.
		f, ok := point.Fields()["value"].(float64)
		if !ok {
			t.Fatalf("floatValue is not a float64. got %v", point.Fields()["value"])
		}
		if f != test.value {
			t.Fatalf("floatValue value mismatch. expected %v, got %v", test.value, f)
		}

		// Times are compared at millisecond resolution.
		if point.Time().UnixNano()/1000000 != test.time.UnixNano()/1000000 {
			t.Fatalf("time value mismatch. expected %v, got %v", test.time.UnixNano(), point.Time().UnixNano())
		}
	}
}
func Test_ServerGraphiteTCP(t *testing.T) {
t.Parallel()