JSON via UDP

Issue: 1755

Bind the server to a UDP listener.
pull/1848/head
Joseph Rothrock 2015-03-04 12:31:39 -08:00
parent 88a4852279
commit 6bb2ad6b95
4 changed files with 100 additions and 107 deletions

View File

@ -74,15 +74,9 @@ type Config struct {
InputPlugins struct {
UDPInput struct {
Enabled bool `toml:"enabled"`
Port int `toml:"port"`
Database string `toml:"database"`
Enabled bool `toml:"enabled"`
Port int `toml:"port"`
} `toml:"udp"`
UDPServersInput []struct {
Enabled bool `toml:"enabled"`
Port int `toml:"port"`
Database string `toml:"database"`
} `toml:"udp_servers"`
} `toml:"input_plugins"`
Broker struct {

View File

@ -20,6 +20,7 @@ import (
"github.com/influxdb/influxdb/graphite"
"github.com/influxdb/influxdb/httpd"
"github.com/influxdb/influxdb/messaging"
"github.com/influxdb/influxdb/udp"
)
func Run(config *Config, join, version string, logWriter *os.File) (*messaging.Broker, *influxdb.Server) {
@ -110,6 +111,17 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B
}
}
// Start the server bound to a UDP listener
if config.InputPlugins.UDPInput.Enabled {
connectString := fmt.Sprintf("%s:%d", config.BindAddress, config.InputPlugins.UDPInput.Port)
log.Printf("Starting UDP listener on %s", connectString)
u := udp.NewUDPServer(s)
if err := u.ListenAndServe(connectString); err != nil {
log.Printf("Failed to start UDP listener on %s. Got error %s.", connectString, err)
}
}
// Spin up any Graphite servers
for _, c := range config.Graphites {
if !c.Enabled {

99
udp.go
View File

@ -1,99 +0,0 @@
package influxdb
import (
"net"
"sync"
)
// UDPServer represents a UDP transport for InfluxDB.
type UDPServer struct {
server *Server
mu sync.Mutex
wg sync.WaitGroup
done chan struct{} // close notification
// The UDP address to listen on.
Addr *net.UDPAddr
// The name of the database to insert data into.
Database string
// The user authorized to insert the data.
User *User
}
// NewUDPServer returns an instance of UDPServer attached to a Server.
func NewUDPServer(server *Server) *UDPServer {
return &UDPServer{server: server}
}
// ListenAndServe opens a UDP socket to listen for messages.
func (s *UDPServer) ListenAndServe() error {
panic("not yet implemented: UDPServer.ListenAndServe()")
/* TEMPORARILY REMOVED FOR PROTOBUFS.
// Validate that server has a UDP address.
if s.Addr == nil {
return ErrBindAddressRequired
}
// Open UDP connection.
conn, err := net.ListenUDP("udp", s.Addr)
if err != nil {
return err
}
defer conn.Close()
// Read messages off the connection and handle them.
buffer := make([]byte, 2048)
for {
n, _, err := conn.ReadFromUDP(buffer)
if err != nil || n == 0 {
log.Error("UDP ReadFromUDP error: %s", err)
continue
}
// Create a JSON decoder.
dec := json.NewDecoder(bytes.NewBuffer(buffer[0:n]))
dec.UseNumber()
// Deserialize data into series.
var a []*serializedSeries
if err := dec.Decode(&a); err != nil {
log.Error("UDP json error: %s", err)
continue
}
// Write data points to the data store.
for _, ss := range a {
if len(ss.Points) == 0 {
continue
}
// Convert to the internal series format.
series, err := ss.series(SecondPrecision)
if err != nil {
log.Error("udp cannot convert received data: %s", err)
continue
}
// TODO: Authorization.
// Lookup database.
db := s.server.Database(s.Database)
if db == nil {
log.Error("udp: %s", ErrDatabaseNotFound)
continue
}
// Write series data to server.
if err := db.WriteSeries(series); err != nil {
log.Error("udp: write data error: %s", err)
continue
}
}
}
*/
}

86
udp/udp.go Normal file
View File

@ -0,0 +1,86 @@
package udp
import (
"bytes"
"encoding/json"
"github.com/influxdb/influxdb"
"log"
"net"
)
const (
udpBufferSize = 65536
)
// SeriesWriter defines the interface for the destination of the data.
type SeriesWriter interface {
WriteSeries(database, retentionPolicy string, points []influxdb.Point) (uint64, error)
}
// UDPServer
type UDPServer struct {
writer SeriesWriter
}
// NewUDPServer returns a new instance of a UDPServer
func NewUDPServer(w SeriesWriter) *UDPServer {
u := UDPServer{
writer: w,
}
return &u
}
// ListenAndServe binds the server to the given UDP interface.
func (u *UDPServer) ListenAndServe(iface string) error {
addr, err := net.ResolveUDPAddr("udp", iface)
if err != nil {
log.Printf("Failed resolve UDP address %s. Error is %s", iface, err)
return err
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
log.Printf("Failed set up UDP listener at address %s. Error is %s", addr, err)
return err
}
var bp influxdb.BatchPoints
buf := make([]byte, udpBufferSize)
go func() {
for {
_, remote, err := conn.ReadFromUDP(buf)
if err != nil {
log.Printf("Failed read UDP message. Error is %s.", err)
continue
}
dec := json.NewDecoder(bytes.NewReader(buf))
if err := dec.Decode(&bp); err != nil {
log.Printf("Failed decode JSON UDP message")
msgUDP := []byte("Failed to decode your message")
conn.WriteToUDP(msgUDP, remote)
continue
}
points, err := influxdb.NormalizeBatchPoints(bp)
if err != nil {
log.Printf("Failed normalize batch points")
msgUDP := []byte("Failed find points in your message")
conn.WriteToUDP(msgUDP, remote)
continue
}
if msgIndex, err := u.writer.WriteSeries(bp.Database, bp.RetentionPolicy, points); err != nil {
log.Printf("Server write failed. Message index was %d. Error is %s.", msgIndex, err)
msgUDP := []byte("Failed to write series to the database")
conn.WriteToUDP(msgUDP, remote)
} else {
msgUDP := []byte("Write OK")
conn.WriteToUDP(msgUDP, remote)
}
}
}()
return nil
}