enhance: access log support restful api (#33155)

relate: https://github.com/milvus-io/milvus/issues/31823

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
pull/33477/head^2
aoiasd 2024-05-31 11:13:44 +08:00 committed by GitHub
parent 4171414222
commit 0cf225fa75
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 713 additions and 219 deletions

View File

@ -47,6 +47,7 @@ const (
)
const (
ContextRequest = "request"
ContextUsername = "username"
VectorCollectionsPath = "/vector/collections"
VectorCollectionsCreatePath = "/vector/collections/create"

View File

@ -32,12 +32,12 @@ var RestRequestInterceptorErr = errors.New("interceptor error placeholder")
func checkAuthorization(ctx context.Context, c *gin.Context, req interface{}) error {
username, ok := c.Get(ContextUsername)
if !ok || username.(string) == "" {
c.JSON(http.StatusUnauthorized, gin.H{HTTPReturnCode: merr.Code(merr.ErrNeedAuthenticate), HTTPReturnMessage: merr.ErrNeedAuthenticate.Error()})
HTTPReturn(c, http.StatusUnauthorized, gin.H{HTTPReturnCode: merr.Code(merr.ErrNeedAuthenticate), HTTPReturnMessage: merr.ErrNeedAuthenticate.Error()})
return RestRequestInterceptorErr
}
_, authErr := proxy.PrivilegeInterceptor(ctx, req)
if authErr != nil {
c.JSON(http.StatusForbidden, gin.H{HTTPReturnCode: merr.Code(authErr), HTTPReturnMessage: authErr.Error()})
HTTPReturn(c, http.StatusForbidden, gin.H{HTTPReturnCode: merr.Code(authErr), HTTPReturnMessage: authErr.Error()})
return RestRequestInterceptorErr
}
@ -104,7 +104,7 @@ func (h *HandlersV1) checkDatabase(ctx context.Context, c *gin.Context, dbName s
err = merr.Error(response.GetStatus())
}
if err != nil {
c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
HTTPAbortReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
return RestRequestInterceptorErr
}
for _, db := range response.DbNames {
@ -112,7 +112,7 @@ func (h *HandlersV1) checkDatabase(ctx context.Context, c *gin.Context, dbName s
return nil
}
}
c.AbortWithStatusJSON(http.StatusOK, gin.H{
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrDatabaseNotFound),
HTTPReturnMessage: merr.ErrDatabaseNotFound.Error() + ", database: " + dbName,
})
@ -133,7 +133,7 @@ func (h *HandlersV1) describeCollection(ctx context.Context, c *gin.Context, dbN
err = merr.Error(response.GetStatus())
}
if err != nil {
c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
HTTPAbortReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
return nil, err
}
primaryField, ok := getPrimaryField(response.Schema)
@ -154,7 +154,7 @@ func (h *HandlersV1) hasCollection(ctx context.Context, c *gin.Context, dbName s
err = merr.Error(response.GetStatus())
}
if err != nil {
c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
HTTPAbortReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
return false, err
}
return response.Value, nil
@ -193,6 +193,7 @@ func (h *HandlersV1) listCollections(c *gin.Context) {
req := &milvuspb.ShowCollectionsRequest{
DbName: dbName,
}
c.Set(ContextRequest, req)
username, _ := c.Get(ContextUsername)
ctx := proxy.NewContextWithMetadata(c, username.(string), req.DbName)
@ -206,7 +207,7 @@ func (h *HandlersV1) listCollections(c *gin.Context) {
err = merr.Error(resp.(*milvuspb.ShowCollectionsResponse).GetStatus())
}
if err != nil {
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
return
}
response := resp.(*milvuspb.ShowCollectionsResponse)
@ -216,7 +217,7 @@ func (h *HandlersV1) listCollections(c *gin.Context) {
} else {
collections = []string{}
}
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: collections})
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: collections})
}
func (h *HandlersV1) createCollection(c *gin.Context) {
@ -229,7 +230,7 @@ func (h *HandlersV1) createCollection(c *gin.Context) {
}
if err := c.ShouldBindWith(&httpReq, binding.JSON); err != nil {
log.Warn("high level restful api, the parameter of create collection is incorrect", zap.Any("request", httpReq), zap.Error(err))
c.AbortWithStatusJSON(http.StatusOK, gin.H{
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat),
HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: " + err.Error(),
})
@ -237,12 +238,20 @@ func (h *HandlersV1) createCollection(c *gin.Context) {
}
if httpReq.CollectionName == "" || httpReq.Dimension == 0 {
log.Warn("high level restful api, create collection require parameters: [collectionName, dimension], but miss", zap.Any("request", httpReq))
c.AbortWithStatusJSON(http.StatusOK, gin.H{
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrMissingRequiredParameters),
HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error() + ", required parameters: [collectionName, dimension]",
})
return
}
req := &milvuspb.CreateCollectionRequest{
DbName: httpReq.DbName,
CollectionName: httpReq.CollectionName,
ShardsNum: ShardNumDefault,
ConsistencyLevel: commonpb.ConsistencyLevel_Bounded,
}
c.Set(ContextRequest, req)
schema, err := proto.Marshal(&schemapb.CollectionSchema{
Name: httpReq.CollectionName,
Description: httpReq.Description,
@ -272,19 +281,13 @@ func (h *HandlersV1) createCollection(c *gin.Context) {
})
if err != nil {
log.Warn("high level restful api, marshal collection schema fail", zap.Any("request", httpReq), zap.Error(err))
c.AbortWithStatusJSON(http.StatusOK, gin.H{
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrMarshalCollectionSchema),
HTTPReturnMessage: merr.ErrMarshalCollectionSchema.Error() + ", error: " + err.Error(),
})
return
}
req := &milvuspb.CreateCollectionRequest{
DbName: httpReq.DbName,
CollectionName: httpReq.CollectionName,
Schema: schema,
ShardsNum: ShardNumDefault,
ConsistencyLevel: commonpb.ConsistencyLevel_Bounded,
}
req.Schema = schema
username, _ := c.Get(ContextUsername)
ctx := proxy.NewContextWithMetadata(c, username.(string), req.DbName)
response, err := h.executeRestRequestInterceptor(ctx, c, req, func(reqCtx context.Context, req any) (any, error) {
@ -297,7 +300,7 @@ func (h *HandlersV1) createCollection(c *gin.Context) {
err = merr.Error(response.(*commonpb.Status))
}
if err != nil {
c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
HTTPAbortReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
return
}
@ -312,7 +315,7 @@ func (h *HandlersV1) createCollection(c *gin.Context) {
err = merr.Error(statusResponse)
}
if err != nil {
c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
HTTPAbortReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
return
}
statusResponse, err = h.proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
@ -323,17 +326,17 @@ func (h *HandlersV1) createCollection(c *gin.Context) {
err = merr.Error(statusResponse)
}
if err != nil {
c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
HTTPAbortReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{}})
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{}})
}
func (h *HandlersV1) getCollectionDetails(c *gin.Context) {
collectionName := c.Query(HTTPCollectionName)
if collectionName == "" {
log.Warn("high level restful api, desc collection require parameter: [collectionName], but miss")
c.AbortWithStatusJSON(http.StatusOK, gin.H{
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrMissingRequiredParameters),
HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error() + ", required parameters: [collectionName]",
})
@ -347,6 +350,7 @@ func (h *HandlersV1) getCollectionDetails(c *gin.Context) {
DbName: dbName,
CollectionName: collectionName,
}
c.Set(ContextRequest, req)
response, err := h.executeRestRequestInterceptor(ctx, c, req, func(reqCtx context.Context, req any) (any, error) {
return h.proxy.DescribeCollection(reqCtx, req.(*milvuspb.DescribeCollectionRequest))
@ -356,7 +360,7 @@ func (h *HandlersV1) getCollectionDetails(c *gin.Context) {
err = merr.Error(response.(*milvuspb.DescribeCollectionResponse).GetStatus())
}
if err != nil {
c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
HTTPAbortReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
return
}
coll := response.(*milvuspb.DescribeCollectionResponse)
@ -408,7 +412,7 @@ func (h *HandlersV1) getCollectionDetails(c *gin.Context) {
} else {
indexDesc = printIndexes(indexResp.IndexDescriptions)
}
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{
HTTPCollectionName: coll.CollectionName,
HTTPReturnDescription: coll.Schema.Description,
"fields": printFields(coll.Schema.Fields),
@ -425,7 +429,7 @@ func (h *HandlersV1) dropCollection(c *gin.Context) {
}
if err := c.ShouldBindWith(&httpReq, binding.JSON); err != nil {
log.Warn("high level restful api, the parameter of drop collection is incorrect", zap.Any("request", httpReq), zap.Error(err))
c.AbortWithStatusJSON(http.StatusOK, gin.H{
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat),
HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: " + err.Error(),
})
@ -433,7 +437,7 @@ func (h *HandlersV1) dropCollection(c *gin.Context) {
}
if httpReq.CollectionName == "" {
log.Warn("high level restful api, drop collection require parameter: [collectionName], but miss")
c.AbortWithStatusJSON(http.StatusOK, gin.H{
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrMissingRequiredParameters),
HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error() + ", required parameters: [collectionName]",
})
@ -443,6 +447,7 @@ func (h *HandlersV1) dropCollection(c *gin.Context) {
DbName: httpReq.DbName,
CollectionName: httpReq.CollectionName,
}
c.Set(ContextRequest, req)
username, _ := c.Get(ContextUsername)
ctx := proxy.NewContextWithMetadata(c, username.(string), req.DbName)
response, err := h.executeRestRequestInterceptor(ctx, c, req, func(reqCtx context.Context, req any) (any, error) {
@ -451,7 +456,7 @@ func (h *HandlersV1) dropCollection(c *gin.Context) {
return nil, RestRequestInterceptorErr
}
if !has {
c.AbortWithStatusJSON(http.StatusOK, gin.H{
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrCollectionNotFound),
HTTPReturnMessage: merr.ErrCollectionNotFound.Error() + ", database: " + httpReq.DbName + ", collection: " + httpReq.CollectionName,
})
@ -466,9 +471,9 @@ func (h *HandlersV1) dropCollection(c *gin.Context) {
err = merr.Error(response.(*commonpb.Status))
}
if err != nil {
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
} else {
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{}})
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{}})
}
}
@ -480,7 +485,7 @@ func (h *HandlersV1) query(c *gin.Context) {
}
if err := c.ShouldBindWith(&httpReq, binding.JSON); err != nil {
log.Warn("high level restful api, the parameter of query is incorrect", zap.Any("request", httpReq), zap.Error(err))
c.AbortWithStatusJSON(http.StatusOK, gin.H{
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat),
HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: " + err.Error(),
})
@ -488,7 +493,7 @@ func (h *HandlersV1) query(c *gin.Context) {
}
if httpReq.CollectionName == "" || httpReq.Filter == "" {
log.Warn("high level restful api, query require parameter: [collectionName, filter], but miss")
c.AbortWithStatusJSON(http.StatusOK, gin.H{
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrMissingRequiredParameters),
HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error() + ", required parameters: [collectionName, filter]",
})
@ -502,6 +507,7 @@ func (h *HandlersV1) query(c *gin.Context) {
GuaranteeTimestamp: BoundedTimestamp,
QueryParams: []*commonpb.KeyValuePair{},
}
c.Set(ContextRequest, req)
if httpReq.Offset > 0 {
req.QueryParams = append(req.QueryParams, &commonpb.KeyValuePair{Key: ParamOffset, Value: strconv.FormatInt(int64(httpReq.Offset), 10)})
}
@ -520,19 +526,19 @@ func (h *HandlersV1) query(c *gin.Context) {
err = merr.Error(response.(*milvuspb.QueryResults).GetStatus())
}
if err != nil {
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
} else {
queryResp := response.(*milvuspb.QueryResults)
allowJS, _ := strconv.ParseBool(c.Request.Header.Get(HTTPHeaderAllowInt64))
outputData, err := buildQueryResp(int64(0), queryResp.OutputFields, queryResp.FieldsData, nil, nil, allowJS)
if err != nil {
log.Warn("high level restful api, fail to deal with query result", zap.Any("response", response), zap.Error(err))
c.JSON(http.StatusOK, gin.H{
HTTPReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrInvalidSearchResult),
HTTPReturnMessage: merr.ErrInvalidSearchResult.Error() + ", error: " + err.Error(),
})
} else {
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: outputData})
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: outputData})
}
}
}
@ -544,7 +550,7 @@ func (h *HandlersV1) get(c *gin.Context) {
}
if err := c.ShouldBindBodyWith(&httpReq, binding.JSON); err != nil {
log.Warn("high level restful api, the parameter of get is incorrect", zap.Any("request", httpReq), zap.Error(err))
c.AbortWithStatusJSON(http.StatusOK, gin.H{
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat),
HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: " + err.Error(),
})
@ -552,7 +558,7 @@ func (h *HandlersV1) get(c *gin.Context) {
}
if httpReq.CollectionName == "" || httpReq.ID == nil {
log.Warn("high level restful api, get require parameter: [collectionName, id], but miss")
c.AbortWithStatusJSON(http.StatusOK, gin.H{
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrMissingRequiredParameters),
HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error() + ", required parameters: [collectionName, id]",
})
@ -564,6 +570,7 @@ func (h *HandlersV1) get(c *gin.Context) {
OutputFields: httpReq.OutputFields,
GuaranteeTimestamp: BoundedTimestamp,
}
c.Set(ContextRequest, req)
username, _ := c.Get(ContextUsername)
ctx := proxy.NewContextWithMetadata(c, username.(string), req.DbName)
response, err := h.executeRestRequestInterceptor(ctx, c, req, func(reqCtx context.Context, req any) (any, error) {
@ -574,7 +581,7 @@ func (h *HandlersV1) get(c *gin.Context) {
body, _ := c.Get(gin.BodyBytesKey)
filter, err := checkGetPrimaryKey(collSchema, gjson.Get(string(body.([]byte)), DefaultPrimaryFieldName))
if err != nil {
c.JSON(http.StatusOK, gin.H{
HTTPReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrCheckPrimaryKey),
HTTPReturnMessage: merr.ErrCheckPrimaryKey.Error() + ", error: " + err.Error(),
})
@ -591,19 +598,19 @@ func (h *HandlersV1) get(c *gin.Context) {
err = merr.Error(response.(*milvuspb.QueryResults).GetStatus())
}
if err != nil {
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
} else {
queryResp := response.(*milvuspb.QueryResults)
allowJS, _ := strconv.ParseBool(c.Request.Header.Get(HTTPHeaderAllowInt64))
outputData, err := buildQueryResp(int64(0), queryResp.OutputFields, queryResp.FieldsData, nil, nil, allowJS)
if err != nil {
log.Warn("high level restful api, fail to deal with get result", zap.Any("response", response), zap.Error(err))
c.JSON(http.StatusOK, gin.H{
HTTPReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrInvalidSearchResult),
HTTPReturnMessage: merr.ErrInvalidSearchResult.Error() + ", error: " + err.Error(),
})
} else {
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: outputData})
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: outputData})
}
}
}
@ -614,7 +621,7 @@ func (h *HandlersV1) delete(c *gin.Context) {
}
if err := c.ShouldBindBodyWith(&httpReq, binding.JSON); err != nil {
log.Warn("high level restful api, the parameter of delete is incorrect", zap.Any("request", httpReq), zap.Error(err))
c.AbortWithStatusJSON(http.StatusOK, gin.H{
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat),
HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: " + err.Error(),
})
@ -622,7 +629,7 @@ func (h *HandlersV1) delete(c *gin.Context) {
}
if httpReq.CollectionName == "" || (httpReq.ID == nil && httpReq.Filter == "") {
log.Warn("high level restful api, delete require parameter: [collectionName, id/filter], but miss")
c.AbortWithStatusJSON(http.StatusOK, gin.H{
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrMissingRequiredParameters),
HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error() + ", required parameters: [collectionName, id/filter]",
})
@ -632,6 +639,7 @@ func (h *HandlersV1) delete(c *gin.Context) {
DbName: httpReq.DbName,
CollectionName: httpReq.CollectionName,
}
c.Set(ContextRequest, req)
username, _ := c.Get(ContextUsername)
ctx := proxy.NewContextWithMetadata(c, username.(string), req.DbName)
response, err := h.executeRestRequestInterceptor(ctx, c, req, func(reqCtx context.Context, req any) (any, error) {
@ -645,7 +653,7 @@ func (h *HandlersV1) delete(c *gin.Context) {
body, _ := c.Get(gin.BodyBytesKey)
filter, err := checkGetPrimaryKey(collSchema, gjson.Get(string(body.([]byte)), DefaultPrimaryFieldName))
if err != nil {
c.JSON(http.StatusOK, gin.H{
HTTPReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrCheckPrimaryKey),
HTTPReturnMessage: merr.ErrCheckPrimaryKey.Error() + ", error: " + err.Error(),
})
@ -662,9 +670,9 @@ func (h *HandlersV1) delete(c *gin.Context) {
err = merr.Error(response.(*milvuspb.MutationResult).GetStatus())
}
if err != nil {
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
} else {
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{}})
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{}})
}
}
@ -678,7 +686,7 @@ func (h *HandlersV1) insert(c *gin.Context) {
}
if err = c.ShouldBindBodyWith(&singleInsertReq, binding.JSON); err != nil {
log.Warn("high level restful api, the parameter of insert is incorrect", zap.Any("request", httpReq), zap.Error(err))
c.AbortWithStatusJSON(http.StatusOK, gin.H{
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat),
HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: " + err.Error(),
})
@ -690,7 +698,7 @@ func (h *HandlersV1) insert(c *gin.Context) {
}
if httpReq.CollectionName == "" || httpReq.Data == nil {
log.Warn("high level restful api, insert require parameter: [collectionName, data], but miss")
c.AbortWithStatusJSON(http.StatusOK, gin.H{
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrMissingRequiredParameters),
HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error() + ", required parameters: [collectionName, data]",
})
@ -701,6 +709,7 @@ func (h *HandlersV1) insert(c *gin.Context) {
CollectionName: httpReq.CollectionName,
NumRows: uint32(len(httpReq.Data)),
}
c.Set(ContextRequest, req)
username, _ := c.Get(ContextUsername)
ctx := proxy.NewContextWithMetadata(c, username.(string), req.DbName)
response, err := h.executeRestRequestInterceptor(ctx, c, req, func(reqCtx context.Context, req any) (any, error) {
@ -712,7 +721,7 @@ func (h *HandlersV1) insert(c *gin.Context) {
err, httpReq.Data = checkAndSetData(string(body.([]byte)), collSchema)
if err != nil {
log.Warn("high level restful api, fail to deal with insert data", zap.Any("body", body), zap.Error(err))
c.AbortWithStatusJSON(http.StatusOK, gin.H{
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrInvalidInsertData),
HTTPReturnMessage: merr.ErrInvalidInsertData.Error() + ", error: " + err.Error(),
})
@ -722,7 +731,7 @@ func (h *HandlersV1) insert(c *gin.Context) {
insertReq.FieldsData, err = anyToColumns(httpReq.Data, collSchema)
if err != nil {
log.Warn("high level restful api, fail to deal with insert data", zap.Any("data", httpReq.Data), zap.Error(err))
c.AbortWithStatusJSON(http.StatusOK, gin.H{
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrInvalidInsertData),
HTTPReturnMessage: merr.ErrInvalidInsertData.Error() + ", error: " + err.Error(),
})
@ -737,21 +746,21 @@ func (h *HandlersV1) insert(c *gin.Context) {
err = merr.Error(response.(*milvuspb.MutationResult).GetStatus())
}
if err != nil {
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
} else {
insertResp := response.(*milvuspb.MutationResult)
switch insertResp.IDs.GetIdField().(type) {
case *schemapb.IDs_IntId:
allowJS, _ := strconv.ParseBool(c.Request.Header.Get(HTTPHeaderAllowInt64))
if allowJS {
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{"insertCount": insertResp.InsertCnt, "insertIds": insertResp.IDs.IdField.(*schemapb.IDs_IntId).IntId.Data}})
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{"insertCount": insertResp.InsertCnt, "insertIds": insertResp.IDs.IdField.(*schemapb.IDs_IntId).IntId.Data}})
} else {
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{"insertCount": insertResp.InsertCnt, "insertIds": formatInt64(insertResp.IDs.IdField.(*schemapb.IDs_IntId).IntId.Data)}})
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{"insertCount": insertResp.InsertCnt, "insertIds": formatInt64(insertResp.IDs.IdField.(*schemapb.IDs_IntId).IntId.Data)}})
}
case *schemapb.IDs_StrId:
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{"insertCount": insertResp.InsertCnt, "insertIds": insertResp.IDs.IdField.(*schemapb.IDs_StrId).StrId.Data}})
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{"insertCount": insertResp.InsertCnt, "insertIds": insertResp.IDs.IdField.(*schemapb.IDs_StrId).StrId.Data}})
default:
c.JSON(http.StatusOK, gin.H{
HTTPReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrCheckPrimaryKey),
HTTPReturnMessage: merr.ErrCheckPrimaryKey.Error() + ", error: unsupported primary key data type",
})
@ -769,7 +778,7 @@ func (h *HandlersV1) upsert(c *gin.Context) {
}
if err = c.ShouldBindBodyWith(&singleUpsertReq, binding.JSON); err != nil {
log.Warn("high level restful api, the parameter of upsert is incorrect", zap.Any("request", httpReq), zap.Error(err))
c.AbortWithStatusJSON(http.StatusOK, gin.H{
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat),
HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: " + err.Error(),
})
@ -781,7 +790,7 @@ func (h *HandlersV1) upsert(c *gin.Context) {
}
if httpReq.CollectionName == "" || httpReq.Data == nil {
log.Warn("high level restful api, upsert require parameter: [collectionName, data], but miss")
c.AbortWithStatusJSON(http.StatusOK, gin.H{
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrMissingRequiredParameters),
HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error() + ", required parameters: [collectionName, data]",
})
@ -792,6 +801,7 @@ func (h *HandlersV1) upsert(c *gin.Context) {
CollectionName: httpReq.CollectionName,
NumRows: uint32(len(httpReq.Data)),
}
c.Set(ContextRequest, req)
username, _ := c.Get(ContextUsername)
ctx := proxy.NewContextWithMetadata(c, username.(string), req.DbName)
response, err := h.executeRestRequestInterceptor(ctx, c, req, func(reqCtx context.Context, req any) (any, error) {
@ -802,7 +812,7 @@ func (h *HandlersV1) upsert(c *gin.Context) {
for _, fieldSchema := range collSchema.Fields {
if fieldSchema.IsPrimaryKey && fieldSchema.AutoID {
err := merr.WrapErrParameterInvalid("autoID: false", "autoID: true", "cannot upsert an autoID collection")
c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
HTTPAbortReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
return nil, RestRequestInterceptorErr
}
}
@ -810,7 +820,7 @@ func (h *HandlersV1) upsert(c *gin.Context) {
err, httpReq.Data = checkAndSetData(string(body.([]byte)), collSchema)
if err != nil {
log.Warn("high level restful api, fail to deal with upsert data", zap.Any("body", body), zap.Error(err))
c.AbortWithStatusJSON(http.StatusOK, gin.H{
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrInvalidInsertData),
HTTPReturnMessage: merr.ErrInvalidInsertData.Error() + ", error: " + err.Error(),
})
@ -820,7 +830,7 @@ func (h *HandlersV1) upsert(c *gin.Context) {
upsertReq.FieldsData, err = anyToColumns(httpReq.Data, collSchema)
if err != nil {
log.Warn("high level restful api, fail to deal with upsert data", zap.Any("data", httpReq.Data), zap.Error(err))
c.AbortWithStatusJSON(http.StatusOK, gin.H{
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrInvalidInsertData),
HTTPReturnMessage: merr.ErrInvalidInsertData.Error() + ", error: " + err.Error(),
})
@ -835,21 +845,21 @@ func (h *HandlersV1) upsert(c *gin.Context) {
err = merr.Error(response.(*milvuspb.MutationResult).GetStatus())
}
if err != nil {
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
} else {
upsertResp := response.(*milvuspb.MutationResult)
switch upsertResp.IDs.GetIdField().(type) {
case *schemapb.IDs_IntId:
allowJS, _ := strconv.ParseBool(c.Request.Header.Get(HTTPHeaderAllowInt64))
if allowJS {
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{"upsertCount": upsertResp.UpsertCnt, "upsertIds": upsertResp.IDs.IdField.(*schemapb.IDs_IntId).IntId.Data}})
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{"upsertCount": upsertResp.UpsertCnt, "upsertIds": upsertResp.IDs.IdField.(*schemapb.IDs_IntId).IntId.Data}})
} else {
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{"upsertCount": upsertResp.UpsertCnt, "upsertIds": formatInt64(upsertResp.IDs.IdField.(*schemapb.IDs_IntId).IntId.Data)}})
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{"upsertCount": upsertResp.UpsertCnt, "upsertIds": formatInt64(upsertResp.IDs.IdField.(*schemapb.IDs_IntId).IntId.Data)}})
}
case *schemapb.IDs_StrId:
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{"upsertCount": upsertResp.UpsertCnt, "upsertIds": upsertResp.IDs.IdField.(*schemapb.IDs_StrId).StrId.Data}})
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{"upsertCount": upsertResp.UpsertCnt, "upsertIds": upsertResp.IDs.IdField.(*schemapb.IDs_StrId).StrId.Data}})
default:
c.JSON(http.StatusOK, gin.H{
HTTPReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrCheckPrimaryKey),
HTTPReturnMessage: merr.ErrCheckPrimaryKey.Error() + ", error: unsupported primary key data type",
})
@ -864,7 +874,7 @@ func (h *HandlersV1) search(c *gin.Context) {
}
if err := c.ShouldBindWith(&httpReq, binding.JSON); err != nil {
log.Warn("high level restful api, the parameter of search is incorrect", zap.Any("request", httpReq), zap.Error(err))
c.AbortWithStatusJSON(http.StatusOK, gin.H{
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat),
HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: " + err.Error(),
})
@ -872,12 +882,24 @@ func (h *HandlersV1) search(c *gin.Context) {
}
if httpReq.CollectionName == "" || httpReq.Vector == nil {
log.Warn("high level restful api, search require parameter: [collectionName, vector], but miss")
c.AbortWithStatusJSON(http.StatusOK, gin.H{
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrMissingRequiredParameters),
HTTPReturnMessage: merr.ErrMissingRequiredParameters.Error() + ", required parameters: [collectionName, vector]",
})
return
}
req := &milvuspb.SearchRequest{
DbName: httpReq.DbName,
CollectionName: httpReq.CollectionName,
Dsl: httpReq.Filter,
PlaceholderGroup: vectors2PlaceholderGroupBytes([][]float32{httpReq.Vector}),
DslType: commonpb.DslType_BoolExprV1,
OutputFields: httpReq.OutputFields,
GuaranteeTimestamp: BoundedTimestamp,
Nq: int64(1),
}
c.Set(ContextRequest, req)
params := map[string]interface{}{ // auto generated mapping
"level": int(commonpb.ConsistencyLevel_Bounded),
}
@ -887,7 +909,7 @@ func (h *HandlersV1) search(c *gin.Context) {
if rangeFilterOk {
if !radiusOk {
log.Warn("high level restful api, search params invalid, because only " + ParamRangeFilter)
c.AbortWithStatusJSON(http.StatusOK, gin.H{
HTTPAbortReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrIncorrectParameterFormat),
HTTPReturnMessage: merr.ErrIncorrectParameterFormat.Error() + ", error: invalid search params",
})
@ -900,23 +922,13 @@ func (h *HandlersV1) search(c *gin.Context) {
}
}
bs, _ := json.Marshal(params)
searchParams := []*commonpb.KeyValuePair{
req.SearchParams = []*commonpb.KeyValuePair{
{Key: common.TopKKey, Value: strconv.FormatInt(int64(httpReq.Limit), 10)},
{Key: Params, Value: string(bs)},
{Key: ParamRoundDecimal, Value: "-1"},
{Key: ParamOffset, Value: strconv.FormatInt(int64(httpReq.Offset), 10)},
}
req := &milvuspb.SearchRequest{
DbName: httpReq.DbName,
CollectionName: httpReq.CollectionName,
Dsl: httpReq.Filter,
PlaceholderGroup: vectors2PlaceholderGroupBytes([][]float32{httpReq.Vector}),
DslType: commonpb.DslType_BoolExprV1,
OutputFields: httpReq.OutputFields,
SearchParams: searchParams,
GuaranteeTimestamp: BoundedTimestamp,
Nq: int64(1),
}
username, _ := c.Get(ContextUsername)
ctx := proxy.NewContextWithMetadata(c, username.(string), req.DbName)
response, err := h.executeRestRequestInterceptor(ctx, c, req, func(reqCtx context.Context, req any) (any, error) {
@ -929,22 +941,22 @@ func (h *HandlersV1) search(c *gin.Context) {
err = merr.Error(response.(*milvuspb.SearchResults).GetStatus())
}
if err != nil {
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
} else {
searchResp := response.(*milvuspb.SearchResults)
if searchResp.Results.TopK == int64(0) {
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: []interface{}{}})
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: []interface{}{}})
} else {
allowJS, _ := strconv.ParseBool(c.Request.Header.Get(HTTPHeaderAllowInt64))
outputData, err := buildQueryResp(searchResp.Results.TopK, searchResp.Results.OutputFields, searchResp.Results.FieldsData, searchResp.Results.Ids, searchResp.Results.Scores, allowJS)
if err != nil {
log.Warn("high level restful api, fail to deal with search result", zap.Any("result", searchResp.Results), zap.Error(err))
c.JSON(http.StatusOK, gin.H{
HTTPReturn(c, http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrInvalidSearchResult),
HTTPReturnMessage: merr.ErrInvalidSearchResult.Error() + ", error: " + err.Error(),
})
} else {
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: outputData})
HTTPReturn(c, http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: outputData})
}
}
}

File diff suppressed because it is too large Load Diff

View File

@ -28,6 +28,22 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
func HTTPReturn(c *gin.Context, code int, result gin.H) {
c.Set(HTTPReturnCode, result[HTTPReturnCode])
if errorMsg, ok := result[HTTPReturnMessage]; ok {
c.Set(HTTPReturnMessage, errorMsg)
}
c.JSON(code, result)
}
func HTTPAbortReturn(c *gin.Context, code int, result gin.H) {
c.Set(HTTPReturnCode, result[HTTPReturnCode])
if errorMsg, ok := result[HTTPReturnMessage]; ok {
c.Set(HTTPReturnMessage, errorMsg)
}
c.AbortWithStatusJSON(code, result)
}
func ParseUsernamePassword(c *gin.Context) (string, string, bool) {
username, password, ok := c.Request.BasicAuth()
if !ok {

View File

@ -172,6 +172,8 @@ func (s *Server) registerHTTPServer() {
func (s *Server) startHTTPServer(errChan chan error) {
defer s.wg.Done()
ginHandler := gin.New()
ginHandler.Use(accesslog.AccessLogMiddleware)
ginLogger := gin.LoggerWithConfig(gin.LoggerConfig{
SkipPaths: proxy.Params.ProxyCfg.GinLogSkipPaths.GetAsStrings(),
Formatter: func(param gin.LogFormatterParams) string {
@ -182,6 +184,8 @@ func (s *Server) startHTTPServer(errChan chan error) {
if !ok {
traceID = ""
}
accesslog.SetHTTPParams(&param)
return fmt.Sprintf("[%v] [GIN] [%s] [traceID=%s] [code=%3d] [latency=%v] [client=%s] [method=%s] [error=%s]\n",
param.TimeStamp.Format("2006/01/02 15:04:05.000 Z07:00"),
param.Path,

View File

@ -32,7 +32,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/proxy/accesslog/info"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/tracer"
"github.com/milvus-io/milvus/pkg/util"
"github.com/milvus-io/milvus/pkg/util/crypto"
"github.com/milvus-io/milvus/pkg/util/merr"
@ -153,16 +153,15 @@ func (s *LogFormatterSuite) TestFormatMethodInfo() {
for _, req := range s.reqs {
i := info.NewGrpcAccessInfo(metaContext, s.serverinfo, req)
fs := formatter.Format(i)
log.Info(fs)
s.True(strings.Contains(fs, s.traceID))
}
tracer.Init()
traceContext, traceSpan := otel.Tracer(typeutil.ProxyRole).Start(s.ctx, "test")
trueTraceID := traceSpan.SpanContext().TraceID().String()
for _, req := range s.reqs {
i := info.NewGrpcAccessInfo(traceContext, s.serverinfo, req)
fs := formatter.Format(i)
log.Info(fs)
s.True(strings.Contains(fs, trueTraceID))
}
}

View File

@ -33,7 +33,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/proxy/connection"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/requestutil"
)
@ -129,6 +128,10 @@ func (i *GrpcAccessInfo) TraceID() string {
}
traceID := trace.SpanFromContext(i.ctx).SpanContext().TraceID()
if !traceID.IsValid() {
return Unknown
}
return traceID.String()
}
@ -252,10 +255,6 @@ func (i *GrpcAccessInfo) SdkVersion() string {
return getSdkVersionByUserAgent(i.ctx)
}
func (i *GrpcAccessInfo) ClusterPrefix() string {
return paramtable.Get().CommonCfg.ClusterPrefix.GetValue()
}
func (i *GrpcAccessInfo) OutputFields() string {
fields, ok := requestutil.GetOutputFieldsFromRequest(i.req)
if ok {

View File

@ -0,0 +1,189 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package info
import (
"fmt"
"net/http"
"sync"
"time"
"github.com/gin-gonic/gin"
"github.com/milvus-io/milvus/pkg/util/requestutil"
)
const (
ContextUsername = "username"
ContextReturnCode = "code"
ContextReturnMessage = "message"
ContextRequest = "request"
)
type RestfulInfo struct {
params *gin.LogFormatterParams
start time.Time
req interface{}
reqInitOnce sync.Once
}
func NewRestfulInfo() *RestfulInfo {
return &RestfulInfo{start: time.Now()}
}
func (i *RestfulInfo) SetParams(p *gin.LogFormatterParams) {
i.params = p
}
func (i *RestfulInfo) InitReq() {
req, ok := i.params.Keys[ContextRequest]
if !ok {
return
}
i.req = req
}
func (i *RestfulInfo) TimeCost() string {
return fmt.Sprint(i.params.Latency)
}
func (i *RestfulInfo) TimeNow() string {
return time.Now().Format(timeFormat)
}
func (i *RestfulInfo) TimeStart() string {
if i.start.IsZero() {
return Unknown
}
return i.start.Format(timeFormat)
}
func (i *RestfulInfo) TimeEnd() string {
return i.params.TimeStamp.Format(timeFormat)
}
func (i *RestfulInfo) MethodName() string {
return i.params.Path
}
func (i *RestfulInfo) Address() string {
return i.params.ClientIP
}
func (i *RestfulInfo) TraceID() string {
traceID, ok := i.params.Keys["traceID"]
if !ok {
return Unknown
}
return traceID.(string)
}
func (i *RestfulInfo) MethodStatus() string {
if i.params.StatusCode != http.StatusOK {
return fmt.Sprintf("HttpError%d", i.params.StatusCode)
}
if code, ok := i.params.Keys[ContextReturnCode]; !ok || code.(int32) != 0 {
return "Failed"
}
return "Successful"
}
func (i *RestfulInfo) UserName() string {
username, ok := i.params.Keys[ContextUsername]
if !ok || username == "" {
return Unknown
}
return username.(string)
}
func (i *RestfulInfo) ResponseSize() string {
return fmt.Sprint(i.params.BodySize)
}
func (i *RestfulInfo) ErrorCode() string {
code, ok := i.params.Keys[ContextReturnCode]
if !ok {
return Unknown
}
return fmt.Sprint(code)
}
func (i *RestfulInfo) ErrorMsg() string {
message, ok := i.params.Keys[ContextReturnMessage]
if !ok {
return ""
}
return fmt.Sprint(message)
}
func (i *RestfulInfo) SdkVersion() string {
return "Restful"
}
func (i *RestfulInfo) DbName() string {
name, ok := requestutil.GetDbNameFromRequest(i.req)
if !ok {
return Unknown
}
return name.(string)
}
func (i *RestfulInfo) CollectionName() string {
name, ok := requestutil.GetCollectionNameFromRequest(i.req)
if !ok {
return Unknown
}
return name.(string)
}
func (i *RestfulInfo) PartitionName() string {
name, ok := requestutil.GetPartitionNameFromRequest(i.req)
if ok {
return name.(string)
}
names, ok := requestutil.GetPartitionNamesFromRequest(i.req)
if ok {
return fmt.Sprint(names.([]string))
}
return Unknown
}
func (i *RestfulInfo) Expression() string {
expr, ok := requestutil.GetExprFromRequest(i.req)
if ok {
return expr.(string)
}
dsl, ok := requestutil.GetDSLFromRequest(i.req)
if ok {
return dsl.(string)
}
return Unknown
}
func (i *RestfulInfo) OutputFields() string {
fields, ok := requestutil.GetOutputFieldsFromRequest(i.req)
if ok {
return fmt.Sprint(fields.([]string))
}
return Unknown
}

View File

@ -0,0 +1,192 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package info
import (
"fmt"
"net/http"
"testing"
"time"
"github.com/gin-gonic/gin"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
type RestfulAccessInfoSuite struct {
suite.Suite
username string
traceID string
info *RestfulInfo
}
func (s *RestfulAccessInfoSuite) SetupSuite() {
paramtable.Init()
}
func (s *RestfulAccessInfoSuite) SetupTest() {
s.username = "test-user"
s.traceID = "test-trace"
s.info = &RestfulInfo{}
s.info.SetParams(
&gin.LogFormatterParams{
Keys: make(map[string]any),
})
}
func (s *RestfulAccessInfoSuite) TestTimeCost() {
s.info.params.Latency = time.Second
result := Get(s.info, "$time_cost")
s.Equal(fmt.Sprint(time.Second), result[0])
}
func (s *RestfulAccessInfoSuite) TestTimeNow() {
result := Get(s.info, "$time_now")
s.NotEqual(Unknown, result[0])
}
func (s *RestfulAccessInfoSuite) TestTimeStart() {
result := Get(s.info, "$time_start")
s.Equal(Unknown, result[0])
s.info.start = time.Now()
result = Get(s.info, "$time_start")
s.Equal(s.info.start.Format(timeFormat), result[0])
}
func (s *RestfulAccessInfoSuite) TestTimeEnd() {
s.info.params.TimeStamp = time.Now()
result := Get(s.info, "$time_end")
s.Equal(s.info.params.TimeStamp.Format(timeFormat), result[0])
}
func (s *RestfulAccessInfoSuite) TestMethodName() {
s.info.params.Path = "/restful/test"
result := Get(s.info, "$method_name")
s.Equal(s.info.params.Path, result[0])
}
func (s *RestfulAccessInfoSuite) TestAddress() {
s.info.params.ClientIP = "127.0.0.1"
result := Get(s.info, "$user_addr")
s.Equal(s.info.params.ClientIP, result[0])
}
func (s *RestfulAccessInfoSuite) TestTraceID() {
result := Get(s.info, "$trace_id")
s.Equal(Unknown, result[0])
s.info.params.Keys["traceID"] = "testtrace"
result = Get(s.info, "$trace_id")
s.Equal(s.info.params.Keys["traceID"], result[0])
}
func (s *RestfulAccessInfoSuite) TestStatus() {
s.info.params.StatusCode = http.StatusBadRequest
result := Get(s.info, "$method_status")
s.Equal("HttpError400", result[0])
s.info.params.StatusCode = http.StatusOK
s.info.params.Keys[ContextReturnCode] = merr.Code(merr.ErrChannelLack)
result = Get(s.info, "$method_status")
s.Equal("Failed", result[0])
s.info.params.StatusCode = http.StatusOK
s.info.params.Keys[ContextReturnCode] = merr.Code(nil)
result = Get(s.info, "$method_status")
s.Equal("Successful", result[0])
}
func (s *RestfulAccessInfoSuite) TestErrorCode() {
result := Get(s.info, "$error_code")
s.Equal(Unknown, result[0])
s.info.params.Keys[ContextReturnCode] = 200
result = Get(s.info, "$error_code")
s.Equal(fmt.Sprint(200), result[0])
}
func (s *RestfulAccessInfoSuite) TestErrorMsg() {
s.info.params.Keys[ContextReturnMessage] = merr.ErrChannelLack.Error()
result := Get(s.info, "$error_msg")
s.Equal(merr.ErrChannelLack.Error(), result[0])
}
func (s *RestfulAccessInfoSuite) TestDbName() {
result := Get(s.info, "$database_name")
s.Equal(Unknown, result[0])
req := &milvuspb.QueryRequest{
DbName: "test",
}
s.info.req = req
result = Get(s.info, "$database_name")
s.Equal("test", result[0])
}
func (s *RestfulAccessInfoSuite) TestSdkInfo() {
result := Get(s.info, "$sdk_version")
s.Equal("Restful", result[0])
}
func (s *RestfulAccessInfoSuite) TestExpression() {
result := Get(s.info, "$method_expr")
s.Equal(Unknown, result[0])
testExpr := "test"
s.info.req = &milvuspb.QueryRequest{
Expr: testExpr,
}
result = Get(s.info, "$method_expr")
s.Equal(testExpr, result[0])
s.info.req = &milvuspb.SearchRequest{
Dsl: testExpr,
}
result = Get(s.info, "$method_expr")
s.Equal(testExpr, result[0])
}
func (s *RestfulAccessInfoSuite) TestOutputFields() {
result := Get(s.info, "$output_fields")
s.Equal(Unknown, result[0])
fields := []string{"pk"}
s.info.params.Keys[ContextRequest] = &milvuspb.QueryRequest{
OutputFields: fields,
}
s.info.InitReq()
result = Get(s.info, "$output_fields")
s.Equal(fmt.Sprint(fields), result[0])
}
func (s *RestfulAccessInfoSuite) TestClusterPrefix() {
cluster := "instance-test"
paramtable.Init()
ClusterPrefix.Store(cluster)
result := Get(s.info, "$cluster_prefix")
s.Equal(cluster, result[0])
}
func TestRestfulAccessInfo(t *testing.T) {
suite.Run(t, new(RestfulAccessInfoSuite))
}

View File

@ -22,6 +22,7 @@ import (
"time"
"github.com/cockroachdb/errors"
"github.com/gin-gonic/gin"
"google.golang.org/grpc"
"github.com/milvus-io/milvus/internal/proxy/accesslog/info"
@ -29,6 +30,8 @@ import (
type AccessKey struct{}
const ContextLogKey = "accesslog"
func UnaryAccessLogInterceptor(ctx context.Context, req any, rpcInfo *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
accessInfo := info.NewGrpcAccessInfo(ctx, rpcInfo, req)
newCtx := context.WithValue(ctx, AccessKey{}, accessInfo)
@ -44,6 +47,24 @@ func UnaryUpdateAccessInfoInterceptor(ctx context.Context, req any, rpcInfonfo *
return handler(ctx, req)
}
func AccessLogMiddleware(ctx *gin.Context) {
accessInfo := info.NewRestfulInfo()
ctx.Set(ContextLogKey, accessInfo)
ctx.Next()
accessInfo.InitReq()
_globalL.Write(accessInfo)
}
func SetHTTPParams(p *gin.LogFormatterParams) {
value, ok := p.Keys[ContextLogKey]
if !ok {
return
}
info := value.(*info.RestfulInfo)
info.SetParams(p)
}
func join(path1, path2 string) string {
if strings.HasSuffix(path1, "/") {
return path1 + path2