enhance: make Load process traceable in querycoord (#29806)

See also #29803

This PR:
- Add trace span for collection/partition load
- Use the collection's load span as the trace context when generating Segment/Channel tasks during loading
- Refine BaseTask trace tag usage

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/29690/head
congqixia 2024-01-10 09:58:49 +08:00 committed by GitHub
parent 2f702ad316
commit c4ddfff2a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 63 additions and 27 deletions

View File

@ -21,6 +21,7 @@ import (
"time"
"github.com/samber/lo"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/querycoordv2/balance"
@ -98,16 +99,16 @@ func (c *ChannelChecker) checkReplica(ctx context.Context, replica *meta.Replica
ret := make([]task.Task, 0)
lacks, redundancies := c.getDmChannelDiff(replica.GetCollectionID(), replica.GetID())
tasks := c.createChannelLoadTask(ctx, lacks, replica)
tasks := c.createChannelLoadTask(c.getTraceCtx(ctx, replica.CollectionID), lacks, replica)
task.SetReason("lacks of channel", tasks...)
ret = append(ret, tasks...)
tasks = c.createChannelReduceTasks(ctx, redundancies, replica.GetID())
tasks = c.createChannelReduceTasks(c.getTraceCtx(ctx, replica.CollectionID), redundancies, replica.GetID())
task.SetReason("collection released", tasks...)
ret = append(ret, tasks...)
repeated := c.findRepeatedChannels(replica.GetID())
tasks = c.createChannelReduceTasks(ctx, repeated, replica.GetID())
tasks = c.createChannelReduceTasks(c.getTraceCtx(ctx, replica.CollectionID), repeated, replica.GetID())
task.SetReason("redundancies of channel")
ret = append(ret, tasks...)
@ -222,3 +223,12 @@ func (c *ChannelChecker) createChannelReduceTasks(ctx context.Context, channels
}
return ret
}
// getTraceCtx picks the context used when creating channel tasks for the
// given collection.
//
// When the collection is present in meta and still holds a LoadSpan, the
// returned context is ctx with that span attached via trace.ContextWithSpan.
// In every other case (collection not found, or no LoadSpan set) ctx is
// returned untouched.
func (c *ChannelChecker) getTraceCtx(ctx context.Context, collectionID int64) context.Context {
	if collection := c.meta.GetCollection(collectionID); collection != nil && collection.LoadSpan != nil {
		return trace.ContextWithSpan(ctx, collection.LoadSpan)
	}
	// Nothing to attach: fall back to the caller's context.
	return ctx
}

View File

@ -22,6 +22,7 @@ import (
"time"
"github.com/samber/lo"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -120,25 +121,26 @@ func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica
// compare with targets to find the lack and redundancy of segments
lacks, redundancies := c.getSealedSegmentDiff(replica.GetCollectionID(), replica.GetID())
tasks := c.createSegmentLoadTasks(ctx, lacks, replica)
// loadCtx := trace.ContextWithSpan(context.Background(), c.meta.GetCollection(replica.CollectionID).LoadSpan)
tasks := c.createSegmentLoadTasks(c.getTraceCtx(ctx, replica.CollectionID), lacks, replica)
task.SetReason("lacks of segment", tasks...)
ret = append(ret, tasks...)
redundancies = c.filterSegmentInUse(replica, redundancies)
tasks = c.createSegmentReduceTasks(ctx, redundancies, replica.GetID(), querypb.DataScope_Historical)
tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.CollectionID), redundancies, replica.GetID(), querypb.DataScope_Historical)
task.SetReason("segment not exists in target", tasks...)
ret = append(ret, tasks...)
// compare inner dists to find repeated loaded segments
redundancies = c.findRepeatedSealedSegments(replica.GetID())
redundancies = c.filterExistedOnLeader(replica, redundancies)
tasks = c.createSegmentReduceTasks(ctx, redundancies, replica.GetID(), querypb.DataScope_Historical)
tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.CollectionID), redundancies, replica.GetID(), querypb.DataScope_Historical)
task.SetReason("redundancies of segment", tasks...)
ret = append(ret, tasks...)
// compare with target to find the lack and redundancy of segments
_, redundancies = c.getGrowingSegmentDiff(replica.GetCollectionID(), replica.GetID())
tasks = c.createSegmentReduceTasks(ctx, redundancies, replica.GetID(), querypb.DataScope_Streaming)
tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.CollectionID), redundancies, replica.GetID(), querypb.DataScope_Streaming)
task.SetReason("streaming segment not exists in target", tasks...)
ret = append(ret, tasks...)
@ -411,3 +413,12 @@ func (c *SegmentChecker) createSegmentReduceTasks(ctx context.Context, segments
}
return ret
}
// getTraceCtx picks the context used when creating segment tasks for the
// given collection.
//
// When the collection is present in meta and still holds a LoadSpan, the
// returned context is ctx with that span attached via trace.ContextWithSpan.
// In every other case (collection not found, or no LoadSpan set) ctx is
// returned untouched.
func (c *SegmentChecker) getTraceCtx(ctx context.Context, collectionID int64) context.Context {
	if collection := c.meta.GetCollection(collectionID); collection != nil && collection.LoadSpan != nil {
		return trace.ContextWithSpan(ctx, collection.LoadSpan)
	}
	// Nothing to attach: fall back to the caller's context.
	return ctx
}

View File

@ -23,6 +23,8 @@ import (
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/proto/querypb"
@ -179,6 +181,8 @@ func (job *LoadCollectionJob) Execute() error {
CreatedAt: time.Now(),
}
})
_, sp := otel.Tracer(typeutil.QueryCoordRole).Start(job.ctx, "LoadCollection", trace.WithNewRoot())
collection := &meta.Collection{
CollectionLoadInfo: &querypb.CollectionLoadInfo{
CollectionID: req.GetCollectionID(),
@ -188,6 +192,7 @@ func (job *LoadCollectionJob) Execute() error {
LoadType: querypb.LoadType_LoadCollection,
},
CreatedAt: time.Now(),
LoadSpan: sp,
}
job.undo.IsNewCollection = true
err = job.meta.CollectionManager.PutCollection(collection, partitions...)
@ -355,6 +360,8 @@ func (job *LoadPartitionJob) Execute() error {
})
if !job.meta.CollectionManager.Exist(req.GetCollectionID()) {
job.undo.IsNewCollection = true
_, sp := otel.Tracer(typeutil.QueryCoordRole).Start(job.ctx, "LoadPartition", trace.WithNewRoot())
collection := &meta.Collection{
CollectionLoadInfo: &querypb.CollectionLoadInfo{
CollectionID: req.GetCollectionID(),
@ -364,6 +371,7 @@ func (job *LoadPartitionJob) Execute() error {
LoadType: querypb.LoadType_LoadPartition,
},
CreatedAt: time.Now(),
LoadSpan: sp,
}
err = job.meta.CollectionManager.PutCollection(collection, partitions...)
if err != nil {

View File

@ -25,6 +25,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/samber/lo"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/metastore"
@ -45,6 +46,7 @@ type Collection struct {
mut sync.RWMutex
refreshNotifier chan struct{}
LoadSpan trace.Span
}
func (collection *Collection) SetRefreshNotifier(notifier chan struct{}) {
@ -79,6 +81,7 @@ func (collection *Collection) Clone() *Collection {
CreatedAt: collection.CreatedAt,
UpdatedAt: collection.UpdatedAt,
refreshNotifier: collection.refreshNotifier,
LoadSpan: collection.LoadSpan,
}
}
@ -502,6 +505,10 @@ func (m *CollectionManager) UpdateLoadPercent(partitionID int64, loadPercent int
saveCollection := false
if collectionPercent == 100 {
saveCollection = true
if newCollection.LoadSpan != nil {
newCollection.LoadSpan.End()
newCollection.LoadSpan = nil
}
newCollection.Status = querypb.LoadStatus_Loaded
// if collection becomes loaded, clear its recoverTimes in load info

View File

@ -29,7 +29,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/pkg/util/merr"
. "github.com/milvus-io/milvus/pkg/util/typeutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type (
@ -69,10 +69,10 @@ type Source fmt.Stringer
type Task interface {
Context() context.Context
Source() Source
ID() UniqueID
CollectionID() UniqueID
ReplicaID() UniqueID
SetID(id UniqueID)
ID() typeutil.UniqueID
CollectionID() typeutil.UniqueID
ReplicaID() typeutil.UniqueID
SetID(id typeutil.UniqueID)
Status() Status
SetStatus(status Status)
Err() error
@ -100,9 +100,9 @@ type baseTask struct {
doneCh chan struct{}
canceled *atomic.Bool
id UniqueID // Set by scheduler
collectionID UniqueID
replicaID UniqueID
id typeutil.UniqueID // Set by scheduler
collectionID typeutil.UniqueID
replicaID typeutil.UniqueID
shard string
loadType querypb.LoadType
@ -118,9 +118,9 @@ type baseTask struct {
span trace.Span
}
func newBaseTask(ctx context.Context, source Source, collectionID, replicaID UniqueID, shard string) *baseTask {
func newBaseTask(ctx context.Context, source Source, collectionID, replicaID typeutil.UniqueID, shard string, taskTag string) *baseTask {
ctx, cancel := context.WithCancel(ctx)
ctx, span := otel.Tracer("QueryCoord").Start(ctx, "QueryCoord-BaseTask")
ctx, span := otel.Tracer(typeutil.QueryCoordRole).Start(ctx, taskTag)
return &baseTask{
source: source,
@ -146,19 +146,19 @@ func (task *baseTask) Source() Source {
return task.source
}
func (task *baseTask) ID() UniqueID {
func (task *baseTask) ID() typeutil.UniqueID {
return task.id
}
func (task *baseTask) SetID(id UniqueID) {
func (task *baseTask) SetID(id typeutil.UniqueID) {
task.id = id
}
func (task *baseTask) CollectionID() UniqueID {
func (task *baseTask) CollectionID() typeutil.UniqueID {
return task.collectionID
}
func (task *baseTask) ReplicaID() UniqueID {
func (task *baseTask) ReplicaID() typeutil.UniqueID {
return task.replicaID
}
@ -278,7 +278,7 @@ func (task *baseTask) String() string {
type SegmentTask struct {
*baseTask
segmentID UniqueID
segmentID typeutil.UniqueID
}
// NewSegmentTask creates a SegmentTask with actions,
@ -288,7 +288,7 @@ func NewSegmentTask(ctx context.Context,
timeout time.Duration,
source Source,
collectionID,
replicaID UniqueID,
replicaID typeutil.UniqueID,
actions ...Action,
) (*SegmentTask, error) {
if len(actions) == 0 {
@ -310,7 +310,7 @@ func NewSegmentTask(ctx context.Context,
}
}
base := newBaseTask(ctx, source, collectionID, replicaID, shard)
base := newBaseTask(ctx, source, collectionID, replicaID, shard, fmt.Sprintf("SegmentTask-%s-%d", actions[0].Type().String(), segmentID))
base.actions = actions
return &SegmentTask{
baseTask: base,
@ -322,7 +322,7 @@ func (task *SegmentTask) Shard() string {
return task.shard
}
func (task *SegmentTask) SegmentID() UniqueID {
func (task *SegmentTask) SegmentID() typeutil.UniqueID {
return task.segmentID
}
@ -345,7 +345,7 @@ func NewChannelTask(ctx context.Context,
timeout time.Duration,
source Source,
collectionID,
replicaID UniqueID,
replicaID typeutil.UniqueID,
actions ...Action,
) (*ChannelTask, error) {
if len(actions) == 0 {
@ -365,7 +365,7 @@ func NewChannelTask(ctx context.Context,
}
}
base := newBaseTask(ctx, source, collectionID, replicaID, channel)
base := newBaseTask(ctx, source, collectionID, replicaID, channel, fmt.Sprintf("ChannelTask-%s-%s", actions[0].Type().String(), channel))
base.actions = actions
return &ChannelTask{
baseTask: base,