mirror of https://github.com/milvus-io/milvus.git

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package meta

import (
	"context"
	"fmt"
	"math"
	"time"

	"github.com/cockroachdb/errors"
	"go.uber.org/zap"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
	"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/indexpb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
	"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/pkg/common"
	"github.com/milvus-io/milvus/pkg/log"
	"github.com/milvus-io/milvus/pkg/util/commonpbutil"
	"github.com/milvus-io/milvus/pkg/util/merr"
	"github.com/milvus-io/milvus/pkg/util/paramtable"
	"github.com/milvus-io/milvus/pkg/util/retry"
	. "github.com/milvus-io/milvus/pkg/util/typeutil"
)

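// Broker abstracts the RPCs QueryCoord sends to RootCoord and DataCoord to fetch
// collection, partition, segment, index and recovery metadata.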
type Broker interface {
	DescribeCollection(ctx context.Context, collectionID UniqueID) (*milvuspb.DescribeCollectionResponse, error)
	GetPartitions(ctx context.Context, collectionID UniqueID) ([]UniqueID, error)
	GetRecoveryInfo(ctx context.Context, collectionID UniqueID, partitionID UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentBinlogs, error)
	ListIndexes(ctx context.Context, collectionID UniqueID) ([]*indexpb.IndexInfo, error)
	GetSegmentInfo(ctx context.Context, segmentID ...UniqueID) ([]*datapb.SegmentInfo, error)
	GetIndexInfo(ctx context.Context, collectionID UniqueID, segmentIDs ...UniqueID) (map[int64][]*querypb.FieldIndexInfo, error)
	GetRecoveryInfoV2(ctx context.Context, collectionID UniqueID, partitionIDs ...UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentInfo, error)
	DescribeDatabase(ctx context.Context, dbName string) (*rootcoordpb.DescribeDatabaseResponse, error)
	GetCollectionLoadInfo(ctx context.Context, collectionID UniqueID) ([]string, int64, error)
}

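// CoordinatorBroker is the Broker implementation backed by DataCoord and RootCoord clients.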
type CoordinatorBroker struct {
	dataCoord types.DataCoordClient
	rootCoord types.RootCoordClient
}

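// NewCoordinatorBroker creates a CoordinatorBroker backed by the given DataCoord and RootCoord clients.
//
// A minimal usage sketch, assuming dataCoordClient and rootCoordClient have already been
// constructed elsewhere and ctx / collectionID come from the caller:
//
//	broker := NewCoordinatorBroker(dataCoordClient, rootCoordClient)
//	partitions, err := broker.GetPartitions(ctx, collectionID)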
func NewCoordinatorBroker(
	dataCoord types.DataCoordClient,
	rootCoord types.RootCoordClient,
) *CoordinatorBroker {
	return &CoordinatorBroker{
		dataCoord,
		rootCoord,
	}
}

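// DescribeCollection fetches the collection schema and metadata from RootCoord by collection ID.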
func (broker *CoordinatorBroker) DescribeCollection(ctx context.Context, collectionID UniqueID) (*milvuspb.DescribeCollectionResponse, error) {
	ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
	defer cancel()

	req := &milvuspb.DescribeCollectionRequest{
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection),
		),
		// Since the database feature, do not look up a collection by name alone; always pass the collection ID.
		CollectionID: collectionID,
	}
	resp, err := broker.rootCoord.DescribeCollection(ctx, req)
	if err := merr.CheckRPCCall(resp, err); err != nil {
		log.Ctx(ctx).Warn("failed to get collection schema", zap.Error(err))
		return nil, err
	}
	return resp, nil
}

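// DescribeDatabase fetches database metadata, including database-level properties, from RootCoord.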
func (broker *CoordinatorBroker) DescribeDatabase(ctx context.Context, dbName string) (*rootcoordpb.DescribeDatabaseResponse, error) {
	ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
	defer cancel()

	req := &rootcoordpb.DescribeDatabaseRequest{
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection),
		),
		DbName: dbName,
	}
	resp, err := broker.rootCoord.DescribeDatabase(ctx, req)
	if err := merr.CheckRPCCall(resp, err); err != nil {
		log.Ctx(ctx).Warn("failed to describe database", zap.Error(err))
		return nil, err
	}
	return resp, nil
}

// GetCollectionLoadInfo resolves the replica number and resource groups to use when loading the collection.
// It checks collection-level properties first, then database-level properties, then cluster-level defaults,
// and returns (resource_groups, replica_num, error).
func (broker *CoordinatorBroker) GetCollectionLoadInfo(ctx context.Context, collectionID UniqueID) ([]string, int64, error) {
	collectionInfo, err := broker.DescribeCollection(ctx, collectionID)
	if err != nil {
		return nil, 0, err
	}

	log := log.Ctx(ctx)
	replicaNum, err := common.CollectionLevelReplicaNumber(collectionInfo.GetProperties())
	if err != nil {
		log.Debug("failed to get collection level load info", zap.Int64("collectionID", collectionID), zap.Error(err))
	} else if replicaNum > 0 {
		log.Info("get collection level load info", zap.Int64("collectionID", collectionID), zap.Int64("replica_num", replicaNum))
	}

	rgs, err := common.CollectionLevelResourceGroups(collectionInfo.GetProperties())
	if err != nil {
		log.Debug("failed to get collection level load info", zap.Int64("collectionID", collectionID), zap.Error(err))
	} else if len(rgs) > 0 {
		log.Info("get collection level load info", zap.Int64("collectionID", collectionID), zap.Strings("resource_groups", rgs))
	}

	if replicaNum <= 0 || len(rgs) == 0 {
		dbInfo, err := broker.DescribeDatabase(ctx, collectionInfo.GetDbName())
		if err != nil {
			return nil, 0, err
		}

		if replicaNum <= 0 {
			replicaNum, err = common.DatabaseLevelReplicaNumber(dbInfo.GetProperties())
			if err != nil {
				log.Debug("failed to get database level load info", zap.Int64("collectionID", collectionID), zap.Error(err))
			} else if replicaNum > 0 {
				log.Info("get database level load info", zap.Int64("collectionID", collectionID), zap.Int64("replica_num", replicaNum))
			}
		}

		if len(rgs) == 0 {
			rgs, err = common.DatabaseLevelResourceGroups(dbInfo.GetProperties())
			if err != nil {
				log.Debug("failed to get database level load info", zap.Int64("collectionID", collectionID), zap.Error(err))
			} else if len(rgs) > 0 {
				log.Info("get database level load info", zap.Int64("collectionID", collectionID), zap.Strings("resource_groups", rgs))
			}
		}
	}

	if replicaNum <= 0 || len(rgs) == 0 {
		if replicaNum <= 0 {
			replicaNum = paramtable.Get().QueryCoordCfg.ClusterLevelLoadReplicaNumber.GetAsInt64()
			if replicaNum > 0 {
				log.Info("get cluster level load info", zap.Int64("collectionID", collectionID), zap.Int64("replica_num", replicaNum))
			}
		}

		if len(rgs) == 0 {
			rgs = paramtable.Get().QueryCoordCfg.ClusterLevelLoadResourceGroups.GetAsStrings()
			if len(rgs) > 0 {
				log.Info("get cluster level load info", zap.Int64("collectionID", collectionID), zap.Strings("resource_groups", rgs))
			}
		}
	}

	return rgs, replicaNum, nil
}

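// GetPartitions returns the partition IDs of the given collection from RootCoord.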
func (broker *CoordinatorBroker) GetPartitions(ctx context.Context, collectionID UniqueID) ([]UniqueID, error) {
	ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
	defer cancel()
	log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID))
	req := &milvuspb.ShowPartitionsRequest{
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_ShowPartitions),
		),
		// Since the database feature, do not look up a collection by name alone; always pass the collection ID.
		CollectionID: collectionID,
	}
	resp, err := broker.rootCoord.ShowPartitions(ctx, req)
	if err := merr.CheckRPCCall(resp, err); err != nil {
		log.Warn("failed to get partitions", zap.Error(err))
		return nil, err
	}

	return resp.PartitionIDs, nil
}

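// GetRecoveryInfo returns the vchannel and segment binlog information needed to load a single
// partition, as reported by DataCoord.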
func (broker *CoordinatorBroker) GetRecoveryInfo(ctx context.Context, collectionID UniqueID, partitionID UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentBinlogs, error) {
	ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
	defer cancel()
	log := log.Ctx(ctx).With(
		zap.Int64("collectionID", collectionID),
		zap.Int64("partitionID", partitionID),
	)

	getRecoveryInfoRequest := &datapb.GetRecoveryInfoRequest{
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_GetRecoveryInfo),
		),
		CollectionID: collectionID,
		PartitionID:  partitionID,
	}
	recoveryInfo, err := broker.dataCoord.GetRecoveryInfo(ctx, getRecoveryInfoRequest)
	if err := merr.CheckRPCCall(recoveryInfo, err); err != nil {
		log.Warn("get recovery info failed", zap.Error(err))
		return nil, nil, err
	}

	// Fall back to the binlog log size when the reported memory size is zero.
	fallbackBinlogMemorySize := func(binlogs []*datapb.FieldBinlog) {
		for _, insertBinlogs := range binlogs {
			for _, b := range insertBinlogs.GetBinlogs() {
				if b.GetMemorySize() == 0 {
					b.MemorySize = b.GetLogSize()
				}
			}
		}
	}
	for _, segBinlogs := range recoveryInfo.GetBinlogs() {
		fallbackBinlogMemorySize(segBinlogs.GetFieldBinlogs())
		fallbackBinlogMemorySize(segBinlogs.GetStatslogs())
		fallbackBinlogMemorySize(segBinlogs.GetDeltalogs())
	}

	return recoveryInfo.Channels, recoveryInfo.Binlogs, nil
}

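// GetRecoveryInfoV2 returns vchannel and segment information for the given collection and
// partitions, as reported by DataCoord.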
func (broker *CoordinatorBroker) GetRecoveryInfoV2(ctx context.Context, collectionID UniqueID, partitionIDs ...UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentInfo, error) {
	ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
	defer cancel()
	log := log.Ctx(ctx).With(
		zap.Int64("collectionID", collectionID),
		zap.Int64s("partitionIDs", partitionIDs),
	)

	getRecoveryInfoRequest := &datapb.GetRecoveryInfoRequestV2{
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_GetRecoveryInfo),
		),
		CollectionID: collectionID,
		PartitionIDs: partitionIDs,
	}
	recoveryInfo, err := broker.dataCoord.GetRecoveryInfoV2(ctx, getRecoveryInfoRequest)

	if err := merr.CheckRPCCall(recoveryInfo, err); err != nil {
		log.Warn("get recovery info failed", zap.Error(err))
		return nil, nil, err
	}

	return recoveryInfo.Channels, recoveryInfo.Segments, nil
}

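// GetSegmentInfo fetches segment metadata (including unhealthy segments) from DataCoord in
// batches and decompresses the returned binlog paths.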
func (broker *CoordinatorBroker) GetSegmentInfo(ctx context.Context, ids ...UniqueID) ([]*datapb.SegmentInfo, error) {
	ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
	defer cancel()

	log := log.Ctx(ctx).With(
		zap.Int64s("segments", ids),
	)

	getSegmentInfo := func(ids []UniqueID) (*datapb.GetSegmentInfoResponse, error) {
		req := &datapb.GetSegmentInfoRequest{
			SegmentIDs:       ids,
			IncludeUnHealthy: true,
		}
		resp, err := broker.dataCoord.GetSegmentInfo(ctx, req)
		if err := merr.CheckRPCCall(resp, err); err != nil {
			log.Warn("failed to get segment info from DataCoord", zap.Error(err))
			return nil, err
		}

		if len(resp.Infos) == 0 {
			log.Warn("No such segment in DataCoord")
			return nil, fmt.Errorf("no such segment in DataCoord")
		}

		err = binlog.DecompressMultiBinLogs(resp.GetInfos())
		if err != nil {
			log.Warn("failed to DecompressMultiBinLogs", zap.Error(err))
			return nil, err
		}

		return resp, nil
	}

	// Query DataCoord in fixed-size batches to keep each RPC bounded.
	ret := make([]*datapb.SegmentInfo, 0, len(ids))
	batchSize := 1000
	startIdx := 0
	for startIdx < len(ids) {
		endIdx := int(math.Min(float64(startIdx+batchSize), float64(len(ids))))

		resp, err := getSegmentInfo(ids[startIdx:endIdx])
		if err != nil {
			return nil, err
		}
		ret = append(ret, resp.GetInfos()...)
		startIdx += batchSize
	}

	return ret, nil
}

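// GetIndexInfo returns the built index information of the given segments, keyed by segment ID.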
func (broker *CoordinatorBroker) GetIndexInfo(ctx context.Context, collectionID UniqueID, segmentIDs ...UniqueID) (map[int64][]*querypb.FieldIndexInfo, error) {
	ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
	defer cancel()

	log := log.Ctx(ctx).With(
		zap.Int64("collectionID", collectionID),
		zap.Int64s("segmentIDs", segmentIDs),
	)

	// During a rolling upgrade, QueryCoord may talk to a v2.2 DataCoord, which returns merr.ErrServiceUnimplemented.
	// Retry the request until the context is done; once the new DataCoord starts up, the call will succeed.
	var resp *indexpb.GetIndexInfoResponse
	var err error
	retry.Do(ctx, func() error {
		resp, err = broker.dataCoord.GetIndexInfos(ctx, &indexpb.GetIndexInfoRequest{
			CollectionID: collectionID,
			SegmentIDs:   segmentIDs,
		})

		if errors.Is(err, merr.ErrServiceUnimplemented) {
			return err
		}
		return nil
	})

	if err := merr.CheckRPCCall(resp, err); err != nil {
		log.Warn("failed to get segment index info", zap.Error(err))
		return nil, err
	}

	if resp.GetSegmentInfo() == nil {
		err = merr.WrapErrIndexNotFoundForSegments(segmentIDs)
		log.Warn("failed to get segments index info", zap.Error(err))
		return nil, err
	}

	indexes := make(map[int64][]*querypb.FieldIndexInfo, 0)
	for segmentID, segmentInfo := range resp.GetSegmentInfo() {
		indexes[segmentID] = make([]*querypb.FieldIndexInfo, 0)
		for _, info := range segmentInfo.GetIndexInfos() {
			indexes[segmentID] = append(indexes[segmentID], &querypb.FieldIndexInfo{
				FieldID:             info.GetFieldID(),
				EnableIndex:         true, // deprecated, but keep it for compatibility
				IndexName:           info.GetIndexName(),
				IndexID:             info.GetIndexID(),
				BuildID:             info.GetBuildID(),
				IndexParams:         info.GetIndexParams(),
				IndexFilePaths:      info.GetIndexFilePaths(),
				IndexSize:           int64(info.GetSerializedSize()),
				IndexVersion:        info.GetIndexVersion(),
				NumRows:             info.GetNumRows(),
				CurrentIndexVersion: info.GetCurrentIndexVersion(),
			})
		}
	}

	return indexes, nil
}

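// describeIndex fetches index metadata via the legacy DescribeIndex API; it is used as a
// fallback when DataCoord does not implement ListIndexes.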
func (broker *CoordinatorBroker) describeIndex(ctx context.Context, collectionID UniqueID) ([]*indexpb.IndexInfo, error) {
	ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
	defer cancel()

	// During a rolling upgrade, QueryCoord may talk to a v2.2 DataCoord, which returns merr.ErrServiceUnimplemented.
	// Retry the request until the context is done; once the new DataCoord starts up, the call will succeed.
	var resp *indexpb.DescribeIndexResponse
	var err error
	retry.Do(ctx, func() error {
		resp, err = broker.dataCoord.DescribeIndex(ctx, &indexpb.DescribeIndexRequest{
			CollectionID: collectionID,
		})
		if errors.Is(err, merr.ErrServiceUnimplemented) {
			return err
		}
		return nil
	})

	if err := merr.CheckRPCCall(resp, err); err != nil {
		log.Error("failed to fetch index meta",
			zap.Int64("collection", collectionID),
			zap.Error(err))
		return nil, err
	}
	return resp.GetIndexInfos(), nil
}

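// ListIndexes returns all index definitions of the collection, falling back to describeIndex
// when DataCoord is too old to implement the ListIndexes API.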
func (broker *CoordinatorBroker) ListIndexes(ctx context.Context, collectionID UniqueID) ([]*indexpb.IndexInfo, error) {
	log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID))
	ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
	defer cancel()

	resp, err := broker.dataCoord.ListIndexes(ctx, &indexpb.ListIndexesRequest{
		CollectionID: collectionID,
	})

	err = merr.CheckRPCCall(resp, err)
	if err != nil {
		if errors.Is(err, merr.ErrServiceUnimplemented) {
			log.Warn("datacoord does not implement ListIndex API fallback to DescribeIndex")
			return broker.describeIndex(ctx, collectionID)
		}
		log.Warn("failed to fetch index meta", zap.Error(err))
		return nil, err
	}

	return resp.GetIndexInfos(), nil
}