mirror of https://github.com/milvus-io/milvus.git
parent
f8c9c527c9
commit
2b81933d13
|
@ -73,7 +73,7 @@ func (dc *ControllerImpl) SyncAll(ctx context.Context) {
|
|||
wg.Add(1)
|
||||
go func(handler *distHandler) {
|
||||
defer wg.Done()
|
||||
handler.getDistribution(ctx, nil)
|
||||
handler.getDistribution(ctx)
|
||||
}(h)
|
||||
}
|
||||
wg.Wait()
|
||||
|
|
|
@ -32,6 +32,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/task"
|
||||
"github.com/milvus-io/milvus/internal/util/etcd"
|
||||
"github.com/milvus-io/milvus/internal/util/merr"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
)
|
||||
|
||||
|
@ -83,7 +84,7 @@ func (suite *DistControllerTestSuite) TearDownSuite() {
|
|||
func (suite *DistControllerTestSuite) TestStart() {
|
||||
dispatchCalled := atomic.NewBool(false)
|
||||
suite.mockCluster.EXPECT().GetDataDistribution(mock.Anything, mock.Anything, mock.Anything).Return(
|
||||
&querypb.GetDataDistributionResponse{NodeID: 1},
|
||||
&querypb.GetDataDistributionResponse{Status: merr.Status(nil), NodeID: 1},
|
||||
nil,
|
||||
)
|
||||
suite.mockScheduler.EXPECT().Dispatch(int64(1)).Run(func(node int64) { dispatchCalled.Store(true) })
|
||||
|
@ -111,7 +112,7 @@ func (suite *DistControllerTestSuite) TestStop() {
|
|||
suite.controller.StartDistInstance(context.TODO(), 1)
|
||||
called := atomic.NewBool(false)
|
||||
suite.mockCluster.EXPECT().GetDataDistribution(mock.Anything, mock.Anything, mock.Anything).Maybe().Return(
|
||||
&querypb.GetDataDistributionResponse{NodeID: 1},
|
||||
&querypb.GetDataDistributionResponse{Status: merr.Status(nil), NodeID: 1},
|
||||
nil,
|
||||
).Run(func(args mock.Arguments) {
|
||||
called.Store(true)
|
||||
|
@ -136,6 +137,7 @@ func (suite *DistControllerTestSuite) TestSyncAll() {
|
|||
suite.mockCluster.EXPECT().GetDataDistribution(mock.Anything, mock.Anything, mock.Anything).Call.Return(
|
||||
func(ctx context.Context, nodeID int64, req *querypb.GetDataDistributionRequest) *querypb.GetDataDistributionResponse {
|
||||
return &querypb.GetDataDistributionResponse{
|
||||
Status: merr.Status(nil),
|
||||
NodeID: nodeID,
|
||||
}
|
||||
},
|
||||
|
|
|
@ -18,7 +18,6 @@ package dist
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -32,6 +31,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/task"
|
||||
"github.com/milvus-io/milvus/internal/util/commonpbutil"
|
||||
"github.com/milvus-io/milvus/internal/util/merr"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -55,55 +55,35 @@ type distHandler struct {
|
|||
|
||||
func (dh *distHandler) start(ctx context.Context) {
|
||||
defer dh.wg.Done()
|
||||
logger := log.Ctx(ctx).With(zap.Int64("nodeID", dh.nodeID)).WithRateGroup("qnv2.distHandler", 1, 60)
|
||||
logger.Info("start dist handler")
|
||||
log := log.Ctx(ctx).With(zap.Int64("nodeID", dh.nodeID)).WithRateGroup("qnv2.distHandler", 1, 60)
|
||||
log.Info("start dist handler")
|
||||
ticker := time.NewTicker(Params.QueryCoordCfg.DistPullInterval.GetAsDuration(time.Millisecond))
|
||||
defer ticker.Stop()
|
||||
failures := 0
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.Info("close dist handler due to context done")
|
||||
log.Info("close dist handler due to context done")
|
||||
return
|
||||
case <-dh.c:
|
||||
logger.Info("close dist handelr")
|
||||
log.Info("close dist handler")
|
||||
return
|
||||
case <-ticker.C:
|
||||
dh.getDistribution(ctx, func(isFail bool) {
|
||||
if isFail {
|
||||
failures++
|
||||
node := dh.nodeManager.Get(dh.nodeID)
|
||||
if node != nil {
|
||||
log.RatedDebug(30.0, "failed to get node's data distribution",
|
||||
zap.Int64("nodeID", dh.nodeID),
|
||||
zap.Time("lastHeartbeat", node.LastHeartbeat()),
|
||||
)
|
||||
}
|
||||
} else {
|
||||
failures = 0
|
||||
err := dh.getDistribution(ctx)
|
||||
if err != nil {
|
||||
node := dh.nodeManager.Get(dh.nodeID)
|
||||
fields := []zap.Field{zap.Int("times", failures)}
|
||||
if node != nil {
|
||||
fields = append(fields, zap.Time("lastHeartbeat", node.LastHeartbeat()))
|
||||
}
|
||||
|
||||
if failures >= maxFailureTimes {
|
||||
log.RatedInfo(30.0, fmt.Sprintf("can not get data distribution from node %d for %d times", dh.nodeID, failures))
|
||||
// TODO: kill the querynode server and stop the loop?
|
||||
}
|
||||
})
|
||||
log.RatedWarn(30.0, "failed to get data distribution", fields...)
|
||||
} else {
|
||||
failures = 0
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (dh *distHandler) logFailureInfo(resp *querypb.GetDataDistributionResponse, err error) {
|
||||
log := log.With(zap.Int64("nodeID", dh.nodeID))
|
||||
if err != nil {
|
||||
log.Warn("failed to get data distribution",
|
||||
zap.Error(err))
|
||||
} else if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
|
||||
log.Warn("failed to get data distribution",
|
||||
zap.Any("errorCode", resp.GetStatus().GetErrorCode()),
|
||||
zap.Any("reason", resp.GetStatus().GetReason()))
|
||||
}
|
||||
}
|
||||
|
||||
func (dh *distHandler) handleDistResp(resp *querypb.GetDataDistributionResponse) {
|
||||
node := dh.nodeManager.Get(resp.GetNodeID())
|
||||
if node != nil {
|
||||
|
@ -217,27 +197,26 @@ func (dh *distHandler) updateLeaderView(resp *querypb.GetDataDistributionRespons
|
|||
dh.dist.LeaderViewManager.Update(resp.GetNodeID(), updates...)
|
||||
}
|
||||
|
||||
func (dh *distHandler) getDistribution(ctx context.Context, fn func(isFail bool)) {
|
||||
func (dh *distHandler) getDistribution(ctx context.Context) error {
|
||||
dh.mu.Lock()
|
||||
defer dh.mu.Unlock()
|
||||
cctx, cancel := context.WithTimeout(ctx, distReqTimeout)
|
||||
resp, err := dh.client.GetDataDistribution(cctx, dh.nodeID, &querypb.GetDataDistributionRequest{
|
||||
ctx, cancel := context.WithTimeout(ctx, distReqTimeout)
|
||||
defer cancel()
|
||||
resp, err := dh.client.GetDataDistribution(ctx, dh.nodeID, &querypb.GetDataDistributionRequest{
|
||||
Base: commonpbutil.NewMsgBase(
|
||||
commonpbutil.WithMsgType(commonpb.MsgType_GetDistribution),
|
||||
),
|
||||
})
|
||||
cancel()
|
||||
|
||||
isFail := err != nil || resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success
|
||||
if isFail {
|
||||
dh.logFailureInfo(resp, err)
|
||||
} else {
|
||||
dh.handleDistResp(resp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !merr.Ok(resp.GetStatus()) {
|
||||
return merr.Error(resp.GetStatus())
|
||||
}
|
||||
|
||||
if fn != nil {
|
||||
fn(isFail)
|
||||
}
|
||||
dh.handleDistResp(resp)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dh *distHandler) stop() {
|
||||
|
|
|
@ -267,8 +267,7 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(collectionID int64) bool {
|
|||
}
|
||||
|
||||
func (ob *TargetObserver) updateCurrentTarget(collectionID int64) {
|
||||
log.Warn("observer trigger update current target",
|
||||
zap.Int64("collectionID", collectionID))
|
||||
log.Info("observer trigger update current target", zap.Int64("collectionID", collectionID))
|
||||
ob.targetMgr.UpdateCollectionCurrentTarget(collectionID)
|
||||
|
||||
ob.mut.Lock()
|
||||
|
|
Loading…
Reference in New Issue