milvus/internal/querynode/data_sync_service_test.go

127 lines
3.5 KiB
Go

// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package querynode
import (
"encoding/binary"
"math"
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
)
// NOTE: start pulsar before test
func TestDataSyncService_Start(t *testing.T) {
collectionID := UniqueID(0)
node := newQueryNodeMock()
initTestMeta(t, node, 0, 0)
// test data generate
const msgLength = 10
const DIM = 16
const N = 10
var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
var rawData []byte
for _, ele := range vec {
buf := make([]byte, 4)
binary.LittleEndian.PutUint32(buf, math.Float32bits(ele))
rawData = append(rawData, buf...)
}
bs := make([]byte, 4)
binary.LittleEndian.PutUint32(bs, 1)
rawData = append(rawData, bs...)
var records []*commonpb.Blob
for i := 0; i < N; i++ {
blob := &commonpb.Blob{
Value: rawData,
}
records = append(records, blob)
}
// messages generate
insertMessages := make([]msgstream.TsMsg, 0)
for i := 0; i < msgLength; i++ {
var msg msgstream.TsMsg = &msgstream.InsertMsg{
BaseMsg: msgstream.BaseMsg{
HashValues: []uint32{
uint32(i), uint32(i),
},
},
InsertRequest: internalpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
MsgID: 0,
Timestamp: uint64(i + 1000),
SourceID: 0,
},
CollectionID: collectionID,
PartitionID: defaultPartitionID,
SegmentID: int64(0),
ChannelID: "0",
Timestamps: []uint64{uint64(i + 1000), uint64(i + 1000)},
RowIDs: []int64{int64(i), int64(i)},
RowData: []*commonpb.Blob{
{Value: rawData},
{Value: rawData},
},
},
}
insertMessages = append(insertMessages, msg)
}
// generate timeTick
timeTickMsgPack := msgstream.MsgPack{}
baseMsg := msgstream.BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []uint32{0},
}
timeTickResult := internalpb.TimeTickMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_TimeTick,
MsgID: 0,
Timestamp: math.MaxUint64,
SourceID: 0,
},
}
timeTickMsg := &msgstream.TimeTickMsg{
BaseMsg: baseMsg,
TimeTickMsg: timeTickResult,
}
timeTickMsgPack.Msgs = append(timeTickMsgPack.Msgs, timeTickMsg)
// pulsar produce
const receiveBufSize = 1024
pulsarURL := Params.PulsarAddress
msFactory := msgstream.NewPmsFactory()
m := map[string]interface{}{
"receiveBufSize": receiveBufSize,
"pulsarAddress": pulsarURL,
"pulsarBufSize": 1024}
err := msFactory.SetParams(m)
assert.Nil(t, err)
channels := []Channel{"0"}
err = node.streaming.dataSyncService.addCollectionFlowGraph(collectionID, channels)
assert.NoError(t, err)
err = node.streaming.dataSyncService.startCollectionFlowGraph(collectionID, channels)
assert.NoError(t, err)
<-node.queryNodeLoopCtx.Done()
node.Stop()
}