mirror of https://github.com/milvus-io/milvus.git
47 lines
1.6 KiB
Go
47 lines
1.6 KiB
Go
package service
|
|
|
|
import (
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
|
|
"github.com/milvus-io/milvus/internal/streamingcoord/server/balancer"
|
|
"github.com/milvus-io/milvus/internal/streamingcoord/server/service/discover"
|
|
"github.com/milvus-io/milvus/pkg/v2/metrics"
|
|
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/syncutil"
|
|
)
|
|
|
|
// Compile-time assertion that *assignmentServiceImpl satisfies
// streamingpb.StreamingCoordAssignmentServiceServer.
var _ streamingpb.StreamingCoordAssignmentServiceServer = (*assignmentServiceImpl)(nil)
|
|
|
|
// NewAssignmentService returns a new assignment service.
|
|
func NewAssignmentService(
|
|
balancer *syncutil.Future[balancer.Balancer],
|
|
) streamingpb.StreamingCoordAssignmentServiceServer {
|
|
return &assignmentServiceImpl{
|
|
balancer: balancer,
|
|
listenerTotal: metrics.StreamingCoordAssignmentListenerTotal.WithLabelValues(paramtable.GetStringNodeID()),
|
|
}
|
|
}
|
|
|
|
type AssignmentService interface {
|
|
streamingpb.StreamingCoordAssignmentServiceServer
|
|
}
|
|
|
|
// assignmentServiceImpl is the implementation of the assignment service.
|
|
type assignmentServiceImpl struct {
|
|
balancer *syncutil.Future[balancer.Balancer]
|
|
listenerTotal prometheus.Gauge
|
|
}
|
|
|
|
// AssignmentDiscover watches the state of all log nodes.
|
|
func (s *assignmentServiceImpl) AssignmentDiscover(server streamingpb.StreamingCoordAssignmentService_AssignmentDiscoverServer) error {
|
|
s.listenerTotal.Inc()
|
|
defer s.listenerTotal.Dec()
|
|
|
|
balancer, err := s.balancer.GetWithContext(server.Context())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return discover.NewAssignmentDiscoverServer(balancer, server).Execute()
|
|
}
|