mirror of https://github.com/milvus-io/milvus.git
Fix scan may break GC limitation (#19670)
Signed-off-by: yah01 <yang.cen@zilliz.com>
parent d45d667f5f
commit 89c9cb3680

@@ -19,6 +19,7 @@ package datacoord

 import (
 	"context"
 	"path"
+	"strings"
 	"sync"
 	"time"

@@ -120,11 +121,21 @@ func (gc *garbageCollector) close() {

 func (gc *garbageCollector) scan() {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	var total, valid, missing int
-	segmentFiles := gc.meta.ListSegmentFiles()
-	filesMap := make(map[string]struct{})
-	for _, k := range segmentFiles {
-		filesMap[k.GetLogPath()] = struct{}{}
-	}
+	var (
+		total   = 0
+		valid   = 0
+		missing = 0
+
+		segmentMap = typeutil.NewUniqueSet()
+		filesMap   = typeutil.NewSet[string]()
+	)
+	segments := gc.meta.GetAllSegmentsUnsafe()
+	for _, segment := range segments {
+		segmentMap.Insert(segment.GetID())
+		for _, log := range getLogs(segment) {
+			filesMap.Insert(log.GetLogPath())
+		}
+	}

 	// walk only data cluster related prefixes
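
Note: the rewritten scan() gathers segment IDs and log paths into set helpers (typeutil.NewUniqueSet / typeutil.NewSet) instead of a hand-rolled map. The only operations the diff relies on are Insert and Contain; below is a minimal, self-contained sketch of a set with that shape. The helper type and the example paths are illustrative assumptions, not milvus' actual implementation.

package main

import "fmt"

// Set is a minimal generic set sketch exposing only Insert and Contain,
// mirroring the calls made in scan() above.
type Set[T comparable] map[T]struct{}

func NewSet[T comparable]() Set[T] { return make(Set[T]) }

func (s Set[T]) Insert(items ...T) {
	for _, item := range items {
		s[item] = struct{}{}
	}
}

func (s Set[T]) Contain(item T) bool {
	_, ok := s[item]
	return ok
}

func main() {
	filesMap := NewSet[string]()
	filesMap.Insert("insert_log/1/2/3/100") // hypothetical binlog path
	segmentMap := NewSet[int64]()
	segmentMap.Insert(3)

	fmt.Println(filesMap.Contain("insert_log/1/2/3/100")) // true
	fmt.Println(segmentMap.Contain(4))                    // false
}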

@@ -137,7 +148,10 @@ func (gc *garbageCollector) scan() {
 	for _, prefix := range prefixes {
 		infoKeys, modTimes, err := gc.option.cli.ListWithPrefix(ctx, prefix, true)
 		if err != nil {
-			log.Error("gc listWithPrefix error", zap.String("error", err.Error()))
+			log.Error("failed to list files with prefix",
+				zap.String("prefix", prefix),
+				zap.String("error", err.Error()),
+			)
 		}
 		for i, infoKey := range infoKeys {
 			total++

@@ -150,7 +164,9 @@ func (gc *garbageCollector) scan() {
 			segmentID, err := storage.ParseSegmentIDByBinlog(gc.option.cli.RootPath(), infoKey)
 			if err != nil {
 				missing++
-				log.Warn("parse segment id error", zap.String("infoKey", infoKey), zap.Error(err))
+				log.Warn("parse segment id error",
+					zap.String("infoKey", infoKey),
+					zap.Error(err))
 				continue
 			}

@@ -159,6 +175,12 @@ func (gc *garbageCollector) scan() {
 				continue
 			}

+			if strings.Contains(prefix, statsLogPrefix) &&
+				segmentMap.Contain(segmentID) {
+				valid++
+				continue
+			}
+
 			// not found in meta, check last modified time exceeds tolerance duration
 			if time.Since(modTimes[i]) > gc.option.missingTolerance {
 				// ignore error since it could be cleaned up next time
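
Note: this hunk is the heart of the fix. Previously, a stats-log object whose exact path was not recorded in meta would eventually be removed; now it is treated as valid as long as its segment ID is still known to meta. A self-contained sketch of the per-key decision made above — function and parameter names are illustrative, not the actual scan() signature:

package gcsketch

import "time"

// classifyKey restates the per-key logic of scan(): a key is valid if its path
// is tracked in meta, or if it is a stats log whose segment is still known;
// otherwise it is removed only once it is older than the missing tolerance.
func classifyKey(
	key string, isStatsLogPrefix bool, segmentID int64,
	knownPaths map[string]bool, knownSegments map[int64]bool,
	age, missingTolerance time.Duration,
) string {
	if knownPaths[key] {
		return "valid"
	}
	if isStatsLogPrefix && knownSegments[segmentID] {
		return "valid" // stats log of a live segment: keep it even if the path is unknown
	}
	if age > missingTolerance {
		return "remove" // orphan that exceeded the tolerance window
	}
	return "skip" // orphan, but still within the tolerance window
}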

@@ -166,13 +188,18 @@ func (gc *garbageCollector) scan() {
 				err = gc.option.cli.Remove(ctx, infoKey)
 				if err != nil {
 					missing++
-					log.Error("failed to remove object", zap.String("infoKey", infoKey), zap.Error(err))
+					log.Error("failed to remove object",
+						zap.String("infoKey", infoKey),
+						zap.Error(err))
 				}
 			}
 		}
 	}
-	log.Info("scan file to do garbage collection", zap.Int("total", total),
-		zap.Int("valid", valid), zap.Int("missing", missing), zap.Strings("removed keys", removedKeys))
+	log.Info("scan file to do garbage collection",
+		zap.Int("total", total),
+		zap.Int("valid", valid),
+		zap.Int("missing", missing),
+		zap.Strings("removedKeys", removedKeys))
 }

 func (gc *garbageCollector) clearEtcd() {

@@ -201,18 +228,20 @@ func (gc *garbageCollector) clearEtcd() {
 		indexedSet.Insert(segment.GetID())
 	}

-	for _, sinfo := range drops {
-		if !gc.isExpire(sinfo.GetDroppedAt()) {
+	for _, segment := range drops {
+		if !gc.isExpire(segment.GetDroppedAt()) {
 			continue
 		}
 		// For compact A, B -> C, don't GC A or B if C is not indexed,
 		// guarantee replacing A, B with C won't downgrade performance
-		if to, ok := compactTo[sinfo.GetID()]; ok && !indexedSet.Contain(to.GetID()) {
+		if to, ok := compactTo[segment.GetID()]; ok && !indexedSet.Contain(to.GetID()) {
 			continue
 		}
-		logs := getLogs(sinfo)
+		logs := getLogs(segment)
+		log.Info("GC segment",
+			zap.Int64("segmentID", segment.GetID()))
 		if gc.removeLogs(logs) {
-			_ = gc.meta.DropSegment(sinfo.GetID())
+			_ = gc.meta.DropSegment(segment.GetID())
 		}
 	}
 }
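
Note: the renamed loop variable does not change the existing guard: for a compaction A, B -> C, the dropped inputs A and B are only garbage collected once C is indexed, so replacing A and B with C never degrades search performance (this matches the comment in the hunk). A small sketch of that guard in isolation, with illustrative types and names:

package gcsketch

// canGC reports whether a dropped segment's logs may be removed yet.
// compactTo maps a compacted-from segment to the segment it was compacted into;
// indexed holds the segments that already have an index built.
func canGC(dropped int64, compactTo map[int64]int64, indexed map[int64]bool) bool {
	if to, ok := compactTo[dropped]; ok && !indexed[to] {
		// The result of the compaction is not indexed yet; keep the old logs around.
		return false
	}
	return true
}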

@@ -119,8 +119,6 @@ func Test_garbageCollector_scan(t *testing.T) {
 	assert.NoError(t, err)
 	assert.NotNil(t, segRefer)

-	indexCoord := mocks.NewMockIndexCoord(t)
-
 	t.Run("key is reference", func(t *testing.T) {
 		segReferManager := &SegmentReferenceManager{
 			etcdKV: etcdKV,

@@ -137,6 +135,8 @@ func Test_garbageCollector_scan(t *testing.T) {
 				2: 1,
 			},
 		}
+
+		indexCoord := mocks.NewMockIndexCoord(t)
 		gc := newGarbageCollector(meta, newMockHandler(), segRefer, indexCoord, GcOption{
 			cli:     cli,
 			enabled: true,

@@ -158,6 +158,7 @@ func Test_garbageCollector_scan(t *testing.T) {
 	})

 	t.Run("missing all but save tolerance", func(t *testing.T) {
+		indexCoord := mocks.NewMockIndexCoord(t)
 		gc := newGarbageCollector(meta, newMockHandler(), segRefer, indexCoord, GcOption{
 			cli:     cli,
 			enabled: true,

@@ -183,6 +184,7 @@ func Test_garbageCollector_scan(t *testing.T) {
 		err = meta.AddSegment(segment)
 		require.NoError(t, err)

+		indexCoord := mocks.NewMockIndexCoord(t)
 		gc := newGarbageCollector(meta, newMockHandler(), segRefer, indexCoord, GcOption{
 			cli:     cli,
 			enabled: true,

@@ -211,6 +213,7 @@ func Test_garbageCollector_scan(t *testing.T) {
 		err = meta.AddSegment(segment)
 		require.NoError(t, err)

+		indexCoord := mocks.NewMockIndexCoord(t)
 		gc := newGarbageCollector(meta, newMockHandler(), segRefer, indexCoord, GcOption{
 			cli:     cli,
 			enabled: true,

@@ -227,6 +230,7 @@ func Test_garbageCollector_scan(t *testing.T) {
 		gc.close()
 	})
 	t.Run("missing gc all", func(t *testing.T) {
+		indexCoord := mocks.NewMockIndexCoord(t)
 		gc := newGarbageCollector(meta, newMockHandler(), segRefer, indexCoord, GcOption{
 			cli:     cli,
 			enabled: true,

@@ -248,6 +252,7 @@ func Test_garbageCollector_scan(t *testing.T) {
 	})

 	t.Run("list object with error", func(t *testing.T) {
+		indexCoord := mocks.NewMockIndexCoord(t)
 		gc := newGarbageCollector(meta, newMockHandler(), segRefer, indexCoord, GcOption{
 			cli:     cli,
 			enabled: true,

@@ -264,7 +264,8 @@ func (m *meta) GetSegment(segID UniqueID) *SegmentInfo {
 	return nil
 }

-// GetSegment returns segment info with provided id
+// GetSegmentUnsafe returns segment info with provided id
+// include the unhealthy segment
 // if not segment is found, nil will be returned
 func (m *meta) GetSegmentUnsafe(segID UniqueID) *SegmentInfo {
 	m.RLock()

@@ -272,16 +273,11 @@ func (m *meta) GetSegmentUnsafe(segID UniqueID) *SegmentInfo {
 	return m.segments.GetSegment(segID)
 }

-// GetAllSegment returns segment info with provided id
-// different from GetSegment, this will return unhealthy segment as well
-func (m *meta) GetAllSegment(segID UniqueID) *SegmentInfo {
+// GetAllSegmentsUnsafe returns all segments
+func (m *meta) GetAllSegmentsUnsafe() []*SegmentInfo {
 	m.RLock()
 	defer m.RUnlock()
-	segment := m.segments.GetSegment(segID)
-	if segment != nil {
-		return segment
-	}
-	return nil
+	return m.segments.GetSegments()
 }

 // SetState setting segment with provided ID state

@@ -646,32 +642,6 @@ func (m *meta) batchSaveDropSegments(channel string, modSegments map[int64]*Segm
 	return nil
 }

-// ListSegmentFiles lists all segments' logs
-func (m *meta) ListSegmentFiles() []*datapb.Binlog {
-	m.RLock()
-	defer m.RUnlock()
-
-	var logs []*datapb.Binlog
-
-	for _, segment := range m.segments.GetSegments() {
-		if !isSegmentHealthy(segment) {
-			continue
-		}
-		for _, binlog := range segment.GetBinlogs() {
-			logs = append(logs, binlog.Binlogs...)
-		}
-
-		for _, statLog := range segment.GetStatslogs() {
-			logs = append(logs, statLog.Binlogs...)
-		}
-
-		for _, deltaLog := range segment.GetDeltalogs() {
-			logs = append(logs, deltaLog.Binlogs...)
-		}
-	}
-	return logs
-}
-
 // GetSegmentsByChannel returns all segment info which insert channel equals provided `dmlCh`
 func (m *meta) GetSegmentsByChannel(dmlCh string) []*SegmentInfo {
 	m.RLock()
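
Note: ListSegmentFiles can be removed because scan() now iterates segments directly (GetAllSegmentsUnsafe plus getLogs) instead of pre-flattening every binlog into one slice. The flattening the deleted function performed looks roughly like the sketch below, using simplified stand-in types rather than the actual datapb messages:

package gcsketch

// Simplified stand-ins for the datapb types used by the deleted ListSegmentFiles.
type Binlog struct{ LogPath string }
type FieldBinlog struct{ Binlogs []*Binlog }
type SegmentLogs struct {
	Binlogs   []*FieldBinlog
	Statslogs []*FieldBinlog
	Deltalogs []*FieldBinlog
}

// flattenLogs collects every binlog, statslog and deltalog of one segment,
// the same per-segment walk the garbage collector now performs via getLogs.
func flattenLogs(segment *SegmentLogs) []*Binlog {
	var logs []*Binlog
	for _, group := range [][]*FieldBinlog{segment.Binlogs, segment.Statslogs, segment.Deltalogs} {
		for _, fieldBinlog := range group {
			logs = append(logs, fieldBinlog.Binlogs...)
		}
	}
	return logs
}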

@@ -997,9 +997,9 @@ func TestMeta_GetAllSegments(t *testing.T) {
 	}

 	seg1 := m.GetSegment(1)
-	seg1All := m.GetAllSegment(1)
+	seg1All := m.GetSegmentUnsafe(1)
 	seg2 := m.GetSegment(2)
-	seg2All := m.GetAllSegment(2)
+	seg2All := m.GetSegmentUnsafe(2)
 	assert.NotNil(t, seg1)
 	assert.NotNil(t, seg1All)
 	assert.Nil(t, seg2)

@@ -594,11 +594,11 @@ func (s *Server) handleTimetickMessage(ctx context.Context, ttMsg *msgstream.Dat
 func (s *Server) updateSegmentStatistics(stats []*datapb.SegmentStats) {
 	for _, stat := range stats {
 		// Log if # of rows is updated.
-		if s.meta.GetAllSegment(stat.GetSegmentID()) != nil &&
-			s.meta.GetAllSegment(stat.GetSegmentID()).GetNumOfRows() != stat.GetNumRows() {
+		if s.meta.GetSegmentUnsafe(stat.GetSegmentID()) != nil &&
+			s.meta.GetSegmentUnsafe(stat.GetSegmentID()).GetNumOfRows() != stat.GetNumRows() {
 			log.Debug("Updating segment number of rows",
 				zap.Int64("segment ID", stat.GetSegmentID()),
-				zap.Int64("old value", s.meta.GetAllSegment(stat.GetSegmentID()).GetNumOfRows()),
+				zap.Int64("old value", s.meta.GetSegmentUnsafe(stat.GetSegmentID()).GetNumOfRows()),
 				zap.Int64("new value", stat.GetNumRows()),
 			)
 		}

@@ -342,7 +342,7 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR
 	for _, id := range req.SegmentIDs {
 		var info *SegmentInfo
 		if req.IncludeUnHealthy {
-			info = s.meta.GetAllSegment(id)
+			info = s.meta.GetSegmentUnsafe(id)
 			if info == nil {
 				log.Warn("failed to get segment, this may have been cleaned", zap.Int64("segmentID", id))
 				resp.Status.Reason = msgSegmentNotFound(id)

@@ -36,6 +36,14 @@ import (
 	"golang.org/x/exp/mmap"
 )

+var (
+	ErrNoSuchKey = errors.New("NoSuchKey")
+)
+
+func WrapErrNoSuchKey(key string) error {
+	return fmt.Errorf("%w(key=%s)", ErrNoSuchKey, key)
+}
+
 var CheckBucketRetryAttempts uint = 20

 // MinioChunkManager is responsible for read and write data stored in minio.
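
Note: because WrapErrNoSuchKey wraps the sentinel with %w, callers can detect a missing object with errors.Is instead of comparing error strings. A minimal, runnable usage sketch (standard library only; the object key below is hypothetical):

package main

import (
	"errors"
	"fmt"
)

// Same sentinel and wrapper as in the hunk above.
var ErrNoSuchKey = errors.New("NoSuchKey")

func WrapErrNoSuchKey(key string) error {
	return fmt.Errorf("%w(key=%s)", ErrNoSuchKey, key)
}

func main() {
	err := WrapErrNoSuchKey("stats_log/1/2/3/100/1") // hypothetical object key
	fmt.Println(errors.Is(err, ErrNoSuchKey))        // true
	fmt.Println(err)                                 // NoSuchKey(key=stats_log/1/2/3/100/1)
}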

@@ -215,7 +223,7 @@ func (mcm *MinioChunkManager) Read(ctx context.Context, filePath string) ([]byte
 	if err != nil {
 		errResponse := minio.ToErrorResponse(err)
 		if errResponse.Code == "NoSuchKey" {
-			return nil, errors.New("NoSuchKey")
+			return nil, WrapErrNoSuchKey(filePath)
 		}
 		log.Warn("failed to read object", zap.String("path", filePath), zap.Error(err))
 		return nil, err