From fbe177d6e7795abbcac4ecfdc2bb432e8c7ce0c7 Mon Sep 17 00:00:00 2001 From: aoiasd <45024769+aoiasd@users.noreply.github.com> Date: Fri, 18 Oct 2024 11:57:24 +0800 Subject: [PATCH] fix: avoid panic when loading a segment whose pkoracle and idforacle already exist (#36959) related: https://github.com/milvus-io/milvus/issues/36949 Signed-off-by: aoiasd --- internal/querynodev2/delegator/delegator.go | 12 ++++++------ internal/querynodev2/delegator/delegator_data.go | 7 +++++-- internal/querynodev2/delegator/distribution.go | 9 +++++++-- internal/querynodev2/delegator/distribution_test.go | 4 +--- 4 files changed, 19 insertions(+), 13 deletions(-) diff --git a/internal/querynodev2/delegator/delegator.go b/internal/querynodev2/delegator/delegator.go index 06f98608ec..6cb63aa65a 100644 --- a/internal/querynodev2/delegator/delegator.go +++ b/internal/querynodev2/delegator/delegator.go @@ -913,10 +913,6 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni excludedSegments := NewExcludedSegments(paramtable.Get().QueryNodeCfg.CleanExcludeSegInterval.GetAsDuration(time.Second)) - var idfOracle IDFOracle - if len(collection.Schema().GetFunctions()) > 0 { - idfOracle = NewIDFOracle(collection.Schema().GetFunctions()) - } sd := &shardDelegator{ collectionID: collectionID, replicaID: replicaID, @@ -926,7 +922,7 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni segmentManager: manager.Segment, workerManager: workerManager, lifetime: lifetime.NewLifetime(lifetime.Initializing), - distribution: NewDistribution(idfOracle), + distribution: NewDistribution(), deleteBuffer: deletebuffer.NewListDeleteBuffer[*deletebuffer.Item](startTs, sizePerBlock), pkOracle: pkoracle.NewPkOracle(), tsafeManager: tsafeManager, @@ -935,7 +931,6 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni factory: factory, queryHook: queryHook, chunkManager: chunkManager, - idfOracle: idfOracle, partitionStats: 
make(map[UniqueID]*storage.PartitionStatsSnapshot), excludedSegments: excludedSegments, functionRunners: make(map[int64]function.FunctionRunner), @@ -955,6 +950,11 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni } } + if len(sd.isBM25Field) > 0 { + sd.idfOracle = NewIDFOracle(collection.Schema().GetFunctions()) + sd.distribution.SetIDFOracle(sd.idfOracle) + } + m := sync.Mutex{} sd.tsCond = sync.NewCond(&m) if sd.lifetime.Add(lifetime.NotStopped) == nil { diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index fc5d3803c4..092dcd55d8 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -487,7 +487,7 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg }) var bm25Stats *typeutil.ConcurrentMap[int64, map[int64]*storage.BM25Stats] - if len(sd.isBM25Field) > 0 { + if sd.idfOracle != nil { bm25Stats, err = sd.loader.LoadBM25Stats(ctx, req.GetCollectionID(), infos...) 
if err != nil { log.Warn("failed to load bm25 stats for segment", zap.Error(err)) @@ -690,8 +690,11 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context, sd.pkOracle.Register(candidate, targetNodeID) } - if sd.idfOracle != nil { + if sd.idfOracle != nil && bm25Stats != nil { bm25Stats.Range(func(segmentID int64, stats map[int64]*storage.BM25Stats) bool { + log.Info("register sealed segment bm25 stats into idforacle", + zap.Int64("segmentID", segmentID), + ) sd.idfOracle.Register(segmentID, stats, segments.SegmentTypeSealed) return false }) diff --git a/internal/querynodev2/delegator/distribution.go b/internal/querynodev2/delegator/distribution.go index 165db04a2f..0bcbea20b8 100644 --- a/internal/querynodev2/delegator/distribution.go +++ b/internal/querynodev2/delegator/distribution.go @@ -91,7 +91,7 @@ type SegmentEntry struct { } // NewDistribution creates a new distribution instance with all field initialized. -func NewDistribution(idfOracle IDFOracle) *distribution { +func NewDistribution() *distribution { dist := &distribution{ serviceable: atomic.NewBool(false), growingSegments: make(map[UniqueID]SegmentEntry), @@ -100,13 +100,18 @@ func NewDistribution(idfOracle IDFOracle) *distribution { current: atomic.NewPointer[snapshot](nil), offlines: typeutil.NewSet[int64](), targetVersion: atomic.NewInt64(initialTargetVersion), - idfOracle: idfOracle, } dist.genSnapshot() return dist } +func (d *distribution) SetIDFOracle(idfOracle IDFOracle) { + d.mut.Lock() + defer d.mut.Unlock() + d.idfOracle = idfOracle +} + func (d *distribution) PinReadableSegments(partitions ...int64) (sealed []SnapshotItem, growing []SegmentEntry, version int64, err error) { d.mut.RLock() defer d.mut.RUnlock() diff --git a/internal/querynodev2/delegator/distribution_test.go b/internal/querynodev2/delegator/distribution_test.go index d5fa335384..aa8c534e7d 100644 --- a/internal/querynodev2/delegator/distribution_test.go +++ b/internal/querynodev2/delegator/distribution_test.go @@ 
-21,8 +21,6 @@ import ( "time" "github.com/stretchr/testify/suite" - - "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" ) type DistributionSuite struct { @@ -31,7 +29,7 @@ type DistributionSuite struct { } func (s *DistributionSuite) SetupTest() { - s.dist = NewDistribution(NewIDFOracle([]*schemapb.FunctionSchema{})) + s.dist = NewDistribution() s.Equal(initialTargetVersion, s.dist.getTargetVersion()) }