feat: recommend `influxd downgrade` after encountering unknown KV migration (#22805)
parent
b93f3a3222
commit
0fbda8397d
|
@ -0,0 +1,14 @@
|
||||||
|
package migration
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/influxdata/influxdb/v2/kit/platform/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
func ErrInvalidMigration(n string) *errors.Error {
|
||||||
|
return &errors.Error{
|
||||||
|
Code: errors.EInternal,
|
||||||
|
Msg: fmt.Sprintf(`DB contains record of unknown migration %q - if you are downgrading from a more recent version of influxdb, please run the "influxd downgrade" command from that version to revert your metadata to be compatible with this version prior to starting influxd.`, n),
|
||||||
|
}
|
||||||
|
}
|
|
@ -3,23 +3,17 @@ package migration
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/influxdb/v2/kit/migration"
|
||||||
"github.com/influxdata/influxdb/v2/kit/platform"
|
"github.com/influxdata/influxdb/v2/kit/platform"
|
||||||
"github.com/influxdata/influxdb/v2/kv"
|
"github.com/influxdata/influxdb/v2/kv"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var migrationBucket = []byte("migrationsv1")
|
||||||
migrationBucket = []byte("migrationsv1")
|
|
||||||
|
|
||||||
// ErrMigrationSpecNotFound is returned when a migration specification is missing
|
|
||||||
// for an already applied migration.
|
|
||||||
ErrMigrationSpecNotFound = errors.New("migration specification not found")
|
|
||||||
)
|
|
||||||
|
|
||||||
type Store = kv.SchemaStore
|
type Store = kv.SchemaStore
|
||||||
|
|
||||||
|
@ -323,25 +317,25 @@ func (m *Migrator) walk(ctx context.Context, store kv.Store, fn func(id platform
|
||||||
return false, fmt.Errorf("decoding migration id: %w", err)
|
return false, fmt.Errorf("decoding migration id: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var migration Migration
|
var mig Migration
|
||||||
if err := json.Unmarshal(v, &migration); err != nil {
|
if err := json.Unmarshal(v, &mig); err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
idx := int(id) - 1
|
idx := int(id) - 1
|
||||||
if idx >= len(m.Specs) {
|
if idx >= len(m.Specs) {
|
||||||
return false, fmt.Errorf("migration %q: %w", migration.Name, ErrMigrationSpecNotFound)
|
return false, migration.ErrInvalidMigration(mig.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
if spec := m.Specs[idx]; spec.MigrationName() != migration.Name {
|
if spec := m.Specs[idx]; spec.MigrationName() != mig.Name {
|
||||||
return false, fmt.Errorf("expected migration %q, found %q", spec.MigrationName(), migration.Name)
|
return false, fmt.Errorf("expected migration %q, found %q", spec.MigrationName(), mig.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
if migration.FinishedAt != nil {
|
if mig.FinishedAt != nil {
|
||||||
migration.State = UpMigrationState
|
mig.State = UpMigrationState
|
||||||
}
|
}
|
||||||
|
|
||||||
fn(id, migration)
|
fn(id, mig)
|
||||||
|
|
||||||
return true, nil
|
return true, nil
|
||||||
})
|
})
|
||||||
|
|
|
@ -9,17 +9,10 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/influxdata/influxdb/v2/kit/platform/errors"
|
"github.com/influxdata/influxdb/v2/kit/migration"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func errInvalidMigration(n string) *errors.Error {
|
|
||||||
return &errors.Error{
|
|
||||||
Code: errors.EInternal,
|
|
||||||
Msg: fmt.Sprintf(`DB contains record of unknown migration %q - if you are downgrading from a more recent version of influxdb, please run the "influxd downgrade" command from that version to revert your metadata to be compatible with this version prior to starting influxd.`, n),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type Migrator struct {
|
type Migrator struct {
|
||||||
store *SqlStore
|
store *SqlStore
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
|
@ -58,7 +51,7 @@ func (m *Migrator) Up(ctx context.Context, source embed.FS) error {
|
||||||
var lastMigration int
|
var lastMigration int
|
||||||
for idx := range executedMigrations {
|
for idx := range executedMigrations {
|
||||||
if idx > len(knownMigrations)-1 || executedMigrations[idx] != dropExtension(knownMigrations[idx].Name()) {
|
if idx > len(knownMigrations)-1 || executedMigrations[idx] != dropExtension(knownMigrations[idx].Name()) {
|
||||||
return errInvalidMigration(executedMigrations[idx])
|
return migration.ErrInvalidMigration(executedMigrations[idx])
|
||||||
}
|
}
|
||||||
|
|
||||||
lastMigration, err = scriptVersion(executedMigrations[idx])
|
lastMigration, err = scriptVersion(executedMigrations[idx])
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/influxdata/influxdb/v2/kit/errors"
|
"github.com/influxdata/influxdb/v2/kit/errors"
|
||||||
|
"github.com/influxdata/influxdb/v2/kit/migration"
|
||||||
"github.com/influxdata/influxdb/v2/sqlite/test_migrations"
|
"github.com/influxdata/influxdb/v2/sqlite/test_migrations"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -75,7 +76,7 @@ func TestUpErrors(t *testing.T) {
|
||||||
migrator := NewMigrator(store, zaptest.NewLogger(t))
|
migrator := NewMigrator(store, zaptest.NewLogger(t))
|
||||||
require.NoError(t, migrator.Up(ctx, test_migrations.MigrationTable))
|
require.NoError(t, migrator.Up(ctx, test_migrations.MigrationTable))
|
||||||
require.NoError(t, store.execTrans(ctx, `INSERT INTO migrations (name) VALUES ("0010_some_bad_migration")`))
|
require.NoError(t, store.execTrans(ctx, `INSERT INTO migrations (name) VALUES ("0010_some_bad_migration")`))
|
||||||
require.Equal(t, errInvalidMigration("0010_some_bad_migration"), migrator.Up(ctx, test_migrations.All))
|
require.Equal(t, migration.ErrInvalidMigration("0010_some_bad_migration"), migrator.Up(ctx, test_migrations.All))
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("known + unknown migrations exist", func(t *testing.T) {
|
t.Run("known + unknown migrations exist", func(t *testing.T) {
|
||||||
|
@ -86,7 +87,7 @@ func TestUpErrors(t *testing.T) {
|
||||||
migrator := NewMigrator(store, zaptest.NewLogger(t))
|
migrator := NewMigrator(store, zaptest.NewLogger(t))
|
||||||
require.NoError(t, migrator.Up(ctx, test_migrations.First))
|
require.NoError(t, migrator.Up(ctx, test_migrations.First))
|
||||||
require.NoError(t, store.execTrans(ctx, `INSERT INTO migrations (name) VALUES ("0010_some_bad_migration")`))
|
require.NoError(t, store.execTrans(ctx, `INSERT INTO migrations (name) VALUES ("0010_some_bad_migration")`))
|
||||||
require.Equal(t, errInvalidMigration("0010_some_bad_migration"), migrator.Up(ctx, test_migrations.All))
|
require.Equal(t, migration.ErrInvalidMigration("0010_some_bad_migration"), migrator.Up(ctx, test_migrations.All))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,6 @@ package testing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
@ -10,6 +9,7 @@ import (
|
||||||
"github.com/influxdata/influxdb/v2/kit/platform"
|
"github.com/influxdata/influxdb/v2/kit/platform"
|
||||||
"github.com/influxdata/influxdb/v2/kv"
|
"github.com/influxdata/influxdb/v2/kv"
|
||||||
"github.com/influxdata/influxdb/v2/kv/migration"
|
"github.com/influxdata/influxdb/v2/kv/migration"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -357,9 +357,9 @@ func Migrator(t *testing.T, store kv.SchemaStore, newMigrator func(*testing.T, *
|
||||||
migrator.Specs = migrator.Specs[:1]
|
migrator.Specs = migrator.Specs[:1]
|
||||||
// list migration again
|
// list migration again
|
||||||
_, err := migrator.List(ctx)
|
_, err := migrator.List(ctx)
|
||||||
if !errors.Is(err, migration.ErrMigrationSpecNotFound) {
|
require.Error(t, err)
|
||||||
t.Errorf("expected migration spec error, found %v", err)
|
require.Contains(t, err.Error(), "influxd downgrade",
|
||||||
}
|
"Error returned on unknown migration should recommend `influxd downgrade`")
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue