mirror of https://github.com/milvus-io/milvus.git
Seek Query/Query Result channel to Latest (#10985)
Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>pull/11314/head
parent
a4715996ef
commit
d81b15163f
|
@ -164,6 +164,8 @@ msgChannel:
|
|||
dataCoordStatistic: "datacoord-statistics-channel"
|
||||
dataCoordTimeTick: "datacoord-timetick-channel"
|
||||
dataCoordSegmentInfo: "segment-info-channel"
|
||||
# skip replay query channel under failure recovery
|
||||
skipQueryChannelRecovery: "false"
|
||||
|
||||
# Sub name generation rule: ${subNamePrefix}-${NodeID}
|
||||
subNamePrefix:
|
||||
|
|
|
@ -27,6 +27,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/msgstream"
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/util/mqclient"
|
||||
"github.com/milvus-io/milvus/internal/util/trace"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
oplog "github.com/opentracing/opentracing-go/log"
|
||||
|
@ -638,7 +639,8 @@ func (sched *taskScheduler) collectResultLoop() {
|
|||
defer sched.wg.Done()
|
||||
|
||||
queryResultMsgStream, _ := sched.msFactory.NewQueryMsgStream(sched.ctx)
|
||||
queryResultMsgStream.AsConsumer(Params.SearchResultChannelNames, Params.ProxySubName)
|
||||
// proxy didn't need to walk through all the search results in channel, because it no longer has client connections.
|
||||
queryResultMsgStream.AsConsumerWithPosition(Params.SearchResultChannelNames, Params.ProxySubName, mqclient.SubscriptionPositionLatest)
|
||||
log.Debug("Proxy", zap.Strings("SearchResultChannelNames", Params.SearchResultChannelNames),
|
||||
zap.Any("ProxySubName", Params.ProxySubName))
|
||||
|
||||
|
|
|
@ -909,6 +909,7 @@ func createQueryChannel(collectionID UniqueID) *querypb.QueryChannelInfo {
|
|||
return info
|
||||
}
|
||||
|
||||
// Get Query channel info for collection, so far all the collection share the same query channel 0
|
||||
func (m *MetaReplica) getQueryChannelInfoByID(collectionID UniqueID) (*querypb.QueryChannelInfo, error) {
|
||||
m.channelMu.Lock()
|
||||
defer m.channelMu.Unlock()
|
||||
|
|
|
@ -28,6 +28,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
||||
queryPb "github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/util/mqclient"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
)
|
||||
|
||||
|
@ -133,21 +134,29 @@ func (node *QueryNode) AddQueryChannel(ctx context.Context, in *queryPb.AddQuery
|
|||
}
|
||||
consumeChannels := []string{in.RequestChannelID}
|
||||
consumeSubName := Params.MsgChannelSubName + "-" + strconv.FormatInt(collectionID, 10) + "-" + strconv.Itoa(rand.Int())
|
||||
sc.queryMsgStream.AsConsumer(consumeChannels, consumeSubName)
|
||||
if in.SeekPosition == nil || len(in.SeekPosition.MsgID) == 0 {
|
||||
// as consumer
|
||||
log.Debug("querynode AsConsumer: " + strings.Join(consumeChannels, ", ") + " : " + consumeSubName)
|
||||
|
||||
if Params.skipQueryChannelRecovery {
|
||||
log.Debug("Skip query channel seek back ", zap.Strings("channels", consumeChannels),
|
||||
zap.String("seek position", string(in.SeekPosition.MsgID)),
|
||||
zap.Uint64("ts", in.SeekPosition.Timestamp))
|
||||
sc.queryMsgStream.AsConsumerWithPosition(consumeChannels, consumeSubName, mqclient.SubscriptionPositionLatest)
|
||||
} else {
|
||||
// seek query channel
|
||||
err = sc.queryMsgStream.Seek([]*internalpb.MsgPosition{in.SeekPosition})
|
||||
if err != nil {
|
||||
status := &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: err.Error(),
|
||||
sc.queryMsgStream.AsConsumer(consumeChannels, consumeSubName)
|
||||
if in.SeekPosition == nil || len(in.SeekPosition.MsgID) == 0 {
|
||||
// as consumer
|
||||
log.Debug("querynode AsConsumer: " + strings.Join(consumeChannels, ", ") + " : " + consumeSubName)
|
||||
} else {
|
||||
// seek query channel
|
||||
err = sc.queryMsgStream.Seek([]*internalpb.MsgPosition{in.SeekPosition})
|
||||
if err != nil {
|
||||
status := &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: err.Error(),
|
||||
}
|
||||
return status, err
|
||||
}
|
||||
return status, err
|
||||
log.Debug("querynode seek query channel: ", zap.Any("consumeChannels", consumeChannels))
|
||||
}
|
||||
log.Debug("querynode seek query channel: ", zap.Any("consumeChannels", consumeChannels))
|
||||
}
|
||||
|
||||
// add result channel
|
||||
|
|
|
@ -84,6 +84,9 @@ type ParamTable struct {
|
|||
|
||||
CreatedTime time.Time
|
||||
UpdatedTime time.Time
|
||||
|
||||
// recovery
|
||||
skipQueryChannelRecovery bool
|
||||
}
|
||||
|
||||
// Params is a package scoped variable of type ParamTable.
|
||||
|
@ -144,6 +147,8 @@ func (p *ParamTable) Init() {
|
|||
p.initKnowhereSimdType()
|
||||
|
||||
p.initRoleName()
|
||||
|
||||
p.initSkipQueryChannelRecovery()
|
||||
}
|
||||
|
||||
func (p *ParamTable) initCacheSize() {
|
||||
|
@ -340,3 +345,7 @@ func (p *ParamTable) initKnowhereSimdType() {
|
|||
func (p *ParamTable) initRoleName() {
|
||||
p.RoleName = "querynode"
|
||||
}
|
||||
|
||||
func (p *ParamTable) initSkipQueryChannelRecovery() {
|
||||
p.skipQueryChannelRecovery = p.ParseBool("msgChannel.skipQueryChannelRecovery", false)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue