mirror of https://github.com/milvus-io/milvus.git
Fix docker lib dependency
Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
pull/4973/head^2
parent a4860beae2
commit 6a3ec57fef
@@ -2,7 +2,7 @@
"name": "Milvus Distributed Dev Container Definition",
"dockerComposeFile": ["./docker-compose-vscode.yml"],
"service": "ubuntu",
"initializeCommand": "scripts/init_devcontainer.sh && docker-compose -f docker-compose-vscode.yml down all -v || true && docker-compose -f docker-compose-vscode.yml pull --ignore-pull-failures ubuntu",
"initializeCommand": "scripts/init_devcontainer.sh && docker-compose -f docker-compose-vscode.yml down || true",
"workspaceFolder": "/go/src/github.com/zilliztech/milvus-distributed",
"shutdownAction": "stopCompose",
"extensions": [
@@ -11,7 +11,7 @@ pulsar/client-cpp/build/*
# vscode generated files
.vscode
docker-compose-vscode.yml
docker-compose-vscode.yml.bak
docker-compose-vscode.yml.tmp

cmake-build-debug
cmake-build-release
@@ -6,4 +6,4 @@ PULSAR_ADDRESS=pulsar://pulsar:6650
ETCD_ADDRESS=etcd:2379
MASTER_ADDRESS=master:53100
MINIO_ADDRESS=minio:9000
INDEX_BUILDER_ADDRESS=indexbuider:31000
INDEX_BUILDER_ADDRESS=indexbuilder:31000
@@ -9,12 +9,31 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under the License.

FROM alpine:3.12.1
FROM milvusdb/milvus-distributed-dev:amd64-ubuntu18.04-latest AS openblas

#FROM alpine
FROM ubuntu:bionic-20200921

RUN apt-get update && apt-get install -y --no-install-recommends libtbb-dev gfortran

#RUN echo "http://dl-cdn.alpinelinux.org/alpine/edge/testing" >> /etc/apk/repositories

#RUN sed -i "s/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g" /etc/apk/repositories \
# && apk add --no-cache libtbb gfortran

COPY --from=openblas /usr/lib/libopenblas-r0.3.9.so /usr/lib/

RUN ln -s /usr/lib/libopenblas-r0.3.9.so /usr/lib/libopenblas.so.0 && \
ln -s /usr/lib/libopenblas.so.0 /usr/lib/libopenblas.so

COPY ./bin/indexbuilder /milvus-distributed/bin/indexbuilder

COPY ./configs/ /milvus-distributed/configs/

COPY ./lib/ /milvus-distributed/lib/

ENV LD_LIBRARY_PATH=/milvus-distributed/lib:$LD_LIBRARY_PATH:/usr/lib

WORKDIR /milvus-distributed/

CMD ["./bin/indexbuilder"]
@@ -126,7 +126,7 @@ func TestSegmentManager_AssignSegment(t *testing.T) {
}
}

time.Sleep(time.Duration(Params.SegIDAssignExpiration))
time.Sleep(time.Duration(Params.SegIDAssignExpiration) * time.Millisecond)
timestamp, err := globalTsoAllocator()
assert.Nil(t, err)
err = mt.UpdateSegment(&pb.SegmentMeta{
@@ -156,3 +156,122 @@ func TestSegmentManager_AssignSegment(t *testing.T) {
assert.Nil(t, err)
assert.NotEqualValues(t, 0, segMeta.CloseTime)
}

func TestSegmentManager_SycnWritenode(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.TODO())
defer cancelFunc()

Init()
Params.TopicNum = 5
Params.QueryNodeNum = 3
Params.SegmentSize = 536870912 / 1024 / 1024
Params.SegmentSizeFactor = 0.75
Params.DefaultRecordSize = 1024
Params.MinSegIDAssignCnt = 1048576 / 1024
Params.SegIDAssignExpiration = 2000
etcdAddress := Params.EtcdAddress
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}})
assert.Nil(t, err)
rootPath := "/test/root"
_, err = cli.Delete(ctx, rootPath, clientv3.WithPrefix())
assert.Nil(t, err)

kvBase := etcdkv.NewEtcdKV(cli, rootPath)
defer kvBase.Close()
mt, err := NewMetaTable(kvBase)
assert.Nil(t, err)

collName := "segmgr_test_coll"
var collID int64 = 1001
partitionTag := "test_part"
schema := &schemapb.CollectionSchema{
Name: collName,
Fields: []*schemapb.FieldSchema{
{FieldID: 1, Name: "f1", IsPrimaryKey: false, DataType: schemapb.DataType_INT32},
{FieldID: 2, Name: "f2", IsPrimaryKey: false, DataType: schemapb.DataType_VECTOR_FLOAT, TypeParams: []*commonpb.KeyValuePair{
{Key: "dim", Value: "128"},
}},
},
}
err = mt.AddCollection(&pb.CollectionMeta{
ID: collID,
Schema: schema,
CreateTime: 0,
SegmentIDs: []UniqueID{},
PartitionTags: []string{},
})
assert.Nil(t, err)
err = mt.AddPartition(collID, partitionTag)
assert.Nil(t, err)

var cnt int64
globalIDAllocator := func() (UniqueID, error) {
val := atomic.AddInt64(&cnt, 1)
return val, nil
}
globalTsoAllocator := func() (Timestamp, error) {
val := atomic.AddInt64(&cnt, 1)
phy := time.Now().UnixNano() / int64(time.Millisecond)
ts := tsoutil.ComposeTS(phy, val)
return ts, nil
}
syncWriteChan := make(chan *msgstream.TimeTickMsg)
syncProxyChan := make(chan *msgstream.TimeTickMsg)

segAssigner := NewSegmentAssigner(ctx, mt, globalTsoAllocator, syncProxyChan)
mockScheduler := &MockFlushScheduler{}
segManager, err := NewSegmentManager(ctx, mt, globalIDAllocator, globalTsoAllocator, syncWriteChan, mockScheduler, segAssigner)
assert.Nil(t, err)

segManager.Start()
defer segManager.Close()
sizePerRecord, err := typeutil.EstimateSizePerRecord(schema)
assert.Nil(t, err)
maxCount := uint32(Params.SegmentSize * 1024 * 1024 / float64(sizePerRecord))

req := []*internalpb.SegIDRequest{
{Count: maxCount, ChannelID: 1, CollName: collName, PartitionTag: partitionTag},
{Count: maxCount, ChannelID: 2, CollName: collName, PartitionTag: partitionTag},
{Count: maxCount, ChannelID: 3, CollName: collName, PartitionTag: partitionTag},
}
assignSegment, err := segManager.AssignSegment(req)
assert.Nil(t, err)
timestamp, err := globalTsoAllocator()
assert.Nil(t, err)
for i := 0; i < len(assignSegment); i++ {
assert.EqualValues(t, maxCount, assignSegment[i].Count)
assert.EqualValues(t, i+1, assignSegment[i].ChannelID)

err = mt.UpdateSegment(&pb.SegmentMeta{
SegmentID: assignSegment[i].SegID,
CollectionID: collID,
PartitionTag: partitionTag,
ChannelStart: 0,
ChannelEnd: 1,
CloseTime: timestamp,
NumRows: int64(maxCount),
MemSize: 500000,
})
assert.Nil(t, err)
}

time.Sleep(time.Duration(Params.SegIDAssignExpiration) * time.Millisecond)

timestamp, err = globalTsoAllocator()
assert.Nil(t, err)
tsMsg := &msgstream.TimeTickMsg{
BaseMsg: msgstream.BaseMsg{
BeginTimestamp: timestamp, EndTimestamp: timestamp, HashValues: []uint32{},
},
TimeTickMsg: internalpb.TimeTickMsg{
MsgType: internalpb.MsgType_kTimeTick,
PeerID: 1,
Timestamp: timestamp,
},
}
syncWriteChan <- tsMsg
time.Sleep(300 * time.Millisecond)

status := segManager.collStatus[collID]
assert.Empty(t, status.segments)
}
@@ -100,6 +100,7 @@ func (lis *loadIndexService) start() {
continue
}
// 1. use msg's index paths to get index bytes
fmt.Println("start load index")
var indexBuffer [][]byte
var err error
fn := func() error {
@@ -138,6 +139,13 @@ func (lis *loadIndexService) start() {
}
}

func (lis *loadIndexService) close() {
if lis.loadIndexMsgStream != nil {
lis.loadIndexMsgStream.Close()
}
lis.cancel()
}

func (lis *loadIndexService) printIndexParams(index []*commonpb.KeyValuePair) {
fmt.Println("=================================================")
for i := 0; i < len(index); i++ {
@@ -22,26 +22,29 @@ import (
"github.com/zilliztech/milvus-distributed/internal/querynode/client"
)

func TestLoadIndexService(t *testing.T) {
func TestLoadIndexService_FloatVector(t *testing.T) {
node := newQueryNode()
collectionID := rand.Int63n(1000000)
segmentID := rand.Int63n(1000000)
initTestMeta(t, node, "collection0", collectionID, segmentID)

// loadIndexService and statsService
suffix := "-test-search" + strconv.FormatInt(rand.Int63n(1000000), 10)
oldSearchChannelNames := Params.SearchChannelNames
var newSearchChannelNames []string
for _, channel := range oldSearchChannelNames {
newSearchChannelNames = append(newSearchChannelNames, channel+"new")
}
newSearchChannelNames := makeNewChannelNames(oldSearchChannelNames, suffix)
Params.SearchChannelNames = newSearchChannelNames

oldSearchResultChannelNames := Params.SearchChannelNames
var newSearchResultChannelNames []string
for _, channel := range oldSearchResultChannelNames {
newSearchResultChannelNames = append(newSearchResultChannelNames, channel+"new")
}
newSearchResultChannelNames := makeNewChannelNames(oldSearchResultChannelNames, suffix)
Params.SearchResultChannelNames = newSearchResultChannelNames

oldLoadIndexChannelNames := Params.LoadIndexChannelNames
newLoadIndexChannelNames := makeNewChannelNames(oldLoadIndexChannelNames, suffix)
Params.LoadIndexChannelNames = newLoadIndexChannelNames

oldStatsChannelName := Params.StatsChannelName
newStatsChannelNames := makeNewChannelNames([]string{oldStatsChannelName}, suffix)
Params.StatsChannelName = newStatsChannelNames[0]
go node.Start()

//generate insert data
@@ -328,9 +331,319 @@ func TestLoadIndexService(t *testing.T) {
}
Params.SearchChannelNames = oldSearchChannelNames
Params.SearchResultChannelNames = oldSearchResultChannelNames
Params.LoadIndexChannelNames = oldLoadIndexChannelNames
Params.StatsChannelName = oldStatsChannelName
fmt.Println("loadIndex floatVector test Done!")

defer assert.Equal(t, findFiledStats, true)
<-node.queryNodeLoopCtx.Done()
node.Close()
}

func TestLoadIndexService_BinaryVector(t *testing.T) {
node := newQueryNode()
collectionID := rand.Int63n(1000000)
segmentID := rand.Int63n(1000000)
initTestMeta(t, node, "collection0", collectionID, segmentID, true)

// loadIndexService and statsService
suffix := "-test-search-binary" + strconv.FormatInt(rand.Int63n(1000000), 10)
oldSearchChannelNames := Params.SearchChannelNames
newSearchChannelNames := makeNewChannelNames(oldSearchChannelNames, suffix)
Params.SearchChannelNames = newSearchChannelNames

oldSearchResultChannelNames := Params.SearchChannelNames
newSearchResultChannelNames := makeNewChannelNames(oldSearchResultChannelNames, suffix)
Params.SearchResultChannelNames = newSearchResultChannelNames

oldLoadIndexChannelNames := Params.LoadIndexChannelNames
newLoadIndexChannelNames := makeNewChannelNames(oldLoadIndexChannelNames, suffix)
Params.LoadIndexChannelNames = newLoadIndexChannelNames

oldStatsChannelName := Params.StatsChannelName
newStatsChannelNames := makeNewChannelNames([]string{oldStatsChannelName}, suffix)
Params.StatsChannelName = newStatsChannelNames[0]
go node.Start()

const msgLength = 1000
const receiveBufSize = 1024
const DIM = 128

// generator index data
var indexRowData []byte
for n := 0; n < msgLength; n++ {
for i := 0; i < DIM/8; i++ {
indexRowData = append(indexRowData, byte(rand.Intn(8)))
}
}

//generator insert data
var insertRowBlob []*commonpb.Blob
var timestamps []uint64
var rowIDs []int64
var hashValues []uint32
offset := 0
for n := 0; n < msgLength; n++ {
rowData := make([]byte, 0)
rowData = append(rowData, indexRowData[offset:offset+(DIM/8)]...)
offset += DIM / 8
age := make([]byte, 4)
binary.LittleEndian.PutUint32(age, 1)
rowData = append(rowData, age...)
blob := &commonpb.Blob{
Value: rowData,
}
insertRowBlob = append(insertRowBlob, blob)
timestamps = append(timestamps, uint64(n))
rowIDs = append(rowIDs, int64(n))
hashValues = append(hashValues, uint32(n))
}

var insertMsg msgstream.TsMsg = &msgstream.InsertMsg{
BaseMsg: msgstream.BaseMsg{
HashValues: hashValues,
},
InsertRequest: internalpb.InsertRequest{
MsgType: internalpb.MsgType_kInsert,
ReqID: 0,
CollectionName: "collection0",
PartitionTag: "default",
SegmentID: segmentID,
ChannelID: int64(0),
ProxyID: int64(0),
Timestamps: timestamps,
RowIDs: rowIDs,
RowData: insertRowBlob,
},
}
insertMsgPack := msgstream.MsgPack{
BeginTs: 0,
EndTs: math.MaxUint64,
Msgs: []msgstream.TsMsg{insertMsg},
}

// generate timeTick
timeTickMsg := &msgstream.TimeTickMsg{
BaseMsg: msgstream.BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []uint32{0},
},
TimeTickMsg: internalpb.TimeTickMsg{
MsgType: internalpb.MsgType_kTimeTick,
PeerID: UniqueID(0),
Timestamp: math.MaxUint64,
},
}
timeTickMsgPack := &msgstream.MsgPack{
Msgs: []msgstream.TsMsg{timeTickMsg},
}

// pulsar produce
insertChannels := Params.InsertChannelNames
ddChannels := Params.DDChannelNames

insertStream := msgstream.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
insertStream.SetPulsarClient(Params.PulsarAddress)
insertStream.CreatePulsarProducers(insertChannels)
ddStream := msgstream.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
ddStream.SetPulsarClient(Params.PulsarAddress)
ddStream.CreatePulsarProducers(ddChannels)

var insertMsgStream msgstream.MsgStream = insertStream
insertMsgStream.Start()
var ddMsgStream msgstream.MsgStream = ddStream
ddMsgStream.Start()

err := insertMsgStream.Produce(&insertMsgPack)
assert.NoError(t, err)
err = insertMsgStream.Broadcast(timeTickMsgPack)
assert.NoError(t, err)
err = ddMsgStream.Broadcast(timeTickMsgPack)
assert.NoError(t, err)

//generate search data and send search msg
searchRowData := indexRowData[42*(DIM/8) : 43*(DIM/8)]
dslString := "{\"bool\": { \n\"vector\": {\n \"vec\": {\n \"metric_type\": \"JACCARD\", \n \"params\": {\n \"nprobe\": 10 \n},\n \"query\": \"$0\",\"topk\": 10 \n } \n } \n } \n }"
placeholderValue := servicepb.PlaceholderValue{
Tag: "$0",
Type: servicepb.PlaceholderType_VECTOR_BINARY,
Values: [][]byte{searchRowData},
}
placeholderGroup := servicepb.PlaceholderGroup{
Placeholders: []*servicepb.PlaceholderValue{&placeholderValue},
}
placeGroupByte, err := proto.Marshal(&placeholderGroup)
if err != nil {
log.Print("marshal placeholderGroup failed")
}
query := servicepb.Query{
CollectionName: "collection0",
PartitionTags: []string{"default"},
Dsl: dslString,
PlaceholderGroup: placeGroupByte,
}
queryByte, err := proto.Marshal(&query)
if err != nil {
log.Print("marshal query failed")
}
blob := commonpb.Blob{
Value: queryByte,
}
fn := func(n int64) *msgstream.MsgPack {
searchMsg := &msgstream.SearchMsg{
BaseMsg: msgstream.BaseMsg{
HashValues: []uint32{0},
},
SearchRequest: internalpb.SearchRequest{
MsgType: internalpb.MsgType_kSearch,
ReqID: n,
ProxyID: int64(1),
Timestamp: uint64(msgLength),
ResultChannelID: int64(0),
Query: &blob,
},
}
return &msgstream.MsgPack{
Msgs: []msgstream.TsMsg{searchMsg},
}
}
searchStream := msgstream.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
searchStream.SetPulsarClient(Params.PulsarAddress)
searchStream.CreatePulsarProducers(newSearchChannelNames)
searchStream.Start()
err = searchStream.Produce(fn(1))
assert.NoError(t, err)

//get search result
searchResultStream := msgstream.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
searchResultStream.SetPulsarClient(Params.PulsarAddress)
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
searchResultStream.CreatePulsarConsumers(newSearchResultChannelNames, "loadIndexTestSubSearchResult2", unmarshalDispatcher, receiveBufSize)
searchResultStream.Start()
searchResult := searchResultStream.Consume()
assert.NotNil(t, searchResult)
unMarshaledHit := servicepb.Hits{}
err = proto.Unmarshal(searchResult.Msgs[0].(*msgstream.SearchResultMsg).Hits[0], &unMarshaledHit)
assert.Nil(t, err)

// gen load index message pack
indexParams := make(map[string]string)
indexParams["index_type"] = "BIN_IVF_FLAT"
indexParams["index_mode"] = "cpu"
indexParams["dim"] = "128"
indexParams["k"] = "10"
indexParams["nlist"] = "100"
indexParams["nprobe"] = "10"
indexParams["m"] = "4"
indexParams["nbits"] = "8"
indexParams["metric_type"] = "JACCARD"
indexParams["SLICE_SIZE"] = "4"

var indexParamsKV []*commonpb.KeyValuePair
for key, value := range indexParams {
indexParamsKV = append(indexParamsKV, &commonpb.KeyValuePair{
Key: key,
Value: value,
})
}

// generator index
typeParams := make(map[string]string)
typeParams["dim"] = "128"
index, err := indexbuilder.NewCIndex(typeParams, indexParams)
assert.Nil(t, err)
err = index.BuildBinaryVecIndexWithoutIds(indexRowData)
assert.Equal(t, err, nil)

option := &minioKV.Option{
Address: Params.MinioEndPoint,
AccessKeyID: Params.MinioAccessKeyID,
SecretAccessKeyID: Params.MinioSecretAccessKey,
UseSSL: Params.MinioUseSSLStr,
BucketName: Params.MinioBucketName,
CreateBucket: true,
}

minioKV, err := minioKV.NewMinIOKV(node.queryNodeLoopCtx, option)
assert.Equal(t, err, nil)
//save index to minio
binarySet, err := index.Serialize()
assert.Equal(t, err, nil)
indexPaths := make([]string, 0)
for _, index := range binarySet {
path := strconv.Itoa(int(segmentID)) + "/" + index.Key
indexPaths = append(indexPaths, path)
minioKV.Save(path, string(index.Value))
}

//test index search result
indexResult, err := index.QueryOnBinaryVecIndexWithParam(searchRowData, indexParams)
assert.Equal(t, err, nil)

// create loadIndexClient
fieldID := UniqueID(100)
loadIndexChannelNames := Params.LoadIndexChannelNames
client := client.NewLoadIndexClient(node.queryNodeLoopCtx, Params.PulsarAddress, loadIndexChannelNames)
client.LoadIndex(indexPaths, segmentID, fieldID, "vec", indexParams)

// init message stream consumer and do checks
statsMs := msgstream.NewPulsarMsgStream(node.queryNodeLoopCtx, Params.StatsReceiveBufSize)
statsMs.SetPulsarClient(Params.PulsarAddress)
statsMs.CreatePulsarConsumers([]string{Params.StatsChannelName}, Params.MsgChannelSubName, msgstream.NewUnmarshalDispatcher(), Params.StatsReceiveBufSize)
statsMs.Start()

findFiledStats := false
for {
receiveMsg := msgstream.MsgStream(statsMs).Consume()
assert.NotNil(t, receiveMsg)
assert.NotEqual(t, len(receiveMsg.Msgs), 0)

for _, msg := range receiveMsg.Msgs {
statsMsg, ok := msg.(*msgstream.QueryNodeStatsMsg)
if statsMsg.FieldStats == nil || len(statsMsg.FieldStats) == 0 {
continue
}
findFiledStats = true
assert.Equal(t, ok, true)
assert.Equal(t, len(statsMsg.FieldStats), 1)
fieldStats0 := statsMsg.FieldStats[0]
assert.Equal(t, fieldStats0.FieldID, fieldID)
assert.Equal(t, fieldStats0.CollectionID, collectionID)
assert.Equal(t, len(fieldStats0.IndexStats), 1)
indexStats0 := fieldStats0.IndexStats[0]
params := indexStats0.IndexParams
// sort index params by key
sort.Slice(indexParamsKV, func(i, j int) bool { return indexParamsKV[i].Key < indexParamsKV[j].Key })
indexEqual := node.loadIndexService.indexParamsEqual(params, indexParamsKV)
assert.Equal(t, indexEqual, true)
}

if findFiledStats {
break
}
}

err = searchStream.Produce(fn(2))
assert.NoError(t, err)
searchResult = searchResultStream.Consume()
assert.NotNil(t, searchResult)
err = proto.Unmarshal(searchResult.Msgs[0].(*msgstream.SearchResultMsg).Hits[0], &unMarshaledHit)
assert.Nil(t, err)

idsIndex := indexResult.IDs()
idsSegment := unMarshaledHit.IDs
assert.Equal(t, len(idsIndex), len(idsSegment))
for i := 0; i < len(idsIndex); i++ {
assert.Equal(t, idsIndex[i], idsSegment[i])
}
Params.SearchChannelNames = oldSearchChannelNames
Params.SearchResultChannelNames = oldSearchResultChannelNames
Params.LoadIndexChannelNames = oldLoadIndexChannelNames
Params.StatsChannelName = oldStatsChannelName
fmt.Println("loadIndex binaryVector test Done!")

defer assert.Equal(t, findFiledStats, true)
<-node.queryNodeLoopCtx.Done()
node.Close()
}
@@ -97,6 +97,9 @@ func (node *QueryNode) Close() {
if node.searchService != nil {
node.searchService.close()
}
if node.loadIndexService != nil {
node.loadIndexService.close()
}
if node.statsService != nil {
node.statsService.close()
}
@@ -35,7 +35,7 @@ func genTestCollectionMeta(collectionName string, collectionID UniqueID, isBinar
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "16",
Value: "128",
},
},
IndexParams: []*commonpb.KeyValuePair{
@@ -92,8 +92,12 @@ func genTestCollectionMeta(collectionName string, collectionID UniqueID, isBinar
return &collectionMeta
}

func initTestMeta(t *testing.T, node *QueryNode, collectionName string, collectionID UniqueID, segmentID UniqueID) {
collectionMeta := genTestCollectionMeta(collectionName, collectionID, false)
func initTestMeta(t *testing.T, node *QueryNode, collectionName string, collectionID UniqueID, segmentID UniqueID, optional ...bool) {
isBinary := false
if len(optional) > 0 {
isBinary = optional[0]
}
collectionMeta := genTestCollectionMeta(collectionName, collectionID, isBinary)

schemaBlob := proto.MarshalTextString(collectionMeta.Schema)
assert.NotEqual(t, "", schemaBlob)
@@ -8,6 +8,15 @@ while [ -h "$SOURCE" ]; do # resolve $SOURCE until the file is no longer a symli
done
ROOT_DIR="$( cd -P "$( dirname "$SOURCE" )/.." && pwd )"

unameOut="$(uname -s)"
case "${unameOut}" in
Linux*) machine=Linux;;
Darwin*) machine=Mac;;
CYGWIN*) machine=Cygwin;;
MINGW*) machine=MinGw;;
*) machine="UNKNOWN:${unameOut}"
esac

# Attempt to run in the container with the same UID/GID as we have on the host,
# as this results in the correct permissions on files created in the shared
# volumes. This isn't always possible, however, as IDs less than 100 are
@@ -21,8 +30,12 @@ gid=$(id -g)
[ "$uid" -lt 500 ] && uid=501
[ "$gid" -lt 500 ] && gid=$uid

awk 'c&&c--{sub(/^/,"#")} /# Build devcontainer/{c=5} 1' $ROOT_DIR/docker-compose.yml > $ROOT_DIR/docker-compose-vscode.yml.bak
awk 'c&&c--{sub(/^/,"#")} /# Build devcontainer/{c=5} 1' $ROOT_DIR/docker-compose.yml > $ROOT_DIR/docker-compose-vscode.yml.tmp

awk 'c&&c--{sub(/^/,"#")} /# Command/{c=3} 1' $ROOT_DIR/docker-compose-vscode.yml.bak > $ROOT_DIR/docker-compose-vscode.yml
awk 'c&&c--{sub(/^/,"#")} /# Command/{c=3} 1' $ROOT_DIR/docker-compose-vscode.yml.tmp > $ROOT_DIR/docker-compose-vscode.yml

sed -i '.bak' "s/# user: {{ CURRENT_ID }}/user: \"$uid:$gid\"/g" $ROOT_DIR/docker-compose-vscode.yml
if [ "${machine}" == "Mac" ];then
sed -i '' "s/# user: {{ CURRENT_ID }}/user: \"$uid:$gid\"/g" $ROOT_DIR/docker-compose-vscode.yml
else
sed -i "s/# user: {{ CURRENT_ID }}/user: \"$uid:$gid\"/g" $ROOT_DIR/docker-compose-vscode.yml
fi