Fix meta cache error

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
pull/4973/head^2
godchen 2021-02-04 15:54:16 +08:00 committed by yefu.chen
parent 55634dc5f3
commit d6200a5196
1 changed files with 46 additions and 35 deletions

View File

@ -20,20 +20,19 @@ type Cache interface {
GetPartitionID(collectionName string, partitionName string) (typeutil.UniqueID, error)
GetCollectionSchema(collectionName string) (*schemapb.CollectionSchema, error)
RemoveCollection(collectionName string)
RemovePartition(partitionName string)
RemovePartition(collectionName string, partitionName string)
}
type collectionInfo struct {
collID typeutil.UniqueID
schema *schemapb.CollectionSchema
collID typeutil.UniqueID
schema *schemapb.CollectionSchema
partInfo map[string]typeutil.UniqueID
}
type MetaCache struct {
client MasterClientInterface
collInfo map[string]*collectionInfo
partInfo map[string]typeutil.UniqueID
col2par map[string][]string
mu sync.RWMutex
}
@ -52,8 +51,6 @@ func NewMetaCache(client MasterClientInterface) (*MetaCache, error) {
return &MetaCache{
client: client,
collInfo: map[string]*collectionInfo{},
partInfo: map[string]typeutil.UniqueID{},
col2par: map[string][]string{},
}, nil
}
@ -79,11 +76,16 @@ func (m *MetaCache) readCollectionSchema(collectionName string) (*schemapb.Colle
return collInfo.schema, nil
}
func (m *MetaCache) readPartitionID(partitionName string) (typeutil.UniqueID, error) {
func (m *MetaCache) readPartitionID(collectionName string, partitionName string) (typeutil.UniqueID, error) {
m.mu.RLock()
defer m.mu.RUnlock()
partitionID, ok := m.partInfo[partitionName]
collInfo, ok := m.collInfo[collectionName]
if !ok {
return 0, errors.Errorf("can't find collection name:%s", collectionName)
}
partitionID, ok := collInfo.partInfo[partitionName]
if !ok {
return 0, errors.Errorf("can't find partition name:%s", partitionName)
}
@ -112,15 +114,14 @@ func (m *MetaCache) GetCollectionID(collectionName string) (typeutil.UniqueID, e
return 0, errors.Errorf("%s", coll.Status.Reason)
}
collInfo := &collectionInfo{
collID: coll.CollectionID,
schema: coll.Schema,
}
_, ok := m.collInfo[collectionName]
if !ok {
m.collInfo[collectionName] = collInfo
m.collInfo[collectionName] = &collectionInfo{}
}
return collInfo.collID, nil
m.collInfo[collectionName].schema = coll.Schema
m.collInfo[collectionName].collID = coll.CollectionID
return m.collInfo[collectionName].collID, nil
}
func (m *MetaCache) GetCollectionSchema(collectionName string) (*schemapb.CollectionSchema, error) {
collSchema, err := m.readCollectionSchema(collectionName)
@ -144,19 +145,18 @@ func (m *MetaCache) GetCollectionSchema(collectionName string) (*schemapb.Collec
return nil, errors.Errorf("%s", coll.Status.Reason)
}
collInfo := &collectionInfo{
collID: coll.CollectionID,
schema: coll.Schema,
}
_, ok := m.collInfo[collectionName]
if !ok {
m.collInfo[collectionName] = collInfo
m.collInfo[collectionName] = &collectionInfo{}
}
return collInfo.schema, nil
m.collInfo[collectionName].schema = coll.Schema
m.collInfo[collectionName].collID = coll.CollectionID
return m.collInfo[collectionName].schema, nil
}
func (m *MetaCache) GetPartitionID(collectionName string, partitionName string) (typeutil.UniqueID, error) {
partitionID, err := m.readPartitionID(partitionName)
partitionID, err := m.readPartitionID(collectionName, partitionName)
if err == nil {
return partitionID, nil
}
@ -180,34 +180,45 @@ func (m *MetaCache) GetPartitionID(collectionName string, partitionName string)
return 0, errors.Errorf("partition ids len: %d doesn't equal Partition name len %d",
len(partitions.PartitionIDs), len(partitions.PartitionNames))
}
m.col2par[collectionName] = partitions.PartitionNames
for i := 0; i < len(partitions.PartitionIDs); i++ {
_, ok := m.partInfo[partitions.PartitionNames[i]]
if !ok {
m.partInfo[partitions.PartitionNames[i]] = partitions.PartitionIDs[i]
_, ok := m.collInfo[collectionName]
if !ok {
m.collInfo[collectionName] = &collectionInfo{
partInfo: map[string]typeutil.UniqueID{},
}
}
_, ok := m.partInfo[partitionName]
partInfo := m.collInfo[collectionName].partInfo
for i := 0; i < len(partitions.PartitionIDs); i++ {
_, ok := partInfo[partitions.PartitionNames[i]]
if !ok {
partInfo[partitions.PartitionNames[i]] = partitions.PartitionIDs[i]
}
}
_, ok = partInfo[partitionName]
if !ok {
return 0, errors.Errorf("partitionID of partitionName:%s can not be find", partitionName)
}
return m.partInfo[partitionName], nil
return partInfo[partitionName], nil
}
func (m *MetaCache) RemoveCollection(collectionName string) {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.collInfo, collectionName)
for _, partitionName := range m.col2par[collectionName] {
delete(m.partInfo, partitionName)
}
delete(m.col2par, collectionName)
}
func (m *MetaCache) RemovePartition(partitionName string) {
func (m *MetaCache) RemovePartition(collectionName, partitionName string) {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.partInfo, partitionName)
_, ok := m.collInfo[collectionName]
if !ok {
return
}
partInfo := m.collInfo[collectionName].partInfo
if partInfo == nil {
return
}
delete(partInfo, partitionName)
}