influxdb/services/udp/service.go

161 lines
3.0 KiB
Go
Raw Normal View History

2015-06-03 13:06:36 +00:00
package udp
import (
2015-06-04 23:07:34 +00:00
"errors"
"log"
2015-06-03 13:06:36 +00:00
"net"
2015-06-04 23:07:34 +00:00
"os"
"sync"
2015-06-03 13:06:36 +00:00
"time"
2015-06-04 23:07:34 +00:00
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/tsdb"
)
const (
	// UDPBufferSize is the size, in bytes, of the buffer that serve
	// allocates for every read from the UDP connection.
	UDPBufferSize = 65536
)
2015-06-04 23:07:34 +00:00
//
// Service represents here an UDP service
// that will listen for incoming packets
// formatted with the inline protocol
//
2015-06-03 13:06:36 +00:00
type Service struct {
2015-06-04 23:07:34 +00:00
conn *net.UDPConn
addr *net.UDPAddr
wg sync.WaitGroup
done chan struct{}
2015-06-03 13:06:36 +00:00
2015-06-04 23:07:34 +00:00
batcher *tsdb.PointBatcher
config Config
PointsWriter interface {
WritePoints(p *cluster.WritePointsRequest) error
}
Logger *log.Logger
2015-06-03 13:06:36 +00:00
}
func NewService(c Config) *Service {
return &Service{
2015-06-04 23:07:34 +00:00
config: c,
done: make(chan struct{}),
Logger: log.New(os.Stderr, "[udp] ", log.LstdFlags),
}
}
// Open validates the configuration, binds the UDP socket and starts the
// background goroutines that read packets (serve) and write batches
// (writePoints).
//
// It returns an error when the bind address or the database is missing
// from the configuration, or when the address cannot be resolved or bound.
func (s *Service) Open() (err error) {
	if s.config.BindAddress == "" {
		return errors.New("bind address has to be specified in config")
	}
	if s.config.Database == "" {
		return errors.New("database has to be specified in config")
	}

	s.addr, err = net.ResolveUDPAddr("udp", s.config.BindAddress)
	if err != nil {
		s.Logger.Printf("Failed to resolve UDP address %s: %s", s.config.BindAddress, err)
		return err
	}

	s.conn, err = net.ListenUDP("udp", s.addr)
	if err != nil {
		s.Logger.Printf("Failed to set up UDP listener at address %s: %s", s.addr, err)
		return err
	}
	s.Logger.Printf("Started listening on %s", s.config.BindAddress)

	s.batcher = tsdb.NewPointBatcher(s.config.BatchSize, time.Duration(s.config.BatchTimeout))

	// Close leaves done set to nil. Without a fresh channel a reopened
	// service would have goroutines that can never be told to stop, and the
	// following Close would panic when closing a nil channel.
	if s.done == nil {
		s.done = make(chan struct{})
	}

	// One slot for serve, one for writePoints; Close waits on both.
	s.wg.Add(2)
	go s.serve()
	go s.writePoints()

	return nil
}
func (s *Service) writePoints() {
defer s.wg.Done()
for {
select {
case batch := <-s.batcher.Out():
err := s.PointsWriter.WritePoints(&cluster.WritePointsRequest{
Database: s.config.Database,
RetentionPolicy: "",
ConsistencyLevel: cluster.ConsistencyLevelOne,
Points: batch,
})
if err != nil {
s.Logger.Printf("Failed to write points batch to database %s: %s", s.config.Database, err)
}
2015-06-04 23:07:34 +00:00
case <-s.done:
return
}
2015-06-03 13:06:36 +00:00
}
}
2015-06-04 23:07:34 +00:00
func (s *Service) serve() {
defer s.wg.Done()
s.batcher.Start()
for {
buf := make([]byte, UDPBufferSize)
select {
case <-s.done:
// We closed the connection, time to go.
return
default:
// Keep processing.
}
n, _, err := s.conn.ReadFromUDP(buf)
if err != nil {
s.Logger.Printf("Failed to read UDP message: %s", err)
continue
}
points, err := tsdb.ParsePoints(buf[:n])
if err != nil {
s.Logger.Printf("Failed to parse points: %s", err)
continue
}
for _, point := range points {
s.batcher.In() <- point
}
}
2015-06-03 13:06:36 +00:00
}
func (s *Service) Close() error {
2015-06-04 23:07:34 +00:00
if s.conn == nil {
return errors.New("Service already closed")
}
s.conn.Close()
s.batcher.Flush()
close(s.done)
s.wg.Wait()
// Release all remaining resources.
s.done = nil
s.conn = nil
s.Logger.Print("Service closed")
return nil
2015-06-03 13:06:36 +00:00
}
// SetLogger replaces the logger used by the service with l. The field is
// assigned directly, without any locking.
func (s *Service) SetLogger(l *log.Logger) {
s.Logger = l
}
2015-06-03 13:06:36 +00:00
func (s *Service) Addr() net.Addr {
2015-06-04 23:07:34 +00:00
return s.addr
2015-06-03 13:06:36 +00:00
}