mirror of https://github.com/milvus-io/milvus.git
issue: https://github.com/milvus-io/milvus/issues/32281 pr: https://github.com/milvus-io/milvus/pull/32361 Signed-off-by: bigsheeper <yihao.dai@zilliz.com>pull/32675/head
parent
6178355e2f
commit
51e08556a9
|
@ -129,14 +129,6 @@ func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) {
|
|||
// panic here, insert failure
|
||||
panic(err)
|
||||
}
|
||||
metrics.QueryNodeNumEntities.WithLabelValues(
|
||||
growing.DatabaseName(),
|
||||
fmt.Sprint(paramtable.GetNodeID()),
|
||||
fmt.Sprint(growing.Collection()),
|
||||
fmt.Sprint(growing.Partition()),
|
||||
growing.Type().String(),
|
||||
"0",
|
||||
).Add(float64(len(insertData.RowIDs)))
|
||||
growing.UpdateBloomFilter(insertData.PrimaryKeys)
|
||||
|
||||
if newGrowingSegment {
|
||||
|
|
|
@ -106,6 +106,8 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error
|
|||
|
||||
collections := node.manager.Collection.List()
|
||||
|
||||
nodeID := fmt.Sprint(node.GetNodeID())
|
||||
|
||||
var totalGrowingSize int64
|
||||
growingSegments := node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeGrowing))
|
||||
growingGroupByCollection := lo.GroupBy(growingSegments, func(seg segments.Segment) int64 {
|
||||
|
@ -117,8 +119,25 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error
|
|||
return seg.MemSize()
|
||||
})
|
||||
totalGrowingSize += size
|
||||
metrics.QueryNodeEntitiesSize.WithLabelValues(fmt.Sprint(node.GetNodeID()),
|
||||
fmt.Sprint(collection), segments.SegmentTypeGrowing.String()).Set(float64(size))
|
||||
metrics.QueryNodeEntitiesSize.WithLabelValues(nodeID, fmt.Sprint(collection),
|
||||
segments.SegmentTypeGrowing.String()).Set(float64(size))
|
||||
}
|
||||
growingGroupByPartition := lo.GroupBy(growingSegments, func(seg segments.Segment) int64 {
|
||||
return seg.Partition()
|
||||
})
|
||||
for _, segs := range growingGroupByPartition {
|
||||
numEntities := lo.SumBy(segs, func(seg segments.Segment) int64 {
|
||||
return seg.RowNum()
|
||||
})
|
||||
segment := segs[0]
|
||||
metrics.QueryNodeNumEntities.WithLabelValues(
|
||||
segment.DatabaseName(),
|
||||
nodeID,
|
||||
fmt.Sprint(segment.Collection()),
|
||||
fmt.Sprint(segment.Partition()),
|
||||
segments.SegmentTypeGrowing.String(),
|
||||
fmt.Sprint(len(segment.Indexes())),
|
||||
).Set(float64(numEntities))
|
||||
}
|
||||
|
||||
sealedSegments := node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeSealed))
|
||||
|
@ -133,6 +152,23 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error
|
|||
metrics.QueryNodeEntitiesSize.WithLabelValues(fmt.Sprint(node.GetNodeID()),
|
||||
fmt.Sprint(collection), segments.SegmentTypeSealed.String()).Set(float64(size))
|
||||
}
|
||||
sealedGroupByPartition := lo.GroupBy(sealedSegments, func(seg segments.Segment) int64 {
|
||||
return seg.Partition()
|
||||
})
|
||||
for _, segs := range sealedGroupByPartition {
|
||||
numEntities := lo.SumBy(segs, func(seg segments.Segment) int64 {
|
||||
return seg.RowNum()
|
||||
})
|
||||
segment := segs[0]
|
||||
metrics.QueryNodeNumEntities.WithLabelValues(
|
||||
segment.DatabaseName(),
|
||||
nodeID,
|
||||
fmt.Sprint(segment.Collection()),
|
||||
fmt.Sprint(segment.Partition()),
|
||||
segments.SegmentTypeSealed.String(),
|
||||
fmt.Sprint(len(segment.Indexes())),
|
||||
).Set(float64(numEntities))
|
||||
}
|
||||
|
||||
return &metricsinfo.QueryNodeQuotaMetrics{
|
||||
Hms: metricsinfo.HardwareMetrics{},
|
||||
|
|
|
@ -29,7 +29,6 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"unsafe"
|
||||
|
@ -1368,15 +1367,6 @@ func (s *LocalSegment) Release(opts ...releaseOption) {
|
|||
|
||||
C.DeleteSegment(ptr)
|
||||
|
||||
metrics.QueryNodeNumEntities.WithLabelValues(
|
||||
s.DatabaseName(),
|
||||
fmt.Sprint(paramtable.GetNodeID()),
|
||||
fmt.Sprint(s.Collection()),
|
||||
fmt.Sprint(s.Partition()),
|
||||
s.Type().String(),
|
||||
strconv.FormatInt(int64(len(s.Indexes())), 10),
|
||||
).Sub(float64(s.InsertCount()))
|
||||
|
||||
localDiskUsage, err := GetLocalUsedSize(context.Background(), paramtable.Get().LocalStorageCfg.Path.GetValue())
|
||||
// ignore error here, shall not block releasing
|
||||
if err == nil {
|
||||
|
|
|
@ -1026,15 +1026,6 @@ func (loader *segmentLoader) loadSegment(ctx context.Context,
|
|||
}
|
||||
}
|
||||
|
||||
metrics.QueryNodeNumEntities.WithLabelValues(
|
||||
segment.DatabaseName(),
|
||||
fmt.Sprint(paramtable.GetNodeID()),
|
||||
fmt.Sprint(segment.Collection()),
|
||||
fmt.Sprint(segment.Partition()),
|
||||
segment.Type().String(),
|
||||
strconv.FormatInt(int64(len(segment.Indexes())), 10),
|
||||
).Add(float64(loadInfo.GetNumOfRows()))
|
||||
|
||||
log.Info("loading delta...")
|
||||
return loader.LoadDeltaLogs(ctx, segment, loadInfo.Deltalogs)
|
||||
}
|
||||
|
@ -1264,14 +1255,6 @@ func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment,
|
|||
return err
|
||||
}
|
||||
|
||||
metrics.QueryNodeNumEntities.WithLabelValues(
|
||||
segment.DatabaseName(),
|
||||
fmt.Sprint(paramtable.GetNodeID()),
|
||||
fmt.Sprint(segment.Collection()),
|
||||
fmt.Sprint(segment.Partition()),
|
||||
segment.Type().String(),
|
||||
strconv.FormatInt(int64(len(segment.Indexes())), 10),
|
||||
).Sub(float64(deltaData.RowCount))
|
||||
log.Info("load delta logs done", zap.Int64("deleteCount", deltaData.RowCount))
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue