enhance: Add OpenTelemetry tracing for compaction (#30168)

Resolves #30167

This PR adds tracing for all compactions, from task start in datacoord
through the execution procedures in datanode.
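
On the datanode side the change applies the standard otel idiom (start a child span from the incoming context, end it on return); on the datacoord side one root span is started per plan and re-attached to a context just before the Compaction RPC. A minimal sketch of both halves, with hypothetical helper names (`traceStep`, `attach`) that are not part of this change:

```go
// Illustrative sketch only; traceStep and attach are hypothetical helpers.
package sketch

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/trace"
)

// traceStep mirrors the idiom the datanode code repeats inline:
// derive a child span from ctx, run the work, end the span on return.
func traceStep(ctx context.Context, role, name string, work func(context.Context) error) error {
	ctx, span := otel.Tracer(role).Start(ctx, name)
	defer span.End()
	return work(ctx)
}

// attach mirrors the datacoord half: a root span created without a request
// context is put back into a context right before the Compaction RPC is sent.
func attach(span trace.Span) context.Context {
	return trace.ContextWithSpan(context.Background(), span)
}
```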

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia 2024-01-23 10:37:00 +08:00 committed by GitHub
parent 3d3101796b
commit 6a73860815
8 changed files with 80 additions and 28 deletions


@@ -24,6 +24,8 @@ import (
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@@ -31,6 +33,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// TODO this num should be determined by resources of datanode, for now, we set to a fixed value for simple
@@ -88,6 +91,7 @@ type compactionTask struct {
state compactionTaskState
dataNodeID int64
result *datapb.CompactionPlanResult
span trace.Span
}
func (t *compactionTask) shadowClone(opts ...compactionTaskOpt) *compactionTask {
@@ -96,6 +100,7 @@ func (t *compactionTask) shadowClone(opts ...compactionTaskOpt) *compactionTask
plan: t.plan,
state: t.state,
dataNodeID: t.dataNodeID,
span: t.span,
}
for _, opt := range opts {
opt(task)
@@ -276,11 +281,14 @@ func (c *compactionPlanHandler) enqueuePlan(signal *compactionSignal, plan *data
log := log.With(zap.Int64("planID", plan.GetPlanID()), zap.Int64("nodeID", nodeID))
c.setSegmentsCompacting(plan, true)
_, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", plan.GetType()))
task := &compactionTask{
triggerInfo: signal,
plan: plan,
state: pipelining,
dataNodeID: nodeID,
span: span,
}
c.mu.Lock()
c.plans[plan.PlanID] = task
@@ -346,7 +354,11 @@ func (c *compactionPlanHandler) notifyTasks(tasks []*compactionTask) {
return nil, err
}
c.updateTask(plan.PlanID, setStartTime(ts))
err = c.sessions.Compaction(innerTask.dataNodeID, plan)
ctx := trace.ContextWithSpan(context.Background(), task.span)
err = c.sessions.Compaction(ctx, innerTask.dataNodeID, plan)
c.updateTask(plan.PlanID, setState(executing))
if err != nil {
log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err))
@@ -397,7 +409,7 @@ func (c *compactionPlanHandler) completeCompaction(result *datapb.CompactionPlan
return errors.New("unknown compaction type")
}
UpdateCompactionSegmentSizeMetrics(result.GetSegments())
c.plans[planID] = c.plans[planID].shadowClone(setState(completed), setResult(result), cleanLogPath())
c.plans[planID] = c.plans[planID].shadowClone(setState(completed), setResult(result), cleanLogPath(), endSpan())
return nil
}
@@ -516,12 +528,12 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
zap.Uint64("startTime", task.plan.GetStartTime()),
zap.Uint64("now", ts),
)
c.plans[planID] = c.plans[planID].shadowClone(setState(timeout))
c.plans[planID] = c.plans[planID].shadowClone(setState(timeout), endSpan())
continue
}
log.Info("compaction failed", zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID))
c.plans[planID] = c.plans[planID].shadowClone(setState(failed))
c.plans[planID] = c.plans[planID].shadowClone(setState(failed), endSpan())
c.setSegmentsCompacting(task.plan, false)
c.scheduler.Finish(task.dataNodeID, task.plan)
}
@@ -535,7 +547,7 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
if !ok {
log.Info("compaction failed for timeout", zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID))
c.plans[planID] = c.plans[planID].shadowClone(setState(failed))
c.plans[planID] = c.plans[planID].shadowClone(setState(failed), endSpan())
c.setSegmentsCompacting(task.plan, false)
c.scheduler.Finish(task.dataNodeID, task.plan)
}
@@ -602,6 +614,14 @@ func setState(state compactionTaskState) compactionTaskOpt {
}
}
func endSpan() compactionTaskOpt {
return func(task *compactionTask) {
if task.span != nil {
task.span.End()
}
}
}
func setStartTime(startTime uint64) compactionTaskOpt {
return func(task *compactionTask) {
task.plan.StartTime = startTime

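The datacoord hunks above keep one span per plan alive for the task's whole lifetime and close it through a task option when the plan reaches a terminal state. A condensed, self-contained illustration of that pattern, with simplified stand-in types (`task`, `clone`) instead of the real `compactionTask`/`shadowClone`:

```go
// Condensed illustration with stand-in types; the real code uses
// compactionTask, compactionTaskOpt, and shadowClone.
package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/trace"
)

type task struct {
	planID int64
	span   trace.Span
}

type taskOpt func(*task)

// endSpan ends the stored span if one was started; the nil check keeps
// tasks that never received a span safe to finish.
func endSpan() taskOpt {
	return func(t *task) {
		if t.span != nil {
			t.span.End()
		}
	}
}

// clone copies the task (span included) and applies the options, mirroring shadowClone.
func clone(t *task, opts ...taskOpt) *task {
	out := &task{planID: t.planID, span: t.span}
	for _, opt := range opts {
		opt(out)
	}
	return out
}

func main() {
	_, span := otel.Tracer("datacoord").Start(context.Background(), "Compaction-MixCompaction")
	t := &task{planID: 1, span: span}
	// ... the span stays open while the plan executes on the datanode ...
	t = clone(t, endSpan()) // terminal state: completed, failed, or timeout
	fmt.Println("plan", t.planID, "finished")
}
```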

@@ -241,13 +241,13 @@ func (_c *MockSessionManager_Close_Call) RunAndReturn(run func()) *MockSessionMa
return _c
}
// Compaction provides a mock function with given fields: nodeID, plan
func (_m *MockSessionManager) Compaction(nodeID int64, plan *datapb.CompactionPlan) error {
ret := _m.Called(nodeID, plan)
// Compaction provides a mock function with given fields: ctx, nodeID, plan
func (_m *MockSessionManager) Compaction(ctx context.Context, nodeID int64, plan *datapb.CompactionPlan) error {
ret := _m.Called(ctx, nodeID, plan)
var r0 error
if rf, ok := ret.Get(0).(func(int64, *datapb.CompactionPlan) error); ok {
r0 = rf(nodeID, plan)
if rf, ok := ret.Get(0).(func(context.Context, int64, *datapb.CompactionPlan) error); ok {
r0 = rf(ctx, nodeID, plan)
} else {
r0 = ret.Error(0)
}
@@ -261,15 +261,16 @@ type MockSessionManager_Compaction_Call struct {
}
// Compaction is a helper method to define mock.On call
// - ctx context.Context
// - nodeID int64
// - plan *datapb.CompactionPlan
func (_e *MockSessionManager_Expecter) Compaction(nodeID interface{}, plan interface{}) *MockSessionManager_Compaction_Call {
return &MockSessionManager_Compaction_Call{Call: _e.mock.On("Compaction", nodeID, plan)}
func (_e *MockSessionManager_Expecter) Compaction(ctx interface{}, nodeID interface{}, plan interface{}) *MockSessionManager_Compaction_Call {
return &MockSessionManager_Compaction_Call{Call: _e.mock.On("Compaction", ctx, nodeID, plan)}
}
func (_c *MockSessionManager_Compaction_Call) Run(run func(nodeID int64, plan *datapb.CompactionPlan)) *MockSessionManager_Compaction_Call {
func (_c *MockSessionManager_Compaction_Call) Run(run func(ctx context.Context, nodeID int64, plan *datapb.CompactionPlan)) *MockSessionManager_Compaction_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(*datapb.CompactionPlan))
run(args[0].(context.Context), args[1].(int64), args[2].(*datapb.CompactionPlan))
})
return _c
}
@@ -279,7 +280,7 @@ func (_c *MockSessionManager_Compaction_Call) Return(_a0 error) *MockSessionMana
return _c
}
func (_c *MockSessionManager_Compaction_Call) RunAndReturn(run func(int64, *datapb.CompactionPlan) error) *MockSessionManager_Compaction_Call {
func (_c *MockSessionManager_Compaction_Call) RunAndReturn(run func(context.Context, int64, *datapb.CompactionPlan) error) *MockSessionManager_Compaction_Call {
_c.Call.Return(run)
return _c
}

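Since the regenerated mock now takes a context first, existing expectations need one more matcher. A sketch of how a test might stub the new signature, assuming the usual mockery-generated `NewMockSessionManager` constructor and `EXPECT()` helper (neither is shown in this diff):

```go
// Hypothetical test sketch; package and import paths assumed from the surrounding files.
package datacoord

import (
	"context"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/mock"

	"github.com/milvus-io/milvus/internal/proto/datapb"
)

func TestCompactionMockWithContext(t *testing.T) {
	sm := NewMockSessionManager(t) // assumed mockery-generated constructor
	sm.EXPECT().
		Compaction(mock.Anything, mock.Anything, mock.Anything). // ctx, nodeID, plan
		Return(nil)

	err := sm.Compaction(context.Background(), 1, &datapb.CompactionPlan{PlanID: 10})
	assert.NoError(t, err)
}
```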

@@ -55,7 +55,7 @@ type SessionManager interface {
Flush(ctx context.Context, nodeID int64, req *datapb.FlushSegmentsRequest)
FlushChannels(ctx context.Context, nodeID int64, req *datapb.FlushChannelsRequest) error
Compaction(nodeID int64, plan *datapb.CompactionPlan) error
Compaction(ctx context.Context, nodeID int64, plan *datapb.CompactionPlan) error
SyncSegments(nodeID int64, req *datapb.SyncSegmentsRequest) error
Import(ctx context.Context, nodeID int64, itr *datapb.ImportTaskRequest)
GetCompactionPlansResults() map[int64]*datapb.CompactionPlanResult
@@ -186,8 +186,8 @@ func (c *SessionManagerImpl) execFlush(ctx context.Context, nodeID int64, req *d
}
// Compaction is a grpc interface. It will send request to DataNode with provided `nodeID` synchronously.
func (c *SessionManagerImpl) Compaction(nodeID int64, plan *datapb.CompactionPlan) error {
ctx, cancel := context.WithTimeout(context.Background(), Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second))
func (c *SessionManagerImpl) Compaction(ctx context.Context, nodeID int64, plan *datapb.CompactionPlan) error {
ctx, cancel := context.WithTimeout(ctx, Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second))
defer cancel()
cli, err := c.getClient(ctx, nodeID)
if err != nil {

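Passing the caller's context into `Compaction` (instead of starting from `context.Background()`) is what lets the span attached by datacoord ride along into the gRPC call; the RPC timeout is simply layered on top. A small sketch of that layering, with hypothetical names (`rpcTimeout`, `doRPC`):

```go
// Sketch only; rpcTimeout and doRPC are hypothetical stand-ins.
package sketch

import (
	"context"
	"time"
)

// callWithTimeout layers the RPC deadline on top of the caller's context.
// Deriving from ctx (not context.Background()) preserves whatever span the
// caller attached, so the outgoing call can carry the trace onward.
func callWithTimeout(ctx context.Context, rpcTimeout time.Duration, doRPC func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(ctx, rpcTimeout)
	defer cancel()
	return doRPC(ctx)
}
```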

@@ -23,6 +23,7 @@ import (
"time"
"github.com/cockroachdb/errors"
"go.opentelemetry.io/otel"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/datanode/allocator"
@@ -33,6 +34,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
var (
@@ -72,6 +74,8 @@
)
func (b *binlogIO) download(ctx context.Context, paths []string) ([]*Blob, error) {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "download")
defer span.End()
log.Debug("down load", zap.Strings("path", paths))
resp := make([]*Blob, len(paths))
if len(paths) == 0 {
@@ -248,6 +252,8 @@ func (b *binlogIO) uploadStatsLog(
totRows int64,
meta *etcdpb.CollectionMeta,
) (map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error) {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "UploadStatslog")
defer span.End()
var inPaths map[int64]*datapb.FieldBinlog
var err error
@@ -285,6 +291,8 @@ func (b *binlogIO) uploadInsertLog(
iData *InsertData,
meta *etcdpb.CollectionMeta,
) (map[UniqueID]*datapb.FieldBinlog, error) {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "UploadInsertLog")
defer span.End()
iCodec := storage.NewInsertCodecWithSchema(meta)
kvs := make(map[string][]byte)
@@ -316,6 +324,8 @@ func (b *binlogIO) uploadDeltaLog(
dData *DeleteData,
meta *etcdpb.CollectionMeta,
) ([]*datapb.FieldBinlog, error) {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "UploadDeltaLog")
defer span.End()
var (
deltaInfo = make([]*datapb.FieldBinlog, 0)
kvs = make(map[string][]byte)

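These `download`/`Upload*` spans can be checked in a unit test by installing the OTel SDK's in-memory exporter. This is not part of the PR, only a verification sketch; the direct `Tracer` call stands in for invoking the instrumented `binlogIO` with a stubbed `ChunkManager`:

```go
// Verification sketch only; package name assumed from the surrounding file.
package datanode

import (
	"context"
	"testing"

	"go.opentelemetry.io/otel"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	"go.opentelemetry.io/otel/sdk/trace/tracetest"
)

func TestDownloadEmitsSpan(t *testing.T) {
	exporter := tracetest.NewInMemoryExporter()
	tp := sdktrace.NewTracerProvider(sdktrace.WithSyncer(exporter))
	otel.SetTracerProvider(tp)
	t.Cleanup(func() { _ = tp.Shutdown(context.Background()) })

	// Stand-in for the instrumented code path (e.g. binlogIO.download).
	_, span := otel.Tracer("datanode").Start(context.Background(), "download")
	span.End()

	if len(exporter.GetSpans()) == 0 {
		t.Fatal("expected at least one exported span")
	}
}
```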

@@ -22,6 +22,7 @@ import (
"time"
"github.com/cockroachdb/errors"
"go.opentelemetry.io/otel"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@@ -207,13 +208,15 @@ func (t *compactionTask) uploadSingleInsertLog(
}
func (t *compactionTask) merge(
ctxTimeout context.Context,
ctx context.Context,
unMergedInsertlogs [][]string,
targetSegID UniqueID,
partID UniqueID,
meta *etcdpb.CollectionMeta,
delta map[interface{}]Timestamp,
) ([]*datapb.FieldBinlog, []*datapb.FieldBinlog, int64, error) {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("CompactMerge-%d", t.getPlanID()))
defer span.End()
log := log.With(zap.Int64("planID", t.getPlanID()))
mergeStart := time.Now()
@@ -316,7 +319,7 @@ func (t *compactionTask) merge(
for _, path := range unMergedInsertlogs {
downloadStart := time.Now()
data, err := t.download(ctxTimeout, path)
data, err := t.download(ctx, path)
if err != nil {
log.Warn("download insertlogs wrong", zap.Strings("path", path), zap.Error(err))
return nil, nil, 0, err
@@ -374,7 +377,7 @@ func (t *compactionTask) merge(
if (currentRows+1)%100 == 0 && writeBuffer.GetMemorySize() > paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsInt() {
numRows += int64(writeBuffer.GetRowNum())
uploadInsertStart := time.Now()
inPaths, err := t.uploadSingleInsertLog(ctxTimeout, targetSegID, partID, meta, writeBuffer, fID2Type)
inPaths, err := t.uploadSingleInsertLog(ctx, targetSegID, partID, meta, writeBuffer, fID2Type)
if err != nil {
log.Warn("failed to upload single insert log", zap.Error(err))
return nil, nil, 0, err
@@ -395,7 +398,7 @@ func (t *compactionTask) merge(
if writeBuffer.GetRowNum() > 0 || numRows > 0 {
numRows += int64(writeBuffer.GetRowNum())
uploadStart := time.Now()
inPaths, statsPaths, err := t.uploadRemainLog(ctxTimeout, targetSegID, partID, meta,
inPaths, statsPaths, err := t.uploadRemainLog(ctx, targetSegID, partID, meta,
stats, numRows+int64(currentRows), writeBuffer, fID2Type)
if err != nil {
return nil, nil, 0, err
@@ -427,15 +430,17 @@ func (t *compactionTask) merge(
}
func (t *compactionTask) compact() (*datapb.CompactionPlanResult, error) {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(t.ctx, fmt.Sprintf("Compact-%d", t.getPlanID()))
defer span.End()
log := log.With(zap.Int64("planID", t.plan.GetPlanID()))
compactStart := time.Now()
if ok := funcutil.CheckCtxValid(t.ctx); !ok {
if ok := funcutil.CheckCtxValid(ctx); !ok {
log.Warn("compact wrong, task context done or timeout")
return nil, errContext
}
durInQueue := t.tr.RecordSpan()
ctxTimeout, cancelAll := context.WithTimeout(t.ctx, time.Duration(t.plan.GetTimeoutInSeconds())*time.Second)
ctxTimeout, cancelAll := context.WithTimeout(ctx, time.Duration(t.plan.GetTimeoutInSeconds())*time.Second)
defer cancelAll()
var targetSegID UniqueID

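Replacing `ctxTimeout` with a context derived from the span-carrying `ctx` is what makes `CompactMerge-<planID>` (and the download/upload spans below it) children of `Compact-<planID>`. A minimal sketch of that nesting, with the plan ID hard-coded for illustration:

```go
// Sketch of the parent/child nesting that results from threading the traced ctx.
package sketch

import (
	"context"
	"time"

	"go.opentelemetry.io/otel"
)

func compactSketch(ctx context.Context) {
	// Parent span, as in compactionTask.compact().
	ctx, compactSpan := otel.Tracer("datanode").Start(ctx, "Compact-42")
	defer compactSpan.End()

	// The timeout context is derived from the traced ctx, so parentage is kept.
	ctx, cancel := context.WithTimeout(ctx, time.Minute)
	defer cancel()

	// Started from the derived ctx, this span is recorded as a child of Compact-42.
	_, mergeSpan := otel.Tracer("datanode").Start(ctx, "CompactMerge-42")
	defer mergeSpan.End()
}
```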

@@ -20,9 +20,12 @@ import (
"context"
"path"
"go.opentelemetry.io/otel"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type BinlogIO interface {
@@ -42,6 +45,8 @@ func NewBinlogIO(cm storage.ChunkManager, ioPool *conc.Pool[any]) BinlogIO {
}
func (b *BinlogIoImpl) Download(ctx context.Context, paths []string) ([][]byte, error) {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "Download")
defer span.End()
future := b.pool.Submit(func() (any, error) {
var vs [][]byte
var err error
@@ -63,6 +68,8 @@ func (b *BinlogIoImpl) Download(ctx context.Context, paths []string) ([][]byte,
}
func (b *BinlogIoImpl) Upload(ctx context.Context, kvs map[string][]byte) error {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "Upload")
defer span.End()
future := b.pool.Submit(func() (any, error) {
err := retry.Do(ctx, func() error {
return b.MultiWrite(ctx, kvs)

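`Download` and `Upload` hand the actual work to a worker pool, but the submitted closure captures the span-carrying `ctx`, so the pooled work is still attributed to the span and `End()` runs only after the result comes back. A generic sketch of that shape, with a plain goroutine standing in for `conc.Pool`:

```go
// Generic sketch; a plain goroutine stands in for the conc.Pool used by BinlogIoImpl.
package sketch

import (
	"context"

	"go.opentelemetry.io/otel"
)

// uploadSketch starts the span on the calling goroutine; the worker closure
// captures the span-carrying ctx, so work done on another goroutine still
// falls inside the Upload span, and End() runs after the work completes.
func uploadSketch(ctx context.Context, write func(context.Context) error) error {
	ctx, span := otel.Tracer("datanode").Start(ctx, "Upload")
	defer span.End()

	done := make(chan error, 1)
	go func() { done <- write(ctx) }() // stands in for pool.Submit
	return <-done
}
```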

@@ -22,6 +22,7 @@ import (
"time"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@@ -41,6 +42,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type levelZeroCompactionTask struct {
@@ -108,15 +110,17 @@ func (t *levelZeroCompactionTask) getCollection() int64 {
func (t *levelZeroCompactionTask) injectDone() {}
func (t *levelZeroCompactionTask) compact() (*datapb.CompactionPlanResult, error) {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(t.ctx, "L0Compact")
defer span.End()
log := log.With(zap.Int64("planID", t.plan.GetPlanID()), zap.String("type", t.plan.GetType().String()))
log.Info("L0 compaction", zap.Duration("wait in queue elapse", t.tr.RecordSpan()))
if !funcutil.CheckCtxValid(t.ctx) {
if !funcutil.CheckCtxValid(ctx) {
log.Warn("compact wrong, task context done or timeout")
return nil, errContext
}
ctxTimeout, cancelAll := context.WithTimeout(t.ctx, time.Duration(t.plan.GetTimeoutInSeconds())*time.Second)
ctxTimeout, cancelAll := context.WithTimeout(ctx, time.Duration(t.plan.GetTimeoutInSeconds())*time.Second)
defer cancelAll()
l0Segments := lo.Filter(t.plan.GetSegmentBinlogs(), func(s *datapb.CompactionSegmentBinlogs, _ int) bool {


@@ -27,6 +27,7 @@ import (
"time"
"github.com/cockroachdb/errors"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@@ -264,12 +265,16 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan
}
}
spanCtx := trace.SpanContextFromContext(ctx)
taskCtx := trace.ContextWithSpanContext(node.ctx, spanCtx)
var task compactor
switch req.GetType() {
case datapb.CompactionType_Level0DeleteCompaction:
binlogIO := io.NewBinlogIO(node.chunkManager, getOrCreateIOPool())
task = newLevelZeroCompactionTask(
node.ctx,
taskCtx,
binlogIO,
node.allocator,
ds.metacache,
@@ -280,7 +285,7 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan
// TODO, replace this binlogIO with io.BinlogIO
binlogIO := &binlogIO{node.chunkManager, ds.idAllocator}
task = newCompactionTask(
node.ctx,
taskCtx,
binlogIO, binlogIO,
ds.metacache,
node.syncMgr,
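
On the datanode, the handler extracts the remote span context from the incoming RPC context and grafts it onto `node.ctx`, so the asynchronous compaction stays a child of datacoord's `Compaction-<type>` span while its cancellation still follows the node's lifetime (assuming trace metadata was propagated by the gRPC interceptors). A small sketch of that re-parenting:

```go
// Sketch of the re-parenting done in DataNode.Compaction; lifetimeCtx stands in for node.ctx.
package sketch

import (
	"context"

	"go.opentelemetry.io/otel/trace"
)

// reparent copies the caller's span context onto a longer-lived context:
// spans started from the result are children of the remote Compaction span,
// while cancellation still follows lifetimeCtx (the node's lifetime).
func reparent(lifetimeCtx, rpcCtx context.Context) context.Context {
	sc := trace.SpanContextFromContext(rpcCtx)
	return trace.ContextWithSpanContext(lifetimeCtx, sc)
}
```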