From 51eb12a03420785b2e2b042874a41697da55749f Mon Sep 17 00:00:00 2001 From: yah01 Date: Thu, 29 Sep 2022 18:02:54 +0800 Subject: [PATCH] Fix QueryNode panics (#19541) When delete messages were applied while a segment was being compacted, the QueryNode would panic. Signed-off-by: yah01 Signed-off-by: yah01 --- internal/querycoordv2/server.go | 10 ++--- internal/querynode/flow_graph_insert_node.go | 37 ++++++++++++++---- .../querynode/flow_graph_insert_node_test.go | 38 +++++-------------- internal/querynode/meta_replica.go | 19 ++++++++-- internal/querynode/shard_cluster.go | 2 +- internal/querynode/task.go | 7 ++++ internal/querynode/task_test.go | 2 +- 7 files changed, 69 insertions(+), 46 deletions(-) diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index f902e367d5..feea6695fc 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -618,13 +618,13 @@ func (s *Server) checkReplicas() { toRemove = append(toRemove, node) } } - log := log.With( - zap.Int64("replicaID", replica.GetID()), - zap.Int64s("offlineNodes", toRemove), - ) - log.Debug("some nodes are offline, remove them from replica") if len(toRemove) > 0 { + log := log.With( + zap.Int64("replicaID", replica.GetID()), + zap.Int64s("offlineNodes", toRemove), + ) + log.Debug("some nodes are offline, remove them from replica") replica.RemoveNode(toRemove...) 
err := s.meta.ReplicaManager.Put(replica) if err != nil { diff --git a/internal/querynode/flow_graph_insert_node.go b/internal/querynode/flow_graph_insert_node.go index 84679ab48e..815685d5e1 100644 --- a/internal/querynode/flow_graph_insert_node.go +++ b/internal/querynode/flow_graph_insert_node.go @@ -19,6 +19,7 @@ package querynode import ( "bytes" "encoding/binary" + "errors" "fmt" "io" "reflect" @@ -203,7 +204,8 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { segmentID := segmentID wg.Add(1) go func() { - err := iNode.insert(&iData, segmentID, &wg) + defer wg.Done() + err := iNode.insert(&iData, segmentID) if err != nil { // error occurs when segment cannot be found or cgo function `Insert` failed err = fmt.Errorf("segment insert failed, segmentID = %d, err = %s", segmentID, err) @@ -241,7 +243,14 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { for segmentID, pks := range delData.deleteIDs { segment, err := iNode.metaReplica.getSegmentByID(segmentID, segmentTypeGrowing) if err != nil { - // error occurs when segment cannot be found, should not happen + if errors.Is(err, ErrSegmentNotFound) { + log.Warn("segment not found when do preDelete, it may have been released due to compaction", + zap.Int64("segmentID", segmentID), + zap.Error(err), + ) + continue + } + err = fmt.Errorf("insertNode getSegmentByID failed, err = %s", err) log.Error(err.Error()) panic(err) @@ -255,7 +264,8 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { segmentID := segmentID wg.Add(1) go func() { - err := iNode.delete(delData, segmentID, &wg) + defer wg.Done() + err := iNode.delete(delData, segmentID) if err != nil { // error occurs when segment cannot be found, calling cgo function delete failed and etc... 
err = fmt.Errorf("segment delete failed, segmentID = %d, err = %s", segmentID, err) @@ -301,6 +311,13 @@ func processDeleteMessages(replica ReplicaInterface, segType segmentType, msg *m for _, segmentID := range resultSegmentIDs { segment, err := replica.getSegmentByID(segmentID, segType) if err != nil { + if errors.Is(err, ErrSegmentNotFound) { + log.Warn("segment not found when process delete messages, it may have been released due to compaction", + zap.Int64("segmentID", segmentID), + zap.Error(err), + ) + continue + } return err } pks, tss, err := filterSegmentsByPKs(primaryKeys, msg.Timestamps, segment) @@ -346,9 +363,7 @@ func filterSegmentsByPKs(pks []primaryKey, timestamps []Timestamp, segment *Segm } // insert would execute insert operations for specific growing segment -func (iNode *insertNode) insert(iData *insertData, segmentID UniqueID, wg *sync.WaitGroup) error { - defer wg.Done() - +func (iNode *insertNode) insert(iData *insertData, segmentID UniqueID) error { var targetSegment, err = iNode.metaReplica.getSegmentByID(segmentID, segmentTypeGrowing) if err != nil { return fmt.Errorf("getSegmentByID failed, err = %s", err) @@ -372,10 +387,16 @@ func (iNode *insertNode) insert(iData *insertData, segmentID UniqueID, wg *sync. 
} // delete would execute delete operations for specific growing segment -func (iNode *insertNode) delete(deleteData *deleteData, segmentID UniqueID, wg *sync.WaitGroup) error { - defer wg.Done() +func (iNode *insertNode) delete(deleteData *deleteData, segmentID UniqueID) error { targetSegment, err := iNode.metaReplica.getSegmentByID(segmentID, segmentTypeGrowing) if err != nil { + if errors.Is(err, ErrSegmentNotFound) { + log.Warn("segment not found when applying delete message, it may have been released due to compaction", + zap.Int64("segmentID", segmentID), + zap.Error(err), + ) + return nil + } return fmt.Errorf("getSegmentByID failed, err = %s", err) } diff --git a/internal/querynode/flow_graph_insert_node_test.go b/internal/querynode/flow_graph_insert_node_test.go index dc3efcf57b..840f3985af 100644 --- a/internal/querynode/flow_graph_insert_node_test.go +++ b/internal/querynode/flow_graph_insert_node_test.go @@ -18,7 +18,6 @@ package querynode import ( "fmt" - "sync" "testing" "github.com/milvus-io/milvus/internal/proto/internalpb" @@ -106,9 +105,7 @@ func TestFlowGraphInsertNode_insert(t *testing.T) { insertData, err := genFlowGraphInsertData(schema, defaultMsgLength) assert.NoError(t, err) - wg := &sync.WaitGroup{} - wg.Add(1) - err = insertNode.insert(insertData, defaultSegmentID, wg) + err = insertNode.insert(insertData, defaultSegmentID) assert.NoError(t, err) }) @@ -119,10 +116,8 @@ func TestFlowGraphInsertNode_insert(t *testing.T) { insertData, err := genFlowGraphInsertData(schema, defaultMsgLength) assert.NoError(t, err) - wg := &sync.WaitGroup{} - wg.Add(1) insertData.insertRecords[defaultSegmentID] = insertData.insertRecords[defaultSegmentID][:len(insertData.insertRecords[defaultSegmentID])/2] - err = insertNode.insert(insertData, defaultSegmentID, wg) + err = insertNode.insert(insertData, defaultSegmentID) assert.Error(t, err) }) @@ -130,9 +125,7 @@ func TestFlowGraphInsertNode_insert(t *testing.T) { streaming, err := genSimpleReplica() 
assert.NoError(t, err) insertNode := newInsertNode(streaming, defaultCollectionID, defaultDMLChannel) - wg := &sync.WaitGroup{} - wg.Add(1) - err = insertNode.insert(nil, defaultSegmentID, wg) + err = insertNode.insert(nil, defaultSegmentID) assert.Error(t, err) }) @@ -147,9 +140,7 @@ func TestFlowGraphInsertNode_insert(t *testing.T) { assert.NoError(t, err) seg.setType(segmentTypeSealed) - wg := &sync.WaitGroup{} - wg.Add(1) - err = insertNode.insert(insertData, defaultSegmentID, wg) + err = insertNode.insert(insertData, defaultSegmentID) assert.Error(t, err) }) } @@ -164,15 +155,12 @@ func TestFlowGraphInsertNode_delete(t *testing.T) { insertData, err := genFlowGraphInsertData(schema, defaultMsgLength) assert.NoError(t, err) - wg := &sync.WaitGroup{} - wg.Add(1) - err = insertNode.insert(insertData, defaultSegmentID, wg) + err = insertNode.insert(insertData, defaultSegmentID) assert.NoError(t, err) deleteData, err := genFlowGraphDeleteData() assert.NoError(t, err) - wg.Add(1) - err = insertNode.delete(deleteData, defaultSegmentID, wg) + err = insertNode.delete(deleteData, defaultSegmentID) assert.NoError(t, err) }) @@ -182,9 +170,7 @@ func TestFlowGraphInsertNode_delete(t *testing.T) { deleteData, err := genFlowGraphDeleteData() assert.NoError(t, err) - wg := &sync.WaitGroup{} - wg.Add(1) - err = insertNode.delete(deleteData, defaultSegmentID, wg) + err = insertNode.delete(deleteData, defaultSegmentID) assert.NoError(t, err) }) @@ -194,10 +180,8 @@ func TestFlowGraphInsertNode_delete(t *testing.T) { deleteData, err := genFlowGraphDeleteData() assert.NoError(t, err) - wg := &sync.WaitGroup{} - wg.Add(1) deleteData.deleteTimestamps[defaultSegmentID] = deleteData.deleteTimestamps[defaultSegmentID][:len(deleteData.deleteTimestamps)/2] - err = insertNode.delete(deleteData, defaultSegmentID, wg) + err = insertNode.delete(deleteData, defaultSegmentID) assert.Error(t, err) }) @@ -205,10 +189,8 @@ func TestFlowGraphInsertNode_delete(t *testing.T) { streaming, err := 
genSimpleReplica() assert.NoError(t, err) insertNode := newInsertNode(streaming, defaultCollectionID, defaultDMLChannel) - wg := &sync.WaitGroup{} - wg.Add(1) - err = insertNode.delete(nil, defaultSegmentID, wg) - assert.Error(t, err) + err = insertNode.delete(nil, defaultSegmentID) + assert.NoError(t, err) }) } diff --git a/internal/querynode/meta_replica.go b/internal/querynode/meta_replica.go index 0e607cd257..71eab54679 100644 --- a/internal/querynode/meta_replica.go +++ b/internal/querynode/meta_replica.go @@ -43,6 +43,19 @@ import ( "github.com/samber/lo" ) +var ( + ErrSegmentNotFound = errors.New("SegmentNotFound") + ErrCollectionNotFound = errors.New("CollectionNotFound") +) + +func WrapSegmentNotFound(segmentID int64) error { + return fmt.Errorf("%w(%v)", ErrSegmentNotFound, segmentID) +} + +func WrapCollectionNotFound(collectionID int64) error { + return fmt.Errorf("%w(%v)", ErrCollectionNotFound, collectionID) +} + // ReplicaInterface specifies all the methods that the Collection object needs to implement in QueryNode. // In common cases, the system has multiple query nodes. The full data of a collection will be distributed // across multiple query nodes, and each query node's collectionReplica will maintain its own part. 
@@ -240,7 +253,7 @@ func (replica *metaReplica) getCollectionByID(collectionID UniqueID) (*Collectio func (replica *metaReplica) getCollectionByIDPrivate(collectionID UniqueID) (*Collection, error) { collection, ok := replica.collections[collectionID] if !ok { - return nil, fmt.Errorf("collection hasn't been loaded or has been released, collection id = %d", collectionID) + return nil, fmt.Errorf("collection hasn't been loaded or has been released %w", WrapCollectionNotFound(collectionID)) } return collection, nil @@ -685,13 +698,13 @@ func (replica *metaReplica) getSegmentByIDPrivate(segmentID UniqueID, segType se case segmentTypeGrowing: segment, ok := replica.growingSegments[segmentID] if !ok { - return nil, fmt.Errorf("cannot find growing segment %d in QueryNode", segmentID) + return nil, fmt.Errorf("growing %w", WrapSegmentNotFound(segmentID)) } return segment, nil case segmentTypeSealed: segment, ok := replica.sealedSegments[segmentID] if !ok { - return nil, fmt.Errorf("cannot find sealed segment %d in QueryNode", segmentID) + return nil, fmt.Errorf("sealed %w", WrapSegmentNotFound(segmentID)) } return segment, nil default: diff --git a/internal/querynode/shard_cluster.go b/internal/querynode/shard_cluster.go index 21bde53f0b..87fa959fce 100644 --- a/internal/querynode/shard_cluster.go +++ b/internal/querynode/shard_cluster.go @@ -167,8 +167,8 @@ func NewShardCluster(collectionID int64, replicaID int64, vchannelName string, replicaID: replicaID, vchannelName: vchannelName, - nodeDetector: nodeDetector, segmentDetector: segmentDetector, + nodeDetector: nodeDetector, nodeBuilder: nodeBuilder, nodes: make(map[int64]*shardNode), diff --git a/internal/querynode/task.go b/internal/querynode/task.go index 457c590cc0..1149b044eb 100644 --- a/internal/querynode/task.go +++ b/internal/querynode/task.go @@ -599,6 +599,13 @@ func (r *releaseCollectionTask) Execute(ctx context.Context) error { collection, err := r.node.metaReplica.getCollectionByID(r.req.CollectionID) if 
err != nil { + if errors.Is(err, ErrCollectionNotFound) { + log.Info("collection has been released", + zap.Int64("collectionID", r.req.GetCollectionID()), + zap.Error(err), + ) + return nil + } return err } // set release time diff --git a/internal/querynode/task_test.go b/internal/querynode/task_test.go index 9301a6dd20..4714c661dd 100644 --- a/internal/querynode/task_test.go +++ b/internal/querynode/task_test.go @@ -719,7 +719,7 @@ func TestTask_releaseCollectionTask(t *testing.T) { node: node, } err = task.Execute(ctx) - assert.Error(t, err) + assert.NoError(t, err) }) t.Run("test execute remove deltaVChannel tSafe", func(t *testing.T) {