Substitute traceID for msgID in RPC (#20450)

Signed-off-by: lixinguo <xinguo.li@zilliz.com>

Co-authored-by: lixinguo <xinguo.li@zilliz.com>
smellthemoon 2022-11-14 15:29:06 +08:00 committed by GitHub
parent 6a2e458f90
commit 7325b3e1c3
17 changed files with 907 additions and 1251 deletions
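
The change follows one pattern throughout: drop the explicit zap.Int64("msgID", ...) field from each log call and route logging through log.Ctx(ctx), whose context-bound logger already carries the trace ID injected by the RPC layer. A minimal, self-contained sketch of the idea in plain zap — not the Milvus log package itself; the traceKey type and WithTraceID helper are illustrative:

	package main

	import (
		"context"

		"go.uber.org/zap"
	)

	type traceKey struct{}

	// WithTraceID stows a trace ID in the context, as an RPC interceptor would.
	func WithTraceID(ctx context.Context, id string) context.Context {
		return context.WithValue(ctx, traceKey{}, id)
	}

	// Ctx mimics log.Ctx(ctx): it returns a logger pre-tagged with the trace ID,
	// so callers no longer attach a msgID field by hand on every line.
	func Ctx(ctx context.Context, base *zap.Logger) *zap.Logger {
		if id, ok := ctx.Value(traceKey{}).(string); ok {
			return base.With(zap.String("traceID", id))
		}
		return base
	}

	func main() {
		base, _ := zap.NewDevelopment()
		defer base.Sync()

		ctx := WithTraceID(context.Background(), "0af7651916cd43dd") // hypothetical ID
		Ctx(ctx, base).Debug("loadCollectionTask PreExecute", zap.String("role", "proxy"))
	}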

File diff suppressed because it is too large.


@ -1286,7 +1286,8 @@ func (lct *loadCollectionTask) OnEnqueue() error {
}
func (lct *loadCollectionTask) PreExecute(ctx context.Context) error {
log.Debug("loadCollectionTask PreExecute", zap.String("role", typeutil.ProxyRole), zap.Int64("msgID", lct.Base.MsgID))
log.Ctx(ctx).Debug("loadCollectionTask PreExecute",
zap.String("role", typeutil.ProxyRole))
lct.Base.MsgType = commonpb.MsgType_LoadCollection
lct.Base.SourceID = paramtable.GetNodeID()
@ -1305,8 +1306,13 @@ func (lct *loadCollectionTask) PreExecute(ctx context.Context) error {
}
func (lct *loadCollectionTask) Execute(ctx context.Context) (err error) {
log.Debug("loadCollectionTask Execute", zap.String("role", typeutil.ProxyRole), zap.Int64("msgID", lct.Base.MsgID))
collID, err := globalMetaCache.GetCollectionID(ctx, lct.CollectionName)
log := log.Ctx(ctx).With(
zap.String("role", typeutil.ProxyRole),
zap.Int64("collectionID", collID))
log.Debug("loadCollectionTask Execute")
if err != nil {
return err
}
@ -1340,7 +1346,7 @@ func (lct *loadCollectionTask) Execute(ctx context.Context) (err error) {
}
if !hasVecIndex {
errMsg := fmt.Sprintf("there is no vector index on collection: %s, please create index firstly", lct.LoadCollectionRequest.CollectionName)
log.Ctx(ctx).Error(errMsg)
log.Error(errMsg)
return errors.New(errMsg)
}
request := &querypb.LoadCollectionRequest{
@ -1354,8 +1360,7 @@ func (lct *loadCollectionTask) Execute(ctx context.Context) (err error) {
ReplicaNumber: lct.ReplicaNumber,
FieldIndexID: fieldIndexIDs,
}
log.Debug("send LoadCollectionRequest to query coordinator", zap.String("role", typeutil.ProxyRole),
zap.Int64("msgID", request.Base.MsgID), zap.Int64("collectionID", request.CollectionID),
log.Debug("send LoadCollectionRequest to query coordinator",
zap.Any("schema", request.Schema))
lct.result, err = lct.queryCoord.LoadCollection(ctx, request)
if err != nil {
@ -1365,8 +1370,13 @@ func (lct *loadCollectionTask) Execute(ctx context.Context) (err error) {
}
func (lct *loadCollectionTask) PostExecute(ctx context.Context) error {
log.Debug("loadCollectionTask PostExecute", zap.String("role", typeutil.ProxyRole),
zap.Int64("msgID", lct.Base.MsgID))
collID, err := globalMetaCache.GetCollectionID(ctx, lct.CollectionName)
log.Ctx(ctx).Debug("loadCollectionTask PostExecute",
zap.String("role", typeutil.ProxyRole),
zap.Int64("collectionID", collID))
if err != nil {
return err
}
return nil
}


@ -132,7 +132,7 @@ func (it *insertTask) checkPrimaryFieldData() error {
primaryFieldSchema, err := typeutil.GetPrimaryFieldSchema(it.schema)
if err != nil {
log.Error("get primary field schema failed", zap.String("collection name", it.CollectionName), zap.Any("schema", it.schema), zap.Error(err))
log.Error("get primary field schema failed", zap.String("collectionName", it.CollectionName), zap.Any("schema", it.schema), zap.Error(err))
return err
}
@ -141,7 +141,7 @@ func (it *insertTask) checkPrimaryFieldData() error {
if !primaryFieldSchema.AutoID {
primaryFieldData, err = typeutil.GetPrimaryFieldData(it.GetFieldsData(), primaryFieldSchema)
if err != nil {
log.Error("get primary field data failed", zap.String("collection name", it.CollectionName), zap.Error(err))
log.Error("get primary field data failed", zap.String("collectionName", it.CollectionName), zap.Error(err))
return err
}
} else {
@ -152,7 +152,7 @@ func (it *insertTask) checkPrimaryFieldData() error {
// if autoID == true, currently only support autoID for int64 PrimaryField
primaryFieldData, err = autoGenPrimaryFieldData(primaryFieldSchema, it.RowIDs)
if err != nil {
log.Error("generate primary field data failed when autoID == true", zap.String("collection name", it.CollectionName), zap.Error(err))
log.Error("generate primary field data failed when autoID == true", zap.String("collectionName", it.CollectionName), zap.Error(err))
return err
}
// if autoID == true, set the primary field data
@ -162,7 +162,7 @@ func (it *insertTask) checkPrimaryFieldData() error {
// parse primaryFieldData to result.IDs, and as returned primary keys
it.result.IDs, err = parsePrimaryFieldData2IDs(primaryFieldData)
if err != nil {
log.Error("parse primary field data to IDs failed", zap.String("collection name", it.CollectionName), zap.Error(err))
log.Error("parse primary field data to IDs failed", zap.String("collectionName", it.CollectionName), zap.Error(err))
return err
}
@ -185,7 +185,7 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
collectionName := it.CollectionName
if err := validateCollectionName(collectionName); err != nil {
log.Error("valid collection name failed", zap.String("collection name", collectionName), zap.Error(err))
log.Error("valid collection name failed", zap.String("collectionName", collectionName), zap.Error(err))
return err
}
@ -197,7 +197,7 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
collSchema, err := globalMetaCache.GetCollectionSchema(ctx, collectionName)
if err != nil {
log.Error("get collection schema from global meta cache failed", zap.String("collection name", collectionName), zap.Error(err))
log.Error("get collection schema from global meta cache failed", zap.String("collectionName", collectionName), zap.Error(err))
return err
}
it.schema = collSchema
@ -232,25 +232,29 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
// check primaryFieldData whether autoID is true or not
// set rowIDs as primary data if autoID == true
err = it.checkPrimaryFieldData()
log := log.Ctx(ctx).With(zap.String("collectionName", collectionName))
if err != nil {
log.Error("check primary field data and hash primary key failed", zap.Int64("msgID", it.Base.MsgID), zap.String("collection name", collectionName), zap.Error(err))
log.Error("check primary field data and hash primary key failed",
zap.Error(err))
return err
}
// set field ID to insert field data
err = fillFieldIDBySchema(it.GetFieldsData(), collSchema)
if err != nil {
log.Error("set fieldID to fieldData failed", zap.Int64("msgID", it.Base.MsgID), zap.String("collection name", collectionName), zap.Error(err))
log.Error("set fieldID to fieldData failed",
zap.Error(err))
return err
}
// check that all field's number rows are equal
if err = it.CheckAligned(); err != nil {
log.Error("field data is not aligned", zap.Int64("msgID", it.Base.MsgID), zap.String("collection name", collectionName), zap.Error(err))
log.Error("field data is not aligned",
zap.Error(err))
return err
}
log.Debug("Proxy Insert PreExecute done", zap.Int64("msgID", it.Base.MsgID), zap.String("collection name", collectionName))
log.Debug("Proxy Insert PreExecute done")
return nil
}
@ -444,7 +448,9 @@ func (it *insertTask) Execute(ctx context.Context) error {
channelNames, err := it.chMgr.getVChannels(collID)
if err != nil {
log.Error("get vChannels failed", zap.Int64("msgID", it.Base.MsgID), zap.Int64("collectionID", collID), zap.Error(err))
log.Ctx(ctx).Error("get vChannels failed",
zap.Int64("collectionID", collID),
zap.Error(err))
it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
it.result.Status.Reason = err.Error()
return err
@ -461,12 +467,16 @@ func (it *insertTask) Execute(ctx context.Context) error {
// assign segmentID for insert data and repack data by segmentID
msgPack, err := it.assignSegmentID(channelNames)
if err != nil {
log.Error("assign segmentID and repack insert data failed", zap.Int64("msgID", it.Base.MsgID), zap.Int64("collectionID", collID), zap.Error(err))
log.Error("assign segmentID and repack insert data failed",
zap.Int64("collectionID", collID),
zap.Error(err))
it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
it.result.Status.Reason = err.Error()
return err
}
log.Debug("assign segmentID for insert data success", zap.Int64("msgID", it.Base.MsgID), zap.Int64("collectionID", collID), zap.String("collection name", it.CollectionName))
log.Debug("assign segmentID for insert data success",
zap.Int64("collectionID", collID),
zap.String("collectionName", it.CollectionName))
tr.Record("assign segment id")
err = stream.Produce(msgPack)
if err != nil {
@ -477,7 +487,8 @@ func (it *insertTask) Execute(ctx context.Context) error {
sendMsgDur := tr.Record("send insert request to dml channel")
metrics.ProxySendMutationReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel).Observe(float64(sendMsgDur.Milliseconds()))
log.Debug("Proxy Insert Execute done", zap.Int64("msgID", it.Base.MsgID), zap.String("collection name", collectionName))
log.Debug("Proxy Insert Execute done",
zap.String("collectionName", collectionName))
return nil
}
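
Several hunks above also consolidate repeated fields into a derived logger: bind collectionName (or collectionID) once via With(), and every statement in the function inherits it. A short sketch of that idiom, assuming plain zap; the field value and error are stand-ins:

	package main

	import (
		"errors"

		"go.uber.org/zap"
	)

	func main() {
		base, _ := zap.NewDevelopment()
		defer base.Sync()

		// Bind the shared field once; each later call repeats it automatically.
		log := base.With(zap.String("collectionName", "demo_collection"))

		err := errors.New("field data is not aligned") // stand-in error
		log.Error("field data is not aligned", zap.Error(err))
		log.Debug("Proxy Insert PreExecute done")
	}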


@ -170,45 +170,51 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
collectionName := t.request.CollectionName
t.collectionName = collectionName
if err := validateCollectionName(collectionName); err != nil {
log.Ctx(ctx).Warn("Invalid collection name.", zap.String("collectionName", collectionName),
zap.Int64("msgID", t.ID()), zap.String("requestType", "query"))
log.Ctx(ctx).Warn("Invalid collectionName.",
zap.String("collectionName", collectionName),
zap.String("requestType", "query"))
return err
}
log.Ctx(ctx).Debug("Validate collection name.", zap.Any("collectionName", collectionName),
zap.Int64("msgID", t.ID()), zap.Any("requestType", "query"))
log.Ctx(ctx).Debug("Validate collectionName.",
zap.Any("collectionName", collectionName),
zap.Any("requestType", "query"))
collID, err := globalMetaCache.GetCollectionID(ctx, collectionName)
if err != nil {
log.Ctx(ctx).Warn("Failed to get collection id.", zap.Any("collectionName", collectionName),
zap.Int64("msgID", t.ID()), zap.Any("requestType", "query"))
log.Ctx(ctx).Warn("Failed to get collection id.",
zap.Any("collectionName", collectionName),
zap.Any("requestType", "query"))
return err
}
t.CollectionID = collID
log.Ctx(ctx).Debug("Get collection ID by name",
zap.Int64("collectionID", t.CollectionID), zap.String("collection name", collectionName),
zap.Int64("msgID", t.ID()), zap.Any("requestType", "query"))
zap.Int64("collectionID", t.CollectionID),
zap.String("collectionName", collectionName),
zap.Any("requestType", "query"))
for _, tag := range t.request.PartitionNames {
if err := validatePartitionTag(tag, false); err != nil {
log.Ctx(ctx).Warn("invalid partition name", zap.String("partition name", tag),
zap.Int64("msgID", t.ID()), zap.Any("requestType", "query"))
log.Ctx(ctx).Warn("invalid partition name",
zap.String("partition name", tag),
zap.Any("requestType", "query"))
return err
}
}
log.Ctx(ctx).Debug("Validate partition names.",
zap.Int64("msgID", t.ID()), zap.Any("requestType", "query"))
zap.Any("requestType", "query"))
t.RetrieveRequest.PartitionIDs, err = getPartitionIDs(ctx, collectionName, t.request.GetPartitionNames())
if err != nil {
log.Ctx(ctx).Warn("failed to get partitions in collection.", zap.String("collection name", collectionName),
log.Ctx(ctx).Warn("failed to get partitions in collection.", zap.String("collectionName", collectionName),
zap.Error(err),
zap.Int64("msgID", t.ID()), zap.Any("requestType", "query"))
zap.Any("requestType", "query"))
return err
}
log.Ctx(ctx).Debug("Get partitions in collection.", zap.Any("collectionName", collectionName),
zap.Int64("msgID", t.ID()), zap.Any("requestType", "query"))
log.Ctx(ctx).Debug("Get partitions in collection.",
zap.Any("collectionName", collectionName),
zap.Any("requestType", "query"))
queryParams, err := parseQueryParams(t.request.GetQueryParams())
if err != nil {
@ -249,8 +255,9 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
if err != nil {
return err
}
log.Ctx(ctx).Debug("translate output fields", zap.Any("OutputFields", t.request.OutputFields),
zap.Int64("msgID", t.ID()), zap.Any("requestType", "query"))
log.Ctx(ctx).Debug("translate output fields",
zap.Any("OutputFields", t.request.OutputFields),
zap.Any("requestType", "query"))
outputFieldIDs, err := translateToOutputFieldIDs(t.request.GetOutputFields(), schema)
if err != nil {
@ -258,8 +265,9 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
}
t.RetrieveRequest.OutputFieldsId = outputFieldIDs
plan.OutputFieldIds = outputFieldIDs
log.Ctx(ctx).Debug("translate output fields to field ids", zap.Any("OutputFieldsID", t.OutputFieldsId),
zap.Int64("msgID", t.ID()), zap.Any("requestType", "query"))
log.Ctx(ctx).Debug("translate output fields to field ids",
zap.Any("OutputFieldsID", t.OutputFieldsId),
zap.Any("requestType", "query"))
t.RetrieveRequest.SerializedExprPlan, err = proto.Marshal(plan)
if err != nil {
@ -287,8 +295,9 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
t.DbID = 0 // TODO
log.Ctx(ctx).Debug("Query PreExecute done.",
zap.Int64("msgID", t.ID()), zap.Any("requestType", "query"),
zap.Uint64("guarantee_ts", guaranteeTs), zap.Uint64("travel_ts", t.GetTravelTimestamp()),
zap.Any("requestType", "query"),
zap.Uint64("guarantee_ts", guaranteeTs),
zap.Uint64("travel_ts", t.GetTravelTimestamp()),
zap.Uint64("timeout_ts", t.GetTimeoutTimestamp()))
return nil
}
@ -314,7 +323,7 @@ func (t *queryTask) Execute(ctx context.Context) error {
err := executeQuery(WithCache)
if errors.Is(err, errInvalidShardLeaders) || funcutil.IsGrpcErr(err) || errors.Is(err, grpcclient.ErrConnect) {
log.Ctx(ctx).Warn("invalid shard leaders cache, updating shardleader caches and retry search",
zap.Int64("msgID", t.ID()), zap.Error(err))
zap.Error(err))
return executeQuery(WithoutCache)
}
if err != nil {
@ -322,7 +331,7 @@ func (t *queryTask) Execute(ctx context.Context) error {
}
log.Ctx(ctx).Debug("Query Execute done.",
zap.Int64("msgID", t.ID()), zap.Any("requestType", "query"))
zap.Any("requestType", "query"))
return nil
}
@ -339,11 +348,11 @@ func (t *queryTask) PostExecute(ctx context.Context) error {
log.Ctx(ctx).Warn("proxy", zap.Int64("Query: wait to finish failed, timeout!, msgID:", t.ID()))
return nil
default:
log.Ctx(ctx).Debug("all queries are finished or canceled", zap.Int64("msgID", t.ID()))
log.Ctx(ctx).Debug("all queries are finished or canceled")
close(t.resultBuf)
for res := range t.resultBuf {
t.toReduceResults = append(t.toReduceResults, res)
log.Ctx(ctx).Debug("proxy receives one query result", zap.Int64("sourceID", res.GetBase().GetSourceID()), zap.Any("msgID", t.ID()))
log.Ctx(ctx).Debug("proxy receives one query result", zap.Int64("sourceID", res.GetBase().GetSourceID()))
}
}
@ -361,7 +370,8 @@ func (t *queryTask) PostExecute(ctx context.Context) error {
ErrorCode: commonpb.ErrorCode_Success,
}
} else {
log.Ctx(ctx).Warn("Query result is nil", zap.Int64("msgID", t.ID()), zap.Any("requestType", "query"))
log.Ctx(ctx).Warn("Query result is nil",
zap.Any("requestType", "query"))
t.result.Status = &commonpb.Status{
ErrorCode: commonpb.ErrorCode_EmptyCollection,
Reason: "empty collection", // TODO
@ -382,7 +392,8 @@ func (t *queryTask) PostExecute(ctx context.Context) error {
}
}
}
log.Ctx(ctx).Debug("Query PostExecute done", zap.Int64("msgID", t.ID()), zap.String("requestType", "query"))
log.Ctx(ctx).Debug("Query PostExecute done",
zap.String("requestType", "query"))
return nil
}
@ -395,8 +406,9 @@ func (t *queryTask) queryShard(ctx context.Context, nodeID int64, qn types.Query
result, err := qn.Query(ctx, req)
if err != nil {
log.Ctx(ctx).Warn("QueryNode query return error", zap.Int64("msgID", t.ID()),
zap.Int64("nodeID", nodeID), zap.Strings("channels", channelIDs), zap.Error(err))
log.Ctx(ctx).Warn("QueryNode query return error",
zap.Int64("nodeID", nodeID),
zap.Strings("channels", channelIDs), zap.Error(err))
return err
}
if result.GetStatus().GetErrorCode() == commonpb.ErrorCode_NotShardLeader {
@ -404,12 +416,15 @@ func (t *queryTask) queryShard(ctx context.Context, nodeID int64, qn types.Query
return errInvalidShardLeaders
}
if result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
log.Ctx(ctx).Warn("QueryNode query result error", zap.Int64("msgID", t.ID()), zap.Int64("nodeID", nodeID),
log.Ctx(ctx).Warn("QueryNode query result error",
zap.Int64("nodeID", nodeID),
zap.String("reason", result.GetStatus().GetReason()))
return fmt.Errorf("fail to Query, QueryNode ID = %d, reason=%s", nodeID, result.GetStatus().GetReason())
}
log.Ctx(ctx).Debug("get query result", zap.Int64("msgID", t.ID()), zap.Int64("nodeID", nodeID), zap.Strings("channelIDs", channelIDs))
log.Ctx(ctx).Debug("get query result",
zap.Int64("nodeID", nodeID),
zap.Strings("channelIDs", channelIDs))
t.resultBuf <- result
return nil
}


@ -432,7 +432,7 @@ func (sched *taskScheduler) processTask(t task, q taskQueue) {
"ID": t.ID(),
})
defer span.Finish()
traceID, _, _ := trace.InfoFromSpan(span)
log := log.Ctx(ctx)
span.LogFields(oplog.Int64("scheduler process AddActiveTask", t.ID()))
q.AddActiveTask(t)
@ -450,8 +450,7 @@ func (sched *taskScheduler) processTask(t task, q taskQueue) {
}()
if err != nil {
trace.LogError(span, err)
log.Error("Failed to pre-execute task: "+err.Error(),
zap.String("traceID", traceID))
log.Error("Failed to pre-execute task: " + err.Error())
return
}
@ -459,8 +458,7 @@ func (sched *taskScheduler) processTask(t task, q taskQueue) {
err = t.Execute(ctx)
if err != nil {
trace.LogError(span, err)
log.Error("Failed to execute task: ", zap.Error(err),
zap.String("traceID", traceID))
log.Error("Failed to execute task: ", zap.Error(err))
return
}
@ -469,8 +467,7 @@ func (sched *taskScheduler) processTask(t task, q taskQueue) {
if err != nil {
trace.LogError(span, err)
log.Error("Failed to post-execute task: ", zap.Error(err),
zap.String("traceID", traceID))
log.Error("Failed to post-execute task: ", zap.Error(err))
return
}
}
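
Before this change, processTask pulled the trace ID out of the span by hand (trace.InfoFromSpan) and attached it as an explicit zap field; with log.Ctx(ctx) that extraction becomes the logger's job. A hedged sketch of what such an extraction helper can look like on the opentracing/Jaeger stack Milvus used at the time — the helper name and wiring are illustrative, not the exact internal/util/trace API:

	package main

	import (
		"context"
		"fmt"

		"github.com/opentracing/opentracing-go"
		jaeger "github.com/uber/jaeger-client-go"
	)

	// traceIDFromContext mirrors what a helper like trace.InfoFromContext can do:
	// fetch the active span from ctx and, when it is a Jaeger span, format its ID.
	func traceIDFromContext(ctx context.Context) (string, bool) {
		span := opentracing.SpanFromContext(ctx)
		if span == nil {
			return "", false
		}
		sc, ok := span.Context().(jaeger.SpanContext)
		if !ok {
			return "", false
		}
		return sc.TraceID().String(), true
	}

	func main() {
		tracer, closer := jaeger.NewTracer("demo", jaeger.NewConstSampler(true), jaeger.NewNullReporter())
		defer closer.Close()

		span := tracer.StartSpan("processTask")
		defer span.Finish()

		ctx := opentracing.ContextWithSpan(context.Background(), span)
		if id, ok := traceIDFromContext(ctx); ok {
			fmt.Println("traceID:", id) // what log.Ctx(ctx) now attaches for us
		}
	}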


@ -298,7 +298,7 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
if err != nil {
return err
}
log.Ctx(ctx).Debug("translate output fields", zap.Int64("msgID", t.ID()),
log.Ctx(ctx).Debug("translate output fields",
zap.Strings("output fields", t.request.GetOutputFields()))
if t.request.GetDslType() == commonpb.DslType_BoolExprV1 {
@ -315,12 +315,12 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
plan, err := planparserv2.CreateSearchPlan(t.schema, t.request.Dsl, annsField, queryInfo)
if err != nil {
log.Ctx(ctx).Warn("failed to create query plan", zap.Error(err), zap.Int64("msgID", t.ID()),
log.Ctx(ctx).Warn("failed to create query plan", zap.Error(err),
zap.String("dsl", t.request.Dsl), // may be very large if large term passed.
zap.String("anns field", annsField), zap.Any("query info", queryInfo))
return fmt.Errorf("failed to create query plan: %v", err)
}
log.Ctx(ctx).Debug("create query plan", zap.Int64("msgID", t.ID()),
log.Ctx(ctx).Debug("create query plan",
zap.String("dsl", t.request.Dsl), // may be very large if large term passed.
zap.String("anns field", annsField), zap.Any("query info", queryInfo))
@ -340,7 +340,7 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
return err
}
log.Ctx(ctx).Debug("Proxy::searchTask::PreExecute", zap.Int64("msgID", t.ID()),
log.Ctx(ctx).Debug("Proxy::searchTask::PreExecute",
zap.Int64s("plan.OutputFieldIds", plan.GetOutputFieldIds()),
zap.String("plan", plan.String())) // may be very large if large term passed.
}
@ -377,7 +377,7 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
}
t.SearchRequest.Nq = nq
log.Ctx(ctx).Debug("search PreExecute done.", zap.Int64("msgID", t.ID()),
log.Ctx(ctx).Debug("search PreExecute done.",
zap.Uint64("travel_ts", travelTimestamp), zap.Uint64("guarantee_ts", guaranteeTs),
zap.Uint64("timeout_ts", t.SearchRequest.GetTimeoutTimestamp()))
@ -408,14 +408,14 @@ func (t *searchTask) Execute(ctx context.Context) error {
err := executeSearch(WithCache)
if errors.Is(err, errInvalidShardLeaders) || funcutil.IsGrpcErr(err) || errors.Is(err, grpcclient.ErrConnect) {
log.Ctx(ctx).Warn("first search failed, updating shardleader caches and retry search",
zap.Int64("msgID", t.ID()), zap.Error(err))
zap.Error(err))
return executeSearch(WithoutCache)
}
if err != nil {
return fmt.Errorf("fail to search on all shard leaders, err=%v", err)
}
log.Ctx(ctx).Debug("Search Execute done.", zap.Int64("msgID", t.ID()))
log.Ctx(ctx).Debug("Search Execute done.")
return nil
}
@ -448,14 +448,15 @@ func (t *searchTask) PostExecute(ctx context.Context) error {
metrics.SearchLabel).Observe(float64(tr.RecordSpan().Milliseconds()))
if len(validSearchResults) <= 0 {
log.Ctx(ctx).Warn("search result is empty", zap.Int64("msgID", t.ID()))
log.Ctx(ctx).Warn("search result is empty")
t.fillInEmptyResult(Nq)
return nil
}
// Reduce all search results
log.Ctx(ctx).Debug("proxy search post execute reduce", zap.Int64("msgID", t.ID()), zap.Int("number of valid search results", len(validSearchResults)))
log.Ctx(ctx).Debug("proxy search post execute reduce",
zap.Int("number of valid search results", len(validSearchResults)))
tr.CtxRecord(ctx, "reduceResultStart")
primaryFieldSchema, err := typeutil.GetPrimaryFieldSchema(t.schema)
if err != nil {
@ -472,7 +473,7 @@ func (t *searchTask) PostExecute(ctx context.Context) error {
t.result.CollectionName = t.collectionName
t.fillInFieldInfo()
log.Ctx(ctx).Debug("Search post execute done", zap.Int64("msgID", t.ID()))
log.Ctx(ctx).Debug("Search post execute done")
return nil
}
@ -484,17 +485,21 @@ func (t *searchTask) searchShard(ctx context.Context, nodeID int64, qn types.Que
}
result, err := qn.Search(ctx, req)
if err != nil {
log.Ctx(ctx).Warn("QueryNode search return error", zap.Int64("msgID", t.ID()),
zap.Int64("nodeID", nodeID), zap.Strings("channels", channelIDs), zap.Error(err))
log.Ctx(ctx).Warn("QueryNode search return error",
zap.Int64("nodeID", nodeID),
zap.Strings("channels", channelIDs),
zap.Error(err))
return err
}
if result.GetStatus().GetErrorCode() == commonpb.ErrorCode_NotShardLeader {
log.Ctx(ctx).Warn("QueryNode is not shardLeader", zap.Int64("msgID", t.ID()),
zap.Int64("nodeID", nodeID), zap.Strings("channels", channelIDs))
log.Ctx(ctx).Warn("QueryNode is not shardLeader",
zap.Int64("nodeID", nodeID),
zap.Strings("channels", channelIDs))
return errInvalidShardLeaders
}
if result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
log.Ctx(ctx).Warn("QueryNode search result error", zap.Int64("msgID", t.ID()), zap.Int64("nodeID", nodeID),
log.Ctx(ctx).Warn("QueryNode search result error",
zap.Int64("nodeID", nodeID),
zap.String("reason", result.GetStatus().GetReason()))
return fmt.Errorf("fail to Search, QueryNode ID=%d, reason=%s", nodeID, result.GetStatus().GetReason())
}
@ -534,14 +539,15 @@ func (t *searchTask) fillInFieldInfo() {
func (t *searchTask) collectSearchResults(ctx context.Context) error {
select {
case <-t.TraceCtx().Done():
log.Ctx(ctx).Warn("search task wait to finish timeout!", zap.Int64("msgID", t.ID()))
log.Ctx(ctx).Warn("search task wait to finish timeout!")
return fmt.Errorf("search task wait to finish timeout, msgID=%d", t.ID())
default:
log.Ctx(ctx).Debug("all searches are finished or canceled", zap.Int64("msgID", t.ID()))
log.Ctx(ctx).Debug("all searches are finished or canceled")
close(t.resultBuf)
for res := range t.resultBuf {
t.toReduceResults = append(t.toReduceResults, res)
log.Ctx(ctx).Debug("proxy receives one search result", zap.Int64("sourceID", res.GetBase().GetSourceID()), zap.Int64("msgID", t.ID()))
log.Ctx(ctx).Debug("proxy receives one search result",
zap.Int64("sourceID", res.GetBase().GetSourceID()))
}
}
return nil


@ -146,18 +146,25 @@ func (g *getStatisticsTask) PreExecute(ctx context.Context) error {
if err != nil {
g.fromDataCoord = true
g.unloadedPartitionIDs = partIDs
log.Debug("checkFullLoaded failed, try get statistics from DataCoord", zap.Int64("msgID", g.ID()), zap.Error(err))
log.Ctx(ctx).Debug("checkFullLoaded failed, try get statistics from DataCoord",
zap.Error(err))
return nil
}
if len(unloaded) > 0 {
g.fromDataCoord = true
g.unloadedPartitionIDs = unloaded
log.Debug("some partitions has not been loaded, try get statistics from DataCoord", zap.Int64("msgID", g.ID()), zap.String("collection", g.collectionName), zap.Int64s("unloaded partitions", unloaded), zap.Error(err))
log.Debug("some partitions has not been loaded, try get statistics from DataCoord",
zap.String("collection", g.collectionName),
zap.Int64s("unloaded partitions", unloaded),
zap.Error(err))
}
if len(loaded) > 0 {
g.fromQueryNode = true
g.loadedPartitionIDs = loaded
log.Debug("some partitions has been loaded, try get statistics from QueryNode", zap.Int64("msgID", g.ID()), zap.String("collection", g.collectionName), zap.Int64s("loaded partitions", loaded), zap.Error(err))
log.Debug("some partitions has been loaded, try get statistics from QueryNode",
zap.String("collection", g.collectionName),
zap.Int64s("loaded partitions", loaded),
zap.Error(err))
}
return nil
}
@ -175,14 +182,14 @@ func (g *getStatisticsTask) Execute(ctx context.Context) error {
if err != nil {
return err
}
log.Debug("get collection statistics from QueryNode execute done", zap.Int64("msgID", g.ID()))
log.Ctx(ctx).Debug("get collection statistics from QueryNode execute done")
}
if g.fromDataCoord {
err := g.getStatisticsFromDataCoord(ctx)
if err != nil {
return err
}
log.Debug("get collection statistics from DataCoord execute done", zap.Int64("msgID", g.ID()))
log.Debug("get collection statistics from DataCoord execute done")
}
return nil
}
@ -198,14 +205,15 @@ func (g *getStatisticsTask) PostExecute(ctx context.Context) error {
if g.fromQueryNode {
select {
case <-g.TraceCtx().Done():
log.Debug("wait to finish timeout!", zap.Int64("msgID", g.ID()))
log.Debug("wait to finish timeout!")
return nil
default:
log.Debug("all get statistics are finished or canceled", zap.Int64("msgID", g.ID()))
log.Debug("all get statistics are finished or canceled")
close(g.resultBuf)
for res := range g.resultBuf {
g.toReduceResults = append(g.toReduceResults, res)
log.Debug("proxy receives one get statistic response", zap.Int64("sourceID", res.GetBase().GetSourceID()), zap.Int64("msgID", g.ID()))
log.Debug("proxy receives one get statistic response",
zap.Int64("sourceID", res.GetBase().GetSourceID()))
}
}
}
@ -224,7 +232,8 @@ func (g *getStatisticsTask) PostExecute(ctx context.Context) error {
Stats: result,
}
log.Info("get statistics post execute done", zap.Int64("msgID", g.ID()), zap.Any("result", result))
log.Info("get statistics post execute done",
zap.Any("result", result))
return nil
}
@ -257,7 +266,6 @@ func (g *getStatisticsTask) getStatisticsFromDataCoord(ctx context.Context) erro
func (g *getStatisticsTask) getStatisticsFromQueryNode(ctx context.Context) error {
g.GetStatisticsRequest.PartitionIDs = g.loadedPartitionIDs
executeGetStatistics := func(withCache bool) error {
shard2Leaders, err := globalMetaCache.GetShards(ctx, withCache, g.collectionName)
if err != nil {
@ -265,7 +273,9 @@ func (g *getStatisticsTask) getStatisticsFromQueryNode(ctx context.Context) erro
}
g.resultBuf = make(chan *internalpb.GetStatisticsResponse, len(shard2Leaders))
if err := g.statisticShardPolicy(ctx, g.shardMgr, g.getStatisticsShard, shard2Leaders); err != nil {
log.Warn("failed to get statistics", zap.Int64("msgID", g.ID()), zap.Error(err), zap.String("Shards", fmt.Sprintf("%v", shard2Leaders)))
log.Warn("failed to get statistics",
zap.Error(err),
zap.String("Shards", fmt.Sprintf("%v", shard2Leaders)))
return err
}
return nil
@ -274,7 +284,7 @@ func (g *getStatisticsTask) getStatisticsFromQueryNode(ctx context.Context) erro
err := executeGetStatistics(WithCache)
if errors.Is(err, errInvalidShardLeaders) || funcutil.IsGrpcErr(err) || errors.Is(err, grpcclient.ErrConnect) {
log.Warn("first get statistics failed, updating shard leader caches and retry",
zap.Int64("msgID", g.ID()), zap.Error(err))
zap.Error(err))
err = executeGetStatistics(WithoutCache)
}
if err != nil {
@ -292,18 +302,22 @@ func (g *getStatisticsTask) getStatisticsShard(ctx context.Context, nodeID int64
}
result, err := qn.GetStatistics(ctx, req)
if err != nil {
log.Warn("QueryNode statistic return error", zap.Int64("msgID", g.ID()),
zap.Int64("nodeID", nodeID), zap.Strings("channels", channelIDs), zap.Error(err))
log.Warn("QueryNode statistic return error",
zap.Int64("nodeID", nodeID),
zap.Strings("channels", channelIDs),
zap.Error(err))
return err
}
if result.GetStatus().GetErrorCode() == commonpb.ErrorCode_NotShardLeader {
log.Warn("QueryNode is not shardLeader", zap.Int64("msgID", g.ID()),
zap.Int64("nodeID", nodeID), zap.Strings("channels", channelIDs))
log.Warn("QueryNode is not shardLeader",
zap.Int64("nodeID", nodeID),
zap.Strings("channels", channelIDs))
return errInvalidShardLeaders
}
if result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
log.Warn("QueryNode statistic result error", zap.Int64("msgID", g.ID()),
zap.Int64("nodeID", nodeID), zap.String("reason", result.GetStatus().GetReason()))
log.Warn("QueryNode statistic result error",
zap.Int64("nodeID", nodeID),
zap.String("reason", result.GetStatus().GetReason()))
return fmt.Errorf("fail to get statistic, QueryNode ID=%d, reason=%s", nodeID, result.GetStatus().GetReason())
}
g.resultBuf <- result


@ -41,6 +41,7 @@ import (
type Job interface {
MsgID() int64
CollectionID() int64
Context() context.Context
// PreExecute does checks, DO NOT persists any thing within this stage,
PreExecute() error
// Execute processes the request
@ -78,6 +79,10 @@ func (job *BaseJob) CollectionID() int64 {
return job.collectionID
}
func (job *BaseJob) Context() context.Context {
return job.ctx
}
func (job *BaseJob) Error() error {
return job.err
}
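
The Job interface gains Context() precisely so the query-coord scheduler (next hunk) can build its per-job logger from the job's own context instead of from job.MsgID(). A small sketch of that consumption, under the same assumptions as the sketches above, with the interface trimmed to the relevant methods:

	package main

	import (
		"context"

		"go.uber.org/zap"
	)

	// Job is trimmed to the two methods the logging change relies on.
	type Job interface {
		CollectionID() int64
		Context() context.Context
	}

	type baseJob struct {
		ctx          context.Context
		collectionID int64
	}

	func (j *baseJob) CollectionID() int64      { return j.collectionID }
	func (j *baseJob) Context() context.Context { return j.ctx }

	// process derives its logger from the job, the way scheduler.go now
	// writes log := log.Ctx(job.Context()).With(...).
	func process(base *zap.Logger, job Job) {
		// In Milvus, base would already be trace-aware via log.Ctx(job.Context()).
		log := base.With(zap.Int64("collectionID", job.CollectionID()))
		log.Info("processing job")
	}

	func main() {
		base, _ := zap.NewDevelopment()
		defer base.Sync()
		process(base, &baseJob{ctx: context.Background(), collectionID: 42})
	}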


@ -147,8 +147,7 @@ func (scheduler *Scheduler) processQueue(collection int64, queue jobQueue) {
}
func (scheduler *Scheduler) process(job Job) {
log := log.With(
zap.Int64("msgID", job.MsgID()),
log := log.Ctx(job.Context()).With(
zap.Int64("collectionID", job.CollectionID()))
defer func() {


@ -46,9 +46,7 @@ var (
)
func (s *Server) ShowCollections(ctx context.Context, req *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
log := log.With(zap.Int64("msgID", req.GetBase().GetMsgID()))
log.Info("show collections request received", zap.Int64s("collections", req.GetCollectionIDs()))
log.Ctx(ctx).Info("show collections request received", zap.Int64s("collections", req.GetCollectionIDs()))
if s.status.Load() != commonpb.StateCode_Healthy {
msg := "failed to show collections"
@ -102,8 +100,7 @@ func (s *Server) ShowCollections(ctx context.Context, req *querypb.ShowCollectio
}
func (s *Server) ShowPartitions(ctx context.Context, req *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
log := log.With(
zap.Int64("msgID", req.GetBase().GetMsgID()),
log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.GetCollectionID()),
)
@ -180,8 +177,7 @@ func (s *Server) ShowPartitions(ctx context.Context, req *querypb.ShowPartitions
}
func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error) {
log := log.With(
zap.Int64("msgID", req.GetBase().GetMsgID()),
log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.GetCollectionID()),
)
@ -221,8 +217,7 @@ func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollection
}
func (s *Server) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
log := log.With(
zap.Int64("msgID", req.GetBase().GetMsgID()),
log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.GetCollectionID()),
)
@ -259,8 +254,7 @@ func (s *Server) ReleaseCollection(ctx context.Context, req *querypb.ReleaseColl
}
func (s *Server) LoadPartitions(ctx context.Context, req *querypb.LoadPartitionsRequest) (*commonpb.Status, error) {
log := log.With(
zap.Int64("msgID", req.GetBase().GetMsgID()),
log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.GetCollectionID()),
)
@ -299,8 +293,7 @@ func (s *Server) LoadPartitions(ctx context.Context, req *querypb.LoadPartitions
}
func (s *Server) ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionsRequest) (*commonpb.Status, error) {
log := log.With(
zap.Int64("msgID", req.GetBase().GetMsgID()),
log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.GetCollectionID()),
)
@ -343,8 +336,7 @@ func (s *Server) ReleasePartitions(ctx context.Context, req *querypb.ReleasePart
}
func (s *Server) GetPartitionStates(ctx context.Context, req *querypb.GetPartitionStatesRequest) (*querypb.GetPartitionStatesResponse, error) {
log := log.With(
zap.Int64("msgID", req.GetBase().GetMsgID()),
log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.GetCollectionID()),
)
@ -412,8 +404,7 @@ func (s *Server) GetPartitionStates(ctx context.Context, req *querypb.GetPartiti
}
func (s *Server) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error) {
log := log.With(
zap.Int64("msgID", req.GetBase().GetMsgID()),
log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.GetCollectionID()),
)
@ -453,8 +444,7 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfo
}
func (s *Server) LoadBalance(ctx context.Context, req *querypb.LoadBalanceRequest) (*commonpb.Status, error) {
log := log.With(
zap.Int64("msgID", req.GetBase().GetMsgID()),
log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.GetCollectionID()),
)
@ -505,9 +495,7 @@ func (s *Server) LoadBalance(ctx context.Context, req *querypb.LoadBalanceReques
}
func (s *Server) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) {
log := log.With(
zap.Int64("msgID", req.GetBase().GetMsgID()),
)
log := log.Ctx(ctx)
log.Info("show configurations request received", zap.String("pattern", req.GetPattern()))
@ -540,7 +528,7 @@ func (s *Server) ShowConfigurations(ctx context.Context, req *internalpb.ShowCon
}
func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
log := log.With(zap.Int64("msgID", req.Base.GetMsgID()))
log := log.Ctx(ctx)
log.Debug("get metrics request received",
zap.String("metricType", req.GetRequest()))
@ -587,8 +575,7 @@ func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest
}
func (s *Server) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasRequest) (*milvuspb.GetReplicasResponse, error) {
log := log.With(
zap.Int64("msgID", req.Base.GetMsgID()),
log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.GetCollectionID()),
)
@ -630,8 +617,7 @@ func (s *Server) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasReque
}
func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeadersRequest) (*querypb.GetShardLeadersResponse, error) {
log := log.With(
zap.Int64("msgID", req.Base.GetMsgID()),
log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.GetCollectionID()),
)


@ -39,6 +39,7 @@ import (
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/timerecord"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
@ -96,8 +97,7 @@ func (node *QueryNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.Stri
}
func (node *QueryNode) GetStatistics(ctx context.Context, req *querypb.GetStatisticsRequest) (*internalpb.GetStatisticsResponse, error) {
log.Debug("received GetStatisticsRequest",
zap.Int64("msgID", req.GetReq().GetBase().GetMsgID()),
log.Ctx(ctx).Debug("received GetStatisticsRequest",
zap.Strings("vChannels", req.GetDmlChannels()),
zap.Int64s("segmentIDs", req.GetSegmentIDs()),
zap.Uint64("guaranteeTimestamp", req.GetReq().GetGuaranteeTimestamp()),
@ -162,9 +162,8 @@ func (node *QueryNode) getStatisticsWithDmlChannel(ctx context.Context, req *que
return failRet, nil
}
msgID := req.GetReq().GetBase().GetMsgID()
log.Debug("received GetStatisticRequest",
zap.Int64("msgID", msgID),
traceID, _, _ := trace.InfoFromContext(ctx)
log.Ctx(ctx).Debug("received GetStatisticRequest",
zap.Bool("fromShardLeader", req.GetFromShardLeader()),
zap.String("vChannel", dmlChannel),
zap.Int64s("segmentIDs", req.GetSegmentIDs()),
@ -179,7 +178,6 @@ func (node *QueryNode) getStatisticsWithDmlChannel(ctx context.Context, req *que
qs, err := node.queryShardService.getQueryShard(dmlChannel)
if err != nil {
log.Warn("get statistics failed, failed to get query shard",
zap.Int64("msgID", msgID),
zap.String("dml channel", dmlChannel),
zap.Error(err))
failRet.Status.ErrorCode = commonpb.ErrorCode_NotShardLeader
@ -188,7 +186,6 @@ func (node *QueryNode) getStatisticsWithDmlChannel(ctx context.Context, req *que
}
log.Debug("start do statistics",
zap.Int64("msgID", msgID),
zap.Bool("fromShardLeader", req.GetFromShardLeader()),
zap.String("vChannel", dmlChannel),
zap.Int64s("segmentIDs", req.GetSegmentIDs()))
@ -222,8 +219,8 @@ func (node *QueryNode) getStatisticsWithDmlChannel(ctx context.Context, req *que
return failRet, nil
}
tr.Elapse(fmt.Sprintf("do statistics done, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
msgID, req.GetFromShardLeader(), dmlChannel, req.GetSegmentIDs()))
tr.Elapse(fmt.Sprintf("do statistics done, traceID = %s, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
traceID, req.GetFromShardLeader(), dmlChannel, req.GetSegmentIDs()))
failRet.Status.ErrorCode = commonpb.ErrorCode_Success
return historicalTask.Ret, nil
}
@ -256,14 +253,17 @@ func (node *QueryNode) getStatisticsWithDmlChannel(ctx context.Context, req *que
// shard leader dispatches request to its shard cluster
results, errCluster = cluster.GetStatistics(statisticCtx, req, withStreaming)
if errCluster != nil {
log.Warn("get statistics on cluster failed", zap.Int64("msgID", msgID), zap.Int64("collectionID", req.Req.GetCollectionID()), zap.Error(errCluster))
log.Warn("get statistics on cluster failed",
zap.Int64("collectionID", req.Req.GetCollectionID()),
zap.Error(errCluster))
failRet.Status.Reason = errCluster.Error()
return failRet, nil
}
tr.Elapse(fmt.Sprintf("start reduce statistic result, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
msgID, req.GetFromShardLeader(), dmlChannel, req.GetSegmentIDs()))
tr.Elapse(fmt.Sprintf("start reduce statistic result, traceID = %s, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
traceID, req.GetFromShardLeader(), dmlChannel, req.GetSegmentIDs()))
results = append(results, streamingResult)
ret, err := reduceStatisticResponse(results)
@ -271,10 +271,11 @@ func (node *QueryNode) getStatisticsWithDmlChannel(ctx context.Context, req *que
failRet.Status.Reason = err.Error()
return failRet, nil
}
log.Debug("reduce statistic result done", zap.Int64("msgID", msgID), zap.Any("results", ret))
log.Debug("reduce statistic result done",
zap.Any("results", ret))
tr.Elapse(fmt.Sprintf("do statistics done, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
msgID, req.GetFromShardLeader(), dmlChannel, req.GetSegmentIDs()))
tr.Elapse(fmt.Sprintf("do statistics done, traceID = %s, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
traceID, req.GetFromShardLeader(), dmlChannel, req.GetSegmentIDs()))
failRet.Status.ErrorCode = commonpb.ErrorCode_Success
return ret, nil
@ -699,8 +700,7 @@ func (node *QueryNode) isHealthy() bool {
// Search performs replica search tasks.
func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) {
log.Debug("Received SearchRequest",
zap.Int64("msgID", req.GetReq().GetBase().GetMsgID()),
log.Ctx(ctx).Debug("Received SearchRequest",
zap.Strings("vChannels", req.GetDmlChannels()),
zap.Int64s("segmentIDs", req.GetSegmentIDs()),
zap.Uint64("guaranteeTimestamp", req.GetReq().GetGuaranteeTimestamp()),
@ -779,7 +779,6 @@ func (node *QueryNode) searchWithDmlChannel(ctx context.Context, req *querypb.Se
msgID := req.GetReq().GetBase().GetMsgID()
log.Ctx(ctx).Debug("Received SearchRequest",
zap.Int64("msgID", msgID),
zap.Bool("fromShardLeader", req.GetFromShardLeader()),
zap.String("vChannel", dmlChannel),
zap.Int64s("segmentIDs", req.GetSegmentIDs()),
@ -794,7 +793,6 @@ func (node *QueryNode) searchWithDmlChannel(ctx context.Context, req *querypb.Se
qs, err := node.queryShardService.getQueryShard(dmlChannel)
if err != nil {
log.Ctx(ctx).Warn("Search failed, failed to get query shard",
zap.Int64("msgID", msgID),
zap.String("dml channel", dmlChannel),
zap.Error(err))
failRet.Status.ErrorCode = commonpb.ErrorCode_NotShardLeader
@ -803,7 +801,6 @@ func (node *QueryNode) searchWithDmlChannel(ctx context.Context, req *querypb.Se
}
log.Ctx(ctx).Debug("start do search",
zap.Int64("msgID", msgID),
zap.Bool("fromShardLeader", req.GetFromShardLeader()),
zap.String("vChannel", dmlChannel),
zap.Int64s("segmentIDs", req.GetSegmentIDs()))
@ -884,7 +881,7 @@ func (node *QueryNode) searchWithDmlChannel(ctx context.Context, req *querypb.Se
// shard leader dispatches request to its shard cluster
results, errCluster = cluster.Search(searchCtx, req, withStreaming)
if errCluster != nil {
log.Ctx(ctx).Warn("search cluster failed", zap.Int64("msgID", msgID), zap.Int64("collectionID", req.Req.GetCollectionID()), zap.Error(errCluster))
log.Ctx(ctx).Warn("search cluster failed", zap.Int64("collectionID", req.Req.GetCollectionID()), zap.Error(errCluster))
failRet.Status.Reason = errCluster.Error()
return failRet, nil
}
@ -930,9 +927,8 @@ func (node *QueryNode) queryWithDmlChannel(ctx context.Context, req *querypb.Que
return failRet, nil
}
msgID := req.GetReq().GetBase().GetMsgID()
traceID, _, _ := trace.InfoFromContext(ctx)
log.Ctx(ctx).Debug("Received QueryRequest",
zap.Int64("msgID", msgID),
zap.Bool("fromShardLeader", req.GetFromShardLeader()),
zap.String("vChannel", dmlChannel),
zap.Int64s("segmentIDs", req.GetSegmentIDs()),
@ -946,13 +942,14 @@ func (node *QueryNode) queryWithDmlChannel(ctx context.Context, req *querypb.Que
qs, err := node.queryShardService.getQueryShard(dmlChannel)
if err != nil {
log.Ctx(ctx).Warn("Query failed, failed to get query shard", zap.Int64("msgID", msgID), zap.String("dml channel", dmlChannel), zap.Error(err))
log.Ctx(ctx).Warn("Query failed, failed to get query shard",
zap.String("dml channel", dmlChannel),
zap.Error(err))
failRet.Status.Reason = err.Error()
return failRet, nil
}
log.Ctx(ctx).Debug("start do query",
zap.Int64("msgID", msgID),
zap.Bool("fromShardLeader", req.GetFromShardLeader()),
zap.String("vChannel", dmlChannel),
zap.Int64s("segmentIDs", req.GetSegmentIDs()))
@ -975,8 +972,8 @@ func (node *QueryNode) queryWithDmlChannel(ctx context.Context, req *querypb.Que
return failRet, nil
}
tr.CtxElapse(ctx, fmt.Sprintf("do query done, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
msgID, req.GetFromShardLeader(), dmlChannel, req.GetSegmentIDs()))
tr.CtxElapse(ctx, fmt.Sprintf("do query done, traceID = %s, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
traceID, req.GetFromShardLeader(), dmlChannel, req.GetSegmentIDs()))
failRet.Status.ErrorCode = commonpb.ErrorCode_Success
metrics.QueryNodeSQLatencyInQueue.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()),
@ -1028,13 +1025,15 @@ func (node *QueryNode) queryWithDmlChannel(ctx context.Context, req *querypb.Que
// shard leader dispatches request to its shard cluster
results, errCluster = cluster.Query(queryCtx, req, withStreaming)
if errCluster != nil {
log.Ctx(ctx).Warn("failed to query cluster", zap.Int64("msgID", msgID), zap.Int64("collectionID", req.Req.GetCollectionID()), zap.Error(errCluster))
log.Ctx(ctx).Warn("failed to query cluster",
zap.Int64("collectionID", req.Req.GetCollectionID()),
zap.Error(errCluster))
failRet.Status.Reason = errCluster.Error()
return failRet, nil
}
tr.CtxElapse(ctx, fmt.Sprintf("start reduce query result, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
msgID, req.GetFromShardLeader(), dmlChannel, req.GetSegmentIDs()))
tr.CtxElapse(ctx, fmt.Sprintf("start reduce query result, traceID = %s, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
traceID, req.GetFromShardLeader(), dmlChannel, req.GetSegmentIDs()))
results = append(results, streamingResult)
ret, err2 := mergeInternalRetrieveResult(ctx, results, req.Req.GetLimit())
@ -1043,8 +1042,8 @@ func (node *QueryNode) queryWithDmlChannel(ctx context.Context, req *querypb.Que
return failRet, nil
}
tr.CtxElapse(ctx, fmt.Sprintf("do query done, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
msgID, req.GetFromShardLeader(), dmlChannel, req.GetSegmentIDs()))
tr.CtxElapse(ctx, fmt.Sprintf("do query done, traceID = %s, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
traceID, req.GetFromShardLeader(), dmlChannel, req.GetSegmentIDs()))
failRet.Status.ErrorCode = commonpb.ErrorCode_Success
latency := tr.ElapseSpan()
@ -1055,7 +1054,7 @@ func (node *QueryNode) queryWithDmlChannel(ctx context.Context, req *querypb.Que
// Query performs replica query tasks.
func (node *QueryNode) Query(ctx context.Context, req *querypb.QueryRequest) (*internalpb.RetrieveResults, error) {
log.Ctx(ctx).Debug("Received QueryRequest", zap.Int64("msgID", req.GetReq().GetBase().GetMsgID()),
log.Ctx(ctx).Debug("Received QueryRequest",
zap.Strings("vChannels", req.GetDmlChannels()),
zap.Int64s("segmentIDs", req.GetSegmentIDs()),
zap.Uint64("guaranteeTimestamp", req.Req.GetGuaranteeTimestamp()),
@ -1140,7 +1139,7 @@ func (node *QueryNode) SyncReplicaSegments(ctx context.Context, req *querypb.Syn
return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
}
//ShowConfigurations returns the configurations of queryNode matching req.Pattern
// ShowConfigurations returns the configurations of queryNode matching req.Pattern
func (node *QueryNode) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) {
if !node.isHealthy() {
log.Warn("QueryNode.ShowConfigurations failed",


@ -38,7 +38,7 @@ type loadSegmentsTask struct {
// loadSegmentsTask
func (l *loadSegmentsTask) PreExecute(ctx context.Context) error {
log.Info("LoadSegmentTask PreExecute start", zap.Int64("msgID", l.req.Base.MsgID))
log.Ctx(ctx).Info("LoadSegmentTask PreExecute start")
var err error
// init meta
collectionID := l.req.GetCollectionID()
@ -64,15 +64,15 @@ func (l *loadSegmentsTask) PreExecute(ctx context.Context) error {
}
}
l.req.Infos = filteredInfos
log.Info("LoadSegmentTask PreExecute done", zap.Int64("msgID", l.req.Base.MsgID))
log.Info("LoadSegmentTask PreExecute done")
return nil
}
func (l *loadSegmentsTask) Execute(ctx context.Context) error {
log.Info("LoadSegmentTask Execute start", zap.Int64("msgID", l.req.Base.MsgID))
log.Ctx(ctx).Info("LoadSegmentTask Execute start")
if len(l.req.Infos) == 0 {
log.Info("all segments loaded", zap.Int64("msgID", l.req.GetBase().GetMsgID()))
log.Info("all segments loaded")
return nil
}
@ -127,7 +127,7 @@ func (l *loadSegmentsTask) Execute(ctx context.Context) error {
}
log.Info("LoadSegmentTask Execute done", zap.Int64("collectionID", l.req.CollectionID),
zap.Int64("replicaID", l.req.ReplicaID), zap.Int64("msgID", l.req.Base.MsgID))
zap.Int64("replicaID", l.req.ReplicaID))
return nil
}


@ -898,6 +898,7 @@ func (sc *ShardCluster) GetStatistics(ctx context.Context, req *querypb.GetStati
// Search performs search operation on shard cluster.
func (sc *ShardCluster) Search(ctx context.Context, req *querypb.SearchRequest, withStreaming withStreaming) ([]*internalpb.SearchResults, error) {
log := log.Ctx(ctx)
if !sc.serviceable() {
err := WrapErrShardNotAvailable(sc.replicaID, sc.vchannelName)
log.Warn("failed to search on shard",
@ -983,7 +984,6 @@ func (sc *ShardCluster) Search(ctx context.Context, req *querypb.SearchRequest,
wg.Wait()
if err != nil {
log.Error("failed to do search",
zap.Int64("msgID", req.GetReq().GetBase().GetMsgID()),
zap.Int64("sourceID", req.GetReq().GetBase().GetSourceID()),
zap.Strings("channels", req.GetDmlChannels()),
zap.Int64s("segmentIDs", req.GetSegmentIDs()),


@ -65,9 +65,8 @@ func (q *queryTask) queryOnStreaming() error {
if err != nil {
return err
}
if _, released := q.QS.collection.getReleaseTime(); released {
log.Ctx(ctx).Debug("collection release before search", zap.Int64("msgID", q.ID()),
log.Ctx(ctx).Debug("collection release before search",
zap.Int64("collectionID", q.CollectionID))
return fmt.Errorf("retrieve failed, collection has been released, collectionID = %d", q.CollectionID)
}
@ -111,9 +110,8 @@ func (q *queryTask) queryOnHistorical() error {
if err != nil {
return err
}
if _, released := q.QS.collection.getReleaseTime(); released {
log.Ctx(ctx).Debug("collection release before search", zap.Int64("msgID", q.ID()),
log.Ctx(ctx).Debug("collection release before search",
zap.Int64("collectionID", q.CollectionID))
return fmt.Errorf("retrieve failed, collection has been released, collectionID = %d", q.CollectionID)
}


@ -104,7 +104,7 @@ func (s *searchTask) searchOnStreaming() error {
}
if _, released := s.QS.collection.getReleaseTime(); released {
log.Ctx(ctx).Debug("collection release before search", zap.Int64("msgID", s.ID()),
log.Ctx(ctx).Debug("collection release before search",
zap.Int64("collectionID", s.CollectionID))
return fmt.Errorf("retrieve failed, collection has been released, collectionID = %d", s.CollectionID)
}
@ -117,7 +117,7 @@ func (s *searchTask) searchOnStreaming() error {
partResults, _, _, sErr := searchStreaming(ctx, s.QS.metaReplica, searchReq, s.CollectionID, s.iReq.GetPartitionIDs(), s.req.GetDmlChannels()[0])
if sErr != nil {
log.Ctx(ctx).Warn("failed to search streaming data", zap.Int64("msgID", s.ID()),
log.Ctx(ctx).Warn("failed to search streaming data",
zap.Int64("collectionID", s.CollectionID), zap.Error(sErr))
return sErr
}
@ -139,7 +139,7 @@ func (s *searchTask) searchOnHistorical() error {
}
if _, released := s.QS.collection.getReleaseTime(); released {
log.Ctx(ctx).Warn("collection release before search", zap.Int64("msgID", s.ID()),
log.Ctx(ctx).Warn("collection release before search",
zap.Int64("collectionID", s.CollectionID))
return fmt.Errorf("retrieve failed, collection has been released, collectionID = %d", s.CollectionID)
}
@ -227,7 +227,8 @@ func (s *searchTask) reduceResults(ctx context.Context, searchReq *searchRequest
numSegment := int64(len(results))
blobs, err := reduceSearchResultsAndFillData(searchReq.plan, results, numSegment, sInfo.sliceNQs, sInfo.sliceTopKs)
if err != nil {
log.Ctx(ctx).Warn("marshal for historical results error", zap.Int64("msgID", s.ID()), zap.Error(err))
log.Ctx(ctx).Warn("marshal for historical results error",
zap.Error(err))
return err
}
@ -239,7 +240,7 @@ func (s *searchTask) reduceResults(ctx context.Context, searchReq *searchRequest
for i := 0; i < cnt; i++ {
blob, err := getSearchResultDataBlob(blobs, i)
if err != nil {
log.Ctx(ctx).Warn("getSearchResultDataBlob for historical results error", zap.Int64("msgID", s.ID()),
log.Ctx(ctx).Warn("getSearchResultDataBlob for historical results error",
zap.Error(err))
return err
}


@ -44,9 +44,8 @@ func (s *statistics) statisticOnStreaming() error {
if err != nil {
return err
}
if _, released := s.qs.collection.getReleaseTime(); released {
log.Ctx(ctx).Warn("collection release before do statistics", zap.Int64("msgID", s.id),
log.Ctx(ctx).Warn("collection release before do statistics",
zap.Int64("collectionID", s.iReq.GetCollectionID()))
return fmt.Errorf("statistic failed, collection has been released, collectionID = %d", s.iReq.GetCollectionID())
}
@ -54,8 +53,9 @@ func (s *statistics) statisticOnStreaming() error {
results, _, _, err := statisticStreaming(ctx, s.qs.metaReplica, s.iReq.GetCollectionID(),
s.iReq.GetPartitionIDs(), s.req.GetDmlChannels()[0])
if err != nil {
log.Ctx(ctx).Warn("failed to statistic on streaming data", zap.Int64("msgID", s.id),
zap.Int64("collectionID", s.iReq.GetCollectionID()), zap.Error(err))
log.Ctx(ctx).Warn("failed to statistic on streaming data",
zap.Int64("collectionID", s.iReq.GetCollectionID()),
zap.Error(err))
return err
}
return s.reduceResults(results)
@ -73,9 +73,8 @@ func (s *statistics) statisticOnHistorical() error {
if err != nil {
return err
}
if _, released := s.qs.collection.getReleaseTime(); released {
log.Ctx(ctx).Debug("collection release before do statistics", zap.Int64("msgID", s.id),
log.Ctx(ctx).Debug("collection release before do statistics",
zap.Int64("collectionID", s.iReq.GetCollectionID()))
return fmt.Errorf("statistic failed, collection has been released, collectionID = %d", s.iReq.GetCollectionID())
}


@ -767,8 +767,9 @@ func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollecti
metrics.RootCoordDDLReqCounter.WithLabelValues("CreateCollection", metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder("CreateCollection")
log.Ctx(ctx).Info("received request to create collection", zap.String("role", typeutil.RootCoordRole),
zap.String("name", in.GetCollectionName()), zap.Int64("msgID", in.GetBase().GetMsgID()))
log.Ctx(ctx).Info("received request to create collection",
zap.String("role", typeutil.RootCoordRole),
zap.String("name", in.GetCollectionName()))
t := &createCollectionTask{
baseTask: baseTask{
@ -780,19 +781,21 @@ func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollecti
}
if err := c.scheduler.AddTask(t); err != nil {
log.Ctx(ctx).Error("failed to enqueue request to create collection", zap.String("role", typeutil.RootCoordRole),
log.Ctx(ctx).Error("failed to enqueue request to create collection",
zap.String("role", typeutil.RootCoordRole),
zap.Error(err),
zap.String("name", in.GetCollectionName()), zap.Int64("msgID", in.GetBase().GetMsgID()))
zap.String("name", in.GetCollectionName()))
metrics.RootCoordDDLReqCounter.WithLabelValues("CreateCollection", metrics.FailLabel).Inc()
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
}
if err := t.WaitToFinish(); err != nil {
log.Ctx(ctx).Error("failed to create collection", zap.String("role", typeutil.RootCoordRole),
log.Ctx(ctx).Error("failed to create collection",
zap.String("role", typeutil.RootCoordRole),
zap.Error(err),
zap.String("name", in.GetCollectionName()),
zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs()))
zap.Uint64("ts", t.GetTs()))
metrics.RootCoordDDLReqCounter.WithLabelValues("CreateCollection", metrics.FailLabel).Inc()
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
@ -802,9 +805,10 @@ func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollecti
metrics.RootCoordDDLReqLatency.WithLabelValues("CreateCollection").Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordNumOfCollections.Inc()
log.Ctx(ctx).Info("done to create collection", zap.String("role", typeutil.RootCoordRole),
log.Ctx(ctx).Info("done to create collection",
zap.String("role", typeutil.RootCoordRole),
zap.String("name", in.GetCollectionName()),
zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs()))
zap.Uint64("ts", t.GetTs()))
return succStatus(), nil
}
@ -818,7 +822,7 @@ func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRe
tr := timerecord.NewTimeRecorder("DropCollection")
log.Ctx(ctx).Info("received request to drop collection", zap.String("role", typeutil.RootCoordRole),
zap.String("name", in.GetCollectionName()), zap.Int64("msgID", in.GetBase().GetMsgID()))
zap.String("name", in.GetCollectionName()))
t := &dropCollectionTask{
baseTask: baseTask{
@ -832,7 +836,7 @@ func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRe
if err := c.scheduler.AddTask(t); err != nil {
log.Ctx(ctx).Error("failed to enqueue request to drop collection", zap.String("role", typeutil.RootCoordRole),
zap.Error(err),
zap.String("name", in.GetCollectionName()), zap.Int64("msgID", in.GetBase().GetMsgID()))
zap.String("name", in.GetCollectionName()))
metrics.RootCoordDDLReqCounter.WithLabelValues("DropCollection", metrics.FailLabel).Inc()
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
@ -842,7 +846,7 @@ func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRe
log.Ctx(ctx).Error("failed to drop collection", zap.String("role", typeutil.RootCoordRole),
zap.Error(err),
zap.String("name", in.GetCollectionName()),
zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs()))
zap.Uint64("ts", t.GetTs()))
metrics.RootCoordDDLReqCounter.WithLabelValues("DropCollection", metrics.FailLabel).Inc()
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
@@ -853,7 +857,7 @@ func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRe
metrics.RootCoordNumOfCollections.Dec()
log.Ctx(ctx).Info("done to drop collection", zap.String("role", typeutil.RootCoordRole),
zap.String("name", in.GetCollectionName()), zap.Int64("msgID", in.GetBase().GetMsgID()),
zap.String("name", in.GetCollectionName()),
zap.Uint64("ts", t.GetTs()))
return succStatus(), nil
}
@@ -871,7 +875,8 @@ func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequ
tr := timerecord.NewTimeRecorder("HasCollection")
ts := getTravelTs(in)
log := log.Ctx(ctx).With(zap.String("collection name", in.GetCollectionName()), zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", ts))
log := log.Ctx(ctx).With(zap.String("collection name", in.GetCollectionName()),
zap.Uint64("ts", ts))
log.Info("received request to has collection")
@@ -955,7 +960,9 @@ func (c *Core) DescribeCollection(ctx context.Context, in *milvuspb.DescribeColl
tr := timerecord.NewTimeRecorder("DescribeCollection")
ts := getTravelTs(in)
log := log.Ctx(ctx).With(zap.String("collection name", in.GetCollectionName()), zap.Int64("id", in.GetCollectionID()), zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", ts))
log := log.Ctx(ctx).With(zap.String("collection name", in.GetCollectionName()),
zap.Int64("id", in.GetCollectionID()),
zap.Uint64("ts", ts))
// TODO(longjiquan): log may be very frequent here.
@@ -1005,7 +1012,8 @@ func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollections
tr := timerecord.NewTimeRecorder("ShowCollections")
ts := getTravelTs(in)
log := log.Ctx(ctx).With(zap.String("dbname", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", ts))
log := log.Ctx(ctx).With(zap.String("dbname", in.GetDbName()),
zap.Uint64("ts", ts))
log.Info("received request to show collections")
@@ -1047,8 +1055,9 @@ func (c *Core) AlterCollection(ctx context.Context, in *milvuspb.AlterCollection
metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder("AlterCollection")
log.Info("received request to alter collection", zap.String("role", typeutil.RootCoordRole),
zap.String("name", in.GetCollectionName()), zap.Int64("msgID", in.GetBase().GetMsgID()))
log.Ctx(ctx).Info("received request to alter collection",
zap.String("role", typeutil.RootCoordRole),
zap.String("name", in.GetCollectionName()))
t := &alterCollectionTask{
baseTask: baseTask{
@@ -1060,19 +1069,21 @@ func (c *Core) AlterCollection(ctx context.Context, in *milvuspb.AlterCollection
}
if err := c.scheduler.AddTask(t); err != nil {
log.Error("failed to enqueue request to alter collection", zap.String("role", typeutil.RootCoordRole),
log.Error("failed to enqueue request to alter collection",
zap.String("role", typeutil.RootCoordRole),
zap.Error(err),
zap.String("name", in.GetCollectionName()), zap.Int64("msgID", in.GetBase().GetMsgID()))
zap.String("name", in.GetCollectionName()))
metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.FailLabel).Inc()
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
}
if err := t.WaitToFinish(); err != nil {
log.Error("failed to alter collection", zap.String("role", typeutil.RootCoordRole),
log.Error("failed to alter collection",
zap.String("role", typeutil.RootCoordRole),
zap.Error(err),
zap.String("name", in.GetCollectionName()),
zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs()))
zap.Uint64("ts", t.GetTs()))
metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.FailLabel).Inc()
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
@@ -1082,8 +1093,9 @@ func (c *Core) AlterCollection(ctx context.Context, in *milvuspb.AlterCollection
metrics.RootCoordDDLReqLatency.WithLabelValues("AlterCollection").Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordNumOfCollections.Dec()
log.Info("done to alter collection", zap.String("role", typeutil.RootCoordRole),
zap.String("name", in.GetCollectionName()), zap.Int64("msgID", in.GetBase().GetMsgID()),
log.Info("done to alter collection",
zap.String("role", typeutil.RootCoordRole),
zap.String("name", in.GetCollectionName()),
zap.Uint64("ts", t.GetTs()))
return succStatus(), nil
}
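Each DDL entry point also brackets its work with the same metric bookkeeping: a Total increment on entry, Fail on every error return, Success plus a latency observation at the end. A hedged sketch of that shape using stand-in prometheus collectors (the real RootCoordDDLReqCounter and RootCoordDDLReqLatency live in milvus's metrics package):

package main

import (
	"errors"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// Stand-ins for RootCoordDDLReqCounter / RootCoordDDLReqLatency.
var (
	ddlCounter = prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "rootcoord_ddl_req_count", Help: "stand-in counter"},
		[]string{"function_name", "status"},
	)
	ddlLatency = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{Name: "rootcoord_ddl_req_latency", Help: "stand-in histogram"},
		[]string{"function_name"},
	)
)

// instrument wraps a DDL call in the Total/Fail/Success pattern above.
func instrument(name string, fn func() error) error {
	ddlCounter.WithLabelValues(name, "total").Inc()
	start := time.Now()
	if err := fn(); err != nil {
		ddlCounter.WithLabelValues(name, "fail").Inc()
		return err
	}
	ddlCounter.WithLabelValues(name, "success").Inc()
	ddlLatency.WithLabelValues(name).Observe(float64(time.Since(start).Milliseconds()))
	return nil
}

func main() {
	_ = instrument("AlterCollection", func() error { return errors.New("enqueue failed") })
}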
@@ -1097,9 +1109,10 @@ func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartition
metrics.RootCoordDDLReqCounter.WithLabelValues("CreatePartition", metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder("CreatePartition")
log.Ctx(ctx).Info("received request to create partition", zap.String("role", typeutil.RootCoordRole),
zap.String("collection", in.GetCollectionName()), zap.String("partition", in.GetPartitionName()),
zap.Int64("msgID", in.GetBase().GetMsgID()))
log.Ctx(ctx).Info("received request to create partition",
zap.String("role", typeutil.RootCoordRole),
zap.String("collection", in.GetCollectionName()),
zap.String("partition", in.GetPartitionName()))
t := &createPartitionTask{
baseTask: baseTask{
@@ -1111,20 +1124,23 @@ func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartition
}
if err := c.scheduler.AddTask(t); err != nil {
log.Ctx(ctx).Error("failed to enqueue request to create partition", zap.String("role", typeutil.RootCoordRole),
log.Ctx(ctx).Error("failed to enqueue request to create partition",
zap.String("role", typeutil.RootCoordRole),
zap.Error(err),
zap.String("collection", in.GetCollectionName()), zap.String("partition", in.GetPartitionName()),
zap.Int64("msgID", in.GetBase().GetMsgID()))
zap.String("collection", in.GetCollectionName()),
zap.String("partition", in.GetPartitionName()))
metrics.RootCoordDDLReqCounter.WithLabelValues("CreatePartition", metrics.FailLabel).Inc()
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
}
if err := t.WaitToFinish(); err != nil {
log.Ctx(ctx).Error("failed to create partition", zap.String("role", typeutil.RootCoordRole),
log.Ctx(ctx).Error("failed to create partition",
zap.String("role", typeutil.RootCoordRole),
zap.Error(err),
zap.String("collection", in.GetCollectionName()), zap.String("partition", in.GetPartitionName()),
zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs()))
zap.String("collection", in.GetCollectionName()),
zap.String("partition", in.GetPartitionName()),
zap.Uint64("ts", t.GetTs()))
metrics.RootCoordDDLReqCounter.WithLabelValues("CreatePartition", metrics.FailLabel).Inc()
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
@@ -1133,9 +1149,11 @@ func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartition
metrics.RootCoordDDLReqCounter.WithLabelValues("CreatePartition", metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues("CreatePartition").Observe(float64(tr.ElapseSpan().Milliseconds()))
log.Ctx(ctx).Info("done to create partition", zap.String("role", typeutil.RootCoordRole),
zap.String("collection", in.GetCollectionName()), zap.String("partition", in.GetPartitionName()),
zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs()))
log.Ctx(ctx).Info("done to create partition",
zap.String("role", typeutil.RootCoordRole),
zap.String("collection", in.GetCollectionName()),
zap.String("partition", in.GetPartitionName()),
zap.Uint64("ts", t.GetTs()))
return succStatus(), nil
}
@@ -1148,9 +1166,10 @@ func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequ
metrics.RootCoordDDLReqCounter.WithLabelValues("DropPartition", metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder("DropPartition")
log.Ctx(ctx).Info("received request to drop partition", zap.String("role", typeutil.RootCoordRole),
zap.String("collection", in.GetCollectionName()), zap.String("partition", in.GetPartitionName()),
zap.Int64("msgID", in.GetBase().GetMsgID()))
log.Ctx(ctx).Info("received request to drop partition",
zap.String("role", typeutil.RootCoordRole),
zap.String("collection", in.GetCollectionName()),
zap.String("partition", in.GetPartitionName()))
t := &dropPartitionTask{
baseTask: baseTask{
@@ -1162,19 +1181,22 @@ func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequ
}
if err := c.scheduler.AddTask(t); err != nil {
log.Ctx(ctx).Error("failed to enqueue request to drop partition", zap.String("role", typeutil.RootCoordRole),
log.Ctx(ctx).Error("failed to enqueue request to drop partition",
zap.String("role", typeutil.RootCoordRole),
zap.Error(err),
zap.String("collection", in.GetCollectionName()), zap.String("partition", in.GetPartitionName()),
zap.Int64("msgID", in.GetBase().GetMsgID()))
zap.String("collection", in.GetCollectionName()),
zap.String("partition", in.GetPartitionName()))
metrics.RootCoordDDLReqCounter.WithLabelValues("DropPartition", metrics.FailLabel).Inc()
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
}
if err := t.WaitToFinish(); err != nil {
log.Ctx(ctx).Error("failed to drop partition", zap.String("role", typeutil.RootCoordRole),
log.Ctx(ctx).Error("failed to drop partition",
zap.String("role", typeutil.RootCoordRole),
zap.Error(err),
zap.String("collection", in.GetCollectionName()), zap.String("partition", in.GetPartitionName()),
zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs()))
zap.String("collection", in.GetCollectionName()),
zap.String("partition", in.GetPartitionName()),
zap.Uint64("ts", t.GetTs()))
metrics.RootCoordDDLReqCounter.WithLabelValues("DropPartition", metrics.FailLabel).Inc()
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
@@ -1183,9 +1205,11 @@ func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequ
metrics.RootCoordDDLReqCounter.WithLabelValues("DropPartition", metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues("DropPartition").Observe(float64(tr.ElapseSpan().Milliseconds()))
log.Ctx(ctx).Info("done to drop partition", zap.String("role", typeutil.RootCoordRole),
zap.String("collection", in.GetCollectionName()), zap.String("partition", in.GetPartitionName()),
zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs()))
log.Ctx(ctx).Info("done to drop partition",
zap.String("role", typeutil.RootCoordRole),
zap.String("collection", in.GetCollectionName()),
zap.String("partition", in.GetPartitionName()),
zap.Uint64("ts", t.GetTs()))
return succStatus(), nil
}
@@ -1203,7 +1227,9 @@ func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionReques
// TODO(longjiquan): why HasPartitionRequest doesn't contain Timestamp but other requests do.
ts := typeutil.MaxTimestamp
log := log.Ctx(ctx).With(zap.String("collection", in.GetCollectionName()), zap.String("partition", in.GetPartitionName()), zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", ts))
log := log.Ctx(ctx).With(zap.String("collection", in.GetCollectionName()),
zap.String("partition", in.GetPartitionName()),
zap.Uint64("ts", ts))
log.Info("received request to has partition")
@@ -1250,7 +1276,7 @@ func (c *Core) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRe
metrics.RootCoordDDLReqCounter.WithLabelValues("ShowPartitions", metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder("ShowPartitions")
log := log.Ctx(ctx).With(zap.String("collection", in.GetCollectionName()), zap.Int64("msgID", in.GetBase().GetMsgID()))
log := log.Ctx(ctx).With(zap.String("collection", in.GetCollectionName()))
log.Info("received request to show partitions")
@@ -1302,8 +1328,7 @@ func (c *Core) AllocTimestamp(ctx context.Context, in *rootcoordpb.AllocTimestam
ts, err := c.tsoAllocator.GenerateTSO(in.GetCount())
if err != nil {
log.Ctx(ctx).Error("failed to allocate timestamp", zap.String("role", typeutil.RootCoordRole),
zap.Error(err),
zap.Int64("msgID", in.GetBase().GetMsgID()))
zap.Error(err))
return &rootcoordpb.AllocTimestampResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "AllocTimestamp failed: "+err.Error()),
@@ -1329,9 +1354,9 @@ func (c *Core) AllocID(ctx context.Context, in *rootcoordpb.AllocIDRequest) (*ro
}
start, _, err := c.idAllocator.Alloc(in.Count)
if err != nil {
log.Ctx(ctx).Error("failed to allocate id", zap.String("role", typeutil.RootCoordRole),
zap.Error(err),
zap.Int64("msgID", in.GetBase().GetMsgID()))
log.Ctx(ctx).Error("failed to allocate id",
zap.String("role", typeutil.RootCoordRole),
zap.Error(err))
return &rootcoordpb.AllocIDResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "AllocID failed: "+err.Error()),
@@ -1349,6 +1374,7 @@ func (c *Core) AllocID(ctx context.Context, in *rootcoordpb.AllocIDRequest) (*ro
// UpdateChannelTimeTick used to handle ChannelTimeTickMsg
func (c *Core) UpdateChannelTimeTick(ctx context.Context, in *internalpb.ChannelTimeTickMsg) (*commonpb.Status, error) {
log := log.Ctx(ctx)
if code, ok := c.checkHealthy(); !ok {
log.Warn("failed to updateTimeTick because rootcoord is not healthy", zap.Any("state", code))
return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), nil
@@ -1360,8 +1386,9 @@ func (c *Core) UpdateChannelTimeTick(ctx context.Context, in *internalpb.Channel
}
err := c.chanTimeTick.updateTimeTick(in, "gRPC")
if err != nil {
log.Warn("failed to updateTimeTick", zap.String("role", typeutil.RootCoordRole),
zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
log.Warn("failed to updateTimeTick",
zap.String("role", typeutil.RootCoordRole),
zap.Error(err))
return failStatus(commonpb.ErrorCode_UnexpectedError, "UpdateTimeTick failed: "+err.Error()), nil
}
return succStatus(), nil
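Dropping msgID from cross-component logs only works because the trace ID survives the gRPC hop: a tracing interceptor serializes the span context into request metadata on the client side and restores it on the server side, so proxy and rootcoord lines share one ID. A sketch of reading it back out, assuming OpenTelemetry propagation is wired up:

package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel/trace"
)

// traceIDFrom pulls the propagated trace ID out of the request context;
// with no active span it reports false, mirroring the fallback case.
func traceIDFrom(ctx context.Context) (string, bool) {
	sc := trace.SpanContextFromContext(ctx)
	if !sc.HasTraceID() {
		return "", false
	}
	return sc.TraceID().String(), true
}

func main() {
	if id, ok := traceIDFrom(context.Background()); ok {
		fmt.Println("traceID:", id)
	} else {
		fmt.Println("no active trace") // a bare background context carries no span
	}
}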
@@ -1410,8 +1437,9 @@ func (c *Core) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) (
}, nil
}
log.Debug("GetMetrics success", zap.String("role", typeutil.RootCoordRole),
zap.String("metric_type", metricType), zap.Int64("msgID", in.GetBase().GetMsgID()))
log.Ctx(ctx).Debug("GetMetrics success",
zap.String("role", typeutil.RootCoordRole),
zap.String("metric_type", metricType))
if metricType == metricsinfo.SystemInfoMetrics {
ret, err := c.metricsCacheManager.GetSystemInfoMetrics()
@@ -1419,13 +1447,16 @@ func (c *Core) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) (
return ret, nil
}
log.Debug("GetSystemInfoMetrics from cache failed, recompute instead", zap.String("role", typeutil.RootCoordRole),
zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Error(err))
log.Warn("GetSystemInfoMetrics from cache failed",
zap.String("role", typeutil.RootCoordRole),
zap.Error(err))
systemInfoMetrics, err := c.getSystemInfoMetrics(ctx, in)
if err != nil {
log.Warn("GetSystemInfoMetrics failed", zap.String("role", typeutil.RootCoordRole),
zap.String("metric_type", metricType), zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Error(err))
log.Warn("GetSystemInfoMetrics failed",
zap.String("role", typeutil.RootCoordRole),
zap.String("metric_type", metricType),
zap.Error(err))
return &milvuspb.GetMetricsResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, fmt.Sprintf("getSystemInfoMetrics failed: %s", err.Error())),
Response: "",
@@ -1437,7 +1468,7 @@ func (c *Core) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) (
}
log.Warn("GetMetrics failed, metric type not implemented", zap.String("role", typeutil.RootCoordRole),
zap.String("metric_type", metricType), zap.Int64("msgID", in.GetBase().GetMsgID()))
zap.String("metric_type", metricType))
return &milvuspb.GetMetricsResponse{
Status: failStatus(commonpb.ErrorCode_UnexpectedError, metricsinfo.MsgUnimplementedMetric),
@@ -1454,9 +1485,10 @@ func (c *Core) CreateAlias(ctx context.Context, in *milvuspb.CreateAliasRequest)
metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder("CreateAlias")
log.Info("received request to create alias", zap.String("role", typeutil.RootCoordRole),
zap.String("alias", in.GetAlias()), zap.String("collection", in.GetCollectionName()),
zap.Int64("msgID", in.GetBase().GetMsgID()))
log.Ctx(ctx).Info("received request to create alias",
zap.String("role", typeutil.RootCoordRole),
zap.String("alias", in.GetAlias()),
zap.String("collection", in.GetCollectionName()))
t := &createAliasTask{
baseTask: baseTask{
@@ -1468,20 +1500,23 @@ func (c *Core) CreateAlias(ctx context.Context, in *milvuspb.CreateAliasRequest)
}
if err := c.scheduler.AddTask(t); err != nil {
log.Error("failed to enqueue request to create alias", zap.String("role", typeutil.RootCoordRole),
log.Error("failed to enqueue request to create alias",
zap.String("role", typeutil.RootCoordRole),
zap.Error(err),
zap.String("alias", in.GetAlias()), zap.String("collection", in.GetCollectionName()),
zap.Int64("msgID", in.GetBase().GetMsgID()))
zap.String("alias", in.GetAlias()),
zap.String("collection", in.GetCollectionName()))
metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.FailLabel).Inc()
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
}
if err := t.WaitToFinish(); err != nil {
log.Error("failed to create alias", zap.String("role", typeutil.RootCoordRole),
log.Error("failed to create alias",
zap.String("role", typeutil.RootCoordRole),
zap.Error(err),
zap.String("alias", in.GetAlias()), zap.String("collection", in.GetCollectionName()),
zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs()))
zap.String("alias", in.GetAlias()),
zap.String("collection", in.GetCollectionName()),
zap.Uint64("ts", t.GetTs()))
metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.FailLabel).Inc()
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
@@ -1490,9 +1525,11 @@ func (c *Core) CreateAlias(ctx context.Context, in *milvuspb.CreateAliasRequest)
metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues("CreateAlias").Observe(float64(tr.ElapseSpan().Milliseconds()))
log.Info("done to create alias", zap.String("role", typeutil.RootCoordRole),
zap.String("alias", in.GetAlias()), zap.String("collection", in.GetCollectionName()),
zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs()))
log.Info("done to create alias",
zap.String("role", typeutil.RootCoordRole),
zap.String("alias", in.GetAlias()),
zap.String("collection", in.GetCollectionName()),
zap.Uint64("ts", t.GetTs()))
return succStatus(), nil
}
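succStatus and failStatus, used on every return path above, are thin constructors over the generated commonpb.Status message. A self-contained approximation with a local stand-in type (the real message and error codes come from the generated proto, not from this sketch):

package main

import "fmt"

// Status is a local stand-in for the generated commonpb.Status message.
type Status struct {
	ErrorCode string
	Reason    string
}

// succStatus mirrors the success constructor used above.
func succStatus() *Status {
	return &Status{ErrorCode: "Success"}
}

// failStatus mirrors the failure constructor: an error code plus a
// human-readable reason string.
func failStatus(code, reason string) *Status {
	return &Status{ErrorCode: code, Reason: reason}
}

func main() {
	fmt.Printf("%+v\n", failStatus("UnexpectedError", "CreateAlias failed: enqueue error"))
	fmt.Printf("%+v\n", succStatus())
}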
@@ -1505,8 +1542,9 @@ func (c *Core) DropAlias(ctx context.Context, in *milvuspb.DropAliasRequest) (*c
metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder("DropAlias")
log.Info("received request to drop alias", zap.String("role", typeutil.RootCoordRole),
zap.String("alias", in.GetAlias()), zap.Int64("msgID", in.GetBase().GetMsgID()))
log.Ctx(ctx).Info("received request to drop alias",
zap.String("role", typeutil.RootCoordRole),
zap.String("alias", in.GetAlias()))
t := &dropAliasTask{
baseTask: baseTask{
@@ -1518,19 +1556,21 @@ func (c *Core) DropAlias(ctx context.Context, in *milvuspb.DropAliasRequest) (*c
}
if err := c.scheduler.AddTask(t); err != nil {
log.Error("failed to enqueue request to drop alias", zap.String("role", typeutil.RootCoordRole),
log.Error("failed to enqueue request to drop alias",
zap.String("role", typeutil.RootCoordRole),
zap.Error(err),
zap.String("alias", in.GetAlias()), zap.Int64("msgID", in.GetBase().GetMsgID()))
zap.String("alias", in.GetAlias()))
metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.FailLabel).Inc()
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
}
if err := t.WaitToFinish(); err != nil {
log.Error("failed to drop alias", zap.String("role", typeutil.RootCoordRole),
log.Error("failed to drop alias",
zap.String("role", typeutil.RootCoordRole),
zap.Error(err),
zap.String("alias", in.GetAlias()),
zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs()))
zap.Uint64("ts", t.GetTs()))
metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.FailLabel).Inc()
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
@@ -1539,9 +1579,10 @@ func (c *Core) DropAlias(ctx context.Context, in *milvuspb.DropAliasRequest) (*c
metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues("DropAlias").Observe(float64(tr.ElapseSpan().Milliseconds()))
log.Info("done to drop alias", zap.String("role", typeutil.RootCoordRole),
log.Info("done to drop alias",
zap.String("role", typeutil.RootCoordRole),
zap.String("alias", in.GetAlias()),
zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs()))
zap.Uint64("ts", t.GetTs()))
return succStatus(), nil
}
@@ -1554,9 +1595,10 @@ func (c *Core) AlterAlias(ctx context.Context, in *milvuspb.AlterAliasRequest) (
metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder("AlterAlias")
log.Info("received request to alter alias", zap.String("role", typeutil.RootCoordRole),
zap.String("alias", in.GetAlias()), zap.String("collection", in.GetCollectionName()),
zap.Int64("msgID", in.GetBase().GetMsgID()))
log.Ctx(ctx).Info("received request to alter alias",
zap.String("role", typeutil.RootCoordRole),
zap.String("alias", in.GetAlias()),
zap.String("collection", in.GetCollectionName()))
t := &alterAliasTask{
baseTask: baseTask{
@@ -1568,20 +1610,23 @@ func (c *Core) AlterAlias(ctx context.Context, in *milvuspb.AlterAliasRequest) (
}
if err := c.scheduler.AddTask(t); err != nil {
log.Error("failed to enqueue request to alter alias", zap.String("role", typeutil.RootCoordRole),
log.Error("failed to enqueue request to alter alias",
zap.String("role", typeutil.RootCoordRole),
zap.Error(err),
zap.String("alias", in.GetAlias()), zap.String("collection", in.GetCollectionName()),
zap.Int64("msgID", in.GetBase().GetMsgID()))
zap.String("alias", in.GetAlias()),
zap.String("collection", in.GetCollectionName()))
metrics.RootCoordDDLReqCounter.WithLabelValues("AlterAlias", metrics.FailLabel).Inc()
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
}
if err := t.WaitToFinish(); err != nil {
log.Error("failed to alter alias", zap.String("role", typeutil.RootCoordRole),
log.Error("failed to alter alias",
zap.String("role", typeutil.RootCoordRole),
zap.Error(err),
zap.String("alias", in.GetAlias()), zap.String("collection", in.GetCollectionName()),
zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs()))
zap.String("alias", in.GetAlias()),
zap.String("collection", in.GetCollectionName()),
zap.Uint64("ts", t.GetTs()))
metrics.RootCoordDDLReqCounter.WithLabelValues("AlterAlias", metrics.FailLabel).Inc()
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
@@ -1590,9 +1635,11 @@ func (c *Core) AlterAlias(ctx context.Context, in *milvuspb.AlterAliasRequest) (
metrics.RootCoordDDLReqCounter.WithLabelValues("AlterAlias", metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues("AlterAlias").Observe(float64(tr.ElapseSpan().Milliseconds()))
log.Info("done to alter alias", zap.String("role", typeutil.RootCoordRole),
zap.String("alias", in.GetAlias()), zap.String("collection", in.GetCollectionName()),
zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs()))
log.Info("done to alter alias",
zap.String("role", typeutil.RootCoordRole),
zap.String("alias", in.GetAlias()),
zap.String("collection", in.GetCollectionName()),
zap.Uint64("ts", t.GetTs()))
return succStatus(), nil
}
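A quick way to confirm the rewritten statements still carry everything needed to correlate a request is zap's observer core, which records entries in memory for inspection. A small self-contained check; the traceID field is faked here, whereas in the handlers it comes from the context:

package main

import (
	"fmt"

	"go.uber.org/zap"
	"go.uber.org/zap/zaptest/observer"
)

func main() {
	// Attach an in-memory core so emitted entries can be inspected.
	core, logs := observer.New(zap.InfoLevel)
	logger := zap.New(core).With(zap.String("traceID", "abc123"))

	logger.Info("done to alter alias", zap.String("alias", "my_alias"))

	entry := logs.All()[0]
	fields := entry.ContextMap()
	fmt.Println(entry.Message, fields["traceID"], fields["alias"])
}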
@@ -1930,8 +1977,9 @@ func (c *Core) ListCredUsers(ctx context.Context, in *milvuspb.ListCredUsersRequ
credInfo, err := c.meta.ListCredentialUsernames()
if err != nil {
log.Error("ListCredUsers query usernames failed", zap.String("role", typeutil.RootCoordRole),
zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
log.Ctx(ctx).Error("ListCredUsers query usernames failed",
zap.String("role", typeutil.RootCoordRole),
zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
return &milvuspb.ListCredUsersResponse{
Status: failStatus(commonpb.ErrorCode_ListCredUsersFailure, "ListCredUsers failed: "+err.Error()),