Merge pull request #5087 from influxdb/usage-client
Remove registration, add 'usage-client'pull/5089/head
commit
69a7664f2d
|
@ -22,7 +22,6 @@ import (
|
||||||
"github.com/influxdb/influxdb/services/httpd"
|
"github.com/influxdb/influxdb/services/httpd"
|
||||||
"github.com/influxdb/influxdb/services/opentsdb"
|
"github.com/influxdb/influxdb/services/opentsdb"
|
||||||
"github.com/influxdb/influxdb/services/precreator"
|
"github.com/influxdb/influxdb/services/precreator"
|
||||||
"github.com/influxdb/influxdb/services/registration"
|
|
||||||
"github.com/influxdb/influxdb/services/retention"
|
"github.com/influxdb/influxdb/services/retention"
|
||||||
"github.com/influxdb/influxdb/services/subscriber"
|
"github.com/influxdb/influxdb/services/subscriber"
|
||||||
"github.com/influxdb/influxdb/services/udp"
|
"github.com/influxdb/influxdb/services/udp"
|
||||||
|
@ -35,7 +34,6 @@ type Config struct {
|
||||||
Data tsdb.Config `toml:"data"`
|
Data tsdb.Config `toml:"data"`
|
||||||
Cluster cluster.Config `toml:"cluster"`
|
Cluster cluster.Config `toml:"cluster"`
|
||||||
Retention retention.Config `toml:"retention"`
|
Retention retention.Config `toml:"retention"`
|
||||||
Registration registration.Config `toml:"registration"`
|
|
||||||
Precreator precreator.Config `toml:"shard-precreation"`
|
Precreator precreator.Config `toml:"shard-precreation"`
|
||||||
|
|
||||||
Admin admin.Config `toml:"admin"`
|
Admin admin.Config `toml:"admin"`
|
||||||
|
@ -62,7 +60,6 @@ func NewConfig() *Config {
|
||||||
c.Meta = meta.NewConfig()
|
c.Meta = meta.NewConfig()
|
||||||
c.Data = tsdb.NewConfig()
|
c.Data = tsdb.NewConfig()
|
||||||
c.Cluster = cluster.NewConfig()
|
c.Cluster = cluster.NewConfig()
|
||||||
c.Registration = registration.NewConfig()
|
|
||||||
c.Precreator = precreator.NewConfig()
|
c.Precreator = precreator.NewConfig()
|
||||||
|
|
||||||
c.Admin = admin.NewConfig()
|
c.Admin = admin.NewConfig()
|
||||||
|
|
|
@ -10,7 +10,6 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdb/enterprise-client/v1"
|
|
||||||
"github.com/influxdb/influxdb/cluster"
|
"github.com/influxdb/influxdb/cluster"
|
||||||
"github.com/influxdb/influxdb/meta"
|
"github.com/influxdb/influxdb/meta"
|
||||||
"github.com/influxdb/influxdb/monitor"
|
"github.com/influxdb/influxdb/monitor"
|
||||||
|
@ -23,13 +22,13 @@ import (
|
||||||
"github.com/influxdb/influxdb/services/httpd"
|
"github.com/influxdb/influxdb/services/httpd"
|
||||||
"github.com/influxdb/influxdb/services/opentsdb"
|
"github.com/influxdb/influxdb/services/opentsdb"
|
||||||
"github.com/influxdb/influxdb/services/precreator"
|
"github.com/influxdb/influxdb/services/precreator"
|
||||||
"github.com/influxdb/influxdb/services/registration"
|
|
||||||
"github.com/influxdb/influxdb/services/retention"
|
"github.com/influxdb/influxdb/services/retention"
|
||||||
"github.com/influxdb/influxdb/services/snapshotter"
|
"github.com/influxdb/influxdb/services/snapshotter"
|
||||||
"github.com/influxdb/influxdb/services/subscriber"
|
"github.com/influxdb/influxdb/services/subscriber"
|
||||||
"github.com/influxdb/influxdb/services/udp"
|
"github.com/influxdb/influxdb/services/udp"
|
||||||
"github.com/influxdb/influxdb/tcp"
|
"github.com/influxdb/influxdb/tcp"
|
||||||
"github.com/influxdb/influxdb/tsdb"
|
"github.com/influxdb/influxdb/tsdb"
|
||||||
|
"github.com/influxdb/usage-client/v1"
|
||||||
// Initialize the engine packages
|
// Initialize the engine packages
|
||||||
_ "github.com/influxdb/influxdb/tsdb/engine"
|
_ "github.com/influxdb/influxdb/tsdb/engine"
|
||||||
)
|
)
|
||||||
|
@ -158,7 +157,6 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
|
||||||
// Append services.
|
// Append services.
|
||||||
s.appendClusterService(c.Cluster)
|
s.appendClusterService(c.Cluster)
|
||||||
s.appendPrecreatorService(c.Precreator)
|
s.appendPrecreatorService(c.Precreator)
|
||||||
s.appendRegistrationService(c.Registration)
|
|
||||||
s.appendSnapshotterService()
|
s.appendSnapshotterService()
|
||||||
s.appendCopierService()
|
s.appendCopierService()
|
||||||
s.appendAdminService(c.Admin)
|
s.appendAdminService(c.Admin)
|
||||||
|
@ -296,21 +294,6 @@ func (s *Server) appendPrecreatorService(c precreator.Config) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) appendRegistrationService(c registration.Config) error {
|
|
||||||
if !c.Enabled {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
srv, err := registration.NewService(c, s.buildInfo.Version)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
srv.MetaStore = s.MetaStore
|
|
||||||
srv.Monitor = s.Monitor
|
|
||||||
s.Services = append(s.Services, srv)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) appendUDPService(c udp.Config) {
|
func (s *Server) appendUDPService(c udp.Config) {
|
||||||
if !c.Enabled {
|
if !c.Enabled {
|
||||||
return
|
return
|
||||||
|
|
|
@ -8,15 +8,6 @@
|
||||||
# Change this option to true to disable reporting.
|
# Change this option to true to disable reporting.
|
||||||
reporting-disabled = false
|
reporting-disabled = false
|
||||||
|
|
||||||
###
|
|
||||||
### Enterprise registration control
|
|
||||||
###
|
|
||||||
|
|
||||||
[registration]
|
|
||||||
# enabled = true
|
|
||||||
# url = "https://enterprise.influxdata.com" # The Enterprise server URL
|
|
||||||
# token = "" # Registration token for Enterprise server
|
|
||||||
|
|
||||||
###
|
###
|
||||||
### [meta]
|
### [meta]
|
||||||
###
|
###
|
||||||
|
|
|
@ -1,29 +0,0 @@
|
||||||
package registration
|
|
||||||
|
|
||||||
import (
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/influxdb/influxdb/toml"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
defaultURL = "https://enterprise.influxdata.com"
|
|
||||||
defaultStatsInterval = time.Minute
|
|
||||||
)
|
|
||||||
|
|
||||||
// Config represents the configuration for the registration service.
|
|
||||||
type Config struct {
|
|
||||||
Enabled bool `toml:"enabled"`
|
|
||||||
URL string `toml:"url"`
|
|
||||||
Token string `toml:"token"`
|
|
||||||
StatsInterval toml.Duration `toml:"stats-interval"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewConfig returns an instance of Config with defaults.
|
|
||||||
func NewConfig() Config {
|
|
||||||
return Config{
|
|
||||||
Enabled: true,
|
|
||||||
URL: defaultURL,
|
|
||||||
StatsInterval: toml.Duration(defaultStatsInterval),
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,33 +0,0 @@
|
||||||
package registration_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/BurntSushi/toml"
|
|
||||||
"github.com/influxdb/influxdb/services/registration"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestConfig_Parse(t *testing.T) {
|
|
||||||
// Parse configuration.
|
|
||||||
var c registration.Config
|
|
||||||
if _, err := toml.Decode(`
|
|
||||||
enabled = true
|
|
||||||
url = "a.b.c"
|
|
||||||
token = "1234"
|
|
||||||
stats-interval = "1s"
|
|
||||||
`, &c); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Validate configuration.
|
|
||||||
if c.Enabled != true {
|
|
||||||
t.Fatalf("unexpected enabled state: %v", c.Enabled)
|
|
||||||
} else if c.URL != "a.b.c" {
|
|
||||||
t.Fatalf("unexpected Enterprise URL: %s", c.URL)
|
|
||||||
} else if c.Token != "1234" {
|
|
||||||
t.Fatalf("unexpected Enterprise URL: %s", c.URL)
|
|
||||||
} else if time.Duration(c.StatsInterval) != time.Second {
|
|
||||||
t.Fatalf("unexpected stats interval: %v", c.StatsInterval)
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,210 +0,0 @@
|
||||||
package registration
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"net/url"
|
|
||||||
"os"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/influxdb/enterprise-client/v1"
|
|
||||||
"github.com/influxdb/influxdb/monitor"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Service represents the registration service.
|
|
||||||
type Service struct {
|
|
||||||
MetaStore interface {
|
|
||||||
ClusterID() (uint64, error)
|
|
||||||
NodeID() uint64
|
|
||||||
}
|
|
||||||
Monitor interface {
|
|
||||||
Statistics(tags map[string]string) ([]*monitor.Statistic, error)
|
|
||||||
RegisterDiagnosticsClient(name string, client monitor.DiagsClient)
|
|
||||||
}
|
|
||||||
|
|
||||||
enabled bool
|
|
||||||
url *url.URL
|
|
||||||
token string
|
|
||||||
statsInterval time.Duration
|
|
||||||
version string
|
|
||||||
mu sync.Mutex
|
|
||||||
lastContact time.Time
|
|
||||||
|
|
||||||
wg sync.WaitGroup
|
|
||||||
done chan struct{}
|
|
||||||
|
|
||||||
logger *log.Logger
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewService returns a configured registration service.
|
|
||||||
func NewService(c Config, version string) (*Service, error) {
|
|
||||||
url, err := url.Parse(c.URL)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &Service{
|
|
||||||
enabled: c.Enabled,
|
|
||||||
url: url,
|
|
||||||
token: c.Token,
|
|
||||||
statsInterval: time.Duration(c.StatsInterval),
|
|
||||||
version: version,
|
|
||||||
done: make(chan struct{}),
|
|
||||||
logger: log.New(os.Stderr, "[registration] ", log.LstdFlags),
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Open starts retention policy enforcement.
|
|
||||||
func (s *Service) Open() error {
|
|
||||||
if !s.enabled {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
s.logger.Println("Starting registration service")
|
|
||||||
if err := s.registerServer(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Register diagnostics if a Monitor service is available.
|
|
||||||
if s.Monitor != nil {
|
|
||||||
s.Monitor.RegisterDiagnosticsClient("registration", s)
|
|
||||||
}
|
|
||||||
|
|
||||||
s.wg.Add(1)
|
|
||||||
go s.reportStats()
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close stops retention policy enforcement.
|
|
||||||
func (s *Service) Close() error {
|
|
||||||
s.logger.Println("registration service terminating")
|
|
||||||
close(s.done)
|
|
||||||
s.wg.Wait()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetLogger sets the internal logger to the logger passed in.
|
|
||||||
func (s *Service) SetLogger(l *log.Logger) {
|
|
||||||
s.logger = l
|
|
||||||
}
|
|
||||||
|
|
||||||
// Diagnostics returns diagnostics information.
|
|
||||||
func (s *Service) Diagnostics() (*monitor.Diagnostic, error) {
|
|
||||||
diagnostics := map[string]interface{}{
|
|
||||||
"URL": s.url.String(),
|
|
||||||
"token": s.token,
|
|
||||||
"last_contact": s.getLastContact().String(),
|
|
||||||
}
|
|
||||||
|
|
||||||
return monitor.DiagnosticFromMap(diagnostics), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// registerServer registers the server.
|
|
||||||
func (s *Service) registerServer() error {
|
|
||||||
if !s.enabled || s.token == "" {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
cl := client.New(s.token)
|
|
||||||
cl.URL = s.url.String()
|
|
||||||
|
|
||||||
clusterID, err := s.MetaStore.ClusterID()
|
|
||||||
if err != nil {
|
|
||||||
s.logger.Printf("failed to retrieve cluster ID for registration: %s", err.Error())
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
hostname, err := os.Hostname()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
server := client.Server{
|
|
||||||
ClusterID: fmt.Sprintf("%d", clusterID),
|
|
||||||
ServerID: fmt.Sprintf("%d", s.MetaStore.NodeID()),
|
|
||||||
Host: hostname,
|
|
||||||
Product: "influxdb",
|
|
||||||
Version: s.version,
|
|
||||||
}
|
|
||||||
|
|
||||||
s.wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer s.wg.Done()
|
|
||||||
|
|
||||||
resp, err := cl.Save(server)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
s.logger.Printf("failed to register server with %s: received code %s, error: %s", s.url.String(), resp.Status, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
s.updateLastContact(time.Now().UTC())
|
|
||||||
}()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Service) reportStats() {
|
|
||||||
defer s.wg.Done()
|
|
||||||
if s.token == "" {
|
|
||||||
// No reporting, for now, without token.
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
cl := client.New(s.token)
|
|
||||||
cl.URL = s.url.String()
|
|
||||||
|
|
||||||
clusterID, err := s.MetaStore.ClusterID()
|
|
||||||
if err != nil {
|
|
||||||
s.logger.Printf("failed to retrieve cluster ID for registration -- aborting stats upload: %s", err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
t := time.NewTicker(s.statsInterval)
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-t.C:
|
|
||||||
stats, err := s.Monitor.Statistics(nil)
|
|
||||||
if err != nil {
|
|
||||||
s.logger.Printf("failed to retrieve statistics: %s", err.Error())
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
st := client.Stats{
|
|
||||||
Product: "influxdb",
|
|
||||||
ClusterID: fmt.Sprintf("%d", clusterID),
|
|
||||||
ServerID: fmt.Sprintf("%d", s.MetaStore.NodeID()),
|
|
||||||
}
|
|
||||||
data := make([]client.StatsData, len(stats))
|
|
||||||
for i, x := range stats {
|
|
||||||
data[i] = client.StatsData{
|
|
||||||
Name: x.Name,
|
|
||||||
Tags: x.Tags,
|
|
||||||
Values: x.Values,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
st.Data = data
|
|
||||||
|
|
||||||
resp, err := cl.Save(st)
|
|
||||||
if err != nil {
|
|
||||||
s.logger.Printf("failed to post statistics to Enterprise: repsonse code: %d: error: %s", resp.StatusCode, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
s.updateLastContact(time.Now().UTC())
|
|
||||||
|
|
||||||
case <-s.done:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Service) updateLastContact(t time.Time) {
|
|
||||||
s.mu.Lock()
|
|
||||||
defer s.mu.Unlock()
|
|
||||||
s.lastContact = t
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Service) getLastContact() time.Time {
|
|
||||||
s.mu.Lock()
|
|
||||||
defer s.mu.Unlock()
|
|
||||||
return s.lastContact
|
|
||||||
}
|
|
Loading…
Reference in New Issue