From fd84ed817c54b4f046b0bdb43cb64fc6675eefc3 Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Tue, 14 Jan 2025 15:14:59 +0800 Subject: [PATCH] enhance: add broadcast operation for msgstream (#39040) issue: #38399 - make broadcast service available for msgstream by reusing the architecture streaming service --------- Signed-off-by: chyezh --- cmd/roles/roles.go | 7 +- configs/milvus.yaml | 2 + internal/distributed/streaming/streaming.go | 3 + .../distributed/streaming/streaming_test.go | 5 + internal/distributed/streaming/util.go | 79 +++--------- internal/distributed/streaming/wal.go | 7 +- internal/rootcoord/broadcast_task.go | 105 ++++++++++++++++ internal/rootcoord/dml_channels.go | 3 + internal/rootcoord/root_coord.go | 5 + internal/rootcoord/root_coord_test.go | 3 + internal/streamingcoord/client/client.go | 14 ++- internal/streamingcoord/client/client_impl.go | 4 +- .../server/broadcaster/broadcaster.go | 9 +- .../server/broadcaster/broadcaster_impl.go | 39 ++++-- .../server/broadcaster/broadcaster_test.go | 29 +++-- .../broadcaster/registry/append_operator.go | 42 +++++++ .../broadcaster/registry/test_utility.go | 12 ++ .../streamingcoord/server/broadcaster/task.go | 10 +- internal/streamingcoord/server/server.go | 3 +- .../util/message/adaptor/broadcast_message.go | 25 ++++ pkg/streaming/util/types/responses.go | 113 ++++++++++++++++++ pkg/streaming/util/types/streaming_node.go | 51 -------- pkg/util/paramtable/component_param.go | 12 ++ pkg/util/paramtable/component_param_test.go | 3 + .../search_after_coord_down_test.go | 3 + .../coordrecovery/coord_recovery_test.go | 3 + tests/integration/suite.go | 2 + 27 files changed, 435 insertions(+), 158 deletions(-) create mode 100644 internal/rootcoord/broadcast_task.go create mode 100644 internal/streamingcoord/server/broadcaster/registry/append_operator.go create mode 100644 internal/streamingcoord/server/broadcaster/registry/test_utility.go create mode 100644 pkg/streaming/util/message/adaptor/broadcast_message.go create mode 100644 pkg/streaming/util/types/responses.go diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index ef59216f0f..355c30eb13 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -43,7 +43,6 @@ import ( kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv" "github.com/milvus-io/milvus/internal/util/initcore" internalmetrics "github.com/milvus-io/milvus/internal/util/metrics" - "github.com/milvus-io/milvus/internal/util/streamingutil" "github.com/milvus-io/milvus/pkg/config" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" @@ -389,10 +388,8 @@ func (mr *MilvusRoles) Run() { tracer.Init() // Initialize streaming service if enabled. - if streamingutil.IsStreamingServiceEnabled() { - streaming.Init() - defer streaming.Release() - } + streaming.Init() + defer streaming.Release() coordclient.EnableLocalClientRole(&coordclient.LocalClientRoleConfig{ ServerType: mr.ServerType, diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 305db189ef..7b591fa847 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -1121,6 +1121,8 @@ streaming: # It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDuration backoffInitialInterval: 50ms backoffMultiplier: 2 # The multiplier of balance task trigger backoff, 2 by default + walBroadcaster: + concurrencyRatio: 1 # The concurrency ratio based on number of CPU for wal broadcaster, 1 by default. 
txn: defaultKeepaliveTimeout: 10s # The default keepalive timeout for wal txn, 10s by default diff --git a/internal/distributed/streaming/streaming.go b/internal/distributed/streaming/streaming.go index efd77d5f2a..df54d7c95e 100644 --- a/internal/distributed/streaming/streaming.go +++ b/internal/distributed/streaming/streaming.go @@ -4,6 +4,7 @@ import ( "context" "time" + "github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry" kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv" "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/streaming/util/options" @@ -17,6 +18,8 @@ var singleton WALAccesser = nil func Init() { c, _ := kvfactory.GetEtcdAndPath() singleton = newWALAccesser(c) + // Add the wal accesser to the broadcaster registry for making broadcast operation. + registry.Register(registry.AppendOperatorTypeStreaming, singleton) } // Release releases the resources of the wal accesser. diff --git a/internal/distributed/streaming/streaming_test.go b/internal/distributed/streaming/streaming_test.go index e44e18e7c2..0da14f0923 100644 --- a/internal/distributed/streaming/streaming_test.go +++ b/internal/distributed/streaming/streaming_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus/internal/distributed/streaming" "github.com/milvus-io/milvus/pkg/streaming/util/message" @@ -34,6 +35,10 @@ func TestStreamingProduce(t *testing.T) { PartitionIds: []int64{1, 2, 3}, }). WithBody(&msgpb.CreateCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_CreateCollection, + Timestamp: 1, + }, CollectionID: 1, }). WithBroadcast(vChannels). diff --git a/internal/distributed/streaming/util.go b/internal/distributed/streaming/util.go index 3a024dc03c..02ba8e4584 100644 --- a/internal/distributed/streaming/util.go +++ b/internal/distributed/streaming/util.go @@ -8,6 +8,11 @@ import ( "github.com/milvus-io/milvus/pkg/streaming/util/types" ) +type ( + AppendResponses = types.AppendResponses + AppendResponse = types.AppendResponse +) + // AppendMessagesToWAL appends messages to the wal. // It it a helper utility function to append messages to the wal. // If the messages is belong to one vchannel, it will be sent as a transaction. @@ -26,7 +31,7 @@ func (u *walAccesserImpl) AppendMessages(ctx context.Context, msgs ...message.Mu // Otherwise append the messages concurrently. mu := &sync.Mutex{} - resp := newAppendResponseN(len(msgs)) + resp := types.NewAppendResponseN(len(msgs)) wg := &sync.WaitGroup{} wg.Add(len(dispatchedMessages)) @@ -39,7 +44,7 @@ func (u *walAccesserImpl) AppendMessages(ctx context.Context, msgs ...message.Mu singleResp := u.appendToVChannel(ctx, vchannel, msgs...) mu.Lock() for i, idx := range idxes { - resp.fillResponseAtIdx(singleResp.Responses[i], idx) + resp.FillResponseAtIdx(singleResp.Responses[i], idx) } mu.Unlock() return struct{}{}, nil @@ -76,9 +81,9 @@ func (u *walAccesserImpl) dispatchMessages(msgs ...message.MutableMessage) (map[ // appendToVChannel appends the messages to the specified vchannel. 
func (u *walAccesserImpl) appendToVChannel(ctx context.Context, vchannel string, msgs ...message.MutableMessage) AppendResponses { if len(msgs) == 0 { - return newAppendResponseN(0) + return types.NewAppendResponseN(0) } - resp := newAppendResponseN(len(msgs)) + resp := types.NewAppendResponseN(len(msgs)) // if only one message here, append it directly, no more goroutine needed. // at most time, there's only one message here. @@ -86,7 +91,7 @@ func (u *walAccesserImpl) appendToVChannel(ctx context.Context, vchannel string, // we should optimize the message-format, make it into one; but not the goroutine count. if len(msgs) == 1 { appendResult, err := u.appendToWAL(ctx, msgs[0]) - resp.fillResponseAtIdx(AppendResponse{ + resp.FillResponseAtIdx(AppendResponse{ AppendResult: appendResult, Error: err, }, 0) @@ -99,7 +104,7 @@ func (u *walAccesserImpl) appendToVChannel(ctx context.Context, vchannel string, VChannel: vchannel, }) if err != nil { - resp.fillAllError(err) + resp.FillAllError(err) return resp } @@ -115,7 +120,7 @@ func (u *walAccesserImpl) appendToVChannel(ctx context.Context, vchannel string, defer wg.Done() if err := txn.Append(ctx, msg); err != nil { mu.Lock() - resp.fillResponseAtIdx(AppendResponse{ + resp.FillResponseAtIdx(AppendResponse{ Error: err, }, i) mu.Unlock() @@ -129,75 +134,19 @@ func (u *walAccesserImpl) appendToVChannel(ctx context.Context, vchannel string, // and fill the error with the first error. if err := resp.UnwrapFirstError(); err != nil { _ = txn.Rollback(ctx) // rollback failure can be ignored. - resp.fillAllError(err) + resp.FillAllError(err) return resp } // commit the transaction and fill the response. appendResult, err := txn.Commit(ctx) - resp.fillAllResponse(AppendResponse{ + resp.FillAllResponse(AppendResponse{ AppendResult: appendResult, Error: err, }) return resp } -// newAppendResponseN creates a new append response. -func newAppendResponseN(n int) AppendResponses { - return AppendResponses{ - Responses: make([]AppendResponse, n), - } -} - -// AppendResponse is the response of one append operation. -type AppendResponse struct { - AppendResult *types.AppendResult - Error error -} - -// AppendResponses is the response of append operation. -type AppendResponses struct { - Responses []AppendResponse -} - -func (a AppendResponses) MaxTimeTick() uint64 { - var maxTimeTick uint64 - for _, r := range a.Responses { - if r.AppendResult != nil && r.AppendResult.TimeTick > maxTimeTick { - maxTimeTick = r.AppendResult.TimeTick - } - } - return maxTimeTick -} - -// UnwrapFirstError returns the first error in the responses. -func (a AppendResponses) UnwrapFirstError() error { - for _, r := range a.Responses { - if r.Error != nil { - return r.Error - } - } - return nil -} - -// fillAllError fills all the responses with the same error. -func (a *AppendResponses) fillAllError(err error) { - for i := range a.Responses { - a.Responses[i].Error = err - } -} - -// fillResponseAtIdx fill the response at idx -func (a *AppendResponses) fillResponseAtIdx(resp AppendResponse, idx int) { - a.Responses[idx] = resp -} - -func (a *AppendResponses) fillAllResponse(resp AppendResponse) { - for i := range a.Responses { - a.Responses[i] = resp - } -} - // applyOpt applies the append options to the message. 
func applyOpt(msg message.MutableMessage, opts ...AppendOption) message.MutableMessage { if len(opts) == 0 { diff --git a/internal/distributed/streaming/wal.go b/internal/distributed/streaming/wal.go index f721f2d63b..052129a1d6 100644 --- a/internal/distributed/streaming/wal.go +++ b/internal/distributed/streaming/wal.go @@ -11,6 +11,7 @@ import ( "github.com/milvus-io/milvus/internal/distributed/streaming/internal/producer" "github.com/milvus-io/milvus/internal/streamingcoord/client" "github.com/milvus-io/milvus/internal/streamingnode/client/handler" + "github.com/milvus-io/milvus/internal/util/streamingutil" "github.com/milvus-io/milvus/internal/util/streamingutil/status" "github.com/milvus-io/milvus/internal/util/streamingutil/util" "github.com/milvus-io/milvus/pkg/streaming/util/message" @@ -27,7 +28,11 @@ func newWALAccesser(c *clientv3.Client) *walAccesserImpl { // Create a new streaming coord client. streamingCoordClient := client.NewClient(c) // Create a new streamingnode handler client. - handlerClient := handler.NewHandlerClient(streamingCoordClient.Assignment()) + var handlerClient handler.HandlerClient + if streamingutil.IsStreamingServiceEnabled() { + // streaming service is enabled, create the handler client for the streaming service. + handlerClient = handler.NewHandlerClient(streamingCoordClient.Assignment()) + } return &walAccesserImpl{ lifetime: typeutil.NewLifetime(), streamingCoordClient: streamingCoordClient, diff --git a/internal/rootcoord/broadcast_task.go b/internal/rootcoord/broadcast_task.go new file mode 100644 index 0000000000..d95000318c --- /dev/null +++ b/internal/rootcoord/broadcast_task.go @@ -0,0 +1,105 @@ +package rootcoord + +import ( + "context" + + "github.com/milvus-io/milvus/internal/util/streamingutil/util" + "github.com/milvus-io/milvus/pkg/mq/msgstream" + "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/streaming/util/message/adaptor" + "github.com/milvus-io/milvus/pkg/streaming/util/types" + "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/syncutil" +) + +var _ task = (*broadcastTask)(nil) + +// newBroadcastTask creates a new broadcast task. +func newBroadcastTask(ctx context.Context, core *Core, msgs []message.MutableMessage) *broadcastTask { + return &broadcastTask{ + baseTask: newBaseTask(ctx, core), + msgs: msgs, + } +} + +// BroadcastTask is used to implement the broadcast operation based on the msgstream +// by using the streaming service interface. +// msgstream will be deprecated since 2.6.0 with streaming service, so those code will be removed in the future version. 
+type broadcastTask struct {
+    baseTask
+    msgs         []message.MutableMessage // The messages waiting for broadcast.
+    walName      string
+    resultFuture *syncutil.Future[types.AppendResponses]
+}
+
+func (b *broadcastTask) Execute(ctx context.Context) error {
+    result := types.NewAppendResponseN(len(b.msgs))
+    defer func() {
+        b.resultFuture.Set(result)
+    }()
+
+    for idx, msg := range b.msgs {
+        tsMsg, err := adaptor.NewMsgPackFromMutableMessageV1(msg)
+        if err != nil {
+            result.FillResponseAtIdx(types.AppendResponse{Error: err}, idx)
+            return err
+        }
+        pchannel := funcutil.ToPhysicalChannel(msg.VChannel())
+        msgID, err := b.core.chanTimeTick.broadcastMarkDmlChannels([]string{pchannel}, &msgstream.MsgPack{
+            BeginTs: b.ts,
+            EndTs:   b.ts,
+            Msgs:    []msgstream.TsMsg{tsMsg},
+        })
+        if err != nil {
+            result.FillResponseAtIdx(types.AppendResponse{Error: err}, idx)
+            continue
+        }
+        result.FillResponseAtIdx(types.AppendResponse{
+            AppendResult: &types.AppendResult{
+                MessageID: adaptor.MustGetMessageIDFromMQWrapperIDBytes(b.walName, msgID[pchannel]),
+                TimeTick:  b.ts,
+            },
+        }, idx)
+    }
+    return result.UnwrapFirstError()
+}
+
+func newMsgStreamAppendOperator(c *Core) *msgstreamAppendOperator {
+    return &msgstreamAppendOperator{
+        core:    c,
+        walName: util.MustSelectWALName(),
+    }
+}
+
+// msgstreamAppendOperator implements the streamingcoord broadcast operation on top of the legacy msgstream.
+// Because the msgstream is bound to the rootcoord task scheduler, each broadcast operation is transferred
+// into a ddl task to preserve the timetick ordering rule.
+// The msgstream will be deprecated in 2.6.0, so this logic is kept in a standalone module.
+type msgstreamAppendOperator struct {
+    core    *Core
+    walName string
+}
+
+// AppendMessages implements the AppendOperator interface used by the broadcaster service of the streaming service.
+func (m *msgstreamAppendOperator) AppendMessages(ctx context.Context, msgs ...message.MutableMessage) types.AppendResponses { + t := &broadcastTask{ + baseTask: newBaseTask(ctx, m.core), + msgs: msgs, + walName: m.walName, + resultFuture: syncutil.NewFuture[types.AppendResponses](), + } + + if err := m.core.scheduler.AddTask(t); err != nil { + resp := types.NewAppendResponseN(len(msgs)) + resp.FillAllError(err) + return resp + } + + result, err := t.resultFuture.GetWithContext(ctx) + if err != nil { + resp := types.NewAppendResponseN(len(msgs)) + resp.FillAllError(err) + return resp + } + return result +} diff --git a/internal/rootcoord/dml_channels.go b/internal/rootcoord/dml_channels.go index 7e210efaed..b626b6e0aa 100644 --- a/internal/rootcoord/dml_channels.go +++ b/internal/rootcoord/dml_channels.go @@ -327,6 +327,9 @@ func (d *dmlChannels) broadcastMark(chanNames []string, pack *msgstream.MsgPack) result[cn] = id.Serialize() } } + } else { + dms.mutex.RUnlock() + return nil, errors.Newf("channel not in use: %s", chanName) } dms.mutex.RUnlock() } diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 16e319cb8c..ec8fd93ac8 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -46,6 +46,7 @@ import ( kvmetestore "github.com/milvus-io/milvus/internal/metastore/kv/rootcoord" "github.com/milvus-io/milvus/internal/metastore/model" streamingcoord "github.com/milvus-io/milvus/internal/streamingcoord/server" + "github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry" tso2 "github.com/milvus-io/milvus/internal/tso" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" @@ -766,6 +767,10 @@ func (c *Core) startInternal() error { sessionutil.SaveServerInfo(typeutil.RootCoordRole, c.session.ServerID) log.Info("rootcoord startup successfully") + // regster the core as a appendoperator for broadcast service. + // TODO: should be removed at 2.6.0. + // Add the wal accesser to the broadcaster registry for making broadcast operation. 
+ registry.Register(registry.AppendOperatorTypeMsgstream, newMsgStreamAppendOperator(c)) return nil } diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 402a6b2242..951b23d580 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -34,6 +34,7 @@ import ( "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/mocks" mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks" + "github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry" "github.com/milvus-io/milvus/internal/util/dependency" kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv" "github.com/milvus-io/milvus/internal/util/proxyutil" @@ -1356,6 +1357,7 @@ func TestCore_startTimeTickLoop(t *testing.T) { func TestRootcoord_EnableActiveStandby(t *testing.T) { randVal := rand.Int() paramtable.Init() + registry.ResetRegistration() Params.Save("etcd.rootPath", fmt.Sprintf("/%d", randVal)) // Need to reset global etcd to follow new path kvfactory.CloseEtcdClient() @@ -1416,6 +1418,7 @@ func TestRootcoord_EnableActiveStandby(t *testing.T) { func TestRootcoord_DisableActiveStandby(t *testing.T) { randVal := rand.Int() paramtable.Init() + registry.ResetRegistration() Params.Save("etcd.rootPath", fmt.Sprintf("/%d", randVal)) // Need to reset global etcd to follow new path kvfactory.CloseEtcdClient() diff --git a/internal/streamingcoord/client/client.go b/internal/streamingcoord/client/client.go index 79ef36053d..4d9f30c35a 100644 --- a/internal/streamingcoord/client/client.go +++ b/internal/streamingcoord/client/client.go @@ -13,6 +13,7 @@ import ( "github.com/milvus-io/milvus/internal/streamingcoord/client/assignment" "github.com/milvus-io/milvus/internal/streamingcoord/client/broadcast" "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/internal/util/streamingutil" "github.com/milvus-io/milvus/internal/util/streamingutil/service/balancer/picker" streamingserviceinterceptor "github.com/milvus-io/milvus/internal/util/streamingutil/service/interceptor" "github.com/milvus-io/milvus/internal/util/streamingutil/service/lazygrpc" @@ -43,9 +44,14 @@ type BroadcastService interface { // Client is the interface of log service client. type Client interface { + // Broadcast access broadcast service. + // Broadcast service will always be available. + // When streaming service is enabled, the broadcast will use the streaming service. + // When streaming service is disabled, the broadcast will use legacy msgstream. Broadcast() BroadcastService // Assignment access assignment service. + // Assignment service will only be available when streaming service is enabled. Assignment() AssignmentService // Close close the client. 
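The comments added to the Client interface above describe a usage contract: Broadcast() is always backed by an append operator (the streaming WAL when the streaming service is enabled, the legacy msgstream otherwise), while Assignment() is only wired up when the streaming service is enabled. The sketch below shows how a caller might honour that contract; the package name and the useCoordClient helper are illustrative assumptions, only the Client and streamingutil identifiers come from this patch.

package example // illustrative only, not part of this patch

import (
    "github.com/milvus-io/milvus/internal/streamingcoord/client"
    "github.com/milvus-io/milvus/internal/util/streamingutil"
)

// useCoordClient is a hypothetical caller that follows the contract documented
// on the Client interface: Broadcast is usable in both deployment modes, while
// Assignment must be guarded exactly like NewClient and clientImpl.Close do.
func useCoordClient(cli client.Client) {
    // Broadcast is routed to the streaming WAL or the legacy msgstream by the
    // broadcaster registry, so it is safe to use unconditionally.
    _ = cli.Broadcast()

    // Assignment only exists when the streaming service is enabled.
    if streamingutil.IsStreamingServiceEnabled() {
        _ = cli.Assignment()
    }
}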
@@ -68,12 +74,16 @@ func NewClient(etcdCli *clientv3.Client) Client { dialOptions..., ) }) - assignmentService := lazygrpc.WithServiceCreator(conn, streamingpb.NewStreamingCoordAssignmentServiceClient) + var assignmentServiceImpl *assignment.AssignmentServiceImpl + if streamingutil.IsStreamingServiceEnabled() { + assignmentService := lazygrpc.WithServiceCreator(conn, streamingpb.NewStreamingCoordAssignmentServiceClient) + assignmentServiceImpl = assignment.NewAssignmentService(assignmentService) + } broadcastService := lazygrpc.WithServiceCreator(conn, streamingpb.NewStreamingCoordBroadcastServiceClient) return &clientImpl{ conn: conn, rb: rb, - assignmentService: assignment.NewAssignmentService(assignmentService), + assignmentService: assignmentServiceImpl, broadcastService: broadcast.NewBroadcastService(util.MustSelectWALName(), broadcastService), } } diff --git a/internal/streamingcoord/client/client_impl.go b/internal/streamingcoord/client/client_impl.go index 88c94794e1..e45b6ebd20 100644 --- a/internal/streamingcoord/client/client_impl.go +++ b/internal/streamingcoord/client/client_impl.go @@ -26,7 +26,9 @@ func (c *clientImpl) Assignment() AssignmentService { // Close close the client. func (c *clientImpl) Close() { - c.assignmentService.Close() + if c.assignmentService != nil { + c.assignmentService.Close() + } c.conn.Close() c.rb.Close() } diff --git a/internal/streamingcoord/server/broadcaster/broadcaster.go b/internal/streamingcoord/server/broadcaster/broadcaster.go index 79e77bb882..b1b6b3c633 100644 --- a/internal/streamingcoord/server/broadcaster/broadcaster.go +++ b/internal/streamingcoord/server/broadcaster/broadcaster.go @@ -3,7 +3,7 @@ package broadcaster import ( "context" - "github.com/milvus-io/milvus/internal/distributed/streaming" + "github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry" "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/streaming/util/types" ) @@ -16,9 +16,4 @@ type Broadcaster interface { Close() } -// AppendOperator is used to append messages, there's only two implement of this interface: -// 1. streaming.WAL() -// 2. 
old msgstream interface -type AppendOperator interface { - AppendMessages(ctx context.Context, msgs ...message.MutableMessage) streaming.AppendResponses -} +type AppendOperator = registry.AppendOperator diff --git a/internal/streamingcoord/server/broadcaster/broadcaster_impl.go b/internal/streamingcoord/server/broadcaster/broadcaster_impl.go index 8019ac9c77..0b63ba3288 100644 --- a/internal/streamingcoord/server/broadcaster/broadcaster_impl.go +++ b/internal/streamingcoord/server/broadcaster/broadcaster_impl.go @@ -15,13 +15,15 @@ import ( "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/streaming/util/types" "github.com/milvus-io/milvus/pkg/util/contextutil" + "github.com/milvus-io/milvus/pkg/util/hardware" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/syncutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) func RecoverBroadcaster( ctx context.Context, - appendOperator AppendOperator, + appendOperator *syncutil.Future[AppendOperator], ) (Broadcaster, error) { logger := resource.Resource().Logger().With(log.FieldComponent("broadcaster")) tasks, err := resource.Resource().StreamingCatalog().ListBroadcastTask(ctx) @@ -61,7 +63,7 @@ type broadcasterImpl struct { pendingChan chan *broadcastTask backoffChan chan *broadcastTask workerChan chan *broadcastTask - appendOperator AppendOperator + appendOperator *syncutil.Future[AppendOperator] // TODO: we can remove those lazy future in 2.6.0, by remove the msgstream broadcaster. } // Broadcast broadcasts the message to all channels. @@ -126,21 +128,34 @@ func (b *broadcasterImpl) Close() { // execute the broadcaster func (b *broadcasterImpl) execute() { - b.logger.Info("broadcaster start to execute") + workers := int(float64(hardware.GetCPUNum()) * paramtable.Get().StreamingCfg.WALBroadcasterConcurrencyRatio.GetAsFloat()) + if workers < 1 { + workers = 1 + } + b.logger.Info("broadcaster start to execute", zap.Int("workerNum", workers)) + defer func() { b.backgroundTaskNotifier.Finish(struct{}{}) b.logger.Info("broadcaster execute exit") }() + // Wait for appendOperator ready + appendOperator, err := b.appendOperator.GetWithContext(b.backgroundTaskNotifier.Context()) + if err != nil { + b.logger.Info("broadcaster is closed before appendOperator ready") + return + } + b.logger.Info("broadcaster appendOperator ready, begin to start workers and dispatch") + // Start n workers to handle the broadcast task. wg := sync.WaitGroup{} - for i := 0; i < 4; i++ { + for i := 0; i < workers; i++ { i := i // Start n workers to handle the broadcast task. wg.Add(1) go func() { defer wg.Done() - b.worker(i) + b.worker(i, appendOperator) }() } defer wg.Wait() @@ -174,8 +189,13 @@ func (b *broadcasterImpl) dispatch() { b.backoffs.Push(task) case <-nextBackOff: // backoff is done, move all the backoff done task into pending to retry. + newPops := make([]*broadcastTask, 0) for b.backoffs.Len() > 0 && b.backoffs.Peek().NextInterval() < time.Millisecond { - b.pendings = append(b.pendings, b.backoffs.Pop()) + newPops = append(newPops, b.backoffs.Pop()) + } + if len(newPops) > 0 { + // Push the backoff task into pendings front. + b.pendings = append(newPops, b.pendings...) } case workerChan <- nextTask: // The task is sent to worker, remove it from pending list. 
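The execute() change above sizes the worker pool from the host CPU count and the new streaming.walBroadcaster.concurrencyRatio option. The sketch below restates that calculation in isolation so the effect of the setting is easy to see; broadcasterWorkerCount is an invented helper name, while the paramtable and hardware calls are the ones the patch itself uses.

package example // illustrative only, not part of this patch

import (
    "github.com/milvus-io/milvus/pkg/util/hardware"
    "github.com/milvus-io/milvus/pkg/util/paramtable"
)

// broadcasterWorkerCount mirrors the sizing logic added to broadcasterImpl.execute().
func broadcasterWorkerCount() int {
    ratio := paramtable.Get().StreamingCfg.WALBroadcasterConcurrencyRatio.GetAsFloat()
    workers := int(float64(hardware.GetCPUNum()) * ratio)
    if workers < 1 {
        workers = 1 // never run the broadcaster without at least one worker
    }
    return workers // e.g. 8 CPUs * ratio 1.0 -> 8 workers; 8 CPUs * ratio 1.5 -> 12 workers
}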
@@ -184,9 +204,10 @@ func (b *broadcasterImpl) dispatch() { } } -func (b *broadcasterImpl) worker(no int) { +func (b *broadcasterImpl) worker(no int, appendOperator AppendOperator) { + logger := b.logger.With(zap.Int("workerNo", no)) defer func() { - b.logger.Info("broadcaster worker exit", zap.Int("no", no)) + logger.Info("broadcaster worker exit") }() for { @@ -194,7 +215,7 @@ func (b *broadcasterImpl) worker(no int) { case <-b.backgroundTaskNotifier.Context().Done(): return case task := <-b.workerChan: - if err := task.Poll(b.backgroundTaskNotifier.Context(), b.appendOperator); err != nil { + if err := task.Execute(b.backgroundTaskNotifier.Context(), appendOperator); err != nil { // If the task is not done, repush it into pendings and retry infinitely. select { case <-b.backgroundTaskNotifier.Context().Done(): diff --git a/internal/streamingcoord/server/broadcaster/broadcaster_test.go b/internal/streamingcoord/server/broadcaster/broadcaster_test.go index de0cc86b3b..738057dfa3 100644 --- a/internal/streamingcoord/server/broadcaster/broadcaster_test.go +++ b/internal/streamingcoord/server/broadcaster/broadcaster_test.go @@ -12,7 +12,6 @@ import ( "go.uber.org/atomic" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" - "github.com/milvus-io/milvus/internal/distributed/streaming" "github.com/milvus-io/milvus/internal/mocks/mock_metastore" "github.com/milvus-io/milvus/internal/mocks/streamingcoord/server/mock_broadcaster" "github.com/milvus-io/milvus/internal/streamingcoord/server/resource" @@ -23,10 +22,13 @@ import ( "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/streaming/util/types" "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/walimplstest" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/syncutil" ) func TestBroadcaster(t *testing.T) { + paramtable.Init() + meta := mock_metastore.NewMockStreamingCoordCataLog(t) meta.EXPECT().ListBroadcastTask(mock.Anything). RunAndReturn(func(ctx context.Context) ([]*streamingpb.BroadcastTask, error) { @@ -39,7 +41,7 @@ func TestBroadcaster(t *testing.T) { done := atomic.NewInt64(0) meta.EXPECT().SaveBroadcastTask(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, bt *streamingpb.BroadcastTask) error { // may failure - if rand.Int31n(10) < 5 { + if rand.Int31n(10) < 3 { return errors.New("save task failed") } if bt.State == streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_DONE { @@ -58,7 +60,7 @@ func TestBroadcaster(t *testing.T) { assert.NotNil(t, bc) assert.Eventually(t, func() bool { return appended.Load() == 6 && done.Load() == 3 - }, 10*time.Second, 10*time.Millisecond) + }, 30*time.Second, 10*time.Millisecond) var result *types.BroadcastAppendResult for { @@ -73,7 +75,7 @@ func TestBroadcaster(t *testing.T) { assert.Eventually(t, func() bool { return done.Load() == 4 - }, 10*time.Second, 10*time.Millisecond) + }, 30*time.Second, 10*time.Millisecond) // TODO: error path. 
bc.Close() @@ -83,23 +85,23 @@ func TestBroadcaster(t *testing.T) { assert.Nil(t, result) } -func createOpeartor(t *testing.T) (AppendOperator, *atomic.Int64) { +func createOpeartor(t *testing.T) (*syncutil.Future[AppendOperator], *atomic.Int64) { id := atomic.NewInt64(1) appended := atomic.NewInt64(0) operator := mock_broadcaster.NewMockAppendOperator(t) - f := func(ctx context.Context, msgs ...message.MutableMessage) streaming.AppendResponses { - resps := streaming.AppendResponses{ - Responses: make([]streaming.AppendResponse, len(msgs)), + f := func(ctx context.Context, msgs ...message.MutableMessage) types.AppendResponses { + resps := types.AppendResponses{ + Responses: make([]types.AppendResponse, len(msgs)), } for idx := range msgs { newID := walimplstest.NewTestMessageID(id.Inc()) - if rand.Int31n(10) < 5 { - resps.Responses[idx] = streaming.AppendResponse{ + if rand.Int31n(10) < 3 { + resps.Responses[idx] = types.AppendResponse{ Error: errors.New("append failed"), } continue } - resps.Responses[idx] = streaming.AppendResponse{ + resps.Responses[idx] = types.AppendResponse{ AppendResult: &types.AppendResult{ MessageID: newID, TimeTick: uint64(time.Now().UnixMilli()), @@ -114,7 +116,10 @@ func createOpeartor(t *testing.T) (AppendOperator, *atomic.Int64) { operator.EXPECT().AppendMessages(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(f) operator.EXPECT().AppendMessages(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(f) operator.EXPECT().AppendMessages(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(f) - return operator, appended + + fOperator := syncutil.NewFuture[AppendOperator]() + fOperator.Set(operator) + return fOperator, appended } func createNewBroadcastMsg(vchannels []string) message.BroadcastMutableMessage { diff --git a/internal/streamingcoord/server/broadcaster/registry/append_operator.go b/internal/streamingcoord/server/broadcaster/registry/append_operator.go new file mode 100644 index 0000000000..e5e59d8bce --- /dev/null +++ b/internal/streamingcoord/server/broadcaster/registry/append_operator.go @@ -0,0 +1,42 @@ +package registry + +import ( + "context" + + "github.com/milvus-io/milvus/internal/util/streamingutil" + "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/streaming/util/types" + "github.com/milvus-io/milvus/pkg/util/syncutil" +) + +type AppendOperatorType int + +const ( + AppendOperatorTypeMsgstream AppendOperatorType = iota + 1 + AppendOperatorTypeStreaming +) + +var localRegistry = make(map[AppendOperatorType]*syncutil.Future[AppendOperator]) + +// AppendOperator is used to append messages, there's only two implement of this interface: +// 1. streaming.WAL() +// 2. 
old msgstream interface +type AppendOperator interface { + AppendMessages(ctx context.Context, msgs ...message.MutableMessage) types.AppendResponses +} + +func init() { + localRegistry[AppendOperatorTypeMsgstream] = syncutil.NewFuture[AppendOperator]() + localRegistry[AppendOperatorTypeStreaming] = syncutil.NewFuture[AppendOperator]() +} + +func Register(typ AppendOperatorType, op AppendOperator) { + localRegistry[typ].Set(op) +} + +func GetAppendOperator() *syncutil.Future[AppendOperator] { + if streamingutil.IsStreamingServiceEnabled() { + return localRegistry[AppendOperatorTypeStreaming] + } + return localRegistry[AppendOperatorTypeMsgstream] +} diff --git a/internal/streamingcoord/server/broadcaster/registry/test_utility.go b/internal/streamingcoord/server/broadcaster/registry/test_utility.go new file mode 100644 index 0000000000..1e555ff261 --- /dev/null +++ b/internal/streamingcoord/server/broadcaster/registry/test_utility.go @@ -0,0 +1,12 @@ +//go:build test +// +build test + +package registry + +import "github.com/milvus-io/milvus/pkg/util/syncutil" + +func ResetRegistration() { + localRegistry = make(map[AppendOperatorType]*syncutil.Future[AppendOperator]) + localRegistry[AppendOperatorTypeMsgstream] = syncutil.NewFuture[AppendOperator]() + localRegistry[AppendOperatorTypeStreaming] = syncutil.NewFuture[AppendOperator]() +} diff --git a/internal/streamingcoord/server/broadcaster/task.go b/internal/streamingcoord/server/broadcaster/task.go index fff789e200..38693bf509 100644 --- a/internal/streamingcoord/server/broadcaster/task.go +++ b/internal/streamingcoord/server/broadcaster/task.go @@ -49,15 +49,17 @@ type broadcastTask struct { *typeutil.BackoffWithInstant } -// Poll polls the task, return nil if the task is done, otherwise not done. -// Poll can be repeated called until the task is done. -func (b *broadcastTask) Poll(ctx context.Context, operator AppendOperator) error { +// Execute reexecute the task, return nil if the task is done, otherwise not done. +// Execute can be repeated called until the task is done. +// Same semantics as the `Poll` operation in eventloop. +func (b *broadcastTask) Execute(ctx context.Context, operator AppendOperator) error { if len(b.pendingMessages) > 0 { b.logger.Debug("broadcast task is polling to make sent...", zap.Int("pendingMessages", len(b.pendingMessages))) resps := operator.AppendMessages(ctx, b.pendingMessages...) newPendings := make([]message.MutableMessage, 0) for idx, resp := range resps.Responses { if resp.Error != nil { + b.logger.Warn("broadcast task append message failed", zap.Int("idx", idx), zap.Error(resp.Error)) newPendings = append(newPendings, b.pendingMessages[idx]) continue } @@ -67,7 +69,7 @@ func (b *broadcastTask) Poll(ctx context.Context, operator AppendOperator) error if len(newPendings) == 0 { b.future.Set(&types.BroadcastAppendResult{AppendResults: b.appendResult}) } - b.logger.Info("broadcast task make a new broadcast done", zap.Int("pendingMessages", len(b.pendingMessages))) + b.logger.Info("broadcast task make a new broadcast done", zap.Int("backoffRetryMessages", len(b.pendingMessages))) } if len(b.pendingMessages) == 0 { // There's no more pending message, mark the task as done. 
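The registry above decouples the two possible producers of an AppendOperator (the streaming WAL accesser registered by streaming.Init(), and the msgstream operator registered by rootcoord's startInternal()) from its single consumer, the broadcaster recovered by the streamingcoord server shown next. The sketch below walks through that flow; registryFlow and its arguments are illustrative assumptions, only the registry, message, and Future calls come from this patch.

package example // illustrative only, not part of this patch

import (
    "context"

    "github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry"
    "github.com/milvus-io/milvus/pkg/streaming/util/message"
)

// registryFlow sketches how the registration futures are meant to be used.
func registryFlow(ctx context.Context, walAccesser, msgstreamOperator registry.AppendOperator, msgs []message.MutableMessage) error {
    // Producer side: streaming.Init() registers the WAL accesser, and rootcoord's
    // startInternal() registers the msgstream-backed operator.
    registry.Register(registry.AppendOperatorTypeStreaming, walAccesser)
    registry.Register(registry.AppendOperatorTypeMsgstream, msgstreamOperator)

    // Consumer side: the broadcaster holds the returned future, so it can be
    // recovered before either Register call has happened and block until one does.
    op, err := registry.GetAppendOperator().GetWithContext(ctx)
    if err != nil {
        return err // context cancelled before any operator was registered
    }
    return op.AppendMessages(ctx, msgs...).UnwrapFirstError()
}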
diff --git a/internal/streamingcoord/server/server.go b/internal/streamingcoord/server/server.go index 67726bcf01..cb0a7e65e8 100644 --- a/internal/streamingcoord/server/server.go +++ b/internal/streamingcoord/server/server.go @@ -9,6 +9,7 @@ import ( "github.com/milvus-io/milvus/internal/streamingcoord/server/balancer" _ "github.com/milvus-io/milvus/internal/streamingcoord/server/balancer/policy" // register the balancer policy "github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster" + "github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry" "github.com/milvus-io/milvus/internal/streamingcoord/server/service" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/streamingutil" @@ -69,7 +70,7 @@ func (s *Server) initBasicComponent(ctx context.Context) (err error) { // So we need to recover it. futures = append(futures, conc.Go(func() (struct{}, error) { s.logger.Info("start recovery broadcaster...") - broadcaster, err := broadcaster.RecoverBroadcaster(ctx, broadcaster.NewAppendOperator()) + broadcaster, err := broadcaster.RecoverBroadcaster(ctx, registry.GetAppendOperator()) if err != nil { s.logger.Warn("recover broadcaster failed", zap.Error(err)) return struct{}{}, err diff --git a/pkg/streaming/util/message/adaptor/broadcast_message.go b/pkg/streaming/util/message/adaptor/broadcast_message.go new file mode 100644 index 0000000000..69bcde5dad --- /dev/null +++ b/pkg/streaming/util/message/adaptor/broadcast_message.go @@ -0,0 +1,25 @@ +package adaptor + +import ( + "github.com/cockroachdb/errors" + + "github.com/milvus-io/milvus/pkg/mq/msgstream" + "github.com/milvus-io/milvus/pkg/streaming/util/message" +) + +func NewMsgPackFromMutableMessageV1(msg message.MutableMessage) (msgstream.TsMsg, error) { + if msg.Version() != message.VersionV1 { + return nil, errors.New("Invalid message version") + } + + tsMsg, err := unmashalerDispatcher.Unmarshal(msg.Payload(), MustGetCommonpbMsgTypeFromMessageType(msg.MessageType())) + if err != nil { + return nil, errors.Wrap(err, "Failed to unmarshal message") + } + return recoverMutableMessageFromHeader(tsMsg, msg) +} + +func recoverMutableMessageFromHeader(tsMsg msgstream.TsMsg, _ message.MutableMessage) (msgstream.TsMsg, error) { + // TODO: fillback the header information to tsMsg + return tsMsg, nil +} diff --git a/pkg/streaming/util/types/responses.go b/pkg/streaming/util/types/responses.go new file mode 100644 index 0000000000..7f9dcf6908 --- /dev/null +++ b/pkg/streaming/util/types/responses.go @@ -0,0 +1,113 @@ +package types + +import ( + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + + "github.com/milvus-io/milvus/pkg/proto/messagespb" + "github.com/milvus-io/milvus/pkg/proto/streamingpb" + "github.com/milvus-io/milvus/pkg/streaming/util/message" +) + +// BroadcastAppendResult is the result of broadcast append operation. +type BroadcastAppendResult struct { + AppendResults map[string]*AppendResult // make the channel name to the append result. +} + +// GetAppendResult returns the append result of the given channel. +func (r *BroadcastAppendResult) GetAppendResult(channelName string) *AppendResult { + return r.AppendResults[channelName] +} + +// AppendResult is the result of append operation. +type AppendResult struct { + // MessageID is generated by underlying walimpls. + MessageID message.MessageID + + // TimeTick is the time tick of the message. + // Set by timetick interceptor. 
+ TimeTick uint64 + + // TxnCtx is the transaction context of the message. + // If the message is not belong to a transaction, the TxnCtx will be nil. + TxnCtx *message.TxnContext + + // Extra is the extra information of the append result. + Extra *anypb.Any +} + +// GetExtra unmarshal the extra information to the given message. +func (r *AppendResult) GetExtra(m proto.Message) error { + return anypb.UnmarshalTo(r.Extra, m, proto.UnmarshalOptions{ + DiscardUnknown: true, + AllowPartial: true, + }) +} + +// IntoProto converts the append result to proto. +func (r *AppendResult) IntoProto() *streamingpb.ProduceMessageResponseResult { + return &streamingpb.ProduceMessageResponseResult{ + Id: &messagespb.MessageID{ + Id: r.MessageID.Marshal(), + }, + Timetick: r.TimeTick, + TxnContext: r.TxnCtx.IntoProto(), + Extra: r.Extra, + } +} + +// NewAppendResponseN creates a new append response. +func NewAppendResponseN(n int) AppendResponses { + return AppendResponses{ + Responses: make([]AppendResponse, n), + } +} + +// AppendResponse is the response of one append operation. +type AppendResponse struct { + AppendResult *AppendResult + Error error +} + +// AppendResponses is the response of append operation. +type AppendResponses struct { + Responses []AppendResponse +} + +func (a AppendResponses) MaxTimeTick() uint64 { + var maxTimeTick uint64 + for _, r := range a.Responses { + if r.AppendResult != nil && r.AppendResult.TimeTick > maxTimeTick { + maxTimeTick = r.AppendResult.TimeTick + } + } + return maxTimeTick +} + +// UnwrapFirstError returns the first error in the responses. +func (a AppendResponses) UnwrapFirstError() error { + for _, r := range a.Responses { + if r.Error != nil { + return r.Error + } + } + return nil +} + +// FillAllError fills all the responses with the same error. +func (a *AppendResponses) FillAllError(err error) { + for i := range a.Responses { + a.Responses[i].Error = err + } +} + +// FillResponseAtIdx fill the response at idx +func (a *AppendResponses) FillResponseAtIdx(resp AppendResponse, idx int) { + a.Responses[idx] = resp +} + +func (a *AppendResponses) FillAllResponse(resp AppendResponse) { + for i := range a.Responses { + a.Responses[i] = resp + } +} diff --git a/pkg/streaming/util/types/streaming_node.go b/pkg/streaming/util/types/streaming_node.go index bc4118cb30..f2a68b99e2 100644 --- a/pkg/streaming/util/types/streaming_node.go +++ b/pkg/streaming/util/types/streaming_node.go @@ -4,12 +4,8 @@ import ( "context" "github.com/cockroachdb/errors" - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/known/anypb" - "github.com/milvus-io/milvus/pkg/proto/messagespb" "github.com/milvus-io/milvus/pkg/proto/streamingpb" - "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -88,50 +84,3 @@ func (n *StreamingNodeStatus) ErrorOfNode() error { } return n.Err } - -// BroadcastAppendResult is the result of broadcast append operation. -type BroadcastAppendResult struct { - AppendResults map[string]*AppendResult // make the channel name to the append result. -} - -// GetAppendResult returns the append result of the given channel. -func (r *BroadcastAppendResult) GetAppendResult(channelName string) *AppendResult { - return r.AppendResults[channelName] -} - -// AppendResult is the result of append operation. -type AppendResult struct { - // MessageID is generated by underlying walimpls. - MessageID message.MessageID - - // TimeTick is the time tick of the message. - // Set by timetick interceptor. 
- TimeTick uint64 - - // TxnCtx is the transaction context of the message. - // If the message is not belong to a transaction, the TxnCtx will be nil. - TxnCtx *message.TxnContext - - // Extra is the extra information of the append result. - Extra *anypb.Any -} - -// GetExtra unmarshal the extra information to the given message. -func (r *AppendResult) GetExtra(m proto.Message) error { - return anypb.UnmarshalTo(r.Extra, m, proto.UnmarshalOptions{ - DiscardUnknown: true, - AllowPartial: true, - }) -} - -// IntoProto converts the append result to proto. -func (r *AppendResult) IntoProto() *streamingpb.ProduceMessageResponseResult { - return &streamingpb.ProduceMessageResponseResult{ - Id: &messagespb.MessageID{ - Id: r.MessageID.Marshal(), - }, - Timetick: r.TimeTick, - TxnContext: r.TxnCtx.IntoProto(), - Extra: r.Extra, - } -} diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 37b6a2fdf7..b16f836d96 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -4828,6 +4828,9 @@ type streamingConfig struct { WALBalancerBackoffInitialInterval ParamItem `refreshable:"true"` WALBalancerBackoffMultiplier ParamItem `refreshable:"true"` + // broadcaster + WALBroadcasterConcurrencyRatio ParamItem `refreshable:"false"` + // txn TxnDefaultKeepaliveTimeout ParamItem `refreshable:"true"` } @@ -4861,6 +4864,15 @@ It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDura } p.WALBalancerBackoffMultiplier.Init(base.mgr) + p.WALBroadcasterConcurrencyRatio = ParamItem{ + Key: "streaming.walBroadcaster.concurrencyRatio", + Version: "2.5.4", + Doc: `The concurrency ratio based on number of CPU for wal broadcaster, 1 by default.`, + DefaultValue: "1", + Export: true, + } + p.WALBroadcasterConcurrencyRatio.Init(base.mgr) + // txn p.TxnDefaultKeepaliveTimeout = ParamItem{ Key: "streaming.txn.defaultKeepaliveTimeout", diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 2fbe4079cc..9b95f3384a 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -614,14 +614,17 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 1*time.Minute, params.StreamingCfg.WALBalancerTriggerInterval.GetAsDurationByParse()) assert.Equal(t, 50*time.Millisecond, params.StreamingCfg.WALBalancerBackoffInitialInterval.GetAsDurationByParse()) assert.Equal(t, 2.0, params.StreamingCfg.WALBalancerBackoffMultiplier.GetAsFloat()) + assert.Equal(t, 1.0, params.StreamingCfg.WALBroadcasterConcurrencyRatio.GetAsFloat()) assert.Equal(t, 10*time.Second, params.StreamingCfg.TxnDefaultKeepaliveTimeout.GetAsDurationByParse()) params.Save(params.StreamingCfg.WALBalancerTriggerInterval.Key, "50s") params.Save(params.StreamingCfg.WALBalancerBackoffInitialInterval.Key, "50s") params.Save(params.StreamingCfg.WALBalancerBackoffMultiplier.Key, "3.5") + params.Save(params.StreamingCfg.WALBroadcasterConcurrencyRatio.Key, "1.5") params.Save(params.StreamingCfg.TxnDefaultKeepaliveTimeout.Key, "3500ms") assert.Equal(t, 50*time.Second, params.StreamingCfg.WALBalancerTriggerInterval.GetAsDurationByParse()) assert.Equal(t, 50*time.Second, params.StreamingCfg.WALBalancerBackoffInitialInterval.GetAsDurationByParse()) assert.Equal(t, 3.5, params.StreamingCfg.WALBalancerBackoffMultiplier.GetAsFloat()) + assert.Equal(t, 1.5, params.StreamingCfg.WALBroadcasterConcurrencyRatio.GetAsFloat()) assert.Equal(t, 3500*time.Millisecond, 
params.StreamingCfg.TxnDefaultKeepaliveTimeout.GetAsDurationByParse()) }) diff --git a/tests/integration/coorddownsearch/search_after_coord_down_test.go b/tests/integration/coorddownsearch/search_after_coord_down_test.go index 1b02bd0a6e..ead15e6f19 100644 --- a/tests/integration/coorddownsearch/search_after_coord_down_test.go +++ b/tests/integration/coorddownsearch/search_after_coord_down_test.go @@ -30,6 +30,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/funcutil" @@ -302,6 +303,8 @@ func (s *CoordDownSearch) searchAfterCoordDown() float64 { s.searchFailed(searchCollectionName, Dim, commonpb.ConsistencyLevel_Strong) log.Info(fmt.Sprintf("=========================Failed search cost: %fs=========================", time.Since(failedStart).Seconds())) + registry.ResetRegistration() + log.Info("=========================restart Root Coordinators=========================") c.StartRootCoord() s.search(searchCollectionName, Dim, commonpb.ConsistencyLevel_Eventually) diff --git a/tests/integration/coordrecovery/coord_recovery_test.go b/tests/integration/coordrecovery/coord_recovery_test.go index 46f02c593b..b80c63023a 100644 --- a/tests/integration/coordrecovery/coord_recovery_test.go +++ b/tests/integration/coordrecovery/coord_recovery_test.go @@ -30,6 +30,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/funcutil" @@ -242,6 +243,8 @@ func (s *CoordSwitchSuite) switchCoord() float64 { log.Info("=========================Coordinators stopped=========================", zap.Duration("elapsed", time.Since(start))) start = time.Now() + registry.ResetRegistration() + c.StartRootCoord() log.Info("=========================RootCoord restarted=========================") c.StartDataCoord() diff --git a/tests/integration/suite.go b/tests/integration/suite.go index 68751b40f1..884bf945b7 100644 --- a/tests/integration/suite.go +++ b/tests/integration/suite.go @@ -30,6 +30,7 @@ import ( "go.uber.org/zap/zapcore" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry" "github.com/milvus-io/milvus/internal/util/hookutil" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/etcd" @@ -169,4 +170,5 @@ func (s *MiniClusterSuite) TearDownTest() { if s.Cluster != nil { s.Cluster.Stop() } + registry.ResetRegistration() }
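As a closing reference, the helpers now exported from pkg/streaming/util/types/responses.go are enough to write an AppendOperator from scratch. The sketch below is a purely illustrative no-op implementation (noopAppendOperator does not exist in the patch) showing the intended call pattern: allocate with NewAppendResponseN, then fill per-index results or a shared error.

package example // illustrative only, not part of this patch

import (
    "context"

    "github.com/milvus-io/milvus/pkg/streaming/util/message"
    "github.com/milvus-io/milvus/pkg/streaming/util/types"
)

// noopAppendOperator demonstrates the exported AppendResponses helpers; it does
// not append to any real WAL or msgstream.
type noopAppendOperator struct{}

func (noopAppendOperator) AppendMessages(ctx context.Context, msgs ...message.MutableMessage) types.AppendResponses {
    resp := types.NewAppendResponseN(len(msgs))
    if err := ctx.Err(); err != nil {
        resp.FillAllError(err) // fail every entry when the context is already cancelled
        return resp
    }
    for i := range msgs {
        resp.FillResponseAtIdx(types.AppendResponse{
            AppendResult: &types.AppendResult{TimeTick: uint64(i + 1)}, // fake result, no WAL behind it
        }, i)
    }
    return resp // resp.UnwrapFirstError() == nil and resp.MaxTimeTick() == uint64(len(msgs))
}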