parent
c1e093bc08
commit
1f8b04729f
|
@ -3,6 +3,7 @@ package graphite
|
|||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/influxdb/influxdb/cluster"
|
||||
"github.com/influxdb/influxdb/toml"
|
||||
)
|
||||
|
||||
|
@ -21,28 +22,33 @@ const (
|
|||
|
||||
// DefaultProtocol is the default IP protocol used by the Graphite input.
|
||||
DefaultProtocol = "tcp"
|
||||
|
||||
// DefaultConsistencyLevel is the default write consistency for the Graphite input.
|
||||
DefaultConsistencyLevel = "one"
|
||||
)
|
||||
|
||||
// Config represents the configuration for Graphite endpoints.
|
||||
type Config struct {
|
||||
BindAddress string `toml:"bind-address"`
|
||||
Database string `toml:"database"`
|
||||
Enabled bool `toml:"enabled"`
|
||||
Protocol string `toml:"protocol"`
|
||||
NamePosition string `toml:"name-position"`
|
||||
NameSeparator string `toml:"name-separator"`
|
||||
BatchSize int `toml:"batch-size"`
|
||||
BatchTimeout toml.Duration `toml:"batch-timeout"`
|
||||
BindAddress string `toml:"bind-address"`
|
||||
Database string `toml:"database"`
|
||||
Enabled bool `toml:"enabled"`
|
||||
Protocol string `toml:"protocol"`
|
||||
NamePosition string `toml:"name-position"`
|
||||
NameSeparator string `toml:"name-separator"`
|
||||
BatchSize int `toml:"batch-size"`
|
||||
BatchTimeout toml.Duration `toml:"batch-timeout"`
|
||||
ConsistencyLevel string `toml:"consistency-level"`
|
||||
}
|
||||
|
||||
// NewConfig returns a new Config with defaults.
|
||||
func NewConfig() Config {
|
||||
return Config{
|
||||
BindAddress: DefaultBindAddress,
|
||||
Database: DefaultDatabase,
|
||||
Protocol: DefaultProtocol,
|
||||
NamePosition: DefaultNamePosition,
|
||||
NameSeparator: DefaultNameSeparator,
|
||||
BindAddress: DefaultBindAddress,
|
||||
Database: DefaultDatabase,
|
||||
Protocol: DefaultProtocol,
|
||||
NamePosition: DefaultNamePosition,
|
||||
NameSeparator: DefaultNameSeparator,
|
||||
ConsistencyLevel: DefaultConsistencyLevel,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -50,3 +56,19 @@ func NewConfig() Config {
|
|||
func (c *Config) LastEnabled() bool {
|
||||
return c.NamePosition == strings.ToLower("last")
|
||||
}
|
||||
|
||||
// ConsistencyAsEnum returns the enumerated write consistency level.
|
||||
func (c *Config) ConsistencyAsEnum() cluster.ConsistencyLevel {
|
||||
switch strings.ToLower(c.ConsistencyLevel) {
|
||||
case "any":
|
||||
return cluster.ConsistencyLevelAny
|
||||
case "one":
|
||||
return cluster.ConsistencyLevelOne
|
||||
case "quorum":
|
||||
return cluster.ConsistencyLevelQuorum
|
||||
case "all":
|
||||
return cluster.ConsistencyLevelAll
|
||||
default:
|
||||
return cluster.ConsistencyLevelOne
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ name-position = "first"
|
|||
name-separator = "."
|
||||
batch-size=100
|
||||
batch-timeout="1s"
|
||||
consistency-level="one"
|
||||
`, &c); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -41,5 +42,7 @@ batch-timeout="1s"
|
|||
t.Fatalf("unexpected graphite batch size: %d", c.BatchSize)
|
||||
} else if time.Duration(c.BatchTimeout) != time.Second {
|
||||
t.Fatalf("unexpected graphite batch timeout: %v", c.BatchTimeout)
|
||||
} else if c.ConsistencyLevel != "one" {
|
||||
t.Fatalf("unexpected graphite consistency setting: %s", c.ConsistencyLevel)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,140 +0,0 @@
|
|||
package graphite
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/tsdb"
|
||||
)
|
||||
|
||||
const (
|
||||
// DefaultGraphitePort represents the default Graphite (Carbon) plaintext port.
|
||||
DefaultGraphitePort = 2003
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrBindAddressRequired is returned when starting the Server
|
||||
// without a TCP or UDP listening address.
|
||||
ErrBindAddressRequired = errors.New("bind address required")
|
||||
|
||||
// ErrServerClosed return when closing an already closed graphite server.
|
||||
ErrServerClosed = errors.New("server already closed")
|
||||
|
||||
// ErrServerNotSpecified returned when Server is not specified.
|
||||
ErrServerNotSpecified = errors.New("server not present")
|
||||
)
|
||||
|
||||
// SeriesWriter defines the interface for the destination of the data.
|
||||
type SeriesWriter interface {
|
||||
WriteSeries(string, string, []tsdb.Point) (uint64, error)
|
||||
}
|
||||
|
||||
// Server defines the interface all Graphite servers support.
|
||||
type Server interface {
|
||||
SetBatchSize(sz int)
|
||||
SetBatchTimeout(t time.Duration)
|
||||
ListenAndServe(iface string) error
|
||||
Host() string
|
||||
Close() error
|
||||
}
|
||||
|
||||
// NewServer return a Graphite server for the given protocol, using the given parser
|
||||
// series writer, and database.
|
||||
func NewServer(protocol string, p *Parser, s SeriesWriter, db string) (Server, error) {
|
||||
if strings.ToLower(protocol) == "tcp" {
|
||||
return NewTCPServer(p, s, db), nil
|
||||
} else if strings.ToLower(protocol) == "udp" {
|
||||
return NewUDPServer(p, s, db), nil
|
||||
} else {
|
||||
return nil, fmt.Errorf("unrecognized Graphite Server protocol %s", protocol)
|
||||
}
|
||||
}
|
||||
|
||||
// Parser encapulates a Graphite Parser.
|
||||
type Parser struct {
|
||||
Separator string
|
||||
LastEnabled bool
|
||||
}
|
||||
|
||||
// NewParser returns a GraphiteParser instance.
|
||||
func NewParser() *Parser {
|
||||
return &Parser{Separator: DefaultNameSeparator}
|
||||
}
|
||||
|
||||
// Parse performs Graphite parsing of a single line.
|
||||
func (p *Parser) Parse(line string) (tsdb.Point, error) {
|
||||
// Break into 3 fields (name, value, timestamp).
|
||||
fields := strings.Fields(line)
|
||||
if len(fields) != 3 {
|
||||
return nil, fmt.Errorf("received %q which doesn't have three fields", line)
|
||||
}
|
||||
|
||||
// decode the name and tags
|
||||
name, tags, err := p.DecodeNameAndTags(fields[0])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Parse value.
|
||||
v, err := strconv.ParseFloat(fields[1], 64)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("field \"%s\" value: %s", fields[0], err)
|
||||
}
|
||||
|
||||
fieldValues := make(map[string]interface{})
|
||||
fieldValues["value"] = v
|
||||
|
||||
// Parse timestamp.
|
||||
unixTime, err := strconv.ParseFloat(fields[2], 64)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("field \"%s\" time: %s", fields[0], err)
|
||||
}
|
||||
|
||||
// Check if we have fractional seconds
|
||||
timestamp := time.Unix(int64(unixTime), int64((unixTime-math.Floor(unixTime))*float64(time.Second)))
|
||||
|
||||
point := tsdb.NewPoint(name, tags, fieldValues, timestamp)
|
||||
|
||||
return point, nil
|
||||
}
|
||||
|
||||
// DecodeNameAndTags parses the name and tags of a single field of a Graphite datum.
|
||||
func (p *Parser) DecodeNameAndTags(field string) (string, map[string]string, error) {
|
||||
var (
|
||||
name string
|
||||
tags = make(map[string]string)
|
||||
)
|
||||
|
||||
// decode the name and tags
|
||||
values := strings.Split(field, p.Separator)
|
||||
if len(values)%2 != 1 {
|
||||
// There should always be an odd number of fields to map a point name and tags
|
||||
// ex: region.us-west.hostname.server01.cpu -> tags -> region: us-west, hostname: server01, point name -> cpu
|
||||
return name, tags, fmt.Errorf("received %q which doesn't conform to format of key.value.key.value.name or name", field)
|
||||
}
|
||||
|
||||
if p.LastEnabled {
|
||||
name = values[len(values)-1]
|
||||
values = values[0 : len(values)-1]
|
||||
} else {
|
||||
name = values[0]
|
||||
values = values[1:]
|
||||
}
|
||||
|
||||
if name == "" {
|
||||
return name, tags, fmt.Errorf("no name specified for metric. %q", field)
|
||||
}
|
||||
|
||||
// Grab the pairs and throw them in the map
|
||||
for i := 0; i < len(values); i += 2 {
|
||||
k := values[i]
|
||||
v := values[i+1]
|
||||
tags[k] = v
|
||||
}
|
||||
|
||||
return name, tags, nil
|
||||
}
|
|
@ -1,149 +0,0 @@
|
|||
package graphite
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/tsdb"
|
||||
)
|
||||
|
||||
// TCPServer processes Graphite data received over TCP connections.
|
||||
type TCPServer struct {
|
||||
writer SeriesWriter
|
||||
parser *Parser
|
||||
database string
|
||||
|
||||
batchSize int
|
||||
batchTimeout time.Duration
|
||||
|
||||
listener *net.Listener
|
||||
wg sync.WaitGroup
|
||||
|
||||
Logger *log.Logger
|
||||
|
||||
host string
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// NewTCPServer returns a new instance of a TCPServer.
|
||||
func NewTCPServer(p *Parser, w SeriesWriter, db string) *TCPServer {
|
||||
return &TCPServer{
|
||||
parser: p,
|
||||
writer: w,
|
||||
database: db,
|
||||
Logger: log.New(os.Stderr, "[graphite] ", log.LstdFlags),
|
||||
}
|
||||
}
|
||||
|
||||
func (t *TCPServer) SetBatchSize(sz int) { t.batchSize = sz }
|
||||
func (t *TCPServer) SetBatchTimeout(d time.Duration) { t.batchTimeout = d }
|
||||
|
||||
// ListenAndServe instructs the TCPServer to start processing Graphite data
|
||||
// on the given interface. iface must be in the form host:port
|
||||
func (t *TCPServer) ListenAndServe(iface string) error {
|
||||
if iface == "" { // Make sure we have an address
|
||||
return ErrBindAddressRequired
|
||||
}
|
||||
|
||||
ln, err := net.Listen("tcp", iface)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t.listener = &ln
|
||||
t.mu.Lock()
|
||||
t.host = ln.Addr().String()
|
||||
t.mu.Unlock()
|
||||
|
||||
t.Logger.Println("listening on TCP connection", ln.Addr().String())
|
||||
t.wg.Add(1)
|
||||
go func() {
|
||||
defer t.wg.Done()
|
||||
for {
|
||||
conn, err := ln.Accept()
|
||||
if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() {
|
||||
t.Logger.Println("graphite TCP listener closed")
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
t.Logger.Println("error accepting TCP connection", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
t.wg.Add(1)
|
||||
go t.handleConnection(conn)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *TCPServer) Host() string {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
return t.host
|
||||
}
|
||||
|
||||
func (t *TCPServer) Close() error {
|
||||
var err error
|
||||
if t.listener != nil {
|
||||
err = (*t.listener).Close()
|
||||
}
|
||||
t.wg.Wait()
|
||||
t.listener = nil
|
||||
return err
|
||||
}
|
||||
|
||||
// handleConnection services an individual TCP connection.
|
||||
func (t *TCPServer) handleConnection(conn net.Conn) {
|
||||
defer conn.Close()
|
||||
defer t.wg.Done()
|
||||
|
||||
batcher := tsdb.NewPointBatcher(t.batchSize, t.batchTimeout)
|
||||
batcher.Start()
|
||||
reader := bufio.NewReader(conn)
|
||||
|
||||
// Start processing batches.
|
||||
var wg sync.WaitGroup
|
||||
done := make(chan struct{})
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for {
|
||||
select {
|
||||
case batch := <-batcher.Out():
|
||||
_, e := t.writer.WriteSeries(t.database, "", batch)
|
||||
if e != nil {
|
||||
t.Logger.Printf("failed to write point batch to database %q: %s\n", t.database, e)
|
||||
}
|
||||
case <-done:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
// Read up to the next newline.
|
||||
buf, err := reader.ReadBytes('\n')
|
||||
if err != nil {
|
||||
batcher.Flush()
|
||||
close(done)
|
||||
wg.Wait()
|
||||
return
|
||||
}
|
||||
|
||||
// Trim the buffer, even though there should be no padding
|
||||
line := strings.TrimSpace(string(buf))
|
||||
|
||||
// Parse it.
|
||||
point, err := t.parser.Parse(line)
|
||||
if err != nil {
|
||||
t.Logger.Printf("unable to parse data: %s", err)
|
||||
continue
|
||||
}
|
||||
batcher.In() <- point
|
||||
}
|
||||
}
|
|
@ -1,124 +0,0 @@
|
|||
package graphite
|
||||
|
||||
import (
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/tsdb"
|
||||
)
|
||||
|
||||
const (
|
||||
udpBufferSize = 65536
|
||||
)
|
||||
|
||||
// UDPServer processes Graphite data received via UDP.
|
||||
type UDPServer struct {
|
||||
writer SeriesWriter
|
||||
parser *Parser
|
||||
database string
|
||||
conn *net.UDPConn
|
||||
addr *net.UDPAddr
|
||||
wg sync.WaitGroup
|
||||
|
||||
batchSize int
|
||||
batchTimeout time.Duration
|
||||
|
||||
Logger *log.Logger
|
||||
|
||||
host string
|
||||
}
|
||||
|
||||
// NewUDPServer returns a new instance of a UDPServer
|
||||
func NewUDPServer(p *Parser, w SeriesWriter, db string) *UDPServer {
|
||||
u := UDPServer{
|
||||
parser: p,
|
||||
writer: w,
|
||||
database: db,
|
||||
Logger: log.New(os.Stderr, "[graphite] ", log.LstdFlags),
|
||||
}
|
||||
return &u
|
||||
}
|
||||
|
||||
func (u *UDPServer) SetBatchSize(sz int) { u.batchSize = sz }
|
||||
func (u *UDPServer) SetBatchTimeout(d time.Duration) { u.batchTimeout = d }
|
||||
|
||||
// ListenAndServer instructs the UDPServer to start processing Graphite data
|
||||
// on the given interface. iface must be in the form host:port.
|
||||
func (u *UDPServer) ListenAndServe(iface string) error {
|
||||
if iface == "" { // Make sure we have an address
|
||||
return ErrBindAddressRequired
|
||||
}
|
||||
|
||||
addr, err := net.ResolveUDPAddr("udp", iface)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
u.addr = addr
|
||||
|
||||
conn, err := net.ListenUDP("udp", addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
u.host = u.addr.String()
|
||||
|
||||
batcher := tsdb.NewPointBatcher(u.batchSize, u.batchTimeout)
|
||||
batcher.Start()
|
||||
|
||||
// Start processing batches.
|
||||
var wg sync.WaitGroup
|
||||
done := make(chan struct{})
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for {
|
||||
select {
|
||||
case batch := <-batcher.Out():
|
||||
_, e := u.writer.WriteSeries(u.database, "", batch)
|
||||
if e != nil {
|
||||
u.Logger.Printf("failed to write point batch to database %q: %s\n", u.database, e)
|
||||
}
|
||||
case <-done:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
buf := make([]byte, udpBufferSize)
|
||||
u.wg.Add(1)
|
||||
go func() {
|
||||
defer u.wg.Done()
|
||||
for {
|
||||
n, _, err := conn.ReadFromUDP(buf)
|
||||
if err != nil {
|
||||
batcher.Flush()
|
||||
close(done)
|
||||
wg.Wait()
|
||||
return
|
||||
}
|
||||
for _, line := range strings.Split(string(buf[:n]), "\n") {
|
||||
point, err := u.parser.Parse(line)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
batcher.In() <- point
|
||||
}
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (u *UDPServer) Host() string {
|
||||
return u.host
|
||||
}
|
||||
|
||||
func (u *UDPServer) Close() error {
|
||||
err := u.Close()
|
||||
u.wg.Done()
|
||||
return err
|
||||
}
|
Loading…
Reference in New Issue