mirror of https://github.com/milvus-io/milvus.git
Cherry-pick from master pr: #32505 See also #32165 Change `LeaderViewFilter` to interface to provided map key to avoid iterating all key-values in LeaderViewManager Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/32581/head
parent
1aba9eb616
commit
e1b4ef7451
|
@ -24,44 +24,86 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
)
|
||||
|
||||
type LeaderViewFilter = func(view *LeaderView) bool
|
||||
|
||||
func WithChannelName2LeaderView(channelName string) LeaderViewFilter {
|
||||
return func(view *LeaderView) bool {
|
||||
return view.Channel == channelName
|
||||
}
|
||||
type LeaderViewFilter interface {
|
||||
Match(*LeaderView) bool
|
||||
Node() (int64, bool)
|
||||
ChannelName() (string, bool)
|
||||
}
|
||||
|
||||
func WithCollectionID2LeaderView(collectionID int64) LeaderViewFilter {
|
||||
return func(view *LeaderView) bool {
|
||||
return view.CollectionID == collectionID
|
||||
}
|
||||
type lvFilterFunc func(view *LeaderView) bool
|
||||
|
||||
func (f lvFilterFunc) Match(view *LeaderView) bool {
|
||||
return f(view)
|
||||
}
|
||||
|
||||
func (f lvFilterFunc) Node() (int64, bool) {
|
||||
return -1, false
|
||||
}
|
||||
|
||||
func (f lvFilterFunc) ChannelName() (string, bool) {
|
||||
return "", false
|
||||
}
|
||||
|
||||
type lvChannelNameFilter string
|
||||
|
||||
func (f lvChannelNameFilter) Match(v *LeaderView) bool {
|
||||
return v.Channel == string(f)
|
||||
}
|
||||
|
||||
func (f lvChannelNameFilter) Node() (int64, bool) {
|
||||
return -1, false
|
||||
}
|
||||
|
||||
func (f lvChannelNameFilter) ChannelName() (string, bool) {
|
||||
return string(f), true
|
||||
}
|
||||
|
||||
type lvNodeFilter int64
|
||||
|
||||
func (f lvNodeFilter) Match(v *LeaderView) bool {
|
||||
return v.ID == int64(f)
|
||||
}
|
||||
|
||||
func (f lvNodeFilter) Node() (int64, bool) {
|
||||
return int64(f), true
|
||||
}
|
||||
|
||||
func (f lvNodeFilter) ChannelName() (string, bool) {
|
||||
return "", false
|
||||
}
|
||||
|
||||
func WithNodeID2LeaderView(nodeID int64) LeaderViewFilter {
|
||||
return func(view *LeaderView) bool {
|
||||
return view.ID == nodeID
|
||||
}
|
||||
return lvNodeFilter(nodeID)
|
||||
}
|
||||
|
||||
func WithChannelName2LeaderView(channelName string) LeaderViewFilter {
|
||||
return lvChannelNameFilter(channelName)
|
||||
}
|
||||
|
||||
func WithCollectionID2LeaderView(collectionID int64) LeaderViewFilter {
|
||||
return lvFilterFunc(func(view *LeaderView) bool {
|
||||
return view.CollectionID == collectionID
|
||||
})
|
||||
}
|
||||
|
||||
func WithReplica2LeaderView(replica *Replica) LeaderViewFilter {
|
||||
return func(view *LeaderView) bool {
|
||||
return lvFilterFunc(func(view *LeaderView) bool {
|
||||
if replica == nil {
|
||||
return false
|
||||
}
|
||||
return replica.GetCollectionID() == view.CollectionID && replica.Contains(view.ID)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func WithSegment2LeaderView(segmentID int64, isGrowing bool) LeaderViewFilter {
|
||||
return func(view *LeaderView) bool {
|
||||
return lvFilterFunc(func(view *LeaderView) bool {
|
||||
if isGrowing {
|
||||
_, ok := view.GrowingSegments[segmentID]
|
||||
return ok
|
||||
}
|
||||
_, ok := view.Segments[segmentID]
|
||||
return ok
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
type LeaderView struct {
|
||||
|
@ -136,23 +178,56 @@ func (mgr *LeaderViewManager) GetByFilter(filters ...LeaderViewFilter) []*Leader
|
|||
}
|
||||
|
||||
func (mgr *LeaderViewManager) getByFilter(filters ...LeaderViewFilter) []*LeaderView {
|
||||
ret := make([]*LeaderView, 0)
|
||||
for _, viewsOnNode := range mgr.views {
|
||||
for _, view := range viewsOnNode {
|
||||
allMatch := true
|
||||
for _, fn := range filters {
|
||||
if fn != nil && !fn(view) {
|
||||
allMatch = false
|
||||
}
|
||||
}
|
||||
otherFilters := make([]LeaderViewFilter, 0, len(filters))
|
||||
var nodeID int64
|
||||
var channelName string
|
||||
var hasNodeID, hasChannelName bool
|
||||
|
||||
if allMatch {
|
||||
ret = append(ret, view)
|
||||
for _, filter := range filters {
|
||||
if node, ok := filter.Node(); ok {
|
||||
nodeID, hasNodeID = node, true
|
||||
continue
|
||||
}
|
||||
if channel, ok := filter.ChannelName(); ok {
|
||||
channelName, hasChannelName = channel, true
|
||||
continue
|
||||
}
|
||||
otherFilters = append(otherFilters, filter)
|
||||
}
|
||||
mergedFilter := func(view *LeaderView) bool {
|
||||
for _, filter := range otherFilters {
|
||||
if !filter.Match(view) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
var candidates []channelViews
|
||||
if hasNodeID {
|
||||
nodeView, ok := mgr.views[nodeID]
|
||||
if ok {
|
||||
candidates = append(candidates, nodeView)
|
||||
}
|
||||
} else {
|
||||
candidates = lo.Values(mgr.views)
|
||||
}
|
||||
|
||||
var result []*LeaderView
|
||||
for _, candidate := range candidates {
|
||||
if hasChannelName {
|
||||
if view, ok := candidate[channelName]; ok && mergedFilter(view) {
|
||||
result = append(result, view)
|
||||
}
|
||||
} else {
|
||||
for _, view := range candidate {
|
||||
if mergedFilter(view) {
|
||||
result = append(result, view)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ret
|
||||
return result
|
||||
}
|
||||
|
||||
func (mgr *LeaderViewManager) GetLatestShardLeaderByFilter(filters ...LeaderViewFilter) *LeaderView {
|
||||
|
|
Loading…
Reference in New Issue