diff --git a/cmd/influxd/config.go b/cmd/influxd/config.go index 12e639a77a..e570e44ecb 100644 --- a/cmd/influxd/config.go +++ b/cmd/influxd/config.go @@ -74,15 +74,9 @@ type Config struct { InputPlugins struct { UDPInput struct { - Enabled bool `toml:"enabled"` - Port int `toml:"port"` - Database string `toml:"database"` + Enabled bool `toml:"enabled"` + Port int `toml:"port"` } `toml:"udp"` - UDPServersInput []struct { - Enabled bool `toml:"enabled"` - Port int `toml:"port"` - Database string `toml:"database"` - } `toml:"udp_servers"` } `toml:"input_plugins"` Broker struct { diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index e16d247b18..9804198604 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -20,6 +20,7 @@ import ( "github.com/influxdb/influxdb/graphite" "github.com/influxdb/influxdb/httpd" "github.com/influxdb/influxdb/messaging" + "github.com/influxdb/influxdb/udp" ) func Run(config *Config, join, version string, logWriter *os.File) (*messaging.Broker, *influxdb.Server) { @@ -110,6 +111,17 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B } } + // Start the server bound to a UDP listener + if config.InputPlugins.UDPInput.Enabled { + connectString := fmt.Sprintf("%s:%d", config.BindAddress, config.InputPlugins.UDPInput.Port) + log.Printf("Starting UDP listener on %s", connectString) + u := udp.NewUDPServer(s) + if err := u.ListenAndServe(connectString); err != nil { + log.Printf("Failed to start UDP listener on %s. Got error %s.", connectString, err) + } + + } + // Spin up any Graphite servers for _, c := range config.Graphites { if !c.Enabled { diff --git a/udp.go b/udp.go deleted file mode 100644 index a44d465484..0000000000 --- a/udp.go +++ /dev/null @@ -1,99 +0,0 @@ -package influxdb - -import ( - "net" - "sync" -) - -// UDPServer represents a UDP transport for InfluxDB. -type UDPServer struct { - server *Server - - mu sync.Mutex - wg sync.WaitGroup - done chan struct{} // close notification - - // The UDP address to listen on. - Addr *net.UDPAddr - - // The name of the database to insert data into. - Database string - - // The user authorized to insert the data. - User *User -} - -// NewUDPServer returns an instance of UDPServer attached to a Server. -func NewUDPServer(server *Server) *UDPServer { - return &UDPServer{server: server} -} - -// ListenAndServe opens a UDP socket to listen for messages. -func (s *UDPServer) ListenAndServe() error { - panic("not yet implemented: UDPServer.ListenAndServe()") - - /* TEMPORARILY REMOVED FOR PROTOBUFS. - // Validate that server has a UDP address. - if s.Addr == nil { - return ErrBindAddressRequired - } - - // Open UDP connection. - conn, err := net.ListenUDP("udp", s.Addr) - if err != nil { - return err - } - defer conn.Close() - - // Read messages off the connection and handle them. - buffer := make([]byte, 2048) - for { - n, _, err := conn.ReadFromUDP(buffer) - if err != nil || n == 0 { - log.Error("UDP ReadFromUDP error: %s", err) - continue - } - - // Create a JSON decoder. - dec := json.NewDecoder(bytes.NewBuffer(buffer[0:n])) - dec.UseNumber() - - // Deserialize data into series. - var a []*serializedSeries - if err := dec.Decode(&a); err != nil { - log.Error("UDP json error: %s", err) - continue - } - - // Write data points to the data store. - for _, ss := range a { - if len(ss.Points) == 0 { - continue - } - - // Convert to the internal series format. - series, err := ss.series(SecondPrecision) - if err != nil { - log.Error("udp cannot convert received data: %s", err) - continue - } - - // TODO: Authorization. - - // Lookup database. - db := s.server.Database(s.Database) - if db == nil { - log.Error("udp: %s", ErrDatabaseNotFound) - continue - } - - // Write series data to server. - if err := db.WriteSeries(series); err != nil { - log.Error("udp: write data error: %s", err) - continue - } - } - - } - */ -} diff --git a/udp/udp.go b/udp/udp.go new file mode 100644 index 0000000000..83dc6086ea --- /dev/null +++ b/udp/udp.go @@ -0,0 +1,86 @@ +package udp + +import ( + "bytes" + "encoding/json" + "github.com/influxdb/influxdb" + "log" + "net" +) + +const ( + udpBufferSize = 65536 +) + +// SeriesWriter defines the interface for the destination of the data. +type SeriesWriter interface { + WriteSeries(database, retentionPolicy string, points []influxdb.Point) (uint64, error) +} + +// UDPServer +type UDPServer struct { + writer SeriesWriter +} + +// NewUDPServer returns a new instance of a UDPServer +func NewUDPServer(w SeriesWriter) *UDPServer { + u := UDPServer{ + writer: w, + } + return &u +} + +// ListenAndServe binds the server to the given UDP interface. +func (u *UDPServer) ListenAndServe(iface string) error { + + addr, err := net.ResolveUDPAddr("udp", iface) + if err != nil { + log.Printf("Failed resolve UDP address %s. Error is %s", iface, err) + return err + } + + conn, err := net.ListenUDP("udp", addr) + if err != nil { + log.Printf("Failed set up UDP listener at address %s. Error is %s", addr, err) + return err + } + + var bp influxdb.BatchPoints + buf := make([]byte, udpBufferSize) + + go func() { + for { + _, remote, err := conn.ReadFromUDP(buf) + if err != nil { + log.Printf("Failed read UDP message. Error is %s.", err) + continue + } + + dec := json.NewDecoder(bytes.NewReader(buf)) + if err := dec.Decode(&bp); err != nil { + log.Printf("Failed decode JSON UDP message") + msgUDP := []byte("Failed to decode your message") + conn.WriteToUDP(msgUDP, remote) + continue + } + + points, err := influxdb.NormalizeBatchPoints(bp) + if err != nil { + log.Printf("Failed normalize batch points") + msgUDP := []byte("Failed find points in your message") + conn.WriteToUDP(msgUDP, remote) + continue + } + + if msgIndex, err := u.writer.WriteSeries(bp.Database, bp.RetentionPolicy, points); err != nil { + log.Printf("Server write failed. Message index was %d. Error is %s.", msgIndex, err) + msgUDP := []byte("Failed to write series to the database") + conn.WriteToUDP(msgUDP, remote) + } else { + msgUDP := []byte("Write OK") + conn.WriteToUDP(msgUDP, remote) + } + } + }() + return nil +}