diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index 5f36df8c71..2b039f133d 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -97,22 +97,25 @@ func execRun(args []string) { continue } - var g graphite.GraphiteServer - if strings.ToLower(g.Protocol) == "tcp" { - g = graphite.NewTcpGraphiteServer(s) + // Configure Graphite parsing. + parser := graphite.NewParser() + parser.Separator = c.NameSeparator + parser.LastEnabled = (c.NamePosition == "last") + + if strings.ToLower(c.Protocol) == "tcp" { + g := graphite.NewTCPServer(parser, s) + g.Database = c.Database + err := g.ListenAndServe(c.ConnectionString(config.BindAddress)) + if err != nil { + log.Println("failed to start TCP Graphite Server", err.Error()) + } } else { - g = graphite.NewUdpGraphiteServer(s) - } - - // Set options - g.Database = g.Database - g.NamePosition = g.NamePosition - g.NameSeparator = g.NameSeparator - - // Start the Graphite Server. - err := g.Start(c.ConnectionString(config.BindAddress)) - if err != nil { - log.Println("failed to start Graphite Server", err.Error()) + g := graphite.NewUDPServer(parser, s) + g.Database = c.Database + err := g.ListenAndServe(c.ConnectionString(config.BindAddress)) + if err != nil { + log.Println("failed to start UDP Graphite Server", err.Error()) + } } } } diff --git a/graphite/graphite.go b/graphite/graphite.go index e0b3753d3c..cd6979b510 100644 --- a/graphite/graphite.go +++ b/graphite/graphite.go @@ -1,15 +1,11 @@ package graphite import ( - "bufio" "errors" "fmt" - "io" "strconv" "strings" "time" - - "github.com/influxdb/influxdb" ) var ( @@ -27,92 +23,32 @@ var ( ErrServerNotSpecified = errors.New("server not present") ) -type ProcessorSink interface { +type SeriesWriter interface { WriteSeries(database, retentionPolicy, name string, tags map[string]string, timestamp time.Time, values map[string]interface{}) error - DefaultRetentionPolicy(database string) (*influxdb.RetentionPolicy, error) } -// Processor performs processing of Graphite data, and writes it to a sink. -type Processor struct { - // Sink is the destination for processed Graphite data. - sink dataSink - - // Database is the name of the database to insert data into. - Database string - // NamePosition is the position of name to be parsed from metric_path - NamePosition string - // NameSeparator is separator to parse metric_path with to get name and series - NameSeparator string +type Parser struct { + Separator string + LastEnabled bool } -func NewProcessor(sink *ProcessorSink) *Processor { - p = Processor{} +func NewParser() *Parser { + p := Parser{} + p.Separator = "." return &p } -// handleMessage decodes a graphite message from the reader and sends it to the -// committer goroutine. -func (p *processor) handleMessage(r *bufio.Reader) error { - // Decode graphic metric. - m, err := DecodeMetric(r, s.NamePosition, s.NameSeparator) - if err != nil { - return err - } - - // Convert metric to a field value. - var values = make(map[string]interface{}) - values[m.Name] = m.Value - - retentionPolicy, err := s.sink.DefaultRetentionPolicy(s.Database) - - if err != nil { - return fmt.Errorf("error looking up default database retention policy: %s", err) - } - - if err := s.sink.WriteSeries( - s.Database, - retentionPolicy.Name, - m.Name, - m.Tags, - m.Timestamp, - values, - ); err != nil { - return fmt.Errorf("write series data: %s", err) - } - return nil -} - -type Metric struct { - Name string - Tags map[string]string - Value interface{} - Timestamp time.Time -} - // returns err == io.EOF when we hit EOF without any further data -func DecodeMetric(r *bufio.Reader, position, separator string) (*Metric, error) { - // Read up to the next newline. - buf, err := r.ReadBytes('\n') - if err != nil && err != io.EOF { - // it's possible to get EOF but also data - return nil, fmt.Errorf("connection closed uncleanly/broken: %s\n", err.Error()) - } - // Trim the buffer, even though there should be no padding - str := strings.TrimSpace(string(buf)) - // Remove line return - str = strings.TrimSuffix(str, `\n`) - if str == "" { - return nil, err - } +func (p *Parser) parse(line string) (*metric, error) { // Break into 3 fields (name, value, timestamp). - fields := strings.Fields(str) + fields := strings.Fields(line) if len(fields) != 3 { - return nil, fmt.Errorf("received %q which doesn't have three fields", str) + return nil, fmt.Errorf("received %q which doesn't have three fields", line) } - m := new(Metric) + m := new(metric) // decode the name and tags - name, tags, err := DecodeNameAndTags(fields[0], position, separator) + name, tags, err := p.decodeNameAndTags(fields[0]) if err != nil { return nil, err } @@ -143,36 +79,28 @@ func DecodeMetric(r *bufio.Reader, position, separator string) (*Metric, error) return m, nil } -func DecodeNameAndTags(field, position, separator string) (string, map[string]string, error) { +func (p *Parser) decodeNameAndTags(field string) (string, map[string]string, error) { var ( name string tags = make(map[string]string) ) - if separator == "" { - separator = "." - } - // decode the name and tags - values := strings.Split(field, separator) + values := strings.Split(field, p.Separator) if len(values)%2 != 1 { // There should always be an odd number of fields to map a metric name and tags // ex: region.us-west.hostname.server01.cpu -> tags -> region: us-west, hostname: server01, metric name -> cpu return name, tags, fmt.Errorf("received %q which doesn't conform to format of key.value.key.value.metric or metric", field) } - np := strings.ToLower(strings.TrimSpace(position)) - switch np { - case "last": + if p.LastEnabled { name = values[len(values)-1] values = values[0 : len(values)-1] - case "first": - name = values[0] - values = values[1:len(values)] - default: + } else { name = values[0] values = values[1:len(values)] } + if name == "" { return name, tags, fmt.Errorf("no name specified for metric. %q", field) } @@ -186,3 +114,10 @@ func DecodeNameAndTags(field, position, separator string) (string, map[string]st return name, tags, nil } + +type metric struct { + Name string + Tags map[string]string + Value interface{} + Timestamp time.Time +} diff --git a/graphite/graphite_tcp.go b/graphite/graphite_tcp.go index ce4820b72b..ec1306279b 100644 --- a/graphite/graphite_tcp.go +++ b/graphite/graphite_tcp.go @@ -2,32 +2,35 @@ package graphite import ( "bufio" - "io" "log" "net" + "strings" ) -// TcpGraphiteServer processes Graphite data received over TCP connections. -type TcpGraphiteServer struct { - GraphiteServer +// TCPServer processes Graphite data received over TCP connections. +type TCPServer struct { + writer SeriesWriter + parser *Parser + + Database string } -// NewTcpGraphiteServer returns a new instance of a TcpGraphiteServer. -func NewTcpGraphiteServer(d dataSink) *TcpGraphiteServer { - t := TcpGraphiteServer{} - t.sink = d +// NewTCPServer returns a new instance of a TCPServer. +func NewTCPServer(p *Parser, w SeriesWriter) *TCPServer { + t := TCPServer{ + parser: p, + writer: w, + } return &t } -// Start instructs the TcpGraphiteServer to start processing Graphite data +// ListenAndServe instructs the TCPServer to start processing Graphite data // on the given interface. iface must be in the form host:port -func (t *TcpGraphiteServer) Start(iface string) error { +func (t *TCPServer) ListenAndServe(iface string) error { if iface == "" { // Make sure we have an address return ErrBindAddressRequired } else if t.Database == "" { // Make sure they have a database return ErrDatabaseNotSpecified - } else if t.sink == nil { // Make sure they specified a backend sink - return ErrServerNotSpecified } ln, err := net.Listen("tcp", iface) @@ -38,7 +41,7 @@ func (t *TcpGraphiteServer) Start(iface string) error { for { conn, err := ln.Accept() if err != nil { - log.Println("erorr accepting TCP connection", err.Error()) + log.Println("error accepting TCP connection", err.Error()) continue } go t.handleConnection(conn) @@ -46,18 +49,31 @@ func (t *TcpGraphiteServer) Start(iface string) error { }() return nil } -func (t *TcpGraphiteServer) handleConnection(conn net.Conn) { +func (t *TCPServer) handleConnection(conn net.Conn) { defer conn.Close() reader := bufio.NewReader(conn) for { - err := t.handleMessage(reader) + // Read up to the next newline. + buf, err := reader.ReadBytes('\n') if err != nil { - if err == io.EOF { - return - } else { - log.Println("ignoring error reading graphite data over TCP", err.Error()) - } + return } + + // Trim the buffer, even though there should be no padding + line := strings.TrimSpace(string(buf)) + + // Parse it. + metric, err := t.parser.parse(line) + if err != nil { + continue + } + + // Convert metric to a field value. + var values = make(map[string]interface{}) + values[metric.Name] = metric.Value + + // Send the data to database + t.writer.WriteSeries(t.Database, "", metric.Name, metric.Tags, metric.Timestamp, values) } } diff --git a/graphite/graphite_udp.go b/graphite/graphite_udp.go index 9b5259f50d..50a07fc5db 100644 --- a/graphite/graphite_udp.go +++ b/graphite/graphite_udp.go @@ -1,9 +1,6 @@ package graphite import ( - "bufio" - "io" - "log" "net" "strings" ) @@ -12,27 +9,30 @@ const ( udpBufferSize = 65536 ) -// UdpGraphiteServer processes Graphite data received via UDP packets. -type UdpGraphiteServer struct { - GrpahiteServer +// UDPerver processes Graphite data received via UDP. +type UDPServer struct { + writer SeriesWriter + parser *Parser + + Database string } -// NewUdpGraphiteServer returns a new instance of a UdpGraphiteServer. -func NewUdpGraphiteServer(d dataSink) *UdpGraphiteServer { - u := UdpGraphiteServer{} - u.sink = d +// NewUDPServer returns a new instance of a UDPServer +func NewUDPServer(p *Parser, w SeriesWriter) *UDPServer { + u := UDPServer{ + parser: p, + writer: w, + } return &u } // Start instructs the UdpGraphiteServer to start processing Graphite data // on the given interface. iface must be in the form host:port -func (u *UdpGraphiteServer) Start(iface string) error { +func (u *UDPServer) ListenAndServe(iface string) error { if iface == "" { // Make sure we have an address return ErrBindAddressRequired } else if u.Database == "" { // Make sure they have a database return ErrDatabaseNotSpecified - } else if u.sink == nil { // Make sure they specified a backend sink - return ErrServerNotSpecified } addr, err := net.ResolveUDPAddr("udp", iface) @@ -50,14 +50,20 @@ func (u *UdpGraphiteServer) Start(iface string) error { for { n, _, err := conn.ReadFromUDP(buf) if err != nil { - if err == io.EOF { - return - } else { - log.Println("ignoring error reading Graphite data over UDP", err.Error()) - } + return } - for _, metric := range strings.Split(string(buf[:n]), "\n") { - u.handleMessage(bufio.NewReader(strings.NewReader(metric + "\n"))) + for _, line := range strings.Split(string(buf[:n]), "\n") { + m, err := u.parser.parse(line) + if err != nil { + continue + } + + // Convert metric to a field value. + var values = make(map[string]interface{}) + values[m.Name] = m.Value + + // Send the data to database + u.writer.WriteSeries(u.Database, "", m.Name, m.Tags, m.Timestamp, values) } } }() diff --git a/server.go b/server.go index ae55da2237..41fe71f3ce 100644 --- a/server.go +++ b/server.go @@ -1065,6 +1065,15 @@ func (s *Server) WriteSeries(database, retentionPolicy, name string, tags map[st return err } + // If the retention policy is not set, use the default for this database. + if retentionPolicy == "" { + rp, err := s.DefaultRetentionPolicy(database) + if err != nil { + return fmt.Errorf("failed to determine default Retention Policy", err.Error()) + } + retentionPolicy = rp.Name + } + // Now write it into the shard. sh, err := s.createShardIfNotExists(database, retentionPolicy, id, timestamp) if err != nil {