From e1258b8cade4b9803d11df9b0885965552c8e1ff Mon Sep 17 00:00:00 2001 From: Bingyi Sun Date: Fri, 12 Jan 2024 18:10:51 +0800 Subject: [PATCH] feat: integrate storagev2 into loading segment (#29336) issue: #29335 --------- Signed-off-by: sunby --- go.mod | 2 +- go.sum | 14 + internal/core/src/indexbuilder/IndexFactory.h | 3 +- .../core/src/indexbuilder/VecIndexCreator.cpp | 4 +- .../core/src/indexbuilder/VecIndexCreator.h | 1 + internal/core/src/indexbuilder/index_c.cpp | 2 + internal/core/src/segcore/Utils.cpp | 4 +- internal/core/src/segcore/Utils.h | 1 + .../core/src/storage/DiskFileManagerImpl.cpp | 2 +- internal/core/src/storage/Util.cpp | 2 +- .../thirdparty/milvus-storage/CMakeLists.txt | 2 +- internal/datacoord/index_builder.go | 27 +- internal/datacoord/meta.go | 2 +- .../datanode/syncmgr/storage_v2_serializer.go | 15 +- .../syncmgr/storage_v2_serializer_test.go | 2 +- internal/datanode/syncmgr/taskv2.go | 133 +----- internal/datanode/syncmgr/taskv2_test.go | 2 +- internal/indexnode/indexnode_service.go | 4 + internal/proto/query_coord.proto | 2 + internal/querycoordv2/utils/types.go | 25 +- .../segments/load_field_data_info.go | 11 + .../querynodev2/segments/load_index_info.go | 30 +- internal/querynodev2/segments/mock_data.go | 3 +- internal/querynodev2/segments/mock_loader.go | 44 ++ internal/querynodev2/segments/segment.go | 206 +++++++++- .../querynodev2/segments/segment_loader.go | 383 ++++++++++++++++++ .../segments/segment_loader_test.go | 154 +++++++ internal/querynodev2/server.go | 6 +- internal/util/typeutil/schema.go | 6 +- internal/util/typeutil/storage.go | 122 ++++++ pkg/util/paramtable/component_param.go | 17 +- 31 files changed, 1030 insertions(+), 201 deletions(-) create mode 100644 internal/util/typeutil/storage.go diff --git a/go.mod b/go.mod index 22688776b1..ada8ec0160 100644 --- a/go.mod +++ b/go.mod @@ -58,7 +58,7 @@ require ( require github.com/apache/arrow/go/v12 v12.0.1 -require github.com/milvus-io/milvus-storage/go v0.0.0-20231109072809-1cd7b0866092 +require github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70 require ( github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000 diff --git a/go.sum b/go.sum index 2f83471f73..22fbf14496 100644 --- a/go.sum +++ b/go.sum @@ -589,6 +589,20 @@ github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240109020841-d367b5a59df1 github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240109020841-d367b5a59df1/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek= github.com/milvus-io/milvus-storage/go v0.0.0-20231109072809-1cd7b0866092 h1:UYJ7JB+QlMOoFHNdd8mUa3/lV63t9dnBX7ILXmEEWPY= github.com/milvus-io/milvus-storage/go v0.0.0-20231109072809-1cd7b0866092/go.mod h1:GPETMcTZq1gLY1WA6Na5kiNAKnq8SEMMiVKUZrM3sho= +github.com/milvus-io/milvus-storage/go v0.0.0-20231213080429-ed6b9bd5c9d2 h1:2epYWKCSY6Rq/aJ/6UyUS1d3+Yts0UK8HNiWGjVN4Pc= +github.com/milvus-io/milvus-storage/go v0.0.0-20231213080429-ed6b9bd5c9d2/go.mod h1:GPETMcTZq1gLY1WA6Na5kiNAKnq8SEMMiVKUZrM3sho= +github.com/milvus-io/milvus-storage/go v0.0.0-20231226033437-76e506e3ae48 h1:EXDWA9yjmLLjIlIFjTdwtA3p1G0FDJdT07QdgCAWFWU= +github.com/milvus-io/milvus-storage/go v0.0.0-20231226033437-76e506e3ae48/go.mod h1:GPETMcTZq1gLY1WA6Na5kiNAKnq8SEMMiVKUZrM3sho= +github.com/milvus-io/milvus-storage/go v0.0.0-20231226075239-137cb5c55a5f h1:l43tW6aahbKcatIsX2X1guQktWSv/wgCBcGhmMWJgTg= +github.com/milvus-io/milvus-storage/go v0.0.0-20231226075239-137cb5c55a5f/go.mod h1:GPETMcTZq1gLY1WA6Na5kiNAKnq8SEMMiVKUZrM3sho= +github.com/milvus-io/milvus-storage/go v0.0.0-20231226081638-4a9a35e739b6 h1:v8WP0xJoOFno/YKdTrVfjWNn/VBmRX4IirK3/dhtH+8= +github.com/milvus-io/milvus-storage/go v0.0.0-20231226081638-4a9a35e739b6/go.mod h1:GPETMcTZq1gLY1WA6Na5kiNAKnq8SEMMiVKUZrM3sho= +github.com/milvus-io/milvus-storage/go v0.0.0-20231226083239-422d03dd1e1c h1:Xnc1Jt4joXVu2OsZp3xNZYQ/rKptRfRzYIHNaZkCpF8= +github.com/milvus-io/milvus-storage/go v0.0.0-20231226083239-422d03dd1e1c/go.mod h1:GPETMcTZq1gLY1WA6Na5kiNAKnq8SEMMiVKUZrM3sho= +github.com/milvus-io/milvus-storage/go v0.0.0-20231226085237-57519406e94f h1:4qnOXYGDVXdbIWUp9tk+JYtQ58QKf5d8q+XVk9+UVXo= +github.com/milvus-io/milvus-storage/go v0.0.0-20231226085237-57519406e94f/go.mod h1:GPETMcTZq1gLY1WA6Na5kiNAKnq8SEMMiVKUZrM3sho= +github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70 h1:Z+sp64fmAOxAG7mU0dfVOXvAXlwRB0c8a96rIM5HevI= +github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70/go.mod h1:GPETMcTZq1gLY1WA6Na5kiNAKnq8SEMMiVKUZrM3sho= github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A= github.com/milvus-io/pulsar-client-go v0.6.10/go.mod h1:lQqCkgwDF8YFYjKA+zOheTk1tev2B+bKj5j7+nm8M1w= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= diff --git a/internal/core/src/indexbuilder/IndexFactory.h b/internal/core/src/indexbuilder/IndexFactory.h index 47b5125a85..3b6e6874de 100644 --- a/internal/core/src/indexbuilder/IndexFactory.h +++ b/internal/core/src/indexbuilder/IndexFactory.h @@ -77,6 +77,7 @@ class IndexFactory { IndexCreatorBasePtr CreateIndex(DataType type, const std::string& field_name, + const int64_t dim, Config& config, const storage::FileManagerContext& file_manager_context, std::shared_ptr space) { @@ -101,7 +102,7 @@ class IndexFactory { case DataType::VECTOR_FLOAT16: case DataType::VECTOR_BFLOAT16: return std::make_unique( - type, field_name, config, file_manager_context, space); + type, field_name, dim, config, file_manager_context, space); default: throw std::invalid_argument(invalid_dtype_msg); } diff --git a/internal/core/src/indexbuilder/VecIndexCreator.cpp b/internal/core/src/indexbuilder/VecIndexCreator.cpp index 14535646dc..ae4474be83 100644 --- a/internal/core/src/indexbuilder/VecIndexCreator.cpp +++ b/internal/core/src/indexbuilder/VecIndexCreator.cpp @@ -24,12 +24,13 @@ VecIndexCreator::VecIndexCreator( DataType data_type, Config& config, const storage::FileManagerContext& file_manager_context) - : VecIndexCreator(data_type, "", config, file_manager_context, nullptr) { + : VecIndexCreator(data_type, "", 0, config, file_manager_context, nullptr) { } VecIndexCreator::VecIndexCreator( DataType data_type, const std::string& field_name, + const int64_t dim, Config& config, const storage::FileManagerContext& file_manager_context, std::shared_ptr space) @@ -41,6 +42,7 @@ VecIndexCreator::VecIndexCreator( index_info.field_name = field_name; index_info.index_engine_version = index::GetIndexEngineVersionFromConfig(config_); + index_info.dim = dim; index_ = index::IndexFactory::GetInstance().CreateIndex( index_info, file_manager_context, space_); diff --git a/internal/core/src/indexbuilder/VecIndexCreator.h b/internal/core/src/indexbuilder/VecIndexCreator.h index b1cf03b986..2973f4f3b3 100644 --- a/internal/core/src/indexbuilder/VecIndexCreator.h +++ b/internal/core/src/indexbuilder/VecIndexCreator.h @@ -35,6 +35,7 @@ class VecIndexCreator : public IndexCreatorBase { VecIndexCreator(DataType data_type, const std::string& field_name, + const int64_t dim, Config& config, const storage::FileManagerContext& file_manager_context, std::shared_ptr space); diff --git a/internal/core/src/indexbuilder/index_c.cpp b/internal/core/src/indexbuilder/index_c.cpp index c9128ea9e6..6b34ca3734 100644 --- a/internal/core/src/indexbuilder/index_c.cpp +++ b/internal/core/src/indexbuilder/index_c.cpp @@ -152,6 +152,7 @@ CreateIndexV2(CIndex* res_index, CBuildIndexInfo c_build_index_info) { auto field_type = build_index_info->field_type; milvus::index::CreateIndexInfo index_info; index_info.field_type = build_index_info->field_type; + index_info.dim = build_index_info->dim; auto& config = build_index_info->config; // get index type @@ -217,6 +218,7 @@ CreateIndexV2(CIndex* res_index, CBuildIndexInfo c_build_index_info) { milvus::indexbuilder::IndexFactory::GetInstance().CreateIndex( build_index_info->field_type, build_index_info->field_name, + build_index_info->dim, config, fileManagerContext, std::move(store_space.value())); diff --git a/internal/core/src/segcore/Utils.cpp b/internal/core/src/segcore/Utils.cpp index 4ae158eb11..b654cba0bc 100644 --- a/internal/core/src/segcore/Utils.cpp +++ b/internal/core/src/segcore/Utils.cpp @@ -719,10 +719,8 @@ void LoadFieldDatasFromRemote2(std::shared_ptr space, SchemaPtr schema, FieldDataInfo& field_data_info) { - // log all schema ids - for (auto& field : schema->get_fields()) { - } auto res = space->ScanData(); + if (!res.ok()) { PanicInfo(S3Error, "failed to create scan iterator"); } diff --git a/internal/core/src/segcore/Utils.h b/internal/core/src/segcore/Utils.h index f0d3cf4715..b3987597c5 100644 --- a/internal/core/src/segcore/Utils.h +++ b/internal/core/src/segcore/Utils.h @@ -25,6 +25,7 @@ // #include "common/Schema.h" #include "common/Types.h" #include "index/Index.h" +#include "log/Log.h" #include "segcore/DeletedRecord.h" #include "segcore/InsertRecord.h" #include "storage/space.h" diff --git a/internal/core/src/storage/DiskFileManagerImpl.cpp b/internal/core/src/storage/DiskFileManagerImpl.cpp index 132a51804c..060c2110ee 100644 --- a/internal/core/src/storage/DiskFileManagerImpl.cpp +++ b/internal/core/src/storage/DiskFileManagerImpl.cpp @@ -93,7 +93,7 @@ DiskFileManagerImpl::AddFileUsingSpace( for (int64_t i = 0; i < remote_files.size(); ++i) { auto data = LoadIndexFromDisk( local_file_name, local_file_offsets[i], remote_file_sizes[i]); - auto status = space_->WriteBolb( + auto status = space_->WriteBlob( remote_files[i], data.get(), remote_file_sizes[i]); if (!status.ok()) { return false; diff --git a/internal/core/src/storage/Util.cpp b/internal/core/src/storage/Util.cpp index decacc74d5..c725d80297 100644 --- a/internal/core/src/storage/Util.cpp +++ b/internal/core/src/storage/Util.cpp @@ -479,7 +479,7 @@ EncodeAndUploadIndexSlice2(std::shared_ptr space, indexData->SetFieldDataMeta(field_meta); auto serialized_index_data = indexData->serialize_to_remote_file(); auto serialized_index_size = serialized_index_data.size(); - auto status = space->WriteBolb( + auto status = space->WriteBlob( object_key, serialized_index_data.data(), serialized_index_size); AssertInfo(status.ok(), "write to space error: {}", status.ToString()); return std::make_pair(std::move(object_key), serialized_index_size); diff --git a/internal/core/thirdparty/milvus-storage/CMakeLists.txt b/internal/core/thirdparty/milvus-storage/CMakeLists.txt index 4839730d01..b88930e692 100644 --- a/internal/core/thirdparty/milvus-storage/CMakeLists.txt +++ b/internal/core/thirdparty/milvus-storage/CMakeLists.txt @@ -11,7 +11,7 @@ # or implied. See the License for the specific language governing permissions and limitations under the License. #------------------------------------------------------------------------------- -set( MILVUS_STORAGE_VERSION c7107a0) +set( MILVUS_STORAGE_VERSION 4a9a35e) message(STATUS "Building milvus-storage-${MILVUS_STORAGE_VERSION} from source") message(STATUS ${CMAKE_BUILD_TYPE}) diff --git a/internal/datacoord/index_builder.go b/internal/datacoord/index_builder.go index c009c0b74a..374806ef3a 100644 --- a/internal/datacoord/index_builder.go +++ b/internal/datacoord/index_builder.go @@ -18,7 +18,6 @@ package datacoord import ( "context" - "fmt" "path" "sync" "time" @@ -28,8 +27,10 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/proto/indexpb" + "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" @@ -323,12 +324,20 @@ func (ib *indexBuilder) process(buildID UniqueID) bool { } } - dim, _ := storage.GetDimFromParams(field.TypeParams) - var scheme string - if Params.MinioCfg.UseSSL.GetAsBool() { - scheme = "https" - } else { - scheme = "http" + dim, err := storage.GetDimFromParams(field.TypeParams) + if err != nil { + return false + } + + storePath, err := typeutil.GetStorageURI(params.Params.CommonCfg.StorageScheme.GetValue(), params.Params.CommonCfg.StoragePathPrefix.GetValue(), segment.GetID()) + if err != nil { + log.Ctx(ib.ctx).Warn("failed to get storage uri", zap.Error(err)) + return false + } + indexStorePath, err := typeutil.GetStorageURI(params.Params.CommonCfg.StorageScheme.GetValue(), params.Params.CommonCfg.StoragePathPrefix.GetValue()+"/index", segment.GetID()) + if err != nil { + log.Ctx(ib.ctx).Warn("failed to get storage uri", zap.Error(err)) + return false } req = &indexpb.CreateJobRequest{ @@ -347,9 +356,9 @@ func (ib *indexBuilder) process(buildID UniqueID) bool { FieldID: fieldID, FieldName: field.Name, FieldType: field.DataType, - StorePath: fmt.Sprintf("s3://%s:%s@%s/%d?scheme=%s&endpoint_override=%s&allow_bucket_creation=true", Params.MinioCfg.AccessKeyID.GetValue(), Params.MinioCfg.SecretAccessKey.GetValue(), Params.MinioCfg.BucketName.GetValue(), segment.GetID(), scheme, Params.MinioCfg.Address.GetValue()), + StorePath: storePath, StoreVersion: segment.GetStorageVersion(), - IndexStorePath: fmt.Sprintf("s3://%s:%s@%s/index/%d?scheme=%s&endpoint_override=%s&allow_bucket_creation=true", Params.MinioCfg.AccessKeyID.GetValue(), Params.MinioCfg.SecretAccessKey.GetValue(), Params.MinioCfg.BucketName.GetValue(), segment.GetID(), scheme, Params.MinioCfg.Address.GetValue()), + IndexStorePath: indexStorePath, Dim: int64(dim), CurrentIndexVersion: ib.indexEngineVersionManager.GetCurrentIndexEngineVersion(), } diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 68c422ee86..90a586cfd6 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -489,7 +489,7 @@ func CreateL0Operator(collectionID, partitionID, segmentID int64, channel string func UpdateStorageVersionOperator(segmentID int64, version int64) UpdateOperator { return func(modPack *updateSegmentPack) bool { - segment := modPack.meta.GetSegment(segmentID) + segment := modPack.Get(segmentID) if segment == nil { log.Info("meta update: update storage version - segment not found", zap.Int64("segmentID", segmentID)) diff --git a/internal/datanode/syncmgr/storage_v2_serializer.go b/internal/datanode/syncmgr/storage_v2_serializer.go index 6b99ca4521..36ed26abc4 100644 --- a/internal/datanode/syncmgr/storage_v2_serializer.go +++ b/internal/datanode/syncmgr/storage_v2_serializer.go @@ -97,7 +97,7 @@ func (s *storageV2Serializer) EncodeBuffer(ctx context.Context, pack *SyncPack) return nil, err } - task.batchStatsBlob = batchStatsBlob + task.statsBlob = batchStatsBlob s.metacache.UpdateSegments(metacache.RollStats(singlePKStats), metacache.WithSegmentIDs(pack.segmentID)) } @@ -155,7 +155,7 @@ func (s *storageV2Serializer) serializeInsertData(pack *SyncPack) (array.RecordR builder := array.NewRecordBuilder(memory.DefaultAllocator, s.arrowSchema) defer builder.Release() - if err := buildRecord(builder, pack.insertData, s.schema.GetFields()); err != nil { + if err := iTypeutil.BuildRecord(builder, pack.insertData, s.schema.GetFields()); err != nil { return nil, err } @@ -221,13 +221,10 @@ func (s *storageV2Serializer) serializeDeltaData(pack *SyncPack) (array.RecordRe func SpaceCreatorFunc(segmentID int64, collSchema *schemapb.CollectionSchema, arrowSchema *arrow.Schema) func() (*milvus_storage.Space, error) { return func() (*milvus_storage.Space, error) { - url := fmt.Sprintf("%s://%s:%s@%s/%d?endpoint_override=%s", - params.Params.CommonCfg.StorageScheme.GetValue(), - params.Params.MinioCfg.AccessKeyID.GetValue(), - params.Params.MinioCfg.SecretAccessKey.GetValue(), - params.Params.MinioCfg.BucketName.GetValue(), - segmentID, - params.Params.MinioCfg.Address.GetValue()) + url, err := iTypeutil.GetStorageURI(params.Params.CommonCfg.StorageScheme.GetValue(), params.Params.CommonCfg.StoragePathPrefix.GetValue(), segmentID) + if err != nil { + return nil, err + } pkSchema, err := typeutil.GetPrimaryFieldSchema(collSchema) if err != nil { diff --git a/internal/datanode/syncmgr/storage_v2_serializer_test.go b/internal/datanode/syncmgr/storage_v2_serializer_test.go index 9a4711a7d6..e430de00ab 100644 --- a/internal/datanode/syncmgr/storage_v2_serializer_test.go +++ b/internal/datanode/syncmgr/storage_v2_serializer_test.go @@ -249,7 +249,7 @@ func (s *StorageV2SerializerSuite) TestSerializeInsert() { s.EqualValues(50, taskV2.tsFrom) s.EqualValues(100, taskV2.tsTo) s.NotNil(taskV2.reader) - s.NotNil(taskV2.batchStatsBlob) + s.NotNil(taskV2.statsBlob) }) s.Run("with_flush_segment_not_found", func() { diff --git a/internal/datanode/syncmgr/taskv2.go b/internal/datanode/syncmgr/taskv2.go index 293ed92ff7..732c1003e0 100644 --- a/internal/datanode/syncmgr/taskv2.go +++ b/internal/datanode/syncmgr/taskv2.go @@ -18,7 +18,6 @@ package syncmgr import ( "context" - "math" "github.com/apache/arrow/go/v12/arrow" "github.com/apache/arrow/go/v12/arrow/array" @@ -33,7 +32,6 @@ import ( "github.com/milvus-io/milvus/internal/datanode/metacache" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/storage" - "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/retry" @@ -133,139 +131,10 @@ func (t *SyncTaskV2) writeSpace() error { } func (t *SyncTaskV2) writeMeta() error { + t.storageVersion = t.space.GetCurrentVersion() return t.metaWriter.UpdateSyncV2(t) } -func buildRecord(b *array.RecordBuilder, data *storage.InsertData, fields []*schemapb.FieldSchema) error { - if data == nil { - log.Info("no buffer data to flush") - return nil - } - for i, field := range fields { - fBuilder := b.Field(i) - switch field.DataType { - case schemapb.DataType_Bool: - fBuilder.(*array.BooleanBuilder).AppendValues(data.Data[field.FieldID].(*storage.BoolFieldData).Data, nil) - case schemapb.DataType_Int8: - fBuilder.(*array.Int8Builder).AppendValues(data.Data[field.FieldID].(*storage.Int8FieldData).Data, nil) - case schemapb.DataType_Int16: - fBuilder.(*array.Int16Builder).AppendValues(data.Data[field.FieldID].(*storage.Int16FieldData).Data, nil) - case schemapb.DataType_Int32: - fBuilder.(*array.Int32Builder).AppendValues(data.Data[field.FieldID].(*storage.Int32FieldData).Data, nil) - case schemapb.DataType_Int64: - fBuilder.(*array.Int64Builder).AppendValues(data.Data[field.FieldID].(*storage.Int64FieldData).Data, nil) - case schemapb.DataType_Float: - fBuilder.(*array.Float32Builder).AppendValues(data.Data[field.FieldID].(*storage.FloatFieldData).Data, nil) - case schemapb.DataType_Double: - fBuilder.(*array.Float64Builder).AppendValues(data.Data[field.FieldID].(*storage.DoubleFieldData).Data, nil) - case schemapb.DataType_VarChar, schemapb.DataType_String: - fBuilder.(*array.StringBuilder).AppendValues(data.Data[field.FieldID].(*storage.StringFieldData).Data, nil) - case schemapb.DataType_Array: - appendListValues(fBuilder.(*array.ListBuilder), data.Data[field.FieldID].(*storage.ArrayFieldData)) - case schemapb.DataType_JSON: - fBuilder.(*array.BinaryBuilder).AppendValues(data.Data[field.FieldID].(*storage.JSONFieldData).Data, nil) - case schemapb.DataType_BinaryVector: - vecData := data.Data[field.FieldID].(*storage.BinaryVectorFieldData) - for i := 0; i < len(vecData.Data); i += vecData.Dim / 8 { - fBuilder.(*array.FixedSizeBinaryBuilder).Append(vecData.Data[i : i+vecData.Dim/8]) - } - case schemapb.DataType_FloatVector: - vecData := data.Data[field.FieldID].(*storage.FloatVectorFieldData) - builder := fBuilder.(*array.FixedSizeBinaryBuilder) - dim := vecData.Dim - data := vecData.Data - byteLength := dim * 4 - length := len(data) / dim - - builder.Reserve(length) - bytesData := make([]byte, byteLength) - for i := 0; i < length; i++ { - vec := data[i*dim : (i+1)*dim] - for j := range vec { - bytes := math.Float32bits(vec[j]) - common.Endian.PutUint32(bytesData[j*4:], bytes) - } - builder.Append(bytesData) - } - case schemapb.DataType_Float16Vector: - vecData := data.Data[field.FieldID].(*storage.Float16VectorFieldData) - builder := fBuilder.(*array.FixedSizeBinaryBuilder) - dim := vecData.Dim - data := vecData.Data - byteLength := dim * 2 - length := len(data) / byteLength - - builder.Reserve(length) - for i := 0; i < length; i++ { - builder.Append(data[i*byteLength : (i+1)*byteLength]) - } - - default: - return merr.WrapErrParameterInvalidMsg("unknown type %v", field.DataType.String()) - } - } - - return nil -} - -func appendListValues(builder *array.ListBuilder, data *storage.ArrayFieldData) error { - vb := builder.ValueBuilder() - switch data.ElementType { - case schemapb.DataType_Bool: - for _, data := range data.Data { - builder.Append(true) - vb.(*array.BooleanBuilder).AppendValues(data.GetBoolData().Data, nil) - } - case schemapb.DataType_Int8: - for _, data := range data.Data { - builder.Append(true) - vb.(*array.Int8Builder).AppendValues(castIntArray[int8](data.GetIntData().Data), nil) - } - case schemapb.DataType_Int16: - for _, data := range data.Data { - builder.Append(true) - vb.(*array.Int16Builder).AppendValues(castIntArray[int16](data.GetIntData().Data), nil) - } - case schemapb.DataType_Int32: - for _, data := range data.Data { - builder.Append(true) - vb.(*array.Int32Builder).AppendValues(data.GetIntData().Data, nil) - } - case schemapb.DataType_Int64: - for _, data := range data.Data { - builder.Append(true) - vb.(*array.Int64Builder).AppendValues(data.GetLongData().Data, nil) - } - case schemapb.DataType_Float: - for _, data := range data.Data { - builder.Append(true) - vb.(*array.Float32Builder).AppendValues(data.GetFloatData().Data, nil) - } - case schemapb.DataType_Double: - for _, data := range data.Data { - builder.Append(true) - vb.(*array.Float64Builder).AppendValues(data.GetDoubleData().Data, nil) - } - case schemapb.DataType_String, schemapb.DataType_VarChar: - for _, data := range data.Data { - builder.Append(true) - vb.(*array.StringBuilder).AppendValues(data.GetStringData().Data, nil) - } - - default: - return merr.WrapErrParameterInvalidMsg("unknown type %v", data.ElementType.String()) - } - return nil -} - -func castIntArray[T int8 | int16](nums []int32) []T { - ret := make([]T, 0, len(nums)) - for _, n := range nums { - ret = append(ret, T(n)) - } - return ret -} - func NewSyncTaskV2() *SyncTaskV2 { return &SyncTaskV2{ SyncTask: NewSyncTask(), diff --git a/internal/datanode/syncmgr/taskv2_test.go b/internal/datanode/syncmgr/taskv2_test.go index c58b45400d..ea29dba8d9 100644 --- a/internal/datanode/syncmgr/taskv2_test.go +++ b/internal/datanode/syncmgr/taskv2_test.go @@ -316,7 +316,7 @@ func (s *SyncTaskSuiteV2) TestBuildRecord() { }, } - err = buildRecord(b, data, fieldSchemas) + err = typeutil.BuildRecord(b, data, fieldSchemas) s.NoError(err) s.EqualValues(2, b.NewRecord().NumRows()) } diff --git a/internal/indexnode/indexnode_service.go b/internal/indexnode/indexnode_service.go index 5886df5e73..70631a5bc4 100644 --- a/internal/indexnode/indexnode_service.go +++ b/internal/indexnode/indexnode_service.go @@ -63,6 +63,10 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest zap.Any("indexParams", req.GetIndexParams()), zap.Int64("numRows", req.GetNumRows()), zap.Int32("current_index_version", req.GetCurrentIndexVersion()), + zap.Any("storepath", req.GetStorePath()), + zap.Any("storeversion", req.GetStoreVersion()), + zap.Any("indexstorepath", req.GetIndexStorePath()), + zap.Any("dim", req.GetDim()), ) ctx, sp := otel.Tracer(typeutil.IndexNodeRole).Start(ctx, "IndexNode-CreateIndex", trace.WithAttributes( attribute.Int64("indexBuildID", req.GetBuildID()), diff --git a/internal/proto/query_coord.proto b/internal/proto/query_coord.proto index da69d840ea..5e3410c9e0 100644 --- a/internal/proto/query_coord.proto +++ b/internal/proto/query_coord.proto @@ -265,6 +265,7 @@ message SegmentLoadInfo { msg.MsgPosition delta_position = 15; int64 readableVersion = 16; data.SegmentLevel level = 17; + int64 storageVersion = 18; } message FieldIndexInfo { @@ -280,6 +281,7 @@ message FieldIndexInfo { int64 index_version = 9; int64 num_rows = 10; int32 current_index_version = 11; + int64 index_store_version = 12; } enum LoadScope { diff --git a/internal/querycoordv2/utils/types.go b/internal/querycoordv2/utils/types.go index 60d03587a8..aa4481cd34 100644 --- a/internal/querycoordv2/utils/types.go +++ b/internal/querycoordv2/utils/types.go @@ -72,18 +72,19 @@ func PackSegmentLoadInfo(segment *datapb.SegmentInfo, channelCheckpoint *msgpb.M zap.Duration("tsLag", tsLag)) } loadInfo := &querypb.SegmentLoadInfo{ - SegmentID: segment.ID, - PartitionID: segment.PartitionID, - CollectionID: segment.CollectionID, - BinlogPaths: segment.Binlogs, - NumOfRows: segment.NumOfRows, - Statslogs: segment.Statslogs, - Deltalogs: segment.Deltalogs, - InsertChannel: segment.InsertChannel, - IndexInfos: indexes, - StartPosition: segment.GetStartPosition(), - DeltaPosition: channelCheckpoint, - Level: segment.GetLevel(), + SegmentID: segment.ID, + PartitionID: segment.PartitionID, + CollectionID: segment.CollectionID, + BinlogPaths: segment.Binlogs, + NumOfRows: segment.NumOfRows, + Statslogs: segment.Statslogs, + Deltalogs: segment.Deltalogs, + InsertChannel: segment.InsertChannel, + IndexInfos: indexes, + StartPosition: segment.GetStartPosition(), + DeltaPosition: channelCheckpoint, + Level: segment.GetLevel(), + StorageVersion: segment.GetStorageVersion(), } loadInfo.SegmentSize = calculateSegmentSize(loadInfo) return loadInfo diff --git a/internal/querynodev2/segments/load_field_data_info.go b/internal/querynodev2/segments/load_field_data_info.go index b300b6ecf3..a261872d29 100644 --- a/internal/querynodev2/segments/load_field_data_info.go +++ b/internal/querynodev2/segments/load_field_data_info.go @@ -78,3 +78,14 @@ func (ld *LoadFieldDataInfo) appendMMapDirPath(dir string) { C.AppendMMapDirPath(ld.cLoadFieldDataInfo, cDir) } + +func (ld *LoadFieldDataInfo) appendURI(uri string) { + cURI := C.CString(uri) + defer C.free(unsafe.Pointer(cURI)) + C.SetUri(ld.cLoadFieldDataInfo, cURI) +} + +func (ld *LoadFieldDataInfo) appendStorageVersion(version int64) { + cVersion := C.int64_t(version) + C.SetStorageVersion(ld.cLoadFieldDataInfo, cVersion) +} diff --git a/internal/querynodev2/segments/load_index_info.go b/internal/querynodev2/segments/load_index_info.go index 1a830caea4..f92000c9c6 100644 --- a/internal/querynodev2/segments/load_index_info.go +++ b/internal/querynodev2/segments/load_index_info.go @@ -157,6 +157,13 @@ func (li *LoadIndexInfo) appendFieldInfo(ctx context.Context, collectionID int64 return HandleCStatus(ctx, &status, "AppendFieldInfo failed") } +func (li *LoadIndexInfo) appendStorageInfo(uri string, version int64) { + cURI := C.CString(uri) + defer C.free(unsafe.Pointer(cURI)) + cVersion := C.int64_t(version) + C.AppendStorageInfo(li.cLoadIndexInfo, cURI, cVersion) +} + // appendIndexData appends index path to cLoadIndexInfo and create index func (li *LoadIndexInfo) appendIndexData(ctx context.Context, indexKeys []string) error { for _, indexPath := range indexKeys { @@ -166,17 +173,22 @@ func (li *LoadIndexInfo) appendIndexData(ctx context.Context, indexKeys []string } } - span := trace.SpanFromContext(ctx) + var status C.CStatus + if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() { + status = C.AppendIndexV3(li.cLoadIndexInfo) + } else { + span := trace.SpanFromContext(ctx) - traceID := span.SpanContext().TraceID() - spanID := span.SpanContext().SpanID() - traceCtx := C.CTraceContext{ - traceID: (*C.uint8_t)(unsafe.Pointer(&traceID[0])), - spanID: (*C.uint8_t)(unsafe.Pointer(&spanID[0])), - flag: C.uchar(span.SpanContext().TraceFlags()), + traceID := span.SpanContext().TraceID() + spanID := span.SpanContext().SpanID() + traceCtx := C.CTraceContext{ + traceID: (*C.uint8_t)(unsafe.Pointer(&traceID[0])), + spanID: (*C.uint8_t)(unsafe.Pointer(&spanID[0])), + flag: C.uchar(span.SpanContext().TraceFlags()), + } + + status = C.AppendIndexV2(traceCtx, li.cLoadIndexInfo) } - - status := C.AppendIndexV2(traceCtx, li.cLoadIndexInfo) return HandleCStatus(ctx, &status, "AppendIndex failed") } diff --git a/internal/querynodev2/segments/mock_data.go b/internal/querynodev2/segments/mock_data.go index c49de49270..34652cf1c8 100644 --- a/internal/querynodev2/segments/mock_data.go +++ b/internal/querynodev2/segments/mock_data.go @@ -833,7 +833,8 @@ func genInsertData(msgLength int, schema *schemapb.CollectionSchema) (*storage.I } case schemapb.DataType_Array: insertData.Data[f.FieldID] = &storage.ArrayFieldData{ - Data: generateArrayArray(msgLength), + ElementType: schemapb.DataType_Int32, + Data: generateArrayArray(msgLength), } case schemapb.DataType_JSON: insertData.Data[f.FieldID] = &storage.JSONFieldData{ diff --git a/internal/querynodev2/segments/mock_loader.go b/internal/querynodev2/segments/mock_loader.go index 74d46d0ce3..704b75d898 100644 --- a/internal/querynodev2/segments/mock_loader.go +++ b/internal/querynodev2/segments/mock_loader.go @@ -172,6 +172,50 @@ func (_c *MockLoader_LoadBloomFilterSet_Call) RunAndReturn(run func(context.Cont return _c } +// LoadDelta provides a mock function with given fields: ctx, collectionID, segment +func (_m *MockLoader) LoadDelta(ctx context.Context, collectionID int64, segment *LocalSegment) error { + ret := _m.Called(ctx, collectionID, segment) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int64, *LocalSegment) error); ok { + r0 = rf(ctx, collectionID, segment) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockLoader_LoadDelta_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LoadDelta' +type MockLoader_LoadDelta_Call struct { + *mock.Call +} + +// LoadDelta is a helper method to define mock.On call +// - ctx context.Context +// - collectionID int64 +// - segment *LocalSegment +func (_e *MockLoader_Expecter) LoadDelta(ctx interface{}, collectionID interface{}, segment interface{}) *MockLoader_LoadDelta_Call { + return &MockLoader_LoadDelta_Call{Call: _e.mock.On("LoadDelta", ctx, collectionID, segment)} +} + +func (_c *MockLoader_LoadDelta_Call) Run(run func(ctx context.Context, collectionID int64, segment *LocalSegment)) *MockLoader_LoadDelta_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64), args[2].(*LocalSegment)) + }) + return _c +} + +func (_c *MockLoader_LoadDelta_Call) Return(_a0 error) *MockLoader_LoadDelta_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockLoader_LoadDelta_Call) RunAndReturn(run func(context.Context, int64, *LocalSegment) error) *MockLoader_LoadDelta_Call { + _c.Call.Return(run) + return _c +} + // LoadDeltaLogs provides a mock function with given fields: ctx, segment, deltaLogs func (_m *MockLoader) LoadDeltaLogs(ctx context.Context, segment Segment, deltaLogs []*datapb.FieldBinlog) error { ret := _m.Called(ctx, segment, deltaLogs) diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 87ae1d04df..32b54735d2 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -28,9 +28,11 @@ import "C" import ( "context" "fmt" + "io" "sync" "unsafe" + "github.com/apache/arrow/go/v12/arrow/array" "github.com/cockroachdb/errors" "github.com/golang/protobuf/proto" "go.opentelemetry.io/otel" @@ -41,11 +43,15 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + milvus_storage "github.com/milvus-io/milvus-storage/go/storage" + "github.com/milvus-io/milvus-storage/go/storage/options" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/segcorepb" "github.com/milvus-io/milvus/internal/querynodev2/pkoracle" "github.com/milvus-io/milvus/internal/storage" + typeutil_internal "github.com/milvus-io/milvus/internal/util/typeutil" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/merr" @@ -158,6 +164,7 @@ type LocalSegment struct { lastDeltaTimestamp *atomic.Uint64 fieldIndexes *typeutil.ConcurrentMap[int64, *IndexedFieldInfo] + space *milvus_storage.Space } func NewSegment(ctx context.Context, @@ -226,6 +233,71 @@ func NewSegment(ctx context.Context, return segment, nil } +func NewSegmentV2( + ctx context.Context, + collection *Collection, + segmentID int64, + partitionID int64, + collectionID int64, + shard string, + segmentType SegmentType, + version int64, + startPosition *msgpb.MsgPosition, + deltaPosition *msgpb.MsgPosition, + storageVersion int64, + level datapb.SegmentLevel, +) (Segment, error) { + /* + CSegmentInterface + NewSegment(CCollection collection, uint64_t segment_id, SegmentType seg_type); + */ + if level == datapb.SegmentLevel_L0 { + return NewL0Segment(collection, segmentID, partitionID, collectionID, shard, segmentType, version, startPosition, deltaPosition) + } + var segmentPtr C.CSegmentInterface + var status C.CStatus + switch segmentType { + case SegmentTypeSealed: + status = C.NewSegment(collection.collectionPtr, C.Sealed, C.int64_t(segmentID), &segmentPtr) + case SegmentTypeGrowing: + status = C.NewSegment(collection.collectionPtr, C.Growing, C.int64_t(segmentID), &segmentPtr) + default: + return nil, fmt.Errorf("illegal segment type %d when create segment %d", segmentType, segmentID) + } + + if err := HandleCStatus(ctx, &status, "NewSegmentFailed"); err != nil { + return nil, err + } + + log.Info("create segment", + zap.Int64("collectionID", collectionID), + zap.Int64("partitionID", partitionID), + zap.Int64("segmentID", segmentID), + zap.String("segmentType", segmentType.String())) + + url, err := typeutil_internal.GetStorageURI(paramtable.Get().CommonCfg.StorageScheme.GetValue(), paramtable.Get().CommonCfg.StoragePathPrefix.GetValue(), segmentID) + if err != nil { + return nil, err + } + space, err := milvus_storage.Open(url, options.NewSpaceOptionBuilder().SetVersion(storageVersion).Build()) + if err != nil { + return nil, err + } + + segment := &LocalSegment{ + baseSegment: newBaseSegment(segmentID, partitionID, collectionID, shard, segmentType, level, version, startPosition), + ptr: segmentPtr, + lastDeltaTimestamp: atomic.NewUint64(deltaPosition.GetTimestamp()), + fieldIndexes: typeutil.NewConcurrentMap[int64, *IndexedFieldInfo](), + space: space, + memSize: atomic.NewInt64(-1), + rowNum: atomic.NewInt64(-1), + insertCount: atomic.NewInt64(0), + } + + return segment, nil +} + func (s *LocalSegment) isValid() bool { return s.ptr != nil } @@ -671,7 +743,18 @@ func (s *LocalSegment) LoadMultiFieldData(ctx context.Context, rowCount int64, f var status C.CStatus GetLoadPool().Submit(func() (any, error) { - status = C.LoadFieldData(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo) + if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() { + uri, err := typeutil_internal.GetStorageURI(paramtable.Get().CommonCfg.StorageScheme.GetValue(), paramtable.Get().CommonCfg.StoragePathPrefix.GetValue(), s.segmentID) + if err != nil { + return nil, err + } + + loadFieldDataInfo.appendURI(uri) + loadFieldDataInfo.appendStorageVersion(s.space.GetCurrentVersion()) + status = C.LoadFieldDataV2(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo) + } else { + status = C.LoadFieldData(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo) + } return nil, nil }).Await() if err := HandleCStatus(ctx, &status, "LoadMultiFieldData failed", @@ -719,19 +802,33 @@ func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCoun return err } - for _, binlog := range field.Binlogs { - err = loadFieldDataInfo.appendLoadFieldDataPath(ctx, fieldID, binlog) - if err != nil { - return err + if field != nil { + for _, binlog := range field.Binlogs { + err = loadFieldDataInfo.appendLoadFieldDataPath(ctx, fieldID, binlog) + if err != nil { + return err + } } } + loadFieldDataInfo.appendMMapDirPath(paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue()) loadFieldDataInfo.enableMmap(fieldID, mmapEnabled) var status C.CStatus GetLoadPool().Submit(func() (any, error) { log.Info("submitted loadFieldData task to dy pool") - status = C.LoadFieldData(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo) + if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() { + uri, err := typeutil_internal.GetStorageURI(paramtable.Get().CommonCfg.StorageScheme.GetValue(), paramtable.Get().CommonCfg.StoragePathPrefix.GetValue(), s.segmentID) + if err != nil { + return nil, err + } + + loadFieldDataInfo.appendURI(uri) + loadFieldDataInfo.appendStorageVersion(s.space.GetCurrentVersion()) + status = C.LoadFieldDataV2(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo) + } else { + status = C.LoadFieldData(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo) + } return nil, nil }).Await() if err := HandleCStatus(ctx, &status, "LoadFieldData failed", @@ -748,6 +845,95 @@ func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCoun return nil } +func (s *LocalSegment) LoadDeltaData2(ctx context.Context, schema *schemapb.CollectionSchema) error { + deleteReader, err := s.space.ScanDelete() + if err != nil { + return err + } + if !deleteReader.Schema().HasField(common.TimeStampFieldName) { + return fmt.Errorf("can not read timestamp field in space") + } + pkFieldSchema, err := typeutil.GetPrimaryFieldSchema(schema) + if err != nil { + return err + } + ids := &schemapb.IDs{} + var pkint64s []int64 + var pkstrings []string + var tss []int64 + for deleteReader.Next() { + rec := deleteReader.Record() + indices := rec.Schema().FieldIndices(common.TimeStampFieldName) + tss = append(tss, rec.Column(indices[0]).(*array.Int64).Int64Values()...) + indices = rec.Schema().FieldIndices(pkFieldSchema.Name) + switch pkFieldSchema.DataType { + case schemapb.DataType_Int64: + pkint64s = append(pkint64s, rec.Column(indices[0]).(*array.Int64).Int64Values()...) + case schemapb.DataType_VarChar: + columnData := rec.Column(indices[0]).(*array.String) + for i := 0; i < columnData.Len(); i++ { + pkstrings = append(pkstrings, columnData.Value(i)) + } + default: + return fmt.Errorf("unknown data type %v", pkFieldSchema.DataType) + } + } + if err := deleteReader.Err(); err != nil && err != io.EOF { + return err + } + + switch pkFieldSchema.DataType { + case schemapb.DataType_Int64: + ids.IdField = &schemapb.IDs_IntId{ + IntId: &schemapb.LongArray{ + Data: pkint64s, + }, + } + case schemapb.DataType_VarChar: + ids.IdField = &schemapb.IDs_StrId{ + StrId: &schemapb.StringArray{ + Data: pkstrings, + }, + } + default: + return fmt.Errorf("unknown data type %v", pkFieldSchema.DataType) + } + + idsBlob, err := proto.Marshal(ids) + if err != nil { + return err + } + + if len(tss) == 0 { + return nil + } + + loadInfo := C.CLoadDeletedRecordInfo{ + timestamps: unsafe.Pointer(&tss[0]), + primary_keys: (*C.uint8_t)(unsafe.Pointer(&idsBlob[0])), + primary_keys_size: C.uint64_t(len(idsBlob)), + row_count: C.int64_t(len(tss)), + } + /* + CStatus + LoadDeletedRecord(CSegmentInterface c_segment, CLoadDeletedRecordInfo deleted_record_info) + */ + var status C.CStatus + GetDynamicPool().Submit(func() (any, error) { + status = C.LoadDeletedRecord(s.ptr, loadInfo) + return nil, nil + }).Await() + + if err := HandleCStatus(ctx, &status, "LoadDeletedRecord failed"); err != nil { + return err + } + + log.Info("load deleted record done", + zap.Int("rowNum", len(tss)), + zap.String("segmentType", s.Type().String())) + return nil +} + func (s *LocalSegment) AddFieldDataInfo(ctx context.Context, rowCount int64, fields []*datapb.FieldBinlog) error { s.ptrLock.RLock() defer s.ptrLock.RUnlock() @@ -906,6 +1092,14 @@ func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIn } defer deleteLoadIndexInfo(loadIndexInfo) + if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() { + uri, err := typeutil_internal.GetStorageURI(paramtable.Get().CommonCfg.StorageScheme.GetValue(), paramtable.Get().CommonCfg.StoragePathPrefix.GetValue(), s.segmentID) + if err != nil { + return err + } + + loadIndexInfo.appendStorageInfo(uri, indexInfo.IndexStoreVersion) + } err = loadIndexInfo.appendLoadIndexInfo(ctx, indexInfo, s.collectionID, s.partitionID, s.segmentID, fieldType) if err != nil { if loadIndexInfo.cleanLocalData(ctx) != nil { diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index 26fefccf63..3bb8421d41 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -19,6 +19,7 @@ package segments import ( "context" "fmt" + "io" "path" "runtime/debug" "strconv" @@ -34,10 +35,13 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + milvus_storage "github.com/milvus-io/milvus-storage/go/storage" + "github.com/milvus-io/milvus-storage/go/storage/options" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querynodev2/pkoracle" "github.com/milvus-io/milvus/internal/storage" + typeutil_internal "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" @@ -84,6 +88,385 @@ func (r *LoadResource) Sub(resource LoadResource) { r.DiskSize -= resource.DiskSize } +type segmentLoaderV2 struct { + *segmentLoader +} + +func NewLoaderV2( + manager *Manager, + cm storage.ChunkManager, +) *segmentLoaderV2 { + return &segmentLoaderV2{ + segmentLoader: NewLoader(manager, cm), + } +} + +func (loader *segmentLoaderV2) LoadDelta(ctx context.Context, collectionID int64, segment *LocalSegment) error { + collection := loader.manager.Collection.Get(collectionID) + if collection == nil { + err := merr.WrapErrCollectionNotFound(collectionID) + log.Warn("failed to get collection while loading delta", zap.Error(err)) + return err + } + return segment.LoadDeltaData2(ctx, collection.Schema()) +} + +func (loader *segmentLoaderV2) Load(ctx context.Context, + collectionID int64, + segmentType SegmentType, + version int64, + segments ...*querypb.SegmentLoadInfo, +) ([]Segment, error) { + log := log.Ctx(ctx).With( + zap.Int64("collectionID", collectionID), + zap.String("segmentType", segmentType.String()), + ) + + if len(segments) == 0 { + log.Info("no segment to load") + return nil, nil + } + // Filter out loaded & loading segments + infos := loader.prepare(segmentType, version, segments...) + defer loader.unregister(infos...) + + log.With( + zap.Int64s("requestSegments", lo.Map(segments, func(s *querypb.SegmentLoadInfo, _ int) int64 { return s.GetSegmentID() })), + zap.Int64s("preparedSegments", lo.Map(infos, func(s *querypb.SegmentLoadInfo, _ int) int64 { return s.GetSegmentID() })), + ) + + // continue to wait other task done + log.Info("start loading...", zap.Int("segmentNum", len(segments)), zap.Int("afterFilter", len(infos))) + + // Check memory & storage limit + resource, concurrencyLevel, err := loader.requestResource(ctx, infos...) + if err != nil { + log.Warn("request resource failed", zap.Error(err)) + return nil, err + } + defer loader.freeRequest(resource) + + newSegments := typeutil.NewConcurrentMap[int64, Segment]() + loaded := typeutil.NewConcurrentMap[int64, Segment]() + defer func() { + newSegments.Range(func(_ int64, s Segment) bool { + s.Release() + return true + }) + debug.FreeOSMemory() + }() + + for _, info := range infos { + segmentID := info.GetSegmentID() + partitionID := info.GetPartitionID() + collectionID := info.GetCollectionID() + shard := info.GetInsertChannel() + + collection := loader.manager.Collection.Get(collectionID) + if collection == nil { + err := merr.WrapErrCollectionNotFound(collectionID) + log.Warn("failed to get collection", zap.Error(err)) + return nil, err + } + + segment, err := NewSegmentV2(ctx, collection, segmentID, partitionID, collectionID, shard, segmentType, version, info.GetStartPosition(), info.GetDeltaPosition(), info.GetStorageVersion(), info.GetLevel()) + if err != nil { + log.Warn("load segment failed when create new segment", + zap.Int64("partitionID", partitionID), + zap.Int64("segmentID", segmentID), + zap.Error(err), + ) + return nil, err + } + + newSegments.Insert(segmentID, segment) + } + + loadSegmentFunc := func(idx int) error { + loadInfo := infos[idx] + partitionID := loadInfo.PartitionID + segmentID := loadInfo.SegmentID + segment, _ := newSegments.Get(segmentID) + + tr := timerecord.NewTimeRecorder("loadDurationPerSegment") + + var err error + if loadInfo.GetLevel() == datapb.SegmentLevel_L0 { + err = loader.LoadDelta(ctx, collectionID, segment.(*LocalSegment)) + } else { + err = loader.loadSegment(ctx, segment.(*LocalSegment), loadInfo) + } + if err != nil { + log.Warn("load segment failed when load data into memory", + zap.Int64("partitionID", partitionID), + zap.Int64("segmentID", segmentID), + zap.Error(err), + ) + return err + } + loader.manager.Segment.Put(segmentType, segment) + newSegments.GetAndRemove(segmentID) + loaded.Insert(segmentID, segment) + log.Info("load segment done", zap.Int64("segmentID", segmentID)) + loader.notifyLoadFinish(loadInfo) + + metrics.QueryNodeLoadSegmentLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(tr.ElapseSpan().Seconds()) + return nil + } + + // Start to load, + // Make sure we can always benefit from concurrency, and not spawn too many idle goroutines + log.Info("start to load segments in parallel", + zap.Int("segmentNum", len(infos)), + zap.Int("concurrencyLevel", concurrencyLevel)) + err = funcutil.ProcessFuncParallel(len(infos), + concurrencyLevel, loadSegmentFunc, "loadSegmentFunc") + if err != nil { + log.Warn("failed to load some segments", zap.Error(err)) + return nil, err + } + + // Wait for all segments loaded + if err := loader.waitSegmentLoadDone(ctx, segmentType, lo.Map(segments, func(info *querypb.SegmentLoadInfo, _ int) int64 { return info.GetSegmentID() })...); err != nil { + log.Warn("failed to wait the filtered out segments load done", zap.Error(err)) + return nil, err + } + + log.Info("all segment load done") + var result []Segment + loaded.Range(func(_ int64, s Segment) bool { + result = append(result, s) + return true + }) + return result, nil +} + +func (loader *segmentLoaderV2) LoadBloomFilterSet(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) ([]*pkoracle.BloomFilterSet, error) { + log := log.Ctx(ctx).With( + zap.Int64("collectionID", collectionID), + zap.Int64s("segmentIDs", lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) int64 { + return info.GetSegmentID() + })), + ) + + segmentNum := len(infos) + if segmentNum == 0 { + log.Info("no segment to load") + return nil, nil + } + + collection := loader.manager.Collection.Get(collectionID) + if collection == nil { + err := merr.WrapErrCollectionNotFound(collectionID) + log.Warn("failed to get collection while loading segment", zap.Error(err)) + return nil, err + } + + log.Info("start loading remote...", zap.Int("segmentNum", segmentNum)) + + loadedBfs := typeutil.NewConcurrentSet[*pkoracle.BloomFilterSet]() + // TODO check memory for bf size + loadRemoteFunc := func(idx int) error { + loadInfo := infos[idx] + partitionID := loadInfo.PartitionID + segmentID := loadInfo.SegmentID + bfs := pkoracle.NewBloomFilterSet(segmentID, partitionID, commonpb.SegmentState_Sealed) + + log.Info("loading bloom filter for remote...") + err := loader.loadBloomFilter(ctx, segmentID, bfs, loadInfo.StorageVersion) + if err != nil { + log.Warn("load remote segment bloom filter failed", + zap.Int64("partitionID", partitionID), + zap.Int64("segmentID", segmentID), + zap.Error(err), + ) + return err + } + loadedBfs.Insert(bfs) + + return nil + } + + err := funcutil.ProcessFuncParallel(segmentNum, segmentNum, loadRemoteFunc, "loadRemoteFunc") + if err != nil { + // no partial success here + log.Warn("failed to load remote segment", zap.Error(err)) + return nil, err + } + + return loadedBfs.Collect(), nil +} + +func (loader *segmentLoaderV2) loadBloomFilter(ctx context.Context, segmentID int64, bfs *pkoracle.BloomFilterSet, + storeVersion int64, +) error { + log := log.Ctx(ctx).With( + zap.Int64("segmentID", segmentID), + ) + + startTs := time.Now() + + url, err := typeutil_internal.GetStorageURI(paramtable.Get().CommonCfg.StorageScheme.GetValue(), paramtable.Get().CommonCfg.StoragePathPrefix.GetValue(), segmentID) + if err != nil { + return err + } + space, err := milvus_storage.Open(url, options.NewSpaceOptionBuilder().SetVersion(storeVersion).Build()) + if err != nil { + return err + } + + statsBlobs := space.StatisticsBlobs() + blobs := []*storage.Blob{} + + for _, statsBlob := range statsBlobs { + blob := make([]byte, statsBlob.Size) + _, err := space.ReadBlob(statsBlob.Name, blob) + if err != nil && err != io.EOF { + return err + } + + blobs = append(blobs, &storage.Blob{Value: blob}) + } + + var stats []*storage.PrimaryKeyStats + + stats, err = storage.DeserializeStats(blobs) + if err != nil { + log.Warn("failed to deserialize stats", zap.Error(err)) + return err + } + + var size uint + for _, stat := range stats { + pkStat := &storage.PkStatistics{ + PkFilter: stat.BF, + MinPK: stat.MinPk, + MaxPK: stat.MaxPk, + } + size += stat.BF.Cap() + bfs.AddHistoricalStats(pkStat) + } + log.Info("Successfully load pk stats", zap.Duration("time", time.Since(startTs)), zap.Uint("size", size)) + return nil +} + +func (loader *segmentLoaderV2) loadSegment(ctx context.Context, + segment *LocalSegment, + loadInfo *querypb.SegmentLoadInfo, +) error { + log := log.Ctx(ctx).With( + zap.Int64("collectionID", segment.Collection()), + zap.Int64("partitionID", segment.Partition()), + zap.String("shard", segment.Shard()), + zap.Int64("segmentID", segment.ID()), + ) + log.Info("start loading segment files", + zap.Int64("rowNum", loadInfo.GetNumOfRows()), + zap.String("segmentType", segment.Type().String())) + + collection := loader.manager.Collection.Get(segment.Collection()) + if collection == nil { + err := merr.WrapErrCollectionNotFound(segment.Collection()) + log.Warn("failed to get collection while loading segment", zap.Error(err)) + return err + } + // pkField := GetPkField(collection.Schema()) + + // TODO(xige-16): Optimize the data loading process and reduce data copying + // for now, there will be multiple copies in the process of data loading into segCore + defer debug.FreeOSMemory() + + if segment.Type() == SegmentTypeSealed { + fieldsMap := typeutil.NewConcurrentMap[int64, *schemapb.FieldSchema]() + for _, field := range collection.Schema().Fields { + fieldsMap.Insert(field.FieldID, field) + } + // fieldID2IndexInfo := make(map[int64]*querypb.FieldIndexInfo) + indexedFieldInfos := make(map[int64]*IndexedFieldInfo) + for _, indexInfo := range loadInfo.IndexInfos { + if indexInfo.GetIndexStoreVersion() > 0 { + fieldID := indexInfo.FieldID + fieldInfo := &IndexedFieldInfo{ + IndexInfo: indexInfo, + } + indexedFieldInfos[fieldID] = fieldInfo + fieldsMap.Remove(fieldID) + // fieldID2IndexInfo[fieldID] = indexInfo + } + } + + log.Info("load fields...", + zap.Int64s("indexedFields", lo.Keys(indexedFieldInfos)), + ) + + schemaHelper, err := typeutil.CreateSchemaHelper(collection.Schema()) + if err != nil { + return err + } + if err := loader.loadFieldsIndex(ctx, schemaHelper, segment, loadInfo.GetNumOfRows(), indexedFieldInfos); err != nil { + return err + } + + // REMOVEME + keys := make([]int64, 0) + fieldsMap.Range(func(key int64, value *schemapb.FieldSchema) bool { + keys = append(keys, key) + return true + }) + + if err := loader.loadSealedSegmentFields(ctx, segment, fieldsMap, loadInfo.GetNumOfRows()); err != nil { + return err + } + if err := segment.AddFieldDataInfo(ctx, loadInfo.GetNumOfRows(), loadInfo.GetBinlogPaths()); err != nil { + return err + } + // https://github.com/milvus-io/milvus/23654 + // legacy entry num = 0 + if err := loader.patchEntryNumber(ctx, segment, loadInfo); err != nil { + return err + } + } else { + if err := segment.LoadMultiFieldData(ctx, loadInfo.GetNumOfRows(), loadInfo.BinlogPaths); err != nil { + return err + } + } + + // load statslog if it's growing segment + if segment.typ == SegmentTypeGrowing { + log.Info("loading statslog...") + // pkStatsBinlogs, logType := loader.filterPKStatsBinlogs(loadInfo.Statslogs, pkField.GetFieldID()) + err := loader.loadBloomFilter(ctx, segment.segmentID, segment.bloomFilterSet, loadInfo.StorageVersion) + if err != nil { + return err + } + } + + log.Info("loading delta...") + return loader.LoadDelta(ctx, segment.Collection(), segment) +} + +func (loader *segmentLoaderV2) loadSealedSegmentFields(ctx context.Context, segment *LocalSegment, fields *typeutil.ConcurrentMap[int64, *schemapb.FieldSchema], rowCount int64) error { + runningGroup, _ := errgroup.WithContext(ctx) + fields.Range(func(fieldID int64, fieldSchema *schemapb.FieldSchema) bool { + runningGroup.Go(func() error { + return segment.LoadFieldData(ctx, fieldID, rowCount, nil, common.IsMmapEnabled(fieldSchema.GetTypeParams()...)) + }) + return true + }) + + err := runningGroup.Wait() + if err != nil { + return err + } + + log.Ctx(ctx).Info("load field binlogs done for sealed segment", + zap.Int64("collection", segment.collectionID), + zap.Int64("segment", segment.segmentID), + zap.String("segmentType", segment.Type().String())) + + return nil +} + func NewLoader( manager *Manager, cm storage.ChunkManager, diff --git a/internal/querynodev2/segments/segment_loader_test.go b/internal/querynodev2/segments/segment_loader_test.go index 1d0549da01..eecbf6f56d 100644 --- a/internal/querynodev2/segments/segment_loader_test.go +++ b/internal/querynodev2/segments/segment_loader_test.go @@ -22,14 +22,21 @@ import ( "testing" "time" + "github.com/apache/arrow/go/v12/arrow" + "github.com/apache/arrow/go/v12/arrow/array" + "github.com/apache/arrow/go/v12/arrow/memory" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + milvus_storage "github.com/milvus-io/milvus-storage/go/storage" + "github.com/milvus-io/milvus-storage/go/storage/options" + "github.com/milvus-io/milvus-storage/go/storage/schema" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/initcore" + "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/merr" @@ -564,6 +571,7 @@ func (suite *SegmentLoaderSuite) TestPatchEntryNum() { func (suite *SegmentLoaderSuite) TestRunOutMemory() { ctx := context.Background() paramtable.Get().Save(paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.Key, "0") + defer paramtable.Get().Reset(paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.Key) msgLength := 4 @@ -770,3 +778,149 @@ func TestSegmentLoader(t *testing.T) { suite.Run(t, &SegmentLoaderSuite{}) suite.Run(t, &SegmentLoaderDetailSuite{}) } + +type SegmentLoaderV2Suite struct { + suite.Suite + loader *segmentLoaderV2 + + // Dependencies + manager *Manager + rootPath string + chunkManager storage.ChunkManager + + // Data + collectionID int64 + partitionID int64 + segmentID int64 + schema *schemapb.CollectionSchema + segmentNum int +} + +func (suite *SegmentLoaderV2Suite) SetupSuite() { + paramtable.Init() + suite.rootPath = suite.T().Name() + suite.collectionID = rand.Int63() + suite.partitionID = rand.Int63() + suite.segmentID = rand.Int63() + suite.segmentNum = 5 +} + +func (suite *SegmentLoaderV2Suite) SetupTest() { + paramtable.Get().CommonCfg.EnableStorageV2.SwapTempValue("true") + // Dependencies + suite.manager = NewManager() + ctx := context.Background() + // TODO:: cpp chunk manager not support local chunk manager + // suite.chunkManager = storage.NewLocalChunkManager(storage.RootPath( + // fmt.Sprintf("/tmp/milvus-ut/%d", rand.Int63()))) + chunkManagerFactory := NewTestChunkManagerFactory(paramtable.Get(), suite.rootPath) + suite.chunkManager, _ = chunkManagerFactory.NewPersistentStorageChunkManager(ctx) + suite.loader = NewLoaderV2(suite.manager, suite.chunkManager) + initcore.InitRemoteChunkManager(paramtable.Get()) + + // Data + suite.schema = GenTestCollectionSchema("test", schemapb.DataType_Int64) + indexMeta := GenTestIndexMeta(suite.collectionID, suite.schema) + loadMeta := &querypb.LoadMetaInfo{ + LoadType: querypb.LoadType_LoadCollection, + CollectionID: suite.collectionID, + PartitionIDs: []int64{suite.partitionID}, + } + suite.manager.Collection.PutOrRef(suite.collectionID, suite.schema, indexMeta, loadMeta) +} + +func (suite *SegmentLoaderV2Suite) TearDownTest() { + ctx := context.Background() + for i := 0; i < suite.segmentNum; i++ { + suite.manager.Segment.Remove(suite.segmentID+int64(i), querypb.DataScope_All) + } + suite.chunkManager.RemoveWithPrefix(ctx, suite.rootPath) + paramtable.Get().CommonCfg.EnableStorageV2.SwapTempValue("false") +} + +func (suite *SegmentLoaderV2Suite) TestLoad() { + tmpDir := suite.T().TempDir() + paramtable.Get().CommonCfg.StorageScheme.SwapTempValue("file") + paramtable.Get().CommonCfg.StoragePathPrefix.SwapTempValue(tmpDir) + ctx := context.Background() + + msgLength := 4 + + arrowSchema, err := typeutil.ConvertToArrowSchema(suite.schema.Fields) + suite.NoError(err) + opt := options.NewSpaceOptionBuilder(). + SetSchema(schema.NewSchema( + arrowSchema, + &schema.SchemaOptions{ + PrimaryColumn: "int64Field", + VectorColumn: "floatVectorField", + VersionColumn: "Timestamp", + })). + Build() + uri, err := typeutil.GetStorageURI("file", tmpDir, suite.segmentID) + suite.NoError(err) + space, err := milvus_storage.Open(uri, opt) + suite.NoError(err) + + b := array.NewRecordBuilder(memory.DefaultAllocator, arrowSchema) + defer b.Release() + insertData, err := genInsertData(msgLength, suite.schema) + suite.NoError(err) + + err = typeutil.BuildRecord(b, insertData, suite.schema.Fields) + suite.NoError(err) + rec := b.NewRecord() + defer rec.Release() + reader, err := array.NewRecordReader(arrowSchema, []arrow.Record{rec}) + suite.NoError(err) + err = space.Write(reader, &options.DefaultWriteOptions) + suite.NoError(err) + + collMeta := genCollectionMeta(suite.collectionID, suite.partitionID, suite.schema) + inCodec := storage.NewInsertCodecWithSchema(collMeta) + statsLog, err := inCodec.SerializePkStatsByData(insertData) + suite.NoError(err) + + err = space.WriteBlob(statsLog.Value, statsLog.Key, false) + suite.NoError(err) + + dschema := space.Manifest().GetSchema().DeleteSchema() + dbuilder := array.NewRecordBuilder(memory.DefaultAllocator, dschema) + defer dbuilder.Release() + dbuilder.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2}, nil) + dbuilder.Field(1).(*array.Int64Builder).AppendValues([]int64{100, 200}, nil) + + drec := dbuilder.NewRecord() + defer drec.Release() + + dreader, err := array.NewRecordReader(dschema, []arrow.Record{drec}) + suite.NoError(err) + + err = space.Delete(dreader) + suite.NoError(err) + + segments, err := suite.loader.Load(ctx, suite.collectionID, SegmentTypeSealed, 0, &querypb.SegmentLoadInfo{ + SegmentID: suite.segmentID, + PartitionID: suite.partitionID, + CollectionID: suite.collectionID, + NumOfRows: int64(msgLength), + StorageVersion: 3, + }) + suite.NoError(err) + + _, err = suite.loader.LoadBloomFilterSet(ctx, suite.collectionID, 0, &querypb.SegmentLoadInfo{ + SegmentID: suite.segmentID, + PartitionID: suite.partitionID, + CollectionID: suite.collectionID, + NumOfRows: int64(msgLength), + StorageVersion: 3, + }) + suite.NoError(err) + + segment := segments[0] + suite.Equal(int64(msgLength-2), segment.RowNum()) +} + +func TestSegmentLoaderV2(t *testing.T) { + suite.Run(t, &SegmentLoaderV2Suite{}) +} diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index 4f5c7decfd..af1d76630b 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -345,7 +345,11 @@ func (node *QueryNode) Init() error { node.subscribingChannels = typeutil.NewConcurrentSet[string]() node.unsubscribingChannels = typeutil.NewConcurrentSet[string]() node.manager = segments.NewManager() - node.loader = segments.NewLoader(node.manager, node.chunkManager) + if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() { + node.loader = segments.NewLoaderV2(node.manager, node.chunkManager) + } else { + node.loader = segments.NewLoader(node.manager, node.chunkManager) + } node.dispClient = msgdispatcher.NewClient(node.factory, typeutil.QueryNodeRole, paramtable.GetNodeID()) // init pipeline manager node.pipelineManager = pipeline.NewManager(node.manager, node.tSafeManager, node.dispClient, node.delegators) diff --git a/internal/util/typeutil/schema.go b/internal/util/typeutil/schema.go index 658f8103a6..77d4159b83 100644 --- a/internal/util/typeutil/schema.go +++ b/internal/util/typeutil/schema.go @@ -53,13 +53,9 @@ func ConvertToArrowSchema(fields []*schemapb.FieldSchema) (*arrow.Schema, error) Type: arrow.BinaryTypes.String, }) case schemapb.DataType_Array: - elemType, err := convertToArrowType(field.ElementType) - if err != nil { - return nil, err - } arrowFields = append(arrowFields, arrow.Field{ Name: field.Name, - Type: arrow.ListOf(elemType), + Type: arrow.BinaryTypes.Binary, }) case schemapb.DataType_JSON: arrowFields = append(arrowFields, arrow.Field{ diff --git a/internal/util/typeutil/storage.go b/internal/util/typeutil/storage.go new file mode 100644 index 0000000000..443b1f93c3 --- /dev/null +++ b/internal/util/typeutil/storage.go @@ -0,0 +1,122 @@ +package typeutil + +import ( + "fmt" + "math" + "path" + + "github.com/apache/arrow/go/v12/arrow/array" + "github.com/golang/protobuf/proto" + + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" +) + +func GetStorageURI(protocol, pathPrefix string, segmentID int64) (string, error) { + switch protocol { + case "s3": + var scheme string + if paramtable.Get().MinioCfg.UseSSL.GetAsBool() { + scheme = "https" + } else { + scheme = "http" + } + if pathPrefix != "" { + cleanPath := path.Clean(pathPrefix) + return fmt.Sprintf("s3://%s:%s@%s/%s/%d?scheme=%s&endpoint_override=%s&allow_bucket_creation=true", paramtable.Get().MinioCfg.AccessKeyID.GetValue(), paramtable.Get().MinioCfg.SecretAccessKey.GetValue(), paramtable.Get().MinioCfg.BucketName.GetValue(), cleanPath, segmentID, scheme, paramtable.Get().MinioCfg.Address.GetValue()), nil + } else { + return fmt.Sprintf("s3://%s:%s@%s/%d?scheme=%s&endpoint_override=%s&allow_bucket_creation=true", paramtable.Get().MinioCfg.AccessKeyID.GetValue(), paramtable.Get().MinioCfg.SecretAccessKey.GetValue(), paramtable.Get().MinioCfg.BucketName.GetValue(), segmentID, scheme, paramtable.Get().MinioCfg.Address.GetValue()), nil + } + case "file": + if pathPrefix != "" { + cleanPath := path.Clean(pathPrefix) + return fmt.Sprintf("file://%s/%d", cleanPath, segmentID), nil + } else { + return fmt.Sprintf("file://%d", segmentID), nil + } + default: + return "", merr.WrapErrParameterInvalidMsg("unsupported schema %s", protocol) + } +} + +func BuildRecord(b *array.RecordBuilder, data *storage.InsertData, fields []*schemapb.FieldSchema) error { + if data == nil { + log.Info("no buffer data to flush") + return nil + } + for i, field := range fields { + fBuilder := b.Field(i) + switch field.DataType { + case schemapb.DataType_Bool: + fBuilder.(*array.BooleanBuilder).AppendValues(data.Data[field.FieldID].(*storage.BoolFieldData).Data, nil) + case schemapb.DataType_Int8: + fBuilder.(*array.Int8Builder).AppendValues(data.Data[field.FieldID].(*storage.Int8FieldData).Data, nil) + case schemapb.DataType_Int16: + fBuilder.(*array.Int16Builder).AppendValues(data.Data[field.FieldID].(*storage.Int16FieldData).Data, nil) + case schemapb.DataType_Int32: + fBuilder.(*array.Int32Builder).AppendValues(data.Data[field.FieldID].(*storage.Int32FieldData).Data, nil) + case schemapb.DataType_Int64: + fBuilder.(*array.Int64Builder).AppendValues(data.Data[field.FieldID].(*storage.Int64FieldData).Data, nil) + case schemapb.DataType_Float: + fBuilder.(*array.Float32Builder).AppendValues(data.Data[field.FieldID].(*storage.FloatFieldData).Data, nil) + case schemapb.DataType_Double: + fBuilder.(*array.Float64Builder).AppendValues(data.Data[field.FieldID].(*storage.DoubleFieldData).Data, nil) + case schemapb.DataType_VarChar, schemapb.DataType_String: + fBuilder.(*array.StringBuilder).AppendValues(data.Data[field.FieldID].(*storage.StringFieldData).Data, nil) + case schemapb.DataType_Array: + for _, data := range data.Data[field.FieldID].(*storage.ArrayFieldData).Data { + marsheled, err := proto.Marshal(data) + if err != nil { + return err + } + fBuilder.(*array.BinaryBuilder).Append(marsheled) + } + case schemapb.DataType_JSON: + fBuilder.(*array.BinaryBuilder).AppendValues(data.Data[field.FieldID].(*storage.JSONFieldData).Data, nil) + case schemapb.DataType_BinaryVector: + vecData := data.Data[field.FieldID].(*storage.BinaryVectorFieldData) + for i := 0; i < len(vecData.Data); i += vecData.Dim / 8 { + fBuilder.(*array.FixedSizeBinaryBuilder).Append(vecData.Data[i : i+vecData.Dim/8]) + } + case schemapb.DataType_FloatVector: + vecData := data.Data[field.FieldID].(*storage.FloatVectorFieldData) + builder := fBuilder.(*array.FixedSizeBinaryBuilder) + dim := vecData.Dim + data := vecData.Data + byteLength := dim * 4 + length := len(data) / dim + + builder.Reserve(length) + bytesData := make([]byte, byteLength) + for i := 0; i < length; i++ { + vec := data[i*dim : (i+1)*dim] + for j := range vec { + bytes := math.Float32bits(vec[j]) + common.Endian.PutUint32(bytesData[j*4:], bytes) + } + builder.Append(bytesData) + } + case schemapb.DataType_Float16Vector: + vecData := data.Data[field.FieldID].(*storage.Float16VectorFieldData) + builder := fBuilder.(*array.FixedSizeBinaryBuilder) + dim := vecData.Dim + data := vecData.Data + byteLength := dim * 2 + length := len(data) / byteLength + + builder.Reserve(length) + for i := 0; i < length; i++ { + builder.Append(data[i*byteLength : (i+1)*byteLength]) + } + + default: + return merr.WrapErrParameterInvalidMsg("unknown type %v", field.DataType.String()) + } + } + + return nil +} diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index a6d9e83a76..67262948bc 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -226,11 +226,11 @@ type commonConfig struct { LockSlowLogInfoThreshold ParamItem `refreshable:"true"` LockSlowLogWarnThreshold ParamItem `refreshable:"true"` - StorageScheme ParamItem `refreshable:"false"` - EnableStorageV2 ParamItem `refreshable:"false"` - TTMsgEnabled ParamItem `refreshable:"true"` - TraceLogMode ParamItem `refreshable:"true"` - + StorageScheme ParamItem `refreshable:"false"` + EnableStorageV2 ParamItem `refreshable:"false"` + StoragePathPrefix ParamItem `refreshable:"false"` + TTMsgEnabled ParamItem `refreshable:"true"` + TraceLogMode ParamItem `refreshable:"true"` BloomFilterSize ParamItem `refreshable:"true"` MaxBloomFalsePositive ParamItem `refreshable:"true"` } @@ -660,6 +660,13 @@ like the old password verification when updating the credential`, } p.StorageScheme.Init(base.mgr) + p.StoragePathPrefix = ParamItem{ + Key: "common.storage.pathPrefix", + Version: "2.3.4", + DefaultValue: "", + } + p.StoragePathPrefix.Init(base.mgr) + p.TTMsgEnabled = ParamItem{ Key: "common.ttMsgEnabled", Version: "2.3.2",