feat: integrate storagev2 into loading segment (#29336)

issue: #29335

---------

Signed-off-by: sunby <sunbingyi1992@gmail.com>
Bingyi Sun 2024-01-12 18:10:51 +08:00 committed by GitHub
parent f2e36db488
commit e1258b8cad
31 changed files with 1030 additions and 201 deletions

2
go.mod
View File

@ -58,7 +58,7 @@ require (
require github.com/apache/arrow/go/v12 v12.0.1
require github.com/milvus-io/milvus-storage/go v0.0.0-20231109072809-1cd7b0866092
require github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70
require (
github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000

14
go.sum
View File

@ -589,6 +589,20 @@ github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240109020841-d367b5a59df1
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240109020841-d367b5a59df1/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
github.com/milvus-io/milvus-storage/go v0.0.0-20231109072809-1cd7b0866092 h1:UYJ7JB+QlMOoFHNdd8mUa3/lV63t9dnBX7ILXmEEWPY=
github.com/milvus-io/milvus-storage/go v0.0.0-20231109072809-1cd7b0866092/go.mod h1:GPETMcTZq1gLY1WA6Na5kiNAKnq8SEMMiVKUZrM3sho=
github.com/milvus-io/milvus-storage/go v0.0.0-20231213080429-ed6b9bd5c9d2 h1:2epYWKCSY6Rq/aJ/6UyUS1d3+Yts0UK8HNiWGjVN4Pc=
github.com/milvus-io/milvus-storage/go v0.0.0-20231213080429-ed6b9bd5c9d2/go.mod h1:GPETMcTZq1gLY1WA6Na5kiNAKnq8SEMMiVKUZrM3sho=
github.com/milvus-io/milvus-storage/go v0.0.0-20231226033437-76e506e3ae48 h1:EXDWA9yjmLLjIlIFjTdwtA3p1G0FDJdT07QdgCAWFWU=
github.com/milvus-io/milvus-storage/go v0.0.0-20231226033437-76e506e3ae48/go.mod h1:GPETMcTZq1gLY1WA6Na5kiNAKnq8SEMMiVKUZrM3sho=
github.com/milvus-io/milvus-storage/go v0.0.0-20231226075239-137cb5c55a5f h1:l43tW6aahbKcatIsX2X1guQktWSv/wgCBcGhmMWJgTg=
github.com/milvus-io/milvus-storage/go v0.0.0-20231226075239-137cb5c55a5f/go.mod h1:GPETMcTZq1gLY1WA6Na5kiNAKnq8SEMMiVKUZrM3sho=
github.com/milvus-io/milvus-storage/go v0.0.0-20231226081638-4a9a35e739b6 h1:v8WP0xJoOFno/YKdTrVfjWNn/VBmRX4IirK3/dhtH+8=
github.com/milvus-io/milvus-storage/go v0.0.0-20231226081638-4a9a35e739b6/go.mod h1:GPETMcTZq1gLY1WA6Na5kiNAKnq8SEMMiVKUZrM3sho=
github.com/milvus-io/milvus-storage/go v0.0.0-20231226083239-422d03dd1e1c h1:Xnc1Jt4joXVu2OsZp3xNZYQ/rKptRfRzYIHNaZkCpF8=
github.com/milvus-io/milvus-storage/go v0.0.0-20231226083239-422d03dd1e1c/go.mod h1:GPETMcTZq1gLY1WA6Na5kiNAKnq8SEMMiVKUZrM3sho=
github.com/milvus-io/milvus-storage/go v0.0.0-20231226085237-57519406e94f h1:4qnOXYGDVXdbIWUp9tk+JYtQ58QKf5d8q+XVk9+UVXo=
github.com/milvus-io/milvus-storage/go v0.0.0-20231226085237-57519406e94f/go.mod h1:GPETMcTZq1gLY1WA6Na5kiNAKnq8SEMMiVKUZrM3sho=
github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70 h1:Z+sp64fmAOxAG7mU0dfVOXvAXlwRB0c8a96rIM5HevI=
github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70/go.mod h1:GPETMcTZq1gLY1WA6Na5kiNAKnq8SEMMiVKUZrM3sho=
github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=
github.com/milvus-io/pulsar-client-go v0.6.10/go.mod h1:lQqCkgwDF8YFYjKA+zOheTk1tev2B+bKj5j7+nm8M1w=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=

View File

@ -77,6 +77,7 @@ class IndexFactory {
IndexCreatorBasePtr
CreateIndex(DataType type,
const std::string& field_name,
const int64_t dim,
Config& config,
const storage::FileManagerContext& file_manager_context,
std::shared_ptr<milvus_storage::Space> space) {
@ -101,7 +102,7 @@ class IndexFactory {
case DataType::VECTOR_FLOAT16:
case DataType::VECTOR_BFLOAT16:
return std::make_unique<VecIndexCreator>(
type, field_name, config, file_manager_context, space);
type, field_name, dim, config, file_manager_context, space);
default:
throw std::invalid_argument(invalid_dtype_msg);
}

View File

@ -24,12 +24,13 @@ VecIndexCreator::VecIndexCreator(
DataType data_type,
Config& config,
const storage::FileManagerContext& file_manager_context)
: VecIndexCreator(data_type, "", config, file_manager_context, nullptr) {
: VecIndexCreator(data_type, "", 0, config, file_manager_context, nullptr) {
}
VecIndexCreator::VecIndexCreator(
DataType data_type,
const std::string& field_name,
const int64_t dim,
Config& config,
const storage::FileManagerContext& file_manager_context,
std::shared_ptr<milvus_storage::Space> space)
@ -41,6 +42,7 @@ VecIndexCreator::VecIndexCreator(
index_info.field_name = field_name;
index_info.index_engine_version =
index::GetIndexEngineVersionFromConfig(config_);
index_info.dim = dim;
index_ = index::IndexFactory::GetInstance().CreateIndex(
index_info, file_manager_context, space_);

View File

@ -35,6 +35,7 @@ class VecIndexCreator : public IndexCreatorBase {
VecIndexCreator(DataType data_type,
const std::string& field_name,
const int64_t dim,
Config& config,
const storage::FileManagerContext& file_manager_context,
std::shared_ptr<milvus_storage::Space> space);

View File

@ -152,6 +152,7 @@ CreateIndexV2(CIndex* res_index, CBuildIndexInfo c_build_index_info) {
auto field_type = build_index_info->field_type;
milvus::index::CreateIndexInfo index_info;
index_info.field_type = build_index_info->field_type;
index_info.dim = build_index_info->dim;
auto& config = build_index_info->config;
// get index type
@ -217,6 +218,7 @@ CreateIndexV2(CIndex* res_index, CBuildIndexInfo c_build_index_info) {
milvus::indexbuilder::IndexFactory::GetInstance().CreateIndex(
build_index_info->field_type,
build_index_info->field_name,
build_index_info->dim,
config,
fileManagerContext,
std::move(store_space.value()));

View File

@ -719,10 +719,8 @@ void
LoadFieldDatasFromRemote2(std::shared_ptr<milvus_storage::Space> space,
SchemaPtr schema,
FieldDataInfo& field_data_info) {
// log all schema ids
for (auto& field : schema->get_fields()) {
}
auto res = space->ScanData();
if (!res.ok()) {
PanicInfo(S3Error, "failed to create scan iterator");
}

View File

@ -25,6 +25,7 @@
// #include "common/Schema.h"
#include "common/Types.h"
#include "index/Index.h"
#include "log/Log.h"
#include "segcore/DeletedRecord.h"
#include "segcore/InsertRecord.h"
#include "storage/space.h"

View File

@ -93,7 +93,7 @@ DiskFileManagerImpl::AddFileUsingSpace(
for (int64_t i = 0; i < remote_files.size(); ++i) {
auto data = LoadIndexFromDisk(
local_file_name, local_file_offsets[i], remote_file_sizes[i]);
auto status = space_->WriteBolb(
auto status = space_->WriteBlob(
remote_files[i], data.get(), remote_file_sizes[i]);
if (!status.ok()) {
return false;

View File

@ -479,7 +479,7 @@ EncodeAndUploadIndexSlice2(std::shared_ptr<milvus_storage::Space> space,
indexData->SetFieldDataMeta(field_meta);
auto serialized_index_data = indexData->serialize_to_remote_file();
auto serialized_index_size = serialized_index_data.size();
auto status = space->WriteBolb(
auto status = space->WriteBlob(
object_key, serialized_index_data.data(), serialized_index_size);
AssertInfo(status.ok(), "write to space error: {}", status.ToString());
return std::make_pair(std::move(object_key), serialized_index_size);

View File

@ -11,7 +11,7 @@
# or implied. See the License for the specific language governing permissions and limitations under the License.
#-------------------------------------------------------------------------------
set( MILVUS_STORAGE_VERSION c7107a0)
set( MILVUS_STORAGE_VERSION 4a9a35e)
message(STATUS "Building milvus-storage-${MILVUS_STORAGE_VERSION} from source")
message(STATUS ${CMAKE_BUILD_TYPE})

View File

@ -18,7 +18,6 @@ package datacoord
import (
"context"
"fmt"
"path"
"sync"
"time"
@ -28,8 +27,10 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
@ -323,12 +324,20 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
}
}
dim, _ := storage.GetDimFromParams(field.TypeParams)
var scheme string
if Params.MinioCfg.UseSSL.GetAsBool() {
scheme = "https"
} else {
scheme = "http"
dim, err := storage.GetDimFromParams(field.TypeParams)
if err != nil {
return false
}
storePath, err := typeutil.GetStorageURI(params.Params.CommonCfg.StorageScheme.GetValue(), params.Params.CommonCfg.StoragePathPrefix.GetValue(), segment.GetID())
if err != nil {
log.Ctx(ib.ctx).Warn("failed to get storage uri", zap.Error(err))
return false
}
indexStorePath, err := typeutil.GetStorageURI(params.Params.CommonCfg.StorageScheme.GetValue(), params.Params.CommonCfg.StoragePathPrefix.GetValue()+"/index", segment.GetID())
if err != nil {
log.Ctx(ib.ctx).Warn("failed to get storage uri", zap.Error(err))
return false
}
req = &indexpb.CreateJobRequest{
@ -347,9 +356,9 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
FieldID: fieldID,
FieldName: field.Name,
FieldType: field.DataType,
StorePath: fmt.Sprintf("s3://%s:%s@%s/%d?scheme=%s&endpoint_override=%s&allow_bucket_creation=true", Params.MinioCfg.AccessKeyID.GetValue(), Params.MinioCfg.SecretAccessKey.GetValue(), Params.MinioCfg.BucketName.GetValue(), segment.GetID(), scheme, Params.MinioCfg.Address.GetValue()),
StorePath: storePath,
StoreVersion: segment.GetStorageVersion(),
IndexStorePath: fmt.Sprintf("s3://%s:%s@%s/index/%d?scheme=%s&endpoint_override=%s&allow_bucket_creation=true", Params.MinioCfg.AccessKeyID.GetValue(), Params.MinioCfg.SecretAccessKey.GetValue(), Params.MinioCfg.BucketName.GetValue(), segment.GetID(), scheme, Params.MinioCfg.Address.GetValue()),
IndexStorePath: indexStorePath,
Dim: int64(dim),
CurrentIndexVersion: ib.indexEngineVersionManager.GetCurrentIndexEngineVersion(),
}
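
Both URIs are now built by the shared typeutil.GetStorageURI helper (introduced later in this change), with the index store distinguished only by an "/index" suffix on the path prefix; the previous inline fmt.Sprintf calls, which embedded the MinIO access key and secret at each call site, are gone. A minimal sketch of the resulting URIs under the "file" scheme (prefix and segment ID are hypothetical):

prefix := params.Params.CommonCfg.StoragePathPrefix.GetValue()              // e.g. "files"
storePath, _ := typeutil.GetStorageURI("file", prefix, 1001)                // "file://files/1001"
indexStorePath, _ := typeutil.GetStorageURI("file", prefix+"/index", 1001)  // "file://files/index/1001"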

View File

@ -489,7 +489,7 @@ func CreateL0Operator(collectionID, partitionID, segmentID int64, channel string
func UpdateStorageVersionOperator(segmentID int64, version int64) UpdateOperator {
return func(modPack *updateSegmentPack) bool {
segment := modPack.meta.GetSegment(segmentID)
segment := modPack.Get(segmentID)
if segment == nil {
log.Info("meta update: update storage version - segment not found",
zap.Int64("segmentID", segmentID))

View File

@ -97,7 +97,7 @@ func (s *storageV2Serializer) EncodeBuffer(ctx context.Context, pack *SyncPack)
return nil, err
}
task.batchStatsBlob = batchStatsBlob
task.statsBlob = batchStatsBlob
s.metacache.UpdateSegments(metacache.RollStats(singlePKStats), metacache.WithSegmentIDs(pack.segmentID))
}
@ -155,7 +155,7 @@ func (s *storageV2Serializer) serializeInsertData(pack *SyncPack) (array.RecordR
builder := array.NewRecordBuilder(memory.DefaultAllocator, s.arrowSchema)
defer builder.Release()
if err := buildRecord(builder, pack.insertData, s.schema.GetFields()); err != nil {
if err := iTypeutil.BuildRecord(builder, pack.insertData, s.schema.GetFields()); err != nil {
return nil, err
}
@ -221,13 +221,10 @@ func (s *storageV2Serializer) serializeDeltaData(pack *SyncPack) (array.RecordRe
func SpaceCreatorFunc(segmentID int64, collSchema *schemapb.CollectionSchema, arrowSchema *arrow.Schema) func() (*milvus_storage.Space, error) {
return func() (*milvus_storage.Space, error) {
url := fmt.Sprintf("%s://%s:%s@%s/%d?endpoint_override=%s",
params.Params.CommonCfg.StorageScheme.GetValue(),
params.Params.MinioCfg.AccessKeyID.GetValue(),
params.Params.MinioCfg.SecretAccessKey.GetValue(),
params.Params.MinioCfg.BucketName.GetValue(),
segmentID,
params.Params.MinioCfg.Address.GetValue())
url, err := iTypeutil.GetStorageURI(params.Params.CommonCfg.StorageScheme.GetValue(), params.Params.CommonCfg.StoragePathPrefix.GetValue(), segmentID)
if err != nil {
return nil, err
}
pkSchema, err := typeutil.GetPrimaryFieldSchema(collSchema)
if err != nil {

View File

@ -249,7 +249,7 @@ func (s *StorageV2SerializerSuite) TestSerializeInsert() {
s.EqualValues(50, taskV2.tsFrom)
s.EqualValues(100, taskV2.tsTo)
s.NotNil(taskV2.reader)
s.NotNil(taskV2.batchStatsBlob)
s.NotNil(taskV2.statsBlob)
})
s.Run("with_flush_segment_not_found", func() {

View File

@ -18,7 +18,6 @@ package syncmgr
import (
"context"
"math"
"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
@ -33,7 +32,6 @@ import (
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/retry"
@ -133,139 +131,10 @@ func (t *SyncTaskV2) writeSpace() error {
}
func (t *SyncTaskV2) writeMeta() error {
t.storageVersion = t.space.GetCurrentVersion()
return t.metaWriter.UpdateSyncV2(t)
}
func buildRecord(b *array.RecordBuilder, data *storage.InsertData, fields []*schemapb.FieldSchema) error {
if data == nil {
log.Info("no buffer data to flush")
return nil
}
for i, field := range fields {
fBuilder := b.Field(i)
switch field.DataType {
case schemapb.DataType_Bool:
fBuilder.(*array.BooleanBuilder).AppendValues(data.Data[field.FieldID].(*storage.BoolFieldData).Data, nil)
case schemapb.DataType_Int8:
fBuilder.(*array.Int8Builder).AppendValues(data.Data[field.FieldID].(*storage.Int8FieldData).Data, nil)
case schemapb.DataType_Int16:
fBuilder.(*array.Int16Builder).AppendValues(data.Data[field.FieldID].(*storage.Int16FieldData).Data, nil)
case schemapb.DataType_Int32:
fBuilder.(*array.Int32Builder).AppendValues(data.Data[field.FieldID].(*storage.Int32FieldData).Data, nil)
case schemapb.DataType_Int64:
fBuilder.(*array.Int64Builder).AppendValues(data.Data[field.FieldID].(*storage.Int64FieldData).Data, nil)
case schemapb.DataType_Float:
fBuilder.(*array.Float32Builder).AppendValues(data.Data[field.FieldID].(*storage.FloatFieldData).Data, nil)
case schemapb.DataType_Double:
fBuilder.(*array.Float64Builder).AppendValues(data.Data[field.FieldID].(*storage.DoubleFieldData).Data, nil)
case schemapb.DataType_VarChar, schemapb.DataType_String:
fBuilder.(*array.StringBuilder).AppendValues(data.Data[field.FieldID].(*storage.StringFieldData).Data, nil)
case schemapb.DataType_Array:
appendListValues(fBuilder.(*array.ListBuilder), data.Data[field.FieldID].(*storage.ArrayFieldData))
case schemapb.DataType_JSON:
fBuilder.(*array.BinaryBuilder).AppendValues(data.Data[field.FieldID].(*storage.JSONFieldData).Data, nil)
case schemapb.DataType_BinaryVector:
vecData := data.Data[field.FieldID].(*storage.BinaryVectorFieldData)
for i := 0; i < len(vecData.Data); i += vecData.Dim / 8 {
fBuilder.(*array.FixedSizeBinaryBuilder).Append(vecData.Data[i : i+vecData.Dim/8])
}
case schemapb.DataType_FloatVector:
vecData := data.Data[field.FieldID].(*storage.FloatVectorFieldData)
builder := fBuilder.(*array.FixedSizeBinaryBuilder)
dim := vecData.Dim
data := vecData.Data
byteLength := dim * 4
length := len(data) / dim
builder.Reserve(length)
bytesData := make([]byte, byteLength)
for i := 0; i < length; i++ {
vec := data[i*dim : (i+1)*dim]
for j := range vec {
bytes := math.Float32bits(vec[j])
common.Endian.PutUint32(bytesData[j*4:], bytes)
}
builder.Append(bytesData)
}
case schemapb.DataType_Float16Vector:
vecData := data.Data[field.FieldID].(*storage.Float16VectorFieldData)
builder := fBuilder.(*array.FixedSizeBinaryBuilder)
dim := vecData.Dim
data := vecData.Data
byteLength := dim * 2
length := len(data) / byteLength
builder.Reserve(length)
for i := 0; i < length; i++ {
builder.Append(data[i*byteLength : (i+1)*byteLength])
}
default:
return merr.WrapErrParameterInvalidMsg("unknown type %v", field.DataType.String())
}
}
return nil
}
func appendListValues(builder *array.ListBuilder, data *storage.ArrayFieldData) error {
vb := builder.ValueBuilder()
switch data.ElementType {
case schemapb.DataType_Bool:
for _, data := range data.Data {
builder.Append(true)
vb.(*array.BooleanBuilder).AppendValues(data.GetBoolData().Data, nil)
}
case schemapb.DataType_Int8:
for _, data := range data.Data {
builder.Append(true)
vb.(*array.Int8Builder).AppendValues(castIntArray[int8](data.GetIntData().Data), nil)
}
case schemapb.DataType_Int16:
for _, data := range data.Data {
builder.Append(true)
vb.(*array.Int16Builder).AppendValues(castIntArray[int16](data.GetIntData().Data), nil)
}
case schemapb.DataType_Int32:
for _, data := range data.Data {
builder.Append(true)
vb.(*array.Int32Builder).AppendValues(data.GetIntData().Data, nil)
}
case schemapb.DataType_Int64:
for _, data := range data.Data {
builder.Append(true)
vb.(*array.Int64Builder).AppendValues(data.GetLongData().Data, nil)
}
case schemapb.DataType_Float:
for _, data := range data.Data {
builder.Append(true)
vb.(*array.Float32Builder).AppendValues(data.GetFloatData().Data, nil)
}
case schemapb.DataType_Double:
for _, data := range data.Data {
builder.Append(true)
vb.(*array.Float64Builder).AppendValues(data.GetDoubleData().Data, nil)
}
case schemapb.DataType_String, schemapb.DataType_VarChar:
for _, data := range data.Data {
builder.Append(true)
vb.(*array.StringBuilder).AppendValues(data.GetStringData().Data, nil)
}
default:
return merr.WrapErrParameterInvalidMsg("unknown type %v", data.ElementType.String())
}
return nil
}
func castIntArray[T int8 | int16](nums []int32) []T {
ret := make([]T, 0, len(nums))
for _, n := range nums {
ret = append(ret, T(n))
}
return ret
}
func NewSyncTaskV2() *SyncTaskV2 {
return &SyncTaskV2{
SyncTask: NewSyncTask(),

View File

@ -316,7 +316,7 @@ func (s *SyncTaskSuiteV2) TestBuildRecord() {
},
}
err = buildRecord(b, data, fieldSchemas)
err = typeutil.BuildRecord(b, data, fieldSchemas)
s.NoError(err)
s.EqualValues(2, b.NewRecord().NumRows())
}

View File

@ -63,6 +63,10 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest
zap.Any("indexParams", req.GetIndexParams()),
zap.Int64("numRows", req.GetNumRows()),
zap.Int32("current_index_version", req.GetCurrentIndexVersion()),
zap.Any("storepath", req.GetStorePath()),
zap.Any("storeversion", req.GetStoreVersion()),
zap.Any("indexstorepath", req.GetIndexStorePath()),
zap.Any("dim", req.GetDim()),
)
ctx, sp := otel.Tracer(typeutil.IndexNodeRole).Start(ctx, "IndexNode-CreateIndex", trace.WithAttributes(
attribute.Int64("indexBuildID", req.GetBuildID()),

View File

@ -265,6 +265,7 @@ message SegmentLoadInfo {
msg.MsgPosition delta_position = 15;
int64 readableVersion = 16;
data.SegmentLevel level = 17;
int64 storageVersion = 18;
}
message FieldIndexInfo {
@ -280,6 +281,7 @@ message FieldIndexInfo {
int64 index_version = 9;
int64 num_rows = 10;
int32 current_index_version = 11;
int64 index_store_version = 12;
}
enum LoadScope {

View File

@ -72,18 +72,19 @@ func PackSegmentLoadInfo(segment *datapb.SegmentInfo, channelCheckpoint *msgpb.M
zap.Duration("tsLag", tsLag))
}
loadInfo := &querypb.SegmentLoadInfo{
SegmentID: segment.ID,
PartitionID: segment.PartitionID,
CollectionID: segment.CollectionID,
BinlogPaths: segment.Binlogs,
NumOfRows: segment.NumOfRows,
Statslogs: segment.Statslogs,
Deltalogs: segment.Deltalogs,
InsertChannel: segment.InsertChannel,
IndexInfos: indexes,
StartPosition: segment.GetStartPosition(),
DeltaPosition: channelCheckpoint,
Level: segment.GetLevel(),
SegmentID: segment.ID,
PartitionID: segment.PartitionID,
CollectionID: segment.CollectionID,
BinlogPaths: segment.Binlogs,
NumOfRows: segment.NumOfRows,
Statslogs: segment.Statslogs,
Deltalogs: segment.Deltalogs,
InsertChannel: segment.InsertChannel,
IndexInfos: indexes,
StartPosition: segment.GetStartPosition(),
DeltaPosition: channelCheckpoint,
Level: segment.GetLevel(),
StorageVersion: segment.GetStorageVersion(),
}
loadInfo.SegmentSize = calculateSegmentSize(loadInfo)
return loadInfo

View File

@ -78,3 +78,14 @@ func (ld *LoadFieldDataInfo) appendMMapDirPath(dir string) {
C.AppendMMapDirPath(ld.cLoadFieldDataInfo, cDir)
}
func (ld *LoadFieldDataInfo) appendURI(uri string) {
cURI := C.CString(uri)
defer C.free(unsafe.Pointer(cURI))
C.SetUri(ld.cLoadFieldDataInfo, cURI)
}
func (ld *LoadFieldDataInfo) appendStorageVersion(version int64) {
cVersion := C.int64_t(version)
C.SetStorageVersion(ld.cLoadFieldDataInfo, cVersion)
}
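
Together with SetUri and SetStorageVersion on the C side, these wrappers hand the segment's storage-v2 location and manifest version to segcore before the v2 load entry point is called; a minimal sketch of the intended call order (URI and version are hypothetical):

ld.appendURI("file:///var/lib/milvus/42") // space location of segment 42
ld.appendStorageVersion(3)                // manifest version to read
// ...followed by C.LoadFieldDataV2(segmentPtr, ld.cLoadFieldDataInfo), as in segment.go below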

View File

@ -157,6 +157,13 @@ func (li *LoadIndexInfo) appendFieldInfo(ctx context.Context, collectionID int64
return HandleCStatus(ctx, &status, "AppendFieldInfo failed")
}
func (li *LoadIndexInfo) appendStorageInfo(uri string, version int64) {
cURI := C.CString(uri)
defer C.free(unsafe.Pointer(cURI))
cVersion := C.int64_t(version)
C.AppendStorageInfo(li.cLoadIndexInfo, cURI, cVersion)
}
// appendIndexData appends index path to cLoadIndexInfo and create index
func (li *LoadIndexInfo) appendIndexData(ctx context.Context, indexKeys []string) error {
for _, indexPath := range indexKeys {
@ -166,17 +173,22 @@ func (li *LoadIndexInfo) appendIndexData(ctx context.Context, indexKeys []string
}
}
span := trace.SpanFromContext(ctx)
var status C.CStatus
if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() {
status = C.AppendIndexV3(li.cLoadIndexInfo)
} else {
span := trace.SpanFromContext(ctx)
traceID := span.SpanContext().TraceID()
spanID := span.SpanContext().SpanID()
traceCtx := C.CTraceContext{
traceID: (*C.uint8_t)(unsafe.Pointer(&traceID[0])),
spanID: (*C.uint8_t)(unsafe.Pointer(&spanID[0])),
flag: C.uchar(span.SpanContext().TraceFlags()),
traceID := span.SpanContext().TraceID()
spanID := span.SpanContext().SpanID()
traceCtx := C.CTraceContext{
traceID: (*C.uint8_t)(unsafe.Pointer(&traceID[0])),
spanID: (*C.uint8_t)(unsafe.Pointer(&spanID[0])),
flag: C.uchar(span.SpanContext().TraceFlags()),
}
status = C.AppendIndexV2(traceCtx, li.cLoadIndexInfo)
}
status := C.AppendIndexV2(traceCtx, li.cLoadIndexInfo)
return HandleCStatus(ctx, &status, "AppendIndex failed")
}

View File

@ -833,7 +833,8 @@ func genInsertData(msgLength int, schema *schemapb.CollectionSchema) (*storage.I
}
case schemapb.DataType_Array:
insertData.Data[f.FieldID] = &storage.ArrayFieldData{
Data: generateArrayArray(msgLength),
ElementType: schemapb.DataType_Int32,
Data: generateArrayArray(msgLength),
}
case schemapb.DataType_JSON:
insertData.Data[f.FieldID] = &storage.JSONFieldData{

View File

@ -172,6 +172,50 @@ func (_c *MockLoader_LoadBloomFilterSet_Call) RunAndReturn(run func(context.Cont
return _c
}
// LoadDelta provides a mock function with given fields: ctx, collectionID, segment
func (_m *MockLoader) LoadDelta(ctx context.Context, collectionID int64, segment *LocalSegment) error {
ret := _m.Called(ctx, collectionID, segment)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, int64, *LocalSegment) error); ok {
r0 = rf(ctx, collectionID, segment)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockLoader_LoadDelta_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LoadDelta'
type MockLoader_LoadDelta_Call struct {
*mock.Call
}
// LoadDelta is a helper method to define mock.On call
// - ctx context.Context
// - collectionID int64
// - segment *LocalSegment
func (_e *MockLoader_Expecter) LoadDelta(ctx interface{}, collectionID interface{}, segment interface{}) *MockLoader_LoadDelta_Call {
return &MockLoader_LoadDelta_Call{Call: _e.mock.On("LoadDelta", ctx, collectionID, segment)}
}
func (_c *MockLoader_LoadDelta_Call) Run(run func(ctx context.Context, collectionID int64, segment *LocalSegment)) *MockLoader_LoadDelta_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(int64), args[2].(*LocalSegment))
})
return _c
}
func (_c *MockLoader_LoadDelta_Call) Return(_a0 error) *MockLoader_LoadDelta_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockLoader_LoadDelta_Call) RunAndReturn(run func(context.Context, int64, *LocalSegment) error) *MockLoader_LoadDelta_Call {
_c.Call.Return(run)
return _c
}
// LoadDeltaLogs provides a mock function with given fields: ctx, segment, deltaLogs
func (_m *MockLoader) LoadDeltaLogs(ctx context.Context, segment Segment, deltaLogs []*datapb.FieldBinlog) error {
ret := _m.Called(ctx, segment, deltaLogs)

View File

@ -28,9 +28,11 @@ import "C"
import (
"context"
"fmt"
"io"
"sync"
"unsafe"
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/cockroachdb/errors"
"github.com/golang/protobuf/proto"
"go.opentelemetry.io/otel"
@ -41,11 +43,15 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
milvus_storage "github.com/milvus-io/milvus-storage/go/storage"
"github.com/milvus-io/milvus-storage/go/storage/options"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/segcorepb"
"github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
"github.com/milvus-io/milvus/internal/storage"
typeutil_internal "github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/merr"
@ -158,6 +164,7 @@ type LocalSegment struct {
lastDeltaTimestamp *atomic.Uint64
fieldIndexes *typeutil.ConcurrentMap[int64, *IndexedFieldInfo]
space *milvus_storage.Space
}
func NewSegment(ctx context.Context,
@ -226,6 +233,71 @@ func NewSegment(ctx context.Context,
return segment, nil
}
func NewSegmentV2(
ctx context.Context,
collection *Collection,
segmentID int64,
partitionID int64,
collectionID int64,
shard string,
segmentType SegmentType,
version int64,
startPosition *msgpb.MsgPosition,
deltaPosition *msgpb.MsgPosition,
storageVersion int64,
level datapb.SegmentLevel,
) (Segment, error) {
/*
CSegmentInterface
NewSegment(CCollection collection, uint64_t segment_id, SegmentType seg_type);
*/
if level == datapb.SegmentLevel_L0 {
return NewL0Segment(collection, segmentID, partitionID, collectionID, shard, segmentType, version, startPosition, deltaPosition)
}
var segmentPtr C.CSegmentInterface
var status C.CStatus
switch segmentType {
case SegmentTypeSealed:
status = C.NewSegment(collection.collectionPtr, C.Sealed, C.int64_t(segmentID), &segmentPtr)
case SegmentTypeGrowing:
status = C.NewSegment(collection.collectionPtr, C.Growing, C.int64_t(segmentID), &segmentPtr)
default:
return nil, fmt.Errorf("illegal segment type %d when create segment %d", segmentType, segmentID)
}
if err := HandleCStatus(ctx, &status, "NewSegmentFailed"); err != nil {
return nil, err
}
log.Info("create segment",
zap.Int64("collectionID", collectionID),
zap.Int64("partitionID", partitionID),
zap.Int64("segmentID", segmentID),
zap.String("segmentType", segmentType.String()))
url, err := typeutil_internal.GetStorageURI(paramtable.Get().CommonCfg.StorageScheme.GetValue(), paramtable.Get().CommonCfg.StoragePathPrefix.GetValue(), segmentID)
if err != nil {
return nil, err
}
space, err := milvus_storage.Open(url, options.NewSpaceOptionBuilder().SetVersion(storageVersion).Build())
if err != nil {
return nil, err
}
segment := &LocalSegment{
baseSegment: newBaseSegment(segmentID, partitionID, collectionID, shard, segmentType, level, version, startPosition),
ptr: segmentPtr,
lastDeltaTimestamp: atomic.NewUint64(deltaPosition.GetTimestamp()),
fieldIndexes: typeutil.NewConcurrentMap[int64, *IndexedFieldInfo](),
space: space,
memSize: atomic.NewInt64(-1),
rowNum: atomic.NewInt64(-1),
insertCount: atomic.NewInt64(0),
}
return segment, nil
}
func (s *LocalSegment) isValid() bool {
return s.ptr != nil
}
@ -671,7 +743,18 @@ func (s *LocalSegment) LoadMultiFieldData(ctx context.Context, rowCount int64, f
var status C.CStatus
GetLoadPool().Submit(func() (any, error) {
status = C.LoadFieldData(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo)
if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() {
uri, err := typeutil_internal.GetStorageURI(paramtable.Get().CommonCfg.StorageScheme.GetValue(), paramtable.Get().CommonCfg.StoragePathPrefix.GetValue(), s.segmentID)
if err != nil {
return nil, err
}
loadFieldDataInfo.appendURI(uri)
loadFieldDataInfo.appendStorageVersion(s.space.GetCurrentVersion())
status = C.LoadFieldDataV2(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo)
} else {
status = C.LoadFieldData(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo)
}
return nil, nil
}).Await()
if err := HandleCStatus(ctx, &status, "LoadMultiFieldData failed",
@ -719,19 +802,33 @@ func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCoun
return err
}
for _, binlog := range field.Binlogs {
err = loadFieldDataInfo.appendLoadFieldDataPath(ctx, fieldID, binlog)
if err != nil {
return err
if field != nil {
for _, binlog := range field.Binlogs {
err = loadFieldDataInfo.appendLoadFieldDataPath(ctx, fieldID, binlog)
if err != nil {
return err
}
}
}
loadFieldDataInfo.appendMMapDirPath(paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue())
loadFieldDataInfo.enableMmap(fieldID, mmapEnabled)
var status C.CStatus
GetLoadPool().Submit(func() (any, error) {
log.Info("submitted loadFieldData task to dy pool")
status = C.LoadFieldData(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo)
if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() {
uri, err := typeutil_internal.GetStorageURI(paramtable.Get().CommonCfg.StorageScheme.GetValue(), paramtable.Get().CommonCfg.StoragePathPrefix.GetValue(), s.segmentID)
if err != nil {
return nil, err
}
loadFieldDataInfo.appendURI(uri)
loadFieldDataInfo.appendStorageVersion(s.space.GetCurrentVersion())
status = C.LoadFieldDataV2(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo)
} else {
status = C.LoadFieldData(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo)
}
return nil, nil
}).Await()
if err := HandleCStatus(ctx, &status, "LoadFieldData failed",
@ -748,6 +845,95 @@ func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCoun
return nil
}
func (s *LocalSegment) LoadDeltaData2(ctx context.Context, schema *schemapb.CollectionSchema) error {
deleteReader, err := s.space.ScanDelete()
if err != nil {
return err
}
if !deleteReader.Schema().HasField(common.TimeStampFieldName) {
return fmt.Errorf("can not read timestamp field in space")
}
pkFieldSchema, err := typeutil.GetPrimaryFieldSchema(schema)
if err != nil {
return err
}
ids := &schemapb.IDs{}
var pkint64s []int64
var pkstrings []string
var tss []int64
for deleteReader.Next() {
rec := deleteReader.Record()
indices := rec.Schema().FieldIndices(common.TimeStampFieldName)
tss = append(tss, rec.Column(indices[0]).(*array.Int64).Int64Values()...)
indices = rec.Schema().FieldIndices(pkFieldSchema.Name)
switch pkFieldSchema.DataType {
case schemapb.DataType_Int64:
pkint64s = append(pkint64s, rec.Column(indices[0]).(*array.Int64).Int64Values()...)
case schemapb.DataType_VarChar:
columnData := rec.Column(indices[0]).(*array.String)
for i := 0; i < columnData.Len(); i++ {
pkstrings = append(pkstrings, columnData.Value(i))
}
default:
return fmt.Errorf("unknown data type %v", pkFieldSchema.DataType)
}
}
if err := deleteReader.Err(); err != nil && err != io.EOF {
return err
}
switch pkFieldSchema.DataType {
case schemapb.DataType_Int64:
ids.IdField = &schemapb.IDs_IntId{
IntId: &schemapb.LongArray{
Data: pkint64s,
},
}
case schemapb.DataType_VarChar:
ids.IdField = &schemapb.IDs_StrId{
StrId: &schemapb.StringArray{
Data: pkstrings,
},
}
default:
return fmt.Errorf("unknown data type %v", pkFieldSchema.DataType)
}
idsBlob, err := proto.Marshal(ids)
if err != nil {
return err
}
if len(tss) == 0 {
return nil
}
loadInfo := C.CLoadDeletedRecordInfo{
timestamps: unsafe.Pointer(&tss[0]),
primary_keys: (*C.uint8_t)(unsafe.Pointer(&idsBlob[0])),
primary_keys_size: C.uint64_t(len(idsBlob)),
row_count: C.int64_t(len(tss)),
}
/*
CStatus
LoadDeletedRecord(CSegmentInterface c_segment, CLoadDeletedRecordInfo deleted_record_info)
*/
var status C.CStatus
GetDynamicPool().Submit(func() (any, error) {
status = C.LoadDeletedRecord(s.ptr, loadInfo)
return nil, nil
}).Await()
if err := HandleCStatus(ctx, &status, "LoadDeletedRecord failed"); err != nil {
return err
}
log.Info("load deleted record done",
zap.Int("rowNum", len(tss)),
zap.String("segmentType", s.Type().String()))
return nil
}
func (s *LocalSegment) AddFieldDataInfo(ctx context.Context, rowCount int64, fields []*datapb.FieldBinlog) error {
s.ptrLock.RLock()
defer s.ptrLock.RUnlock()
@ -906,6 +1092,14 @@ func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIn
}
defer deleteLoadIndexInfo(loadIndexInfo)
if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() {
uri, err := typeutil_internal.GetStorageURI(paramtable.Get().CommonCfg.StorageScheme.GetValue(), paramtable.Get().CommonCfg.StoragePathPrefix.GetValue(), s.segmentID)
if err != nil {
return err
}
loadIndexInfo.appendStorageInfo(uri, indexInfo.IndexStoreVersion)
}
err = loadIndexInfo.appendLoadIndexInfo(ctx, indexInfo, s.collectionID, s.partitionID, s.segmentID, fieldType)
if err != nil {
if loadIndexInfo.cleanLocalData(ctx) != nil {

View File

@ -19,6 +19,7 @@ package segments
import (
"context"
"fmt"
"io"
"path"
"runtime/debug"
"strconv"
@ -34,10 +35,13 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
milvus_storage "github.com/milvus-io/milvus-storage/go/storage"
"github.com/milvus-io/milvus-storage/go/storage/options"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
"github.com/milvus-io/milvus/internal/storage"
typeutil_internal "github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
@ -84,6 +88,385 @@ func (r *LoadResource) Sub(resource LoadResource) {
r.DiskSize -= resource.DiskSize
}
type segmentLoaderV2 struct {
*segmentLoader
}
func NewLoaderV2(
manager *Manager,
cm storage.ChunkManager,
) *segmentLoaderV2 {
return &segmentLoaderV2{
segmentLoader: NewLoader(manager, cm),
}
}
func (loader *segmentLoaderV2) LoadDelta(ctx context.Context, collectionID int64, segment *LocalSegment) error {
collection := loader.manager.Collection.Get(collectionID)
if collection == nil {
err := merr.WrapErrCollectionNotFound(collectionID)
log.Warn("failed to get collection while loading delta", zap.Error(err))
return err
}
return segment.LoadDeltaData2(ctx, collection.Schema())
}
func (loader *segmentLoaderV2) Load(ctx context.Context,
collectionID int64,
segmentType SegmentType,
version int64,
segments ...*querypb.SegmentLoadInfo,
) ([]Segment, error) {
log := log.Ctx(ctx).With(
zap.Int64("collectionID", collectionID),
zap.String("segmentType", segmentType.String()),
)
if len(segments) == 0 {
log.Info("no segment to load")
return nil, nil
}
// Filter out loaded & loading segments
infos := loader.prepare(segmentType, version, segments...)
defer loader.unregister(infos...)
log.With(
zap.Int64s("requestSegments", lo.Map(segments, func(s *querypb.SegmentLoadInfo, _ int) int64 { return s.GetSegmentID() })),
zap.Int64s("preparedSegments", lo.Map(infos, func(s *querypb.SegmentLoadInfo, _ int) int64 { return s.GetSegmentID() })),
)
// continue to wait other task done
log.Info("start loading...", zap.Int("segmentNum", len(segments)), zap.Int("afterFilter", len(infos)))
// Check memory & storage limit
resource, concurrencyLevel, err := loader.requestResource(ctx, infos...)
if err != nil {
log.Warn("request resource failed", zap.Error(err))
return nil, err
}
defer loader.freeRequest(resource)
newSegments := typeutil.NewConcurrentMap[int64, Segment]()
loaded := typeutil.NewConcurrentMap[int64, Segment]()
defer func() {
newSegments.Range(func(_ int64, s Segment) bool {
s.Release()
return true
})
debug.FreeOSMemory()
}()
for _, info := range infos {
segmentID := info.GetSegmentID()
partitionID := info.GetPartitionID()
collectionID := info.GetCollectionID()
shard := info.GetInsertChannel()
collection := loader.manager.Collection.Get(collectionID)
if collection == nil {
err := merr.WrapErrCollectionNotFound(collectionID)
log.Warn("failed to get collection", zap.Error(err))
return nil, err
}
segment, err := NewSegmentV2(ctx, collection, segmentID, partitionID, collectionID, shard, segmentType, version, info.GetStartPosition(), info.GetDeltaPosition(), info.GetStorageVersion(), info.GetLevel())
if err != nil {
log.Warn("load segment failed when create new segment",
zap.Int64("partitionID", partitionID),
zap.Int64("segmentID", segmentID),
zap.Error(err),
)
return nil, err
}
newSegments.Insert(segmentID, segment)
}
loadSegmentFunc := func(idx int) error {
loadInfo := infos[idx]
partitionID := loadInfo.PartitionID
segmentID := loadInfo.SegmentID
segment, _ := newSegments.Get(segmentID)
tr := timerecord.NewTimeRecorder("loadDurationPerSegment")
var err error
if loadInfo.GetLevel() == datapb.SegmentLevel_L0 {
err = loader.LoadDelta(ctx, collectionID, segment.(*LocalSegment))
} else {
err = loader.loadSegment(ctx, segment.(*LocalSegment), loadInfo)
}
if err != nil {
log.Warn("load segment failed when load data into memory",
zap.Int64("partitionID", partitionID),
zap.Int64("segmentID", segmentID),
zap.Error(err),
)
return err
}
loader.manager.Segment.Put(segmentType, segment)
newSegments.GetAndRemove(segmentID)
loaded.Insert(segmentID, segment)
log.Info("load segment done", zap.Int64("segmentID", segmentID))
loader.notifyLoadFinish(loadInfo)
metrics.QueryNodeLoadSegmentLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(tr.ElapseSpan().Seconds())
return nil
}
// Start to load,
// Make sure we can always benefit from concurrency, and not spawn too many idle goroutines
log.Info("start to load segments in parallel",
zap.Int("segmentNum", len(infos)),
zap.Int("concurrencyLevel", concurrencyLevel))
err = funcutil.ProcessFuncParallel(len(infos),
concurrencyLevel, loadSegmentFunc, "loadSegmentFunc")
if err != nil {
log.Warn("failed to load some segments", zap.Error(err))
return nil, err
}
// Wait for all segments loaded
if err := loader.waitSegmentLoadDone(ctx, segmentType, lo.Map(segments, func(info *querypb.SegmentLoadInfo, _ int) int64 { return info.GetSegmentID() })...); err != nil {
log.Warn("failed to wait the filtered out segments load done", zap.Error(err))
return nil, err
}
log.Info("all segment load done")
var result []Segment
loaded.Range(func(_ int64, s Segment) bool {
result = append(result, s)
return true
})
return result, nil
}
func (loader *segmentLoaderV2) LoadBloomFilterSet(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) ([]*pkoracle.BloomFilterSet, error) {
log := log.Ctx(ctx).With(
zap.Int64("collectionID", collectionID),
zap.Int64s("segmentIDs", lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) int64 {
return info.GetSegmentID()
})),
)
segmentNum := len(infos)
if segmentNum == 0 {
log.Info("no segment to load")
return nil, nil
}
collection := loader.manager.Collection.Get(collectionID)
if collection == nil {
err := merr.WrapErrCollectionNotFound(collectionID)
log.Warn("failed to get collection while loading segment", zap.Error(err))
return nil, err
}
log.Info("start loading remote...", zap.Int("segmentNum", segmentNum))
loadedBfs := typeutil.NewConcurrentSet[*pkoracle.BloomFilterSet]()
// TODO check memory for bf size
loadRemoteFunc := func(idx int) error {
loadInfo := infos[idx]
partitionID := loadInfo.PartitionID
segmentID := loadInfo.SegmentID
bfs := pkoracle.NewBloomFilterSet(segmentID, partitionID, commonpb.SegmentState_Sealed)
log.Info("loading bloom filter for remote...")
err := loader.loadBloomFilter(ctx, segmentID, bfs, loadInfo.StorageVersion)
if err != nil {
log.Warn("load remote segment bloom filter failed",
zap.Int64("partitionID", partitionID),
zap.Int64("segmentID", segmentID),
zap.Error(err),
)
return err
}
loadedBfs.Insert(bfs)
return nil
}
err := funcutil.ProcessFuncParallel(segmentNum, segmentNum, loadRemoteFunc, "loadRemoteFunc")
if err != nil {
// no partial success here
log.Warn("failed to load remote segment", zap.Error(err))
return nil, err
}
return loadedBfs.Collect(), nil
}
func (loader *segmentLoaderV2) loadBloomFilter(ctx context.Context, segmentID int64, bfs *pkoracle.BloomFilterSet,
storeVersion int64,
) error {
log := log.Ctx(ctx).With(
zap.Int64("segmentID", segmentID),
)
startTs := time.Now()
url, err := typeutil_internal.GetStorageURI(paramtable.Get().CommonCfg.StorageScheme.GetValue(), paramtable.Get().CommonCfg.StoragePathPrefix.GetValue(), segmentID)
if err != nil {
return err
}
space, err := milvus_storage.Open(url, options.NewSpaceOptionBuilder().SetVersion(storeVersion).Build())
if err != nil {
return err
}
statsBlobs := space.StatisticsBlobs()
blobs := []*storage.Blob{}
for _, statsBlob := range statsBlobs {
blob := make([]byte, statsBlob.Size)
_, err := space.ReadBlob(statsBlob.Name, blob)
if err != nil && err != io.EOF {
return err
}
blobs = append(blobs, &storage.Blob{Value: blob})
}
var stats []*storage.PrimaryKeyStats
stats, err = storage.DeserializeStats(blobs)
if err != nil {
log.Warn("failed to deserialize stats", zap.Error(err))
return err
}
var size uint
for _, stat := range stats {
pkStat := &storage.PkStatistics{
PkFilter: stat.BF,
MinPK: stat.MinPk,
MaxPK: stat.MaxPk,
}
size += stat.BF.Cap()
bfs.AddHistoricalStats(pkStat)
}
log.Info("Successfully load pk stats", zap.Duration("time", time.Since(startTs)), zap.Uint("size", size))
return nil
}
func (loader *segmentLoaderV2) loadSegment(ctx context.Context,
segment *LocalSegment,
loadInfo *querypb.SegmentLoadInfo,
) error {
log := log.Ctx(ctx).With(
zap.Int64("collectionID", segment.Collection()),
zap.Int64("partitionID", segment.Partition()),
zap.String("shard", segment.Shard()),
zap.Int64("segmentID", segment.ID()),
)
log.Info("start loading segment files",
zap.Int64("rowNum", loadInfo.GetNumOfRows()),
zap.String("segmentType", segment.Type().String()))
collection := loader.manager.Collection.Get(segment.Collection())
if collection == nil {
err := merr.WrapErrCollectionNotFound(segment.Collection())
log.Warn("failed to get collection while loading segment", zap.Error(err))
return err
}
// pkField := GetPkField(collection.Schema())
// TODO(xige-16): Optimize the data loading process and reduce data copying
// for now, there will be multiple copies in the process of data loading into segCore
defer debug.FreeOSMemory()
if segment.Type() == SegmentTypeSealed {
fieldsMap := typeutil.NewConcurrentMap[int64, *schemapb.FieldSchema]()
for _, field := range collection.Schema().Fields {
fieldsMap.Insert(field.FieldID, field)
}
// fieldID2IndexInfo := make(map[int64]*querypb.FieldIndexInfo)
indexedFieldInfos := make(map[int64]*IndexedFieldInfo)
for _, indexInfo := range loadInfo.IndexInfos {
if indexInfo.GetIndexStoreVersion() > 0 {
fieldID := indexInfo.FieldID
fieldInfo := &IndexedFieldInfo{
IndexInfo: indexInfo,
}
indexedFieldInfos[fieldID] = fieldInfo
fieldsMap.Remove(fieldID)
// fieldID2IndexInfo[fieldID] = indexInfo
}
}
log.Info("load fields...",
zap.Int64s("indexedFields", lo.Keys(indexedFieldInfos)),
)
schemaHelper, err := typeutil.CreateSchemaHelper(collection.Schema())
if err != nil {
return err
}
if err := loader.loadFieldsIndex(ctx, schemaHelper, segment, loadInfo.GetNumOfRows(), indexedFieldInfos); err != nil {
return err
}
// REMOVEME
keys := make([]int64, 0)
fieldsMap.Range(func(key int64, value *schemapb.FieldSchema) bool {
keys = append(keys, key)
return true
})
if err := loader.loadSealedSegmentFields(ctx, segment, fieldsMap, loadInfo.GetNumOfRows()); err != nil {
return err
}
if err := segment.AddFieldDataInfo(ctx, loadInfo.GetNumOfRows(), loadInfo.GetBinlogPaths()); err != nil {
return err
}
// https://github.com/milvus-io/milvus/23654
// legacy entry num = 0
if err := loader.patchEntryNumber(ctx, segment, loadInfo); err != nil {
return err
}
} else {
if err := segment.LoadMultiFieldData(ctx, loadInfo.GetNumOfRows(), loadInfo.BinlogPaths); err != nil {
return err
}
}
// load statslog if it's growing segment
if segment.typ == SegmentTypeGrowing {
log.Info("loading statslog...")
// pkStatsBinlogs, logType := loader.filterPKStatsBinlogs(loadInfo.Statslogs, pkField.GetFieldID())
err := loader.loadBloomFilter(ctx, segment.segmentID, segment.bloomFilterSet, loadInfo.StorageVersion)
if err != nil {
return err
}
}
log.Info("loading delta...")
return loader.LoadDelta(ctx, segment.Collection(), segment)
}
func (loader *segmentLoaderV2) loadSealedSegmentFields(ctx context.Context, segment *LocalSegment, fields *typeutil.ConcurrentMap[int64, *schemapb.FieldSchema], rowCount int64) error {
runningGroup, _ := errgroup.WithContext(ctx)
fields.Range(func(fieldID int64, fieldSchema *schemapb.FieldSchema) bool {
runningGroup.Go(func() error {
return segment.LoadFieldData(ctx, fieldID, rowCount, nil, common.IsMmapEnabled(fieldSchema.GetTypeParams()...))
})
return true
})
err := runningGroup.Wait()
if err != nil {
return err
}
log.Ctx(ctx).Info("load field binlogs done for sealed segment",
zap.Int64("collection", segment.collectionID),
zap.Int64("segment", segment.segmentID),
zap.String("segmentType", segment.Type().String()))
return nil
}
func NewLoader(
manager *Manager,
cm storage.ChunkManager,

View File

@ -22,14 +22,21 @@ import (
"testing"
"time"
"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/apache/arrow/go/v12/arrow/memory"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
milvus_storage "github.com/milvus-io/milvus-storage/go/storage"
"github.com/milvus-io/milvus-storage/go/storage/options"
"github.com/milvus-io/milvus-storage/go/storage/schema"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/initcore"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
@ -564,6 +571,7 @@ func (suite *SegmentLoaderSuite) TestPatchEntryNum() {
func (suite *SegmentLoaderSuite) TestRunOutMemory() {
ctx := context.Background()
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.Key, "0")
defer paramtable.Get().Reset(paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.Key)
msgLength := 4
@ -770,3 +778,149 @@ func TestSegmentLoader(t *testing.T) {
suite.Run(t, &SegmentLoaderSuite{})
suite.Run(t, &SegmentLoaderDetailSuite{})
}
type SegmentLoaderV2Suite struct {
suite.Suite
loader *segmentLoaderV2
// Dependencies
manager *Manager
rootPath string
chunkManager storage.ChunkManager
// Data
collectionID int64
partitionID int64
segmentID int64
schema *schemapb.CollectionSchema
segmentNum int
}
func (suite *SegmentLoaderV2Suite) SetupSuite() {
paramtable.Init()
suite.rootPath = suite.T().Name()
suite.collectionID = rand.Int63()
suite.partitionID = rand.Int63()
suite.segmentID = rand.Int63()
suite.segmentNum = 5
}
func (suite *SegmentLoaderV2Suite) SetupTest() {
paramtable.Get().CommonCfg.EnableStorageV2.SwapTempValue("true")
// Dependencies
suite.manager = NewManager()
ctx := context.Background()
// TODO: cpp chunk manager does not support local chunk manager
// suite.chunkManager = storage.NewLocalChunkManager(storage.RootPath(
// fmt.Sprintf("/tmp/milvus-ut/%d", rand.Int63())))
chunkManagerFactory := NewTestChunkManagerFactory(paramtable.Get(), suite.rootPath)
suite.chunkManager, _ = chunkManagerFactory.NewPersistentStorageChunkManager(ctx)
suite.loader = NewLoaderV2(suite.manager, suite.chunkManager)
initcore.InitRemoteChunkManager(paramtable.Get())
// Data
suite.schema = GenTestCollectionSchema("test", schemapb.DataType_Int64)
indexMeta := GenTestIndexMeta(suite.collectionID, suite.schema)
loadMeta := &querypb.LoadMetaInfo{
LoadType: querypb.LoadType_LoadCollection,
CollectionID: suite.collectionID,
PartitionIDs: []int64{suite.partitionID},
}
suite.manager.Collection.PutOrRef(suite.collectionID, suite.schema, indexMeta, loadMeta)
}
func (suite *SegmentLoaderV2Suite) TearDownTest() {
ctx := context.Background()
for i := 0; i < suite.segmentNum; i++ {
suite.manager.Segment.Remove(suite.segmentID+int64(i), querypb.DataScope_All)
}
suite.chunkManager.RemoveWithPrefix(ctx, suite.rootPath)
paramtable.Get().CommonCfg.EnableStorageV2.SwapTempValue("false")
}
func (suite *SegmentLoaderV2Suite) TestLoad() {
tmpDir := suite.T().TempDir()
paramtable.Get().CommonCfg.StorageScheme.SwapTempValue("file")
paramtable.Get().CommonCfg.StoragePathPrefix.SwapTempValue(tmpDir)
ctx := context.Background()
msgLength := 4
arrowSchema, err := typeutil.ConvertToArrowSchema(suite.schema.Fields)
suite.NoError(err)
opt := options.NewSpaceOptionBuilder().
SetSchema(schema.NewSchema(
arrowSchema,
&schema.SchemaOptions{
PrimaryColumn: "int64Field",
VectorColumn: "floatVectorField",
VersionColumn: "Timestamp",
})).
Build()
uri, err := typeutil.GetStorageURI("file", tmpDir, suite.segmentID)
suite.NoError(err)
space, err := milvus_storage.Open(uri, opt)
suite.NoError(err)
b := array.NewRecordBuilder(memory.DefaultAllocator, arrowSchema)
defer b.Release()
insertData, err := genInsertData(msgLength, suite.schema)
suite.NoError(err)
err = typeutil.BuildRecord(b, insertData, suite.schema.Fields)
suite.NoError(err)
rec := b.NewRecord()
defer rec.Release()
reader, err := array.NewRecordReader(arrowSchema, []arrow.Record{rec})
suite.NoError(err)
err = space.Write(reader, &options.DefaultWriteOptions)
suite.NoError(err)
collMeta := genCollectionMeta(suite.collectionID, suite.partitionID, suite.schema)
inCodec := storage.NewInsertCodecWithSchema(collMeta)
statsLog, err := inCodec.SerializePkStatsByData(insertData)
suite.NoError(err)
err = space.WriteBlob(statsLog.Value, statsLog.Key, false)
suite.NoError(err)
dschema := space.Manifest().GetSchema().DeleteSchema()
dbuilder := array.NewRecordBuilder(memory.DefaultAllocator, dschema)
defer dbuilder.Release()
dbuilder.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2}, nil)
dbuilder.Field(1).(*array.Int64Builder).AppendValues([]int64{100, 200}, nil)
drec := dbuilder.NewRecord()
defer drec.Release()
dreader, err := array.NewRecordReader(dschema, []arrow.Record{drec})
suite.NoError(err)
err = space.Delete(dreader)
suite.NoError(err)
segments, err := suite.loader.Load(ctx, suite.collectionID, SegmentTypeSealed, 0, &querypb.SegmentLoadInfo{
SegmentID: suite.segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
NumOfRows: int64(msgLength),
StorageVersion: 3,
})
suite.NoError(err)
_, err = suite.loader.LoadBloomFilterSet(ctx, suite.collectionID, 0, &querypb.SegmentLoadInfo{
SegmentID: suite.segmentID,
PartitionID: suite.partitionID,
CollectionID: suite.collectionID,
NumOfRows: int64(msgLength),
StorageVersion: 3,
})
suite.NoError(err)
segment := segments[0]
suite.Equal(int64(msgLength-2), segment.RowNum())
}
func TestSegmentLoaderV2(t *testing.T) {
suite.Run(t, &SegmentLoaderV2Suite{})
}

View File

@ -345,7 +345,11 @@ func (node *QueryNode) Init() error {
node.subscribingChannels = typeutil.NewConcurrentSet[string]()
node.unsubscribingChannels = typeutil.NewConcurrentSet[string]()
node.manager = segments.NewManager()
node.loader = segments.NewLoader(node.manager, node.chunkManager)
if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() {
node.loader = segments.NewLoaderV2(node.manager, node.chunkManager)
} else {
node.loader = segments.NewLoader(node.manager, node.chunkManager)
}
node.dispClient = msgdispatcher.NewClient(node.factory, typeutil.QueryNodeRole, paramtable.GetNodeID())
// init pipeline manager
node.pipelineManager = pipeline.NewManager(node.manager, node.tSafeManager, node.dispClient, node.delegators)

View File

@ -53,13 +53,9 @@ func ConvertToArrowSchema(fields []*schemapb.FieldSchema) (*arrow.Schema, error)
Type: arrow.BinaryTypes.String,
})
case schemapb.DataType_Array:
elemType, err := convertToArrowType(field.ElementType)
if err != nil {
return nil, err
}
arrowFields = append(arrowFields, arrow.Field{
Name: field.Name,
Type: arrow.ListOf(elemType),
Type: arrow.BinaryTypes.Binary,
})
case schemapb.DataType_JSON:
arrowFields = append(arrowFields, arrow.Field{

View File

@ -0,0 +1,122 @@
package typeutil
import (
"fmt"
"math"
"path"
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
func GetStorageURI(protocol, pathPrefix string, segmentID int64) (string, error) {
switch protocol {
case "s3":
var scheme string
if paramtable.Get().MinioCfg.UseSSL.GetAsBool() {
scheme = "https"
} else {
scheme = "http"
}
if pathPrefix != "" {
cleanPath := path.Clean(pathPrefix)
return fmt.Sprintf("s3://%s:%s@%s/%s/%d?scheme=%s&endpoint_override=%s&allow_bucket_creation=true", paramtable.Get().MinioCfg.AccessKeyID.GetValue(), paramtable.Get().MinioCfg.SecretAccessKey.GetValue(), paramtable.Get().MinioCfg.BucketName.GetValue(), cleanPath, segmentID, scheme, paramtable.Get().MinioCfg.Address.GetValue()), nil
} else {
return fmt.Sprintf("s3://%s:%s@%s/%d?scheme=%s&endpoint_override=%s&allow_bucket_creation=true", paramtable.Get().MinioCfg.AccessKeyID.GetValue(), paramtable.Get().MinioCfg.SecretAccessKey.GetValue(), paramtable.Get().MinioCfg.BucketName.GetValue(), segmentID, scheme, paramtable.Get().MinioCfg.Address.GetValue()), nil
}
case "file":
if pathPrefix != "" {
cleanPath := path.Clean(pathPrefix)
return fmt.Sprintf("file://%s/%d", cleanPath, segmentID), nil
} else {
return fmt.Sprintf("file://%d", segmentID), nil
}
default:
return "", merr.WrapErrParameterInvalidMsg("unsupported schema %s", protocol)
}
}
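// Illustrative outputs, with hypothetical values not taken from this change:
//
//	uri, _ := GetStorageURI("file", "/var/lib/milvus", 42)
//	// uri == "file:///var/lib/milvus/42"
//
//	// Assuming UseSSL=false, bucket "a-bucket", endpoint "localhost:9000":
//	uri, _ = GetStorageURI("s3", "files", 42)
//	// uri == "s3://<ak>:<sk>@a-bucket/files/42?scheme=http&endpoint_override=localhost:9000&allow_bucket_creation=true"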
func BuildRecord(b *array.RecordBuilder, data *storage.InsertData, fields []*schemapb.FieldSchema) error {
if data == nil {
log.Info("no buffer data to flush")
return nil
}
for i, field := range fields {
fBuilder := b.Field(i)
switch field.DataType {
case schemapb.DataType_Bool:
fBuilder.(*array.BooleanBuilder).AppendValues(data.Data[field.FieldID].(*storage.BoolFieldData).Data, nil)
case schemapb.DataType_Int8:
fBuilder.(*array.Int8Builder).AppendValues(data.Data[field.FieldID].(*storage.Int8FieldData).Data, nil)
case schemapb.DataType_Int16:
fBuilder.(*array.Int16Builder).AppendValues(data.Data[field.FieldID].(*storage.Int16FieldData).Data, nil)
case schemapb.DataType_Int32:
fBuilder.(*array.Int32Builder).AppendValues(data.Data[field.FieldID].(*storage.Int32FieldData).Data, nil)
case schemapb.DataType_Int64:
fBuilder.(*array.Int64Builder).AppendValues(data.Data[field.FieldID].(*storage.Int64FieldData).Data, nil)
case schemapb.DataType_Float:
fBuilder.(*array.Float32Builder).AppendValues(data.Data[field.FieldID].(*storage.FloatFieldData).Data, nil)
case schemapb.DataType_Double:
fBuilder.(*array.Float64Builder).AppendValues(data.Data[field.FieldID].(*storage.DoubleFieldData).Data, nil)
case schemapb.DataType_VarChar, schemapb.DataType_String:
fBuilder.(*array.StringBuilder).AppendValues(data.Data[field.FieldID].(*storage.StringFieldData).Data, nil)
case schemapb.DataType_Array:
for _, data := range data.Data[field.FieldID].(*storage.ArrayFieldData).Data {
marshaled, err := proto.Marshal(data)
if err != nil {
return err
}
fBuilder.(*array.BinaryBuilder).Append(marshaled)
}
case schemapb.DataType_JSON:
fBuilder.(*array.BinaryBuilder).AppendValues(data.Data[field.FieldID].(*storage.JSONFieldData).Data, nil)
case schemapb.DataType_BinaryVector:
vecData := data.Data[field.FieldID].(*storage.BinaryVectorFieldData)
for i := 0; i < len(vecData.Data); i += vecData.Dim / 8 {
fBuilder.(*array.FixedSizeBinaryBuilder).Append(vecData.Data[i : i+vecData.Dim/8])
}
case schemapb.DataType_FloatVector:
vecData := data.Data[field.FieldID].(*storage.FloatVectorFieldData)
builder := fBuilder.(*array.FixedSizeBinaryBuilder)
dim := vecData.Dim
data := vecData.Data
byteLength := dim * 4
length := len(data) / dim
builder.Reserve(length)
bytesData := make([]byte, byteLength)
for i := 0; i < length; i++ {
vec := data[i*dim : (i+1)*dim]
for j := range vec {
bytes := math.Float32bits(vec[j])
common.Endian.PutUint32(bytesData[j*4:], bytes)
}
builder.Append(bytesData)
}
case schemapb.DataType_Float16Vector:
vecData := data.Data[field.FieldID].(*storage.Float16VectorFieldData)
builder := fBuilder.(*array.FixedSizeBinaryBuilder)
dim := vecData.Dim
data := vecData.Data
byteLength := dim * 2
length := len(data) / byteLength
builder.Reserve(length)
for i := 0; i < length; i++ {
builder.Append(data[i*byteLength : (i+1)*byteLength])
}
default:
return merr.WrapErrParameterInvalidMsg("unknown type %v", field.DataType.String())
}
}
return nil
}
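
In the FloatVector branch above, each float32 is packed into 4 bytes via common.Endian (little-endian in Milvus) before being appended as fixed-size binary. A self-contained sketch of that encoding for a single dim-2 vector:

package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

func main() {
	vec := []float32{1.0, 2.0} // one row, dim = 2
	buf := make([]byte, 4*len(vec))
	for j, v := range vec {
		binary.LittleEndian.PutUint32(buf[j*4:], math.Float32bits(v))
	}
	fmt.Printf("% x\n", buf) // prints: 00 00 80 3f 00 00 00 40
}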

View File

@ -226,11 +226,11 @@ type commonConfig struct {
LockSlowLogInfoThreshold ParamItem `refreshable:"true"`
LockSlowLogWarnThreshold ParamItem `refreshable:"true"`
StorageScheme ParamItem `refreshable:"false"`
EnableStorageV2 ParamItem `refreshable:"false"`
TTMsgEnabled ParamItem `refreshable:"true"`
TraceLogMode ParamItem `refreshable:"true"`
StorageScheme ParamItem `refreshable:"false"`
EnableStorageV2 ParamItem `refreshable:"false"`
StoragePathPrefix ParamItem `refreshable:"false"`
TTMsgEnabled ParamItem `refreshable:"true"`
TraceLogMode ParamItem `refreshable:"true"`
BloomFilterSize ParamItem `refreshable:"true"`
MaxBloomFalsePositive ParamItem `refreshable:"true"`
}
@ -660,6 +660,13 @@ like the old password verification when updating the credential`,
}
p.StorageScheme.Init(base.mgr)
p.StoragePathPrefix = ParamItem{
Key: "common.storage.pathPrefix",
Version: "2.3.4",
DefaultValue: "",
}
p.StoragePathPrefix.Init(base.mgr)
p.TTMsgEnabled = ParamItem{
Key: "common.ttMsgEnabled",
Version: "2.3.2",