Update repack delete msg (#8631)

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
pull/8644/head
Cai Yudong 2021-09-27 10:35:58 +08:00 committed by GitHub
parent b23761f3ed
commit ece0a06798
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 98 additions and 103 deletions

View File

@ -217,8 +217,8 @@ func (ms *mqMsgStream) Produce(msgPack *MsgPack) error {
switch msgType {
case commonpb.MsgType_Insert:
result, err = InsertRepackFunc(tsMsgs, reBucketValues)
//case commonpb.MsgType_Delete:
// result, err = DeleteRepackFunc(tsMsgs, reBucketValues)
case commonpb.MsgType_Delete:
result, err = DeleteRepackFunc(tsMsgs, reBucketValues)
default:
result, err = DefaultRepackFunc(tsMsgs, reBucketValues)
}

View File

@ -302,59 +302,59 @@ func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) {
(*outputStream).Close()
}
//func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) {
// pulsarAddress, _ := Params.Load("_PulsarAddress")
// c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
// producerChannels := []string{c1, c2}
// consumerChannels := []string{c1, c2}
// consumerSubName := funcutil.RandomString(8)
//
// baseMsg := BaseMsg{
// BeginTimestamp: 0,
// EndTimestamp: 0,
// HashValues: []uint32{1, 3},
// }
//
// deleteRequest := internalpb.DeleteRequest{
// Base: &commonpb.MsgBase{
// MsgType: commonpb.MsgType_Delete,
// MsgID: 1,
// Timestamp: 1,
// SourceID: 1,
// },
// CollectionName: "Collection",
// ChannelID: "1",
// Timestamps: []Timestamp{1, 1},
// PrimaryKeys: []int64{1, 3},
// }
// deleteMsg := &DeleteMsg{
// BaseMsg: baseMsg,
// DeleteRequest: deleteRequest,
// }
//
// msgPack := MsgPack{}
// msgPack.Msgs = append(msgPack.Msgs, deleteMsg)
//
// factory := ProtoUDFactory{}
// pulsarClient, _ := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress})
// inputStream, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
// inputStream.AsProducer(producerChannels)
// inputStream.Start()
//
// pulsarClient2, _ := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress})
// outputStream, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher())
// outputStream.AsConsumer(consumerChannels, consumerSubName)
// outputStream.Start()
// var output MsgStream = outputStream
//
// err := (*inputStream).Produce(&msgPack)
// if err != nil {
// log.Fatalf("produce error = %v", err)
// }
// receiveMsg(output, len(msgPack.Msgs)*2)
// (*inputStream).Close()
// (*outputStream).Close()
//}
func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) {
pulsarAddress, _ := Params.Load("_PulsarAddress")
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
producerChannels := []string{c1, c2}
consumerChannels := []string{c1, c2}
consumerSubName := funcutil.RandomString(8)
baseMsg := BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []uint32{1},
}
deleteRequest := internalpb.DeleteRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Delete,
MsgID: 1,
Timestamp: 1,
SourceID: 1,
},
CollectionName: "Collection",
ChannelID: "1",
Timestamp: Timestamp(1),
ExprPlan: []byte{},
}
deleteMsg := &DeleteMsg{
BaseMsg: baseMsg,
DeleteRequest: deleteRequest,
}
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, deleteMsg)
factory := ProtoUDFactory{}
pulsarClient, _ := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress})
inputStream, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher())
inputStream.AsProducer(producerChannels)
inputStream.Start()
pulsarClient2, _ := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress})
outputStream, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher())
outputStream.AsConsumer(consumerChannels, consumerSubName)
outputStream.Start()
var output MsgStream = outputStream
err := (*inputStream).Produce(&msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
receiveMsg(output, len(msgPack.Msgs)*1)
(*inputStream).Close()
(*outputStream).Close()
}
func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) {
pulsarAddress, _ := Params.Load("_PulsarAddress")

View File

@ -74,54 +74,49 @@ func InsertRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, e
return result, nil
}
//func DeleteRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) {
// result := make(map[int32]*MsgPack)
// for i, request := range tsMsgs {
// if request.Type() != commonpb.MsgType_Delete {
// return nil, errors.New("msg's must be Delete")
// }
// deleteRequest := request.(*DeleteMsg)
// keys := hashKeys[i]
//
// timestampLen := len(deleteRequest.Timestamps)
// primaryKeysLen := len(deleteRequest.PrimaryKeys)
// keysLen := len(keys)
//
// if keysLen != timestampLen || keysLen != primaryKeysLen {
// return nil, errors.New("the length of hashValue, timestamps, primaryKeys are not equal")
// }
//
// for index, key := range keys {
// _, ok := result[key]
// if !ok {
// msgPack := MsgPack{}
// result[key] = &msgPack
// }
//
// sliceRequest := internalpb.DeleteRequest{
// Base: &commonpb.MsgBase{
// MsgType: commonpb.MsgType_Delete,
// MsgID: deleteRequest.Base.MsgID,
// Timestamp: deleteRequest.Timestamps[index],
// SourceID: deleteRequest.Base.SourceID,
// },
// CollectionName: deleteRequest.CollectionName,
// ChannelID: deleteRequest.ChannelID,
// Timestamps: []uint64{deleteRequest.Timestamps[index]},
// PrimaryKeys: []int64{deleteRequest.PrimaryKeys[index]},
// }
//
// deleteMsg := &DeleteMsg{
// BaseMsg: BaseMsg{
// Ctx: request.TraceCtx(),
// },
// DeleteRequest: sliceRequest,
// }
// result[key].Msgs = append(result[key].Msgs, deleteMsg)
// }
// }
// return result, nil
//}
func DeleteRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) {
result := make(map[int32]*MsgPack)
for i, request := range tsMsgs {
if request.Type() != commonpb.MsgType_Delete {
return nil, errors.New("msg's must be Delete")
}
deleteRequest := request.(*DeleteMsg)
keys := hashKeys[i]
if len(keys) != 1 {
return nil, errors.New("len(msg.hashValue) must equal 1, but it is: " + strconv.Itoa(len(keys)))
}
key := keys[0]
_, ok := result[key]
if !ok {
msgPack := MsgPack{}
result[key] = &msgPack
}
sliceRequest := internalpb.DeleteRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Delete,
MsgID: deleteRequest.Base.MsgID,
Timestamp: deleteRequest.Timestamp,
SourceID: deleteRequest.Base.SourceID,
},
CollectionName: deleteRequest.CollectionName,
ChannelID: deleteRequest.ChannelID,
Timestamp: deleteRequest.Timestamp,
ExprPlan: deleteRequest.ExprPlan,
}
deleteMsg := &DeleteMsg{
BaseMsg: BaseMsg{
Ctx: request.TraceCtx(),
},
DeleteRequest: sliceRequest,
}
result[key].Msgs = append(result[key].Msgs, deleteMsg)
}
return result, nil
}
func DefaultRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) {
result := make(map[int32]*MsgPack)