From 177a465913e248df1a9bb392c797e4db35f68b5d Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Thu, 23 Sep 2021 15:10:00 +0800 Subject: [PATCH] Clean golint warnings for rootcoord (#8383) Signed-off-by: yudong.cai --- internal/rootcoord/field_id.go | 15 ++- internal/rootcoord/meta_snapshot.go | 1 + internal/rootcoord/meta_table.go | 164 +++++++++++++++++--------- internal/rootcoord/param_table.go | 4 + internal/rootcoord/root_coord.go | 37 +++++- internal/rootcoord/root_coord_test.go | 2 +- internal/rootcoord/task.go | 53 ++++++++- internal/rootcoord/util.go | 7 +- 8 files changed, 217 insertions(+), 66 deletions(-) diff --git a/internal/rootcoord/field_id.go b/internal/rootcoord/field_id.go index 2192cbef4e..6823f1d764 100644 --- a/internal/rootcoord/field_id.go +++ b/internal/rootcoord/field_id.go @@ -21,9 +21,18 @@ import "github.com/milvus-io/milvus/internal/common" // 102: ... const ( + // StartOfUserFieldID id of user defined filed begin from here StartOfUserFieldID = common.StartOfUserFieldID - RowIDField = common.RowIDField - TimeStampField = common.TimeStampField - RowIDFieldName = common.RowIDFieldName + + // RowIDField id of row ID field + RowIDField = common.RowIDField + + // TimeStampField id of timestamp field + TimeStampField = common.TimeStampField + + // RowIDFieldName name of row ID field + RowIDFieldName = common.RowIDFieldName + + // TimeStampFieldName name of timestamp field TimeStampFieldName = common.TimeStampFieldName ) diff --git a/internal/rootcoord/meta_snapshot.go b/internal/rootcoord/meta_snapshot.go index 739ee6d634..fc1e219649 100644 --- a/internal/rootcoord/meta_snapshot.go +++ b/internal/rootcoord/meta_snapshot.go @@ -26,6 +26,7 @@ import ( ) const ( + // RequestTimeout timeout for request RequestTimeout = 10 * time.Second ) diff --git a/internal/rootcoord/meta_table.go b/internal/rootcoord/meta_table.go index 56cf4203a6..8de656dad4 100644 --- a/internal/rootcoord/meta_table.go +++ b/internal/rootcoord/meta_table.go @@ -29,29 +29,60 @@ import ( ) const ( - ComponentPrefix = "root-coord" - TenantMetaPrefix = ComponentPrefix + "/tenant" - ProxyMetaPrefix = ComponentPrefix + "/proxy" - CollectionMetaPrefix = ComponentPrefix + "/collection" - SegmentIndexMetaPrefix = ComponentPrefix + "/segment-index" - IndexMetaPrefix = ComponentPrefix + "/index" + // ComponentPrefix prefix for rootcoord component + ComponentPrefix = "root-coord" + + // TenantMetaPrefix prefix for tenant meta + TenantMetaPrefix = ComponentPrefix + "/tenant" + + // ProxyMetaPrefix prefix for proxy meta + ProxyMetaPrefix = ComponentPrefix + "/proxy" + + // CollectionMetaPrefix prefix for collection meta + CollectionMetaPrefix = ComponentPrefix + "/collection" + + // SegmentIndexMetaPrefix prefix for segment index meta + SegmentIndexMetaPrefix = ComponentPrefix + "/segment-index" + + // IndexMetaPrefix prefix for index meta + IndexMetaPrefix = ComponentPrefix + "/index" + + // CollectionAliasMetaPrefix prefix for collection alias meta CollectionAliasMetaPrefix = ComponentPrefix + "/collection-alias" + // TimestampPrefix prefix for timestamp TimestampPrefix = ComponentPrefix + "/timestamp" + // DDOperationPrefix prefix for DD operation DDOperationPrefix = ComponentPrefix + "/dd-operation" - DDMsgSendPrefix = ComponentPrefix + "/dd-msg-send" + // DDMsgSendPrefix prefix to indicate whether DD msg has been send + DDMsgSendPrefix = ComponentPrefix + "/dd-msg-send" + + // CreateCollectionDDType name of DD type for create collection CreateCollectionDDType = "CreateCollection" - DropCollectionDDType = "DropCollection" - CreatePartitionDDType = "CreatePartition" - DropPartitionDDType = "DropPartition" - CreateAliasDDType = "CreateAlias" - DropAliasDDType = "DropAlias" - AlterAliasDDType = "AlterAlias" + + // DropCollectionDDType name of DD type for drop collection + DropCollectionDDType = "DropCollection" + + // CreatePartitionDDType name of DD type for create partition + CreatePartitionDDType = "CreatePartition" + + // DropPartitionDDType name of DD type for drop partition + DropPartitionDDType = "DropPartition" + + // CreateAliasDDType name of DD type for create collection alias + CreateAliasDDType = "CreateAlias" + + // DropAliasDDType name of DD type for drop collection alias + DropAliasDDType = "DropAlias" + + // AlterAliasDDType name of DD type for alter collection alias + AlterAliasDDType = "AlterAlias" ) -type metaTable struct { +// MetaTable store all rootcoord meta info +type MetaTable struct { client kv.SnapShotKV // client of a reliable kv service, i.e. etcd client tenantID2Meta map[typeutil.UniqueID]pb.TenantMeta // tenant id to tenant meta proxyID2Meta map[typeutil.UniqueID]pb.ProxyMeta // proxy id to proxy meta @@ -67,8 +98,10 @@ type metaTable struct { ddLock sync.RWMutex } -func NewMetaTable(kv kv.SnapShotKV) (*metaTable, error) { - mt := &metaTable{ +// NewMetaTable create meta table for rootcoord, which stores all in-memory information +// for collection, partion, segment, index etc. +func NewMetaTable(kv kv.SnapShotKV) (*MetaTable, error) { + mt := &MetaTable{ client: kv, tenantLock: sync.RWMutex{}, proxyLock: sync.RWMutex{}, @@ -81,8 +114,7 @@ func NewMetaTable(kv kv.SnapShotKV) (*metaTable, error) { return mt, nil } -func (mt *metaTable) reloadFromKV() error { - +func (mt *MetaTable) reloadFromKV() error { mt.tenantID2Meta = make(map[typeutil.UniqueID]pb.TenantMeta) mt.proxyID2Meta = make(map[typeutil.UniqueID]pb.ProxyMeta) mt.collID2Meta = make(map[typeutil.UniqueID]pb.CollectionInfo) @@ -197,7 +229,7 @@ func (mt *metaTable) reloadFromKV() error { return nil } -func (mt *metaTable) getAdditionKV(op func(ts typeutil.Timestamp) (string, error), meta map[string]string) func(ts typeutil.Timestamp) (string, string, error) { +func (mt *MetaTable) getAdditionKV(op func(ts typeutil.Timestamp) (string, error), meta map[string]string) func(ts typeutil.Timestamp) (string, string, error) { if op == nil { return nil } @@ -211,7 +243,8 @@ func (mt *metaTable) getAdditionKV(op func(ts typeutil.Timestamp) (string, error } } -func (mt *metaTable) AddTenant(te *pb.TenantMeta, ts typeutil.Timestamp) error { +// AddTenant add tenant +func (mt *MetaTable) AddTenant(te *pb.TenantMeta, ts typeutil.Timestamp) error { mt.tenantLock.Lock() defer mt.tenantLock.Unlock() @@ -227,7 +260,8 @@ func (mt *metaTable) AddTenant(te *pb.TenantMeta, ts typeutil.Timestamp) error { return nil } -func (mt *metaTable) AddProxy(po *pb.ProxyMeta, ts typeutil.Timestamp) error { +// AddProxy add proxy +func (mt *MetaTable) AddProxy(po *pb.ProxyMeta, ts typeutil.Timestamp) error { mt.proxyLock.Lock() defer mt.proxyLock.Unlock() @@ -243,7 +277,8 @@ func (mt *metaTable) AddProxy(po *pb.ProxyMeta, ts typeutil.Timestamp) error { return nil } -func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, ts typeutil.Timestamp, idx []*pb.IndexInfo, ddOpStr func(ts typeutil.Timestamp) (string, error)) error { +// AddCollection add collection +func (mt *MetaTable) AddCollection(coll *pb.CollectionInfo, ts typeutil.Timestamp, idx []*pb.IndexInfo, ddOpStr func(ts typeutil.Timestamp) (string, error)) error { mt.ddLock.Lock() defer mt.ddLock.Unlock() @@ -295,7 +330,8 @@ func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, ts typeutil.Timestam return nil } -func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error { +// DeleteCollection delete collection +func (mt *MetaTable) DeleteCollection(collID typeutil.UniqueID, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error { mt.ddLock.Lock() defer mt.ddLock.Unlock() @@ -362,7 +398,8 @@ func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID, ts typeutil.Time return nil } -func (mt *metaTable) HasCollection(collID typeutil.UniqueID, ts typeutil.Timestamp) bool { +// HasCollection return collection existence +func (mt *MetaTable) HasCollection(collID typeutil.UniqueID, ts typeutil.Timestamp) bool { mt.ddLock.RLock() defer mt.ddLock.RUnlock() if ts == 0 { @@ -374,7 +411,8 @@ func (mt *metaTable) HasCollection(collID typeutil.UniqueID, ts typeutil.Timesta return err == nil } -func (mt *metaTable) GetCollectionByID(collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*pb.CollectionInfo, error) { +// GetCollectionByID return collection meta by collection id +func (mt *MetaTable) GetCollectionByID(collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*pb.CollectionInfo, error) { mt.ddLock.RLock() defer mt.ddLock.RUnlock() @@ -399,7 +437,8 @@ func (mt *metaTable) GetCollectionByID(collectionID typeutil.UniqueID, ts typeut return &colMeta, nil } -func (mt *metaTable) GetCollectionByName(collectionName string, ts typeutil.Timestamp) (*pb.CollectionInfo, error) { +// GetCollectionByName return collection meta by collection name +func (mt *MetaTable) GetCollectionByName(collectionName string, ts typeutil.Timestamp) (*pb.CollectionInfo, error) { mt.ddLock.RLock() defer mt.ddLock.RUnlock() @@ -435,7 +474,8 @@ func (mt *metaTable) GetCollectionByName(collectionName string, ts typeutil.Time return nil, fmt.Errorf("can't find collection: %s, at timestamp = %d", collectionName, ts) } -func (mt *metaTable) ListCollections(ts typeutil.Timestamp) (map[string]*pb.CollectionInfo, error) { +// ListCollections list all collection names +func (mt *MetaTable) ListCollections(ts typeutil.Timestamp) (map[string]*pb.CollectionInfo, error) { mt.ddLock.RLock() defer mt.ddLock.RUnlock() colls := make(map[string]*pb.CollectionInfo) @@ -464,7 +504,8 @@ func (mt *metaTable) ListCollections(ts typeutil.Timestamp) (map[string]*pb.Coll return colls, nil } -func (mt *metaTable) ListAliases(collID typeutil.UniqueID) []string { +// ListAliases list all collection aliases +func (mt *MetaTable) ListAliases(collID typeutil.UniqueID) []string { mt.ddLock.RLock() defer mt.ddLock.RUnlock() var aliases []string @@ -476,8 +517,8 @@ func (mt *metaTable) ListAliases(collID typeutil.UniqueID) []string { return aliases } -// ListCollectionVirtualChannels list virtual channel of all the collection -func (mt *metaTable) ListCollectionVirtualChannels() []string { +// ListCollectionVirtualChannels list virtual channels of all collections +func (mt *MetaTable) ListCollectionVirtualChannels() []string { mt.ddLock.RLock() defer mt.ddLock.RUnlock() vlist := []string{} @@ -488,8 +529,8 @@ func (mt *metaTable) ListCollectionVirtualChannels() []string { return vlist } -// ListCollectionPhysicalChannels list physical channel of all the collection -func (mt *metaTable) ListCollectionPhysicalChannels() []string { +// ListCollectionPhysicalChannels list physical channels of all collections +func (mt *MetaTable) ListCollectionPhysicalChannels() []string { mt.ddLock.RLock() defer mt.ddLock.RUnlock() plist := []string{} @@ -500,7 +541,8 @@ func (mt *metaTable) ListCollectionPhysicalChannels() []string { return plist } -func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error { +// AddPartition add partition +func (mt *MetaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error { mt.ddLock.Lock() defer mt.ddLock.Unlock() coll, ok := mt.collID2Meta[collID] @@ -560,7 +602,8 @@ func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string return nil } -func (mt *metaTable) GetPartitionNameByID(collID, partitionID typeutil.UniqueID, ts typeutil.Timestamp) (string, error) { +// GetPartitionNameByID return partition name by partition id +func (mt *MetaTable) GetPartitionNameByID(collID, partitionID typeutil.UniqueID, ts typeutil.Timestamp) (string, error) { if ts == 0 { mt.ddLock.RLock() defer mt.ddLock.RUnlock() @@ -593,7 +636,7 @@ func (mt *metaTable) GetPartitionNameByID(collID, partitionID typeutil.UniqueID, return "", fmt.Errorf("partition %d does not exist", partitionID) } -func (mt *metaTable) getPartitionByName(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) (typeutil.UniqueID, error) { +func (mt *MetaTable) getPartitionByName(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) (typeutil.UniqueID, error) { if ts == 0 { collMeta, ok := mt.collID2Meta[collID] if !ok { @@ -624,20 +667,23 @@ func (mt *metaTable) getPartitionByName(collID typeutil.UniqueID, partitionName return 0, fmt.Errorf("partition %s does not exist", partitionName) } -func (mt *metaTable) GetPartitionByName(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) (typeutil.UniqueID, error) { +// GetPartitionByName return partition id by partition name +func (mt *MetaTable) GetPartitionByName(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) (typeutil.UniqueID, error) { mt.ddLock.RLock() defer mt.ddLock.RUnlock() return mt.getPartitionByName(collID, partitionName, ts) } -func (mt *metaTable) HasPartition(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) bool { +// HasPartition check partition existence +func (mt *MetaTable) HasPartition(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) bool { mt.ddLock.RLock() defer mt.ddLock.RUnlock() _, err := mt.getPartitionByName(collID, partitionName, ts) return err == nil } -func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.UniqueID, error) { +// DeletePartition delete partition +func (mt *MetaTable) DeletePartition(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.UniqueID, error) { mt.ddLock.Lock() defer mt.ddLock.Unlock() @@ -703,7 +749,8 @@ func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName str return partID, nil } -func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo, ts typeutil.Timestamp) error { +// AddIndex add index +func (mt *MetaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo, ts typeutil.Timestamp) error { mt.ddLock.Lock() defer mt.ddLock.Unlock() @@ -757,8 +804,8 @@ func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo, ts typeutil.Times return nil } -//return timestamp, index id, is dropped, error -func (mt *metaTable) DropIndex(collName, fieldName, indexName string, ts typeutil.Timestamp) (typeutil.UniqueID, bool, error) { +// DropIndex drop index +func (mt *MetaTable) DropIndex(collName, fieldName, indexName string, ts typeutil.Timestamp) (typeutil.UniqueID, bool, error) { mt.ddLock.Lock() defer mt.ddLock.Unlock() @@ -832,7 +879,8 @@ func (mt *metaTable) DropIndex(collName, fieldName, indexName string, ts typeuti return dropIdxID, true, nil } -func (mt *metaTable) GetSegmentIndexInfoByID(segID typeutil.UniqueID, filedID int64, idxName string) (pb.SegmentIndexInfo, error) { +// GetSegmentIndexInfoByID return segment index info by segment id +func (mt *MetaTable) GetSegmentIndexInfoByID(segID typeutil.UniqueID, filedID int64, idxName string) (pb.SegmentIndexInfo, error) { mt.ddLock.RLock() defer mt.ddLock.RUnlock() @@ -874,14 +922,15 @@ func (mt *metaTable) GetSegmentIndexInfoByID(segID typeutil.UniqueID, filedID in return pb.SegmentIndexInfo{}, fmt.Errorf("can't find index name = %s on segment = %d, with filed id = %d", idxName, segID, filedID) } -func (mt *metaTable) GetFieldSchema(collName string, fieldName string) (schemapb.FieldSchema, error) { +// GetFieldSchema return field schema +func (mt *MetaTable) GetFieldSchema(collName string, fieldName string) (schemapb.FieldSchema, error) { mt.ddLock.RLock() defer mt.ddLock.RUnlock() return mt.unlockGetFieldSchema(collName, fieldName) } -func (mt *metaTable) unlockGetFieldSchema(collName string, fieldName string) (schemapb.FieldSchema, error) { +func (mt *MetaTable) unlockGetFieldSchema(collName string, fieldName string) (schemapb.FieldSchema, error) { collID, ok := mt.collName2ID[collName] if !ok { return schemapb.FieldSchema{}, fmt.Errorf("collection %s not found", collName) @@ -899,14 +948,14 @@ func (mt *metaTable) unlockGetFieldSchema(collName string, fieldName string) (sc return schemapb.FieldSchema{}, fmt.Errorf("collection %s doesn't have filed %s", collName, fieldName) } -//return true/false -func (mt *metaTable) IsSegmentIndexed(segID typeutil.UniqueID, fieldSchema *schemapb.FieldSchema, indexParams []*commonpb.KeyValuePair) bool { +// IsSegmentIndexed check if segment has index +func (mt *MetaTable) IsSegmentIndexed(segID typeutil.UniqueID, fieldSchema *schemapb.FieldSchema, indexParams []*commonpb.KeyValuePair) bool { mt.ddLock.RLock() defer mt.ddLock.RUnlock() return mt.unlockIsSegmentIndexed(segID, fieldSchema, indexParams) } -func (mt *metaTable) unlockIsSegmentIndexed(segID typeutil.UniqueID, fieldSchema *schemapb.FieldSchema, indexParams []*commonpb.KeyValuePair) bool { +func (mt *MetaTable) unlockIsSegmentIndexed(segID typeutil.UniqueID, fieldSchema *schemapb.FieldSchema, indexParams []*commonpb.KeyValuePair) bool { segIdx, ok := mt.segID2IndexMeta[segID] if !ok { return false @@ -928,8 +977,8 @@ func (mt *metaTable) unlockIsSegmentIndexed(segID typeutil.UniqueID, fieldSchema return exist } -// return segment ids, type params, error -func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, idxInfo *pb.IndexInfo, segIDs []typeutil.UniqueID, ts typeutil.Timestamp) ([]typeutil.UniqueID, schemapb.FieldSchema, error) { +// GetNotIndexedSegments return segment ids which have no index +func (mt *MetaTable) GetNotIndexedSegments(collName string, fieldName string, idxInfo *pb.IndexInfo, segIDs []typeutil.UniqueID, ts typeutil.Timestamp) ([]typeutil.UniqueID, schemapb.FieldSchema, error) { mt.ddLock.Lock() defer mt.ddLock.Unlock() @@ -1035,7 +1084,8 @@ func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, id return rstID, fieldSchema, nil } -func (mt *metaTable) GetIndexByName(collName, indexName string) (pb.CollectionInfo, []pb.IndexInfo, error) { +// GetIndexByName return index info by index name +func (mt *MetaTable) GetIndexByName(collName, indexName string) (pb.CollectionInfo, []pb.IndexInfo, error) { mt.ddLock.RLock() defer mt.ddLock.RUnlock() @@ -1061,7 +1111,8 @@ func (mt *metaTable) GetIndexByName(collName, indexName string) (pb.CollectionIn return collMeta, rstIndex, nil } -func (mt *metaTable) GetIndexByID(indexID typeutil.UniqueID) (*pb.IndexInfo, error) { +// GetIndexByID return index info by index id +func (mt *MetaTable) GetIndexByID(indexID typeutil.UniqueID) (*pb.IndexInfo, error) { mt.ddLock.RLock() defer mt.ddLock.RUnlock() @@ -1072,7 +1123,7 @@ func (mt *metaTable) GetIndexByID(indexID typeutil.UniqueID) (*pb.IndexInfo, err return &indexInfo, nil } -func (mt *metaTable) dupMeta() ( +func (mt *MetaTable) dupMeta() ( map[typeutil.UniqueID]pb.CollectionInfo, map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo, map[typeutil.UniqueID]pb.IndexInfo, @@ -1098,7 +1149,8 @@ func (mt *metaTable) dupMeta() ( return collID2Meta, segID2IndexMeta, indexID2Meta } -func (mt *metaTable) AddAlias(collectionAlias string, collectionName string, +// AddAlias add collection alias +func (mt *MetaTable) AddAlias(collectionAlias string, collectionName string, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error { mt.ddLock.Lock() defer mt.ddLock.Unlock() @@ -1133,7 +1185,8 @@ func (mt *metaTable) AddAlias(collectionAlias string, collectionName string, return nil } -func (mt *metaTable) DeleteAlias(collectionAlias string, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error { +// DeleteAlias delete collection alias +func (mt *MetaTable) DeleteAlias(collectionAlias string, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error { mt.ddLock.Lock() defer mt.ddLock.Unlock() if _, ok := mt.collAlias2ID[collectionAlias]; !ok { @@ -1154,7 +1207,8 @@ func (mt *metaTable) DeleteAlias(collectionAlias string, ts typeutil.Timestamp, return nil } -func (mt *metaTable) AlterAlias(collectionAlias string, collectionName string, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error { +// AlterAlias alter collection alias +func (mt *MetaTable) AlterAlias(collectionAlias string, collectionName string, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error { mt.ddLock.Lock() defer mt.ddLock.Unlock() if _, ok := mt.collAlias2ID[collectionAlias]; !ok { diff --git a/internal/rootcoord/param_table.go b/internal/rootcoord/param_table.go index 3bfa607da0..e2e62b945d 100644 --- a/internal/rootcoord/param_table.go +++ b/internal/rootcoord/param_table.go @@ -21,9 +21,11 @@ import ( "github.com/milvus-io/milvus/internal/util/paramtable" ) +// Params globle params var Params ParamTable var once sync.Once +// ParamTable structure stores all params type ParamTable struct { paramtable.BaseTable @@ -58,12 +60,14 @@ type ParamTable struct { RoleName string } +// InitOnce initialize once func (p *ParamTable) InitOnce() { once.Do(func() { p.Init() }) } +// Init initialize param table func (p *ParamTable) Init() { // load yaml p.BaseTable.Init() diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index c816885bbd..65709ff91a 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -74,7 +74,7 @@ func metricProxy(v int64) string { // Core root coordinator core type Core struct { - MetaTable *metaTable + MetaTable *MetaTable //id allocator IDAllocator func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error) IDAllocatorUpdate func() error @@ -159,6 +159,7 @@ type Core struct { // --------------------- function -------------------------- +// NewCore create rootcoord core func NewCore(c context.Context, factory ms.Factory) (*Core, error) { ctx, cancel := context.WithCancel(c) rand.Seed(time.Now().UnixNano()) @@ -172,6 +173,7 @@ func NewCore(c context.Context, factory ms.Factory) (*Core, error) { return core, nil } +// UpdateStateCode update state code func (c *Core) UpdateStateCode(code internalpb.StateCode) { c.stateCode.Store(code) } @@ -568,7 +570,7 @@ func (c *Core) setMsgStreams() error { return nil } -//SetNewProxyClient create proxy node by this func +// SetNewProxyClient set client to create proxy func (c *Core) SetNewProxyClient(f func(sess *sessionutil.Session) (types.Proxy, error)) { if c.NewProxyClient == nil { c.NewProxyClient = f @@ -577,6 +579,7 @@ func (c *Core) SetNewProxyClient(f func(sess *sessionutil.Session) (types.Proxy, } } +// SetDataCoord set datacoord func (c *Core) SetDataCoord(ctx context.Context, s types.DataCoord) error { initCh := make(chan struct{}) go func() { @@ -692,6 +695,7 @@ func (c *Core) SetDataCoord(ctx context.Context, s types.DataCoord) error { return nil } +// SetIndexCoord set indexcoord func (c *Core) SetIndexCoord(s types.IndexCoord) error { initCh := make(chan struct{}) go func() { @@ -752,6 +756,7 @@ func (c *Core) SetIndexCoord(s types.IndexCoord) error { return nil } +// SetQueryCoord set querycoord func (c *Core) SetQueryCoord(s types.QueryCoord) error { initCh := make(chan struct{}) go func() { @@ -861,6 +866,7 @@ func (c *Core) Register() error { return nil } +// Init initialize routine func (c *Core) Init() error { var initError error = nil if c.kvBaseCreate == nil { @@ -1090,6 +1096,7 @@ func (c *Core) reSendDdMsg(ctx context.Context, force bool) error { return c.setDdMsgSendFlag(true) } +// Start start rootcoord func (c *Core) Start() error { if err := c.checkInit(); err != nil { log.Debug("RootCoord Start checkInit failed", zap.Error(err)) @@ -1126,6 +1133,7 @@ func (c *Core) Start() error { return nil } +// Stop stop rootcoord func (c *Core) Stop() error { c.cancel() c.wg.Wait() @@ -1133,6 +1141,7 @@ func (c *Core) Stop() error { return nil } +// GetComponentStates get states of components func (c *Core) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { code := c.stateCode.Load().(internalpb.StateCode) log.Debug("GetComponentStates", zap.String("State Code", internalpb.StateCode_name[int32(code)])) @@ -1159,6 +1168,7 @@ func (c *Core) GetComponentStates(ctx context.Context) (*internalpb.ComponentSta }, nil } +// GetTimeTickChannel get timetick channel name func (c *Core) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) { return &milvuspb.StringResponse{ Status: &commonpb.Status{ @@ -1169,6 +1179,7 @@ func (c *Core) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse }, nil } +// GetStatisticsChannel get statistics channel name func (c *Core) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) { return &milvuspb.StringResponse{ Status: &commonpb.Status{ @@ -1179,6 +1190,7 @@ func (c *Core) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringRespon }, nil } +// CreateCollection create collection func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) { metrics.RootCoordCreateCollectionCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc() code := c.stateCode.Load().(internalpb.StateCode) @@ -1212,6 +1224,7 @@ func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollecti }, nil } +// DropCollection drop collection func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) { metrics.RootCoordDropCollectionCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc() code := c.stateCode.Load().(internalpb.StateCode) @@ -1245,6 +1258,7 @@ func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRe }, nil } +// HasCollection check collection existence func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) { metrics.RootCoordHasCollectionCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc() code := c.stateCode.Load().(internalpb.StateCode) @@ -1288,6 +1302,7 @@ func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequ }, nil } +// DescribeCollection return collection info func (c *Core) DescribeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { metrics.RootCoordDescribeCollectionCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc() code := c.stateCode.Load().(internalpb.StateCode) @@ -1331,6 +1346,7 @@ func (c *Core) DescribeCollection(ctx context.Context, in *milvuspb.DescribeColl return t.Rsp, nil } +// ShowCollections list all collection names func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) { metrics.RootCoordShowCollectionsCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc() code := c.stateCode.Load().(internalpb.StateCode) @@ -1376,6 +1392,7 @@ func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollections return t.Rsp, nil } +// CreatePartition create partition func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) { metrics.RootCoordCreatePartitionCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc() code := c.stateCode.Load().(internalpb.StateCode) @@ -1410,6 +1427,7 @@ func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartition }, nil } +// DropPartition drop partition func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) { metrics.RootCoordDropPartitionCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc() code := c.stateCode.Load().(internalpb.StateCode) @@ -1444,6 +1462,7 @@ func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequ }, nil } +// HasPartition check partition existence func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) { metrics.RootCoordHasPartitionCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc() code := c.stateCode.Load().(internalpb.StateCode) @@ -1487,6 +1506,7 @@ func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionReques }, nil } +// ShowPartitions list all partition names func (c *Core) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) { metrics.RootCoordShowPartitionsCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc() log.Debug("ShowPartitionRequest received", zap.String("role", Params.RoleName), zap.Int64("msgID", in.Base.MsgID), @@ -1536,6 +1556,7 @@ func (c *Core) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRe return t.Rsp, nil } +// CreateIndex create index func (c *Core) CreateIndex(ctx context.Context, in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) { metrics.RootCoordCreateIndexCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc() code := c.stateCode.Load().(internalpb.StateCode) @@ -1571,6 +1592,7 @@ func (c *Core) CreateIndex(ctx context.Context, in *milvuspb.CreateIndexRequest) }, nil } +// DescribeIndex return index info func (c *Core) DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) { metrics.RootCoordDescribeIndexCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc() code := c.stateCode.Load().(internalpb.StateCode) @@ -1626,6 +1648,7 @@ func (c *Core) DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequ return t.Rsp, nil } +// DropIndex drop index func (c *Core) DropIndex(ctx context.Context, in *milvuspb.DropIndexRequest) (*commonpb.Status, error) { metrics.RootCoordDropIndexCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc() code := c.stateCode.Load().(internalpb.StateCode) @@ -1659,6 +1682,7 @@ func (c *Core) DropIndex(ctx context.Context, in *milvuspb.DropIndexRequest) (*c }, nil } +// DescribeSegment return segment info func (c *Core) DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) { metrics.RootCoordDescribeSegmentCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc() code := c.stateCode.Load().(internalpb.StateCode) @@ -1703,6 +1727,7 @@ func (c *Core) DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegment return t.Rsp, nil } +// ShowSegments list all segments func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error) { metrics.RootCoordShowSegmentsCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc() code := c.stateCode.Load().(internalpb.StateCode) @@ -1747,6 +1772,7 @@ func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsReques return t.Rsp, nil } +// AllocTimestamp alloc timestamp func (c *Core) AllocTimestamp(ctx context.Context, in *rootcoordpb.AllocTimestampRequest) (*rootcoordpb.AllocTimestampResponse, error) { code := c.stateCode.Load().(internalpb.StateCode) if code != internalpb.StateCode_Healthy { @@ -1785,6 +1811,7 @@ func (c *Core) AllocTimestamp(ctx context.Context, in *rootcoordpb.AllocTimestam }, nil } +// AllocID alloc ids func (c *Core) AllocID(ctx context.Context, in *rootcoordpb.AllocIDRequest) (*rootcoordpb.AllocIDResponse, error) { code := c.stateCode.Load().(internalpb.StateCode) if code != internalpb.StateCode_Healthy { @@ -1850,6 +1877,7 @@ func (c *Core) UpdateChannelTimeTick(ctx context.Context, in *internalpb.Channel }, nil } +// ReleaseDQLMessageStream release DQL msgstream func (c *Core) ReleaseDQLMessageStream(ctx context.Context, in *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error) { code := c.stateCode.Load().(internalpb.StateCode) if code != internalpb.StateCode_Healthy { @@ -1861,6 +1889,7 @@ func (c *Core) ReleaseDQLMessageStream(ctx context.Context, in *proxypb.ReleaseD return c.proxyClientManager.ReleaseDQLMessageStream(ctx, in) } +// SegmentFlushCompleted check whether segment flush has completed func (c *Core) SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlushCompletedMsg) (*commonpb.Status, error) { code := c.stateCode.Load().(internalpb.StateCode) if code != internalpb.StateCode_Healthy { @@ -1932,6 +1961,7 @@ func (c *Core) SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlus }, nil } +// GetMetrics get metrics func (c *Core) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { log.Debug("RootCoord.GetMetrics", zap.Int64("node_id", c.session.ServerID), @@ -2007,6 +2037,7 @@ func (c *Core) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) }, nil } +// CreateAlias create collection alias func (c *Core) CreateAlias(ctx context.Context, in *milvuspb.CreateAliasRequest) (*commonpb.Status, error) { code := c.stateCode.Load().(internalpb.StateCode) if code != internalpb.StateCode_Healthy { @@ -2038,6 +2069,7 @@ func (c *Core) CreateAlias(ctx context.Context, in *milvuspb.CreateAliasRequest) }, nil } +// DropAlias drop collection alias func (c *Core) DropAlias(ctx context.Context, in *milvuspb.DropAliasRequest) (*commonpb.Status, error) { code := c.stateCode.Load().(internalpb.StateCode) if code != internalpb.StateCode_Healthy { @@ -2069,6 +2101,7 @@ func (c *Core) DropAlias(ctx context.Context, in *milvuspb.DropAliasRequest) (*c }, nil } +// AlterAlias alter collection alias func (c *Core) AlterAlias(ctx context.Context, in *milvuspb.AlterAliasRequest) (*commonpb.Status, error) { code := c.stateCode.Load().(internalpb.StateCode) if code != internalpb.StateCode_Healthy { diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index ce56892397..c213a00615 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -2306,7 +2306,7 @@ func TestCheckInit(t *testing.T) { err = c.checkInit() assert.NotNil(t, err) - c.MetaTable = &metaTable{} + c.MetaTable = &MetaTable{} err = c.checkInit() assert.NotNil(t, err) diff --git a/internal/rootcoord/task.go b/internal/rootcoord/task.go index f577cab9e2..047b8f349a 100644 --- a/internal/rootcoord/task.go +++ b/internal/rootcoord/task.go @@ -69,15 +69,18 @@ func executeTask(t reqTask) error { } } +// CreateCollectionReqTask create collection request task type CreateCollectionReqTask struct { baseReqTask Req *milvuspb.CreateCollectionRequest } +// Type return msg type func (t *CreateCollectionReqTask) Type() commonpb.MsgType { return t.Req.Base.MsgType } +// Execute task execution func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_CreateCollection { return fmt.Errorf("create collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) @@ -221,15 +224,18 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error { return t.core.setDdMsgSendFlag(true) } +// DropCollectionReqTask drop collection request task type DropCollectionReqTask struct { baseReqTask Req *milvuspb.DropCollectionRequest } +// Type return msg type func (t *DropCollectionReqTask) Type() commonpb.MsgType { return t.Req.Base.MsgType } +// Execute task execution func (t *DropCollectionReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_DropCollection { return fmt.Errorf("drop collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) @@ -320,16 +326,19 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error { return t.core.setDdMsgSendFlag(true) } +// HasCollectionReqTask has collection request task type HasCollectionReqTask struct { baseReqTask Req *milvuspb.HasCollectionRequest HasCollection bool } +// Type return msg type func (t *HasCollectionReqTask) Type() commonpb.MsgType { return t.Req.Base.MsgType } +// Execute task execution func (t *HasCollectionReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_HasCollection { return fmt.Errorf("has collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) @@ -343,16 +352,19 @@ func (t *HasCollectionReqTask) Execute(ctx context.Context) error { return nil } +// DescribeCollectionReqTask describe collection request task type DescribeCollectionReqTask struct { baseReqTask Req *milvuspb.DescribeCollectionRequest Rsp *milvuspb.DescribeCollectionResponse } +// Type return msg type func (t *DescribeCollectionReqTask) Type() commonpb.MsgType { return t.Req.Base.MsgType } +// Execute task execution func (t *DescribeCollectionReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_DescribeCollection { return fmt.Errorf("describe collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) @@ -388,16 +400,19 @@ func (t *DescribeCollectionReqTask) Execute(ctx context.Context) error { return nil } +// ShowCollectionReqTask show collection request task type ShowCollectionReqTask struct { baseReqTask Req *milvuspb.ShowCollectionsRequest Rsp *milvuspb.ShowCollectionsResponse } +// Type return msg type func (t *ShowCollectionReqTask) Type() commonpb.MsgType { return t.Req.Base.MsgType } +// Execute task execution func (t *ShowCollectionReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_ShowCollections { return fmt.Errorf("show collection, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) @@ -416,15 +431,18 @@ func (t *ShowCollectionReqTask) Execute(ctx context.Context) error { return nil } +// CreatePartitionReqTask create partition request task type CreatePartitionReqTask struct { baseReqTask Req *milvuspb.CreatePartitionRequest } +// Type return msg type func (t *CreatePartitionReqTask) Type() commonpb.MsgType { return t.Req.Base.MsgType } +// Execute task execution func (t *CreatePartitionReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_CreatePartition { return fmt.Errorf("create partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) @@ -508,15 +526,18 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error { return t.core.setDdMsgSendFlag(true) } +// DropPartitionReqTask drop partition request task type DropPartitionReqTask struct { baseReqTask Req *milvuspb.DropPartitionRequest } +// Type return msg type func (t *DropPartitionReqTask) Type() commonpb.MsgType { return t.Req.Base.MsgType } +// Execute task execution func (t *DropPartitionReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_DropPartition { return fmt.Errorf("drop partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) @@ -606,16 +627,19 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error { return t.core.setDdMsgSendFlag(true) } +// HasPartitionReqTask has partition request task type HasPartitionReqTask struct { baseReqTask Req *milvuspb.HasPartitionRequest HasPartition bool } +// Type return msg type func (t *HasPartitionReqTask) Type() commonpb.MsgType { return t.Req.Base.MsgType } +// Execute task execution func (t *HasPartitionReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_HasPartition { return fmt.Errorf("has partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) @@ -628,16 +652,19 @@ func (t *HasPartitionReqTask) Execute(ctx context.Context) error { return nil } +// ShowPartitionReqTask show partition request task type ShowPartitionReqTask struct { baseReqTask Req *milvuspb.ShowPartitionsRequest Rsp *milvuspb.ShowPartitionsResponse } +// Type return msg type func (t *ShowPartitionReqTask) Type() commonpb.MsgType { return t.Req.Base.MsgType } +// Execute task execution func (t *ShowPartitionReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_ShowPartitions { return fmt.Errorf("show partition, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) @@ -664,16 +691,19 @@ func (t *ShowPartitionReqTask) Execute(ctx context.Context) error { return nil } +// DescribeSegmentReqTask describe segment request task type DescribeSegmentReqTask struct { baseReqTask Req *milvuspb.DescribeSegmentRequest Rsp *milvuspb.DescribeSegmentResponse //TODO,return repeated segment id in the future } +// Type return msg type func (t *DescribeSegmentReqTask) Type() commonpb.MsgType { return t.Req.Base.MsgType } +// Execute task execution func (t *DescribeSegmentReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_DescribeSegment { return fmt.Errorf("describe segment, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) @@ -712,16 +742,19 @@ func (t *DescribeSegmentReqTask) Execute(ctx context.Context) error { return nil } +// ShowSegmentReqTask show segment request task type ShowSegmentReqTask struct { baseReqTask Req *milvuspb.ShowSegmentsRequest Rsp *milvuspb.ShowSegmentsResponse } +// Type return msg type func (t *ShowSegmentReqTask) Type() commonpb.MsgType { return t.Req.Base.MsgType } +// Execute task execution func (t *ShowSegmentReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_ShowSegments { return fmt.Errorf("show segments, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) @@ -750,15 +783,18 @@ func (t *ShowSegmentReqTask) Execute(ctx context.Context) error { return nil } +// CreateIndexReqTask create index request task type CreateIndexReqTask struct { baseReqTask Req *milvuspb.CreateIndexRequest } +// Type return msg type func (t *CreateIndexReqTask) Type() commonpb.MsgType { return t.Req.Base.MsgType } +// Execute task execution func (t *CreateIndexReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_CreateIndex { return fmt.Errorf("create index, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) @@ -822,16 +858,19 @@ func (t *CreateIndexReqTask) Execute(ctx context.Context) error { return nil } +// DescribeIndexReqTask describe index request task type DescribeIndexReqTask struct { baseReqTask Req *milvuspb.DescribeIndexRequest Rsp *milvuspb.DescribeIndexResponse } +// Type return msg type func (t *DescribeIndexReqTask) Type() commonpb.MsgType { return t.Req.Base.MsgType } +// Execute task execution func (t *DescribeIndexReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_DescribeIndex { return fmt.Errorf("describe index, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) @@ -857,15 +896,18 @@ func (t *DescribeIndexReqTask) Execute(ctx context.Context) error { return nil } +// DropIndexReqTask drop index request task type DropIndexReqTask struct { baseReqTask Req *milvuspb.DropIndexRequest } +// Type return msg type func (t *DropIndexReqTask) Type() commonpb.MsgType { return t.Req.Base.MsgType } +// Execute task execution func (t *DropIndexReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_DropIndex { return fmt.Errorf("drop index, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) @@ -890,15 +932,18 @@ func (t *DropIndexReqTask) Execute(ctx context.Context) error { return err } +// CreateAliasReqTask create alias request task type CreateAliasReqTask struct { baseReqTask Req *milvuspb.CreateAliasRequest } +// Type return msg type func (t *CreateAliasReqTask) Type() commonpb.MsgType { return t.Req.Base.MsgType } +// Execute task execution func (t *CreateAliasReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_CreateAlias { return fmt.Errorf("create alias, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) @@ -948,15 +993,18 @@ func (t *CreateAliasReqTask) Execute(ctx context.Context) error { return t.core.setDdMsgSendFlag(true) } +// DropAliasReqTask drop alias request task type DropAliasReqTask struct { baseReqTask Req *milvuspb.DropAliasRequest } +// Type return msg type func (t *DropAliasReqTask) Type() commonpb.MsgType { return t.Req.Base.MsgType } +// Execute task execution func (t *DropAliasReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_DropAlias { return fmt.Errorf("create alias, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) @@ -1016,18 +1064,20 @@ func (t *DropAliasReqTask) Execute(ctx context.Context) error { // Update DDOperation in etcd return t.core.setDdMsgSendFlag(true) - } +// AlterAliasReqTask alter alias request task type AlterAliasReqTask struct { baseReqTask Req *milvuspb.AlterAliasRequest } +// Type return msg type func (t *AlterAliasReqTask) Type() commonpb.MsgType { return t.Req.Base.MsgType } +// Execute task execution func (t *AlterAliasReqTask) Execute(ctx context.Context) error { if t.Type() != commonpb.MsgType_AlterAlias { return fmt.Errorf("alter alias, msg type = %s", commonpb.MsgType_name[int32(t.Type())]) @@ -1087,5 +1137,4 @@ func (t *AlterAliasReqTask) Execute(ctx context.Context) error { // Update DDOperation in etcd return t.core.setDdMsgSendFlag(true) - } diff --git a/internal/rootcoord/util.go b/internal/rootcoord/util.go index ebbb84756e..e006446956 100644 --- a/internal/rootcoord/util.go +++ b/internal/rootcoord/util.go @@ -23,7 +23,7 @@ import ( "github.com/milvus-io/milvus/internal/util/typeutil" ) -//return +// EqualKeyPairArray check whether 2 KeyValuePairs are equal func EqualKeyPairArray(p1 []*commonpb.KeyValuePair, p2 []*commonpb.KeyValuePair) bool { if len(p1) != len(p2) { return false @@ -44,6 +44,7 @@ func EqualKeyPairArray(p1 []*commonpb.KeyValuePair, p2 []*commonpb.KeyValuePair) return true } +// GetFieldSchemaByID return field schema by id func GetFieldSchemaByID(coll *etcdpb.CollectionInfo, fieldID typeutil.UniqueID) (*schemapb.FieldSchema, error) { for _, f := range coll.Schema.Fields { if f.FieldID == fieldID { @@ -53,7 +54,7 @@ func GetFieldSchemaByID(coll *etcdpb.CollectionInfo, fieldID typeutil.UniqueID) return nil, fmt.Errorf("field id = %d not found", fieldID) } -//GetFieldSchemaByIndexID return the field schema by it's index id +// GetFieldSchemaByIndexID return field schema by it's index id func GetFieldSchemaByIndexID(coll *etcdpb.CollectionInfo, idxID typeutil.UniqueID) (*schemapb.FieldSchema, error) { var fieldID typeutil.UniqueID exist := false @@ -122,7 +123,7 @@ func DecodeMsgPositions(str string, msgPositions *[]*msgstream.MsgPosition) erro return json.Unmarshal([]byte(str), msgPositions) } -//ToPhysicalChannel virtual channel -> physical channel +// ToPhysicalChannel get physical channel name from virtual channel name func ToPhysicalChannel(vchannel string) string { var idx int for idx = len(vchannel) - 1; idx >= 0; idx-- {