From 0a683144fb1516ff84d9e49036c0b88d35e7ce5e Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Thu, 5 Nov 2015 13:51:42 -0700 Subject: [PATCH] Update collectd and graphite UDP listeners with perf enhancements closes #4678 --- services/collectd/config.go | 16 ++++++++++++++-- services/collectd/service.go | 19 ++++++++++--------- services/graphite/config.go | 16 ++++++++++++---- services/graphite/service.go | 3 +++ 4 files changed, 39 insertions(+), 15 deletions(-) diff --git a/services/collectd/config.go b/services/collectd/config.go index 427598ca6d..ba404adef3 100644 --- a/services/collectd/config.go +++ b/services/collectd/config.go @@ -7,19 +7,29 @@ import ( ) const ( + // DefaultBindAddress is the default port to bind to DefaultBindAddress = ":25826" + // DefaultDatabase is the default DB to write to DefaultDatabase = "collectd" + // DefaultRetentionPolicy is the default retention policy of the writes DefaultRetentionPolicy = "" - DefaultBatchSize = 1000 + // DefaultBatchSize is the default write batch size. + DefaultBatchSize = 5000 - DefaultBatchPending = 5 + // DefaultBatchPending is the default number of pending write batches. + DefaultBatchPending = 10 + // DefaultBatchTimeout is the default batch timeout. DefaultBatchDuration = toml.Duration(10 * time.Second) DefaultTypesDB = "/usr/share/collectd/types.db" + + // DefaultUDPReadBuffer is the default UDP read buffer + // increasing this increases the number of UDP packets that can be handled. + DefaultReadBuffer = 8 * 1024 * 1024 ) // Config represents a configuration for the collectd service. @@ -31,6 +41,7 @@ type Config struct { BatchSize int `toml:"batch-size"` BatchPending int `toml:"batch-pending"` BatchDuration toml.Duration `toml:"batch-timeout"` + ReadBuffer int `toml:"read-buffer"` TypesDB string `toml:"typesdb"` } @@ -40,6 +51,7 @@ func NewConfig() Config { BindAddress: DefaultBindAddress, Database: DefaultDatabase, RetentionPolicy: DefaultRetentionPolicy, + ReadBuffer: DefaultReadBuffer, BatchSize: DefaultBatchSize, BatchPending: DefaultBatchPending, BatchDuration: DefaultBatchDuration, diff --git a/services/collectd/service.go b/services/collectd/service.go index 7acd07252e..339503abbe 100644 --- a/services/collectd/service.go +++ b/services/collectd/service.go @@ -53,7 +53,7 @@ type Service struct { wg sync.WaitGroup err chan error stop chan struct{} - ln *net.UDPConn + conn *net.UDPConn batcher *tsdb.PointBatcher typesdb gollectd.Types addr net.Addr @@ -118,13 +118,14 @@ func (s *Service) Open() error { s.addr = addr // Start listening - ln, err := net.ListenUDP("udp", addr) + conn, err := net.ListenUDP("udp", addr) if err != nil { return fmt.Errorf("unable to listen on UDP: %s", err) } - s.ln = ln + conn.SetReadBuffer(s.Config.ReadBuffer) + s.conn = conn - s.Logger.Println("Listening on UDP: ", ln.LocalAddr().String()) + s.Logger.Println("Listening on UDP: ", conn.LocalAddr().String()) // Start the points batcher. s.batcher = tsdb.NewPointBatcher(s.Config.BatchSize, s.Config.BatchPending, time.Duration(s.Config.BatchDuration)) @@ -147,8 +148,8 @@ func (s *Service) Close() error { if s.stop != nil { close(s.stop) } - if s.ln != nil { - s.ln.Close() + if s.conn != nil { + s.conn.Close() } if s.batcher != nil { s.batcher.Stop() @@ -157,7 +158,7 @@ func (s *Service) Close() error { // Release all remaining resources. s.stop = nil - s.ln = nil + s.conn = nil s.batcher = nil s.Logger.Println("collectd UDP closed") return nil @@ -179,7 +180,7 @@ func (s *Service) Err() chan error { return s.err } // Addr returns the listener's address. Returns nil if listener is closed. func (s *Service) Addr() net.Addr { - return s.ln.LocalAddr() + return s.conn.LocalAddr() } func (s *Service) serve() { @@ -204,7 +205,7 @@ func (s *Service) serve() { // Keep processing. } - n, _, err := s.ln.ReadFromUDP(buffer) + n, _, err := s.conn.ReadFromUDP(buffer) if err != nil { s.statMap.Add(statReadFail, 1) s.Logger.Printf("collectd ReadFromUDP error: %s", err) diff --git a/services/graphite/config.go b/services/graphite/config.go index ff589974d0..30367fd1b3 100644 --- a/services/graphite/config.go +++ b/services/graphite/config.go @@ -26,14 +26,18 @@ const ( // measurment parts in a template. DefaultSeparator = "." - // DefaultBatchSize is the default Graphite batch size. - DefaultBatchSize = 1000 + // DefaultBatchSize is the default write batch size. + DefaultBatchSize = 5000 - // DefaultBatchPending is the default number of pending Graphite batches. - DefaultBatchPending = 5 + // DefaultBatchPending is the default number of pending write batches. + DefaultBatchPending = 10 // DefaultBatchTimeout is the default Graphite batch timeout. DefaultBatchTimeout = time.Second + + // DefaultUDPReadBuffer is the default UDP read buffer + // increasing this increases the number of UDP packets that can be handled. + DefaultUDPReadBuffer = 8 * 1024 * 1024 ) // Config represents the configuration for Graphite endpoints. @@ -49,6 +53,7 @@ type Config struct { Templates []string `toml:"templates"` Tags []string `toml:"tags"` Separator string `toml:"separator"` + UDPReadBuffer int `toml:"udp-read-buffer"` } func NewConfig() Config { @@ -92,6 +97,9 @@ func (c *Config) WithDefaults() *Config { if d.Separator == "" { d.Separator = DefaultSeparator } + if d.UDPReadBuffer == 0 { + d.UDPReadBuffer = DefaultUDPReadBuffer + } return &d } diff --git a/services/graphite/service.go b/services/graphite/service.go index a49f13f9b0..b504f0abbc 100644 --- a/services/graphite/service.go +++ b/services/graphite/service.go @@ -55,6 +55,7 @@ type Service struct { batchPending int batchTimeout time.Duration consistencyLevel cluster.ConsistencyLevel + udpReadBuffer int batcher *tsdb.PointBatcher parser *Parser @@ -95,6 +96,7 @@ func NewService(c Config) (*Service, error) { protocol: d.Protocol, batchSize: d.BatchSize, batchPending: d.BatchPending, + udpReadBuffer: d.UDPReadBuffer, batchTimeout: time.Duration(d.BatchTimeout), logger: log.New(os.Stderr, "[graphite] ", log.LstdFlags), tcpConnections: make(map[string]*tcpConnection), @@ -293,6 +295,7 @@ func (s *Service) openUDPServer() (net.Addr, error) { if err != nil { return nil, err } + s.udpConn.SetReadBuffer(s.udpReadBuffer) buf := make([]byte, udpBufferSize) s.wg.Add(1)