Add ignore zero segment config (#23047)

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/23052/head
XuanYang-cn 2023-03-28 11:54:54 +08:00 committed by GitHub
parent 677d456092
commit 3484851511
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 57 additions and 0 deletions

View File

@ -429,6 +429,8 @@ common:
ttl: 60 # ttl value when session granting a lease to register service
retryTimes: 30 # retry times when session sending etcd requests
ignoreSegment: -1 # The segmentID to ignore
# QuotaConfig, configurations of Milvus quota and limits.
# By default, we enable:
# 1. TT protection;

View File

@ -75,6 +75,11 @@ func (h *ServerHandler) GetDataVChanPositions(channel *channel, partitionID Uniq
continue
}
if Params.CommonCfg.IgnoreSegment >= 0 && s.GetID() == Params.CommonCfg.IgnoreSegment {
// Skip the ignored segment
continue
}
if s.GetState() == commonpb.SegmentState_Dropped {
droppedIDs.Insert(s.GetID())
continue
@ -128,6 +133,10 @@ func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionID Uni
// Skip bulk insert segments.
continue
}
if Params.CommonCfg.IgnoreSegment >= 0 && s.GetID() == Params.CommonCfg.IgnoreSegment {
// Skip the ignored segment
continue
}
segmentInfos[s.GetID()] = s
if s.GetState() == commonpb.SegmentState_Dropped {
droppedIDs.Insert(s.GetID())

View File

@ -18,6 +18,7 @@ package datanode
import (
"context"
"encoding/base64"
"fmt"
"reflect"
"sync/atomic"
@ -197,6 +198,38 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
continue
}
// Skip the ignored segment and log the checkpoints
if Params.CommonCfg.IgnoreSegment >= 0 && imsg.GetSegmentID() == Params.CommonCfg.IgnoreSegment {
if len(msMsg.StartPositions()) <= 0 || len(msMsg.EndPositions()) <= 0 {
log.Warn("Empty startpositions and endpositions",
zap.Int64("filter segment ID", imsg.GetSegmentID()),
zap.String("current vChannel", ddn.vChannelName),
)
continue
}
var (
startPos = msMsg.StartPositions()[0]
endPos = msMsg.EndPositions()[0]
cp = &datapb.CheckPoint{
SegmentID: imsg.GetSegmentID(),
Position: startPos,
}
base64StartPosMsgID = base64.StdEncoding.EncodeToString(startPos.GetMsgID())
base64EndPosMsgID = base64.StdEncoding.EncodeToString(endPos.GetMsgID())
)
log.Info("Filter ignored segment",
zap.Int64("filter segment ID", imsg.GetSegmentID()),
zap.String("current vChannel", ddn.vChannelName),
zap.Any("cp", cp),
zap.Any("start position", startPos),
zap.Any("end position", endPos),
zap.String("start position msgID", base64StartPosMsgID),
zap.String("end position msgID", base64EndPosMsgID),
)
continue
}
rateCol.Add(metricsinfo.InsertConsumeThroughput, float64(proto.Size(&imsg.InsertRequest)))
metrics.DataNodeConsumeBytesCount.

View File

@ -203,6 +203,11 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
}
}
if Params.CommonCfg.IgnoreSegment >= 0 && msg.GetSegmentID() == Params.CommonCfg.IgnoreSegment {
// filter out msg when IgnoreSegment equals msg's segmentID
return nil, nil
}
// Check if the segment is in excluded segments,
// messages after seekPosition may contain the redundant data from flushed slice of segment,
// so we need to compare the endTimestamp of received messages and position's timestamp.

View File

@ -170,6 +170,8 @@ type commonConfig struct {
SessionTTL int64
SessionRetryTimes int64
IgnoreSegment int64
}
func (p *commonConfig) init(base *BaseTable) {
@ -223,6 +225,12 @@ func (p *commonConfig) init(base *BaseTable) {
p.initSessionTTL()
p.initSessionRetryTimes()
p.initIgnoreSegment()
}
func (p *commonConfig) initIgnoreSegment() {
p.IgnoreSegment = p.Base.ParseInt64WithDefault("common.ignoreSegment", -1)
}
func (p *commonConfig) initClusterPrefix() {