Refactor logs in proxy package. (#24936)

Signed-off-by: Enwei Jiao <enwei.jiao@zilliz.com>
pull/24999/head
Enwei Jiao 2023-06-19 13:28:41 +08:00 committed by GitHub
parent e9f1515a0e
commit d143682d7d
12 changed files with 85 additions and 148 deletions
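
The diff below repeatedly applies the same handful of logging conventions: drop per-call traceID fields in favor of ctx-aware loggers (log.Ctx(ctx)), prefer typed fields such as zap.String over zap.Any, keep zap.Error(err) as the last field, downgrade recoverable request errors from Error/Debug to Warn, and delete noisy or commented-out debug logs. Below is a minimal before/after sketch of that shape, written against plain go.uber.org/zap rather than the Milvus log wrapper; the collection name and error value are hypothetical.

// Illustrative only: shows the field-typing, level, and ordering conventions
// this commit applies, using plain zap instead of the proxy's log package.
package main

import (
	"errors"

	"go.uber.org/zap"
)

func main() {
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()

	collectionName := "demo_collection" // hypothetical value
	err := errors.New("collection not found")

	// Before: untyped field, Error-level noise, zap.Error placed first.
	logger.Error("get collection schema failed",
		zap.Error(err),
		zap.Any("collectionName", collectionName))

	// After: typed field, Warn for a recoverable request error, zap.Error last.
	logger.Warn("get collection schema failed",
		zap.String("collectionName", collectionName),
		zap.Error(err))
}
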

View File

@@ -372,7 +372,7 @@ func (m *MetaCache) GetPartitions(ctx context.Context, collectionName string) (m
 return nil, err
 }
 metrics.ProxyUpdateCacheLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(tr.ElapseSpan().Milliseconds()))
-log.Debug("proxy", zap.Any("GetPartitions:partitions after update", partitions), zap.Any("collectionName", collectionName))
+log.Debug("proxy", zap.Any("GetPartitions:partitions after update", partitions), zap.String("collectionName", collectionName))
 ret := make(map[string]typeutil.UniqueID)
 partInfo := m.collInfo[collectionName].partInfo
 for k, v := range partInfo {
@@ -414,7 +414,7 @@ func (m *MetaCache) GetPartitionInfo(ctx context.Context, collectionName string,
 return nil, err
 }
 metrics.ProxyUpdateCacheLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(tr.ElapseSpan().Milliseconds()))
-log.Debug("proxy", zap.Any("GetPartitionID:partitions after update", partitions), zap.Any("collectionName", collectionName))
+log.Debug("proxy", zap.Any("GetPartitionID:partitions after update", partitions), zap.String("collectionName", collectionName))
 partInfo, ok = m.collInfo[collectionName].partInfo[partitionName]
 if !ok {
 return nil, merr.WrapErrPartitionNotFound(partitionName)

View File

@@ -259,16 +259,6 @@ func setRateGaugeByRateType(rateType internalpb.RateType, nodeID int64, collecti
 }
 }
-// printRates logs the rate info.
-func (rl *rateLimiter) printRates(rates []*internalpb.Rate) {
-//fmt.Printf("RateLimiter set rates:\n---------------------------------\n")
-//for _, r := range rates {
-// fmt.Printf("%s -> %v\n", r.GetRt().String(), r.GetR())
-//}
-//fmt.Printf("---------------------------------\n")
-log.Debug("RateLimiter setRates", zap.Any("rates", rates))
-}
 // registerLimiters register limiter for all rate types.
 func (rl *rateLimiter) registerLimiters(globalLevel bool) {
 log := log.Ctx(context.TODO()).WithRateGroup("proxy.rateLimiter", 1.0, 60.0)

View File

@@ -68,7 +68,7 @@ func PrivilegeInterceptor(ctx context.Context, req interface{}) (context.Context
 log.Debug("PrivilegeInterceptor", zap.String("type", reflect.TypeOf(req).String()))
 privilegeExt, err := funcutil.GetPrivilegeExtObj(req)
 if err != nil {
-log.Debug("GetPrivilegeExtObj err", zap.Error(err))
+log.Warn("GetPrivilegeExtObj err", zap.Error(err))
 return ctx, nil
 }
 username, err := GetCurUserFromContext(ctx)
@@ -96,7 +96,7 @@ func PrivilegeInterceptor(ctx context.Context, req interface{}) (context.Context
 objectPrivilege := privilegeExt.ObjectPrivilege.String()
 policyInfo := strings.Join(globalMetaCache.GetPrivilegeInfo(ctx), ",")
-logWithCurrentRequestInfo := log.With(zap.String("username", username), zap.Strings("role_names", roleNames),
+log := log.With(zap.String("username", username), zap.Strings("role_names", roleNames),
 zap.String("object_type", objectType), zap.String("object_privilege", objectPrivilege),
 zap.Int32("object_index", objectNameIndex), zap.String("object_name", objectName),
 zap.Int32("object_indexs", objectNameIndexs), zap.Strings("object_names", objectNames),
@@ -109,7 +109,7 @@ func PrivilegeInterceptor(ctx context.Context, req interface{}) (context.Context
 casbinModel := templateModel.Copy()
 e, err := casbin.NewEnforcer(casbinModel, a)
 if err != nil {
-logWithCurrentRequestInfo.Warn("NewEnforcer fail", zap.String("policy", policy), zap.Error(err))
+log.Warn("NewEnforcer fail", zap.String("policy", policy), zap.Error(err))
 return ctx, err
 }
 for _, roleName := range roleNames {
@@ -126,7 +126,7 @@ func PrivilegeInterceptor(ctx context.Context, req interface{}) (context.Context
 // handle the api which refers one resource
 permitObject, err := permitFunc(objectName)
 if err != nil {
-logWithCurrentRequestInfo.Warn("fail to execute permit func", zap.String("name", objectName), zap.Error(err))
+log.Warn("fail to execute permit func", zap.String("name", objectName), zap.Error(err))
 return ctx, err
 }
 if permitObject {
@@ -140,7 +140,7 @@ func PrivilegeInterceptor(ctx context.Context, req interface{}) (context.Context
 for _, name := range objectNames {
 p, err := permitFunc(name)
 if err != nil {
-logWithCurrentRequestInfo.Warn("fail to execute permit func", zap.String("name", name), zap.Error(err))
+log.Warn("fail to execute permit func", zap.String("name", name), zap.Error(err))
 return ctx, err
 }
 if !p {
@@ -154,7 +154,7 @@ func PrivilegeInterceptor(ctx context.Context, req interface{}) (context.Context
 }
 }
-logWithCurrentRequestInfo.Info("permission deny", zap.String("policy", policy), zap.Strings("roles", roleNames))
+log.Info("permission deny", zap.String("policy", policy), zap.Strings("roles", roleNames))
 return ctx, status.Error(codes.PermissionDenied, fmt.Sprintf("%s: permission deny", objectPrivilege))
 }

View File

@@ -189,7 +189,6 @@ func (node *Proxy) Init() error {
 log.Info("init session for Proxy done")
 node.factory.Init(Params)
-log.Debug("init parameters for factory", zap.String("role", typeutil.ProxyRole), zap.Any("parameters", Params.GetAll()))
 accesslog.SetupAccseeLog(&Params.ProxyCfg.AccessLog, &Params.MinioCfg)
 log.Debug("init access log for Proxy done")
@@ -200,67 +199,58 @@ func (node *Proxy) Init() error {
 }
 log.Info("Proxy init rateCollector done", zap.Int64("nodeID", paramtable.GetNodeID()))
-log.Debug("create id allocator", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID()))
 idAllocator, err := allocator.NewIDAllocator(node.ctx, node.rootCoord, paramtable.GetNodeID())
 if err != nil {
 log.Warn("failed to create id allocator",
-zap.Error(err),
-zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID()))
+zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID()),
+zap.Error(err))
 return err
 }
 node.rowIDAllocator = idAllocator
 log.Debug("create id allocator done", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID()))
-log.Debug("create timestamp allocator", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID()))
 tsoAllocator, err := newTimestampAllocator(node.rootCoord, paramtable.GetNodeID())
 if err != nil {
 log.Warn("failed to create timestamp allocator",
-zap.Error(err),
-zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID()))
+zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID()),
+zap.Error(err))
 return err
 }
 node.tsoAllocator = tsoAllocator
 log.Debug("create timestamp allocator done", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID()))
-log.Debug("create segment id assigner", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID()))
 segAssigner, err := newSegIDAssigner(node.ctx, node.dataCoord, node.lastTick)
 if err != nil {
 log.Warn("failed to create segment id assigner",
-zap.Error(err),
-zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID()))
+zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID()),
+zap.Error(err))
 return err
 }
 node.segAssigner = segAssigner
 node.segAssigner.PeerID = paramtable.GetNodeID()
 log.Debug("create segment id assigner done", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID()))
-log.Debug("create channels manager", zap.String("role", typeutil.ProxyRole))
 dmlChannelsFunc := getDmlChannelsFunc(node.ctx, node.rootCoord)
 chMgr := newChannelsMgrImpl(dmlChannelsFunc, defaultInsertRepackFunc, node.factory)
 node.chMgr = chMgr
 log.Debug("create channels manager done", zap.String("role", typeutil.ProxyRole))
-log.Debug("create task scheduler", zap.String("role", typeutil.ProxyRole))
 node.sched, err = newTaskScheduler(node.ctx, node.tsoAllocator, node.factory)
 if err != nil {
-log.Warn("failed to create task scheduler", zap.Error(err), zap.String("role", typeutil.ProxyRole))
+log.Warn("failed to create task scheduler", zap.String("role", typeutil.ProxyRole), zap.Error(err))
 return err
 }
 log.Debug("create task scheduler done", zap.String("role", typeutil.ProxyRole))
 syncTimeTickInterval := Params.ProxyCfg.TimeTickInterval.GetAsDuration(time.Millisecond) / 2
-log.Debug("create channels time ticker",
-zap.String("role", typeutil.ProxyRole), zap.Duration("syncTimeTickInterval", syncTimeTickInterval))
 node.chTicker = newChannelsTimeTicker(node.ctx, Params.ProxyCfg.TimeTickInterval.GetAsDuration(time.Millisecond)/2, []string{}, node.sched.getPChanStatistics, tsoAllocator)
-log.Debug("create channels time ticker done", zap.String("role", typeutil.ProxyRole))
+log.Debug("create channels time ticker done", zap.String("role", typeutil.ProxyRole), zap.Duration("syncTimeTickInterval", syncTimeTickInterval))
-log.Debug("create metrics cache manager", zap.String("role", typeutil.ProxyRole))
 node.metricsCacheManager = metricsinfo.NewMetricsCacheManager()
 log.Debug("create metrics cache manager done", zap.String("role", typeutil.ProxyRole))
-log.Debug("init meta cache", zap.String("role", typeutil.ProxyRole))
 if err := InitMetaCache(node.ctx, node.rootCoord, node.queryCoord, node.shardMgr); err != nil {
-log.Warn("failed to init meta cache", zap.Error(err), zap.String("role", typeutil.ProxyRole))
+log.Warn("failed to init meta cache", zap.String("role", typeutil.ProxyRole), zap.Error(err))
 return err
 }
 log.Debug("init meta cache done", zap.String("role", typeutil.ProxyRole))
@@ -351,30 +341,26 @@ func (node *Proxy) sendChannelsTimeTickLoop() {
 // Start starts a proxy node.
 func (node *Proxy) Start() error {
-log.Debug("start task scheduler", zap.String("role", typeutil.ProxyRole))
 if err := node.sched.Start(); err != nil {
-log.Warn("failed to start task scheduler", zap.Error(err), zap.String("role", typeutil.ProxyRole))
+log.Warn("failed to start task scheduler", zap.String("role", typeutil.ProxyRole), zap.Error(err))
 return err
 }
 log.Debug("start task scheduler done", zap.String("role", typeutil.ProxyRole))
-log.Debug("start id allocator", zap.String("role", typeutil.ProxyRole))
 if err := node.rowIDAllocator.Start(); err != nil {
-log.Warn("failed to start id allocator", zap.Error(err), zap.String("role", typeutil.ProxyRole))
+log.Warn("failed to start id allocator", zap.String("role", typeutil.ProxyRole), zap.Error(err))
 return err
 }
 log.Debug("start id allocator done", zap.String("role", typeutil.ProxyRole))
-log.Debug("start segment id assigner", zap.String("role", typeutil.ProxyRole))
 if err := node.segAssigner.Start(); err != nil {
-log.Warn("failed to start segment id assigner", zap.Error(err), zap.String("role", typeutil.ProxyRole))
+log.Warn("failed to start segment id assigner", zap.String("role", typeutil.ProxyRole), zap.Error(err))
 return err
 }
 log.Debug("start segment id assigner done", zap.String("role", typeutil.ProxyRole))
-log.Debug("start channels time ticker", zap.String("role", typeutil.ProxyRole))
 if err := node.chTicker.start(); err != nil {
-log.Warn("failed to start channels time ticker", zap.Error(err), zap.String("role", typeutil.ProxyRole))
+log.Warn("failed to start channels time ticker", zap.String("role", typeutil.ProxyRole), zap.Error(err))
 return err
 }
 log.Debug("start channels time ticker done", zap.String("role", typeutil.ProxyRole))

View File

@@ -176,24 +176,21 @@ func (t *calcDistanceTask) Execute(ctx context.Context, request *milvuspb.CalcDi
 return t.arrangeVectorsByIntID(inputIds, dict, retrievedVectors)
 }
+log := log.Ctx(ctx)
 log.Debug("CalcDistance received",
-zap.String("traceID", t.traceID),
 zap.String("role", typeutil.ProxyRole),
 zap.String("metric", metric))
 vectorsLeft := request.GetOpLeft().GetDataArray()
 opLeft := request.GetOpLeft().GetIdArray()
 if opLeft != nil {
-log.Debug("OpLeft IdArray not empty, Get vectors by id",
-zap.String("traceID", t.traceID),
-zap.String("role", typeutil.ProxyRole))
+log.Debug("OpLeft IdArray not empty, Get vectors by id", zap.String("role", typeutil.ProxyRole))
 result, err := t.queryFunc(opLeft)
 if err != nil {
-log.Debug("Failed to get left vectors by id",
-zap.Error(err),
-zap.String("traceID", t.traceID),
-zap.String("role", typeutil.ProxyRole))
+log.Warn("Failed to get left vectors by id",
+zap.String("role", typeutil.ProxyRole),
+zap.Error(err))
 return &milvuspb.CalcDistanceResults{
 Status: &commonpb.Status{
@@ -204,15 +201,13 @@ func (t *calcDistanceTask) Execute(ctx context.Context, request *milvuspb.CalcDi
 }
 log.Debug("OpLeft IdArray not empty, Get vectors by id done",
-zap.String("traceID", t.traceID),
 zap.String("role", typeutil.ProxyRole))
 vectorsLeft, err = arrangeFunc(opLeft, result.FieldsData)
 if err != nil {
 log.Debug("Failed to re-arrange left vectors",
-zap.Error(err),
-zap.String("traceID", t.traceID),
-zap.String("role", typeutil.ProxyRole))
+zap.String("role", typeutil.ProxyRole),
+zap.Error(err))
 return &milvuspb.CalcDistanceResults{
 Status: &commonpb.Status{
@@ -223,14 +218,12 @@ func (t *calcDistanceTask) Execute(ctx context.Context, request *milvuspb.CalcDi
 }
 log.Debug("Re-arrange left vectors done",
-zap.String("traceID", t.traceID),
 zap.String("role", typeutil.ProxyRole))
 }
 if vectorsLeft == nil {
 msg := "Left vectors array is empty"
 log.Debug(msg,
-zap.String("traceID", t.traceID),
 zap.String("role", typeutil.ProxyRole))
 return &milvuspb.CalcDistanceResults{
@@ -245,15 +238,13 @@ func (t *calcDistanceTask) Execute(ctx context.Context, request *milvuspb.CalcDi
 opRight := request.GetOpRight().GetIdArray()
 if opRight != nil {
 log.Debug("OpRight IdArray not empty, Get vectors by id",
-zap.String("traceID", t.traceID),
 zap.String("role", typeutil.ProxyRole))
 result, err := t.queryFunc(opRight)
 if err != nil {
 log.Debug("Failed to get right vectors by id",
-zap.Error(err),
-zap.String("traceID", t.traceID),
-zap.String("role", typeutil.ProxyRole))
+zap.String("role", typeutil.ProxyRole),
+zap.Error(err))
 return &milvuspb.CalcDistanceResults{
 Status: &commonpb.Status{
@@ -264,15 +255,13 @@ func (t *calcDistanceTask) Execute(ctx context.Context, request *milvuspb.CalcDi
 }
 log.Debug("OpRight IdArray not empty, Get vectors by id done",
-zap.String("traceID", t.traceID),
 zap.String("role", typeutil.ProxyRole))
 vectorsRight, err = arrangeFunc(opRight, result.FieldsData)
 if err != nil {
 log.Debug("Failed to re-arrange right vectors",
-zap.Error(err),
-zap.String("traceID", t.traceID),
-zap.String("role", typeutil.ProxyRole))
+zap.String("role", typeutil.ProxyRole),
+zap.Error(err))
 return &milvuspb.CalcDistanceResults{
 Status: &commonpb.Status{
@@ -283,15 +272,12 @@ func (t *calcDistanceTask) Execute(ctx context.Context, request *milvuspb.CalcDi
 }
 log.Debug("Re-arrange right vectors done",
-zap.String("traceID", t.traceID),
 zap.String("role", typeutil.ProxyRole))
 }
 if vectorsRight == nil {
 msg := "Right vectors array is empty"
-log.Debug(msg,
-zap.String("traceID", t.traceID),
-zap.String("role", typeutil.ProxyRole))
+log.Warn(msg, zap.String("role", typeutil.ProxyRole))
 return &milvuspb.CalcDistanceResults{
 Status: &commonpb.Status{
@@ -303,9 +289,7 @@ func (t *calcDistanceTask) Execute(ctx context.Context, request *milvuspb.CalcDi
 if vectorsLeft.GetDim() != vectorsRight.GetDim() {
 msg := "Vectors dimension is not equal"
-log.Debug(msg,
-zap.String("traceID", t.traceID),
-zap.String("role", typeutil.ProxyRole))
+log.Debug(msg, zap.String("role", typeutil.ProxyRole))
 return &milvuspb.CalcDistanceResults{
 Status: &commonpb.Status{
@@ -318,14 +302,14 @@ func (t *calcDistanceTask) Execute(ctx context.Context, request *milvuspb.CalcDi
 if vectorsLeft.GetFloatVector() != nil && vectorsRight.GetFloatVector() != nil {
 distances, err := distance.CalcFloatDistance(vectorsLeft.GetDim(), vectorsLeft.GetFloatVector().GetData(), vectorsRight.GetFloatVector().GetData(), metric)
 if err != nil {
-log.Debug("Failed to CalcFloatDistance",
-zap.Error(err),
+log.Warn("Failed to CalcFloatDistance",
 zap.Int64("leftDim", vectorsLeft.GetDim()),
 zap.Int("leftLen", len(vectorsLeft.GetFloatVector().GetData())),
 zap.Int64("rightDim", vectorsRight.GetDim()),
 zap.Int("rightLen", len(vectorsRight.GetFloatVector().GetData())),
 zap.String("traceID", t.traceID),
-zap.String("role", typeutil.ProxyRole))
+zap.String("role", typeutil.ProxyRole),
+zap.Error(err))
 return &milvuspb.CalcDistanceResults{
 Status: &commonpb.Status{
@@ -336,7 +320,6 @@ func (t *calcDistanceTask) Execute(ctx context.Context, request *milvuspb.CalcDi
 }
 log.Debug("CalcFloatDistance done",
-zap.Error(err),
 zap.String("traceID", t.traceID),
 zap.String("role", typeutil.ProxyRole))
@@ -354,13 +337,12 @@ func (t *calcDistanceTask) Execute(ctx context.Context, request *milvuspb.CalcDi
 hamming, err := distance.CalcHammingDistance(vectorsLeft.GetDim(), vectorsLeft.GetBinaryVector(), vectorsRight.GetBinaryVector())
 if err != nil {
 log.Debug("Failed to CalcHammingDistance",
-zap.Error(err),
 zap.Int64("leftDim", vectorsLeft.GetDim()),
 zap.Int("leftLen", len(vectorsLeft.GetBinaryVector())),
 zap.Int64("rightDim", vectorsRight.GetDim()),
 zap.Int("rightLen", len(vectorsRight.GetBinaryVector())),
-zap.String("traceID", t.traceID),
-zap.String("role", typeutil.ProxyRole))
+zap.String("role", typeutil.ProxyRole),
+zap.Error(err))
 return &milvuspb.CalcDistanceResults{
 Status: &commonpb.Status{
@@ -371,9 +353,7 @@ func (t *calcDistanceTask) Execute(ctx context.Context, request *milvuspb.CalcDi
 }
 if metric == distance.HAMMING {
-log.Debug("CalcHammingDistance done",
-zap.String("traceID", t.traceID),
-zap.String("role", typeutil.ProxyRole))
+log.Debug("CalcHammingDistance done", zap.String("role", typeutil.ProxyRole))
 return &milvuspb.CalcDistanceResults{
 Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success, Reason: ""},
@@ -388,10 +368,9 @@ func (t *calcDistanceTask) Execute(ctx context.Context, request *milvuspb.CalcDi
 if metric == distance.TANIMOTO {
 tanimoto, err := distance.CalcTanimotoCoefficient(vectorsLeft.GetDim(), hamming)
 if err != nil {
-log.Debug("Failed to CalcTanimotoCoefficient",
-zap.Error(err),
-zap.String("traceID", t.traceID),
-zap.String("role", typeutil.ProxyRole))
+log.Warn("Failed to CalcTanimotoCoefficient",
+zap.String("role", typeutil.ProxyRole),
+zap.Error(err))
 return &milvuspb.CalcDistanceResults{
 Status: &commonpb.Status{
@@ -402,7 +381,6 @@ func (t *calcDistanceTask) Execute(ctx context.Context, request *milvuspb.CalcDi
 }
 log.Debug("CalcTanimotoCoefficient done",
-zap.String("traceID", t.traceID),
 zap.String("role", typeutil.ProxyRole))
 return &milvuspb.CalcDistanceResults{
@@ -421,10 +399,9 @@ func (t *calcDistanceTask) Execute(ctx context.Context, request *milvuspb.CalcDi
 err = errors.New("cannot calculate distance between binary vectors and float vectors")
 }
-log.Debug("Failed to CalcDistance",
-zap.Error(err),
-zap.String("traceID", t.traceID),
-zap.String("role", typeutil.ProxyRole))
+log.Warn("Failed to CalcDistance",
+zap.String("role", typeutil.ProxyRole),
+zap.Error(err))
 return &milvuspb.CalcDistanceResults{
 Status: &commonpb.Status{

View File

@@ -312,10 +312,10 @@ func (dt *deleteTask) Execute(ctx context.Context) (err error) {
 }
 log.Debug("send delete request to virtual channels",
-zap.String("collection", dt.deleteMsg.GetCollectionName()),
-zap.Int64("collection_id", collID),
+zap.String("collectionName", dt.deleteMsg.GetCollectionName()),
+zap.Int64("collectionID", collID),
 zap.Strings("virtual_channels", channelNames),
-zap.Int64("task_id", dt.ID()),
+zap.Int64("taskID", dt.ID()),
 zap.Duration("prepare duration", tr.RecordSpan()))
 err = stream.Produce(msgPack)

View File

@@ -110,13 +110,13 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
 collectionName := it.insertMsg.CollectionName
 if err := validateCollectionName(collectionName); err != nil {
-log.Info("valid collection name failed", zap.String("collectionName", collectionName), zap.Error(err))
+log.Warn("valid collection name failed", zap.String("collectionName", collectionName), zap.Error(err))
 return err
 }
 schema, err := globalMetaCache.GetCollectionSchema(ctx, collectionName)
 if err != nil {
-log.Error("get collection schema from global meta cache failed", zap.String("collectionName", collectionName), zap.Error(err))
+log.Warn("get collection schema from global meta cache failed", zap.String("collectionName", collectionName), zap.Error(err))
 return err
 }
 it.schema = schema
@@ -161,7 +161,7 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
 it.result.IDs, err = checkPrimaryFieldData(it.schema, it.result, it.insertMsg, true)
 log := log.Ctx(ctx).With(zap.String("collectionName", collectionName))
 if err != nil {
-log.Error("check primary field data and hash primary key failed",
+log.Warn("check primary field data and hash primary key failed",
 zap.Error(err))
 return err
 }
@@ -183,7 +183,7 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
 fieldSchema, _ := typeutil.GetPartitionKeyFieldSchema(it.schema)
 it.partitionKeys, err = getPartitionKeyFieldData(fieldSchema, it.insertMsg)
 if err != nil {
-log.Info("get partition keys from insert request failed", zap.String("collection name", collectionName), zap.Error(err))
+log.Warn("get partition keys from insert request failed", zap.String("collection name", collectionName), zap.Error(err))
 return err
 }
 } else {
@@ -196,7 +196,7 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
 }
 if err := validatePartitionTag(partitionTag, true); err != nil {
-log.Info("valid partition name failed", zap.String("partition name", partitionTag), zap.Error(err))
+log.Warn("valid partition name failed", zap.String("partition name", partitionTag), zap.Error(err))
 return err
 }
 }
@@ -233,7 +233,7 @@ func (it *insertTask) Execute(ctx context.Context) error {
 channelNames, err := it.chMgr.getVChannels(collID)
 if err != nil {
-log.Ctx(ctx).Error("get vChannels failed",
+log.Ctx(ctx).Warn("get vChannels failed",
 zap.Int64("collectionID", collID),
 zap.Error(err))
 it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
@@ -242,9 +242,9 @@ func (it *insertTask) Execute(ctx context.Context) error {
 }
 log.Ctx(ctx).Debug("send insert request to virtual channels",
-zap.String("collection", it.insertMsg.GetCollectionName()),
+zap.String("collectionName", it.insertMsg.GetCollectionName()),
 zap.String("partition", it.insertMsg.GetPartitionName()),
-zap.Int64("collection_id", collID),
+zap.Int64("collectionID", collID),
 zap.Strings("virtual_channels", channelNames),
 zap.Int64("task_id", it.ID()),
 zap.Duration("get cache duration", getCacheDur),
@@ -258,7 +258,7 @@ func (it *insertTask) Execute(ctx context.Context) error {
 msgPack, err = repackInsertDataWithPartitionKey(it.TraceCtx(), channelNames, it.partitionKeys, it.insertMsg, it.result, it.idAllocator, it.segIDAssigner)
 }
 if err != nil {
-log.Error("assign segmentID and repack insert data failed",
+log.Warn("assign segmentID and repack insert data failed",
 zap.Int64("collectionID", collID),
 zap.Error(err))
 it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError

View File

@@ -260,7 +260,7 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
 collID, err := globalMetaCache.GetCollectionID(ctx, collectionName)
 if err != nil {
-log.Warn("Failed to get collection id.")
+log.Warn("Failed to get collection id.", zap.String("collectionName", collectionName), zap.Error(err))
 return err
 }
 t.CollectionID = collID
@@ -268,7 +268,7 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
 t.partitionKeyMode, err = isPartitionKeyMode(ctx, collectionName)
 if err != nil {
-log.Warn("check partition key mode failed", zap.Int64("collectionID", t.CollectionID))
+log.Warn("check partition key mode failed", zap.Int64("collectionID", t.CollectionID), zap.Error(err))
 return err
 }
 if t.partitionKeyMode && len(t.request.GetPartitionNames()) != 0 {
@@ -485,7 +485,7 @@ func (t *queryTask) queryShard(ctx context.Context, nodeID int64, qn types.Query
 return errInvalidShardLeaders
 }
 if result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
-log.Warn("QueryNode query result error")
+log.Warn("QueryNode query result error", zap.Any("errorCode", result.GetStatus().GetErrorCode()), zap.String("reason", result.GetStatus().GetReason()))
 return fmt.Errorf("fail to Query, QueryNode ID = %d, reason=%s", nodeID, result.GetStatus().GetReason())
 }

View File

@@ -449,7 +449,7 @@ func (sched *taskScheduler) processTask(t task, q taskQueue) {
 }()
 if err != nil {
 span.RecordError(err)
-log.Ctx(ctx).Error("Failed to pre-execute task: " + err.Error())
+log.Ctx(ctx).Warn("Failed to pre-execute task: " + err.Error())
 return
 }
@@ -457,7 +457,7 @@ func (sched *taskScheduler) processTask(t task, q taskQueue) {
 err = t.Execute(ctx)
 if err != nil {
 span.RecordError(err)
-log.Ctx(ctx).Error("Failed to execute task: ", zap.Error(err))
+log.Ctx(ctx).Warn("Failed to execute task: ", zap.Error(err))
 return
 }
@@ -466,7 +466,7 @@ func (sched *taskScheduler) processTask(t task, q taskQueue) {
 if err != nil {
 span.RecordError(err)
-log.Ctx(ctx).Error("Failed to post-execute task: ", zap.Error(err))
+log.Ctx(ctx).Warn("Failed to post-execute task: ", zap.Error(err))
 return
 }
 }

View File

@@ -878,22 +878,9 @@ func reduceSearchResultData(ctx context.Context, subSearchResultData []*schemapb
 ret.Results.Scores[k] *= -1
 }
 }
-// printSearchResultData(ret.Results, "proxy reduce result")
 return ret, nil
 }
-// func printSearchResultData(data *schemapb.SearchResultData, header string) {
-// size := len(data.GetIds().GetIntId().GetData())
-// if size != len(data.Scores) {
-// log.Error("SearchResultData length mis-match")
-// }
-// log.Debug("==== SearchResultData ====",
-// zap.String("header", header), zap.Int64("nq", data.NumQueries), zap.Int64("topk", data.TopK))
-// for i := 0; i < size; i++ {
-// log.Debug("", zap.Int("i", i), zap.Int64("id", data.GetIds().GetIntId().Data[i]), zap.Float32("score", data.Scores[i]))
-// }
-// }
 func (t *searchTask) TraceCtx() context.Context {
 return t.ctx
 }

View File

@@ -140,28 +140,26 @@ func (g *getStatisticsTask) PreExecute(ctx context.Context) error {
 // check if collection/partitions are loaded into query node
 loaded, unloaded, err := checkFullLoaded(ctx, g.qc, g.collectionName, partIDs)
+log := log.Ctx(ctx)
 if err != nil {
 g.fromDataCoord = true
 g.unloadedPartitionIDs = partIDs
-log.Ctx(ctx).Debug("checkFullLoaded failed, try get statistics from DataCoord",
-zap.Error(err))
+log.Info("checkFullLoaded failed, try get statistics from DataCoord", zap.Error(err))
 return nil
 }
 if len(unloaded) > 0 {
 g.fromDataCoord = true
 g.unloadedPartitionIDs = unloaded
-log.Debug("some partitions has not been loaded, try get statistics from DataCoord",
+log.Info("some partitions has not been loaded, try get statistics from DataCoord",
 zap.String("collection", g.collectionName),
-zap.Int64s("unloaded partitions", unloaded),
-zap.Error(err))
+zap.Int64s("unloaded partitions", unloaded))
 }
 if len(loaded) > 0 {
 g.fromQueryNode = true
 g.loadedPartitionIDs = loaded
-log.Debug("some partitions has been loaded, try get statistics from QueryNode",
+log.Info("some partitions has been loaded, try get statistics from QueryNode",
 zap.String("collection", g.collectionName),
-zap.Int64s("loaded partitions", loaded),
-zap.Error(err))
+zap.Int64s("loaded partitions", loaded))
 }
 return nil
 }
@@ -228,8 +226,7 @@ func (g *getStatisticsTask) PostExecute(ctx context.Context) error {
 Stats: result,
 }
-log.Info("get statistics post execute done",
-zap.Any("result", result))
+log.Debug("get statistics post execute done", zap.Any("result", result))
 return nil
 }

View File

@@ -176,14 +176,14 @@ func (it *upsertTask) insertPreExecute(ctx context.Context) error {
 it.result.IDs, err = checkPrimaryFieldData(it.schema, it.result, it.upsertMsg.InsertMsg, false)
 log := log.Ctx(ctx).With(zap.String("collectionName", it.upsertMsg.InsertMsg.CollectionName))
 if err != nil {
-log.Error("check primary field data and hash primary key failed when upsert",
+log.Warn("check primary field data and hash primary key failed when upsert",
 zap.Error(err))
 return err
 }
 // set field ID to insert field data
 err = fillFieldIDBySchema(it.upsertMsg.InsertMsg.GetFieldsData(), it.schema)
 if err != nil {
-log.Error("insert set fieldID to fieldData failed when upsert",
+log.Warn("insert set fieldID to fieldData failed when upsert",
 zap.Error(err))
 return err
 }
@@ -192,7 +192,7 @@ func (it *upsertTask) insertPreExecute(ctx context.Context) error {
 fieldSchema, _ := typeutil.GetPartitionKeyFieldSchema(it.schema)
 it.partitionKeys, err = getPartitionKeyFieldData(fieldSchema, it.upsertMsg.InsertMsg)
 if err != nil {
-log.Info("get partition keys from insert request failed",
+log.Warn("get partition keys from insert request failed",
 zap.String("collectionName", collectionName),
 zap.Error(err))
 return err
@@ -200,7 +200,7 @@ func (it *upsertTask) insertPreExecute(ctx context.Context) error {
 } else {
 partitionTag := it.upsertMsg.InsertMsg.PartitionName
 if err = validatePartitionTag(partitionTag, true); err != nil {
-log.Error("valid partition name failed", zap.String("partition name", partitionTag), zap.Error(err))
+log.Warn("valid partition name failed", zap.String("partition name", partitionTag), zap.Error(err))
 return err
 }
 }
@@ -241,12 +241,12 @@ func (it *upsertTask) deletePreExecute(ctx context.Context) error {
 // partition name could be defaultPartitionName or name specified by sdk
 partName := it.upsertMsg.DeleteMsg.PartitionName
 if err := validatePartitionTag(partName, true); err != nil {
-log.Info("Invalid partition name", zap.String("partitionName", partName), zap.Error(err))
+log.Warn("Invalid partition name", zap.String("partitionName", partName), zap.Error(err))
 return err
 }
 partID, err := globalMetaCache.GetPartitionID(ctx, collName, partName)
 if err != nil {
-log.Info("Failed to get partition id", zap.String("collectionName", collName), zap.String("partitionName", partName), zap.Error(err))
+log.Warn("Failed to get partition id", zap.String("collectionName", collName), zap.String("partitionName", partName), zap.Error(err))
 return err
 }
 it.upsertMsg.DeleteMsg.PartitionID = partID
@@ -279,7 +279,7 @@ func (it *upsertTask) PreExecute(ctx context.Context) error {
 schema, err := globalMetaCache.GetCollectionSchema(ctx, collectionName)
 if err != nil {
-log.Info("Failed to get collection schema",
+log.Warn("Failed to get collection schema",
 zap.String("collectionName", collectionName),
 zap.Error(err))
 return err
@@ -337,20 +337,20 @@ func (it *upsertTask) PreExecute(ctx context.Context) error {
 }
 err = it.insertPreExecute(ctx)
 if err != nil {
-log.Info("Fail to insertPreExecute", zap.Error(err))
+log.Warn("Fail to insertPreExecute", zap.Error(err))
 return err
 }
 err = it.deletePreExecute(ctx)
 if err != nil {
-log.Info("Fail to deletePreExecute", zap.Error(err))
+log.Warn("Fail to deletePreExecute", zap.Error(err))
 return err
 }
 it.result.DeleteCnt = it.upsertMsg.DeleteMsg.NumRows
 it.result.InsertCnt = int64(it.upsertMsg.InsertMsg.NumRows)
 if it.result.DeleteCnt != it.result.InsertCnt {
-log.Error("DeleteCnt and InsertCnt are not the same when upsert",
+log.Info("DeleteCnt and InsertCnt are not the same when upsert",
 zap.Int64("DeleteCnt", it.result.DeleteCnt),
 zap.Int64("InsertCnt", it.result.InsertCnt))
 }
@@ -380,7 +380,7 @@ func (it *upsertTask) insertExecute(ctx context.Context, msgPack *msgstream.MsgP
 getMsgStreamDur := tr.RecordSpan()
 channelNames, err := it.chMgr.getVChannels(collID)
 if err != nil {
-log.Error("get vChannels failed when insertExecute",
+log.Warn("get vChannels failed when insertExecute",
 zap.Error(err))
 it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
 it.result.Status.Reason = err.Error()
@@ -404,7 +404,7 @@ func (it *upsertTask) insertExecute(ctx context.Context, msgPack *msgstream.MsgP
 insertMsgPack, err = repackInsertDataWithPartitionKey(it.TraceCtx(), channelNames, it.partitionKeys, it.upsertMsg.InsertMsg, it.result, it.idAllocator, it.segIDAssigner)
 }
 if err != nil {
-log.Error("assign segmentID and repack insert data failed when insertExecute",
+log.Warn("assign segmentID and repack insert data failed when insertExecute",
 zap.Error(err))
 it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
 it.result.Status.Reason = err.Error()
@@ -519,13 +519,13 @@ func (it *upsertTask) Execute(ctx context.Context) (err error) {
 }
 err = it.insertExecute(ctx, msgPack)
 if err != nil {
-log.Info("Fail to insertExecute", zap.Error(err))
+log.Warn("Fail to insertExecute", zap.Error(err))
 return err
 }
 err = it.deleteExecute(ctx, msgPack)
 if err != nil {
-log.Info("Fail to deleteExecute", zap.Error(err))
+log.Warn("Fail to deleteExecute", zap.Error(err))
 return err
 }