milvus/internal/distributed/querynode/mock.go

194 lines
5.3 KiB
Go
Raw Normal View History

// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package grpcquerynode
import (
"path"
"strconv"
"errors"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
internalPb "github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
)
// Identifiers and path fragments shared by the mocks in this file.
const (
	// collectionID is the collection ID embedded in every generated binlog
	// and index path.
	collectionID = 1

	// Leading path elements for generated binlog paths and index file paths.
	binlogPathPrefix = "distributed-query-test-binlog"
	indexPathPrefix  = "distributed-query-test-index"

	// Field IDs of the mock schema (uid, timestamp, vec, age).
	uidFieldID       = 0
	timestampFieldID = 1
	vecFieldID       = 100
	ageFieldID       = 101

	// Final path elements of the two index files reported by
	// GetIndexFilePaths.
	vecParamsID = "indexParams"
	vecDataID   = "IVF"
)

// fieldIDs lists the field IDs for which GetInsertBinlogPaths reports a binlog
// path; the response paths follow this order.
var fieldIDs = []int64{uidFieldID, timestampFieldID, vecFieldID, ageFieldID}
/*
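RootCoordMock receives a segmentID and returns it as both the indexID and the buildID (indexID = buildID = segmentID).
DataCoordMock returns binlog paths, path = distributed-query-test-binlog/collectionID/segmentID/fieldID
IndexCoordMock returns the index data path and the index params path, path = distributed-query-test-index/collectionID/segmentID/IVF
and distributed-query-test-index/collectionID/segmentID/indexParams, where segmentID is the single requested index build ID.
The defaults used by the mocks are: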
indexID: 1
schema:
collectionID: 1
partitionID: 1
segmentID: [1, 10]
0: int64: uid
1: int64: timestamp
100: float32: vec: 16
101: int32: age
indexParams:
indexParams := make(map[string]string)
indexParams["index_type"] = "IVF_PQ"
indexParams["index_mode"] = "cpu"
indexParams["dim"] = "16"
indexParams["k"] = "10"
indexParams["nlist"] = "100"
indexParams["nprobe"] = "10"
indexParams["m"] = "4"
indexParams["nbits"] = "8"
indexParams["metric_type"] = "L2"
indexParams["SLICE_SIZE"] = "4"
*/
// RootCoordMock is a mock of the root coordinator used by the query node tests.
type RootCoordMock struct {
	// Count is incremented by each failing DescribeSegment call; once it
	// reaches 20, DescribeSegment starts returning successful responses.
	Count int
}
// DescribeSegment mocks the root coordinator's DescribeSegment call.
//
// While m.Count is below 20 the call increments m.Count and returns a nil
// response together with an error, so a fresh mock fails its first twenty
// invocations. Afterwards it returns a success status with IndexID and BuildID
// both set to in.SegmentID.
func (m *RootCoordMock) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) {
	if m.Count < 20 {
		m.Count++
		return nil, errors.New("index not exist")
	}
	return &milvuspb.DescribeSegmentResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
		// The mock reuses the segment ID as both the index ID and the build ID.
		IndexID: in.SegmentID,
		BuildID: in.SegmentID,
	}, nil
}
// DataCoordMock is a mock of the data coordinator used by the query node tests.
type DataCoordMock struct {
	// Count is shared by GetInsertBinlogPaths and GetSegmentStates: each of
	// them increments it while it is below 10 and changes its answer once it
	// has reached 10.
	Count int
}
// GetInsertBinlogPaths mocks the data coordinator's GetInsertBinlogPaths call.
//
// While data.Count is below 10 the call increments data.Count and returns a
// nil response together with an error. Afterwards it returns one binlog path
// per entry of fieldIDs, each of the form
// binlogPathPrefix/collectionID/req.SegmentID/fieldID.
func (data *DataCoordMock) GetInsertBinlogPaths(req *datapb.GetInsertBinlogPathsRequest) (*datapb.GetInsertBinlogPathsResponse, error) {
	if data.Count < 10 {
		data.Count++
		return nil, errors.New("binlog not exist")
	}

	binlogs := make([]*internalPb.StringList, 0, len(fieldIDs))
	for _, fieldID := range fieldIDs {
		location := path.Join(
			binlogPathPrefix,
			strconv.FormatInt(collectionID, 10),
			strconv.FormatInt(req.SegmentID, 10),
			strconv.FormatInt(fieldID, 10),
		)
		binlogs = append(binlogs, &internalPb.StringList{Values: []string{location}})
	}

	return &datapb.GetInsertBinlogPathsResponse{
		FieldIDs: fieldIDs,
		Paths:    binlogs,
	}, nil
}
// GetSegmentStates mocks the data coordinator's GetSegmentStates call.
//
// The request is not inspected. The response always carries exactly one
// SegmentStateInfo: Growing while data.Count is below 10 (each such call
// increments data.Count), and Flushed afterwards. The returned error is
// always nil.
func (data *DataCoordMock) GetSegmentStates(req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
	state := commonpb.SegmentState_Flushed
	if data.Count < 10 {
		data.Count++
		state = commonpb.SegmentState_Growing
	}
	return &datapb.GetSegmentStatesResponse{
		States: []*datapb.SegmentStateInfo{
			{State: state},
		},
	}, nil
}
2021-06-21 09:28:03 +00:00
// IndexCoordMock is a mock of the index coordinator used by the query node tests.
type IndexCoordMock struct {
	// Count is incremented by each failing GetIndexFilePaths call; once it
	// reaches 30, GetIndexFilePaths starts returning index file paths.
	Count int
}
2021-06-21 09:28:03 +00:00
// GetIndexFilePaths mocks the index coordinator's GetIndexFilePaths call.
//
// While index.Count is below 30 the call increments index.Count and returns a
// nil response together with an error. Afterwards it panics unless the request
// holds exactly one index build ID. That ID is used as the segment ID, and the
// response holds a single IndexFilePathInfo with two paths:
// indexPathPrefix/collectionID/segmentID/vecDataID and
// indexPathPrefix/collectionID/segmentID/vecParamsID.
func (index *IndexCoordMock) GetIndexFilePaths(req *indexpb.GetIndexFilePathsRequest) (*indexpb.GetIndexFilePathsResponse, error) {
	if index.Count < 30 {
		index.Count++
		return nil, errors.New("index path not exist")
	}
	if len(req.IndexBuildIDs) != 1 {
		panic("illegal index ids")
	}

	// The mock treats the requested build ID as the segment ID.
	buildID := req.IndexBuildIDs[0]
	segmentDir := path.Join(
		indexPathPrefix,
		strconv.FormatInt(collectionID, 10),
		strconv.FormatInt(buildID, 10),
	)

	return &indexpb.GetIndexFilePathsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
		FilePaths: []*indexpb.IndexFilePathInfo{
			{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_Success,
				},
				IndexFilePaths: []string{
					path.Join(segmentDir, vecDataID),
					path.Join(segmentDir, vecParamsID),
				},
			},
		},
	}, nil
}