diff --git a/cmd/influxd/backup/backup.go b/cmd/influxd/backup/backup.go index 5ba5069190..448d0ac0c3 100644 --- a/cmd/influxd/backup/backup.go +++ b/cmd/influxd/backup/backup.go @@ -301,24 +301,34 @@ func (cmd *Command) download(req *snapshotter.Request, path string) error { } defer f.Close() - // Connect to snapshotter service. - conn, err := tcp.Dial("tcp", cmd.host, snapshotter.MuxHeader) - if err != nil { - return err - } - defer conn.Close() + for i := 0; i < 10; i++ { + if err = func() error { + // Connect to snapshotter service. + conn, err := tcp.Dial("tcp", cmd.host, snapshotter.MuxHeader) + if err != nil { + return err + } + defer conn.Close() - // Write the request - if err := json.NewEncoder(conn).Encode(req); err != nil { - return fmt.Errorf("encode snapshot request: %s", err) + // Write the request + if err := json.NewEncoder(conn).Encode(req); err != nil { + return fmt.Errorf("encode snapshot request: %s", err) + } + + // Read snapshot from the connection + if n, err := io.Copy(f, conn); err != nil || n == 0 { + return fmt.Errorf("copy backup to file: err=%v, n=%d", err, n) + } + return nil + }(); err == nil { + break + } else if err != nil { + cmd.Logger.Printf("Download shard %v failed %s. Retrying (%d)...\n", req.ShardID, err, i) + time.Sleep(time.Second) + } } - // Read snapshot from the connection - if _, err := io.Copy(f, conn); err != nil { - return fmt.Errorf("copy backup to file: %s", err) - } - - return nil + return err } // requestInfo will request the database or retention policy information from the host diff --git a/cmd/influxd/restore/restore.go b/cmd/influxd/restore/restore.go index 87f1f44079..253cb81e0a 100644 --- a/cmd/influxd/restore/restore.go +++ b/cmd/influxd/restore/restore.go @@ -159,8 +159,8 @@ func (cmd *Command) unpackMeta() error { i += int(length) // Size of the node.json bytes - i += 8 length = int(binary.BigEndian.Uint64(b[i : i+8])) + i += 8 nodeBytes := b[i : i+length] // Unpack into metadata. diff --git a/cmd/influxd/run/backup_restore_test.go b/cmd/influxd/run/backup_restore_test.go index c6b419d81b..ee9fda7656 100644 --- a/cmd/influxd/run/backup_restore_test.go +++ b/cmd/influxd/run/backup_restore_test.go @@ -13,7 +13,6 @@ import ( ) func TestServer_BackupAndRestore(t *testing.T) { - t.Skip("currently fails intermittently. See issue https://github.com/influxdata/influxdb/issues/6590") config := NewConfig() config.Data.Engine = "tsm1" config.Data.Dir, _ = ioutil.TempDir("", "data_backup") @@ -25,7 +24,7 @@ func TestServer_BackupAndRestore(t *testing.T) { db := "mydb" rp := "forever" - expected := `{"results":[{"series":[{"name":"myseries","columns":["time","host","value"],"values":[["1970-01-01T00:00:00.001Z","A",23]]}]}]}` + expected := `{"results":[{"statement_id":0,"series":[{"name":"myseries","columns":["time","host","value"],"values":[["1970-01-01T00:00:00.001Z","A",23]]}]}]}` // set the cache snapshot size low so that a single point will cause TSM file creation config.Data.CacheSnapshotMemorySize = 1 diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 37ba8ee2a3..4d09922b05 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -489,6 +489,9 @@ func (e *Engine) Backup(w io.Writer, basePath string, since time.Time) error { return err } + tw := tar.NewWriter(w) + defer tw.Close() + // Remove the temporary snapshot dir defer os.RemoveAll(path) @@ -515,9 +518,6 @@ func (e *Engine) Backup(w io.Writer, basePath string, since time.Time) error { return nil } - tw := tar.NewWriter(w) - defer tw.Close() - for _, f := range files { if err := e.writeFileToBackup(f, basePath, filepath.Join(path, f.Name()), tw); err != nil { return err diff --git a/tsdb/engine/tsm1/reader.go b/tsdb/engine/tsm1/reader.go index f8e5485894..fca6bb3ca5 100644 --- a/tsdb/engine/tsm1/reader.go +++ b/tsdb/engine/tsm1/reader.go @@ -327,7 +327,7 @@ func (t *TSMReader) Close() error { t.mu.Lock() defer t.mu.Unlock() - if t.refs > 0 { + if t.InUse() { return ErrFileInUse }