Merge pull request #4062 from influxdb/opentsdb_bad_lines

Enhance openTSDB logging and stats
pull/4066/head
Philip O'Toole 2015-09-09 13:36:46 -07:00
commit a57bd2a227
1 changed files with 21 additions and 5 deletions

View File

@ -32,6 +32,10 @@ const (
statTelnetPointsReceived = "tl_points_rx"
statTelnetBytesReceived = "tl_bytes_rx"
statTelnetReadError = "tl_read_err"
statTelnetBadLine = "tl_bad_line"
statTelnetBadTime = "tl_bad_time"
statTelnetBadTag = "tl_bad_tag"
statTelnetBadFloat = "tl_bad_float"
statBatchesTrasmitted = "batches_tx"
statPointsTransmitted = "points_tx"
statBatchesTransmitFail = "batches_tx_fail"
@ -207,6 +211,10 @@ func (s *Service) serve() {
// handleConn processes conn. This is run in a separate goroutine.
func (s *Service) handleConn(conn net.Conn) {
defer s.statMap.Add(statConnectionsActive, -1)
s.statMap.Add(statConnectionsActive, 1)
s.statMap.Add(statConnectionsHandled, 1)
// Read header into buffer to check if it's HTTP.
var buf bytes.Buffer
r := bufio.NewReader(io.TeeReader(conn, &buf))
@ -240,6 +248,9 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
s.statMap.Add(statTelnetConnectionsActive, 1)
s.statMap.Add(statTelnetConnectionsHandled, 1)
// Get connection details.
remoteAddr := conn.RemoteAddr().String()
// Wrap connection in a text protocol reader.
r := textproto.NewReader(bufio.NewReader(conn))
for {
@ -262,7 +273,8 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
}
if len(inputStrs) < 4 || inputStrs[0] != "put" {
s.Logger.Println("TSDBServer: malformed line, skipping: ", line)
s.statMap.Add(statTelnetBadLine, 1)
s.Logger.Printf("malformed line '%s' from %s", line, remoteAddr)
continue
}
@ -274,7 +286,8 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
var t time.Time
ts, err := strconv.ParseInt(tsStr, 10, 64)
if err != nil {
s.Logger.Println("TSDBServer: malformed time, skipping: ", tsStr)
s.statMap.Add(statTelnetBadTime, 1)
s.Logger.Printf("malformed time '%s' from %s", tsStr, remoteAddr)
}
switch len(tsStr) {
@ -285,7 +298,8 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
t = time.Unix(ts/1000, (ts%1000)*1000)
break
default:
s.Logger.Println("TSDBServer: time must be 10 or 13 chars, skipping: ", tsStr)
s.statMap.Add(statTelnetBadTime, 1)
s.Logger.Printf("bad time '%s' must be 10 or 13 chars, from %s ", tsStr, remoteAddr)
continue
}
@ -293,7 +307,8 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
for t := range tagStrs {
parts := strings.SplitN(tagStrs[t], "=", 2)
if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
s.Logger.Println("TSDBServer: malformed tag data", tagStrs[t])
s.statMap.Add(statTelnetBadTag, 1)
s.Logger.Printf("malformed tag data '%v' from %s", tagStrs[t], remoteAddr)
continue
}
k := parts[0]
@ -304,7 +319,8 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
fields := make(map[string]interface{})
fields["value"], err = strconv.ParseFloat(valueStr, 64)
if err != nil {
s.Logger.Println("TSDBServer: could not parse value as float: ", valueStr)
s.statMap.Add(statTelnetBadFloat, 1)
s.Logger.Printf("bad float '%s' from %s", valueStr, remoteAddr)
continue
}