feat: Storage v2 binlog packed record reader and writer (#40221)

related: #39173

---------

Signed-off-by: shaoting-huang <shaoting.huang@zilliz.com>
sthuang 2025-03-03 10:24:02 +08:00 committed by GitHub
parent 8eb662b4dc
commit de02a3ebcc
15 changed files with 1079 additions and 113 deletions


@ -486,7 +486,7 @@ generate-mockery-flushcommon: getdeps
$(INSTALL_PATH)/mockery --name=Task --dir=$(PWD)/internal/flushcommon/syncmgr --output=$(PWD)/internal/flushcommon/syncmgr --filename=mock_task.go --with-expecter --structname=MockTask --outpkg=syncmgr --inpackage
$(INSTALL_PATH)/mockery --name=WriteBuffer --dir=$(PWD)/internal/flushcommon/writebuffer --output=$(PWD)/internal/flushcommon/writebuffer --filename=mock_write_buffer.go --with-expecter --structname=MockWriteBuffer --outpkg=writebuffer --inpackage
$(INSTALL_PATH)/mockery --name=BufferManager --dir=$(PWD)/internal/flushcommon/writebuffer --output=$(PWD)/internal/flushcommon/writebuffer --filename=mock_manager.go --with-expecter --structname=MockBufferManager --outpkg=writebuffer --inpackage
$(INSTALL_PATH)/mockery --name=BinlogIO --dir=$(PWD)/internal/flushcommon/io --output=$(PWD)/internal/flushcommon/io --filename=mock_binlogio.go --with-expecter --structname=MockBinlogIO --outpkg=io --inpackage
$(INSTALL_PATH)/mockery --name=BinlogIO --dir=$(PWD)/internal/flushcommon/io --output=$(PWD)/internal/mocks/flushcommon/mock_util --filename=mock_binlogio.go --with-expecter --structname=MockBinlogIO --outpkg=mock_util --inpackage=false
$(INSTALL_PATH)/mockery --name=FlowgraphManager --dir=$(PWD)/internal/flushcommon/pipeline --output=$(PWD)/internal/flushcommon/pipeline --filename=mock_fgmanager.go --with-expecter --structname=MockFlowgraphManager --outpkg=pipeline --inpackage
generate-mockery-metastore: getdeps


@ -32,7 +32,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/flushcommon/io"
"github.com/milvus-io/milvus/internal/mocks/flushcommon/mock_util"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
@ -49,7 +49,7 @@ func TestClusteringCompactionTaskSuite(t *testing.T) {
type ClusteringCompactionTaskSuite struct {
suite.Suite
mockBinlogIO *io.MockBinlogIO
mockBinlogIO *mock_util.MockBinlogIO
mockAlloc *allocator.MockAllocator
mockID atomic.Int64
@ -63,7 +63,7 @@ func (s *ClusteringCompactionTaskSuite) SetupSuite() {
}
func (s *ClusteringCompactionTaskSuite) SetupTest() {
s.mockBinlogIO = io.NewMockBinlogIO(s.T())
s.mockBinlogIO = mock_util.NewMockBinlogIO(s.T())
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Maybe()
@ -490,7 +490,7 @@ func (s *ClusteringCompactionTaskSuite) TestGenerateBM25Stats() {
s.Run("upload failed", func() {
segmentID := int64(1)
mockBinlogIO := io.NewMockBinlogIO(s.T())
mockBinlogIO := mock_util.NewMockBinlogIO(s.T())
mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(fmt.Errorf("mock error")).Once()
task := &clusteringCompactionTask{
@ -566,7 +566,7 @@ func (s *ClusteringCompactionTaskSuite) TestGeneratePkStats() {
kvs, _, err := serializeWrite(context.TODO(), s.mockAlloc, segWriter)
s.NoError(err)
mockBinlogIO := io.NewMockBinlogIO(s.T())
mockBinlogIO := mock_util.NewMockBinlogIO(s.T())
mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return(lo.Values(kvs), nil)
mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(fmt.Errorf("mock error"))
task := &clusteringCompactionTask{


@ -28,9 +28,9 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/flushcommon/io"
"github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/mocks/flushcommon/mock_util"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log"
@ -47,7 +47,7 @@ func TestLevelZeroCompactionTaskSuite(t *testing.T) {
type LevelZeroCompactionTaskSuite struct {
suite.Suite
mockBinlogIO *io.MockBinlogIO
mockBinlogIO *mock_util.MockBinlogIO
task *LevelZeroCompactionTask
dData *storage.DeleteData
@ -56,7 +56,7 @@ type LevelZeroCompactionTaskSuite struct {
func (s *LevelZeroCompactionTaskSuite) SetupTest() {
paramtable.Init()
s.mockBinlogIO = io.NewMockBinlogIO(s.T())
s.mockBinlogIO = mock_util.NewMockBinlogIO(s.T())
// plan of the task is unset
s.task = NewLevelZeroCompactionTask(context.Background(), s.mockBinlogIO, nil, nil)


@ -32,9 +32,9 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/flushcommon/io"
"github.com/milvus-io/milvus/internal/flushcommon/metacache"
"github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle"
"github.com/milvus-io/milvus/internal/mocks/flushcommon/mock_util"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
@ -51,7 +51,7 @@ func TestMixCompactionTaskSuite(t *testing.T) {
type MixCompactionTaskSuite struct {
suite.Suite
mockBinlogIO *io.MockBinlogIO
mockBinlogIO *mock_util.MockBinlogIO
meta *etcdpb.CollectionMeta
segWriter *SegmentWriter
@ -64,7 +64,7 @@ func (s *MixCompactionTaskSuite) SetupSuite() {
}
func (s *MixCompactionTaskSuite) SetupTest() {
s.mockBinlogIO = io.NewMockBinlogIO(s.T())
s.mockBinlogIO = mock_util.NewMockBinlogIO(s.T())
s.meta = genTestCollectionMeta()
@ -90,7 +90,7 @@ func (s *MixCompactionTaskSuite) SetupTest() {
}
func (s *MixCompactionTaskSuite) SetupBM25() {
s.mockBinlogIO = io.NewMockBinlogIO(s.T())
s.mockBinlogIO = mock_util.NewMockBinlogIO(s.T())
s.meta = genTestCollectionMetaWithBM25()
plan := &datapb.CompactionPlan{


@ -29,7 +29,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/compaction"
"github.com/milvus-io/milvus/internal/flushcommon/io"
"github.com/milvus-io/milvus/internal/mocks/flushcommon/mock_util"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
@ -51,7 +51,7 @@ type TaskStatsSuite struct {
clusterID string
schema *schemapb.CollectionSchema
mockBinlogIO *io.MockBinlogIO
mockBinlogIO *mock_util.MockBinlogIO
segWriter *compaction.SegmentWriter
}
@ -63,7 +63,7 @@ func (s *TaskStatsSuite) SetupSuite() {
func (s *TaskStatsSuite) SetupSubTest() {
paramtable.Init()
s.mockBinlogIO = io.NewMockBinlogIO(s.T())
s.mockBinlogIO = mock_util.NewMockBinlogIO(s.T())
}
func (s *TaskStatsSuite) GenSegmentWriterWithBM25(magic int64) {


@ -1,6 +1,6 @@
// Code generated by mockery v2.46.0. DO NOT EDIT.
package io
package mock_util
import (
context "context"


@ -25,6 +25,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/storagecommon"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
)
@ -35,17 +36,20 @@ const (
)
type rwOptions struct {
version int64
bufferSize uint64
downloader func(ctx context.Context, paths []string) ([][]byte, error)
uploader func(ctx context.Context, kvs map[string][]byte) error
version int64
bufferSize int64
downloader func(ctx context.Context, paths []string) ([][]byte, error)
uploader func(ctx context.Context, kvs map[string][]byte) error
multiPartUploadSize int64
columnGroups []storagecommon.ColumnGroup
}
type RwOption func(*rwOptions)
func defaultRwOptions() *rwOptions {
return &rwOptions{
bufferSize: 32 * 1024 * 1024,
bufferSize: 32 * 1024 * 1024,
multiPartUploadSize: 10 * 1024 * 1024,
}
}
@ -55,12 +59,18 @@ func WithVersion(version int64) RwOption {
}
}
func WithBufferSize(bufferSize uint64) RwOption {
func WithBufferSize(bufferSize int64) RwOption {
return func(options *rwOptions) {
options.bufferSize = bufferSize
}
}
func WithMultiPartUploadSize(multiPartUploadSize int64) RwOption {
return func(options *rwOptions) {
options.multiPartUploadSize = multiPartUploadSize
}
}
func WithDownloader(downloader func(ctx context.Context, paths []string) ([][]byte, error)) RwOption {
return func(options *rwOptions) {
options.downloader = downloader
@ -73,6 +83,12 @@ func WithUploader(uploader func(ctx context.Context, kvs map[string][]byte) erro
}
}
func WithColumnGroups(columnGroups []storagecommon.ColumnGroup) RwOption {
return func(options *rwOptions) {
options.columnGroups = columnGroups
}
}
func NewBinlogRecordReader(ctx context.Context, binlogs []*datapb.FieldBinlog, schema *schemapb.CollectionSchema, option ...RwOption) (RecordReader, error) {
rwOptions := defaultRwOptions()
for _, opt := range option {
@ -103,7 +119,19 @@ func NewBinlogRecordReader(ctx context.Context, binlogs []*datapb.FieldBinlog, s
return blobs, nil
})
case StorageV2:
// TODO: integrate v2
if len(binlogs) <= 0 {
return nil, sio.EOF
}
binlogLists := lo.Map(binlogs, func(fieldBinlog *datapb.FieldBinlog, _ int) []*datapb.Binlog {
return fieldBinlog.GetBinlogs()
})
paths := make([][]string, len(binlogLists[0]))
for _, binlogs := range binlogLists {
for j, binlog := range binlogs {
paths[j] = append(paths[j], binlog.GetLogPath())
}
}
return newPackedRecordReader(paths, schema, rwOptions.bufferSize)
}
return nil, merr.WrapErrServiceInternal(fmt.Sprintf("unsupported storage version %d", rwOptions.version))
}
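For orientation, a hedged illustration of the storage v2 read path set up above (the log paths below are made up, not from this PR): each input FieldBinlog is one column group listing its chunk files in order, and the loop transposes them so that every row of `paths` holds one chunk, i.e. one file per column group, which packedRecordReader opens with a single packed.PackedReader and advances past on io.EOF.

```go
package example

import "github.com/milvus-io/milvus/pkg/v2/proto/datapb"

// Made-up layout: two column groups, each written as two chunk files.
var exampleBinlogs = []*datapb.FieldBinlog{
	{FieldID: 0, Binlogs: []*datapb.Binlog{{LogPath: "seg1/cg0/100"}, {LogPath: "seg1/cg0/102"}}},
	{FieldID: 1, Binlogs: []*datapb.Binlog{{LogPath: "seg1/cg1/101"}, {LogPath: "seg1/cg1/103"}}},
}

// After the transposition in NewBinlogRecordReader, newPackedRecordReader receives:
//
//	[][]string{
//		{"seg1/cg0/100", "seg1/cg1/101"}, // chunk 0: one file per column group
//		{"seg1/cg0/102", "seg1/cg1/103"}, // chunk 1: opened once chunk 0 hits io.EOF
//	}
```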
@ -116,20 +144,23 @@ func NewBinlogRecordWriter(ctx context.Context, collectionID, partitionID, segme
for _, opt := range option {
opt(rwOptions)
}
blobsWriter := func(blobs []*Blob) error {
kvs := make(map[string][]byte, len(blobs))
for _, blob := range blobs {
kvs[blob.Key] = blob.Value
}
return rwOptions.uploader(ctx, kvs)
}
switch rwOptions.version {
case StorageV1:
blobsWriter := func(blobs []*Blob) error {
kvs := make(map[string][]byte, len(blobs))
for _, blob := range blobs {
kvs[blob.Key] = blob.Value
}
return rwOptions.uploader(ctx, kvs)
}
return newCompositeBinlogRecordWriter(collectionID, partitionID, segmentID, schema,
blobsWriter, allocator, chunkSize, rootPath, maxRowNum,
)
case StorageV2:
// TODO: integrate v2
return newPackedBinlogRecordWriter(collectionID, partitionID, segmentID, schema,
blobsWriter, allocator, chunkSize, rootPath, maxRowNum,
rwOptions.bufferSize, rwOptions.multiPartUploadSize, rwOptions.columnGroups,
)
}
return nil, merr.WrapErrServiceInternal(fmt.Sprintf("unsupported storage version %d", rwOptions.version))
}
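Putting the new options together, a minimal, hedged sketch of how a caller might opt into the packed v2 writer. Everything here is illustrative rather than code from this PR: the IDs, chunk size, root path, row limit, schema, allocator, column groups, records and uploader are placeholders the caller is assumed to already have (compare the rw_test.go suite below).

```go
package example

import (
	"context"

	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/allocator"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/internal/storagecommon"
	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
)

// writeV2 writes the given records through the storage v2 packed writer and
// returns the resulting insert binlogs, keyed by column-group ID.
func writeV2(ctx context.Context, schema *schemapb.CollectionSchema,
	alloc allocator.Interface, groups []storagecommon.ColumnGroup,
	upload func(context.Context, map[string][]byte) error, recs []storage.Record,
) (map[storage.FieldID]*datapb.FieldBinlog, error) {
	w, err := storage.NewBinlogRecordWriter(ctx,
		1, 1, 1, // placeholder collectionID, partitionID, segmentID
		schema, alloc,
		1024,   // chunkSize
		"/tmp", // rootPath
		1000,   // maxRowNum
		storage.WithVersion(storage.StorageV2),
		storage.WithBufferSize(32*1024*1024),          // packing buffer
		storage.WithMultiPartUploadSize(10*1024*1024), // 0 disables multi-part upload
		storage.WithColumnGroups(groups),              // e.g. from storagecommon.SplitByFieldSize
		storage.WithUploader(upload),                  // used for the pk/BM25 stats blobs
	)
	if err != nil {
		return nil, err
	}
	for _, r := range recs {
		if err := w.Write(r); err != nil {
			return nil, err
		}
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	fieldBinlogs, _, _ := w.GetLogs()
	return fieldBinlogs, nil
}
```

Reading the segment back only needs `WithVersion(StorageV2)`: NewBinlogRecordReader rebuilds the chunk-by-column-group path matrix from the returned FieldBinlogs, as illustrated after the reader hunk above.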

internal/storage/rw_test.go (new file, 381 lines)

@ -0,0 +1,381 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"context"
"io"
"math"
"strconv"
"sync/atomic"
"testing"
"time"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"google.golang.org/grpc"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/mocks/flushcommon/mock_util"
"github.com/milvus-io/milvus/internal/storagecommon"
"github.com/milvus-io/milvus/internal/util/initcore"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
type mockIDAllocator struct{}
func (tso *mockIDAllocator) AllocID(ctx context.Context, req *rootcoordpb.AllocIDRequest, opts ...grpc.CallOption) (*rootcoordpb.AllocIDResponse, error) {
return &rootcoordpb.AllocIDResponse{
Status: merr.Success(),
ID: int64(1),
Count: req.Count,
}, nil
}
func newMockIDAllocator() *mockIDAllocator {
return &mockIDAllocator{}
}
func TestPackedBinlogRecordSuite(t *testing.T) {
suite.Run(t, new(PackedBinlogRecordSuite))
}
type PackedBinlogRecordSuite struct {
suite.Suite
ctx context.Context
mockID atomic.Int64
logIDAlloc allocator.Interface
mockBinlogIO *mock_util.MockBinlogIO
collectionID UniqueID
partitionID UniqueID
segmentID UniqueID
schema *schemapb.CollectionSchema
rootPath string
maxRowNum int64
chunkSize uint64
}
func (s *PackedBinlogRecordSuite) SetupTest() {
ctx := context.Background()
s.ctx = ctx
logIDAlloc := allocator.NewLocalAllocator(1, math.MaxInt64)
s.logIDAlloc = logIDAlloc
initcore.InitLocalArrowFileSystem("/tmp")
s.mockID.Store(time.Now().UnixMilli())
s.mockBinlogIO = mock_util.NewMockBinlogIO(s.T())
s.collectionID = UniqueID(0)
s.partitionID = UniqueID(0)
s.segmentID = UniqueID(0)
s.schema = generateTestSchema()
s.rootPath = "/tmp"
s.maxRowNum = int64(1000)
s.chunkSize = uint64(1024)
}
func genTestColumnGroups(schema *schemapb.CollectionSchema) []storagecommon.ColumnGroup {
fieldBinlogs := make([]*datapb.FieldBinlog, 0)
for i, field := range schema.Fields {
fieldBinlogs = append(fieldBinlogs, &datapb.FieldBinlog{
FieldID: field.FieldID,
Binlogs: []*datapb.Binlog{
{
EntriesNum: int64(10 * (i + 1)),
LogSize: int64(1000 / (i + 1)),
},
},
})
}
return storagecommon.SplitByFieldSize(fieldBinlogs, 10)
}
func (s *PackedBinlogRecordSuite) TestPackedBinlogRecordIntegration() {
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil)
rows := 10000
readBatchSize := 1024
columnGroups := genTestColumnGroups(s.schema)
wOption := []RwOption{
WithUploader(func(ctx context.Context, kvs map[string][]byte) error {
return s.mockBinlogIO.Upload(ctx, kvs)
}),
WithVersion(StorageV2),
WithMultiPartUploadSize(0),
WithBufferSize(1 * 1024 * 1024), // 1MB
WithColumnGroups(columnGroups),
}
w, err := NewBinlogRecordWriter(s.ctx, s.collectionID, s.partitionID, s.segmentID, s.schema, s.logIDAlloc, s.chunkSize, s.rootPath, s.maxRowNum, wOption...)
s.NoError(err)
blobs, err := generateTestData(rows)
s.NoError(err)
reader, err := NewBinlogDeserializeReader(generateTestSchema(), MakeBlobsReader(blobs))
s.NoError(err)
defer reader.Close()
for i := 1; i <= rows; i++ {
err = reader.Next()
s.NoError(err)
value := reader.Value()
rec, err := ValueSerializer([]*Value{value}, s.schema.Fields)
s.NoError(err)
err = w.Write(rec)
s.NoError(err)
}
err = w.Close()
s.NoError(err)
writtenUncompressed := w.GetWrittenUncompressed()
s.Positive(writtenUncompressed)
rowNum := w.GetRowNum()
s.Equal(rowNum, int64(rows))
fieldBinlogs, statsLog, bm25StatsLog := w.GetLogs()
s.Equal(len(fieldBinlogs), len(columnGroups))
for _, columnGroup := range fieldBinlogs {
s.Equal(len(columnGroup.Binlogs), 1)
s.Equal(columnGroup.Binlogs[0].EntriesNum, int64(rows))
s.Positive(columnGroup.Binlogs[0].MemorySize)
}
s.Equal(len(statsLog.Binlogs), 1)
s.Equal(statsLog.Binlogs[0].EntriesNum, int64(rows))
s.Equal(len(bm25StatsLog), 0)
binlogs := lo.Values(fieldBinlogs)
rOption := []RwOption{
WithVersion(StorageV2),
}
r, err := NewBinlogRecordReader(s.ctx, binlogs, s.schema, rOption...)
s.NoError(err)
for i := 0; i < rows/readBatchSize+1; i++ {
rec, err := r.Next()
s.NoError(err)
if i < rows/readBatchSize {
s.Equal(rec.Len(), readBatchSize)
} else {
s.Equal(rec.Len(), rows%readBatchSize)
}
}
_, err = r.Next()
s.Equal(err, io.EOF)
err = r.Close()
s.NoError(err)
}
func (s *PackedBinlogRecordSuite) TestGenerateBM25Stats() {
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil)
s.schema = genCollectionSchemaWithBM25()
columnGroups := genTestColumnGroups(s.schema)
wOption := []RwOption{
WithUploader(func(ctx context.Context, kvs map[string][]byte) error {
return s.mockBinlogIO.Upload(ctx, kvs)
}),
WithVersion(StorageV2),
WithMultiPartUploadSize(0),
WithBufferSize(10 * 1024 * 1024), // 10MB
WithColumnGroups(columnGroups),
}
v := &Value{
PK: NewVarCharPrimaryKey("0"),
Timestamp: int64(tsoutil.ComposeTSByTime(getMilvusBirthday(), 0)),
Value: genRowWithBM25(0),
}
rec, err := ValueSerializer([]*Value{v}, s.schema.Fields)
s.NoError(err)
w, err := NewBinlogRecordWriter(s.ctx, s.collectionID, s.partitionID, s.segmentID, s.schema, s.logIDAlloc, s.chunkSize, s.rootPath, s.maxRowNum, wOption...)
s.NoError(err)
err = w.Write(rec)
s.NoError(err)
err = w.Close()
s.NoError(err)
fieldBinlogs, statsLog, bm25StatsLog := w.GetLogs()
s.Equal(len(fieldBinlogs), len(columnGroups))
s.Equal(statsLog.Binlogs[0].EntriesNum, int64(1))
s.Positive(statsLog.Binlogs[0].MemorySize)
s.Equal(len(bm25StatsLog), 1)
s.Equal(bm25StatsLog[102].Binlogs[0].EntriesNum, int64(1))
s.Positive(bm25StatsLog[102].Binlogs[0].MemorySize)
}
func (s *PackedBinlogRecordSuite) TestUnsuportedStorageVersion() {
wOption := []RwOption{
WithVersion(-1),
}
_, err := NewBinlogRecordWriter(s.ctx, s.collectionID, s.partitionID, s.segmentID, s.schema, s.logIDAlloc, s.chunkSize, s.rootPath, s.maxRowNum, wOption...)
s.Error(err)
rOption := []RwOption{
WithVersion(-1),
}
_, err = NewBinlogRecordReader(s.ctx, []*datapb.FieldBinlog{{}}, s.schema, rOption...)
s.Error(err)
}
func (s *PackedBinlogRecordSuite) TestNoPrimaryKeyError() {
s.schema = &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
{FieldID: 13, Name: "field12", DataType: schemapb.DataType_JSON},
}}
columnGroups := genTestColumnGroups(s.schema)
wOption := []RwOption{
WithVersion(StorageV2),
WithColumnGroups(columnGroups),
}
_, err := NewBinlogRecordWriter(s.ctx, s.collectionID, s.partitionID, s.segmentID, s.schema, s.logIDAlloc, s.chunkSize, s.rootPath, s.maxRowNum, wOption...)
s.Error(err)
}
func (s *PackedBinlogRecordSuite) TestConvertArrowSchemaError() {
s.schema = &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
{FieldID: 14, Name: "field13", DataType: schemapb.DataType_Float16Vector, TypeParams: []*commonpb.KeyValuePair{}},
}}
columnGroups := genTestColumnGroups(s.schema)
wOption := []RwOption{
WithVersion(StorageV2),
WithColumnGroups(columnGroups),
}
_, err := NewBinlogRecordWriter(s.ctx, s.collectionID, s.partitionID, s.segmentID, s.schema, s.logIDAlloc, s.chunkSize, s.rootPath, s.maxRowNum, wOption...)
s.Error(err)
}
func (s *PackedBinlogRecordSuite) TestEmptyColumnGroup() {
wOption := []RwOption{
WithVersion(StorageV2),
}
_, err := NewBinlogRecordWriter(s.ctx, s.collectionID, s.partitionID, s.segmentID, s.schema, s.logIDAlloc, s.chunkSize, s.rootPath, s.maxRowNum, wOption...)
s.Error(err)
}
func (s *PackedBinlogRecordSuite) TestEmptyBinlog() {
rOption := []RwOption{
WithVersion(StorageV2),
}
_, err := NewBinlogRecordReader(s.ctx, []*datapb.FieldBinlog{}, s.schema, rOption...)
s.Error(err)
}
func (s *PackedBinlogRecordSuite) TestAllocIDExhausedError() {
columnGroups := genTestColumnGroups(s.schema)
wOption := []RwOption{
WithVersion(StorageV2),
WithColumnGroups(columnGroups),
}
logIDAlloc := allocator.NewLocalAllocator(1, 1)
w, err := NewBinlogRecordWriter(s.ctx, s.collectionID, s.partitionID, s.segmentID, s.schema, logIDAlloc, s.chunkSize, s.rootPath, s.maxRowNum, wOption...)
s.NoError(err)
size := 10
blobs, err := generateTestData(size)
s.NoError(err)
reader, err := NewBinlogDeserializeReader(generateTestSchema(), MakeBlobsReader(blobs))
s.NoError(err)
defer reader.Close()
for i := 0; i < size; i++ {
err = reader.Next()
s.NoError(err)
value := reader.Value()
rec, err := ValueSerializer([]*Value{value}, s.schema.Fields)
s.NoError(err)
err = w.Write(rec)
s.Error(err)
}
}
func genRowWithBM25(magic int64) map[int64]interface{} {
ts := tsoutil.ComposeTSByTime(getMilvusBirthday(), 0)
return map[int64]interface{}{
common.RowIDField: magic,
common.TimeStampField: int64(ts),
100: strconv.FormatInt(magic, 10),
101: "varchar",
102: typeutil.CreateAndSortSparseFloatRow(map[uint32]float32{1: 1}),
}
}
func genCollectionSchemaWithBM25() *schemapb.CollectionSchema {
return &schemapb.CollectionSchema{
Name: "schema",
Description: "schema",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField,
Name: "row_id",
DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField,
Name: "Timestamp",
DataType: schemapb.DataType_Int64,
},
{
FieldID: 100,
Name: "pk",
DataType: schemapb.DataType_VarChar,
IsPrimaryKey: true,
},
{
FieldID: 101,
Name: "text",
DataType: schemapb.DataType_VarChar,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.MaxLengthKey,
Value: "8",
},
},
},
{
FieldID: 102,
Name: "sparse",
DataType: schemapb.DataType_SparseFloatVector,
},
},
Functions: []*schemapb.FunctionSchema{{
Name: "BM25",
Id: 100,
Type: schemapb.FunctionType_BM25,
InputFieldNames: []string{"text"},
InputFieldIds: []int64{101},
OutputFieldNames: []string{"sparse"},
OutputFieldIds: []int64{102},
}},
}
}
func getMilvusBirthday() time.Time {
return time.Date(2019, time.Month(5), 30, 0, 0, 0, 0, time.UTC)
}


@ -21,32 +21,73 @@ import (
"io"
"github.com/apache/arrow/go/v17/arrow"
"github.com/apache/arrow/go/v17/arrow/array"
"github.com/samber/lo"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/storagecommon"
"github.com/milvus-io/milvus/internal/storagev2/packed"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/etcdpb"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/metautil"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
type packedRecordReader struct {
paths [][]string
chunk int
reader *packed.PackedReader
bufferSize int64
schema *schemapb.CollectionSchema
field2Col map[FieldID]int
bufferSize int64
arrowSchema *arrow.Schema
field2Col map[FieldID]int
}
var _ RecordReader = (*packedRecordReader)(nil)
func (pr *packedRecordReader) iterateNextBatch() error {
if pr.reader != nil {
if err := pr.reader.Close(); err != nil {
return err
}
}
if pr.chunk >= len(pr.paths) {
return io.EOF
}
reader, err := packed.NewPackedReader(pr.paths[pr.chunk], pr.arrowSchema, pr.bufferSize)
pr.chunk++
if err != nil {
return merr.WrapErrParameterInvalid("New binlog record packed reader error: %s", err.Error())
}
pr.reader = reader
return nil
}
func (pr *packedRecordReader) Next() (Record, error) {
if pr.reader == nil {
return nil, io.EOF
if err := pr.iterateNextBatch(); err != nil {
return nil, err
}
}
rec, err := pr.reader.ReadNext()
if err != nil || rec == nil {
return nil, io.EOF
for {
rec, err := pr.reader.ReadNext()
if err == io.EOF {
if err := pr.iterateNextBatch(); err != nil {
return nil, err
}
continue
} else if err != nil {
return nil, err
}
return NewSimpleArrowRecord(rec, pr.field2Col), nil
}
return NewSimpleArrowRecord(rec, pr.field2Col), nil
}
func (pr *packedRecordReader) Close() error {
@ -56,30 +97,26 @@ func (pr *packedRecordReader) Close() error {
return nil
}
func newPackedRecordReader(paths []string, schema *schemapb.CollectionSchema, bufferSize int64,
func newPackedRecordReader(paths [][]string, schema *schemapb.CollectionSchema, bufferSize int64,
) (*packedRecordReader, error) {
arrowSchema, err := ConvertToArrowSchema(schema.Fields)
if err != nil {
return nil, merr.WrapErrParameterInvalid("convert collection schema [%s] to arrow schema error: %s", schema.Name, err.Error())
}
reader, err := packed.NewPackedReader(paths, arrowSchema, bufferSize)
if err != nil {
return nil, merr.WrapErrParameterInvalid("New binlog record packed reader error: %s", err.Error())
}
field2Col := make(map[FieldID]int)
for i, field := range schema.Fields {
field2Col[field.FieldID] = i
}
return &packedRecordReader{
reader: reader,
schema: schema,
bufferSize: bufferSize,
field2Col: field2Col,
paths: paths,
bufferSize: bufferSize,
arrowSchema: arrowSchema,
field2Col: field2Col,
}, nil
}
func NewPackedDeserializeReader(paths []string, schema *schemapb.CollectionSchema,
bufferSize int64, pkFieldID FieldID,
func NewPackedDeserializeReader(paths [][]string, schema *schemapb.CollectionSchema,
bufferSize int64,
) (*DeserializeReader[*Value], error) {
reader, err := newPackedRecordReader(paths, schema, bufferSize)
if err != nil {
@ -87,12 +124,23 @@ func NewPackedDeserializeReader(paths []string, schema *schemapb.CollectionSchem
}
return NewDeserializeReader(reader, func(r Record, v []*Value) error {
pkField := func() *schemapb.FieldSchema {
for _, field := range schema.Fields {
if field.GetIsPrimaryKey() {
return field
}
}
return nil
}()
if pkField == nil {
return merr.WrapErrServiceInternal("no primary key field found")
}
rec, ok := r.(*simpleArrowRecord)
if !ok {
return merr.WrapErrServiceInternal("can not cast to simple arrow record")
}
schema := reader.schema
numFields := len(schema.Fields)
for i := 0; i < rec.Len(); i++ {
if v[i] == nil {
@ -124,8 +172,8 @@ func NewPackedDeserializeReader(paths []string, schema *schemapb.CollectionSchem
value.ID = rowID
value.Timestamp = m[common.TimeStampField].(int64)
pkCol := rec.field2Col[pkFieldID]
pk, err := GenPrimaryKeyByRawData(m[pkFieldID], schema.Fields[pkCol].DataType)
pkCol := rec.field2Col[pkField.FieldID]
pk, err := GenPrimaryKeyByRawData(m[pkField.FieldID], schema.Fields[pkCol].DataType)
if err != nil {
return err
}
@ -141,16 +189,15 @@ func NewPackedDeserializeReader(paths []string, schema *schemapb.CollectionSchem
var _ RecordWriter = (*packedRecordWriter)(nil)
type packedRecordWriter struct {
writer *packed.PackedWriter
bufferSize int64
multiPartUploadSize int64
columnGroups [][]int
paths []string
schema *arrow.Schema
numRows int
writtenUncompressed uint64
writer *packed.PackedWriter
bufferSize int64
multiPartUploadSize int64
columnGroups []storagecommon.ColumnGroup
paths []string
schema *arrow.Schema
rowNum int64
writtenUncompressed uint64
columnGroupUncompressed []uint64
}
func (pw *packedRecordWriter) Write(r Record) error {
@ -158,9 +205,16 @@ func (pw *packedRecordWriter) Write(r Record) error {
if !ok {
return merr.WrapErrServiceInternal("can not cast to simple arrow record")
}
pw.numRows += r.Len()
for _, arr := range rec.r.Columns() {
pw.writtenUncompressed += uint64(calculateArraySize(arr))
pw.rowNum += int64(r.Len())
for col, arr := range rec.r.Columns() {
size := uint64(calculateArraySize(arr))
pw.writtenUncompressed += size
for columnGroup, group := range pw.columnGroups {
if lo.Contains(group.Columns, col) {
pw.columnGroupUncompressed[columnGroup] += size
break
}
}
}
defer rec.Release()
return pw.writer.WriteRecordBatch(rec.r)
@ -170,6 +224,14 @@ func (pw *packedRecordWriter) GetWrittenUncompressed() uint64 {
return pw.writtenUncompressed
}
func (pw *packedRecordWriter) GetWrittenPaths() []string {
return pw.paths
}
func (pw *packedRecordWriter) GetWrittenRowNum() int64 {
return pw.rowNum
}
func (pw *packedRecordWriter) Close() error {
if pw.writer != nil {
return pw.writer.Close()
@ -177,32 +239,331 @@ func (pw *packedRecordWriter) Close() error {
return nil
}
func NewPackedRecordWriter(paths []string, schema *arrow.Schema, bufferSize int64, multiPartUploadSize int64, columnGroups [][]int) (*packedRecordWriter, error) {
func NewPackedRecordWriter(paths []string, schema *arrow.Schema, bufferSize int64, multiPartUploadSize int64, columnGroups []storagecommon.ColumnGroup) (*packedRecordWriter, error) {
writer, err := packed.NewPackedWriter(paths, schema, bufferSize, multiPartUploadSize, columnGroups)
if err != nil {
return nil, merr.WrapErrServiceInternal(
fmt.Sprintf("can not new packed record writer %s", err.Error()))
}
columnGroupUncompressed := make([]uint64, len(columnGroups))
return &packedRecordWriter{
writer: writer,
schema: schema,
bufferSize: bufferSize,
paths: paths,
writer: writer,
schema: schema,
bufferSize: bufferSize,
paths: paths,
columnGroups: columnGroups,
columnGroupUncompressed: columnGroupUncompressed,
}, nil
}
func NewPackedSerializeWriter(paths []string, schema *schemapb.CollectionSchema, bufferSize int64, multiPartUploadSize int64, columnGroups [][]int, batchSize int) (*SerializeWriter[*Value], error) {
func NewPackedSerializeWriter(paths []string, schema *schemapb.CollectionSchema, bufferSize int64, multiPartUploadSize int64, columnGroups []storagecommon.ColumnGroup, batchSize int) (*SerializeWriter[*Value], error) {
arrowSchema, err := ConvertToArrowSchema(schema.Fields)
if err != nil {
return nil, merr.WrapErrServiceInternal(
fmt.Sprintf("can not convert collection schema %s to arrow schema: %s", schema.Name, err.Error()))
}
packedRecordWriter, err := NewPackedRecordWriter(paths, arrowSchema, bufferSize, multiPartUploadSize, columnGroups)
PackedBinlogRecordWriter, err := NewPackedRecordWriter(paths, arrowSchema, bufferSize, multiPartUploadSize, columnGroups)
if err != nil {
return nil, merr.WrapErrServiceInternal(
fmt.Sprintf("can not new packed record writer %s", err.Error()))
}
return NewSerializeRecordWriter[*Value](packedRecordWriter, func(v []*Value) (Record, error) {
return NewSerializeRecordWriter[*Value](PackedBinlogRecordWriter, func(v []*Value) (Record, error) {
return ValueSerializer(v, schema.Fields)
}, batchSize), nil
}
var _ BinlogRecordWriter = (*PackedBinlogRecordWriter)(nil)
type PackedBinlogRecordWriter struct {
// attributes
collectionID UniqueID
partitionID UniqueID
segmentID UniqueID
schema *schemapb.CollectionSchema
BlobsWriter ChunkedBlobsWriter
allocator allocator.Interface
chunkSize uint64
rootPath string
maxRowNum int64
arrowSchema *arrow.Schema
bufferSize int64
multiPartUploadSize int64
columnGroups []storagecommon.ColumnGroup
// writer and stats generated at runtime
writer *packedRecordWriter
pkstats *PrimaryKeyStats
bm25Stats map[int64]*BM25Stats
tsFrom typeutil.Timestamp
tsTo typeutil.Timestamp
rowNum int64
writtenUncompressed uint64
// results
fieldBinlogs map[FieldID]*datapb.FieldBinlog
statsLog *datapb.FieldBinlog
bm25StatsLog map[FieldID]*datapb.FieldBinlog
}
func (pw *PackedBinlogRecordWriter) Write(r Record) error {
if err := pw.initWriters(); err != nil {
return err
}
tsArray := r.Column(common.TimeStampField).(*array.Int64)
rows := r.Len()
for i := 0; i < rows; i++ {
ts := typeutil.Timestamp(tsArray.Value(i))
if ts < pw.tsFrom {
pw.tsFrom = ts
}
if ts > pw.tsTo {
pw.tsTo = ts
}
switch schemapb.DataType(pw.pkstats.PkType) {
case schemapb.DataType_Int64:
pkArray := r.Column(pw.pkstats.FieldID).(*array.Int64)
pk := &Int64PrimaryKey{
Value: pkArray.Value(i),
}
pw.pkstats.Update(pk)
case schemapb.DataType_VarChar:
pkArray := r.Column(pw.pkstats.FieldID).(*array.String)
pk := &VarCharPrimaryKey{
Value: pkArray.Value(i),
}
pw.pkstats.Update(pk)
default:
panic("invalid data type")
}
for fieldID, stats := range pw.bm25Stats {
field, ok := r.Column(fieldID).(*array.Binary)
if !ok {
return fmt.Errorf("bm25 field value not found")
}
stats.AppendBytes(field.Value(i))
}
}
err := pw.writer.Write(r)
if err != nil {
return merr.WrapErrServiceInternal(fmt.Sprintf("write record batch error: %s", err.Error()))
}
return nil
}
func (pw *PackedBinlogRecordWriter) initWriters() error {
if pw.writer == nil {
logIdStart, _, err := pw.allocator.Alloc(uint32(len(pw.columnGroups)))
if err != nil {
return err
}
paths := []string{}
for columnGroup := range pw.columnGroups {
path := metautil.BuildInsertLogPath(pw.rootPath, pw.collectionID, pw.partitionID, pw.segmentID, typeutil.UniqueID(columnGroup), logIdStart)
paths = append(paths, path)
logIdStart++
}
pw.writer, err = NewPackedRecordWriter(paths, pw.arrowSchema, pw.bufferSize, pw.multiPartUploadSize, pw.columnGroups)
if err != nil {
return merr.WrapErrServiceInternal(fmt.Sprintf("can not new packed record writer %s", err.Error()))
}
}
return nil
}
func (pw *PackedBinlogRecordWriter) GetWrittenUncompressed() uint64 {
return pw.writtenUncompressed
}
func (pw *PackedBinlogRecordWriter) Close() error {
pw.finalizeBinlogs()
if err := pw.writeStats(); err != nil {
return err
}
if err := pw.writeBm25Stats(); err != nil {
return err
}
if err := pw.writer.Close(); err != nil {
return err
}
return nil
}
func (pw *PackedBinlogRecordWriter) finalizeBinlogs() {
if pw.writer == nil {
return
}
pw.rowNum = pw.writer.GetWrittenRowNum()
pw.writtenUncompressed = pw.writer.GetWrittenUncompressed()
if pw.fieldBinlogs == nil {
pw.fieldBinlogs = make(map[FieldID]*datapb.FieldBinlog, len(pw.columnGroups))
}
for columnGroup := range pw.columnGroups {
columnGroupID := typeutil.UniqueID(columnGroup)
if _, exists := pw.fieldBinlogs[columnGroupID]; !exists {
pw.fieldBinlogs[columnGroupID] = &datapb.FieldBinlog{
FieldID: columnGroupID,
}
}
pw.fieldBinlogs[columnGroupID].Binlogs = append(pw.fieldBinlogs[columnGroupID].Binlogs, &datapb.Binlog{
LogSize: int64(pw.writer.columnGroupUncompressed[columnGroup]), // TODO: should provide the log size of each column group file in storage v2
MemorySize: int64(pw.writer.columnGroupUncompressed[columnGroup]),
LogPath: pw.writer.GetWrittenPaths()[columnGroupID],
EntriesNum: pw.writer.GetWrittenRowNum(),
TimestampFrom: pw.tsFrom,
TimestampTo: pw.tsTo,
})
}
}
func (pw *PackedBinlogRecordWriter) writeStats() error {
if pw.pkstats == nil {
return nil
}
id, err := pw.allocator.AllocOne()
if err != nil {
return err
}
codec := NewInsertCodecWithSchema(&etcdpb.CollectionMeta{
ID: pw.collectionID,
Schema: pw.schema,
})
sblob, err := codec.SerializePkStats(pw.pkstats, pw.rowNum)
if err != nil {
return err
}
sblob.Key = metautil.BuildStatsLogPath(pw.rootPath,
pw.collectionID, pw.partitionID, pw.segmentID, pw.pkstats.FieldID, id)
if err := pw.BlobsWriter([]*Blob{sblob}); err != nil {
return err
}
pw.statsLog = &datapb.FieldBinlog{
FieldID: pw.pkstats.FieldID,
Binlogs: []*datapb.Binlog{
{
LogSize: int64(len(sblob.GetValue())),
MemorySize: int64(len(sblob.GetValue())),
LogPath: sblob.Key,
EntriesNum: pw.rowNum,
},
},
}
return nil
}
func (pw *PackedBinlogRecordWriter) writeBm25Stats() error {
if len(pw.bm25Stats) == 0 {
return nil
}
id, _, err := pw.allocator.Alloc(uint32(len(pw.bm25Stats)))
if err != nil {
return err
}
if pw.bm25StatsLog == nil {
pw.bm25StatsLog = make(map[FieldID]*datapb.FieldBinlog)
}
for fid, stats := range pw.bm25Stats {
bytes, err := stats.Serialize()
if err != nil {
return err
}
key := metautil.BuildBm25LogPath(pw.rootPath,
pw.collectionID, pw.partitionID, pw.segmentID, fid, id)
blob := &Blob{
Key: key,
Value: bytes,
RowNum: stats.NumRow(),
MemorySize: int64(len(bytes)),
}
if err := pw.BlobsWriter([]*Blob{blob}); err != nil {
return err
}
fieldLog := &datapb.FieldBinlog{
FieldID: fid,
Binlogs: []*datapb.Binlog{
{
LogSize: int64(len(blob.GetValue())),
MemorySize: int64(len(blob.GetValue())),
LogPath: key,
EntriesNum: pw.rowNum,
},
},
}
pw.bm25StatsLog[fid] = fieldLog
id++
}
return nil
}
func (pw *PackedBinlogRecordWriter) GetLogs() (
fieldBinlogs map[FieldID]*datapb.FieldBinlog,
statsLog *datapb.FieldBinlog,
bm25StatsLog map[FieldID]*datapb.FieldBinlog,
) {
return pw.fieldBinlogs, pw.statsLog, pw.bm25StatsLog
}
func (pw *PackedBinlogRecordWriter) GetRowNum() int64 {
return pw.rowNum
}
func newPackedBinlogRecordWriter(collectionID, partitionID, segmentID UniqueID, schema *schemapb.CollectionSchema,
blobsWriter ChunkedBlobsWriter, allocator allocator.Interface, chunkSize uint64, rootPath string, maxRowNum int64, bufferSize, multiPartUploadSize int64, columnGroups []storagecommon.ColumnGroup,
) (*PackedBinlogRecordWriter, error) {
if len(columnGroups) == 0 {
return nil, merr.WrapErrParameterInvalidMsg("please specify column group for packed binlog record writer")
}
arrowSchema, err := ConvertToArrowSchema(schema.Fields)
if err != nil {
return nil, merr.WrapErrParameterInvalid("convert collection schema [%s] to arrow schema error: %s", schema.Name, err.Error())
}
pkField, err := typeutil.GetPrimaryFieldSchema(schema)
if err != nil {
log.Warn("failed to get pk field from schema")
return nil, err
}
stats, err := NewPrimaryKeyStats(pkField.GetFieldID(), int64(pkField.GetDataType()), maxRowNum)
if err != nil {
return nil, err
}
bm25FieldIDs := lo.FilterMap(schema.GetFunctions(), func(function *schemapb.FunctionSchema, _ int) (int64, bool) {
if function.GetType() == schemapb.FunctionType_BM25 {
return function.GetOutputFieldIds()[0], true
}
return 0, false
})
bm25Stats := make(map[int64]*BM25Stats, len(bm25FieldIDs))
for _, fid := range bm25FieldIDs {
bm25Stats[fid] = NewBM25Stats()
}
return &PackedBinlogRecordWriter{
collectionID: collectionID,
partitionID: partitionID,
segmentID: segmentID,
schema: schema,
arrowSchema: arrowSchema,
BlobsWriter: blobsWriter,
allocator: allocator,
chunkSize: chunkSize,
rootPath: rootPath,
maxRowNum: maxRowNum,
bufferSize: bufferSize,
multiPartUploadSize: multiPartUploadSize,
columnGroups: columnGroups,
pkstats: stats,
bm25Stats: bm25Stats,
}, nil
}


@ -17,12 +17,13 @@
package storage
import (
"io"
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/storagecommon"
"github.com/milvus-io/milvus/internal/util/initcore"
"github.com/milvus-io/milvus/pkg/v2/common"
)
func TestPackedSerde(t *testing.T) {
@ -30,47 +31,56 @@ func TestPackedSerde(t *testing.T) {
initcore.InitLocalArrowFileSystem("/tmp")
size := 10
blobs, err := generateTestData(size)
assert.NoError(t, err)
reader, err := NewBinlogDeserializeReader(generateTestSchema(), MakeBlobsReader(blobs))
assert.NoError(t, err)
defer reader.Close()
paths := []string{"/tmp/0"}
paths := [][]string{{"/tmp/0"}, {"/tmp/1"}}
bufferSize := int64(10 * 1024 * 1024) // 10MB
schema := generateTestSchema()
group := []int{}
for i := 0; i < len(schema.Fields); i++ {
group = append(group, i)
}
columnGroups := [][]int{group}
multiPartUploadSize := int64(0)
batchSize := 7
writer, err := NewPackedSerializeWriter(paths, schema, bufferSize, multiPartUploadSize, columnGroups, batchSize)
assert.NoError(t, err)
for i := 1; i <= size; i++ {
err = reader.Next()
prepareChunkData := func(chunkPaths []string, size int) {
blobs, err := generateTestData(size)
assert.NoError(t, err)
value := reader.Value()
assertTestData(t, i, value)
err := writer.Write(value)
reader, err := NewBinlogDeserializeReader(generateTestSchema(), MakeBlobsReader(blobs))
assert.NoError(t, err)
group := storagecommon.ColumnGroup{}
for i := 0; i < len(schema.Fields); i++ {
group.Columns = append(group.Columns, i)
}
multiPartUploadSize := int64(0)
batchSize := 7
writer, err := NewPackedSerializeWriter(chunkPaths, generateTestSchema(), bufferSize, multiPartUploadSize, []storagecommon.ColumnGroup{group}, batchSize)
assert.NoError(t, err)
for i := 1; i <= size; i++ {
err = reader.Next()
assert.NoError(t, err)
value := reader.Value()
assertTestData(t, i, value)
err := writer.Write(value)
assert.NoError(t, err)
}
err = writer.Close()
assert.NoError(t, err)
err = reader.Close()
assert.NoError(t, err)
}
err = writer.Close()
assert.NoError(t, err)
reader, err = NewPackedDeserializeReader(paths, schema, bufferSize, common.RowIDField)
for _, chunkPaths := range paths {
prepareChunkData(chunkPaths, size)
}
reader, err := NewPackedDeserializeReader(paths, schema, bufferSize)
assert.NoError(t, err)
defer reader.Close()
for i := 1; i <= size; i++ {
for i := 0; i < size*len(paths); i++ {
err = reader.Next()
assert.NoError(t, err)
value := reader.Value()
assertTestData(t, i, value)
assertTestData(t, i%10+1, value)
}
err = reader.Next()
assert.Equal(t, err, io.EOF)
})
}


@ -0,0 +1,49 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storagecommon
import (
"github.com/samber/lo"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
)
type ColumnGroup struct {
Columns []int // column indices
}
// split by row average size
func SplitByFieldSize(fieldBinlogs []*datapb.FieldBinlog, splitThresHold int64) []ColumnGroup {
groups := make([]ColumnGroup, 0)
shortColumnGroup := ColumnGroup{Columns: make([]int, 0)}
for i, fieldBinlog := range fieldBinlogs {
if len(fieldBinlog.Binlogs) == 0 {
continue
}
totalSize := lo.SumBy(fieldBinlog.Binlogs, func(b *datapb.Binlog) int64 { return b.LogSize })
totalNumRows := lo.SumBy(fieldBinlog.Binlogs, func(b *datapb.Binlog) int64 { return b.EntriesNum })
if totalSize/totalNumRows >= splitThresHold {
groups = append(groups, ColumnGroup{Columns: []int{i}})
} else {
shortColumnGroup.Columns = append(shortColumnGroup.Columns, i)
}
}
if len(shortColumnGroup.Columns) > 0 {
groups = append(groups, shortColumnGroup)
}
return groups
}
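For intuition, a hedged example with made-up sizes of the rule just above: a field's average row size is its total LogSize divided by its total EntriesNum, fields at or above splitThresHold get a group of their own, and all remaining fields share one trailing group. Note that ColumnGroup.Columns stores indices into the fieldBinlogs slice, not field IDs.

```go
package storagecommon

import "github.com/milvus-io/milvus/pkg/v2/proto/datapb"

// Illustrative only: field 0 averages 100 B/row, fields 1 and 2 average 2 B/row
// and 3 B/row; with a threshold of 10 B/row, field 0 is split out on its own and
// the two narrow fields end up sharing the trailing group.
func exampleSplit() []ColumnGroup {
	return SplitByFieldSize([]*datapb.FieldBinlog{
		{FieldID: 100, Binlogs: []*datapb.Binlog{{LogSize: 1000, EntriesNum: 10}}},
		{FieldID: 101, Binlogs: []*datapb.Binlog{{LogSize: 20, EntriesNum: 10}}},
		{FieldID: 102, Binlogs: []*datapb.Binlog{{LogSize: 30, EntriesNum: 10}}},
	}, 10)
	// -> []ColumnGroup{{Columns: []int{0}}, {Columns: []int{1, 2}}}
}
```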


@ -0,0 +1,127 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storagecommon
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
)
func TestSplitByFieldSize(t *testing.T) {
tests := []struct {
name string
fieldBinlogs []*datapb.FieldBinlog
splitThresHold int64
expected []ColumnGroup
}{
{
name: "Empty input",
fieldBinlogs: []*datapb.FieldBinlog{},
splitThresHold: 100,
expected: []ColumnGroup{},
},
{
name: "Empty binlogs",
fieldBinlogs: []*datapb.FieldBinlog{{FieldID: 0, Binlogs: []*datapb.Binlog{}}},
splitThresHold: 100,
expected: []ColumnGroup{},
},
{
name: "above threshold",
fieldBinlogs: []*datapb.FieldBinlog{
{
FieldID: 0,
Binlogs: []*datapb.Binlog{
{LogSize: 1000, EntriesNum: 10},
},
},
{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{LogSize: 2000, EntriesNum: 10},
},
},
},
splitThresHold: 50,
expected: []ColumnGroup{
{Columns: []int{0}},
{Columns: []int{1}},
},
},
{
name: "one field",
fieldBinlogs: []*datapb.FieldBinlog{
{
FieldID: 0,
Binlogs: []*datapb.Binlog{
{LogSize: 100, EntriesNum: 10},
},
},
},
splitThresHold: 50,
expected: []ColumnGroup{
{Columns: []int{0}},
},
},
{
name: "Multiple fields, mixed sizes",
fieldBinlogs: []*datapb.FieldBinlog{
{
FieldID: 0,
Binlogs: []*datapb.Binlog{ // (above)
{LogSize: 500, EntriesNum: 5},
{LogSize: 500, EntriesNum: 5},
},
},
{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{LogSize: 200, EntriesNum: 20}, // (below)
},
},
{
FieldID: 2,
Binlogs: []*datapb.Binlog{
{LogSize: 500, EntriesNum: 10}, // (threshold)
},
},
{
FieldID: 3,
Binlogs: []*datapb.Binlog{
{LogSize: 400, EntriesNum: 10}, // (below)
},
},
},
splitThresHold: 50,
expected: []ColumnGroup{
{Columns: []int{0}},
{Columns: []int{2}},
{Columns: []int{1, 3}},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := SplitByFieldSize(tt.fieldBinlogs, tt.splitThresHold)
assert.Equal(t, tt.expected, result)
})
}
}


@ -26,6 +26,7 @@ import "C"
import (
"fmt"
"io"
"unsafe"
"github.com/apache/arrow/go/v17/arrow"
@ -64,7 +65,7 @@ func (pr *PackedReader) ReadNext() (arrow.Record, error) {
}
if cArr == nil {
return nil, nil // end of stream, no more records to read
return nil, io.EOF // end of stream, no more records to read
}
// Convert ArrowArray to Go RecordBatch using cdata


@ -15,6 +15,7 @@
package packed
import (
"io"
"testing"
"github.com/apache/arrow/go/v17/arrow"
@ -23,6 +24,7 @@ import (
"github.com/stretchr/testify/suite"
"golang.org/x/exp/rand"
"github.com/milvus-io/milvus/internal/storagecommon"
"github.com/milvus-io/milvus/internal/util/initcore"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
@ -77,7 +79,7 @@ func (suite *PackedTestSuite) TestPackedOneFile() {
batches := 100
paths := []string{"/tmp/100"}
columnGroups := [][]int{{0, 1, 2}}
columnGroups := []storagecommon.ColumnGroup{{Columns: []int{0, 1, 2}}}
bufferSize := int64(10 * 1024 * 1024) // 10MB
multiPartUploadSize := int64(0)
pw, err := NewPackedWriter(paths, suite.schema, bufferSize, multiPartUploadSize, columnGroups)
@ -129,7 +131,7 @@ func (suite *PackedTestSuite) TestPackedMultiFiles() {
rec := b.NewRecord()
defer rec.Release()
paths := []string{"/tmp/100", "/tmp/101"}
columnGroups := [][]int{{2}, {0, 1}}
columnGroups := []storagecommon.ColumnGroup{{Columns: []int{2}}, {Columns: []int{0, 1}}}
bufferSize := int64(10 * 1024 * 1024) // 10MB
multiPartUploadSize := int64(0)
pw, err := NewPackedWriter(paths, suite.schema, bufferSize, multiPartUploadSize, columnGroups)
@ -147,8 +149,10 @@ func (suite *PackedTestSuite) TestPackedMultiFiles() {
var rr arrow.Record
for {
rr, err = reader.ReadNext()
suite.NoError(err)
if rr == nil {
if err == nil {
suite.NotNil(rr)
}
if err == io.EOF {
// end of file
break
}


@ -31,9 +31,11 @@ import (
"github.com/apache/arrow/go/v17/arrow"
"github.com/apache/arrow/go/v17/arrow/cdata"
"github.com/milvus-io/milvus/internal/storagecommon"
)
func NewPackedWriter(filePaths []string, schema *arrow.Schema, bufferSize int64, multiPartUploadSize int64, columnGroups [][]int) (*PackedWriter, error) {
func NewPackedWriter(filePaths []string, schema *arrow.Schema, bufferSize int64, multiPartUploadSize int64, columnGroups []storagecommon.ColumnGroup) (*PackedWriter, error) {
cFilePaths := make([]*C.char, len(filePaths))
for i, path := range filePaths {
cFilePaths[i] = C.CString(path)
@ -52,15 +54,15 @@ func NewPackedWriter(filePaths []string, schema *arrow.Schema, bufferSize int64,
cColumnGroups := C.NewCColumnGroups()
for _, group := range columnGroups {
cGroup := C.malloc(C.size_t(len(group)) * C.size_t(unsafe.Sizeof(C.int(0))))
cGroup := C.malloc(C.size_t(len(group.Columns)) * C.size_t(unsafe.Sizeof(C.int(0))))
if cGroup == nil {
return nil, fmt.Errorf("failed to allocate memory for column groups")
}
cGroupSlice := (*[1 << 30]C.int)(cGroup)[:len(group):len(group)]
for i, val := range group {
cGroupSlice := (*[1 << 30]C.int)(cGroup)[:len(group.Columns):len(group.Columns)]
for i, val := range group.Columns {
cGroupSlice[i] = C.int(val)
}
C.AddCColumnGroup(cColumnGroups, (*C.int)(cGroup), C.int(len(group)))
C.AddCColumnGroup(cColumnGroups, (*C.int)(cGroup), C.int(len(group.Columns)))
C.free(cGroup)
}