diff --git a/services/opentsdb/service.go b/services/opentsdb/service.go index 5301a4e864..dba6800a5f 100644 --- a/services/opentsdb/service.go +++ b/services/opentsdb/service.go @@ -32,6 +32,10 @@ const ( statTelnetPointsReceived = "tl_points_rx" statTelnetBytesReceived = "tl_bytes_rx" statTelnetReadError = "tl_read_err" + statTelnetBadLine = "tl_bad_line" + statTelnetBadTime = "tl_bad_time" + statTelnetBadTag = "tl_bad_tag" + statTelnetBadFloat = "tl_bad_float" statBatchesTrasmitted = "batches_tx" statPointsTransmitted = "points_tx" statBatchesTransmitFail = "batches_tx_fail" @@ -207,6 +211,10 @@ func (s *Service) serve() { // handleConn processes conn. This is run in a separate goroutine. func (s *Service) handleConn(conn net.Conn) { + defer s.statMap.Add(statConnectionsActive, -1) + s.statMap.Add(statConnectionsActive, 1) + s.statMap.Add(statConnectionsHandled, 1) + // Read header into buffer to check if it's HTTP. var buf bytes.Buffer r := bufio.NewReader(io.TeeReader(conn, &buf)) @@ -240,6 +248,9 @@ func (s *Service) handleTelnetConn(conn net.Conn) { s.statMap.Add(statTelnetConnectionsActive, 1) s.statMap.Add(statTelnetConnectionsHandled, 1) + // Get connection details. + remoteAddr := conn.RemoteAddr().String() + // Wrap connection in a text protocol reader. r := textproto.NewReader(bufio.NewReader(conn)) for { @@ -262,7 +273,8 @@ func (s *Service) handleTelnetConn(conn net.Conn) { } if len(inputStrs) < 4 || inputStrs[0] != "put" { - s.Logger.Println("TSDBServer: malformed line, skipping: ", line) + s.statMap.Add(statTelnetBadLine, 1) + s.Logger.Printf("malformed line '%s' from %s", line, remoteAddr) continue } @@ -274,7 +286,8 @@ func (s *Service) handleTelnetConn(conn net.Conn) { var t time.Time ts, err := strconv.ParseInt(tsStr, 10, 64) if err != nil { - s.Logger.Println("TSDBServer: malformed time, skipping: ", tsStr) + s.statMap.Add(statTelnetBadTime, 1) + s.Logger.Printf("malformed time '%s' from %s", tsStr, remoteAddr) } switch len(tsStr) { @@ -285,7 +298,8 @@ func (s *Service) handleTelnetConn(conn net.Conn) { t = time.Unix(ts/1000, (ts%1000)*1000) break default: - s.Logger.Println("TSDBServer: time must be 10 or 13 chars, skipping: ", tsStr) + s.statMap.Add(statTelnetBadTime, 1) + s.Logger.Printf("bad time '%s' must be 10 or 13 chars, from %s ", tsStr, remoteAddr) continue } @@ -293,7 +307,8 @@ func (s *Service) handleTelnetConn(conn net.Conn) { for t := range tagStrs { parts := strings.SplitN(tagStrs[t], "=", 2) if len(parts) != 2 || parts[0] == "" || parts[1] == "" { - s.Logger.Println("TSDBServer: malformed tag data", tagStrs[t]) + s.statMap.Add(statTelnetBadTag, 1) + s.Logger.Printf("malformed tag data '%v' from %s", tagStrs[t], remoteAddr) continue } k := parts[0] @@ -304,7 +319,8 @@ func (s *Service) handleTelnetConn(conn net.Conn) { fields := make(map[string]interface{}) fields["value"], err = strconv.ParseFloat(valueStr, 64) if err != nil { - s.Logger.Println("TSDBServer: could not parse value as float: ", valueStr) + s.statMap.Add(statTelnetBadFloat, 1) + s.Logger.Printf("bad float '%s' from %s", valueStr, remoteAddr) continue }