milvus/internal/distributed/proxy/httpserver/handler_v2.go

2045 lines
87 KiB
Go

package httpserver
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"github.com/cockroachdb/errors"
"github.com/gin-gonic/gin"
"github.com/gin-gonic/gin/binding"
validator "github.com/go-playground/validator/v10"
"github.com/samber/lo"
"github.com/tidwall/gjson"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proxy"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/crypto"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/requestutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type HandlersV2 struct {
proxy types.ProxyComponent
checkAuth bool
}
func NewHandlersV2(proxyClient types.ProxyComponent) *HandlersV2 {
return &HandlersV2{
proxy: proxyClient,
checkAuth: proxy.Params.CommonCfg.AuthorizationEnabled.GetAsBool(),
}
}
func (h *HandlersV2) RegisterRoutesToV2(router gin.IRouter) {
router.POST(CollectionCategory+ListAction, timeoutMiddleware(wrapperPost(func() any { return &DatabaseReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.listCollections)))))
router.POST(CollectionCategory+HasAction, timeoutMiddleware(wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.hasCollection)))))
// todo review the return data
router.POST(CollectionCategory+DescribeAction, timeoutMiddleware(wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.getCollectionDetails)))))
router.POST(CollectionCategory+StatsAction, timeoutMiddleware(wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.getCollectionStats)))))
router.POST(CollectionCategory+LoadStateAction, timeoutMiddleware(wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.getCollectionLoadState)))))
router.POST(CollectionCategory+CreateAction, timeoutMiddleware(wrapperPost(func() any { return &CollectionReq{AutoID: DisableAutoID} }, wrapperTraceLog(h.wrapperCheckDatabase(h.createCollection)))))
router.POST(CollectionCategory+DropAction, timeoutMiddleware(wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.dropCollection)))))
router.POST(CollectionCategory+RenameAction, timeoutMiddleware(wrapperPost(func() any { return &RenameCollectionReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.renameCollection)))))
router.POST(CollectionCategory+LoadAction, timeoutMiddleware(wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.loadCollection)))))
router.POST(CollectionCategory+ReleaseAction, timeoutMiddleware(wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.releaseCollection)))))
router.POST(EntityCategory+QueryAction, timeoutMiddleware(wrapperPost(func() any {
return &QueryReqV2{
Limit: 100,
OutputFields: []string{DefaultOutputFields},
}
}, wrapperTraceLog(h.wrapperCheckDatabase(h.query)))))
router.POST(EntityCategory+GetAction, timeoutMiddleware(wrapperPost(func() any {
return &CollectionIDReq{
OutputFields: []string{DefaultOutputFields},
}
}, wrapperTraceLog(h.wrapperCheckDatabase(h.get)))))
router.POST(EntityCategory+DeleteAction, timeoutMiddleware(wrapperPost(func() any {
return &CollectionFilterReq{}
}, wrapperTraceLog(h.wrapperCheckDatabase(h.delete)))))
router.POST(EntityCategory+InsertAction, timeoutMiddleware(wrapperPost(func() any {
return &CollectionDataReq{}
}, wrapperTraceLog(h.wrapperCheckDatabase(h.insert)))))
router.POST(EntityCategory+UpsertAction, timeoutMiddleware(wrapperPost(func() any {
return &CollectionDataReq{}
}, wrapperTraceLog(h.wrapperCheckDatabase(h.upsert)))))
router.POST(EntityCategory+SearchAction, timeoutMiddleware(wrapperPost(func() any {
return &SearchReqV2{
Limit: 100,
}
}, wrapperTraceLog(h.wrapperCheckDatabase(h.search)))))
router.POST(EntityCategory+AdvancedSearchAction, timeoutMiddleware(wrapperPost(func() any {
return &HybridSearchReq{
Limit: 100,
}
}, wrapperTraceLog(h.wrapperCheckDatabase(h.advancedSearch)))))
router.POST(EntityCategory+HybridSearchAction, timeoutMiddleware(wrapperPost(func() any {
return &HybridSearchReq{
Limit: 100,
}
}, wrapperTraceLog(h.wrapperCheckDatabase(h.advancedSearch)))))
router.POST(PartitionCategory+ListAction, timeoutMiddleware(wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.listPartitions)))))
router.POST(PartitionCategory+HasAction, timeoutMiddleware(wrapperPost(func() any { return &PartitionReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.hasPartitions)))))
router.POST(PartitionCategory+StatsAction, timeoutMiddleware(wrapperPost(func() any { return &PartitionReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.statsPartition)))))
router.POST(PartitionCategory+CreateAction, timeoutMiddleware(wrapperPost(func() any { return &PartitionReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.createPartition)))))
router.POST(PartitionCategory+DropAction, timeoutMiddleware(wrapperPost(func() any { return &PartitionReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.dropPartition)))))
router.POST(PartitionCategory+LoadAction, timeoutMiddleware(wrapperPost(func() any { return &PartitionsReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.loadPartitions)))))
router.POST(PartitionCategory+ReleaseAction, timeoutMiddleware(wrapperPost(func() any { return &PartitionsReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.releasePartitions)))))
router.POST(UserCategory+ListAction, timeoutMiddleware(wrapperPost(func() any { return &DatabaseReq{} }, wrapperTraceLog(h.listUsers))))
router.POST(UserCategory+DescribeAction, timeoutMiddleware(wrapperPost(func() any { return &UserReq{} }, wrapperTraceLog(h.describeUser))))
router.POST(UserCategory+CreateAction, timeoutMiddleware(wrapperPost(func() any { return &PasswordReq{} }, wrapperTraceLog(h.createUser))))
router.POST(UserCategory+UpdatePasswordAction, timeoutMiddleware(wrapperPost(func() any { return &NewPasswordReq{} }, wrapperTraceLog(h.updateUser))))
router.POST(UserCategory+DropAction, timeoutMiddleware(wrapperPost(func() any { return &UserReq{} }, wrapperTraceLog(h.dropUser))))
router.POST(UserCategory+GrantRoleAction, timeoutMiddleware(wrapperPost(func() any { return &UserRoleReq{} }, wrapperTraceLog(h.addRoleToUser))))
router.POST(UserCategory+RevokeRoleAction, timeoutMiddleware(wrapperPost(func() any { return &UserRoleReq{} }, wrapperTraceLog(h.removeRoleFromUser))))
router.POST(RoleCategory+ListAction, timeoutMiddleware(wrapperPost(func() any { return &DatabaseReq{} }, wrapperTraceLog(h.listRoles))))
router.POST(RoleCategory+DescribeAction, timeoutMiddleware(wrapperPost(func() any { return &RoleReq{} }, wrapperTraceLog(h.describeRole))))
router.POST(RoleCategory+CreateAction, timeoutMiddleware(wrapperPost(func() any { return &RoleReq{} }, wrapperTraceLog(h.createRole))))
router.POST(RoleCategory+DropAction, timeoutMiddleware(wrapperPost(func() any { return &RoleReq{} }, wrapperTraceLog(h.dropRole))))
router.POST(RoleCategory+GrantPrivilegeAction, timeoutMiddleware(wrapperPost(func() any { return &GrantReq{} }, wrapperTraceLog(h.addPrivilegeToRole))))
router.POST(RoleCategory+RevokePrivilegeAction, timeoutMiddleware(wrapperPost(func() any { return &GrantReq{} }, wrapperTraceLog(h.removePrivilegeFromRole))))
router.POST(IndexCategory+ListAction, timeoutMiddleware(wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.listIndexes)))))
router.POST(IndexCategory+DescribeAction, timeoutMiddleware(wrapperPost(func() any { return &IndexReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.describeIndex)))))
router.POST(IndexCategory+CreateAction, timeoutMiddleware(wrapperPost(func() any { return &IndexParamReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.createIndex)))))
// todo cannot drop index before release it ?
router.POST(IndexCategory+DropAction, timeoutMiddleware(wrapperPost(func() any { return &IndexReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.dropIndex)))))
router.POST(AliasCategory+ListAction, timeoutMiddleware(wrapperPost(func() any { return &OptionalCollectionNameReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.listAlias)))))
router.POST(AliasCategory+DescribeAction, timeoutMiddleware(wrapperPost(func() any { return &AliasReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.describeAlias)))))
router.POST(AliasCategory+CreateAction, timeoutMiddleware(wrapperPost(func() any { return &AliasCollectionReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.createAlias)))))
router.POST(AliasCategory+DropAction, timeoutMiddleware(wrapperPost(func() any { return &AliasReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.dropAlias)))))
router.POST(AliasCategory+AlterAction, timeoutMiddleware(wrapperPost(func() any { return &AliasCollectionReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.alterAlias)))))
router.POST(ImportJobCategory+ListAction, timeoutMiddleware(wrapperPost(func() any { return &OptionalCollectionNameReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.listImportJob)))))
router.POST(ImportJobCategory+CreateAction, timeoutMiddleware(wrapperPost(func() any { return &ImportReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.createImportJob)))))
router.POST(ImportJobCategory+GetProgressAction, timeoutMiddleware(wrapperPost(func() any { return &JobIDReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.getImportJobProcess)))))
}
type (
newReqFunc func() any
handlerFuncV2 func(ctx context.Context, c *gin.Context, req any, dbName string) (interface{}, error)
)
func wrapperPost(newReq newReqFunc, v2 handlerFuncV2) gin.HandlerFunc {
return func(c *gin.Context) {
req := newReq()
if err := c.ShouldBindBodyWith(req, binding.JSON); err != nil {
log.Warn("high level restful api, read parameters from request body fail", zap.Error(err),
zap.Any("url", c.Request.URL.Path), zap.Any("request", req))
if _, ok := err.(validator.ValidationErrors); ok {
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrMissingRequiredParameters),
HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error() + ", error: " + err.Error(),
})
} else if err == io.EOF {
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat),
HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", the request body should be nil, however {} is valid",
})
} else {
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat),
HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: " + err.Error(),
})
}
return
}
dbName := ""
if getter, ok := req.(requestutil.DBNameGetter); ok {
dbName = getter.GetDbName()
}
if dbName == "" {
dbName = c.Request.Header.Get(HTTPHeaderDBName)
if dbName == "" {
dbName = DefaultDbName
}
}
username, _ := c.Get(ContextUsername)
ctx, span := otel.Tracer(typeutil.ProxyRole).Start(context.Background(), c.Request.URL.Path)
defer span.End()
ctx = proxy.NewContextWithMetadata(ctx, username.(string), dbName)
traceID := span.SpanContext().TraceID().String()
ctx = log.WithTraceID(ctx, traceID)
c.Keys["traceID"] = traceID
log.Ctx(ctx).Debug("high level restful api, read parameters from request body, then start to handle.",
zap.Any("url", c.Request.URL.Path), zap.Any("request", req))
v2(ctx, c, req, dbName)
}
}
func wrapperTraceLog(v2 handlerFuncV2) handlerFuncV2 {
return func(ctx context.Context, c *gin.Context, req any, dbName string) (interface{}, error) {
switch proxy.Params.CommonCfg.TraceLogMode.GetAsInt() {
case 1: // simple info
fields := proxy.GetRequestBaseInfo(ctx, req, &grpc.UnaryServerInfo{
FullMethod: c.Request.URL.Path,
}, false)
log.Ctx(ctx).Info("trace info: simple", fields...)
case 2: // detail info
fields := proxy.GetRequestBaseInfo(ctx, req, &grpc.UnaryServerInfo{
FullMethod: c.Request.URL.Path,
}, true)
fields = append(fields, proxy.GetRequestFieldWithoutSensitiveInfo(req))
log.Ctx(ctx).Info("trace info: detail", fields...)
case 3: // detail info with request and response
fields := proxy.GetRequestBaseInfo(ctx, req, &grpc.UnaryServerInfo{
FullMethod: c.Request.URL.Path,
}, true)
fields = append(fields, proxy.GetRequestFieldWithoutSensitiveInfo(req))
log.Ctx(ctx).Info("trace info: all request", fields...)
}
resp, err := v2(ctx, c, req, dbName)
if proxy.Params.CommonCfg.TraceLogMode.GetAsInt() > 2 {
if err != nil {
log.Ctx(ctx).Info("trace info: all, error", zap.Error(err))
} else {
log.Ctx(ctx).Info("trace info: all, unknown", zap.Any("resp", resp))
}
}
return resp, err
}
}
func checkAuthorizationV2(ctx context.Context, c *gin.Context, ignoreErr bool, req interface{}) error {
username, ok := c.Get(ContextUsername)
if !ok || username.(string) == "" {
if !ignoreErr {
HTTPReturn(c, http.StatusUnauthorized, gin.H{HTTPReturnCode: merr.Code(merr.ErrNeedAuthenticate), HTTPReturnMessage: merr.ErrNeedAuthenticate.Error()})
}
return merr.ErrNeedAuthenticate
}
_, authErr := proxy.PrivilegeInterceptor(ctx, req)
if authErr != nil {
if !ignoreErr {
HTTPReturn(c, http.StatusForbidden, gin.H{HTTPReturnCode: merr.Code(authErr), HTTPReturnMessage: authErr.Error()})
}
return authErr
}
return nil
}
func wrapperProxy(ctx context.Context, c *gin.Context, req any, checkAuth bool, ignoreErr bool, fullMethod string, handler func(reqCtx context.Context, req any) (any, error)) (interface{}, error) {
return wrapperProxyWithLimit(ctx, c, req, checkAuth, ignoreErr, fullMethod, false, nil, handler)
}
func wrapperProxyWithLimit(ctx context.Context, c *gin.Context, req any, checkAuth bool, ignoreErr bool, fullMethod string, checkLimit bool, pxy types.ProxyComponent, handler func(reqCtx context.Context, req any) (any, error)) (interface{}, error) {
if baseGetter, ok := req.(BaseGetter); ok {
span := trace.SpanFromContext(ctx)
span.AddEvent(baseGetter.GetBase().GetMsgType().String())
}
if checkAuth {
err := checkAuthorizationV2(ctx, c, ignoreErr, req)
if err != nil {
return nil, err
}
}
if checkLimit {
_, err := CheckLimiter(ctx, req, pxy)
if err != nil {
log.Warn("high level restful api, fail to check limiter", zap.Error(err), zap.String("method", fullMethod))
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrHTTPRateLimit),
HTTPReturnMessage: merr.ErrHTTPRateLimit.Error() + ", error: " + err.Error(),
})
return nil, RestRequestInterceptorErr
}
}
log.Ctx(ctx).Debug("high level restful api, try to do a grpc call", zap.Any("grpcRequest", req))
username, ok := c.Get(ContextUsername)
if !ok {
username = ""
}
response, err := proxy.HookInterceptor(ctx, req, username.(string), fullMethod, handler)
if err == nil {
status, ok := requestutil.GetStatusFromResponse(response)
if ok {
err = merr.Error(status)
}
}
if err != nil {
log.Ctx(ctx).Warn("high level restful api, grpc call failed", zap.Error(err), zap.Any("grpcRequest", req))
if !ignoreErr {
HTTPAbortReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
}
}
return response, err
}
func (h *HandlersV2) wrapperCheckDatabase(v2 handlerFuncV2) handlerFuncV2 {
return func(ctx context.Context, c *gin.Context, req any, dbName string) (interface{}, error) {
if dbName == DefaultDbName || proxy.CheckDatabase(ctx, dbName) {
return v2(ctx, c, req, dbName)
}
resp, err := wrapperProxy(ctx, c, req, false, false, "/milvus.proto.milvus.MilvusService/ListDatabases", func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.ListDatabases(reqCtx, &milvuspb.ListDatabasesRequest{})
})
if err != nil {
return resp, err
}
for _, db := range resp.(*milvuspb.ListDatabasesResponse).DbNames {
if db == dbName {
return v2(ctx, c, req, dbName)
}
}
log.Ctx(ctx).Warn("high level restful api, non-exist database", zap.String("database", dbName), zap.Any("request", req))
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrDatabaseNotFound),
HTTPReturnMessage: merr.ErrDatabaseNotFound.Error() + ", database: " + dbName,
})
return nil, merr.ErrDatabaseNotFound
}
}
func (h *HandlersV2) hasCollection(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
getter, _ := anyReq.(requestutil.CollectionNameGetter)
collectionName := getter.GetCollectionName()
_, err := proxy.GetCachedCollectionSchema(ctx, dbName, collectionName)
has := true
if err != nil {
req := &milvuspb.HasCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
}
resp, err := wrapperProxy(ctx, c, req, false, false, "/milvus.proto.milvus.MilvusService/HasCollection", func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.HasCollection(reqCtx, req.(*milvuspb.HasCollectionRequest))
})
if err != nil {
return nil, err
}
has = resp.(*milvuspb.BoolResponse).Value
}
HTTPReturn(c, http.StatusOK, wrapperReturnHas(has))
return has, nil
}
func (h *HandlersV2) listCollections(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
req := &milvuspb.ShowCollectionsRequest{
DbName: dbName,
}
c.Set(ContextRequest, req)
resp, err := wrapperProxy(ctx, c, req, false, false, "/milvus.proto.milvus.MilvusService/ShowCollections", func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.ShowCollections(reqCtx, req.(*milvuspb.ShowCollectionsRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnList(resp.(*milvuspb.ShowCollectionsResponse).CollectionNames))
}
return resp, err
}
func (h *HandlersV2) getCollectionDetails(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
collectionGetter, _ := anyReq.(requestutil.CollectionNameGetter)
collectionName := collectionGetter.GetCollectionName()
req := &milvuspb.DescribeCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
}
c.Set(ContextRequest, req)
resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/DescribeCollection", func(reqCtx context.Context, req any) (any, error) {
return h.proxy.DescribeCollection(reqCtx, req.(*milvuspb.DescribeCollectionRequest))
})
if err != nil {
return resp, err
}
coll := resp.(*milvuspb.DescribeCollectionResponse)
primaryField, ok := getPrimaryField(coll.Schema)
autoID := false
if !ok {
log.Ctx(ctx).Warn("high level restful api, get primary field from collection schema fail", zap.Any("collection schema", coll.Schema), zap.Any("request", anyReq))
} else {
autoID = primaryField.AutoID
}
errMessage := ""
loadStateReq := &milvuspb.GetLoadStateRequest{
DbName: dbName,
CollectionName: collectionName,
}
stateResp, err := wrapperProxy(ctx, c, loadStateReq, h.checkAuth, true, "/milvus.proto.milvus.MilvusService/GetLoadState", func(reqCtx context.Context, req any) (any, error) {
return h.proxy.GetLoadState(reqCtx, req.(*milvuspb.GetLoadStateRequest))
})
collLoadState := ""
if err == nil {
collLoadState = stateResp.(*milvuspb.GetLoadStateResponse).State.String()
} else {
errMessage += err.Error() + ";"
}
vectorField := ""
for _, field := range coll.Schema.Fields {
if typeutil.IsVectorType(field.DataType) {
vectorField = field.Name
break
}
}
indexDesc := []gin.H{}
descIndexReq := &milvuspb.DescribeIndexRequest{
DbName: dbName,
CollectionName: collectionName,
FieldName: vectorField,
}
indexResp, err := wrapperProxy(ctx, c, descIndexReq, h.checkAuth, true, "/milvus.proto.milvus.MilvusService/DescribeIndex", func(reqCtx context.Context, req any) (any, error) {
return h.proxy.DescribeIndex(reqCtx, req.(*milvuspb.DescribeIndexRequest))
})
if err == nil {
indexDesc = printIndexes(indexResp.(*milvuspb.DescribeIndexResponse).IndexDescriptions)
} else {
errMessage += err.Error() + ";"
}
var aliases []string
aliasReq := &milvuspb.ListAliasesRequest{
DbName: dbName,
CollectionName: collectionName,
}
aliasResp, err := wrapperProxy(ctx, c, aliasReq, h.checkAuth, true, "/milvus.proto.milvus.MilvusService/ListAliases", func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.ListAliases(reqCtx, req.(*milvuspb.ListAliasesRequest))
})
if err == nil {
aliases = aliasResp.(*milvuspb.ListAliasesResponse).GetAliases()
} else {
errMessage += err.Error() + "."
}
if aliases == nil {
aliases = []string{}
}
if coll.Properties == nil {
coll.Properties = []*commonpb.KeyValuePair{}
}
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(nil), HTTPReturnData: gin.H{
HTTPCollectionName: coll.CollectionName,
HTTPCollectionID: coll.CollectionID,
HTTPReturnDescription: coll.Schema.Description,
HTTPReturnFieldAutoID: autoID,
"fields": printFieldsV2(coll.Schema.Fields),
"aliases": aliases,
"indexes": indexDesc,
"load": collLoadState,
"shardsNum": coll.ShardsNum,
"partitionsNum": coll.NumPartitions,
"consistencyLevel": commonpb.ConsistencyLevel_name[int32(coll.ConsistencyLevel)],
"enableDynamicField": coll.Schema.EnableDynamicField,
"properties": coll.Properties,
}, HTTPReturnMessage: errMessage})
return resp, nil
}
func (h *HandlersV2) getCollectionStats(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
collectionGetter, _ := anyReq.(requestutil.CollectionNameGetter)
req := &milvuspb.GetCollectionStatisticsRequest{
DbName: dbName,
CollectionName: collectionGetter.GetCollectionName(),
}
c.Set(ContextRequest, req)
resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/GetCollectionStatistics", func(reqCtx context.Context, req any) (any, error) {
return h.proxy.GetCollectionStatistics(reqCtx, req.(*milvuspb.GetCollectionStatisticsRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnRowCount(resp.(*milvuspb.GetCollectionStatisticsResponse).Stats))
}
return resp, err
}
func (h *HandlersV2) getCollectionLoadState(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
collectionGetter, _ := anyReq.(requestutil.CollectionNameGetter)
req := &milvuspb.GetLoadStateRequest{
DbName: dbName,
CollectionName: collectionGetter.GetCollectionName(),
}
c.Set(ContextRequest, req)
resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/GetLoadState", func(reqCtx context.Context, req any) (any, error) {
return h.proxy.GetLoadState(reqCtx, req.(*milvuspb.GetLoadStateRequest))
})
if err != nil {
return resp, err
}
if resp.(*milvuspb.GetLoadStateResponse).State == commonpb.LoadState_LoadStateNotExist {
err = merr.WrapErrCollectionNotFound(req.CollectionName)
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
return resp, err
} else if resp.(*milvuspb.GetLoadStateResponse).State == commonpb.LoadState_LoadStateNotLoad {
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(nil), HTTPReturnData: gin.H{
HTTPReturnLoadState: resp.(*milvuspb.GetLoadStateResponse).State.String(),
}})
return resp, err
}
partitionsGetter, _ := anyReq.(requestutil.PartitionNamesGetter)
progressReq := &milvuspb.GetLoadingProgressRequest{
CollectionName: collectionGetter.GetCollectionName(),
PartitionNames: partitionsGetter.GetPartitionNames(),
DbName: dbName,
}
progressResp, err := wrapperProxy(ctx, c, progressReq, h.checkAuth, true, "/milvus.proto.milvus.MilvusService/GetLoadingProgress", func(reqCtx context.Context, req any) (any, error) {
return h.proxy.GetLoadingProgress(reqCtx, req.(*milvuspb.GetLoadingProgressRequest))
})
progress := int64(-1)
errMessage := ""
if err == nil {
progress = progressResp.(*milvuspb.GetLoadingProgressResponse).Progress
} else {
errMessage += err.Error() + "."
}
state := commonpb.LoadState_LoadStateLoading.String()
if progress >= 100 {
state = commonpb.LoadState_LoadStateLoaded.String()
}
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(nil), HTTPReturnData: gin.H{
HTTPReturnLoadState: state,
HTTPReturnLoadProgress: progress,
}, HTTPReturnMessage: errMessage})
return resp, err
}
func (h *HandlersV2) dropCollection(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
getter, _ := anyReq.(requestutil.CollectionNameGetter)
req := &milvuspb.DropCollectionRequest{
DbName: dbName,
CollectionName: getter.GetCollectionName(),
}
c.Set(ContextRequest, req)
resp, err := wrapperProxyWithLimit(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/DropCollection", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.DropCollection(reqCtx, req.(*milvuspb.DropCollectionRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnDefault())
}
return resp, err
}
func (h *HandlersV2) renameCollection(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
httpReq := anyReq.(*RenameCollectionReq)
req := &milvuspb.RenameCollectionRequest{
DbName: dbName,
OldName: httpReq.CollectionName,
NewName: httpReq.NewCollectionName,
NewDBName: httpReq.NewDbName,
}
c.Set(ContextRequest, req)
if req.NewDBName == "" {
req.NewDBName = dbName
}
resp, err := wrapperProxyWithLimit(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/RenameCollection", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.RenameCollection(reqCtx, req.(*milvuspb.RenameCollectionRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnDefault())
}
return resp, err
}
func (h *HandlersV2) loadCollection(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
getter, _ := anyReq.(requestutil.CollectionNameGetter)
req := &milvuspb.LoadCollectionRequest{
DbName: dbName,
CollectionName: getter.GetCollectionName(),
}
c.Set(ContextRequest, req)
resp, err := wrapperProxyWithLimit(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/LoadCollection", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.LoadCollection(reqCtx, req.(*milvuspb.LoadCollectionRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnDefault())
}
return resp, err
}
func (h *HandlersV2) releaseCollection(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
getter, _ := anyReq.(requestutil.CollectionNameGetter)
req := &milvuspb.ReleaseCollectionRequest{
DbName: dbName,
CollectionName: getter.GetCollectionName(),
}
c.Set(ContextRequest, req)
resp, err := wrapperProxyWithLimit(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/ReleaseCollection", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.ReleaseCollection(reqCtx, req.(*milvuspb.ReleaseCollectionRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnDefault())
}
return resp, err
}
// copy from internal/proxy/task_query.go
func matchCountRule(outputs []string) bool {
return len(outputs) == 1 && strings.ToLower(strings.TrimSpace(outputs[0])) == "count(*)"
}
func (h *HandlersV2) query(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
httpReq := anyReq.(*QueryReqV2)
req := &milvuspb.QueryRequest{
DbName: dbName,
CollectionName: httpReq.CollectionName,
Expr: httpReq.Filter,
OutputFields: httpReq.OutputFields,
PartitionNames: httpReq.PartitionNames,
QueryParams: []*commonpb.KeyValuePair{},
UseDefaultConsistency: true,
}
c.Set(ContextRequest, req)
if httpReq.Offset > 0 {
req.QueryParams = append(req.QueryParams, &commonpb.KeyValuePair{Key: ParamOffset, Value: strconv.FormatInt(int64(httpReq.Offset), 10)})
}
if httpReq.Limit > 0 && !matchCountRule(httpReq.OutputFields) {
req.QueryParams = append(req.QueryParams, &commonpb.KeyValuePair{Key: ParamLimit, Value: strconv.FormatInt(int64(httpReq.Limit), 10)})
}
resp, err := wrapperProxyWithLimit(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/Query", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.Query(reqCtx, req.(*milvuspb.QueryRequest))
})
if err == nil {
queryResp := resp.(*milvuspb.QueryResults)
allowJS, _ := strconv.ParseBool(c.Request.Header.Get(HTTPHeaderAllowInt64))
outputData, err := buildQueryResp(int64(0), queryResp.OutputFields, queryResp.FieldsData, nil, nil, allowJS)
if err != nil {
log.Ctx(ctx).Warn("high level restful api, fail to deal with query result", zap.Any("response", resp), zap.Error(err))
HTTPReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrInvalidSearchResult),
HTTPReturnMessage: merr.ErrInvalidSearchResult.Error() + ", error: " + err.Error(),
})
} else {
HTTPReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(nil),
HTTPReturnData: outputData,
HTTPReturnCost: proxy.GetCostValue(queryResp.GetStatus()),
})
}
}
return resp, err
}
func (h *HandlersV2) get(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
httpReq := anyReq.(*CollectionIDReq)
collSchema, err := h.GetCollectionSchema(ctx, c, dbName, httpReq.CollectionName)
if err != nil {
return nil, err
}
body, _ := c.Get(gin.BodyBytesKey)
filter, err := checkGetPrimaryKey(collSchema, gjson.Get(string(body.([]byte)), DefaultPrimaryFieldName))
if err != nil {
HTTPReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrCheckPrimaryKey),
HTTPReturnMessage: merr.ErrCheckPrimaryKey.Error() + ", error: " + err.Error(),
})
return nil, err
}
req := &milvuspb.QueryRequest{
DbName: dbName,
CollectionName: httpReq.CollectionName,
OutputFields: httpReq.OutputFields,
PartitionNames: httpReq.PartitionNames,
Expr: filter,
UseDefaultConsistency: true,
}
c.Set(ContextRequest, req)
resp, err := wrapperProxyWithLimit(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/Query", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.Query(reqCtx, req.(*milvuspb.QueryRequest))
})
if err == nil {
queryResp := resp.(*milvuspb.QueryResults)
allowJS, _ := strconv.ParseBool(c.Request.Header.Get(HTTPHeaderAllowInt64))
outputData, err := buildQueryResp(int64(0), queryResp.OutputFields, queryResp.FieldsData, nil, nil, allowJS)
if err != nil {
log.Ctx(ctx).Warn("high level restful api, fail to deal with get result", zap.Any("response", resp), zap.Error(err))
HTTPReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrInvalidSearchResult),
HTTPReturnMessage: merr.ErrInvalidSearchResult.Error() + ", error: " + err.Error(),
})
} else {
HTTPReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(nil),
HTTPReturnData: outputData,
HTTPReturnCost: proxy.GetCostValue(queryResp.GetStatus()),
})
}
}
return resp, err
}
func (h *HandlersV2) delete(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
httpReq := anyReq.(*CollectionFilterReq)
collSchema, err := h.GetCollectionSchema(ctx, c, dbName, httpReq.CollectionName)
if err != nil {
return nil, err
}
req := &milvuspb.DeleteRequest{
DbName: dbName,
CollectionName: httpReq.CollectionName,
PartitionName: httpReq.PartitionName,
Expr: httpReq.Filter,
}
c.Set(ContextRequest, req)
if req.Expr == "" {
body, _ := c.Get(gin.BodyBytesKey)
filter, err := checkGetPrimaryKey(collSchema, gjson.Get(string(body.([]byte)), DefaultPrimaryFieldName))
if err != nil {
HTTPReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrCheckPrimaryKey),
HTTPReturnMessage: merr.ErrCheckPrimaryKey.Error() + ", error: " + err.Error(),
})
return nil, err
}
req.Expr = filter
}
resp, err := wrapperProxyWithLimit(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/Delete", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.Delete(reqCtx, req.(*milvuspb.DeleteRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnDefaultWithCost(
proxy.GetCostValue(resp.(*milvuspb.MutationResult).GetStatus()),
))
}
return resp, err
}
func (h *HandlersV2) insert(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
httpReq := anyReq.(*CollectionDataReq)
req := &milvuspb.InsertRequest{
DbName: dbName,
CollectionName: httpReq.CollectionName,
PartitionName: httpReq.PartitionName,
// PartitionName: "_default",
}
c.Set(ContextRequest, req)
collSchema, err := h.GetCollectionSchema(ctx, c, dbName, httpReq.CollectionName)
if err != nil {
return nil, err
}
body, _ := c.Get(gin.BodyBytesKey)
err, httpReq.Data = checkAndSetData(string(body.([]byte)), collSchema)
if err != nil {
log.Ctx(ctx).Warn("high level restful api, fail to deal with insert data", zap.Error(err), zap.String("body", string(body.([]byte))))
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrInvalidInsertData),
HTTPReturnMessage: merr.ErrInvalidInsertData.Error() + ", error: " + err.Error(),
})
return nil, err
}
req.NumRows = uint32(len(httpReq.Data))
req.FieldsData, err = anyToColumns(httpReq.Data, collSchema)
if err != nil {
log.Ctx(ctx).Warn("high level restful api, fail to deal with insert data", zap.Any("data", httpReq.Data), zap.Error(err))
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrInvalidInsertData),
HTTPReturnMessage: merr.ErrInvalidInsertData.Error() + ", error: " + err.Error(),
})
return nil, err
}
resp, err := wrapperProxyWithLimit(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/Insert", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.Insert(reqCtx, req.(*milvuspb.InsertRequest))
})
if err == nil {
insertResp := resp.(*milvuspb.MutationResult)
cost := proxy.GetCostValue(insertResp.GetStatus())
switch insertResp.IDs.GetIdField().(type) {
case *schemapb.IDs_IntId:
allowJS, _ := strconv.ParseBool(c.Request.Header.Get(HTTPHeaderAllowInt64))
if allowJS {
HTTPReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(nil),
HTTPReturnData: gin.H{"insertCount": insertResp.InsertCnt, "insertIds": insertResp.IDs.IdField.(*schemapb.IDs_IntId).IntId.Data},
HTTPReturnCost: cost,
})
} else {
HTTPReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(nil),
HTTPReturnData: gin.H{"insertCount": insertResp.InsertCnt, "insertIds": formatInt64(insertResp.IDs.IdField.(*schemapb.IDs_IntId).IntId.Data)},
HTTPReturnCost: cost,
})
}
case *schemapb.IDs_StrId:
HTTPReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(nil),
HTTPReturnData: gin.H{"insertCount": insertResp.InsertCnt, "insertIds": insertResp.IDs.IdField.(*schemapb.IDs_StrId).StrId.Data},
HTTPReturnCost: cost,
})
default:
HTTPReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrCheckPrimaryKey),
HTTPReturnMessage: merr.ErrCheckPrimaryKey.Error() + ", error: unsupported primary key data type",
})
}
}
return resp, err
}
func (h *HandlersV2) upsert(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
httpReq := anyReq.(*CollectionDataReq)
req := &milvuspb.UpsertRequest{
DbName: dbName,
CollectionName: httpReq.CollectionName,
PartitionName: httpReq.PartitionName,
// PartitionName: "_default",
}
c.Set(ContextRequest, req)
collSchema, err := h.GetCollectionSchema(ctx, c, dbName, httpReq.CollectionName)
if err != nil {
return nil, err
}
if collSchema.AutoID {
err := merr.WrapErrParameterInvalid("autoID: false", "autoID: true", "cannot upsert an autoID collection")
HTTPAbortReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
return nil, err
}
body, _ := c.Get(gin.BodyBytesKey)
err, httpReq.Data = checkAndSetData(string(body.([]byte)), collSchema)
if err != nil {
log.Ctx(ctx).Warn("high level restful api, fail to deal with upsert data", zap.Any("body", body), zap.Error(err))
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrInvalidInsertData),
HTTPReturnMessage: merr.ErrInvalidInsertData.Error() + ", error: " + err.Error(),
})
return nil, err
}
req.NumRows = uint32(len(httpReq.Data))
req.FieldsData, err = anyToColumns(httpReq.Data, collSchema)
if err != nil {
log.Ctx(ctx).Warn("high level restful api, fail to deal with upsert data", zap.Any("data", httpReq.Data), zap.Error(err))
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrInvalidInsertData),
HTTPReturnMessage: merr.ErrInvalidInsertData.Error() + ", error: " + err.Error(),
})
return nil, err
}
resp, err := wrapperProxyWithLimit(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/Upsert", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.Upsert(reqCtx, req.(*milvuspb.UpsertRequest))
})
if err == nil {
upsertResp := resp.(*milvuspb.MutationResult)
cost := proxy.GetCostValue(upsertResp.GetStatus())
switch upsertResp.IDs.GetIdField().(type) {
case *schemapb.IDs_IntId:
allowJS, _ := strconv.ParseBool(c.Request.Header.Get(HTTPHeaderAllowInt64))
if allowJS {
HTTPReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(nil),
HTTPReturnData: gin.H{"upsertCount": upsertResp.UpsertCnt, "upsertIds": upsertResp.IDs.IdField.(*schemapb.IDs_IntId).IntId.Data},
HTTPReturnCost: cost,
})
} else {
HTTPReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(nil),
HTTPReturnData: gin.H{"upsertCount": upsertResp.UpsertCnt, "upsertIds": formatInt64(upsertResp.IDs.IdField.(*schemapb.IDs_IntId).IntId.Data)},
HTTPReturnCost: cost,
})
}
case *schemapb.IDs_StrId:
HTTPReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(nil),
HTTPReturnData: gin.H{"upsertCount": upsertResp.UpsertCnt, "upsertIds": upsertResp.IDs.IdField.(*schemapb.IDs_StrId).StrId.Data},
HTTPReturnCost: cost,
})
default:
HTTPReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrCheckPrimaryKey),
HTTPReturnMessage: merr.ErrCheckPrimaryKey.Error() + ", error: unsupported primary key data type",
})
}
}
return resp, err
}
func generatePlaceholderGroup(ctx context.Context, body string, collSchema *schemapb.CollectionSchema, fieldName string) ([]byte, error) {
var err error
var vectorField *schemapb.FieldSchema
if len(fieldName) == 0 {
for _, field := range collSchema.Fields {
if typeutil.IsVectorType(field.DataType) {
if len(fieldName) == 0 {
fieldName = field.Name
vectorField = field
} else {
return nil, errors.New("search without annsField, but already found multiple vector fields: [" + fieldName + ", " + field.Name + ",,,]")
}
}
}
} else {
for _, field := range collSchema.Fields {
if field.Name == fieldName && typeutil.IsVectorType(field.DataType) {
vectorField = field
break
}
}
}
if vectorField == nil {
return nil, errors.New("cannot find a vector field named: " + fieldName)
}
dim := int64(0)
if !typeutil.IsSparseFloatVectorType(vectorField.DataType) {
dim, _ = getDim(vectorField)
}
phv, err := convertVectors2Placeholder(body, vectorField.DataType, dim)
if err != nil {
return nil, err
}
return proto.Marshal(&commonpb.PlaceholderGroup{
Placeholders: []*commonpb.PlaceholderValue{
phv,
},
})
}
func generateSearchParams(ctx context.Context, c *gin.Context, reqParams map[string]float64) ([]*commonpb.KeyValuePair, error) {
params := map[string]interface{}{ // auto generated mapping
"level": int(commonpb.ConsistencyLevel_Bounded),
}
if reqParams != nil {
radius, radiusOk := reqParams[ParamRadius]
rangeFilter, rangeFilterOk := reqParams[ParamRangeFilter]
if rangeFilterOk {
if !radiusOk {
log.Ctx(ctx).Warn("high level restful api, search params invalid, because only " + ParamRangeFilter)
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat),
HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: invalid search params",
})
return nil, merr.ErrIncorrectParameterFormat
}
params[ParamRangeFilter] = rangeFilter
}
if radiusOk {
params[ParamRadius] = radius
}
}
bs, _ := json.Marshal(params)
searchParams := []*commonpb.KeyValuePair{
{Key: Params, Value: string(bs)},
}
return searchParams, nil
}
func (h *HandlersV2) search(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
httpReq := anyReq.(*SearchReqV2)
req := &milvuspb.SearchRequest{
DbName: dbName,
CollectionName: httpReq.CollectionName,
Dsl: httpReq.Filter,
DslType: commonpb.DslType_BoolExprV1,
OutputFields: httpReq.OutputFields,
PartitionNames: httpReq.PartitionNames,
UseDefaultConsistency: true,
}
c.Set(ContextRequest, req)
collSchema, err := h.GetCollectionSchema(ctx, c, dbName, httpReq.CollectionName)
if err != nil {
return nil, err
}
searchParams, err := generateSearchParams(ctx, c, httpReq.Params)
if err != nil {
return nil, err
}
searchParams = append(searchParams, &commonpb.KeyValuePair{Key: common.TopKKey, Value: strconv.FormatInt(int64(httpReq.Limit), 10)})
searchParams = append(searchParams, &commonpb.KeyValuePair{Key: ParamOffset, Value: strconv.FormatInt(int64(httpReq.Offset), 10)})
searchParams = append(searchParams, &commonpb.KeyValuePair{Key: ParamGroupByField, Value: httpReq.GroupByField})
searchParams = append(searchParams, &commonpb.KeyValuePair{Key: proxy.AnnsFieldKey, Value: httpReq.AnnsField})
searchParams = append(searchParams, &commonpb.KeyValuePair{Key: ParamRoundDecimal, Value: "-1"})
body, _ := c.Get(gin.BodyBytesKey)
placeholderGroup, err := generatePlaceholderGroup(ctx, string(body.([]byte)), collSchema, httpReq.AnnsField)
if err != nil {
log.Ctx(ctx).Warn("high level restful api, search with vector invalid", zap.Error(err))
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat),
HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: " + err.Error(),
})
return nil, err
}
req.SearchParams = searchParams
req.PlaceholderGroup = placeholderGroup
resp, err := wrapperProxyWithLimit(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/Search", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.Search(reqCtx, req.(*milvuspb.SearchRequest))
})
if err == nil {
searchResp := resp.(*milvuspb.SearchResults)
cost := proxy.GetCostValue(searchResp.GetStatus())
if searchResp.Results.TopK == int64(0) {
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(nil), HTTPReturnData: []interface{}{}, HTTPReturnCost: cost})
} else {
allowJS, _ := strconv.ParseBool(c.Request.Header.Get(HTTPHeaderAllowInt64))
outputData, err := buildQueryResp(0, searchResp.Results.OutputFields, searchResp.Results.FieldsData, searchResp.Results.Ids, searchResp.Results.Scores, allowJS)
if err != nil {
log.Ctx(ctx).Warn("high level restful api, fail to deal with search result", zap.Any("result", searchResp.Results), zap.Error(err))
HTTPReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrInvalidSearchResult),
HTTPReturnMessage: merr.ErrInvalidSearchResult.Error() + ", error: " + err.Error(),
})
} else {
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(nil), HTTPReturnData: outputData, HTTPReturnCost: cost})
}
}
}
return resp, err
}
func (h *HandlersV2) advancedSearch(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
httpReq := anyReq.(*HybridSearchReq)
req := &milvuspb.HybridSearchRequest{
DbName: dbName,
CollectionName: httpReq.CollectionName,
Requests: []*milvuspb.SearchRequest{},
OutputFields: httpReq.OutputFields,
}
c.Set(ContextRequest, req)
collSchema, err := h.GetCollectionSchema(ctx, c, dbName, httpReq.CollectionName)
if err != nil {
return nil, err
}
body, _ := c.Get(gin.BodyBytesKey)
searchArray := gjson.Get(string(body.([]byte)), "search").Array()
for i, subReq := range httpReq.Search {
searchParams, err := generateSearchParams(ctx, c, subReq.Params)
if err != nil {
return nil, err
}
searchParams = append(searchParams, &commonpb.KeyValuePair{Key: common.TopKKey, Value: strconv.FormatInt(int64(subReq.Limit), 10)})
searchParams = append(searchParams, &commonpb.KeyValuePair{Key: ParamOffset, Value: strconv.FormatInt(int64(subReq.Offset), 10)})
searchParams = append(searchParams, &commonpb.KeyValuePair{Key: ParamGroupByField, Value: subReq.GroupByField})
searchParams = append(searchParams, &commonpb.KeyValuePair{Key: proxy.AnnsFieldKey, Value: subReq.AnnsField})
searchParams = append(searchParams, &commonpb.KeyValuePair{Key: ParamRoundDecimal, Value: "-1"})
placeholderGroup, err := generatePlaceholderGroup(ctx, searchArray[i].Raw, collSchema, subReq.AnnsField)
if err != nil {
log.Ctx(ctx).Warn("high level restful api, search with vector invalid", zap.Error(err))
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat),
HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: " + err.Error(),
})
return nil, err
}
searchReq := &milvuspb.SearchRequest{
DbName: dbName,
CollectionName: httpReq.CollectionName,
Dsl: subReq.Filter,
PlaceholderGroup: placeholderGroup,
DslType: commonpb.DslType_BoolExprV1,
OutputFields: httpReq.OutputFields,
PartitionNames: httpReq.PartitionNames,
SearchParams: searchParams,
UseDefaultConsistency: true,
}
req.Requests = append(req.Requests, searchReq)
}
bs, _ := json.Marshal(httpReq.Rerank.Params)
req.RankParams = []*commonpb.KeyValuePair{
{Key: proxy.RankTypeKey, Value: httpReq.Rerank.Strategy},
{Key: proxy.RankParamsKey, Value: string(bs)},
{Key: ParamLimit, Value: strconv.FormatInt(int64(httpReq.Limit), 10)},
{Key: ParamRoundDecimal, Value: "-1"},
}
resp, err := wrapperProxyWithLimit(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/HybridSearch", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.HybridSearch(reqCtx, req.(*milvuspb.HybridSearchRequest))
})
if err == nil {
searchResp := resp.(*milvuspb.SearchResults)
cost := proxy.GetCostValue(searchResp.GetStatus())
if searchResp.Results.TopK == int64(0) {
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(nil), HTTPReturnData: []interface{}{}, HTTPReturnCost: cost})
} else {
allowJS, _ := strconv.ParseBool(c.Request.Header.Get(HTTPHeaderAllowInt64))
outputData, err := buildQueryResp(0, searchResp.Results.OutputFields, searchResp.Results.FieldsData, searchResp.Results.Ids, searchResp.Results.Scores, allowJS)
if err != nil {
log.Ctx(ctx).Warn("high level restful api, fail to deal with search result", zap.Any("result", searchResp.Results), zap.Error(err))
HTTPReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrInvalidSearchResult),
HTTPReturnMessage: merr.ErrInvalidSearchResult.Error() + ", error: " + err.Error(),
})
} else {
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(nil), HTTPReturnData: outputData, HTTPReturnCost: cost})
}
}
}
return resp, err
}
func (h *HandlersV2) createCollection(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
httpReq := anyReq.(*CollectionReq)
req := &milvuspb.CreateCollectionRequest{
DbName: dbName,
CollectionName: httpReq.CollectionName,
Properties: []*commonpb.KeyValuePair{},
}
c.Set(ContextRequest, req)
var schema []byte
var err error
fieldNames := map[string]bool{}
partitionsNum := int64(-1)
if httpReq.Schema.Fields == nil || len(httpReq.Schema.Fields) == 0 {
if httpReq.Dimension == 0 {
err := merr.WrapErrParameterInvalid("collectionName & dimension", "collectionName",
"dimension is required for quickly create collection(default metric type: "+DefaultMetricType+")")
log.Ctx(ctx).Warn("high level restful api, quickly create collection fail", zap.Error(err), zap.Any("request", anyReq))
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(err),
HTTPReturnMessage: err.Error(),
})
return nil, err
}
idDataType := schemapb.DataType_Int64
idParams := []*commonpb.KeyValuePair{}
switch httpReq.IDType {
case "VarChar", "Varchar":
idDataType = schemapb.DataType_VarChar
idParams = append(idParams, &commonpb.KeyValuePair{
Key: common.MaxLengthKey,
Value: fmt.Sprintf("%v", httpReq.Params["max_length"]),
})
httpReq.IDType = "VarChar"
case "", "Int64", "int64":
httpReq.IDType = "Int64"
default:
err := merr.WrapErrParameterInvalid("Int64, Varchar", httpReq.IDType,
"idType can only be [Int64, VarChar], default: Int64")
log.Ctx(ctx).Warn("high level restful api, quickly create collection fail", zap.Error(err), zap.Any("request", anyReq))
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(err),
HTTPReturnMessage: err.Error(),
})
return nil, err
}
if len(httpReq.PrimaryFieldName) == 0 {
httpReq.PrimaryFieldName = DefaultPrimaryFieldName
}
if len(httpReq.VectorFieldName) == 0 {
httpReq.VectorFieldName = DefaultVectorFieldName
}
enableDynamic := EnableDynamic
if enStr, ok := httpReq.Params["enableDynamicField"]; ok {
if en, err := strconv.ParseBool(fmt.Sprintf("%v", enStr)); err == nil {
enableDynamic = en
}
}
schema, err = proto.Marshal(&schemapb.CollectionSchema{
Name: httpReq.CollectionName,
Fields: []*schemapb.FieldSchema{
{
FieldID: common.StartOfUserFieldID,
Name: httpReq.PrimaryFieldName,
IsPrimaryKey: true,
DataType: idDataType,
AutoID: httpReq.AutoID,
TypeParams: idParams,
},
{
FieldID: common.StartOfUserFieldID + 1,
Name: httpReq.VectorFieldName,
IsPrimaryKey: false,
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: Dim,
Value: strconv.FormatInt(int64(httpReq.Dimension), 10),
},
},
AutoID: DisableAutoID,
},
},
EnableDynamicField: enableDynamic,
})
} else {
collSchema := schemapb.CollectionSchema{
Name: httpReq.CollectionName,
AutoID: httpReq.Schema.AutoId,
Fields: []*schemapb.FieldSchema{},
EnableDynamicField: httpReq.Schema.EnableDynamicField,
}
for _, field := range httpReq.Schema.Fields {
fieldDataType, ok := schemapb.DataType_value[field.DataType]
if !ok {
log.Ctx(ctx).Warn("field's data type is invalid(case sensitive).", zap.Any("fieldDataType", field.DataType), zap.Any("field", field))
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrParameterInvalid),
HTTPReturnMessage: merr.ErrParameterInvalid.Error() + ", data type " + field.DataType + " is invalid(case sensitive).",
})
return nil, merr.ErrParameterInvalid
}
dataType := schemapb.DataType(fieldDataType)
fieldSchema := schemapb.FieldSchema{
Name: field.FieldName,
IsPrimaryKey: field.IsPrimary,
IsPartitionKey: field.IsPartitionKey,
DataType: dataType,
TypeParams: []*commonpb.KeyValuePair{},
}
if dataType == schemapb.DataType_Array {
if _, ok := schemapb.DataType_value[field.ElementDataType]; !ok {
log.Ctx(ctx).Warn("element's data type is invalid(case sensitive).", zap.Any("elementDataType", field.ElementDataType), zap.Any("field", field))
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrParameterInvalid),
HTTPReturnMessage: merr.ErrParameterInvalid.Error() + ", element data type " + field.ElementDataType + " is invalid(case sensitive).",
})
return nil, merr.ErrParameterInvalid
}
fieldSchema.ElementType = schemapb.DataType(schemapb.DataType_value[field.ElementDataType])
}
if field.IsPrimary {
fieldSchema.AutoID = httpReq.Schema.AutoId
}
if field.IsPartitionKey {
partitionsNum = int64(64)
if partitionsNumStr, ok := httpReq.Params["partitionsNum"]; ok {
if partitions, err := strconv.ParseInt(fmt.Sprintf("%v", partitionsNumStr), 10, 64); err == nil {
partitionsNum = partitions
}
}
}
for key, fieldParam := range field.ElementTypeParams {
fieldSchema.TypeParams = append(fieldSchema.TypeParams, &commonpb.KeyValuePair{Key: key, Value: fmt.Sprintf("%v", fieldParam)})
}
collSchema.Fields = append(collSchema.Fields, &fieldSchema)
fieldNames[field.FieldName] = true
}
schema, err = proto.Marshal(&collSchema)
}
if err != nil {
log.Ctx(ctx).Warn("high level restful api, marshal collection schema fail", zap.Error(err), zap.Any("request", anyReq))
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrMarshalCollectionSchema),
HTTPReturnMessage: merr.ErrMarshalCollectionSchema.Error() + ", error: " + err.Error(),
})
return nil, err
}
req.Schema = schema
shardsNum := int32(ShardNumDefault)
if shardsNumStr, ok := httpReq.Params["shardsNum"]; ok {
if shards, err := strconv.ParseInt(fmt.Sprintf("%v", shardsNumStr), 10, 64); err == nil {
shardsNum = int32(shards)
}
}
req.ShardsNum = shardsNum
consistencyLevel := commonpb.ConsistencyLevel_Bounded
if _, ok := httpReq.Params["consistencyLevel"]; ok {
if level, ok := commonpb.ConsistencyLevel_value[fmt.Sprintf("%s", httpReq.Params["consistencyLevel"])]; ok {
consistencyLevel = commonpb.ConsistencyLevel(level)
} else {
err := merr.WrapErrParameterInvalid("Strong, Session, Bounded, Eventually, Customized", httpReq.Params["consistencyLevel"],
"consistencyLevel can only be [Strong, Session, Bounded, Eventually, Customized], default: Bounded")
log.Ctx(ctx).Warn("high level restful api, create collection fail", zap.Error(err), zap.Any("request", anyReq))
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(err),
HTTPReturnMessage: err.Error(),
})
return nil, err
}
}
req.ConsistencyLevel = consistencyLevel
if partitionsNum > 0 {
req.NumPartitions = partitionsNum
}
if _, ok := httpReq.Params["ttlSeconds"]; ok {
req.Properties = append(req.Properties, &commonpb.KeyValuePair{
Key: common.CollectionTTLConfigKey,
Value: fmt.Sprintf("%v", httpReq.Params["ttlSeconds"]),
})
}
if _, ok := httpReq.Params["partitionKeyIsolation"]; ok {
req.Properties = append(req.Properties, &commonpb.KeyValuePair{
Key: common.PartitionKeyIsolationKey,
Value: fmt.Sprintf("%v", httpReq.Params["partitionKeyIsolation"]),
})
}
resp, err := wrapperProxyWithLimit(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/CreateCollection", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.CreateCollection(reqCtx, req.(*milvuspb.CreateCollectionRequest))
})
if err != nil {
return resp, err
}
if httpReq.Schema.Fields == nil || len(httpReq.Schema.Fields) == 0 {
if len(httpReq.MetricType) == 0 {
httpReq.MetricType = DefaultMetricType
}
createIndexReq := &milvuspb.CreateIndexRequest{
DbName: dbName,
CollectionName: httpReq.CollectionName,
FieldName: httpReq.VectorFieldName,
IndexName: httpReq.VectorFieldName,
ExtraParams: []*commonpb.KeyValuePair{{Key: common.MetricTypeKey, Value: httpReq.MetricType}},
}
statusResponse, err := wrapperProxyWithLimit(ctx, c, createIndexReq, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/CreateIndex", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.CreateIndex(ctx, req.(*milvuspb.CreateIndexRequest))
})
if err != nil {
return statusResponse, err
}
} else {
if len(httpReq.IndexParams) == 0 {
HTTPReturn(c, http.StatusOK, wrapperReturnDefault())
return nil, nil
}
for _, indexParam := range httpReq.IndexParams {
if _, ok := fieldNames[indexParam.FieldName]; !ok {
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrMissingRequiredParameters),
HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error() + ", error: `" + indexParam.FieldName + "` hasn't defined in schema",
})
return nil, merr.ErrMissingRequiredParameters
}
createIndexReq := &milvuspb.CreateIndexRequest{
DbName: dbName,
CollectionName: httpReq.CollectionName,
FieldName: indexParam.FieldName,
IndexName: indexParam.IndexName,
ExtraParams: []*commonpb.KeyValuePair{{Key: common.MetricTypeKey, Value: indexParam.MetricType}},
}
for key, value := range indexParam.Params {
createIndexReq.ExtraParams = append(createIndexReq.ExtraParams, &commonpb.KeyValuePair{Key: key, Value: fmt.Sprintf("%v", value)})
}
statusResponse, err := wrapperProxyWithLimit(ctx, c, createIndexReq, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/CreateIndex", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.CreateIndex(ctx, req.(*milvuspb.CreateIndexRequest))
})
if err != nil {
return statusResponse, err
}
}
}
loadReq := &milvuspb.LoadCollectionRequest{
DbName: dbName,
CollectionName: httpReq.CollectionName,
}
statusResponse, err := wrapperProxyWithLimit(ctx, c, loadReq, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/LoadCollection", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.LoadCollection(ctx, req.(*milvuspb.LoadCollectionRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnDefault())
}
return statusResponse, err
}
func (h *HandlersV2) listPartitions(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
collectionGetter, _ := anyReq.(requestutil.CollectionNameGetter)
req := &milvuspb.ShowPartitionsRequest{
DbName: dbName,
CollectionName: collectionGetter.GetCollectionName(),
}
c.Set(ContextRequest, req)
resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/ShowPartitions", func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.ShowPartitions(reqCtx, req.(*milvuspb.ShowPartitionsRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnList(resp.(*milvuspb.ShowPartitionsResponse).PartitionNames))
}
return resp, err
}
func (h *HandlersV2) hasPartitions(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
collectionGetter, _ := anyReq.(requestutil.CollectionNameGetter)
partitionGetter, _ := anyReq.(requestutil.PartitionNameGetter)
req := &milvuspb.HasPartitionRequest{
DbName: dbName,
CollectionName: collectionGetter.GetCollectionName(),
PartitionName: partitionGetter.GetPartitionName(),
}
c.Set(ContextRequest, req)
resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/HasPartition", func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.HasPartition(reqCtx, req.(*milvuspb.HasPartitionRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnHas(resp.(*milvuspb.BoolResponse).Value))
}
return resp, err
}
// data coord will collect partitions' row_count
// proxy grpc call only support partition not partitions
func (h *HandlersV2) statsPartition(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
collectionGetter, _ := anyReq.(requestutil.CollectionNameGetter)
partitionGetter, _ := anyReq.(requestutil.PartitionNameGetter)
req := &milvuspb.GetPartitionStatisticsRequest{
DbName: dbName,
CollectionName: collectionGetter.GetCollectionName(),
PartitionName: partitionGetter.GetPartitionName(),
}
c.Set(ContextRequest, req)
resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/GetPartitionStatistics", func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.GetPartitionStatistics(reqCtx, req.(*milvuspb.GetPartitionStatisticsRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnRowCount(resp.(*milvuspb.GetPartitionStatisticsResponse).Stats))
}
return resp, err
}
func (h *HandlersV2) createPartition(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
collectionGetter, _ := anyReq.(requestutil.CollectionNameGetter)
partitionGetter, _ := anyReq.(requestutil.PartitionNameGetter)
req := &milvuspb.CreatePartitionRequest{
DbName: dbName,
CollectionName: collectionGetter.GetCollectionName(),
PartitionName: partitionGetter.GetPartitionName(),
}
c.Set(ContextRequest, req)
resp, err := wrapperProxyWithLimit(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/CreatePartition", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.CreatePartition(reqCtx, req.(*milvuspb.CreatePartitionRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnDefault())
}
return resp, err
}
func (h *HandlersV2) dropPartition(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
collectionGetter, _ := anyReq.(requestutil.CollectionNameGetter)
partitionGetter, _ := anyReq.(requestutil.PartitionNameGetter)
req := &milvuspb.DropPartitionRequest{
DbName: dbName,
CollectionName: collectionGetter.GetCollectionName(),
PartitionName: partitionGetter.GetPartitionName(),
}
c.Set(ContextRequest, req)
resp, err := wrapperProxyWithLimit(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/DropPartition", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.DropPartition(reqCtx, req.(*milvuspb.DropPartitionRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnDefault())
}
return resp, err
}
func (h *HandlersV2) loadPartitions(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
httpReq := anyReq.(*PartitionsReq)
req := &milvuspb.LoadPartitionsRequest{
DbName: dbName,
CollectionName: httpReq.CollectionName,
PartitionNames: httpReq.PartitionNames,
}
c.Set(ContextRequest, req)
resp, err := wrapperProxyWithLimit(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/LoadPartitions", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.LoadPartitions(reqCtx, req.(*milvuspb.LoadPartitionsRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnDefault())
}
return resp, err
}
func (h *HandlersV2) releasePartitions(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
httpReq := anyReq.(*PartitionsReq)
req := &milvuspb.ReleasePartitionsRequest{
DbName: dbName,
CollectionName: httpReq.CollectionName,
PartitionNames: httpReq.PartitionNames,
}
c.Set(ContextRequest, req)
resp, err := wrapperProxyWithLimit(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/ReleasePartitions", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.ReleasePartitions(reqCtx, req.(*milvuspb.ReleasePartitionsRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnDefault())
}
return resp, err
}
func (h *HandlersV2) listUsers(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
req := &milvuspb.ListCredUsersRequest{}
c.Set(ContextRequest, req)
resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/ListCredUsers", func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.ListCredUsers(reqCtx, req.(*milvuspb.ListCredUsersRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnList(resp.(*milvuspb.ListCredUsersResponse).Usernames))
}
return resp, err
}
func (h *HandlersV2) describeUser(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
userNameGetter, _ := anyReq.(UserNameGetter)
userName := userNameGetter.GetUserName()
req := &milvuspb.SelectUserRequest{
User: &milvuspb.UserEntity{
Name: userName,
},
IncludeRoleInfo: true,
}
c.Set(ContextRequest, req)
resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/SelectUser", func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.SelectUser(reqCtx, req.(*milvuspb.SelectUserRequest))
})
if err == nil {
roleNames := []string{}
for _, userRole := range resp.(*milvuspb.SelectUserResponse).Results {
if userRole.User.Name == userName {
for _, role := range userRole.Roles {
roleNames = append(roleNames, role.Name)
}
}
}
HTTPReturn(c, http.StatusOK, wrapperReturnList(roleNames))
}
return resp, err
}
func (h *HandlersV2) createUser(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
httpReq := anyReq.(*PasswordReq)
req := &milvuspb.CreateCredentialRequest{
Username: httpReq.UserName,
Password: crypto.Base64Encode(httpReq.Password),
}
resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/CreateCredential", func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.CreateCredential(reqCtx, req.(*milvuspb.CreateCredentialRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnDefault())
}
return resp, err
}
func (h *HandlersV2) updateUser(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
httpReq := anyReq.(*NewPasswordReq)
req := &milvuspb.UpdateCredentialRequest{
Username: httpReq.UserName,
OldPassword: crypto.Base64Encode(httpReq.Password),
NewPassword: crypto.Base64Encode(httpReq.NewPassword),
}
resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/UpdateCredential", func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.UpdateCredential(reqCtx, req.(*milvuspb.UpdateCredentialRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnDefault())
}
return resp, err
}
func (h *HandlersV2) dropUser(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
getter, _ := anyReq.(UserNameGetter)
req := &milvuspb.DeleteCredentialRequest{
Username: getter.GetUserName(),
}
resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/DeleteCredential", func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.DeleteCredential(reqCtx, req.(*milvuspb.DeleteCredentialRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnDefault())
}
return resp, err
}
func (h *HandlersV2) operateRoleToUser(ctx context.Context, c *gin.Context, userName, roleName string, operateType milvuspb.OperateUserRoleType) (interface{}, error) {
req := &milvuspb.OperateUserRoleRequest{
Username: userName,
RoleName: roleName,
Type: operateType,
}
resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/OperateUserRole", func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.OperateUserRole(reqCtx, req.(*milvuspb.OperateUserRoleRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnDefault())
}
return resp, err
}
func (h *HandlersV2) addRoleToUser(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
return h.operateRoleToUser(ctx, c, anyReq.(*UserRoleReq).UserName, anyReq.(*UserRoleReq).RoleName, milvuspb.OperateUserRoleType_AddUserToRole)
}
func (h *HandlersV2) removeRoleFromUser(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
return h.operateRoleToUser(ctx, c, anyReq.(*UserRoleReq).UserName, anyReq.(*UserRoleReq).RoleName, milvuspb.OperateUserRoleType_RemoveUserFromRole)
}
func (h *HandlersV2) listRoles(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
req := &milvuspb.SelectRoleRequest{}
resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/SelectRole", func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.SelectRole(reqCtx, req.(*milvuspb.SelectRoleRequest))
})
if err == nil {
roleNames := []string{}
for _, role := range resp.(*milvuspb.SelectRoleResponse).Results {
roleNames = append(roleNames, role.Role.Name)
}
HTTPReturn(c, http.StatusOK, wrapperReturnList(roleNames))
}
return resp, err
}
func (h *HandlersV2) describeRole(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
getter, _ := anyReq.(RoleNameGetter)
req := &milvuspb.SelectGrantRequest{
Entity: &milvuspb.GrantEntity{Role: &milvuspb.RoleEntity{Name: getter.GetRoleName()}, DbName: dbName},
}
resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/SelectGrant", func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.SelectGrant(reqCtx, req.(*milvuspb.SelectGrantRequest))
})
if err == nil {
privileges := [](map[string]string){}
for _, grant := range resp.(*milvuspb.SelectGrantResponse).Entities {
privilege := map[string]string{
HTTPReturnObjectType: grant.Object.Name,
HTTPReturnObjectName: grant.ObjectName,
HTTPReturnPrivilege: grant.Grantor.Privilege.Name,
HTTPReturnDbName: grant.DbName,
HTTPReturnGrantor: grant.Grantor.User.Name,
}
privileges = append(privileges, privilege)
}
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(nil), HTTPReturnData: privileges})
}
return resp, err
}
func (h *HandlersV2) createRole(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
getter, _ := anyReq.(RoleNameGetter)
req := &milvuspb.CreateRoleRequest{
Entity: &milvuspb.RoleEntity{Name: getter.GetRoleName()},
}
resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/CreateRole", func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.CreateRole(reqCtx, req.(*milvuspb.CreateRoleRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnDefault())
}
return resp, err
}
func (h *HandlersV2) dropRole(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
getter, _ := anyReq.(RoleNameGetter)
req := &milvuspb.DropRoleRequest{
RoleName: getter.GetRoleName(),
}
resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/DropRole", func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.DropRole(reqCtx, req.(*milvuspb.DropRoleRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnDefault())
}
return resp, err
}
func (h *HandlersV2) operatePrivilegeToRole(ctx context.Context, c *gin.Context, httpReq *GrantReq, operateType milvuspb.OperatePrivilegeType, dbName string) (interface{}, error) {
req := &milvuspb.OperatePrivilegeRequest{
Entity: &milvuspb.GrantEntity{
Role: &milvuspb.RoleEntity{Name: httpReq.RoleName},
Object: &milvuspb.ObjectEntity{Name: httpReq.ObjectType},
ObjectName: httpReq.ObjectName,
DbName: dbName,
Grantor: &milvuspb.GrantorEntity{
Privilege: &milvuspb.PrivilegeEntity{Name: httpReq.Privilege},
},
},
Type: operateType,
}
resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/OperatePrivilege", func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.OperatePrivilege(reqCtx, req.(*milvuspb.OperatePrivilegeRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnDefault())
}
return resp, err
}
func (h *HandlersV2) addPrivilegeToRole(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
return h.operatePrivilegeToRole(ctx, c, anyReq.(*GrantReq), milvuspb.OperatePrivilegeType_Grant, dbName)
}
func (h *HandlersV2) removePrivilegeFromRole(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
return h.operatePrivilegeToRole(ctx, c, anyReq.(*GrantReq), milvuspb.OperatePrivilegeType_Revoke, dbName)
}
func (h *HandlersV2) listIndexes(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
collectionGetter, _ := anyReq.(requestutil.CollectionNameGetter)
indexNames := []string{}
req := &milvuspb.DescribeIndexRequest{
DbName: dbName,
CollectionName: collectionGetter.GetCollectionName(),
}
c.Set(ContextRequest, req)
resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/DescribeIndex", func(reqCtx context.Context, req any) (any, error) {
resp, err := h.proxy.DescribeIndex(reqCtx, req.(*milvuspb.DescribeIndexRequest))
if errors.Is(err, merr.ErrIndexNotFound) {
return &milvuspb.DescribeIndexResponse{
IndexDescriptions: []*milvuspb.IndexDescription{},
}, nil
}
if resp != nil && errors.Is(merr.Error(resp.Status), merr.ErrIndexNotFound) {
return &milvuspb.DescribeIndexResponse{
IndexDescriptions: []*milvuspb.IndexDescription{},
}, nil
}
return resp, err
})
if err != nil {
return resp, err
}
for _, index := range resp.(*milvuspb.DescribeIndexResponse).IndexDescriptions {
indexNames = append(indexNames, index.IndexName)
}
HTTPReturn(c, http.StatusOK, wrapperReturnList(indexNames))
return resp, err
}
func (h *HandlersV2) describeIndex(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
collectionGetter, _ := anyReq.(requestutil.CollectionNameGetter)
indexGetter, _ := anyReq.(IndexNameGetter)
req := &milvuspb.DescribeIndexRequest{
DbName: dbName,
CollectionName: collectionGetter.GetCollectionName(),
IndexName: indexGetter.GetIndexName(),
}
c.Set(ContextRequest, req)
resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/DescribeIndex", func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.DescribeIndex(reqCtx, req.(*milvuspb.DescribeIndexRequest))
})
if err == nil {
indexInfos := [](map[string]any){}
for _, indexDescription := range resp.(*milvuspb.DescribeIndexResponse).IndexDescriptions {
metricType := ""
indexType := ""
for _, pair := range indexDescription.Params {
if pair.Key == common.MetricTypeKey {
metricType = pair.Value
} else if pair.Key == common.IndexTypeKey {
indexType = pair.Value
}
}
indexInfo := map[string]any{
HTTPIndexName: indexDescription.IndexName,
HTTPIndexField: indexDescription.FieldName,
HTTPReturnIndexType: indexType,
HTTPReturnIndexMetricType: metricType,
HTTPReturnIndexTotalRows: indexDescription.TotalRows,
HTTPReturnIndexPendingRows: indexDescription.PendingIndexRows,
HTTPReturnIndexIndexedRows: indexDescription.IndexedRows,
HTTPReturnIndexState: indexDescription.State.String(),
HTTPReturnIndexFailReason: indexDescription.IndexStateFailReason,
}
indexInfos = append(indexInfos, indexInfo)
}
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(nil), HTTPReturnData: indexInfos})
}
return resp, err
}
func (h *HandlersV2) createIndex(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
httpReq := anyReq.(*IndexParamReq)
for _, indexParam := range httpReq.IndexParams {
req := &milvuspb.CreateIndexRequest{
DbName: dbName,
CollectionName: httpReq.CollectionName,
FieldName: indexParam.FieldName,
IndexName: indexParam.IndexName,
ExtraParams: []*commonpb.KeyValuePair{
{Key: common.MetricTypeKey, Value: indexParam.MetricType},
},
}
c.Set(ContextRequest, req)
for key, value := range indexParam.Params {
req.ExtraParams = append(req.ExtraParams, &commonpb.KeyValuePair{Key: key, Value: fmt.Sprintf("%v", value)})
}
resp, err := wrapperProxyWithLimit(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/CreateIndex", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.CreateIndex(reqCtx, req.(*milvuspb.CreateIndexRequest))
})
if err != nil {
return resp, err
}
}
HTTPReturn(c, http.StatusOK, wrapperReturnDefault())
return httpReq.IndexParams, nil
}
func (h *HandlersV2) dropIndex(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
collGetter, _ := anyReq.(requestutil.CollectionNameGetter)
indexGetter, _ := anyReq.(IndexNameGetter)
req := &milvuspb.DropIndexRequest{
DbName: dbName,
CollectionName: collGetter.GetCollectionName(),
IndexName: indexGetter.GetIndexName(),
}
c.Set(ContextRequest, req)
resp, err := wrapperProxyWithLimit(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/DropIndex", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.DropIndex(reqCtx, req.(*milvuspb.DropIndexRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnDefault())
}
return resp, err
}
func (h *HandlersV2) listAlias(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
collectionGetter, _ := anyReq.(requestutil.CollectionNameGetter)
req := &milvuspb.ListAliasesRequest{
DbName: dbName,
CollectionName: collectionGetter.GetCollectionName(),
}
c.Set(ContextRequest, req)
resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/ListAliases", func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.ListAliases(reqCtx, req.(*milvuspb.ListAliasesRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnList(resp.(*milvuspb.ListAliasesResponse).Aliases))
}
return resp, err
}
func (h *HandlersV2) describeAlias(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
getter, _ := anyReq.(AliasNameGetter)
req := &milvuspb.DescribeAliasRequest{
DbName: dbName,
Alias: getter.GetAliasName(),
}
c.Set(ContextRequest, req)
resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/DescribeAlias", func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.DescribeAlias(reqCtx, req.(*milvuspb.DescribeAliasRequest))
})
if err == nil {
response := resp.(*milvuspb.DescribeAliasResponse)
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(nil), HTTPReturnData: gin.H{
HTTPDbName: response.DbName,
HTTPCollectionName: response.Collection,
HTTPAliasName: response.Alias,
}})
}
return resp, err
}
func (h *HandlersV2) createAlias(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
collectionGetter, _ := anyReq.(requestutil.CollectionNameGetter)
aliasGetter, _ := anyReq.(AliasNameGetter)
req := &milvuspb.CreateAliasRequest{
DbName: dbName,
CollectionName: collectionGetter.GetCollectionName(),
Alias: aliasGetter.GetAliasName(),
}
c.Set(ContextRequest, req)
resp, err := wrapperProxyWithLimit(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/CreateAlias", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.CreateAlias(reqCtx, req.(*milvuspb.CreateAliasRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnDefault())
}
return resp, err
}
func (h *HandlersV2) dropAlias(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
getter, _ := anyReq.(AliasNameGetter)
req := &milvuspb.DropAliasRequest{
DbName: dbName,
Alias: getter.GetAliasName(),
}
c.Set(ContextRequest, req)
resp, err := wrapperProxyWithLimit(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/DropAlias", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.DropAlias(reqCtx, req.(*milvuspb.DropAliasRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnDefault())
}
return resp, err
}
func (h *HandlersV2) alterAlias(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
collectionGetter, _ := anyReq.(requestutil.CollectionNameGetter)
aliasGetter, _ := anyReq.(AliasNameGetter)
req := &milvuspb.AlterAliasRequest{
DbName: dbName,
CollectionName: collectionGetter.GetCollectionName(),
Alias: aliasGetter.GetAliasName(),
}
c.Set(ContextRequest, req)
resp, err := wrapperProxyWithLimit(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/AlterAlias", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.AlterAlias(reqCtx, req.(*milvuspb.AlterAliasRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnDefault())
}
return resp, err
}
func (h *HandlersV2) listImportJob(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
var collectionName string
if collectionGetter, ok := anyReq.(requestutil.CollectionNameGetter); ok {
collectionName = collectionGetter.GetCollectionName()
}
req := &internalpb.ListImportsRequest{
DbName: dbName,
CollectionName: collectionName,
}
c.Set(ContextRequest, req)
if h.checkAuth {
err := checkAuthorizationV2(ctx, c, false, &milvuspb.ListImportsAuthPlaceholder{
DbName: dbName,
CollectionName: collectionName,
})
if err != nil {
return nil, err
}
}
resp, err := wrapperProxy(ctx, c, req, false, false, "/milvus.proto.milvus.MilvusService/ListImports", func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.ListImports(reqCtx, req.(*internalpb.ListImportsRequest))
})
if err == nil {
returnData := make(map[string]interface{})
records := make([]map[string]interface{}, 0)
response := resp.(*internalpb.ListImportsResponse)
for i, jobID := range response.GetJobIDs() {
jobDetail := make(map[string]interface{})
jobDetail["jobId"] = jobID
jobDetail["collectionName"] = response.GetCollectionNames()[i]
jobDetail["state"] = response.GetStates()[i].String()
jobDetail["progress"] = response.GetProgresses()[i]
reason := response.GetReasons()[i]
if reason != "" {
jobDetail["reason"] = reason
}
records = append(records, jobDetail)
}
returnData["records"] = records
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(nil), HTTPReturnData: returnData})
}
return resp, err
}
func (h *HandlersV2) createImportJob(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
var (
collectionGetter = anyReq.(requestutil.CollectionNameGetter)
partitionGetter = anyReq.(requestutil.PartitionNameGetter)
filesGetter = anyReq.(FilesGetter)
optionsGetter = anyReq.(OptionsGetter)
)
req := &internalpb.ImportRequest{
DbName: dbName,
CollectionName: collectionGetter.GetCollectionName(),
PartitionName: partitionGetter.GetPartitionName(),
Files: lo.Map(filesGetter.GetFiles(), func(paths []string, _ int) *internalpb.ImportFile {
return &internalpb.ImportFile{Paths: paths}
}),
Options: funcutil.Map2KeyValuePair(optionsGetter.GetOptions()),
}
c.Set(ContextRequest, req)
if h.checkAuth {
err := checkAuthorizationV2(ctx, c, false, &milvuspb.ImportAuthPlaceholder{
DbName: dbName,
CollectionName: collectionGetter.GetCollectionName(),
PartitionName: partitionGetter.GetPartitionName(),
})
if err != nil {
return nil, err
}
}
resp, err := wrapperProxy(ctx, c, req, false, false, "/milvus.proto.milvus.MilvusService/ImportV2", func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.ImportV2(reqCtx, req.(*internalpb.ImportRequest))
})
if err == nil {
returnData := make(map[string]interface{})
returnData["jobId"] = resp.(*internalpb.ImportResponse).GetJobID()
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(nil), HTTPReturnData: returnData})
}
return resp, err
}
func (h *HandlersV2) getImportJobProcess(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
jobIDGetter := anyReq.(JobIDGetter)
req := &internalpb.GetImportProgressRequest{
DbName: dbName,
JobID: jobIDGetter.GetJobID(),
}
c.Set(ContextRequest, req)
if h.checkAuth {
err := checkAuthorizationV2(ctx, c, false, &milvuspb.GetImportProgressAuthPlaceholder{
DbName: dbName,
})
if err != nil {
return nil, err
}
}
resp, err := wrapperProxy(ctx, c, req, false, false, "/milvus.proto.milvus.MilvusService/GetImportProgress", func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.GetImportProgress(reqCtx, req.(*internalpb.GetImportProgressRequest))
})
if err == nil {
response := resp.(*internalpb.GetImportProgressResponse)
returnData := make(map[string]interface{})
returnData["jobId"] = jobIDGetter.GetJobID()
returnData["collectionName"] = response.GetCollectionName()
returnData["completeTime"] = response.GetCompleteTime()
returnData["state"] = response.GetState().String()
returnData["progress"] = response.GetProgress()
returnData["importedRows"] = response.GetImportedRows()
returnData["totalRows"] = response.GetTotalRows()
reason := response.GetReason()
if reason != "" {
returnData["reason"] = reason
}
details := make([]map[string]interface{}, 0)
totalFileSize := int64(0)
for _, taskProgress := range response.GetTaskProgresses() {
detail := make(map[string]interface{})
detail["fileName"] = taskProgress.GetFileName()
detail["fileSize"] = taskProgress.GetFileSize()
detail["progress"] = taskProgress.GetProgress()
detail["completeTime"] = taskProgress.GetCompleteTime()
detail["state"] = taskProgress.GetState()
detail["importedRows"] = taskProgress.GetImportedRows()
detail["totalRows"] = taskProgress.GetTotalRows()
reason = taskProgress.GetReason()
if reason != "" {
detail["reason"] = reason
}
details = append(details, detail)
totalFileSize += taskProgress.GetFileSize()
}
returnData["fileSize"] = totalFileSize
returnData["details"] = details
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(nil), HTTPReturnData: returnData})
}
return resp, err
}
func (h *HandlersV2) GetCollectionSchema(ctx context.Context, c *gin.Context, dbName, collectionName string) (*schemapb.CollectionSchema, error) {
collSchema, err := proxy.GetCachedCollectionSchema(ctx, dbName, collectionName)
if err == nil {
return collSchema.CollectionSchema, nil
}
descReq := &milvuspb.DescribeCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
}
descResp, err := wrapperProxy(ctx, c, descReq, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/DescribeCollection", func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.DescribeCollection(reqCtx, req.(*milvuspb.DescribeCollectionRequest))
})
if err != nil {
return nil, err
}
response, _ := descResp.(*milvuspb.DescribeCollectionResponse)
return response.Schema, nil
}