mirror of https://github.com/milvus-io/milvus.git
Fix the misleading log messages (#22606)
Signed-off-by: yangxuan <xuan.yang@zilliz.com>pull/22645/head
parent
11f1f4226a
commit
b3287ca5ec
|
@ -18,7 +18,6 @@ package querynode
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
@ -133,18 +132,21 @@ func (l *loadSegmentsTask) Execute(ctx context.Context) error {
|
|||
}
|
||||
|
||||
// internal helper function to subscribe delta channel
|
||||
func (l *loadSegmentsTask) watchDeltaChannel(deltaChannels []string) error {
|
||||
collectionID := l.req.CollectionID
|
||||
func (l *loadSegmentsTask) watchDeltaChannel(dmlChannels []string) error {
|
||||
var (
|
||||
collectionID = l.req.CollectionID
|
||||
vDeltaChannels []string
|
||||
VPDeltaChannels = make(map[string]string)
|
||||
)
|
||||
log := log.With(
|
||||
zap.Int64("collectionID", collectionID),
|
||||
zap.Strings("dmlChannels", dmlChannels),
|
||||
)
|
||||
var vDeltaChannels []string
|
||||
VPDeltaChannels := make(map[string]string)
|
||||
for _, v := range deltaChannels {
|
||||
for _, v := range dmlChannels {
|
||||
dc, err := funcutil.ConvertChannelName(v, Params.CommonCfg.RootCoordDml.GetValue(), Params.CommonCfg.RootCoordDelta.GetValue())
|
||||
if err != nil {
|
||||
log.Warn("watchDeltaChannels, failed to convert deltaChannel from dmlChannel",
|
||||
zap.String("DmlChannel", v),
|
||||
zap.String("dmlChannel", v),
|
||||
zap.Error(err),
|
||||
)
|
||||
return err
|
||||
|
@ -153,9 +155,7 @@ func (l *loadSegmentsTask) watchDeltaChannel(deltaChannels []string) error {
|
|||
vDeltaChannels = append(vDeltaChannels, dc)
|
||||
VPDeltaChannels[dc] = p
|
||||
}
|
||||
log.Info("Starting WatchDeltaChannels ...",
|
||||
zap.Strings("channels", vDeltaChannels),
|
||||
)
|
||||
log.Info("Starting WatchDeltaChannels ...", zap.Strings("deltaChannels", vDeltaChannels))
|
||||
|
||||
coll, err := l.node.metaReplica.getCollectionByID(collectionID)
|
||||
if err != nil {
|
||||
|
@ -165,14 +165,14 @@ func (l *loadSegmentsTask) watchDeltaChannel(deltaChannels []string) error {
|
|||
// add collection meta and fg with mutex protection.
|
||||
channel2FlowGraph, err := l.node.dataSyncService.addFlowGraphsForDeltaChannels(collectionID, vDeltaChannels, VPDeltaChannels)
|
||||
if err != nil {
|
||||
log.Warn("watchDeltaChannel, add flowGraph for deltaChannel failed", zap.Int64("collectionID", collectionID), zap.Strings("vDeltaChannels", vDeltaChannels), zap.Error(err))
|
||||
log.Warn("watchDeltaChannel, failed to add flowGraph for deltaChannels",
|
||||
zap.Strings("deltaChannels", vDeltaChannels),
|
||||
zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
if len(channel2FlowGraph) == 0 {
|
||||
log.Warn("all delta channels have been added before",
|
||||
zap.Strings("deltaChannels", deltaChannels),
|
||||
)
|
||||
log.Warn("all delta channels have been added before", zap.Strings("deltaChannels", vDeltaChannels))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -189,7 +189,7 @@ func (l *loadSegmentsTask) watchDeltaChannel(deltaChannels []string) error {
|
|||
}
|
||||
}()
|
||||
|
||||
log.Info("watchDeltaChannel, add flowGraph for deltaChannel success", zap.Int64("collectionID", collectionID), zap.Strings("vDeltaChannels", vDeltaChannels))
|
||||
log.Info("watchDeltaChannel, add flowGraph for deltaChannel success", zap.Strings("deltaChannels", vDeltaChannels))
|
||||
|
||||
// create tSafe
|
||||
for _, channel := range vDeltaChannels {
|
||||
|
@ -200,12 +200,15 @@ func (l *loadSegmentsTask) watchDeltaChannel(deltaChannels []string) error {
|
|||
for _, channel := range vDeltaChannels {
|
||||
dmlChannel, err := funcutil.ConvertChannelName(channel, Params.CommonCfg.RootCoordDelta.GetValue(), Params.CommonCfg.RootCoordDml.GetValue())
|
||||
if err != nil {
|
||||
log.Error("failed to convert delta channel to dml", zap.String("channel", channel), zap.Error(err))
|
||||
log.Error("failed to convert delta channel to dml", zap.String("deltaChannel", channel), zap.Error(err))
|
||||
panic(err)
|
||||
}
|
||||
err = l.node.queryShardService.addQueryShard(collectionID, dmlChannel, l.req.GetReplicaID())
|
||||
if err != nil {
|
||||
log.Error("failed to add shard Service to query shard", zap.String("channel", channel), zap.Error(err))
|
||||
log.Error("failed to add shard Service to query shard",
|
||||
zap.String("dmlChannel", dmlChannel),
|
||||
zap.String("deltaChannel", channel),
|
||||
zap.Error(err))
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
@ -215,6 +218,6 @@ func (l *loadSegmentsTask) watchDeltaChannel(deltaChannels []string) error {
|
|||
fg.flowGraph.Start()
|
||||
}
|
||||
|
||||
log.Info("WatchDeltaChannels done", zap.Int64("collectionID", collectionID), zap.String("ChannelIDs", fmt.Sprintln(vDeltaChannels)))
|
||||
log.Info("WatchDeltaChannels done", zap.Strings("deltaChannels", vDeltaChannels))
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue