Merge pull request #1857 from influxdb/support_inf_retention

Support infinite retention policies
pull/1859/head
Philip O'Toole 2015-03-05 15:58:45 -08:00
commit 7eb31f20a2
6 changed files with 30 additions and 5 deletions

View File

@ -6,6 +6,7 @@
### Features
- [#1755] (https://github.com/influxdb/influxdb/pull/1848): Support JSON data ingest over UDP
- [#1857] (https://github.com/influxdb/influxdb/pull/1857): Support retention policies with infinite duration
## v0.9.0-rc7 [2015-03-02]

View File

@ -1013,7 +1013,7 @@ type RetentionPolicy struct {
// Unique name within database. Required.
Name string `json:"name"`
// Length of time to keep data around
// Length of time to keep data around. A zero duration means keep the data forever.
Duration time.Duration `json:"duration"`
// The number of copies to make of each shard.

View File

@ -133,7 +133,7 @@ func (p *Parser) parseShowStatement() (Statement, error) {
}
// parseCreateStatement parses a string and returns a create statement.
// This function assumes the CREATE token has already been consumned.
// This function assumes the CREATE token has already been consumed.
func (p *Parser) parseCreateStatement() (Statement, error) {
tok, pos, lit := p.scanIgnoreWhitespace()
if tok == CONTINUOUS {
@ -352,9 +352,14 @@ func (p *Parser) parseUInt32() (uint32, error) {
// This function assumes the DURATION token has already been consumed.
func (p *Parser) parseDuration() (time.Duration, error) {
tok, pos, lit := p.scanIgnoreWhitespace()
if tok != DURATION_VAL {
if tok != DURATION_VAL && tok != INF {
return 0, newParseError(tokstr(tok, lit), []string{"duration"}, pos)
}
if tok == INF {
return 0, nil
}
d, err := ParseDuration(lit)
if err != nil {
return 0, &ParseError{Message: err.Error(), Pos: pos}

View File

@ -637,6 +637,17 @@ func TestParser_ParseStatement(t *testing.T) {
},
},
// CREATE RETENTION POLICY with infinite retention
{
s: `CREATE RETENTION POLICY policy1 ON testdb DURATION INF REPLICATION 2`,
stmt: &influxql.CreateRetentionPolicyStatement{
Name: "policy1",
Database: "testdb",
Duration: 0,
Replication: 2,
},
},
// CREATE RETENTION POLICY ... DEFAULT
{
s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 2m REPLICATION 4 DEFAULT`,
@ -661,6 +672,12 @@ func TestParser_ParseStatement(t *testing.T) {
stmt: newAlterRetentionPolicyStatement("policy1", "testdb", time.Minute, 4, true),
},
// ALTER RETENTION POLICY with infinite retention
{
s: `ALTER RETENTION POLICY policy1 ON testdb DEFAULT REPLICATION 4 DURATION INF`,
stmt: newAlterRetentionPolicyStatement("policy1", "testdb", 0, 4, true),
},
// ALTER RETENTION POLICY without optional DURATION
{
s: `ALTER RETENTION POLICY policy1 ON testdb DEFAULT REPLICATION 4`,

View File

@ -79,6 +79,7 @@ const (
GROUP
IF
IN
INF
INNER
INSERT
INTO
@ -176,6 +177,7 @@ var tokens = [...]string{
GROUP: "GROUP",
IF: "IF",
IN: "IN",
INF: "INF",
INNER: "INNER",
INSERT: "INSERT",
INTO: "INTO",

View File

@ -302,7 +302,7 @@ func (s *Server) EnforceRetentionPolicies() {
for _, db := range s.databases {
for _, rp := range db.policies {
for _, g := range rp.shardGroups {
if g.EndTime.Add(rp.Duration).Before(time.Now().UTC()) {
if rp.Duration != 0 && g.EndTime.Add(rp.Duration).Before(time.Now().UTC()) {
log.Printf("shard group %d, retention policy %s, database %s due for deletion",
g.ID, rp.Name, db.name)
if err := s.DeleteShardGroup(db.name, rp.Name, g.ID); err != nil {