diff --git a/internal/querynode/shard_node_detector.go b/internal/querynode/shard_node_detector.go index 0eb009232a..b84b685e42 100644 --- a/internal/querynode/shard_node_detector.go +++ b/internal/querynode/shard_node_detector.go @@ -18,6 +18,7 @@ package querynode import ( "context" + "errors" "sync" "github.com/golang/protobuf/proto" @@ -177,6 +178,11 @@ func (nd *etcdShardNodeDetector) handlePutEvent(e *clientv3.Event, collectionID, idAddr, err := nd.idAddr() if err != nil { + if errors.Is(err, context.Canceled) { + // session canceled, query node is stopping. + log.Warn("EtcdNodeDetector id resolve failed", zap.Error(err)) + return + } log.Error("Etcd NodeDetector session map failed", zap.Error(err)) panic(err) } @@ -253,6 +259,11 @@ func (nd *etcdShardNodeDetector) handleDelEvent(e *clientv3.Event, collectionID, } idAddr, err := nd.idAddr() if err != nil { + if errors.Is(err, context.Canceled) { + // session canceled, query node is stopping. + log.Warn("EtcdNodeDetector id resolve failed", zap.Error(err)) + return + } log.Error("Etcd NodeDetector session map failed", zap.Error(err)) panic(err) } diff --git a/internal/querynode/shard_node_detector_test.go b/internal/querynode/shard_node_detector_test.go index 36638e034a..51aed3822d 100644 --- a/internal/querynode/shard_node_detector_test.go +++ b/internal/querynode/shard_node_detector_test.go @@ -18,6 +18,7 @@ package querynode import ( "context" + "errors" "fmt" "path" "strconv" @@ -31,6 +32,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.etcd.io/etcd/api/v3/mvccpb" + clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/server/v3/etcdserver/api/v3client" ) @@ -324,3 +327,81 @@ func TestEtcdShardNodeDetector_watch(t *testing.T) { }) } } + +func TestNodeDetectorHandleWithError(t *testing.T) { + t.Run("unexpected error type", func(t *testing.T) { + collectionID := int64(1) + replicaID := int64(1001) + nd := &etcdShardNodeDetector{ + idAddr: func() (map[int64]string, error) { + return nil, errors.New("unexpected error") + }, + } + + replica := &querypb.Replica{ + ID: replicaID, + CollectionID: collectionID, + } + bs, err := proto.Marshal(replica) + require.NoError(t, err) + + assert.Panics(t, func() { + nd.handlePutEvent(&clientv3.Event{ + Type: mvccpb.PUT, + Kv: &mvccpb.KeyValue{ + Value: bs, + }, + }, collectionID, replicaID) + }) + + assert.Panics(t, func() { + nd.handleDelEvent(&clientv3.Event{ + Type: mvccpb.DELETE, + Kv: &mvccpb.KeyValue{ + Value: bs, + }, + PrevKv: &mvccpb.KeyValue{ + Value: bs, + }, + }, collectionID, replicaID) + }) + }) + + t.Run("context canceled", func(t *testing.T) { + collectionID := int64(1) + replicaID := int64(1001) + nd := &etcdShardNodeDetector{ + idAddr: func() (map[int64]string, error) { + return nil, context.Canceled + }, + } + + replica := &querypb.Replica{ + ID: replicaID, + CollectionID: collectionID, + } + bs, err := proto.Marshal(replica) + require.NoError(t, err) + + assert.NotPanics(t, func() { + nd.handlePutEvent(&clientv3.Event{ + Type: mvccpb.PUT, + Kv: &mvccpb.KeyValue{ + Value: bs, + }, + }, collectionID, replicaID) + }) + + assert.NotPanics(t, func() { + nd.handleDelEvent(&clientv3.Event{ + Type: mvccpb.DELETE, + Kv: &mvccpb.KeyValue{ + Value: bs, + }, + PrevKv: &mvccpb.KeyValue{ + Value: bs, + }, + }, collectionID, replicaID) + }) + }) +}