mirror of https://github.com/milvus-io/milvus.git
Faster garbage collect on compacted data (#25088)
Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
pull/25415/head
parent
7d00020c9e
commit
dbf0130803
|
@ -335,7 +335,7 @@ dataCoord:
|
|||
enableGarbageCollection: true
|
||||
gc:
|
||||
interval: 3600 # gc interval in seconds
|
||||
missingTolerance: 86400 # file meta missing tolerance duration in seconds, 60*24
|
||||
missingTolerance: 10800 # file meta missing tolerance duration in seconds, 10800
|
||||
dropTolerance: 3600 # file belongs to dropped entity tolerance duration in seconds. 3600
|
||||
enableActiveStandby: false
|
||||
port: 13333
|
||||
|
|
|
@ -148,6 +148,7 @@ func (gc *garbageCollector) scan() {
|
|||
var removedKeys []string
|
||||
|
||||
for _, prefix := range prefixes {
|
||||
startTs := time.Now()
|
||||
infoKeys, modTimes, err := gc.option.cli.ListWithPrefix(ctx, prefix, true)
|
||||
if err != nil {
|
||||
log.Error("failed to list files with prefix",
|
||||
|
@ -155,6 +156,7 @@ func (gc *garbageCollector) scan() {
|
|||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
log.Info("gc scan finish list object", zap.String("prefix", prefix), zap.Duration("time spent", time.Since(startTs)), zap.Int("keys", len(infoKeys)))
|
||||
for i, infoKey := range infoKeys {
|
||||
total++
|
||||
_, has := filesMap[infoKey]
|
||||
|
@ -236,7 +238,9 @@ func (gc *garbageCollector) clearEtcd() {
|
|||
|
||||
for _, segment := range drops {
|
||||
log := log.With(zap.Int64("segmentID", segment.ID))
|
||||
if !gc.isExpire(segment.GetDroppedAt()) {
|
||||
to, isCompacted := compactTo[segment.GetID()]
|
||||
// for compacted segment, try to clean up the files as long as target segment is there
|
||||
if !isCompacted && !gc.isExpire(segment.GetDroppedAt()) {
|
||||
continue
|
||||
}
|
||||
segInsertChannel := segment.GetInsertChannel()
|
||||
|
@ -254,7 +258,7 @@ func (gc *garbageCollector) clearEtcd() {
|
|||
}
|
||||
// For compact A, B -> C, don't GC A or B if C is not indexed,
|
||||
// guarantee replacing A, B with C won't downgrade performance
|
||||
if to, ok := compactTo[segment.GetID()]; ok && !indexedSet.Contain(to.GetID()) {
|
||||
if isCompacted && !indexedSet.Contain(to.GetID()) {
|
||||
log.WithRateGroup("GC_FAIL_COMPACT_TO_NOT_INDEXED", 1, 60).
|
||||
RatedWarn(60, "skipping GC when compact target segment is not indexed",
|
||||
zap.Int64("segmentID", to.GetID()))
|
||||
|
@ -263,16 +267,16 @@ func (gc *garbageCollector) clearEtcd() {
|
|||
logs := getLogs(segment)
|
||||
log.Info("GC segment", zap.Int64("segmentID", segment.GetID()))
|
||||
if gc.removeLogs(logs) {
|
||||
_ = gc.meta.DropSegment(segment.GetID())
|
||||
err := gc.meta.DropSegment(segment.GetID())
|
||||
log.Warn("failed to drop segment", zap.Int64("segment id", segment.GetID()), zap.Error(err))
|
||||
}
|
||||
if segList := gc.meta.GetSegmentsByChannel(segInsertChannel); len(segList) == 0 &&
|
||||
!gc.meta.catalog.ChannelExists(context.Background(), segInsertChannel) {
|
||||
log.Info("empty channel found during gc, manually cleanup channel checkpoints",
|
||||
zap.String("vChannel", segInsertChannel))
|
||||
log.Info("empty channel found during gc, manually cleanup channel checkpoints", zap.String("vChannel", segInsertChannel))
|
||||
|
||||
if err := gc.meta.DropChannelCheckpoint(segInsertChannel); err != nil {
|
||||
// Fail-open as there's nothing to do.
|
||||
log.Warn("failed to drop channel check point during segment garbage collection", zap.Error(err))
|
||||
log.Warn("failed to drop channel check point during segment garbage collection", zap.String("vchannel", segInsertChannel), zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -352,19 +356,20 @@ func (gc *garbageCollector) recycleUnusedIndexFiles() {
|
|||
log.Info("start recycleUnusedIndexFiles")
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
startTs := time.Now()
|
||||
prefix := path.Join(gc.option.cli.RootPath(), common.SegmentIndexPath) + "/"
|
||||
// list dir first
|
||||
keys, _, err := gc.option.cli.ListWithPrefix(ctx, prefix, false)
|
||||
if err != nil {
|
||||
log.Error("garbageCollector recycleUnusedIndexFiles list keys from chunk manager failed", zap.Error(err))
|
||||
log.Warn("garbageCollector recycleUnusedIndexFiles list keys from chunk manager failed", zap.Error(err))
|
||||
return
|
||||
}
|
||||
log.Info("recycleUnusedIndexFiles, finish list object", zap.Duration("time spent", time.Since(startTs)), zap.Int("build ids", len(keys)))
|
||||
for _, key := range keys {
|
||||
log.Debug("indexFiles keys", zap.String("key", key))
|
||||
buildID, err := parseBuildIDFromFilePath(key)
|
||||
if err != nil {
|
||||
log.Error("garbageCollector recycleUnusedIndexFiles parseIndexFileKey", zap.String("key", key), zap.Error(err))
|
||||
log.Warn("garbageCollector recycleUnusedIndexFiles parseIndexFileKey", zap.String("key", key), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
log.Info("garbageCollector will recycle index files", zap.Int64("buildID", buildID))
|
||||
|
|
|
@ -2133,11 +2133,12 @@ During compaction, the size of segment # of rows is able to exceed segment max #
|
|||
}
|
||||
p.GCInterval.Init(base.mgr)
|
||||
|
||||
// If you set this to a very small value, make sure it is still at least 10 minutes.
|
||||
p.GCMissingTolerance = ParamItem{
|
||||
Key: "dataCoord.gc.missingTolerance",
|
||||
Version: "2.0.0",
|
||||
DefaultValue: "86400",
|
||||
Doc: "file meta missing tolerance duration in seconds, 60*24",
|
||||
DefaultValue: "10800",
|
||||
Doc: "file meta missing tolerance duration in seconds, 60*60*3",
|
||||
Export: true,
|
||||
}
|
||||
p.GCMissingTolerance.Init(base.mgr)
|
||||
|
|
Loading…
Reference in New Issue