Setup first tsafe to avoid false maxLag error (#21490) (#21497)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/21565/head
congqixia 2023-01-06 13:19:36 +08:00 committed by GitHub
parent bdced9c3ca
commit c21cdb5d2c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 9 additions and 15 deletions

View File

@ -45,12 +45,11 @@ type (
// queryNodeFlowGraph is a TimeTickedFlowGraph in query node
type queryNodeFlowGraph struct {
ctx context.Context
cancel context.CancelFunc
collectionID UniqueID
vchannel Channel
flowGraph *flowgraph.TimeTickedFlowGraph
dmlStream msgstream.MsgStream
tSafeReplica TSafeReplicaInterface
consumerCnt int
}
@ -62,17 +61,14 @@ func newQueryNodeFlowGraph(ctx context.Context,
vchannel Channel,
factory msgstream.Factory) (*queryNodeFlowGraph, error) {
ctx1, cancel := context.WithCancel(ctx)
q := &queryNodeFlowGraph{
ctx: ctx1,
cancel: cancel,
collectionID: collectionID,
vchannel: vchannel,
flowGraph: flowgraph.NewTimeTickedFlowGraph(ctx1),
tSafeReplica: tSafeReplica,
flowGraph: flowgraph.NewTimeTickedFlowGraph(ctx),
}
dmStreamNode, err := q.newDmInputNode(ctx1, factory, collectionID, vchannel, metrics.InsertLabel)
dmStreamNode, err := q.newDmInputNode(ctx, factory, collectionID, vchannel, metrics.InsertLabel)
if err != nil {
return nil, err
}
@ -128,17 +124,14 @@ func newQueryNodeDeltaFlowGraph(ctx context.Context,
vchannel Channel,
factory msgstream.Factory) (*queryNodeFlowGraph, error) {
ctx1, cancel := context.WithCancel(ctx)
q := &queryNodeFlowGraph{
ctx: ctx1,
cancel: cancel,
collectionID: collectionID,
vchannel: vchannel,
flowGraph: flowgraph.NewTimeTickedFlowGraph(ctx1),
tSafeReplica: tSafeReplica,
flowGraph: flowgraph.NewTimeTickedFlowGraph(ctx),
}
dmStreamNode, err := q.newDmInputNode(ctx1, factory, collectionID, vchannel, metrics.DeleteLabel)
dmStreamNode, err := q.newDmInputNode(ctx, factory, collectionID, vchannel, metrics.DeleteLabel)
if err != nil {
return nil, err
}
@ -247,6 +240,8 @@ func (q *queryNodeFlowGraph) consumeFlowGraphFromPosition(position *internalpb.M
start := time.Now()
err := q.dmlStream.Seek([]*internalpb.MsgPosition{position})
// setup first ts
q.tSafeReplica.setTSafe(q.vchannel, position.GetTimestamp())
ts, _ := tsoutil.ParseTS(position.GetTimestamp())
log.Info("query node flow graph seeks from position",
@ -264,7 +259,6 @@ func (q *queryNodeFlowGraph) consumeFlowGraphFromPosition(position *internalpb.M
// close would close queryNodeFlowGraph
func (q *queryNodeFlowGraph) close() {
q.cancel()
q.flowGraph.Close()
if q.dmlStream != nil && q.consumerCnt > 0 {
metrics.QueryNodeNumConsumers.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Sub(float64(q.consumerCnt))