diff --git a/CHANGELOG.md b/CHANGELOG.md index baed32852d..a08b02fb66 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ - [#6686](https://github.com/influxdata/influxdb/pull/6686): Optimize timestamp run-length decoding - [#6713](https://github.com/influxdata/influxdb/pull/6713): Reduce allocations during query parsing. - [#3733](https://github.com/influxdata/influxdb/issues/3733): Modify the default retention policy name and make it configurable. +- [#5655](https://github.com/influxdata/influxdb/issues/5655): Support specifying a retention policy for the graphite service. ### Bugfixes diff --git a/services/graphite/config.go b/services/graphite/config.go index c4dc17c0d2..9261c3edca 100644 --- a/services/graphite/config.go +++ b/services/graphite/config.go @@ -54,6 +54,7 @@ type Config struct { Enabled bool `toml:"enabled"` BindAddress string `toml:"bind-address"` Database string `toml:"database"` + RetentionPolicy string `toml:"retention-policy"` Protocol string `toml:"protocol"` BatchSize int `toml:"batch-size"` BatchPending int `toml:"batch-pending"` diff --git a/services/graphite/config_test.go b/services/graphite/config_test.go index 9c1700f344..4c47979461 100644 --- a/services/graphite/config_test.go +++ b/services/graphite/config_test.go @@ -14,6 +14,7 @@ func TestConfig_Parse(t *testing.T) { if _, err := toml.Decode(` bind-address = ":8080" database = "mydb" +retention-policy = "myrp" enabled = true protocol = "tcp" batch-size=100 @@ -31,6 +32,8 @@ tags=["region=us-east"] t.Fatalf("unexpected bind address: %s", c.BindAddress) } else if c.Database != "mydb" { t.Fatalf("unexpected database selected: %s", c.Database) + } else if c.RetentionPolicy != "myrp" { + t.Fatalf("unexpected retention policy selected: %s", c.RetentionPolicy) } else if c.Enabled != true { t.Fatalf("unexpected graphite enabled: %v", c.Enabled) } else if c.Protocol != "tcp" { diff --git a/services/graphite/service.go b/services/graphite/service.go index 6628c74568..bdc06c7ec6 100644 --- a/services/graphite/service.go +++ b/services/graphite/service.go @@ -52,13 +52,14 @@ func (c *tcpConnection) Close() { type Service struct { mu sync.Mutex - bindAddress string - database string - protocol string - batchSize int - batchPending int - batchTimeout time.Duration - udpReadBuffer int + bindAddress string + database string + retentionPolicy string + protocol string + batchSize int + batchPending int + batchTimeout time.Duration + udpReadBuffer int batcher *tsdb.PointBatcher parser *Parser @@ -85,6 +86,10 @@ type Service struct { } MetaClient interface { CreateDatabase(name string) (*meta.DatabaseInfo, error) + CreateDatabaseWithRetentionPolicy(name string, rpi *meta.RetentionPolicyInfo) (*meta.DatabaseInfo, error) + CreateRetentionPolicy(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error) + Database(name string) *meta.DatabaseInfo + RetentionPolicy(database, name string) (*meta.RetentionPolicyInfo, error) } } @@ -94,17 +99,18 @@ func NewService(c Config) (*Service, error) { d := c.WithDefaults() s := Service{ - bindAddress: d.BindAddress, - database: d.Database, - protocol: d.Protocol, - batchSize: d.BatchSize, - batchPending: d.BatchPending, - udpReadBuffer: d.UDPReadBuffer, - batchTimeout: time.Duration(d.BatchTimeout), - logger: log.New(os.Stderr, "[graphite] ", log.LstdFlags), - tcpConnections: make(map[string]*tcpConnection), - done: make(chan struct{}), - diagsKey: strings.Join([]string{"graphite", d.Protocol, d.BindAddress}, ":"), + bindAddress: d.BindAddress, + database: d.Database, + retentionPolicy: d.RetentionPolicy, + protocol: d.Protocol, + batchSize: d.BatchSize, + batchPending: d.BatchPending, + udpReadBuffer: d.UDPReadBuffer, + batchTimeout: time.Duration(d.BatchTimeout), + logger: log.New(os.Stderr, "[graphite] ", log.LstdFlags), + tcpConnections: make(map[string]*tcpConnection), + done: make(chan struct{}), + diagsKey: strings.Join([]string{"graphite", d.Protocol, d.BindAddress}, ":"), } parser, err := NewParserWithOptions(Options{ @@ -137,9 +143,19 @@ func (s *Service) Open() error { s.Monitor.RegisterDiagnosticsClient(s.diagsKey, s) } - if _, err := s.MetaClient.CreateDatabase(s.database); err != nil { - s.logger.Printf("Failed to ensure target database %s exists: %s", s.database, err.Error()) - return err + if db := s.MetaClient.Database(s.database); db != nil { + if rp, _ := s.MetaClient.RetentionPolicy(s.database, s.retentionPolicy); rp == nil { + rpi := meta.NewRetentionPolicyInfo(s.retentionPolicy) + if _, err := s.MetaClient.CreateRetentionPolicy(s.database, rpi); err != nil { + s.logger.Printf("Failed to ensure target retention policy %s exists: %s", s.database, err.Error()) + } + } + } else { + rpi := meta.NewRetentionPolicyInfo(s.retentionPolicy) + if _, err := s.MetaClient.CreateDatabaseWithRetentionPolicy(s.database, rpi); err != nil { + s.logger.Printf("Failed to ensure target database %s exists: %s", s.database, err.Error()) + return err + } } s.batcher = tsdb.NewPointBatcher(s.batchSize, s.batchPending, s.batchTimeout) @@ -355,7 +371,7 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) { for { select { case batch := <-batcher.Out(): - if err := s.PointsWriter.WritePoints(s.database, "", models.ConsistencyLevelAny, batch); err == nil { + if err := s.PointsWriter.WritePoints(s.database, s.retentionPolicy, models.ConsistencyLevelAny, batch); err == nil { s.statMap.Add(statBatchesTransmitted, 1) s.statMap.Add(statPointsTransmitted, int64(len(batch))) } else { diff --git a/services/graphite/service_test.go b/services/graphite/service_test.go index 9cc59fa493..dbc2f8519c 100644 --- a/services/graphite/service_test.go +++ b/services/graphite/service_test.go @@ -176,6 +176,23 @@ func (d *DatabaseCreator) CreateDatabase(name string) (*meta.DatabaseInfo, error return nil, nil } +func (d *DatabaseCreator) CreateRetentionPolicy(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error) { + return nil, nil +} + +func (d *DatabaseCreator) CreateDatabaseWithRetentionPolicy(name string, rpi *meta.RetentionPolicyInfo) (*meta.DatabaseInfo, error) { + d.Created = true + return nil, nil +} + +func (d *DatabaseCreator) Database(name string) *meta.DatabaseInfo { + return nil +} + +func (d *DatabaseCreator) RetentionPolicy(database, name string) (*meta.RetentionPolicyInfo, error) { + return nil, nil +} + // Test Helpers func errstr(err error) string { if err != nil {