Check whether leader view exists (#19237)

Signed-off-by: yah01 <yang.cen@zilliz.com>

Signed-off-by: yah01 <yang.cen@zilliz.com>
pull/19292/head
yah01 2022-09-20 16:10:50 +08:00 committed by GitHub
parent 77aee8cb65
commit 539585e91b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 14 additions and 18 deletions

View File

@ -164,13 +164,13 @@ func (c *SegmentChecker) findNeedReleasedGrowingSegments(replica *meta.Replica)
ret := make(map[int64][]int64, 0) // leaderID -> segment ids
leaders := c.dist.ChannelDistManager.GetShardLeadersByReplica(replica)
for shard, leaderID := range leaders {
lview := c.dist.LeaderViewManager.GetLeaderShardView(leaderID, shard)
if lview == nil {
leaderView := c.dist.LeaderViewManager.GetLeaderShardView(leaderID, shard)
if leaderView == nil {
continue
}
// find growing segments from leaderview's sealed segments
// because growing segments should be released only after loading the compaction created segment successfully.
for sid := range lview.Segments {
for sid := range leaderView.Segments {
segment := c.targetMgr.GetSegment(sid)
if segment == nil {
continue
@ -178,8 +178,8 @@ func (c *SegmentChecker) findNeedReleasedGrowingSegments(replica *meta.Replica)
sources := append(segment.GetCompactionFrom(), segment.GetID())
for _, source := range sources {
if lview.GrowingSegments.Contain(source) {
ret[lview.ID] = append(ret[lview.ID], source)
if leaderView.GrowingSegments.Contain(source) {
ret[leaderView.ID] = append(ret[leaderView.ID], source)
}
}
}

View File

@ -120,8 +120,7 @@ func NewLoadCollectionJob(
func (job *LoadCollectionJob) PreExecute() error {
req := job.req
log := log.With(
zap.Int64("msgID", req.Base.GetMsgID()),
log := log.Ctx(job.ctx).With(
zap.Int64("collectionID", req.GetCollectionID()),
)
@ -153,8 +152,7 @@ func (job *LoadCollectionJob) PreExecute() error {
func (job *LoadCollectionJob) Execute() error {
req := job.req
log := log.With(
zap.Int64("msgID", req.GetBase().GetMsgID()),
log := log.Ctx(job.ctx).With(
zap.Int64("collectionID", req.GetCollectionID()),
)
@ -249,8 +247,7 @@ func NewReleaseCollectionJob(ctx context.Context,
func (job *ReleaseCollectionJob) Execute() error {
req := job.req
log := log.With(
zap.Int64("msgID", req.GetBase().GetMsgID()),
log := log.Ctx(job.ctx).With(
zap.Int64("collectionID", req.GetCollectionID()),
)
if !job.meta.CollectionManager.Exist(req.GetCollectionID()) {
@ -314,8 +311,7 @@ func NewLoadPartitionJob(
func (job *LoadPartitionJob) PreExecute() error {
req := job.req
log := log.With(
zap.Int64("msgID", req.Base.GetMsgID()),
log := log.Ctx(job.ctx).With(
zap.Int64("collectionID", req.GetCollectionID()),
)
@ -357,8 +353,7 @@ func (job *LoadPartitionJob) PreExecute() error {
func (job *LoadPartitionJob) Execute() error {
req := job.req
log := log.With(
zap.Int64("msgID", req.GetBase().GetMsgID()),
log := log.Ctx(job.ctx).With(
zap.Int64("collectionID", req.GetCollectionID()),
)
@ -448,6 +443,9 @@ func NewReleasePartitionJob(ctx context.Context,
}
func (job *ReleasePartitionJob) PreExecute() error {
log := log.Ctx(job.ctx).With(
zap.Int64("collectionID", job.req.GetCollectionID()),
)
if job.meta.CollectionManager.GetLoadType(job.req.GetCollectionID()) == querypb.LoadType_LoadCollection {
msg := "releasing some partitions after load collection is not supported"
log.Warn(msg)
@ -458,11 +456,9 @@ func (job *ReleasePartitionJob) PreExecute() error {
func (job *ReleasePartitionJob) Execute() error {
req := job.req
log := log.With(
zap.Int64("msgID", req.GetBase().GetMsgID()),
log := log.Ctx(job.ctx).With(
zap.Int64("collectionID", req.GetCollectionID()),
)
if !job.meta.CollectionManager.Exist(req.GetCollectionID()) {
log.Info("release collection end, the collection has not been loaded into QueryNode")
return nil