fix: Set segment level for compactTo segment (#30129)

See also: #29204
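What changed, in brief: `CompactSegments` now stamps the segment it creates with `datapb.SegmentLevel_L1`, and the sync path stops threading a precomputed `metricSegLevel` string around, deriving metric labels from the task's own level instead. To illustrate why the explicit level matters, here is a minimal sketch with stand-in types (not the Milvus definitions; it only assumes, as in the datapb proto, that the enum's zero value is `Legacy`):

```go
package main

import "fmt"

// Stand-in for datapb.SegmentLevel; Legacy is assumed to be the zero value.
type SegmentLevel int32

const (
	SegmentLevel_Legacy SegmentLevel = iota // what an unset field reports
	SegmentLevel_L0
	SegmentLevel_L1
)

func (l SegmentLevel) String() string {
	return [...]string{"Legacy", "L0", "L1"}[l]
}

// Stand-in for the metacache's per-segment bookkeeping.
type SegmentInfo struct {
	segmentID int64
	level     SegmentLevel
}

func main() {
	// Before the fix: the compacted-to segment never had its level set,
	// so every metric it touched was labeled with the zero value.
	before := SegmentInfo{segmentID: 100}
	// After the fix: the level is pinned explicitly at creation.
	after := SegmentInfo{segmentID: 100, level: SegmentLevel_L1}

	fmt.Println(before.level) // Legacy
	fmt.Println(after.level)  // L1
}
```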

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/30149/head
XuanYang-cn 2024-01-19 18:52:53 +08:00 committed by GitHub
parent aee19dcd6b
commit 3d46096f86
4 changed files with 18 additions and 14 deletions

View File

@@ -125,6 +125,7 @@ func (c *metaCacheImpl) CompactSegments(newSegmentID, partitionID int64, numOfRo
         segmentID: newSegmentID,
         partitionID: partitionID,
         state: commonpb.SegmentState_Flushed,
+        level: datapb.SegmentLevel_L1,
         flushedRows: numOfRows,
         startPosRecorded: true,
         bfs: bfs,

View File

@@ -27,6 +27,7 @@ import (
     "github.com/milvus-io/milvus/internal/proto/datapb"
     "github.com/milvus-io/milvus/internal/storage"
     "github.com/milvus-io/milvus/pkg/common"
+    "github.com/milvus-io/milvus/pkg/util/paramtable"
 )
 
 type MetaCacheSuite struct {
@@ -46,6 +47,8 @@ type MetaCacheSuite struct {
 }
 
 func (s *MetaCacheSuite) SetupSuite() {
+    paramtable.Init()
+
     s.collectionID = 1
     s.vchannel = "test"
     s.partitionIDs = []int64{1, 2, 3, 4}
@@ -112,6 +115,7 @@ func (s *MetaCacheSuite) TestCompactSegments() {
         if seg.SegmentID() == s.newSegments[i] {
             s.Equal(commonpb.SegmentState_Flushed, seg.State())
             s.Equal(int64(100), seg.NumOfRows())
+            s.Equal(datapb.SegmentLevel_L1, seg.Level())
         }
         if seg.SegmentID() == s.flushedSegments[i] {
             s.Equal(s.newSegments[i], seg.CompactTo())

View File

@@ -75,7 +75,6 @@ func NewStorageSerializer(metacache metacache.MetaCache, metaWriter MetaWriter)
 func (s *storageV1Serializer) EncodeBuffer(ctx context.Context, pack *SyncPack) (Task, error) {
     task := NewSyncTask()
     tr := timerecord.NewTimeRecorder("storage_serializer")
-    metricSegLevel := pack.level.String()
 
     log := log.Ctx(ctx).With(
         zap.Int64("segmentID", pack.segmentID),
@@ -135,7 +134,7 @@ func (s *storageV1Serializer) EncodeBuffer(ctx context.Context, pack *SyncPack)
     s.setTaskMeta(task, pack)
-    metrics.DataNodeEncodeBufferLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metricSegLevel).Observe(float64(tr.RecordSpan().Milliseconds()))
+    metrics.DataNodeEncodeBufferLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), pack.level.String()).Observe(float64(tr.RecordSpan().Milliseconds()))
     return task, nil
 }
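Note on the serializer hunks above: the fix drops the `metricSegLevel` local captured at the top of `EncodeBuffer` and computes the label from `pack.level` at the metric call itself. A minimal sketch of that pattern, with a hypothetical `observeEncodeLatency` standing in for the real `metrics.DataNodeEncodeBufferLatency` observation:

```go
package main

import "fmt"

type SegmentLevel int32

func (l SegmentLevel) String() string { return [...]string{"Legacy", "L0", "L1"}[l] }

// SyncPack stands in for the real sync pack; only the level field matters here.
type SyncPack struct{ level SegmentLevel }

// Hypothetical stand-in for the Prometheus latency observation.
func observeEncodeLatency(level string, ms float64) {
	fmt.Printf("encode_buffer_latency{level=%q} %.1fms\n", level, ms)
}

func encodeBuffer(pack *SyncPack) {
	// Label derived at the point of use, not captured in a local up front.
	observeEncodeLatency(pack.level.String(), 12.0)
}

func main() {
	encodeBuffer(&SyncPack{level: 2}) // prints a label of "L1"
}
```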

View File

@@ -98,28 +98,28 @@ func (t *SyncTask) getLogger() *log.MLogger {
         zap.Int64("partitionID", t.partitionID),
         zap.Int64("segmentID", t.segmentID),
         zap.String("channel", t.channelName),
+        zap.String("level", t.level.String()),
     )
 }
 
-func (t *SyncTask) handleError(err error, metricSegLevel string) {
+func (t *SyncTask) handleError(err error) {
     if t.failureCallback != nil {
         t.failureCallback(err)
     }
 
-    metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.FailLabel, metricSegLevel).Inc()
+    metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.FailLabel, t.level.String()).Inc()
     if !t.isFlush {
-        metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.FailLabel, metricSegLevel).Inc()
+        metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.FailLabel, t.level.String()).Inc()
     }
 }
 
 func (t *SyncTask) Run() (err error) {
     t.tr = timerecord.NewTimeRecorder("syncTask")
-    metricSegLevel := t.level.String()
     log := t.getLogger()
     defer func() {
         if err != nil {
-            t.handleError(err, metricSegLevel)
+            t.handleError(err)
         }
     }()
@@ -128,7 +128,7 @@ func (t *SyncTask) Run() (err error) {
     if !has {
         log.Warn("failed to sync data, segment not found in metacache")
         err := merr.WrapErrSegmentNotFound(t.segmentID)
-        t.handleError(err, metricSegLevel)
+        t.handleError(err)
         return err
     }
@@ -157,7 +157,7 @@ func (t *SyncTask) Run() (err error) {
     err = t.writeLogs()
     if err != nil {
         log.Warn("failed to save serialized data into storage", zap.Error(err))
-        t.handleError(err, metricSegLevel)
+        t.handleError(err)
         return err
     }
@@ -169,15 +169,15 @@ func (t *SyncTask) Run() (err error) {
         totalSize += float64(len(t.deltaBlob.Value))
     }
 
-    metrics.DataNodeFlushedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.AllLabel, metricSegLevel).Add(totalSize)
+    metrics.DataNodeFlushedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.AllLabel, t.level.String()).Add(totalSize)
-    metrics.DataNodeSave2StorageLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metricSegLevel).Observe(float64(t.tr.RecordSpan().Milliseconds()))
+    metrics.DataNodeSave2StorageLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.level.String()).Observe(float64(t.tr.RecordSpan().Milliseconds()))
 
     if t.metaWriter != nil {
         err = t.writeMeta()
         if err != nil {
             log.Warn("failed to save serialized data into storage", zap.Error(err))
-            t.handleError(err, metricSegLevel)
+            t.handleError(err)
             return err
         }
     }
@@ -195,9 +195,9 @@ func (t *SyncTask) Run() (err error) {
     log.Info("task done")
 
     if !t.isFlush {
-        metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SuccessLabel, metricSegLevel).Inc()
+        metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SuccessLabel, t.level.String()).Inc()
     }
-    metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SuccessLabel, metricSegLevel).Inc()
+    metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SuccessLabel, t.level.String()).Inc()
 
     return nil
 }
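The task.go hunks all follow from one signature change: `handleError` no longer takes the label as a parameter but reads it from the task, so the `metricSegLevel` local in `Run` disappears and every call site shrinks to `t.handleError(err)`. A before/after sketch with simplified types and a hypothetical counter in place of `metrics.DataNodeFlushBufferCount`:

```go
package main

import "fmt"

type SegmentLevel int32

func (l SegmentLevel) String() string { return [...]string{"Legacy", "L0", "L1"}[l] }

// Hypothetical counter standing in for the real failure metric.
func incFlushFailure(level string) {
	fmt.Printf("flush_buffer_count{status=\"fail\",level=%q}++\n", level)
}

type SyncTask struct {
	level           SegmentLevel
	failureCallback func(error)
}

// Before: func (t *SyncTask) handleError(err error, metricSegLevel string),
// which forced Run to precompute the label and pass it at every call site.
// After: the label comes from the receiver, so it can never drift from
// the task's actual level.
func (t *SyncTask) handleError(err error) {
	if t.failureCallback != nil {
		t.failureCallback(err)
	}
	incFlushFailure(t.level.String())
}

func main() {
	t := &SyncTask{
		level:           2, // L1
		failureCallback: func(err error) { fmt.Println("callback:", err) },
	}
	t.handleError(fmt.Errorf("segment not found"))
}
```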