influxdb/udp.go

101 lines
2.0 KiB
Go
Raw Normal View History

2014-10-22 05:32:19 +00:00
package influxdb
import (
"bytes"
"encoding/json"
"net"
"sync"
log "code.google.com/p/log4go"
)
// UDPServer receives series data over a UDP socket and writes it to a
// database on the Server it is attached to. Addr and Database are read by
// ListenAndServe, so set them before calling it.
type UDPServer struct {
// server is the Server whose databases receive the incoming series.
// It is set by NewUDPServer.
server *Server
// NOTE(review): mu, wg and done are not referenced by any code visible in
// this file; presumably they are reserved for a shutdown path - confirm.
mu sync.Mutex
wg sync.WaitGroup
done chan struct{} // close notification
// The UDP address to listen on. ListenAndServe returns
// ErrBindAddressRequired when this is nil.
Addr *net.UDPAddr
// The name of the database to insert data into.
Database string
// The cluster admin authorized to insert the data.
// NOTE(review): ListenAndServe does not consult this field yet;
// authorization is still marked as a TODO there.
User *ClusterAdmin
}
// NewUDPServer creates a UDPServer bound to the given Server. All other
// fields are left at their zero values; ListenAndServe rejects a nil Addr,
// so the caller is expected to configure the server before starting it.
func NewUDPServer(server *Server) *UDPServer {
	udp := new(UDPServer)
	udp.server = server
	return udp
}
// ListenAndServe opens a UDP socket to listen for messages.
2014-10-23 04:21:48 +00:00
func (s *UDPServer) ListenAndServe() error {
2014-10-22 05:32:19 +00:00
// Validate that server has a UDP address.
2014-10-23 04:21:48 +00:00
if s.Addr == nil {
2014-10-22 05:32:19 +00:00
return ErrBindAddressRequired
}
// Open UDP connection.
2014-10-23 04:21:48 +00:00
conn, err := net.ListenUDP("udp", s.Addr)
2014-10-22 05:32:19 +00:00
if err != nil {
return err
}
2014-10-23 04:21:48 +00:00
defer conn.Close()
2014-10-22 05:32:19 +00:00
// Read messages off the connection and handle them.
buffer := make([]byte, 2048)
for {
2014-10-23 04:21:48 +00:00
n, _, err := conn.ReadFromUDP(buffer)
2014-10-22 05:32:19 +00:00
if err != nil || n == 0 {
log.Error("UDP ReadFromUDP error: %s", err)
continue
}
// Create a JSON decoder.
dec := json.NewDecoder(bytes.NewBuffer(buffer[0:n]))
dec.UseNumber()
// Deserialize data into series.
var a []*serializedSeries
2014-10-23 04:21:48 +00:00
if err := dec.Decode(&a); err != nil {
2014-10-22 05:32:19 +00:00
log.Error("UDP json error: %s", err)
continue
}
// Write data points to the data store.
2014-10-23 04:21:48 +00:00
for _, ss := range a {
if len(ss.Points) == 0 {
2014-10-22 05:32:19 +00:00
continue
}
// Convert to the internal series format.
2014-11-11 05:25:03 +00:00
series, err := ss.series(SecondPrecision)
2014-10-22 05:32:19 +00:00
if err != nil {
log.Error("udp cannot convert received data: %s", err)
continue
}
2014-11-04 04:15:58 +00:00
// TODO: Authorization.
// Lookup database.
db := s.server.Database(s.Database)
if db == nil {
log.Error("udp: %s", ErrDatabaseNotFound)
continue
}
2014-10-22 05:32:19 +00:00
// Write series data to server.
2014-11-04 04:15:58 +00:00
if err := db.WriteSeries(series); err != nil {
log.Error("udp: write data error: %s", err)
2014-10-22 05:32:19 +00:00
continue
}
}
}
}