commit
9423300292
|
@ -301,24 +301,34 @@ func (cmd *Command) download(req *snapshotter.Request, path string) error {
|
|||
}
|
||||
defer f.Close()
|
||||
|
||||
// Connect to snapshotter service.
|
||||
conn, err := tcp.Dial("tcp", cmd.host, snapshotter.MuxHeader)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer conn.Close()
|
||||
for i := 0; i < 10; i++ {
|
||||
if err = func() error {
|
||||
// Connect to snapshotter service.
|
||||
conn, err := tcp.Dial("tcp", cmd.host, snapshotter.MuxHeader)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
// Write the request
|
||||
if err := json.NewEncoder(conn).Encode(req); err != nil {
|
||||
return fmt.Errorf("encode snapshot request: %s", err)
|
||||
// Write the request
|
||||
if err := json.NewEncoder(conn).Encode(req); err != nil {
|
||||
return fmt.Errorf("encode snapshot request: %s", err)
|
||||
}
|
||||
|
||||
// Read snapshot from the connection
|
||||
if n, err := io.Copy(f, conn); err != nil || n == 0 {
|
||||
return fmt.Errorf("copy backup to file: err=%v, n=%d", err, n)
|
||||
}
|
||||
return nil
|
||||
}(); err == nil {
|
||||
break
|
||||
} else if err != nil {
|
||||
cmd.Logger.Printf("Download shard %v failed %s. Retrying (%d)...\n", req.ShardID, err, i)
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
// Read snapshot from the connection
|
||||
if _, err := io.Copy(f, conn); err != nil {
|
||||
return fmt.Errorf("copy backup to file: %s", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
// requestInfo will request the database or retention policy information from the host
|
||||
|
|
|
@ -159,8 +159,8 @@ func (cmd *Command) unpackMeta() error {
|
|||
i += int(length)
|
||||
|
||||
// Size of the node.json bytes
|
||||
i += 8
|
||||
length = int(binary.BigEndian.Uint64(b[i : i+8]))
|
||||
i += 8
|
||||
nodeBytes := b[i : i+length]
|
||||
|
||||
// Unpack into metadata.
|
||||
|
|
|
@ -13,7 +13,6 @@ import (
|
|||
)
|
||||
|
||||
func TestServer_BackupAndRestore(t *testing.T) {
|
||||
t.Skip("currently fails intermittently. See issue https://github.com/influxdata/influxdb/issues/6590")
|
||||
config := NewConfig()
|
||||
config.Data.Engine = "tsm1"
|
||||
config.Data.Dir, _ = ioutil.TempDir("", "data_backup")
|
||||
|
@ -25,7 +24,7 @@ func TestServer_BackupAndRestore(t *testing.T) {
|
|||
|
||||
db := "mydb"
|
||||
rp := "forever"
|
||||
expected := `{"results":[{"series":[{"name":"myseries","columns":["time","host","value"],"values":[["1970-01-01T00:00:00.001Z","A",23]]}]}]}`
|
||||
expected := `{"results":[{"statement_id":0,"series":[{"name":"myseries","columns":["time","host","value"],"values":[["1970-01-01T00:00:00.001Z","A",23]]}]}]}`
|
||||
|
||||
// set the cache snapshot size low so that a single point will cause TSM file creation
|
||||
config.Data.CacheSnapshotMemorySize = 1
|
||||
|
|
|
@ -489,6 +489,9 @@ func (e *Engine) Backup(w io.Writer, basePath string, since time.Time) error {
|
|||
return err
|
||||
}
|
||||
|
||||
tw := tar.NewWriter(w)
|
||||
defer tw.Close()
|
||||
|
||||
// Remove the temporary snapshot dir
|
||||
defer os.RemoveAll(path)
|
||||
|
||||
|
@ -515,9 +518,6 @@ func (e *Engine) Backup(w io.Writer, basePath string, since time.Time) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
tw := tar.NewWriter(w)
|
||||
defer tw.Close()
|
||||
|
||||
for _, f := range files {
|
||||
if err := e.writeFileToBackup(f, basePath, filepath.Join(path, f.Name()), tw); err != nil {
|
||||
return err
|
||||
|
|
|
@ -327,7 +327,7 @@ func (t *TSMReader) Close() error {
|
|||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
|
||||
if t.refs > 0 {
|
||||
if t.InUse() {
|
||||
return ErrFileInUse
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue