2021-05-28 01:55:21 +00:00
// Copyright (C) 2019-2020 Zilliz. All rights reserved.//// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
2021-04-19 03:35:38 +00:00
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
2021-05-20 03:34:45 +00:00
2021-06-22 02:42:07 +00:00
package datacoord
2021-01-19 04:10:49 +00:00
2021-01-22 03:07:07 +00:00
import (
"context"
"fmt"
2021-03-08 07:25:55 +00:00
"math/rand"
2021-01-23 12:22:59 +00:00
"sync"
2021-01-26 07:14:49 +00:00
"sync/atomic"
"time"
2021-05-28 01:55:21 +00:00
datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
2021-06-18 13:30:08 +00:00
rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
2021-05-28 01:55:21 +00:00
"github.com/milvus-io/milvus/internal/logutil"
2021-05-27 10:45:24 +00:00
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
2021-03-04 08:01:30 +00:00
2021-04-22 06:45:57 +00:00
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/retry"
2021-05-21 11:28:52 +00:00
"github.com/milvus-io/milvus/internal/util/sessionutil"
2021-04-22 06:45:57 +00:00
"github.com/milvus-io/milvus/internal/util/typeutil"
2021-03-05 12:41:34 +00:00
2021-04-22 06:45:57 +00:00
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
2021-01-22 03:07:07 +00:00
)
2021-06-22 10:24:08 +00:00
const (
rootCoordClientTimout = 20 * time . Second
connEtcdMaxRetryTime = 100000
connEtcdRetryInterval = 200 * time . Millisecond
)
2021-05-28 01:55:21 +00:00
2021-01-22 03:07:07 +00:00
type (
2021-01-22 11:43:27 +00:00
UniqueID = typeutil . UniqueID
Timestamp = typeutil . Timestamp
2021-01-22 03:07:07 +00:00
)
2021-06-22 10:24:08 +00:00
2021-06-23 01:24:10 +00:00
type dataNodeCreatorFunc func ( ctx context . Context , addr string , retryOptions ... retry . Option ) ( types . DataNode , error )
type rootCoordCreatorFunc func ( ctx context . Context , metaRootPath string , etcdEndpoints [ ] string , retryOptions ... retry . Option ) ( types . RootCoord , error )
2021-06-22 10:24:08 +00:00
2021-03-05 12:41:34 +00:00
type Server struct {
2021-04-13 01:47:02 +00:00
ctx context . Context
serverLoopCtx context . Context
serverLoopCancel context . CancelFunc
serverLoopWg sync . WaitGroup
2021-05-28 01:55:21 +00:00
isServing int64
2021-05-26 11:06:56 +00:00
kvClient * etcdkv . EtcdKV
meta * meta
segmentInfoStream msgstream . MsgStream
2021-06-03 11:06:33 +00:00
segmentManager Manager
2021-05-28 01:55:21 +00:00
allocator allocator
2021-05-26 11:06:56 +00:00
cluster * cluster
2021-06-21 09:28:03 +00:00
rootCoordClient types . RootCoord
2021-05-28 01:55:21 +00:00
ddChannelName string
2021-05-26 11:06:56 +00:00
2021-05-28 01:55:21 +00:00
flushCh chan UniqueID
2021-05-26 11:06:56 +00:00
flushMsgStream msgstream . MsgStream
msFactory msgstream . Factory
session * sessionutil . Session
activeCh <- chan bool
2021-05-28 01:55:21 +00:00
eventCh <- chan * sessionutil . SessionEvent
2021-05-26 11:06:56 +00:00
2021-06-22 10:24:08 +00:00
dataClientCreator dataNodeCreatorFunc
rootCoordClientCreator rootCoordCreatorFunc
2021-03-05 12:41:34 +00:00
}
2021-01-22 03:07:07 +00:00
2021-02-08 06:30:54 +00:00
func CreateServer ( ctx context . Context , factory msgstream . Factory ) ( * Server , error ) {
2021-03-08 07:25:55 +00:00
rand . Seed ( time . Now ( ) . UnixNano ( ) )
2021-01-26 07:14:49 +00:00
s := & Server {
2021-06-22 10:24:08 +00:00
ctx : ctx ,
msFactory : factory ,
flushCh : make ( chan UniqueID , 1024 ) ,
dataClientCreator : defaultDataNodeCreatorFunc ,
rootCoordClientCreator : defaultRootCoordCreatorFunc ,
2021-01-26 07:14:49 +00:00
}
return s , nil
}
2021-06-23 01:24:10 +00:00
func defaultDataNodeCreatorFunc ( ctx context . Context , addr string , retryOptions ... retry . Option ) ( types . DataNode , error ) {
2021-06-23 03:48:06 +00:00
return datanodeclient . NewClient ( ctx , addr , retryOptions ... )
2021-06-22 10:24:08 +00:00
}
2021-06-23 01:24:10 +00:00
func defaultRootCoordCreatorFunc ( ctx context . Context , metaRootPath string , etcdEndpoints [ ] string , retryOptions ... retry . Option ) ( types . RootCoord , error ) {
return rootcoordclient . NewClient ( ctx , metaRootPath , etcdEndpoints , retryOptions ... )
2021-06-22 10:24:08 +00:00
}
2021-05-25 07:06:05 +00:00
// Register register data service at etcd
func ( s * Server ) Register ( ) error {
2021-06-11 14:04:41 +00:00
s . session = sessionutil . NewSession ( s . ctx , Params . MetaRootPath , Params . EtcdEndpoints )
2021-06-21 03:40:15 +00:00
s . activeCh = s . session . Init ( typeutil . DataCoordRole , Params . IP , true )
2021-05-25 07:06:05 +00:00
Params . NodeID = s . session . ServerID
return nil
}
func ( s * Server ) Init ( ) error {
2021-06-02 07:11:17 +00:00
atomic . StoreInt64 ( & s . isServing , 1 )
2021-01-22 11:43:27 +00:00
return nil
}
func ( s * Server ) Start ( ) error {
2021-01-26 01:43:41 +00:00
var err error
2021-05-28 01:55:21 +00:00
m := map [ string ] interface { } {
"PulsarAddress" : Params . PulsarAddress ,
"ReceiveBufSize" : 1024 ,
"PulsarBufSize" : 1024 }
err = s . msFactory . SetParams ( m )
if err != nil {
return err
}
2021-06-21 09:28:03 +00:00
if err = s . initRootCoordClient ( ) ; err != nil {
2021-05-28 01:55:21 +00:00
return err
}
2021-05-26 11:06:56 +00:00
2021-05-28 01:55:21 +00:00
if err = s . initMeta ( ) ; err != nil {
return err
}
2021-05-26 11:06:56 +00:00
2021-05-28 01:55:21 +00:00
if err = s . initCluster ( ) ; err != nil {
return err
}
2021-05-26 11:06:56 +00:00
2021-05-28 01:55:21 +00:00
if err = s . initSegmentInfoChannel ( ) ; err != nil {
return err
}
2021-05-26 11:06:56 +00:00
2021-06-22 10:24:08 +00:00
s . allocator = newRootCoordAllocator ( s . ctx , s . rootCoordClient )
2021-05-26 11:06:56 +00:00
2021-06-03 11:06:33 +00:00
s . startSegmentManager ( )
2021-05-28 01:55:21 +00:00
if err = s . initFlushMsgStream ( ) ; err != nil {
return err
}
2021-05-26 11:06:56 +00:00
2021-05-28 01:55:21 +00:00
if err = s . initServiceDiscovery ( ) ; err != nil {
return err
}
2021-05-26 11:06:56 +00:00
2021-05-28 01:55:21 +00:00
s . startServerLoop ( )
2021-05-26 11:06:56 +00:00
2021-06-02 07:11:17 +00:00
atomic . StoreInt64 ( & s . isServing , 2 )
2021-06-22 10:24:08 +00:00
log . Debug ( "DataCoordinator startup success" )
2021-05-28 01:55:21 +00:00
return nil
2021-05-26 11:06:56 +00:00
}
func ( s * Server ) initCluster ( ) error {
dManager , err := newClusterNodeManager ( s . kvClient )
2021-02-08 06:30:54 +00:00
if err != nil {
return err
}
2021-06-22 10:24:08 +00:00
sManager := newClusterSessionManager ( s . ctx , s . dataClientCreator )
2021-05-27 06:14:05 +00:00
s . cluster = newCluster ( s . ctx , dManager , sManager , s )
2021-05-26 11:06:56 +00:00
return nil
}
2021-02-08 06:30:54 +00:00
2021-05-26 11:06:56 +00:00
func ( s * Server ) initServiceDiscovery ( ) error {
sessions , rev , err := s . session . GetSessions ( typeutil . DataNodeRole )
if err != nil {
2021-06-21 03:40:15 +00:00
log . Debug ( "DataCoord initMeta failed" , zap . Error ( err ) )
2021-05-19 10:36:05 +00:00
return err
}
2021-05-26 11:06:56 +00:00
log . Debug ( "registered sessions" , zap . Any ( "sessions" , sessions ) )
datanodes := make ( [ ] * datapb . DataNodeInfo , 0 , len ( sessions ) )
for _ , session := range sessions {
datanodes = append ( datanodes , & datapb . DataNodeInfo {
Address : session . Address ,
Version : session . ServerID ,
Channels : [ ] * datapb . ChannelStatus { } ,
} )
}
2021-05-19 10:36:05 +00:00
2021-05-26 11:06:56 +00:00
if err := s . cluster . startup ( datanodes ) ; err != nil {
2021-06-21 09:28:03 +00:00
log . Debug ( "DataCoord loadMetaFromRootCoord failed" , zap . Error ( err ) )
2021-01-22 03:07:07 +00:00
return err
}
2021-05-26 11:06:56 +00:00
2021-05-28 01:55:21 +00:00
s . eventCh = s . session . WatchServices ( typeutil . DataNodeRole , rev )
2021-01-22 11:43:27 +00:00
return nil
}
2021-06-03 11:06:33 +00:00
func ( s * Server ) startSegmentManager ( ) {
2021-05-26 11:06:56 +00:00
helper := createNewSegmentHelper ( s . segmentInfoStream )
2021-06-03 11:06:33 +00:00
s . segmentManager = newSegmentManager ( s . meta , s . allocator , withAllocHelper ( helper ) )
2021-05-21 10:30:41 +00:00
}
2021-05-26 11:06:56 +00:00
func ( s * Server ) initSegmentInfoChannel ( ) error {
var err error
s . segmentInfoStream , err = s . msFactory . NewMsgStream ( s . ctx )
if err != nil {
return err
}
s . segmentInfoStream . AsProducer ( [ ] string { Params . SegmentInfoChannelName } )
2021-06-21 03:40:15 +00:00
log . Debug ( "DataCoord AsProducer: " + Params . SegmentInfoChannelName )
2021-05-26 11:06:56 +00:00
s . segmentInfoStream . Start ( )
return nil
2021-05-21 10:30:41 +00:00
}
2021-01-22 03:07:07 +00:00
func ( s * Server ) initMeta ( ) error {
2021-02-26 07:17:47 +00:00
connectEtcdFn := func ( ) error {
2021-06-11 14:04:41 +00:00
etcdClient , err := clientv3 . New ( clientv3 . Config { Endpoints : Params . EtcdEndpoints } )
2021-02-26 07:17:47 +00:00
if err != nil {
return err
}
2021-04-24 03:29:15 +00:00
s . kvClient = etcdkv . NewEtcdKV ( etcdClient , Params . MetaRootPath )
s . meta , err = newMeta ( s . kvClient )
2021-02-26 07:17:47 +00:00
if err != nil {
return err
}
return nil
2021-01-22 03:07:07 +00:00
}
2021-06-23 01:24:10 +00:00
return retry . Do ( s . ctx , connectEtcdFn , retry . Attempts ( connEtcdMaxRetryTime ) )
2021-01-22 03:07:07 +00:00
}
2021-05-26 11:06:56 +00:00
func ( s * Server ) initFlushMsgStream ( ) error {
2021-02-04 10:11:19 +00:00
var err error
2021-05-21 08:54:29 +00:00
// segment flush stream
s . flushMsgStream , err = s . msFactory . NewMsgStream ( s . ctx )
if err != nil {
return err
}
s . flushMsgStream . AsProducer ( [ ] string { Params . SegmentInfoChannelName } )
2021-06-21 03:40:15 +00:00
log . Debug ( "DataCoord AsProducer:" + Params . SegmentInfoChannelName )
2021-05-21 08:54:29 +00:00
s . flushMsgStream . Start ( )
2021-01-22 03:07:07 +00:00
return nil
}
2021-01-23 12:22:59 +00:00
2021-01-26 01:43:41 +00:00
func ( s * Server ) startServerLoop ( ) {
s . serverLoopCtx , s . serverLoopCancel = context . WithCancel ( s . ctx )
2021-05-28 01:55:21 +00:00
s . serverLoopWg . Add ( 5 )
2021-01-26 01:43:41 +00:00
go s . startStatsChannel ( s . serverLoopCtx )
2021-05-25 07:35:37 +00:00
go s . startDataNodeTtLoop ( s . serverLoopCtx )
2021-05-26 11:06:56 +00:00
go s . startWatchService ( s . serverLoopCtx )
go s . startActiveCheck ( s . serverLoopCtx )
2021-05-28 01:55:21 +00:00
go s . startFlushLoop ( s . serverLoopCtx )
2021-01-26 01:43:41 +00:00
}
func ( s * Server ) startStatsChannel ( ctx context . Context ) {
2021-03-04 08:01:30 +00:00
defer logutil . LogPanic ( )
2021-01-26 01:43:41 +00:00
defer s . serverLoopWg . Done ( )
2021-02-08 06:30:54 +00:00
statsStream , _ := s . msFactory . NewMsgStream ( ctx )
2021-06-21 03:40:15 +00:00
statsStream . AsConsumer ( [ ] string { Params . StatisticsChannelName } , Params . DataCoordSubscriptionName )
log . Debug ( "DataCoord stats stream" ,
2021-06-03 11:06:33 +00:00
zap . String ( "channelName" , Params . StatisticsChannelName ) ,
2021-06-21 03:40:15 +00:00
zap . String ( "descriptionName" , Params . DataCoordSubscriptionName ) )
2021-01-26 01:43:41 +00:00
statsStream . Start ( )
defer statsStream . Close ( )
for {
select {
case <- ctx . Done ( ) :
2021-06-22 10:24:08 +00:00
log . Debug ( "stats channel shutdown" )
2021-01-26 01:43:41 +00:00
return
default :
}
2021-03-25 06:41:46 +00:00
msgPack := statsStream . Consume ( )
2021-04-16 08:30:55 +00:00
if msgPack == nil {
2021-05-25 07:35:37 +00:00
return
2021-04-16 08:30:55 +00:00
}
2021-01-26 01:43:41 +00:00
for _ , msg := range msgPack . Msgs {
2021-04-24 03:29:15 +00:00
if msg . Type ( ) != commonpb . MsgType_SegmentStatistics {
2021-05-28 01:55:21 +00:00
log . Warn ( "receive unknown msg from segment statistics channel" ,
zap . Stringer ( "msgType" , msg . Type ( ) ) )
2021-04-24 03:29:15 +00:00
continue
2021-02-23 01:58:06 +00:00
}
2021-06-09 09:31:48 +00:00
log . Debug ( "Receive DataNode segment statistics update" )
2021-04-24 03:29:15 +00:00
ssMsg := msg . ( * msgstream . SegmentStatisticsMsg )
for _ , stat := range ssMsg . SegStats {
2021-06-03 11:06:33 +00:00
s . segmentManager . UpdateSegmentStats ( stat )
2021-05-19 06:13:53 +00:00
}
2021-01-26 01:43:41 +00:00
}
}
}
2021-05-25 07:35:37 +00:00
func ( s * Server ) startDataNodeTtLoop ( ctx context . Context ) {
2021-03-04 08:01:30 +00:00
defer logutil . LogPanic ( )
2021-01-26 01:43:41 +00:00
defer s . serverLoopWg . Done ( )
2021-05-25 07:35:37 +00:00
ttMsgStream , err := s . msFactory . NewMsgStream ( ctx )
if err != nil {
log . Error ( "new msg stream failed" , zap . Error ( err ) )
return
}
ttMsgStream . AsConsumer ( [ ] string { Params . TimeTickChannelName } ,
2021-06-21 03:40:15 +00:00
Params . DataCoordSubscriptionName )
log . Debug ( fmt . Sprintf ( "DataCoord AsConsumer:%s:%s" ,
Params . TimeTickChannelName , Params . DataCoordSubscriptionName ) )
2021-05-25 07:35:37 +00:00
ttMsgStream . Start ( )
defer ttMsgStream . Close ( )
2021-01-26 01:43:41 +00:00
for {
select {
case <- ctx . Done ( ) :
2021-06-22 10:24:08 +00:00
log . Debug ( "data node tt loop shutdown" )
2021-01-26 01:43:41 +00:00
return
default :
}
2021-05-25 07:35:37 +00:00
msgPack := ttMsgStream . Consume ( )
2021-04-16 08:30:55 +00:00
if msgPack == nil {
2021-05-25 07:35:37 +00:00
return
2021-04-16 08:30:55 +00:00
}
2021-01-26 01:43:41 +00:00
for _ , msg := range msgPack . Msgs {
2021-05-25 07:35:37 +00:00
if msg . Type ( ) != commonpb . MsgType_DataNodeTt {
2021-06-08 11:25:37 +00:00
log . Warn ( "Receive unexpected msg type from tt channel" ,
2021-05-25 07:35:37 +00:00
zap . Stringer ( "msgType" , msg . Type ( ) ) )
2021-01-26 01:43:41 +00:00
continue
}
2021-05-25 07:35:37 +00:00
ttMsg := msg . ( * msgstream . DataNodeTtMsg )
ch := ttMsg . ChannelName
ts := ttMsg . Timestamp
2021-06-24 05:40:03 +00:00
// log.Debug("Receive datanode timetick msg", zap.String("channel", ch),
// zap.Any("ts", ts))
2021-06-03 11:06:33 +00:00
segments , err := s . segmentManager . GetFlushableSegments ( ctx , ch , ts )
2021-01-26 01:43:41 +00:00
if err != nil {
2021-05-25 07:35:37 +00:00
log . Warn ( "get flushable segments failed" , zap . Error ( err ) )
2021-01-26 01:43:41 +00:00
continue
}
2021-05-26 11:06:56 +00:00
2021-06-08 11:25:37 +00:00
if len ( segments ) == 0 {
continue
}
log . Debug ( "Flush segments" , zap . Int64s ( "segmentIDs" , segments ) )
2021-05-26 11:06:56 +00:00
segmentInfos := make ( [ ] * datapb . SegmentInfo , 0 , len ( segments ) )
2021-05-25 07:35:37 +00:00
for _ , id := range segments {
sInfo , err := s . meta . GetSegment ( id )
2021-05-19 06:13:53 +00:00
if err != nil {
2021-05-25 07:35:37 +00:00
log . Error ( "get segment from meta error" , zap . Int64 ( "id" , id ) ,
2021-05-19 06:13:53 +00:00
zap . Error ( err ) )
2021-05-25 07:35:37 +00:00
continue
2021-05-19 06:13:53 +00:00
}
2021-05-26 11:06:56 +00:00
segmentInfos = append ( segmentInfos , sInfo )
}
2021-06-08 11:25:37 +00:00
if len ( segmentInfos ) > 0 {
s . cluster . flush ( segmentInfos )
}
2021-05-26 11:06:56 +00:00
}
}
}
func ( s * Server ) startWatchService ( ctx context . Context ) {
2021-05-28 01:55:21 +00:00
defer logutil . LogPanic ( )
2021-05-26 11:06:56 +00:00
defer s . serverLoopWg . Done ( )
for {
select {
case <- ctx . Done ( ) :
log . Debug ( "watch service shutdown" )
return
2021-05-28 01:55:21 +00:00
case event := <- s . eventCh :
2021-05-26 11:06:56 +00:00
datanode := & datapb . DataNodeInfo {
Address : event . Session . Address ,
Version : event . Session . ServerID ,
Channels : [ ] * datapb . ChannelStatus { } ,
2021-05-25 07:35:37 +00:00
}
2021-05-26 11:06:56 +00:00
switch event . EventType {
case sessionutil . SessionAddEvent :
2021-06-08 11:25:37 +00:00
log . Info ( "Received datanode register" ,
zap . String ( "address" , datanode . Address ) ,
zap . Int64 ( "serverID" , datanode . Version ) )
2021-05-26 11:06:56 +00:00
s . cluster . register ( datanode )
case sessionutil . SessionDelEvent :
2021-06-08 11:25:37 +00:00
log . Info ( "Received datanode unregister" ,
zap . String ( "address" , datanode . Address ) ,
zap . Int64 ( "serverID" , datanode . Version ) )
2021-05-26 11:06:56 +00:00
s . cluster . unregister ( datanode )
default :
log . Warn ( "receive unknown service event type" ,
zap . Any ( "type" , event . EventType ) )
}
}
}
}
2021-05-25 07:35:37 +00:00
2021-05-26 11:06:56 +00:00
func ( s * Server ) startActiveCheck ( ctx context . Context ) {
2021-05-28 01:55:21 +00:00
defer logutil . LogPanic ( )
2021-05-26 11:06:56 +00:00
defer s . serverLoopWg . Done ( )
for {
select {
case _ , ok := <- s . activeCh :
if ok {
continue
2021-05-19 06:13:53 +00:00
}
2021-05-26 11:06:56 +00:00
s . Stop ( )
2021-06-22 10:24:08 +00:00
log . Debug ( "disconnect with etcd and shutdown data coordinator" )
2021-05-26 11:06:56 +00:00
return
case <- ctx . Done ( ) :
log . Debug ( "connection check shutdown" )
return
2021-01-26 01:43:41 +00:00
}
}
}
2021-05-28 01:55:21 +00:00
func ( s * Server ) startFlushLoop ( ctx context . Context ) {
defer logutil . LogPanic ( )
defer s . serverLoopWg . Done ( )
ctx2 , cancel := context . WithCancel ( ctx )
defer cancel ( )
// send `Flushing` segments
go s . handleFlushingSegments ( ctx2 )
var err error
for {
select {
case <- ctx . Done ( ) :
log . Debug ( "flush loop shutdown" )
return
case segmentID := <- s . flushCh :
// write flush msg into segmentInfo/flush stream
msgPack := composeSegmentFlushMsgPack ( segmentID )
err = s . flushMsgStream . Produce ( & msgPack )
if err != nil {
log . Error ( "produce flush msg failed" ,
zap . Int64 ( "segmentID" , segmentID ) ,
zap . Error ( err ) )
continue
}
log . Debug ( "send segment flush msg" , zap . Int64 ( "id" , segmentID ) )
// set segment to SegmentState_Flushed
if err = s . meta . FlushSegment ( segmentID ) ; err != nil {
log . Error ( "flush segment complete failed" , zap . Error ( err ) )
continue
}
log . Debug ( "flush segment complete" , zap . Int64 ( "id" , segmentID ) )
}
}
}
func ( s * Server ) handleFlushingSegments ( ctx context . Context ) {
segments := s . meta . GetFlushingSegments ( )
for _ , segment := range segments {
select {
case <- ctx . Done ( ) :
return
case s . flushCh <- segment . ID :
}
}
}
2021-06-21 09:28:03 +00:00
func ( s * Server ) initRootCoordClient ( ) error {
2021-05-28 01:55:21 +00:00
var err error
2021-06-23 01:24:10 +00:00
if s . rootCoordClient , err = s . rootCoordClientCreator ( s . ctx , Params . MetaRootPath , Params . EtcdEndpoints , retry . Attempts ( 300 ) ) ; err != nil {
2021-05-28 01:55:21 +00:00
return err
}
2021-06-21 09:28:03 +00:00
if err = s . rootCoordClient . Init ( ) ; err != nil {
2021-05-28 01:55:21 +00:00
return err
}
2021-06-21 09:28:03 +00:00
return s . rootCoordClient . Start ( )
2021-05-28 01:55:21 +00:00
}
2021-05-26 11:06:56 +00:00
2021-01-22 03:07:07 +00:00
func ( s * Server ) Stop ( ) error {
2021-06-02 07:11:17 +00:00
if ! atomic . CompareAndSwapInt64 ( & s . isServing , 2 , 0 ) {
2021-05-28 01:55:21 +00:00
return nil
}
2021-06-21 03:40:15 +00:00
log . Debug ( "DataCoord server shutdown" )
2021-05-28 01:55:21 +00:00
atomic . StoreInt64 ( & s . isServing , 0 )
s . cluster . releaseSessions ( )
s . segmentInfoStream . Close ( )
s . flushMsgStream . Close ( )
s . stopServerLoop ( )
2021-01-22 03:07:07 +00:00
return nil
}
2021-04-16 08:30:55 +00:00
// CleanMeta only for test
func ( s * Server ) CleanMeta ( ) error {
2021-05-26 11:06:56 +00:00
log . Debug ( "clean meta" , zap . Any ( "kv" , s . kvClient ) )
2021-04-24 03:29:15 +00:00
return s . kvClient . RemoveWithPrefix ( "" )
2021-04-16 08:30:55 +00:00
}
2021-01-23 12:22:59 +00:00
func ( s * Server ) stopServerLoop ( ) {
s . serverLoopCancel ( )
s . serverLoopWg . Wait ( )
}
2021-04-26 01:45:54 +00:00
//func (s *Server) validateAllocRequest(collID UniqueID, partID UniqueID, channelName string) error {
// if !s.meta.HasCollection(collID) {
// return fmt.Errorf("can not find collection %d", collID)
// }
// if !s.meta.HasPartition(collID, partID) {
// return fmt.Errorf("can not find partition %d", partID)
// }
// for _, name := range s.insertChannels {
// if name == channelName {
// return nil
// }
// }
// return fmt.Errorf("can not find channel %s", channelName)
//}
2021-04-12 08:35:51 +00:00
2021-06-21 09:28:03 +00:00
func ( s * Server ) loadCollectionFromRootCoord ( ctx context . Context , collectionID int64 ) error {
resp , err := s . rootCoordClient . DescribeCollection ( ctx , & milvuspb . DescribeCollectionRequest {
2021-02-26 01:23:39 +00:00
Base : & commonpb . MsgBase {
2021-03-10 06:45:35 +00:00
MsgType : commonpb . MsgType_DescribeCollection ,
2021-02-26 01:23:39 +00:00
SourceID : Params . NodeID ,
} ,
DbName : "" ,
CollectionID : collectionID ,
} )
if err = VerifyResponse ( resp , err ) ; err != nil {
return err
}
2021-06-21 09:28:03 +00:00
presp , err := s . rootCoordClient . ShowPartitions ( ctx , & milvuspb . ShowPartitionsRequest {
2021-04-12 08:35:51 +00:00
Base : & commonpb . MsgBase {
MsgType : commonpb . MsgType_ShowPartitions ,
2021-06-23 04:10:12 +00:00
MsgID : 0 ,
Timestamp : 0 ,
2021-04-12 08:35:51 +00:00
SourceID : Params . NodeID ,
2021-01-26 01:43:41 +00:00
} ,
2021-04-12 08:35:51 +00:00
DbName : "" ,
CollectionName : resp . Schema . Name ,
CollectionID : resp . CollectionID ,
} )
if err = VerifyResponse ( presp , err ) ; err != nil {
2021-05-29 02:47:29 +00:00
log . Error ( "show partitions error" , zap . String ( "collectionName" , resp . Schema . Name ) ,
zap . Int64 ( "collectionID" , resp . CollectionID ) , zap . Error ( err ) )
2021-01-26 01:43:41 +00:00
return err
}
2021-04-12 08:35:51 +00:00
collInfo := & datapb . CollectionInfo {
ID : resp . CollectionID ,
Schema : resp . Schema ,
Partitions : presp . PartitionIDs ,
}
return s . meta . AddCollection ( collInfo )
2021-01-22 03:07:07 +00:00
}
2021-05-31 10:47:32 +00:00
func ( s * Server ) prepareBinlog ( req * datapb . SaveBinlogPathsRequest ) ( map [ string ] string , error ) {
2021-05-25 07:35:37 +00:00
meta := make ( map [ string ] string )
2021-05-20 03:34:45 +00:00
2021-05-26 04:21:55 +00:00
for _ , fieldBlp := range req . Field2BinlogPaths {
fieldMeta , err := s . prepareField2PathMeta ( req . SegmentID , fieldBlp )
if err != nil {
return nil , err
}
for k , v := range fieldMeta {
meta [ k ] = v
}
2021-05-20 03:34:45 +00:00
}
2021-05-26 04:21:55 +00:00
2021-05-25 07:35:37 +00:00
return meta , nil
2021-05-20 03:34:45 +00:00
}
2021-05-21 08:54:29 +00:00
func composeSegmentFlushMsgPack ( segmentID UniqueID ) msgstream . MsgPack {
msgPack := msgstream . MsgPack {
Msgs : make ( [ ] msgstream . TsMsg , 0 , 1 ) ,
}
completeFlushMsg := internalpb . SegmentFlushCompletedMsg {
Base : & commonpb . MsgBase {
MsgType : commonpb . MsgType_SegmentFlushDone ,
2021-06-23 04:10:12 +00:00
MsgID : 0 ,
Timestamp : 0 ,
2021-05-21 08:54:29 +00:00
SourceID : Params . NodeID ,
} ,
SegmentID : segmentID ,
}
var msg msgstream . TsMsg = & msgstream . FlushCompletedMsg {
BaseMsg : msgstream . BaseMsg {
HashValues : [ ] uint32 { 0 } ,
} ,
SegmentFlushCompletedMsg : completeFlushMsg ,
}
msgPack . Msgs = append ( msgPack . Msgs , msg )
return msgPack
}