Refine offline segments logic in shard delegator (#28073) (#28084)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia 2023-11-01 23:18:17 +08:00 committed by GitHub
parent f492e33343
commit 994bb6991b
5 changed files with 183 additions and 45 deletions


@ -206,8 +206,12 @@ func (sd *shardDelegator) Search(ctx context.Context, req *querypb.SearchRequest
fmt.Sprint(paramtable.GetNodeID()), metrics.SearchLabel).
Observe(float64(waitTr.ElapseSpan().Milliseconds()))
sealed, growing, version := sd.distribution.GetSegments(true, req.GetReq().GetPartitionIDs()...)
defer sd.distribution.FinishUsage(version)
sealed, growing, version, err := sd.distribution.PinReadableSegments(req.GetReq().GetPartitionIDs()...)
if err != nil {
log.Warn("delegator failed to search, current distribution is not serviceable")
return nil, merr.WrapErrChannelNotAvailable(sd.vchannelName, "distribution is not serviceable")
}
defer sd.distribution.Unpin(version)
existPartitions := sd.collection.GetPartitions()
growing = lo.Filter(growing, func(segment SegmentEntry, _ int) bool {
return funcutil.SliceContain(existPartitions, segment.PartitionID)
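
The hunk above establishes the pattern repeated in the QueryStream, Query, and GetStatistics hunks below: `PinReadableSegments` replaces `GetSegments(true, ...)` and can now fail when the distribution is not serviceable, so the error must be checked before `Unpin` (the renamed `FinishUsage`) is deferred. Below is a minimal, self-contained sketch of that contract; the `dist` type and `search` function are hypothetical stand-ins for the real delegator types.

```go
package main

import (
	"errors"
	"fmt"
)

// dist is a hypothetical stand-in for the delegator's distribution;
// only the pin/unpin contract from this diff is modeled.
type dist struct{ serviceable bool }

// PinReadableSegments fails fast when the distribution is not
// serviceable, mirroring the error-returning signature in this commit.
func (d *dist) PinReadableSegments() (version int64, err error) {
	if !d.serviceable {
		return -1, errors.New("channel distribution is not serviceable")
	}
	return 1, nil
}

// Unpin releases the reference taken by a successful pin.
func (d *dist) Unpin(version int64) {
	fmt.Println("unpinned snapshot version", version)
}

func search(d *dist) error {
	// Check the error before deferring Unpin: a failed pin takes no
	// snapshot reference, so there is nothing to release.
	version, err := d.PinReadableSegments()
	if err != nil {
		return fmt.Errorf("distribution is not serviceable: %w", err)
	}
	defer d.Unpin(version)
	// ... fan out to the pinned sealed and growing segments here ...
	return nil
}

func main() {
	fmt.Println("unserviceable:", search(&dist{serviceable: false}))
	fmt.Println("serviceable:  ", search(&dist{serviceable: true}))
}
```

Deferring the unpin only after a successful pin keeps the snapshot reference count balanced on every return path.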
@ -270,8 +274,12 @@ func (sd *shardDelegator) QueryStream(ctx context.Context, req *querypb.QueryReq
fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel).
Observe(float64(waitTr.ElapseSpan().Milliseconds()))
sealed, growing, version := sd.distribution.GetSegments(true, req.GetReq().GetPartitionIDs()...)
defer sd.distribution.FinishUsage(version)
sealed, growing, version, err := sd.distribution.PinReadableSegments(req.GetReq().GetPartitionIDs()...)
if err != nil {
log.Warn("delegator failed to query, current distribution is not serviceable")
return merr.WrapErrChannelNotAvailable(sd.vchannelName, "distribution is not serviceable")
}
defer sd.distribution.Unpin(version)
existPartitions := sd.collection.GetPartitions()
growing = lo.Filter(growing, func(segment SegmentEntry, _ int) bool {
return funcutil.SliceContain(existPartitions, segment.PartitionID)
@ -334,8 +342,12 @@ func (sd *shardDelegator) Query(ctx context.Context, req *querypb.QueryRequest)
fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel).
Observe(float64(waitTr.ElapseSpan().Milliseconds()))
sealed, growing, version := sd.distribution.GetSegments(true, req.GetReq().GetPartitionIDs()...)
defer sd.distribution.FinishUsage(version)
sealed, growing, version, err := sd.distribution.PinReadableSegments(req.GetReq().GetPartitionIDs()...)
if err != nil {
log.Warn("delegator failed to query, current distribution is not serviceable")
return nil, merr.WrapErrChannelNotAvailable(sd.vchannelName, "distribution is not serviceable")
}
defer sd.distribution.Unpin(version)
existPartitions := sd.collection.GetPartitions()
growing = lo.Filter(growing, func(segment SegmentEntry, _ int) bool {
return funcutil.SliceContain(existPartitions, segment.PartitionID)
@ -377,21 +389,25 @@ func (sd *shardDelegator) GetStatistics(ctx context.Context, req *querypb.GetSta
defer sd.lifetime.Done()
if !funcutil.SliceContain(req.GetDmlChannels(), sd.vchannelName) {
log.Warn("deletgator received query request not belongs to it",
log.Warn("delegator received GetStatistics request not belongs to it",
zap.Strings("reqChannels", req.GetDmlChannels()),
)
return nil, fmt.Errorf("dml channel not match, delegator channel %s, search channels %v", sd.vchannelName, req.GetDmlChannels())
return nil, fmt.Errorf("dml channel not match, delegator channel %s, GetStatistics channels %v", sd.vchannelName, req.GetDmlChannels())
}
// wait tsafe
err := sd.waitTSafe(ctx, req.Req.GuaranteeTimestamp)
if err != nil {
log.Warn("delegator query failed to wait tsafe", zap.Error(err))
log.Warn("delegator GetStatistics failed to wait tsafe", zap.Error(err))
return nil, err
}
sealed, growing, version := sd.distribution.GetSegments(true, req.Req.GetPartitionIDs()...)
defer sd.distribution.FinishUsage(version)
sealed, growing, version, err := sd.distribution.PinReadableSegments(req.Req.GetPartitionIDs()...)
if err != nil {
log.Warn("delegator failed to GetStatistics, current distribution is not servicable")
return nil, merr.WrapErrChannelNotAvailable(sd.vchannelName, "distribution is not serviceable")
}
defer sd.distribution.Unpin(version)
tasks, err := organizeSubTask(ctx, req, sealed, growing, sd, func(req *querypb.GetStatisticsRequest, scope querypb.DataScope, segmentIDs []int64, targetID int64) *querypb.GetStatisticsRequest {
nodeReq := proto.Clone(req).(*querypb.GetStatisticsRequest)


@ -186,7 +186,7 @@ func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) {
offlineSegments := typeutil.NewConcurrentSet[int64]()
sealed, growing, version := sd.distribution.GetSegments(false)
sealed, growing, version := sd.distribution.PinOnlineSegments()
eg, ctx := errgroup.WithContext(context.Background())
for _, entry := range sealed {
@ -225,7 +225,7 @@ func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) {
// errors are not returned when applying deletes
_ = eg.Wait()
sd.distribution.FinishUsage(version)
sd.distribution.Unpin(version)
offlineSegIDs := offlineSegments.Collect()
if len(offlineSegIDs) > 0 {
log.Warn("failed to apply delete, mark segment offline", zap.Int64s("offlineSegments", offlineSegIDs))


@ -434,6 +434,24 @@ func (s *DelegatorSuite) TestSearch() {
s.Error(err)
})
s.Run("distribution_not_serviceable", func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sd, ok := s.delegator.(*shardDelegator)
s.Require().True(ok)
sd.distribution.AddOfflines(1001)
_, err := s.delegator.Search(ctx, &querypb.SearchRequest{
Req: &internalpb.SearchRequest{
Base: commonpbutil.NewMsgBase(),
},
DmlChannels: []string{s.vchannelName},
})
s.Error(err)
})
s.Run("cluster_not_serviceable", func() {
s.delegator.Close()
@ -603,6 +621,22 @@ func (s *DelegatorSuite) TestQuery() {
s.Error(err)
})
s.Run("distribution_not_serviceable", func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sd, ok := s.delegator.(*shardDelegator)
s.Require().True(ok)
sd.distribution.AddOfflines(1001)
_, err := s.delegator.Query(ctx, &querypb.QueryRequest{
Req: &internalpb.RetrieveRequest{Base: commonpbutil.NewMsgBase()},
DmlChannels: []string{s.vchannelName},
})
s.Error(err)
})
s.Run("cluster_not_serviceable", func() {
s.delegator.Close()
@ -865,6 +899,25 @@ func (s *DelegatorSuite) TestQueryStream() {
s.Error(err)
})
s.Run("distribution_not_serviceable", func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sd, ok := s.delegator.(*shardDelegator)
s.Require().True(ok)
sd.distribution.AddOfflines(1001)
client := streamrpc.NewLocalQueryClient(ctx)
server := client.CreateServer()
// run stream function
err := s.delegator.QueryStream(ctx, &querypb.QueryRequest{
Req: &internalpb.RetrieveRequest{Base: commonpbutil.NewMsgBase()},
DmlChannels: []string{s.vchannelName},
}, server)
s.Error(err)
})
s.Run("cluster_not_serviceable", func() {
s.delegator.Close()
@ -1023,6 +1076,22 @@ func (s *DelegatorSuite) TestGetStats() {
s.Error(err)
})
s.Run("distribution_not_serviceable", func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sd, ok := s.delegator.(*shardDelegator)
s.Require().True(ok)
sd.distribution.AddOfflines(1001)
_, err := s.delegator.GetStatistics(ctx, &querypb.GetStatisticsRequest{
Req: &internalpb.GetStatisticsRequest{Base: commonpbutil.NewMsgBase()},
DmlChannels: []string{s.vchannelName},
})
s.Error(err)
})
s.Run("cluster_not_serviceable", func() {
s.delegator.Close()


@ -24,6 +24,7 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -101,8 +102,26 @@ func NewDistribution() *distribution {
return dist
}
// GetAllSegments returns segments in current snapshot, filter readable segment when readable is true
func (d *distribution) GetSegments(readable bool, partitions ...int64) (sealed []SnapshotItem, growing []SegmentEntry, version int64) {
func (d *distribution) PinReadableSegments(partitions ...int64) (sealed []SnapshotItem, growing []SegmentEntry, version int64, err error) {
d.mut.RLock()
defer d.mut.RUnlock()
if !d.Serviceable() {
return nil, nil, -1, merr.WrapErrServiceInternal("channel distribution is not serviceable")
}
current := d.current.Load()
sealed, growing = current.Get(partitions...)
version = current.version
targetVersion := current.GetTargetVersion()
filterReadable := func(entry SegmentEntry, _ int) bool {
return entry.TargetVersion == targetVersion || entry.TargetVersion == initialTargetVersion
}
sealed, growing = d.filterSegments(sealed, growing, filterReadable)
return
}
func (d *distribution) PinOnlineSegments(partitions ...int64) (sealed []SnapshotItem, growing []SegmentEntry, version int64) {
d.mut.RLock()
defer d.mut.RUnlock()
@ -110,25 +129,20 @@ func (d *distribution) GetSegments(readable bool, partitions ...int64) (sealed [
sealed, growing = current.Get(partitions...)
version = current.version
if readable {
TargetVersion := current.GetTargetVersion()
sealed, growing = d.filterReadableSegments(sealed, growing, TargetVersion)
return
filterOnline := func(entry SegmentEntry, _ int) bool {
return !d.offlines.Contain(entry.SegmentID)
}
sealed, growing = d.filterSegments(sealed, growing, filterOnline)
return
}
func (d *distribution) filterReadableSegments(sealed []SnapshotItem, growing []SegmentEntry, targetVersion int64) ([]SnapshotItem, []SegmentEntry) {
filterReadable := func(entry SegmentEntry, _ int) bool {
return entry.TargetVersion == targetVersion || entry.TargetVersion == initialTargetVersion
}
growing = lo.Filter(growing, filterReadable)
func (d *distribution) filterSegments(sealed []SnapshotItem, growing []SegmentEntry, filter func(SegmentEntry, int) bool) ([]SnapshotItem, []SegmentEntry) {
growing = lo.Filter(growing, filter)
sealed = lo.Map(sealed, func(item SnapshotItem, _ int) SnapshotItem {
return SnapshotItem{
NodeID: item.NodeID,
Segments: lo.Filter(item.Segments, filterReadable),
Segments: lo.Filter(item.Segments, filter),
}
})
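
The refactor above replaces the special-cased `filterReadableSegments` with a generic `filterSegments` that accepts any predicate, so the readable filter (target-version match) and the online filter (absence from the offline set) now share one code path. Below is a runnable sketch of the same idea using the `samber/lo` helpers the file already imports; the trimmed `SegmentEntry` fields and the `initialTargetVersion` sentinel value are assumptions for illustration.

```go
package main

import (
	"fmt"

	"github.com/samber/lo"
)

// SegmentEntry is a trimmed-down, hypothetical version of the real entry;
// only the fields the two predicates need are kept.
type SegmentEntry struct {
	SegmentID     int64
	TargetVersion int64
}

// filterSegments mirrors the generic helper above: one function, any predicate.
func filterSegments(entries []SegmentEntry, filter func(SegmentEntry, int) bool) []SegmentEntry {
	return lo.Filter(entries, filter)
}

func main() {
	entries := []SegmentEntry{
		{SegmentID: 1, TargetVersion: 2},
		{SegmentID: 2, TargetVersion: 1},
	}

	// Hypothetical sentinel for segments not yet assigned a target version.
	const initialTargetVersion int64 = -1
	targetVersion := int64(2)

	// Readable predicate: entry matches the current target version.
	readable := filterSegments(entries, func(e SegmentEntry, _ int) bool {
		return e.TargetVersion == targetVersion || e.TargetVersion == initialTargetVersion
	})

	// Online predicate: entry is not in the offline set.
	offlines := map[int64]bool{2: true}
	online := filterSegments(entries, func(e SegmentEntry, _ int) bool {
		return !offlines[e.SegmentID]
	})

	fmt.Println("readable:", readable)
	fmt.Println("online:  ", online)
}
```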
@ -142,16 +156,19 @@ func (d *distribution) PeekSegments(readable bool, partitions ...int64) (sealed
sealed, growing = current.Peek(partitions...)
if readable {
TargetVersion := current.GetTargetVersion()
sealed, growing = d.filterReadableSegments(sealed, growing, TargetVersion)
targetVersion := current.GetTargetVersion()
filterReadable := func(entry SegmentEntry, _ int) bool {
return entry.TargetVersion == targetVersion || entry.TargetVersion == initialTargetVersion
}
sealed, growing = d.filterSegments(sealed, growing, filterReadable)
return
}
return
}
// FinishUsage notifies snapshot one reference is released.
func (d *distribution) FinishUsage(version int64) {
// Unpin notifies the snapshot that one reference has been released.
func (d *distribution) Unpin(version int64) {
snapshot, ok := d.snapshots.Get(version)
if ok {
snapshot.Done(d.getCleanup(snapshot.version))
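
`Unpin` is a rename of `FinishUsage`, but the version-keyed lookup is worth a closer look: releasing the last reference on a pinned snapshot is what lets the cleanup from `getCleanup` finally run. A rough, hypothetical model of that bookkeeping, with a plain map and counter in place of the real snapshot registry:

```go
package main

import (
	"fmt"
	"sync"
)

// snapshotRef uses a plain counter where the real code uses
// snapshot.Done with a cleanup callback from getCleanup.
type snapshotRef struct {
	version int64
	refs    int
}

type distribution struct {
	mut       sync.Mutex
	snapshots map[int64]*snapshotRef
}

// Unpin releases one reference on the snapshot pinned at version; the
// cleanup (here: dropping the registry entry) runs once no reader holds it.
func (d *distribution) Unpin(version int64) {
	d.mut.Lock()
	defer d.mut.Unlock()
	snap, ok := d.snapshots[version]
	if !ok {
		return // unknown or already-cleaned version: nothing to release
	}
	snap.refs--
	if snap.refs == 0 {
		delete(d.snapshots, version)
		fmt.Println("snapshot", version, "released, cleanup ran")
	}
}

func main() {
	d := &distribution{snapshots: map[int64]*snapshotRef{7: {version: 7, refs: 2}}}
	d.Unpin(7) // one reader done; snapshot 7 still pinned
	d.Unpin(7) // last reader done; cleanup runs
}
```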


@ -177,11 +177,12 @@ func (s *DistributionSuite) TestAddDistribution() {
s.SetupTest()
defer s.TearDownTest()
s.dist.AddGrowing(tc.growing...)
_, _, version := s.dist.GetSegments(false)
_, _, version, err := s.dist.PinReadableSegments()
s.Require().NoError(err)
s.dist.AddDistributions(tc.input...)
sealed, _ := s.dist.PeekSegments(false)
s.compareSnapshotItems(tc.expected, sealed)
s.dist.FinishUsage(version)
s.dist.Unpin(version)
})
}
}
@ -246,8 +247,9 @@ func (s *DistributionSuite) TestAddGrowing() {
defer s.TearDownTest()
s.dist.AddGrowing(tc.input...)
_, growing, version := s.dist.GetSegments(false)
defer s.dist.FinishUsage(version)
_, growing, version, err := s.dist.PinReadableSegments()
s.Require().NoError(err)
defer s.dist.Unpin(version)
s.ElementsMatch(tc.expected, growing)
})
@ -433,7 +435,9 @@ func (s *DistributionSuite) TestRemoveDistribution() {
var version int64
if tc.withMockRead {
_, _, version = s.dist.GetSegments(false)
var err error
_, _, version, err = s.dist.PinReadableSegments()
s.Require().NoError(err)
}
ch := s.dist.RemoveDistributions(tc.removalSealed, tc.removalGrowing)
@ -446,7 +450,7 @@ func (s *DistributionSuite) TestRemoveDistribution() {
default:
}
s.dist.FinishUsage(version)
s.dist.Unpin(version)
}
// check that ch closes very soon
timeout := time.NewTimer(time.Second)
@ -457,8 +461,8 @@ func (s *DistributionSuite) TestRemoveDistribution() {
case <-ch:
}
sealed, growing, version := s.dist.GetSegments(false)
defer s.dist.FinishUsage(version)
sealed, growing, version := s.dist.PinOnlineSegments()
defer s.dist.Unpin(version)
s.compareSnapshotItems(tc.expectSealed, sealed)
s.ElementsMatch(tc.expectGrowing, growing)
})
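
The `withMockRead` branch above encodes the synchronization guarantee the pin provides: `RemoveDistributions` hands back a channel that only closes once in-flight readers have unpinned. A hypothetical model of that handshake, with `pinGuard` standing in for one pinned snapshot:

```go
package main

import (
	"fmt"
	"sync"
)

// pinGuard models a single pinned snapshot whose release unblocks removal.
type pinGuard struct {
	mu     sync.Mutex
	refs   int
	onZero []chan struct{} // removal waiters, closed when refs hits zero
}

func (p *pinGuard) Pin() { p.mu.Lock(); p.refs++; p.mu.Unlock() }

func (p *pinGuard) Unpin() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.refs--
	if p.refs == 0 {
		for _, ch := range p.onZero {
			close(ch)
		}
		p.onZero = nil
	}
}

// Remove returns a channel that closes once no reader pins the snapshot,
// mirroring the channel returned by RemoveDistributions in the test above.
func (p *pinGuard) Remove() <-chan struct{} {
	p.mu.Lock()
	defer p.mu.Unlock()
	ch := make(chan struct{})
	if p.refs == 0 {
		close(ch) // no readers: removal completes immediately
		return ch
	}
	p.onZero = append(p.onZero, ch)
	return ch
}

func main() {
	p := &pinGuard{}
	p.Pin()
	ch := p.Remove()
	select {
	case <-ch:
		fmt.Println("unexpected: removal finished while pinned")
	default:
		fmt.Println("removal blocked by pinned reader")
	}
	p.Unpin()
	<-ch
	fmt.Println("reader unpinned, removal complete")
}
```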
@ -468,13 +472,15 @@ func (s *DistributionSuite) TestRemoveDistribution() {
func (s *DistributionSuite) TestPeek() {
type testCase struct {
tag string
readable bool
input []SegmentEntry
expected []SnapshotItem
}
cases := []testCase{
{
tag: "one node",
tag: "one_node",
readable: false,
input: []SegmentEntry{
{
NodeID: 1,
@ -504,7 +510,8 @@ func (s *DistributionSuite) TestPeek() {
},
},
{
tag: "multiple nodes",
tag: "multiple_nodes",
readable: false,
input: []SegmentEntry{
{
NodeID: 1,
@ -548,6 +555,34 @@ func (s *DistributionSuite) TestPeek() {
},
},
},
{
tag: "peek_readable",
readable: true,
input: []SegmentEntry{
{
NodeID: 1,
SegmentID: 1,
},
{
NodeID: 2,
SegmentID: 2,
},
{
NodeID: 1,
SegmentID: 3,
},
},
expected: []SnapshotItem{
{
NodeID: 1,
Segments: []SegmentEntry{},
},
{
NodeID: 2,
Segments: []SegmentEntry{},
},
},
},
}
for _, tc := range cases {
@ -558,7 +593,7 @@ func (s *DistributionSuite) TestPeek() {
// peek during lock
s.dist.AddDistributions(tc.input...)
s.dist.mut.Lock()
sealed, _ := s.dist.PeekSegments(false)
sealed, _ := s.dist.PeekSegments(tc.readable)
s.compareSnapshotItems(tc.expected, sealed)
s.dist.mut.Unlock()
})
@ -668,11 +703,12 @@ func (s *DistributionSuite) Test_SyncTargetVersion() {
s.dist.AddDistributions(sealed...)
s.dist.SyncTargetVersion(2, []int64{2, 3}, []int64{6}, []int64{})
s1, s2, _ := s.dist.GetSegments(true)
s1, s2, _, err := s.dist.PinReadableSegments()
s.Require().NoError(err)
s.Len(s1[0].Segments, 1)
s.Len(s2, 2)
s1, s2, _ = s.dist.GetSegments(false)
s1, s2, _ = s.dist.PinOnlineSegments()
s.Len(s1[0].Segments, 3)
s.Len(s2, 3)
@ -684,8 +720,8 @@ func (s *DistributionSuite) Test_SyncTargetVersion() {
s.False(s.dist.Serviceable())
s.dist.SyncTargetVersion(2, []int64{}, []int64{333}, []int64{1, 2, 3})
_, segments, _ := s.dist.GetSegments(true)
s.Len(segments, 0)
_, _, _, err = s.dist.PinReadableSegments()
s.Error(err)
}
func TestDistributionSuite(t *testing.T) {