mirror of https://github.com/milvus-io/milvus.git
[Cherry-Pick]Add a segment metrics for count number of binlog files (#22103)
Signed-off-by: jaime <yun.zhang@zilliz.com>pull/22137/head
parent
511265c68c
commit
a2435cfc4f
|
@ -297,7 +297,7 @@ dataCoord:
|
|||
minSizeFromIdleToSealed: 16 # The min size in MB of segment which can be idle from sealed.
|
||||
# The max number of statslog file for one segment, the segment will be sealed if
|
||||
# the number of statslog file reaches to max value.
|
||||
maxBinlogFileNumber: 16
|
||||
maxBinlogFileNumber: 32
|
||||
smallProportion: 0.5 # The segment is considered as "small segment" when its # of rows is smaller than
|
||||
# (smallProportion * segment max # of rows).
|
||||
compactableProportion: 0.5 # A compaction will happen on small segments if the segment after compaction will have
|
||||
|
|
|
@ -23,8 +23,6 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/exp/maps"
|
||||
|
@ -34,7 +32,9 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/kv"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/metastore/model"
|
||||
"github.com/milvus-io/milvus/internal/metrics"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
"github.com/milvus-io/milvus/internal/util"
|
||||
|
@ -258,6 +258,7 @@ func (kc *Catalog) AlterSegments(ctx context.Context, newSegments []*datapb.Segm
|
|||
}
|
||||
kvsBySeg := make(map[int64]map[string]string)
|
||||
for _, segment := range newSegments {
|
||||
kc.collectMetrics(segment)
|
||||
segmentKvs, err := buildSegmentAndBinlogsKvs(segment)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -305,9 +306,29 @@ func (kc *Catalog) AlterSegment(ctx context.Context, newSegment *datapb.SegmentI
|
|||
}
|
||||
maps.Copy(kvs, segmentKvs)
|
||||
|
||||
kc.collectMetrics(newSegment)
|
||||
return kc.MetaKv.MultiSave(kvs)
|
||||
}
|
||||
|
||||
func (kc *Catalog) collectMetrics(s *datapb.SegmentInfo) {
|
||||
statsFieldFn := func(fieldBinlogs []*datapb.FieldBinlog) int {
|
||||
cnt := 0
|
||||
for _, fbs := range fieldBinlogs {
|
||||
cnt += len(fbs.Binlogs)
|
||||
}
|
||||
return cnt
|
||||
}
|
||||
|
||||
cnt := 0
|
||||
cnt += statsFieldFn(s.GetBinlogs())
|
||||
cnt += statsFieldFn(s.GetStatslogs())
|
||||
cnt += statsFieldFn(s.GetDeltalogs())
|
||||
|
||||
metrics.DataCoordSegmentBinLogFileCount.
|
||||
WithLabelValues(fmt.Sprint(s.CollectionID), fmt.Sprint(s.GetID())).
|
||||
Set(float64(cnt))
|
||||
}
|
||||
|
||||
func (kc *Catalog) hasBinlogPrefix(segment *datapb.SegmentInfo) (bool, error) {
|
||||
binlogsKey, _, err := kc.getBinlogsWithPrefix(storage.InsertBinlog, segment.CollectionID, segment.PartitionID, segment.ID)
|
||||
if err != nil {
|
||||
|
@ -371,6 +392,7 @@ func (kc *Catalog) AlterSegmentsAndAddNewSegment(ctx context.Context, segments [
|
|||
return err
|
||||
}
|
||||
maps.Copy(kvs, segmentKvs)
|
||||
kc.collectMetrics(newSegment)
|
||||
}
|
||||
}
|
||||
return kc.MetaKv.MultiSave(kvs)
|
||||
|
@ -441,7 +463,12 @@ func (kc *Catalog) DropSegment(ctx context.Context, segment *datapb.SegmentInfo)
|
|||
keys := []string{segKey}
|
||||
binlogKeys := buildBinlogKeys(segment)
|
||||
keys = append(keys, binlogKeys...)
|
||||
return kc.MetaKv.MultiRemove(keys)
|
||||
if err := kc.MetaKv.MultiRemove(keys); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
metrics.CleanupDataCoordSegmentMetrics(segment.CollectionID, segment.ID)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (kc *Catalog) MarkChannelDeleted(ctx context.Context, channel string) error {
|
||||
|
|
|
@ -17,8 +17,11 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
"fmt"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -95,6 +98,17 @@ var (
|
|||
Help: "binlog size of segments",
|
||||
}, []string{segmentStateLabelName})
|
||||
|
||||
DataCoordSegmentBinLogFileCount = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: milvusNamespace,
|
||||
Subsystem: typeutil.DataCoordRole,
|
||||
Name: "segment_binlog_file_count",
|
||||
Help: "number of binlog files for each segment",
|
||||
}, []string{
|
||||
collectionIDLabelName,
|
||||
segmentIDLabelName,
|
||||
})
|
||||
|
||||
/* hard to implement, commented now
|
||||
DataCoordSegmentSizeRatio = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
|
@ -177,7 +191,17 @@ func RegisterDataCoord(registry *prometheus.Registry) {
|
|||
registry.MustRegister(DataCoordNumStoredRowsCounter)
|
||||
registry.MustRegister(DataCoordConsumeDataNodeTimeTickLag)
|
||||
registry.MustRegister(DataCoordStoredBinlogSize)
|
||||
registry.MustRegister(DataCoordSegmentBinLogFileCount)
|
||||
registry.MustRegister(IndexRequestCounter)
|
||||
registry.MustRegister(IndexTaskNum)
|
||||
registry.MustRegister(IndexNodeNum)
|
||||
}
|
||||
|
||||
func CleanupDataCoordSegmentMetrics(collectionID int64, segmentID int64) {
|
||||
DataCoordSegmentBinLogFileCount.
|
||||
Delete(
|
||||
prometheus.Labels{
|
||||
collectionIDLabelName: fmt.Sprint(collectionID),
|
||||
segmentIDLabelName: fmt.Sprint(segmentID),
|
||||
})
|
||||
}
|
||||
|
|
|
@ -20,9 +20,10 @@ import (
|
|||
// nolint:gosec
|
||||
_ "net/http/pprof"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/management"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/management"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -70,6 +71,7 @@ const (
|
|||
queryTypeLabelName = "query_type"
|
||||
collectionName = "collection_name"
|
||||
segmentStateLabelName = "segment_state"
|
||||
segmentIDLabelName = "segment_id"
|
||||
usernameLabelName = "username"
|
||||
roleNameLabelName = "role_name"
|
||||
cacheNameLabelName = "cache_name"
|
||||
|
|
|
@ -20,9 +20,10 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/shirou/gopsutil/v3/disk"
|
||||
|
||||
config "github.com/milvus-io/milvus/internal/config"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
"github.com/shirou/gopsutil/v3/disk"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -1472,7 +1473,7 @@ func (p *dataCoordConfig) init(base *BaseTable) {
|
|||
p.SegmentMaxBinlogFileNumber = ParamItem{
|
||||
Key: "dataCoord.segment.maxBinlogFileNumber",
|
||||
Version: "2.2.0",
|
||||
DefaultValue: "16",
|
||||
DefaultValue: "32",
|
||||
}
|
||||
p.SegmentMaxBinlogFileNumber.Init(base.mgr)
|
||||
|
||||
|
|
Loading…
Reference in New Issue