mirror of https://github.com/milvus-io/milvus.git
Add more test cases for proxy implementation (#7685)
Signed-off-by: dragondriver <jiquan.long@zilliz.com>pull/7635/head
parent
04b8160cac
commit
a872a4357c
|
@ -22,6 +22,8 @@ import (
|
|||
"sync"
|
||||
"unsafe"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/kv"
|
||||
|
@ -166,11 +168,17 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
}
|
||||
|
||||
// replace pchannel with vchannel
|
||||
for _, pos := range iMsg.startPositions {
|
||||
startPositions := make([]*internalpb.MsgPosition, 0, len(iMsg.startPositions))
|
||||
for idx := range iMsg.startPositions {
|
||||
pos := proto.Clone(iMsg.startPositions[idx]).(*internalpb.MsgPosition)
|
||||
pos.ChannelName = ibNode.channelName
|
||||
startPositions = append(startPositions, pos)
|
||||
}
|
||||
for _, pos := range iMsg.endPositions {
|
||||
endPositions := make([]*internalpb.MsgPosition, 0, len(iMsg.endPositions))
|
||||
for idx := range iMsg.endPositions {
|
||||
pos := proto.Clone(iMsg.endPositions[idx]).(*internalpb.MsgPosition)
|
||||
pos.ChannelName = ibNode.channelName
|
||||
endPositions = append(endPositions, pos)
|
||||
}
|
||||
|
||||
// Updating segment statistics
|
||||
|
@ -183,7 +191,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
|
||||
if !ibNode.replica.hasSegment(currentSegID, true) {
|
||||
err := ibNode.replica.addNewSegment(currentSegID, collID, partitionID, msg.GetChannelID(),
|
||||
iMsg.startPositions[0], iMsg.endPositions[0])
|
||||
startPositions[0], endPositions[0])
|
||||
if err != nil {
|
||||
log.Error("add segment wrong", zap.Error(err))
|
||||
}
|
||||
|
@ -605,7 +613,13 @@ func (ibNode *insertBufferNode) bufferInsertMsg(iMsg *insertMsg, msg *msgstream.
|
|||
ibNode.insertBuffer.insertData[currentSegID] = idata
|
||||
|
||||
// store current endPositions as Segment->EndPostion
|
||||
ibNode.replica.updateSegmentEndPosition(currentSegID, iMsg.endPositions[0])
|
||||
endPositions := make([]*internalpb.MsgPosition, 0, len(iMsg.endPositions))
|
||||
for idx := range iMsg.endPositions {
|
||||
pos := proto.Clone(iMsg.endPositions[idx]).(*internalpb.MsgPosition)
|
||||
pos.ChannelName = ibNode.channelName
|
||||
endPositions = append(endPositions, pos)
|
||||
}
|
||||
ibNode.replica.updateSegmentEndPosition(currentSegID, endPositions[0])
|
||||
// update segment pk filter
|
||||
ibNode.replica.updateSegmentPKRange(currentSegID, msg.GetRowIDs())
|
||||
return nil
|
||||
|
|
|
@ -126,7 +126,6 @@ func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.Creat
|
|||
Condition: NewTaskCondition(ctx),
|
||||
CreateCollectionRequest: request,
|
||||
rootCoord: node.rootCoord,
|
||||
dataCoordClient: node.dataCoord,
|
||||
}
|
||||
|
||||
log.Debug("CreateCollection enqueue",
|
||||
|
@ -935,7 +934,7 @@ func (node *Proxy) CreateIndex(ctx context.Context, request *milvuspb.CreateInde
|
|||
if !node.checkHealthy() {
|
||||
return unhealthyStatus(), nil
|
||||
}
|
||||
cit := &CreateIndexTask{
|
||||
cit := &createIndexTask{
|
||||
ctx: ctx,
|
||||
Condition: NewTaskCondition(ctx),
|
||||
CreateIndexRequest: request,
|
||||
|
@ -993,7 +992,7 @@ func (node *Proxy) DescribeIndex(ctx context.Context, request *milvuspb.Describe
|
|||
Status: unhealthyStatus(),
|
||||
}, nil
|
||||
}
|
||||
dit := &DescribeIndexTask{
|
||||
dit := &describeIndexTask{
|
||||
ctx: ctx,
|
||||
Condition: NewTaskCondition(ctx),
|
||||
DescribeIndexRequest: request,
|
||||
|
@ -1023,7 +1022,7 @@ func (node *Proxy) DescribeIndex(ctx context.Context, request *milvuspb.Describe
|
|||
zap.String("db", request.DbName),
|
||||
zap.String("collection", request.CollectionName),
|
||||
zap.String("field", request.FieldName),
|
||||
zap.String("index name", request.IndexName))
|
||||
)
|
||||
defer func() {
|
||||
log.Debug("DescribeIndex Done",
|
||||
zap.Error(err),
|
||||
|
@ -1057,7 +1056,7 @@ func (node *Proxy) DropIndex(ctx context.Context, request *milvuspb.DropIndexReq
|
|||
if !node.checkHealthy() {
|
||||
return unhealthyStatus(), nil
|
||||
}
|
||||
dit := &DropIndexTask{
|
||||
dit := &dropIndexTask{
|
||||
ctx: ctx,
|
||||
Condition: NewTaskCondition(ctx),
|
||||
DropIndexRequest: request,
|
||||
|
@ -1117,7 +1116,7 @@ func (node *Proxy) GetIndexBuildProgress(ctx context.Context, request *milvuspb.
|
|||
Status: unhealthyStatus(),
|
||||
}, nil
|
||||
}
|
||||
gibpt := &GetIndexBuildProgressTask{
|
||||
gibpt := &getIndexBuildProgressTask{
|
||||
ctx: ctx,
|
||||
Condition: NewTaskCondition(ctx),
|
||||
GetIndexBuildProgressRequest: request,
|
||||
|
@ -1183,7 +1182,7 @@ func (node *Proxy) GetIndexState(ctx context.Context, request *milvuspb.GetIndex
|
|||
Status: unhealthyStatus(),
|
||||
}, nil
|
||||
}
|
||||
dipt := &GetIndexStateTask{
|
||||
dipt := &getIndexStateTask{
|
||||
ctx: ctx,
|
||||
Condition: NewTaskCondition(ctx),
|
||||
GetIndexStateRequest: request,
|
||||
|
@ -1246,7 +1245,7 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
|
|||
Status: unhealthyStatus(),
|
||||
}, nil
|
||||
}
|
||||
it := &InsertTask{
|
||||
it := &insertTask{
|
||||
ctx: ctx,
|
||||
Condition: NewTaskCondition(ctx),
|
||||
dataCoord: node.dataCoord,
|
||||
|
@ -1312,8 +1311,6 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
|
|||
zap.String("db", request.DbName),
|
||||
zap.String("collection", request.CollectionName),
|
||||
zap.String("partition", request.PartitionName),
|
||||
zap.Any("len(RowData)", len(it.RowData)),
|
||||
zap.Any("len(RowIDs)", len(it.RowIDs)),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
|
@ -1420,7 +1417,7 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest)
|
|||
Status: unhealthyStatus(),
|
||||
}, nil
|
||||
}
|
||||
qt := &SearchTask{
|
||||
qt := &searchTask{
|
||||
ctx: ctx,
|
||||
Condition: NewTaskCondition(ctx),
|
||||
SearchRequest: &internalpb.SearchRequest{
|
||||
|
@ -1513,7 +1510,7 @@ func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*
|
|||
resp.Status.Reason = "proxy is not healthy"
|
||||
return resp, nil
|
||||
}
|
||||
ft := &FlushTask{
|
||||
ft := &flushTask{
|
||||
ctx: ctx,
|
||||
Condition: NewTaskCondition(ctx),
|
||||
FlushRequest: request,
|
||||
|
@ -1570,7 +1567,7 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*
|
|||
OutputFields: request.OutputFields,
|
||||
}
|
||||
|
||||
qt := &QueryTask{
|
||||
qt := &queryTask{
|
||||
ctx: ctx,
|
||||
Condition: NewTaskCondition(ctx),
|
||||
RetrieveRequest: &internalpb.RetrieveRequest{
|
||||
|
@ -1657,7 +1654,7 @@ func (node *Proxy) CalcDistance(ctx context.Context, request *milvuspb.CalcDista
|
|||
OutputFields: outputFields,
|
||||
}
|
||||
|
||||
qt := &QueryTask{
|
||||
qt := &queryTask{
|
||||
ctx: ctx,
|
||||
Condition: NewTaskCondition(ctx),
|
||||
RetrieveRequest: &internalpb.RetrieveRequest{
|
||||
|
|
|
@ -17,6 +17,8 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/schemapb"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/msgstream"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
|
@ -353,3 +355,201 @@ func (factory *simpleMockMsgStreamFactory) NewQueryMsgStream(ctx context.Context
|
|||
func newSimpleMockMsgStreamFactory() *simpleMockMsgStreamFactory {
|
||||
return &simpleMockMsgStreamFactory{}
|
||||
}
|
||||
|
||||
func generateBoolArray(numRows int) []bool {
|
||||
ret := make([]bool, 0, numRows)
|
||||
for i := 0; i < numRows; i++ {
|
||||
ret = append(ret, rand.Int()%2 == 0)
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func generateInt8Array(numRows int) []int8 {
|
||||
ret := make([]int8, 0, numRows)
|
||||
for i := 0; i < numRows; i++ {
|
||||
ret = append(ret, int8(rand.Int()))
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func generateInt16Array(numRows int) []int16 {
|
||||
ret := make([]int16, 0, numRows)
|
||||
for i := 0; i < numRows; i++ {
|
||||
ret = append(ret, int16(rand.Int()))
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func generateInt32Array(numRows int) []int32 {
|
||||
ret := make([]int32, 0, numRows)
|
||||
for i := 0; i < numRows; i++ {
|
||||
ret = append(ret, int32(rand.Int()))
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func generateInt64Array(numRows int) []int64 {
|
||||
ret := make([]int64, 0, numRows)
|
||||
for i := 0; i < numRows; i++ {
|
||||
ret = append(ret, int64(rand.Int()))
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func generateFloat32Array(numRows int) []float32 {
|
||||
ret := make([]float32, 0, numRows)
|
||||
for i := 0; i < numRows; i++ {
|
||||
ret = append(ret, rand.Float32())
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func generateFloat64Array(numRows int) []float64 {
|
||||
ret := make([]float64, 0, numRows)
|
||||
for i := 0; i < numRows; i++ {
|
||||
ret = append(ret, rand.Float64())
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func generateFloatVectors(numRows, dim int) []float32 {
|
||||
total := numRows * dim
|
||||
ret := make([]float32, 0, total)
|
||||
for i := 0; i < total; i++ {
|
||||
ret = append(ret, rand.Float32())
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func generateBinaryVectors(numRows, dim int) []byte {
|
||||
total := (numRows * dim) / 8
|
||||
ret := make([]byte, total)
|
||||
_, err := rand.Read(ret)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func newScalarFieldData(dType schemapb.DataType, fieldName string, numRows int) *schemapb.FieldData {
|
||||
ret := &schemapb.FieldData{
|
||||
Type: dType,
|
||||
FieldName: fieldName,
|
||||
Field: nil,
|
||||
}
|
||||
|
||||
switch dType {
|
||||
case schemapb.DataType_Bool:
|
||||
ret.Field = &schemapb.FieldData_Scalars{
|
||||
Scalars: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_BoolData{
|
||||
BoolData: &schemapb.BoolArray{
|
||||
Data: generateBoolArray(numRows),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
case schemapb.DataType_Int8:
|
||||
ret.Field = &schemapb.FieldData_Scalars{
|
||||
Scalars: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_IntData{
|
||||
IntData: &schemapb.IntArray{
|
||||
Data: generateInt32Array(numRows),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
case schemapb.DataType_Int16:
|
||||
ret.Field = &schemapb.FieldData_Scalars{
|
||||
Scalars: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_IntData{
|
||||
IntData: &schemapb.IntArray{
|
||||
Data: generateInt32Array(numRows),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
case schemapb.DataType_Int32:
|
||||
ret.Field = &schemapb.FieldData_Scalars{
|
||||
Scalars: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_IntData{
|
||||
IntData: &schemapb.IntArray{
|
||||
Data: generateInt32Array(numRows),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
case schemapb.DataType_Int64:
|
||||
ret.Field = &schemapb.FieldData_Scalars{
|
||||
Scalars: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_LongData{
|
||||
LongData: &schemapb.LongArray{
|
||||
Data: generateInt64Array(numRows),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
case schemapb.DataType_Float:
|
||||
ret.Field = &schemapb.FieldData_Scalars{
|
||||
Scalars: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_FloatData{
|
||||
FloatData: &schemapb.FloatArray{
|
||||
Data: generateFloat32Array(numRows),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
case schemapb.DataType_Double:
|
||||
ret.Field = &schemapb.FieldData_Scalars{
|
||||
Scalars: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_DoubleData{
|
||||
DoubleData: &schemapb.DoubleArray{
|
||||
Data: generateFloat64Array(numRows),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
return ret
|
||||
}
|
||||
|
||||
func newFloatVectorFieldData(fieldName string, numRows, dim int) *schemapb.FieldData {
|
||||
return &schemapb.FieldData{
|
||||
Type: schemapb.DataType_FloatVector,
|
||||
FieldName: fieldName,
|
||||
Field: &schemapb.FieldData_Vectors{
|
||||
Vectors: &schemapb.VectorField{
|
||||
Dim: int64(dim),
|
||||
Data: &schemapb.VectorField_FloatVector{
|
||||
FloatVector: &schemapb.FloatArray{
|
||||
Data: generateFloatVectors(numRows, dim),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newBinaryVectorFieldData(fieldName string, numRows, dim int) *schemapb.FieldData {
|
||||
return &schemapb.FieldData{
|
||||
Type: schemapb.DataType_BinaryVector,
|
||||
FieldName: fieldName,
|
||||
Field: &schemapb.FieldData_Vectors{
|
||||
Vectors: &schemapb.VectorField{
|
||||
Dim: int64(dim),
|
||||
Data: &schemapb.VectorField_BinaryVector{
|
||||
BinaryVector: generateBinaryVectors(numRows, dim),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func generateHashKeys(numRows int) []uint32 {
|
||||
ret := make([]uint32, 0, numRows)
|
||||
for i := 0; i < numRows; i++ {
|
||||
ret = append(ret, rand.Uint32())
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
|
|
@ -1,13 +1,24 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/metricsinfo"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/distance"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/proxypb"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
|
@ -466,6 +477,14 @@ func TestProxy(t *testing.T) {
|
|||
int64Field := "int64"
|
||||
floatVecField := "fVec"
|
||||
dim := 128
|
||||
rowNum := 3000
|
||||
indexName := "_default"
|
||||
nlist := 10
|
||||
nprobe := 10
|
||||
topk := 10
|
||||
nq := 10
|
||||
expr := fmt.Sprintf("%s > 0", int64Field)
|
||||
var segmentIDs []int64
|
||||
|
||||
// an int64 field (pk) & a float vector field
|
||||
constructCollectionSchema := func() *schemapb.CollectionSchema {
|
||||
|
@ -519,6 +538,113 @@ func TestProxy(t *testing.T) {
|
|||
}
|
||||
createCollectionReq := constructCreateCollectionRequest()
|
||||
|
||||
constructInsertRequest := func() *milvuspb.InsertRequest {
|
||||
fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim)
|
||||
hashKeys := generateHashKeys(rowNum)
|
||||
return &milvuspb.InsertRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
PartitionName: "",
|
||||
FieldsData: []*schemapb.FieldData{fVecColumn},
|
||||
HashKeys: hashKeys,
|
||||
NumRows: uint32(rowNum),
|
||||
}
|
||||
}
|
||||
|
||||
constructCreateIndexRequest := func() *milvuspb.CreateIndexRequest {
|
||||
return &milvuspb.CreateIndexRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
FieldName: floatVecField,
|
||||
ExtraParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: "dim",
|
||||
Value: strconv.Itoa(dim),
|
||||
},
|
||||
{
|
||||
Key: MetricTypeKey,
|
||||
Value: distance.L2,
|
||||
},
|
||||
{
|
||||
Key: "index_type",
|
||||
Value: "IVF_FLAT",
|
||||
},
|
||||
{
|
||||
Key: "nlist",
|
||||
Value: strconv.Itoa(nlist),
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
constructPlaceholderGroup := func() *milvuspb.PlaceholderGroup {
|
||||
values := make([][]byte, 0, nq)
|
||||
for i := 0; i < nq; i++ {
|
||||
bs := make([]byte, 0, dim*4)
|
||||
for j := 0; j < dim; j++ {
|
||||
var buffer bytes.Buffer
|
||||
f := rand.Float32()
|
||||
err := binary.Write(&buffer, binary.LittleEndian, f)
|
||||
assert.NoError(t, err)
|
||||
bs = append(bs, buffer.Bytes()...)
|
||||
}
|
||||
values = append(values, bs)
|
||||
}
|
||||
|
||||
return &milvuspb.PlaceholderGroup{
|
||||
Placeholders: []*milvuspb.PlaceholderValue{
|
||||
{
|
||||
Tag: "$0",
|
||||
Type: milvuspb.PlaceholderType_FloatVector,
|
||||
Values: values,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
constructSearchRequest := func() *milvuspb.SearchRequest {
|
||||
params := make(map[string]string)
|
||||
params["nprobe"] = strconv.Itoa(nprobe)
|
||||
b, err := json.Marshal(params)
|
||||
assert.NoError(t, err)
|
||||
plg := constructPlaceholderGroup()
|
||||
plgBs, err := proto.Marshal(plg)
|
||||
assert.NoError(t, err)
|
||||
|
||||
return &milvuspb.SearchRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
PartitionNames: nil,
|
||||
Dsl: expr,
|
||||
PlaceholderGroup: plgBs,
|
||||
DslType: commonpb.DslType_BoolExprV1,
|
||||
OutputFields: nil,
|
||||
SearchParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: MetricTypeKey,
|
||||
Value: distance.L2,
|
||||
},
|
||||
{
|
||||
Key: SearchParamsKey,
|
||||
Value: string(b),
|
||||
},
|
||||
{
|
||||
Key: AnnsFieldKey,
|
||||
Value: floatVecField,
|
||||
},
|
||||
{
|
||||
Key: TopKKey,
|
||||
Value: strconv.Itoa(topk),
|
||||
},
|
||||
},
|
||||
TravelTimestamp: 0,
|
||||
GuaranteeTimestamp: 0,
|
||||
}
|
||||
}
|
||||
|
||||
t.Run("create collection", func(t *testing.T) {
|
||||
req := createCollectionReq
|
||||
resp, err := proxy.CreateCollection(ctx, req)
|
||||
|
@ -555,62 +681,6 @@ func TestProxy(t *testing.T) {
|
|||
assert.False(t, resp.Value)
|
||||
})
|
||||
|
||||
t.Run("load collection", func(t *testing.T) {
|
||||
resp, err := proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||
|
||||
// load other collection -> fail
|
||||
resp, err = proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: otherCollectionName,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("show in-memory collections", func(t *testing.T) {
|
||||
resp, err := proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
TimeStamp: 0,
|
||||
Type: milvuspb.ShowType_InMemory,
|
||||
CollectionNames: nil,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
assert.Equal(t, 1, len(resp.CollectionNames))
|
||||
|
||||
// get in-memory percentage
|
||||
resp, err = proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
TimeStamp: 0,
|
||||
Type: milvuspb.ShowType_InMemory,
|
||||
CollectionNames: []string{collectionName},
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
assert.Equal(t, 1, len(resp.CollectionNames))
|
||||
assert.Equal(t, 1, len(resp.InMemoryPercentages))
|
||||
|
||||
// get in-memory percentage of not loaded collection -> fail
|
||||
resp, err = proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
TimeStamp: 0,
|
||||
Type: milvuspb.ShowType_InMemory,
|
||||
CollectionNames: []string{otherCollectionName},
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("describe collection", func(t *testing.T) {
|
||||
collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName)
|
||||
assert.NoError(t, err)
|
||||
|
@ -737,6 +807,166 @@ func TestProxy(t *testing.T) {
|
|||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("get partition statistics", func(t *testing.T) {
|
||||
resp, err := proxy.GetPartitionStatistics(ctx, &milvuspb.GetPartitionStatisticsRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
PartitionName: partitionName,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
|
||||
// non-exist partition -> fail
|
||||
resp, err = proxy.GetPartitionStatistics(ctx, &milvuspb.GetPartitionStatisticsRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
PartitionName: otherPartitionName,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
|
||||
// non-exist collection -> fail
|
||||
resp, err = proxy.GetPartitionStatistics(ctx, &milvuspb.GetPartitionStatisticsRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: otherCollectionName,
|
||||
PartitionName: partitionName,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("show partitions", func(t *testing.T) {
|
||||
collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName)
|
||||
assert.NoError(t, err)
|
||||
|
||||
resp, err := proxy.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
CollectionID: collectionID,
|
||||
PartitionNames: nil,
|
||||
Type: milvuspb.ShowType_All,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
// default partition
|
||||
assert.Equal(t, 2, len(resp.PartitionNames))
|
||||
|
||||
// non-exist collection -> fail
|
||||
resp, err = proxy.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: otherCollectionName,
|
||||
CollectionID: collectionID + 1,
|
||||
PartitionNames: nil,
|
||||
Type: milvuspb.ShowType_All,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("insert", func(t *testing.T) {
|
||||
req := constructInsertRequest()
|
||||
|
||||
resp, err := proxy.Insert(ctx, req)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
assert.Equal(t, rowNum, len(resp.SuccIndex))
|
||||
assert.Equal(t, 0, len(resp.ErrIndex))
|
||||
assert.Equal(t, int64(rowNum), resp.InsertCnt)
|
||||
})
|
||||
|
||||
// TODO(dragondriver): proxy.Delete()
|
||||
|
||||
t.Run("create index", func(t *testing.T) {
|
||||
req := constructCreateIndexRequest()
|
||||
|
||||
resp, err := proxy.CreateIndex(ctx, req)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("describe index", func(t *testing.T) {
|
||||
resp, err := proxy.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
FieldName: floatVecField,
|
||||
IndexName: "",
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
indexName = resp.IndexDescriptions[0].IndexName
|
||||
})
|
||||
|
||||
t.Run("get index build progress", func(t *testing.T) {
|
||||
resp, err := proxy.GetIndexBuildProgress(ctx, &milvuspb.GetIndexBuildProgressRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
FieldName: floatVecField,
|
||||
IndexName: indexName,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("get index state", func(t *testing.T) {
|
||||
resp, err := proxy.GetIndexState(ctx, &milvuspb.GetIndexStateRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
FieldName: floatVecField,
|
||||
IndexName: indexName,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("flush", func(t *testing.T) {
|
||||
resp, err := proxy.Flush(ctx, &milvuspb.FlushRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionNames: []string{collectionName},
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
segmentIDs = resp.CollSegIDs[collectionName].Data
|
||||
|
||||
f := func() bool {
|
||||
states := make(map[int64]commonpb.SegmentState) // segment id -> segment state
|
||||
for _, id := range segmentIDs {
|
||||
states[id] = commonpb.SegmentState_Sealed
|
||||
}
|
||||
resp, err := proxy.GetPersistentSegmentInfo(ctx, &milvuspb.GetPersistentSegmentInfoRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
for _, info := range resp.Infos {
|
||||
states[info.SegmentID] = info.State
|
||||
}
|
||||
for id, state := range states {
|
||||
if state != commonpb.SegmentState_Flushed {
|
||||
log.Debug("waiting for segment to be flushed",
|
||||
zap.Int64("segment id", id),
|
||||
zap.Any("state", state))
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// waiting for flush operation to be done
|
||||
for f() {
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("load partitions", func(t *testing.T) {
|
||||
resp, err := proxy.LoadPartitions(ctx, &milvuspb.LoadPartitionsRequest{
|
||||
Base: nil,
|
||||
|
@ -810,65 +1040,202 @@ func TestProxy(t *testing.T) {
|
|||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("get partition statistics", func(t *testing.T) {
|
||||
resp, err := proxy.GetPartitionStatistics(ctx, &milvuspb.GetPartitionStatisticsRequest{
|
||||
t.Run("load collection", func(t *testing.T) {
|
||||
resp, err := proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
PartitionName: partitionName,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||
|
||||
// non-exist partition -> fail
|
||||
resp, err = proxy.GetPartitionStatistics(ctx, &milvuspb.GetPartitionStatisticsRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
PartitionName: otherPartitionName,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
|
||||
// non-exist collection -> fail
|
||||
resp, err = proxy.GetPartitionStatistics(ctx, &milvuspb.GetPartitionStatisticsRequest{
|
||||
// load other collection -> fail
|
||||
resp, err = proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: otherCollectionName,
|
||||
PartitionName: partitionName,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||
|
||||
f := func() bool {
|
||||
resp, err := proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
TimeStamp: 0,
|
||||
Type: milvuspb.ShowType_InMemory,
|
||||
CollectionNames: []string{collectionName},
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
|
||||
for idx, name := range resp.CollectionNames {
|
||||
if name == collectionName && resp.InMemoryPercentages[idx] == 100 {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// waiting for collection to be loaded
|
||||
for f() {
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("show in-memory collections", func(t *testing.T) {
|
||||
resp, err := proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
TimeStamp: 0,
|
||||
Type: milvuspb.ShowType_InMemory,
|
||||
CollectionNames: nil,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
assert.Equal(t, 1, len(resp.CollectionNames))
|
||||
|
||||
// get in-memory percentage
|
||||
resp, err = proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
TimeStamp: 0,
|
||||
Type: milvuspb.ShowType_InMemory,
|
||||
CollectionNames: []string{collectionName},
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
assert.Equal(t, 1, len(resp.CollectionNames))
|
||||
assert.Equal(t, 1, len(resp.InMemoryPercentages))
|
||||
|
||||
// get in-memory percentage of not loaded collection -> fail
|
||||
resp, err = proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
TimeStamp: 0,
|
||||
Type: milvuspb.ShowType_InMemory,
|
||||
CollectionNames: []string{otherCollectionName},
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("show partitions", func(t *testing.T) {
|
||||
collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName)
|
||||
assert.NoError(t, err)
|
||||
t.Run("search", func(t *testing.T) {
|
||||
req := constructSearchRequest()
|
||||
|
||||
resp, err := proxy.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
|
||||
//resp, err := proxy.Search(ctx, req)
|
||||
_, err := proxy.Search(ctx, req)
|
||||
assert.NoError(t, err)
|
||||
// FIXME(dragondriver)
|
||||
// assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
// TODO(dragondriver): compare search result
|
||||
})
|
||||
|
||||
t.Run("query", func(t *testing.T) {
|
||||
//resp, err := proxy.Query(ctx, &milvuspb.QueryRequest{
|
||||
_, err := proxy.Query(ctx, &milvuspb.QueryRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
Expr: expr,
|
||||
OutputFields: nil,
|
||||
PartitionNames: nil,
|
||||
TravelTimestamp: 0,
|
||||
GuaranteeTimestamp: 0,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
// FIXME(dragondriver)
|
||||
// assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
// TODO(dragondriver): compare query result
|
||||
})
|
||||
|
||||
t.Run("calculate distance", func(t *testing.T) {
|
||||
opLeft := &milvuspb.VectorsArray{
|
||||
Array: &milvuspb.VectorsArray_DataArray{
|
||||
DataArray: &schemapb.VectorField{
|
||||
Dim: int64(dim),
|
||||
Data: &schemapb.VectorField_FloatVector{
|
||||
FloatVector: &schemapb.FloatArray{
|
||||
Data: generateFloatVectors(nq, dim),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
opRight := &milvuspb.VectorsArray{
|
||||
Array: &milvuspb.VectorsArray_DataArray{
|
||||
DataArray: &schemapb.VectorField{
|
||||
Dim: int64(dim),
|
||||
Data: &schemapb.VectorField_FloatVector{
|
||||
FloatVector: &schemapb.FloatArray{
|
||||
Data: generateFloatVectors(nq, dim),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
//resp, err := proxy.CalcDistance(ctx, &milvuspb.CalcDistanceRequest{
|
||||
_, err := proxy.CalcDistance(ctx, &milvuspb.CalcDistanceRequest{
|
||||
Base: nil,
|
||||
OpLeft: opLeft,
|
||||
OpRight: opRight,
|
||||
Params: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: MetricTypeKey,
|
||||
Value: distance.L2,
|
||||
},
|
||||
},
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
// assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
// TODO(dragondriver): compare distance
|
||||
|
||||
// TODO(dragondriver): use primary key to calculate distance
|
||||
})
|
||||
|
||||
t.Run("get dd channel", func(t *testing.T) {
|
||||
f := func() {
|
||||
_, _ = proxy.GetDdChannel(ctx, &internalpb.GetDdChannelRequest{})
|
||||
}
|
||||
assert.Panics(t, f)
|
||||
})
|
||||
|
||||
t.Run("get persistent segment info", func(t *testing.T) {
|
||||
resp, err := proxy.GetPersistentSegmentInfo(ctx, &milvuspb.GetPersistentSegmentInfoRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
CollectionID: collectionID,
|
||||
PartitionNames: nil,
|
||||
Type: milvuspb.ShowType_All,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
// default partition
|
||||
assert.Equal(t, 2, len(resp.PartitionNames))
|
||||
})
|
||||
|
||||
// non-exist collection -> fail
|
||||
resp, err = proxy.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
|
||||
t.Run("get query segment info", func(t *testing.T) {
|
||||
resp, err := proxy.GetQuerySegmentInfo(ctx, &milvuspb.GetQuerySegmentInfoRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: otherCollectionName,
|
||||
CollectionID: collectionID + 1,
|
||||
PartitionNames: nil,
|
||||
Type: milvuspb.ShowType_All,
|
||||
CollectionName: collectionName,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
// TODO(dragondriver): dummy
|
||||
|
||||
t.Run("register link", func(t *testing.T) {
|
||||
resp, err := proxy.RegisterLink(ctx, &milvuspb.RegisterLinkRequest{})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("get metrics", func(t *testing.T) {
|
||||
req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)
|
||||
assert.NoError(t, err)
|
||||
resp, err := proxy.GetMetrics(ctx, req)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("release partition", func(t *testing.T) {
|
||||
|
@ -1025,6 +1392,18 @@ func TestProxy(t *testing.T) {
|
|||
assert.Equal(t, 0, len(resp.CollectionNames))
|
||||
})
|
||||
|
||||
t.Run("drop index", func(t *testing.T) {
|
||||
resp, err := proxy.DropIndex(ctx, &milvuspb.DropIndexRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
FieldName: floatVecField,
|
||||
IndexName: indexName,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("drop collection", func(t *testing.T) {
|
||||
collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName)
|
||||
assert.NoError(t, err)
|
||||
|
|
|
@ -0,0 +1,646 @@
|
|||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/util/metricsinfo"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/uniquegenerator"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/milvuserrors"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/schemapb"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
||||
"github.com/milvus-io/milvus/internal/proto/proxypb"
|
||||
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
|
||||
)
|
||||
|
||||
type collectionMeta struct {
|
||||
name string
|
||||
id typeutil.UniqueID
|
||||
schema *schemapb.CollectionSchema
|
||||
shardsNum int32
|
||||
virtualChannelNames []string
|
||||
physicalChannelNames []string
|
||||
createdTimestamp uint64
|
||||
createdUtcTimestamp uint64
|
||||
}
|
||||
|
||||
type partitionMeta struct {
|
||||
createdTimestamp uint64
|
||||
createdUtcTimestamp uint64
|
||||
}
|
||||
|
||||
type partitionMap struct {
|
||||
collID typeutil.UniqueID
|
||||
// naive inverted index
|
||||
partitionName2ID map[string]typeutil.UniqueID
|
||||
partitionID2Name map[typeutil.UniqueID]string
|
||||
partitionID2Meta map[typeutil.UniqueID]partitionMeta
|
||||
}
|
||||
|
||||
type RootCoordMock struct {
|
||||
nodeID typeutil.UniqueID
|
||||
address string
|
||||
|
||||
state atomic.Value // internal.StateCode
|
||||
|
||||
statisticsChannel string
|
||||
timeTickChannel string
|
||||
|
||||
// naive inverted index
|
||||
collName2ID map[string]typeutil.UniqueID
|
||||
collID2Meta map[typeutil.UniqueID]collectionMeta
|
||||
collMtx sync.RWMutex
|
||||
|
||||
// TODO(dragondriver): need default partition?
|
||||
collID2Partitions map[typeutil.UniqueID]partitionMap
|
||||
partitionMtx sync.RWMutex
|
||||
|
||||
// TODO(dragondriver): index-related
|
||||
|
||||
// TODO(dragondriver): segment-related
|
||||
|
||||
// TODO(dragondriver): TimeTick-related
|
||||
|
||||
lastTs typeutil.Timestamp
|
||||
lastTsMtx sync.Mutex
|
||||
}
|
||||
|
||||
func (coord *RootCoordMock) updateState(state internalpb.StateCode) {
|
||||
coord.state.Store(state)
|
||||
}
|
||||
|
||||
func (coord *RootCoordMock) getState() internalpb.StateCode {
|
||||
return coord.state.Load().(internalpb.StateCode)
|
||||
}
|
||||
|
||||
func (coord *RootCoordMock) Init() error {
|
||||
coord.updateState(internalpb.StateCode_Initializing)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (coord *RootCoordMock) Start() error {
|
||||
defer coord.updateState(internalpb.StateCode_Healthy)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (coord *RootCoordMock) Stop() error {
|
||||
defer coord.updateState(internalpb.StateCode_Abnormal)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (coord *RootCoordMock) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
|
||||
return &internalpb.ComponentStates{
|
||||
State: &internalpb.ComponentInfo{
|
||||
NodeID: coord.nodeID,
|
||||
Role: typeutil.RootCoordRole,
|
||||
StateCode: coord.getState(),
|
||||
ExtraInfo: nil,
|
||||
},
|
||||
SubcomponentStates: nil,
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (coord *RootCoordMock) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
||||
return &milvuspb.StringResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
},
|
||||
Value: coord.statisticsChannel,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (coord *RootCoordMock) Register() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (coord *RootCoordMock) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
||||
return &milvuspb.StringResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
},
|
||||
Value: coord.timeTickChannel,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (coord *RootCoordMock) CreateCollection(ctx context.Context, req *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
|
||||
coord.collMtx.Lock()
|
||||
defer coord.collMtx.Unlock()
|
||||
|
||||
_, exist := coord.collName2ID[req.CollectionName]
|
||||
if exist {
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: milvuserrors.MsgCollectionAlreadyExist(req.CollectionName),
|
||||
}, nil
|
||||
}
|
||||
|
||||
var schema schemapb.CollectionSchema
|
||||
err := proto.Unmarshal(req.Schema, &schema)
|
||||
if err != nil {
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: fmt.Sprintf("failed to parse schema, error: %v", err),
|
||||
}, nil
|
||||
}
|
||||
|
||||
collID := typeutil.UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
|
||||
coord.collName2ID[req.CollectionName] = collID
|
||||
|
||||
var shardsNum int32
|
||||
if req.ShardsNum <= 0 {
|
||||
shardsNum = 2
|
||||
} else {
|
||||
shardsNum = req.ShardsNum
|
||||
}
|
||||
|
||||
virtualChannelNames := make([]string, 0, shardsNum)
|
||||
physicalChannelNames := make([]string, 0, shardsNum)
|
||||
for i := 0; i < int(shardsNum); i++ {
|
||||
virtualChannelNames = append(virtualChannelNames, funcutil.GenRandomStr())
|
||||
physicalChannelNames = append(physicalChannelNames, funcutil.GenRandomStr())
|
||||
}
|
||||
|
||||
ts := uint64(time.Now().Nanosecond())
|
||||
|
||||
coord.collID2Meta[collID] = collectionMeta{
|
||||
name: req.CollectionName,
|
||||
id: collID,
|
||||
schema: &schema,
|
||||
shardsNum: shardsNum,
|
||||
virtualChannelNames: virtualChannelNames,
|
||||
physicalChannelNames: physicalChannelNames,
|
||||
createdTimestamp: ts,
|
||||
createdUtcTimestamp: ts,
|
||||
}
|
||||
|
||||
coord.partitionMtx.Lock()
|
||||
defer coord.partitionMtx.Unlock()
|
||||
|
||||
coord.collID2Partitions[collID] = partitionMap{
|
||||
collID: collID,
|
||||
partitionName2ID: make(map[string]typeutil.UniqueID),
|
||||
partitionID2Name: make(map[typeutil.UniqueID]string),
|
||||
partitionID2Meta: make(map[typeutil.UniqueID]partitionMeta),
|
||||
}
|
||||
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (coord *RootCoordMock) DropCollection(ctx context.Context, req *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
|
||||
coord.collMtx.Lock()
|
||||
defer coord.collMtx.Unlock()
|
||||
|
||||
collID, exist := coord.collName2ID[req.CollectionName]
|
||||
if !exist {
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_CollectionNotExists,
|
||||
Reason: milvuserrors.MsgCollectionNotExist(req.CollectionName),
|
||||
}, nil
|
||||
}
|
||||
|
||||
delete(coord.collName2ID, req.CollectionName)
|
||||
|
||||
delete(coord.collID2Meta, collID)
|
||||
|
||||
coord.partitionMtx.Lock()
|
||||
defer coord.partitionMtx.Unlock()
|
||||
|
||||
delete(coord.collID2Partitions, collID)
|
||||
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (coord *RootCoordMock) HasCollection(ctx context.Context, req *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
|
||||
coord.collMtx.RLock()
|
||||
defer coord.collMtx.RUnlock()
|
||||
|
||||
_, exist := coord.collName2ID[req.CollectionName]
|
||||
|
||||
return &milvuspb.BoolResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
},
|
||||
Value: exist,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (coord *RootCoordMock) DescribeCollection(ctx context.Context, req *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
|
||||
coord.collMtx.RLock()
|
||||
defer coord.collMtx.RUnlock()
|
||||
|
||||
collID, exist := coord.collName2ID[req.CollectionName]
|
||||
if !exist {
|
||||
return &milvuspb.DescribeCollectionResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_CollectionNotExists,
|
||||
Reason: milvuserrors.MsgCollectionNotExist(req.CollectionName),
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
meta := coord.collID2Meta[collID]
|
||||
|
||||
return &milvuspb.DescribeCollectionResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
},
|
||||
Schema: meta.schema,
|
||||
CollectionID: collID,
|
||||
VirtualChannelNames: meta.virtualChannelNames,
|
||||
PhysicalChannelNames: meta.physicalChannelNames,
|
||||
CreatedTimestamp: meta.createdUtcTimestamp,
|
||||
CreatedUtcTimestamp: meta.createdUtcTimestamp,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (coord *RootCoordMock) ShowCollections(ctx context.Context, req *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
|
||||
coord.collMtx.RLock()
|
||||
defer coord.collMtx.RUnlock()
|
||||
|
||||
coord.collMtx.RLock()
|
||||
defer coord.collMtx.RUnlock()
|
||||
|
||||
names := make([]string, 0, len(coord.collName2ID))
|
||||
ids := make([]int64, 0, len(coord.collName2ID))
|
||||
createdTimestamps := make([]uint64, 0, len(coord.collName2ID))
|
||||
createdUtcTimestamps := make([]uint64, 0, len(coord.collName2ID))
|
||||
|
||||
for name, id := range coord.collName2ID {
|
||||
names = append(names, name)
|
||||
ids = append(ids, id)
|
||||
meta := coord.collID2Meta[id]
|
||||
createdTimestamps = append(createdTimestamps, meta.createdTimestamp)
|
||||
createdUtcTimestamps = append(createdUtcTimestamps, meta.createdUtcTimestamp)
|
||||
}
|
||||
|
||||
return &milvuspb.ShowCollectionsResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
},
|
||||
CollectionNames: names,
|
||||
CollectionIds: ids,
|
||||
CreatedTimestamps: createdTimestamps,
|
||||
CreatedUtcTimestamps: createdUtcTimestamps,
|
||||
InMemoryPercentages: nil,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (coord *RootCoordMock) CreatePartition(ctx context.Context, req *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
|
||||
coord.collMtx.RLock()
|
||||
defer coord.collMtx.RUnlock()
|
||||
|
||||
collID, exist := coord.collName2ID[req.CollectionName]
|
||||
if !exist {
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_CollectionNotExists,
|
||||
Reason: milvuserrors.MsgCollectionNotExist(req.CollectionName),
|
||||
}, nil
|
||||
}
|
||||
|
||||
coord.partitionMtx.Lock()
|
||||
defer coord.partitionMtx.Unlock()
|
||||
|
||||
_, partitionExist := coord.collID2Partitions[collID].partitionName2ID[req.PartitionName]
|
||||
if partitionExist {
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: milvuserrors.MsgPartitionAlreadyExist(req.PartitionName),
|
||||
}, nil
|
||||
}
|
||||
|
||||
ts := uint64(time.Now().Nanosecond())
|
||||
|
||||
partitionID := typeutil.UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
|
||||
coord.collID2Partitions[collID].partitionName2ID[req.PartitionName] = partitionID
|
||||
coord.collID2Partitions[collID].partitionID2Name[partitionID] = req.PartitionName
|
||||
coord.collID2Partitions[collID].partitionID2Meta[partitionID] = partitionMeta{
|
||||
createdTimestamp: ts,
|
||||
createdUtcTimestamp: ts,
|
||||
}
|
||||
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (coord *RootCoordMock) DropPartition(ctx context.Context, req *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
|
||||
coord.collMtx.RLock()
|
||||
defer coord.collMtx.RUnlock()
|
||||
|
||||
collID, exist := coord.collName2ID[req.CollectionName]
|
||||
if !exist {
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_CollectionNotExists,
|
||||
Reason: milvuserrors.MsgCollectionNotExist(req.CollectionName),
|
||||
}, nil
|
||||
}
|
||||
|
||||
coord.partitionMtx.Lock()
|
||||
defer coord.partitionMtx.Unlock()
|
||||
|
||||
partitionID, partitionExist := coord.collID2Partitions[collID].partitionName2ID[req.PartitionName]
|
||||
if !partitionExist {
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: milvuserrors.MsgPartitionNotExist(req.PartitionName),
|
||||
}, nil
|
||||
}
|
||||
|
||||
delete(coord.collID2Partitions[collID].partitionName2ID, req.PartitionName)
|
||||
delete(coord.collID2Partitions[collID].partitionID2Name, partitionID)
|
||||
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (coord *RootCoordMock) HasPartition(ctx context.Context, req *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
|
||||
coord.collMtx.RLock()
|
||||
defer coord.collMtx.RUnlock()
|
||||
|
||||
collID, exist := coord.collName2ID[req.CollectionName]
|
||||
if !exist {
|
||||
return &milvuspb.BoolResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_CollectionNotExists,
|
||||
Reason: milvuserrors.MsgCollectionNotExist(req.CollectionName),
|
||||
},
|
||||
Value: false,
|
||||
}, nil
|
||||
}
|
||||
|
||||
coord.partitionMtx.RLock()
|
||||
defer coord.partitionMtx.RUnlock()
|
||||
|
||||
_, partitionExist := coord.collID2Partitions[collID].partitionName2ID[req.PartitionName]
|
||||
return &milvuspb.BoolResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
},
|
||||
Value: partitionExist,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (coord *RootCoordMock) ShowPartitions(ctx context.Context, req *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
|
||||
coord.collMtx.RLock()
|
||||
defer coord.collMtx.RUnlock()
|
||||
|
||||
collID, exist := coord.collName2ID[req.CollectionName]
|
||||
if !exist {
|
||||
return &milvuspb.ShowPartitionsResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_CollectionNotExists,
|
||||
Reason: milvuserrors.MsgCollectionNotExist(req.CollectionName),
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
coord.partitionMtx.RLock()
|
||||
defer coord.partitionMtx.RUnlock()
|
||||
|
||||
l := len(coord.collID2Partitions[collID].partitionName2ID)
|
||||
|
||||
names := make([]string, 0, l)
|
||||
ids := make([]int64, 0, l)
|
||||
createdTimestamps := make([]uint64, 0, l)
|
||||
createdUtcTimestamps := make([]uint64, 0, l)
|
||||
|
||||
for name, id := range coord.collID2Partitions[collID].partitionName2ID {
|
||||
names = append(names, name)
|
||||
ids = append(ids, id)
|
||||
meta := coord.collID2Partitions[collID].partitionID2Meta[id]
|
||||
createdTimestamps = append(createdTimestamps, meta.createdTimestamp)
|
||||
createdUtcTimestamps = append(createdUtcTimestamps, meta.createdUtcTimestamp)
|
||||
}
|
||||
|
||||
return &milvuspb.ShowPartitionsResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
},
|
||||
PartitionNames: names,
|
||||
PartitionIDs: ids,
|
||||
CreatedTimestamps: createdTimestamps,
|
||||
CreatedUtcTimestamps: createdUtcTimestamps,
|
||||
InMemoryPercentages: nil,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (coord *RootCoordMock) CreateIndex(ctx context.Context, req *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (coord *RootCoordMock) DescribeIndex(ctx context.Context, req *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
|
||||
return &milvuspb.DescribeIndexResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
},
|
||||
IndexDescriptions: nil,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (coord *RootCoordMock) DropIndex(ctx context.Context, req *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (coord *RootCoordMock) AllocTimestamp(ctx context.Context, req *rootcoordpb.AllocTimestampRequest) (*rootcoordpb.AllocTimestampResponse, error) {
|
||||
coord.lastTsMtx.Lock()
|
||||
defer coord.lastTsMtx.Unlock()
|
||||
|
||||
ts := uint64(time.Now().UnixNano())
|
||||
if ts < coord.lastTs+typeutil.Timestamp(req.Count) {
|
||||
ts = coord.lastTs + typeutil.Timestamp(req.Count)
|
||||
}
|
||||
|
||||
coord.lastTs = ts
|
||||
return &rootcoordpb.AllocTimestampResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
},
|
||||
Timestamp: ts,
|
||||
Count: req.Count,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (coord *RootCoordMock) AllocID(ctx context.Context, req *rootcoordpb.AllocIDRequest) (*rootcoordpb.AllocIDResponse, error) {
|
||||
begin, _ := uniquegenerator.GetUniqueIntGeneratorIns().GetInts(int(req.Count))
|
||||
return &rootcoordpb.AllocIDResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
},
|
||||
ID: int64(begin),
|
||||
Count: req.Count,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (coord *RootCoordMock) UpdateChannelTimeTick(ctx context.Context, req *internalpb.ChannelTimeTickMsg) (*commonpb.Status, error) {
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (coord *RootCoordMock) DescribeSegment(ctx context.Context, req *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) {
|
||||
return &milvuspb.DescribeSegmentResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
},
|
||||
IndexID: 0,
|
||||
BuildID: 0,
|
||||
EnableIndex: false,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (coord *RootCoordMock) ShowSegments(ctx context.Context, req *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error) {
|
||||
return &milvuspb.ShowSegmentsResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
},
|
||||
SegmentIDs: nil,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (coord *RootCoordMock) ReleaseDQLMessageStream(ctx context.Context, in *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error) {
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (coord *RootCoordMock) SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlushCompletedMsg) (*commonpb.Status, error) {
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (coord *RootCoordMock) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
|
||||
rootCoordTopology := metricsinfo.RootCoordTopology{
|
||||
Self: metricsinfo.RootCoordInfos{
|
||||
BaseComponentInfos: metricsinfo.BaseComponentInfos{
|
||||
Name: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, coord.nodeID),
|
||||
HardwareInfos: metricsinfo.HardwareMetrics{
|
||||
IP: coord.address,
|
||||
CPUCoreCount: metricsinfo.GetCPUCoreCount(false),
|
||||
CPUCoreUsage: metricsinfo.GetCPUUsage(),
|
||||
Memory: metricsinfo.GetMemoryCount(),
|
||||
MemoryUsage: metricsinfo.GetUsedMemoryCount(),
|
||||
Disk: metricsinfo.GetDiskCount(),
|
||||
DiskUsage: metricsinfo.GetDiskUsage(),
|
||||
},
|
||||
SystemInfo: metricsinfo.DeployMetrics{
|
||||
SystemVersion: os.Getenv(metricsinfo.GitCommitEnvKey),
|
||||
DeployMode: os.Getenv(metricsinfo.DeployModeEnvKey),
|
||||
},
|
||||
// TODO(dragondriver): CreatedTime & UpdatedTime, easy but time-costing
|
||||
Type: typeutil.RootCoordRole,
|
||||
},
|
||||
SystemConfigurations: metricsinfo.RootCoordConfiguration{
|
||||
MinSegmentSizeToEnableIndex: 100,
|
||||
},
|
||||
},
|
||||
Connections: metricsinfo.ConnTopology{
|
||||
Name: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, coord.nodeID),
|
||||
// TODO(dragondriver): fill ConnectedComponents if necessary
|
||||
ConnectedComponents: []metricsinfo.ConnectionInfo{},
|
||||
},
|
||||
}
|
||||
|
||||
resp, err := metricsinfo.MarshalTopology(rootCoordTopology)
|
||||
if err != nil {
|
||||
log.Warn("Failed to marshal system info metrics of root coordinator",
|
||||
zap.Error(err))
|
||||
|
||||
return &milvuspb.GetMetricsResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: err.Error(),
|
||||
},
|
||||
Response: "",
|
||||
ComponentName: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, coord.nodeID),
|
||||
}, nil
|
||||
}
|
||||
|
||||
return &milvuspb.GetMetricsResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
},
|
||||
Response: resp,
|
||||
ComponentName: metricsinfo.ConstructComponentName(typeutil.RootCoordRole, coord.nodeID),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func NewRootCoordMock() *RootCoordMock {
|
||||
return &RootCoordMock{
|
||||
nodeID: typeutil.UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()),
|
||||
address: funcutil.GenRandomStr(), // TODO(dragondriver): random address
|
||||
statisticsChannel: funcutil.GenRandomStr(),
|
||||
timeTickChannel: funcutil.GenRandomStr(),
|
||||
collName2ID: make(map[string]typeutil.UniqueID),
|
||||
collID2Meta: make(map[typeutil.UniqueID]collectionMeta),
|
||||
collID2Partitions: make(map[typeutil.UniqueID]partitionMap),
|
||||
lastTs: typeutil.Timestamp(time.Now().UnixNano()),
|
||||
}
|
||||
}
|
|
@ -48,12 +48,12 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
InsertTaskName = "InsertTask"
|
||||
InsertTaskName = "insertTask"
|
||||
CreateCollectionTaskName = "CreateCollectionTask"
|
||||
DropCollectionTaskName = "DropCollectionTask"
|
||||
SearchTaskName = "SearchTask"
|
||||
SearchTaskName = "searchTask"
|
||||
RetrieveTaskName = "RetrieveTask"
|
||||
QueryTaskName = "QueryTask"
|
||||
QueryTaskName = "queryTask"
|
||||
AnnsFieldKey = "anns_field"
|
||||
TopKKey = "topk"
|
||||
MetricTypeKey = "metric_type"
|
||||
|
@ -67,12 +67,12 @@ const (
|
|||
DropPartitionTaskName = "DropPartitionTask"
|
||||
HasPartitionTaskName = "HasPartitionTask"
|
||||
ShowPartitionTaskName = "ShowPartitionTask"
|
||||
CreateIndexTaskName = "CreateIndexTask"
|
||||
DescribeIndexTaskName = "DescribeIndexTask"
|
||||
DropIndexTaskName = "DropIndexTask"
|
||||
GetIndexStateTaskName = "GetIndexStateTask"
|
||||
GetIndexBuildProgressTaskName = "GetIndexBuildProgressTask"
|
||||
FlushTaskName = "FlushTask"
|
||||
CreateIndexTaskName = "createIndexTask"
|
||||
DescribeIndexTaskName = "describeIndexTask"
|
||||
DropIndexTaskName = "dropIndexTask"
|
||||
GetIndexStateTaskName = "getIndexStateTask"
|
||||
GetIndexBuildProgressTaskName = "getIndexBuildProgressTask"
|
||||
FlushTaskName = "flushTask"
|
||||
LoadCollectionTaskName = "LoadCollectionTask"
|
||||
ReleaseCollectionTaskName = "ReleaseCollectionTask"
|
||||
LoadPartitionTaskName = "LoadPartitionsTask"
|
||||
|
@ -104,7 +104,7 @@ type dmlTask interface {
|
|||
|
||||
type BaseInsertTask = msgstream.InsertMsg
|
||||
|
||||
type InsertTask struct {
|
||||
type insertTask struct {
|
||||
BaseInsertTask
|
||||
req *milvuspb.InsertRequest
|
||||
Condition
|
||||
|
@ -121,40 +121,40 @@ type InsertTask struct {
|
|||
schema *schemapb.CollectionSchema
|
||||
}
|
||||
|
||||
func (it *InsertTask) TraceCtx() context.Context {
|
||||
func (it *insertTask) TraceCtx() context.Context {
|
||||
return it.ctx
|
||||
}
|
||||
|
||||
func (it *InsertTask) ID() UniqueID {
|
||||
func (it *insertTask) ID() UniqueID {
|
||||
return it.Base.MsgID
|
||||
}
|
||||
|
||||
func (it *InsertTask) SetID(uid UniqueID) {
|
||||
func (it *insertTask) SetID(uid UniqueID) {
|
||||
it.Base.MsgID = uid
|
||||
}
|
||||
|
||||
func (it *InsertTask) Name() string {
|
||||
func (it *insertTask) Name() string {
|
||||
return InsertTaskName
|
||||
}
|
||||
|
||||
func (it *InsertTask) Type() commonpb.MsgType {
|
||||
func (it *insertTask) Type() commonpb.MsgType {
|
||||
return it.Base.MsgType
|
||||
}
|
||||
|
||||
func (it *InsertTask) BeginTs() Timestamp {
|
||||
func (it *insertTask) BeginTs() Timestamp {
|
||||
return it.BeginTimestamp
|
||||
}
|
||||
|
||||
func (it *InsertTask) SetTs(ts Timestamp) {
|
||||
func (it *insertTask) SetTs(ts Timestamp) {
|
||||
it.BeginTimestamp = ts
|
||||
it.EndTimestamp = ts
|
||||
}
|
||||
|
||||
func (it *InsertTask) EndTs() Timestamp {
|
||||
func (it *insertTask) EndTs() Timestamp {
|
||||
return it.EndTimestamp
|
||||
}
|
||||
|
||||
func (it *InsertTask) getPChanStats() (map[pChan]pChanStatistics, error) {
|
||||
func (it *insertTask) getPChanStats() (map[pChan]pChanStatistics, error) {
|
||||
ret := make(map[pChan]pChanStatistics)
|
||||
|
||||
channels, err := it.getChannels()
|
||||
|
@ -174,7 +174,7 @@ func (it *InsertTask) getPChanStats() (map[pChan]pChanStatistics, error) {
|
|||
return ret, nil
|
||||
}
|
||||
|
||||
func (it *InsertTask) getChannels() ([]pChan, error) {
|
||||
func (it *insertTask) getChannels() ([]pChan, error) {
|
||||
collID, err := globalMetaCache.GetCollectionID(it.ctx, it.CollectionName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -202,7 +202,7 @@ func (it *InsertTask) getChannels() ([]pChan, error) {
|
|||
return channels, err
|
||||
}
|
||||
|
||||
func (it *InsertTask) OnEnqueue() error {
|
||||
func (it *insertTask) OnEnqueue() error {
|
||||
it.BaseInsertTask.InsertRequest.Base = &commonpb.MsgBase{}
|
||||
return nil
|
||||
}
|
||||
|
@ -237,7 +237,7 @@ func getNumRowsOfBinaryVectorField(bDatas []byte, dim int64) (uint32, error) {
|
|||
return uint32(int((8 * int64(l)) / dim)), nil
|
||||
}
|
||||
|
||||
func (it *InsertTask) checkLengthOfFieldsData() error {
|
||||
func (it *insertTask) checkLengthOfFieldsData() error {
|
||||
neededFieldsNum := 0
|
||||
for _, field := range it.schema.Fields {
|
||||
if !field.AutoID {
|
||||
|
@ -252,7 +252,7 @@ func (it *InsertTask) checkLengthOfFieldsData() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (it *InsertTask) checkRowNums() error {
|
||||
func (it *insertTask) checkRowNums() error {
|
||||
if it.req.NumRows <= 0 {
|
||||
return errNumRowsLessThanOrEqualToZero(it.req.NumRows)
|
||||
}
|
||||
|
@ -335,7 +335,7 @@ func (it *InsertTask) checkRowNums() error {
|
|||
}
|
||||
|
||||
// TODO(dragondriver): ignore the order of fields in request, use the order of CollectionSchema to reorganize data
|
||||
func (it *InsertTask) transferColumnBasedRequestToRowBasedData() error {
|
||||
func (it *insertTask) transferColumnBasedRequestToRowBasedData() error {
|
||||
dTypes := make([]schemapb.DataType, 0, len(it.req.FieldsData))
|
||||
datas := make([][]interface{}, 0, len(it.req.FieldsData))
|
||||
rowNum := 0
|
||||
|
@ -576,7 +576,7 @@ func (it *InsertTask) transferColumnBasedRequestToRowBasedData() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (it *InsertTask) checkFieldAutoID() error {
|
||||
func (it *insertTask) checkFieldAutoID() error {
|
||||
// TODO(dragondriver): in fact, NumRows is not trustable, we should check all input fields
|
||||
if it.req.NumRows <= 0 {
|
||||
return errNumRowsLessThanOrEqualToZero(it.req.NumRows)
|
||||
|
@ -723,7 +723,7 @@ func (it *InsertTask) checkFieldAutoID() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (it *InsertTask) PreExecute(ctx context.Context) error {
|
||||
func (it *insertTask) PreExecute(ctx context.Context) error {
|
||||
it.Base.MsgType = commonpb.MsgType_Insert
|
||||
it.Base.SourceID = Params.ProxyID
|
||||
|
||||
|
@ -778,7 +778,7 @@ func (it *InsertTask) PreExecute(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (it *InsertTask) _assignSegmentID(stream msgstream.MsgStream, pack *msgstream.MsgPack) (*msgstream.MsgPack, error) {
|
||||
func (it *insertTask) _assignSegmentID(stream msgstream.MsgStream, pack *msgstream.MsgPack) (*msgstream.MsgPack, error) {
|
||||
newPack := &msgstream.MsgPack{
|
||||
BeginTs: pack.BeginTs,
|
||||
EndTs: pack.EndTs,
|
||||
|
@ -841,7 +841,7 @@ func (it *InsertTask) _assignSegmentID(stream msgstream.MsgStream, pack *msgstre
|
|||
}
|
||||
mapInfo, err := it.segIDAssigner.GetSegmentID(it.CollectionID, it.PartitionID, channelName, count, ts)
|
||||
if err != nil {
|
||||
log.Debug("InsertTask.go", zap.Any("MapInfo", mapInfo),
|
||||
log.Debug("insertTask.go", zap.Any("MapInfo", mapInfo),
|
||||
zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
@ -990,7 +990,7 @@ func (it *InsertTask) _assignSegmentID(stream msgstream.MsgStream, pack *msgstre
|
|||
return newPack, nil
|
||||
}
|
||||
|
||||
func (it *InsertTask) Execute(ctx context.Context) error {
|
||||
func (it *insertTask) Execute(ctx context.Context) error {
|
||||
collectionName := it.BaseInsertTask.CollectionName
|
||||
collID, err := globalMetaCache.GetCollectionID(ctx, collectionName)
|
||||
if err != nil {
|
||||
|
@ -1065,18 +1065,17 @@ func (it *InsertTask) Execute(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (it *InsertTask) PostExecute(ctx context.Context) error {
|
||||
func (it *insertTask) PostExecute(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type createCollectionTask struct {
|
||||
Condition
|
||||
*milvuspb.CreateCollectionRequest
|
||||
ctx context.Context
|
||||
rootCoord types.RootCoord
|
||||
dataCoordClient types.DataCoord
|
||||
result *commonpb.Status
|
||||
schema *schemapb.CollectionSchema
|
||||
ctx context.Context
|
||||
rootCoord types.RootCoord
|
||||
result *commonpb.Status
|
||||
schema *schemapb.CollectionSchema
|
||||
}
|
||||
|
||||
func (cct *createCollectionTask) TraceCtx() context.Context {
|
||||
|
@ -1336,7 +1335,7 @@ func translateOutputFields(outputFields []string, schema *schemapb.CollectionSch
|
|||
return resultFieldNames, nil
|
||||
}
|
||||
|
||||
type SearchTask struct {
|
||||
type searchTask struct {
|
||||
Condition
|
||||
*internalpb.SearchRequest
|
||||
ctx context.Context
|
||||
|
@ -1347,44 +1346,44 @@ type SearchTask struct {
|
|||
qc types.QueryCoord
|
||||
}
|
||||
|
||||
func (st *SearchTask) TraceCtx() context.Context {
|
||||
func (st *searchTask) TraceCtx() context.Context {
|
||||
return st.ctx
|
||||
}
|
||||
|
||||
func (st *SearchTask) ID() UniqueID {
|
||||
func (st *searchTask) ID() UniqueID {
|
||||
return st.Base.MsgID
|
||||
}
|
||||
|
||||
func (st *SearchTask) SetID(uid UniqueID) {
|
||||
func (st *searchTask) SetID(uid UniqueID) {
|
||||
st.Base.MsgID = uid
|
||||
}
|
||||
|
||||
func (st *SearchTask) Name() string {
|
||||
func (st *searchTask) Name() string {
|
||||
return SearchTaskName
|
||||
}
|
||||
|
||||
func (st *SearchTask) Type() commonpb.MsgType {
|
||||
func (st *searchTask) Type() commonpb.MsgType {
|
||||
return st.Base.MsgType
|
||||
}
|
||||
|
||||
func (st *SearchTask) BeginTs() Timestamp {
|
||||
func (st *searchTask) BeginTs() Timestamp {
|
||||
return st.Base.Timestamp
|
||||
}
|
||||
|
||||
func (st *SearchTask) EndTs() Timestamp {
|
||||
func (st *searchTask) EndTs() Timestamp {
|
||||
return st.Base.Timestamp
|
||||
}
|
||||
|
||||
func (st *SearchTask) SetTs(ts Timestamp) {
|
||||
func (st *searchTask) SetTs(ts Timestamp) {
|
||||
st.Base.Timestamp = ts
|
||||
}
|
||||
|
||||
func (st *SearchTask) OnEnqueue() error {
|
||||
func (st *searchTask) OnEnqueue() error {
|
||||
st.Base = &commonpb.MsgBase{}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (st *SearchTask) getChannels() ([]pChan, error) {
|
||||
func (st *searchTask) getChannels() ([]pChan, error) {
|
||||
collID, err := globalMetaCache.GetCollectionID(st.ctx, st.query.CollectionName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -1393,7 +1392,7 @@ func (st *SearchTask) getChannels() ([]pChan, error) {
|
|||
return st.chMgr.getChannels(collID)
|
||||
}
|
||||
|
||||
func (st *SearchTask) getVChannels() ([]vChan, error) {
|
||||
func (st *searchTask) getVChannels() ([]vChan, error) {
|
||||
collID, err := globalMetaCache.GetCollectionID(st.ctx, st.query.CollectionName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -1410,7 +1409,7 @@ func (st *SearchTask) getVChannels() ([]vChan, error) {
|
|||
return st.chMgr.getVChannels(collID)
|
||||
}
|
||||
|
||||
func (st *SearchTask) PreExecute(ctx context.Context) error {
|
||||
func (st *searchTask) PreExecute(ctx context.Context) error {
|
||||
st.Base.MsgType = commonpb.MsgType_Search
|
||||
st.Base.SourceID = Params.ProxyID
|
||||
|
||||
|
@ -1539,7 +1538,7 @@ func (st *SearchTask) PreExecute(ctx context.Context) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Debug("Proxy::SearchTask::PreExecute", zap.Any("plan.OutputFieldIds", plan.OutputFieldIds),
|
||||
log.Debug("Proxy::searchTask::PreExecute", zap.Any("plan.OutputFieldIds", plan.OutputFieldIds),
|
||||
zap.Any("plan", plan.String()))
|
||||
}
|
||||
travelTimestamp := st.query.TravelTimestamp
|
||||
|
@ -1596,7 +1595,7 @@ func (st *SearchTask) PreExecute(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (st *SearchTask) Execute(ctx context.Context) error {
|
||||
func (st *searchTask) Execute(ctx context.Context) error {
|
||||
var tsMsg msgstream.TsMsg = &msgstream.SearchMsg{
|
||||
SearchRequest: *st.SearchRequest,
|
||||
BaseMsg: msgstream.BaseMsg{
|
||||
|
@ -1928,7 +1927,7 @@ func reduceSearchResultData(searchResultData []*schemapb.SearchResultData, avail
|
|||
// }
|
||||
//}
|
||||
|
||||
func (st *SearchTask) PostExecute(ctx context.Context) error {
|
||||
func (st *searchTask) PostExecute(ctx context.Context) error {
|
||||
t0 := time.Now()
|
||||
defer func() {
|
||||
log.Debug("WaitAndPostExecute", zap.Any("time cost", time.Since(t0)))
|
||||
|
@ -1936,8 +1935,8 @@ func (st *SearchTask) PostExecute(ctx context.Context) error {
|
|||
for {
|
||||
select {
|
||||
case <-st.TraceCtx().Done():
|
||||
log.Debug("Proxy", zap.Int64("SearchTask PostExecute Loop exit caused by ctx.Done", st.ID()))
|
||||
return fmt.Errorf("SearchTask:wait to finish failed, timeout: %d", st.ID())
|
||||
log.Debug("Proxy", zap.Int64("searchTask PostExecute Loop exit caused by ctx.Done", st.ID()))
|
||||
return fmt.Errorf("searchTask:wait to finish failed, timeout: %d", st.ID())
|
||||
case searchResults := <-st.resultBuf:
|
||||
// fmt.Println("searchResults: ", searchResults)
|
||||
filterSearchResult := make([]*internalpb.SearchResults, 0)
|
||||
|
@ -2024,7 +2023,7 @@ func (st *SearchTask) PostExecute(ctx context.Context) error {
|
|||
}
|
||||
}
|
||||
|
||||
type QueryTask struct {
|
||||
type queryTask struct {
|
||||
Condition
|
||||
*internalpb.RetrieveRequest
|
||||
ctx context.Context
|
||||
|
@ -2036,44 +2035,44 @@ type QueryTask struct {
|
|||
ids *schemapb.IDs
|
||||
}
|
||||
|
||||
func (qt *QueryTask) TraceCtx() context.Context {
|
||||
func (qt *queryTask) TraceCtx() context.Context {
|
||||
return qt.ctx
|
||||
}
|
||||
|
||||
func (qt *QueryTask) ID() UniqueID {
|
||||
func (qt *queryTask) ID() UniqueID {
|
||||
return qt.Base.MsgID
|
||||
}
|
||||
|
||||
func (qt *QueryTask) SetID(uid UniqueID) {
|
||||
func (qt *queryTask) SetID(uid UniqueID) {
|
||||
qt.Base.MsgID = uid
|
||||
}
|
||||
|
||||
func (qt *QueryTask) Name() string {
|
||||
func (qt *queryTask) Name() string {
|
||||
return RetrieveTaskName
|
||||
}
|
||||
|
||||
func (qt *QueryTask) Type() commonpb.MsgType {
|
||||
func (qt *queryTask) Type() commonpb.MsgType {
|
||||
return qt.Base.MsgType
|
||||
}
|
||||
|
||||
func (qt *QueryTask) BeginTs() Timestamp {
|
||||
func (qt *queryTask) BeginTs() Timestamp {
|
||||
return qt.Base.Timestamp
|
||||
}
|
||||
|
||||
func (qt *QueryTask) EndTs() Timestamp {
|
||||
func (qt *queryTask) EndTs() Timestamp {
|
||||
return qt.Base.Timestamp
|
||||
}
|
||||
|
||||
func (qt *QueryTask) SetTs(ts Timestamp) {
|
||||
func (qt *queryTask) SetTs(ts Timestamp) {
|
||||
qt.Base.Timestamp = ts
|
||||
}
|
||||
|
||||
func (qt *QueryTask) OnEnqueue() error {
|
||||
func (qt *queryTask) OnEnqueue() error {
|
||||
qt.Base.MsgType = commonpb.MsgType_Retrieve
|
||||
return nil
|
||||
}
|
||||
|
||||
func (qt *QueryTask) getChannels() ([]pChan, error) {
|
||||
func (qt *queryTask) getChannels() ([]pChan, error) {
|
||||
collID, err := globalMetaCache.GetCollectionID(qt.ctx, qt.query.CollectionName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -2082,7 +2081,7 @@ func (qt *QueryTask) getChannels() ([]pChan, error) {
|
|||
return qt.chMgr.getChannels(collID)
|
||||
}
|
||||
|
||||
func (qt *QueryTask) getVChannels() ([]vChan, error) {
|
||||
func (qt *queryTask) getVChannels() ([]vChan, error) {
|
||||
collID, err := globalMetaCache.GetCollectionID(qt.ctx, qt.query.CollectionName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -2132,7 +2131,7 @@ func IDs2Expr(fieldName string, ids []int64) string {
|
|||
return fieldName + " in [ " + idsStr + " ]"
|
||||
}
|
||||
|
||||
func (qt *QueryTask) PreExecute(ctx context.Context) error {
|
||||
func (qt *queryTask) PreExecute(ctx context.Context) error {
|
||||
qt.Base.MsgType = commonpb.MsgType_Retrieve
|
||||
qt.Base.SourceID = Params.ProxyID
|
||||
|
||||
|
@ -2347,7 +2346,7 @@ func (qt *QueryTask) PreExecute(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (qt *QueryTask) Execute(ctx context.Context) error {
|
||||
func (qt *queryTask) Execute(ctx context.Context) error {
|
||||
var tsMsg msgstream.TsMsg = &msgstream.RetrieveMsg{
|
||||
RetrieveRequest: *qt.RetrieveRequest,
|
||||
BaseMsg: msgstream.BaseMsg{
|
||||
|
@ -2397,7 +2396,7 @@ func (qt *QueryTask) Execute(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (qt *QueryTask) PostExecute(ctx context.Context) error {
|
||||
func (qt *queryTask) PostExecute(ctx context.Context) error {
|
||||
t0 := time.Now()
|
||||
defer func() {
|
||||
log.Debug("WaitAndPostExecute", zap.Any("time cost", time.Since(t0)))
|
||||
|
@ -2405,7 +2404,7 @@ func (qt *QueryTask) PostExecute(ctx context.Context) error {
|
|||
select {
|
||||
case <-qt.TraceCtx().Done():
|
||||
log.Debug("proxy", zap.Int64("Query: wait to finish failed, timeout!, taskID:", qt.ID()))
|
||||
return fmt.Errorf("QueryTask:wait to finish failed, timeout : %d", qt.ID())
|
||||
return fmt.Errorf("queryTask:wait to finish failed, timeout : %d", qt.ID())
|
||||
case retrieveResults := <-qt.resultBuf:
|
||||
retrieveResult := make([]*internalpb.RetrieveResults, 0)
|
||||
var reason string
|
||||
|
@ -3433,7 +3432,7 @@ func (spt *showPartitionsTask) PostExecute(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
type CreateIndexTask struct {
|
||||
type createIndexTask struct {
|
||||
Condition
|
||||
*milvuspb.CreateIndexRequest
|
||||
ctx context.Context
|
||||
|
@ -3441,44 +3440,44 @@ type CreateIndexTask struct {
|
|||
result *commonpb.Status
|
||||
}
|
||||
|
||||
func (cit *CreateIndexTask) TraceCtx() context.Context {
|
||||
func (cit *createIndexTask) TraceCtx() context.Context {
|
||||
return cit.ctx
|
||||
}
|
||||
|
||||
func (cit *CreateIndexTask) ID() UniqueID {
|
||||
func (cit *createIndexTask) ID() UniqueID {
|
||||
return cit.Base.MsgID
|
||||
}
|
||||
|
||||
func (cit *CreateIndexTask) SetID(uid UniqueID) {
|
||||
func (cit *createIndexTask) SetID(uid UniqueID) {
|
||||
cit.Base.MsgID = uid
|
||||
}
|
||||
|
||||
func (cit *CreateIndexTask) Name() string {
|
||||
func (cit *createIndexTask) Name() string {
|
||||
return CreateIndexTaskName
|
||||
}
|
||||
|
||||
func (cit *CreateIndexTask) Type() commonpb.MsgType {
|
||||
func (cit *createIndexTask) Type() commonpb.MsgType {
|
||||
return cit.Base.MsgType
|
||||
}
|
||||
|
||||
func (cit *CreateIndexTask) BeginTs() Timestamp {
|
||||
func (cit *createIndexTask) BeginTs() Timestamp {
|
||||
return cit.Base.Timestamp
|
||||
}
|
||||
|
||||
func (cit *CreateIndexTask) EndTs() Timestamp {
|
||||
func (cit *createIndexTask) EndTs() Timestamp {
|
||||
return cit.Base.Timestamp
|
||||
}
|
||||
|
||||
func (cit *CreateIndexTask) SetTs(ts Timestamp) {
|
||||
func (cit *createIndexTask) SetTs(ts Timestamp) {
|
||||
cit.Base.Timestamp = ts
|
||||
}
|
||||
|
||||
func (cit *CreateIndexTask) OnEnqueue() error {
|
||||
func (cit *createIndexTask) OnEnqueue() error {
|
||||
cit.Base = &commonpb.MsgBase{}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cit *CreateIndexTask) PreExecute(ctx context.Context) error {
|
||||
func (cit *createIndexTask) PreExecute(ctx context.Context) error {
|
||||
cit.Base.MsgType = commonpb.MsgType_CreateIndex
|
||||
cit.Base.SourceID = Params.ProxyID
|
||||
|
||||
|
@ -3531,7 +3530,7 @@ func (cit *CreateIndexTask) PreExecute(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (cit *CreateIndexTask) Execute(ctx context.Context) error {
|
||||
func (cit *createIndexTask) Execute(ctx context.Context) error {
|
||||
var err error
|
||||
cit.result, err = cit.rootCoord.CreateIndex(ctx, cit.CreateIndexRequest)
|
||||
if cit.result == nil {
|
||||
|
@ -3543,11 +3542,11 @@ func (cit *CreateIndexTask) Execute(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (cit *CreateIndexTask) PostExecute(ctx context.Context) error {
|
||||
func (cit *createIndexTask) PostExecute(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type DescribeIndexTask struct {
|
||||
type describeIndexTask struct {
|
||||
Condition
|
||||
*milvuspb.DescribeIndexRequest
|
||||
ctx context.Context
|
||||
|
@ -3555,44 +3554,44 @@ type DescribeIndexTask struct {
|
|||
result *milvuspb.DescribeIndexResponse
|
||||
}
|
||||
|
||||
func (dit *DescribeIndexTask) TraceCtx() context.Context {
|
||||
func (dit *describeIndexTask) TraceCtx() context.Context {
|
||||
return dit.ctx
|
||||
}
|
||||
|
||||
func (dit *DescribeIndexTask) ID() UniqueID {
|
||||
func (dit *describeIndexTask) ID() UniqueID {
|
||||
return dit.Base.MsgID
|
||||
}
|
||||
|
||||
func (dit *DescribeIndexTask) SetID(uid UniqueID) {
|
||||
func (dit *describeIndexTask) SetID(uid UniqueID) {
|
||||
dit.Base.MsgID = uid
|
||||
}
|
||||
|
||||
func (dit *DescribeIndexTask) Name() string {
|
||||
func (dit *describeIndexTask) Name() string {
|
||||
return DescribeIndexTaskName
|
||||
}
|
||||
|
||||
func (dit *DescribeIndexTask) Type() commonpb.MsgType {
|
||||
func (dit *describeIndexTask) Type() commonpb.MsgType {
|
||||
return dit.Base.MsgType
|
||||
}
|
||||
|
||||
func (dit *DescribeIndexTask) BeginTs() Timestamp {
|
||||
func (dit *describeIndexTask) BeginTs() Timestamp {
|
||||
return dit.Base.Timestamp
|
||||
}
|
||||
|
||||
func (dit *DescribeIndexTask) EndTs() Timestamp {
|
||||
func (dit *describeIndexTask) EndTs() Timestamp {
|
||||
return dit.Base.Timestamp
|
||||
}
|
||||
|
||||
func (dit *DescribeIndexTask) SetTs(ts Timestamp) {
|
||||
func (dit *describeIndexTask) SetTs(ts Timestamp) {
|
||||
dit.Base.Timestamp = ts
|
||||
}
|
||||
|
||||
func (dit *DescribeIndexTask) OnEnqueue() error {
|
||||
func (dit *describeIndexTask) OnEnqueue() error {
|
||||
dit.Base = &commonpb.MsgBase{}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dit *DescribeIndexTask) PreExecute(ctx context.Context) error {
|
||||
func (dit *describeIndexTask) PreExecute(ctx context.Context) error {
|
||||
dit.Base.MsgType = commonpb.MsgType_DescribeIndex
|
||||
dit.Base.SourceID = Params.ProxyID
|
||||
|
||||
|
@ -3608,7 +3607,7 @@ func (dit *DescribeIndexTask) PreExecute(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (dit *DescribeIndexTask) Execute(ctx context.Context) error {
|
||||
func (dit *describeIndexTask) Execute(ctx context.Context) error {
|
||||
var err error
|
||||
dit.result, err = dit.rootCoord.DescribeIndex(ctx, dit.DescribeIndexRequest)
|
||||
if dit.result == nil {
|
||||
|
@ -3620,11 +3619,11 @@ func (dit *DescribeIndexTask) Execute(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (dit *DescribeIndexTask) PostExecute(ctx context.Context) error {
|
||||
func (dit *describeIndexTask) PostExecute(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type DropIndexTask struct {
|
||||
type dropIndexTask struct {
|
||||
Condition
|
||||
ctx context.Context
|
||||
*milvuspb.DropIndexRequest
|
||||
|
@ -3632,44 +3631,44 @@ type DropIndexTask struct {
|
|||
result *commonpb.Status
|
||||
}
|
||||
|
||||
func (dit *DropIndexTask) TraceCtx() context.Context {
|
||||
func (dit *dropIndexTask) TraceCtx() context.Context {
|
||||
return dit.ctx
|
||||
}
|
||||
|
||||
func (dit *DropIndexTask) ID() UniqueID {
|
||||
func (dit *dropIndexTask) ID() UniqueID {
|
||||
return dit.Base.MsgID
|
||||
}
|
||||
|
||||
func (dit *DropIndexTask) SetID(uid UniqueID) {
|
||||
func (dit *dropIndexTask) SetID(uid UniqueID) {
|
||||
dit.Base.MsgID = uid
|
||||
}
|
||||
|
||||
func (dit *DropIndexTask) Name() string {
|
||||
func (dit *dropIndexTask) Name() string {
|
||||
return DropIndexTaskName
|
||||
}
|
||||
|
||||
func (dit *DropIndexTask) Type() commonpb.MsgType {
|
||||
func (dit *dropIndexTask) Type() commonpb.MsgType {
|
||||
return dit.Base.MsgType
|
||||
}
|
||||
|
||||
func (dit *DropIndexTask) BeginTs() Timestamp {
|
||||
func (dit *dropIndexTask) BeginTs() Timestamp {
|
||||
return dit.Base.Timestamp
|
||||
}
|
||||
|
||||
func (dit *DropIndexTask) EndTs() Timestamp {
|
||||
func (dit *dropIndexTask) EndTs() Timestamp {
|
||||
return dit.Base.Timestamp
|
||||
}
|
||||
|
||||
func (dit *DropIndexTask) SetTs(ts Timestamp) {
|
||||
func (dit *dropIndexTask) SetTs(ts Timestamp) {
|
||||
dit.Base.Timestamp = ts
|
||||
}
|
||||
|
||||
func (dit *DropIndexTask) OnEnqueue() error {
|
||||
func (dit *dropIndexTask) OnEnqueue() error {
|
||||
dit.Base = &commonpb.MsgBase{}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dit *DropIndexTask) PreExecute(ctx context.Context) error {
|
||||
func (dit *dropIndexTask) PreExecute(ctx context.Context) error {
|
||||
dit.Base.MsgType = commonpb.MsgType_DropIndex
|
||||
dit.Base.SourceID = Params.ProxyID
|
||||
|
||||
|
@ -3690,7 +3689,7 @@ func (dit *DropIndexTask) PreExecute(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (dit *DropIndexTask) Execute(ctx context.Context) error {
|
||||
func (dit *dropIndexTask) Execute(ctx context.Context) error {
|
||||
var err error
|
||||
dit.result, err = dit.rootCoord.DropIndex(ctx, dit.DropIndexRequest)
|
||||
if dit.result == nil {
|
||||
|
@ -3702,11 +3701,11 @@ func (dit *DropIndexTask) Execute(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (dit *DropIndexTask) PostExecute(ctx context.Context) error {
|
||||
func (dit *dropIndexTask) PostExecute(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type GetIndexBuildProgressTask struct {
|
||||
type getIndexBuildProgressTask struct {
|
||||
Condition
|
||||
*milvuspb.GetIndexBuildProgressRequest
|
||||
ctx context.Context
|
||||
|
@ -3716,44 +3715,44 @@ type GetIndexBuildProgressTask struct {
|
|||
result *milvuspb.GetIndexBuildProgressResponse
|
||||
}
|
||||
|
||||
func (gibpt *GetIndexBuildProgressTask) TraceCtx() context.Context {
|
||||
func (gibpt *getIndexBuildProgressTask) TraceCtx() context.Context {
|
||||
return gibpt.ctx
|
||||
}
|
||||
|
||||
func (gibpt *GetIndexBuildProgressTask) ID() UniqueID {
|
||||
func (gibpt *getIndexBuildProgressTask) ID() UniqueID {
|
||||
return gibpt.Base.MsgID
|
||||
}
|
||||
|
||||
func (gibpt *GetIndexBuildProgressTask) SetID(uid UniqueID) {
|
||||
func (gibpt *getIndexBuildProgressTask) SetID(uid UniqueID) {
|
||||
gibpt.Base.MsgID = uid
|
||||
}
|
||||
|
||||
func (gibpt *GetIndexBuildProgressTask) Name() string {
|
||||
func (gibpt *getIndexBuildProgressTask) Name() string {
|
||||
return GetIndexBuildProgressTaskName
|
||||
}
|
||||
|
||||
func (gibpt *GetIndexBuildProgressTask) Type() commonpb.MsgType {
|
||||
func (gibpt *getIndexBuildProgressTask) Type() commonpb.MsgType {
|
||||
return gibpt.Base.MsgType
|
||||
}
|
||||
|
||||
func (gibpt *GetIndexBuildProgressTask) BeginTs() Timestamp {
|
||||
func (gibpt *getIndexBuildProgressTask) BeginTs() Timestamp {
|
||||
return gibpt.Base.Timestamp
|
||||
}
|
||||
|
||||
func (gibpt *GetIndexBuildProgressTask) EndTs() Timestamp {
|
||||
func (gibpt *getIndexBuildProgressTask) EndTs() Timestamp {
|
||||
return gibpt.Base.Timestamp
|
||||
}
|
||||
|
||||
func (gibpt *GetIndexBuildProgressTask) SetTs(ts Timestamp) {
|
||||
func (gibpt *getIndexBuildProgressTask) SetTs(ts Timestamp) {
|
||||
gibpt.Base.Timestamp = ts
|
||||
}
|
||||
|
||||
func (gibpt *GetIndexBuildProgressTask) OnEnqueue() error {
|
||||
func (gibpt *getIndexBuildProgressTask) OnEnqueue() error {
|
||||
gibpt.Base = &commonpb.MsgBase{}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (gibpt *GetIndexBuildProgressTask) PreExecute(ctx context.Context) error {
|
||||
func (gibpt *getIndexBuildProgressTask) PreExecute(ctx context.Context) error {
|
||||
gibpt.Base.MsgType = commonpb.MsgType_GetIndexBuildProgress
|
||||
gibpt.Base.SourceID = Params.ProxyID
|
||||
|
||||
|
@ -3764,7 +3763,7 @@ func (gibpt *GetIndexBuildProgressTask) PreExecute(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (gibpt *GetIndexBuildProgressTask) Execute(ctx context.Context) error {
|
||||
func (gibpt *getIndexBuildProgressTask) Execute(ctx context.Context) error {
|
||||
collectionName := gibpt.CollectionName
|
||||
collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName)
|
||||
if err != nil { // err is not nil if collection not exists
|
||||
|
@ -3924,11 +3923,11 @@ func (gibpt *GetIndexBuildProgressTask) Execute(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (gibpt *GetIndexBuildProgressTask) PostExecute(ctx context.Context) error {
|
||||
func (gibpt *getIndexBuildProgressTask) PostExecute(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type GetIndexStateTask struct {
|
||||
type getIndexStateTask struct {
|
||||
Condition
|
||||
*milvuspb.GetIndexStateRequest
|
||||
ctx context.Context
|
||||
|
@ -3937,44 +3936,44 @@ type GetIndexStateTask struct {
|
|||
result *milvuspb.GetIndexStateResponse
|
||||
}
|
||||
|
||||
func (gist *GetIndexStateTask) TraceCtx() context.Context {
|
||||
func (gist *getIndexStateTask) TraceCtx() context.Context {
|
||||
return gist.ctx
|
||||
}
|
||||
|
||||
func (gist *GetIndexStateTask) ID() UniqueID {
|
||||
func (gist *getIndexStateTask) ID() UniqueID {
|
||||
return gist.Base.MsgID
|
||||
}
|
||||
|
||||
func (gist *GetIndexStateTask) SetID(uid UniqueID) {
|
||||
func (gist *getIndexStateTask) SetID(uid UniqueID) {
|
||||
gist.Base.MsgID = uid
|
||||
}
|
||||
|
||||
func (gist *GetIndexStateTask) Name() string {
|
||||
func (gist *getIndexStateTask) Name() string {
|
||||
return GetIndexStateTaskName
|
||||
}
|
||||
|
||||
func (gist *GetIndexStateTask) Type() commonpb.MsgType {
|
||||
func (gist *getIndexStateTask) Type() commonpb.MsgType {
|
||||
return gist.Base.MsgType
|
||||
}
|
||||
|
||||
func (gist *GetIndexStateTask) BeginTs() Timestamp {
|
||||
func (gist *getIndexStateTask) BeginTs() Timestamp {
|
||||
return gist.Base.Timestamp
|
||||
}
|
||||
|
||||
func (gist *GetIndexStateTask) EndTs() Timestamp {
|
||||
func (gist *getIndexStateTask) EndTs() Timestamp {
|
||||
return gist.Base.Timestamp
|
||||
}
|
||||
|
||||
func (gist *GetIndexStateTask) SetTs(ts Timestamp) {
|
||||
func (gist *getIndexStateTask) SetTs(ts Timestamp) {
|
||||
gist.Base.Timestamp = ts
|
||||
}
|
||||
|
||||
func (gist *GetIndexStateTask) OnEnqueue() error {
|
||||
func (gist *getIndexStateTask) OnEnqueue() error {
|
||||
gist.Base = &commonpb.MsgBase{}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (gist *GetIndexStateTask) PreExecute(ctx context.Context) error {
|
||||
func (gist *getIndexStateTask) PreExecute(ctx context.Context) error {
|
||||
gist.Base.MsgType = commonpb.MsgType_GetIndexState
|
||||
gist.Base.SourceID = Params.ProxyID
|
||||
|
||||
|
@ -3985,7 +3984,7 @@ func (gist *GetIndexStateTask) PreExecute(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (gist *GetIndexStateTask) Execute(ctx context.Context) error {
|
||||
func (gist *getIndexStateTask) Execute(ctx context.Context) error {
|
||||
collectionName := gist.CollectionName
|
||||
collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName)
|
||||
if err != nil { // err is not nil if collection not exists
|
||||
|
@ -4131,11 +4130,11 @@ func (gist *GetIndexStateTask) Execute(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (gist *GetIndexStateTask) PostExecute(ctx context.Context) error {
|
||||
func (gist *getIndexStateTask) PostExecute(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type FlushTask struct {
|
||||
type flushTask struct {
|
||||
Condition
|
||||
*milvuspb.FlushRequest
|
||||
ctx context.Context
|
||||
|
@ -4143,50 +4142,50 @@ type FlushTask struct {
|
|||
result *milvuspb.FlushResponse
|
||||
}
|
||||
|
||||
func (ft *FlushTask) TraceCtx() context.Context {
|
||||
func (ft *flushTask) TraceCtx() context.Context {
|
||||
return ft.ctx
|
||||
}
|
||||
|
||||
func (ft *FlushTask) ID() UniqueID {
|
||||
func (ft *flushTask) ID() UniqueID {
|
||||
return ft.Base.MsgID
|
||||
}
|
||||
|
||||
func (ft *FlushTask) SetID(uid UniqueID) {
|
||||
func (ft *flushTask) SetID(uid UniqueID) {
|
||||
ft.Base.MsgID = uid
|
||||
}
|
||||
|
||||
func (ft *FlushTask) Name() string {
|
||||
func (ft *flushTask) Name() string {
|
||||
return FlushTaskName
|
||||
}
|
||||
|
||||
func (ft *FlushTask) Type() commonpb.MsgType {
|
||||
func (ft *flushTask) Type() commonpb.MsgType {
|
||||
return ft.Base.MsgType
|
||||
}
|
||||
|
||||
func (ft *FlushTask) BeginTs() Timestamp {
|
||||
func (ft *flushTask) BeginTs() Timestamp {
|
||||
return ft.Base.Timestamp
|
||||
}
|
||||
|
||||
func (ft *FlushTask) EndTs() Timestamp {
|
||||
func (ft *flushTask) EndTs() Timestamp {
|
||||
return ft.Base.Timestamp
|
||||
}
|
||||
|
||||
func (ft *FlushTask) SetTs(ts Timestamp) {
|
||||
func (ft *flushTask) SetTs(ts Timestamp) {
|
||||
ft.Base.Timestamp = ts
|
||||
}
|
||||
|
||||
func (ft *FlushTask) OnEnqueue() error {
|
||||
func (ft *flushTask) OnEnqueue() error {
|
||||
ft.Base = &commonpb.MsgBase{}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ft *FlushTask) PreExecute(ctx context.Context) error {
|
||||
func (ft *flushTask) PreExecute(ctx context.Context) error {
|
||||
ft.Base.MsgType = commonpb.MsgType_Flush
|
||||
ft.Base.SourceID = Params.ProxyID
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ft *FlushTask) Execute(ctx context.Context) error {
|
||||
func (ft *flushTask) Execute(ctx context.Context) error {
|
||||
coll2Segments := make(map[string]*schemapb.LongArray)
|
||||
for _, collName := range ft.CollectionNames {
|
||||
collID, err := globalMetaCache.GetCollectionID(ctx, collName)
|
||||
|
@ -4223,7 +4222,7 @@ func (ft *FlushTask) Execute(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (ft *FlushTask) PostExecute(ctx context.Context) error {
|
||||
func (ft *flushTask) PostExecute(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -684,9 +684,9 @@ func (sched *taskScheduler) collectResultLoop() {
|
|||
continue
|
||||
}
|
||||
|
||||
st, ok := t.(*SearchTask)
|
||||
st, ok := t.(*searchTask)
|
||||
if !ok {
|
||||
log.Debug("Proxy collectResultLoop type assert t as SearchTask failed", zap.Any("ReqID", reqID))
|
||||
log.Debug("Proxy collectResultLoop type assert t as searchTask failed", zap.Any("ReqID", reqID))
|
||||
delete(searchResultBufs, reqID)
|
||||
searchResultBufFlags[reqID] = true
|
||||
continue
|
||||
|
@ -718,7 +718,7 @@ func (sched *taskScheduler) collectResultLoop() {
|
|||
|
||||
//t := sched.getTaskByReqID(reqID)
|
||||
{
|
||||
colName := t.(*SearchTask).query.CollectionName
|
||||
colName := t.(*searchTask).query.CollectionName
|
||||
log.Debug("Proxy collectResultLoop", zap.String("collection name", colName), zap.String("reqID", reqIDStr), zap.Int("answer cnt", len(searchResultBufs[reqID].resultBuf)))
|
||||
}
|
||||
|
||||
|
@ -783,9 +783,9 @@ func (sched *taskScheduler) collectResultLoop() {
|
|||
continue
|
||||
}
|
||||
|
||||
st, ok := t.(*QueryTask)
|
||||
st, ok := t.(*queryTask)
|
||||
if !ok {
|
||||
log.Debug("Proxy collectResultLoop type assert t as QueryTask failed")
|
||||
log.Debug("Proxy collectResultLoop type assert t as queryTask failed")
|
||||
delete(queryResultBufs, reqID)
|
||||
queryResultBufFlags[reqID] = true
|
||||
continue
|
||||
|
@ -817,7 +817,7 @@ func (sched *taskScheduler) collectResultLoop() {
|
|||
|
||||
//t := sched.getTaskByReqID(reqID)
|
||||
{
|
||||
colName := t.(*QueryTask).query.CollectionName
|
||||
colName := t.(*queryTask).query.CollectionName
|
||||
log.Debug("Proxy collectResultLoop", zap.String("collection name", colName), zap.String("reqID", reqIDStr), zap.Int("answer cnt", len(queryResultBufs[reqID].resultBuf)))
|
||||
}
|
||||
|
||||
|
|
|
@ -101,7 +101,7 @@ func TestInsertTask_checkLengthOfFieldsData(t *testing.T) {
|
|||
var err error
|
||||
|
||||
// schema is empty, though won't happened in system
|
||||
case1 := InsertTask{
|
||||
case1 := insertTask{
|
||||
schema: &schemapb.CollectionSchema{
|
||||
Name: "TestInsertTask_checkLengthOfFieldsData",
|
||||
Description: "TestInsertTask_checkLengthOfFieldsData",
|
||||
|
@ -119,7 +119,7 @@ func TestInsertTask_checkLengthOfFieldsData(t *testing.T) {
|
|||
assert.Equal(t, nil, err)
|
||||
|
||||
// schema has two fields, neither of them are autoID
|
||||
case2 := InsertTask{
|
||||
case2 := insertTask{
|
||||
schema: &schemapb.CollectionSchema{
|
||||
Name: "TestInsertTask_checkLengthOfFieldsData",
|
||||
Description: "TestInsertTask_checkLengthOfFieldsData",
|
||||
|
@ -165,7 +165,7 @@ func TestInsertTask_checkLengthOfFieldsData(t *testing.T) {
|
|||
assert.Equal(t, nil, err)
|
||||
|
||||
// schema has two field, one of them are autoID
|
||||
case3 := InsertTask{
|
||||
case3 := insertTask{
|
||||
schema: &schemapb.CollectionSchema{
|
||||
Name: "TestInsertTask_checkLengthOfFieldsData",
|
||||
Description: "TestInsertTask_checkLengthOfFieldsData",
|
||||
|
@ -198,7 +198,7 @@ func TestInsertTask_checkLengthOfFieldsData(t *testing.T) {
|
|||
assert.Equal(t, nil, err)
|
||||
|
||||
// schema has one field which is autoID
|
||||
case4 := InsertTask{
|
||||
case4 := insertTask{
|
||||
schema: &schemapb.CollectionSchema{
|
||||
Name: "TestInsertTask_checkLengthOfFieldsData",
|
||||
Description: "TestInsertTask_checkLengthOfFieldsData",
|
||||
|
@ -222,7 +222,7 @@ func TestInsertTask_checkRowNums(t *testing.T) {
|
|||
var err error
|
||||
|
||||
// passed NumRows is less than 0
|
||||
case1 := InsertTask{
|
||||
case1 := insertTask{
|
||||
req: &milvuspb.InsertRequest{
|
||||
NumRows: 0,
|
||||
},
|
||||
|
@ -234,7 +234,7 @@ func TestInsertTask_checkRowNums(t *testing.T) {
|
|||
|
||||
numRows := 20
|
||||
dim := 128
|
||||
case2 := InsertTask{
|
||||
case2 := insertTask{
|
||||
schema: &schemapb.CollectionSchema{
|
||||
Name: "TestInsertTask_checkRowNums",
|
||||
Description: "TestInsertTask_checkRowNums",
|
||||
|
@ -508,3 +508,7 @@ func TestTranslateOutputFields(t *testing.T) {
|
|||
assert.Equal(t, nil, err)
|
||||
assert.ElementsMatch(t, []string{idFieldName, floatVectorFieldName, binaryVectorFieldName}, outputFields)
|
||||
}
|
||||
|
||||
func TestCreateCollectionTask(t *testing.T) {
|
||||
|
||||
}
|
||||
|
|
|
@ -1,208 +0,0 @@
|
|||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/schemapb"
|
||||
)
|
||||
|
||||
func generateBoolArray(numRows int) []bool {
|
||||
ret := make([]bool, 0, numRows)
|
||||
for i := 0; i < numRows; i++ {
|
||||
ret = append(ret, rand.Int()%2 == 0)
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func generateInt8Array(numRows int) []int8 {
|
||||
ret := make([]int8, 0, numRows)
|
||||
for i := 0; i < numRows; i++ {
|
||||
ret = append(ret, int8(rand.Int()))
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func generateInt16Array(numRows int) []int16 {
|
||||
ret := make([]int16, 0, numRows)
|
||||
for i := 0; i < numRows; i++ {
|
||||
ret = append(ret, int16(rand.Int()))
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func generateInt32Array(numRows int) []int32 {
|
||||
ret := make([]int32, 0, numRows)
|
||||
for i := 0; i < numRows; i++ {
|
||||
ret = append(ret, int32(rand.Int()))
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func generateInt64Array(numRows int) []int64 {
|
||||
ret := make([]int64, 0, numRows)
|
||||
for i := 0; i < numRows; i++ {
|
||||
ret = append(ret, int64(rand.Int()))
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func generateFloat32Array(numRows int) []float32 {
|
||||
ret := make([]float32, 0, numRows)
|
||||
for i := 0; i < numRows; i++ {
|
||||
ret = append(ret, rand.Float32())
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func generateFloat64Array(numRows int) []float64 {
|
||||
ret := make([]float64, 0, numRows)
|
||||
for i := 0; i < numRows; i++ {
|
||||
ret = append(ret, rand.Float64())
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func generateFloatVectors(numRows, dim int) []float32 {
|
||||
total := numRows * dim
|
||||
ret := make([]float32, 0, total)
|
||||
for i := 0; i < total; i++ {
|
||||
ret = append(ret, rand.Float32())
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func generateBinaryVectors(numRows, dim int) []byte {
|
||||
total := (numRows * dim) / 8
|
||||
ret := make([]byte, total)
|
||||
_, err := rand.Read(ret)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func newScalarFieldData(dType schemapb.DataType, fieldName string, numRows int) *schemapb.FieldData {
|
||||
ret := &schemapb.FieldData{
|
||||
Type: dType,
|
||||
FieldName: fieldName,
|
||||
Field: nil,
|
||||
}
|
||||
|
||||
switch dType {
|
||||
case schemapb.DataType_Bool:
|
||||
ret.Field = &schemapb.FieldData_Scalars{
|
||||
Scalars: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_BoolData{
|
||||
BoolData: &schemapb.BoolArray{
|
||||
Data: generateBoolArray(numRows),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
case schemapb.DataType_Int8:
|
||||
ret.Field = &schemapb.FieldData_Scalars{
|
||||
Scalars: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_IntData{
|
||||
IntData: &schemapb.IntArray{
|
||||
Data: generateInt32Array(numRows),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
case schemapb.DataType_Int16:
|
||||
ret.Field = &schemapb.FieldData_Scalars{
|
||||
Scalars: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_IntData{
|
||||
IntData: &schemapb.IntArray{
|
||||
Data: generateInt32Array(numRows),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
case schemapb.DataType_Int32:
|
||||
ret.Field = &schemapb.FieldData_Scalars{
|
||||
Scalars: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_IntData{
|
||||
IntData: &schemapb.IntArray{
|
||||
Data: generateInt32Array(numRows),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
case schemapb.DataType_Int64:
|
||||
ret.Field = &schemapb.FieldData_Scalars{
|
||||
Scalars: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_LongData{
|
||||
LongData: &schemapb.LongArray{
|
||||
Data: generateInt64Array(numRows),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
case schemapb.DataType_Float:
|
||||
ret.Field = &schemapb.FieldData_Scalars{
|
||||
Scalars: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_FloatData{
|
||||
FloatData: &schemapb.FloatArray{
|
||||
Data: generateFloat32Array(numRows),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
case schemapb.DataType_Double:
|
||||
ret.Field = &schemapb.FieldData_Scalars{
|
||||
Scalars: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_DoubleData{
|
||||
DoubleData: &schemapb.DoubleArray{
|
||||
Data: generateFloat64Array(numRows),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
return ret
|
||||
}
|
||||
|
||||
func newFloatVectorFieldData(fieldName string, numRows, dim int) *schemapb.FieldData {
|
||||
return &schemapb.FieldData{
|
||||
Type: schemapb.DataType_FloatVector,
|
||||
FieldName: fieldName,
|
||||
Field: &schemapb.FieldData_Vectors{
|
||||
Vectors: &schemapb.VectorField{
|
||||
Dim: int64(dim),
|
||||
Data: &schemapb.VectorField_FloatVector{
|
||||
FloatVector: &schemapb.FloatArray{
|
||||
Data: generateFloatVectors(numRows, dim),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newBinaryVectorFieldData(fieldName string, numRows, dim int) *schemapb.FieldData {
|
||||
return &schemapb.FieldData{
|
||||
Type: schemapb.DataType_BinaryVector,
|
||||
FieldName: fieldName,
|
||||
Field: &schemapb.FieldData_Vectors{
|
||||
Vectors: &schemapb.VectorField{
|
||||
Dim: int64(dim),
|
||||
Data: &schemapb.VectorField_BinaryVector{
|
||||
BinaryVector: generateBinaryVectors(numRows, dim),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
package milvuserrors
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
func MsgCollectionAlreadyExist(name string) string {
|
||||
return fmt.Sprintf("Collection %s already exist", name)
|
||||
}
|
||||
|
||||
func ErrCollectionAlreadyExist(name string) error {
|
||||
return errors.New(MsgCollectionAlreadyExist(name))
|
||||
}
|
||||
|
||||
func MsgCollectionNotExist(name string) string {
|
||||
return fmt.Sprintf("Collection %s not exist", name)
|
||||
}
|
||||
|
||||
func ErrCollectionNotExist(name string) error {
|
||||
return errors.New(MsgCollectionNotExist(name))
|
||||
}
|
||||
|
||||
func MsgPartitionAlreadyExist(name string) string {
|
||||
return fmt.Sprintf("Partition %s already exist", name)
|
||||
}
|
||||
|
||||
func ErrPartitionAlreadyExist(name string) error {
|
||||
return errors.New(MsgPartitionAlreadyExist(name))
|
||||
}
|
||||
|
||||
func MsgPartitionNotExist(name string) string {
|
||||
return fmt.Sprintf("Partition %s not exist", name)
|
||||
}
|
||||
|
||||
func ErrPartitionNotExist(name string) error {
|
||||
return errors.New(MsgPartitionNotExist(name))
|
||||
}
|
|
@ -0,0 +1,12 @@
|
|||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
package milvuserrors
|
Loading…
Reference in New Issue