feat: initialize `influxd downgrade` command to run `Down()` migrations on metadata (#22800)
parent
b3b4dd6503
commit
eedd84671b
|
@ -0,0 +1,126 @@
|
|||
package downgrade
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2/bolt"
|
||||
"github.com/influxdata/influxdb/v2/internal/fs"
|
||||
"github.com/influxdata/influxdb/v2/kit/cli"
|
||||
"github.com/influxdata/influxdb/v2/kv/migration"
|
||||
"github.com/influxdata/influxdb/v2/kv/migration/all"
|
||||
influxlogger "github.com/influxdata/influxdb/v2/logger"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
)
|
||||
|
||||
var downgradeMigrationTargets = map[string]int{
|
||||
"2.0": 15,
|
||||
}
|
||||
|
||||
func NewCommand(ctx context.Context, v *viper.Viper) (*cobra.Command, error) {
|
||||
v2dir, err := fs.InfluxDir()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error fetching default InfluxDB 2.0 dir: %w", err)
|
||||
}
|
||||
|
||||
var validDowngradeTargets []string
|
||||
for k := range downgradeMigrationTargets {
|
||||
validDowngradeTargets = append(validDowngradeTargets, k)
|
||||
}
|
||||
var validTargetsHelp string
|
||||
if len(validDowngradeTargets) == 1 {
|
||||
validTargetsHelp = validDowngradeTargets[0]
|
||||
} else {
|
||||
validTargetsHelp = fmt.Sprintf("<%s>", strings.Join(validDowngradeTargets, "|"))
|
||||
}
|
||||
|
||||
var boltPath string
|
||||
var logLevel zapcore.Level
|
||||
|
||||
cmd := &cobra.Command{
|
||||
Use: fmt.Sprintf("downgrade [flags] %s", validTargetsHelp),
|
||||
Short: "Downgrade metadata schema used by influxd to match the expectations of an older release",
|
||||
Long: `Run this command prior to downgrading the influxd binary.
|
||||
|
||||
influxd does not guarantee backwards-compatibility with older releases in its embedded
|
||||
metadata stores. Attempting to boot up an older influxd on a BoltDB/SQLite file that has
|
||||
been migrated to a newer schema will result in a startup error. This command downgrades
|
||||
those metadata schemas to match the expectations of an older release, allowing the older
|
||||
influxd binary to boot successfully.
|
||||
|
||||
The target version of the downgrade must be specified, i.e. "influxd downgrade 2.0".
|
||||
`,
|
||||
ValidArgs: validDowngradeTargets,
|
||||
Args: cobra.ExactValidArgs(1),
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
logconf := &influxlogger.Config{
|
||||
Format: "auto",
|
||||
Level: logLevel,
|
||||
}
|
||||
logger, err := logconf.New(os.Stdout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return downgrade(ctx, boltPath, args[0], logger)
|
||||
},
|
||||
}
|
||||
|
||||
opts := []cli.Opt{
|
||||
{
|
||||
DestP: &boltPath,
|
||||
Flag: "bolt-path",
|
||||
Default: filepath.Join(v2dir, bolt.DefaultFilename),
|
||||
Desc: "path for boltdb database",
|
||||
Short: 'm',
|
||||
},
|
||||
{
|
||||
DestP: &logLevel,
|
||||
Flag: "log-level",
|
||||
Default: zapcore.InfoLevel,
|
||||
Desc: "supported log levels are debug, info, warn and error",
|
||||
},
|
||||
}
|
||||
if err := cli.BindOptions(v, cmd, opts); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return cmd, nil
|
||||
}
|
||||
|
||||
func downgrade(ctx context.Context, boltPath string, targetVersion string, log *zap.Logger) error {
|
||||
boltClient := bolt.NewClient(log.With(zap.String("service", "bolt")))
|
||||
boltClient.Path = boltPath
|
||||
|
||||
if err := boltClient.Open(ctx); err != nil {
|
||||
return fmt.Errorf("failed to open bolt DB: %w", err)
|
||||
}
|
||||
defer boltClient.Close()
|
||||
|
||||
kvStore := bolt.NewKVStore(log.With(zap.String("service", "kvstore-bolt")), boltPath)
|
||||
kvStore.WithDB(boltClient.DB())
|
||||
|
||||
kvMigrator, err := migration.NewMigrator(log.With(zap.String("service", "kv-migrator")), kvStore)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to initialize KV migrator: %w", err)
|
||||
}
|
||||
info := influxdb.GetBuildInfo()
|
||||
kvMigrator.SetBackupPath(fmt.Sprintf("%s.%s-pre-%s-downgrade.backup", boltPath, info.Version, targetVersion))
|
||||
kvMigrator.AddMigrations(all.Migrations[:]...)
|
||||
|
||||
log.Info("Downgrading KV metadata to target version", zap.String("version", targetVersion))
|
||||
if err := kvMigrator.Down(ctx, downgradeMigrationTargets[targetVersion]); err != nil {
|
||||
return fmt.Errorf("failed to tear down migrations: %w", err)
|
||||
}
|
||||
|
||||
log.Info("Metadata successfully downgraded, you can now safely replace this `influxd` with the target older version",
|
||||
zap.String("version", targetVersion))
|
||||
return nil
|
||||
}
|
|
@ -7,6 +7,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2/cmd/influxd/downgrade"
|
||||
"github.com/influxdata/influxdb/v2/cmd/influxd/inspect"
|
||||
"github.com/influxdata/influxdb/v2/cmd/influxd/launcher"
|
||||
"github.com/influxdata/influxdb/v2/cmd/influxd/recovery"
|
||||
|
@ -50,6 +51,11 @@ func main() {
|
|||
rootCmd.AddCommand(inspectCmd)
|
||||
rootCmd.AddCommand(versionCmd())
|
||||
rootCmd.AddCommand(recovery.NewCommand())
|
||||
downgradeCmd, err := downgrade.NewCommand(ctx, v)
|
||||
if err != nil {
|
||||
handleErr(err.Error())
|
||||
}
|
||||
rootCmd.AddCommand(downgradeCmd)
|
||||
|
||||
rootCmd.SilenceUsage = true
|
||||
if err := rootCmd.Execute(); err != nil {
|
||||
|
|
|
@ -10,9 +10,9 @@ import (
|
|||
"github.com/influxdata/influxdb/v2/kv"
|
||||
)
|
||||
|
||||
var Migration0016_AddAnnotationsNotebooksToOperToken = UpOnlyMigration(
|
||||
"add annotations and notebooks resource types to operator token",
|
||||
migrateTokensMigration(
|
||||
var Migration0016_AddAnnotationsNotebooksToOperToken = &Migration{
|
||||
name: "add annotations and notebooks resource types to operator token",
|
||||
up: migrateTokensMigration(
|
||||
func(t influxdb.Authorization) bool {
|
||||
return permListsMatch(preNotebooksAnnotationsOpPerms(), t.Permissions)
|
||||
},
|
||||
|
@ -20,7 +20,24 @@ var Migration0016_AddAnnotationsNotebooksToOperToken = UpOnlyMigration(
|
|||
t.Permissions = append(t.Permissions, notebooksAndAnnotationsPerms(0)...)
|
||||
},
|
||||
),
|
||||
)
|
||||
down: migrateTokensMigration(
|
||||
func(t influxdb.Authorization) bool {
|
||||
return permListsMatch(append(preNotebooksAnnotationsOpPerms(), notebooksAndAnnotationsPerms(0)...), t.Permissions)
|
||||
},
|
||||
func(t *influxdb.Authorization) {
|
||||
newPerms := t.Permissions[:0]
|
||||
for _, p := range t.Permissions {
|
||||
switch p.Resource.Type {
|
||||
case influxdb.AnnotationsResourceType:
|
||||
case influxdb.NotebooksResourceType:
|
||||
default:
|
||||
newPerms = append(newPerms, p)
|
||||
}
|
||||
}
|
||||
t.Permissions = newPerms
|
||||
},
|
||||
),
|
||||
}
|
||||
|
||||
func migrateTokensMigration(
|
||||
checkToken func(influxdb.Authorization) bool,
|
||||
|
|
|
@ -78,45 +78,56 @@ func TestMigration_AnnotationsNotebooksOperToken(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// Run the migration
|
||||
require.NoError(t, Migration0016_AddAnnotationsNotebooksToOperToken.Up(context.Background(), ts.Store))
|
||||
|
||||
// the first item should not be changed
|
||||
encoded1, err := id1.Encode()
|
||||
require.NoError(t, err)
|
||||
err = ts.Store.View(context.Background(), func(tx kv.Tx) error {
|
||||
bkt, err := tx.Bucket(authBucket)
|
||||
require.NoError(t, err)
|
||||
|
||||
b, err := bkt.Get(encoded1)
|
||||
require.NoError(t, err)
|
||||
|
||||
var token influxdb.Authorization
|
||||
require.NoError(t, json.Unmarshal(b, &token))
|
||||
require.Equal(t, auths[0], token)
|
||||
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// the second item is the 2.0.x operator token and should have been updated
|
||||
// with a new permissions list
|
||||
encoded2, err := id2.Encode()
|
||||
require.NoError(t, err)
|
||||
err = ts.Store.View(context.Background(), func(tx kv.Tx) error {
|
||||
bkt, err := tx.Bucket(authBucket)
|
||||
|
||||
checkPerms := func(expectedOpPerms []influxdb.Permission) {
|
||||
// the first item should never change
|
||||
err = ts.Store.View(context.Background(), func(tx kv.Tx) error {
|
||||
bkt, err := tx.Bucket(authBucket)
|
||||
require.NoError(t, err)
|
||||
|
||||
b, err := bkt.Get(encoded1)
|
||||
require.NoError(t, err)
|
||||
|
||||
var token influxdb.Authorization
|
||||
require.NoError(t, json.Unmarshal(b, &token))
|
||||
require.Equal(t, auths[0], token)
|
||||
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
b, err := bkt.Get(encoded2)
|
||||
// the second item is the 2.0.x operator token and should have been updated to match our expectations
|
||||
err = ts.Store.View(context.Background(), func(tx kv.Tx) error {
|
||||
bkt, err := tx.Bucket(authBucket)
|
||||
require.NoError(t, err)
|
||||
|
||||
b, err := bkt.Get(encoded2)
|
||||
require.NoError(t, err)
|
||||
|
||||
var token influxdb.Authorization
|
||||
require.NoError(t, json.Unmarshal(b, &token))
|
||||
|
||||
require.ElementsMatch(t, expectedOpPerms, token.Permissions)
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
var token influxdb.Authorization
|
||||
require.NoError(t, json.Unmarshal(b, &token))
|
||||
// Test applying the migration for the 1st time.
|
||||
require.NoError(t, Migration0016_AddAnnotationsNotebooksToOperToken.Up(context.Background(), ts.Store))
|
||||
checkPerms(append(preNotebooksAnnotationsOpPerms(), notebooksAndAnnotationsPerms(0)...))
|
||||
|
||||
require.ElementsMatch(t, append(preNotebooksAnnotationsOpPerms(), notebooksAndAnnotationsPerms(0)...), token.Permissions)
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
// Downgrade the migration.
|
||||
require.NoError(t, Migration0016_AddAnnotationsNotebooksToOperToken.Down(context.Background(), ts.Store))
|
||||
checkPerms(preNotebooksAnnotationsOpPerms())
|
||||
|
||||
// Test re-applying the migration after a downgrade.
|
||||
require.NoError(t, Migration0016_AddAnnotationsNotebooksToOperToken.Up(context.Background(), ts.Store))
|
||||
checkPerms(append(preNotebooksAnnotationsOpPerms(), notebooksAndAnnotationsPerms(0)...))
|
||||
}
|
||||
|
||||
func Test_PermListsMatch(t *testing.T) {
|
||||
|
|
|
@ -5,9 +5,9 @@ import (
|
|||
"github.com/influxdata/influxdb/v2/kit/platform"
|
||||
)
|
||||
|
||||
var Migration0017_AddAnnotationsNotebooksToAllAccessTokens = UpOnlyMigration(
|
||||
"add annotations and notebooks resource types to all-access tokens",
|
||||
migrateTokensMigration(
|
||||
var Migration0017_AddAnnotationsNotebooksToAllAccessTokens = &Migration{
|
||||
name: "add annotations and notebooks resource types to all-access tokens",
|
||||
up: migrateTokensMigration(
|
||||
func(t influxdb.Authorization) bool {
|
||||
return permListsMatch(preNotebooksAnnotationsAllAccessPerms(t.OrgID, t.UserID), t.Permissions)
|
||||
},
|
||||
|
@ -15,7 +15,24 @@ var Migration0017_AddAnnotationsNotebooksToAllAccessTokens = UpOnlyMigration(
|
|||
t.Permissions = append(t.Permissions, notebooksAndAnnotationsPerms(t.OrgID)...)
|
||||
},
|
||||
),
|
||||
)
|
||||
down: migrateTokensMigration(
|
||||
func(t influxdb.Authorization) bool {
|
||||
return permListsMatch(append(preNotebooksAnnotationsAllAccessPerms(t.OrgID, t.UserID), notebooksAndAnnotationsPerms(t.OrgID)...), t.Permissions)
|
||||
},
|
||||
func(t *influxdb.Authorization) {
|
||||
newPerms := t.Permissions[:0]
|
||||
for _, p := range t.Permissions {
|
||||
switch p.Resource.Type {
|
||||
case influxdb.AnnotationsResourceType:
|
||||
case influxdb.NotebooksResourceType:
|
||||
default:
|
||||
newPerms = append(newPerms, p)
|
||||
}
|
||||
}
|
||||
t.Permissions = newPerms
|
||||
},
|
||||
),
|
||||
}
|
||||
|
||||
// preNotebooksAnnotationsAllAccessPerms is the list of permissions from a 2.0.x all-access token,
|
||||
// prior to the addition of the notebooks and annotations resource types.
|
||||
|
|
|
@ -61,45 +61,56 @@ func TestMigration_AnnotationsNotebooksAllAccessToken(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// Run the migration
|
||||
require.NoError(t, Migration0017_AddAnnotationsNotebooksToAllAccessTokens.Up(context.Background(), ts.Store))
|
||||
|
||||
// the first item should not be changed
|
||||
encoded1, err := id1.Encode()
|
||||
require.NoError(t, err)
|
||||
err = ts.Store.View(context.Background(), func(tx kv.Tx) error {
|
||||
bkt, err := tx.Bucket(authBucket)
|
||||
require.NoError(t, err)
|
||||
|
||||
b, err := bkt.Get(encoded1)
|
||||
require.NoError(t, err)
|
||||
|
||||
var token influxdb.Authorization
|
||||
require.NoError(t, json.Unmarshal(b, &token))
|
||||
require.Equal(t, auths[0], token)
|
||||
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// the second item is a 2.0.x all-access token and should have been updated
|
||||
// with a new permissions list
|
||||
encoded2, err := id2.Encode()
|
||||
require.NoError(t, err)
|
||||
err = ts.Store.View(context.Background(), func(tx kv.Tx) error {
|
||||
bkt, err := tx.Bucket(authBucket)
|
||||
|
||||
checkPerms := func(expectedAllPerms []influxdb.Permission) {
|
||||
// the first item should never change
|
||||
err = ts.Store.View(context.Background(), func(tx kv.Tx) error {
|
||||
bkt, err := tx.Bucket(authBucket)
|
||||
require.NoError(t, err)
|
||||
|
||||
b, err := bkt.Get(encoded1)
|
||||
require.NoError(t, err)
|
||||
|
||||
var token influxdb.Authorization
|
||||
require.NoError(t, json.Unmarshal(b, &token))
|
||||
require.Equal(t, auths[0], token)
|
||||
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
b, err := bkt.Get(encoded2)
|
||||
// the second item is a 2.0.x all-access token and should have been updated to match our expectations
|
||||
err = ts.Store.View(context.Background(), func(tx kv.Tx) error {
|
||||
bkt, err := tx.Bucket(authBucket)
|
||||
require.NoError(t, err)
|
||||
|
||||
b, err := bkt.Get(encoded2)
|
||||
require.NoError(t, err)
|
||||
|
||||
var token influxdb.Authorization
|
||||
require.NoError(t, json.Unmarshal(b, &token))
|
||||
|
||||
require.ElementsMatch(t, expectedAllPerms, token.Permissions)
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
var token influxdb.Authorization
|
||||
require.NoError(t, json.Unmarshal(b, &token))
|
||||
// Test applying the migration for the 1st time.
|
||||
require.NoError(t, Migration0017_AddAnnotationsNotebooksToAllAccessTokens.Up(context.Background(), ts.Store))
|
||||
checkPerms(append(preNotebooksAnnotationsAllAccessPerms(OrgID, UserID), notebooksAndAnnotationsPerms(OrgID)...))
|
||||
|
||||
require.ElementsMatch(t, append(preNotebooksAnnotationsAllAccessPerms(OrgID, UserID), notebooksAndAnnotationsPerms(OrgID)...), token.Permissions)
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
// Downgrade the migration.
|
||||
require.NoError(t, Migration0017_AddAnnotationsNotebooksToAllAccessTokens.Down(context.Background(), ts.Store))
|
||||
checkPerms(preNotebooksAnnotationsAllAccessPerms(OrgID, UserID))
|
||||
|
||||
// Test re-applying the migration after a downgrade.
|
||||
require.NoError(t, Migration0017_AddAnnotationsNotebooksToAllAccessTokens.Up(context.Background(), ts.Store))
|
||||
checkPerms(append(preNotebooksAnnotationsAllAccessPerms(OrgID, UserID), notebooksAndAnnotationsPerms(OrgID)...))
|
||||
}
|
||||
|
||||
// This set of permissions shouldn't change - it doesn't match an all-access token.
|
||||
|
|
|
@ -1,5 +1,8 @@
|
|||
package all
|
||||
|
||||
// NOTE: Down() is purposefully left as a no-op here because this migration fills in
|
||||
// values that were missing because of a logic bug, and doesn't actually modify the
|
||||
// metadata schema.
|
||||
var Migration0018_RepairMissingShardGroupDurations = UpOnlyMigration(
|
||||
"repair missing shard group durations",
|
||||
repairMissingShardGroupDurations,
|
||||
|
|
|
@ -5,9 +5,9 @@ import (
|
|||
"github.com/influxdata/influxdb/v2/kit/platform"
|
||||
)
|
||||
|
||||
var Migration0019_AddRemotesReplicationsToTokens = UpOnlyMigration(
|
||||
"add remotes and replications resource types to operator and all-access tokens",
|
||||
migrateTokensMigration(
|
||||
var Migration0019_AddRemotesReplicationsToTokens = &Migration{
|
||||
name: "add remotes and replications resource types to operator and all-access tokens",
|
||||
up: migrateTokensMigration(
|
||||
func(t influxdb.Authorization) bool {
|
||||
return permListsMatch(preReplicationOpPerms(), t.Permissions) ||
|
||||
permListsMatch(preReplicationAllAccessPerms(t.OrgID, t.UserID), t.Permissions)
|
||||
|
@ -20,7 +20,25 @@ var Migration0019_AddRemotesReplicationsToTokens = UpOnlyMigration(
|
|||
}
|
||||
},
|
||||
),
|
||||
)
|
||||
down: migrateTokensMigration(
|
||||
func(t influxdb.Authorization) bool {
|
||||
return permListsMatch(append(preReplicationOpPerms(), remotesAndReplicationsPerms(0)...), t.Permissions) ||
|
||||
permListsMatch(append(preReplicationAllAccessPerms(t.OrgID, t.UserID), remotesAndReplicationsPerms(t.OrgID)...), t.Permissions)
|
||||
},
|
||||
func(t *influxdb.Authorization) {
|
||||
newPerms := t.Permissions[:0]
|
||||
for _, p := range t.Permissions {
|
||||
switch p.Resource.Type {
|
||||
case influxdb.RemotesResourceType:
|
||||
case influxdb.ReplicationsResourceType:
|
||||
default:
|
||||
newPerms = append(newPerms, p)
|
||||
}
|
||||
}
|
||||
t.Permissions = newPerms
|
||||
},
|
||||
),
|
||||
}
|
||||
|
||||
func preReplicationOpPerms() []influxdb.Permission {
|
||||
return append(preNotebooksAnnotationsOpPerms(), notebooksAndAnnotationsPerms(0)...)
|
||||
|
|
|
@ -78,45 +78,56 @@ func TestMigration_RemotesReplicationsOperToken(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// Run the migration
|
||||
require.NoError(t, Migration0019_AddRemotesReplicationsToTokens.Up(context.Background(), ts.Store))
|
||||
|
||||
// the first item should not be changed
|
||||
encoded1, err := id1.Encode()
|
||||
require.NoError(t, err)
|
||||
err = ts.Store.View(context.Background(), func(tx kv.Tx) error {
|
||||
bkt, err := tx.Bucket(authBucket)
|
||||
require.NoError(t, err)
|
||||
|
||||
b, err := bkt.Get(encoded1)
|
||||
require.NoError(t, err)
|
||||
|
||||
var token influxdb.Authorization
|
||||
require.NoError(t, json.Unmarshal(b, &token))
|
||||
require.Equal(t, auths[0], token)
|
||||
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// the second item is the 2.0.x operator token and should have been updated
|
||||
// with a new permissions list
|
||||
encoded2, err := id2.Encode()
|
||||
require.NoError(t, err)
|
||||
err = ts.Store.View(context.Background(), func(tx kv.Tx) error {
|
||||
bkt, err := tx.Bucket(authBucket)
|
||||
|
||||
checkPerms := func(expectedAllPerms []influxdb.Permission) {
|
||||
// the first item should never change
|
||||
err = ts.Store.View(context.Background(), func(tx kv.Tx) error {
|
||||
bkt, err := tx.Bucket(authBucket)
|
||||
require.NoError(t, err)
|
||||
|
||||
b, err := bkt.Get(encoded1)
|
||||
require.NoError(t, err)
|
||||
|
||||
var token influxdb.Authorization
|
||||
require.NoError(t, json.Unmarshal(b, &token))
|
||||
require.Equal(t, auths[0], token)
|
||||
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
b, err := bkt.Get(encoded2)
|
||||
// the second item is a 2.0.x all-access token and should have been updated to match our expectations
|
||||
err = ts.Store.View(context.Background(), func(tx kv.Tx) error {
|
||||
bkt, err := tx.Bucket(authBucket)
|
||||
require.NoError(t, err)
|
||||
|
||||
b, err := bkt.Get(encoded2)
|
||||
require.NoError(t, err)
|
||||
|
||||
var token influxdb.Authorization
|
||||
require.NoError(t, json.Unmarshal(b, &token))
|
||||
|
||||
require.ElementsMatch(t, expectedAllPerms, token.Permissions)
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
var token influxdb.Authorization
|
||||
require.NoError(t, json.Unmarshal(b, &token))
|
||||
// Test applying the migration for the 1st time.
|
||||
require.NoError(t, Migration0019_AddRemotesReplicationsToTokens.Up(context.Background(), ts.Store))
|
||||
checkPerms(append(preReplicationOpPerms(), remotesAndReplicationsPerms(0)...))
|
||||
|
||||
require.ElementsMatch(t, append(preReplicationOpPerms(), remotesAndReplicationsPerms(0)...), token.Permissions)
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
// Downgrade the migration.
|
||||
require.NoError(t, Migration0019_AddRemotesReplicationsToTokens.Down(context.Background(), ts.Store))
|
||||
checkPerms(preReplicationOpPerms())
|
||||
|
||||
// Test re-applying the migration after a downgrade.
|
||||
require.NoError(t, Migration0019_AddRemotesReplicationsToTokens.Up(context.Background(), ts.Store))
|
||||
checkPerms(append(preReplicationOpPerms(), remotesAndReplicationsPerms(0)...))
|
||||
}
|
||||
|
||||
func TestMigration_RemotesReplicationsAllAccessToken(t *testing.T) {
|
||||
|
@ -168,43 +179,54 @@ func TestMigration_RemotesReplicationsAllAccessToken(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// Run the migration
|
||||
require.NoError(t, Migration0019_AddRemotesReplicationsToTokens.Up(context.Background(), ts.Store))
|
||||
|
||||
// the first item should not be changed
|
||||
encoded1, err := id1.Encode()
|
||||
require.NoError(t, err)
|
||||
err = ts.Store.View(context.Background(), func(tx kv.Tx) error {
|
||||
bkt, err := tx.Bucket(authBucket)
|
||||
require.NoError(t, err)
|
||||
|
||||
b, err := bkt.Get(encoded1)
|
||||
require.NoError(t, err)
|
||||
|
||||
var token influxdb.Authorization
|
||||
require.NoError(t, json.Unmarshal(b, &token))
|
||||
require.Equal(t, auths[0], token)
|
||||
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// the second item is a 2.0.x all-access token and should have been updated
|
||||
// with a new permissions list
|
||||
encoded2, err := id2.Encode()
|
||||
require.NoError(t, err)
|
||||
err = ts.Store.View(context.Background(), func(tx kv.Tx) error {
|
||||
bkt, err := tx.Bucket(authBucket)
|
||||
|
||||
checkPerms := func(expectedAllPerms []influxdb.Permission) {
|
||||
// the first item should never change
|
||||
err = ts.Store.View(context.Background(), func(tx kv.Tx) error {
|
||||
bkt, err := tx.Bucket(authBucket)
|
||||
require.NoError(t, err)
|
||||
|
||||
b, err := bkt.Get(encoded1)
|
||||
require.NoError(t, err)
|
||||
|
||||
var token influxdb.Authorization
|
||||
require.NoError(t, json.Unmarshal(b, &token))
|
||||
require.Equal(t, auths[0], token)
|
||||
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
b, err := bkt.Get(encoded2)
|
||||
// the second item is a 2.0.x all-access token and should have been updated to match our expectations
|
||||
err = ts.Store.View(context.Background(), func(tx kv.Tx) error {
|
||||
bkt, err := tx.Bucket(authBucket)
|
||||
require.NoError(t, err)
|
||||
|
||||
b, err := bkt.Get(encoded2)
|
||||
require.NoError(t, err)
|
||||
|
||||
var token influxdb.Authorization
|
||||
require.NoError(t, json.Unmarshal(b, &token))
|
||||
|
||||
require.ElementsMatch(t, expectedAllPerms, token.Permissions)
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
var token influxdb.Authorization
|
||||
require.NoError(t, json.Unmarshal(b, &token))
|
||||
// Test applying the migration for the 1st time.
|
||||
require.NoError(t, Migration0019_AddRemotesReplicationsToTokens.Up(context.Background(), ts.Store))
|
||||
checkPerms(append(preReplicationAllAccessPerms(OrgID, UserID), remotesAndReplicationsPerms(OrgID)...))
|
||||
|
||||
require.ElementsMatch(t, append(preReplicationAllAccessPerms(OrgID, UserID), remotesAndReplicationsPerms(OrgID)...), token.Permissions)
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
// Downgrade the migration.
|
||||
require.NoError(t, Migration0019_AddRemotesReplicationsToTokens.Down(context.Background(), ts.Store))
|
||||
checkPerms(preReplicationAllAccessPerms(OrgID, UserID))
|
||||
|
||||
// Test re-applying the migration after a downgrade.
|
||||
require.NoError(t, Migration0019_AddRemotesReplicationsToTokens.Up(context.Background(), ts.Store))
|
||||
checkPerms(append(preReplicationAllAccessPerms(OrgID, UserID), remotesAndReplicationsPerms(OrgID)...))
|
||||
}
|
||||
|
|
|
@ -220,7 +220,7 @@ func (m *Migrator) Up(ctx context.Context) error {
|
|||
// 0003 add index "foo on baz" | (down)
|
||||
//
|
||||
// Down would call down() on 0002 and then on 0001.
|
||||
func (m *Migrator) Down(ctx context.Context) (err error) {
|
||||
func (m *Migrator) Down(ctx context.Context, untilMigration int) (err error) {
|
||||
wrapErr := func(err error) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
|
@ -249,12 +249,35 @@ func (m *Migrator) Down(ctx context.Context) (err error) {
|
|||
return wrapErr(err)
|
||||
}
|
||||
|
||||
migrationsToDo := len(migrations)
|
||||
if migrationsToDo > 0 {
|
||||
m.logger.Info("Tearing down metadata migrations", zap.Int("migration_count", migrationsToDo))
|
||||
migrationsToDo := len(migrations) - untilMigration
|
||||
if migrationsToDo == 0 {
|
||||
return nil
|
||||
}
|
||||
if migrationsToDo < 0 {
|
||||
m.logger.Warn("KV metadata is already on a schema older than target, nothing to do")
|
||||
return nil
|
||||
}
|
||||
|
||||
for i := migrationsToDo - 1; i >= 0; i-- {
|
||||
if m.backupPath != "" {
|
||||
m.logger.Info("Backing up pre-migration metadata", zap.String("backup_path", m.backupPath))
|
||||
if err := func() error {
|
||||
out, err := os.Create(m.backupPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer out.Close()
|
||||
|
||||
if err := m.store.Backup(ctx, out); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}(); err != nil {
|
||||
return fmt.Errorf("failed to back up pre-migration metadata: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
m.logger.Info("Tearing down metadata migrations", zap.Int("migration_count", migrationsToDo))
|
||||
for i := len(migrations) - 1; i >= untilMigration; i-- {
|
||||
migration := migrations[i]
|
||||
|
||||
m.logMigrationEvent(DownMigrationState, migration.Migration, "started")
|
||||
|
|
|
@ -217,7 +217,7 @@ func Migrator(t *testing.T, store kv.SchemaStore, newMigrator func(*testing.T, *
|
|||
|
||||
t.Run("Down() calls down for each migration", func(t *testing.T) {
|
||||
// apply all migrations
|
||||
if err := migrator.Down(ctx); err != nil {
|
||||
if err := migrator.Down(ctx, 0); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
|
@ -311,9 +311,50 @@ func Migrator(t *testing.T, store kv.SchemaStore, newMigrator func(*testing.T, *
|
|||
migrationFour.assertUpCalled(t, 2)
|
||||
})
|
||||
|
||||
t.Run("Down() calls down on a subset of migrations", func(t *testing.T) {
|
||||
if err := migrator.Down(ctx, 2); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// list migration again
|
||||
migrations, err := migrator.List(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if expected := []migration.Migration{
|
||||
{
|
||||
ID: platform.ID(1),
|
||||
Name: "migration one",
|
||||
State: migration.UpMigrationState,
|
||||
StartedAt: ts(9),
|
||||
FinishedAt: ts(10),
|
||||
},
|
||||
{
|
||||
ID: platform.ID(2),
|
||||
Name: "migration two",
|
||||
State: migration.UpMigrationState,
|
||||
StartedAt: ts(11),
|
||||
FinishedAt: ts(12),
|
||||
},
|
||||
{
|
||||
ID: platform.ID(3),
|
||||
Name: "migration three",
|
||||
State: migration.DownMigrationState,
|
||||
},
|
||||
{
|
||||
ID: platform.ID(4),
|
||||
Name: "migration four",
|
||||
State: migration.DownMigrationState,
|
||||
},
|
||||
}; !reflect.DeepEqual(expected, migrations) {
|
||||
t.Errorf("expected %#v, found %#v", expected, migrations)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("List() missing migration spec errors as expected", func(t *testing.T) {
|
||||
// remove last specification from migration list
|
||||
migrator.Specs = migrator.Specs[:len(migrator.Specs)-1]
|
||||
// remove all but first specification from migration list
|
||||
migrator.Specs = migrator.Specs[:1]
|
||||
// list migration again
|
||||
_, err := migrator.List(ctx)
|
||||
if !errors.Is(err, migration.ErrMigrationSpecNotFound) {
|
||||
|
|
Loading…
Reference in New Issue