diff --git a/internal/rootcoord/meta_table.go b/internal/rootcoord/meta_table.go index 1aca5ef297..d468a9f833 100644 --- a/internal/rootcoord/meta_table.go +++ b/internal/rootcoord/meta_table.go @@ -41,9 +41,6 @@ const ( // ComponentPrefix prefix for rootcoord component ComponentPrefix = "root-coord" - // ProxyMetaPrefix prefix for proxy meta - ProxyMetaPrefix = ComponentPrefix + "/proxy" - // CollectionMetaPrefix prefix for collection meta CollectionMetaPrefix = ComponentPrefix + "/collection" @@ -94,7 +91,6 @@ const ( type MetaTable struct { txn kv.TxnKV // client of a reliable txnkv service, i.e. etcd client snapshot kv.SnapShotKV // client of a reliable snapshotkv service, i.e. etcd client - proxyID2Meta map[typeutil.UniqueID]pb.ProxyMeta // proxy id to proxy meta collID2Meta map[typeutil.UniqueID]pb.CollectionInfo // collection id -> collection meta collName2ID map[string]typeutil.UniqueID // collection name to collection id collAlias2ID map[string]typeutil.UniqueID // collection alias to collection id @@ -102,20 +98,18 @@ type MetaTable struct { segID2IndexMeta map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo // collection id/index_id/partition_id/segment_id -> meta indexID2Meta map[typeutil.UniqueID]pb.IndexInfo // collection id/index_id -> meta - proxyLock sync.RWMutex - ddLock sync.RWMutex - credLock sync.RWMutex + ddLock sync.RWMutex + credLock sync.RWMutex } // NewMetaTable creates meta table for rootcoord, which stores all in-memory information // for collection, partition, segment, index etc. func NewMetaTable(txn kv.TxnKV, snap kv.SnapShotKV) (*MetaTable, error) { mt := &MetaTable{ - txn: txn, - snapshot: snap, - proxyLock: sync.RWMutex{}, - ddLock: sync.RWMutex{}, - credLock: sync.RWMutex{}, + txn: txn, + snapshot: snap, + ddLock: sync.RWMutex{}, + credLock: sync.RWMutex{}, } err := mt.reloadFromKV() if err != nil { @@ -125,7 +119,6 @@ func NewMetaTable(txn kv.TxnKV, snap kv.SnapShotKV) (*MetaTable, error) { } func (mt *MetaTable) reloadFromKV() error { - mt.proxyID2Meta = make(map[typeutil.UniqueID]pb.ProxyMeta) mt.collID2Meta = make(map[typeutil.UniqueID]pb.CollectionInfo) mt.collName2ID = make(map[string]typeutil.UniqueID) mt.collAlias2ID = make(map[string]typeutil.UniqueID) @@ -133,25 +126,7 @@ func (mt *MetaTable) reloadFromKV() error { mt.segID2IndexMeta = make(map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo) mt.indexID2Meta = make(map[typeutil.UniqueID]pb.IndexInfo) - _, values, err := mt.txn.LoadWithPrefix(ProxyMetaPrefix) - if err != nil { - return err - } - - for _, value := range values { - if bytes.Equal([]byte(value), suffixSnapshotTombstone) { - // backward compatibility, IndexMeta used to be in SnapshotKV - continue - } - proxyMeta := pb.ProxyMeta{} - err = proto.Unmarshal([]byte(value), &proxyMeta) - if err != nil { - return fmt.Errorf("rootcoord Unmarshal pb.ProxyMeta err:%w", err) - } - mt.proxyID2Meta[proxyMeta.ID] = proxyMeta - } - - _, values, err = mt.snapshot.LoadWithPrefix(CollectionAliasMetaPrefix, 0) + _, values, err := mt.snapshot.LoadWithPrefix(CollectionAliasMetaPrefix, 0) if err != nil { return err } @@ -239,27 +214,6 @@ func (mt *MetaTable) reloadFromKV() error { return nil } -// AddProxy add proxy -func (mt *MetaTable) AddProxy(po *pb.ProxyMeta) error { - mt.proxyLock.Lock() - defer mt.proxyLock.Unlock() - - k := fmt.Sprintf("%s/%d", ProxyMetaPrefix, po.ID) - v, err := proto.Marshal(po) - if err != nil { - log.Error("Failed to marshal ProxyMeta in AddProxy", zap.Error(err)) - return err - } - - err = mt.txn.Save(k, string(v)) - if err != nil { - log.Error("SnapShotKV Save fail", zap.Error(err)) - panic("SnapShotKV Save fail") - } - mt.proxyID2Meta[po.ID] = *po - return nil -} - // AddCollection add collection func (mt *MetaTable) AddCollection(coll *pb.CollectionInfo, ts typeutil.Timestamp, idx []*pb.IndexInfo, ddOpStr string) error { mt.ddLock.Lock() @@ -849,12 +803,12 @@ func (mt *MetaTable) MarkIndexDeleted(collName, fieldName, indexName string) err mt.ddLock.Lock() defer mt.ddLock.Unlock() - collMeta, err := mt.unlockGetCollectionInfo(collName) + collMeta, err := mt.getCollectionInfoInternal(collName) if err != nil { log.Error("get collection meta failed", zap.String("collName", collName), zap.Error(err)) return fmt.Errorf("collection name = %s not has meta", collName) } - fieldSch, err := mt.unlockGetFieldSchema(collName, fieldName) + fieldSch, err := mt.getFieldSchemaInternal(collName, fieldName) if err != nil { return err } @@ -924,7 +878,7 @@ func (mt *MetaTable) DropIndex(collName, fieldName, indexName string) (typeutil. if !ok { return 0, false, fmt.Errorf("collection name = %s not has meta", collName) } - fieldSch, err := mt.unlockGetFieldSchema(collName, fieldName) + fieldSch, err := mt.getFieldSchemaInternal(collName, fieldName) if err != nil { return 0, false, err } @@ -991,7 +945,7 @@ func (mt *MetaTable) GetInitBuildIDs(collName, indexName string) ([]UniqueID, er mt.ddLock.RLock() defer mt.ddLock.RUnlock() - collMeta, err := mt.unlockGetCollectionInfo(collName) + collMeta, err := mt.getCollectionInfoInternal(collName) if err != nil { return nil, err } @@ -1230,10 +1184,10 @@ func (mt *MetaTable) GetFieldSchema(collName string, fieldName string) (schemapb mt.ddLock.RLock() defer mt.ddLock.RUnlock() - return mt.unlockGetFieldSchema(collName, fieldName) + return mt.getFieldSchemaInternal(collName, fieldName) } -func (mt *MetaTable) unlockGetFieldSchema(collName string, fieldName string) (schemapb.FieldSchema, error) { +func (mt *MetaTable) getFieldSchemaInternal(collName string, fieldName string) (schemapb.FieldSchema, error) { collID, ok := mt.collName2ID[collName] if !ok { collID, ok = mt.collAlias2ID[collName] @@ -1258,10 +1212,10 @@ func (mt *MetaTable) unlockGetFieldSchema(collName string, fieldName string) (sc func (mt *MetaTable) IsSegmentIndexed(segID typeutil.UniqueID, fieldSchema *schemapb.FieldSchema, indexParams []*commonpb.KeyValuePair) bool { mt.ddLock.RLock() defer mt.ddLock.RUnlock() - return mt.unlockIsSegmentIndexed(segID, fieldSchema, indexParams) + return mt.isSegmentIndexedInternal(segID, fieldSchema, indexParams) } -func (mt *MetaTable) unlockIsSegmentIndexed(segID typeutil.UniqueID, fieldSchema *schemapb.FieldSchema, indexParams []*commonpb.KeyValuePair) bool { +func (mt *MetaTable) isSegmentIndexedInternal(segID typeutil.UniqueID, fieldSchema *schemapb.FieldSchema, indexParams []*commonpb.KeyValuePair) bool { segIdx, ok := mt.segID2IndexMeta[segID] if !ok { return false @@ -1283,7 +1237,7 @@ func (mt *MetaTable) unlockIsSegmentIndexed(segID typeutil.UniqueID, fieldSchema return exist } -func (mt *MetaTable) unlockGetCollectionInfo(collName string) (pb.CollectionInfo, error) { +func (mt *MetaTable) getCollectionInfoInternal(collName string) (pb.CollectionInfo, error) { collID, ok := mt.collName2ID[collName] if !ok { collID, ok = mt.collAlias2ID[collName] @@ -1344,17 +1298,18 @@ func (mt *MetaTable) checkFieldIndexDuplicate(collMeta pb.CollectionInfo, fieldS } // GetNotIndexedSegments return segment ids which have no index +// TODO, split GetNotIndexedSegments into two calls, one is to update CollectionMetaPrefix, IndexMetaPrefix, the otherone is trigger index build task func (mt *MetaTable) GetNotIndexedSegments(collName string, fieldName string, idxInfo *pb.IndexInfo, segIDs []typeutil.UniqueID) ([]typeutil.UniqueID, schemapb.FieldSchema, error) { mt.ddLock.Lock() defer mt.ddLock.Unlock() - collMeta, err := mt.unlockGetCollectionInfo(collName) + collMeta, err := mt.getCollectionInfoInternal(collName) if err != nil { // error here if collection not found. return nil, schemapb.FieldSchema{}, err } - fieldSchema, err := mt.unlockGetFieldSchema(collName, fieldName) + fieldSchema, err := mt.getFieldSchemaInternal(collName, fieldName) if err != nil { // error here if field not found. return nil, fieldSchema, err @@ -1421,7 +1376,7 @@ func (mt *MetaTable) GetNotIndexedSegments(collName string, fieldName string, id rstID := make([]typeutil.UniqueID, 0, 16) for _, segID := range segIDs { - if exist := mt.unlockIsSegmentIndexed(segID, &fieldSchema, idxInfo.IndexParams); !exist { + if exist := mt.isSegmentIndexedInternal(segID, &fieldSchema, idxInfo.IndexParams); !exist { rstID = append(rstID, segID) } } diff --git a/internal/rootcoord/meta_table_test.go b/internal/rootcoord/meta_table_test.go index 5f80e3048d..b322f4d96b 100644 --- a/internal/rootcoord/meta_table_test.go +++ b/internal/rootcoord/meta_table_test.go @@ -27,7 +27,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" - "github.com/golang/protobuf/proto" "github.com/milvus-io/milvus/internal/kv" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" memkv "github.com/milvus-io/milvus/internal/kv/mem" @@ -102,84 +101,6 @@ func (m *mockTestTxnKV) MultiRemove(keys []string) error { return m.multiRemove(keys) } -func Test_MockKV(t *testing.T) { - k1 := &mockTestKV{} - kt := &mockTestTxnKV{} - prefix := make(map[string][]string) - k1.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) { - if val, ok := prefix[key]; ok { - return nil, val, nil - } - return nil, nil, fmt.Errorf("load prefix error") - } - kt.loadWithPrefix = func(key string) ([]string, []string, error) { - if val, ok := prefix[key]; ok { - return nil, val, nil - } - return nil, nil, fmt.Errorf("load prefix error") - } - - _, err := NewMetaTable(kt, k1) - assert.NotNil(t, err) - assert.EqualError(t, err, "load prefix error") - - // proxy - prefix[ProxyMetaPrefix] = []string{"porxy-meta"} - _, err = NewMetaTable(kt, k1) - assert.NotNil(t, err) - - value, err := proto.Marshal(&pb.ProxyMeta{}) - assert.Nil(t, err) - prefix[ProxyMetaPrefix] = []string{string(value)} - _, err = NewMetaTable(kt, k1) - assert.NotNil(t, err) - - // collection - prefix[CollectionMetaPrefix] = []string{"collection-meta"} - _, err = NewMetaTable(kt, k1) - assert.NotNil(t, err) - - value, err = proto.Marshal(&pb.CollectionInfo{Schema: &schemapb.CollectionSchema{}}) - assert.Nil(t, err) - prefix[CollectionMetaPrefix] = []string{string(value)} - _, err = NewMetaTable(kt, k1) - assert.NotNil(t, err) - - // segment index - prefix[SegmentIndexMetaPrefix] = []string{"segment-index-meta"} - _, err = NewMetaTable(kt, k1) - assert.NotNil(t, err) - - value, err = proto.Marshal(&pb.SegmentIndexInfo{}) - assert.Nil(t, err) - prefix[SegmentIndexMetaPrefix] = []string{string(value)} - _, err = NewMetaTable(kt, k1) - assert.NotNil(t, err) - - prefix[SegmentIndexMetaPrefix] = []string{string(value), string(value)} - _, err = NewMetaTable(kt, k1) - assert.NotNil(t, err) - assert.EqualError(t, err, "load prefix error") - - // index - prefix[IndexMetaPrefix] = []string{"index-meta"} - _, err = NewMetaTable(kt, k1) - assert.NotNil(t, err) - - value, err = proto.Marshal(&pb.IndexInfo{}) - assert.Nil(t, err) - prefix[IndexMetaPrefix] = []string{string(value)} - m1, err := NewMetaTable(kt, k1) - assert.NotNil(t, err) - assert.EqualError(t, err, "load prefix error") - prefix[CollectionAliasMetaPrefix] = []string{"alias-meta"} - - k1.save = func(key string, value string, ts typeutil.Timestamp) error { - return fmt.Errorf("save proxy error") - } - assert.Panics(t, func() { m1.AddProxy(&pb.ProxyMeta{}) }) -} - func TestMetaTable(t *testing.T) { const ( collName = "testColl" @@ -491,19 +412,6 @@ func TestMetaTable(t *testing.T) { assert.Zero(t, len(idx)) }) - wg.Add(1) - t.Run("reload meta", func(t *testing.T) { - defer wg.Done() - po := pb.ProxyMeta{ - ID: 101, - } - err = mt.AddProxy(&po) - assert.Nil(t, err) - - _, err = NewMetaTable(txnKV, skv) - assert.Nil(t, err) - }) - wg.Add(1) t.Run("drop index", func(t *testing.T) { defer wg.Done() @@ -964,12 +872,12 @@ func TestMetaTable(t *testing.T) { assert.Nil(t, err) mt.collID2Meta = make(map[int64]pb.CollectionInfo) - _, err = mt.unlockGetFieldSchema(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name) + _, err = mt.getFieldSchemaInternal(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name) assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("collection %s not found", collInfo.Schema.Name)) mt.collName2ID = make(map[string]int64) - _, err = mt.unlockGetFieldSchema(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name) + _, err = mt.getFieldSchemaInternal(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name) assert.NotNil(t, err) assert.EqualError(t, err, fmt.Sprintf("collection %s not found", collInfo.Schema.Name)) }) @@ -1318,7 +1226,6 @@ func TestFixIssue10540(t *testing.T) { //txnKV := etcdkv.NewEtcdKVWithClient(etcdCli, rootPath) txnKV := memkv.NewMemoryKV() // compose rc7 legace tombstone cases - txnKV.Save(path.Join(ProxyMetaPrefix, "1"), string(suffixSnapshotTombstone)) txnKV.Save(path.Join(SegmentIndexMetaPrefix, "2"), string(suffixSnapshotTombstone)) txnKV.Save(path.Join(IndexMetaPrefix, "3"), string(suffixSnapshotTombstone)) @@ -1367,7 +1274,7 @@ func TestMetaTable_unlockGetCollectionInfo(t *testing.T) { 100: {ID: 100, Schema: &schemapb.CollectionSchema{Name: "test"}}, }, } - info, err := mt.unlockGetCollectionInfo("test") + info, err := mt.getCollectionInfoInternal("test") assert.NoError(t, err) assert.Equal(t, UniqueID(100), info.ID) assert.Equal(t, "test", info.GetSchema().GetName()) @@ -1375,7 +1282,7 @@ func TestMetaTable_unlockGetCollectionInfo(t *testing.T) { t.Run("collection name not found", func(t *testing.T) { mt := &MetaTable{collName2ID: nil, collAlias2ID: nil} - _, err := mt.unlockGetCollectionInfo("test") + _, err := mt.getCollectionInfoInternal("test") assert.Error(t, err) }) @@ -1385,7 +1292,7 @@ func TestMetaTable_unlockGetCollectionInfo(t *testing.T) { collAlias2ID: nil, collID2Meta: nil, } - _, err := mt.unlockGetCollectionInfo("test") + _, err := mt.getCollectionInfoInternal("test") assert.Error(t, err) }) @@ -1395,7 +1302,7 @@ func TestMetaTable_unlockGetCollectionInfo(t *testing.T) { collAlias2ID: map[string]typeutil.UniqueID{"test": 100}, collID2Meta: nil, } - _, err := mt.unlockGetCollectionInfo("test") + _, err := mt.getCollectionInfoInternal("test") assert.Error(t, err) }) }