mirror of https://github.com/milvus-io/milvus.git
99 lines
2.8 KiB
Go
99 lines
2.8 KiB
Go
package discover
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
|
|
"github.com/cockroachdb/errors"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/milvus-io/milvus/internal/streamingcoord/server/balancer"
|
|
"github.com/milvus-io/milvus/internal/streamingcoord/server/resource"
|
|
"github.com/milvus-io/milvus/pkg/log"
|
|
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
|
|
"github.com/milvus-io/milvus/pkg/streaming/util/types"
|
|
)
|
|
|
|
var errClosedByUser = errors.New("closed by user")
|
|
|
|
func NewAssignmentDiscoverServer(
|
|
balancer balancer.Balancer,
|
|
streamServer streamingpb.StreamingCoordAssignmentService_AssignmentDiscoverServer,
|
|
) *AssignmentDiscoverServer {
|
|
ctx, cancel := context.WithCancelCause(streamServer.Context())
|
|
return &AssignmentDiscoverServer{
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
balancer: balancer,
|
|
streamServer: discoverGrpcServerHelper{
|
|
streamServer,
|
|
},
|
|
logger: resource.Resource().Logger().With(log.FieldComponent("assignment-discover-server")),
|
|
}
|
|
}
|
|
|
|
type AssignmentDiscoverServer struct {
|
|
ctx context.Context
|
|
cancel context.CancelCauseFunc
|
|
balancer balancer.Balancer
|
|
streamServer discoverGrpcServerHelper
|
|
logger *log.MLogger
|
|
}
|
|
|
|
func (s *AssignmentDiscoverServer) Execute() error {
|
|
// Start a recv arm to handle the control message from client.
|
|
go func() {
|
|
// recv loop will be blocked until the stream is closed.
|
|
// 1. close by client.
|
|
// 2. close by server context cancel by return of outside Execute.
|
|
_ = s.recvLoop()
|
|
}()
|
|
|
|
// Start a send loop on current main goroutine.
|
|
// the loop will be blocked until:
|
|
// 1. the stream is broken.
|
|
// 2. recv arm recv closed and all response is sent.
|
|
return s.sendLoop()
|
|
}
|
|
|
|
// recvLoop receives the message from client.
|
|
func (s *AssignmentDiscoverServer) recvLoop() (err error) {
|
|
defer func() {
|
|
if err != nil {
|
|
s.cancel(err)
|
|
s.logger.Warn("recv arm of stream closed by unexpected error", zap.Error(err))
|
|
return
|
|
}
|
|
s.cancel(errClosedByUser)
|
|
s.logger.Info("recv arm of stream closed")
|
|
}()
|
|
|
|
for {
|
|
req, err := s.streamServer.Recv()
|
|
if err == io.EOF {
|
|
return nil
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
switch req := req.Command.(type) {
|
|
case *streamingpb.AssignmentDiscoverRequest_ReportError:
|
|
channel := types.NewPChannelInfoFromProto(req.ReportError.GetPchannel())
|
|
// mark the channel as unavailable and trigger a recover right away.
|
|
s.balancer.MarkAsUnavailable(s.ctx, []types.PChannelInfo{channel})
|
|
case *streamingpb.AssignmentDiscoverRequest_Close:
|
|
default:
|
|
s.logger.Warn("unknown command type", zap.Any("command", req))
|
|
}
|
|
}
|
|
}
|
|
|
|
// sendLoop sends the message to client.
|
|
func (s *AssignmentDiscoverServer) sendLoop() error {
|
|
err := s.balancer.WatchChannelAssignments(s.ctx, s.streamServer.SendFullAssignment)
|
|
if errors.Is(err, errClosedByUser) {
|
|
return s.streamServer.SendCloseResponse()
|
|
}
|
|
return err
|
|
}
|