Accelerate flush in flushAll (#26769)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/26808/head
yihao.dai 2023-09-01 15:23:01 +08:00 committed by GitHub
parent e8f1b1736e
commit 64cf5eab18
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 22 additions and 11 deletions

View File

@ -3172,14 +3172,11 @@ func (node *Proxy) FlushAll(ctx context.Context, _ *milvuspb.FlushAllRequest) (*
hasError := func(status *commonpb.Status, err error) bool {
if err != nil {
resp.Status = &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}
resp.Status = merr.Status(err)
log.Warn("FlushAll failed", zap.String("err", err.Error()))
return true
}
if status.ErrorCode != commonpb.ErrorCode_Success {
if status != nil && status.ErrorCode != commonpb.ErrorCode_Success {
log.Warn("FlushAll failed", zap.String("err", status.GetReason()))
resp.Status = status
return true
@ -3204,12 +3201,26 @@ func (node *Proxy) FlushAll(ctx context.Context, _ *milvuspb.FlushAllRequest) (*
return resp, nil
}
flushRsp, err := node.Flush(ctx, &milvuspb.FlushRequest{
Base: commonpbutil.NewMsgBase(commonpbutil.WithMsgType(commonpb.MsgType_Flush)),
DbName: dbName,
CollectionNames: showColRsp.GetCollectionNames(),
})
if hasError(flushRsp.GetStatus(), err) {
group, ctx := errgroup.WithContext(ctx)
for _, collection := range showColRsp.GetCollectionNames() {
collection := collection
group.Go(func() error {
flushRsp, err := node.Flush(ctx, &milvuspb.FlushRequest{
Base: commonpbutil.NewMsgBase(commonpbutil.WithMsgType(commonpb.MsgType_Flush)),
DbName: dbName,
CollectionNames: []string{collection},
})
if err != nil {
return err
}
if flushRsp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return merr.Error(flushRsp.GetStatus())
}
return nil
})
}
err = group.Wait()
if hasError(nil, err) {
return resp, nil
}
}