diff --git a/internal/proxy/util.go b/internal/proxy/util.go index 7298b69182..b0ae463e36 100644 --- a/internal/proxy/util.go +++ b/internal/proxy/util.go @@ -957,9 +957,12 @@ func fillFieldsDataBySchema(schema *schemapb.CollectionSchema, insertMsg *msgstr isPrimaryKeyNum := 0 var dataNameSet = typeutil.NewSet[string]() - for _, data := range insertMsg.FieldsData { - dataNameSet.Insert(data.GetFieldName()) + fieldName := data.GetFieldName() + if dataNameSet.Contain(fieldName) { + return merr.WrapErrParameterDuplicateFieldData(fieldName, "The FieldDatas parameter being passed contains duplicate data for a field.") + } + dataNameSet.Insert(fieldName) } for _, fieldSchema := range schema.Fields { diff --git a/internal/proxy/util_test.go b/internal/proxy/util_test.go index 9aab6be4e3..20cdd70d2f 100644 --- a/internal/proxy/util_test.go +++ b/internal/proxy/util_test.go @@ -1168,6 +1168,20 @@ func Test_InsertTaskfillFieldsDataBySchema(t *testing.T) { assert.ErrorIs(t, merr.ErrParameterInvalid, err) assert.Equal(t, len(case5.insertMsg.FieldsData), 3) + // duplicate field datas + case5.insertMsg.FieldsData = []*schemapb.FieldData{ + { + FieldName: "a", + Type: schemapb.DataType_Int64, + }, + { + FieldName: "a", + Type: schemapb.DataType_Int64, + }, + } + err = fillFieldsDataBySchema(case5.schema, case5.insertMsg) + assert.Error(t, err) + // not pk, but autoid == true case6 := insertTask{ schema: &schemapb.CollectionSchema{ diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index 7f9e5c74f6..e27975e6bd 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -362,6 +362,14 @@ func WrapErrParameterInvalidRange[T any](lower, upper, actual T, msg ...string) return err } +func WrapErrParameterDuplicateFieldData(fieldName string, msg ...string) error { + err := errors.Wrapf(ErrParameterInvalid, "field name=%v", fieldName) + if len(msg) > 0 { + err = errors.Wrap(err, strings.Join(msg, "; ")) + } + return err +} + // Metrics related func WrapErrMetricNotFound(name string, msg ...string) error { err := errors.Wrapf(ErrMetricNotFound, "metric=%s", name) diff --git a/tests/integration/insert/insert_test.go b/tests/integration/insert/insert_test.go new file mode 100644 index 0000000000..2c6031c460 --- /dev/null +++ b/tests/integration/insert/insert_test.go @@ -0,0 +1,126 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package insert + +import ( + "context" + "testing" + + "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/suite" + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/milvuspb" + "github.com/milvus-io/milvus-proto/go-api/schemapb" + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/distance" + "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/tests/integration" +) + +type InsertSuite struct { + integration.MiniClusterSuite +} + +func (s *InsertSuite) TestInsert() { + c := s.Cluster + ctx, cancel := context.WithCancel(c.GetContext()) + defer cancel() + + prefix := "TestInsert" + dbName := "" + collectionName := prefix + funcutil.GenRandomStr() + dim := 128 + rowNum := 3000 + + schema := integration.ConstructSchema(collectionName, dim, false) + marshaledSchema, err := proto.Marshal(schema) + s.NoError(err) + + createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ + DbName: dbName, + CollectionName: collectionName, + Schema: marshaledSchema, + ShardsNum: common.DefaultShardsNum, + }) + s.NoError(err) + + err = merr.Error(createCollectionStatus) + if err != nil { + log.Warn("createCollectionStatus fail reason", zap.Error(err)) + } + + log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus)) + showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) + s.NoError(err) + s.True(merr.Ok(showCollectionsResp.GetStatus())) + log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp)) + + // create index + createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ + CollectionName: collectionName, + FieldName: integration.FloatVecField, + IndexName: "_default", + ExtraParams: integration.ConstructIndexParam(dim, integration.IndexFaissIvfFlat, distance.IP), + }) + s.NoError(err) + err = merr.Error(createIndexStatus) + if err != nil { + log.Warn("createIndexStatus fail reason", zap.Error(err)) + } + + s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField) + log.Info("Create index done") + + // load + loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ + DbName: dbName, + CollectionName: collectionName, + }) + s.NoError(err) + err = merr.Error(loadStatus) + if err != nil { + log.Warn("LoadCollection fail reason", zap.Error(err)) + } + s.WaitForLoad(ctx, collectionName) + log.Info("Load collection done") + + pkFieldData := integration.NewInt64FieldData(integration.Int64Field, rowNum) + hashKeys := integration.GenerateHashKeys(rowNum) + insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{ + DbName: dbName, + CollectionName: collectionName, + FieldsData: []*schemapb.FieldData{pkFieldData, pkFieldData}, + HashKeys: hashKeys, + NumRows: uint32(rowNum), + }) + s.NoError(err) + s.False(merr.Ok(insertResult.GetStatus())) + + log.Info("==================") + log.Info("==================") + log.Info("TestInsert succeed") + log.Info("==================") + log.Info("==================") + +} + +func TestInsert(t *testing.T) { + suite.Run(t, new(InsertSuite)) +}