Refactor into distinct TCP and UDP servers

pull/1294/head
Philip O'Toole 2015-01-06 14:26:31 -08:00
parent c5ea4ab2ed
commit 1f264b6c2d
5 changed files with 177 additions and 256 deletions

View File

@ -2,12 +2,10 @@ package main
import (
"fmt"
"net"
"os"
"os/user"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/BurntSushi/toml"
@ -252,7 +250,7 @@ type Graphite struct {
Address string `toml:"address"`
Database string `toml:"database"`
Enabled bool `toml:"enabled"`
Port int `toml:"port"`
Port uint16 `toml:"port"`
Protocol string `toml:"protocol"`
NamePosition string `toml:"name-position"`
NameSeparator string `toml:"name-separator"`
@ -261,56 +259,23 @@ type Graphite struct {
// Default carbon port per http://graphite.readthedocs.org/en/1.0/feeding-carbon.html
const defaultGrahitePort = 2004
// TCPAddr returns the TCP address to connect on.
// If port is not specified in the config file, it will default to defaultGraphitePort (2004)
// If address is not specified,it will default to the defaultBindAddress passed in
func (g Graphite) TCPAddr(defaultBindAddress string) *net.TCPAddr {
if !g.Enabled || strings.ToLower(g.Protocol) != "tcp" {
return nil
}
// ConnnectionString returns the connection string for this Graphite config in the form
// host:port.
func (g Graphite) ConnectionString(defaultBindAddr string) string {
var addr string
var port uint16
a := net.TCPAddr{}
// Did we specify an IP address? If not, use the defaultBindAddress passed in
// If no address specified, use default.
if g.Address != "" {
a.IP = net.ParseIP(g.Address)
} else {
a.IP = net.ParseIP(defaultBindAddress)
addr = defaultBindAddr
}
// Did we specify a port? If not, use the default port
if g.Port > 0 {
a.Port = g.Port
} else {
a.Port = defaultGrahitePort
}
return &a
}
// UDPAddr returns the UDP address to connect on.
// If port is not specified in the config file, it will default to defaultGraphitePort (2004)
// If address is not specified,it will default to the defaultBindAddress passed in
func (g Graphite) UDPAddr(defaultBindAddress string) *net.UDPAddr {
if !g.Enabled || strings.ToLower(g.Protocol) != "udp" {
return nil
// If no port specified, use default.
if g.Port < 1 {
port = 2004
}
a := net.UDPAddr{}
// Did we specify an IP address? If not, use the defaultBindAddress passed in
if g.Address != "" {
a.IP = net.ParseIP(g.Address)
} else {
a.IP = net.ParseIP(defaultBindAddress)
}
// Did we specify a port? If not, use the default port
if g.Port > 0 {
a.Port = g.Port
} else {
a.Port = defaultGrahitePort
}
return &a
return fmt.Sprintf("%s:%u", addr, port)
}
/*

View File

@ -91,26 +91,28 @@ func execRun(args []string) {
}
log.Printf("DataNode#%d running on %s", s.ID(), config.ApiHTTPListenAddr())
// Spin up any grahite servers
for _, g := range config.Graphite {
// Get a new server
s := graphite.Server{Server: s}
// Spin up any Graphite servers
for _, c := range config.Graphite {
if !c.Enabled {
continue
}
var g graphite.GraphiteServer
if strings.ToLower(g.Protocol) == "tcp" {
g = graphite.NewTcpGraphiteServer(s)
} else {
g = graphite.NewUdpGraphiteServer(s)
}
// Set options
s.Database = g.Database
s.NamePosition = g.NamePosition
s.NameSeparator = g.NameSeparator
g.Database = g.Database
g.NamePosition = g.NamePosition
g.NameSeparator = g.NameSeparator
// Set the addresses up
if strings.ToLower(g.Protocol) == "tcp" {
addr := g.TCPAddr(config.BindAddress)
log.Printf("Starting Graphite listener on tcp://%s:%d writing to database %q.\n", addr.IP, addr.Port, s.Database)
go func() { log.Fatal(s.ListenAndServeTCP(addr)) }()
} else {
addr := g.UDPAddr(config.BindAddress)
log.Printf("Starting Graphite listener on udp://%s:%d writing to database %q.\n", addr.IP, addr.Port, s.Database)
go func() { log.Fatal(s.ListenAndServeUDP(addr)) }()
// Start the Graphite Server.
err := g.Start(c.ConnectionString(config.BindAddress))
if err != nil {
log.Println("failed to start Graphite Server", err.Error())
}
}
}

View File

@ -5,11 +5,8 @@ import (
"errors"
"fmt"
"io"
"log"
"net"
"strconv"
"strings"
"sync"
"time"
"github.com/influxdb/influxdb"
@ -30,203 +27,32 @@ var (
ErrServerNotSpecified = errors.New("server not present")
)
// Graphite Server provides a tcp and/or udp listener that you can
// use to ingest metrics into influxdb via the graphite protocol. it
// behaves as a carbon daemon, except:
//
// no rounding of timestamps to the nearest interval. Upon ingestion
// of multiple datapoints for a given key within the same interval
// (possibly but not necessarily the same timestamp), graphite would
// use one (the latest received) value with a rounded timestamp
// representing that interval. We store values for every timestamp we
// receive (only the latest value for a given metric-timestamp pair)
// so it's up to the user to feed the data in proper intervals (and
// use round intervals if you plan to rely on that)
type Server struct {
mu sync.Mutex
wg sync.WaitGroup
done chan struct{} // close notification
Server influxdbServer
tcpListener *net.TCPListener
udpConn *net.UDPConn
// The name of the database to insert data into.
Database string
// Position of name to be parsed from metric_path
NamePosition string
// sperator to parse metric_path with to get name and series
NameSeparator string
}
type influxdbServer interface {
type dataSink interface {
WriteSeries(database, retentionPolicy, name string, tags map[string]string, timestamp time.Time, values map[string]interface{}) error
DefaultRetentionPolicy(database string) (*influxdb.RetentionPolicy, error)
}
func NewServer(is influxdbServer) *Server {
s := Server{Server: is}
return &s
}
// GraphiteServer represents a server that can process Graphite data recieved over
// the network.
type GraphiteServer interface {
// Start starts the server listening on the given interface. iface
// should be in the form address:port
Start(iface string) error
// ListenAndServe opens TCP (and optionally a UDP) socket to listen for messages.
func (s *Server) ListenAndServeTCP(addr *net.TCPAddr) error {
s.mu.Lock()
defer s.mu.Unlock()
// Sink is the destination for processed Graphite data.
sink dataSink
if addr == nil { // Make sure we have a TCP address
return ErrBindAddressRequired
} else if s.Database == "" { // Make sure they have a database
return ErrDatabaseNotSpecified
} else if s.Server == nil { // Make sure they specified a server
return ErrServerNotSpecified
}
// Create a new close notification channel.
done := make(chan struct{}, 0)
s.done = done
l, err := net.ListenTCP("tcp", addr)
if err != nil {
return err
}
s.tcpListener = l
s.wg.Add(1)
go s.serveTCP(l, done)
return nil
}
// serveTCP handles incoming TCP connection requests.
func (s *Server) serveTCP(l *net.TCPListener, done chan struct{}) {
defer s.wg.Done()
// Listen for new TCP connections.
for {
c, err := l.Accept()
if err != nil {
//log.Println("graphite.Server: Accept: ", err)
return
}
s.wg.Add(1)
go s.handleTCPConn(c)
}
}
func (s *Server) handleTCPConn(conn net.Conn) {
defer conn.Close()
defer s.wg.Done()
reader := bufio.NewReader(conn)
for {
err := s.handleMessage(reader)
if err != nil {
if io.EOF == err {
//log.Println("graphite.Server: Client closed graphite connection")
return
}
log.Println("graphite.Server:", err) // this never shows up in my buffer for testing
}
}
}
func (s *Server) ListenAndServeUDP(addr *net.UDPAddr) error {
s.mu.Lock()
defer s.mu.Unlock()
if addr == nil { // Make sure we have a TCP address
return ErrBindAddressRequired
} else if s.Database == "" { // Make sure they have a database
return ErrDatabaseNotSpecified
} else if s.Server == nil { // Make sure they specified a server
return ErrServerNotSpecified
}
// Create a new close notification channel.
done := make(chan struct{}, 0)
s.done = done
//Open the UDP connection.
c, err := net.ListenUDP("udp", addr)
if err != nil {
return err
}
s.udpConn = c
s.wg.Add(1)
go s.serveUDP(c, done)
return nil
}
// serveUDP handles incoming UDP messages.
func (s *Server) serveUDP(conn *net.UDPConn, done chan struct{}) {
defer s.wg.Done()
// Listen for server close.
go func() {
<-done
conn.Close()
}()
buf := make([]byte, 65536)
for {
// Read from connection.
n, _, err := conn.ReadFromUDP(buf)
if err == io.EOF {
return
} else if err != nil {
log.Printf("Server: Error when reading from UDP connection %s\n", err.Error())
}
// Read in data in a separate goroutine.
s.wg.Add(1)
go s.handleUDPMessage(string(buf[:n]))
}
}
// handleUDPMessage splits a UDP packet by newlines and processes each message.
func (s *Server) handleUDPMessage(msg string) {
defer s.wg.Done()
for _, metric := range strings.Split(msg, "\n") {
s.handleMessage(bufio.NewReader(strings.NewReader(metric + "\n")))
}
}
// Close shuts down the server's listeners.
func (s *Server) Close() error {
// Notify other goroutines of shutdown.
s.mu.Lock()
if s.done == nil {
s.mu.Unlock()
return ErrServerClosed
}
close(s.done)
s.done = nil
if s.tcpListener != nil {
_ = s.tcpListener.Close()
}
s.tcpListener = nil
if s.udpConn != nil {
_ = s.udpConn.Close()
}
s.udpConn = nil
s.mu.Unlock()
// Wait for all goroutines to shutdown.
s.wg.Wait()
return nil
// Database is the name of the database to insert data into.
Database string
// NamePosition is the position of name to be parsed from metric_path
NamePosition string
// NameSeparator is separator to parse metric_path with to get name and series
NameSeparator string
}
// handleMessage decodes a graphite message from the reader and sends it to the
// committer goroutine.
func (s *Server) handleMessage(r *bufio.Reader) error {
func (s *server) handleMessage(r *bufio.Reader) error {
// Decode graphic metric.
m, err := DecodeMetric(r, s.NamePosition, s.NameSeparator)
if err != nil {
@ -237,13 +63,13 @@ func (s *Server) handleMessage(r *bufio.Reader) error {
var values = make(map[string]interface{})
values[m.Name] = m.Value
retentionPolicy, err := s.Server.DefaultRetentionPolicy(s.Database)
retentionPolicy, err := s.sink.DefaultRetentionPolicy(s.Database)
if err != nil {
return fmt.Errorf("error looking up default database retention policy: %s", err)
}
if err := s.Server.WriteSeries(
if err := s.sink.WriteSeries(
s.Database,
retentionPolicy.Name,
m.Name,

63
graphite/graphite_tcp.go Normal file
View File

@ -0,0 +1,63 @@
package graphite
import (
"bufio"
"io"
"log"
"net"
)
// TcpGraphiteServer processes Graphite data received over TCP connections.
type TcpGraphiteServer struct {
GraphiteServer
}
// NewTcpGraphiteServer returns a new instance of a TcpGraphiteServer.
func NewTcpGraphiteServer(d dataSink) *TcpGraphiteServer {
t := TcpGraphiteServer{}
t.sink = d
return &t
}
// Start instructs the TcpGraphiteServer to start processing Graphite data
// on the given interface. iface must be in the form host:port
func (t *TcpGraphiteServer) Start(iface string) error {
if iface == "" { // Make sure we have an address
return ErrBindAddressRequired
} else if t.Database == "" { // Make sure they have a database
return ErrDatabaseNotSpecified
} else if t.sink == nil { // Make sure they specified a backend sink
return ErrServerNotSpecified
}
ln, err := net.Listen("tcp", iface)
if err != nil {
return err
}
go func() {
for {
conn, err := ln.Accept()
if err != nil {
log.Println("erorr accepting TCP connection", err.Error())
continue
}
go t.handleConnection(conn)
}
}()
return nil
}
func (t *TcpGraphiteServer) handleConnection(conn net.Conn) {
defer conn.Close()
reader := bufio.NewReader(conn)
for {
err := t.handleMessage(reader)
if err != nil {
if err == io.EOF {
return
} else {
log.Println("ignoring error reading graphite data over TCP", err.Error())
}
}
}
}

65
graphite/graphite_udp.go Normal file
View File

@ -0,0 +1,65 @@
package graphite
import (
"bufio"
"io"
"log"
"net"
"strings"
)
const (
udpBufferSize = 65536
)
// UdpGraphiteServer processes Graphite data received via UDP packets.
type UdpGraphiteServer struct {
GrpahiteServer
}
// NewUdpGraphiteServer returns a new instance of a UdpGraphiteServer.
func NewUdpGraphiteServer(d dataSink) *UdpGraphiteServer {
u := UdpGraphiteServer{}
u.sink = d
return &u
}
// Start instructs the UdpGraphiteServer to start processing Graphite data
// on the given interface. iface must be in the form host:port
func (u *UdpGraphiteServer) Start(iface string) error {
if iface == "" { // Make sure we have an address
return ErrBindAddressRequired
} else if u.Database == "" { // Make sure they have a database
return ErrDatabaseNotSpecified
} else if u.sink == nil { // Make sure they specified a backend sink
return ErrServerNotSpecified
}
addr, err := net.ResolveUDPAddr("udp", iface)
if err != nil {
return nil
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return err
}
buf := make([]byte, udpBufferSize)
go func() {
for {
n, _, err := conn.ReadFromUDP(buf)
if err != nil {
if err == io.EOF {
return
} else {
log.Println("ignoring error reading Graphite data over UDP", err.Error())
}
}
for _, metric := range strings.Split(string(buf[:n]), "\n") {
u.handleMessage(bufio.NewReader(strings.NewReader(metric + "\n")))
}
}
}()
return nil
}