mirror of https://github.com/milvus-io/milvus.git
Refine meta, remove redundant proxy data (#17631)
Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>pull/17631/merge
parent
b15c24a554
commit
144f7a2d80
|
@ -41,9 +41,6 @@ const (
|
|||
// ComponentPrefix prefix for rootcoord component
|
||||
ComponentPrefix = "root-coord"
|
||||
|
||||
// ProxyMetaPrefix prefix for proxy meta
|
||||
ProxyMetaPrefix = ComponentPrefix + "/proxy"
|
||||
|
||||
// CollectionMetaPrefix prefix for collection meta
|
||||
CollectionMetaPrefix = ComponentPrefix + "/collection"
|
||||
|
||||
|
@ -94,7 +91,6 @@ const (
|
|||
type MetaTable struct {
|
||||
txn kv.TxnKV // client of a reliable txnkv service, i.e. etcd client
|
||||
snapshot kv.SnapShotKV // client of a reliable snapshotkv service, i.e. etcd client
|
||||
proxyID2Meta map[typeutil.UniqueID]pb.ProxyMeta // proxy id to proxy meta
|
||||
collID2Meta map[typeutil.UniqueID]pb.CollectionInfo // collection id -> collection meta
|
||||
collName2ID map[string]typeutil.UniqueID // collection name to collection id
|
||||
collAlias2ID map[string]typeutil.UniqueID // collection alias to collection id
|
||||
|
@ -102,20 +98,18 @@ type MetaTable struct {
|
|||
segID2IndexMeta map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo // collection id/index_id/partition_id/segment_id -> meta
|
||||
indexID2Meta map[typeutil.UniqueID]pb.IndexInfo // collection id/index_id -> meta
|
||||
|
||||
proxyLock sync.RWMutex
|
||||
ddLock sync.RWMutex
|
||||
credLock sync.RWMutex
|
||||
ddLock sync.RWMutex
|
||||
credLock sync.RWMutex
|
||||
}
|
||||
|
||||
// NewMetaTable creates meta table for rootcoord, which stores all in-memory information
|
||||
// for collection, partition, segment, index etc.
|
||||
func NewMetaTable(txn kv.TxnKV, snap kv.SnapShotKV) (*MetaTable, error) {
|
||||
mt := &MetaTable{
|
||||
txn: txn,
|
||||
snapshot: snap,
|
||||
proxyLock: sync.RWMutex{},
|
||||
ddLock: sync.RWMutex{},
|
||||
credLock: sync.RWMutex{},
|
||||
txn: txn,
|
||||
snapshot: snap,
|
||||
ddLock: sync.RWMutex{},
|
||||
credLock: sync.RWMutex{},
|
||||
}
|
||||
err := mt.reloadFromKV()
|
||||
if err != nil {
|
||||
|
@ -125,7 +119,6 @@ func NewMetaTable(txn kv.TxnKV, snap kv.SnapShotKV) (*MetaTable, error) {
|
|||
}
|
||||
|
||||
func (mt *MetaTable) reloadFromKV() error {
|
||||
mt.proxyID2Meta = make(map[typeutil.UniqueID]pb.ProxyMeta)
|
||||
mt.collID2Meta = make(map[typeutil.UniqueID]pb.CollectionInfo)
|
||||
mt.collName2ID = make(map[string]typeutil.UniqueID)
|
||||
mt.collAlias2ID = make(map[string]typeutil.UniqueID)
|
||||
|
@ -133,25 +126,7 @@ func (mt *MetaTable) reloadFromKV() error {
|
|||
mt.segID2IndexMeta = make(map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo)
|
||||
mt.indexID2Meta = make(map[typeutil.UniqueID]pb.IndexInfo)
|
||||
|
||||
_, values, err := mt.txn.LoadWithPrefix(ProxyMetaPrefix)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, value := range values {
|
||||
if bytes.Equal([]byte(value), suffixSnapshotTombstone) {
|
||||
// backward compatibility, IndexMeta used to be in SnapshotKV
|
||||
continue
|
||||
}
|
||||
proxyMeta := pb.ProxyMeta{}
|
||||
err = proto.Unmarshal([]byte(value), &proxyMeta)
|
||||
if err != nil {
|
||||
return fmt.Errorf("rootcoord Unmarshal pb.ProxyMeta err:%w", err)
|
||||
}
|
||||
mt.proxyID2Meta[proxyMeta.ID] = proxyMeta
|
||||
}
|
||||
|
||||
_, values, err = mt.snapshot.LoadWithPrefix(CollectionAliasMetaPrefix, 0)
|
||||
_, values, err := mt.snapshot.LoadWithPrefix(CollectionAliasMetaPrefix, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -239,27 +214,6 @@ func (mt *MetaTable) reloadFromKV() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// AddProxy add proxy
|
||||
func (mt *MetaTable) AddProxy(po *pb.ProxyMeta) error {
|
||||
mt.proxyLock.Lock()
|
||||
defer mt.proxyLock.Unlock()
|
||||
|
||||
k := fmt.Sprintf("%s/%d", ProxyMetaPrefix, po.ID)
|
||||
v, err := proto.Marshal(po)
|
||||
if err != nil {
|
||||
log.Error("Failed to marshal ProxyMeta in AddProxy", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
err = mt.txn.Save(k, string(v))
|
||||
if err != nil {
|
||||
log.Error("SnapShotKV Save fail", zap.Error(err))
|
||||
panic("SnapShotKV Save fail")
|
||||
}
|
||||
mt.proxyID2Meta[po.ID] = *po
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddCollection add collection
|
||||
func (mt *MetaTable) AddCollection(coll *pb.CollectionInfo, ts typeutil.Timestamp, idx []*pb.IndexInfo, ddOpStr string) error {
|
||||
mt.ddLock.Lock()
|
||||
|
@ -849,12 +803,12 @@ func (mt *MetaTable) MarkIndexDeleted(collName, fieldName, indexName string) err
|
|||
mt.ddLock.Lock()
|
||||
defer mt.ddLock.Unlock()
|
||||
|
||||
collMeta, err := mt.unlockGetCollectionInfo(collName)
|
||||
collMeta, err := mt.getCollectionInfoInternal(collName)
|
||||
if err != nil {
|
||||
log.Error("get collection meta failed", zap.String("collName", collName), zap.Error(err))
|
||||
return fmt.Errorf("collection name = %s not has meta", collName)
|
||||
}
|
||||
fieldSch, err := mt.unlockGetFieldSchema(collName, fieldName)
|
||||
fieldSch, err := mt.getFieldSchemaInternal(collName, fieldName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -924,7 +878,7 @@ func (mt *MetaTable) DropIndex(collName, fieldName, indexName string) (typeutil.
|
|||
if !ok {
|
||||
return 0, false, fmt.Errorf("collection name = %s not has meta", collName)
|
||||
}
|
||||
fieldSch, err := mt.unlockGetFieldSchema(collName, fieldName)
|
||||
fieldSch, err := mt.getFieldSchemaInternal(collName, fieldName)
|
||||
if err != nil {
|
||||
return 0, false, err
|
||||
}
|
||||
|
@ -991,7 +945,7 @@ func (mt *MetaTable) GetInitBuildIDs(collName, indexName string) ([]UniqueID, er
|
|||
mt.ddLock.RLock()
|
||||
defer mt.ddLock.RUnlock()
|
||||
|
||||
collMeta, err := mt.unlockGetCollectionInfo(collName)
|
||||
collMeta, err := mt.getCollectionInfoInternal(collName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1230,10 +1184,10 @@ func (mt *MetaTable) GetFieldSchema(collName string, fieldName string) (schemapb
|
|||
mt.ddLock.RLock()
|
||||
defer mt.ddLock.RUnlock()
|
||||
|
||||
return mt.unlockGetFieldSchema(collName, fieldName)
|
||||
return mt.getFieldSchemaInternal(collName, fieldName)
|
||||
}
|
||||
|
||||
func (mt *MetaTable) unlockGetFieldSchema(collName string, fieldName string) (schemapb.FieldSchema, error) {
|
||||
func (mt *MetaTable) getFieldSchemaInternal(collName string, fieldName string) (schemapb.FieldSchema, error) {
|
||||
collID, ok := mt.collName2ID[collName]
|
||||
if !ok {
|
||||
collID, ok = mt.collAlias2ID[collName]
|
||||
|
@ -1258,10 +1212,10 @@ func (mt *MetaTable) unlockGetFieldSchema(collName string, fieldName string) (sc
|
|||
func (mt *MetaTable) IsSegmentIndexed(segID typeutil.UniqueID, fieldSchema *schemapb.FieldSchema, indexParams []*commonpb.KeyValuePair) bool {
|
||||
mt.ddLock.RLock()
|
||||
defer mt.ddLock.RUnlock()
|
||||
return mt.unlockIsSegmentIndexed(segID, fieldSchema, indexParams)
|
||||
return mt.isSegmentIndexedInternal(segID, fieldSchema, indexParams)
|
||||
}
|
||||
|
||||
func (mt *MetaTable) unlockIsSegmentIndexed(segID typeutil.UniqueID, fieldSchema *schemapb.FieldSchema, indexParams []*commonpb.KeyValuePair) bool {
|
||||
func (mt *MetaTable) isSegmentIndexedInternal(segID typeutil.UniqueID, fieldSchema *schemapb.FieldSchema, indexParams []*commonpb.KeyValuePair) bool {
|
||||
segIdx, ok := mt.segID2IndexMeta[segID]
|
||||
if !ok {
|
||||
return false
|
||||
|
@ -1283,7 +1237,7 @@ func (mt *MetaTable) unlockIsSegmentIndexed(segID typeutil.UniqueID, fieldSchema
|
|||
return exist
|
||||
}
|
||||
|
||||
func (mt *MetaTable) unlockGetCollectionInfo(collName string) (pb.CollectionInfo, error) {
|
||||
func (mt *MetaTable) getCollectionInfoInternal(collName string) (pb.CollectionInfo, error) {
|
||||
collID, ok := mt.collName2ID[collName]
|
||||
if !ok {
|
||||
collID, ok = mt.collAlias2ID[collName]
|
||||
|
@ -1344,17 +1298,18 @@ func (mt *MetaTable) checkFieldIndexDuplicate(collMeta pb.CollectionInfo, fieldS
|
|||
}
|
||||
|
||||
// GetNotIndexedSegments return segment ids which have no index
|
||||
// TODO, split GetNotIndexedSegments into two calls, one is to update CollectionMetaPrefix, IndexMetaPrefix, the otherone is trigger index build task
|
||||
func (mt *MetaTable) GetNotIndexedSegments(collName string, fieldName string, idxInfo *pb.IndexInfo, segIDs []typeutil.UniqueID) ([]typeutil.UniqueID, schemapb.FieldSchema, error) {
|
||||
mt.ddLock.Lock()
|
||||
defer mt.ddLock.Unlock()
|
||||
|
||||
collMeta, err := mt.unlockGetCollectionInfo(collName)
|
||||
collMeta, err := mt.getCollectionInfoInternal(collName)
|
||||
if err != nil {
|
||||
// error here if collection not found.
|
||||
return nil, schemapb.FieldSchema{}, err
|
||||
}
|
||||
|
||||
fieldSchema, err := mt.unlockGetFieldSchema(collName, fieldName)
|
||||
fieldSchema, err := mt.getFieldSchemaInternal(collName, fieldName)
|
||||
if err != nil {
|
||||
// error here if field not found.
|
||||
return nil, fieldSchema, err
|
||||
|
@ -1421,7 +1376,7 @@ func (mt *MetaTable) GetNotIndexedSegments(collName string, fieldName string, id
|
|||
|
||||
rstID := make([]typeutil.UniqueID, 0, 16)
|
||||
for _, segID := range segIDs {
|
||||
if exist := mt.unlockIsSegmentIndexed(segID, &fieldSchema, idxInfo.IndexParams); !exist {
|
||||
if exist := mt.isSegmentIndexedInternal(segID, &fieldSchema, idxInfo.IndexParams); !exist {
|
||||
rstID = append(rstID, segID)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,7 +27,6 @@ import (
|
|||
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/milvus-io/milvus/internal/kv"
|
||||
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
||||
memkv "github.com/milvus-io/milvus/internal/kv/mem"
|
||||
|
@ -102,84 +101,6 @@ func (m *mockTestTxnKV) MultiRemove(keys []string) error {
|
|||
return m.multiRemove(keys)
|
||||
}
|
||||
|
||||
func Test_MockKV(t *testing.T) {
|
||||
k1 := &mockTestKV{}
|
||||
kt := &mockTestTxnKV{}
|
||||
prefix := make(map[string][]string)
|
||||
k1.loadWithPrefix = func(key string, ts typeutil.Timestamp) ([]string, []string, error) {
|
||||
if val, ok := prefix[key]; ok {
|
||||
return nil, val, nil
|
||||
}
|
||||
return nil, nil, fmt.Errorf("load prefix error")
|
||||
}
|
||||
kt.loadWithPrefix = func(key string) ([]string, []string, error) {
|
||||
if val, ok := prefix[key]; ok {
|
||||
return nil, val, nil
|
||||
}
|
||||
return nil, nil, fmt.Errorf("load prefix error")
|
||||
}
|
||||
|
||||
_, err := NewMetaTable(kt, k1)
|
||||
assert.NotNil(t, err)
|
||||
assert.EqualError(t, err, "load prefix error")
|
||||
|
||||
// proxy
|
||||
prefix[ProxyMetaPrefix] = []string{"porxy-meta"}
|
||||
_, err = NewMetaTable(kt, k1)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
value, err := proto.Marshal(&pb.ProxyMeta{})
|
||||
assert.Nil(t, err)
|
||||
prefix[ProxyMetaPrefix] = []string{string(value)}
|
||||
_, err = NewMetaTable(kt, k1)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
// collection
|
||||
prefix[CollectionMetaPrefix] = []string{"collection-meta"}
|
||||
_, err = NewMetaTable(kt, k1)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
value, err = proto.Marshal(&pb.CollectionInfo{Schema: &schemapb.CollectionSchema{}})
|
||||
assert.Nil(t, err)
|
||||
prefix[CollectionMetaPrefix] = []string{string(value)}
|
||||
_, err = NewMetaTable(kt, k1)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
// segment index
|
||||
prefix[SegmentIndexMetaPrefix] = []string{"segment-index-meta"}
|
||||
_, err = NewMetaTable(kt, k1)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
value, err = proto.Marshal(&pb.SegmentIndexInfo{})
|
||||
assert.Nil(t, err)
|
||||
prefix[SegmentIndexMetaPrefix] = []string{string(value)}
|
||||
_, err = NewMetaTable(kt, k1)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
prefix[SegmentIndexMetaPrefix] = []string{string(value), string(value)}
|
||||
_, err = NewMetaTable(kt, k1)
|
||||
assert.NotNil(t, err)
|
||||
assert.EqualError(t, err, "load prefix error")
|
||||
|
||||
// index
|
||||
prefix[IndexMetaPrefix] = []string{"index-meta"}
|
||||
_, err = NewMetaTable(kt, k1)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
value, err = proto.Marshal(&pb.IndexInfo{})
|
||||
assert.Nil(t, err)
|
||||
prefix[IndexMetaPrefix] = []string{string(value)}
|
||||
m1, err := NewMetaTable(kt, k1)
|
||||
assert.NotNil(t, err)
|
||||
assert.EqualError(t, err, "load prefix error")
|
||||
prefix[CollectionAliasMetaPrefix] = []string{"alias-meta"}
|
||||
|
||||
k1.save = func(key string, value string, ts typeutil.Timestamp) error {
|
||||
return fmt.Errorf("save proxy error")
|
||||
}
|
||||
assert.Panics(t, func() { m1.AddProxy(&pb.ProxyMeta{}) })
|
||||
}
|
||||
|
||||
func TestMetaTable(t *testing.T) {
|
||||
const (
|
||||
collName = "testColl"
|
||||
|
@ -491,19 +412,6 @@ func TestMetaTable(t *testing.T) {
|
|||
assert.Zero(t, len(idx))
|
||||
})
|
||||
|
||||
wg.Add(1)
|
||||
t.Run("reload meta", func(t *testing.T) {
|
||||
defer wg.Done()
|
||||
po := pb.ProxyMeta{
|
||||
ID: 101,
|
||||
}
|
||||
err = mt.AddProxy(&po)
|
||||
assert.Nil(t, err)
|
||||
|
||||
_, err = NewMetaTable(txnKV, skv)
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
|
||||
wg.Add(1)
|
||||
t.Run("drop index", func(t *testing.T) {
|
||||
defer wg.Done()
|
||||
|
@ -964,12 +872,12 @@ func TestMetaTable(t *testing.T) {
|
|||
assert.Nil(t, err)
|
||||
|
||||
mt.collID2Meta = make(map[int64]pb.CollectionInfo)
|
||||
_, err = mt.unlockGetFieldSchema(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name)
|
||||
_, err = mt.getFieldSchemaInternal(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name)
|
||||
assert.NotNil(t, err)
|
||||
assert.EqualError(t, err, fmt.Sprintf("collection %s not found", collInfo.Schema.Name))
|
||||
|
||||
mt.collName2ID = make(map[string]int64)
|
||||
_, err = mt.unlockGetFieldSchema(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name)
|
||||
_, err = mt.getFieldSchemaInternal(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name)
|
||||
assert.NotNil(t, err)
|
||||
assert.EqualError(t, err, fmt.Sprintf("collection %s not found", collInfo.Schema.Name))
|
||||
})
|
||||
|
@ -1318,7 +1226,6 @@ func TestFixIssue10540(t *testing.T) {
|
|||
//txnKV := etcdkv.NewEtcdKVWithClient(etcdCli, rootPath)
|
||||
txnKV := memkv.NewMemoryKV()
|
||||
// compose rc7 legace tombstone cases
|
||||
txnKV.Save(path.Join(ProxyMetaPrefix, "1"), string(suffixSnapshotTombstone))
|
||||
txnKV.Save(path.Join(SegmentIndexMetaPrefix, "2"), string(suffixSnapshotTombstone))
|
||||
txnKV.Save(path.Join(IndexMetaPrefix, "3"), string(suffixSnapshotTombstone))
|
||||
|
||||
|
@ -1367,7 +1274,7 @@ func TestMetaTable_unlockGetCollectionInfo(t *testing.T) {
|
|||
100: {ID: 100, Schema: &schemapb.CollectionSchema{Name: "test"}},
|
||||
},
|
||||
}
|
||||
info, err := mt.unlockGetCollectionInfo("test")
|
||||
info, err := mt.getCollectionInfoInternal("test")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, UniqueID(100), info.ID)
|
||||
assert.Equal(t, "test", info.GetSchema().GetName())
|
||||
|
@ -1375,7 +1282,7 @@ func TestMetaTable_unlockGetCollectionInfo(t *testing.T) {
|
|||
|
||||
t.Run("collection name not found", func(t *testing.T) {
|
||||
mt := &MetaTable{collName2ID: nil, collAlias2ID: nil}
|
||||
_, err := mt.unlockGetCollectionInfo("test")
|
||||
_, err := mt.getCollectionInfoInternal("test")
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
|
@ -1385,7 +1292,7 @@ func TestMetaTable_unlockGetCollectionInfo(t *testing.T) {
|
|||
collAlias2ID: nil,
|
||||
collID2Meta: nil,
|
||||
}
|
||||
_, err := mt.unlockGetCollectionInfo("test")
|
||||
_, err := mt.getCollectionInfoInternal("test")
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
|
@ -1395,7 +1302,7 @@ func TestMetaTable_unlockGetCollectionInfo(t *testing.T) {
|
|||
collAlias2ID: map[string]typeutil.UniqueID{"test": 100},
|
||||
collID2Meta: nil,
|
||||
}
|
||||
_, err := mt.unlockGetCollectionInfo("test")
|
||||
_, err := mt.getCollectionInfoInternal("test")
|
||||
assert.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue