fix(cmd/influxd): improve `influxd upgrade` logging, and require manual confirmation of data copy (#20440)

Daniel Moran 2021-01-07 07:34:48 -08:00 committed by GitHub
parent 52692ba7d4
commit 30306e5b10
5 changed files with 138 additions and 106 deletions

@ -33,6 +33,7 @@ Replacement `tsi1` indexes will be automatically generated on startup for shards
1. [20313](https://github.com/influxdata/influxdb/pull/20313): Automatically build `tsi1` indexes for shards that need it instead of falling back to `inmem`.
1. [20313](https://github.com/influxdata/influxdb/pull/20313): Fix logging initialization for storage engine.
1. [20442](https://github.com/influxdata/influxdb/pull/20442): Don't return 500 codes for partial write failures.
1. [20440](https://github.com/influxdata/influxdb/pull/20440): Add confirmation step w/ file sizes before copying data files in `influxd upgrade`.
## v2.0.3 [2020-12-14]

@ -2,20 +2,25 @@ package upgrade
import (
"context"
"errors"
"fmt"
"github.com/dustin/go-humanize"
"os"
"path/filepath"
"strings"
"github.com/dustin/go-humanize"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/cmd/internal"
"github.com/influxdata/influxdb/v2/pkg/fs"
"github.com/influxdata/influxdb/v2/v1/services/meta"
"github.com/tcnksm/go-input"
"go.uber.org/zap"
)
// upgradeDatabases creates databases, buckets, retention policies and shard info according to 1.x meta and copies data
func upgradeDatabases(ctx context.Context, v1 *influxDBv1, v2 *influxDBv2, v1opts *optionsV1, v2opts *optionsV2, orgID influxdb.ID, log *zap.Logger) (map[string][]influxdb.ID, error) {
func upgradeDatabases(ctx context.Context, ui *input.UI, v1 *influxDBv1, v2 *influxDBv2, opts *options, orgID influxdb.ID, log *zap.Logger) (map[string][]influxdb.ID, error) {
v1opts := opts.source
v2opts := opts.target
db2BucketIds := make(map[string][]influxdb.ID)
targetDataPath := filepath.Join(v2opts.enginePath, "data")
@ -33,29 +38,8 @@ func upgradeDatabases(ctx context.Context, v1 *influxDBv1, v2 *influxDBv2, v1opt
log.Info("No database found in the 1.x meta")
return db2BucketIds, nil
}
// Check space
log.Info("Checking space")
size, err := DirSize(v1opts.dataDir)
if err != nil {
return nil, fmt.Errorf("error getting size of %s: %w", v1opts.dataDir, err)
}
size2, err := DirSize(v1opts.walDir)
if err != nil {
return nil, fmt.Errorf("error getting size of %s: %w", v1opts.walDir, err)
}
size += size2
v2dir := filepath.Dir(v2opts.boltPath)
diskInfo, err := fs.DiskUsage(v2dir)
if err != nil {
return nil, fmt.Errorf("error getting info of disk %s: %w", v2dir, err)
}
if options.verbose {
log.Info("Disk space info",
zap.String("Free space", humanize.Bytes(diskInfo.Free)),
zap.String("Requested space", humanize.Bytes(size)))
}
if size > diskInfo.Free {
return nil, fmt.Errorf("not enough space on target disk of %s: need %d, available %d ", v2dir, size, diskInfo.Free)
if err := checkDiskSpace(ui, opts, log); err != nil {
return nil, err
}
cqFile, err := os.OpenFile(v2opts.cqPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
@ -71,15 +55,10 @@ func upgradeDatabases(ctx context.Context, v1 *influxDBv1, v2 *influxDBv2, v1opt
// export any continuous queries
for _, db := range v1.meta.Databases() {
if db.Name == "_internal" {
if options.verbose {
log.Info("Skipping _internal ")
}
log.Debug("Skipping _internal ")
continue
}
if options.verbose {
log.Info("Upgrading database ",
zap.String("database", db.Name))
}
log.Debug("Upgrading database", zap.String("database", db.Name))
// db to buckets IDs mapping
db2BucketIds[db.Name] = make([]influxdb.ID, 0, len(db.RetentionPolicies))
@ -95,10 +74,7 @@ func upgradeDatabases(ctx context.Context, v1 *influxDBv1, v2 *influxDBv2, v1opt
RetentionPolicyName: rp.Name,
RetentionPeriod: rp.Duration,
}
if options.verbose {
log.Info("Creating bucket ",
zap.String("Bucket", bucket.Name))
}
log.Debug("Creating bucket", zap.String("Bucket", bucket.Name))
err = v2.bucketSvc.CreateBucket(ctx, bucket)
if err != nil {
return nil, fmt.Errorf("error creating bucket %s: %w", bucket.Name, err)
@ -106,10 +82,7 @@ func upgradeDatabases(ctx context.Context, v1 *influxDBv1, v2 *influxDBv2, v1opt
}
db2BucketIds[db.Name] = append(db2BucketIds[db.Name], bucket.ID)
if options.verbose {
log.Info("Creating database with retention policy",
zap.String("database", bucket.ID.String()))
}
log.Debug("Creating database with retention policy", zap.String("database", bucket.ID.String()))
spec := rp.ToSpec()
spec.Name = meta.DefaultRetentionPolicyName
dbv2, err := v2.meta.CreateDatabaseWithRetentionPolicy(bucket.ID.String(), spec)
@ -124,25 +97,25 @@ func upgradeDatabases(ctx context.Context, v1 *influxDBv1, v2 *influxDBv2, v1opt
OrganizationID: orgID,
BucketID: bucket.ID,
}
if options.verbose {
log.Info("Creating mapping",
zap.String("database", mapping.Database),
zap.String("retention policy", mapping.RetentionPolicy),
zap.String("orgID", mapping.OrganizationID.String()),
zap.String("bucketID", mapping.BucketID.String()))
}
log.Debug(
"Creating mapping",
zap.String("database", mapping.Database),
zap.String("retention policy", mapping.RetentionPolicy),
zap.String("orgID", mapping.OrganizationID.String()),
zap.String("bucketID", mapping.BucketID.String()),
)
err = v2.dbrpSvc.Create(ctx, mapping)
if err != nil {
return nil, fmt.Errorf("error creating mapping %s/%s -> Org %s, bucket %s: %w", mapping.Database, mapping.RetentionPolicy, mapping.OrganizationID.String(), mapping.BucketID.String(), err)
}
shardsNum := 0
for _, sg := range rp.ShardGroups {
if options.verbose {
log.Info("Creating shard group",
zap.String("database", dbv2.Name),
zap.String("retention policy", dbv2.DefaultRetentionPolicy),
zap.Time("time", sg.StartTime))
}
log.Debug(
"Creating shard group",
zap.String("database", dbv2.Name),
zap.String("retention policy", dbv2.DefaultRetentionPolicy),
zap.Time("time", sg.StartTime),
)
shardsNum += len(sg.Shards)
_, err := v2.meta.CreateShardGroupWithShards(dbv2.Name, dbv2.DefaultRetentionPolicy, sg.StartTime, sg.Shards)
if err != nil {
@ -152,11 +125,11 @@ func upgradeDatabases(ctx context.Context, v1 *influxDBv1, v2 *influxDBv2, v1opt
//empty retention policy doesn't have data
if shardsNum > 0 {
targetPath := filepath.Join(targetDataPath, dbv2.Name, spec.Name)
if options.verbose {
log.Info("Copying data",
zap.String("source", sourcePath),
zap.String("target", targetPath))
}
log.Debug(
"Copying data",
zap.String("source", sourcePath),
zap.String("target", targetPath),
)
err = CopyDir(sourcePath,
targetPath,
nil,
@ -167,11 +140,11 @@ func upgradeDatabases(ctx context.Context, v1 *influxDBv1, v2 *influxDBv2, v1opt
}
sourcePath = filepath.Join(v1opts.walDir, db.Name, rp.Name)
targetPath = filepath.Join(targetWalPath, dbv2.Name, spec.Name)
if options.verbose {
log.Info("Copying wal",
zap.String("source", sourcePath),
zap.String("target", targetPath))
}
log.Debug(
"Copying wal",
zap.String("source", sourcePath),
zap.String("target", targetPath),
)
err = CopyDir(sourcePath,
targetPath,
nil,
@ -204,9 +177,7 @@ func upgradeDatabases(ctx context.Context, v1 *influxDBv1, v2 *influxDBv2, v1opt
}
for _, cq := range db.ContinuousQueries {
if options.verbose {
log.Info("Exporting CQ", zap.String("db", db.Name), zap.String("cq_name", cq.Name))
}
log.Debug("Exporting CQ", zap.String("db", db.Name), zap.String("cq_name", cq.Name))
padding := maxNameLen - len(cq.Name) + 1
_, err := cqFile.WriteString(fmt.Sprintf("%s%s%s\n", cq.Name, strings.Repeat(" ", padding), cq.Query))
@ -222,3 +193,45 @@ func upgradeDatabases(ctx context.Context, v1 *influxDBv1, v2 *influxDBv2, v1opt
return db2BucketIds, nil
}
// checkDiskSpace ensures there is enough room at the target path to store
// a full copy of all V1 data.
func checkDiskSpace(ui *input.UI, opts *options, log *zap.Logger) error {
log.Info("Checking available disk space")
size, err := DirSize(opts.source.dataDir)
if err != nil {
return fmt.Errorf("error getting size of %s: %w", opts.source.dataDir, err)
}
walSize, err := DirSize(opts.source.walDir)
if err != nil {
return fmt.Errorf("error getting size of %s: %w", opts.source.walDir, err)
}
size += walSize
v2dir := filepath.Dir(opts.target.boltPath)
diskInfo, err := fs.DiskUsage(v2dir)
if err != nil {
return fmt.Errorf("error getting info of disk %s: %w", v2dir, err)
}
freeBytes := humanize.Bytes(diskInfo.Free)
requiredBytes := humanize.Bytes(size)
log.Info("Computed disk space", zap.String("free", freeBytes), zap.String("required", requiredBytes))
if size > diskInfo.Free {
return fmt.Errorf("not enough space on target disk of %s: need %d, available %d", v2dir, size, diskInfo.Free)
}
if !opts.force {
if confirmed := internal.GetConfirm(ui, func() string {
return fmt.Sprintf(`Proceeding will copy all V1 data to %q
Space available: %s
Space required: %s
`, v2dir, freeBytes, requiredBytes)
}); !confirmed {
return errors.New("upgrade was canceled")
}
}
return nil
}
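For reference, the DirSize helper used above to measure the 1.x data and WAL directories is not part of this diff. Below is a minimal sketch of what such a helper could look like, assuming it simply walks the tree and sums the sizes of regular files; this is hypothetical, not the repository's actual implementation.

// Hypothetical sketch only; the real DirSize lives elsewhere in the repository.
package upgrade

import (
	"os"
	"path/filepath"
)

func dirSizeSketch(root string) (uint64, error) {
	var total uint64
	err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		// Count only regular files; directories contribute no payload bytes.
		if info.Mode().IsRegular() {
			total += uint64(info.Size())
		}
		return nil
	})
	return total, err
}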

@ -1,6 +1,7 @@
package upgrade
import (
"bytes"
"context"
"io/ioutil"
"net/http"
@ -11,7 +12,6 @@ import (
"testing"
"github.com/dustin/go-humanize"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/bolt"
"github.com/influxdata/influxdb/v2/cmd/influxd/launcher"
@ -19,6 +19,7 @@ import (
"github.com/influxdata/influxdb/v2/v1/services/meta"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tcnksm/go-input"
"go.uber.org/zap"
)
@ -61,8 +62,8 @@ func TestUpgradeRealDB(t *testing.T) {
v2, err := newInfluxDBv2(ctx, v2opts, zap.NewNop())
require.Nil(t, err)
options.target = *v2opts
req, err := nonInteractive()
opts := &options{source: *v1opts, target: *v2opts, force: true}
req, err := nonInteractive(opts)
require.Nil(t, err)
assert.Equal(t, req.RetentionPeriod, humanize.Week, "Retention policy should pass through")
@ -82,7 +83,11 @@ func TestUpgradeRealDB(t *testing.T) {
log, err := zap.NewDevelopment()
require.Nil(t, err)
db2bids, err := upgradeDatabases(ctx, v1, v2, v1opts, v2opts, resp.Org.ID, log)
ui := &input.UI{
Writer: &bytes.Buffer{},
Reader: &bytes.Buffer{},
}
db2bids, err := upgradeDatabases(ctx, ui, v1, v2, opts, resp.Org.ID, log)
require.Nil(t, err)
err = v2.close()

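The test above passes force: true, so the new confirmation prompt is bypassed. As a purely hypothetical variation (not part of this PR), the prompt itself could be exercised by pre-loading the UI's Reader with an affirmative answer, assuming internal.GetConfirm accepts a yes/no style response read from that Reader:

// Hypothetical only: drive the confirmation prompt instead of bypassing it with force.
promptUI := &input.UI{
	Writer: &bytes.Buffer{},                // discard the prompt text
	Reader: bytes.NewBufferString("yes\n"), // scripted affirmative answer
}
promptOpts := &options{source: *v1opts, target: *v2opts, force: false}
db2bids, err = upgradeDatabases(ctx, promptUI, v1, v2, promptOpts, resp.Org.ID, log)
require.Nil(t, err)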
@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"strconv"
"time"
@ -25,7 +24,7 @@ func setupAdmin(ctx context.Context, v2 *influxDBv2, req *influxdb.OnboardingReq
return res, nil
}
func isInteractive() bool {
func isInteractive(options *options) bool {
return !options.force ||
options.target.userName == "" ||
options.target.password == "" ||
@ -33,14 +32,14 @@ func isInteractive() bool {
options.target.bucket == ""
}
func onboardingRequest() (*influxdb.OnboardingRequest, error) {
if isInteractive() {
return interactive()
func onboardingRequest(ui *input.UI, options *options) (*influxdb.OnboardingRequest, error) {
if isInteractive(options) {
return interactive(ui, options)
}
return nonInteractive()
return nonInteractive(options)
}
func nonInteractive() (*influxdb.OnboardingRequest, error) {
func nonInteractive(options *options) (*influxdb.OnboardingRequest, error) {
if len(options.target.password) < internal.MinPasswordLen {
return nil, internal.ErrPasswordIsTooShort
}
@ -63,11 +62,7 @@ func nonInteractive() (*influxdb.OnboardingRequest, error) {
return req, nil
}
func interactive() (req *influxdb.OnboardingRequest, err error) {
ui := &input.UI{
Writer: os.Stdout,
Reader: os.Stdin,
}
func interactive(ui *input.UI, options *options) (req *influxdb.OnboardingRequest, err error) {
req = new(influxdb.OnboardingRequest)
fmt.Println(string(internal.PromptWithColor(`Welcome to InfluxDB 2.0 upgrade!`, internal.ColorYellow)))
if options.target.userName != "" {
@ -119,8 +114,7 @@ func interactive() (req *influxdb.OnboardingRequest, err error) {
if req.RetentionPeriod > 0 {
rp = fmt.Sprintf("%d hrs", req.RetentionPeriod/time.Hour)
}
return fmt.Sprintf(`
You have entered:
return fmt.Sprintf(`You have entered:
Username: %s
Organization: %s
Bucket: %s

@ -30,6 +30,7 @@ import (
"github.com/influxdata/influxdb/v2/v1/services/meta/filestore"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/tcnksm/go-input"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
@ -102,22 +103,19 @@ type optionsV2 struct {
retention string
}
var options = struct {
type options struct {
// flags for source InfluxDB
source optionsV1
// flags for target InfluxDB
target optionsV2
// verbose output
verbose bool
// logging
logLevel zapcore.Level
logPath string
force bool
}{}
}
func NewCommand(v *viper.Viper) *cobra.Command {
@ -127,6 +125,10 @@ func NewCommand(v *viper.Viper) *cobra.Command {
panic("error fetching default InfluxDB 2.0 dir: " + err.Error())
}
// DEPRECATED in favor of log-level=debug, but left for backwards-compatibility
verbose := false
options := &options{}
cmd := &cobra.Command{
Use: "upgrade",
Short: "Upgrade a 1.x version of InfluxDB",
@ -143,7 +145,9 @@ func NewCommand(v *viper.Viper) *cobra.Command {
Target 2.x database dir is specified by the --engine-path option. If changed, the bolt path should be changed as well.
`,
RunE: runUpgradeE,
RunE: func(cmd *cobra.Command, _ []string) error {
return runUpgradeE(cmd, options, verbose)
},
Args: cobra.NoArgs,
}
@ -154,11 +158,12 @@ func NewCommand(v *viper.Viper) *cobra.Command {
Desc: "path to source 1.x db directory containing meta, data and wal sub-folders",
},
{
DestP: &options.verbose,
DestP: &verbose,
Flag: "verbose",
Default: true,
Desc: "verbose output",
Default: false,
Desc: "DEPRECATED: use --log-level=debug instead",
Short: 'v',
Hidden: true,
},
{
DestP: &options.target.boltPath,
@ -309,7 +314,26 @@ func (i *influxDBv2) close() error {
var fluxInitialized bool
func runUpgradeE(*cobra.Command, []string) error {
func runUpgradeE(cmd *cobra.Command, options *options, verbose bool) error {
ctx := context.Background()
config := zap.NewProductionConfig()
config.Level = zap.NewAtomicLevelAt(options.logLevel)
if verbose {
config.Level.SetLevel(zap.DebugLevel)
}
config.OutputPaths = append(config.OutputPaths, options.logPath)
config.ErrorOutputPaths = append(config.ErrorOutputPaths, options.logPath)
log, err := config.Build()
if err != nil {
return err
}
if verbose {
log.Warn("--verbose is deprecated, use --log-level=debug instead")
}
// This command is executed multiple times by test code. Initialization can happen only once.
if !fluxInitialized {
fluxinit.FluxInit()
@ -333,16 +357,6 @@ func runUpgradeE(*cobra.Command, []string) error {
}
}
ctx := context.Background()
config := zap.NewProductionConfig()
config.Level = zap.NewAtomicLevelAt(options.logLevel)
config.OutputPaths = append(config.OutputPaths, options.logPath)
config.ErrorOutputPaths = append(config.ErrorOutputPaths, options.logPath)
log, err := config.Build()
if err != nil {
return err
}
var v1Config *configV1
var genericV1ops *map[string]interface{}
@ -413,7 +427,12 @@ func runUpgradeE(*cobra.Command, []string) error {
return errors.New("InfluxDB has been already set up")
}
req, err := onboardingRequest()
ui := &input.UI{
Writer: cmd.OutOrStdout(),
Reader: cmd.InOrStdin(),
}
req, err := onboardingRequest(ui, options)
if err != nil {
return err
}
@ -431,7 +450,7 @@ func runUpgradeE(*cobra.Command, []string) error {
return err
}
db2BucketIds, err := upgradeDatabases(ctx, v1, v2, &options.source, &options.target, or.Org.ID, log)
db2BucketIds, err := upgradeDatabases(ctx, ui, v1, v2, options, or.Org.ID, log)
if err != nil {
//remove all files
log.Info("Database upgrade error, removing data")