mirror of https://github.com/milvus-io/milvus.git
Signed-off-by: MrPresent-Han <chun.han@zilliz.com>pull/25895/head
parent
2b9ec565bb
commit
b5e79e7f34
|
@ -18,6 +18,8 @@ package proxy
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
|
@ -28,8 +30,10 @@ import (
|
||||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||||
"github.com/milvus-io/milvus/internal/allocator"
|
"github.com/milvus-io/milvus/internal/allocator"
|
||||||
"github.com/milvus-io/milvus/pkg/log"
|
"github.com/milvus-io/milvus/pkg/log"
|
||||||
|
"github.com/milvus-io/milvus/pkg/metrics"
|
||||||
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
||||||
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
|
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
|
||||||
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||||
"github.com/milvus-io/milvus/pkg/util/retry"
|
"github.com/milvus-io/milvus/pkg/util/retry"
|
||||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||||
)
|
)
|
||||||
|
@ -119,7 +123,9 @@ func repackInsertDataByPartition(ctx context.Context,
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
beforeAssign := time.Now()
|
||||||
assignedSegmentInfos, err := segIDAssigner.GetSegmentID(insertMsg.CollectionID, partitionID, channelName, uint32(len(rowOffsets)), maxTs)
|
assignedSegmentInfos, err := segIDAssigner.GetSegmentID(insertMsg.CollectionID, partitionID, channelName, uint32(len(rowOffsets)), maxTs)
|
||||||
|
metrics.ProxyAssignSegmentIDLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(time.Since(beforeAssign).Milliseconds()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("allocate segmentID for insert data failed",
|
log.Error("allocate segmentID for insert data failed",
|
||||||
zap.String("collectionName", insertMsg.CollectionName),
|
zap.String("collectionName", insertMsg.CollectionName),
|
||||||
|
|
|
@ -20,6 +20,7 @@ import (
|
||||||
"container/list"
|
"container/list"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/cockroachdb/errors"
|
"github.com/cockroachdb/errors"
|
||||||
|
@ -29,6 +30,8 @@ import (
|
||||||
"github.com/milvus-io/milvus/internal/allocator"
|
"github.com/milvus-io/milvus/internal/allocator"
|
||||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||||
"github.com/milvus-io/milvus/pkg/log"
|
"github.com/milvus-io/milvus/pkg/log"
|
||||||
|
"github.com/milvus-io/milvus/pkg/metrics"
|
||||||
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -309,10 +312,12 @@ func (sa *segIDAssigner) syncSegments() (bool, error) {
|
||||||
PeerRole: typeutil.ProxyRole,
|
PeerRole: typeutil.ProxyRole,
|
||||||
SegmentIDRequests: sa.segReqs,
|
SegmentIDRequests: sa.segReqs,
|
||||||
}
|
}
|
||||||
|
metrics.ProxySyncSegmentRequestLength.WithLabelValues(
|
||||||
|
strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(len(sa.segReqs)))
|
||||||
sa.segReqs = nil
|
sa.segReqs = nil
|
||||||
|
|
||||||
log.Debug("syncSegments call dataCoord.AssignSegmentID", zap.String("request", req.String()))
|
log.Debug("syncSegments call dataCoord.AssignSegmentID", zap.String("request", req.String()))
|
||||||
|
|
||||||
resp, err := sa.dataCoord.AssignSegmentID(context.Background(), req)
|
resp, err := sa.dataCoord.AssignSegmentID(context.Background(), req)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -138,6 +138,26 @@ var (
|
||||||
Buckets: buckets, // unit: ms
|
Buckets: buckets, // unit: ms
|
||||||
}, []string{nodeIDLabelName, msgTypeLabelName})
|
}, []string{nodeIDLabelName, msgTypeLabelName})
|
||||||
|
|
||||||
|
// ProxyAssignSegmentIDLatency record the latency that Proxy get segmentID from dataCoord.
|
||||||
|
ProxyAssignSegmentIDLatency = prometheus.NewHistogramVec(
|
||||||
|
prometheus.HistogramOpts{
|
||||||
|
Namespace: milvusNamespace,
|
||||||
|
Subsystem: typeutil.ProxyRole,
|
||||||
|
Name: "assign_segmentID_latency",
|
||||||
|
Help: "latency that proxy get segmentID from dataCoord",
|
||||||
|
Buckets: buckets, // unit: ms
|
||||||
|
}, []string{nodeIDLabelName})
|
||||||
|
|
||||||
|
// ProxySyncSegmentRequestLength the length of SegmentIDRequests when assigning segments for insert.
|
||||||
|
ProxySyncSegmentRequestLength = prometheus.NewHistogramVec(
|
||||||
|
prometheus.HistogramOpts{
|
||||||
|
Namespace: milvusNamespace,
|
||||||
|
Subsystem: typeutil.ProxyRole,
|
||||||
|
Name: "sync_segment_request_length",
|
||||||
|
Help: "the length of SegmentIDRequests when assigning segments for insert",
|
||||||
|
Buckets: buckets,
|
||||||
|
}, []string{nodeIDLabelName})
|
||||||
|
|
||||||
// ProxyCacheStatsCounter record the number of Proxy cache hits or miss.
|
// ProxyCacheStatsCounter record the number of Proxy cache hits or miss.
|
||||||
ProxyCacheStatsCounter = prometheus.NewCounterVec(
|
ProxyCacheStatsCounter = prometheus.NewCounterVec(
|
||||||
prometheus.CounterOpts{
|
prometheus.CounterOpts{
|
||||||
|
@ -289,6 +309,9 @@ func RegisterProxy(registry *prometheus.Registry) {
|
||||||
|
|
||||||
registry.MustRegister(ProxySendMutationReqLatency)
|
registry.MustRegister(ProxySendMutationReqLatency)
|
||||||
|
|
||||||
|
registry.MustRegister(ProxyAssignSegmentIDLatency)
|
||||||
|
registry.MustRegister(ProxySyncSegmentRequestLength)
|
||||||
|
|
||||||
registry.MustRegister(ProxyCacheStatsCounter)
|
registry.MustRegister(ProxyCacheStatsCounter)
|
||||||
registry.MustRegister(ProxyUpdateCacheLatency)
|
registry.MustRegister(ProxyUpdateCacheLatency)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue