From 1594122c0ac91a1349a051f1ae45fa6a4558ce32 Mon Sep 17 00:00:00 2001
From: "yihao.dai" <yihao.dai@zilliz.com>
Date: Sun, 28 Apr 2024 19:39:25 +0800
Subject: [PATCH] enhance: Make the dynamic field file optional during numpy
 import (#32596)

1. Make the dynamic field file optional during numpy import
2. Add integration importing test with dynamic
3. Disallow providing a file for the primary key field when autoID=true during numpy import

issue: https://github.com/milvus-io/milvus/issues/32542

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
---
 internal/util/importutilv2/numpy/reader.go    |  13 +-
 .../util/importutilv2/numpy/reader_test.go    |  43 +++-
 internal/util/importutilv2/numpy/util.go      |   5 +-
 internal/util/importutilv2/reader.go          |   2 +-
 .../integration/import/dynamic_field_test.go  | 204 ++++++++++++++++++
 5 files changed, 259 insertions(+), 8 deletions(-)
 create mode 100644 tests/integration/import/dynamic_field_test.go

diff --git a/internal/util/importutilv2/numpy/reader.go b/internal/util/importutilv2/numpy/reader.go
index 6acabe1f37..1f6007b0e1 100644
--- a/internal/util/importutilv2/numpy/reader.go
+++ b/internal/util/importutilv2/numpy/reader.go
@@ -43,7 +43,7 @@ type reader struct {
 	frs   map[int64]*FieldReader // fieldID -> FieldReader
 }
 
-func NewReader(ctx context.Context, schema *schemapb.CollectionSchema, paths []string, cm storage.ChunkManager, bufferSize int) (*reader, error) {
+func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.CollectionSchema, paths []string, bufferSize int) (*reader, error) {
 	fields := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) int64 {
 		return field.GetFieldID()
 	})
@@ -52,7 +52,7 @@ func NewReader(ctx context.Context, schema *schemapb.CollectionSchema, paths []s
 		return nil, err
 	}
 	crs := make(map[int64]*FieldReader)
-	readers, err := CreateReaders(ctx, paths, cm, schema)
+	readers, err := CreateReaders(ctx, cm, schema, paths)
 	if err != nil {
 		return nil, err
 	}
@@ -118,7 +118,7 @@ func (r *reader) Close() {
 	}
 }
 
-func CreateReaders(ctx context.Context, paths []string, cm storage.ChunkManager, schema *schemapb.CollectionSchema) (map[int64]io.Reader, error) {
+func CreateReaders(ctx context.Context, cm storage.ChunkManager, schema *schemapb.CollectionSchema, paths []string) (map[int64]io.Reader, error) {
 	readers := make(map[int64]io.Reader)
 	nameToPath := lo.SliceToMap(paths, func(path string) (string, string) {
 		nameWithExt := filepath.Base(path)
@@ -127,9 +127,16 @@ func CreateReaders(ctx context.Context, paths []string, cm storage.ChunkManager,
 	})
 	for _, field := range schema.GetFields() {
 		if field.GetIsPrimaryKey() && field.GetAutoID() {
+			if _, ok := nameToPath[field.GetName()]; ok {
+				return nil, merr.WrapErrImportFailed(
+					fmt.Sprintf("the primary key '%s' is auto-generated, no need to provide", field.GetName()))
+			}
 			continue
 		}
 		if _, ok := nameToPath[field.GetName()]; !ok {
+			if field.GetIsDynamic() {
+				continue
+			}
 			return nil, merr.WrapErrImportFailed(
 				fmt.Sprintf("no file for field: %s, files: %v", field.GetName(), lo.Values(nameToPath)))
 		}
diff --git a/internal/util/importutilv2/numpy/reader_test.go b/internal/util/importutilv2/numpy/reader_test.go
index 2189cf8961..b990b0f578 100644
--- a/internal/util/importutilv2/numpy/reader_test.go
+++ b/internal/util/importutilv2/numpy/reader_test.go
@@ -309,7 +309,7 @@ func (suite *ReaderSuite) run(dt schemapb.DataType) {
 		}
 	}
 
-	reader, err := NewReader(context.Background(), schema, lo.Values(files), cm, math.MaxInt)
+	reader, err := NewReader(context.Background(), cm, schema, lo.Values(files), math.MaxInt)
 	suite.NoError(err)
 
 	checkFn := func(actualInsertData *storage.InsertData, offsetBegin, expectRows int) {
@@ -447,7 +447,7 @@ func (suite *ReaderSuite) failRun(dt schemapb.DataType, isDynamic bool) {
 		}
 	}
 
-	reader, err := NewReader(context.Background(), schema, lo.Values(files), cm, math.MaxInt)
+	reader, err := NewReader(context.Background(), cm, schema, lo.Values(files), math.MaxInt)
 	suite.NoError(err)
 
 	_, err = reader.Read()
@@ -486,3 +486,42 @@ func (suite *ReaderSuite) TestVector() {
 func TestUtil(t *testing.T) {
 	suite.Run(t, new(ReaderSuite))
 }
+
+func TestCreateReaders(t *testing.T) {
+	ctx := context.Background()
+	cm := mocks.NewChunkManager(t)
+	cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(nil, nil)
+
+	// normal
+	schema := &schemapb.CollectionSchema{
+		Fields: []*schemapb.FieldSchema{
+			{Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true},
+			{Name: "vec", DataType: schemapb.DataType_FloatVector},
+			{Name: "json", DataType: schemapb.DataType_JSON},
+		},
+	}
+	_, err := CreateReaders(ctx, cm, schema, []string{"pk", "vec", "json"})
+	assert.NoError(t, err)
+
+	// auto id
+	schema = &schemapb.CollectionSchema{
+		Fields: []*schemapb.FieldSchema{
+			{Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, AutoID: true},
+			{Name: "vec", DataType: schemapb.DataType_FloatVector},
+			{Name: "json", DataType: schemapb.DataType_JSON},
+		},
+	}
+	_, err = CreateReaders(ctx, cm, schema, []string{"pk", "vec", "json"})
+	assert.Error(t, err)
+
+	// $meta
+	schema = &schemapb.CollectionSchema{
+		Fields: []*schemapb.FieldSchema{
+			{Name: "pk", DataType: schemapb.DataType_Int64, AutoID: true},
+			{Name: "vec", DataType: schemapb.DataType_FloatVector},
+			{Name: "$meta", DataType: schemapb.DataType_JSON, IsDynamic: true},
+		},
+	}
+	_, err = CreateReaders(ctx, cm, schema, []string{"pk", "vec"})
+	assert.NoError(t, err)
+}
diff --git a/internal/util/importutilv2/numpy/util.go b/internal/util/importutilv2/numpy/util.go
index 2acd5f8982..c43dd73ca6 100644
--- a/internal/util/importutilv2/numpy/util.go
+++ b/internal/util/importutilv2/numpy/util.go
@@ -261,11 +261,12 @@ func fillDynamicData(data *storage.InsertData, schema *schemapb.CollectionSchema
 	if dynamicField == nil {
 		return nil
 	}
-	rowNum := getInsertDataRowNum(data, schema)
+	totalRowNum := getInsertDataRowNum(data, schema)
 	dynamicData := data.Data[dynamicField.GetFieldID()]
 	jsonFD := dynamicData.(*storage.JSONFieldData)
 	bs := []byte("{}")
-	for i := 0; i < rowNum-dynamicData.RowNum(); i++ {
+	existedRowNum := dynamicData.RowNum()
+	for i := 0; i < totalRowNum-existedRowNum; i++ {
 		jsonFD.Data = append(jsonFD.Data, bs)
 	}
 	data.Data[dynamicField.GetFieldID()] = dynamicData
diff --git a/internal/util/importutilv2/reader.go b/internal/util/importutilv2/reader.go
index d3c0d532fb..de142feca1 100644
--- a/internal/util/importutilv2/reader.go
+++ b/internal/util/importutilv2/reader.go
@@ -67,7 +67,7 @@ func NewReader(ctx context.Context,
 	case JSON:
 		return json.NewReader(ctx, cm, schema, importFile.GetPaths()[0], bufferSize)
 	case Numpy:
-		return numpy.NewReader(ctx, schema, importFile.GetPaths(), cm, bufferSize)
+		return numpy.NewReader(ctx, cm, schema, importFile.GetPaths(), bufferSize)
 	case Parquet:
 		return parquet.NewReader(ctx, cm, schema, importFile.GetPaths()[0], bufferSize)
 	}
diff --git a/tests/integration/import/dynamic_field_test.go b/tests/integration/import/dynamic_field_test.go
new file mode 100644
index 0000000000..6af4c83146
--- /dev/null
+++ b/tests/integration/import/dynamic_field_test.go
@@ -0,0 +1,204 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package importv2
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"os"
+	"strings"
+	"time"
+
+	"github.com/golang/protobuf/proto"
+	"github.com/samber/lo"
+	"go.uber.org/zap"
+
+	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
+	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
+	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
+	"github.com/milvus-io/milvus/internal/proto/internalpb"
+	"github.com/milvus-io/milvus/internal/util/importutilv2"
+	"github.com/milvus-io/milvus/pkg/common"
+	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/util/funcutil"
+	"github.com/milvus-io/milvus/pkg/util/merr"
+	"github.com/milvus-io/milvus/pkg/util/metric"
+	"github.com/milvus-io/milvus/tests/integration"
+)
+
+func (s *BulkInsertSuite) testImportDynamicField() {
+	const (
+		rowCount = 10000
+	)
+
+	c := s.Cluster
+	ctx, cancel := context.WithTimeout(c.GetContext(), 60*time.Second)
+	defer cancel()
+
+	collectionName := "TestBulkInsert_B_" + funcutil.GenRandomStr()
+
+	schema := integration.ConstructSchema(collectionName, dim, true, &schemapb.FieldSchema{
+		FieldID:      100,
+		Name:         integration.Int64Field,
+		IsPrimaryKey: true,
+		DataType:     schemapb.DataType_Int64,
+		AutoID:       true,
+	}, &schemapb.FieldSchema{
+		FieldID:  101,
+		Name:     integration.FloatVecField,
+		DataType: schemapb.DataType_FloatVector,
+		TypeParams: []*commonpb.KeyValuePair{
+			{
+				Key:   common.DimKey,
+				Value: fmt.Sprintf("%d", dim),
+			},
+		},
+	})
+	schema.EnableDynamicField = true
+	marshaledSchema, err := proto.Marshal(schema)
+	s.NoError(err)
+
+	createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
+		DbName:         "",
+		CollectionName: collectionName,
+		Schema:         marshaledSchema,
+		ShardsNum:      common.DefaultShardsNum,
+	})
+	s.NoError(err)
+	s.Equal(int32(0), createCollectionStatus.GetCode())
+
+	// create index
+	createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
+		CollectionName: collectionName,
+		FieldName:      integration.FloatVecField,
+		IndexName:      "_default",
+		ExtraParams:    integration.ConstructIndexParam(dim, integration.IndexFaissIvfFlat, metric.L2),
+	})
+	s.NoError(err)
+	s.Equal(int32(0), createIndexStatus.GetCode())
+
+	s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField)
+
+	// import
+	var files []*internalpb.ImportFile
+	err = os.MkdirAll(c.ChunkManager.RootPath(), os.ModePerm)
+	s.NoError(err)
+
+	switch s.fileType {
+	case importutilv2.Numpy:
+		importFile, err := GenerateNumpyFiles(c.ChunkManager, schema, rowCount)
+		s.NoError(err)
+		importFile.Paths = lo.Filter(importFile.Paths, func(path string, _ int) bool {
+			return !strings.Contains(path, "$meta")
+		})
+		files = []*internalpb.ImportFile{importFile}
+	case importutilv2.JSON:
+		rowBasedFile := c.ChunkManager.RootPath() + "/" + "test.json"
+		GenerateJSONFile(s.T(), rowBasedFile, schema, rowCount)
+		defer os.Remove(rowBasedFile)
+		files = []*internalpb.ImportFile{
+			{
+				Paths: []string{
+					rowBasedFile,
+				},
+			},
+		}
+	case importutilv2.Parquet:
+		filePath := fmt.Sprintf("/tmp/test_%d.parquet", rand.Int())
+		schema.Fields = append(schema.Fields, &schemapb.FieldSchema{
+			FieldID:  102,
+			Name:     "$meta",
+			DataType: schemapb.DataType_JSON,
+		})
+		err = GenerateParquetFile(filePath, schema, rowCount)
+		s.NoError(err)
+		defer os.Remove(filePath)
+		files = []*internalpb.ImportFile{
+			{
+				Paths: []string{
+					filePath,
+				},
+			},
+		}
+	}
+
+	importResp, err := c.Proxy.ImportV2(ctx, &internalpb.ImportRequest{
+		CollectionName: collectionName,
+		Files:          files,
+	})
+	s.NoError(err)
+	s.Equal(int32(0), importResp.GetStatus().GetCode())
+	log.Info("Import result", zap.Any("importResp", importResp))
+
+	jobID := importResp.GetJobID()
+	err = WaitForImportDone(ctx, c, jobID)
+	s.NoError(err)
+
+	// load
+	loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
+		CollectionName: collectionName,
+	})
+	s.NoError(err)
+	s.Equal(commonpb.ErrorCode_Success, loadStatus.GetErrorCode())
+	s.WaitForLoad(ctx, collectionName)
+
+	segments, err := c.MetaWatcher.ShowSegments()
+	s.NoError(err)
+	s.NotEmpty(segments)
+	log.Info("Show segments", zap.Any("segments", segments))
+
+	// load refresh
+	loadStatus, err = c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
+		CollectionName: collectionName,
+		Refresh:        true,
+	})
+	s.NoError(err)
+	s.Equal(commonpb.ErrorCode_Success, loadStatus.GetErrorCode())
+	s.WaitForLoadRefresh(ctx, "", collectionName)
+
+	// search
+	expr := fmt.Sprintf("%s > 0", integration.Int64Field)
+	nq := 10
+	topk := 10
+	roundDecimal := -1
+
+	params := integration.GetSearchParams(integration.IndexFaissIvfFlat, metric.L2)
+	searchReq := integration.ConstructSearchRequest("", collectionName, expr,
+		integration.FloatVecField, schemapb.DataType_FloatVector, nil, metric.L2, params, nq, dim, topk, roundDecimal)
+
+	searchResult, err := c.Proxy.Search(ctx, searchReq)
+
+	err = merr.CheckRPCCall(searchResult, err)
+	s.NoError(err)
+	s.Equal(nq*topk, len(searchResult.GetResults().GetScores()))
+}
+
+func (s *BulkInsertSuite) TestImportDynamicField_JSON() {
+	s.fileType = importutilv2.JSON
+	s.testImportDynamicField()
+}
+
+func (s *BulkInsertSuite) TestImportDynamicField_Numpy() {
+	s.fileType = importutilv2.Numpy
+	s.testImportDynamicField()
+}
+
+func (s *BulkInsertSuite) TestImportDynamicField_Parquet() {
+	s.fileType = importutilv2.Parquet
+	s.testImportDynamicField()
+}