diff --git a/CHANGELOG.md b/CHANGELOG.md index 80c8dcb86d..33ceb8f15f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ - [#6079](https://github.com/influxdata/influxdb/issues/6079): Limit the maximum number of concurrent queries. - [#6075](https://github.com/influxdata/influxdb/issues/6075): Limit the maximum running time of a query. - [#6102](https://github.com/influxdata/influxdb/issues/6102): Limit series count in selection +- [#6060](https://github.com/influxdata/influxdb/pull/6060): Add configurable shard duration to retention policies ### Bugfixes diff --git a/cluster/query_executor.go b/cluster/query_executor.go index 8433dbbbf3..5ddb47e563 100644 --- a/cluster/query_executor.go +++ b/cluster/query_executor.go @@ -242,8 +242,9 @@ func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chu func (e *QueryExecutor) executeAlterRetentionPolicyStatement(stmt *influxql.AlterRetentionPolicyStatement) error { rpu := &meta.RetentionPolicyUpdate{ - Duration: stmt.Duration, - ReplicaN: stmt.Replication, + Duration: stmt.Duration, + ReplicaN: stmt.Replication, + ShardGroupDuration: stmt.ShardGroupDuration, } // Update the retention policy. @@ -274,6 +275,7 @@ func (e *QueryExecutor) executeCreateDatabaseStatement(stmt *influxql.CreateData rpi := meta.NewRetentionPolicyInfo(stmt.RetentionPolicyName) rpi.Duration = stmt.RetentionPolicyDuration rpi.ReplicaN = stmt.RetentionPolicyReplication + rpi.ShardGroupDuration = stmt.RetentionPolicyShardGroupDuration _, err := e.MetaClient.CreateDatabaseWithRetentionPolicy(stmt.Name, rpi) return err } @@ -282,6 +284,7 @@ func (e *QueryExecutor) executeCreateRetentionPolicyStatement(stmt *influxql.Cre rpi := meta.NewRetentionPolicyInfo(stmt.Name) rpi.Duration = stmt.Duration rpi.ReplicaN = stmt.Replication + rpi.ShardGroupDuration = stmt.ShardGroupDuration // Create new retention policy. if _, err := e.MetaClient.CreateRetentionPolicy(stmt.Database, rpi); err != nil { @@ -656,9 +659,9 @@ func (e *QueryExecutor) executeShowRetentionPoliciesStatement(q *influxql.ShowRe return nil, influxdb.ErrDatabaseNotFound(q.Database) } - row := &models.Row{Columns: []string{"name", "duration", "replicaN", "default"}} + row := &models.Row{Columns: []string{"name", "duration", "shardGroupDuration", "replicaN", "default"}} for _, rpi := range di.RetentionPolicies { - row.Values = append(row.Values, []interface{}{rpi.Name, rpi.Duration.String(), rpi.ReplicaN, di.DefaultRetentionPolicy == rpi.Name}) + row.Values = append(row.Values, []interface{}{rpi.Name, rpi.Duration.String(), rpi.ShardGroupDuration.String(), rpi.ReplicaN, di.DefaultRetentionPolicy == rpi.Name}) } return []*models.Row{row}, nil } diff --git a/cmd/influxd/run/server_suite_test.go b/cmd/influxd/run/server_suite_test.go index cc198e77fc..6683083343 100644 --- a/cmd/influxd/run/server_suite_test.go +++ b/cmd/influxd/run/server_suite_test.go @@ -322,7 +322,7 @@ func init() { &Query{ name: "show retention policy should succeed", command: `SHOW RETENTION POLICIES ON db0`, - exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["rp0","1h0m0s",1,false]]}]}]}`, + exp: `{"results":[{"series":[{"columns":["name","duration","shardGroupDuration","replicaN","default"],"values":[["rp0","1h0m0s","1h0m0s",1,false]]}]}]}`, }, &Query{ name: "alter retention policy should succeed", @@ -333,12 +333,12 @@ func init() { &Query{ name: "show retention policy should have new altered information", command: `SHOW RETENTION POLICIES ON db0`, - exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["rp0","2h0m0s",3,true]]}]}]}`, + exp: `{"results":[{"series":[{"columns":["name","duration","shardGroupDuration","replicaN","default"],"values":[["rp0","2h0m0s","1h0m0s",3,true]]}]}]}`, }, &Query{ name: "show retention policy should still show policy", command: `SHOW RETENTION POLICIES ON db0`, - exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["rp0","2h0m0s",3,true]]}]}]}`, + exp: `{"results":[{"series":[{"columns":["name","duration","shardGroupDuration","replicaN","default"],"values":[["rp0","2h0m0s","1h0m0s",3,true]]}]}]}`, }, &Query{ name: "create a second non-default retention policy", @@ -349,7 +349,7 @@ func init() { &Query{ name: "show retention policy should show both", command: `SHOW RETENTION POLICIES ON db0`, - exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["rp0","2h0m0s",3,true],["rp2","1h0m0s",1,false]]}]}]}`, + exp: `{"results":[{"series":[{"columns":["name","duration","shardGroupDuration","replicaN","default"],"values":[["rp0","2h0m0s","1h0m0s",3,true],["rp2","1h0m0s","1h0m0s",1,false]]}]}]}`, }, &Query{ name: "dropping non-default retention policy succeed", @@ -357,14 +357,31 @@ func init() { exp: `{"results":[{}]}`, once: true, }, + &Query{ + name: "create a third non-default retention policy", + command: `CREATE RETENTION POLICY rp3 ON db0 DURATION 1h REPLICATION 1 SHARD DURATION 30m`, + exp: `{"results":[{}]}`, + once: true, + }, + &Query{ + name: "show retention policy should show both with custom shard", + command: `SHOW RETENTION POLICIES ON db0`, + exp: `{"results":[{"series":[{"columns":["name","duration","shardGroupDuration","replicaN","default"],"values":[["rp0","2h0m0s","1h0m0s",3,true],["rp3","1h0m0s","30m0s",1,false]]}]}]}`, + }, + &Query{ + name: "dropping non-default custom shard retention policy succeed", + command: `DROP RETENTION POLICY rp3 ON db0`, + exp: `{"results":[{}]}`, + once: true, + }, &Query{ name: "show retention policy should show just default", command: `SHOW RETENTION POLICIES ON db0`, - exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["rp0","2h0m0s",3,true]]}]}]}`, + exp: `{"results":[{"series":[{"columns":["name","duration","shardGroupDuration","replicaN","default"],"values":[["rp0","2h0m0s","1h0m0s",3,true]]}]}]}`, }, &Query{ name: "Ensure retention policy with unacceptable retention cannot be created", - command: `CREATE RETENTION POLICY rp3 ON db0 DURATION 1s REPLICATION 1`, + command: `CREATE RETENTION POLICY rp4 ON db0 DURATION 1s REPLICATION 1`, exp: `{"results":[{"error":"retention policy duration must be at least 1h0m0s"}]}`, once: true, }, @@ -393,7 +410,7 @@ func init() { &Query{ name: "show retention policies should return auto-created policy", command: `SHOW RETENTION POLICIES ON db0`, - exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["default","0",1,true]]}]}]}`, + exp: `{"results":[{"series":[{"columns":["name","duration","shardGroupDuration","replicaN","default"],"values":[["default","0","168h0m0s",1,true]]}]}]}`, }, }, } diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index a5c2fcf4cb..c5fb50f527 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -503,7 +503,7 @@ func TestServer_Query_DefaultDBAndRP(t *testing.T) { &Query{ name: "default rp exists", command: `show retention policies ON db0`, - exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["default","0",1,false],["rp0","1h0m0s",1,true]]}]}]}`, + exp: `{"results":[{"series":[{"columns":["name","duration","shardGroupDuration","replicaN","default"],"values":[["default","0","168h0m0s",1,false],["rp0","1h0m0s","1h0m0s",1,true]]}]}]}`, }, &Query{ name: "default rp", diff --git a/influxql/ast.go b/influxql/ast.go index 176bf9978b..e1c813fbd0 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -444,6 +444,9 @@ type CreateDatabaseStatement struct { // RetentionPolicyName indicates retention name for the new database RetentionPolicyName string + + // RetentionPolicyShardGroupDuration indicates shard group duration for the new database + RetentionPolicyShardGroupDuration time.Duration } // String returns a string representation of the create database statement. @@ -459,6 +462,10 @@ func (s *CreateDatabaseStatement) String() string { _, _ = buf.WriteString(s.RetentionPolicyDuration.String()) _, _ = buf.WriteString(" REPLICATION ") _, _ = buf.WriteString(strconv.Itoa(s.RetentionPolicyReplication)) + if s.RetentionPolicyShardGroupDuration > 0 { + _, _ = buf.WriteString(" SHARD DURATION ") + _, _ = buf.WriteString(s.RetentionPolicyShardGroupDuration.String()) + } _, _ = buf.WriteString(" NAME ") _, _ = buf.WriteString(QuoteIdent(s.RetentionPolicyName)) } @@ -751,6 +758,9 @@ type CreateRetentionPolicyStatement struct { // Should this policy be set as default for the database? Default bool + + // Shard Duration + ShardGroupDuration time.Duration } // String returns a string representation of the create retention policy. @@ -764,6 +774,10 @@ func (s *CreateRetentionPolicyStatement) String() string { _, _ = buf.WriteString(FormatDuration(s.Duration)) _, _ = buf.WriteString(" REPLICATION ") _, _ = buf.WriteString(strconv.Itoa(s.Replication)) + if s.ShardGroupDuration > 0 { + _, _ = buf.WriteString(" SHARD DURATION ") + _, _ = buf.WriteString(FormatDuration(s.ShardGroupDuration)) + } if s.Default { _, _ = buf.WriteString(" DEFAULT") } @@ -791,6 +805,9 @@ type AlterRetentionPolicyStatement struct { // Should this policy be set as defalut for the database? Default bool + + // Duration of the Shard + ShardGroupDuration *time.Duration } // String returns a string representation of the alter retention policy statement. @@ -811,6 +828,11 @@ func (s *AlterRetentionPolicyStatement) String() string { _, _ = buf.WriteString(strconv.Itoa(*s.Replication)) } + if s.ShardGroupDuration != nil { + _, _ = buf.WriteString(" SHARD DURATION ") + _, _ = buf.WriteString(FormatDuration(*s.ShardGroupDuration)) + } + if s.Default { _, _ = buf.WriteString(" DEFAULT") } @@ -1448,7 +1470,6 @@ func (s *SelectStatement) validPercentileAggr(expr *Call) error { default: return fmt.Errorf("expected float argument in percentile()") } - return nil } func (s *SelectStatement) validateAggregates(tr targetRequirement) error { diff --git a/influxql/parser.go b/influxql/parser.go index 71d8494381..87c63a2126 100644 --- a/influxql/parser.go +++ b/influxql/parser.go @@ -389,8 +389,7 @@ func (p *Parser) parseCreateRetentionPolicyStatement() (*CreateRetentionPolicySt stmt.Database = ident // Parse required DURATION token. - tok, pos, lit := p.scanIgnoreWhitespace() - if tok != DURATION { + if tok, pos, lit := p.scanIgnoreWhitespace(); tok != DURATION { return nil, newParseError(tokstr(tok, lit), []string{"DURATION"}, pos) } @@ -402,7 +401,7 @@ func (p *Parser) parseCreateRetentionPolicyStatement() (*CreateRetentionPolicySt stmt.Duration = d // Parse required REPLICATION token. - if tok, pos, lit = p.scanIgnoreWhitespace(); tok != REPLICATION { + if tok, pos, lit := p.scanIgnoreWhitespace(); tok != REPLICATION { return nil, newParseError(tokstr(tok, lit), []string{"REPLICATION"}, pos) } @@ -413,8 +412,24 @@ func (p *Parser) parseCreateRetentionPolicyStatement() (*CreateRetentionPolicySt } stmt.Replication = n + // Parse optional SHARD token. + if tok, pos, lit := p.scanIgnoreWhitespace(); tok == SHARD { + if tok, pos, lit := p.scanIgnoreWhitespace(); tok != DURATION { + return nil, newParseError(tokstr(tok, lit), []string{"DURATION"}, pos) + } + d, err := p.parseDuration() + if err != nil { + return nil, err + } + stmt.ShardGroupDuration = d + } else if tok != EOF && tok != SEMICOLON && tok != DEFAULT { + return nil, newParseError(tokstr(tok, lit), []string{"SHARD"}, pos) + } else { + p.unscan() + } + // Parse optional DEFAULT token. - if tok, pos, lit = p.scanIgnoreWhitespace(); tok == DEFAULT { + if tok, pos, lit := p.scanIgnoreWhitespace(); tok == DEFAULT { stmt.Default = true } else if tok != EOF && tok != SEMICOLON { return nil, newParseError(tokstr(tok, lit), []string{"DEFAULT"}, pos) @@ -468,11 +483,22 @@ Loop: return nil, err } stmt.Replication = &n + case SHARD: + tok, pos, lit := p.scanIgnoreWhitespace() + if tok == DURATION { + d, err := p.parseDuration() + if err != nil { + return nil, err + } + stmt.ShardGroupDuration = &d + } else { + return nil, newParseError(tokstr(tok, lit), []string{"DURATION"}, pos) + } case DEFAULT: stmt.Default = true default: if i < 1 { - return nil, newParseError(tokstr(tok, lit), []string{"DURATION", "RETENTION", "DEFAULT"}, pos) + return nil, newParseError(tokstr(tok, lit), []string{"DURATION", "RETENTION", "SHARD", "DEFAULT"}, pos) } p.unscan() break Loop @@ -1528,7 +1554,7 @@ func (p *Parser) parseCreateDatabaseStatement() (*CreateDatabaseStatement, error // validate that at least one of DURATION, REPLICATION or NAME is provided tok, pos, lit := p.scanIgnoreWhitespace() if tok != DURATION && tok != REPLICATION && tok != NAME { - return nil, newParseError(tokstr(tok, lit), []string{"DURATION", "REPLICATION", "NAME"}, pos) + return nil, newParseError(tokstr(tok, lit), []string{"DURATION", "REPLICATION", "SHARD", "NAME"}, pos) } // rewind p.unscan() @@ -1560,6 +1586,22 @@ func (p *Parser) parseCreateDatabaseStatement() (*CreateDatabaseStatement, error } stmt.RetentionPolicyReplication = rpReplication + // Look for "SHARD" + var rpShardGroupDuration time.Duration + if err := p.parseTokens([]Token{SHARD}); err != nil { + p.unscan() + } else { + tok, pos, lit := p.scanIgnoreWhitespace() + if tok != DURATION { + return nil, newParseError(tokstr(tok, lit), []string{"DURATION"}, pos) + } + rpShardGroupDuration, err = p.parseDuration() + if err != nil { + return nil, err + } + stmt.RetentionPolicyShardGroupDuration = rpShardGroupDuration + } + // Look for "NAME" var rpName string = "default" // default is default if err := p.parseTokens([]Token{NAME}); err != nil { diff --git a/influxql/parser_test.go b/influxql/parser_test.go index ae4c57b27b..bb82f98b00 100644 --- a/influxql/parser_test.go +++ b/influxql/parser_test.go @@ -1328,6 +1328,30 @@ func TestParser_ParseStatement(t *testing.T) { RetentionPolicyName: "test_name", }, }, + { + s: `CREATE DATABASE testdb WITH DURATION 24h REPLICATION 2 SHARD DURATION 10m NAME test_name `, + stmt: &influxql.CreateDatabaseStatement{ + Name: "testdb", + IfNotExists: false, + RetentionPolicyCreate: true, + RetentionPolicyDuration: 24 * time.Hour, + RetentionPolicyReplication: 2, + RetentionPolicyName: "test_name", + RetentionPolicyShardGroupDuration: 10 * time.Minute, + }, + }, + { + s: `CREATE DATABASE IF NOT EXISTS testdb WITH DURATION 24h REPLICATION 2 SHARD DURATION 10m NAME test_name`, + stmt: &influxql.CreateDatabaseStatement{ + Name: "testdb", + IfNotExists: true, + RetentionPolicyCreate: true, + RetentionPolicyDuration: 24 * time.Hour, + RetentionPolicyReplication: 2, + RetentionPolicyName: "test_name", + RetentionPolicyShardGroupDuration: 10 * time.Minute, + }, + }, // CREATE USER statement { @@ -1545,46 +1569,62 @@ func TestParser_ParseStatement(t *testing.T) { Default: true, }, }, + // CREATE RETENTION POLICY + { + s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION 2 SHARD DURATION 30m`, + stmt: &influxql.CreateRetentionPolicyStatement{ + Name: "policy1", + Database: "testdb", + Duration: time.Hour, + Replication: 2, + ShardGroupDuration: 30 * time.Minute, + }, + }, // ALTER RETENTION POLICY { s: `ALTER RETENTION POLICY policy1 ON testdb DURATION 1m REPLICATION 4 DEFAULT`, - stmt: newAlterRetentionPolicyStatement("policy1", "testdb", time.Minute, 4, true), + stmt: newAlterRetentionPolicyStatement("policy1", "testdb", time.Minute, -1, 4, true), }, // ALTER RETENTION POLICY with options in reverse order { s: `ALTER RETENTION POLICY policy1 ON testdb DEFAULT REPLICATION 4 DURATION 1m`, - stmt: newAlterRetentionPolicyStatement("policy1", "testdb", time.Minute, 4, true), + stmt: newAlterRetentionPolicyStatement("policy1", "testdb", time.Minute, -1, 4, true), }, // ALTER RETENTION POLICY with infinite retention { s: `ALTER RETENTION POLICY policy1 ON testdb DEFAULT REPLICATION 4 DURATION INF`, - stmt: newAlterRetentionPolicyStatement("policy1", "testdb", 0, 4, true), + stmt: newAlterRetentionPolicyStatement("policy1", "testdb", 0, -1, 4, true), }, // ALTER RETENTION POLICY without optional DURATION { s: `ALTER RETENTION POLICY policy1 ON testdb DEFAULT REPLICATION 4`, - stmt: newAlterRetentionPolicyStatement("policy1", "testdb", -1, 4, true), + stmt: newAlterRetentionPolicyStatement("policy1", "testdb", -1, -1, 4, true), }, // ALTER RETENTION POLICY without optional REPLICATION { s: `ALTER RETENTION POLICY policy1 ON testdb DEFAULT`, - stmt: newAlterRetentionPolicyStatement("policy1", "testdb", -1, -1, true), + stmt: newAlterRetentionPolicyStatement("policy1", "testdb", -1, -1, -1, true), }, // ALTER RETENTION POLICY without optional DEFAULT { s: `ALTER RETENTION POLICY policy1 ON testdb REPLICATION 4`, - stmt: newAlterRetentionPolicyStatement("policy1", "testdb", -1, 4, false), + stmt: newAlterRetentionPolicyStatement("policy1", "testdb", -1, -1, 4, false), }, // ALTER default retention policy unquoted { s: `ALTER RETENTION POLICY default ON testdb REPLICATION 4`, - stmt: newAlterRetentionPolicyStatement("default", "testdb", -1, 4, false), + stmt: newAlterRetentionPolicyStatement("default", "testdb", -1, -1, 4, false), + }, + // ALTER RETENTION POLICY with SHARD duration + { + s: `ALTER RETENTION POLICY policy1 ON testdb REPLICATION 4 SHARD DURATION 10m`, + stmt: newAlterRetentionPolicyStatement("policy1", "testdb", -1, 10*time.Minute, 4, false), }, // SHOW STATS @@ -1779,14 +1819,14 @@ func TestParser_ParseStatement(t *testing.T) { {s: `DROP FOO`, err: `found FOO, expected CONTINUOUS, DATA, MEASUREMENT, META, RETENTION, SERIES, SHARD, SUBSCRIPTION, USER at line 1, char 6`}, {s: `CREATE FOO`, err: `found FOO, expected CONTINUOUS, DATABASE, USER, RETENTION, SUBSCRIPTION at line 1, char 8`}, {s: `CREATE DATABASE`, err: `found EOF, expected identifier at line 1, char 17`}, - {s: `CREATE DATABASE "testdb" WITH`, err: `found EOF, expected DURATION, REPLICATION, NAME at line 1, char 31`}, + {s: `CREATE DATABASE "testdb" WITH`, err: `found EOF, expected DURATION, REPLICATION, SHARD, NAME at line 1, char 31`}, {s: `CREATE DATABASE "testdb" WITH DURATION`, err: `found EOF, expected duration at line 1, char 40`}, {s: `CREATE DATABASE "testdb" WITH REPLICATION`, err: `found EOF, expected integer at line 1, char 43`}, {s: `CREATE DATABASE "testdb" WITH NAME`, err: `found EOF, expected identifier at line 1, char 36`}, {s: `CREATE DATABASE IF`, err: `found EOF, expected NOT at line 1, char 20`}, {s: `CREATE DATABASE IF NOT`, err: `found EOF, expected EXISTS at line 1, char 24`}, {s: `CREATE DATABASE IF NOT EXISTS`, err: `found EOF, expected identifier at line 1, char 31`}, - {s: `CREATE DATABASE IF NOT EXISTS "testdb" WITH`, err: `found EOF, expected DURATION, REPLICATION, NAME at line 1, char 45`}, + {s: `CREATE DATABASE IF NOT EXISTS "testdb" WITH`, err: `found EOF, expected DURATION, REPLICATION, SHARD, NAME at line 1, char 45`}, {s: `CREATE DATABASE IF NOT EXISTS "testdb" WITH DURATION`, err: `found EOF, expected duration at line 1, char 54`}, {s: `CREATE DATABASE IF NOT EXISTS "testdb" WITH REPLICATION`, err: `found EOF, expected integer at line 1, char 57`}, {s: `CREATE DATABASE IF NOT EXISTS "testdb" WITH NAME`, err: `found EOF, expected identifier at line 1, char 50`}, @@ -1894,12 +1934,13 @@ func TestParser_ParseStatement(t *testing.T) { {s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION 3.14`, err: `found 3.14, expected integer at line 1, char 67`}, {s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION 0`, err: `invalid value 0: must be 1 <= n <= 2147483647 at line 1, char 67`}, {s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION bad`, err: `found bad, expected integer at line 1, char 67`}, - {s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION 1 foo`, err: `found foo, expected DEFAULT at line 1, char 69`}, + {s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION 1 foo`, err: `found foo, expected SHARD at line 1, char 69`}, + {s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION 1 SHARD DURATION 30m foo`, err: `found foo, expected DEFAULT at line 1, char 88`}, {s: `ALTER`, err: `found EOF, expected RETENTION at line 1, char 7`}, {s: `ALTER RETENTION`, err: `found EOF, expected POLICY at line 1, char 17`}, {s: `ALTER RETENTION POLICY`, err: `found EOF, expected identifier at line 1, char 24`}, {s: `ALTER RETENTION POLICY policy1`, err: `found EOF, expected ON at line 1, char 32`}, {s: `ALTER RETENTION POLICY policy1 ON`, err: `found EOF, expected identifier at line 1, char 35`}, - {s: `ALTER RETENTION POLICY policy1 ON testdb`, err: `found EOF, expected DURATION, RETENTION, DEFAULT at line 1, char 42`}, + {s: `ALTER RETENTION POLICY policy1 ON testdb`, err: `found EOF, expected DURATION, RETENTION, SHARD, DEFAULT at line 1, char 42`}, {s: `SET`, err: `found EOF, expected PASSWORD at line 1, char 5`}, {s: `SET PASSWORD`, err: `found EOF, expected FOR at line 1, char 14`}, {s: `SET PASSWORD something`, err: `found something, expected FOR at line 1, char 14`}, @@ -2320,7 +2361,7 @@ func errstring(err error) string { } // newAlterRetentionPolicyStatement creates an initialized AlterRetentionPolicyStatement. -func newAlterRetentionPolicyStatement(name string, DB string, d time.Duration, replication int, dfault bool) *influxql.AlterRetentionPolicyStatement { +func newAlterRetentionPolicyStatement(name string, DB string, d, sd time.Duration, replication int, dfault bool) *influxql.AlterRetentionPolicyStatement { stmt := &influxql.AlterRetentionPolicyStatement{ Name: name, Database: DB, @@ -2331,6 +2372,10 @@ func newAlterRetentionPolicyStatement(name string, DB string, d time.Duration, r stmt.Duration = &d } + if sd > -1 { + stmt.ShardGroupDuration = &sd + } + if replication > -1 { stmt.Replication = &replication } diff --git a/services/meta/data.go b/services/meta/data.go index f2b86cc6d0..eb5ee91f0b 100644 --- a/services/meta/data.go +++ b/services/meta/data.go @@ -151,12 +151,19 @@ func (data *Data) CreateRetentionPolicy(database string, rpi *RetentionPolicyInf } // Append new policy. - di.RetentionPolicies = append(di.RetentionPolicies, RetentionPolicyInfo{ - Name: rpi.Name, - Duration: rpi.Duration, - ShardGroupDuration: shardGroupDuration(rpi.Duration), - ReplicaN: rpi.ReplicaN, - }) + rp := RetentionPolicyInfo{ + Name: rpi.Name, + Duration: rpi.Duration, + ReplicaN: rpi.ReplicaN, + } + + if rpi.ShardGroupDuration != 0 { + rp.ShardGroupDuration = rpi.ShardGroupDuration + } else { + rp.ShardGroupDuration = shardGroupDuration(rpi.Duration) + } + + di.RetentionPolicies = append(di.RetentionPolicies, rp) return nil } @@ -183,9 +190,10 @@ func (data *Data) DropRetentionPolicy(database, name string) error { // RetentionPolicyUpdate represents retention policy fields to be updated. type RetentionPolicyUpdate struct { - Name *string - Duration *time.Duration - ReplicaN *int + Name *string + Duration *time.Duration + ReplicaN *int + ShardGroupDuration *time.Duration } // SetName sets the RetentionPolicyUpdate.Name @@ -197,6 +205,9 @@ func (rpu *RetentionPolicyUpdate) SetDuration(v time.Duration) { rpu.Duration = // SetReplicaN sets the RetentionPolicyUpdate.ReplicaN func (rpu *RetentionPolicyUpdate) SetReplicaN(v int) { rpu.ReplicaN = &v } +// SetShardGroupDuration sets the RetentionPolicyUpdate.ShardGroupDuration +func (rpu *RetentionPolicyUpdate) SetShardGroupDuration(v time.Duration) { rpu.ShardGroupDuration = &v } + // UpdateRetentionPolicy updates an existing retention policy. func (data *Data) UpdateRetentionPolicy(database, name string, rpu *RetentionPolicyUpdate) error { // Find database. @@ -227,12 +238,17 @@ func (data *Data) UpdateRetentionPolicy(database, name string, rpu *RetentionPol } if rpu.Duration != nil { rpi.Duration = *rpu.Duration - rpi.ShardGroupDuration = shardGroupDuration(rpi.Duration) } if rpu.ReplicaN != nil { rpi.ReplicaN = *rpu.ReplicaN } + if rpu.ShardGroupDuration != nil { + rpi.ShardGroupDuration = *rpu.ShardGroupDuration + } else { + rpi.ShardGroupDuration = shardGroupDuration(rpi.Duration) + } + return nil }