influxdb/graphite/graphite.go

331 lines
7.9 KiB
Go
Raw Normal View History

package graphite
2014-03-04 01:52:57 +00:00
import (
"bufio"
2014-10-22 05:32:19 +00:00
"errors"
"fmt"
"io"
2014-03-04 01:52:57 +00:00
"net"
"strconv"
"strings"
graphite ingest write data in batches to coordinator Close #644 This commit also include lots of cleanup related to start up and shutting down as well as logging. Below is an explanation of how the api starts up and shuts down. It also covers the error conditions and how they are handled. networking/goroutine fixes * break from TCP Accept() loop when connection closed, which was preventing shutdown to proceed * make sure that UDP functionality doesn't write to writeSeries channel after it has been closed. * clearer, more specific shutdown message in particular: * self.writers allows us to make sure things writing to writeSeries are done (they do blocking calls to handleMessage()) whether udp or tcp * self.connClosed lets us break from the Accept() loop, see http://zhen.org/blog/graceful-shutdown-of-go-net-dot-listeners/ (quit channel) * shutdown channel is now allCommitted things can get a little complicated, so here's a little schematic of how the functions and their logic relate: indent for a call out or important code within. everything shown as one nested tree server.go go ListenAndServe go committer reads from self.writeSeries until closed, then writes to self.allCommitted Serve for { Accept, breaks if err + connClosed self.writers.Add() go handleClient for { handleMessage reads until err and writes to self.writeSeries until read failed reads until EOF, ignores other handleMessage errors } conn.Close() self.writers.Done() } self.writers.Wait() close(self.writeSeries) Close() close(self.connClosed) self.conn.Close() wants confirmation on allCommitted channel; [timeout] returns within 5s
2014-04-11 12:25:09 +00:00
"sync"
"time"
"github.com/influxdb/influxdb"
log "code.google.com/p/log4go"
2014-03-04 01:52:57 +00:00
)
2014-10-22 05:32:19 +00:00
// Sentinel errors returned by the graphite Server.
var (
	// ErrBindAddressRequired is returned when starting the Server
	// without a TCP or UDP listening address.
	ErrBindAddressRequired = errors.New("bind address required")

	// ErrServerClosed is returned when closing an already closed graphite server.
	ErrServerClosed = errors.New("server already closed")

	// ErrDatabaseNotSpecified is returned when no database was specified in the config file.
	ErrDatabaseNotSpecified = errors.New("database was not specified in config")

	// ErrServerNotSpecified is returned when Server is not specified.
	ErrServerNotSpecified = errors.New("server not present")
)
2014-10-22 05:32:19 +00:00
// Graphite Server provides a tcp and/or udp listener that you can
2014-10-22 05:32:19 +00:00
// use to ingest metrics into influxdb via the graphite protocol. it
// behaves as a carbon daemon, except:
//
// no rounding of timestamps to the nearest interval. Upon ingestion
// of multiple datapoints for a given key within the same interval
// (possibly but not necessarily the same timestamp), graphite would
// use one (the latest received) value with a rounded timestamp
// representing that interval. We store values for every timestamp we
// receive (only the latest value for a given metric-timestamp pair)
// so it's up to the user to feed the data in proper intervals (and
// use round intervals if you plan to rely on that)
type Server struct {
mu sync.Mutex
wg sync.WaitGroup
done chan struct{} // close notification
Server interface {
WriteSeries(database, retentionPolicy, name string, tags map[string]string, timestamp time.Time, values map[string]interface{}) error
DefaultRetentionPolicy(database string) (*influxdb.RetentionPolicy, error)
}
2014-10-22 05:32:19 +00:00
// The TCP address to listen on.
TCPAddr *net.TCPAddr
2014-10-22 05:32:19 +00:00
// The UDP address to listen on.
UDPAddr *net.UDPAddr
2014-10-22 05:32:19 +00:00
// The name of the database to insert data into.
Database string
2014-03-04 01:52:57 +00:00
}
2014-10-22 05:32:19 +00:00
// ListenAndServe opens TCP (and optionally a UDP) socket to listen for messages.
func (s *Server) ListenAndServe() error {
2014-10-22 05:32:19 +00:00
// Make sure we have either a TCP address or a UDP address.
// Make sure they have a database
2014-10-22 05:32:19 +00:00
if s.TCPAddr == nil && s.UDPAddr == nil {
return ErrBindAddressRequired
} else if s.Database == "" {
return ErrDatabaseNotSpecified
} else if s.Server == nil {
return ErrServerNotSpecified
2014-10-22 05:32:19 +00:00
}
// Create a new close notification channel.
done := make(chan struct{}, 0)
s.done = done
2014-03-04 01:52:57 +00:00
2014-10-22 05:32:19 +00:00
// Open the TCP connection.
if s.TCPAddr != nil {
2014-10-23 04:21:48 +00:00
l, err := net.ListenTCP("tcp", s.TCPAddr)
2014-03-04 01:52:57 +00:00
if err != nil {
2014-10-22 05:32:19 +00:00
return err
2014-03-04 01:52:57 +00:00
}
2014-10-23 04:21:48 +00:00
defer func() { _ = l.Close() }()
s.wg.Add(1)
go s.serveTCP(l, done)
2014-03-04 01:52:57 +00:00
}
2014-10-22 05:32:19 +00:00
// Open the UDP connection.
if s.UDPAddr != nil {
2014-10-23 04:21:48 +00:00
l, err := net.ListenUDP("udp", s.UDPAddr)
2014-10-22 05:32:19 +00:00
if err != nil {
return err
}
2014-10-23 04:21:48 +00:00
defer func() { _ = l.Close() }()
2014-10-22 05:32:19 +00:00
2014-10-23 04:21:48 +00:00
s.wg.Add(1)
go s.serveUDP(l, done)
}
2014-10-22 05:32:19 +00:00
return nil
2014-03-04 01:52:57 +00:00
}
2014-10-22 05:32:19 +00:00
// serveTCP handles incoming TCP connection requests.
func (s *Server) serveTCP(l *net.TCPListener, done chan struct{}) {
2014-10-23 04:21:48 +00:00
defer s.wg.Done()
// Listen for server close.
go func() {
<-done
l.Close()
}()
// Listen for new TCP connections.
2014-03-04 01:52:57 +00:00
for {
2014-10-22 05:32:19 +00:00
c, err := l.Accept()
2014-03-04 01:52:57 +00:00
if err != nil {
2014-10-22 05:32:19 +00:00
// TODO(benbjohnson): Check for connection closed.
log.Error("graphite.Server: Accept: ", err)
2014-03-04 01:52:57 +00:00
continue
}
2014-10-22 05:32:19 +00:00
s.wg.Add(1)
go s.handleTCPConn(c)
}
}
func (s *Server) handleTCPConn(conn net.Conn) {
2014-10-22 05:32:19 +00:00
defer conn.Close()
defer s.wg.Done()
reader := bufio.NewReader(conn)
for {
err := s.handleMessage(reader)
if err != nil {
if io.EOF == err {
log.Debug("graphite.Server: Client closed graphite connection")
2014-10-22 05:32:19 +00:00
return
}
log.Error("graphite.Server:", err)
2014-10-22 05:32:19 +00:00
}
2014-03-04 01:52:57 +00:00
}
}
2014-10-22 05:32:19 +00:00
// serveUDP handles incoming UDP messages.
func (s *Server) serveUDP(conn *net.UDPConn, done chan struct{}) {
2014-10-22 05:32:19 +00:00
defer s.wg.Done()
2014-10-23 04:21:48 +00:00
// Listen for server close.
go func() {
<-done
conn.Close()
}()
2014-10-22 05:32:19 +00:00
buf := make([]byte, 65536)
for {
2014-10-22 05:32:19 +00:00
// Read from connection.
n, _, err := conn.ReadFromUDP(buf)
2014-10-23 04:21:48 +00:00
if err == io.EOF {
return
} else if err != nil {
log.Warn("Server: Error when reading from UDP connection %s", err.Error())
}
2014-10-22 05:32:19 +00:00
// Read in data in a separate goroutine.
s.wg.Add(1)
go s.handleUDPMessage(string(buf[:n]))
}
}
2014-10-22 05:32:19 +00:00
// handleUDPMessage splits a UDP packet by newlines and processes each message.
func (s *Server) handleUDPMessage(msg string) {
2014-10-22 05:32:19 +00:00
defer s.wg.Done()
for _, metric := range strings.Split(msg, "\n") {
s.handleMessage(bufio.NewReader(strings.NewReader(metric + "\n")))
}
}
2014-10-22 05:32:19 +00:00
// Close shuts down the server's listeners.
func (s *Server) Close() error {
2014-10-22 05:32:19 +00:00
// Notify other goroutines of shutdown.
s.mu.Lock()
2014-10-23 04:21:48 +00:00
if s.done == nil {
s.mu.Unlock()
return ErrServerClosed
2014-10-23 04:21:48 +00:00
}
2014-10-22 05:32:19 +00:00
close(s.done)
s.done = nil
s.mu.Unlock()
// Wait for all goroutines to shutdown.
s.wg.Wait()
return nil
}
// handleMessage decodes a graphite message from the reader and sends it to the
// committer goroutine.
func (s *Server) handleMessage(r *bufio.Reader) error {
// Decode graphic metric.
m, err := DecodeMetric(r)
if err != nil {
return err
}
2014-10-22 05:32:19 +00:00
// Convert metric to a field value.
var values = make(map[string]interface{})
if m.IsInt {
values[m.Name] = &m.IntegerValue
} else {
values[m.Name] = &m.FloatValue
}
2014-11-04 04:15:58 +00:00
retentionPolicy, err := s.Server.DefaultRetentionPolicy(s.Database)
2014-11-04 04:15:58 +00:00
if err != nil {
return fmt.Errorf("error looking up default database retention policy: %s", err)
}
2014-11-04 04:15:58 +00:00
if err := s.Server.WriteSeries(
s.Database,
retentionPolicy.Name,
m.Name,
m.Tags,
m.Timestamp,
values,
); err != nil {
return fmt.Errorf("write series data: %s", err)
}
return nil
}
2014-03-04 01:52:57 +00:00
// Metric is a single decoded graphite data point.
type Metric struct {
// Name is the metric name: the last dot-separated segment of the graphite key.
Name string
// Tags holds the key/value pairs decoded from the segments preceding the name.
Tags map[string]string
// IsInt reports whether the value is stored in IntegerValue (true) or in
// FloatValue (false).
IsInt bool
// IntegerValue is the value when IsInt is true.
IntegerValue int64
// FloatValue is the value when IsInt is false.
FloatValue float64
// Timestamp is the time the data point refers to.
Timestamp time.Time
}
// returns err == io.EOF when we hit EOF without any further data
func DecodeMetric(r *bufio.Reader) (*Metric, error) {
// Read up to the next newline.
buf, err := r.ReadBytes('\n')
if err != nil && err != io.EOF {
// it's possible to get EOF but also data
return nil, fmt.Errorf("connection closed uncleanly/broken: %s\n", err.Error())
}
// Trim the buffer, even though there should be no padding
str := strings.TrimSpace(string(buf))
// Remove line return
str = strings.TrimSuffix(str, `\n`)
if str == "" {
return nil, err
}
// Break into 3 fields (name, value, timestamp).
fields := strings.Fields(str)
if len(fields) != 3 {
return nil, fmt.Errorf("received %q which doesn't have three fields", str)
}
2014-10-22 00:20:43 +00:00
m := new(Metric)
// decode the name and tags
name, tags, err := DecodeNameAndTags(fields[0])
if err != nil {
return nil, err
}
m.Name = name
m.Tags = tags
2014-11-18 00:23:21 +00:00
// Parse value.
v, err := strconv.ParseFloat(fields[1], 64)
if err != nil {
return nil, err
}
2014-11-18 00:23:21 +00:00
// Determine if value is a float or an int.
if i := int64(v); float64(i) == v {
m.IntegerValue, m.IsInt = int64(v), true
} else {
m.FloatValue = v
2014-10-22 00:20:43 +00:00
}
2014-10-22 05:32:19 +00:00
// Parse timestamp.
unixTime, err := strconv.ParseInt(fields[2], 10, 64)
if err != nil {
return nil, err
2014-10-22 00:20:43 +00:00
}
2014-10-22 05:32:19 +00:00
m.Timestamp = time.Unix(0, unixTime*int64(time.Millisecond))
2014-10-22 05:32:19 +00:00
return m, nil
}
2014-10-22 05:32:19 +00:00
// DecodeNameAndTags splits a dot-separated graphite key of the form
// key.value.key.value.metric into the metric name (the last segment) and a
// map of tags built from the preceding key/value pairs.
//
// Example: "region.us-west.hostname.server01.cpu" yields the name "cpu" and
// the tags {region: us-west, hostname: server01}. A key without dots is a
// bare metric name with no tags.
//
// An error is returned when the number of segments is even, when the metric
// name is empty, or when a tag key is empty. On error the name is "" and the
// tag map is empty.
func DecodeNameAndTags(field string) (string, map[string]string, error) {
	values := strings.Split(field, `.`)

	// There should always be an odd number of fields to map a metric name
	// and tags.
	if len(values)%2 != 1 {
		return "", map[string]string{}, fmt.Errorf("received %q which doesn't conform to format of key.value.key.value.metric or metric", field)
	}

	// Name is the last field.
	name := values[len(values)-1]
	if name == "" {
		return "", map[string]string{}, fmt.Errorf("no name specified for metric. %q", field)
	}

	// Everything before the name is a sequence of key/value pairs.
	pairs := values[:len(values)-1]
	tags := make(map[string]string, len(pairs)/2)
	for i := 0; i < len(pairs); i += 2 {
		k, v := pairs[i], pairs[i+1]
		if k == "" {
			// A tag without a key cannot be addressed later; reject the
			// whole key rather than storing it under "".
			return "", map[string]string{}, fmt.Errorf("received %q which has an empty tag key", field)
		}
		tags[k] = v
	}

	return name, tags, nil
}