package httpserver

import (
	"context"
	"encoding/json"
	"net/http"
	"strconv"

	"github.com/cockroachdb/errors"
	"github.com/gin-gonic/gin"
	"github.com/gin-gonic/gin/binding"
	"github.com/golang/protobuf/proto"
	"github.com/tidwall/gjson"
	"go.uber.org/zap"
	"google.golang.org/grpc"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/proxy"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/pkg/common"
	"github.com/milvus-io/milvus/pkg/log"
	"github.com/milvus-io/milvus/pkg/util/merr"
	"github.com/milvus-io/milvus/pkg/util/requestutil"
)

// RestRequestInterceptorErr is the sentinel returned by interceptors and
// handler closures in this file to tell the caller to stop processing.
// Handlers return without writing anything further when they see it.
var RestRequestInterceptorErr = errors.New("interceptor error placeholder")

// checkAuthorization rejects the request with 401 when no non-empty username
// is stored in the gin context, and with 403 when proxy.PrivilegeInterceptor
// returns an error. In both cases the JSON body is written here and
// RestRequestInterceptorErr is returned; nil means the request may proceed.
func checkAuthorization(ctx context.Context, c *gin.Context, req interface{}) error {
	username, ok := c.Get(ContextUsername)
	// Comma-ok assertion: a non-string value is treated like a missing
	// username instead of panicking.
	name, isString := username.(string)
	if !ok || !isString || name == "" {
		c.JSON(http.StatusUnauthorized, gin.H{HTTPReturnCode: merr.Code(merr.ErrNeedAuthenticate), HTTPReturnMessage: merr.ErrNeedAuthenticate.Error()})
		return RestRequestInterceptorErr
	}
	if _, authErr := proxy.PrivilegeInterceptor(ctx, req); authErr != nil {
		c.JSON(http.StatusForbidden, gin.H{HTTPReturnCode: merr.Code(authErr), HTTPReturnMessage: authErr.Error()})
		return RestRequestInterceptorErr
	}
	return nil
}

// RestRequestInterceptor wraps the execution of a REST request. An
// implementation either calls handler to continue the chain or returns early
// with its own result or error.
type RestRequestInterceptor func(ctx context.Context, ginCtx *gin.Context, req any, handler func(reqCtx context.Context, req any) (any, error)) (any, error)

// HandlersV1 handles http requests
type HandlersV1 struct {
	proxy        types.ProxyComponent
	interceptors []RestRequestInterceptor
}

// NewHandlersV1 creates a new HandlersV1 and installs its interceptor chain:
// authorization (only when enabled in the proxy configuration), database
// existence check, and request tracing, in that order.
func NewHandlersV1(proxyComponent types.ProxyComponent) *HandlersV1 {
	h := &HandlersV1{
		proxy:        proxyComponent,
		interceptors: []RestRequestInterceptor{},
	}
	if proxy.Params.CommonCfg.AuthorizationEnabled.GetAsBool() {
		h.interceptors = append(h.interceptors,
			// authorization
			func(ctx context.Context, ginCtx *gin.Context, req any, handler func(reqCtx context.Context, req any) (any, error)) (any, error) {
				if err := checkAuthorization(ctx, ginCtx, req); err != nil {
					return nil, err
				}
				return handler(ctx, req)
			})
	}
	h.interceptors = append(h.interceptors,
		// check database
		func(ctx context.Context, ginCtx *gin.Context, req any, handler func(reqCtx context.Context, req any) (any, error)) (any, error) {
			value, ok := requestutil.GetDbNameFromRequest(req)
			if !ok {
				return handler(ctx, req)
			}
			if err := h.checkDatabase(ctx, ginCtx, value.(string)); err != nil {
				return nil, err
			}
			return handler(ctx, req)
		})
	h.interceptors = append(h.interceptors,
		// trace request
		func(ctx context.Context, ginCtx *gin.Context, req any, handler func(reqCtx context.Context, req any) (any, error)) (any, error) {
			return proxy.TraceLogInterceptor(ctx, req, &grpc.UnaryServerInfo{
				FullMethod: ginCtx.Request.URL.Path,
			}, handler)
		})
	return h
}

// requestContext builds the proxy context for the current HTTP call from the
// username stored in the gin context and the target database name. A missing
// or non-string username is passed on as the empty string rather than
// panicking on the type assertion.
func (h *HandlersV1) requestContext(c *gin.Context, dbName string) context.Context {
	username, _ := c.Get(ContextUsername)
	name, _ := username.(string)
	return proxy.NewContextWithMetadata(c, name, dbName)
}

// checkDatabase verifies that dbName exists. The default database and any
// database accepted by proxy.CheckDatabase pass immediately; otherwise the
// database list is fetched from the proxy and searched. On failure the JSON
// error body is written and RestRequestInterceptorErr is returned.
func (h *HandlersV1) checkDatabase(ctx context.Context, c *gin.Context, dbName string) error {
	if dbName == DefaultDbName || proxy.CheckDatabase(ctx, dbName) {
		return nil
	}
	response, err := h.proxy.ListDatabases(ctx, &milvuspb.ListDatabasesRequest{})
	if err == nil {
		err = merr.Error(response.GetStatus())
	}
	if err != nil {
		c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
		return RestRequestInterceptorErr
	}
	for _, db := range response.DbNames {
		if db == dbName {
			return nil
		}
	}
	c.AbortWithStatusJSON(http.StatusOK, gin.H{
		HTTPReturnCode:    merr.Code(merr.ErrDatabaseNotFound),
		HTTPReturnMessage: merr.ErrDatabaseNotFound.Error() + ", database: " + dbName,
	})
	return RestRequestInterceptorErr
}

// describeCollection returns the schema of the given collection, preferring
// the proxy's cached schema and falling back to a DescribeCollection call.
// When the fallback fails, the JSON error body is written and the error is
// returned.
func (h *HandlersV1) describeCollection(ctx context.Context, c *gin.Context, dbName string, collectionName string) (*schemapb.CollectionSchema, error) {
	collSchema, err := proxy.GetCachedCollectionSchema(ctx, dbName, collectionName)
	if err == nil {
		return collSchema.CollectionSchema, nil
	}
	req := milvuspb.DescribeCollectionRequest{
		DbName:         dbName,
		CollectionName: collectionName,
	}
	response, err := h.proxy.DescribeCollection(ctx, &req)
	if err == nil {
		err = merr.Error(response.GetStatus())
	}
	if err != nil {
		c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
		return nil, err
	}
	primaryField, ok := getPrimaryField(response.Schema)
	// Reconcile the schema-level flag with the primary field: if the primary
	// field is auto-ID but the schema flag is not set, turn the flag on.
	if ok && primaryField.AutoID && !response.Schema.AutoID {
		log.Warn("primary filed autoID VS schema autoID", zap.String("collectionName", collectionName), zap.Bool("primary Field", primaryField.AutoID), zap.Bool("schema", response.Schema.AutoID))
		response.Schema.AutoID = EnableAutoID
	}
	return response.Schema, nil
}

// hasCollection reports whether the collection exists according to the proxy.
// On failure the JSON error body is written and the error is returned.
func (h *HandlersV1) hasCollection(ctx context.Context, c *gin.Context, dbName string, collectionName string) (bool, error) {
	req := milvuspb.HasCollectionRequest{
		DbName:         dbName,
		CollectionName: collectionName,
	}
	response, err := h.proxy.HasCollection(ctx, &req)
	if err == nil {
		err = merr.Error(response.GetStatus())
	}
	if err != nil {
		c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
		return false, err
	}
	return response.Value, nil
}

// RegisterRoutesToV1 binds every v1 endpoint to its handler on router.
func (h *HandlersV1) RegisterRoutesToV1(router gin.IRouter) {
	router.GET(VectorCollectionsPath, h.listCollections)
	router.POST(VectorCollectionsCreatePath, h.createCollection)
	router.GET(VectorCollectionsDescribePath, h.getCollectionDetails)
	router.POST(VectorCollectionsDropPath, h.dropCollection)
	router.POST(VectorQueryPath, h.query)
	router.POST(VectorGetPath, h.get)
	router.POST(VectorDeletePath, h.delete)
	router.POST(VectorInsertPath, h.insert)
	router.POST(VectorUpsertPath, h.upsert)
	router.POST(VectorSearchPath, h.search)
}

// executeRestRequestInterceptor runs handler behind the configured
// interceptors. The chain is built back to front so that interceptors[0] is
// the outermost wrapper and therefore runs first.
func (h *HandlersV1) executeRestRequestInterceptor(ctx context.Context,
	ginCtx *gin.Context,
	req any, handler func(reqCtx context.Context, req any) (any, error),
) (any, error) {
	chained := handler
	for i := len(h.interceptors) - 1; i >= 0; i-- {
		// Fresh per-iteration variables so each closure keeps its own pair.
		interceptor, next := h.interceptors[i], chained
		chained = func(reqCtx context.Context, req any) (any, error) {
			return interceptor(reqCtx, ginCtx, req, next)
		}
	}
	return chained(ctx, req)
}

// listCollections replies with the collection names of the database given by
// the dbName query parameter (default database when absent).
func (h *HandlersV1) listCollections(c *gin.Context) {
	dbName := c.DefaultQuery(HTTPDbName, DefaultDbName)
	req := &milvuspb.ShowCollectionsRequest{
		DbName: dbName,
	}
	ctx := h.requestContext(c, req.DbName)
	resp, err := h.executeRestRequestInterceptor(ctx, c, req, func(reqCtx context.Context, req any) (any, error) {
		return h.proxy.ShowCollections(reqCtx, req.(*milvuspb.ShowCollectionsRequest))
	})
	if errors.Is(err, RestRequestInterceptorErr) {
		return
	}
	if err == nil {
		err = merr.Error(resp.(*milvuspb.ShowCollectionsResponse).GetStatus())
	}
	if err != nil {
		c.JSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
		return
	}
	collections := resp.(*milvuspb.ShowCollectionsResponse).CollectionNames
	if collections == nil {
		// Encode as [] rather than null.
		collections = []string{}
	}
	c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: collections})
}

// createCollection creates a two-field collection (int64 auto-ID primary key
// plus a float vector of the requested dimension), then creates an index on
// the vector field and loads the collection.
func (h *HandlersV1) createCollection(c *gin.Context) {
	httpReq := CreateCollectionReq{
		DbName:             DefaultDbName,
		MetricType:         DefaultMetricType,
		PrimaryField:       DefaultPrimaryFieldName,
		VectorField:        DefaultVectorFieldName,
		EnableDynamicField: EnableDynamic,
	}
	if err := c.ShouldBindWith(&httpReq, binding.JSON); err != nil {
		log.Warn("high level restful api, the parameter of create collection is incorrect", zap.Any("request", httpReq), zap.Error(err))
		c.AbortWithStatusJSON(http.StatusOK, gin.H{
			HTTPReturnCode:    merr.Code(merr.ErrIncorrectParameterFormat),
			HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: " + err.Error(),
		})
		return
	}
	if httpReq.CollectionName == "" || httpReq.Dimension == 0 {
		log.Warn("high level restful api, create collection require parameters: [collectionName, dimension], but miss", zap.Any("request", httpReq))
		c.AbortWithStatusJSON(http.StatusOK, gin.H{
			HTTPReturnCode:    merr.Code(merr.ErrMissingRequiredParameters),
			HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error() + ", required parameters: [collectionName, dimension]",
		})
		return
	}
	schema, err := proto.Marshal(&schemapb.CollectionSchema{
		Name:        httpReq.CollectionName,
		Description: httpReq.Description,
		AutoID:      EnableAutoID,
		Fields: []*schemapb.FieldSchema{
			{
				FieldID:      common.StartOfUserFieldID,
				Name:         httpReq.PrimaryField,
				IsPrimaryKey: true,
				DataType:     schemapb.DataType_Int64,
				AutoID:       EnableAutoID,
			},
			{
				FieldID:      common.StartOfUserFieldID + 1,
				Name:         httpReq.VectorField,
				IsPrimaryKey: false,
				DataType:     schemapb.DataType_FloatVector,
				TypeParams: []*commonpb.KeyValuePair{
					{
						Key:   Dim,
						Value: strconv.FormatInt(int64(httpReq.Dimension), 10),
					},
				},
				AutoID: DisableAutoID,
			},
		},
		EnableDynamicField: httpReq.EnableDynamicField,
	})
	if err != nil {
		log.Warn("high level restful api, marshal collection schema fail", zap.Any("request", httpReq), zap.Error(err))
		c.AbortWithStatusJSON(http.StatusOK, gin.H{
			HTTPReturnCode:    merr.Code(merr.ErrMarshalCollectionSchema),
			HTTPReturnMessage: merr.ErrMarshalCollectionSchema.Error() + ", error: " + err.Error(),
		})
		return
	}
	req := &milvuspb.CreateCollectionRequest{
		DbName:           httpReq.DbName,
		CollectionName:   httpReq.CollectionName,
		Schema:           schema,
		ShardsNum:        ShardNumDefault,
		ConsistencyLevel: commonpb.ConsistencyLevel_Bounded,
	}
	ctx := h.requestContext(c, req.DbName)
	response, err := h.executeRestRequestInterceptor(ctx, c, req, func(reqCtx context.Context, req any) (any, error) {
		return h.proxy.CreateCollection(reqCtx, req.(*milvuspb.CreateCollectionRequest))
	})
	if errors.Is(err, RestRequestInterceptorErr) {
		return
	}
	if err == nil {
		err = merr.Error(response.(*commonpb.Status))
	}
	if err != nil {
		c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
		return
	}

	statusResponse, err := h.proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
		DbName:         httpReq.DbName,
		CollectionName: httpReq.CollectionName,
		FieldName:      httpReq.VectorField,
		IndexName:      DefaultIndexName,
		ExtraParams:    []*commonpb.KeyValuePair{{Key: common.MetricTypeKey, Value: httpReq.MetricType}},
	})
	if err == nil {
		err = merr.Error(statusResponse)
	}
	if err != nil {
		c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
		return
	}
	statusResponse, err = h.proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
		DbName:         httpReq.DbName,
		CollectionName: httpReq.CollectionName,
	})
	if err == nil {
		err = merr.Error(statusResponse)
	}
	if err != nil {
		c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
		return
	}
	c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{}})
}

// getCollectionDetails replies with the description, fields, indexes, load
// state, shard number and dynamic-field flag of a collection. Failures to
// fetch the load state or the index description are only logged; the
// corresponding parts of the reply are then left empty.
func (h *HandlersV1) getCollectionDetails(c *gin.Context) {
	collectionName := c.Query(HTTPCollectionName)
	if collectionName == "" {
		log.Warn("high level restful api, desc collection require parameter: [collectionName], but miss")
		c.AbortWithStatusJSON(http.StatusOK, gin.H{
			HTTPReturnCode:    merr.Code(merr.ErrMissingRequiredParameters),
			HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error() + ", required parameters: [collectionName]",
		})
		return
	}
	dbName := c.DefaultQuery(HTTPDbName, DefaultDbName)
	ctx := h.requestContext(c, dbName)
	req := &milvuspb.DescribeCollectionRequest{
		DbName:         dbName,
		CollectionName: collectionName,
	}
	response, err := h.executeRestRequestInterceptor(ctx, c, req, func(reqCtx context.Context, req any) (any, error) {
		return h.proxy.DescribeCollection(reqCtx, req.(*milvuspb.DescribeCollectionRequest))
	})
	// The interceptor has already written the reply; writing again would
	// append a second JSON body to the response.
	if errors.Is(err, RestRequestInterceptorErr) {
		return
	}
	if err == nil {
		err = merr.Error(response.(*milvuspb.DescribeCollectionResponse).GetStatus())
	}
	if err != nil {
		c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
		return
	}
	coll := response.(*milvuspb.DescribeCollectionResponse)
	primaryField, ok := getPrimaryField(coll.Schema)
	// Reconcile the schema-level flag with the primary field: if the primary
	// field is auto-ID but the schema flag is not set, turn the flag on.
	if ok && primaryField.AutoID && !coll.Schema.AutoID {
		log.Warn("primary filed autoID VS schema autoID", zap.String("collectionName", collectionName), zap.Bool("primary Field", primaryField.AutoID), zap.Bool("schema", coll.Schema.AutoID))
		coll.Schema.AutoID = EnableAutoID
	}

	stateResp, err := h.proxy.GetLoadState(ctx, &milvuspb.GetLoadStateRequest{
		DbName:         dbName,
		CollectionName: collectionName,
	})
	collLoadState := ""
	if err == nil {
		err = merr.Error(stateResp.GetStatus())
	}
	if err != nil {
		log.Warn("get collection load state fail",
			zap.String("collection", collectionName),
			zap.Error(err),
		)
	} else {
		collLoadState = stateResp.State.String()
	}

	// Only the first vector field is used to look up index descriptions.
	vectorField := ""
	for _, field := range coll.Schema.Fields {
		if IsVectorField(field) {
			vectorField = field.Name
			break
		}
	}
	indexResp, err := h.proxy.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{
		DbName:         dbName,
		CollectionName: collectionName,
		FieldName:      vectorField,
	})
	if err == nil {
		err = merr.Error(indexResp.GetStatus())
	}
	var indexDesc []gin.H
	if err != nil {
		indexDesc = []gin.H{}
		log.Warn("get indexes description fail",
			zap.String("collection", collectionName),
			zap.String("vectorField", vectorField),
			zap.Error(err),
		)
	} else {
		indexDesc = printIndexes(indexResp.IndexDescriptions)
	}
	c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{
		HTTPCollectionName:    coll.CollectionName,
		HTTPReturnDescription: coll.Schema.Description,
		"fields":              printFields(coll.Schema.Fields),
		"indexes":             indexDesc,
		"load":                collLoadState,
		"shardsNum":           coll.ShardsNum,
		"enableDynamicField":  coll.Schema.EnableDynamicField,
	}})
}

// dropCollection drops a collection after confirming that it exists; a
// missing collection is reported as ErrCollectionNotFound.
func (h *HandlersV1) dropCollection(c *gin.Context) {
	httpReq := DropCollectionReq{
		DbName: DefaultDbName,
	}
	if err := c.ShouldBindWith(&httpReq, binding.JSON); err != nil {
		log.Warn("high level restful api, the parameter of drop collection is incorrect", zap.Any("request", httpReq), zap.Error(err))
		c.AbortWithStatusJSON(http.StatusOK, gin.H{
			HTTPReturnCode:    merr.Code(merr.ErrIncorrectParameterFormat),
			HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: " + err.Error(),
		})
		return
	}
	if httpReq.CollectionName == "" {
		log.Warn("high level restful api, drop collection require parameter: [collectionName], but miss")
		c.AbortWithStatusJSON(http.StatusOK, gin.H{
			HTTPReturnCode:    merr.Code(merr.ErrMissingRequiredParameters),
			HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error() + ", required parameters: [collectionName]",
		})
		return
	}
	req := &milvuspb.DropCollectionRequest{
		DbName:         httpReq.DbName,
		CollectionName: httpReq.CollectionName,
	}
	ctx := h.requestContext(c, req.DbName)
	response, err := h.executeRestRequestInterceptor(ctx, c, req, func(reqCtx context.Context, req any) (any, error) {
		has, err := h.hasCollection(ctx, c, httpReq.DbName, httpReq.CollectionName)
		if err != nil {
			return nil, RestRequestInterceptorErr
		}
		if !has {
			c.AbortWithStatusJSON(http.StatusOK, gin.H{
				HTTPReturnCode:    merr.Code(merr.ErrCollectionNotFound),
				HTTPReturnMessage: merr.ErrCollectionNotFound.Error() + ", database: " + httpReq.DbName + ", collection: " + httpReq.CollectionName,
			})
			return nil, RestRequestInterceptorErr
		}
		return h.proxy.DropCollection(reqCtx, req.(*milvuspb.DropCollectionRequest))
	})
	if errors.Is(err, RestRequestInterceptorErr) {
		return
	}
	if err == nil {
		err = merr.Error(response.(*commonpb.Status))
	}
	if err != nil {
		c.JSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
		return
	}
	c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{}})
}

// query runs a filter expression against a collection and replies with the
// matching rows. collectionName and filter are required; offset and limit
// are forwarded only when positive.
func (h *HandlersV1) query(c *gin.Context) {
	httpReq := QueryReq{
		DbName:       DefaultDbName,
		Limit:        100,
		OutputFields: []string{DefaultOutputFields},
	}
	if err := c.ShouldBindWith(&httpReq, binding.JSON); err != nil {
		log.Warn("high level restful api, the parameter of query is incorrect", zap.Any("request", httpReq), zap.Error(err))
		c.AbortWithStatusJSON(http.StatusOK, gin.H{
			HTTPReturnCode:    merr.Code(merr.ErrIncorrectParameterFormat),
			HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: " + err.Error(),
		})
		return
	}
	if httpReq.CollectionName == "" || httpReq.Filter == "" {
		log.Warn("high level restful api, query require parameter: [collectionName, filter], but miss")
		c.AbortWithStatusJSON(http.StatusOK, gin.H{
			HTTPReturnCode:    merr.Code(merr.ErrMissingRequiredParameters),
			HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error() + ", required parameters: [collectionName, filter]",
		})
		return
	}
	req := &milvuspb.QueryRequest{
		DbName:             httpReq.DbName,
		CollectionName:     httpReq.CollectionName,
		Expr:               httpReq.Filter,
		OutputFields:       httpReq.OutputFields,
		GuaranteeTimestamp: BoundedTimestamp,
		QueryParams:        []*commonpb.KeyValuePair{},
	}
	if httpReq.Offset > 0 {
		req.QueryParams = append(req.QueryParams, &commonpb.KeyValuePair{Key: ParamOffset, Value: strconv.FormatInt(int64(httpReq.Offset), 10)})
	}
	if httpReq.Limit > 0 {
		req.QueryParams = append(req.QueryParams, &commonpb.KeyValuePair{Key: ParamLimit, Value: strconv.FormatInt(int64(httpReq.Limit), 10)})
	}
	ctx := h.requestContext(c, req.DbName)
	response, err := h.executeRestRequestInterceptor(ctx, c, req, func(reqCtx context.Context, req any) (any, error) {
		return h.proxy.Query(reqCtx, req.(*milvuspb.QueryRequest))
	})
	if errors.Is(err, RestRequestInterceptorErr) {
		return
	}
	if err == nil {
		err = merr.Error(response.(*milvuspb.QueryResults).GetStatus())
	}
	if err != nil {
		c.JSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
		return
	}
	queryResp := response.(*milvuspb.QueryResults)
	// An absent or unparsable header yields false.
	allowJS, _ := strconv.ParseBool(c.Request.Header.Get(HTTPHeaderAllowInt64))
	outputData, err := buildQueryResp(int64(0), queryResp.OutputFields, queryResp.FieldsData, nil, nil, allowJS)
	if err != nil {
		log.Warn("high level restful api, fail to deal with query result", zap.Any("response", response), zap.Error(err))
		c.JSON(http.StatusOK, gin.H{
			HTTPReturnCode:    merr.Code(merr.ErrInvalidSearchResult),
			HTTPReturnMessage: merr.ErrInvalidSearchResult.Error() + ", error: " + err.Error(),
		})
		return
	}
	c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: outputData})
}

// get fetches rows by primary key. The "id" value is read from the cached raw
// request body and converted to a filter expression using the collection
// schema.
func (h *HandlersV1) get(c *gin.Context) {
	httpReq := GetReq{
		DbName:       DefaultDbName,
		OutputFields: []string{DefaultOutputFields},
	}
	// ShouldBindBodyWith caches the raw body under gin.BodyBytesKey, which the
	// handler closure reads back below.
	if err := c.ShouldBindBodyWith(&httpReq, binding.JSON); err != nil {
		log.Warn("high level restful api, the parameter of get is incorrect", zap.Any("request", httpReq), zap.Error(err))
		c.AbortWithStatusJSON(http.StatusOK, gin.H{
			HTTPReturnCode:    merr.Code(merr.ErrIncorrectParameterFormat),
			HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: " + err.Error(),
		})
		return
	}
	if httpReq.CollectionName == "" || httpReq.ID == nil {
		log.Warn("high level restful api, get require parameter: [collectionName, id], but miss")
		c.AbortWithStatusJSON(http.StatusOK, gin.H{
			HTTPReturnCode:    merr.Code(merr.ErrMissingRequiredParameters),
			HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error() + ", required parameters: [collectionName, id]",
		})
		return
	}
	req := &milvuspb.QueryRequest{
		DbName:             httpReq.DbName,
		CollectionName:     httpReq.CollectionName,
		OutputFields:       httpReq.OutputFields,
		GuaranteeTimestamp: BoundedTimestamp,
	}
	ctx := h.requestContext(c, req.DbName)
	response, err := h.executeRestRequestInterceptor(ctx, c, req, func(reqCtx context.Context, req any) (any, error) {
		collSchema, err := h.describeCollection(ctx, c, httpReq.DbName, httpReq.CollectionName)
		if err != nil || collSchema == nil {
			return nil, RestRequestInterceptorErr
		}
		body, _ := c.Get(gin.BodyBytesKey)
		filter, err := checkGetPrimaryKey(collSchema, gjson.Get(string(body.([]byte)), DefaultPrimaryFieldName))
		if err != nil {
			c.JSON(http.StatusOK, gin.H{
				HTTPReturnCode:    merr.Code(merr.ErrCheckPrimaryKey),
				HTTPReturnMessage: merr.ErrCheckPrimaryKey.Error() + ", error: " + err.Error(),
			})
			return nil, RestRequestInterceptorErr
		}
		queryReq := req.(*milvuspb.QueryRequest)
		queryReq.Expr = filter
		return h.proxy.Query(reqCtx, queryReq)
	})
	if errors.Is(err, RestRequestInterceptorErr) {
		return
	}
	if err == nil {
		err = merr.Error(response.(*milvuspb.QueryResults).GetStatus())
	}
	if err != nil {
		c.JSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
		return
	}
	queryResp := response.(*milvuspb.QueryResults)
	allowJS, _ := strconv.ParseBool(c.Request.Header.Get(HTTPHeaderAllowInt64))
	outputData, err := buildQueryResp(int64(0), queryResp.OutputFields, queryResp.FieldsData, nil, nil, allowJS)
	if err != nil {
		log.Warn("high level restful api, fail to deal with get result", zap.Any("response", response), zap.Error(err))
		c.JSON(http.StatusOK, gin.H{
			HTTPReturnCode:    merr.Code(merr.ErrInvalidSearchResult),
			HTTPReturnMessage: merr.ErrInvalidSearchResult.Error() + ", error: " + err.Error(),
		})
		return
	}
	c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: outputData})
}

// delete removes rows selected either by an explicit filter or, when no
// filter is given, by the "id" value taken from the raw request body.
func (h *HandlersV1) delete(c *gin.Context) {
	httpReq := DeleteReq{
		DbName: DefaultDbName,
	}
	if err := c.ShouldBindBodyWith(&httpReq, binding.JSON); err != nil {
		log.Warn("high level restful api, the parameter of delete is incorrect", zap.Any("request", httpReq), zap.Error(err))
		c.AbortWithStatusJSON(http.StatusOK, gin.H{
			HTTPReturnCode:    merr.Code(merr.ErrIncorrectParameterFormat),
			HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: " + err.Error(),
		})
		return
	}
	if httpReq.CollectionName == "" || (httpReq.ID == nil && httpReq.Filter == "") {
		log.Warn("high level restful api, delete require parameter: [collectionName, id/filter], but miss")
		c.AbortWithStatusJSON(http.StatusOK, gin.H{
			HTTPReturnCode:    merr.Code(merr.ErrMissingRequiredParameters),
			HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error() + ", required parameters: [collectionName, id/filter]",
		})
		return
	}
	req := &milvuspb.DeleteRequest{
		DbName:         httpReq.DbName,
		CollectionName: httpReq.CollectionName,
	}
	ctx := h.requestContext(c, req.DbName)
	response, err := h.executeRestRequestInterceptor(ctx, c, req, func(reqCtx context.Context, req any) (any, error) {
		collSchema, err := h.describeCollection(ctx, c, httpReq.DbName, httpReq.CollectionName)
		if err != nil || collSchema == nil {
			return nil, RestRequestInterceptorErr
		}
		deleteReq := req.(*milvuspb.DeleteRequest)
		deleteReq.Expr = httpReq.Filter
		if deleteReq.Expr == "" {
			body, _ := c.Get(gin.BodyBytesKey)
			filter, err := checkGetPrimaryKey(collSchema, gjson.Get(string(body.([]byte)), DefaultPrimaryFieldName))
			if err != nil {
				c.JSON(http.StatusOK, gin.H{
					HTTPReturnCode:    merr.Code(merr.ErrCheckPrimaryKey),
					HTTPReturnMessage: merr.ErrCheckPrimaryKey.Error() + ", error: " + err.Error(),
				})
				return nil, RestRequestInterceptorErr
			}
			deleteReq.Expr = filter
		}
		// Use the context handed down by the interceptor chain, as the other
		// handlers do for their final proxy call.
		return h.proxy.Delete(reqCtx, deleteReq)
	})
	if errors.Is(err, RestRequestInterceptorErr) {
		return
	}
	if err == nil {
		err = merr.Error(response.(*milvuspb.MutationResult).GetStatus())
	}
	if err != nil {
		c.JSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
		return
	}
	c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{}})
}

// insert inserts one or more rows. The body is first bound as a multi-row
// request and, if that fails, as a single-row request. Row data is then
// re-parsed from the raw body against the collection schema.
func (h *HandlersV1) insert(c *gin.Context) {
	httpReq := InsertReq{
		DbName: DefaultDbName,
	}
	if err := c.ShouldBindBodyWith(&httpReq, binding.JSON); err != nil {
		singleInsertReq := SingleInsertReq{
			DbName: DefaultDbName,
		}
		if err = c.ShouldBindBodyWith(&singleInsertReq, binding.JSON); err != nil {
			log.Warn("high level restful api, the parameter of insert is incorrect", zap.Any("request", httpReq), zap.Error(err))
			c.AbortWithStatusJSON(http.StatusOK, gin.H{
				HTTPReturnCode:    merr.Code(merr.ErrIncorrectParameterFormat),
				HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: " + err.Error(),
			})
			return
		}
		httpReq.DbName = singleInsertReq.DbName
		httpReq.CollectionName = singleInsertReq.CollectionName
		httpReq.Data = []map[string]interface{}{singleInsertReq.Data}
	}
	if httpReq.CollectionName == "" || httpReq.Data == nil {
		log.Warn("high level restful api, insert require parameter: [collectionName, data], but miss")
		c.AbortWithStatusJSON(http.StatusOK, gin.H{
			HTTPReturnCode:    merr.Code(merr.ErrMissingRequiredParameters),
			HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error() + ", required parameters: [collectionName, data]",
		})
		return
	}
	req := &milvuspb.InsertRequest{
		DbName:         httpReq.DbName,
		CollectionName: httpReq.CollectionName,
		PartitionName:  "_default",
		NumRows:        uint32(len(httpReq.Data)),
	}
	ctx := h.requestContext(c, req.DbName)
	response, err := h.executeRestRequestInterceptor(ctx, c, req, func(reqCtx context.Context, req any) (any, error) {
		collSchema, err := h.describeCollection(ctx, c, httpReq.DbName, httpReq.CollectionName)
		if err != nil || collSchema == nil {
			return nil, RestRequestInterceptorErr
		}
		body, _ := c.Get(gin.BodyBytesKey)
		err, httpReq.Data = checkAndSetData(string(body.([]byte)), collSchema)
		if err != nil {
			log.Warn("high level restful api, fail to deal with insert data", zap.Any("body", body), zap.Error(err))
			c.AbortWithStatusJSON(http.StatusOK, gin.H{
				HTTPReturnCode:    merr.Code(merr.ErrInvalidInsertData),
				HTTPReturnMessage: merr.ErrInvalidInsertData.Error() + ", error: " + err.Error(),
			})
			return nil, RestRequestInterceptorErr
		}
		insertReq := req.(*milvuspb.InsertRequest)
		insertReq.FieldsData, err = anyToColumns(httpReq.Data, collSchema)
		if err != nil {
			log.Warn("high level restful api, fail to deal with insert data", zap.Any("data", httpReq.Data), zap.Error(err))
			c.AbortWithStatusJSON(http.StatusOK, gin.H{
				HTTPReturnCode:    merr.Code(merr.ErrInvalidInsertData),
				HTTPReturnMessage: merr.ErrInvalidInsertData.Error() + ", error: " + err.Error(),
			})
			return nil, RestRequestInterceptorErr
		}
		// Use the context handed down by the interceptor chain, as the other
		// handlers do for their final proxy call.
		return h.proxy.Insert(reqCtx, insertReq)
	})
	if errors.Is(err, RestRequestInterceptorErr) {
		return
	}
	if err == nil {
		err = merr.Error(response.(*milvuspb.MutationResult).GetStatus())
	}
	if err != nil {
		c.JSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
		return
	}
	insertResp := response.(*milvuspb.MutationResult)
	switch ids := insertResp.IDs.GetIdField().(type) {
	case *schemapb.IDs_IntId:
		allowJS, _ := strconv.ParseBool(c.Request.Header.Get(HTTPHeaderAllowInt64))
		if allowJS {
			c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{"insertCount": insertResp.InsertCnt, "insertIds": ids.IntId.Data}})
		} else {
			c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{"insertCount": insertResp.InsertCnt, "insertIds": formatInt64(ids.IntId.Data)}})
		}
	case *schemapb.IDs_StrId:
		c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{"insertCount": insertResp.InsertCnt, "insertIds": ids.StrId.Data}})
	default:
		c.JSON(http.StatusOK, gin.H{
			HTTPReturnCode:    merr.Code(merr.ErrCheckPrimaryKey),
			HTTPReturnMessage: merr.ErrCheckPrimaryKey.Error() + ", error: unsupported primary key data type",
		})
	}
}

// upsert upserts one or more rows. It mirrors insert, and additionally
// rejects collections whose primary key is auto-generated.
func (h *HandlersV1) upsert(c *gin.Context) {
	httpReq := UpsertReq{
		DbName: DefaultDbName,
	}
	if err := c.ShouldBindBodyWith(&httpReq, binding.JSON); err != nil {
		singleUpsertReq := SingleUpsertReq{
			DbName: DefaultDbName,
		}
		if err = c.ShouldBindBodyWith(&singleUpsertReq, binding.JSON); err != nil {
			log.Warn("high level restful api, the parameter of upsert is incorrect", zap.Any("request", httpReq), zap.Error(err))
			c.AbortWithStatusJSON(http.StatusOK, gin.H{
				HTTPReturnCode:    merr.Code(merr.ErrIncorrectParameterFormat),
				HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: " + err.Error(),
			})
			return
		}
		httpReq.DbName = singleUpsertReq.DbName
		httpReq.CollectionName = singleUpsertReq.CollectionName
		httpReq.Data = []map[string]interface{}{singleUpsertReq.Data}
	}
	if httpReq.CollectionName == "" || httpReq.Data == nil {
		log.Warn("high level restful api, upsert require parameter: [collectionName, data], but miss")
		c.AbortWithStatusJSON(http.StatusOK, gin.H{
			HTTPReturnCode:    merr.Code(merr.ErrMissingRequiredParameters),
			HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error() + ", required parameters: [collectionName, data]",
		})
		return
	}
	req := &milvuspb.UpsertRequest{
		DbName:         httpReq.DbName,
		CollectionName: httpReq.CollectionName,
		PartitionName:  "_default",
		NumRows:        uint32(len(httpReq.Data)),
	}
	ctx := h.requestContext(c, req.DbName)
	response, err := h.executeRestRequestInterceptor(ctx, c, req, func(reqCtx context.Context, req any) (any, error) {
		collSchema, err := h.describeCollection(ctx, c, httpReq.DbName, httpReq.CollectionName)
		if err != nil || collSchema == nil {
			return nil, RestRequestInterceptorErr
		}
		for _, fieldSchema := range collSchema.Fields {
			if fieldSchema.IsPrimaryKey && fieldSchema.AutoID {
				err := merr.WrapErrParameterInvalid("autoID: false", "autoID: true", "cannot upsert an autoID collection")
				c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
				return nil, RestRequestInterceptorErr
			}
		}
		body, _ := c.Get(gin.BodyBytesKey)
		err, httpReq.Data = checkAndSetData(string(body.([]byte)), collSchema)
		if err != nil {
			log.Warn("high level restful api, fail to deal with upsert data", zap.Any("body", body), zap.Error(err))
			c.AbortWithStatusJSON(http.StatusOK, gin.H{
				HTTPReturnCode:    merr.Code(merr.ErrInvalidInsertData),
				HTTPReturnMessage: merr.ErrInvalidInsertData.Error() + ", error: " + err.Error(),
			})
			return nil, RestRequestInterceptorErr
		}
		upsertReq := req.(*milvuspb.UpsertRequest)
		upsertReq.FieldsData, err = anyToColumns(httpReq.Data, collSchema)
		if err != nil {
			log.Warn("high level restful api, fail to deal with upsert data", zap.Any("data", httpReq.Data), zap.Error(err))
			c.AbortWithStatusJSON(http.StatusOK, gin.H{
				HTTPReturnCode:    merr.Code(merr.ErrInvalidInsertData),
				HTTPReturnMessage: merr.ErrInvalidInsertData.Error() + ", error: " + err.Error(),
			})
			return nil, RestRequestInterceptorErr
		}
		// Use the context handed down by the interceptor chain, as the other
		// handlers do for their final proxy call.
		return h.proxy.Upsert(reqCtx, upsertReq)
	})
	if errors.Is(err, RestRequestInterceptorErr) {
		return
	}
	if err == nil {
		err = merr.Error(response.(*milvuspb.MutationResult).GetStatus())
	}
	if err != nil {
		c.JSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
		return
	}
	upsertResp := response.(*milvuspb.MutationResult)
	switch ids := upsertResp.IDs.GetIdField().(type) {
	case *schemapb.IDs_IntId:
		allowJS, _ := strconv.ParseBool(c.Request.Header.Get(HTTPHeaderAllowInt64))
		if allowJS {
			c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{"upsertCount": upsertResp.UpsertCnt, "upsertIds": ids.IntId.Data}})
		} else {
			c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{"upsertCount": upsertResp.UpsertCnt, "upsertIds": formatInt64(ids.IntId.Data)}})
		}
	case *schemapb.IDs_StrId:
		c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{"upsertCount": upsertResp.UpsertCnt, "upsertIds": ids.StrId.Data}})
	default:
		c.JSON(http.StatusOK, gin.H{
			HTTPReturnCode:    merr.Code(merr.ErrCheckPrimaryKey),
			HTTPReturnMessage: merr.ErrCheckPrimaryKey.Error() + ", error: unsupported primary key data type",
		})
	}
}

// search runs a single-vector search. The optional radius and range-filter
// params are forwarded to the proxy; a range filter without a radius is
// rejected as an invalid parameter combination.
func (h *HandlersV1) search(c *gin.Context) {
	httpReq := SearchReq{
		DbName: DefaultDbName,
		Limit:  100,
	}
	if err := c.ShouldBindWith(&httpReq, binding.JSON); err != nil {
		log.Warn("high level restful api, the parameter of search is incorrect", zap.Any("request", httpReq), zap.Error(err))
		c.AbortWithStatusJSON(http.StatusOK, gin.H{
			HTTPReturnCode:    merr.Code(merr.ErrIncorrectParameterFormat),
			HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: " + err.Error(),
		})
		return
	}
	if httpReq.CollectionName == "" || httpReq.Vector == nil {
		log.Warn("high level restful api, search require parameter: [collectionName, vector], but miss")
		c.AbortWithStatusJSON(http.StatusOK, gin.H{
			HTTPReturnCode:    merr.Code(merr.ErrMissingRequiredParameters),
			HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error() + ", required parameters: [collectionName, vector]",
		})
		return
	}
	params := map[string]interface{}{ // auto generated mapping
		"level": int(commonpb.ConsistencyLevel_Bounded),
	}
	if httpReq.Params != nil {
		radius, radiusOk := httpReq.Params[ParamRadius]
		rangeFilter, rangeFilterOk := httpReq.Params[ParamRangeFilter]
		if rangeFilterOk {
			if !radiusOk {
				log.Warn("high level restful api, search params invalid, because only " + ParamRangeFilter)
				c.AbortWithStatusJSON(http.StatusOK, gin.H{
					HTTPReturnCode:    merr.Code(merr.ErrIncorrectParameterFormat),
					HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: invalid search params",
				})
				return
			}
			params[ParamRangeFilter] = rangeFilter
		}
		if radiusOk {
			params[ParamRadius] = radius
		}
	}
	bs, err := json.Marshal(params)
	if err != nil {
		// Report the failure instead of sending an empty params string on.
		log.Warn("high level restful api, fail to marshal search params", zap.Any("params", params), zap.Error(err))
		c.AbortWithStatusJSON(http.StatusOK, gin.H{
			HTTPReturnCode:    merr.Code(merr.ErrIncorrectParameterFormat),
			HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: " + err.Error(),
		})
		return
	}
	searchParams := []*commonpb.KeyValuePair{
		{Key: common.TopKKey, Value: strconv.FormatInt(int64(httpReq.Limit), 10)},
		{Key: Params, Value: string(bs)},
		{Key: ParamRoundDecimal, Value: "-1"},
		{Key: ParamOffset, Value: strconv.FormatInt(int64(httpReq.Offset), 10)},
	}
	req := &milvuspb.SearchRequest{
		DbName:             httpReq.DbName,
		CollectionName:     httpReq.CollectionName,
		Dsl:                httpReq.Filter,
		PlaceholderGroup:   vectors2PlaceholderGroupBytes([][]float32{httpReq.Vector}),
		DslType:            commonpb.DslType_BoolExprV1,
		OutputFields:       httpReq.OutputFields,
		SearchParams:       searchParams,
		GuaranteeTimestamp: BoundedTimestamp,
		Nq:                 int64(1),
	}
	ctx := h.requestContext(c, req.DbName)
	response, err := h.executeRestRequestInterceptor(ctx, c, req, func(reqCtx context.Context, req any) (any, error) {
		// Use the context handed down by the interceptor chain, as the other
		// handlers do for their final proxy call.
		return h.proxy.Search(reqCtx, req.(*milvuspb.SearchRequest))
	})
	if errors.Is(err, RestRequestInterceptorErr) {
		return
	}
	if err == nil {
		err = merr.Error(response.(*milvuspb.SearchResults).GetStatus())
	}
	if err != nil {
		c.JSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
		return
	}
	searchResp := response.(*milvuspb.SearchResults)
	// The generated getters are nil-safe, so a response without Results is
	// treated as an empty result set rather than dereferencing nil.
	results := searchResp.GetResults()
	if results.GetTopK() == int64(0) {
		c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: []interface{}{}})
		return
	}
	allowJS, _ := strconv.ParseBool(c.Request.Header.Get(HTTPHeaderAllowInt64))
	outputData, err := buildQueryResp(results.TopK, results.OutputFields, results.FieldsData, results.Ids, results.Scores, allowJS)
	if err != nil {
		log.Warn("high level restful api, fail to deal with search result", zap.Any("result", searchResp.Results), zap.Error(err))
		c.JSON(http.StatusOK, gin.H{
			HTTPReturnCode:    merr.Code(merr.ErrInvalidSearchResult),
			HTTPReturnMessage: merr.ErrInvalidSearchResult.Error() + ", error: " + err.Error(),
		})
		return
	}
	c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: outputData})
}