enhance: Remove datanode reporting TT based on MQ implementation (#34421)

issue: #34420

Signed-off-by: jaime <yun.zhang@zilliz.com>
pull/34354/head
jaime 2024-07-05 15:48:09 +08:00 committed by GitHub
parent 0817802db8
commit 21fc5f5d46
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 51 additions and 813 deletions

View File

@ -535,7 +535,6 @@ dataNode:
checkInterval: 3000 # the interal to check datanode memory usage, in milliseconds
forceSyncWatermark: 0.5 # memory watermark for standalone, upon reaching this watermark, segments will be synced.
timetick:
byRPC: true
interval: 500
channel:
# specify the size of global work pool of all channels

View File

@ -59,7 +59,7 @@ type SubCluster interface {
CheckChannelOperationProgress(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) (*datapb.ChannelOperationProgressResponse, error)
}
type ChannelManagerImplV2 struct {
type ChannelManagerImpl struct {
cancel context.CancelFunc
mu lock.RWMutex
wg sync.WaitGroup
@ -82,15 +82,15 @@ type ChannelManagerImplV2 struct {
// ChannelBGChecker are goroutining running background
type ChannelBGChecker func(ctx context.Context)
// ChannelmanagerOptV2 is to set optional parameters in channel manager.
type ChannelmanagerOptV2 func(c *ChannelManagerImplV2)
// ChannelmanagerOpt is to set optional parameters in channel manager.
type ChannelmanagerOpt func(c *ChannelManagerImpl)
func withFactoryV2(f ChannelPolicyFactory) ChannelmanagerOptV2 {
return func(c *ChannelManagerImplV2) { c.factory = f }
func withFactoryV2(f ChannelPolicyFactory) ChannelmanagerOpt {
return func(c *ChannelManagerImpl) { c.factory = f }
}
func withCheckerV2() ChannelmanagerOptV2 {
return func(c *ChannelManagerImplV2) { c.balanceCheckLoop = c.CheckLoop }
func withCheckerV2() ChannelmanagerOpt {
return func(c *ChannelManagerImpl) { c.balanceCheckLoop = c.CheckLoop }
}
func NewChannelManagerV2(
@ -98,9 +98,9 @@ func NewChannelManagerV2(
h Handler,
subCluster SubCluster, // sessionManager
alloc allocator,
options ...ChannelmanagerOptV2,
) (*ChannelManagerImplV2, error) {
m := &ChannelManagerImplV2{
options ...ChannelmanagerOpt,
) (*ChannelManagerImpl, error) {
m := &ChannelManagerImpl{
h: h,
factory: NewChannelPolicyFactoryV1(),
store: NewChannelStoreV2(kv),
@ -121,7 +121,7 @@ func NewChannelManagerV2(
return m, nil
}
func (m *ChannelManagerImplV2) Startup(ctx context.Context, legacyNodes, allNodes []int64) error {
func (m *ChannelManagerImpl) Startup(ctx context.Context, legacyNodes, allNodes []int64) error {
ctx, m.cancel = context.WithCancel(ctx)
m.legacyNodes = typeutil.NewUniqueSet(legacyNodes...)
@ -175,14 +175,14 @@ func (m *ChannelManagerImplV2) Startup(ctx context.Context, legacyNodes, allNode
return nil
}
func (m *ChannelManagerImplV2) Close() {
func (m *ChannelManagerImpl) Close() {
if m.cancel != nil {
m.cancel()
m.wg.Wait()
}
}
func (m *ChannelManagerImplV2) AddNode(nodeID UniqueID) error {
func (m *ChannelManagerImpl) AddNode(nodeID UniqueID) error {
m.mu.Lock()
defer m.mu.Unlock()
@ -204,7 +204,7 @@ func (m *ChannelManagerImplV2) AddNode(nodeID UniqueID) error {
}
// Release writes ToRelease channel watch states for a channel
func (m *ChannelManagerImplV2) Release(nodeID UniqueID, channelName string) error {
func (m *ChannelManagerImpl) Release(nodeID UniqueID, channelName string) error {
log := log.With(
zap.Int64("nodeID", nodeID),
zap.String("channel", channelName),
@ -227,7 +227,7 @@ func (m *ChannelManagerImplV2) Release(nodeID UniqueID, channelName string) erro
return m.execute(updates)
}
func (m *ChannelManagerImplV2) Watch(ctx context.Context, ch RWChannel) error {
func (m *ChannelManagerImpl) Watch(ctx context.Context, ch RWChannel) error {
log := log.Ctx(ctx).With(zap.String("channel", ch.GetName()))
m.mu.Lock()
defer m.mu.Unlock()
@ -256,7 +256,7 @@ func (m *ChannelManagerImplV2) Watch(ctx context.Context, ch RWChannel) error {
return nil
}
func (m *ChannelManagerImplV2) DeleteNode(nodeID UniqueID) error {
func (m *ChannelManagerImpl) DeleteNode(nodeID UniqueID) error {
m.mu.Lock()
defer m.mu.Unlock()
@ -288,7 +288,7 @@ func (m *ChannelManagerImplV2) DeleteNode(nodeID UniqueID) error {
}
// reassign reassigns a channel to another DataNode.
func (m *ChannelManagerImplV2) reassign(original *NodeChannelInfo) error {
func (m *ChannelManagerImpl) reassign(original *NodeChannelInfo) error {
m.mu.Lock()
defer m.mu.Unlock()
@ -309,7 +309,7 @@ func (m *ChannelManagerImplV2) reassign(original *NodeChannelInfo) error {
return nil
}
func (m *ChannelManagerImplV2) Balance() {
func (m *ChannelManagerImpl) Balance() {
m.mu.Lock()
defer m.mu.Unlock()
@ -325,7 +325,7 @@ func (m *ChannelManagerImplV2) Balance() {
}
}
func (m *ChannelManagerImplV2) Match(nodeID UniqueID, channel string) bool {
func (m *ChannelManagerImpl) Match(nodeID UniqueID, channel string) bool {
m.mu.RLock()
defer m.mu.RUnlock()
@ -338,7 +338,7 @@ func (m *ChannelManagerImplV2) Match(nodeID UniqueID, channel string) bool {
return ok
}
func (m *ChannelManagerImplV2) GetChannel(nodeID int64, channelName string) (RWChannel, bool) {
func (m *ChannelManagerImpl) GetChannel(nodeID int64, channelName string) (RWChannel, bool) {
m.mu.RLock()
defer m.mu.RUnlock()
@ -350,13 +350,13 @@ func (m *ChannelManagerImplV2) GetChannel(nodeID int64, channelName string) (RWC
return nil, false
}
func (m *ChannelManagerImplV2) GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string {
func (m *ChannelManagerImpl) GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string {
m.mu.RLock()
defer m.mu.RUnlock()
return m.store.GetNodeChannelsByCollectionID(collectionID)
}
func (m *ChannelManagerImplV2) GetChannelsByCollectionID(collectionID int64) []RWChannel {
func (m *ChannelManagerImpl) GetChannelsByCollectionID(collectionID int64) []RWChannel {
m.mu.RLock()
defer m.mu.RUnlock()
channels := []RWChannel{}
@ -370,14 +370,14 @@ func (m *ChannelManagerImplV2) GetChannelsByCollectionID(collectionID int64) []R
return channels
}
func (m *ChannelManagerImplV2) GetChannelNamesByCollectionID(collectionID int64) []string {
func (m *ChannelManagerImpl) GetChannelNamesByCollectionID(collectionID int64) []string {
channels := m.GetChannelsByCollectionID(collectionID)
return lo.Map(channels, func(ch RWChannel, _ int) string {
return ch.GetName()
})
}
func (m *ChannelManagerImplV2) FindWatcher(channel string) (UniqueID, error) {
func (m *ChannelManagerImpl) FindWatcher(channel string) (UniqueID, error) {
m.mu.RLock()
defer m.mu.RUnlock()
@ -400,7 +400,7 @@ func (m *ChannelManagerImplV2) FindWatcher(channel string) (UniqueID, error) {
}
// unsafe innter func
func (m *ChannelManagerImplV2) removeChannel(nodeID int64, ch RWChannel) error {
func (m *ChannelManagerImpl) removeChannel(nodeID int64, ch RWChannel) error {
op := NewChannelOpSet(NewChannelOp(nodeID, Delete, ch))
log.Info("remove channel assignment",
zap.String("channel", ch.GetName()),
@ -409,7 +409,7 @@ func (m *ChannelManagerImplV2) removeChannel(nodeID int64, ch RWChannel) error {
return m.store.Update(op)
}
func (m *ChannelManagerImplV2) CheckLoop(ctx context.Context) {
func (m *ChannelManagerImpl) CheckLoop(ctx context.Context) {
balanceTicker := time.NewTicker(Params.DataCoordCfg.ChannelBalanceInterval.GetAsDuration(time.Second))
defer balanceTicker.Stop()
checkTicker := time.NewTicker(Params.DataCoordCfg.ChannelCheckInterval.GetAsDuration(time.Second))
@ -430,7 +430,7 @@ func (m *ChannelManagerImplV2) CheckLoop(ctx context.Context) {
}
}
func (m *ChannelManagerImplV2) AdvanceChannelState(ctx context.Context) {
func (m *ChannelManagerImpl) AdvanceChannelState(ctx context.Context) {
m.mu.RLock()
standbys := m.store.GetNodeChannelsBy(WithAllNodes(), WithChannelStates(Standby))
toNotifies := m.store.GetNodeChannelsBy(WithoutBufferNode(), WithChannelStates(ToWatch, ToRelease))
@ -447,7 +447,7 @@ func (m *ChannelManagerImplV2) AdvanceChannelState(ctx context.Context) {
}
}
func (m *ChannelManagerImplV2) finishRemoveChannel(nodeID int64, channels ...RWChannel) {
func (m *ChannelManagerImpl) finishRemoveChannel(nodeID int64, channels ...RWChannel) {
m.mu.Lock()
defer m.mu.Unlock()
for _, ch := range channels {
@ -463,7 +463,7 @@ func (m *ChannelManagerImplV2) finishRemoveChannel(nodeID int64, channels ...RWC
}
}
func (m *ChannelManagerImplV2) advanceStandbys(_ context.Context, standbys []*NodeChannelInfo) bool {
func (m *ChannelManagerImpl) advanceStandbys(_ context.Context, standbys []*NodeChannelInfo) bool {
var advanced bool = false
for _, nodeAssign := range standbys {
validChannels := make(map[string]RWChannel)
@ -500,7 +500,7 @@ func (m *ChannelManagerImplV2) advanceStandbys(_ context.Context, standbys []*No
return advanced
}
func (m *ChannelManagerImplV2) advanceToNotifies(ctx context.Context, toNotifies []*NodeChannelInfo) bool {
func (m *ChannelManagerImpl) advanceToNotifies(ctx context.Context, toNotifies []*NodeChannelInfo) bool {
var advanced bool = false
for _, nodeAssign := range toNotifies {
channelCount := len(nodeAssign.Channels)
@ -563,7 +563,7 @@ type poolResult struct {
ch RWChannel
}
func (m *ChannelManagerImplV2) advanceToChecks(ctx context.Context, toChecks []*NodeChannelInfo) bool {
func (m *ChannelManagerImpl) advanceToChecks(ctx context.Context, toChecks []*NodeChannelInfo) bool {
var advanced bool = false
for _, nodeAssign := range toChecks {
if len(nodeAssign.Channels) == 0 {
@ -615,7 +615,7 @@ func (m *ChannelManagerImplV2) advanceToChecks(ctx context.Context, toChecks []*
return advanced
}
func (m *ChannelManagerImplV2) Notify(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) error {
func (m *ChannelManagerImpl) Notify(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) error {
log := log.With(
zap.String("channel", info.GetVchan().GetChannelName()),
zap.Int64("assignment", nodeID),
@ -631,7 +631,7 @@ func (m *ChannelManagerImplV2) Notify(ctx context.Context, nodeID int64, info *d
return nil
}
func (m *ChannelManagerImplV2) Check(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) (successful bool, got bool) {
func (m *ChannelManagerImpl) Check(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) (successful bool, got bool) {
log := log.With(
zap.Int64("opID", info.GetOpID()),
zap.Int64("nodeID", nodeID),
@ -674,7 +674,7 @@ func (m *ChannelManagerImplV2) Check(ctx context.Context, nodeID int64, info *da
return false, false
}
func (m *ChannelManagerImplV2) execute(updates *ChannelOpSet) error {
func (m *ChannelManagerImpl) execute(updates *ChannelOpSet) error {
for _, op := range updates.ops {
if op.Type != Delete {
if err := m.fillChannelWatchInfo(op); err != nil {
@ -688,7 +688,7 @@ func (m *ChannelManagerImplV2) execute(updates *ChannelOpSet) error {
}
// fillChannelWatchInfoWithState updates the channel op by filling in channel watch info.
func (m *ChannelManagerImplV2) fillChannelWatchInfo(op *ChannelOp) error {
func (m *ChannelManagerImpl) fillChannelWatchInfo(op *ChannelOp) error {
startTs := time.Now().Unix()
for _, ch := range op.Channels {
vcInfo := m.h.GetDataVChanPositions(ch, allPartitionID)

View File

@ -66,7 +66,7 @@ func (s *ChannelManagerSuite) prepareMeta(chNodes map[string]int64, state datapb
s.mockKv.EXPECT().LoadWithPrefix(mock.Anything).Return(keys, values, nil).Once()
}
func (s *ChannelManagerSuite) checkAssignment(m *ChannelManagerImplV2, nodeID int64, channel string, state ChannelState) {
func (s *ChannelManagerSuite) checkAssignment(m *ChannelManagerImpl, nodeID int64, channel string, state ChannelState) {
rwChannel, found := m.GetChannel(nodeID, channel)
s.True(found)
s.NotNil(rwChannel)
@ -84,7 +84,7 @@ func (s *ChannelManagerSuite) checkAssignment(m *ChannelManagerImplV2, nodeID in
}
}
func (s *ChannelManagerSuite) checkNoAssignment(m *ChannelManagerImplV2, nodeID int64, channel string) {
func (s *ChannelManagerSuite) checkNoAssignment(m *ChannelManagerImpl, nodeID int64, channel string) {
rwChannel, found := m.GetChannel(nodeID, channel)
s.False(found)
s.Nil(rwChannel)

View File

@ -57,7 +57,6 @@ type ROChannelStore interface {
// GetNodeChannels for given collection
GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string
// GetNodeChannelsBy used by channel_store_v2 and channel_manager_v2 only
GetNodeChannelsBy(nodeSelector NodeSelector, channelSelectors ...ChannelSelector) []*NodeChannelInfo
}

View File

@ -49,8 +49,6 @@ import (
"github.com/milvus-io/milvus/pkg/kv"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util"
"github.com/milvus-io/milvus/pkg/util/expr"
"github.com/milvus-io/milvus/pkg/util/logutil"
@ -58,7 +56,6 @@ import (
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -103,7 +100,6 @@ type Server struct {
serverLoopWg sync.WaitGroup
quitCh chan struct{}
stateCode atomic.Value
helper ServerHelper
etcdCli *clientv3.Client
tikvCli *txnkv.Client
@ -166,17 +162,6 @@ type CollectionNameInfo struct {
DBName string
}
// ServerHelper datacoord server injection helper
type ServerHelper struct {
eventAfterHandleDataNodeTt func()
}
func defaultServerHelper() ServerHelper {
return ServerHelper{
eventAfterHandleDataNodeTt: func() {},
}
}
// Option utility function signature to set DataCoord server attributes
type Option func(svr *Server)
@ -187,13 +172,6 @@ func WithRootCoordCreator(creator rootCoordCreatorFunc) Option {
}
}
// WithServerHelper returns an `Option` setting ServerHelp with provided parameter
func WithServerHelper(helper ServerHelper) Option {
return func(svr *Server) {
svr.helper = helper
}
}
// WithCluster returns an `Option` setting Cluster with provided parameter
func WithCluster(cluster Cluster) Option {
return func(svr *Server) {
@ -228,7 +206,6 @@ func CreateServer(ctx context.Context, factory dependency.Factory, opts ...Optio
dataNodeCreator: defaultDataNodeCreatorFunc,
indexNodeCreator: defaultIndexNodeCreatorFunc,
rootCoordClientCreator: defaultRootCoordCreatorFunc,
helper: defaultServerHelper(),
metricsCacheManager: metricsinfo.NewMetricsCacheManager(),
enableActiveStandBy: Params.DataCoordCfg.EnableActiveStandby.GetAsBool(),
}
@ -697,11 +674,6 @@ func (s *Server) initIndexNodeManager() {
}
func (s *Server) startServerLoop() {
if !Params.DataNodeCfg.DataNodeTimeTickByRPC.GetAsBool() {
s.serverLoopWg.Add(1)
s.startDataNodeTtLoop(s.serverLoopCtx)
}
s.serverLoopWg.Add(2)
s.startWatchService(s.serverLoopCtx)
s.startFlushLoop(s.serverLoopCtx)
@ -712,80 +684,6 @@ func (s *Server) startServerLoop() {
s.syncSegmentsScheduler.Start()
}
// startDataNodeTtLoop start a goroutine to recv data node tt msg from msgstream
// tt msg stands for the currently consumed timestamp for each channel
func (s *Server) startDataNodeTtLoop(ctx context.Context) {
ttMsgStream, err := s.factory.NewMsgStream(ctx)
if err != nil {
log.Error("DataCoord failed to create timetick channel", zap.Error(err))
panic(err)
}
timeTickChannel := Params.CommonCfg.DataCoordTimeTick.GetValue()
if Params.CommonCfg.PreCreatedTopicEnabled.GetAsBool() {
timeTickChannel = Params.CommonCfg.TimeTicker.GetValue()
}
subName := fmt.Sprintf("%s-%d-datanodeTl", Params.CommonCfg.DataCoordSubName.GetValue(), paramtable.GetNodeID())
ttMsgStream.AsConsumer(context.TODO(), []string{timeTickChannel}, subName, common.SubscriptionPositionLatest)
log.Info("DataCoord creates the timetick channel consumer",
zap.String("timeTickChannel", timeTickChannel),
zap.String("subscription", subName))
go s.handleDataNodeTimetickMsgstream(ctx, ttMsgStream)
}
func (s *Server) handleDataNodeTimetickMsgstream(ctx context.Context, ttMsgStream msgstream.MsgStream) {
var checker *timerecord.LongTermChecker
if enableTtChecker {
checker = timerecord.NewLongTermChecker(ctx, ttCheckerName, ttMaxInterval, ttCheckerWarnMsg)
checker.Start()
defer checker.Stop()
}
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
defer func() {
// https://github.com/milvus-io/milvus/issues/15659
// msgstream service closed before datacoord quits
defer func() {
if x := recover(); x != nil {
log.Error("Failed to close ttMessage", zap.Any("recovered", x))
}
}()
ttMsgStream.Close()
}()
for {
select {
case <-ctx.Done():
log.Info("DataNode timetick loop shutdown")
return
case msgPack, ok := <-ttMsgStream.Chan():
if !ok || msgPack == nil || len(msgPack.Msgs) == 0 {
log.Info("receive nil timetick msg and shutdown timetick channel")
return
}
for _, msg := range msgPack.Msgs {
ttMsg, ok := msg.(*msgstream.DataNodeTtMsg)
if !ok {
log.Warn("receive unexpected msg type from tt channel")
continue
}
if enableTtChecker {
checker.Check()
}
if err := s.handleDataNodeTtMsg(ctx, &ttMsg.DataNodeTtMsg); err != nil {
log.Warn("failed to handle timetick message", zap.Error(err))
continue
}
}
s.helper.eventAfterHandleDataNodeTt()
}
}
}
func (s *Server) updateSegmentStatistics(stats []*commonpb.SegmentStats) {
for _, stat := range stats {
segment := s.meta.GetSegment(stat.GetSegmentID())

View File

@ -1468,21 +1468,6 @@ func (s *Server) handleDataNodeTtMsg(ctx context.Context, ttMsg *msgpb.DataNodeT
return nil
}
// getDiff returns the difference of base and remove. i.e. all items that are in `base` but not in `remove`.
func getDiff(base, remove []int64) []int64 {
mb := make(map[int64]struct{}, len(remove))
for _, x := range remove {
mb[x] = struct{}{}
}
var diff []int64
for _, x := range base {
if _, found := mb[x]; !found {
diff = append(diff, x)
}
}
return diff
}
// MarkSegmentsDropped marks the given segments as `Dropped`.
// An error status will be returned and error will be logged, if we failed to mark *all* segments.
// Deprecated, do not use it

View File

@ -2,7 +2,6 @@ package datacoord
import (
"context"
"fmt"
"testing"
"time"
@ -32,7 +31,6 @@ import (
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
type ServerSuite struct {
@ -90,309 +88,6 @@ func genMsg(msgType commonpb.MsgType, ch string, t Timestamp, sourceID int64) *m
}
}
func (s *ServerSuite) TestHandleDataNodeTtMsg() {
var (
chanName = "ch-1"
collID int64 = 100
sourceID int64 = 1
)
s.testServer.meta.AddCollection(&collectionInfo{
ID: collID,
Schema: newTestSchema(),
Partitions: []int64{10},
})
resp, err := s.testServer.AssignSegmentID(context.TODO(), &datapb.AssignSegmentIDRequest{
NodeID: sourceID,
SegmentIDRequests: []*datapb.SegmentIDRequest{
{
CollectionID: collID,
PartitionID: 10,
ChannelName: chanName,
Count: 100,
},
},
})
s.Require().NoError(err)
s.Require().True(merr.Ok(resp.GetStatus()))
s.Equal(1, len(resp.GetSegIDAssignments()))
assign := resp.GetSegIDAssignments()[0]
assignedSegmentID := resp.SegIDAssignments[0].SegID
segment := s.testServer.meta.GetHealthySegment(assignedSegmentID)
s.Require().NotNil(segment)
s.Equal(1, len(segment.allocations))
ts := tsoutil.AddPhysicalDurationOnTs(assign.ExpireTime, -3*time.Minute)
msg := genMsg(commonpb.MsgType_DataNodeTt, chanName, ts, sourceID)
msg.SegmentsStats = append(msg.SegmentsStats, &commonpb.SegmentStats{
SegmentID: assign.GetSegID(),
NumRows: 1,
})
mockCluster := NewMockCluster(s.T())
mockCluster.EXPECT().Close().Once()
mockCluster.EXPECT().Flush(mock.Anything, sourceID, chanName, mock.Anything).RunAndReturn(
func(ctx context.Context, nodeID int64, channel string, segments []*datapb.SegmentInfo) error {
s.EqualValues(chanName, channel)
s.EqualValues(sourceID, nodeID)
s.Equal(1, len(segments))
s.EqualValues(2, segments[0].GetID())
return fmt.Errorf("mock error")
}).Once()
s.testServer.cluster = mockCluster
s.mockChMgr.EXPECT().Match(sourceID, chanName).Return(true).Twice()
err = s.testServer.handleDataNodeTtMsg(context.TODO(), &msg.DataNodeTtMsg)
s.NoError(err)
tt := tsoutil.AddPhysicalDurationOnTs(assign.ExpireTime, 48*time.Hour)
msg = genMsg(commonpb.MsgType_DataNodeTt, chanName, tt, sourceID)
msg.SegmentsStats = append(msg.SegmentsStats, &commonpb.SegmentStats{
SegmentID: assign.GetSegID(),
NumRows: 1,
})
err = s.testServer.handleDataNodeTtMsg(context.TODO(), &msg.DataNodeTtMsg)
s.Error(err)
}
// restart the server for config DataNodeTimeTickByRPC=false
func (s *ServerSuite) initSuiteForTtChannel() {
s.testServer.serverLoopWg.Add(1)
s.testServer.startDataNodeTtLoop(s.testServer.serverLoopCtx)
s.testServer.meta.AddCollection(&collectionInfo{
ID: 1,
Schema: newTestSchema(),
Partitions: []int64{10},
})
}
func (s *ServerSuite) TestDataNodeTtChannel_ExpireAfterTt() {
s.initSuiteForTtChannel()
ctx := context.TODO()
ttMsgStream, err := s.testServer.factory.NewMsgStream(ctx)
s.Require().NoError(err)
ttMsgStream.AsProducer([]string{paramtable.Get().CommonCfg.DataCoordTimeTick.GetValue()})
defer ttMsgStream.Close()
var (
sourceID int64 = 9997
chanName = "ch-1"
signal = make(chan struct{})
collID int64 = 1
)
mockCluster := NewMockCluster(s.T())
mockCluster.EXPECT().Close().Once()
mockCluster.EXPECT().Flush(mock.Anything, sourceID, chanName, mock.Anything).RunAndReturn(
func(ctx context.Context, nodeID int64, channel string, segments []*datapb.SegmentInfo) error {
s.EqualValues(chanName, channel)
s.EqualValues(sourceID, nodeID)
s.Equal(1, len(segments))
s.EqualValues(2, segments[0].GetID())
signal <- struct{}{}
return nil
}).Once()
s.testServer.cluster = mockCluster
s.mockChMgr.EXPECT().Match(sourceID, chanName).Return(true).Once()
resp, err := s.testServer.AssignSegmentID(context.TODO(), &datapb.AssignSegmentIDRequest{
NodeID: sourceID,
SegmentIDRequests: []*datapb.SegmentIDRequest{
{
CollectionID: collID,
PartitionID: 10,
ChannelName: chanName,
Count: 100,
},
},
})
s.Require().NoError(err)
s.Require().True(merr.Ok(resp.GetStatus()))
s.Equal(1, len(resp.GetSegIDAssignments()))
assignedSegmentID := resp.SegIDAssignments[0].SegID
segment := s.testServer.meta.GetHealthySegment(assignedSegmentID)
s.Require().NotNil(segment)
s.Equal(1, len(segment.allocations))
msgPack := msgstream.MsgPack{}
tt := tsoutil.AddPhysicalDurationOnTs(resp.SegIDAssignments[0].ExpireTime, 48*time.Hour)
msg := genMsg(commonpb.MsgType_DataNodeTt, "ch-1", tt, sourceID)
msgPack.Msgs = append(msgPack.Msgs, msg)
err = ttMsgStream.Produce(&msgPack)
s.Require().NoError(err)
<-signal
segment = s.testServer.meta.GetHealthySegment(assignedSegmentID)
s.NotNil(segment)
s.Equal(0, len(segment.allocations))
}
func (s *ServerSuite) TestDataNodeTtChannel_FlushWithDiffChan() {
s.initSuiteForTtChannel()
ctx := context.TODO()
ttMsgStream, err := s.testServer.factory.NewMsgStream(ctx)
s.Require().NoError(err)
ttMsgStream.AsProducer([]string{paramtable.Get().CommonCfg.DataCoordTimeTick.GetValue()})
defer ttMsgStream.Close()
var (
sourceID int64 = 9998
chanName = "ch-1"
signal = make(chan struct{})
collID int64 = 1
)
mockCluster := NewMockCluster(s.T())
mockCluster.EXPECT().Close().Once()
mockCluster.EXPECT().Flush(mock.Anything, sourceID, chanName, mock.Anything).RunAndReturn(
func(ctx context.Context, nodeID int64, channel string, segments []*datapb.SegmentInfo) error {
s.EqualValues(chanName, channel)
s.EqualValues(sourceID, nodeID)
s.Equal(1, len(segments))
signal <- struct{}{}
return nil
}).Once()
mockCluster.EXPECT().FlushChannels(mock.Anything, sourceID, mock.Anything, []string{chanName}).Return(nil).Once()
s.testServer.cluster = mockCluster
s.mockChMgr.EXPECT().Match(sourceID, chanName).Return(true).Once()
s.mockChMgr.EXPECT().GetNodeChannelsByCollectionID(collID).Return(map[int64][]string{
sourceID: {chanName},
})
resp, err := s.testServer.AssignSegmentID(ctx, &datapb.AssignSegmentIDRequest{
NodeID: sourceID,
SegmentIDRequests: []*datapb.SegmentIDRequest{
{
CollectionID: collID,
PartitionID: 10,
ChannelName: chanName,
Count: 100,
},
{
CollectionID: collID,
PartitionID: 10,
ChannelName: "ch-2",
Count: 100,
},
},
})
s.Require().NoError(err)
s.Require().True(merr.Ok(resp.GetStatus()))
s.Equal(2, len(resp.GetSegIDAssignments()))
var assign *datapb.SegmentIDAssignment
for _, segment := range resp.SegIDAssignments {
if segment.GetChannelName() == chanName {
assign = segment
break
}
}
s.Require().NotNil(assign)
resp2, err := s.testServer.Flush(ctx, &datapb.FlushRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Flush,
SourceID: sourceID,
},
CollectionID: collID,
})
s.Require().NoError(err)
s.Require().True(merr.Ok(resp2.GetStatus()))
msgPack := msgstream.MsgPack{}
msg := genMsg(commonpb.MsgType_DataNodeTt, chanName, assign.ExpireTime, sourceID)
msg.SegmentsStats = append(msg.SegmentsStats, &commonpb.SegmentStats{
SegmentID: assign.GetSegID(),
NumRows: 1,
})
msgPack.Msgs = append(msgPack.Msgs, msg)
err = ttMsgStream.Produce(&msgPack)
s.NoError(err)
<-signal
}
func (s *ServerSuite) TestDataNodeTtChannel_SegmentFlushAfterTt() {
s.initSuiteForTtChannel()
var (
sourceID int64 = 9999
chanName = "ch-1"
signal = make(chan struct{})
collID int64 = 1
)
mockCluster := NewMockCluster(s.T())
mockCluster.EXPECT().Close().Once()
mockCluster.EXPECT().Flush(mock.Anything, sourceID, chanName, mock.Anything).RunAndReturn(
func(ctx context.Context, nodeID int64, channel string, segments []*datapb.SegmentInfo) error {
s.EqualValues(chanName, channel)
s.EqualValues(sourceID, nodeID)
s.Equal(1, len(segments))
signal <- struct{}{}
return nil
}).Once()
mockCluster.EXPECT().FlushChannels(mock.Anything, sourceID, mock.Anything, []string{chanName}).Return(nil).Once()
s.testServer.cluster = mockCluster
s.mockChMgr.EXPECT().Match(sourceID, chanName).Return(true).Once()
s.mockChMgr.EXPECT().GetNodeChannelsByCollectionID(collID).Return(map[int64][]string{
sourceID: {chanName},
})
ctx := context.TODO()
ttMsgStream, err := s.testServer.factory.NewMsgStream(ctx)
s.Require().NoError(err)
ttMsgStream.AsProducer([]string{paramtable.Get().CommonCfg.DataCoordTimeTick.GetValue()})
defer ttMsgStream.Close()
resp, err := s.testServer.AssignSegmentID(context.TODO(), &datapb.AssignSegmentIDRequest{
SegmentIDRequests: []*datapb.SegmentIDRequest{
{
CollectionID: 1,
PartitionID: 10,
ChannelName: chanName,
Count: 100,
},
},
})
s.Require().NoError(err)
s.Require().True(merr.Ok(resp.GetStatus()))
s.Require().Equal(1, len(resp.GetSegIDAssignments()))
assign := resp.GetSegIDAssignments()[0]
resp2, err := s.testServer.Flush(ctx, &datapb.FlushRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Flush,
},
CollectionID: 1,
})
s.Require().NoError(err)
s.Require().True(merr.Ok(resp2.GetStatus()))
msgPack := msgstream.MsgPack{}
msg := genMsg(commonpb.MsgType_DataNodeTt, "ch-1", assign.ExpireTime, 9999)
msg.SegmentsStats = append(msg.SegmentsStats, &commonpb.SegmentStats{
SegmentID: assign.GetSegID(),
NumRows: 1,
})
msgPack.Msgs = append(msgPack.Msgs, msg)
err = ttMsgStream.Produce(&msgPack)
s.Require().NoError(err)
<-signal
}
func (s *ServerSuite) TestGetFlushState_ByFlushTs() {
s.mockChMgr.EXPECT().GetChannelsByCollectionID(int64(0)).
Return([]RWChannel{&channelMeta{Name: "ch1", CollectionID: 0}}).Times(3)

View File

@ -616,7 +616,7 @@ func (t *clusteringCompactionTask) mappingSegment(
log.Warn("stop waiting for memory buffer release as task chan done")
return nil
default:
//currentSize := t.getCurrentBufferWrittenMemorySize()
// currentSize := t.getCurrentBufferWrittenMemorySize()
currentSize := t.getBufferTotalUsedMemorySize()
if currentSize < t.getMemoryBufferBlockFlushThreshold() {
log.Debug("memory is already below the block watermark, continue writing",

View File

@ -326,11 +326,9 @@ func (node *DataNode) Start() error {
go node.importScheduler.Start()
if Params.DataNodeCfg.DataNodeTimeTickByRPC.GetAsBool() {
node.timeTickSender = util.NewTimeTickSender(node.broker, node.session.ServerID,
retry.Attempts(20), retry.Sleep(time.Millisecond*100))
node.timeTickSender.Start()
}
node.timeTickSender = util.NewTimeTickSender(node.broker, node.session.ServerID,
retry.Attempts(20), retry.Sleep(time.Millisecond*100))
node.timeTickSender.Start()
go node.channelCheckpointUpdater.Start()

View File

@ -18,7 +18,6 @@ package pipeline
import (
"context"
"fmt"
"sync"
"go.uber.org/zap"
@ -260,26 +259,7 @@ func getServiceWithChannel(initCtx context.Context, params *util.PipelineParams,
return nil, err
}
var updater statsUpdater
if paramtable.Get().DataNodeCfg.DataNodeTimeTickByRPC.GetAsBool() {
updater = ds.timetickSender
} else {
m, err := config.msFactory.NewMsgStream(ctx)
if err != nil {
return nil, err
}
m.AsProducer([]string{paramtable.Get().CommonCfg.DataCoordTimeTick.GetValue()})
metrics.DataNodeNumProducers.WithLabelValues(fmt.Sprint(config.serverID)).Inc()
log.Info("datanode AsProducer", zap.String("TimeTickChannelName", paramtable.Get().CommonCfg.DataCoordTimeTick.GetValue()))
m.EnableProduce(true)
updater = newMqStatsUpdater(config, m)
}
writeNode := newWriteNode(params.Ctx, params.WriteBufferManager, updater, config)
writeNode := newWriteNode(params.Ctx, params.WriteBufferManager, ds.timetickSender, config)
ttNode, err := newTTNode(config, params.WriteBufferManager, params.CheckpointUpdater)
if err != nil {
return nil, err

View File

@ -1,146 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pipeline
import (
"sync"
"time"
"github.com/samber/lo"
"go.uber.org/zap"
"golang.org/x/exp/maps"
"github.com/milvus-io/milvus/internal/datanode/util"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
type sendTimeTick func(util.Timestamp, []int64) error
// mergedTimeTickerSender reduces time ticker sending rate when datanode is doing `fast-forwarding`
// it makes sure time ticker send at most 10 times a second (1tick/100millisecond)
// and the last time tick is always sent
type mergedTimeTickerSender struct {
ts uint64
segmentIDs map[int64]struct{}
lastSent time.Time
mu sync.Mutex
cond *sync.Cond // condition to send timeticker
send sendTimeTick // actual sender logic
wg sync.WaitGroup
closeCh chan struct{}
closeOnce sync.Once
}
func newUniqueMergedTimeTickerSender(send sendTimeTick) *mergedTimeTickerSender {
return &mergedTimeTickerSender{
ts: 0, // 0 for not tt send
segmentIDs: make(map[int64]struct{}),
cond: sync.NewCond(&sync.Mutex{}),
send: send,
closeCh: make(chan struct{}),
}
}
func (mt *mergedTimeTickerSender) bufferTs(ts util.Timestamp, segmentIDs []int64) {
mt.mu.Lock()
defer mt.mu.Unlock()
mt.ts = ts
for _, sid := range segmentIDs {
mt.segmentIDs[sid] = struct{}{}
}
}
func (mt *mergedTimeTickerSender) tick() {
defer mt.wg.Done()
// this duration might be configuable in the future
t := time.NewTicker(paramtable.Get().DataNodeCfg.DataNodeTimeTickInterval.GetAsDuration(time.Millisecond)) // 500 millisecond
defer t.Stop()
for {
select {
case <-t.C:
mt.cond.L.Lock()
mt.cond.Signal()
mt.cond.L.Unlock()
case <-mt.closeCh:
return
}
}
}
func (mt *mergedTimeTickerSender) isClosed() bool {
select {
case <-mt.closeCh:
return true
default:
return false
}
}
func (mt *mergedTimeTickerSender) work() {
defer mt.wg.Done()
lastTs := uint64(0)
for {
var (
isDiffTs bool
sids []int64
)
mt.cond.L.Lock()
if mt.isClosed() {
mt.cond.L.Unlock()
return
}
mt.cond.Wait()
mt.cond.L.Unlock()
mt.mu.Lock()
isDiffTs = mt.ts != lastTs
if isDiffTs {
for sid := range mt.segmentIDs {
sids = append(sids, sid)
}
// we will reset the timer but not the segmentIDs, since if we sent the timetick fail we may block forever due to flush stuck
lastTs = mt.ts
mt.lastSent = time.Now()
mt.segmentIDs = make(map[int64]struct{})
}
mt.mu.Unlock()
if isDiffTs {
if err := mt.send(lastTs, sids); err != nil {
log.Error("send hard time tick failed", zap.Error(err))
mt.mu.Lock()
maps.Copy(mt.segmentIDs, lo.SliceToMap(sids, func(t int64) (int64, struct{}) {
return t, struct{}{}
}))
mt.mu.Unlock()
}
}
}
}
func (mt *mergedTimeTickerSender) close() {
mt.closeOnce.Do(func() {
mt.cond.L.Lock()
close(mt.closeCh)
mt.cond.Broadcast()
mt.cond.L.Unlock()
mt.wg.Wait()
})
}

View File

@ -24,7 +24,7 @@ type writeNode struct {
channelName string
wbManager writebuffer.BufferManager
updater statsUpdater
updater util.StatsUpdater
metacache metacache.MetaCache
}
@ -122,7 +122,7 @@ func (wNode *writeNode) Operate(in []Msg) []Msg {
func newWriteNode(
_ context.Context,
writeBufferManager writebuffer.BufferManager,
updater statsUpdater,
updater util.StatsUpdater,
config *nodeConfig,
) *writeNode {
baseNode := BaseNode{}

View File

@ -1,100 +0,0 @@
package pipeline
import (
"fmt"
"sync"
"github.com/samber/lo"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/datanode/util"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
type statsUpdater interface {
Update(channel string, ts util.Timestamp, stats []*commonpb.SegmentStats)
}
// mqStatsUpdater is the wrapper of mergedTimeTickSender
type mqStatsUpdater struct {
sender *mergedTimeTickerSender
producer msgstream.MsgStream
config *nodeConfig
mut sync.Mutex
stats map[int64]int64 // segment id => row nums
}
func newMqStatsUpdater(config *nodeConfig, producer msgstream.MsgStream) statsUpdater {
updater := &mqStatsUpdater{
stats: make(map[int64]int64),
producer: producer,
config: config,
}
sender := newUniqueMergedTimeTickerSender(updater.send)
updater.sender = sender
return updater
}
func (u *mqStatsUpdater) send(ts util.Timestamp, segmentIDs []int64) error {
u.mut.Lock()
defer u.mut.Unlock()
stats := lo.Map(segmentIDs, func(id int64, _ int) *commonpb.SegmentStats {
rowNum := u.stats[id]
return &commonpb.SegmentStats{
SegmentID: id,
NumRows: rowNum,
}
})
msgPack := msgstream.MsgPack{}
timeTickMsg := msgstream.DataNodeTtMsg{
BaseMsg: msgstream.BaseMsg{
BeginTimestamp: ts,
EndTimestamp: ts,
HashValues: []uint32{0},
},
DataNodeTtMsg: msgpb.DataNodeTtMsg{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_DataNodeTt),
commonpbutil.WithTimeStamp(ts),
commonpbutil.WithSourceID(u.config.serverID),
),
ChannelName: u.config.vChannelName,
Timestamp: ts,
SegmentsStats: stats,
},
}
msgPack.Msgs = append(msgPack.Msgs, &timeTickMsg)
sub := tsoutil.SubByNow(ts)
pChan := funcutil.ToPhysicalChannel(u.config.vChannelName)
metrics.DataNodeProduceTimeTickLag.
WithLabelValues(fmt.Sprint(u.config.serverID), fmt.Sprint(u.config.collectionID), pChan).
Set(float64(sub))
err := u.producer.Produce(&msgPack)
if err != nil {
return err
}
for _, segmentID := range segmentIDs {
delete(u.stats, segmentID)
}
return nil
}
func (u *mqStatsUpdater) Update(channel string, ts util.Timestamp, stats []*commonpb.SegmentStats) {
u.mut.Lock()
defer u.mut.Unlock()
segmentIDs := lo.Map(stats, func(stats *commonpb.SegmentStats, _ int) int64 { return stats.SegmentID })
lo.ForEach(stats, func(stats *commonpb.SegmentStats, _ int) {
u.stats[stats.SegmentID] = stats.NumRows
})
u.sender.bufferTs(ts, segmentIDs)
}

View File

@ -1,64 +0,0 @@
package pipeline
import (
"testing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
type MqStatsUpdaterSuite struct {
suite.Suite
producer *msgstream.MockMsgStream
updater *mqStatsUpdater
}
func (s *MqStatsUpdaterSuite) SetupTest() {
s.producer = msgstream.NewMockMsgStream(s.T())
s.updater = &mqStatsUpdater{
stats: make(map[int64]int64),
producer: s.producer,
config: &nodeConfig{
vChannelName: "by-dev-rootcoord-dml_0v0",
},
}
}
func (s *MqStatsUpdaterSuite) TestSend() {
s.Run("send_ok", func() {
s.producer.EXPECT().Produce(mock.Anything).Return(nil)
s.updater.mut.Lock()
s.updater.stats[100] = 1024
s.updater.mut.Unlock()
err := s.updater.send(tsoutil.GetCurrentTime(), []int64{100})
s.NoError(err)
s.updater.mut.Lock()
_, has := s.updater.stats[100]
s.updater.mut.Unlock()
s.False(has)
})
s.Run("send_error", func() {
s.SetupTest()
s.producer.EXPECT().Produce(mock.Anything).Return(errors.New("mocked"))
s.updater.mut.Lock()
s.updater.stats[100] = 1024
s.updater.mut.Unlock()
err := s.updater.send(tsoutil.GetCurrentTime(), []int64{100})
s.Error(err)
})
}
func TestMqStatsUpdater(t *testing.T) {
suite.Run(t, new(MqStatsUpdaterSuite))
}

View File

@ -33,6 +33,10 @@ import (
"github.com/milvus-io/milvus/pkg/util/retry"
)
type StatsUpdater interface {
Update(channel string, ts Timestamp, stats []*commonpb.SegmentStats)
}
// TimeTickSender is to merge channel states updated by flow graph node and send to datacoord periodically
// TimeTickSender hold segmentStats cache for each channel,
// after send succeeds will clean the cache earlier than last sent timestamp

View File

@ -28,7 +28,7 @@ type rankParams struct {
// parseSearchInfo returns QueryInfo and offset
func parseSearchInfo(searchParamsPair []*commonpb.KeyValuePair, schema *schemapb.CollectionSchema, ignoreOffset bool) (*planpb.QueryInfo, int64, error) {
//0. parse iterator field
// 0. parse iterator field
isIterator, _ := funcutil.GetAttrByKeyFromRepeatedKV(IteratorField, searchParamsPair)
// 1. parse offset and real topk
@ -42,8 +42,8 @@ func parseSearchInfo(searchParamsPair []*commonpb.KeyValuePair, schema *schemapb
}
if err := validateLimit(topK); err != nil {
if isIterator == "True" {
//1. if the request is from iterator, we set topK to QuotaLimit as the iterator can resolve too large topK problem
//2. GetAsInt64 has cached inside, no need to worry about cpu cost for parsing here
// 1. if the request is from iterator, we set topK to QuotaLimit as the iterator can resolve too large topK problem
// 2. GetAsInt64 has cached inside, no need to worry about cpu cost for parsing here
topK = Params.QuotaConfig.TopKLimit.GetAsInt64()
} else {
return nil, 0, fmt.Errorf("%s [%d] is invalid, %w", TopKKey, topK, err)

View File

@ -3712,7 +3712,6 @@ type dataNodeConfig struct {
MemoryCheckInterval ParamItem `refreshable:"true"`
MemoryForceSyncWatermark ParamItem `refreshable:"true"`
DataNodeTimeTickByRPC ParamItem `refreshable:"false"`
// DataNode send timetick interval per collection
DataNodeTimeTickInterval ParamItem `refreshable:"false"`
@ -3920,15 +3919,6 @@ func (p *dataNodeConfig) init(base *BaseTable) {
}
p.FileReadConcurrency.Init(base.mgr)
p.DataNodeTimeTickByRPC = ParamItem{
Key: "dataNode.timetick.byRPC",
Version: "2.2.9",
PanicIfEmpty: false,
DefaultValue: "true",
Export: true,
}
p.DataNodeTimeTickByRPC.Init(base.mgr)
p.DataNodeTimeTickInterval = ParamItem{
Key: "dataNode.timetick.interval",
Version: "2.2.5",