mirror of https://github.com/milvus-io/milvus.git
fix: disable sparse and binary in MV (#35126)
issue: #35120 Signed-off-by: Patrick Weizhi Xu <weizhi.xu@zilliz.com> (cherry picked from commit 0d340ebe7412cd85963825bc8abd3c874e339679)pull/35144/head
parent
f7f9a729c9
commit
acaa78db58
|
@ -107,38 +107,6 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule
|
|||
it.SetState(indexpb.JobState_JobStateFinished, "fake finished index success")
|
||||
return true
|
||||
}
|
||||
// vector index build needs information of optional scalar fields data
|
||||
optionalFields := make([]*indexpb.OptionalFieldInfo, 0)
|
||||
partitionKeyIsolation := false
|
||||
if Params.CommonCfg.EnableMaterializedView.GetAsBool() && isOptionalScalarFieldSupported(indexType) {
|
||||
collInfo, err := dependency.handler.GetCollection(ctx, segIndex.CollectionID)
|
||||
if err != nil || collInfo == nil {
|
||||
log.Ctx(ctx).Warn("get collection failed", zap.Int64("collID", segIndex.CollectionID), zap.Error(err))
|
||||
it.SetState(indexpb.JobState_JobStateInit, err.Error())
|
||||
return true
|
||||
}
|
||||
colSchema := collInfo.Schema
|
||||
partitionKeyField, err := typeutil.GetPartitionKeyFieldSchema(colSchema)
|
||||
if partitionKeyField == nil || err != nil {
|
||||
log.Ctx(ctx).Warn("index builder get partition key field failed", zap.Int64("taskID", it.taskID), zap.Error(err))
|
||||
} else {
|
||||
if typeutil.IsFieldDataTypeSupportMaterializedView(partitionKeyField) {
|
||||
optionalFields = append(optionalFields, &indexpb.OptionalFieldInfo{
|
||||
FieldID: partitionKeyField.FieldID,
|
||||
FieldName: partitionKeyField.Name,
|
||||
FieldType: int32(partitionKeyField.DataType),
|
||||
DataIds: getBinLogIDs(segment, partitionKeyField.FieldID),
|
||||
})
|
||||
iso, isoErr := common.IsPartitionKeyIsolationPropEnabled(collInfo.Properties)
|
||||
if isoErr != nil {
|
||||
log.Ctx(ctx).Warn("failed to parse partition key isolation", zap.Error(isoErr))
|
||||
}
|
||||
if iso {
|
||||
partitionKeyIsolation = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
typeParams := dependency.meta.indexMeta.GetTypeParams(segIndex.CollectionID, segIndex.IndexID)
|
||||
|
||||
|
@ -202,6 +170,37 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule
|
|||
// don't return, maybe field is scalar field or sparseFloatVector
|
||||
}
|
||||
|
||||
// vector index build needs information of optional scalar fields data
|
||||
optionalFields := make([]*indexpb.OptionalFieldInfo, 0)
|
||||
partitionKeyIsolation := false
|
||||
if Params.CommonCfg.EnableMaterializedView.GetAsBool() && isOptionalScalarFieldSupported(indexType) && typeutil.IsDenseFloatVectorType(field.DataType) {
|
||||
if collectionInfo == nil {
|
||||
log.Ctx(ctx).Warn("get collection failed", zap.Int64("collID", segIndex.CollectionID), zap.Error(err))
|
||||
it.SetState(indexpb.JobState_JobStateInit, err.Error())
|
||||
return true
|
||||
}
|
||||
partitionKeyField, err := typeutil.GetPartitionKeyFieldSchema(schema)
|
||||
if partitionKeyField == nil || err != nil {
|
||||
log.Ctx(ctx).Warn("index builder get partition key field failed", zap.Int64("taskID", it.taskID), zap.Error(err))
|
||||
} else {
|
||||
if typeutil.IsFieldDataTypeSupportMaterializedView(partitionKeyField) {
|
||||
optionalFields = append(optionalFields, &indexpb.OptionalFieldInfo{
|
||||
FieldID: partitionKeyField.FieldID,
|
||||
FieldName: partitionKeyField.Name,
|
||||
FieldType: int32(partitionKeyField.DataType),
|
||||
DataIds: getBinLogIDs(segment, partitionKeyField.FieldID),
|
||||
})
|
||||
iso, isoErr := common.IsPartitionKeyIsolationPropEnabled(collectionInfo.Properties)
|
||||
if isoErr != nil {
|
||||
log.Ctx(ctx).Warn("failed to parse partition key isolation", zap.Error(isoErr))
|
||||
}
|
||||
if iso {
|
||||
partitionKeyIsolation = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if Params.CommonCfg.EnableStorageV2.GetAsBool() {
|
||||
storePath, err := itypeutil.GetStorageURI(params.Params.CommonCfg.StorageScheme.GetValue(), params.Params.CommonCfg.StoragePathPrefix.GetValue(), segment.GetID())
|
||||
if err != nil {
|
||||
|
|
|
@ -1349,7 +1349,7 @@ func (s *taskSchedulerSuite) Test_indexTaskFailCase() {
|
|||
},
|
||||
},
|
||||
}, nil
|
||||
}).Twice()
|
||||
}).Once()
|
||||
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(nil, errors.New("mock error")).Once()
|
||||
|
||||
// retry --> init
|
||||
|
@ -1366,7 +1366,7 @@ func (s *taskSchedulerSuite) Test_indexTaskFailCase() {
|
|||
{FieldID: s.fieldID, Name: "vec", TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "10"}}},
|
||||
},
|
||||
},
|
||||
}, nil).Twice()
|
||||
}, nil).Once()
|
||||
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(&commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil).Once()
|
||||
|
||||
// inProgress --> Finished
|
||||
|
@ -1595,6 +1595,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
|
|||
mt.indexMeta.buildID2SegmentIndex[buildID].IndexState = commonpb.IndexState_Unissued
|
||||
mt.indexMeta.segmentIndexes[segID][indexID].IndexState = commonpb.IndexState_Unissued
|
||||
mt.indexMeta.indexes[collID][indexID].IndexParams[1].Value = indexparamcheck.IndexHNSW
|
||||
mt.collections[collID].Schema.Fields[0].DataType = schemapb.DataType_FloatVector
|
||||
mt.collections[collID].Schema.Fields[1].IsPartitionKey = true
|
||||
mt.collections[collID].Schema.Fields[1].DataType = schemapb.DataType_VarChar
|
||||
}
|
||||
|
@ -1681,7 +1682,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
|
|||
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false")
|
||||
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(
|
||||
func(ctx context.Context, in *indexpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) {
|
||||
s.Zero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should be set")
|
||||
s.Zero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should not be set")
|
||||
return merr.Success(), nil
|
||||
}).Once()
|
||||
t := &indexBuildTask{
|
||||
|
@ -1698,6 +1699,33 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
|
|||
resetMetaFunc()
|
||||
})
|
||||
|
||||
s.Run("enqueue returns empty when vector type is not dense vector", func() {
|
||||
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true")
|
||||
for _, dataType := range []schemapb.DataType{
|
||||
schemapb.DataType_BinaryVector,
|
||||
schemapb.DataType_SparseFloatVector,
|
||||
} {
|
||||
mt.collections[collID].Schema.Fields[0].DataType = dataType
|
||||
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(
|
||||
func(ctx context.Context, in *indexpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) {
|
||||
s.Zero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should not be set")
|
||||
return merr.Success(), nil
|
||||
}).Once()
|
||||
t := &indexBuildTask{
|
||||
taskID: buildID,
|
||||
nodeID: nodeID,
|
||||
taskInfo: &indexpb.IndexTaskInfo{
|
||||
BuildID: buildID,
|
||||
State: commonpb.IndexState_Unissued,
|
||||
FailReason: "",
|
||||
},
|
||||
}
|
||||
scheduler.enqueue(t)
|
||||
waitTaskDoneFunc(scheduler)
|
||||
resetMetaFunc()
|
||||
}
|
||||
})
|
||||
|
||||
s.Run("enqueue returns empty optional field when the data type is not STRING or VARCHAR or Integer", func() {
|
||||
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true")
|
||||
for _, dataType := range []schemapb.DataType{
|
||||
|
@ -1710,7 +1738,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
|
|||
mt.collections[collID].Schema.Fields[1].DataType = dataType
|
||||
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(
|
||||
func(ctx context.Context, in *indexpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) {
|
||||
s.Zero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should be set")
|
||||
s.Zero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should not be set")
|
||||
return merr.Success(), nil
|
||||
}).Once()
|
||||
t := &indexBuildTask{
|
||||
|
@ -1733,7 +1761,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
|
|||
mt.collections[collID].Schema.Fields[1].IsPartitionKey = false
|
||||
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(
|
||||
func(ctx context.Context, in *indexpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) {
|
||||
s.Zero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should be set")
|
||||
s.Zero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should not be set")
|
||||
return merr.Success(), nil
|
||||
}).Once()
|
||||
t := &indexBuildTask{
|
||||
|
|
Loading…
Reference in New Issue