Update excluded segments info when receive changeInfo (#10011)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/10011/merge
bigsheeper 2021-10-16 17:00:33 +08:00 committed by GitHub
parent af07691394
commit 4c420dac21
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 25 additions and 25 deletions

View File

@@ -75,9 +75,8 @@ type ReplicaInterface interface {
getSegmentStatistics() []*internalpb.SegmentStats
// excluded segments
initExcludedSegments(collectionID UniqueID)
removeExcludedSegments(collectionID UniqueID)
addExcludedSegments(collectionID UniqueID, segmentInfos []*datapb.SegmentInfo) error
addExcludedSegments(collectionID UniqueID, segmentInfos []*datapb.SegmentInfo)
getExcludedSegments(collectionID UniqueID) ([]*datapb.SegmentInfo, error)
getSegmentsMemSize() int64
@@ -504,15 +503,6 @@ func (colReplica *collectionReplica) getSegmentStatistics() []*internalpb.Segmen
return statisticData
}
func (colReplica *collectionReplica) initExcludedSegments(collectionID UniqueID) {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
if _, ok := colReplica.excludedSegments[collectionID]; !ok {
colReplica.excludedSegments[collectionID] = make([]*datapb.SegmentInfo, 0)
}
}
func (colReplica *collectionReplica) removeExcludedSegments(collectionID UniqueID) {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
@@ -520,16 +510,15 @@ func (colReplica *collectionReplica) removeExcludedSegments(collectionID UniqueI
delete(colReplica.excludedSegments, collectionID)
}
func (colReplica *collectionReplica) addExcludedSegments(collectionID UniqueID, segmentInfos []*datapb.SegmentInfo) error {
func (colReplica *collectionReplica) addExcludedSegments(collectionID UniqueID, segmentInfos []*datapb.SegmentInfo) {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
if _, ok := colReplica.excludedSegments[collectionID]; !ok {
return errors.New("addExcludedSegments failed, cannot found collection, id =" + fmt.Sprintln(collectionID))
colReplica.excludedSegments[collectionID] = make([]*datapb.SegmentInfo, 0)
}
colReplica.excludedSegments[collectionID] = append(colReplica.excludedSegments[collectionID], segmentInfos...)
return nil
}
func (colReplica *collectionReplica) getExcludedSegments(collectionID UniqueID) ([]*datapb.SegmentInfo, error) {

View File

@@ -29,8 +29,8 @@ func getFilterDMNode(ctx context.Context) (*filterDmNode, error) {
if err != nil {
return nil, err
}
streaming.replica.initExcludedSegments(defaultCollectionID)
streaming.replica.addExcludedSegments(defaultCollectionID, nil)
return newFilteredDmNode(streaming.replica, loadTypeCollection, defaultCollectionID, defaultPartitionID), nil
}
@@ -130,7 +130,7 @@ func TestFlowGraphFilterDmNode_filterInvalidInsertMessage(t *testing.T) {
assert.NoError(t, err)
fg, err := getFilterDMNode(ctx)
assert.NoError(t, err)
err = fg.replica.addExcludedSegments(defaultCollectionID, []*datapb.SegmentInfo{
fg.replica.addExcludedSegments(defaultCollectionID, []*datapb.SegmentInfo{
{
ID: defaultSegmentID,
CollectionID: defaultCollectionID,
@@ -140,7 +140,6 @@ func TestFlowGraphFilterDmNode_filterInvalidInsertMessage(t *testing.T) {
},
},
})
assert.NoError(t, err)
res := fg.filterInvalidInsertMessage(msg)
assert.Nil(t, res)
})

View File

@@ -20,13 +20,14 @@ import (
"sync"
"unsafe"
"github.com/golang/protobuf/proto"
oplog "github.com/opentracing/opentracing-go/log"
"go.uber.org/zap"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
@@ -317,7 +318,23 @@ func (q *queryCollection) adjustByChangeInfo(msg *msgstream.SealedSegmentsChange
if err != nil {
return err
}
// 2. delete growing segment because these segments are loaded
// 2. update excluded segment, cluster have been loaded sealed segments,
// so we need to avoid getting growing segment from flow graph.
q.streaming.replica.addExcludedSegments(segment.CollectionID, []*datapb.SegmentInfo{
{
ID: segment.SegmentID,
CollectionID: segment.CollectionID,
PartitionID: segment.PartitionID,
InsertChannel: segment.ChannelID,
NumOfRows: segment.NumRows,
// TODO: add status, remove query pb segment status, use common pb segment status?
DmlPosition: &internalpb.MsgPosition{
// use max timestamp to filter out dm messages
Timestamp: math.MaxInt64,
},
},
})
// 3. delete growing segment because these segments are loaded
hasGrowingSegment := q.streaming.replica.hasSegment(segment.SegmentID)
if hasGrowingSegment {
err = q.streaming.replica.removeSegment(segment.SegmentID)

View File

@@ -146,7 +146,6 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
return err
}
}
w.node.streaming.replica.initExcludedSegments(collectionID)
if hasCollectionInHistorical := w.node.historical.replica.hasCollection(collectionID); !hasCollectionInHistorical {
err := w.node.historical.replica.addCollection(collectionID, w.req.Schema)
if err != nil {
@@ -216,11 +215,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
for _, info := range w.req.Infos {
checkPointInfos = append(checkPointInfos, info.UnflushedSegments...)
}
err = w.node.streaming.replica.addExcludedSegments(collectionID, checkPointInfos)
if err != nil {
log.Warn(err.Error())
return err
}
w.node.streaming.replica.addExcludedSegments(collectionID, checkPointInfos)
log.Debug("watchDMChannel, add check points info done", zap.Any("collectionID", collectionID))
// create tSafe