Fix error that different loadIndex messages have same indexParam

Signed-off-by: xige-16 <xi.ge@zilliz.com>
pull/4973/head^2
xige-16 2021-01-09 15:57:55 +08:00 committed by yefu.chen
parent 02df310fa5
commit 004d0027b3
2 changed files with 66 additions and 1 deletions

View File

@ -101,8 +101,17 @@ func (lis *loadIndexService) start() {
}
// 1. use msg's index paths to get index bytes
fmt.Println("start load index")
var indexBuffer [][]byte
var err error
ok, err = lis.checkIndexReady(indexMsg)
if err != nil {
log.Println(err)
continue
}
if ok {
continue
}
var indexBuffer [][]byte
fn := func() error {
indexBuffer, err = lis.loadIndex(indexMsg.IndexPaths)
if err != nil {
@ -198,6 +207,7 @@ func (lis *loadIndexService) updateSegmentIndexStats(indexMsg *msgstream.LoadInd
fieldStatsKey := lis.fieldsStatsIDs2Key(targetSegment.collectionID, indexMsg.FieldID)
_, ok := lis.fieldIndexes[fieldStatsKey]
newIndexParams := indexMsg.IndexParams
// sort index params by key
sort.Slice(newIndexParams, func(i, j int) bool { return newIndexParams[i].Key < newIndexParams[j].Key })
if !ok {
@ -223,6 +233,7 @@ func (lis *loadIndexService) updateSegmentIndexStats(indexMsg *msgstream.LoadInd
})
}
}
targetSegment.setIndexParam(indexMsg.FieldID, indexMsg.IndexParams)
return nil
}
@ -294,3 +305,15 @@ func (lis *loadIndexService) sendQueryNodeStats() error {
fmt.Println("sent field stats")
return nil
}
func (lis *loadIndexService) checkIndexReady(loadIndexMsg *msgstream.LoadIndexMsg) (bool, error) {
segment, err := lis.replica.getSegmentByID(loadIndexMsg.SegmentID)
if err != nil {
return false, err
}
if !segment.matchIndexParam(loadIndexMsg.FieldID, loadIndexMsg.IndexParams) {
return false, nil
}
return true, nil
}

View File

@ -22,6 +22,8 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
)
type indexParam = map[string]string
type Segment struct {
segmentPtr C.CSegmentBase
segmentID UniqueID
@ -31,6 +33,8 @@ type Segment struct {
lastRowCount int64
mu sync.Mutex
recentlyModified bool
indexParam map[int64]indexParam
paramMutex sync.RWMutex
}
func (s *Segment) ID() UniqueID {
@ -55,12 +59,14 @@ func newSegment(collection *Collection, segmentID int64, partitionTag string, co
CSegmentBase
newSegment(CPartition partition, unsigned long segment_id);
*/
initIndexParam := make(map[int64]indexParam)
segmentPtr := C.NewSegment(collection.collectionPtr, C.ulong(segmentID))
var newSegment = &Segment{
segmentPtr: segmentPtr,
segmentID: segmentID,
partitionTag: partitionTag,
collectionID: collectionID,
indexParam: initIndexParam,
}
return newSegment
@ -270,3 +276,39 @@ func (s *Segment) updateSegmentIndex(loadIndexInfo *LoadIndexInfo) error {
return nil
}
func (s *Segment) setIndexParam(fieldID int64, indexParamKv []*commonpb.KeyValuePair) error {
s.paramMutex.Lock()
defer s.paramMutex.Unlock()
indexParamMap := make(indexParam)
if indexParamKv == nil {
return errors.New("loadIndexMsg's indexParam empty")
}
for _, param := range indexParamKv {
indexParamMap[param.Key] = param.Value
}
s.indexParam[fieldID] = indexParamMap
return nil
}
func (s *Segment) matchIndexParam(fieldID int64, indexParamKv []*commonpb.KeyValuePair) bool {
s.paramMutex.RLock()
defer s.paramMutex.RUnlock()
fieldIndexParam := s.indexParam[fieldID]
if fieldIndexParam == nil {
return false
}
paramSize := len(s.indexParam)
matchCount := 0
for _, param := range indexParamKv {
value, ok := fieldIndexParam[param.Key]
if !ok {
return false
}
if param.Value != value {
return false
}
matchCount++
}
return paramSize == matchCount
}