2021-10-25 11:46:28 +00:00
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2021-06-22 02:42:07 +00:00
package datacoord
2021-05-25 07:35:37 +00:00
import (
"context"
"fmt"
"strconv"
2021-05-28 01:55:21 +00:00
"sync/atomic"
2021-11-05 14:25:00 +00:00
"time"
2021-05-25 07:35:37 +00:00
2021-11-19 05:57:12 +00:00
"github.com/milvus-io/milvus/internal/common"
2021-05-25 07:35:37 +00:00
"github.com/milvus-io/milvus/internal/log"
2021-12-21 11:13:13 +00:00
"github.com/milvus-io/milvus/internal/logutil"
2021-05-25 07:35:37 +00:00
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
2021-09-01 02:13:15 +00:00
"github.com/milvus-io/milvus/internal/util/metricsinfo"
2021-12-21 11:13:13 +00:00
"github.com/milvus-io/milvus/internal/util/trace"
2021-05-25 07:35:37 +00:00
"go.uber.org/zap"
)
2021-12-21 01:34:53 +00:00
// moduleName is the module label attached to request contexts through
// logutil.WithModule before logging.
const moduleName = "DataCoord"
2021-09-06 03:12:42 +00:00
// checks whether server in Healthy State
2021-05-28 01:55:21 +00:00
func ( s * Server ) isClosed ( ) bool {
2021-06-29 02:46:13 +00:00
return atomic . LoadInt64 ( & s . isServing ) != ServerStateHealthy
2021-05-25 07:35:37 +00:00
}
2021-09-06 03:12:42 +00:00
// GetTimeTickChannel legacy API, returns time tick channel name
2021-05-25 07:35:37 +00:00
func ( s * Server ) GetTimeTickChannel ( ctx context . Context ) ( * milvuspb . StringResponse , error ) {
return & milvuspb . StringResponse {
Status : & commonpb . Status {
ErrorCode : commonpb . ErrorCode_Success ,
} ,
2021-12-23 10:39:11 +00:00
Value : Params . DataCoordCfg . TimeTickChannelName ,
2021-05-25 07:35:37 +00:00
} , nil
}
2021-09-06 03:12:42 +00:00
// GetStatisticsChannel legacy API, returns statistics channel name
2021-05-25 07:35:37 +00:00
func ( s * Server ) GetStatisticsChannel ( ctx context . Context ) ( * milvuspb . StringResponse , error ) {
return & milvuspb . StringResponse {
Status : & commonpb . Status {
2021-12-15 02:53:16 +00:00
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
Reason : "no statistics channel" ,
2021-05-25 07:35:37 +00:00
} ,
} , nil
}
2021-09-06 03:12:42 +00:00
// Flush notify segment to flush
// this api only guarantees all the segments requested is sealed
// these segments will be flushed only after the Flush policy is fulfilled
2021-06-23 08:56:11 +00:00
func ( s * Server ) Flush ( ctx context . Context , req * datapb . FlushRequest ) ( * datapb . FlushResponse , error ) {
2021-07-28 03:43:22 +00:00
log . Debug ( "receive flush request" , zap . Int64 ( "dbID" , req . GetDbID ( ) ) , zap . Int64 ( "collectionID" , req . GetCollectionID ( ) ) )
2021-09-22 11:33:54 +00:00
sp , ctx := trace . StartSpanFromContextWithOperationName ( ctx , "DataCoord-Flush" )
defer sp . Finish ( )
2021-06-23 08:56:11 +00:00
resp := & datapb . FlushResponse {
Status : & commonpb . Status {
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
Reason : "" ,
} ,
DbID : 0 ,
CollectionID : 0 ,
SegmentIDs : [ ] int64 { } ,
2021-05-28 01:55:21 +00:00
}
if s . isClosed ( ) {
2021-06-23 08:56:11 +00:00
resp . Status . Reason = serverNotServingErrMsg
2021-05-28 01:55:21 +00:00
return resp , nil
2021-05-25 07:35:37 +00:00
}
2021-06-23 08:56:11 +00:00
sealedSegments , err := s . segmentManager . SealAllSegments ( ctx , req . CollectionID )
if err != nil {
2021-07-28 03:43:22 +00:00
resp . Status . Reason = fmt . Sprintf ( "failed to flush %d, %s" , req . CollectionID , err )
2021-05-28 01:55:21 +00:00
return resp , nil
2021-05-25 07:35:37 +00:00
}
2021-09-30 10:12:25 +00:00
log . Debug ( "flush response with segments" ,
zap . Int64 ( "collectionID" , req . GetCollectionID ( ) ) ,
zap . Any ( "segments" , sealedSegments ) )
2021-06-23 08:56:11 +00:00
resp . Status . ErrorCode = commonpb . ErrorCode_Success
resp . DbID = req . GetDbID ( )
resp . CollectionID = req . GetCollectionID ( )
resp . SegmentIDs = sealedSegments
return resp , nil
2021-05-25 07:35:37 +00:00
}
2021-09-06 03:12:42 +00:00
// AssignSegmentID applies for segment ids and make allocation for records
2021-05-25 07:35:37 +00:00
func ( s * Server ) AssignSegmentID ( ctx context . Context , req * datapb . AssignSegmentIDRequest ) ( * datapb . AssignSegmentIDResponse , error ) {
2021-05-28 01:55:21 +00:00
if s . isClosed ( ) {
2021-05-25 07:35:37 +00:00
return & datapb . AssignSegmentIDResponse {
Status : & commonpb . Status {
2021-06-22 10:24:08 +00:00
Reason : serverNotServingErrMsg ,
2021-05-25 07:35:37 +00:00
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
} ,
} , nil
}
assigns := make ( [ ] * datapb . SegmentIDAssignment , 0 , len ( req . SegmentIDRequests ) )
for _ , r := range req . SegmentIDRequests {
2021-07-28 03:43:22 +00:00
log . Debug ( "handle assign segment request" ,
2021-06-08 11:25:37 +00:00
zap . Int64 ( "collectionID" , r . GetCollectionID ( ) ) ,
zap . Int64 ( "partitionID" , r . GetPartitionID ( ) ) ,
zap . String ( "channelName" , r . GetChannelName ( ) ) ,
zap . Uint32 ( "count" , r . GetCount ( ) ) )
2021-11-18 14:31:34 +00:00
if s . meta . GetCollection ( r . GetCollectionID ( ) ) == nil {
err := s . loadCollectionFromRootCoord ( ctx , r . GetCollectionID ( ) )
if err != nil {
log . Warn ( "failed to load collection in alloc segment" , zap . Any ( "request" , r ) , zap . Error ( err ) )
continue
}
}
2021-07-12 03:03:52 +00:00
s . cluster . Watch ( r . ChannelName , r . CollectionID )
2021-05-29 02:47:29 +00:00
2021-07-23 13:58:33 +00:00
allocations , err := s . segmentManager . AllocSegment ( ctx ,
2021-05-25 07:35:37 +00:00
r . CollectionID , r . PartitionID , r . ChannelName , int64 ( r . Count ) )
if err != nil {
2021-07-23 13:58:33 +00:00
log . Warn ( "failed to alloc segment" , zap . Any ( "request" , r ) , zap . Error ( err ) )
2021-05-25 07:35:37 +00:00
continue
}
2022-01-04 11:35:32 +00:00
log . Debug ( "success to assign segments" , zap . Int64 ( "collectionID" , r . GetCollectionID ( ) ) , zap . Any ( "assignments" , allocations ) )
2021-06-08 11:25:37 +00:00
2021-07-23 13:58:33 +00:00
for _ , allocation := range allocations {
result := & datapb . SegmentIDAssignment {
SegID : allocation . SegmentID ,
ChannelName : r . ChannelName ,
Count : uint32 ( allocation . NumOfRows ) ,
CollectionID : r . CollectionID ,
PartitionID : r . PartitionID ,
ExpireTime : allocation . ExpireTime ,
Status : & commonpb . Status {
ErrorCode : commonpb . ErrorCode_Success ,
Reason : "" ,
} ,
}
assigns = append ( assigns , result )
2021-05-25 07:35:37 +00:00
}
}
return & datapb . AssignSegmentIDResponse {
Status : & commonpb . Status {
ErrorCode : commonpb . ErrorCode_Success ,
} ,
SegIDAssignments : assigns ,
} , nil
}
2021-05-26 11:06:56 +00:00
2021-09-06 03:12:42 +00:00
// GetSegmentStates returns segments state
2021-05-25 07:35:37 +00:00
func ( s * Server ) GetSegmentStates ( ctx context . Context , req * datapb . GetSegmentStatesRequest ) ( * datapb . GetSegmentStatesResponse , error ) {
resp := & datapb . GetSegmentStatesResponse {
Status : & commonpb . Status {
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
} ,
}
2021-05-28 01:55:21 +00:00
if s . isClosed ( ) {
2021-06-22 10:24:08 +00:00
resp . Status . Reason = serverNotServingErrMsg
2021-05-25 07:35:37 +00:00
return resp , nil
}
for _ , segmentID := range req . SegmentIDs {
state := & datapb . SegmentStateInfo {
Status : & commonpb . Status { } ,
SegmentID : segmentID ,
}
2021-07-07 06:02:01 +00:00
segmentInfo := s . meta . GetSegment ( segmentID )
if segmentInfo == nil {
2021-05-25 07:35:37 +00:00
state . Status . ErrorCode = commonpb . ErrorCode_UnexpectedError
2021-07-28 03:43:22 +00:00
state . Status . Reason = fmt . Sprintf ( "failed to get segment %d" , segmentID )
2021-05-25 07:35:37 +00:00
} else {
state . Status . ErrorCode = commonpb . ErrorCode_Success
2021-06-15 03:06:42 +00:00
state . State = segmentInfo . GetState ( )
state . StartPosition = segmentInfo . GetStartPosition ( )
2021-05-25 07:35:37 +00:00
}
resp . States = append ( resp . States , state )
}
resp . Status . ErrorCode = commonpb . ErrorCode_Success
return resp , nil
}
2021-09-06 03:12:42 +00:00
// GetInsertBinlogPaths returns binlog paths info for requested segments
2021-05-25 07:35:37 +00:00
func ( s * Server ) GetInsertBinlogPaths ( ctx context . Context , req * datapb . GetInsertBinlogPathsRequest ) ( * datapb . GetInsertBinlogPathsResponse , error ) {
resp := & datapb . GetInsertBinlogPathsResponse {
Status : & commonpb . Status {
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
} ,
}
2021-06-09 07:02:48 +00:00
if s . isClosed ( ) {
2021-06-22 10:24:08 +00:00
resp . Status . Reason = serverNotServingErrMsg
2021-06-09 07:02:48 +00:00
return resp , nil
}
2021-08-19 05:00:12 +00:00
segment := s . meta . GetSegment ( req . GetSegmentID ( ) )
if segment == nil {
resp . Status . Reason = "segment not found"
2021-05-25 07:35:37 +00:00
return resp , nil
}
2021-08-19 05:00:12 +00:00
binlogs := segment . GetBinlogs ( )
fids := make ( [ ] UniqueID , 0 , len ( binlogs ) )
paths := make ( [ ] * internalpb . StringList , 0 , len ( binlogs ) )
for _ , field := range binlogs {
fids = append ( fids , field . GetFieldID ( ) )
2021-12-19 12:00:42 +00:00
binlogs := field . GetBinlogs ( )
p := make ( [ ] string , 0 , len ( binlogs ) )
for _ , log := range binlogs {
p = append ( p , log . GetLogPath ( ) )
}
paths = append ( paths , & internalpb . StringList { Values : p } )
2021-05-25 07:35:37 +00:00
}
resp . Status . ErrorCode = commonpb . ErrorCode_Success
resp . FieldIDs = fids
resp . Paths = paths
return resp , nil
}
2021-09-06 03:12:42 +00:00
// GetCollectionStatistics returns statistics for collection
// for now only row count is returned
2021-05-25 07:35:37 +00:00
func ( s * Server ) GetCollectionStatistics ( ctx context . Context , req * datapb . GetCollectionStatisticsRequest ) ( * datapb . GetCollectionStatisticsResponse , error ) {
2021-12-21 01:34:53 +00:00
ctx = logutil . WithModule ( ctx , moduleName )
logutil . Logger ( ctx ) . Debug ( "received request to get collection statistics" )
2021-05-25 07:35:37 +00:00
resp := & datapb . GetCollectionStatisticsResponse {
Status : & commonpb . Status {
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
} ,
}
2021-06-09 07:02:48 +00:00
if s . isClosed ( ) {
2021-06-22 10:24:08 +00:00
resp . Status . Reason = serverNotServingErrMsg
2021-06-09 07:02:48 +00:00
return resp , nil
}
2021-07-07 06:02:01 +00:00
nums := s . meta . GetNumRowsOfCollection ( req . CollectionID )
2021-05-25 07:35:37 +00:00
resp . Status . ErrorCode = commonpb . ErrorCode_Success
resp . Stats = append ( resp . Stats , & commonpb . KeyValuePair { Key : "row_count" , Value : strconv . FormatInt ( nums , 10 ) } )
2021-12-21 01:34:53 +00:00
logutil . Logger ( ctx ) . Debug ( "success to get collection statistics" , zap . Any ( "response" , resp ) )
2021-05-25 07:35:37 +00:00
return resp , nil
}
2021-12-21 11:05:19 +00:00
// GetPartitionStatistics returns statistics for parition
2021-09-06 03:12:42 +00:00
// for now only row count is returned
2021-05-25 07:35:37 +00:00
func ( s * Server ) GetPartitionStatistics ( ctx context . Context , req * datapb . GetPartitionStatisticsRequest ) ( * datapb . GetPartitionStatisticsResponse , error ) {
resp := & datapb . GetPartitionStatisticsResponse {
Status : & commonpb . Status {
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
} ,
}
2021-06-09 07:02:48 +00:00
if s . isClosed ( ) {
2021-06-22 10:24:08 +00:00
resp . Status . Reason = serverNotServingErrMsg
2021-06-09 07:02:48 +00:00
return resp , nil
}
2021-07-07 06:02:01 +00:00
nums := s . meta . GetNumRowsOfPartition ( req . CollectionID , req . PartitionID )
2021-05-25 07:35:37 +00:00
resp . Status . ErrorCode = commonpb . ErrorCode_Success
resp . Stats = append ( resp . Stats , & commonpb . KeyValuePair { Key : "row_count" , Value : strconv . FormatInt ( nums , 10 ) } )
return resp , nil
}
2021-09-06 03:12:42 +00:00
// GetSegmentInfoChannel legacy API, returns segment info statistics channel
2021-05-25 07:35:37 +00:00
func ( s * Server ) GetSegmentInfoChannel ( ctx context . Context ) ( * milvuspb . StringResponse , error ) {
return & milvuspb . StringResponse {
Status : & commonpb . Status {
ErrorCode : commonpb . ErrorCode_Success ,
} ,
2021-12-23 10:39:11 +00:00
Value : Params . DataCoordCfg . SegmentInfoChannelName ,
2021-05-25 07:35:37 +00:00
} , nil
}
2021-09-06 03:12:42 +00:00
// GetSegmentInfo returns segment info requested, status, row count, etc included
2021-05-25 07:35:37 +00:00
func ( s * Server ) GetSegmentInfo ( ctx context . Context , req * datapb . GetSegmentInfoRequest ) ( * datapb . GetSegmentInfoResponse , error ) {
resp := & datapb . GetSegmentInfoResponse {
Status : & commonpb . Status {
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
} ,
}
2021-05-28 01:55:21 +00:00
if s . isClosed ( ) {
2021-06-22 10:24:08 +00:00
resp . Status . Reason = serverNotServingErrMsg
2021-05-25 07:35:37 +00:00
return resp , nil
}
infos := make ( [ ] * datapb . SegmentInfo , 0 , len ( req . SegmentIDs ) )
for _ , id := range req . SegmentIDs {
2021-07-07 06:02:01 +00:00
info := s . meta . GetSegment ( id )
if info == nil {
2021-07-28 03:43:22 +00:00
resp . Status . Reason = fmt . Sprintf ( "failed to get segment %d" , id )
2021-05-25 07:35:37 +00:00
return resp , nil
}
2021-07-12 09:24:25 +00:00
infos = append ( infos , info . SegmentInfo )
2021-05-25 07:35:37 +00:00
}
resp . Status . ErrorCode = commonpb . ErrorCode_Success
resp . Infos = infos
return resp , nil
}
2021-12-21 11:05:19 +00:00
// SaveBinlogPaths updates segment related binlog path
2021-09-06 03:12:42 +00:00
// works for Checkpoints and Flush
2021-05-25 07:35:37 +00:00
func ( s * Server ) SaveBinlogPaths ( ctx context . Context , req * datapb . SaveBinlogPathsRequest ) ( * commonpb . Status , error ) {
2021-11-11 16:22:42 +00:00
resp := & commonpb . Status { ErrorCode : commonpb . ErrorCode_UnexpectedError }
2021-10-14 07:44:34 +00:00
2021-05-28 01:55:21 +00:00
if s . isClosed ( ) {
2021-06-22 10:24:08 +00:00
resp . Reason = serverNotServingErrMsg
2021-05-25 07:35:37 +00:00
return resp , nil
}
2021-10-14 07:44:34 +00:00
2021-07-28 03:43:22 +00:00
log . Debug ( "receive SaveBinlogPaths request" ,
2022-01-04 11:29:45 +00:00
zap . Int64 ( "nodeID" , req . GetBase ( ) . GetSourceID ( ) ) ,
2021-06-08 11:25:37 +00:00
zap . Int64 ( "collectionID" , req . GetCollectionID ( ) ) ,
2021-06-18 08:02:05 +00:00
zap . Int64 ( "segmentID" , req . GetSegmentID ( ) ) ,
2021-10-13 12:54:33 +00:00
zap . Bool ( "isFlush" , req . GetFlushed ( ) ) ,
2021-11-12 09:31:11 +00:00
zap . Bool ( "isDropped" , req . GetDropped ( ) ) ,
2021-11-17 09:57:11 +00:00
zap . Any ( "startPositions" , req . GetStartPositions ( ) ) ,
2021-06-18 08:02:05 +00:00
zap . Any ( "checkpoints" , req . GetCheckPoints ( ) ) )
2021-05-25 07:35:37 +00:00
2021-10-14 07:44:34 +00:00
// validate
nodeID := req . GetBase ( ) . GetSourceID ( )
segmentID := req . GetSegmentID ( )
segment := s . meta . GetSegment ( segmentID )
if segment == nil {
FailResponse ( resp , fmt . Sprintf ( "failed to get segment %d" , segmentID ) )
log . Error ( "failed to get segment" , zap . Int64 ( "segmentID" , segmentID ) )
return resp , nil
}
channel := segment . GetInsertChannel ( )
if ! s . channelManager . Match ( nodeID , channel ) {
FailResponse ( resp , fmt . Sprintf ( "channel %s is not watched on node %d" , channel , nodeID ) )
log . Warn ( "node is not matched with channel" , zap . String ( "channel" , channel ) , zap . Int64 ( "nodeID" , nodeID ) )
2021-11-11 16:22:42 +00:00
return resp , nil
2021-10-14 07:44:34 +00:00
}
2021-11-12 09:31:11 +00:00
if req . GetDropped ( ) {
s . segmentManager . DropSegment ( ctx , segment . GetID ( ) )
}
2021-06-04 03:45:45 +00:00
// set segment to SegmentState_Flushing and save binlogs and checkpoints
2021-11-11 16:22:42 +00:00
err := s . meta . UpdateFlushSegmentsInfo (
req . GetSegmentID ( ) ,
req . GetFlushed ( ) ,
req . GetDropped ( ) ,
req . GetField2BinlogPaths ( ) ,
req . GetField2StatslogPaths ( ) ,
req . GetDeltalogs ( ) ,
req . GetCheckPoints ( ) ,
req . GetStartPositions ( ) )
2021-05-25 07:35:37 +00:00
if err != nil {
2021-07-28 03:43:22 +00:00
log . Error ( "save binlog and checkpoints failed" ,
2021-06-08 11:25:37 +00:00
zap . Int64 ( "segmentID" , req . GetSegmentID ( ) ) ,
zap . Error ( err ) )
2021-05-25 07:35:37 +00:00
resp . Reason = err . Error ( )
2021-05-27 10:45:24 +00:00
return resp , nil
2021-05-25 07:35:37 +00:00
}
2021-10-14 07:44:34 +00:00
2021-07-28 03:43:22 +00:00
log . Debug ( "flush segment with meta" , zap . Int64 ( "id" , req . SegmentID ) ,
2021-08-19 05:00:12 +00:00
zap . Any ( "meta" , req . GetField2BinlogPaths ( ) ) )
2021-05-25 07:35:37 +00:00
2021-11-11 16:22:42 +00:00
if req . GetFlushed ( ) {
2021-06-04 03:45:45 +00:00
s . segmentManager . DropSegment ( ctx , req . SegmentID )
s . flushCh <- req . SegmentID
2021-11-05 14:25:00 +00:00
2021-12-23 10:39:11 +00:00
if Params . DataCoordCfg . EnableCompaction {
2021-11-05 14:25:00 +00:00
cctx , cancel := context . WithTimeout ( s . ctx , 5 * time . Second )
defer cancel ( )
2021-11-08 13:45:00 +00:00
tt , err := getTimetravelReverseTime ( cctx , s . allocator )
2021-11-05 14:25:00 +00:00
if err == nil {
2021-11-12 09:31:11 +00:00
err = s . compactionTrigger . triggerSingleCompaction ( segment . GetCollectionID ( ) ,
segment . GetPartitionID ( ) , segmentID , segment . GetInsertChannel ( ) , tt )
if err != nil {
2021-11-05 14:25:00 +00:00
log . Warn ( "failed to trigger single compaction" , zap . Int64 ( "segmentID" , segmentID ) )
}
}
}
2021-06-04 03:45:45 +00:00
}
2021-05-25 07:35:37 +00:00
resp . ErrorCode = commonpb . ErrorCode_Success
return resp , nil
}
2021-05-28 01:55:21 +00:00
2021-11-29 14:35:41 +00:00
// DropVirtualChannel handles the notification that a virtual channel has been
// dropped. The request also carries the remaining data logs and checkpoints of
// the channel's segments, which are written to the meta before the channel is
// removed.
//
// The request is rejected when the server is not serving or when the reporting
// node does not match the channel. A failure to remove the channel from the
// channel manager is only logged; the call still proceeds and succeeds.
func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) {
	status := &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}
	resp := &datapb.DropVirtualChannelResponse{Status: status}

	if s.isClosed() {
		status.Reason = serverNotServingErrMsg
		return resp, nil
	}

	channel := req.GetChannelName()
	log.Debug("receive DropVirtualChannel request",
		zap.String("channel name", channel))

	// validate
	nodeID := req.GetBase().GetSourceID()
	if !s.channelManager.Match(nodeID, channel) {
		FailResponse(resp.Status, fmt.Sprintf("channel %s is not watched on node %d", channel, nodeID))
		log.Warn("node is not matched with channel", zap.String("channel", channel), zap.Int64("nodeID", nodeID))
		return resp, nil
	}

	// Convert every reported segment into a SegmentInfo bound to this channel.
	reported := req.GetSegments()
	dropped := make([]*SegmentInfo, 0, len(reported))
	for _, seg := range reported {
		dropped = append(dropped, NewSegmentInfo(&datapb.SegmentInfo{
			ID:            seg.GetSegmentID(),
			CollectionID:  seg.GetCollectionID(),
			InsertChannel: channel,
			Binlogs:       seg.GetField2BinlogPaths(),
			Statslogs:     seg.GetField2StatslogPaths(),
			Deltalogs:     seg.GetDeltalogs(),
			StartPosition: seg.GetStartPosition(),
			DmlPosition:   seg.GetCheckPoint(),
			NumOfRows:     seg.GetNumOfRows(),
		}))
	}

	if err := s.meta.UpdateDropChannelSegmentInfo(channel, dropped); err != nil {
		log.Error("Update Drop Channel segment info failed", zap.String("channel", channel), zap.Error(err))
		status.Reason = err.Error()
		return resp, nil
	}

	log.Debug("DropVChannel plan to remove", zap.String("channel", channel))
	if err := s.channelManager.RemoveChannel(channel); err != nil {
		log.Warn("DropVChannel failed to RemoveChannel", zap.String("channel", channel), zap.Error(err))
	}
	s.segmentManager.DropSegmentsOfChannel(ctx, channel)

	// clean up removal flag
	s.meta.FinishRemoveChannel(channel)

	// no compaction is triggered in the drop procedure
	status.ErrorCode = commonpb.ErrorCode_Success
	return resp, nil
}
2021-09-24 13:33:55 +00:00
// GetComponentStates returns DataCoord's current state
2021-05-28 01:55:21 +00:00
func ( s * Server ) GetComponentStates ( ctx context . Context ) ( * internalpb . ComponentStates , error ) {
2021-11-19 05:57:12 +00:00
nodeID := common . NotRegisteredID
if s . session != nil && s . session . Registered ( ) {
nodeID = s . session . ServerID // or Params.NodeID
}
2021-06-02 07:11:17 +00:00
resp := & internalpb . ComponentStates {
State : & internalpb . ComponentInfo {
2021-11-19 05:57:12 +00:00
// NodeID: Params.NodeID, // will race with Server.Register()
NodeID : nodeID ,
2021-06-21 10:22:13 +00:00
Role : "datacoord" ,
2021-06-02 07:11:17 +00:00
StateCode : 0 ,
} ,
Status : & commonpb . Status {
ErrorCode : commonpb . ErrorCode_Success ,
Reason : "" ,
} ,
}
state := atomic . LoadInt64 ( & s . isServing )
switch state {
2021-06-29 02:46:13 +00:00
case ServerStateInitializing :
2021-06-02 07:11:17 +00:00
resp . State . StateCode = internalpb . StateCode_Initializing
2021-06-29 02:46:13 +00:00
case ServerStateHealthy :
2021-06-02 07:11:17 +00:00
resp . State . StateCode = internalpb . StateCode_Healthy
default :
resp . State . StateCode = internalpb . StateCode_Abnormal
}
return resp , nil
2021-05-28 01:55:21 +00:00
}
2021-09-06 03:12:42 +00:00
// GetRecoveryInfo get recovery info for segment
2021-06-07 01:47:36 +00:00
func ( s * Server ) GetRecoveryInfo ( ctx context . Context , req * datapb . GetRecoveryInfoRequest ) ( * datapb . GetRecoveryInfoResponse , error ) {
collectionID := req . GetCollectionID ( )
partitionID := req . GetPartitionID ( )
2021-07-28 03:43:22 +00:00
log . Info ( "receive get recovery info request" ,
2021-06-08 11:25:37 +00:00
zap . Int64 ( "collectionID" , collectionID ) ,
2021-06-07 01:47:36 +00:00
zap . Int64 ( "partitionID" , partitionID ) )
resp := & datapb . GetRecoveryInfoResponse {
Status : & commonpb . Status {
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
} ,
}
2021-06-09 07:02:48 +00:00
if s . isClosed ( ) {
2021-06-22 10:24:08 +00:00
resp . Status . Reason = serverNotServingErrMsg
2021-06-09 07:02:48 +00:00
return resp , nil
}
2021-11-05 14:25:00 +00:00
segmentIDs := s . meta . GetSegmentsIDOfPartition ( collectionID , partitionID )
2021-06-07 01:47:36 +00:00
segment2Binlogs := make ( map [ UniqueID ] [ ] * datapb . FieldBinlog )
2021-10-22 06:31:13 +00:00
segment2StatsBinlogs := make ( map [ UniqueID ] [ ] * datapb . FieldBinlog )
2021-12-19 12:00:42 +00:00
segment2DeltaBinlogs := make ( map [ UniqueID ] [ ] * datapb . FieldBinlog )
2021-09-07 03:35:18 +00:00
segmentsNumOfRows := make ( map [ UniqueID ] int64 )
2021-10-22 06:31:13 +00:00
flushedIDs := make ( map [ int64 ] struct { } )
2021-06-07 01:47:36 +00:00
for _ , id := range segmentIDs {
2021-07-07 06:02:01 +00:00
segment := s . meta . GetSegment ( id )
if segment == nil {
2021-07-28 03:43:22 +00:00
errMsg := fmt . Sprintf ( "failed to get segment %d" , id )
2021-07-07 06:02:01 +00:00
log . Error ( errMsg )
resp . Status . Reason = errMsg
2021-06-15 11:23:55 +00:00
return resp , nil
}
if segment . State != commonpb . SegmentState_Flushed && segment . State != commonpb . SegmentState_Flushing {
continue
}
2021-12-09 03:03:08 +00:00
binlogs := segment . GetBinlogs ( )
if len ( binlogs ) == 0 {
continue
}
2021-10-22 06:31:13 +00:00
_ , ok := flushedIDs [ id ]
if ! ok {
flushedIDs [ id ] = struct { } { }
}
2021-06-15 11:23:55 +00:00
2021-12-19 12:00:42 +00:00
field2Binlog := make ( map [ UniqueID ] [ ] * datapb . Binlog )
2021-08-19 05:00:12 +00:00
for _ , field := range binlogs {
field2Binlog [ field . GetFieldID ( ) ] = append ( field2Binlog [ field . GetFieldID ( ) ] , field . GetBinlogs ( ) ... )
2021-06-07 01:47:36 +00:00
}
for f , paths := range field2Binlog {
fieldBinlogs := & datapb . FieldBinlog {
FieldID : f ,
Binlogs : paths ,
}
segment2Binlogs [ id ] = append ( segment2Binlogs [ id ] , fieldBinlogs )
}
2021-09-07 03:35:18 +00:00
segmentsNumOfRows [ id ] = segment . NumOfRows
2021-10-22 06:31:13 +00:00
statsBinlogs := segment . GetStatslogs ( )
2021-12-19 12:00:42 +00:00
field2StatsBinlog := make ( map [ UniqueID ] [ ] * datapb . Binlog )
2021-10-22 06:31:13 +00:00
for _ , field := range statsBinlogs {
field2StatsBinlog [ field . GetFieldID ( ) ] = append ( field2StatsBinlog [ field . GetFieldID ( ) ] , field . GetBinlogs ( ) ... )
}
for f , paths := range field2StatsBinlog {
fieldBinlogs := & datapb . FieldBinlog {
FieldID : f ,
Binlogs : paths ,
}
segment2StatsBinlogs [ id ] = append ( segment2StatsBinlogs [ id ] , fieldBinlogs )
}
2021-12-19 12:00:42 +00:00
if len ( segment . GetDeltalogs ( ) ) > 0 {
segment2DeltaBinlogs [ id ] = append ( segment2DeltaBinlogs [ id ] , segment . GetDeltalogs ( ) ... )
}
2021-06-07 01:47:36 +00:00
}
binlogs := make ( [ ] * datapb . SegmentBinlogs , 0 , len ( segment2Binlogs ) )
2021-10-22 06:31:13 +00:00
for segmentID := range flushedIDs {
2021-06-07 01:47:36 +00:00
sbl := & datapb . SegmentBinlogs {
SegmentID : segmentID ,
2021-09-07 03:35:18 +00:00
NumOfRows : segmentsNumOfRows [ segmentID ] ,
2021-10-22 06:31:13 +00:00
FieldBinlogs : segment2Binlogs [ segmentID ] ,
Statslogs : segment2StatsBinlogs [ segmentID ] ,
Deltalogs : segment2DeltaBinlogs [ segmentID ] ,
2021-06-07 01:47:36 +00:00
}
binlogs = append ( binlogs , sbl )
}
2021-06-21 09:28:03 +00:00
dresp , err := s . rootCoordClient . DescribeCollection ( s . ctx , & milvuspb . DescribeCollectionRequest {
2021-06-07 01:47:36 +00:00
Base : & commonpb . MsgBase {
MsgType : commonpb . MsgType_DescribeCollection ,
2021-12-23 10:39:11 +00:00
SourceID : Params . DataCoordCfg . NodeID ,
2021-06-07 01:47:36 +00:00
} ,
CollectionID : collectionID ,
} )
if err = VerifyResponse ( dresp , err ) ; err != nil {
2021-07-28 03:43:22 +00:00
log . Error ( "get collection info from master failed" ,
2021-06-07 01:47:36 +00:00
zap . Int64 ( "collectionID" , collectionID ) ,
zap . Error ( err ) )
resp . Status . Reason = err . Error ( )
return resp , nil
}
channels := dresp . GetVirtualChannelNames ( )
2021-10-14 07:44:34 +00:00
channelInfos := make ( [ ] * datapb . VchannelInfo , 0 , len ( channels ) )
2021-06-07 01:47:36 +00:00
for _ , c := range channels {
2021-11-17 15:25:12 +00:00
channelInfo := s . handler . GetVChanPositions ( c , collectionID , partitionID )
2021-10-14 07:44:34 +00:00
channelInfos = append ( channelInfos , channelInfo )
2021-11-12 10:27:10 +00:00
log . Debug ( "datacoord append channelInfo in GetRecoveryInfo" ,
zap . Any ( "collectionID" , collectionID ) ,
zap . Any ( "channelInfo" , channelInfo ) ,
)
2021-06-07 01:47:36 +00:00
}
resp . Binlogs = binlogs
resp . Channels = channelInfos
resp . Status . ErrorCode = commonpb . ErrorCode_Success
return resp , nil
}
2021-07-02 03:16:20 +00:00
2021-09-06 03:12:42 +00:00
// GetFlushedSegments returns all segment matches provided criterion and in State Flushed
// If requested partition id < 0, ignores the partition id filter
2021-07-02 03:16:20 +00:00
func ( s * Server ) GetFlushedSegments ( ctx context . Context , req * datapb . GetFlushedSegmentsRequest ) ( * datapb . GetFlushedSegmentsResponse , error ) {
2021-09-06 03:12:42 +00:00
resp := & datapb . GetFlushedSegmentsResponse {
Status : & commonpb . Status {
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
} ,
}
2021-07-02 03:16:20 +00:00
collectionID := req . GetCollectionID ( )
partitionID := req . GetPartitionID ( )
2021-12-15 08:03:12 +00:00
log . Debug ( "received get flushed segments request" ,
2021-09-06 03:12:42 +00:00
zap . Int64 ( "collectionID" , collectionID ) ,
zap . Int64 ( "partitionID" , partitionID ) )
if s . isClosed ( ) {
resp . Status . Reason = serverNotServingErrMsg
return resp , nil
}
2021-07-02 03:16:20 +00:00
var segmentIDs [ ] UniqueID
if partitionID < 0 {
2021-11-05 14:25:00 +00:00
segmentIDs = s . meta . GetSegmentsIDOfCollection ( collectionID )
2021-07-02 03:16:20 +00:00
} else {
2021-11-05 14:25:00 +00:00
segmentIDs = s . meta . GetSegmentsIDOfPartition ( collectionID , partitionID )
2021-07-02 03:16:20 +00:00
}
ret := make ( [ ] UniqueID , 0 , len ( segmentIDs ) )
for _ , id := range segmentIDs {
2021-07-07 06:02:01 +00:00
s := s . meta . GetSegment ( id )
2021-11-05 14:25:00 +00:00
if s != nil && s . GetState ( ) != commonpb . SegmentState_Flushed {
2021-07-02 03:16:20 +00:00
continue
}
2021-11-05 14:25:00 +00:00
// if this segment == nil, we assume this segment has been compacted and flushed
2021-07-02 03:16:20 +00:00
ret = append ( ret , id )
}
2021-09-06 03:12:42 +00:00
resp . Segments = ret
resp . Status . ErrorCode = commonpb . ErrorCode_Success
return resp , nil
2021-07-02 03:16:20 +00:00
}
2021-09-01 02:13:15 +00:00
2021-09-06 03:12:42 +00:00
// GetMetrics returns DataCoord metrics info
// it may include SystemMetrics, Topology metrics, etc.
2021-09-01 02:13:15 +00:00
func ( s * Server ) GetMetrics ( ctx context . Context , req * milvuspb . GetMetricsRequest ) ( * milvuspb . GetMetricsResponse , error ) {
2021-12-15 05:15:10 +00:00
log . Debug ( "received get metrics request" ,
2021-12-23 10:39:11 +00:00
zap . Int64 ( "nodeID" , Params . DataCoordCfg . NodeID ) ,
2021-12-15 05:15:10 +00:00
zap . String ( "request" , req . Request ) )
2021-09-01 02:13:15 +00:00
if s . isClosed ( ) {
log . Warn ( "DataCoord.GetMetrics failed" ,
2021-12-23 10:39:11 +00:00
zap . Int64 ( "node_id" , Params . DataCoordCfg . NodeID ) ,
2021-09-01 02:13:15 +00:00
zap . String ( "req" , req . Request ) ,
2021-12-23 10:39:11 +00:00
zap . Error ( errDataCoordIsUnhealthy ( Params . DataCoordCfg . NodeID ) ) )
2021-09-01 02:13:15 +00:00
return & milvuspb . GetMetricsResponse {
Status : & commonpb . Status {
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
2021-12-23 10:39:11 +00:00
Reason : msgDataCoordIsUnhealthy ( Params . DataCoordCfg . NodeID ) ,
2021-09-01 02:13:15 +00:00
} ,
Response : "" ,
} , nil
}
metricType , err := metricsinfo . ParseMetricType ( req . Request )
if err != nil {
log . Warn ( "DataCoord.GetMetrics failed to parse metric type" ,
2021-12-23 10:39:11 +00:00
zap . Int64 ( "node_id" , Params . DataCoordCfg . NodeID ) ,
2021-09-01 02:13:15 +00:00
zap . String ( "req" , req . Request ) ,
zap . Error ( err ) )
return & milvuspb . GetMetricsResponse {
Status : & commonpb . Status {
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
Reason : err . Error ( ) ,
} ,
Response : "" ,
} , nil
}
log . Debug ( "DataCoord.GetMetrics" ,
zap . String ( "metric_type" , metricType ) )
if metricType == metricsinfo . SystemInfoMetrics {
2021-09-03 09:15:26 +00:00
ret , err := s . metricsCacheManager . GetSystemInfoMetrics ( )
if err == nil && ret != nil {
return ret , nil
}
log . Debug ( "failed to get system info metrics from cache, recompute instead" ,
zap . Error ( err ) )
2021-09-01 02:13:15 +00:00
metrics , err := s . getSystemInfoMetrics ( ctx , req )
log . Debug ( "DataCoord.GetMetrics" ,
2021-12-23 10:39:11 +00:00
zap . Int64 ( "node_id" , Params . DataCoordCfg . NodeID ) ,
2021-09-01 02:13:15 +00:00
zap . String ( "req" , req . Request ) ,
zap . String ( "metric_type" , metricType ) ,
zap . Any ( "metrics" , metrics ) , // TODO(dragondriver): necessary? may be very large
zap . Error ( err ) )
2021-09-03 09:15:26 +00:00
s . metricsCacheManager . UpdateSystemInfoMetrics ( metrics )
2021-12-01 14:17:46 +00:00
return metrics , nil
2021-09-01 02:13:15 +00:00
}
log . Debug ( "DataCoord.GetMetrics failed, request metric type is not implemented yet" ,
2021-12-23 10:39:11 +00:00
zap . Int64 ( "node_id" , Params . DataCoordCfg . NodeID ) ,
2021-09-01 02:13:15 +00:00
zap . String ( "req" , req . Request ) ,
zap . String ( "metric_type" , metricType ) )
return & milvuspb . GetMetricsResponse {
Status : & commonpb . Status {
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
Reason : metricsinfo . MsgUnimplementedMetric ,
} ,
Response : "" ,
} , nil
}
2021-11-05 14:25:00 +00:00
2021-11-10 15:57:38 +00:00
// CompleteCompaction completes a compaction with the result
2021-11-05 14:25:00 +00:00
func ( s * Server ) CompleteCompaction ( ctx context . Context , req * datapb . CompactionResult ) ( * commonpb . Status , error ) {
log . Debug ( "receive complete compaction request" , zap . Int64 ( "planID" , req . PlanID ) , zap . Int64 ( "segmentID" , req . GetSegmentID ( ) ) )
resp := & commonpb . Status {
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
}
if s . isClosed ( ) {
log . Warn ( "failed to complete compaction" , zap . Int64 ( "planID" , req . PlanID ) ,
2021-12-23 10:39:11 +00:00
zap . Error ( errDataCoordIsUnhealthy ( Params . DataCoordCfg . NodeID ) ) )
2021-11-05 14:25:00 +00:00
2021-12-23 10:39:11 +00:00
resp . Reason = msgDataCoordIsUnhealthy ( Params . DataCoordCfg . NodeID )
2021-11-05 14:25:00 +00:00
return resp , nil
}
2021-12-23 10:39:11 +00:00
if ! Params . DataCoordCfg . EnableCompaction {
2021-11-05 14:25:00 +00:00
resp . Reason = "compaction disabled"
return resp , nil
}
if err := s . compactionHandler . completeCompaction ( req ) ; err != nil {
log . Error ( "failed to complete compaction" , zap . Int64 ( "planID" , req . PlanID ) , zap . Error ( err ) )
resp . Reason = err . Error ( )
return resp , nil
}
log . Debug ( "success to complete compaction" , zap . Int64 ( "planID" , req . PlanID ) )
resp . ErrorCode = commonpb . ErrorCode_Success
return resp , nil
}
2021-11-10 15:58:57 +00:00
// ManualCompaction triggers a compaction for a collection
2021-11-09 06:47:02 +00:00
func ( s * Server ) ManualCompaction ( ctx context . Context , req * milvuspb . ManualCompactionRequest ) ( * milvuspb . ManualCompactionResponse , error ) {
2021-12-15 08:01:11 +00:00
log . Debug ( "received manual compaction" , zap . Int64 ( "collectionID" , req . GetCollectionID ( ) ) )
2021-11-05 14:25:00 +00:00
2021-11-09 06:47:02 +00:00
resp := & milvuspb . ManualCompactionResponse {
2021-11-05 14:25:00 +00:00
Status : & commonpb . Status {
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
} ,
}
if s . isClosed ( ) {
log . Warn ( "failed to execute manual compaction" , zap . Int64 ( "collectionID" , req . GetCollectionID ( ) ) ,
2021-12-23 10:39:11 +00:00
zap . Error ( errDataCoordIsUnhealthy ( Params . DataCoordCfg . NodeID ) ) )
resp . Status . Reason = msgDataCoordIsUnhealthy ( Params . DataCoordCfg . NodeID )
2021-11-05 14:25:00 +00:00
return resp , nil
}
2021-12-23 10:39:11 +00:00
if ! Params . DataCoordCfg . EnableCompaction {
2021-11-05 14:25:00 +00:00
resp . Status . Reason = "compaction disabled"
return resp , nil
}
2021-12-09 03:03:08 +00:00
tt , err := getTimetravelReverseTime ( ctx , s . allocator )
if err != nil {
log . Warn ( "failed to get timetravel reverse time" , zap . Int64 ( "collectionID" , req . GetCollectionID ( ) ) , zap . Error ( err ) )
resp . Status . Reason = err . Error ( )
return resp , nil
}
id , err := s . compactionTrigger . forceTriggerCompaction ( req . CollectionID , tt )
2021-11-05 14:25:00 +00:00
if err != nil {
log . Error ( "failed to trigger manual compaction" , zap . Int64 ( "collectionID" , req . GetCollectionID ( ) ) , zap . Error ( err ) )
resp . Status . Reason = err . Error ( )
return resp , nil
}
log . Debug ( "success to trigger manual compaction" , zap . Int64 ( "collectionID" , req . GetCollectionID ( ) ) , zap . Int64 ( "compactionID" , id ) )
resp . Status . ErrorCode = commonpb . ErrorCode_Success
resp . CompactionID = id
return resp , nil
}
2021-11-10 16:00:45 +00:00
// GetCompactionState gets the state of a compaction
2021-11-09 06:47:02 +00:00
func ( s * Server ) GetCompactionState ( ctx context . Context , req * milvuspb . GetCompactionStateRequest ) ( * milvuspb . GetCompactionStateResponse , error ) {
2021-12-15 08:07:10 +00:00
log . Debug ( "received get compaction state request" , zap . Int64 ( "compactionID" , req . GetCompactionID ( ) ) )
2021-11-09 06:47:02 +00:00
resp := & milvuspb . GetCompactionStateResponse {
2021-11-05 14:25:00 +00:00
Status : & commonpb . Status {
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
} ,
}
if s . isClosed ( ) {
log . Warn ( "failed to get compaction state" , zap . Int64 ( "compactionID" , req . GetCompactionID ( ) ) ,
2021-12-23 10:39:11 +00:00
zap . Error ( errDataCoordIsUnhealthy ( Params . DataCoordCfg . NodeID ) ) )
resp . Status . Reason = msgDataCoordIsUnhealthy ( Params . DataCoordCfg . NodeID )
2021-11-05 14:25:00 +00:00
return resp , nil
}
2021-12-23 10:39:11 +00:00
if ! Params . DataCoordCfg . EnableCompaction {
2021-11-05 14:25:00 +00:00
resp . Status . Reason = "compaction disabled"
return resp , nil
}
2021-11-09 06:47:02 +00:00
tasks := s . compactionHandler . getCompactionTasksBySignalID ( req . GetCompactionID ( ) )
state , executingCnt , completedCnt , timeoutCnt := getCompactionState ( tasks )
resp . State = state
resp . ExecutingPlanNo = int64 ( executingCnt )
resp . CompletedPlanNo = int64 ( completedCnt )
resp . TimeoutPlanNo = int64 ( timeoutCnt )
resp . Status . ErrorCode = commonpb . ErrorCode_Success
2021-11-11 07:54:42 +00:00
log . Debug ( "success to get compaction state" , zap . Any ( "state" , state ) , zap . Int ( "executing" , executingCnt ) ,
zap . Int ( "completed" , completedCnt ) , zap . Int ( "timeout" , timeoutCnt ) )
2021-11-09 06:47:02 +00:00
return resp , nil
}
2021-11-19 10:05:25 +00:00
// GetCompactionStateWithPlans returns the compaction state of given plan
2021-11-09 06:47:02 +00:00
func ( s * Server ) GetCompactionStateWithPlans ( ctx context . Context , req * milvuspb . GetCompactionPlansRequest ) ( * milvuspb . GetCompactionPlansResponse , error ) {
2021-12-15 02:43:15 +00:00
log . Debug ( "received the request to get compaction state with plans" , zap . Int64 ( "compactionID" , req . GetCompactionID ( ) ) )
2021-11-09 06:47:02 +00:00
resp := & milvuspb . GetCompactionPlansResponse {
Status : & commonpb . Status { ErrorCode : commonpb . ErrorCode_UnexpectedError } ,
}
if s . isClosed ( ) {
2021-12-23 10:39:11 +00:00
log . Warn ( "failed to get compaction state with plans" , zap . Int64 ( "compactionID" , req . GetCompactionID ( ) ) , zap . Error ( errDataCoordIsUnhealthy ( Params . DataCoordCfg . NodeID ) ) )
resp . Status . Reason = msgDataCoordIsUnhealthy ( Params . DataCoordCfg . NodeID )
2021-11-09 06:47:02 +00:00
return resp , nil
}
2021-12-23 10:39:11 +00:00
if ! Params . DataCoordCfg . EnableCompaction {
2021-11-09 06:47:02 +00:00
resp . Status . Reason = "compaction disabled"
return resp , nil
2021-11-05 14:25:00 +00:00
}
2021-11-09 06:47:02 +00:00
tasks := s . compactionHandler . getCompactionTasksBySignalID ( req . GetCompactionID ( ) )
for _ , task := range tasks {
resp . MergeInfos = append ( resp . MergeInfos , getCompactionMergeInfo ( task ) )
}
state , _ , _ , _ := getCompactionState ( tasks )
2021-11-05 14:25:00 +00:00
resp . Status . ErrorCode = commonpb . ErrorCode_Success
2021-11-09 06:47:02 +00:00
resp . State = state
2021-11-11 07:54:42 +00:00
log . Debug ( "success to get state with plans" , zap . Any ( "state" , state ) , zap . Any ( "merge infos" , resp . MergeInfos ) )
2021-11-05 14:25:00 +00:00
return resp , nil
}
2021-11-09 06:47:02 +00:00
// getCompactionMergeInfo converts a compaction task into a CompactionMergeInfo.
// Sources holds the segment ID of every entry in the task plan's segment
// binlogs (nil when the plan has none). Target is the segment ID of the
// task's result, or -1 while the task has no result.
func getCompactionMergeInfo(task *compactionTask) *milvuspb.CompactionMergeInfo {
	// -1 is the placeholder target used until a result is attached.
	info := &milvuspb.CompactionMergeInfo{Target: -1}

	for _, binlogs := range task.plan.GetSegmentBinlogs() {
		info.Sources = append(info.Sources, binlogs.GetSegmentID())
	}
	if res := task.result; res != nil {
		info.Target = res.GetSegmentID()
	}
	return info
}
// getCompactionState aggregates a set of compaction tasks.
// It returns how many tasks are in the executing, completed and timeout
// states, plus an overall state: CompactionState_Executing if at least one
// task is executing, CompactionState_Completed otherwise (which includes an
// empty or nil task list). Tasks in any other state are not counted.
func getCompactionState(tasks []*compactionTask) (state commonpb.CompactionState, executingCnt, completedCnt, timeoutCnt int) {
	for i := range tasks {
		switch tasks[i].state {
		case executing:
			executingCnt++
		case completed:
			completedCnt++
		case timeout:
			timeoutCnt++
		}
	}

	// Default to Completed and override only when something is still running.
	state = commonpb.CompactionState_Completed
	if executingCnt > 0 {
		state = commonpb.CompactionState_Executing
	}
	return state, executingCnt, completedCnt, timeoutCnt
}
2021-11-10 16:54:45 +00:00
2021-11-29 15:09:41 +00:00
// WatchChannels notifies DataCoord to watch vchannels of a collection
2021-11-10 16:54:45 +00:00
func ( s * Server ) WatchChannels ( ctx context . Context , req * datapb . WatchChannelsRequest ) ( * datapb . WatchChannelsResponse , error ) {
log . Debug ( "receive watch channels request" , zap . Any ( "channels" , req . GetChannelNames ( ) ) )
resp := & datapb . WatchChannelsResponse {
Status : & commonpb . Status {
ErrorCode : commonpb . ErrorCode_UnexpectedError ,
} ,
}
if s . isClosed ( ) {
2021-11-23 02:55:14 +00:00
log . Warn ( "failed to watch channels request" , zap . Any ( "channels" , req . GetChannelNames ( ) ) ,
2021-12-23 10:39:11 +00:00
zap . Error ( errDataCoordIsUnhealthy ( Params . DataCoordCfg . NodeID ) ) )
resp . Status . Reason = msgDataCoordIsUnhealthy ( Params . DataCoordCfg . NodeID )
2021-11-10 16:54:45 +00:00
return resp , nil
}
for _ , channelName := range req . GetChannelNames ( ) {
ch := & channel {
Name : channelName ,
CollectionID : req . GetCollectionID ( ) ,
}
err := s . channelManager . Watch ( ch )
if err != nil {
log . Warn ( "fail to watch channelName" , zap . String ( "channelName" , channelName ) , zap . Error ( err ) )
resp . Status . Reason = err . Error ( )
return resp , nil
}
}
resp . Status . ErrorCode = commonpb . ErrorCode_Success
return resp , nil
}
2021-11-23 02:55:14 +00:00
// GetFlushState gets the flush state of multiple segments
func ( s * Server ) GetFlushState ( ctx context . Context , req * milvuspb . GetFlushStateRequest ) ( * milvuspb . GetFlushStateResponse , error ) {
2021-12-20 02:42:43 +00:00
log . Debug ( "received get flush state request" , zap . Int64s ( "segmentIDs" , req . GetSegmentIDs ( ) ) , zap . Int ( "len" , len ( req . GetSegmentIDs ( ) ) ) )
2021-11-23 02:55:14 +00:00
resp := & milvuspb . GetFlushStateResponse { Status : & commonpb . Status { ErrorCode : commonpb . ErrorCode_UnexpectedError } }
if s . isClosed ( ) {
log . Warn ( "failed to get flush state because of closed server" ,
2021-12-17 11:48:42 +00:00
zap . Int64s ( "segmentIDs" , req . GetSegmentIDs ( ) ) , zap . Int ( "len" , len ( req . GetSegmentIDs ( ) ) ) )
2021-12-23 10:39:11 +00:00
resp . Status . Reason = msgDataCoordIsUnhealthy ( Params . DataCoordCfg . NodeID )
2021-11-23 02:55:14 +00:00
return resp , nil
}
var unflushed [ ] UniqueID
for _ , sid := range req . GetSegmentIDs ( ) {
segment := s . meta . GetSegment ( sid )
// segment is nil if it was compacted
if segment == nil || segment . GetState ( ) == commonpb . SegmentState_Flushed ||
segment . GetState ( ) == commonpb . SegmentState_Flushed {
continue
}
unflushed = append ( unflushed , sid )
}
if len ( unflushed ) != 0 {
2021-12-17 11:18:52 +00:00
log . Debug ( "[flush state] unflushed segment ids" , zap . Int64s ( "segmentIDs" , unflushed ) , zap . Int ( "len" , len ( unflushed ) ) )
2021-11-23 02:55:14 +00:00
resp . Flushed = false
} else {
2021-12-15 02:41:27 +00:00
log . Debug ( "[flush state] all segment is flushed" , zap . Int64s ( "segment ids" , req . GetSegmentIDs ( ) ) )
2021-11-23 02:55:14 +00:00
resp . Flushed = true
}
resp . Status . ErrorCode = commonpb . ErrorCode_Success
return resp , nil
}