2022-10-11 03:39:22 +00:00
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2022-09-15 10:48:32 +00:00
package dist
import (
"context"
"sync"
"time"
"github.com/golang/protobuf/proto"
2024-05-09 07:41:30 +00:00
"github.com/samber/lo"
2023-04-06 11:14:32 +00:00
"go.uber.org/zap"
2023-06-08 17:28:37 +00:00
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
2022-09-15 10:48:32 +00:00
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
2023-04-06 11:14:32 +00:00
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/merr"
2024-01-16 09:33:03 +00:00
"github.com/milvus-io/milvus/pkg/util/paramtable"
2022-09-15 10:48:32 +00:00
)
type distHandler struct {
2024-05-17 02:11:37 +00:00
nodeID int64
c chan struct { }
wg sync . WaitGroup
client session . Cluster
nodeManager * session . NodeManager
scheduler task . Scheduler
dist * meta . DistributionManager
target meta . TargetManagerInterface
mu sync . Mutex
stopOnce sync . Once
lastUpdateTs int64
2022-09-15 10:48:32 +00:00
}
func ( dh * distHandler ) start ( ctx context . Context ) {
defer dh . wg . Done ( )
2023-03-31 07:22:23 +00:00
log := log . Ctx ( ctx ) . With ( zap . Int64 ( "nodeID" , dh . nodeID ) ) . WithRateGroup ( "qcv2.distHandler" , 1 , 60 )
2023-03-21 10:11:57 +00:00
log . Info ( "start dist handler" )
2022-12-07 10:01:19 +00:00
ticker := time . NewTicker ( Params . QueryCoordCfg . DistPullInterval . GetAsDuration ( time . Millisecond ) )
2023-02-23 10:59:45 +00:00
defer ticker . Stop ( )
2024-06-07 00:25:53 +00:00
checkExecutedFlagTicker := time . NewTicker ( Params . QueryCoordCfg . CheckExecutedFlagInterval . GetAsDuration ( time . Millisecond ) )
defer checkExecutedFlagTicker . Stop ( )
2022-09-15 10:48:32 +00:00
failures := 0
for {
select {
case <- ctx . Done ( ) :
2023-03-21 10:11:57 +00:00
log . Info ( "close dist handler due to context done" )
2022-09-15 10:48:32 +00:00
return
case <- dh . c :
2023-03-21 10:11:57 +00:00
log . Info ( "close dist handler" )
2022-09-15 10:48:32 +00:00
return
2024-06-07 00:25:53 +00:00
case <- checkExecutedFlagTicker . C :
executedFlagChan := dh . scheduler . GetExecutedFlag ( dh . nodeID )
if executedFlagChan != nil {
select {
case <- executedFlagChan :
dh . pullDist ( ctx , & failures , false )
default :
2022-12-05 07:09:20 +00:00
}
2023-03-21 10:11:57 +00:00
}
2024-06-07 00:25:53 +00:00
case <- ticker . C :
dh . pullDist ( ctx , & failures , true )
}
}
}
func ( dh * distHandler ) pullDist ( ctx context . Context , failures * int , dispatchTask bool ) {
resp , err := dh . getDistribution ( ctx )
if err != nil {
node := dh . nodeManager . Get ( dh . nodeID )
* failures = * failures + 1
fields := [ ] zap . Field { zap . Int ( "times" , * failures ) }
if node != nil {
fields = append ( fields , zap . Time ( "lastHeartbeat" , node . LastHeartbeat ( ) ) )
2022-09-15 10:48:32 +00:00
}
2024-06-07 00:25:53 +00:00
fields = append ( fields , zap . Error ( err ) )
log . RatedWarn ( 30.0 , "failed to get data distribution" , fields ... )
} else {
* failures = 0
dh . handleDistResp ( resp , dispatchTask )
2022-09-15 10:48:32 +00:00
}
}
2024-06-07 00:25:53 +00:00
func ( dh * distHandler ) handleDistResp ( resp * querypb . GetDataDistributionResponse , dispatchTask bool ) {
2022-09-15 10:48:32 +00:00
node := dh . nodeManager . Get ( resp . GetNodeID ( ) )
2024-05-17 02:11:37 +00:00
if node == nil {
return
}
if time . Since ( node . LastHeartbeat ( ) ) > paramtable . Get ( ) . QueryCoordCfg . HeartBeatWarningLag . GetAsDuration ( time . Millisecond ) {
log . Warn ( "node last heart beat time lag too behind" , zap . Time ( "now" , time . Now ( ) ) ,
zap . Time ( "lastHeartBeatTime" , node . LastHeartbeat ( ) ) , zap . Int64 ( "nodeID" , node . ID ( ) ) )
}
node . SetLastHeartbeat ( time . Now ( ) )
// skip update dist if no distribution change happens in query node
2024-05-27 07:01:42 +00:00
if resp . GetLastModifyTs ( ) != 0 && resp . GetLastModifyTs ( ) <= dh . lastUpdateTs {
2024-05-17 02:11:37 +00:00
log . RatedInfo ( 30 , "skip update dist due to no distribution change" , zap . Int64 ( "lastModifyTs" , resp . GetLastModifyTs ( ) ) , zap . Int64 ( "lastUpdateTs" , dh . lastUpdateTs ) )
} else {
dh . lastUpdateTs = resp . GetLastModifyTs ( )
2022-09-15 10:48:32 +00:00
node . UpdateStats (
session . WithSegmentCnt ( len ( resp . GetSegments ( ) ) ) ,
session . WithChannelCnt ( len ( resp . GetChannels ( ) ) ) ,
)
2024-05-17 02:11:37 +00:00
dh . updateSegmentsDistribution ( resp )
dh . updateChannelsDistribution ( resp )
dh . updateLeaderView ( resp )
}
2022-09-15 10:48:32 +00:00
2024-06-07 00:25:53 +00:00
if dispatchTask {
dh . scheduler . Dispatch ( dh . nodeID )
}
2022-09-15 10:48:32 +00:00
}
func ( dh * distHandler ) updateSegmentsDistribution ( resp * querypb . GetDataDistributionResponse ) {
updates := make ( [ ] * meta . Segment , 0 , len ( resp . GetSegments ( ) ) )
for _ , s := range resp . GetSegments ( ) {
2022-11-07 11:37:04 +00:00
// for collection which is already loaded
2023-10-24 16:44:12 +00:00
segmentInfo := dh . target . GetSealedSegment ( s . GetCollection ( ) , s . GetID ( ) , meta . CurrentTarget )
2022-11-07 11:37:04 +00:00
if segmentInfo == nil {
// for collection which is loading
2023-10-24 16:44:12 +00:00
segmentInfo = dh . target . GetSealedSegment ( s . GetCollection ( ) , s . GetID ( ) , meta . NextTarget )
2022-11-07 11:37:04 +00:00
}
2022-09-15 10:48:32 +00:00
var segment * meta . Segment
if segmentInfo == nil {
segment = & meta . Segment {
SegmentInfo : & datapb . SegmentInfo {
ID : s . GetID ( ) ,
CollectionID : s . GetCollection ( ) ,
PartitionID : s . GetPartition ( ) ,
InsertChannel : s . GetChannel ( ) ,
} ,
2023-03-26 16:42:00 +00:00
Node : resp . GetNodeID ( ) ,
Version : s . GetVersion ( ) ,
LastDeltaTimestamp : s . GetLastDeltaTimestamp ( ) ,
2023-07-11 03:22:29 +00:00
IndexInfo : s . GetIndexInfo ( ) ,
2022-09-15 10:48:32 +00:00
}
} else {
segment = & meta . Segment {
2023-03-26 16:42:00 +00:00
SegmentInfo : proto . Clone ( segmentInfo ) . ( * datapb . SegmentInfo ) ,
Node : resp . GetNodeID ( ) ,
Version : s . GetVersion ( ) ,
LastDeltaTimestamp : s . GetLastDeltaTimestamp ( ) ,
2023-07-11 03:22:29 +00:00
IndexInfo : s . GetIndexInfo ( ) ,
2022-09-15 10:48:32 +00:00
}
}
updates = append ( updates , segment )
}
dh . dist . SegmentDistManager . Update ( resp . GetNodeID ( ) , updates ... )
}
func ( dh * distHandler ) updateChannelsDistribution ( resp * querypb . GetDataDistributionResponse ) {
updates := make ( [ ] * meta . DmChannel , 0 , len ( resp . GetChannels ( ) ) )
for _ , ch := range resp . GetChannels ( ) {
2022-11-07 11:37:04 +00:00
channelInfo := dh . target . GetDmChannel ( ch . GetCollection ( ) , ch . GetChannel ( ) , meta . CurrentTarget )
2022-09-15 10:48:32 +00:00
var channel * meta . DmChannel
if channelInfo == nil {
channel = & meta . DmChannel {
VchannelInfo : & datapb . VchannelInfo {
ChannelName : ch . GetChannel ( ) ,
CollectionID : ch . GetCollection ( ) ,
} ,
Node : resp . GetNodeID ( ) ,
Version : ch . GetVersion ( ) ,
}
} else {
channel = channelInfo . Clone ( )
}
updates = append ( updates , channel )
}
dh . dist . ChannelDistManager . Update ( resp . GetNodeID ( ) , updates ... )
}
func ( dh * distHandler ) updateLeaderView ( resp * querypb . GetDataDistributionResponse ) {
updates := make ( [ ] * meta . LeaderView , 0 , len ( resp . GetLeaderViews ( ) ) )
2024-05-09 07:41:30 +00:00
channels := lo . SliceToMap ( resp . GetChannels ( ) , func ( channel * querypb . ChannelVersionInfo ) ( string , * querypb . ChannelVersionInfo ) {
return channel . GetChannel ( ) , channel
} )
2022-09-15 10:48:32 +00:00
for _ , lview := range resp . GetLeaderViews ( ) {
2022-11-07 11:37:04 +00:00
segments := make ( map [ int64 ] * meta . Segment )
for ID , position := range lview . GrowingSegments {
segments [ ID ] = & meta . Segment {
SegmentInfo : & datapb . SegmentInfo {
ID : ID ,
CollectionID : lview . GetCollection ( ) ,
StartPosition : position ,
InsertChannel : lview . GetChannel ( ) ,
} ,
Node : resp . NodeID ,
}
}
2023-03-21 08:57:57 +00:00
var version int64
2024-05-09 07:41:30 +00:00
channel , ok := channels [ lview . GetChannel ( ) ]
if ok {
version = channel . GetVersion ( )
2023-03-21 08:57:57 +00:00
}
2022-09-15 10:48:32 +00:00
view := & meta . LeaderView {
2024-06-10 13:34:08 +00:00
ID : resp . GetNodeID ( ) ,
CollectionID : lview . GetCollection ( ) ,
Channel : lview . GetChannel ( ) ,
Version : version ,
Segments : lview . GetSegmentDist ( ) ,
GrowingSegments : segments ,
TargetVersion : lview . TargetVersion ,
NumOfGrowingRows : lview . GetNumOfGrowingRows ( ) ,
PartitionStatsVersions : lview . PartitionStatsVersions ,
2022-09-15 10:48:32 +00:00
}
updates = append ( updates , view )
}
dh . dist . LeaderViewManager . Update ( resp . GetNodeID ( ) , updates ... )
}
2023-05-22 11:37:25 +00:00
func ( dh * distHandler ) getDistribution ( ctx context . Context ) ( * querypb . GetDataDistributionResponse , error ) {
2022-09-15 10:48:32 +00:00
dh . mu . Lock ( )
defer dh . mu . Unlock ( )
2023-03-26 16:42:00 +00:00
2024-01-16 09:33:03 +00:00
ctx , cancel := context . WithTimeout ( ctx , paramtable . Get ( ) . QueryCoordCfg . DistributionRequestTimeout . GetAsDuration ( time . Millisecond ) )
2023-03-21 10:11:57 +00:00
defer cancel ( )
resp , err := dh . client . GetDataDistribution ( ctx , dh . nodeID , & querypb . GetDataDistributionRequest {
2022-10-21 07:57:28 +00:00
Base : commonpbutil . NewMsgBase (
commonpbutil . WithMsgType ( commonpb . MsgType_GetDistribution ) ,
) ,
2024-05-17 02:11:37 +00:00
LastUpdateTs : dh . lastUpdateTs ,
2022-10-08 12:26:57 +00:00
} )
2023-03-21 10:11:57 +00:00
if err != nil {
2023-05-22 11:37:25 +00:00
return nil , err
2022-09-15 10:48:32 +00:00
}
2023-03-21 10:11:57 +00:00
if ! merr . Ok ( resp . GetStatus ( ) ) {
2023-05-22 11:37:25 +00:00
return nil , merr . Error ( resp . GetStatus ( ) )
2022-12-06 14:59:19 +00:00
}
2023-05-22 11:37:25 +00:00
return resp , nil
2022-09-15 10:48:32 +00:00
}
func ( dh * distHandler ) stop ( ) {
2022-10-19 04:13:28 +00:00
dh . stopOnce . Do ( func ( ) {
close ( dh . c )
dh . wg . Wait ( )
2023-09-27 08:27:27 +00:00
// clear dist
dh . dist . ChannelDistManager . Update ( dh . nodeID )
dh . dist . SegmentDistManager . Update ( dh . nodeID )
2022-10-19 04:13:28 +00:00
} )
2022-09-15 10:48:32 +00:00
}
func newDistHandler (
ctx context . Context ,
nodeID int64 ,
client session . Cluster ,
nodeManager * session . NodeManager ,
scheduler task . Scheduler ,
dist * meta . DistributionManager ,
2024-05-17 02:11:37 +00:00
targetMgr meta . TargetManagerInterface ,
2022-09-15 10:48:32 +00:00
) * distHandler {
h := & distHandler {
nodeID : nodeID ,
c : make ( chan struct { } ) ,
client : client ,
nodeManager : nodeManager ,
scheduler : scheduler ,
dist : dist ,
target : targetMgr ,
}
h . wg . Add ( 1 )
go h . start ( ctx )
return h
}