milvus/internal/querynode/shard_cluster_service.go

160 lines
4.8 KiB
Go

package querynode
import (
"context"
"errors"
"fmt"
"path"
"strconv"
"sync"
grpcquerynodeclient "github.com/milvus-io/milvus/internal/distributed/querynode/client"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)
// ReplicaMetaPrefix is the key segment joined onto the etcd meta root path to
// form the prefix handed to the shard node detector in addShardCluster.
const ReplicaMetaPrefix = "querycoord-replica"
// shardQueryNodeWrapper embeds a *QueryNode so that it can be handed out as a
// shardQueryNode while preventing the wrapped node from being stopped through
// that handle.
type shardQueryNodeWrapper struct {
	*QueryNode
}

// Stop is a deliberate no-op that always reports success. Because it is
// declared on the wrapper, it takes precedence over any Stop promoted from the
// embedded QueryNode, so stopping the wrapper leaves the wrapped node running.
func (w *shardQueryNodeWrapper) Stop() error {
	return nil
}
// ShardClusterService maintains the online ShardCluster(leader) in this querynode.
// It keeps one ShardCluster per vchannel name and supplies the etcd client,
// session and local node used when a cluster is created in addShardCluster.
type ShardClusterService struct {
client *clientv3.Client // etcd client for detectors
// session lists the QueryNode sessions for the node detector and provides
// this server's ID, which identifies the local node among shard members.
session *sessionutil.Session
// node is the local QueryNode; it is wrapped rather than dialed when a
// shard member's ID equals this server's ID.
node *QueryNode
clusters sync.Map // vchannel name => *ShardCluster
}
// newShardClusterService builds a ShardClusterService around the given etcd
// client, session and local query node. The cluster registry starts out empty.
func newShardClusterService(client *clientv3.Client, session *sessionutil.Session, node *QueryNode) *ShardClusterService {
	// The zero-value sync.Map inside the freshly allocated service is ready
	// to use, so only the injected dependencies are assigned.
	svc := new(ShardClusterService)
	svc.client = client
	svc.session = session
	svc.node = node
	return svc
}
// addShardCluster creates a ShardCluster for the given collection, replica and
// vchannel and stores it in the service under vchannelName.
//
// The cluster is assembled from:
//   - a node detector rooted at <meta root>/ReplicaMetaPrefix whose callback
//     maps the server ID of every QueryNode session to its address;
//   - a segment detector rooted at
//     <meta root>/<util.SegmentMetaPrefix>/<collectionID>;
//   - a node builder that wraps the local QueryNode when the node ID is this
//     server's own ID and otherwise creates a gRPC client for the address.
//
// Storing overwrites any entry already registered under vchannelName; the
// previous entry is not closed here.
func (s *ShardClusterService) addShardCluster(collectionID, replicaID int64, vchannelName string) {
	// listQueryNodes reports serverID => address for every QueryNode session.
	listQueryNodes := func() (map[int64]string, error) {
		sessions, _, err := s.session.GetSessions(typeutil.QueryNodeRole)
		if err != nil {
			return nil, err
		}
		result := make(map[int64]string)
		for _, session := range sessions {
			result[session.ServerID] = session.Address
		}
		return result, nil
	}

	// buildNode returns the handle used to talk to a shard member.
	buildNode := func(nodeID int64, addr string) shardQueryNode {
		if nodeID == s.session.ServerID {
			// wrap node itself
			return &shardQueryNodeWrapper{QueryNode: s.node}
		}
		qn, err := grpcquerynodeclient.NewClient(context.Background(), addr)
		if err != nil {
			log.Error("failed to create querynode client for shard cluster",
				zap.Int64("nodeID", nodeID),
				zap.String("addr", addr),
				zap.Error(err))
			// Return a literal nil rather than the failed client value: a nil
			// pointer stored in the interface would compare as non-nil and
			// hide the failure from nil checks.
			return nil
		}
		return qn
	}

	nodeDetector := NewEtcdShardNodeDetector(s.client,
		path.Join(Params.EtcdCfg.MetaRootPath, ReplicaMetaPrefix),
		listQueryNodes)
	segmentDetector := NewEtcdShardSegmentDetector(s.client,
		path.Join(Params.EtcdCfg.MetaRootPath, util.SegmentMetaPrefix, strconv.FormatInt(collectionID, 10)))

	cs := NewShardCluster(collectionID, replicaID, vchannelName, nodeDetector, segmentDetector, buildNode)
	s.clusters.Store(vchannelName, cs)
	log.Info("successfully add shard cluster", zap.Int64("collectionID", collectionID), zap.Int64("replica", replicaID), zap.String("vchan", vchannelName))
}
// getShardCluster looks up the ShardCluster registered for vchannelName.
// The boolean result reports whether such a cluster exists; when it is false
// the returned pointer is nil.
func (s *ShardClusterService) getShardCluster(vchannelName string) (*ShardCluster, bool) {
	entry, found := s.clusters.Load(vchannelName)
	if found {
		return entry.(*ShardCluster), true
	}
	return nil, false
}
// releaseShardCluster unregisters the ShardCluster stored under vchannelName
// and closes it. It returns an error when no cluster is registered for the
// channel, and nil otherwise.
func (s *ShardClusterService) releaseShardCluster(vchannelName string) error {
	entry, found := s.clusters.LoadAndDelete(vchannelName)
	if !found {
		return fmt.Errorf("ShardCluster of channel: %s does not exists", vchannelName)
	}
	// The entry is already out of the registry at this point; closing it is
	// the last step.
	entry.(*ShardCluster).Close()
	return nil
}
// close closes every ShardCluster currently registered in the service.
// Iteration stops at the first entry that is not a *ShardCluster; in that case
// the remaining entries are left untouched and an error is returned. Entries
// are not removed from the registry.
func (s *ShardClusterService) close() error {
	log.Debug("start to close shard cluster service")

	var interrupted bool
	s.clusters.Range(func(key, value any) bool {
		if cs, ok := value.(*ShardCluster); ok {
			cs.Close()
			return true
		}
		log.Error("convert to ShardCluster fail, close shard cluster is interrupted", zap.Any("key", key))
		interrupted = true
		return false
	})

	if interrupted {
		return errors.New("close shard cluster failed")
	}
	return nil
}
// releaseCollection releases every registered ShardCluster whose collectionID
// equals the given collectionID. Each match is removed from the service and
// closed through releaseShardCluster; a failure to release one channel is
// logged and does not stop the remaining channels from being processed.
func (s *ShardClusterService) releaseCollection(collectionID int64) {
	s.clusters.Range(func(k, v any) bool {
		cs := v.(*ShardCluster)
		if cs.collectionID != collectionID {
			return true
		}
		vchannel := k.(string)
		// releaseShardCluster fails when the entry is no longer registered;
		// record that instead of silently dropping the error.
		if err := s.releaseShardCluster(vchannel); err != nil {
			log.Warn("failed to release shard cluster",
				zap.Int64("collectionID", collectionID),
				zap.String("vchan", vchannel),
				zap.Error(err))
		}
		return true
	})
	log.Info("successfully release collection", zap.Int64("collectionID", collectionID))
}
// SyncReplicaSegments forwards the given replica segment distribution to the
// ShardCluster registered for vchannelName, passing segmentStateLoaded as the
// segment state. It returns an error when this service holds no ShardCluster
// for the channel, and nil once the distribution has been handed over.
func (s *ShardClusterService) SyncReplicaSegments(vchannelName string, distribution []*querypb.ReplicaSegmentsInfo) error {
	cluster, found := s.getShardCluster(vchannelName)
	if !found {
		localID := s.session.ServerID
		return fmt.Errorf("Leader of VChannel %s is not this QueryNode %d", vchannelName, localID)
	}

	cluster.SyncSegments(distribution, segmentStateLoaded)
	log.Info("successfully sync segments",
		zap.String("channel", vchannelName),
		zap.Any("distribution", distribution),
	)
	return nil
}
// GetShardClusters returns a slice holding every ShardCluster currently
// registered in the service, in the order Range visits them. The result is
// never nil; it is empty when no cluster is registered.
func (s *ShardClusterService) GetShardClusters() []*ShardCluster {
	collected := []*ShardCluster{}
	gather := func(_, value any) bool {
		collected = append(collected, value.(*ShardCluster))
		return true
	}
	s.clusters.Range(gather)
	return collected
}