fix: refine skipIndex to resolve cyclic dependency (#29132) (#29189)

related: #29132

Signed-off-by: MrPresent-Han <chun.han@zilliz.com>
pull/29306/head
MrPresent-Han 2023-12-19 10:26:40 +08:00 committed by GitHub
parent 56d7225673
commit bfca0a7926
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 44 additions and 32 deletions

View File

@ -17,6 +17,7 @@ set(INDEX_FILES
VectorDiskIndex.cpp VectorDiskIndex.cpp
ScalarIndex.cpp ScalarIndex.cpp
ScalarIndexSort.cpp ScalarIndexSort.cpp
SkipIndex.cpp
) )
milvus_add_pkg_config("milvus_index") milvus_add_pkg_config("milvus_index")

View File

@ -17,11 +17,12 @@ static const FieldChunkMetrics defaultFieldChunkMetrics;
const FieldChunkMetrics& const FieldChunkMetrics&
SkipIndex::GetFieldChunkMetrics(milvus::FieldId field_id, int chunk_id) const { SkipIndex::GetFieldChunkMetrics(milvus::FieldId field_id, int chunk_id) const {
std::shared_lock lck(mutex_);
auto field_metrics = fieldChunkMetrics_.find(field_id); auto field_metrics = fieldChunkMetrics_.find(field_id);
if (field_metrics != fieldChunkMetrics_.end()) { if (field_metrics != fieldChunkMetrics_.end()) {
auto field_chunk_metrics = field_metrics->second.find(chunk_id); auto field_chunk_metrics = field_metrics->second.find(chunk_id);
if (field_chunk_metrics != field_metrics->second.end()) { if (field_chunk_metrics != field_metrics->second.end()) {
return field_chunk_metrics->second; return *(field_chunk_metrics->second.get());
} }
} }
return defaultFieldChunkMetrics; return defaultFieldChunkMetrics;
@ -33,17 +34,18 @@ SkipIndex::LoadPrimitive(milvus::FieldId field_id,
milvus::DataType data_type, milvus::DataType data_type,
const void* chunk_data, const void* chunk_data,
int64_t count) { int64_t count) {
FieldChunkMetrics chunkMetrics; auto chunkMetrics = std::make_unique<FieldChunkMetrics>();
if (count > 0) { if (count > 0) {
chunkMetrics.hasValue_ = true; chunkMetrics->hasValue_ = true;
switch (data_type) { switch (data_type) {
case DataType::INT8: { case DataType::INT8: {
const int8_t* typedData = const int8_t* typedData =
static_cast<const int8_t*>(chunk_data); static_cast<const int8_t*>(chunk_data);
std::pair<int8_t, int8_t> minMax = std::pair<int8_t, int8_t> minMax =
ProcessFieldMetrics<int8_t>(typedData, count); ProcessFieldMetrics<int8_t>(typedData, count);
chunkMetrics.min_ = Metrics(minMax.first); chunkMetrics->min_ = Metrics(minMax.first);
chunkMetrics.max_ = Metrics(minMax.second); chunkMetrics->max_ = Metrics(minMax.second);
break; break;
} }
case DataType::INT16: { case DataType::INT16: {
@ -51,8 +53,8 @@ SkipIndex::LoadPrimitive(milvus::FieldId field_id,
static_cast<const int16_t*>(chunk_data); static_cast<const int16_t*>(chunk_data);
std::pair<int16_t, int16_t> minMax = std::pair<int16_t, int16_t> minMax =
ProcessFieldMetrics<int16_t>(typedData, count); ProcessFieldMetrics<int16_t>(typedData, count);
chunkMetrics.min_ = Metrics(minMax.first); chunkMetrics->min_ = Metrics(minMax.first);
chunkMetrics.max_ = Metrics(minMax.second); chunkMetrics->max_ = Metrics(minMax.second);
break; break;
} }
case DataType::INT32: { case DataType::INT32: {
@ -60,8 +62,8 @@ SkipIndex::LoadPrimitive(milvus::FieldId field_id,
static_cast<const int32_t*>(chunk_data); static_cast<const int32_t*>(chunk_data);
std::pair<int32_t, int32_t> minMax = std::pair<int32_t, int32_t> minMax =
ProcessFieldMetrics<int32_t>(typedData, count); ProcessFieldMetrics<int32_t>(typedData, count);
chunkMetrics.min_ = Metrics(minMax.first); chunkMetrics->min_ = Metrics(minMax.first);
chunkMetrics.max_ = Metrics(minMax.second); chunkMetrics->max_ = Metrics(minMax.second);
break; break;
} }
case DataType::INT64: { case DataType::INT64: {
@ -69,16 +71,16 @@ SkipIndex::LoadPrimitive(milvus::FieldId field_id,
static_cast<const int64_t*>(chunk_data); static_cast<const int64_t*>(chunk_data);
std::pair<int64_t, int64_t> minMax = std::pair<int64_t, int64_t> minMax =
ProcessFieldMetrics<int64_t>(typedData, count); ProcessFieldMetrics<int64_t>(typedData, count);
chunkMetrics.min_ = Metrics(minMax.first); chunkMetrics->min_ = Metrics(minMax.first);
chunkMetrics.max_ = Metrics(minMax.second); chunkMetrics->max_ = Metrics(minMax.second);
break; break;
} }
case DataType::FLOAT: { case DataType::FLOAT: {
const float* typedData = static_cast<const float*>(chunk_data); const float* typedData = static_cast<const float*>(chunk_data);
std::pair<float, float> minMax = std::pair<float, float> minMax =
ProcessFieldMetrics<float>(typedData, count); ProcessFieldMetrics<float>(typedData, count);
chunkMetrics.min_ = Metrics(minMax.first); chunkMetrics->min_ = Metrics(minMax.first);
chunkMetrics.max_ = Metrics(minMax.second); chunkMetrics->max_ = Metrics(minMax.second);
break; break;
} }
case DataType::DOUBLE: { case DataType::DOUBLE: {
@ -86,13 +88,20 @@ SkipIndex::LoadPrimitive(milvus::FieldId field_id,
static_cast<const double*>(chunk_data); static_cast<const double*>(chunk_data);
std::pair<double, double> minMax = std::pair<double, double> minMax =
ProcessFieldMetrics<double>(typedData, count); ProcessFieldMetrics<double>(typedData, count);
chunkMetrics.min_ = Metrics(minMax.first); chunkMetrics->min_ = Metrics(minMax.first);
chunkMetrics.max_ = Metrics(minMax.second); chunkMetrics->max_ = Metrics(minMax.second);
break; break;
} }
} }
} }
fieldChunkMetrics_[field_id][chunk_id] = chunkMetrics; std::unique_lock lck(mutex_);
if (fieldChunkMetrics_.count(field_id) == 0) {
fieldChunkMetrics_.insert(std::make_pair(
field_id,
std::unordered_map<int64_t, std::unique_ptr<FieldChunkMetrics>>()));
}
fieldChunkMetrics_[field_id].emplace(chunk_id, std::move(chunkMetrics));
} }
void void
@ -100,9 +109,9 @@ SkipIndex::LoadString(milvus::FieldId field_id,
int64_t chunk_id, int64_t chunk_id,
const milvus::VariableColumn<std::string>& var_column) { const milvus::VariableColumn<std::string>& var_column) {
int num_rows = var_column.NumRows(); int num_rows = var_column.NumRows();
FieldChunkMetrics chunkMetrics; auto chunkMetrics = std::make_unique<FieldChunkMetrics>();
if (num_rows > 0) { if (num_rows > 0) {
chunkMetrics.hasValue_ = true; chunkMetrics->hasValue_ = true;
std::string_view min_string = var_column.RawAt(0); std::string_view min_string = var_column.RawAt(0);
std::string_view max_string = var_column.RawAt(0); std::string_view max_string = var_column.RawAt(0);
for (size_t i = 1; i < num_rows; i++) { for (size_t i = 1; i < num_rows; i++) {
@ -114,10 +123,16 @@ SkipIndex::LoadString(milvus::FieldId field_id,
max_string = val; max_string = val;
} }
} }
chunkMetrics.min_ = Metrics(min_string); chunkMetrics->min_ = Metrics(min_string);
chunkMetrics.max_ = Metrics(max_string); chunkMetrics->max_ = Metrics(max_string);
} }
fieldChunkMetrics_[field_id][chunk_id] = chunkMetrics; std::unique_lock lck(mutex_);
if (fieldChunkMetrics_.count(field_id) == 0) {
fieldChunkMetrics_.insert(std::make_pair(
field_id,
std::unordered_map<int64_t, std::unique_ptr<FieldChunkMetrics>>()));
}
fieldChunkMetrics_[field_id].emplace(chunk_id, std::move(chunkMetrics));
} }
} // namespace milvus } // namespace milvus

View File

@ -35,12 +35,6 @@ struct FieldChunkMetrics {
class SkipIndex { class SkipIndex {
public: public:
SkipIndex() {
fieldChunkMetrics_ = std::unordered_map<
FieldId,
std::unordered_map<int64_t, FieldChunkMetrics>>();
}
template <typename T> template <typename T>
bool bool
CanSkipUnaryRange(FieldId field_id, CanSkipUnaryRange(FieldId field_id,
@ -245,7 +239,10 @@ class SkipIndex {
} }
private: private:
std::unordered_map<FieldId, std::unordered_map<int64_t, FieldChunkMetrics>> std::unordered_map<
FieldId,
std::unordered_map<int64_t, std::unique_ptr<FieldChunkMetrics>>>
fieldChunkMetrics_; fieldChunkMetrics_;
mutable std::shared_mutex mutex_;
}; };
} // namespace milvus } // namespace milvus

View File

@ -36,7 +36,7 @@
#include "segcore/SegmentGrowingImpl.h" #include "segcore/SegmentGrowingImpl.h"
#include "simdjson/error.h" #include "simdjson/error.h"
#include "query/PlanProto.h" #include "query/PlanProto.h"
#include "segcore/SkipIndex.h" #include "index/SkipIndex.h"
#include "simd/hook.h" #include "simd/hook.h"
#include "index/Meta.h" #include "index/Meta.h"

View File

@ -38,8 +38,7 @@ set(SEGCORE_FILES
ScalarIndex.cpp ScalarIndex.cpp
TimestampIndex.cpp TimestampIndex.cpp
Utils.cpp Utils.cpp
ConcurrentVector.cpp ConcurrentVector.cpp)
SkipIndex.cpp)
add_library(milvus_segcore SHARED ${SEGCORE_FILES}) add_library(milvus_segcore SHARED ${SEGCORE_FILES})
target_link_libraries(milvus_segcore milvus_query milvus_exec ${OpenMP_CXX_FLAGS} milvus-storage) target_link_libraries(milvus_segcore milvus_query milvus_exec ${OpenMP_CXX_FLAGS} milvus-storage)

View File

@ -33,7 +33,7 @@
#include "pb/schema.pb.h" #include "pb/schema.pb.h"
#include "pb/segcore.pb.h" #include "pb/segcore.pb.h"
#include "index/IndexInfo.h" #include "index/IndexInfo.h"
#include "SkipIndex.h" #include "index/SkipIndex.h"
#include "mmap/Column.h" #include "mmap/Column.h"
namespace milvus::segcore { namespace milvus::segcore {