2022-10-11 03:39:22 +00:00
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2022-09-15 10:48:32 +00:00
package dist
import (
"context"
2024-11-12 08:34:28 +00:00
"fmt"
2022-09-15 10:48:32 +00:00
"sync"
"time"
2024-05-09 07:41:30 +00:00
"github.com/samber/lo"
2023-04-06 11:14:32 +00:00
"go.uber.org/zap"
2025-01-15 12:17:00 +00:00
"google.golang.org/protobuf/proto"
2023-04-06 11:14:32 +00:00
2023-06-08 17:28:37 +00:00
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
2022-09-15 10:48:32 +00:00
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
2024-09-03 07:39:03 +00:00
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
2023-04-06 11:14:32 +00:00
"github.com/milvus-io/milvus/pkg/log"
2025-01-10 02:49:01 +00:00
"github.com/milvus-io/milvus/pkg/proto/datapb"
"github.com/milvus-io/milvus/pkg/proto/querypb"
2023-04-06 11:14:32 +00:00
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/merr"
2024-01-16 09:33:03 +00:00
"github.com/milvus-io/milvus/pkg/util/paramtable"
2025-01-15 12:17:00 +00:00
"github.com/milvus-io/milvus/pkg/util/timerecord"
2024-11-15 02:20:31 +00:00
"github.com/milvus-io/milvus/pkg/util/typeutil"
2022-09-15 10:48:32 +00:00
)
2024-11-12 08:34:28 +00:00
// TriggerUpdateTargetVersion is a callback that receives a collection ID.
// In this file it is invoked by distHandler.updateLeaderView for every
// collection that has a shard leader reporting a non-positive target version.
// NOTE(review): presumably the callback asks the target observer to sync the
// collection's target version — confirm against the caller that supplies it.
type TriggerUpdateTargetVersion = func ( collectionID int64 )
2022-09-15 10:48:32 +00:00
type distHandler struct {
2024-05-17 02:11:37 +00:00
nodeID int64
c chan struct { }
wg sync . WaitGroup
client session . Cluster
nodeManager * session . NodeManager
scheduler task . Scheduler
dist * meta . DistributionManager
target meta . TargetManagerInterface
mu sync . Mutex
stopOnce sync . Once
lastUpdateTs int64
2024-11-12 08:34:28 +00:00
syncTargetVersionFn TriggerUpdateTargetVersion
2022-09-15 10:48:32 +00:00
}
func ( dh * distHandler ) start ( ctx context . Context ) {
defer dh . wg . Done ( )
2023-03-31 07:22:23 +00:00
log := log . Ctx ( ctx ) . With ( zap . Int64 ( "nodeID" , dh . nodeID ) ) . WithRateGroup ( "qcv2.distHandler" , 1 , 60 )
2023-03-21 10:11:57 +00:00
log . Info ( "start dist handler" )
2022-12-07 10:01:19 +00:00
ticker := time . NewTicker ( Params . QueryCoordCfg . DistPullInterval . GetAsDuration ( time . Millisecond ) )
2023-02-23 10:59:45 +00:00
defer ticker . Stop ( )
2024-06-07 00:25:53 +00:00
checkExecutedFlagTicker := time . NewTicker ( Params . QueryCoordCfg . CheckExecutedFlagInterval . GetAsDuration ( time . Millisecond ) )
defer checkExecutedFlagTicker . Stop ( )
2022-09-15 10:48:32 +00:00
failures := 0
for {
select {
case <- ctx . Done ( ) :
2023-03-21 10:11:57 +00:00
log . Info ( "close dist handler due to context done" )
2022-09-15 10:48:32 +00:00
return
case <- dh . c :
2023-03-21 10:11:57 +00:00
log . Info ( "close dist handler" )
2022-09-15 10:48:32 +00:00
return
2024-06-07 00:25:53 +00:00
case <- checkExecutedFlagTicker . C :
executedFlagChan := dh . scheduler . GetExecutedFlag ( dh . nodeID )
if executedFlagChan != nil {
select {
case <- executedFlagChan :
dh . pullDist ( ctx , & failures , false )
default :
2022-12-05 07:09:20 +00:00
}
2023-03-21 10:11:57 +00:00
}
2024-06-07 00:25:53 +00:00
case <- ticker . C :
dh . pullDist ( ctx , & failures , true )
}
}
}
func ( dh * distHandler ) pullDist ( ctx context . Context , failures * int , dispatchTask bool ) {
2025-01-15 12:17:00 +00:00
tr := timerecord . NewTimeRecorder ( "" )
2024-06-07 00:25:53 +00:00
resp , err := dh . getDistribution ( ctx )
2025-01-15 12:17:00 +00:00
d1 := tr . RecordSpan ( )
2024-06-07 00:25:53 +00:00
if err != nil {
node := dh . nodeManager . Get ( dh . nodeID )
* failures = * failures + 1
fields := [ ] zap . Field { zap . Int ( "times" , * failures ) }
if node != nil {
fields = append ( fields , zap . Time ( "lastHeartbeat" , node . LastHeartbeat ( ) ) )
2022-09-15 10:48:32 +00:00
}
2024-06-07 00:25:53 +00:00
fields = append ( fields , zap . Error ( err ) )
2025-01-15 12:17:00 +00:00
log . Ctx ( ctx ) . WithRateGroup ( "distHandler.pullDist" , 1 , 60 ) .
RatedWarn ( 30.0 , "failed to get data distribution" , fields ... )
2024-06-07 00:25:53 +00:00
} else {
* failures = 0
2024-11-25 03:14:34 +00:00
dh . handleDistResp ( ctx , resp , dispatchTask )
2022-09-15 10:48:32 +00:00
}
2025-01-15 12:17:00 +00:00
log . Ctx ( ctx ) . WithRateGroup ( "distHandler.pullDist" , 1 , 120 ) .
RatedInfo ( 120.0 , "pull and handle distribution done" ,
zap . Int ( "respSize" , proto . Size ( resp ) ) , zap . Duration ( "pullDur" , d1 ) , zap . Duration ( "handleDur" , tr . RecordSpan ( ) ) )
2022-09-15 10:48:32 +00:00
}
2024-11-25 03:14:34 +00:00
func ( dh * distHandler ) handleDistResp ( ctx context . Context , resp * querypb . GetDataDistributionResponse , dispatchTask bool ) {
2022-09-15 10:48:32 +00:00
node := dh . nodeManager . Get ( resp . GetNodeID ( ) )
2024-05-17 02:11:37 +00:00
if node == nil {
return
}
if time . Since ( node . LastHeartbeat ( ) ) > paramtable . Get ( ) . QueryCoordCfg . HeartBeatWarningLag . GetAsDuration ( time . Millisecond ) {
log . Warn ( "node last heart beat time lag too behind" , zap . Time ( "now" , time . Now ( ) ) ,
zap . Time ( "lastHeartBeatTime" , node . LastHeartbeat ( ) ) , zap . Int64 ( "nodeID" , node . ID ( ) ) )
}
node . SetLastHeartbeat ( time . Now ( ) )
// skip update dist if no distribution change happens in query node
2024-05-27 07:01:42 +00:00
if resp . GetLastModifyTs ( ) != 0 && resp . GetLastModifyTs ( ) <= dh . lastUpdateTs {
2024-05-17 02:11:37 +00:00
log . RatedInfo ( 30 , "skip update dist due to no distribution change" , zap . Int64 ( "lastModifyTs" , resp . GetLastModifyTs ( ) ) , zap . Int64 ( "lastUpdateTs" , dh . lastUpdateTs ) )
} else {
dh . lastUpdateTs = resp . GetLastModifyTs ( )
2022-09-15 10:48:32 +00:00
node . UpdateStats (
session . WithSegmentCnt ( len ( resp . GetSegments ( ) ) ) ,
session . WithChannelCnt ( len ( resp . GetChannels ( ) ) ) ,
2024-09-30 08:15:17 +00:00
session . WithMemCapacity ( resp . GetMemCapacityInMB ( ) ) ,
2022-09-15 10:48:32 +00:00
)
2024-11-25 03:14:34 +00:00
dh . updateSegmentsDistribution ( ctx , resp )
dh . updateChannelsDistribution ( ctx , resp )
dh . updateLeaderView ( ctx , resp )
2024-05-17 02:11:37 +00:00
}
2022-09-15 10:48:32 +00:00
2024-06-07 00:25:53 +00:00
if dispatchTask {
dh . scheduler . Dispatch ( dh . nodeID )
}
2022-09-15 10:48:32 +00:00
}
2024-11-25 03:14:34 +00:00
func ( dh * distHandler ) updateSegmentsDistribution ( ctx context . Context , resp * querypb . GetDataDistributionResponse ) {
2022-09-15 10:48:32 +00:00
updates := make ( [ ] * meta . Segment , 0 , len ( resp . GetSegments ( ) ) )
for _ , s := range resp . GetSegments ( ) {
2024-11-25 03:14:34 +00:00
segmentInfo := dh . target . GetSealedSegment ( ctx , s . GetCollection ( ) , s . GetID ( ) , meta . CurrentTargetFirst )
2022-11-07 11:37:04 +00:00
if segmentInfo == nil {
2024-08-02 02:18:13 +00:00
segmentInfo = & datapb . SegmentInfo {
ID : s . GetID ( ) ,
CollectionID : s . GetCollection ( ) ,
PartitionID : s . GetPartition ( ) ,
InsertChannel : s . GetChannel ( ) ,
Level : s . GetLevel ( ) ,
2024-09-30 02:01:16 +00:00
IsSorted : s . GetIsSorted ( ) ,
2022-09-15 10:48:32 +00:00
}
}
2024-08-02 02:18:13 +00:00
updates = append ( updates , & meta . Segment {
2024-11-20 03:26:31 +00:00
SegmentInfo : segmentInfo ,
2024-08-02 02:18:13 +00:00
Node : resp . GetNodeID ( ) ,
Version : s . GetVersion ( ) ,
LastDeltaTimestamp : s . GetLastDeltaTimestamp ( ) ,
IndexInfo : s . GetIndexInfo ( ) ,
} )
2022-09-15 10:48:32 +00:00
}
dh . dist . SegmentDistManager . Update ( resp . GetNodeID ( ) , updates ... )
}
2024-11-25 03:14:34 +00:00
func ( dh * distHandler ) updateChannelsDistribution ( ctx context . Context , resp * querypb . GetDataDistributionResponse ) {
2022-09-15 10:48:32 +00:00
updates := make ( [ ] * meta . DmChannel , 0 , len ( resp . GetChannels ( ) ) )
for _ , ch := range resp . GetChannels ( ) {
2024-11-25 03:14:34 +00:00
channelInfo := dh . target . GetDmChannel ( ctx , ch . GetCollection ( ) , ch . GetChannel ( ) , meta . CurrentTarget )
2022-09-15 10:48:32 +00:00
var channel * meta . DmChannel
if channelInfo == nil {
channel = & meta . DmChannel {
VchannelInfo : & datapb . VchannelInfo {
ChannelName : ch . GetChannel ( ) ,
CollectionID : ch . GetCollection ( ) ,
} ,
Node : resp . GetNodeID ( ) ,
Version : ch . GetVersion ( ) ,
}
} else {
2024-11-21 02:40:31 +00:00
channel = & meta . DmChannel {
VchannelInfo : channelInfo . VchannelInfo ,
Node : resp . GetNodeID ( ) ,
Version : ch . GetVersion ( ) ,
}
2022-09-15 10:48:32 +00:00
}
updates = append ( updates , channel )
}
dh . dist . ChannelDistManager . Update ( resp . GetNodeID ( ) , updates ... )
}
2024-11-25 03:14:34 +00:00
func ( dh * distHandler ) updateLeaderView ( ctx context . Context , resp * querypb . GetDataDistributionResponse ) {
2022-09-15 10:48:32 +00:00
updates := make ( [ ] * meta . LeaderView , 0 , len ( resp . GetLeaderViews ( ) ) )
2024-05-09 07:41:30 +00:00
channels := lo . SliceToMap ( resp . GetChannels ( ) , func ( channel * querypb . ChannelVersionInfo ) ( string , * querypb . ChannelVersionInfo ) {
return channel . GetChannel ( ) , channel
} )
2024-11-15 02:20:31 +00:00
collectionsToSync := typeutil . NewUniqueSet ( )
2022-09-15 10:48:32 +00:00
for _ , lview := range resp . GetLeaderViews ( ) {
2022-11-07 11:37:04 +00:00
segments := make ( map [ int64 ] * meta . Segment )
for ID , position := range lview . GrowingSegments {
segments [ ID ] = & meta . Segment {
SegmentInfo : & datapb . SegmentInfo {
ID : ID ,
CollectionID : lview . GetCollection ( ) ,
StartPosition : position ,
InsertChannel : lview . GetChannel ( ) ,
} ,
Node : resp . NodeID ,
}
}
2023-03-21 08:57:57 +00:00
var version int64
2024-05-09 07:41:30 +00:00
channel , ok := channels [ lview . GetChannel ( ) ]
if ok {
version = channel . GetVersion ( )
2023-03-21 08:57:57 +00:00
}
2022-09-15 10:48:32 +00:00
view := & meta . LeaderView {
2024-06-10 13:34:08 +00:00
ID : resp . GetNodeID ( ) ,
CollectionID : lview . GetCollection ( ) ,
Channel : lview . GetChannel ( ) ,
Version : version ,
Segments : lview . GetSegmentDist ( ) ,
GrowingSegments : segments ,
TargetVersion : lview . TargetVersion ,
NumOfGrowingRows : lview . GetNumOfGrowingRows ( ) ,
PartitionStatsVersions : lview . PartitionStatsVersions ,
2022-09-15 10:48:32 +00:00
}
2024-11-12 08:34:28 +00:00
updates = append ( updates , view )
2024-09-03 07:39:03 +00:00
// check leader serviceable
2024-11-12 08:34:28 +00:00
if err := utils . CheckDelegatorDataReady ( dh . nodeManager , dh . target , view , meta . CurrentTarget ) ; err != nil {
2024-09-03 07:39:03 +00:00
view . UnServiceableError = err
2024-11-12 08:34:28 +00:00
log . Info ( "leader is not available due to distribution not ready" ,
zap . Int64 ( "collectionID" , view . CollectionID ) ,
zap . Int64 ( "nodeID" , view . ID ) ,
zap . String ( "channel" , view . Channel ) ,
zap . Error ( err ) )
continue
}
// if target version hasn't been synced, delegator will get empty readable segment list
// so shard leader should be unserviceable until target version is synced
2024-11-25 03:14:34 +00:00
currentTargetVersion := dh . target . GetCollectionTargetVersion ( ctx , lview . GetCollection ( ) , meta . CurrentTarget )
2024-11-12 08:34:28 +00:00
if lview . TargetVersion <= 0 {
err := merr . WrapErrServiceInternal ( fmt . Sprintf ( "target version mismatch, collection: %d, channel: %s, current target version: %v, leader version: %v" ,
lview . GetCollection ( ) , lview . GetChannel ( ) , currentTargetVersion , lview . TargetVersion ) )
view . UnServiceableError = err
2024-11-15 02:20:31 +00:00
// make dist handler pull next distribution until all delegator is serviceable
dh . lastUpdateTs = 0
collectionsToSync . Insert ( lview . Collection )
2024-11-12 08:34:28 +00:00
log . Info ( "leader is not available due to target version not ready" ,
zap . Int64 ( "collectionID" , view . CollectionID ) ,
zap . Int64 ( "nodeID" , view . ID ) ,
zap . String ( "channel" , view . Channel ) ,
zap . Error ( err ) )
2024-09-03 07:39:03 +00:00
}
2022-09-15 10:48:32 +00:00
}
dh . dist . LeaderViewManager . Update ( resp . GetNodeID ( ) , updates ... )
2024-11-15 02:20:31 +00:00
// segment and channel already loaded, trigger target observer to update
collectionsToSync . Range ( func ( collection int64 ) bool {
dh . syncTargetVersionFn ( collection )
return true
} )
2022-09-15 10:48:32 +00:00
}
2023-05-22 11:37:25 +00:00
func ( dh * distHandler ) getDistribution ( ctx context . Context ) ( * querypb . GetDataDistributionResponse , error ) {
2022-09-15 10:48:32 +00:00
dh . mu . Lock ( )
defer dh . mu . Unlock ( )
2023-03-26 16:42:00 +00:00
2024-01-16 09:33:03 +00:00
ctx , cancel := context . WithTimeout ( ctx , paramtable . Get ( ) . QueryCoordCfg . DistributionRequestTimeout . GetAsDuration ( time . Millisecond ) )
2023-03-21 10:11:57 +00:00
defer cancel ( )
resp , err := dh . client . GetDataDistribution ( ctx , dh . nodeID , & querypb . GetDataDistributionRequest {
2022-10-21 07:57:28 +00:00
Base : commonpbutil . NewMsgBase (
commonpbutil . WithMsgType ( commonpb . MsgType_GetDistribution ) ,
) ,
2024-05-17 02:11:37 +00:00
LastUpdateTs : dh . lastUpdateTs ,
2022-10-08 12:26:57 +00:00
} )
2023-03-21 10:11:57 +00:00
if err != nil {
2023-05-22 11:37:25 +00:00
return nil , err
2022-09-15 10:48:32 +00:00
}
2023-03-21 10:11:57 +00:00
if ! merr . Ok ( resp . GetStatus ( ) ) {
2023-05-22 11:37:25 +00:00
return nil , merr . Error ( resp . GetStatus ( ) )
2022-12-06 14:59:19 +00:00
}
2023-05-22 11:37:25 +00:00
return resp , nil
2022-09-15 10:48:32 +00:00
}
func ( dh * distHandler ) stop ( ) {
2022-10-19 04:13:28 +00:00
dh . stopOnce . Do ( func ( ) {
close ( dh . c )
dh . wg . Wait ( )
2023-09-27 08:27:27 +00:00
// clear dist
dh . dist . ChannelDistManager . Update ( dh . nodeID )
dh . dist . SegmentDistManager . Update ( dh . nodeID )
2022-10-19 04:13:28 +00:00
} )
2022-09-15 10:48:32 +00:00
}
func newDistHandler (
ctx context . Context ,
nodeID int64 ,
client session . Cluster ,
nodeManager * session . NodeManager ,
scheduler task . Scheduler ,
dist * meta . DistributionManager ,
2024-05-17 02:11:37 +00:00
targetMgr meta . TargetManagerInterface ,
2024-11-12 08:34:28 +00:00
syncTargetVersionFn TriggerUpdateTargetVersion ,
2022-09-15 10:48:32 +00:00
) * distHandler {
h := & distHandler {
2024-11-12 08:34:28 +00:00
nodeID : nodeID ,
c : make ( chan struct { } ) ,
client : client ,
nodeManager : nodeManager ,
scheduler : scheduler ,
dist : dist ,
target : targetMgr ,
syncTargetVersionFn : syncTargetVersionFn ,
2022-09-15 10:48:32 +00:00
}
h . wg . Add ( 1 )
go h . start ( ctx )
return h
}