graphite service should retry database creation
parent
73c267abc1
commit
168c91cc67
|
@ -707,3 +707,11 @@ func TestApplyTemplateFieldError(t *testing.T) {
|
|||
"'field' can only be used once in each template: current.users.logged_in")
|
||||
}
|
||||
}
|
||||
|
||||
// Test Helpers
|
||||
func errstr(err error) string {
|
||||
if err != nil {
|
||||
return err.Error()
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
|
|
@ -45,8 +45,6 @@ func (c *tcpConnection) Close() {
|
|||
|
||||
// Service represents a Graphite service.
|
||||
type Service struct {
|
||||
mu sync.Mutex
|
||||
|
||||
bindAddress string
|
||||
database string
|
||||
retentionPolicy string
|
||||
|
@ -71,8 +69,11 @@ type Service struct {
|
|||
addr net.Addr
|
||||
udpConn *net.UDPConn
|
||||
|
||||
wg sync.WaitGroup
|
||||
done chan struct{}
|
||||
wg sync.WaitGroup
|
||||
|
||||
mu sync.RWMutex
|
||||
ready bool // Has the required database been created?
|
||||
done chan struct{} // Is the service closing or closed?
|
||||
|
||||
Monitor interface {
|
||||
RegisterDiagnosticsClient(name string, client diagnostics.Client)
|
||||
|
@ -141,21 +142,6 @@ func (s *Service) Open() error {
|
|||
s.Monitor.RegisterDiagnosticsClient(s.diagsKey, s)
|
||||
}
|
||||
|
||||
if db := s.MetaClient.Database(s.database); db != nil {
|
||||
if rp, _ := s.MetaClient.RetentionPolicy(s.database, s.retentionPolicy); rp == nil {
|
||||
spec := meta.RetentionPolicySpec{Name: s.retentionPolicy}
|
||||
if _, err := s.MetaClient.CreateRetentionPolicy(s.database, &spec); err != nil {
|
||||
s.logger.Printf("Failed to ensure target retention policy %s exists: %s", s.database, err.Error())
|
||||
}
|
||||
}
|
||||
} else {
|
||||
spec := meta.RetentionPolicySpec{Name: s.retentionPolicy}
|
||||
if _, err := s.MetaClient.CreateDatabaseWithRetentionPolicy(s.database, &spec); err != nil {
|
||||
s.logger.Printf("Failed to ensure target database %s exists: %s", s.database, err.Error())
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
s.batcher = tsdb.NewPointBatcher(s.batchSize, s.batchPending, s.batchTimeout)
|
||||
s.batcher.Start()
|
||||
|
||||
|
@ -236,6 +222,36 @@ func (s *Service) closed() bool {
|
|||
return s.done == nil
|
||||
}
|
||||
|
||||
// createInternalStorage ensures that the required database has been created.
|
||||
func (s *Service) createInternalStorage() error {
|
||||
s.mu.RLock()
|
||||
ready := s.ready
|
||||
s.mu.RUnlock()
|
||||
if ready {
|
||||
return nil
|
||||
}
|
||||
|
||||
if db := s.MetaClient.Database(s.database); db != nil {
|
||||
if rp, _ := s.MetaClient.RetentionPolicy(s.database, s.retentionPolicy); rp == nil {
|
||||
spec := meta.RetentionPolicySpec{Name: s.retentionPolicy}
|
||||
if _, err := s.MetaClient.CreateRetentionPolicy(s.database, &spec); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
} else {
|
||||
spec := meta.RetentionPolicySpec{Name: s.retentionPolicy}
|
||||
if _, err := s.MetaClient.CreateDatabaseWithRetentionPolicy(s.database, &spec); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// The service is now ready.
|
||||
s.mu.Lock()
|
||||
s.ready = true
|
||||
s.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetLogOutput sets the writer to which all logs are written. It must not be
|
||||
// called after Open is called.
|
||||
func (s *Service) SetLogOutput(w io.Writer) {
|
||||
|
@ -422,6 +438,12 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) {
|
|||
for {
|
||||
select {
|
||||
case batch := <-batcher.Out():
|
||||
// Will attempt to create database if not yet created.
|
||||
if err := s.createInternalStorage(); err != nil {
|
||||
s.logger.Printf("Required database or retention policy do not yet exist: %s", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
if err := s.PointsWriter.WritePoints(s.database, s.retentionPolicy, models.ConsistencyLevelAny, batch); err == nil {
|
||||
atomic.AddInt64(&s.stats.BatchesTransmitted, 1)
|
||||
atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch)))
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package graphite_test
|
||||
package graphite
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
|
@ -10,61 +11,135 @@ import (
|
|||
|
||||
"github.com/influxdata/influxdb/internal"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/services/graphite"
|
||||
"github.com/influxdata/influxdb/services/meta"
|
||||
"github.com/influxdata/influxdb/toml"
|
||||
)
|
||||
|
||||
func Test_Service_OpenClose(t *testing.T) {
|
||||
c := graphite.Config{BindAddress: ":35422"}
|
||||
service := NewService(&c)
|
||||
c := Config{BindAddress: ":35422"}
|
||||
service := NewTestService(&c)
|
||||
|
||||
// Closing a closed service is fine.
|
||||
if err := service.GraphiteService.Close(); err != nil {
|
||||
if err := service.Service.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Closing a closed service again is fine.
|
||||
if err := service.GraphiteService.Close(); err != nil {
|
||||
if err := service.Service.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := service.GraphiteService.Open(); err != nil {
|
||||
if err := service.Service.Open(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Opening an already open service is fine.
|
||||
if err := service.GraphiteService.Open(); err != nil {
|
||||
if err := service.Service.Open(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Reopening a previously opened service is fine.
|
||||
if err := service.GraphiteService.Close(); err != nil {
|
||||
if err := service.Service.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := service.GraphiteService.Open(); err != nil {
|
||||
if err := service.Service.Open(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Tidy up.
|
||||
if err := service.GraphiteService.Close(); err != nil {
|
||||
if err := service.Service.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestService_CreatesDatabase(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
s := NewTestService(nil)
|
||||
s.WritePointsFn = func(string, string, models.ConsistencyLevel, []models.Point) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
called := make(chan struct{})
|
||||
s.MetaClient.CreateDatabaseWithRetentionPolicyFn = func(name string, _ *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) {
|
||||
if name != s.Service.database {
|
||||
t.Errorf("\n\texp = %s\n\tgot = %s\n", s.Service.database, name)
|
||||
}
|
||||
// Allow some time for the caller to return and the ready status to
|
||||
// be set.
|
||||
time.AfterFunc(10*time.Millisecond, func() { called <- struct{}{} })
|
||||
return nil, errors.New("an error")
|
||||
}
|
||||
|
||||
if err := s.Service.Open(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
points, err := models.ParsePointsString(`cpu value=1`)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s.Service.batcher.In() <- points[0] // Send a point.
|
||||
s.Service.batcher.Flush()
|
||||
select {
|
||||
case <-called:
|
||||
// OK
|
||||
case <-time.NewTimer(5 * time.Second).C:
|
||||
t.Fatal("Service should have attempted to create database")
|
||||
}
|
||||
|
||||
// ready status should not have been switched due to meta client error.
|
||||
s.Service.mu.RLock()
|
||||
ready := s.Service.ready
|
||||
s.Service.mu.RUnlock()
|
||||
|
||||
if got, exp := ready, false; got != exp {
|
||||
t.Fatalf("got %v, expected %v", got, exp)
|
||||
}
|
||||
|
||||
// This time MC won't cause an error.
|
||||
s.MetaClient.CreateDatabaseWithRetentionPolicyFn = func(name string, _ *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) {
|
||||
// Allow some time for the caller to return and the ready status to
|
||||
// be set.
|
||||
time.AfterFunc(10*time.Millisecond, func() { called <- struct{}{} })
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
s.Service.batcher.In() <- points[0] // Send a point.
|
||||
s.Service.batcher.Flush()
|
||||
select {
|
||||
case <-called:
|
||||
// OK
|
||||
case <-time.NewTimer(5 * time.Second).C:
|
||||
t.Fatal("Service should have attempted to create database")
|
||||
}
|
||||
|
||||
// ready status should now be true.
|
||||
s.Service.mu.RLock()
|
||||
ready = s.Service.ready
|
||||
s.Service.mu.RUnlock()
|
||||
|
||||
if got, exp := ready, true; got != exp {
|
||||
t.Fatalf("got %v, expected %v", got, exp)
|
||||
}
|
||||
|
||||
s.Service.Close()
|
||||
}
|
||||
|
||||
func Test_Service_TCP(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
now := time.Now().UTC().Round(time.Second)
|
||||
|
||||
config := graphite.Config{}
|
||||
config := Config{}
|
||||
config.Database = "graphitedb"
|
||||
config.BatchSize = 0 // No batching.
|
||||
config.BatchTimeout = toml.Duration(time.Second)
|
||||
config.BindAddress = ":0"
|
||||
|
||||
service := NewService(&config)
|
||||
service := NewTestService(&config)
|
||||
|
||||
// Allow test to wait until points are written.
|
||||
var wg sync.WaitGroup
|
||||
|
@ -91,25 +166,12 @@ func Test_Service_TCP(t *testing.T) {
|
|||
return nil
|
||||
}
|
||||
|
||||
var created bool
|
||||
service.MetaClient.CreateDatabaseWithRetentionPolicyFn = func(db string, _ *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) {
|
||||
if db != config.Database {
|
||||
t.Fatalf("got %s, expected %s", db, config.Database)
|
||||
}
|
||||
created = true
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if err := service.GraphiteService.Open(); err != nil {
|
||||
if err := service.Service.Open(); err != nil {
|
||||
t.Fatalf("failed to open Graphite service: %s", err.Error())
|
||||
}
|
||||
|
||||
if !created {
|
||||
t.Fatalf("failed to create target database")
|
||||
}
|
||||
|
||||
// Connect to the graphite endpoint we just spun up
|
||||
_, port, _ := net.SplitHostPort(service.GraphiteService.Addr().String())
|
||||
_, port, _ := net.SplitHostPort(service.Service.Addr().String())
|
||||
conn, err := net.Dial("tcp", "127.0.0.1:"+port)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -134,14 +196,14 @@ func Test_Service_UDP(t *testing.T) {
|
|||
|
||||
now := time.Now().UTC().Round(time.Second)
|
||||
|
||||
config := graphite.Config{}
|
||||
config := Config{}
|
||||
config.Database = "graphitedb"
|
||||
config.BatchSize = 0 // No batching.
|
||||
config.BatchTimeout = toml.Duration(time.Second)
|
||||
config.BindAddress = ":10000"
|
||||
config.Protocol = "udp"
|
||||
|
||||
service := NewService(&config)
|
||||
service := NewTestService(&config)
|
||||
|
||||
// Allow test to wait until points are written.
|
||||
var wg sync.WaitGroup
|
||||
|
@ -165,25 +227,12 @@ func Test_Service_UDP(t *testing.T) {
|
|||
return nil
|
||||
}
|
||||
|
||||
var created bool
|
||||
service.MetaClient.CreateDatabaseWithRetentionPolicyFn = func(db string, _ *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) {
|
||||
if db != config.Database {
|
||||
t.Fatalf("got %s, expected %s", db, config.Database)
|
||||
}
|
||||
created = true
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if err := service.GraphiteService.Open(); err != nil {
|
||||
if err := service.Service.Open(); err != nil {
|
||||
t.Fatalf("failed to open Graphite service: %s", err.Error())
|
||||
}
|
||||
|
||||
if !created {
|
||||
t.Fatalf("failed to create target database")
|
||||
}
|
||||
|
||||
// Connect to the graphite endpoint we just spun up
|
||||
_, port, _ := net.SplitHostPort(service.GraphiteService.Addr().String())
|
||||
_, port, _ := net.SplitHostPort(service.Service.Addr().String())
|
||||
conn, err := net.Dial("udp", "127.0.0.1:"+port)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -200,27 +249,26 @@ func Test_Service_UDP(t *testing.T) {
|
|||
conn.Close()
|
||||
}
|
||||
|
||||
type Service struct {
|
||||
GraphiteService *graphite.Service
|
||||
|
||||
type TestService struct {
|
||||
Service *Service
|
||||
MetaClient *internal.MetaClientMock
|
||||
WritePointsFn func(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
|
||||
}
|
||||
|
||||
func NewService(c *graphite.Config) *Service {
|
||||
func NewTestService(c *Config) *TestService {
|
||||
if c == nil {
|
||||
defaultC := graphite.NewConfig()
|
||||
defaultC := NewConfig()
|
||||
c = &defaultC
|
||||
}
|
||||
|
||||
gservice, err := graphite.NewService(*c)
|
||||
gservice, err := NewService(*c)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
service := &Service{
|
||||
GraphiteService: gservice,
|
||||
MetaClient: &internal.MetaClientMock{},
|
||||
service := &TestService{
|
||||
Service: gservice,
|
||||
MetaClient: &internal.MetaClientMock{},
|
||||
}
|
||||
|
||||
service.MetaClient.CreateRetentionPolicyFn = func(string, *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error) {
|
||||
|
@ -239,26 +287,17 @@ func NewService(c *graphite.Config) *Service {
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
// Set the Meta Client
|
||||
service.GraphiteService.MetaClient = service.MetaClient
|
||||
|
||||
// Set the PointsWriter
|
||||
service.GraphiteService.PointsWriter = service
|
||||
|
||||
if !testing.Verbose() {
|
||||
service.GraphiteService.SetLogOutput(ioutil.Discard)
|
||||
service.Service.SetLogOutput(ioutil.Discard)
|
||||
}
|
||||
|
||||
// Set the Meta Client and PointsWriter.
|
||||
service.Service.MetaClient = service.MetaClient
|
||||
service.Service.PointsWriter = service
|
||||
|
||||
return service
|
||||
}
|
||||
|
||||
func (s *Service) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {
|
||||
func (s *TestService) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {
|
||||
return s.WritePointsFn(database, retentionPolicy, consistencyLevel, points)
|
||||
}
|
||||
|
||||
// Test Helpers
|
||||
func errstr(err error) string {
|
||||
if err != nil {
|
||||
return err.Error()
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue