mirror of https://github.com/k3s-io/k3s.git
Simplify snapshot compress/decompress logic
Compression creates a zipfile with the same path as the snapshot file containing only the snapshot. Decompression can be a bit simpler by also extracting to the same path, and erroring if there are unexpected contents. In retrospect we probably should have just gzip'd the snapshot file, but I think there was some intention to observe the same behavior as RKE1, which used zip files. Signed-off-by: Brad Davidson <brad.davidson@rancher.com> (pull/13863/head)
parent
630f7d5e59
commit
4cc440f2c9
|
|
@ -102,91 +102,102 @@ func snapshotDir(config *config.Control, create bool) (string, error) {
|
|||
|
||||
// compressSnapshot compresses the given snapshot and provides the
|
||||
// caller with the path to the file.
|
||||
func (e *ETCD) compressSnapshot(snapshotDir, snapshotName, snapshotPath string, now time.Time) (string, error) {
|
||||
logrus.Info("Compressing etcd snapshot file: " + snapshotName)
|
||||
func (e *ETCD) compressSnapshot(snapshotDir, snapshotFilename string, mtime time.Time) (zipPath string, err error) {
|
||||
logrus.Info("Compressing etcd snapshot file: " + snapshotFilename)
|
||||
snapshotPath := filepath.Join(snapshotDir, snapshotFilename)
|
||||
zipPath = snapshotPath + snapshot.CompressedExtension
|
||||
|
||||
zippedSnapshotName := snapshotName + snapshot.CompressedExtension
|
||||
zipPath := filepath.Join(snapshotDir, zippedSnapshotName)
|
||||
defer func() {
|
||||
if err != nil {
|
||||
os.Remove(zipPath)
|
||||
}
|
||||
}()
|
||||
|
||||
zf, err := os.Create(zipPath)
|
||||
sf, err := os.Open(snapshotPath)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer zf.Close()
|
||||
defer sf.Close()
|
||||
|
||||
zipWriter := zip.NewWriter(zf)
|
||||
defer zipWriter.Close()
|
||||
|
||||
uncompressedPath := filepath.Join(snapshotDir, snapshotName)
|
||||
fileToZip, err := os.Open(uncompressedPath)
|
||||
fi, err := sf.Stat()
|
||||
if err != nil {
|
||||
os.Remove(zipPath)
|
||||
return "", err
|
||||
}
|
||||
defer fileToZip.Close()
|
||||
|
||||
info, err := fileToZip.Stat()
|
||||
if err != nil {
|
||||
os.Remove(zipPath)
|
||||
return "", err
|
||||
}
|
||||
|
||||
header, err := zip.FileInfoHeader(info)
|
||||
of, err := os.Create(zipPath)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer of.Close()
|
||||
|
||||
zw := zip.NewWriter(of)
|
||||
defer zw.Close()
|
||||
|
||||
zfi, err := zip.FileInfoHeader(fi)
|
||||
if err != nil {
|
||||
os.Remove(zipPath)
|
||||
return "", err
|
||||
}
|
||||
|
||||
header.Name = snapshotName
|
||||
header.Method = zip.Deflate
|
||||
header.Modified = now
|
||||
zfi.Name = snapshotFilename
|
||||
zfi.Method = zip.Deflate
|
||||
zfi.Modified = mtime
|
||||
|
||||
writer, err := zipWriter.CreateHeader(header)
|
||||
hw, err := zw.CreateHeader(zfi)
|
||||
if err != nil {
|
||||
os.Remove(zipPath)
|
||||
return "", err
|
||||
}
|
||||
_, err = io.Copy(writer, fileToZip)
|
||||
|
||||
_, err = io.Copy(hw, sf)
|
||||
return zipPath, err
|
||||
}
|
||||
|
||||
// decompressSnapshot decompresses the given snapshot and provides the caller
|
||||
// with the full path to the uncompressed snapshot.
|
||||
func (e *ETCD) decompressSnapshot(snapshotDir, snapshotFile string) (string, error) {
|
||||
logrus.Info("Decompressing etcd snapshot file: " + snapshotFile)
|
||||
func (e *ETCD) decompressSnapshot(snapshotDir, snapshotFilename string) (unzipPath string, err error) {
|
||||
logrus.Info("Decompressing etcd snapshot file: " + snapshotFilename)
|
||||
snapshotPath := filepath.Join(snapshotDir, snapshotFilename)
|
||||
unzipPath = strings.TrimSuffix(snapshotPath, snapshot.CompressedExtension)
|
||||
|
||||
r, err := zip.OpenReader(snapshotFile)
|
||||
defer func() {
|
||||
if err != nil {
|
||||
os.Remove(unzipPath)
|
||||
}
|
||||
}()
|
||||
|
||||
sf, err := os.Open(snapshotPath)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer r.Close()
|
||||
defer sf.Close()
|
||||
|
||||
var decompressed *os.File
|
||||
for _, sf := range r.File {
|
||||
decompressed, err = os.OpenFile(strings.Replace(sf.Name, snapshot.CompressedExtension, "", -1), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, sf.Mode())
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
//revive:disable-next-line:defer
|
||||
defer decompressed.Close()
|
||||
|
||||
ss, err := sf.Open()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
//revive:disable-next-line:defer
|
||||
defer ss.Close()
|
||||
|
||||
if _, err := io.Copy(decompressed, ss); err != nil {
|
||||
os.Remove(decompressed.Name())
|
||||
return "", err
|
||||
}
|
||||
fi, err := sf.Stat()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return decompressed.Name(), nil
|
||||
zf, err := zip.NewReader(sf, fi.Size())
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if len(zf.File) != 1 {
|
||||
return "", errors.New("unexpected compressed etcd snapshot contents")
|
||||
}
|
||||
|
||||
cf, err := zf.File[0].Open()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer cf.Close()
|
||||
|
||||
of, err := os.OpenFile(unzipPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer of.Close()
|
||||
|
||||
_, err = io.Copy(of, cf)
|
||||
return unzipPath, err
|
||||
}
|
||||
|
||||
// Snapshot attempts to save a new snapshot to the configured directory, and then clean up any old and failed
|
||||
|
|
@ -289,7 +300,7 @@ func (e *ETCD) snapshot(ctx context.Context) (_ *managed.SnapshotResult, rerr er
|
|||
// If the snapshot attempt was successful, sf will be nil as we did not set it to store the error message.
|
||||
if sf == nil {
|
||||
if e.config.EtcdSnapshotCompress {
|
||||
zipPath, err := e.compressSnapshot(snapshotDir, snapshotName, snapshotPath, now)
|
||||
zipPath, err := e.compressSnapshot(snapshotDir, snapshotName, now)
|
||||
|
||||
// ensure that the uncompressed snapshot is cleaned up even if compression fails
|
||||
if err := os.Remove(snapshotPath); err != nil && !os.IsNotExist(err) {
|
||||
|
|
|
|||
Loading…
Reference in New Issue