Fix "segment not found" when forwarding deletes to an empty segment (#22528)

Signed-off-by: yah01 <yang.cen@zilliz.com>
pull/22558/head
yah01 2023-03-03 14:13:49 +08:00 committed by GitHub
parent ab8738ef05
commit 319f1773af
14 changed files with 315 additions and 102 deletions

View File

@ -620,7 +620,7 @@ func TestCompactionPlanHandler_completeCompaction(t *testing.T) {
assert.NoError(t, err)
segments = meta.GetAllSegmentsUnsafe()
assert.Equal(t, len(segments), 2)
assert.Equal(t, len(segments), 3)
for _, segment := range segments {
assert.True(t, segment.State == commonpb.SegmentState_Dropped)

View File

@ -216,9 +216,9 @@ func (t *compactionTrigger) triggerCompaction() error {
return nil
}
// triggerSingleCompaction triger a compaction bundled with collection-partiiton-channel-segment
// triggerSingleCompaction triggers a compaction bundled with collection-partition-channel-segment
func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string) error {
// If AutoCompaction diabled, flush request will not trigger compaction
// If AutoCompaction disabled, flush request will not trigger compaction
if !Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() {
return nil
}
@ -424,7 +424,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
return
}
segment := t.meta.GetSegment(signal.segmentID)
segment := t.meta.GetHealthySegment(signal.segmentID)
if segment == nil {
log.Warn("segment in compaction signal not found in meta", zap.Int64("segmentID", signal.segmentID))
return

View File

@ -358,7 +358,7 @@ func (gc *garbageCollector) recycleUnusedIndexes() {
func (gc *garbageCollector) recycleUnusedSegIndexes() {
segIndexes := gc.meta.GetAllSegIndexes()
for _, segIdx := range segIndexes {
if gc.meta.GetSegmentUnsafe(segIdx.SegmentID) == nil || !gc.meta.IsIndexExist(segIdx.CollectionID, segIdx.IndexID) {
if gc.meta.GetSegment(segIdx.SegmentID) == nil || !gc.meta.IsIndexExist(segIdx.CollectionID, segIdx.IndexID) {
if err := gc.meta.RemoveSegmentIndex(segIdx.CollectionID, segIdx.PartitionID, segIdx.SegmentID, segIdx.IndexID, segIdx.BuildID); err != nil {
log.Warn("delete index meta from etcd failed, wait to retry", zap.Int64("buildID", segIdx.BuildID),
zap.Int64("segID", segIdx.SegmentID), zap.Int64("nodeID", segIdx.NodeID), zap.Error(err))

View File

@ -1143,17 +1143,17 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
}
gc.clearEtcd()
segA := gc.meta.GetSegmentUnsafe(segID)
segA := gc.meta.GetSegment(segID)
assert.NotNil(t, segA)
segB := gc.meta.GetSegmentUnsafe(segID + 1)
segB := gc.meta.GetSegment(segID + 1)
assert.NotNil(t, segB)
segC := gc.meta.GetSegmentUnsafe(segID + 2)
segC := gc.meta.GetSegment(segID + 2)
assert.NotNil(t, segC)
segD := gc.meta.GetSegmentUnsafe(segID + 3)
segD := gc.meta.GetSegment(segID + 3)
assert.NotNil(t, segD)
segE := gc.meta.GetSegmentUnsafe(segID + 4)
segE := gc.meta.GetSegment(segID + 4)
assert.NotNil(t, segE)
segF := gc.meta.GetSegmentUnsafe(segID + 5)
segF := gc.meta.GetSegment(segID + 5)
assert.NotNil(t, segF)
err := gc.meta.AddSegmentIndex(&model.SegmentIndex{
@ -1180,21 +1180,21 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
//assert.NotNil(t, segA)
//segB := gc.meta.GetSegmentUnsafe(segID + 1)
//assert.NotNil(t, segB)
segC = gc.meta.GetSegmentUnsafe(segID + 2)
segC = gc.meta.GetSegment(segID + 2)
assert.Nil(t, segC)
segD = gc.meta.GetSegmentUnsafe(segID + 3)
segD = gc.meta.GetSegment(segID + 3)
assert.Nil(t, segD)
segE = gc.meta.GetSegmentUnsafe(segID + 4)
segE = gc.meta.GetSegment(segID + 4)
assert.NotNil(t, segE)
segF = gc.meta.GetSegmentUnsafe(segID + 5)
segF = gc.meta.GetSegment(segID + 5)
assert.NotNil(t, segF)
gc.clearEtcd()
segA = gc.meta.GetSegmentUnsafe(segID)
segA = gc.meta.GetSegment(segID)
assert.Nil(t, segA)
segB = gc.meta.GetSegmentUnsafe(segID + 1)
segB = gc.meta.GetSegment(segID + 1)
assert.Nil(t, segB)
segF = gc.meta.GetSegmentUnsafe(segID + 5)
segF = gc.meta.GetSegment(segID + 5)
assert.NotNil(t, segF)
}

View File

@ -220,7 +220,7 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
switch state {
case indexTaskInit:
segment := ib.meta.GetSegmentUnsafe(meta.SegmentID)
segment := ib.meta.GetSegment(meta.SegmentID)
if !isSegmentHealthy(segment) || !ib.meta.IsIndexExist(meta.CollectionID, meta.IndexID) {
log.Ctx(ib.ctx).Info("no need to build index for task, removing it", zap.Int64("buildID", buildID))
deleteFunc(buildID)

View File

@ -108,7 +108,7 @@ func (s *Server) createIndexForSegmentLoop(ctx context.Context) {
}
case segID := <-s.buildIndexCh:
log.Info("receive new flushed segment", zap.Int64("segID", segID))
segment := s.meta.GetSegmentUnsafe(segID)
segment := s.meta.GetSegment(segID)
if segment == nil {
log.Warn("segment is not exist, no need to build index", zap.Int64("segID", segID))
continue

View File

@ -315,9 +315,9 @@ func (m *meta) DropSegment(segmentID UniqueID) error {
return nil
}
// GetSegment returns segment info with provided id
// GetHealthySegment returns segment info with provided id
// if no segment is found, nil will be returned
func (m *meta) GetSegment(segID UniqueID) *SegmentInfo {
func (m *meta) GetHealthySegment(segID UniqueID) *SegmentInfo {
m.RLock()
defer m.RUnlock()
segment := m.segments.GetSegment(segID)
@ -327,10 +327,10 @@ func (m *meta) GetSegment(segID UniqueID) *SegmentInfo {
return nil
}
// GetSegmentUnsafe returns segment info with provided id
// GetSegment returns segment info with provided id
// including unhealthy segments
// if no segment is found, nil will be returned
func (m *meta) GetSegmentUnsafe(segID UniqueID) *SegmentInfo {
func (m *meta) GetSegment(segID UniqueID) *SegmentInfo {
m.RLock()
defer m.RUnlock()
return m.segments.GetSegment(segID)
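
Since the two accessors effectively swap names here, a minimal standalone sketch of the resulting contract may help; the types below are simplified stand-ins for the real datacoord definitions, and isSegmentHealthy is assumed to reject Dropped and NotExist states, as the checks elsewhere in this patch imply:

package main

import (
	"fmt"
	"sync"
)

// Simplified stand-ins for commonpb.SegmentState and datacoord's SegmentInfo.
type SegmentState int

const (
	SegmentState_Growing SegmentState = iota
	SegmentState_Flushed
	SegmentState_Dropped
	SegmentState_NotExist
)

type SegmentInfo struct {
	ID    int64
	State SegmentState
}

type meta struct {
	sync.RWMutex
	segments map[int64]*SegmentInfo
}

func isSegmentHealthy(s *SegmentInfo) bool {
	return s != nil && s.State != SegmentState_Dropped && s.State != SegmentState_NotExist
}

// GetSegment (the old GetSegmentUnsafe) returns the segment in any state.
func (m *meta) GetSegment(id int64) *SegmentInfo {
	m.RLock()
	defer m.RUnlock()
	return m.segments[id]
}

// GetHealthySegment (the old GetSegment) filters out dropped or not-exist segments.
func (m *meta) GetHealthySegment(id int64) *SegmentInfo {
	m.RLock()
	defer m.RUnlock()
	if s := m.segments[id]; isSegmentHealthy(s) {
		return s
	}
	return nil
}

func main() {
	m := &meta{segments: map[int64]*SegmentInfo{1: {ID: 1, State: SegmentState_Dropped}}}
	fmt.Println(m.GetSegment(1) != nil)        // true: dropped segments stay visible
	fmt.Println(m.GetHealthySegment(1) != nil) // false: only live segments pass
}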
@ -706,7 +706,7 @@ func (m *meta) mergeDropSegment(seg2Drop *SegmentInfo) (*SegmentInfo, *segMetric
// since the channel unwatching operation is not atomic here
// ** the removal flag is always set with the last batch
// ** the last batch must contain at least one segment
// 1. when failure occurs between batches, failover mechanism will continue with the earlist checkpoint of this channel
// 1. when failure occurs between batches, failover mechanism will continue with the earliest checkpoint of this channel
// since the flag is not marked, DataNode can re-consume the drop collection msg
// 2. when failure occurs between saving meta and unwatching the channel, the removal flag shall be checked before letting DataNode watch this channel
func (m *meta) batchSaveDropSegments(channel string, modSegments map[int64]*SegmentInfo) error {
@ -1066,10 +1066,11 @@ func (m *meta) copyDeltaFiles(binlogs []*datapb.FieldBinlog, collectionID, parti
}
func (m *meta) alterMetaStoreAfterCompaction(modSegments []*SegmentInfo, newSegment *SegmentInfo) error {
var modSegIDs []int64
for _, seg := range modSegments {
modSegIDs = append(modSegIDs, seg.GetID())
modSegIDs := lo.Map(modSegments, func(segment *SegmentInfo, _ int) int64 { return segment.GetID() })
if newSegment.GetNumOfRows() == 0 {
newSegment.State = commonpb.SegmentState_Dropped
}
log.Info("meta update: alter meta store for compaction updates",
zap.Int64s("compact from segments (segments to be updated as dropped)", modSegIDs),
zap.Int64("new segmentId", newSegment.GetID()),
@ -1093,9 +1094,7 @@ func (m *meta) alterMetaStoreAfterCompaction(modSegments []*SegmentInfo, newSegm
m.segments.SetSegment(s.GetID(), s)
}
if newSegment.GetNumOfRows() > 0 {
m.segments.SetSegment(newSegment.GetID(), newSegment)
}
m.segments.SetSegment(newSegment.GetID(), newSegment)
return nil
}
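
This hunk carries the heart of the fix: the hand-written ID loop becomes a lo.Map call (github.com/samber/lo), and a compaction result holding zero rows is now kept in meta in Dropped state instead of being skipped, which is why the completeCompaction test above expects three segments rather than two. A standalone sketch under those assumptions, with simplified types:

package main

import (
	"fmt"

	"github.com/samber/lo"
)

type SegmentState int

const (
	SegmentState_Flushed SegmentState = iota
	SegmentState_Dropped
)

type SegmentInfo struct {
	ID        int64
	NumOfRows int64
	State     SegmentState
}

func (s *SegmentInfo) GetID() int64 { return s.ID }

func main() {
	modSegments := []*SegmentInfo{{ID: 10, NumOfRows: 5}, {ID: 11, NumOfRows: 7}}
	newSegment := &SegmentInfo{ID: 12, NumOfRows: 0, State: SegmentState_Flushed}

	// lo.Map replaces the hand-written append loop for collecting IDs.
	modSegIDs := lo.Map(modSegments, func(s *SegmentInfo, _ int) int64 { return s.GetID() })

	// Core of the fix: an empty compaction result is persisted as Dropped,
	// not discarded, so later lookups see a dropped segment rather than nothing.
	if newSegment.NumOfRows == 0 {
		newSegment.State = SegmentState_Dropped
	}
	fmt.Println(modSegIDs, newSegment.State == SegmentState_Dropped) // [10 11] true
}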

View File

@ -205,10 +205,10 @@ func TestMeta_Basic(t *testing.T) {
assert.Nil(t, err)
// check GetSegment
info0_0 := meta.GetSegment(segID0_0)
info0_0 := meta.GetHealthySegment(segID0_0)
assert.NotNil(t, info0_0)
assert.True(t, proto.Equal(info0_0, segInfo0_0))
info1_0 := meta.GetSegment(segID1_0)
info1_0 := meta.GetHealthySegment(segID1_0)
assert.NotNil(t, info1_0)
assert.True(t, proto.Equal(info1_0, segInfo1_0))
@ -240,16 +240,16 @@ func TestMeta_Basic(t *testing.T) {
err = meta.SetState(segID0_0, commonpb.SegmentState_Flushed)
assert.Nil(t, err)
info0_0 = meta.GetSegment(segID0_0)
info0_0 = meta.GetHealthySegment(segID0_0)
assert.NotNil(t, info0_0)
assert.EqualValues(t, commonpb.SegmentState_Flushed, info0_0.State)
info0_0 = meta.GetSegment(segID0_0)
info0_0 = meta.GetHealthySegment(segID0_0)
assert.NotNil(t, info0_0)
assert.Equal(t, true, info0_0.GetIsImporting())
err = meta.UnsetIsImporting(segID0_0)
assert.NoError(t, err)
info0_0 = meta.GetSegment(segID0_0)
info0_0 = meta.GetHealthySegment(segID0_0)
assert.NotNil(t, info0_0)
assert.Equal(t, false, info0_0.GetIsImporting())
@ -257,12 +257,12 @@ func TestMeta_Basic(t *testing.T) {
err = meta.UnsetIsImporting(segID1_0)
assert.Error(t, err)
info1_1 := meta.GetSegment(segID1_1)
info1_1 := meta.GetHealthySegment(segID1_1)
assert.NotNil(t, info1_1)
assert.Equal(t, false, info1_1.GetIsImporting())
err = meta.UnsetIsImporting(segID1_1)
assert.NoError(t, err)
info1_1 = meta.GetSegment(segID1_1)
info1_1 = meta.GetHealthySegment(segID1_1)
assert.NotNil(t, info1_1)
assert.Equal(t, false, info1_1.GetIsImporting())
@ -442,7 +442,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) {
[]*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}, []*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &internalpb.MsgPosition{MsgID: []byte{1, 2, 3}}}})
assert.Nil(t, err)
updated := meta.GetSegment(1)
updated := meta.GetHealthySegment(1)
expected := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
ID: 1, State: commonpb.SegmentState_Flushing, NumOfRows: 10,
StartPosition: &internalpb.MsgPosition{MsgID: []byte{1, 2, 3}},
@ -485,7 +485,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) {
[]*datapb.SegmentStartPosition{{SegmentID: 2, StartPosition: &internalpb.MsgPosition{MsgID: []byte{1, 2, 3}}}})
assert.Nil(t, err)
assert.Nil(t, meta.GetSegment(2))
assert.Nil(t, meta.GetHealthySegment(2))
})
t.Run("test save etcd failed", func(t *testing.T) {
@ -510,7 +510,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) {
[]*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}, []*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &internalpb.MsgPosition{MsgID: []byte{1, 2, 3}}}})
assert.NotNil(t, err)
assert.Equal(t, "mocked fail", err.Error())
segmentInfo = meta.GetSegment(1)
segmentInfo = meta.GetHealthySegment(1)
assert.EqualValues(t, 0, segmentInfo.NumOfRows)
assert.Equal(t, commonpb.SegmentState_Growing, segmentInfo.State)
assert.Nil(t, segmentInfo.Binlogs)
@ -699,7 +699,7 @@ func Test_meta_SetSegmentCompacting(t *testing.T) {
segments: tt.fields.segments,
}
m.SetSegmentCompacting(tt.args.segmentID, tt.args.compacting)
segment := m.GetSegment(tt.args.segmentID)
segment := m.GetHealthySegment(tt.args.segmentID)
assert.Equal(t, tt.args.compacting, segment.isCompacting)
})
}
@ -748,7 +748,7 @@ func Test_meta_SetSegmentImporting(t *testing.T) {
segments: tt.fields.segments,
}
m.SetSegmentCompacting(tt.args.segmentID, tt.args.importing)
segment := m.GetSegment(tt.args.segmentID)
segment := m.GetHealthySegment(tt.args.segmentID)
assert.Equal(t, tt.args.importing, segment.isCompacting)
})
}
@ -871,10 +871,10 @@ func TestMeta_GetAllSegments(t *testing.T) {
},
}
seg1 := m.GetSegment(1)
seg1All := m.GetSegmentUnsafe(1)
seg2 := m.GetSegment(2)
seg2All := m.GetSegmentUnsafe(2)
seg1 := m.GetHealthySegment(1)
seg1All := m.GetSegment(1)
seg2 := m.GetHealthySegment(2)
seg2All := m.GetSegment(2)
assert.NotNil(t, seg1)
assert.NotNil(t, seg1All)
assert.Nil(t, seg2)

View File

@ -242,7 +242,7 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID
// filter segments
segments := make([]*SegmentInfo, 0)
for _, segmentID := range s.segments {
segment := s.meta.GetSegment(segmentID)
segment := s.meta.GetHealthySegment(segmentID)
if segment == nil {
log.Warn("Failed to get seginfo from meta", zap.Int64("id", segmentID))
continue
@ -409,7 +409,7 @@ func (s *SegmentManager) DropSegment(ctx context.Context, segmentID UniqueID) {
break
}
}
segment := s.meta.GetSegment(segmentID)
segment := s.meta.GetHealthySegment(segmentID)
if segment == nil {
log.Warn("Failed to get segment", zap.Int64("id", segmentID))
return
@ -432,7 +432,7 @@ func (s *SegmentManager) SealAllSegments(ctx context.Context, collectionID Uniqu
segCandidates = segIDs
}
for _, id := range segCandidates {
info := s.meta.GetSegment(id)
info := s.meta.GetHealthySegment(id)
if info == nil {
log.Warn("failed to get seg info from meta", zap.Int64("segment ID", id))
continue
@ -472,7 +472,7 @@ func (s *SegmentManager) GetFlushableSegments(ctx context.Context, channel strin
ret := make([]UniqueID, 0, len(s.segments))
for _, id := range s.segments {
info := s.meta.GetSegment(id)
info := s.meta.GetHealthySegment(id)
if info == nil || info.InsertChannel != channel {
continue
}
@ -489,7 +489,7 @@ func (s *SegmentManager) ExpireAllocations(channel string, ts Timestamp) error {
s.mu.Lock()
defer s.mu.Unlock()
for _, id := range s.segments {
segment := s.meta.GetSegment(id)
segment := s.meta.GetHealthySegment(id)
if segment == nil || segment.InsertChannel != channel {
continue
}
@ -510,7 +510,7 @@ func (s *SegmentManager) ExpireAllocations(channel string, ts Timestamp) error {
func (s *SegmentManager) cleanupSealedSegment(ts Timestamp, channel string) {
valids := make([]int64, 0, len(s.segments))
for _, id := range s.segments {
segment := s.meta.GetSegment(id)
segment := s.meta.GetHealthySegment(id)
if segment == nil || segment.InsertChannel != channel {
valids = append(valids, id)
continue
@ -542,7 +542,7 @@ func isEmptySealedSegment(segment *SegmentInfo, ts Timestamp) bool {
func (s *SegmentManager) tryToSealSegment(ts Timestamp, channel string) error {
channelInfo := make(map[string][]*SegmentInfo)
for _, id := range s.segments {
info := s.meta.GetSegment(id)
info := s.meta.GetHealthySegment(id)
if info == nil || info.InsertChannel != channel {
continue
}
@ -583,7 +583,7 @@ func (s *SegmentManager) DropSegmentsOfChannel(ctx context.Context, channel stri
validSegments := make([]int64, 0, len(s.segments))
for _, sid := range s.segments {
segment := s.meta.GetSegment(sid)
segment := s.meta.GetHealthySegment(sid)
if segment == nil {
continue
}

View File

@ -78,7 +78,7 @@ func TestManagerOptions(t *testing.T) {
t.Run("test withChannelSealPolicies", func(t *testing.T) {
opt := withChannelSealPolices(getChannelOpenSegCapacityPolicy(1000))
assert.NotNil(t, opt)
// manaul set nil
// manual set nil
segmentManager.channelSealPolicies = []channelSealPolicy{}
opt.apply(segmentManager)
assert.True(t, len(segmentManager.channelSealPolicies) > 0)
@ -249,7 +249,7 @@ func TestSaveSegmentsToMeta(t *testing.T) {
assert.EqualValues(t, 1, len(allocations))
_, err = segmentManager.SealAllSegments(context.Background(), collID, nil, false)
assert.Nil(t, err)
segment := meta.GetSegment(allocations[0].SegmentID)
segment := meta.GetHealthySegment(allocations[0].SegmentID)
assert.NotNil(t, segment)
assert.EqualValues(t, segment.LastExpireTime, allocations[0].ExpireTime)
assert.EqualValues(t, commonpb.SegmentState_Sealed, segment.State)
@ -271,7 +271,7 @@ func TestSaveSegmentsToMetaWithSpecificSegments(t *testing.T) {
assert.EqualValues(t, 1, len(allocations))
_, err = segmentManager.SealAllSegments(context.Background(), collID, []int64{allocations[0].SegmentID}, false)
assert.Nil(t, err)
segment := meta.GetSegment(allocations[0].SegmentID)
segment := meta.GetHealthySegment(allocations[0].SegmentID)
assert.NotNil(t, segment)
assert.EqualValues(t, segment.LastExpireTime, allocations[0].ExpireTime)
assert.EqualValues(t, commonpb.SegmentState_Sealed, segment.State)
@ -292,11 +292,11 @@ func TestDropSegment(t *testing.T) {
assert.Nil(t, err)
assert.EqualValues(t, 1, len(allocations))
segID := allocations[0].SegmentID
segment := meta.GetSegment(segID)
segment := meta.GetHealthySegment(segID)
assert.NotNil(t, segment)
segmentManager.DropSegment(context.Background(), segID)
segment = meta.GetSegment(segID)
segment = meta.GetHealthySegment(segID)
assert.NotNil(t, segment)
}
@ -354,12 +354,12 @@ func TestExpireAllocation(t *testing.T) {
}
}
segment := meta.GetSegment(id)
segment := meta.GetHealthySegment(id)
assert.NotNil(t, segment)
assert.EqualValues(t, 100, len(segment.allocations))
err = segmentManager.ExpireAllocations("ch1", maxts)
assert.Nil(t, err)
segment = meta.GetSegment(id)
segment = meta.GetHealthySegment(id)
assert.NotNil(t, segment)
assert.EqualValues(t, 0, len(segment.allocations))
}
@ -437,7 +437,7 @@ func TestGetFlushableSegments(t *testing.T) {
ids, err = segmentManager.GetFlushableSegments(context.TODO(), "c1", allocations[0].ExpireTime)
assert.Nil(t, err)
assert.Empty(t, ids)
assert.Nil(t, meta.GetSegment(allocations[0].SegmentID))
assert.Nil(t, meta.GetHealthySegment(allocations[0].SegmentID))
})
}

View File

@ -61,7 +61,7 @@ import (
const (
connEtcdMaxRetryTime = 100
allPartitionID = 0 // paritionID means no filtering
allPartitionID = 0 // partitionID means no filtering
)
var (
@ -681,7 +681,7 @@ func (s *Server) handleTimetickMessage(ctx context.Context, ttMsg *msgstream.Dat
func (s *Server) updateSegmentStatistics(stats []*datapb.SegmentStats) {
for _, stat := range stats {
segment := s.meta.GetSegmentUnsafe(stat.GetSegmentID())
segment := s.meta.GetSegment(stat.GetSegmentID())
if segment == nil {
log.Warn("skip updating row number for not exist segment",
zap.Int64("segment ID", stat.GetSegmentID()),
@ -700,7 +700,7 @@ func (s *Server) updateSegmentStatistics(stats []*datapb.SegmentStats) {
if segment.currRows < stat.GetNumRows() {
log.Info("Updating segment number of rows",
zap.Int64("segment ID", stat.GetSegmentID()),
zap.Int64("old value", s.meta.GetSegmentUnsafe(stat.GetSegmentID()).GetNumOfRows()),
zap.Int64("old value", s.meta.GetSegment(stat.GetSegmentID()).GetNumOfRows()),
zap.Int64("new value", stat.GetNumRows()),
)
s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows())
@ -711,7 +711,7 @@ func (s *Server) updateSegmentStatistics(stats []*datapb.SegmentStats) {
func (s *Server) getFlushableSegmentsInfo(flushableIDs []int64) []*SegmentInfo {
res := make([]*SegmentInfo, 0, len(flushableIDs))
for _, id := range flushableIDs {
sinfo := s.meta.GetSegment(id)
sinfo := s.meta.GetHealthySegment(id)
if sinfo == nil {
log.Error("get segment from meta error", zap.Int64("id", id))
continue
@ -878,7 +878,7 @@ func (s *Server) startFlushLoop(ctx context.Context) {
// 2. notify RootCoord segment is flushed
// 3. change segment state to `Flushed` in meta
func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error {
segment := s.meta.GetSegment(segmentID)
segment := s.meta.GetHealthySegment(segmentID)
if segment == nil {
return errors.New("segment not found, might be a faked segemnt, ignore post flush")
}

View File

@ -1313,7 +1313,7 @@ func TestSaveBinlogPaths(t *testing.T) {
assert.Nil(t, err)
assert.EqualValues(t, resp.ErrorCode, commonpb.ErrorCode_Success)
segment := svr.meta.GetSegment(1)
segment := svr.meta.GetHealthySegment(1)
assert.NotNil(t, segment)
binlogs := segment.GetBinlogs()
assert.EqualValues(t, 1, len(binlogs))
@ -1329,6 +1329,210 @@ func TestSaveBinlogPaths(t *testing.T) {
assert.EqualValues(t, segment.NumOfRows, 10)
})
t.Run("SaveDroppedSegment", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
// vecFieldID := int64(201)
svr.meta.AddCollection(&collectionInfo{
ID: 0,
})
segments := []struct {
id UniqueID
collectionID UniqueID
}{
{0, 0},
{1, 0},
}
for _, segment := range segments {
s := &datapb.SegmentInfo{
ID: segment.id,
CollectionID: segment.collectionID,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
}
err := svr.meta.AddSegment(NewSegmentInfo(s))
assert.Nil(t, err)
}
err := svr.channelManager.AddNode(0)
assert.Nil(t, err)
err = svr.channelManager.Watch(&channel{Name: "ch1", CollectionID: 0})
assert.Nil(t, err)
ctx := context.Background()
resp, err := svr.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
Base: &commonpb.MsgBase{
Timestamp: uint64(time.Now().Unix()),
},
SegmentID: 1,
CollectionID: 0,
Field2BinlogPaths: []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
LogPath: "/by-dev/test/0/1/1/1/Allo1",
EntriesNum: 5,
},
{
LogPath: "/by-dev/test/0/1/1/1/Allo2",
EntriesNum: 5,
},
},
},
},
CheckPoints: []*datapb.CheckPoint{
{
SegmentID: 1,
Position: &internalpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 0,
},
NumOfRows: 12,
},
},
Flushed: false,
})
assert.Nil(t, err)
assert.EqualValues(t, resp.ErrorCode, commonpb.ErrorCode_Success)
segment := svr.meta.GetSegment(1)
assert.NotNil(t, segment)
binlogs := segment.GetBinlogs()
assert.EqualValues(t, 0, len(binlogs))
assert.EqualValues(t, segment.NumOfRows, 0)
})
t.Run("SaveUnhealthySegment", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
// vecFieldID := int64(201)
svr.meta.AddCollection(&collectionInfo{
ID: 0,
})
segments := []struct {
id UniqueID
collectionID UniqueID
}{
{0, 0},
{1, 0},
}
for _, segment := range segments {
s := &datapb.SegmentInfo{
ID: segment.id,
CollectionID: segment.collectionID,
InsertChannel: "ch1",
State: commonpb.SegmentState_NotExist,
}
err := svr.meta.AddSegment(NewSegmentInfo(s))
assert.Nil(t, err)
}
err := svr.channelManager.AddNode(0)
assert.Nil(t, err)
err = svr.channelManager.Watch(&channel{Name: "ch1", CollectionID: 0})
assert.Nil(t, err)
ctx := context.Background()
resp, err := svr.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
Base: &commonpb.MsgBase{
Timestamp: uint64(time.Now().Unix()),
},
SegmentID: 1,
CollectionID: 0,
Field2BinlogPaths: []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
LogPath: "/by-dev/test/0/1/1/1/Allo1",
EntriesNum: 5,
},
{
LogPath: "/by-dev/test/0/1/1/1/Allo2",
EntriesNum: 5,
},
},
},
},
CheckPoints: []*datapb.CheckPoint{
{
SegmentID: 1,
Position: &internalpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 0,
},
NumOfRows: 12,
},
},
Flushed: false,
})
assert.Nil(t, err)
assert.EqualValues(t, resp.ErrorCode, commonpb.ErrorCode_SegmentNotFound)
})
t.Run("SaveNotExistSegment", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
// vecFieldID := int64(201)
svr.meta.AddCollection(&collectionInfo{
ID: 0,
})
err := svr.channelManager.AddNode(0)
assert.Nil(t, err)
err = svr.channelManager.Watch(&channel{Name: "ch1", CollectionID: 0})
assert.Nil(t, err)
ctx := context.Background()
resp, err := svr.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
Base: &commonpb.MsgBase{
Timestamp: uint64(time.Now().Unix()),
},
SegmentID: 1,
CollectionID: 0,
Field2BinlogPaths: []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
LogPath: "/by-dev/test/0/1/1/1/Allo1",
EntriesNum: 5,
},
{
LogPath: "/by-dev/test/0/1/1/1/Allo2",
EntriesNum: 5,
},
},
},
},
CheckPoints: []*datapb.CheckPoint{
{
SegmentID: 1,
Position: &internalpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 0,
},
NumOfRows: 12,
},
},
Flushed: false,
})
assert.Nil(t, err)
assert.EqualValues(t, resp.ErrorCode, commonpb.ErrorCode_SegmentNotFound)
})
t.Run("with channel not matched", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
@ -1764,7 +1968,7 @@ func TestDataNodeTtChannel(t *testing.T) {
assert.EqualValues(t, 1, len(resp.SegIDAssignments))
assignedSegmentID := resp.SegIDAssignments[0].SegID
segment := svr.meta.GetSegment(assignedSegmentID)
segment := svr.meta.GetHealthySegment(assignedSegmentID)
assert.EqualValues(t, 1, len(segment.allocations))
msgPack := msgstream.MsgPack{}
@ -1774,7 +1978,7 @@ func TestDataNodeTtChannel(t *testing.T) {
assert.Nil(t, err)
<-ch
segment = svr.meta.GetSegment(assignedSegmentID)
segment = svr.meta.GetHealthySegment(assignedSegmentID)
assert.EqualValues(t, 0, len(segment.allocations))
})
}
@ -3454,7 +3658,7 @@ func TestDataCoord_SegmentStatistics(t *testing.T) {
})
assert.NoError(t, err)
assert.Equal(t, svr.meta.GetSegment(100).currRows, int64(1))
assert.Equal(t, svr.meta.GetHealthySegment(100).currRows, int64(1))
assert.Equal(t, commonpb.ErrorCode_Success, status.GetErrorCode())
closeTestServer(t, svr)
})
@ -3481,7 +3685,7 @@ func TestDataCoord_SegmentStatistics(t *testing.T) {
})
assert.NoError(t, err)
assert.Equal(t, svr.meta.GetSegment(100).currRows, int64(0))
assert.Equal(t, svr.meta.GetHealthySegment(100).currRows, int64(0))
assert.Equal(t, commonpb.ErrorCode_Success, status.GetErrorCode())
closeTestServer(t, svr)
})

View File

@ -231,7 +231,7 @@ func (s *Server) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentSta
state := &datapb.SegmentStateInfo{
SegmentID: segmentID,
}
segmentInfo := s.meta.GetSegment(segmentID)
segmentInfo := s.meta.GetHealthySegment(segmentID)
if segmentInfo == nil {
state.State = commonpb.SegmentState_NotExist
} else {
@ -256,7 +256,7 @@ func (s *Server) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsert
resp.Status.Reason = serverNotServingErrMsg
return resp, nil
}
segment := s.meta.GetSegment(req.GetSegmentID())
segment := s.meta.GetHealthySegment(req.GetSegmentID())
if segment == nil {
resp.Status.Reason = "segment not found"
return resp, nil
@ -354,7 +354,7 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR
for _, id := range req.SegmentIDs {
var info *SegmentInfo
if req.IncludeUnHealthy {
info = s.meta.GetSegmentUnsafe(id)
info = s.meta.GetSegment(id)
if info == nil {
log.Warn("failed to get segment, this may have been cleaned", zap.Int64("segmentID", id))
@ -371,7 +371,7 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR
segmentutil.ReCalcRowCount(info.SegmentInfo, clonedInfo.SegmentInfo)
infos = append(infos, clonedInfo.SegmentInfo)
} else {
info = s.meta.GetSegment(id)
info = s.meta.GetHealthySegment(id)
if info == nil {
resp.Status.Reason = msgSegmentNotFound(id)
return resp, nil
@ -401,10 +401,13 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
return resp, nil
}
log.Info("receive SaveBinlogPaths request",
log := log.Ctx(ctx).With(
zap.Int64("nodeID", req.GetBase().GetSourceID()),
zap.Int64("collectionID", req.GetCollectionID()),
zap.Int64("segmentID", req.GetSegmentID()),
)
log.Info("receive SaveBinlogPaths request",
zap.Bool("isFlush", req.GetFlushed()),
zap.Bool("isDropped", req.GetDropped()),
zap.Any("startPositions", req.GetStartPositions()),
@ -416,7 +419,17 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
segment := s.meta.GetSegment(segmentID)
if segment == nil {
log.Error("failed to get segment", zap.Int64("segmentID", segmentID))
log.Error("failed to get segment")
failResponseWithCode(resp, commonpb.ErrorCode_SegmentNotFound, fmt.Sprintf("failed to get segment %d", segmentID))
return resp, nil
}
if segment.State == commonpb.SegmentState_Dropped {
log.Info("save to dropped segment, ignore this request")
resp.ErrorCode = commonpb.ErrorCode_Success
return resp, nil
} else if !isSegmentHealthy(segment) {
log.Error("failed to get segment")
failResponseWithCode(resp, commonpb.ErrorCode_SegmentNotFound, fmt.Sprintf("failed to get segment %d", segmentID))
return resp, nil
}
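
The new gate distinguishes three outcomes; below is a compact, self-contained sketch of the resulting control flow (the error-code names follow the patch, everything else is simplified):

package main

import "fmt"

type SegmentState int

const (
	SegmentState_Flushed SegmentState = iota
	SegmentState_Dropped
	SegmentState_NotExist
)

type SegmentInfo struct{ State SegmentState }

func isSegmentHealthy(s *SegmentInfo) bool {
	return s != nil && s.State != SegmentState_Dropped && s.State != SegmentState_NotExist
}

// checkSaveTarget mirrors the new SaveBinlogPaths gate: unknown and unhealthy
// segments fail with SegmentNotFound, while dropped segments are acknowledged
// with Success so DataNodes forwarding deletes to them do not error out.
func checkSaveTarget(segment *SegmentInfo) string {
	switch {
	case segment == nil:
		return "SegmentNotFound"
	case segment.State == SegmentState_Dropped:
		return "Success (request ignored)"
	case !isSegmentHealthy(segment):
		return "SegmentNotFound"
	default:
		return "Success (binlogs saved)"
	}
}

func main() {
	fmt.Println(checkSaveTarget(nil))                                 // SegmentNotFound
	fmt.Println(checkSaveTarget(&SegmentInfo{SegmentState_Dropped}))  // Success (request ignored)
	fmt.Println(checkSaveTarget(&SegmentInfo{SegmentState_NotExist})) // SegmentNotFound
	fmt.Println(checkSaveTarget(&SegmentInfo{SegmentState_Flushed}))  // Success (binlogs saved)
}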
@ -427,7 +440,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
if !s.channelManager.Match(nodeID, channel) {
failResponse(resp, fmt.Sprintf("channel %s is not watched on node %d", channel, nodeID))
resp.ErrorCode = commonpb.ErrorCode_MetaFailed
log.Warn("node is not matched with channel", zap.String("channel", channel), zap.Int64("nodeID", nodeID))
log.Warn("node is not matched with channel", zap.String("channel", channel))
return resp, nil
}
}
@ -448,15 +461,12 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
req.GetCheckPoints(),
req.GetStartPositions())
if err != nil {
log.Error("save binlog and checkpoints failed",
zap.Int64("segmentID", req.GetSegmentID()),
zap.Error(err))
log.Error("save binlog and checkpoints failed", zap.Error(err))
resp.Reason = err.Error()
return resp, nil
}
log.Info("flush segment with meta", zap.Int64("segment id", req.SegmentID),
zap.Any("meta", req.GetField2BinlogPaths()))
log.Info("flush segment with meta", zap.Any("meta", req.GetField2BinlogPaths()))
if req.GetFlushed() {
s.segmentManager.DropSegment(ctx, req.SegmentID)
@ -466,9 +476,9 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
err = s.compactionTrigger.triggerSingleCompaction(segment.GetCollectionID(), segment.GetPartitionID(),
segmentID, segment.GetInsertChannel())
if err != nil {
log.Warn("failed to trigger single compaction", zap.Int64("segment ID", segmentID))
log.Warn("failed to trigger single compaction")
} else {
log.Info("compaction triggered for segment", zap.Int64("segment ID", segmentID))
log.Info("compaction triggered for segment")
}
}
}
@ -533,7 +543,7 @@ func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtual
}
s.segmentManager.DropSegmentsOfChannel(ctx, channel)
// no compaction triggerred in Drop procedure
// no compaction triggered in Drop procedure
resp.Status.ErrorCode = commonpb.ErrorCode_Success
return resp, nil
}
@ -649,7 +659,7 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf
segment2InsertChannel := make(map[UniqueID]string)
segmentsNumOfRows := make(map[UniqueID]int64)
for id := range flushedIDs {
segment := s.meta.GetSegmentUnsafe(id)
segment := s.meta.GetSegment(id)
if segment == nil {
errMsg := fmt.Sprintf("failed to get segment %d", id)
log.Error(errMsg)
@ -759,7 +769,7 @@ func (s *Server) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedS
}
ret := make([]UniqueID, 0, len(segmentIDs))
for _, id := range segmentIDs {
segment := s.meta.GetSegmentUnsafe(id)
segment := s.meta.GetSegment(id)
// if this segment == nil, we assume this segment has been GC'ed
if segment == nil ||
(segment.GetState() != commonpb.SegmentState_Dropped &&
@ -810,7 +820,7 @@ func (s *Server) GetSegmentsByStates(ctx context.Context, req *datapb.GetSegment
statesDict[state] = true
}
for _, id := range segmentIDs {
segment := s.meta.GetSegment(id)
segment := s.meta.GetHealthySegment(id)
if segment != nil && statesDict[segment.GetState()] {
ret = append(ret, id)
}
@ -1131,7 +1141,7 @@ func (s *Server) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateR
var unflushed []UniqueID
for _, sid := range req.GetSegmentIDs() {
segment := s.meta.GetSegment(sid)
segment := s.meta.GetHealthySegment(sid)
// segment is nil if it was compacted or it's an empty segment that was set to dropped
if segment == nil || segment.GetState() == commonpb.SegmentState_Flushing ||
segment.GetState() == commonpb.SegmentState_Flushed {
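
The added comment explains the GetFlushState side of the fix: a segment that was compacted away, or dropped because it ended up empty, must count as flushed, or a flush would wait on it indefinitely. A minimal sketch of that predicate, with simplified types:

package main

import "fmt"

type SegmentState int

const (
	SegmentState_Growing SegmentState = iota
	SegmentState_Flushing
	SegmentState_Flushed
)

type SegmentInfo struct{ State SegmentState }

// unblocked reports whether a segment no longer holds back GetFlushState:
// nil covers segments compacted away or dropped as empty.
func unblocked(segment *SegmentInfo) bool {
	return segment == nil ||
		segment.State == SegmentState_Flushing ||
		segment.State == SegmentState_Flushed
}

func main() {
	fmt.Println(unblocked(nil))                                // true: dropped/compacted segment
	fmt.Println(unblocked(&SegmentInfo{SegmentState_Growing})) // false: still unflushed
}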

View File

@ -129,22 +129,22 @@ func (kc *Catalog) parseBinlogKey(key string, prefixIdx int) (int64, int64, int6
remainedKey := key[prefixIdx:]
keyWordGroup := strings.Split(remainedKey, "/")
if len(keyWordGroup) < 3 {
return 0, 0, 0, fmt.Errorf("parse key: %s faild, trimed key:%s", key, remainedKey)
return 0, 0, 0, fmt.Errorf("parse key: %s failed, trimmed key:%s", key, remainedKey)
}
collectionID, err := strconv.ParseInt(keyWordGroup[0], 10, 64)
if err != nil {
return 0, 0, 0, fmt.Errorf("parse key: %s faild, trimed key:%s, %w", key, remainedKey, err)
return 0, 0, 0, fmt.Errorf("parse key: %s failed, trimmed key:%s, %w", key, remainedKey, err)
}
partitionID, err := strconv.ParseInt(keyWordGroup[1], 10, 64)
if err != nil {
return 0, 0, 0, fmt.Errorf("parse key: %s faild, trimed key:%s, %w", key, remainedKey, err)
return 0, 0, 0, fmt.Errorf("parse key: %s failed, trimmed key:%s, %w", key, remainedKey, err)
}
segmentID, err := strconv.ParseInt(keyWordGroup[2], 10, 64)
if err != nil {
return 0, 0, 0, fmt.Errorf("parse key: %s faild, trimed key:%s, %w", key, remainedKey, err)
return 0, 0, 0, fmt.Errorf("parse key: %s failed, trimmed key:%s, %w", key, remainedKey, err)
}
return collectionID, partitionID, segmentID, nil
@ -386,12 +386,12 @@ func (kc *Catalog) AlterSegmentsAndAddNewSegment(ctx context.Context, segments [
}
if newSegment != nil {
if newSegment.GetNumOfRows() > 0 {
segmentKvs, err := buildSegmentAndBinlogsKvs(newSegment)
if err != nil {
return err
}
maps.Copy(kvs, segmentKvs)
segmentKvs, err := buildSegmentAndBinlogsKvs(newSegment)
if err != nil {
return err
}
maps.Copy(kvs, segmentKvs)
if newSegment.NumOfRows > 0 {
kc.collectMetrics(newSegment)
}
}
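
The catalog change mirrors the meta change: the new segment's kvs are now written unconditionally (previously skipped when it had zero rows), while metrics collection keeps the row-count guard. A trimmed, standalone sketch of the reordered logic, with the real helpers buildSegmentAndBinlogsKvs and collectMetrics stubbed out:

package main

import "fmt"

type SegmentInfo struct {
	ID        int64
	NumOfRows int64
}

// Stubs for the real catalog helpers referenced in the hunk above.
func buildSegmentAndBinlogsKvs(s *SegmentInfo) (map[string]string, error) {
	return map[string]string{fmt.Sprintf("segment/%d", s.ID): "..."}, nil
}

func collectMetrics(s *SegmentInfo) { fmt.Println("metrics for", s.ID) }

func saveNewSegment(kvs map[string]string, newSegment *SegmentInfo) error {
	if newSegment != nil {
		// Always persist the new segment, even when it is empty...
		segmentKvs, err := buildSegmentAndBinlogsKvs(newSegment)
		if err != nil {
			return err
		}
		for k, v := range segmentKvs {
			kvs[k] = v
		}
		// ...but only count rows toward metrics when there are any.
		if newSegment.NumOfRows > 0 {
			collectMetrics(newSegment)
		}
	}
	return nil
}

func main() {
	kvs := map[string]string{}
	_ = saveNewSegment(kvs, &SegmentInfo{ID: 12, NumOfRows: 0})
	fmt.Println(len(kvs)) // 1: the empty segment is persisted anyway
}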