mirror of https://github.com/milvus-io/milvus.git
related: #30376 also: refine log output for query_coord task by rephrasing action string Signed-off-by: MrPresent-Han <chun.han@gmail.com> Co-authored-by: MrPresent-Han <chun.han@gmail.com>pull/34084/head
parent
78885a44c4
commit
ca7ef26e4b
|
@ -17,13 +17,14 @@
|
|||
package task
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"fmt"
|
||||
|
||||
"github.com/samber/lo"
|
||||
"go.uber.org/atomic"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
@ -50,6 +51,7 @@ type Action interface {
|
|||
Node() int64
|
||||
Type() ActionType
|
||||
IsFinished(distMgr *meta.DistributionManager) bool
|
||||
String() string
|
||||
}
|
||||
|
||||
type BaseAction struct {
|
||||
|
@ -78,6 +80,10 @@ func (action *BaseAction) Shard() string {
|
|||
return action.shard
|
||||
}
|
||||
|
||||
func (action *BaseAction) String() string {
|
||||
return fmt.Sprintf(`{[type=%v][node=%d][shard=%v]}`, action.Type(), action.Node(), action.Shard())
|
||||
}
|
||||
|
||||
type SegmentAction struct {
|
||||
*BaseAction
|
||||
|
||||
|
@ -153,6 +159,10 @@ func (action *SegmentAction) IsFinished(distMgr *meta.DistributionManager) bool
|
|||
return true
|
||||
}
|
||||
|
||||
func (action *SegmentAction) String() string {
|
||||
return action.BaseAction.String() + fmt.Sprintf(`{[segmentID=%d][scope=%d]}`, action.SegmentID(), action.Scope())
|
||||
}
|
||||
|
||||
type ChannelAction struct {
|
||||
*BaseAction
|
||||
}
|
||||
|
@ -218,6 +228,19 @@ func (action *LeaderAction) Version() typeutil.UniqueID {
|
|||
return action.version
|
||||
}
|
||||
|
||||
func (action *LeaderAction) PartStats() map[int64]int64 {
|
||||
return action.partStatsVersions
|
||||
}
|
||||
|
||||
func (action *LeaderAction) String() string {
|
||||
partStatsStr := ""
|
||||
if action.PartStats() != nil {
|
||||
partStatsStr = fmt.Sprintf("%v", action.PartStats())
|
||||
}
|
||||
return action.BaseAction.String() + fmt.Sprintf(`{[leaderID=%v][segmentID=%d][version=%d][partStats=%s]}`,
|
||||
action.GetLeaderID(), action.SegmentID(), action.Version(), partStatsStr)
|
||||
}
|
||||
|
||||
func (action *LeaderAction) GetLeaderID() typeutil.UniqueID {
|
||||
return action.leaderID
|
||||
}
|
||||
|
@ -237,7 +260,7 @@ func (action *LeaderAction) IsFinished(distMgr *meta.DistributionManager) bool {
|
|||
case ActionTypeReduce:
|
||||
return action.rpcReturned.Load() && (dist == nil || dist.NodeID != action.Node())
|
||||
case ActionTypeUpdate:
|
||||
return action.rpcReturned.Load() && (dist != nil && reflect.DeepEqual(action.partStatsVersions, view.PartitionStatsVersions))
|
||||
return action.rpcReturned.Load() && common.MapEquals(action.partStatsVersions, view.PartitionStatsVersions)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
|
|
@ -280,15 +280,8 @@ func (task *baseTask) SetReason(reason string) {
|
|||
|
||||
func (task *baseTask) String() string {
|
||||
var actionsStr string
|
||||
for i, action := range task.actions {
|
||||
if realAction, ok := action.(*SegmentAction); ok {
|
||||
actionsStr += fmt.Sprintf(`{[type=%v][node=%d][streaming=%v]}`, action.Type(), action.Node(), realAction.Scope() == querypb.DataScope_Streaming)
|
||||
} else {
|
||||
actionsStr += fmt.Sprintf(`{[type=%v][node=%d]}`, action.Type(), action.Node())
|
||||
}
|
||||
if i != len(task.actions)-1 {
|
||||
actionsStr += ", "
|
||||
}
|
||||
for _, action := range task.actions {
|
||||
actionsStr += action.String() + ","
|
||||
}
|
||||
return fmt.Sprintf(
|
||||
"[id=%d] [type=%s] [source=%s] [reason=%s] [collectionID=%d] [replicaID=%d] [resourceGroup=%s] [priority=%s] [actionsCount=%d] [actions=%s]",
|
||||
|
|
|
@ -825,7 +825,8 @@ func (sd *shardDelegator) loadPartitionStats(ctx context.Context, partStatsVersi
|
|||
defer sd.partitionStatsMut.Unlock()
|
||||
sd.partitionStats[partID] = partStats
|
||||
}()
|
||||
log.Info("Updated partitionStats for partition", zap.Int64("partitionID", partID))
|
||||
log.Info("Updated partitionStats for partition", zap.Int64("collectionID", sd.collectionID), zap.Int64("partitionID", partID),
|
||||
zap.Int64("newVersion", newVersion), zap.Int64("oldVersion", curStats.GetVersion()))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -85,12 +85,12 @@ func PruneSegments(ctx context.Context,
|
|||
plan := planpb.PlanNode{}
|
||||
err := proto.Unmarshal(expr, &plan)
|
||||
if err != nil {
|
||||
log.Error("failed to unmarshall serialized expr from bytes, failed the operation")
|
||||
log.Ctx(ctx).Error("failed to unmarshall serialized expr from bytes, failed the operation")
|
||||
return
|
||||
}
|
||||
expr, err := exprutil.ParseExprFromPlan(&plan)
|
||||
if err != nil {
|
||||
log.Error("failed to parse expr from plan, failed the operation")
|
||||
log.Ctx(ctx).Error("failed to parse expr from plan, failed the operation")
|
||||
return
|
||||
}
|
||||
targetRanges, matchALL := exprutil.ParseRanges(expr, exprutil.ClusteringKey)
|
||||
|
@ -123,7 +123,8 @@ func PruneSegments(ctx context.Context,
|
|||
metrics.QueryNodeSegmentPruneRatio.
|
||||
WithLabelValues(fmt.Sprint(collectionID), fmt.Sprint(typeutil.IsVectorType(clusteringKeyField.GetDataType()))).
|
||||
Observe(float64(realFilteredSegments / totalSegNum))
|
||||
log.Debug("Pruned segment for search/query",
|
||||
log.Ctx(ctx).Debug("Pruned segment for search/query",
|
||||
zap.Int("filtered_segment_num[stats]", len(filteredSegments)),
|
||||
zap.Int("filtered_segment_num[excluded]", realFilteredSegments),
|
||||
zap.Int("total_segment_num", totalSegNum),
|
||||
zap.Float32("filtered_ratio", float32(realFilteredSegments)/float32(totalSegNum)),
|
||||
|
|
|
@ -46,6 +46,9 @@ func NewPartitionStatsSnapshot() *PartitionStatsSnapshot {
|
|||
}
|
||||
|
||||
func (ps *PartitionStatsSnapshot) GetVersion() int64 {
|
||||
if ps == nil {
|
||||
return 0
|
||||
}
|
||||
return ps.Version
|
||||
}
|
||||
|
||||
|
|
|
@ -22,3 +22,16 @@ func (m Str2Str) Equal(other Str2Str) bool {
|
|||
func CloneStr2Str(m Str2Str) Str2Str {
|
||||
return m.Clone()
|
||||
}
|
||||
|
||||
func MapEquals(m1, m2 map[int64]int64) bool {
|
||||
if len(m1) != len(m2) {
|
||||
return false
|
||||
}
|
||||
for k1, v1 := range m1 {
|
||||
v2, exist := m2[k1]
|
||||
if !exist || v1 != v2 {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -35,3 +35,25 @@ func TestCloneStr2Str(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestMapEqual(t *testing.T) {
|
||||
{
|
||||
m1 := map[int64]int64{1: 11, 2: 22, 3: 33}
|
||||
m2 := map[int64]int64{1: 11, 2: 22, 3: 33}
|
||||
assert.True(t, MapEquals(m1, m2))
|
||||
}
|
||||
{
|
||||
m1 := map[int64]int64{1: 11, 2: 23, 3: 33}
|
||||
m2 := map[int64]int64{1: 11, 2: 22, 3: 33}
|
||||
assert.False(t, MapEquals(m1, m2))
|
||||
}
|
||||
{
|
||||
m1 := map[int64]int64{1: 11, 2: 23, 3: 33}
|
||||
m2 := map[int64]int64{1: 11, 2: 22}
|
||||
assert.False(t, MapEquals(m1, m2))
|
||||
}
|
||||
{
|
||||
m1 := map[int64]int64{1: 11, 2: 23, 3: 33}
|
||||
assert.False(t, MapEquals(m1, nil))
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue