Merge pull request #6731 from influxdata/jl-monitor
Update monitor to allow setting the PointsWriterpull/6736/head
commit
9f9aa88a4a
|
@ -1,50 +0,0 @@
|
|||
package monitor
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/client/v2"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
)
|
||||
|
||||
// A remotePointsWriter implements the models.PointsWriter interface
|
||||
// but redirects points to be writte to a remote node, using an influx
|
||||
// client.
|
||||
type remotePointsWriter struct {
|
||||
client client.Client
|
||||
}
|
||||
|
||||
func newRemotePointsWriter(addr, user, password string) (*remotePointsWriter, error) {
|
||||
conf := client.HTTPConfig{
|
||||
Addr: addr,
|
||||
Username: user,
|
||||
Password: password,
|
||||
Timeout: 30 * time.Second,
|
||||
}
|
||||
clt, err := client.NewHTTPClient(conf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &remotePointsWriter{client: clt}, nil
|
||||
}
|
||||
|
||||
// WritePoints writes the provided points to a remote node via an
|
||||
// influx client over HTTP.
|
||||
func (w *remotePointsWriter) WritePoints(database, retentionPolicy string, points models.Points) error {
|
||||
conf := client.BatchPointsConfig{
|
||||
Database: database,
|
||||
RetentionPolicy: retentionPolicy,
|
||||
}
|
||||
|
||||
bp, err := client.NewBatchPoints(conf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, point := range points {
|
||||
bp.AddPoint(client.NewPointFrom(point))
|
||||
}
|
||||
|
||||
return w.client.Write(bp)
|
||||
}
|
|
@ -58,15 +58,16 @@ type Monitor struct {
|
|||
}
|
||||
|
||||
// Writer for pushing stats back into the database.
|
||||
// This causes a circular dependency if it depends on cluster directly so it
|
||||
// is wrapped in a simpler interface.
|
||||
PointsWriter interface {
|
||||
WritePoints(database, retentionPolicy string, points models.Points) error
|
||||
}
|
||||
PointsWriter PointsWriter
|
||||
|
||||
Logger *log.Logger
|
||||
}
|
||||
|
||||
// PointsWriter is a simplified interface for writing the points the monitor gathers
|
||||
type PointsWriter interface {
|
||||
WritePoints(database, retentionPolicy string, points models.Points) error
|
||||
}
|
||||
|
||||
// New returns a new instance of the monitor system.
|
||||
func New(c Config) *Monitor {
|
||||
return &Monitor{
|
||||
|
@ -163,28 +164,17 @@ type RemoteWriterConfig struct {
|
|||
ClusterID uint64
|
||||
}
|
||||
|
||||
// SetRemoteWriter can be used via and RPC call to set a remote location
|
||||
// for writing monitoring information.
|
||||
func (m *Monitor) SetRemoteWriter(c RemoteWriterConfig) error {
|
||||
// Ignore the monitor's config settings.
|
||||
// SetPointsWriter can be used to set a remote location for writing monitoring
|
||||
// information. This will use the default interval and database.
|
||||
func (m *Monitor) SetPointsWriter(pw PointsWriter) error {
|
||||
m.mu.Lock()
|
||||
m.storeEnabled = true
|
||||
m.storeInterval = DefaultStoreInterval
|
||||
m.storeDatabase = DefaultStoreDatabase
|
||||
m.mu.Unlock()
|
||||
|
||||
m.Logger.Printf("Setting monitor to write remotely via %s", c.RemoteAddr)
|
||||
clt, err := newRemotePointsWriter(c.RemoteAddr, c.Username, c.Password)
|
||||
if err != nil {
|
||||
return err
|
||||
if !m.storeEnabled {
|
||||
// not enabled, nothing to do
|
||||
return nil
|
||||
}
|
||||
|
||||
m.SetGlobalTag("nodeID", c.NodeID)
|
||||
m.SetGlobalTag("clusterID", c.ClusterID)
|
||||
|
||||
m.mu.Lock()
|
||||
m.PointsWriter = clt
|
||||
m.PointsWriter = pw
|
||||
m.mu.Unlock()
|
||||
|
||||
// Subsequent calls to an already open Monitor are just a no-op.
|
||||
return m.Open()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue