enhance: add broadcast operation for msgstream (#39040)

issue: #38399

- make broadcast service available for msgstream by reusing the
architecture streaming service

---------

Signed-off-by: chyezh <chyezh@outlook.com>
pull/38599/head^2
Zhen Ye 2025-01-14 15:14:59 +08:00 committed by GitHub
parent da1b786ef8
commit fd84ed817c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 435 additions and 158 deletions

View File

@ -43,7 +43,6 @@ import (
kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv"
"github.com/milvus-io/milvus/internal/util/initcore"
internalmetrics "github.com/milvus-io/milvus/internal/util/metrics"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/config"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
@ -389,10 +388,8 @@ func (mr *MilvusRoles) Run() {
tracer.Init()
// Initialize streaming service if enabled.
if streamingutil.IsStreamingServiceEnabled() {
streaming.Init()
defer streaming.Release()
}
streaming.Init()
defer streaming.Release()
coordclient.EnableLocalClientRole(&coordclient.LocalClientRoleConfig{
ServerType: mr.ServerType,

View File

@ -1121,6 +1121,8 @@ streaming:
# It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDuration
backoffInitialInterval: 50ms
backoffMultiplier: 2 # The multiplier of balance task trigger backoff, 2 by default
walBroadcaster:
concurrencyRatio: 1 # The concurrency ratio based on number of CPU for wal broadcaster, 1 by default.
txn:
defaultKeepaliveTimeout: 10s # The default keepalive timeout for wal txn, 10s by default

View File

@ -4,6 +4,7 @@ import (
"context"
"time"
"github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry"
kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
@ -17,6 +18,8 @@ var singleton WALAccesser = nil
func Init() {
c, _ := kvfactory.GetEtcdAndPath()
singleton = newWALAccesser(c)
// Add the wal accesser to the broadcaster registry for making broadcast operation.
registry.Register(registry.AppendOperatorTypeStreaming, singleton)
}
// Release releases the resources of the wal accesser.

View File

@ -6,6 +6,7 @@ import (
"testing"
"time"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/distributed/streaming"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
@ -34,6 +35,10 @@ func TestStreamingProduce(t *testing.T) {
PartitionIds: []int64{1, 2, 3},
}).
WithBody(&msgpb.CreateCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_CreateCollection,
Timestamp: 1,
},
CollectionID: 1,
}).
WithBroadcast(vChannels).

View File

@ -8,6 +8,11 @@ import (
"github.com/milvus-io/milvus/pkg/streaming/util/types"
)
type (
AppendResponses = types.AppendResponses
AppendResponse = types.AppendResponse
)
// AppendMessagesToWAL appends messages to the wal.
// It it a helper utility function to append messages to the wal.
// If the messages is belong to one vchannel, it will be sent as a transaction.
@ -26,7 +31,7 @@ func (u *walAccesserImpl) AppendMessages(ctx context.Context, msgs ...message.Mu
// Otherwise append the messages concurrently.
mu := &sync.Mutex{}
resp := newAppendResponseN(len(msgs))
resp := types.NewAppendResponseN(len(msgs))
wg := &sync.WaitGroup{}
wg.Add(len(dispatchedMessages))
@ -39,7 +44,7 @@ func (u *walAccesserImpl) AppendMessages(ctx context.Context, msgs ...message.Mu
singleResp := u.appendToVChannel(ctx, vchannel, msgs...)
mu.Lock()
for i, idx := range idxes {
resp.fillResponseAtIdx(singleResp.Responses[i], idx)
resp.FillResponseAtIdx(singleResp.Responses[i], idx)
}
mu.Unlock()
return struct{}{}, nil
@ -76,9 +81,9 @@ func (u *walAccesserImpl) dispatchMessages(msgs ...message.MutableMessage) (map[
// appendToVChannel appends the messages to the specified vchannel.
func (u *walAccesserImpl) appendToVChannel(ctx context.Context, vchannel string, msgs ...message.MutableMessage) AppendResponses {
if len(msgs) == 0 {
return newAppendResponseN(0)
return types.NewAppendResponseN(0)
}
resp := newAppendResponseN(len(msgs))
resp := types.NewAppendResponseN(len(msgs))
// if only one message here, append it directly, no more goroutine needed.
// at most time, there's only one message here.
@ -86,7 +91,7 @@ func (u *walAccesserImpl) appendToVChannel(ctx context.Context, vchannel string,
// we should optimize the message-format, make it into one; but not the goroutine count.
if len(msgs) == 1 {
appendResult, err := u.appendToWAL(ctx, msgs[0])
resp.fillResponseAtIdx(AppendResponse{
resp.FillResponseAtIdx(AppendResponse{
AppendResult: appendResult,
Error: err,
}, 0)
@ -99,7 +104,7 @@ func (u *walAccesserImpl) appendToVChannel(ctx context.Context, vchannel string,
VChannel: vchannel,
})
if err != nil {
resp.fillAllError(err)
resp.FillAllError(err)
return resp
}
@ -115,7 +120,7 @@ func (u *walAccesserImpl) appendToVChannel(ctx context.Context, vchannel string,
defer wg.Done()
if err := txn.Append(ctx, msg); err != nil {
mu.Lock()
resp.fillResponseAtIdx(AppendResponse{
resp.FillResponseAtIdx(AppendResponse{
Error: err,
}, i)
mu.Unlock()
@ -129,75 +134,19 @@ func (u *walAccesserImpl) appendToVChannel(ctx context.Context, vchannel string,
// and fill the error with the first error.
if err := resp.UnwrapFirstError(); err != nil {
_ = txn.Rollback(ctx) // rollback failure can be ignored.
resp.fillAllError(err)
resp.FillAllError(err)
return resp
}
// commit the transaction and fill the response.
appendResult, err := txn.Commit(ctx)
resp.fillAllResponse(AppendResponse{
resp.FillAllResponse(AppendResponse{
AppendResult: appendResult,
Error: err,
})
return resp
}
// newAppendResponseN creates a new append response.
func newAppendResponseN(n int) AppendResponses {
return AppendResponses{
Responses: make([]AppendResponse, n),
}
}
// AppendResponse is the response of one append operation.
type AppendResponse struct {
AppendResult *types.AppendResult
Error error
}
// AppendResponses is the response of append operation.
type AppendResponses struct {
Responses []AppendResponse
}
func (a AppendResponses) MaxTimeTick() uint64 {
var maxTimeTick uint64
for _, r := range a.Responses {
if r.AppendResult != nil && r.AppendResult.TimeTick > maxTimeTick {
maxTimeTick = r.AppendResult.TimeTick
}
}
return maxTimeTick
}
// UnwrapFirstError returns the first error in the responses.
func (a AppendResponses) UnwrapFirstError() error {
for _, r := range a.Responses {
if r.Error != nil {
return r.Error
}
}
return nil
}
// fillAllError fills all the responses with the same error.
func (a *AppendResponses) fillAllError(err error) {
for i := range a.Responses {
a.Responses[i].Error = err
}
}
// fillResponseAtIdx fill the response at idx
func (a *AppendResponses) fillResponseAtIdx(resp AppendResponse, idx int) {
a.Responses[idx] = resp
}
func (a *AppendResponses) fillAllResponse(resp AppendResponse) {
for i := range a.Responses {
a.Responses[i] = resp
}
}
// applyOpt applies the append options to the message.
func applyOpt(msg message.MutableMessage, opts ...AppendOption) message.MutableMessage {
if len(opts) == 0 {

View File

@ -11,6 +11,7 @@ import (
"github.com/milvus-io/milvus/internal/distributed/streaming/internal/producer"
"github.com/milvus-io/milvus/internal/streamingcoord/client"
"github.com/milvus-io/milvus/internal/streamingnode/client/handler"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/internal/util/streamingutil/util"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
@ -27,7 +28,11 @@ func newWALAccesser(c *clientv3.Client) *walAccesserImpl {
// Create a new streaming coord client.
streamingCoordClient := client.NewClient(c)
// Create a new streamingnode handler client.
handlerClient := handler.NewHandlerClient(streamingCoordClient.Assignment())
var handlerClient handler.HandlerClient
if streamingutil.IsStreamingServiceEnabled() {
// streaming service is enabled, create the handler client for the streaming service.
handlerClient = handler.NewHandlerClient(streamingCoordClient.Assignment())
}
return &walAccesserImpl{
lifetime: typeutil.NewLifetime(),
streamingCoordClient: streamingCoordClient,

View File

@ -0,0 +1,105 @@
package rootcoord
import (
"context"
"github.com/milvus-io/milvus/internal/util/streamingutil/util"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/message/adaptor"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/syncutil"
)
var _ task = (*broadcastTask)(nil)
// newBroadcastTask creates a new broadcast task.
func newBroadcastTask(ctx context.Context, core *Core, msgs []message.MutableMessage) *broadcastTask {
return &broadcastTask{
baseTask: newBaseTask(ctx, core),
msgs: msgs,
}
}
// BroadcastTask is used to implement the broadcast operation based on the msgstream
// by using the streaming service interface.
// msgstream will be deprecated since 2.6.0 with streaming service, so those code will be removed in the future version.
type broadcastTask struct {
baseTask
msgs []message.MutableMessage // The message wait for broadcast
walName string
resultFuture *syncutil.Future[types.AppendResponses]
}
func (b *broadcastTask) Execute(ctx context.Context) error {
result := types.NewAppendResponseN(len(b.msgs))
defer func() {
b.resultFuture.Set(result)
}()
for idx, msg := range b.msgs {
tsMsg, err := adaptor.NewMsgPackFromMutableMessageV1(msg)
if err != nil {
result.FillResponseAtIdx(types.AppendResponse{Error: err}, idx)
return err
}
pchannel := funcutil.ToPhysicalChannel(msg.VChannel())
msgID, err := b.core.chanTimeTick.broadcastMarkDmlChannels([]string{pchannel}, &msgstream.MsgPack{
BeginTs: b.ts,
EndTs: b.ts,
Msgs: []msgstream.TsMsg{tsMsg},
})
if err != nil {
result.FillResponseAtIdx(types.AppendResponse{Error: err}, idx)
continue
}
result.FillResponseAtIdx(types.AppendResponse{
AppendResult: &types.AppendResult{
MessageID: adaptor.MustGetMessageIDFromMQWrapperIDBytes(b.walName, msgID[pchannel]),
TimeTick: b.ts,
},
}, idx)
}
return result.UnwrapFirstError()
}
func newMsgStreamAppendOperator(c *Core) *msgstreamAppendOperator {
return &msgstreamAppendOperator{
core: c,
walName: util.MustSelectWALName(),
}
}
// msgstreamAppendOperator the code of streamingcoord to make broadcast available on the legacy msgstream.
// Because msgstream is bound to the rootcoord task, so we transfer each broadcast operation into a ddl task.
// to make sure the timetick rule.
// The Msgstream will be deprecated since 2.6.0, so we make a single module to hold it.
type msgstreamAppendOperator struct {
core *Core
walName string
}
// AppendMessages implements the AppendOperator interface for broadcaster service at streaming service.
func (m *msgstreamAppendOperator) AppendMessages(ctx context.Context, msgs ...message.MutableMessage) types.AppendResponses {
t := &broadcastTask{
baseTask: newBaseTask(ctx, m.core),
msgs: msgs,
walName: m.walName,
resultFuture: syncutil.NewFuture[types.AppendResponses](),
}
if err := m.core.scheduler.AddTask(t); err != nil {
resp := types.NewAppendResponseN(len(msgs))
resp.FillAllError(err)
return resp
}
result, err := t.resultFuture.GetWithContext(ctx)
if err != nil {
resp := types.NewAppendResponseN(len(msgs))
resp.FillAllError(err)
return resp
}
return result
}

View File

@ -327,6 +327,9 @@ func (d *dmlChannels) broadcastMark(chanNames []string, pack *msgstream.MsgPack)
result[cn] = id.Serialize()
}
}
} else {
dms.mutex.RUnlock()
return nil, errors.Newf("channel not in use: %s", chanName)
}
dms.mutex.RUnlock()
}

View File

@ -46,6 +46,7 @@ import (
kvmetestore "github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
"github.com/milvus-io/milvus/internal/metastore/model"
streamingcoord "github.com/milvus-io/milvus/internal/streamingcoord/server"
"github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry"
tso2 "github.com/milvus-io/milvus/internal/tso"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
@ -766,6 +767,10 @@ func (c *Core) startInternal() error {
sessionutil.SaveServerInfo(typeutil.RootCoordRole, c.session.ServerID)
log.Info("rootcoord startup successfully")
// regster the core as a appendoperator for broadcast service.
// TODO: should be removed at 2.6.0.
// Add the wal accesser to the broadcaster registry for making broadcast operation.
registry.Register(registry.AppendOperatorTypeMsgstream, newMsgStreamAppendOperator(c))
return nil
}

View File

@ -34,6 +34,7 @@ import (
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/mocks"
mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks"
"github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry"
"github.com/milvus-io/milvus/internal/util/dependency"
kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv"
"github.com/milvus-io/milvus/internal/util/proxyutil"
@ -1356,6 +1357,7 @@ func TestCore_startTimeTickLoop(t *testing.T) {
func TestRootcoord_EnableActiveStandby(t *testing.T) {
randVal := rand.Int()
paramtable.Init()
registry.ResetRegistration()
Params.Save("etcd.rootPath", fmt.Sprintf("/%d", randVal))
// Need to reset global etcd to follow new path
kvfactory.CloseEtcdClient()
@ -1416,6 +1418,7 @@ func TestRootcoord_EnableActiveStandby(t *testing.T) {
func TestRootcoord_DisableActiveStandby(t *testing.T) {
randVal := rand.Int()
paramtable.Init()
registry.ResetRegistration()
Params.Save("etcd.rootPath", fmt.Sprintf("/%d", randVal))
// Need to reset global etcd to follow new path
kvfactory.CloseEtcdClient()

View File

@ -13,6 +13,7 @@ import (
"github.com/milvus-io/milvus/internal/streamingcoord/client/assignment"
"github.com/milvus-io/milvus/internal/streamingcoord/client/broadcast"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/internal/util/streamingutil/service/balancer/picker"
streamingserviceinterceptor "github.com/milvus-io/milvus/internal/util/streamingutil/service/interceptor"
"github.com/milvus-io/milvus/internal/util/streamingutil/service/lazygrpc"
@ -43,9 +44,14 @@ type BroadcastService interface {
// Client is the interface of log service client.
type Client interface {
// Broadcast access broadcast service.
// Broadcast service will always be available.
// When streaming service is enabled, the broadcast will use the streaming service.
// When streaming service is disabled, the broadcast will use legacy msgstream.
Broadcast() BroadcastService
// Assignment access assignment service.
// Assignment service will only be available when streaming service is enabled.
Assignment() AssignmentService
// Close close the client.
@ -68,12 +74,16 @@ func NewClient(etcdCli *clientv3.Client) Client {
dialOptions...,
)
})
assignmentService := lazygrpc.WithServiceCreator(conn, streamingpb.NewStreamingCoordAssignmentServiceClient)
var assignmentServiceImpl *assignment.AssignmentServiceImpl
if streamingutil.IsStreamingServiceEnabled() {
assignmentService := lazygrpc.WithServiceCreator(conn, streamingpb.NewStreamingCoordAssignmentServiceClient)
assignmentServiceImpl = assignment.NewAssignmentService(assignmentService)
}
broadcastService := lazygrpc.WithServiceCreator(conn, streamingpb.NewStreamingCoordBroadcastServiceClient)
return &clientImpl{
conn: conn,
rb: rb,
assignmentService: assignment.NewAssignmentService(assignmentService),
assignmentService: assignmentServiceImpl,
broadcastService: broadcast.NewBroadcastService(util.MustSelectWALName(), broadcastService),
}
}

View File

@ -26,7 +26,9 @@ func (c *clientImpl) Assignment() AssignmentService {
// Close close the client.
func (c *clientImpl) Close() {
c.assignmentService.Close()
if c.assignmentService != nil {
c.assignmentService.Close()
}
c.conn.Close()
c.rb.Close()
}

View File

@ -3,7 +3,7 @@ package broadcaster
import (
"context"
"github.com/milvus-io/milvus/internal/distributed/streaming"
"github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
)
@ -16,9 +16,4 @@ type Broadcaster interface {
Close()
}
// AppendOperator is used to append messages, there's only two implement of this interface:
// 1. streaming.WAL()
// 2. old msgstream interface
type AppendOperator interface {
AppendMessages(ctx context.Context, msgs ...message.MutableMessage) streaming.AppendResponses
}
type AppendOperator = registry.AppendOperator

View File

@ -15,13 +15,15 @@ import (
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/contextutil"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
func RecoverBroadcaster(
ctx context.Context,
appendOperator AppendOperator,
appendOperator *syncutil.Future[AppendOperator],
) (Broadcaster, error) {
logger := resource.Resource().Logger().With(log.FieldComponent("broadcaster"))
tasks, err := resource.Resource().StreamingCatalog().ListBroadcastTask(ctx)
@ -61,7 +63,7 @@ type broadcasterImpl struct {
pendingChan chan *broadcastTask
backoffChan chan *broadcastTask
workerChan chan *broadcastTask
appendOperator AppendOperator
appendOperator *syncutil.Future[AppendOperator] // TODO: we can remove those lazy future in 2.6.0, by remove the msgstream broadcaster.
}
// Broadcast broadcasts the message to all channels.
@ -126,21 +128,34 @@ func (b *broadcasterImpl) Close() {
// execute the broadcaster
func (b *broadcasterImpl) execute() {
b.logger.Info("broadcaster start to execute")
workers := int(float64(hardware.GetCPUNum()) * paramtable.Get().StreamingCfg.WALBroadcasterConcurrencyRatio.GetAsFloat())
if workers < 1 {
workers = 1
}
b.logger.Info("broadcaster start to execute", zap.Int("workerNum", workers))
defer func() {
b.backgroundTaskNotifier.Finish(struct{}{})
b.logger.Info("broadcaster execute exit")
}()
// Wait for appendOperator ready
appendOperator, err := b.appendOperator.GetWithContext(b.backgroundTaskNotifier.Context())
if err != nil {
b.logger.Info("broadcaster is closed before appendOperator ready")
return
}
b.logger.Info("broadcaster appendOperator ready, begin to start workers and dispatch")
// Start n workers to handle the broadcast task.
wg := sync.WaitGroup{}
for i := 0; i < 4; i++ {
for i := 0; i < workers; i++ {
i := i
// Start n workers to handle the broadcast task.
wg.Add(1)
go func() {
defer wg.Done()
b.worker(i)
b.worker(i, appendOperator)
}()
}
defer wg.Wait()
@ -174,8 +189,13 @@ func (b *broadcasterImpl) dispatch() {
b.backoffs.Push(task)
case <-nextBackOff:
// backoff is done, move all the backoff done task into pending to retry.
newPops := make([]*broadcastTask, 0)
for b.backoffs.Len() > 0 && b.backoffs.Peek().NextInterval() < time.Millisecond {
b.pendings = append(b.pendings, b.backoffs.Pop())
newPops = append(newPops, b.backoffs.Pop())
}
if len(newPops) > 0 {
// Push the backoff task into pendings front.
b.pendings = append(newPops, b.pendings...)
}
case workerChan <- nextTask:
// The task is sent to worker, remove it from pending list.
@ -184,9 +204,10 @@ func (b *broadcasterImpl) dispatch() {
}
}
func (b *broadcasterImpl) worker(no int) {
func (b *broadcasterImpl) worker(no int, appendOperator AppendOperator) {
logger := b.logger.With(zap.Int("workerNo", no))
defer func() {
b.logger.Info("broadcaster worker exit", zap.Int("no", no))
logger.Info("broadcaster worker exit")
}()
for {
@ -194,7 +215,7 @@ func (b *broadcasterImpl) worker(no int) {
case <-b.backgroundTaskNotifier.Context().Done():
return
case task := <-b.workerChan:
if err := task.Poll(b.backgroundTaskNotifier.Context(), b.appendOperator); err != nil {
if err := task.Execute(b.backgroundTaskNotifier.Context(), appendOperator); err != nil {
// If the task is not done, repush it into pendings and retry infinitely.
select {
case <-b.backgroundTaskNotifier.Context().Done():

View File

@ -12,7 +12,6 @@ import (
"go.uber.org/atomic"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/distributed/streaming"
"github.com/milvus-io/milvus/internal/mocks/mock_metastore"
"github.com/milvus-io/milvus/internal/mocks/streamingcoord/server/mock_broadcaster"
"github.com/milvus-io/milvus/internal/streamingcoord/server/resource"
@ -23,10 +22,13 @@ import (
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/walimplstest"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/syncutil"
)
func TestBroadcaster(t *testing.T) {
paramtable.Init()
meta := mock_metastore.NewMockStreamingCoordCataLog(t)
meta.EXPECT().ListBroadcastTask(mock.Anything).
RunAndReturn(func(ctx context.Context) ([]*streamingpb.BroadcastTask, error) {
@ -39,7 +41,7 @@ func TestBroadcaster(t *testing.T) {
done := atomic.NewInt64(0)
meta.EXPECT().SaveBroadcastTask(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, bt *streamingpb.BroadcastTask) error {
// may failure
if rand.Int31n(10) < 5 {
if rand.Int31n(10) < 3 {
return errors.New("save task failed")
}
if bt.State == streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_DONE {
@ -58,7 +60,7 @@ func TestBroadcaster(t *testing.T) {
assert.NotNil(t, bc)
assert.Eventually(t, func() bool {
return appended.Load() == 6 && done.Load() == 3
}, 10*time.Second, 10*time.Millisecond)
}, 30*time.Second, 10*time.Millisecond)
var result *types.BroadcastAppendResult
for {
@ -73,7 +75,7 @@ func TestBroadcaster(t *testing.T) {
assert.Eventually(t, func() bool {
return done.Load() == 4
}, 10*time.Second, 10*time.Millisecond)
}, 30*time.Second, 10*time.Millisecond)
// TODO: error path.
bc.Close()
@ -83,23 +85,23 @@ func TestBroadcaster(t *testing.T) {
assert.Nil(t, result)
}
func createOpeartor(t *testing.T) (AppendOperator, *atomic.Int64) {
func createOpeartor(t *testing.T) (*syncutil.Future[AppendOperator], *atomic.Int64) {
id := atomic.NewInt64(1)
appended := atomic.NewInt64(0)
operator := mock_broadcaster.NewMockAppendOperator(t)
f := func(ctx context.Context, msgs ...message.MutableMessage) streaming.AppendResponses {
resps := streaming.AppendResponses{
Responses: make([]streaming.AppendResponse, len(msgs)),
f := func(ctx context.Context, msgs ...message.MutableMessage) types.AppendResponses {
resps := types.AppendResponses{
Responses: make([]types.AppendResponse, len(msgs)),
}
for idx := range msgs {
newID := walimplstest.NewTestMessageID(id.Inc())
if rand.Int31n(10) < 5 {
resps.Responses[idx] = streaming.AppendResponse{
if rand.Int31n(10) < 3 {
resps.Responses[idx] = types.AppendResponse{
Error: errors.New("append failed"),
}
continue
}
resps.Responses[idx] = streaming.AppendResponse{
resps.Responses[idx] = types.AppendResponse{
AppendResult: &types.AppendResult{
MessageID: newID,
TimeTick: uint64(time.Now().UnixMilli()),
@ -114,7 +116,10 @@ func createOpeartor(t *testing.T) (AppendOperator, *atomic.Int64) {
operator.EXPECT().AppendMessages(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(f)
operator.EXPECT().AppendMessages(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(f)
operator.EXPECT().AppendMessages(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(f)
return operator, appended
fOperator := syncutil.NewFuture[AppendOperator]()
fOperator.Set(operator)
return fOperator, appended
}
func createNewBroadcastMsg(vchannels []string) message.BroadcastMutableMessage {

View File

@ -0,0 +1,42 @@
package registry
import (
"context"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/syncutil"
)
type AppendOperatorType int
const (
AppendOperatorTypeMsgstream AppendOperatorType = iota + 1
AppendOperatorTypeStreaming
)
var localRegistry = make(map[AppendOperatorType]*syncutil.Future[AppendOperator])
// AppendOperator is used to append messages, there's only two implement of this interface:
// 1. streaming.WAL()
// 2. old msgstream interface
type AppendOperator interface {
AppendMessages(ctx context.Context, msgs ...message.MutableMessage) types.AppendResponses
}
func init() {
localRegistry[AppendOperatorTypeMsgstream] = syncutil.NewFuture[AppendOperator]()
localRegistry[AppendOperatorTypeStreaming] = syncutil.NewFuture[AppendOperator]()
}
func Register(typ AppendOperatorType, op AppendOperator) {
localRegistry[typ].Set(op)
}
func GetAppendOperator() *syncutil.Future[AppendOperator] {
if streamingutil.IsStreamingServiceEnabled() {
return localRegistry[AppendOperatorTypeStreaming]
}
return localRegistry[AppendOperatorTypeMsgstream]
}

View File

@ -0,0 +1,12 @@
//go:build test
// +build test
package registry
import "github.com/milvus-io/milvus/pkg/util/syncutil"
func ResetRegistration() {
localRegistry = make(map[AppendOperatorType]*syncutil.Future[AppendOperator])
localRegistry[AppendOperatorTypeMsgstream] = syncutil.NewFuture[AppendOperator]()
localRegistry[AppendOperatorTypeStreaming] = syncutil.NewFuture[AppendOperator]()
}

View File

@ -49,15 +49,17 @@ type broadcastTask struct {
*typeutil.BackoffWithInstant
}
// Poll polls the task, return nil if the task is done, otherwise not done.
// Poll can be repeated called until the task is done.
func (b *broadcastTask) Poll(ctx context.Context, operator AppendOperator) error {
// Execute reexecute the task, return nil if the task is done, otherwise not done.
// Execute can be repeated called until the task is done.
// Same semantics as the `Poll` operation in eventloop.
func (b *broadcastTask) Execute(ctx context.Context, operator AppendOperator) error {
if len(b.pendingMessages) > 0 {
b.logger.Debug("broadcast task is polling to make sent...", zap.Int("pendingMessages", len(b.pendingMessages)))
resps := operator.AppendMessages(ctx, b.pendingMessages...)
newPendings := make([]message.MutableMessage, 0)
for idx, resp := range resps.Responses {
if resp.Error != nil {
b.logger.Warn("broadcast task append message failed", zap.Int("idx", idx), zap.Error(resp.Error))
newPendings = append(newPendings, b.pendingMessages[idx])
continue
}
@ -67,7 +69,7 @@ func (b *broadcastTask) Poll(ctx context.Context, operator AppendOperator) error
if len(newPendings) == 0 {
b.future.Set(&types.BroadcastAppendResult{AppendResults: b.appendResult})
}
b.logger.Info("broadcast task make a new broadcast done", zap.Int("pendingMessages", len(b.pendingMessages)))
b.logger.Info("broadcast task make a new broadcast done", zap.Int("backoffRetryMessages", len(b.pendingMessages)))
}
if len(b.pendingMessages) == 0 {
// There's no more pending message, mark the task as done.

View File

@ -9,6 +9,7 @@ import (
"github.com/milvus-io/milvus/internal/streamingcoord/server/balancer"
_ "github.com/milvus-io/milvus/internal/streamingcoord/server/balancer/policy" // register the balancer policy
"github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster"
"github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry"
"github.com/milvus-io/milvus/internal/streamingcoord/server/service"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/streamingutil"
@ -69,7 +70,7 @@ func (s *Server) initBasicComponent(ctx context.Context) (err error) {
// So we need to recover it.
futures = append(futures, conc.Go(func() (struct{}, error) {
s.logger.Info("start recovery broadcaster...")
broadcaster, err := broadcaster.RecoverBroadcaster(ctx, broadcaster.NewAppendOperator())
broadcaster, err := broadcaster.RecoverBroadcaster(ctx, registry.GetAppendOperator())
if err != nil {
s.logger.Warn("recover broadcaster failed", zap.Error(err))
return struct{}{}, err

View File

@ -0,0 +1,25 @@
package adaptor
import (
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
)
func NewMsgPackFromMutableMessageV1(msg message.MutableMessage) (msgstream.TsMsg, error) {
if msg.Version() != message.VersionV1 {
return nil, errors.New("Invalid message version")
}
tsMsg, err := unmashalerDispatcher.Unmarshal(msg.Payload(), MustGetCommonpbMsgTypeFromMessageType(msg.MessageType()))
if err != nil {
return nil, errors.Wrap(err, "Failed to unmarshal message")
}
return recoverMutableMessageFromHeader(tsMsg, msg)
}
func recoverMutableMessageFromHeader(tsMsg msgstream.TsMsg, _ message.MutableMessage) (msgstream.TsMsg, error) {
// TODO: fillback the header information to tsMsg
return tsMsg, nil
}

View File

@ -0,0 +1,113 @@
package types
import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"github.com/milvus-io/milvus/pkg/proto/messagespb"
"github.com/milvus-io/milvus/pkg/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
)
// BroadcastAppendResult is the result of broadcast append operation.
type BroadcastAppendResult struct {
AppendResults map[string]*AppendResult // make the channel name to the append result.
}
// GetAppendResult returns the append result of the given channel.
func (r *BroadcastAppendResult) GetAppendResult(channelName string) *AppendResult {
return r.AppendResults[channelName]
}
// AppendResult is the result of append operation.
type AppendResult struct {
// MessageID is generated by underlying walimpls.
MessageID message.MessageID
// TimeTick is the time tick of the message.
// Set by timetick interceptor.
TimeTick uint64
// TxnCtx is the transaction context of the message.
// If the message is not belong to a transaction, the TxnCtx will be nil.
TxnCtx *message.TxnContext
// Extra is the extra information of the append result.
Extra *anypb.Any
}
// GetExtra unmarshal the extra information to the given message.
func (r *AppendResult) GetExtra(m proto.Message) error {
return anypb.UnmarshalTo(r.Extra, m, proto.UnmarshalOptions{
DiscardUnknown: true,
AllowPartial: true,
})
}
// IntoProto converts the append result to proto.
func (r *AppendResult) IntoProto() *streamingpb.ProduceMessageResponseResult {
return &streamingpb.ProduceMessageResponseResult{
Id: &messagespb.MessageID{
Id: r.MessageID.Marshal(),
},
Timetick: r.TimeTick,
TxnContext: r.TxnCtx.IntoProto(),
Extra: r.Extra,
}
}
// NewAppendResponseN creates a new append response.
func NewAppendResponseN(n int) AppendResponses {
return AppendResponses{
Responses: make([]AppendResponse, n),
}
}
// AppendResponse is the response of one append operation.
type AppendResponse struct {
AppendResult *AppendResult
Error error
}
// AppendResponses is the response of append operation.
type AppendResponses struct {
Responses []AppendResponse
}
func (a AppendResponses) MaxTimeTick() uint64 {
var maxTimeTick uint64
for _, r := range a.Responses {
if r.AppendResult != nil && r.AppendResult.TimeTick > maxTimeTick {
maxTimeTick = r.AppendResult.TimeTick
}
}
return maxTimeTick
}
// UnwrapFirstError returns the first error in the responses.
func (a AppendResponses) UnwrapFirstError() error {
for _, r := range a.Responses {
if r.Error != nil {
return r.Error
}
}
return nil
}
// FillAllError fills all the responses with the same error.
func (a *AppendResponses) FillAllError(err error) {
for i := range a.Responses {
a.Responses[i].Error = err
}
}
// FillResponseAtIdx fill the response at idx
func (a *AppendResponses) FillResponseAtIdx(resp AppendResponse, idx int) {
a.Responses[idx] = resp
}
func (a *AppendResponses) FillAllResponse(resp AppendResponse) {
for i := range a.Responses {
a.Responses[i] = resp
}
}

View File

@ -4,12 +4,8 @@ import (
"context"
"github.com/cockroachdb/errors"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"github.com/milvus-io/milvus/pkg/proto/messagespb"
"github.com/milvus-io/milvus/pkg/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -88,50 +84,3 @@ func (n *StreamingNodeStatus) ErrorOfNode() error {
}
return n.Err
}
// BroadcastAppendResult is the result of broadcast append operation.
type BroadcastAppendResult struct {
AppendResults map[string]*AppendResult // make the channel name to the append result.
}
// GetAppendResult returns the append result of the given channel.
func (r *BroadcastAppendResult) GetAppendResult(channelName string) *AppendResult {
return r.AppendResults[channelName]
}
// AppendResult is the result of append operation.
type AppendResult struct {
// MessageID is generated by underlying walimpls.
MessageID message.MessageID
// TimeTick is the time tick of the message.
// Set by timetick interceptor.
TimeTick uint64
// TxnCtx is the transaction context of the message.
// If the message is not belong to a transaction, the TxnCtx will be nil.
TxnCtx *message.TxnContext
// Extra is the extra information of the append result.
Extra *anypb.Any
}
// GetExtra unmarshal the extra information to the given message.
func (r *AppendResult) GetExtra(m proto.Message) error {
return anypb.UnmarshalTo(r.Extra, m, proto.UnmarshalOptions{
DiscardUnknown: true,
AllowPartial: true,
})
}
// IntoProto converts the append result to proto.
func (r *AppendResult) IntoProto() *streamingpb.ProduceMessageResponseResult {
return &streamingpb.ProduceMessageResponseResult{
Id: &messagespb.MessageID{
Id: r.MessageID.Marshal(),
},
Timetick: r.TimeTick,
TxnContext: r.TxnCtx.IntoProto(),
Extra: r.Extra,
}
}

View File

@ -4828,6 +4828,9 @@ type streamingConfig struct {
WALBalancerBackoffInitialInterval ParamItem `refreshable:"true"`
WALBalancerBackoffMultiplier ParamItem `refreshable:"true"`
// broadcaster
WALBroadcasterConcurrencyRatio ParamItem `refreshable:"false"`
// txn
TxnDefaultKeepaliveTimeout ParamItem `refreshable:"true"`
}
@ -4861,6 +4864,15 @@ It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDura
}
p.WALBalancerBackoffMultiplier.Init(base.mgr)
p.WALBroadcasterConcurrencyRatio = ParamItem{
Key: "streaming.walBroadcaster.concurrencyRatio",
Version: "2.5.4",
Doc: `The concurrency ratio based on number of CPU for wal broadcaster, 1 by default.`,
DefaultValue: "1",
Export: true,
}
p.WALBroadcasterConcurrencyRatio.Init(base.mgr)
// txn
p.TxnDefaultKeepaliveTimeout = ParamItem{
Key: "streaming.txn.defaultKeepaliveTimeout",

View File

@ -614,14 +614,17 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 1*time.Minute, params.StreamingCfg.WALBalancerTriggerInterval.GetAsDurationByParse())
assert.Equal(t, 50*time.Millisecond, params.StreamingCfg.WALBalancerBackoffInitialInterval.GetAsDurationByParse())
assert.Equal(t, 2.0, params.StreamingCfg.WALBalancerBackoffMultiplier.GetAsFloat())
assert.Equal(t, 1.0, params.StreamingCfg.WALBroadcasterConcurrencyRatio.GetAsFloat())
assert.Equal(t, 10*time.Second, params.StreamingCfg.TxnDefaultKeepaliveTimeout.GetAsDurationByParse())
params.Save(params.StreamingCfg.WALBalancerTriggerInterval.Key, "50s")
params.Save(params.StreamingCfg.WALBalancerBackoffInitialInterval.Key, "50s")
params.Save(params.StreamingCfg.WALBalancerBackoffMultiplier.Key, "3.5")
params.Save(params.StreamingCfg.WALBroadcasterConcurrencyRatio.Key, "1.5")
params.Save(params.StreamingCfg.TxnDefaultKeepaliveTimeout.Key, "3500ms")
assert.Equal(t, 50*time.Second, params.StreamingCfg.WALBalancerTriggerInterval.GetAsDurationByParse())
assert.Equal(t, 50*time.Second, params.StreamingCfg.WALBalancerBackoffInitialInterval.GetAsDurationByParse())
assert.Equal(t, 3.5, params.StreamingCfg.WALBalancerBackoffMultiplier.GetAsFloat())
assert.Equal(t, 1.5, params.StreamingCfg.WALBroadcasterConcurrencyRatio.GetAsFloat())
assert.Equal(t, 3500*time.Millisecond, params.StreamingCfg.TxnDefaultKeepaliveTimeout.GetAsDurationByParse())
})

View File

@ -30,6 +30,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/funcutil"
@ -302,6 +303,8 @@ func (s *CoordDownSearch) searchAfterCoordDown() float64 {
s.searchFailed(searchCollectionName, Dim, commonpb.ConsistencyLevel_Strong)
log.Info(fmt.Sprintf("=========================Failed search cost: %fs=========================", time.Since(failedStart).Seconds()))
registry.ResetRegistration()
log.Info("=========================restart Root Coordinators=========================")
c.StartRootCoord()
s.search(searchCollectionName, Dim, commonpb.ConsistencyLevel_Eventually)

View File

@ -30,6 +30,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/funcutil"
@ -242,6 +243,8 @@ func (s *CoordSwitchSuite) switchCoord() float64 {
log.Info("=========================Coordinators stopped=========================", zap.Duration("elapsed", time.Since(start)))
start = time.Now()
registry.ResetRegistration()
c.StartRootCoord()
log.Info("=========================RootCoord restarted=========================")
c.StartDataCoord()

View File

@ -30,6 +30,7 @@ import (
"go.uber.org/zap/zapcore"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry"
"github.com/milvus-io/milvus/internal/util/hookutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/etcd"
@ -169,4 +170,5 @@ func (s *MiniClusterSuite) TearDownTest() {
if s.Cluster != nil {
s.Cluster.Stop()
}
registry.ResetRegistration()
}