Comment additions to conform to golint
Also 1 slight code change that fixed an if statement that golint didn't like.pull/4835/head
parent
d1b6e21e25
commit
65f4060de3
|
@ -22,9 +22,10 @@ const (
|
|||
// DefaultBatchPending is the default number of pending write batches.
|
||||
DefaultBatchPending = 10
|
||||
|
||||
// DefaultBatchTimeout is the default batch timeout.
|
||||
// DefaultBatchDuration is the default batch timeout duration.
|
||||
DefaultBatchDuration = toml.Duration(10 * time.Second)
|
||||
|
||||
// DefaultTypesDB is the default location of the collectd types db file.
|
||||
DefaultTypesDB = "/usr/share/collectd/types.db"
|
||||
|
||||
// DefaultReadBuffer is the default buffer size for the UDP listener.
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
// Service manages the shard precreation service.
|
||||
type Service struct {
|
||||
checkInterval time.Duration
|
||||
advancePeriod time.Duration
|
||||
|
|
|
@ -1,10 +1,12 @@
|
|||
package subscriber
|
||||
|
||||
// Config represents a configuration of the subscriber service.
|
||||
type Config struct {
|
||||
// Whether to enable to Subscriber service
|
||||
Enabled bool `toml:"enabled"`
|
||||
}
|
||||
|
||||
// NewConfig returns a new instance of a subscriber config.
|
||||
func NewConfig() Config {
|
||||
return Config{Enabled: true}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,8 @@ const (
|
|||
statWriteFailures = "writeFailures"
|
||||
)
|
||||
|
||||
// PointsWriter is an interface for writing points to a subscription destination.
|
||||
// Only WritePoints() needs to be satisfied.
|
||||
type PointsWriter interface {
|
||||
WritePoints(p *cluster.WritePointsRequest) error
|
||||
}
|
||||
|
@ -31,7 +33,7 @@ type subEntry struct {
|
|||
name string
|
||||
}
|
||||
|
||||
// The Subscriber service manages forking the incoming data from InfluxDB
|
||||
// Service manages forking the incoming data from InfluxDB
|
||||
// to defined third party destinations.
|
||||
// Subscriptions are defined per database and retention policy.
|
||||
type Service struct {
|
||||
|
@ -49,6 +51,7 @@ type Service struct {
|
|||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// NewService returns a subscriber service with given settings
|
||||
func NewService(c Config) *Service {
|
||||
return &Service{
|
||||
subs: make(map[subEntry]PointsWriter),
|
||||
|
@ -60,6 +63,7 @@ func NewService(c Config) *Service {
|
|||
}
|
||||
}
|
||||
|
||||
// Open starts the subscription service.
|
||||
func (s *Service) Open() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
@ -82,6 +86,7 @@ func (s *Service) Open() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Close terminates the subscription service
|
||||
func (s *Service) Close() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
@ -103,22 +108,21 @@ func (s *Service) waitForMetaUpdates() {
|
|||
if err != nil {
|
||||
s.Logger.Printf("error while waiting for meta data changes, err: %v\n", err)
|
||||
return
|
||||
} else {
|
||||
//Check that we haven't been closed before performing update.
|
||||
s.mu.Lock()
|
||||
if s.closed {
|
||||
s.mu.Unlock()
|
||||
s.Logger.Println("service closed not updating")
|
||||
return
|
||||
}
|
||||
s.mu.Unlock()
|
||||
s.Update()
|
||||
}
|
||||
//Check that we haven't been closed before performing update.
|
||||
s.mu.Lock()
|
||||
if s.closed {
|
||||
s.mu.Unlock()
|
||||
s.Logger.Println("service closed not updating")
|
||||
return
|
||||
}
|
||||
s.mu.Unlock()
|
||||
s.Update()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// start new and stop deleted subscriptions.
|
||||
// Update will start new and stop deleted subscriptions.
|
||||
func (s *Service) Update() error {
|
||||
dbis, err := s.MetaStore.Databases()
|
||||
if err != nil {
|
||||
|
@ -198,7 +202,7 @@ func (s *Service) createSubscription(se subEntry, mode string, destinations []st
|
|||
}, nil
|
||||
}
|
||||
|
||||
// Return channel into which write point requests can be sent.
|
||||
// Points returns a channel into which write point requests can be sent.
|
||||
func (s *Service) Points() chan<- *cluster.WritePointsRequest {
|
||||
return s.points
|
||||
}
|
||||
|
@ -220,8 +224,11 @@ func (s *Service) writePoints() {
|
|||
}
|
||||
}
|
||||
|
||||
// BalanceMode sets what balance mode to use on a subscription.
|
||||
// valid options are currently ALL or ANY
|
||||
type BalanceMode int
|
||||
|
||||
//ALL is a Balance mode option
|
||||
const (
|
||||
ALL BalanceMode = iota
|
||||
ANY
|
||||
|
|
|
@ -6,15 +6,17 @@ import (
|
|||
"github.com/influxdb/influxdb/cluster"
|
||||
)
|
||||
|
||||
// Writes points over UDP using the line protocol
|
||||
// UDP supports writing points over UDP using the line protocol.
|
||||
type UDP struct {
|
||||
addr string
|
||||
}
|
||||
|
||||
// NewUDP returns a new UDP listener with default options.
|
||||
func NewUDP(addr string) *UDP {
|
||||
return &UDP{addr: addr}
|
||||
}
|
||||
|
||||
// WritePoints writes points over UDP transport.
|
||||
func (u *UDP) WritePoints(p *cluster.WritePointsRequest) (err error) {
|
||||
var addr *net.UDPAddr
|
||||
var con *net.UDPConn
|
||||
|
|
Loading…
Reference in New Issue