diff --git a/internal/core/src/indexbuilder/index_c.cpp b/internal/core/src/indexbuilder/index_c.cpp
index 28a629052c..ae319cc26d 100644
--- a/internal/core/src/indexbuilder/index_c.cpp
+++ b/internal/core/src/indexbuilder/index_c.cpp
@@ -84,29 +84,95 @@ CreateIndexV0(enum CDataType dtype,
     return status;
 }
 
+milvus::storage::StorageConfig
+get_storage_config(const milvus::proto::indexcgo::StorageConfig& config) {
+    auto storage_config = milvus::storage::StorageConfig();
+    storage_config.address = std::string(config.address());
+    storage_config.bucket_name = std::string(config.bucket_name());
+    storage_config.access_key_id = std::string(config.access_keyid());
+    storage_config.access_key_value = std::string(config.secret_access_key());
+    storage_config.root_path = std::string(config.root_path());
+    storage_config.storage_type = std::string(config.storage_type());
+    storage_config.cloud_provider = std::string(config.cloud_provider());
+    storage_config.iam_endpoint = std::string(config.iamendpoint());
+    storage_config.cloud_provider = std::string(config.cloud_provider());
+    storage_config.useSSL = config.usessl();
+    storage_config.sslCACert = config.sslcacert();
+    storage_config.useIAM = config.useiam();
+    storage_config.region = config.region();
+    storage_config.useVirtualHost = config.use_virtual_host();
+    storage_config.requestTimeoutMs = config.request_timeout_ms();
+    return storage_config;
+}
+
+milvus::OptFieldT
+get_opt_field(const ::google::protobuf::RepeatedPtrField<
+                  milvus::proto::indexcgo::OptionalFieldInfo>& field_infos) {
+    milvus::OptFieldT opt_fields_map;
+    for (const auto& field_info : field_infos) {
+        auto field_id = field_info.fieldid();
+        if (opt_fields_map.find(field_id) == opt_fields_map.end()) {
+            opt_fields_map[field_id] = {
+                field_info.field_name(),
+                static_cast<milvus::DataType>(field_info.field_type()),
+                {}};
+        }
+        for (const auto& str : field_info.data_paths()) {
+            std::get<2>(opt_fields_map[field_id]).emplace_back(str);
+        }
+    }
+
+    return opt_fields_map;
+}
+
+milvus::Config
+get_config(std::unique_ptr<milvus::proto::indexcgo::BuildIndexInfo>& info) {
+    milvus::Config config;
+    for (auto i = 0; i < info->index_params().size(); ++i) {
+        const auto& param = info->index_params(i);
+        config[param.key()] = param.value();
+    }
+
+    for (auto i = 0; i < info->type_params().size(); ++i) {
+        const auto& param = info->type_params(i);
+        config[param.key()] = param.value();
+    }
+
+    config["insert_files"] = info->insert_files();
+    if (info->opt_fields().size()) {
+        config["opt_fields"] = get_opt_field(info->opt_fields());
+    }
+
+    return config;
+}
+
 CStatus
-CreateIndex(CIndex* res_index, CBuildIndexInfo c_build_index_info) {
+CreateIndex(CIndex* res_index,
+            const uint8_t* serialized_build_index_info,
+            const uint64_t len) {
     try {
-        auto build_index_info = (BuildIndexInfo*)c_build_index_info;
-        auto field_type = build_index_info->field_type;
+        auto build_index_info =
+            std::make_unique<milvus::proto::indexcgo::BuildIndexInfo>();
+        auto res =
+            build_index_info->ParseFromArray(serialized_build_index_info, len);
+        AssertInfo(res, "Unmarshall build index info failed");
+
+        auto field_type =
+            static_cast<DataType>(build_index_info->field_schema().data_type());
 
         milvus::index::CreateIndexInfo index_info;
-        index_info.field_type = build_index_info->field_type;
-
-        auto& config = build_index_info->config;
-        config["insert_files"] = build_index_info->insert_files;
-        if (build_index_info->opt_fields.size()) {
-            config["opt_fields"] = build_index_info->opt_fields;
-        }
+        index_info.field_type = field_type;
+        auto storage_config =
+            get_storage_config(build_index_info->storage_config());
+        auto config = get_config(build_index_info);
 
         // get index type
         auto index_type = milvus::index::GetValueFromConfig<std::string>(
             config, "index_type");
         AssertInfo(index_type.has_value(), "index type is empty");
         index_info.index_type = index_type.value();
 
-        auto engine_version = build_index_info->index_engine_version;
-
+        auto engine_version = build_index_info->current_index_version();
         index_info.index_engine_version = engine_version;
         config[milvus::index::INDEX_ENGINE_VERSION] =
             std::to_string(engine_version);
@@ -121,24 +187,30 @@ CreateIndex(CIndex* res_index, CBuildIndexInfo c_build_index_info) {
 
         // init file manager
         milvus::storage::FieldDataMeta field_meta{
-            build_index_info->collection_id,
-            build_index_info->partition_id,
-            build_index_info->segment_id,
-            build_index_info->field_id};
+            build_index_info->collectionid(),
+            build_index_info->partitionid(),
+            build_index_info->segmentid(),
+            build_index_info->field_schema().fieldid()};
 
-        milvus::storage::IndexMeta index_meta{build_index_info->segment_id,
-                                              build_index_info->field_id,
-                                              build_index_info->index_build_id,
-                                              build_index_info->index_version};
-        auto chunk_manager = milvus::storage::CreateChunkManager(
-            build_index_info->storage_config);
+        milvus::storage::IndexMeta index_meta{
+            build_index_info->segmentid(),
+            build_index_info->field_schema().fieldid(),
+            build_index_info->buildid(),
+            build_index_info->index_version(),
+            "",
+            build_index_info->field_schema().name(),
+            field_type,
+            build_index_info->dim(),
+        };
+        auto chunk_manager =
+            milvus::storage::CreateChunkManager(storage_config);
 
         milvus::storage::FileManagerContext fileManagerContext(
             field_meta, index_meta, chunk_manager);
 
         auto index =
             milvus::indexbuilder::IndexFactory::GetInstance().CreateIndex(
-                build_index_info->field_type, config, fileManagerContext);
+                field_type, config, fileManagerContext);
         index->Build();
         *res_index = index.release();
         auto status = CStatus();
@@ -159,22 +231,32 @@ CreateIndex(CIndex* res_index, CBuildIndexInfo c_build_index_info) {
 }
 
 CStatus
-CreateIndexV2(CIndex* res_index, CBuildIndexInfo c_build_index_info) {
+CreateIndexV2(CIndex* res_index,
+              const uint8_t* serialized_build_index_info,
+              const uint64_t len) {
     try {
-        auto build_index_info = (BuildIndexInfo*)c_build_index_info;
-        auto field_type = build_index_info->field_type;
-        milvus::index::CreateIndexInfo index_info;
-        index_info.field_type = build_index_info->field_type;
-        index_info.dim = build_index_info->dim;
+        auto build_index_info =
+            std::make_unique<milvus::proto::indexcgo::BuildIndexInfo>();
+        auto res =
+            build_index_info->ParseFromArray(serialized_build_index_info, len);
+        AssertInfo(res, "Unmarshall build index info failed");
+        auto field_type =
+            static_cast<DataType>(build_index_info->field_schema().data_type());
 
-        auto& config = build_index_info->config;
+        milvus::index::CreateIndexInfo index_info;
+        index_info.field_type = field_type;
+        index_info.dim = build_index_info->dim();
+
+        auto storage_config =
+            get_storage_config(build_index_info->storage_config());
+        auto config = get_config(build_index_info);
 
         // get index type
         auto index_type = milvus::index::GetValueFromConfig<std::string>(
             config, "index_type");
         AssertInfo(index_type.has_value(), "index type is empty");
         index_info.index_type = index_type.value();
 
-        auto engine_version = build_index_info->index_engine_version;
+        auto engine_version = build_index_info->current_index_version();
         index_info.index_engine_version = engine_version;
         config[milvus::index::INDEX_ENGINE_VERSION] =
             std::to_string(engine_version);
@@ -188,39 +270,39 @@ CreateIndexV2(CIndex* res_index, CBuildIndexInfo c_build_index_info) {
         }
 
         milvus::storage::FieldDataMeta field_meta{
-            build_index_info->collection_id,
-            build_index_info->partition_id,
-            build_index_info->segment_id,
-            build_index_info->field_id};
+            build_index_info->collectionid(),
+            build_index_info->partitionid(),
+            build_index_info->segmentid(),
+            build_index_info->field_schema().fieldid()};
 
         milvus::storage::IndexMeta index_meta{
-            build_index_info->segment_id,
-            build_index_info->field_id,
-            build_index_info->index_build_id,
-            build_index_info->index_version,
-            build_index_info->field_name,
+            build_index_info->segmentid(),
+            build_index_info->field_schema().fieldid(),
+            build_index_info->buildid(),
+            build_index_info->index_version(),
             "",
-            build_index_info->field_type,
-            build_index_info->dim,
+            build_index_info->field_schema().name(),
+            field_type,
+            build_index_info->dim(),
         };
 
         auto store_space = milvus_storage::Space::Open(
-            build_index_info->data_store_path,
+            build_index_info->store_path(),
             milvus_storage::Options{nullptr,
-                                    build_index_info->data_store_version});
+                                    build_index_info->store_version()});
         AssertInfo(store_space.ok() && store_space.has_value(),
                    "create space failed: {}",
                    store_space.status().ToString());
 
         auto index_space = milvus_storage::Space::Open(
-            build_index_info->index_store_path,
+            build_index_info->index_store_path(),
             milvus_storage::Options{.schema = store_space.value()->schema()});
         AssertInfo(index_space.ok() && index_space.has_value(),
                    "create space failed: {}",
                    index_space.status().ToString());
         LOG_INFO("init space success");
-        auto chunk_manager = milvus::storage::CreateChunkManager(
-            build_index_info->storage_config);
+        auto chunk_manager =
+            milvus::storage::CreateChunkManager(storage_config);
 
         milvus::storage::FileManagerContext fileManagerContext(
             field_meta,
             index_meta,
@@ -229,9 +311,9 @@ CreateIndexV2(CIndex* res_index, CBuildIndexInfo c_build_index_info) {
 
         auto index =
             milvus::indexbuilder::IndexFactory::GetInstance().CreateIndex(
-                build_index_info->field_type,
-                build_index_info->field_name,
-                build_index_info->dim,
+                field_type,
+                build_index_info->field_schema().name(),
+                build_index_info->dim(),
                 config,
                 fileManagerContext,
                 std::move(store_space.value()));
diff --git a/internal/core/src/indexbuilder/index_c.h b/internal/core/src/indexbuilder/index_c.h
index 16cd76e453..53ce5552fe 100644
--- a/internal/core/src/indexbuilder/index_c.h
+++ b/internal/core/src/indexbuilder/index_c.h
@@ -28,7 +28,9 @@ CreateIndexV0(enum CDataType dtype,
               CIndex* res_index);
 
 CStatus
-CreateIndex(CIndex* res_index, CBuildIndexInfo c_build_index_info);
+CreateIndex(CIndex* res_index,
+            const uint8_t* serialized_build_index_info,
+            const uint64_t len);
 
 CStatus
 DeleteIndex(CIndex index);
@@ -130,7 +132,9 @@ CStatus
 SerializeIndexAndUpLoadV2(CIndex index, CBinarySet* c_binary_set);
 
 CStatus
-CreateIndexV2(CIndex* res_index, CBuildIndexInfo c_build_index_info);
+CreateIndexV2(CIndex* res_index,
+              const uint8_t* serialized_build_index_info,
+              const uint64_t len);
 
 CStatus
 AppendIndexStorageInfo(CBuildIndexInfo c_build_index_info,
diff --git a/internal/core/unittest/test_index_wrapper.cpp b/internal/core/unittest/test_index_wrapper.cpp
index 39f6841957..79581bc969 100644
--- a/internal/core/unittest/test_index_wrapper.cpp
+++ b/internal/core/unittest/test_index_wrapper.cpp
@@ -23,7 +23,7 @@
 
 using namespace milvus;
 using namespace milvus::segcore;
-using namespace milvus::proto::indexcgo;
+using namespace milvus::proto;
 
 using Param = std::pair;
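Note on the new C ABI above: `CreateIndex`/`CreateIndexV2` no longer take an opaque `CBuildIndexInfo` handle; the caller hands over a serialized `milvus.proto.indexcgo.BuildIndexInfo` buffer plus its length, and the C++ side rebuilds it with `ParseFromArray`. A minimal Go-side sketch of that contract follows (illustrative values only; the real call sites are the `indexcgowrapper` changes later in this diff):

```go
package main

import (
	"fmt"

	"github.com/golang/protobuf/proto"

	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/proto/indexcgopb"
)

func main() {
	// Only a handful of fields are populated here; the full message is
	// defined in internal/proto/index_cgo_msg.proto further down this patch.
	info := &indexcgopb.BuildIndexInfo{
		ClusterID:    "by-dev",
		BuildID:      1,
		CollectionID: 100,
		PartitionID:  101,
		SegmentID:    102,
		FieldSchema: &schemapb.FieldSchema{
			FieldID:  3,
			Name:     "vec",
			DataType: schemapb.DataType_FloatVector,
		},
	}

	// This blob (pointer to the first byte plus the length) is what the new
	// CreateIndex/CreateIndexV2 C functions parse with ParseFromArray.
	blob, err := proto.Marshal(info)
	if err != nil {
		panic(err)
	}
	fmt.Printf("serialized BuildIndexInfo: %d bytes\n", len(blob))
}
```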
diff --git a/internal/datacoord/index_builder.go b/internal/datacoord/index_builder.go
index c565545613..9a83f2384c 100644
--- a/internal/datacoord/index_builder.go
+++ b/internal/datacoord/index_builder.go
@@ -348,28 +348,29 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
         }
     }
     var req *indexpb.CreateJobRequest
+    collectionInfo, err := ib.handler.GetCollection(ib.ctx, segment.GetCollectionID())
+    if err != nil {
+        log.Ctx(ib.ctx).Info("index builder get collection info failed", zap.Int64("collectionID", segment.GetCollectionID()), zap.Error(err))
+        return false
+    }
+
+    schema := collectionInfo.Schema
+    var field *schemapb.FieldSchema
+
+    for _, f := range schema.Fields {
+        if f.FieldID == fieldID {
+            field = f
+            break
+        }
+    }
+
+    dim, err := storage.GetDimFromParams(field.TypeParams)
+    if err != nil {
+        log.Ctx(ib.ctx).Warn("failed to get dim from field type params",
+            zap.String("field type", field.GetDataType().String()), zap.Error(err))
+        // don't return, maybe field is scalar field or sparseFloatVector
+    }
     if Params.CommonCfg.EnableStorageV2.GetAsBool() {
-        collectionInfo, err := ib.handler.GetCollection(ib.ctx, segment.GetCollectionID())
-        if err != nil {
-            log.Info("index builder get collection info failed", zap.Int64("collectionID", segment.GetCollectionID()), zap.Error(err))
-            return false
-        }
-
-        schema := collectionInfo.Schema
-        var field *schemapb.FieldSchema
-
-        for _, f := range schema.Fields {
-            if f.FieldID == fieldID {
-                field = f
-                break
-            }
-        }
-
-        dim, err := storage.GetDimFromParams(field.TypeParams)
-        if err != nil {
-            return false
-        }
-
         storePath, err := itypeutil.GetStorageURI(params.Params.CommonCfg.StorageScheme.GetValue(), params.Params.CommonCfg.StoragePathPrefix.GetValue(), segment.GetID())
         if err != nil {
             log.Ctx(ib.ctx).Warn("failed to get storage uri", zap.Error(err))
@@ -403,6 +404,7 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
             CurrentIndexVersion:  ib.indexEngineVersionManager.GetCurrentIndexEngineVersion(),
             DataIds:              binlogIDs,
             OptionalScalarFields: optionalFields,
+            Field:                field,
         }
     } else {
         req = &indexpb.CreateJobRequest{
@@ -421,6 +423,8 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
             SegmentID:            segment.GetID(),
             FieldID:              fieldID,
             OptionalScalarFields: optionalFields,
+            Dim:                  int64(dim),
+            Field:                field,
         }
     }
diff --git a/internal/datacoord/index_builder_test.go b/internal/datacoord/index_builder_test.go
index 46d8c7fe3f..9488c70f5e 100644
--- a/internal/datacoord/index_builder_test.go
+++ b/internal/datacoord/index_builder_test.go
@@ -675,7 +675,30 @@ func TestIndexBuilder(t *testing.T) {
     chunkManager := &mocks.ChunkManager{}
     chunkManager.EXPECT().RootPath().Return("root")
 
-    ib := newIndexBuilder(ctx, mt, nodeManager, chunkManager, newIndexEngineVersionManager(), nil)
+    handler := NewNMockHandler(t)
+    handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{
+        ID: collID,
+        Schema: &schemapb.CollectionSchema{
+            Name: "coll",
+            Fields: []*schemapb.FieldSchema{
+                {
+                    FieldID:  fieldID,
+                    Name:     "vec",
+                    DataType: schemapb.DataType_FloatVector,
+                    TypeParams: []*commonpb.KeyValuePair{
+                        {
+                            Key:   "dim",
+                            Value: "128",
+                        },
+                    },
+                },
+            },
+            EnableDynamicField: false,
+            Properties:         nil,
+        },
+    }, nil)
+
+    ib := newIndexBuilder(ctx, mt, nodeManager, chunkManager, newIndexEngineVersionManager(), handler)
     assert.Equal(t, 6, len(ib.tasks))
     assert.Equal(t, indexTaskInit, ib.tasks[buildID])
 
@@ -741,6 +764,30 @@ func TestIndexBuilder_Error(t *testing.T) {
     chunkManager := &mocks.ChunkManager{}
     chunkManager.EXPECT().RootPath().Return("root")
 
+    handler := NewNMockHandler(t)
+    handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{
+        ID: collID,
+        Schema: &schemapb.CollectionSchema{
+            Name: "coll",
+            Fields: []*schemapb.FieldSchema{
+                {
+                    FieldID:  fieldID,
+                    Name:     "vec",
+                    DataType: schemapb.DataType_FloatVector,
+                    TypeParams: []*commonpb.KeyValuePair{
+                        {
+                            Key:   "dim",
+                            Value: "128",
+                        },
+                    },
+                },
+            },
+            EnableDynamicField: false,
+            Properties:         nil,
+        },
+    }, nil)
+
     ib := &indexBuilder{
         ctx: context.Background(),
         tasks: map[int64]indexTaskState{
@@ -749,6 +796,7 @@ func TestIndexBuilder_Error(t *testing.T) {
         meta:                      createMetaTable(ec),
         chunkManager:              chunkManager,
         indexEngineVersionManager: newIndexEngineVersionManager(),
+        handler:                   handler,
     }
 
     t.Run("meta not exist", func(t *testing.T) {
@@ -1414,9 +1462,32 @@ func TestVecIndexWithOptionalScalarField(t *testing.T) {
         mt.collections[collID].Schema.Fields[1].DataType = schemapb.DataType_VarChar
     }
 
+    handler := NewNMockHandler(t)
+    handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{
+        ID: collID,
+        Schema: &schemapb.CollectionSchema{
+            Name: "coll",
+            Fields: []*schemapb.FieldSchema{
+                {
+                    FieldID:  fieldID,
+                    Name:     "vec",
+                    DataType: schemapb.DataType_FloatVector,
+                    TypeParams: []*commonpb.KeyValuePair{
+                        {
+                            Key:   "dim",
+                            Value: "128",
+                        },
+                    },
+                },
+            },
+            EnableDynamicField: false,
+            Properties:         nil,
+        },
+    }, nil)
+
     paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true")
     defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false")
-    ib := newIndexBuilder(ctx, &mt, nodeManager, cm, newIndexEngineVersionManager(), nil)
+    ib := newIndexBuilder(ctx, &mt, nodeManager, cm, newIndexEngineVersionManager(), handler)
 
     t.Run("success to get opt field on startup", func(t *testing.T) {
         ic.EXPECT().CreateJob(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(
diff --git a/internal/indexnode/indexnode_service.go b/internal/indexnode/indexnode_service.go
index a690e35e4a..fb9d5a0cc1 100644
--- a/internal/indexnode/indexnode_service.go
+++ b/internal/indexnode/indexnode_service.go
@@ -55,6 +55,8 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest
     defer i.lifetime.Done()
     log.Info("IndexNode building index ...",
         zap.Int64("collectionID", req.GetCollectionID()),
+        zap.Int64("partitionID", req.GetPartitionID()),
+        zap.Int64("segmentID", req.GetSegmentID()),
         zap.Int64("indexID", req.GetIndexID()),
         zap.String("indexName", req.GetIndexName()),
         zap.String("indexFilePrefix", req.GetIndexFilePrefix()),
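With the datacoord change above, the builder now resolves the collection schema and the target field unconditionally (not only under storage v2) and tolerates fields that carry no `dim` type param. A sketch of that lookup in isolation, under an assumed helper name `resolveDim` that is not part of the patch:

```go
package datacoord

import (
	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"

	"github.com/milvus-io/milvus/internal/storage"
)

// resolveDim mirrors the dim lookup now done in indexBuilder.process: scalar
// fields and sparse float vectors have no "dim" type param, so the error is
// only logged upstream and the dimension simply stays zero.
func resolveDim(field *schemapb.FieldSchema) int64 {
	dim, err := storage.GetDimFromParams(field.GetTypeParams())
	if err != nil {
		return 0
	}
	return int64(dim)
}
```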
diff --git a/internal/indexnode/task.go b/internal/indexnode/task.go
index b14343900d..54c8b3fe45 100644
--- a/internal/indexnode/task.go
+++ b/internal/indexnode/task.go
@@ -18,7 +18,6 @@ package indexnode
 
 import (
     "context"
-    "encoding/json"
     "fmt"
     "runtime/debug"
     "strconv"
@@ -30,6 +29,7 @@ import (
     "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
     "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
+    "github.com/milvus-io/milvus/internal/proto/indexcgopb"
     "github.com/milvus-io/milvus/internal/proto/indexpb"
     "github.com/milvus-io/milvus/internal/storage"
     "github.com/milvus-io/milvus/internal/util/indexcgowrapper"
@@ -84,12 +84,21 @@ type indexBuildTaskV2 struct {
 }
 
 func (it *indexBuildTaskV2) parseParams(ctx context.Context) error {
-    it.collectionID = it.req.CollectionID
-    it.partitionID = it.req.PartitionID
-    it.segmentID = it.req.SegmentID
-    it.fieldType = it.req.FieldType
-    it.fieldID = it.req.FieldID
-    it.fieldName = it.req.FieldName
+    it.collectionID = it.req.GetCollectionID()
+    it.partitionID = it.req.GetPartitionID()
+    it.segmentID = it.req.GetSegmentID()
+    it.fieldType = it.req.GetFieldType()
+    if it.fieldType == schemapb.DataType_None {
+        it.fieldType = it.req.GetField().GetDataType()
+    }
+    it.fieldID = it.req.GetFieldID()
+    if it.fieldID == 0 {
+        it.fieldID = it.req.GetField().GetFieldID()
+    }
+    it.fieldName = it.req.GetFieldName()
+    if it.fieldName == "" {
+        it.fieldName = it.req.GetField().GetName()
+    }
     return nil
 }
 
@@ -138,61 +147,66 @@ func (it *indexBuildTaskV2) BuildIndex(ctx context.Context) error {
         }
     }
 
-    var buildIndexInfo *indexcgowrapper.BuildIndexInfo
-    buildIndexInfo, err = indexcgowrapper.NewBuildIndexInfo(it.req.GetStorageConfig())
-    defer indexcgowrapper.DeleteBuildIndexInfo(buildIndexInfo)
-    if err != nil {
-        log.Ctx(ctx).Warn("create build index info failed", zap.Error(err))
-        return err
-    }
-    err = buildIndexInfo.AppendFieldMetaInfoV2(it.collectionID, it.partitionID, it.segmentID, it.fieldID, it.fieldType, it.fieldName, it.req.Dim)
-    if err != nil {
-        log.Ctx(ctx).Warn("append field meta failed", zap.Error(err))
-        return err
-    }
-
-    err = buildIndexInfo.AppendIndexMetaInfo(it.req.IndexID, it.req.BuildID, it.req.IndexVersion)
-    if err != nil {
-        log.Ctx(ctx).Warn("append index meta failed", zap.Error(err))
-        return err
-    }
-
-    err = buildIndexInfo.AppendBuildIndexParam(it.newIndexParams)
-    if err != nil {
-        log.Ctx(ctx).Warn("append index params failed", zap.Error(err))
-        return err
-    }
-
-    err = buildIndexInfo.AppendIndexStorageInfo(it.req.StorePath, it.req.IndexStorePath, it.req.StoreVersion)
-    if err != nil {
-        log.Ctx(ctx).Warn("append storage info failed", zap.Error(err))
-        return err
-    }
-
-    jsonIndexParams, err := json.Marshal(it.newIndexParams)
-    if err != nil {
-        log.Ctx(ctx).Error("failed to json marshal index params", zap.Error(err))
-        return err
-    }
-
-    log.Ctx(ctx).Info("index params are ready",
-        zap.Int64("buildID", it.BuildID),
-        zap.String("index params", string(jsonIndexParams)))
-
-    err = buildIndexInfo.AppendBuildTypeParam(it.newTypeParams)
-    if err != nil {
-        log.Ctx(ctx).Warn("append type params failed", zap.Error(err))
-        return err
+    storageConfig := &indexcgopb.StorageConfig{
+        Address:          it.req.GetStorageConfig().GetAddress(),
+        AccessKeyID:      it.req.GetStorageConfig().GetAccessKeyID(),
+        SecretAccessKey:  it.req.GetStorageConfig().GetSecretAccessKey(),
+        UseSSL:           it.req.GetStorageConfig().GetUseSSL(),
+        BucketName:       it.req.GetStorageConfig().GetBucketName(),
+        RootPath:         it.req.GetStorageConfig().GetRootPath(),
+        UseIAM:           it.req.GetStorageConfig().GetUseIAM(),
+        IAMEndpoint:      it.req.GetStorageConfig().GetIAMEndpoint(),
+        StorageType:      it.req.GetStorageConfig().GetStorageType(),
+        UseVirtualHost:   it.req.GetStorageConfig().GetUseVirtualHost(),
+        Region:           it.req.GetStorageConfig().GetRegion(),
+        CloudProvider:    it.req.GetStorageConfig().GetCloudProvider(),
+        RequestTimeoutMs: it.req.GetStorageConfig().GetRequestTimeoutMs(),
+        SslCACert:        it.req.GetStorageConfig().GetSslCACert(),
     }
 
+    optFields := make([]*indexcgopb.OptionalFieldInfo, 0, len(it.req.GetOptionalScalarFields()))
     for _, optField := range it.req.GetOptionalScalarFields() {
-        if err := buildIndexInfo.AppendOptionalField(optField); err != nil {
-            log.Ctx(ctx).Warn("append optional field failed", zap.Error(err))
-            return err
+        optFields = append(optFields, &indexcgopb.OptionalFieldInfo{
+            FieldID:   optField.GetFieldID(),
+            FieldName: optField.GetFieldName(),
+            FieldType: optField.GetFieldType(),
+            DataPaths: optField.GetDataPaths(),
+        })
+    }
+
+    it.currentIndexVersion = getCurrentIndexVersion(it.req.GetCurrentIndexVersion())
+    field := it.req.GetField()
+    if field == nil || field.GetDataType() == schemapb.DataType_None {
+        field = &schemapb.FieldSchema{
+            FieldID:  it.fieldID,
+            Name:     it.fieldName,
+            DataType: it.fieldType,
         }
     }
 
-    it.index, err = indexcgowrapper.CreateIndexV2(ctx, buildIndexInfo)
+    buildIndexParams := &indexcgopb.BuildIndexInfo{
+        ClusterID:           it.ClusterID,
+        BuildID:             it.BuildID,
+        CollectionID:        it.collectionID,
+        PartitionID:         it.partitionID,
+        SegmentID:           it.segmentID,
+        IndexVersion:        it.req.GetIndexVersion(),
+        CurrentIndexVersion: it.currentIndexVersion,
+        NumRows:             it.req.GetNumRows(),
+        Dim:                 it.req.GetDim(),
+        IndexFilePrefix:     it.req.GetIndexFilePrefix(),
+        InsertFiles:         it.req.GetDataPaths(),
+        FieldSchema:         field,
+        StorageConfig:       storageConfig,
+        IndexParams:         mapToKVPairs(it.newIndexParams),
+        TypeParams:          mapToKVPairs(it.newTypeParams),
+        StorePath:           it.req.GetStorePath(),
+        StoreVersion:        it.req.GetStoreVersion(),
+        IndexStorePath:      it.req.GetIndexStorePath(),
+        OptFields:           optFields,
+    }
+
+    it.index, err = indexcgowrapper.CreateIndexV2(ctx, buildIndexParams)
     if err != nil {
         if it.index != nil && it.index.CleanLocalData() != nil {
             log.Ctx(ctx).Error("failed to clean cached data on disk after build index failed",
@@ -328,7 +342,7 @@ func (it *indexBuildTask) Prepare(ctx context.Context) error {
 
     if len(it.req.DataPaths) == 0 {
         for _, id := range it.req.GetDataIds() {
-            path := metautil.BuildInsertLogPath(it.req.GetStorageConfig().RootPath, it.req.GetCollectionID(), it.req.GetPartitionID(), it.req.GetSegmentID(), it.req.GetFieldID(), id)
+            path := metautil.BuildInsertLogPath(it.req.GetStorageConfig().RootPath, it.req.GetCollectionID(), it.req.GetPartitionID(), it.req.GetSegmentID(), it.req.GetField().GetFieldID(), id)
             it.req.DataPaths = append(it.req.DataPaths, path)
         }
     }
@@ -362,16 +376,10 @@ func (it *indexBuildTask) Prepare(ctx context.Context) error {
     }
     it.newTypeParams = typeParams
     it.newIndexParams = indexParams
+    it.statistic.IndexParams = it.req.GetIndexParams()
-    // ugly codes to get dimension
-    if dimStr, ok := typeParams[common.DimKey]; ok {
-        var err error
-        it.statistic.Dim, err = strconv.ParseInt(dimStr, 10, 64)
-        if err != nil {
-            log.Ctx(ctx).Error("parse dimesion failed", zap.Error(err))
-            // ignore error
-        }
-    }
+    it.statistic.Dim = it.req.GetDim()
+
     log.Ctx(ctx).Info("Successfully prepare indexBuildTask", zap.Int64("buildID", it.BuildID),
         zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID))
     return nil
@@ -482,69 +490,65 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error {
         }
     }
 
-    var buildIndexInfo *indexcgowrapper.BuildIndexInfo
-    buildIndexInfo, err = indexcgowrapper.NewBuildIndexInfo(it.req.GetStorageConfig())
-    defer indexcgowrapper.DeleteBuildIndexInfo(buildIndexInfo)
-    if err != nil {
-        log.Ctx(ctx).Warn("create build index info failed", zap.Error(err))
-        return err
-    }
-    err = buildIndexInfo.AppendFieldMetaInfo(it.collectionID, it.partitionID, it.segmentID, it.fieldID, it.fieldType)
-    if err != nil {
-        log.Ctx(ctx).Warn("append field meta failed", zap.Error(err))
-        return err
+    storageConfig := &indexcgopb.StorageConfig{
+        Address:          it.req.GetStorageConfig().GetAddress(),
+        AccessKeyID:      it.req.GetStorageConfig().GetAccessKeyID(),
+        SecretAccessKey:  it.req.GetStorageConfig().GetSecretAccessKey(),
+        UseSSL:           it.req.GetStorageConfig().GetUseSSL(),
+        BucketName:       it.req.GetStorageConfig().GetBucketName(),
+        RootPath:         it.req.GetStorageConfig().GetRootPath(),
+        UseIAM:           it.req.GetStorageConfig().GetUseIAM(),
+        IAMEndpoint:      it.req.GetStorageConfig().GetIAMEndpoint(),
+        StorageType:      it.req.GetStorageConfig().GetStorageType(),
+        UseVirtualHost:   it.req.GetStorageConfig().GetUseVirtualHost(),
+        Region:           it.req.GetStorageConfig().GetRegion(),
+        CloudProvider:    it.req.GetStorageConfig().GetCloudProvider(),
+        RequestTimeoutMs: it.req.GetStorageConfig().GetRequestTimeoutMs(),
+        SslCACert:        it.req.GetStorageConfig().GetSslCACert(),
     }
 
-    err = buildIndexInfo.AppendIndexMetaInfo(it.req.IndexID, it.req.BuildID, it.req.IndexVersion)
-    if err != nil {
-        log.Ctx(ctx).Warn("append index meta failed", zap.Error(err))
-        return err
-    }
-
-    err = buildIndexInfo.AppendBuildIndexParam(it.newIndexParams)
-    if err != nil {
-        log.Ctx(ctx).Warn("append index params failed", zap.Error(err))
-        return err
-    }
-
-    jsonIndexParams, err := json.Marshal(it.newIndexParams)
-    if err != nil {
-        log.Ctx(ctx).Error("failed to json marshal index params", zap.Error(err))
-        return err
-    }
-
-    log.Ctx(ctx).Info("index params are ready",
-        zap.Int64("buildID", it.BuildID),
-        zap.String("index params", string(jsonIndexParams)))
-
-    err = buildIndexInfo.AppendBuildTypeParam(it.newTypeParams)
-    if err != nil {
-        log.Ctx(ctx).Warn("append type params failed", zap.Error(err))
-        return err
-    }
-
-    for _, path := range it.req.GetDataPaths() {
-        err = buildIndexInfo.AppendInsertFile(path)
-        if err != nil {
-            log.Ctx(ctx).Warn("append insert binlog path failed", zap.Error(err))
-            return err
-        }
+    optFields := make([]*indexcgopb.OptionalFieldInfo, 0, len(it.req.GetOptionalScalarFields()))
+    for _, optField := range it.req.GetOptionalScalarFields() {
+        optFields = append(optFields, &indexcgopb.OptionalFieldInfo{
+            FieldID:   optField.GetFieldID(),
+            FieldName: optField.GetFieldName(),
+            FieldType: optField.GetFieldType(),
+            DataPaths: optField.GetDataPaths(),
+        })
     }
 
     it.currentIndexVersion = getCurrentIndexVersion(it.req.GetCurrentIndexVersion())
-    if err := buildIndexInfo.AppendIndexEngineVersion(it.currentIndexVersion); err != nil {
-        log.Ctx(ctx).Warn("append index engine version failed", zap.Error(err))
-        return err
-    }
-
-    for _, optField := range it.req.GetOptionalScalarFields() {
-        if err := buildIndexInfo.AppendOptionalField(optField); err != nil {
-            log.Ctx(ctx).Warn("append optional field failed", zap.Error(err))
-            return err
+    field := it.req.GetField()
+    if field == nil || field.GetDataType() == schemapb.DataType_None {
+        field = &schemapb.FieldSchema{
+            FieldID:  it.fieldID,
+            Name:     it.fieldName,
+            DataType: it.fieldType,
         }
     }
 
+    buildIndexParams := &indexcgopb.BuildIndexInfo{
+        ClusterID:           it.ClusterID,
+        BuildID:             it.BuildID,
+        CollectionID:        it.collectionID,
+        PartitionID:         it.partitionID,
+        SegmentID:           it.segmentID,
+        IndexVersion:        it.req.GetIndexVersion(),
+        CurrentIndexVersion: it.currentIndexVersion,
+        NumRows:             it.req.GetNumRows(),
+        Dim:                 it.req.GetDim(),
+        IndexFilePrefix:     it.req.GetIndexFilePrefix(),
+        InsertFiles:         it.req.GetDataPaths(),
+        FieldSchema:         field,
+        StorageConfig:       storageConfig,
+        IndexParams:         mapToKVPairs(it.newIndexParams),
+        TypeParams:          mapToKVPairs(it.newTypeParams),
+        StorePath:           it.req.GetStorePath(),
+        StoreVersion:        it.req.GetStoreVersion(),
+        IndexStorePath:      it.req.GetIndexStorePath(),
+        OptFields:           optFields,
+    }
-    it.index, err = indexcgowrapper.CreateIndex(ctx, buildIndexInfo)
+
+    it.index, err = indexcgowrapper.CreateIndex(ctx, buildIndexParams)
     if err != nil {
         if it.index != nil && it.index.CleanLocalData() != nil {
             log.Ctx(ctx).Error("failed to clean cached data on disk after build index failed",
@@ -653,8 +657,6 @@ func (it *indexBuildTask) decodeBlobs(ctx context.Context, blobs []*storage.Blob
     deserializeDur := it.tr.RecordSpan()
 
     log.Ctx(ctx).Info("IndexNode deserialize data success",
-        zap.Int64("index id", it.req.IndexID),
-        zap.String("index name", it.req.IndexName),
         zap.Int64("collectionID", it.collectionID),
         zap.Int64("partitionID", it.partitionID),
        zap.Int64("segmentID", it.segmentID),
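Both task variants above prefer the new `CreateJobRequest.Field` schema and only fall back to the legacy `FieldID`/`FieldName`/`FieldType` request fields when the job comes from an older coordinator. A sketch of that fallback as a standalone helper (hypothetical name, not part of the patch):

```go
package indexnode

import (
	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"

	"github.com/milvus-io/milvus/internal/proto/indexpb"
)

// fieldSchemaFromRequest prefers the new Field message and synthesizes a
// schema from the legacy scalar fields otherwise, matching the inline
// fallback in BuildIndex above.
func fieldSchemaFromRequest(req *indexpb.CreateJobRequest) *schemapb.FieldSchema {
	if field := req.GetField(); field != nil && field.GetDataType() != schemapb.DataType_None {
		return field
	}
	return &schemapb.FieldSchema{
		FieldID:  req.GetFieldID(),
		Name:     req.GetFieldName(),
		DataType: req.GetFieldType(),
	}
}
```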
diff --git a/internal/indexnode/task_test.go b/internal/indexnode/task_test.go
index dc30abd800..6450c3e504 100644
--- a/internal/indexnode/task_test.go
+++ b/internal/indexnode/task_test.go
@@ -283,12 +283,14 @@ func (suite *IndexBuildTaskV2Suite) TestBuildIndex() {
             RootPath:    "/tmp/milvus/data",
             StorageType: "local",
         },
-        CollectionID: 1,
-        PartitionID:  1,
-        SegmentID:    1,
-        FieldID:      3,
-        FieldName:    "vec",
-        FieldType:    schemapb.DataType_FloatVector,
+        CollectionID: 1,
+        PartitionID:  1,
+        SegmentID:    1,
+        Field: &schemapb.FieldSchema{
+            FieldID:  3,
+            Name:     "vec",
+            DataType: schemapb.DataType_FloatVector,
+        },
         StorePath:      "file://" + suite.space.Path(),
         StoreVersion:   suite.space.GetCurrentVersion(),
         IndexStorePath: "file://" + suite.space.Path(),
diff --git a/internal/indexnode/util.go b/internal/indexnode/util.go
index 9186f9855a..8aaa929105 100644
--- a/internal/indexnode/util.go
+++ b/internal/indexnode/util.go
@@ -19,6 +19,7 @@ package indexnode
 
 import (
     "github.com/cockroachdb/errors"
 
+    "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
     "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 )
@@ -36,3 +37,14 @@ func estimateFieldDataSize(dim int64, numRows int64, dataType schemapb.DataType)
         return 0, nil
     }
 }
+
+func mapToKVPairs(m map[string]string) []*commonpb.KeyValuePair {
+    kvs := make([]*commonpb.KeyValuePair, 0, len(m))
+    for k, v := range m {
+        kvs = append(kvs, &commonpb.KeyValuePair{
+            Key:   k,
+            Value: v,
+        })
+    }
+    return kvs
+}
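`mapToKVPairs` is how the flat index/type parameter maps become the repeated `common.KeyValuePair` fields of `BuildIndexInfo`. An illustrative use (not part of the patch; pair ordering is unspecified because Go map iteration is randomized):

```go
package indexnode

import "github.com/milvus-io/milvus/internal/proto/indexcgopb"

// buildParams shows the intended use of mapToKVPairs when assembling the
// serialized build request.
func buildParams(indexParams, typeParams map[string]string) *indexcgopb.BuildIndexInfo {
	return &indexcgopb.BuildIndexInfo{
		IndexParams: mapToKVPairs(indexParams),
		TypeParams:  mapToKVPairs(typeParams),
	}
}
```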
diff --git a/internal/indexnode/util_test.go b/internal/indexnode/util_test.go
new file mode 100644
index 0000000000..6d7d98e823
--- /dev/null
+++ b/internal/indexnode/util_test.go
@@ -0,0 +1,41 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package indexnode
+
+import (
+    "testing"
+
+    "github.com/stretchr/testify/suite"
+)
+
+type utilSuite struct {
+    suite.Suite
+}
+
+func (s *utilSuite) Test_mapToKVPairs() {
+    indexParams := map[string]string{
+        "index_type": "IVF_FLAT",
+        "dim":        "128",
+        "nlist":      "1024",
+    }
+
+    s.Equal(3, len(mapToKVPairs(indexParams)))
+}
+
+func Test_utilSuite(t *testing.T) {
+    suite.Run(t, new(utilSuite))
+}
diff --git a/internal/proto/index_cgo_msg.proto b/internal/proto/index_cgo_msg.proto
index 50b1ea5dde..688f871f55 100644
--- a/internal/proto/index_cgo_msg.proto
+++ b/internal/proto/index_cgo_msg.proto
@@ -4,6 +4,7 @@ package milvus.proto.indexcgo;
 option go_package="github.com/milvus-io/milvus/internal/proto/indexcgopb";
 
 import "common.proto";
+import "schema.proto";
 
 message TypeParams {
   repeated common.KeyValuePair params = 1;
@@ -30,3 +31,52 @@ message Binary {
 message BinarySet {
   repeated Binary datas = 1;
 }
+
+// Synchronously modify StorageConfig in index_coord.proto file
+message StorageConfig {
+  string address = 1;
+  string access_keyID = 2;
+  string secret_access_key = 3;
+  bool useSSL = 4;
+  string bucket_name = 5;
+  string root_path = 6;
+  bool useIAM = 7;
+  string IAMEndpoint = 8;
+  string storage_type = 9;
+  bool use_virtual_host = 10;
+  string region = 11;
+  string cloud_provider = 12;
+  int64 request_timeout_ms = 13;
+  string sslCACert = 14;
+}
+
+// Synchronously modify OptionalFieldInfo in index_coord.proto file
+message OptionalFieldInfo {
+  int64 fieldID = 1;
+  string field_name = 2;
+  int32 field_type = 3;
+  repeated string data_paths = 4;
+}
+
+message BuildIndexInfo {
+  string clusterID = 1;
+  int64 buildID = 2;
+  int64 collectionID = 3;
+  int64 partitionID = 4;
+  int64 segmentID = 5;
+  int64 index_version = 6;
+  int32 current_index_version = 7;
+  int64 num_rows = 8;
+  int64 dim = 9;
+  string index_file_prefix = 10;
+  repeated string insert_files = 11;
+//  repeated int64 data_ids = 12;
+  schema.FieldSchema field_schema = 12;
+  StorageConfig storage_config = 13;
+  repeated common.KeyValuePair index_params = 14;
+  repeated common.KeyValuePair type_params = 15;
+  string store_path = 16;
+  int64 store_version = 17;
+  string index_store_path = 18;
+  repeated OptionalFieldInfo opt_fields = 19;
+}
diff --git a/internal/proto/index_coord.proto b/internal/proto/index_coord.proto
index d59452b17d..9204d7da2a 100644
--- a/internal/proto/index_coord.proto
+++ b/internal/proto/index_coord.proto
@@ -8,6 +8,7 @@ import "common.proto";
 import "internal.proto";
 import "milvus.proto";
 import "schema.proto";
+import "index_cgo_msg.proto";
 
 service IndexCoord {
   rpc GetComponentStates(milvus.GetComponentStatesRequest) returns (milvus.ComponentStates) {}
@@ -226,6 +227,7 @@ message GetIndexBuildProgressResponse {
   int64 pending_index_rows = 4;
 }
 
+// Synchronously modify StorageConfig in index_cgo_msg.proto file
 message StorageConfig {
   string address = 1;
   string access_keyID = 2;
@@ -243,6 +245,7 @@ message StorageConfig {
   string sslCACert = 14;
 }
 
+// Synchronously modify OptionalFieldInfo in index_cgo_msg.proto file
 message OptionalFieldInfo {
   int64 fieldID = 1;
   string field_name = 2;
@@ -276,6 +279,7 @@ message CreateJobRequest {
   int64 dim = 22;
   repeated int64 data_ids = 23;
   repeated OptionalFieldInfo optional_scalar_fields = 24;
+  schema.FieldSchema field = 25;
 }
 
 message QueryJobsRequest {
diff --git a/internal/util/indexcgowrapper/index.go b/internal/util/indexcgowrapper/index.go
index f0850b3b91..a7cc7d0e9b 100644
--- a/internal/util/indexcgowrapper/index.go
+++ b/internal/util/indexcgowrapper/index.go
@@ -16,6 +16,7 @@ import (
     "unsafe"
 
     "github.com/golang/protobuf/proto"
+    "go.uber.org/zap"
 
     "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
     "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
@@ -94,9 +95,17 @@ func NewCgoIndex(dtype schemapb.DataType, typeParams, indexParams map[string]str
     return index, nil
 }
 
-func CreateIndex(ctx context.Context, buildIndexInfo *BuildIndexInfo) (CodecIndex, error) {
+func CreateIndex(ctx context.Context, buildIndexInfo *indexcgopb.BuildIndexInfo) (CodecIndex, error) {
+    buildIndexInfoBlob, err := proto.Marshal(buildIndexInfo)
+    if err != nil {
+        log.Ctx(ctx).Warn("marshal buildIndexInfo failed",
+            zap.String("clusterID", buildIndexInfo.GetClusterID()),
+            zap.Int64("buildID", buildIndexInfo.GetBuildID()),
+            zap.Error(err))
+        return nil, err
+    }
     var indexPtr C.CIndex
-    status := C.CreateIndex(&indexPtr, buildIndexInfo.cBuildIndexInfo)
+    status := C.CreateIndex(&indexPtr, (*C.uint8_t)(unsafe.Pointer(&buildIndexInfoBlob[0])), (C.uint64_t)(len(buildIndexInfoBlob)))
     if err := HandleCStatus(&status, "failed to create index"); err != nil {
         return nil, err
     }
@@ -109,9 +118,17 @@ func CreateIndex(ctx context.Context, buildIndexInfo *BuildIndexInfo) (CodecInde
     return index, nil
 }
 
-func CreateIndexV2(ctx context.Context, buildIndexInfo *BuildIndexInfo) (CodecIndex, error) {
+func CreateIndexV2(ctx context.Context, buildIndexInfo *indexcgopb.BuildIndexInfo) (CodecIndex, error) {
+    buildIndexInfoBlob, err := proto.Marshal(buildIndexInfo)
+    if err != nil {
+        log.Ctx(ctx).Warn("marshal buildIndexInfo failed",
+            zap.String("clusterID", buildIndexInfo.GetClusterID()),
+            zap.Int64("buildID", buildIndexInfo.GetBuildID()),
+            zap.Error(err))
+        return nil, err
+    }
     var indexPtr C.CIndex
-    status := C.CreateIndexV2(&indexPtr, buildIndexInfo.cBuildIndexInfo)
+    status := C.CreateIndexV2(&indexPtr, (*C.uint8_t)(unsafe.Pointer(&buildIndexInfoBlob[0])), (C.uint64_t)(len(buildIndexInfoBlob)))
     if err := HandleCStatus(&status, "failed to create index"); err != nil {
         return nil, err
     }
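The wrapper above passes the address of the first byte of the marshaled message across the cgo boundary. `&buildIndexInfoBlob[0]` panics on an empty slice, and an all-default `BuildIndexInfo` can marshal to zero bytes, so a caller-side guard along these lines may be worth considering (hypothetical helper, not part of the patch):

```go
package indexcgowrapper

import (
	"github.com/cockroachdb/errors"
	"github.com/golang/protobuf/proto"

	"github.com/milvus-io/milvus/internal/proto/indexcgopb"
)

// marshalBuildIndexInfo rejects an empty serialization so the cgo call never
// indexes into a zero-length slice.
func marshalBuildIndexInfo(info *indexcgopb.BuildIndexInfo) ([]byte, error) {
	blob, err := proto.Marshal(info)
	if err != nil {
		return nil, err
	}
	if len(blob) == 0 {
		return nil, errors.New("empty serialized BuildIndexInfo")
	}
	return blob, nil
}
```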