enhance: make Load process traceable in querynode & segcore (#29858)

See also #29803

This PR:
- Add trace span for `LoadIndex` & `LoadFieldData` in segment loader
- Add `TraceCtx` parameter for `Index.Load` in segcore
- Add span for ReadFiles & Engine Load for Memory/Disk Vector index

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/29863/head
congqixia 2024-01-10 21:58:51 +08:00 committed by GitHub
parent 73cfdab776
commit d6429933a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 152 additions and 81 deletions

View File

@ -85,7 +85,7 @@ GetTracer() {
}
std::shared_ptr<trace::Span>
StartSpan(std::string name, TraceContext* parentCtx) {
StartSpan(const std::string& name, TraceContext* parentCtx) {
trace::StartSpanOptions opts;
if (enable_trace && parentCtx != nullptr && parentCtx->traceID != nullptr &&
parentCtx->spanID != nullptr) {

View File

@ -43,7 +43,7 @@ std::shared_ptr<trace::Tracer>
GetTracer();
std::shared_ptr<trace::Span>
StartSpan(std::string name, TraceContext* ctx = nullptr);
StartSpan(const std::string& name, TraceContext* ctx = nullptr);
void
SetRootSpan(std::shared_ptr<trace::Span> span);

View File

@ -21,6 +21,7 @@
#include "common/EasyAssert.h"
#include "knowhere/comp/index_param.h"
#include "knowhere/dataset.h"
#include "common/Tracer.h"
#include "common/Types.h"
const std::string kMmapFilepath = "mmap_filepath";
@ -40,7 +41,7 @@ class IndexBase {
Load(const BinarySet& binary_set, const Config& config = {}) = 0;
virtual void
Load(const Config& config = {}) = 0;
Load(milvus::tracer::TraceContext ctx, const Config& config = {}) = 0;
virtual void
LoadV2(const Config& config = {}) = 0;

View File

@ -294,7 +294,8 @@ InvertedIndexTantivy<T>::BuildV2(const Config& config) {
template <typename T>
void
InvertedIndexTantivy<T>::Load(const Config& config) {
InvertedIndexTantivy<T>::Load(milvus::tracer::TraceContext ctx,
const Config& config) {
auto index_files =
GetValueFromConfig<std::vector<std::string>>(config, "index_files");
AssertInfo(index_files.has_value(),

View File

@ -55,7 +55,7 @@ class InvertedIndexTantivy : public ScalarIndex<T> {
}
void
Load(const Config& config = {}) override;
Load(milvus::tracer::TraceContext ctx, const Config& config = {}) override;
void
LoadV2(const Config& config = {}) override;

View File

@ -250,7 +250,8 @@ ScalarIndexSort<T>::Load(const BinarySet& index_binary, const Config& config) {
template <typename T>
void
ScalarIndexSort<T>::Load(const Config& config) {
ScalarIndexSort<T>::Load(milvus::tracer::TraceContext ctx,
const Config& config) {
auto index_files =
GetValueFromConfig<std::vector<std::string>>(config, "index_files");
AssertInfo(index_files.has_value(),

View File

@ -48,7 +48,7 @@ class ScalarIndexSort : public ScalarIndex<T> {
Load(const BinarySet& index_binary, const Config& config = {}) override;
void
Load(const Config& config = {}) override;
Load(milvus::tracer::TraceContext ctx, const Config& config = {}) override;
void
LoadV2(const Config& config = {}) override;

View File

@ -287,7 +287,8 @@ StringIndexMarisa::Load(const BinarySet& set, const Config& config) {
}
void
StringIndexMarisa::Load(const Config& config) {
StringIndexMarisa::Load(milvus::tracer::TraceContext ctx,
const Config& config) {
auto index_files =
GetValueFromConfig<std::vector<std::string>>(config, "index_files");
AssertInfo(index_files.has_value(),

View File

@ -47,7 +47,7 @@ class StringIndexMarisa : public StringIndex {
Load(const BinarySet& set, const Config& config = {}) override;
void
Load(const Config& config = {}) override;
Load(milvus::tracer::TraceContext ctx, const Config& config = {}) override;
void
LoadV2(const Config& config = {}) override;

View File

@ -16,6 +16,7 @@
#include "index/VectorDiskIndex.h"
#include "common/Tracer.h"
#include "common/Utils.h"
#include "config/ConfigKnowhere.h"
#include "index/Meta.h"
@ -93,24 +94,39 @@ template <typename T>
void
VectorDiskAnnIndex<T>::Load(const BinarySet& binary_set /* not used */,
const Config& config) {
Load(config);
Load(milvus::tracer::TraceContext{}, config);
}
template <typename T>
void
VectorDiskAnnIndex<T>::Load(const Config& config) {
VectorDiskAnnIndex<T>::Load(milvus::tracer::TraceContext ctx,
const Config& config) {
knowhere::Json load_config = update_load_json(config);
auto index_files =
GetValueFromConfig<std::vector<std::string>>(config, "index_files");
AssertInfo(index_files.has_value(),
"index file paths is empty when load disk ann index data");
file_manager_->CacheIndexToDisk(index_files.value());
// start read file span with active scope
{
auto read_file_span =
milvus::tracer::StartSpan("SegCoreReadDiskIndexFile", &ctx);
auto read_scope =
milvus::tracer::GetTracer()->WithActiveSpan(read_file_span);
auto index_files =
GetValueFromConfig<std::vector<std::string>>(config, "index_files");
AssertInfo(index_files.has_value(),
"index file paths is empty when load disk ann index data");
file_manager_->CacheIndexToDisk(index_files.value());
read_file_span->End();
}
// start engine load index span
auto span_load_engine =
milvus::tracer::StartSpan("SegCoreEngineLoadDiskIndex", &ctx);
auto engine_scope =
milvus::tracer::GetTracer()->WithActiveSpan(span_load_engine);
auto stat = index_.Deserialize(knowhere::BinarySet(), load_config);
if (stat != knowhere::Status::success)
PanicInfo(ErrorCode::UnexpectedError,
"failed to Deserialize index, " + KnowhereStatusString(stat));
span_load_engine->End();
SetDim(index_.Dim());
}

View File

@ -71,7 +71,7 @@ class VectorDiskAnnIndex : public VectorIndex {
const Config& config = {}) override;
void
Load(const Config& config = {}) override;
Load(milvus::tracer::TraceContext ctx, const Config& config = {}) override;
void
LoadV2(const Config& config = {}) override;

View File

@ -27,6 +27,7 @@
#include <unordered_set>
#include "common/Types.h"
#include "common/type_c.h"
#include "fmt/format.h"
#include "index/Index.h"
@ -274,7 +275,8 @@ VectorMemIndex<T>::LoadV2(const Config& config) {
template <typename T>
void
VectorMemIndex<T>::Load(const Config& config) {
VectorMemIndex<T>::Load(milvus::tracer::TraceContext ctx,
const Config& config) {
if (config.contains(kMmapFilepath)) {
return LoadFromFile(config);
}
@ -304,67 +306,80 @@ VectorMemIndex<T>::Load(const Config& config) {
}
}
LOG_INFO("load with slice meta: {}", !slice_meta_filepath.empty());
// start read file span with active scope
{
auto read_file_span =
milvus::tracer::StartSpan("SegCoreReadIndexFile", &ctx);
auto read_scope =
milvus::tracer::GetTracer()->WithActiveSpan(read_file_span);
LOG_INFO("load with slice meta: {}", !slice_meta_filepath.empty());
if (!slice_meta_filepath
.empty()) { // load with the slice meta info, then we can load batch by batch
std::string index_file_prefix = slice_meta_filepath.substr(
0, slice_meta_filepath.find_last_of('/') + 1);
std::vector<std::string> batch{};
batch.reserve(parallel_degree);
if (!slice_meta_filepath
.empty()) { // load with the slice meta info, then we can load batch by batch
std::string index_file_prefix = slice_meta_filepath.substr(
0, slice_meta_filepath.find_last_of('/') + 1);
std::vector<std::string> batch{};
batch.reserve(parallel_degree);
auto result = file_manager_->LoadIndexToMemory({slice_meta_filepath});
auto raw_slice_meta = result[INDEX_FILE_SLICE_META];
Config meta_data = Config::parse(
std::string(static_cast<const char*>(raw_slice_meta->Data()),
raw_slice_meta->Size()));
auto result =
file_manager_->LoadIndexToMemory({slice_meta_filepath});
auto raw_slice_meta = result[INDEX_FILE_SLICE_META];
Config meta_data = Config::parse(
std::string(static_cast<const char*>(raw_slice_meta->Data()),
raw_slice_meta->Size()));
for (auto& item : meta_data[META]) {
std::string prefix = item[NAME];
int slice_num = item[SLICE_NUM];
auto total_len = static_cast<size_t>(item[TOTAL_LEN]);
for (auto& item : meta_data[META]) {
std::string prefix = item[NAME];
int slice_num = item[SLICE_NUM];
auto total_len = static_cast<size_t>(item[TOTAL_LEN]);
auto new_field_data =
milvus::storage::CreateFieldData(DataType::INT8, 1, total_len);
auto HandleBatch = [&](int index) {
auto batch_data = file_manager_->LoadIndexToMemory(batch);
for (int j = index - batch.size() + 1; j <= index; j++) {
std::string file_name = GenSlicedFileName(prefix, j);
AssertInfo(batch_data.find(file_name) != batch_data.end(),
"lost index slice data");
auto data = batch_data[file_name];
new_field_data->FillFieldData(data->Data(), data->Size());
auto new_field_data = milvus::storage::CreateFieldData(
DataType::INT8, 1, total_len);
auto HandleBatch = [&](int index) {
auto batch_data = file_manager_->LoadIndexToMemory(batch);
for (int j = index - batch.size() + 1; j <= index; j++) {
std::string file_name = GenSlicedFileName(prefix, j);
AssertInfo(
batch_data.find(file_name) != batch_data.end(),
"lost index slice data");
auto data = batch_data[file_name];
new_field_data->FillFieldData(data->Data(),
data->Size());
}
for (auto& file : batch) {
pending_index_files.erase(file);
}
batch.clear();
};
for (auto i = 0; i < slice_num; ++i) {
std::string file_name = GenSlicedFileName(prefix, i);
batch.push_back(index_file_prefix + file_name);
if (batch.size() >= parallel_degree) {
HandleBatch(i);
}
}
for (auto& file : batch) {
pending_index_files.erase(file);
if (batch.size() > 0) {
HandleBatch(slice_num - 1);
}
batch.clear();
};
for (auto i = 0; i < slice_num; ++i) {
std::string file_name = GenSlicedFileName(prefix, i);
batch.push_back(index_file_prefix + file_name);
if (batch.size() >= parallel_degree) {
HandleBatch(i);
}
AssertInfo(
new_field_data->IsFull(),
"index len is inconsistent after disassemble and assemble");
index_datas[prefix] = new_field_data;
}
if (batch.size() > 0) {
HandleBatch(slice_num - 1);
}
if (!pending_index_files.empty()) {
auto result =
file_manager_->LoadIndexToMemory(std::vector<std::string>(
pending_index_files.begin(), pending_index_files.end()));
for (auto&& index_data : result) {
index_datas.insert(std::move(index_data));
}
AssertInfo(
new_field_data->IsFull(),
"index len is inconsistent after disassemble and assemble");
index_datas[prefix] = new_field_data;
}
}
if (!pending_index_files.empty()) {
auto result = file_manager_->LoadIndexToMemory(std::vector<std::string>(
pending_index_files.begin(), pending_index_files.end()));
for (auto&& index_data : result) {
index_datas.insert(std::move(index_data));
}
read_file_span->End();
}
LOG_INFO("construct binary set...");
@ -378,8 +393,14 @@ VectorMemIndex<T>::Load(const Config& config) {
binary_set.Append(key, buf, size);
}
// start engine load index span
auto span_load_engine =
milvus::tracer::StartSpan("SegCoreEngineLoadIndex", &ctx);
auto engine_scope =
milvus::tracer::GetTracer()->WithActiveSpan(span_load_engine);
LOG_INFO("load index into Knowhere...");
LoadWithoutAssemble(binary_set, config);
span_load_engine->End();
LOG_INFO("load vector index done");
}

View File

@ -50,7 +50,7 @@ class VectorMemIndex : public VectorIndex {
Load(const BinarySet& binary_set, const Config& config = {}) override;
void
Load(const Config& config = {}) override;
Load(milvus::tracer::TraceContext ctx, const Config& config = {}) override;
void
LoadV2(const Config& config = {}) override;

View File

@ -13,6 +13,7 @@
#include "common/FieldMeta.h"
#include "common/EasyAssert.h"
#include "common/type_c.h"
#include "index/Index.h"
#include "index/IndexFactory.h"
#include "index/Meta.h"
@ -206,10 +207,10 @@ AppendIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set) {
}
CStatus
AppendIndexV2(CLoadIndexInfo c_load_index_info) {
AppendIndexV2(CTraceContext c_trace, CLoadIndexInfo c_load_index_info) {
try {
auto load_index_info =
(milvus::segcore::LoadIndexInfo*)c_load_index_info;
static_cast<milvus::segcore::LoadIndexInfo*>(c_load_index_info);
auto& index_params = load_index_info->index_params;
auto field_type = load_index_info->field_type;
@ -219,6 +220,11 @@ AppendIndexV2(CLoadIndexInfo c_load_index_info) {
index_info.field_type = load_index_info->field_type;
index_info.index_engine_version = engine_version;
auto ctx = milvus::tracer::TraceContext{
c_trace.traceID, c_trace.spanID, c_trace.flag};
auto span = milvus::tracer::StartSpan("SegCoreLoadIndex", &ctx);
milvus::tracer::SetRootSpan(span);
LOG_INFO(
"[collection={}][segment={}][field={}][enable_mmap={}] load index "
"{}",
@ -277,7 +283,10 @@ AppendIndexV2(CLoadIndexInfo c_load_index_info) {
config[kMmapFilepath] = filepath.string();
}
load_index_info->index->Load(config);
load_index_info->index->Load(ctx, config);
span->End();
milvus::tracer::CloseRootSpan();
LOG_INFO(
"[collection={}][segment={}][field={}][enable_mmap={}] load index "

View File

@ -57,7 +57,7 @@ CStatus
AppendIndexFilePath(CLoadIndexInfo c_load_index_info, const char* file_path);
CStatus
AppendIndexV2(CLoadIndexInfo c_load_index_info);
AppendIndexV2(CTraceContext c_trace, CLoadIndexInfo c_load_index_info);
CStatus
AppendIndexV3(CLoadIndexInfo c_load_index_info);

View File

@ -22,6 +22,7 @@
#include "arrow/type.h"
#include "common/EasyAssert.h"
#include "common/Tracer.h"
#include "common/Types.h"
#include "index/Index.h"
#include "knowhere/comp/index_param.h"
@ -426,7 +427,7 @@ TEST_P(IndexTest, BuildAndQuery) {
}
load_conf = generate_load_conf(index_type, metric_type, 0);
load_conf["index_files"] = index_files;
ASSERT_NO_THROW(vec_index->Load(load_conf));
ASSERT_NO_THROW(vec_index->Load(milvus::tracer::TraceContext{}, load_conf));
EXPECT_EQ(vec_index->Count(), NB);
EXPECT_EQ(vec_index->GetDim(), DIM);
@ -484,7 +485,7 @@ TEST_P(IndexTest, Mmap) {
load_conf = generate_load_conf(index_type, metric_type, 0);
load_conf["index_files"] = index_files;
load_conf["mmap_filepath"] = "mmap/test_index_mmap_" + index_type;
vec_index->Load(load_conf);
vec_index->Load(milvus::tracer::TraceContext{}, load_conf);
EXPECT_EQ(vec_index->Count(), NB);
EXPECT_EQ(vec_index->GetDim(), DIM);
@ -541,7 +542,7 @@ TEST_P(IndexTest, GetVector) {
vec_index->Load(binary_set, load_conf);
EXPECT_EQ(vec_index->Count(), NB);
} else {
vec_index->Load(load_conf);
vec_index->Load(milvus::tracer::TraceContext{}, load_conf);
}
EXPECT_EQ(vec_index->GetDim(), DIM);
EXPECT_EQ(vec_index->Count(), NB);
@ -638,7 +639,7 @@ TEST(Indexing, SearchDiskAnnWithInvalidParam) {
}
auto load_conf = generate_load_conf(index_type, metric_type, NB);
load_conf["index_files"] = index_files;
vec_index->Load(load_conf);
vec_index->Load(milvus::tracer::TraceContext{}, load_conf);
EXPECT_EQ(vec_index->Count(), NB);
// search disk index with search_list == limit

View File

@ -14,6 +14,7 @@
#include <boost/filesystem.hpp>
#include <unordered_set>
#include "common/Tracer.h"
#include "index/InvertedIndexTantivy.h"
#include "storage/Util.h"
#include "storage/InsertData.h"
@ -168,7 +169,7 @@ test_run() {
auto index =
index::IndexFactory::GetInstance().CreateIndex(index_info, ctx);
index->Load(config);
index->Load(milvus::tracer::TraceContext{}, config);
auto cnt = index->Count();
ASSERT_EQ(cnt, nb);
@ -369,7 +370,7 @@ test_string() {
auto index =
index::IndexFactory::GetInstance().CreateIndex(index_info, ctx);
index->Load(config);
index->Load(milvus::tracer::TraceContext{}, config);
auto cnt = index->Count();
ASSERT_EQ(cnt, nb);

View File

@ -937,7 +937,7 @@ GenVecIndexing(int64_t N,
conf["index_files"] = index_files;
// we need a load stage to use index as the producation does
// knowhere would do some data preparation in this stage
indexing->Load(conf);
indexing->Load(milvus::tracer::TraceContext{}, conf);
return indexing;
}

View File

@ -29,6 +29,7 @@ import (
"unsafe"
"github.com/pingcap/log"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
@ -165,7 +166,17 @@ func (li *LoadIndexInfo) appendIndexData(ctx context.Context, indexKeys []string
}
}
status := C.AppendIndexV2(li.cLoadIndexInfo)
span := trace.SpanFromContext(ctx)
traceID := span.SpanContext().TraceID()
spanID := span.SpanContext().SpanID()
traceCtx := C.CTraceContext{
traceID: (*C.uint8_t)(unsafe.Pointer(&traceID[0])),
spanID: (*C.uint8_t)(unsafe.Pointer(&spanID[0])),
flag: C.uchar(span.SpanContext().TraceFlags()),
}
status := C.AppendIndexV2(traceCtx, li.cLoadIndexInfo)
return HandleCStatus(ctx, &status, "AppendIndex failed")
}

View File

@ -33,6 +33,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/golang/protobuf/proto"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
"go.uber.org/zap"
@ -695,6 +696,8 @@ func (s *LocalSegment) LoadMultiFieldData(ctx context.Context, rowCount int64, f
func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCount int64, field *datapb.FieldBinlog, mmapEnabled bool) error {
s.ptrLock.RLock()
defer s.ptrLock.RUnlock()
ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, fmt.Sprintf("LoadFieldData-%d-%d", s.segmentID, fieldID))
defer sp.End()
if s.ptr == nil {
return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released")
@ -883,6 +886,8 @@ func (s *LocalSegment) LoadDeltaData(ctx context.Context, deltaData *storage.Del
}
func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIndexInfo, fieldType schemapb.DataType) error {
ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, fmt.Sprintf("LoadIndex-%d-%d", s.segmentID, indexInfo.GetFieldID()))
defer sp.End()
loadIndexInfo, err := newLoadIndexInfo(ctx)
defer deleteLoadIndexInfo(loadIndexInfo)
if err != nil {

View File

@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
@ -801,6 +802,8 @@ func (loader *segmentLoader) loadBloomFilter(ctx context.Context, segmentID int6
}
func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment, deltaLogs []*datapb.FieldBinlog) error {
ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, fmt.Sprintf("LoadDeltalogs-%d", segment.ID()))
defer sp.End()
log := log.Ctx(ctx).With(
zap.Int64("segmentID", segment.ID()),
)