fix: check utf-8 format for varchar with analyzer open (#39299)

relate: https://github.com/milvus-io/milvus/issues/39285

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
pull/39665/head
aoiasd 2025-02-06 17:11:51 +08:00 committed by GitHub
parent 5669016af0
commit 2b4caba76e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 52 additions and 0 deletions

View File

@ -197,6 +197,13 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
return err
}
// check that varchar fields with an analyzer enabled contain valid utf-8
err = checkVarcharFormat(it.schema, it.insertMsg)
if err != nil {
log.Warn("check varchar format failed", zap.Error(err))
return err
}
// set field ID to insert field data
err = fillFieldPropertiesBySchema(it.insertMsg.GetFieldsData(), schema.CollectionSchema)
if err != nil {

View File

@ -208,6 +208,14 @@ func (it *upsertTask) insertPreExecute(ctx context.Context) error {
zap.Error(err))
return merr.WrapErrAsInputErrorWhen(err, merr.ErrParameterInvalid)
}
// check that varchar fields with an analyzer enabled contain valid utf-8
err = checkVarcharFormat(it.schema.CollectionSchema, it.upsertMsg.InsertMsg)
if err != nil {
log.Warn("check varchar format failed", zap.Error(err))
return err
}
// set field ID to insert field data
err = fillFieldPropertiesBySchema(it.upsertMsg.InsertMsg.GetFieldsData(), it.schema.CollectionSchema)
if err != nil {

View File

@ -23,6 +23,7 @@ import (
"strconv"
"strings"
"time"
"unicode/utf8"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
@ -1609,6 +1610,42 @@ func checkPrimaryFieldData(schema *schemapb.CollectionSchema, insertMsg *msgstre
return ids, nil
}
// checkVarcharFormat verifies that every value supplied for a VarChar field
// with an analyzer configured is valid UTF-8. The check runs before the insert
// message is written to the message queue; UTF-8 is the only supported format.
//
// A VarChar field is validated when its type params contain
// common.EnableAnalyzerKey, unless that entry's value parses as an explicit
// boolean false. Any other value (including an unparsable one) keeps the field
// under validation.
//
// Field data is matched to schema fields by field ID or, as a fallback, by
// non-empty field name: the visible callers run this check before they assign
// field IDs to the incoming field data, so the ID may still be unset here.
//
// It returns nil when nothing needs checking or all values are valid, and an
// input error (merr.WrapErrAsInputError) describing the first offending row
// otherwise.
func checkVarcharFormat(schema *schemapb.CollectionSchema, insertMsg *msgstream.InsertMsg) error {
	analyzedIDs := make(map[int64]struct{})
	analyzedNames := make(map[string]struct{})
	for _, field := range schema.GetFields() {
		if field.GetDataType() != schemapb.DataType_VarChar {
			continue
		}
		for _, kv := range field.GetTypeParams() {
			if kv.GetKey() != common.EnableAnalyzerKey {
				continue
			}
			// Only an explicit "false" switches the check off.
			if enabled, err := strconv.ParseBool(kv.GetValue()); err == nil && !enabled {
				break
			}
			analyzedIDs[field.GetFieldID()] = struct{}{}
			// Skip empty names so unnamed field data is never matched by accident.
			if name := field.GetName(); name != "" {
				analyzedNames[name] = struct{}{}
			}
			break
		}
	}
	if len(analyzedIDs) == 0 {
		return nil
	}

	for _, fieldData := range insertMsg.GetFieldsData() {
		_, matchedByID := analyzedIDs[fieldData.GetFieldId()]
		_, matchedByName := analyzedNames[fieldData.GetFieldName()]
		if !matchedByID && !matchedByName {
			continue
		}

		for row, data := range fieldData.GetScalars().GetStringData().GetData() {
			if !utf8.ValidString(data) {
				// %q escapes the invalid bytes so the error text itself stays valid UTF-8.
				return merr.WrapErrAsInputError(fmt.Errorf("varchar with analyzer should be utf-8 format, but row: %d not utf-8 varchar. data: %q", row, data))
			}
		}
	}
	return nil
}
func checkUpsertPrimaryFieldData(schema *schemapb.CollectionSchema, insertMsg *msgstream.InsertMsg) (*schemapb.IDs, *schemapb.IDs, error) {
log := log.With(zap.String("collectionName", insertMsg.CollectionName))
rowNums := uint32(insertMsg.NRows())