Add global mutex for segment allocate

Signed-off-by: sunby <bingyi.sun@zilliz.com>
pull/4973/head^2
sunby 2021-03-20 20:41:39 +08:00 committed by yefu.chen
parent fca7c1594f
commit f4d055cf42
1 changed files with 5 additions and 0 deletions

View File

@ -61,6 +61,7 @@ type Server struct {
insertChannels []string
msFactory msgstream.Factory
ttBarrier timesync.TimeTickBarrier
allocMu sync.RWMutex
}
func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
@ -499,6 +500,8 @@ func (s *Server) newDataNode(ip string, port int64, id UniqueID) (*dataNode, err
}
func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*commonpb.Status, error) {
s.allocMu.Lock()
defer s.allocMu.Unlock()
if !s.checkStateIsHealthy() {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
@ -512,6 +515,8 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*commonpb
}
func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) {
s.allocMu.Lock()
defer s.allocMu.Unlock()
resp := &datapb.AssignSegmentIDResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,