enhance: [2.3] make Load process traceable in querynode & segcore (#30187)

Cherry-pick from master, modified some files since branching
pr: #29858
See also #29803

This PR:
- Add trace span for LoadIndex & LoadFieldData in segment loader
- Add TraceCtx parameter for Index.Load in segcore
- Add span for ReadFiles & Engine Load for Memory/Disk Vector index

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/30234/head
congqixia 2024-01-23 15:58:57 +08:00 committed by GitHub
parent e0beea7fb6
commit 35e4165722
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 140 additions and 75 deletions

View File

@ -85,7 +85,7 @@ GetTracer() {
}
std::shared_ptr<trace::Span>
StartSpan(std::string name, TraceContext* parentCtx) {
StartSpan(const std::string& name, TraceContext* parentCtx) {
trace::StartSpanOptions opts;
if (enable_trace && parentCtx != nullptr && parentCtx->traceID != nullptr &&
parentCtx->spanID != nullptr) {

View File

@ -9,6 +9,8 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#include <memory>
#include <string>
@ -41,7 +43,7 @@ std::shared_ptr<trace::Tracer>
GetTracer();
std::shared_ptr<trace::Span>
StartSpan(std::string name, TraceContext* ctx = nullptr);
StartSpan(const std::string& name, TraceContext* ctx = nullptr);
void
SetRootSpan(std::shared_ptr<trace::Span> span);

View File

@ -21,6 +21,7 @@
#include "common/EasyAssert.h"
#include "knowhere/comp/index_param.h"
#include "knowhere/dataset.h"
#include "common/Tracer.h"
#include "common/Types.h"
const std::string kMmapFilepath = "mmap_filepath";
@ -40,7 +41,7 @@ class IndexBase {
Load(const BinarySet& binary_set, const Config& config = {}) = 0;
virtual void
Load(const Config& config = {}) = 0;
Load(milvus::tracer::TraceContext ctx, const Config& config = {}) = 0;
virtual void
BuildWithRawData(size_t n,

View File

@ -166,7 +166,8 @@ ScalarIndexSort<T>::Load(const BinarySet& index_binary, const Config& config) {
template <typename T>
inline void
ScalarIndexSort<T>::Load(const Config& config) {
ScalarIndexSort<T>::Load(milvus::tracer::TraceContext ctx,
const Config& config) {
auto index_files =
GetValueFromConfig<std::vector<std::string>>(config, "index_files");
AssertInfo(index_files.has_value(),

View File

@ -43,7 +43,7 @@ class ScalarIndexSort : public ScalarIndex<T> {
Load(const BinarySet& index_binary, const Config& config = {}) override;
void
Load(const Config& config = {}) override;
Load(milvus::tracer::TraceContext ctx, const Config& config = {}) override;
int64_t
Count() override {

View File

@ -202,7 +202,8 @@ StringIndexMarisa::Load(const BinarySet& set, const Config& config) {
}
void
StringIndexMarisa::Load(const Config& config) {
StringIndexMarisa::Load(milvus::tracer::TraceContext ctx,
const Config& config) {
auto index_files =
GetValueFromConfig<std::vector<std::string>>(config, "index_files");
AssertInfo(index_files.has_value(),

View File

@ -42,7 +42,7 @@ class StringIndexMarisa : public StringIndex {
Load(const BinarySet& set, const Config& config = {}) override;
void
Load(const Config& config = {}) override;
Load(milvus::tracer::TraceContext ctx, const Config& config = {}) override;
int64_t
Count() override {

View File

@ -64,24 +64,39 @@ template <typename T>
void
VectorDiskAnnIndex<T>::Load(const BinarySet& binary_set /* not used */,
const Config& config) {
Load(config);
Load(milvus::tracer::TraceContext{}, config);
}
template <typename T>
void
VectorDiskAnnIndex<T>::Load(const Config& config) {
VectorDiskAnnIndex<T>::Load(milvus::tracer::TraceContext ctx,
const Config& config) {
knowhere::Json load_config = update_load_json(config);
auto index_files =
GetValueFromConfig<std::vector<std::string>>(config, "index_files");
AssertInfo(index_files.has_value(),
"index file paths is empty when load disk ann index data");
file_manager_->CacheIndexToDisk(index_files.value());
// start read file span with active scope
{
auto read_file_span =
milvus::tracer::StartSpan("SegCoreReadDiskIndexFile", &ctx);
auto read_scope =
milvus::tracer::GetTracer()->WithActiveSpan(read_file_span);
auto index_files =
GetValueFromConfig<std::vector<std::string>>(config, "index_files");
AssertInfo(index_files.has_value(),
"index file paths is empty when load disk ann index data");
file_manager_->CacheIndexToDisk(index_files.value());
read_file_span->End();
}
// start engine load index span
auto span_load_engine =
milvus::tracer::StartSpan("SegCoreEngineLoadDiskIndex", &ctx);
auto engine_scope =
milvus::tracer::GetTracer()->WithActiveSpan(span_load_engine);
auto stat = index_.Deserialize(knowhere::BinarySet(), load_config);
if (stat != knowhere::Status::success)
PanicInfo(ErrorCode::UnexpectedError,
"failed to Deserialize index, " + KnowhereStatusString(stat));
span_load_engine->End();
SetDim(index_.Dim());
}

View File

@ -57,7 +57,7 @@ class VectorDiskAnnIndex : public VectorIndex {
const Config& config = {}) override;
void
Load(const Config& config = {}) override;
Load(milvus::tracer::TraceContext ctx, const Config& config = {}) override;
void
BuildWithDataset(const DatasetPtr& dataset,

View File

@ -108,7 +108,7 @@ VectorMemIndex::Load(const BinarySet& binary_set, const Config& config) {
}
void
VectorMemIndex::Load(const Config& config) {
VectorMemIndex::Load(milvus::tracer::TraceContext ctx, const Config& config) {
if (config.contains(kMmapFilepath)) {
return LoadFromFile(config);
}
@ -138,62 +138,72 @@ VectorMemIndex::Load(const Config& config) {
}
}
LOG_SEGCORE_INFO_ << "load with slice meta: "
<< !slice_meta_filepath.empty();
// start read file span with active scope
{
auto read_file_span =
milvus::tracer::StartSpan("SegCoreReadIndexFile", &ctx);
auto read_scope =
milvus::tracer::GetTracer()->WithActiveSpan(read_file_span);
LOG_SEGCORE_INFO_ << "load with slice meta: "
<< !slice_meta_filepath.empty();
if (!slice_meta_filepath
.empty()) { // load with the slice meta info, then we can load batch by batch
std::string index_file_prefix = slice_meta_filepath.substr(
0, slice_meta_filepath.find_last_of('/') + 1);
if (!slice_meta_filepath
.empty()) { // load with the slice meta info, then we can load batch by batch
std::string index_file_prefix = slice_meta_filepath.substr(
0, slice_meta_filepath.find_last_of('/') + 1);
auto result = file_manager_->LoadIndexToMemory({slice_meta_filepath});
auto raw_slice_meta = result[INDEX_FILE_SLICE_META];
Config meta_data = Config::parse(
std::string(static_cast<const char*>(raw_slice_meta->Data()),
raw_slice_meta->Size()));
auto result =
file_manager_->LoadIndexToMemory({slice_meta_filepath});
auto raw_slice_meta = result[INDEX_FILE_SLICE_META];
Config meta_data = Config::parse(
std::string(static_cast<const char*>(raw_slice_meta->Data()),
raw_slice_meta->Size()));
for (auto& item : meta_data[META]) {
std::string prefix = item[NAME];
int slice_num = item[SLICE_NUM];
auto total_len = static_cast<size_t>(item[TOTAL_LEN]);
for (auto& item : meta_data[META]) {
std::string prefix = item[NAME];
int slice_num = item[SLICE_NUM];
auto total_len = static_cast<size_t>(item[TOTAL_LEN]);
auto new_field_data =
milvus::storage::CreateFieldData(DataType::INT8, 1, total_len);
auto new_field_data = milvus::storage::CreateFieldData(
DataType::INT8, 1, total_len);
std::vector<std::string> batch;
batch.reserve(slice_num);
for (auto i = 0; i < slice_num; ++i) {
std::string file_name = GenSlicedFileName(prefix, i);
batch.push_back(index_file_prefix + file_name);
std::vector<std::string> batch;
batch.reserve(slice_num);
for (auto i = 0; i < slice_num; ++i) {
std::string file_name = GenSlicedFileName(prefix, i);
batch.push_back(index_file_prefix + file_name);
}
auto batch_data = file_manager_->LoadIndexToMemory(batch);
for (const auto& file_path : batch) {
const std::string file_name =
file_path.substr(file_path.find_last_of('/') + 1);
AssertInfo(batch_data.find(file_name) != batch_data.end(),
"lost index slice data: {}",
file_name);
auto data = batch_data[file_name];
new_field_data->FillFieldData(data->Data(), data->Size());
}
for (auto& file : batch) {
pending_index_files.erase(file);
}
AssertInfo(
new_field_data->IsFull(),
"index len is inconsistent after disassemble and assemble");
index_datas[prefix] = new_field_data;
}
auto batch_data = file_manager_->LoadIndexToMemory(batch);
for (const auto& file_path : batch) {
const std::string file_name =
file_path.substr(file_path.find_last_of('/') + 1);
AssertInfo(batch_data.find(file_name) != batch_data.end(),
"lost index slice data: {}",
file_name);
auto data = batch_data[file_name];
new_field_data->FillFieldData(data->Data(), data->Size());
}
for (auto& file : batch) {
pending_index_files.erase(file);
}
AssertInfo(
new_field_data->IsFull(),
"index len is inconsistent after disassemble and assemble");
index_datas[prefix] = new_field_data;
}
}
if (!pending_index_files.empty()) {
auto result = file_manager_->LoadIndexToMemory(std::vector<std::string>(
pending_index_files.begin(), pending_index_files.end()));
for (auto&& index_data : result) {
index_datas.insert(std::move(index_data));
if (!pending_index_files.empty()) {
auto result =
file_manager_->LoadIndexToMemory(std::vector<std::string>(
pending_index_files.begin(), pending_index_files.end()));
for (auto&& index_data : result) {
index_datas.insert(std::move(index_data));
}
}
read_file_span->End();
}
LOG_SEGCORE_INFO_ << "construct binary set...";
@ -207,6 +217,11 @@ VectorMemIndex::Load(const Config& config) {
binary_set.Append(key, buf, size);
}
// start engine load index span
auto span_load_engine =
milvus::tracer::StartSpan("SegCoreEngineLoadIndex", &ctx);
auto engine_scope =
milvus::tracer::GetTracer()->WithActiveSpan(span_load_engine);
LOG_SEGCORE_INFO_ << "load index into Knowhere...";
LoadWithoutAssemble(binary_set, config);
LOG_SEGCORE_INFO_ << "load vector index done";

View File

@ -43,7 +43,7 @@ class VectorMemIndex : public VectorIndex {
Load(const BinarySet& binary_set, const Config& config = {}) override;
void
Load(const Config& config = {}) override;
Load(milvus::tracer::TraceContext ctx, const Config& config = {}) override;
void
BuildWithDataset(const DatasetPtr& dataset,

View File

@ -13,6 +13,8 @@
#include "common/FieldMeta.h"
#include "common/EasyAssert.h"
#include "common/Tracer.h"
#include "common/type_c.h"
#include "index/Index.h"
#include "index/IndexFactory.h"
#include "index/Meta.h"
@ -204,10 +206,10 @@ AppendIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set) {
}
CStatus
AppendIndexV2(CLoadIndexInfo c_load_index_info) {
AppendIndexV2(CTraceContext c_trace, CLoadIndexInfo c_load_index_info) {
try {
auto load_index_info =
(milvus::segcore::LoadIndexInfo*)c_load_index_info;
static_cast<milvus::segcore::LoadIndexInfo*>(c_load_index_info);
auto& index_params = load_index_info->index_params;
auto field_type = load_index_info->field_type;
@ -217,6 +219,11 @@ AppendIndexV2(CLoadIndexInfo c_load_index_info) {
index_info.field_type = load_index_info->field_type;
index_info.index_engine_version = engine_version;
auto ctx = milvus::tracer::TraceContext{
c_trace.traceID, c_trace.spanID, c_trace.flag};
auto span = milvus::tracer::StartSpan("SegCoreLoadIndex", &ctx);
milvus::tracer::SetRootSpan(span);
// get index type
AssertInfo(index_params.find("index_type") != index_params.end(),
"index type is empty");
@ -264,7 +271,7 @@ AppendIndexV2(CLoadIndexInfo c_load_index_info) {
config[kMmapFilepath] = filepath.string();
}
load_index_info->index->Load(config);
load_index_info->index->Load(ctx, config);
auto status = CStatus();
status.error_code = milvus::Success;
status.error_msg = "";

View File

@ -56,7 +56,7 @@ CStatus
AppendIndexFilePath(CLoadIndexInfo c_load_index_info, const char* file_path);
CStatus
AppendIndexV2(CLoadIndexInfo c_load_index_info);
AppendIndexV2(CTraceContext c_trace, CLoadIndexInfo c_load_index_info);
CStatus
AppendIndexEngineVersionToLoadInfo(CLoadIndexInfo c_load_index_info,

View File

@ -17,6 +17,7 @@
#include <vector>
#include "common/EasyAssert.h"
#include "common/Tracer.h"
#include "common/Types.h"
#include "knowhere/comp/index_param.h"
#include "nlohmann/json.hpp"
@ -418,7 +419,7 @@ TEST_P(IndexTest, BuildAndQuery) {
}
load_conf = generate_load_conf(index_type, metric_type, 0);
load_conf["index_files"] = index_files;
ASSERT_NO_THROW(vec_index->Load(load_conf));
ASSERT_NO_THROW(vec_index->Load(milvus::tracer::TraceContext{}, load_conf));
EXPECT_EQ(vec_index->Count(), NB);
EXPECT_EQ(vec_index->GetDim(), DIM);
@ -476,7 +477,7 @@ TEST_P(IndexTest, Mmap) {
load_conf = generate_load_conf(index_type, metric_type, 0);
load_conf["index_files"] = index_files;
load_conf["mmap_filepath"] = "mmap/test_index_mmap_" + index_type;
vec_index->Load(load_conf);
vec_index->Load(milvus::tracer::TraceContext{}, load_conf);
EXPECT_EQ(vec_index->Count(), NB);
EXPECT_EQ(vec_index->GetDim(), DIM);
@ -533,7 +534,7 @@ TEST_P(IndexTest, GetVector) {
vec_index->Load(binary_set, load_conf);
EXPECT_EQ(vec_index->Count(), NB);
} else {
vec_index->Load(load_conf);
vec_index->Load(milvus::tracer::TraceContext{}, load_conf);
}
EXPECT_EQ(vec_index->GetDim(), DIM);
EXPECT_EQ(vec_index->Count(), NB);
@ -630,7 +631,7 @@ TEST(Indexing, SearchDiskAnnWithInvalidParam) {
}
auto load_conf = generate_load_conf(index_type, metric_type, NB);
load_conf["index_files"] = index_files;
vec_index->Load(load_conf);
vec_index->Load(milvus::tracer::TraceContext{}, load_conf);
EXPECT_EQ(vec_index->Count(), NB);
// search disk index with search_list == limit

View File

@ -932,7 +932,7 @@ GenVecIndexing(int64_t N, int64_t dim, const float* vec) {
conf["index_files"] = index_files;
// we need a load stage to use index as the producation does
// knowhere would do some data preparation in this stage
indexing->Load(conf);
indexing->Load(milvus::tracer::TraceContext{}, conf);
return indexing;
}

View File

@ -28,6 +28,8 @@ import (
"context"
"unsafe"
"go.opentelemetry.io/otel/trace"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/util/funcutil"
@ -155,7 +157,17 @@ func (li *LoadIndexInfo) appendIndexData(ctx context.Context, indexKeys []string
}
}
status := C.AppendIndexV2(li.cLoadIndexInfo)
span := trace.SpanFromContext(ctx)
traceID := span.SpanContext().TraceID()
spanID := span.SpanContext().SpanID()
traceCtx := C.CTraceContext{
traceID: (*C.uint8_t)(unsafe.Pointer(&traceID[0])),
spanID: (*C.uint8_t)(unsafe.Pointer(&spanID[0])),
flag: C.uchar(span.SpanContext().TraceFlags()),
}
status := C.AppendIndexV2(traceCtx, li.cLoadIndexInfo)
return HandleCStatus(ctx, &status, "AppendIndex failed")
}

View File

@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/golang/protobuf/proto"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
"go.uber.org/zap"
@ -672,6 +673,8 @@ func (s *LocalSegment) LoadMultiFieldData(ctx context.Context, rowCount int64, f
func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCount int64, field *datapb.FieldBinlog) error {
s.ptrLock.RLock()
defer s.ptrLock.RUnlock()
ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, fmt.Sprintf("LoadFieldData-%d-%d", s.segmentID, fieldID))
defer sp.End()
if s.ptr == nil {
return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released")
@ -859,6 +862,9 @@ func (s *LocalSegment) LoadDeltaData(ctx context.Context, deltaData *storage.Del
}
func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIndexInfo, fieldType schemapb.DataType) error {
ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, fmt.Sprintf("LoadIndex-%d-%d", s.segmentID, indexInfo.GetFieldID()))
defer sp.End()
log := log.Ctx(ctx).With(
zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()),

View File

@ -38,6 +38,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
@ -788,6 +789,9 @@ func (loader *segmentLoader) loadBloomFilter(ctx context.Context, segmentID int6
}
func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment *LocalSegment, deltaLogs []*datapb.FieldBinlog) error {
ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, fmt.Sprintf("LoadDeltalogs-%d", segment.ID()))
defer sp.End()
log := log.Ctx(ctx).With(
zap.Int64("segmentID", segment.ID()),
)