Fix QueryNode log level (#16604)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
pull/16828/head
Xiaofan 2022-05-07 10:27:51 +08:00 committed by GitHub
parent 25add4414e
commit 92b6293be4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 434 additions and 324 deletions

View File

@ -592,7 +592,7 @@ func (qc *QueryCoord) loadBalanceSegmentLoop() {
// auto balance is executed on replica level
onlineNodeIDs := replica.GetNodeIds()
if len(onlineNodeIDs) == 0 {
log.Error("loadBalanceSegmentLoop: there are no online QueryNode to balance")
log.Error("loadBalanceSegmentLoop: there are no online QueryNode to balance", zap.Int64("collection", replica.CollectionID), zap.Int64("replica", replica.ReplicaID))
continue
}
var availableNodeIDs []int64
@ -601,7 +601,9 @@ func (qc *QueryCoord) loadBalanceSegmentLoop() {
if _, ok := nodeID2MemUsage[nodeID]; !ok {
nodeInfo, err := qc.cluster.getNodeInfoByID(nodeID)
if err != nil {
log.Warn("loadBalanceSegmentLoop: get node info from QueryNode failed", zap.Int64("nodeID", nodeID), zap.Error(err))
log.Warn("loadBalanceSegmentLoop: get node info from QueryNode failed",
zap.Int64("nodeID", nodeID), zap.Int64("collection", replica.CollectionID), zap.Int64("replica", replica.ReplicaID),
zap.Error(err))
continue
}
nodeID2MemUsageRate[nodeID] = nodeInfo.(*queryNode).memUsageRate
@ -615,7 +617,9 @@ func (qc *QueryCoord) loadBalanceSegmentLoop() {
for _, segmentInfo := range segmentInfos {
leastInfo, err := qc.cluster.getSegmentInfoByID(ctx, segmentInfo.SegmentID)
if err != nil {
log.Warn("loadBalanceSegmentLoop: get segment info from QueryNode failed", zap.Int64("nodeID", nodeID), zap.Error(err))
log.Warn("loadBalanceSegmentLoop: get segment info from QueryNode failed", zap.Int64("nodeID", nodeID),
zap.Int64("collection", replica.CollectionID), zap.Int64("replica", replica.ReplicaID),
zap.Error(err))
updateSegmentInfoDone = false
break
}
@ -626,9 +630,12 @@ func (qc *QueryCoord) loadBalanceSegmentLoop() {
nodeID2SegmentInfos[nodeID] = leastSegmentInfos
}
}
log.Info("loadBalanceSegmentLoop: memory usage rate of all online QueryNode", zap.Any("mem rate", nodeID2MemUsageRate))
log.Info("loadBalanceSegmentLoop: memory usage rate of all online QueryNode", zap.Int64("collection", replica.CollectionID),
zap.Int64("replica", replica.ReplicaID), zap.Any("mem rate", nodeID2MemUsageRate))
if len(availableNodeIDs) <= 1 {
log.Warn("loadBalanceSegmentLoop: there are too few available query nodes to balance", zap.Int64s("onlineNodeIDs", onlineNodeIDs), zap.Int64s("availableNodeIDs", availableNodeIDs))
log.Info("loadBalanceSegmentLoop: there are too few available query nodes to balance",
zap.Int64("collection", replica.CollectionID), zap.Int64("replica", replica.ReplicaID),
zap.Int64s("onlineNodeIDs", onlineNodeIDs), zap.Int64s("availableNodeIDs", availableNodeIDs))
continue
}
@ -678,6 +685,9 @@ func (qc *QueryCoord) loadBalanceSegmentLoop() {
cluster: qc.cluster,
meta: qc.meta,
}
log.Info("loadBalanceSegmentLoop: generate a loadBalance task",
zap.Int64("collection", replica.CollectionID), zap.Int64("replica", replica.ReplicaID),
zap.Any("task", balanceTask))
loadBalanceTasks = append(loadBalanceTasks, balanceTask)
nodeID2MemUsage[sourceNodeID] -= uint64(selectedSegmentInfo.MemSize)
nodeID2MemUsage[dstNodeID] += uint64(selectedSegmentInfo.MemSize)
@ -690,13 +700,12 @@ func (qc *QueryCoord) loadBalanceSegmentLoop() {
if memoryInsufficient {
// no enough memory on query nodes to balance, then notify proxy to stop insert
//TODO:: xige-16
log.Warn("loadBalanceSegmentLoop: QueryNode has insufficient memory, stop inserting data")
log.Warn("loadBalanceSegmentLoop: QueryNode has insufficient memory, stop inserting data", zap.Int64("collection", replica.CollectionID), zap.Int64("replica", replica.ReplicaID))
}
}
}
for _, t := range loadBalanceTasks {
qc.scheduler.Enqueue(t)
log.Info("loadBalanceSegmentLoop: enqueue a loadBalance task", zap.Any("task", t))
err := t.waitToFinish()
if err != nil {
// if failed, wait for next balance loop
@ -707,7 +716,6 @@ func (qc *QueryCoord) loadBalanceSegmentLoop() {
log.Info("loadBalanceSegmentLoop: balance task execute success", zap.Any("task", t))
}
}
log.Info("loadBalanceSegmentLoop: load balance Done in this loop", zap.Any("tasks", loadBalanceTasks))
}
}
}

View File

@ -83,7 +83,7 @@ func (c *Collection) addPartitionID(partitionID UniqueID) {
defer c.releaseMu.Unlock()
c.partitionIDs = append(c.partitionIDs, partitionID)
log.Debug("queryNode collection info after add a partition",
log.Info("queryNode collection info after add a partition",
zap.Int64("partitionID", partitionID), zap.Int64("collectionID", c.id),
zap.Int64s("partitions", c.partitionIDs))
}
@ -107,14 +107,14 @@ OUTER:
for _, dstChan := range channels {
for _, srcChan := range c.vChannels {
if dstChan == srcChan {
log.Debug("vChannel has been existed in collection's vChannels",
log.Warn("vChannel has been existed in collection's vChannels",
zap.Int64("collectionID", c.ID()),
zap.String("vChannel", dstChan),
)
continue OUTER
}
}
log.Debug("add vChannel to collection",
log.Info("add vChannel to collection",
zap.Int64("collectionID", c.ID()),
zap.String("vChannel", dstChan),
)
@ -144,7 +144,7 @@ func (c *Collection) removeVChannel(channel Channel) {
}
}
c.vChannels = tmpChannels
log.Debug("remove vChannel from collection",
log.Info("remove vChannel from collection",
zap.Int64("collectionID", c.ID()),
zap.String("channel", channel),
)
@ -160,14 +160,14 @@ OUTER:
for _, dstChan := range channels {
for _, srcChan := range c.pChannels {
if dstChan == srcChan {
log.Debug("pChannel has been existed in collection's pChannels",
log.Info("pChannel has been existed in collection's pChannels",
zap.Int64("collectionID", c.ID()),
zap.String("pChannel", dstChan),
)
continue OUTER
}
}
log.Debug("add pChannel to collection",
log.Info("add pChannel to collection",
zap.Int64("collectionID", c.ID()),
zap.String("pChannel", dstChan),
)
@ -192,14 +192,14 @@ OUTER:
for _, dstChan := range channels {
for _, srcChan := range c.pDeltaChannels {
if dstChan == srcChan {
log.Debug("pChannel has been existed in collection's pChannels",
log.Info("pChannel has been existed in collection's pChannels",
zap.Int64("collectionID", c.ID()),
zap.String("pChannel", dstChan),
)
continue OUTER
}
}
log.Debug("add pChannel to collection",
log.Info("add pChannel to collection",
zap.Int64("collectionID", c.ID()),
zap.String("pChannel", dstChan),
)
@ -232,14 +232,14 @@ OUTER:
for _, dstChan := range channels {
for _, srcChan := range c.vDeltaChannels {
if dstChan == srcChan {
log.Debug("vDeltaChannel has been existed in collection's vDeltaChannels",
log.Info("vDeltaChannel has been existed in collection's vDeltaChannels",
zap.Int64("collectionID", c.ID()),
zap.String("vChannel", dstChan),
)
continue OUTER
}
}
log.Debug("add vDeltaChannel to collection",
log.Info("add vDeltaChannel to collection",
zap.Int64("collectionID", c.ID()),
zap.String("vDeltaChannel", dstChan),
)
@ -259,7 +259,7 @@ func (c *Collection) removeVDeltaChannel(channel Channel) {
}
}
c.vDeltaChannels = tmpChannels
log.Debug("remove vDeltaChannel from collection",
log.Info("remove vDeltaChannel from collection",
zap.Int64("collectionID", c.ID()),
zap.String("channel", channel),
)
@ -323,7 +323,7 @@ func newCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) *Co
}
C.free(unsafe.Pointer(cSchemaBlob))
log.Debug("create collection", zap.Int64("collectionID", collectionID))
log.Info("create collection", zap.Int64("collectionID", collectionID))
newCollection.setReleaseTime(Timestamp(math.MaxUint64))
return newCollection
@ -340,7 +340,7 @@ func deleteCollection(collection *Collection) {
collection.collectionPtr = nil
log.Debug("delete collection", zap.Int64("collectionID", collection.ID()))
log.Info("delete collection", zap.Int64("collectionID", collection.ID()))
collection = nil
}

View File

@ -181,10 +181,10 @@ func (colReplica *collectionReplica) printReplica() {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
log.Debug("collections in collectionReplica", zap.Any("info", colReplica.collections))
log.Debug("partitions in collectionReplica", zap.Any("info", colReplica.partitions))
log.Debug("segments in collectionReplica", zap.Any("info", colReplica.segments))
log.Debug("excludedSegments in collectionReplica", zap.Any("info", colReplica.excludedSegments))
log.Info("collections in collectionReplica", zap.Any("info", colReplica.collections))
log.Info("partitions in collectionReplica", zap.Any("info", colReplica.partitions))
log.Info("segments in collectionReplica", zap.Any("info", colReplica.segments))
log.Info("excludedSegments in collectionReplica", zap.Any("info", colReplica.excludedSegments))
}
//----------------------------------------------------------------------------------------------------- collection
@ -210,8 +210,6 @@ func (colReplica *collectionReplica) addCollection(collectionID UniqueID, schema
var newCollection = newCollection(collectionID, schema)
colReplica.collections[collectionID] = newCollection
log.Debug("Successfully add collection ", zap.Int64("collectionID", collectionID))
metrics.QueryNodeNumCollections.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Set(float64(len(colReplica.collections)))
return newCollection
}

View File

@ -73,7 +73,7 @@ func (dsService *dataSyncService) addFlowGraphsForDMLChannels(collectionID Uniqu
for channel, fg := range results {
dsService.dmlChannel2FlowGraph[channel] = fg
log.Debug("add DML flow graph",
log.Info("add DML flow graph",
zap.Any("collectionID", collectionID),
zap.Any("channel", channel))
metrics.QueryNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Inc()
@ -113,7 +113,7 @@ func (dsService *dataSyncService) addFlowGraphsForDeltaChannels(collectionID Uni
for channel, fg := range results {
dsService.deltaChannel2FlowGraph[channel] = fg
log.Debug("add delta flow graph",
log.Info("add delta flow graph",
zap.Any("collectionID", collectionID),
zap.Any("channel", channel))
metrics.QueryNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Inc()
@ -156,7 +156,7 @@ func (dsService *dataSyncService) startFlowGraphByDMLChannel(collectionID Unique
if _, ok := dsService.dmlChannel2FlowGraph[channel]; !ok {
return fmt.Errorf("DML flow graph doesn't existed, collectionID = %d", collectionID)
}
log.Debug("start DML flow graph",
log.Info("start DML flow graph",
zap.Any("collectionID", collectionID),
zap.Any("channel", channel),
)
@ -172,7 +172,7 @@ func (dsService *dataSyncService) startFlowGraphForDeltaChannel(collectionID Uni
if _, ok := dsService.deltaChannel2FlowGraph[channel]; !ok {
return fmt.Errorf("delta flow graph doesn't existed, collectionID = %d", collectionID)
}
log.Debug("start delta flow graph",
log.Info("start delta flow graph",
zap.Any("collectionID", collectionID),
zap.Any("channel", channel),
)

View File

@ -17,6 +17,7 @@
package querynode
import (
"reflect"
"sync"
"github.com/opentracing/opentracing-go"
@ -55,7 +56,11 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
dMsg, ok := in[0].(*deleteMsg)
if !ok {
log.Warn("type assertion failed for deleteMsg")
if in[0] == nil {
log.Debug("type assertion failed for deleteMsg because it's nil")
} else {
log.Warn("type assertion failed for deleteMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
}
return []Msg{}
}
@ -79,9 +84,6 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
// 1. filter segment by bloom filter
for i, delMsg := range dMsg.deleteMessages {
traceID, _, _ := trace.InfoFromSpan(spans[i])
log.Debug("Process delete request in QueryNode", zap.String("traceID", traceID))
if dNode.replica.getSegmentNum() != 0 {
log.Debug("delete in historical replica",
zap.Any("collectionID", delMsg.CollectionID),
zap.Any("collectionName", delMsg.CollectionName),
@ -89,7 +91,11 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
zap.Int("numTS", len(delMsg.Timestamps)),
zap.Any("timestampBegin", delMsg.BeginTs()),
zap.Any("timestampEnd", delMsg.EndTs()),
zap.Any("segmentNum", dNode.replica.getSegmentNum()),
zap.Any("traceID", traceID),
)
if dNode.replica.getSegmentNum() != 0 {
processDeleteMessages(dNode.replica, delMsg, delData)
}
}
@ -98,7 +104,7 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
for segmentID, pks := range delData.deleteIDs {
segment, err := dNode.replica.getSegmentByID(segmentID)
if err != nil {
log.Debug(err.Error())
log.Debug("failed to get segment", zap.Int64("segmentId", segmentID), zap.Error(err))
continue
}
offset := segment.segmentPreDelete(len(pks))
@ -126,7 +132,6 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
// delete will do delete operation at segment which id is segmentID
func (dNode *deleteNode) delete(deleteData *deleteData, segmentID UniqueID, wg *sync.WaitGroup) {
defer wg.Done()
log.Debug("QueryNode::dNode::delete", zap.Any("SegmentID", segmentID))
targetSegment, err := dNode.replica.getSegmentByID(segmentID)
if err != nil {
log.Error(err.Error())

View File

@ -18,6 +18,7 @@ package querynode
import (
"fmt"
"reflect"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
@ -50,7 +51,11 @@ func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
msgStreamMsg, ok := in[0].(*MsgStreamMsg)
if !ok {
log.Warn("type assertion failed for MsgStreamMsg")
if in[0] == nil {
log.Debug("type assertion failed for MsgStreamMsg because it's nil")
} else {
log.Warn("type assertion failed for MsgStreamMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
}
return []Msg{}
}

View File

@ -18,6 +18,7 @@ package querynode
import (
"fmt"
"reflect"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
@ -43,8 +44,6 @@ func (fdmNode *filterDmNode) Name() string {
// Operate handles input messages, to filter invalid insert messages
func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
//log.Debug("Do filterDmNode operation")
if len(in) != 1 {
log.Warn("Invalid operate message input in filterDmNode", zap.Int("input length", len(in)))
return []Msg{}
@ -52,7 +51,11 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
msgStreamMsg, ok := in[0].(*MsgStreamMsg)
if !ok {
log.Warn("type assertion failed for MsgStreamMsg")
if in[0] == nil {
log.Debug("type assertion failed for MsgStreamMsg because it's nil")
} else {
log.Warn("type assertion failed for MsgStreamMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
}
return []Msg{}
}
@ -78,7 +81,7 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
for i, msg := range msgStreamMsg.TsMessages() {
traceID, _, _ := trace.InfoFromSpan(spans[i])
log.Info("Filter invalid message in QueryNode", zap.String("traceID", traceID))
log.Debug("Filter invalid message in QueryNode", zap.String("traceID", traceID))
switch msg.Type() {
case commonpb.MsgType_Insert:
resMsg := fdmNode.filterInvalidInsertMessage(msg.(*msgstream.InsertMsg))
@ -192,7 +195,7 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
for _, segmentInfo := range excludedSegments {
// unFlushed segment may not have checkPoint, so `segmentInfo.DmlPosition` may be nil
if segmentInfo.DmlPosition == nil {
log.Debug("filter unFlushed segment without checkPoint",
log.Warn("filter unFlushed segment without checkPoint",
zap.Any("collectionID", msg.CollectionID),
zap.Any("partitionID", msg.PartitionID))
return nil

View File

@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"io"
"reflect"
"sort"
"strconv"
"sync"
@ -69,8 +70,6 @@ func (iNode *insertNode) Name() string {
// Operate handles input messages, to execute insert operations
func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
//log.Debug("Do insertNode operation")
if len(in) != 1 {
log.Warn("Invalid operate message input in insertNode", zap.Int("input length", len(in)))
return []Msg{}
@ -78,7 +77,11 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
iMsg, ok := in[0].(*insertMsg)
if !ok {
log.Warn("type assertion failed for insertMsg")
if in[0] == nil {
log.Debug("type assertion failed for insertMsg because it's nil")
} else {
log.Warn("type assertion failed for insertMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
}
return []Msg{}
}
@ -111,13 +114,13 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
// if loadType is loadCollection, check if partition exists, if not, create partition
col, err := iNode.streamingReplica.getCollectionByID(insertMsg.CollectionID)
if err != nil {
log.Error(err.Error())
log.Warn("failed to get collection", zap.Error(err))
continue
}
if col.getLoadType() == loadTypeCollection {
err = iNode.streamingReplica.addPartition(insertMsg.CollectionID, insertMsg.PartitionID)
if err != nil {
log.Error(err.Error())
log.Warn("failed to add partition", zap.Error(err))
continue
}
}
@ -126,14 +129,14 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if !iNode.streamingReplica.hasSegment(insertMsg.SegmentID) {
err := iNode.streamingReplica.addSegment(insertMsg.SegmentID, insertMsg.PartitionID, insertMsg.CollectionID, insertMsg.ShardName, segmentTypeGrowing, true)
if err != nil {
log.Warn(err.Error())
log.Warn("failed to add segment", zap.Error(err))
continue
}
}
insertRecord, err := storage.TransferInsertMsgToInsertRecord(col.schema, insertMsg)
if err != nil {
log.Error("failed to transfer msgStream.insertMsg to segcorepb.InsertRecord", zap.Error(err))
log.Warn("failed to transfer msgStream.insertMsg to segcorepb.InsertRecord", zap.Error(err))
return []Msg{}
}
@ -146,7 +149,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
}
pks, err := getPrimaryKeys(insertMsg, iNode.streamingReplica)
if err != nil {
log.Warn(err.Error())
log.Warn("failed to get primary keys", zap.Error(err))
continue
}
iData.insertPKs[insertMsg.SegmentID] = append(iData.insertPKs[insertMsg.SegmentID], pks...)
@ -298,13 +301,11 @@ func filterSegmentsByPKs(pks []primaryKey, timestamps []Timestamp, segment *Segm
retTss = append(retTss, timestamps[index])
}
}
log.Debug("In filterSegmentsByPKs", zap.Any("pk len", len(retPks)), zap.Any("segment", segment.segmentID))
return retPks, retTss, nil
}
// insert would execute insert operations for specific growing segment
func (iNode *insertNode) insert(iData *insertData, segmentID UniqueID, wg *sync.WaitGroup) {
log.Debug("QueryNode::iNode::insert", zap.Any("SegmentID", segmentID))
var targetSegment, err = iNode.streamingReplica.getSegmentByID(segmentID)
if err != nil {
log.Warn("cannot find segment:", zap.Int64("segmentID", segmentID))
@ -341,10 +342,9 @@ func (iNode *insertNode) insert(iData *insertData, segmentID UniqueID, wg *sync.
// delete would execute delete operations for specific growing segment
func (iNode *insertNode) delete(deleteData *deleteData, segmentID UniqueID, wg *sync.WaitGroup) {
defer wg.Done()
log.Debug("QueryNode::iNode::delete", zap.Any("SegmentID", segmentID))
targetSegment, err := iNode.streamingReplica.getSegmentByID(segmentID)
if err != nil {
log.Error(err.Error())
log.Warn(err.Error())
return
}
@ -416,7 +416,7 @@ func getPKsFromRowBasedInsertMsg(msg *msgstream.InsertMsg, schema *schemapb.Coll
if t.Key == "dim" {
dim, err := strconv.Atoi(t.Value)
if err != nil {
log.Error("strconv wrong on get dim", zap.Error(err))
log.Warn("strconv wrong on get dim", zap.Error(err))
break
}
offset += dim * 4
@ -428,7 +428,7 @@ func getPKsFromRowBasedInsertMsg(msg *msgstream.InsertMsg, schema *schemapb.Coll
if t.Key == "dim" {
dim, err := strconv.Atoi(t.Value)
if err != nil {
log.Error("strconv wrong on get dim", zap.Error(err))
log.Warn("strconv wrong on get dim", zap.Error(err))
return nil, err
}
offset += dim / 8

View File

@ -204,7 +204,7 @@ func (q *queryNodeFlowGraph) consumeFlowGraph(channel Channel, subName ConsumeSu
return errors.New("null dml message stream in flow graph")
}
q.dmlStream.AsConsumer([]string{channel}, subName)
log.Debug("query node flow graph consumes from pChannel",
log.Info("query node flow graph consumes from pChannel",
zap.Any("collectionID", q.collectionID),
zap.Any("channel", channel),
zap.Any("subName", subName),
@ -220,7 +220,7 @@ func (q *queryNodeFlowGraph) consumeFlowGraphFromLatest(channel Channel, subName
return errors.New("null dml message stream in flow graph")
}
q.dmlStream.AsConsumerWithPosition([]string{channel}, subName, mqwrapper.SubscriptionPositionLatest)
log.Debug("query node flow graph consumes from pChannel",
log.Info("query node flow graph consumes from pChannel",
zap.Any("collectionID", q.collectionID),
zap.Any("channel", channel),
zap.Any("subName", subName),
@ -234,7 +234,7 @@ func (q *queryNodeFlowGraph) consumeFlowGraphFromLatest(channel Channel, subName
func (q *queryNodeFlowGraph) seekQueryNodeFlowGraph(position *internalpb.MsgPosition) error {
q.dmlStream.AsConsumer([]string{position.ChannelName}, position.MsgGroup)
err := q.dmlStream.Seek([]*internalpb.MsgPosition{position})
log.Debug("query node flow graph seeks from pChannel",
log.Info("query node flow graph seeks from pChannel",
zap.Any("collectionID", q.collectionID),
zap.Any("channel", position.ChannelName),
)
@ -250,7 +250,7 @@ func (q *queryNodeFlowGraph) close() {
if q.dmlStream != nil && q.consumerCnt > 0 {
metrics.QueryNodeNumConsumers.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Sub(float64(q.consumerCnt))
}
log.Debug("stop query node flow graph",
log.Info("stop query node flow graph",
zap.Any("collectionID", q.collectionID),
zap.Any("channel", q.channel),
)

View File

@ -18,11 +18,12 @@ package querynode
import (
"fmt"
"go.uber.org/zap"
"reflect"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"go.uber.org/zap"
)
// serviceTimeNode is one of the nodes in delta flow graph
@ -40,8 +41,6 @@ func (stNode *serviceTimeNode) Name() string {
// Operate handles input messages, to execute insert operations
func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
//log.Debug("Do serviceTimeNode operation")
if len(in) != 1 {
log.Warn("Invalid operate message input in serviceTimeNode, input length = ", zap.Int("input node", len(in)))
return []Msg{}
@ -49,7 +48,11 @@ func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
serviceTimeMsg, ok := in[0].(*serviceTimeMsg)
if !ok {
log.Warn("type assertion failed for serviceTimeMsg")
if in[0] == nil {
log.Debug("type assertion failed for serviceTimeMsg because it's nil")
} else {
log.Warn("type assertion failed for serviceTimeMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
}
return []Msg{}
}
@ -65,14 +68,13 @@ func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
zap.Error(err),
)
}
//p, _ := tsoutil.ParseTS(serviceTimeMsg.timeRange.timestampMax)
//log.Debug("update tSafe:",
// zap.Any("collectionID", stNode.collectionID),
// zap.Any("tSafe", serviceTimeMsg.timeRange.timestampMax),
// zap.Any("tSafe_p", p),
// zap.Any("id", id),
// zap.Any("channel", stNode.vChannel),
//)
p, _ := tsoutil.ParseTS(serviceTimeMsg.timeRange.timestampMax)
log.Debug("update tSafe:",
zap.Any("collectionID", stNode.collectionID),
zap.Any("tSafe", serviceTimeMsg.timeRange.timestampMax),
zap.Any("tSafe_p", p),
zap.Any("channel", stNode.vChannel),
)
return []Msg{}
}

View File

@ -35,7 +35,6 @@ import (
// GetComponentStates returns information about whether the node is healthy
func (node *QueryNode) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
log.Debug("Get QueryNode component states")
stats := &internalpb.ComponentStates{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
@ -114,10 +113,10 @@ func (node *QueryNode) AddQueryChannel(ctx context.Context, in *queryPb.AddQuery
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}
log.Error(err.Error())
log.Warn(err.Error())
return status, nil
}
log.Debug("addQueryChannelTask Enqueue done",
log.Info("addQueryChannelTask Enqueue done",
zap.Int64("collectionID", in.CollectionID),
zap.String("queryChannel", in.QueryChannel),
zap.String("queryResultChannel", in.QueryResultChannel),
@ -130,10 +129,10 @@ func (node *QueryNode) AddQueryChannel(ctx context.Context, in *queryPb.AddQuery
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}
log.Error(err.Error())
log.Warn(err.Error())
return status, nil
}
log.Debug("addQueryChannelTask WaitToFinish done",
log.Info("addQueryChannelTask WaitToFinish done",
zap.Int64("collectionID", in.CollectionID),
zap.String("queryChannel", in.QueryChannel),
zap.String("queryResultChannel", in.QueryResultChannel),
@ -224,11 +223,10 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, in *queryPb.WatchDmC
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}
log.Error(err.Error())
log.Warn(err.Error())
return status, nil
}
log.Debug("watchDmChannelsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()), zap.Int64("replicaID", in.GetReplicaID()))
log.Info("watchDmChannelsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()), zap.Int64("replicaID", in.GetReplicaID()))
waitFunc := func() (*commonpb.Status, error) {
err = dct.WaitToFinish()
if err != nil {
@ -236,10 +234,10 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, in *queryPb.WatchDmC
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}
log.Error(err.Error())
log.Warn(err.Error())
return status, nil
}
log.Debug("watchDmChannelsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
log.Info("watchDmChannelsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil
@ -274,10 +272,11 @@ func (node *QueryNode) WatchDeltaChannels(ctx context.Context, in *queryPb.Watch
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}
log.Error(err.Error())
log.Warn(err.Error())
return status, nil
}
log.Debug("watchDeltaChannelsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
log.Info("watchDeltaChannelsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
waitFunc := func() (*commonpb.Status, error) {
err = dct.WaitToFinish()
@ -286,10 +285,11 @@ func (node *QueryNode) WatchDeltaChannels(ctx context.Context, in *queryPb.Watch
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}
log.Error(err.Error())
log.Warn(err.Error())
return status, nil
}
log.Debug("watchDeltaChannelsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
log.Info("watchDeltaChannelsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil
@ -324,14 +324,14 @@ func (node *QueryNode) LoadSegments(ctx context.Context, in *queryPb.LoadSegment
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}
log.Error(err.Error())
log.Warn(err.Error())
return status, nil
}
segmentIDs := make([]UniqueID, 0)
for _, info := range in.Infos {
segmentIDs = append(segmentIDs, info.SegmentID)
}
log.Debug("loadSegmentsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", segmentIDs), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
log.Info("loadSegmentsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", segmentIDs), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
waitFunc := func() (*commonpb.Status, error) {
err = dct.WaitToFinish()
@ -340,10 +340,10 @@ func (node *QueryNode) LoadSegments(ctx context.Context, in *queryPb.LoadSegment
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}
log.Error(err.Error())
log.Warn(err.Error())
return status, nil
}
log.Debug("loadSegmentsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", segmentIDs), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
log.Info("loadSegmentsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", segmentIDs), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil
@ -378,18 +378,18 @@ func (node *QueryNode) ReleaseCollection(ctx context.Context, in *queryPb.Releas
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}
log.Error(err.Error())
log.Warn(err.Error())
return status, nil
}
log.Debug("releaseCollectionTask Enqueue done", zap.Int64("collectionID", in.CollectionID))
log.Info("releaseCollectionTask Enqueue done", zap.Int64("collectionID", in.CollectionID))
func() {
err = dct.WaitToFinish()
if err != nil {
log.Error(err.Error())
log.Warn(err.Error())
return
}
log.Debug("releaseCollectionTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID))
log.Info("releaseCollectionTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID))
}()
status := &commonpb.Status{
@ -424,18 +424,18 @@ func (node *QueryNode) ReleasePartitions(ctx context.Context, in *queryPb.Releas
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}
log.Error(err.Error())
log.Warn(err.Error())
return status, nil
}
log.Debug("releasePartitionsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("partitionIDs", in.PartitionIDs))
log.Info("releasePartitionsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("partitionIDs", in.PartitionIDs))
func() {
err = dct.WaitToFinish()
if err != nil {
log.Error(err.Error())
log.Warn(err.Error())
return
}
log.Debug("releasePartitionsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("partitionIDs", in.PartitionIDs))
log.Info("releasePartitionsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("partitionIDs", in.PartitionIDs))
}()
status := &commonpb.Status{
@ -473,7 +473,7 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, in *queryPb.ReleaseS
}
}
log.Debug("release segments done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", in.SegmentIDs))
log.Info("release segments done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", in.SegmentIDs))
return status, nil
}
@ -500,7 +500,7 @@ func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *queryPb.GetSegmen
// get info from historical
historicalSegmentInfos, err := node.historical.replica.getSegmentInfosByColID(in.CollectionID)
if err != nil {
log.Debug("GetSegmentInfo: get historical segmentInfo failed", zap.Int64("collectionID", in.CollectionID), zap.Error(err))
log.Warn("GetSegmentInfo: get historical segmentInfo failed", zap.Int64("collectionID", in.CollectionID), zap.Error(err))
res := &queryPb.GetSegmentInfoResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
@ -514,7 +514,7 @@ func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *queryPb.GetSegmen
// get info from streaming
streamingSegmentInfos, err := node.streaming.replica.getSegmentInfosByColID(in.CollectionID)
if err != nil {
log.Debug("GetSegmentInfo: get streaming segmentInfo failed", zap.Int64("collectionID", in.CollectionID), zap.Error(err))
log.Warn("GetSegmentInfo: get streaming segmentInfo failed", zap.Int64("collectionID", in.CollectionID), zap.Error(err))
res := &queryPb.GetSegmentInfoResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,

View File

@ -32,12 +32,8 @@ import (
"path/filepath"
"unsafe"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/util/funcutil"
)
@ -112,7 +108,6 @@ func (li *LoadIndexInfo) appendIndexData(bytesIndex [][]byte, indexKeys []string
indexPtr := unsafe.Pointer(&byteIndex[0])
indexLen := C.int64_t(len(byteIndex))
binarySetKey := filepath.Base(indexKeys[i])
log.Debug("", zap.String("index key", binarySetKey))
indexKey := C.CString(binarySetKey)
status = C.AppendIndexBinary(cBinarySet, indexPtr, indexLen, indexKey)
C.free(unsafe.Pointer(indexKey))

View File

@ -49,7 +49,7 @@ func (p *Partition) ID() UniqueID {
// addSegmentID add segmentID to segmentIDs
func (p *Partition) addSegmentID(segmentID UniqueID) {
p.segmentIDs = append(p.segmentIDs, segmentID)
log.Debug("add a segment to replica", zap.Int64("collectionID", p.collectionID), zap.Int64("partitionID", p.partitionID), zap.Int64("segmentID", segmentID))
log.Info("add a segment to replica", zap.Int64("collectionID", p.collectionID), zap.Int64("partitionID", p.partitionID), zap.Int64("segmentID", segmentID))
}
// removeSegmentID removes segmentID from segmentIDs
@ -61,7 +61,7 @@ func (p *Partition) removeSegmentID(segmentID UniqueID) {
}
}
p.segmentIDs = tmpIDs
log.Debug("remove a segment from replica", zap.Int64("collectionID", p.collectionID), zap.Int64("partitionID", p.partitionID), zap.Int64("segmentID", segmentID))
log.Info("remove a segment from replica", zap.Int64("collectionID", p.collectionID), zap.Int64("partitionID", p.partitionID), zap.Int64("segmentID", segmentID))
}
// newPartition returns a new Partition
@ -71,6 +71,6 @@ func newPartition(collectionID UniqueID, partitionID UniqueID) *Partition {
partitionID: partitionID,
}
log.Debug("create partition", zap.Int64("partitionID", partitionID))
log.Info("create partition", zap.Int64("partitionID", partitionID))
return newPartition
}

View File

@ -58,7 +58,7 @@ func NewSessionManager(options ...SessionOpt) *SessionManager {
func (c *SessionManager) AddSession(node *NodeInfo) {
c.sessions.Lock()
defer c.sessions.Unlock()
log.Info("add proxy session", zap.Int64("node", node.NodeID))
session := NewSession(node, c.sessionCreator)
c.sessions.data[node.NodeID] = session
}
@ -73,7 +73,7 @@ func (c *SessionManager) Startup(nodes []*NodeInfo) {
func (c *SessionManager) DeleteSession(node *NodeInfo) {
c.sessions.Lock()
defer c.sessions.Unlock()
log.Info("delete proxy session", zap.Int64("node", node.NodeID))
if session, ok := c.sessions.data[node.NodeID]; ok {
session.Dispose()
delete(c.sessions.data, node.NodeID)

View File

@ -44,11 +44,11 @@ func (qc *queryChannel) AsConsumer(channelName string, subName string, position
qc.queryMsgStream.AsConsumer([]string{channelName}, subName)
metrics.QueryNodeNumConsumers.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Inc()
if position == nil || len(position.MsgID) == 0 {
log.Debug("QueryNode AsConsumer", zap.String("channel", channelName), zap.String("sub name", subName))
log.Info("QueryNode AsConsumer", zap.String("channel", channelName), zap.String("sub name", subName))
} else {
err = qc.queryMsgStream.Seek([]*internalpb.MsgPosition{position})
if err == nil {
log.Debug("querynode seek query channel: ", zap.Any("consumeChannel", channelName),
log.Info("querynode seek query channel: ", zap.Any("consumeChannel", channelName),
zap.String("seek position", string(position.MsgID)))
}
}

View File

@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"math"
"reflect"
"sync"
"time"
@ -181,7 +182,7 @@ func (q *queryCollection) registerCollectionTSafe() error {
return err
}
}
log.Debug("register tSafe watcher and init watcher select case",
log.Info("register tSafe watcher and init watcher select case",
zap.Any("collectionID", streamingCollection.ID()),
zap.Any("dml channels", streamingCollection.getVChannels()))
@ -195,7 +196,7 @@ func (q *queryCollection) registerCollectionTSafe() error {
return err
}
}
log.Debug("register tSafe watcher and init watcher select case",
log.Info("register tSafe watcher and init watcher select case",
zap.Any("collectionID", historicalCollection.ID()),
zap.Any("delta channels", historicalCollection.getVDeltaChannels()))
@ -215,9 +216,10 @@ func (q *queryCollection) addTSafeWatcher(vChannel Channel) error {
q.tSafeWatchers[vChannel] = newTSafeWatcher()
err := q.streaming.tSafeReplica.registerTSafeWatcher(vChannel, q.tSafeWatchers[vChannel])
if err != nil {
log.Warn("failed to register tsafe watcher", zap.Error(err))
return err
}
log.Debug("add tSafeWatcher to queryCollection",
log.Info("add tSafeWatcher to queryCollection",
zap.Any("collectionID", q.collectionID),
zap.Any("channel", vChannel),
)
@ -236,7 +238,7 @@ func (q *queryCollection) removeTSafeWatcher(channel Channel) error {
}
q.tSafeWatchers[channel].close()
delete(q.tSafeWatchers, channel)
log.Debug("remove tSafeWatcher from queryCollection",
log.Info("remove tSafeWatcher from queryCollection",
zap.Any("collectionID", q.collectionID),
zap.Any("channel", channel),
)
@ -247,10 +249,10 @@ func (q *queryCollection) startWatcher(channel <-chan bool, closeCh <-chan struc
for {
select {
case <-q.releaseCtx.Done():
log.Debug("stop queryCollection watcher because queryCollection ctx done", zap.Any("collectionID", q.collectionID))
log.Info("stop queryCollection watcher because queryCollection ctx done", zap.Any("collectionID", q.collectionID))
return
case <-closeCh:
log.Debug("stop queryCollection watcher because watcher closed", zap.Any("collectionID", q.collectionID))
log.Info("stop queryCollection watcher because watcher closed", zap.Any("collectionID", q.collectionID))
return
case <-channel:
// TODO: check if channel is closed
@ -352,7 +354,7 @@ func (q *queryCollection) consumeQuery() {
for {
select {
case <-q.releaseCtx.Done():
log.Debug("stop queryCollection's receiveQueryMsg", zap.Int64("collectionID", q.collectionID))
log.Info("stop queryCollection's receiveQueryMsg", zap.Int64("collectionID", q.collectionID))
return
case msgPack, ok := <-q.queryMsgStream.Chan():
if !ok {
@ -378,7 +380,7 @@ func (q *queryCollection) consumeQuery() {
case *msgstream.SealedSegmentsChangeInfoMsg:
q.adjustByChangeInfo(sm)
default:
log.Warn("unsupported msg type in search channel", zap.Int64("msgID", sm.ID()))
log.Warn("unsupported msg type in search channel", zap.Int64("msgID", sm.ID()), zap.String("name", reflect.TypeOf(msg).Name()))
}
}
}
@ -446,17 +448,9 @@ func (q *queryCollection) receiveQueryMsg(msg queryMsg) error {
case commonpb.MsgType_Retrieve:
collectionID = msg.(*msgstream.RetrieveMsg).CollectionID
msgTypeStr = "retrieve"
//log.Debug("consume retrieve message",
// zap.Any("collectionID", collectionID),
// zap.Int64("msgID", msg.ID()),
//)
case commonpb.MsgType_Search:
collectionID = msg.(*msgstream.SearchMsg).CollectionID
msgTypeStr = "search"
//log.Debug("consume search message",
// zap.Any("collectionID", collectionID),
// zap.Int64("msgID", msg.ID()),
//)
default:
err := fmt.Errorf("receive invalid msgType = %d", msgType)
return err
@ -493,7 +487,7 @@ func (q *queryCollection) receiveQueryMsg(msg queryMsg) error {
finalErr := fmt.Errorf("first err = %s, second err = %s", err, publishErr)
return finalErr
}
log.Debug("do query failed in receiveQueryMsg, publish failed query result",
log.Warn("do query failed in receiveQueryMsg, publish failed query result",
zap.Int64("collectionID", collectionID),
zap.Int64("msgID", msg.ID()),
zap.String("msgType", msgTypeStr),
@ -508,7 +502,7 @@ func (q *queryCollection) receiveQueryMsg(msg queryMsg) error {
finalErr := fmt.Errorf("first err = %s, second err = %s", err, publishErr)
return finalErr
}
log.Debug("do query failed in receiveQueryMsg, publish failed query result",
log.Warn("do query failed in receiveQueryMsg, publish failed query result",
zap.Int64("collectionID", collectionID),
zap.Int64("msgID", msg.ID()),
zap.String("msgType", msgTypeStr),
@ -588,7 +582,7 @@ func (q *queryCollection) doUnsolvedQueryMsg() {
for {
select {
case <-q.releaseCtx.Done():
log.Debug("stop Collection's doUnsolvedMsg", zap.Int64("collectionID", q.collectionID))
log.Info("stop Collection's doUnsolvedMsg", zap.Int64("collectionID", q.collectionID))
return
default:
//time.Sleep(10 * time.Millisecond)
@ -628,10 +622,10 @@ func (q *queryCollection) doUnsolvedQueryMsg() {
err := errors.New(fmt.Sprintln("do query failed in doUnsolvedQueryMsg because timeout"+
", collectionID = ", q.collectionID,
", msgID = ", m.ID()))
log.Warn(err.Error())
log.Warn("query timeout", zap.Error(err))
publishErr := q.publishFailedQueryResult(m, err.Error())
if publishErr != nil {
log.Error(publishErr.Error())
log.Warn("failed to publish failed result", zap.Error(publishErr))
}
continue
}
@ -678,24 +672,22 @@ func (q *queryCollection) doUnsolvedQueryMsg() {
}
if err != nil {
log.Warn(err.Error())
err = q.publishFailedQueryResult(m, err.Error())
if err != nil {
log.Warn(err.Error())
} else {
log.Debug("do query failed in doUnsolvedMsg, publish failed query result",
zap.Int64("collectionID", q.collectionID),
zap.Int64("msgID", m.ID()),
)
err = q.publishFailedQueryResult(m, err.Error())
if err != nil {
log.Warn("failed to publish failed result", zap.Error(err))
}
}
sp.Finish()
} else {
log.Debug("do query done in doUnsolvedMsg",
zap.Int64("collectionID", q.collectionID),
zap.Int64("msgID", m.ID()),
)
}
log.Debug("doUnsolvedMsg: do query done", zap.Int("num of query msg", len(unSolvedMsg)))
sp.Finish()
}
}
}
}
@ -854,7 +846,6 @@ func (q *queryCollection) search(msg queryMsg) error {
log.Debug("QueryNode reduce data finished", zap.Int64("msgID", searchMsg.ID()))
sp.LogFields(oplog.String("statistical time", "reduceSearchResults end"))
if err != nil {
log.Error("QueryNode reduce data failed", zap.Int64("msgID", searchMsg.ID()), zap.Error(err))
return err
}
nqOfReqs := []int64{nq}
@ -1160,69 +1151,3 @@ func (q *queryCollection) publishFailedQueryResultWithCtx(ctx context.Context, m
func (q *queryCollection) publishFailedQueryResult(msg msgstream.TsMsg, errMsg string) error {
return q.publishFailedQueryResultWithCtx(q.releaseCtx, msg, errMsg)
}
// func (q *queryCollection) publishQueryResult(msg msgstream.TsMsg, collectionID UniqueID) error {
// span, ctx := trace.StartSpanFromContext(msg.TraceCtx())
// defer span.Finish()
// msg.SetTraceCtx(ctx)
// msgPack := msgstream.MsgPack{}
// msgPack.Msgs = append(msgPack.Msgs, msg)
// err := q.queryResultMsgStream.Produce(&msgPack)
// if err != nil {
// log.Error(err.Error())
// }
//
// return err
// }
// func (q *queryCollection) publishFailedQueryResult(msg msgstream.TsMsg, errMsg string) error {
// msgType := msg.Type()
// span, ctx := trace.StartSpanFromContext(msg.TraceCtx())
// defer span.Finish()
// msg.SetTraceCtx(ctx)
// msgPack := msgstream.MsgPack{}
//
// resultChannelInt := 0
// baseMsg := msgstream.BaseMsg{
// HashValues: []uint32{uint32(resultChannelInt)},
// }
// baseResult := &commonpb.MsgBase{
// MsgID: msg.ID(),
// Timestamp: msg.BeginTs(),
// SourceID: msg.SourceID(),
// }
//
// switch msgType {
// case commonpb.MsgType_Retrieve:
// retrieveMsg := msg.(*msgstream.RetrieveMsg)
// baseResult.MsgType = commonpb.MsgType_RetrieveResult
// retrieveResultMsg := &msgstream.RetrieveResultMsg{
// BaseMsg: baseMsg,
// RetrieveResults: internalpb.RetrieveResults{
// Base: baseResult,
// Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: errMsg},
// ResultChannelID: retrieveMsg.ResultChannelID,
// Ids: nil,
// FieldsData: nil,
// },
// }
// msgPack.Msgs = append(msgPack.Msgs, retrieveResultMsg)
// case commonpb.MsgType_Search:
// searchMsg := msg.(*msgstream.SearchMsg)
// baseResult.MsgType = commonpb.MsgType_SearchResult
// searchResultMsg := &msgstream.SearchResultMsg{
// BaseMsg: baseMsg,
// SearchResults: internalpb.SearchResults{
// Base: baseResult,
// Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: errMsg},
// ResultChannelID: searchMsg.ResultChannelID,
// },
// }
// msgPack.Msgs = append(msgPack.Msgs, searchResultMsg)
// default:
// return fmt.Errorf("publish invalid msgType %d", msgType)
// }
//
// return q.queryResultMsgStream.Produce(&msgPack)
// }
//

View File

@ -149,7 +149,7 @@ func (node *QueryNode) initSession() error {
node.session.Init(typeutil.QueryNodeRole, Params.QueryNodeCfg.QueryNodeIP+":"+strconv.FormatInt(Params.QueryNodeCfg.QueryNodePort, 10), false, true)
Params.QueryNodeCfg.SetNodeID(node.session.ServerID)
Params.SetLogger(Params.QueryNodeCfg.GetNodeID())
log.Debug("QueryNode", zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()), zap.String("node address", node.session.Address))
log.Info("QueryNode init session", zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()), zap.String("node address", node.session.Address))
return nil
}
@ -213,7 +213,7 @@ func (node *QueryNode) initServiceDiscovery() error {
log.Warn("QueryNode failed to init service discovery", zap.Error(err))
return err
}
log.Debug("QueryNode success to get Proxy sessions", zap.Any("sessions", sessions))
log.Info("QueryNode success to get Proxy sessions", zap.Any("sessions", sessions))
nodes := make([]*NodeInfo, 0, len(sessions))
for _, session := range sessions {
@ -235,7 +235,7 @@ func (node *QueryNode) watchService(ctx context.Context) {
for {
select {
case <-ctx.Done():
log.Debug("watch service shutdown")
log.Info("watch service shutdown")
return
case event, ok := <-node.eventCh:
if !ok {
@ -250,17 +250,12 @@ func (node *QueryNode) watchService(ctx context.Context) {
}
return
}
if err := node.handleSessionEvent(ctx, event); err != nil {
log.Warn("handleSessionEvent", zap.Error(err))
}
node.handleSessionEvent(ctx, event)
}
}
}
func (node *QueryNode) handleSessionEvent(ctx context.Context, event *sessionutil.SessionEvent) error {
if event == nil {
return nil
}
func (node *QueryNode) handleSessionEvent(ctx context.Context, event *sessionutil.SessionEvent) {
info := &NodeInfo{
NodeID: event.Session.ServerID,
Address: event.Session.Address,
@ -274,7 +269,6 @@ func (node *QueryNode) handleSessionEvent(ctx context.Context, event *sessionuti
log.Warn("receive unknown service event type",
zap.Any("type", event.EventType))
}
return nil
}
// Init function init historical and streaming module to manage segments
@ -282,7 +276,7 @@ func (node *QueryNode) Init() error {
var initError error = nil
node.initOnce.Do(func() {
//ctx := context.Background()
log.Debug("QueryNode session info", zap.String("metaPath", Params.EtcdCfg.MetaRootPath))
log.Info("QueryNode session info", zap.String("metaPath", Params.EtcdCfg.MetaRootPath))
err := node.initSession()
if err != nil {
log.Error("QueryNode init session failed", zap.Error(err))
@ -307,7 +301,7 @@ func (node *QueryNode) Init() error {
}
node.etcdKV = etcdkv.NewEtcdKV(node.etcdCli, Params.EtcdCfg.MetaRootPath)
log.Debug("queryNode try to connect etcd success", zap.Any("MetaRootPath", Params.EtcdCfg.MetaRootPath))
log.Info("queryNode try to connect etcd success", zap.Any("MetaRootPath", Params.EtcdCfg.MetaRootPath))
node.tSafeReplica = newTSafeReplica()
streamingReplica := newCollectionReplica(node.etcdKV)
@ -349,7 +343,7 @@ func (node *QueryNode) Init() error {
// node.factory,
// qsOptWithSessionManager(node.sessionManager))
log.Debug("query node init successfully",
log.Info("query node init successfully",
zap.Any("queryNodeID", Params.QueryNodeCfg.GetNodeID()),
zap.Any("IP", Params.QueryNodeCfg.QueryNodeIP),
zap.Any("Port", Params.QueryNodeCfg.QueryNodePort),
@ -385,7 +379,7 @@ func (node *QueryNode) Start() error {
Params.QueryNodeCfg.UpdatedTime = time.Now()
node.UpdateStateCode(internalpb.StateCode_Healthy)
log.Debug("query node start successfully",
log.Info("query node start successfully",
zap.Any("queryNodeID", Params.QueryNodeCfg.GetNodeID()),
zap.Any("IP", Params.QueryNodeCfg.QueryNodeIP),
zap.Any("Port", Params.QueryNodeCfg.QueryNodePort),
@ -395,6 +389,7 @@ func (node *QueryNode) Start() error {
// Stop mainly stop QueryNode's query service, historical loop and streaming loop.
func (node *QueryNode) Stop() error {
log.Warn("Query node stop..")
node.UpdateStateCode(internalpb.StateCode_Abnormal)
node.queryNodeLoopCancel()
@ -435,12 +430,12 @@ func (node *QueryNode) SetEtcdClient(client *clientv3.Client) {
}
func (node *QueryNode) watchChangeInfo() {
log.Debug("query node watchChangeInfo start")
log.Info("query node watchChangeInfo start")
watchChan := node.etcdKV.WatchWithPrefix(util.ChangeInfoMetaPrefix)
for {
select {
case <-node.queryNodeLoopCtx.Done():
log.Debug("query node watchChangeInfo close")
log.Info("query node watchChangeInfo close")
return
case resp := <-watchChan:
for _, event := range resp.Events {
@ -451,7 +446,7 @@ func (node *QueryNode) watchChangeInfo() {
log.Warn("Parse SealedSegmentsChangeInfo id failed", zap.Any("error", err.Error()))
continue
}
log.Debug("get SealedSegmentsChangeInfo from etcd",
log.Info("get SealedSegmentsChangeInfo from etcd",
zap.Any("infoID", infoID),
)
info := &querypb.SealedSegmentsChangeInfo{}
@ -525,7 +520,7 @@ func (node *QueryNode) removeSegments(segmentChangeInfos *querypb.SealedSegments
if err != nil {
return err
}
log.Debug("remove growing segment in removeSegments",
log.Info("remove growing segment in removeSegments",
zap.Any("collectionID", segmentInfo.CollectionID),
zap.Any("segmentID", segmentInfo.SegmentID),
zap.Any("infoID", segmentChangeInfos.Base.GetMsgID()),
@ -541,7 +536,7 @@ func (node *QueryNode) removeSegments(segmentChangeInfos *querypb.SealedSegments
if err != nil {
return err
}
log.Debug("remove sealed segment", zap.Any("collectionID", segmentInfo.CollectionID),
log.Info("remove sealed segment", zap.Any("collectionID", segmentInfo.CollectionID),
zap.Any("segmentID", segmentInfo.SegmentID),
zap.Any("infoID", segmentChangeInfos.Base.GetMsgID()),
)

View File

@ -0,0 +1,160 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package querynode
import "C"
import (
"context"
"errors"
"fmt"
"sync"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/storage"
)
type queryService struct {
ctx context.Context
cancel context.CancelFunc
historical *historical
streaming *streaming
queryCollectionMu sync.Mutex // guards queryCollections
queryCollections map[UniqueID]*queryCollection
factory msgstream.Factory
sessionManager *SessionManager
cacheStorage storage.ChunkManager
vectorStorage storage.ChunkManager
}
type qsOpt func(*queryService)
func qsOptWithSessionManager(s *SessionManager) qsOpt {
return func(qs *queryService) {
qs.sessionManager = s
}
}
func newQueryService(ctx context.Context,
historical *historical,
streaming *streaming,
vectorStorage storage.ChunkManager,
cacheStorage storage.ChunkManager,
factory msgstream.Factory,
opts ...qsOpt,
) *queryService {
queryServiceCtx, queryServiceCancel := context.WithCancel(ctx)
qs := &queryService{
ctx: queryServiceCtx,
cancel: queryServiceCancel,
historical: historical,
streaming: streaming,
queryCollections: make(map[UniqueID]*queryCollection),
vectorStorage: vectorStorage,
cacheStorage: cacheStorage,
factory: factory,
}
for _, opt := range opts {
opt(qs)
}
return qs
}
func (q *queryService) close() {
log.Info("search service closed")
q.queryCollectionMu.Lock()
for collectionID, sc := range q.queryCollections {
sc.close()
sc.cancel()
delete(q.queryCollections, collectionID)
}
q.queryCollections = make(map[UniqueID]*queryCollection)
q.queryCollectionMu.Unlock()
q.cancel()
}
func (q *queryService) addQueryCollection(collectionID UniqueID) error {
q.queryCollectionMu.Lock()
defer q.queryCollectionMu.Unlock()
if _, ok := q.queryCollections[collectionID]; ok {
log.Warn("query collection already exists", zap.Any("collectionID", collectionID))
err := errors.New(fmt.Sprintln("query collection already exists, collectionID = ", collectionID))
return err
}
ctx1, cancel := context.WithCancel(q.ctx)
qc, err := newQueryCollection(ctx1,
cancel,
collectionID,
q.historical,
q.streaming,
q.factory,
q.cacheStorage,
q.vectorStorage,
qcOptWithSessionManager(q.sessionManager),
)
if err != nil {
return err
}
q.queryCollections[collectionID] = qc
return nil
}
func (q *queryService) hasQueryCollection(collectionID UniqueID) bool {
q.queryCollectionMu.Lock()
defer q.queryCollectionMu.Unlock()
_, ok := q.queryCollections[collectionID]
return ok
}
func (q *queryService) getQueryCollection(collectionID UniqueID) (*queryCollection, error) {
q.queryCollectionMu.Lock()
defer q.queryCollectionMu.Unlock()
_, ok := q.queryCollections[collectionID]
if ok {
return q.queryCollections[collectionID], nil
}
return nil, errors.New(fmt.Sprintln("queryCollection not exists, collectionID = ", collectionID))
}
func (q *queryService) stopQueryCollection(collectionID UniqueID) {
q.queryCollectionMu.Lock()
defer q.queryCollectionMu.Unlock()
sc, ok := q.queryCollections[collectionID]
if !ok {
log.Warn("stopQueryCollection failed, collection doesn't exist", zap.Int64("collectionID", collectionID))
return
}
sc.close()
sc.cancel()
// for not blocking waitNewTsafe, which will block doUnsolvedMsg quit.
sc.watcherCond.Broadcast()
delete(q.queryCollections, collectionID)
}

View File

@ -179,10 +179,10 @@ func (q *queryShard) watchTs(channel <-chan bool, closeCh <-chan struct{}, tp ts
for {
select {
case <-q.ctx.Done():
log.Debug("stop queryShard watcher due to ctx done", zap.Int64("collectionID", q.collectionID), zap.String("vChannel", q.channel))
log.Info("stop queryShard watcher due to ctx done", zap.Int64("collectionID", q.collectionID), zap.String("vChannel", q.channel))
return
case <-closeCh:
log.Debug("stop queryShard watcher due to watcher closed", zap.Int64("collectionID", q.collectionID), zap.String("vChannel", q.channel))
log.Info("stop queryShard watcher due to watcher closed", zap.Int64("collectionID", q.collectionID), zap.String("vChannel", q.channel))
return
case _, ok := <-channel:
if !ok {
@ -458,7 +458,6 @@ func (q *queryShard) searchLeader(ctx context.Context, req *querypb.SearchReques
}
// reduce shard search results: unmarshal -> reduce -> marshal
log.Debug("shard leader get search results", zap.Int("numbers", len(results)))
searchResultData, err := decodeSearchResults(results)
if err != nil {
log.Warn("shard leader decode search results errors", zap.Error(err))

View File

@ -23,8 +23,10 @@ import (
"strconv"
"sync"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/dependency"
"go.uber.org/zap"
)
type queryShardService struct {
@ -92,6 +94,7 @@ func (q *queryShardService) addQueryShard(collectionID UniqueID, channel Channel
q.localCacheEnabled,
)
q.queryShards[channel] = qs
log.Info("Successfully add query shard", zap.Int64("collection", collectionID), zap.Int64("replica", replicaID), zap.String("channel", channel))
return nil
}
@ -102,6 +105,7 @@ func (q *queryShardService) removeQueryShard(channel Channel) error {
return errors.New(fmt.Sprintln("query shard(channel) ", channel, " does not exist"))
}
delete(q.queryShards, channel)
log.Info("Successfully remove query shard", zap.String("channel", channel))
return nil
}
@ -122,6 +126,7 @@ func (q *queryShardService) getQueryShard(channel Channel) (*queryShard, error)
}
func (q *queryShardService) close() {
log.Warn("Close query shard service")
q.cancel()
q.queryShardsMu.Lock()
defer q.queryShardsMu.Unlock()
@ -161,4 +166,5 @@ func (q *queryShardService) releaseCollection(collectionID int64) {
}
}
q.queryShardsMu.Unlock()
log.Info("release collection in query shard service", zap.Int64("collectionId", collectionID))
}

View File

@ -199,7 +199,7 @@ func newSegment(collection *Collection, segmentID UniqueID, partitionID UniqueID
return nil, err
}
log.Debug("create segment",
log.Info("create segment",
zap.Int64("collectionID", collectionID),
zap.Int64("partitionID", partitionID),
zap.Int64("segmentID", segmentID),
@ -236,7 +236,7 @@ func deleteSegment(segment *Segment) {
C.DeleteSegment(cPtr)
segment.segmentPtr = nil
log.Debug("delete segment from memory", zap.Int64("collectionID", segment.collectionID), zap.Int64("partitionID", segment.partitionID), zap.Int64("segmentID", segment.ID()))
log.Info("delete segment from memory", zap.Int64("collectionID", segment.collectionID), zap.Int64("partitionID", segment.partitionID), zap.Int64("segmentID", segment.ID()))
segment = nil
}
@ -345,6 +345,7 @@ func (s *Segment) retrieve(plan *RetrievePlan) (*segcorepb.RetrieveResults, erro
status := C.Retrieve(s.segmentPtr, plan.cRetrievePlan, ts, &retrieveResult.cRetrieveResult)
metrics.QueryNodeSQSegmentLatencyInCore.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()),
metrics.QueryLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
log.Debug("do retrieve on segment", zap.Int64("segmentID", s.segmentID), zap.Int32("segmentType", int32(s.segmentType)))
if err := HandleCStatus(&status, "Retrieve failed"); err != nil {
return nil, err
}
@ -570,7 +571,7 @@ func (s *Segment) updateBloomFilter(pks []primaryKey) {
stringValue := pk.(*varCharPrimaryKey).Value
s.pkFilter.AddString(stringValue)
default:
//TODO::
log.Warn("failed to update bloomfilter", zap.Any("PK type", pk.Type()))
}
}
}
@ -759,7 +760,7 @@ func (s *Segment) segmentLoadFieldData(fieldID int64, rowCount int64, data *sche
return err
}
log.Debug("load field done",
log.Info("load field done",
zap.Int64("fieldID", fieldID),
zap.Int64("row count", rowCount),
zap.Int64("segmentID", s.ID()))
@ -827,7 +828,7 @@ func (s *Segment) segmentLoadDeletedRecord(primaryKeys []primaryKey, timestamps
return err
}
log.Debug("load deleted record done",
log.Info("load deleted record done",
zap.Int64("row count", rowCount),
zap.Int64("segmentID", s.ID()))
return nil
@ -861,7 +862,7 @@ func (s *Segment) segmentLoadIndexData(bytesIndex [][]byte, indexInfo *querypb.F
return err
}
log.Debug("updateSegmentIndex done", zap.Int64("segmentID", s.ID()))
log.Info("updateSegmentIndex done", zap.Int64("segmentID", s.ID()))
return nil
}

View File

@ -24,10 +24,6 @@ import (
"runtime"
"strconv"
"sync"
"time"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/common"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
@ -45,10 +41,10 @@ import (
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/timerecord"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)
const timeoutForEachRead = 10 * time.Second
// segmentLoader is only responsible for loading the field data from binlog
type segmentLoader struct {
historicalReplica ReplicaInterface
@ -113,9 +109,9 @@ func (loader *segmentLoader) loadSegment(req *querypb.LoadSegmentsRequest, segme
return err
}
log.Debug("segmentLoader start loading...",
zap.Int64("collectionID", req.CollectionID),
zap.Int("numOfSegments", len(req.Infos)),
log.Info("segmentLoader start loading...",
zap.Any("collectionID", req.CollectionID),
zap.Any("numOfSegments", len(req.Infos)),
zap.Any("loadType", segmentType),
)
// check memory limit
@ -230,7 +226,7 @@ func (loader *segmentLoader) loadSegmentInternal(segment *Segment,
collectionID := loadInfo.CollectionID
partitionID := loadInfo.PartitionID
segmentID := loadInfo.SegmentID
log.Debug("start loading segment data into memory",
log.Info("start loading segment data into memory",
zap.Int64("collectionID", collectionID),
zap.Int64("partitionID", partitionID),
zap.Int64("segmentID", segmentID))
@ -283,7 +279,7 @@ func (loader *segmentLoader) loadSegmentInternal(segment *Segment,
if pkFieldID == common.InvalidFieldID {
log.Warn("segment primary key field doesn't exist when load segment")
} else {
log.Debug("loading bloom filter...")
log.Debug("loading bloom filter...", zap.Int64("segmentID", segmentID))
pkStatsBinlogs := loader.filterPKStatsBinlogs(loadInfo.Statslogs, pkFieldID)
err = loader.loadSegmentBloomFilter(segment, pkStatsBinlogs)
if err != nil {
@ -291,7 +287,7 @@ func (loader *segmentLoader) loadSegmentInternal(segment *Segment,
}
}
log.Debug("loading delta...")
log.Debug("loading delta...", zap.Int64("segmentID", segmentID))
err = loader.loadDeltaLogs(segment, loadInfo.Deltalogs)
return err
}
@ -476,7 +472,7 @@ func (loader *segmentLoader) loadGrowingSegments(segment *Segment,
return errors.New(fmt.Sprintln("illegal insert data when load segment, collectionID = ", segment.collectionID))
}
log.Debug("start load growing segments...",
log.Info("start load growing segments...",
zap.Any("collectionID", segment.collectionID),
zap.Any("segmentID", segment.ID()),
zap.Any("numRows", len(ids)),
@ -516,7 +512,7 @@ func (loader *segmentLoader) loadGrowingSegments(segment *Segment,
if err != nil {
return err
}
log.Debug("Do insert done in segment loader", zap.Int("len", numOfRecords), zap.Int64("segmentID", segment.ID()), zap.Int64("collectionID", segment.collectionID))
log.Info("Do insert done fro growing segment ", zap.Int("len", numOfRecords), zap.Int64("segmentID", segment.ID()), zap.Int64("collectionID", segment.collectionID))
return nil
}
@ -608,7 +604,7 @@ func (loader *segmentLoader) loadDeltaLogs(segment *Segment, deltaLogs []*datapb
}
func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collectionID int64, position *internalpb.MsgPosition) error {
log.Debug("from dml check point load delete", zap.Any("position", position), zap.Any("msg id", position.MsgID))
log.Info("from dml check point load delete", zap.Any("position", position), zap.Any("msg id", position.MsgID))
stream, err := loader.factory.NewMsgStream(ctx)
if err != nil {
return err
@ -629,7 +625,7 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection
}
if lastMsgID.AtEarliestPosition() {
log.Debug("there is no more delta msg", zap.Int64("Collection ID", collectionID), zap.String("channel", pChannelName))
log.Info("there is no more delta msg", zap.Int64("Collection ID", collectionID), zap.String("channel", pChannelName))
return nil
}
@ -646,7 +642,7 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection
deleteOffset: make(map[UniqueID]int64),
}
log.Debug("start read delta msg from seek position to last position",
log.Info("start read delta msg from seek position to last position",
zap.Int64("Collection ID", collectionID), zap.String("channel", pChannelName))
hasMore := true
for hasMore {
@ -692,12 +688,12 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection
}
}
log.Debug("All data has been read, there is no more data", zap.Int64("Collection ID", collectionID),
log.Info("All data has been read, there is no more data", zap.Int64("Collection ID", collectionID),
zap.String("channel", pChannelName), zap.Any("msg id", position.GetMsgID()))
for segmentID, pks := range delData.deleteIDs {
segment, err := loader.historicalReplica.getSegmentByID(segmentID)
if err != nil {
log.Debug(err.Error())
log.Warn(err.Error())
continue
}
offset := segment.segmentPreDelete(len(pks))
@ -710,7 +706,7 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection
go deletePk(loader.historicalReplica, delData, segmentID, &wg)
}
wg.Wait()
log.Debug("from dml check point load done", zap.Any("msg id", position.GetMsgID()))
log.Info("from dml check point load done", zap.Any("msg id", position.GetMsgID()))
return nil
}

View File

@ -168,6 +168,7 @@ func NewShardCluster(collectionID int64, replicaID int64, vchannelName string,
}
func (sc *ShardCluster) Close() {
log.Info("Close shard cluster")
sc.closeOnce.Do(func() {
sc.state.Store(int32(unavailable))
close(sc.closeCh)
@ -176,7 +177,7 @@ func (sc *ShardCluster) Close() {
// addNode add a node into cluster
func (sc *ShardCluster) addNode(evt nodeEvent) {
log.Debug("ShardCluster add node", zap.Int64("nodeID", evt.nodeID))
log.Info("ShardCluster add node", zap.Int64("nodeID", evt.nodeID))
sc.mut.Lock()
defer sc.mut.Unlock()
@ -198,6 +199,7 @@ func (sc *ShardCluster) addNode(evt nodeEvent) {
// removeNode handles node offline and setup related segments
func (sc *ShardCluster) removeNode(evt nodeEvent) {
log.Info("ShardCluster remove node", zap.Int64("nodeID", evt.nodeID))
sc.mut.Lock()
defer sc.mut.Unlock()
@ -220,8 +222,7 @@ func (sc *ShardCluster) removeNode(evt nodeEvent) {
// updateSegment apply segment change to shard cluster
func (sc *ShardCluster) updateSegment(evt segmentEvent) {
log.Debug("ShardCluster update segment", zap.Int64("nodeID", evt.nodeID), zap.Int64("segmentID", evt.segmentID), zap.Int32("state", int32(evt.state)))
log.Info("ShardCluster update segment", zap.Int64("nodeID", evt.nodeID), zap.Int64("segmentID", evt.segmentID), zap.Int32("state", int32(evt.state)))
// notify handoff wait online if any
defer func() {
sc.segmentCond.L.Lock()
@ -248,6 +249,7 @@ func (sc *ShardCluster) updateSegment(evt segmentEvent) {
// SyncSegments synchronize segment distribution in batch
func (sc *ShardCluster) SyncSegments(distribution []*querypb.ReplicaSegmentsInfo, state segmentState) {
log.Info("ShardCluster sync segments", zap.Any("replica segments", distribution), zap.Int32("state", int32(state)))
// notify handoff wait online if any
defer func() {
sc.segmentCond.L.Lock()
@ -312,7 +314,7 @@ func (sc *ShardCluster) transferSegment(old *shardSegmentInfo, evt segmentEvent)
// removeSegment removes segment from cluster
// should only applied in hand-off or load balance procedure
func (sc *ShardCluster) removeSegment(evt segmentEvent) {
log.Debug("ShardCluster remove segment", zap.Int64("nodeID", evt.nodeID), zap.Int64("segmentID", evt.segmentID), zap.Int32("state", int32(evt.state)))
log.Info("ShardCluster remove segment", zap.Int64("nodeID", evt.nodeID), zap.Int64("segmentID", evt.segmentID), zap.Int32("state", int32(evt.state)))
sc.mut.Lock()
defer sc.mut.Unlock()
@ -680,6 +682,7 @@ func (sc *ShardCluster) Search(ctx context.Context, req *querypb.SearchRequest)
wg.Wait()
if err != nil {
log.Error(err.Error())
return nil, err
}
@ -735,6 +738,7 @@ func (sc *ShardCluster) Query(ctx context.Context, req *querypb.QueryRequest) ([
wg.Wait()
if err != nil {
log.Error(err.Error())
return nil, err
}

View File

@ -8,11 +8,13 @@ import (
"sync"
grpcquerynodeclient "github.com/milvus-io/milvus/internal/distributed/querynode/client"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)
const (
@ -75,6 +77,7 @@ func (s *ShardClusterService) addShardCluster(collectionID, replicaID int64, vch
})
s.clusters.Store(vchannelName, cs)
log.Info("successfully add shard cluster", zap.Int64("collectionID", collectionID), zap.Int64("replica", replicaID), zap.String("vchan", vchannelName))
}
// getShardCluster gets shardCluster of specified vchannel if exists.
@ -107,6 +110,7 @@ func (s *ShardClusterService) releaseCollection(collectionID int64) {
}
return true
})
log.Info("successfully release collection", zap.Int64("collectionID", collectionID))
}
// HandoffSegments dispatch segmentChangeInfo to related shardClusters
@ -124,6 +128,7 @@ func (s *ShardClusterService) HandoffSegments(collectionID int64, info *querypb.
return true
})
wg.Wait()
log.Info("successfully handoff segments", zap.Int64("collectionID", collectionID))
}
// SyncReplicaSegments dispatches nodeID segments distribution to ShardCluster.
@ -134,7 +139,7 @@ func (s *ShardClusterService) SyncReplicaSegments(vchannelName string, distribut
}
sc.SyncSegments(distribution, segmentStateLoaded)
log.Info("successfully sync segments", zap.String("channel", vchannelName), zap.Any("distribution", distribution))
return nil
}
@ -146,5 +151,11 @@ func (s *ShardClusterService) HandoffVChannelSegments(vchannel string, info *que
return nil
}
sc := raw.(*ShardCluster)
return sc.HandoffSegments(info)
err := sc.HandoffSegments(info)
if err != nil {
log.Info("successfully handoff ", zap.String("channel", vchannel), zap.Any("segment", info))
} else {
log.Warn("failed to handoff", zap.String("channel", vchannel), zap.Any("segment", info), zap.Error(err))
}
return err
}

View File

@ -65,7 +65,7 @@ func (nd *etcdShardNodeDetector) Close() {
// watchNodes lists current online nodes and returns a channel for incoming events.
func (nd *etcdShardNodeDetector) watchNodes(collectionID int64, replicaID int64, vchannelName string) ([]nodeEvent, <-chan nodeEvent) {
log.Debug("nodeDetector watch", zap.Int64("collectionID", collectionID), zap.Int64("replicaID", replicaID), zap.String("vchannelName", vchannelName))
log.Info("nodeDetector watch", zap.Int64("collectionID", collectionID), zap.Int64("replicaID", replicaID), zap.String("vchannelName", vchannelName))
resp, err := nd.client.Get(context.Background(), nd.path, clientv3.WithPrefix())
if err != nil {
log.Warn("Etcd NodeDetector get replica info failed", zap.Error(err))
@ -74,7 +74,7 @@ func (nd *etcdShardNodeDetector) watchNodes(collectionID int64, replicaID int64,
idAddr, err := nd.idAddr()
if err != nil {
log.Warn("Etcd NodeDetector session map failed", zap.Error(err))
log.Error("Etcd NodeDetector session map failed", zap.Error(err))
panic(err)
}
@ -121,7 +121,6 @@ func (nd *etcdShardNodeDetector) cancelClose(cancel func()) {
}
func (nd *etcdShardNodeDetector) watch(ch clientv3.WatchChan, collectionID, replicaID int64) {
log.Debug("etcdNodeDetector start watch")
defer nd.wg.Done()
for {
select {
@ -171,7 +170,7 @@ func (nd *etcdShardNodeDetector) handlePutEvent(e *clientv3.Event, collectionID,
idAddr, err := nd.idAddr()
if err != nil {
log.Warn("Etcd NodeDetector session map failed", zap.Error(err))
log.Error("Etcd NodeDetector session map failed", zap.Error(err))
panic(err)
}
// all node is added
@ -247,7 +246,7 @@ func (nd *etcdShardNodeDetector) handleDelEvent(e *clientv3.Event, collectionID,
}
idAddr, err := nd.idAddr()
if err != nil {
log.Warn("Etcd NodeDetector session map failed", zap.Error(err))
log.Error("Etcd NodeDetector session map failed", zap.Error(err))
panic(err)
}
for _, id := range prevInfo.GetNodeIds() {

View File

@ -71,13 +71,13 @@ func (sd *etcdShardSegmentDetector) getCtx() context.Context {
}
func (sd *etcdShardSegmentDetector) watchSegments(collectionID int64, replicaID int64, vchannelName string) ([]segmentEvent, <-chan segmentEvent) {
log.Debug("segmentDetector start watch", zap.Int64("collectionID", collectionID),
log.Info("segmentDetector start watch", zap.Int64("collectionID", collectionID),
zap.Int64("replicaID", replicaID),
zap.String("vchannelName", vchannelName),
zap.String("rootPath", sd.path))
resp, err := sd.client.Get(context.Background(), sd.path, clientv3.WithPrefix())
if err != nil {
log.Warn("Etcd SegmentDetector get replica info failed", zap.Error(err))
log.Error("Etcd SegmentDetector get replica info failed", zap.Error(err))
panic(err)
}
@ -112,7 +112,6 @@ func (sd *etcdShardSegmentDetector) watchSegments(collectionID int64, replicaID
func (sd *etcdShardSegmentDetector) watch(ch clientv3.WatchChan, collectionID int64, replicaID int64, vchannel string) {
defer sd.wg.Done()
log.Debug("etcdSegmentDetector start watch")
for {
select {
case <-sd.closeCh:

View File

@ -54,7 +54,7 @@ func (sService *statsService) start() {
statsStream, _ := sService.msFactory.NewMsgStream(sService.ctx)
statsStream.AsProducer(producerChannels)
log.Debug("QueryNode statsService AsProducer succeed", zap.Strings("channels", producerChannels))
log.Info("QueryNode statsService AsProducer succeed", zap.Strings("channels", producerChannels))
var statsMsgStream msgstream.MsgStream = statsStream
@ -65,7 +65,7 @@ func (sService *statsService) start() {
for {
select {
case <-sService.ctx.Done():
log.Debug("stats service closed")
log.Info("stats service closed")
return
case <-time.After(time.Duration(sleepTimeInterval) * time.Millisecond):
sService.publicStatistic(nil)
@ -103,6 +103,6 @@ func (sService *statsService) publicStatistic(fieldStats []*internalpb.FieldStat
}
err := sService.statsStream.Produce(&msgPack)
if err != nil {
log.Error(err.Error())
log.Warn("failed to publish stats", zap.Error(err))
}
}

View File

@ -129,6 +129,7 @@ func (s *streaming) search(searchReqs []*searchRequest, collID UniqueID, partIDs
return searchResults, searchSegmentIDs, searchPartIDs, nil
}
if err != nil {
log.Error(err.Error())
return searchResults, searchSegmentIDs, searchPartIDs, err
}
log.Debug("no partition specified, search all partitions",

View File

@ -128,7 +128,7 @@ func (r *addQueryChannelTask) PreExecute(ctx context.Context) error {
}
func (r *addQueryChannelTask) Execute(ctx context.Context) error {
log.Debug("Execute addQueryChannelTask",
log.Info("Execute addQueryChannelTask",
zap.Any("collectionID", r.req.CollectionID))
collectionID := r.req.CollectionID
@ -137,7 +137,7 @@ func (r *addQueryChannelTask) Execute(ctx context.Context) error {
}
qc := r.node.queryShardService.getQueryChannel(collectionID)
log.Debug("add query channel for collection", zap.Int64("collectionID", collectionID))
log.Info("add query channel for collection", zap.Int64("collectionID", collectionID))
consumeSubName := funcutil.GenChannelSubName(Params.CommonCfg.QueryNodeSubName, collectionID, Params.QueryNodeCfg.GetNodeID())
@ -154,9 +154,7 @@ func (r *addQueryChannelTask) Execute(ctx context.Context) error {
}*/
qc.Start()
log.Debug("start query channel", zap.Int64("collectionID", collectionID))
log.Debug("addQueryChannelTask done",
log.Info("addQueryChannelTask done",
zap.Any("collectionID", r.req.CollectionID),
)
return nil
@ -218,7 +216,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
return errors.New("get physical channels failed, illegal channel length, collectionID = " + fmt.Sprintln(collectionID))
}
log.Debug("Starting WatchDmChannels ...",
log.Info("Starting WatchDmChannels ...",
zap.String("collectionName", w.req.Schema.Name),
zap.Int64("collectionID", collectionID),
zap.Int64("replicaID", w.req.GetReplicaID()),
@ -277,15 +275,16 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
w.node.streaming.replica.addPartition(collectionID, partitionID)
}
log.Debug("loading growing segments in WatchDmChannels...",
log.Info("loading growing segments in WatchDmChannels...",
zap.Int64("collectionID", collectionID),
zap.Int64s("unFlushedSegmentIDs", unFlushedSegmentIDs),
)
err := w.node.loader.loadSegment(req, segmentTypeGrowing)
if err != nil {
log.Warn(err.Error())
return err
}
log.Debug("successfully load growing segments done in WatchDmChannels",
log.Info("successfully load growing segments done in WatchDmChannels",
zap.Int64("collectionID", collectionID),
zap.Int64s("unFlushedSegmentIDs", unFlushedSegmentIDs),
)
@ -312,7 +311,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
info.SeekPosition.MsgGroup = consumeSubName
channel2SeekPosition[info.ChannelName] = info.SeekPosition
}
log.Debug("watchDMChannel, group channels done", zap.Int64("collectionID", collectionID))
log.Info("watchDMChannel, group channels done", zap.Int64("collectionID", collectionID))
// add excluded segments for unFlushed segments,
// unFlushed segments before check point should be filtered out.
@ -325,7 +324,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
for i := 0; i < len(unFlushedCheckPointInfos); i++ {
unflushedSegmentIDs = append(unflushedSegmentIDs, unFlushedCheckPointInfos[i].GetID())
}
log.Debug("watchDMChannel, add check points info for unFlushed segments done",
log.Info("watchDMChannel, add check points info for unFlushed segments done",
zap.Int64("collectionID", collectionID),
zap.Any("unflushedSegmentIDs", unflushedSegmentIDs),
)
@ -345,7 +344,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
}
}
w.node.streaming.replica.addExcludedSegments(collectionID, flushedCheckPointInfos)
log.Debug("watchDMChannel, add check points info for flushed segments done",
log.Info("watchDMChannel, add check points info for flushed segments done",
zap.Int64("collectionID", collectionID),
zap.Any("flushedCheckPointInfos", flushedCheckPointInfos),
)
@ -365,7 +364,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
}
}
w.node.streaming.replica.addExcludedSegments(collectionID, droppedCheckPointInfos)
log.Debug("watchDMChannel, add check points info for dropped segments done",
log.Info("watchDMChannel, add check points info for dropped segments done",
zap.Int64("collectionID", collectionID),
zap.Any("droppedCheckPointInfos", droppedCheckPointInfos),
)
@ -376,7 +375,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
log.Warn("watchDMChannel, add flowGraph for dmChannels failed", zap.Int64("collectionID", collectionID), zap.Strings("vChannels", vChannels), zap.Error(err))
return err
}
log.Debug("Query node add DML flow graphs", zap.Int64("collectionID", collectionID), zap.Any("channels", vChannels))
log.Info("Query node add DML flow graphs", zap.Int64("collectionID", collectionID), zap.Any("channels", vChannels))
// channels as consumer
for channel, fg := range channel2FlowGraph {
@ -414,7 +413,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
return err
}
log.Debug("watchDMChannel, add flowGraph for dmChannels success", zap.Int64("collectionID", collectionID), zap.Strings("vChannels", vChannels))
log.Info("watchDMChannel, add flowGraph for dmChannels success", zap.Int64("collectionID", collectionID), zap.Strings("vChannels", vChannels))
sCol.addVChannels(vChannels)
sCol.addPChannels(pChannels)
@ -423,7 +422,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
hCol.addVChannels(vChannels)
hCol.addPChannels(pChannels)
hCol.setLoadType(lType)
log.Debug("watchDMChannel, init replica done", zap.Int64("collectionID", collectionID), zap.Strings("vChannels", vChannels))
log.Info("watchDMChannel, init replica done", zap.Int64("collectionID", collectionID), zap.Strings("vChannels", vChannels))
// create tSafe
for _, channel := range vChannels {
@ -452,7 +451,7 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
fg.flowGraph.Start()
}
log.Debug("WatchDmChannels done", zap.Int64("collectionID", collectionID), zap.Strings("vChannels", vChannels))
log.Info("WatchDmChannels done", zap.Int64("collectionID", collectionID), zap.Strings("vChannels", vChannels))
return nil
}
@ -498,7 +497,7 @@ func (w *watchDeltaChannelsTask) Execute(ctx context.Context) error {
VPDeltaChannels[v] = p
vChannel2SeekPosition[v] = info.SeekPosition
}
log.Debug("Starting WatchDeltaChannels ...",
log.Info("Starting WatchDeltaChannels ...",
zap.Any("collectionID", collectionID),
zap.Any("vDeltaChannels", vDeltaChannels),
zap.Any("pChannels", pDeltaChannels),
@ -506,7 +505,7 @@ func (w *watchDeltaChannelsTask) Execute(ctx context.Context) error {
if len(VPDeltaChannels) != len(vDeltaChannels) {
return errors.New("get physical channels failed, illegal channel length, collectionID = " + fmt.Sprintln(collectionID))
}
log.Debug("Get physical channels done",
log.Info("Get physical channels done",
zap.Any("collectionID", collectionID),
)
@ -559,7 +558,7 @@ func (w *watchDeltaChannelsTask) Execute(ctx context.Context) error {
return err
}
log.Debug("watchDeltaChannel, add flowGraph for deltaChannel success", zap.Int64("collectionID", collectionID), zap.Strings("vDeltaChannels", vDeltaChannels))
log.Info("watchDeltaChannel, add flowGraph for deltaChannel success", zap.Int64("collectionID", collectionID), zap.Strings("vDeltaChannels", vDeltaChannels))
//set collection replica
hCol.addVDeltaChannels(vDeltaChannels)
@ -600,7 +599,7 @@ func (w *watchDeltaChannelsTask) Execute(ctx context.Context) error {
fg.flowGraph.Start()
}
log.Debug("WatchDeltaChannels done", zap.Int64("collectionID", collectionID), zap.String("ChannelIDs", fmt.Sprintln(vDeltaChannels)))
log.Info("WatchDeltaChannels done", zap.Int64("collectionID", collectionID), zap.String("ChannelIDs", fmt.Sprintln(vDeltaChannels)))
return nil
}
@ -632,7 +631,7 @@ func (l *loadSegmentsTask) PreExecute(ctx context.Context) error {
func (l *loadSegmentsTask) Execute(ctx context.Context) error {
// TODO: support db
log.Debug("LoadSegment start", zap.Int64("msgID", l.req.Base.MsgID))
log.Info("LoadSegment start", zap.Int64("msgID", l.req.Base.MsgID))
var err error
// init meta
@ -656,7 +655,7 @@ func (l *loadSegmentsTask) Execute(ctx context.Context) error {
return err
}
log.Debug("LoadSegments done", zap.Int64("msgID", l.req.Base.MsgID))
log.Info("LoadSegments done", zap.Int64("msgID", l.req.Base.MsgID))
return nil
}
@ -695,12 +694,11 @@ const (
)
func (r *releaseCollectionTask) Execute(ctx context.Context) error {
log.Debug("Execute release collection task", zap.Any("collectionID", r.req.CollectionID))
log.Debug("release streaming", zap.Any("collectionID", r.req.CollectionID))
log.Info("Execute release collection task", zap.Any("collectionID", r.req.CollectionID))
// sleep to wait for query tasks done
const gracefulReleaseTime = 1
time.Sleep(gracefulReleaseTime * time.Second)
log.Debug("Starting release collection...",
log.Info("Starting release collection...",
zap.Any("collectionID", r.req.CollectionID),
)
@ -712,7 +710,7 @@ func (r *releaseCollectionTask) Execute(ctx context.Context) error {
}
// remove collection metas in streaming and historical
log.Debug("release historical", zap.Any("collectionID", r.req.CollectionID))
log.Info("release historical", zap.Any("collectionID", r.req.CollectionID))
err = r.releaseReplica(r.node.historical.replica, replicaHistorical)
if err != nil {
return fmt.Errorf("release collection failed, collectionID = %d, err = %s", r.req.CollectionID, err)
@ -720,7 +718,7 @@ func (r *releaseCollectionTask) Execute(ctx context.Context) error {
debug.FreeOSMemory()
log.Debug("ReleaseCollection done", zap.Int64("collectionID", r.req.CollectionID))
log.Info("ReleaseCollection done", zap.Int64("collectionID", r.req.CollectionID))
return nil
}
@ -730,7 +728,7 @@ func (r *releaseCollectionTask) releaseReplica(replica ReplicaInterface, replica
return err
}
// set release time
log.Debug("set release time", zap.Any("collectionID", r.req.CollectionID))
log.Info("set release time", zap.Any("collectionID", r.req.CollectionID))
collection.setReleaseTime(r.req.Base.Timestamp)
// remove all flow graphs of the target collection
@ -746,7 +744,7 @@ func (r *releaseCollectionTask) releaseReplica(replica ReplicaInterface, replica
// remove all tSafes of the target collection
for _, channel := range channels {
log.Debug("Releasing tSafe in releaseCollectionTask...",
log.Info("Releasing tSafe in releaseCollectionTask...",
zap.Any("collectionID", r.req.CollectionID),
zap.Any("vDeltaChannel", channel),
)
@ -789,7 +787,7 @@ func (r *releasePartitionsTask) PreExecute(ctx context.Context) error {
}
func (r *releasePartitionsTask) Execute(ctx context.Context) error {
log.Debug("Execute release partition task",
log.Info("Execute release partition task",
zap.Any("collectionID", r.req.CollectionID),
zap.Any("partitionIDs", r.req.PartitionIDs))
@ -806,7 +804,7 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error {
if err != nil {
return fmt.Errorf("release partitions failed, collectionID = %d, err = %s", r.req.CollectionID, err)
}
log.Debug("start release partition", zap.Any("collectionID", r.req.CollectionID))
log.Info("start release partition", zap.Any("collectionID", r.req.CollectionID))
for _, id := range r.req.PartitionIDs {
// remove partition from streaming and historical
@ -828,7 +826,7 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error {
}
}
log.Debug("Release partition task done",
log.Info("Release partition task done",
zap.Any("collectionID", r.req.CollectionID),
zap.Any("partitionIDs", r.req.PartitionIDs))
return nil

View File

@ -141,7 +141,7 @@ func (queue *baseTaskQueue) PopActiveTask(tID UniqueID) task {
delete(queue.activeTasks, tID)
return t
}
log.Debug("queryNode", zap.Int64("cannot found ID in the active task list!", tID))
log.Info("queryNode", zap.Int64("cannot found ID in the active task list!", tID))
return nil
}

View File

@ -77,11 +77,11 @@ func (t *tSafeReplica) addTSafe(vChannel Channel) {
defer t.mu.Unlock()
if _, ok := t.tSafes[vChannel]; !ok {
t.tSafes[vChannel] = newTSafe(vChannel)
log.Debug("add tSafe done",
log.Info("add tSafe done",
zap.String("channel", vChannel),
)
} else {
log.Debug("tSafe has been existed",
log.Info("tSafe has been existed",
zap.String("channel", vChannel),
)
}
@ -91,7 +91,7 @@ func (t *tSafeReplica) removeTSafe(vChannel Channel) {
t.mu.Lock()
defer t.mu.Unlock()
log.Debug("remove tSafe replica",
log.Info("remove tSafe replica",
zap.String("vChannel", vChannel),
)
delete(t.tSafes, vChannel)