mirror of https://github.com/milvus-io/milvus.git
fix: segment stats may be inconsistent after wal closing (#39593)
issue: #38399
- The stats may be kept after wal closing if the growing segment is not dirty.
- Change the error handling of wal open to avoid redundant manager api call.
Signed-off-by: chyezh <chyezh@outlook.com>
parent a816a03351
commit b1cee78a55
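The stats half of this fix is easier to follow in a toy form. This is a minimal sketch with hypothetical, simplified types rather than the real StatsManager: before the change, close-time cleanup only walked the segments that happened to be dirty, so the stats of clean growing segments could outlive the WAL; indexing registered stats by pchannel lets Close drop everything for that channel in one call.

package main

import "fmt"

type statsManager struct {
    segmentStats  map[int64]uint64              // segmentID -> growing segment binary size
    pchannelIndex map[string]map[int64]struct{} // pchannel -> segmentIDs (the index this fix adds)
}

func (m *statsManager) unregister(segmentID int64) {
    delete(m.segmentStats, segmentID)
}

// unregisterAllOnPChannel drops every stat registered on the closing pchannel,
// regardless of whether the owning segment was dirty at close time.
func (m *statsManager) unregisterAllOnPChannel(pchannel string) int {
    ids := m.pchannelIndex[pchannel]
    for id := range ids {
        m.unregister(id)
    }
    delete(m.pchannelIndex, pchannel)
    return len(ids)
}

func main() {
    m := &statsManager{
        segmentStats:  map[int64]uint64{3: 100, 4: 200},
        pchannelIndex: map[string]map[int64]struct{}{"pchannel": {3: {}, 4: {}}},
    }
    fmt.Println("removed:", m.unregisterAllOnPChannel("pchannel")) // removed: 2
    fmt.Println("left:", len(m.segmentStats))                      // left: 0
}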
@@ -95,6 +95,10 @@ func TestAssignmentService(t *testing.T) {

 	assignmentService.ReportAssignmentError(ctx, types.PChannelInfo{Name: "c1", Term: 1}, errors.New("test"))
+
+	// Repeated report error at the same term should be ignored.
+	assignmentService.ReportAssignmentError(ctx, types.PChannelInfo{Name: "c1", Term: 1}, errors.New("test"))
+	assignmentService.ReportAssignmentError(ctx, types.PChannelInfo{Name: "c1", Term: 1}, errors.New("test"))

 	// test close
 	go close(closeCh)
 	time.Sleep(10 * time.Millisecond)
@@ -14,13 +14,14 @@ import (
 // newAssignmentDiscoverClient creates a new assignment discover client.
 func newAssignmentDiscoverClient(w *watcher, streamClient streamingpb.StreamingCoordAssignmentService_AssignmentDiscoverClient) *assignmentDiscoverClient {
 	c := &assignmentDiscoverClient{
-		lifetime:     typeutil.NewLifetime(),
-		w:            w,
-		streamClient: streamClient,
-		logger:       log.With(),
-		requestCh:    make(chan *streamingpb.AssignmentDiscoverRequest, 16),
-		exitCh:       make(chan struct{}),
-		wg:           sync.WaitGroup{},
+		lifetime:              typeutil.NewLifetime(),
+		w:                     w,
+		streamClient:          streamClient,
+		logger:                log.With(),
+		requestCh:             make(chan *streamingpb.AssignmentDiscoverRequest, 16),
+		exitCh:                make(chan struct{}),
+		wg:                    sync.WaitGroup{},
+		lastErrorReportedTerm: make(map[string]int64),
 	}
 	c.executeBackgroundTask()
 	return c
@@ -28,13 +29,14 @@ func newAssignmentDiscoverClient(w *watcher, streamClient streamingpb.StreamingC

 // assignmentDiscoverClient is the client for assignment discover.
 type assignmentDiscoverClient struct {
-	lifetime     *typeutil.Lifetime
-	w            *watcher
-	logger       *log.MLogger
-	requestCh    chan *streamingpb.AssignmentDiscoverRequest
-	exitCh       chan struct{}
-	wg           sync.WaitGroup
-	streamClient streamingpb.StreamingCoordAssignmentService_AssignmentDiscoverClient
+	lifetime              *typeutil.Lifetime
+	w                     *watcher
+	logger                *log.MLogger
+	requestCh             chan *streamingpb.AssignmentDiscoverRequest
+	exitCh                chan struct{}
+	wg                    sync.WaitGroup
+	streamClient          streamingpb.StreamingCoordAssignmentService_AssignmentDiscoverClient
+	lastErrorReportedTerm map[string]int64
 }

 // ReportAssignmentError reports the assignment error to server.
@@ -101,12 +103,28 @@ func (c *assignmentDiscoverClient) sendLoop() (err error) {
 			}
 			return c.streamClient.CloseSend()
 		}
+		if c.shouldIgnore(req) {
+			continue
+		}
 		if err := c.streamClient.Send(req); err != nil {
 			return err
 		}
 	}
 }

+// shouldIgnore checks if the request should be ignored.
+func (c *assignmentDiscoverClient) shouldIgnore(req *streamingpb.AssignmentDiscoverRequest) bool {
+	switch req := req.Command.(type) {
+	case *streamingpb.AssignmentDiscoverRequest_ReportError:
+		if term, ok := c.lastErrorReportedTerm[req.ReportError.Pchannel.Name]; ok && req.ReportError.Pchannel.Term <= term {
+			// If the error at newer term has been reported, ignore it right now.
+			return true
+		}
+		c.lastErrorReportedTerm[req.ReportError.Pchannel.Name] = req.ReportError.Pchannel.Term
+	}
+	return false
+}
+
 // recvLoop receives the message from server.
 // 1. FullAssignment
 // 2. Close
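In isolation, the term bookkeeping behind shouldIgnore looks like the following minimal sketch (shouldIgnoreReport is a hypothetical standalone helper, not part of the client): a report is forwarded only when its term is newer than the last term already reported for that pchannel.

package main

import "fmt"

// shouldIgnoreReport mirrors the dedup rule: ignore a report unless its term is
// strictly newer than the last term already reported for that pchannel.
func shouldIgnoreReport(lastReported map[string]int64, pchannel string, term int64) bool {
    if last, ok := lastReported[pchannel]; ok && term <= last {
        return true
    }
    lastReported[pchannel] = term
    return false
}

func main() {
    last := make(map[string]int64)
    fmt.Println(shouldIgnoreReport(last, "c1", 1)) // false: first report at term 1 is sent
    fmt.Println(shouldIgnoreReport(last, "c1", 1)) // true: repeated report at the same term is dropped
    fmt.Println(shouldIgnoreReport(last, "c1", 2)) // false: a report at a newer term goes through
}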
@@ -226,7 +226,7 @@ func (b *balancerImpl) applyBalanceResultToStreamingNode(ctx context.Context, mo

 		// assign the channel to the target node.
 		if err := resource.Resource().StreamingNodeManagerClient().Assign(ctx, channel.CurrentAssignment()); err != nil {
-			b.logger.Warn("fail to assign channel", zap.Any("assignment", channel.CurrentAssignment()))
+			b.logger.Warn("fail to assign channel", zap.Any("assignment", channel.CurrentAssignment()), zap.Error(err))
 			return err
 		}
 		b.logger.Info("assign channel success", zap.Any("assignment", channel.CurrentAssignment()))
@@ -4,9 +4,12 @@ import (
 	"context"
 	"time"

+	"go.uber.org/zap"
+
 	"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
 	"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/stats"
+	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/syncutil"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
@@ -121,9 +124,16 @@ func (s *sealOperationInspectorImpl) background() {
 				return true
 			})
 		case <-mustSealTicker.C:
-			segmentBelongs := resource.Resource().SegmentAssignStatsManager().SealByTotalGrowingSegmentsSize()
+			threshold := paramtable.Get().DataCoordCfg.GrowingSegmentsMemSizeInMB.GetAsUint64() * 1024 * 1024
+			segmentBelongs := resource.Resource().SegmentAssignStatsManager().SealByTotalGrowingSegmentsSize(threshold)
+			if segmentBelongs == nil {
+				continue
+			}
+			log.Info("seal by total growing segments size", zap.String("vchannel", segmentBelongs.VChannel),
+				zap.Uint64("sealThreshold", threshold),
+				zap.Int64("sealSegment", segmentBelongs.SegmentID))
 			if pm, ok := s.managers.Get(segmentBelongs.PChannel); ok {
-				pm.MustSealSegments(s.taskNotifier.Context(), segmentBelongs)
+				pm.MustSealSegments(s.taskNotifier.Context(), *segmentBelongs)
 			}
 		}
 	}
@@ -164,19 +164,14 @@ func (m *partitionSegmentManager) collectShouldBeSealedWithPolicy(predicates fun
 	return shouldBeSealedSegments
 }

-// CollectDirtySegmentsAndClear collects all segments in the manager and clear the maanger.
-func (m *partitionSegmentManager) CollectDirtySegmentsAndClear() []*segmentAllocManager {
+// CollectAllSegmentsAndClear collects all segments in the manager and clear the manager.
+func (m *partitionSegmentManager) CollectAllSegmentsAndClear() []*segmentAllocManager {
 	m.mu.Lock()
 	defer m.mu.Unlock()

-	dirtySegments := make([]*segmentAllocManager, 0, len(m.segments))
-	for _, segment := range m.segments {
-		if segment.IsDirtyEnough() {
-			dirtySegments = append(dirtySegments, segment)
-		}
-	}
-	m.segments = make([]*segmentAllocManager, 0)
-	return dirtySegments
+	segments := m.segments
+	m.segments = nil
+	return segments
 }

 // CollectAllCanBeSealedAndClear collects all segments that can be sealed and clear the manager.
@@ -265,31 +265,35 @@ func (m *PChannelSegmentAllocManager) Close(ctx context.Context) {

 	// Try to seal all wait
 	m.helper.SealAllWait(ctx)
-	m.logger.Info("seal all waited segments done", zap.Int("waitCounter", m.helper.WaitCounter()))
+	m.logger.Info("seal all waited segments done, may be some not done here", zap.Int("waitCounter", m.helper.WaitCounter()))

 	segments := make([]*segmentAllocManager, 0)
 	m.managers.Range(func(pm *partitionSegmentManager) {
-		segments = append(segments, pm.CollectDirtySegmentsAndClear()...)
+		segments = append(segments, pm.CollectAllSegmentsAndClear()...)
 	})

-	// commitAllSegmentsOnSamePChannel commits all segments on the same pchannel.
+	// Try to seal the dirty segment to avoid generate too large segment.
 	protoSegments := make([]*streamingpb.SegmentAssignmentMeta, 0, len(segments))
+	growingCnt := 0
 	for _, segment := range segments {
-		protoSegments = append(protoSegments, segment.Snapshot())
+		if segment.GetState() == streamingpb.SegmentAssignmentState_SEGMENT_ASSIGNMENT_STATE_GROWING {
+			growingCnt++
+		}
+		if segment.IsDirtyEnough() {
+			// Only persist the dirty segment.
+			protoSegments = append(protoSegments, segment.Snapshot())
+		}
 	}

-	m.logger.Info("segment assignment manager save all dirty segment assignments info", zap.Int("segmentCount", len(protoSegments)))
+	m.logger.Info("segment assignment manager save all dirty segment assignments info",
+		zap.Int("dirtySegmentCount", len(protoSegments)),
+		zap.Int("growingSegmentCount", growingCnt),
+		zap.Int("segmentCount", len(segments)))
 	if err := resource.Resource().StreamingNodeCatalog().SaveSegmentAssignments(ctx, m.pchannel.Name, protoSegments); err != nil {
 		m.logger.Warn("commit segment assignment at pchannel failed", zap.Error(err))
 	}

 	// remove the stats from stats manager.
-	m.logger.Info("segment assignment manager remove all segment stats from stats manager")
-	for _, segment := range segments {
-		if segment.GetState() == streamingpb.SegmentAssignmentState_SEGMENT_ASSIGNMENT_STATE_GROWING {
-			resource.Resource().SegmentAssignStatsManager().UnregisterSealedSegment(segment.GetSegmentID())
-		}
-	}
-
+	removedStatsSegmentCnt := resource.Resource().SegmentAssignStatsManager().UnregisterAllStatsOnPChannel(m.pchannel.Name)
+	m.logger.Info("segment assignment manager remove all segment stats from stats manager", zap.Int("removedStatsSegmentCount", removedStatsSegmentCnt))
 	m.metrics.Close()
 }
@@ -5,10 +5,6 @@ import (
 	"sync"

 	"github.com/cockroachdb/errors"
-	"github.com/pingcap/log"
-	"go.uber.org/zap"
-
-	"github.com/milvus-io/milvus/pkg/util/paramtable"
 )

 var (
@@ -24,8 +20,9 @@ type StatsManager struct {
 	totalStats    InsertMetrics
 	pchannelStats map[string]*InsertMetrics
 	vchannelStats map[string]*InsertMetrics
-	segmentStats  map[int64]*SegmentStats  // map[SegmentID]SegmentStats
-	segmentIndex  map[int64]SegmentBelongs // map[SegmentID]channels
+	segmentStats  map[int64]*SegmentStats       // map[SegmentID]SegmentStats
+	segmentIndex  map[int64]SegmentBelongs      // map[SegmentID]channels
+	pchannelIndex map[string]map[int64]struct{} // map[PChannel]SegmentID
 	sealNotifier  *SealSignalNotifier
 }

@@ -46,6 +43,7 @@ func NewStatsManager() *StatsManager {
 		vchannelStats: make(map[string]*InsertMetrics),
 		segmentStats:  make(map[int64]*SegmentStats),
 		segmentIndex:  make(map[int64]SegmentBelongs),
+		pchannelIndex: make(map[string]map[int64]struct{}),
 		sealNotifier:  NewSealSignalNotifier(),
 	}
 }
@@ -62,6 +60,10 @@ func (m *StatsManager) RegisterNewGrowingSegment(belongs SegmentBelongs, segment

 	m.segmentStats[segmentID] = stats
 	m.segmentIndex[segmentID] = belongs
+	if _, ok := m.pchannelIndex[belongs.PChannel]; !ok {
+		m.pchannelIndex[belongs.PChannel] = make(map[int64]struct{})
+	}
+	m.pchannelIndex[belongs.PChannel][segmentID] = struct{}{}
 	m.totalStats.Collect(stats.Insert)
 	if _, ok := m.pchannelStats[belongs.PChannel]; !ok {
 		m.pchannelStats[belongs.PChannel] = &InsertMetrics{}
@@ -145,6 +147,10 @@ func (m *StatsManager) UnregisterSealedSegment(segmentID int64) *SegmentStats {
 	m.mu.Lock()
 	defer m.mu.Unlock()

+	return m.unregisterSealedSegment(segmentID)
+}
+
+func (m *StatsManager) unregisterSealedSegment(segmentID int64) *SegmentStats {
 	// Must be exist, otherwise it's a bug.
 	info, ok := m.segmentIndex[segmentID]
 	if !ok {
@@ -156,6 +162,13 @@ func (m *StatsManager) UnregisterSealedSegment(segmentID int64) *SegmentStats {
 	m.totalStats.Subtract(stats.Insert)
 	delete(m.segmentStats, segmentID)
 	delete(m.segmentIndex, segmentID)
+	if _, ok := m.pchannelIndex[info.PChannel]; ok {
+		delete(m.pchannelIndex[info.PChannel], segmentID)
+		if len(m.pchannelIndex[info.PChannel]) == 0 {
+			delete(m.pchannelIndex, info.PChannel)
+		}
+	}
+
 	if _, ok := m.pchannelStats[info.PChannel]; ok {
 		m.pchannelStats[info.PChannel].Subtract(stats.Insert)
 		if m.pchannelStats[info.PChannel].BinarySize == 0 {
@@ -171,15 +184,29 @@ func (m *StatsManager) UnregisterSealedSegment(segmentID int64) *SegmentStats {
 	return stats
 }

-// SealByTotalGrowingSegmentsSize seals the largest growing segment
-// if the total size of growing segments in ANY vchannel exceeds the threshold.
-func (m *StatsManager) SealByTotalGrowingSegmentsSize() SegmentBelongs {
+// UnregisterAllStatsOnPChannel unregisters all stats on pchannel.
+func (m *StatsManager) UnregisterAllStatsOnPChannel(pchannel string) int {
 	m.mu.Lock()
 	defer m.mu.Unlock()

-	for vchannel, metrics := range m.vchannelStats {
-		threshold := paramtable.Get().DataCoordCfg.GrowingSegmentsMemSizeInMB.GetAsUint64() * 1024 * 1024
-		if metrics.BinarySize >= threshold {
+	segmentIDs, ok := m.pchannelIndex[pchannel]
+	if !ok {
+		return 0
+	}
+	for segmentID := range segmentIDs {
+		m.unregisterSealedSegment(segmentID)
+	}
+	return len(segmentIDs)
+}
+
+// SealByTotalGrowingSegmentsSize seals the largest growing segment
+// if the total size of growing segments in ANY vchannel exceeds the threshold.
+func (m *StatsManager) SealByTotalGrowingSegmentsSize(vchannelThreshold uint64) *SegmentBelongs {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	for _, metrics := range m.vchannelStats {
+		if metrics.BinarySize >= vchannelThreshold {
 			var (
 				largestSegment     int64  = 0
 				largestSegmentSize uint64 = 0
@@ -190,13 +217,14 @@ func (m *StatsManager) SealByTotalGrowingSegmentsSize() SegmentBelongs {
 					largestSegment = segmentID
 				}
 			}
-			log.Info("seal by total growing segments size", zap.String("vchannel", vchannel),
-				zap.Uint64("vchannelGrowingSize", metrics.BinarySize), zap.Uint64("sealThreshold", threshold),
-				zap.Int64("sealSegment", largestSegment), zap.Uint64("sealSegmentSize", largestSegmentSize))
-			return m.segmentIndex[largestSegment]
+			belongs, ok := m.segmentIndex[largestSegment]
+			if !ok {
+				panic("unrechable: the segmentID should always be found in segmentIndex")
+			}
+			return &belongs
 		}
 	}
-	return SegmentBelongs{}
+	return nil
 }

 // InsertOpeatationMetrics is the metrics of insert operation.
@@ -106,6 +106,25 @@ func TestStatsManager(t *testing.T) {
 	assert.Panics(t, func() {
 		m.UnregisterSealedSegment(1)
 	})
+	m.UnregisterAllStatsOnPChannel("pchannel")
+	m.UnregisterAllStatsOnPChannel("pchannel2")
 }

+func TestSealByTotalGrowingSegmentsSize(t *testing.T) {
+	m := NewStatsManager()
+	m.RegisterNewGrowingSegment(SegmentBelongs{PChannel: "pchannel", VChannel: "vchannel", CollectionID: 1, PartitionID: 2, SegmentID: 3}, 3, createSegmentStats(100, 100, 300))
+	m.RegisterNewGrowingSegment(SegmentBelongs{PChannel: "pchannel", VChannel: "vchannel", CollectionID: 1, PartitionID: 2, SegmentID: 4}, 4, createSegmentStats(100, 200, 300))
+	m.RegisterNewGrowingSegment(SegmentBelongs{PChannel: "pchannel", VChannel: "vchannel", CollectionID: 1, PartitionID: 2, SegmentID: 5}, 5, createSegmentStats(100, 100, 300))
+	belongs := m.SealByTotalGrowingSegmentsSize(401)
+	assert.Nil(t, belongs)
+	belongs = m.SealByTotalGrowingSegmentsSize(400)
+	assert.NotNil(t, belongs)
+	assert.Equal(t, int64(4), belongs.SegmentID)
+	m.UnregisterAllStatsOnPChannel("pchannel")
+	assert.Empty(t, m.pchannelStats)
+	assert.Empty(t, m.vchannelStats)
+	assert.Empty(t, m.segmentStats)
+	assert.Empty(t, m.segmentIndex)
+}
+
 func createSegmentStats(row uint64, binarySize uint64, maxBinarSize uint64) *SegmentStats {
@@ -101,7 +101,9 @@ func (m *managerImpl) GetAvailableWAL(channel types.PChannelInfo) (wal.WAL, erro
 	if currentTerm != channel.Term {
 		return nil, status.NewUnmatchedChannelTerm(channel.Name, channel.Term, currentTerm)
 	}
-	return l, nil
+	// wal's lifetime is fully managed by wal manager,
+	// so wrap the wal instance to prevent it from being closed by other components.
+	return nopCloseWAL{l}, nil
 }

 // GetAllAvailableChannels returns all available channel info.
@@ -176,3 +178,13 @@ func isRemoveable(state managerState) bool {
 func isOpenable(state managerState) bool {
 	return state&managerOpenable != 0
 }
+
+// wal can be only closed by the wal manager.
+// So wrap the wal instance to prevent it from being closed by other components.
+type nopCloseWAL struct {
+	wal.WAL
+}
+
+func (w nopCloseWAL) Close() {
+	// do nothing
+}
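The wrapper introduced above is an instance of a common Go pattern: embed the interface and override only Close. Below is a minimal sketch with a toy Closer interface standing in for wal.WAL; the names are illustrative, not from the wal package.

package main

import "fmt"

// Closer stands in for the interface whose lifetime the manager owns.
type Closer interface {
    Close()
}

type realWAL struct{}

func (realWAL) Close() { fmt.Println("real wal closed") }

// nopClose embeds the interface and overrides only Close, so components that
// receive the wrapped value cannot shut the WAL down; the owner keeps the
// unwrapped value and stays in charge of the real Close.
type nopClose struct {
    Closer
}

func (nopClose) Close() {}

func main() {
    owned := realWAL{}
    var handedOut Closer = nopClose{owned}
    handedOut.Close() // no-op: prints nothing
    owned.Close()     // prints "real wal closed"
}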
@@ -3,6 +3,7 @@ package walmanager
 import (
 	"context"

+	"github.com/cockroachdb/errors"
 	"go.uber.org/zap"

 	"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
@@ -72,7 +73,14 @@ func (w *walLifetime) Remove(ctx context.Context, term int64) error {
 	}

 	// Wait until the WAL state is ready or term expired or error occurs.
-	return w.statePair.WaitCurrentStateReachExpected(ctx, expected)
+	err := w.statePair.WaitCurrentStateReachExpected(ctx, expected)
+	if errors.IsAny(err, context.Canceled, context.DeadlineExceeded) {
+		return err
+	}
+	if err != nil {
+		w.logger.Info("remove wal success because that previous open operation is failure", zap.NamedError("previousOpenError", err))
+	}
+	return nil
 }

 // Close closes the wal lifetime.
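The reworked Remove path separates caller cancellation from a failed previous open. Here is a minimal sketch of that classification, assuming only errors.IsAny from github.com/cockroachdb/errors (which the diff above already imports); classifyRemoveErr is a hypothetical helper, not the actual walLifetime method.

package main

import (
    "context"
    "fmt"

    "github.com/cockroachdb/errors"
)

// classifyRemoveErr mirrors the new handling: context errors are returned to
// the caller, while a failure left over from the previous open is logged and
// treated as a successful removal.
func classifyRemoveErr(err error) error {
    if errors.IsAny(err, context.Canceled, context.DeadlineExceeded) {
        return err // the caller gave up; report it
    }
    if err != nil {
        fmt.Println("remove wal success, previous open failed:", err)
    }
    return nil
}

func main() {
    fmt.Println(classifyRemoveErr(nil))                       // <nil>
    fmt.Println(classifyRemoveErr(errors.New("open failed"))) // <nil>, after logging the open failure
    fmt.Println(classifyRemoveErr(context.Canceled))          // context canceled
}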