enhance: Produce messages of multiple topics in parallel (#36344)

Related to #36343

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/36410/head
congqixia 2024-09-23 11:23:11 +08:00 committed by GitHub
parent bfd68cc092
commit 3b01b7dc9a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 28 additions and 21 deletions

View File

@ -29,6 +29,7 @@ import (
"github.com/samber/lo" "github.com/samber/lo"
uatomic "go.uber.org/atomic" uatomic "go.uber.org/atomic"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@ -304,35 +305,41 @@ func (ms *mqMsgStream) Produce(msgPack *MsgPack) error {
if err != nil { if err != nil {
return err return err
} }
eg, _ := errgroup.WithContext(context.Background())
for k, v := range result { for k, v := range result {
channel := ms.producerChannels[k] k := k
for i := 0; i < len(v.Msgs); i++ { v := v
spanCtx, sp := MsgSpanFromCtx(v.Msgs[i].TraceCtx(), v.Msgs[i]) eg.Go(func() error {
defer sp.End() channel := ms.producerChannels[k]
for i := 0; i < len(v.Msgs); i++ {
spanCtx, sp := MsgSpanFromCtx(v.Msgs[i].TraceCtx(), v.Msgs[i])
defer sp.End()
mb, err := v.Msgs[i].Marshal(v.Msgs[i]) mb, err := v.Msgs[i].Marshal(v.Msgs[i])
if err != nil { if err != nil {
return err return err
} }
m, err := convertToByteArray(mb) m, err := convertToByteArray(mb)
if err != nil { if err != nil {
return err return err
} }
msg := &common.ProducerMessage{Payload: m, Properties: map[string]string{}} msg := &common.ProducerMessage{Payload: m, Properties: map[string]string{}}
InjectCtx(spanCtx, msg.Properties) InjectCtx(spanCtx, msg.Properties)
ms.producerLock.RLock() ms.producerLock.RLock()
if _, err := ms.producers[channel].Send(spanCtx, msg); err != nil { if _, err := ms.producers[channel].Send(spanCtx, msg); err != nil {
ms.producerLock.RUnlock()
sp.RecordError(err)
return err
}
ms.producerLock.RUnlock() ms.producerLock.RUnlock()
sp.RecordError(err)
return err
} }
ms.producerLock.RUnlock() return nil
} })
} }
return nil return eg.Wait()
} }
// BroadcastMark broadcast msg pack to all producers and returns corresponding msg id // BroadcastMark broadcast msg pack to all producers and returns corresponding msg id