Add revive sub-lints and fix existing problems (#27495)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/27511/head
congqixia 2023-10-07 20:53:38 +08:00 committed by GitHub
parent 2607357147
commit 5d558623fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 125 additions and 111 deletions

View File

@ -53,6 +53,34 @@ linters-settings:
disabled: false
arguments:
- ["ID"] # Allow list
- name: context-as-argument
severity: warning
disabled: false
arguments:
- allowTypesBefore: "*testing.T"
- name: datarace
severity: warning
disabled: false
- name: duplicated-imports
severity: warning
disabled: false
- name: waitgroup-by-value
severity: warning
disabled: false
- name: indent-error-flow
severity: warning
disabled: false
arguments:
- "preserveScope"
- name: range-val-in-closure
severity: warning
disabled: false
- name: range-val-address
severity: warning
disabled: false
- name: string-of-int
severity: warning
disabled: false
misspell:
locale: US
gocritic:

View File

@ -28,7 +28,6 @@ import (
"github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/kv/mocks"
mockkv "github.com/milvus-io/milvus/internal/kv/mocks"
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
catalogmocks "github.com/milvus-io/milvus/internal/metastore/mocks"
@ -731,7 +730,7 @@ func TestMeta_MarkIndexAsDeleted(t *testing.T) {
}
func TestMeta_GetSegmentIndexes(t *testing.T) {
m := createMetaTable(&datacoord.Catalog{MetaKv: mocks.NewMetaKv(t)})
m := createMetaTable(&datacoord.Catalog{MetaKv: mockkv.NewMetaKv(t)})
t.Run("success", func(t *testing.T) {
segIndexes := m.GetSegmentIndexes(segID)

View File

@ -27,7 +27,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/kv/mocks"
mockkv "github.com/milvus-io/milvus/internal/kv/mocks"
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
catalogmocks "github.com/milvus-io/milvus/internal/metastore/mocks"
@ -183,7 +182,7 @@ func TestServer_GetIndexState(t *testing.T) {
)
s := &Server{
meta: &meta{
catalog: &datacoord.Catalog{MetaKv: mocks.NewMetaKv(t)},
catalog: &datacoord.Catalog{MetaKv: mockkv.NewMetaKv(t)},
},
allocator: newMockAllocator(),
notifyIndexChan: make(chan UniqueID, 1),
@ -204,7 +203,7 @@ func TestServer_GetIndexState(t *testing.T) {
})
s.meta = &meta{
catalog: &datacoord.Catalog{MetaKv: mocks.NewMetaKv(t)},
catalog: &datacoord.Catalog{MetaKv: mockkv.NewMetaKv(t)},
indexes: map[UniqueID]map[UniqueID]*model.Index{
collID: {
indexID: {
@ -255,7 +254,7 @@ func TestServer_GetIndexState(t *testing.T) {
})
s.meta = &meta{
catalog: &datacoord.Catalog{MetaKv: mocks.NewMetaKv(t)},
catalog: &datacoord.Catalog{MetaKv: mockkv.NewMetaKv(t)},
indexes: map[UniqueID]map[UniqueID]*model.Index{
collID: {
indexID: {
@ -373,7 +372,7 @@ func TestServer_GetSegmentIndexState(t *testing.T) {
)
s := &Server{
meta: &meta{
catalog: &datacoord.Catalog{MetaKv: mocks.NewMetaKv(t)},
catalog: &datacoord.Catalog{MetaKv: mockkv.NewMetaKv(t)},
indexes: map[UniqueID]map[UniqueID]*model.Index{},
segments: &SegmentsInfo{map[UniqueID]*SegmentInfo{}},
},
@ -508,7 +507,7 @@ func TestServer_GetIndexBuildProgress(t *testing.T) {
s := &Server{
meta: &meta{
catalog: &datacoord.Catalog{MetaKv: mocks.NewMetaKv(t)},
catalog: &datacoord.Catalog{MetaKv: mockkv.NewMetaKv(t)},
indexes: map[UniqueID]map[UniqueID]*model.Index{},
segments: &SegmentsInfo{map[UniqueID]*SegmentInfo{}},
},
@ -1540,7 +1539,7 @@ func TestServer_GetIndexInfos(t *testing.T) {
s := &Server{
meta: &meta{
catalog: &datacoord.Catalog{MetaKv: mocks.NewMetaKv(t)},
catalog: &datacoord.Catalog{MetaKv: mockkv.NewMetaKv(t)},
indexes: map[UniqueID]map[UniqueID]*model.Index{
collID: {
// finished

View File

@ -101,9 +101,8 @@ func (b *binlogIO) download(ctx context.Context, paths []string) ([]*Blob, error
for i := range futures {
if !futures[i].OK() {
return nil, futures[i].Err()
} else {
resp[i] = &Blob{Value: futures[i].Value().([]byte)}
}
resp[i] = &Blob{Value: futures[i].Value().([]byte)}
}
return resp, nil

View File

@ -73,7 +73,7 @@ func tiTxnBegin(txn *txnkv.Client) (*transaction.KVTxn, error) {
return txn.Begin()
}
func tiTxnCommit(txn *transaction.KVTxn, ctx context.Context) error {
func tiTxnCommit(ctx context.Context, txn *transaction.KVTxn) error {
return txn.Commit(ctx)
}
@ -143,10 +143,9 @@ func (kv *txnTiKV) Has(key string) (bool, error) {
// Dont error out if not present unless failed call to tikv
if common.IsKeyNotExistError(err) {
return false, nil
} else {
loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to read key: %s", key))
return false, loggingErr
}
loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to read key: %s", key))
return false, loggingErr
}
CheckElapseAndWarn(start, "Slow txnTiKV Has() operation", zap.String("key", key))
return true, nil
@ -348,7 +347,7 @@ func (kv *txnTiKV) MultiSave(kvs map[string]string) error {
return loggingErr
}
}
err = kv.executeTxn(txn, ctx)
err = kv.executeTxn(ctx, txn)
if err != nil {
loggingErr = errors.Wrap(err, "Failed to commit for MultiSave()")
return loggingErr
@ -397,7 +396,7 @@ func (kv *txnTiKV) MultiRemove(keys []string) error {
}
}
err = kv.executeTxn(txn, ctx)
err = kv.executeTxn(ctx, txn)
if err != nil {
loggingErr = errors.Wrap(err, "Failed to commit for MultiRemove()")
return loggingErr
@ -481,7 +480,7 @@ func (kv *txnTiKV) MultiSaveAndRemove(saves map[string]string, removals []string
}
}
err = kv.executeTxn(txn, ctx)
err = kv.executeTxn(ctx, txn)
if err != nil {
loggingErr = errors.Wrap(err, "Failed to commit for MultiSaveAndRemove")
return loggingErr
@ -567,7 +566,7 @@ func (kv *txnTiKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removal
}
}
}
err = kv.executeTxn(txn, ctx)
err = kv.executeTxn(ctx, txn)
if err != nil {
loggingErr = errors.Wrap(err, "Failed to commit for MultiSaveAndRemoveWithPrefix")
return loggingErr
@ -620,12 +619,12 @@ func (kv *txnTiKV) WalkWithPrefix(prefix string, paginationSize int, fn func([]b
return nil
}
func (kv *txnTiKV) executeTxn(txn *transaction.KVTxn, ctx context.Context) error {
func (kv *txnTiKV) executeTxn(ctx context.Context, txn *transaction.KVTxn) error {
start := timerecord.NewTimeRecorder("executeTxn")
elapsed := start.ElapseSpan()
metrics.MetaOpCounter.WithLabelValues(metrics.MetaTxnLabel, metrics.TotalLabel).Inc()
err := commitTxn(txn, ctx)
err := commitTxn(ctx, txn)
if err == nil {
metrics.MetaRequestLatency.WithLabelValues(metrics.MetaTxnLabel).Observe(float64(elapsed.Milliseconds()))
metrics.MetaOpCounter.WithLabelValues(metrics.MetaTxnLabel, metrics.SuccessLabel).Inc()
@ -651,10 +650,9 @@ func (kv *txnTiKV) getTiKVMeta(ctx context.Context, key string) (string, error)
if err == tikverr.ErrNotExist {
// If key is missing
return "", common.NewKeyNotExistError(key)
} else {
// If call to tikv fails
return "", errors.Wrap(err, fmt.Sprintf("Failed to get value for key %s in getTiKVMeta", key))
}
// If call to tikv fails
return "", errors.Wrap(err, fmt.Sprintf("Failed to get value for key %s in getTiKVMeta", key))
}
// Check if value is the empty placeholder
@ -692,7 +690,7 @@ func (kv *txnTiKV) putTiKVMeta(ctx context.Context, key, val string) error {
if err != nil {
return errors.Wrap(err, fmt.Sprintf("Failed to set value for key %s in putTiKVMeta", key))
}
err = commitTxn(txn, ctx1)
err = commitTxn(ctx1, txn)
elapsed := start.ElapseSpan()
metrics.MetaOpCounter.WithLabelValues(metrics.MetaPutLabel, metrics.TotalLabel).Inc()
@ -724,7 +722,7 @@ func (kv *txnTiKV) removeTiKVMeta(ctx context.Context, key string) error {
if err != nil {
return errors.Wrap(err, fmt.Sprintf("Failed to remove key %s in removeTiKVMeta", key))
}
err = commitTxn(txn, ctx1)
err = commitTxn(ctx1, txn)
elapsed := start.ElapseSpan()
metrics.MetaOpCounter.WithLabelValues(metrics.MetaRemoveLabel, metrics.TotalLabel).Inc()
@ -765,9 +763,8 @@ func isEmptyByte(value []byte) bool {
func convertEmptyByteToString(value []byte) string {
if isEmptyByte(value) {
return ""
} else {
return string(value)
}
return string(value)
}
// Convert string into EmptyValue if empty else cast to []byte. Will throw error if value is equal

View File

@ -325,7 +325,7 @@ func TestTiKVLoad(te *testing.T) {
kv := NewTiKV(txnClient, rootPath)
defer kv.Close()
commitTxn = func(txn *transaction.KVTxn, ctx context.Context) error {
commitTxn = func(ctx context.Context, txn *transaction.KVTxn) error {
return fmt.Errorf("bad txn commit!")
}
defer func() {

View File

@ -137,14 +137,13 @@ func CreateSearchPlan(schemaPb *schemapb.CollectionSchema, exprStr string, vecto
var vectorType planpb.VectorType
if !typeutil.IsVectorType(dataType) {
return nil, fmt.Errorf("field (%s) to search is not of vector data type", vectorFieldName)
}
if dataType == schemapb.DataType_FloatVector {
vectorType = planpb.VectorType_FloatVector
} else if dataType == schemapb.DataType_BinaryVector {
vectorType = planpb.VectorType_BinaryVector
} else {
if dataType == schemapb.DataType_FloatVector {
vectorType = planpb.VectorType_FloatVector
} else if dataType == schemapb.DataType_BinaryVector {
vectorType = planpb.VectorType_BinaryVector
} else {
vectorType = planpb.VectorType_Float16Vector
}
vectorType = planpb.VectorType_Float16Vector
}
planNode := &planpb.PlanNode{
Node: &planpb.PlanNode_VectorAnns{

View File

@ -391,34 +391,34 @@ func TestProxy_FlushAll_DbCollection(t *testing.T) {
globalMetaCache = cache
for _, test := range tests {
factory := dependency.NewDefaultFactory(true)
ctx := context.Background()
paramtable.Init()
node, err := NewProxy(ctx, factory)
assert.NoError(t, err)
node.stateCode.Store(commonpb.StateCode_Healthy)
node.tsoAllocator = &timestampAllocator{
tso: newMockTimestampAllocatorInterface(),
}
Params.Save(Params.ProxyCfg.MaxTaskNum.Key, "1000")
node.sched, err = newTaskScheduler(ctx, node.tsoAllocator, node.factory)
assert.NoError(t, err)
err = node.sched.Start()
assert.NoError(t, err)
defer node.sched.Close()
node.dataCoord = mocks.NewMockDataCoordClient(t)
node.rootCoord = mocks.NewMockRootCoordClient(t)
successStatus := &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}
node.dataCoord.(*mocks.MockDataCoordClient).EXPECT().Flush(mock.Anything, mock.Anything).
Return(&datapb.FlushResponse{Status: successStatus}, nil).Maybe()
node.rootCoord.(*mocks.MockRootCoordClient).EXPECT().ShowCollections(mock.Anything, mock.Anything).
Return(&milvuspb.ShowCollectionsResponse{Status: successStatus, CollectionNames: []string{"col-0"}}, nil).Maybe()
node.rootCoord.(*mocks.MockRootCoordClient).EXPECT().ListDatabases(mock.Anything, mock.Anything).
Return(&milvuspb.ListDatabasesResponse{Status: successStatus, DbNames: []string{"default"}}, nil).Maybe()
t.Run(test.testName, func(t *testing.T) {
factory := dependency.NewDefaultFactory(true)
ctx := context.Background()
paramtable.Init()
node, err := NewProxy(ctx, factory)
assert.NoError(t, err)
node.stateCode.Store(commonpb.StateCode_Healthy)
node.tsoAllocator = &timestampAllocator{
tso: newMockTimestampAllocatorInterface(),
}
Params.Save(Params.ProxyCfg.MaxTaskNum.Key, "1000")
node.sched, err = newTaskScheduler(ctx, node.tsoAllocator, node.factory)
assert.NoError(t, err)
err = node.sched.Start()
assert.NoError(t, err)
defer node.sched.Close()
node.dataCoord = mocks.NewMockDataCoordClient(t)
node.rootCoord = mocks.NewMockRootCoordClient(t)
successStatus := &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}
node.dataCoord.(*mocks.MockDataCoordClient).EXPECT().Flush(mock.Anything, mock.Anything).
Return(&datapb.FlushResponse{Status: successStatus}, nil).Maybe()
node.rootCoord.(*mocks.MockRootCoordClient).EXPECT().ShowCollections(mock.Anything, mock.Anything).
Return(&milvuspb.ShowCollectionsResponse{Status: successStatus, CollectionNames: []string{"col-0"}}, nil).Maybe()
node.rootCoord.(*mocks.MockRootCoordClient).EXPECT().ListDatabases(mock.Anything, mock.Anything).
Return(&milvuspb.ListDatabasesResponse{Status: successStatus, DbNames: []string{"default"}}, nil).Maybe()
resp, err := node.FlushAll(ctx, test.FlushRequest)
assert.NoError(t, err)
if test.ExpectedSuccess {

View File

@ -884,14 +884,13 @@ func createQueryPlan(schemaPb *schemapb.CollectionSchema, exprStr string, vector
var vectorType planpb.VectorType
if !typeutil.IsVectorType(dataType) {
return nil, fmt.Errorf("field (%s) to search is not of vector data type", vectorFieldName)
}
if dataType == schemapb.DataType_FloatVector {
vectorType = planpb.VectorType_FloatVector
} else if dataType == schemapb.DataType_BinaryVector {
vectorType = planpb.VectorType_BinaryVector
} else {
if dataType == schemapb.DataType_FloatVector {
vectorType = planpb.VectorType_FloatVector
} else if dataType == schemapb.DataType_BinaryVector {
vectorType = planpb.VectorType_BinaryVector
} else {
vectorType = planpb.VectorType_Float16Vector
}
vectorType = planpb.VectorType_Float16Vector
}
planNode := &planpb.PlanNode{

View File

@ -36,7 +36,6 @@ import (
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
. "github.com/milvus-io/milvus/pkg/util/typeutil"
)
type Collection struct {
@ -100,8 +99,8 @@ func (partition *Partition) Clone() *Partition {
type CollectionManager struct {
rwmutex sync.RWMutex
collections map[UniqueID]*Collection
partitions map[UniqueID]*Partition
collections map[typeutil.UniqueID]*Collection
partitions map[typeutil.UniqueID]*Partition
catalog metastore.QueryCoordCatalog
}
@ -275,21 +274,21 @@ func (m *CollectionManager) upgradeRecover(broker Broker) error {
return nil
}
func (m *CollectionManager) GetCollection(collectionID UniqueID) *Collection {
func (m *CollectionManager) GetCollection(collectionID typeutil.UniqueID) *Collection {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
return m.collections[collectionID]
}
func (m *CollectionManager) GetPartition(partitionID UniqueID) *Partition {
func (m *CollectionManager) GetPartition(partitionID typeutil.UniqueID) *Partition {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
return m.partitions[partitionID]
}
func (m *CollectionManager) GetLoadType(collectionID UniqueID) querypb.LoadType {
func (m *CollectionManager) GetLoadType(collectionID typeutil.UniqueID) querypb.LoadType {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
@ -300,7 +299,7 @@ func (m *CollectionManager) GetLoadType(collectionID UniqueID) querypb.LoadType
return querypb.LoadType_UnKnownType
}
func (m *CollectionManager) GetReplicaNumber(collectionID UniqueID) int32 {
func (m *CollectionManager) GetReplicaNumber(collectionID typeutil.UniqueID) int32 {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
@ -312,14 +311,14 @@ func (m *CollectionManager) GetReplicaNumber(collectionID UniqueID) int32 {
}
// CalculateLoadPercentage checks if collection is currently fully loaded.
func (m *CollectionManager) CalculateLoadPercentage(collectionID UniqueID) int32 {
func (m *CollectionManager) CalculateLoadPercentage(collectionID typeutil.UniqueID) int32 {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
return m.calculateLoadPercentage(collectionID)
}
func (m *CollectionManager) calculateLoadPercentage(collectionID UniqueID) int32 {
func (m *CollectionManager) calculateLoadPercentage(collectionID typeutil.UniqueID) int32 {
_, ok := m.collections[collectionID]
if ok {
partitions := m.getPartitionsByCollection(collectionID)
@ -332,7 +331,7 @@ func (m *CollectionManager) calculateLoadPercentage(collectionID UniqueID) int32
return -1
}
func (m *CollectionManager) GetPartitionLoadPercentage(partitionID UniqueID) int32 {
func (m *CollectionManager) GetPartitionLoadPercentage(partitionID typeutil.UniqueID) int32 {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
@ -343,7 +342,7 @@ func (m *CollectionManager) GetPartitionLoadPercentage(partitionID UniqueID) int
return -1
}
func (m *CollectionManager) CalculateLoadStatus(collectionID UniqueID) querypb.LoadStatus {
func (m *CollectionManager) CalculateLoadStatus(collectionID typeutil.UniqueID) querypb.LoadStatus {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
@ -366,7 +365,7 @@ func (m *CollectionManager) CalculateLoadStatus(collectionID UniqueID) querypb.L
return querypb.LoadStatus_Invalid
}
func (m *CollectionManager) GetFieldIndex(collectionID UniqueID) map[int64]int64 {
func (m *CollectionManager) GetFieldIndex(collectionID typeutil.UniqueID) map[int64]int64 {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
@ -377,7 +376,7 @@ func (m *CollectionManager) GetFieldIndex(collectionID UniqueID) map[int64]int64
return nil
}
func (m *CollectionManager) Exist(collectionID UniqueID) bool {
func (m *CollectionManager) Exist(collectionID typeutil.UniqueID) bool {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
@ -411,14 +410,14 @@ func (m *CollectionManager) GetAllPartitions() []*Partition {
return lo.Values(m.partitions)
}
func (m *CollectionManager) GetPartitionsByCollection(collectionID UniqueID) []*Partition {
func (m *CollectionManager) GetPartitionsByCollection(collectionID typeutil.UniqueID) []*Partition {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
return m.getPartitionsByCollection(collectionID)
}
func (m *CollectionManager) getPartitionsByCollection(collectionID UniqueID) []*Partition {
func (m *CollectionManager) getPartitionsByCollection(collectionID typeutil.UniqueID) []*Partition {
partitions := make([]*Partition, 0)
for _, partition := range m.partitions {
if partition.CollectionID == collectionID {
@ -547,7 +546,7 @@ func (m *CollectionManager) UpdateLoadPercent(partitionID int64, loadPercent int
}
// RemoveCollection removes collection and its partitions.
func (m *CollectionManager) RemoveCollection(collectionID UniqueID) error {
func (m *CollectionManager) RemoveCollection(collectionID typeutil.UniqueID) error {
m.rwmutex.Lock()
defer m.rwmutex.Unlock()
@ -567,7 +566,7 @@ func (m *CollectionManager) RemoveCollection(collectionID UniqueID) error {
return nil
}
func (m *CollectionManager) RemovePartition(collectionID UniqueID, partitionIDs ...UniqueID) error {
func (m *CollectionManager) RemovePartition(collectionID typeutil.UniqueID, partitionIDs ...typeutil.UniqueID) error {
if len(partitionIDs) == 0 {
return nil
}
@ -578,7 +577,7 @@ func (m *CollectionManager) RemovePartition(collectionID UniqueID, partitionIDs
return m.removePartition(collectionID, partitionIDs...)
}
func (m *CollectionManager) removePartition(collectionID UniqueID, partitionIDs ...UniqueID) error {
func (m *CollectionManager) removePartition(collectionID typeutil.UniqueID, partitionIDs ...typeutil.UniqueID) error {
err := m.catalog.ReleasePartition(collectionID, partitionIDs...)
if err != nil {
return err

View File

@ -28,16 +28,15 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
. "github.com/milvus-io/milvus/pkg/util/typeutil"
)
type Replica struct {
*querypb.Replica
nodes UniqueSet // a helper field for manipulating replica's Nodes slice field
nodes typeutil.UniqueSet // a helper field for manipulating replica's Nodes slice field
rwmutex sync.RWMutex
}
func NewReplica(replica *querypb.Replica, nodes UniqueSet) *Replica {
func NewReplica(replica *querypb.Replica, nodes typeutil.UniqueSet) *Replica {
return &Replica{
Replica: replica,
nodes: nodes,
@ -92,7 +91,7 @@ func (replica *Replica) Clone() *Replica {
defer replica.rwmutex.RUnlock()
return &Replica{
Replica: proto.Clone(replica.Replica).(*querypb.Replica),
nodes: NewUniqueSet(replica.Replica.Nodes...),
nodes: typeutil.NewUniqueSet(replica.Replica.Nodes...),
}
}
@ -100,7 +99,7 @@ type ReplicaManager struct {
rwmutex sync.RWMutex
idAllocator func() (int64, error)
replicas map[UniqueID]*Replica
replicas map[typeutil.UniqueID]*Replica
catalog metastore.QueryCoordCatalog
}
@ -128,7 +127,7 @@ func (m *ReplicaManager) Recover(collections []int64) error {
if collectionSet.Contain(replica.GetCollectionID()) {
m.replicas[replica.GetID()] = &Replica{
Replica: replica,
nodes: NewUniqueSet(replica.GetNodes()...),
nodes: typeutil.NewUniqueSet(replica.GetNodes()...),
}
log.Info("recover replica",
zap.Int64("collectionID", replica.GetCollectionID()),
@ -150,7 +149,7 @@ func (m *ReplicaManager) Recover(collections []int64) error {
return nil
}
func (m *ReplicaManager) Get(id UniqueID) *Replica {
func (m *ReplicaManager) Get(id typeutil.UniqueID) *Replica {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
@ -180,7 +179,7 @@ func (m *ReplicaManager) Put(replicas ...*Replica) error {
return m.put(replicas...)
}
func (m *ReplicaManager) spawn(collectionID UniqueID, rgName string) (*Replica, error) {
func (m *ReplicaManager) spawn(collectionID typeutil.UniqueID, rgName string) (*Replica, error) {
id, err := m.idAllocator()
if err != nil {
return nil, err
@ -191,7 +190,7 @@ func (m *ReplicaManager) spawn(collectionID UniqueID, rgName string) (*Replica,
CollectionID: collectionID,
ResourceGroup: rgName,
},
nodes: make(UniqueSet),
nodes: make(typeutil.UniqueSet),
}, nil
}
@ -208,7 +207,7 @@ func (m *ReplicaManager) put(replicas ...*Replica) error {
// RemoveCollection removes replicas of given collection,
// returns error if failed to remove replica from KV
func (m *ReplicaManager) RemoveCollection(collectionID UniqueID) error {
func (m *ReplicaManager) RemoveCollection(collectionID typeutil.UniqueID) error {
m.rwmutex.Lock()
defer m.rwmutex.Unlock()
@ -224,7 +223,7 @@ func (m *ReplicaManager) RemoveCollection(collectionID UniqueID) error {
return nil
}
func (m *ReplicaManager) GetByCollection(collectionID UniqueID) []*Replica {
func (m *ReplicaManager) GetByCollection(collectionID typeutil.UniqueID) []*Replica {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
@ -238,7 +237,7 @@ func (m *ReplicaManager) GetByCollection(collectionID UniqueID) []*Replica {
return replicas
}
func (m *ReplicaManager) GetByCollectionAndNode(collectionID, nodeID UniqueID) *Replica {
func (m *ReplicaManager) GetByCollectionAndNode(collectionID, nodeID typeutil.UniqueID) *Replica {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
@ -279,7 +278,7 @@ func (m *ReplicaManager) GetByResourceGroup(rgName string) []*Replica {
return ret
}
func (m *ReplicaManager) AddNode(replicaID UniqueID, nodes ...UniqueID) error {
func (m *ReplicaManager) AddNode(replicaID typeutil.UniqueID, nodes ...typeutil.UniqueID) error {
m.rwmutex.Lock()
defer m.rwmutex.Unlock()
@ -293,7 +292,7 @@ func (m *ReplicaManager) AddNode(replicaID UniqueID, nodes ...UniqueID) error {
return m.put(replica)
}
func (m *ReplicaManager) RemoveNode(replicaID UniqueID, nodes ...UniqueID) error {
func (m *ReplicaManager) RemoveNode(replicaID typeutil.UniqueID, nodes ...typeutil.UniqueID) error {
m.rwmutex.Lock()
defer m.rwmutex.Unlock()
@ -307,7 +306,7 @@ func (m *ReplicaManager) RemoveNode(replicaID UniqueID, nodes ...UniqueID) error
return m.put(replica)
}
func (m *ReplicaManager) GetResourceGroupByCollection(collection UniqueID) typeutil.Set[string] {
func (m *ReplicaManager) GetResourceGroupByCollection(collection typeutil.UniqueID) typeutil.Set[string] {
m.rwmutex.Lock()
defer m.rwmutex.Unlock()

View File

@ -29,7 +29,6 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
. "github.com/milvus-io/milvus/pkg/util/typeutil"
)
var (
@ -56,7 +55,7 @@ var DefaultResourceGroupName = "__default_resource_group"
var DefaultResourceGroupCapacity = 1000000
type ResourceGroup struct {
nodes UniqueSet
nodes typeutil.UniqueSet
capacity int
}

View File

@ -228,12 +228,10 @@ func (kc *Consumer) CheckTopicValid(topic string) error {
if err != nil {
switch v := err.(type) {
case kafka.Error:
if v.Code() == kafka.ErrUnknownTopic || v.Code() == kafka.ErrUnknownTopicOrPart {
return merr.WrapErrMqTopicNotFound(topic, err.Error())
} else {
return merr.WrapErrMqInternal(err)
}
return merr.WrapErrMqInternal(err)
default:
return err
}

View File

@ -28,11 +28,10 @@ func (c floatVectorBaseChecker) CheckTrain(params map[string]string) error {
}
func (c floatVectorBaseChecker) CheckValidDataType(dType schemapb.DataType) error {
if dType == schemapb.DataType_FloatVector || dType == schemapb.DataType_Float16Vector {
return nil
} else {
if dType != schemapb.DataType_FloatVector && dType != schemapb.DataType_Float16Vector {
return fmt.Errorf("float or float16 vector are only supported")
}
return nil
}
func (c floatVectorBaseChecker) SetDefaultMetricTypeIfNotExist(params map[string]string) {