mirror of https://github.com/milvus-io/milvus.git
fix: split delete task msg to MaxMessageSize to avoid mq message too large error (#36197)
relate: https://github.com/milvus-io/milvus/issues/36089
---------
Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
pull/36523/head
parent
b1ac3f0df0
commit
ffc12fb5c4
|
@ -166,8 +166,8 @@ func (dt *deleteTask) Execute(ctx context.Context) (err error) {
|
|||
EndTs: dt.EndTs(),
|
||||
}
|
||||
|
||||
for _, msg := range result {
|
||||
if msg != nil {
|
||||
for _, msgs := range result {
|
||||
for _, msg := range msgs {
|
||||
msgPack.Msgs = append(msgPack.Msgs, msg)
|
||||
}
|
||||
}
|
||||
|
@ -202,75 +202,78 @@ func repackDeleteMsgByHash(
|
|||
collectionName string,
|
||||
partitionID int64,
|
||||
partitionName string,
|
||||
) (map[uint32]*msgstream.DeleteMsg, int64, error) {
|
||||
) (map[uint32][]*msgstream.DeleteMsg, int64, error) {
|
||||
maxSize := Params.PulsarCfg.MaxMessageSize.GetAsInt()
|
||||
hashValues := typeutil.HashPK2Channels(primaryKeys, vChannels)
|
||||
// repack delete msg by dmChannel
|
||||
result := make(map[uint32]*msgstream.DeleteMsg)
|
||||
result := make(map[uint32][]*msgstream.DeleteMsg)
|
||||
lastMessageSize := map[uint32]int{}
|
||||
|
||||
numRows := int64(0)
|
||||
numMessage := 0
|
||||
|
||||
createMessage := func(key uint32, vchannel string) *msgstream.DeleteMsg {
|
||||
numMessage++
|
||||
lastMessageSize[key] = 0
|
||||
return &msgstream.DeleteMsg{
|
||||
BaseMsg: msgstream.BaseMsg{
|
||||
Ctx: ctx,
|
||||
},
|
||||
DeleteRequest: &msgpb.DeleteRequest{
|
||||
Base: commonpbutil.NewMsgBase(
|
||||
commonpbutil.WithMsgType(commonpb.MsgType_Delete),
|
||||
// msgid of delete msg must be set later
|
||||
// or it will be seen as duplicated msg in mq
|
||||
commonpbutil.WithTimeStamp(ts),
|
||||
commonpbutil.WithSourceID(paramtable.GetNodeID()),
|
||||
),
|
||||
CollectionID: collectionID,
|
||||
PartitionID: partitionID,
|
||||
CollectionName: collectionName,
|
||||
PartitionName: partitionName,
|
||||
PrimaryKeys: &schemapb.IDs{},
|
||||
ShardName: vchannel,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
for index, key := range hashValues {
|
||||
vchannel := vChannels[key]
|
||||
_, ok := result[key]
|
||||
msgs, ok := result[key]
|
||||
if !ok {
|
||||
deleteMsg, err := newDeleteMsg(
|
||||
ctx,
|
||||
idAllocator,
|
||||
ts,
|
||||
collectionID,
|
||||
collectionName,
|
||||
partitionID,
|
||||
partitionName,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
deleteMsg.ShardName = vchannel
|
||||
result[key] = deleteMsg
|
||||
result[key] = make([]*msgstream.DeleteMsg, 1)
|
||||
msgs = result[key]
|
||||
result[key][0] = createMessage(key, vchannel)
|
||||
}
|
||||
curMsg := msgs[len(msgs)-1]
|
||||
size, id := typeutil.GetId(primaryKeys, index)
|
||||
if lastMessageSize[key]+16+size > maxSize {
|
||||
curMsg = createMessage(key, vchannel)
|
||||
result[key] = append(result[key], curMsg)
|
||||
}
|
||||
curMsg := result[key]
|
||||
curMsg.HashValues = append(curMsg.HashValues, hashValues[index])
|
||||
curMsg.Timestamps = append(curMsg.Timestamps, ts)
|
||||
|
||||
typeutil.AppendIDs(curMsg.PrimaryKeys, primaryKeys, index)
|
||||
typeutil.AppendID(curMsg.PrimaryKeys, id)
|
||||
lastMessageSize[key] += 16 + size
|
||||
curMsg.NumRows++
|
||||
numRows++
|
||||
}
|
||||
return result, numRows, nil
|
||||
}
|
||||
|
||||
func newDeleteMsg(
|
||||
ctx context.Context,
|
||||
idAllocator allocator.Interface,
|
||||
ts uint64,
|
||||
collectionID int64,
|
||||
collectionName string,
|
||||
partitionID int64,
|
||||
partitionName string,
|
||||
) (*msgstream.DeleteMsg, error) {
|
||||
msgid, err := idAllocator.AllocOne()
|
||||
// alloc messageID
|
||||
start, _, err := idAllocator.Alloc(uint32(numMessage))
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to allocate MsgID of delete")
|
||||
return nil, 0, err
|
||||
}
|
||||
sliceRequest := &msgpb.DeleteRequest{
|
||||
Base: commonpbutil.NewMsgBase(
|
||||
commonpbutil.WithMsgType(commonpb.MsgType_Delete),
|
||||
// msgid of delete msg must be set
|
||||
// or it will be seen as duplicated msg in mq
|
||||
commonpbutil.WithMsgID(msgid),
|
||||
commonpbutil.WithTimeStamp(ts),
|
||||
commonpbutil.WithSourceID(paramtable.GetNodeID()),
|
||||
),
|
||||
CollectionID: collectionID,
|
||||
PartitionID: partitionID,
|
||||
CollectionName: collectionName,
|
||||
PartitionName: partitionName,
|
||||
PrimaryKeys: &schemapb.IDs{},
|
||||
|
||||
cnt := int64(0)
|
||||
for _, msgs := range result {
|
||||
for _, msg := range msgs {
|
||||
msg.Base.MsgID = start + cnt
|
||||
cnt++
|
||||
}
|
||||
}
|
||||
return &msgstream.DeleteMsg{
|
||||
BaseMsg: msgstream.BaseMsg{
|
||||
Ctx: ctx,
|
||||
},
|
||||
DeleteRequest: sliceRequest,
|
||||
}, nil
|
||||
return result, numRows, nil
|
||||
}
|
||||
|
||||
type deleteRunner struct {
|
||||
|
|
|
@ -46,19 +46,21 @@ func (dt *deleteTaskByStreamingService) Execute(ctx context.Context) (err error)
|
|||
}
|
||||
|
||||
var msgs []message.MutableMessage
|
||||
for hashKey, deleteMsg := range result {
|
||||
for hashKey, deleteMsgs := range result {
|
||||
vchannel := dt.vChannels[hashKey]
|
||||
msg, err := message.NewDeleteMessageBuilderV1().
|
||||
WithHeader(&message.DeleteMessageHeader{
|
||||
CollectionId: dt.collectionID,
|
||||
}).
|
||||
WithBody(deleteMsg.DeleteRequest).
|
||||
WithVChannel(vchannel).
|
||||
BuildMutable()
|
||||
if err != nil {
|
||||
return err
|
||||
for _, deleteMsg := range deleteMsgs {
|
||||
msg, err := message.NewDeleteMessageBuilderV1().
|
||||
WithHeader(&message.DeleteMessageHeader{
|
||||
CollectionId: dt.collectionID,
|
||||
}).
|
||||
WithBody(deleteMsg.DeleteRequest).
|
||||
WithVChannel(vchannel).
|
||||
BuildMutable()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
msgs = append(msgs, msg)
|
||||
}
|
||||
msgs = append(msgs, msg)
|
||||
}
|
||||
|
||||
log.Debug("send delete request to virtual channels",
|
||||
|
|
|
@ -122,19 +122,21 @@ func (it *upsertTaskByStreamingService) packDeleteMessage(ctx context.Context) (
|
|||
}
|
||||
|
||||
var msgs []message.MutableMessage
|
||||
for hashKey, deleteMsg := range result {
|
||||
for hashKey, deleteMsgs := range result {
|
||||
vchannel := vChannels[hashKey]
|
||||
msg, err := message.NewDeleteMessageBuilderV1().
|
||||
WithHeader(&message.DeleteMessageHeader{
|
||||
CollectionId: it.upsertMsg.DeleteMsg.CollectionID,
|
||||
}).
|
||||
WithBody(deleteMsg.DeleteRequest).
|
||||
WithVChannel(vchannel).
|
||||
BuildMutable()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
for _, deleteMsg := range deleteMsgs {
|
||||
msg, err := message.NewDeleteMessageBuilderV1().
|
||||
WithHeader(&message.DeleteMessageHeader{
|
||||
CollectionId: it.upsertMsg.DeleteMsg.CollectionID,
|
||||
}).
|
||||
WithBody(deleteMsg.DeleteRequest).
|
||||
WithVChannel(vchannel).
|
||||
BuildMutable()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
msgs = append(msgs, msg)
|
||||
}
|
||||
msgs = append(msgs, msg)
|
||||
}
|
||||
|
||||
log.Debug("Proxy Upsert deleteExecute done",
|
||||
|
|
|
@ -1269,6 +1269,44 @@ func AppendSystemFields(schema *schemapb.CollectionSchema) *schemapb.CollectionS
|
|||
return newSchema
|
||||
}
|
||||
|
||||
func GetId(src *schemapb.IDs, idx int) (int, any) {
|
||||
switch src.IdField.(type) {
|
||||
case *schemapb.IDs_IntId:
|
||||
return 8, src.GetIntId().Data[idx]
|
||||
case *schemapb.IDs_StrId:
|
||||
return len(src.GetStrId().Data[idx]), src.GetStrId().Data[idx]
|
||||
default:
|
||||
panic("unknown pk type")
|
||||
}
|
||||
}
|
||||
|
||||
func AppendID(dst *schemapb.IDs, src any) {
|
||||
switch value := src.(type) {
|
||||
case int64:
|
||||
if dst.GetIdField() == nil {
|
||||
dst.IdField = &schemapb.IDs_IntId{
|
||||
IntId: &schemapb.LongArray{
|
||||
Data: []int64{value},
|
||||
},
|
||||
}
|
||||
} else {
|
||||
dst.GetIntId().Data = append(dst.GetIntId().Data, value)
|
||||
}
|
||||
case string:
|
||||
if dst.GetIdField() == nil {
|
||||
dst.IdField = &schemapb.IDs_StrId{
|
||||
StrId: &schemapb.StringArray{
|
||||
Data: []string{value},
|
||||
},
|
||||
}
|
||||
} else {
|
||||
dst.GetStrId().Data = append(dst.GetStrId().Data, value)
|
||||
}
|
||||
default:
|
||||
// TODO
|
||||
}
|
||||
}
|
||||
|
||||
func AppendIDs(dst *schemapb.IDs, src *schemapb.IDs, idx int) {
|
||||
switch src.IdField.(type) {
|
||||
case *schemapb.IDs_IntId:
|
||||
|
|
Loading…
Reference in New Issue