Add configurable shard duration to retention policies
Allows configuration of shard group duration at database creation, and retention policy create/alter time. Query examples: ``` CREATE DATABASE testdb WITH DURATION 90d SHARD DURATION 30m NAME rp_testdb CREATE RETENTION POLICY rp_testdb2 ON testdb DURATION INF REPLICATION 1 SHARD DURATION 30m ALTER RETENTION POLICY rp_testdb2 ON testdb SHARD DURATION 1h ``` This can be useful with long duration retention policies with lots of data, where you can split into smaller shards to relieve memory pressure.pull/6060/head
parent
5e8e849ebd
commit
45b3e61ac7
|
@ -10,6 +10,7 @@
|
|||
- [#6079](https://github.com/influxdata/influxdb/issues/6079): Limit the maximum number of concurrent queries.
|
||||
- [#6075](https://github.com/influxdata/influxdb/issues/6075): Limit the maximum running time of a query.
|
||||
- [#6102](https://github.com/influxdata/influxdb/issues/6102): Limit series count in selection
|
||||
- [#6060](https://github.com/influxdata/influxdb/pull/6060): Add configurable shard duration to retention policies
|
||||
|
||||
### Bugfixes
|
||||
|
||||
|
|
|
@ -244,6 +244,7 @@ func (e *QueryExecutor) executeAlterRetentionPolicyStatement(stmt *influxql.Alte
|
|||
rpu := &meta.RetentionPolicyUpdate{
|
||||
Duration: stmt.Duration,
|
||||
ReplicaN: stmt.Replication,
|
||||
ShardGroupDuration: stmt.ShardGroupDuration,
|
||||
}
|
||||
|
||||
// Update the retention policy.
|
||||
|
@ -274,6 +275,7 @@ func (e *QueryExecutor) executeCreateDatabaseStatement(stmt *influxql.CreateData
|
|||
rpi := meta.NewRetentionPolicyInfo(stmt.RetentionPolicyName)
|
||||
rpi.Duration = stmt.RetentionPolicyDuration
|
||||
rpi.ReplicaN = stmt.RetentionPolicyReplication
|
||||
rpi.ShardGroupDuration = stmt.RetentionPolicyShardGroupDuration
|
||||
_, err := e.MetaClient.CreateDatabaseWithRetentionPolicy(stmt.Name, rpi)
|
||||
return err
|
||||
}
|
||||
|
@ -282,6 +284,7 @@ func (e *QueryExecutor) executeCreateRetentionPolicyStatement(stmt *influxql.Cre
|
|||
rpi := meta.NewRetentionPolicyInfo(stmt.Name)
|
||||
rpi.Duration = stmt.Duration
|
||||
rpi.ReplicaN = stmt.Replication
|
||||
rpi.ShardGroupDuration = stmt.ShardGroupDuration
|
||||
|
||||
// Create new retention policy.
|
||||
if _, err := e.MetaClient.CreateRetentionPolicy(stmt.Database, rpi); err != nil {
|
||||
|
@ -656,9 +659,9 @@ func (e *QueryExecutor) executeShowRetentionPoliciesStatement(q *influxql.ShowRe
|
|||
return nil, influxdb.ErrDatabaseNotFound(q.Database)
|
||||
}
|
||||
|
||||
row := &models.Row{Columns: []string{"name", "duration", "replicaN", "default"}}
|
||||
row := &models.Row{Columns: []string{"name", "duration", "shardGroupDuration", "replicaN", "default"}}
|
||||
for _, rpi := range di.RetentionPolicies {
|
||||
row.Values = append(row.Values, []interface{}{rpi.Name, rpi.Duration.String(), rpi.ReplicaN, di.DefaultRetentionPolicy == rpi.Name})
|
||||
row.Values = append(row.Values, []interface{}{rpi.Name, rpi.Duration.String(), rpi.ShardGroupDuration.String(), rpi.ReplicaN, di.DefaultRetentionPolicy == rpi.Name})
|
||||
}
|
||||
return []*models.Row{row}, nil
|
||||
}
|
||||
|
|
|
@ -322,7 +322,7 @@ func init() {
|
|||
&Query{
|
||||
name: "show retention policy should succeed",
|
||||
command: `SHOW RETENTION POLICIES ON db0`,
|
||||
exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["rp0","1h0m0s",1,false]]}]}]}`,
|
||||
exp: `{"results":[{"series":[{"columns":["name","duration","shardGroupDuration","replicaN","default"],"values":[["rp0","1h0m0s","1h0m0s",1,false]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "alter retention policy should succeed",
|
||||
|
@ -333,12 +333,12 @@ func init() {
|
|||
&Query{
|
||||
name: "show retention policy should have new altered information",
|
||||
command: `SHOW RETENTION POLICIES ON db0`,
|
||||
exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["rp0","2h0m0s",3,true]]}]}]}`,
|
||||
exp: `{"results":[{"series":[{"columns":["name","duration","shardGroupDuration","replicaN","default"],"values":[["rp0","2h0m0s","1h0m0s",3,true]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "show retention policy should still show policy",
|
||||
command: `SHOW RETENTION POLICIES ON db0`,
|
||||
exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["rp0","2h0m0s",3,true]]}]}]}`,
|
||||
exp: `{"results":[{"series":[{"columns":["name","duration","shardGroupDuration","replicaN","default"],"values":[["rp0","2h0m0s","1h0m0s",3,true]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "create a second non-default retention policy",
|
||||
|
@ -349,7 +349,7 @@ func init() {
|
|||
&Query{
|
||||
name: "show retention policy should show both",
|
||||
command: `SHOW RETENTION POLICIES ON db0`,
|
||||
exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["rp0","2h0m0s",3,true],["rp2","1h0m0s",1,false]]}]}]}`,
|
||||
exp: `{"results":[{"series":[{"columns":["name","duration","shardGroupDuration","replicaN","default"],"values":[["rp0","2h0m0s","1h0m0s",3,true],["rp2","1h0m0s","1h0m0s",1,false]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "dropping non-default retention policy succeed",
|
||||
|
@ -357,14 +357,31 @@ func init() {
|
|||
exp: `{"results":[{}]}`,
|
||||
once: true,
|
||||
},
|
||||
&Query{
|
||||
name: "create a third non-default retention policy",
|
||||
command: `CREATE RETENTION POLICY rp3 ON db0 DURATION 1h REPLICATION 1 SHARD DURATION 30m`,
|
||||
exp: `{"results":[{}]}`,
|
||||
once: true,
|
||||
},
|
||||
&Query{
|
||||
name: "show retention policy should show both with custom shard",
|
||||
command: `SHOW RETENTION POLICIES ON db0`,
|
||||
exp: `{"results":[{"series":[{"columns":["name","duration","shardGroupDuration","replicaN","default"],"values":[["rp0","2h0m0s","1h0m0s",3,true],["rp3","1h0m0s","30m0s",1,false]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "dropping non-default custom shard retention policy succeed",
|
||||
command: `DROP RETENTION POLICY rp3 ON db0`,
|
||||
exp: `{"results":[{}]}`,
|
||||
once: true,
|
||||
},
|
||||
&Query{
|
||||
name: "show retention policy should show just default",
|
||||
command: `SHOW RETENTION POLICIES ON db0`,
|
||||
exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["rp0","2h0m0s",3,true]]}]}]}`,
|
||||
exp: `{"results":[{"series":[{"columns":["name","duration","shardGroupDuration","replicaN","default"],"values":[["rp0","2h0m0s","1h0m0s",3,true]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "Ensure retention policy with unacceptable retention cannot be created",
|
||||
command: `CREATE RETENTION POLICY rp3 ON db0 DURATION 1s REPLICATION 1`,
|
||||
command: `CREATE RETENTION POLICY rp4 ON db0 DURATION 1s REPLICATION 1`,
|
||||
exp: `{"results":[{"error":"retention policy duration must be at least 1h0m0s"}]}`,
|
||||
once: true,
|
||||
},
|
||||
|
@ -393,7 +410,7 @@ func init() {
|
|||
&Query{
|
||||
name: "show retention policies should return auto-created policy",
|
||||
command: `SHOW RETENTION POLICIES ON db0`,
|
||||
exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["default","0",1,true]]}]}]}`,
|
||||
exp: `{"results":[{"series":[{"columns":["name","duration","shardGroupDuration","replicaN","default"],"values":[["default","0","168h0m0s",1,true]]}]}]}`,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
|
@ -503,7 +503,7 @@ func TestServer_Query_DefaultDBAndRP(t *testing.T) {
|
|||
&Query{
|
||||
name: "default rp exists",
|
||||
command: `show retention policies ON db0`,
|
||||
exp: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["default","0",1,false],["rp0","1h0m0s",1,true]]}]}]}`,
|
||||
exp: `{"results":[{"series":[{"columns":["name","duration","shardGroupDuration","replicaN","default"],"values":[["default","0","168h0m0s",1,false],["rp0","1h0m0s","1h0m0s",1,true]]}]}]}`,
|
||||
},
|
||||
&Query{
|
||||
name: "default rp",
|
||||
|
|
|
@ -444,6 +444,9 @@ type CreateDatabaseStatement struct {
|
|||
|
||||
// RetentionPolicyName indicates retention name for the new database
|
||||
RetentionPolicyName string
|
||||
|
||||
// RetentionPolicyShardGroupDuration indicates shard group duration for the new database
|
||||
RetentionPolicyShardGroupDuration time.Duration
|
||||
}
|
||||
|
||||
// String returns a string representation of the create database statement.
|
||||
|
@ -459,6 +462,10 @@ func (s *CreateDatabaseStatement) String() string {
|
|||
_, _ = buf.WriteString(s.RetentionPolicyDuration.String())
|
||||
_, _ = buf.WriteString(" REPLICATION ")
|
||||
_, _ = buf.WriteString(strconv.Itoa(s.RetentionPolicyReplication))
|
||||
if s.RetentionPolicyShardGroupDuration > 0 {
|
||||
_, _ = buf.WriteString(" SHARD DURATION ")
|
||||
_, _ = buf.WriteString(s.RetentionPolicyShardGroupDuration.String())
|
||||
}
|
||||
_, _ = buf.WriteString(" NAME ")
|
||||
_, _ = buf.WriteString(QuoteIdent(s.RetentionPolicyName))
|
||||
}
|
||||
|
@ -751,6 +758,9 @@ type CreateRetentionPolicyStatement struct {
|
|||
|
||||
// Should this policy be set as default for the database?
|
||||
Default bool
|
||||
|
||||
// Shard Duration
|
||||
ShardGroupDuration time.Duration
|
||||
}
|
||||
|
||||
// String returns a string representation of the create retention policy.
|
||||
|
@ -764,6 +774,10 @@ func (s *CreateRetentionPolicyStatement) String() string {
|
|||
_, _ = buf.WriteString(FormatDuration(s.Duration))
|
||||
_, _ = buf.WriteString(" REPLICATION ")
|
||||
_, _ = buf.WriteString(strconv.Itoa(s.Replication))
|
||||
if s.ShardGroupDuration > 0 {
|
||||
_, _ = buf.WriteString(" SHARD DURATION ")
|
||||
_, _ = buf.WriteString(FormatDuration(s.ShardGroupDuration))
|
||||
}
|
||||
if s.Default {
|
||||
_, _ = buf.WriteString(" DEFAULT")
|
||||
}
|
||||
|
@ -791,6 +805,9 @@ type AlterRetentionPolicyStatement struct {
|
|||
|
||||
// Should this policy be set as defalut for the database?
|
||||
Default bool
|
||||
|
||||
// Duration of the Shard
|
||||
ShardGroupDuration *time.Duration
|
||||
}
|
||||
|
||||
// String returns a string representation of the alter retention policy statement.
|
||||
|
@ -811,6 +828,11 @@ func (s *AlterRetentionPolicyStatement) String() string {
|
|||
_, _ = buf.WriteString(strconv.Itoa(*s.Replication))
|
||||
}
|
||||
|
||||
if s.ShardGroupDuration != nil {
|
||||
_, _ = buf.WriteString(" SHARD DURATION ")
|
||||
_, _ = buf.WriteString(FormatDuration(*s.ShardGroupDuration))
|
||||
}
|
||||
|
||||
if s.Default {
|
||||
_, _ = buf.WriteString(" DEFAULT")
|
||||
}
|
||||
|
@ -1448,7 +1470,6 @@ func (s *SelectStatement) validPercentileAggr(expr *Call) error {
|
|||
default:
|
||||
return fmt.Errorf("expected float argument in percentile()")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SelectStatement) validateAggregates(tr targetRequirement) error {
|
||||
|
|
|
@ -389,8 +389,7 @@ func (p *Parser) parseCreateRetentionPolicyStatement() (*CreateRetentionPolicySt
|
|||
stmt.Database = ident
|
||||
|
||||
// Parse required DURATION token.
|
||||
tok, pos, lit := p.scanIgnoreWhitespace()
|
||||
if tok != DURATION {
|
||||
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != DURATION {
|
||||
return nil, newParseError(tokstr(tok, lit), []string{"DURATION"}, pos)
|
||||
}
|
||||
|
||||
|
@ -402,7 +401,7 @@ func (p *Parser) parseCreateRetentionPolicyStatement() (*CreateRetentionPolicySt
|
|||
stmt.Duration = d
|
||||
|
||||
// Parse required REPLICATION token.
|
||||
if tok, pos, lit = p.scanIgnoreWhitespace(); tok != REPLICATION {
|
||||
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != REPLICATION {
|
||||
return nil, newParseError(tokstr(tok, lit), []string{"REPLICATION"}, pos)
|
||||
}
|
||||
|
||||
|
@ -413,8 +412,24 @@ func (p *Parser) parseCreateRetentionPolicyStatement() (*CreateRetentionPolicySt
|
|||
}
|
||||
stmt.Replication = n
|
||||
|
||||
// Parse optional SHARD token.
|
||||
if tok, pos, lit := p.scanIgnoreWhitespace(); tok == SHARD {
|
||||
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != DURATION {
|
||||
return nil, newParseError(tokstr(tok, lit), []string{"DURATION"}, pos)
|
||||
}
|
||||
d, err := p.parseDuration()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
stmt.ShardGroupDuration = d
|
||||
} else if tok != EOF && tok != SEMICOLON && tok != DEFAULT {
|
||||
return nil, newParseError(tokstr(tok, lit), []string{"SHARD"}, pos)
|
||||
} else {
|
||||
p.unscan()
|
||||
}
|
||||
|
||||
// Parse optional DEFAULT token.
|
||||
if tok, pos, lit = p.scanIgnoreWhitespace(); tok == DEFAULT {
|
||||
if tok, pos, lit := p.scanIgnoreWhitespace(); tok == DEFAULT {
|
||||
stmt.Default = true
|
||||
} else if tok != EOF && tok != SEMICOLON {
|
||||
return nil, newParseError(tokstr(tok, lit), []string{"DEFAULT"}, pos)
|
||||
|
@ -468,11 +483,22 @@ Loop:
|
|||
return nil, err
|
||||
}
|
||||
stmt.Replication = &n
|
||||
case SHARD:
|
||||
tok, pos, lit := p.scanIgnoreWhitespace()
|
||||
if tok == DURATION {
|
||||
d, err := p.parseDuration()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
stmt.ShardGroupDuration = &d
|
||||
} else {
|
||||
return nil, newParseError(tokstr(tok, lit), []string{"DURATION"}, pos)
|
||||
}
|
||||
case DEFAULT:
|
||||
stmt.Default = true
|
||||
default:
|
||||
if i < 1 {
|
||||
return nil, newParseError(tokstr(tok, lit), []string{"DURATION", "RETENTION", "DEFAULT"}, pos)
|
||||
return nil, newParseError(tokstr(tok, lit), []string{"DURATION", "RETENTION", "SHARD", "DEFAULT"}, pos)
|
||||
}
|
||||
p.unscan()
|
||||
break Loop
|
||||
|
@ -1528,7 +1554,7 @@ func (p *Parser) parseCreateDatabaseStatement() (*CreateDatabaseStatement, error
|
|||
// validate that at least one of DURATION, REPLICATION or NAME is provided
|
||||
tok, pos, lit := p.scanIgnoreWhitespace()
|
||||
if tok != DURATION && tok != REPLICATION && tok != NAME {
|
||||
return nil, newParseError(tokstr(tok, lit), []string{"DURATION", "REPLICATION", "NAME"}, pos)
|
||||
return nil, newParseError(tokstr(tok, lit), []string{"DURATION", "REPLICATION", "SHARD", "NAME"}, pos)
|
||||
}
|
||||
// rewind
|
||||
p.unscan()
|
||||
|
@ -1560,6 +1586,22 @@ func (p *Parser) parseCreateDatabaseStatement() (*CreateDatabaseStatement, error
|
|||
}
|
||||
stmt.RetentionPolicyReplication = rpReplication
|
||||
|
||||
// Look for "SHARD"
|
||||
var rpShardGroupDuration time.Duration
|
||||
if err := p.parseTokens([]Token{SHARD}); err != nil {
|
||||
p.unscan()
|
||||
} else {
|
||||
tok, pos, lit := p.scanIgnoreWhitespace()
|
||||
if tok != DURATION {
|
||||
return nil, newParseError(tokstr(tok, lit), []string{"DURATION"}, pos)
|
||||
}
|
||||
rpShardGroupDuration, err = p.parseDuration()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
stmt.RetentionPolicyShardGroupDuration = rpShardGroupDuration
|
||||
}
|
||||
|
||||
// Look for "NAME"
|
||||
var rpName string = "default" // default is default
|
||||
if err := p.parseTokens([]Token{NAME}); err != nil {
|
||||
|
|
|
@ -1328,6 +1328,30 @@ func TestParser_ParseStatement(t *testing.T) {
|
|||
RetentionPolicyName: "test_name",
|
||||
},
|
||||
},
|
||||
{
|
||||
s: `CREATE DATABASE testdb WITH DURATION 24h REPLICATION 2 SHARD DURATION 10m NAME test_name `,
|
||||
stmt: &influxql.CreateDatabaseStatement{
|
||||
Name: "testdb",
|
||||
IfNotExists: false,
|
||||
RetentionPolicyCreate: true,
|
||||
RetentionPolicyDuration: 24 * time.Hour,
|
||||
RetentionPolicyReplication: 2,
|
||||
RetentionPolicyName: "test_name",
|
||||
RetentionPolicyShardGroupDuration: 10 * time.Minute,
|
||||
},
|
||||
},
|
||||
{
|
||||
s: `CREATE DATABASE IF NOT EXISTS testdb WITH DURATION 24h REPLICATION 2 SHARD DURATION 10m NAME test_name`,
|
||||
stmt: &influxql.CreateDatabaseStatement{
|
||||
Name: "testdb",
|
||||
IfNotExists: true,
|
||||
RetentionPolicyCreate: true,
|
||||
RetentionPolicyDuration: 24 * time.Hour,
|
||||
RetentionPolicyReplication: 2,
|
||||
RetentionPolicyName: "test_name",
|
||||
RetentionPolicyShardGroupDuration: 10 * time.Minute,
|
||||
},
|
||||
},
|
||||
|
||||
// CREATE USER statement
|
||||
{
|
||||
|
@ -1545,46 +1569,62 @@ func TestParser_ParseStatement(t *testing.T) {
|
|||
Default: true,
|
||||
},
|
||||
},
|
||||
// CREATE RETENTION POLICY
|
||||
{
|
||||
s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION 2 SHARD DURATION 30m`,
|
||||
stmt: &influxql.CreateRetentionPolicyStatement{
|
||||
Name: "policy1",
|
||||
Database: "testdb",
|
||||
Duration: time.Hour,
|
||||
Replication: 2,
|
||||
ShardGroupDuration: 30 * time.Minute,
|
||||
},
|
||||
},
|
||||
|
||||
// ALTER RETENTION POLICY
|
||||
{
|
||||
s: `ALTER RETENTION POLICY policy1 ON testdb DURATION 1m REPLICATION 4 DEFAULT`,
|
||||
stmt: newAlterRetentionPolicyStatement("policy1", "testdb", time.Minute, 4, true),
|
||||
stmt: newAlterRetentionPolicyStatement("policy1", "testdb", time.Minute, -1, 4, true),
|
||||
},
|
||||
|
||||
// ALTER RETENTION POLICY with options in reverse order
|
||||
{
|
||||
s: `ALTER RETENTION POLICY policy1 ON testdb DEFAULT REPLICATION 4 DURATION 1m`,
|
||||
stmt: newAlterRetentionPolicyStatement("policy1", "testdb", time.Minute, 4, true),
|
||||
stmt: newAlterRetentionPolicyStatement("policy1", "testdb", time.Minute, -1, 4, true),
|
||||
},
|
||||
|
||||
// ALTER RETENTION POLICY with infinite retention
|
||||
{
|
||||
s: `ALTER RETENTION POLICY policy1 ON testdb DEFAULT REPLICATION 4 DURATION INF`,
|
||||
stmt: newAlterRetentionPolicyStatement("policy1", "testdb", 0, 4, true),
|
||||
stmt: newAlterRetentionPolicyStatement("policy1", "testdb", 0, -1, 4, true),
|
||||
},
|
||||
|
||||
// ALTER RETENTION POLICY without optional DURATION
|
||||
{
|
||||
s: `ALTER RETENTION POLICY policy1 ON testdb DEFAULT REPLICATION 4`,
|
||||
stmt: newAlterRetentionPolicyStatement("policy1", "testdb", -1, 4, true),
|
||||
stmt: newAlterRetentionPolicyStatement("policy1", "testdb", -1, -1, 4, true),
|
||||
},
|
||||
|
||||
// ALTER RETENTION POLICY without optional REPLICATION
|
||||
{
|
||||
s: `ALTER RETENTION POLICY policy1 ON testdb DEFAULT`,
|
||||
stmt: newAlterRetentionPolicyStatement("policy1", "testdb", -1, -1, true),
|
||||
stmt: newAlterRetentionPolicyStatement("policy1", "testdb", -1, -1, -1, true),
|
||||
},
|
||||
|
||||
// ALTER RETENTION POLICY without optional DEFAULT
|
||||
{
|
||||
s: `ALTER RETENTION POLICY policy1 ON testdb REPLICATION 4`,
|
||||
stmt: newAlterRetentionPolicyStatement("policy1", "testdb", -1, 4, false),
|
||||
stmt: newAlterRetentionPolicyStatement("policy1", "testdb", -1, -1, 4, false),
|
||||
},
|
||||
// ALTER default retention policy unquoted
|
||||
{
|
||||
s: `ALTER RETENTION POLICY default ON testdb REPLICATION 4`,
|
||||
stmt: newAlterRetentionPolicyStatement("default", "testdb", -1, 4, false),
|
||||
stmt: newAlterRetentionPolicyStatement("default", "testdb", -1, -1, 4, false),
|
||||
},
|
||||
// ALTER RETENTION POLICY with SHARD duration
|
||||
{
|
||||
s: `ALTER RETENTION POLICY policy1 ON testdb REPLICATION 4 SHARD DURATION 10m`,
|
||||
stmt: newAlterRetentionPolicyStatement("policy1", "testdb", -1, 10*time.Minute, 4, false),
|
||||
},
|
||||
|
||||
// SHOW STATS
|
||||
|
@ -1779,14 +1819,14 @@ func TestParser_ParseStatement(t *testing.T) {
|
|||
{s: `DROP FOO`, err: `found FOO, expected CONTINUOUS, DATA, MEASUREMENT, META, RETENTION, SERIES, SHARD, SUBSCRIPTION, USER at line 1, char 6`},
|
||||
{s: `CREATE FOO`, err: `found FOO, expected CONTINUOUS, DATABASE, USER, RETENTION, SUBSCRIPTION at line 1, char 8`},
|
||||
{s: `CREATE DATABASE`, err: `found EOF, expected identifier at line 1, char 17`},
|
||||
{s: `CREATE DATABASE "testdb" WITH`, err: `found EOF, expected DURATION, REPLICATION, NAME at line 1, char 31`},
|
||||
{s: `CREATE DATABASE "testdb" WITH`, err: `found EOF, expected DURATION, REPLICATION, SHARD, NAME at line 1, char 31`},
|
||||
{s: `CREATE DATABASE "testdb" WITH DURATION`, err: `found EOF, expected duration at line 1, char 40`},
|
||||
{s: `CREATE DATABASE "testdb" WITH REPLICATION`, err: `found EOF, expected integer at line 1, char 43`},
|
||||
{s: `CREATE DATABASE "testdb" WITH NAME`, err: `found EOF, expected identifier at line 1, char 36`},
|
||||
{s: `CREATE DATABASE IF`, err: `found EOF, expected NOT at line 1, char 20`},
|
||||
{s: `CREATE DATABASE IF NOT`, err: `found EOF, expected EXISTS at line 1, char 24`},
|
||||
{s: `CREATE DATABASE IF NOT EXISTS`, err: `found EOF, expected identifier at line 1, char 31`},
|
||||
{s: `CREATE DATABASE IF NOT EXISTS "testdb" WITH`, err: `found EOF, expected DURATION, REPLICATION, NAME at line 1, char 45`},
|
||||
{s: `CREATE DATABASE IF NOT EXISTS "testdb" WITH`, err: `found EOF, expected DURATION, REPLICATION, SHARD, NAME at line 1, char 45`},
|
||||
{s: `CREATE DATABASE IF NOT EXISTS "testdb" WITH DURATION`, err: `found EOF, expected duration at line 1, char 54`},
|
||||
{s: `CREATE DATABASE IF NOT EXISTS "testdb" WITH REPLICATION`, err: `found EOF, expected integer at line 1, char 57`},
|
||||
{s: `CREATE DATABASE IF NOT EXISTS "testdb" WITH NAME`, err: `found EOF, expected identifier at line 1, char 50`},
|
||||
|
@ -1894,12 +1934,13 @@ func TestParser_ParseStatement(t *testing.T) {
|
|||
{s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION 3.14`, err: `found 3.14, expected integer at line 1, char 67`},
|
||||
{s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION 0`, err: `invalid value 0: must be 1 <= n <= 2147483647 at line 1, char 67`},
|
||||
{s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION bad`, err: `found bad, expected integer at line 1, char 67`},
|
||||
{s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION 1 foo`, err: `found foo, expected DEFAULT at line 1, char 69`},
|
||||
{s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION 1 foo`, err: `found foo, expected SHARD at line 1, char 69`},
|
||||
{s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION 1 SHARD DURATION 30m foo`, err: `found foo, expected DEFAULT at line 1, char 88`},
|
||||
{s: `ALTER`, err: `found EOF, expected RETENTION at line 1, char 7`},
|
||||
{s: `ALTER RETENTION`, err: `found EOF, expected POLICY at line 1, char 17`},
|
||||
{s: `ALTER RETENTION POLICY`, err: `found EOF, expected identifier at line 1, char 24`},
|
||||
{s: `ALTER RETENTION POLICY policy1`, err: `found EOF, expected ON at line 1, char 32`}, {s: `ALTER RETENTION POLICY policy1 ON`, err: `found EOF, expected identifier at line 1, char 35`},
|
||||
{s: `ALTER RETENTION POLICY policy1 ON testdb`, err: `found EOF, expected DURATION, RETENTION, DEFAULT at line 1, char 42`},
|
||||
{s: `ALTER RETENTION POLICY policy1 ON testdb`, err: `found EOF, expected DURATION, RETENTION, SHARD, DEFAULT at line 1, char 42`},
|
||||
{s: `SET`, err: `found EOF, expected PASSWORD at line 1, char 5`},
|
||||
{s: `SET PASSWORD`, err: `found EOF, expected FOR at line 1, char 14`},
|
||||
{s: `SET PASSWORD something`, err: `found something, expected FOR at line 1, char 14`},
|
||||
|
@ -2320,7 +2361,7 @@ func errstring(err error) string {
|
|||
}
|
||||
|
||||
// newAlterRetentionPolicyStatement creates an initialized AlterRetentionPolicyStatement.
|
||||
func newAlterRetentionPolicyStatement(name string, DB string, d time.Duration, replication int, dfault bool) *influxql.AlterRetentionPolicyStatement {
|
||||
func newAlterRetentionPolicyStatement(name string, DB string, d, sd time.Duration, replication int, dfault bool) *influxql.AlterRetentionPolicyStatement {
|
||||
stmt := &influxql.AlterRetentionPolicyStatement{
|
||||
Name: name,
|
||||
Database: DB,
|
||||
|
@ -2331,6 +2372,10 @@ func newAlterRetentionPolicyStatement(name string, DB string, d time.Duration, r
|
|||
stmt.Duration = &d
|
||||
}
|
||||
|
||||
if sd > -1 {
|
||||
stmt.ShardGroupDuration = &sd
|
||||
}
|
||||
|
||||
if replication > -1 {
|
||||
stmt.Replication = &replication
|
||||
}
|
||||
|
|
|
@ -151,12 +151,19 @@ func (data *Data) CreateRetentionPolicy(database string, rpi *RetentionPolicyInf
|
|||
}
|
||||
|
||||
// Append new policy.
|
||||
di.RetentionPolicies = append(di.RetentionPolicies, RetentionPolicyInfo{
|
||||
rp := RetentionPolicyInfo{
|
||||
Name: rpi.Name,
|
||||
Duration: rpi.Duration,
|
||||
ShardGroupDuration: shardGroupDuration(rpi.Duration),
|
||||
ReplicaN: rpi.ReplicaN,
|
||||
})
|
||||
}
|
||||
|
||||
if rpi.ShardGroupDuration != 0 {
|
||||
rp.ShardGroupDuration = rpi.ShardGroupDuration
|
||||
} else {
|
||||
rp.ShardGroupDuration = shardGroupDuration(rpi.Duration)
|
||||
}
|
||||
|
||||
di.RetentionPolicies = append(di.RetentionPolicies, rp)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -186,6 +193,7 @@ type RetentionPolicyUpdate struct {
|
|||
Name *string
|
||||
Duration *time.Duration
|
||||
ReplicaN *int
|
||||
ShardGroupDuration *time.Duration
|
||||
}
|
||||
|
||||
// SetName sets the RetentionPolicyUpdate.Name
|
||||
|
@ -197,6 +205,9 @@ func (rpu *RetentionPolicyUpdate) SetDuration(v time.Duration) { rpu.Duration =
|
|||
// SetReplicaN sets the RetentionPolicyUpdate.ReplicaN
|
||||
func (rpu *RetentionPolicyUpdate) SetReplicaN(v int) { rpu.ReplicaN = &v }
|
||||
|
||||
// SetShardGroupDuration sets the RetentionPolicyUpdate.ShardGroupDuration
|
||||
func (rpu *RetentionPolicyUpdate) SetShardGroupDuration(v time.Duration) { rpu.ShardGroupDuration = &v }
|
||||
|
||||
// UpdateRetentionPolicy updates an existing retention policy.
|
||||
func (data *Data) UpdateRetentionPolicy(database, name string, rpu *RetentionPolicyUpdate) error {
|
||||
// Find database.
|
||||
|
@ -227,12 +238,17 @@ func (data *Data) UpdateRetentionPolicy(database, name string, rpu *RetentionPol
|
|||
}
|
||||
if rpu.Duration != nil {
|
||||
rpi.Duration = *rpu.Duration
|
||||
rpi.ShardGroupDuration = shardGroupDuration(rpi.Duration)
|
||||
}
|
||||
if rpu.ReplicaN != nil {
|
||||
rpi.ReplicaN = *rpu.ReplicaN
|
||||
}
|
||||
|
||||
if rpu.ShardGroupDuration != nil {
|
||||
rpi.ShardGroupDuration = *rpu.ShardGroupDuration
|
||||
} else {
|
||||
rpi.ShardGroupDuration = shardGroupDuration(rpi.Duration)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue