fix: Load segment task promotion failure (#31430)

issue: #30816

PR #31319 introduced logic whereby the segment checker needs to load
level-zero segments that exist only in the current target.

This PR fixes the load segment task's promotion failure when a segment
belongs only to the current target.

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
wei liu 2024-03-21 18:09:07 +08:00 committed by GitHub
parent a2fdebd90d
commit 03eaa5d478
2 changed files with 38 additions and 15 deletions


@@ -894,7 +894,7 @@ func (scheduler *taskScheduler) checkSegmentTaskStale(task *SegmentTask) error {
     if taskType == TaskTypeMove || taskType == TaskTypeUpdate {
         segment = scheduler.targetMgr.GetSealedSegment(task.CollectionID(), task.SegmentID(), meta.CurrentTarget)
     } else {
-        segment = scheduler.targetMgr.GetSealedSegment(task.CollectionID(), task.SegmentID(), meta.NextTarget)
+        segment = scheduler.targetMgr.GetSealedSegment(task.CollectionID(), task.SegmentID(), meta.NextTargetFirst)
     }
     if segment == nil {
         log.Warn("task stale due to the segment to load not exists in targets",
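
For context, this is what the scope change buys: under NextTarget, a segment that exists only in the current target (the level-zero case from #30816) resolves to nil and the load task is wrongly marked stale, while NextTargetFirst can fall back to the current target. Below is a minimal, self-contained sketch of that ordered-fallback lookup; the real TargetManager is also keyed by collection, and the exact fallback semantics of NextTargetFirst are inferred from this fix rather than quoted from the source:

package main

import "fmt"

// TargetScope mirrors the meta scope constants used in the diff above.
type TargetScope int

const (
    CurrentTarget TargetScope = iota // only the current target
    NextTarget                       // only the next target
    NextTargetFirst                  // next target, then fall back to current
)

type Segment struct{ ID int64 }

// targetManager is a toy stand-in for the query coordinator's target manager.
type targetManager struct {
    current map[int64]*Segment
    next    map[int64]*Segment
}

// GetSealedSegment resolves a segment ID under the requested scope.
func (m *targetManager) GetSealedSegment(segmentID int64, scope TargetScope) *Segment {
    switch scope {
    case CurrentTarget:
        return m.current[segmentID]
    case NextTarget:
        return m.next[segmentID]
    case NextTargetFirst:
        if seg, ok := m.next[segmentID]; ok {
            return seg
        }
        // A level-zero segment may exist only in the current target;
        // without this fallback, checkSegmentTaskStale sees nil and
        // cancels the load task before it can be promoted.
        return m.current[segmentID]
    }
    return nil
}

func main() {
    m := &targetManager{
        current: map[int64]*Segment{42: {ID: 42}}, // L0 segment, current target only
        next:    map[int64]*Segment{},
    }
    fmt.Println(m.GetSealedSegment(42, NextTarget) != nil)      // false: looks stale
    fmt.Println(m.GetSealedSegment(42, NextTargetFirst) != nil) // true: task survives
}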


@@ -18,10 +18,14 @@ package balance

 import (
     "context"
     "fmt"
+    "strconv"
+    "strings"
     "testing"
     "time"
+
+    "github.com/golang/protobuf/proto"
     "github.com/samber/lo"
     "github.com/stretchr/testify/suite"
     "go.uber.org/zap"
@@ -51,7 +55,7 @@ func (s *BalanceTestSuit) SetupSuite() {
     s.Require().NoError(s.SetupEmbedEtcd())
 }

-func (s *BalanceTestSuit) initCollection(collectionName string, replica int, channelNum int, segmentNum int, segmentRowNum int) {
+func (s *BalanceTestSuit) initCollection(collectionName string, replica int, channelNum int, segmentNum int, segmentRowNum int, segmentDeleteNum int) {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
@@ -92,6 +96,26 @@ func (s *BalanceTestSuit) initCollection(collectionName string, replica int, cha
     s.NoError(err)
     s.True(merr.Ok(insertResult.Status))

+    if segmentDeleteNum > 0 {
+        if segmentDeleteNum > segmentRowNum {
+            segmentDeleteNum = segmentRowNum
+        }
+
+        pks := insertResult.GetIDs().GetIntId().GetData()[:segmentDeleteNum]
+        expr := fmt.Sprintf("%s in [%s]", integration.Int64Field, strings.Join(lo.Map(pks, func(pk int64, _ int) string { return strconv.FormatInt(pk, 10) }), ","))
+        log.Info("========================delete expr==================",
+            zap.String("expr", expr),
+        )
+        deleteResp, err := s.Cluster.Proxy.Delete(ctx, &milvuspb.DeleteRequest{
+            CollectionName: collectionName,
+            Expr:           expr,
+        })
+        s.Require().NoError(err)
+        s.Require().True(merr.Ok(deleteResp.GetStatus()))
+        s.Require().EqualValues(len(pks), deleteResp.GetDeleteCnt())
+    }
+
     // flush
     flushResp, err := s.Cluster.Proxy.Flush(ctx, &milvuspb.FlushRequest{
         DbName: dbName,
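
A note on the new segmentDeleteNum parameter: the delete appears to run once per inserted batch, before that batch's flush, so each batch leaves one sealed segment plus, once the deletes are flushed, one level-zero delta segment that initially exists only in the current target — exactly the case the scheduler fix above addresses. A hypothetical call with the extended signature (collection name illustrative):

// 1 replica, 2 channels, 2 segments per channel, 2000 rows per segment,
// with 500 of each segment's rows deleted to force L0 delta segments.
s.initCollection("test_balance_l0", 1, 2, 2, 2000, 500)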
@@ -137,7 +161,7 @@ func (s *BalanceTestSuit) initCollection(collectionName string, replica int, cha
 func (s *BalanceTestSuit) TestBalanceOnSingleReplica() {
     name := "test_balance_" + funcutil.GenRandomStr()
-    s.initCollection(name, 1, 2, 2, 2000)
+    s.initCollection(name, 1, 2, 2, 2000, 500)
     ctx := context.Background()

     // disable compact
@@ -161,7 +185,7 @@ func (s *BalanceTestSuit) TestBalanceOnSingleReplica() {
         resp, err := qn.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
         s.NoError(err)
         s.True(merr.Ok(resp.GetStatus()))
-        return len(resp.Channels) == 1 && len(resp.Segments) == 2
+        return len(resp.Channels) == 1 && len(resp.Segments) >= 2
     }, 30*time.Second, 1*time.Second)

     // check total segment number
@@ -173,7 +197,7 @@ func (s *BalanceTestSuit) TestBalanceOnSingleReplica() {
             s.True(merr.Ok(resp1.GetStatus()))
             count += len(resp1.Segments)
         }
-        return count == 4
+        return count == 8
     }, 10*time.Second, 1*time.Second)
 }
@@ -193,11 +217,11 @@ func (s *BalanceTestSuit) TestBalanceOnMultiReplica() {
         Command: datapb.GcCommand_Resume,
     })

-    // init collection with 2 channel, each channel has 2 segment, each segment has 2000 row
+    // init collection with 2 channel, each channel has 4 segment, each segment has 2000 row
     // and load it with 2 replicas on 2 nodes.
     // then we add 2 query node, after balance happens, expected each node have 1 channel and 2 segments
     name := "test_balance_" + funcutil.GenRandomStr()
-    s.initCollection(name, 2, 2, 2, 2000)
+    s.initCollection(name, 2, 2, 2, 2000, 500)

     resp, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{CollectionName: name})
     s.NoError(err)
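
Assuming one level-zero delta segment is produced per flushed insert batch, each channel now carries 2 sealed + 2 L0 = 4 segments, matching the updated comment above; a single replica therefore holds 2 channels × 4 = 8 segments, the new total in TestBalanceOnSingleReplica, and the 2-replica total below becomes 2 × 8 = 16. The per-node distribution checks switch from == to >= because how many L0 segments land on any one node varies with flush and balance timing.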
@@ -211,13 +235,13 @@ func (s *BalanceTestSuit) TestBalanceOnMultiReplica() {
     s.Eventually(func() bool {
         resp, err := qn1.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
         s.NoError(err)
-        return len(resp.Channels) == 1 && len(resp.Segments) == 2
+        return len(resp.Channels) == 1 && len(resp.Segments) >= 2
     }, 30*time.Second, 1*time.Second)

     s.Eventually(func() bool {
         resp, err := qn2.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
         s.NoError(err)
-        return len(resp.Channels) == 1 && len(resp.Segments) == 2
+        return len(resp.Channels) == 1 && len(resp.Segments) >= 2
     }, 30*time.Second, 1*time.Second)

     // check total segment num
@@ -229,7 +253,7 @@ func (s *BalanceTestSuit) TestBalanceOnMultiReplica() {
             s.True(merr.Ok(resp1.GetStatus()))
             count += len(resp1.Segments)
         }
-        return count == 8
+        return count == 16
     }, 10*time.Second, 1*time.Second)
 }
@@ -256,7 +280,7 @@ func (s *BalanceTestSuit) TestNodeDown() {
     // init collection with 3 channel, each channel has 15 segment, each segment has 2000 row
     // and load it with 2 replicas on 2 nodes.
     name := "test_balance_" + funcutil.GenRandomStr()
-    s.initCollection(name, 1, 2, 15, 2000)
+    s.initCollection(name, 1, 2, 15, 2000, 500)

     // then we add 2 query node, after balance happens, expected each node have 1 channel and 2 segments
     qn1 := s.Cluster.AddQueryNode()
@@ -268,7 +292,7 @@ func (s *BalanceTestSuit) TestNodeDown() {
         s.NoError(err)
         s.True(merr.Ok(resp.GetStatus()))
         log.Info("resp", zap.Any("channel", resp.Channels), zap.Any("segments", resp.Segments))
-        return len(resp.Channels) == 0 && len(resp.Segments) == 10
+        return len(resp.Channels) == 0 && len(resp.Segments) >= 10
     }, 30*time.Second, 1*time.Second)

     s.Eventually(func() bool {
@@ -276,7 +300,7 @@ func (s *BalanceTestSuit) TestNodeDown() {
         s.NoError(err)
         s.True(merr.Ok(resp.GetStatus()))
         log.Info("resp", zap.Any("channel", resp.Channels), zap.Any("segments", resp.Segments))
-        return len(resp.Channels) == 0 && len(resp.Segments) == 10
+        return len(resp.Channels) == 0 && len(resp.Segments) >= 10
     }, 30*time.Second, 1*time.Second)

     // then we force stop qn1 and resume balance channel, let balance channel and load segment happens concurrently on qn2
@@ -298,7 +322,7 @@ func (s *BalanceTestSuit) TestNodeDown() {
         s.NoError(err)
         s.True(merr.Ok(resp.GetStatus()))
         log.Info("resp", zap.Any("channel", resp.Channels), zap.Any("segments", resp.Segments))
-        return len(resp.Channels) == 1 && len(resp.Segments) == 15
+        return len(resp.Channels) == 1 && len(resp.Segments) >= 15
     }, 30*time.Second, 1*time.Second)

     // expect all delegator will recover to healthy
@@ -308,7 +332,6 @@ func (s *BalanceTestSuit) TestNodeDown() {
             CollectionID: collectionID,
         })
         s.NoError(err)
-        s.True(merr.Ok(resp.GetStatus()))
         return len(resp.Shards) == 2
     }, 30*time.Second, 1*time.Second)
 }