Add remote monitor writer

pull/6551/head
Edd Robinson 2016-05-04 00:34:17 +01:00
parent 81dbbf570f
commit 0b338dddd8
9 changed files with 262 additions and 93 deletions

View File

@ -398,12 +398,12 @@ func (m *ExpandSourcesResponse) GetErr() string {
}
type RemoteMonitorRequest struct {
RemoteAddr *string `protobuf:"bytes,1,req,name=RemoteAddr" json:"RemoteAddr,omitempty"`
NodeAddr *string `protobuf:"bytes,2,req,name=NodeAddr" json:"NodeAddr,omitempty"`
Username *string `protobuf:"bytes,3,req,name=Username" json:"Username,omitempty"`
Password *string `protobuf:"bytes,4,req,name=Password" json:"Password,omitempty"`
ClusterID *uint64 `protobuf:"varint,5,req,name=ClusterID" json:"ClusterID,omitempty"`
XXX_unrecognized []byte `json:"-"`
RemoteAddrs []string `protobuf:"bytes,1,rep,name=RemoteAddrs" json:"RemoteAddrs,omitempty"`
NodeID *string `protobuf:"bytes,2,req,name=NodeID" json:"NodeID,omitempty"`
Username *string `protobuf:"bytes,3,req,name=Username" json:"Username,omitempty"`
Password *string `protobuf:"bytes,4,req,name=Password" json:"Password,omitempty"`
ClusterID *uint64 `protobuf:"varint,5,req,name=ClusterID" json:"ClusterID,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *RemoteMonitorRequest) Reset() { *m = RemoteMonitorRequest{} }
@ -411,16 +411,16 @@ func (m *RemoteMonitorRequest) String() string { return proto.Compact
func (*RemoteMonitorRequest) ProtoMessage() {}
func (*RemoteMonitorRequest) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{13} }
func (m *RemoteMonitorRequest) GetRemoteAddr() string {
if m != nil && m.RemoteAddr != nil {
return *m.RemoteAddr
func (m *RemoteMonitorRequest) GetRemoteAddrs() []string {
if m != nil {
return m.RemoteAddrs
}
return ""
return nil
}
func (m *RemoteMonitorRequest) GetNodeAddr() string {
if m != nil && m.NodeAddr != nil {
return *m.NodeAddr
func (m *RemoteMonitorRequest) GetNodeID() string {
if m != nil && m.NodeID != nil {
return *m.NodeID
}
return ""
}
@ -482,36 +482,36 @@ func init() {
}
var fileDescriptorData = []byte{
// 483 bytes of a gzipped FileDescriptorProto
// 486 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x94, 0x53, 0x61, 0x6b, 0x13, 0x41,
0x10, 0xa5, 0xb9, 0xc4, 0x7a, 0xd3, 0x68, 0xcd, 0xda, 0x24, 0xf7, 0x31, 0x1c, 0x0a, 0xf9, 0x14,
0xa5, 0x52, 0xa1, 0x08, 0x82, 0x24, 0x11, 0xaa, 0x36, 0x86, 0xa8, 0x08, 0x7e, 0x91, 0x35, 0x37,
0xe8, 0x42, 0x72, 0x7b, 0xee, 0x6c, 0xb0, 0xf9, 0xf7, 0xce, 0xee, 0xed, 0xb5, 0xe9, 0x55, 0xa1,
0xfd, 0x78, 0xef, 0xf6, 0xcd, 0x9b, 0x37, 0xf3, 0x06, 0x1e, 0xab, 0xdc, 0xa2, 0xc9, 0xe5, 0xea,
0x59, 0x26, 0xad, 0x1c, 0x15, 0x46, 0x5b, 0x2d, 0xf6, 0x97, 0xab, 0x0d, 0x31, 0x9c, 0x7e, 0x87,
0xce, 0x57, 0xa3, 0x2c, 0x7e, 0xfa, 0x25, 0x4d, 0xb6, 0xc0, 0xdf, 0x1b, 0x24, 0x2b, 0x0e, 0x61,
0xdf, 0x7f, 0x9f, 0x4d, 0x92, 0xbd, 0x41, 0x63, 0xd8, 0x14, 0x0f, 0xe1, 0xde, 0x5c, 0x73, 0x1d,
0x4a, 0x1a, 0x83, 0x68, 0xd8, 0x16, 0x8f, 0xe0, 0xfe, 0x84, 0x8b, 0xfd, 0x90, 0x84, 0x49, 0x34,
0xd8, 0x1b, 0xc6, 0xa2, 0x0f, 0x87, 0x0b, 0xb4, 0x98, 0x5b, 0xa5, 0xf3, 0xb9, 0x5e, 0xa9, 0xe5,
0x36, 0x69, 0xba, 0x1f, 0xe9, 0x0b, 0x10, 0xbb, 0x02, 0x54, 0xe8, 0x9c, 0x50, 0xb4, 0xa1, 0x39,
0xd6, 0x19, 0xfa, 0xf2, 0x2d, 0xa7, 0x77, 0x8e, 0x44, 0xf2, 0x27, 0x72, 0x7d, 0x47, 0x7a, 0x0d,
0xfd, 0xe9, 0x05, 0x2e, 0x37, 0x4c, 0xb3, 0xd2, 0xe2, 0x9a, 0xeb, 0x56, 0xbd, 0x75, 0x20, 0xbe,
0xc4, 0x3c, 0x3d, 0xbe, 0xd6, 0x4d, 0xc3, 0x21, 0xe9, 0x29, 0x24, 0x37, 0xf9, 0xb7, 0x93, 0x7e,
0x09, 0xdd, 0xb1, 0x41, 0x26, 0x9d, 0xf1, 0x78, 0xa4, 0xd5, 0xa6, 0x12, 0x66, 0x95, 0x30, 0x14,
0x62, 0x6e, 0xc4, 0x53, 0x39, 0x80, 0xe8, 0x63, 0x61, 0xbd, 0x64, 0x3b, 0xfd, 0x06, 0xbd, 0x3a,
0x2f, 0x08, 0xf2, 0xb3, 0xa9, 0x31, 0xcc, 0x71, 0x73, 0x62, 0xf5, 0xcf, 0xdb, 0xa2, 0xec, 0xb3,
0x25, 0x9e, 0x42, 0xcb, 0x35, 0x48, 0x7e, 0x88, 0x07, 0xc7, 0xbd, 0x51, 0x58, 0xcb, 0xa8, 0x2a,
0xe2, 0xff, 0xa6, 0xcf, 0xe1, 0xc1, 0x35, 0xc0, 0x2f, 0x08, 0x8d, 0x42, 0x9a, 0xf9, 0xb2, 0xd1,
0xe5, 0x82, 0x66, 0xde, 0x45, 0x94, 0xbe, 0x82, 0xde, 0x5b, 0x85, 0xab, 0x6c, 0xa2, 0xd8, 0x3b,
0xf1, 0x4e, 0xe8, 0xff, 0x36, 0x5c, 0x31, 0xbd, 0x31, 0x4b, 0xa4, 0x60, 0xe5, 0x1d, 0xf4, 0x6f,
0x90, 0x83, 0x17, 0xd6, 0xf1, 0xbf, 0x4a, 0x6e, 0x2c, 0x04, 0xc0, 0xd5, 0x2b, 0x1f, 0x8e, 0xb8,
0xf2, 0xeb, 0x73, 0x91, 0x1e, 0x43, 0xa7, 0xec, 0xf4, 0x3d, 0x6e, 0xe9, 0x96, 0xa3, 0x3c, 0x01,
0xb1, 0xcb, 0x09, 0xd2, 0x2c, 0x55, 0xa2, 0x1f, 0x14, 0x59, 0x6f, 0xbb, 0x5d, 0x49, 0x95, 0x9b,
0x3b, 0x85, 0xa3, 0xe9, 0x45, 0x21, 0xf3, 0x2c, 0xb8, 0xb9, 0x83, 0xe3, 0x13, 0xe8, 0xd6, 0xa8,
0x41, 0x74, 0xe7, 0xa5, 0xcb, 0x4b, 0x4d, 0x91, 0xe0, 0x68, 0x81, 0x6b, 0x6d, 0xf1, 0x5c, 0xe7,
0x6a, 0x27, 0x2a, 0xdc, 0x6a, 0x89, 0xbf, 0xc9, 0x32, 0x73, 0x15, 0xd2, 0x19, 0xc7, 0xce, 0x23,
0x8d, 0x0a, 0xf9, 0x42, 0xee, 0x32, 0xd7, 0xee, 0x88, 0x02, 0x32, 0x97, 0x44, 0x7f, 0xb4, 0xc9,
0xf8, 0x7a, 0x1c, 0xc2, 0x69, 0x1f, 0x97, 0x91, 0xe0, 0x5b, 0x6c, 0xb9, 0x5b, 0x4c, 0x9f, 0x40,
0xb7, 0x26, 0xfa, 0x8f, 0x9c, 0xfd, 0x0d, 0x00, 0x00, 0xff, 0xff, 0xba, 0x31, 0xc1, 0x11, 0xf6,
0x03, 0x00, 0x00,
0x10, 0xa5, 0xb9, 0xc4, 0x7a, 0x93, 0x68, 0xcd, 0xb6, 0x49, 0xee, 0x63, 0x38, 0x14, 0xf2, 0x29,
0x4a, 0xa5, 0x42, 0x11, 0x04, 0x49, 0x22, 0x54, 0x6d, 0x0c, 0x51, 0x11, 0xfc, 0x22, 0x6b, 0x6e,
0xd0, 0x85, 0xe4, 0xf6, 0xdc, 0xd9, 0x60, 0xf3, 0xef, 0x9d, 0xdd, 0xdb, 0xab, 0x69, 0xaa, 0x90,
0x7e, 0xbc, 0xb9, 0x7d, 0xf3, 0xe6, 0xbd, 0x79, 0x03, 0xc7, 0x2a, 0xb7, 0x68, 0x72, 0xb9, 0x7c,
0x9a, 0x49, 0x2b, 0x87, 0x85, 0xd1, 0x56, 0x8b, 0xc3, 0xc5, 0x72, 0x4d, 0x5c, 0x4e, 0xbf, 0x41,
0xfb, 0x8b, 0x51, 0x16, 0x3f, 0xfe, 0x94, 0x26, 0x9b, 0xe3, 0xaf, 0x35, 0x92, 0x15, 0x47, 0x70,
0xe8, 0xbf, 0x2f, 0xc6, 0xc9, 0x41, 0xbf, 0x36, 0xa8, 0x8b, 0x87, 0x70, 0x6f, 0xa6, 0xb9, 0x0f,
0x25, 0xb5, 0x7e, 0x34, 0x68, 0x89, 0x47, 0x70, 0x7f, 0xcc, 0xcd, 0xbe, 0x4b, 0xc2, 0x24, 0xea,
0x1f, 0x0c, 0x62, 0xd1, 0x83, 0xa3, 0x39, 0x5a, 0xcc, 0xad, 0xd2, 0xf9, 0x4c, 0x2f, 0xd5, 0x62,
0x93, 0xd4, 0xdd, 0x8f, 0xf4, 0x39, 0x88, 0x6d, 0x02, 0x2a, 0x74, 0x4e, 0x28, 0x5a, 0x50, 0x1f,
0xe9, 0x0c, 0x7d, 0xfb, 0x86, 0xe3, 0xbb, 0x44, 0x22, 0xf9, 0x03, 0xb9, 0xbf, 0x03, 0xbd, 0x82,
0xde, 0xe4, 0x0a, 0x17, 0x6b, 0x86, 0x59, 0x69, 0x71, 0xc5, 0x7d, 0xab, 0xd9, 0xda, 0x10, 0x5f,
0xd7, 0x3c, 0x3c, 0xbe, 0x31, 0x4d, 0xcd, 0x55, 0xd2, 0x73, 0x48, 0x6e, 0xe3, 0xf7, 0xa3, 0x7e,
0x01, 0x9d, 0x91, 0x41, 0x06, 0x5d, 0xb0, 0x3d, 0xd2, 0x6a, 0x53, 0x11, 0x33, 0x4b, 0x30, 0x85,
0x18, 0x1b, 0xb1, 0x2b, 0x4d, 0x88, 0x3e, 0x14, 0xd6, 0x53, 0xb6, 0xd2, 0xaf, 0xd0, 0xdd, 0xc5,
0x05, 0x42, 0x7e, 0x36, 0x31, 0x86, 0x31, 0xce, 0x27, 0x66, 0xff, 0xb4, 0x29, 0xca, 0x39, 0x1b,
0xe2, 0x09, 0x34, 0xdc, 0x80, 0xe4, 0x4d, 0x6c, 0x9e, 0x76, 0x87, 0x61, 0x2d, 0xc3, 0xaa, 0x89,
0xff, 0x9b, 0x3e, 0x83, 0x07, 0x37, 0x0a, 0x7e, 0x41, 0x68, 0x14, 0xd2, 0xd4, 0xb7, 0x8d, 0xae,
0x17, 0x34, 0xf5, 0x2a, 0xa2, 0xf4, 0x25, 0x74, 0xdf, 0x28, 0x5c, 0x66, 0x63, 0xc5, 0xda, 0x89,
0x77, 0x42, 0xff, 0x97, 0xe1, 0x9a, 0xe9, 0xb5, 0x59, 0x20, 0x05, 0x29, 0x6f, 0xa1, 0x77, 0x0b,
0x1c, 0xb4, 0x30, 0x8f, 0xff, 0x55, 0x62, 0x63, 0x21, 0x00, 0xfe, 0xbe, 0xf2, 0xe1, 0x88, 0x2b,
0xbd, 0x3e, 0x17, 0xe9, 0x29, 0xb4, 0xcb, 0x49, 0xdf, 0xe1, 0x86, 0xf6, 0xb4, 0xf2, 0x0c, 0xc4,
0x36, 0x26, 0x50, 0x33, 0x55, 0x59, 0x7d, 0xaf, 0xc8, 0x7a, 0xd9, 0xad, 0x8a, 0xaa, 0xdc, 0xdc,
0x39, 0x9c, 0x4c, 0xae, 0x0a, 0x99, 0x67, 0x41, 0xcd, 0x1d, 0x14, 0x9f, 0x41, 0x67, 0x07, 0x1a,
0x48, 0xb7, 0x5e, 0xba, 0xbc, 0xec, 0x30, 0x1a, 0x38, 0x99, 0xe3, 0x4a, 0x5b, 0xbc, 0xd4, 0xb9,
0xda, 0x8a, 0xca, 0x31, 0x34, 0xcb, 0xfa, 0xeb, 0x2c, 0x33, 0x95, 0x55, 0x6c, 0xdd, 0x94, 0x73,
0xc7, 0x37, 0x55, 0xab, 0x52, 0xfb, 0x99, 0xdc, 0x61, 0xae, 0xdc, 0x0d, 0x85, 0xca, 0x4c, 0x12,
0xfd, 0xd6, 0x26, 0xe3, 0xe3, 0x71, 0x15, 0x0e, 0xfb, 0xa8, 0x4c, 0x04, 0xc3, 0x1a, 0xee, 0x14,
0xd3, 0xc7, 0xd0, 0xd9, 0xe1, 0xfc, 0x47, 0xcc, 0xfe, 0x04, 0x00, 0x00, 0xff, 0xff, 0x76, 0x81,
0x9b, 0x7c, 0xf5, 0x03, 0x00, 0x00,
}

View File

@ -71,11 +71,11 @@ message ExpandSourcesResponse {
}
message RemoteMonitorRequest {
required string RemoteAddr = 1;
required string NodeAddr = 2;
required string Username = 3;
required string Password = 4;
required uint64 ClusterID = 5;
repeated string RemoteAddrs = 1;
required string NodeID = 2;
required string Username = 3;
required string Password = 4;
required uint64 ClusterID = 5;
}
message RemoteMonitorResponse {

View File

@ -498,6 +498,26 @@ type RemoteMonitorRequest struct {
pb internal.RemoteMonitorRequest
}
func (m *RemoteMonitorRequest) SetRemoteAddrs(s []string) {
m.pb.RemoteAddrs = s
}
func (m *RemoteMonitorRequest) SetNodeID(s string) {
m.pb.NodeID = &s
}
func (m *RemoteMonitorRequest) SetUsername(s string) {
m.pb.Username = &s
}
func (m *RemoteMonitorRequest) SetPassword(s string) {
m.pb.Password = &s
}
func (m *RemoteMonitorRequest) SetClusterID(v uint64) {
m.pb.ClusterID = &v
}
// MarshalBinary encodes the object to a binary format.
func (r *RemoteMonitorRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(&r.pb)

View File

@ -13,6 +13,8 @@ import (
"strings"
"sync"
"github.com/influxdata/influxdb/monitor"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/tsdb"
@ -70,6 +72,7 @@ type Service struct {
Listener net.Listener
TSDBStore TSDBStore
Monitor *monitor.Monitor
Logger *log.Logger
statMap *expvar.Map
@ -214,8 +217,16 @@ func (s *Service) handleConn(conn net.Conn) {
s.processSeriesKeysRequest(conn)
return
case remoteMonitorRequestMessage:
s.processRemoteMonitorRequest(conn)
return
buf, err := ReadLV(conn)
if err != nil {
s.Logger.Printf("unable to read length-value: %s", err)
return
}
if err = s.processRemoteMonitorRequest(buf); err != nil {
s.Logger.Printf("process write shard error: %s", err)
}
s.writeRemoteMonitorResponse(conn, err)
default:
s.Logger.Printf("cluster service message type not found: %d", typ)
}
@ -497,21 +508,44 @@ func (s *Service) processSeriesKeysRequest(conn net.Conn) {
}
}
func (s *Service) processRemoteMonitorRequest(conn net.Conn) {
func (s *Service) processRemoteMonitorRequest(buf []byte) error {
// Unmarshal the request.
var req RemoteMonitorRequest
if err := DecodeLV(conn, &req); err != nil {
s.Logger.Printf("error reading RemoteMonitor request: %s", err)
EncodeTLV(conn, remoteMonitorResponseMessage, &RemoteMonitorResponse{Err: err})
return
if err := req.UnmarshalBinary(buf); err != nil {
return err
}
// Process the request
var err error
var remoteAddr string
if len(req.pb.GetRemoteAddrs()) > 0 {
remoteAddr = req.pb.GetRemoteAddrs()[0]
}
return s.Monitor.SetRemoteWriter(monitor.RemoteWriterConfig{
RemoteAddr: remoteAddr,
NodeID: req.pb.GetNodeID(),
Username: req.pb.GetUsername(),
Password: req.pb.GetPassword(),
ClusterID: req.pb.GetClusterID(),
})
}
// Encode response.
if e := EncodeTLV(conn, remoteMonitorResponseMessage, &RemoteMonitorResponse{Err: err}); e != nil {
s.Logger.Printf("error writing RemoteMonitor response: %v", e)
func (s *Service) writeRemoteMonitorResponse(w io.Writer, e error) {
// Build response.
var resp RemoteMonitorResponse
if e != nil {
resp.Err = e
}
// Marshal response to binary.
buf, err := resp.MarshalBinary()
if err != nil {
s.Logger.Printf("error marshalling remote monitor response: %s", err)
return
}
// Write to connection.
if err := WriteTLV(w, remoteMonitorResponseMessage, buf); err != nil {
s.Logger.Printf("write remote monitor response error: %s", err)
}
}

View File

@ -126,6 +126,10 @@ func (c *Config) Validate() error {
return err
}
if err := c.Monitor.Validate(); err != nil {
return err
}
for _, g := range c.GraphiteInputs {
if err := g.Validate(); err != nil {
return fmt.Errorf("invalid graphite config: %v", err)

View File

@ -202,6 +202,7 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
func (s *Server) appendClusterService(c cluster.Config) {
srv := cluster.NewService(c)
srv.TSDBStore = cluster.LocalTSDBStore{Store: s.TSDBStore}
srv.Monitor = s.Monitor
s.Services = append(s.Services, srv)
s.ClusterService = srv
}

View File

@ -1,6 +1,7 @@
package monitor
import (
"errors"
"time"
"github.com/influxdata/influxdb/toml"
@ -33,3 +34,11 @@ func NewConfig() Config {
StoreInterval: toml.Duration(DefaultStoreInterval),
}
}
// Validate validates that the configuration is acceptable.
func (c Config) Validate() error {
if c.StoreInterval <= 0 {
return errors.New("monitor store interval must be positive")
}
return nil
}

50
monitor/remote_writer.go Normal file
View File

@ -0,0 +1,50 @@
package monitor
import (
"time"
"github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/influxdb/models"
)
// A remotePointsWriter implements the models.PointsWriter interface
// but redirects points to be writte to a remote node, using an influx
// client.
type remotePointsWriter struct {
client client.Client
}
func newRemotePointsWriter(addr, user, password string) (*remotePointsWriter, error) {
conf := client.HTTPConfig{
Addr: addr,
Username: user,
Password: password,
Timeout: 30 * time.Second,
}
clt, err := client.NewHTTPClient(conf)
if err != nil {
return nil, err
}
return &remotePointsWriter{client: clt}, nil
}
// WritePoints writes the provided points to a remote node via an
// influx client over HTTP.
func (w *remotePointsWriter) WritePoints(database, retentionPolicy string, points models.Points) error {
conf := client.BatchPointsConfig{
Database: database,
RetentionPolicy: retentionPolicy,
}
bp, err := client.NewBatchPoints(conf)
if err != nil {
return err
}
for _, point := range points {
bp.AddPoint(client.NewPointFrom(point))
}
return w.client.Write(bp)
}

View File

@ -2,6 +2,7 @@ package monitor // import "github.com/influxdata/influxdb/monitor"
import (
"expvar"
"fmt"
"io"
"log"
"os"
@ -36,9 +37,8 @@ type Monitor struct {
wg sync.WaitGroup
mu sync.Mutex
globalTags map[string]string
diagRegistrations map[string]diagnostics.Client
clusterID string
nodeAddr string
done chan struct{}
storeCreated bool
storeEnabled bool
@ -51,7 +51,6 @@ type Monitor struct {
storeInterval time.Duration
MetaClient interface {
ClusterID() uint64
CreateDatabase(name string) (*meta.DatabaseInfo, error)
CreateRetentionPolicy(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error)
SetDefaultRetentionPolicy(database, name string) error
@ -71,7 +70,7 @@ type Monitor struct {
// New returns a new instance of the monitor system.
func New(c Config) *Monitor {
return &Monitor{
done: make(chan struct{}),
globalTags: make(map[string]string),
diagRegistrations: make(map[string]diagnostics.Client),
storeEnabled: c.StoreEnabled,
storeDatabase: c.StoreDatabase,
@ -81,9 +80,20 @@ func New(c Config) *Monitor {
}
}
func (m *Monitor) open() bool {
m.mu.Lock()
defer m.mu.Unlock()
return m.done != nil
}
// Open opens the monitoring system, using the given clusterID, node ID, and hostname
// for identification purpose.
func (m *Monitor) Open() error {
if m.open() {
m.Logger.Println("Monitor is already open")
return nil
}
m.Logger.Printf("Starting monitor system")
// Self-register various stats and diagnostics.
@ -113,6 +123,11 @@ func (m *Monitor) Open() error {
// Close closes the monitor system.
func (m *Monitor) Close() error {
if !m.open() {
m.Logger.Println("Monitor is already closed.")
return nil
}
m.Logger.Println("shutting down monitor system")
m.mu.Lock()
close(m.done)
@ -131,6 +146,49 @@ func (m *Monitor) Close() error {
return nil
}
// SetGlobalTag can be used to set tags that will appear on all points
// written by the Monitor.
func (m *Monitor) SetGlobalTag(key string, value interface{}) {
m.mu.Lock()
m.globalTags[key] = fmt.Sprintf("%v", value)
m.mu.Unlock()
}
// RemoteWriterConfig represents the configuration of a remote writer
type RemoteWriterConfig struct {
RemoteAddr string
NodeID string
Username string
Password string
ClusterID uint64
}
// SetRemoteWriter can be used via and RPC call to set a remote location
// for writing monitoring information.
func (m *Monitor) SetRemoteWriter(c RemoteWriterConfig) error {
// Ignore the monitor's config settings.
m.mu.Lock()
m.storeEnabled = true
m.storeInterval = DefaultStoreInterval
m.storeDatabase = DefaultStoreDatabase
m.mu.Unlock()
m.Logger.Printf("Setting monitor to write remotely via %s", c.RemoteAddr)
clt, err := newRemotePointsWriter(c.RemoteAddr, c.Username, c.Password)
if err != nil {
return err
}
m.SetGlobalTag("nodeID", c.NodeID)
m.SetGlobalTag("clusterID", c.ClusterID)
m.mu.Lock()
m.PointsWriter = clt
m.mu.Unlock()
// Subsequent calls to an already open Monitor are just a no-op.
return m.Open()
}
// SetLogOutput sets the writer to which all logs are written. It must not be
// called after Open is called.
func (m *Monitor) SetLogOutput(w io.Writer) {
@ -326,19 +384,9 @@ func (m *Monitor) storeStatistics() {
m.storeDatabase, m.storeRetentionPolicy, m.storeInterval)
hostname, _ := os.Hostname()
clusterTags := map[string]string{
"hostname": hostname,
}
m.SetGlobalTag("hostname", hostname)
m.mu.Lock()
if m.clusterID != "" {
clusterTags["clusterID"] = m.clusterID
}
if m.nodeAddr != "" {
clusterTags["nodeAddr"] = m.nodeAddr
}
tick := time.NewTicker(m.storeInterval)
m.mu.Unlock()
@ -346,29 +394,32 @@ func (m *Monitor) storeStatistics() {
for {
select {
case <-tick.C:
m.mu.Lock()
m.createInternalStorage()
func() {
m.mu.Lock()
defer m.mu.Unlock()
stats, err := m.Statistics(clusterTags)
if err != nil {
m.Logger.Printf("failed to retrieve registered statistics: %s", err)
continue
}
m.createInternalStorage()
points := make(models.Points, 0, len(stats))
for _, s := range stats {
pt, err := models.NewPoint(s.Name, s.Tags, s.Values, time.Now().Truncate(time.Second))
stats, err := m.Statistics(m.globalTags)
if err != nil {
m.Logger.Printf("Dropping point %v: %v", s.Name, err)
continue
m.Logger.Printf("failed to retrieve registered statistics: %s", err)
return
}
points = append(points, pt)
}
if err := m.PointsWriter.WritePoints(m.storeDatabase, m.storeRetentionPolicy, points); err != nil {
m.Logger.Printf("failed to store statistics: %s", err)
}
m.mu.Unlock()
points := make(models.Points, 0, len(stats))
for _, s := range stats {
pt, err := models.NewPoint(s.Name, s.Tags, s.Values, time.Now().Truncate(time.Second))
if err != nil {
m.Logger.Printf("Dropping point %v: %v", s.Name, err)
return
}
points = append(points, pt)
}
if err := m.PointsWriter.WritePoints(m.storeDatabase, m.storeRetentionPolicy, points); err != nil {
m.Logger.Printf("failed to store statistics: %s", err)
}
}()
case <-m.done:
m.Logger.Printf("terminating storage of statistics")
return