170 lines
4.6 KiB
Go
170 lines
4.6 KiB
Go
package tar
|
|
|
|
import (
|
|
"archive/tar"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/influxdata/influxdb/v2/pkg/file"
|
|
)
|
|
|
|
// Stream is a convenience function for creating a tar of a shard dir. It walks over the directory and subdirs,
|
|
// possibly writing each file to a tar writer stream. By default StreamFile is used, which will result in all files
|
|
// being written. A custom writeFunc can be passed so that each file may be written, modified+written, or skipped
|
|
// depending on the custom logic.
|
|
func Stream(w io.Writer, dir, relativePath string, writeFunc func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error) error {
|
|
tw := tar.NewWriter(w)
|
|
defer tw.Close()
|
|
|
|
if writeFunc == nil {
|
|
writeFunc = StreamFile
|
|
}
|
|
|
|
return filepath.WalkDir(dir, func(path string, entry os.DirEntry, err error) error {
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Skip adding an entry for the root dir
|
|
if dir == path && entry.IsDir() {
|
|
return nil
|
|
}
|
|
|
|
// Figure out the the full relative path including any sub-dirs
|
|
subDir, _ := filepath.Split(path)
|
|
subDir, err = filepath.Rel(dir, subDir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
f, err := entry.Info()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return writeFunc(f, filepath.Join(relativePath, subDir), path, tw)
|
|
})
|
|
}
|
|
|
|
// Generates a filtering function for Stream that checks an incoming file, and only writes the file to the stream if
|
|
// its mod time is later than since. Example: to tar only files newer than a certain datetime, use
|
|
// tar.Stream(w, dir, relativePath, SinceFilterTarFile(datetime))
|
|
func SinceFilterTarFile(since time.Time) func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error {
|
|
return func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error {
|
|
if f.ModTime().After(since) {
|
|
return StreamFile(f, shardRelativePath, fullPath, tw)
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// stream a single file to tw, extending the header name using the shardRelativePath
|
|
func StreamFile(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error {
|
|
return StreamRenameFile(f, f.Name(), shardRelativePath, fullPath, tw)
|
|
}
|
|
|
|
/// Stream a single file to tw, using tarHeaderFileName instead of the actual filename
|
|
// e.g., when we want to write a *.tmp file using the original file's non-tmp name.
|
|
func StreamRenameFile(f os.FileInfo, tarHeaderFileName, relativePath, fullPath string, tw *tar.Writer) error {
|
|
h, err := tar.FileInfoHeader(f, f.Name())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
h.Name = filepath.ToSlash(filepath.Join(relativePath, tarHeaderFileName))
|
|
|
|
if err := tw.WriteHeader(h); err != nil {
|
|
return err
|
|
}
|
|
|
|
if !f.Mode().IsRegular() {
|
|
return nil
|
|
}
|
|
|
|
fr, err := os.Open(fullPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer fr.Close()
|
|
|
|
_, err = io.CopyN(tw, fr, h.Size)
|
|
|
|
return err
|
|
}
|
|
|
|
// Restore reads a tar archive from r and extracts all of its files into dir,
|
|
// using only the base name of each file.
|
|
func Restore(r io.Reader, dir string) error {
|
|
tr := tar.NewReader(r)
|
|
for {
|
|
if err := extractFile(tr, dir); err == io.EOF {
|
|
break
|
|
} else if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return file.SyncDir(dir)
|
|
}
|
|
|
|
// extractFile copies the next file from tr into dir, using the file's base name.
|
|
func extractFile(tr *tar.Reader, dir string) error {
|
|
// Read next archive file.
|
|
hdr, err := tr.Next()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// The hdr.Name is the relative path of the file from the root data dir.
|
|
// e.g (db/rp/1/xxxxx.tsm or db/rp/1/index/xxxxxx.tsi)
|
|
sections := strings.Split(filepath.FromSlash(hdr.Name), string(filepath.Separator))
|
|
if len(sections) < 3 {
|
|
return fmt.Errorf("invalid archive path: %s", hdr.Name)
|
|
}
|
|
|
|
relativePath := filepath.Join(sections[3:]...)
|
|
|
|
subDir, _ := filepath.Split(relativePath)
|
|
// If this is a directory entry (usually just `index` for tsi), create it an move on.
|
|
if hdr.Typeflag == tar.TypeDir {
|
|
return os.MkdirAll(filepath.Join(dir, subDir), os.FileMode(hdr.Mode).Perm())
|
|
}
|
|
|
|
// Make sure the dir we need to write into exists. It should, but just double check in
|
|
// case we get a slightly invalid tarball.
|
|
if subDir != "" {
|
|
if err := os.MkdirAll(filepath.Join(dir, subDir), 0755); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
destPath := filepath.Join(dir, relativePath)
|
|
tmp := destPath + ".tmp"
|
|
|
|
// Create new file on disk.
|
|
f, err := os.OpenFile(tmp, os.O_CREATE|os.O_RDWR, os.FileMode(hdr.Mode).Perm())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer f.Close()
|
|
|
|
// Copy from archive to the file.
|
|
if _, err := io.CopyN(f, tr, hdr.Size); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Sync to disk & close.
|
|
if err := f.Sync(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := f.Close(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return file.RenameFile(tmp, destPath)
|
|
}
|