feat: restful phase two (#29728)

issue: #29732

Signed-off-by: PowderLi <min.li@zilliz.com>
pull/30332/head
PowderLi 2024-01-28 16:03:01 +08:00 committed by GitHub
parent 8fc4ebfa11
commit 6abbab12fa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 3033 additions and 16 deletions

View File

@ -1,5 +1,45 @@
package httpserver
import "time"
// v2
const (
// --- category ---
CollectionCategory = "/collections/"
EntityCategory = "/entities/"
PartitionCategory = "/partitions/"
UserCategory = "/users/"
RoleCategory = "/roles/"
IndexCategory = "/indexes/"
AliasCategory = "/aliases/"
JobCategory = "/jobs/"
ListAction = "list"
HasAction = "has"
DescribeAction = "describe"
CreateAction = "create"
DropAction = "drop"
StatsAction = "get_stats"
LoadStateAction = "get_load_state"
RenameAction = "rename"
LoadAction = "load"
ReleaseAction = "release"
QueryAction = "query"
GetAction = "get"
DeleteAction = "delete"
InsertAction = "insert"
UpsertAction = "upsert"
SearchAction = "search"
UpdatePasswordAction = "update_password"
GrantRoleAction = "grant_role"
RevokeRoleAction = "revoke_role"
GrantPrivilegeAction = "grant_privilege"
RevokePrivilegeAction = "revoke_privilege"
AlterAction = "alter"
GetProgressAction = "get_progress"
)
const (
ContextUsername = "username"
VectorCollectionsPath = "/vector/collections"
@ -15,19 +55,36 @@ const (
ShardNumDefault = 1
PrimaryFieldName = "id"
VectorFieldName = "vector"
EnableDynamic = true
EnableAutoID = true
DisableAutoID = false
HTTPCollectionName = "collectionName"
HTTPDbName = "dbName"
HTTPPartitionName = "partitionName"
HTTPPartitionNames = "partitionNames"
HTTPUserName = "userName"
HTTPRoleName = "roleName"
HTTPIndexName = "indexName"
HTTPIndexField = "fieldName"
HTTPAliasName = "aliasName"
DefaultDbName = "default"
DefaultIndexName = "vector_idx"
DefaultAliasName = "the_alias"
DefaultOutputFields = "*"
HTTPHeaderAllowInt64 = "Accept-Type-Allow-Int64"
HTTPHeaderRequestTimeout = "Request-Timeout"
HTTPDefaultTimeout = 30 * time.Second
HTTPReturnCode = "code"
HTTPReturnMessage = "message"
HTTPReturnData = "data"
HTTPReturnLoadState = "loadState"
HTTPReturnLoadProgress = "loadProgress"
HTTPReturnHas = "has"
HTTPReturnFieldName = "name"
HTTPReturnFieldType = "type"
@ -35,12 +92,24 @@ const (
HTTPReturnFieldAutoID = "autoId"
HTTPReturnDescription = "description"
HTTPReturnIndexName = "indexName"
HTTPReturnIndexField = "fieldName"
HTTPReturnIndexMetricsType = "metricType"
HTTPReturnIndexType = "indexType"
HTTPReturnIndexTotalRows = "totalRows"
HTTPReturnIndexPendingRows = "pendingRows"
HTTPReturnIndexIndexedRows = "indexedRows"
HTTPReturnIndexState = "indexState"
HTTPReturnIndexFailReason = "failReason"
HTTPReturnDistance = "distance"
HTTPReturnRowCount = "rowCount"
HTTPReturnObjectType = "objectType"
HTTPReturnObjectName = "objectName"
HTTPReturnPrivilege = "privilege"
HTTPReturnGrantor = "grantor"
HTTPReturnDbName = "dbName"
DefaultMetricType = "L2"
DefaultPrimaryFieldName = "id"
DefaultVectorFieldName = "vector"

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,285 @@
package httpserver
import (
"net/http"
"github.com/gin-gonic/gin"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
)
type DatabaseReq struct {
DbName string `json:"dbName"`
}
func (req *DatabaseReq) GetDbName() string { return req.DbName }
type CollectionNameReq struct {
DbName string `json:"dbName"`
CollectionName string `json:"collectionName" binding:"required"`
PartitionNames []string `json:"partitionNames"`
}
func (req *CollectionNameReq) GetDbName() string {
return req.DbName
}
func (req *CollectionNameReq) GetCollectionName() string {
return req.CollectionName
}
func (req *CollectionNameReq) GetPartitionNames() []string {
return req.PartitionNames
}
type RenameCollectionReq struct {
DbName string `json:"dbName"`
CollectionName string `json:"collectionName" binding:"required"`
NewCollectionName string `json:"newCollectionName" binding:"required"`
NewDbName string `json:"newDbName"`
}
func (req *RenameCollectionReq) GetDbName() string { return req.DbName }
type PartitionReq struct {
// CollectionNameReq
DbName string `json:"dbName"`
CollectionName string `json:"collectionName" binding:"required"`
PartitionName string `json:"partitionName" binding:"required"`
}
func (req *PartitionReq) GetDbName() string { return req.DbName }
func (req *PartitionReq) GetCollectionName() string { return req.CollectionName }
func (req *PartitionReq) GetPartitionName() string { return req.PartitionName }
type QueryReqV2 struct {
DbName string `json:"dbName"`
CollectionName string `json:"collectionName" binding:"required"`
PartitionNames []string `json:"partitionNames"`
OutputFields []string `json:"outputFields"`
Filter string `json:"filter" binding:"required"`
Limit int32 `json:"limit"`
Offset int32 `json:"offset"`
}
func (req *QueryReqV2) GetDbName() string { return req.DbName }
type CollectionIDOutputReq struct {
DbName string `json:"dbName"`
CollectionName string `json:"collectionName" binding:"required"`
PartitionName string `json:"partitionName"`
PartitionNames []string `json:"partitionNames"`
OutputFields []string `json:"outputFields"`
ID interface{} `json:"id" binding:"required"`
}
func (req *CollectionIDOutputReq) GetDbName() string { return req.DbName }
type CollectionIDFilterReq struct {
DbName string `json:"dbName"`
CollectionName string `json:"collectionName" binding:"required"`
PartitionName string `json:"partitionName"`
ID interface{} `json:"id"`
Filter string `json:"filter"`
}
func (req *CollectionIDFilterReq) GetDbName() string { return req.DbName }
type CollectionDataReq struct {
DbName string `json:"dbName"`
CollectionName string `json:"collectionName" binding:"required"`
PartitionName string `json:"partitionName"`
Data []map[string]interface{} `json:"data" binding:"required"`
}
func (req *CollectionDataReq) GetDbName() string { return req.DbName }
type SearchReqV2 struct {
DbName string `json:"dbName"`
CollectionName string `json:"collectionName" binding:"required"`
PartitionNames []string `json:"partitionNames"`
Filter string `json:"filter"`
Limit int32 `json:"limit"`
Offset int32 `json:"offset"`
OutputFields []string `json:"outputFields"`
Vector []float32 `json:"vector"`
Params map[string]float64 `json:"params"`
}
func (req *SearchReqV2) GetDbName() string { return req.DbName }
type ReturnErrMsg struct {
Code int32 `json:"code"`
Message string `json:"message"`
}
type PartitionsReq struct {
// CollectionNameReq
DbName string `json:"dbName"`
CollectionName string `json:"collectionName" binding:"required"`
PartitionNames []string `json:"partitionNames" binding:"required"`
}
func (req *PartitionsReq) GetDbName() string { return req.DbName }
type UserReq struct {
UserName string `json:"userName"`
}
func (req *UserReq) GetUserName() string { return req.UserName }
type UserNameGetter interface {
GetUserName() string
}
type RoleNameGetter interface {
GetRoleName() string
}
type IndexNameGetter interface {
GetIndexName() string
}
type AliasNameGetter interface {
GetAliasName() string
}
type PasswordReq struct {
UserName string `json:"userName"`
Password string `json:"password" binding:"required"`
}
type NewPasswordReq struct {
UserName string `json:"userName"`
Password string `json:"password"`
NewPassword string `json:"newPassword"`
}
type UserRoleReq struct {
UserName string `json:"userName"`
RoleName string `json:"roleName"`
}
type RoleReq struct {
RoleName string `json:"roleName"`
Timeout int32 `json:"timeout"`
}
func (req *RoleReq) GetRoleName() string {
return req.RoleName
}
type GrantReq struct {
RoleName string `json:"roleName" binding:"required"`
ObjectType string `json:"objectType" binding:"required"`
ObjectName string `json:"objectName" binding:"required"`
Privilege string `json:"privilege" binding:"required"`
DbName string `json:"dbName"`
}
type IndexParam struct {
FieldName string `json:"fieldName" binding:"required"`
IndexName string `json:"indexName" binding:"required"`
MetricsType string `json:"metricsType" binding:"required"`
IndexType string `json:"indexType"`
IndexConfig map[string]interface{} `json:"indexConfig"`
}
type IndexParamReq struct {
DbName string `json:"dbName"`
CollectionName string `json:"collectionName" binding:"required"`
IndexParams []IndexParam `json:"indexParams" binding:"required"`
}
func (req *IndexParamReq) GetDbName() string { return req.DbName }
type IndexReq struct {
DbName string `json:"dbName"`
CollectionName string `json:"collectionName" binding:"required"`
IndexName string `json:"indexName"`
Timeout int32 `json:"timeout"`
}
func (req *IndexReq) GetDbName() string { return req.DbName }
func (req *IndexReq) GetCollectionName() string {
return req.CollectionName
}
func (req *IndexReq) GetIndexName() string {
return req.IndexName
}
type FieldSchema struct {
FieldName string `json:"fieldName" binding:"required"`
DataType string `json:"dataType" binding:"required"`
IsPrimary bool `json:"isPrimary"`
IsPartitionKey bool `json:"isPartitionKey"`
Dim int `json:"dimension"`
MaxLength int `json:"maxLength"`
MaxCapacity int `json:"maxCapacity"`
ElementTypeParams map[string]string `json:"elementTypeParams" binding:"required"`
}
type CollectionSchema struct {
Fields []FieldSchema `json:"fields"`
AutoId bool `json:"autoID"`
EnableDynamicField bool `json:"enableDynamicField"`
}
type CollectionReq struct {
DbName string `json:"dbName"`
CollectionName string `json:"collectionName" binding:"required"`
Dimension int32 `json:"dimension"`
MetricsType string `json:"metricsType"`
Schema CollectionSchema `json:"schema"`
IndexParams []IndexParam `json:"indexParams"`
}
func (req *CollectionReq) GetDbName() string { return req.DbName }
type AliasReq struct {
DbName string `json:"dbName"`
AliasName string `json:"aliasName" binding:"required"`
}
func (req *AliasReq) GetAliasName() string {
return req.AliasName
}
type AliasCollectionReq struct {
DbName string `json:"dbName"`
CollectionName string `json:"collectionName" binding:"required"`
AliasName string `json:"aliasName" binding:"required"`
}
func (req *AliasCollectionReq) GetDbName() string { return req.DbName }
func (req *AliasCollectionReq) GetCollectionName() string {
return req.CollectionName
}
func (req *AliasCollectionReq) GetAliasName() string {
return req.AliasName
}
func wrapperReturnHas(has bool) gin.H {
return gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{HTTPReturnHas: has}}
}
func wrapperReturnList(names []string) gin.H {
if names == nil {
return gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: []string{}}
}
return gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: names}
}
func wrapperReturnRowCount(pairs []*commonpb.KeyValuePair) gin.H {
rowCount := "0"
for _, keyValue := range pairs {
if keyValue.Key == "row_count" {
rowCount = keyValue.GetValue()
}
}
return gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{HTTPReturnRowCount: rowCount}}
}
func wrapperReturnDefault() gin.H {
return gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: gin.H{}}
}

View File

@ -0,0 +1,199 @@
package httpserver
import (
"bytes"
"fmt"
"net/http"
"strconv"
"sync"
"time"
"github.com/gin-gonic/gin"
)
func defaultResponse(c *gin.Context) {
c.String(http.StatusRequestTimeout, "timeout")
}
// BufferPool represents a pool of buffers.
type BufferPool struct {
pool sync.Pool
}
// Get returns a buffer from the buffer pool.
// If the pool is empty, a new buffer is created and returned.
func (p *BufferPool) Get() *bytes.Buffer {
buf := p.pool.Get()
if buf == nil {
return &bytes.Buffer{}
}
return buf.(*bytes.Buffer)
}
// Put adds a buffer back to the pool.
func (p *BufferPool) Put(buf *bytes.Buffer) {
p.pool.Put(buf)
}
// Timeout struct
type Timeout struct {
timeout time.Duration
handler gin.HandlerFunc
response gin.HandlerFunc
}
// Writer is a writer with memory buffer
type Writer struct {
gin.ResponseWriter
body *bytes.Buffer
headers http.Header
mu sync.Mutex
timeout bool
wroteHeaders bool
code int
}
// NewWriter will return a timeout.Writer pointer
func NewWriter(w gin.ResponseWriter, buf *bytes.Buffer) *Writer {
return &Writer{ResponseWriter: w, body: buf, headers: make(http.Header)}
}
// Write will write data to response body
func (w *Writer) Write(data []byte) (int, error) {
if w.timeout || w.body == nil {
return 0, nil
}
w.mu.Lock()
defer w.mu.Unlock()
return w.body.Write(data)
}
// WriteHeader sends an HTTP response header with the provided status code.
// If the response writer has already written headers or if a timeout has occurred,
// this method does nothing.
func (w *Writer) WriteHeader(code int) {
if w.timeout || w.wroteHeaders {
return
}
// gin is using -1 to skip writing the status code
// see https://github.com/gin-gonic/gin/blob/a0acf1df2814fcd828cb2d7128f2f4e2136d3fac/response_writer.go#L61
if code == -1 {
return
}
checkWriteHeaderCode(code)
w.mu.Lock()
defer w.mu.Unlock()
w.writeHeader(code)
w.ResponseWriter.WriteHeader(code)
}
func (w *Writer) writeHeader(code int) {
w.wroteHeaders = true
w.code = code
}
// Header will get response headers
func (w *Writer) Header() http.Header {
return w.headers
}
// WriteString will write string to response body
func (w *Writer) WriteString(s string) (int, error) {
return w.Write([]byte(s))
}
// FreeBuffer will release buffer pointer
func (w *Writer) FreeBuffer() {
// if not reset body,old bytes will put in bufPool
w.body.Reset()
w.body = nil
}
// Status we must override Status func here,
// or the http status code returned by gin.Context.Writer.Status()
// will always be 200 in other custom gin middlewares.
func (w *Writer) Status() int {
if w.code == 0 || w.timeout {
return w.ResponseWriter.Status()
}
return w.code
}
func checkWriteHeaderCode(code int) {
if code < 100 || code > 999 {
panic(fmt.Sprintf("invalid http status code: %d", code))
}
}
func timeoutMiddleware(handler gin.HandlerFunc) gin.HandlerFunc {
t := &Timeout{
timeout: HTTPDefaultTimeout,
handler: handler,
response: defaultResponse,
}
bufPool := &BufferPool{}
return func(c *gin.Context) {
timeoutSecond, err := strconv.ParseInt(c.Request.Header.Get(HTTPHeaderRequestTimeout), 10, 64)
if err == nil {
t.timeout = time.Duration(timeoutSecond) * time.Second
}
finish := make(chan struct{}, 1)
panicChan := make(chan interface{}, 1)
w := c.Writer
buffer := bufPool.Get()
tw := NewWriter(w, buffer)
c.Writer = tw
buffer.Reset()
go func() {
defer func() {
if p := recover(); p != nil {
panicChan <- p
}
}()
t.handler(c)
finish <- struct{}{}
}()
select {
case p := <-panicChan:
tw.FreeBuffer()
c.Writer = w
panic(p)
case <-finish:
c.Next()
tw.mu.Lock()
defer tw.mu.Unlock()
dst := tw.ResponseWriter.Header()
for k, vv := range tw.Header() {
dst[k] = vv
}
if _, err := tw.ResponseWriter.Write(buffer.Bytes()); err != nil {
panic(err)
}
tw.FreeBuffer()
bufPool.Put(buffer)
case <-time.After(t.timeout):
c.Abort()
tw.mu.Lock()
defer tw.mu.Unlock()
tw.timeout = true
tw.FreeBuffer()
bufPool.Put(buffer)
c.Writer = w
t.response(c)
c.Writer = tw
}
}
}

View File

@ -162,8 +162,8 @@ func printIndexes(indexes []*milvuspb.IndexDescription) []gin.H {
var res []gin.H
for _, index := range indexes {
res = append(res, gin.H{
HTTPReturnIndexName: index.IndexName,
HTTPReturnIndexField: index.FieldName,
HTTPIndexName: index.IndexName,
HTTPIndexField: index.FieldName,
HTTPReturnIndexMetricsType: getMetricType(index.Params),
})
}

View File

@ -292,8 +292,8 @@ func TestPrintCollectionDetails(t *testing.T) {
}, printFields(coll.Fields))
assert.Equal(t, []gin.H{
{
HTTPReturnIndexName: DefaultIndexName,
HTTPReturnIndexField: FieldBookIntro,
HTTPIndexName: DefaultIndexName,
HTTPIndexField: FieldBookIntro,
HTTPReturnIndexMetricsType: DefaultMetricType,
},
}, printIndexes(indexes))

View File

@ -202,6 +202,8 @@ func (s *Server) startHTTPServer(errChan chan error) {
}
app := ginHandler.Group("/v1")
httpserver.NewHandlersV1(s.proxy).RegisterRoutesToV1(app)
appV2 := ginHandler.Group("/v2/vectordb")
httpserver.NewHandlersV2(s.proxy).RegisterRoutesToV2(appV2)
s.httpServer = &http.Server{Handler: ginHandler, ReadHeaderTimeout: time.Second}
errChan <- nil
if err := s.httpServer.Serve(s.httpListener); err != nil && err != cmux.ErrServerClosed {

View File

@ -904,10 +904,13 @@ func GetCurDBNameFromContextOrDefault(ctx context.Context) string {
}
func NewContextWithMetadata(ctx context.Context, username string, dbName string) context.Context {
dbKey := strings.ToLower(util.HeaderDBName)
if username == "" {
return contextutil.AppendToIncomingContext(ctx, dbKey, dbName)
}
originValue := fmt.Sprintf("%s%s%s", username, util.CredentialSeperator, username)
authKey := strings.ToLower(util.HeaderAuthorize)
authValue := crypto.Base64Encode(originValue)
dbKey := strings.ToLower(util.HeaderDBName)
return contextutil.AppendToIncomingContext(ctx, authKey, authValue, dbKey, dbName)
}