mirror of https://github.com/milvus-io/milvus.git
Merge syncCPLagTooBehind policy into syncPeriodically policy (#26713)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/26822/head
parent 0530fd80c9
commit 4340cbfba2
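
This commit removes the standalone syncCPLagTooBehind policy and folds its intent into syncPeriodically: instead of tracking a per-segment lastSyncTs, the periodic policy now measures how long data has been sitting in a segment's buffers by comparing the end position against the oldest buffered start position (the new Segment.minBufferTs in the diff below). The following is a minimal sketch of that decision, assuming the Segment, Timestamp, and tsoutil identifiers from this diff; the helper name shouldSyncPeriodically is illustrative and does not appear in the change itself:

// Sketch only: true when the oldest buffered data in the segment is older than
// syncPeriod relative to the end position ts. An empty buffer makes minBufferTs()
// return math.MaxUint64, whose physical time lies far in the future, so the
// subtraction is negative and no sync is triggered.
func shouldSyncPeriodically(seg *Segment, ts Timestamp, syncPeriod time.Duration) bool {
	endPosTime := tsoutil.PhysicalTime(ts)
	minBufferTime := tsoutil.PhysicalTime(seg.minBufferTs())
	return endPosTime.Sub(minBufferTime) >= syncPeriod
}
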
@@ -40,7 +40,6 @@ import (
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

@@ -80,7 +79,6 @@ type Channel interface {
mergeFlushedSegments(ctx context.Context, seg *Segment, planID UniqueID, compactedFrom []UniqueID) error
listCompactedSegmentIDs() map[UniqueID][]UniqueID
listSegmentIDsToSync(ts Timestamp) []UniqueID
setSegmentLastSyncTs(segID UniqueID, ts Timestamp)

updateSegmentRowNumber(segID UniqueID, numRows int64)
updateSingleSegmentMemorySize(segID UniqueID)

@@ -153,7 +151,6 @@ func newChannel(channelName string, collID UniqueID, schema *schemapb.Collection
syncPolicies: []segmentSyncPolicy{
syncPeriodically(),
syncMemoryTooHigh(),
syncCPLagTooBehind(),
},

metaService: metaService,

@@ -225,7 +222,6 @@ func (c *ChannelMeta) addSegment(req addSegmentReq) error {
historyInsertBuf: make([]*BufferData, 0),
historyDeleteBuf: make([]*DelDataBuf, 0),
startPos: req.startPos,
lastSyncTs: tsoutil.GetCurrentTime(),
}
seg.setType(req.segType)
// Set up pk stats

@@ -291,18 +287,6 @@ func (c *ChannelMeta) listSegmentIDsToSync(ts Timestamp) []UniqueID {
return segIDsToSync.Collect()
}

func (c *ChannelMeta) setSegmentLastSyncTs(segID UniqueID, ts Timestamp) {
c.segMu.Lock()
defer c.segMu.Unlock()
if _, ok := c.segments[segID]; ok {
c.segments[segID].lastSyncTs = ts
tsTime, _ := tsoutil.ParseTS(ts)
log.Debug("Set last syncTs for segment", zap.Int64("segmentID", segID), zap.Time("ts", tsTime))
} else {
log.Warn("Wrong! Try to set lastSync ts for non-existing segment", zap.Int64("segmentID", segID))
}
}

// filterSegments return segments with same partitionID for all segments
// get all segments
func (c *ChannelMeta) filterSegments(partitionID UniqueID) []*Segment {

@@ -728,11 +712,6 @@ func (c *ChannelMeta) mergeFlushedSegments(ctx context.Context, seg *Segment, pl
// release bloom filter
s.currentStat = nil
s.historyStats = nil

// set correct lastSyncTs for 10-mins channelCP force sync.
if s.lastSyncTs < seg.lastSyncTs {
seg.lastSyncTs = s.lastSyncTs
}
}

// only store segments with numRows > 0

@@ -436,7 +436,6 @@ func TestDataSyncService_Close(t *testing.T) {

channel := newChannel(insertChannelName, collMeta.ID, collMeta.GetSchema(), mockRootCoord, cm)
channel.syncPolicies = []segmentSyncPolicy{
syncPeriodically(),
syncMemoryTooHigh(),
}
atimeTickSender := newTimeTickSender(mockDataCoord, 0)

@@ -503,7 +503,6 @@ func (ibNode *insertBufferNode) Sync(fgMsg *flowGraphMsg, seg2Upload []UniqueID,
segmentsToSync = append(segmentsToSync, task.segmentID)
ibNode.channel.rollInsertBuffer(task.segmentID)
ibNode.channel.RollPKstats(task.segmentID, pkStats)
ibNode.channel.setSegmentLastSyncTs(task.segmentID, endPosition.GetTimestamp())
metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SuccessLabel).Inc()
metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.TotalLabel).Inc()
if task.auto {

@@ -223,7 +223,6 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
}

inMsg := genFlowGraphInsertMsg(insertChannelName)
iBNode.channel.setSegmentLastSyncTs(UniqueID(1), tsoutil.ComposeTSByTime(time.Now().Add(-11*time.Minute), 0))
assert.NotPanics(t, func() {
res := iBNode.Operate([]flowgraph.Msg{&inMsg})
assert.Subset(t, res[0].(*flowGraphMsg).segmentsToSync, []UniqueID{1})

@@ -18,6 +18,7 @@ package datanode

import (
"fmt"
"math"
"strconv"
"sync"
"sync/atomic"

@@ -55,7 +56,6 @@ type Segment struct {
currentStat *storage.PkStatistics
historyStats []*storage.PkStatistics

lastSyncTs Timestamp
startPos *msgpb.MsgPosition // TODO readonly
lazyLoading atomic.Value
syncing atomic.Value

@@ -256,3 +256,24 @@ func (s *Segment) isBufferEmpty() bool {
len(s.historyInsertBuf) == 0 &&
len(s.historyDeleteBuf) == 0
}

func (s *Segment) minBufferTs() uint64 {
var minTs uint64 = math.MaxUint64
if s.curInsertBuf != nil && s.curInsertBuf.startPos != nil && s.curInsertBuf.startPos.Timestamp < minTs {
minTs = s.curInsertBuf.startPos.Timestamp
}
if s.curDeleteBuf != nil && s.curDeleteBuf.startPos != nil && s.curDeleteBuf.startPos.Timestamp < minTs {
minTs = s.curDeleteBuf.startPos.Timestamp
}
for _, ib := range s.historyInsertBuf {
if ib != nil && ib.startPos != nil && ib.startPos.Timestamp < minTs {
minTs = ib.startPos.Timestamp
}
}
for _, db := range s.historyDeleteBuf {
if db != nil && db.startPos != nil && db.startPos.Timestamp < minTs {
minTs = db.startPos.Timestamp
}
}
return minTs
}
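
For reference, a hedged illustration of the minBufferTs semantics shown in the hunk above; this is a sketch that assumes the datanode package's Segment, BufferData, and msgpb identifiers, and someTs is an arbitrary timestamp value, not something from the diff:

// Sketch: an empty segment reports math.MaxUint64, so the periodic policy never
// considers it stale; once a buffer with a start position exists, minBufferTs
// reflects the oldest buffered timestamp.
func minBufferTsExample(someTs uint64) {
	seg := &Segment{}
	empty := seg.minBufferTs() == math.MaxUint64 // true: nothing buffered yet

	seg.curInsertBuf = &BufferData{startPos: &msgpb.MsgPosition{Timestamp: someTs}}
	buffered := seg.minBufferTs() == someTs // true: oldest buffered start position
	_, _ = empty, buffered
}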
@@ -21,10 +21,8 @@ import (
"sort"
"time"

"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/samber/lo"
"go.uber.org/atomic"
"go.uber.org/zap"
)

@@ -37,19 +35,19 @@ type segmentSyncPolicy func(segments []*Segment, ts Timestamp, needToSync *atomi
// syncPeriodically get segmentSyncPolicy with segments sync periodically.
func syncPeriodically() segmentSyncPolicy {
return func(segments []*Segment, ts Timestamp, _ *atomic.Bool) []UniqueID {
segsToSync := make([]UniqueID, 0)
segmentsToSync := make([]UniqueID, 0)
for _, seg := range segments {
endTime := tsoutil.PhysicalTime(ts)
lastSyncTime := tsoutil.PhysicalTime(seg.lastSyncTs)
shouldSync := endTime.Sub(lastSyncTime) >= Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second) && !seg.isBufferEmpty()
endPosTime := tsoutil.PhysicalTime(ts)
minBufferTime := tsoutil.PhysicalTime(seg.minBufferTs())
shouldSync := endPosTime.Sub(minBufferTime) >= Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)
if shouldSync {
segsToSync = append(segsToSync, seg.segmentID)
segmentsToSync = append(segmentsToSync, seg.segmentID)
}
}
if len(segsToSync) > 0 {
log.Info("sync segment periodically", zap.Int64s("segmentID", segsToSync))
if len(segmentsToSync) > 0 {
log.Info("sync segment periodically", zap.Int64s("segmentID", segmentsToSync))
}
return segsToSync
return segmentsToSync
}
}

@@ -76,59 +74,3 @@ func syncMemoryTooHigh() segmentSyncPolicy {
return syncSegments
}
}

// syncCPLagTooBehind force sync the segments lagging too behind the channel checkPoint
func syncCPLagTooBehind() segmentSyncPolicy {
segmentMinTs := func(segment *Segment) uint64 {
var minTs uint64 = math.MaxUint64
if segment.curInsertBuf != nil && segment.curInsertBuf.startPos != nil && segment.curInsertBuf.startPos.Timestamp < minTs {
minTs = segment.curInsertBuf.startPos.Timestamp
}
if segment.curDeleteBuf != nil && segment.curDeleteBuf.startPos != nil && segment.curDeleteBuf.startPos.Timestamp < minTs {
minTs = segment.curDeleteBuf.startPos.Timestamp
}
for _, ib := range segment.historyInsertBuf {
if ib != nil && ib.startPos != nil && ib.startPos.Timestamp < minTs {
minTs = ib.startPos.Timestamp
}
}
for _, db := range segment.historyDeleteBuf {
if db != nil && db.startPos != nil && db.startPos.Timestamp < minTs {
minTs = db.startPos.Timestamp
}
}
return minTs
}

return func(segments []*Segment, ts Timestamp, _ *atomic.Bool) []UniqueID {
segmentsSyncPairs := make([][2]int64, 0)
for _, segment := range segments {
if segment == nil || segment.sType.Load() == nil || segment.getType() != datapb.SegmentType_Flushed {
continue //cp behind check policy only towards flushed segments generated by compaction
}
segmentStartTime := tsoutil.PhysicalTime(segmentMinTs(segment))
cpLagDuration := tsoutil.PhysicalTime(ts).Sub(segmentStartTime)
shouldSync := cpLagDuration > Params.DataNodeCfg.CpLagPeriod.GetAsDuration(time.Second) && !segment.isBufferEmpty()
lagInfo := [2]int64{segment.segmentID, cpLagDuration.Nanoseconds()}
if shouldSync {
segmentsSyncPairs = append(segmentsSyncPairs, lagInfo)
}
}
segmentsIDsToSync := make([]UniqueID, 0)
if len(segmentsSyncPairs) > 0 {
if uint16(len(segmentsSyncPairs)) > Params.DataNodeCfg.CpLagSyncLimit.GetAsUint16() {
//sort all segments according to the length of lag duration
sort.Slice(segmentsSyncPairs, func(i, j int) bool {
return segmentsSyncPairs[i][1] > segmentsSyncPairs[j][1]
})
segmentsSyncPairs = segmentsSyncPairs[:Params.DataNodeCfg.CpLagSyncLimit.GetAsUint16()]
}
segmentsIDsToSync = lo.Map(segmentsSyncPairs, func(t [2]int64, _ int) int64 {
return t[0]
})
log.Info("sync segment for cp lag behind too much", zap.Int("segmentCount", len(segmentsIDsToSync)),
zap.Int64s("segmentIDs", segmentsIDsToSync))
}
return segmentsIDsToSync
}
}
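
With syncCPLagTooBehind removed, a channel registers only syncPeriodically and syncMemoryTooHigh (see the channel.go hunk near the top of this diff). The diff does not show how ChannelMeta evaluates its policy list, so the loop below is only an assumed sketch of that consumption, using the segmentSyncPolicy signature from this file; the function name and the de-duplication handling are illustrative:

// Assumed sketch: run every registered policy and merge the segment IDs they return.
func collectSegmentsToSync(policies []segmentSyncPolicy, segments []*Segment, ts Timestamp) []UniqueID {
	seen := make(map[UniqueID]struct{})
	result := make([]UniqueID, 0)
	for _, policy := range policies {
		for _, id := range policy(segments, ts, nil) {
			if _, ok := seen[id]; !ok { // de-duplicate across policies
				seen[id] = struct{}{}
				result = append(result, id)
			}
		}
	}
	return result
}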
@@ -21,14 +21,11 @@ import (
"testing"
"time"

"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"

"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"go.uber.org/atomic"

"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)

func TestSyncPeriodically(t *testing.T) {

@@ -36,13 +33,12 @@ func TestSyncPeriodically(t *testing.T) {

tests := []struct {
testName string
lastTs time.Time
ts time.Time
bufferTs time.Time
endPosTs time.Time
isBufferEmpty bool
shouldSyncNum int
}{
{"test buffer empty and stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)), true, 0},
{"test buffer empty and not stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second) / 2), true, 0},
{"test buffer empty", t0, t0.Add(Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)), true, 0},
{"test buffer not empty and stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)), false, 1},
{"test buffer not empty and not stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second) / 2), false, 0},
}

@@ -51,11 +47,15 @@ func TestSyncPeriodically(t *testing.T) {
t.Run(test.testName, func(t *testing.T) {
policy := syncPeriodically()
segment := &Segment{}
segment.lastSyncTs = tsoutil.ComposeTSByTime(test.lastTs, 0)
if !test.isBufferEmpty {
segment.curInsertBuf = &BufferData{}
segment.setInsertBuffer(&BufferData{
startPos: &msgpb.MsgPosition{
Timestamp: tsoutil.ComposeTSByTime(test.bufferTs, 0),
},
})
if test.isBufferEmpty {
segment.curInsertBuf = nil
}
res := policy([]*Segment{segment}, tsoutil.ComposeTSByTime(test.ts, 0), nil)
res := policy([]*Segment{segment}, tsoutil.ComposeTSByTime(test.endPosTs, 0), nil)
assert.Equal(t, test.shouldSyncNum, len(res))
})
}

@@ -99,110 +99,3 @@ func TestSyncMemoryTooHigh(t *testing.T) {
})
}
}

func TestSyncCpLagBehindTooMuch(t *testing.T) {
nowTs := tsoutil.ComposeTSByTime(time.Now(), 0)
paramtable.Get().Save(Params.DataNodeCfg.CpLagPeriod.Key, "60")
paramtable.Get().Save(Params.DataNodeCfg.CpLagSyncLimit.Key, "2")
laggedTs := tsoutil.AddPhysicalDurationOnTs(nowTs, -2*Params.DataNodeCfg.CpLagPeriod.GetAsDuration(time.Second))
tests := []struct {
testName string
segments []*Segment
idsToSync []int64
}{
{"test_current_buf_lag_behind",
[]*Segment{
{
segmentID: 1,
curInsertBuf: &BufferData{
startPos: &msgpb.MsgPosition{
Timestamp: laggedTs,
},
},
},
{
segmentID: 2,
curDeleteBuf: &DelDataBuf{
startPos: &msgpb.MsgPosition{
Timestamp: laggedTs,
},
},
},
},
[]int64{1, 2},
},
{"test_history_buf_lag_behind",
[]*Segment{
{
segmentID: 1,
historyInsertBuf: []*BufferData{
{
startPos: &msgpb.MsgPosition{
Timestamp: laggedTs,
},
},
},
},
{
segmentID: 2,
historyDeleteBuf: []*DelDataBuf{
{
startPos: &msgpb.MsgPosition{
Timestamp: laggedTs,
},
},
},
},
{
segmentID: 3,
},
},
[]int64{1, 2},
},
{"test_cp_sync_limit",
[]*Segment{
{
segmentID: 1,
historyInsertBuf: []*BufferData{
{
startPos: &msgpb.MsgPosition{
Timestamp: tsoutil.AddPhysicalDurationOnTs(laggedTs, -3*time.Second),
},
},
},
},
{
segmentID: 2,
historyDeleteBuf: []*DelDataBuf{
{
startPos: &msgpb.MsgPosition{
Timestamp: tsoutil.AddPhysicalDurationOnTs(laggedTs, -2*time.Second),
},
},
},
},
{
segmentID: 3,
historyDeleteBuf: []*DelDataBuf{
{
startPos: &msgpb.MsgPosition{
Timestamp: tsoutil.AddPhysicalDurationOnTs(laggedTs, -1*time.Second),
},
},
},
},
},
[]int64{1, 2},
},
}
for _, test := range tests {
t.Run(test.testName, func(t *testing.T) {
lo.ForEach(test.segments, func(segment *Segment, _ int) {
segment.setType(datapb.SegmentType_Flushed)
})
policy := syncCPLagTooBehind()
ids := policy(test.segments, tsoutil.ComposeTSByTime(time.Now(), 0), nil)
assert.Exactly(t, test.idsToSync, ids)
})
}
}

@@ -385,7 +385,6 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
partitionID: partID,
segmentID: req.GetCompactedTo(),
numRows: req.GetNumOfRows(),
lastSyncTs: tsoutil.GetCurrentTime(),
}

err = channel.InitPKstats(ctx, targetSeg, req.GetStatsLogs(), tsoutil.GetCurrentTime())

@@ -2351,8 +2351,6 @@ type dataNodeConfig struct {
FlushDeleteBufferBytes ParamItem `refreshable:"true"`
BinLogMaxSize ParamItem `refreshable:"true"`
SyncPeriod ParamItem `refreshable:"true"`
CpLagPeriod ParamItem `refreshable:"true"`
CpLagSyncLimit ParamItem `refreshable:"true"`

// watchEvent
WatchEventTicklerInterval ParamItem `refreshable:"false"`

@@ -2476,24 +2474,6 @@ func (p *dataNodeConfig) init(base *BaseTable) {
}
p.SyncPeriod.Init(base.mgr)

p.CpLagPeriod = ParamItem{
Key: "datanode.segment.cpLagPeriod",
Version: "2.2.0",
DefaultValue: "600",
Doc: "The period to sync segments for cp lag period policy",
Export: true,
}
p.CpLagPeriod.Init(base.mgr)

p.CpLagSyncLimit = ParamItem{
Key: "datanode.segment.cpLagSyncLimit",
Version: "2.2.0",
DefaultValue: "10",
Doc: "The limit to sync segments for cp lag period policy",
Export: true,
}
p.CpLagSyncLimit.Init(base.mgr)

p.WatchEventTicklerInterval = ParamItem{
Key: "datanode.segment.watchEventTicklerInterval",
Version: "2.2.3",