mirror of https://github.com/milvus-io/milvus.git
Support sync when cp is lagged (#24989)
Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>

parent a6310050c3
commit af1d84e5e1

@@ -8,7 +8,7 @@ class MilvusConan(ConanFile):
         "boost/1.79.0",
         "onetbb/2021.7.0",
         "nlohmann_json/3.11.2",
-        "zstd/1.5.5",
+        "zstd/1.5.4",
         "lz4/1.9.4",
         "snappy/1.1.9",
         "lzo/2.10",

@@ -156,6 +156,7 @@ func newChannel(channelName string, collID UniqueID, schema *schemapb.Collection
 		syncPolicies: []segmentSyncPolicy{
 			syncPeriodically(),
 			syncMemoryTooHigh(),
+			syncCPLagTooBehind(),
 		},
 
 		metaService: metaService,

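For orientation (not part of the diff): each segmentSyncPolicy is a function over the channel's segments that returns the segment IDs to flush, with the signature visible in the policy hunk further down (segments, a timestamp, and an atomic flag). Below is a minimal sketch of how a channel could fold its policy list into one list of IDs; the helper name collectSegmentsToSync is hypothetical, and the real evaluation loop is not shown in this diff.

// Sketch only: relies on the datanode package's Segment, Timestamp, UniqueID
// and segmentSyncPolicy types; collectSegmentsToSync is an illustrative name.
func collectSegmentsToSync(policies []segmentSyncPolicy, segments []*Segment, ts Timestamp, needToSync *atomic.Bool) []UniqueID {
	segIDs := make([]UniqueID, 0)
	for _, policy := range policies {
		// Each policy independently nominates segments; the caller may
		// deduplicate before issuing the actual flush.
		segIDs = append(segIDs, policy(segments, ts, needToSync)...)
	}
	return segIDs
}
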
@@ -435,14 +435,18 @@ func TestDataSyncService_Close(t *testing.T) {
 	paramtable.Get().Reset(Params.DataNodeCfg.FlushInsertBufferSize.Key)
 
 	channel := newChannel(insertChannelName, collMeta.ID, collMeta.GetSchema(), mockRootCoord, cm)
+	channel.syncPolicies = []segmentSyncPolicy{
+		syncPeriodically(),
+		syncMemoryTooHigh(),
+	}
 	atimeTickSender := newTimeTickManager(mockDataCoord, 0)
-	sync, err := newDataSyncService(ctx, flushChan, resendTTChan, channel, alloc, dispClient, factory, vchan, signalCh, mockDataCoord, newCache(), cm, newCompactionExecutor(), genTestTickler(), 0, atimeTickSender)
+	syncService, err := newDataSyncService(ctx, flushChan, resendTTChan, channel, alloc, dispClient, factory, vchan, signalCh, mockDataCoord, newCache(), cm, newCompactionExecutor(), genTestTickler(), 0, atimeTickSender)
 	assert.NoError(t, err)
 
-	sync.flushListener = make(chan *segmentFlushPack, 10)
-	defer close(sync.flushListener)
+	syncService.flushListener = make(chan *segmentFlushPack, 10)
+	defer close(syncService.flushListener)
 
-	sync.start()
+	syncService.start()
 
 	var (
 		dataFactory = NewDataFactory()

@@ -525,26 +529,26 @@ func TestDataSyncService_Close(t *testing.T) {
 	assert.NoError(t, err)
 
 	// wait for delete, no auto flush leads to all data in buffer.
-	require.Eventually(t, func() bool { return sync.delBufferManager.GetEntriesNum(1) == 1 },
+	require.Eventually(t, func() bool { return syncService.delBufferManager.GetEntriesNum(1) == 1 },
 		5*time.Second, 100*time.Millisecond)
-	assert.Equal(t, 0, len(sync.flushListener))
+	assert.Equal(t, 0, len(syncService.flushListener))
 
 	// close will trigger a force sync
-	sync.close()
-	assert.Eventually(t, func() bool { return len(sync.flushListener) == 1 },
+	syncService.close()
+	assert.Eventually(t, func() bool { return len(syncService.flushListener) == 1 },
 		5*time.Second, 100*time.Millisecond)
-	flushPack, ok := <-sync.flushListener
+	flushPack, ok := <-syncService.flushListener
 	assert.True(t, ok)
 	assert.Equal(t, UniqueID(1), flushPack.segmentID)
 	assert.True(t, len(flushPack.insertLogs) == 12)
 	assert.True(t, len(flushPack.statsLogs) == 1)
 	assert.True(t, len(flushPack.deltaLogs) == 1)
 
-	<-sync.ctx.Done()
+	<-syncService.ctx.Done()
 
 	// Double close is safe
-	sync.close()
-	<-sync.ctx.Done()
+	syncService.close()
+	<-syncService.ctx.Done()
 }
 
 func genBytes() (rawData []byte) {

@@ -46,8 +46,7 @@ func syncPeriodically() segmentSyncPolicy {
 			}
 		}
 		if len(segsToSync) > 0 {
-			log.Debug("sync segment periodically",
-				zap.Int64s("segmentID", segsToSync))
+			log.Info("sync segment periodically", zap.Int64s("segmentID", segsToSync))
 		}
 		return segsToSync
 	}

@@ -76,3 +75,45 @@ func syncMemoryTooHigh() segmentSyncPolicy {
 		return syncSegments
 	}
 }
+
+// syncCPLagTooBehind force sync the segments lagging too behind the channel checkPoint
+func syncCPLagTooBehind() segmentSyncPolicy {
+	segmentMinTs := func(segment *Segment) uint64 {
+		var minTs uint64 = math.MaxUint64
+		if segment.curInsertBuf != nil && segment.curInsertBuf.startPos != nil && segment.curInsertBuf.startPos.Timestamp < minTs {
+			minTs = segment.curInsertBuf.startPos.Timestamp
+		}
+		if segment.curDeleteBuf != nil && segment.curDeleteBuf.startPos != nil && segment.curDeleteBuf.startPos.Timestamp < minTs {
+			minTs = segment.curDeleteBuf.startPos.Timestamp
+		}
+		for _, ib := range segment.historyInsertBuf {
+			if ib != nil && ib.startPos != nil && ib.startPos.Timestamp < minTs {
+				minTs = ib.startPos.Timestamp
+			}
+		}
+		for _, db := range segment.historyDeleteBuf {
+			if db != nil && db.startPos != nil && db.startPos.Timestamp < minTs {
+				minTs = db.startPos.Timestamp
+			}
+		}
+		return minTs
+	}
+
+	return func(segments []*Segment, ts Timestamp, _ *atomic.Bool) []UniqueID {
+		segmentsToSync := make([]UniqueID, 0)
+		for _, segment := range segments {
+			segmentMinTs := segmentMinTs(segment)
+			segmentStartTime := tsoutil.PhysicalTime(segmentMinTs)
+			cpLagDuration := tsoutil.PhysicalTime(ts).Sub(segmentStartTime)
+			shouldSync := cpLagDuration > Params.DataNodeCfg.CpLagPeriod.GetAsDuration(time.Second) && !segment.isBufferEmpty()
+			if shouldSync {
+				segmentsToSync = append(segmentsToSync, segment.segmentID)
+			}
+		}
+		if len(segmentsToSync) > 0 {
+			log.Info("sync segment for cp lag behind too much",
+				zap.Int64s("segmentID", segmentsToSync))
+		}
+		return segmentsToSync
+	}
+}

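A quick, self-contained illustration of the lag arithmetic above (not part of the diff), using the same tsoutil helpers that appear in this change set. The 10-minute threshold is only the default of datanode.segment.cpLagPeriod introduced in the paramtable hunk at the end; the policy itself reads it through GetAsDuration.

package main

import (
	"fmt"
	"time"

	"github.com/milvus-io/milvus/pkg/util/tsoutil"
)

func main() {
	cpLagPeriod := 600 * time.Second // default datanode.segment.cpLagPeriod

	// Channel timestamp "now" and an oldest buffered position 11 minutes older.
	now := tsoutil.ComposeTSByTime(time.Now(), 0)
	oldest := tsoutil.AddPhysicalDurationOnTs(now, -11*time.Minute)

	// Same comparison syncCPLagTooBehind makes per segment.
	lag := tsoutil.PhysicalTime(now).Sub(tsoutil.PhysicalTime(oldest))
	fmt.Println(lag > cpLagPeriod) // true: the segment would be force-synced
}
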
@@ -21,6 +21,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
 	"github.com/milvus-io/milvus/pkg/util/tsoutil"
 	"github.com/stretchr/testify/assert"
 	"go.uber.org/atomic"

@@ -94,3 +95,70 @@ func TestSyncMemoryTooHigh(t *testing.T) {
 		})
 	}
 }
+
+func TestSyncCpLagBehindTooMuch(t *testing.T) {
+	nowTs := tsoutil.ComposeTSByTime(time.Now(), 0)
+	laggedTs := tsoutil.AddPhysicalDurationOnTs(nowTs, -2*Params.DataNodeCfg.CpLagPeriod.GetAsDuration(time.Second))
+	tests := []struct {
+		testName  string
+		segments  []*Segment
+		idsToSync []int64
+	}{
+		{"test_current_buf_lag_behind",
+			[]*Segment{
+				{
+					segmentID: 1,
+					curInsertBuf: &BufferData{
+						startPos: &msgpb.MsgPosition{
+							Timestamp: laggedTs,
+						},
+					},
+				},
+				{
+					segmentID: 2,
+					curDeleteBuf: &DelDataBuf{
+						startPos: &msgpb.MsgPosition{
+							Timestamp: laggedTs,
+						},
+					},
+				},
+			},
+			[]int64{1, 2},
+		},
+		{"test_history_buf_lag_behind",
+			[]*Segment{
+				{
+					segmentID: 1,
+					historyInsertBuf: []*BufferData{
+						{
+							startPos: &msgpb.MsgPosition{
+								Timestamp: laggedTs,
+							},
+						},
+					},
+				},
+				{
+					segmentID: 2,
+					historyDeleteBuf: []*DelDataBuf{
+						{
+							startPos: &msgpb.MsgPosition{
+								Timestamp: laggedTs,
+							},
+						},
+					},
+				},
+				{
+					segmentID: 3,
+				},
+			},
+			[]int64{1, 2},
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.testName, func(t *testing.T) {
+			policy := syncCPLagTooBehind()
+			ids := policy(test.segments, tsoutil.ComposeTSByTime(time.Now(), 0), nil)
+			assert.ElementsMatch(t, test.idsToSync, ids)
+		})
+	}
+}

@@ -2138,6 +2138,7 @@ type dataNodeConfig struct {
 	FlushDeleteBufferBytes ParamItem `refreshable:"true"`
 	BinLogMaxSize          ParamItem `refreshable:"true"`
 	SyncPeriod             ParamItem `refreshable:"true"`
+	CpLagPeriod            ParamItem `refreshable:"true"`
 
 	// watchEvent
 	WatchEventTicklerInterval ParamItem `refreshable:"false"`

@@ -2258,6 +2259,15 @@ func (p *dataNodeConfig) init(base *BaseTable) {
 	}
 	p.SyncPeriod.Init(base.mgr)
 
+	p.CpLagPeriod = ParamItem{
+		Key:          "datanode.segment.cpLagPeriod",
+		Version:      "2.2.0",
+		DefaultValue: "600",
+		Doc:          "The period to sync segments if buffer is not empty.",
+		Export:       true,
+	}
+	p.CpLagPeriod.Init(base.mgr)
+
 	p.WatchEventTicklerInterval = ParamItem{
 		Key:     "datanode.segment.watchEventTicklerInterval",
 		Version: "2.2.3",

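A minimal sketch (not part of the diff) of reading the new knob at runtime, assuming the global paramtable is initialized the usual way via paramtable.Init and paramtable.Get, as used elsewhere in this tree. With the shipped default of "600" seconds and the GetAsDuration(time.Second) call in the policy, the threshold comes out to 10 minutes, and the refreshable:"true" tag means an override can take effect without restarting the datanode.

package main

import (
	"fmt"
	"time"

	"github.com/milvus-io/milvus/pkg/util/paramtable"
)

func main() {
	paramtable.Init()
	// Same accessor chain syncCPLagTooBehind uses via the Params alias.
	cpLag := paramtable.Get().DataNodeCfg.CpLagPeriod.GetAsDuration(time.Second)
	fmt.Println(cpLag) // 10m0s unless datanode.segment.cpLagPeriod is overridden
}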