From 4bebca64161a962484ed73853ab43f4f9306ec01 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Mon, 10 Mar 2025 12:16:03 +0800 Subject: [PATCH] enhance: Replace currRows with NumOfRows (#40074) See also: #40068 --------- Signed-off-by: yangxuan --- internal/datacoord/compaction_test.go | 4 -- internal/datacoord/import_util.go | 4 +- internal/datacoord/import_util_test.go | 12 ++-- internal/datacoord/index_service_test.go | 5 -- internal/datacoord/meta.go | 49 +++++++++------- internal/datacoord/meta_test.go | 18 +++--- .../datacoord/segment_allocation_policy.go | 15 +++-- .../segment_allocation_policy_test.go | 20 +------ internal/datacoord/segment_info.go | 15 ++--- internal/datacoord/segment_manager.go | 2 +- internal/datacoord/segment_manager_test.go | 22 ++++---- internal/datacoord/server.go | 30 ---------- internal/datacoord/server_test.go | 56 ------------------- internal/datacoord/services.go | 13 ++--- internal/datacoord/services_test.go | 2 +- internal/flushcommon/broker/broker.go | 1 - internal/flushcommon/broker/datacoord.go | 12 ---- internal/flushcommon/broker/datacoord_test.go | 38 ------------- internal/flushcommon/broker/mock_broker.go | 47 ---------------- .../pipeline/data_sync_service_test.go | 1 - .../flushcommon/pipeline/testutils_test.go | 4 -- pkg/proto/data_coord.proto | 1 + pkg/proto/datapb/data_coord_grpc.pb.go | 2 + 23 files changed, 83 insertions(+), 290 deletions(-) diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index b50edf0e37..d8d1c90231 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -138,7 +138,6 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWith1ParallelTask() { CollectionID: 2, PartitionID: 3, }, - currRows: 0, allocations: nil, lastFlushTime: time.Time{}, isCompacting: false, @@ -260,7 +259,6 @@ func (s *CompactionPlanHandlerSuite) TestScheduleWithSlotLimit() { CollectionID: 2, PartitionID: 3, }, - currRows: 0, allocations: nil, lastFlushTime: time.Time{}, isCompacting: false, @@ -353,7 +351,6 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWithL0Executing() { CollectionID: 2, PartitionID: 3, }, - currRows: 0, allocations: nil, lastFlushTime: time.Time{}, isCompacting: false, @@ -633,7 +630,6 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() { CollectionID: 2, PartitionID: 3, }, - currRows: 0, allocations: nil, lastFlushTime: time.Time{}, isCompacting: false, diff --git a/internal/datacoord/import_util.go b/internal/datacoord/import_util.go index c7949f45dd..a6a43931d0 100644 --- a/internal/datacoord/import_util.go +++ b/internal/datacoord/import_util.go @@ -443,7 +443,7 @@ func getImportRowsInfo(jobID int64, imeta ImportMeta, meta *meta) (importedRows, }) segmentIDs = append(segmentIDs, task.(*importTask).GetSegmentIDs()...) } - importedRows = meta.GetSegmentsTotalCurrentRows(segmentIDs) + importedRows = meta.GetSegmentsTotalNumRows(segmentIDs) return } @@ -553,7 +553,7 @@ func GetTaskProgresses(jobID int64, imeta ImportMeta, meta *meta) []*internalpb. 
totalRows := lo.SumBy(task.GetFileStats(), func(file *datapb.ImportFileStats) int64 { return file.GetTotalRows() }) - importedRows := meta.GetSegmentsTotalCurrentRows(task.(*importTask).GetSegmentIDs()) + importedRows := meta.GetSegmentsTotalNumRows(task.(*importTask).GetSegmentIDs()) progress := int64(100) if totalRows != 0 { progress = int64(float32(importedRows) / float32(totalRows) * 100) diff --git a/internal/datacoord/import_util_test.go b/internal/datacoord/import_util_test.go index 31af868a69..b06fdfa45e 100644 --- a/internal/datacoord/import_util_test.go +++ b/internal/datacoord/import_util_test.go @@ -652,15 +652,15 @@ func TestImportUtil_GetImportProgress(t *testing.T) { err = imeta.AddTask(context.TODO(), it1) assert.NoError(t, err) err = meta.AddSegment(ctx, &SegmentInfo{ - SegmentInfo: &datapb.SegmentInfo{ID: 10, IsImporting: true, State: commonpb.SegmentState_Flushed}, currRows: 50, + SegmentInfo: &datapb.SegmentInfo{ID: 10, IsImporting: true, State: commonpb.SegmentState_Flushed, NumOfRows: 50}, }) assert.NoError(t, err) err = meta.AddSegment(ctx, &SegmentInfo{ - SegmentInfo: &datapb.SegmentInfo{ID: 11, IsImporting: true, State: commonpb.SegmentState_Flushed}, currRows: 50, + SegmentInfo: &datapb.SegmentInfo{ID: 11, IsImporting: true, State: commonpb.SegmentState_Flushed, NumOfRows: 50}, }) assert.NoError(t, err) err = meta.AddSegment(ctx, &SegmentInfo{ - SegmentInfo: &datapb.SegmentInfo{ID: 12, IsImporting: true, State: commonpb.SegmentState_Flushed}, currRows: 50, + SegmentInfo: &datapb.SegmentInfo{ID: 12, IsImporting: true, State: commonpb.SegmentState_Flushed, NumOfRows: 50}, }) assert.NoError(t, err) @@ -681,15 +681,15 @@ func TestImportUtil_GetImportProgress(t *testing.T) { err = imeta.AddTask(context.TODO(), it2) assert.NoError(t, err) err = meta.AddSegment(ctx, &SegmentInfo{ - SegmentInfo: &datapb.SegmentInfo{ID: 20, IsImporting: true, State: commonpb.SegmentState_Flushed}, currRows: 50, + SegmentInfo: &datapb.SegmentInfo{ID: 20, IsImporting: true, State: commonpb.SegmentState_Flushed, NumOfRows: 50}, }) assert.NoError(t, err) err = meta.AddSegment(ctx, &SegmentInfo{ - SegmentInfo: &datapb.SegmentInfo{ID: 21, IsImporting: true, State: commonpb.SegmentState_Flushed}, currRows: 50, + SegmentInfo: &datapb.SegmentInfo{ID: 21, IsImporting: true, State: commonpb.SegmentState_Flushed, NumOfRows: 50}, }) assert.NoError(t, err) err = meta.AddSegment(ctx, &SegmentInfo{ - SegmentInfo: &datapb.SegmentInfo{ID: 22, IsImporting: true, State: commonpb.SegmentState_Flushed}, currRows: 50, + SegmentInfo: &datapb.SegmentInfo{ID: 22, IsImporting: true, State: commonpb.SegmentState_Flushed, NumOfRows: 50}, }) assert.NoError(t, err) diff --git a/internal/datacoord/index_service_test.go b/internal/datacoord/index_service_test.go index 82176d1954..25c5d2c15c 100644 --- a/internal/datacoord/index_service_test.go +++ b/internal/datacoord/index_service_test.go @@ -777,7 +777,6 @@ func TestServer_GetIndexState(t *testing.T) { Timestamp: createTS - 1, }, }, - currRows: 0, allocations: nil, lastFlushTime: time.Time{}, isCompacting: false, @@ -836,7 +835,6 @@ func TestServer_GetIndexState(t *testing.T) { Timestamp: createTS - 1, }, }, - currRows: 0, allocations: nil, lastFlushTime: time.Time{}, isCompacting: false, @@ -1014,7 +1012,6 @@ func TestServer_GetSegmentIndexState(t *testing.T) { PartitionID: partID, InsertChannel: "ch", }, - currRows: 0, allocations: nil, lastFlushTime: time.Time{}, isCompacting: false, @@ -1134,7 +1131,6 @@ func TestServer_GetIndexBuildProgress(t *testing.T) { 
Timestamp: createTS, }, }, - currRows: 10250, allocations: nil, lastFlushTime: time.Time{}, isCompacting: false, @@ -1181,7 +1177,6 @@ func TestServer_GetIndexBuildProgress(t *testing.T) { Timestamp: createTS, }, }, - currRows: 10250, allocations: nil, lastFlushTime: time.Time{}, isCompacting: false, diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index bd5aa004b1..ddb5ecb22e 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -647,7 +647,7 @@ func (m *meta) GetAllSegmentsUnsafe() []*SegmentInfo { return m.segments.GetSegments() } -func (m *meta) GetSegmentsTotalCurrentRows(segmentIDs []UniqueID) int64 { +func (m *meta) GetSegmentsTotalNumRows(segmentIDs []UniqueID) int64 { m.RLock() defer m.RUnlock() var sum int64 = 0 @@ -657,7 +657,7 @@ func (m *meta) GetSegmentsTotalCurrentRows(segmentIDs []UniqueID) int64 { log.Ctx(context.TODO()).Warn("cannot find segment", zap.Int64("segmentID", segmentID)) continue } - sum += segment.currRows + sum += segment.GetNumOfRows() } return sum } @@ -1037,6 +1037,9 @@ func UpdateCheckPointOperator(segmentID int64, checkpoints []*datapb.CheckPoint) return false } + var cpNumRows int64 + + // Set segment dml position for _, cp := range checkpoints { if cp.SegmentID != segmentID { // Don't think this is gonna to happen, ignore for now. @@ -1050,18 +1053,22 @@ func UpdateCheckPointOperator(segmentID int64, checkpoints []*datapb.CheckPoint) continue } - segment.NumOfRows = cp.NumOfRows + cpNumRows = cp.NumOfRows segment.DmlPosition = cp.GetPosition() } + // update segments num rows count := segmentutil.CalcRowCountFromBinLog(segment.SegmentInfo) - if count != segment.currRows && count > 0 { - log.Ctx(context.TODO()).Info("check point reported inconsistent with bin log row count", - zap.Int64("segmentID", segmentID), - zap.Int64("current rows (wrong)", segment.currRows), - zap.Int64("segment bin log row count (correct)", count)) + if count > 0 { + if cpNumRows != count { + log.Ctx(context.TODO()).Info("check point reported row count inconsistent with binlog row count", + zap.Int64("segmentID", segmentID), + zap.Int64("binlog reported (wrong)", cpNumRows), + zap.Int64("segment binlog row count (correct)", count)) + } segment.NumOfRows = count } + return true } } @@ -1074,7 +1081,6 @@ func UpdateImportedRows(segmentID int64, rows int64) UpdateOperator { zap.Int64("segmentID", segmentID)) return false } - segment.currRows = rows segment.NumOfRows = rows segment.MaxRowNum = rows return true @@ -1261,8 +1267,7 @@ func (m *meta) mergeDropSegment(seg2Drop *SegmentInfo) (*SegmentInfo, *segMetric if seg2Drop.GetDmlPosition() != nil { clonedSegment.DmlPosition = seg2Drop.GetDmlPosition() } - clonedSegment.currRows = seg2Drop.currRows - clonedSegment.NumOfRows = seg2Drop.currRows + clonedSegment.NumOfRows = seg2Drop.GetNumOfRows() return clonedSegment, metricMutation } @@ -1417,6 +1422,12 @@ func (m *meta) AddAllocation(segmentID UniqueID, allocation *Allocation) error { return nil } +func (m *meta) SetRowCount(segmentID UniqueID, rowCount int64) { + m.Lock() + defer m.Unlock() + m.segments.SetRowCount(segmentID, rowCount) +} + // SetAllocations set Segment allocations, will overwrite ALL original allocations // Note that allocations is not persisted in KV store func (m *meta) SetAllocations(segmentID UniqueID, allocations []*Allocation) { @@ -1425,14 +1436,6 @@ func (m *meta) SetAllocations(segmentID UniqueID, allocations []*Allocation) { m.segments.SetAllocations(segmentID, allocations) } -// SetCurrentRows set current row 
count for segment with provided `segmentID` -// Note that currRows is not persisted in KV store -func (m *meta) SetCurrentRows(segmentID UniqueID, rows int64) { - m.Lock() - defer m.Unlock() - m.segments.SetCurrentRows(segmentID, rows) -} - // SetLastExpire set lastExpire time for segment // Note that last is not necessary to store in KV meta func (m *meta) SetLastExpire(segmentID UniqueID, lastExpire uint64) { @@ -1451,6 +1454,14 @@ func (m *meta) SetLastFlushTime(segmentID UniqueID, t time.Time) { m.segments.SetFlushTime(segmentID, t) } +// SetLastWrittenTime set LastWrittenTime for segment with provided `segmentID` +// Note that lastWrittenTime is not persisted in KV store +func (m *meta) SetLastWrittenTime(segmentID UniqueID) { + m.Lock() + defer m.Unlock() + m.segments.SetLastWrittenTime(segmentID) +} + func (m *meta) CheckSegmentsStating(ctx context.Context, segmentIDs []UniqueID) (exist bool, hasStating bool) { m.RLock() defer m.RUnlock() diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index e4ad6d6acd..f8c0660632 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -810,7 +810,7 @@ func TestUpdateSegmentsInfo(t *testing.T) { segment1 := NewSegmentInfo(&datapb.SegmentInfo{ ID: 1, State: commonpb.SegmentState_Growing, - Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)}, + Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDsWithEntry(1, 1, 222)}, Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)}, }) err = meta.AddSegment(context.TODO(), segment1) @@ -820,11 +820,11 @@ func TestUpdateSegmentsInfo(t *testing.T) { err = meta.UpdateSegmentsInfo( context.TODO(), - UpdateStatusOperator(1, commonpb.SegmentState_Flushing), + UpdateStatusOperator(1, commonpb.SegmentState_Growing), AddBinlogsOperator(1, - []*datapb.FieldBinlog{getFieldBinlogIDsWithEntry(1, 10, 1)}, - []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1)}, - []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: "", LogID: 2}}}}, + []*datapb.FieldBinlog{getFieldBinlogIDsWithEntry(1, 10, 333)}, + []*datapb.FieldBinlog{getFieldBinlogIDs(1, 334)}, + []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogID: 335}}}}, []*datapb.FieldBinlog{}, ), UpdateStartPosition([]*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}), @@ -837,9 +837,9 @@ func TestUpdateSegmentsInfo(t *testing.T) { assert.EqualValues(t, 1, updated.getDeltaCount()) expected := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ - ID: 1, State: commonpb.SegmentState_Flushing, NumOfRows: 10, + ID: 1, State: commonpb.SegmentState_Growing, NumOfRows: 11, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}, - Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 0, 1)}, + Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 222, 333)}, Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 0, 1)}, Deltalogs: []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000}}}}, }} @@ -1202,9 +1202,9 @@ func TestMeta_HasSegments(t *testing.T) { segments: map[UniqueID]*SegmentInfo{ 1: { SegmentInfo: &datapb.SegmentInfo{ - ID: 1, + ID: 1, + NumOfRows: 100, }, - currRows: 100, }, }, }, diff --git a/internal/datacoord/segment_allocation_policy.go b/internal/datacoord/segment_allocation_policy.go index d80ba9db47..18367a2727 100644 --- 
a/internal/datacoord/segment_allocation_policy.go +++ b/internal/datacoord/segment_allocation_policy.go @@ -147,12 +147,13 @@ func (f segmentSealPolicyFunc) ShouldSeal(segment *SegmentInfo, ts Timestamp) (b } // sealL1SegmentByCapacity get segmentSealPolicy with segment size factor policy +// TODO: change numOfRows to size func sealL1SegmentByCapacity(sizeFactor float64) segmentSealPolicyFunc { return func(segment *SegmentInfo, ts Timestamp) (bool, string) { jitter := paramtable.Get().DataCoordCfg.SegmentSealProportionJitter.GetAsFloat() ratio := (1 - jitter*rand.Float64()) - return float64(segment.currRows) >= sizeFactor*float64(segment.GetMaxRowNum())*ratio, - fmt.Sprintf("Row count capacity full, current rows: %d, max row: %d, seal factor: %f, jitter ratio: %f", segment.currRows, segment.GetMaxRowNum(), sizeFactor, ratio) + return float64(segment.GetNumOfRows()) >= sizeFactor*float64(segment.GetMaxRowNum())*ratio, + fmt.Sprintf("Row count capacity full, current rows: %d, max row: %d, seal factor: %f, jitter ratio: %f", segment.GetNumOfRows(), segment.GetMaxRowNum(), sizeFactor, ratio) } } @@ -196,12 +197,13 @@ func sealL1SegmentByBinlogFileNumber(maxBinlogFileNumber int) segmentSealPolicyF // into this segment anymore, so sealLongTimeIdlePolicy will seal these segments to trigger handoff of query cluster. // Q: Why we don't decrease the expiry time directly? // A: We don't want to influence segments which are accepting `frequent small` batch entities. +// TODO: replace rowNum with segment size func sealL1SegmentByIdleTime(idleTimeTolerance time.Duration, minSizeToSealIdleSegment float64, maxSizeOfSegment float64) segmentSealPolicyFunc { return func(segment *SegmentInfo, ts Timestamp) (bool, string) { limit := (minSizeToSealIdleSegment / maxSizeOfSegment) * float64(segment.GetMaxRowNum()) return time.Since(segment.lastWrittenTime) > idleTimeTolerance && - float64(segment.currRows) > limit, - fmt.Sprintf("segment idle, segment row number :%d, last written time: %v, max idle duration: %v", segment.currRows, segment.lastWrittenTime, idleTimeTolerance) + float64(segment.GetNumOfRows()) > limit, + fmt.Sprintf("segment idle, segment row number :%d, last written time: %v, max idle duration: %v", segment.GetNumOfRows(), segment.lastWrittenTime, idleTimeTolerance) } } @@ -264,10 +266,7 @@ func flushPolicyL1(segment *SegmentInfo, t Timestamp) bool { segment.Level != datapb.SegmentLevel_L0 && time.Since(segment.lastFlushTime) >= paramtable.Get().DataCoordCfg.SegmentFlushInterval.GetAsDuration(time.Second) && segment.GetLastExpireTime() <= t && - // A corner case when there's only 1 row in the segment, and the segment is synced - // before report currRows to DC. When DN recovered at this moment, - // it'll never report this segment's numRows again, leaving segment.currRows == 0 forever. - (segment.currRows != 0 || segment.GetNumOfRows() != 0) && + segment.GetNumOfRows() != 0 && // Decoupling the importing segment from the flush process, // This check avoids notifying the datanode to flush the // importing segment which may not exist. 
diff --git a/internal/datacoord/segment_allocation_policy_test.go b/internal/datacoord/segment_allocation_policy_test.go index b3bf6b4b54..ce794b4e5a 100644 --- a/internal/datacoord/segment_allocation_policy_test.go +++ b/internal/datacoord/segment_allocation_policy_test.go @@ -246,10 +246,10 @@ func Test_sealLongTimeIdlePolicy(t *testing.T) { seg1 := &SegmentInfo{lastWrittenTime: time.Now().Add(idleTimeTolerance * 5)} shouldSeal, _ := policy.ShouldSeal(seg1, 100) assert.False(t, shouldSeal) - seg2 := &SegmentInfo{lastWrittenTime: getZeroTime(), currRows: 1, SegmentInfo: &datapb.SegmentInfo{MaxRowNum: 10000}} + seg2 := &SegmentInfo{lastWrittenTime: getZeroTime(), SegmentInfo: &datapb.SegmentInfo{MaxRowNum: 10000, NumOfRows: 1}} shouldSeal, _ = policy.ShouldSeal(seg2, 100) assert.False(t, shouldSeal) - seg3 := &SegmentInfo{lastWrittenTime: getZeroTime(), currRows: 1000, SegmentInfo: &datapb.SegmentInfo{MaxRowNum: 10000}} + seg3 := &SegmentInfo{lastWrittenTime: getZeroTime(), SegmentInfo: &datapb.SegmentInfo{MaxRowNum: 10000, NumOfRows: 1000}} shouldSeal, _ = policy.ShouldSeal(seg3, 100) assert.True(t, shouldSeal) } @@ -287,19 +287,3 @@ func Test_sealByTotalGrowingSegmentsSize(t *testing.T) { assert.Equal(t, 1, len(res)) assert.Equal(t, seg2.GetID(), res[0].GetID()) } - -func TestFlushPolicyWithZeroCurRows(t *testing.T) { - seg := &SegmentInfo{ - currRows: 0, - // lastFlushTime unset because its a sealed segment - SegmentInfo: &datapb.SegmentInfo{ - NumOfRows: 1, - State: commonpb.SegmentState_Sealed, - Level: datapb.SegmentLevel_L1, - LastExpireTime: 456094911979061251, - }, - } - - flushed := flushPolicyL1(seg, tsoutil.GetCurrentTime()) - assert.True(t, flushed) -} diff --git a/internal/datacoord/segment_info.go b/internal/datacoord/segment_info.go index 8a395b4d17..3af851a3d4 100644 --- a/internal/datacoord/segment_info.go +++ b/internal/datacoord/segment_info.go @@ -50,7 +50,6 @@ type segmentInfoIndexes struct { // SegmentInfo wraps datapb.SegmentInfo and patches some extra info on it type SegmentInfo struct { *datapb.SegmentInfo - currRows int64 allocations []*Allocation lastFlushTime time.Time isCompacting bool @@ -70,7 +69,6 @@ type SegmentInfo struct { func NewSegmentInfo(info *datapb.SegmentInfo) *SegmentInfo { s := &SegmentInfo{ SegmentInfo: info, - currRows: info.GetNumOfRows(), } // setup growing fields if s.GetState() == commonpb.SegmentState_Growing { @@ -258,12 +256,12 @@ func (s *SegmentsInfo) AddAllocation(segmentID UniqueID, allocation *Allocation) } } -// SetCurrentRows sets rows count for segment +// SetLastWrittenTime updates segment last written time to now.
// if the segment is not found, do nothing // uses `ShadowClone` since internal SegmentInfo is not changed -func (s *SegmentsInfo) SetCurrentRows(segmentID UniqueID, rows int64) { +func (s *SegmentsInfo) SetLastWrittenTime(segmentID UniqueID) { if segment, ok := s.segments[segmentID]; ok { - s.segments[segmentID] = segment.ShadowClone(SetCurrentRows(rows)) + s.segments[segmentID] = segment.ShadowClone(SetLastWrittenTime()) } } @@ -326,7 +324,6 @@ func (s *SegmentInfo) Clone(opts ...SegmentInfoOption) *SegmentInfo { info := proto.Clone(s.SegmentInfo).(*datapb.SegmentInfo) cloned := &SegmentInfo{ SegmentInfo: info, - currRows: s.currRows, allocations: s.allocations, lastFlushTime: s.lastFlushTime, isCompacting: s.isCompacting, @@ -343,7 +340,6 @@ func (s *SegmentInfo) ShadowClone(opts ...SegmentInfoOption) *SegmentInfo { cloned := &SegmentInfo{ SegmentInfo: s.SegmentInfo, - currRows: s.currRows, allocations: s.allocations, lastFlushTime: s.lastFlushTime, isCompacting: s.isCompacting, @@ -457,10 +453,9 @@ func AddAllocation(allocation *Allocation) SegmentInfoOption { } } -// SetCurrentRows is the option to set current row count for segment info -func SetCurrentRows(rows int64) SegmentInfoOption { +// SetLastWrittenTime is the option to set last written time for segment info +func SetLastWrittenTime() SegmentInfoOption { return func(segment *SegmentInfo) { - segment.currRows = rows segment.lastWrittenTime = time.Now() } } diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go index ac1f080372..682e019162 100644 --- a/internal/datacoord/segment_manager.go +++ b/internal/datacoord/segment_manager.go @@ -582,7 +582,7 @@ func (s *SegmentManager) CleanZeroSealedSegmentsOfChannel(ctx context.Context, c return true } // Check if segment is empty - if segment.GetLastExpireTime() > 0 && segment.GetLastExpireTime() < cpTs && segment.currRows == 0 && segment.GetNumOfRows() == 0 { + if segment.GetLastExpireTime() > 0 && segment.GetLastExpireTime() < cpTs && segment.GetNumOfRows() == 0 { log.Info("try remove empty sealed segment after channel cp updated", zap.Int64("collection", segment.CollectionID), zap.Int64("segment", id), zap.String("channel", channel), zap.Any("cpTs", cpTs)) diff --git a/internal/datacoord/segment_manager_test.go b/internal/datacoord/segment_manager_test.go index f5e3d48f0e..d67be39d2f 100644 --- a/internal/datacoord/segment_manager_test.go +++ b/internal/datacoord/segment_manager_test.go @@ -243,9 +243,12 @@ func TestLastExpireReset(t *testing.T) { segmentID3, expire3 := allocs[0].SegmentID, allocs[0].ExpireTime // simulate handleTimeTick op on dataCoord - meta.SetCurrentRows(segmentID1, bigRows) - meta.SetCurrentRows(segmentID2, bigRows) - meta.SetCurrentRows(segmentID3, smallRows) + // meta.SetLastWrittenTime(segmentID1, bigRows) + // meta.SetLastWrittenTime(segmentID2, bigRows) + // meta.SetLastWrittenTime(segmentID3, smallRows) + meta.SetRowCount(segmentID1, bigRows) + meta.SetRowCount(segmentID2, bigRows) + meta.SetRowCount(segmentID3, smallRows) err = segmentManager.tryToSealSegment(context.TODO(), expire1, channelName) assert.NoError(t, err) assert.Equal(t, commonpb.SegmentState_Sealed, meta.GetSegment(context.TODO(), segmentID1).GetState()) @@ -268,9 +271,12 @@ func TestLastExpireReset(t *testing.T) { assert.Nil(t, err) newSegmentManager, _ := newSegmentManager(restartedMeta, mockAllocator) // reset row number to avoid being cleaned by empty segment -
restartedMeta.SetCurrentRows(segmentID1, bigRows) - restartedMeta.SetCurrentRows(segmentID2, bigRows) - restartedMeta.SetCurrentRows(segmentID3, smallRows) + restartedMeta.SetRowCount(segmentID1, bigRows) + restartedMeta.SetRowCount(segmentID2, bigRows) + restartedMeta.SetRowCount(segmentID3, smallRows) // verify lastExpire of growing and sealed segments segment1, segment2, segment3 := restartedMeta.GetSegment(context.TODO(), segmentID1), restartedMeta.GetSegment(context.TODO(), segmentID2), restartedMeta.GetSegment(context.TODO(), segmentID3) @@ -498,7 +501,7 @@ func TestGetFlushableSegments(t *testing.T) { assert.EqualValues(t, 1, len(ids)) assert.EqualValues(t, allocations[0].SegmentID, ids[0]) - meta.SetCurrentRows(allocations[0].SegmentID, 1) + meta.SetRowCount(allocations[0].SegmentID, 1) ids, err = segmentManager.GetFlushableSegments(context.TODO(), "c1", allocations[0].ExpireTime) assert.NoError(t, err) assert.EqualValues(t, 1, len(ids)) @@ -515,7 +518,7 @@ func TestGetFlushableSegments(t *testing.T) { assert.EqualValues(t, 1, len(ids)) assert.EqualValues(t, allocations[0].SegmentID, ids[0]) - meta.SetCurrentRows(allocations[0].SegmentID, 0) + meta.SetRowCount(allocations[0].SegmentID, 0) postions := make([]*msgpb.MsgPosition, 0) cpTs := allocations[0].ExpireTime + 1 postions = append(postions, &msgpb.MsgPosition{ @@ -949,7 +952,6 @@ func TestSegmentManager_CleanZeroSealedSegmentsOfChannel(t *testing.T) { NumOfRows: 1, LastExpireTime: 100, }, - currRows: 1, } seg2 := &SegmentInfo{ SegmentInfo: &datapb.SegmentInfo{ @@ -960,7 +962,6 @@ func TestSegmentManager_CleanZeroSealedSegmentsOfChannel(t *testing.T) { NumOfRows: 0, LastExpireTime: 100, }, - currRows: 0, } seg3 := &SegmentInfo{ SegmentInfo: &datapb.SegmentInfo{ @@ -980,7 +981,6 @@ func TestSegmentManager_CleanZeroSealedSegmentsOfChannel(t *testing.T) { NumOfRows: 1, LastExpireTime: 100, }, - currRows: 1, } newMetaFunc := func() *meta { return &meta{ diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 405893459c..4bbb1dc9b7 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -784,36 +784,6 @@ func (s *Server) startTaskScheduler() { s.startCollectMetaMetrics(s.serverLoopCtx) } -func (s *Server) updateSegmentStatistics(ctx context.Context, stats []*commonpb.SegmentStats) { - log := log.Ctx(ctx) - for _, stat := range stats { - segment := s.meta.GetSegment(ctx, stat.GetSegmentID()) - if segment == nil { - log.Warn("skip updating row number for not exist segment", - zap.Int64("segmentID", stat.GetSegmentID()), - zap.Int64("new value", stat.GetNumRows())) - continue - } - - if isFlushState(segment.GetState()) { - log.Warn("skip updating row number for flushed segment", - zap.Int64("segmentID", stat.GetSegmentID()), - zap.Int64("new value", stat.GetNumRows())) - continue - } - - // Log if # of rows is updated. 
- if segment.currRows < stat.GetNumRows() { - log.Debug("Updating segment number of rows", - zap.Int64("segmentID", stat.GetSegmentID()), - zap.Int64("old value", s.meta.GetSegment(ctx, stat.GetSegmentID()).GetNumOfRows()), - zap.Int64("new value", stat.GetNumRows()), - ) - s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows()) - } - } -} - func (s *Server) getFlushableSegmentsInfo(ctx context.Context, flushableIDs []int64) []*SegmentInfo { log := log.Ctx(ctx) res := make([]*SegmentInfo, 0, len(flushableIDs)) diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 5ae91b005a..d4ec1ef332 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -2274,62 +2274,6 @@ func TestDataCoordServer_SetSegmentState(t *testing.T) { }) } -func TestDataCoord_SegmentStatistics(t *testing.T) { - t.Run("test update imported segment stat", func(t *testing.T) { - svr := newTestServer(t) - - seg1 := &datapb.SegmentInfo{ - ID: 100, - Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDsWithEntry(101, 1, 1)}, - Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3)}, - State: commonpb.SegmentState_Importing, - } - - info := NewSegmentInfo(seg1) - svr.meta.AddSegment(context.TODO(), info) - - status, err := svr.UpdateSegmentStatistics(context.TODO(), &datapb.UpdateSegmentStatisticsRequest{ - Stats: []*commonpb.SegmentStats{{ - SegmentID: 100, - NumRows: int64(1), - }}, - }) - assert.NoError(t, err) - - assert.Equal(t, svr.meta.GetHealthySegment(context.TODO(), 100).currRows, int64(1)) - assert.Equal(t, commonpb.ErrorCode_Success, status.GetErrorCode()) - closeTestServer(t, svr) - }) - - t.Run("test update flushed segment stat", func(t *testing.T) { - svr := newTestServer(t) - - seg1 := &datapb.SegmentInfo{ - ID: 100, - Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDsWithEntry(101, 1, 1)}, - Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)}, - Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3)}, - State: commonpb.SegmentState_Flushed, - } - - info := NewSegmentInfo(seg1) - svr.meta.AddSegment(context.TODO(), info) - - status, err := svr.UpdateSegmentStatistics(context.TODO(), &datapb.UpdateSegmentStatisticsRequest{ - Stats: []*commonpb.SegmentStats{{ - SegmentID: 100, - NumRows: int64(1), - }}, - }) - assert.NoError(t, err) - - assert.Equal(t, svr.meta.GetHealthySegment(context.TODO(), 100).currRows, int64(0)) - assert.Equal(t, commonpb.ErrorCode_Success, status.GetErrorCode()) - closeTestServer(t, svr) - }) -} - func TestDataCoordServer_UpdateChannelCheckpoint(t *testing.T) { mockVChannel := "fake-by-dev-rootcoord-dml-1-testchannelcp-v0" diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 5078da1638..1ec765179d 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -566,6 +566,8 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath log.Error("save binlog and checkpoints failed", zap.Error(err)) return merr.Status(err), nil } + + s.meta.SetLastWrittenTime(req.GetSegmentID()) log.Info("SaveBinlogPaths sync segment with meta", zap.Any("binlogs", req.GetField2BinlogPaths()), zap.Any("deltalogs", req.GetDeltalogs()), @@ -1411,12 +1413,12 @@ func (s *Server) GetFlushAllState(ctx context.Context, req *milvuspb.GetFlushAll return resp, nil } +// Deprecated // UpdateSegmentStatistics updates a segment's stats. 
func (s *Server) UpdateSegmentStatistics(ctx context.Context, req *datapb.UpdateSegmentStatisticsRequest) (*commonpb.Status, error) { if err := merr.CheckHealthy(s.GetStateCode()); err != nil { return merr.Status(err), nil } - s.updateSegmentStatistics(ctx, req.GetStats()) return merr.Success(), nil } @@ -1495,10 +1497,9 @@ func (s *Server) ReportDataNodeTtMsgs(ctx context.Context, req *datapb.ReportDat func (s *Server) handleDataNodeTtMsg(ctx context.Context, ttMsg *msgpb.DataNodeTtMsg) error { var ( - channel = ttMsg.GetChannelName() - ts = ttMsg.GetTimestamp() - sourceID = ttMsg.GetBase().GetSourceID() - segmentStats = ttMsg.GetSegmentsStats() + channel = ttMsg.GetChannelName() + ts = ttMsg.GetTimestamp() + sourceID = ttMsg.GetBase().GetSourceID() ) physical, _ := tsoutil.ParseTS(ts) @@ -1523,8 +1524,6 @@ func (s *Server) handleDataNodeTtMsg(ctx context.Context, ttMsg *msgpb.DataNodeT WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), pChannelName). Set(float64(sub)) - s.updateSegmentStatistics(ctx, segmentStats) - s.segmentManager.ExpireAllocations(ctx, channel, ts) flushableIDs, err := s.segmentManager.GetFlushableSegments(ctx, channel, ts) diff --git a/internal/datacoord/services_test.go b/internal/datacoord/services_test.go index 06814e1e41..9ad350bbaa 100644 --- a/internal/datacoord/services_test.go +++ b/internal/datacoord/services_test.go @@ -517,7 +517,7 @@ func (s *ServerSuite) TestFlush_NormalCase() { s.NoError(err) s.EqualValues(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) - s.testServer.meta.SetCurrentRows(segID, 1) + s.testServer.meta.SetRowCount(segID, 1) ids, err := s.testServer.segmentManager.GetFlushableSegments(context.TODO(), "channel-1", expireTs) s.NoError(err) s.EqualValues(1, len(ids)) diff --git a/internal/flushcommon/broker/broker.go b/internal/flushcommon/broker/broker.go index b08c34ded9..469156420e 100644 --- a/internal/flushcommon/broker/broker.go +++ b/internal/flushcommon/broker/broker.go @@ -38,6 +38,5 @@ type DataCoord interface { UpdateChannelCheckpoint(ctx context.Context, channelCPs []*msgpb.MsgPosition) error SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) error DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) - UpdateSegmentStatistics(ctx context.Context, req *datapb.UpdateSegmentStatisticsRequest) error ImportV2(ctx context.Context, in *internalpb.ImportRequestInternal) (*internalpb.ImportResponse, error) } diff --git a/internal/flushcommon/broker/datacoord.go b/internal/flushcommon/broker/datacoord.go index fcd9708912..8c3f49a988 100644 --- a/internal/flushcommon/broker/datacoord.go +++ b/internal/flushcommon/broker/datacoord.go @@ -156,18 +156,6 @@ func (dc *dataCoordBroker) DropVirtualChannel(ctx context.Context, req *datapb.D return resp, nil } -func (dc *dataCoordBroker) UpdateSegmentStatistics(ctx context.Context, req *datapb.UpdateSegmentStatisticsRequest) error { - log := log.Ctx(ctx) - - resp, err := dc.client.UpdateSegmentStatistics(ctx, req) - if err := merr.CheckRPCCall(resp, err); err != nil { - log.Warn("failed to UpdateSegmentStatistics", zap.Error(err)) - return err - } - - return nil -} - func (dc *dataCoordBroker) ImportV2(ctx context.Context, in *internalpb.ImportRequestInternal) (*internalpb.ImportResponse, error) { resp, err := dc.client.ImportV2(ctx, in) if err := merr.CheckRPCCall(resp, err); err != nil { diff --git a/internal/flushcommon/broker/datacoord_test.go b/internal/flushcommon/broker/datacoord_test.go 
index 2ede261b66..7966bcfe9d 100644 --- a/internal/flushcommon/broker/datacoord_test.go +++ b/internal/flushcommon/broker/datacoord_test.go @@ -291,44 +291,6 @@ func (s *dataCoordSuite) TestDropVirtualChannel() { }) } -func (s *dataCoordSuite) TestUpdateSegmentStatistics() { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - req := &datapb.UpdateSegmentStatisticsRequest{ - Stats: []*commonpb.SegmentStats{ - {}, {}, {}, - }, - } - - s.Run("normal_case", func() { - s.dc.EXPECT().UpdateSegmentStatistics(mock.Anything, mock.Anything). - Run(func(_ context.Context, r *datapb.UpdateSegmentStatisticsRequest, _ ...grpc.CallOption) { - s.Equal(len(req.GetStats()), len(r.GetStats())) - }). - Return(merr.Status(nil), nil) - err := s.broker.UpdateSegmentStatistics(ctx, req) - s.NoError(err) - s.resetMock() - }) - - s.Run("datacoord_return_failure_status", func() { - s.dc.EXPECT().UpdateSegmentStatistics(mock.Anything, mock.Anything). - Return(nil, errors.New("mock")) - err := s.broker.UpdateSegmentStatistics(ctx, req) - s.Error(err) - s.resetMock() - }) - - s.Run("datacoord_return_failure_status", func() { - s.dc.EXPECT().UpdateSegmentStatistics(mock.Anything, mock.Anything). - Return(merr.Status(errors.New("mock")), nil) - err := s.broker.UpdateSegmentStatistics(ctx, req) - s.Error(err) - s.resetMock() - }) -} - func (s *dataCoordSuite) TestImportV2() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/internal/flushcommon/broker/mock_broker.go b/internal/flushcommon/broker/mock_broker.go index 030c7c34f4..4b6557530a 100644 --- a/internal/flushcommon/broker/mock_broker.go +++ b/internal/flushcommon/broker/mock_broker.go @@ -417,53 +417,6 @@ func (_c *MockBroker_UpdateChannelCheckpoint_Call) RunAndReturn(run func(context return _c } -// UpdateSegmentStatistics provides a mock function with given fields: ctx, req -func (_m *MockBroker) UpdateSegmentStatistics(ctx context.Context, req *datapb.UpdateSegmentStatisticsRequest) error { - ret := _m.Called(ctx, req) - - if len(ret) == 0 { - panic("no return value specified for UpdateSegmentStatistics") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *datapb.UpdateSegmentStatisticsRequest) error); ok { - r0 = rf(ctx, req) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// MockBroker_UpdateSegmentStatistics_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateSegmentStatistics' -type MockBroker_UpdateSegmentStatistics_Call struct { - *mock.Call -} - -// UpdateSegmentStatistics is a helper method to define mock.On call -// - ctx context.Context -// - req *datapb.UpdateSegmentStatisticsRequest -func (_e *MockBroker_Expecter) UpdateSegmentStatistics(ctx interface{}, req interface{}) *MockBroker_UpdateSegmentStatistics_Call { - return &MockBroker_UpdateSegmentStatistics_Call{Call: _e.mock.On("UpdateSegmentStatistics", ctx, req)} -} - -func (_c *MockBroker_UpdateSegmentStatistics_Call) Run(run func(ctx context.Context, req *datapb.UpdateSegmentStatisticsRequest)) *MockBroker_UpdateSegmentStatistics_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(*datapb.UpdateSegmentStatisticsRequest)) - }) - return _c -} - -func (_c *MockBroker_UpdateSegmentStatistics_Call) Return(_a0 error) *MockBroker_UpdateSegmentStatistics_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MockBroker_UpdateSegmentStatistics_Call) RunAndReturn(run func(context.Context, *datapb.UpdateSegmentStatisticsRequest) 
error) *MockBroker_UpdateSegmentStatistics_Call { - _c.Call.Return(run) - return _c -} - // NewMockBroker creates a new instance of MockBroker. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMockBroker(t interface { diff --git a/internal/flushcommon/pipeline/data_sync_service_test.go b/internal/flushcommon/pipeline/data_sync_service_test.go index 25932e7137..07d82295c1 100644 --- a/internal/flushcommon/pipeline/data_sync_service_test.go +++ b/internal/flushcommon/pipeline/data_sync_service_test.go @@ -320,7 +320,6 @@ func (s *DataSyncServiceSuite) SetupTest() { s.chunkManager = mocks.NewChunkManager(s.T()) s.broker = broker.NewMockBroker(s.T()) - s.broker.EXPECT().UpdateSegmentStatistics(mock.Anything, mock.Anything).Return(nil).Maybe() s.allocator = allocator.NewMockAllocator(s.T()) s.wbManager = writebuffer.NewMockBufferManager(s.T()) diff --git a/internal/flushcommon/pipeline/testutils_test.go b/internal/flushcommon/pipeline/testutils_test.go index 145e2c225c..cc36a033f2 100644 --- a/internal/flushcommon/pipeline/testutils_test.go +++ b/internal/flushcommon/pipeline/testutils_test.go @@ -171,10 +171,6 @@ func (ds *DataCoordFactory) DropVirtualChannel(ctx context.Context, req *datapb. }, nil } -func (ds *DataCoordFactory) UpdateSegmentStatistics(ctx context.Context, req *datapb.UpdateSegmentStatisticsRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { - return merr.Success(), nil -} - func (ds *DataCoordFactory) UpdateChannelCheckpoint(ctx context.Context, req *datapb.UpdateChannelCheckpointRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { return merr.Success(), nil } diff --git a/pkg/proto/data_coord.proto b/pkg/proto/data_coord.proto index f71de652fe..d82fb6467d 100644 --- a/pkg/proto/data_coord.proto +++ b/pkg/proto/data_coord.proto @@ -71,6 +71,7 @@ service DataCoord { rpc DropVirtualChannel(DropVirtualChannelRequest) returns (DropVirtualChannelResponse) {} rpc SetSegmentState(SetSegmentStateRequest) returns (SetSegmentStateResponse) {} + // Deprecated rpc UpdateSegmentStatistics(UpdateSegmentStatisticsRequest) returns (common.Status) {} rpc UpdateChannelCheckpoint(UpdateChannelCheckpointRequest) returns (common.Status) {} diff --git a/pkg/proto/datapb/data_coord_grpc.pb.go b/pkg/proto/datapb/data_coord_grpc.pb.go index 320efe5637..d72f5b1003 100644 --- a/pkg/proto/datapb/data_coord_grpc.pb.go +++ b/pkg/proto/datapb/data_coord_grpc.pb.go @@ -109,6 +109,7 @@ type DataCoordClient interface { GetFlushState(ctx context.Context, in *GetFlushStateRequest, opts ...grpc.CallOption) (*milvuspb.GetFlushStateResponse, error) DropVirtualChannel(ctx context.Context, in *DropVirtualChannelRequest, opts ...grpc.CallOption) (*DropVirtualChannelResponse, error) SetSegmentState(ctx context.Context, in *SetSegmentStateRequest, opts ...grpc.CallOption) (*SetSegmentStateResponse, error) + // Deprecated UpdateSegmentStatistics(ctx context.Context, in *UpdateSegmentStatisticsRequest, opts ...grpc.CallOption) (*commonpb.Status, error) UpdateChannelCheckpoint(ctx context.Context, in *UpdateChannelCheckpointRequest, opts ...grpc.CallOption) (*commonpb.Status, error) MarkSegmentsDropped(ctx context.Context, in *MarkSegmentsDroppedRequest, opts ...grpc.CallOption) (*commonpb.Status, error) @@ -620,6 +621,7 @@ type DataCoordServer interface { GetFlushState(context.Context, *GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) 
DropVirtualChannel(context.Context, *DropVirtualChannelRequest) (*DropVirtualChannelResponse, error) SetSegmentState(context.Context, *SetSegmentStateRequest) (*SetSegmentStateResponse, error) + // Deprecated UpdateSegmentStatistics(context.Context, *UpdateSegmentStatisticsRequest) (*commonpb.Status, error) UpdateChannelCheckpoint(context.Context, *UpdateChannelCheckpointRequest) (*commonpb.Status, error) MarkSegmentsDropped(context.Context, *MarkSegmentsDroppedRequest) (*commonpb.Status, error)
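Illustrative note for reviewers (not part of the applied patch): with currRows removed, a segment's row count is the NumOfRows kept on SegmentInfo, which UpdateCheckPointOperator reconciles against the entry counts recorded in the segment's binlogs. The Go sketch below only mirrors that idea with simplified stand-in types; the actual code uses the generated datapb structs and segmentutil.CalcRowCountFromBinLog.

package main

import "fmt"

// Binlog and FieldBinlog are simplified stand-ins for the datapb types (assumption,
// not the real generated structs).
type Binlog struct{ EntriesNum int64 }
type FieldBinlog struct{ Binlogs []*Binlog }

// rowCountFromBinlogs sums the entry counts of one field's binlogs; every field
// holds the same number of rows, so looking at the first field is enough.
func rowCountFromBinlogs(fields []*FieldBinlog) int64 {
	if len(fields) == 0 {
		return 0
	}
	var rows int64
	for _, b := range fields[0].Binlogs {
		rows += b.EntriesNum
	}
	return rows
}

func main() {
	fields := []*FieldBinlog{
		{Binlogs: []*Binlog{{EntriesNum: 100}, {EntriesNum: 150}}},
		{Binlogs: []*Binlog{{EntriesNum: 100}, {EntriesNum: 150}}},
	}
	// After this change, NumOfRows stays consistent with this binlog-derived
	// count instead of a separately tracked in-memory currRows value.
	fmt.Println(rowCountFromBinlogs(fields)) // prints 250
}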