Clean golint warnings for rootcoord (#8383)

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
pull/8392/head
Cai Yudong 2021-09-23 15:10:00 +08:00 committed by GitHub
parent aef2a73846
commit 177a465913
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 217 additions and 66 deletions

View File

@ -21,9 +21,18 @@ import "github.com/milvus-io/milvus/internal/common"
// 102: ...
const (
// StartOfUserFieldID id of user defined filed begin from here
StartOfUserFieldID = common.StartOfUserFieldID
RowIDField = common.RowIDField
TimeStampField = common.TimeStampField
RowIDFieldName = common.RowIDFieldName
// RowIDField id of row ID field
RowIDField = common.RowIDField
// TimeStampField id of timestamp field
TimeStampField = common.TimeStampField
// RowIDFieldName name of row ID field
RowIDFieldName = common.RowIDFieldName
// TimeStampFieldName name of timestamp field
TimeStampFieldName = common.TimeStampFieldName
)

View File

@ -26,6 +26,7 @@ import (
)
const (
// RequestTimeout timeout for request
RequestTimeout = 10 * time.Second
)

View File

@ -29,29 +29,60 @@ import (
)
const (
ComponentPrefix = "root-coord"
TenantMetaPrefix = ComponentPrefix + "/tenant"
ProxyMetaPrefix = ComponentPrefix + "/proxy"
CollectionMetaPrefix = ComponentPrefix + "/collection"
SegmentIndexMetaPrefix = ComponentPrefix + "/segment-index"
IndexMetaPrefix = ComponentPrefix + "/index"
// ComponentPrefix prefix for rootcoord component
ComponentPrefix = "root-coord"
// TenantMetaPrefix prefix for tenant meta
TenantMetaPrefix = ComponentPrefix + "/tenant"
// ProxyMetaPrefix prefix for proxy meta
ProxyMetaPrefix = ComponentPrefix + "/proxy"
// CollectionMetaPrefix prefix for collection meta
CollectionMetaPrefix = ComponentPrefix + "/collection"
// SegmentIndexMetaPrefix prefix for segment index meta
SegmentIndexMetaPrefix = ComponentPrefix + "/segment-index"
// IndexMetaPrefix prefix for index meta
IndexMetaPrefix = ComponentPrefix + "/index"
// CollectionAliasMetaPrefix prefix for collection alias meta
CollectionAliasMetaPrefix = ComponentPrefix + "/collection-alias"
// TimestampPrefix prefix for timestamp
TimestampPrefix = ComponentPrefix + "/timestamp"
// DDOperationPrefix prefix for DD operation
DDOperationPrefix = ComponentPrefix + "/dd-operation"
DDMsgSendPrefix = ComponentPrefix + "/dd-msg-send"
// DDMsgSendPrefix prefix to indicate whether DD msg has been send
DDMsgSendPrefix = ComponentPrefix + "/dd-msg-send"
// CreateCollectionDDType name of DD type for create collection
CreateCollectionDDType = "CreateCollection"
DropCollectionDDType = "DropCollection"
CreatePartitionDDType = "CreatePartition"
DropPartitionDDType = "DropPartition"
CreateAliasDDType = "CreateAlias"
DropAliasDDType = "DropAlias"
AlterAliasDDType = "AlterAlias"
// DropCollectionDDType name of DD type for drop collection
DropCollectionDDType = "DropCollection"
// CreatePartitionDDType name of DD type for create partition
CreatePartitionDDType = "CreatePartition"
// DropPartitionDDType name of DD type for drop partition
DropPartitionDDType = "DropPartition"
// CreateAliasDDType name of DD type for create collection alias
CreateAliasDDType = "CreateAlias"
// DropAliasDDType name of DD type for drop collection alias
DropAliasDDType = "DropAlias"
// AlterAliasDDType name of DD type for alter collection alias
AlterAliasDDType = "AlterAlias"
)
type metaTable struct {
// MetaTable store all rootcoord meta info
type MetaTable struct {
client kv.SnapShotKV // client of a reliable kv service, i.e. etcd client
tenantID2Meta map[typeutil.UniqueID]pb.TenantMeta // tenant id to tenant meta
proxyID2Meta map[typeutil.UniqueID]pb.ProxyMeta // proxy id to proxy meta
@ -67,8 +98,10 @@ type metaTable struct {
ddLock sync.RWMutex
}
func NewMetaTable(kv kv.SnapShotKV) (*metaTable, error) {
mt := &metaTable{
// NewMetaTable create meta table for rootcoord, which stores all in-memory information
// for collection, partion, segment, index etc.
func NewMetaTable(kv kv.SnapShotKV) (*MetaTable, error) {
mt := &MetaTable{
client: kv,
tenantLock: sync.RWMutex{},
proxyLock: sync.RWMutex{},
@ -81,8 +114,7 @@ func NewMetaTable(kv kv.SnapShotKV) (*metaTable, error) {
return mt, nil
}
func (mt *metaTable) reloadFromKV() error {
func (mt *MetaTable) reloadFromKV() error {
mt.tenantID2Meta = make(map[typeutil.UniqueID]pb.TenantMeta)
mt.proxyID2Meta = make(map[typeutil.UniqueID]pb.ProxyMeta)
mt.collID2Meta = make(map[typeutil.UniqueID]pb.CollectionInfo)
@ -197,7 +229,7 @@ func (mt *metaTable) reloadFromKV() error {
return nil
}
func (mt *metaTable) getAdditionKV(op func(ts typeutil.Timestamp) (string, error), meta map[string]string) func(ts typeutil.Timestamp) (string, string, error) {
func (mt *MetaTable) getAdditionKV(op func(ts typeutil.Timestamp) (string, error), meta map[string]string) func(ts typeutil.Timestamp) (string, string, error) {
if op == nil {
return nil
}
@ -211,7 +243,8 @@ func (mt *metaTable) getAdditionKV(op func(ts typeutil.Timestamp) (string, error
}
}
func (mt *metaTable) AddTenant(te *pb.TenantMeta, ts typeutil.Timestamp) error {
// AddTenant add tenant
func (mt *MetaTable) AddTenant(te *pb.TenantMeta, ts typeutil.Timestamp) error {
mt.tenantLock.Lock()
defer mt.tenantLock.Unlock()
@ -227,7 +260,8 @@ func (mt *metaTable) AddTenant(te *pb.TenantMeta, ts typeutil.Timestamp) error {
return nil
}
func (mt *metaTable) AddProxy(po *pb.ProxyMeta, ts typeutil.Timestamp) error {
// AddProxy add proxy
func (mt *MetaTable) AddProxy(po *pb.ProxyMeta, ts typeutil.Timestamp) error {
mt.proxyLock.Lock()
defer mt.proxyLock.Unlock()
@ -243,7 +277,8 @@ func (mt *metaTable) AddProxy(po *pb.ProxyMeta, ts typeutil.Timestamp) error {
return nil
}
func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, ts typeutil.Timestamp, idx []*pb.IndexInfo, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
// AddCollection add collection
func (mt *MetaTable) AddCollection(coll *pb.CollectionInfo, ts typeutil.Timestamp, idx []*pb.IndexInfo, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
@ -295,7 +330,8 @@ func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, ts typeutil.Timestam
return nil
}
func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
// DeleteCollection delete collection
func (mt *MetaTable) DeleteCollection(collID typeutil.UniqueID, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
@ -362,7 +398,8 @@ func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID, ts typeutil.Time
return nil
}
func (mt *metaTable) HasCollection(collID typeutil.UniqueID, ts typeutil.Timestamp) bool {
// HasCollection return collection existence
func (mt *MetaTable) HasCollection(collID typeutil.UniqueID, ts typeutil.Timestamp) bool {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
if ts == 0 {
@ -374,7 +411,8 @@ func (mt *metaTable) HasCollection(collID typeutil.UniqueID, ts typeutil.Timesta
return err == nil
}
func (mt *metaTable) GetCollectionByID(collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*pb.CollectionInfo, error) {
// GetCollectionByID return collection meta by collection id
func (mt *MetaTable) GetCollectionByID(collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*pb.CollectionInfo, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
@ -399,7 +437,8 @@ func (mt *metaTable) GetCollectionByID(collectionID typeutil.UniqueID, ts typeut
return &colMeta, nil
}
func (mt *metaTable) GetCollectionByName(collectionName string, ts typeutil.Timestamp) (*pb.CollectionInfo, error) {
// GetCollectionByName return collection meta by collection name
func (mt *MetaTable) GetCollectionByName(collectionName string, ts typeutil.Timestamp) (*pb.CollectionInfo, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
@ -435,7 +474,8 @@ func (mt *metaTable) GetCollectionByName(collectionName string, ts typeutil.Time
return nil, fmt.Errorf("can't find collection: %s, at timestamp = %d", collectionName, ts)
}
func (mt *metaTable) ListCollections(ts typeutil.Timestamp) (map[string]*pb.CollectionInfo, error) {
// ListCollections list all collection names
func (mt *MetaTable) ListCollections(ts typeutil.Timestamp) (map[string]*pb.CollectionInfo, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
colls := make(map[string]*pb.CollectionInfo)
@ -464,7 +504,8 @@ func (mt *metaTable) ListCollections(ts typeutil.Timestamp) (map[string]*pb.Coll
return colls, nil
}
func (mt *metaTable) ListAliases(collID typeutil.UniqueID) []string {
// ListAliases list all collection aliases
func (mt *MetaTable) ListAliases(collID typeutil.UniqueID) []string {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
var aliases []string
@ -476,8 +517,8 @@ func (mt *metaTable) ListAliases(collID typeutil.UniqueID) []string {
return aliases
}
// ListCollectionVirtualChannels list virtual channel of all the collection
func (mt *metaTable) ListCollectionVirtualChannels() []string {
// ListCollectionVirtualChannels list virtual channels of all collections
func (mt *MetaTable) ListCollectionVirtualChannels() []string {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
vlist := []string{}
@ -488,8 +529,8 @@ func (mt *metaTable) ListCollectionVirtualChannels() []string {
return vlist
}
// ListCollectionPhysicalChannels list physical channel of all the collection
func (mt *metaTable) ListCollectionPhysicalChannels() []string {
// ListCollectionPhysicalChannels list physical channels of all collections
func (mt *MetaTable) ListCollectionPhysicalChannels() []string {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
plist := []string{}
@ -500,7 +541,8 @@ func (mt *metaTable) ListCollectionPhysicalChannels() []string {
return plist
}
func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
// AddPartition add partition
func (mt *MetaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
coll, ok := mt.collID2Meta[collID]
@ -560,7 +602,8 @@ func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string
return nil
}
func (mt *metaTable) GetPartitionNameByID(collID, partitionID typeutil.UniqueID, ts typeutil.Timestamp) (string, error) {
// GetPartitionNameByID return partition name by partition id
func (mt *MetaTable) GetPartitionNameByID(collID, partitionID typeutil.UniqueID, ts typeutil.Timestamp) (string, error) {
if ts == 0 {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
@ -593,7 +636,7 @@ func (mt *metaTable) GetPartitionNameByID(collID, partitionID typeutil.UniqueID,
return "", fmt.Errorf("partition %d does not exist", partitionID)
}
func (mt *metaTable) getPartitionByName(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) (typeutil.UniqueID, error) {
func (mt *MetaTable) getPartitionByName(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) (typeutil.UniqueID, error) {
if ts == 0 {
collMeta, ok := mt.collID2Meta[collID]
if !ok {
@ -624,20 +667,23 @@ func (mt *metaTable) getPartitionByName(collID typeutil.UniqueID, partitionName
return 0, fmt.Errorf("partition %s does not exist", partitionName)
}
func (mt *metaTable) GetPartitionByName(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) (typeutil.UniqueID, error) {
// GetPartitionByName return partition id by partition name
func (mt *MetaTable) GetPartitionByName(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) (typeutil.UniqueID, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
return mt.getPartitionByName(collID, partitionName, ts)
}
func (mt *metaTable) HasPartition(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) bool {
// HasPartition check partition existence
func (mt *MetaTable) HasPartition(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) bool {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
_, err := mt.getPartitionByName(collID, partitionName, ts)
return err == nil
}
func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.UniqueID, error) {
// DeletePartition delete partition
func (mt *MetaTable) DeletePartition(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.UniqueID, error) {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
@ -703,7 +749,8 @@ func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName str
return partID, nil
}
func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo, ts typeutil.Timestamp) error {
// AddIndex add index
func (mt *MetaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo, ts typeutil.Timestamp) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
@ -757,8 +804,8 @@ func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo, ts typeutil.Times
return nil
}
//return timestamp, index id, is dropped, error
func (mt *metaTable) DropIndex(collName, fieldName, indexName string, ts typeutil.Timestamp) (typeutil.UniqueID, bool, error) {
// DropIndex drop index
func (mt *MetaTable) DropIndex(collName, fieldName, indexName string, ts typeutil.Timestamp) (typeutil.UniqueID, bool, error) {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
@ -832,7 +879,8 @@ func (mt *metaTable) DropIndex(collName, fieldName, indexName string, ts typeuti
return dropIdxID, true, nil
}
func (mt *metaTable) GetSegmentIndexInfoByID(segID typeutil.UniqueID, filedID int64, idxName string) (pb.SegmentIndexInfo, error) {
// GetSegmentIndexInfoByID return segment index info by segment id
func (mt *MetaTable) GetSegmentIndexInfoByID(segID typeutil.UniqueID, filedID int64, idxName string) (pb.SegmentIndexInfo, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
@ -874,14 +922,15 @@ func (mt *metaTable) GetSegmentIndexInfoByID(segID typeutil.UniqueID, filedID in
return pb.SegmentIndexInfo{}, fmt.Errorf("can't find index name = %s on segment = %d, with filed id = %d", idxName, segID, filedID)
}
func (mt *metaTable) GetFieldSchema(collName string, fieldName string) (schemapb.FieldSchema, error) {
// GetFieldSchema return field schema
func (mt *MetaTable) GetFieldSchema(collName string, fieldName string) (schemapb.FieldSchema, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
return mt.unlockGetFieldSchema(collName, fieldName)
}
func (mt *metaTable) unlockGetFieldSchema(collName string, fieldName string) (schemapb.FieldSchema, error) {
func (mt *MetaTable) unlockGetFieldSchema(collName string, fieldName string) (schemapb.FieldSchema, error) {
collID, ok := mt.collName2ID[collName]
if !ok {
return schemapb.FieldSchema{}, fmt.Errorf("collection %s not found", collName)
@ -899,14 +948,14 @@ func (mt *metaTable) unlockGetFieldSchema(collName string, fieldName string) (sc
return schemapb.FieldSchema{}, fmt.Errorf("collection %s doesn't have filed %s", collName, fieldName)
}
//return true/false
func (mt *metaTable) IsSegmentIndexed(segID typeutil.UniqueID, fieldSchema *schemapb.FieldSchema, indexParams []*commonpb.KeyValuePair) bool {
// IsSegmentIndexed check if segment has index
func (mt *MetaTable) IsSegmentIndexed(segID typeutil.UniqueID, fieldSchema *schemapb.FieldSchema, indexParams []*commonpb.KeyValuePair) bool {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
return mt.unlockIsSegmentIndexed(segID, fieldSchema, indexParams)
}
func (mt *metaTable) unlockIsSegmentIndexed(segID typeutil.UniqueID, fieldSchema *schemapb.FieldSchema, indexParams []*commonpb.KeyValuePair) bool {
func (mt *MetaTable) unlockIsSegmentIndexed(segID typeutil.UniqueID, fieldSchema *schemapb.FieldSchema, indexParams []*commonpb.KeyValuePair) bool {
segIdx, ok := mt.segID2IndexMeta[segID]
if !ok {
return false
@ -928,8 +977,8 @@ func (mt *metaTable) unlockIsSegmentIndexed(segID typeutil.UniqueID, fieldSchema
return exist
}
// return segment ids, type params, error
func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, idxInfo *pb.IndexInfo, segIDs []typeutil.UniqueID, ts typeutil.Timestamp) ([]typeutil.UniqueID, schemapb.FieldSchema, error) {
// GetNotIndexedSegments return segment ids which have no index
func (mt *MetaTable) GetNotIndexedSegments(collName string, fieldName string, idxInfo *pb.IndexInfo, segIDs []typeutil.UniqueID, ts typeutil.Timestamp) ([]typeutil.UniqueID, schemapb.FieldSchema, error) {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
@ -1035,7 +1084,8 @@ func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, id
return rstID, fieldSchema, nil
}
func (mt *metaTable) GetIndexByName(collName, indexName string) (pb.CollectionInfo, []pb.IndexInfo, error) {
// GetIndexByName return index info by index name
func (mt *MetaTable) GetIndexByName(collName, indexName string) (pb.CollectionInfo, []pb.IndexInfo, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
@ -1061,7 +1111,8 @@ func (mt *metaTable) GetIndexByName(collName, indexName string) (pb.CollectionIn
return collMeta, rstIndex, nil
}
func (mt *metaTable) GetIndexByID(indexID typeutil.UniqueID) (*pb.IndexInfo, error) {
// GetIndexByID return index info by index id
func (mt *MetaTable) GetIndexByID(indexID typeutil.UniqueID) (*pb.IndexInfo, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
@ -1072,7 +1123,7 @@ func (mt *metaTable) GetIndexByID(indexID typeutil.UniqueID) (*pb.IndexInfo, err
return &indexInfo, nil
}
func (mt *metaTable) dupMeta() (
func (mt *MetaTable) dupMeta() (
map[typeutil.UniqueID]pb.CollectionInfo,
map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo,
map[typeutil.UniqueID]pb.IndexInfo,
@ -1098,7 +1149,8 @@ func (mt *metaTable) dupMeta() (
return collID2Meta, segID2IndexMeta, indexID2Meta
}
func (mt *metaTable) AddAlias(collectionAlias string, collectionName string,
// AddAlias add collection alias
func (mt *MetaTable) AddAlias(collectionAlias string, collectionName string,
ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
@ -1133,7 +1185,8 @@ func (mt *metaTable) AddAlias(collectionAlias string, collectionName string,
return nil
}
func (mt *metaTable) DeleteAlias(collectionAlias string, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
// DeleteAlias delete collection alias
func (mt *MetaTable) DeleteAlias(collectionAlias string, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
if _, ok := mt.collAlias2ID[collectionAlias]; !ok {
@ -1154,7 +1207,8 @@ func (mt *metaTable) DeleteAlias(collectionAlias string, ts typeutil.Timestamp,
return nil
}
func (mt *metaTable) AlterAlias(collectionAlias string, collectionName string, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
// AlterAlias alter collection alias
func (mt *MetaTable) AlterAlias(collectionAlias string, collectionName string, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
if _, ok := mt.collAlias2ID[collectionAlias]; !ok {

View File

@ -21,9 +21,11 @@ import (
"github.com/milvus-io/milvus/internal/util/paramtable"
)
// Params globle params
var Params ParamTable
var once sync.Once
// ParamTable structure stores all params
type ParamTable struct {
paramtable.BaseTable
@ -58,12 +60,14 @@ type ParamTable struct {
RoleName string
}
// InitOnce initialize once
func (p *ParamTable) InitOnce() {
once.Do(func() {
p.Init()
})
}
// Init initialize param table
func (p *ParamTable) Init() {
// load yaml
p.BaseTable.Init()

View File

@ -74,7 +74,7 @@ func metricProxy(v int64) string {
// Core root coordinator core
type Core struct {
MetaTable *metaTable
MetaTable *MetaTable
//id allocator
IDAllocator func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error)
IDAllocatorUpdate func() error
@ -159,6 +159,7 @@ type Core struct {
// --------------------- function --------------------------
// NewCore create rootcoord core
func NewCore(c context.Context, factory ms.Factory) (*Core, error) {
ctx, cancel := context.WithCancel(c)
rand.Seed(time.Now().UnixNano())
@ -172,6 +173,7 @@ func NewCore(c context.Context, factory ms.Factory) (*Core, error) {
return core, nil
}
// UpdateStateCode update state code
func (c *Core) UpdateStateCode(code internalpb.StateCode) {
c.stateCode.Store(code)
}
@ -568,7 +570,7 @@ func (c *Core) setMsgStreams() error {
return nil
}
//SetNewProxyClient create proxy node by this func
// SetNewProxyClient set client to create proxy
func (c *Core) SetNewProxyClient(f func(sess *sessionutil.Session) (types.Proxy, error)) {
if c.NewProxyClient == nil {
c.NewProxyClient = f
@ -577,6 +579,7 @@ func (c *Core) SetNewProxyClient(f func(sess *sessionutil.Session) (types.Proxy,
}
}
// SetDataCoord set datacoord
func (c *Core) SetDataCoord(ctx context.Context, s types.DataCoord) error {
initCh := make(chan struct{})
go func() {
@ -692,6 +695,7 @@ func (c *Core) SetDataCoord(ctx context.Context, s types.DataCoord) error {
return nil
}
// SetIndexCoord set indexcoord
func (c *Core) SetIndexCoord(s types.IndexCoord) error {
initCh := make(chan struct{})
go func() {
@ -752,6 +756,7 @@ func (c *Core) SetIndexCoord(s types.IndexCoord) error {
return nil
}
// SetQueryCoord set querycoord
func (c *Core) SetQueryCoord(s types.QueryCoord) error {
initCh := make(chan struct{})
go func() {
@ -861,6 +866,7 @@ func (c *Core) Register() error {
return nil
}
// Init initialize routine
func (c *Core) Init() error {
var initError error = nil
if c.kvBaseCreate == nil {
@ -1090,6 +1096,7 @@ func (c *Core) reSendDdMsg(ctx context.Context, force bool) error {
return c.setDdMsgSendFlag(true)
}
// Start start rootcoord
func (c *Core) Start() error {
if err := c.checkInit(); err != nil {
log.Debug("RootCoord Start checkInit failed", zap.Error(err))
@ -1126,6 +1133,7 @@ func (c *Core) Start() error {
return nil
}
// Stop stop rootcoord
func (c *Core) Stop() error {
c.cancel()
c.wg.Wait()
@ -1133,6 +1141,7 @@ func (c *Core) Stop() error {
return nil
}
// GetComponentStates get states of components
func (c *Core) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
code := c.stateCode.Load().(internalpb.StateCode)
log.Debug("GetComponentStates", zap.String("State Code", internalpb.StateCode_name[int32(code)]))
@ -1159,6 +1168,7 @@ func (c *Core) GetComponentStates(ctx context.Context) (*internalpb.ComponentSta
}, nil
}
// GetTimeTickChannel get timetick channel name
func (c *Core) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
@ -1169,6 +1179,7 @@ func (c *Core) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse
}, nil
}
// GetStatisticsChannel get statistics channel name
func (c *Core) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
@ -1179,6 +1190,7 @@ func (c *Core) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringRespon
}, nil
}
// CreateCollection create collection
func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
metrics.RootCoordCreateCollectionCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc()
code := c.stateCode.Load().(internalpb.StateCode)
@ -1212,6 +1224,7 @@ func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollecti
}, nil
}
// DropCollection drop collection
func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
metrics.RootCoordDropCollectionCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc()
code := c.stateCode.Load().(internalpb.StateCode)
@ -1245,6 +1258,7 @@ func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRe
}, nil
}
// HasCollection check collection existence
func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
metrics.RootCoordHasCollectionCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc()
code := c.stateCode.Load().(internalpb.StateCode)
@ -1288,6 +1302,7 @@ func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequ
}, nil
}
// DescribeCollection return collection info
func (c *Core) DescribeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
metrics.RootCoordDescribeCollectionCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc()
code := c.stateCode.Load().(internalpb.StateCode)
@ -1331,6 +1346,7 @@ func (c *Core) DescribeCollection(ctx context.Context, in *milvuspb.DescribeColl
return t.Rsp, nil
}
// ShowCollections list all collection names
func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
metrics.RootCoordShowCollectionsCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc()
code := c.stateCode.Load().(internalpb.StateCode)
@ -1376,6 +1392,7 @@ func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollections
return t.Rsp, nil
}
// CreatePartition create partition
func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
metrics.RootCoordCreatePartitionCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc()
code := c.stateCode.Load().(internalpb.StateCode)
@ -1410,6 +1427,7 @@ func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartition
}, nil
}
// DropPartition drop partition
func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
metrics.RootCoordDropPartitionCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc()
code := c.stateCode.Load().(internalpb.StateCode)
@ -1444,6 +1462,7 @@ func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequ
}, nil
}
// HasPartition check partition existence
func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
metrics.RootCoordHasPartitionCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc()
code := c.stateCode.Load().(internalpb.StateCode)
@ -1487,6 +1506,7 @@ func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionReques
}, nil
}
// ShowPartitions list all partition names
func (c *Core) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
metrics.RootCoordShowPartitionsCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc()
log.Debug("ShowPartitionRequest received", zap.String("role", Params.RoleName), zap.Int64("msgID", in.Base.MsgID),
@ -1536,6 +1556,7 @@ func (c *Core) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRe
return t.Rsp, nil
}
// CreateIndex create index
func (c *Core) CreateIndex(ctx context.Context, in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
metrics.RootCoordCreateIndexCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc()
code := c.stateCode.Load().(internalpb.StateCode)
@ -1571,6 +1592,7 @@ func (c *Core) CreateIndex(ctx context.Context, in *milvuspb.CreateIndexRequest)
}, nil
}
// DescribeIndex return index info
func (c *Core) DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
metrics.RootCoordDescribeIndexCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc()
code := c.stateCode.Load().(internalpb.StateCode)
@ -1626,6 +1648,7 @@ func (c *Core) DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequ
return t.Rsp, nil
}
// DropIndex drop index
func (c *Core) DropIndex(ctx context.Context, in *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
metrics.RootCoordDropIndexCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc()
code := c.stateCode.Load().(internalpb.StateCode)
@ -1659,6 +1682,7 @@ func (c *Core) DropIndex(ctx context.Context, in *milvuspb.DropIndexRequest) (*c
}, nil
}
// DescribeSegment return segment info
func (c *Core) DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) {
metrics.RootCoordDescribeSegmentCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc()
code := c.stateCode.Load().(internalpb.StateCode)
@ -1703,6 +1727,7 @@ func (c *Core) DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegment
return t.Rsp, nil
}
// ShowSegments list all segments
func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error) {
metrics.RootCoordShowSegmentsCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc()
code := c.stateCode.Load().(internalpb.StateCode)
@ -1747,6 +1772,7 @@ func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsReques
return t.Rsp, nil
}
// AllocTimestamp alloc timestamp
func (c *Core) AllocTimestamp(ctx context.Context, in *rootcoordpb.AllocTimestampRequest) (*rootcoordpb.AllocTimestampResponse, error) {
code := c.stateCode.Load().(internalpb.StateCode)
if code != internalpb.StateCode_Healthy {
@ -1785,6 +1811,7 @@ func (c *Core) AllocTimestamp(ctx context.Context, in *rootcoordpb.AllocTimestam
}, nil
}
// AllocID alloc ids
func (c *Core) AllocID(ctx context.Context, in *rootcoordpb.AllocIDRequest) (*rootcoordpb.AllocIDResponse, error) {
code := c.stateCode.Load().(internalpb.StateCode)
if code != internalpb.StateCode_Healthy {
@ -1850,6 +1877,7 @@ func (c *Core) UpdateChannelTimeTick(ctx context.Context, in *internalpb.Channel
}, nil
}
// ReleaseDQLMessageStream release DQL msgstream
func (c *Core) ReleaseDQLMessageStream(ctx context.Context, in *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error) {
code := c.stateCode.Load().(internalpb.StateCode)
if code != internalpb.StateCode_Healthy {
@ -1861,6 +1889,7 @@ func (c *Core) ReleaseDQLMessageStream(ctx context.Context, in *proxypb.ReleaseD
return c.proxyClientManager.ReleaseDQLMessageStream(ctx, in)
}
// SegmentFlushCompleted check whether segment flush has completed
func (c *Core) SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlushCompletedMsg) (*commonpb.Status, error) {
code := c.stateCode.Load().(internalpb.StateCode)
if code != internalpb.StateCode_Healthy {
@ -1932,6 +1961,7 @@ func (c *Core) SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlus
}, nil
}
// GetMetrics get metrics
func (c *Core) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
log.Debug("RootCoord.GetMetrics",
zap.Int64("node_id", c.session.ServerID),
@ -2007,6 +2037,7 @@ func (c *Core) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest)
}, nil
}
// CreateAlias create collection alias
func (c *Core) CreateAlias(ctx context.Context, in *milvuspb.CreateAliasRequest) (*commonpb.Status, error) {
code := c.stateCode.Load().(internalpb.StateCode)
if code != internalpb.StateCode_Healthy {
@ -2038,6 +2069,7 @@ func (c *Core) CreateAlias(ctx context.Context, in *milvuspb.CreateAliasRequest)
}, nil
}
// DropAlias drop collection alias
func (c *Core) DropAlias(ctx context.Context, in *milvuspb.DropAliasRequest) (*commonpb.Status, error) {
code := c.stateCode.Load().(internalpb.StateCode)
if code != internalpb.StateCode_Healthy {
@ -2069,6 +2101,7 @@ func (c *Core) DropAlias(ctx context.Context, in *milvuspb.DropAliasRequest) (*c
}, nil
}
// AlterAlias alter collection alias
func (c *Core) AlterAlias(ctx context.Context, in *milvuspb.AlterAliasRequest) (*commonpb.Status, error) {
code := c.stateCode.Load().(internalpb.StateCode)
if code != internalpb.StateCode_Healthy {

View File

@ -2306,7 +2306,7 @@ func TestCheckInit(t *testing.T) {
err = c.checkInit()
assert.NotNil(t, err)
c.MetaTable = &metaTable{}
c.MetaTable = &MetaTable{}
err = c.checkInit()
assert.NotNil(t, err)

View File

@ -69,15 +69,18 @@ func executeTask(t reqTask) error {
}
}
// CreateCollectionReqTask create collection request task
type CreateCollectionReqTask struct {
baseReqTask
Req *milvuspb.CreateCollectionRequest
}
// Type return msg type
func (t *CreateCollectionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
// Execute task execution
func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_CreateCollection {
return fmt.Errorf("create collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
@ -221,15 +224,18 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
return t.core.setDdMsgSendFlag(true)
}
// DropCollectionReqTask drop collection request task
type DropCollectionReqTask struct {
baseReqTask
Req *milvuspb.DropCollectionRequest
}
// Type return msg type
func (t *DropCollectionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
// Execute task execution
func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_DropCollection {
return fmt.Errorf("drop collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
@ -320,16 +326,19 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
return t.core.setDdMsgSendFlag(true)
}
// HasCollectionReqTask has collection request task
type HasCollectionReqTask struct {
baseReqTask
Req *milvuspb.HasCollectionRequest
HasCollection bool
}
// Type return msg type
func (t *HasCollectionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
// Execute task execution
func (t *HasCollectionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_HasCollection {
return fmt.Errorf("has collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
@ -343,16 +352,19 @@ func (t *HasCollectionReqTask) Execute(ctx context.Context) error {
return nil
}
// DescribeCollectionReqTask describe collection request task
type DescribeCollectionReqTask struct {
baseReqTask
Req *milvuspb.DescribeCollectionRequest
Rsp *milvuspb.DescribeCollectionResponse
}
// Type return msg type
func (t *DescribeCollectionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
// Execute task execution
func (t *DescribeCollectionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_DescribeCollection {
return fmt.Errorf("describe collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
@ -388,16 +400,19 @@ func (t *DescribeCollectionReqTask) Execute(ctx context.Context) error {
return nil
}
// ShowCollectionReqTask show collection request task
type ShowCollectionReqTask struct {
baseReqTask
Req *milvuspb.ShowCollectionsRequest
Rsp *milvuspb.ShowCollectionsResponse
}
// Type return msg type
func (t *ShowCollectionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
// Execute task execution
func (t *ShowCollectionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_ShowCollections {
return fmt.Errorf("show collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
@ -416,15 +431,18 @@ func (t *ShowCollectionReqTask) Execute(ctx context.Context) error {
return nil
}
// CreatePartitionReqTask create partition request task
type CreatePartitionReqTask struct {
baseReqTask
Req *milvuspb.CreatePartitionRequest
}
// Type return msg type
func (t *CreatePartitionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
// Execute task execution
func (t *CreatePartitionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_CreatePartition {
return fmt.Errorf("create partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
@ -508,15 +526,18 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error {
return t.core.setDdMsgSendFlag(true)
}
// DropPartitionReqTask drop partition request task
type DropPartitionReqTask struct {
baseReqTask
Req *milvuspb.DropPartitionRequest
}
// Type return msg type
func (t *DropPartitionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
// Execute task execution
func (t *DropPartitionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_DropPartition {
return fmt.Errorf("drop partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
@ -606,16 +627,19 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error {
return t.core.setDdMsgSendFlag(true)
}
// HasPartitionReqTask has partition request task
type HasPartitionReqTask struct {
baseReqTask
Req *milvuspb.HasPartitionRequest
HasPartition bool
}
// Type return msg type
func (t *HasPartitionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
// Execute task execution
func (t *HasPartitionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_HasPartition {
return fmt.Errorf("has partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
@ -628,16 +652,19 @@ func (t *HasPartitionReqTask) Execute(ctx context.Context) error {
return nil
}
// ShowPartitionReqTask show partition request task
type ShowPartitionReqTask struct {
baseReqTask
Req *milvuspb.ShowPartitionsRequest
Rsp *milvuspb.ShowPartitionsResponse
}
// Type return msg type
func (t *ShowPartitionReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
// Execute task execution
func (t *ShowPartitionReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_ShowPartitions {
return fmt.Errorf("show partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
@ -664,16 +691,19 @@ func (t *ShowPartitionReqTask) Execute(ctx context.Context) error {
return nil
}
// DescribeSegmentReqTask describe segment request task
type DescribeSegmentReqTask struct {
baseReqTask
Req *milvuspb.DescribeSegmentRequest
Rsp *milvuspb.DescribeSegmentResponse //TODO,return repeated segment id in the future
}
// Type return msg type
func (t *DescribeSegmentReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
// Execute task execution
func (t *DescribeSegmentReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_DescribeSegment {
return fmt.Errorf("describe segment, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
@ -712,16 +742,19 @@ func (t *DescribeSegmentReqTask) Execute(ctx context.Context) error {
return nil
}
// ShowSegmentReqTask show segment request task
type ShowSegmentReqTask struct {
baseReqTask
Req *milvuspb.ShowSegmentsRequest
Rsp *milvuspb.ShowSegmentsResponse
}
// Type return msg type
func (t *ShowSegmentReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
// Execute task execution
func (t *ShowSegmentReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_ShowSegments {
return fmt.Errorf("show segments, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
@ -750,15 +783,18 @@ func (t *ShowSegmentReqTask) Execute(ctx context.Context) error {
return nil
}
// CreateIndexReqTask create index request task
type CreateIndexReqTask struct {
baseReqTask
Req *milvuspb.CreateIndexRequest
}
// Type return msg type
func (t *CreateIndexReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
// Execute task execution
func (t *CreateIndexReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_CreateIndex {
return fmt.Errorf("create index, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
@ -822,16 +858,19 @@ func (t *CreateIndexReqTask) Execute(ctx context.Context) error {
return nil
}
// DescribeIndexReqTask describe index request task
type DescribeIndexReqTask struct {
baseReqTask
Req *milvuspb.DescribeIndexRequest
Rsp *milvuspb.DescribeIndexResponse
}
// Type return msg type
func (t *DescribeIndexReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
// Execute task execution
func (t *DescribeIndexReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_DescribeIndex {
return fmt.Errorf("describe index, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
@ -857,15 +896,18 @@ func (t *DescribeIndexReqTask) Execute(ctx context.Context) error {
return nil
}
// DropIndexReqTask drop index request task
type DropIndexReqTask struct {
baseReqTask
Req *milvuspb.DropIndexRequest
}
// Type return msg type
func (t *DropIndexReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
// Execute task execution
func (t *DropIndexReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_DropIndex {
return fmt.Errorf("drop index, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
@ -890,15 +932,18 @@ func (t *DropIndexReqTask) Execute(ctx context.Context) error {
return err
}
// CreateAliasReqTask create alias request task
type CreateAliasReqTask struct {
baseReqTask
Req *milvuspb.CreateAliasRequest
}
// Type return msg type
func (t *CreateAliasReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
// Execute task execution
func (t *CreateAliasReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_CreateAlias {
return fmt.Errorf("create alias, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
@ -948,15 +993,18 @@ func (t *CreateAliasReqTask) Execute(ctx context.Context) error {
return t.core.setDdMsgSendFlag(true)
}
// DropAliasReqTask drop alias request task
type DropAliasReqTask struct {
baseReqTask
Req *milvuspb.DropAliasRequest
}
// Type return msg type
func (t *DropAliasReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
// Execute task execution
func (t *DropAliasReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_DropAlias {
return fmt.Errorf("create alias, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
@ -1016,18 +1064,20 @@ func (t *DropAliasReqTask) Execute(ctx context.Context) error {
// Update DDOperation in etcd
return t.core.setDdMsgSendFlag(true)
}
// AlterAliasReqTask alter alias request task
type AlterAliasReqTask struct {
baseReqTask
Req *milvuspb.AlterAliasRequest
}
// Type return msg type
func (t *AlterAliasReqTask) Type() commonpb.MsgType {
return t.Req.Base.MsgType
}
// Execute task execution
func (t *AlterAliasReqTask) Execute(ctx context.Context) error {
if t.Type() != commonpb.MsgType_AlterAlias {
return fmt.Errorf("alter alias, msg type = %s", commonpb.MsgType_name[int32(t.Type())])
@ -1087,5 +1137,4 @@ func (t *AlterAliasReqTask) Execute(ctx context.Context) error {
// Update DDOperation in etcd
return t.core.setDdMsgSendFlag(true)
}

View File

@ -23,7 +23,7 @@ import (
"github.com/milvus-io/milvus/internal/util/typeutil"
)
//return
// EqualKeyPairArray check whether 2 KeyValuePairs are equal
func EqualKeyPairArray(p1 []*commonpb.KeyValuePair, p2 []*commonpb.KeyValuePair) bool {
if len(p1) != len(p2) {
return false
@ -44,6 +44,7 @@ func EqualKeyPairArray(p1 []*commonpb.KeyValuePair, p2 []*commonpb.KeyValuePair)
return true
}
// GetFieldSchemaByID return field schema by id
func GetFieldSchemaByID(coll *etcdpb.CollectionInfo, fieldID typeutil.UniqueID) (*schemapb.FieldSchema, error) {
for _, f := range coll.Schema.Fields {
if f.FieldID == fieldID {
@ -53,7 +54,7 @@ func GetFieldSchemaByID(coll *etcdpb.CollectionInfo, fieldID typeutil.UniqueID)
return nil, fmt.Errorf("field id = %d not found", fieldID)
}
//GetFieldSchemaByIndexID return the field schema by it's index id
// GetFieldSchemaByIndexID return field schema by it's index id
func GetFieldSchemaByIndexID(coll *etcdpb.CollectionInfo, idxID typeutil.UniqueID) (*schemapb.FieldSchema, error) {
var fieldID typeutil.UniqueID
exist := false
@ -122,7 +123,7 @@ func DecodeMsgPositions(str string, msgPositions *[]*msgstream.MsgPosition) erro
return json.Unmarshal([]byte(str), msgPositions)
}
//ToPhysicalChannel virtual channel -> physical channel
// ToPhysicalChannel get physical channel name from virtual channel name
func ToPhysicalChannel(vchannel string) string {
var idx int
for idx = len(vchannel) - 1; idx >= 0; idx-- {