mirror of https://github.com/milvus-io/milvus.git
Wait safe signal before release growing (#23358)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/23369/head
parent
49eb4b8af4
commit
6cd73d4e2a
|
@ -360,11 +360,12 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
|
|||
Version: req.GetVersion(),
|
||||
}
|
||||
})
|
||||
removed := sd.distribution.AddDistributions(entries...)
|
||||
removed, signal := sd.distribution.AddDistributions(entries...)
|
||||
|
||||
// release possible matched growing segments async
|
||||
if len(removed) > 0 {
|
||||
go func() {
|
||||
<-signal
|
||||
worker, err := sd.workerManager.GetWorker(paramtable.GetNodeID())
|
||||
if err != nil {
|
||||
log.Warn("failed to get local worker when try to release related growing", zap.Error(err))
|
||||
|
|
|
@ -29,6 +29,19 @@ const (
|
|||
wildcardNodeID = int64(-1)
|
||||
)
|
||||
|
||||
var (
|
||||
closedCh chan struct{}
|
||||
closeOnce sync.Once
|
||||
)
|
||||
|
||||
func getClosedCh() chan struct{} {
|
||||
closeOnce.Do(func() {
|
||||
closedCh = make(chan struct{})
|
||||
close(closedCh)
|
||||
})
|
||||
return closedCh
|
||||
}
|
||||
|
||||
// distribution is the struct to store segment distribution.
|
||||
// it contains both growing and sealed segments.
|
||||
type distribution struct {
|
||||
|
@ -107,7 +120,7 @@ func (d *distribution) Serviceable() bool {
|
|||
}
|
||||
|
||||
// AddDistributions add multiple segment entries.
|
||||
func (d *distribution) AddDistributions(entries ...SegmentEntry) []int64 {
|
||||
func (d *distribution) AddDistributions(entries ...SegmentEntry) ([]int64, chan struct{}) {
|
||||
d.mut.Lock()
|
||||
defer d.mut.Unlock()
|
||||
|
||||
|
@ -123,8 +136,12 @@ func (d *distribution) AddDistributions(entries ...SegmentEntry) []int64 {
|
|||
}
|
||||
}
|
||||
|
||||
d.genSnapshot()
|
||||
return removed
|
||||
ch := d.genSnapshot()
|
||||
// no offline growing, return closed ch to skip wait
|
||||
if len(removed) == 0 {
|
||||
return removed, getClosedCh()
|
||||
}
|
||||
return removed, ch
|
||||
}
|
||||
|
||||
// AddGrowing adds growing segment distribution.
|
||||
|
@ -192,9 +209,7 @@ func (d *distribution) RemoveDistributions(sealedSegments []SegmentEntry, growin
|
|||
|
||||
if !changed {
|
||||
// no change made, return closed signal channel
|
||||
ch := make(chan struct{})
|
||||
close(ch)
|
||||
return ch
|
||||
return getClosedCh()
|
||||
}
|
||||
|
||||
return d.genSnapshot()
|
||||
|
|
|
@ -38,14 +38,16 @@ func (s *DistributionSuite) TearDownTest() {
|
|||
|
||||
func (s *DistributionSuite) TestAddDistribution() {
|
||||
type testCase struct {
|
||||
tag string
|
||||
input []SegmentEntry
|
||||
expected []SnapshotItem
|
||||
tag string
|
||||
input []SegmentEntry
|
||||
growing []SegmentEntry
|
||||
expected []SnapshotItem
|
||||
expectedSignalClosed bool
|
||||
}
|
||||
|
||||
cases := []testCase{
|
||||
{
|
||||
tag: "one node",
|
||||
tag: "one_node",
|
||||
input: []SegmentEntry{
|
||||
{
|
||||
NodeID: 1,
|
||||
|
@ -71,9 +73,10 @@ func (s *DistributionSuite) TestAddDistribution() {
|
|||
},
|
||||
},
|
||||
},
|
||||
expectedSignalClosed: true,
|
||||
},
|
||||
{
|
||||
tag: "multiple nodes",
|
||||
tag: "multiple_nodes",
|
||||
input: []SegmentEntry{
|
||||
{
|
||||
NodeID: 1,
|
||||
|
@ -113,6 +116,27 @@ func (s *DistributionSuite) TestAddDistribution() {
|
|||
},
|
||||
},
|
||||
},
|
||||
expectedSignalClosed: true,
|
||||
},
|
||||
{
|
||||
tag: "remove_growing",
|
||||
growing: []SegmentEntry{
|
||||
{NodeID: 1, SegmentID: 1},
|
||||
},
|
||||
input: []SegmentEntry{
|
||||
{NodeID: 1, SegmentID: 1},
|
||||
{NodeID: 1, SegmentID: 2},
|
||||
},
|
||||
expected: []SnapshotItem{
|
||||
{
|
||||
NodeID: 1,
|
||||
Segments: []SegmentEntry{
|
||||
{NodeID: 1, SegmentID: 1},
|
||||
{NodeID: 1, SegmentID: 2},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedSignalClosed: false,
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -120,14 +144,24 @@ func (s *DistributionSuite) TestAddDistribution() {
|
|||
s.Run(tc.tag, func() {
|
||||
s.SetupTest()
|
||||
defer s.TearDownTest()
|
||||
s.dist.AddDistributions(tc.input...)
|
||||
sealed, _, version := s.dist.GetCurrent()
|
||||
defer s.dist.FinishUsage(version)
|
||||
s.dist.AddGrowing(tc.growing...)
|
||||
_, signal := s.dist.AddDistributions(tc.input...)
|
||||
sealed, _ := s.dist.Peek()
|
||||
s.compareSnapshotItems(tc.expected, sealed)
|
||||
s.Equal(tc.expectedSignalClosed, s.isClosedCh(signal))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (s *DistributionSuite) isClosedCh(ch chan struct{}) bool {
|
||||
select {
|
||||
case <-ch:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func (s *DistributionSuite) compareSnapshotItems(target, value []SnapshotItem) {
|
||||
if !s.Equal(len(target), len(value)) {
|
||||
return
|
||||
|
|
Loading…
Reference in New Issue