From 00b3fcb98b84e10e15332df5dd86976a18a4baf3 Mon Sep 17 00:00:00 2001
From: Jiquan Long <jiquan.long@zilliz.com>
Date: Wed, 29 Dec 2021 14:55:21 +0800
Subject: [PATCH] Add log for flowgraph (#14441)

Signed-off-by: dragondriver <jiquan.long@zilliz.com>
---
 internal/datacoord/server.go              |  6 ++-
 internal/datacoord/util.go                | 50 -----------------------
 internal/util/flowgraph/node.go           | 24 +++++++++++
 internal/util/timerecord/time_recorder.go | 50 +++++++++++++++++++++++
 4 files changed, 78 insertions(+), 52 deletions(-)

diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go
index f777e62c5f..37c2bf9463 100644
--- a/internal/datacoord/server.go
+++ b/internal/datacoord/server.go
@@ -26,6 +26,8 @@ import (
 	"syscall"
 	"time"
 
+	"github.com/milvus-io/milvus/internal/util/timerecord"
+
 	datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
 	rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
 	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
@@ -454,9 +456,9 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
 	ttMsgStream.Start()
 
 	go func() {
-		var checker *LongTermChecker
+		var checker *timerecord.LongTermChecker
 		if enableTtChecker {
-			checker = NewLongTermChecker(ctx, ttCheckerName, ttMaxInterval, ttCheckerWarnMsg)
+			checker = timerecord.NewLongTermChecker(ctx, ttCheckerName, ttMaxInterval, ttCheckerWarnMsg)
 			checker.Start()
 			defer checker.Stop()
 		}
diff --git a/internal/datacoord/util.go b/internal/datacoord/util.go
index 23fe80d669..95ba9f93aa 100644
--- a/internal/datacoord/util.go
+++ b/internal/datacoord/util.go
@@ -19,10 +19,8 @@ package datacoord
 import (
 	"context"
 	"errors"
-	"fmt"
 	"time"
 
-	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
 	"github.com/milvus-io/milvus/internal/util/tsoutil"
 )
@@ -68,54 +66,6 @@ func FailResponse(status *commonpb.Status, reason string) {
 	status.Reason = reason
 }
 
-// LongTermChecker checks we receive at least one msg in d duration. If not, checker
-// will print a warn message.
-type LongTermChecker struct {
-	d    time.Duration
-	t    *time.Ticker
-	ch   chan struct{}
-	warn string
-	name string
-}
-
-// NewLongTermChecker creates a long term checker specified name, checking interval and warning string to print
-func NewLongTermChecker(ctx context.Context, name string, d time.Duration, warn string) *LongTermChecker {
-	c := &LongTermChecker{
-		name: name,
-		d:    d,
-		warn: warn,
-		ch:   make(chan struct{}),
-	}
-	return c
-}
-
-// Start starts the check process
-func (c *LongTermChecker) Start() {
-	c.t = time.NewTicker(c.d)
-	go func() {
-		for {
-			select {
-			case <-c.ch:
-				log.Warn(fmt.Sprintf("long term checker [%s] shutdown", c.name))
-				return
-			case <-c.t.C:
-				log.Warn(c.warn)
-			}
-		}
-	}()
-}
-
-// Check resets the time ticker
-func (c *LongTermChecker) Check() {
-	c.t.Reset(c.d)
-}
-
-// Stop stops the checker
-func (c *LongTermChecker) Stop() {
-	c.t.Stop()
-	close(c.ch)
-}
-
 func getTimetravelReverseTime(ctx context.Context, allocator allocator) (*timetravel, error) {
 	ts, err := allocator.allocTimestamp(ctx)
 	if err != nil {
diff --git a/internal/util/flowgraph/node.go b/internal/util/flowgraph/node.go
index 78dd46af0d..5c5c367b46 100644
--- a/internal/util/flowgraph/node.go
+++ b/internal/util/flowgraph/node.go
@@ -12,14 +12,23 @@
 package flowgraph
 
 import (
+	"context"
 	"fmt"
 	"sync"
 	"time"
 
+	"github.com/milvus-io/milvus/internal/util/timerecord"
+
 	"github.com/milvus-io/milvus/internal/log"
 	"go.uber.org/zap"
 )
 
+const (
+	// TODO: make these values configurable instead of hard-coding them
+	nodeCtxTtInterval = 2 * time.Minute // interval handed to the LongTermChecker created in nodeCtx.work
+	enableTtChecker   = true            // gates creating the checker and calling its Check in nodeCtx.work
+)
+
 // Node is the interface defines the behavior of flowgraph
 type Node interface {
 	Name() string
@@ -61,6 +70,17 @@ func (nodeCtx *nodeCtx) Start(wg *sync.WaitGroup) {
 // 2. invoke node.Operate
 // 3. deliver the Operate result to downstream nodes
 func (nodeCtx *nodeCtx) work() {
+	// TODO: is it necessary to run a checker for every node?
+	name := fmt.Sprintf("nodeCtxTtChecker-%s", nodeCtx.node.Name())
+	warn := fmt.Sprintf("node %s haven't received input for %f minutes",
+		nodeCtx.node.Name(), nodeCtxTtInterval.Minutes())
+	var checker *timerecord.LongTermChecker
+	if enableTtChecker {
+		checker = timerecord.NewLongTermChecker(context.Background(), name, nodeCtxTtInterval, warn)
+		checker.Start()
+		defer checker.Stop()
+	}
+
 	for {
 		select {
 		case <-nodeCtx.closeCh:
@@ -76,6 +96,10 @@ func (nodeCtx *nodeCtx) work() {
 			n := nodeCtx.node
 			res = n.Operate(inputs)
 
+			if enableTtChecker {
+				checker.Check()
+			}
+
 			downstreamLength := len(nodeCtx.downstreamInputChanIdx)
 			if len(nodeCtx.downstream) < downstreamLength {
 				log.Warn("", zap.Any("nodeCtx.downstream length", len(nodeCtx.downstream)))
diff --git a/internal/util/timerecord/time_recorder.go b/internal/util/timerecord/time_recorder.go
index 82c52dcc40..fe405abf2c 100644
--- a/internal/util/timerecord/time_recorder.go
+++ b/internal/util/timerecord/time_recorder.go
@@ -12,6 +12,8 @@
 package timerecord
 
 import (
+	"context"
+	"fmt"
 	"strconv"
 	"time"
 
@@ -75,3 +77,51 @@ func (tr *TimeRecorder) printTimeRecord(msg string, span time.Duration) {
 	str += "ms)"
 	log.Debug(str)
 }
+
+// LongTermChecker logs the warning warn every time its ticker, whose period is d, fires.
+// Callers reset the ticker through Check, so a warning is printed only when Check is not called within d.
+type LongTermChecker struct {
+	d    time.Duration // ticker period; also the value Check resets the ticker to
+	t    *time.Ticker  // created by Start; not set by NewLongTermChecker
+	ch   chan struct{} // closed by Stop to end the goroutine launched by Start
+	warn string        // message logged at warn level on every tick
+	name string        // included in the shutdown log line
+}
+
+// NewLongTermChecker creates a long term checker with the specified name, checking interval d and warning string to print. ctx is currently unused, and no ticker exists until Start is called.
+func NewLongTermChecker(ctx context.Context, name string, d time.Duration, warn string) *LongTermChecker {
+	c := &LongTermChecker{
+		name: name,
+		d:    d,
+		warn: warn,
+		ch:   make(chan struct{}), // unbuffered; only ever closed (by Stop), never sent on
+	}
+	return c
+}
+
+// Start creates the ticker with period d and launches a goroutine that logs warn on every tick until Stop closes ch.
+func (c *LongTermChecker) Start() {
+	c.t = time.NewTicker(c.d)
+	go func() {
+		for {
+			select {
+			case <-c.ch: // ch was closed by Stop: log the shutdown and end the goroutine
+				log.Warn(fmt.Sprintf("long term checker [%s] shutdown", c.name))
+				return
+			case <-c.t.C: // the ticker fired: d elapsed since the last tick or the last Check
+				log.Warn(c.warn)
+			}
+		}
+	}()
+}
+
+// Check resets the time ticker to period d, pushing the next warning a full interval away. Start must have been called first, because c.t is nil until then.
+func (c *LongTermChecker) Check() {
+	c.t.Reset(c.d)
+}
+
+// Stop stops the ticker and closes ch so the goroutine launched by Start exits. Call it at most once: a second call would close ch again, which panics.
+func (c *LongTermChecker) Stop() {
+	c.t.Stop()
+	close(c.ch)
+}