mirror of https://github.com/milvus-io/milvus.git
194 lines
5.3 KiB
Go
194 lines
5.3 KiB
Go
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
|
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
|
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
|
|
|
package grpcquerynode
|
|
|
|
import (
|
|
"path"
|
|
"strconv"
|
|
|
|
"errors"
|
|
|
|
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
|
"github.com/milvus-io/milvus/internal/proto/datapb"
|
|
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
|
internalPb "github.com/milvus-io/milvus/internal/proto/internalpb"
|
|
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
|
)
|
|
|
|
// collectionID is the collection ID embedded in every path the mocks generate.
const collectionID = 1

// Root path elements of the fake binlog and index file paths.
const (
	binlogPathPrefix = "distributed-query-test-binlog"
	indexPathPrefix  = "distributed-query-test-index"
)

// Field IDs of the mock schema: uid, timestamp, vector and age.
const (
	uidFieldID       = 0
	timestampFieldID = 1
	vecFieldID       = 100
	ageFieldID       = 101
)

// Final path elements of the two index files reported by the index mock.
const (
	vecParamsID = "indexParams"
	vecDataID   = "IVF"
)

// fieldIDs lists every field ID of the mock schema; the data mock reports one
// binlog path per entry, in this order.
var fieldIDs = []int64{
	uidFieldID,
	timestampFieldID,
	vecFieldID,
	ageFieldID,
}
|
|
|
|
/*
|
|
RootCoordMock receives a segmentID and returns an indexID, where indexID = segmentID
DataCoordMock returns binlogPath, path = distributed-query-test-binlog/collectionID/segmentID/fieldID
IndexCoordMock returns two index file paths, distributed-query-test-index/collectionID/segmentID/IVF
and distributed-query-test-index/collectionID/segmentID/indexParams,
indexParam uses the defaults listed below:
|
|
|
|
indexID: 1
|
|
|
|
schema:
|
|
collectionID: 1
|
|
partitionID: 1
|
|
segmentID: [1, 10]
|
|
0: int64: uid
|
|
1: int64: timestamp
|
|
100: float32: vec: 16
|
|
101: int32: age
|
|
|
|
indexParams:
|
|
indexParams := make(map[string]string)
|
|
indexParams["index_type"] = "IVF_PQ"
|
|
indexParams["index_mode"] = "cpu"
|
|
indexParams["dim"] = "16"
|
|
indexParams["k"] = "10"
|
|
indexParams["nlist"] = "100"
|
|
indexParams["nprobe"] = "10"
|
|
indexParams["m"] = "4"
|
|
indexParams["nbits"] = "8"
|
|
indexParams["metric_type"] = "L2"
|
|
indexParams["SLICE_SIZE"] = "4"
|
|
*/
|
|
|
|
// RootCoordMock is a mock root coordinator that only starts answering
// DescribeSegment successfully after a number of failed calls.
type RootCoordMock struct {
	// Count is the number of DescribeSegment calls rejected with an error so far.
	Count int
}
|
|
|
|
func (m *RootCoordMock) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) {
|
|
if m.Count < 20 {
|
|
m.Count++
|
|
return nil, errors.New("index not exit")
|
|
}
|
|
indexParams := make(map[string]string)
|
|
indexParams["index_type"] = "IVF_PQ"
|
|
indexParams["index_mode"] = "cpu"
|
|
indexParams["dim"] = "16"
|
|
indexParams["k"] = "10"
|
|
indexParams["nlist"] = "100"
|
|
indexParams["nprobe"] = "10"
|
|
indexParams["m"] = "4"
|
|
indexParams["nbits"] = "8"
|
|
indexParams["metric_type"] = "L2"
|
|
indexParams["SLICE_SIZE"] = "4"
|
|
|
|
params := make([]*commonpb.KeyValuePair, 0)
|
|
for k, v := range indexParams {
|
|
params = append(params, &commonpb.KeyValuePair{
|
|
Key: k,
|
|
Value: v,
|
|
})
|
|
}
|
|
rsp := &milvuspb.DescribeSegmentResponse{
|
|
Status: &commonpb.Status{
|
|
ErrorCode: commonpb.ErrorCode_Success,
|
|
},
|
|
IndexID: in.SegmentID, // use index id as segment id
|
|
BuildID: in.SegmentID,
|
|
}
|
|
return rsp, nil
|
|
}
|
|
|
|
// DataCoordMock is a mock data coordinator whose methods report a not-ready
// result until Count reaches a threshold.
type DataCoordMock struct {
	// Count is shared by GetInsertBinlogPaths and GetSegmentStates; each of
	// them increments it while it is below 10.
	Count int
}
|
|
|
|
func (data *DataCoordMock) GetInsertBinlogPaths(req *datapb.GetInsertBinlogPathsRequest) (*datapb.GetInsertBinlogPathsResponse, error) {
|
|
if data.Count < 10 {
|
|
data.Count++
|
|
return nil, errors.New("binlog not exist")
|
|
}
|
|
paths := make([]*internalPb.StringList, len(fieldIDs))
|
|
for i := range paths {
|
|
pathKey := path.Join(binlogPathPrefix,
|
|
strconv.FormatInt(collectionID, 10),
|
|
strconv.FormatInt(req.SegmentID, 10),
|
|
strconv.FormatInt(fieldIDs[i], 10))
|
|
paths[i] = &internalPb.StringList{
|
|
Values: []string{pathKey},
|
|
}
|
|
}
|
|
rsp := &datapb.GetInsertBinlogPathsResponse{
|
|
FieldIDs: fieldIDs,
|
|
Paths: paths,
|
|
}
|
|
return rsp, nil
|
|
}
|
|
|
|
func (data *DataCoordMock) GetSegmentStates(req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
|
|
segmentGrowingInfo := &datapb.SegmentStateInfo{
|
|
State: commonpb.SegmentState_Growing,
|
|
}
|
|
segmentFlushedInfo := &datapb.SegmentStateInfo{
|
|
State: commonpb.SegmentState_Flushed,
|
|
}
|
|
|
|
if data.Count < 10 {
|
|
data.Count++
|
|
return &datapb.GetSegmentStatesResponse{
|
|
States: []*datapb.SegmentStateInfo{segmentGrowingInfo},
|
|
}, nil
|
|
}
|
|
|
|
return &datapb.GetSegmentStatesResponse{
|
|
States: []*datapb.SegmentStateInfo{segmentFlushedInfo},
|
|
}, nil
|
|
}
|
|
|
|
// IndexCoordMock is a mock index coordinator that only starts answering
// GetIndexFilePaths successfully after a number of failed calls.
type IndexCoordMock struct {
	// Count is the number of GetIndexFilePaths calls rejected with an error so far.
	Count int
}
|
|
|
|
func (index *IndexCoordMock) GetIndexFilePaths(req *indexpb.GetIndexFilePathsRequest) (*indexpb.GetIndexFilePathsResponse, error) {
|
|
if index.Count < 30 {
|
|
index.Count++
|
|
return nil, errors.New("index path not exist")
|
|
}
|
|
if len(req.IndexBuildIDs) != 1 {
|
|
panic("illegal index ids")
|
|
}
|
|
segmentID := req.IndexBuildIDs[0] // use index id as segment id
|
|
indexPaths1 := path.Join(indexPathPrefix,
|
|
strconv.FormatInt(collectionID, 10),
|
|
strconv.FormatInt(segmentID, 10),
|
|
vecDataID)
|
|
indexPaths2 := path.Join(indexPathPrefix,
|
|
strconv.FormatInt(collectionID, 10),
|
|
strconv.FormatInt(segmentID, 10),
|
|
vecParamsID)
|
|
indexPathInfo := make([]*indexpb.IndexFilePathInfo, 1)
|
|
indexPathInfo[0] = &indexpb.IndexFilePathInfo{
|
|
Status: &commonpb.Status{
|
|
ErrorCode: commonpb.ErrorCode_Success,
|
|
},
|
|
IndexFilePaths: []string{indexPaths1, indexPaths2},
|
|
}
|
|
rsp := &indexpb.GetIndexFilePathsResponse{
|
|
Status: &commonpb.Status{
|
|
ErrorCode: commonpb.ErrorCode_Success,
|
|
},
|
|
FilePaths: indexPathInfo,
|
|
}
|
|
return rsp, nil
|
|
}
|