milvus/internal/querycoord/meta.go

1232 lines
40 KiB
Go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package querycoord
import (
"context"
"errors"
"fmt"
"path/filepath"
"strconv"
"strings"
"sync"
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/util"
"github.com/milvus-io/milvus/internal/util/mqclient"
)
const (
collectionMetaPrefix = "queryCoord-collectionMeta"
queryChannelMetaPrefix = "queryCoord-queryChannel"
deltaChannelMetaPrefix = "queryCoord-deltaChannel"
globalQuerySeekPositionPrefix = "queryCoord-globalQuerySeekPosition"
)
type col2SegmentInfos = map[UniqueID][]*querypb.SegmentInfo
type col2SealedSegmentChangeInfos = map[UniqueID]*querypb.SealedSegmentsChangeInfo
// Meta contains information about all loaded collections and partitions, including segment information and vchannel information
type Meta interface {
reloadFromKV() error
setKvClient(kv kv.MetaKv)
showCollections() []*querypb.CollectionInfo
hasCollection(collectionID UniqueID) bool
getCollectionInfoByID(collectionID UniqueID) (*querypb.CollectionInfo, error)
addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error
releaseCollection(collectionID UniqueID) error
addPartition(collectionID UniqueID, partitionID UniqueID) error
showPartitions(collectionID UniqueID) ([]*querypb.PartitionStates, error)
hasPartition(collectionID UniqueID, partitionID UniqueID) bool
hasReleasePartition(collectionID UniqueID, partitionID UniqueID) bool
releasePartition(collectionID UniqueID, partitionID UniqueID) error
deleteSegmentInfoByNodeID(nodeID UniqueID) error
setSegmentInfos(segmentInfos map[UniqueID]*querypb.SegmentInfo) error
showSegmentInfos(collectionID UniqueID, partitionIDs []UniqueID) []*querypb.SegmentInfo
getSegmentInfoByID(segmentID UniqueID) (*querypb.SegmentInfo, error)
getSegmentInfosByNode(nodeID int64) []*querypb.SegmentInfo
getPartitionStatesByID(collectionID UniqueID, partitionID UniqueID) (*querypb.PartitionStates, error)
getDmChannelsByNodeID(collectionID UniqueID, nodeID int64) ([]string, error)
addDmChannel(collectionID UniqueID, nodeID int64, channels []string) error
removeDmChannel(collectionID UniqueID, nodeID int64, channels []string) error
getDeltaChannelsByCollectionID(collectionID UniqueID) ([]*datapb.VchannelInfo, error)
setDeltaChannel(collectionID UniqueID, info []*datapb.VchannelInfo) error
getQueryChannelInfoByID(collectionID UniqueID) (*querypb.QueryChannelInfo, error)
getQueryStreamByID(collectionID UniqueID) (msgstream.MsgStream, error)
setLoadType(collectionID UniqueID, loadType querypb.LoadType) error
getLoadType(collectionID UniqueID) (querypb.LoadType, error)
setLoadPercentage(collectionID UniqueID, partitionID UniqueID, percentage int64, loadType querypb.LoadType) error
//printMeta()
saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2SealedSegmentChangeInfos, error)
removeGlobalSealedSegInfos(collectionID UniqueID, partitionIDs []UniqueID) (col2SealedSegmentChangeInfos, error)
sendSealedSegmentChangeInfos(collectionID UniqueID, changeInfos *querypb.SealedSegmentsChangeInfo) (*querypb.QueryChannelInfo, map[string][]mqclient.MessageID, error)
}
// MetaReplica records the current load information on all querynodes
type MetaReplica struct {
ctx context.Context
cancel context.CancelFunc
client kv.MetaKv // client of a reliable kv service, i.e. etcd client
msFactory msgstream.Factory
idAllocator func() (UniqueID, error)
//sync.RWMutex
collectionInfos map[UniqueID]*querypb.CollectionInfo
collectionMu sync.RWMutex
segmentInfos map[UniqueID]*querypb.SegmentInfo
segmentMu sync.RWMutex
queryChannelInfos map[UniqueID]*querypb.QueryChannelInfo
channelMu sync.RWMutex
deltaChannelInfos map[UniqueID][]*datapb.VchannelInfo
deltaChannelMu sync.RWMutex
queryStreams map[UniqueID]msgstream.MsgStream
streamMu sync.RWMutex
globalSeekPosition *internalpb.MsgPosition
//partitionStates map[UniqueID]*querypb.PartitionStates
}
func newMeta(ctx context.Context, kv kv.MetaKv, factory msgstream.Factory, idAllocator func() (UniqueID, error)) (Meta, error) {
childCtx, cancel := context.WithCancel(ctx)
collectionInfos := make(map[UniqueID]*querypb.CollectionInfo)
segmentInfos := make(map[UniqueID]*querypb.SegmentInfo)
queryChannelInfos := make(map[UniqueID]*querypb.QueryChannelInfo)
deltaChannelInfos := make(map[UniqueID][]*datapb.VchannelInfo)
queryMsgStream := make(map[UniqueID]msgstream.MsgStream)
position := &internalpb.MsgPosition{}
m := &MetaReplica{
ctx: childCtx,
cancel: cancel,
client: kv,
msFactory: factory,
idAllocator: idAllocator,
collectionInfos: collectionInfos,
segmentInfos: segmentInfos,
queryChannelInfos: queryChannelInfos,
deltaChannelInfos: deltaChannelInfos,
queryStreams: queryMsgStream,
globalSeekPosition: position,
}
err := m.reloadFromKV()
if err != nil {
return nil, err
}
return m, nil
}
func (m *MetaReplica) reloadFromKV() error {
log.Debug("start reload from kv")
collectionKeys, collectionValues, err := m.client.LoadWithPrefix(collectionMetaPrefix)
if err != nil {
return err
}
for index := range collectionKeys {
collectionID, err := strconv.ParseInt(filepath.Base(collectionKeys[index]), 10, 64)
if err != nil {
return err
}
collectionInfo := &querypb.CollectionInfo{}
err = proto.Unmarshal([]byte(collectionValues[index]), collectionInfo)
if err != nil {
return err
}
m.collectionInfos[collectionID] = collectionInfo
}
segmentKeys, segmentValues, err := m.client.LoadWithPrefix(util.SegmentMetaPrefix)
if err != nil {
return err
}
for index := range segmentKeys {
segmentID, err := strconv.ParseInt(filepath.Base(segmentKeys[index]), 10, 64)
if err != nil {
return err
}
segmentInfo := &querypb.SegmentInfo{}
err = proto.Unmarshal([]byte(segmentValues[index]), segmentInfo)
if err != nil {
return err
}
m.segmentInfos[segmentID] = segmentInfo
}
queryChannelKeys, queryChannelValues, err := m.client.LoadWithPrefix(queryChannelMetaPrefix)
if err != nil {
return nil
}
for index := range queryChannelKeys {
collectionID, err := strconv.ParseInt(filepath.Base(queryChannelKeys[index]), 10, 64)
if err != nil {
return err
}
queryChannelInfo := &querypb.QueryChannelInfo{}
err = proto.Unmarshal([]byte(queryChannelValues[index]), queryChannelInfo)
if err != nil {
return err
}
m.queryChannelInfos[collectionID] = queryChannelInfo
}
deltaChannelKeys, deltaChannelValues, err := m.client.LoadWithPrefix(deltaChannelMetaPrefix)
if err != nil {
return nil
}
for index, value := range deltaChannelValues {
pathStrings := strings.Split(deltaChannelKeys[index], "/")
collectionID, err := strconv.ParseInt(pathStrings[len(pathStrings)-2], 10, 64)
if err != nil {
return err
}
deltaChannelInfo := &datapb.VchannelInfo{}
err = proto.Unmarshal([]byte(value), deltaChannelInfo)
if err != nil {
return err
}
m.deltaChannelInfos[collectionID] = append(m.deltaChannelInfos[collectionID], deltaChannelInfo)
}
globalSeekPosValue, err := m.client.Load(globalQuerySeekPositionPrefix)
if err == nil {
position := &internalpb.MsgPosition{}
err = proto.Unmarshal([]byte(globalSeekPosValue), position)
if err != nil {
return err
}
m.globalSeekPosition = position
}
//TODO::update partition states
log.Debug("reload from kv finished")
return nil
}
func (m *MetaReplica) setKvClient(kv kv.MetaKv) {
m.client = kv
}
func (m *MetaReplica) showCollections() []*querypb.CollectionInfo {
m.collectionMu.RLock()
defer m.collectionMu.RUnlock()
collections := make([]*querypb.CollectionInfo, 0)
for _, info := range m.collectionInfos {
collections = append(collections, proto.Clone(info).(*querypb.CollectionInfo))
}
return collections
}
func (m *MetaReplica) showPartitions(collectionID UniqueID) ([]*querypb.PartitionStates, error) {
m.collectionMu.RLock()
defer m.collectionMu.RUnlock()
//TODO::should update after load collection
results := make([]*querypb.PartitionStates, 0)
if info, ok := m.collectionInfos[collectionID]; ok {
for _, state := range info.PartitionStates {
results = append(results, proto.Clone(state).(*querypb.PartitionStates))
}
return results, nil
}
return nil, errors.New("showPartitions: can't find collection in collectionInfos")
}
func (m *MetaReplica) hasCollection(collectionID UniqueID) bool {
m.collectionMu.RLock()
defer m.collectionMu.RUnlock()
if _, ok := m.collectionInfos[collectionID]; ok {
return true
}
return false
}
func (m *MetaReplica) hasPartition(collectionID UniqueID, partitionID UniqueID) bool {
m.collectionMu.RLock()
defer m.collectionMu.RUnlock()
if info, ok := m.collectionInfos[collectionID]; ok {
for _, id := range info.PartitionIDs {
if partitionID == id {
return true
}
}
}
return false
}
func (m *MetaReplica) hasReleasePartition(collectionID UniqueID, partitionID UniqueID) bool {
m.collectionMu.RLock()
defer m.collectionMu.RUnlock()
if info, ok := m.collectionInfos[collectionID]; ok {
for _, id := range info.ReleasedPartitionIDs {
if partitionID == id {
return true
}
}
}
return false
}
func (m *MetaReplica) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error {
hasCollection := m.hasCollection(collectionID)
if !hasCollection {
partitions := make([]UniqueID, 0)
partitionStates := make([]*querypb.PartitionStates, 0)
channels := make([]*querypb.DmChannelInfo, 0)
newCollection := &querypb.CollectionInfo{
CollectionID: collectionID,
PartitionIDs: partitions,
PartitionStates: partitionStates,
ChannelInfos: channels,
Schema: schema,
}
err := saveGlobalCollectionInfo(collectionID, newCollection, m.client)
if err != nil {
log.Error("save collectionInfo error", zap.Any("error", err.Error()), zap.Int64("collectionID", collectionID))
return err
}
m.collectionMu.Lock()
m.collectionInfos[collectionID] = newCollection
m.collectionMu.Unlock()
}
return nil
}
func (m *MetaReplica) addPartition(collectionID UniqueID, partitionID UniqueID) error {
m.collectionMu.Lock()
defer m.collectionMu.Unlock()
if info, ok := m.collectionInfos[collectionID]; ok {
col := proto.Clone(info).(*querypb.CollectionInfo)
log.Debug("add a partition to MetaReplica...", zap.Int64s("partitionIDs", col.PartitionIDs))
for _, id := range col.PartitionIDs {
if id == partitionID {
return nil
}
}
col.PartitionIDs = append(col.PartitionIDs, partitionID)
releasedPartitionIDs := make([]UniqueID, 0)
for _, id := range col.ReleasedPartitionIDs {
if id != partitionID {
releasedPartitionIDs = append(releasedPartitionIDs, id)
}
}
col.ReleasedPartitionIDs = releasedPartitionIDs
col.PartitionStates = append(col.PartitionStates, &querypb.PartitionStates{
PartitionID: partitionID,
State: querypb.PartitionState_NotPresent,
})
log.Debug("add a partition to MetaReplica", zap.Int64s("partitionIDs", col.PartitionIDs))
err := saveGlobalCollectionInfo(collectionID, col, m.client)
if err != nil {
log.Error("save collectionInfo error", zap.Any("error", err.Error()), zap.Int64("collectionID", collectionID))
return err
}
m.collectionInfos[collectionID] = col
return nil
}
return errors.New("addPartition: can't find collection when add partition")
}
func (m *MetaReplica) deleteSegmentInfoByNodeID(nodeID UniqueID) error {
m.segmentMu.Lock()
defer m.segmentMu.Unlock()
segmentIDsToRemove := make([]UniqueID, 0)
for segmentID, info := range m.segmentInfos {
if info.NodeID == nodeID {
segmentIDsToRemove = append(segmentIDsToRemove, segmentID)
}
}
err := multiRemoveSegmentInfo(segmentIDsToRemove, m.client)
if err != nil {
log.Error("remove segmentInfo from etcd error", zap.Any("error", err.Error()), zap.Int64s("segmentIDs", segmentIDsToRemove))
return err
}
for _, segmentID := range segmentIDsToRemove {
delete(m.segmentInfos, segmentID)
}
return nil
}
func (m *MetaReplica) setSegmentInfos(segmentInfos map[UniqueID]*querypb.SegmentInfo) error {
m.segmentMu.Lock()
defer m.segmentMu.Unlock()
err := multiSaveSegmentInfos(segmentInfos, m.client)
if err != nil {
log.Error("save segmentInfos error", zap.Any("segmentInfos", segmentInfos), zap.Error(err))
return err
}
for segmentID, info := range segmentInfos {
m.segmentInfos[segmentID] = info
}
return nil
}
func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2SealedSegmentChangeInfos, error) {
if len(saves) == 0 {
return nil, nil
}
// generate segment change info according segment info to updated
col2SegmentChangeInfos := make(col2SealedSegmentChangeInfos)
segmentsCompactionFrom := make([]UniqueID, 0)
// get segmentInfos to colSegmentInfos
for collectionID, onlineInfos := range saves {
segmentsChangeInfo := &querypb.SealedSegmentsChangeInfo{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SealedSegmentsChangeInfo,
},
Infos: []*querypb.SegmentChangeInfo{},
}
for _, info := range onlineInfos {
segmentID := info.SegmentID
onlineNodeID := info.NodeID
changeInfo := &querypb.SegmentChangeInfo{
OnlineNodeID: onlineNodeID,
OnlineSegments: []*querypb.SegmentInfo{info},
}
offlineInfo, err := m.getSegmentInfoByID(segmentID)
if err == nil {
offlineNodeID := offlineInfo.NodeID
// if the offline segment state is growing, it will not impact the global sealed segments
if offlineInfo.SegmentState == querypb.SegmentState_sealed {
changeInfo.OfflineNodeID = offlineNodeID
changeInfo.OfflineSegments = []*querypb.SegmentInfo{offlineInfo}
}
}
segmentsChangeInfo.Infos = append(segmentsChangeInfo.Infos, changeInfo)
// generate offline segment change info if the loaded segment is compacted from other sealed segments
for _, compactionSegmentID := range info.CompactionFrom {
compactionSegmentInfo, err := m.getSegmentInfoByID(compactionSegmentID)
if err == nil && compactionSegmentInfo.SegmentState == querypb.SegmentState_sealed {
segmentsChangeInfo.Infos = append(segmentsChangeInfo.Infos, &querypb.SegmentChangeInfo{
OfflineNodeID: compactionSegmentInfo.NodeID,
OfflineSegments: []*querypb.SegmentInfo{compactionSegmentInfo},
})
segmentsCompactionFrom = append(segmentsCompactionFrom, compactionSegmentID)
} else {
return nil, fmt.Errorf("saveGlobalSealedSegInfos: the compacted segment %d has not been loaded into memory", compactionSegmentID)
}
}
}
col2SegmentChangeInfos[collectionID] = segmentsChangeInfo
}
queryChannelInfosMap := make(map[UniqueID]*querypb.QueryChannelInfo)
var globalSeekPositionTmp *internalpb.MsgPosition
for collectionID, segmentChangeInfos := range col2SegmentChangeInfos {
// get msgStream to produce sealedSegmentChangeInfos to query channel
queryChannelInfo, messageIDInfos, err := m.sendSealedSegmentChangeInfos(collectionID, segmentChangeInfos)
if err != nil {
return nil, err
}
// len(messageIDs) == 1
messageIDs, ok := messageIDInfos[queryChannelInfo.QueryChannelID]
if !ok || len(messageIDs) == 0 {
return col2SegmentChangeInfos, errors.New("updateGlobalSealedSegmentInfos: send sealed segment change info failed")
}
if queryChannelInfo.SeekPosition == nil {
queryChannelInfo.SeekPosition = &internalpb.MsgPosition{
ChannelName: queryChannelInfo.QueryChannelID,
}
}
queryChannelInfo.SeekPosition.MsgID = messageIDs[0].Serialize()
// update segmentInfo, queryChannelInfo meta to cache and etcd
seg2Info := make(map[UniqueID]*querypb.SegmentInfo)
for _, segmentInfo := range queryChannelInfo.GlobalSealedSegments {
segmentID := segmentInfo.SegmentID
seg2Info[segmentID] = segmentInfo
}
if infos, ok := saves[collectionID]; ok {
for _, segmentInfo := range infos {
segmentID := segmentInfo.SegmentID
seg2Info[segmentID] = segmentInfo
}
}
globalSealedSegmentInfos := make([]*querypb.SegmentInfo, 0)
for _, info := range seg2Info {
globalSealedSegmentInfos = append(globalSealedSegmentInfos, info)
}
queryChannelInfo.GlobalSealedSegments = globalSealedSegmentInfos
queryChannelInfosMap[collectionID] = queryChannelInfo
globalSeekPositionTmp = queryChannelInfo.SeekPosition
}
// save segmentInfo to etcd
segmentInfoKvs := make(map[string]string)
for _, infos := range saves {
for _, info := range infos {
segmentInfoBytes, err := proto.Marshal(info)
if err != nil {
return col2SegmentChangeInfos, err
}
segmentKey := fmt.Sprintf("%s/%d", util.SegmentMetaPrefix, info.SegmentID)
segmentInfoKvs[segmentKey] = string(segmentInfoBytes)
}
}
for key, value := range segmentInfoKvs {
err := m.client.Save(key, value)
if err != nil {
panic(err)
}
}
// remove compacted segment info from etcd
for _, segmentID := range segmentsCompactionFrom {
segmentKey := fmt.Sprintf("%s/%d", util.SegmentMetaPrefix, segmentID)
err := m.client.Remove(segmentKey)
if err != nil {
panic(err)
}
}
// save queryChannelInfo and sealedSegmentsChangeInfo to etcd
saveKvs := make(map[string]string)
for collectionID, queryChannelInfo := range queryChannelInfosMap {
channelInfoBytes, err := proto.Marshal(queryChannelInfo)
if err != nil {
return col2SegmentChangeInfos, err
}
channelKey := fmt.Sprintf("%s/%d", queryChannelMetaPrefix, collectionID)
saveKvs[channelKey] = string(channelInfoBytes)
}
seekPos, err := proto.Marshal(globalSeekPositionTmp)
if err != nil {
return col2SegmentChangeInfos, err
}
saveKvs[globalQuerySeekPositionPrefix] = string(seekPos)
// save segmentChangeInfo into etcd, query node will deal the changeInfo if the msgID key exist in etcd
// avoid the produce process success but save meta to etcd failed
// then the msgID key will not exist, and changeIndo will be ignored by query node
for _, changeInfos := range col2SegmentChangeInfos {
changeInfoBytes, err := proto.Marshal(changeInfos)
if err != nil {
return col2SegmentChangeInfos, err
}
// TODO:: segmentChangeInfo clear in etcd with coord gc and queryNode watch the changeInfo meta to deal changeInfoMsg
changeInfoKey := fmt.Sprintf("%s/%d", util.ChangeInfoMetaPrefix, changeInfos.Base.MsgID)
saveKvs[changeInfoKey] = string(changeInfoBytes)
}
err = m.client.MultiSave(saveKvs)
if err != nil {
panic(err)
}
m.segmentMu.Lock()
for _, segmentInfos := range saves {
for _, info := range segmentInfos {
segmentID := info.SegmentID
m.segmentInfos[segmentID] = info
}
}
for _, segmentID := range segmentsCompactionFrom {
delete(m.segmentInfos, segmentID)
}
m.segmentMu.Unlock()
m.channelMu.Lock()
for collectionID, channelInfo := range queryChannelInfosMap {
m.queryChannelInfos[collectionID] = channelInfo
}
m.globalSeekPosition = globalSeekPositionTmp
m.channelMu.Unlock()
return col2SegmentChangeInfos, nil
}
func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitionIDs []UniqueID) (col2SealedSegmentChangeInfos, error) {
removes := m.showSegmentInfos(collectionID, partitionIDs)
if len(removes) == 0 {
return nil, nil
}
// get segmentInfos to remove
segmentChangeInfos := &querypb.SealedSegmentsChangeInfo{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SealedSegmentsChangeInfo,
},
Infos: []*querypb.SegmentChangeInfo{},
}
for _, info := range removes {
offlineNodeID := info.NodeID
changeInfo := &querypb.SegmentChangeInfo{
OfflineNodeID: offlineNodeID,
OfflineSegments: []*querypb.SegmentInfo{info},
}
segmentChangeInfos.Infos = append(segmentChangeInfos.Infos, changeInfo)
}
// get msgStream to produce sealedSegmentChangeInfos to query channel
queryChannelInfo, messageIDInfos, err := m.sendSealedSegmentChangeInfos(collectionID, segmentChangeInfos)
if err != nil {
return nil, err
}
// len(messageIDs) = 1
messageIDs, ok := messageIDInfos[queryChannelInfo.QueryChannelID]
if !ok || len(messageIDs) == 0 {
return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, errors.New("updateGlobalSealedSegmentInfos: send sealed segment change info failed")
}
if queryChannelInfo.SeekPosition == nil {
queryChannelInfo.SeekPosition = &internalpb.MsgPosition{
ChannelName: queryChannelInfo.QueryChannelID,
}
}
queryChannelInfo.SeekPosition.MsgID = messageIDs[0].Serialize()
// update segmentInfo, queryChannelInfo meta to cache and etcd
seg2Info := make(map[UniqueID]*querypb.SegmentInfo)
for _, segmentInfo := range queryChannelInfo.GlobalSealedSegments {
segmentID := segmentInfo.SegmentID
seg2Info[segmentID] = segmentInfo
}
for _, segmentInfo := range removes {
segmentID := segmentInfo.SegmentID
delete(seg2Info, segmentID)
}
globalSealedSegmentInfos := make([]*querypb.SegmentInfo, 0)
for _, info := range seg2Info {
globalSealedSegmentInfos = append(globalSealedSegmentInfos, info)
}
queryChannelInfo.GlobalSealedSegments = globalSealedSegmentInfos
// remove meta from etcd
for _, info := range removes {
segmentKey := fmt.Sprintf("%s/%d", util.SegmentMetaPrefix, info.SegmentID)
err = m.client.Remove(segmentKey)
if err != nil {
panic(err)
}
}
// save meta to etcd
saveKvs := make(map[string]string)
channelInfoBytes, err := proto.Marshal(queryChannelInfo)
if err != nil {
return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, err
}
channelKey := fmt.Sprintf("%s/%d", queryChannelMetaPrefix, collectionID)
saveKvs[channelKey] = string(channelInfoBytes)
seekPos, err := proto.Marshal(queryChannelInfo.SeekPosition)
if err != nil {
return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, err
}
saveKvs[globalQuerySeekPositionPrefix] = string(seekPos)
// save segmentChangeInfo into etcd, query node will deal the changeInfo if the msgID key exist in etcd
// avoid the produce process success but save meta to etcd failed
// then the msgID key will not exist, and changeIndo will be ignored by query node
changeInfoBytes, err := proto.Marshal(segmentChangeInfos)
if err != nil {
return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, err
}
// TODO:: segmentChangeInfo clear in etcd with coord gc and queryNode watch the changeInfo meta to deal changeInfoMsg
changeInfoKey := fmt.Sprintf("%s/%d", util.ChangeInfoMetaPrefix, segmentChangeInfos.Base.MsgID)
saveKvs[changeInfoKey] = string(changeInfoBytes)
err = m.client.MultiSave(saveKvs)
if err != nil {
panic(err)
}
m.segmentMu.Lock()
for _, info := range removes {
delete(m.segmentInfos, info.SegmentID)
}
m.segmentMu.Unlock()
m.channelMu.Lock()
m.queryChannelInfos[collectionID] = queryChannelInfo
m.globalSeekPosition = queryChannelInfo.SeekPosition
m.channelMu.Unlock()
return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, nil
}
// send sealed segment change infos into query channels
func (m *MetaReplica) sendSealedSegmentChangeInfos(collectionID UniqueID, changeInfos *querypb.SealedSegmentsChangeInfo) (*querypb.QueryChannelInfo, map[string][]mqclient.MessageID, error) {
// get msgStream to produce sealedSegmentChangeInfos to query channel
queryChannelInfo, err := m.getQueryChannelInfoByID(collectionID)
if err != nil {
log.Error("updateGlobalSealedSegmentInfos: get query channel info failed", zap.Int64("collectionID", collectionID), zap.Error(err))
return nil, nil, err
}
queryStream, err := m.getQueryStreamByID(collectionID)
if err != nil {
log.Error("updateGlobalSealedSegmentInfos: get query stream failed", zap.Int64("collectionID", collectionID), zap.Error(err))
return nil, nil, err
}
var msgPack = &msgstream.MsgPack{
Msgs: []msgstream.TsMsg{},
}
id, err := m.idAllocator()
if err != nil {
log.Error("allocator trigger taskID failed", zap.Error(err))
return nil, nil, err
}
changeInfos.Base.MsgID = id
segmentChangeMsg := &msgstream.SealedSegmentsChangeInfoMsg{
BaseMsg: msgstream.BaseMsg{
HashValues: []uint32{0},
},
SealedSegmentsChangeInfo: *changeInfos,
}
msgPack.Msgs = append(msgPack.Msgs, segmentChangeMsg)
messageIDInfos, err := queryStream.ProduceMark(msgPack)
if err != nil {
log.Error("updateGlobalSealedSegmentInfos: send sealed segment change info failed", zap.Int64("collectionID", collectionID), zap.Error(err))
return nil, nil, err
}
log.Debug("updateGlobalSealedSegmentInfos: send sealed segment change info to queryChannel", zap.Any("msgPack", msgPack))
return queryChannelInfo, messageIDInfos, nil
}
func (m *MetaReplica) showSegmentInfos(collectionID UniqueID, partitionIDs []UniqueID) []*querypb.SegmentInfo {
m.segmentMu.RLock()
defer m.segmentMu.RUnlock()
results := make([]*querypb.SegmentInfo, 0)
segmentInfos := make([]*querypb.SegmentInfo, 0)
for _, info := range m.segmentInfos {
if info.CollectionID == collectionID {
segmentInfos = append(segmentInfos, proto.Clone(info).(*querypb.SegmentInfo))
}
}
if len(partitionIDs) == 0 {
return segmentInfos
}
for _, info := range segmentInfos {
for _, partitionID := range partitionIDs {
if info.PartitionID == partitionID {
results = append(results, info)
}
}
}
return results
}
func (m *MetaReplica) getSegmentInfoByID(segmentID UniqueID) (*querypb.SegmentInfo, error) {
m.segmentMu.RLock()
defer m.segmentMu.RUnlock()
if info, ok := m.segmentInfos[segmentID]; ok {
return proto.Clone(info).(*querypb.SegmentInfo), nil
}
return nil, errors.New("getSegmentInfoByID: can't find segmentID in segmentInfos")
}
func (m *MetaReplica) getSegmentInfosByNode(nodeID int64) []*querypb.SegmentInfo {
m.segmentMu.RLock()
defer m.segmentMu.RUnlock()
segmentInfos := make([]*querypb.SegmentInfo, 0)
for _, info := range m.segmentInfos {
if info.NodeID == nodeID {
segmentInfos = append(segmentInfos, proto.Clone(info).(*querypb.SegmentInfo))
}
}
return segmentInfos
}
func (m *MetaReplica) getCollectionInfoByID(collectionID UniqueID) (*querypb.CollectionInfo, error) {
m.collectionMu.RLock()
defer m.collectionMu.RUnlock()
if info, ok := m.collectionInfos[collectionID]; ok {
return proto.Clone(info).(*querypb.CollectionInfo), nil
}
return nil, errors.New("getCollectionInfoByID: can't find collectionID in collectionInfo")
}
func (m *MetaReplica) getPartitionStatesByID(collectionID UniqueID, partitionID UniqueID) (*querypb.PartitionStates, error) {
m.collectionMu.RLock()
defer m.collectionMu.RUnlock()
if info, ok := m.collectionInfos[collectionID]; ok {
for offset, id := range info.PartitionIDs {
if id == partitionID {
return proto.Clone(info.PartitionStates[offset]).(*querypb.PartitionStates), nil
}
}
return nil, errors.New("getPartitionStateByID: can't find partitionID in partitionStates")
}
return nil, errors.New("getPartitionStateByID: can't find collectionID in collectionInfo")
}
func (m *MetaReplica) releaseCollection(collectionID UniqueID) error {
err := removeGlobalCollectionInfo(collectionID, m.client)
if err != nil {
log.Warn("remove collectionInfo from etcd failed", zap.Any("error", err.Error()), zap.Int64("collectionID", collectionID))
return err
}
m.collectionMu.Lock()
delete(m.collectionInfos, collectionID)
m.collectionMu.Unlock()
return nil
}
func (m *MetaReplica) releasePartition(collectionID UniqueID, partitionID UniqueID) error {
info, err := m.getCollectionInfoByID(collectionID)
if err == nil {
newPartitionIDs := make([]UniqueID, 0)
newPartitionStates := make([]*querypb.PartitionStates, 0)
for offset, id := range info.PartitionIDs {
if id != partitionID {
newPartitionIDs = append(newPartitionIDs, id)
newPartitionStates = append(newPartitionStates, info.PartitionStates[offset])
}
}
info.PartitionIDs = newPartitionIDs
info.PartitionStates = newPartitionStates
releasedPartitionIDs := make([]UniqueID, 0)
for _, id := range info.ReleasedPartitionIDs {
if id != partitionID {
releasedPartitionIDs = append(releasedPartitionIDs, id)
}
}
releasedPartitionIDs = append(releasedPartitionIDs, partitionID)
info.ReleasedPartitionIDs = releasedPartitionIDs
// If user loaded a collectionA, and release a partitionB which belongs to collectionA,
// and then load collectionA again, if we don't set the inMemoryPercentage to 0 when releasing
// partitionB, the second loading of collectionA would directly return because
// the inMemoryPercentage in ShowCollection response is still the old value -- 100.
// So if releasing partition, inMemoryPercentage should be set to 0.
info.InMemoryPercentage = 0
err = saveGlobalCollectionInfo(collectionID, info, m.client)
if err != nil {
log.Error("releasePartition: save collectionInfo error", zap.Any("error", err.Error()), zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID))
return err
}
m.collectionMu.Lock()
m.collectionInfos[collectionID] = info
m.collectionMu.Unlock()
return nil
}
return err
}
func (m *MetaReplica) getDmChannelsByNodeID(collectionID UniqueID, nodeID int64) ([]string, error) {
m.collectionMu.RLock()
defer m.collectionMu.RUnlock()
if info, ok := m.collectionInfos[collectionID]; ok {
channels := make([]string, 0)
for _, channelInfo := range info.ChannelInfos {
if channelInfo.NodeIDLoaded == nodeID {
channels = append(channels, channelInfo.ChannelIDs...)
}
}
return channels, nil
}
return nil, errors.New("getDmChannelsByNodeID: can't find collection in collectionInfos")
}
func (m *MetaReplica) addDmChannel(collectionID UniqueID, nodeID int64, channels []string) error {
//before add channel, should ensure toAddedChannels not in MetaReplica
info, err := m.getCollectionInfoByID(collectionID)
if err == nil {
findNodeID := false
for _, channelInfo := range info.ChannelInfos {
if channelInfo.NodeIDLoaded == nodeID {
findNodeID = true
channelInfo.ChannelIDs = append(channelInfo.ChannelIDs, channels...)
}
}
if !findNodeID {
newChannelInfo := &querypb.DmChannelInfo{
NodeIDLoaded: nodeID,
ChannelIDs: channels,
}
info.ChannelInfos = append(info.ChannelInfos, newChannelInfo)
}
err = saveGlobalCollectionInfo(collectionID, info, m.client)
if err != nil {
log.Error("save collectionInfo error", zap.Any("error", err.Error()), zap.Int64("collectionID", collectionID))
return err
}
m.collectionMu.Lock()
m.collectionInfos[collectionID] = info
m.collectionMu.Unlock()
return nil
}
return errors.New("addDmChannels: can't find collection in collectionInfos")
}
func (m *MetaReplica) removeDmChannel(collectionID UniqueID, nodeID int64, channels []string) error {
info, err := m.getCollectionInfoByID(collectionID)
if err == nil {
for _, channelInfo := range info.ChannelInfos {
if channelInfo.NodeIDLoaded == nodeID {
newChannelIDs := make([]string, 0)
for _, channelID := range channelInfo.ChannelIDs {
findChannel := false
for _, channel := range channels {
if channelID == channel {
findChannel = true
}
}
if !findChannel {
newChannelIDs = append(newChannelIDs, channelID)
}
}
channelInfo.ChannelIDs = newChannelIDs
}
}
err := saveGlobalCollectionInfo(collectionID, info, m.client)
if err != nil {
log.Error("save collectionInfo error", zap.Any("error", err.Error()), zap.Int64("collectionID", collectionID))
return err
}
m.collectionMu.Lock()
m.collectionInfos[collectionID] = info
m.collectionMu.Unlock()
return nil
}
return errors.New("addDmChannels: can't find collection in collectionInfos")
}
func createQueryChannel(collectionID UniqueID) *querypb.QueryChannelInfo {
searchPrefix := Params.SearchChannelPrefix
searchResultPrefix := Params.SearchResultChannelPrefix
allocatedQueryChannel := searchPrefix + "-" + strconv.FormatInt(collectionID, 10)
allocatedQueryResultChannel := searchResultPrefix + "-" + strconv.FormatInt(collectionID, 10)
log.Debug("query coordinator create query channel", zap.String("queryChannelName", allocatedQueryChannel), zap.String("queryResultChannelName", allocatedQueryResultChannel))
seekPosition := &internalpb.MsgPosition{
ChannelName: allocatedQueryChannel,
}
info := &querypb.QueryChannelInfo{
CollectionID: collectionID,
QueryChannelID: allocatedQueryChannel,
QueryResultChannelID: allocatedQueryResultChannel,
GlobalSealedSegments: []*querypb.SegmentInfo{},
SeekPosition: seekPosition,
}
return info
}
// Get delta channel info for collection, so far all the collection share the same query channel 0
func (m *MetaReplica) getDeltaChannelsByCollectionID(collectionID UniqueID) ([]*datapb.VchannelInfo, error) {
m.deltaChannelMu.RLock()
defer m.deltaChannelMu.RUnlock()
if infos, ok := m.deltaChannelInfos[collectionID]; ok {
return infos, nil
}
return nil, fmt.Errorf("delta channel not exist in meta")
}
func (m *MetaReplica) setDeltaChannel(collectionID UniqueID, infos []*datapb.VchannelInfo) error {
m.deltaChannelMu.Lock()
defer m.deltaChannelMu.Unlock()
_, ok := m.deltaChannelInfos[collectionID]
if ok {
log.Debug("delta channel already exist", zap.Any("collectionID", collectionID))
return nil
}
err := saveDeltaChannelInfo(collectionID, infos, m.client)
if err != nil {
log.Error("save delta channel info error", zap.Error(err))
return err
}
log.Debug("save delta channel infos to meta", zap.Any("collectionID", collectionID), zap.Any("infos", infos))
m.deltaChannelInfos[collectionID] = infos
return nil
}
// Get Query channel info for collection, so far all the collection share the same query channel 0
func (m *MetaReplica) getQueryChannelInfoByID(collectionID UniqueID) (*querypb.QueryChannelInfo, error) {
m.channelMu.Lock()
defer m.channelMu.Unlock()
if info, ok := m.queryChannelInfos[collectionID]; ok {
return proto.Clone(info).(*querypb.QueryChannelInfo), nil
}
// TODO::to remove
// all collection use the same query channel
colIDForAssignChannel := UniqueID(0)
info := createQueryChannel(colIDForAssignChannel)
err := saveQueryChannelInfo(collectionID, info, m.client)
if err != nil {
log.Error("getQueryChannel: save channel to etcd error", zap.Error(err))
return nil, err
}
// set info.collectionID from 0 to realID
info.CollectionID = collectionID
m.queryChannelInfos[collectionID] = info
info.SeekPosition = m.globalSeekPosition
if info.SeekPosition != nil {
info.SeekPosition.ChannelName = info.QueryChannelID
}
return proto.Clone(info).(*querypb.QueryChannelInfo), nil
}
func (m *MetaReplica) getQueryStreamByID(collectionID UniqueID) (msgstream.MsgStream, error) {
m.streamMu.Lock()
defer m.streamMu.Unlock()
info, err := m.getQueryChannelInfoByID(collectionID)
if err != nil {
return nil, err
}
stream, ok := m.queryStreams[collectionID]
if !ok {
stream, err = m.msFactory.NewMsgStream(m.ctx)
if err != nil {
log.Error("updateGlobalSealedSegmentInfos: create msgStream failed", zap.Error(err))
return nil, err
}
queryChannel := info.QueryChannelID
stream.AsProducer([]string{queryChannel})
m.queryStreams[collectionID] = stream
log.Debug("getQueryStreamByID: create query msgStream for collection", zap.Int64("collectionID", collectionID))
}
return stream, nil
}
func (m *MetaReplica) setLoadType(collectionID UniqueID, loadType querypb.LoadType) error {
info, err := m.getCollectionInfoByID(collectionID)
if err == nil {
info.LoadType = loadType
err := saveGlobalCollectionInfo(collectionID, info, m.client)
if err != nil {
log.Error("save collectionInfo error", zap.Any("error", err.Error()), zap.Int64("collectionID", collectionID))
return err
}
m.collectionMu.Lock()
m.collectionInfos[collectionID] = info
m.collectionMu.Unlock()
return nil
}
return errors.New("setLoadType: can't find collection in collectionInfos")
}
func (m *MetaReplica) getLoadType(collectionID UniqueID) (querypb.LoadType, error) {
m.collectionMu.RLock()
defer m.collectionMu.RUnlock()
if info, ok := m.collectionInfos[collectionID]; ok {
return info.LoadType, nil
}
return 0, errors.New("getLoadType: can't find collection in collectionInfos")
}
func (m *MetaReplica) setLoadPercentage(collectionID UniqueID, partitionID UniqueID, percentage int64, loadType querypb.LoadType) error {
info, err := m.getCollectionInfoByID(collectionID)
if err != nil {
return errors.New("setLoadPercentage: can't find collection in collectionInfos")
}
if loadType == querypb.LoadType_loadCollection {
info.InMemoryPercentage = percentage
for _, partitionState := range info.PartitionStates {
if percentage >= 100 {
partitionState.State = querypb.PartitionState_InMemory
} else {
partitionState.State = querypb.PartitionState_PartialInMemory
}
partitionState.InMemoryPercentage = percentage
}
err := saveGlobalCollectionInfo(collectionID, info, m.client)
if err != nil {
log.Error("save collectionInfo error", zap.Any("error", err.Error()), zap.Int64("collectionID", collectionID))
return err
}
} else {
findPartition := false
for _, partitionState := range info.PartitionStates {
if partitionState.PartitionID == partitionID {
findPartition = true
if percentage >= 100 {
partitionState.State = querypb.PartitionState_InMemory
} else {
partitionState.State = querypb.PartitionState_PartialInMemory
}
partitionState.InMemoryPercentage = percentage
err := saveGlobalCollectionInfo(collectionID, info, m.client)
if err != nil {
log.Error("save collectionInfo error", zap.Any("error", err.Error()), zap.Int64("collectionID", collectionID))
return err
}
}
}
if !findPartition {
return errors.New("setLoadPercentage: can't find partitionID in collectionInfos")
}
}
m.collectionMu.Lock()
m.collectionInfos[collectionID] = info
m.collectionMu.Unlock()
return nil
}
//func (m *MetaReplica) printMeta() {
// m.RLock()
// defer m.RUnlock()
// for id, info := range m.collectionInfos {
// log.Debug("query coordinator MetaReplica: collectionInfo", zap.Int64("collectionID", id), zap.Any("info", info))
// }
//
// for id, info := range m.segmentInfos {
// log.Debug("query coordinator MetaReplica: segmentInfo", zap.Int64("segmentID", id), zap.Any("info", info))
// }
//
// for id, info := range m.queryChannelInfos {
// log.Debug("query coordinator MetaReplica: queryChannelInfo", zap.Int64("collectionID", id), zap.Any("info", info))
// }
//}
func saveGlobalCollectionInfo(collectionID UniqueID, info *querypb.CollectionInfo, kv kv.MetaKv) error {
infoBytes, err := proto.Marshal(info)
if err != nil {
return err
}
key := fmt.Sprintf("%s/%d", collectionMetaPrefix, collectionID)
return kv.Save(key, string(infoBytes))
}
func removeGlobalCollectionInfo(collectionID UniqueID, kv kv.MetaKv) error {
key := fmt.Sprintf("%s/%d", collectionMetaPrefix, collectionID)
return kv.Remove(key)
}
func multiSaveSegmentInfos(segmentInfos map[UniqueID]*querypb.SegmentInfo, kv kv.MetaKv) error {
kvs := make(map[string]string)
for segmentID, info := range segmentInfos {
infoBytes, err := proto.Marshal(info)
if err != nil {
return err
}
key := fmt.Sprintf("%s/%d", util.SegmentMetaPrefix, segmentID)
kvs[key] = string(infoBytes)
}
return kv.MultiSave(kvs)
}
func multiRemoveSegmentInfo(segmentIDs []UniqueID, kv kv.MetaKv) error {
keys := make([]string, 0)
for _, segmentID := range segmentIDs {
key := fmt.Sprintf("%s/%d", util.SegmentMetaPrefix, segmentID)
keys = append(keys, key)
}
return kv.MultiRemove(keys)
}
func saveQueryChannelInfo(collectionID UniqueID, info *querypb.QueryChannelInfo, kv kv.MetaKv) error {
infoBytes, err := proto.Marshal(info)
if err != nil {
return err
}
key := fmt.Sprintf("%s/%d", queryChannelMetaPrefix, collectionID)
return kv.Save(key, string(infoBytes))
}
func saveDeltaChannelInfo(collectionID UniqueID, infos []*datapb.VchannelInfo, kv kv.MetaKv) error {
kvs := make(map[string]string)
for _, info := range infos {
infoBytes, err := proto.Marshal(info)
if err != nil {
return err
}
key := fmt.Sprintf("%s/%d/%s", deltaChannelMetaPrefix, collectionID, info.ChannelName)
kvs[key] = string(infoBytes)
}
return kv.MultiSave(kvs)
}