mirror of https://github.com/milvus-io/milvus.git
add datanode unittest to improve code coverage (#4982)
add datanode unittest to improve code coverage Signed-off-by: yudong.cai <yudong.cai@zilliz.com>pull/4990/head
parent
5b3479fdc7
commit
51cae2c9ba
|
@ -0,0 +1,28 @@
|
|||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
package datanode
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestAllocator_Basic(t *testing.T) {
|
||||
ms := &MasterServiceFactory{}
|
||||
allocator := newAllocator(ms)
|
||||
|
||||
t.Run("Test allocID", func(t *testing.T) {
|
||||
_, err := allocator.allocID()
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
}
|
|
@ -180,7 +180,7 @@ func (node *DataNode) WatchDmChannels(ctx context.Context, in *datapb.WatchDmCha
|
|||
return status, errors.New(status.GetReason())
|
||||
|
||||
case len(Params.InsertChannelNames) != 0:
|
||||
status.Reason = fmt.Sprintf("DataNode has %d already set insert channels!", node.NodeID)
|
||||
status.Reason = fmt.Sprintf("DataNode %d has already set insert channels!", node.NodeID)
|
||||
return status, errors.New(status.GetReason())
|
||||
|
||||
default:
|
||||
|
|
|
@ -12,453 +12,71 @@
|
|||
package datanode
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"math"
|
||||
"math/rand"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/clientv3"
|
||||
"go.uber.org/zap"
|
||||
|
||||
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
|
||||
memkv "github.com/zilliztech/milvus-distributed/internal/kv/mem"
|
||||
"github.com/zilliztech/milvus-distributed/internal/log"
|
||||
"github.com/zilliztech/milvus-distributed/internal/msgstream"
|
||||
"github.com/zilliztech/milvus-distributed/internal/types"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
|
||||
)
|
||||
|
||||
func makeNewChannelNames(names []string, suffix string) []string {
|
||||
var ret []string
|
||||
for _, name := range names {
|
||||
ret = append(ret, name+suffix)
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func refreshChannelNames() {
|
||||
Params.DDChannelNames = []string{"datanode-test"}
|
||||
Params.SegmentStatisticsChannelName = "segtment-statistics"
|
||||
Params.CompleteFlushChannelName = "flush-completed"
|
||||
Params.InsertChannelNames = []string{"intsert-a-1", "insert-b-1"}
|
||||
Params.TimeTickChannelName = "hard-timetick"
|
||||
suffix := "-test-data-node" + strconv.FormatInt(rand.Int63n(100), 10)
|
||||
Params.DDChannelNames = makeNewChannelNames(Params.DDChannelNames, suffix)
|
||||
Params.InsertChannelNames = makeNewChannelNames(Params.InsertChannelNames, suffix)
|
||||
}
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
func TestDataNode(t *testing.T) {
|
||||
Params.Init()
|
||||
|
||||
refreshChannelNames()
|
||||
exitCode := m.Run()
|
||||
os.Exit(exitCode)
|
||||
}
|
||||
|
||||
func newBinlogMeta() *binlogMeta {
|
||||
kvMock := memkv.NewMemoryKV()
|
||||
idAllocMock := NewAllocatorFactory(1)
|
||||
mt, _ := NewBinlogMeta(kvMock, idAllocMock)
|
||||
return mt
|
||||
}
|
||||
node := newDataNodeMock()
|
||||
node.Start()
|
||||
|
||||
func clearEtcd(rootPath string) error {
|
||||
etcdAddr := Params.EtcdAddress
|
||||
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
etcdKV := etcdkv.NewEtcdKV(etcdClient, rootPath)
|
||||
t.Run("Test WatchDmChannels", func(t *testing.T) {
|
||||
channelNames := Params.InsertChannelNames
|
||||
|
||||
err = etcdKV.RemoveWithPrefix("writer/segment")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, _, err = etcdKV.LoadWithPrefix("writer/segment")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Debug("Clear ETCD with prefix writer/segment ")
|
||||
|
||||
err = etcdKV.RemoveWithPrefix("writer/ddl")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, _, err = etcdKV.LoadWithPrefix("writer/ddl")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Debug("Clear ETCD with prefix writer/ddl")
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
type Factory interface {
|
||||
}
|
||||
|
||||
type MetaFactory struct {
|
||||
}
|
||||
|
||||
type DataFactory struct {
|
||||
rawData []byte
|
||||
}
|
||||
|
||||
type MasterServiceFactory struct {
|
||||
types.MasterService
|
||||
ID UniqueID
|
||||
collectionName string
|
||||
collectionID UniqueID
|
||||
}
|
||||
|
||||
func (mf *MetaFactory) CollectionMetaFactory(collectionID UniqueID, collectionName string) *etcdpb.CollectionMeta {
|
||||
sch := schemapb.CollectionSchema{
|
||||
Name: collectionName,
|
||||
Description: "test collection by meta factory",
|
||||
AutoID: true,
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
FieldID: 0,
|
||||
Name: "RowID",
|
||||
Description: "RowID field",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
TypeParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: "f0_tk1",
|
||||
Value: "f0_tv1",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
FieldID: 1,
|
||||
Name: "Timestamp",
|
||||
Description: "Timestamp field",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
TypeParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: "f1_tk1",
|
||||
Value: "f1_tv1",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
FieldID: 100,
|
||||
Name: "float_vector_field",
|
||||
Description: "field 100",
|
||||
DataType: schemapb.DataType_FloatVector,
|
||||
TypeParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: "dim",
|
||||
Value: "2",
|
||||
},
|
||||
},
|
||||
IndexParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: "indexkey",
|
||||
Value: "indexvalue",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
FieldID: 101,
|
||||
Name: "binary_vector_field",
|
||||
Description: "field 101",
|
||||
DataType: schemapb.DataType_BinaryVector,
|
||||
TypeParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: "dim",
|
||||
Value: "32",
|
||||
},
|
||||
},
|
||||
IndexParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: "indexkey",
|
||||
Value: "indexvalue",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
FieldID: 102,
|
||||
Name: "bool_field",
|
||||
Description: "field 102",
|
||||
DataType: schemapb.DataType_Bool,
|
||||
TypeParams: []*commonpb.KeyValuePair{},
|
||||
IndexParams: []*commonpb.KeyValuePair{},
|
||||
},
|
||||
{
|
||||
FieldID: 103,
|
||||
Name: "int8_field",
|
||||
Description: "field 103",
|
||||
DataType: schemapb.DataType_Int8,
|
||||
TypeParams: []*commonpb.KeyValuePair{},
|
||||
IndexParams: []*commonpb.KeyValuePair{},
|
||||
},
|
||||
{
|
||||
FieldID: 104,
|
||||
Name: "int16_field",
|
||||
Description: "field 104",
|
||||
DataType: schemapb.DataType_Int16,
|
||||
TypeParams: []*commonpb.KeyValuePair{},
|
||||
IndexParams: []*commonpb.KeyValuePair{},
|
||||
},
|
||||
{
|
||||
FieldID: 105,
|
||||
Name: "int32_field",
|
||||
Description: "field 105",
|
||||
DataType: schemapb.DataType_Int32,
|
||||
TypeParams: []*commonpb.KeyValuePair{},
|
||||
IndexParams: []*commonpb.KeyValuePair{},
|
||||
},
|
||||
{
|
||||
FieldID: 106,
|
||||
Name: "int64_field",
|
||||
Description: "field 106",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
TypeParams: []*commonpb.KeyValuePair{},
|
||||
IndexParams: []*commonpb.KeyValuePair{},
|
||||
},
|
||||
{
|
||||
FieldID: 107,
|
||||
Name: "float32_field",
|
||||
Description: "field 107",
|
||||
DataType: schemapb.DataType_Float,
|
||||
TypeParams: []*commonpb.KeyValuePair{},
|
||||
IndexParams: []*commonpb.KeyValuePair{},
|
||||
},
|
||||
{
|
||||
FieldID: 108,
|
||||
Name: "float64_field",
|
||||
Description: "field 108",
|
||||
DataType: schemapb.DataType_Double,
|
||||
TypeParams: []*commonpb.KeyValuePair{},
|
||||
IndexParams: []*commonpb.KeyValuePair{},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
collection := etcdpb.CollectionMeta{
|
||||
ID: collectionID,
|
||||
Schema: &sch,
|
||||
CreateTime: Timestamp(1),
|
||||
SegmentIDs: make([]UniqueID, 0),
|
||||
PartitionIDs: []UniqueID{0},
|
||||
}
|
||||
return &collection
|
||||
}
|
||||
|
||||
func NewDataFactory() *DataFactory {
|
||||
return &DataFactory{rawData: GenRowData()}
|
||||
}
|
||||
|
||||
func GenRowData() (rawData []byte) {
|
||||
const DIM = 2
|
||||
const N = 1
|
||||
|
||||
// Float vector
|
||||
var fvector = [DIM]float32{1, 2}
|
||||
for _, ele := range fvector {
|
||||
buf := make([]byte, 4)
|
||||
binary.LittleEndian.PutUint32(buf, math.Float32bits(ele))
|
||||
rawData = append(rawData, buf...)
|
||||
}
|
||||
|
||||
// Binary vector
|
||||
// Dimension of binary vector is 32
|
||||
// size := 4, = 32 / 8
|
||||
var bvector = []byte{255, 255, 255, 0}
|
||||
rawData = append(rawData, bvector...)
|
||||
|
||||
// Bool
|
||||
var fieldBool = true
|
||||
buf := new(bytes.Buffer)
|
||||
if err := binary.Write(buf, binary.LittleEndian, fieldBool); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
rawData = append(rawData, buf.Bytes()...)
|
||||
|
||||
// int8
|
||||
var dataInt8 int8 = 100
|
||||
bint8 := new(bytes.Buffer)
|
||||
if err := binary.Write(bint8, binary.LittleEndian, dataInt8); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
rawData = append(rawData, bint8.Bytes()...)
|
||||
|
||||
// int16
|
||||
var dataInt16 int16 = 200
|
||||
bint16 := new(bytes.Buffer)
|
||||
if err := binary.Write(bint16, binary.LittleEndian, dataInt16); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
rawData = append(rawData, bint16.Bytes()...)
|
||||
|
||||
// int32
|
||||
var dataInt32 int32 = 300
|
||||
bint32 := new(bytes.Buffer)
|
||||
if err := binary.Write(bint32, binary.LittleEndian, dataInt32); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
rawData = append(rawData, bint32.Bytes()...)
|
||||
|
||||
// int64
|
||||
var dataInt64 int64 = 400
|
||||
bint64 := new(bytes.Buffer)
|
||||
if err := binary.Write(bint64, binary.LittleEndian, dataInt64); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
rawData = append(rawData, bint64.Bytes()...)
|
||||
|
||||
// float32
|
||||
var datafloat float32 = 1.1
|
||||
bfloat32 := new(bytes.Buffer)
|
||||
if err := binary.Write(bfloat32, binary.LittleEndian, datafloat); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
rawData = append(rawData, bfloat32.Bytes()...)
|
||||
|
||||
// float64
|
||||
var datafloat64 float64 = 2.2
|
||||
bfloat64 := new(bytes.Buffer)
|
||||
if err := binary.Write(bfloat64, binary.LittleEndian, datafloat64); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
rawData = append(rawData, bfloat64.Bytes()...)
|
||||
log.Debug("Rawdata length:", zap.Int("Length of rawData", len(rawData)))
|
||||
return
|
||||
}
|
||||
|
||||
// n: number of TsinsertMsgs to generate
|
||||
func (df *DataFactory) GetMsgStreamTsInsertMsgs(n int) (inMsgs []msgstream.TsMsg) {
|
||||
for i := 0; i < n; i++ {
|
||||
var msg msgstream.TsMsg = &msgstream.InsertMsg{
|
||||
BaseMsg: msgstream.BaseMsg{
|
||||
HashValues: []uint32{uint32(i)},
|
||||
},
|
||||
InsertRequest: internalpb.InsertRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_Insert,
|
||||
MsgID: 0, // GOOSE TODO
|
||||
Timestamp: Timestamp(i + 1000),
|
||||
SourceID: 0,
|
||||
},
|
||||
CollectionName: "col1", // GOOSE TODO
|
||||
PartitionName: "default",
|
||||
SegmentID: 1, // GOOSE TODO
|
||||
ChannelID: "0", // GOOSE TODO
|
||||
Timestamps: []Timestamp{Timestamp(i + 1000)},
|
||||
RowIDs: []UniqueID{UniqueID(i)},
|
||||
RowData: []*commonpb.Blob{{Value: df.rawData}},
|
||||
},
|
||||
req := &datapb.WatchDmChannelsRequest{
|
||||
ChannelNames: channelNames,
|
||||
}
|
||||
inMsgs = append(inMsgs, msg)
|
||||
}
|
||||
return
|
||||
}
|
||||
_, err1 := node.WatchDmChannels(node.ctx, req)
|
||||
assert.Error(t, err1)
|
||||
|
||||
// n: number of insertMsgs to generate
|
||||
func (df *DataFactory) GetMsgStreamInsertMsgs(n int) (inMsgs []*msgstream.InsertMsg) {
|
||||
for i := 0; i < n; i++ {
|
||||
var msg = &msgstream.InsertMsg{
|
||||
BaseMsg: msgstream.BaseMsg{
|
||||
HashValues: []uint32{uint32(i)},
|
||||
},
|
||||
InsertRequest: internalpb.InsertRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_Insert,
|
||||
MsgID: 0, // GOOSE TODO
|
||||
Timestamp: Timestamp(i + 1000),
|
||||
SourceID: 0,
|
||||
},
|
||||
CollectionName: "col1", // GOOSE TODO
|
||||
PartitionName: "default",
|
||||
SegmentID: 1, // GOOSE TODO
|
||||
ChannelID: "0", // GOOSE TODO
|
||||
Timestamps: []Timestamp{Timestamp(i + 1000)},
|
||||
RowIDs: []UniqueID{UniqueID(i)},
|
||||
RowData: []*commonpb.Blob{{Value: df.rawData}},
|
||||
},
|
||||
node.UpdateStateCode(internalpb.StateCode_Initializing)
|
||||
_, err2 := node.WatchDmChannels(node.ctx, req)
|
||||
assert.Error(t, err2)
|
||||
|
||||
Params.InsertChannelNames = []string{}
|
||||
status, err3 := node.WatchDmChannels(node.ctx, req)
|
||||
assert.NoError(t, err3)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
|
||||
|
||||
Params.InsertChannelNames = channelNames
|
||||
})
|
||||
|
||||
t.Run("Test GetComponentStates", func(t *testing.T) {
|
||||
stat, err := node.GetComponentStates(node.ctx)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, stat.Status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("Test FlushSegments", func(t *testing.T) {
|
||||
req := &datapb.FlushSegmentsRequest{
|
||||
Base: &commonpb.MsgBase{},
|
||||
DbID: 0,
|
||||
CollectionID: 1,
|
||||
SegmentIDs: []int64{0, 1},
|
||||
}
|
||||
inMsgs = append(inMsgs, msg)
|
||||
}
|
||||
return
|
||||
}
|
||||
status, err := node.FlushSegments(node.ctx, req)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
|
||||
})
|
||||
|
||||
type AllocatorFactory struct {
|
||||
sync.Mutex
|
||||
r *rand.Rand
|
||||
}
|
||||
t.Run("Test GetTimeTickChannel", func(t *testing.T) {
|
||||
_, err := node.GetTimeTickChannel(node.ctx)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
func NewAllocatorFactory(id ...UniqueID) *AllocatorFactory {
|
||||
f := &AllocatorFactory{
|
||||
r: rand.New(rand.NewSource(time.Now().UnixNano())),
|
||||
}
|
||||
t.Run("Test GetStatisticsChannel", func(t *testing.T) {
|
||||
_, err := node.GetStatisticsChannel(node.ctx)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
return f
|
||||
}
|
||||
|
||||
func (alloc *AllocatorFactory) allocID() (UniqueID, error) {
|
||||
alloc.Lock()
|
||||
defer alloc.Unlock()
|
||||
return alloc.r.Int63n(1000000), nil
|
||||
}
|
||||
|
||||
func (m *MasterServiceFactory) setID(id UniqueID) {
|
||||
m.ID = id // GOOSE TODO: random ID generator
|
||||
}
|
||||
|
||||
func (m *MasterServiceFactory) setCollectionID(id UniqueID) {
|
||||
m.collectionID = id
|
||||
}
|
||||
|
||||
func (m *MasterServiceFactory) setCollectionName(name string) {
|
||||
m.collectionName = name
|
||||
}
|
||||
|
||||
func (m *MasterServiceFactory) AllocID(ctx context.Context, in *masterpb.AllocIDRequest) (*masterpb.AllocIDResponse, error) {
|
||||
resp := &masterpb.AllocIDResponse{
|
||||
Status: &commonpb.Status{},
|
||||
ID: m.ID,
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (m *MasterServiceFactory) ShowCollections(ctx context.Context, in *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
|
||||
resp := &milvuspb.ShowCollectionsResponse{
|
||||
Status: &commonpb.Status{},
|
||||
CollectionNames: []string{m.collectionName},
|
||||
}
|
||||
return resp, nil
|
||||
|
||||
}
|
||||
func (m *MasterServiceFactory) DescribeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
|
||||
f := MetaFactory{}
|
||||
meta := f.CollectionMetaFactory(m.collectionID, m.collectionName)
|
||||
resp := &milvuspb.DescribeCollectionResponse{
|
||||
Status: &commonpb.Status{},
|
||||
CollectionID: m.collectionID,
|
||||
Schema: meta.Schema,
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (m *MasterServiceFactory) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
|
||||
return &internalpb.ComponentStates{
|
||||
State: &internalpb.ComponentInfo{},
|
||||
SubcomponentStates: make([]*internalpb.ComponentInfo, 0),
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
},
|
||||
}, nil
|
||||
<-node.ctx.Done()
|
||||
node.Stop()
|
||||
}
|
||||
|
|
|
@ -0,0 +1,486 @@
|
|||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
package datanode
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"math"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/clientv3"
|
||||
"go.uber.org/zap"
|
||||
|
||||
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
|
||||
memkv "github.com/zilliztech/milvus-distributed/internal/kv/mem"
|
||||
"github.com/zilliztech/milvus-distributed/internal/log"
|
||||
"github.com/zilliztech/milvus-distributed/internal/msgstream"
|
||||
"github.com/zilliztech/milvus-distributed/internal/types"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
|
||||
)
|
||||
|
||||
const ctxTimeInMillisecond = 5000
|
||||
const debug = false
|
||||
|
||||
func newDataNodeMock() *DataNode {
|
||||
var ctx context.Context
|
||||
|
||||
if debug {
|
||||
ctx = context.Background()
|
||||
} else {
|
||||
var cancel context.CancelFunc
|
||||
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
|
||||
ctx, cancel = context.WithDeadline(context.Background(), d)
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
cancel()
|
||||
}()
|
||||
}
|
||||
|
||||
msFactory := msgstream.NewPmsFactory()
|
||||
node := NewDataNode(ctx, msFactory)
|
||||
replica := newReplica()
|
||||
|
||||
ms := &MasterServiceFactory{
|
||||
ID: 0,
|
||||
collectionID: 1,
|
||||
collectionName: "collection-1",
|
||||
}
|
||||
node.SetMasterServiceInterface(ms)
|
||||
|
||||
ds := &DataServiceFactory{}
|
||||
node.SetDataServiceInterface(ds)
|
||||
|
||||
var alloc allocatorInterface = NewAllocatorFactory(100)
|
||||
|
||||
chanSize := 100
|
||||
flushChan := make(chan *flushMsg, chanSize)
|
||||
node.flushChan = flushChan
|
||||
node.dataSyncService = newDataSyncService(node.ctx, flushChan, replica, alloc, node.msFactory)
|
||||
node.dataSyncService.init()
|
||||
node.metaService = newMetaService(node.ctx, replica, node.masterService)
|
||||
node.replica = replica
|
||||
|
||||
return node
|
||||
}
|
||||
|
||||
func makeNewChannelNames(names []string, suffix string) []string {
|
||||
var ret []string
|
||||
for _, name := range names {
|
||||
ret = append(ret, name+suffix)
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func refreshChannelNames() {
|
||||
Params.DDChannelNames = []string{"datanode-test"}
|
||||
Params.SegmentStatisticsChannelName = "segment-statistics"
|
||||
Params.CompleteFlushChannelName = "flush-completed"
|
||||
Params.InsertChannelNames = []string{"intsert-a-1", "insert-b-1"}
|
||||
Params.TimeTickChannelName = "hard-timetick"
|
||||
suffix := "-test-data-node" + strconv.FormatInt(rand.Int63n(100), 10)
|
||||
Params.DDChannelNames = makeNewChannelNames(Params.DDChannelNames, suffix)
|
||||
Params.InsertChannelNames = makeNewChannelNames(Params.InsertChannelNames, suffix)
|
||||
}
|
||||
|
||||
func newBinlogMeta() *binlogMeta {
|
||||
kvMock := memkv.NewMemoryKV()
|
||||
idAllocMock := NewAllocatorFactory(1)
|
||||
mt, _ := NewBinlogMeta(kvMock, idAllocMock)
|
||||
return mt
|
||||
}
|
||||
|
||||
func clearEtcd(rootPath string) error {
|
||||
etcdAddr := Params.EtcdAddress
|
||||
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
etcdKV := etcdkv.NewEtcdKV(etcdClient, rootPath)
|
||||
|
||||
err = etcdKV.RemoveWithPrefix("writer/segment")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, _, err = etcdKV.LoadWithPrefix("writer/segment")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Debug("Clear ETCD with prefix writer/segment ")
|
||||
|
||||
err = etcdKV.RemoveWithPrefix("writer/ddl")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, _, err = etcdKV.LoadWithPrefix("writer/ddl")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Debug("Clear ETCD with prefix writer/ddl")
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
type MetaFactory struct {
|
||||
}
|
||||
|
||||
type DataFactory struct {
|
||||
rawData []byte
|
||||
}
|
||||
|
||||
type MasterServiceFactory struct {
|
||||
types.MasterService
|
||||
ID UniqueID
|
||||
collectionName string
|
||||
collectionID UniqueID
|
||||
}
|
||||
|
||||
type DataServiceFactory struct {
|
||||
types.DataService
|
||||
}
|
||||
|
||||
func (mf *MetaFactory) CollectionMetaFactory(collectionID UniqueID, collectionName string) *etcdpb.CollectionMeta {
|
||||
sch := schemapb.CollectionSchema{
|
||||
Name: collectionName,
|
||||
Description: "test collection by meta factory",
|
||||
AutoID: true,
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
FieldID: 0,
|
||||
Name: "RowID",
|
||||
Description: "RowID field",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
TypeParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: "f0_tk1",
|
||||
Value: "f0_tv1",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
FieldID: 1,
|
||||
Name: "Timestamp",
|
||||
Description: "Timestamp field",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
TypeParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: "f1_tk1",
|
||||
Value: "f1_tv1",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
FieldID: 100,
|
||||
Name: "float_vector_field",
|
||||
Description: "field 100",
|
||||
DataType: schemapb.DataType_FloatVector,
|
||||
TypeParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: "dim",
|
||||
Value: "2",
|
||||
},
|
||||
},
|
||||
IndexParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: "indexkey",
|
||||
Value: "indexvalue",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
FieldID: 101,
|
||||
Name: "binary_vector_field",
|
||||
Description: "field 101",
|
||||
DataType: schemapb.DataType_BinaryVector,
|
||||
TypeParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: "dim",
|
||||
Value: "32",
|
||||
},
|
||||
},
|
||||
IndexParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: "indexkey",
|
||||
Value: "indexvalue",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
FieldID: 102,
|
||||
Name: "bool_field",
|
||||
Description: "field 102",
|
||||
DataType: schemapb.DataType_Bool,
|
||||
TypeParams: []*commonpb.KeyValuePair{},
|
||||
IndexParams: []*commonpb.KeyValuePair{},
|
||||
},
|
||||
{
|
||||
FieldID: 103,
|
||||
Name: "int8_field",
|
||||
Description: "field 103",
|
||||
DataType: schemapb.DataType_Int8,
|
||||
TypeParams: []*commonpb.KeyValuePair{},
|
||||
IndexParams: []*commonpb.KeyValuePair{},
|
||||
},
|
||||
{
|
||||
FieldID: 104,
|
||||
Name: "int16_field",
|
||||
Description: "field 104",
|
||||
DataType: schemapb.DataType_Int16,
|
||||
TypeParams: []*commonpb.KeyValuePair{},
|
||||
IndexParams: []*commonpb.KeyValuePair{},
|
||||
},
|
||||
{
|
||||
FieldID: 105,
|
||||
Name: "int32_field",
|
||||
Description: "field 105",
|
||||
DataType: schemapb.DataType_Int32,
|
||||
TypeParams: []*commonpb.KeyValuePair{},
|
||||
IndexParams: []*commonpb.KeyValuePair{},
|
||||
},
|
||||
{
|
||||
FieldID: 106,
|
||||
Name: "int64_field",
|
||||
Description: "field 106",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
TypeParams: []*commonpb.KeyValuePair{},
|
||||
IndexParams: []*commonpb.KeyValuePair{},
|
||||
},
|
||||
{
|
||||
FieldID: 107,
|
||||
Name: "float32_field",
|
||||
Description: "field 107",
|
||||
DataType: schemapb.DataType_Float,
|
||||
TypeParams: []*commonpb.KeyValuePair{},
|
||||
IndexParams: []*commonpb.KeyValuePair{},
|
||||
},
|
||||
{
|
||||
FieldID: 108,
|
||||
Name: "float64_field",
|
||||
Description: "field 108",
|
||||
DataType: schemapb.DataType_Double,
|
||||
TypeParams: []*commonpb.KeyValuePair{},
|
||||
IndexParams: []*commonpb.KeyValuePair{},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
collection := etcdpb.CollectionMeta{
|
||||
ID: collectionID,
|
||||
Schema: &sch,
|
||||
CreateTime: Timestamp(1),
|
||||
SegmentIDs: make([]UniqueID, 0),
|
||||
PartitionIDs: []UniqueID{0},
|
||||
}
|
||||
return &collection
|
||||
}
|
||||
|
||||
func NewDataFactory() *DataFactory {
|
||||
return &DataFactory{rawData: GenRowData()}
|
||||
}
|
||||
|
||||
func GenRowData() (rawData []byte) {
|
||||
const DIM = 2
|
||||
const N = 1
|
||||
|
||||
// Float vector
|
||||
var fvector = [DIM]float32{1, 2}
|
||||
for _, ele := range fvector {
|
||||
buf := make([]byte, 4)
|
||||
binary.LittleEndian.PutUint32(buf, math.Float32bits(ele))
|
||||
rawData = append(rawData, buf...)
|
||||
}
|
||||
|
||||
// Binary vector
|
||||
// Dimension of binary vector is 32
|
||||
// size := 4, = 32 / 8
|
||||
var bvector = []byte{255, 255, 255, 0}
|
||||
rawData = append(rawData, bvector...)
|
||||
|
||||
// Bool
|
||||
var fieldBool = true
|
||||
buf := new(bytes.Buffer)
|
||||
if err := binary.Write(buf, binary.LittleEndian, fieldBool); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
rawData = append(rawData, buf.Bytes()...)
|
||||
|
||||
// int8
|
||||
var dataInt8 int8 = 100
|
||||
bint8 := new(bytes.Buffer)
|
||||
if err := binary.Write(bint8, binary.LittleEndian, dataInt8); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
rawData = append(rawData, bint8.Bytes()...)
|
||||
|
||||
// int16
|
||||
var dataInt16 int16 = 200
|
||||
bint16 := new(bytes.Buffer)
|
||||
if err := binary.Write(bint16, binary.LittleEndian, dataInt16); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
rawData = append(rawData, bint16.Bytes()...)
|
||||
|
||||
// int32
|
||||
var dataInt32 int32 = 300
|
||||
bint32 := new(bytes.Buffer)
|
||||
if err := binary.Write(bint32, binary.LittleEndian, dataInt32); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
rawData = append(rawData, bint32.Bytes()...)
|
||||
|
||||
// int64
|
||||
var dataInt64 int64 = 400
|
||||
bint64 := new(bytes.Buffer)
|
||||
if err := binary.Write(bint64, binary.LittleEndian, dataInt64); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
rawData = append(rawData, bint64.Bytes()...)
|
||||
|
||||
// float32
|
||||
var datafloat float32 = 1.1
|
||||
bfloat32 := new(bytes.Buffer)
|
||||
if err := binary.Write(bfloat32, binary.LittleEndian, datafloat); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
rawData = append(rawData, bfloat32.Bytes()...)
|
||||
|
||||
// float64
|
||||
var datafloat64 = 2.2
|
||||
bfloat64 := new(bytes.Buffer)
|
||||
if err := binary.Write(bfloat64, binary.LittleEndian, datafloat64); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
rawData = append(rawData, bfloat64.Bytes()...)
|
||||
log.Debug("Rawdata length:", zap.Int("Length of rawData", len(rawData)))
|
||||
return
|
||||
}
|
||||
|
||||
func (df *DataFactory) GenMsgStreamInsertMsg(idx int) *msgstream.InsertMsg {
|
||||
var msg = &msgstream.InsertMsg{
|
||||
BaseMsg: msgstream.BaseMsg{
|
||||
HashValues: []uint32{uint32(idx)},
|
||||
},
|
||||
InsertRequest: internalpb.InsertRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_Insert,
|
||||
MsgID: 0, // GOOSE TODO
|
||||
Timestamp: Timestamp(idx + 1000),
|
||||
SourceID: 0,
|
||||
},
|
||||
CollectionName: "col1", // GOOSE TODO
|
||||
PartitionName: "default",
|
||||
SegmentID: 1, // GOOSE TODO
|
||||
ChannelID: "0", // GOOSE TODO
|
||||
Timestamps: []Timestamp{Timestamp(idx + 1000)},
|
||||
RowIDs: []UniqueID{UniqueID(idx)},
|
||||
RowData: []*commonpb.Blob{{Value: df.rawData}},
|
||||
},
|
||||
}
|
||||
return msg
|
||||
}
|
||||
|
||||
func (df *DataFactory) GetMsgStreamTsInsertMsgs(n int) (inMsgs []msgstream.TsMsg) {
|
||||
for i := 0; i < n; i++ {
|
||||
var msg = df.GenMsgStreamInsertMsg(i)
|
||||
var tsMsg msgstream.TsMsg = msg
|
||||
inMsgs = append(inMsgs, tsMsg)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (df *DataFactory) GetMsgStreamInsertMsgs(n int) (inMsgs []*msgstream.InsertMsg) {
|
||||
for i := 0; i < n; i++ {
|
||||
var msg = df.GenMsgStreamInsertMsg(i)
|
||||
inMsgs = append(inMsgs, msg)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
type AllocatorFactory struct {
|
||||
sync.Mutex
|
||||
r *rand.Rand
|
||||
}
|
||||
|
||||
func NewAllocatorFactory(id ...UniqueID) *AllocatorFactory {
|
||||
f := &AllocatorFactory{
|
||||
r: rand.New(rand.NewSource(time.Now().UnixNano())),
|
||||
}
|
||||
|
||||
return f
|
||||
}
|
||||
|
||||
func (alloc *AllocatorFactory) allocID() (UniqueID, error) {
|
||||
alloc.Lock()
|
||||
defer alloc.Unlock()
|
||||
return alloc.r.Int63n(1000000), nil
|
||||
}
|
||||
|
||||
func (m *MasterServiceFactory) setID(id UniqueID) {
|
||||
m.ID = id // GOOSE TODO: random ID generator
|
||||
}
|
||||
|
||||
func (m *MasterServiceFactory) setCollectionID(id UniqueID) {
|
||||
m.collectionID = id
|
||||
}
|
||||
|
||||
func (m *MasterServiceFactory) setCollectionName(name string) {
|
||||
m.collectionName = name
|
||||
}
|
||||
|
||||
func (m *MasterServiceFactory) AllocID(ctx context.Context, in *masterpb.AllocIDRequest) (*masterpb.AllocIDResponse, error) {
|
||||
resp := &masterpb.AllocIDResponse{
|
||||
Status: &commonpb.Status{},
|
||||
ID: m.ID,
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (m *MasterServiceFactory) ShowCollections(ctx context.Context, in *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
|
||||
resp := &milvuspb.ShowCollectionsResponse{
|
||||
Status: &commonpb.Status{},
|
||||
CollectionNames: []string{m.collectionName},
|
||||
}
|
||||
return resp, nil
|
||||
|
||||
}
|
||||
|
||||
func (m *MasterServiceFactory) DescribeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
|
||||
f := MetaFactory{}
|
||||
meta := f.CollectionMetaFactory(m.collectionID, m.collectionName)
|
||||
resp := &milvuspb.DescribeCollectionResponse{
|
||||
Status: &commonpb.Status{},
|
||||
CollectionID: m.collectionID,
|
||||
Schema: meta.Schema,
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (m *MasterServiceFactory) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
|
||||
return &internalpb.ComponentStates{
|
||||
State: &internalpb.ComponentInfo{},
|
||||
SubcomponentStates: make([]*internalpb.ComponentInfo, 0),
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
},
|
||||
}, nil
|
||||
}
|
Loading…
Reference in New Issue