enhance: Prevent the backlog of channelCP update tasks, perform batch updates of channelCPs (#30941) (#31024)

This PR includes the following adjustments:

1. To prevent channelCP update task backlog, only one task with the same
vchannel is retained in the updater. Additionally, the lastUpdateTime is
refreshed after the flowgraph submits the update task, rather than in
the callBack function.
2. Batch updates of multiple vchannel checkpoints are performed in the
UpdateChannelCheckpoint RPC (default batch size is 128). Additionally,
the lock for channelCPs in DataCoord meta has been switched from key
lock to global lock.
3. The concurrency of UpdateChannelCheckpoint RPCs in the datanode has
been reduced from 1000 to 10.

issue: https://github.com/milvus-io/milvus/issues/30004

pr: https://github.com/milvus-io/milvus/pull/30941

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/31035/head
yihao.dai 2024-03-05 14:27:01 +08:00 committed by GitHub
parent b7635ed989
commit 91d17870d6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 502 additions and 1367 deletions

View File

@ -478,9 +478,8 @@ dataNode:
# suggest to set it bigger on large collection numbers to avoid blocking # suggest to set it bigger on large collection numbers to avoid blocking
workPoolSize: -1 workPoolSize: -1
# specify the size of global work pool for channel checkpoint updating # specify the size of global work pool for channel checkpoint updating
# if this parameter <= 0, will set it as 1000 # if this parameter <= 0, will set it as 10
# suggest to set it bigger on large collection numbers to avoid blocking updateChannelCheckpointMaxParallel: 10
updateChannelCheckpointMaxParallel: 1000
# Configures the system log output. # Configures the system log output.
log: log:

View File

@ -343,7 +343,7 @@ func createMetaForRecycleUnusedIndexes(catalog metastore.DataCoordCatalog) *meta
catalog: catalog, catalog: catalog,
collections: nil, collections: nil,
segments: nil, segments: nil,
channelCPs: nil, channelCPs: newChannelCps(),
chunkManager: nil, chunkManager: nil,
indexMeta: &indexMeta{ indexMeta: &indexMeta{
catalog: catalog, catalog: catalog,
@ -787,13 +787,14 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
mock.Anything, mock.Anything,
).Return(nil) ).Return(nil)
channelCPs := newChannelCps()
channelCPs.checkpoints["dmlChannel"] = &msgpb.MsgPosition{
Timestamp: 1000,
}
m := &meta{ m := &meta{
catalog: catalog, catalog: catalog,
channelCPs: map[string]*msgpb.MsgPosition{ channelCPs: channelCPs,
"dmlChannel": {
Timestamp: 1000,
},
},
segments: &SegmentsInfo{ segments: &SegmentsInfo{
segments: map[UniqueID]*SegmentInfo{ segments: map[UniqueID]*SegmentInfo{
segID: { segID: {

View File

@ -53,14 +53,25 @@ type meta struct {
sync.RWMutex sync.RWMutex
ctx context.Context ctx context.Context
catalog metastore.DataCoordCatalog catalog metastore.DataCoordCatalog
collections map[UniqueID]*collectionInfo // collection id to collection info collections map[UniqueID]*collectionInfo // collection id to collection info
segments *SegmentsInfo // segment id to segment info segments *SegmentsInfo // segment id to segment info
channelCPs map[string]*msgpb.MsgPosition // vChannel -> channel checkpoint/see position channelCPs *channelCPs // vChannel -> channel checkpoint/see position
chunkManager storage.ChunkManager chunkManager storage.ChunkManager
indexMeta *indexMeta indexMeta *indexMeta
} }
type channelCPs struct {
sync.RWMutex
checkpoints map[string]*msgpb.MsgPosition
}
func newChannelCps() *channelCPs {
return &channelCPs{
checkpoints: make(map[string]*msgpb.MsgPosition),
}
}
// A local cache of segment metric update. Must call commit() to take effect. // A local cache of segment metric update. Must call commit() to take effect.
type segMetricMutation struct { type segMetricMutation struct {
stateChange map[string]int // segment state -> state change count (to increase or decrease). stateChange map[string]int // segment state -> state change count (to increase or decrease).
@ -89,7 +100,7 @@ func newMeta(ctx context.Context, catalog metastore.DataCoordCatalog, chunkManag
catalog: catalog, catalog: catalog,
collections: make(map[UniqueID]*collectionInfo), collections: make(map[UniqueID]*collectionInfo),
segments: NewSegmentsInfo(), segments: NewSegmentsInfo(),
channelCPs: make(map[string]*msgpb.MsgPosition), channelCPs: newChannelCps(),
chunkManager: chunkManager, chunkManager: chunkManager,
indexMeta: indexMeta, indexMeta: indexMeta,
} }
@ -148,7 +159,7 @@ func (m *meta) reloadFromKV() error {
for vChannel, pos := range channelCPs { for vChannel, pos := range channelCPs {
// for 2.2.2 issue https://github.com/milvus-io/milvus/issues/22181 // for 2.2.2 issue https://github.com/milvus-io/milvus/issues/22181
pos.ChannelName = vChannel pos.ChannelName = vChannel
m.channelCPs[vChannel] = pos m.channelCPs.checkpoints[vChannel] = pos
} }
log.Info("DataCoord meta reloadFromKV done", zap.Duration("duration", record.ElapseSpan())) log.Info("DataCoord meta reloadFromKV done", zap.Duration("duration", record.ElapseSpan()))
return nil return nil
@ -1298,16 +1309,16 @@ func (m *meta) UpdateChannelCheckpoint(vChannel string, pos *msgpb.MsgPosition)
return fmt.Errorf("channelCP is nil, vChannel=%s", vChannel) return fmt.Errorf("channelCP is nil, vChannel=%s", vChannel)
} }
m.Lock() m.channelCPs.Lock()
defer m.Unlock() defer m.channelCPs.Unlock()
oldPosition, ok := m.channelCPs[vChannel] oldPosition, ok := m.channelCPs.checkpoints[vChannel]
if !ok || oldPosition.Timestamp < pos.Timestamp { if !ok || oldPosition.Timestamp < pos.Timestamp {
err := m.catalog.SaveChannelCheckpoint(m.ctx, vChannel, pos) err := m.catalog.SaveChannelCheckpoint(m.ctx, vChannel, pos)
if err != nil { if err != nil {
return err return err
} }
m.channelCPs[vChannel] = pos m.channelCPs.checkpoints[vChannel] = pos
ts, _ := tsoutil.ParseTS(pos.Timestamp) ts, _ := tsoutil.ParseTS(pos.Timestamp)
log.Info("UpdateChannelCheckpoint done", log.Info("UpdateChannelCheckpoint done",
zap.String("vChannel", vChannel), zap.String("vChannel", vChannel),
@ -1320,23 +1331,53 @@ func (m *meta) UpdateChannelCheckpoint(vChannel string, pos *msgpb.MsgPosition)
return nil return nil
} }
// UpdateChannelCheckpoints updates and saves channel checkpoints.
func (m *meta) UpdateChannelCheckpoints(positions []*msgpb.MsgPosition) error {
m.channelCPs.Lock()
defer m.channelCPs.Unlock()
toUpdates := lo.Filter(positions, func(pos *msgpb.MsgPosition, _ int) bool {
if pos == nil || pos.GetMsgID() == nil || pos.GetChannelName() == "" {
log.Warn("illegal channel cp", zap.Any("pos", pos))
return false
}
vChannel := pos.GetChannelName()
oldPosition, ok := m.channelCPs.checkpoints[vChannel]
return !ok || oldPosition.Timestamp < pos.Timestamp
})
err := m.catalog.SaveChannelCheckpoints(m.ctx, toUpdates)
if err != nil {
return err
}
for _, pos := range toUpdates {
channel := pos.GetChannelName()
m.channelCPs.checkpoints[channel] = pos
log.Info("UpdateChannelCheckpoint done", zap.String("channel", channel),
zap.Uint64("ts", pos.GetTimestamp()),
zap.Time("time", tsoutil.PhysicalTime(pos.GetTimestamp())))
ts, _ := tsoutil.ParseTS(pos.Timestamp)
metrics.DataCoordCheckpointUnixSeconds.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), channel).Set(float64(ts.Unix()))
}
return nil
}
func (m *meta) GetChannelCheckpoint(vChannel string) *msgpb.MsgPosition { func (m *meta) GetChannelCheckpoint(vChannel string) *msgpb.MsgPosition {
m.RLock() m.channelCPs.RLock()
defer m.RUnlock() defer m.channelCPs.RUnlock()
if m.channelCPs[vChannel] == nil { cp, ok := m.channelCPs.checkpoints[vChannel]
if !ok {
return nil return nil
} }
return proto.Clone(m.channelCPs[vChannel]).(*msgpb.MsgPosition) return proto.Clone(cp).(*msgpb.MsgPosition)
} }
func (m *meta) DropChannelCheckpoint(vChannel string) error { func (m *meta) DropChannelCheckpoint(vChannel string) error {
m.Lock() m.channelCPs.Lock()
defer m.Unlock() defer m.channelCPs.Unlock()
err := m.catalog.DropChannelCheckpoint(m.ctx, vChannel) err := m.catalog.DropChannelCheckpoint(m.ctx, vChannel)
if err != nil { if err != nil {
return err return err
} }
delete(m.channelCPs, vChannel) delete(m.channelCPs.checkpoints, vChannel)
log.Debug("DropChannelCheckpoint done", zap.String("vChannel", vChannel)) log.Debug("DropChannelCheckpoint done", zap.String("vChannel", vChannel))
return nil return nil
} }

View File

@ -33,6 +33,7 @@ import (
"github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/internal/kv"
mockkv "github.com/milvus-io/milvus/internal/kv/mocks" mockkv "github.com/milvus-io/milvus/internal/kv/mocks"
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord" "github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
mocks2 "github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
@ -46,12 +47,12 @@ import (
type MetaReloadSuite struct { type MetaReloadSuite struct {
testutils.PromMetricsSuite testutils.PromMetricsSuite
catalog *mocks.DataCoordCatalog catalog *mocks2.DataCoordCatalog
meta *meta meta *meta
} }
func (suite *MetaReloadSuite) SetupTest() { func (suite *MetaReloadSuite) SetupTest() {
catalog := mocks.NewDataCoordCatalog(suite.T()) catalog := mocks2.NewDataCoordCatalog(suite.T())
suite.catalog = catalog suite.catalog = catalog
} }
@ -952,6 +953,22 @@ func TestChannelCP(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
}) })
t.Run("UpdateChannelCheckpoints", func(t *testing.T) {
meta, err := newMemoryMeta()
assert.NoError(t, err)
assert.Equal(t, 0, len(meta.channelCPs.checkpoints))
err = meta.UpdateChannelCheckpoints(nil)
assert.NoError(t, err)
assert.Equal(t, 0, len(meta.channelCPs.checkpoints))
err = meta.UpdateChannelCheckpoints([]*msgpb.MsgPosition{pos, {
ChannelName: "",
}})
assert.NoError(t, err)
assert.Equal(t, 1, len(meta.channelCPs.checkpoints))
})
t.Run("GetChannelCheckpoint", func(t *testing.T) { t.Run("GetChannelCheckpoint", func(t *testing.T) {
meta, err := newMemoryMeta() meta, err := newMemoryMeta()
assert.NoError(t, err) assert.NoError(t, err)
@ -983,7 +1000,7 @@ func TestChannelCP(t *testing.T) {
func Test_meta_GcConfirm(t *testing.T) { func Test_meta_GcConfirm(t *testing.T) {
m := &meta{} m := &meta{}
catalog := mocks.NewDataCoordCatalog(t) catalog := mocks2.NewDataCoordCatalog(t)
m.catalog = catalog m.catalog = catalog
catalog.On("GcConfirm", catalog.On("GcConfirm",

View File

@ -44,6 +44,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
mocks2 "github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
@ -389,7 +390,7 @@ func TestFlushForImport(t *testing.T) {
allocation, err = svr.segmentManager.allocSegmentForImport( allocation, err = svr.segmentManager.allocSegmentForImport(
context.TODO(), 0, 1, "ch-1", 1, 1) context.TODO(), 0, 1, "ch-1", 1, 1)
assert.NoError(t, err) assert.NoError(t, err)
catalog := mocks.NewDataCoordCatalog(t) catalog := mocks2.NewDataCoordCatalog(t)
catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything).Return(errors.New("mock err")) catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything).Return(errors.New("mock err"))
svr.meta.catalog = catalog svr.meta.catalog = catalog
req.SegmentIDs = []UniqueID{allocation.SegmentID} req.SegmentIDs = []UniqueID{allocation.SegmentID}
@ -3714,10 +3715,10 @@ func TestGetFlushAllState(t *testing.T) {
}, nil).Maybe() }, nil).Maybe()
} }
svr.meta.channelCPs = make(map[string]*msgpb.MsgPosition) svr.meta.channelCPs = newChannelCps()
for i, ts := range test.ChannelCPs { for i, ts := range test.ChannelCPs {
channel := vchannels[i] channel := vchannels[i]
svr.meta.channelCPs[channel] = &msgpb.MsgPosition{ svr.meta.channelCPs.checkpoints[channel] = &msgpb.MsgPosition{
ChannelName: channel, ChannelName: channel,
Timestamp: ts, Timestamp: ts,
} }
@ -3790,11 +3791,11 @@ func TestGetFlushAllStateWithDB(t *testing.T) {
CollectionName: collectionName, CollectionName: collectionName,
}, nil).Maybe() }, nil).Maybe()
svr.meta.channelCPs = make(map[string]*msgpb.MsgPosition) svr.meta.channelCPs = newChannelCps()
channelCPs := []Timestamp{100, 200} channelCPs := []Timestamp{100, 200}
for i, ts := range channelCPs { for i, ts := range channelCPs {
channel := vchannels[i] channel := vchannels[i]
svr.meta.channelCPs[channel] = &msgpb.MsgPosition{ svr.meta.channelCPs.checkpoints[channel] = &msgpb.MsgPosition{
ChannelName: channel, ChannelName: channel,
Timestamp: ts, Timestamp: ts,
} }
@ -4189,10 +4190,21 @@ func TestDataCoordServer_UpdateChannelCheckpoint(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.ErrorCode) assert.EqualValues(t, commonpb.ErrorCode_Success, resp.ErrorCode)
req.Position = nil req = &datapb.UpdateChannelCheckpointRequest{
Base: &commonpb.MsgBase{
SourceID: paramtable.GetNodeID(),
},
VChannel: mockVChannel,
ChannelCheckpoints: []*msgpb.MsgPosition{{
ChannelName: mockPChannel,
Timestamp: 1000,
MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0},
}},
}
resp, err = svr.UpdateChannelCheckpoint(context.TODO(), req) resp, err = svr.UpdateChannelCheckpoint(context.TODO(), req)
assert.NoError(t, err) assert.NoError(t, err)
assert.EqualValues(t, commonpb.ErrorCode_UnexpectedError, resp.ErrorCode) assert.EqualValues(t, commonpb.ErrorCode_Success, resp.ErrorCode)
}) })
} }

View File

@ -1381,9 +1381,19 @@ func (s *Server) UpdateChannelCheckpoint(ctx context.Context, req *datapb.Update
return merr.Status(err), nil return merr.Status(err), nil
} }
err := s.meta.UpdateChannelCheckpoint(req.GetVChannel(), req.GetPosition()) // For compatibility with old client
if req.GetVChannel() != "" && req.GetPosition() != nil {
err := s.meta.UpdateChannelCheckpoint(req.GetVChannel(), req.GetPosition())
if err != nil {
log.Warn("failed to UpdateChannelCheckpoint", zap.String("vChannel", req.GetVChannel()), zap.Error(err))
return merr.Status(err), nil
}
return merr.Success(), nil
}
err := s.meta.UpdateChannelCheckpoints(req.GetChannelCheckpoints())
if err != nil { if err != nil {
log.Warn("failed to UpdateChannelCheckpoint", zap.String("vChannel", req.GetVChannel()), zap.Error(err)) log.Warn("failed to update channel checkpoint", zap.Error(err))
return merr.Status(err), nil return merr.Status(err), nil
} }

View File

@ -11,8 +11,8 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/types"

View File

@ -46,7 +46,7 @@ type DataCoord interface {
AssignSegmentID(ctx context.Context, reqs ...*datapb.SegmentIDRequest) ([]typeutil.UniqueID, error) AssignSegmentID(ctx context.Context, reqs ...*datapb.SegmentIDRequest) ([]typeutil.UniqueID, error)
ReportTimeTick(ctx context.Context, msgs []*msgpb.DataNodeTtMsg) error ReportTimeTick(ctx context.Context, msgs []*msgpb.DataNodeTtMsg) error
GetSegmentInfo(ctx context.Context, segmentIDs []int64) ([]*datapb.SegmentInfo, error) GetSegmentInfo(ctx context.Context, segmentIDs []int64) ([]*datapb.SegmentInfo, error)
UpdateChannelCheckpoint(ctx context.Context, channelName string, cp *msgpb.MsgPosition) error UpdateChannelCheckpoint(ctx context.Context, channelCPs []*msgpb.MsgPosition) error
SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) error SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) error
DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error)
UpdateSegmentStatistics(ctx context.Context, req *datapb.UpdateSegmentStatisticsRequest) error UpdateSegmentStatistics(ctx context.Context, req *datapb.UpdateSegmentStatisticsRequest) error

View File

@ -2,6 +2,7 @@ package broker
import ( import (
"context" "context"
"time"
"github.com/samber/lo" "github.com/samber/lo"
"go.uber.org/zap" "go.uber.org/zap"
@ -82,24 +83,24 @@ func (dc *dataCoordBroker) GetSegmentInfo(ctx context.Context, segmentIDs []int6
return infoResp.Infos, nil return infoResp.Infos, nil
} }
func (dc *dataCoordBroker) UpdateChannelCheckpoint(ctx context.Context, channelName string, cp *msgpb.MsgPosition) error { func (dc *dataCoordBroker) UpdateChannelCheckpoint(ctx context.Context, channelCPs []*msgpb.MsgPosition) error {
channelCPTs, _ := tsoutil.ParseTS(cp.GetTimestamp())
log := log.Ctx(ctx).With(
zap.String("channelName", channelName),
zap.Time("channelCheckpointTime", channelCPTs),
)
req := &datapb.UpdateChannelCheckpointRequest{ req := &datapb.UpdateChannelCheckpointRequest{
Base: commonpbutil.NewMsgBase( Base: commonpbutil.NewMsgBase(
commonpbutil.WithSourceID(paramtable.GetNodeID()), commonpbutil.WithSourceID(paramtable.GetNodeID()),
), ),
VChannel: channelName, ChannelCheckpoints: channelCPs,
Position: cp,
} }
resp, err := dc.client.UpdateChannelCheckpoint(ctx, req) resp, err := dc.client.UpdateChannelCheckpoint(ctx, req)
if err := merr.CheckRPCCall(resp, err); err != nil { if err = merr.CheckRPCCall(resp, err); err != nil {
log.Warn("failed to update channel checkpoint", zap.Error(err)) channels := lo.Map(channelCPs, func(pos *msgpb.MsgPosition, _ int) string {
return pos.GetChannelName()
})
channelTimes := lo.Map(channelCPs, func(pos *msgpb.MsgPosition, _ int) time.Time {
return tsoutil.PhysicalTime(pos.GetTimestamp())
})
log.Warn("failed to update channel checkpoint", zap.Strings("channelNames", channels),
zap.Times("channelCheckpointTimes", channelTimes), zap.Error(err))
return err return err
} }
return nil return nil

View File

@ -178,15 +178,14 @@ func (s *dataCoordSuite) TestUpdateChannelCheckpoint() {
s.Run("normal_case", func() { s.Run("normal_case", func() {
s.dc.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything). s.dc.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything).
Run(func(_ context.Context, req *datapb.UpdateChannelCheckpointRequest, _ ...grpc.CallOption) { Run(func(_ context.Context, req *datapb.UpdateChannelCheckpointRequest, _ ...grpc.CallOption) {
s.Equal(channelName, req.GetVChannel()) cp := req.GetChannelCheckpoints()[0]
cp := req.GetPosition()
s.Equal(checkpoint.MsgID, cp.GetMsgID()) s.Equal(checkpoint.MsgID, cp.GetMsgID())
s.Equal(checkpoint.ChannelName, cp.GetChannelName()) s.Equal(checkpoint.ChannelName, cp.GetChannelName())
s.Equal(checkpoint.Timestamp, cp.GetTimestamp()) s.Equal(checkpoint.Timestamp, cp.GetTimestamp())
}). }).
Return(merr.Status(nil), nil) Return(merr.Status(nil), nil)
err := s.broker.UpdateChannelCheckpoint(ctx, channelName, checkpoint) err := s.broker.UpdateChannelCheckpoint(ctx, []*msgpb.MsgPosition{checkpoint})
s.NoError(err) s.NoError(err)
s.resetMock() s.resetMock()
}) })
@ -195,7 +194,7 @@ func (s *dataCoordSuite) TestUpdateChannelCheckpoint() {
s.dc.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything). s.dc.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything).
Return(nil, errors.New("mock")) Return(nil, errors.New("mock"))
err := s.broker.UpdateChannelCheckpoint(ctx, channelName, checkpoint) err := s.broker.UpdateChannelCheckpoint(ctx, []*msgpb.MsgPosition{checkpoint})
s.Error(err) s.Error(err)
s.resetMock() s.resetMock()
}) })
@ -204,7 +203,7 @@ func (s *dataCoordSuite) TestUpdateChannelCheckpoint() {
s.dc.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything). s.dc.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything).
Return(merr.Status(errors.New("mock")), nil) Return(merr.Status(errors.New("mock")), nil)
err := s.broker.UpdateChannelCheckpoint(ctx, channelName, checkpoint) err := s.broker.UpdateChannelCheckpoint(ctx, []*msgpb.MsgPosition{checkpoint})
s.Error(err) s.Error(err)
s.resetMock() s.resetMock()
}) })

View File

@ -65,8 +65,8 @@ type MockBroker_AllocTimestamp_Call struct {
} }
// AllocTimestamp is a helper method to define mock.On call // AllocTimestamp is a helper method to define mock.On call
// - ctx context.Context // - ctx context.Context
// - num uint32 // - num uint32
func (_e *MockBroker_Expecter) AllocTimestamp(ctx interface{}, num interface{}) *MockBroker_AllocTimestamp_Call { func (_e *MockBroker_Expecter) AllocTimestamp(ctx interface{}, num interface{}) *MockBroker_AllocTimestamp_Call {
return &MockBroker_AllocTimestamp_Call{Call: _e.mock.On("AllocTimestamp", ctx, num)} return &MockBroker_AllocTimestamp_Call{Call: _e.mock.On("AllocTimestamp", ctx, num)}
} }
@ -127,8 +127,8 @@ type MockBroker_AssignSegmentID_Call struct {
} }
// AssignSegmentID is a helper method to define mock.On call // AssignSegmentID is a helper method to define mock.On call
// - ctx context.Context // - ctx context.Context
// - reqs ...*datapb.SegmentIDRequest // - reqs ...*datapb.SegmentIDRequest
func (_e *MockBroker_Expecter) AssignSegmentID(ctx interface{}, reqs ...interface{}) *MockBroker_AssignSegmentID_Call { func (_e *MockBroker_Expecter) AssignSegmentID(ctx interface{}, reqs ...interface{}) *MockBroker_AssignSegmentID_Call {
return &MockBroker_AssignSegmentID_Call{Call: _e.mock.On("AssignSegmentID", return &MockBroker_AssignSegmentID_Call{Call: _e.mock.On("AssignSegmentID",
append([]interface{}{ctx}, reqs...)...)} append([]interface{}{ctx}, reqs...)...)}
@ -189,9 +189,9 @@ type MockBroker_DescribeCollection_Call struct {
} }
// DescribeCollection is a helper method to define mock.On call // DescribeCollection is a helper method to define mock.On call
// - ctx context.Context // - ctx context.Context
// - collectionID int64 // - collectionID int64
// - ts uint64 // - ts uint64
func (_e *MockBroker_Expecter) DescribeCollection(ctx interface{}, collectionID interface{}, ts interface{}) *MockBroker_DescribeCollection_Call { func (_e *MockBroker_Expecter) DescribeCollection(ctx interface{}, collectionID interface{}, ts interface{}) *MockBroker_DescribeCollection_Call {
return &MockBroker_DescribeCollection_Call{Call: _e.mock.On("DescribeCollection", ctx, collectionID, ts)} return &MockBroker_DescribeCollection_Call{Call: _e.mock.On("DescribeCollection", ctx, collectionID, ts)}
} }
@ -245,8 +245,8 @@ type MockBroker_DropVirtualChannel_Call struct {
} }
// DropVirtualChannel is a helper method to define mock.On call // DropVirtualChannel is a helper method to define mock.On call
// - ctx context.Context // - ctx context.Context
// - req *datapb.DropVirtualChannelRequest // - req *datapb.DropVirtualChannelRequest
func (_e *MockBroker_Expecter) DropVirtualChannel(ctx interface{}, req interface{}) *MockBroker_DropVirtualChannel_Call { func (_e *MockBroker_Expecter) DropVirtualChannel(ctx interface{}, req interface{}) *MockBroker_DropVirtualChannel_Call {
return &MockBroker_DropVirtualChannel_Call{Call: _e.mock.On("DropVirtualChannel", ctx, req)} return &MockBroker_DropVirtualChannel_Call{Call: _e.mock.On("DropVirtualChannel", ctx, req)}
} }
@ -300,8 +300,8 @@ type MockBroker_GetSegmentInfo_Call struct {
} }
// GetSegmentInfo is a helper method to define mock.On call // GetSegmentInfo is a helper method to define mock.On call
// - ctx context.Context // - ctx context.Context
// - segmentIDs []int64 // - segmentIDs []int64
func (_e *MockBroker_Expecter) GetSegmentInfo(ctx interface{}, segmentIDs interface{}) *MockBroker_GetSegmentInfo_Call { func (_e *MockBroker_Expecter) GetSegmentInfo(ctx interface{}, segmentIDs interface{}) *MockBroker_GetSegmentInfo_Call {
return &MockBroker_GetSegmentInfo_Call{Call: _e.mock.On("GetSegmentInfo", ctx, segmentIDs)} return &MockBroker_GetSegmentInfo_Call{Call: _e.mock.On("GetSegmentInfo", ctx, segmentIDs)}
} }
@ -343,8 +343,8 @@ type MockBroker_ReportImport_Call struct {
} }
// ReportImport is a helper method to define mock.On call // ReportImport is a helper method to define mock.On call
// - ctx context.Context // - ctx context.Context
// - req *rootcoordpb.ImportResult // - req *rootcoordpb.ImportResult
func (_e *MockBroker_Expecter) ReportImport(ctx interface{}, req interface{}) *MockBroker_ReportImport_Call { func (_e *MockBroker_Expecter) ReportImport(ctx interface{}, req interface{}) *MockBroker_ReportImport_Call {
return &MockBroker_ReportImport_Call{Call: _e.mock.On("ReportImport", ctx, req)} return &MockBroker_ReportImport_Call{Call: _e.mock.On("ReportImport", ctx, req)}
} }
@ -386,8 +386,8 @@ type MockBroker_ReportTimeTick_Call struct {
} }
// ReportTimeTick is a helper method to define mock.On call // ReportTimeTick is a helper method to define mock.On call
// - ctx context.Context // - ctx context.Context
// - msgs []*msgpb.DataNodeTtMsg // - msgs []*msgpb.DataNodeTtMsg
func (_e *MockBroker_Expecter) ReportTimeTick(ctx interface{}, msgs interface{}) *MockBroker_ReportTimeTick_Call { func (_e *MockBroker_Expecter) ReportTimeTick(ctx interface{}, msgs interface{}) *MockBroker_ReportTimeTick_Call {
return &MockBroker_ReportTimeTick_Call{Call: _e.mock.On("ReportTimeTick", ctx, msgs)} return &MockBroker_ReportTimeTick_Call{Call: _e.mock.On("ReportTimeTick", ctx, msgs)}
} }
@ -429,8 +429,8 @@ type MockBroker_SaveBinlogPaths_Call struct {
} }
// SaveBinlogPaths is a helper method to define mock.On call // SaveBinlogPaths is a helper method to define mock.On call
// - ctx context.Context // - ctx context.Context
// - req *datapb.SaveBinlogPathsRequest // - req *datapb.SaveBinlogPathsRequest
func (_e *MockBroker_Expecter) SaveBinlogPaths(ctx interface{}, req interface{}) *MockBroker_SaveBinlogPaths_Call { func (_e *MockBroker_Expecter) SaveBinlogPaths(ctx interface{}, req interface{}) *MockBroker_SaveBinlogPaths_Call {
return &MockBroker_SaveBinlogPaths_Call{Call: _e.mock.On("SaveBinlogPaths", ctx, req)} return &MockBroker_SaveBinlogPaths_Call{Call: _e.mock.On("SaveBinlogPaths", ctx, req)}
} }
@ -472,8 +472,8 @@ type MockBroker_SaveImportSegment_Call struct {
} }
// SaveImportSegment is a helper method to define mock.On call // SaveImportSegment is a helper method to define mock.On call
// - ctx context.Context // - ctx context.Context
// - req *datapb.SaveImportSegmentRequest // - req *datapb.SaveImportSegmentRequest
func (_e *MockBroker_Expecter) SaveImportSegment(ctx interface{}, req interface{}) *MockBroker_SaveImportSegment_Call { func (_e *MockBroker_Expecter) SaveImportSegment(ctx interface{}, req interface{}) *MockBroker_SaveImportSegment_Call {
return &MockBroker_SaveImportSegment_Call{Call: _e.mock.On("SaveImportSegment", ctx, req)} return &MockBroker_SaveImportSegment_Call{Call: _e.mock.On("SaveImportSegment", ctx, req)}
} }
@ -527,9 +527,9 @@ type MockBroker_ShowPartitions_Call struct {
} }
// ShowPartitions is a helper method to define mock.On call // ShowPartitions is a helper method to define mock.On call
// - ctx context.Context // - ctx context.Context
// - dbName string // - dbName string
// - collectionName string // - collectionName string
func (_e *MockBroker_Expecter) ShowPartitions(ctx interface{}, dbName interface{}, collectionName interface{}) *MockBroker_ShowPartitions_Call { func (_e *MockBroker_Expecter) ShowPartitions(ctx interface{}, dbName interface{}, collectionName interface{}) *MockBroker_ShowPartitions_Call {
return &MockBroker_ShowPartitions_Call{Call: _e.mock.On("ShowPartitions", ctx, dbName, collectionName)} return &MockBroker_ShowPartitions_Call{Call: _e.mock.On("ShowPartitions", ctx, dbName, collectionName)}
} }
@ -551,13 +551,13 @@ func (_c *MockBroker_ShowPartitions_Call) RunAndReturn(run func(context.Context,
return _c return _c
} }
// UpdateChannelCheckpoint provides a mock function with given fields: ctx, channelName, cp // UpdateChannelCheckpoint provides a mock function with given fields: ctx, channelCPs
func (_m *MockBroker) UpdateChannelCheckpoint(ctx context.Context, channelName string, cp *msgpb.MsgPosition) error { func (_m *MockBroker) UpdateChannelCheckpoint(ctx context.Context, channelCPs []*msgpb.MsgPosition) error {
ret := _m.Called(ctx, channelName, cp) ret := _m.Called(ctx, channelCPs)
var r0 error var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, *msgpb.MsgPosition) error); ok { if rf, ok := ret.Get(0).(func(context.Context, []*msgpb.MsgPosition) error); ok {
r0 = rf(ctx, channelName, cp) r0 = rf(ctx, channelCPs)
} else { } else {
r0 = ret.Error(0) r0 = ret.Error(0)
} }
@ -571,16 +571,15 @@ type MockBroker_UpdateChannelCheckpoint_Call struct {
} }
// UpdateChannelCheckpoint is a helper method to define mock.On call // UpdateChannelCheckpoint is a helper method to define mock.On call
// - ctx context.Context // - ctx context.Context
// - channelName string // - channelCPs []*msgpb.MsgPosition
// - cp *msgpb.MsgPosition func (_e *MockBroker_Expecter) UpdateChannelCheckpoint(ctx interface{}, channelCPs interface{}) *MockBroker_UpdateChannelCheckpoint_Call {
func (_e *MockBroker_Expecter) UpdateChannelCheckpoint(ctx interface{}, channelName interface{}, cp interface{}) *MockBroker_UpdateChannelCheckpoint_Call { return &MockBroker_UpdateChannelCheckpoint_Call{Call: _e.mock.On("UpdateChannelCheckpoint", ctx, channelCPs)}
return &MockBroker_UpdateChannelCheckpoint_Call{Call: _e.mock.On("UpdateChannelCheckpoint", ctx, channelName, cp)}
} }
func (_c *MockBroker_UpdateChannelCheckpoint_Call) Run(run func(ctx context.Context, channelName string, cp *msgpb.MsgPosition)) *MockBroker_UpdateChannelCheckpoint_Call { func (_c *MockBroker_UpdateChannelCheckpoint_Call) Run(run func(ctx context.Context, channelCPs []*msgpb.MsgPosition)) *MockBroker_UpdateChannelCheckpoint_Call {
_c.Call.Run(func(args mock.Arguments) { _c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string), args[2].(*msgpb.MsgPosition)) run(args[0].(context.Context), args[1].([]*msgpb.MsgPosition))
}) })
return _c return _c
} }
@ -590,7 +589,7 @@ func (_c *MockBroker_UpdateChannelCheckpoint_Call) Return(_a0 error) *MockBroker
return _c return _c
} }
func (_c *MockBroker_UpdateChannelCheckpoint_Call) RunAndReturn(run func(context.Context, string, *msgpb.MsgPosition) error) *MockBroker_UpdateChannelCheckpoint_Call { func (_c *MockBroker_UpdateChannelCheckpoint_Call) RunAndReturn(run func(context.Context, []*msgpb.MsgPosition) error) *MockBroker_UpdateChannelCheckpoint_Call {
_c.Call.Return(run) _c.Call.Return(run)
return _c return _c
} }
@ -615,8 +614,8 @@ type MockBroker_UpdateSegmentStatistics_Call struct {
} }
// UpdateSegmentStatistics is a helper method to define mock.On call // UpdateSegmentStatistics is a helper method to define mock.On call
// - ctx context.Context // - ctx context.Context
// - req *datapb.UpdateSegmentStatisticsRequest // - req *datapb.UpdateSegmentStatisticsRequest
func (_e *MockBroker_Expecter) UpdateSegmentStatistics(ctx interface{}, req interface{}) *MockBroker_UpdateSegmentStatistics_Call { func (_e *MockBroker_Expecter) UpdateSegmentStatistics(ctx interface{}, req interface{}) *MockBroker_UpdateSegmentStatistics_Call {
return &MockBroker_UpdateSegmentStatistics_Call{Call: _e.mock.On("UpdateSegmentStatistics", ctx, req)} return &MockBroker_UpdateSegmentStatistics_Call{Call: _e.mock.On("UpdateSegmentStatistics", ctx, req)}
} }

View File

@ -18,49 +18,139 @@ package datanode
import ( import (
"context" "context"
"sync"
"time" "time"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
) )
const ( const (
updateChanCPInterval = 1 * time.Minute defaultUpdateChanCPMaxParallel = 10
updateChanCPTimeout = 10 * time.Second
defaultUpdateChanCPMaxParallel = 1000
) )
type channelCPUpdateTask struct {
pos *msgpb.MsgPosition
callback func()
}
type channelCheckpointUpdater struct { type channelCheckpointUpdater struct {
dn *DataNode dn *DataNode
workerPool *conc.Pool[any]
mu sync.RWMutex
tasks map[string]*channelCPUpdateTask
closeCh chan struct{}
closeOnce sync.Once
} }
func newChannelCheckpointUpdater(dn *DataNode) *channelCheckpointUpdater { func newChannelCheckpointUpdater(dn *DataNode) *channelCheckpointUpdater {
return &channelCheckpointUpdater{
dn: dn,
tasks: make(map[string]*channelCPUpdateTask),
closeCh: make(chan struct{}),
}
}
func (ccu *channelCheckpointUpdater) start() {
log.Info("channel checkpoint updater start")
ticker := time.NewTicker(paramtable.Get().DataNodeCfg.ChannelCheckpointUpdateTickInSeconds.GetAsDuration(time.Second))
defer ticker.Stop()
for {
select {
case <-ccu.closeCh:
log.Info("channel checkpoint updater exit")
return
case <-ticker.C:
ccu.execute()
}
}
}
func (ccu *channelCheckpointUpdater) execute() {
ccu.mu.RLock()
taskGroups := lo.Chunk(lo.Values(ccu.tasks), paramtable.Get().DataNodeCfg.MaxChannelCheckpointsPerRPC.GetAsInt())
ccu.mu.RUnlock()
updateChanCPMaxParallel := paramtable.Get().DataNodeCfg.UpdateChannelCheckpointMaxParallel.GetAsInt() updateChanCPMaxParallel := paramtable.Get().DataNodeCfg.UpdateChannelCheckpointMaxParallel.GetAsInt()
if updateChanCPMaxParallel <= 0 { if updateChanCPMaxParallel <= 0 {
updateChanCPMaxParallel = defaultUpdateChanCPMaxParallel updateChanCPMaxParallel = defaultUpdateChanCPMaxParallel
} }
return &channelCheckpointUpdater{ rpcGroups := lo.Chunk(taskGroups, updateChanCPMaxParallel)
dn: dn,
workerPool: conc.NewPool[any](updateChanCPMaxParallel, conc.WithPreAlloc(true)), finished := typeutil.NewConcurrentMap[string, *channelCPUpdateTask]()
for _, groups := range rpcGroups {
wg := &sync.WaitGroup{}
for _, tasks := range groups {
wg.Add(1)
go func(tasks []*channelCPUpdateTask) {
defer wg.Done()
timeout := paramtable.Get().DataNodeCfg.UpdateChannelCheckpointRPCTimeout.GetAsDuration(time.Second)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
channelCPs := lo.Map(tasks, func(t *channelCPUpdateTask, _ int) *msgpb.MsgPosition {
return t.pos
})
err := ccu.dn.broker.UpdateChannelCheckpoint(ctx, channelCPs)
if err != nil {
log.Warn("update channel checkpoint failed", zap.Error(err))
return
}
for _, task := range tasks {
task.callback()
finished.Insert(task.pos.GetChannelName(), task)
}
}(tasks)
}
wg.Wait()
}
ccu.mu.Lock()
defer ccu.mu.Unlock()
finished.Range(func(_ string, task *channelCPUpdateTask) bool {
channel := task.pos.GetChannelName()
if ccu.tasks[channel].pos.GetTimestamp() <= task.pos.GetTimestamp() {
delete(ccu.tasks, channel)
}
return true
})
}
func (ccu *channelCheckpointUpdater) addTask(channelPos *msgpb.MsgPosition, callback func()) {
if channelPos == nil || channelPos.GetMsgID() == nil || channelPos.GetChannelName() == "" {
log.Warn("illegal checkpoint", zap.Any("pos", channelPos))
return
}
channel := channelPos.GetChannelName()
ccu.mu.RLock()
if ccu.tasks[channel] != nil && channelPos.GetTimestamp() <= ccu.tasks[channel].pos.GetTimestamp() {
ccu.mu.RUnlock()
return
}
ccu.mu.RUnlock()
ccu.mu.Lock()
defer ccu.mu.Unlock()
ccu.tasks[channel] = &channelCPUpdateTask{
pos: channelPos,
callback: callback,
} }
} }
func (ccu *channelCheckpointUpdater) updateChannelCP(channelPos *msgpb.MsgPosition, callback func() error) error { func (ccu *channelCheckpointUpdater) taskNum() int {
ccu.workerPool.Submit(func() (any, error) { ccu.mu.RLock()
ctx, cancel := context.WithTimeout(context.Background(), updateChanCPTimeout) defer ccu.mu.RUnlock()
defer cancel() return len(ccu.tasks)
err := ccu.dn.broker.UpdateChannelCheckpoint(ctx, channelPos.GetChannelName(), channelPos)
if err != nil {
return nil, err
}
err = callback()
return nil, err
})
return nil
} }
func (ccu *channelCheckpointUpdater) close() { func (ccu *channelCheckpointUpdater) close() {
ccu.workerPool.Release() ccu.closeOnce.Do(func() {
close(ccu.closeCh)
})
} }

View File

@ -0,0 +1,86 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datanode
import (
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
type ChannelCPUpdaterSuite struct {
suite.Suite
updater *channelCheckpointUpdater
}
func (s *ChannelCPUpdaterSuite) SetupTest() {
s.updater = newChannelCheckpointUpdater(&DataNode{})
}
func (s *ChannelCPUpdaterSuite) TestUpdate() {
paramtable.Get().Save(paramtable.Get().DataNodeCfg.ChannelCheckpointUpdateTickInSeconds.Key, "0.01")
defer paramtable.Get().Save(paramtable.Get().DataNodeCfg.ChannelCheckpointUpdateTickInSeconds.Key, "10")
b := broker.NewMockBroker(s.T())
b.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, positions []*msgpb.MsgPosition) error {
time.Sleep(10 * time.Millisecond)
return nil
})
s.updater.dn.broker = b
go s.updater.start()
defer s.updater.close()
tasksNum := 100000
counter := atomic.NewInt64(0)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < tasksNum; i++ {
// add duplicated task with same timestamp
for j := 0; j < 10; j++ {
s.updater.addTask(&msgpb.MsgPosition{
ChannelName: fmt.Sprintf("ch-%d", i),
MsgID: []byte{0},
Timestamp: 100,
}, func() {
counter.Add(1)
})
}
}
}()
s.Eventually(func() bool {
return counter.Load() == int64(tasksNum)
}, time.Second*10, time.Millisecond*100)
wg.Wait()
}
func TestChannelCPUpdater(t *testing.T) {
suite.Run(t, new(ChannelCPUpdaterSuite))
}

View File

@ -357,6 +357,8 @@ func (node *DataNode) Start() error {
node.timeTickSender.start() node.timeTickSender.start()
} }
go node.channelCheckpointUpdater.start()
// Start node watch node // Start node watch node
node.startWatchChannelsAtBackground(node.ctx) node.startWatchChannelsAtBackground(node.ctx)

View File

@ -199,7 +199,7 @@ func TestDataSyncService_Start(t *testing.T) {
broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe() broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe() broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil, nil).Maybe() broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything).Return(nil).Maybe()
node.broker = broker node.broker = broker
node.channelCheckpointUpdater = newChannelCheckpointUpdater(node) node.channelCheckpointUpdater = newChannelCheckpointUpdater(node)
@ -378,7 +378,7 @@ func TestDataSyncService_Close(t *testing.T) {
broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe() broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe() broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil, nil).Maybe() broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything).Return(nil).Maybe()
node.broker = broker node.broker = broker
node.channelCheckpointUpdater = newChannelCheckpointUpdater(node) node.channelCheckpointUpdater = newChannelCheckpointUpdater(node)

View File

@ -67,7 +67,7 @@ func TestWatchChannel(t *testing.T) {
broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe() broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything).Return([]*datapb.SegmentInfo{}, nil).Maybe() broker.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything).Return([]*datapb.SegmentInfo{}, nil).Maybe()
broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil, nil).Maybe() broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything).Return(nil).Maybe()
node.broker = broker node.broker = broker

View File

@ -61,7 +61,7 @@ func TestFlowGraphManager(t *testing.T) {
broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe() broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything).Return([]*datapb.SegmentInfo{}, nil).Maybe() broker.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything).Return([]*datapb.SegmentInfo{}, nil).Maybe()
broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil, nil).Maybe() broker.EXPECT().DropVirtualChannel(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything). broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything, mock.Anything).
Return(&milvuspb.DescribeCollectionResponse{ Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Status(nil), Status: merr.Status(nil),

View File

@ -93,25 +93,21 @@ func (ttn *ttNode) Operate(in []Msg) []Msg {
// Do not block and async updateCheckPoint // Do not block and async updateCheckPoint
channelPos := ttn.channel.getChannelCheckpoint(fgMsg.endPositions[0]) channelPos := ttn.channel.getChannelCheckpoint(fgMsg.endPositions[0])
nonBlockingNotify := func() {
ttn.updateChannelCP(channelPos, curTs)
}
if curTs.Sub(ttn.lastUpdateTime.Load()) >= updateChanCPInterval { if curTs.Sub(ttn.lastUpdateTime.Load()) >= Params.DataNodeCfg.UpdateChannelCheckpointInterval.GetAsDuration(time.Second) {
nonBlockingNotify() ttn.updateChannelCP(channelPos, curTs)
return []Msg{} return []Msg{}
} }
if channelPos.GetTimestamp() >= ttn.channel.getFlushTs() { if channelPos.GetTimestamp() >= ttn.channel.getFlushTs() {
nonBlockingNotify() ttn.updateChannelCP(channelPos, curTs)
} }
return []Msg{} return []Msg{}
} }
func (ttn *ttNode) updateChannelCP(channelPos *msgpb.MsgPosition, curTs time.Time) error { func (ttn *ttNode) updateChannelCP(channelPos *msgpb.MsgPosition, curTs time.Time) {
callBack := func() error { callBack := func() {
channelCPTs, _ := tsoutil.ParseTS(channelPos.GetTimestamp()) channelCPTs, _ := tsoutil.ParseTS(channelPos.GetTimestamp())
ttn.lastUpdateTime.Store(curTs)
log.Debug("UpdateChannelCheckpoint success", log.Debug("UpdateChannelCheckpoint success",
zap.String("channel", ttn.vChannelName), zap.String("channel", ttn.vChannelName),
zap.Uint64("cpTs", channelPos.GetTimestamp()), zap.Uint64("cpTs", channelPos.GetTimestamp()),
@ -120,11 +116,9 @@ func (ttn *ttNode) updateChannelCP(channelPos *msgpb.MsgPosition, curTs time.Tim
if channelPos.GetTimestamp() >= ttn.channel.getFlushTs() { if channelPos.GetTimestamp() >= ttn.channel.getFlushTs() {
ttn.channel.setFlushTs(math.MaxUint64) ttn.channel.setFlushTs(math.MaxUint64)
} }
return nil
} }
ttn.cpUpdater.addTask(channelPos, callBack)
err := ttn.cpUpdater.updateChannelCP(channelPos, callBack) ttn.lastUpdateTime.Store(curTs)
return err
} }
func newTTNode(config *nodeConfig, broker broker.Broker, cpUpdater *channelCheckpointUpdater) (*ttNode, error) { func newTTNode(config *nodeConfig, broker broker.Broker, cpUpdater *channelCheckpointUpdater) (*ttNode, error) {

View File

@ -113,7 +113,7 @@ func (s *DataNodeServicesSuite) SetupTest() {
}, nil).Maybe() }, nil).Maybe()
broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe() broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe() broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything).Return(nil).Maybe()
broker.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).Call.Return(tsoutil.ComposeTSByTime(time.Now(), 0), broker.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).Call.Return(tsoutil.ComposeTSByTime(time.Now(), 0),
func(_ context.Context, num uint32) uint32 { return num }, nil).Maybe() func(_ context.Context, num uint32) uint32 { return num }, nil).Maybe()
@ -576,7 +576,7 @@ func (s *DataNodeServicesSuite) TestImport() {
Return([]*datapb.SegmentInfo{}, nil).Maybe() Return([]*datapb.SegmentInfo{}, nil).Maybe()
s.broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe() s.broker.EXPECT().ReportTimeTick(mock.Anything, mock.Anything).Return(nil).Maybe()
s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe() s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil).Maybe()
s.broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() s.broker.EXPECT().UpdateChannelCheckpoint(mock.Anything, mock.Anything).Return(nil).Maybe()
s.broker.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).Call.Return(tsoutil.ComposeTSByTime(time.Now(), 0), s.broker.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).Call.Return(tsoutil.ComposeTSByTime(time.Now(), 0),
func(_ context.Context, num uint32) uint32 { return num }, nil).Maybe() func(_ context.Context, num uint32) uint32 { return num }, nil).Maybe()

View File

@ -123,6 +123,7 @@ type DataCoordCatalog interface {
ListChannelCheckpoint(ctx context.Context) (map[string]*msgpb.MsgPosition, error) ListChannelCheckpoint(ctx context.Context) (map[string]*msgpb.MsgPosition, error)
SaveChannelCheckpoint(ctx context.Context, vChannel string, pos *msgpb.MsgPosition) error SaveChannelCheckpoint(ctx context.Context, vChannel string, pos *msgpb.MsgPosition) error
SaveChannelCheckpoints(ctx context.Context, positions []*msgpb.MsgPosition) error
DropChannelCheckpoint(ctx context.Context, vChannel string) error DropChannelCheckpoint(ctx context.Context, vChannel string) error
CreateIndex(ctx context.Context, index *model.Index) error CreateIndex(ctx context.Context, index *model.Index) error

View File

@ -526,6 +526,19 @@ func (kc *Catalog) SaveChannelCheckpoint(ctx context.Context, vChannel string, p
return kc.MetaKv.Save(k, string(v)) return kc.MetaKv.Save(k, string(v))
} }
func (kc *Catalog) SaveChannelCheckpoints(ctx context.Context, positions []*msgpb.MsgPosition) error {
kvs := make(map[string]string)
for _, position := range positions {
k := buildChannelCPKey(position.GetChannelName())
v, err := proto.Marshal(position)
if err != nil {
return err
}
kvs[k] = string(v)
}
return kc.SaveByBatch(kvs)
}
func (kc *Catalog) DropChannelCheckpoint(ctx context.Context, vChannel string) error { func (kc *Catalog) DropChannelCheckpoint(ctx context.Context, vChannel string) error {
k := buildChannelCPKey(vChannel) k := buildChannelCPKey(vChannel)
return kc.MetaKv.Remove(k) return kc.MetaKv.Remove(k)

View File

@ -590,6 +590,22 @@ func TestChannelCP(t *testing.T) {
assert.Error(t, err) assert.Error(t, err)
}) })
t.Run("SaveChannelCheckpoints", func(t *testing.T) {
txn := mocks.NewMetaKv(t)
txn.EXPECT().MultiSave(mock.Anything).Return(nil)
catalog := NewCatalog(txn, rootPath, "")
err := catalog.SaveChannelCheckpoints(context.TODO(), []*msgpb.MsgPosition{pos})
assert.NoError(t, err)
})
t.Run("SaveChannelCheckpoints failed", func(t *testing.T) {
txn := mocks.NewMetaKv(t)
catalog := NewCatalog(txn, rootPath, "")
txn.EXPECT().MultiSave(mock.Anything).Return(errors.New("mock error"))
err = catalog.SaveChannelCheckpoints(context.TODO(), []*msgpb.MsgPosition{pos})
assert.Error(t, err)
})
t.Run("DropChannelCheckpoint", func(t *testing.T) { t.Run("DropChannelCheckpoint", func(t *testing.T) {
txn := mocks.NewMetaKv(t) txn := mocks.NewMetaKv(t)
txn.EXPECT().Save(mock.Anything, mock.Anything).Return(nil) txn.EXPECT().Save(mock.Anything, mock.Anything).Return(nil)

View File

@ -953,6 +953,49 @@ func (_c *DataCoordCatalog_SaveChannelCheckpoint_Call) RunAndReturn(run func(con
return _c return _c
} }
// SaveChannelCheckpoints provides a mock function with given fields: ctx, positions
func (_m *DataCoordCatalog) SaveChannelCheckpoints(ctx context.Context, positions []*msgpb.MsgPosition) error {
ret := _m.Called(ctx, positions)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, []*msgpb.MsgPosition) error); ok {
r0 = rf(ctx, positions)
} else {
r0 = ret.Error(0)
}
return r0
}
// DataCoordCatalog_SaveChannelCheckpoints_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SaveChannelCheckpoints'
type DataCoordCatalog_SaveChannelCheckpoints_Call struct {
*mock.Call
}
// SaveChannelCheckpoints is a helper method to define mock.On call
// - ctx context.Context
// - positions []*msgpb.MsgPosition
func (_e *DataCoordCatalog_Expecter) SaveChannelCheckpoints(ctx interface{}, positions interface{}) *DataCoordCatalog_SaveChannelCheckpoints_Call {
return &DataCoordCatalog_SaveChannelCheckpoints_Call{Call: _e.mock.On("SaveChannelCheckpoints", ctx, positions)}
}
func (_c *DataCoordCatalog_SaveChannelCheckpoints_Call) Run(run func(ctx context.Context, positions []*msgpb.MsgPosition)) *DataCoordCatalog_SaveChannelCheckpoints_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].([]*msgpb.MsgPosition))
})
return _c
}
func (_c *DataCoordCatalog_SaveChannelCheckpoints_Call) Return(_a0 error) *DataCoordCatalog_SaveChannelCheckpoints_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *DataCoordCatalog_SaveChannelCheckpoints_Call) RunAndReturn(run func(context.Context, []*msgpb.MsgPosition) error) *DataCoordCatalog_SaveChannelCheckpoints_Call {
_c.Call.Return(run)
return _c
}
// SaveDroppedSegmentsInBatch provides a mock function with given fields: ctx, segments // SaveDroppedSegmentsInBatch provides a mock function with given fields: ctx, segments
func (_m *DataCoordCatalog) SaveDroppedSegmentsInBatch(ctx context.Context, segments []*datapb.SegmentInfo) error { func (_m *DataCoordCatalog) SaveDroppedSegmentsInBatch(ctx context.Context, segments []*datapb.SegmentInfo) error {
ret := _m.Called(ctx, segments) ret := _m.Called(ctx, segments)

File diff suppressed because it is too large Load Diff

View File

@ -632,8 +632,9 @@ message UpdateSegmentStatisticsRequest {
message UpdateChannelCheckpointRequest { message UpdateChannelCheckpointRequest {
common.MsgBase base = 1; common.MsgBase base = 1;
string vChannel = 2; string vChannel = 2; // deprecated, keep it for compatibility
msg.MsgPosition position = 3; msg.MsgPosition position = 3; // deprecated, keep it for compatibility
repeated msg.MsgPosition channel_checkpoints = 4;
} }
message ResendSegmentStatsRequest { message ResendSegmentStatsRequest {

View File

@ -2768,7 +2768,11 @@ type dataNodeConfig struct {
// channel // channel
ChannelWorkPoolSize ParamItem `refreshable:"true"` ChannelWorkPoolSize ParamItem `refreshable:"true"`
UpdateChannelCheckpointMaxParallel ParamItem `refreshable:"true"` UpdateChannelCheckpointMaxParallel ParamItem `refreshable:"true"`
UpdateChannelCheckpointInterval ParamItem `refreshable:"true"`
UpdateChannelCheckpointRPCTimeout ParamItem `refreshable:"true"`
MaxChannelCheckpointsPerRPC ParamItem `refreshable:"true"`
ChannelCheckpointUpdateTickInSeconds ParamItem `refreshable:"true"`
GracefulStopTimeout ParamItem `refreshable:"true"` GracefulStopTimeout ParamItem `refreshable:"true"`
} }
@ -2984,10 +2988,42 @@ func (p *dataNodeConfig) init(base *BaseTable) {
Key: "datanode.channel.updateChannelCheckpointMaxParallel", Key: "datanode.channel.updateChannelCheckpointMaxParallel",
Version: "2.3.4", Version: "2.3.4",
PanicIfEmpty: false, PanicIfEmpty: false,
DefaultValue: "1000", DefaultValue: "10",
} }
p.UpdateChannelCheckpointMaxParallel.Init(base.mgr) p.UpdateChannelCheckpointMaxParallel.Init(base.mgr)
p.UpdateChannelCheckpointInterval = ParamItem{
Key: "datanode.channel.updateChannelCheckpointInterval",
Version: "2.4.0",
Doc: "the interval duration(in seconds) for datanode to update channel checkpoint of each channel",
DefaultValue: "60",
}
p.UpdateChannelCheckpointInterval.Init(base.mgr)
p.UpdateChannelCheckpointRPCTimeout = ParamItem{
Key: "datanode.channel.updateChannelCheckpointRPCTimeout",
Version: "2.4.0",
Doc: "timeout in seconds for UpdateChannelCheckpoint RPC call",
DefaultValue: "20",
}
p.UpdateChannelCheckpointRPCTimeout.Init(base.mgr)
p.MaxChannelCheckpointsPerRPC = ParamItem{
Key: "datanode.channel.maxChannelCheckpointsPerPRC",
Version: "2.4.0",
Doc: "The maximum number of channel checkpoints per UpdateChannelCheckpoint RPC.",
DefaultValue: "128",
}
p.MaxChannelCheckpointsPerRPC.Init(base.mgr)
p.ChannelCheckpointUpdateTickInSeconds = ParamItem{
Key: "datanode.channel.channelCheckpointUpdateTickInSeconds",
Version: "2.4.0",
Doc: "The frequency, in seconds, at which the channel checkpoint updater executes updates.",
DefaultValue: "10",
}
p.ChannelCheckpointUpdateTickInSeconds.Init(base.mgr)
p.GracefulStopTimeout = ParamItem{ p.GracefulStopTimeout = ParamItem{
Key: "datanode.gracefulStopTimeout", Key: "datanode.gracefulStopTimeout",
Version: "2.3.7", Version: "2.3.7",

View File

@ -426,7 +426,9 @@ func TestComponentParam(t *testing.T) {
updateChannelCheckpointMaxParallel := Params.UpdateChannelCheckpointMaxParallel.GetAsInt() updateChannelCheckpointMaxParallel := Params.UpdateChannelCheckpointMaxParallel.GetAsInt()
t.Logf("updateChannelCheckpointMaxParallel: %d", updateChannelCheckpointMaxParallel) t.Logf("updateChannelCheckpointMaxParallel: %d", updateChannelCheckpointMaxParallel)
assert.Equal(t, 1000, Params.UpdateChannelCheckpointMaxParallel.GetAsInt()) assert.Equal(t, 10, Params.UpdateChannelCheckpointMaxParallel.GetAsInt())
assert.Equal(t, 128, Params.MaxChannelCheckpointsPerRPC.GetAsInt())
assert.Equal(t, 10*time.Second, Params.ChannelCheckpointUpdateTickInSeconds.GetAsDuration(time.Second))
params.Save("datanode.gracefulStopTimeout", "100") params.Save("datanode.gracefulStopTimeout", "100")
assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second)) assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))