mirror of https://github.com/milvus-io/milvus.git
Delete usless params SkipQueryChannelRecovery (#15289)
Signed-off-by: xige-16 <xi.ge@zilliz.com>pull/15289/merge
parent
72a17339f5
commit
1cd2363bd8
|
@ -223,8 +223,6 @@ msgChannel:
|
|||
dataCoordStatistic: "datacoord-statistics-channel"
|
||||
dataCoordTimeTick: "datacoord-timetick-channel"
|
||||
dataCoordSegmentInfo: "segment-info-channel"
|
||||
# skip replay query channel under failure recovery
|
||||
skipQueryChannelRecovery: "false"
|
||||
|
||||
# Sub name generation rule: ${subNamePrefix}-${NodeID}
|
||||
subNamePrefix:
|
||||
|
|
|
@ -32,7 +32,6 @@ import (
|
|||
queryPb "github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/rootcoord"
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
"github.com/milvus-io/milvus/internal/util/mqclient"
|
||||
)
|
||||
|
||||
type task interface {
|
||||
|
@ -160,25 +159,18 @@ func (r *addQueryChannelTask) Execute(ctx context.Context) error {
|
|||
consumeChannels := []string{r.req.QueryChannel}
|
||||
consumeSubName := funcutil.GenChannelSubName(Params.QueryNodeCfg.MsgChannelSubName, collectionID, Params.QueryNodeCfg.QueryNodeID)
|
||||
|
||||
if Params.QueryNodeCfg.SkipQueryChannelRecovery {
|
||||
log.Debug("Skip query channel seek back ", zap.Strings("channels", consumeChannels),
|
||||
zap.String("seek position", string(r.req.SeekPosition.MsgID)),
|
||||
zap.Uint64("ts", r.req.SeekPosition.Timestamp))
|
||||
sc.queryMsgStream.AsConsumerWithPosition(consumeChannels, consumeSubName, mqclient.SubscriptionPositionLatest)
|
||||
sc.queryMsgStream.AsConsumer(consumeChannels, consumeSubName)
|
||||
if r.req.SeekPosition == nil || len(r.req.SeekPosition.MsgID) == 0 {
|
||||
// as consumer
|
||||
log.Debug("QueryNode AsConsumer", zap.Strings("channels", consumeChannels), zap.String("sub name", consumeSubName))
|
||||
} else {
|
||||
sc.queryMsgStream.AsConsumer(consumeChannels, consumeSubName)
|
||||
if r.req.SeekPosition == nil || len(r.req.SeekPosition.MsgID) == 0 {
|
||||
// as consumer
|
||||
log.Debug("QueryNode AsConsumer", zap.Strings("channels", consumeChannels), zap.String("sub name", consumeSubName))
|
||||
} else {
|
||||
// seek query channel
|
||||
err = sc.queryMsgStream.Seek([]*internalpb.MsgPosition{r.req.SeekPosition})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Debug("querynode seek query channel: ", zap.Any("consumeChannels", consumeChannels),
|
||||
zap.String("seek position", string(r.req.SeekPosition.MsgID)))
|
||||
// seek query channel
|
||||
err = sc.queryMsgStream.Seek([]*internalpb.MsgPosition{r.req.SeekPosition})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Debug("querynode seek query channel: ", zap.Any("consumeChannels", consumeChannels),
|
||||
zap.String("seek position", string(r.req.SeekPosition.MsgID)))
|
||||
}
|
||||
|
||||
// add result channel
|
||||
|
|
|
@ -190,30 +190,6 @@ func TestTask_AddQueryChannel(t *testing.T) {
|
|||
err = task.Execute(ctx)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("test execute skipQueryChannelRecovery", func(t *testing.T) {
|
||||
node, err := genSimpleQueryNode(ctx)
|
||||
assert.NoError(t, err)
|
||||
|
||||
position := &internalpb.MsgPosition{
|
||||
ChannelName: genQueryChannel(),
|
||||
MsgID: []byte{1, 2, 3},
|
||||
MsgGroup: defaultSubName,
|
||||
Timestamp: 0,
|
||||
}
|
||||
|
||||
task := addQueryChannelTask{
|
||||
req: genAddQueryChanelRequest(),
|
||||
node: node,
|
||||
}
|
||||
|
||||
task.req.SeekPosition = position
|
||||
|
||||
Params.QueryNodeCfg.SkipQueryChannelRecovery = true
|
||||
|
||||
err = task.Execute(ctx)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestTask_watchDmChannelsTask(t *testing.T) {
|
||||
|
|
|
@ -911,9 +911,6 @@ type queryNodeConfig struct {
|
|||
CreatedTime time.Time
|
||||
UpdatedTime time.Time
|
||||
|
||||
// recovery
|
||||
SkipQueryChannelRecovery bool
|
||||
|
||||
// memory limit
|
||||
OverloadedMemoryThresholdPercentage float64
|
||||
}
|
||||
|
@ -941,7 +938,6 @@ func (p *queryNodeConfig) init(bp *BaseParamTable) {
|
|||
|
||||
p.initSegcoreChunkRows()
|
||||
|
||||
p.initSkipQueryChannelRecovery()
|
||||
p.initOverloadedMemoryThresholdPercentage()
|
||||
}
|
||||
|
||||
|
@ -1050,10 +1046,6 @@ func (p *queryNodeConfig) initSegcoreChunkRows() {
|
|||
p.ChunkRows = p.BaseParams.ParseInt64WithDefault("queryNode.segcore.chunkRows", 32768)
|
||||
}
|
||||
|
||||
func (p *queryNodeConfig) initSkipQueryChannelRecovery() {
|
||||
p.SkipQueryChannelRecovery = p.BaseParams.ParseBool("msgChannel.skipQueryChannelRecovery", false)
|
||||
}
|
||||
|
||||
func (p *queryNodeConfig) initOverloadedMemoryThresholdPercentage() {
|
||||
overloadedMemoryThresholdPercentage := p.BaseParams.LoadWithDefault("queryCoord.overloadedMemoryThresholdPercentage", "90")
|
||||
thresholdPercentage, err := strconv.ParseInt(overloadedMemoryThresholdPercentage, 10, 64)
|
||||
|
|
Loading…
Reference in New Issue