influxdb/graphite.go

301 lines
7.1 KiB
Go
Raw Normal View History

2014-10-22 00:20:43 +00:00
package influxdb
2014-03-04 01:52:57 +00:00
import (
"bufio"
2014-10-22 05:32:19 +00:00
"errors"
"io"
2014-03-04 01:52:57 +00:00
"net"
"strings"
graphite ingest write data in batches to coordinator Close #644 This commit also include lots of cleanup related to start up and shutting down as well as logging. Below is an explanation of how the api starts up and shuts down. It also covers the error conditions and how they are handled. networking/goroutine fixes * break from TCP Accept() loop when connection closed, which was preventing shutdown to proceed * make sure that UDP functionality doesn't write to writeSeries channel after it has been closed. * clearer, more specific shutdown message in particular: * self.writers allows us to make sure things writing to writeSeries are done (they do blocking calls to handleMessage()) whether udp or tcp * self.connClosed lets us break from the Accept() loop, see http://zhen.org/blog/graceful-shutdown-of-go-net-dot-listeners/ (quit channel) * shutdown channel is now allCommitted things can get a little complicated, so here's a little schematic of how the functions and their logic relate: indent for a call out or important code within. everything shown as one nested tree server.go go ListenAndServe go committer reads from self.writeSeries until closed, then writes to self.allCommitted Serve for { Accept, breaks if err + connClosed self.writers.Add() go handleClient for { handleMessage reads until err and writes to self.writeSeries until read failed reads until EOF, ignores other handleMessage errors } conn.Close() self.writers.Done() } self.writers.Wait() close(self.writeSeries) Close() close(self.connClosed) self.conn.Close() wants confirmation on allCommitted channel; [timeout] returns within 5s
2014-04-11 12:25:09 +00:00
"sync"
log "code.google.com/p/log4go"
2014-03-04 01:52:57 +00:00
)
2014-10-22 05:32:19 +00:00
var (
	// ErrBindAddressRequired is returned when starting the GraphiteServer
	// without a TCP or UDP listening address.
	ErrBindAddressRequired = errors.New("bind address required")

	// ErrGraphiteServerClosed is returned when closing an already closed
	// graphite server.
	ErrGraphiteServerClosed = errors.New("graphite server already closed")
)
2014-10-22 05:32:19 +00:00
// GraphiteServer provides a TCP and/or UDP listener that you can
// use to ingest metrics into influxdb via the graphite protocol. It
// behaves as a carbon daemon, except:
//
// no rounding of timestamps to the nearest interval. Upon ingestion
// of multiple datapoints for a given key within the same interval
// (possibly but not necessarily the same timestamp), graphite would
// use one (the latest received) value with a rounded timestamp
// representing that interval. We store values for every timestamp we
// receive (only the latest value for a given metric-timestamp pair)
// so it's up to the user to feed the data in proper intervals (and
// use round intervals if you plan to rely on that).
type GraphiteServer struct {
	// server is the Server passed to NewGraphiteServer.
	server *Server

	// mu is held by Close while it inspects and clears done.
	mu sync.Mutex
	// wg counts the serve and per-connection/per-packet goroutines;
	// Close waits on it.
	wg sync.WaitGroup
	done chan struct{} // close notification

	// The TCP address to listen on.
	TCPAddr *net.TCPAddr

	// The UDP address to listen on.
	UDPAddr *net.UDPAddr

	// The name of the database to insert data into.
	Database string

	// The cluster admin authorized to insert the data.
	User *ClusterAdmin
}
// NewGraphiteServer returns an instance of GraphiteServer attached to a Server.
func NewGraphiteServer(server *Server) *GraphiteServer {
return &GraphiteServer{server: server}
2014-03-04 01:52:57 +00:00
}
2014-10-22 05:32:19 +00:00
// ListenAndServe opens TCP (and optionally a UDP) socket to listen for messages.
func (s *GraphiteServer) ListenAndServe() error {
// Make sure we have either a TCP address or a UDP address.
// Also validate that there is an admin user to insert data as.
if s.TCPAddr == nil && s.UDPAddr == nil {
return ErrBindAddressRequired
} else if s.User != nil {
return ErrClusterAdminNotFound
}
// Create a new close notification channel.
done := make(chan struct{}, 0)
s.done = done
2014-03-04 01:52:57 +00:00
2014-10-22 05:32:19 +00:00
// Open the TCP connection.
if s.TCPAddr != nil {
2014-10-23 04:21:48 +00:00
l, err := net.ListenTCP("tcp", s.TCPAddr)
2014-03-04 01:52:57 +00:00
if err != nil {
2014-10-22 05:32:19 +00:00
return err
2014-03-04 01:52:57 +00:00
}
2014-10-23 04:21:48 +00:00
defer func() { _ = l.Close() }()
s.wg.Add(1)
go s.serveTCP(l, done)
2014-03-04 01:52:57 +00:00
}
2014-10-22 05:32:19 +00:00
// Open the UDP connection.
if s.UDPAddr != nil {
2014-10-23 04:21:48 +00:00
l, err := net.ListenUDP("udp", s.UDPAddr)
2014-10-22 05:32:19 +00:00
if err != nil {
return err
}
2014-10-23 04:21:48 +00:00
defer func() { _ = l.Close() }()
2014-10-22 05:32:19 +00:00
2014-10-23 04:21:48 +00:00
s.wg.Add(1)
go s.serveUDP(l, done)
}
2014-10-22 05:32:19 +00:00
return nil
2014-03-04 01:52:57 +00:00
}
2014-10-22 05:32:19 +00:00
// serveTCP handles incoming TCP connection requests.
func (s *GraphiteServer) serveTCP(l *net.TCPListener, done chan struct{}) {
2014-10-23 04:21:48 +00:00
defer s.wg.Done()
// Listen for server close.
go func() {
<-done
l.Close()
}()
// Listen for new TCP connections.
2014-03-04 01:52:57 +00:00
for {
2014-10-22 05:32:19 +00:00
c, err := l.Accept()
2014-03-04 01:52:57 +00:00
if err != nil {
2014-10-22 05:32:19 +00:00
// TODO(benbjohnson): Check for connection closed.
2014-03-04 01:52:57 +00:00
log.Error("GraphiteServer: Accept: ", err)
continue
}
2014-10-22 05:32:19 +00:00
s.wg.Add(1)
go s.handleTCPConn(c)
}
}
func (s *GraphiteServer) handleTCPConn(conn net.Conn) {
defer conn.Close()
defer s.wg.Done()
reader := bufio.NewReader(conn)
for {
err := s.handleMessage(reader)
if err != nil {
if io.EOF == err {
log.Debug("GraphiteServer: Client closed graphite connection")
return
}
log.Error("GraphiteServer:", err)
}
2014-03-04 01:52:57 +00:00
}
}
2014-10-22 05:32:19 +00:00
// serveUDP handles incoming UDP messages.
func (s *GraphiteServer) serveUDP(conn *net.UDPConn, done chan struct{}) {
defer s.wg.Done()
2014-10-23 04:21:48 +00:00
// Listen for server close.
go func() {
<-done
conn.Close()
}()
2014-10-22 05:32:19 +00:00
buf := make([]byte, 65536)
for {
2014-10-22 05:32:19 +00:00
// Read from connection.
n, _, err := conn.ReadFromUDP(buf)
2014-10-23 04:21:48 +00:00
if err == io.EOF {
return
} else if err != nil {
graphite ingest write data in batches to coordinator Close #644 This commit also include lots of cleanup related to start up and shutting down as well as logging. Below is an explanation of how the api starts up and shuts down. It also covers the error conditions and how they are handled. networking/goroutine fixes * break from TCP Accept() loop when connection closed, which was preventing shutdown to proceed * make sure that UDP functionality doesn't write to writeSeries channel after it has been closed. * clearer, more specific shutdown message in particular: * self.writers allows us to make sure things writing to writeSeries are done (they do blocking calls to handleMessage()) whether udp or tcp * self.connClosed lets us break from the Accept() loop, see http://zhen.org/blog/graceful-shutdown-of-go-net-dot-listeners/ (quit channel) * shutdown channel is now allCommitted things can get a little complicated, so here's a little schematic of how the functions and their logic relate: indent for a call out or important code within. everything shown as one nested tree server.go go ListenAndServe go committer reads from self.writeSeries until closed, then writes to self.allCommitted Serve for { Accept, breaks if err + connClosed self.writers.Add() go handleClient for { handleMessage reads until err and writes to self.writeSeries until read failed reads until EOF, ignores other handleMessage errors } conn.Close() self.writers.Done() } self.writers.Wait() close(self.writeSeries) Close() close(self.connClosed) self.conn.Close() wants confirmation on allCommitted channel; [timeout] returns within 5s
2014-04-11 12:25:09 +00:00
log.Warn("GraphiteServer: Error when reading from UDP connection %s", err.Error())
}
2014-10-22 05:32:19 +00:00
// Read in data in a separate goroutine.
s.wg.Add(1)
go s.handleUDPMessage(string(buf[:n]))
}
}
2014-10-22 05:32:19 +00:00
// handleUDPMessage splits a UDP packet by newlines and processes each
// non-blank line as one graphite message. Failures are logged per line and do
// not stop the remaining lines from being processed.
//
// The caller must have called s.wg.Add(1) before starting this goroutine.
func (s *GraphiteServer) handleUDPMessage(msg string) {
	defer s.wg.Done()

	for _, metric := range strings.Split(msg, "\n") {
		// A trailing newline (or blank line) yields an empty segment that
		// cannot be a metric; skip it.
		if strings.TrimSpace(metric) == "" {
			continue
		}

		r := bufio.NewReader(strings.NewReader(metric + "\n"))
		if err := s.handleMessage(r); err != nil {
			log.Error("GraphiteServer:", err)
		}
	}
}
2014-10-22 05:32:19 +00:00
// Close shuts down the server's listeners.
func (s *GraphiteServer) Close() error {
// Notify other goroutines of shutdown.
s.mu.Lock()
2014-10-23 04:21:48 +00:00
if s.done == nil {
s.mu.Unlock()
return ErrGraphiteServerClosed
}
2014-10-22 05:32:19 +00:00
close(s.done)
s.done = nil
s.mu.Unlock()
// Wait for all goroutines to shutdown.
s.wg.Wait()
return nil
}
// handleMessage decodes a graphite message from the reader and sends it to the
// committer goroutine.
func (s *GraphiteServer) handleMessage(r *bufio.Reader) error {
2014-11-18 00:23:21 +00:00
panic("not yet implemented: GraphiteServer.handleMessage()")
/* TEMPORARILY REMOVED FOR PROTOBUFS.
// Decode graphic metric.
m, err := decodeGraphiteMetric(r)
if err != nil {
return err
}
2014-10-22 05:32:19 +00:00
2014-11-04 04:15:58 +00:00
2014-11-18 00:23:21 +00:00
// Convert metric to a field value.
v := &protocol.FieldValue{}
if m.isInt {
v.Int64Value = &m.integerValue
} else {
v.DoubleValue = &m.floatValue
}
2014-11-04 04:15:58 +00:00
2014-11-18 00:23:21 +00:00
// Use a single sequence number to make sure last write wins.
sn := uint64(1)
2014-11-04 04:15:58 +00:00
2014-11-18 00:23:21 +00:00
// Send data point to committer.
p := &protocol.Point{
Timestamp: &m.timestamp,
Values: []*protocol.FieldValue{v},
SequenceNumber: &sn,
}
2014-03-04 01:52:57 +00:00
2014-11-18 00:23:21 +00:00
// Write data to server.
series := &protocol.Series{
Name: &m.name,
Fields: []string{"value"},
Points: []*protocol.Point{p},
}
2014-10-22 00:20:43 +00:00
2014-11-18 00:23:21 +00:00
// TODO: Validate user.
2014-10-22 00:20:43 +00:00
2014-11-18 00:23:21 +00:00
// Look up database.
db := s.server.Database(s.Database)
if db == nil {
return ErrDatabaseNotFound
2014-10-22 00:20:43 +00:00
}
2014-11-18 00:23:21 +00:00
// Write series data to database.
if err := db.WriteSeries(series); err != nil {
return fmt.Errorf("write series data: %s", err)
2014-10-22 00:20:43 +00:00
}
2014-11-18 00:23:21 +00:00
return nil
2014-10-22 00:20:43 +00:00
}
2014-10-22 05:32:19 +00:00
2014-11-18 00:23:21 +00:00
type graphiteMetric struct {
name string
isInt bool
integerValue int64
floatValue float64
timestamp int64
2014-10-22 00:20:43 +00:00
}
2014-10-22 05:32:19 +00:00
2014-11-18 00:23:21 +00:00
// returns err == io.EOF when we hit EOF without any further data
func decodeGraphiteMetric(r *bufio.Reader) (*graphiteMetric, error) {
// Read up to the next newline.
buf, err := r.ReadBytes('\n')
str := strings.TrimSpace(string(buf))
if err != nil {
if err != io.EOF {
return nil, fmt.Errorf("connection closed uncleanly/broken: %s\n", err.Error())
}
if str == "" {
return nil, err
}
// else we got EOF but also data, so just try to process it as valid data
}
2014-10-22 05:32:19 +00:00
2014-11-18 00:23:21 +00:00
// Break into 3 fields (name, value, timestamp).
fields := strings.Fields(str)
if len(fields) != 3 {
return nil, fmt.Errorf("received '%s' which doesn't have three fields", str)
}
2014-10-22 05:32:19 +00:00
2014-11-18 00:23:21 +00:00
// Create a metric.
m := &graphiteMetric{name: fields[0]}
2014-10-22 05:32:19 +00:00
2014-11-18 00:23:21 +00:00
// Parse value.
v, err := strconv.ParseFloat(fields[1], 64)
if err != nil {
return nil, err
}
// Determine if value is a float or an int.
if i := int64(v); float64(i) == v {
m.integerValue, m.isInt = int64(v), true
} else {
m.floatValue = v
}
// Parse timestamp.
timestamp, err := strconv.ParseUint(fields[2], 10, 32)
if err != nil {
return nil, err
}
m.timestamp = int64(timestamp) * int64(time.Millisecond)
2014-10-22 05:32:19 +00:00
2014-11-18 00:23:21 +00:00
return m, nil
*/
2014-10-22 00:20:43 +00:00
}