mirror of https://github.com/milvus-io/milvus.git
fix garbage collector err handling (#18277)
Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>pull/18292/head
parent
eb2de5aa59
commit
6d82ef8c20
|
@ -25,6 +25,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
"github.com/minio/minio-go/v7"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -112,11 +113,11 @@ func (gc *garbageCollector) close() {
|
|||
// scan load meta file info and compares OSS keys
|
||||
// if missing found, performs gc cleanup
|
||||
func (gc *garbageCollector) scan() {
|
||||
var v, m int
|
||||
valid := gc.meta.ListSegmentFiles()
|
||||
vm := make(map[string]struct{})
|
||||
for _, k := range valid {
|
||||
vm[k.GetLogPath()] = struct{}{}
|
||||
var total, valid, missing int
|
||||
segmentFiles := gc.meta.ListSegmentFiles()
|
||||
filesMap := make(map[string]struct{})
|
||||
for _, k := range segmentFiles {
|
||||
filesMap[k.GetLogPath()] = struct{}{}
|
||||
}
|
||||
|
||||
// walk only data cluster related prefixes
|
||||
|
@ -131,30 +132,36 @@ func (gc *garbageCollector) scan() {
|
|||
Prefix: prefix,
|
||||
Recursive: true,
|
||||
}) {
|
||||
_, has := vm[info.Key]
|
||||
total++
|
||||
_, has := filesMap[info.Key]
|
||||
if has {
|
||||
v++
|
||||
valid++
|
||||
continue
|
||||
}
|
||||
|
||||
// binlog path should consist of "/files/insertLog/collID/partID/segID/fieldID/fileName"
|
||||
segmentID, err := parseSegmentIDByBinlog(info.Key)
|
||||
if err == nil {
|
||||
if gc.segRefer.HasSegmentLock(segmentID) {
|
||||
v++
|
||||
continue
|
||||
}
|
||||
segmentID, err := storage.ParseSegmentIDByBinlog(info.Key)
|
||||
if err != nil {
|
||||
log.Error("parse segment id error", zap.String("infoKey", info.Key), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
m++
|
||||
if gc.segRefer.HasSegmentLock(segmentID) {
|
||||
valid++
|
||||
continue
|
||||
}
|
||||
missing++
|
||||
// not found in meta, check last modified time exceeds tolerance duration
|
||||
if time.Since(info.LastModified) > gc.option.missingTolerance {
|
||||
// ignore error since it could be cleaned up next time
|
||||
removedKeys = append(removedKeys, info.Key)
|
||||
_ = gc.option.cli.RemoveObject(context.TODO(), gc.option.bucketName, info.Key, minio.RemoveObjectOptions{})
|
||||
err = gc.option.cli.RemoveObject(context.TODO(), gc.option.bucketName, info.Key, minio.RemoveObjectOptions{})
|
||||
if err != nil {
|
||||
log.Error("failed to remove object", zap.String("infoKey", info.Key), zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
log.Info("scan result", zap.Int("valid", v), zap.Int("missing", m), zap.Strings("removed keys", removedKeys))
|
||||
log.Info("scan file to do garbage collection", zap.Int("total", total),
|
||||
zap.Int("valid", valid), zap.Int("missing", missing), zap.Strings("removed keys", removedKeys))
|
||||
}
|
||||
|
||||
func (gc *garbageCollector) clearEtcd() {
|
||||
|
|
|
@ -284,9 +284,9 @@ func initUtOSSEnv(bucket, root string, n int) (cli *minio.Client, inserts []stri
|
|||
content := []byte("test")
|
||||
for i := 0; i < n; i++ {
|
||||
reader := bytes.NewReader(content)
|
||||
token := path.Join(funcutil.RandomString(8), funcutil.RandomString(8), strconv.Itoa(i), funcutil.RandomString(8), funcutil.RandomString(8))
|
||||
token := path.Join(funcutil.RandomString(8), strconv.Itoa(i), strconv.Itoa(i), funcutil.RandomString(8), funcutil.RandomString(8))
|
||||
if i == 1 {
|
||||
token = path.Join(funcutil.RandomString(8), funcutil.RandomString(8), strconv.Itoa(i), funcutil.RandomString(8))
|
||||
token = path.Join(funcutil.RandomString(8), strconv.Itoa(i), strconv.Itoa(i), funcutil.RandomString(8))
|
||||
}
|
||||
// insert
|
||||
filePath := path.Join(root, insertLogPrefix, token)
|
||||
|
|
|
@ -19,8 +19,6 @@ package datacoord
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
|
@ -87,9 +85,3 @@ func getCompactTime(ctx context.Context, allocator allocator) (*compactTime, err
|
|||
// no expiration time
|
||||
return &compactTime{ttRetentionLogic, 0}, nil
|
||||
}
|
||||
|
||||
func parseSegmentIDByBinlog(path string) (UniqueID, error) {
|
||||
// binlog path should consist of "files/insertLog/collID/partID/segID/fieldID/fileName"
|
||||
keyStr := strings.Split(path, "/")
|
||||
return strconv.ParseInt(keyStr[len(keyStr)-3], 10, 64)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func ParseSegmentIDByBinlog(path string) (UniqueID, error) {
|
||||
// binlog path should consist of "files/insertLog/collID/partID/segID/fieldID/fileName"
|
||||
keyStr := strings.Split(path, "/")
|
||||
return strconv.ParseInt(keyStr[len(keyStr)-3], 10, 64)
|
||||
}
|
Loading…
Reference in New Issue