Refactoring the monitor service to avoid expvar
Truncate the time interval output of the monitor service so that it falls on even time intervals, rather than on every minute counted from the start time. This normalizes the output from the monitor service. (pull/6964/head)
parent
df289a687d
commit
837a9804cf
cmd/influxd/run
coordinator
influxql
models
services
collectd
continuous_querier
graphite
opentsdb
subscriber
udp
|
@ -31,6 +31,7 @@ With this release the systemd configuration files for InfluxDB will use the syst
|
|||
- [#6889](https://github.com/influxdata/influxdb/pull/6889): Update help and remove unused config options from the configuration file.
|
||||
- [#6900](https://github.com/influxdata/influxdb/pull/6900): Trim BOM from Windows Notepad-saved config files.
|
||||
- [#6938](https://github.com/influxdata/influxdb/issues/6938): Added favicon
|
||||
- [#6507](https://github.com/influxdata/influxdb/issues/6507): Refactor monitor service to avoid expvar and write monitor statistics on a truncated time interval.
|
||||
|
||||
### Bugfixes
|
||||
|
||||
|
|
|
@ -147,8 +147,6 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
|
|||
|
||||
MetaClient: meta.NewClient(c.Meta),
|
||||
|
||||
Monitor: monitor.New(c.Monitor),
|
||||
|
||||
reportingDisabled: c.ReportingDisabled,
|
||||
|
||||
httpAPIAddr: c.HTTPD.BindAddress,
|
||||
|
@ -158,6 +156,7 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
|
|||
config: c,
|
||||
logOutput: os.Stderr,
|
||||
}
|
||||
s.Monitor = monitor.New(s, c.Monitor)
|
||||
|
||||
if err := s.MetaClient.Open(); err != nil {
|
||||
return nil, err
|
||||
|
@ -203,6 +202,20 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
|
|||
return s, nil
|
||||
}
|
||||
|
||||
func (s *Server) Statistics(tags map[string]string) []models.Statistic {
|
||||
var statistics []models.Statistic
|
||||
statistics = append(statistics, s.QueryExecutor.Statistics(tags)...)
|
||||
statistics = append(statistics, s.TSDBStore.Statistics(tags)...)
|
||||
statistics = append(statistics, s.PointsWriter.Statistics(tags)...)
|
||||
statistics = append(statistics, s.Subscriber.Statistics(tags)...)
|
||||
for _, srv := range s.Services {
|
||||
if m, ok := srv.(monitor.Reporter); ok {
|
||||
statistics = append(statistics, m.Statistics(tags)...)
|
||||
}
|
||||
}
|
||||
return statistics
|
||||
}
|
||||
|
||||
func (s *Server) appendSnapshotterService() {
|
||||
srv := snapshotter.NewService()
|
||||
srv.TSDBStore = s.TSDBStore
|
||||
|
@ -250,6 +263,7 @@ func (s *Server) appendHTTPDService(c httpd.Config) {
|
|||
srv.Handler.QueryAuthorizer = meta.NewQueryAuthorizer(s.MetaClient)
|
||||
srv.Handler.WriteAuthorizer = meta.NewWriteAuthorizer(s.MetaClient)
|
||||
srv.Handler.QueryExecutor = s.QueryExecutor
|
||||
srv.Handler.Monitor = s.Monitor
|
||||
srv.Handler.PointsWriter = s.PointsWriter
|
||||
srv.Handler.Version = s.buildInfo.Version
|
||||
|
||||
|
|
|
@ -2,11 +2,11 @@ package coordinator
|
|||
|
||||
import (
|
||||
"errors"
|
||||
"expvar"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
|
@ -70,7 +70,7 @@ type PointsWriter struct {
|
|||
}
|
||||
subPoints chan<- *WritePointsRequest
|
||||
|
||||
statMap *expvar.Map
|
||||
stats *WriteStatistics
|
||||
}
|
||||
|
||||
// WritePointsRequest represents a request to write point data to the cluster
|
||||
|
@ -97,7 +97,7 @@ func NewPointsWriter() *PointsWriter {
|
|||
closing: make(chan struct{}),
|
||||
WriteTimeout: DefaultWriteTimeout,
|
||||
Logger: log.New(os.Stderr, "[write] ", log.LstdFlags),
|
||||
statMap: influxdb.NewStatistics("write", "write", nil),
|
||||
stats: &WriteStatistics{},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -159,6 +159,38 @@ func (w *PointsWriter) SetLogOutput(lw io.Writer) {
|
|||
w.Logger = log.New(lw, "[write] ", log.LstdFlags)
|
||||
}
|
||||
|
||||
// WriteStatistics keeps statistics related to the PointsWriter.
//
// Every field is an int64 counter that the PointsWriter updates and reads
// through sync/atomic, so the fields must stay 64-bit aligned.
type WriteStatistics struct {
	WriteReq           int64 // incremented once per WritePoints call
	PointWriteReq      int64 // number of points passed to WritePoints
	PointWriteReqLocal int64 // number of points passed to writeToShard
	WriteOK            int64 // shard writes that completed without error
	WriteDropped       int64 // points skipped by MapShards because no shard group matched
	WriteTimeout       int64 // WritePoints calls that hit the write timeout
	WriteErr           int64 // shard writes (or shard creations) that returned an error
	SubWriteOK         int64 // incremented by WritePoints on the subscriber "ok" path
	SubWriteDrop       int64 // incremented by WritePoints on the subscriber "not ok" path
}
|
||||
|
||||
// Statistics returns statistics for periodic monitoring.
|
||||
func (w *PointsWriter) Statistics(tags map[string]string) []models.Statistic {
|
||||
return []models.Statistic{{
|
||||
Name: "write",
|
||||
Tags: tags,
|
||||
Values: map[string]interface{}{
|
||||
statWriteReq: atomic.LoadInt64(&w.stats.WriteReq),
|
||||
statPointWriteReq: atomic.LoadInt64(&w.stats.PointWriteReq),
|
||||
statPointWriteReqLocal: atomic.LoadInt64(&w.stats.PointWriteReqLocal),
|
||||
statWriteOK: atomic.LoadInt64(&w.stats.WriteOK),
|
||||
statWriteDrop: atomic.LoadInt64(&w.stats.WriteDropped),
|
||||
statWriteTimeout: atomic.LoadInt64(&w.stats.WriteTimeout),
|
||||
statWriteErr: atomic.LoadInt64(&w.stats.WriteErr),
|
||||
statSubWriteOK: atomic.LoadInt64(&w.stats.SubWriteOK),
|
||||
statSubWriteDrop: atomic.LoadInt64(&w.stats.SubWriteDrop),
|
||||
},
|
||||
}}
|
||||
}
|
||||
|
||||
// MapShards maps the points contained in wp to a ShardMapping. If a point
|
||||
// maps to a shard group or shard that does not currently exist, it will be
|
||||
// created before returning the mapping.
|
||||
|
@ -205,7 +237,7 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)
|
|||
for _, p := range wp.Points {
|
||||
sg, ok := timeRanges[p.Time().Truncate(rp.ShardGroupDuration)]
|
||||
if !ok {
|
||||
w.statMap.Add(statWriteDrop, 1)
|
||||
atomic.AddInt64(&w.stats.WriteDropped, 1)
|
||||
continue
|
||||
}
|
||||
sh := sg.ShardFor(p.HashID())
|
||||
|
@ -222,8 +254,8 @@ func (w *PointsWriter) WritePointsInto(p *IntoWriteRequest) error {
|
|||
|
||||
// WritePoints writes across multiple local and remote data nodes according to the consistency level.
|
||||
func (w *PointsWriter) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {
|
||||
w.statMap.Add(statWriteReq, 1)
|
||||
w.statMap.Add(statPointWriteReq, int64(len(points)))
|
||||
atomic.AddInt64(&w.stats.WriteReq, 1)
|
||||
atomic.AddInt64(&w.stats.PointWriteReq, int64(len(points)))
|
||||
|
||||
if retentionPolicy == "" {
|
||||
db := w.MetaClient.Database(database)
|
||||
|
@ -258,9 +290,9 @@ func (w *PointsWriter) WritePoints(database, retentionPolicy string, consistency
|
|||
}
|
||||
w.mu.RUnlock()
|
||||
if ok {
|
||||
w.statMap.Add(statSubWriteOK, 1)
|
||||
atomic.AddInt64(&w.stats.SubWriteOK, 1)
|
||||
} else {
|
||||
w.statMap.Add(statSubWriteDrop, 1)
|
||||
atomic.AddInt64(&w.stats.SubWriteDrop, 1)
|
||||
}
|
||||
|
||||
timeout := time.NewTimer(w.WriteTimeout)
|
||||
|
@ -270,7 +302,7 @@ func (w *PointsWriter) WritePoints(database, retentionPolicy string, consistency
|
|||
case <-w.closing:
|
||||
return ErrWriteFailed
|
||||
case <-timeout.C:
|
||||
w.statMap.Add(statWriteTimeout, 1)
|
||||
atomic.AddInt64(&w.stats.WriteTimeout, 1)
|
||||
// return timeout error to caller
|
||||
return ErrTimeout
|
||||
case err := <-ch:
|
||||
|
@ -284,11 +316,11 @@ func (w *PointsWriter) WritePoints(database, retentionPolicy string, consistency
|
|||
|
||||
// writeToShard writes points to a shard.
|
||||
func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) error {
|
||||
w.statMap.Add(statPointWriteReqLocal, int64(len(points)))
|
||||
atomic.AddInt64(&w.stats.PointWriteReqLocal, int64(len(points)))
|
||||
|
||||
err := w.TSDBStore.WriteToShard(shard.ID, points)
|
||||
if err == nil {
|
||||
w.statMap.Add(statWriteOK, 1)
|
||||
atomic.AddInt64(&w.stats.WriteOK, 1)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -298,17 +330,18 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo
|
|||
err = w.TSDBStore.CreateShard(database, retentionPolicy, shard.ID, true)
|
||||
if err != nil {
|
||||
w.Logger.Printf("write failed for shard %d: %v", shard.ID, err)
|
||||
w.statMap.Add(statWriteErr, 1)
|
||||
|
||||
atomic.AddInt64(&w.stats.WriteErr, 1)
|
||||
return err
|
||||
}
|
||||
}
|
||||
err = w.TSDBStore.WriteToShard(shard.ID, points)
|
||||
if err != nil {
|
||||
w.Logger.Printf("write failed for shard %d: %v", shard.ID, err)
|
||||
w.statMap.Add(statWriteErr, 1)
|
||||
atomic.AddInt64(&w.stats.WriteErr, 1)
|
||||
return err
|
||||
}
|
||||
|
||||
w.statMap.Add(statWriteOK, 1)
|
||||
atomic.AddInt64(&w.stats.WriteOK, 1)
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -996,20 +996,13 @@ func (s *SelectStatement) TimeFieldName() string {
|
|||
|
||||
// Clone returns a deep copy of the statement.
|
||||
func (s *SelectStatement) Clone() *SelectStatement {
|
||||
clone := &SelectStatement{
|
||||
Fields: make(Fields, 0, len(s.Fields)),
|
||||
Dimensions: make(Dimensions, 0, len(s.Dimensions)),
|
||||
Sources: cloneSources(s.Sources),
|
||||
SortFields: make(SortFields, 0, len(s.SortFields)),
|
||||
Condition: CloneExpr(s.Condition),
|
||||
Limit: s.Limit,
|
||||
Offset: s.Offset,
|
||||
SLimit: s.SLimit,
|
||||
SOffset: s.SOffset,
|
||||
Fill: s.Fill,
|
||||
FillValue: s.FillValue,
|
||||
IsRawQuery: s.IsRawQuery,
|
||||
}
|
||||
clone := *s
|
||||
clone.Fields = make(Fields, 0, len(s.Fields))
|
||||
clone.Dimensions = make(Dimensions, 0, len(s.Dimensions))
|
||||
clone.Sources = cloneSources(s.Sources)
|
||||
clone.SortFields = make(SortFields, 0, len(s.SortFields))
|
||||
clone.Condition = CloneExpr(s.Condition)
|
||||
|
||||
if s.Target != nil {
|
||||
clone.Target = &Target{
|
||||
Measurement: &Measurement{
|
||||
|
@ -1029,7 +1022,7 @@ func (s *SelectStatement) Clone() *SelectStatement {
|
|||
for _, f := range s.SortFields {
|
||||
clone.SortFields = append(clone.SortFields, &SortField{Name: f.Name, Ascending: f.Ascending})
|
||||
}
|
||||
return clone
|
||||
return &clone
|
||||
}
|
||||
|
||||
func cloneSources(sources Sources) Sources {
|
||||
|
|
|
@ -2,16 +2,16 @@ package influxql
|
|||
|
||||
import (
|
||||
"errors"
|
||||
"expvar"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"runtime/debug"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -119,7 +119,7 @@ type QueryExecutor struct {
|
|||
Logger *log.Logger
|
||||
|
||||
// expvar-based stats.
|
||||
statMap *expvar.Map
|
||||
stats *QueryStatistics
|
||||
}
|
||||
|
||||
// NewQueryExecutor returns a new instance of QueryExecutor.
|
||||
|
@ -127,10 +127,28 @@ func NewQueryExecutor() *QueryExecutor {
|
|||
return &QueryExecutor{
|
||||
TaskManager: NewTaskManager(),
|
||||
Logger: log.New(ioutil.Discard, "[query] ", log.LstdFlags),
|
||||
statMap: influxdb.NewStatistics("queryExecutor", "queryExecutor", nil),
|
||||
stats: &QueryStatistics{},
|
||||
}
|
||||
}
|
||||
|
||||
// QueryStatistics keeps statistics related to the QueryExecutor.
//
// Both fields are int64 values that the QueryExecutor updates and reads
// through sync/atomic.
type QueryStatistics struct {
	ActiveQueries          int64 // incremented when executeQuery starts, decremented when it returns
	QueryExecutionDuration int64 // running total of executeQuery durations, in nanoseconds
}
|
||||
|
||||
// Statistics returns statistics for periodic monitoring.
|
||||
func (e *QueryExecutor) Statistics(tags map[string]string) []models.Statistic {
|
||||
return []models.Statistic{{
|
||||
Name: "queryExecutor",
|
||||
Tags: tags,
|
||||
Values: map[string]interface{}{
|
||||
statQueriesActive: atomic.LoadInt64(&e.stats.ActiveQueries),
|
||||
statQueryExecutionDuration: atomic.LoadInt64(&e.stats.QueryExecutionDuration),
|
||||
},
|
||||
}}
|
||||
}
|
||||
|
||||
// Close kills all running queries and prevents new queries from being attached.
|
||||
func (e *QueryExecutor) Close() error {
|
||||
return e.TaskManager.Close()
|
||||
|
@ -154,10 +172,10 @@ func (e *QueryExecutor) executeQuery(query *Query, opt ExecutionOptions, closing
|
|||
defer close(results)
|
||||
defer e.recover(query, results)
|
||||
|
||||
e.statMap.Add(statQueriesActive, 1)
|
||||
atomic.AddInt64(&e.stats.ActiveQueries, 1)
|
||||
defer func(start time.Time) {
|
||||
e.statMap.Add(statQueriesActive, -1)
|
||||
e.statMap.Add(statQueryExecutionDuration, time.Since(start).Nanoseconds())
|
||||
atomic.AddInt64(&e.stats.ActiveQueries, -1)
|
||||
atomic.AddInt64(&e.stats.QueryExecutionDuration, time.Since(start).Nanoseconds())
|
||||
}(time.Now())
|
||||
|
||||
qid, task, err := e.TaskManager.AttachQuery(query, opt.Database, closing)
|
||||
|
|
45
influxvar.go
45
influxvar.go
|
@ -1,45 +0,0 @@
|
|||
package influxdb
|
||||
|
||||
import (
|
||||
"expvar"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// expvarMu serializes calls to NewStatistics so that the lookup and creation
// of the top-level expvar map for a key cannot race with each other.
var expvarMu sync.Mutex

// NewStatistics returns an expvar-based map with the given key. Within that map
// is another map. Within there "name" is the Measurement name, "tags" are the tags,
// and values are placed at the key "values".
//
// The returned map is the freshly created "values" map. Calling NewStatistics
// again with the same key reuses the published top-level map but replaces its
// "name", "tags" and "values" entries. It panics if key is already published
// as an expvar variable that is not an *expvar.Map.
func NewStatistics(key, name string, tags map[string]string) *expvar.Map {
	expvarMu.Lock()
	defer expvarMu.Unlock()

	// Add expvar for this service, reusing the map if it is already published
	// (expvar.NewMap panics on a duplicate name).
	published := expvar.Get(key)
	if published == nil {
		published = expvar.NewMap(key)
	}
	m := published.(*expvar.Map)

	// Set the name.
	nameVar := &expvar.String{}
	nameVar.Set(name)
	m.Set("name", nameVar)

	// Set the tags. Ranging over a nil tags map is a no-op, so nil is legal.
	tagsVar := &expvar.Map{}
	tagsVar.Init()
	for tagKey, tagValue := range tags {
		value := &expvar.String{}
		value.Set(tagValue)
		tagsVar.Set(tagKey, value)
	}
	m.Set("tags", tagsVar)

	// Create and set the values entry used for actual stats.
	statMap := &expvar.Map{}
	statMap.Init()
	m.Set("values", statMap)

	return statMap
}
|
|
@ -1390,6 +1390,20 @@ func (p *point) UnixNano() int64 {
|
|||
// values.
|
||||
type Tags map[string]string
|
||||
|
||||
// Merge merges the tags combining the two. If both define a tag with the
|
||||
// same key, the merged value overwrites the old value.
|
||||
// A new map is returned.
|
||||
func (t Tags) Merge(other map[string]string) Tags {
|
||||
merged := make(map[string]string, len(t)+len(other))
|
||||
for k, v := range t {
|
||||
merged[k] = v
|
||||
}
|
||||
for k, v := range other {
|
||||
merged[k] = v
|
||||
}
|
||||
return Tags(merged)
|
||||
}
|
||||
|
||||
// HashKey hashes all of a tag's keys.
|
||||
func (t Tags) HashKey() []byte {
|
||||
// Empty maps marshal to empty bytes.
|
||||
|
|
|
@ -0,0 +1,7 @@
|
|||
package models
|
||||
|
||||
type Statistic struct {
|
||||
Name string `json:"name"`
|
||||
Tags Tags `json:"tags"`
|
||||
Values map[string]interface{} `json:"values"`
|
||||
}
|
|
@ -39,35 +39,8 @@ A new module named `monitor` supports all basic statistics and diagnostic functi
|
|||
|
||||
## Registering statistics and diagnostics
|
||||
|
||||
To export statistical information with the `monitor` system, code simply calls `influxdb.NewStatistics()` and receives an `expvar.Map` instance in response. This object can then be used to store statistics.
|
||||
|
||||
For example, if you have a component called `Service`, you can register statistics like so:
|
||||
|
||||
```
|
||||
import (
|
||||
"expvar"
|
||||
"github.com/influxdata/influxdb"
|
||||
)
|
||||
.
|
||||
.
|
||||
.
|
||||
.
|
||||
type Service struct {
|
||||
....some other fields....
|
||||
statMap *expvar.Map /// Add a map of type *expvar.Map. Check GoDocs for how to use this.
|
||||
}
|
||||
|
||||
|
||||
func NewService() *Service {
|
||||
s = &NewService{}
|
||||
s.statMap = NewStatistics(key, name, tags)
|
||||
}
|
||||
```
|
||||
When calling `NewStatistics` `key` should be unique for the Service instance (if a network service, the protocol and binding port are good to include in the key). `name` will be the name of the Measurement used to store these statistics. Finally, when these statistics are written to the `monitor` database, all points will be tagged with `tags`. A value of nil for `tags` is legal.
|
||||
To export statistical information with the `monitor` system, a service should implement the `monitor.Reporter` interface. Services added to the Server will be automatically added to the list of statistics returned. Any service that is not added to the `Services` slice will need to modify the `Server`'s `Statistics(map[string]string)` method to aggregate the call to the service's `Statistics(map[string]string)` method so they are combined into a single response. The `Statistics(map[string]string)` method should return a statistics slice with the passed in tags included. The statistics should be kept inside of an internal structure and should be accessed in a thread-safe way. It is common to create a struct for holding the statistics and using `sync/atomic` instead of locking. If using `sync/atomic`, be sure to align the values in the struct so it works properly on `i386`.
|
||||
|
||||
To register diagnostic information, `monitor.RegisterDiagnosticsClient` is called, passing an `influxdb.monitor.DiagsClient` object to `monitor`. Implementing the `influxdb.monitor.DiagsClient` interface requires that your component have a function returning diagnostic information in a specific form, so that it can be displayed by the `monitor` system.
|
||||
|
||||
## expvar
|
||||
Statistical information is gathered by each package using [expvar](https://golang.org/pkg/expvar). Each package registers a map using its package name.
|
||||
|
||||
Due to the nature of `expvar`, statistical information is reset to its initial state when a server is restarted.
|
||||
Statistical information is reset to its initial state when a server is restarted.
|
||||
|
|
|
@ -0,0 +1,7 @@
|
|||
package monitor
|
||||
|
||||
import "github.com/influxdata/influxdb/models"
|
||||
|
||||
type Reporter interface {
|
||||
Statistics(tags map[string]string) []models.Statistic
|
||||
}
|
|
@ -1,6 +1,8 @@
|
|||
package monitor // import "github.com/influxdata/influxdb/monitor"
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"expvar"
|
||||
"fmt"
|
||||
"io"
|
||||
|
@ -33,9 +35,10 @@ type Monitor struct {
|
|||
|
||||
wg sync.WaitGroup
|
||||
|
||||
mu sync.Mutex
|
||||
mu sync.RWMutex
|
||||
globalTags map[string]string
|
||||
diagRegistrations map[string]diagnostics.Client
|
||||
reporter Reporter
|
||||
done chan struct{}
|
||||
storeCreated bool
|
||||
storeEnabled bool
|
||||
|
@ -64,10 +67,11 @@ type PointsWriter interface {
|
|||
}
|
||||
|
||||
// New returns a new instance of the monitor system.
|
||||
func New(c Config) *Monitor {
|
||||
func New(r Reporter, c Config) *Monitor {
|
||||
return &Monitor{
|
||||
globalTags: make(map[string]string),
|
||||
diagRegistrations: make(map[string]diagnostics.Client),
|
||||
reporter: r,
|
||||
storeEnabled: c.StoreEnabled,
|
||||
storeDatabase: c.StoreDatabase,
|
||||
storeInterval: time.Duration(c.StoreInterval),
|
||||
|
@ -206,8 +210,10 @@ func (m *Monitor) Statistics(tags map[string]string) ([]*Statistic, error) {
|
|||
}
|
||||
|
||||
statistic := &Statistic{
|
||||
Tags: make(map[string]string),
|
||||
Values: make(map[string]interface{}),
|
||||
Statistic: models.Statistic{
|
||||
Tags: make(map[string]string),
|
||||
Values: make(map[string]interface{}),
|
||||
},
|
||||
}
|
||||
|
||||
// Add any supplied tags.
|
||||
|
@ -272,9 +278,11 @@ func (m *Monitor) Statistics(tags map[string]string) ([]*Statistic, error) {
|
|||
|
||||
// Add Go memstats.
|
||||
statistic := &Statistic{
|
||||
Name: "runtime",
|
||||
Tags: make(map[string]string),
|
||||
Values: make(map[string]interface{}),
|
||||
Statistic: models.Statistic{
|
||||
Name: "runtime",
|
||||
Tags: make(map[string]string),
|
||||
Values: make(map[string]interface{}),
|
||||
},
|
||||
}
|
||||
|
||||
// Add any supplied tags to Go memstats
|
||||
|
@ -303,9 +311,22 @@ func (m *Monitor) Statistics(tags map[string]string) ([]*Statistic, error) {
|
|||
}
|
||||
statistics = append(statistics, statistic)
|
||||
|
||||
statistics = m.gatherStatistics(statistics, tags)
|
||||
sort.Sort(Statistics(statistics))
|
||||
|
||||
return statistics, nil
|
||||
}
|
||||
|
||||
func (m *Monitor) gatherStatistics(statistics []*Statistic, tags map[string]string) []*Statistic {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
for _, s := range m.reporter.Statistics(tags) {
|
||||
statistics = append(statistics, &Statistic{Statistic: s})
|
||||
}
|
||||
return statistics
|
||||
}
|
||||
|
||||
// Diagnostics fetches diagnostic information for each registered
|
||||
// diagnostic client. It skips any clients that return an error when
|
||||
// retrieving their diagnostics.
|
||||
|
@ -346,6 +367,21 @@ func (m *Monitor) createInternalStorage() {
|
|||
m.storeCreated = true
|
||||
}
|
||||
|
||||
// waitUntilInterval waits until we are on an even interval for the duration.
|
||||
func (m *Monitor) waitUntilInterval(d time.Duration) error {
|
||||
now := time.Now()
|
||||
until := now.Truncate(d).Add(d)
|
||||
timer := time.NewTimer(until.Sub(now))
|
||||
defer timer.Stop()
|
||||
|
||||
select {
|
||||
case <-timer.C:
|
||||
return nil
|
||||
case <-m.done:
|
||||
return errors.New("interrupted")
|
||||
}
|
||||
}
|
||||
|
||||
// storeStatistics writes the statistics to an InfluxDB system.
|
||||
func (m *Monitor) storeStatistics() {
|
||||
defer m.wg.Done()
|
||||
|
@ -355,35 +391,44 @@ func (m *Monitor) storeStatistics() {
|
|||
hostname, _ := os.Hostname()
|
||||
m.SetGlobalTag("hostname", hostname)
|
||||
|
||||
m.mu.Lock()
|
||||
tick := time.NewTicker(m.storeInterval)
|
||||
m.mu.Unlock()
|
||||
// Wait until an even interval to start recording monitor statistics.
|
||||
// If we are interrupted before the interval for some reason, exit early.
|
||||
if err := m.waitUntilInterval(m.storeInterval); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
tick := time.NewTicker(m.storeInterval)
|
||||
defer tick.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-tick.C:
|
||||
case now := <-tick.C:
|
||||
now = now.Truncate(m.storeInterval)
|
||||
func() {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
m.createInternalStorage()
|
||||
}()
|
||||
|
||||
stats, err := m.Statistics(m.globalTags)
|
||||
stats, err := m.Statistics(m.globalTags)
|
||||
if err != nil {
|
||||
m.Logger.Printf("failed to retrieve registered statistics: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
points := make(models.Points, 0, len(stats))
|
||||
for _, s := range stats {
|
||||
pt, err := models.NewPoint(s.Name, s.Tags, s.Values, now)
|
||||
if err != nil {
|
||||
m.Logger.Printf("failed to retrieve registered statistics: %s", err)
|
||||
m.Logger.Printf("Dropping point %v: %v", s.Name, err)
|
||||
return
|
||||
}
|
||||
points = append(points, pt)
|
||||
}
|
||||
|
||||
points := make(models.Points, 0, len(stats))
|
||||
for _, s := range stats {
|
||||
pt, err := models.NewPoint(s.Name, s.Tags, s.Values, time.Now().Truncate(time.Second))
|
||||
if err != nil {
|
||||
m.Logger.Printf("Dropping point %v: %v", s.Name, err)
|
||||
return
|
||||
}
|
||||
points = append(points, pt)
|
||||
}
|
||||
func() {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
if err := m.PointsWriter.WritePoints(m.storeDatabase, m.storeRetentionPolicy, points); err != nil {
|
||||
m.Logger.Printf("failed to store statistics: %s", err)
|
||||
|
@ -398,9 +443,7 @@ func (m *Monitor) storeStatistics() {
|
|||
|
||||
// Statistic represents the information returned by a single monitor client.
|
||||
type Statistic struct {
|
||||
Name string `json:"name"`
|
||||
Tags map[string]string `json:"tags"`
|
||||
Values map[string]interface{} `json:"values"`
|
||||
models.Statistic
|
||||
}
|
||||
|
||||
// ValueNames returns a sorted list of the value names, if any.
|
||||
|
@ -413,6 +456,17 @@ func (s *Statistic) ValueNames() []string {
|
|||
return a
|
||||
}
|
||||
|
||||
type Statistics []*Statistic
|
||||
|
||||
func (a Statistics) Len() int { return len(a) }
|
||||
func (a Statistics) Less(i, j int) bool {
|
||||
if a[i].Name != a[j].Name {
|
||||
return a[i].Name < a[j].Name
|
||||
}
|
||||
return bytes.Compare(a[i].Tags.HashKey(), a[j].Tags.HashKey()) < 0
|
||||
}
|
||||
func (a Statistics) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
|
||||
// DiagnosticsFromMap returns a Diagnostics from a map.
|
||||
func DiagnosticsFromMap(m map[string]interface{}) *diagnostics.Diagnostics {
|
||||
// Display columns in deterministic order.
|
||||
|
|
|
@ -1,17 +1,15 @@
|
|||
package collectd // import "github.com/influxdata/influxdb/services/collectd"
|
||||
|
||||
import (
|
||||
"expvar"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/services/meta"
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
|
@ -24,7 +22,7 @@ const (
|
|||
statBytesReceived = "bytesRx"
|
||||
statPointsParseFail = "pointsParseFail"
|
||||
statReadFail = "readFail"
|
||||
statBatchesTrasmitted = "batchesTx"
|
||||
statBatchesTransmitted = "batchesTx"
|
||||
statPointsTransmitted = "pointsTx"
|
||||
statBatchesTransmitFail = "batchesTxFail"
|
||||
statDroppedPointsInvalid = "droppedPointsInvalid"
|
||||
|
@ -57,7 +55,8 @@ type Service struct {
|
|||
addr net.Addr
|
||||
|
||||
// expvar-based stats.
|
||||
statMap *expvar.Map
|
||||
stats *Statistics
|
||||
statTags models.Tags
|
||||
}
|
||||
|
||||
// NewService returns a new instance of the collectd service.
|
||||
|
@ -66,8 +65,10 @@ func NewService(c Config) *Service {
|
|||
// Use defaults where necessary.
|
||||
Config: c.WithDefaults(),
|
||||
|
||||
Logger: log.New(os.Stderr, "[collectd] ", log.LstdFlags),
|
||||
err: make(chan error),
|
||||
Logger: log.New(os.Stderr, "[collectd] ", log.LstdFlags),
|
||||
err: make(chan error),
|
||||
stats: &Statistics{},
|
||||
statTags: map[string]string{"bind": c.BindAddress},
|
||||
}
|
||||
|
||||
return &s
|
||||
|
@ -77,12 +78,6 @@ func NewService(c Config) *Service {
|
|||
func (s *Service) Open() error {
|
||||
s.Logger.Printf("Starting collectd service")
|
||||
|
||||
// Configure expvar monitoring. It's OK to do this even if the service fails to open and
|
||||
// should be done before any data could arrive for the service.
|
||||
key := strings.Join([]string{"collectd", s.Config.BindAddress}, ":")
|
||||
tags := map[string]string{"bind": s.Config.BindAddress}
|
||||
s.statMap = influxdb.NewStatistics(key, "collectd", tags)
|
||||
|
||||
if s.Config.BindAddress == "" {
|
||||
return fmt.Errorf("bind address is blank")
|
||||
} else if s.Config.Database == "" {
|
||||
|
@ -172,6 +167,36 @@ func (s *Service) SetLogOutput(w io.Writer) {
|
|||
s.Logger = log.New(w, "[collectd] ", log.LstdFlags)
|
||||
}
|
||||
|
||||
// Statistics maintains statistics for the collectd service.
//
// Every field is an int64 counter that the service updates and reads through
// sync/atomic.
type Statistics struct {
	PointsReceived       int64 // points unmarshalled from a message and handed to the batcher
	BytesReceived        int64 // bytes read from the UDP connection
	PointsParseFail      int64 // messages that gollectd failed to parse
	ReadFail             int64 // failed reads from the UDP connection
	BatchesTransmitted   int64 // batches written successfully
	PointsTransmitted    int64 // points contained in successfully written batches
	BatchesTransmitFail  int64 // batches whose write returned an error
	InvalidDroppedPoints int64 // points dropped as invalid while unmarshalling a packet
}
|
||||
|
||||
// Statistics returns statistics for periodic monitoring.
|
||||
func (s *Service) Statistics(tags map[string]string) []models.Statistic {
|
||||
return []models.Statistic{{
|
||||
Name: "collectd",
|
||||
Tags: s.statTags.Merge(tags),
|
||||
Values: map[string]interface{}{
|
||||
statPointsReceived: atomic.LoadInt64(&s.stats.PointsReceived),
|
||||
statBytesReceived: atomic.LoadInt64(&s.stats.BytesReceived),
|
||||
statPointsParseFail: atomic.LoadInt64(&s.stats.PointsParseFail),
|
||||
statReadFail: atomic.LoadInt64(&s.stats.ReadFail),
|
||||
statBatchesTransmitted: atomic.LoadInt64(&s.stats.BatchesTransmitted),
|
||||
statPointsTransmitted: atomic.LoadInt64(&s.stats.PointsTransmitted),
|
||||
statBatchesTransmitFail: atomic.LoadInt64(&s.stats.BatchesTransmitFail),
|
||||
statDroppedPointsInvalid: atomic.LoadInt64(&s.stats.InvalidDroppedPoints),
|
||||
},
|
||||
}}
|
||||
}
|
||||
|
||||
// SetTypes sets collectd types db.
|
||||
func (s *Service) SetTypes(types string) (err error) {
|
||||
s.typesdb, err = gollectd.TypesDB([]byte(types))
|
||||
|
@ -210,12 +235,12 @@ func (s *Service) serve() {
|
|||
|
||||
n, _, err := s.conn.ReadFromUDP(buffer)
|
||||
if err != nil {
|
||||
s.statMap.Add(statReadFail, 1)
|
||||
atomic.AddInt64(&s.stats.ReadFail, 1)
|
||||
s.Logger.Printf("collectd ReadFromUDP error: %s", err)
|
||||
continue
|
||||
}
|
||||
if n > 0 {
|
||||
s.statMap.Add(statBytesReceived, int64(n))
|
||||
atomic.AddInt64(&s.stats.BytesReceived, int64(n))
|
||||
s.handleMessage(buffer[:n])
|
||||
}
|
||||
}
|
||||
|
@ -224,7 +249,7 @@ func (s *Service) serve() {
|
|||
func (s *Service) handleMessage(buffer []byte) {
|
||||
packets, err := gollectd.Packets(buffer, s.typesdb)
|
||||
if err != nil {
|
||||
s.statMap.Add(statPointsParseFail, 1)
|
||||
atomic.AddInt64(&s.stats.PointsParseFail, 1)
|
||||
s.Logger.Printf("Collectd parse error: %s", err)
|
||||
return
|
||||
}
|
||||
|
@ -233,7 +258,7 @@ func (s *Service) handleMessage(buffer []byte) {
|
|||
for _, p := range points {
|
||||
s.batcher.In() <- p
|
||||
}
|
||||
s.statMap.Add(statPointsReceived, int64(len(points)))
|
||||
atomic.AddInt64(&s.stats.PointsReceived, int64(len(points)))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -246,11 +271,11 @@ func (s *Service) writePoints() {
|
|||
return
|
||||
case batch := <-s.batcher.Out():
|
||||
if err := s.PointsWriter.WritePoints(s.Config.Database, s.Config.RetentionPolicy, models.ConsistencyLevelAny, batch); err == nil {
|
||||
s.statMap.Add(statBatchesTrasmitted, 1)
|
||||
s.statMap.Add(statPointsTransmitted, int64(len(batch)))
|
||||
atomic.AddInt64(&s.stats.BatchesTransmitted, 1)
|
||||
atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch)))
|
||||
} else {
|
||||
s.Logger.Printf("failed to write point batch to database %q: %s", s.Config.Database, err)
|
||||
s.statMap.Add(statBatchesTransmitFail, 1)
|
||||
atomic.AddInt64(&s.stats.BatchesTransmitFail, 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -296,7 +321,7 @@ func (s *Service) UnmarshalCollectd(packet *gollectd.Packet) []models.Point {
|
|||
// Drop invalid points
|
||||
if err != nil {
|
||||
s.Logger.Printf("Dropping point %v: %v", name, err)
|
||||
s.statMap.Add(statDroppedPointsInvalid, 1)
|
||||
atomic.AddInt64(&s.stats.InvalidDroppedPoints, 1)
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
|
@ -2,17 +2,17 @@ package continuous_querier // import "github.com/influxdata/influxdb/services/co
|
|||
|
||||
import (
|
||||
"errors"
|
||||
"expvar"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/influxql"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/services/meta"
|
||||
)
|
||||
|
||||
|
@ -74,7 +74,7 @@ type Service struct {
|
|||
RunCh chan *RunRequest
|
||||
Logger *log.Logger
|
||||
loggingEnabled bool
|
||||
statMap *expvar.Map
|
||||
stats *Statistics
|
||||
// lastRuns maps CQ name to last time it was run.
|
||||
mu sync.RWMutex
|
||||
lastRuns map[string]time.Time
|
||||
|
@ -89,8 +89,8 @@ func NewService(c Config) *Service {
|
|||
RunInterval: time.Duration(c.RunInterval),
|
||||
RunCh: make(chan *RunRequest),
|
||||
loggingEnabled: c.LogEnabled,
|
||||
statMap: influxdb.NewStatistics("cq", "cq", nil),
|
||||
Logger: log.New(os.Stderr, "[continuous_querier] ", log.LstdFlags),
|
||||
stats: &Statistics{},
|
||||
lastRuns: map[string]time.Time{},
|
||||
}
|
||||
|
||||
|
@ -133,6 +133,24 @@ func (s *Service) SetLogOutput(w io.Writer) {
|
|||
s.Logger = log.New(w, "[continuous_querier] ", log.LstdFlags)
|
||||
}
|
||||
|
||||
// Statistics maintains the statistics for the continuous query service.
//
// Both fields are int64 counters that the service updates and reads through
// sync/atomic.
type Statistics struct {
	QueryOK   int64 // continuous queries that executed without error
	QueryFail int64 // continuous queries whose execution returned an error
}
|
||||
|
||||
// Statistics returns statistics for periodic monitoring.
//
// It produces a single statistic named "cq" that carries the caller's tags
// unchanged and an atomically-read snapshot of the query counters.
|
||||
func (s *Service) Statistics(tags map[string]string) []models.Statistic {
|
||||
return []models.Statistic{{
|
||||
Name: "cq",
|
||||
// The service adds no tags of its own; the caller's map is passed through as-is.
Tags: tags,
|
||||
Values: map[string]interface{}{
|
||||
// Each counter is loaded individually, so the values are not a
// single consistent snapshot relative to one another.
statQueryOK: atomic.LoadInt64(&s.stats.QueryOK),
|
||||
statQueryFail: atomic.LoadInt64(&s.stats.QueryFail),
|
||||
},
|
||||
}}
|
||||
}
|
||||
|
||||
// Run runs the specified continuous query, or all CQs if none is specified.
|
||||
func (s *Service) Run(database, name string, t time.Time) error {
|
||||
var dbs []meta.DatabaseInfo
|
||||
|
@ -225,9 +243,9 @@ func (s *Service) runContinuousQueries(req *RunRequest) {
|
|||
}
|
||||
if err := s.ExecuteContinuousQuery(&db, &cq, req.Now); err != nil {
|
||||
s.Logger.Printf("error executing query: %s: err = %s", cq.Query, err)
|
||||
s.statMap.Add(statQueryFail, 1)
|
||||
atomic.AddInt64(&s.stats.QueryFail, 1)
|
||||
} else {
|
||||
s.statMap.Add(statQueryOK, 1)
|
||||
atomic.AddInt64(&s.stats.QueryOK, 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,7 +2,6 @@ package graphite // import "github.com/influxdata/influxdb/services/graphite"
|
|||
|
||||
import (
|
||||
"bufio"
|
||||
"expvar"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
|
@ -11,9 +10,9 @@ import (
|
|||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/monitor/diagnostics"
|
||||
"github.com/influxdata/influxdb/services/meta"
|
||||
|
@ -60,8 +59,10 @@ type Service struct {
|
|||
batcher *tsdb.PointBatcher
|
||||
parser *Parser
|
||||
|
||||
logger *log.Logger
|
||||
statMap *expvar.Map
|
||||
logger *log.Logger
|
||||
stats *Statistics
|
||||
statTags models.Tags
|
||||
|
||||
tcpConnectionsMu sync.Mutex
|
||||
tcpConnections map[string]*tcpConnection
|
||||
diagsKey string
|
||||
|
@ -104,6 +105,8 @@ func NewService(c Config) (*Service, error) {
|
|||
udpReadBuffer: d.UDPReadBuffer,
|
||||
batchTimeout: time.Duration(d.BatchTimeout),
|
||||
logger: log.New(os.Stderr, fmt.Sprintf("[graphite] %s ", d.BindAddress), log.LstdFlags),
|
||||
stats: &Statistics{},
|
||||
statTags: map[string]string{"proto": d.Protocol, "bind": d.BindAddress},
|
||||
tcpConnections: make(map[string]*tcpConnection),
|
||||
done: make(chan struct{}),
|
||||
diagsKey: strings.Join([]string{"graphite", d.Protocol, d.BindAddress}, ":"),
|
||||
|
@ -129,11 +132,6 @@ func (s *Service) Open() error {
|
|||
|
||||
s.logger.Printf("Starting graphite service, batch size %d, batch timeout %s", s.batchSize, s.batchTimeout)
|
||||
|
||||
// Configure expvar monitoring. It's OK to do this even if the service fails to open and
|
||||
// should be done before any data could arrive for the service.
|
||||
tags := map[string]string{"proto": s.protocol, "bind": s.bindAddress}
|
||||
s.statMap = influxdb.NewStatistics(s.diagsKey, "graphite", tags)
|
||||
|
||||
// Register diagnostics if a Monitor service is available.
|
||||
if s.Monitor != nil {
|
||||
s.Monitor.RegisterDiagnosticsClient(s.diagsKey, s)
|
||||
|
@ -219,6 +217,38 @@ func (s *Service) SetLogOutput(w io.Writer) {
|
|||
s.logger = log.New(w, "[graphite] ", log.LstdFlags)
|
||||
}
|
||||
|
||||
// Statistics maintains statistics for the graphite service.
// All fields are monotonically increasing counters except ActiveConnections,
// which is incremented and decremented around each TCP connection. Fields are
// updated and read through sync/atomic elsewhere in this file.
|
||||
type Statistics struct {
|
||||
// PointsReceived counts lines received over TCP or UDP.
PointsReceived int64
|
||||
// BytesReceived counts bytes read from TCP lines and UDP packets.
BytesReceived int64
|
||||
// PointsParseFail counts lines that could not be parsed.
PointsParseFail int64
|
||||
// PointsNaNFail counts lines dropped because their value was NaN.
PointsNaNFail int64
|
||||
// BatchesTransmitted counts batches written successfully.
BatchesTransmitted int64
|
||||
// PointsTransmitted counts points contained in successfully written batches.
PointsTransmitted int64
|
||||
// BatchesTransmitFail counts batches whose write returned an error.
BatchesTransmitFail int64
|
||||
// ActiveConnections is the number of TCP connections currently being handled.
ActiveConnections int64
|
||||
// HandledConnections counts TCP connections accepted since start.
HandledConnections int64
|
||||
}
|
||||
|
||||
// Statistics returns statistics for periodic monitoring.
//
// It produces a single statistic named "graphite" whose tags are the
// service's own statTags merged with the caller-supplied tags, and whose
// values are atomically-read copies of the service counters.
|
||||
func (s *Service) Statistics(tags map[string]string) []models.Statistic {
|
||||
return []models.Statistic{{
|
||||
Name: "graphite",
|
||||
// statTags is populated in NewService with the "proto" and "bind" keys.
// NOTE(review): which side wins on a key collision depends on
// models.Tags.Merge — confirm against its implementation.
Tags: s.statTags.Merge(tags),
|
||||
Values: map[string]interface{}{
|
||||
statPointsReceived: atomic.LoadInt64(&s.stats.PointsReceived),
|
||||
statBytesReceived: atomic.LoadInt64(&s.stats.BytesReceived),
|
||||
statPointsParseFail: atomic.LoadInt64(&s.stats.PointsParseFail),
|
||||
statPointsNaNFail: atomic.LoadInt64(&s.stats.PointsNaNFail),
|
||||
statBatchesTransmitted: atomic.LoadInt64(&s.stats.BatchesTransmitted),
|
||||
statPointsTransmitted: atomic.LoadInt64(&s.stats.PointsTransmitted),
|
||||
statBatchesTransmitFail: atomic.LoadInt64(&s.stats.BatchesTransmitFail),
|
||||
statConnectionsActive: atomic.LoadInt64(&s.stats.ActiveConnections),
|
||||
statConnectionsHandled: atomic.LoadInt64(&s.stats.HandledConnections),
|
||||
},
|
||||
}}
|
||||
}
|
||||
|
||||
// Addr returns the address the Service binds to.
|
||||
func (s *Service) Addr() net.Addr {
|
||||
return s.addr
|
||||
|
@ -257,10 +287,10 @@ func (s *Service) openTCPServer() (net.Addr, error) {
|
|||
func (s *Service) handleTCPConnection(conn net.Conn) {
|
||||
defer s.wg.Done()
|
||||
defer conn.Close()
|
||||
defer s.statMap.Add(statConnectionsActive, -1)
|
||||
defer atomic.AddInt64(&s.stats.ActiveConnections, -1)
|
||||
defer s.untrackConnection(conn)
|
||||
s.statMap.Add(statConnectionsActive, 1)
|
||||
s.statMap.Add(statConnectionsHandled, 1)
|
||||
atomic.AddInt64(&s.stats.ActiveConnections, 1)
|
||||
atomic.AddInt64(&s.stats.HandledConnections, 1)
|
||||
s.trackConnection(conn)
|
||||
|
||||
reader := bufio.NewReader(conn)
|
||||
|
@ -275,8 +305,8 @@ func (s *Service) handleTCPConnection(conn net.Conn) {
|
|||
// Trim the buffer, even though there should be no padding
|
||||
line := strings.TrimSpace(string(buf))
|
||||
|
||||
s.statMap.Add(statPointsReceived, 1)
|
||||
s.statMap.Add(statBytesReceived, int64(len(buf)))
|
||||
atomic.AddInt64(&s.stats.PointsReceived, 1)
|
||||
atomic.AddInt64(&s.stats.BytesReceived, int64(len(buf)))
|
||||
s.handleLine(line)
|
||||
}
|
||||
}
|
||||
|
@ -330,8 +360,8 @@ func (s *Service) openUDPServer() (net.Addr, error) {
|
|||
for _, line := range lines {
|
||||
s.handleLine(line)
|
||||
}
|
||||
s.statMap.Add(statPointsReceived, int64(len(lines)))
|
||||
s.statMap.Add(statBytesReceived, int64(n))
|
||||
atomic.AddInt64(&s.stats.PointsReceived, int64(len(lines)))
|
||||
atomic.AddInt64(&s.stats.BytesReceived, int64(n))
|
||||
}
|
||||
}()
|
||||
return s.udpConn.LocalAddr(), nil
|
||||
|
@ -349,12 +379,12 @@ func (s *Service) handleLine(line string) {
|
|||
case *UnsupportedValueError:
|
||||
// Graphite ignores NaN values with no error.
|
||||
if math.IsNaN(err.Value) {
|
||||
s.statMap.Add(statPointsNaNFail, 1)
|
||||
atomic.AddInt64(&s.stats.PointsNaNFail, 1)
|
||||
return
|
||||
}
|
||||
}
|
||||
s.logger.Printf("unable to parse line: %s: %s", line, err)
|
||||
s.statMap.Add(statPointsParseFail, 1)
|
||||
atomic.AddInt64(&s.stats.PointsParseFail, 1)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -368,11 +398,11 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) {
|
|||
select {
|
||||
case batch := <-batcher.Out():
|
||||
if err := s.PointsWriter.WritePoints(s.database, s.retentionPolicy, models.ConsistencyLevelAny, batch); err == nil {
|
||||
s.statMap.Add(statBatchesTransmitted, 1)
|
||||
s.statMap.Add(statPointsTransmitted, int64(len(batch)))
|
||||
atomic.AddInt64(&s.stats.BatchesTransmitted, 1)
|
||||
atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch)))
|
||||
} else {
|
||||
s.logger.Printf("failed to write point batch to database %q: %s", s.database, err)
|
||||
s.statMap.Add(statBatchesTransmitFail, 1)
|
||||
atomic.AddInt64(&s.stats.BatchesTransmitFail, 1)
|
||||
}
|
||||
|
||||
case <-s.done:
|
||||
|
|
|
@ -5,7 +5,6 @@ import (
|
|||
"compress/gzip"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"expvar"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
|
@ -13,8 +12,10 @@ import (
|
|||
"net/http/pprof"
|
||||
"os"
|
||||
"runtime/debug"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/bmizerany/pat"
|
||||
|
@ -22,6 +23,7 @@ import (
|
|||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/influxql"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/monitor"
|
||||
"github.com/influxdata/influxdb/services/continuous_querier"
|
||||
"github.com/influxdata/influxdb/services/meta"
|
||||
"github.com/influxdata/influxdb/uuid"
|
||||
|
@ -78,6 +80,10 @@ type Handler struct {
|
|||
|
||||
QueryExecutor *influxql.QueryExecutor
|
||||
|
||||
Monitor interface {
|
||||
Statistics(tags map[string]string) ([]*monitor.Statistic, error)
|
||||
}
|
||||
|
||||
PointsWriter interface {
|
||||
WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
|
||||
}
|
||||
|
@ -87,17 +93,17 @@ type Handler struct {
|
|||
Config *Config
|
||||
Logger *log.Logger
|
||||
CLFLogger *log.Logger
|
||||
statMap *expvar.Map
|
||||
stats *Statistics
|
||||
}
|
||||
|
||||
// NewHandler returns a new instance of handler with routes.
|
||||
func NewHandler(c Config, statMap *expvar.Map) *Handler {
|
||||
func NewHandler(c Config) *Handler {
|
||||
h := &Handler{
|
||||
mux: pat.New(),
|
||||
Config: &c,
|
||||
Logger: log.New(os.Stderr, "[httpd] ", log.LstdFlags),
|
||||
CLFLogger: log.New(os.Stderr, "[httpd] ", 0),
|
||||
statMap: statMap,
|
||||
stats: &Statistics{},
|
||||
}
|
||||
|
||||
h.AddRoutes([]Route{
|
||||
|
@ -147,6 +153,56 @@ func NewHandler(c Config, statMap *expvar.Map) *Handler {
|
|||
return h
|
||||
}
|
||||
|
||||
// Statistics maintains statistics for the httpd service.
//
// Fields are updated with atomic.AddInt64 by the request handlers and read
// with atomic.LoadInt64 by Handler.Statistics. The *Duration fields
// accumulate elapsed time in nanoseconds (time.Since(start).Nanoseconds()),
// and the Active* fields are gauges that are incremented on entry to a
// handler and decremented on exit; the remaining fields are counters.
|
||||
type Statistics struct {
|
||||
// Requests counts every request passed to ServeHTTP.
Requests int64
|
||||
CQRequests int64
|
||||
QueryRequests int64
|
||||
WriteRequests int64
|
||||
PingRequests int64
|
||||
StatusRequests int64
|
||||
// WriteRequestBytesReceived counts bytes of write request bodies read by serveWrite.
WriteRequestBytesReceived int64
|
||||
// QueryRequestBytesTransmitted counts bytes of query responses written by serveQuery.
QueryRequestBytesTransmitted int64
|
||||
PointsWrittenOK int64
|
||||
PointsWrittenFail int64
|
||||
// AuthenticationFailures counts requests rejected by the authenticate wrapper.
AuthenticationFailures int64
|
||||
RequestDuration int64
|
||||
QueryRequestDuration int64
|
||||
WriteRequestDuration int64
|
||||
ActiveRequests int64
|
||||
ActiveWriteRequests int64
|
||||
// ClientErrors counts responses written with a 4xx status via writeHeader.
ClientErrors int64
|
||||
// ServerErrors counts responses written with a 5xx status via writeHeader.
ServerErrors int64
|
||||
}
|
||||
|
||||
// Statistics returns statistics for periodic monitoring.
//
// It produces a single statistic named "httpd" carrying the supplied tags
// unchanged and an atomically-read copy of every handler counter, keyed by
// the package's stat* constants.
|
||||
func (h *Handler) Statistics(tags map[string]string) []models.Statistic {
|
||||
return []models.Statistic{{
|
||||
Name: "httpd",
|
||||
// The handler adds no tags itself; Service.Statistics supplies the
// "bind" tag before delegating here.
Tags: tags,
|
||||
Values: map[string]interface{}{
|
||||
statRequest: atomic.LoadInt64(&h.stats.Requests),
|
||||
statCQRequest: atomic.LoadInt64(&h.stats.CQRequests),
|
||||
statQueryRequest: atomic.LoadInt64(&h.stats.QueryRequests),
|
||||
statWriteRequest: atomic.LoadInt64(&h.stats.WriteRequests),
|
||||
statPingRequest: atomic.LoadInt64(&h.stats.PingRequests),
|
||||
statStatusRequest: atomic.LoadInt64(&h.stats.StatusRequests),
|
||||
statWriteRequestBytesReceived: atomic.LoadInt64(&h.stats.WriteRequestBytesReceived),
|
||||
statQueryRequestBytesTransmitted: atomic.LoadInt64(&h.stats.QueryRequestBytesTransmitted),
|
||||
statPointsWrittenOK: atomic.LoadInt64(&h.stats.PointsWrittenOK),
|
||||
statPointsWrittenFail: atomic.LoadInt64(&h.stats.PointsWrittenFail),
|
||||
statAuthFail: atomic.LoadInt64(&h.stats.AuthenticationFailures),
|
||||
// Durations are cumulative nanoseconds, not per-request averages.
statRequestDuration: atomic.LoadInt64(&h.stats.RequestDuration),
|
||||
statQueryRequestDuration: atomic.LoadInt64(&h.stats.QueryRequestDuration),
|
||||
statWriteRequestDuration: atomic.LoadInt64(&h.stats.WriteRequestDuration),
|
||||
statRequestsActive: atomic.LoadInt64(&h.stats.ActiveRequests),
|
||||
statWriteRequestsActive: atomic.LoadInt64(&h.stats.ActiveWriteRequests),
|
||||
statClientError: atomic.LoadInt64(&h.stats.ClientErrors),
|
||||
statServerError: atomic.LoadInt64(&h.stats.ServerErrors),
|
||||
},
|
||||
}}
|
||||
}
|
||||
|
||||
// AddRoutes sets the provided routes on the handler.
|
||||
func (h *Handler) AddRoutes(routes ...Route) {
|
||||
for _, r := range routes {
|
||||
|
@ -179,8 +235,9 @@ func (h *Handler) AddRoutes(routes ...Route) {
|
|||
|
||||
// ServeHTTP responds to HTTP request to the handler.
|
||||
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
h.statMap.Add(statRequest, 1)
|
||||
h.statMap.Add(statRequestsActive, 1)
|
||||
atomic.AddInt64(&h.stats.Requests, 1)
|
||||
atomic.AddInt64(&h.stats.ActiveRequests, 1)
|
||||
defer atomic.AddInt64(&h.stats.ActiveRequests, -1)
|
||||
start := time.Now()
|
||||
|
||||
// Add version header to all InfluxDB requests.
|
||||
|
@ -199,13 +256,12 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
pprof.Index(w, r)
|
||||
}
|
||||
} else if strings.HasPrefix(r.URL.Path, "/debug/vars") {
|
||||
serveExpvar(w, r)
|
||||
h.serveExpvar(w, r)
|
||||
} else {
|
||||
h.mux.ServeHTTP(w, r)
|
||||
}
|
||||
|
||||
h.statMap.Add(statRequestsActive, -1)
|
||||
h.statMap.Add(statRequestDuration, time.Since(start).Nanoseconds())
|
||||
atomic.AddInt64(&h.stats.RequestDuration, time.Since(start).Nanoseconds())
|
||||
}
|
||||
|
||||
// writeHeader writes the provided status code in the response, and
|
||||
|
@ -213,15 +269,15 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
func (h *Handler) writeHeader(w http.ResponseWriter, code int) {
|
||||
switch code / 100 {
|
||||
case 4:
|
||||
h.statMap.Add(statClientError, 1)
|
||||
atomic.AddInt64(&h.stats.ClientErrors, 1)
|
||||
case 5:
|
||||
h.statMap.Add(statServerError, 1)
|
||||
atomic.AddInt64(&h.stats.ServerErrors, 1)
|
||||
}
|
||||
w.WriteHeader(code)
|
||||
}
|
||||
|
||||
func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
|
||||
h.statMap.Add(statCQRequest, 1)
|
||||
atomic.AddInt64(&h.stats.CQRequests, 1)
|
||||
|
||||
// If the continuous query service isn't configured, return 404.
|
||||
if h.ContinuousQuerier == nil {
|
||||
|
@ -263,9 +319,9 @@ func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.R
|
|||
|
||||
// serveQuery parses an incoming query and, if valid, executes the query.
|
||||
func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
|
||||
h.statMap.Add(statQueryRequest, 1)
|
||||
atomic.AddInt64(&h.stats.QueryRequests, 1)
|
||||
defer func(start time.Time) {
|
||||
h.statMap.Add(statQueryRequestDuration, time.Since(start).Nanoseconds())
|
||||
atomic.AddInt64(&h.stats.QueryRequestDuration, time.Since(start).Nanoseconds())
|
||||
}(time.Now())
|
||||
|
||||
pretty := r.FormValue("pretty") == "true"
|
||||
|
@ -403,7 +459,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
|
|||
if !pretty {
|
||||
w.Write([]byte("\n"))
|
||||
}
|
||||
h.statMap.Add(statQueryRequestBytesTransmitted, int64(n))
|
||||
atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(n))
|
||||
w.(http.Flusher).Flush()
|
||||
continue
|
||||
}
|
||||
|
@ -458,17 +514,17 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
|
|||
// If it's not chunked we buffered everything in memory, so write it out
|
||||
if !chunked {
|
||||
n, _ := w.Write(MarshalJSON(resp, pretty))
|
||||
h.statMap.Add(statQueryRequestBytesTransmitted, int64(n))
|
||||
atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(n))
|
||||
}
|
||||
}
|
||||
|
||||
// serveWrite receives incoming series data in line protocol format and writes it to the database.
|
||||
func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
|
||||
h.statMap.Add(statWriteRequest, 1)
|
||||
h.statMap.Add(statWriteRequestsActive, 1)
|
||||
atomic.AddInt64(&h.stats.WriteRequests, 1)
|
||||
atomic.AddInt64(&h.stats.ActiveWriteRequests, 1)
|
||||
defer func(start time.Time) {
|
||||
h.statMap.Add(statWriteRequestsActive, -1)
|
||||
h.statMap.Add(statWriteRequestDuration, time.Since(start).Nanoseconds())
|
||||
atomic.AddInt64(&h.stats.ActiveWriteRequests, -1)
|
||||
atomic.AddInt64(&h.stats.WriteRequestDuration, time.Since(start).Nanoseconds())
|
||||
}(time.Now())
|
||||
|
||||
database := r.URL.Query().Get("db")
|
||||
|
@ -524,7 +580,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.
|
|||
h.resultError(w, influxql.Result{Err: err}, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
h.statMap.Add(statWriteRequestBytesReceived, int64(buf.Len()))
|
||||
atomic.AddInt64(&h.stats.WriteRequestBytesReceived, int64(buf.Len()))
|
||||
|
||||
if h.Config.WriteTracing {
|
||||
h.Logger.Printf("Write body received by handler: %s", buf.Bytes())
|
||||
|
@ -555,23 +611,23 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.
|
|||
|
||||
// Write points.
|
||||
if err := h.PointsWriter.WritePoints(database, r.URL.Query().Get("rp"), consistency, points); influxdb.IsClientError(err) {
|
||||
h.statMap.Add(statPointsWrittenFail, int64(len(points)))
|
||||
atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points)))
|
||||
h.resultError(w, influxql.Result{Err: err}, http.StatusBadRequest)
|
||||
return
|
||||
} else if err != nil {
|
||||
h.statMap.Add(statPointsWrittenFail, int64(len(points)))
|
||||
atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points)))
|
||||
h.resultError(w, influxql.Result{Err: err}, http.StatusInternalServerError)
|
||||
return
|
||||
} else if parseError != nil {
|
||||
// We wrote some of the points
|
||||
h.statMap.Add(statPointsWrittenOK, int64(len(points)))
|
||||
atomic.AddInt64(&h.stats.PointsWrittenOK, int64(len(points)))
|
||||
// The other points failed to parse which means the client sent invalid line protocol. We return a 400
|
||||
// response code as well as the lines that failed to parse.
|
||||
h.resultError(w, influxql.Result{Err: fmt.Errorf("partial write:\n%v", parseError)}, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
h.statMap.Add(statPointsWrittenOK, int64(len(points)))
|
||||
atomic.AddInt64(&h.stats.PointsWrittenOK, int64(len(points)))
|
||||
h.writeHeader(w, http.StatusNoContent)
|
||||
}
|
||||
|
||||
|
@ -582,14 +638,14 @@ func (h *Handler) serveOptions(w http.ResponseWriter, r *http.Request) {
|
|||
|
||||
// servePing returns a simple response to let the client know the server is running.
|
||||
func (h *Handler) servePing(w http.ResponseWriter, r *http.Request) {
|
||||
h.statMap.Add(statPingRequest, 1)
|
||||
atomic.AddInt64(&h.stats.PingRequests, 1)
|
||||
h.writeHeader(w, http.StatusNoContent)
|
||||
}
|
||||
|
||||
// serveStatus has been depricated
|
||||
// serveStatus has been deprecated
|
||||
func (h *Handler) serveStatus(w http.ResponseWriter, r *http.Request) {
|
||||
h.Logger.Printf("WARNING: /status has been depricated. Use /ping instead.")
|
||||
h.statMap.Add(statStatusRequest, 1)
|
||||
h.Logger.Printf("WARNING: /status has been deprecated. Use /ping instead.")
|
||||
atomic.AddInt64(&h.stats.StatusRequests, 1)
|
||||
h.writeHeader(w, http.StatusNoContent)
|
||||
}
|
||||
|
||||
|
@ -635,19 +691,71 @@ func MarshalJSON(v interface{}, pretty bool) []byte {
|
|||
return b
|
||||
}
|
||||
|
||||
// serveExpvar serves registered expvar information over HTTP.
|
||||
func serveExpvar(w http.ResponseWriter, r *http.Request) {
|
||||
// serveExpvar serves internal metrics in /debug/vars format over HTTP.
|
||||
func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) {
|
||||
// Retrieve statistics from the monitor.
|
||||
stats, err := h.Monitor.Statistics(nil)
|
||||
if err != nil {
|
||||
h.httpError(w, err.Error(), false, http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
m := make(map[string]*monitor.Statistic)
|
||||
for _, s := range stats {
|
||||
// Very hackily create a unique key.
|
||||
buf := bytes.NewBufferString(s.Name)
|
||||
if path, ok := s.Tags["path"]; ok {
|
||||
fmt.Fprintf(buf, ":%s", path)
|
||||
if id, ok := s.Tags["id"]; ok {
|
||||
fmt.Fprintf(buf, ":%s", id)
|
||||
}
|
||||
} else if bind, ok := s.Tags["bind"]; ok {
|
||||
if proto, ok := s.Tags["proto"]; ok {
|
||||
fmt.Fprintf(buf, ":%s", proto)
|
||||
}
|
||||
fmt.Fprintf(buf, ":%s", bind)
|
||||
} else if database, ok := s.Tags["database"]; ok {
|
||||
fmt.Fprintf(buf, ":%s", database)
|
||||
if rp, ok := s.Tags["retention_policy"]; ok {
|
||||
fmt.Fprintf(buf, ":%s", rp)
|
||||
if name, ok := s.Tags["name"]; ok {
|
||||
fmt.Fprintf(buf, ":%s", name)
|
||||
}
|
||||
if dest, ok := s.Tags["destination"]; ok {
|
||||
fmt.Fprintf(buf, ":%s", dest)
|
||||
}
|
||||
}
|
||||
}
|
||||
key := buf.String()
|
||||
|
||||
m[key] = s
|
||||
}
|
||||
|
||||
// Sort the keys to simulate /debug/vars output.
|
||||
keys := make([]string, 0, len(m))
|
||||
for k := range m {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
sort.Strings(keys)
|
||||
|
||||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||||
fmt.Fprintf(w, "{\n")
|
||||
fmt.Fprintln(w, "{")
|
||||
first := true
|
||||
expvar.Do(func(kv expvar.KeyValue) {
|
||||
for _, key := range keys {
|
||||
// Marshal this statistic to JSON.
|
||||
out, err := json.Marshal(m[key])
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if !first {
|
||||
fmt.Fprintf(w, ",\n")
|
||||
fmt.Fprintln(w, ",")
|
||||
}
|
||||
first = false
|
||||
fmt.Fprintf(w, "%q: %s", kv.Key, kv.Value)
|
||||
})
|
||||
fmt.Fprintf(w, "\n}\n")
|
||||
fmt.Fprintf(w, "%q: ", key)
|
||||
w.Write(bytes.TrimSpace(out))
|
||||
}
|
||||
fmt.Fprintln(w, "\n}")
|
||||
}
|
||||
|
||||
// h.httpError writes an error to the client in a standard format.
|
||||
|
@ -751,7 +859,7 @@ func authenticate(inner func(http.ResponseWriter, *http.Request, *meta.UserInfo)
|
|||
if requireAuthentication && adminExists {
|
||||
creds, err := parseCredentials(r)
|
||||
if err != nil {
|
||||
h.statMap.Add(statAuthFail, 1)
|
||||
atomic.AddInt64(&h.stats.AuthenticationFailures, 1)
|
||||
h.httpError(w, err.Error(), false, http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
|
@ -759,14 +867,14 @@ func authenticate(inner func(http.ResponseWriter, *http.Request, *meta.UserInfo)
|
|||
switch creds.Method {
|
||||
case UserAuthentication:
|
||||
if creds.Username == "" {
|
||||
h.statMap.Add(statAuthFail, 1)
|
||||
atomic.AddInt64(&h.stats.AuthenticationFailures, 1)
|
||||
h.httpError(w, "username required", false, http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
|
||||
user, err = h.MetaClient.Authenticate(creds.Username, creds.Password)
|
||||
if err != nil {
|
||||
h.statMap.Add(statAuthFail, 1)
|
||||
atomic.AddInt64(&h.stats.AuthenticationFailures, 1)
|
||||
h.httpError(w, "authorization failed", false, http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
|
|
|
@ -13,7 +13,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/dgrijalva/jwt-go"
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/influxql"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/services/httpd"
|
||||
|
@ -427,14 +426,12 @@ type Handler struct {
|
|||
|
||||
// NewHandler returns a new instance of Handler.
|
||||
func NewHandler(requireAuthentication bool) *Handler {
|
||||
statMap := influxdb.NewStatistics("httpd", "httpd", nil)
|
||||
|
||||
config := httpd.NewConfig()
|
||||
config.AuthEnabled = requireAuthentication
|
||||
config.SharedSecret = "super secret key"
|
||||
|
||||
h := &Handler{
|
||||
Handler: httpd.NewHandler(config, statMap),
|
||||
Handler: httpd.NewHandler(config),
|
||||
}
|
||||
h.Handler.MetaClient = &h.MetaClient
|
||||
h.Handler.QueryExecutor = influxql.NewQueryExecutor()
|
||||
|
|
|
@ -2,7 +2,6 @@ package httpd // import "github.com/influxdata/influxdb/services/httpd"
|
|||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"expvar"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
|
@ -12,7 +11,7 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
)
|
||||
|
||||
// statistics gathered by the httpd package.
|
||||
|
@ -49,18 +48,11 @@ type Service struct {
|
|||
|
||||
Handler *Handler
|
||||
|
||||
Logger *log.Logger
|
||||
statMap *expvar.Map
|
||||
Logger *log.Logger
|
||||
}
|
||||
|
||||
// NewService returns a new instance of Service.
|
||||
func NewService(c Config) *Service {
|
||||
// Configure expvar monitoring. It's OK to do this even if the service fails to open and
|
||||
// should be done before any data could arrive for the service.
|
||||
key := strings.Join([]string{"httpd", c.BindAddress}, ":")
|
||||
tags := map[string]string{"bind": c.BindAddress}
|
||||
statMap := influxdb.NewStatistics(key, "httpd", tags)
|
||||
|
||||
s := &Service{
|
||||
addr: c.BindAddress,
|
||||
https: c.HTTPSEnabled,
|
||||
|
@ -68,7 +60,7 @@ func NewService(c Config) *Service {
|
|||
key: c.HTTPSPrivateKey,
|
||||
limit: c.MaxConnectionLimit,
|
||||
err: make(chan error),
|
||||
Handler: NewHandler(c, statMap),
|
||||
Handler: NewHandler(c),
|
||||
Logger: log.New(os.Stderr, "[httpd] ", log.LstdFlags),
|
||||
}
|
||||
if s.key == "" {
|
||||
|
@ -159,6 +151,11 @@ func (s *Service) Addr() net.Addr {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Statistics returns statistics for periodic monitoring.
//
// The counters live on the Handler; this method only adds a "bind" tag set
// to the service's address, merges it with the caller's tags, and delegates
// to Handler.Statistics.
|
||||
func (s *Service) Statistics(tags map[string]string) []models.Statistic {
|
||||
return s.Handler.Statistics(models.Tags{"bind": s.addr}.Merge(tags))
|
||||
}
|
||||
|
||||
// serve serves the handler from the listener.
|
||||
func (s *Service) serve() {
|
||||
// The listener was closed so exit
|
||||
|
|
|
@ -5,11 +5,11 @@ import (
|
|||
"compress/gzip"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"expvar"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
|
@ -27,7 +27,7 @@ type Handler struct {
|
|||
|
||||
Logger *log.Logger
|
||||
|
||||
statMap *expvar.Map
|
||||
stats *Statistics
|
||||
}
|
||||
|
||||
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
|
@ -114,7 +114,9 @@ func (h *Handler) servePut(w http.ResponseWriter, r *http.Request) {
|
|||
pt, err := models.NewPoint(p.Metric, p.Tags, map[string]interface{}{"value": p.Value}, ts)
|
||||
if err != nil {
|
||||
h.Logger.Printf("Dropping point %v: %v", p.Metric, err)
|
||||
h.statMap.Add(statDroppedPointsInvalid, 1)
|
||||
if h.stats != nil {
|
||||
atomic.AddInt64(&h.stats.InvalidDroppedPoints, 1)
|
||||
}
|
||||
continue
|
||||
}
|
||||
points = append(points, pt)
|
||||
|
|
|
@ -4,7 +4,6 @@ import (
|
|||
"bufio"
|
||||
"bytes"
|
||||
"crypto/tls"
|
||||
"expvar"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
|
@ -14,9 +13,9 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/services/meta"
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
|
@ -34,7 +33,7 @@ const (
|
|||
statTelnetBadTime = "tlBadTime"
|
||||
statTelnetBadTag = "tlBadTag"
|
||||
statTelnetBadFloat = "tlBadFloat"
|
||||
statBatchesTrasmitted = "batchesTx"
|
||||
statBatchesTransmitted = "batchesTx"
|
||||
statPointsTransmitted = "pointsTx"
|
||||
statBatchesTransmitFail = "batchesTxFail"
|
||||
statConnectionsActive = "connsActive"
|
||||
|
@ -73,7 +72,9 @@ type Service struct {
|
|||
|
||||
LogPointErrors bool
|
||||
Logger *log.Logger
|
||||
statMap *expvar.Map
|
||||
|
||||
stats *Statistics
|
||||
statTags models.Tags
|
||||
}
|
||||
|
||||
// NewService returns a new instance of Service.
|
||||
|
@ -94,6 +95,8 @@ func NewService(c Config) (*Service, error) {
|
|||
batchTimeout: time.Duration(d.BatchTimeout),
|
||||
Logger: log.New(os.Stderr, "[opentsdb] ", log.LstdFlags),
|
||||
LogPointErrors: d.LogPointErrors,
|
||||
stats: &Statistics{},
|
||||
statTags: map[string]string{"bind": d.BindAddress},
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
@ -105,12 +108,6 @@ func (s *Service) Open() error {
|
|||
|
||||
s.Logger.Println("Starting OpenTSDB service")
|
||||
|
||||
// Configure expvar monitoring. It's OK to do this even if the service fails to open and
|
||||
// should be done before any data could arrive for the service.
|
||||
key := strings.Join([]string{"opentsdb", s.BindAddress}, ":")
|
||||
tags := map[string]string{"bind": s.BindAddress}
|
||||
s.statMap = influxdb.NewStatistics(key, "opentsdb", tags)
|
||||
|
||||
if _, err := s.MetaClient.CreateDatabase(s.Database); err != nil {
|
||||
s.Logger.Printf("Failed to ensure target database %s exists: %s", s.Database, err.Error())
|
||||
return err
|
||||
|
@ -181,6 +178,52 @@ func (s *Service) SetLogOutput(w io.Writer) {
|
|||
s.Logger = log.New(w, "[opentsdb] ", log.LstdFlags)
|
||||
}
|
||||
|
||||
// Statistics maintains statistics for the opentsdb service.
//
// Fields are updated with atomic.AddInt64 and read with atomic.LoadInt64.
// The Active* fields are gauges incremented when a connection handler
// starts and decremented when it returns; the other fields are counters.
|
||||
type Statistics struct {
|
||||
// HTTPConnectionsHandled counts connections whose header parsed as HTTP
// and were handed to the HTTP listener.
HTTPConnectionsHandled int64
|
||||
ActiveTelnetConnections int64
|
||||
HandledTelnetConnections int64
|
||||
// TelnetPointsReceived counts lines read from telnet connections.
TelnetPointsReceived int64
|
||||
// TelnetBytesReceived counts the bytes of those lines.
TelnetBytesReceived int64
|
||||
// TelnetReadError counts telnet read errors other than io.EOF.
TelnetReadError int64
|
||||
// TelnetBadLine counts lines with fewer than four fields or not starting with "put".
TelnetBadLine int64
|
||||
// TelnetBadTime counts lines whose timestamp failed to parse as an integer.
TelnetBadTime int64
|
||||
TelnetBadTag int64
|
||||
TelnetBadFloat int64
|
||||
BatchesTransmitted int64
|
||||
PointsTransmitted int64
|
||||
BatchesTransmitFail int64
|
||||
ActiveConnections int64
|
||||
HandledConnections int64
|
||||
// InvalidDroppedPoints counts points dropped because they could not be
// constructed (presumably by the HTTP put handler — confirm against handler.go).
InvalidDroppedPoints int64
|
||||
}
|
||||
|
||||
// Statistics returns statistics for periodic monitoring.
|
||||
func (s *Service) Statistics(tags map[string]string) []models.Statistic {
|
||||
return []models.Statistic{{
|
||||
Name: "opentsdb",
|
||||
Tags: s.statTags,
|
||||
Values: map[string]interface{}{
|
||||
statHTTPConnectionsHandled: atomic.LoadInt64(&s.stats.HTTPConnectionsHandled),
|
||||
statTelnetConnectionsActive: atomic.LoadInt64(&s.stats.ActiveTelnetConnections),
|
||||
statTelnetConnectionsHandled: atomic.LoadInt64(&s.stats.HandledTelnetConnections),
|
||||
statTelnetPointsReceived: atomic.LoadInt64(&s.stats.TelnetPointsReceived),
|
||||
statTelnetBytesReceived: atomic.LoadInt64(&s.stats.TelnetBytesReceived),
|
||||
statTelnetReadError: atomic.LoadInt64(&s.stats.TelnetReadError),
|
||||
statTelnetBadLine: atomic.LoadInt64(&s.stats.TelnetBadLine),
|
||||
statTelnetBadTime: atomic.LoadInt64(&s.stats.TelnetBadTime),
|
||||
statTelnetBadTag: atomic.LoadInt64(&s.stats.TelnetBadTag),
|
||||
statTelnetBadFloat: atomic.LoadInt64(&s.stats.TelnetBadFloat),
|
||||
statBatchesTransmitted: atomic.LoadInt64(&s.stats.BatchesTransmitted),
|
||||
statPointsTransmitted: atomic.LoadInt64(&s.stats.PointsTransmitted),
|
||||
statBatchesTransmitFail: atomic.LoadInt64(&s.stats.BatchesTransmitFail),
|
||||
statConnectionsActive: atomic.LoadInt64(&s.stats.ActiveConnections),
|
||||
statConnectionsHandled: atomic.LoadInt64(&s.stats.HandledConnections),
|
||||
statDroppedPointsInvalid: atomic.LoadInt64(&s.stats.InvalidDroppedPoints),
|
||||
},
|
||||
}}
|
||||
}
|
||||
|
||||
// Err returns a channel for fatal errors that occur on the listener.
// The returned channel is the service's internal err channel, exposed
// receive-only so callers can read from it but cannot send or close it.
|
||||
func (s *Service) Err() <-chan error { return s.err }
|
||||
|
||||
|
@ -214,9 +257,9 @@ func (s *Service) serve() {
|
|||
|
||||
// handleConn processes conn. This is run in a separate goroutine.
|
||||
func (s *Service) handleConn(conn net.Conn) {
|
||||
defer s.statMap.Add(statConnectionsActive, -1)
|
||||
s.statMap.Add(statConnectionsActive, 1)
|
||||
s.statMap.Add(statConnectionsHandled, 1)
|
||||
defer atomic.AddInt64(&s.stats.ActiveConnections, -1)
|
||||
atomic.AddInt64(&s.stats.ActiveConnections, 1)
|
||||
atomic.AddInt64(&s.stats.HandledConnections, 1)
|
||||
|
||||
// Read header into buffer to check if it's HTTP.
|
||||
var buf bytes.Buffer
|
||||
|
@ -231,7 +274,7 @@ func (s *Service) handleConn(conn net.Conn) {
|
|||
|
||||
// If no HTTP parsing error occurred then process as HTTP.
|
||||
if err == nil {
|
||||
s.statMap.Add(statHTTPConnectionsHandled, 1)
|
||||
atomic.AddInt64(&s.stats.HTTPConnectionsHandled, 1)
|
||||
s.httpln.ch <- conn
|
||||
return
|
||||
}
|
||||
|
@ -247,9 +290,9 @@ func (s *Service) handleConn(conn net.Conn) {
|
|||
func (s *Service) handleTelnetConn(conn net.Conn) {
|
||||
defer conn.Close()
|
||||
defer s.wg.Done()
|
||||
defer s.statMap.Add(statTelnetConnectionsActive, -1)
|
||||
s.statMap.Add(statTelnetConnectionsActive, 1)
|
||||
s.statMap.Add(statTelnetConnectionsHandled, 1)
|
||||
defer atomic.AddInt64(&s.stats.ActiveTelnetConnections, -1)
|
||||
atomic.AddInt64(&s.stats.ActiveTelnetConnections, 1)
|
||||
atomic.AddInt64(&s.stats.HandledTelnetConnections, 1)
|
||||
|
||||
// Get connection details.
|
||||
remoteAddr := conn.RemoteAddr().String()
|
||||
|
@ -260,13 +303,13 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
|
|||
line, err := r.ReadLine()
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
s.statMap.Add(statTelnetReadError, 1)
|
||||
atomic.AddInt64(&s.stats.TelnetReadError, 1)
|
||||
s.Logger.Println("error reading from openTSDB connection", err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
s.statMap.Add(statTelnetPointsReceived, 1)
|
||||
s.statMap.Add(statTelnetBytesReceived, int64(len(line)))
|
||||
atomic.AddInt64(&s.stats.TelnetPointsReceived, 1)
|
||||
atomic.AddInt64(&s.stats.TelnetBytesReceived, int64(len(line)))
|
||||
|
||||
inputStrs := strings.Fields(line)
|
||||
|
||||
|
@ -276,7 +319,7 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
|
|||
}
|
||||
|
||||
if len(inputStrs) < 4 || inputStrs[0] != "put" {
|
||||
s.statMap.Add(statTelnetBadLine, 1)
|
||||
atomic.AddInt64(&s.stats.TelnetBadLine, 1)
|
||||
if s.LogPointErrors {
|
||||
s.Logger.Printf("malformed line '%s' from %s", line, remoteAddr)
|
||||
}
|
||||
|
@ -291,7 +334,7 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
|
|||
var t time.Time
|
||||
ts, err := strconv.ParseInt(tsStr, 10, 64)
|
||||
if err != nil {
|
||||
s.statMap.Add(statTelnetBadTime, 1)
|
||||
atomic.AddInt64(&s.stats.TelnetBadTime, 1)
|
||||
if s.LogPointErrors {
|
||||
s.Logger.Printf("malformed time '%s' from %s", tsStr, remoteAddr)
|
||||
}
|
||||
|
@ -305,7 +348,7 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
|
|||
t = time.Unix(ts/1000, (ts%1000)*1000)
|
||||
break
|
||||
default:
|
||||
s.statMap.Add(statTelnetBadTime, 1)
|
||||
atomic.AddInt64(&s.stats.TelnetBadTime, 1)
|
||||
if s.LogPointErrors {
|
||||
s.Logger.Printf("bad time '%s' must be 10 or 13 chars, from %s ", tsStr, remoteAddr)
|
||||
}
|
||||
|
@ -316,7 +359,7 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
|
|||
for t := range tagStrs {
|
||||
parts := strings.SplitN(tagStrs[t], "=", 2)
|
||||
if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
|
||||
s.statMap.Add(statTelnetBadTag, 1)
|
||||
atomic.AddInt64(&s.stats.TelnetBadTag, 1)
|
||||
if s.LogPointErrors {
|
||||
s.Logger.Printf("malformed tag data '%v' from %s", tagStrs[t], remoteAddr)
|
||||
}
|
||||
|
@ -330,7 +373,7 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
|
|||
fields := make(map[string]interface{})
|
||||
fv, err := strconv.ParseFloat(valueStr, 64)
|
||||
if err != nil {
|
||||
s.statMap.Add(statTelnetBadFloat, 1)
|
||||
atomic.AddInt64(&s.stats.TelnetBadFloat, 1)
|
||||
if s.LogPointErrors {
|
||||
s.Logger.Printf("bad float '%s' from %s", valueStr, remoteAddr)
|
||||
}
|
||||
|
@ -340,7 +383,7 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
|
|||
|
||||
pt, err := models.NewPoint(measurement, tags, fields, t)
|
||||
if err != nil {
|
||||
s.statMap.Add(statTelnetBadFloat, 1)
|
||||
atomic.AddInt64(&s.stats.TelnetBadFloat, 1)
|
||||
if s.LogPointErrors {
|
||||
s.Logger.Printf("bad float '%s' from %s", valueStr, remoteAddr)
|
||||
}
|
||||
|
@ -352,13 +395,14 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
|
|||
|
||||
// serveHTTP handles connections in HTTP format.
|
||||
func (s *Service) serveHTTP() {
|
||||
srv := &http.Server{Handler: &Handler{
|
||||
handler := &Handler{
|
||||
Database: s.Database,
|
||||
RetentionPolicy: s.RetentionPolicy,
|
||||
PointsWriter: s.PointsWriter,
|
||||
Logger: s.Logger,
|
||||
statMap: s.statMap,
|
||||
}}
|
||||
stats: s.stats,
|
||||
}
|
||||
srv := &http.Server{Handler: handler}
|
||||
srv.Serve(s.httpln)
|
||||
}
|
||||
|
||||
|
@ -369,11 +413,11 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) {
|
|||
select {
|
||||
case batch := <-batcher.Out():
|
||||
if err := s.PointsWriter.WritePoints(s.Database, s.RetentionPolicy, models.ConsistencyLevelAny, batch); err == nil {
|
||||
s.statMap.Add(statBatchesTrasmitted, 1)
|
||||
s.statMap.Add(statPointsTransmitted, int64(len(batch)))
|
||||
atomic.AddInt64(&s.stats.BatchesTransmitted, 1)
|
||||
atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch)))
|
||||
} else {
|
||||
s.Logger.Printf("failed to write point batch to database %q: %s", s.Database, err)
|
||||
s.statMap.Add(statBatchesTransmitFail, 1)
|
||||
atomic.AddInt64(&s.stats.BatchesTransmitFail, 1)
|
||||
}
|
||||
|
||||
case <-s.done:
|
||||
|
|
|
@ -2,18 +2,18 @@ package subscriber // import "github.com/influxdata/influxdb/services/subscriber
|
|||
|
||||
import (
|
||||
"errors"
|
||||
"expvar"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/url"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/coordinator"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/monitor"
|
||||
"github.com/influxdata/influxdb/services/meta"
|
||||
)
|
||||
|
||||
|
@ -47,7 +47,7 @@ type Service struct {
|
|||
NewPointsWriter func(u url.URL) (PointsWriter, error)
|
||||
Logger *log.Logger
|
||||
update chan struct{}
|
||||
statMap *expvar.Map
|
||||
stats *Statistics
|
||||
points chan *coordinator.WritePointsRequest
|
||||
wg sync.WaitGroup
|
||||
closed bool
|
||||
|
@ -55,23 +55,19 @@ type Service struct {
|
|||
mu sync.Mutex
|
||||
conf Config
|
||||
|
||||
failures *expvar.Int
|
||||
pointsWritten *expvar.Int
|
||||
subs map[subEntry]chanWriter
|
||||
subMu sync.RWMutex
|
||||
}
|
||||
|
||||
// NewService returns a subscriber service with given settings
|
||||
func NewService(c Config) *Service {
|
||||
s := &Service{
|
||||
Logger: log.New(os.Stderr, "[subscriber] ", log.LstdFlags),
|
||||
statMap: influxdb.NewStatistics("subscriber", "subscriber", nil),
|
||||
closed: true,
|
||||
conf: c,
|
||||
failures: &expvar.Int{},
|
||||
pointsWritten: &expvar.Int{},
|
||||
Logger: log.New(os.Stderr, "[subscriber] ", log.LstdFlags),
|
||||
closed: true,
|
||||
stats: &Statistics{},
|
||||
conf: c,
|
||||
}
|
||||
s.NewPointsWriter = s.newPointsWriter
|
||||
s.statMap.Set(statWriteFailures, s.failures)
|
||||
s.statMap.Set(statPointsWritten, s.pointsWritten)
|
||||
return s
|
||||
}
|
||||
|
||||
|
@ -124,6 +120,32 @@ func (s *Service) SetLogOutput(w io.Writer) {
|
|||
s.Logger = log.New(w, "[subscriber] ", log.LstdFlags)
|
||||
}
|
||||
|
||||
// Statistics holds the service-wide counters for the subscriber service.
// The fields are read and written through sync/atomic by the service and its
// channel writers, so they should not be accessed with plain loads or stores.
type Statistics struct {
	// WriteFailures counts write requests that were dropped because a
	// subscription's channel was full, plus writes that returned an error.
	WriteFailures int64

	// PointsWritten is the running total of points in write requests that
	// were written without error.
	PointsWritten int64
}
|
||||
|
||||
// Statistics returns statistics for periodic monitoring.
|
||||
func (s *Service) Statistics(tags map[string]string) []models.Statistic {
|
||||
statistics := []models.Statistic{{
|
||||
Name: "subscriber",
|
||||
Tags: tags,
|
||||
Values: map[string]interface{}{
|
||||
statPointsWritten: atomic.LoadInt64(&s.stats.PointsWritten),
|
||||
statWriteFailures: atomic.LoadInt64(&s.stats.WriteFailures),
|
||||
},
|
||||
}}
|
||||
|
||||
s.subMu.RLock()
|
||||
defer s.subMu.RUnlock()
|
||||
|
||||
for _, sub := range s.subs {
|
||||
statistics = append(statistics, sub.Statistics(tags)...)
|
||||
}
|
||||
return statistics
|
||||
}
|
||||
|
||||
func (s *Service) waitForMetaUpdates() {
|
||||
for {
|
||||
ch := s.MetaClient.WaitForDataChanged()
|
||||
|
@ -161,7 +183,7 @@ func (s *Service) createSubscription(se subEntry, mode string, destinations []st
|
|||
return nil, fmt.Errorf("unknown balance mode %q", mode)
|
||||
}
|
||||
writers := make([]PointsWriter, len(destinations))
|
||||
statMaps := make([]*expvar.Map, len(writers))
|
||||
stats := make([]writerStats, len(writers))
|
||||
for i, dest := range destinations {
|
||||
u, err := url.Parse(dest)
|
||||
if err != nil {
|
||||
|
@ -172,20 +194,18 @@ func (s *Service) createSubscription(se subEntry, mode string, destinations []st
|
|||
return nil, err
|
||||
}
|
||||
writers[i] = w
|
||||
tags := map[string]string{
|
||||
stats[i].dest = dest
|
||||
}
|
||||
return &balancewriter{
|
||||
bm: bm,
|
||||
writers: writers,
|
||||
stats: stats,
|
||||
tags: map[string]string{
|
||||
"database": se.db,
|
||||
"retention_policy": se.rp,
|
||||
"name": se.name,
|
||||
"mode": mode,
|
||||
"destination": dest,
|
||||
}
|
||||
key := strings.Join([]string{"subscriber", se.db, se.rp, se.name, dest}, ":")
|
||||
statMaps[i] = influxdb.NewStatistics(key, "subscriber", tags)
|
||||
}
|
||||
return &balancewriter{
|
||||
bm: bm,
|
||||
writers: writers,
|
||||
statMaps: statMaps,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -197,32 +217,28 @@ func (s *Service) Points() chan<- *coordinator.WritePointsRequest {
|
|||
// read points off chan and write them
|
||||
func (s *Service) run() {
|
||||
var wg sync.WaitGroup
|
||||
subs := make(map[subEntry]chanWriter)
|
||||
s.subs = make(map[subEntry]chanWriter)
|
||||
// Perform initial update
|
||||
s.updateSubs(subs, &wg)
|
||||
s.updateSubs(&wg)
|
||||
for {
|
||||
select {
|
||||
case <-s.update:
|
||||
err := s.updateSubs(subs, &wg)
|
||||
err := s.updateSubs(&wg)
|
||||
if err != nil {
|
||||
s.Logger.Println("failed to update subscriptions:", err)
|
||||
}
|
||||
case p, ok := <-s.points:
|
||||
if !ok {
|
||||
// Close out all chanWriters
|
||||
for _, cw := range subs {
|
||||
cw.Close()
|
||||
}
|
||||
// Wait for them to finish
|
||||
wg.Wait()
|
||||
s.close(&wg)
|
||||
return
|
||||
}
|
||||
for se, cw := range subs {
|
||||
for se, cw := range s.subs {
|
||||
if p.Database == se.db && p.RetentionPolicy == se.rp {
|
||||
select {
|
||||
case cw.writeRequests <- p:
|
||||
default:
|
||||
s.failures.Add(1)
|
||||
atomic.AddInt64(&s.stats.WriteFailures, 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -230,7 +246,27 @@ func (s *Service) run() {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Service) updateSubs(subs map[subEntry]chanWriter, wg *sync.WaitGroup) error {
|
||||
// close closes the existing channel writers
|
||||
func (s *Service) close(wg *sync.WaitGroup) {
|
||||
s.subMu.Lock()
|
||||
defer s.subMu.Unlock()
|
||||
|
||||
for _, cw := range s.subs {
|
||||
cw.Close()
|
||||
}
|
||||
// Wait for them to finish
|
||||
wg.Wait()
|
||||
s.subs = nil
|
||||
}
|
||||
|
||||
func (s *Service) updateSubs(wg *sync.WaitGroup) error {
|
||||
s.subMu.Lock()
|
||||
defer s.subMu.Unlock()
|
||||
|
||||
if s.subs == nil {
|
||||
s.subs = make(map[subEntry]chanWriter)
|
||||
}
|
||||
|
||||
dbis := s.MetaClient.Databases()
|
||||
allEntries := make(map[subEntry]bool, 0)
|
||||
// Add in new subscriptions
|
||||
|
@ -243,7 +279,7 @@ func (s *Service) updateSubs(subs map[subEntry]chanWriter, wg *sync.WaitGroup) e
|
|||
name: si.Name,
|
||||
}
|
||||
allEntries[se] = true
|
||||
if _, ok := subs[se]; ok {
|
||||
if _, ok := s.subs[se]; ok {
|
||||
continue
|
||||
}
|
||||
sub, err := s.createSubscription(se, si.Mode, si.Destinations)
|
||||
|
@ -253,8 +289,8 @@ func (s *Service) updateSubs(subs map[subEntry]chanWriter, wg *sync.WaitGroup) e
|
|||
cw := chanWriter{
|
||||
writeRequests: make(chan *coordinator.WritePointsRequest, 100),
|
||||
pw: sub,
|
||||
failures: s.failures,
|
||||
pointsWritten: s.pointsWritten,
|
||||
pointsWritten: &s.stats.PointsWritten,
|
||||
failures: &s.stats.WriteFailures,
|
||||
logger: s.Logger,
|
||||
}
|
||||
wg.Add(1)
|
||||
|
@ -262,20 +298,20 @@ func (s *Service) updateSubs(subs map[subEntry]chanWriter, wg *sync.WaitGroup) e
|
|||
defer wg.Done()
|
||||
cw.Run()
|
||||
}()
|
||||
subs[se] = cw
|
||||
s.subs[se] = cw
|
||||
s.Logger.Println("added new subscription for", se.db, se.rp)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Remove deleted subs
|
||||
for se := range subs {
|
||||
for se := range s.subs {
|
||||
if !allEntries[se] {
|
||||
// Close the chanWriter
|
||||
subs[se].Close()
|
||||
s.subs[se].Close()
|
||||
|
||||
// Remove it from the set
|
||||
delete(subs, se)
|
||||
delete(s.subs, se)
|
||||
s.Logger.Println("deleted old subscription for", se.db, se.rp)
|
||||
}
|
||||
}
|
||||
|
@ -299,8 +335,8 @@ func (s *Service) newPointsWriter(u url.URL) (PointsWriter, error) {
|
|||
type chanWriter struct {
|
||||
writeRequests chan *coordinator.WritePointsRequest
|
||||
pw PointsWriter
|
||||
pointsWritten *expvar.Int
|
||||
failures *expvar.Int
|
||||
pointsWritten *int64
|
||||
failures *int64
|
||||
logger *log.Logger
|
||||
}
|
||||
|
||||
|
@ -314,13 +350,21 @@ func (c chanWriter) Run() {
|
|||
err := c.pw.WritePoints(wr)
|
||||
if err != nil {
|
||||
c.logger.Println(err)
|
||||
c.failures.Add(1)
|
||||
atomic.AddInt64(c.failures, 1)
|
||||
} else {
|
||||
c.pointsWritten.Add(int64(len(wr.Points)))
|
||||
atomic.AddInt64(c.pointsWritten, int64(len(wr.Points)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Statistics returns statistics for periodic monitoring.
|
||||
func (c chanWriter) Statistics(tags map[string]string) []models.Statistic {
|
||||
if m, ok := c.pw.(monitor.Reporter); ok {
|
||||
return m.Statistics(tags)
|
||||
}
|
||||
return []models.Statistic{}
|
||||
}
|
||||
|
||||
// BalanceMode sets what balance mode to use on a subscription.
|
||||
// valid options are currently ALL or ANY
|
||||
type BalanceMode int
|
||||
|
@ -331,12 +375,19 @@ const (
|
|||
ANY
|
||||
)
|
||||
|
||||
// writerStats tracks the counters for a single subscription destination.
type writerStats struct {
	// dest is the destination string this entry belongs to; it is reported
	// as the "destination" tag.
	dest string

	// failures and pointsWritten are updated with sync/atomic by the balance
	// writer: failures is bumped on each write error, pointsWritten grows by
	// the number of points in each successful write.
	failures      int64
	pointsWritten int64
}
|
||||
|
||||
// balances writes across PointsWriters according to BalanceMode
|
||||
type balancewriter struct {
|
||||
bm BalanceMode
|
||||
writers []PointsWriter
|
||||
statMaps []*expvar.Map
|
||||
i int
|
||||
bm BalanceMode
|
||||
writers []PointsWriter
|
||||
stats []writerStats
|
||||
tags map[string]string
|
||||
i int
|
||||
}
|
||||
|
||||
func (b *balancewriter) WritePoints(p *coordinator.WritePointsRequest) error {
|
||||
|
@ -351,9 +402,9 @@ func (b *balancewriter) WritePoints(p *coordinator.WritePointsRequest) error {
|
|||
err := w.WritePoints(p)
|
||||
if err != nil {
|
||||
lastErr = err
|
||||
b.statMaps[i].Add(statWriteFailures, 1)
|
||||
atomic.AddInt64(&b.stats[i].failures, 1)
|
||||
} else {
|
||||
b.statMaps[i].Add(statPointsWritten, int64(len(p.Points)))
|
||||
atomic.AddInt64(&b.stats[i].pointsWritten, int64(len(p.Points)))
|
||||
if b.bm == ANY {
|
||||
break
|
||||
}
|
||||
|
@ -361,3 +412,21 @@ func (b *balancewriter) WritePoints(p *coordinator.WritePointsRequest) error {
|
|||
}
|
||||
return lastErr
|
||||
}
|
||||
|
||||
// Statistics returns statistics for periodic monitoring.
|
||||
func (b *balancewriter) Statistics(tags map[string]string) []models.Statistic {
|
||||
tags = models.Tags(tags).Merge(b.tags)
|
||||
|
||||
statistics := make([]models.Statistic, len(b.stats))
|
||||
for i := range b.stats {
|
||||
statistics[i] = models.Statistic{
|
||||
Name: "subscriber",
|
||||
Tags: models.Tags(tags).Merge(map[string]string{"destination": b.stats[i].dest}),
|
||||
Values: map[string]interface{}{
|
||||
statPointsWritten: atomic.LoadInt64(&b.stats[i].pointsWritten),
|
||||
statWriteFailures: atomic.LoadInt64(&b.stats[i].failures),
|
||||
},
|
||||
}
|
||||
}
|
||||
return statistics
|
||||
}
|
||||
|
|
|
@ -2,16 +2,14 @@ package udp // import "github.com/influxdata/influxdb/services/udp"
|
|||
|
||||
import (
|
||||
"errors"
|
||||
"expvar"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/services/meta"
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
|
@ -30,7 +28,7 @@ const (
|
|||
statBytesReceived = "bytesRx"
|
||||
statPointsParseFail = "pointsParseFail"
|
||||
statReadFail = "readFail"
|
||||
statBatchesTrasmitted = "batchesTx"
|
||||
statBatchesTransmitted = "batchesTx"
|
||||
statPointsTransmitted = "pointsTx"
|
||||
statBatchesTransmitFail = "batchesTxFail"
|
||||
)
|
||||
|
@ -58,8 +56,9 @@ type Service struct {
|
|||
CreateDatabase(name string) (*meta.DatabaseInfo, error)
|
||||
}
|
||||
|
||||
Logger *log.Logger
|
||||
statMap *expvar.Map
|
||||
Logger *log.Logger
|
||||
stats *Statistics
|
||||
statTags models.Tags
|
||||
}
|
||||
|
||||
// NewService returns a new instance of Service.
|
||||
|
@ -71,17 +70,13 @@ func NewService(c Config) *Service {
|
|||
parserChan: make(chan []byte, parserChanLen),
|
||||
batcher: tsdb.NewPointBatcher(d.BatchSize, d.BatchPending, time.Duration(d.BatchTimeout)),
|
||||
Logger: log.New(os.Stderr, "[udp] ", log.LstdFlags),
|
||||
stats: &Statistics{},
|
||||
statTags: map[string]string{"bind": d.BindAddress},
|
||||
}
|
||||
}
|
||||
|
||||
// Open starts the service
|
||||
func (s *Service) Open() (err error) {
|
||||
// Configure expvar monitoring. It's OK to do this even if the service fails to open and
|
||||
// should be done before any data could arrive for the service.
|
||||
key := strings.Join([]string{"udp", s.config.BindAddress}, ":")
|
||||
tags := map[string]string{"bind": s.config.BindAddress}
|
||||
s.statMap = influxdb.NewStatistics(key, "udp", tags)
|
||||
|
||||
if s.config.BindAddress == "" {
|
||||
return errors.New("bind address has to be specified in config")
|
||||
}
|
||||
|
@ -124,6 +119,34 @@ func (s *Service) Open() (err error) {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Statistics holds the counters for the UDP service. All fields are updated
// and read through sync/atomic by the service's goroutines.
type Statistics struct {
	// PointsReceived grows by the number of points parsed from each buffer.
	PointsReceived int64
	// BytesReceived grows by the byte count of each successful UDP read.
	BytesReceived int64
	// PointsParseFail counts buffers that could not be parsed into points.
	PointsParseFail int64
	// ReadFail counts UDP reads that returned an error.
	ReadFail int64
	// BatchesTransmitted counts batches written without error.
	BatchesTransmitted int64
	// PointsTransmitted grows by the size of each batch written without error.
	PointsTransmitted int64
	// BatchesTransmitFail counts batches whose write returned an error.
	BatchesTransmitFail int64
}
|
||||
|
||||
// Statistics returns statistics for periodic monitoring.
|
||||
func (s *Service) Statistics(tags map[string]string) []models.Statistic {
|
||||
return []models.Statistic{{
|
||||
Name: "udp",
|
||||
Tags: s.statTags,
|
||||
Values: map[string]interface{}{
|
||||
statPointsReceived: atomic.LoadInt64(&s.stats.PointsReceived),
|
||||
statBytesReceived: atomic.LoadInt64(&s.stats.BytesReceived),
|
||||
statPointsParseFail: atomic.LoadInt64(&s.stats.PointsParseFail),
|
||||
statReadFail: atomic.LoadInt64(&s.stats.ReadFail),
|
||||
statBatchesTransmitted: atomic.LoadInt64(&s.stats.BatchesTransmitted),
|
||||
statPointsTransmitted: atomic.LoadInt64(&s.stats.PointsTransmitted),
|
||||
statBatchesTransmitFail: atomic.LoadInt64(&s.stats.BatchesTransmitFail),
|
||||
},
|
||||
}}
|
||||
}
|
||||
|
||||
func (s *Service) writer() {
|
||||
defer s.wg.Done()
|
||||
|
||||
|
@ -131,11 +154,11 @@ func (s *Service) writer() {
|
|||
select {
|
||||
case batch := <-s.batcher.Out():
|
||||
if err := s.PointsWriter.WritePoints(s.config.Database, s.config.RetentionPolicy, models.ConsistencyLevelAny, batch); err == nil {
|
||||
s.statMap.Add(statBatchesTrasmitted, 1)
|
||||
s.statMap.Add(statPointsTransmitted, int64(len(batch)))
|
||||
atomic.AddInt64(&s.stats.BatchesTransmitted, 1)
|
||||
atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch)))
|
||||
} else {
|
||||
s.Logger.Printf("failed to write point batch to database %q: %s", s.config.Database, err)
|
||||
s.statMap.Add(statBatchesTransmitFail, 1)
|
||||
atomic.AddInt64(&s.stats.BatchesTransmitFail, 1)
|
||||
}
|
||||
|
||||
case <-s.done:
|
||||
|
@ -159,11 +182,11 @@ func (s *Service) serve() {
|
|||
// Keep processing.
|
||||
n, _, err := s.conn.ReadFromUDP(buf)
|
||||
if err != nil {
|
||||
s.statMap.Add(statReadFail, 1)
|
||||
atomic.AddInt64(&s.stats.ReadFail, 1)
|
||||
s.Logger.Printf("Failed to read UDP message: %s", err)
|
||||
continue
|
||||
}
|
||||
s.statMap.Add(statBytesReceived, int64(n))
|
||||
atomic.AddInt64(&s.stats.BytesReceived, int64(n))
|
||||
|
||||
bufCopy := make([]byte, n)
|
||||
copy(bufCopy, buf[:n])
|
||||
|
@ -182,7 +205,7 @@ func (s *Service) parser() {
|
|||
case buf := <-s.parserChan:
|
||||
points, err := models.ParsePointsWithPrecision(buf, time.Now().UTC(), s.config.Precision)
|
||||
if err != nil {
|
||||
s.statMap.Add(statPointsParseFail, 1)
|
||||
atomic.AddInt64(&s.stats.PointsParseFail, 1)
|
||||
s.Logger.Printf("Failed to parse points: %s", err)
|
||||
continue
|
||||
}
|
||||
|
@ -190,7 +213,8 @@ func (s *Service) parser() {
|
|||
for _, point := range points {
|
||||
s.batcher.In() <- point
|
||||
}
|
||||
s.statMap.Add(statPointsReceived, int64(len(points)))
|
||||
atomic.AddInt64(&s.stats.PointsReceived, int64(len(points)))
|
||||
atomic.AddInt64(&s.stats.PointsReceived, int64(len(points)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -47,6 +47,9 @@ type Engine interface {
|
|||
// Format will return the format for the engine
|
||||
Format() EngineFormat
|
||||
|
||||
// Statistics will return statistics relevant to this engine.
|
||||
Statistics(tags map[string]string) []models.Statistic
|
||||
|
||||
io.WriterTo
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package tsm1
|
||||
|
||||
import (
|
||||
"expvar"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
|
@ -9,10 +8,10 @@ import (
|
|||
"os"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -137,22 +136,17 @@ type Cache struct {
|
|||
// This number is the number of pending or failed WriteSnaphot attempts since the last successful one.
|
||||
snapshotAttempts int
|
||||
|
||||
statMap *expvar.Map // nil for snapshots.
|
||||
stats *CacheStatistics
|
||||
lastSnapshot time.Time
|
||||
}
|
||||
|
||||
// NewCache returns an instance of a cache which will use a maximum of maxSize bytes of memory.
|
||||
// Only used for engine caches, never for snapshots
|
||||
func NewCache(maxSize uint64, path string) *Cache {
|
||||
db, rp := tsdb.DecodeStorePath(path)
|
||||
c := &Cache{
|
||||
maxSize: maxSize,
|
||||
store: make(map[string]*entry),
|
||||
statMap: influxdb.NewStatistics(
|
||||
"tsm1_cache:"+path,
|
||||
"tsm1_cache",
|
||||
map[string]string{"path": path, "database": db, "retentionPolicy": rp},
|
||||
),
|
||||
maxSize: maxSize,
|
||||
store: make(map[string]*entry),
|
||||
stats: &CacheStatistics{},
|
||||
lastSnapshot: time.Now(),
|
||||
}
|
||||
c.UpdateAge()
|
||||
|
@ -163,6 +157,32 @@ func NewCache(maxSize uint64, path string) *Cache {
|
|||
return c
|
||||
}
|
||||
|
||||
// CacheStatistics holds the counters and gauges reported for a cache. The
// fields are accessed through sync/atomic by the cache's update helpers.
type CacheStatistics struct {
	// MemSizeBytes is adjusted by the signed deltas passed to updateMemSize.
	MemSizeBytes int64
	// DiskSizeBytes is set from the cache's snapshot size.
	DiskSizeBytes int64
	// SnapshotCount is set from the cache's snapshot attempt counter.
	SnapshotCount int64
	// CacheAgeMs is the time since the last snapshot, in milliseconds.
	CacheAgeMs int64
	// CachedBytes accumulates the byte counts passed to updateCachedBytes.
	CachedBytes int64
	// WALCompactionTimeMs accumulates reported compaction durations, in
	// milliseconds.
	WALCompactionTimeMs int64
}
|
||||
|
||||
// Statistics returns statistics for periodic monitoring.
|
||||
func (c *Cache) Statistics(tags map[string]string) []models.Statistic {
|
||||
return []models.Statistic{{
|
||||
Name: "tsm1_cache",
|
||||
Tags: tags,
|
||||
Values: map[string]interface{}{
|
||||
statCacheMemoryBytes: atomic.LoadInt64(&c.stats.MemSizeBytes),
|
||||
statCacheDiskBytes: atomic.LoadInt64(&c.stats.DiskSizeBytes),
|
||||
statSnapshots: atomic.LoadInt64(&c.stats.SnapshotCount),
|
||||
statCacheAgeMs: atomic.LoadInt64(&c.stats.CacheAgeMs),
|
||||
statCachedBytes: atomic.LoadInt64(&c.stats.CachedBytes),
|
||||
statWALCompactionTimeMs: atomic.LoadInt64(&c.stats.WALCompactionTimeMs),
|
||||
},
|
||||
}}
|
||||
}
|
||||
|
||||
// Write writes the set of values for the key to the cache. This function is goroutine-safe.
|
||||
// It returns an error if the cache has exceeded its max size.
|
||||
func (c *Cache) Write(key string, values []Value) error {
|
||||
|
@ -568,34 +588,28 @@ func (cl *CacheLoader) SetLogOutput(w io.Writer) {
|
|||
func (c *Cache) UpdateAge() {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
ageStat := new(expvar.Int)
|
||||
ageStat.Set(int64(time.Now().Sub(c.lastSnapshot) / time.Millisecond))
|
||||
c.statMap.Set(statCacheAgeMs, ageStat)
|
||||
ageStat := int64(time.Now().Sub(c.lastSnapshot) / time.Millisecond)
|
||||
atomic.StoreInt64(&c.stats.CacheAgeMs, ageStat)
|
||||
}
|
||||
|
||||
// Updates WAL compaction time statistic
|
||||
func (c *Cache) UpdateCompactTime(d time.Duration) {
|
||||
c.statMap.Add(statWALCompactionTimeMs, int64(d/time.Millisecond))
|
||||
atomic.AddInt64(&c.stats.WALCompactionTimeMs, int64(d/time.Millisecond))
|
||||
}
|
||||
|
||||
// Update the cachedBytes counter
|
||||
func (c *Cache) updateCachedBytes(b uint64) {
|
||||
c.statMap.Add(statCachedBytes, int64(b))
|
||||
atomic.AddInt64(&c.stats.CachedBytes, int64(b))
|
||||
}
|
||||
|
||||
// Update the memSize level
|
||||
func (c *Cache) updateMemSize(b int64) {
|
||||
c.statMap.Add(statCacheMemoryBytes, b)
|
||||
atomic.AddInt64(&c.stats.MemSizeBytes, b)
|
||||
}
|
||||
|
||||
// Update the snapshotsCount and the diskSize levels
|
||||
func (c *Cache) updateSnapshots() {
|
||||
// Update disk stats
|
||||
diskSizeStat := new(expvar.Int)
|
||||
diskSizeStat.Set(int64(c.snapshotSize))
|
||||
c.statMap.Set(statCacheDiskBytes, diskSizeStat)
|
||||
|
||||
snapshotsStat := new(expvar.Int)
|
||||
snapshotsStat.Set(int64(c.snapshotAttempts))
|
||||
c.statMap.Set(statSnapshots, snapshotsStat)
|
||||
atomic.StoreInt64(&c.stats.DiskSizeBytes, int64(c.snapshotSize))
|
||||
atomic.StoreInt64(&c.stats.SnapshotCount, int64(c.snapshotAttempts))
|
||||
}
|
||||
|
|
|
@ -198,6 +198,15 @@ func (e *Engine) Format() tsdb.EngineFormat {
|
|||
return tsdb.TSM1Format
|
||||
}
|
||||
|
||||
// Statistics returns statistics for periodic monitoring.
|
||||
func (e *Engine) Statistics(tags map[string]string) []models.Statistic {
|
||||
statistics := make([]models.Statistic, 0, 3)
|
||||
statistics = append(statistics, e.Cache.Statistics(tags)...)
|
||||
statistics = append(statistics, e.FileStore.Statistics(tags)...)
|
||||
statistics = append(statistics, e.WAL.Statistics(tags)...)
|
||||
return statistics
|
||||
}
|
||||
|
||||
// Open opens and initializes the engine.
|
||||
func (e *Engine) Open() error {
|
||||
e.done = make(chan struct{})
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package tsm1
|
||||
|
||||
import (
|
||||
"expvar"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
|
@ -13,10 +12,10 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
)
|
||||
|
||||
type TSMFile interface {
|
||||
|
@ -113,7 +112,7 @@ type FileStore struct {
|
|||
Logger *log.Logger
|
||||
traceLogging bool
|
||||
|
||||
statMap *expvar.Map
|
||||
stats *FileStoreStatistics
|
||||
|
||||
currentTempDirID int
|
||||
}
|
||||
|
@ -140,16 +139,11 @@ func (f FileStat) ContainsKey(key string) bool {
|
|||
}
|
||||
|
||||
func NewFileStore(dir string) *FileStore {
|
||||
db, rp := tsdb.DecodeStorePath(dir)
|
||||
return &FileStore{
|
||||
dir: dir,
|
||||
lastModified: time.Now(),
|
||||
Logger: log.New(os.Stderr, "[filestore] ", log.LstdFlags),
|
||||
statMap: influxdb.NewStatistics(
|
||||
"tsm1_filestore:"+dir,
|
||||
"tsm1_filestore",
|
||||
map[string]string{"path": dir, "database": db, "retentionPolicy": rp},
|
||||
),
|
||||
stats: &FileStoreStatistics{},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -159,6 +153,22 @@ func (f *FileStore) SetLogOutput(w io.Writer) {
|
|||
f.Logger = log.New(w, "[filestore] ", log.LstdFlags)
|
||||
}
|
||||
|
||||
// FileStoreStatistics holds the counters reported for a file store. The
// field is accessed through sync/atomic by the file store.
type FileStoreStatistics struct {
	// DiskBytes tracks the combined size, in bytes, of the files held by the
	// store: it grows as files are added, shrinks as they are removed, and is
	// recomputed when files are replaced.
	DiskBytes int64
}
|
||||
|
||||
// Statistics returns statistics for periodic monitoring.
|
||||
func (f *FileStore) Statistics(tags map[string]string) []models.Statistic {
|
||||
return []models.Statistic{{
|
||||
Name: "tsm1_filestore",
|
||||
Tags: tags,
|
||||
Values: map[string]interface{}{
|
||||
statFileStoreBytes: atomic.LoadInt64(&f.stats.DiskBytes),
|
||||
},
|
||||
}}
|
||||
}
|
||||
|
||||
// Returns the number of TSM files currently loaded
|
||||
func (f *FileStore) Count() int {
|
||||
f.mu.RLock()
|
||||
|
@ -192,7 +202,7 @@ func (f *FileStore) Add(files ...TSMFile) {
|
|||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
for _, file := range files {
|
||||
f.statMap.Add(statFileStoreBytes, int64(file.Size()))
|
||||
atomic.AddInt64(&f.stats.DiskBytes, int64(file.Size()))
|
||||
}
|
||||
f.files = append(f.files, files...)
|
||||
sort.Sort(tsmReaders(f.files))
|
||||
|
@ -217,7 +227,7 @@ func (f *FileStore) Remove(paths ...string) {
|
|||
active = append(active, file)
|
||||
} else {
|
||||
// Removing the file, remove the file size from the total file store bytes
|
||||
f.statMap.Add(statFileStoreBytes, -int64(file.Size()))
|
||||
atomic.AddInt64(&f.stats.DiskBytes, -int64(file.Size()))
|
||||
}
|
||||
}
|
||||
f.files = active
|
||||
|
@ -345,7 +355,7 @@ func (f *FileStore) Open() error {
|
|||
|
||||
// Accumulate file store size stat
|
||||
if fi, err := file.Stat(); err == nil {
|
||||
f.statMap.Add(statFileStoreBytes, fi.Size())
|
||||
atomic.AddInt64(&f.stats.DiskBytes, fi.Size())
|
||||
}
|
||||
|
||||
go func(idx int, file *os.File) {
|
||||
|
@ -502,9 +512,7 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error {
|
|||
for _, file := range f.files {
|
||||
totalSize += int64(file.Size())
|
||||
}
|
||||
sizeStat := new(expvar.Int)
|
||||
sizeStat.Set(totalSize)
|
||||
f.statMap.Set(statFileStoreBytes, sizeStat)
|
||||
atomic.StoreInt64(&f.stats.DiskBytes, totalSize)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -3,7 +3,6 @@ package tsm1
|
|||
import (
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"expvar"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
|
@ -14,11 +13,11 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/golang/snappy"
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -90,11 +89,11 @@ type WAL struct {
|
|||
// LoggingEnabled specifies if detailed logs should be output
|
||||
LoggingEnabled bool
|
||||
|
||||
statMap *expvar.Map
|
||||
// statistics for the WAL
|
||||
stats *WALStatistics
|
||||
}
|
||||
|
||||
func NewWAL(path string) *WAL {
|
||||
db, rp := tsdb.DecodeStorePath(path)
|
||||
return &WAL{
|
||||
path: path,
|
||||
|
||||
|
@ -103,12 +102,7 @@ func NewWAL(path string) *WAL {
|
|||
SegmentSize: DefaultSegmentSize,
|
||||
logger: log.New(os.Stderr, "[tsm1wal] ", log.LstdFlags),
|
||||
closing: make(chan struct{}),
|
||||
|
||||
statMap: influxdb.NewStatistics(
|
||||
"tsm1_wal:"+path,
|
||||
"tsm1_wal",
|
||||
map[string]string{"path": path, "database": db, "retentionPolicy": rp},
|
||||
),
|
||||
stats: &WALStatistics{},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -118,6 +112,24 @@ func (l *WAL) SetLogOutput(w io.Writer) {
|
|||
l.logger = log.New(w, "[tsm1wal] ", log.LstdFlags)
|
||||
}
|
||||
|
||||
// WALStatistics holds the size gauges reported for the WAL. The fields are
// accessed through sync/atomic by the WAL.
type WALStatistics struct {
	// OldBytes is the byte total reported under statWALOldBytes; it is
	// maintained from segment file sizes.
	OldBytes int64
	// CurrentBytes is the size of the current segment writer, reported under
	// statWALCurrentBytes; it is reset to 0 when a new segment file is
	// created.
	CurrentBytes int64
}
|
||||
|
||||
// Statistics returns statistics for periodic monitoring.
|
||||
func (l *WAL) Statistics(tags map[string]string) []models.Statistic {
|
||||
return []models.Statistic{{
|
||||
Name: "tsm1_wal",
|
||||
Tags: tags,
|
||||
Values: map[string]interface{}{
|
||||
statWALOldBytes: atomic.LoadInt64(&l.stats.OldBytes),
|
||||
statWALCurrentBytes: atomic.LoadInt64(&l.stats.CurrentBytes),
|
||||
},
|
||||
}}
|
||||
}
|
||||
|
||||
// Path returns the path the log was initialized with.
|
||||
func (l *WAL) Path() string {
|
||||
l.mu.RLock()
|
||||
|
@ -174,9 +186,7 @@ func (l *WAL) Open() error {
|
|||
|
||||
totalOldDiskSize += stat.Size()
|
||||
}
|
||||
sizeStat := new(expvar.Int)
|
||||
sizeStat.Set(totalOldDiskSize)
|
||||
l.statMap.Set(statWALOldBytes, sizeStat)
|
||||
atomic.StoreInt64(&l.stats.OldBytes, totalOldDiskSize)
|
||||
|
||||
l.closing = make(chan struct{})
|
||||
|
||||
|
@ -254,9 +264,7 @@ func (l *WAL) Remove(files []string) error {
|
|||
|
||||
totalOldDiskSize += stat.Size()
|
||||
}
|
||||
sizeStat := new(expvar.Int)
|
||||
sizeStat.Set(totalOldDiskSize)
|
||||
l.statMap.Set(statWALOldBytes, sizeStat)
|
||||
atomic.StoreInt64(&l.stats.OldBytes, totalOldDiskSize)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -303,9 +311,7 @@ func (l *WAL) writeToLog(entry WALEntry) (int, error) {
|
|||
}
|
||||
|
||||
// Update stats for current segment size
|
||||
curSize := new(expvar.Int)
|
||||
curSize.Set(int64(l.currentSegmentWriter.size))
|
||||
l.statMap.Set(statWALCurrentBytes, curSize)
|
||||
atomic.StoreInt64(&l.stats.CurrentBytes, int64(l.currentSegmentWriter.size))
|
||||
|
||||
l.lastWriteTime = time.Now()
|
||||
|
||||
|
@ -409,7 +415,7 @@ func (l *WAL) newSegmentFile() error {
|
|||
if err := l.currentSegmentWriter.close(); err != nil {
|
||||
return err
|
||||
}
|
||||
l.statMap.Add(statWALOldBytes, int64(l.currentSegmentWriter.size))
|
||||
atomic.StoreInt64(&l.stats.OldBytes, int64(l.currentSegmentWriter.size))
|
||||
}
|
||||
|
||||
fileName := filepath.Join(l.path, fmt.Sprintf("%s%05d.%s", WALFilePrefix, l.currentSegmentID, WALFileExtension))
|
||||
|
@ -420,9 +426,7 @@ func (l *WAL) newSegmentFile() error {
|
|||
l.currentSegmentWriter = NewWALSegmentWriter(fd)
|
||||
|
||||
// Reset the current segment size stat
|
||||
curSize := new(expvar.Int)
|
||||
curSize.Set(0)
|
||||
l.statMap.Set(statWALCurrentBytes, curSize)
|
||||
atomic.StoreInt64(&l.stats.CurrentBytes, 0)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
39
tsdb/meta.go
39
tsdb/meta.go
|
@ -1,13 +1,12 @@
|
|||
package tsdb
|
||||
|
||||
import (
|
||||
"expvar"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/influxql"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/pkg/escape"
|
||||
|
@ -34,7 +33,7 @@ type DatabaseIndex struct {
|
|||
|
||||
name string // name of the database represented by this index
|
||||
|
||||
statMap *expvar.Map
|
||||
stats *IndexStatistics
|
||||
}
|
||||
|
||||
// NewDatabaseIndex returns a new initialized DatabaseIndex.
|
||||
|
@ -43,10 +42,28 @@ func NewDatabaseIndex(name string) *DatabaseIndex {
|
|||
measurements: make(map[string]*Measurement),
|
||||
series: make(map[string]*Series),
|
||||
name: name,
|
||||
statMap: influxdb.NewStatistics("database:"+name, "database", map[string]string{"database": name}),
|
||||
stats: &IndexStatistics{},
|
||||
}
|
||||
}
|
||||
|
||||
// IndexStatistics holds the counters a DatabaseIndex reports to the monitor
// service. Both fields are accessed through sync/atomic operations.
type IndexStatistics struct {
	// NumSeries is adjusted as series are added to or dropped from the index.
	NumSeries int64

	// NumMeasurements is adjusted as measurements are created in or dropped
	// from the index.
	NumMeasurements int64
}
|
||||
|
||||
// Statistics returns statistics for periodic monitoring.
|
||||
func (d *DatabaseIndex) Statistics(tags map[string]string) []models.Statistic {
|
||||
return []models.Statistic{{
|
||||
Name: "database",
|
||||
Tags: models.Tags(map[string]string{"database": d.name}).Merge(tags),
|
||||
Values: map[string]interface{}{
|
||||
statDatabaseSeries: atomic.LoadInt64(&d.stats.NumSeries),
|
||||
statDatabaseMeasurements: atomic.LoadInt64(&d.stats.NumMeasurements),
|
||||
},
|
||||
}}
|
||||
}
|
||||
|
||||
// Series returns a series by key.
|
||||
func (d *DatabaseIndex) Series(key string) *Series {
|
||||
d.mu.RLock()
|
||||
|
@ -139,7 +156,7 @@ func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, ser
|
|||
|
||||
m.AddSeries(series)
|
||||
|
||||
d.statMap.Add(statDatabaseSeries, 1)
|
||||
atomic.AddInt64(&d.stats.NumSeries, 1)
|
||||
d.mu.Unlock()
|
||||
|
||||
return series
|
||||
|
@ -168,7 +185,7 @@ func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) *Measurem
|
|||
if m == nil {
|
||||
m = NewMeasurement(name)
|
||||
d.measurements[name] = m
|
||||
d.statMap.Add(statDatabaseMeasurements, 1)
|
||||
atomic.AddInt64(&d.stats.NumMeasurements, 1)
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
@ -201,14 +218,14 @@ func (d *DatabaseIndex) UnassignShard(k string, shardID uint64) {
|
|||
if !ss.measurement.HasSeries() {
|
||||
d.mu.Lock()
|
||||
d.dropMeasurement(ss.measurement.Name)
|
||||
d.statMap.Add(statDatabaseMeasurements, int64(-1))
|
||||
atomic.AddInt64(&d.stats.NumMeasurements, -1)
|
||||
d.mu.Unlock()
|
||||
}
|
||||
|
||||
// Remove the series key from the series index
|
||||
d.mu.Lock()
|
||||
delete(d.series, k)
|
||||
d.statMap.Add(statDatabaseSeries, int64(-1))
|
||||
atomic.AddInt64(&d.stats.NumSeries, -1)
|
||||
d.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
@ -455,8 +472,8 @@ func (d *DatabaseIndex) dropMeasurement(name string) {
|
|||
delete(d.series, s.Key)
|
||||
}
|
||||
|
||||
d.statMap.Add(statDatabaseSeries, int64(-len(m.seriesByID)))
|
||||
d.statMap.Add(statDatabaseMeasurements, -1)
|
||||
atomic.AddInt64(&d.stats.NumSeries, int64(-len(m.seriesByID)))
|
||||
atomic.AddInt64(&d.stats.NumMeasurements, -1)
|
||||
}
|
||||
|
||||
// DropSeries removes the series keys and their tags from the index
|
||||
|
@ -488,7 +505,7 @@ func (d *DatabaseIndex) DropSeries(keys []string) {
|
|||
for mname := range mToDelete {
|
||||
d.dropMeasurement(mname)
|
||||
}
|
||||
d.statMap.Add(statDatabaseSeries, -nDeleted)
|
||||
atomic.AddInt64(&d.stats.NumSeries, -nDeleted)
|
||||
}
|
||||
|
||||
// Measurement represents a collection of time series in a database. It also contains in memory
|
||||
|
|
|
@ -2,7 +2,6 @@ package tsdb
|
|||
|
||||
import (
|
||||
"errors"
|
||||
"expvar"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
|
@ -11,10 +10,10 @@ import (
|
|||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/influxql"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
internal "github.com/influxdata/influxdb/tsdb/internal"
|
||||
|
@ -97,7 +96,8 @@ type Shard struct {
|
|||
enabled bool
|
||||
|
||||
// expvar-based stats.
|
||||
statMap *expvar.Map
|
||||
stats *ShardStatistics
|
||||
statTags models.Tags
|
||||
|
||||
logger *log.Logger
|
||||
|
||||
|
@ -108,18 +108,7 @@ type Shard struct {
|
|||
|
||||
// NewShard returns a new initialized Shard. walPath doesn't apply to the b1 type index
|
||||
func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, options EngineOptions) *Shard {
|
||||
// Configure statistics collection.
|
||||
key := fmt.Sprintf("shard:%s:%d", path, id)
|
||||
db, rp := DecodeStorePath(path)
|
||||
tags := map[string]string{
|
||||
"path": path,
|
||||
"id": fmt.Sprintf("%d", id),
|
||||
"engine": options.EngineVersion,
|
||||
"database": db,
|
||||
"retentionPolicy": rp,
|
||||
}
|
||||
statMap := influxdb.NewStatistics(key, "shard", tags)
|
||||
|
||||
s := &Shard{
|
||||
index: index,
|
||||
id: id,
|
||||
|
@ -128,13 +117,21 @@ func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, opti
|
|||
options: options,
|
||||
closing: make(chan struct{}),
|
||||
|
||||
stats: &ShardStatistics{},
|
||||
statTags: map[string]string{
|
||||
"path": path,
|
||||
"id": fmt.Sprintf("%d", id),
|
||||
"database": db,
|
||||
"retentionPolicy": rp,
|
||||
},
|
||||
|
||||
database: db,
|
||||
retentionPolicy: rp,
|
||||
|
||||
statMap: statMap,
|
||||
LogOutput: os.Stderr,
|
||||
EnableOnOpen: true,
|
||||
}
|
||||
|
||||
s.SetLogOutput(os.Stderr)
|
||||
return s
|
||||
}
|
||||
|
@ -162,6 +159,37 @@ func (s *Shard) SetEnabled(enabled bool) {
|
|||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
// ShardStatistics holds the counters a Shard reports to the monitor service.
// Every field is accessed through sync/atomic operations.
type ShardStatistics struct {
	// WriteReq counts calls to WritePoints.
	WriteReq int64

	// SeriesCreated counts series attributed to this shard: those found in
	// the index when the shard is opened plus those created during writes.
	SeriesCreated int64

	// FieldsCreated counts fields that writes required to be created.
	FieldsCreated int64

	// WritePointsFail counts WritePoints calls rejected by the engine.
	WritePointsFail int64

	// WritePointsOK counts points successfully written to the engine.
	WritePointsOK int64

	// BytesWritten counts bytes reported by the engine's WriteTo.
	BytesWritten int64

	// DiskBytes is the most recently collected on-disk size of the shard.
	DiskBytes int64
}
|
||||
|
||||
// Statistics returns statistics for periodic monitoring.
|
||||
func (s *Shard) Statistics(tags map[string]string) []models.Statistic {
|
||||
tags = s.statTags.Merge(tags)
|
||||
statistics := []models.Statistic{{
|
||||
Name: "shard",
|
||||
Tags: models.Tags(tags).Merge(map[string]string{"engine": s.options.EngineVersion}),
|
||||
Values: map[string]interface{}{
|
||||
statWriteReq: atomic.LoadInt64(&s.stats.WriteReq),
|
||||
statSeriesCreate: atomic.LoadInt64(&s.stats.SeriesCreated),
|
||||
statFieldsCreate: atomic.LoadInt64(&s.stats.FieldsCreated),
|
||||
statWritePointsFail: atomic.LoadInt64(&s.stats.WritePointsFail),
|
||||
statWritePointsOK: atomic.LoadInt64(&s.stats.WritePointsOK),
|
||||
statWriteBytes: atomic.LoadInt64(&s.stats.BytesWritten),
|
||||
statDiskBytes: atomic.LoadInt64(&s.stats.DiskBytes),
|
||||
},
|
||||
}}
|
||||
statistics = append(statistics, s.engine.Statistics(tags)...)
|
||||
return statistics
|
||||
}
|
||||
|
||||
// Path returns the path set on the shard when it was created.
|
||||
func (s *Shard) Path() string { return s.path }
|
||||
|
||||
|
@ -200,7 +228,7 @@ func (s *Shard) Open() error {
|
|||
}
|
||||
|
||||
count := s.index.SeriesShardN(s.id)
|
||||
s.statMap.Add(statSeriesCreate, int64(count))
|
||||
atomic.AddInt64(&s.stats.SeriesCreated, int64(count))
|
||||
|
||||
s.engine = e
|
||||
|
||||
|
@ -318,13 +346,13 @@ func (s *Shard) WritePoints(points []models.Point) error {
|
|||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
s.statMap.Add(statWriteReq, 1)
|
||||
atomic.AddInt64(&s.stats.WriteReq, 1)
|
||||
|
||||
fieldsToCreate, err := s.validateSeriesAndFields(points)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.statMap.Add(statFieldsCreate, int64(len(fieldsToCreate)))
|
||||
atomic.AddInt64(&s.stats.FieldsCreated, int64(len(fieldsToCreate)))
|
||||
|
||||
// add any new fields and keep track of what needs to be saved
|
||||
if err := s.createFieldsAndMeasurements(fieldsToCreate); err != nil {
|
||||
|
@ -333,10 +361,10 @@ func (s *Shard) WritePoints(points []models.Point) error {
|
|||
|
||||
// Write to the engine.
|
||||
if err := s.engine.WritePoints(points); err != nil {
|
||||
s.statMap.Add(statWritePointsFail, 1)
|
||||
atomic.AddInt64(&s.stats.WritePointsFail, 1)
|
||||
return fmt.Errorf("engine: %s", err)
|
||||
}
|
||||
s.statMap.Add(statWritePointsOK, int64(len(points)))
|
||||
atomic.AddInt64(&s.stats.WritePointsOK, int64(len(points)))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -419,7 +447,7 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]*FieldCreate,
|
|||
ss := s.index.Series(key)
|
||||
if ss == nil {
|
||||
ss = NewSeries(key, p.Tags())
|
||||
s.statMap.Add(statSeriesCreate, 1)
|
||||
atomic.AddInt64(&s.stats.SeriesCreated, 1)
|
||||
}
|
||||
|
||||
ss = s.index.CreateSeriesIndexIfNotExists(p.Name(), ss)
|
||||
|
@ -467,7 +495,7 @@ func (s *Shard) WriteTo(w io.Writer) (int64, error) {
|
|||
return 0, err
|
||||
}
|
||||
n, err := s.engine.WriteTo(w)
|
||||
s.statMap.Add(statWriteBytes, int64(n))
|
||||
atomic.AddInt64(&s.stats.BytesWritten, int64(n))
|
||||
return n, err
|
||||
}
|
||||
|
||||
|
@ -658,9 +686,7 @@ func (s *Shard) monitorSize() {
|
|||
s.logger.Printf("Error collecting shard size: %v", err)
|
||||
continue
|
||||
}
|
||||
sizeStat := new(expvar.Int)
|
||||
sizeStat.Set(size)
|
||||
s.statMap.Set(statDiskBytes, sizeStat)
|
||||
atomic.StoreInt64(&s.stats.DiskBytes, size)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -69,6 +69,25 @@ func (s *Store) SetLogOutput(w io.Writer) {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Store) Statistics(tags map[string]string) []models.Statistic {
|
||||
var statistics []models.Statistic
|
||||
|
||||
s.mu.RLock()
|
||||
indexes := make([]models.Statistic, 0, len(s.databaseIndexes))
|
||||
for _, dbi := range s.databaseIndexes {
|
||||
indexes = append(indexes, dbi.Statistics(tags)...)
|
||||
}
|
||||
shards := s.shardsSlice()
|
||||
s.mu.RUnlock()
|
||||
|
||||
for _, shard := range shards {
|
||||
statistics = append(statistics, shard.Statistics(tags)...)
|
||||
}
|
||||
|
||||
statistics = append(statistics, indexes...)
|
||||
return statistics
|
||||
}
|
||||
|
||||
// Path returns the store's root path.
|
||||
func (s *Store) Path() string { return s.path }
|
||||
|
||||
|
|
Loading…
Reference in New Issue