enhance: Use proto for passing info in cgo (#33184)

issue: #33183

---------

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
pull/33301/head
cai.zhang 2024-05-23 10:31:40 +08:00 committed by GitHub
parent 22bddde5ff
commit be77ceba84
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 500 additions and 209 deletions

View File

@ -84,29 +84,95 @@ CreateIndexV0(enum CDataType dtype,
return status;
}
CStatus
CreateIndex(CIndex* res_index, CBuildIndexInfo c_build_index_info) {
try {
auto build_index_info = (BuildIndexInfo*)c_build_index_info;
auto field_type = build_index_info->field_type;
milvus::storage::StorageConfig
get_storage_config(const milvus::proto::indexcgo::StorageConfig& config) {
auto storage_config = milvus::storage::StorageConfig();
storage_config.address = std::string(config.address());
storage_config.bucket_name = std::string(config.bucket_name());
storage_config.access_key_id = std::string(config.access_keyid());
storage_config.access_key_value = std::string(config.secret_access_key());
storage_config.root_path = std::string(config.root_path());
storage_config.storage_type = std::string(config.storage_type());
storage_config.cloud_provider = std::string(config.cloud_provider());
storage_config.iam_endpoint = std::string(config.iamendpoint());
storage_config.cloud_provider = std::string(config.cloud_provider());
storage_config.useSSL = config.usessl();
storage_config.sslCACert = config.sslcacert();
storage_config.useIAM = config.useiam();
storage_config.region = config.region();
storage_config.useVirtualHost = config.use_virtual_host();
storage_config.requestTimeoutMs = config.request_timeout_ms();
return storage_config;
}
milvus::index::CreateIndexInfo index_info;
index_info.field_type = build_index_info->field_type;
auto& config = build_index_info->config;
config["insert_files"] = build_index_info->insert_files;
if (build_index_info->opt_fields.size()) {
config["opt_fields"] = build_index_info->opt_fields;
milvus::OptFieldT
get_opt_field(const ::google::protobuf::RepeatedPtrField<
milvus::proto::indexcgo::OptionalFieldInfo>& field_infos) {
milvus::OptFieldT opt_fields_map;
for (const auto& field_info : field_infos) {
auto field_id = field_info.fieldid();
if (opt_fields_map.find(field_id) == opt_fields_map.end()) {
opt_fields_map[field_id] = {
field_info.field_name(),
static_cast<milvus::DataType>(field_info.field_type()),
{}};
}
for (const auto& str : field_info.data_paths()) {
std::get<2>(opt_fields_map[field_id]).emplace_back(str);
}
}
return opt_fields_map;
}
milvus::Config
get_config(std::unique_ptr<milvus::proto::indexcgo::BuildIndexInfo>& info) {
milvus::Config config;
for (auto i = 0; i < info->index_params().size(); ++i) {
const auto& param = info->index_params(i);
config[param.key()] = param.value();
}
for (auto i = 0; i < info->type_params().size(); ++i) {
const auto& param = info->type_params(i);
config[param.key()] = param.value();
}
config["insert_files"] = info->insert_files();
if (info->opt_fields().size()) {
config["opt_fields"] = get_opt_field(info->opt_fields());
}
return config;
}
CStatus
CreateIndex(CIndex* res_index,
const uint8_t* serialized_build_index_info,
const uint64_t len) {
try {
auto build_index_info =
std::make_unique<milvus::proto::indexcgo::BuildIndexInfo>();
auto res =
build_index_info->ParseFromArray(serialized_build_index_info, len);
AssertInfo(res, "Unmarshall build index info failed");
auto field_type =
static_cast<DataType>(build_index_info->field_schema().data_type());
milvus::index::CreateIndexInfo index_info;
index_info.field_type = field_type;
auto storage_config =
get_storage_config(build_index_info->storage_config());
auto config = get_config(build_index_info);
// get index type
auto index_type = milvus::index::GetValueFromConfig<std::string>(
config, "index_type");
AssertInfo(index_type.has_value(), "index type is empty");
index_info.index_type = index_type.value();
auto engine_version = build_index_info->index_engine_version;
auto engine_version = build_index_info->current_index_version();
index_info.index_engine_version = engine_version;
config[milvus::index::INDEX_ENGINE_VERSION] =
std::to_string(engine_version);
@ -121,24 +187,30 @@ CreateIndex(CIndex* res_index, CBuildIndexInfo c_build_index_info) {
// init file manager
milvus::storage::FieldDataMeta field_meta{
build_index_info->collection_id,
build_index_info->partition_id,
build_index_info->segment_id,
build_index_info->field_id};
build_index_info->collectionid(),
build_index_info->partitionid(),
build_index_info->segmentid(),
build_index_info->field_schema().fieldid()};
milvus::storage::IndexMeta index_meta{build_index_info->segment_id,
build_index_info->field_id,
build_index_info->index_build_id,
build_index_info->index_version};
auto chunk_manager = milvus::storage::CreateChunkManager(
build_index_info->storage_config);
milvus::storage::IndexMeta index_meta{
build_index_info->segmentid(),
build_index_info->field_schema().fieldid(),
build_index_info->buildid(),
build_index_info->index_version(),
"",
build_index_info->field_schema().name(),
field_type,
build_index_info->dim(),
};
auto chunk_manager =
milvus::storage::CreateChunkManager(storage_config);
milvus::storage::FileManagerContext fileManagerContext(
field_meta, index_meta, chunk_manager);
auto index =
milvus::indexbuilder::IndexFactory::GetInstance().CreateIndex(
build_index_info->field_type, config, fileManagerContext);
field_type, config, fileManagerContext);
index->Build();
*res_index = index.release();
auto status = CStatus();
@ -159,22 +231,32 @@ CreateIndex(CIndex* res_index, CBuildIndexInfo c_build_index_info) {
}
CStatus
CreateIndexV2(CIndex* res_index, CBuildIndexInfo c_build_index_info) {
CreateIndexV2(CIndex* res_index,
const uint8_t* serialized_build_index_info,
const uint64_t len) {
try {
auto build_index_info = (BuildIndexInfo*)c_build_index_info;
auto field_type = build_index_info->field_type;
milvus::index::CreateIndexInfo index_info;
index_info.field_type = build_index_info->field_type;
index_info.dim = build_index_info->dim;
auto build_index_info =
std::make_unique<milvus::proto::indexcgo::BuildIndexInfo>();
auto res =
build_index_info->ParseFromArray(serialized_build_index_info, len);
AssertInfo(res, "Unmarshall build index info failed");
auto field_type =
static_cast<DataType>(build_index_info->field_schema().data_type());
auto& config = build_index_info->config;
milvus::index::CreateIndexInfo index_info;
index_info.field_type = field_type;
index_info.dim = build_index_info->dim();
auto storage_config =
get_storage_config(build_index_info->storage_config());
auto config = get_config(build_index_info);
// get index type
auto index_type = milvus::index::GetValueFromConfig<std::string>(
config, "index_type");
AssertInfo(index_type.has_value(), "index type is empty");
index_info.index_type = index_type.value();
auto engine_version = build_index_info->index_engine_version;
auto engine_version = build_index_info->current_index_version();
index_info.index_engine_version = engine_version;
config[milvus::index::INDEX_ENGINE_VERSION] =
std::to_string(engine_version);
@ -188,39 +270,39 @@ CreateIndexV2(CIndex* res_index, CBuildIndexInfo c_build_index_info) {
}
milvus::storage::FieldDataMeta field_meta{
build_index_info->collection_id,
build_index_info->partition_id,
build_index_info->segment_id,
build_index_info->field_id};
build_index_info->collectionid(),
build_index_info->partitionid(),
build_index_info->segmentid(),
build_index_info->field_schema().fieldid()};
milvus::storage::IndexMeta index_meta{
build_index_info->segment_id,
build_index_info->field_id,
build_index_info->index_build_id,
build_index_info->index_version,
build_index_info->field_name,
build_index_info->segmentid(),
build_index_info->field_schema().fieldid(),
build_index_info->buildid(),
build_index_info->index_version(),
"",
build_index_info->field_type,
build_index_info->dim,
build_index_info->field_schema().name(),
field_type,
build_index_info->dim(),
};
auto store_space = milvus_storage::Space::Open(
build_index_info->data_store_path,
build_index_info->store_path(),
milvus_storage::Options{nullptr,
build_index_info->data_store_version});
build_index_info->store_version()});
AssertInfo(store_space.ok() && store_space.has_value(),
"create space failed: {}",
store_space.status().ToString());
auto index_space = milvus_storage::Space::Open(
build_index_info->index_store_path,
build_index_info->index_store_path(),
milvus_storage::Options{.schema = store_space.value()->schema()});
AssertInfo(index_space.ok() && index_space.has_value(),
"create space failed: {}",
index_space.status().ToString());
LOG_INFO("init space success");
auto chunk_manager = milvus::storage::CreateChunkManager(
build_index_info->storage_config);
auto chunk_manager =
milvus::storage::CreateChunkManager(storage_config);
milvus::storage::FileManagerContext fileManagerContext(
field_meta,
index_meta,
@ -229,9 +311,9 @@ CreateIndexV2(CIndex* res_index, CBuildIndexInfo c_build_index_info) {
auto index =
milvus::indexbuilder::IndexFactory::GetInstance().CreateIndex(
build_index_info->field_type,
build_index_info->field_name,
build_index_info->dim,
field_type,
build_index_info->field_schema().name(),
build_index_info->dim(),
config,
fileManagerContext,
std::move(store_space.value()));

View File

@ -28,7 +28,9 @@ CreateIndexV0(enum CDataType dtype,
CIndex* res_index);
CStatus
CreateIndex(CIndex* res_index, CBuildIndexInfo c_build_index_info);
CreateIndex(CIndex* res_index,
const uint8_t* serialized_build_index_info,
const uint64_t len);
CStatus
DeleteIndex(CIndex index);
@ -130,7 +132,9 @@ CStatus
SerializeIndexAndUpLoadV2(CIndex index, CBinarySet* c_binary_set);
CStatus
CreateIndexV2(CIndex* res_index, CBuildIndexInfo c_build_index_info);
CreateIndexV2(CIndex* res_index,
const uint8_t* serialized_build_index_info,
const uint64_t len);
CStatus
AppendIndexStorageInfo(CBuildIndexInfo c_build_index_info,

View File

@ -23,7 +23,7 @@
using namespace milvus;
using namespace milvus::segcore;
using namespace milvus::proto::indexcgo;
using namespace milvus::proto;
using Param = std::pair<knowhere::IndexType, knowhere::MetricType>;

View File

@ -348,10 +348,9 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
}
}
var req *indexpb.CreateJobRequest
if Params.CommonCfg.EnableStorageV2.GetAsBool() {
collectionInfo, err := ib.handler.GetCollection(ib.ctx, segment.GetCollectionID())
if err != nil {
log.Info("index builder get collection info failed", zap.Int64("collectionID", segment.GetCollectionID()), zap.Error(err))
log.Ctx(ib.ctx).Info("index builder get collection info failed", zap.Int64("collectionID", segment.GetCollectionID()), zap.Error(err))
return false
}
@ -367,9 +366,11 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
dim, err := storage.GetDimFromParams(field.TypeParams)
if err != nil {
return false
log.Ctx(ib.ctx).Warn("failed to get dim from field type params",
zap.String("field type", field.GetDataType().String()), zap.Error(err))
// don't return, maybe field is scalar field or sparseFloatVector
}
if Params.CommonCfg.EnableStorageV2.GetAsBool() {
storePath, err := itypeutil.GetStorageURI(params.Params.CommonCfg.StorageScheme.GetValue(), params.Params.CommonCfg.StoragePathPrefix.GetValue(), segment.GetID())
if err != nil {
log.Ctx(ib.ctx).Warn("failed to get storage uri", zap.Error(err))
@ -403,6 +404,7 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
CurrentIndexVersion: ib.indexEngineVersionManager.GetCurrentIndexEngineVersion(),
DataIds: binlogIDs,
OptionalScalarFields: optionalFields,
Field: field,
}
} else {
req = &indexpb.CreateJobRequest{
@ -421,6 +423,8 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
SegmentID: segment.GetID(),
FieldID: fieldID,
OptionalScalarFields: optionalFields,
Dim: int64(dim),
Field: field,
}
}

View File

@ -675,7 +675,30 @@ func TestIndexBuilder(t *testing.T) {
chunkManager := &mocks.ChunkManager{}
chunkManager.EXPECT().RootPath().Return("root")
ib := newIndexBuilder(ctx, mt, nodeManager, chunkManager, newIndexEngineVersionManager(), nil)
handler := NewNMockHandler(t)
handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{
ID: collID,
Schema: &schemapb.CollectionSchema{
Name: "coll",
Fields: []*schemapb.FieldSchema{
{
FieldID: fieldID,
Name: "vec",
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "128",
},
},
},
},
EnableDynamicField: false,
Properties: nil,
},
}, nil)
ib := newIndexBuilder(ctx, mt, nodeManager, chunkManager, newIndexEngineVersionManager(), handler)
assert.Equal(t, 6, len(ib.tasks))
assert.Equal(t, indexTaskInit, ib.tasks[buildID])
@ -741,6 +764,30 @@ func TestIndexBuilder_Error(t *testing.T) {
chunkManager := &mocks.ChunkManager{}
chunkManager.EXPECT().RootPath().Return("root")
handler := NewNMockHandler(t)
handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{
ID: collID,
Schema: &schemapb.CollectionSchema{
Name: "coll",
Fields: []*schemapb.FieldSchema{
{
FieldID: fieldID,
Name: "vec",
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "128",
},
},
},
},
EnableDynamicField: false,
Properties: nil,
},
}, nil)
ib := &indexBuilder{
ctx: context.Background(),
tasks: map[int64]indexTaskState{
@ -749,6 +796,7 @@ func TestIndexBuilder_Error(t *testing.T) {
meta: createMetaTable(ec),
chunkManager: chunkManager,
indexEngineVersionManager: newIndexEngineVersionManager(),
handler: handler,
}
t.Run("meta not exist", func(t *testing.T) {
@ -1414,9 +1462,32 @@ func TestVecIndexWithOptionalScalarField(t *testing.T) {
mt.collections[collID].Schema.Fields[1].DataType = schemapb.DataType_VarChar
}
handler := NewNMockHandler(t)
handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{
ID: collID,
Schema: &schemapb.CollectionSchema{
Name: "coll",
Fields: []*schemapb.FieldSchema{
{
FieldID: fieldID,
Name: "vec",
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "128",
},
},
},
},
EnableDynamicField: false,
Properties: nil,
},
}, nil)
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true")
defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false")
ib := newIndexBuilder(ctx, &mt, nodeManager, cm, newIndexEngineVersionManager(), nil)
ib := newIndexBuilder(ctx, &mt, nodeManager, cm, newIndexEngineVersionManager(), handler)
t.Run("success to get opt field on startup", func(t *testing.T) {
ic.EXPECT().CreateJob(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(

View File

@ -55,6 +55,8 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest
defer i.lifetime.Done()
log.Info("IndexNode building index ...",
zap.Int64("collectionID", req.GetCollectionID()),
zap.Int64("partitionID", req.GetPartitionID()),
zap.Int64("segmentID", req.GetSegmentID()),
zap.Int64("indexID", req.GetIndexID()),
zap.String("indexName", req.GetIndexName()),
zap.String("indexFilePrefix", req.GetIndexFilePrefix()),

View File

@ -18,7 +18,6 @@ package indexnode
import (
"context"
"encoding/json"
"fmt"
"runtime/debug"
"strconv"
@ -30,6 +29,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/indexcgopb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/indexcgowrapper"
@ -84,12 +84,21 @@ type indexBuildTaskV2 struct {
}
func (it *indexBuildTaskV2) parseParams(ctx context.Context) error {
it.collectionID = it.req.CollectionID
it.partitionID = it.req.PartitionID
it.segmentID = it.req.SegmentID
it.fieldType = it.req.FieldType
it.fieldID = it.req.FieldID
it.fieldName = it.req.FieldName
it.collectionID = it.req.GetCollectionID()
it.partitionID = it.req.GetPartitionID()
it.segmentID = it.req.GetSegmentID()
it.fieldType = it.req.GetFieldType()
if it.fieldType == schemapb.DataType_None {
it.fieldType = it.req.GetField().GetDataType()
}
it.fieldID = it.req.GetFieldID()
if it.fieldID == 0 {
it.fieldID = it.req.GetField().GetFieldID()
}
it.fieldName = it.req.GetFieldName()
if it.fieldName == "" {
it.fieldName = it.req.GetField().GetName()
}
return nil
}
@ -138,61 +147,66 @@ func (it *indexBuildTaskV2) BuildIndex(ctx context.Context) error {
}
}
var buildIndexInfo *indexcgowrapper.BuildIndexInfo
buildIndexInfo, err = indexcgowrapper.NewBuildIndexInfo(it.req.GetStorageConfig())
defer indexcgowrapper.DeleteBuildIndexInfo(buildIndexInfo)
if err != nil {
log.Ctx(ctx).Warn("create build index info failed", zap.Error(err))
return err
}
err = buildIndexInfo.AppendFieldMetaInfoV2(it.collectionID, it.partitionID, it.segmentID, it.fieldID, it.fieldType, it.fieldName, it.req.Dim)
if err != nil {
log.Ctx(ctx).Warn("append field meta failed", zap.Error(err))
return err
}
err = buildIndexInfo.AppendIndexMetaInfo(it.req.IndexID, it.req.BuildID, it.req.IndexVersion)
if err != nil {
log.Ctx(ctx).Warn("append index meta failed", zap.Error(err))
return err
}
err = buildIndexInfo.AppendBuildIndexParam(it.newIndexParams)
if err != nil {
log.Ctx(ctx).Warn("append index params failed", zap.Error(err))
return err
}
err = buildIndexInfo.AppendIndexStorageInfo(it.req.StorePath, it.req.IndexStorePath, it.req.StoreVersion)
if err != nil {
log.Ctx(ctx).Warn("append storage info failed", zap.Error(err))
return err
}
jsonIndexParams, err := json.Marshal(it.newIndexParams)
if err != nil {
log.Ctx(ctx).Error("failed to json marshal index params", zap.Error(err))
return err
}
log.Ctx(ctx).Info("index params are ready",
zap.Int64("buildID", it.BuildID),
zap.String("index params", string(jsonIndexParams)))
err = buildIndexInfo.AppendBuildTypeParam(it.newTypeParams)
if err != nil {
log.Ctx(ctx).Warn("append type params failed", zap.Error(err))
return err
storageConfig := &indexcgopb.StorageConfig{
Address: it.req.GetStorageConfig().GetAddress(),
AccessKeyID: it.req.GetStorageConfig().GetAccessKeyID(),
SecretAccessKey: it.req.GetStorageConfig().GetSecretAccessKey(),
UseSSL: it.req.GetStorageConfig().GetUseSSL(),
BucketName: it.req.GetStorageConfig().GetBucketName(),
RootPath: it.req.GetStorageConfig().GetRootPath(),
UseIAM: it.req.GetStorageConfig().GetUseIAM(),
IAMEndpoint: it.req.GetStorageConfig().GetIAMEndpoint(),
StorageType: it.req.GetStorageConfig().GetStorageType(),
UseVirtualHost: it.req.GetStorageConfig().GetUseVirtualHost(),
Region: it.req.GetStorageConfig().GetRegion(),
CloudProvider: it.req.GetStorageConfig().GetCloudProvider(),
RequestTimeoutMs: it.req.GetStorageConfig().GetRequestTimeoutMs(),
SslCACert: it.req.GetStorageConfig().GetSslCACert(),
}
optFields := make([]*indexcgopb.OptionalFieldInfo, 0, len(it.req.GetOptionalScalarFields()))
for _, optField := range it.req.GetOptionalScalarFields() {
if err := buildIndexInfo.AppendOptionalField(optField); err != nil {
log.Ctx(ctx).Warn("append optional field failed", zap.Error(err))
return err
optFields = append(optFields, &indexcgopb.OptionalFieldInfo{
FieldID: optField.GetFieldID(),
FieldName: optField.GetFieldName(),
FieldType: optField.GetFieldType(),
DataPaths: optField.GetDataPaths(),
})
}
it.currentIndexVersion = getCurrentIndexVersion(it.req.GetCurrentIndexVersion())
field := it.req.GetField()
if field == nil || field.GetDataType() == schemapb.DataType_None {
field = &schemapb.FieldSchema{
FieldID: it.fieldID,
Name: it.fieldName,
DataType: it.fieldType,
}
}
it.index, err = indexcgowrapper.CreateIndexV2(ctx, buildIndexInfo)
buildIndexParams := &indexcgopb.BuildIndexInfo{
ClusterID: it.ClusterID,
BuildID: it.BuildID,
CollectionID: it.collectionID,
PartitionID: it.partitionID,
SegmentID: it.segmentID,
IndexVersion: it.req.GetIndexVersion(),
CurrentIndexVersion: it.currentIndexVersion,
NumRows: it.req.GetNumRows(),
Dim: it.req.GetDim(),
IndexFilePrefix: it.req.GetIndexFilePrefix(),
InsertFiles: it.req.GetDataPaths(),
FieldSchema: field,
StorageConfig: storageConfig,
IndexParams: mapToKVPairs(it.newIndexParams),
TypeParams: mapToKVPairs(it.newTypeParams),
StorePath: it.req.GetStorePath(),
StoreVersion: it.req.GetStoreVersion(),
IndexStorePath: it.req.GetIndexStorePath(),
OptFields: optFields,
}
it.index, err = indexcgowrapper.CreateIndexV2(ctx, buildIndexParams)
if err != nil {
if it.index != nil && it.index.CleanLocalData() != nil {
log.Ctx(ctx).Error("failed to clean cached data on disk after build index failed",
@ -328,7 +342,7 @@ func (it *indexBuildTask) Prepare(ctx context.Context) error {
if len(it.req.DataPaths) == 0 {
for _, id := range it.req.GetDataIds() {
path := metautil.BuildInsertLogPath(it.req.GetStorageConfig().RootPath, it.req.GetCollectionID(), it.req.GetPartitionID(), it.req.GetSegmentID(), it.req.GetFieldID(), id)
path := metautil.BuildInsertLogPath(it.req.GetStorageConfig().RootPath, it.req.GetCollectionID(), it.req.GetPartitionID(), it.req.GetSegmentID(), it.req.GetField().GetFieldID(), id)
it.req.DataPaths = append(it.req.DataPaths, path)
}
}
@ -362,16 +376,10 @@ func (it *indexBuildTask) Prepare(ctx context.Context) error {
}
it.newTypeParams = typeParams
it.newIndexParams = indexParams
it.statistic.IndexParams = it.req.GetIndexParams()
// ugly codes to get dimension
if dimStr, ok := typeParams[common.DimKey]; ok {
var err error
it.statistic.Dim, err = strconv.ParseInt(dimStr, 10, 64)
if err != nil {
log.Ctx(ctx).Error("parse dimesion failed", zap.Error(err))
// ignore error
}
}
it.statistic.Dim = it.req.GetDim()
log.Ctx(ctx).Info("Successfully prepare indexBuildTask", zap.Int64("buildID", it.BuildID),
zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID))
return nil
@ -482,69 +490,65 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error {
}
}
var buildIndexInfo *indexcgowrapper.BuildIndexInfo
buildIndexInfo, err = indexcgowrapper.NewBuildIndexInfo(it.req.GetStorageConfig())
defer indexcgowrapper.DeleteBuildIndexInfo(buildIndexInfo)
if err != nil {
log.Ctx(ctx).Warn("create build index info failed", zap.Error(err))
return err
}
err = buildIndexInfo.AppendFieldMetaInfo(it.collectionID, it.partitionID, it.segmentID, it.fieldID, it.fieldType)
if err != nil {
log.Ctx(ctx).Warn("append field meta failed", zap.Error(err))
return err
storageConfig := &indexcgopb.StorageConfig{
Address: it.req.GetStorageConfig().GetAddress(),
AccessKeyID: it.req.GetStorageConfig().GetAccessKeyID(),
SecretAccessKey: it.req.GetStorageConfig().GetSecretAccessKey(),
UseSSL: it.req.GetStorageConfig().GetUseSSL(),
BucketName: it.req.GetStorageConfig().GetBucketName(),
RootPath: it.req.GetStorageConfig().GetRootPath(),
UseIAM: it.req.GetStorageConfig().GetUseIAM(),
IAMEndpoint: it.req.GetStorageConfig().GetIAMEndpoint(),
StorageType: it.req.GetStorageConfig().GetStorageType(),
UseVirtualHost: it.req.GetStorageConfig().GetUseVirtualHost(),
Region: it.req.GetStorageConfig().GetRegion(),
CloudProvider: it.req.GetStorageConfig().GetCloudProvider(),
RequestTimeoutMs: it.req.GetStorageConfig().GetRequestTimeoutMs(),
SslCACert: it.req.GetStorageConfig().GetSslCACert(),
}
err = buildIndexInfo.AppendIndexMetaInfo(it.req.IndexID, it.req.BuildID, it.req.IndexVersion)
if err != nil {
log.Ctx(ctx).Warn("append index meta failed", zap.Error(err))
return err
}
err = buildIndexInfo.AppendBuildIndexParam(it.newIndexParams)
if err != nil {
log.Ctx(ctx).Warn("append index params failed", zap.Error(err))
return err
}
jsonIndexParams, err := json.Marshal(it.newIndexParams)
if err != nil {
log.Ctx(ctx).Error("failed to json marshal index params", zap.Error(err))
return err
}
log.Ctx(ctx).Info("index params are ready",
zap.Int64("buildID", it.BuildID),
zap.String("index params", string(jsonIndexParams)))
err = buildIndexInfo.AppendBuildTypeParam(it.newTypeParams)
if err != nil {
log.Ctx(ctx).Warn("append type params failed", zap.Error(err))
return err
}
for _, path := range it.req.GetDataPaths() {
err = buildIndexInfo.AppendInsertFile(path)
if err != nil {
log.Ctx(ctx).Warn("append insert binlog path failed", zap.Error(err))
return err
}
optFields := make([]*indexcgopb.OptionalFieldInfo, 0, len(it.req.GetOptionalScalarFields()))
for _, optField := range it.req.GetOptionalScalarFields() {
optFields = append(optFields, &indexcgopb.OptionalFieldInfo{
FieldID: optField.GetFieldID(),
FieldName: optField.GetFieldName(),
FieldType: optField.GetFieldType(),
DataPaths: optField.GetDataPaths(),
})
}
it.currentIndexVersion = getCurrentIndexVersion(it.req.GetCurrentIndexVersion())
if err := buildIndexInfo.AppendIndexEngineVersion(it.currentIndexVersion); err != nil {
log.Ctx(ctx).Warn("append index engine version failed", zap.Error(err))
return err
field := it.req.GetField()
if field == nil || field.GetDataType() == schemapb.DataType_None {
field = &schemapb.FieldSchema{
FieldID: it.fieldID,
Name: it.fieldName,
DataType: it.fieldType,
}
}
buildIndexParams := &indexcgopb.BuildIndexInfo{
ClusterID: it.ClusterID,
BuildID: it.BuildID,
CollectionID: it.collectionID,
PartitionID: it.partitionID,
SegmentID: it.segmentID,
IndexVersion: it.req.GetIndexVersion(),
CurrentIndexVersion: it.currentIndexVersion,
NumRows: it.req.GetNumRows(),
Dim: it.req.GetDim(),
IndexFilePrefix: it.req.GetIndexFilePrefix(),
InsertFiles: it.req.GetDataPaths(),
FieldSchema: field,
StorageConfig: storageConfig,
IndexParams: mapToKVPairs(it.newIndexParams),
TypeParams: mapToKVPairs(it.newTypeParams),
StorePath: it.req.GetStorePath(),
StoreVersion: it.req.GetStoreVersion(),
IndexStorePath: it.req.GetIndexStorePath(),
OptFields: optFields,
}
for _, optField := range it.req.GetOptionalScalarFields() {
if err := buildIndexInfo.AppendOptionalField(optField); err != nil {
log.Ctx(ctx).Warn("append optional field failed", zap.Error(err))
return err
}
}
it.index, err = indexcgowrapper.CreateIndex(ctx, buildIndexInfo)
it.index, err = indexcgowrapper.CreateIndex(ctx, buildIndexParams)
if err != nil {
if it.index != nil && it.index.CleanLocalData() != nil {
log.Ctx(ctx).Error("failed to clean cached data on disk after build index failed",
@ -653,8 +657,6 @@ func (it *indexBuildTask) decodeBlobs(ctx context.Context, blobs []*storage.Blob
deserializeDur := it.tr.RecordSpan()
log.Ctx(ctx).Info("IndexNode deserialize data success",
zap.Int64("index id", it.req.IndexID),
zap.String("index name", it.req.IndexName),
zap.Int64("collectionID", it.collectionID),
zap.Int64("partitionID", it.partitionID),
zap.Int64("segmentID", it.segmentID),

View File

@ -286,9 +286,11 @@ func (suite *IndexBuildTaskV2Suite) TestBuildIndex() {
CollectionID: 1,
PartitionID: 1,
SegmentID: 1,
Field: &schemapb.FieldSchema{
FieldID: 3,
FieldName: "vec",
FieldType: schemapb.DataType_FloatVector,
Name: "vec",
DataType: schemapb.DataType_FloatVector,
},
StorePath: "file://" + suite.space.Path(),
StoreVersion: suite.space.GetCurrentVersion(),
IndexStorePath: "file://" + suite.space.Path(),

View File

@ -19,6 +19,7 @@ package indexnode
import (
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
)
@ -36,3 +37,14 @@ func estimateFieldDataSize(dim int64, numRows int64, dataType schemapb.DataType)
return 0, nil
}
}
func mapToKVPairs(m map[string]string) []*commonpb.KeyValuePair {
kvs := make([]*commonpb.KeyValuePair, 0, len(m))
for k, v := range m {
kvs = append(kvs, &commonpb.KeyValuePair{
Key: k,
Value: v,
})
}
return kvs
}

View File

@ -0,0 +1,41 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package indexnode
import (
"testing"
"github.com/stretchr/testify/suite"
)
type utilSuite struct {
suite.Suite
}
func (s *utilSuite) Test_mapToKVPairs() {
indexParams := map[string]string{
"index_type": "IVF_FLAT",
"dim": "128",
"nlist": "1024",
}
s.Equal(3, len(mapToKVPairs(indexParams)))
}
func Test_utilSuite(t *testing.T) {
suite.Run(t, new(utilSuite))
}

View File

@ -4,6 +4,7 @@ package milvus.proto.indexcgo;
option go_package="github.com/milvus-io/milvus/internal/proto/indexcgopb";
import "common.proto";
import "schema.proto";
message TypeParams {
repeated common.KeyValuePair params = 1;
@ -30,3 +31,52 @@ message Binary {
message BinarySet {
repeated Binary datas = 1;
}
// Synchronously modify StorageConfig in index_coord.proto file
message StorageConfig {
string address = 1;
string access_keyID = 2;
string secret_access_key = 3;
bool useSSL = 4;
string bucket_name = 5;
string root_path = 6;
bool useIAM = 7;
string IAMEndpoint = 8;
string storage_type = 9;
bool use_virtual_host = 10;
string region = 11;
string cloud_provider = 12;
int64 request_timeout_ms = 13;
string sslCACert = 14;
}
// Synchronously modify OptionalFieldInfo in index_coord.proto file
message OptionalFieldInfo {
int64 fieldID = 1;
string field_name = 2;
int32 field_type = 3;
repeated string data_paths = 4;
}
message BuildIndexInfo {
string clusterID = 1;
int64 buildID = 2;
int64 collectionID = 3;
int64 partitionID = 4;
int64 segmentID = 5;
int64 index_version = 6;
int32 current_index_version = 7;
int64 num_rows = 8;
int64 dim = 9;
string index_file_prefix = 10;
repeated string insert_files = 11;
// repeated int64 data_ids = 12;
schema.FieldSchema field_schema = 12;
StorageConfig storage_config = 13;
repeated common.KeyValuePair index_params = 14;
repeated common.KeyValuePair type_params = 15;
string store_path = 16;
int64 store_version = 17;
string index_store_path = 18;
repeated OptionalFieldInfo opt_fields = 19;
}

View File

@ -8,6 +8,7 @@ import "common.proto";
import "internal.proto";
import "milvus.proto";
import "schema.proto";
import "index_cgo_msg.proto";
service IndexCoord {
rpc GetComponentStates(milvus.GetComponentStatesRequest) returns (milvus.ComponentStates) {}
@ -226,6 +227,7 @@ message GetIndexBuildProgressResponse {
int64 pending_index_rows = 4;
}
// Synchronously modify StorageConfig in index_cgo_msg.proto file
message StorageConfig {
string address = 1;
string access_keyID = 2;
@ -243,6 +245,7 @@ message StorageConfig {
string sslCACert = 14;
}
// Synchronously modify OptionalFieldInfo in index_cgo_msg.proto file
message OptionalFieldInfo {
int64 fieldID = 1;
string field_name = 2;
@ -276,6 +279,7 @@ message CreateJobRequest {
int64 dim = 22;
repeated int64 data_ids = 23;
repeated OptionalFieldInfo optional_scalar_fields = 24;
schema.FieldSchema field = 25;
}
message QueryJobsRequest {

View File

@ -16,6 +16,7 @@ import (
"unsafe"
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
@ -94,9 +95,17 @@ func NewCgoIndex(dtype schemapb.DataType, typeParams, indexParams map[string]str
return index, nil
}
func CreateIndex(ctx context.Context, buildIndexInfo *BuildIndexInfo) (CodecIndex, error) {
func CreateIndex(ctx context.Context, buildIndexInfo *indexcgopb.BuildIndexInfo) (CodecIndex, error) {
buildIndexInfoBlob, err := proto.Marshal(buildIndexInfo)
if err != nil {
log.Ctx(ctx).Warn("marshal buildIndexInfo failed",
zap.String("clusterID", buildIndexInfo.GetClusterID()),
zap.Int64("buildID", buildIndexInfo.GetBuildID()),
zap.Error(err))
return nil, err
}
var indexPtr C.CIndex
status := C.CreateIndex(&indexPtr, buildIndexInfo.cBuildIndexInfo)
status := C.CreateIndex(&indexPtr, (*C.uint8_t)(unsafe.Pointer(&buildIndexInfoBlob[0])), (C.uint64_t)(len(buildIndexInfoBlob)))
if err := HandleCStatus(&status, "failed to create index"); err != nil {
return nil, err
}
@ -109,9 +118,17 @@ func CreateIndex(ctx context.Context, buildIndexInfo *BuildIndexInfo) (CodecInde
return index, nil
}
func CreateIndexV2(ctx context.Context, buildIndexInfo *BuildIndexInfo) (CodecIndex, error) {
func CreateIndexV2(ctx context.Context, buildIndexInfo *indexcgopb.BuildIndexInfo) (CodecIndex, error) {
buildIndexInfoBlob, err := proto.Marshal(buildIndexInfo)
if err != nil {
log.Ctx(ctx).Warn("marshal buildIndexInfo failed",
zap.String("clusterID", buildIndexInfo.GetClusterID()),
zap.Int64("buildID", buildIndexInfo.GetBuildID()),
zap.Error(err))
return nil, err
}
var indexPtr C.CIndex
status := C.CreateIndexV2(&indexPtr, buildIndexInfo.cBuildIndexInfo)
status := C.CreateIndexV2(&indexPtr, (*C.uint8_t)(unsafe.Pointer(&buildIndexInfoBlob[0])), (C.uint64_t)(len(buildIndexInfoBlob)))
if err := HandleCStatus(&status, "failed to create index"); err != nil {
return nil, err
}