mirror of https://github.com/milvus-io/milvus.git
Fix QueryNode may panics when stopped (#21406)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/21415/head
parent
d16b7c3c2d
commit
c5ff8bf7c8
|
@ -18,6 +18,7 @@ package querynode
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
|
@ -177,6 +178,11 @@ func (nd *etcdShardNodeDetector) handlePutEvent(e *clientv3.Event, collectionID,
|
|||
|
||||
idAddr, err := nd.idAddr()
|
||||
if err != nil {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
// session canceled, query node is stopping.
|
||||
log.Warn("EtcdNodeDetector id resolve failed", zap.Error(err))
|
||||
return
|
||||
}
|
||||
log.Error("Etcd NodeDetector session map failed", zap.Error(err))
|
||||
panic(err)
|
||||
}
|
||||
|
@ -253,6 +259,11 @@ func (nd *etcdShardNodeDetector) handleDelEvent(e *clientv3.Event, collectionID,
|
|||
}
|
||||
idAddr, err := nd.idAddr()
|
||||
if err != nil {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
// session canceled, query node is stopping.
|
||||
log.Warn("EtcdNodeDetector id resolve failed", zap.Error(err))
|
||||
return
|
||||
}
|
||||
log.Error("Etcd NodeDetector session map failed", zap.Error(err))
|
||||
panic(err)
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ package querynode
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"path"
|
||||
"strconv"
|
||||
|
@ -31,6 +32,8 @@ import (
|
|||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v3client"
|
||||
)
|
||||
|
||||
|
@ -324,3 +327,81 @@ func TestEtcdShardNodeDetector_watch(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestNodeDetectorHandleWithError(t *testing.T) {
|
||||
t.Run("unexpected error type", func(t *testing.T) {
|
||||
collectionID := int64(1)
|
||||
replicaID := int64(1001)
|
||||
nd := &etcdShardNodeDetector{
|
||||
idAddr: func() (map[int64]string, error) {
|
||||
return nil, errors.New("unexpected error")
|
||||
},
|
||||
}
|
||||
|
||||
replica := &querypb.Replica{
|
||||
ID: replicaID,
|
||||
CollectionID: collectionID,
|
||||
}
|
||||
bs, err := proto.Marshal(replica)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Panics(t, func() {
|
||||
nd.handlePutEvent(&clientv3.Event{
|
||||
Type: mvccpb.PUT,
|
||||
Kv: &mvccpb.KeyValue{
|
||||
Value: bs,
|
||||
},
|
||||
}, collectionID, replicaID)
|
||||
})
|
||||
|
||||
assert.Panics(t, func() {
|
||||
nd.handleDelEvent(&clientv3.Event{
|
||||
Type: mvccpb.DELETE,
|
||||
Kv: &mvccpb.KeyValue{
|
||||
Value: bs,
|
||||
},
|
||||
PrevKv: &mvccpb.KeyValue{
|
||||
Value: bs,
|
||||
},
|
||||
}, collectionID, replicaID)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("context canceled", func(t *testing.T) {
|
||||
collectionID := int64(1)
|
||||
replicaID := int64(1001)
|
||||
nd := &etcdShardNodeDetector{
|
||||
idAddr: func() (map[int64]string, error) {
|
||||
return nil, context.Canceled
|
||||
},
|
||||
}
|
||||
|
||||
replica := &querypb.Replica{
|
||||
ID: replicaID,
|
||||
CollectionID: collectionID,
|
||||
}
|
||||
bs, err := proto.Marshal(replica)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.NotPanics(t, func() {
|
||||
nd.handlePutEvent(&clientv3.Event{
|
||||
Type: mvccpb.PUT,
|
||||
Kv: &mvccpb.KeyValue{
|
||||
Value: bs,
|
||||
},
|
||||
}, collectionID, replicaID)
|
||||
})
|
||||
|
||||
assert.NotPanics(t, func() {
|
||||
nd.handleDelEvent(&clientv3.Event{
|
||||
Type: mvccpb.DELETE,
|
||||
Kv: &mvccpb.KeyValue{
|
||||
Value: bs,
|
||||
},
|
||||
PrevKv: &mvccpb.KeyValue{
|
||||
Value: bs,
|
||||
},
|
||||
}, collectionID, replicaID)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue