Improve logs in proxy (#12762)

Signed-off-by: Xiangyu Wang <xiangyu.wang@zilliz.com>
pull/12771/head
Xiangyu Wang 2021-12-06 14:19:40 +08:00 committed by GitHub
parent 43ef945b6f
commit 51d55dc5cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 47 additions and 88 deletions

View File

@ -1395,26 +1395,18 @@ func (node *Proxy) GetIndexState(ctx context.Context, request *milvuspb.GetIndex
// Insert insert records into collection. // Insert insert records into collection.
func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.MutationResult, error) { func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.MutationResult, error) {
sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Insert")
defer sp.Finish()
traceID, _, _ := trace.InfoFromSpan(sp)
log.Info("Start processing insert request in Proxy", zap.String("traceID", traceID))
defer log.Info("Finish processing insert request in Proxy", zap.String("traceID", traceID))
if !node.checkHealthy() { if !node.checkHealthy() {
return &milvuspb.MutationResult{ return &milvuspb.MutationResult{
Status: unhealthyStatus(), Status: unhealthyStatus(),
}, nil }, nil
} }
sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Insert")
defer sp.Finish()
traceID, _, _ := trace.InfoFromSpan(sp)
log.Debug("Insert received",
zap.String("traceID", traceID),
zap.String("role", Params.RoleName),
zap.String("db", request.DbName),
zap.String("collection", request.CollectionName),
zap.String("partition", request.PartitionName),
zap.Int("len(FieldsData)", len(request.FieldsData)),
zap.Int("len(HashKeys)", len(request.HashKeys)),
zap.Uint32("NumRows", request.NumRows))
it := &insertTask{ it := &insertTask{
ctx: ctx, ctx: ctx,
Condition: NewTaskCondition(ctx), Condition: NewTaskCondition(ctx),
@ -1439,24 +1431,26 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
chTicker: node.chTicker, chTicker: node.chTicker,
} }
var err error
if len(it.PartitionName) <= 0 { if len(it.PartitionName) <= 0 {
it.PartitionName = Params.DefaultPartitionName it.PartitionName = Params.DefaultPartitionName
} }
result := &milvuspb.MutationResult{ constructFailedResponse := func(err error) *milvuspb.MutationResult {
numRows := it.req.NumRows
errIndex := make([]uint32, numRows)
for i := uint32(0); i < numRows; i++ {
errIndex[i] = i
}
return &milvuspb.MutationResult{
Status: &commonpb.Status{ Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success, ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}, },
ErrIndex: errIndex,
}
} }
err = node.sched.dmQueue.Enqueue(it) log.Debug("Enqueue insert request in Proxy",
if err != nil {
log.Debug("Insert failed to enqueue",
zap.Error(err),
zap.String("traceID", traceID),
zap.String("role", Params.RoleName), zap.String("role", Params.RoleName),
zap.String("db", request.DbName), zap.String("db", request.DbName),
zap.String("collection", request.CollectionName), zap.String("collection", request.CollectionName),
@ -1465,21 +1459,14 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
zap.Int("len(HashKeys)", len(request.HashKeys)), zap.Int("len(HashKeys)", len(request.HashKeys)),
zap.Uint32("NumRows", request.NumRows)) zap.Uint32("NumRows", request.NumRows))
result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError if err := node.sched.dmQueue.Enqueue(it); err != nil {
result.Status.Reason = err.Error() log.Debug("Failed to enqueue insert task: " + err.Error())
numRows := it.req.NumRows return constructFailedResponse(err), nil
errIndex := make([]uint32, numRows)
for i := uint32(0); i < numRows; i++ {
errIndex[i] = i
}
result.ErrIndex = errIndex
return result, nil
} }
log.Debug("Insert enqueued", log.Debug("Detail of insert request in Proxy",
zap.String("traceID", traceID),
zap.String("role", Params.RoleName), zap.String("role", Params.RoleName),
zap.Int64("MsgID", it.ID()), zap.Int64("msgID", it.Base.MsgID),
zap.Uint64("BeginTS", it.BeginTs()), zap.Uint64("BeginTS", it.BeginTs()),
zap.Uint64("EndTS", it.EndTs()), zap.Uint64("EndTS", it.EndTs()),
zap.String("db", request.DbName), zap.String("db", request.DbName),
@ -1487,36 +1474,16 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
zap.String("partition", request.PartitionName), zap.String("partition", request.PartitionName),
zap.Int("len(FieldsData)", len(request.FieldsData)), zap.Int("len(FieldsData)", len(request.FieldsData)),
zap.Int("len(HashKeys)", len(request.HashKeys)), zap.Int("len(HashKeys)", len(request.HashKeys)),
zap.Uint32("NumRows", request.NumRows)) zap.Uint32("NumRows", request.NumRows),
zap.String("traceID", traceID))
err = it.WaitToFinish() if err := it.WaitToFinish(); err != nil {
if err != nil { log.Debug("Failed to execute insert task in task scheduler: "+err.Error(), zap.String("traceID", traceID))
log.Debug("Insert failed to WaitToFinish", return constructFailedResponse(err), nil
zap.Error(err),
zap.String("traceID", traceID),
zap.String("role", Params.RoleName),
zap.Int64("MsgID", it.ID()),
zap.Uint64("BeginTS", it.BeginTs()),
zap.Uint64("EndTS", it.EndTs()),
zap.String("db", request.DbName),
zap.String("collection", request.CollectionName),
zap.String("partition", request.PartitionName),
zap.Int("len(FieldsData)", len(request.FieldsData)),
zap.Int("len(HashKeys)", len(request.HashKeys)),
zap.Uint32("NumRows", request.NumRows))
result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
result.Status.Reason = err.Error()
numRows := it.req.NumRows
errIndex := make([]uint32, numRows)
for i := uint32(0); i < numRows; i++ {
errIndex[i] = i
}
result.ErrIndex = errIndex
return result, nil
} }
if it.result.Status.ErrorCode != commonpb.ErrorCode_Success { if it.result.Status.ErrorCode != commonpb.ErrorCode_Success {
setErrorIndex := func() {
numRows := it.req.NumRows numRows := it.req.NumRows
errIndex := make([]uint32, numRows) errIndex := make([]uint32, numRows)
for i := uint32(0); i < numRows; i++ { for i := uint32(0); i < numRows; i++ {
@ -1524,20 +1491,12 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
} }
it.result.ErrIndex = errIndex it.result.ErrIndex = errIndex
} }
it.result.InsertCnt = int64(it.req.NumRows)
log.Debug("Insert done", setErrorIndex()
zap.String("traceID", traceID), }
zap.String("role", Params.RoleName),
zap.Int64("MsgID", it.ID()), // InsertCnt always equals to the number of entities in the request
zap.Uint64("BeginTS", it.BeginTs()), it.result.InsertCnt = int64(it.req.NumRows)
zap.Uint64("EndTS", it.EndTs()),
zap.String("db", request.DbName),
zap.String("collection", request.CollectionName),
zap.String("partition", request.PartitionName),
zap.Int("len(FieldsData)", len(request.FieldsData)),
zap.Int("len(HashKeys)", len(request.HashKeys)),
zap.Uint32("NumRows", request.NumRows))
return it.result, nil return it.result, nil
} }