milvus/internal/querycoordv2/job/job.go

538 lines
16 KiB
Go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package job
import (
"context"
"fmt"
"time"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
// Job models a single load or release request for a collection or for some
// of its partitions.
//
// A job is meant to be driven through three stages, in this order:
//  1. PreExecute()
//  2. Execute(), which is skipped when PreExecute() fails
//  3. PostExecute()
type Job interface {
	// MsgID returns the message ID the job was created with.
	MsgID() int64
	// CollectionID returns the ID of the collection the job works on.
	CollectionID() int64
	// Context returns the context the job was created with.
	Context() context.Context

	// PreExecute performs validation only; it must not persist anything.
	PreExecute() error
	// Execute carries out the request.
	Execute() error
	// PostExecute releases resources; it always runs, even after a failure.
	PostExecute()

	// Error returns the error recorded on the job, if any.
	Error() error
	// SetError records err as the outcome of the job.
	SetError(err error)

	// Done marks the job as finished.
	Done()
	// Wait blocks until the job is finished and returns its error.
	Wait() error
}
// BaseJob holds the state common to all jobs: the request context, the
// message and collection IDs, the recorded error and a channel that is
// closed once the job is finished. It provides no-op PreExecute and
// PostExecute so embedding types only override the stages they need.
type BaseJob struct {
	ctx          context.Context
	msgID        int64
	collectionID int64
	err          error
	doneCh       chan struct{}
}

// NewBaseJob creates a BaseJob for the given context, message ID and
// collection ID, with an open (not yet closed) done channel.
func NewBaseJob(ctx context.Context, msgID, collectionID int64) *BaseJob {
	base := new(BaseJob)
	base.ctx = ctx
	base.msgID = msgID
	base.collectionID = collectionID
	base.doneCh = make(chan struct{})
	return base
}

// MsgID returns the message ID passed to NewBaseJob.
func (b *BaseJob) MsgID() int64 {
	return b.msgID
}

// CollectionID returns the collection ID passed to NewBaseJob.
func (b *BaseJob) CollectionID() int64 {
	return b.collectionID
}

// Context returns the context passed to NewBaseJob.
func (b *BaseJob) Context() context.Context {
	return b.ctx
}

// Error returns the error last stored with SetError, or nil.
func (b *BaseJob) Error() error {
	return b.err
}

// SetError stores err as the job's result; it overwrites any earlier value.
func (b *BaseJob) SetError(err error) {
	b.err = err
}

// Done closes the done channel, which releases every caller blocked in Wait.
// It must be called at most once: closing an already closed channel panics.
func (b *BaseJob) Done() {
	close(b.doneCh)
}

// Wait blocks until Done has been called and then returns the stored error.
func (b *BaseJob) Wait() error {
	<-b.doneCh
	return b.err
}

// PreExecute is the default pre-check; it accepts every job.
func (b *BaseJob) PreExecute() error {
	return nil
}

// PostExecute is the default cleanup; it does nothing.
func (b *BaseJob) PostExecute() {}
// LoadCollectionJob is the job that handles a LoadCollectionRequest. It
// embeds BaseJob and keeps the request together with the managers, broker
// and node manager handed to its constructor.
type LoadCollectionJob struct {
	*BaseJob

	req       *querypb.LoadCollectionRequest
	dist      *meta.DistributionManager
	meta      *meta.Meta
	targetMgr *meta.TargetManager
	broker    meta.Broker
	nodeMgr   *session.NodeManager
}

// NewLoadCollectionJob builds a LoadCollectionJob for req. The embedded
// BaseJob takes its message ID and collection ID from the request.
func NewLoadCollectionJob(
	ctx context.Context,
	req *querypb.LoadCollectionRequest,
	dist *meta.DistributionManager,
	meta *meta.Meta,
	targetMgr *meta.TargetManager,
	broker meta.Broker,
	nodeMgr *session.NodeManager,
) *LoadCollectionJob {
	loadJob := &LoadCollectionJob{
		BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
	}
	loadJob.req = req
	loadJob.dist = dist
	loadJob.meta = meta
	loadJob.targetMgr = targetMgr
	loadJob.broker = broker
	loadJob.nodeMgr = nodeMgr
	return loadJob
}
// PreExecute validates the load-collection request without persisting
// anything.
//
// A non-positive replica number in the request is replaced by 1. If meta
// already knows the collection ID, the request is rejected with
// ErrLoadParameterMismatched when the existing load is not a collection-level
// load, or when its replica number or field index map differs from the
// request; otherwise ErrCollectionLoaded is returned. If the collection is
// unknown, the request is rejected with ErrNoEnoughNode when fewer nodes are
// available than replicas requested.
func (job *LoadCollectionJob) PreExecute() error {
	req := job.req
	log := log.Ctx(job.ctx).With(
		zap.Int64("collectionID", req.GetCollectionID()),
	)

	if req.GetReplicaNumber() <= 0 {
		log.Info("request doesn't indicate the number of replicas, set it to 1",
			zap.Int32("replicaNumber", req.GetReplicaNumber()))
		req.ReplicaNumber = 1
	}

	if job.meta.Exist(req.GetCollectionID()) {
		old := job.meta.GetCollection(req.GetCollectionID())
		if old == nil {
			// The ID exists in meta but there is no collection-level load
			// info, so this request asks to load the whole collection on top
			// of an earlier load that was not collection-level (presumably a
			// partition load; this mirrors the opposite check in
			// LoadPartitionJob.PreExecute).
			msg := "load the collection after load partitions is not supported"
			log.Warn(msg)
			return utils.WrapError(msg, ErrLoadParameterMismatched)
		} else if old.GetReplicaNumber() != req.GetReplicaNumber() {
			msg := fmt.Sprintf("collection with different replica number %d existed, release this collection first before changing its replica number",
				job.meta.GetReplicaNumber(req.GetCollectionID()),
			)
			log.Warn(msg)
			return utils.WrapError(msg, ErrLoadParameterMismatched)
		} else if !typeutil.MapEqual(old.GetFieldIndexID(), req.GetFieldIndexID()) {
			msg := fmt.Sprintf("collection with different index %v existed, release this collection first before changing its index",
				old.GetFieldIndexID())
			log.Warn(msg)
			return utils.WrapError(msg, ErrLoadParameterMismatched)
		}

		// Same parameters as the existing load: nothing left to do.
		return ErrCollectionLoaded
	}

	if len(job.nodeMgr.GetAll()) < int(job.req.GetReplicaNumber()) {
		msg := "no enough nodes to create replicas"
		log.Warn(msg)
		return utils.WrapError(msg, ErrNoEnoughNode)
	}

	return nil
}
// Execute performs the collection load bookkeeping: it removes any existing
// replicas of the collection, spawns the requested number of new replicas,
// asks the broker for the collection's partitions, updates the next target
// with those partitions, and finally stores the collection load info with
// status Loading. The first failing step aborts the job and its error is
// returned; on success the collection-count metric is incremented.
func (job *LoadCollectionJob) Execute() error {
	collectionID := job.req.GetCollectionID()
	logger := log.Ctx(job.ctx).With(
		zap.Int64("collectionID", collectionID),
	)

	// Start from a clean slate: drop replicas that may already be recorded.
	if err := job.meta.ReplicaManager.RemoveCollection(collectionID); err != nil {
		logger.Warn("failed to clear stale replicas", zap.Error(err))
		return err
	}

	// Spawn the replicas for this load.
	replicas, err := utils.SpawnReplicas(
		job.meta.ReplicaManager,
		job.nodeMgr,
		collectionID,
		job.req.GetReplicaNumber(),
	)
	if err != nil {
		const msg = "failed to spawn replica for collection"
		logger.Error(msg, zap.Error(err))
		return utils.WrapError(msg, err)
	}
	for i := range replicas {
		logger.Info("replica created",
			zap.Int64("replicaID", replicas[i].GetID()),
			zap.Int64s("nodes", replicas[i].GetNodes()))
	}

	// Look up the partitions of the collection through the broker.
	partitionIDs, err := job.broker.GetPartitions(job.ctx, collectionID)
	if err != nil {
		const msg = "failed to get partitions from RootCoord"
		logger.Error(msg, zap.Error(err))
		return utils.WrapError(msg, err)
	}

	// Register the next target covering all of those partitions.
	if err = job.targetMgr.UpdateCollectionNextTargetWithPartitions(collectionID, partitionIDs...); err != nil {
		const msg = "failed to update next targets for collection"
		logger.Error(msg, zap.Error(err))
		return utils.WrapError(msg, err)
	}

	// Persist the collection as "loading".
	collection := &meta.Collection{
		CollectionLoadInfo: &querypb.CollectionLoadInfo{
			CollectionID:  job.req.GetCollectionID(),
			ReplicaNumber: job.req.GetReplicaNumber(),
			Status:        querypb.LoadStatus_Loading,
			FieldIndexID:  job.req.GetFieldIndexID(),
		},
		CreatedAt: time.Now(),
		UpdatedAt: time.Now(),
	}
	if err = job.meta.CollectionManager.PutCollection(collection); err != nil {
		const msg = "failed to store collection"
		logger.Error(msg, zap.Error(err))
		return utils.WrapError(msg, err)
	}

	metrics.QueryCoordNumCollections.WithLabelValues().Inc()
	return nil
}
// PostExecute rolls back a failed load: when the job carries an error and
// the collection is not present in meta, the replicas and targets created
// for it are removed. In every other case it does nothing.
func (job *LoadCollectionJob) PostExecute() {
	if job.Error() == nil || job.meta.Exist(job.CollectionID()) {
		return
	}

	job.meta.ReplicaManager.RemoveCollection(job.CollectionID())
	job.targetMgr.RemoveCollection(job.req.GetCollectionID())
}
// ReleaseCollectionJob is the job that handles a ReleaseCollectionRequest.
// It embeds BaseJob and keeps the request together with the distribution
// manager, meta and target manager handed to its constructor.
type ReleaseCollectionJob struct {
	*BaseJob

	req       *querypb.ReleaseCollectionRequest
	dist      *meta.DistributionManager
	meta      *meta.Meta
	targetMgr *meta.TargetManager
}

// NewReleaseCollectionJob builds a ReleaseCollectionJob for req. The embedded
// BaseJob takes its message ID and collection ID from the request.
func NewReleaseCollectionJob(ctx context.Context,
	req *querypb.ReleaseCollectionRequest,
	dist *meta.DistributionManager,
	meta *meta.Meta,
	targetMgr *meta.TargetManager,
) *ReleaseCollectionJob {
	releaseJob := &ReleaseCollectionJob{
		BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
	}
	releaseJob.req = req
	releaseJob.dist = dist
	releaseJob.meta = meta
	releaseJob.targetMgr = targetMgr
	return releaseJob
}
// Execute releases the collection. If the collection manager does not know
// the collection, the call is a no-op and returns nil. Otherwise the
// collection load info is removed (a failure here aborts the job), the
// replicas are removed (a failure is only logged), the targets are dropped,
// and the call blocks in waitCollectionReleased before the collection-count
// metric is decremented.
func (job *ReleaseCollectionJob) Execute() error {
	collectionID := job.req.GetCollectionID()
	logger := log.Ctx(job.ctx).With(
		zap.Int64("collectionID", collectionID),
	)

	if !job.meta.CollectionManager.Exist(collectionID) {
		logger.Info("release collection end, the collection has not been loaded into QueryNode")
		return nil
	}

	if err := job.meta.CollectionManager.RemoveCollection(collectionID); err != nil {
		const msg = "failed to remove collection"
		logger.Warn(msg, zap.Error(err))
		return utils.WrapError(msg, err)
	}

	// Replica removal is best effort: the failure is reported but the
	// release continues.
	if err := job.meta.ReplicaManager.RemoveCollection(collectionID); err != nil {
		const msg = "failed to remove replicas"
		logger.Warn(msg, zap.Error(err))
	}

	job.targetMgr.RemoveCollection(collectionID)
	waitCollectionReleased(job.dist, collectionID)

	metrics.QueryCoordNumCollections.WithLabelValues().Dec()
	return nil
}
// LoadPartitionJob is the job that handles a LoadPartitionsRequest. It
// embeds BaseJob and keeps the request together with the managers, broker
// and node manager handed to its constructor.
type LoadPartitionJob struct {
	*BaseJob

	req       *querypb.LoadPartitionsRequest
	dist      *meta.DistributionManager
	meta      *meta.Meta
	targetMgr *meta.TargetManager
	broker    meta.Broker
	nodeMgr   *session.NodeManager
}

// NewLoadPartitionJob builds a LoadPartitionJob for req. The embedded BaseJob
// takes its message ID and collection ID from the request.
func NewLoadPartitionJob(
	ctx context.Context,
	req *querypb.LoadPartitionsRequest,
	dist *meta.DistributionManager,
	meta *meta.Meta,
	targetMgr *meta.TargetManager,
	broker meta.Broker,
	nodeMgr *session.NodeManager,
) *LoadPartitionJob {
	loadJob := &LoadPartitionJob{
		BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
	}
	loadJob.req = req
	loadJob.dist = dist
	loadJob.meta = meta
	loadJob.targetMgr = targetMgr
	loadJob.broker = broker
	loadJob.nodeMgr = nodeMgr
	return loadJob
}
// PreExecute validates the load-partitions request without persisting
// anything.
//
// A non-positive replica number in the request is replaced by 1. When meta
// does not know the collection, the only check is that enough nodes exist
// for the requested replicas (ErrNoEnoughNode otherwise). When meta knows
// the collection, the request is rejected with ErrLoadParameterMismatched
// if a collection-level load exists, if the replica number or the field
// index map differs, or if any requested partition is not found in meta;
// when all checks pass ErrCollectionLoaded is returned.
func (job *LoadPartitionJob) PreExecute() error {
	req := job.req
	collectionID := req.GetCollectionID()
	logger := log.Ctx(job.ctx).With(
		zap.Int64("collectionID", collectionID),
	)

	if req.GetReplicaNumber() <= 0 {
		logger.Info("request doesn't indicate the number of replicas, set it to 1",
			zap.Int32("replicaNumber", req.GetReplicaNumber()))
		req.ReplicaNumber = 1
	}

	// Unknown collection: only the node count has to be checked.
	if !job.meta.Exist(collectionID) {
		if len(job.nodeMgr.GetAll()) < int(req.GetReplicaNumber()) {
			const msg = "no enough nodes to create replicas"
			logger.Warn(msg)
			return utils.WrapError(msg, ErrNoEnoughNode)
		}
		return nil
	}

	// Known collection: the request must match what is already loaded.
	old := job.meta.GetCollection(collectionID)
	switch {
	case old != nil:
		const msg = "load the partition after load collection is not supported"
		logger.Warn(msg)
		return utils.WrapError(msg, ErrLoadParameterMismatched)

	case job.meta.GetReplicaNumber(collectionID) != req.GetReplicaNumber():
		const msg = "collection with different replica number existed, release this collection first before changing its replica number"
		logger.Warn(msg)
		return utils.WrapError(msg, ErrLoadParameterMismatched)

	case !typeutil.MapEqual(job.meta.GetFieldIndex(collectionID), req.GetFieldIndexID()):
		msg := fmt.Sprintf("collection with different index %v existed, release this collection first before changing its index",
			job.meta.GetFieldIndex(collectionID))
		logger.Warn(msg)
		return utils.WrapError(msg, ErrLoadParameterMismatched)
	}

	// Every requested partition must already be present in meta.
	for _, partitionID := range req.GetPartitionIDs() {
		if job.meta.GetPartition(partitionID) != nil {
			continue
		}
		msg := fmt.Sprintf("some partitions %v of collection %v has been loaded into QueryNode, please release partitions firstly",
			req.GetPartitionIDs(),
			collectionID)
		logger.Warn(msg)
		return utils.WrapError(msg, ErrLoadParameterMismatched)
	}

	return ErrCollectionLoaded
}
// Execute performs the partition load bookkeeping: it removes any existing
// replicas of the collection, spawns the requested number of new replicas,
// updates the next target with the requested partitions, and stores one
// partition load info per requested partition with status Loading. The first
// failing step aborts the job and its error is returned; on success the
// collection-count metric is incremented.
func (job *LoadPartitionJob) Execute() error {
	collectionID := job.req.GetCollectionID()
	logger := log.Ctx(job.ctx).With(
		zap.Int64("collectionID", collectionID),
		zap.Int64s("partitionIDs", job.req.GetPartitionIDs()),
	)

	// Start from a clean slate: drop replicas that may already be recorded.
	if err := job.meta.ReplicaManager.RemoveCollection(collectionID); err != nil {
		logger.Warn("failed to clear stale replicas", zap.Error(err))
		return err
	}

	// Spawn the replicas for this load.
	replicas, err := utils.SpawnReplicas(
		job.meta.ReplicaManager,
		job.nodeMgr,
		collectionID,
		job.req.GetReplicaNumber(),
	)
	if err != nil {
		const msg = "failed to spawn replica for collection"
		logger.Error(msg, zap.Error(err))
		return utils.WrapError(msg, err)
	}
	for i := range replicas {
		logger.Info("replica created",
			zap.Int64("replicaID", replicas[i].GetID()),
			zap.Int64s("nodes", replicas[i].GetNodes()))
	}

	// Register the next target covering the requested partitions.
	if err = job.targetMgr.UpdateCollectionNextTargetWithPartitions(collectionID, job.req.GetPartitionIDs()...); err != nil {
		const msg = "failed to update next targets for collection"
		logger.Error(msg,
			zap.Int64s("partitionIDs", job.req.GetPartitionIDs()),
			zap.Error(err))
		return utils.WrapError(msg, err)
	}

	// Persist every requested partition as "loading".
	toPartition := func(partitionID int64, _ int) *meta.Partition {
		loadInfo := &querypb.PartitionLoadInfo{
			CollectionID:  job.req.GetCollectionID(),
			PartitionID:   partitionID,
			ReplicaNumber: job.req.GetReplicaNumber(),
			Status:        querypb.LoadStatus_Loading,
			FieldIndexID:  job.req.GetFieldIndexID(),
		}
		return &meta.Partition{
			PartitionLoadInfo: loadInfo,
			CreatedAt:         time.Now(),
		}
	}
	partitions := lo.Map(job.req.GetPartitionIDs(), toPartition)
	if err = job.meta.CollectionManager.PutPartition(partitions...); err != nil {
		const msg = "failed to store partitions"
		logger.Error(msg, zap.Error(err))
		return utils.WrapError(msg, err)
	}

	metrics.QueryCoordNumCollections.WithLabelValues().Inc()
	return nil
}
// PostExecute rolls back a failed load: when the job carries an error and
// the collection is not present in meta, the replicas and targets created
// for it are removed. In every other case it does nothing.
func (job *LoadPartitionJob) PostExecute() {
	if job.Error() == nil || job.meta.Exist(job.CollectionID()) {
		return
	}

	job.meta.ReplicaManager.RemoveCollection(job.CollectionID())
	job.targetMgr.RemoveCollection(job.req.GetCollectionID())
}
// ReleasePartitionJob is the job that handles a ReleasePartitionsRequest.
// It embeds BaseJob and keeps the request together with the distribution
// manager, meta and target manager handed to its constructor.
type ReleasePartitionJob struct {
	*BaseJob

	req       *querypb.ReleasePartitionsRequest
	dist      *meta.DistributionManager
	meta      *meta.Meta
	targetMgr *meta.TargetManager
}

// NewReleasePartitionJob builds a ReleasePartitionJob for req. The embedded
// BaseJob takes its message ID and collection ID from the request.
func NewReleasePartitionJob(ctx context.Context,
	req *querypb.ReleasePartitionsRequest,
	dist *meta.DistributionManager,
	meta *meta.Meta,
	targetMgr *meta.TargetManager,
) *ReleasePartitionJob {
	releaseJob := &ReleasePartitionJob{
		BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
	}
	releaseJob.req = req
	releaseJob.dist = dist
	releaseJob.meta = meta
	releaseJob.targetMgr = targetMgr
	return releaseJob
}
// PreExecute rejects the request with ErrLoadParameterMismatched when the
// collection manager reports the collection's load type as LoadCollection;
// every other load type passes the check.
func (job *ReleasePartitionJob) PreExecute() error {
	collectionID := job.req.GetCollectionID()
	logger := log.Ctx(job.ctx).With(
		zap.Int64("collectionID", collectionID),
	)

	loadType := job.meta.CollectionManager.GetLoadType(collectionID)
	if loadType != querypb.LoadType_LoadCollection {
		return nil
	}

	const msg = "releasing some partitions after load collection is not supported"
	logger.Warn(msg)
	return utils.WrapError(msg, ErrLoadParameterMismatched)
}
// Execute releases the requested partitions of the collection.
//
// If the collection manager does not know the collection, the call is a
// no-op and returns nil. Otherwise only partitions that are both requested
// and currently loaded are released:
//   - when they cover every loaded partition, the whole collection is
//     removed (load info, replicas, targets) and the collection-count metric
//     is decremented;
//   - when none of the requested partitions is loaded, nothing is changed;
//   - otherwise only the matching partitions are removed from the store and
//     from the target manager, and the collection stays loaded.
//
// The metric is decremented only in the first case: the load jobs in this
// file increment it once per Execute, so decrementing it on a partial release
// would make it drift below the number of loaded collections.
//
// An error is returned only when removing the collection or the partitions
// from the store fails; a replica removal failure is logged and ignored.
func (job *ReleasePartitionJob) Execute() error {
	req := job.req
	log := log.Ctx(job.ctx).With(
		zap.Int64("collectionID", req.GetCollectionID()),
	)

	if !job.meta.CollectionManager.Exist(req.GetCollectionID()) {
		log.Info("release collection end, the collection has not been loaded into QueryNode")
		return nil
	}

	// Intersect the requested partitions with the loaded ones.
	loadedPartitions := job.meta.CollectionManager.GetPartitionsByCollection(req.GetCollectionID())
	partitionIDs := typeutil.NewUniqueSet(req.GetPartitionIDs()...)
	toRelease := make([]int64, 0, len(loadedPartitions))
	for _, partition := range loadedPartitions {
		if partitionIDs.Contain(partition.GetPartitionID()) {
			toRelease = append(toRelease, partition.GetPartitionID())
		}
	}

	switch {
	case len(toRelease) == len(loadedPartitions): // All partitions are released, clear all
		log.Info("release partitions covers all partitions, will remove the whole collection")
		err := job.meta.CollectionManager.RemoveCollection(req.GetCollectionID())
		if err != nil {
			msg := "failed to release partitions from store"
			log.Warn(msg, zap.Error(err))
			return utils.WrapError(msg, err)
		}
		// Replica removal is best effort: report the failure and continue.
		err = job.meta.ReplicaManager.RemoveCollection(req.GetCollectionID())
		if err != nil {
			log.Warn("failed to remove replicas", zap.Error(err))
		}
		job.targetMgr.RemoveCollection(req.GetCollectionID())
		waitCollectionReleased(job.dist, req.GetCollectionID())
		// The collection is gone as a whole, so it no longer counts as loaded.
		metrics.QueryCoordNumCollections.WithLabelValues().Dec()

	case len(toRelease) == 0:
		// Nothing requested is loaded: skip the empty store/target updates.
		// NOTE(review): this also avoids calling waitCollectionReleased with
		// no partition IDs while the collection is still loaded; presumably
		// that form waits on the whole collection - confirm against its
		// definition.
		log.Info("none of the requested partitions is loaded, nothing to release")

	default:
		err := job.meta.CollectionManager.RemovePartition(toRelease...)
		if err != nil {
			msg := "failed to release partitions from store"
			log.Warn(msg, zap.Error(err))
			return utils.WrapError(msg, err)
		}
		job.targetMgr.RemovePartition(req.GetCollectionID(), toRelease...)
		waitCollectionReleased(job.dist, req.GetCollectionID(), toRelease...)
	}

	return nil
}