mirror of https://github.com/milvus-io/milvus.git
fix: Skip loading L0 segments on old-version query nodes (#32131)
issue: #32107 pr: #32124 During a rolling upgrade, skip loading L0 segments on query nodes that are still running an old version. --------- Signed-off-by: Wei Liu <wei.liu@zilliz.com> pull/32251/head
parent
b68af208bc
commit
e50599ba10
|
@ -20,6 +20,7 @@ import (
|
|||
"fmt"
|
||||
"sort"
|
||||
|
||||
"github.com/blang/semver/v4"
|
||||
"github.com/samber/lo"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
||||
|
@ -103,9 +104,12 @@ func (b *RoundRobinBalancer) AssignSegment(collectionID int64, segments []*meta.
|
|||
func (b *RoundRobinBalancer) AssignChannel(channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
|
||||
// filter out suspended and stopping nodes during assignment; this check is skipped for manual balance
|
||||
if !manualBalance {
|
||||
versionRangeFilter := semver.MustParseRange(">2.3.x")
|
||||
nodes = lo.Filter(nodes, func(node int64, _ int) bool {
|
||||
info := b.nodeManager.Get(node)
|
||||
return info != nil && info.GetState() == session.NodeStateNormal
|
||||
// balancing a channel to a query node with version < 2.4 is not allowed, since L0 segments are only supported from 2.4 on
|
||||
// watching a channel on a query node with version < 2.4 may cause delete data to be lost
|
||||
return info != nil && info.GetState() == session.NodeStateNormal && versionRangeFilter(info.Version())
|
||||
})
|
||||
}
|
||||
nodesInfo := b.getNodes(nodes)
|
||||
|
|
|
@ -25,6 +25,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/task"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
)
|
||||
|
||||
type BalanceTestSuite struct {
|
||||
|
@ -153,6 +154,7 @@ func (suite *BalanceTestSuite) TestAssignChannel() {
|
|||
NodeID: c.nodeIDs[i],
|
||||
Address: "127.0.0.1:0",
|
||||
Hostname: "localhost",
|
||||
Version: common.Version,
|
||||
})
|
||||
nodeInfo.UpdateStats(session.WithChannelCnt(c.channelCnts[i]))
|
||||
nodeInfo.SetState(c.states[i])
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
"math"
|
||||
"sort"
|
||||
|
||||
"github.com/blang/semver/v4"
|
||||
"github.com/samber/lo"
|
||||
"go.uber.org/zap"
|
||||
|
||||
|
@ -86,9 +87,12 @@ func (b *RowCountBasedBalancer) AssignSegment(collectionID int64, segments []*me
|
|||
func (b *RowCountBasedBalancer) AssignChannel(channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
|
||||
// filter out suspended and stopping nodes during assignment; this check is skipped for manual balance
|
||||
if !manualBalance {
|
||||
versionRangeFilter := semver.MustParseRange(">2.3.x")
|
||||
nodes = lo.Filter(nodes, func(node int64, _ int) bool {
|
||||
info := b.nodeManager.Get(node)
|
||||
return info != nil && info.GetState() == session.NodeStateNormal
|
||||
// balancing a channel to a query node with version < 2.4 is not allowed, since L0 segments are only supported from 2.4 on
|
||||
// watching a channel on a query node with version < 2.4 may cause delete data to be lost
|
||||
return info != nil && info.GetState() == session.NodeStateNormal && versionRangeFilter(info.Version())
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -34,6 +34,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/task"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/util/etcd"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
|
@ -472,6 +473,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
|
|||
NodeID: c.nodes[i],
|
||||
Address: "127.0.0.1:0",
|
||||
Hostname: "localhost",
|
||||
Version: common.Version,
|
||||
})
|
||||
nodeInfo.UpdateStats(session.WithSegmentCnt(c.segmentCnts[i]))
|
||||
nodeInfo.UpdateStats(session.WithChannelCnt(len(c.distributionChannels[c.nodes[i]])))
|
||||
|
@ -683,6 +685,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOnPartStopping() {
|
|||
NodeID: c.nodes[i],
|
||||
Address: "127.0.0.1:0",
|
||||
Hostname: "localhost",
|
||||
Version: common.Version,
|
||||
})
|
||||
nodeInfo.UpdateStats(session.WithSegmentCnt(c.segmentCnts[i]))
|
||||
nodeInfo.UpdateStats(session.WithChannelCnt(len(c.distributionChannels[c.nodes[i]])))
|
||||
|
@ -824,6 +827,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOutboundNodes() {
|
|||
NodeID: c.nodes[i],
|
||||
Address: "127.0.0.1:0",
|
||||
Hostname: "localhost",
|
||||
Version: common.Version,
|
||||
})
|
||||
nodeInfo.UpdateStats(session.WithSegmentCnt(c.segmentCnts[i]))
|
||||
nodeInfo.UpdateStats(session.WithChannelCnt(len(c.distributionChannels[c.nodes[i]])))
|
||||
|
@ -1057,6 +1061,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestDisableBalanceChannel() {
|
|||
NodeID: c.nodes[i],
|
||||
Address: "127.0.0.1:0",
|
||||
Hostname: "localhost",
|
||||
Version: common.Version,
|
||||
})
|
||||
nodeInfo.UpdateStats(session.WithSegmentCnt(c.segmentCnts[i]))
|
||||
nodeInfo.UpdateStats(session.WithChannelCnt(len(c.distributionChannels[c.nodes[i]])))
|
||||
|
@ -1187,6 +1192,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestMultiReplicaBalance() {
|
|||
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
|
||||
NodeID: nodes[i],
|
||||
Address: "127.0.0.1:0",
|
||||
Version: common.Version,
|
||||
})
|
||||
nodeInfo.UpdateStats(session.WithChannelCnt(len(c.channelDist[nodes[i]])))
|
||||
nodeInfo.SetState(c.states[i])
|
||||
|
|
|
@ -32,6 +32,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/task"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/util/etcd"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
)
|
||||
|
@ -812,6 +813,7 @@ func (suite *ScoreBasedBalancerTestSuite) TestMultiReplicaBalance() {
|
|||
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
|
||||
NodeID: nodes[i],
|
||||
Address: "127.0.0.1:0",
|
||||
Version: common.Version,
|
||||
})
|
||||
nodeInfo.UpdateStats(session.WithChannelCnt(len(c.channelDist[nodes[i]])))
|
||||
nodeInfo.SetState(c.states[i])
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/blang/semver/v4"
|
||||
"github.com/samber/lo"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/zap"
|
||||
|
@ -212,38 +213,58 @@ func (c *SegmentChecker) getSealedSegmentDiff(
|
|||
distMap[s.GetID()] = s.Node
|
||||
}
|
||||
|
||||
versionRangeFilter := semver.MustParseRange(">2.3.x")
|
||||
checkLeaderVersion := func(leader *meta.LeaderView, segmentID int64) bool {
|
||||
// if the current shard leader's node version is < 2.4, skip loading the L0 segment
|
||||
info := c.nodeMgr.Get(leader.ID)
|
||||
if info != nil && !versionRangeFilter(info.Version()) {
|
||||
log.Warn("l0 segment is not supported in current node version, skip it",
|
||||
zap.Int64("collection", replica.GetCollectionID()),
|
||||
zap.Int64("segmentID", segmentID),
|
||||
zap.String("channel", leader.Channel),
|
||||
zap.Int64("leaderID", leader.ID),
|
||||
zap.String("nodeVersion", info.Version().String()))
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
isSegmentLack := func(segment *datapb.SegmentInfo) bool {
|
||||
node, existInDist := distMap[segment.ID]
|
||||
|
||||
if segment.GetLevel() == datapb.SegmentLevel_L0 {
|
||||
// L0 segments have to be on the same node as the one watching the channel
|
||||
leader := c.dist.LeaderViewManager.GetLatestShardLeaderByFilter(meta.WithReplica2LeaderView(replica), meta.WithChannelName2LeaderView(segment.GetInsertChannel()))
|
||||
|
||||
// if the leader node's version doesn't meet the requirement for loading L0 segments, skip it
|
||||
if leader != nil && checkLeaderVersion(leader, segment.ID) {
|
||||
l0WithWrongLocation := node != leader.ID
|
||||
return !existInDist || l0WithWrongLocation
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
return !existInDist
|
||||
}
|
||||
|
||||
nextTargetMap := c.targetMgr.GetSealedSegmentsByCollection(collectionID, meta.NextTarget)
|
||||
currentTargetMap := c.targetMgr.GetSealedSegmentsByCollection(collectionID, meta.CurrentTarget)
|
||||
|
||||
// Segments which exist on the next target, but not on dist
|
||||
for segmentID, segment := range nextTargetMap {
|
||||
node, existInDist := distMap[segmentID]
|
||||
l0WithWrongLocation := false
|
||||
if existInDist && segment.GetLevel() == datapb.SegmentLevel_L0 {
|
||||
// the L0 segments have to been in the same node as the channel watched
|
||||
leader := c.dist.LeaderViewManager.GetLatestShardLeaderByFilter(meta.WithReplica2LeaderView(replica), meta.WithChannelName2LeaderView(segment.GetInsertChannel()))
|
||||
l0WithWrongLocation = leader != nil && node != leader.ID
|
||||
}
|
||||
if !existInDist || l0WithWrongLocation {
|
||||
for _, segment := range nextTargetMap {
|
||||
if isSegmentLack(segment) {
|
||||
toLoad = append(toLoad, segment)
|
||||
}
|
||||
}
|
||||
|
||||
// L0 segments which exist on the current target, but not on dist
|
||||
for segmentID, segment := range currentTargetMap {
|
||||
for _, segment := range currentTargetMap {
|
||||
// avoid generating duplicate segment tasks
|
||||
if nextTargetMap[segmentID] != nil {
|
||||
if nextTargetMap[segment.ID] != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
node, existInDist := distMap[segmentID]
|
||||
l0WithWrongLocation := false
|
||||
if existInDist && segment.GetLevel() == datapb.SegmentLevel_L0 {
|
||||
// the L0 segments have to been in the same node as the channel watched
|
||||
leader := c.dist.LeaderViewManager.GetLatestShardLeaderByFilter(meta.WithReplica2LeaderView(replica), meta.WithChannelName2LeaderView(segment.GetInsertChannel()))
|
||||
l0WithWrongLocation = leader != nil && node != leader.ID
|
||||
}
|
||||
if !existInDist || l0WithWrongLocation {
|
||||
if isSegmentLack(segment) {
|
||||
toLoad = append(toLoad, segment)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,6 +35,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/task"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/util/etcd"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
)
|
||||
|
@ -178,11 +179,13 @@ func (suite *SegmentCheckerTestSuite) TestLoadL0Segments() {
|
|||
NodeID: 1,
|
||||
Address: "localhost",
|
||||
Hostname: "localhost",
|
||||
Version: common.Version,
|
||||
}))
|
||||
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
|
||||
NodeID: 2,
|
||||
Address: "localhost",
|
||||
Hostname: "localhost",
|
||||
Version: common.Version,
|
||||
}))
|
||||
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
|
||||
checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)
|
||||
|
|
|
@ -6,5 +6,5 @@ import semver "github.com/blang/semver/v4"
|
|||
var Version semver.Version
|
||||
|
||||
func init() {
|
||||
Version, _ = semver.Parse("2.4.0-rc.1")
|
||||
Version, _ = semver.Parse("2.4.0")
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue