diff --git a/internal/proxy/task_insert.go b/internal/proxy/task_insert.go index 9ccace6032..fced82d481 100644 --- a/internal/proxy/task_insert.go +++ b/internal/proxy/task_insert.go @@ -197,6 +197,13 @@ func (it *insertTask) PreExecute(ctx context.Context) error { return err } + // check varchar with analyzer was utf-8 format + err = checkVarcharFormat(it.schema, it.insertMsg) + if err != nil { + log.Warn("check varchar format failed", zap.Error(err)) + return err + } + // set field ID to insert field data err = fillFieldPropertiesBySchema(it.insertMsg.GetFieldsData(), schema.CollectionSchema) if err != nil { diff --git a/internal/proxy/task_upsert.go b/internal/proxy/task_upsert.go index c5848b7638..ab6fbcd0f6 100644 --- a/internal/proxy/task_upsert.go +++ b/internal/proxy/task_upsert.go @@ -208,6 +208,14 @@ func (it *upsertTask) insertPreExecute(ctx context.Context) error { zap.Error(err)) return merr.WrapErrAsInputErrorWhen(err, merr.ErrParameterInvalid) } + + // check varchar with analyzer was utf-8 format + err = checkVarcharFormat(it.schema.CollectionSchema, it.upsertMsg.InsertMsg) + if err != nil { + log.Warn("check varchar format failed", zap.Error(err)) + return err + } + // set field ID to insert field data err = fillFieldPropertiesBySchema(it.upsertMsg.InsertMsg.GetFieldsData(), it.schema.CollectionSchema) if err != nil { diff --git a/internal/proxy/util.go b/internal/proxy/util.go index e8dac109c6..12e0b4e871 100644 --- a/internal/proxy/util.go +++ b/internal/proxy/util.go @@ -23,6 +23,7 @@ import ( "strconv" "strings" "time" + "unicode/utf8" "github.com/cockroachdb/errors" "github.com/samber/lo" @@ -1609,6 +1610,42 @@ func checkPrimaryFieldData(schema *schemapb.CollectionSchema, insertMsg *msgstre return ids, nil } +// for some varchar with analzyer +// we need check char format before insert it to message queue +// now only support utf-8 +func checkVarcharFormat(schema *schemapb.CollectionSchema, insertMsg *msgstream.InsertMsg) error { + checkeFields := lo.FilterMap(schema.GetFields(), func(field *schemapb.FieldSchema, _ int) (int64, bool) { + if field.DataType != schemapb.DataType_VarChar { + return 0, false + } + + for _, kv := range field.GetTypeParams() { + if kv.Key == common.EnableAnalyzerKey { + return field.GetFieldID(), true + } + } + return 0, false + }) + + if len(checkeFields) == 0 { + return nil + } + + for _, fieldData := range insertMsg.FieldsData { + if !lo.Contains(checkeFields, fieldData.GetFieldId()) { + continue + } + + for row, data := range fieldData.GetScalars().GetStringData().GetData() { + ok := utf8.ValidString(data) + if !ok { + return merr.WrapErrAsInputError(fmt.Errorf("varchar with analyzer should be utf-8 format, but row: %d not utf-8 varchar. data: %s", row, data)) + } + } + } + return nil +} + func checkUpsertPrimaryFieldData(schema *schemapb.CollectionSchema, insertMsg *msgstream.InsertMsg) (*schemapb.IDs, *schemapb.IDs, error) { log := log.With(zap.String("collectionName", insertMsg.CollectionName)) rowNums := uint32(insertMsg.NRows())