2021-04-19 03:12:56 +00:00
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
2021-06-18 13:30:08 +00:00
package rootcoord
2021-01-19 06:44:03 +00:00
import (
2021-03-13 06:42:53 +00:00
"context"
2021-02-07 09:02:13 +00:00
"fmt"
2021-02-05 06:09:55 +00:00
2021-01-19 06:44:03 +00:00
"github.com/golang/protobuf/proto"
2021-04-22 06:45:57 +00:00
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
2021-05-26 12:14:30 +00:00
"github.com/milvus-io/milvus/internal/proto/proxypb"
2021-04-22 06:45:57 +00:00
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/util/typeutil"
2021-03-25 06:41:46 +00:00
"go.uber.org/zap"
2021-01-19 06:44:03 +00:00
)
type reqTask interface {
2021-03-13 06:42:53 +00:00
Ctx ( ) context . Context
2021-01-19 06:44:03 +00:00
Type ( ) commonpb . MsgType
2021-03-13 06:42:53 +00:00
Execute ( ctx context . Context ) error
2021-06-26 01:22:11 +00:00
Core ( ) * Core
2021-01-19 06:44:03 +00:00
}
type baseReqTask struct {
2021-03-13 06:42:53 +00:00
ctx context . Context
2021-01-19 06:44:03 +00:00
core * Core
}
2021-06-26 01:22:11 +00:00
func ( b * baseReqTask ) Core ( ) * Core {
return b . core
2021-01-19 06:44:03 +00:00
}
2021-06-26 01:22:11 +00:00
func ( b * baseReqTask ) Ctx ( ) context . Context {
return b . ctx
}
func executeTask ( t reqTask ) error {
errChan := make ( chan error )
go func ( ) {
err := t . Execute ( t . Ctx ( ) )
errChan <- err
} ( )
2021-01-19 06:44:03 +00:00
select {
2021-06-26 01:22:11 +00:00
case <- t . Core ( ) . ctx . Done ( ) :
return fmt . Errorf ( "context canceled" )
case <- t . Ctx ( ) . Done ( ) :
return fmt . Errorf ( "context canceled" )
case err := <- errChan :
2021-01-19 06:44:03 +00:00
return err
}
}
type CreateCollectionReqTask struct {
baseReqTask
Req * milvuspb . CreateCollectionRequest
}
func ( t * CreateCollectionReqTask ) Type ( ) commonpb . MsgType {
return t . Req . Base . MsgType
}
2021-01-23 02:12:41 +00:00
2021-03-13 06:42:53 +00:00
func ( t * CreateCollectionReqTask ) Execute ( ctx context . Context ) error {
2021-05-17 11:15:01 +00:00
const defaultShardsNum = 2
2021-04-08 07:26:18 +00:00
if t . Type ( ) != commonpb . MsgType_CreateCollection {
return fmt . Errorf ( "create collection, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-01-19 06:44:03 +00:00
var schema schemapb . CollectionSchema
err := proto . Unmarshal ( t . Req . Schema , & schema )
if err != nil {
return err
}
2021-01-23 02:12:41 +00:00
if t . Req . CollectionName != schema . Name {
2021-03-05 02:15:27 +00:00
return fmt . Errorf ( "collection name = %s, schema.Name=%s" , t . Req . CollectionName , schema . Name )
2021-01-23 02:12:41 +00:00
}
2021-05-17 11:15:01 +00:00
if t . Req . ShardsNum <= 0 {
log . Debug ( "Set ShardsNum to default" , zap . String ( "collection name" , t . Req . CollectionName ) ,
zap . Int32 ( "defaultShardsNum" , defaultShardsNum ) )
t . Req . ShardsNum = defaultShardsNum
}
2021-01-19 06:44:03 +00:00
for idx , field := range schema . Fields {
field . FieldID = int64 ( idx + StartOfUserFieldID )
}
rowIDField := & schemapb . FieldSchema {
FieldID : int64 ( RowIDField ) ,
Name : RowIDFieldName ,
IsPrimaryKey : false ,
Description : "row id" ,
2021-03-12 06:22:09 +00:00
DataType : schemapb . DataType_Int64 ,
2021-01-19 06:44:03 +00:00
}
timeStampField := & schemapb . FieldSchema {
FieldID : int64 ( TimeStampField ) ,
Name : TimeStampFieldName ,
IsPrimaryKey : false ,
Description : "time stamp" ,
2021-03-12 06:22:09 +00:00
DataType : schemapb . DataType_Int64 ,
2021-01-19 06:44:03 +00:00
}
schema . Fields = append ( schema . Fields , rowIDField , timeStampField )
2021-05-20 06:14:14 +00:00
collID , _ , err := t . core . IDAllocator ( 1 )
2021-01-19 06:44:03 +00:00
if err != nil {
return err
}
2021-05-20 06:14:14 +00:00
collTs := t . Req . Base . Timestamp
partID , _ , err := t . core . IDAllocator ( 1 )
2021-01-19 06:44:03 +00:00
if err != nil {
return err
}
2021-05-17 11:15:01 +00:00
2021-06-19 03:34:08 +00:00
log . Debug ( "collection name -> id" ,
zap . String ( "collection name" , t . Req . CollectionName ) ,
zap . Int64 ( "colletion_id" , collID ) ,
zap . Int64 ( "default partition id" , partID ) )
2021-05-17 11:15:01 +00:00
vchanNames := make ( [ ] string , t . Req . ShardsNum )
chanNames := make ( [ ] string , t . Req . ShardsNum )
for i := int32 ( 0 ) ; i < t . Req . ShardsNum ; i ++ {
2021-06-19 06:18:08 +00:00
vchanNames [ i ] = fmt . Sprintf ( "%s_%d_%d_v" , t . Req . CollectionName , collID , i )
chanNames [ i ] = ToPhysicalChannel ( vchanNames [ i ] )
2021-05-17 11:15:01 +00:00
}
2021-05-14 13:26:06 +00:00
collInfo := etcdpb . CollectionInfo {
2021-05-17 11:15:01 +00:00
ID : collID ,
Schema : & schema ,
CreateTime : collTs ,
PartitionIDs : make ( [ ] typeutil . UniqueID , 0 , 16 ) ,
FieldIndexes : make ( [ ] * etcdpb . FieldIndexInfo , 0 , 16 ) ,
VirtualChannelNames : vchanNames ,
PhysicalChannelNames : chanNames ,
2021-01-20 01:36:50 +00:00
}
2021-05-12 07:33:53 +00:00
// every collection has _default partition
2021-05-14 13:26:06 +00:00
partInfo := etcdpb . PartitionInfo {
2021-01-20 01:36:50 +00:00
PartitionName : Params . DefaultPartitionName ,
2021-05-14 13:26:06 +00:00
PartitionID : partID ,
2021-01-20 01:36:50 +00:00
SegmentIDs : make ( [ ] typeutil . UniqueID , 0 , 16 ) ,
2021-01-19 06:44:03 +00:00
}
2021-02-11 00:41:59 +00:00
idxInfo := make ( [ ] * etcdpb . IndexInfo , 0 , 16 )
2021-02-26 06:40:53 +00:00
/////////////////////// ignore index param from create_collection /////////////////////////
//for _, field := range schema.Fields {
2021-03-12 06:22:09 +00:00
// if field.DataType == schemapb.DataType_VectorFloat || field.DataType == schemapb.DataType_VectorBinary {
2021-02-26 06:40:53 +00:00
// if len(field.IndexParams) > 0 {
// idxID, err := t.core.idAllocator.AllocOne()
// if err != nil {
// return err
// }
// filedIdx := &etcdpb.FieldIndexInfo{
// FiledID: field.FieldID,
// IndexID: idxID,
// }
// idx := &etcdpb.IndexInfo{
// IndexName: fmt.Sprintf("%s_index_%d", collMeta.Schema.Name, field.FieldID),
// IndexID: idxID,
// IndexParams: field.IndexParams,
// }
// idxInfo = append(idxInfo, idx)
// collMeta.FieldIndexes = append(collMeta.FieldIndexes, filedIdx)
// }
// }
//}
2021-01-20 01:36:50 +00:00
2021-05-12 07:33:53 +00:00
// schema is modified (add RowIDField and TimestampField),
// so need Marshal again
2021-01-20 01:36:50 +00:00
schemaBytes , err := proto . Marshal ( & schema )
if err != nil {
return err
}
2021-05-14 13:26:06 +00:00
ddCollReq := internalpb . CreateCollectionRequest {
2021-05-17 11:15:01 +00:00
Base : t . Req . Base ,
DbName : t . Req . DbName ,
CollectionName : t . Req . CollectionName ,
DbID : 0 , //TODO,not used
CollectionID : collID ,
Schema : schemaBytes ,
VirtualChannelNames : vchanNames ,
PhysicalChannelNames : chanNames ,
2021-01-20 01:36:50 +00:00
}
2021-05-14 13:26:06 +00:00
ddPartReq := internalpb . CreatePartitionRequest {
2021-02-03 12:04:29 +00:00
Base : & commonpb . MsgBase {
2021-03-10 06:45:35 +00:00
MsgType : commonpb . MsgType_CreatePartition ,
2021-02-03 12:04:29 +00:00
MsgID : t . Req . Base . MsgID , //TODO, msg id
Timestamp : t . Req . Base . Timestamp + 1 ,
SourceID : t . Req . Base . SourceID ,
} ,
DbName : t . Req . DbName ,
CollectionName : t . Req . CollectionName ,
PartitionName : Params . DefaultPartitionName ,
DbID : 0 , //TODO, not used
2021-05-14 13:26:06 +00:00
CollectionID : collInfo . ID ,
PartitionID : partInfo . PartitionID ,
2021-02-03 12:04:29 +00:00
}
2021-05-14 13:26:06 +00:00
// build DdOperation and save it into etcd, when ddmsg send fail,
// system can restore ddmsg from etcd and re-send
2021-05-20 06:14:14 +00:00
ddOp := func ( ts typeutil . Timestamp ) ( string , error ) {
2021-05-31 08:48:31 +00:00
ddCollReq . Base . Timestamp = ts
ddPartReq . Base . Timestamp = ts
2021-05-20 06:14:14 +00:00
return EncodeDdOperation ( & ddCollReq , & ddPartReq , CreateCollectionDDType )
2021-02-03 12:04:29 +00:00
}
2021-05-20 06:14:14 +00:00
ts , err := t . core . MetaTable . AddCollection ( & collInfo , & partInfo , idxInfo , ddOp )
2021-05-14 13:26:06 +00:00
if err != nil {
return err
}
2021-05-12 07:33:53 +00:00
2021-06-04 07:00:34 +00:00
// add dml channel before send dd msg
2021-06-08 11:25:37 +00:00
t . core . dmlChannels . AddProducerChannels ( chanNames ... )
2021-06-04 07:00:34 +00:00
2021-06-11 08:39:29 +00:00
err = t . core . SendDdCreateCollectionReq ( ctx , & ddCollReq , chanNames )
2021-05-14 13:26:06 +00:00
if err != nil {
return err
}
2021-06-11 08:39:29 +00:00
err = t . core . SendDdCreatePartitionReq ( ctx , & ddPartReq , chanNames )
2021-05-14 13:26:06 +00:00
if err != nil {
return err
}
2021-05-31 08:48:31 +00:00
t . core . SendTimeTick ( ts )
2021-05-20 06:14:14 +00:00
2021-05-14 13:26:06 +00:00
// Update DDOperation in etcd
return t . core . setDdMsgSendFlag ( true )
2021-01-19 06:44:03 +00:00
}
type DropCollectionReqTask struct {
baseReqTask
Req * milvuspb . DropCollectionRequest
}
func ( t * DropCollectionReqTask ) Type ( ) commonpb . MsgType {
return t . Req . Base . MsgType
}
2021-03-13 06:42:53 +00:00
func ( t * DropCollectionReqTask ) Execute ( ctx context . Context ) error {
2021-04-08 07:26:18 +00:00
if t . Type ( ) != commonpb . MsgType_DropCollection {
return fmt . Errorf ( "drop collection, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-05-18 06:18:02 +00:00
collMeta , err := t . core . MetaTable . GetCollectionByName ( t . Req . CollectionName , 0 )
2021-01-19 06:44:03 +00:00
if err != nil {
return err
}
2021-03-12 06:22:09 +00:00
ddReq := internalpb . DropCollectionRequest {
2021-01-20 01:36:50 +00:00
Base : t . Req . Base ,
DbName : t . Req . DbName ,
CollectionName : t . Req . CollectionName ,
DbID : 0 , //not used
CollectionID : collMeta . ID ,
}
2021-05-14 13:26:06 +00:00
// build DdOperation and save it into etcd, when ddmsg send fail,
// system can restore ddmsg from etcd and re-send
2021-05-20 06:14:14 +00:00
ddOp := func ( ts typeutil . Timestamp ) ( string , error ) {
2021-05-31 08:48:31 +00:00
ddReq . Base . Timestamp = ts
2021-05-20 06:14:14 +00:00
return EncodeDdOperation ( & ddReq , nil , DropCollectionDDType )
2021-05-14 13:26:06 +00:00
}
2021-05-20 06:14:14 +00:00
ts , err := t . core . MetaTable . DeleteCollection ( collMeta . ID , ddOp )
2021-05-14 13:26:06 +00:00
if err != nil {
return err
}
2021-06-11 08:39:29 +00:00
err = t . core . SendDdDropCollectionReq ( ctx , & ddReq , collMeta . PhysicalChannelNames )
2021-01-19 06:44:03 +00:00
if err != nil {
return err
}
2021-02-05 06:09:55 +00:00
2021-05-31 08:48:31 +00:00
t . core . SendTimeTick ( ts )
2021-05-20 06:14:14 +00:00
2021-06-04 07:00:34 +00:00
// remove dml channel after send dd msg
t . core . dmlChannels . RemoveProducerChannels ( collMeta . PhysicalChannelNames ... )
2021-02-05 06:09:55 +00:00
//notify query service to release collection
go func ( ) {
2021-05-26 12:14:30 +00:00
if err = t . core . CallReleaseCollectionService ( t . core . ctx , ts , 0 , collMeta . ID ) ; err != nil {
2021-05-25 06:03:06 +00:00
log . Warn ( "CallReleaseCollectionService failed" , zap . String ( "error" , err . Error ( ) ) )
2021-02-05 06:09:55 +00:00
}
} ( )
2021-05-26 12:14:30 +00:00
req := proxypb . InvalidateCollMetaCacheRequest {
Base : & commonpb . MsgBase {
MsgType : 0 , //TODO, msg type
MsgID : 0 , //TODO, msg id
Timestamp : ts ,
SourceID : t . core . session . ServerID ,
} ,
DbName : t . Req . DbName ,
CollectionName : t . Req . CollectionName ,
}
2021-05-14 13:26:06 +00:00
// error doesn't matter here
2021-05-26 12:14:30 +00:00
t . core . proxyClientManager . InvalidateCollectionMetaCache ( ctx , & req )
2021-05-12 07:33:53 +00:00
2021-05-14 13:26:06 +00:00
// Update DDOperation in etcd
return t . core . setDdMsgSendFlag ( true )
2021-01-19 06:44:03 +00:00
}
type HasCollectionReqTask struct {
baseReqTask
Req * milvuspb . HasCollectionRequest
HasCollection bool
}
func ( t * HasCollectionReqTask ) Type ( ) commonpb . MsgType {
return t . Req . Base . MsgType
}
2021-04-08 07:26:18 +00:00
func ( t * HasCollectionReqTask ) Execute ( ctx context . Context ) error {
if t . Type ( ) != commonpb . MsgType_HasCollection {
return fmt . Errorf ( "has collection, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-05-18 09:12:17 +00:00
_ , err := t . core . MetaTable . GetCollectionByName ( t . Req . CollectionName , t . Req . TimeStamp )
2021-01-19 06:44:03 +00:00
if err == nil {
t . HasCollection = true
} else {
t . HasCollection = false
}
return nil
}
type DescribeCollectionReqTask struct {
baseReqTask
Req * milvuspb . DescribeCollectionRequest
Rsp * milvuspb . DescribeCollectionResponse
}
func ( t * DescribeCollectionReqTask ) Type ( ) commonpb . MsgType {
return t . Req . Base . MsgType
}
2021-03-13 06:42:53 +00:00
func ( t * DescribeCollectionReqTask ) Execute ( ctx context . Context ) error {
2021-04-08 07:26:18 +00:00
if t . Type ( ) != commonpb . MsgType_DescribeCollection {
return fmt . Errorf ( "describe collection, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-05-17 11:15:01 +00:00
var collInfo * etcdpb . CollectionInfo
2021-02-25 08:08:56 +00:00
var err error
if t . Req . CollectionName != "" {
2021-05-18 09:12:17 +00:00
collInfo , err = t . core . MetaTable . GetCollectionByName ( t . Req . CollectionName , t . Req . TimeStamp )
2021-02-25 08:08:56 +00:00
if err != nil {
return err
}
} else {
2021-05-18 09:12:17 +00:00
collInfo , err = t . core . MetaTable . GetCollectionByID ( t . Req . CollectionID , t . Req . TimeStamp )
2021-02-25 08:08:56 +00:00
if err != nil {
return err
}
2021-01-19 06:44:03 +00:00
}
2021-02-25 08:08:56 +00:00
2021-05-17 11:15:01 +00:00
t . Rsp . Schema = proto . Clone ( collInfo . Schema ) . ( * schemapb . CollectionSchema )
t . Rsp . CollectionID = collInfo . ID
2021-06-08 11:25:37 +00:00
//var newField []*schemapb.FieldSchema
//for _, field := range t.Rsp.Schema.Fields {
// if field.FieldID >= StartOfUserFieldID {
// newField = append(newField, field)
// }
//}
//t.Rsp.Schema.Fields = newField
2021-05-17 11:15:01 +00:00
t . Rsp . VirtualChannelNames = collInfo . VirtualChannelNames
t . Rsp . PhysicalChannelNames = collInfo . PhysicalChannelNames
2021-01-19 06:44:03 +00:00
return nil
}
type ShowCollectionReqTask struct {
baseReqTask
2021-03-12 06:22:09 +00:00
Req * milvuspb . ShowCollectionsRequest
Rsp * milvuspb . ShowCollectionsResponse
2021-01-19 06:44:03 +00:00
}
func ( t * ShowCollectionReqTask ) Type ( ) commonpb . MsgType {
return t . Req . Base . MsgType
}
2021-04-08 07:26:18 +00:00
func ( t * ShowCollectionReqTask ) Execute ( ctx context . Context ) error {
if t . Type ( ) != commonpb . MsgType_ShowCollections {
return fmt . Errorf ( "show collection, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-05-18 09:12:17 +00:00
coll , err := t . core . MetaTable . ListCollections ( t . Req . TimeStamp )
2021-01-19 06:44:03 +00:00
if err != nil {
return err
}
2021-06-03 11:09:33 +00:00
for name , id := range coll {
t . Rsp . CollectionNames = append ( t . Rsp . CollectionNames , name )
t . Rsp . CollectionIds = append ( t . Rsp . CollectionIds , id )
}
2021-01-19 06:44:03 +00:00
return nil
}
type CreatePartitionReqTask struct {
baseReqTask
Req * milvuspb . CreatePartitionRequest
}
func ( t * CreatePartitionReqTask ) Type ( ) commonpb . MsgType {
return t . Req . Base . MsgType
}
2021-03-13 06:42:53 +00:00
func ( t * CreatePartitionReqTask ) Execute ( ctx context . Context ) error {
2021-04-08 07:26:18 +00:00
if t . Type ( ) != commonpb . MsgType_CreatePartition {
return fmt . Errorf ( "create partition, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-05-18 06:18:02 +00:00
collMeta , err := t . core . MetaTable . GetCollectionByName ( t . Req . CollectionName , 0 )
2021-01-19 06:44:03 +00:00
if err != nil {
return err
}
2021-05-20 06:14:14 +00:00
partID , _ , err := t . core . IDAllocator ( 1 )
2021-01-19 06:44:03 +00:00
if err != nil {
return err
}
2021-03-12 06:22:09 +00:00
ddReq := internalpb . CreatePartitionRequest {
2021-01-20 01:36:50 +00:00
Base : t . Req . Base ,
DbName : t . Req . DbName ,
CollectionName : t . Req . CollectionName ,
PartitionName : t . Req . PartitionName ,
DbID : 0 , // todo, not used
CollectionID : collMeta . ID ,
2021-05-14 13:26:06 +00:00
PartitionID : partID ,
2021-01-20 01:36:50 +00:00
}
2021-05-14 13:26:06 +00:00
// build DdOperation and save it into etcd, when ddmsg send fail,
// system can restore ddmsg from etcd and re-send
2021-05-20 06:14:14 +00:00
ddOp := func ( ts typeutil . Timestamp ) ( string , error ) {
2021-05-31 08:48:31 +00:00
ddReq . Base . Timestamp = ts
2021-05-20 06:14:14 +00:00
return EncodeDdOperation ( & ddReq , nil , CreatePartitionDDType )
2021-01-19 06:44:03 +00:00
}
2021-05-20 06:14:14 +00:00
ts , err := t . core . MetaTable . AddPartition ( collMeta . ID , t . Req . PartitionName , partID , ddOp )
2021-05-14 13:26:06 +00:00
if err != nil {
return err
}
2021-03-22 08:36:10 +00:00
2021-06-11 08:39:29 +00:00
err = t . core . SendDdCreatePartitionReq ( ctx , & ddReq , collMeta . PhysicalChannelNames )
2021-05-14 13:26:06 +00:00
if err != nil {
return err
}
2021-05-12 07:33:53 +00:00
2021-05-31 08:48:31 +00:00
t . core . SendTimeTick ( ts )
2021-05-20 06:14:14 +00:00
2021-05-26 12:14:30 +00:00
req := proxypb . InvalidateCollMetaCacheRequest {
Base : & commonpb . MsgBase {
MsgType : 0 , //TODO, msg type
MsgID : 0 , //TODO, msg id
Timestamp : ts ,
SourceID : t . core . session . ServerID ,
} ,
DbName : t . Req . DbName ,
CollectionName : t . Req . CollectionName ,
}
2021-05-14 13:26:06 +00:00
// error doesn't matter here
2021-05-26 12:14:30 +00:00
t . core . proxyClientManager . InvalidateCollectionMetaCache ( ctx , & req )
2021-05-14 13:26:06 +00:00
// Update DDOperation in etcd
return t . core . setDdMsgSendFlag ( true )
2021-01-19 06:44:03 +00:00
}
type DropPartitionReqTask struct {
baseReqTask
Req * milvuspb . DropPartitionRequest
}
func ( t * DropPartitionReqTask ) Type ( ) commonpb . MsgType {
return t . Req . Base . MsgType
}
2021-03-13 06:42:53 +00:00
func ( t * DropPartitionReqTask ) Execute ( ctx context . Context ) error {
2021-04-08 07:26:18 +00:00
if t . Type ( ) != commonpb . MsgType_DropPartition {
return fmt . Errorf ( "drop partition, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-05-18 06:18:02 +00:00
collInfo , err := t . core . MetaTable . GetCollectionByName ( t . Req . CollectionName , 0 )
2021-01-19 06:44:03 +00:00
if err != nil {
return err
}
2021-05-18 06:18:02 +00:00
partInfo , err := t . core . MetaTable . GetPartitionByName ( collInfo . ID , t . Req . PartitionName , 0 )
2021-01-19 06:44:03 +00:00
if err != nil {
return err
}
2021-03-12 06:22:09 +00:00
ddReq := internalpb . DropPartitionRequest {
2021-01-20 01:36:50 +00:00
Base : t . Req . Base ,
DbName : t . Req . DbName ,
CollectionName : t . Req . CollectionName ,
PartitionName : t . Req . PartitionName ,
DbID : 0 , //todo,not used
2021-05-14 13:26:06 +00:00
CollectionID : collInfo . ID ,
PartitionID : partInfo . PartitionID ,
2021-01-20 01:36:50 +00:00
}
2021-05-14 13:26:06 +00:00
// build DdOperation and save it into etcd, when ddmsg send fail,
// system can restore ddmsg from etcd and re-send
2021-05-20 06:14:14 +00:00
ddOp := func ( ts typeutil . Timestamp ) ( string , error ) {
2021-05-31 08:48:31 +00:00
ddReq . Base . Timestamp = ts
2021-05-20 06:14:14 +00:00
return EncodeDdOperation ( & ddReq , nil , DropPartitionDDType )
2021-01-19 06:44:03 +00:00
}
2021-03-22 08:36:10 +00:00
2021-05-20 06:14:14 +00:00
ts , _ , err := t . core . MetaTable . DeletePartition ( collInfo . ID , t . Req . PartitionName , ddOp )
2021-05-14 13:26:06 +00:00
if err != nil {
return err
}
2021-05-12 07:33:53 +00:00
2021-06-11 08:39:29 +00:00
err = t . core . SendDdDropPartitionReq ( ctx , & ddReq , collInfo . PhysicalChannelNames )
2021-05-14 13:26:06 +00:00
if err != nil {
return err
}
2021-05-12 07:33:53 +00:00
2021-05-31 08:48:31 +00:00
t . core . SendTimeTick ( ts )
2021-05-20 06:14:14 +00:00
2021-05-26 12:14:30 +00:00
req := proxypb . InvalidateCollMetaCacheRequest {
Base : & commonpb . MsgBase {
MsgType : 0 , //TODO, msg type
MsgID : 0 , //TODO, msg id
Timestamp : ts ,
SourceID : t . core . session . ServerID ,
} ,
DbName : t . Req . DbName ,
CollectionName : t . Req . CollectionName ,
}
2021-05-14 13:26:06 +00:00
// error doesn't matter here
2021-05-26 12:14:30 +00:00
t . core . proxyClientManager . InvalidateCollectionMetaCache ( ctx , & req )
2021-05-14 13:26:06 +00:00
2021-06-22 08:08:08 +00:00
//notify query service to release partition
go func ( ) {
if err = t . core . CallReleasePartitionService ( t . core . ctx , ts , 0 , collInfo . ID , [ ] typeutil . UniqueID { partInfo . PartitionID } ) ; err != nil {
log . Warn ( "CallReleaseCollectionService failed" , zap . String ( "error" , err . Error ( ) ) )
}
} ( )
2021-05-14 13:26:06 +00:00
// Update DDOperation in etcd
return t . core . setDdMsgSendFlag ( true )
2021-01-19 06:44:03 +00:00
}
type HasPartitionReqTask struct {
baseReqTask
Req * milvuspb . HasPartitionRequest
HasPartition bool
}
func ( t * HasPartitionReqTask ) Type ( ) commonpb . MsgType {
return t . Req . Base . MsgType
}
2021-04-08 07:26:18 +00:00
func ( t * HasPartitionReqTask ) Execute ( ctx context . Context ) error {
if t . Type ( ) != commonpb . MsgType_HasPartition {
return fmt . Errorf ( "has partition, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-05-18 06:18:02 +00:00
coll , err := t . core . MetaTable . GetCollectionByName ( t . Req . CollectionName , 0 )
2021-01-19 06:44:03 +00:00
if err != nil {
return err
}
2021-05-18 06:18:02 +00:00
t . HasPartition = t . core . MetaTable . HasPartition ( coll . ID , t . Req . PartitionName , 0 )
2021-01-19 06:44:03 +00:00
return nil
}
type ShowPartitionReqTask struct {
baseReqTask
2021-03-12 06:22:09 +00:00
Req * milvuspb . ShowPartitionsRequest
Rsp * milvuspb . ShowPartitionsResponse
2021-01-19 06:44:03 +00:00
}
func ( t * ShowPartitionReqTask ) Type ( ) commonpb . MsgType {
return t . Req . Base . MsgType
}
2021-04-08 07:26:18 +00:00
func ( t * ShowPartitionReqTask ) Execute ( ctx context . Context ) error {
if t . Type ( ) != commonpb . MsgType_ShowPartitions {
return fmt . Errorf ( "show partition, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-02-05 08:32:03 +00:00
var coll * etcdpb . CollectionInfo
var err error
if t . Req . CollectionName == "" {
2021-05-18 06:18:02 +00:00
coll , err = t . core . MetaTable . GetCollectionByID ( t . Req . CollectionID , 0 )
2021-02-05 08:32:03 +00:00
} else {
2021-05-18 06:18:02 +00:00
coll , err = t . core . MetaTable . GetCollectionByName ( t . Req . CollectionName , 0 )
2021-02-05 08:32:03 +00:00
}
2021-01-19 06:44:03 +00:00
if err != nil {
return err
}
2021-01-20 01:36:50 +00:00
for _ , partID := range coll . PartitionIDs {
2021-05-18 06:18:02 +00:00
partMeta , err := t . core . MetaTable . GetPartitionByID ( coll . ID , partID , 0 )
2021-01-20 01:36:50 +00:00
if err != nil {
return err
}
t . Rsp . PartitionIDs = append ( t . Rsp . PartitionIDs , partMeta . PartitionID )
t . Rsp . PartitionNames = append ( t . Rsp . PartitionNames , partMeta . PartitionName )
}
2021-01-19 06:44:03 +00:00
return nil
}
2021-01-21 02:01:29 +00:00
type DescribeSegmentReqTask struct {
baseReqTask
Req * milvuspb . DescribeSegmentRequest
Rsp * milvuspb . DescribeSegmentResponse //TODO,return repeated segment id in the future
}
func ( t * DescribeSegmentReqTask ) Type ( ) commonpb . MsgType {
return t . Req . Base . MsgType
}
2021-04-08 07:26:18 +00:00
func ( t * DescribeSegmentReqTask ) Execute ( ctx context . Context ) error {
if t . Type ( ) != commonpb . MsgType_DescribeSegment {
return fmt . Errorf ( "describe segment, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-05-18 06:18:02 +00:00
coll , err := t . core . MetaTable . GetCollectionByID ( t . Req . CollectionID , 0 )
2021-01-21 02:01:29 +00:00
if err != nil {
return err
}
exist := false
for _ , partID := range coll . PartitionIDs {
if exist {
break
}
2021-05-18 06:18:02 +00:00
partMeta , err := t . core . MetaTable . GetPartitionByID ( coll . ID , partID , 0 )
2021-01-21 02:01:29 +00:00
if err != nil {
return err
}
for _ , e := range partMeta . SegmentIDs {
if e == t . Req . SegmentID {
exist = true
break
}
}
}
if ! exist {
2021-03-05 02:15:27 +00:00
return fmt . Errorf ( "segment id %d not belong to collection id %d" , t . Req . SegmentID , t . Req . CollectionID )
2021-01-21 02:01:29 +00:00
}
//TODO, get filed_id and index_name from request
segIdxInfo , err := t . core . MetaTable . GetSegmentIndexInfoByID ( t . Req . SegmentID , - 1 , "" )
2021-06-23 07:28:09 +00:00
log . Debug ( "RootCoord DescribeSegmentReqTask, MetaTable.GetSegmentIndexInfoByID" , zap . Any ( "SegmentID" , t . Req . SegmentID ) ,
2021-06-06 01:41:35 +00:00
zap . Any ( "segIdxInfo" , segIdxInfo ) , zap . Error ( err ) )
2021-01-21 02:01:29 +00:00
if err != nil {
return err
}
t . Rsp . IndexID = segIdxInfo . IndexID
2021-01-28 09:25:43 +00:00
t . Rsp . BuildID = segIdxInfo . BuildID
2021-03-08 07:46:51 +00:00
t . Rsp . EnableIndex = segIdxInfo . EnableIndex
2021-01-21 02:01:29 +00:00
return nil
}
type ShowSegmentReqTask struct {
baseReqTask
2021-03-12 06:22:09 +00:00
Req * milvuspb . ShowSegmentsRequest
Rsp * milvuspb . ShowSegmentsResponse
2021-01-21 02:01:29 +00:00
}
func ( t * ShowSegmentReqTask ) Type ( ) commonpb . MsgType {
return t . Req . Base . MsgType
}
2021-04-08 07:26:18 +00:00
func ( t * ShowSegmentReqTask ) Execute ( ctx context . Context ) error {
if t . Type ( ) != commonpb . MsgType_ShowSegments {
return fmt . Errorf ( "show segments, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-05-18 06:18:02 +00:00
coll , err := t . core . MetaTable . GetCollectionByID ( t . Req . CollectionID , 0 )
2021-01-21 02:01:29 +00:00
if err != nil {
return err
}
2021-02-22 08:40:26 +00:00
exist := false
2021-01-21 02:01:29 +00:00
for _ , partID := range coll . PartitionIDs {
2021-02-22 08:40:26 +00:00
if partID == t . Req . PartitionID {
exist = true
break
2021-01-21 02:01:29 +00:00
}
}
2021-02-22 08:40:26 +00:00
if ! exist {
2021-03-05 02:15:27 +00:00
return fmt . Errorf ( "partition id = %d not belong to collection id = %d" , t . Req . PartitionID , t . Req . CollectionID )
2021-02-22 08:40:26 +00:00
}
2021-05-18 06:18:02 +00:00
partMeta , err := t . core . MetaTable . GetPartitionByID ( coll . ID , t . Req . PartitionID , 0 )
2021-02-22 08:40:26 +00:00
if err != nil {
return err
}
t . Rsp . SegmentIDs = append ( t . Rsp . SegmentIDs , partMeta . SegmentIDs ... )
2021-01-21 02:01:29 +00:00
return nil
}
type CreateIndexReqTask struct {
baseReqTask
Req * milvuspb . CreateIndexRequest
}
func ( t * CreateIndexReqTask ) Type ( ) commonpb . MsgType {
return t . Req . Base . MsgType
}
2021-03-13 06:42:53 +00:00
func ( t * CreateIndexReqTask ) Execute ( ctx context . Context ) error {
2021-04-08 07:26:18 +00:00
if t . Type ( ) != commonpb . MsgType_CreateIndex {
return fmt . Errorf ( "create index, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-02-08 06:49:12 +00:00
indexName := Params . DefaultIndexName //TODO, get name from request
2021-05-20 06:14:14 +00:00
indexID , _ , err := t . core . IDAllocator ( 1 )
2021-06-23 07:28:09 +00:00
log . Debug ( "RootCoord CreateIndexReqTask" , zap . Any ( "indexID" , indexID ) , zap . Error ( err ) )
2021-02-09 05:11:55 +00:00
if err != nil {
return err
}
idxInfo := & etcdpb . IndexInfo {
IndexName : indexName ,
IndexID : indexID ,
IndexParams : t . Req . ExtraParams ,
}
segIDs , field , err := t . core . MetaTable . GetNotIndexedSegments ( t . Req . CollectionName , t . Req . FieldName , idxInfo )
2021-06-23 07:28:09 +00:00
log . Debug ( "RootCoord CreateIndexReqTask metaTable.GetNotIndexedSegments" , zap . Error ( err ) )
2021-01-21 02:01:29 +00:00
if err != nil {
return err
}
2021-03-12 06:22:09 +00:00
if field . DataType != schemapb . DataType_FloatVector && field . DataType != schemapb . DataType_BinaryVector {
2021-03-05 02:15:27 +00:00
return fmt . Errorf ( "field name = %s, data type = %s" , t . Req . FieldName , schemapb . DataType_name [ int32 ( field . DataType ) ] )
2021-01-22 07:41:54 +00:00
}
2021-05-24 06:19:52 +00:00
var segIdxInfos [ ] * etcdpb . SegmentIndexInfo
2021-05-15 10:08:08 +00:00
for _ , segID := range segIDs {
2021-05-24 06:19:52 +00:00
info := etcdpb . SegmentIndexInfo {
SegmentID : segID ,
FieldID : field . FieldID ,
IndexID : idxInfo . IndexID ,
EnableIndex : false ,
}
2021-06-30 08:18:13 +00:00
info . BuildID , err = t . core . BuildIndex ( ctx , segID , & field , idxInfo , false )
2021-05-24 06:19:52 +00:00
if err != nil {
2021-05-15 10:08:08 +00:00
return err
2021-01-21 02:01:29 +00:00
}
2021-06-06 01:41:35 +00:00
if info . BuildID != 0 {
info . EnableIndex = true
}
2021-05-24 06:19:52 +00:00
segIdxInfos = append ( segIdxInfos , & info )
2021-01-21 02:01:29 +00:00
}
2021-05-24 06:19:52 +00:00
_ , err = t . core . MetaTable . AddIndex ( segIdxInfos , "" , "" )
2021-06-23 07:28:09 +00:00
log . Debug ( "RootCoord CreateIndexReq" , zap . Any ( "segIdxInfos" , segIdxInfos ) , zap . Error ( err ) )
2021-05-24 06:19:52 +00:00
return err
2021-01-21 02:01:29 +00:00
}
type DescribeIndexReqTask struct {
baseReqTask
Req * milvuspb . DescribeIndexRequest
Rsp * milvuspb . DescribeIndexResponse
}
func ( t * DescribeIndexReqTask ) Type ( ) commonpb . MsgType {
return t . Req . Base . MsgType
}
2021-04-08 07:26:18 +00:00
func ( t * DescribeIndexReqTask ) Execute ( ctx context . Context ) error {
if t . Type ( ) != commonpb . MsgType_DescribeIndex {
return fmt . Errorf ( "describe index, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-04-27 02:30:55 +00:00
coll , idx , err := t . core . MetaTable . GetIndexByName ( t . Req . CollectionName , t . Req . IndexName )
2021-01-21 02:01:29 +00:00
if err != nil {
return err
}
for _ , i := range idx {
2021-04-27 02:30:55 +00:00
f , err := GetFieldSchemaByIndexID ( & coll , typeutil . UniqueID ( i . IndexID ) )
if err != nil {
log . Warn ( "get field schema by index id failed" , zap . String ( "collection name" , t . Req . CollectionName ) , zap . String ( "index name" , t . Req . IndexName ) , zap . Error ( err ) )
continue
}
2021-01-21 02:01:29 +00:00
desc := & milvuspb . IndexDescription {
IndexName : i . IndexName ,
Params : i . IndexParams ,
2021-02-08 06:20:29 +00:00
IndexID : i . IndexID ,
2021-04-27 02:30:55 +00:00
FieldName : f . Name ,
2021-01-21 02:01:29 +00:00
}
t . Rsp . IndexDescriptions = append ( t . Rsp . IndexDescriptions , desc )
}
return nil
}
2021-02-20 07:38:44 +00:00
type DropIndexReqTask struct {
baseReqTask
Req * milvuspb . DropIndexRequest
}
func ( t * DropIndexReqTask ) Type ( ) commonpb . MsgType {
return t . Req . Base . MsgType
}
2021-03-13 06:42:53 +00:00
func ( t * DropIndexReqTask ) Execute ( ctx context . Context ) error {
2021-04-08 07:26:18 +00:00
if t . Type ( ) != commonpb . MsgType_DropIndex {
return fmt . Errorf ( "drop index, msg type = %s" , commonpb . MsgType_name [ int32 ( t . Type ( ) ) ] )
}
2021-04-27 02:30:55 +00:00
_ , info , err := t . core . MetaTable . GetIndexByName ( t . Req . CollectionName , t . Req . IndexName )
2021-02-20 07:38:44 +00:00
if err != nil {
2021-03-04 06:54:16 +00:00
log . Warn ( "GetIndexByName failed," , zap . String ( "collection name" , t . Req . CollectionName ) , zap . String ( "field name" , t . Req . FieldName ) , zap . String ( "index name" , t . Req . IndexName ) , zap . Error ( err ) )
2021-02-20 07:38:44 +00:00
return err
}
2021-03-04 06:54:16 +00:00
if len ( info ) == 0 {
return nil
2021-02-20 07:38:44 +00:00
}
2021-03-04 06:54:16 +00:00
if len ( info ) != 1 {
2021-03-05 02:15:27 +00:00
return fmt . Errorf ( "len(index) = %d" , len ( info ) )
2021-03-04 06:54:16 +00:00
}
2021-05-25 06:03:06 +00:00
err = t . core . CallDropIndexService ( ctx , info [ 0 ] . IndexID )
2021-03-04 06:54:16 +00:00
if err != nil {
return err
}
2021-05-18 06:18:02 +00:00
_ , _ , _ , err = t . core . MetaTable . DropIndex ( t . Req . CollectionName , t . Req . FieldName , t . Req . IndexName )
2021-03-04 06:54:16 +00:00
return err
2021-02-20 07:38:44 +00:00
}