mirror of https://github.com/milvus-io/milvus.git
parent
f107f9d03e
commit
89d712ebc3
|
@ -2214,6 +2214,11 @@ func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*
|
|||
resp.Status.Reason = "proxy is not healthy"
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Flush")
|
||||
defer sp.Finish()
|
||||
traceID, _, _ := trace.InfoFromSpan(sp)
|
||||
|
||||
ft := &flushTask{
|
||||
ctx: ctx,
|
||||
Condition: NewTaskCondition(ctx),
|
||||
|
@ -2221,39 +2226,65 @@ func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*
|
|||
dataCoord: node.dataCoord,
|
||||
}
|
||||
|
||||
log.Debug("Flush enqueue",
|
||||
method := "Flush"
|
||||
|
||||
log.Debug(
|
||||
rpcReceived(method),
|
||||
zap.String("traceID", traceID),
|
||||
zap.String("role", Params.RoleName),
|
||||
zap.String("db", request.DbName),
|
||||
zap.Any("collections", request.CollectionNames))
|
||||
err := node.sched.ddQueue.Enqueue(ft)
|
||||
if err != nil {
|
||||
|
||||
if err := node.sched.ddQueue.Enqueue(ft); err != nil {
|
||||
log.Warn(
|
||||
rpcFailedToEnqueue(method),
|
||||
zap.Error(err),
|
||||
zap.String("traceID", traceID),
|
||||
zap.String("role", Params.RoleName),
|
||||
zap.String("db", request.DbName),
|
||||
zap.Any("collections", request.CollectionNames))
|
||||
|
||||
resp.Status.Reason = err.Error()
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
log.Debug("Flush",
|
||||
log.Debug(
|
||||
rpcEnqueued(method),
|
||||
zap.String("traceID", traceID),
|
||||
zap.String("role", Params.RoleName),
|
||||
zap.Int64("msgID", request.Base.MsgID),
|
||||
zap.Uint64("timestamp", request.Base.Timestamp),
|
||||
zap.Int64("MsgID", ft.ID()),
|
||||
zap.Uint64("BeginTs", ft.BeginTs()),
|
||||
zap.Uint64("EndTs", ft.EndTs()),
|
||||
zap.String("db", request.DbName),
|
||||
zap.Any("collections", request.CollectionNames))
|
||||
defer func() {
|
||||
log.Debug("Flush Done",
|
||||
|
||||
if err := ft.WaitToFinish(); err != nil {
|
||||
log.Warn(
|
||||
rpcFailedToWaitToFinish(method),
|
||||
zap.Error(err),
|
||||
zap.String("traceID", traceID),
|
||||
zap.String("role", Params.RoleName),
|
||||
zap.Int64("msgID", request.Base.MsgID),
|
||||
zap.Uint64("timestamp", request.Base.Timestamp),
|
||||
zap.Int64("MsgID", ft.ID()),
|
||||
zap.Uint64("BeginTs", ft.BeginTs()),
|
||||
zap.Uint64("EndTs", ft.EndTs()),
|
||||
zap.String("db", request.DbName),
|
||||
zap.Any("collections", request.CollectionNames))
|
||||
}()
|
||||
|
||||
err = ft.WaitToFinish()
|
||||
if err != nil {
|
||||
resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
|
||||
resp.Status.Reason = err.Error()
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
log.Debug(
|
||||
rpcDone(method),
|
||||
zap.String("traceID", traceID),
|
||||
zap.String("role", Params.RoleName),
|
||||
zap.Int64("MsgID", ft.ID()),
|
||||
zap.Uint64("BeginTs", ft.BeginTs()),
|
||||
zap.Uint64("EndTs", ft.EndTs()),
|
||||
zap.String("db", request.DbName),
|
||||
zap.Any("collections", request.CollectionNames))
|
||||
|
||||
return ft.result, nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue