Bugfix for shard collisions resulting from attempted _internal import (#9569)
parent
6820dd8d13
commit
b213b31cd4
|
@ -402,6 +402,14 @@ func (cmd *Command) uploadShardsPortable() error {
|
|||
if cmd.sourceDatabase == "" || cmd.sourceDatabase == file.Database {
|
||||
if cmd.backupRetention == "" || cmd.backupRetention == file.Policy {
|
||||
if cmd.shard == 0 || cmd.shard == file.ShardID {
|
||||
oldID := file.ShardID
|
||||
// if newID not found then this shard's metadata was NOT imported
|
||||
// and should be skipped
|
||||
newID, ok := cmd.shardIDMap[oldID]
|
||||
if !ok {
|
||||
cmd.StdoutLogger.Printf("Meta info not found for shard %d on database %s. Skipping shard file %s", oldID, file.Database, file.FileName)
|
||||
continue
|
||||
}
|
||||
cmd.StdoutLogger.Printf("Restoring shard %d live from backup %s\n", file.ShardID, file.FileName)
|
||||
f, err := os.Open(filepath.Join(cmd.backupFilesPath, file.FileName))
|
||||
if err != nil {
|
||||
|
@ -419,7 +427,7 @@ func (cmd *Command) uploadShardsPortable() error {
|
|||
targetDB = file.Database
|
||||
}
|
||||
|
||||
if err := cmd.client.UploadShard(file.ShardID, cmd.shardIDMap[file.ShardID], targetDB, cmd.restoreRetention, tr); err != nil {
|
||||
if err := cmd.client.UploadShard(oldID, newID, targetDB, cmd.restoreRetention, tr); err != nil {
|
||||
f.Close()
|
||||
return err
|
||||
}
|
||||
|
@ -454,12 +462,20 @@ func (cmd *Command) uploadShardsLegacy() error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// if newID not found then this shard's metadata was NOT imported
|
||||
// and should be skipped
|
||||
newID, ok := cmd.shardIDMap[shardID]
|
||||
if !ok {
|
||||
cmd.StdoutLogger.Printf("Meta info not found for shard %d. Skipping shard file %s", shardID, fn)
|
||||
continue
|
||||
}
|
||||
f, err := os.Open(fn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tr := tar.NewReader(f)
|
||||
if err := cmd.client.UploadShard(shardID, cmd.shardIDMap[shardID], cmd.destinationDatabase, cmd.restoreRetention, tr); err != nil {
|
||||
if err := cmd.client.UploadShard(shardID, newID, cmd.destinationDatabase, cmd.restoreRetention, tr); err != nil {
|
||||
f.Close()
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -12,12 +12,16 @@ import (
|
|||
|
||||
"github.com/influxdata/influxdb/cmd/influxd/backup"
|
||||
"github.com/influxdata/influxdb/cmd/influxd/restore"
|
||||
"github.com/influxdata/influxdb/toml"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func TestServer_BackupAndRestore(t *testing.T) {
|
||||
config := NewConfig()
|
||||
config.Data.Engine = "tsm1"
|
||||
config.BindAddress = freePort()
|
||||
config.Monitor.StoreEnabled = true
|
||||
config.Monitor.StoreInterval = toml.Duration(time.Second)
|
||||
|
||||
fullBackupDir, _ := ioutil.TempDir("", "backup")
|
||||
defer os.RemoveAll(fullBackupDir)
|
||||
|
@ -81,6 +85,15 @@ func TestServer_BackupAndRestore(t *testing.T) {
|
|||
t.Fatalf("query results wrong:\n\texp: %s\n\tgot: %s", expected, res)
|
||||
}
|
||||
|
||||
for !strings.Contains(res, "_internal") {
|
||||
res, err = s.Query(`SHOW DATABASES`)
|
||||
if err != nil {
|
||||
t.Fatalf("error querying: %s", err.Error())
|
||||
}
|
||||
// technically not necessary, but no reason to crush the CPU for polling
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
// now backup
|
||||
cmd := backup.NewCommand()
|
||||
_, port, err := net.SplitHostPort(config.BindAddress)
|
||||
|
|
Loading…
Reference in New Issue