Refactor OpenTSDB to a service
This commit converts the OpenTSDB endpoint into a service.pull/2823/head
parent
edc76b2503
commit
b688eccb77
|
@ -4,6 +4,7 @@
|
|||
|
||||
- [2816](https://github.com/influxdb/influxdb/pull/2816) -- enable UDP service. Thanks @renan-
|
||||
- [2824](https://github.com/influxdb/influxdb/pull/2824) -- Add missing call to WaitGroup.Done in execConn. Thanks @liyichao
|
||||
- [2823](https://github.com/influxdb/influxdb/pull/2823) -- Convert OpenTSDB to a service.
|
||||
|
||||
## v0.9.0-rc32 [2015-06-07]
|
||||
|
||||
|
|
|
@ -147,6 +147,7 @@ func (s *Server) appendOpenTSDBService(c opentsdb.Config) {
|
|||
return
|
||||
}
|
||||
srv := opentsdb.NewService(c)
|
||||
srv.PointsWriter = s.PointsWriter
|
||||
s.Services = append(s.Services, srv)
|
||||
}
|
||||
|
||||
|
|
|
@ -1,5 +1,16 @@
|
|||
package opentsdb
|
||||
|
||||
const (
|
||||
// DefaultBindAddress is the default address that the service binds to.
|
||||
DefaultBindAddress = ":4242"
|
||||
|
||||
// DefaultDatabase is the default database used for writes.
|
||||
DefaultDatabase = "opentsdb"
|
||||
|
||||
// DefaultRetentionPolicy is the default retention policy used for writes.
|
||||
DefaultRetentionPolicy = ""
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
Enabled bool `toml:"enabled"`
|
||||
BindAddress string `toml:"bind-address"`
|
||||
|
@ -8,5 +19,9 @@ type Config struct {
|
|||
}
|
||||
|
||||
func NewConfig() Config {
|
||||
return Config{}
|
||||
return Config{
|
||||
BindAddress: DefaultBindAddress,
|
||||
Database: DefaultDatabase,
|
||||
RetentionPolicy: DefaultRetentionPolicy,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,180 @@
|
|||
package opentsdb
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"compress/gzip"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb"
|
||||
"github.com/influxdb/influxdb/cluster"
|
||||
"github.com/influxdb/influxdb/tsdb"
|
||||
)
|
||||
|
||||
type Handler struct {
|
||||
Database string
|
||||
RetentionPolicy string
|
||||
|
||||
PointsWriter interface {
|
||||
WritePoints(p *cluster.WritePointsRequest) error
|
||||
}
|
||||
|
||||
Logger *log.Logger
|
||||
}
|
||||
|
||||
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
switch r.URL.Path {
|
||||
case "/api/metadata/put":
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
case "/api/put":
|
||||
h.servePut(w, r)
|
||||
default:
|
||||
http.NotFound(w, r)
|
||||
}
|
||||
}
|
||||
|
||||
// ServeHTTP implements OpenTSDB's HTTP /api/put endpoint
|
||||
func (h *Handler) servePut(w http.ResponseWriter, r *http.Request) {
|
||||
defer r.Body.Close()
|
||||
|
||||
// Require POST method.
|
||||
if r.Method != "POST" {
|
||||
http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
// Wrap reader if it's gzip encoded.
|
||||
var br *bufio.Reader
|
||||
if r.Header.Get("Content-Encoding") == "gzip" {
|
||||
zr, err := gzip.NewReader(r.Body)
|
||||
if err != nil {
|
||||
http.Error(w, "could not read gzip, "+err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
br = bufio.NewReader(zr)
|
||||
} else {
|
||||
br = bufio.NewReader(r.Body)
|
||||
}
|
||||
|
||||
// Lookahead at the first byte.
|
||||
f, err := br.Peek(1)
|
||||
if err != nil || len(f) != 1 {
|
||||
http.Error(w, "peek error: "+err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Peek to see if this is a JSON array.
|
||||
var multi bool
|
||||
switch f[0] {
|
||||
case '{':
|
||||
case '[':
|
||||
multi = true
|
||||
default:
|
||||
http.Error(w, "expected JSON array or hash", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Decode JSON data into slice of points.
|
||||
dps := make([]point, 1)
|
||||
if dec := json.NewDecoder(br); multi {
|
||||
if err = dec.Decode(&dps); err != nil {
|
||||
http.Error(w, "json array decode error", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
if err = dec.Decode(&dps[0]); err != nil {
|
||||
http.Error(w, "json object decode error", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Convert points into TSDB points.
|
||||
points := make([]tsdb.Point, 0, len(dps))
|
||||
for i := range dps {
|
||||
p := dps[i]
|
||||
|
||||
// Convert timestamp to Go time.
|
||||
// If time value is over a billion then it's microseconds.
|
||||
var ts time.Time
|
||||
if p.Time < 10000000000 {
|
||||
ts = time.Unix(p.Time, 0)
|
||||
} else {
|
||||
ts = time.Unix(p.Time/1000, (p.Time%1000)*1000)
|
||||
}
|
||||
|
||||
points = append(points, tsdb.NewPoint(p.Metric, p.Tags, map[string]interface{}{"value": p.Value}, ts))
|
||||
}
|
||||
|
||||
// Write points.
|
||||
if err := h.PointsWriter.WritePoints(&cluster.WritePointsRequest{
|
||||
Database: h.Database,
|
||||
RetentionPolicy: h.RetentionPolicy,
|
||||
ConsistencyLevel: cluster.ConsistencyLevelOne,
|
||||
Points: points,
|
||||
}); influxdb.IsClientError(err) {
|
||||
h.Logger.Println("write series error: ", err)
|
||||
http.Error(w, "write series error: "+err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
} else if err != nil {
|
||||
h.Logger.Println("write series error: ", err)
|
||||
http.Error(w, "write series error: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
// chanListener represents a listener that receives connections through a channel.
|
||||
type chanListener struct {
|
||||
addr net.Addr
|
||||
ch chan net.Conn
|
||||
}
|
||||
|
||||
// newChanListener returns a new instance of chanListener.
|
||||
func newChanListener(addr net.Addr) *chanListener {
|
||||
return &chanListener{
|
||||
addr: addr,
|
||||
ch: make(chan net.Conn),
|
||||
}
|
||||
}
|
||||
|
||||
func (ln *chanListener) Accept() (net.Conn, error) {
|
||||
conn, ok := <-ln.ch
|
||||
if !ok {
|
||||
return nil, errors.New("network connection closed")
|
||||
}
|
||||
log.Println("TSDB listener accept ", conn)
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
// Close closes the connection channel.
|
||||
func (ln *chanListener) Close() error {
|
||||
close(ln.ch)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Addr returns the network address of the listener.
|
||||
func (ln *chanListener) Addr() net.Addr { return ln.addr }
|
||||
|
||||
// readerConn represents a net.Conn with an assignable reader.
|
||||
type readerConn struct {
|
||||
net.Conn
|
||||
r io.Reader
|
||||
}
|
||||
|
||||
// Read implments the io.Reader interface.
|
||||
func (conn *readerConn) Read(b []byte) (n int, err error) { return conn.r.Read(b) }
|
||||
|
||||
// point represents an incoming JSON data point.
|
||||
type point struct {
|
||||
Metric string `json:"metric"`
|
||||
Time int64 `json:"timestamp"`
|
||||
Value float64 `json:"value"`
|
||||
Tags map[string]string `json:"tags,omitempty"`
|
||||
}
|
|
@ -1,392 +0,0 @@
|
|||
package opentsdb
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/textproto"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/tsdb"
|
||||
)
|
||||
|
||||
const (
|
||||
// DefaultPort represents the default OpenTSDB port.
|
||||
DefaultPort = 4242
|
||||
|
||||
// DefaultDatabaseName is the default OpenTSDB database if none is specified.
|
||||
DefaultDatabaseName = "opentsdb"
|
||||
)
|
||||
|
||||
// SeriesWriter defines the interface for the destination of the data.
|
||||
type SeriesWriter interface {
|
||||
WriteSeries(database, retentionPolicy string, points []tsdb.Point) (uint64, error)
|
||||
}
|
||||
|
||||
// Server is an InfluxDB input class to implement OpenTSDB's input protocols.
|
||||
type Server struct {
|
||||
writer SeriesWriter
|
||||
|
||||
database string
|
||||
|
||||
listener *net.TCPListener
|
||||
tsdbhttp *tsdbHTTPListener
|
||||
wg sync.WaitGroup
|
||||
|
||||
addr net.Addr
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func NewServer(w SeriesWriter, retpol string, db string) *Server {
|
||||
s := &Server{}
|
||||
|
||||
s.writer = w
|
||||
s.database = db
|
||||
s.tsdbhttp = makeTSDBHTTPListener()
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *Server) Addr() net.Addr {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.addr
|
||||
}
|
||||
|
||||
// ListenAndServe start the OpenTSDB compatible server on the given
|
||||
// ip and port.
|
||||
func (s *Server) ListenAndServe(listenAddress string) {
|
||||
var err error
|
||||
|
||||
addr, err := net.ResolveTCPAddr("tcp", listenAddress)
|
||||
if err != nil {
|
||||
log.Println("TSDBServer: ResolveTCPAddr: ", err)
|
||||
return
|
||||
}
|
||||
|
||||
s.listener, err = net.ListenTCP("tcp", addr)
|
||||
if err != nil {
|
||||
log.Println("TSDBServer: Listen: ", err)
|
||||
return
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
s.addr = s.listener.Addr()
|
||||
s.mu.Unlock()
|
||||
|
||||
s.wg.Add(1)
|
||||
|
||||
// Set up the background HTTP server that we
|
||||
// will pass http request to via a channel
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/api/metadata/put", func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
})
|
||||
mux.Handle("/api/put", s)
|
||||
httpsrv := &http.Server{}
|
||||
httpsrv.Handler = mux
|
||||
go httpsrv.Serve(s.tsdbhttp)
|
||||
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
for {
|
||||
conn, err := s.listener.Accept()
|
||||
if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() {
|
||||
log.Println("openTSDB TCP listener closed")
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
log.Println("error accepting openTSDB: ", err.Error())
|
||||
continue
|
||||
}
|
||||
s.wg.Add(1)
|
||||
go s.HandleConnection(conn)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (s *Server) Close() error {
|
||||
var err error
|
||||
if s.listener != nil {
|
||||
err = (*s.listener).Close()
|
||||
}
|
||||
s.wg.Wait()
|
||||
s.listener = nil
|
||||
return err
|
||||
}
|
||||
|
||||
// HandleConnection takes each new connection and attempts to
|
||||
// determine if it should be handled by the HTTP handler, if
|
||||
// parsing as a HTTP request fails, we'll pass it to the
|
||||
// telnet handler
|
||||
func (s *Server) HandleConnection(conn net.Conn) {
|
||||
var peekbuf bytes.Buffer
|
||||
t := io.TeeReader(conn, &peekbuf)
|
||||
r := bufio.NewReader(t)
|
||||
|
||||
_, httperr := http.ReadRequest(r)
|
||||
|
||||
splice := io.MultiReader(&peekbuf, conn)
|
||||
bufsplice := bufio.NewReader(splice)
|
||||
newc := &tsdbConn{
|
||||
Conn: conn,
|
||||
rdr: bufsplice,
|
||||
}
|
||||
|
||||
if httperr == nil {
|
||||
s.tsdbhttp.acc <- tsdbHTTPReq{
|
||||
conn: newc,
|
||||
}
|
||||
} else {
|
||||
s.HandleTelnet(newc)
|
||||
}
|
||||
}
|
||||
|
||||
// HandleTelnet accepts OpenTSDB's telnet protocol
|
||||
// Each telnet command consists of a line of the form:
|
||||
// put sys.cpu.user 1356998400 42.5 host=webserver01 cpu=0
|
||||
func (s *Server) HandleTelnet(conn net.Conn) {
|
||||
reader := bufio.NewReader(conn)
|
||||
tp := textproto.NewReader(reader)
|
||||
|
||||
defer conn.Close()
|
||||
defer s.wg.Done()
|
||||
|
||||
for {
|
||||
line, err := tp.ReadLine()
|
||||
if err != nil {
|
||||
log.Println("error reading from openTSDB connection", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
inputStrs := strings.Fields(line)
|
||||
|
||||
if len(inputStrs) == 1 && inputStrs[0] == "version" {
|
||||
conn.Write([]byte("InfluxDB TSDB proxy"))
|
||||
continue
|
||||
}
|
||||
|
||||
if len(inputStrs) < 4 || inputStrs[0] != "put" {
|
||||
log.Println("TSDBServer: malformed line, skipping: ", line)
|
||||
continue
|
||||
}
|
||||
|
||||
measurement := inputStrs[1]
|
||||
tsStr := inputStrs[2]
|
||||
valueStr := inputStrs[3]
|
||||
tagStrs := inputStrs[4:]
|
||||
|
||||
var t time.Time
|
||||
ts, err := strconv.ParseInt(tsStr, 10, 64)
|
||||
if err != nil {
|
||||
log.Println("TSDBServer: malformed time, skipping: ", tsStr)
|
||||
}
|
||||
|
||||
switch len(tsStr) {
|
||||
case 10:
|
||||
t = time.Unix(ts, 0)
|
||||
break
|
||||
case 13:
|
||||
t = time.Unix(ts/1000, (ts%1000)*1000)
|
||||
break
|
||||
default:
|
||||
log.Println("TSDBServer: time must be 10 or 13 chars, skipping: ", tsStr)
|
||||
continue
|
||||
}
|
||||
|
||||
tags := make(map[string]string)
|
||||
for t := range tagStrs {
|
||||
parts := strings.SplitN(tagStrs[t], "=", 2)
|
||||
if len(parts) != 2 {
|
||||
log.Println("TSDBServer: malformed tag data", tagStrs[t])
|
||||
continue
|
||||
}
|
||||
k := parts[0]
|
||||
|
||||
tags[k] = parts[1]
|
||||
}
|
||||
|
||||
fields := make(map[string]interface{})
|
||||
fields["value"], err = strconv.ParseFloat(valueStr, 64)
|
||||
if err != nil {
|
||||
log.Println("TSDBServer: could not parse value as float: ", valueStr)
|
||||
continue
|
||||
}
|
||||
|
||||
p := tsdb.NewPoint(measurement, tags, fields, t)
|
||||
|
||||
_, err = s.writer.WriteSeries(s.database, "", []tsdb.Point{p})
|
||||
if err != nil {
|
||||
log.Println("TSDB cannot write data: ", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
tsdbDP is a struct to unmarshal OpenTSDB /api/put data into
|
||||
Request is either tsdbDP, or a []tsdbDP
|
||||
{
|
||||
"metric": "sys.cpu.nice",
|
||||
"timestamp": 1346846400,
|
||||
"value": 18,
|
||||
"tags": {
|
||||
"host": "web01",
|
||||
"dc": "lga"
|
||||
}
|
||||
}
|
||||
*/
|
||||
type tsdbDP struct {
|
||||
Metric string `json:"metric"`
|
||||
Time int64 `json:"timestamp"`
|
||||
Value float64 `json:"value"`
|
||||
Tags map[string]string `json:"tags,omitempty"`
|
||||
}
|
||||
|
||||
// ServeHTTP implements OpenTSDB's HTTP /api/put endpoint
|
||||
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
defer r.Body.Close()
|
||||
defer s.wg.Done()
|
||||
|
||||
if r.Method != "POST" {
|
||||
http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
dps := make([]tsdbDP, 1)
|
||||
var br *bufio.Reader
|
||||
|
||||
// We need to peek and see if this is an array or a single
|
||||
// DP
|
||||
multi := false
|
||||
|
||||
if r.Header.Get("Content-Encoding") == "gzip" {
|
||||
zr, err := gzip.NewReader(r.Body)
|
||||
if err != nil {
|
||||
http.Error(w, "Could not read gzip, "+err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
br = bufio.NewReader(zr)
|
||||
} else {
|
||||
br = bufio.NewReader(r.Body)
|
||||
}
|
||||
|
||||
f, err := br.Peek(1)
|
||||
|
||||
if err != nil || len(f) != 1 {
|
||||
http.Error(w, "Could not peek at JSON data, "+err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
switch f[0] {
|
||||
case '{':
|
||||
case '[':
|
||||
multi = true
|
||||
default:
|
||||
http.Error(w, "Expected JSON array or hash", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
dec := json.NewDecoder(br)
|
||||
|
||||
if multi {
|
||||
err = dec.Decode(&dps)
|
||||
} else {
|
||||
err = dec.Decode(&dps[0])
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
http.Error(w, "Could not decode JSON as TSDB data", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
var idps []tsdb.Point
|
||||
for dpi := range dps {
|
||||
dp := dps[dpi]
|
||||
|
||||
var ts time.Time
|
||||
if dp.Time < 10000000000 {
|
||||
ts = time.Unix(dp.Time, 0)
|
||||
} else {
|
||||
ts = time.Unix(dp.Time/1000, (dp.Time%1000)*1000)
|
||||
}
|
||||
|
||||
fields := make(map[string]interface{})
|
||||
fields["value"] = dp.Value
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
p := tsdb.NewPoint(dp.Metric, dp.Tags, fields, ts)
|
||||
idps = append(idps, p)
|
||||
}
|
||||
_, err = s.writer.WriteSeries(s.database, "", idps)
|
||||
if err != nil {
|
||||
log.Println("TSDB cannot write data: ", err)
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
// tsdbHTTPListener is a listener that takes connects from a channel
|
||||
// rather than directly from a network socket
|
||||
type tsdbHTTPListener struct {
|
||||
addr net.Addr
|
||||
cls chan struct{}
|
||||
acc chan tsdbHTTPReq
|
||||
}
|
||||
|
||||
// tsdbHTTPReq represents a incoming connection that we have established
|
||||
// to be a valid http request.
|
||||
type tsdbHTTPReq struct {
|
||||
conn net.Conn
|
||||
err error
|
||||
}
|
||||
|
||||
func (l *tsdbHTTPListener) Accept() (c net.Conn, err error) {
|
||||
select {
|
||||
case newc := <-l.acc:
|
||||
log.Println("TSDB listener accept ", newc)
|
||||
return newc.conn, newc.err
|
||||
case <-l.cls:
|
||||
close(l.cls)
|
||||
close(l.acc)
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (l *tsdbHTTPListener) Close() error {
|
||||
l.cls <- struct{}{}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *tsdbHTTPListener) Addr() net.Addr {
|
||||
return l.addr
|
||||
}
|
||||
|
||||
func makeTSDBHTTPListener() *tsdbHTTPListener {
|
||||
return &tsdbHTTPListener{
|
||||
acc: make(chan tsdbHTTPReq),
|
||||
cls: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// tsdbConn is a net.Conn implmentation used to splice peeked buffer content
|
||||
// to the pre-existing net.Conn that was peeked into
|
||||
type tsdbConn struct {
|
||||
rdr *bufio.Reader
|
||||
net.Conn
|
||||
}
|
||||
|
||||
// Read implmeents the io.Reader interface
|
||||
func (c *tsdbConn) Read(b []byte) (n int, err error) {
|
||||
return c.rdr.Read(b)
|
||||
}
|
|
@ -1,11 +1,232 @@
|
|||
package opentsdb
|
||||
|
||||
import "net"
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/textproto"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
type Service struct{}
|
||||
"github.com/influxdb/influxdb/cluster"
|
||||
"github.com/influxdb/influxdb/tsdb"
|
||||
)
|
||||
|
||||
func NewService(c Config) *Service { return &Service{} }
|
||||
// Service manages the listener and handler for an HTTP endpoint.
|
||||
type Service struct {
|
||||
ln net.Listener // main listener
|
||||
httpln *chanListener // http channel-based listener
|
||||
|
||||
func (s *Service) Open() error { return nil }
|
||||
func (s *Service) Close() error { return nil }
|
||||
func (s *Service) Addr() net.Addr { return nil }
|
||||
wg sync.WaitGroup
|
||||
err chan error
|
||||
|
||||
BindAddress string
|
||||
Database string
|
||||
RetentionPolicy string
|
||||
|
||||
PointsWriter interface {
|
||||
WritePoints(p *cluster.WritePointsRequest) error
|
||||
}
|
||||
|
||||
Logger *log.Logger
|
||||
}
|
||||
|
||||
// NewService returns a new instance of Service.
|
||||
func NewService(c Config) *Service {
|
||||
s := &Service{
|
||||
err: make(chan error),
|
||||
BindAddress: c.BindAddress,
|
||||
Database: c.Database,
|
||||
RetentionPolicy: c.RetentionPolicy,
|
||||
Logger: log.New(os.Stderr, "[opentsdb] ", log.LstdFlags),
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// Open starts the service
|
||||
func (s *Service) Open() error {
|
||||
// Open listener.
|
||||
ln, err := net.Listen("tcp", s.BindAddress)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.ln = ln
|
||||
s.httpln = newChanListener(ln.Addr())
|
||||
|
||||
s.Logger.Println("listening on:", ln.Addr().String())
|
||||
|
||||
// Begin listening for connections.
|
||||
s.wg.Add(2)
|
||||
go s.serveHTTP()
|
||||
go s.serve()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes the underlying listener.
|
||||
func (s *Service) Close() error {
|
||||
if s.ln != nil {
|
||||
return s.ln.Close()
|
||||
}
|
||||
|
||||
s.wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetLogger sets the internal logger to the logger passed in.
|
||||
func (s *Service) SetLogger(l *log.Logger) { s.Logger = l }
|
||||
|
||||
// Err returns a channel for fatal errors that occur on the listener.
|
||||
func (s *Service) Err() <-chan error { return s.err }
|
||||
|
||||
// Addr returns the listener's address. Returns nil if listener is closed.
|
||||
func (s *Service) Addr() net.Addr {
|
||||
if s.ln == nil {
|
||||
return nil
|
||||
}
|
||||
return s.ln.Addr()
|
||||
}
|
||||
|
||||
// serve serves the handler from the listener.
|
||||
func (s *Service) serve() {
|
||||
defer s.wg.Done()
|
||||
|
||||
for {
|
||||
// Wait for next connection.
|
||||
conn, err := s.ln.Accept()
|
||||
if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() {
|
||||
s.Logger.Println("openTSDB TCP listener closed")
|
||||
return
|
||||
} else if err != nil {
|
||||
s.Logger.Println("error accepting openTSDB: ", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
// Handle connection in separate goroutine.
|
||||
go s.handleConn(conn)
|
||||
}
|
||||
}
|
||||
|
||||
// handleConn processes conn. This is run in a separate goroutine.
|
||||
func (s *Service) handleConn(conn net.Conn) {
|
||||
// Read header into buffer to check if it's HTTP.
|
||||
var buf bytes.Buffer
|
||||
r := bufio.NewReader(io.TeeReader(conn, &buf))
|
||||
|
||||
// Attempt to parse connection as HTTP.
|
||||
_, err := http.ReadRequest(r)
|
||||
|
||||
// Rebuild connection from buffer and remaining connection data.
|
||||
bufr := bufio.NewReader(io.MultiReader(&buf, conn))
|
||||
conn = &readerConn{Conn: conn, r: bufr}
|
||||
|
||||
// If no HTTP parsing error occurred then process as HTTP.
|
||||
if err == nil {
|
||||
s.httpln.ch <- conn
|
||||
return
|
||||
}
|
||||
|
||||
// Otherwise handle in telnet format.
|
||||
s.wg.Add(1)
|
||||
s.handleTelnetConn(conn)
|
||||
}
|
||||
|
||||
// handleTelnetConn accepts OpenTSDB's telnet protocol.
|
||||
// Each telnet command consists of a line of the form:
|
||||
// put sys.cpu.user 1356998400 42.5 host=webserver01 cpu=0
|
||||
func (s *Service) handleTelnetConn(conn net.Conn) {
|
||||
defer conn.Close()
|
||||
defer s.wg.Done()
|
||||
|
||||
// Wrap connection in a text protocol reader.
|
||||
r := textproto.NewReader(bufio.NewReader(conn))
|
||||
for {
|
||||
line, err := r.ReadLine()
|
||||
if err != nil {
|
||||
s.Logger.Println("error reading from openTSDB connection", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
inputStrs := strings.Fields(line)
|
||||
|
||||
if len(inputStrs) == 1 && inputStrs[0] == "version" {
|
||||
conn.Write([]byte("InfluxDB TSDB proxy"))
|
||||
continue
|
||||
}
|
||||
|
||||
if len(inputStrs) < 4 || inputStrs[0] != "put" {
|
||||
s.Logger.Println("TSDBServer: malformed line, skipping: ", line)
|
||||
continue
|
||||
}
|
||||
|
||||
measurement := inputStrs[1]
|
||||
tsStr := inputStrs[2]
|
||||
valueStr := inputStrs[3]
|
||||
tagStrs := inputStrs[4:]
|
||||
|
||||
var t time.Time
|
||||
ts, err := strconv.ParseInt(tsStr, 10, 64)
|
||||
if err != nil {
|
||||
s.Logger.Println("TSDBServer: malformed time, skipping: ", tsStr)
|
||||
}
|
||||
|
||||
switch len(tsStr) {
|
||||
case 10:
|
||||
t = time.Unix(ts, 0)
|
||||
break
|
||||
case 13:
|
||||
t = time.Unix(ts/1000, (ts%1000)*1000)
|
||||
break
|
||||
default:
|
||||
s.Logger.Println("TSDBServer: time must be 10 or 13 chars, skipping: ", tsStr)
|
||||
continue
|
||||
}
|
||||
|
||||
tags := make(map[string]string)
|
||||
for t := range tagStrs {
|
||||
parts := strings.SplitN(tagStrs[t], "=", 2)
|
||||
if len(parts) != 2 {
|
||||
s.Logger.Println("TSDBServer: malformed tag data", tagStrs[t])
|
||||
continue
|
||||
}
|
||||
k := parts[0]
|
||||
|
||||
tags[k] = parts[1]
|
||||
}
|
||||
|
||||
fields := make(map[string]interface{})
|
||||
fields["value"], err = strconv.ParseFloat(valueStr, 64)
|
||||
if err != nil {
|
||||
s.Logger.Println("TSDBServer: could not parse value as float: ", valueStr)
|
||||
continue
|
||||
}
|
||||
|
||||
p := tsdb.NewPoint(measurement, tags, fields, t)
|
||||
if err := s.PointsWriter.WritePoints(&cluster.WritePointsRequest{
|
||||
Database: s.Database,
|
||||
RetentionPolicy: s.RetentionPolicy,
|
||||
ConsistencyLevel: cluster.ConsistencyLevelOne,
|
||||
Points: []tsdb.Point{p},
|
||||
}); err != nil {
|
||||
s.Logger.Println("TSDB cannot write data: ", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// serveHTTP handles connections in HTTP format.
|
||||
func (s *Service) serveHTTP() {
|
||||
srv := &http.Server{Handler: &Handler{
|
||||
Database: s.Database,
|
||||
RetentionPolicy: s.RetentionPolicy,
|
||||
PointsWriter: s.PointsWriter,
|
||||
Logger: s.Logger,
|
||||
}}
|
||||
srv.Serve(s.httpln)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,154 @@
|
|||
package opentsdb_test
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
"github.com/influxdb/influxdb/cluster"
|
||||
"github.com/influxdb/influxdb/services/opentsdb"
|
||||
"github.com/influxdb/influxdb/tsdb"
|
||||
)
|
||||
|
||||
// Ensure a point can be written via the telnet protocol.
|
||||
func TestService_Telnet(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
s := NewService("db0")
|
||||
if err := s.Open(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer s.Close()
|
||||
|
||||
// Mock points writer.
|
||||
var called int32
|
||||
s.PointsWriter.WritePointsFn = func(req *cluster.WritePointsRequest) error {
|
||||
atomic.StoreInt32(&called, 1)
|
||||
|
||||
if req.Database != "db0" {
|
||||
t.Fatalf("unexpected database: %s", req.Database)
|
||||
} else if req.RetentionPolicy != "" {
|
||||
t.Fatalf("unexpected retention policy: %s", req.RetentionPolicy)
|
||||
} else if !reflect.DeepEqual(req.Points, []tsdb.Point{
|
||||
tsdb.NewPoint(
|
||||
"sys.cpu.user",
|
||||
map[string]string{"host": "webserver01", "cpu": "0"},
|
||||
map[string]interface{}{"value": 42.5},
|
||||
time.Unix(1356998400, 0),
|
||||
),
|
||||
}) {
|
||||
spew.Dump(req.Points)
|
||||
t.Fatalf("unexpected points: %#v", req.Points)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Open connection to the service.
|
||||
conn, err := net.Dial("tcp", s.Addr().String())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
// Write telnet data and close.
|
||||
if _, err := conn.Write([]byte("put sys.cpu.user 1356998400 42.5 host=webserver01 cpu=0")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := conn.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
// Verify that the writer was called.
|
||||
if atomic.LoadInt32(&called) == 0 {
|
||||
t.Fatal("points writer not called")
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure a point can be written via the HTTP protocol.
|
||||
func TestService_HTTP(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
s := NewService("db0")
|
||||
if err := s.Open(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer s.Close()
|
||||
|
||||
// Mock points writer.
|
||||
var called bool
|
||||
s.PointsWriter.WritePointsFn = func(req *cluster.WritePointsRequest) error {
|
||||
called = true
|
||||
if req.Database != "db0" {
|
||||
t.Fatalf("unexpected database: %s", req.Database)
|
||||
} else if req.RetentionPolicy != "" {
|
||||
t.Fatalf("unexpected retention policy: %s", req.RetentionPolicy)
|
||||
} else if !reflect.DeepEqual(req.Points, []tsdb.Point{
|
||||
tsdb.NewPoint(
|
||||
"sys.cpu.nice",
|
||||
map[string]string{"dc": "lga", "host": "web01"},
|
||||
map[string]interface{}{"value": 18.0},
|
||||
time.Unix(1346846400, 0),
|
||||
),
|
||||
}) {
|
||||
spew.Dump(req.Points)
|
||||
t.Fatalf("unexpected points: %#v", req.Points)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Write HTTP request to server.
|
||||
resp, err := http.Post("http://"+s.Addr().String()+"/api/put", "application/json", strings.NewReader(`{"metric":"sys.cpu.nice", "timestamp":1346846400, "value":18, "tags":{"host":"web01", "dc":"lga"}}`))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// Verify status and body.
|
||||
if resp.StatusCode != http.StatusNoContent {
|
||||
t.Fatalf("unexpected status code: %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
// Verify that the writer was called.
|
||||
if !called {
|
||||
t.Fatal("points writer not called")
|
||||
}
|
||||
}
|
||||
|
||||
type Service struct {
|
||||
*opentsdb.Service
|
||||
PointsWriter PointsWriter
|
||||
}
|
||||
|
||||
// NewService returns a new instance of Service.
|
||||
func NewService(database string) *Service {
|
||||
s := &Service{
|
||||
Service: opentsdb.NewService(opentsdb.Config{
|
||||
BindAddress: "127.0.0.1:0",
|
||||
Database: database,
|
||||
}),
|
||||
}
|
||||
s.Service.PointsWriter = &s.PointsWriter
|
||||
|
||||
if !testing.Verbose() {
|
||||
s.Logger = log.New(ioutil.Discard, "", log.LstdFlags)
|
||||
}
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
// PointsWriter represents a mock impl of PointsWriter.
|
||||
type PointsWriter struct {
|
||||
WritePointsFn func(*cluster.WritePointsRequest) error
|
||||
}
|
||||
|
||||
func (w *PointsWriter) WritePoints(p *cluster.WritePointsRequest) error {
|
||||
return w.WritePointsFn(p)
|
||||
}
|
Loading…
Reference in New Issue