Add Support for OpenTSDB HTTP interface

OpenTSDB support a http and telnet interface on the same port. This
patch adds support for the /api/put endpoint, both single, for both
single and multiple datapoint submissions.
pull/2254/head
Tristan Colgate-McFarlane 2015-04-07 21:18:32 +01:00 committed by Tristan Colgate
parent e413539389
commit 2c68fd27c6
2 changed files with 317 additions and 8 deletions

View File

@ -1949,7 +1949,7 @@ func Test_ServerOpenTSDBIntegration(t *testing.T) {
t.Skip()
}
nNodes := 1
testName := "opentsdb integration"
testName := "ServerOpenTSDBIntegration"
dir := tempfile()
now := time.Now().UTC().Round(time.Second)
c, _ := main.NewTestConfig()
@ -2003,7 +2003,7 @@ func Test_ServerOpenTSDBIntegration_WithTags(t *testing.T) {
t.Skip()
}
nNodes := 1
testName := "opentsdb integration"
testName := "ServerOpenTSDBIntegration_WithTags"
dir := tempfile()
now := time.Now().UTC().Round(time.Second)
c, _ := main.NewTestConfig()
@ -2062,7 +2062,7 @@ func Test_ServerOpenTSDBIntegration_BadData(t *testing.T) {
t.Skip()
}
nNodes := 1
testName := "opentsdb integration"
testName := "ServerOpenTSDBIntegration_BadData"
dir := tempfile()
now := time.Now().UTC().Round(time.Second)
c, _ := main.NewTestConfig()
@ -2113,6 +2113,101 @@ func Test_ServerOpenTSDBIntegration_BadData(t *testing.T) {
}
}
func Test_ServerOpenTSDBIntegrationHTTPSingle(t *testing.T) {
if testing.Short() {
t.Skip()
}
nNodes := 1
testName := "ServerOpenTSDBIntegrationHTTPSingle"
dir := tempfile()
now := time.Now().UTC().Round(time.Second)
c, _ := main.NewTestConfig()
o := main.OpenTSDB{
Addr: "127.0.0.1",
Port: 0,
Enabled: true,
Database: "opentsdb",
RetentionPolicy: "raw",
}
c.OpenTSDB = o
t.Logf("OpenTSDB Connection String: %s\n", o.ListenAddress())
nodes := createCombinedNodeCluster(t, testName, dir, nNodes, c)
defer nodes.Close()
createDatabase(t, testName, nodes, "opentsdb")
createRetentionPolicy(t, testName, nodes, "opentsdb", "raw", len(nodes))
ts := fmt.Sprintf("%d", now.Unix())
data := bytes.NewBufferString(`{"metric":"cpu","timestamp":` + ts + `,"value":10,"tags":{"tag1":"val1","tag2":"val2"}}`)
host := nodes[0].node.OpenTSDBServer.Addr().String()
url := "http://" + host + "/api/put"
resp, err := http.Post(url, "text/json", data)
if err != nil {
t.Fatal(err)
return
}
resp.Body.Close()
expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["%s",10]]}]}]}`, now.Format(time.RFC3339Nano))
// query and wait for results
got, ok, _ := queryAndWait(t, nodes, "opentsdb", `select * from "opentsdb"."raw".cpu`, expected, "", openTSDBTestTimeout)
if !ok {
t.Errorf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got)
}
}
func Test_ServerOpenTSDBIntegrationHTTPMulti(t *testing.T) {
if testing.Short() {
t.Skip()
}
nNodes := 1
testName := "ServerOpenTSDBIntegrationHTTPMulti"
dir := tempfile()
now := time.Now().UTC().Round(time.Second)
c, _ := main.NewTestConfig()
o := main.OpenTSDB{
Addr: "127.0.0.1",
Port: 0,
Enabled: true,
Database: "opentsdb",
RetentionPolicy: "raw",
}
c.OpenTSDB = o
t.Logf("OpenTSDB Connection String: %s\n", o.ListenAddress())
nodes := createCombinedNodeCluster(t, testName, dir, nNodes, c)
defer nodes.Close()
createDatabase(t, testName, nodes, "opentsdb")
createRetentionPolicy(t, testName, nodes, "opentsdb", "raw", len(nodes))
ts := fmt.Sprintf("%d", now.Unix())
data := bytes.NewBufferString(`[{"metric":"cpu","timestamp":` + ts + `,"value":10,"tags":{"tag1":"val1","tag2":"val2"}},{"metric":"cpu","timestamp":` + ts + `,"value":20,"tags":{"tag1":"val3","tag2":"val4"}}]`)
host := nodes[0].node.OpenTSDBServer.Addr().String()
url := "http://" + host + "/api/put"
resp, err := http.Post(url, "text/json", data)
if err != nil {
t.Fatal(err)
return
}
resp.Body.Close()
expts := now.Format(time.RFC3339Nano)
expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["%s",10],["%s",20]]}]}]}`, expts, expts)
// query and wait for results
got, ok, _ := queryAndWait(t, nodes, "opentsdb", `select * from "opentsdb"."raw".cpu`, expected, "", openTSDBTestTimeout)
if !ok {
t.Errorf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got)
}
}
func TestSeparateBrokerDataNode(t *testing.T) {
t.Parallel()
testName := "TestSeparateBrokerDataNode"

View File

@ -2,8 +2,13 @@ package opentsdb
import (
"bufio"
"bytes"
"compress/gzip"
"encoding/json"
"io"
"log"
"net"
"net/http"
"net/textproto"
"strconv"
"strings"
@ -17,7 +22,7 @@ const (
// DefaultPort represents the default OpenTSDB port.
DefaultPort = 4242
// DefaultDatabaseName is the default OpenTSDB database if none is specified
// DefaultDatabaseName is the default OpenTSDB database if none is specified.
DefaultDatabaseName = "opentsdb"
)
@ -26,9 +31,7 @@ type SeriesWriter interface {
WriteSeries(database, retentionPolicy string, points []influxdb.Point) (uint64, error)
}
// An InfluxDB input class to accept OpenTSDB's telnet protocol
// Each telnet command consists of a line of the form:
// put sys.cpu.user 1356998400 42.5 host=webserver01 cpu=0
// Server is an InfluxDB input class to implement OpenTSDB's input protocols.
type Server struct {
writer SeriesWriter
@ -36,6 +39,7 @@ type Server struct {
retentionpolicy string
listener *net.TCPListener
tsdbhttp *tsdbHTTPListener
wg sync.WaitGroup
addr net.Addr
@ -48,6 +52,7 @@ func NewServer(w SeriesWriter, retpol string, db string) *Server {
s.writer = w
s.retentionpolicy = retpol
s.database = db
s.tsdbhttp = makeTSDBHTTPListener()
return s
}
@ -58,10 +63,12 @@ func (s *Server) Addr() net.Addr {
return s.addr
}
// ListenAndServe start the OpenTSDB compatible server on the given
// ip and port.
func (s *Server) ListenAndServe(listenAddress string) {
var err error
addr, err := net.ResolveTCPAddr("tcp4", listenAddress)
addr, err := net.ResolveTCPAddr("tcp", listenAddress)
if err != nil {
log.Println("TSDBServer: ResolveTCPAddr: ", err)
return
@ -78,6 +85,18 @@ func (s *Server) ListenAndServe(listenAddress string) {
s.mu.Unlock()
s.wg.Add(1)
// Set up the background HTTP server that we
// will pass http request to via a channel
mux := http.NewServeMux()
mux.HandleFunc("/api/metadata/put", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
})
mux.Handle("/api/put", s)
httpsrv := &http.Server{}
httpsrv.Handler = mux
go httpsrv.Serve(s.tsdbhttp)
go func() {
defer s.wg.Done()
for {
@ -106,7 +125,37 @@ func (s *Server) Close() error {
return err
}
// HandleConnection takes each new connection and attempts to
// determine if it should be handled by the HTTP handler, if
// parsing as a HTTP request fails, we'll pass it to the
// telnet handler
func (s *Server) HandleConnection(conn net.Conn) {
var peekbuf bytes.Buffer
t := io.TeeReader(conn, &peekbuf)
r := bufio.NewReader(t)
_, httperr := http.ReadRequest(r)
splice := io.MultiReader(&peekbuf, conn)
bufsplice := bufio.NewReader(splice)
newc := &tsdbConn{
Conn: conn,
rdr: bufsplice,
}
if httperr == nil {
s.tsdbhttp.acc <- tsdbHTTPReq{
conn: newc,
}
} else {
s.HandleTelnet(newc)
}
}
// HandleTelnet accepts OpenTSDB's telnet protocol
// Each telnet command consists of a line of the form:
// put sys.cpu.user 1356998400 42.5 host=webserver01 cpu=0
func (s *Server) HandleTelnet(conn net.Conn) {
reader := bufio.NewReader(conn)
tp := textproto.NewReader(reader)
@ -188,3 +237,168 @@ func (s *Server) HandleConnection(conn net.Conn) {
}
}
}
/*
tsdbDP is a struct to unmarshal OpenTSDB /api/put data into
Request is either tsdbDP, or a []tsdbDP
{
"metric": "sys.cpu.nice",
"timestamp": 1346846400,
"value": 18,
"tags": {
"host": "web01",
"dc": "lga"
}
}
*/
type tsdbDP struct {
Metric string `json:"metric"`
Timestamp int64 `json:"timestamp"`
Value float64 `json:"value"`
Tags map[string]string `json:"tags,omitempty"`
}
// ServeHTTP implements OpenTSDB's HTTP /api/put endpoint
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
defer s.wg.Done()
if r.Method != "POST" {
http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
return
}
dps := make([]tsdbDP, 1)
var br *bufio.Reader
// We need to peek and see if this is an array or a single
// DP
multi := false
if r.Header.Get("Content-Encoding") == "gzip" {
zr, err := gzip.NewReader(r.Body)
if err != nil {
http.Error(w, "Could not read gzip, "+err.Error(), http.StatusBadRequest)
return
}
br = bufio.NewReader(zr)
} else {
br = bufio.NewReader(r.Body)
}
f, err := br.Peek(1)
if err != nil || len(f) != 1 {
http.Error(w, "Could not peek at JSON data, "+err.Error(), http.StatusBadRequest)
return
}
switch f[0] {
case '{':
case '[':
multi = true
default:
http.Error(w, "Expected JSON array or hash", http.StatusBadRequest)
return
}
dec := json.NewDecoder(br)
if multi {
err = dec.Decode(&dps)
} else {
err = dec.Decode(&dps[0])
}
if err != nil {
http.Error(w, "Could not decode JSON as TSDB data", http.StatusBadRequest)
return
}
var idps []influxdb.Point
for dpi := range dps {
dp := dps[dpi]
var ts time.Time
if dp.Timestamp < 10000000000 {
ts = time.Unix(dp.Timestamp, 0)
} else {
ts = time.Unix(dp.Timestamp/1000, (dp.Timestamp%1000)*1000)
}
fields := make(map[string]interface{})
fields["value"] = dp.Value
if err != nil {
continue
}
p := influxdb.Point{
Name: dp.Metric,
Tags: dp.Tags,
Timestamp: ts,
Fields: fields,
}
idps = append(idps, p)
}
_, err = s.writer.WriteSeries(s.database, s.retentionpolicy, idps)
if err != nil {
log.Println("TSDB cannot write data: ", err)
}
w.WriteHeader(http.StatusNoContent)
}
// tsdbHTTPListener is a listener that takes connects from a channel
// rather than directly from a network socket
type tsdbHTTPListener struct {
addr net.Addr
cls chan struct{}
acc chan tsdbHTTPReq
}
// tsdbHTTPReq represents a incoming connection that we have established
// to be a valid http request.
type tsdbHTTPReq struct {
conn net.Conn
err error
}
func (l *tsdbHTTPListener) Accept() (c net.Conn, err error) {
select {
case newc := <-l.acc:
log.Println("TSDB listener accept ", newc)
return newc.conn, newc.err
case <-l.cls:
close(l.cls)
close(l.acc)
return nil, nil
}
}
func (l *tsdbHTTPListener) Close() error {
l.cls <- struct{}{}
return nil
}
func (l *tsdbHTTPListener) Addr() net.Addr {
return l.addr
}
func makeTSDBHTTPListener() *tsdbHTTPListener {
return &tsdbHTTPListener{
acc: make(chan tsdbHTTPReq),
cls: make(chan struct{}),
}
}
// tsdbConn is a net.Conn implmentation used to splice peeked buffer content
// to the pre-existing net.Conn that was peeked into
type tsdbConn struct {
rdr *bufio.Reader
net.Conn
}
// Read implmeents the io.Reader interface
func (c *tsdbConn) Read(b []byte) (n int, err error) {
return c.rdr.Read(b)
}