enhance: gather materialized view search info once per request (#31996)

issue: #29892 

This PR:
1. Move the process of gathering materialized search info to when the
search plan is created, before it goes to each segment, to avoid
repeated work and access the plan node under multi-threaded
circumstances.
2. Enforce the supported MV type to `VARCHAR`
3. Add integration test

Signed-off-by: Patrick Weizhi Xu <weizhi.xu@zilliz.com>
pull/32012/head
Patrick Weizhi Xu 2024-04-11 15:21:19 +08:00 committed by GitHub
parent 4bf32bbc87
commit 52ae47c850
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 487 additions and 74 deletions

View File

@ -23,6 +23,7 @@
#include "generated/ExtractInfoPlanNodeVisitor.h"
#include "pb/plan.pb.h"
#include "query/Utils.h"
#include "knowhere/comp/materialized_view.h"
namespace milvus::query {
namespace planpb = milvus::proto::plan;
@ -232,6 +233,25 @@ ProtoParser::PlanNodeFromProto(const planpb::PlanNode& plan_node_proto) {
plan_node->filter_plannode_ = std::move(expr_parser());
}
plan_node->search_info_ = std::move(search_info);
if (plan_node->search_info_.materialized_view_involved &&
plan_node->filter_plannode_.has_value()) {
const auto expr_info =
plan_node->filter_plannode_.value()->GatherInfo();
knowhere::MaterializedViewSearchInfo materialized_view_search_info;
for (const auto& [expr_field_id, vals] : expr_info.field_id_to_values) {
materialized_view_search_info
.field_id_to_touched_categories_cnt[expr_field_id] =
vals.size();
}
materialized_view_search_info.is_pure_and = expr_info.is_pure_and;
materialized_view_search_info.has_not = expr_info.has_not;
plan_node->search_info_
.search_params_[knowhere::meta::MATERIALIZED_VIEW_SEARCH_INFO] =
materialized_view_search_info;
}
return plan_node;
}

View File

@ -26,7 +26,6 @@
#include "exec/Task.h"
#include "segcore/SegmentInterface.h"
#include "query/GroupByOperator.h"
#include "knowhere/comp/materialized_view.h"
namespace milvus::query {
namespace impl {
@ -141,11 +140,6 @@ ExecPlanNodeVisitor::ExecuteExprNodeInternal(
// std::cout << bitset_holder->size() << " . " << s << std::endl;
}
expr::ExprInfo
GatherInfoBasedOnExpr(const std::shared_ptr<milvus::plan::PlanNode>& node) {
return node->GatherInfo();
}
template <typename VectorType>
void
ExecPlanNodeVisitor::VectorVisitorImpl(VectorPlanNode& node) {
@ -172,22 +166,6 @@ ExecPlanNodeVisitor::VectorVisitorImpl(VectorPlanNode& node) {
std::unique_ptr<BitsetType> bitset_holder;
if (node.filter_plannode_.has_value()) {
if (node.search_info_.materialized_view_involved) {
knowhere::MaterializedViewSearchInfo materialized_view_search_info;
const auto expr_info =
GatherInfoBasedOnExpr(node.filter_plannode_.value());
for (const auto& [field_id, vals] : expr_info.field_id_to_values) {
materialized_view_search_info
.field_id_to_touched_categories_cnt[field_id] = vals.size();
}
materialized_view_search_info.is_pure_and = expr_info.is_pure_and;
materialized_view_search_info.has_not = expr_info.has_not;
node.search_info_
.search_params_[knowhere::meta::MATERIALIZED_VIEW_SEARCH_INFO] =
materialized_view_search_info;
}
BitsetType expr_res;
ExecuteExprNode(
node.filter_plannode_.value(), segment, active_count, expr_res);

View File

@ -76,6 +76,7 @@ const std::string kVecFieldIdPlaceholder = "VEC_FID";
const std::string kDataTypePlaceholder = "DT";
const std::string kValPlaceholder = "VAL";
const std::string kPredicatePlaceholder = "PREDICATE_PLACEHOLDER";
const std::string kMvInvolvedPlaceholder = "MV_INVOLVED_PLACEHOLDER";
} // namespace
class ExprMaterializedViewTest : public testing::Test {
@ -121,6 +122,7 @@ class ExprMaterializedViewTest : public testing::Test {
round_decimal: 3
metric_type: "L2"
search_params: "{\"nprobe\": 1}"
materialized_view_involved: MV_INVOLVED_PLACEHOLDER
>
placeholder_tag: "$0">)";
const int64_t vec_field_id =
@ -147,8 +149,9 @@ class ExprMaterializedViewTest : public testing::Test {
// this function takes an predicate string in schemapb format
// and return a vector search plan
std::unique_ptr<milvus::query::Plan>
CreatePlan(const std::string& predicate_str) {
CreatePlan(const std::string& predicate_str, const bool is_mv_enable) {
auto plan_str = InterpolateTemplate(predicate_str);
plan_str = InterpolateMvInvolved(plan_str, is_mv_enable);
auto binary_plan = milvus::segcore::translate_text_plan_to_binary_plan(
plan_str.c_str());
return milvus::query::CreateSearchPlanByExpr(
@ -217,15 +220,13 @@ class ExprMaterializedViewTest : public testing::Test {
knowhere::MaterializedViewSearchInfo
TranslateThenExecuteWhenMvInolved(const std::string& predicate_str) {
auto plan = CreatePlan(predicate_str);
plan->plan_node_->search_info_.materialized_view_involved = true;
auto plan = CreatePlan(predicate_str, true);
return ExecutePlan(plan);
}
knowhere::MaterializedViewSearchInfo
TranslateThenExecuteWhenMvNotInolved(const std::string& predicate_str) {
auto plan = CreatePlan(predicate_str);
plan->plan_node_->search_info_.materialized_view_involved = false;
auto plan = CreatePlan(predicate_str, false);
return ExecutePlan(plan);
}
@ -259,6 +260,14 @@ class ExprMaterializedViewTest : public testing::Test {
str = std::regex_replace(str, std::regex(occ), replace);
}
std::string
InterpolateMvInvolved(const std::string& plan, const bool is_mv_involved) {
std::string p = plan;
ReplaceAllOccurrence(
p, kMvInvolvedPlaceholder, is_mv_involved ? "true" : "false");
return p;
}
private:
std::string
InterpolateTemplate(const std::string& predicate_str) {
@ -320,13 +329,12 @@ TEST_F(ExprMaterializedViewTest, TestMvNoExpr) {
data_field_info[DataType::VECTOR_FLOAT].field_id;
ReplaceAllOccurrence(
plan_str, kVecFieldIdPlaceholder, std::to_string(vec_field_id));
plan_str = InterpolateMvInvolved(plan_str, mv_involved);
auto binary_plan =
milvus::segcore::translate_text_plan_to_binary_plan(
plan_str.c_str());
auto plan = milvus::query::CreateSearchPlanByExpr(
*schema, binary_plan.data(), binary_plan.size());
plan->plan_node_->search_info_.materialized_view_involved =
mv_involved;
auto mv = ExecutePlan(plan);
TestMvExpectDefault(mv);
}
@ -345,8 +353,7 @@ TEST_F(ExprMaterializedViewTest, TestMvNotInvolvedExpr) {
>
)";
predicate = InterpolateSingleExpr(predicate, data_type);
auto plan = CreatePlan(predicate);
plan->plan_node_->search_info_.materialized_view_involved = false;
auto plan = CreatePlan(predicate, false);
auto mv = ExecutePlan(plan);
TestMvExpectDefault(mv);
}
@ -360,8 +367,7 @@ TEST_F(ExprMaterializedViewTest, TestMvNotInvolvedJsonExpr) {
InterpolateSingleExpr(
R"( elements:<VAL1> op:Contains elements_same_type:true>)",
DataType::INT64);
auto plan = CreatePlan(predicate);
plan->plan_node_->search_info_.materialized_view_involved = false;
auto plan = CreatePlan(predicate, false);
auto mv = ExecutePlan(plan);
TestMvExpectDefault(mv);
}

View File

@ -287,20 +287,24 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
// vector index build needs information of optional scalar fields data
optionalFields := make([]*indexpb.OptionalFieldInfo, 0)
if Params.CommonCfg.EnableMaterializedView.GetAsBool() && isOptionalScalarFieldSupported(indexType) {
if Params.CommonCfg.EnableMaterializedView.GetAsBool() {
colSchema := ib.meta.GetCollection(meta.CollectionID).Schema
hasPartitionKey := typeutil.HasPartitionKey(colSchema)
if hasPartitionKey {
partitionKeyField, err := typeutil.GetPartitionKeyFieldSchema(colSchema)
if partitionKeyField == nil || err != nil {
log.Ctx(ib.ctx).Warn("index builder get partition key field failed", zap.Int64("build", buildID), zap.Error(err))
} else {
optionalFields = append(optionalFields, &indexpb.OptionalFieldInfo{
FieldID: partitionKeyField.FieldID,
FieldName: partitionKeyField.Name,
FieldType: int32(partitionKeyField.DataType),
DataIds: getBinLogIDs(segment, partitionKeyField.FieldID),
})
if colSchema != nil {
hasPartitionKey := typeutil.HasPartitionKey(colSchema)
if hasPartitionKey {
partitionKeyField, err := typeutil.GetPartitionKeyFieldSchema(colSchema)
if partitionKeyField == nil || err != nil {
log.Ctx(ib.ctx).Warn("index builder get partition key field failed", zap.Int64("build", buildID), zap.Error(err))
} else {
if typeutil.IsFieldDataTypeSupportMaterializedView(partitionKeyField) {
optionalFields = append(optionalFields, &indexpb.OptionalFieldInfo{
FieldID: partitionKeyField.FieldID,
FieldName: partitionKeyField.Name,
FieldType: int32(partitionKeyField.DataType),
DataIds: getBinLogIDs(segment, partitionKeyField.FieldID),
})
}
}
}
}
}

View File

@ -1286,7 +1286,7 @@ func TestVecIndexWithOptionalScalarField(t *testing.T) {
{
FieldID: partitionKeyID,
Name: "scalar",
DataType: schemapb.DataType_Int64,
DataType: schemapb.DataType_VarChar,
IsPartitionKey: true,
},
},
@ -1411,6 +1411,7 @@ func TestVecIndexWithOptionalScalarField(t *testing.T) {
mt.indexMeta.segmentIndexes[segID][indexID].IndexState = commonpb.IndexState_Unissued
mt.indexMeta.indexes[collID][indexID].IndexParams[1].Value = indexparamcheck.IndexHNSW
mt.collections[collID].Schema.Fields[1].IsPartitionKey = true
mt.collections[collID].Schema.Fields[1].DataType = schemapb.DataType_VarChar
}
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true")
@ -1449,7 +1450,22 @@ func TestVecIndexWithOptionalScalarField(t *testing.T) {
IndexSize: 0,
}
t.Run("enqueue", func(t *testing.T) {
t.Run("enqueue varchar", func(t *testing.T) {
mt.collections[collID].Schema.Fields[1].DataType = schemapb.DataType_VarChar
ic.EXPECT().CreateJob(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, in *indexpb.CreateJobRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
assert.NotZero(t, len(in.OptionalScalarFields), "optional scalar field should be set")
return merr.Success(), nil
}).Once()
err := ib.meta.indexMeta.AddSegmentIndex(segIdx)
assert.NoError(t, err)
ib.enqueue(buildID)
waitTaskDoneFunc(ib)
resetMetaFunc()
})
t.Run("enqueue string", func(t *testing.T) {
mt.collections[collID].Schema.Fields[1].DataType = schemapb.DataType_String
ic.EXPECT().CreateJob(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, in *indexpb.CreateJobRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
assert.NotZero(t, len(in.OptionalScalarFields), "optional scalar field should be set")
@ -1477,19 +1493,31 @@ func TestVecIndexWithOptionalScalarField(t *testing.T) {
resetMetaFunc()
})
t.Run("enqueue returns empty optional field when index is not HNSW", func(t *testing.T) {
t.Run("enqueue returns empty optional field when the data type is not STRING or VARCHAR", func(t *testing.T) {
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true")
mt.indexMeta.indexes[collID][indexID].IndexParams[1].Value = indexparamcheck.IndexDISKANN
ic.EXPECT().CreateJob(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, in *indexpb.CreateJobRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
assert.Zero(t, len(in.OptionalScalarFields), "optional scalar field should be set")
return merr.Success(), nil
}).Once()
err := ib.meta.indexMeta.AddSegmentIndex(segIdx)
assert.NoError(t, err)
ib.enqueue(buildID)
waitTaskDoneFunc(ib)
resetMetaFunc()
for _, dataType := range []schemapb.DataType{
schemapb.DataType_Bool,
schemapb.DataType_Int8,
schemapb.DataType_Int16,
schemapb.DataType_Int32,
schemapb.DataType_Int64,
schemapb.DataType_Float,
schemapb.DataType_Double,
schemapb.DataType_Array,
schemapb.DataType_JSON,
} {
mt.collections[collID].Schema.Fields[1].DataType = dataType
ic.EXPECT().CreateJob(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, in *indexpb.CreateJobRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
assert.Zero(t, len(in.OptionalScalarFields), "optional scalar field should be set")
return merr.Success(), nil
}).Once()
err := ib.meta.indexMeta.AddSegmentIndex(segIdx)
assert.NoError(t, err)
ib.enqueue(buildID)
waitTaskDoneFunc(ib)
resetMetaFunc()
}
})
t.Run("enqueue returns empty optional field when no partition key", func(t *testing.T) {

View File

@ -192,10 +192,6 @@ func isFlatIndex(indexType string) bool {
return indexType == indexparamcheck.IndexFaissIDMap || indexType == indexparamcheck.IndexFaissBinIDMap
}
func isOptionalScalarFieldSupported(indexType string) bool {
return indexType == indexparamcheck.IndexHNSW
}
func isDiskANNIndex(indexType string) bool {
return indexType == indexparamcheck.IndexDISKANN
}

View File

@ -294,6 +294,20 @@ func (t *searchTask) checkNq(ctx context.Context) (int64, error) {
return nq, nil
}
func setQueryInfoIfMvEnable(queryInfo *planpb.QueryInfo, t *searchTask) error {
if t.enableMaterializedView {
partitionKeyFieldSchema, err := typeutil.GetPartitionKeyFieldSchema(t.schema.CollectionSchema)
if err != nil {
log.Warn("failed to get partition key field schema", zap.Error(err))
return err
}
if typeutil.IsFieldDataTypeSupportMaterializedView(partitionKeyFieldSchema) {
queryInfo.MaterializedViewInvolved = true
}
}
return nil
}
func (t *searchTask) initAdvancedSearchRequest(ctx context.Context) error {
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "init advanced search request")
defer sp.End()
@ -332,11 +346,7 @@ func (t *searchTask) initAdvancedSearchRequest(ctx context.Context) error {
if len(partitionIDs) > 0 {
internalSubReq.PartitionIDs = partitionIDs
t.partitionIDsSet.Upsert(partitionIDs...)
if t.enableMaterializedView {
if planPtr := plan.GetVectorAnns(); planPtr != nil {
planPtr.QueryInfo.MaterializedViewInvolved = true
}
}
setQueryInfoIfMvEnable(queryInfo, t)
}
} else {
internalSubReq.PartitionIDs = t.SearchRequest.GetPartitionIDs()
@ -392,11 +402,7 @@ func (t *searchTask) initSearchRequest(ctx context.Context) error {
}
if len(partitionIDs) > 0 {
t.SearchRequest.PartitionIDs = partitionIDs
if t.enableMaterializedView {
if planPtr := plan.GetVectorAnns(); planPtr != nil {
planPtr.QueryInfo.MaterializedViewInvolved = true
}
}
setQueryInfoIfMvEnable(queryInfo, t)
}
}

View File

@ -2728,3 +2728,140 @@ func TestSearchTask_CanSkipAllocTimestamp(t *testing.T) {
assert.True(t, skip)
})
}
type MaterializedViewTestSuite struct {
suite.Suite
mockMetaCache *MockCache
ctx context.Context
cancelFunc context.CancelFunc
dbName string
colName string
colID UniqueID
fieldName2Types map[string]schemapb.DataType
}
func (s *MaterializedViewTestSuite) SetupSuite() {
s.ctx, s.cancelFunc = context.WithCancel(context.Background())
s.dbName = "TestMvDbName"
s.colName = "TestMvColName"
s.colID = UniqueID(123)
s.fieldName2Types = map[string]schemapb.DataType{
testInt64Field: schemapb.DataType_Int64,
testVarCharField: schemapb.DataType_VarChar,
testFloatVecField: schemapb.DataType_FloatVector,
}
}
func (s *MaterializedViewTestSuite) TearDownSuite() {
s.cancelFunc()
}
func (s *MaterializedViewTestSuite) SetupTest() {
s.mockMetaCache = NewMockCache(s.T())
s.mockMetaCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(s.colID, nil).Maybe()
s.mockMetaCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
&collectionBasicInfo{
collID: s.colID,
}, nil).Maybe()
globalMetaCache = s.mockMetaCache
}
func (s *MaterializedViewTestSuite) TearDownTest() {
globalMetaCache = nil
}
func (s *MaterializedViewTestSuite) getSearchTask() *searchTask {
task := &searchTask{
ctx: s.ctx,
collectionName: s.colName,
SearchRequest: &internalpb.SearchRequest{},
request: &milvuspb.SearchRequest{
DbName: dbName,
CollectionName: s.colName,
Nq: 1,
SearchParams: getBaseSearchParams(),
},
}
s.NoError(task.OnEnqueue())
return task
}
func (s *MaterializedViewTestSuite) TestMvNotEnabledWithNoPartitionKey() {
task := s.getSearchTask()
task.enableMaterializedView = false
schema := constructCollectionSchemaByDataType(s.colName, s.fieldName2Types, testInt64Field, false)
schemaInfo := newSchemaInfo(schema)
s.mockMetaCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(schemaInfo, nil)
err := task.PreExecute(s.ctx)
s.NoError(err)
s.NotZero(len(task.queryInfos))
s.Equal(false, task.queryInfos[0].MaterializedViewInvolved)
}
func (s *MaterializedViewTestSuite) TestMvNotEnabledWithPartitionKey() {
task := s.getSearchTask()
task.enableMaterializedView = false
task.request.Dsl = testInt64Field + " == 1"
schema := ConstructCollectionSchemaWithPartitionKey(s.colName, s.fieldName2Types, testInt64Field, testInt64Field, false)
schemaInfo := newSchemaInfo(schema)
s.mockMetaCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(schemaInfo, nil)
s.mockMetaCache.EXPECT().GetPartitionsIndex(mock.Anything, mock.Anything, mock.Anything).Return([]string{"partition_1", "partition_2"}, nil)
s.mockMetaCache.EXPECT().GetPartitions(mock.Anything, mock.Anything, mock.Anything).Return(map[string]int64{"partition_1": 1, "partition_2": 2}, nil)
err := task.PreExecute(s.ctx)
s.NoError(err)
s.NotZero(len(task.queryInfos))
s.Equal(false, task.queryInfos[0].MaterializedViewInvolved)
}
func (s *MaterializedViewTestSuite) TestMvEnabledNoPartitionKey() {
task := s.getSearchTask()
task.enableMaterializedView = true
schema := constructCollectionSchemaByDataType(s.colName, s.fieldName2Types, testInt64Field, false)
schemaInfo := newSchemaInfo(schema)
s.mockMetaCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(schemaInfo, nil)
err := task.PreExecute(s.ctx)
s.NoError(err)
s.NotZero(len(task.queryInfos))
s.Equal(false, task.queryInfos[0].MaterializedViewInvolved)
}
func (s *MaterializedViewTestSuite) TestMvEnabledPartitionKeyOnInt64() {
task := s.getSearchTask()
task.enableMaterializedView = true
task.request.Dsl = testInt64Field + " == 1"
schema := ConstructCollectionSchemaWithPartitionKey(s.colName, s.fieldName2Types, testInt64Field, testInt64Field, false)
schemaInfo := newSchemaInfo(schema)
s.mockMetaCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(schemaInfo, nil)
s.mockMetaCache.EXPECT().GetPartitionsIndex(mock.Anything, mock.Anything, mock.Anything).Return([]string{"partition_1", "partition_2"}, nil)
s.mockMetaCache.EXPECT().GetPartitions(mock.Anything, mock.Anything, mock.Anything).Return(map[string]int64{"partition_1": 1, "partition_2": 2}, nil)
err := task.PreExecute(s.ctx)
s.NoError(err)
s.NotZero(len(task.queryInfos))
s.Equal(false, task.queryInfos[0].MaterializedViewInvolved)
}
func (s *MaterializedViewTestSuite) TestMvEnabledPartitionKeyOnVarChar() {
task := s.getSearchTask()
task.enableMaterializedView = true
task.request.Dsl = testVarCharField + " == \"a\""
schema := ConstructCollectionSchemaWithPartitionKey(s.colName, s.fieldName2Types, testInt64Field, testVarCharField, false)
schemaInfo := newSchemaInfo(schema)
s.mockMetaCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(schemaInfo, nil)
s.mockMetaCache.EXPECT().GetPartitionsIndex(mock.Anything, mock.Anything, mock.Anything).Return([]string{"partition_1", "partition_2"}, nil)
s.mockMetaCache.EXPECT().GetPartitions(mock.Anything, mock.Anything, mock.Anything).Return(map[string]int64{"partition_1": 1, "partition_2": 2}, nil)
err := task.PreExecute(s.ctx)
s.NoError(err)
s.NotZero(len(task.queryInfos))
s.Equal(true, task.queryInfos[0].MaterializedViewInvolved)
}
func TestMaterializedView(t *testing.T) {
suite.Run(t, new(MaterializedViewTestSuite))
}

View File

@ -1087,6 +1087,10 @@ func HasPartitionKey(schema *schemapb.CollectionSchema) bool {
return false
}
func IsFieldDataTypeSupportMaterializedView(fieldSchema *schemapb.FieldSchema) bool {
return fieldSchema.DataType == schemapb.DataType_VarChar || fieldSchema.DataType == schemapb.DataType_String
}
// GetPrimaryFieldData get primary field data from all field data inserted from sdk
func GetPrimaryFieldData(datas []*schemapb.FieldData, primaryFieldSchema *schemapb.FieldSchema) (*schemapb.FieldData, error) {
primaryFieldID := primaryFieldSchema.FieldID

View File

@ -0,0 +1,210 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package materializedview
import (
"context"
"testing"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metric"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/tests/integration"
)
type MaterializedViewTestSuite struct {
integration.MiniClusterSuite
isPartitionKeyEnable bool
partitionKeyFieldDataType schemapb.DataType
}
// func (s *MaterializedViewTestSuite) SetupTest() {
// s.T().Log("Setup in mv")
// }
func (s *MaterializedViewTestSuite) run() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := s.Cluster
const (
dim = 128
dbName = ""
rowNum = 1000
partitionKeyFieldName = "pid"
)
collectionName := "IntegrationTestMaterializedView" + funcutil.GenRandomStr()
schema := integration.ConstructSchema(collectionName, dim, false)
schema.Fields = append(schema.Fields, &schemapb.FieldSchema{
FieldID: 102,
Name: partitionKeyFieldName,
Description: "",
DataType: s.partitionKeyFieldDataType,
TypeParams: []*commonpb.KeyValuePair{{Key: "max_length", Value: "100"}},
IndexParams: nil,
IsPartitionKey: s.isPartitionKeyEnable,
})
marshaledSchema, err := proto.Marshal(schema)
s.NoError(err)
createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
Schema: marshaledSchema,
ShardsNum: common.DefaultShardsNum,
})
s.NoError(err)
s.NoError(merr.Error(createCollectionStatus))
pkFieldData := integration.NewInt64FieldData(integration.Int64Field, rowNum)
vecFieldData := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim)
var partitionKeyFieldData *schemapb.FieldData
switch s.partitionKeyFieldDataType {
case schemapb.DataType_Int64:
partitionKeyFieldData = integration.NewInt64SameFieldData(partitionKeyFieldName, rowNum, 0)
case schemapb.DataType_VarChar:
partitionKeyFieldData = integration.NewVarCharSameFieldData(partitionKeyFieldName, rowNum, "a")
default:
s.FailNow("unsupported partition key field data type")
}
hashKeys := integration.GenerateHashKeys(rowNum)
insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{
DbName: dbName,
CollectionName: collectionName,
FieldsData: []*schemapb.FieldData{pkFieldData, vecFieldData, partitionKeyFieldData},
HashKeys: hashKeys,
NumRows: uint32(rowNum),
})
s.NoError(err)
s.True(merr.Ok(insertResult.GetStatus()))
flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{
DbName: dbName,
CollectionNames: []string{collectionName},
})
s.NoError(err)
segmentIDs, has := flushResp.GetCollSegIDs()[collectionName]
ids := segmentIDs.GetData()
s.Require().NotEmpty(segmentIDs)
s.Require().True(has)
flushTs, has := flushResp.GetCollFlushTs()[collectionName]
s.True(has)
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
for _, segment := range segments {
log.Info("ShowSegments result", zap.String("segment", segment.String()))
}
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
// create index
createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
CollectionName: collectionName,
FieldName: integration.FloatVecField,
IndexName: "_default",
ExtraParams: integration.ConstructIndexParam(dim, integration.IndexHNSW, metric.L2),
})
s.NoError(err)
s.NoError(merr.Error(createIndexStatus))
s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField)
// load
loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
})
s.NoError(err)
s.NoError(merr.Error(loadStatus))
s.WaitForLoad(ctx, collectionName)
{
var expr string
switch s.partitionKeyFieldDataType {
case schemapb.DataType_Int64:
expr = partitionKeyFieldName + " == 0"
case schemapb.DataType_VarChar:
expr = partitionKeyFieldName + " == \"a\""
default:
s.FailNow("unsupported partition key field data type")
}
nq := 1
topk := 10
roundDecimal := -1
params := integration.GetSearchParams(integration.IndexHNSW, metric.L2)
searchReq := integration.ConstructSearchRequest("", collectionName, expr,
integration.FloatVecField, schemapb.DataType_FloatVector, nil, metric.L2, params, nq, dim, topk, roundDecimal)
searchResult, err := c.Proxy.Search(ctx, searchReq)
s.NoError(err)
s.NoError(merr.Error(searchResult.GetStatus()))
s.Equal(topk, len(searchResult.GetResults().GetScores()))
}
status, err := s.Cluster.Proxy.DropCollection(ctx, &milvuspb.DropCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
})
s.Require().NoError(err)
s.NoError(merr.Error(status))
}
func (s *MaterializedViewTestSuite) TestPartitionKeyDisabledInt64() {
s.isPartitionKeyEnable = false
s.partitionKeyFieldDataType = schemapb.DataType_Int64
s.run()
}
func (s *MaterializedViewTestSuite) TestMvInt64() {
s.isPartitionKeyEnable = true
s.partitionKeyFieldDataType = schemapb.DataType_Int64
s.run()
}
func (s *MaterializedViewTestSuite) TestPartitionKeyDisabledVarChar() {
s.isPartitionKeyEnable = false
s.partitionKeyFieldDataType = schemapb.DataType_VarChar
s.run()
}
func (s *MaterializedViewTestSuite) TestMvVarChar() {
s.isPartitionKeyEnable = true
s.partitionKeyFieldDataType = schemapb.DataType_VarChar
s.run()
}
func TestMaterializedViewEnabled(t *testing.T) {
paramtable.Init()
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true")
defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false")
suite.Run(t, new(MaterializedViewTestSuite))
}

View File

@ -99,6 +99,22 @@ func NewInt64SameFieldData(fieldName string, numRows int, value int64) *schemapb
}
}
func NewVarCharSameFieldData(fieldName string, numRows int, value string) *schemapb.FieldData {
return &schemapb.FieldData{
Type: schemapb.DataType_String,
FieldName: fieldName,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_StringData{
StringData: &schemapb.StringArray{
Data: GenerateSameStringArray(numRows, value),
},
},
},
},
}
}
func NewStringFieldData(fieldName string, numRows int) *schemapb.FieldData {
return &schemapb.FieldData{
Type: schemapb.DataType_Int64,
@ -209,6 +225,14 @@ func GenerateSameInt64Array(numRows int, value int64) []int64 {
return ret
}
func GenerateSameStringArray(numRows int, value string) []string {
ret := make([]string, numRows)
for i := 0; i < numRows; i++ {
ret[i] = value
}
return ret
}
func GenerateStringArray(numRows int) []string {
ret := make([]string, numRows)
for i := 0; i < numRows; i++ {