enhance: skip orphan channel cp meta when checking cp lag (#34555)

issue: #34545

Print a warning log instead of failing the health check when orphan channel
checkpoint (cp) meta is found during a health check request.

Signed-off-by: jaime <yun.zhang@zilliz.com>
pull/34537/head^2
jaime 2024-07-11 09:36:56 +08:00 committed by GitHub
parent aef7664fc6
commit c332f69dec
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 49 additions and 4 deletions

View File

@ -3158,8 +3158,8 @@ func Test_CheckHealth(t *testing.T) {
}
collections := map[UniqueID]*collectionInfo{
1: {
ID: 1,
449684528748778322: {
ID: 449684528748778322,
VChannelNames: []string{"ch1", "ch2"},
},
2: nil,
@ -3208,7 +3208,7 @@ func Test_CheckHealth(t *testing.T) {
collections: collections,
channelCPs: &channelCPs{
checkpoints: map[string]*msgpb.MsgPosition{
"ch1": {
"cluster-id-rootcoord-dm_3_449684528748778322v0": {
Timestamp: tsoutil.ComposeTSByTime(time.Now().Add(-1000*time.Hour), 0),
MsgID: []byte{1, 2, 3, 4},
},
@ -3232,7 +3232,15 @@ func Test_CheckHealth(t *testing.T) {
collections: collections,
channelCPs: &channelCPs{
checkpoints: map[string]*msgpb.MsgPosition{
"ch1": {
"cluster-id-rootcoord-dm_3_449684528748778322v0": {
Timestamp: tsoutil.ComposeTSByTime(time.Now(), 0),
MsgID: []byte{1, 2, 3, 4},
},
"cluster-id-rootcoord-dm_3_449684528748778323v0": {
Timestamp: tsoutil.ComposeTSByTime(time.Now(), 0),
MsgID: []byte{1, 2, 3, 4},
},
"invalid-vchannel-name": {
Timestamp: tsoutil.ComposeTSByTime(time.Now(), 0),
MsgID: []byte{1, 2, 3, 4},
},

View File

@ -32,6 +32,7 @@ import (
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@ -279,6 +280,15 @@ func getBinLogIDs(segment *SegmentInfo, fieldID int64) []int64 {
func CheckCheckPointsHealth(meta *meta) error {
for channel, cp := range meta.GetChannelCheckpoints() {
collectionID := funcutil.GetCollectionIDFromVChannel(channel)
if collectionID == -1 {
log.Warn("can't parse collection id from vchannel, skip check cp lag", zap.String("vchannel", channel))
continue
}
if meta.GetCollection(collectionID) == nil {
log.Warn("corresponding the collection doesn't exists, skip check cp lag", zap.String("vchannel", channel))
continue
}
ts, _ := tsoutil.ParseTS(cp.Timestamp)
lag := time.Since(ts)
if lag > paramtable.Get().DataCoordCfg.ChannelCheckpointMaxLag.GetAsDuration(time.Second) {

View File

@ -24,6 +24,7 @@ import (
"fmt"
"net"
"reflect"
"regexp"
"strconv"
"strings"
"time"
@ -237,6 +238,18 @@ func ConvertChannelName(chanName string, tokenFrom string, tokenTo string) (stri
return strings.Replace(chanName, tokenFrom, tokenTo, 1), nil
}
// vChannelCollectionIDRegexp captures the decimal collection ID embedded in a
// virtual channel name of the form "<prefix>_<collectionID>v<index>".
// It is compiled once at package scope so callers do not pay the compilation
// cost on every lookup.
var vChannelCollectionIDRegexp = regexp.MustCompile(`.*_(\d+)v\d+`)

// GetCollectionIDFromVChannel extracts the collection ID from a virtual
// channel name such as "06b84fe16780ed1-rootcoord-dm_3_449684528748778322v0".
//
// It returns -1 when the name does not contain a "_<digits>v<digits>" segment
// or when the captured digits do not fit into an int64.
func GetCollectionIDFromVChannel(vChannelName string) int64 {
	matches := vChannelCollectionIDRegexp.FindStringSubmatch(vChannelName)
	if len(matches) < 2 {
		return -1
	}
	// The capture group only contains decimal digits, so parse explicitly in
	// base 10. Base 0 would treat a leading "0" as an octal prefix and either
	// return the wrong ID ("010" -> 8) or reject valid digits ("09").
	id, err := strconv.ParseInt(matches[1], 10, 64)
	if err != nil {
		return -1
	}
	return id
}
func getNumRowsOfScalarField(datas interface{}) uint64 {
realTypeDatas := reflect.ValueOf(datas)
return uint64(realTypeDatas.Len())

View File

@ -142,6 +142,20 @@ func TestGetAttrByKeyFromRepeatedKV(t *testing.T) {
assert.Error(t, err)
}
// TestGetCollectionIDFromVChannel checks that the collection ID is extracted
// from a well-formed vchannel name and that -1 is returned for names whose
// collection ID segment is missing or is not a plain run of digits.
func TestGetCollectionIDFromVChannel(t *testing.T) {
	cases := []struct {
		vChannel string
		expected int64
	}{
		// Well-formed name: the ID sits between the last "_" and "v<index>".
		{vChannel: "06b84fe16780ed1-rootcoord-dm_3_449684528748778322v0", expected: 449684528748778322},
		// Collection ID segment is empty.
		{vChannel: "06b84fe16780ed1-rootcoord-dm_3_v0", expected: -1},
		// Collection ID segment carries a sign, so it is not all digits.
		{vChannel: "06b84fe16780ed1-rootcoord-dm_3_-1v0", expected: -1},
	}

	for _, tc := range cases {
		got := GetCollectionIDFromVChannel(tc.vChannel)
		assert.Equal(t, tc.expected, got)
	}
}
func TestCheckCtxValid(t *testing.T) {
bgCtx := context.Background()
timeout := 20 * time.Millisecond