enhance: Replace currRows with NumOfRows ()

See also: 

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/40499/head
XuanYang-cn 2025-03-10 12:16:03 +08:00 committed by GitHub
parent 1c9d43ee9f
commit 4bebca6416
23 changed files with 83 additions and 290 deletions

View File

@ -138,7 +138,6 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWith1ParallelTask() {
CollectionID: 2,
PartitionID: 3,
},
currRows: 0,
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
@ -260,7 +259,6 @@ func (s *CompactionPlanHandlerSuite) TestScheduleWithSlotLimit() {
CollectionID: 2,
PartitionID: 3,
},
currRows: 0,
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
@ -353,7 +351,6 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWithL0Executing() {
CollectionID: 2,
PartitionID: 3,
},
currRows: 0,
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
@ -633,7 +630,6 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() {
CollectionID: 2,
PartitionID: 3,
},
currRows: 0,
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,

View File

@ -443,7 +443,7 @@ func getImportRowsInfo(jobID int64, imeta ImportMeta, meta *meta) (importedRows,
})
segmentIDs = append(segmentIDs, task.(*importTask).GetSegmentIDs()...)
}
importedRows = meta.GetSegmentsTotalCurrentRows(segmentIDs)
importedRows = meta.GetSegmentsTotalNumRows(segmentIDs)
return
}
@ -553,7 +553,7 @@ func GetTaskProgresses(jobID int64, imeta ImportMeta, meta *meta) []*internalpb.
totalRows := lo.SumBy(task.GetFileStats(), func(file *datapb.ImportFileStats) int64 {
return file.GetTotalRows()
})
importedRows := meta.GetSegmentsTotalCurrentRows(task.(*importTask).GetSegmentIDs())
importedRows := meta.GetSegmentsTotalNumRows(task.(*importTask).GetSegmentIDs())
progress := int64(100)
if totalRows != 0 {
progress = int64(float32(importedRows) / float32(totalRows) * 100)
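
The import progress math itself is unchanged; only the row source moves from the in-memory `currRows` to the persisted `NumOfRows`. Below is a minimal, self-contained sketch of the idea, where `segment` and `totalNumRows` are stand-ins for `datapb.SegmentInfo` and `meta.GetSegmentsTotalNumRows`, not the actual implementations:

```go
package main

import "fmt"

// segment is a stand-in for datapb.SegmentInfo; only the field used here is modelled.
type segment struct {
	ID        int64
	NumOfRows int64 // persisted row count, replaces the meta-only currRows
}

// totalNumRows mirrors what meta.GetSegmentsTotalNumRows does: sum the
// persisted NumOfRows of the given segments, skipping unknown IDs.
func totalNumRows(segments map[int64]*segment, ids []int64) int64 {
	var sum int64
	for _, id := range ids {
		seg, ok := segments[id]
		if !ok {
			continue // the real code logs a warning for missing segments
		}
		sum += seg.NumOfRows
	}
	return sum
}

func main() {
	segments := map[int64]*segment{
		10: {ID: 10, NumOfRows: 50},
		11: {ID: 11, NumOfRows: 50},
		12: {ID: 12, NumOfRows: 50},
	}
	importedRows := totalNumRows(segments, []int64{10, 11, 12})

	totalRows := int64(200)
	progress := int64(100)
	if totalRows != 0 {
		progress = int64(float32(importedRows) / float32(totalRows) * 100)
	}
	fmt.Println(importedRows, progress) // 150 75
}
```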

View File

@ -652,15 +652,15 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
err = imeta.AddTask(context.TODO(), it1)
assert.NoError(t, err)
err = meta.AddSegment(ctx, &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{ID: 10, IsImporting: true, State: commonpb.SegmentState_Flushed}, currRows: 50,
SegmentInfo: &datapb.SegmentInfo{ID: 10, IsImporting: true, State: commonpb.SegmentState_Flushed, NumOfRows: 50},
})
assert.NoError(t, err)
err = meta.AddSegment(ctx, &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{ID: 11, IsImporting: true, State: commonpb.SegmentState_Flushed}, currRows: 50,
SegmentInfo: &datapb.SegmentInfo{ID: 11, IsImporting: true, State: commonpb.SegmentState_Flushed, NumOfRows: 50},
})
assert.NoError(t, err)
err = meta.AddSegment(ctx, &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{ID: 12, IsImporting: true, State: commonpb.SegmentState_Flushed}, currRows: 50,
SegmentInfo: &datapb.SegmentInfo{ID: 12, IsImporting: true, State: commonpb.SegmentState_Flushed, NumOfRows: 50},
})
assert.NoError(t, err)
@ -681,15 +681,15 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
err = imeta.AddTask(context.TODO(), it2)
assert.NoError(t, err)
err = meta.AddSegment(ctx, &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{ID: 20, IsImporting: true, State: commonpb.SegmentState_Flushed}, currRows: 50,
SegmentInfo: &datapb.SegmentInfo{ID: 20, IsImporting: true, State: commonpb.SegmentState_Flushed, NumOfRows: 50},
})
assert.NoError(t, err)
err = meta.AddSegment(ctx, &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{ID: 21, IsImporting: true, State: commonpb.SegmentState_Flushed}, currRows: 50,
SegmentInfo: &datapb.SegmentInfo{ID: 21, IsImporting: true, State: commonpb.SegmentState_Flushed, NumOfRows: 50},
})
assert.NoError(t, err)
err = meta.AddSegment(ctx, &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{ID: 22, IsImporting: true, State: commonpb.SegmentState_Flushed}, currRows: 50,
SegmentInfo: &datapb.SegmentInfo{ID: 22, IsImporting: true, State: commonpb.SegmentState_Flushed, NumOfRows: 50},
})
assert.NoError(t, err)

View File

@ -777,7 +777,6 @@ func TestServer_GetIndexState(t *testing.T) {
Timestamp: createTS - 1,
},
},
currRows: 0,
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
@ -836,7 +835,6 @@ func TestServer_GetIndexState(t *testing.T) {
Timestamp: createTS - 1,
},
},
currRows: 0,
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
@ -1014,7 +1012,6 @@ func TestServer_GetSegmentIndexState(t *testing.T) {
PartitionID: partID,
InsertChannel: "ch",
},
currRows: 0,
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
@ -1134,7 +1131,6 @@ func TestServer_GetIndexBuildProgress(t *testing.T) {
Timestamp: createTS,
},
},
currRows: 10250,
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
@ -1181,7 +1177,6 @@ func TestServer_GetIndexBuildProgress(t *testing.T) {
Timestamp: createTS,
},
},
currRows: 10250,
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,

View File

@ -647,7 +647,7 @@ func (m *meta) GetAllSegmentsUnsafe() []*SegmentInfo {
return m.segments.GetSegments()
}
func (m *meta) GetSegmentsTotalCurrentRows(segmentIDs []UniqueID) int64 {
func (m *meta) GetSegmentsTotalNumRows(segmentIDs []UniqueID) int64 {
m.RLock()
defer m.RUnlock()
var sum int64 = 0
@ -657,7 +657,7 @@ func (m *meta) GetSegmentsTotalCurrentRows(segmentIDs []UniqueID) int64 {
log.Ctx(context.TODO()).Warn("cannot find segment", zap.Int64("segmentID", segmentID))
continue
}
sum += segment.currRows
sum += segment.GetNumOfRows()
}
return sum
}
@ -1037,6 +1037,9 @@ func UpdateCheckPointOperator(segmentID int64, checkpoints []*datapb.CheckPoint)
return false
}
var cpNumRows int64
// Set segment dml position
for _, cp := range checkpoints {
if cp.SegmentID != segmentID {
// Don't think this is gonna to happen, ignore for now.
@ -1050,18 +1053,22 @@ func UpdateCheckPointOperator(segmentID int64, checkpoints []*datapb.CheckPoint)
continue
}
segment.NumOfRows = cp.NumOfRows
cpNumRows = cp.NumOfRows
segment.DmlPosition = cp.GetPosition()
}
// update segments num rows
count := segmentutil.CalcRowCountFromBinLog(segment.SegmentInfo)
if count != segment.currRows && count > 0 {
log.Ctx(context.TODO()).Info("check point reported inconsistent with bin log row count",
zap.Int64("segmentID", segmentID),
zap.Int64("current rows (wrong)", segment.currRows),
zap.Int64("segment bin log row count (correct)", count))
if count > 0 {
if cpNumRows != count {
log.Ctx(context.TODO()).Info("check point reported row count inconsistent with binlog row count",
zap.Int64("segmentID", segmentID),
zap.Int64("binlog reported (wrong)", cpNumRows),
zap.Int64("segment binlog row count (correct)", count))
}
segment.NumOfRows = count
}
return true
}
}
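
The reworked `UpdateCheckPointOperator` takes the checkpoint-reported count first, then overrides it with the binlog-derived count whenever the two disagree and the binlog count is usable. A self-contained sketch of that decision follows; `calcRowCountFromBinlog` stands in for `segmentutil.CalcRowCountFromBinLog`, and the structs are simplified stand-ins for the datapb types:

```go
package main

import (
	"fmt"
	"log"
)

// checkpoint and segment model only the fields this decision needs.
type checkpoint struct {
	SegmentID int64
	NumOfRows int64
}

type binlog struct{ EntriesNum int64 }

type segment struct {
	ID        int64
	NumOfRows int64
	Binlogs   []binlog
}

// calcRowCountFromBinlog stands in for segmentutil.CalcRowCountFromBinLog:
// the authoritative row count is the sum of entries across flushed binlogs.
func calcRowCountFromBinlog(s *segment) int64 {
	var count int64
	for _, b := range s.Binlogs {
		count += b.EntriesNum
	}
	return count
}

// applyCheckpoint mirrors the shape of the new operator: take the
// checkpoint's NumOfRows, then prefer the binlog-derived count when the
// two disagree and the binlog count is positive.
func applyCheckpoint(seg *segment, cp checkpoint) {
	cpNumRows := cp.NumOfRows
	seg.NumOfRows = cpNumRows

	if count := calcRowCountFromBinlog(seg); count > 0 {
		if cpNumRows != count {
			log.Printf("checkpoint rows %d inconsistent with binlog rows %d, using binlog",
				cpNumRows, count)
		}
		seg.NumOfRows = count
	}
}

func main() {
	seg := &segment{ID: 1, Binlogs: []binlog{{EntriesNum: 7}, {EntriesNum: 4}}}
	applyCheckpoint(seg, checkpoint{SegmentID: 1, NumOfRows: 10})
	fmt.Println(seg.NumOfRows) // 11, the binlog-derived count wins
}
```
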
@ -1074,7 +1081,6 @@ func UpdateImportedRows(segmentID int64, rows int64) UpdateOperator {
zap.Int64("segmentID", segmentID))
return false
}
segment.currRows = rows
segment.NumOfRows = rows
segment.MaxRowNum = rows
return true
@ -1261,8 +1267,7 @@ func (m *meta) mergeDropSegment(seg2Drop *SegmentInfo) (*SegmentInfo, *segMetric
if seg2Drop.GetDmlPosition() != nil {
clonedSegment.DmlPosition = seg2Drop.GetDmlPosition()
}
clonedSegment.currRows = seg2Drop.currRows
clonedSegment.NumOfRows = seg2Drop.currRows
clonedSegment.NumOfRows = seg2Drop.GetNumOfRows()
return clonedSegment, metricMutation
}
@ -1417,6 +1422,12 @@ func (m *meta) AddAllocation(segmentID UniqueID, allocation *Allocation) error {
return nil
}
func (m *meta) SetRowCount(segmentID UniqueID, rowCount int64) {
m.Lock()
defer m.Unlock()
m.segments.SetRowCount(segmentID, rowCount)
}
// SetAllocations set Segment allocations, will overwrite ALL original allocations
// Note that allocations is not persisted in KV store
func (m *meta) SetAllocations(segmentID UniqueID, allocations []*Allocation) {
@ -1425,14 +1436,6 @@ func (m *meta) SetAllocations(segmentID UniqueID, allocations []*Allocation) {
m.segments.SetAllocations(segmentID, allocations)
}
// SetCurrentRows set current row count for segment with provided `segmentID`
// Note that currRows is not persisted in KV store
func (m *meta) SetCurrentRows(segmentID UniqueID, rows int64) {
m.Lock()
defer m.Unlock()
m.segments.SetCurrentRows(segmentID, rows)
}
// SetLastExpire set lastExpire time for segment
// Note that last is not necessary to store in KV meta
func (m *meta) SetLastExpire(segmentID UniqueID, lastExpire uint64) {
@ -1451,6 +1454,14 @@ func (m *meta) SetLastFlushTime(segmentID UniqueID, t time.Time) {
m.segments.SetFlushTime(segmentID, t)
}
// SetLastWrittenTime set LastWrittenTime for segment with provided `segmentID`
// Note that lastWrittenTime is not persisted in KV store
func (m *meta) SetLastWrittenTime(segmentID UniqueID) {
m.Lock()
defer m.Unlock()
m.segments.SetLastWrittenTime(segmentID)
}
func (m *meta) CheckSegmentsStating(ctx context.Context, segmentIDs []UniqueID) (exist bool, hasStating bool) {
m.RLock()
defer m.RUnlock()
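
At the meta level, the unpersisted `SetCurrentRows` is replaced by two narrower setters: `SetRowCount` writes the persisted row count, `SetLastWrittenTime` only bumps the in-memory timestamp used by the idle-seal policy. A rough sketch of the pair, using a plain map plus mutex instead of the real segments collection (everything except the two method names is a stand-in):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// segInfo is a stand-in for SegmentInfo: the row count lives on the
// persisted proto field, the last-written time stays meta-only.
type segInfo struct {
	NumOfRows       int64
	lastWrittenTime time.Time
}

type metaLike struct {
	sync.Mutex
	segments map[int64]*segInfo
}

// SetRowCount sets the persisted row count for a segment.
func (m *metaLike) SetRowCount(segmentID, rows int64) {
	m.Lock()
	defer m.Unlock()
	if s, ok := m.segments[segmentID]; ok {
		s.NumOfRows = rows
	}
}

// SetLastWrittenTime only refreshes the in-memory written timestamp.
func (m *metaLike) SetLastWrittenTime(segmentID int64) {
	m.Lock()
	defer m.Unlock()
	if s, ok := m.segments[segmentID]; ok {
		s.lastWrittenTime = time.Now()
	}
}

func main() {
	m := &metaLike{segments: map[int64]*segInfo{1: {}}}
	m.SetRowCount(1, 42)    // replaces SetCurrentRows in the tests below
	m.SetLastWrittenTime(1) // touched on SaveBinlogPaths instead of tt stats
	fmt.Println(m.segments[1].NumOfRows)
}
```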

View File

@ -810,7 +810,7 @@ func TestUpdateSegmentsInfo(t *testing.T) {
segment1 := NewSegmentInfo(&datapb.SegmentInfo{
ID: 1, State: commonpb.SegmentState_Growing,
Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)},
Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDsWithEntry(1, 1, 222)},
Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)},
})
err = meta.AddSegment(context.TODO(), segment1)
@ -820,11 +820,11 @@ func TestUpdateSegmentsInfo(t *testing.T) {
err = meta.UpdateSegmentsInfo(
context.TODO(),
UpdateStatusOperator(1, commonpb.SegmentState_Flushing),
UpdateStatusOperator(1, commonpb.SegmentState_Growing),
AddBinlogsOperator(1,
[]*datapb.FieldBinlog{getFieldBinlogIDsWithEntry(1, 10, 1)},
[]*datapb.FieldBinlog{getFieldBinlogIDs(1, 1)},
[]*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: "", LogID: 2}}}},
[]*datapb.FieldBinlog{getFieldBinlogIDsWithEntry(1, 10, 333)},
[]*datapb.FieldBinlog{getFieldBinlogIDs(1, 334)},
[]*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogID: 335}}}},
[]*datapb.FieldBinlog{},
),
UpdateStartPosition([]*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}),
@ -837,9 +837,9 @@ func TestUpdateSegmentsInfo(t *testing.T) {
assert.EqualValues(t, 1, updated.getDeltaCount())
expected := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
ID: 1, State: commonpb.SegmentState_Flushing, NumOfRows: 10,
ID: 1, State: commonpb.SegmentState_Growing, NumOfRows: 11,
StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}},
Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 0, 1)},
Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 222, 333)},
Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 0, 1)},
Deltalogs: []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000}}}},
}}
@ -1202,9 +1202,9 @@ func TestMeta_HasSegments(t *testing.T) {
segments: map[UniqueID]*SegmentInfo{
1: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
ID: 1,
NumOfRows: 100,
},
currRows: 100,
},
},
},

View File

@ -147,12 +147,13 @@ func (f segmentSealPolicyFunc) ShouldSeal(segment *SegmentInfo, ts Timestamp) (b
}
// sealL1SegmentByCapacity get segmentSealPolicy with segment size factor policy
// TODO: change numOfRows to size
func sealL1SegmentByCapacity(sizeFactor float64) segmentSealPolicyFunc {
return func(segment *SegmentInfo, ts Timestamp) (bool, string) {
jitter := paramtable.Get().DataCoordCfg.SegmentSealProportionJitter.GetAsFloat()
ratio := (1 - jitter*rand.Float64())
return float64(segment.currRows) >= sizeFactor*float64(segment.GetMaxRowNum())*ratio,
fmt.Sprintf("Row count capacity full, current rows: %d, max row: %d, seal factor: %f, jitter ratio: %f", segment.currRows, segment.GetMaxRowNum(), sizeFactor, ratio)
return float64(segment.GetNumOfRows()) >= sizeFactor*float64(segment.GetMaxRowNum())*ratio,
fmt.Sprintf("Row count capacity full, current rows: %d, max row: %d, seal factor: %f, jitter ratio: %f", segment.GetNumOfRows(), segment.GetMaxRowNum(), sizeFactor, ratio)
}
}
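
With `currRows` gone, the capacity seal policy reads the persisted row count directly. A self-contained sketch of the same predicate, with the jitter passed in as a parameter rather than read from paramtable, and a plain struct standing in for SegmentInfo:

```go
package main

import (
	"fmt"
	"math/rand"
)

type segment struct {
	NumOfRows int64
	MaxRowNum int64
}

// shouldSealByCapacity mirrors sealL1SegmentByCapacity: seal once the
// persisted row count crosses sizeFactor * maxRows, randomly lowered by
// up to `jitter` so segments do not all seal at the same instant.
func shouldSealByCapacity(seg *segment, sizeFactor, jitter float64) (bool, string) {
	ratio := 1 - jitter*rand.Float64()
	seal := float64(seg.NumOfRows) >= sizeFactor*float64(seg.MaxRowNum)*ratio
	reason := fmt.Sprintf("rows=%d max=%d factor=%f ratio=%f",
		seg.NumOfRows, seg.MaxRowNum, sizeFactor, ratio)
	return seal, reason
}

func main() {
	seg := &segment{NumOfRows: 9800, MaxRowNum: 10000}
	ok, why := shouldSealByCapacity(seg, 0.95, 0.1)
	fmt.Println(ok, why)
}
```
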
@ -196,12 +197,13 @@ func sealL1SegmentByBinlogFileNumber(maxBinlogFileNumber int) segmentSealPolicyF
// into this segment anymore, so sealLongTimeIdlePolicy will seal these segments to trigger handoff of query cluster.
// Q: Why we don't decrease the expiry time directly?
// A: We don't want to influence segments which are accepting `frequent small` batch entities.
// TODO: replace rowNum with segment size
func sealL1SegmentByIdleTime(idleTimeTolerance time.Duration, minSizeToSealIdleSegment float64, maxSizeOfSegment float64) segmentSealPolicyFunc {
return func(segment *SegmentInfo, ts Timestamp) (bool, string) {
limit := (minSizeToSealIdleSegment / maxSizeOfSegment) * float64(segment.GetMaxRowNum())
return time.Since(segment.lastWrittenTime) > idleTimeTolerance &&
float64(segment.currRows) > limit,
fmt.Sprintf("segment idle, segment row number :%d, last written time: %v, max idle duration: %v", segment.currRows, segment.lastWrittenTime, idleTimeTolerance)
float64(segment.GetNumOfRows()) > limit,
fmt.Sprintf("segment idle, segment row number :%d, last written time: %v, max idle duration: %v", segment.GetNumOfRows(), segment.lastWrittenTime, idleTimeTolerance)
}
}
@ -264,10 +266,7 @@ func flushPolicyL1(segment *SegmentInfo, t Timestamp) bool {
segment.Level != datapb.SegmentLevel_L0 &&
time.Since(segment.lastFlushTime) >= paramtable.Get().DataCoordCfg.SegmentFlushInterval.GetAsDuration(time.Second) &&
segment.GetLastExpireTime() <= t &&
// A corner case when there's only 1 row in the segment, and the segment is synced
// before report currRows to DC. When DN recovered at this moment,
// it'll never report this segment's numRows again, leaving segment.currRows == 0 forever.
(segment.currRows != 0 || segment.GetNumOfRows() != 0) &&
segment.GetNumOfRows() != 0 &&
// Decoupling the importing segment from the flush process,
// This check avoids notifying the datanode to flush the
// importing segment which may not exist.
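
The flush policy loses its corner-case guard (`currRows != 0 || GetNumOfRows() != 0`) because only one row count remains. A compact sketch of the resulting predicate, with stand-in types and the flush interval passed as an argument instead of read from paramtable:

```go
package main

import (
	"fmt"
	"time"
)

type segment struct {
	State          string
	Level          string
	NumOfRows      int64
	LastExpireTime uint64
	IsImporting    bool
	lastFlushTime  time.Time
}

// shouldFlushL1 mirrors the simplified flushPolicyL1: a sealed, non-L0,
// non-importing segment with data flushes once its allocations expired and
// the flush interval elapsed. The old dual row-count check is gone.
func shouldFlushL1(seg *segment, ts uint64, flushInterval time.Duration) bool {
	return seg.State == "Sealed" &&
		seg.Level != "L0" &&
		time.Since(seg.lastFlushTime) >= flushInterval &&
		seg.LastExpireTime <= ts &&
		seg.NumOfRows != 0 &&
		!seg.IsImporting
}

func main() {
	seg := &segment{State: "Sealed", Level: "L1", NumOfRows: 1, LastExpireTime: 100}
	fmt.Println(shouldFlushL1(seg, 200, 0)) // true
}
```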

View File

@ -246,10 +246,10 @@ func Test_sealLongTimeIdlePolicy(t *testing.T) {
seg1 := &SegmentInfo{lastWrittenTime: time.Now().Add(idleTimeTolerance * 5)}
shouldSeal, _ := policy.ShouldSeal(seg1, 100)
assert.False(t, shouldSeal)
seg2 := &SegmentInfo{lastWrittenTime: getZeroTime(), currRows: 1, SegmentInfo: &datapb.SegmentInfo{MaxRowNum: 10000}}
seg2 := &SegmentInfo{lastWrittenTime: getZeroTime(), SegmentInfo: &datapb.SegmentInfo{MaxRowNum: 10000, NumOfRows: 1}}
shouldSeal, _ = policy.ShouldSeal(seg2, 100)
assert.False(t, shouldSeal)
seg3 := &SegmentInfo{lastWrittenTime: getZeroTime(), currRows: 1000, SegmentInfo: &datapb.SegmentInfo{MaxRowNum: 10000}}
seg3 := &SegmentInfo{lastWrittenTime: getZeroTime(), SegmentInfo: &datapb.SegmentInfo{MaxRowNum: 10000, NumOfRows: 1000}}
shouldSeal, _ = policy.ShouldSeal(seg3, 100)
assert.True(t, shouldSeal)
}
@ -287,19 +287,3 @@ func Test_sealByTotalGrowingSegmentsSize(t *testing.T) {
assert.Equal(t, 1, len(res))
assert.Equal(t, seg2.GetID(), res[0].GetID())
}
func TestFlushPolicyWithZeroCurRows(t *testing.T) {
seg := &SegmentInfo{
currRows: 0,
// lastFlushTime unset because its a sealed segment
SegmentInfo: &datapb.SegmentInfo{
NumOfRows: 1,
State: commonpb.SegmentState_Sealed,
Level: datapb.SegmentLevel_L1,
LastExpireTime: 456094911979061251,
},
}
flushed := flushPolicyL1(seg, tsoutil.GetCurrentTime())
assert.True(t, flushed)
}

View File

@ -50,7 +50,6 @@ type segmentInfoIndexes struct {
// SegmentInfo wraps datapb.SegmentInfo and patches some extra info on it
type SegmentInfo struct {
*datapb.SegmentInfo
currRows int64
allocations []*Allocation
lastFlushTime time.Time
isCompacting bool
@ -70,7 +69,6 @@ type SegmentInfo struct {
func NewSegmentInfo(info *datapb.SegmentInfo) *SegmentInfo {
s := &SegmentInfo{
SegmentInfo: info,
currRows: info.GetNumOfRows(),
}
// setup growing fields
if s.GetState() == commonpb.SegmentState_Growing {
@ -258,12 +256,12 @@ func (s *SegmentsInfo) AddAllocation(segmentID UniqueID, allocation *Allocation)
}
}
// SetCurrentRows sets rows count for segment
// SetLastWrittenTime updates segment last written time to now.
// if the segment is not found, do nothing
// uses `ShadowClone` since internal SegmentInfo is not changed
func (s *SegmentsInfo) SetCurrentRows(segmentID UniqueID, rows int64) {
func (s *SegmentsInfo) SetLastWrittenTime(segmentID UniqueID) {
if segment, ok := s.segments[segmentID]; ok {
s.segments[segmentID] = segment.ShadowClone(SetCurrentRows(rows))
s.segments[segmentID] = segment.ShadowClone(SetLastWrittenTime())
}
}
@ -326,7 +324,6 @@ func (s *SegmentInfo) Clone(opts ...SegmentInfoOption) *SegmentInfo {
info := proto.Clone(s.SegmentInfo).(*datapb.SegmentInfo)
cloned := &SegmentInfo{
SegmentInfo: info,
currRows: s.currRows,
allocations: s.allocations,
lastFlushTime: s.lastFlushTime,
isCompacting: s.isCompacting,
@ -343,7 +340,6 @@ func (s *SegmentInfo) Clone(opts ...SegmentInfoOption) *SegmentInfo {
func (s *SegmentInfo) ShadowClone(opts ...SegmentInfoOption) *SegmentInfo {
cloned := &SegmentInfo{
SegmentInfo: s.SegmentInfo,
currRows: s.currRows,
allocations: s.allocations,
lastFlushTime: s.lastFlushTime,
isCompacting: s.isCompacting,
@ -457,10 +453,9 @@ func AddAllocation(allocation *Allocation) SegmentInfoOption {
}
}
// SetCurrentRows is the option to set current row count for segment info
func SetCurrentRows(rows int64) SegmentInfoOption {
// SetLastWrittenTime is the option to set last written time for segment info
func SetLastWrittenTime() SegmentInfoOption {
return func(segment *SegmentInfo) {
segment.currRows = rows
segment.lastWrittenTime = time.Now()
}
}
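
`SetLastWrittenTime` follows the same `SegmentInfoOption`/`ShadowClone` pattern that `SetCurrentRows` used: the proto payload is shared, and only the meta-only wrapper fields are copied and mutated. A stripped-down sketch of that pattern, with stand-in types for the proto and wrapper:

```go
package main

import (
	"fmt"
	"time"
)

// protoInfo stands in for *datapb.SegmentInfo: the persisted part.
type protoInfo struct{ NumOfRows int64 }

// segmentInfo is the wrapper; currRows is gone, only meta-only fields remain.
type segmentInfo struct {
	info            *protoInfo
	lastWrittenTime time.Time
}

type option func(*segmentInfo)

// setLastWrittenTime replaces setCurrentRows as the wrapper-level option.
func setLastWrittenTime() option {
	return func(s *segmentInfo) { s.lastWrittenTime = time.Now() }
}

// shadowClone copies the wrapper but shares the proto payload, which is safe
// because the options only mutate wrapper-only fields.
func (s *segmentInfo) shadowClone(opts ...option) *segmentInfo {
	cloned := &segmentInfo{info: s.info, lastWrittenTime: s.lastWrittenTime}
	for _, opt := range opts {
		opt(cloned)
	}
	return cloned
}

func main() {
	orig := &segmentInfo{info: &protoInfo{NumOfRows: 5}}
	updated := orig.shadowClone(setLastWrittenTime())
	fmt.Println(updated.info == orig.info, updated.lastWrittenTime.IsZero()) // true false
}
```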

View File

@ -582,7 +582,7 @@ func (s *SegmentManager) CleanZeroSealedSegmentsOfChannel(ctx context.Context, c
return true
}
// Check if segment is empty
if segment.GetLastExpireTime() > 0 && segment.GetLastExpireTime() < cpTs && segment.currRows == 0 && segment.GetNumOfRows() == 0 {
if segment.GetLastExpireTime() > 0 && segment.GetLastExpireTime() < cpTs && segment.GetNumOfRows() == 0 {
log.Info("try remove empty sealed segment after channel cp updated",
zap.Int64("collection", segment.CollectionID), zap.Int64("segment", id),
zap.String("channel", channel), zap.Any("cpTs", cpTs))

View File

@ -243,9 +243,12 @@ func TestLastExpireReset(t *testing.T) {
segmentID3, expire3 := allocs[0].SegmentID, allocs[0].ExpireTime
// simulate handleTimeTick op on dataCoord
meta.SetCurrentRows(segmentID1, bigRows)
meta.SetCurrentRows(segmentID2, bigRows)
meta.SetCurrentRows(segmentID3, smallRows)
// meta.SetLastWrittenTime(segmentID1, bigRows)
// meta.SetLastWrittenTime(segmentID2, bigRows)
// meta.SetLastWrittenTime(segmentID3, smallRows)
meta.SetRowCount(segmentID1, bigRows)
meta.SetRowCount(segmentID2, bigRows)
meta.SetRowCount(segmentID3, smallRows)
err = segmentManager.tryToSealSegment(context.TODO(), expire1, channelName)
assert.NoError(t, err)
assert.Equal(t, commonpb.SegmentState_Sealed, meta.GetSegment(context.TODO(), segmentID1).GetState())
@ -268,9 +271,9 @@ func TestLastExpireReset(t *testing.T) {
assert.Nil(t, err)
newSegmentManager, _ := newSegmentManager(restartedMeta, mockAllocator)
// reset row number to avoid being cleaned by empty segment
restartedMeta.SetCurrentRows(segmentID1, bigRows)
restartedMeta.SetCurrentRows(segmentID2, bigRows)
restartedMeta.SetCurrentRows(segmentID3, smallRows)
restartedMeta.SetRowCount(segmentID1, bigRows)
restartedMeta.SetRowCount(segmentID2, bigRows)
restartedMeta.SetRowCount(segmentID3, smallRows)
// verify lastExpire of growing and sealed segments
segment1, segment2, segment3 := restartedMeta.GetSegment(context.TODO(), segmentID1), restartedMeta.GetSegment(context.TODO(), segmentID2), restartedMeta.GetSegment(context.TODO(), segmentID3)
@ -498,7 +501,7 @@ func TestGetFlushableSegments(t *testing.T) {
assert.EqualValues(t, 1, len(ids))
assert.EqualValues(t, allocations[0].SegmentID, ids[0])
meta.SetCurrentRows(allocations[0].SegmentID, 1)
meta.SetRowCount(allocations[0].SegmentID, 1)
ids, err = segmentManager.GetFlushableSegments(context.TODO(), "c1", allocations[0].ExpireTime)
assert.NoError(t, err)
assert.EqualValues(t, 1, len(ids))
@ -515,7 +518,7 @@ func TestGetFlushableSegments(t *testing.T) {
assert.EqualValues(t, 1, len(ids))
assert.EqualValues(t, allocations[0].SegmentID, ids[0])
meta.SetCurrentRows(allocations[0].SegmentID, 0)
meta.SetRowCount(allocations[0].SegmentID, 0)
postions := make([]*msgpb.MsgPosition, 0)
cpTs := allocations[0].ExpireTime + 1
postions = append(postions, &msgpb.MsgPosition{
@ -949,7 +952,6 @@ func TestSegmentManager_CleanZeroSealedSegmentsOfChannel(t *testing.T) {
NumOfRows: 1,
LastExpireTime: 100,
},
currRows: 1,
}
seg2 := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
@ -960,7 +962,6 @@ func TestSegmentManager_CleanZeroSealedSegmentsOfChannel(t *testing.T) {
NumOfRows: 0,
LastExpireTime: 100,
},
currRows: 0,
}
seg3 := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
@ -980,7 +981,6 @@ func TestSegmentManager_CleanZeroSealedSegmentsOfChannel(t *testing.T) {
NumOfRows: 1,
LastExpireTime: 100,
},
currRows: 1,
}
newMetaFunc := func() *meta {
return &meta{

View File

@ -784,36 +784,6 @@ func (s *Server) startTaskScheduler() {
s.startCollectMetaMetrics(s.serverLoopCtx)
}
func (s *Server) updateSegmentStatistics(ctx context.Context, stats []*commonpb.SegmentStats) {
log := log.Ctx(ctx)
for _, stat := range stats {
segment := s.meta.GetSegment(ctx, stat.GetSegmentID())
if segment == nil {
log.Warn("skip updating row number for not exist segment",
zap.Int64("segmentID", stat.GetSegmentID()),
zap.Int64("new value", stat.GetNumRows()))
continue
}
if isFlushState(segment.GetState()) {
log.Warn("skip updating row number for flushed segment",
zap.Int64("segmentID", stat.GetSegmentID()),
zap.Int64("new value", stat.GetNumRows()))
continue
}
// Log if # of rows is updated.
if segment.currRows < stat.GetNumRows() {
log.Debug("Updating segment number of rows",
zap.Int64("segmentID", stat.GetSegmentID()),
zap.Int64("old value", s.meta.GetSegment(ctx, stat.GetSegmentID()).GetNumOfRows()),
zap.Int64("new value", stat.GetNumRows()),
)
s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows())
}
}
}
func (s *Server) getFlushableSegmentsInfo(ctx context.Context, flushableIDs []int64) []*SegmentInfo {
log := log.Ctx(ctx)
res := make([]*SegmentInfo, 0, len(flushableIDs))

View File

@ -2274,62 +2274,6 @@ func TestDataCoordServer_SetSegmentState(t *testing.T) {
})
}
func TestDataCoord_SegmentStatistics(t *testing.T) {
t.Run("test update imported segment stat", func(t *testing.T) {
svr := newTestServer(t)
seg1 := &datapb.SegmentInfo{
ID: 100,
Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDsWithEntry(101, 1, 1)},
Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)},
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3)},
State: commonpb.SegmentState_Importing,
}
info := NewSegmentInfo(seg1)
svr.meta.AddSegment(context.TODO(), info)
status, err := svr.UpdateSegmentStatistics(context.TODO(), &datapb.UpdateSegmentStatisticsRequest{
Stats: []*commonpb.SegmentStats{{
SegmentID: 100,
NumRows: int64(1),
}},
})
assert.NoError(t, err)
assert.Equal(t, svr.meta.GetHealthySegment(context.TODO(), 100).currRows, int64(1))
assert.Equal(t, commonpb.ErrorCode_Success, status.GetErrorCode())
closeTestServer(t, svr)
})
t.Run("test update flushed segment stat", func(t *testing.T) {
svr := newTestServer(t)
seg1 := &datapb.SegmentInfo{
ID: 100,
Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDsWithEntry(101, 1, 1)},
Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)},
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3)},
State: commonpb.SegmentState_Flushed,
}
info := NewSegmentInfo(seg1)
svr.meta.AddSegment(context.TODO(), info)
status, err := svr.UpdateSegmentStatistics(context.TODO(), &datapb.UpdateSegmentStatisticsRequest{
Stats: []*commonpb.SegmentStats{{
SegmentID: 100,
NumRows: int64(1),
}},
})
assert.NoError(t, err)
assert.Equal(t, svr.meta.GetHealthySegment(context.TODO(), 100).currRows, int64(0))
assert.Equal(t, commonpb.ErrorCode_Success, status.GetErrorCode())
closeTestServer(t, svr)
})
}
func TestDataCoordServer_UpdateChannelCheckpoint(t *testing.T) {
mockVChannel := "fake-by-dev-rootcoord-dml-1-testchannelcp-v0"

View File

@ -566,6 +566,8 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
log.Error("save binlog and checkpoints failed", zap.Error(err))
return merr.Status(err), nil
}
s.meta.SetLastWrittenTime(req.GetSegmentID())
log.Info("SaveBinlogPaths sync segment with meta",
zap.Any("binlogs", req.GetField2BinlogPaths()),
zap.Any("deltalogs", req.GetDeltalogs()),
@ -1411,12 +1413,12 @@ func (s *Server) GetFlushAllState(ctx context.Context, req *milvuspb.GetFlushAll
return resp, nil
}
// Deprecated
// UpdateSegmentStatistics updates a segment's stats.
func (s *Server) UpdateSegmentStatistics(ctx context.Context, req *datapb.UpdateSegmentStatisticsRequest) (*commonpb.Status, error) {
if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
return merr.Status(err), nil
}
s.updateSegmentStatistics(ctx, req.GetStats())
return merr.Success(), nil
}
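
`UpdateSegmentStatistics` stays on the wire for compatibility, but the server no longer forwards the stats into meta; it only health-checks and returns success. A hedged sketch of what such a deprecated no-op handler reduces to, with a plain struct standing in for commonpb.Status and merr:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type status struct{ Err string }

// Deprecated: row counts no longer enter meta through this RPC; they arrive
// via SaveBinlogPaths checkpoints instead. The handler keeps the health
// check so older DataNodes calling it still get a sane response.
func updateSegmentStatistics(ctx context.Context, healthy bool) (*status, error) {
	if !healthy {
		return &status{Err: errors.New("server not healthy").Error()}, nil
	}
	return &status{}, nil // success, nothing to update
}

func main() {
	s, _ := updateSegmentStatistics(context.Background(), true)
	fmt.Printf("%+v\n", s)
}
```
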
@ -1495,10 +1497,9 @@ func (s *Server) ReportDataNodeTtMsgs(ctx context.Context, req *datapb.ReportDat
func (s *Server) handleDataNodeTtMsg(ctx context.Context, ttMsg *msgpb.DataNodeTtMsg) error {
var (
channel = ttMsg.GetChannelName()
ts = ttMsg.GetTimestamp()
sourceID = ttMsg.GetBase().GetSourceID()
segmentStats = ttMsg.GetSegmentsStats()
channel = ttMsg.GetChannelName()
ts = ttMsg.GetTimestamp()
sourceID = ttMsg.GetBase().GetSourceID()
)
physical, _ := tsoutil.ParseTS(ts)
@ -1523,8 +1524,6 @@ func (s *Server) handleDataNodeTtMsg(ctx context.Context, ttMsg *msgpb.DataNodeT
WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), pChannelName).
Set(float64(sub))
s.updateSegmentStatistics(ctx, segmentStats)
s.segmentManager.ExpireAllocations(ctx, channel, ts)
flushableIDs, err := s.segmentManager.GetFlushableSegments(ctx, channel, ts)
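
With the stats path removed, the tt-msg handler only expires allocations and derives flushable segments; as far as this diff shows, row counts now reach the coordinator through the checkpoints carried by SaveBinlogPaths (plus UpdateImportedRows for bulk import), and SaveBinlogPaths also refreshes the idle-seal timer via SetLastWrittenTime. A rough sketch of the remaining flow, with small interfaces and fakes standing in for the real segmentManager and meta:

```go
package main

import (
	"context"
	"fmt"
)

// segmentManagerLike and metaLike model only the calls that survive in
// handleDataNodeTtMsg and SaveBinlogPaths after this change.
type segmentManagerLike interface {
	ExpireAllocations(ctx context.Context, channel string, ts uint64)
	GetFlushableSegments(ctx context.Context, channel string, ts uint64) ([]int64, error)
}

type metaLike interface {
	SetLastWrittenTime(segmentID int64)
}

// handleDataNodeTt no longer touches segment statistics: the per-segment
// stats payload of the tt message is ignored entirely.
func handleDataNodeTt(ctx context.Context, sm segmentManagerLike, channel string, ts uint64) ([]int64, error) {
	sm.ExpireAllocations(ctx, channel, ts)
	return sm.GetFlushableSegments(ctx, channel, ts)
}

// onSaveBinlogPaths shows where the written-time bump moved to; the row
// count itself is persisted via the checkpoint in the same request.
func onSaveBinlogPaths(m metaLike, segmentID int64) {
	m.SetLastWrittenTime(segmentID)
}

type fakeSM struct{}

func (fakeSM) ExpireAllocations(context.Context, string, uint64) {}
func (fakeSM) GetFlushableSegments(context.Context, string, uint64) ([]int64, error) {
	return []int64{1}, nil
}

type fakeMeta struct{}

func (fakeMeta) SetLastWrittenTime(int64) {}

func main() {
	ids, _ := handleDataNodeTt(context.Background(), fakeSM{}, "ch-1", 100)
	onSaveBinlogPaths(fakeMeta{}, ids[0])
	fmt.Println(ids)
}
```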

View File

@ -517,7 +517,7 @@ func (s *ServerSuite) TestFlush_NormalCase() {
s.NoError(err)
s.EqualValues(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
s.testServer.meta.SetCurrentRows(segID, 1)
s.testServer.meta.SetRowCount(segID, 1)
ids, err := s.testServer.segmentManager.GetFlushableSegments(context.TODO(), "channel-1", expireTs)
s.NoError(err)
s.EqualValues(1, len(ids))

View File

@ -38,6 +38,5 @@ type DataCoord interface {
UpdateChannelCheckpoint(ctx context.Context, channelCPs []*msgpb.MsgPosition) error
SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) error
DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error)
UpdateSegmentStatistics(ctx context.Context, req *datapb.UpdateSegmentStatisticsRequest) error
ImportV2(ctx context.Context, in *internalpb.ImportRequestInternal) (*internalpb.ImportResponse, error)
}

View File

@ -156,18 +156,6 @@ func (dc *dataCoordBroker) DropVirtualChannel(ctx context.Context, req *datapb.D
return resp, nil
}
func (dc *dataCoordBroker) UpdateSegmentStatistics(ctx context.Context, req *datapb.UpdateSegmentStatisticsRequest) error {
log := log.Ctx(ctx)
resp, err := dc.client.UpdateSegmentStatistics(ctx, req)
if err := merr.CheckRPCCall(resp, err); err != nil {
log.Warn("failed to UpdateSegmentStatistics", zap.Error(err))
return err
}
return nil
}
func (dc *dataCoordBroker) ImportV2(ctx context.Context, in *internalpb.ImportRequestInternal) (*internalpb.ImportResponse, error) {
resp, err := dc.client.ImportV2(ctx, in)
if err := merr.CheckRPCCall(resp, err); err != nil {

View File

@ -291,44 +291,6 @@ func (s *dataCoordSuite) TestDropVirtualChannel() {
})
}
func (s *dataCoordSuite) TestUpdateSegmentStatistics() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
req := &datapb.UpdateSegmentStatisticsRequest{
Stats: []*commonpb.SegmentStats{
{}, {}, {},
},
}
s.Run("normal_case", func() {
s.dc.EXPECT().UpdateSegmentStatistics(mock.Anything, mock.Anything).
Run(func(_ context.Context, r *datapb.UpdateSegmentStatisticsRequest, _ ...grpc.CallOption) {
s.Equal(len(req.GetStats()), len(r.GetStats()))
}).
Return(merr.Status(nil), nil)
err := s.broker.UpdateSegmentStatistics(ctx, req)
s.NoError(err)
s.resetMock()
})
s.Run("datacoord_return_failure_status", func() {
s.dc.EXPECT().UpdateSegmentStatistics(mock.Anything, mock.Anything).
Return(nil, errors.New("mock"))
err := s.broker.UpdateSegmentStatistics(ctx, req)
s.Error(err)
s.resetMock()
})
s.Run("datacoord_return_failure_status", func() {
s.dc.EXPECT().UpdateSegmentStatistics(mock.Anything, mock.Anything).
Return(merr.Status(errors.New("mock")), nil)
err := s.broker.UpdateSegmentStatistics(ctx, req)
s.Error(err)
s.resetMock()
})
}
func (s *dataCoordSuite) TestImportV2() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

View File

@ -417,53 +417,6 @@ func (_c *MockBroker_UpdateChannelCheckpoint_Call) RunAndReturn(run func(context
return _c
}
// UpdateSegmentStatistics provides a mock function with given fields: ctx, req
func (_m *MockBroker) UpdateSegmentStatistics(ctx context.Context, req *datapb.UpdateSegmentStatisticsRequest) error {
ret := _m.Called(ctx, req)
if len(ret) == 0 {
panic("no return value specified for UpdateSegmentStatistics")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *datapb.UpdateSegmentStatisticsRequest) error); ok {
r0 = rf(ctx, req)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockBroker_UpdateSegmentStatistics_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateSegmentStatistics'
type MockBroker_UpdateSegmentStatistics_Call struct {
*mock.Call
}
// UpdateSegmentStatistics is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.UpdateSegmentStatisticsRequest
func (_e *MockBroker_Expecter) UpdateSegmentStatistics(ctx interface{}, req interface{}) *MockBroker_UpdateSegmentStatistics_Call {
return &MockBroker_UpdateSegmentStatistics_Call{Call: _e.mock.On("UpdateSegmentStatistics", ctx, req)}
}
func (_c *MockBroker_UpdateSegmentStatistics_Call) Run(run func(ctx context.Context, req *datapb.UpdateSegmentStatisticsRequest)) *MockBroker_UpdateSegmentStatistics_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*datapb.UpdateSegmentStatisticsRequest))
})
return _c
}
func (_c *MockBroker_UpdateSegmentStatistics_Call) Return(_a0 error) *MockBroker_UpdateSegmentStatistics_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockBroker_UpdateSegmentStatistics_Call) RunAndReturn(run func(context.Context, *datapb.UpdateSegmentStatisticsRequest) error) *MockBroker_UpdateSegmentStatistics_Call {
_c.Call.Return(run)
return _c
}
// NewMockBroker creates a new instance of MockBroker. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockBroker(t interface {

View File

@ -320,7 +320,6 @@ func (s *DataSyncServiceSuite) SetupTest() {
s.chunkManager = mocks.NewChunkManager(s.T())
s.broker = broker.NewMockBroker(s.T())
s.broker.EXPECT().UpdateSegmentStatistics(mock.Anything, mock.Anything).Return(nil).Maybe()
s.allocator = allocator.NewMockAllocator(s.T())
s.wbManager = writebuffer.NewMockBufferManager(s.T())

View File

@ -171,10 +171,6 @@ func (ds *DataCoordFactory) DropVirtualChannel(ctx context.Context, req *datapb.
}, nil
}
func (ds *DataCoordFactory) UpdateSegmentStatistics(ctx context.Context, req *datapb.UpdateSegmentStatisticsRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
return merr.Success(), nil
}
func (ds *DataCoordFactory) UpdateChannelCheckpoint(ctx context.Context, req *datapb.UpdateChannelCheckpointRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
return merr.Success(), nil
}

View File

@ -71,6 +71,7 @@ service DataCoord {
rpc DropVirtualChannel(DropVirtualChannelRequest) returns (DropVirtualChannelResponse) {}
rpc SetSegmentState(SetSegmentStateRequest) returns (SetSegmentStateResponse) {}
// Deprecated
rpc UpdateSegmentStatistics(UpdateSegmentStatisticsRequest) returns (common.Status) {}
rpc UpdateChannelCheckpoint(UpdateChannelCheckpointRequest) returns (common.Status) {}

View File

@ -109,6 +109,7 @@ type DataCoordClient interface {
GetFlushState(ctx context.Context, in *GetFlushStateRequest, opts ...grpc.CallOption) (*milvuspb.GetFlushStateResponse, error)
DropVirtualChannel(ctx context.Context, in *DropVirtualChannelRequest, opts ...grpc.CallOption) (*DropVirtualChannelResponse, error)
SetSegmentState(ctx context.Context, in *SetSegmentStateRequest, opts ...grpc.CallOption) (*SetSegmentStateResponse, error)
// Deprecated
UpdateSegmentStatistics(ctx context.Context, in *UpdateSegmentStatisticsRequest, opts ...grpc.CallOption) (*commonpb.Status, error)
UpdateChannelCheckpoint(ctx context.Context, in *UpdateChannelCheckpointRequest, opts ...grpc.CallOption) (*commonpb.Status, error)
MarkSegmentsDropped(ctx context.Context, in *MarkSegmentsDroppedRequest, opts ...grpc.CallOption) (*commonpb.Status, error)
@ -620,6 +621,7 @@ type DataCoordServer interface {
GetFlushState(context.Context, *GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error)
DropVirtualChannel(context.Context, *DropVirtualChannelRequest) (*DropVirtualChannelResponse, error)
SetSegmentState(context.Context, *SetSegmentStateRequest) (*SetSegmentStateResponse, error)
// Deprecated
UpdateSegmentStatistics(context.Context, *UpdateSegmentStatisticsRequest) (*commonpb.Status, error)
UpdateChannelCheckpoint(context.Context, *UpdateChannelCheckpointRequest) (*commonpb.Status, error)
MarkSegmentsDropped(context.Context, *MarkSegmentsDroppedRequest) (*commonpb.Status, error)