fix: do not allow shard creation to create overlapping shards (#22604)
parent
ea018dfc21
commit
0a6f562a44
|
@ -369,16 +369,48 @@ func (data *Data) CreateShardGroup(database, policy string, timestamp time.Time,
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
startTime := timestamp.Truncate(rpi.ShardGroupDuration).UTC()
|
||||||
|
endTime := startTime.Add(rpi.ShardGroupDuration).UTC()
|
||||||
|
if endTime.After(time.Unix(0, models.MaxNanoTime)) {
|
||||||
|
// Shard group range is [start, end) so add one to the max time.
|
||||||
|
endTime = time.Unix(0, models.MaxNanoTime+1)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range rpi.ShardGroups {
|
||||||
|
if rpi.ShardGroups[i].Deleted() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
startI := rpi.ShardGroups[i].StartTime
|
||||||
|
endI := rpi.ShardGroups[i].EndTime
|
||||||
|
if rpi.ShardGroups[i].Truncated() {
|
||||||
|
endI = rpi.ShardGroups[i].TruncatedAt
|
||||||
|
}
|
||||||
|
|
||||||
|
// shard_i covers range [start_i, end_i)
|
||||||
|
// We want the largest range [startTime, endTime) such that all of the following hold:
|
||||||
|
// startTime <= timestamp < endTime
|
||||||
|
// for all i, not { start_i < endTime && startTime < end_i }
|
||||||
|
// Assume the above conditions are true for shards index < i, we want to modify startTime,endTime so they are true
|
||||||
|
// also for shard_i
|
||||||
|
|
||||||
|
// It must be the case that either endI <= timestamp || timestamp < startI, because otherwise:
|
||||||
|
// startI <= timestamp < endI means timestamp is contained in shard I
|
||||||
|
if !timestamp.Before(endI) && endI.After(startTime) {
|
||||||
|
// startTime < endI <= timestamp
|
||||||
|
startTime = endI
|
||||||
|
}
|
||||||
|
if startI.After(timestamp) && startI.Before(endTime) {
|
||||||
|
// timestamp < startI < endTime
|
||||||
|
endTime = startI
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Create the shard group.
|
// Create the shard group.
|
||||||
data.MaxShardGroupID++
|
data.MaxShardGroupID++
|
||||||
sgi := ShardGroupInfo{}
|
sgi := ShardGroupInfo{}
|
||||||
sgi.ID = data.MaxShardGroupID
|
sgi.ID = data.MaxShardGroupID
|
||||||
sgi.StartTime = timestamp.Truncate(rpi.ShardGroupDuration).UTC()
|
sgi.StartTime = startTime
|
||||||
sgi.EndTime = sgi.StartTime.Add(rpi.ShardGroupDuration).UTC()
|
sgi.EndTime = endTime
|
||||||
if sgi.EndTime.After(time.Unix(0, models.MaxNanoTime)) {
|
|
||||||
// Shard group range is [start, end) so add one to the max time.
|
|
||||||
sgi.EndTime = time.Unix(0, models.MaxNanoTime+1)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(shards) > 0 {
|
if len(shards) > 0 {
|
||||||
sgi.Shards = make([]ShardInfo, len(shards))
|
sgi.Shards = make([]ShardInfo, len(shards))
|
||||||
|
|
|
@ -342,6 +342,61 @@ func TestData_TruncateShardGroups(t *testing.T) {
|
||||||
if sg1.TruncatedAt != sg1.StartTime {
|
if sg1.TruncatedAt != sg1.StartTime {
|
||||||
t.Fatalf("Incorrect truncation of future shard group. Expected %v, got %v", sg1.StartTime, sg1.TruncatedAt)
|
t.Fatalf("Incorrect truncation of future shard group. Expected %v, got %v", sg1.StartTime, sg1.TruncatedAt)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
groups := data.Databases[0].RetentionPolicies[0].ShardGroups
|
||||||
|
assert.Equal(t, 2, len(groups))
|
||||||
|
assert.Equal(t, "1970-01-01 00:00:00 +0000 UTC", groups[0].StartTime.String())
|
||||||
|
assert.Equal(t, "1970-01-02 00:00:00 +0000 UTC", groups[0].EndTime.String())
|
||||||
|
assert.Equal(t, "1970-01-01 23:59:00 +0000 UTC", groups[0].TruncatedAt.String())
|
||||||
|
|
||||||
|
assert.Equal(t, "1970-01-02 00:00:00 +0000 UTC", groups[1].StartTime.String())
|
||||||
|
assert.Equal(t, "1970-01-03 00:00:00 +0000 UTC", groups[1].EndTime.String())
|
||||||
|
assert.Equal(t, "1970-01-02 00:00:00 +0000 UTC", groups[1].TruncatedAt.String())
|
||||||
|
|
||||||
|
// Create some more shard groups and validate there is no overlap
|
||||||
|
// Add a shard starting at sg0's truncation time, until 01/02
|
||||||
|
must(data.CreateShardGroup("db", "rp", sg0.EndTime.Add(-time.Second)))
|
||||||
|
// Add a shard 01/02 - 01/03 (since sg1 is fully truncated)
|
||||||
|
must(data.CreateShardGroup("db", "rp", sg1.EndTime.Add(-time.Second)))
|
||||||
|
// Add a shard 01/06 - 01/07
|
||||||
|
must(data.CreateShardGroup("db", "rp", sg1.EndTime.Add(3*rp.ShardGroupDuration)))
|
||||||
|
newDuration := 10 * rp.ShardGroupDuration
|
||||||
|
data.UpdateRetentionPolicy("db", "rp", &meta.RetentionPolicyUpdate{
|
||||||
|
Name: nil,
|
||||||
|
Duration: nil,
|
||||||
|
ReplicaN: nil,
|
||||||
|
ShardGroupDuration: &newDuration,
|
||||||
|
}, true)
|
||||||
|
// Add a shard 01/03 - 01/06
|
||||||
|
must(data.CreateShardGroup("db", "rp", sg1.EndTime.Add(1*rp.ShardGroupDuration)))
|
||||||
|
// Add a shard 01/07 - 01/09
|
||||||
|
must(data.CreateShardGroup("db", "rp", sg1.EndTime.Add(4*rp.ShardGroupDuration)))
|
||||||
|
// Add a shard 01/09 - 01/19
|
||||||
|
must(data.CreateShardGroup("db", "rp", sg1.EndTime.Add(10*rp.ShardGroupDuration)))
|
||||||
|
// No additional shard added
|
||||||
|
must(data.CreateShardGroup("db", "rp", sg1.EndTime.Add(11*rp.ShardGroupDuration)))
|
||||||
|
|
||||||
|
groups = data.Databases[0].RetentionPolicies[0].ShardGroups
|
||||||
|
assert.Equal(t, 8, len(groups))
|
||||||
|
|
||||||
|
expectTimes := []struct {
|
||||||
|
start, end, truncated string
|
||||||
|
}{
|
||||||
|
{"1970-01-01 00:00:00 +0000 UTC", "1970-01-02 00:00:00 +0000 UTC", "1970-01-01 23:59:00 +0000 UTC"},
|
||||||
|
{"1970-01-01 23:59:00 +0000 UTC", "1970-01-02 00:00:00 +0000 UTC", "0001-01-01 00:00:00 +0000 UTC"},
|
||||||
|
{"1970-01-02 00:00:00 +0000 UTC", "1970-01-03 00:00:00 +0000 UTC", "1970-01-02 00:00:00 +0000 UTC"},
|
||||||
|
{"1970-01-02 00:00:00 +0000 UTC", "1970-01-03 00:00:00 +0000 UTC", "0001-01-01 00:00:00 +0000 UTC"},
|
||||||
|
{"1970-01-03 00:00:00 +0000 UTC", "1970-01-06 00:00:00 +0000 UTC", "0001-01-01 00:00:00 +0000 UTC"},
|
||||||
|
{"1970-01-06 00:00:00 +0000 UTC", "1970-01-07 00:00:00 +0000 UTC", "0001-01-01 00:00:00 +0000 UTC"},
|
||||||
|
{"1970-01-07 00:00:00 +0000 UTC", "1970-01-09 00:00:00 +0000 UTC", "0001-01-01 00:00:00 +0000 UTC"},
|
||||||
|
{"1970-01-09 00:00:00 +0000 UTC", "1970-01-19 00:00:00 +0000 UTC", "0001-01-01 00:00:00 +0000 UTC"},
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range expectTimes {
|
||||||
|
assert.Equal(t, expectTimes[i].start, groups[i].StartTime.String(), "start time %d", i)
|
||||||
|
assert.Equal(t, expectTimes[i].end, groups[i].EndTime.String(), "end time %d", i)
|
||||||
|
assert.Equal(t, expectTimes[i].truncated, groups[i].TruncatedAt.String(), "truncate time %d", i)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestUserInfo_AuthorizeDatabase(t *testing.T) {
|
func TestUserInfo_AuthorizeDatabase(t *testing.T) {
|
||||||
|
|
Loading…
Reference in New Issue