Refactor query service’s meta interface

Signed-off-by: xige-16 <xi.ge@zilliz.com>
pull/4973/head^2
xige-16 2021-02-02 11:52:41 +08:00 committed by yefu.chen
parent 46f14a5e69
commit 4cd315408b
8 changed files with 610 additions and 133 deletions

View File

@ -611,7 +611,7 @@ func (ms *PulsarTtMsgStream) findTimeTick(consumer Consumer,
}
// set pulsar info to tsMsg
tsMsg.SetPosition(&msgstream.MsgPosition{
ChannelName: pulsarMsg.Topic(),
ChannelName: filepath.Base(pulsarMsg.Topic()),
MsgID: typeutil.PulsarMsgIDToString(pulsarMsg.ID()),
})

View File

@ -140,6 +140,8 @@ func (ddNode *ddNode) createPartition(msg *msgstream.CreatePartitionMsg) {
partitionName := msg.PartitionName
err := ddNode.replica.addPartition2(collectionID, partitionName)
// TODO:: add partition by partitionID
//err := ddNode.replica.addPartition(collectionID, msg.PartitionID)
if err != nil {
log.Println(err)
return

View File

@ -474,7 +474,9 @@ func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S
}
return status, err
}
return nil, nil
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}, nil
}
func (node *QueryNode) ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error) {
@ -500,5 +502,7 @@ func (node *QueryNode) ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*comm
return status, err
}
}
return nil, nil
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}, nil
}

View File

@ -0,0 +1,443 @@
package queryservice
import (
"context"
"encoding/binary"
"math"
"strconv"
"testing"
"github.com/golang/protobuf/proto"
minioKV "github.com/zilliztech/milvus-distributed/internal/kv/minio"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
)
//generate insert data
const msgLength = 100
const receiveBufSize = 1024
const pulsarBufSize = 1024
const DIM = 16
func genInsert(collectionID int64, partitionID int64, segmentID int64, timeStart int) (*msgstream.MsgPack, *msgstream.MsgPack) {
msgs := make([]msgstream.TsMsg, 0)
for n := timeStart; n < timeStart+msgLength; n++ {
rowData := make([]byte, 0)
id := make([]byte, 8)
binary.BigEndian.PutUint64(id, uint64(n))
rowData = append(rowData, id...)
time := make([]byte, 8)
binary.BigEndian.PutUint64(time, uint64(n))
rowData = append(rowData, time...)
for i := 0; i < DIM; i++ {
vec := make([]byte, 4)
binary.BigEndian.PutUint32(vec, math.Float32bits(float32(n*i)))
rowData = append(rowData, vec...)
}
age := make([]byte, 4)
binary.BigEndian.PutUint32(age, 1)
rowData = append(rowData, age...)
blob := &commonpb.Blob{
Value: rowData,
}
var insertMsg msgstream.TsMsg = &msgstream.InsertMsg{
BaseMsg: msgstream.BaseMsg{
HashValues: []uint32{uint32(n)},
},
InsertRequest: internalpb2.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kInsert,
MsgID: 0,
Timestamp: uint64(n),
SourceID: 0,
},
CollectionID: collectionID,
PartitionID: partitionID,
SegmentID: segmentID,
ChannelID: "0",
Timestamps: []uint64{uint64(n)},
RowIDs: []int64{int64(n)},
RowData: []*commonpb.Blob{blob},
},
}
msgs = append(msgs, insertMsg)
}
insertMsgPack := &msgstream.MsgPack{
BeginTs: uint64(timeStart),
EndTs: uint64(timeStart + msgLength),
Msgs: msgs,
}
// generate timeTick
timeTickMsg := &msgstream.TimeTickMsg{
BaseMsg: msgstream.BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []uint32{0},
},
TimeTickMsg: internalpb2.TimeTickMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kTimeTick,
MsgID: 0,
Timestamp: uint64(timeStart + msgLength),
SourceID: 0,
},
},
}
timeTickMsgPack := &msgstream.MsgPack{
Msgs: []msgstream.TsMsg{timeTickMsg},
}
return insertMsgPack, timeTickMsgPack
}
func genSchema(collectionID int64) *schemapb.CollectionSchema {
fieldID := schemapb.FieldSchema{
FieldID: UniqueID(0),
Name: "RowID",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT64,
}
fieldTime := schemapb.FieldSchema{
FieldID: UniqueID(1),
Name: "Timestamp",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT64,
}
fieldVec := schemapb.FieldSchema{
FieldID: UniqueID(100),
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "16",
},
},
IndexParams: []*commonpb.KeyValuePair{
{
Key: "metric_type",
Value: "L2",
},
},
}
fieldInt := schemapb.FieldSchema{
FieldID: UniqueID(101),
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
}
return &schemapb.CollectionSchema{
Name: "collection-" + strconv.FormatInt(collectionID, 10),
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldID, &fieldTime, &fieldVec, &fieldInt,
},
}
}
func genCreateCollection(collectionID int64) *msgstream.MsgPack {
schema := genSchema(collectionID)
byteSchema, err := proto.Marshal(schema)
if err != nil {
panic(err)
}
request := internalpb2.CreateCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kCreateCollection,
Timestamp: uint64(10),
},
DbID: 0,
CollectionID: collectionID,
Schema: byteSchema,
}
msg := &msgstream.CreateCollectionMsg{
BaseMsg: msgstream.BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []uint32{0},
},
CreateCollectionRequest: request,
}
return &msgstream.MsgPack{
Msgs: []msgstream.TsMsg{msg},
}
}
func genCreatePartition(collectionID int64, partitionID int64) *msgstream.MsgPack {
request := internalpb2.CreatePartitionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kCreatePartition,
Timestamp: uint64(20),
},
DbID: 0,
CollectionID: collectionID,
PartitionID: partitionID,
}
msg := &msgstream.CreatePartitionMsg{
BaseMsg: msgstream.BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []uint32{0},
},
CreatePartitionRequest: request,
}
return &msgstream.MsgPack{
Msgs: []msgstream.TsMsg{msg},
}
}
func getMinioKV(ctx context.Context) (*minioKV.MinIOKV, error) {
minioAddress := "localhost:9000"
accessKeyID := "minioadmin"
secretAccessKey := "minioadmin"
useSSL := false
bucketName := "a-bucket"
option := &minioKV.Option{
Address: minioAddress,
AccessKeyID: accessKeyID,
SecretAccessKeyID: secretAccessKey,
UseSSL: useSSL,
BucketName: bucketName,
CreateBucket: true,
}
return minioKV.NewMinIOKV(ctx, option)
}
func TestLoadCollection(t *testing.T) {
//// produce msg
//insertChannels := []string{"insert-0"}
//ddChannels := []string{"data-definition-0"}
//pulsarAddress := "pulsar://127.0.0.1:6650"
//
//insertStream := pulsarms.NewPulsarMsgStream(context.Background(), receiveBufSize)
//insertStream.SetPulsarClient(pulsarAddress)
//insertStream.CreatePulsarProducers(insertChannels)
//ddStream := pulsarms.NewPulsarMsgStream(context.Background(), receiveBufSize)
//ddStream.SetPulsarClient(pulsarAddress)
//ddStream.CreatePulsarProducers(ddChannels)
//
//var insertMsgStream msgstream.MsgStream = insertStream
//insertMsgStream.Start()
//var ddMsgStream msgstream.MsgStream = ddStream
//ddMsgStream.Start()
//
//createCollectionMsgPack := genCreateCollection(1)
//createPartitionMsgPack := genCreatePartition(1, 1)
//ddMsgStream.Produce(createCollectionMsgPack)
//ddMsgStream.Produce(createPartitionMsgPack)
//
//consumeStream := pulsarms.NewPulsarTtMsgStream(context.Background(), receiveBufSize)
//consumeStream.SetPulsarClient(pulsarAddress)
//unmarshalDispatcher := util.NewUnmarshalDispatcher()
//consumeStream.CreatePulsarConsumers(insertChannels, "test", unmarshalDispatcher, pulsarBufSize)
//consumeStream.Start()
//
//for i := 0; i < 10; i++ {
// insertMsgPack, timeTickMsgPack := genInsert(1, 1, int64(i), i*msgLength+1)
// err := insertMsgStream.Produce(insertMsgPack)
// assert.NoError(t, err)
// err = insertMsgStream.Broadcast(timeTickMsgPack)
// assert.NoError(t, err)
// err = ddMsgStream.Broadcast(timeTickMsgPack)
// assert.NoError(t, err)
//}
//
////consume msg
//segPosition := make(map[int64][]*internalpb2.MsgPosition)
//segmentData := make([]*storage.InsertData, 0)
//indexRowDatas := make([][]float32, 0)
//for i := 0; i < 10; i++ {
// msgPack := consumeStream.Consume()
// idData := make([]int64, 0)
// timestamps := make([]int64, 0)
// fieldAgeData := make([]int32, 0)
// fieldVecData := make([]float32, 0)
// for n := 0; n < msgLength; n++ {
// blob := msgPack.Msgs[n].(*msgstream.InsertMsg).RowData[0].Value
// id := binary.BigEndian.Uint64(blob[0:8])
// idData = append(idData, int64(id))
// time := binary.BigEndian.Uint64(blob[8:16])
// timestamps = append(timestamps, int64(time))
// for i := 0; i < DIM; i++ {
// bits := binary.BigEndian.Uint32(blob[16+4*i : 16+4*(i+1)])
// floatVec := math.Float32frombits(bits)
// fieldVecData = append(fieldVecData, floatVec)
// }
// ageValue := binary.BigEndian.Uint32(blob[80:84])
// fieldAgeData = append(fieldAgeData, int32(ageValue))
// }
//
// insertData := &storage.InsertData{
// Data: map[int64]storage.FieldData{
// 0: &storage.Int64FieldData{
// NumRows: msgLength,
// Data: idData,
// },
// 1: &storage.Int64FieldData{
// NumRows: msgLength,
// Data: timestamps,
// },
// 100: &storage.FloatVectorFieldData{
// NumRows: msgLength,
// Data: fieldVecData,
// Dim: DIM,
// },
// 101: &storage.Int32FieldData{
// NumRows: msgLength,
// Data: fieldAgeData,
// },
// },
// }
// segPosition[int64(i)] = msgPack.StartPositions
// segmentData = append(segmentData, insertData)
// indexRowDatas = append(indexRowDatas, fieldVecData)
//}
//
////gen inCodec
//collectionMeta := &etcdpb.CollectionMeta{
// ID: 1,
// Schema: genSchema(1),
// CreateTime: 0,
// PartitionIDs: []int64{1},
// SegmentIDs: []int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
//}
//inCodec := storage.NewInsertCodec(collectionMeta)
//
//// get minio client
//minioKV, err := getMinioKV(context.Background())
//assert.Nil(t, err)
//
//// write binlog minio
//collectionStr := strconv.FormatInt(1, 10)
//for i := 0; i < 9; i++ {
// binLogs, err := inCodec.Serialize(1, storage.UniqueID(i), segmentData[i])
// assert.Nil(t, err)
// assert.Equal(t, len(binLogs), 4)
// keyPrefix := "distributed-query-test-binlog"
// segmentStr := strconv.FormatInt(int64(i), 10)
//
// for _, blob := range binLogs {
// key := path.Join(keyPrefix, collectionStr, segmentStr, blob.Key)
// err = minioKV.Save(key, string(blob.Value[:]))
// assert.Nil(t, err)
// }
//}
//
//// gen index build's indexParams
//indexParams := make(map[string]string)
//indexParams["index_type"] = "IVF_PQ"
//indexParams["index_mode"] = "cpu"
//indexParams["dim"] = "16"
//indexParams["k"] = "10"
//indexParams["nlist"] = "100"
//indexParams["nprobe"] = "10"
//indexParams["m"] = "4"
//indexParams["nbits"] = "8"
//indexParams["metric_type"] = "L2"
//indexParams["SLICE_SIZE"] = "400"
//
//var indexParamsKV []*commonpb.KeyValuePair
//for key, value := range indexParams {
// indexParamsKV = append(indexParamsKV, &commonpb.KeyValuePair{
// Key: key,
// Value: value,
// })
//}
//
//// generator index and write index to minio
//for i := 0; i < 9; i++ {
// typeParams := make(map[string]string)
// typeParams["dim"] = "16"
// index, err := indexnode.NewCIndex(typeParams, indexParams)
// assert.Nil(t, err)
// err = index.BuildFloatVecIndexWithoutIds(indexRowDatas[i])
// assert.Equal(t, err, nil)
// binarySet, err := index.Serialize()
// assert.Equal(t, len(binarySet), 1)
// assert.Nil(t, err)
// keyPrefix := "distributed-query-test-index"
// segmentStr := strconv.FormatInt(int64(i), 10)
// indexStr := strconv.FormatInt(int64(i), 10)
// key := path.Join(keyPrefix, collectionStr, segmentStr, indexStr)
// minioKV.Save(key, string(binarySet[0].Value))
//}
//
////generate query service
//service, err := NewQueryService(context.Background())
//assert.Nil(t, err)
//collectionID := UniqueID(1)
//partitions := []UniqueID{1}
//col2partition := make(map[UniqueID][]UniqueID)
//col2partition[collectionID] = partitions
//segments := []UniqueID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
//partition2segment := make(map[UniqueID][]UniqueID)
//partition2segment[UniqueID(1)] = segments
//masterMock := &masterMock{
// collectionIDs: []UniqueID{1},
// col2partition: col2partition,
// partition2segment: partition2segment,
//}
//service.SetMasterService(masterMock)
//segStates := make(map[UniqueID]*datapb.SegmentStatesResponse)
//for i := 0; i < 10; i++ {
// if i != 9 {
// state := &datapb.SegmentStatesResponse{
// State: datapb.SegmentState_SegmentFlushed,
// StartPositions: segPosition[int64(i)],
// }
// segStates[UniqueID(i)] = state
// } else {
// state := &datapb.SegmentStatesResponse{
// State: datapb.SegmentState_SegmentGrowing,
// StartPositions: segPosition[int64(i)],
// }
// segStates[UniqueID(i)] = state
// }
//}
//dataMock := &dataMock{
// segmentIDs: []UniqueID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
// segmentStates: segStates,
//}
//
//service.SetDataService(dataMock)
//service.SetEnableGrpc(true)
//
//loadCollectionRequest := &querypb.LoadCollectionRequest{
// Base: &commonpb.MsgBase{
// MsgType: commonpb.MsgType_kCreateCollection,
// },
// DbID: UniqueID(0),
// CollectionID: collectionID,
//}
//
//registerRequest := &querypb.RegisterNodeRequest{
// Address: &commonpb.Address{
// Ip: "localhost",
// Port: 20010,
// },
//}
//response, err := service.RegisterNode(registerRequest)
//assert.Nil(t, err)
//assert.Equal(t, response.Status.ErrorCode, commonpb.ErrorCode_SUCCESS)
//
//status, err := service.LoadCollection(loadCollectionRequest)
//assert.Nil(t, err)
//assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_SUCCESS)
}

View File

@ -1,20 +1,17 @@
package queryservice
import (
"log"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
)
type metaReplica interface {
getCollectionIDs(dbID UniqueID) ([]UniqueID, error)
getPartitionIDs(dbID UniqueID, collectionID UniqueID) ([]UniqueID, error)
getCollection(dbID UniqueID, collectionID UniqueID) *collection
getSegmentIDs(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) ([]UniqueID, error)
loadCollection(dbID UniqueID, collectionID UniqueID)
loadPartitions(dbID UniqueID, collectionID UniqueID, partitionIDs []UniqueID)
updatePartitionState(dbID UniqueID, collectionID UniqueID, partitionID UniqueID, state querypb.PartitionState)
getCollections(dbID UniqueID) ([]*collection, error)
getPartitions(dbID UniqueID, collectionID UniqueID) ([]*partition, error)
getSegments(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) ([]*segment, error)
loadCollection(dbID UniqueID, collectionID UniqueID) (*collection, error)
loadPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) (*partition, error)
updatePartitionState(dbID UniqueID, collectionID UniqueID, partitionID UniqueID, state querypb.PartitionState) error
getPartitionStates(dbID UniqueID, collectionID UniqueID, partitionIDs []UniqueID) ([]*querypb.PartitionStates, error)
}
@ -50,150 +47,137 @@ func newMetaReplica() metaReplica {
}
}
func (mp *metaReplicaImpl) addCollection(dbID UniqueID, collectionID UniqueID) {
partitions := make(map[UniqueID]*partition)
node2channel := make(map[int][]string)
newCollection := &collection{
id: collectionID,
partitions: partitions,
node2channel: node2channel,
}
mp.db2collections[dbID] = append(mp.db2collections[dbID], newCollection)
}
func (mp *metaReplicaImpl) addPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) {
collections := mp.db2collections[dbID]
for _, collection := range collections {
if collection.id == collectionID {
partitions := collection.partitions
segments := make(map[UniqueID]*segment)
partitions[partitionID] = &partition{
id: partitionID,
state: querypb.PartitionState_NotPresent,
segments: segments,
}
return
func (mp *metaReplicaImpl) addCollection(dbID UniqueID, collectionID UniqueID) (*collection, error) {
if _, ok := mp.db2collections[dbID]; ok {
partitions := make(map[UniqueID]*partition)
node2channel := make(map[int][]string)
newCollection := &collection{
id: collectionID,
partitions: partitions,
node2channel: node2channel,
}
mp.db2collections[dbID] = append(mp.db2collections[dbID], newCollection)
return newCollection, nil
}
log.Fatal("can't find collection when add partition")
return nil, errors.New("can't find dbID when add collection")
}
func (mp *metaReplicaImpl) getCollection(dbID UniqueID, collectionID UniqueID) *collection {
for _, id := range mp.dbID {
if id == dbID {
collections := mp.db2collections[id]
for _, collection := range collections {
if collection.id == collectionID {
return collection
}
}
return nil
}
}
return nil
}
func (mp *metaReplicaImpl) getCollectionIDs(dbID UniqueID) ([]UniqueID, error) {
func (mp *metaReplicaImpl) addPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) (*partition, error) {
if collections, ok := mp.db2collections[dbID]; ok {
collectionIDs := make([]UniqueID, 0)
for _, collection := range collections {
collectionIDs = append(collectionIDs, collection.id)
if collection.id == collectionID {
partitions := collection.partitions
segments := make(map[UniqueID]*segment)
partition := &partition{
id: partitionID,
state: querypb.PartitionState_NotPresent,
segments: segments,
}
partitions[partitionID] = partition
return partition, nil
}
}
return collectionIDs, nil
}
return nil, errors.New("can't find collection in queryService")
return nil, errors.New("can't find collection when add partition")
}
func (mp *metaReplicaImpl) getPartitionIDs(dbID UniqueID, collectionID UniqueID) ([]UniqueID, error) {
func (mp *metaReplicaImpl) getCollections(dbID UniqueID) ([]*collection, error) {
if collections, ok := mp.db2collections[dbID]; ok {
return collections, nil
}
return nil, errors.New("can't find collectionID")
}
func (mp *metaReplicaImpl) getPartitions(dbID UniqueID, collectionID UniqueID) ([]*partition, error) {
if collections, ok := mp.db2collections[dbID]; ok {
for _, collection := range collections {
if collectionID == collection.id {
partitions := collection.partitions
partitionIDs := make([]UniqueID, 0)
for _, partition := range partitions {
partitionIDs = append(partitionIDs, partition.id)
partitions := make([]*partition, 0)
for _, partition := range collection.partitions {
partitions = append(partitions, partition)
}
return partitionIDs, nil
return partitions, nil
}
}
}
return nil, errors.New("can't find partitions in queryService")
return nil, errors.New("can't find partitionIDs")
}
func (mp *metaReplicaImpl) getSegmentIDs(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) ([]UniqueID, error) {
segmentIDs := make([]UniqueID, 0)
func (mp *metaReplicaImpl) getSegments(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) ([]*segment, error) {
if collections, ok := mp.db2collections[dbID]; ok {
for _, collection := range collections {
if collectionID == collection.id {
if partition, ok := collection.partitions[partitionID]; ok {
segments := make([]*segment, 0)
for _, segment := range partition.segments {
segmentIDs = append(segmentIDs, segment.id)
segments = append(segments, segment)
}
return segments, nil
}
}
}
}
return segmentIDs, nil
return nil, errors.New("can't find segmentID")
}
func (mp *metaReplicaImpl) loadCollection(dbID UniqueID, collectionID UniqueID) {
collectionIDs, err := mp.getCollectionIDs(dbID)
if err != nil {
mp.addCollection(dbID, collectionID)
return
}
for _, id := range collectionIDs {
if collectionID == id {
return
func (mp *metaReplicaImpl) loadCollection(dbID UniqueID, collectionID UniqueID) (*collection, error) {
var res *collection = nil
if collections, err := mp.getCollections(dbID); err == nil {
for _, collection := range collections {
if collectionID == collection.id {
return res, nil
}
}
} else {
res, err = mp.addCollection(dbID, collectionID)
if err != nil {
return nil, err
}
}
mp.addCollection(dbID, collectionID)
return res, nil
}
func (mp *metaReplicaImpl) loadPartitions(dbID UniqueID, collectionID UniqueID, partitionIDs []UniqueID) {
func (mp *metaReplicaImpl) loadPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) (*partition, error) {
var collection *collection = nil
var partition *partition = nil
var err error
for _, col := range mp.db2collections[dbID] {
if col.id == collectionID {
collection = col
}
}
if collection == nil {
mp.addCollection(dbID, collectionID)
for _, col := range mp.db2collections[dbID] {
if col.id == collectionID {
collection = col
}
collection, err = mp.addCollection(dbID, collectionID)
if err != nil {
return partition, err
}
}
for _, partitionID := range partitionIDs {
match := false
for _, partition := range collection.partitions {
if partition.id == partitionID {
match = true
continue
}
}
if !match {
mp.addPartition(dbID, collectionID, partitionID)
if _, ok := collection.partitions[partitionID]; !ok {
partition, err = mp.addPartition(dbID, collectionID, partitionID)
if err != nil {
return partition, err
}
return partition, nil
}
return nil, nil
}
func (mp *metaReplicaImpl) updatePartitionState(dbID UniqueID,
collectionID UniqueID,
partitionID UniqueID,
state querypb.PartitionState) {
state querypb.PartitionState) error {
for _, collection := range mp.db2collections[dbID] {
if collection.id == collectionID {
if partition, ok := collection.partitions[partitionID]; ok {
partition.state = state
return
return nil
}
}
}
log.Fatal("update partition state fail")
return errors.New("update partition state fail")
}
func (mp *metaReplicaImpl) getPartitionStates(dbID UniqueID,

View File

@ -6,17 +6,18 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
)
type queryNode struct {
type queryNodeInfo struct {
client QueryNodeInterface
insertChannels string
nodeID uint64
segments []UniqueID
dmChannelNames []string
}
func (qn *queryNode) GetComponentStates() (*internalpb2.ComponentStates, error) {
func (qn *queryNodeInfo) GetComponentStates() (*internalpb2.ComponentStates, error) {
return qn.client.GetComponentStates()
}
func (qn *queryNode) LoadSegments(in *querypb.LoadSegmentRequest) (*commonpb.Status, error) {
func (qn *queryNodeInfo) LoadSegments(in *querypb.LoadSegmentRequest) (*commonpb.Status, error) {
return qn.client.LoadSegments(in)
}

View File

@ -46,10 +46,9 @@ type QueryService struct {
dataServiceClient DataServiceInterface
masterServiceClient MasterServiceInterface
queryNodes []*queryNode
//TODO:: nodeID use UniqueID
numRegisterNode uint64
numQueryChannel uint64
queryNodes []*queryNodeInfo
numRegisterNode uint64
numQueryChannel uint64
stateCode atomic.Value
isInit atomic.Value
@ -115,31 +114,26 @@ func (qs *QueryService) GetStatisticsChannel() (string, error) {
func (qs *QueryService) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) {
fmt.Println("register query node =", req.Address)
// TODO:: add mutex
allocatedID := qs.numRegisterNode
qs.numRegisterNode++
if allocatedID > Params.QueryNodeNum {
log.Fatal("allocated queryNodeID should lower than Params.QueryNodeNum")
}
allocatedID := uint64(len(qs.queryNodes))
registerNodeAddress := req.Address.Ip + ":" + strconv.FormatInt(req.Address.Port, 10)
var node *queryNode
var node *queryNodeInfo
if qs.enableGrpc {
client := nodeclient.NewClient(registerNodeAddress)
node = &queryNode{
node = &queryNodeInfo{
client: client,
nodeID: allocatedID,
}
} else {
client := querynode.NewQueryNode(qs.loopCtx, allocatedID)
node = &queryNode{
node = &queryNodeInfo{
client: client,
nodeID: allocatedID,
}
}
qs.queryNodes = append(qs.queryNodes, node)
// TODO:: watch dm channels
//TODO::return init params to queryNode
return &querypb.RegisterNodeResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
@ -152,9 +146,18 @@ func (qs *QueryService) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb
func (qs *QueryService) ShowCollections(req *querypb.ShowCollectionRequest) (*querypb.ShowCollectionResponse, error) {
dbID := req.DbID
collectionIDs, err := qs.replica.getCollectionIDs(dbID)
collections, err := qs.replica.getCollections(dbID)
collectionIDs := make([]UniqueID, 0)
for _, collection := range collections {
collectionIDs = append(collectionIDs, collection.id)
}
if err != nil {
return nil, err
return &querypb.ShowCollectionResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
},
}, err
}
return &querypb.ShowCollectionResponse{
Status: &commonpb.Status{
@ -167,13 +170,23 @@ func (qs *QueryService) ShowCollections(req *querypb.ShowCollectionRequest) (*qu
func (qs *QueryService) LoadCollection(req *querypb.LoadCollectionRequest) (*commonpb.Status, error) {
dbID := req.DbID
collectionID := req.CollectionID
qs.replica.loadCollection(dbID, collectionID)
fn := func(err error) *commonpb.Status {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
}
}
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}
}
collection, err := qs.replica.loadCollection(dbID, collectionID)
if err != nil {
return fn(err), err
}
if collection == nil {
return fn(nil), nil
}
// get partitionIDs
@ -189,7 +202,7 @@ func (qs *QueryService) LoadCollection(req *querypb.LoadCollectionRequest) (*com
return fn(err), err
}
if showPartitionResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
log.Fatal("show partition fail, v%", showPartitionResponse.Status.Reason)
return showPartitionResponse.Status, err
}
partitionIDs := showPartitionResponse.PartitionIDs
@ -208,15 +221,24 @@ func (qs *QueryService) LoadCollection(req *querypb.LoadCollectionRequest) (*com
func (qs *QueryService) ReleaseCollection(req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
dbID := req.DbID
collectionID := req.CollectionID
partitionsIDs, err := qs.replica.getPartitionIDs(dbID, collectionID)
partitions, err := qs.replica.getPartitions(dbID, collectionID)
if err != nil {
log.Fatal("get partition ids error")
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
}, err
}
partitionIDs := make([]UniqueID, 0)
for _, partition := range partitions {
partitionIDs = append(partitionIDs, partition.id)
}
releasePartitionRequest := &querypb.ReleasePartitionRequest{
Base: req.Base,
DbID: dbID,
CollectionID: collectionID,
PartitionIDs: partitionsIDs,
PartitionIDs: partitionIDs,
}
status, err := qs.ReleasePartitions(releasePartitionRequest)
@ -227,9 +249,18 @@ func (qs *QueryService) ReleaseCollection(req *querypb.ReleaseCollectionRequest)
func (qs *QueryService) ShowPartitions(req *querypb.ShowPartitionRequest) (*querypb.ShowPartitionResponse, error) {
dbID := req.DbID
collectionID := req.CollectionID
partitionIDs, err := qs.replica.getPartitionIDs(dbID, collectionID)
partitions, err := qs.replica.getPartitions(dbID, collectionID)
partitionIDs := make([]UniqueID, 0)
for _, partition := range partitions {
partitionIDs = append(partitionIDs, partition.id)
}
if err != nil {
return nil, err
return &querypb.ShowPartitionResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
},
}, err
}
return &querypb.ShowPartitionResponse{
Status: &commonpb.Status{
@ -237,14 +268,14 @@ func (qs *QueryService) ShowPartitions(req *querypb.ShowPartitionRequest) (*quer
},
PartitionIDs: partitionIDs,
}, nil
}
func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*commonpb.Status, error) {
//TODO::suggest different partitions have different dm channel
dbID := req.DbID
collectionID := req.CollectionID
partitionIDs := req.PartitionIDs
qs.replica.loadPartitions(dbID, collectionID, partitionIDs)
qs.replica.loadPartition(dbID, collectionID, partitionIDs[0])
fn := func(err error) *commonpb.Status {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
@ -333,6 +364,9 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm
for key, node := range qs.queryNodes {
if channels == node.insertChannels {
statesID := id2segs[i][len(id2segs[i])-1]
//TODO :: should be start position
position := segmentStates[statesID-1].StartPositions
segmentStates[statesID].StartPositions = position
loadSegmentRequest := &querypb.LoadSegmentRequest{
CollectionID: collectionID,
PartitionID: partitionID,
@ -359,10 +393,18 @@ func (qs *QueryService) ReleasePartitions(req *querypb.ReleasePartitionRequest)
partitionIDs := req.PartitionIDs
segmentIDs := make([]UniqueID, 0)
for _, partitionID := range partitionIDs {
res, err := qs.replica.getSegmentIDs(dbID, collectionID, partitionID)
segments, err := qs.replica.getSegments(dbID, collectionID, partitionID)
if err != nil {
log.Fatal("get segment ids error")
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
}, err
}
res := make([]UniqueID, 0)
for _, segment := range segments {
res = append(res, segment.id)
}
segmentIDs = append(segmentIDs, res...)
}
releaseSegmentRequest := &querypb.ReleaseSegmentRequest{
@ -421,7 +463,7 @@ func (qs *QueryService) GetPartitionStates(req *querypb.PartitionStatesRequest)
}
func NewQueryService(ctx context.Context) (*QueryService, error) {
nodes := make([]*queryNode, 0)
nodes := make([]*queryNodeInfo, 0)
ctx1, cancel := context.WithCancel(ctx)
replica := newMetaReplica()
service := &QueryService{

View File

@ -4,6 +4,7 @@ import (
"encoding/binary"
"fmt"
"reflect"
"strings"
"github.com/apache/pulsar-client-go/pulsar"
@ -43,7 +44,7 @@ func Uint64ToBytes(v uint64) []byte {
}
func PulsarMsgIDToString(messageID pulsar.MessageID) string {
return string(messageID.Serialize())
return strings.ToValidUTF8(string(messageID.Serialize()), "")
}
func StringToPulsarMsgID(msgString string) (pulsar.MessageID, error) {