milvus/internal/datacoord/garbage_collector.go

907 lines
32 KiB
Go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datacoord
import (
"context"
"fmt"
"path"
"sync"
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/datacoord/broker"
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// GcOption garbage collection options
type GcOption struct {
cli storage.ChunkManager // client
enabled bool // enable switch
checkInterval time.Duration // each interval
missingTolerance time.Duration // key missing in meta tolerance time
dropTolerance time.Duration // dropped segment related key tolerance time
scanInterval time.Duration // interval for scan residue for interupted log wrttien
broker broker.Broker
removeObjectPool *conc.Pool[struct{}]
}
// garbageCollector handles garbage files in object storage
// which could be dropped collection remanent or data node failure traces
type garbageCollector struct {
ctx context.Context
cancel context.CancelFunc
option GcOption
meta *meta
handler Handler
startOnce sync.Once
stopOnce sync.Once
wg sync.WaitGroup
cmdCh chan gcCmd
pauseUntil atomic.Time
}
type gcCmd struct {
cmdType datapb.GcCommand
duration time.Duration
done chan struct{}
}
// newGarbageCollector create garbage collector with meta and option
func newGarbageCollector(meta *meta, handler Handler, opt GcOption) *garbageCollector {
log.Info("GC with option",
zap.Bool("enabled", opt.enabled),
zap.Duration("interval", opt.checkInterval),
zap.Duration("scanInterval", opt.scanInterval),
zap.Duration("missingTolerance", opt.missingTolerance),
zap.Duration("dropTolerance", opt.dropTolerance))
opt.removeObjectPool = conc.NewPool[struct{}](Params.DataCoordCfg.GCRemoveConcurrent.GetAsInt(), conc.WithExpiryDuration(time.Minute))
ctx, cancel := context.WithCancel(context.Background())
return &garbageCollector{
ctx: ctx,
cancel: cancel,
meta: meta,
handler: handler,
option: opt,
cmdCh: make(chan gcCmd),
}
}
// start a goroutine and perform gc check every `checkInterval`
func (gc *garbageCollector) start() {
if gc.option.enabled {
if gc.option.cli == nil {
log.Warn("DataCoord gc enabled, but SSO client is not provided")
return
}
gc.startOnce.Do(func() {
gc.work(gc.ctx)
})
}
}
func (gc *garbageCollector) Pause(ctx context.Context, pauseDuration time.Duration) error {
if !gc.option.enabled {
log.Info("garbage collection not enabled")
return nil
}
done := make(chan struct{})
select {
case gc.cmdCh <- gcCmd{
cmdType: datapb.GcCommand_Pause,
duration: pauseDuration,
done: done,
}:
<-done
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (gc *garbageCollector) Resume(ctx context.Context) error {
if !gc.option.enabled {
log.Warn("garbage collection not enabled, cannot resume")
return merr.WrapErrServiceUnavailable("garbage collection not enabled")
}
done := make(chan struct{})
select {
case gc.cmdCh <- gcCmd{
cmdType: datapb.GcCommand_Resume,
done: done,
}:
<-done
return nil
case <-ctx.Done():
return ctx.Err()
}
}
// work contains actual looping check logic
func (gc *garbageCollector) work(ctx context.Context) {
// TODO: fast cancel for gc when closing.
// Run gc tasks in parallel.
gc.wg.Add(3)
go func() {
defer gc.wg.Done()
gc.runRecycleTaskWithPauser(ctx, "meta", gc.option.checkInterval, func(ctx context.Context) {
gc.recycleDroppedSegments(ctx)
gc.recycleChannelCPMeta(ctx)
gc.recycleUnusedIndexes(ctx)
gc.recycleUnusedSegIndexes(ctx)
gc.recycleUnusedAnalyzeFiles(ctx)
gc.recycleUnusedTextIndexFiles(ctx)
})
}()
go func() {
defer gc.wg.Done()
gc.runRecycleTaskWithPauser(ctx, "orphan", gc.option.scanInterval, func(ctx context.Context) {
gc.recycleUnusedBinlogFiles(ctx)
gc.recycleUnusedIndexFiles(ctx)
})
}()
go func() {
defer gc.wg.Done()
gc.startControlLoop(ctx)
}()
}
// startControlLoop start a control loop for garbageCollector.
func (gc *garbageCollector) startControlLoop(_ context.Context) {
for {
select {
case cmd := <-gc.cmdCh:
switch cmd.cmdType {
case datapb.GcCommand_Pause:
pauseUntil := time.Now().Add(cmd.duration)
if pauseUntil.After(gc.pauseUntil.Load()) {
log.Info("garbage collection paused", zap.Duration("duration", cmd.duration), zap.Time("pauseUntil", pauseUntil))
gc.pauseUntil.Store(pauseUntil)
} else {
log.Info("new pause until before current value", zap.Duration("duration", cmd.duration), zap.Time("pauseUntil", pauseUntil), zap.Time("oldPauseUntil", gc.pauseUntil.Load()))
}
case datapb.GcCommand_Resume:
// reset to zero value
gc.pauseUntil.Store(time.Time{})
log.Info("garbage collection resumed")
}
close(cmd.done)
case <-gc.ctx.Done():
log.Warn("garbage collector control loop quit")
return
}
}
}
// runRecycleTaskWithPauser is a helper function to create a task with pauser
func (gc *garbageCollector) runRecycleTaskWithPauser(ctx context.Context, name string, interval time.Duration, task func(ctx context.Context)) {
logger := log.With(zap.String("gcType", name)).With(zap.Duration("interval", interval))
timer := time.NewTicker(interval)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return
case <-timer.C:
if time.Now().Before(gc.pauseUntil.Load()) {
logger.Info("garbage collector paused", zap.Time("until", gc.pauseUntil.Load()))
continue
}
logger.Info("garbage collector recycle task start...")
start := time.Now()
task(ctx)
logger.Info("garbage collector recycle task done", zap.Duration("timeCost", time.Since(start)))
}
}
}
// close stop the garbage collector.
func (gc *garbageCollector) close() {
gc.stopOnce.Do(func() {
gc.cancel()
gc.wg.Wait()
})
}
// recycleUnusedBinlogFiles load meta file info and compares OSS keys
// if missing found, performs gc cleanup
func (gc *garbageCollector) recycleUnusedBinlogFiles(ctx context.Context) {
start := time.Now()
log := log.With(zap.String("gcName", "recycleUnusedBinlogFiles"), zap.Time("startAt", start))
log.Info("start recycleUnusedBinlogFiles...")
defer func() { log.Info("recycleUnusedBinlogFiles done", zap.Duration("timeCost", time.Since(start))) }()
type scanTask struct {
prefix string
checker func(objectInfo *storage.ChunkObjectInfo, segment *SegmentInfo) bool
label string
}
scanTasks := []scanTask{
{
prefix: path.Join(gc.option.cli.RootPath(), common.SegmentInsertLogPath),
checker: func(objectInfo *storage.ChunkObjectInfo, segment *SegmentInfo) bool {
return segment != nil
},
label: metrics.InsertFileLabel,
},
{
prefix: path.Join(gc.option.cli.RootPath(), common.SegmentStatslogPath),
checker: func(objectInfo *storage.ChunkObjectInfo, segment *SegmentInfo) bool {
logID, err := binlog.GetLogIDFromBingLogPath(objectInfo.FilePath)
if err != nil {
log.Warn("garbageCollector find dirty stats log", zap.String("filePath", objectInfo.FilePath), zap.Error(err))
return false
}
return segment != nil && segment.IsStatsLogExists(logID)
},
label: metrics.StatFileLabel,
},
{
prefix: path.Join(gc.option.cli.RootPath(), common.SegmentDeltaLogPath),
checker: func(objectInfo *storage.ChunkObjectInfo, segment *SegmentInfo) bool {
logID, err := binlog.GetLogIDFromBingLogPath(objectInfo.FilePath)
if err != nil {
log.Warn("garbageCollector find dirty dleta log", zap.String("filePath", objectInfo.FilePath), zap.Error(err))
return false
}
return segment != nil && segment.IsDeltaLogExists(logID)
},
label: metrics.DeleteFileLabel,
},
}
for _, task := range scanTasks {
gc.recycleUnusedBinLogWithChecker(ctx, task.prefix, task.label, task.checker)
}
metrics.GarbageCollectorRunCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Add(1)
}
// recycleUnusedBinLogWithChecker scans the prefix and checks the path with checker.
// GC the file if checker returns false.
func (gc *garbageCollector) recycleUnusedBinLogWithChecker(ctx context.Context, prefix string, label string, checker func(objectInfo *storage.ChunkObjectInfo, segment *SegmentInfo) bool) {
logger := log.With(zap.String("prefix", prefix))
logger.Info("garbageCollector recycleUnusedBinlogFiles start", zap.String("prefix", prefix))
lastFilePath := ""
total := 0
valid := 0
unexpectedFailure := atomic.NewInt32(0)
removed := atomic.NewInt32(0)
start := time.Now()
futures := make([]*conc.Future[struct{}], 0)
err := gc.option.cli.WalkWithPrefix(ctx, prefix, true, func(chunkInfo *storage.ChunkObjectInfo) bool {
total++
lastFilePath = chunkInfo.FilePath
// Check file tolerance first to avoid unnecessary operation.
if time.Since(chunkInfo.ModifyTime) <= gc.option.missingTolerance {
logger.Info("garbageCollector recycleUnusedBinlogFiles skip file since it is not expired", zap.String("filePath", chunkInfo.FilePath), zap.Time("modifyTime", chunkInfo.ModifyTime))
return true
}
// Parse segmentID from file path.
// TODO: Does all files in the same segment have the same segmentID?
segmentID, err := storage.ParseSegmentIDByBinlog(gc.option.cli.RootPath(), chunkInfo.FilePath)
if err != nil {
unexpectedFailure.Inc()
logger.Warn("garbageCollector recycleUnusedBinlogFiles parse segment id error",
zap.String("filePath", chunkInfo.FilePath),
zap.Error(err))
return true
}
segment := gc.meta.GetSegment(ctx, segmentID)
if checker(chunkInfo, segment) {
valid++
logger.Info("garbageCollector recycleUnusedBinlogFiles skip file since it is valid", zap.String("filePath", chunkInfo.FilePath), zap.Int64("segmentID", segmentID))
return true
}
// ignore error since it could be cleaned up next time
file := chunkInfo.FilePath
future := gc.option.removeObjectPool.Submit(func() (struct{}, error) {
logger := logger.With(zap.String("file", file))
logger.Info("garbageCollector recycleUnusedBinlogFiles remove file...")
if err = gc.option.cli.Remove(ctx, file); err != nil {
log.Warn("garbageCollector recycleUnusedBinlogFiles remove file failed", zap.Error(err))
unexpectedFailure.Inc()
return struct{}{}, err
}
log.Info("garbageCollector recycleUnusedBinlogFiles remove file success")
removed.Inc()
return struct{}{}, nil
})
futures = append(futures, future)
return true
})
// Wait for all remove tasks done.
if err := conc.BlockOnAll(futures...); err != nil {
// error is logged, and can be ignored here.
logger.Warn("some task failure in remove object pool", zap.Error(err))
}
cost := time.Since(start)
logger.Info("garbageCollector recycleUnusedBinlogFiles done",
zap.Int("total", total),
zap.Int("valid", valid),
zap.Int("unexpectedFailure", int(unexpectedFailure.Load())),
zap.Int("removed", int(removed.Load())),
zap.String("lastFilePath", lastFilePath),
zap.Duration("cost", cost),
zap.Error(err))
metrics.GarbageCollectorFileScanDuration.
WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), label).
Observe(float64(cost.Milliseconds()))
}
func (gc *garbageCollector) checkDroppedSegmentGC(segment *SegmentInfo,
childSegment *SegmentInfo,
indexSet typeutil.UniqueSet,
cpTimestamp Timestamp,
) bool {
log := log.With(zap.Int64("segmentID", segment.ID))
if !gc.isExpire(segment.GetDroppedAt()) {
return false
}
isCompacted := childSegment != nil || segment.GetCompacted()
if isCompacted {
// For compact A, B -> C, don't GC A or B if C is not indexed,
// guarantee replacing A, B with C won't downgrade performance
// If the child is GC'ed first, then childSegment will be nil.
if childSegment != nil && !indexSet.Contain(childSegment.GetID()) {
log.WithRateGroup("GC_FAIL_COMPACT_TO_NOT_INDEXED", 1, 60).
RatedInfo(60, "skipping GC when compact target segment is not indexed",
zap.Int64("child segment ID", childSegment.GetID()))
return false
}
}
segInsertChannel := segment.GetInsertChannel()
// Ignore segments from potentially dropped collection. Check if collection is to be dropped by checking if channel is dropped.
// We do this because collection meta drop relies on all segment being GCed.
if gc.meta.catalog.ChannelExists(context.Background(), segInsertChannel) &&
segment.GetDmlPosition().GetTimestamp() > cpTimestamp {
// segment gc shall only happen when channel cp is after segment dml cp.
log.WithRateGroup("GC_FAIL_CP_BEFORE", 1, 60).
RatedInfo(60, "dropped segment dml position after channel cp, skip meta gc",
zap.Uint64("dmlPosTs", segment.GetDmlPosition().GetTimestamp()),
zap.Uint64("channelCpTs", cpTimestamp),
)
return false
}
return true
}
// recycleDroppedSegments scans all segments and remove those dropped segments from meta and oss.
func (gc *garbageCollector) recycleDroppedSegments(ctx context.Context) {
start := time.Now()
log := log.With(zap.String("gcName", "recycleDroppedSegments"), zap.Time("startAt", start))
log.Info("start clear dropped segments...")
defer func() { log.Info("clear dropped segments done", zap.Duration("timeCost", time.Since(start))) }()
all := gc.meta.SelectSegments(ctx)
drops := make(map[int64]*SegmentInfo, 0)
compactTo := make(map[int64]*SegmentInfo)
channels := typeutil.NewSet[string]()
for _, segment := range all {
cloned := segment.Clone()
binlog.DecompressBinLogs(cloned.SegmentInfo)
if cloned.GetState() == commonpb.SegmentState_Dropped {
drops[cloned.GetID()] = cloned
channels.Insert(cloned.GetInsertChannel())
// continue
// A(indexed), B(indexed) -> C(no indexed), D(no indexed) -> E(no indexed), A, B can not be GC
}
for _, from := range cloned.GetCompactionFrom() {
compactTo[from] = cloned
}
}
droppedCompactTo := make(map[*SegmentInfo]struct{})
for id := range drops {
if to, ok := compactTo[id]; ok {
droppedCompactTo[to] = struct{}{}
}
}
indexedSegments := FilterInIndexedSegments(gc.handler, gc.meta, false, lo.Keys(droppedCompactTo)...)
indexedSet := make(typeutil.UniqueSet)
for _, segment := range indexedSegments {
indexedSet.Insert(segment.GetID())
}
channelCPs := make(map[string]uint64)
for channel := range channels {
pos := gc.meta.GetChannelCheckpoint(channel)
channelCPs[channel] = pos.GetTimestamp()
}
log.Info("start to GC segments", zap.Int("drop_num", len(drops)))
for segmentID, segment := range drops {
if ctx.Err() != nil {
// process canceled, stop.
return
}
log := log.With(zap.Int64("segmentID", segmentID))
segInsertChannel := segment.GetInsertChannel()
if !gc.checkDroppedSegmentGC(segment, compactTo[segment.GetID()], indexedSet, channelCPs[segInsertChannel]) {
continue
}
logs := getLogs(segment)
for key := range getTextLogs(segment) {
logs[key] = struct{}{}
}
log.Info("GC segment start...", zap.Int("insert_logs", len(segment.GetBinlogs())),
zap.Int("delta_logs", len(segment.GetDeltalogs())),
zap.Int("stats_logs", len(segment.GetStatslogs())),
zap.Int("bm25_logs", len(segment.GetBm25Statslogs())),
zap.Int("text_logs", len(segment.GetTextStatsLogs())))
if err := gc.removeObjectFiles(ctx, logs); err != nil {
log.Warn("GC segment remove logs failed", zap.Error(err))
continue
}
if err := gc.meta.DropSegment(ctx, segment.GetID()); err != nil {
log.Warn("GC segment meta failed to drop segment", zap.Error(err))
continue
}
log.Info("GC segment meta drop segment done")
}
}
func (gc *garbageCollector) recycleChannelCPMeta(ctx context.Context) {
log := log.Ctx(ctx)
channelCPs, err := gc.meta.catalog.ListChannelCheckpoint(ctx)
if err != nil {
log.Warn("list channel cp fail during GC", zap.Error(err))
return
}
collectionID2GcStatus := make(map[int64]bool)
skippedCnt := 0
log.Info("start to GC channel cp", zap.Int("vchannelCPCnt", len(channelCPs)))
for vChannel := range channelCPs {
collectionID := funcutil.GetCollectionIDFromVChannel(vChannel)
// !!! Skip to GC if vChannel format is illegal, it will lead meta leak in this case
if collectionID == -1 {
skippedCnt++
log.Warn("parse collection id fail, skip to gc channel cp", zap.String("vchannel", vChannel))
continue
}
_, ok := collectionID2GcStatus[collectionID]
if !ok {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
has, err := gc.option.broker.HasCollection(ctx, collectionID)
if err == nil && !has {
collectionID2GcStatus[collectionID] = gc.meta.catalog.GcConfirm(ctx, collectionID, -1)
} else {
// skip checkpoints GC of this cycle if describe collection fails or the collection state is available.
log.Debug("skip channel cp GC, the collection state is available",
zap.Int64("collectionID", collectionID),
zap.Bool("dropped", has), zap.Error(err))
collectionID2GcStatus[collectionID] = false
}
}
// Skip to GC if all segments meta of the corresponding collection are not removed
if gcConfirmed, _ := collectionID2GcStatus[collectionID]; !gcConfirmed {
skippedCnt++
continue
}
err := gc.meta.DropChannelCheckpoint(vChannel)
if err != nil {
// Try to GC in the next gc cycle if drop channel cp meta fail.
log.Warn("failed to drop channelcp check point during gc", zap.String("vchannel", vChannel), zap.Error(err))
} else {
log.Info("GC channel cp", zap.String("vchannel", vChannel))
}
}
log.Info("GC channel cp done", zap.Int("skippedChannelCP", skippedCnt))
}
func (gc *garbageCollector) isExpire(dropts Timestamp) bool {
droptime := time.Unix(0, int64(dropts))
return time.Since(droptime) > gc.option.dropTolerance
}
func getLogs(sinfo *SegmentInfo) map[string]struct{} {
logs := make(map[string]struct{})
for _, flog := range sinfo.GetBinlogs() {
for _, l := range flog.GetBinlogs() {
logs[l.GetLogPath()] = struct{}{}
}
}
for _, flog := range sinfo.GetStatslogs() {
for _, l := range flog.GetBinlogs() {
logs[l.GetLogPath()] = struct{}{}
}
}
for _, flog := range sinfo.GetDeltalogs() {
for _, l := range flog.GetBinlogs() {
logs[l.GetLogPath()] = struct{}{}
}
}
for _, flog := range sinfo.GetBm25Statslogs() {
for _, l := range flog.GetBinlogs() {
logs[l.GetLogPath()] = struct{}{}
}
}
return logs
}
func getTextLogs(sinfo *SegmentInfo) map[string]struct{} {
textLogs := make(map[string]struct{})
for _, flog := range sinfo.GetTextStatsLogs() {
for _, file := range flog.GetFiles() {
textLogs[file] = struct{}{}
}
}
return textLogs
}
// removeObjectFiles remove file from oss storage, return error if any log failed to remove.
func (gc *garbageCollector) removeObjectFiles(ctx context.Context, filePaths map[string]struct{}) error {
futures := make([]*conc.Future[struct{}], 0)
for filePath := range filePaths {
filePath := filePath
future := gc.option.removeObjectPool.Submit(func() (struct{}, error) {
err := gc.option.cli.Remove(ctx, filePath)
// ignore the error Key Not Found
if err != nil {
if !errors.Is(err, merr.ErrIoKeyNotFound) {
return struct{}{}, err
}
log.Ctx(ctx).Info("remove log failed, key not found, may be removed at previous GC, ignore the error",
zap.String("path", filePath),
zap.Error(err))
}
return struct{}{}, nil
})
futures = append(futures, future)
}
return conc.BlockOnAll(futures...)
}
// recycleUnusedIndexes is used to delete those indexes that is deleted by collection.
func (gc *garbageCollector) recycleUnusedIndexes(ctx context.Context) {
start := time.Now()
log := log.Ctx(ctx).With(zap.String("gcName", "recycleUnusedIndexes"), zap.Time("startAt", start))
log.Info("start recycleUnusedIndexes...")
defer func() { log.Info("recycleUnusedIndexes done", zap.Duration("timeCost", time.Since(start))) }()
deletedIndexes := gc.meta.indexMeta.GetDeletedIndexes()
for _, index := range deletedIndexes {
if ctx.Err() != nil {
// process canceled.
return
}
log := log.With(zap.Int64("collectionID", index.CollectionID), zap.Int64("fieldID", index.FieldID), zap.Int64("indexID", index.IndexID))
if err := gc.meta.indexMeta.RemoveIndex(ctx, index.CollectionID, index.IndexID); err != nil {
log.Warn("remove index on collection fail", zap.Error(err))
continue
}
log.Info("remove index on collection done")
}
}
// recycleUnusedSegIndexes remove the index of segment if index is deleted or segment itself is deleted.
func (gc *garbageCollector) recycleUnusedSegIndexes(ctx context.Context) {
start := time.Now()
log := log.Ctx(ctx).With(zap.String("gcName", "recycleUnusedSegIndexes"), zap.Time("startAt", start))
log.Info("start recycleUnusedSegIndexes...")
defer func() { log.Info("recycleUnusedSegIndexes done", zap.Duration("timeCost", time.Since(start))) }()
segIndexes := gc.meta.indexMeta.GetAllSegIndexes()
for _, segIdx := range segIndexes {
if ctx.Err() != nil {
// process canceled.
return
}
// 1. segment belongs to is deleted.
// 2. index is deleted.
if gc.meta.GetSegment(ctx, segIdx.SegmentID) == nil || !gc.meta.indexMeta.IsIndexExist(segIdx.CollectionID, segIdx.IndexID) {
indexFiles := gc.getAllIndexFilesOfIndex(segIdx)
log := log.With(zap.Int64("collectionID", segIdx.CollectionID),
zap.Int64("partitionID", segIdx.PartitionID),
zap.Int64("segmentID", segIdx.SegmentID),
zap.Int64("indexID", segIdx.IndexID),
zap.Int64("buildID", segIdx.BuildID),
zap.Int64("nodeID", segIdx.NodeID),
zap.Int("indexFiles", len(indexFiles)))
log.Info("GC Segment Index file start...")
// Remove index files first.
if err := gc.removeObjectFiles(ctx, indexFiles); err != nil {
log.Warn("fail to remove index files for index", zap.Error(err))
continue
}
// Remove meta from index meta.
if err := gc.meta.indexMeta.RemoveSegmentIndex(ctx, segIdx.CollectionID, segIdx.PartitionID, segIdx.SegmentID, segIdx.IndexID, segIdx.BuildID); err != nil {
log.Warn("delete index meta from etcd failed, wait to retry", zap.Error(err))
continue
}
log.Info("index meta recycle success")
}
}
}
// recycleUnusedIndexFiles is used to delete those index files that no longer exist in the meta.
func (gc *garbageCollector) recycleUnusedIndexFiles(ctx context.Context) {
start := time.Now()
log := log.Ctx(ctx).With(zap.String("gcName", "recycleUnusedIndexFiles"), zap.Time("startAt", start))
log.Info("start recycleUnusedIndexFiles...")
prefix := path.Join(gc.option.cli.RootPath(), common.SegmentIndexPath) + "/"
// list dir first
keyCount := 0
err := gc.option.cli.WalkWithPrefix(ctx, prefix, false, func(indexPathInfo *storage.ChunkObjectInfo) bool {
key := indexPathInfo.FilePath
keyCount++
logger := log.With(zap.String("prefix", prefix), zap.String("key", key))
buildID, err := parseBuildIDFromFilePath(key)
if err != nil {
logger.Warn("garbageCollector recycleUnusedIndexFiles parseIndexFileKey", zap.Error(err))
return true
}
logger = logger.With(zap.Int64("buildID", buildID))
logger.Info("garbageCollector will recycle index files")
canRecycle, segIdx := gc.meta.indexMeta.CheckCleanSegmentIndex(buildID)
if !canRecycle {
// Even if the index is marked as deleted, the index file will not be recycled, wait for the next gc,
// and delete all index files about the buildID at one time.
logger.Info("garbageCollector can not recycle index files")
return true
}
if segIdx == nil {
// buildID no longer exists in meta, remove all index files
logger.Info("garbageCollector recycleUnusedIndexFiles find meta has not exist, remove index files")
err = gc.option.cli.RemoveWithPrefix(ctx, key)
if err != nil {
logger.Warn("garbageCollector recycleUnusedIndexFiles remove index files failed", zap.Error(err))
return true
}
logger.Info("garbageCollector recycleUnusedIndexFiles remove index files success")
return true
}
filesMap := gc.getAllIndexFilesOfIndex(segIdx)
logger.Info("recycle index files", zap.Int("meta files num", len(filesMap)))
deletedFilesNum := atomic.NewInt32(0)
fileNum := 0
futures := make([]*conc.Future[struct{}], 0)
err = gc.option.cli.WalkWithPrefix(ctx, key, true, func(indexFile *storage.ChunkObjectInfo) bool {
fileNum++
file := indexFile.FilePath
if _, ok := filesMap[file]; !ok {
future := gc.option.removeObjectPool.Submit(func() (struct{}, error) {
logger := logger.With(zap.String("file", file))
logger.Info("garbageCollector recycleUnusedIndexFiles remove file...")
if err := gc.option.cli.Remove(ctx, file); err != nil {
logger.Warn("garbageCollector recycleUnusedIndexFiles remove file failed", zap.Error(err))
return struct{}{}, err
}
deletedFilesNum.Inc()
logger.Info("garbageCollector recycleUnusedIndexFiles remove file success")
return struct{}{}, nil
})
futures = append(futures, future)
}
return true
})
// Wait for all remove tasks done.
if err := conc.BlockOnAll(futures...); err != nil {
// error is logged, and can be ignored here.
logger.Warn("some task failure in remove object pool", zap.Error(err))
}
logger = logger.With(zap.Int("deleteIndexFilesNum", int(deletedFilesNum.Load())), zap.Int("walkFileNum", fileNum))
if err != nil {
logger.Warn("index files recycle failed when walk with prefix", zap.Error(err))
return true
}
logger.Info("index files recycle done")
return true
})
log = log.With(zap.Duration("timeCost", time.Since(start)), zap.Int("keyCount", keyCount), zap.Error(err))
if err != nil {
log.Warn("garbageCollector recycleUnusedIndexFiles failed", zap.Error(err))
return
}
log.Info("recycleUnusedIndexFiles done")
}
// getAllIndexFilesOfIndex returns the all index files of index.
func (gc *garbageCollector) getAllIndexFilesOfIndex(segmentIndex *model.SegmentIndex) map[string]struct{} {
filesMap := make(map[string]struct{})
for _, fileID := range segmentIndex.IndexFileKeys {
filepath := metautil.BuildSegmentIndexFilePath(gc.option.cli.RootPath(), segmentIndex.BuildID, segmentIndex.IndexVersion,
segmentIndex.PartitionID, segmentIndex.SegmentID, fileID)
filesMap[filepath] = struct{}{}
}
return filesMap
}
// recycleUnusedAnalyzeFiles is used to delete those analyze stats files that no longer exist in the meta.
func (gc *garbageCollector) recycleUnusedAnalyzeFiles(ctx context.Context) {
log := log.Ctx(ctx)
log.Info("start recycleUnusedAnalyzeFiles")
startTs := time.Now()
prefix := path.Join(gc.option.cli.RootPath(), common.AnalyzeStatsPath) + "/"
// list dir first
keys := make([]string, 0)
err := gc.option.cli.WalkWithPrefix(ctx, prefix, false, func(chunkInfo *storage.ChunkObjectInfo) bool {
keys = append(keys, chunkInfo.FilePath)
return true
})
if err != nil {
log.Warn("garbageCollector recycleUnusedAnalyzeFiles list keys from chunk manager failed", zap.Error(err))
return
}
log.Info("recycleUnusedAnalyzeFiles, finish list object", zap.Duration("time spent", time.Since(startTs)), zap.Int("task ids", len(keys)))
for _, key := range keys {
if ctx.Err() != nil {
// process canceled
return
}
log.Debug("analyze keys", zap.String("key", key))
taskID, err := parseBuildIDFromFilePath(key)
if err != nil {
log.Warn("garbageCollector recycleUnusedAnalyzeFiles parseAnalyzeResult failed", zap.String("key", key), zap.Error(err))
continue
}
log.Info("garbageCollector will recycle analyze stats files", zap.Int64("taskID", taskID))
canRecycle, task := gc.meta.analyzeMeta.CheckCleanAnalyzeTask(taskID)
if !canRecycle {
// Even if the analysis task is marked as deleted, the analysis stats file will not be recycled, wait for the next gc,
// and delete all index files about the taskID at one time.
log.Info("garbageCollector no need to recycle analyze stats files", zap.Int64("taskID", taskID))
continue
}
if task == nil {
// taskID no longer exists in meta, remove all analysis files
log.Info("garbageCollector recycleUnusedAnalyzeFiles find meta has not exist, remove index files",
zap.Int64("taskID", taskID))
err = gc.option.cli.RemoveWithPrefix(ctx, key)
if err != nil {
log.Warn("garbageCollector recycleUnusedAnalyzeFiles remove analyze stats files failed",
zap.Int64("taskID", taskID), zap.String("prefix", key), zap.Error(err))
continue
}
log.Info("garbageCollector recycleUnusedAnalyzeFiles remove analyze stats files success",
zap.Int64("taskID", taskID), zap.String("prefix", key))
continue
}
log.Info("remove analyze stats files which version is less than current task",
zap.Int64("taskID", taskID), zap.Int64("current version", task.Version))
var i int64
for i = 0; i < task.Version; i++ {
if ctx.Err() != nil {
// process canceled.
return
}
removePrefix := prefix + fmt.Sprintf("%d/", task.Version)
if err := gc.option.cli.RemoveWithPrefix(ctx, removePrefix); err != nil {
log.Warn("garbageCollector recycleUnusedAnalyzeFiles remove files with prefix failed",
zap.Int64("taskID", taskID), zap.String("removePrefix", removePrefix))
continue
}
}
log.Info("analyze stats files recycle success", zap.Int64("taskID", taskID))
}
}
// recycleUnusedTextIndexFiles load meta file info and compares OSS keys
// if missing found, performs gc cleanup
func (gc *garbageCollector) recycleUnusedTextIndexFiles(ctx context.Context) {
start := time.Now()
log := log.Ctx(ctx).With(zap.String("gcName", "recycleUnusedTextIndexFiles"), zap.Time("startAt", start))
log.Info("start recycleUnusedTextIndexFiles...")
defer func() { log.Info("recycleUnusedTextIndexFiles done", zap.Duration("timeCost", time.Since(start))) }()
hasTextIndexSegments := gc.meta.SelectSegments(ctx, SegmentFilterFunc(func(info *SegmentInfo) bool {
return len(info.GetTextStatsLogs()) != 0
}))
fileNum := 0
deletedFilesNum := atomic.NewInt32(0)
for _, seg := range hasTextIndexSegments {
for _, fieldStats := range seg.GetTextStatsLogs() {
log := log.With(zap.Int64("segmentID", seg.GetID()), zap.Int64("fieldID", fieldStats.GetFieldID()))
// clear low version task
for i := int64(1); i < fieldStats.GetVersion(); i++ {
prefix := fmt.Sprintf("%s/%s/%d/%d/%d/%d/%d", gc.option.cli.RootPath(), common.TextIndexPath,
seg.GetCollectionID(), seg.GetPartitionID(), seg.GetID(), fieldStats.GetFieldID(), i)
futures := make([]*conc.Future[struct{}], 0)
err := gc.option.cli.WalkWithPrefix(ctx, prefix, true, func(files *storage.ChunkObjectInfo) bool {
file := files.FilePath
future := gc.option.removeObjectPool.Submit(func() (struct{}, error) {
log := log.With(zap.String("file", file))
log.Info("garbageCollector recycleUnusedTextIndexFiles remove file...")
if err := gc.option.cli.Remove(ctx, file); err != nil {
log.Warn("garbageCollector recycleUnusedTextIndexFiles remove file failed", zap.Error(err))
return struct{}{}, err
}
deletedFilesNum.Inc()
log.Info("garbageCollector recycleUnusedTextIndexFiles remove file success")
return struct{}{}, nil
})
futures = append(futures, future)
return true
})
// Wait for all remove tasks done.
if err := conc.BlockOnAll(futures...); err != nil {
// error is logged, and can be ignored here.
log.Warn("some task failure in remove object pool", zap.Error(err))
}
log = log.With(zap.Int("deleteIndexFilesNum", int(deletedFilesNum.Load())), zap.Int("walkFileNum", fileNum))
if err != nil {
log.Warn("text index files recycle failed when walk with prefix", zap.Error(err))
return
}
}
}
}
log.Info("text index files recycle done")
metrics.GarbageCollectorRunCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Add(1)
}