mirror of https://github.com/milvus-io/milvus.git
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/22543/head
parent
9129c11dec
commit
a58abe1665
|
@ -19,6 +19,7 @@ package datacoord
|
|||
import (
|
||||
"context"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -41,6 +42,8 @@ const (
|
|||
deltaLogPrefix = `delta_log`
|
||||
)
|
||||
|
||||
type collectionValidator func(int64) bool
|
||||
|
||||
// GcOption garbage collection options
|
||||
type GcOption struct {
|
||||
cli storage.ChunkManager // client
|
||||
|
@ -48,6 +51,7 @@ type GcOption struct {
|
|||
checkInterval time.Duration // each interval
|
||||
missingTolerance time.Duration // key missing in meta tolerance time
|
||||
dropTolerance time.Duration // dropped segment related key tolerance time
|
||||
collValidator collectionValidator // validates collection id
|
||||
}
|
||||
|
||||
// garbageCollector handles garbage files in object storage
|
||||
|
@ -109,6 +113,24 @@ func (gc *garbageCollector) work() {
|
|||
}
|
||||
}
|
||||
|
||||
func (gc *garbageCollector) isCollectionPrefixValid(p string, prefix string) bool {
|
||||
if gc.option.collValidator == nil {
|
||||
return true
|
||||
}
|
||||
|
||||
if !strings.HasPrefix(p, prefix) {
|
||||
return false
|
||||
}
|
||||
|
||||
p = strings.Trim(p[len(prefix):], "/")
|
||||
collectionID, err := strconv.ParseInt(p, 10, 64)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
return gc.option.collValidator(collectionID)
|
||||
}
|
||||
|
||||
func (gc *garbageCollector) close() {
|
||||
gc.stopOnce.Do(func() {
|
||||
close(gc.closeCh)
|
||||
|
@ -146,13 +168,31 @@ func (gc *garbageCollector) scan() {
|
|||
var removedKeys []string
|
||||
|
||||
for _, prefix := range prefixes {
|
||||
infoKeys, modTimes, err := gc.option.cli.ListWithPrefix(ctx, prefix, true)
|
||||
// list first level prefix, then perform collection id validation
|
||||
collectionPrefixes, _, err := gc.option.cli.ListWithPrefix(ctx, prefix+"/", false)
|
||||
if err != nil {
|
||||
log.Error("failed to list files with prefix",
|
||||
log.Warn("failed to list collection prefix",
|
||||
zap.String("prefix", prefix),
|
||||
zap.String("error", err.Error()),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
for _, collPrefix := range collectionPrefixes {
|
||||
if !gc.isCollectionPrefixValid(collPrefix, prefix) {
|
||||
log.Warn("garbage collector meet invalid collection prefix, ignore it",
|
||||
zap.String("collPrefix", collPrefix),
|
||||
zap.String("prefix", prefix),
|
||||
)
|
||||
continue
|
||||
}
|
||||
infoKeys, modTimes, err := gc.option.cli.ListWithPrefix(ctx, collPrefix, true)
|
||||
if err != nil {
|
||||
log.Error("failed to list files with collPrefix",
|
||||
zap.String("collPrefix", collPrefix),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
continue
|
||||
}
|
||||
|
||||
for i, infoKey := range infoKeys {
|
||||
total++
|
||||
_, has := filesMap[infoKey]
|
||||
|
@ -169,7 +209,6 @@ func (gc *garbageCollector) scan() {
|
|||
zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
if gc.segRefer.HasSegmentLock(segmentID) {
|
||||
valid++
|
||||
continue
|
||||
|
@ -195,6 +234,7 @@ func (gc *garbageCollector) scan() {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
log.Info("scan file to do garbage collection",
|
||||
zap.Int("total", total),
|
||||
zap.Int("valid", valid),
|
||||
|
|
|
@ -19,6 +19,7 @@ package datacoord
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
@ -27,6 +28,7 @@ import (
|
|||
|
||||
"github.com/milvus-io/milvus-proto/go-api/commonpb"
|
||||
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
||||
kvmocks "github.com/milvus-io/milvus/internal/kv/mocks"
|
||||
"github.com/milvus-io/milvus/internal/mocks"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
|
@ -34,10 +36,170 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
"github.com/minio/minio-go/v7"
|
||||
"github.com/minio/minio-go/v7/pkg/credentials"
|
||||
"github.com/samber/lo"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
)
|
||||
|
||||
type GarbageCollectorSuite struct {
|
||||
suite.Suite
|
||||
|
||||
mockChunkManager *mocks.ChunkManager
|
||||
gc *garbageCollector
|
||||
}
|
||||
|
||||
func (s *GarbageCollectorSuite) SetupTest() {
|
||||
meta, err := newMemoryMeta()
|
||||
s.Require().NoError(err)
|
||||
s.mockChunkManager = &mocks.ChunkManager{}
|
||||
|
||||
mockKV := &kvmocks.TxnKV{}
|
||||
mockKV.EXPECT().LoadWithPrefix(mock.Anything).Return([]string{}, []string{}, nil)
|
||||
segRefer, err := NewSegmentReferenceManager(mockKV, nil)
|
||||
|
||||
s.Require().NoError(err)
|
||||
s.Require().NotNil(segRefer)
|
||||
|
||||
s.gc = newGarbageCollector(
|
||||
meta, newMockHandler(), segRefer, &mocks.MockIndexCoord{}, GcOption{
|
||||
cli: s.mockChunkManager,
|
||||
enabled: true,
|
||||
checkInterval: time.Millisecond * 10,
|
||||
missingTolerance: time.Hour * 24,
|
||||
dropTolerance: time.Hour * 24,
|
||||
},
|
||||
)
|
||||
|
||||
}
|
||||
|
||||
func (s *GarbageCollectorSuite) TearDownTest() {
|
||||
s.mockChunkManager = nil
|
||||
s.gc.close()
|
||||
s.gc = nil
|
||||
}
|
||||
|
||||
func (s *GarbageCollectorSuite) TestBasicOperation() {
|
||||
s.Run("normal_gc", func() {
|
||||
gc := s.gc
|
||||
s.mockChunkManager.EXPECT().RootPath().Return("files")
|
||||
s.mockChunkManager.EXPECT().ListWithPrefix(mock.Anything, mock.AnythingOfType("string"), mock.AnythingOfType("bool")).
|
||||
Return([]string{}, []time.Time{}, nil)
|
||||
gc.start()
|
||||
// make ticker run at least once
|
||||
time.Sleep(time.Millisecond * 20)
|
||||
|
||||
s.NotPanics(func() {
|
||||
gc.close()
|
||||
})
|
||||
})
|
||||
|
||||
s.Run("nil_client", func() {
|
||||
// initial a new garbageCollector here
|
||||
mockKV := &kvmocks.TxnKV{}
|
||||
mockKV.EXPECT().LoadWithPrefix(mock.Anything).Return([]string{}, []string{}, nil)
|
||||
segRefer, err := NewSegmentReferenceManager(mockKV, nil)
|
||||
s.Require().NoError(err)
|
||||
|
||||
gc := newGarbageCollector(nil, newMockHandler(), segRefer, &mocks.MockIndexCoord{}, GcOption{
|
||||
cli: nil,
|
||||
enabled: true,
|
||||
})
|
||||
|
||||
s.NotPanics(func() {
|
||||
gc.start()
|
||||
})
|
||||
|
||||
s.NotPanics(func() {
|
||||
gc.close()
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func (s *GarbageCollectorSuite) TestScan() {
|
||||
s.Run("listCollectionPrefix_fails", func() {
|
||||
s.mockChunkManager.ExpectedCalls = nil
|
||||
s.mockChunkManager.EXPECT().RootPath().Return("files")
|
||||
s.mockChunkManager.EXPECT().ListWithPrefix(mock.Anything, mock.AnythingOfType("string"), mock.AnythingOfType("bool")).
|
||||
Return(nil, nil, errors.New("mocked"))
|
||||
|
||||
s.gc.scan()
|
||||
s.mockChunkManager.AssertNotCalled(s.T(), "Remove", mock.Anything, mock.Anything)
|
||||
})
|
||||
|
||||
s.Run("collectionPrefix_invalid", func() {
|
||||
s.mockChunkManager.ExpectedCalls = nil
|
||||
s.mockChunkManager.EXPECT().RootPath().Return("files")
|
||||
/*
|
||||
s.mockChunkManager.EXPECT().ListWithPrefix(mock.Anything, mock.AnythingOfType("string"), mock.AnythingOfType("bool")).
|
||||
Return([]string{"files/insert_log/1/", "files/bad_prefix", "files/insert_log/string/"}, lo.RepeatBy(3, func(_ int) time.Time {
|
||||
return time.Now().Add(-time.Hour)
|
||||
}), nil)*/
|
||||
|
||||
logTypes := []string{"files/insert_log/", "files/stats_log/", "files/delta_log/"}
|
||||
for _, logType := range logTypes {
|
||||
validSubPath := "1/2/3/100/2000"
|
||||
if logType == "files/delta_log/" {
|
||||
validSubPath = "1/2/3/2000"
|
||||
}
|
||||
s.mockChunkManager.EXPECT().ListWithPrefix(mock.Anything, logType, false).
|
||||
Return([]string{path.Join(logType, "1") + "/", path.Join(logType, "2") + "/", path.Join(logType, "string") + "/", "files/badprefix/"}, lo.RepeatBy(4, func(_ int) time.Time { return time.Now() }), nil)
|
||||
s.mockChunkManager.EXPECT().ListWithPrefix(mock.Anything, path.Join(logType, "1")+"/", true).
|
||||
Return([]string{path.Join(logType, validSubPath)}, []time.Time{time.Now().Add(time.Hour * -48)}, nil)
|
||||
s.mockChunkManager.EXPECT().Remove(mock.Anything, path.Join(logType, validSubPath)).Return(nil)
|
||||
}
|
||||
|
||||
s.gc.option.collValidator = func(collID int64) bool {
|
||||
return collID == 1
|
||||
}
|
||||
|
||||
s.gc.scan()
|
||||
//s.mockChunkManager.AssertNotCalled(s.T(), "Remove", mock.Anything, mock.Anything)
|
||||
s.mockChunkManager.AssertExpectations(s.T())
|
||||
})
|
||||
|
||||
s.Run("fileScan_fails", func() {
|
||||
s.mockChunkManager.ExpectedCalls = nil
|
||||
s.mockChunkManager.Calls = nil
|
||||
s.mockChunkManager.EXPECT().RootPath().Return("files")
|
||||
isCollPrefix := func(prefix string) bool {
|
||||
return lo.Contains([]string{"files/insert_log/", "files/stats_log/", "files/delta_log/"}, prefix)
|
||||
}
|
||||
s.mockChunkManager.EXPECT().ListWithPrefix(mock.Anything, mock.AnythingOfType("string"), mock.AnythingOfType("bool")).Call.Return(
|
||||
func(_ context.Context, prefix string, recursive bool) []string {
|
||||
if isCollPrefix(prefix) {
|
||||
return []string{path.Join(prefix, "1")}
|
||||
}
|
||||
return nil
|
||||
},
|
||||
func(_ context.Context, prefix string, recursive bool) []time.Time {
|
||||
if isCollPrefix(prefix) {
|
||||
return []time.Time{time.Now()}
|
||||
}
|
||||
return nil
|
||||
},
|
||||
func(_ context.Context, prefix string, recursive bool) error {
|
||||
if isCollPrefix(prefix) {
|
||||
return nil
|
||||
}
|
||||
return errors.New("mocked")
|
||||
},
|
||||
)
|
||||
s.gc.option.collValidator = func(collID int64) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
s.gc.scan()
|
||||
s.mockChunkManager.AssertNotCalled(s.T(), "Remove", mock.Anything, mock.Anything)
|
||||
})
|
||||
}
|
||||
|
||||
func TestGarbageCollectorSuite(t *testing.T) {
|
||||
suite.Run(t, new(GarbageCollectorSuite))
|
||||
}
|
||||
|
||||
/*
|
||||
func Test_garbageCollector_basic(t *testing.T) {
|
||||
bucketName := `datacoord-ut` + strings.ToLower(funcutil.RandomString(8))
|
||||
rootPath := `gc` + funcutil.RandomString(8)
|
||||
|
@ -97,7 +259,7 @@ func Test_garbageCollector_basic(t *testing.T) {
|
|||
})
|
||||
})
|
||||
|
||||
}
|
||||
}*/
|
||||
|
||||
func validateMinioPrefixElements(t *testing.T, cli *minio.Client, bucketName string, prefix string, elements []string) {
|
||||
var current []string
|
||||
|
|
|
@ -416,6 +416,16 @@ func (s *Server) initGarbageCollection(cli storage.ChunkManager) {
|
|||
checkInterval: Params.DataCoordCfg.GCInterval,
|
||||
missingTolerance: Params.DataCoordCfg.GCMissingTolerance,
|
||||
dropTolerance: Params.DataCoordCfg.GCDropTolerance,
|
||||
collValidator: func(collID int64) bool {
|
||||
resp, err := s.rootCoordClient.DescribeCollectionInternal(context.Background(), &milvuspb.DescribeCollectionRequest{
|
||||
Base: commonpbutil.NewMsgBase(),
|
||||
CollectionID: collID,
|
||||
})
|
||||
if err != nil {
|
||||
log.Warn("failed to check collection id", zap.Int64("collID", collID), zap.Error(err))
|
||||
}
|
||||
return resp.GetStatus().GetErrorCode() == commonpb.ErrorCode_Success
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue