mirror of https://github.com/milvus-io/milvus.git
Check ignorable error to prevent unnecessary panic (#17317)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>pull/17336/head
parent
f5bd519e49
commit
e0cbacba59
|
@ -0,0 +1,36 @@
|
|||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package common
|
||||
|
||||
type IgnorableError struct {
|
||||
msg string
|
||||
}
|
||||
|
||||
func (i *IgnorableError) Error() string {
|
||||
return i.msg
|
||||
}
|
||||
|
||||
func NewIgnorableError(err error) error {
|
||||
return &IgnorableError{
|
||||
msg: err.Error(),
|
||||
}
|
||||
}
|
||||
|
||||
func IsIgnorableError(err error) bool {
|
||||
_, ok := err.(*IgnorableError)
|
||||
return ok
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package common
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestIgnorableError(t *testing.T) {
|
||||
err := fmt.Errorf("test err")
|
||||
iErr := NewIgnorableError(err)
|
||||
assert.True(t, IsIgnorableError(iErr))
|
||||
assert.False(t, IsIgnorableError(err))
|
||||
}
|
|
@ -26,6 +26,7 @@ import (
|
|||
"github.com/opentracing/opentracing-go"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/common"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/metrics"
|
||||
"github.com/milvus-io/milvus/internal/mq/msgstream"
|
||||
|
@ -177,7 +178,9 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
|
|||
if err != nil {
|
||||
err = fmt.Errorf("DDNode forward delete msg failed, vChannel = %s, err = %s", ddn.vchannelName, err)
|
||||
log.Error(err.Error())
|
||||
panic(err)
|
||||
if !common.IsIgnorableError(err) {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
fgMsg.startPositions = append(fgMsg.startPositions, msMsg.StartPositions()...)
|
||||
|
|
|
@ -2,14 +2,15 @@ package kafka
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/confluentinc/confluent-kafka-go/kafka"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/confluentinc/confluent-kafka-go/kafka"
|
||||
"github.com/milvus-io/milvus/internal/common"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
|
||||
)
|
||||
|
||||
|
@ -37,7 +38,7 @@ func (kp *kafkaProducer) Send(ctx context.Context, message *mqwrapper.ProducerMe
|
|||
e, ok := <-kp.deliveryChan
|
||||
if !ok {
|
||||
log.Error("kafka produce message fail because of delivery chan is closed", zap.String("topic", kp.topic))
|
||||
return nil, errors.New("delivery chan of kafka producer is closed")
|
||||
return nil, common.NewIgnorableError(fmt.Errorf("delivery chan of kafka producer is closed"))
|
||||
}
|
||||
|
||||
m := e.(*kafka.Message)
|
||||
|
|
Loading…
Reference in New Issue