mirror of https://github.com/milvus-io/milvus.git
Fix the problem of stuck after loadBalance and loadFieldData error (#5960)
* delete nodeInfo after nodeDown
  Signed-off-by: xige-16 <xi.ge@zilliz.com>
* fix load balance can't stop
  Signed-off-by: xige-16 <xi.ge@zilliz.com>
* fix load field data error
  Signed-off-by: xige-16 <xi.ge@zilliz.com>
* continue task loop after error in queryService
  Signed-off-by: xige-16 <xi.ge@zilliz.com>
parent 5cdc1af1af
commit 7039fb7c82
@@ -17,6 +17,7 @@ import (
 	"time"
 
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/connectivity"
 
 	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
 	grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"

@@ -105,6 +106,9 @@ func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error)
 	if err == nil {
 		return ret, nil
 	}
+	if c.conn.GetState() == connectivity.Shutdown {
+		return ret, err
+	}
 	for i := 0; i < c.reconnTry; i++ {
 		err = c.connect()
 		if err == nil {
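
The hunk above makes the client's retry wrapper give up as soon as the underlying connection reports connectivity.Shutdown, so calls against a node whose client has been stopped fail fast instead of looping on reconnect attempts. A minimal standalone sketch of that pattern in Go (conn, reconnTry, connect, and caller are stand-ins for the client's actual fields and methods):

package recallsketch

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
)

// recall sketches the retry wrapper: it retries caller() after a failure,
// but bails out immediately once the connection has been shut down, so the
// caller is not stuck reconnecting to a node that was removed.
func recall(conn *grpc.ClientConn, reconnTry int, connect func() error,
	caller func() (interface{}, error)) (interface{}, error) {

	ret, err := caller()
	if err == nil {
		return ret, nil
	}
	// A Shutdown connection never recovers; stop retrying.
	if conn.GetState() == connectivity.Shutdown {
		return ret, err
	}
	for i := 0; i < reconnTry; i++ {
		if err = connect(); err == nil {
			break
		}
	}
	if err != nil {
		return ret, err
	}
	return caller()
}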

@@ -84,6 +84,7 @@ type ReplicaInterface interface {
 	replaceGrowingSegmentBySealedSegment(segment *Segment) error
 
 	freeAll()
+	printReplica()
 }
 
 type collectionReplica struct {

@@ -99,6 +100,16 @@ type collectionReplica struct {
 	etcdKV *etcdkv.EtcdKV
 }
 
+func (colReplica *collectionReplica) printReplica() {
+	colReplica.mu.Lock()
+	defer colReplica.mu.Unlock()
+
+	log.Debug("collections in collectionReplica", zap.Any("info", colReplica.collections))
+	log.Debug("partitions in collectionReplica", zap.Any("info", colReplica.partitions))
+	log.Debug("segments in collectionReplica", zap.Any("info", colReplica.segments))
+	log.Debug("excludedSegments in collectionReplica", zap.Any("info", colReplica.excludedSegments))
+}
+
 //----------------------------------------------------------------------------------------------------- collection
 func (colReplica *collectionReplica) getCollectionIDs() []UniqueID {
 	colReplica.mu.RLock()

@@ -374,6 +374,7 @@ func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *queryPb.GetSegmen
 		return info
 	}
 	// get info from historical
+	node.historical.replica.printReplica()
 	for _, id := range in.SegmentIDs {
 		log.Debug("QueryNode::Impl::GetSegmentInfo for historical", zap.Any("SegmentID", id))
 		segment, err := node.historical.replica.getSegmentByID(id)

@@ -387,8 +388,10 @@ func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *queryPb.GetSegmen
 		infos = append(infos, info)
 	}
 	// get info from streaming
+	node.streaming.replica.printReplica()
 	for _, id := range in.SegmentIDs {
 		log.Debug("QueryNode::Impl::GetSegmentInfo for streaming", zap.Any("SegmentID", id))
+
 		segment, err := node.streaming.replica.getSegmentByID(id)
 		if err != nil {
 			log.Debug("QueryNode::Impl::GetSegmentInfo, for streaming segmentID not exist", zap.Any("SegmentID", id))

@@ -234,19 +234,18 @@ func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, binlogPaths
 			zap.Any("fieldID", fieldID),
 			zap.String("paths", fmt.Sprintln(paths)),
 		)
-		blob := &storage.Blob{
-			Key:   strconv.FormatInt(fieldID, 10),
-			Value: make([]byte, 0),
-		}
 		for _, path := range paths {
 			binLog, err := loader.minioKV.Load(path)
 			if err != nil {
 				// TODO: return or continue?
 				return err
 			}
-			blob.Value = append(blob.Value, []byte(binLog)...)
+			blob := &storage.Blob{
+				Key:   strconv.FormatInt(fieldID, 10),
+				Value: []byte(binLog),
+			}
+			blobs = append(blobs, blob)
 		}
-		blobs = append(blobs, blob)
 	}
 
 	_, _, insertData, err := iCodec.Deserialize(blobs)
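
This is the loadFieldData fix from the commit message: the old loop kept a single blob per field and appended the raw bytes of every binlog file into its Value, which presumably broke deserialization once a field had more than one binlog file; the new loop emits one blob per binlog path, each keyed by the field ID. A self-contained sketch of the corrected shape (Blob and loadFunc are local stand-ins for storage.Blob and loader.minioKV.Load):

package loadsketch

import "strconv"

// Blob mirrors the Key/Value shape of storage.Blob for this sketch.
type Blob struct {
	Key   string
	Value []byte
}

// loadFunc stands in for loader.minioKV.Load.
type loadFunc func(path string) (string, error)

// collectFieldBlobs follows the corrected loop above: every binlog path
// becomes its own Blob keyed by the field ID, instead of all paths being
// concatenated into a single Blob per field.
func collectFieldBlobs(load loadFunc, fieldPaths map[int64][]string) ([]*Blob, error) {
	blobs := make([]*Blob, 0)
	for fieldID, paths := range fieldPaths {
		for _, path := range paths {
			binLog, err := load(path)
			if err != nil {
				return nil, err
			}
			blobs = append(blobs, &Blob{
				Key:   strconv.FormatInt(fieldID, 10),
				Value: []byte(binLog),
			})
		}
	}
	return blobs, nil
}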

@@ -406,8 +406,23 @@ func (c *queryNodeCluster) RegisterNode(session *sessionutil.Session, id UniqueI
 }
 
 func (c *queryNodeCluster) removeNodeInfo(nodeID int64) error {
 	c.Lock()
 	defer c.Unlock()
 
 	key := fmt.Sprintf("%s/%d", queryNodeInfoPrefix, nodeID)
-	return c.client.Remove(key)
+	err := c.client.Remove(key)
+	if err != nil {
+		return err
+	}
+
+	err = c.nodes[nodeID].clearNodeInfo()
+	if err != nil {
+		return err
+	}
+	delete(c.nodes, nodeID)
+	log.Debug("delete nodeInfo in cluster meta and etcd", zap.Int64("nodeID", nodeID))
+
+	return nil
 }
 
 func (c *queryNodeCluster) onServiceNodeIDs() ([]int64, error) {

@@ -310,6 +310,17 @@ func (qn *queryNode) removeCollectionInfo(collectionID UniqueID) error {
 	return qn.kvClient.Remove(key)
 }
 
+func (qn *queryNode) clearNodeInfo() error {
+	for collectionID := range qn.collectionInfos {
+		err := qn.removeCollectionInfo(collectionID)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
 func (qn *queryNode) setNodeState(onService bool) {
 	qn.Lock()
 	defer qn.Unlock()

@@ -194,6 +194,7 @@ func (qs *QueryService) watchNodeLoop() {
 	for nodeID := range qs.cluster.nodes {
 		if _, ok := sessionMap[nodeID]; !ok {
 			qs.cluster.nodes[nodeID].setNodeState(false)
+			qs.cluster.nodes[nodeID].client.Stop()
 			loadBalanceSegment := &querypb.LoadBalanceRequest{
 				Base: &commonpb.MsgBase{
 					MsgType: commonpb.MsgType_LoadBalanceSegments,

@@ -236,6 +237,7 @@ func (qs *QueryService) watchNodeLoop() {
 			serverID := event.Session.ServerID
 			log.Debug("QueryService", zap.Any("The QueryNode crashed with ID", serverID))
 			qs.cluster.nodes[serverID].setNodeState(false)
+			qs.cluster.nodes[serverID].client.Stop()
 			loadBalanceSegment := &querypb.LoadBalanceRequest{
 				Base: &commonpb.MsgBase{
 					MsgType: commonpb.MsgType_LoadBalanceSegments,

@@ -1273,6 +1273,12 @@ func (lbt *LoadBalanceTask) Execute(ctx context.Context) error {
 }
 
 func (lbt *LoadBalanceTask) PostExecute(ctx context.Context) error {
+	for _, id := range lbt.SourceNodeIDs {
+		err := lbt.cluster.removeNodeInfo(id)
+		if err != nil {
+			log.Error("LoadBalanceTask: remove mode info error", zap.Int64("nodeID", id))
+		}
+	}
 	log.Debug("LoadBalanceTask postExecute done",
 		zap.Int64s("sourceNodeIDs", lbt.SourceNodeIDs),
 		zap.Any("balanceReason", lbt.BalanceReason),

@@ -504,8 +504,9 @@ func (scheduler *TaskScheduler) processTask(t task) error {
 func (scheduler *TaskScheduler) scheduleLoop() {
 	defer scheduler.wg.Done()
 	activeTaskWg := &sync.WaitGroup{}
-	var err error = nil
+
 	for {
+		var err error = nil
 		select {
 		case <-scheduler.ctx.Done():
 			return

@@ -516,6 +517,7 @@ func (scheduler *TaskScheduler) scheduleLoop() {
 			err = scheduler.processTask(t)
 			if err != nil {
 				log.Error("scheduleLoop: process task error", zap.Any("error", err.Error()))
 				t.Notify(err)
+				continue
 			}
 			if t.Type() == commonpb.MsgType_LoadCollection || t.Type() == commonpb.MsgType_LoadPartitions {

@@ -532,9 +534,9 @@ func (scheduler *TaskScheduler) scheduleLoop() {
 				}
 			}
 			activeTaskWg.Wait()
-			if t.Type() == commonpb.MsgType_ReleaseCollection || t.Type() == commonpb.MsgType_ReleasePartitions {
-				t.Notify(err)
-			}
+			//if t.Type() == commonpb.MsgType_ReleaseCollection || t.Type() == commonpb.MsgType_ReleasePartitions {
+			//	t.Notify(err)
+			//}
 			keys := make([]string, 0)
 			taskKey := fmt.Sprintf("%s/%d", triggerTaskPrefix, t.ID())
 			stateKey := fmt.Sprintf("%s/%d", taskInfoPrefix, t.ID())

@@ -543,8 +545,11 @@ func (scheduler *TaskScheduler) scheduleLoop() {
 			err = scheduler.client.MultiRemove(keys)
 			if err != nil {
 				log.Error("scheduleLoop: error when remove trigger task to etcd", zap.Int64("taskID", t.ID()))
+				t.Notify(err)
+				continue
 			}
 			log.Debug("scheduleLoop: trigger task done and delete from etcd", zap.Int64("taskID", t.ID()))
+			t.Notify(err)
 		}
 	}
 }
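
Taken together, the scheduler hunks follow one pattern: err is now scoped to a single loop iteration, and every failure path notifies the waiting task before continuing, so a failed trigger task no longer leaves its caller blocked on Notify or stalls later tasks. A minimal standalone sketch of that control flow in Go (the task interface, tasks channel, and the process and cleanup callbacks are stand-ins for the scheduler's real types):

package schedsketch

import "context"

// task is a stand-in for the scheduler's task interface.
type task interface {
	Notify(err error)
}

// scheduleLoop sketches the fixed control flow: err is scoped to one
// iteration, and every error path notifies the task before continuing,
// so callers waiting on Notify are never left stuck.
func scheduleLoop(ctx context.Context, tasks <-chan task,
	process func(task) error, cleanup func(task) error) {

	for {
		var err error
		select {
		case <-ctx.Done():
			return
		case t := <-tasks:
			if err = process(t); err != nil {
				t.Notify(err) // unblock the caller even on failure
				continue      // keep the loop alive for later tasks
			}
			if err = cleanup(t); err != nil {
				t.Notify(err)
				continue
			}
			t.Notify(nil) // success: release the caller
		}
	}
}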