fix(upgrade): don't drop shard-group durations when upgrading DBs (#22650)

Add KV migration to repair missing shard-group durations
pull/22652/head
Daniel Moran 2021-10-11 17:43:57 -04:00 committed by GitHub
parent e75d023eba
commit 7b7d4f3856
7 changed files with 114 additions and 91 deletions

View File

@@ -73,6 +73,7 @@ func upgradeDatabases(ctx context.Context, cli clients.CLI, v1 *influxDBv1, v2 *
		Description:         fmt.Sprintf("Upgraded from v1 database %s with retention policy %s", db.Name, rp.Name),
		RetentionPolicyName: rp.Name,
		RetentionPeriod:     rp.Duration,
		ShardGroupDuration:  rp.ShardGroupDuration,
	}
	log.Debug("Creating bucket", zap.String("Bucket", bucket.Name))
	err = v2.bucketSvc.CreateBucket(ctx, bucket)
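The one-line change above carries the v1 retention policy's shard-group duration through to the new v2 bucket instead of silently dropping it. When no duration is recorded, the storage engine derives one from the retention period. The sketch below illustrates those fallback rules as InfluxDB documents them; the function name and exact cutoffs are an assumption for illustration, not the project's code.

package main

import (
	"fmt"
	"time"
)

// defaultShardGroupDuration sketches the fallback applied when a bucket or
// retention policy carries no explicit shard-group duration. The cutoffs
// mirror InfluxDB's documented defaults; treat them as an assumption here.
func defaultShardGroupDuration(retention time.Duration) time.Duration {
	switch {
	case retention == 0 || retention >= 180*24*time.Hour:
		// Infinite retention, or six months and longer: 7-day shard groups.
		return 7 * 24 * time.Hour
	case retention >= 2*24*time.Hour:
		// Two days up to six months: 1-day shard groups.
		return 24 * time.Hour
	default:
		// Under two days: 1-hour shard groups.
		return time.Hour
	}
}

func main() {
	for _, d := range []time.Duration{0, time.Hour, 72 * time.Hour, 365 * 24 * time.Hour} {
		fmt.Printf("retention %v -> shard group %v\n", d, defaultShardGroupDuration(d))
	}
}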

View File

@@ -294,6 +294,7 @@ func TestUpgradeRealDB(t *testing.T) {
		case bucketNames[6]:
			emptyBucketId = b.ID.String()
		}
		require.NotZero(t, b.ShardGroupDuration)
	}
	require.NoDirExists(t, filepath.Join(enginePath, "data", "_internal"))

View File

@@ -10,110 +10,112 @@ import (
	"github.com/influxdata/influxdb/v2/v1/services/meta"
)
func repairMissingShardGroupDurations(ctx context.Context, store kv.SchemaStore) error {
	type bucket struct {
		ID                  platform.ID   `json:"id,omitempty"`
		OrgID               platform.ID   `json:"orgID,omitempty"`
		Type                int           `json:"type"`
		Name                string        `json:"name"`
		Description         string        `json:"description"`
		RetentionPolicyName string        `json:"rp,omitempty"` // This to support v1 sources
		RetentionPeriod     time.Duration `json:"retentionPeriod"`
		CreatedAt           time.Time     `json:"createdAt"`
		UpdatedAt           time.Time     `json:"updatedAt"`

		// This is expected to be 0 for all buckets created before
		// we began tracking it in metadata.
		ShardGroupDuration time.Duration `json:"shardGroupDuration"`
	}

	bucketBucket := []byte("bucketsv1")

	// Collect buckets that need to be updated
	var buckets []*bucket
	if err := store.View(ctx, func(tx kv.Tx) error {
		bkt, err := tx.Bucket(bucketBucket)
		if err != nil {
			return err
		}
		cursor, err := bkt.ForwardCursor(nil)
		if err != nil {
			return err
		}
		return kv.WalkCursor(ctx, cursor, func(_, v []byte) (bool, error) {
			var b bucket
			if err := json.Unmarshal(v, &b); err != nil {
				return false, err
			}
			if b.ShardGroupDuration == 0 {
				buckets = append(buckets, &b)
			}
			return true, nil
		})
	}); err != nil {
		return err
	}

	batchSize := 100
	writeBatch := func(batch []*bucket) (err error) {
		ids := make([][]byte, len(batch))
		for i, b := range batch {
			ids[i], err = b.ID.Encode()
			if err != nil {
				return err
			}
		}
		return store.Update(ctx, func(tx kv.Tx) error {
			bkt, err := tx.Bucket(bucketBucket)
			if err != nil {
				return err
			}
			values, err := bkt.GetBatch(ids...)
			if err != nil {
				return err
			}
			for i, value := range values {
				var b bucket
				if err := json.Unmarshal(value, &b); err != nil {
					return err
				}
				if b.ShardGroupDuration == 0 {
					// Backfill the duration using the same method used
					// to derive the value within the storage engine.
					b.ShardGroupDuration = meta.NormalisedShardDuration(0, b.RetentionPeriod)
				}
				updated, err := json.Marshal(b)
				if err != nil {
					return err
				}
				if err := bkt.Put(ids[i], updated); err != nil {
					return err
				}
			}
			return nil
		})
	}

	for i := 0; i < len(buckets); i += batchSize {
		end := i + batchSize
		if end > len(buckets) {
			end = len(buckets)
		}
		if err := writeBatch(buckets[i:end]); err != nil {
			return err
		}
	}
	return nil
}

var Migration0015_RecordShardGroupDurationsInBucketMetadata = UpOnlyMigration(
	"record shard group durations in bucket metadata",
	repairMissingShardGroupDurations,
)
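Two details of the migration body are worth calling out: the candidate scan runs in a read-only View transaction, and the rewrites are chunked into Update transactions of 100 buckets each, with GetBatch re-reading every value inside the write transaction so a bucket modified between the two phases is not clobbered with stale fields. As a rough illustration of what gets backfilled, meta.NormalisedShardDuration(0, retentionPeriod) should resolve to the defaults sketched earlier; a hypothetical example follows (output values assumed, not verified against the package):

package main

import (
	"fmt"
	"time"

	"github.com/influxdata/influxdb/v2/v1/services/meta"
)

func main() {
	// Assumed behaviour: with a zero shard-group duration, the helper falls
	// back to a default derived from the retention period.
	for _, rp := range []time.Duration{0, time.Hour, 72 * time.Hour, 365 * 24 * time.Hour} {
		fmt.Printf("retention %v -> backfilled %v\n", rp, meta.NormalisedShardDuration(0, rp))
	}
}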

View File

@@ -14,11 +14,15 @@ import (
)
func TestMigration_ShardGroupDuration(t *testing.T) {
	testRepairMissingShardGroupDurations(t, 15)
}

func testRepairMissingShardGroupDurations(t *testing.T, migrationNum int) {
	ctx, cancelFunc := context.WithCancel(context.Background())
	defer cancelFunc()

	// Run up to the migration before the migration-under-test.
	ts := newService(t, ctx, migrationNum-2)

	// Seed some buckets.
	buckets := []*influxdb.Bucket{
@@ -65,8 +69,8 @@ func TestMigration_ShardGroupDuration(t *testing.T) {
	})
	require.NoError(t, err)

	// Run the migration-under-test.
	require.NoError(t, Migrations[migrationNum-1].Up(context.Background(), ts.Store))

	// Read the buckets back out of the store.
	migratedBuckets := make([]influxdb.Bucket, len(buckets))

View File

@@ -0,0 +1,6 @@
package all

var Migration0018_RepairMissingShardGroupDurations = UpOnlyMigration(
	"repair missing shard group durations",
	repairMissingShardGroupDurations,
)
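Shipping the repair a second time as migration 0018, rather than amending 0015, is presumably deliberate: stores that already recorded 0015 as applied (including those produced by the buggy upgrade path) would never re-run it, while a new entry in the migration list executes on the next startup. Because the body only rewrites buckets whose shard-group duration is still zero, applying it again to an already-repaired store is a no-op.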

View File

@@ -0,0 +1,7 @@
package all

import "testing"

func TestMigration_PostUpgradeShardGroupDuration(t *testing.T) {
	testRepairMissingShardGroupDurations(t, 18)
}
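Since both tests funnel through the shared helper, a natural extension would be to assert that the repair is idempotent. A hypothetical sketch reusing the helpers visible above (the semantics of newService and the Migrations indexing are assumed to match the existing tests):

package all

import (
	"context"
	"testing"

	"github.com/stretchr/testify/require"
)

// Hypothetical sketch: running the repair migration twice should succeed and
// leave the store unchanged on the second pass, because only zero-valued
// shard-group durations are rewritten.
func TestMigration_RepairShardGroupDurations_Idempotent(t *testing.T) {
	ctx, cancelFunc := context.WithCancel(context.Background())
	defer cancelFunc()

	// Run everything before the repair migration (18), mirroring the
	// migrationNum-2 convention used by the helper above.
	ts := newService(t, ctx, 16)

	require.NoError(t, Migrations[17].Up(context.Background(), ts.Store))
	require.NoError(t, Migrations[17].Up(context.Background(), ts.Store))
}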

View File

@@ -41,5 +41,7 @@ var Migrations = [...]migration.Spec{
	Migration0016_AddAnnotationsNotebooksToOperToken,
	// add annotations and notebooks resource types to all-access tokens
	Migration0017_AddAnnotationsNotebooksToAllAccessTokens,
	// repair missing shard group durations
	Migration0018_RepairMissingShardGroupDurations,
	// {{ do_not_edit . }}
}