influxdb/cmd/influxd/upgrade/database.go


package upgrade

import (
	"context"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"strings"

	"github.com/dustin/go-humanize"
	"github.com/influxdata/influxdb/v2"
	"github.com/influxdata/influxdb/v2/cmd/internal"
	"github.com/influxdata/influxdb/v2/pkg/fs"
	"github.com/influxdata/influxdb/v2/v1/services/meta"
	"github.com/tcnksm/go-input"
	"go.uber.org/zap"
)
// upgradeDatabases creates databases, buckets, retention policies and shard info
// according to the 1.x meta, and copies the underlying data files.
func upgradeDatabases(ctx context.Context, ui *input.UI, v1 *influxDBv1, v2 *influxDBv2, opts *options, orgID influxdb.ID, log *zap.Logger) (map[string][]influxdb.ID, error) {
	v1opts := opts.source
	v2opts := opts.target
	db2BucketIds := make(map[string][]influxdb.ID)

	targetDataPath := filepath.Join(v2opts.enginePath, "data")
	targetWalPath := filepath.Join(v2opts.enginePath, "wal")
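	// dirFilterFunc tells CopyDir which directories to skip while copying shard
	// data: the v1 series file (_series), index directories, and anything else
	// starting with an underscore. The v2 engine rebuilds these indexes itself,
	// so only the raw shard data needs to be carried over.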
	dirFilterFunc := func(path string) bool {
		base := filepath.Base(path)
		if base == "_series" ||
			(len(base) > 0 && base[0] == '_') || // skip internal databases
			base == "index" {
			return true
		}
		return false
	}
	if len(v1.meta.Databases()) == 0 {
		log.Info("No database found in the 1.x meta")
		return db2BucketIds, nil
	}

	if err := checkDiskSpace(ui, opts, log); err != nil {
		return nil, err
	}

	cqFile, err := os.OpenFile(v2opts.cqPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return nil, fmt.Errorf("error opening file for CQ export %s: %w", v2opts.cqPath, err)
	}
	defer cqFile.Close()
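	// Continuous queries have no direct v2 equivalent (v2 uses tasks), so they
	// are exported to a text file for the user to convert manually after the
	// upgrade.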
log.Info("Upgrading databases")
// read each database / retention policy from v1.meta and create bucket db-name/rp-name
// create database in v2.meta
// copy shard info from v1.meta
// export any continuous queries
2020-10-01 15:36:23 +00:00
for _, db := range v1.meta.Databases() {
if db.Name == "_internal" {
log.Debug("Skipping _internal ")
2020-10-01 15:36:23 +00:00
continue
}
log.Debug("Upgrading database", zap.String("database", db.Name))
2020-10-01 15:36:23 +00:00
// db to buckets IDs mapping
db2BucketIds[db.Name] = make([]influxdb.ID, 0, len(db.RetentionPolicies))
2020-10-01 15:36:23 +00:00
		for _, rp := range db.RetentionPolicies {
			sourcePath := filepath.Join(v1opts.dataDir, db.Name, rp.Name)
			bucket := &influxdb.Bucket{
				OrgID:               orgID,
				Type:                influxdb.BucketTypeUser,
				Name:                db.Name + "/" + rp.Name,
				Description:         fmt.Sprintf("Upgraded from v1 database %s with retention policy %s", db.Name, rp.Name),
				RetentionPolicyName: rp.Name,
				RetentionPeriod:     rp.Duration,
			}
			log.Debug("Creating bucket", zap.String("Bucket", bucket.Name))
			err = v2.bucketSvc.CreateBucket(ctx, bucket)
			if err != nil {
				return nil, fmt.Errorf("error creating bucket %s: %w", bucket.Name, err)
			}
			db2BucketIds[db.Name] = append(db2BucketIds[db.Name], bucket.ID)
			log.Debug("Creating database with retention policy", zap.String("database", bucket.ID.String()))
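			// Each bucket is backed by its own database in the v2 meta store,
			// named after the bucket ID, with a single retention policy that
			// carries the v1 spec under the default retention policy name.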
			spec := rp.ToSpec()
			spec.Name = meta.DefaultRetentionPolicyName
			dbv2, err := v2.meta.CreateDatabaseWithRetentionPolicy(bucket.ID.String(), spec)
			if err != nil {
				return nil, fmt.Errorf("error creating database %s: %w", bucket.ID.String(), err)
			}
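			// The DBRP mapping lets the 1.x compatibility API resolve InfluxQL
			// queries addressed to db.Name/rp.Name to the new bucket.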
mapping := &influxdb.DBRPMappingV2{
Database: db.Name,
RetentionPolicy: rp.Name,
Default: db.DefaultRetentionPolicy == rp.Name,
OrganizationID: orgID,
BucketID: bucket.ID,
}
log.Debug(
"Creating mapping",
zap.String("database", mapping.Database),
zap.String("retention policy", mapping.RetentionPolicy),
zap.String("orgID", mapping.OrganizationID.String()),
zap.String("bucketID", mapping.BucketID.String()),
)
2020-10-01 15:36:23 +00:00
err = v2.dbrpSvc.Create(ctx, mapping)
if err != nil {
return nil, fmt.Errorf("error creating mapping %s/%s -> Org %s, bucket %s: %w", mapping.Database, mapping.RetentionPolicy, mapping.OrganizationID.String(), mapping.BucketID.String(), err)
}
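			// Recreate each v1 shard group and its shards in the v2 meta so the
			// engine recognizes the shard directories copied below.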
			shardsNum := 0
			for _, sg := range rp.ShardGroups {
				log.Debug(
					"Creating shard group",
					zap.String("database", dbv2.Name),
					zap.String("retention policy", dbv2.DefaultRetentionPolicy),
					zap.Time("time", sg.StartTime),
				)
				shardsNum += len(sg.Shards)
				_, err := v2.meta.CreateShardGroupWithShards(dbv2.Name, dbv2.DefaultRetentionPolicy, sg.StartTime, sg.Shards)
				if err != nil {
					return nil, fmt.Errorf("error creating shard group for database %s: %w", bucket.ID.String(), err)
				}
			}
			// an empty retention policy has no data to copy
			if shardsNum > 0 {
				targetPath := filepath.Join(targetDataPath, dbv2.Name, spec.Name)
				log.Debug(
					"Copying data",
					zap.String("source", sourcePath),
					zap.String("target", targetPath),
				)
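				// Illustrative mapping (hypothetical names): v1 shard data under
				//   <v1-data-dir>/telegraf/autogen
				// lands in
				//   <v2-engine-path>/data/<bucket-id>/autogen
				// since dbv2.Name is the bucket ID and spec.Name is the default
				// retention policy name.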
				err = CopyDir(sourcePath,
					targetPath,
					nil,
					dirFilterFunc,
					nil)
				if err != nil {
					return nil, fmt.Errorf("error copying v1 data from %s to %s: %w", sourcePath, targetPath, err)
				}

				sourcePath = filepath.Join(v1opts.walDir, db.Name, rp.Name)
				targetPath = filepath.Join(targetWalPath, dbv2.Name, spec.Name)
				log.Debug(
					"Copying wal",
					zap.String("source", sourcePath),
					zap.String("target", targetPath),
				)
				err = CopyDir(sourcePath,
					targetPath,
					nil,
					dirFilterFunc,
					nil)
				if err != nil {
					return nil, fmt.Errorf("error copying v1 wal from %s to %s: %w", sourcePath, targetPath, err)
				}
			} else {
				log.Warn("Empty retention policy")
			}
		}
		// Output CQs in the same format as SHOW CONTINUOUS QUERIES.
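		// For illustration (hypothetical names), a database "telegraf" with a
		// single CQ "cq_mean" produces:
		//
		//   name: telegraf
		//   name    query
		//   ----    -----
		//   cq_mean CREATE CONTINUOUS QUERY cq_mean ON telegraf BEGIN ... END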
		_, err := cqFile.WriteString(fmt.Sprintf("name: %s\n", db.Name))
		if err != nil {
			return nil, err
		}
		maxNameLen := 4 // 4 == len("name"), the column header
		for _, cq := range db.ContinuousQueries {
			if len(cq.Name) > maxNameLen {
				maxNameLen = len(cq.Name)
			}
		}
		headerPadding := maxNameLen - 4 + 1
		_, err = cqFile.WriteString(fmt.Sprintf("name%[1]squery\n----%[1]s-----\n", strings.Repeat(" ", headerPadding)))
		if err != nil {
			return nil, err
		}
		for _, cq := range db.ContinuousQueries {
			log.Debug("Exporting CQ", zap.String("db", db.Name), zap.String("cq_name", cq.Name))
			padding := maxNameLen - len(cq.Name) + 1
			_, err := cqFile.WriteString(fmt.Sprintf("%s%s%s\n", cq.Name, strings.Repeat(" ", padding), cq.Query))
			if err != nil {
				return nil, fmt.Errorf("error exporting continuous query %s from DB %s: %w", cq.Name, db.Name, err)
			}
		}
		_, err = cqFile.WriteString("\n")
		if err != nil {
			return nil, err
		}
	}
log.Info("Database upgrade complete", zap.Int("upgraded_count", len(db2BucketIds)))
2020-10-01 15:36:23 +00:00
return db2BucketIds, nil
}
// checkDiskSpace ensures there is enough room at the target path to store
// a full copy of all V1 data.
func checkDiskSpace(ui *input.UI, opts *options, log *zap.Logger) error {
	log.Info("Checking available disk space")

	size, err := DirSize(opts.source.dataDir)
	if err != nil {
		return fmt.Errorf("error getting size of %s: %w", opts.source.dataDir, err)
	}

	walSize, err := DirSize(opts.source.walDir)
	if err != nil {
		return fmt.Errorf("error getting size of %s: %w", opts.source.walDir, err)
	}
	size += walSize
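	// Everything under the v1 data and WAL directories is copied, so their
	// combined size is the minimum free space required on the target disk.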
	v2dir := filepath.Dir(opts.target.boltPath)
	diskInfo, err := fs.DiskUsage(v2dir)
	if err != nil {
		return fmt.Errorf("error getting info of disk %s: %w", v2dir, err)
	}

	freeBytes := humanize.Bytes(diskInfo.Free)
	requiredBytes := humanize.Bytes(size)
	log.Info("Computed disk space", zap.String("free", freeBytes), zap.String("required", requiredBytes))

	if size > diskInfo.Free {
		return fmt.Errorf("not enough space on target disk of %s: need %d, available %d", v2dir, size, diskInfo.Free)
	}
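	// Unless force mode was requested, ask the user to confirm before any data
	// is copied.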
	if !opts.force {
		if confirmed := internal.GetConfirm(ui, func() string {
			return fmt.Sprintf(`Proceeding will copy all V1 data to %q
Space available: %s
Space required: %s
`, v2dir, freeBytes, requiredBytes)
		}); !confirmed {
			return errors.New("upgrade was canceled")
		}
	}

	return nil
}