mirror of https://github.com/milvus-io/milvus.git
Add some log for delete task when it need query (#27317)
Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>pull/27885/head
parent
178db7b0f0
commit
9ba79a97e4
|
@ -262,17 +262,21 @@ func (dt *deleteTask) PostExecute(ctx context.Context) error {
|
|||
|
||||
func (dt *deleteTask) getStreamingQueryAndDelteFunc(stream msgstream.MsgStream, plan *planpb.PlanNode) executeFunc {
|
||||
return func(ctx context.Context, nodeID int64, qn types.QueryNodeClient, channelIDs ...string) error {
|
||||
// outputField := translateOutputFields(, dt.schema, true)
|
||||
|
||||
partationIDs := []int64{}
|
||||
if dt.partitionID != common.InvalidFieldID {
|
||||
partationIDs = append(partationIDs, dt.partitionID)
|
||||
}
|
||||
|
||||
log := log.Ctx(ctx).With(
|
||||
zap.Int64("collectionID", dt.collectionID),
|
||||
zap.Int64s("partationIDs", partationIDs),
|
||||
zap.Strings("channels", channelIDs),
|
||||
zap.Int64("nodeID", nodeID))
|
||||
// set plan
|
||||
_, outputFieldIDs := translatePkOutputFields(dt.schema)
|
||||
outputFieldIDs = append(outputFieldIDs, common.TimeStampField)
|
||||
plan.OutputFieldIds = outputFieldIDs
|
||||
log.Debug("start query for delete")
|
||||
|
||||
serializedPlan, err := proto.Marshal(plan)
|
||||
if err != nil {
|
||||
|
@ -300,9 +304,10 @@ func (dt *deleteTask) getStreamingQueryAndDelteFunc(stream msgstream.MsgStream,
|
|||
Scope: querypb.DataScope_All,
|
||||
}
|
||||
|
||||
rc := timerecord.NewTimeRecorder("QueryStreamDelete")
|
||||
client, err := qn.QueryStream(ctx, queryReq)
|
||||
if err != nil {
|
||||
log.Warn("query for delete return error", zap.Error(err))
|
||||
log.Warn("query stream for delete create failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -310,6 +315,7 @@ func (dt *deleteTask) getStreamingQueryAndDelteFunc(stream msgstream.MsgStream,
|
|||
result, err := client.Recv()
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
log.Debug("query stream for delete finished", zap.Int64("msgID", dt.msgID), zap.Duration("duration", rc.ElapseSpan()))
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
|
@ -317,11 +323,13 @@ func (dt *deleteTask) getStreamingQueryAndDelteFunc(stream msgstream.MsgStream,
|
|||
|
||||
err = merr.Error(result.GetStatus())
|
||||
if err != nil {
|
||||
log.Warn("query stream for delete get error status", zap.Int64("msgID", dt.msgID), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
err = dt.produce(ctx, stream, result.GetIds())
|
||||
if err != nil {
|
||||
log.Warn("query stream for delete produce result failed", zap.Int64("msgID", dt.msgID), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -350,7 +358,10 @@ func (dt *deleteTask) simpleDelete(ctx context.Context, termExp *planpb.Expr_Ter
|
|||
log.Info("Failed to get primary keys from expr", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("get primary keys from expr", zap.Int64("len of primary keys", numRow))
|
||||
log.Debug("get primary keys from expr",
|
||||
zap.Int64("len of primary keys", numRow),
|
||||
zap.Int64("collectionID", dt.collectionID),
|
||||
zap.Int64("partationID", dt.partitionID))
|
||||
err = dt.produce(ctx, stream, primaryKeys)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -280,7 +280,7 @@ func (sd *shardDelegator) QueryStream(ctx context.Context, req *querypb.QueryReq
|
|||
growing = []SegmentEntry{}
|
||||
}
|
||||
|
||||
log.Info("query segments...",
|
||||
log.Info("query stream segments...",
|
||||
zap.Int("sealedNum", len(sealed)),
|
||||
zap.Int("growingNum", len(growing)),
|
||||
)
|
||||
|
|
|
@ -264,6 +264,13 @@ func (node *QueryNode) queryChannelStream(ctx context.Context, req *querypb.Quer
|
|||
}
|
||||
|
||||
func (node *QueryNode) queryStreamSegments(ctx context.Context, req *querypb.QueryRequest, srv streamrpc.QueryStreamServer) error {
|
||||
log.Debug("received query stream request",
|
||||
zap.Int64s("outputFields", req.GetReq().GetOutputFieldsId()),
|
||||
zap.Int64s("segmentIDs", req.GetSegmentIDs()),
|
||||
zap.Uint64("guaranteeTimestamp", req.GetReq().GetGuaranteeTimestamp()),
|
||||
zap.Uint64("mvccTimestamp", req.GetReq().GetMvccTimestamp()),
|
||||
)
|
||||
|
||||
collection := node.manager.Collection.Get(req.Req.GetCollectionID())
|
||||
if collection == nil {
|
||||
return merr.WrapErrCollectionNotFound(req.Req.GetCollectionID())
|
||||
|
|
Loading…
Reference in New Issue