enhance: support integral type for MV and skip MV if there is only one category (#33161)

issue: #29892

---------

Signed-off-by: Patrick Weizhi Xu <weizhi.xu@zilliz.com>
Patrick Weizhi Xu 2024-06-24 10:20:01 +08:00 committed by GitHub
parent b5c9a7364b
commit b961767005
6 changed files with 544 additions and 102 deletions
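Two behavioral changes ride in this commit, matching the title and issue #29892: the materialized-view (MV) optional-field path now accepts integral partition keys in addition to strings (see the typeutil change at the bottom of the diff), and a field whose column holds only one distinct value is skipped outright, with the meta header rewritten to the number of fields actually written. The cached opt-field file then has roughly the shape read by the sketch below. This is inferred from the writer and from CheckOptFieldCorrectness in the test diff, not taken from the project's real reader; in particular the integer widths of the header fields are assumptions.

#include <cstdint>
#include <fstream>
#include <iostream>
#include <string>
#include <vector>

void ReadOptFieldFileSketch(const std::string& path) {
    std::ifstream ifs(path, std::ios::binary);
    // Meta written by WriteOptFieldsIvfMeta (exact layout not shown in this
    // diff; a single field count is assumed here). It is rewritten in place
    // when fields get skipped, so it reflects the fields actually present.
    uint32_t num_of_fields = 0;
    ifs.read(reinterpret_cast<char*>(&num_of_fields), sizeof(num_of_fields));
    for (uint32_t i = 0; i < num_of_fields; ++i) {
        int64_t field_id = 0;     // written by WriteOptFieldIvfDataImpl
        uint32_t num_unique = 0;  // distinct values; fields with 1 are skipped
        ifs.read(reinterpret_cast<char*>(&field_id), sizeof(field_id));
        ifs.read(reinterpret_cast<char*>(&num_unique), sizeof(num_unique));
        std::cout << "field " << field_id << ": " << num_unique
                  << " categories\n";
        for (uint32_t c = 0; c < num_unique; ++c) {
            uint32_t offset_cnt = 0;  // rows holding this value
            ifs.read(reinterpret_cast<char*>(&offset_cnt), sizeof(offset_cnt));
            std::vector<uint32_t> offsets(offset_cnt);  // OffsetT in the tests
            ifs.read(reinterpret_cast<char*>(offsets.data()),
                     offset_cnt * sizeof(uint32_t));
        }
    }
}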

View File

@@ -23,6 +23,7 @@
#include <optional>
#include <type_traits>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
@@ -569,7 +570,7 @@ using DataTypeToOffsetMap =
std::unordered_map<DataTypeNativeOrVoid<T>, int64_t>;
template <DataType T>
void
bool
WriteOptFieldIvfDataImpl(
const int64_t field_id,
const std::shared_ptr<LocalChunkManager>& local_chunk_manager,
@@ -587,6 +588,12 @@ WriteOptFieldIvfDataImpl(
mp[val].push_back(offset++);
}
}
// Do not write to disk if there is only one value
if (mp.size() == 1) {
return false;
}
local_chunk_manager->Write(local_data_path,
write_offset,
const_cast<int64_t*>(&field_id),
@@ -612,6 +619,7 @@ WriteOptFieldIvfDataImpl(
data_size);
write_offset += data_size;
}
return true;
}
#define GENERATE_OPT_FIELD_IVF_IMPL(DT) \
@@ -630,32 +638,23 @@ WriteOptFieldIvfData(
uint64_t& write_offset) {
switch (dt) {
case DataType::BOOL:
GENERATE_OPT_FIELD_IVF_IMPL(DataType::BOOL);
break;
return GENERATE_OPT_FIELD_IVF_IMPL(DataType::BOOL);
case DataType::INT8:
GENERATE_OPT_FIELD_IVF_IMPL(DataType::INT8);
break;
return GENERATE_OPT_FIELD_IVF_IMPL(DataType::INT8);
case DataType::INT16:
GENERATE_OPT_FIELD_IVF_IMPL(DataType::INT16);
break;
return GENERATE_OPT_FIELD_IVF_IMPL(DataType::INT16);
case DataType::INT32:
GENERATE_OPT_FIELD_IVF_IMPL(DataType::INT32);
break;
return GENERATE_OPT_FIELD_IVF_IMPL(DataType::INT32);
case DataType::INT64:
GENERATE_OPT_FIELD_IVF_IMPL(DataType::INT64);
break;
return GENERATE_OPT_FIELD_IVF_IMPL(DataType::INT64);
case DataType::FLOAT:
GENERATE_OPT_FIELD_IVF_IMPL(DataType::FLOAT);
break;
return GENERATE_OPT_FIELD_IVF_IMPL(DataType::FLOAT);
case DataType::DOUBLE:
GENERATE_OPT_FIELD_IVF_IMPL(DataType::DOUBLE);
break;
return GENERATE_OPT_FIELD_IVF_IMPL(DataType::DOUBLE);
case DataType::STRING:
GENERATE_OPT_FIELD_IVF_IMPL(DataType::STRING);
break;
return GENERATE_OPT_FIELD_IVF_IMPL(DataType::STRING);
case DataType::VARCHAR:
GENERATE_OPT_FIELD_IVF_IMPL(DataType::VARCHAR);
break;
return GENERATE_OPT_FIELD_IVF_IMPL(DataType::VARCHAR);
default:
LOG_WARN("Unsupported data type in optional scalar field: ", dt);
return false;
@@ -693,7 +692,7 @@ WriteOptFieldsIvfMeta(
std::string
DiskFileManagerImpl::CacheOptFieldToDisk(
std::shared_ptr<milvus_storage::Space> space, OptFieldT& fields_map) {
uint32_t num_of_fields = fields_map.size();
const uint32_t num_of_fields = fields_map.size();
if (0 == num_of_fields) {
return "";
} else if (num_of_fields > 1) {
@@ -719,6 +718,7 @@ DiskFileManagerImpl::CacheOptFieldToDisk(
WriteOptFieldsIvfMeta(
local_chunk_manager, local_data_path, num_of_fields, write_offset);
std::unordered_set<int64_t> actual_field_ids;
auto reader = space->ScanData();
for (auto& [field_id, tup] : fields_map) {
const auto& field_name = std::get<0>(tup);
@@ -745,12 +745,23 @@ DiskFileManagerImpl::CacheOptFieldToDisk(
field_data->FillFieldData(col_data);
field_datas.emplace_back(field_data);
}
if (!WriteOptFieldIvfData(field_type,
field_id,
local_chunk_manager,
local_data_path,
field_datas,
write_offset)) {
if (WriteOptFieldIvfData(field_type,
field_id,
local_chunk_manager,
local_data_path,
field_datas,
write_offset)) {
actual_field_ids.insert(field_id);
}
}
if (actual_field_ids.size() != num_of_fields) {
write_offset = 0;
WriteOptFieldsIvfMeta(local_chunk_manager,
local_data_path,
actual_field_ids.size(),
write_offset);
if (actual_field_ids.empty()) {
return "";
}
}
@@ -759,7 +770,7 @@ DiskFileManagerImpl::CacheOptFieldToDisk(
std::string
DiskFileManagerImpl::CacheOptFieldToDisk(OptFieldT& fields_map) {
uint32_t num_of_fields = fields_map.size();
const uint32_t num_of_fields = fields_map.size();
if (0 == num_of_fields) {
return "";
} else if (num_of_fields > 1) {
@@ -793,6 +804,7 @@ DiskFileManagerImpl::CacheOptFieldToDisk(OptFieldT& fields_map) {
auto parallel_degree =
uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
std::unordered_set<int64_t> actual_field_ids;
for (auto& [field_id, tup] : fields_map) {
const auto& field_type = std::get<1>(tup);
auto& field_paths = std::get<2>(tup);
@@ -814,15 +826,27 @@ DiskFileManagerImpl::CacheOptFieldToDisk(OptFieldT& fields_map) {
if (batch_files.size() > 0) {
FetchRawData();
}
if (!WriteOptFieldIvfData(field_type,
field_id,
local_chunk_manager,
local_data_path,
field_datas,
write_offset)) {
if (WriteOptFieldIvfData(field_type,
field_id,
local_chunk_manager,
local_data_path,
field_datas,
write_offset)) {
actual_field_ids.insert(field_id);
}
}
if (actual_field_ids.size() != num_of_fields) {
write_offset = 0;
WriteOptFieldsIvfMeta(local_chunk_manager,
local_data_path,
actual_field_ids.size(),
write_offset);
if (actual_field_ids.empty()) {
return "";
}
}
return local_data_path;
}
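Both CacheOptFieldToDisk overloads now end the same way: the meta was written optimistically with fields_map.size() before the loop, so when any field gets skipped the header is rewritten to the count actually written, by resetting write_offset to 0 and calling WriteOptFieldsIvfMeta again, and an empty path is returned if nothing usable remains. A standalone sketch of that patch-the-count-in-place pattern, using a plain fstream seek in place of the ChunkManager offset write:

#include <cstdint>
#include <fstream>

int main() {
    std::fstream f("/tmp/opt_field_sketch.bin",
                   std::ios::binary | std::ios::in | std::ios::out |
                       std::ios::trunc);
    uint32_t num_fields = 3;  // optimistic count, like fields_map.size()
    f.write(reinterpret_cast<char*>(&num_fields), sizeof(num_fields));

    uint32_t written = 0;
    for (int64_t field_id : {101, 102, 103}) {
        bool only_one_category = (field_id == 102);  // pretend 102 is uniform
        if (only_one_category) {
            continue;  // skipped, as WriteOptFieldIvfDataImpl now does
        }
        f.write(reinterpret_cast<char*>(&field_id), sizeof(field_id));
        ++written;
    }
    if (written != num_fields) {  // actual_field_ids.size() != num_of_fields
        f.seekp(0);               // the diff resets write_offset to 0 instead
        f.write(reinterpret_cast<char*>(&written), sizeof(written));
    }
    return written == 0 ? 1 : 0;  // empty result signals "no opt-field file"
}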

View File

@@ -18,7 +18,9 @@
#include <arrow/type_fwd.h>
#include <gtest/gtest.h>
#include <cstdint>
#include <limits>
#include <memory>
#include <stdexcept>
#include <string>
#include <fstream>
#include <vector>
@@ -202,14 +204,12 @@ TEST_F(DiskAnnFileManagerTest, TestThreadPoolException) {
}
}
namespace {
const int64_t kOptFieldId = 123456;
const std::string kOptFieldName = "opt_field_name";
const DataType kOptFiledType = DataType::INT64;
const int64_t kOptFieldDataRange = 10000;
const std::string kOptFieldPath = "/tmp/diskann/opt_field/123123";
// const std::string kOptFieldPath = "/tmp/diskann/index_files/1000/index";
const size_t kEntityCnt = 1000 * 1000;
const DataType kOptFieldDataType = DataType::INT64;
const int64_t kOptFieldDataRange = 1000;
const std::string kOptFieldPath = "/tmp/diskann/opt_field/";
const size_t kEntityCnt = 1000 * 10;
const FieldDataMeta kOptVecFieldDataMeta = {1, 2, 3, 100};
using OffsetT = uint32_t;
@@ -225,25 +225,50 @@ CreateFileManager(const ChunkManagerPtr& cm)
storage::FileManagerContext(kOptVecFieldDataMeta, index_meta, cm));
}
template <typename T>
auto
PrepareRawFieldData() -> std::vector<int64_t> {
std::vector<int64_t> data(kEntityCnt);
int64_t field_val = 0;
PrepareRawFieldData(const int64_t opt_field_data_range) -> std::vector<T> {
if (opt_field_data_range > std::numeric_limits<T>::max()) {
throw std::runtime_error("field data range is too large: " +
std::to_string(opt_field_data_range));
}
std::vector<T> data(kEntityCnt);
T field_val = 0;
for (size_t i = 0; i < kEntityCnt; ++i) {
data[i] = field_val++;
if (field_val >= kOptFieldDataRange) {
if (field_val >= opt_field_data_range) {
field_val = 0;
}
}
return data;
}
template <>
auto
PrepareInsertData() -> std::string {
size_t sz = sizeof(int64_t) * kEntityCnt;
std::vector<int64_t> data = PrepareRawFieldData();
auto field_data =
storage::CreateFieldData(kOptFieldDataType, 1, kEntityCnt);
PrepareRawFieldData<std::string>(const int64_t opt_field_data_range)
-> std::vector<std::string> {
if (opt_field_data_range > std::numeric_limits<char>::max()) {
throw std::runtime_error("field data range is too large: " +
std::to_string(opt_field_data_range));
}
std::vector<std::string> data(kEntityCnt);
char field_val = 0;
for (size_t i = 0; i < kEntityCnt; ++i) {
data[i] = std::to_string(field_val);
field_val++;
if (field_val >= opt_field_data_range) {
field_val = 0;
}
}
return data;
}
template <DataType DT, typename NativeType>
auto
PrepareInsertData(const int64_t opt_field_data_range) -> std::string {
std::vector<NativeType> data =
PrepareRawFieldData<NativeType>(opt_field_data_range);
auto field_data = storage::CreateFieldData(DT, 1, kEntityCnt);
field_data->FillFieldData(data.data(), kEntityCnt);
storage::InsertData insert_data(field_data);
insert_data.SetFieldDataMeta(kOptVecFieldDataMeta);
@@ -253,16 +278,16 @@ PrepareInsertData() -> std::string {
auto chunk_manager =
storage::CreateChunkManager(get_default_local_storage_config());
std::string path = kOptFieldPath + "0";
std::string path = kOptFieldPath + std::to_string(kOptFieldId);
boost::filesystem::remove_all(path);
chunk_manager->Write(path, serialized_data.data(), serialized_data.size());
return path;
}
auto
PrepareInsertDataSpace()
PrepareInsertDataSpace(const int64_t opt_field_data_range)
-> std::pair<std::string, std::shared_ptr<milvus_storage::Space>> {
std::string path = kOptFieldPath + "1";
std::string path = kOptFieldPath + "space/" + std::to_string(kOptFieldId);
arrow::FieldVector arrow_fields{
arrow::field("pk", arrow::int64()),
arrow::field("ts", arrow::int64()),
@@ -281,7 +306,7 @@ PrepareInsertDataSpace()
milvus_storage::Options{schema});
EXPECT_TRUE(opt_space.has_value());
auto space = std::move(opt_space.value());
const auto data = PrepareRawFieldData();
const auto data = PrepareRawFieldData<int64_t>(opt_field_data_range);
arrow::Int64Builder pk_builder;
arrow::Int64Builder ts_builder;
arrow::NumericBuilder<arrow::Int64Type> scalar_builder;
@@ -315,18 +340,21 @@ PrepareInsertDataSpace()
return {path, std::move(space)};
}
template <DataType DT>
auto
PrepareOptionalField(const std::shared_ptr<DiskFileManagerImpl>& file_manager,
const std::string& insert_file_path) -> OptFieldT {
OptFieldT opt_field;
std::vector<std::string> insert_files;
insert_files.emplace_back(insert_file_path);
opt_field[kOptFieldId] = {kOptFieldName, kOptFiledType, insert_files};
opt_field[kOptFieldId] = {kOptFieldName, DT, insert_files};
return opt_field;
}
void
CheckOptFieldCorrectness(const std::string& local_file_path) {
CheckOptFieldCorrectness(
const std::string& local_file_path,
const int64_t opt_field_data_range = kOptFieldDataRange) {
std::ifstream ifs(local_file_path);
if (!ifs.is_open()) {
FAIL() << "open file failed: " << local_file_path << std::endl;
@@ -344,10 +372,10 @@ CheckOptFieldCorrectness(const std::string& local_file_path) {
EXPECT_EQ(field_id, kOptFieldId);
ifs.read(reinterpret_cast<char*>(&num_of_unique_field_data),
sizeof(num_of_unique_field_data));
EXPECT_EQ(num_of_unique_field_data, kOptFieldDataRange);
EXPECT_EQ(num_of_unique_field_data, opt_field_data_range);
uint32_t expected_single_category_offset_cnt =
kEntityCnt / kOptFieldDataRange;
kEntityCnt / opt_field_data_range;
uint32_t read_single_category_offset_cnt;
std::vector<OffsetT> single_category_offsets(
expected_single_category_offset_cnt);
@@ -364,54 +392,96 @@ CheckOptFieldCorrectness(const std::string& local_file_path) {
first_offset = single_category_offsets[0];
}
for (size_t j = 1; j < read_single_category_offset_cnt; ++j) {
ASSERT_EQ(single_category_offsets[j] % kOptFieldDataRange,
first_offset % kOptFieldDataRange);
ASSERT_EQ(single_category_offsets[j] % opt_field_data_range,
first_offset % opt_field_data_range);
}
}
}
} // namespace
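The modulo assertion at the end of CheckOptFieldCorrectness holds because PrepareRawFieldData cycles the values 0, 1, ..., range-1 across the rows: a given value v occurs exactly at offsets v, v + range, v + 2*range, and so on, so all offsets within one category share the same residue modulo the range. A standalone illustration of that invariant:

#include <cassert>
#include <cstdint>
#include <unordered_map>
#include <vector>

int main() {
    const int64_t range = 100;
    const int64_t entities = 10000;  // a multiple of range, like kEntityCnt
    std::unordered_map<int64_t, std::vector<int64_t>> offsets_by_value;
    for (int64_t off = 0; off < entities; ++off) {
        offsets_by_value[off % range].push_back(off);  // cyclic test data
    }
    for (const auto& [value, offsets] : offsets_by_value) {
        // Each category appears entities / range times ...
        assert(static_cast<int64_t>(offsets.size()) == entities / range);
        for (int64_t off : offsets) {
            assert(off % range == value);  // ... at offsets congruent mod range
        }
    }
    return 0;
}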
TEST_F(DiskAnnFileManagerTest, CacheOptFieldToDiskFieldEmpty) {
auto file_manager = CreateFileManager(cm_);
const auto& [insert_file_space_path, space] = PrepareInsertDataSpace();
OptFieldT opt_fields;
EXPECT_TRUE(file_manager->CacheOptFieldToDisk(opt_fields).empty());
EXPECT_TRUE(file_manager->CacheOptFieldToDisk(space, opt_fields).empty());
}
{
const auto& [insert_file_space_path, space] =
PrepareInsertDataSpace(kOptFieldDataRange);
OptFieldT opt_fields;
EXPECT_TRUE(file_manager->CacheOptFieldToDisk(opt_fields).empty());
EXPECT_TRUE(
file_manager->CacheOptFieldToDisk(space, opt_fields).empty());
}
}
TEST_F(DiskAnnFileManagerTest, CacheOptFieldToDiskSpaceEmpty) {
auto file_manager = CreateFileManager(cm_);
auto opt_fileds = PrepareOptionalField(file_manager, "");
auto res = file_manager->CacheOptFieldToDisk(nullptr, opt_fileds);
EXPECT_TRUE(res.empty());
{
auto opt_fileds =
PrepareOptionalField<DataType::INT64>(file_manager, "");
auto res = file_manager->CacheOptFieldToDisk(nullptr, opt_fileds);
EXPECT_TRUE(res.empty());
}
}
TEST_F(DiskAnnFileManagerTest, CacheOptFieldToDiskOptFieldMoreThanOne) {
auto file_manager = CreateFileManager(cm_);
const auto insert_file_path = PrepareInsertData();
const auto& [insert_file_space_path, space] = PrepareInsertDataSpace();
OptFieldT opt_fields = PrepareOptionalField(file_manager, insert_file_path);
const auto insert_file_path =
PrepareInsertData<DataType::INT64, int64_t>(kOptFieldDataRange);
const auto& [insert_file_space_path, space] =
PrepareInsertDataSpace(kOptFieldDataRange);
OptFieldT opt_fields =
PrepareOptionalField<DataType::INT64>(file_manager, insert_file_path);
opt_fields[kOptFieldId + 1] = {
kOptFieldName + "second", kOptFiledType, {insert_file_space_path}};
kOptFieldName + "second", DataType::INT64, {insert_file_space_path}};
EXPECT_THROW(file_manager->CacheOptFieldToDisk(opt_fields), SegcoreError);
EXPECT_THROW(file_manager->CacheOptFieldToDisk(space, opt_fields),
SegcoreError);
}
TEST_F(DiskAnnFileManagerTest, CacheOptFieldToDiskCorrect) {
auto file_manager = CreateFileManager(cm_);
const auto insert_file_path = PrepareInsertData();
auto opt_fileds = PrepareOptionalField(file_manager, insert_file_path);
auto res = file_manager->CacheOptFieldToDisk(opt_fileds);
ASSERT_FALSE(res.empty());
CheckOptFieldCorrectness(res);
}
TEST_F(DiskAnnFileManagerTest, CacheOptFieldToDiskSpaceCorrect) {
auto file_manager = CreateFileManager(cm_);
const auto& [insert_file_path, space] = PrepareInsertDataSpace();
auto opt_fileds = PrepareOptionalField(file_manager, insert_file_path);
const auto& [insert_file_path, space] =
PrepareInsertDataSpace(kOptFieldDataRange);
auto opt_fileds =
PrepareOptionalField<DataType::INT64>(file_manager, insert_file_path);
auto res = file_manager->CacheOptFieldToDisk(space, opt_fileds);
ASSERT_FALSE(res.empty());
CheckOptFieldCorrectness(res);
}
#define TEST_TYPE(NAME, TYPE, NATIVE_TYPE, RANGE) \
TEST_F(DiskAnnFileManagerTest, CacheOptFieldToDiskCorrect##NAME) { \
auto file_manager = CreateFileManager(cm_); \
auto insert_file_path = PrepareInsertData<TYPE, NATIVE_TYPE>(RANGE); \
auto opt_fields = \
PrepareOptionalField<TYPE>(file_manager, insert_file_path); \
auto res = file_manager->CacheOptFieldToDisk(opt_fields); \
ASSERT_FALSE(res.empty()); \
CheckOptFieldCorrectness(res, RANGE); \
};
TEST_TYPE(INT8, DataType::INT8, int8_t, 100);
TEST_TYPE(INT16, DataType::INT16, int16_t, kOptFieldDataRange);
TEST_TYPE(INT32, DataType::INT32, int32_t, kOptFieldDataRange);
TEST_TYPE(INT64, DataType::INT64, int64_t, kOptFieldDataRange);
TEST_TYPE(FLOAT, DataType::FLOAT, float, kOptFieldDataRange);
TEST_TYPE(DOUBLE, DataType::DOUBLE, double, kOptFieldDataRange);
TEST_TYPE(STRING, DataType::STRING, std::string, 100);
TEST_TYPE(VARCHAR, DataType::VARCHAR, std::string, 100);
#undef TEST_TYPE
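Each TEST_TYPE line stamps out a complete test case; TEST_TYPE(INT8, DataType::INT8, int8_t, 100), for instance, expands to the equivalent of the block below. The narrower ranges for INT8 and the string types (whose values come from a char counter) match the guard in PrepareRawFieldData, which throws once the requested range exceeds the value type's maximum.

TEST_F(DiskAnnFileManagerTest, CacheOptFieldToDiskCorrectINT8) {
    auto file_manager = CreateFileManager(cm_);
    auto insert_file_path = PrepareInsertData<DataType::INT8, int8_t>(100);
    auto opt_fields =
        PrepareOptionalField<DataType::INT8>(file_manager, insert_file_path);
    auto res = file_manager->CacheOptFieldToDisk(opt_fields);
    ASSERT_FALSE(res.empty());
    CheckOptFieldCorrectness(res, 100);
}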
TEST_F(DiskAnnFileManagerTest, CacheOptFieldToDiskOnlyOneCategory) {
auto file_manager = CreateFileManager(cm_);
{
const auto insert_file_path =
PrepareInsertData<DataType::INT64, int64_t>(1);
auto opt_fileds = PrepareOptionalField<DataType::INT64>(
file_manager, insert_file_path);
auto res = file_manager->CacheOptFieldToDisk(opt_fileds);
ASSERT_TRUE(res.empty());
}
{
const auto& [insert_file_path, space] = PrepareInsertDataSpace(1);
auto opt_fileds = PrepareOptionalField<DataType::INT64>(
file_manager, insert_file_path);
auto res = file_manager->CacheOptFieldToDisk(space, opt_fileds);
ASSERT_TRUE(res.empty());
}
}
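With a range of 1, PrepareInsertData produces a column in which every row carries the same value, so the map built in WriteOptFieldIvfDataImpl has exactly one entry, the field is skipped, and CacheOptFieldToDisk is left with zero usable fields and returns an empty path. That is the right outcome: a partition-key value shared by every row gives the materialized view no selectivity. A condensed restatement of the decision (a sketch, not the production code):

#include <cstdint>
#include <unordered_map>
#include <vector>

// True iff the field is worth writing: more than one distinct value.
bool WorthWriting(const std::vector<int64_t>& column) {
    std::unordered_map<int64_t, std::vector<uint32_t>> mp;
    uint32_t offset = 0;
    for (int64_t v : column) {
        mp[v].push_back(offset++);  // value -> row offsets, as in the writer
    }
    return mp.size() > 1;  // a single category carries no MV information
}

int main() {
    std::vector<int64_t> uniform(100, 0);  // range = 1: all rows equal
    std::vector<int64_t> varied{0, 1, 0, 1};
    return (!WorthWriting(uniform) && WorthWriting(varied)) ? 0 : 1;
}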

View File

@@ -117,12 +117,14 @@ func (it *indexBuildTask) AssignTask(ctx context.Context, client types.IndexNode
if partitionKeyField == nil || err != nil {
log.Ctx(ctx).Warn("index builder get partition key field failed", zap.Int64("buildID", it.buildID), zap.Error(err))
} else {
optionalFields = append(optionalFields, &indexpb.OptionalFieldInfo{
FieldID: partitionKeyField.FieldID,
FieldName: partitionKeyField.Name,
FieldType: int32(partitionKeyField.DataType),
DataIds: getBinLogIDs(segment, partitionKeyField.FieldID),
})
if typeutil.IsFieldDataTypeSupportMaterializedView(partitionKeyField) {
optionalFields = append(optionalFields, &indexpb.OptionalFieldInfo{
FieldID: partitionKeyField.FieldID,
FieldName: partitionKeyField.Name,
FieldType: int32(partitionKeyField.DataType),
DataIds: getBinLogIDs(segment, partitionKeyField.FieldID),
})
}
}
}

View File

@@ -37,19 +37,21 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
var (
collID = UniqueID(100)
partID = UniqueID(200)
indexID = UniqueID(300)
fieldID = UniqueID(400)
indexName = "_default_idx"
segID = UniqueID(500)
buildID = UniqueID(600)
nodeID = UniqueID(700)
collID = UniqueID(100)
partID = UniqueID(200)
indexID = UniqueID(300)
fieldID = UniqueID(400)
indexName = "_default_idx"
segID = UniqueID(500)
buildID = UniqueID(600)
nodeID = UniqueID(700)
partitionKeyID = UniqueID(800)
)
func createIndexMeta(catalog metastore.DataCoordCatalog) *indexMeta {
@@ -905,14 +907,16 @@ func (s *taskSchedulerSuite) Test_scheduler() {
}, nil)
s.Run("test scheduler with indexBuilderV1", func() {
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("True")
defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("False")
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true")
defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false")
s.scheduler(handler)
})
s.Run("test scheduler with indexBuilderV2", func() {
paramtable.Get().CommonCfg.EnableStorageV2.SwapTempValue("true")
defer paramtable.Get().CommonCfg.EnableStorageV2.SwapTempValue("false")
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true")
defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false")
s.scheduler(handler)
})
@@ -1421,3 +1425,345 @@ func (s *taskSchedulerSuite) Test_indexTaskFailCase() {
func Test_taskSchedulerSuite(t *testing.T) {
suite.Run(t, new(taskSchedulerSuite))
}
func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
ctx := context.Background()
catalog := catalogmocks.NewDataCoordCatalog(s.T())
catalog.EXPECT().AlterSegmentIndexes(mock.Anything, mock.Anything).Return(nil)
in := mocks.NewMockIndexNodeClient(s.T())
workerManager := NewMockWorkerManager(s.T())
workerManager.EXPECT().PickClient().Return(s.nodeID, in)
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true)
minNumberOfRowsToBuild := paramtable.Get().DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64() + 1
fieldsSchema := []*schemapb.FieldSchema{
{
FieldID: fieldID,
Name: "vec",
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
IndexParams: []*commonpb.KeyValuePair{
{
Key: common.MetricTypeKey,
Value: "L2",
},
{
Key: common.IndexTypeKey,
Value: indexparamcheck.IndexHNSW,
},
},
},
{
FieldID: partitionKeyID,
Name: "scalar",
DataType: schemapb.DataType_VarChar,
IsPartitionKey: true,
},
}
mt := meta{
catalog: catalog,
collections: map[int64]*collectionInfo{
collID: {
ID: collID,
Schema: &schemapb.CollectionSchema{
Fields: fieldsSchema,
},
CreatedAt: 0,
},
},
analyzeMeta: &analyzeMeta{
ctx: context.Background(),
catalog: catalog,
},
indexMeta: &indexMeta{
catalog: catalog,
indexes: map[UniqueID]map[UniqueID]*model.Index{
collID: {
indexID: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: indexName,
IsDeleted: false,
CreateTime: 1,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
IndexParams: []*commonpb.KeyValuePair{
{
Key: common.MetricTypeKey,
Value: "L2",
},
{
Key: common.IndexTypeKey,
Value: indexparamcheck.IndexHNSW,
},
},
},
},
},
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{
segID: {
indexID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: minNumberOfRowsToBuild,
IndexID: indexID,
BuildID: buildID,
NodeID: 0,
IndexVersion: 0,
IndexState: commonpb.IndexState_Unissued,
FailReason: "",
IsDeleted: false,
CreateTime: 0,
IndexFileKeys: nil,
IndexSize: 0,
},
},
},
buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{
buildID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: minNumberOfRowsToBuild,
IndexID: indexID,
BuildID: buildID,
NodeID: 0,
IndexVersion: 0,
IndexState: commonpb.IndexState_Unissued,
FailReason: "",
IsDeleted: false,
CreateTime: 0,
IndexFileKeys: nil,
IndexSize: 0,
},
},
},
segments: &SegmentsInfo{
segments: map[UniqueID]*SegmentInfo{
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: minNumberOfRowsToBuild,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: 10,
},
},
},
},
}
cm := mocks.NewChunkManager(s.T())
cm.EXPECT().RootPath().Return("ut-index")
handler := NewNMockHandler(s.T())
handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{
ID: collID,
Schema: &schemapb.CollectionSchema{
Name: "coll",
Fields: fieldsSchema,
EnableDynamicField: false,
Properties: nil,
},
}, nil)
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true")
defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false")
scheduler := newTaskScheduler(ctx, &mt, workerManager, cm, newIndexEngineVersionManager(), handler)
waitTaskDoneFunc := func(sche *taskScheduler) {
for {
fmt.Println("wait for read lock")
sche.RLock()
fmt.Println("after read lock")
taskNum := len(sche.tasks)
fmt.Println("taskNum: ", taskNum)
sche.RUnlock()
if taskNum == 0 {
break
}
time.Sleep(time.Second)
}
}
resetMetaFunc := func() {
mt.indexMeta.buildID2SegmentIndex[buildID].IndexState = commonpb.IndexState_Unissued
mt.indexMeta.segmentIndexes[segID][indexID].IndexState = commonpb.IndexState_Unissued
mt.indexMeta.indexes[collID][indexID].IndexParams[1].Value = indexparamcheck.IndexHNSW
mt.collections[collID].Schema.Fields[1].IsPartitionKey = true
mt.collections[collID].Schema.Fields[1].DataType = schemapb.DataType_VarChar
}
in.EXPECT().QueryJobsV2(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, request *indexpb.QueryJobsV2Request, option ...grpc.CallOption) (*indexpb.QueryJobsV2Response, error) {
switch request.GetJobType() {
case indexpb.JobType_JobTypeIndexJob:
results := make([]*indexpb.IndexTaskInfo, 0)
for _, buildID := range request.GetTaskIDs() {
results = append(results, &indexpb.IndexTaskInfo{
BuildID: buildID,
State: commonpb.IndexState_Finished,
IndexFileKeys: []string{"file1", "file2"},
SerializedSize: 1024,
FailReason: "",
CurrentIndexVersion: 0,
IndexStoreVersion: 0,
})
}
return &indexpb.QueryJobsV2Response{
Status: merr.Success(),
ClusterID: request.GetClusterID(),
Result: &indexpb.QueryJobsV2Response_IndexJobResults{
IndexJobResults: &indexpb.IndexJobResults{
Results: results,
},
},
}, nil
default:
return &indexpb.QueryJobsV2Response{
Status: merr.Status(errors.New("unknown job type")),
}, nil
}
})
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil)
s.Run("success to get opt field on startup", func() {
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, in *indexpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) {
s.NotZero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should be set")
return merr.Success(), nil
}).Once()
s.Equal(1, len(scheduler.tasks))
s.Equal(indexpb.JobState_JobStateInit, scheduler.tasks[buildID].GetState())
scheduler.Start()
waitTaskDoneFunc(scheduler)
resetMetaFunc()
})
s.Run("enqueue valid", func() {
for _, dataType := range []schemapb.DataType{
schemapb.DataType_Int8,
schemapb.DataType_Int16,
schemapb.DataType_Int32,
schemapb.DataType_Int64,
schemapb.DataType_VarChar,
schemapb.DataType_String,
} {
mt.collections[collID].Schema.Fields[1].DataType = dataType
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, in *indexpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) {
s.NotZero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should be set")
return merr.Success(), nil
}).Once()
t := &indexBuildTask{
buildID: buildID,
nodeID: nodeID,
taskInfo: &indexpb.IndexTaskInfo{
BuildID: buildID,
State: commonpb.IndexState_Unissued,
FailReason: "",
// CurrentIndexVersion: 0,
// IndexStoreVersion: 0,
},
}
scheduler.enqueue(t)
waitTaskDoneFunc(scheduler)
resetMetaFunc()
}
})
// should still be able to build vec index when opt field is not set
s.Run("enqueue returns empty optional field when cfg disable", func() {
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false")
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, in *indexpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) {
s.Zero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should be set")
return merr.Success(), nil
}).Once()
t := &indexBuildTask{
buildID: buildID,
nodeID: nodeID,
taskInfo: &indexpb.IndexTaskInfo{
BuildID: buildID,
State: commonpb.IndexState_Unissued,
FailReason: "",
},
}
scheduler.enqueue(t)
waitTaskDoneFunc(scheduler)
resetMetaFunc()
})
s.Run("enqueue returns empty optional field when the data type is not STRING or VARCHAR or Integer", func() {
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true")
for _, dataType := range []schemapb.DataType{
schemapb.DataType_Bool,
schemapb.DataType_Float,
schemapb.DataType_Double,
schemapb.DataType_Array,
schemapb.DataType_JSON,
} {
mt.collections[collID].Schema.Fields[1].DataType = dataType
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, in *indexpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) {
s.Zero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should be set")
return merr.Success(), nil
}).Once()
t := &indexBuildTask{
buildID: buildID,
nodeID: nodeID,
taskInfo: &indexpb.IndexTaskInfo{
BuildID: buildID,
State: commonpb.IndexState_Unissued,
FailReason: "",
},
}
scheduler.enqueue(t)
waitTaskDoneFunc(scheduler)
resetMetaFunc()
}
})
s.Run("enqueue returns empty optional field when no partition key", func() {
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true")
mt.collections[collID].Schema.Fields[1].IsPartitionKey = false
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, in *indexpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) {
s.Zero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should be set")
return merr.Success(), nil
}).Once()
t := &indexBuildTask{
buildID: buildID,
nodeID: nodeID,
taskInfo: &indexpb.IndexTaskInfo{
BuildID: buildID,
State: commonpb.IndexState_Unissued,
FailReason: "",
},
}
scheduler.enqueue(t)
waitTaskDoneFunc(scheduler)
resetMetaFunc()
})
scheduler.Stop()
}

View File

@@ -2692,7 +2692,7 @@ func (s *MaterializedViewTestSuite) TestMvEnabledPartitionKeyOnInt64() {
err := task.PreExecute(s.ctx)
s.NoError(err)
s.NotZero(len(task.queryInfos))
s.Equal(false, task.queryInfos[0].MaterializedViewInvolved)
s.Equal(true, task.queryInfos[0].MaterializedViewInvolved)
}
func (s *MaterializedViewTestSuite) TestMvEnabledPartitionKeyOnVarChar() {

View File

@@ -1097,7 +1097,7 @@ func HasPartitionKey(schema *schemapb.CollectionSchema) bool {
}
func IsFieldDataTypeSupportMaterializedView(fieldSchema *schemapb.FieldSchema) bool {
return fieldSchema.DataType == schemapb.DataType_VarChar || fieldSchema.DataType == schemapb.DataType_String
return IsIntegerType(fieldSchema.DataType) || IsStringType(fieldSchema.DataType)
}
// HasClusterKey check if a collection schema has ClusterKey field