Reorganize querycoord proto (#13181)

Signed-off-by: xige-16 <xi.ge@zilliz.com>
pull/13441/head
xige-16 2021-12-15 16:53:12 +08:00 committed by GitHub
parent 5e644a780c
commit 97d5231a0a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 889 additions and 1072 deletions

View File

@ -892,8 +892,8 @@ func (m *meta) saveSegmentInfo(segment *SegmentInfo) error {
SegmentID: segment.ID,
CollectionID: segment.CollectionID,
PartitionID: segment.PartitionID,
ChannelID: segment.InsertChannel,
SegmentState: querypb.SegmentState_sealed,
DmChannel: segment.InsertChannel,
SegmentState: commonpb.SegmentState_Sealed,
CreatedByCompaction: segment.GetCreatedByCompaction(),
CompactionFrom: segment.GetCompactionFrom(),
}

View File

@ -52,7 +52,7 @@ service QueryNode {
rpc GetMetrics(milvus.GetMetricsRequest) returns (milvus.GetMetricsResponse) {}
}
//--------------------query coordinator proto------------------
//--------------------QueryCoord grpc request and response proto------------------
message ShowCollectionsRequest {
common.MsgBase base = 1;
// Not useful for now
@ -116,8 +116,8 @@ message CreateQueryChannelRequest {
message CreateQueryChannelResponse {
common.Status status = 1;
string request_channel = 2;
string result_channel = 3;
string query_channel = 2;
string query_result_channel = 3;
}
message GetPartitionStatesRequest {
@ -127,22 +127,6 @@ message GetPartitionStatesRequest {
repeated int64 partitionIDs = 4;
}
// Load state of a partition as tracked by QueryCoord.
// NOTE(review): proto3 style prefers a type-prefixed UPPER_SNAKE_CASE
// naming (e.g. PARTITION_STATE_UNSPECIFIED = 0); values are kept as-is
// because renaming would change generated identifiers and JSON names.
enum PartitionState {
// Partition is unknown to QueryCoord.
NotExist = 0;
// Partition is known but not loaded anywhere.
NotPresent = 1;
// Partition data is on disk only — presumably persisted but not in memory; confirm with callers.
OnDisk = 2;
// Partition is partially loaded into memory.
PartialInMemory = 3;
// Partition is fully loaded into memory.
InMemory = 4;
// Partition is partially loaded into GPU memory.
PartialInGPU = 5;
// Partition is fully loaded into GPU memory.
InGPU = 6;
}
// Per-partition load status reported by QueryCoord.
message PartitionStates {
// ID of the partition this status describes.
int64 partitionID = 1;
// Current load state of the partition.
PartitionState state = 2;
// Fraction of the partition loaded into memory — presumably 0-100; confirm against producers.
int64 inMemory_percentage = 3;
}
message GetPartitionStatesResponse {
common.Status status = 1;
repeated PartitionStates partition_descriptions = 2;
@ -154,36 +138,18 @@ message GetSegmentInfoRequest {
int64 collectionID = 3;
}
// Metadata about a sealed/loaded segment tracked by QueryCoord.
message SegmentInfo {
// Unique ID of the segment.
int64 segmentID = 1;
// Collection the segment belongs to.
int64 collectionID = 2;
// Partition the segment belongs to.
int64 partitionID = 3;
// Query node currently serving this segment.
int64 nodeID = 4;
// Memory footprint of the segment — presumably in bytes; confirm with producers.
int64 mem_size = 5;
// Number of rows in the segment.
int64 num_rows = 6;
// Name of the index built on this segment, if any.
string index_name = 7;
// ID of the index built on this segment, if any.
int64 indexID = 8;
// DM (insert) channel the segment was ingested from.
string channelID = 9;
SegmentState segment_state = 10; // will be deprecated in favor of `state` (field 13)
// IDs of the segments this one was compacted from, if created by compaction.
repeated int64 compactionFrom = 11;
// True when this segment is the output of a compaction.
bool createdByCompaction = 12;
// Segment state using the shared common.SegmentState enum (replaces field 10).
common.SegmentState state = 13;
// True when an index is available for this segment.
bool enable_index = 14;
// Paths to the index files for this segment.
repeated index.IndexFilePathInfo index_path_infos = 15;
}
// Response for GetSegmentInfoRequest: status plus the matching segment metadata.
message GetSegmentInfoResponse {
// RPC status of the request.
common.Status status = 1;
// Metadata for each segment matching the request.
repeated SegmentInfo infos = 2;
}
//-----------------query node proto----------------
//-----------------query node grpc request and response proto----------------
message AddQueryChannelRequest {
common.MsgBase base = 1;
int64 nodeID = 2;
int64 collectionID = 3;
string request_channelID = 4;
string result_channelID = 5;
string query_channel = 4;
string query_result_channel = 5;
repeated int64 global_sealed_segmentID = 6;
internal.MsgPosition seek_position = 7;
repeated SegmentInfo global_sealed_segments = 8;
@ -193,20 +159,10 @@ message RemoveQueryChannelRequest {
common.MsgBase base = 1;
int64 nodeID = 2;
int64 collectionID = 3;
string request_channelID = 4;
string result_channelID = 5;
string query_channel = 4;
string query_result_channel = 5;
}
//message excludedSegmentInfo {
// int64 segmentID = 1;
// internal.MsgPosition pos = 2;
//}
//message WatchDmChannelInfo {
// string channelID = 1;
// internal.MsgPosition pos = 2;
//}
message WatchDmChannelsRequest {
common.MsgBase base = 1;
int64 nodeID = 2;
@ -224,19 +180,6 @@ message WatchDeltaChannelsRequest {
repeated data.VchannelInfo infos = 4;
}
// Reason a QueryCoord task was triggered.
// NOTE(review): proto3 style calls for a *_UNSPECIFIED zero value and
// UPPER_SNAKE_CASE names; here `handoff = 0` gives business meaning to
// the default value. Kept as-is — renaming/renumbering breaks generated code.
enum TriggerCondition {
// Triggered by a segment handoff after index build.
handoff = 0;
// Triggered by automatic load balancing.
loadBalance = 1;
// Triggered directly by a client gRPC request.
grpcRequest = 2;
// Triggered by a query node going offline.
nodeDown = 3;
}
//message FieldBinlogPath {
// int64 filedID = 1;
// repeated string binlog_path = 2;
//}
//used for handoff task
message SegmentLoadInfo {
int64 segmentID = 1;
int64 partitionID = 2;
@ -257,9 +200,8 @@ message LoadSegmentsRequest {
int64 dst_nodeID = 2;
repeated SegmentLoadInfo infos = 3;
schema.CollectionSchema schema = 4;
TriggerCondition load_condition = 5; // deprecated
int64 source_nodeID = 6;
int64 collectionID = 7;
int64 source_nodeID = 5;
int64 collectionID = 6;
}
message ReleaseSegmentsRequest {
@ -272,55 +214,7 @@ message ReleaseSegmentsRequest {
repeated int64 segmentIDs = 6;
}
//----------------etcd-----------------
// Lifecycle state of a segment (querycoord-local; superseded by common.SegmentState
// elsewhere in this file).
// NOTE(review): value casing is inconsistent (`Growing` vs `sealing`); kept as-is
// because renaming changes generated identifiers and JSON names.
enum SegmentState {
// No state / unknown.
None = 0;
// Segment is accepting inserts.
Growing = 1;
// Segment no longer accepts inserts but is not yet sealed.
Frozen = 2;
// Segment is in the process of being sealed.
sealing = 3;
// Segment is sealed and immutable.
sealed = 4;
}
// Granularity of a load request.
// NOTE(review): zero value carries business meaning (LoadPartition = 0) and the
// casing is inconsistent (`loadCollection`); kept as-is for compatibility.
enum LoadType {
// Load one or more specific partitions.
LoadPartition = 0;
// Load an entire collection.
loadCollection = 1;
}
// DM (data-manipulation) channels watched by a single query node.
message DmChannelInfo {
// ID of the query node that watches these channels.
int64 nodeID_loaded = 1;
// Names of the DM channels watched by that node.
repeated string channelIDs = 2;
}
// Query-channel assignment for a collection: the request/result channel pair
// plus the global sealed-segment snapshot and the consume position.
message QueryChannelInfo {
// Collection these channels belong to.
int64 collectionID = 1;
// Channel on which search/query requests are published.
string query_channelID = 2;
// Channel on which query results are published.
string query_result_channelID = 3;
// Snapshot of all sealed segments visible on this query channel.
repeated SegmentInfo global_sealed_segments = 4;
// Position from which consumers should start reading the query channel.
internal.MsgPosition seek_position = 5;
}
// QueryCoord's view of a loaded collection: partitions, channel watches,
// load type/progress, and schema.
message CollectionInfo {
// ID of the collection.
int64 collectionID = 1;
// Partitions currently loaded for this collection.
repeated int64 partitionIDs = 2;
// Load state for each loaded partition.
repeated PartitionStates partition_states = 3;
// DM channel watch assignments, grouped by query node.
repeated DmChannelInfo channel_infos = 4;
// Whether the collection was loaded whole or partition-by-partition.
LoadType load_type = 5;
// Schema of the collection at load time.
schema.CollectionSchema schema = 6;
// Partitions that have been released since loading.
repeated int64 released_partitionIDs = 7;
// Overall in-memory load progress — presumably 0-100; confirm with producers.
int64 inMemory_percentage = 8;
}
// Progress record for moving one segment between query nodes during load balance.
message LoadBalanceSegmentInfo {
// Segment being moved.
int64 segmentID = 1;
// Partition the segment belongs to.
int64 partitionID = 2;
// Collection the segment belongs to.
int64 collectionID = 3;
// Node the segment is moving from.
int64 source_nodeID = 4;
// Node the segment is moving to.
int64 dst_nodeID = 5;
// True once the source node has released the segment.
bool source_done = 6;
// True once the destination node has loaded the segment.
bool dst_done = 7;
// True when this record is valid/actionable — semantics inferred from name; verify against users.
bool valid_info = 8;
}
//----------------request auto triggered by QueryCoord-----------------
message HandoffSegmentsRequest {
common.MsgBase base = 1;
repeated SegmentInfo segmentInfos = 2;
@ -334,7 +228,81 @@ message LoadBalanceRequest {
repeated int64 sealed_segmentIDs = 5;
}
//---------------- common query proto -----------------
//-------------------- internal meta proto------------------
// Load state of a partition as tracked by QueryCoord (internal meta).
// NOTE(review): proto3 style prefers a *_UNSPECIFIED zero value and
// type-prefixed UPPER_SNAKE_CASE names; kept as-is because renaming
// would change generated identifiers and JSON names.
enum PartitionState {
// Partition is unknown to QueryCoord.
NotExist = 0;
// Partition is known but not loaded anywhere.
NotPresent = 1;
// Partition data is on disk only — presumably persisted but not in memory; confirm with callers.
OnDisk = 2;
// Partition is partially loaded into memory.
PartialInMemory = 3;
// Partition is fully loaded into memory.
InMemory = 4;
// Partition is partially loaded into GPU memory.
PartialInGPU = 5;
// Partition is fully loaded into GPU memory.
InGPU = 6;
}
// Reason a QueryCoord task was triggered. This revision adds a proper
// "unknown" zero value so no real condition is the implicit default.
// NOTE(review): "UnKnow" is a typo for "Unknown" — renaming would change
// generated identifiers and JSON names, so it is left as-is here.
enum TriggerCondition {
// Default/unset condition; never a real trigger.
UnKnowCondition = 0;
// Triggered by a segment handoff after index build.
Handoff = 1;
// Triggered by automatic load balancing.
LoadBalance = 2;
// Triggered directly by a client gRPC request.
GrpcRequest = 3;
// Triggered by a query node going offline.
NodeDown = 4;
}
// Granularity of a load request. This revision adds an "unknown" zero value.
// NOTE(review): "UnKnowType" is a typo for "UnknownType" and `loadCollection`
// is inconsistently lower-cased vs `LoadPartition` — renaming would change
// generated identifiers and JSON names, so both are left as-is.
enum LoadType {
// Default/unset load type; never a real request.
UnKnowType = 0;
// Load one or more specific partitions.
LoadPartition = 1;
// Load an entire collection.
loadCollection = 2;
}
// Watch assignment of DM (data-manipulation) channels for a collection.
// Replaces the older DmChannelInfo; keeps the legacy repeated field during migration.
message DmChannelWatchInfo {
// Collection these channel watches belong to.
int64 collectionID = 1;
repeated string channelIDs = 2; // will be deprecated in favor of the singular dmChannel (field 3)
// Single DM channel covered by this watch record.
string dmChannel = 3;
// Query node assigned to watch the channel(s).
int64 nodeID_loaded = 4;
}
// Query-channel assignment for a collection: the request/result channel pair
// plus the global sealed-segment snapshot and the consume position.
// (Fields renamed from query_channelID/query_result_channelID in this revision;
// numbers 2 and 3 are unchanged, so the rename is wire-compatible but breaks
// JSON/text-format and generated accessors.)
message QueryChannelInfo {
// Collection these channels belong to.
int64 collectionID = 1;
// Channel on which search/query requests are published.
string query_channel = 2;
// Channel on which query results are published.
string query_result_channel = 3;
// Snapshot of all sealed segments visible on this query channel.
repeated SegmentInfo global_sealed_segments = 4;
// Position from which consumers should start reading the query channel.
internal.MsgPosition seek_position = 5;
}
// Per-partition load status reported by QueryCoord.
message PartitionStates {
// ID of the partition this status describes.
int64 partitionID = 1;
// Current load state of the partition.
PartitionState state = 2;
// Fraction of the partition loaded into memory — presumably 0-100; confirm against producers.
int64 inMemory_percentage = 3;
}
// Metadata about a sealed/loaded segment tracked by QueryCoord.
// NOTE(review): relative to the previous revision, fields 10-15 were collapsed
// to 10-14 (the old querycoord-local SegmentState at 10 was dropped and later
// fields renumbered). Reusing/renumbering field numbers is wire-incompatible
// with data serialized under the old schema — confirm that persisted etcd
// state is migrated or wiped on upgrade.
message SegmentInfo {
// Unique ID of the segment.
int64 segmentID = 1;
// Collection the segment belongs to.
int64 collectionID = 2;
// Partition the segment belongs to.
int64 partitionID = 3;
// Query node currently serving this segment.
int64 nodeID = 4;
// Memory footprint of the segment — presumably in bytes; confirm with producers.
int64 mem_size = 5;
// Number of rows in the segment.
int64 num_rows = 6;
// Name of the index built on this segment, if any.
string index_name = 7;
// ID of the index built on this segment, if any.
int64 indexID = 8;
// DM (insert) channel the segment was ingested from.
string dmChannel = 9;
// IDs of the segments this one was compacted from, if created by compaction.
repeated int64 compactionFrom = 10;
// True when this segment is the output of a compaction.
bool createdByCompaction = 11;
// Segment state using the shared common.SegmentState enum.
common.SegmentState segment_state = 12;
// True when an index is available for this segment.
bool enable_index = 13;
// Paths to the index files for this segment.
repeated index.IndexFilePathInfo index_path_infos = 14;
}
// QueryCoord's view of a loaded collection: partitions, channel watches,
// load type/progress, and schema. (channel_infos now uses DmChannelWatchInfo
// instead of the removed DmChannelInfo.)
message CollectionInfo {
// ID of the collection.
int64 collectionID = 1;
// Partitions currently loaded for this collection.
repeated int64 partitionIDs = 2;
// Load state for each loaded partition.
repeated PartitionStates partition_states = 3;
// DM channel watch assignments, grouped by query node.
repeated DmChannelWatchInfo channel_infos = 4;
// Whether the collection was loaded whole or partition-by-partition.
LoadType load_type = 5;
// Schema of the collection at load time.
schema.CollectionSchema schema = 6;
// Partitions that have been released since loading.
repeated int64 released_partitionIDs = 7;
// Overall in-memory load progress — presumably 0-100; confirm with producers.
int64 inMemory_percentage = 8;
}
//---- synchronize messages proto between QueryCoord and QueryNode -----
message SegmentChangeInfo {
int64 online_nodeID = 1;
repeated SegmentInfo online_segments = 2;

File diff suppressed because it is too large Load Diff

View File

@ -117,7 +117,7 @@ func getDqlChannelsFunc(ctx context.Context, proxyID int64, qc createQueryChanne
}
m := make(map[vChan]pChan)
m[resp.RequestChannel] = resp.RequestChannel
m[resp.QueryChannel] = resp.QueryChannel
return m, nil
}

View File

@ -3215,7 +3215,7 @@ func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQue
IndexName: info.IndexName,
IndexID: info.IndexID,
NodeID: info.NodeID,
State: info.State,
State: info.SegmentState,
}
}
resp.Status.ErrorCode = commonpb.ErrorCode_Success
@ -3460,7 +3460,7 @@ func (node *Proxy) LoadBalance(ctx context.Context, req *milvuspb.LoadBalanceReq
},
SourceNodeIDs: []int64{req.SrcNodeID},
DstNodeIDs: req.DstNodeIDs,
BalanceReason: querypb.TriggerCondition_grpcRequest,
BalanceReason: querypb.TriggerCondition_GrpcRequest,
SealedSegmentIDs: req.SealedSegmentIDs,
})
if err != nil {

View File

@ -189,8 +189,8 @@ func (node *Proxy) Init() error {
// TODO SearchResultChannelNames and RetrieveResultChannelNames should not be part in the Param table
// we should maintain a separate map for search result
Params.SearchResultChannelNames = []string{resp.ResultChannel}
Params.RetrieveResultChannelNames = []string{resp.ResultChannel}
Params.SearchResultChannelNames = []string{resp.QueryResultChannel}
Params.RetrieveResultChannelNames = []string{resp.QueryResultChannel}
log.Debug("Proxy CreateQueryChannel success", zap.Any("SearchResultChannelNames", Params.SearchResultChannelNames))
log.Debug("Proxy CreateQueryChannel success", zap.Any("RetrieveResultChannelNames", Params.RetrieveResultChannelNames))
}

View File

@ -546,10 +546,10 @@ func TestGrpcRequest(t *testing.T) {
info, err := cluster.clusterMeta.getQueryChannelInfoByID(defaultCollectionID)
assert.Nil(t, err)
addQueryChannelReq := &querypb.AddQueryChannelRequest{
NodeID: nodeID,
CollectionID: defaultCollectionID,
RequestChannelID: info.QueryChannelID,
ResultChannelID: info.QueryResultChannelID,
NodeID: nodeID,
CollectionID: defaultCollectionID,
QueryChannel: info.QueryChannel,
QueryResultChannel: info.QueryResultChannel,
}
err = cluster.addQueryChannel(baseCtx, nodeID, addQueryChannelReq)
assert.Nil(t, err)
@ -559,10 +559,10 @@ func TestGrpcRequest(t *testing.T) {
info, err := cluster.clusterMeta.getQueryChannelInfoByID(defaultCollectionID)
assert.Nil(t, err)
removeQueryChannelReq := &querypb.RemoveQueryChannelRequest{
NodeID: nodeID,
CollectionID: defaultCollectionID,
RequestChannelID: info.QueryChannelID,
ResultChannelID: info.QueryResultChannelID,
NodeID: nodeID,
CollectionID: defaultCollectionID,
QueryChannel: info.QueryChannel,
QueryResultChannel: info.QueryResultChannel,
}
err = cluster.removeQueryChannel(baseCtx, nodeID, removeQueryChannelReq)
assert.Nil(t, err)

View File

@ -159,7 +159,7 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle
return status, nil
}
baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_grpcRequest)
baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_GrpcRequest)
loadCollectionTask := &loadCollectionTask{
baseTask: baseTask,
LoadCollectionRequest: req,
@ -212,7 +212,7 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas
return status, nil
}
baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_grpcRequest)
baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_GrpcRequest)
releaseCollectionTask := &releaseCollectionTask{
baseTask: baseTask,
ReleaseCollectionRequest: req,
@ -362,7 +362,7 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti
req.PartitionIDs = partitionIDsToLoad
}
baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_grpcRequest)
baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_GrpcRequest)
loadPartitionTask := &loadPartitionTask{
baseTask: baseTask,
LoadPartitionsRequest: req,
@ -443,7 +443,7 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
}
req.PartitionIDs = toReleasedPartitions
baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_grpcRequest)
baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_GrpcRequest)
releasePartitionTask := &releasePartitionTask{
baseTask: baseTask,
ReleasePartitionsRequest: req,
@ -497,9 +497,9 @@ func (qc *QueryCoord) CreateQueryChannel(ctx context.Context, req *querypb.Creat
}
return &querypb.CreateQueryChannelResponse{
Status: status,
RequestChannel: info.QueryChannelID,
ResultChannel: info.QueryResultChannelID,
Status: status,
QueryChannel: info.QueryChannel,
QueryResultChannel: info.QueryResultChannel,
}, nil
}
@ -599,7 +599,7 @@ func (qc *QueryCoord) LoadBalance(ctx context.Context, req *querypb.LoadBalanceR
return status, nil
}
baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_loadBalance)
baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_LoadBalance)
loadBalanceTask := &loadBalanceTask{
baseTask: baseTask,
LoadBalanceRequest: req,

View File

@ -480,14 +480,14 @@ func TestLoadBalanceTask(t *testing.T) {
SourceID: nodeID,
},
SourceNodeIDs: []int64{nodeID},
BalanceReason: querypb.TriggerCondition_nodeDown,
BalanceReason: querypb.TriggerCondition_NodeDown,
}
loadBalanceTask := &loadBalanceTask{
baseTask: &baseTask{
ctx: baseCtx,
condition: newTaskCondition(baseCtx),
triggerCondition: querypb.TriggerCondition_nodeDown,
triggerCondition: querypb.TriggerCondition_NodeDown,
},
LoadBalanceRequest: loadBalanceSegment,
rootCoord: queryCoord.rootCoordClient,

View File

@ -236,7 +236,7 @@ func (ic *IndexChecker) processHandoffAfterIndexDone() {
partitionID := segmentInfo.PartitionID
segmentID := segmentInfo.SegmentID
log.Debug("processHandoffAfterIndexDone: handoff segment start", zap.Any("segmentInfo", segmentInfo))
baseTask := newBaseTask(ic.ctx, querypb.TriggerCondition_handoff)
baseTask := newBaseTask(ic.ctx, querypb.TriggerCondition_Handoff)
handoffReq := &querypb.HandoffSegmentsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_HandoffSegments,

View File

@ -26,6 +26,7 @@ import (
"github.com/milvus-io/milvus/internal/allocator"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/tsoutil"
)
@ -42,7 +43,7 @@ func TestReloadFromKV(t *testing.T) {
SegmentID: defaultSegmentID,
CollectionID: defaultCollectionID,
PartitionID: defaultPartitionID,
SegmentState: querypb.SegmentState_sealed,
SegmentState: commonpb.SegmentState_Sealed,
}
key := fmt.Sprintf("%s/%d/%d/%d", handoffSegmentPrefix, defaultCollectionID, defaultPartitionID, defaultSegmentID)
value, err := proto.Marshal(segmentInfo)
@ -102,7 +103,7 @@ func TestCheckIndexLoop(t *testing.T) {
SegmentID: defaultSegmentID,
CollectionID: defaultCollectionID,
PartitionID: defaultPartitionID,
SegmentState: querypb.SegmentState_sealed,
SegmentState: commonpb.SegmentState_Sealed,
}
key := fmt.Sprintf("%s/%d/%d/%d", handoffSegmentPrefix, defaultCollectionID, defaultPartitionID, defaultSegmentID)
value, err := proto.Marshal(segmentInfo)
@ -180,7 +181,7 @@ func TestProcessHandoffAfterIndexDone(t *testing.T) {
SegmentID: defaultSegmentID,
CollectionID: defaultCollectionID,
PartitionID: defaultPartitionID,
SegmentState: querypb.SegmentState_sealed,
SegmentState: commonpb.SegmentState_Sealed,
}
key := fmt.Sprintf("%s/%d/%d/%d", handoffSegmentPrefix, defaultCollectionID, defaultPartitionID, defaultSegmentID)
value, err := proto.Marshal(segmentInfo)

View File

@ -313,7 +313,7 @@ func (m *MetaReplica) addCollection(collectionID UniqueID, schema *schemapb.Coll
if !hasCollection {
partitions := make([]UniqueID, 0)
partitionStates := make([]*querypb.PartitionStates, 0)
channels := make([]*querypb.DmChannelInfo, 0)
channels := make([]*querypb.DmChannelWatchInfo, 0)
newCollection := &querypb.CollectionInfo{
CollectionID: collectionID,
PartitionIDs: partitions,
@ -438,7 +438,7 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
if err == nil {
offlineNodeID := offlineInfo.NodeID
// if the offline segment state is growing, it will not impact the global sealed segments
if offlineInfo.SegmentState == querypb.SegmentState_sealed {
if offlineInfo.SegmentState == commonpb.SegmentState_Sealed {
changeInfo.OfflineNodeID = offlineNodeID
changeInfo.OfflineSegments = []*querypb.SegmentInfo{offlineInfo}
}
@ -448,7 +448,7 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
// generate offline segment change info if the loaded segment is compacted from other sealed segments
for _, compactionSegmentID := range info.CompactionFrom {
compactionSegmentInfo, err := m.getSegmentInfoByID(compactionSegmentID)
if err == nil && compactionSegmentInfo.SegmentState == querypb.SegmentState_sealed {
if err == nil && compactionSegmentInfo.SegmentState == commonpb.SegmentState_Sealed {
segmentsChangeInfo.Infos = append(segmentsChangeInfo.Infos, &querypb.SegmentChangeInfo{
OfflineNodeID: compactionSegmentInfo.NodeID,
OfflineSegments: []*querypb.SegmentInfo{compactionSegmentInfo},
@ -471,14 +471,14 @@ func (m *MetaReplica) saveGlobalSealedSegInfos(saves col2SegmentInfos) (col2Seal
return nil, err
}
// len(messageIDs) == 1
messageIDs, ok := messageIDInfos[queryChannelInfo.QueryChannelID]
messageIDs, ok := messageIDInfos[queryChannelInfo.QueryChannel]
if !ok || len(messageIDs) == 0 {
return col2SegmentChangeInfos, errors.New("updateGlobalSealedSegmentInfos: send sealed segment change info failed")
}
if queryChannelInfo.SeekPosition == nil {
queryChannelInfo.SeekPosition = &internalpb.MsgPosition{
ChannelName: queryChannelInfo.QueryChannelID,
ChannelName: queryChannelInfo.QueryChannel,
}
}
@ -618,14 +618,14 @@ func (m *MetaReplica) removeGlobalSealedSegInfos(collectionID UniqueID, partitio
return nil, err
}
// len(messageIDs) = 1
messageIDs, ok := messageIDInfos[queryChannelInfo.QueryChannelID]
messageIDs, ok := messageIDInfos[queryChannelInfo.QueryChannel]
if !ok || len(messageIDs) == 0 {
return col2SealedSegmentChangeInfos{collectionID: segmentChangeInfos}, errors.New("updateGlobalSealedSegmentInfos: send sealed segment change info failed")
}
if queryChannelInfo.SeekPosition == nil {
queryChannelInfo.SeekPosition = &internalpb.MsgPosition{
ChannelName: queryChannelInfo.QueryChannelID,
ChannelName: queryChannelInfo.QueryChannel,
}
}
queryChannelInfo.SeekPosition.MsgID = messageIDs[0].Serialize()
@ -907,7 +907,7 @@ func (m *MetaReplica) addDmChannel(collectionID UniqueID, nodeID int64, channels
}
}
if !findNodeID {
newChannelInfo := &querypb.DmChannelInfo{
newChannelInfo := &querypb.DmChannelWatchInfo{
NodeIDLoaded: nodeID,
ChannelIDs: channels,
}
@ -977,8 +977,8 @@ func createQueryChannel(collectionID UniqueID) *querypb.QueryChannelInfo {
}
info := &querypb.QueryChannelInfo{
CollectionID: collectionID,
QueryChannelID: allocatedQueryChannel,
QueryResultChannelID: allocatedQueryResultChannel,
QueryChannel: allocatedQueryChannel,
QueryResultChannel: allocatedQueryResultChannel,
GlobalSealedSegments: []*querypb.SegmentInfo{},
SeekPosition: seekPosition,
}
@ -1039,7 +1039,7 @@ func (m *MetaReplica) getQueryChannelInfoByID(collectionID UniqueID) (*querypb.Q
m.queryChannelInfos[collectionID] = info
info.SeekPosition = m.globalSeekPosition
if info.SeekPosition != nil {
info.SeekPosition.ChannelName = info.QueryChannelID
info.SeekPosition.ChannelName = info.QueryChannel
}
return proto.Clone(info).(*querypb.QueryChannelInfo), nil
}
@ -1061,7 +1061,7 @@ func (m *MetaReplica) getQueryStreamByID(collectionID UniqueID) (msgstream.MsgSt
return nil, err
}
queryChannel := info.QueryChannelID
queryChannel := info.QueryChannel
stream.AsProducer([]string{queryChannel})
m.queryStreams[collectionID] = stream
log.Debug("getQueryStreamByID: create query msgStream for collection", zap.Int64("collectionID", collectionID))

View File

@ -254,8 +254,8 @@ func TestMetaFunc(t *testing.T) {
t.Run("Test getQueryChannel", func(t *testing.T) {
info, err := meta.getQueryChannelInfoByID(defaultCollectionID)
assert.NotNil(t, info.QueryChannelID)
assert.NotNil(t, info.QueryResultChannelID)
assert.NotNil(t, info.QueryChannel)
assert.NotNil(t, info.QueryResultChannel)
assert.Nil(t, err)
})

View File

@ -213,7 +213,7 @@ func (qs *queryNodeServerMock) LoadSegments(ctx context.Context, req *querypb.Lo
PartitionID: info.PartitionID,
CollectionID: info.CollectionID,
NodeID: qs.queryNodeID,
SegmentState: querypb.SegmentState_sealed,
SegmentState: commonpb.SegmentState_Sealed,
MemSize: info.NumOfRows * int64(sizePerRecord),
NumRows: info.NumOfRows,
}

View File

@ -328,7 +328,7 @@ func (qc *QueryCoord) watchNodeLoop() {
SourceNodeIDs: offlineNodeIDs,
}
baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_nodeDown)
baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_NodeDown)
loadBalanceTask := &loadBalanceTask{
baseTask: baseTask,
LoadBalanceRequest: loadBalanceSegment,
@ -377,10 +377,10 @@ func (qc *QueryCoord) watchNodeLoop() {
SourceID: qc.session.ServerID,
},
SourceNodeIDs: []int64{serverID},
BalanceReason: querypb.TriggerCondition_nodeDown,
BalanceReason: querypb.TriggerCondition_NodeDown,
}
baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_nodeDown)
baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_NodeDown)
loadBalanceTask := &loadBalanceTask{
baseTask: baseTask,
LoadBalanceRequest: loadBalanceSegment,
@ -526,12 +526,12 @@ func (qc *QueryCoord) loadBalanceSegmentLoop() {
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadBalanceSegments,
},
BalanceReason: querypb.TriggerCondition_loadBalance,
BalanceReason: querypb.TriggerCondition_LoadBalance,
SourceNodeIDs: []UniqueID{sourceNodeID},
DstNodeIDs: []UniqueID{dstNodeID},
SealedSegmentIDs: []UniqueID{selectedSegmentInfo.SegmentID},
}
baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_loadBalance)
baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_LoadBalance)
balanceTask := &loadBalanceTask{
baseTask: baseTask,
LoadBalanceRequest: req,

View File

@ -248,7 +248,7 @@ func TestHandoffSegmentLoop(t *testing.T) {
SegmentID: defaultSegmentID,
CollectionID: defaultCollectionID,
PartitionID: defaultPartitionID,
SegmentState: querypb.SegmentState_sealed,
SegmentState: commonpb.SegmentState_Sealed,
}
key := fmt.Sprintf("%s/%d/%d/%d", handoffSegmentPrefix, defaultCollectionID, defaultPartitionID, defaultSegmentID)
@ -270,12 +270,12 @@ func TestHandoffSegmentLoop(t *testing.T) {
waitTaskFinalState(loadPartitionTask, taskExpired)
t.Run("Test partitionNotLoaded", func(t *testing.T) {
baseTask := newBaseTask(baseCtx, querypb.TriggerCondition_handoff)
baseTask := newBaseTask(baseCtx, querypb.TriggerCondition_Handoff)
segmentInfo := &querypb.SegmentInfo{
SegmentID: defaultSegmentID,
CollectionID: defaultCollectionID,
PartitionID: defaultPartitionID + 1,
SegmentState: querypb.SegmentState_sealed,
SegmentState: commonpb.SegmentState_Sealed,
}
handoffReq := &querypb.HandoffSegmentsRequest{
Base: &commonpb.MsgBase{
@ -305,13 +305,13 @@ func TestHandoffSegmentLoop(t *testing.T) {
infos := queryCoord.meta.showSegmentInfos(defaultCollectionID, nil)
assert.NotEqual(t, 0, len(infos))
segmentID := defaultSegmentID + 4
baseTask := newBaseTask(baseCtx, querypb.TriggerCondition_handoff)
baseTask := newBaseTask(baseCtx, querypb.TriggerCondition_Handoff)
segmentInfo := &querypb.SegmentInfo{
SegmentID: segmentID,
CollectionID: defaultCollectionID,
PartitionID: defaultPartitionID + 2,
SegmentState: querypb.SegmentState_sealed,
SegmentState: commonpb.SegmentState_Sealed,
}
handoffReq := &querypb.HandoffSegmentsRequest{
Base: &commonpb.MsgBase{
@ -333,12 +333,12 @@ func TestHandoffSegmentLoop(t *testing.T) {
})
t.Run("Test binlogNotExist", func(t *testing.T) {
baseTask := newBaseTask(baseCtx, querypb.TriggerCondition_handoff)
baseTask := newBaseTask(baseCtx, querypb.TriggerCondition_Handoff)
segmentInfo := &querypb.SegmentInfo{
SegmentID: defaultSegmentID + 100,
CollectionID: defaultCollectionID,
PartitionID: defaultPartitionID,
SegmentState: querypb.SegmentState_sealed,
SegmentState: commonpb.SegmentState_Sealed,
}
handoffReq := &querypb.HandoffSegmentsRequest{
Base: &commonpb.MsgBase{
@ -360,12 +360,12 @@ func TestHandoffSegmentLoop(t *testing.T) {
})
t.Run("Test sealedSegmentExist", func(t *testing.T) {
baseTask := newBaseTask(baseCtx, querypb.TriggerCondition_handoff)
baseTask := newBaseTask(baseCtx, querypb.TriggerCondition_Handoff)
segmentInfo := &querypb.SegmentInfo{
SegmentID: defaultSegmentID,
CollectionID: defaultCollectionID,
PartitionID: defaultPartitionID,
SegmentState: querypb.SegmentState_sealed,
SegmentState: commonpb.SegmentState_Sealed,
}
handoffReq := &querypb.HandoffSegmentsRequest{
Base: &commonpb.MsgBase{
@ -390,13 +390,13 @@ func TestHandoffSegmentLoop(t *testing.T) {
infos := queryCoord.meta.showSegmentInfos(defaultCollectionID, nil)
assert.NotEqual(t, 0, len(infos))
segmentID := defaultSegmentID + 5
baseTask := newBaseTask(baseCtx, querypb.TriggerCondition_handoff)
baseTask := newBaseTask(baseCtx, querypb.TriggerCondition_Handoff)
segmentInfo := &querypb.SegmentInfo{
SegmentID: segmentID,
CollectionID: defaultCollectionID,
PartitionID: defaultPartitionID + 2,
SegmentState: querypb.SegmentState_sealed,
SegmentState: commonpb.SegmentState_Sealed,
CompactionFrom: []UniqueID{defaultSegmentID, defaultSegmentID + 1},
}
handoffReq := &querypb.HandoffSegmentsRequest{
@ -429,13 +429,13 @@ func TestHandoffSegmentLoop(t *testing.T) {
infos := queryCoord.meta.showSegmentInfos(defaultCollectionID, nil)
assert.NotEqual(t, 0, len(infos))
segmentID := defaultSegmentID + 6
baseTask := newBaseTask(baseCtx, querypb.TriggerCondition_handoff)
baseTask := newBaseTask(baseCtx, querypb.TriggerCondition_Handoff)
segmentInfo := &querypb.SegmentInfo{
SegmentID: segmentID,
CollectionID: defaultCollectionID,
PartitionID: defaultPartitionID + 2,
SegmentState: querypb.SegmentState_sealed,
SegmentState: commonpb.SegmentState_Sealed,
CompactionFrom: []UniqueID{defaultSegmentID + 2, defaultSegmentID + 100},
}
handoffReq := &querypb.HandoffSegmentsRequest{
@ -470,12 +470,12 @@ func TestHandoffSegmentLoop(t *testing.T) {
waitTaskFinalState(releasePartitionTask, taskExpired)
t.Run("Test handoffReleasedPartition", func(t *testing.T) {
baseTask := newBaseTask(baseCtx, querypb.TriggerCondition_handoff)
baseTask := newBaseTask(baseCtx, querypb.TriggerCondition_Handoff)
segmentInfo := &querypb.SegmentInfo{
SegmentID: defaultSegmentID,
CollectionID: defaultCollectionID,
PartitionID: defaultPartitionID,
SegmentState: querypb.SegmentState_sealed,
SegmentState: commonpb.SegmentState_Sealed,
}
handoffReq := &querypb.HandoffSegmentsRequest{
Base: &commonpb.MsgBase{
@ -529,7 +529,7 @@ func TestLoadBalanceSegmentLoop(t *testing.T) {
PartitionIDs: []UniqueID{partitionID},
Schema: genCollectionSchema(defaultCollectionID, false),
}
baseTask := newBaseTask(baseCtx, querypb.TriggerCondition_grpcRequest)
baseTask := newBaseTask(baseCtx, querypb.TriggerCondition_GrpcRequest)
loadPartitionTask := &loadPartitionTask{
baseTask: baseTask,
LoadPartitionsRequest: req,

View File

@ -158,7 +158,7 @@ func (qn *queryNode) addCollection(collectionID UniqueID, schema *schemapb.Colle
if _, ok := qn.collectionInfos[collectionID]; !ok {
partitions := make([]UniqueID, 0)
channels := make([]*querypb.DmChannelInfo, 0)
channels := make([]*querypb.DmChannelWatchInfo, 0)
newCollection := &querypb.CollectionInfo{
CollectionID: collectionID,
PartitionIDs: partitions,
@ -282,7 +282,7 @@ func (qn *queryNode) addDmChannel(collectionID UniqueID, channels []string) erro
}
}
if !findNodeID {
newChannelInfo := &querypb.DmChannelInfo{
newChannelInfo := &querypb.DmChannelWatchInfo{
NodeIDLoaded: qn.id,
ChannelIDs: channels,
}
@ -479,9 +479,9 @@ func (qn *queryNode) addQueryChannel(ctx context.Context, in *querypb.AddQueryCh
}
queryChannelInfo := &querypb.QueryChannelInfo{
CollectionID: in.CollectionID,
QueryChannelID: in.RequestChannelID,
QueryResultChannelID: in.ResultChannelID,
CollectionID: in.CollectionID,
QueryChannel: in.QueryChannel,
QueryResultChannel: in.QueryResultChannel,
}
qn.setQueryChannelInfo(queryChannelInfo)
return nil

View File

@ -403,11 +403,10 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error {
msgBase := proto.Clone(lct.Base).(*commonpb.MsgBase)
msgBase.MsgType = commonpb.MsgType_LoadSegments
loadSegmentReq := &querypb.LoadSegmentsRequest{
Base: msgBase,
Infos: []*querypb.SegmentLoadInfo{segmentLoadInfo},
Schema: lct.Schema,
LoadCondition: querypb.TriggerCondition_grpcRequest,
CollectionID: collectionID,
Base: msgBase,
Infos: []*querypb.SegmentLoadInfo{segmentLoadInfo},
Schema: lct.Schema,
CollectionID: collectionID,
}
loadSegmentReqs = append(loadSegmentReqs, loadSegmentReq)
@ -527,7 +526,7 @@ func (lct *loadCollectionTask) rollBack(ctx context.Context) []task {
CollectionID: lct.CollectionID,
NodeID: nodeID,
}
baseTask := newBaseTask(ctx, querypb.TriggerCondition_grpcRequest)
baseTask := newBaseTask(ctx, querypb.TriggerCondition_GrpcRequest)
baseTask.setParentTask(lct)
releaseCollectionTask := &releaseCollectionTask{
baseTask: baseTask,
@ -615,7 +614,7 @@ func (rct *releaseCollectionTask) execute(ctx context.Context) error {
for nodeID := range nodes {
req := proto.Clone(rct.ReleaseCollectionRequest).(*querypb.ReleaseCollectionRequest)
req.NodeID = nodeID
baseTask := newBaseTask(ctx, querypb.TriggerCondition_grpcRequest)
baseTask := newBaseTask(ctx, querypb.TriggerCondition_GrpcRequest)
baseTask.setParentTask(rct)
releaseCollectionTask := &releaseCollectionTask{
baseTask: baseTask,
@ -772,11 +771,10 @@ func (lpt *loadPartitionTask) execute(ctx context.Context) error {
msgBase := proto.Clone(lpt.Base).(*commonpb.MsgBase)
msgBase.MsgType = commonpb.MsgType_LoadSegments
loadSegmentReq := &querypb.LoadSegmentsRequest{
Base: msgBase,
Infos: []*querypb.SegmentLoadInfo{segmentLoadInfo},
Schema: lpt.Schema,
LoadCondition: querypb.TriggerCondition_grpcRequest,
CollectionID: collectionID,
Base: msgBase,
Infos: []*querypb.SegmentLoadInfo{segmentLoadInfo},
Schema: lpt.Schema,
CollectionID: collectionID,
}
loadSegmentReqs = append(loadSegmentReqs, loadSegmentReq)
}
@ -883,7 +881,7 @@ func (lpt *loadPartitionTask) rollBack(ctx context.Context) []task {
CollectionID: lpt.CollectionID,
NodeID: nodeID,
}
baseTask := newBaseTask(ctx, querypb.TriggerCondition_grpcRequest)
baseTask := newBaseTask(ctx, querypb.TriggerCondition_GrpcRequest)
baseTask.setParentTask(lpt)
releaseCollectionTask := &releaseCollectionTask{
baseTask: baseTask,
@ -908,7 +906,7 @@ func (lpt *loadPartitionTask) rollBack(ctx context.Context) []task {
NodeID: nodeID,
}
baseTask := newBaseTask(ctx, querypb.TriggerCondition_grpcRequest)
baseTask := newBaseTask(ctx, querypb.TriggerCondition_GrpcRequest)
baseTask.setParentTask(lpt)
releasePartitionTask := &releasePartitionTask{
baseTask: baseTask,
@ -970,7 +968,7 @@ func (rpt *releasePartitionTask) execute(ctx context.Context) error {
for nodeID := range nodes {
req := proto.Clone(rpt.ReleasePartitionsRequest).(*querypb.ReleasePartitionsRequest)
req.NodeID = nodeID
baseTask := newBaseTask(ctx, querypb.TriggerCondition_grpcRequest)
baseTask := newBaseTask(ctx, querypb.TriggerCondition_GrpcRequest)
baseTask.setParentTask(rpt)
releasePartitionTask := &releasePartitionTask{
baseTask: baseTask,
@ -1104,12 +1102,11 @@ func (lst *loadSegmentTask) reschedule(ctx context.Context) ([]task, error) {
msgBase := proto.Clone(lst.Base).(*commonpb.MsgBase)
msgBase.MsgType = commonpb.MsgType_LoadSegments
req := &querypb.LoadSegmentsRequest{
Base: msgBase,
Infos: []*querypb.SegmentLoadInfo{info},
Schema: lst.Schema,
LoadCondition: lst.triggerCondition,
SourceNodeID: lst.SourceNodeID,
CollectionID: lst.CollectionID,
Base: msgBase,
Infos: []*querypb.SegmentLoadInfo{info},
Schema: lst.Schema,
SourceNodeID: lst.SourceNodeID,
CollectionID: lst.CollectionID,
}
loadSegmentReqs = append(loadSegmentReqs, req)
}
@ -1429,8 +1426,8 @@ func (wqt *watchQueryChannelTask) preExecute(context.Context) error {
wqt.setResultInfo(nil)
log.Debug("start do watchQueryChannelTask",
zap.Int64("collectionID", wqt.CollectionID),
zap.String("queryChannel", wqt.RequestChannelID),
zap.String("queryResultChannel", wqt.ResultChannelID),
zap.String("queryChannel", wqt.QueryChannel),
zap.String("queryResultChannel", wqt.QueryResultChannel),
zap.Int64("loaded nodeID", wqt.NodeID),
zap.Int64("taskID", wqt.getTaskID()))
return nil
@ -1450,8 +1447,8 @@ func (wqt *watchQueryChannelTask) execute(ctx context.Context) error {
log.Debug("watchQueryChannelTask Execute done",
zap.Int64("collectionID", wqt.CollectionID),
zap.String("queryChannel", wqt.RequestChannelID),
zap.String("queryResultChannel", wqt.ResultChannelID),
zap.String("queryChannel", wqt.QueryChannel),
zap.String("queryResultChannel", wqt.QueryResultChannel),
zap.Int64("taskID", wqt.getTaskID()))
return nil
}
@ -1459,8 +1456,8 @@ func (wqt *watchQueryChannelTask) execute(ctx context.Context) error {
func (wqt *watchQueryChannelTask) postExecute(context.Context) error {
log.Debug("watchQueryChannelTask postExecute done",
zap.Int64("collectionID", wqt.CollectionID),
zap.String("queryChannel", wqt.RequestChannelID),
zap.String("queryResultChannel", wqt.ResultChannelID),
zap.String("queryChannel", wqt.QueryChannel),
zap.String("queryResultChannel", wqt.QueryResultChannel),
zap.Int64("taskID", wqt.getTaskID()))
return nil
}
@ -1572,10 +1569,9 @@ func (ht *handoffTask) execute(ctx context.Context) error {
msgBase := proto.Clone(ht.Base).(*commonpb.MsgBase)
msgBase.MsgType = commonpb.MsgType_LoadSegments
loadSegmentReq = &querypb.LoadSegmentsRequest{
Base: msgBase,
Infos: []*querypb.SegmentLoadInfo{segmentLoadInfo},
Schema: collectionInfo.Schema,
LoadCondition: querypb.TriggerCondition_handoff,
Base: msgBase,
Infos: []*querypb.SegmentLoadInfo{segmentLoadInfo},
Schema: collectionInfo.Schema,
}
}
}
@ -1694,7 +1690,7 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error {
lbt.retryCount--
}()
if lbt.triggerCondition == querypb.TriggerCondition_nodeDown {
if lbt.triggerCondition == querypb.TriggerCondition_NodeDown {
for _, nodeID := range lbt.SourceNodeIDs {
collectionInfos := lbt.cluster.getCollectionInfosByID(lbt.ctx, nodeID)
for _, info := range collectionInfos {
@ -1754,11 +1750,10 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error {
msgBase := proto.Clone(lbt.Base).(*commonpb.MsgBase)
msgBase.MsgType = commonpb.MsgType_LoadSegments
loadSegmentReq := &querypb.LoadSegmentsRequest{
Base: msgBase,
Infos: []*querypb.SegmentLoadInfo{segmentLoadInfo},
Schema: schema,
LoadCondition: querypb.TriggerCondition_nodeDown,
SourceNodeID: nodeID,
Base: msgBase,
Infos: []*querypb.SegmentLoadInfo{segmentLoadInfo},
Schema: schema,
SourceNodeID: nodeID,
}
segmentsToLoad = append(segmentsToLoad, segmentID)
@ -1847,7 +1842,7 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error {
}
//TODO:: use request.DstNodeIDs to balance
if lbt.triggerCondition == querypb.TriggerCondition_loadBalance {
if lbt.triggerCondition == querypb.TriggerCondition_LoadBalance {
if len(lbt.SourceNodeIDs) == 0 {
err := errors.New("loadBalanceTask: empty source Node list to balance")
log.Error(err.Error())
@ -1956,10 +1951,9 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error {
msgBase := proto.Clone(lbt.Base).(*commonpb.MsgBase)
msgBase.MsgType = commonpb.MsgType_LoadSegments
loadSegmentReq := &querypb.LoadSegmentsRequest{
Base: msgBase,
Infos: []*querypb.SegmentLoadInfo{segmentLoadInfo},
Schema: collectionInfo.Schema,
LoadCondition: querypb.TriggerCondition_grpcRequest,
Base: msgBase,
Infos: []*querypb.SegmentLoadInfo{segmentLoadInfo},
Schema: collectionInfo.Schema,
}
segmentsToLoad = append(segmentsToLoad, segmentID)
@ -2015,7 +2009,7 @@ func (lbt *loadBalanceTask) postExecute(context.Context) error {
if lbt.result.ErrorCode != commonpb.ErrorCode_Success {
lbt.childTasks = []task{}
}
if lbt.triggerCondition == querypb.TriggerCondition_nodeDown {
if lbt.triggerCondition == querypb.TriggerCondition_NodeDown {
for _, id := range lbt.SourceNodeIDs {
err := lbt.cluster.removeNodeInfo(id)
if err != nil {
@ -2184,8 +2178,8 @@ func assignInternalTask(ctx context.Context,
Base: msgBase,
NodeID: nodeID,
CollectionID: collectionID,
RequestChannelID: queryChannelInfo.QueryChannelID,
ResultChannelID: queryChannelInfo.QueryResultChannelID,
QueryChannel: queryChannelInfo.QueryChannel,
QueryResultChannel: queryChannelInfo.QueryResultChannel,
GlobalSealedSegments: queryChannelInfo.GlobalSealedSegments,
SeekPosition: queryChannelInfo.SeekPosition,
}

View File

@ -266,7 +266,7 @@ func (scheduler *TaskScheduler) unmarshalTask(taskID UniqueID, t string) (task,
return nil, fmt.Errorf("failed to unmarshal message header, err %s ", err.Error())
}
var newTask task
baseTask := newBaseTask(scheduler.ctx, querypb.TriggerCondition_grpcRequest)
baseTask := newBaseTask(scheduler.ctx, querypb.TriggerCondition_GrpcRequest)
switch header.Base.MsgType {
case commonpb.MsgType_LoadCollection:
loadReq := querypb.LoadCollectionRequest{}
@ -906,7 +906,7 @@ func updateSegmentInfoFromTask(ctx context.Context, triggerTask task, meta Meta)
CollectionID: loadInfo.CollectionID,
PartitionID: loadInfo.PartitionID,
NodeID: dstNodeID,
SegmentState: querypb.SegmentState_sealed,
SegmentState: commonpb.SegmentState_Sealed,
CompactionFrom: loadInfo.CompactionFrom,
}
if _, ok := segmentInfosToSave[collectionID]; !ok {

View File

@ -178,7 +178,7 @@ func TestWatchQueryChannel_ClearEtcdInfoAfterAssignedNodeDown(t *testing.T) {
baseTask: baseTask{
ctx: baseCtx,
condition: newTaskCondition(baseCtx),
triggerCondition: querypb.TriggerCondition_grpcRequest,
triggerCondition: querypb.TriggerCondition_GrpcRequest,
},
baseMsg: &commonpb.MsgBase{
MsgType: commonpb.MsgType_WatchQueryChannels,
@ -511,7 +511,7 @@ func Test_saveInternalTaskToEtcd(t *testing.T) {
baseTask: baseTask{
ctx: ctx,
condition: newTaskCondition(ctx),
triggerCondition: querypb.TriggerCondition_grpcRequest,
triggerCondition: querypb.TriggerCondition_GrpcRequest,
taskID: 100,
},
baseMsg: &commonpb.MsgBase{

View File

@ -37,7 +37,7 @@ func genLoadCollectionTask(ctx context.Context, queryCoord *QueryCoord) *loadCol
CollectionID: defaultCollectionID,
Schema: genCollectionSchema(defaultCollectionID, false),
}
baseTask := newBaseTask(ctx, querypb.TriggerCondition_grpcRequest)
baseTask := newBaseTask(ctx, querypb.TriggerCondition_GrpcRequest)
loadCollectionTask := &loadCollectionTask{
baseTask: baseTask,
LoadCollectionRequest: req,
@ -59,7 +59,7 @@ func genLoadPartitionTask(ctx context.Context, queryCoord *QueryCoord) *loadPart
PartitionIDs: []UniqueID{defaultPartitionID},
Schema: genCollectionSchema(defaultCollectionID, false),
}
baseTask := newBaseTask(ctx, querypb.TriggerCondition_grpcRequest)
baseTask := newBaseTask(ctx, querypb.TriggerCondition_GrpcRequest)
loadPartitionTask := &loadPartitionTask{
baseTask: baseTask,
LoadPartitionsRequest: req,
@ -79,7 +79,7 @@ func genReleaseCollectionTask(ctx context.Context, queryCoord *QueryCoord) *rele
},
CollectionID: defaultCollectionID,
}
baseTask := newBaseTask(ctx, querypb.TriggerCondition_grpcRequest)
baseTask := newBaseTask(ctx, querypb.TriggerCondition_GrpcRequest)
releaseCollectionTask := &releaseCollectionTask{
baseTask: baseTask,
ReleaseCollectionRequest: req,
@ -99,7 +99,7 @@ func genReleasePartitionTask(ctx context.Context, queryCoord *QueryCoord) *relea
CollectionID: defaultCollectionID,
PartitionIDs: []UniqueID{defaultPartitionID},
}
baseTask := newBaseTask(ctx, querypb.TriggerCondition_grpcRequest)
baseTask := newBaseTask(ctx, querypb.TriggerCondition_GrpcRequest)
releasePartitionTask := &releasePartitionTask{
baseTask: baseTask,
ReleasePartitionsRequest: req,
@ -119,7 +119,7 @@ func genReleaseSegmentTask(ctx context.Context, queryCoord *QueryCoord, nodeID i
PartitionIDs: []UniqueID{defaultPartitionID},
SegmentIDs: []UniqueID{defaultSegmentID},
}
baseTask := newBaseTask(ctx, querypb.TriggerCondition_grpcRequest)
baseTask := newBaseTask(ctx, querypb.TriggerCondition_GrpcRequest)
releaseSegmentTask := &releaseSegmentTask{
baseTask: baseTask,
ReleaseSegmentsRequest: req,
@ -144,7 +144,7 @@ func genWatchDmChannelTask(ctx context.Context, queryCoord *QueryCoord, nodeID i
Schema: schema,
Infos: []*datapb.VchannelInfo{vChannelInfo},
}
baseTask := newBaseTask(ctx, querypb.TriggerCondition_grpcRequest)
baseTask := newBaseTask(ctx, querypb.TriggerCondition_GrpcRequest)
baseTask.taskID = 100
watchDmChannelTask := &watchDmChannelTask{
baseTask: baseTask,
@ -161,7 +161,7 @@ func genWatchDmChannelTask(ctx context.Context, queryCoord *QueryCoord, nodeID i
CollectionID: defaultCollectionID,
Schema: genCollectionSchema(defaultCollectionID, false),
}
baseParentTask := newBaseTask(ctx, querypb.TriggerCondition_grpcRequest)
baseParentTask := newBaseTask(ctx, querypb.TriggerCondition_GrpcRequest)
baseParentTask.taskID = 10
baseParentTask.setState(taskDone)
parentTask := &loadCollectionTask{
@ -197,7 +197,7 @@ func genLoadSegmentTask(ctx context.Context, queryCoord *QueryCoord, nodeID int6
Infos: []*querypb.SegmentLoadInfo{segmentInfo},
CollectionID: defaultCollectionID,
}
baseTask := newBaseTask(ctx, querypb.TriggerCondition_grpcRequest)
baseTask := newBaseTask(ctx, querypb.TriggerCondition_GrpcRequest)
baseTask.taskID = 100
loadSegmentTask := &loadSegmentTask{
baseTask: baseTask,
@ -214,7 +214,7 @@ func genLoadSegmentTask(ctx context.Context, queryCoord *QueryCoord, nodeID int6
CollectionID: defaultCollectionID,
Schema: genCollectionSchema(defaultCollectionID, false),
}
baseParentTask := newBaseTask(ctx, querypb.TriggerCondition_grpcRequest)
baseParentTask := newBaseTask(ctx, querypb.TriggerCondition_GrpcRequest)
baseParentTask.taskID = 10
baseParentTask.setState(taskDone)
parentTask := &loadCollectionTask{
@ -773,13 +773,13 @@ func Test_handoffSegmentFail(t *testing.T) {
infos := queryCoord.meta.showSegmentInfos(defaultCollectionID, nil)
assert.NotEqual(t, 0, len(infos))
segmentID := defaultSegmentID + 4
baseTask := newBaseTask(ctx, querypb.TriggerCondition_handoff)
baseTask := newBaseTask(ctx, querypb.TriggerCondition_Handoff)
segmentInfo := &querypb.SegmentInfo{
SegmentID: segmentID,
CollectionID: defaultCollectionID,
PartitionID: defaultPartitionID + 2,
SegmentState: querypb.SegmentState_sealed,
SegmentState: commonpb.SegmentState_Sealed,
}
handoffReq := &querypb.HandoffSegmentsRequest{
Base: &commonpb.MsgBase{
@ -827,7 +827,7 @@ func TestLoadBalanceSegmentsTask(t *testing.T) {
waitQueryNodeOnline(queryCoord.cluster, node2.queryNodeID)
t.Run("Test LoadBalanceBySegmentID", func(t *testing.T) {
baseTask := newBaseTask(ctx, querypb.TriggerCondition_loadBalance)
baseTask := newBaseTask(ctx, querypb.TriggerCondition_LoadBalance)
loadBalanceTask := &loadBalanceTask{
baseTask: baseTask,
LoadBalanceRequest: &querypb.LoadBalanceRequest{
@ -849,7 +849,7 @@ func TestLoadBalanceSegmentsTask(t *testing.T) {
})
t.Run("Test LoadBalanceByNotExistSegmentID", func(t *testing.T) {
baseTask := newBaseTask(ctx, querypb.TriggerCondition_loadBalance)
baseTask := newBaseTask(ctx, querypb.TriggerCondition_LoadBalance)
loadBalanceTask := &loadBalanceTask{
baseTask: baseTask,
LoadBalanceRequest: &querypb.LoadBalanceRequest{
@ -871,7 +871,7 @@ func TestLoadBalanceSegmentsTask(t *testing.T) {
})
t.Run("Test LoadBalanceByNode", func(t *testing.T) {
baseTask := newBaseTask(ctx, querypb.TriggerCondition_loadBalance)
baseTask := newBaseTask(ctx, querypb.TriggerCondition_LoadBalance)
loadBalanceTask := &loadBalanceTask{
baseTask: baseTask,
LoadBalanceRequest: &querypb.LoadBalanceRequest{
@ -892,7 +892,7 @@ func TestLoadBalanceSegmentsTask(t *testing.T) {
})
t.Run("Test LoadBalanceWithEmptySourceNode", func(t *testing.T) {
baseTask := newBaseTask(ctx, querypb.TriggerCondition_loadBalance)
baseTask := newBaseTask(ctx, querypb.TriggerCondition_LoadBalance)
loadBalanceTask := &loadBalanceTask{
baseTask: baseTask,
LoadBalanceRequest: &querypb.LoadBalanceRequest{
@ -912,7 +912,7 @@ func TestLoadBalanceSegmentsTask(t *testing.T) {
})
t.Run("Test LoadBalanceByNotExistNode", func(t *testing.T) {
baseTask := newBaseTask(ctx, querypb.TriggerCondition_loadBalance)
baseTask := newBaseTask(ctx, querypb.TriggerCondition_LoadBalance)
loadBalanceTask := &loadBalanceTask{
baseTask: baseTask,
LoadBalanceRequest: &querypb.LoadBalanceRequest{
@ -968,7 +968,7 @@ func TestLoadBalanceIndexedSegmentsTask(t *testing.T) {
assert.Nil(t, err)
waitQueryNodeOnline(queryCoord.cluster, node2.queryNodeID)
baseTask := newBaseTask(ctx, querypb.TriggerCondition_loadBalance)
baseTask := newBaseTask(ctx, querypb.TriggerCondition_LoadBalance)
loadBalanceTask := &loadBalanceTask{
baseTask: baseTask,
LoadBalanceRequest: &querypb.LoadBalanceRequest{

View File

@ -715,8 +715,8 @@ func getSegmentInfo(segment *Segment) *querypb.SegmentInfo {
NumRows: segment.getRowCount(),
IndexName: indexName,
IndexID: indexID,
ChannelID: segment.vChannelID,
State: getSegmentStateBySegmentType(segment.segmentType),
DmChannel: segment.vChannelID,
SegmentState: getSegmentStateBySegmentType(segment.segmentType),
}
return info
}

View File

@ -90,10 +90,10 @@ func TestImpl_AddQueryChannel(t *testing.T) {
MsgType: commonpb.MsgType_LoadCollection,
MsgID: rand.Int63(),
},
NodeID: 0,
CollectionID: defaultCollectionID,
RequestChannelID: genQueryChannel(),
ResultChannelID: genQueryResultChannel(),
NodeID: 0,
CollectionID: defaultCollectionID,
QueryChannel: genQueryChannel(),
QueryResultChannel: genQueryResultChannel(),
}
status, err := node.AddQueryChannel(ctx, req)
@ -159,9 +159,8 @@ func TestImpl_LoadSegments(t *testing.T) {
MsgType: commonpb.MsgType_WatchQueryChannels,
MsgID: rand.Int63(),
},
DstNodeID: 0,
Schema: schema,
LoadCondition: queryPb.TriggerCondition_grpcRequest,
DstNodeID: 0,
Schema: schema,
}
status, err := node.LoadSegments(ctx, req)

View File

@ -386,7 +386,7 @@ func (q *queryCollection) adjustByChangeInfo(msg *msgstream.SealedSegmentsChange
ID: segment.SegmentID,
CollectionID: segment.CollectionID,
PartitionID: segment.PartitionID,
InsertChannel: segment.ChannelID,
InsertChannel: segment.DmChannel,
NumOfRows: segment.NumRows,
// TODO: add status, remove query pb segment status, use common pb segment status?
DmlPosition: &internalpb.MsgPosition{

View File

@ -162,8 +162,8 @@ func initTestMeta(t *testing.T, node *QueryNode, collectionID UniqueID, segmentI
func initSearchChannel(ctx context.Context, searchChan string, resultChan string, node *QueryNode) {
searchReq := &querypb.AddQueryChannelRequest{
RequestChannelID: searchChan,
ResultChannelID: resultChan,
QueryChannel: searchChan,
QueryResultChannel: resultChan,
}
_, err := node.AddQueryChannel(ctx, searchReq)
if err != nil {

View File

@ -53,9 +53,8 @@ func TestSegmentLoader_loadSegment(t *testing.T) {
MsgType: commonpb.MsgType_WatchQueryChannels,
MsgID: rand.Int63(),
},
DstNodeID: 0,
Schema: schema,
LoadCondition: querypb.TriggerCondition_grpcRequest,
DstNodeID: 0,
Schema: schema,
Infos: []*querypb.SegmentLoadInfo{
{
SegmentID: defaultSegmentID,
@ -85,9 +84,8 @@ func TestSegmentLoader_loadSegment(t *testing.T) {
MsgType: commonpb.MsgType_WatchQueryChannels,
MsgID: rand.Int63(),
},
DstNodeID: 0,
Schema: schema,
LoadCondition: querypb.TriggerCondition_grpcRequest,
DstNodeID: 0,
Schema: schema,
Infos: []*querypb.SegmentLoadInfo{
{
SegmentID: defaultSegmentID,
@ -204,8 +202,7 @@ func TestSegmentLoader_invalid(t *testing.T) {
MsgType: commonpb.MsgType_WatchQueryChannels,
MsgID: rand.Int63(),
},
DstNodeID: 0,
LoadCondition: querypb.TriggerCondition_grpcRequest,
DstNodeID: 0,
Infos: []*querypb.SegmentLoadInfo{
{
SegmentID: defaultSegmentID,
@ -281,9 +278,8 @@ func TestSegmentLoader_invalid(t *testing.T) {
MsgType: commonpb.MsgType_WatchQueryChannels,
MsgID: rand.Int63(),
},
DstNodeID: 0,
Schema: schema,
LoadCondition: querypb.TriggerCondition_grpcRequest,
DstNodeID: 0,
Schema: schema,
Infos: []*querypb.SegmentLoadInfo{
{
SegmentID: defaultSegmentID,
@ -434,9 +430,8 @@ func TestSegmentLoader_testLoadGrowingAndSealed(t *testing.T) {
MsgType: commonpb.MsgType_WatchQueryChannels,
MsgID: rand.Int63(),
},
DstNodeID: 0,
Schema: schema,
LoadCondition: querypb.TriggerCondition_grpcRequest,
DstNodeID: 0,
Schema: schema,
Infos: []*querypb.SegmentLoadInfo{
{
SegmentID: segmentID1,
@ -456,9 +451,8 @@ func TestSegmentLoader_testLoadGrowingAndSealed(t *testing.T) {
MsgType: commonpb.MsgType_WatchQueryChannels,
MsgID: rand.Int63(),
},
DstNodeID: 0,
Schema: schema,
LoadCondition: querypb.TriggerCondition_grpcRequest,
DstNodeID: 0,
Schema: schema,
Infos: []*querypb.SegmentLoadInfo{
{
SegmentID: segmentID2,

View File

@ -156,7 +156,7 @@ func (r *addQueryChannelTask) Execute(ctx context.Context) error {
if err != nil {
return err
}
consumeChannels := []string{r.req.RequestChannelID}
consumeChannels := []string{r.req.QueryChannel}
consumeSubName := Params.MsgChannelSubName + "-" + strconv.FormatInt(collectionID, 10) + "-" + strconv.Itoa(rand.Int())
if Params.skipQueryChannelRecovery {
@ -181,7 +181,7 @@ func (r *addQueryChannelTask) Execute(ctx context.Context) error {
}
// add result channel
producerChannels := []string{r.req.ResultChannelID}
producerChannels := []string{r.req.QueryResultChannel}
sc.queryResultMsgStream.AsProducer(producerChannels)
log.Debug("QueryNode AsProducer", zap.Strings("channels", producerChannels))

View File

@ -36,11 +36,11 @@ func TestTask_AddQueryChannel(t *testing.T) {
genAddQueryChanelRequest := func() *querypb.AddQueryChannelRequest {
return &querypb.AddQueryChannelRequest{
Base: genCommonMsgBase(commonpb.MsgType_LoadCollection),
NodeID: 0,
CollectionID: defaultCollectionID,
RequestChannelID: genQueryChannel(),
ResultChannelID: genQueryResultChannel(),
Base: genCommonMsgBase(commonpb.MsgType_LoadCollection),
NodeID: 0,
CollectionID: defaultCollectionID,
QueryChannel: genQueryChannel(),
QueryResultChannel: genQueryResultChannel(),
}
}
@ -409,9 +409,8 @@ func TestTask_loadSegmentsTask(t *testing.T) {
genLoadEmptySegmentsRequest := func() *querypb.LoadSegmentsRequest {
schema := genSimpleSegCoreSchema()
req := &querypb.LoadSegmentsRequest{
Base: genCommonMsgBase(commonpb.MsgType_LoadSegments),
Schema: schema,
LoadCondition: querypb.TriggerCondition_grpcRequest,
Base: genCommonMsgBase(commonpb.MsgType_LoadSegments),
Schema: schema,
}
return req
}
@ -450,9 +449,8 @@ func TestTask_loadSegmentsTask(t *testing.T) {
assert.NoError(t, err)
req := &querypb.LoadSegmentsRequest{
Base: genCommonMsgBase(commonpb.MsgType_LoadSegments),
Schema: schema,
LoadCondition: querypb.TriggerCondition_grpcRequest,
Base: genCommonMsgBase(commonpb.MsgType_LoadSegments),
Schema: schema,
Infos: []*querypb.SegmentLoadInfo{
{
SegmentID: defaultSegmentID,
@ -505,7 +503,6 @@ func TestTask_loadSegmentsTask(t *testing.T) {
CollectionID: defaultCollectionID + 1,
},
}
task.req.LoadCondition = querypb.TriggerCondition_nodeDown
err = task.Execute(ctx)
assert.Error(t, err)
})
@ -534,7 +531,6 @@ func TestTask_loadSegmentsTask(t *testing.T) {
NumOfRows: totalRAM / int64(sizePerRecord),
},
}
task.req.LoadCondition = querypb.TriggerCondition_handoff
err = task.Execute(ctx)
assert.Error(t, err)
})