From af1d84e5e1066e58e67c25b3894f9e083ff1140a Mon Sep 17 00:00:00 2001
From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com>
Date: Tue, 20 Jun 2023 10:00:41 +0800
Subject: [PATCH] Support sync when cp is lagged (#24989)

Signed-off-by: xiaofan-luan
---
 internal/core/conanfile.py                    |  2 +-
 internal/datanode/channel_meta.go             |  1 +
 internal/datanode/data_sync_service_test.go   | 28 ++++----
 internal/datanode/segment_sync_policy.go      | 45 +++++++++++-
 internal/datanode/segment_sync_policy_test.go | 68 +++++++++++++++++++
 pkg/util/paramtable/component_param.go        | 10 +++
 6 files changed, 139 insertions(+), 15 deletions(-)

diff --git a/internal/core/conanfile.py b/internal/core/conanfile.py
index a935354489..8eabf4d60e 100644
--- a/internal/core/conanfile.py
+++ b/internal/core/conanfile.py
@@ -8,7 +8,7 @@ class MilvusConan(ConanFile):
         "boost/1.79.0",
         "onetbb/2021.7.0",
         "nlohmann_json/3.11.2",
-        "zstd/1.5.5",
+        "zstd/1.5.4",
         "lz4/1.9.4",
         "snappy/1.1.9",
         "lzo/2.10",
diff --git a/internal/datanode/channel_meta.go b/internal/datanode/channel_meta.go
index 4ed70c0db7..9cb3d160c5 100644
--- a/internal/datanode/channel_meta.go
+++ b/internal/datanode/channel_meta.go
@@ -156,6 +156,7 @@ func newChannel(channelName string, collID UniqueID, schema *schemapb.Collection
 		syncPolicies: []segmentSyncPolicy{
 			syncPeriodically(),
 			syncMemoryTooHigh(),
+			syncCPLagTooBehind(),
 		},
 		metaService: metaService,
diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go
index 83bbe874ff..a810bbbcbd 100644
--- a/internal/datanode/data_sync_service_test.go
+++ b/internal/datanode/data_sync_service_test.go
@@ -435,14 +435,18 @@ func TestDataSyncService_Close(t *testing.T) {
 	paramtable.Get().Reset(Params.DataNodeCfg.FlushInsertBufferSize.Key)

 	channel := newChannel(insertChannelName, collMeta.ID, collMeta.GetSchema(), mockRootCoord, cm)
+	channel.syncPolicies = []segmentSyncPolicy{
+		syncPeriodically(),
+		syncMemoryTooHigh(),
+	}
 	atimeTickSender := newTimeTickManager(mockDataCoord, 0)
-	sync, err := newDataSyncService(ctx, flushChan, resendTTChan, channel, alloc, dispClient, factory, vchan, signalCh, mockDataCoord, newCache(), cm, newCompactionExecutor(), genTestTickler(), 0, atimeTickSender)
+	syncService, err := newDataSyncService(ctx, flushChan, resendTTChan, channel, alloc, dispClient, factory, vchan, signalCh, mockDataCoord, newCache(), cm, newCompactionExecutor(), genTestTickler(), 0, atimeTickSender)
 	assert.NoError(t, err)

-	sync.flushListener = make(chan *segmentFlushPack, 10)
-	defer close(sync.flushListener)
+	syncService.flushListener = make(chan *segmentFlushPack, 10)
+	defer close(syncService.flushListener)

-	sync.start()
+	syncService.start()

 	var (
 		dataFactory = NewDataFactory()
@@ -525,26 +529,26 @@ func TestDataSyncService_Close(t *testing.T) {
 	assert.NoError(t, err)

 	// wait for the delete; with auto flush disabled, all data stays in the buffer.
-	require.Eventually(t, func() bool { return sync.delBufferManager.GetEntriesNum(1) == 1 },
+	require.Eventually(t, func() bool { return syncService.delBufferManager.GetEntriesNum(1) == 1 },
 		5*time.Second, 100*time.Millisecond)
-	assert.Equal(t, 0, len(sync.flushListener))
+	assert.Equal(t, 0, len(syncService.flushListener))

 	// close will trigger a force sync
-	sync.close()
-	assert.Eventually(t, func() bool { return len(sync.flushListener) == 1 },
+	syncService.close()
+	assert.Eventually(t, func() bool { return len(syncService.flushListener) == 1 },
 		5*time.Second, 100*time.Millisecond)

-	flushPack, ok := <-sync.flushListener
+	flushPack, ok := <-syncService.flushListener
 	assert.True(t, ok)
 	assert.Equal(t, UniqueID(1), flushPack.segmentID)
 	assert.True(t, len(flushPack.insertLogs) == 12)
 	assert.True(t, len(flushPack.statsLogs) == 1)
 	assert.True(t, len(flushPack.deltaLogs) == 1)
-	<-sync.ctx.Done()
+	<-syncService.ctx.Done()

 	// Double close is safe
-	sync.close()
-	<-sync.ctx.Done()
+	syncService.close()
+	<-syncService.ctx.Done()
 }

 func genBytes() (rawData []byte) {
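Note for reviewers: the test above now pins channel.syncPolicies to the two pre-existing policies. Without that override, the new syncCPLagTooBehind policy could force-sync the deliberately buffered rows mid-test and break the assertion that nothing flushes before close(). A minimal sketch of the same idea, assuming the segmentSyncPolicy signature used later in this patch; this syncNever helper is hypothetical, not part of the change:

// syncNever is a hypothetical no-op policy for tests: it never nominates
// a segment, so a test can exclude a single sync trigger while keeping
// the others in place.
func syncNever() segmentSyncPolicy {
	return func(segments []*Segment, ts Timestamp, _ *atomic.Bool) []UniqueID {
		return nil // no segment ever qualifies for sync
	}
}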
diff --git a/internal/datanode/segment_sync_policy.go b/internal/datanode/segment_sync_policy.go
index 697386ac7a..ed6b90b276 100644
--- a/internal/datanode/segment_sync_policy.go
+++ b/internal/datanode/segment_sync_policy.go
@@ -46,8 +46,7 @@ func syncPeriodically() segmentSyncPolicy {
 			}
 		}
 		if len(segsToSync) > 0 {
-			log.Debug("sync segment periodically",
-				zap.Int64s("segmentID", segsToSync))
+			log.Info("sync segment periodically", zap.Int64s("segmentID", segsToSync))
 		}
 		return segsToSync
 	}
@@ -76,3 +75,45 @@ func syncMemoryTooHigh() segmentSyncPolicy {
 		return syncSegments
 	}
 }
+
+// syncCPLagTooBehind force-syncs segments whose buffered data lags too far behind the channel checkpoint.
+func syncCPLagTooBehind() segmentSyncPolicy {
+	segmentMinTs := func(segment *Segment) uint64 {
+		var minTs uint64 = math.MaxUint64
+		if segment.curInsertBuf != nil && segment.curInsertBuf.startPos != nil && segment.curInsertBuf.startPos.Timestamp < minTs {
+			minTs = segment.curInsertBuf.startPos.Timestamp
+		}
+		if segment.curDeleteBuf != nil && segment.curDeleteBuf.startPos != nil && segment.curDeleteBuf.startPos.Timestamp < minTs {
+			minTs = segment.curDeleteBuf.startPos.Timestamp
+		}
+		for _, ib := range segment.historyInsertBuf {
+			if ib != nil && ib.startPos != nil && ib.startPos.Timestamp < minTs {
+				minTs = ib.startPos.Timestamp
+			}
+		}
+		for _, db := range segment.historyDeleteBuf {
+			if db != nil && db.startPos != nil && db.startPos.Timestamp < minTs {
+				minTs = db.startPos.Timestamp
+			}
+		}
+		return minTs
+	}
+
+	return func(segments []*Segment, ts Timestamp, _ *atomic.Bool) []UniqueID {
+		segmentsToSync := make([]UniqueID, 0)
+		for _, segment := range segments {
+			minTs := segmentMinTs(segment)
+			segmentStartTime := tsoutil.PhysicalTime(minTs)
+			cpLagDuration := tsoutil.PhysicalTime(ts).Sub(segmentStartTime)
+			shouldSync := cpLagDuration > Params.DataNodeCfg.CpLagPeriod.GetAsDuration(time.Second) && !segment.isBufferEmpty()
+			if shouldSync {
+				segmentsToSync = append(segmentsToSync, segment.segmentID)
+			}
+		}
+		if len(segmentsToSync) > 0 {
+			log.Info("sync segments because checkpoint lags too far behind",
+				zap.Int64s("segmentID", segmentsToSync))
+		}
+		return segmentsToSync
+	}
+}
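The decision above reduces to comparing physical times: take the earliest start position across a segment's current and historical insert/delete buffers, and force a sync when the channel timestamp has moved more than CpLagPeriod past it while the buffer is still non-empty. A self-contained sketch of that arithmetic, assuming the usual Milvus TSO layout of physical milliseconds shifted above 18 logical bits (the patch itself delegates this conversion to tsoutil.PhysicalTime):

package main

import (
	"fmt"
	"time"
)

// Assumption: hybrid timestamps pack physical milliseconds above 18
// logical bits, mirroring what pkg/util/tsoutil does internally.
const logicalBits = 18

func composeTS(t time.Time) uint64     { return uint64(t.UnixMilli()) << logicalBits }
func physicalTime(ts uint64) time.Time { return time.UnixMilli(int64(ts >> logicalBits)) }

func main() {
	cpLagPeriod := 600 * time.Second // default of datanode.segment.cpLagPeriod

	now := time.Now()
	channelTs := composeTS(now)                          // ts handed to the policy
	segmentMinTs := composeTS(now.Add(-2 * cpLagPeriod)) // oldest buffered start position

	cpLagDuration := physicalTime(channelTs).Sub(physicalTime(segmentMinTs))
	fmt.Println(cpLagDuration > cpLagPeriod) // true: this segment would be force-synced
}

This is also why the unit test below composes laggedTs at minus twice the configured period: it guarantees the lag comparison trips regardless of test scheduling jitter.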
"testing" "time" + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/stretchr/testify/assert" "go.uber.org/atomic" @@ -94,3 +95,70 @@ func TestSyncMemoryTooHigh(t *testing.T) { }) } } + +func TestSyncCpLagBehindTooMuch(t *testing.T) { + nowTs := tsoutil.ComposeTSByTime(time.Now(), 0) + laggedTs := tsoutil.AddPhysicalDurationOnTs(nowTs, -2*Params.DataNodeCfg.CpLagPeriod.GetAsDuration(time.Second)) + tests := []struct { + testName string + segments []*Segment + idsToSync []int64 + }{ + {"test_current_buf_lag_behind", + []*Segment{ + { + segmentID: 1, + curInsertBuf: &BufferData{ + startPos: &msgpb.MsgPosition{ + Timestamp: laggedTs, + }, + }, + }, + { + segmentID: 2, + curDeleteBuf: &DelDataBuf{ + startPos: &msgpb.MsgPosition{ + Timestamp: laggedTs, + }, + }, + }, + }, + []int64{1, 2}, + }, + {"test_history_buf_lag_behind", + []*Segment{ + { + segmentID: 1, + historyInsertBuf: []*BufferData{ + { + startPos: &msgpb.MsgPosition{ + Timestamp: laggedTs, + }, + }, + }, + }, + { + segmentID: 2, + historyDeleteBuf: []*DelDataBuf{ + { + startPos: &msgpb.MsgPosition{ + Timestamp: laggedTs, + }, + }, + }, + }, + { + segmentID: 3, + }, + }, + []int64{1, 2}, + }, + } + for _, test := range tests { + t.Run(test.testName, func(t *testing.T) { + policy := syncCPLagTooBehind() + ids := policy(test.segments, tsoutil.ComposeTSByTime(time.Now(), 0), nil) + assert.ElementsMatch(t, test.idsToSync, ids) + }) + } +} diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index bff5c2960d..157dfebae5 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2138,6 +2138,7 @@ type dataNodeConfig struct { FlushDeleteBufferBytes ParamItem `refreshable:"true"` BinLogMaxSize ParamItem `refreshable:"true"` SyncPeriod ParamItem `refreshable:"true"` + CpLagPeriod ParamItem `refreshable:"true"` // watchEvent WatchEventTicklerInterval ParamItem `refreshable:"false"` @@ -2258,6 +2259,15 @@ func (p *dataNodeConfig) init(base *BaseTable) { } p.SyncPeriod.Init(base.mgr) + p.CpLagPeriod = ParamItem{ + Key: "datanode.segment.cpLagPeriod", + Version: "2.2.0", + DefaultValue: "600", + Doc: "The period to sync segments if buffer is not empty.", + Export: true, + } + p.CpLagPeriod.Init(base.mgr) + p.WatchEventTicklerInterval = ParamItem{ Key: "datanode.segment.watchEventTicklerInterval", Version: "2.2.3",