Implement a distinct Graphite Parser

This encapsulates a few previously free-floating methods. This means
that params such as "separator" and "last-enabled" are not passed from
function to function, and also makes initialization of TCP and UDP
Servers somewhat clearer.

It also removes early use of "GraphiteProcessor".
pull/1294/head
Philip O'Toole 2015-01-06 23:18:14 -08:00
parent 1edbd816fc
commit f3650b4797
5 changed files with 113 additions and 144 deletions

View File

@ -97,22 +97,25 @@ func execRun(args []string) {
continue continue
} }
var g graphite.GraphiteServer // Configure Graphite parsing.
if strings.ToLower(g.Protocol) == "tcp" { parser := graphite.NewParser()
g = graphite.NewTcpGraphiteServer(s) parser.Separator = c.NameSeparator
} else { parser.LastEnabled = (c.NamePosition == "last")
g = graphite.NewUdpGraphiteServer(s)
}
// Set options if strings.ToLower(c.Protocol) == "tcp" {
g.Database = g.Database g := graphite.NewTCPServer(parser, s)
g.NamePosition = g.NamePosition g.Database = c.Database
g.NameSeparator = g.NameSeparator err := g.ListenAndServe(c.ConnectionString(config.BindAddress))
// Start the Graphite Server.
err := g.Start(c.ConnectionString(config.BindAddress))
if err != nil { if err != nil {
log.Println("failed to start Graphite Server", err.Error()) log.Println("failed to start TCP Graphite Server", err.Error())
}
} else {
g := graphite.NewUDPServer(parser, s)
g.Database = c.Database
err := g.ListenAndServe(c.ConnectionString(config.BindAddress))
if err != nil {
log.Println("failed to start UDP Graphite Server", err.Error())
}
} }
} }
} }

View File

@ -1,15 +1,11 @@
package graphite package graphite
import ( import (
"bufio"
"errors" "errors"
"fmt" "fmt"
"io"
"strconv" "strconv"
"strings" "strings"
"time" "time"
"github.com/influxdb/influxdb"
) )
var ( var (
@ -27,92 +23,32 @@ var (
ErrServerNotSpecified = errors.New("server not present") ErrServerNotSpecified = errors.New("server not present")
) )
type ProcessorSink interface { type SeriesWriter interface {
WriteSeries(database, retentionPolicy, name string, tags map[string]string, timestamp time.Time, values map[string]interface{}) error WriteSeries(database, retentionPolicy, name string, tags map[string]string, timestamp time.Time, values map[string]interface{}) error
DefaultRetentionPolicy(database string) (*influxdb.RetentionPolicy, error)
} }
// Processor performs processing of Graphite data, and writes it to a sink. type Parser struct {
type Processor struct { Separator string
// Sink is the destination for processed Graphite data. LastEnabled bool
sink dataSink
// Database is the name of the database to insert data into.
Database string
// NamePosition is the position of name to be parsed from metric_path
NamePosition string
// NameSeparator is separator to parse metric_path with to get name and series
NameSeparator string
} }
func NewProcessor(sink *ProcessorSink) *Processor { func NewParser() *Parser {
p = Processor{} p := Parser{}
p.Separator = "."
return &p return &p
} }
// handleMessage decodes a graphite message from the reader and sends it to the
// committer goroutine.
func (p *processor) handleMessage(r *bufio.Reader) error {
// Decode graphic metric.
m, err := DecodeMetric(r, s.NamePosition, s.NameSeparator)
if err != nil {
return err
}
// Convert metric to a field value.
var values = make(map[string]interface{})
values[m.Name] = m.Value
retentionPolicy, err := s.sink.DefaultRetentionPolicy(s.Database)
if err != nil {
return fmt.Errorf("error looking up default database retention policy: %s", err)
}
if err := s.sink.WriteSeries(
s.Database,
retentionPolicy.Name,
m.Name,
m.Tags,
m.Timestamp,
values,
); err != nil {
return fmt.Errorf("write series data: %s", err)
}
return nil
}
type Metric struct {
Name string
Tags map[string]string
Value interface{}
Timestamp time.Time
}
// returns err == io.EOF when we hit EOF without any further data // returns err == io.EOF when we hit EOF without any further data
func DecodeMetric(r *bufio.Reader, position, separator string) (*Metric, error) { func (p *Parser) parse(line string) (*metric, error) {
// Read up to the next newline.
buf, err := r.ReadBytes('\n')
if err != nil && err != io.EOF {
// it's possible to get EOF but also data
return nil, fmt.Errorf("connection closed uncleanly/broken: %s\n", err.Error())
}
// Trim the buffer, even though there should be no padding
str := strings.TrimSpace(string(buf))
// Remove line return
str = strings.TrimSuffix(str, `\n`)
if str == "" {
return nil, err
}
// Break into 3 fields (name, value, timestamp). // Break into 3 fields (name, value, timestamp).
fields := strings.Fields(str) fields := strings.Fields(line)
if len(fields) != 3 { if len(fields) != 3 {
return nil, fmt.Errorf("received %q which doesn't have three fields", str) return nil, fmt.Errorf("received %q which doesn't have three fields", line)
} }
m := new(Metric) m := new(metric)
// decode the name and tags // decode the name and tags
name, tags, err := DecodeNameAndTags(fields[0], position, separator) name, tags, err := p.decodeNameAndTags(fields[0])
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -143,36 +79,28 @@ func DecodeMetric(r *bufio.Reader, position, separator string) (*Metric, error)
return m, nil return m, nil
} }
func DecodeNameAndTags(field, position, separator string) (string, map[string]string, error) { func (p *Parser) decodeNameAndTags(field string) (string, map[string]string, error) {
var ( var (
name string name string
tags = make(map[string]string) tags = make(map[string]string)
) )
if separator == "" {
separator = "."
}
// decode the name and tags // decode the name and tags
values := strings.Split(field, separator) values := strings.Split(field, p.Separator)
if len(values)%2 != 1 { if len(values)%2 != 1 {
// There should always be an odd number of fields to map a metric name and tags // There should always be an odd number of fields to map a metric name and tags
// ex: region.us-west.hostname.server01.cpu -> tags -> region: us-west, hostname: server01, metric name -> cpu // ex: region.us-west.hostname.server01.cpu -> tags -> region: us-west, hostname: server01, metric name -> cpu
return name, tags, fmt.Errorf("received %q which doesn't conform to format of key.value.key.value.metric or metric", field) return name, tags, fmt.Errorf("received %q which doesn't conform to format of key.value.key.value.metric or metric", field)
} }
np := strings.ToLower(strings.TrimSpace(position)) if p.LastEnabled {
switch np {
case "last":
name = values[len(values)-1] name = values[len(values)-1]
values = values[0 : len(values)-1] values = values[0 : len(values)-1]
case "first": } else {
name = values[0]
values = values[1:len(values)]
default:
name = values[0] name = values[0]
values = values[1:len(values)] values = values[1:len(values)]
} }
if name == "" { if name == "" {
return name, tags, fmt.Errorf("no name specified for metric. %q", field) return name, tags, fmt.Errorf("no name specified for metric. %q", field)
} }
@ -186,3 +114,10 @@ func DecodeNameAndTags(field, position, separator string) (string, map[string]st
return name, tags, nil return name, tags, nil
} }
type metric struct {
Name string
Tags map[string]string
Value interface{}
Timestamp time.Time
}

View File

@ -2,32 +2,35 @@ package graphite
import ( import (
"bufio" "bufio"
"io"
"log" "log"
"net" "net"
"strings"
) )
// TcpGraphiteServer processes Graphite data received over TCP connections. // TCPServer processes Graphite data received over TCP connections.
type TcpGraphiteServer struct { type TCPServer struct {
GraphiteServer writer SeriesWriter
parser *Parser
Database string
} }
// NewTcpGraphiteServer returns a new instance of a TcpGraphiteServer. // NewTCPServer returns a new instance of a TCPServer.
func NewTcpGraphiteServer(d dataSink) *TcpGraphiteServer { func NewTCPServer(p *Parser, w SeriesWriter) *TCPServer {
t := TcpGraphiteServer{} t := TCPServer{
t.sink = d parser: p,
writer: w,
}
return &t return &t
} }
// Start instructs the TcpGraphiteServer to start processing Graphite data // ListenAndServe instructs the TCPServer to start processing Graphite data
// on the given interface. iface must be in the form host:port // on the given interface. iface must be in the form host:port
func (t *TcpGraphiteServer) Start(iface string) error { func (t *TCPServer) ListenAndServe(iface string) error {
if iface == "" { // Make sure we have an address if iface == "" { // Make sure we have an address
return ErrBindAddressRequired return ErrBindAddressRequired
} else if t.Database == "" { // Make sure they have a database } else if t.Database == "" { // Make sure they have a database
return ErrDatabaseNotSpecified return ErrDatabaseNotSpecified
} else if t.sink == nil { // Make sure they specified a backend sink
return ErrServerNotSpecified
} }
ln, err := net.Listen("tcp", iface) ln, err := net.Listen("tcp", iface)
@ -38,7 +41,7 @@ func (t *TcpGraphiteServer) Start(iface string) error {
for { for {
conn, err := ln.Accept() conn, err := ln.Accept()
if err != nil { if err != nil {
log.Println("erorr accepting TCP connection", err.Error()) log.Println("error accepting TCP connection", err.Error())
continue continue
} }
go t.handleConnection(conn) go t.handleConnection(conn)
@ -46,18 +49,31 @@ func (t *TcpGraphiteServer) Start(iface string) error {
}() }()
return nil return nil
} }
func (t *TcpGraphiteServer) handleConnection(conn net.Conn) { func (t *TCPServer) handleConnection(conn net.Conn) {
defer conn.Close() defer conn.Close()
reader := bufio.NewReader(conn) reader := bufio.NewReader(conn)
for { for {
err := t.handleMessage(reader) // Read up to the next newline.
buf, err := reader.ReadBytes('\n')
if err != nil { if err != nil {
if err == io.EOF {
return return
} else { }
log.Println("ignoring error reading graphite data over TCP", err.Error())
} // Trim the buffer, even though there should be no padding
} line := strings.TrimSpace(string(buf))
// Parse it.
metric, err := t.parser.parse(line)
if err != nil {
continue
}
// Convert metric to a field value.
var values = make(map[string]interface{})
values[metric.Name] = metric.Value
// Send the data to database
t.writer.WriteSeries(t.Database, "", metric.Name, metric.Tags, metric.Timestamp, values)
} }
} }

View File

@ -1,9 +1,6 @@
package graphite package graphite
import ( import (
"bufio"
"io"
"log"
"net" "net"
"strings" "strings"
) )
@ -12,27 +9,30 @@ const (
udpBufferSize = 65536 udpBufferSize = 65536
) )
// UdpGraphiteServer processes Graphite data received via UDP packets. // UDPerver processes Graphite data received via UDP.
type UdpGraphiteServer struct { type UDPServer struct {
GrpahiteServer writer SeriesWriter
parser *Parser
Database string
} }
// NewUdpGraphiteServer returns a new instance of a UdpGraphiteServer. // NewUDPServer returns a new instance of a UDPServer
func NewUdpGraphiteServer(d dataSink) *UdpGraphiteServer { func NewUDPServer(p *Parser, w SeriesWriter) *UDPServer {
u := UdpGraphiteServer{} u := UDPServer{
u.sink = d parser: p,
writer: w,
}
return &u return &u
} }
// Start instructs the UdpGraphiteServer to start processing Graphite data // Start instructs the UdpGraphiteServer to start processing Graphite data
// on the given interface. iface must be in the form host:port // on the given interface. iface must be in the form host:port
func (u *UdpGraphiteServer) Start(iface string) error { func (u *UDPServer) ListenAndServe(iface string) error {
if iface == "" { // Make sure we have an address if iface == "" { // Make sure we have an address
return ErrBindAddressRequired return ErrBindAddressRequired
} else if u.Database == "" { // Make sure they have a database } else if u.Database == "" { // Make sure they have a database
return ErrDatabaseNotSpecified return ErrDatabaseNotSpecified
} else if u.sink == nil { // Make sure they specified a backend sink
return ErrServerNotSpecified
} }
addr, err := net.ResolveUDPAddr("udp", iface) addr, err := net.ResolveUDPAddr("udp", iface)
@ -50,14 +50,20 @@ func (u *UdpGraphiteServer) Start(iface string) error {
for { for {
n, _, err := conn.ReadFromUDP(buf) n, _, err := conn.ReadFromUDP(buf)
if err != nil { if err != nil {
if err == io.EOF {
return return
} else {
log.Println("ignoring error reading Graphite data over UDP", err.Error())
} }
for _, line := range strings.Split(string(buf[:n]), "\n") {
m, err := u.parser.parse(line)
if err != nil {
continue
} }
for _, metric := range strings.Split(string(buf[:n]), "\n") {
u.handleMessage(bufio.NewReader(strings.NewReader(metric + "\n"))) // Convert metric to a field value.
var values = make(map[string]interface{})
values[m.Name] = m.Value
// Send the data to database
u.writer.WriteSeries(u.Database, "", m.Name, m.Tags, m.Timestamp, values)
} }
} }
}() }()

View File

@ -1065,6 +1065,15 @@ func (s *Server) WriteSeries(database, retentionPolicy, name string, tags map[st
return err return err
} }
// If the retention policy is not set, use the default for this database.
if retentionPolicy == "" {
rp, err := s.DefaultRetentionPolicy(database)
if err != nil {
return fmt.Errorf("failed to determine default Retention Policy", err.Error())
}
retentionPolicy = rp.Name
}
// Now write it into the shard. // Now write it into the shard.
sh, err := s.createShardIfNotExists(database, retentionPolicy, id, timestamp) sh, err := s.createShardIfNotExists(database, retentionPolicy, id, timestamp)
if err != nil { if err != nil {