mirror of https://github.com/milvus-io/milvus.git
Signed-off-by: MrPresent-Han <chun.han@zilliz.com>pull/26347/head
parent
f371ec0316
commit
d30a920226
|
@ -34,6 +34,9 @@ namespace jaeger = opentelemetry::exporter::jaeger;
|
|||
namespace ostream = opentelemetry::exporter::trace;
|
||||
namespace otlp = opentelemetry::exporter::otlp;
|
||||
|
||||
static const int trace_id_size = 2 * opentelemetry::trace::TraceId::kSize;
|
||||
static bool enable_trace = true;
|
||||
|
||||
void
|
||||
initTelementry(TraceConfig* config) {
|
||||
std::unique_ptr<opentelemetry::sdk::trace::SpanExporter> exporter;
|
||||
|
@ -50,6 +53,7 @@ initTelementry(TraceConfig* config) {
|
|||
exporter = otlp::OtlpGrpcExporterFactory::Create(opts);
|
||||
} else {
|
||||
LOG_SEGCORE_INFO_ << "Empty Trace";
|
||||
enable_trace = false;
|
||||
}
|
||||
auto processor =
|
||||
trace_sdk::BatchSpanProcessorFactory::Create(std::move(exporter), {});
|
||||
|
@ -84,4 +88,39 @@ StartSpan(std::string name, TraceContext* parentCtx) {
|
|||
return GetTracer()->StartSpan(name, opts);
|
||||
}
|
||||
|
||||
thread_local std::shared_ptr<trace::Span> local_span;
|
||||
void
|
||||
SetRootSpan(std::shared_ptr<trace::Span> span) {
|
||||
if (enable_trace) {
|
||||
local_span = span;
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
CloseRootSpan() {
|
||||
if (enable_trace) {
|
||||
local_span = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
std::shared_ptr<trace::Span>
|
||||
GetRootSpan() {
|
||||
if (enable_trace && local_span != nullptr) {
|
||||
return local_span;
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void
|
||||
logTraceContext(const std::string& extended_info,
|
||||
const std::shared_ptr<trace::Span> span) {
|
||||
if (enable_trace && span != nullptr) {
|
||||
char traceID[trace_id_size];
|
||||
span->GetContext().trace_id().ToLowerBase16(
|
||||
nostd::span<char, 2 * opentelemetry::trace::TraceId::kSize>{
|
||||
&traceID[0], trace_id_size});
|
||||
LOG_SEGCORE_INFO_ << extended_info << ", traceID:" << traceID;
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace milvus::tracer
|
||||
|
|
|
@ -42,4 +42,17 @@ GetTracer();
|
|||
std::shared_ptr<trace::Span>
|
||||
StartSpan(std::string name, TraceContext* ctx = nullptr);
|
||||
|
||||
void
|
||||
logTraceContext(const std::string& extended_info,
|
||||
std::shared_ptr<trace::Span> span);
|
||||
|
||||
void
|
||||
SetRootSpan(std::shared_ptr<trace::Span> span);
|
||||
|
||||
void
|
||||
CloseRootSpan();
|
||||
|
||||
std::shared_ptr<trace::Span>
|
||||
GetRootSpan();
|
||||
|
||||
} // namespace milvus::tracer
|
||||
|
|
|
@ -78,9 +78,12 @@ Search(CSegmentInterface c_segment,
|
|||
c_placeholder_group);
|
||||
auto ctx = milvus::tracer::TraceContext{
|
||||
c_trace.traceID, c_trace.spanID, c_trace.flag};
|
||||
|
||||
auto span = milvus::tracer::StartSpan("SegcoreSearch", &ctx);
|
||||
|
||||
auto span = milvus::tracer::StartSpan("SegCoreSearch", &ctx);
|
||||
milvus::tracer::logTraceContext(
|
||||
"SegCore_SegmentSearch_SegmentID:" +
|
||||
std::to_string(segment->get_segment_id()),
|
||||
span);
|
||||
milvus::tracer::SetRootSpan(span);
|
||||
auto search_result = segment->Search(plan, phg_ptr);
|
||||
if (!milvus::PositivelyRelated(
|
||||
plan->plan_node_->search_info_.metric_type_)) {
|
||||
|
@ -89,8 +92,8 @@ Search(CSegmentInterface c_segment,
|
|||
}
|
||||
}
|
||||
*result = search_result.release();
|
||||
|
||||
span->End();
|
||||
milvus::tracer::CloseRootSpan();
|
||||
return milvus::SuccessCStatus();
|
||||
} catch (std::exception& e) {
|
||||
return milvus::FailureCStatus(UnexpectedError, e.what());
|
||||
|
|
|
@ -95,7 +95,7 @@ func (it *insertTask) OnEnqueue() error {
|
|||
}
|
||||
|
||||
func (it *insertTask) PreExecute(ctx context.Context) error {
|
||||
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Insert-PreExecute")
|
||||
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Insert-PreExecute")
|
||||
defer sp.End()
|
||||
|
||||
it.result = &milvuspb.MutationResult{
|
||||
|
@ -212,7 +212,7 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
|
|||
}
|
||||
|
||||
func (it *insertTask) Execute(ctx context.Context) error {
|
||||
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Insert-PreExecute")
|
||||
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Insert-Execute")
|
||||
defer sp.End()
|
||||
|
||||
tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute insert %d", it.ID()))
|
||||
|
|
|
@ -20,10 +20,10 @@ import (
|
|||
"fmt"
|
||||
"sync"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/tsoutil"
|
||||
. "github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Manager is the interface for tsafe manager.
|
||||
|
@ -58,8 +58,9 @@ func (t *tSafeManager) WatchChannel(channel string) Listener {
|
|||
}
|
||||
|
||||
func (t *tSafeManager) Add(vChannel string, timestamp uint64) {
|
||||
ts, _ := tsoutil.ParseTS(timestamp)
|
||||
log.Info("add tSafe done",
|
||||
zap.String("channel", vChannel))
|
||||
zap.String("channel", vChannel), zap.Time("timestamp", ts))
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
if _, ok := t.tSafes[vChannel]; !ok {
|
||||
|
|
Loading…
Reference in New Issue