collectd should retry creating database

pull/7477/head
Edd Robinson 2016-10-18 11:52:12 +01:00
parent d3f3e020a6
commit cc60647827
2 changed files with 113 additions and 31 deletions

View File

@ -54,8 +54,9 @@ type Service struct {
typesdb gollectd.Types
addr net.Addr
mu sync.Mutex
done chan struct{}
mu sync.RWMutex
ready bool // Has the required database been created?
done chan struct{} // Is the service closing or closed?
// expvar-based stats.
stats *Statistics
@ -96,11 +97,6 @@ func (s *Service) Open() error {
return fmt.Errorf("PointsWriter is nil")
}
if _, err := s.MetaClient.CreateDatabase(s.Config.Database); err != nil {
s.Logger.Printf("Failed to ensure target database %s exists: %s", s.Config.Database, err.Error())
return err
}
if s.typesdb == nil {
// Open collectd types.
if stat, err := os.Stat(s.Config.TypesDB); err != nil {
@ -178,12 +174,11 @@ func (s *Service) Open() error {
s.batcher = tsdb.NewPointBatcher(s.Config.BatchSize, s.Config.BatchPending, time.Duration(s.Config.BatchDuration))
s.batcher.Start()
// Create waitgroup for signalling goroutines to stop.
// Create waitgroup for signalling goroutines to stop and start goroutines
// that process collectd packets.
s.wg.Add(2)
// Start goroutines that process collectd packets.
go s.serve()
go s.writePoints()
go func() { defer s.wg.Done(); s.serve() }()
go func() { defer s.wg.Done(); s.writePoints() }()
return nil
}
@ -215,13 +210,6 @@ func (s *Service) Close() error {
return nil
}
// Closed returns true if the service is currently closed.
func (s *Service) Closed() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.closed()
}
func (s *Service) closed() bool {
select {
case <-s.done:
@ -232,6 +220,26 @@ func (s *Service) closed() bool {
return s.done == nil
}
// createInternalStorage ensures that the required database has been created.
func (s *Service) createInternalStorage() error {
s.mu.RLock()
ready := s.ready
s.mu.RUnlock()
if ready {
return nil
}
if _, err := s.MetaClient.CreateDatabase(s.Config.Database); err != nil {
return err
}
// The service is now ready.
s.mu.Lock()
s.ready = true
s.mu.Unlock()
return nil
}
// SetLogOutput sets the writer to which all logs are written. It must not be
// called after Open is called.
func (s *Service) SetLogOutput(w io.Writer) {
@ -280,8 +288,6 @@ func (s *Service) Addr() net.Addr {
}
func (s *Service) serve() {
defer s.wg.Done()
// From https://collectd.org/wiki/index.php/Binary_protocol
// 1024 bytes (payload only, not including UDP / IP headers)
// In versions 4.0 through 4.7, the receive buffer has a fixed size
@ -331,13 +337,17 @@ func (s *Service) handleMessage(buffer []byte) {
}
func (s *Service) writePoints() {
defer s.wg.Done()
for {
select {
case <-s.done:
return
case batch := <-s.batcher.Out():
// Will attempt to create database if not yet created.
if err := s.createInternalStorage(); err != nil {
s.Logger.Printf("Required database %s not yet created: %s", s.Config.Database, err.Error())
continue
}
if err := s.PointsWriter.WritePoints(s.Config.Database, s.Config.RetentionPolicy, models.ConsistencyLevelAny, batch); err == nil {
atomic.AddInt64(&s.stats.BatchesTransmitted, 1)
atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch)))

View File

@ -6,6 +6,7 @@ import (
"io/ioutil"
"log"
"net"
"strings"
"testing"
"time"
@ -52,26 +53,73 @@ func TestService_OpenClose(t *testing.T) {
}
}
// Test that the service checks / creates the target database on startup.
// Test that the service checks / creates the target database every time we
// try to write points.
func TestService_CreatesDatabase(t *testing.T) {
t.Parallel()
s := NewTestService(1, time.Second)
var created bool
s.WritePointsFn = func(string, string, models.ConsistencyLevel, []models.Point) error {
return nil
}
called := make(chan struct{})
s.MetaClient.CreateDatabaseFn = func(name string) (*meta.DatabaseInfo, error) {
if name != s.Config.Database {
t.Errorf("\n\texp = %s\n\tgot = %s\n", s.Config.Database, name)
}
created = true
called <- struct{}{}
return nil, errors.New("an error")
}
if err := s.Service.Open(); err != nil {
t.Fatal(err)
}
points, err := models.ParsePointsString(`cpu value=1`)
if err != nil {
t.Fatal(err)
}
s.Service.batcher.In() <- points[0] // Send a point.
select {
case <-called:
// OK
case <-time.NewTimer(time.Second).C:
t.Fatalf("Service should have attempted to created database")
}
// ready status should not have been switched due to meta client error.
s.Service.mu.RLock()
ready := s.Service.ready
s.Service.mu.RUnlock()
if got, exp := ready, false; got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}
// This time MC won't cause an error.
s.MetaClient.CreateDatabaseFn = func(name string) (*meta.DatabaseInfo, error) {
called <- struct{}{}
return nil, nil
}
s.Service.Open()
s.Service.Close()
s.Service.batcher.In() <- points[0] // Send a point.
select {
case <-called:
// OK
case <-time.NewTimer(time.Second).C:
t.Fatalf("Service should have attempted to created database")
}
if !created {
t.Errorf("CreateDatabaseIfNotExists should have been called when the service opened.")
// ready status should not have been switched due to meta client error.
s.Service.mu.RLock()
ready = s.Service.ready
s.Service.mu.RUnlock()
if got, exp := ready, true; got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}
}
@ -280,7 +328,31 @@ func check(err error) {
// Raw data sent by collectd, captured using Wireshark.
var testData = func() []byte {
b, err := hex.DecodeString("000000167066312d36322d3231302d39342d313733000001000c00000000544928ff0007000c00000000000000050002000c656e74726f7079000004000c656e74726f7079000006000f0001010000000000007240000200086370750000030006310000040008637075000005000969646c65000006000f0001000000000000a674620005000977616974000006000f0001000000000000000000000200076466000003000500000400076466000005000d6c6976652d636f7700000600180002010100000000a090b641000000a0cb6a2742000200086370750000030006310000040008637075000005000e696e74657272757074000006000f00010000000000000000fe0005000c736f6674697271000006000f000100000000000000000000020007646600000300050000040007646600000500096c6976650000060018000201010000000000000000000000e0ec972742000200086370750000030006310000040008637075000005000a737465616c000006000f00010000000000000000000003000632000005000975736572000006000f0001000000000000005f36000500096e696365000006000f0001000000000000000ad80002000e696e746572666163650000030005000004000e69665f6f6374657473000005000b64756d6d79300000060018000200000000000000000000000000000000041a000200076466000004000764660000050008746d70000006001800020101000000000000f240000000a0ea972742000200086370750000030006320000040008637075000005000b73797374656d000006000f00010000000000000045d30002000e696e746572666163650000030005000004000f69665f7061636b657473000005000b64756d6d79300000060018000200000000000000000000000000000000000f000200086370750000030006320000040008637075000005000969646c65000006000f0001000000000000a66480000200076466000003000500000400076466000005000d72756e2d6c6f636b000006001800020101000000000000000000000000000054410002000e696e74657266616365000004000e69665f6572726f7273000005000b64756d6d793000000600180002000000000000000000000000000000000000000200086370750000030006320000040008637075000005000977616974000006000f00010000000000000000000005000e696e74657272757074000006000f0001000000000000000132")
data := []string{
"000000167066312d36322d3231302d39342d313733000001000c00000000544928ff0007000c0000000",
"0000000050002000c656e74726f7079000004000c656e74726f7079000006000f000101000000000000",
"7240000200086370750000030006310000040008637075000005000969646c65000006000f000100000",
"0000000a674620005000977616974000006000f00010000000000000000000002000764660000030005",
"00000400076466000005000d6c6976652d636f7700000600180002010100000000a090b641000000a0c",
"b6a2742000200086370750000030006310000040008637075000005000e696e74657272757074000006",
"000f00010000000000000000fe0005000c736f6674697271000006000f0001000000000000000000000",
"20007646600000300050000040007646600000500096c69766500000600180002010100000000000000",
"00000000e0ec972742000200086370750000030006310000040008637075000005000a737465616c000",
"006000f00010000000000000000000003000632000005000975736572000006000f0001000000000000",
"005f36000500096e696365000006000f0001000000000000000ad80002000e696e74657266616365000",
"0030005000004000e69665f6f6374657473000005000b64756d6d793000000600180002000000000000",
"00000000000000000000041a000200076466000004000764660000050008746d7000000600180002010",
"1000000000000f240000000a0ea97274200020008637075000003000632000004000863707500000500",
"0b73797374656d000006000f00010000000000000045d30002000e696e7465726661636500000300050",
"00004000f69665f7061636b657473000005000b64756d6d793000000600180002000000000000000000",
"00000000000000000f000200086370750000030006320000040008637075000005000969646c6500000",
"6000f0001000000000000a66480000200076466000003000500000400076466000005000d72756e2d6c",
"6f636b000006001800020101000000000000000000000000000054410002000e696e746572666163650",
"00004000e69665f6572726f7273000005000b64756d6d79300000060018000200000000000000000000",
"00000000000000000002000863707500000300063200000400086370750000050009776169740000060",
"00f00010000000000000000000005000e696e74657272757074000006000f0001000000000000000132",
}
b, err := hex.DecodeString(strings.Join(data, ""))
check(err)
return b
}()