Periodic upload of stats to Enterprise
parent
667ad3342a
commit
878663e1e3
|
@ -305,6 +305,7 @@ func (s *Server) appendRegistrationService(c registration.Config) error {
|
|||
}
|
||||
|
||||
srv.MetaStore = s.MetaStore
|
||||
srv.Monitor = s.Monitor
|
||||
s.Services = append(s.Services, srv)
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -1,20 +1,27 @@
|
|||
package registration
|
||||
|
||||
import ()
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/toml"
|
||||
)
|
||||
|
||||
const (
|
||||
DefaultURL = "https://enterprise.influxdata.com"
|
||||
DefaultURL = "https://enterprise.influxdata.com"
|
||||
DefaultStatsInterval = time.Minute
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
Enabled bool `toml:"enabled"`
|
||||
URL string `toml:"url"`
|
||||
Token string `toml:"token"`
|
||||
Enabled bool `toml:"enabled"`
|
||||
URL string `toml:"url"`
|
||||
Token string `toml:"token"`
|
||||
StatsInterval toml.Duration `toml:"stats-interval"`
|
||||
}
|
||||
|
||||
func NewConfig() Config {
|
||||
return Config{
|
||||
Enabled: true,
|
||||
URL: DefaultURL,
|
||||
Enabled: true,
|
||||
URL: DefaultURL,
|
||||
StatsInterval: toml.Duration(DefaultStatsInterval),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ package registration_test
|
|||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/BurntSushi/toml"
|
||||
"github.com/influxdb/influxdb/services/registration"
|
||||
|
@ -14,6 +15,7 @@ func TestConfig_Parse(t *testing.T) {
|
|||
enabled = true
|
||||
url = "a.b.c"
|
||||
token = "1234"
|
||||
stats-interval = "1s"
|
||||
`, &c); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -25,5 +27,7 @@ token = "1234"
|
|||
t.Fatalf("unexpected Enterprise URL: %s", c.URL)
|
||||
} else if c.Token != "1234" {
|
||||
t.Fatalf("unexpected Enterprise URL: %s", c.URL)
|
||||
} else if time.Duration(c.StatsInterval) != time.Second {
|
||||
t.Fatalf("unexpected stats interval: %v", c.StatsInterval)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,6 +11,8 @@ import (
|
|||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/monitor"
|
||||
)
|
||||
|
||||
// Service represents the registration service.
|
||||
|
@ -19,11 +21,15 @@ type Service struct {
|
|||
ClusterID() (uint64, error)
|
||||
NodeID() uint64
|
||||
}
|
||||
Monitor interface {
|
||||
Statistics(tags map[string]string) ([]*monitor.Statistic, error)
|
||||
}
|
||||
|
||||
enabled bool
|
||||
url *url.URL
|
||||
token string
|
||||
version string
|
||||
enabled bool
|
||||
url *url.URL
|
||||
token string
|
||||
statsInterval time.Duration
|
||||
version string
|
||||
|
||||
wg sync.WaitGroup
|
||||
done chan struct{}
|
||||
|
@ -39,12 +45,13 @@ func NewService(c Config, version string) (*Service, error) {
|
|||
}
|
||||
|
||||
return &Service{
|
||||
enabled: c.Enabled,
|
||||
url: url,
|
||||
token: c.Token,
|
||||
version: version,
|
||||
done: make(chan struct{}),
|
||||
logger: log.New(os.Stderr, "[registration] ", log.LstdFlags),
|
||||
enabled: c.Enabled,
|
||||
url: url,
|
||||
token: c.Token,
|
||||
statsInterval: time.Duration(c.StatsInterval),
|
||||
version: version,
|
||||
done: make(chan struct{}),
|
||||
logger: log.New(os.Stderr, "[registration] ", log.LstdFlags),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -55,6 +62,9 @@ func (s *Service) Open() error {
|
|||
return err
|
||||
}
|
||||
|
||||
s.wg.Add(1)
|
||||
go s.reportStats()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -116,3 +126,55 @@ func (s *Service) registerServer() error {
|
|||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) reportStats() {
|
||||
defer s.wg.Done()
|
||||
if s.token == "" {
|
||||
// No reporting, for now, without token.
|
||||
return
|
||||
}
|
||||
statsURL := fmt.Sprintf("%s/api/v1/stats/influxdb?token=%s", s.url.String(), s.token)
|
||||
|
||||
clusterID, err := s.MetaStore.ClusterID()
|
||||
if err != nil {
|
||||
s.logger.Printf("failed to retrieve cluster ID for registration -- aborting stats upload: %s", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
t := time.NewTicker(s.statsInterval)
|
||||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
stats, err := s.Monitor.Statistics(nil)
|
||||
if err != nil {
|
||||
s.logger.Printf("failed to retrieve statistics: %s", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
o := map[string]interface{}{
|
||||
"cluster_id": fmt.Sprintf("%d", clusterID),
|
||||
"server_id": fmt.Sprintf("%d", s.MetaStore.NodeID()),
|
||||
"stats": stats,
|
||||
}
|
||||
b, err := json.Marshal(o)
|
||||
if err != nil {
|
||||
s.logger.Printf("failed to JSON-encode stats: %s", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
client := http.Client{Timeout: time.Duration(5 * time.Second)}
|
||||
resp, err := client.Post(statsURL, "application/json", bytes.NewBuffer(b))
|
||||
if err != nil {
|
||||
s.logger.Printf("failed to post statistics to %s: %s", statsURL, err.Error())
|
||||
continue
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
s.logger.Printf("failed to post statistics to %s: repsonse code: %d", statsURL, resp.StatusCode)
|
||||
continue
|
||||
}
|
||||
case <-s.done:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue