Support specifying a retention policy for the graphite service

The graphite service will attempt to create the retention policy and use
it. If the retention policy doesn't exist, it will be created with the
default options.

Fixes #5655.
pull/6640/head
Jonathan A. Sternberg 2016-05-16 13:00:02 -04:00
parent 256f57a4f4
commit 4f37bc5a40
5 changed files with 60 additions and 22 deletions

View File

@ -22,6 +22,7 @@
- [#6686](https://github.com/influxdata/influxdb/pull/6686): Optimize timestamp run-length decoding
- [#6713](https://github.com/influxdata/influxdb/pull/6713): Reduce allocations during query parsing.
- [#3733](https://github.com/influxdata/influxdb/issues/3733): Modify the default retention policy name and make it configurable.
- [#5655](https://github.com/influxdata/influxdb/issues/5655): Support specifying a retention policy for the graphite service.
### Bugfixes

View File

@ -54,6 +54,7 @@ type Config struct {
Enabled bool `toml:"enabled"`
BindAddress string `toml:"bind-address"`
Database string `toml:"database"`
RetentionPolicy string `toml:"retention-policy"`
Protocol string `toml:"protocol"`
BatchSize int `toml:"batch-size"`
BatchPending int `toml:"batch-pending"`

View File

@ -14,6 +14,7 @@ func TestConfig_Parse(t *testing.T) {
if _, err := toml.Decode(`
bind-address = ":8080"
database = "mydb"
retention-policy = "myrp"
enabled = true
protocol = "tcp"
batch-size=100
@ -31,6 +32,8 @@ tags=["region=us-east"]
t.Fatalf("unexpected bind address: %s", c.BindAddress)
} else if c.Database != "mydb" {
t.Fatalf("unexpected database selected: %s", c.Database)
} else if c.RetentionPolicy != "myrp" {
t.Fatalf("unexpected retention policy selected: %s", c.RetentionPolicy)
} else if c.Enabled != true {
t.Fatalf("unexpected graphite enabled: %v", c.Enabled)
} else if c.Protocol != "tcp" {

View File

@ -52,13 +52,14 @@ func (c *tcpConnection) Close() {
type Service struct {
mu sync.Mutex
bindAddress string
database string
protocol string
batchSize int
batchPending int
batchTimeout time.Duration
udpReadBuffer int
bindAddress string
database string
retentionPolicy string
protocol string
batchSize int
batchPending int
batchTimeout time.Duration
udpReadBuffer int
batcher *tsdb.PointBatcher
parser *Parser
@ -85,6 +86,10 @@ type Service struct {
}
MetaClient interface {
CreateDatabase(name string) (*meta.DatabaseInfo, error)
CreateDatabaseWithRetentionPolicy(name string, rpi *meta.RetentionPolicyInfo) (*meta.DatabaseInfo, error)
CreateRetentionPolicy(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error)
Database(name string) *meta.DatabaseInfo
RetentionPolicy(database, name string) (*meta.RetentionPolicyInfo, error)
}
}
@ -94,17 +99,18 @@ func NewService(c Config) (*Service, error) {
d := c.WithDefaults()
s := Service{
bindAddress: d.BindAddress,
database: d.Database,
protocol: d.Protocol,
batchSize: d.BatchSize,
batchPending: d.BatchPending,
udpReadBuffer: d.UDPReadBuffer,
batchTimeout: time.Duration(d.BatchTimeout),
logger: log.New(os.Stderr, "[graphite] ", log.LstdFlags),
tcpConnections: make(map[string]*tcpConnection),
done: make(chan struct{}),
diagsKey: strings.Join([]string{"graphite", d.Protocol, d.BindAddress}, ":"),
bindAddress: d.BindAddress,
database: d.Database,
retentionPolicy: d.RetentionPolicy,
protocol: d.Protocol,
batchSize: d.BatchSize,
batchPending: d.BatchPending,
udpReadBuffer: d.UDPReadBuffer,
batchTimeout: time.Duration(d.BatchTimeout),
logger: log.New(os.Stderr, "[graphite] ", log.LstdFlags),
tcpConnections: make(map[string]*tcpConnection),
done: make(chan struct{}),
diagsKey: strings.Join([]string{"graphite", d.Protocol, d.BindAddress}, ":"),
}
parser, err := NewParserWithOptions(Options{
@ -137,9 +143,19 @@ func (s *Service) Open() error {
s.Monitor.RegisterDiagnosticsClient(s.diagsKey, s)
}
if _, err := s.MetaClient.CreateDatabase(s.database); err != nil {
s.logger.Printf("Failed to ensure target database %s exists: %s", s.database, err.Error())
return err
if db := s.MetaClient.Database(s.database); db != nil {
if rp, _ := s.MetaClient.RetentionPolicy(s.database, s.retentionPolicy); rp == nil {
rpi := meta.NewRetentionPolicyInfo(s.retentionPolicy)
if _, err := s.MetaClient.CreateRetentionPolicy(s.database, rpi); err != nil {
s.logger.Printf("Failed to ensure target retention policy %s exists: %s", s.database, err.Error())
}
}
} else {
rpi := meta.NewRetentionPolicyInfo(s.retentionPolicy)
if _, err := s.MetaClient.CreateDatabaseWithRetentionPolicy(s.database, rpi); err != nil {
s.logger.Printf("Failed to ensure target database %s exists: %s", s.database, err.Error())
return err
}
}
s.batcher = tsdb.NewPointBatcher(s.batchSize, s.batchPending, s.batchTimeout)
@ -355,7 +371,7 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) {
for {
select {
case batch := <-batcher.Out():
if err := s.PointsWriter.WritePoints(s.database, "", models.ConsistencyLevelAny, batch); err == nil {
if err := s.PointsWriter.WritePoints(s.database, s.retentionPolicy, models.ConsistencyLevelAny, batch); err == nil {
s.statMap.Add(statBatchesTransmitted, 1)
s.statMap.Add(statPointsTransmitted, int64(len(batch)))
} else {

View File

@ -176,6 +176,23 @@ func (d *DatabaseCreator) CreateDatabase(name string) (*meta.DatabaseInfo, error
return nil, nil
}
func (d *DatabaseCreator) CreateRetentionPolicy(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error) {
return nil, nil
}
func (d *DatabaseCreator) CreateDatabaseWithRetentionPolicy(name string, rpi *meta.RetentionPolicyInfo) (*meta.DatabaseInfo, error) {
d.Created = true
return nil, nil
}
func (d *DatabaseCreator) Database(name string) *meta.DatabaseInfo {
return nil
}
func (d *DatabaseCreator) RetentionPolicy(database, name string) (*meta.RetentionPolicyInfo, error) {
return nil, nil
}
// Test Helpers
func errstr(err error) string {
if err != nil {