enhance: Load raw data when the scalar index doesn't have raw data (#28888)

issue: #28886

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
pull/28978/head
cai.zhang 2023-12-06 20:36:36 +08:00 committed by GitHub
parent 6a86ac0ac6
commit fb089cda8b
11 changed files with 190 additions and 48 deletions
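
Before this change, the read paths assumed that a loaded scalar index could always return the original values through Reverse_Lookup. The PR makes that assumption explicit with a new HasRawData() query on the index interface; when an index answers false, the raw field data is loaded alongside it and reads fall back to the raw column. A minimal, hedged sketch of the contract (the stub class names are illustrative, not the real segcore types):

#include <cassert>

// Illustrative contract only; in the PR this lives on IndexBase/ScalarIndex.
class IndexStub {
 public:
    virtual ~IndexStub() = default;
    // Every index must now declare whether the original values survive inside it.
    virtual bool HasRawData() const = 0;
};

// Analogue of ScalarIndexSort / StringIndexMarisa: original values are retained.
class SortedIndexStub : public IndexStub {
 public:
    bool HasRawData() const override { return true; }
};

// Hypothetical index that only keeps derived structures, not the values themselves.
class PostingsOnlyIndexStub : public IndexStub {
 public:
    bool HasRawData() const override { return false; }
};

int main() {
    SortedIndexStub sorted;
    PostingsOnlyIndexStub postings;
    assert(sorted.HasRawData());     // reads can reverse-lookup through the index
    assert(!postings.HasRawData());  // reads must go to the raw column instead
}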

View File

@@ -68,6 +68,9 @@ class IndexBase {
virtual BinarySet
UploadV2(const Config& config = {}) = 0;
virtual const bool
HasRawData() const = 0;
bool
IsMmapSupported() const {
return index_type_ == knowhere::IndexEnum::INDEX_HNSW ||

View File

@@ -68,6 +68,9 @@ class ScalarIndex : public IndexBase {
virtual const TargetBitmap
Query(const DatasetPtr& dataset);
virtual const bool
HasRawData() const override = 0;
virtual int64_t
Size() = 0;
};

View File

@@ -95,6 +95,11 @@ class ScalarIndexSort : public ScalarIndex<T> {
BinarySet
UploadV2(const Config& config = {}) override;
const bool
HasRawData() const override {
return true;
}
private:
bool
ShouldSkip(const T lower_value, const T upper_value, const OpType op);

View File

@@ -93,6 +93,11 @@ class StringIndexMarisa : public StringIndex {
BinarySet
UploadV2(const Config& config = {});
const bool
HasRawData() const override {
return true;
}
private:
void
fill_str_ids(size_t n, const std::string* values);
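
ScalarIndexSort and StringIndexMarisa can both return true here because their build input survives inside the index (sorted value/row-id pairs in one case, a string trie plus per-row string ids in the other), so Reverse_Lookup can recover the value for any row. A toy sketch of why a sort-based index keeps that ability; the class below is hypothetical, not the real ScalarIndexSort:

#include <algorithm>
#include <cassert>
#include <utility>
#include <vector>

// Toy model: a sort-based scalar index stores (value, original row id) pairs,
// so it can answer HasRawData() == true and serve Reverse_Lookup directly.
template <typename T>
class ToySortedIndex {
 public:
    void Build(const std::vector<T>& raw) {
        by_row_ = raw;  // the raw values are part of the index itself
        sorted_.reserve(raw.size());
        for (size_t i = 0; i < raw.size(); ++i) {
            sorted_.emplace_back(raw[i], i);
        }
        std::sort(sorted_.begin(), sorted_.end());  // what range queries would use
    }
    bool HasRawData() const { return true; }
    T Reverse_Lookup(size_t row) const { return by_row_[row]; }

 private:
    std::vector<std::pair<T, size_t>> sorted_;
    std::vector<T> by_row_;
};

int main() {
    ToySortedIndex<int> idx;
    idx.Build({7, 3, 9});
    assert(idx.HasRawData());
    assert(idx.Reverse_Lookup(1) == 3);
}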

View File

@@ -2280,8 +2280,16 @@ ExecExprVisitor::ExecCompareExprDispatcher(CompareExpr& expr, Op op)
// for case, sealed segment has loaded index for scalar field instead of raw data
auto& indexing = segment_.chunk_scalar_index<bool>(
field_id, chunk_id);
return [&indexing](int i) -> const number {
return indexing.Reverse_Lookup(i);
if (indexing.HasRawData()) {
return [&indexing](int i) -> const number {
return indexing.Reverse_Lookup(i);
};
}
auto chunk_data =
segment_.chunk_data<bool>(field_id, chunk_id)
.data();
return [chunk_data](int i) -> const number {
return chunk_data[i];
};
}
}
@@ -2297,8 +2305,16 @@ ExecExprVisitor::ExecCompareExprDispatcher(CompareExpr& expr, Op op)
// for case, sealed segment has loaded index for scalar field instead of raw data
auto& indexing = segment_.chunk_scalar_index<int8_t>(
field_id, chunk_id);
return [&indexing](int i) -> const number {
return indexing.Reverse_Lookup(i);
if (indexing.HasRawData()) {
return [&indexing](int i) -> const number {
return indexing.Reverse_Lookup(i);
};
}
auto chunk_data =
segment_.chunk_data<int8_t>(field_id, chunk_id)
.data();
return [chunk_data](int i) -> const number {
return chunk_data[i];
};
}
}
@@ -2314,8 +2330,16 @@ ExecExprVisitor::ExecCompareExprDispatcher(CompareExpr& expr, Op op)
// for case, sealed segment has loaded index for scalar field instead of raw data
auto& indexing = segment_.chunk_scalar_index<int16_t>(
field_id, chunk_id);
return [&indexing](int i) -> const number {
return indexing.Reverse_Lookup(i);
if (indexing.HasRawData()) {
return [&indexing](int i) -> const number {
return indexing.Reverse_Lookup(i);
};
}
auto chunk_data =
segment_.chunk_data<int16_t>(field_id, chunk_id)
.data();
return [chunk_data](int i) -> const number {
return chunk_data[i];
};
}
}
@@ -2331,8 +2355,16 @@ ExecExprVisitor::ExecCompareExprDispatcher(CompareExpr& expr, Op op)
// for case, sealed segment has loaded index for scalar field instead of raw data
auto& indexing = segment_.chunk_scalar_index<int32_t>(
field_id, chunk_id);
return [&indexing](int i) -> const number {
return indexing.Reverse_Lookup(i);
if (indexing.HasRawData()) {
return [&indexing](int i) -> const number {
return indexing.Reverse_Lookup(i);
};
}
auto chunk_data =
segment_.chunk_data<int32_t>(field_id, chunk_id)
.data();
return [chunk_data](int i) -> const number {
return chunk_data[i];
};
}
}
@@ -2348,8 +2380,16 @@ ExecExprVisitor::ExecCompareExprDispatcher(CompareExpr& expr, Op op)
// for case, sealed segment has loaded index for scalar field instead of raw data
auto& indexing = segment_.chunk_scalar_index<int64_t>(
field_id, chunk_id);
return [&indexing](int i) -> const number {
return indexing.Reverse_Lookup(i);
if (indexing.HasRawData()) {
return [&indexing](int i) -> const number {
return indexing.Reverse_Lookup(i);
};
}
auto chunk_data =
segment_.chunk_data<int64_t>(field_id, chunk_id)
.data();
return [chunk_data](int i) -> const number {
return chunk_data[i];
};
}
}
@@ -2365,8 +2405,16 @@ ExecExprVisitor::ExecCompareExprDispatcher(CompareExpr& expr, Op op)
// for case, sealed segment has loaded index for scalar field instead of raw data
auto& indexing = segment_.chunk_scalar_index<float>(
field_id, chunk_id);
return [&indexing](int i) -> const number {
return indexing.Reverse_Lookup(i);
if (indexing.HasRawData()) {
return [&indexing](int i) -> const number {
return indexing.Reverse_Lookup(i);
};
}
auto chunk_data =
segment_.chunk_data<float>(field_id, chunk_id)
.data();
return [chunk_data](int i) -> const number {
return chunk_data[i];
};
}
}
@@ -2382,8 +2430,16 @@ ExecExprVisitor::ExecCompareExprDispatcher(CompareExpr& expr, Op op)
// for case, sealed segment has loaded index for scalar field instead of raw data
auto& indexing = segment_.chunk_scalar_index<double>(
field_id, chunk_id);
return [&indexing](int i) -> const number {
return indexing.Reverse_Lookup(i);
if (indexing.HasRawData()) {
return [&indexing](int i) -> const number {
return indexing.Reverse_Lookup(i);
};
}
auto chunk_data =
segment_.chunk_data<double>(field_id, chunk_id)
.data();
return [chunk_data](int i) -> const number {
return chunk_data[i];
};
}
}
@@ -2411,8 +2467,16 @@ ExecExprVisitor::ExecCompareExprDispatcher(CompareExpr& expr, Op op)
auto& indexing =
segment_.chunk_scalar_index<std::string>(field_id,
chunk_id);
return [&indexing](int i) -> const number {
return indexing.Reverse_Lookup(i);
if (indexing.HasRawData()) {
return [&indexing](int i) -> const number {
return indexing.Reverse_Lookup(i);
};
}
auto chunk_data =
segment_.chunk_data<std::string>(field_id, chunk_id)
.data();
return [chunk_data](int i) -> const number {
return chunk_data[i];
};
}
}
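
All eight hunks above make the same change, once per scalar element type handled by ExecCompareExprDispatcher (bool, the integer widths, float, double, std::string): ask the index whether it kept raw data, and otherwise read from the raw chunk loaded for the field. The repetition can be read as one template; the helper below is a hypothetical consolidation with stand-in types, not code from the PR:

#include <cstdint>
#include <functional>
#include <string>
#include <variant>
#include <vector>

// Stand-in for the dispatcher's variant-like `number`; the real alias differs.
using number = std::variant<bool, int8_t, int16_t, int32_t, int64_t,
                            float, double, std::string>;

template <typename T>
struct IndexingStub {
    bool HasRawData() const { return has_raw; }
    T Reverse_Lookup(int i) const { return values[i]; }
    bool has_raw = false;
    std::vector<T> values;
};

// Prefer the index's reverse lookup; fall back to the raw chunk when the
// index did not keep the original values (the new branch added by this PR).
template <typename T>
std::function<number(int)>
make_chunk_getter(const IndexingStub<T>& indexing, const T* chunk_data) {
    if (indexing.HasRawData()) {
        return [&indexing](int i) -> number { return indexing.Reverse_Lookup(i); };
    }
    return [chunk_data](int i) -> number { return chunk_data[i]; };
}

int main() {
    std::vector<int32_t> chunk = {5, 6, 7};
    IndexingStub<int32_t> idx;  // has_raw == false, so the raw-chunk path is taken
    auto getter = make_chunk_getter<int32_t>(idx, chunk.data());
    return std::get<int32_t>(getter(2)) == 7 ? 0 : 1;
}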

View File

@@ -323,8 +323,8 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
auto data_type = field_meta.get_data_type();
// Don't allow raw data and index exist at the same time
AssertInfo(!get_bit(index_ready_bitset_, field_id),
"field data can't be loaded when indexing exists");
// AssertInfo(!get_bit(index_ready_bitset_, field_id),
// "field data can't be loaded when indexing exists");
std::shared_ptr<ColumnBase> column{};
if (datatype_is_variable(data_type)) {
@@ -1071,30 +1071,11 @@ SegmentSealedImpl::fill_with_empty(FieldId field_id, int64_t count) const {
}
std::unique_ptr<DataArray>
SegmentSealedImpl::bulk_subscript(FieldId field_id,
const int64_t* seg_offsets,
int64_t count) const {
auto& field_meta = schema_->operator[](field_id);
// if count == 0, return empty data array
if (count == 0) {
return fill_with_empty(field_id, count);
}
if (HasIndex(field_id)) {
// if field has load scalar index, reverse raw data from index
if (!datatype_is_vector(field_meta.get_data_type())) {
AssertInfo(num_chunk() == 1,
"num chunk not equal to 1 for sealed segment");
auto index = chunk_index_impl(field_id, 0);
return ReverseDataFromIndex(index, seg_offsets, count, field_meta);
}
return get_vector(field_id, seg_offsets, count);
}
Assert(get_bit(field_data_ready_bitset_, field_id));
// DO NOT directly access the column byh map like: `fields_.at(field_id)->Data()`,
SegmentSealedImpl::get_raw_data(FieldId field_id,
const FieldMeta& field_meta,
const int64_t* seg_offsets,
int64_t count) const {
// DO NOT directly access the column by map like: `fields_.at(field_id)->Data()`,
// we have to clone the shared pointer,
// to make sure it won't get released if segment released
auto column = fields_.at(field_id);
@@ -1235,10 +1216,39 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id,
field_meta.get_data_type()));
}
}
return ret;
}
std::unique_ptr<DataArray>
SegmentSealedImpl::bulk_subscript(FieldId field_id,
const int64_t* seg_offsets,
int64_t count) const {
auto& field_meta = schema_->operator[](field_id);
// if count == 0, return empty data array
if (count == 0) {
return fill_with_empty(field_id, count);
}
if (HasIndex(field_id)) {
// if field has load scalar index, reverse raw data from index
if (!datatype_is_vector(field_meta.get_data_type())) {
AssertInfo(num_chunk() == 1,
"num chunk not equal to 1 for sealed segment");
auto index = chunk_index_impl(field_id, 0);
if (index->HasRawData()) {
return ReverseDataFromIndex(
index, seg_offsets, count, field_meta);
}
return get_raw_data(field_id, field_meta, seg_offsets, count);
}
return get_vector(field_id, seg_offsets, count);
}
Assert(get_bit(field_data_ready_bitset_, field_id));
return get_raw_data(field_id, field_meta, seg_offsets, count);
}
bool
SegmentSealedImpl::HasIndex(FieldId field_id) const {
std::shared_lock lck(mutex_);
@@ -1271,6 +1281,11 @@ SegmentSealedImpl::HasRawData(int64_t field_id) const {
field_indexing->indexing_.get());
return vec_index->HasRawData();
}
} else {
auto scalar_index = scalar_indexings_.find(fieldID);
if (scalar_index != scalar_indexings_.end()) {
return scalar_index->second->HasRawData();
}
}
return true;
}
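
With the scalar branch added above, SegmentSealedImpl::HasRawData now reports false for a field whose only loaded representation is an index without raw data, and bulk_subscript falls back to the new get_raw_data helper in that case. A hypothetical miniature of the segment-level check (stub types, not the real implementation):

#include <cassert>
#include <cstdint>
#include <memory>
#include <unordered_map>

// Hypothetical miniature of the extended check: if a scalar index is loaded
// for the field, ask the index; otherwise the raw column itself was loaded.
struct ScalarIndexStub {
    explicit ScalarIndexStub(bool has_raw) : has_raw_(has_raw) {}
    bool HasRawData() const { return has_raw_; }
    bool has_raw_;
};

class SealedSegmentStub {
 public:
    void AddScalarIndex(int64_t field_id, bool index_has_raw) {
        scalar_indexings_[field_id] =
            std::make_unique<ScalarIndexStub>(index_has_raw);
    }
    bool HasRawData(int64_t field_id) const {
        auto it = scalar_indexings_.find(field_id);
        if (it != scalar_indexings_.end()) {
            return it->second->HasRawData();
        }
        return true;  // no index for this field: raw data was loaded directly
    }

 private:
    std::unordered_map<int64_t, std::unique_ptr<ScalarIndexStub>> scalar_indexings_;
};

int main() {
    SealedSegmentStub seg;
    seg.AddScalarIndex(/*field_id=*/100, /*index_has_raw=*/false);
    assert(!seg.HasRawData(100));  // the loader should also fetch this field's binlogs
    assert(seg.HasRawData(101));   // unindexed field: raw data is already there
}

The Go loader asks the same question through segment.HasRawData(fieldID): for a non-vector field whose index answers false, loadSegment additionally loads the field's binlogs so the raw column is available for reads.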

View File

@@ -207,6 +207,12 @@ class SegmentSealedImpl : public SegmentSealed {
std::unique_ptr<DataArray>
fill_with_empty(FieldId field_id, int64_t count) const;
std::unique_ptr<DataArray>
get_raw_data(FieldId field_id,
const FieldMeta& field_meta,
const int64_t* seg_offsets,
int64_t count) const;
void
update_row_count(int64_t row_count) {
// if (row_count_opt_.has_value()) {

View File

@@ -81,6 +81,26 @@ TYPED_TEST_P(TypedScalarIndexTest, Count) {
}
}
TYPED_TEST_P(TypedScalarIndexTest, HasRawData) {
using T = TypeParam;
auto dtype = milvus::GetDType<T>();
auto index_types = GetIndexTypes<T>();
for (const auto& index_type : index_types) {
milvus::index::CreateIndexInfo create_index_info;
create_index_info.field_type = milvus::DataType(dtype);
create_index_info.index_type = index_type;
auto index =
milvus::index::IndexFactory::GetInstance().CreateScalarIndex(
create_index_info);
auto scalar_index =
dynamic_cast<milvus::index::ScalarIndex<T>*>(index.get());
auto arr = GenArr<T>(nb);
scalar_index->Build(nb, arr.data());
ASSERT_EQ(nb, scalar_index->Count());
ASSERT_TRUE(scalar_index->HasRawData());
}
}
TYPED_TEST_P(TypedScalarIndexTest, In) {
using T = TypeParam;
auto dtype = milvus::GetDType<T>();
@@ -200,7 +220,8 @@ REGISTER_TYPED_TEST_CASE_P(TypedScalarIndexTest,
NotIn,
Range,
Codec,
Reverse);
Reverse,
HasRawData);
INSTANTIATE_TYPED_TEST_CASE_P(ArithmeticCheck, TypedScalarIndexTest, ScalarT);

View File

@@ -52,6 +52,12 @@ TEST_F(StringIndexMarisaTest, Build) {
index->Build(strs.size(), strs.data());
}
TEST_F(StringIndexMarisaTest, HasRawData) {
auto index = milvus::index::CreateStringIndexMarisa();
index->Build(nb, strs.data());
ASSERT_TRUE(index->HasRawData());
}
TEST_F(StringIndexMarisaTest, Count) {
auto index = milvus::index::CreateStringIndexMarisa();
index->Build(nb, strs.data());

View File

@@ -703,6 +703,7 @@ func (s *LocalSegment) LoadFieldData(fieldID int64, rowCount int64, field *datap
}
}
loadFieldDataInfo.appendMMapDirPath(paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue())
loadFieldDataInfo.enableMmap(fieldID, mmapEnabled)
var status C.CStatus
GetLoadPool().Submit(func() (any, error) {

View File

@@ -564,12 +564,27 @@ func (loader *segmentLoader) loadSegment(ctx context.Context,
}
}
schemaHelper, _ := typeutil.CreateSchemaHelper(collection.Schema())
log.Info("load fields...",
zap.Int64s("indexedFields", lo.Keys(indexedFieldInfos)),
)
if err := loader.loadFieldsIndex(ctx, collection.Schema(), segment, loadInfo.GetNumOfRows(), indexedFieldInfos); err != nil {
if err := loader.loadFieldsIndex(ctx, schemaHelper, segment, loadInfo.GetNumOfRows(), indexedFieldInfos); err != nil {
return err
}
for fieldID, info := range indexedFieldInfos {
field, err := schemaHelper.GetFieldFromID(fieldID)
if err != nil {
return err
}
if !typeutil.IsVectorType(field.GetDataType()) && !segment.HasRawData(fieldID) {
log.Info("field index doesn't include raw data, load binlog...", zap.Int64("fieldID", fieldID), zap.String("index", info.IndexInfo.GetIndexName()))
if err = segment.LoadFieldData(fieldID, loadInfo.GetNumOfRows(), info.FieldBinlog, true); err != nil {
log.Warn("load raw data failed", zap.Int64("fieldID", fieldID), zap.Error(err))
return err
}
}
}
if err := loader.loadSealedSegmentFields(ctx, segment, fieldBinlogs, loadInfo.GetNumOfRows()); err != nil {
return err
}
@@ -654,13 +669,11 @@ func (loader *segmentLoader) loadSealedSegmentFields(ctx context.Context, segmen
}
func (loader *segmentLoader) loadFieldsIndex(ctx context.Context,
schema *schemapb.CollectionSchema,
schemaHelper *typeutil.SchemaHelper,
segment *LocalSegment,
numRows int64,
indexedFieldInfos map[int64]*IndexedFieldInfo,
) error {
schemaHelper, _ := typeutil.CreateSchemaHelper(schema)
for fieldID, fieldInfo := range indexedFieldInfos {
indexInfo := fieldInfo.IndexInfo
err := loader.loadFieldIndex(ctx, segment, indexInfo)