enhance: [2.4] Avoid merging insert data when buffering insert msgs (#34205)

Cherry-pick from master
pr: #33526 #33817
See also #33561

This PR:
- Uses zero-copy when buffering insert messages, so buffered insert data is no longer merged
- Makes `storage.InsertCodec` support serializing multiple insert data chunks into the same batch of binlog files (see the sketch below)
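
For illustration only, a minimal sketch (inside the datanode module) of the call shapes this change introduces; `syncChunks` and its parameters are placeholders, while `SyncPack.WithInsertData` and the variadic `InsertCodec.Serialize` are the APIs touched by this PR:

```go
package example

import (
	"github.com/milvus-io/milvus/internal/datanode/syncmgr"
	"github.com/milvus-io/milvus/internal/storage"
)

// syncChunks is a hypothetical helper showing how buffered chunks flow through
// the new APIs without first being merged into a single InsertData.
func syncChunks(codec *storage.InsertCodec, chunks []*storage.InsertData,
	collectionID, partitionID, segmentID int64) ([]*storage.Blob, error) {
	// The sync pack now carries a slice of chunks (nil entries are filtered out).
	pack := &syncmgr.SyncPack{}
	pack.WithInsertData(chunks).
		WithCollectionID(collectionID).
		WithPartitionID(partitionID)

	// The codec is variadic, so every chunk is written into the same batch of binlog files.
	return codec.Serialize(partitionID, segmentID, chunks...)
}
```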

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia 2024-06-27 10:14:05 +08:00 committed by GitHub
parent aae94d7c40
commit f741bb7526
22 changed files with 296 additions and 224 deletions

@ -79,7 +79,7 @@ func NewSyncTask(ctx context.Context,
}
syncPack := &syncmgr.SyncPack{}
syncPack.WithInsertData(insertData).
syncPack.WithInsertData([]*storage.InsertData{insertData}).
WithDeleteData(deleteData).
WithCollectionID(collectionID).
WithPartitionID(partitionID).

@ -5,9 +5,11 @@ import (
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
)
func TestInsertBinlogIteratorSuite(t *testing.T) {
@ -223,6 +225,9 @@ func genTestInsertData() (*storage.InsertData, *etcdpb.CollectionMeta) {
IsPrimaryKey: false,
Description: "binary_vector",
DataType: schemapb.DataType_BinaryVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "8"},
},
},
{
FieldID: FloatVectorField,
@ -230,6 +235,9 @@ func genTestInsertData() (*storage.InsertData, *etcdpb.CollectionMeta) {
IsPrimaryKey: false,
Description: "float_vector",
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "4"},
},
},
{
FieldID: Float16VectorField,
@ -237,6 +245,9 @@ func genTestInsertData() (*storage.InsertData, *etcdpb.CollectionMeta) {
IsPrimaryKey: false,
Description: "float16_vector",
DataType: schemapb.DataType_Float16Vector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "4"},
},
},
{
FieldID: BFloat16VectorField,
@ -244,6 +255,9 @@ func genTestInsertData() (*storage.InsertData, *etcdpb.CollectionMeta) {
IsPrimaryKey: false,
Description: "bfloat16_vector",
DataType: schemapb.DataType_BFloat16Vector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "4"},
},
},
},
},

@ -47,7 +47,7 @@ func (s *BloomFilterSetSuite) GetFieldData(ids []int64) storage.FieldData {
Name: "ID",
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
})
}, len(ids))
s.Require().NoError(err)
for _, id := range ids {

@ -19,6 +19,8 @@ package syncmgr
import (
"context"
"github.com/samber/lo"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -37,7 +39,7 @@ type SyncPack struct {
metacache metacache.MetaCache
metawriter MetaWriter
// data
insertData *storage.InsertData
insertData []*storage.InsertData
deltaData *storage.DeleteData
// statistics
tsFrom typeutil.Timestamp
@ -55,8 +57,10 @@ type SyncPack struct {
level datapb.SegmentLevel
}
func (p *SyncPack) WithInsertData(insertData *storage.InsertData) *SyncPack {
p.insertData = insertData
func (p *SyncPack) WithInsertData(insertData []*storage.InsertData) *SyncPack {
p.insertData = lo.Filter(insertData, func(inData *storage.InsertData, _ int) bool {
return inData != nil
})
return p
}

@ -82,10 +82,12 @@ func (s *storageV1Serializer) EncodeBuffer(ctx context.Context, pack *SyncPack)
zap.String("channel", pack.channelName),
)
if pack.insertData != nil {
if len(pack.insertData) > 0 {
memSize := make(map[int64]int64)
for fieldID, fieldData := range pack.insertData.Data {
memSize[fieldID] = int64(fieldData.GetMemorySize())
for _, chunk := range pack.insertData {
for fieldID, fieldData := range chunk.Data {
memSize[fieldID] += int64(fieldData.GetMemorySize())
}
}
task.binlogMemsize = memSize
@ -159,7 +161,7 @@ func (s *storageV1Serializer) setTaskMeta(task *SyncTask, pack *SyncPack) {
func (s *storageV1Serializer) serializeBinlog(ctx context.Context, pack *SyncPack) (map[int64]*storage.Blob, error) {
log := log.Ctx(ctx)
blobs, err := s.inCodec.Serialize(pack.partitionID, pack.segmentID, pack.insertData)
blobs, err := s.inCodec.Serialize(pack.partitionID, pack.segmentID, pack.insertData...)
if err != nil {
return nil, err
}
@ -178,14 +180,21 @@ func (s *storageV1Serializer) serializeBinlog(ctx context.Context, pack *SyncPac
}
func (s *storageV1Serializer) serializeStatslog(pack *SyncPack) (*storage.PrimaryKeyStats, *storage.Blob, error) {
pkFieldData := pack.insertData.Data[s.pkField.GetFieldID()]
rowNum := int64(pkFieldData.RowNum())
var rowNum int64
var pkFieldData []storage.FieldData
for _, chunk := range pack.insertData {
chunkPKData := chunk.Data[s.pkField.GetFieldID()]
pkFieldData = append(pkFieldData, chunkPKData)
rowNum += int64(chunkPKData.RowNum())
}
stats, err := storage.NewPrimaryKeyStats(s.pkField.GetFieldID(), int64(s.pkField.GetDataType()), rowNum)
if err != nil {
return nil, nil, err
}
stats.UpdateByMsgs(pkFieldData)
for _, chunkPkData := range pkFieldData {
stats.UpdateByMsgs(chunkPkData)
}
blob, err := s.inCodec.SerializePkStats(stats, pack.batchSize)
if err != nil {

@ -160,7 +160,7 @@ func (s *StorageV1SerializerSuite) getBfs() *metacache.BloomFilterSet {
Name: "ID",
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
})
}, 16)
s.Require().NoError(err)
ids := []int64{1, 2, 3, 4, 5, 6, 7}
@ -200,7 +200,7 @@ func (s *StorageV1SerializerSuite) TestSerializeInsert() {
s.Run("with_empty_data", func() {
pack := s.getBasicPack()
pack.WithTimeRange(50, 100)
pack.WithInsertData(s.getEmptyInsertBuffer()).WithBatchSize(0)
pack.WithInsertData([]*storage.InsertData{s.getEmptyInsertBuffer()}).WithBatchSize(0)
_, err := s.serializer.EncodeBuffer(ctx, pack)
s.Error(err)
@ -209,7 +209,7 @@ func (s *StorageV1SerializerSuite) TestSerializeInsert() {
s.Run("with_normal_data", func() {
pack := s.getBasicPack()
pack.WithTimeRange(50, 100)
pack.WithInsertData(s.getInsertBuffer()).WithBatchSize(10)
pack.WithInsertData([]*storage.InsertData{s.getInsertBuffer()}).WithBatchSize(10)
s.mockCache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return().Once()
@ -243,7 +243,7 @@ func (s *StorageV1SerializerSuite) TestSerializeInsert() {
s.Run("with_flush", func() {
pack := s.getBasicPack()
pack.WithTimeRange(50, 100)
pack.WithInsertData(s.getInsertBuffer()).WithBatchSize(10)
pack.WithInsertData([]*storage.InsertData{s.getInsertBuffer()}).WithBatchSize(10)
pack.WithFlush()
bfs := s.getBfs()

@ -82,7 +82,7 @@ func (s *storageV2Serializer) EncodeBuffer(ctx context.Context, pack *SyncPack)
}
task.space = space
if pack.insertData != nil {
if len(pack.insertData) > 0 {
insertReader, err := s.serializeInsertData(pack)
if err != nil {
log.Warn("failed to serialize insert data with storagev2", zap.Error(err))
@ -155,8 +155,10 @@ func (s *storageV2Serializer) serializeInsertData(pack *SyncPack) (array.RecordR
builder := array.NewRecordBuilder(memory.DefaultAllocator, s.arrowSchema)
defer builder.Release()
if err := iTypeutil.BuildRecord(builder, pack.insertData, s.schema.GetFields()); err != nil {
return nil, err
for _, chunk := range pack.insertData {
if err := iTypeutil.BuildRecord(builder, chunk, s.schema.GetFields()); err != nil {
return nil, err
}
}
rec := builder.NewRecord()

@ -179,7 +179,7 @@ func (s *StorageV2SerializerSuite) getBfs() *metacache.BloomFilterSet {
Name: "ID",
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
})
}, 16)
s.Require().NoError(err)
ids := []int64{1, 2, 3, 4, 5, 6, 7}
@ -221,7 +221,7 @@ func (s *StorageV2SerializerSuite) TestSerializeInsert() {
s.Run("empty_insert_data", func() {
pack := s.getBasicPack()
pack.WithTimeRange(50, 100)
pack.WithInsertData(s.getEmptyInsertBuffer()).WithBatchSize(0)
pack.WithInsertData([]*storage.InsertData{s.getEmptyInsertBuffer()}).WithBatchSize(0)
_, err := s.serializer.EncodeBuffer(ctx, pack)
s.Error(err)
@ -230,7 +230,7 @@ func (s *StorageV2SerializerSuite) TestSerializeInsert() {
s.Run("with_normal_data", func() {
pack := s.getBasicPack()
pack.WithTimeRange(50, 100)
pack.WithInsertData(s.getInsertBuffer()).WithBatchSize(10)
pack.WithInsertData([]*storage.InsertData{s.getInsertBuffer()}).WithBatchSize(10)
s.mockCache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return().Once()
@ -264,7 +264,7 @@ func (s *StorageV2SerializerSuite) TestSerializeInsert() {
s.Run("with_flush", func() {
pack := s.getBasicPack()
pack.WithTimeRange(50, 100)
pack.WithInsertData(s.getInsertBuffer()).WithBatchSize(10)
pack.WithInsertData([]*storage.InsertData{s.getInsertBuffer()}).WithBatchSize(10)
pack.WithFlush()
bfs := s.getBfs()

@ -171,7 +171,7 @@ func (s *SyncTaskSuite) TestRunNormal() {
Name: "ID",
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
})
}, 16)
s.Require().NoError(err)
ids := []int64{1, 2, 3, 4, 5, 6, 7}

@ -176,7 +176,7 @@ func (s *SyncTaskSuiteV2) getSuiteSyncTask() *SyncTaskV2 {
Timestamp: 1000,
ChannelName: s.channelName,
})
pack.WithInsertData(s.getInsertBuffer()).WithBatchSize(10)
pack.WithInsertData([]*storage.InsertData{s.getInsertBuffer()}).WithBatchSize(10)
pack.WithDeleteData(s.getDeleteBuffer())
storageCache, err := metacache.NewStorageV2Cache(s.schema)
@ -203,7 +203,7 @@ func (s *SyncTaskSuiteV2) TestRunNormal() {
Name: "ID",
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
})
}, 16)
s.Require().NoError(err)
ids := []int64{1, 2, 3, 4, 5, 6, 7}

@ -74,7 +74,7 @@ type InsertBuffer struct {
BufferBase
collSchema *schemapb.CollectionSchema
buffer *storage.InsertData
buffers []*storage.InsertData
}
func NewInsertBuffer(sch *schemapb.CollectionSchema) (*InsertBuffer, error) {
@ -87,13 +87,10 @@ func NewInsertBuffer(sch *schemapb.CollectionSchema) (*InsertBuffer, error) {
if estSize == 0 {
return nil, errors.New("Invalid schema")
}
buffer, err := storage.NewInsertData(sch)
if err != nil {
return nil, err
}
sizeLimit := paramtable.Get().DataNodeCfg.FlushInsertBufferSize.GetAsInt64()
return &InsertBuffer{
ib := &InsertBuffer{
BufferBase: BufferBase{
rowLimit: noLimit,
sizeLimit: sizeLimit,
@ -101,26 +98,34 @@ func NewInsertBuffer(sch *schemapb.CollectionSchema) (*InsertBuffer, error) {
TimestampTo: 0,
},
collSchema: sch,
buffer: buffer,
}, nil
}
func (ib *InsertBuffer) Yield() *storage.InsertData {
if ib.IsEmpty() {
return nil
}
return ib.buffer
return ib, nil
}
func (ib *InsertBuffer) buffer(inData *storage.InsertData, tr TimeRange, startPos, endPos *msgpb.MsgPosition) {
// buffer := ib.currentBuffer()
// storage.MergeInsertData(buffer.buffer, inData)
ib.buffers = append(ib.buffers, inData)
ib.UpdateStatistics(int64(inData.GetRowNum()), int64(inData.GetMemorySize()), tr, startPos, endPos)
}
func (ib *InsertBuffer) Yield() []*storage.InsertData {
result := ib.buffers
// set buffers to nil so that the fragmented buffers can be GCed
ib.buffers = nil
return result
}
func (ib *InsertBuffer) Buffer(inData *inData, startPos, endPos *msgpb.MsgPosition) int64 {
bufferedSize := int64(0)
for idx, data := range inData.data {
storage.MergeInsertData(ib.buffer, data)
tsData := inData.tsField[idx]
tr := ib.getTimestampRange(tsData)
ib.buffer(data, tr, startPos, endPos)
// update buffer size
ib.UpdateStatistics(int64(data.GetRowNum()), int64(data.GetMemorySize()), ib.getTimestampRange(tsData), startPos, endPos)
ib.UpdateStatistics(int64(data.GetRowNum()), int64(data.GetMemorySize()), tr, startPos, endPos)
bufferedSize += int64(data.GetMemorySize())
}
return bufferedSize

@ -168,9 +168,12 @@ func (s *InsertBufferSuite) TestYield() {
result = insertBuffer.Yield()
s.NotNil(result)
pkField, ok := result.Data[common.StartOfUserFieldID]
s.Require().True(ok)
pkData := lo.RepeatBy(pkField.RowNum(), func(idx int) int64 { return pkField.GetRow(idx).(int64) })
var pkData []int64
for _, chunk := range result {
pkField, ok := chunk.Data[common.StartOfUserFieldID]
s.Require().True(ok)
pkData = append(pkData, lo.RepeatBy(pkField.RowNum(), func(idx int) int64 { return pkField.GetRow(idx).(int64) })...)
}
s.ElementsMatch(pks, pkData)
}
@ -232,20 +235,6 @@ func (s *InsertBufferConstructSuite) TestCreateFailure() {
Fields: []*schemapb.FieldSchema{},
},
},
{
tag: "missing_type_param",
schema: &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
},
},
},
},
}
for _, tc := range cases {
s.Run(tc.tag, func() {

@ -32,7 +32,7 @@ func (buf *segmentBuffer) IsFull() bool {
return buf.insertBuffer.IsFull() || buf.deltaBuffer.IsFull()
}
func (buf *segmentBuffer) Yield() (insert *storage.InsertData, delete *storage.DeleteData) {
func (buf *segmentBuffer) Yield() (insert []*storage.InsertData, delete *storage.DeleteData) {
return buf.insertBuffer.Yield(), buf.deltaBuffer.Yield()
}

@ -364,7 +364,7 @@ func (wb *writeBufferBase) getOrCreateBuffer(segmentID int64) *segmentBuffer {
return buffer
}
func (wb *writeBufferBase) yieldBuffer(segmentID int64) (*storage.InsertData, *storage.DeleteData, *TimeRange, *msgpb.MsgPosition) {
func (wb *writeBufferBase) yieldBuffer(segmentID int64) ([]*storage.InsertData, *storage.DeleteData, *TimeRange, *msgpb.MsgPosition) {
buffer, ok := wb.buffers[segmentID]
if !ok {
return nil, nil, nil, nil
@ -560,10 +560,12 @@ func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) (sy
}
actions := []metacache.SegmentAction{}
if insert != nil {
batchSize = int64(insert.GetRowNum())
totalMemSize += float64(insert.GetMemorySize())
for _, chunk := range insert {
batchSize = int64(chunk.GetRowNum())
totalMemSize += float64(chunk.GetMemorySize())
}
if delta != nil {
totalMemSize += float64(delta.Size())
}

@ -9,6 +9,7 @@ import (
"golang.org/x/exp/mmap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
@ -66,6 +67,9 @@ var (
Description: "",
DataType: schemapb.DataType_FloatVector,
AutoID: false,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "8"},
},
},
},
},

@ -208,58 +208,70 @@ func (insertCodec *InsertCodec) SerializePkStatsByData(data *InsertData) (*Blob,
// From schema, it gets all fields.
// For each field, it will create a binlog writer, and write an event to the binlog.
// It returns binlog buffer in the end.
func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID UniqueID, data *InsertData) ([]*Blob, error) {
func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID UniqueID, data ...*InsertData) ([]*Blob, error) {
blobs := make([]*Blob, 0)
var writer *InsertBinlogWriter
if insertCodec.Schema == nil {
return nil, fmt.Errorf("schema is not set")
}
timeFieldData, ok := data.Data[common.TimeStampField]
if !ok {
return nil, fmt.Errorf("data doesn't contains timestamp field")
}
if timeFieldData.RowNum() <= 0 {
return nil, fmt.Errorf("there's no data in InsertData")
}
rowNum := int64(timeFieldData.RowNum())
ts := timeFieldData.(*Int64FieldData).Data
var rowNum int64
var startTs, endTs Timestamp
startTs, endTs = math.MaxUint64, 0
for _, t := range ts {
if uint64(t) > endTs {
endTs = uint64(t)
for _, block := range data {
timeFieldData, ok := block.Data[common.TimeStampField]
if !ok {
return nil, fmt.Errorf("data doesn't contains timestamp field")
}
if uint64(t) < startTs {
startTs = uint64(t)
rowNum += int64(timeFieldData.RowNum())
ts := timeFieldData.(*Int64FieldData).Data
for _, t := range ts {
if uint64(t) > endTs {
endTs = uint64(t)
}
if uint64(t) < startTs {
startTs = uint64(t)
}
}
}
// sort insert data by rowID
dataSorter := &DataSorter{
InsertCodec: insertCodec,
InsertData: data,
}
sort.Sort(dataSorter)
for _, field := range insertCodec.Schema.Schema.Fields {
singleData := data.Data[field.FieldID]
// encode fields
writer = NewInsertBinlogWriter(field.DataType, insertCodec.Schema.ID, partitionID, segmentID, field.FieldID)
var eventWriter *insertEventWriter
var err error
var dim int64
if typeutil.IsVectorType(field.DataType) {
switch field.DataType {
case schemapb.DataType_FloatVector:
eventWriter, err = writer.NextInsertEventWriter(singleData.(*FloatVectorFieldData).Dim)
dim, err = typeutil.GetDim(field)
if err != nil {
return nil, err
}
eventWriter, err = writer.NextInsertEventWriter(int(dim))
case schemapb.DataType_BinaryVector:
eventWriter, err = writer.NextInsertEventWriter(singleData.(*BinaryVectorFieldData).Dim)
dim, err = typeutil.GetDim(field)
if err != nil {
return nil, err
}
eventWriter, err = writer.NextInsertEventWriter(int(dim))
case schemapb.DataType_Float16Vector:
eventWriter, err = writer.NextInsertEventWriter(singleData.(*Float16VectorFieldData).Dim)
dim, err = typeutil.GetDim(field)
if err != nil {
return nil, err
}
eventWriter, err = writer.NextInsertEventWriter(int(dim))
case schemapb.DataType_BFloat16Vector:
eventWriter, err = writer.NextInsertEventWriter(singleData.(*BFloat16VectorFieldData).Dim)
dim, err = typeutil.GetDim(field)
if err != nil {
return nil, err
}
eventWriter, err = writer.NextInsertEventWriter(int(dim))
case schemapb.DataType_SparseFloatVector:
eventWriter, err = writer.NextInsertEventWriter()
default:
@ -272,137 +284,147 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
writer.Close()
return nil, err
}
eventWriter.SetEventTimestamp(startTs, endTs)
switch field.DataType {
case schemapb.DataType_Bool:
err = eventWriter.AddBoolToPayload(singleData.(*BoolFieldData).Data)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*BoolFieldData).GetMemorySize()))
case schemapb.DataType_Int8:
err = eventWriter.AddInt8ToPayload(singleData.(*Int8FieldData).Data)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*Int8FieldData).GetMemorySize()))
case schemapb.DataType_Int16:
err = eventWriter.AddInt16ToPayload(singleData.(*Int16FieldData).Data)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*Int16FieldData).GetMemorySize()))
case schemapb.DataType_Int32:
err = eventWriter.AddInt32ToPayload(singleData.(*Int32FieldData).Data)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*Int32FieldData).GetMemorySize()))
case schemapb.DataType_Int64:
err = eventWriter.AddInt64ToPayload(singleData.(*Int64FieldData).Data)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*Int64FieldData).GetMemorySize()))
case schemapb.DataType_Float:
err = eventWriter.AddFloatToPayload(singleData.(*FloatFieldData).Data)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*FloatFieldData).GetMemorySize()))
case schemapb.DataType_Double:
err = eventWriter.AddDoubleToPayload(singleData.(*DoubleFieldData).Data)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*DoubleFieldData).GetMemorySize()))
case schemapb.DataType_String, schemapb.DataType_VarChar:
for _, singleString := range singleData.(*StringFieldData).Data {
err = eventWriter.AddOneStringToPayload(singleString)
eventWriter.Reserve(int(rowNum))
var memorySize int64
for _, block := range data {
singleData := block.Data[field.FieldID]
blockMemorySize := singleData.GetMemorySize()
memorySize += int64(blockMemorySize)
switch field.DataType {
case schemapb.DataType_Bool:
err = eventWriter.AddBoolToPayload(singleData.(*BoolFieldData).Data)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*StringFieldData).GetMemorySize()))
case schemapb.DataType_Array:
for _, singleArray := range singleData.(*ArrayFieldData).Data {
err = eventWriter.AddOneArrayToPayload(singleArray)
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
case schemapb.DataType_Int8:
err = eventWriter.AddInt8ToPayload(singleData.(*Int8FieldData).Data)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*ArrayFieldData).GetMemorySize()))
case schemapb.DataType_JSON:
for _, singleJSON := range singleData.(*JSONFieldData).Data {
err = eventWriter.AddOneJSONToPayload(singleJSON)
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
case schemapb.DataType_Int16:
err = eventWriter.AddInt16ToPayload(singleData.(*Int16FieldData).Data)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
case schemapb.DataType_Int32:
err = eventWriter.AddInt32ToPayload(singleData.(*Int32FieldData).Data)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
case schemapb.DataType_Int64:
err = eventWriter.AddInt64ToPayload(singleData.(*Int64FieldData).Data)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
case schemapb.DataType_Float:
err = eventWriter.AddFloatToPayload(singleData.(*FloatFieldData).Data)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
case schemapb.DataType_Double:
err = eventWriter.AddDoubleToPayload(singleData.(*DoubleFieldData).Data)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
case schemapb.DataType_String, schemapb.DataType_VarChar:
for _, singleString := range singleData.(*StringFieldData).Data {
err = eventWriter.AddOneStringToPayload(singleString)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
case schemapb.DataType_Array:
for _, singleArray := range singleData.(*ArrayFieldData).Data {
err = eventWriter.AddOneArrayToPayload(singleArray)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
case schemapb.DataType_JSON:
for _, singleJSON := range singleData.(*JSONFieldData).Data {
err = eventWriter.AddOneJSONToPayload(singleJSON)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
case schemapb.DataType_BinaryVector:
err = eventWriter.AddBinaryVectorToPayload(singleData.(*BinaryVectorFieldData).Data, singleData.(*BinaryVectorFieldData).Dim)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
case schemapb.DataType_FloatVector:
err = eventWriter.AddFloatVectorToPayload(singleData.(*FloatVectorFieldData).Data, singleData.(*FloatVectorFieldData).Dim)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
case schemapb.DataType_Float16Vector:
err = eventWriter.AddFloat16VectorToPayload(singleData.(*Float16VectorFieldData).Data, singleData.(*Float16VectorFieldData).Dim)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
case schemapb.DataType_BFloat16Vector:
err = eventWriter.AddBFloat16VectorToPayload(singleData.(*BFloat16VectorFieldData).Data, singleData.(*BFloat16VectorFieldData).Dim)
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
case schemapb.DataType_SparseFloatVector:
err = eventWriter.AddSparseFloatVectorToPayload(singleData.(*SparseFloatVectorFieldData))
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
default:
return nil, fmt.Errorf("undefined data type %d", field.DataType)
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*JSONFieldData).GetMemorySize()))
case schemapb.DataType_BinaryVector:
err = eventWriter.AddBinaryVectorToPayload(singleData.(*BinaryVectorFieldData).Data, singleData.(*BinaryVectorFieldData).Dim)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*BinaryVectorFieldData).GetMemorySize()))
case schemapb.DataType_FloatVector:
err = eventWriter.AddFloatVectorToPayload(singleData.(*FloatVectorFieldData).Data, singleData.(*FloatVectorFieldData).Dim)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*FloatVectorFieldData).GetMemorySize()))
case schemapb.DataType_Float16Vector:
err = eventWriter.AddFloat16VectorToPayload(singleData.(*Float16VectorFieldData).Data, singleData.(*Float16VectorFieldData).Dim)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*Float16VectorFieldData).GetMemorySize()))
case schemapb.DataType_BFloat16Vector:
err = eventWriter.AddBFloat16VectorToPayload(singleData.(*BFloat16VectorFieldData).Data, singleData.(*BFloat16VectorFieldData).Dim)
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*BFloat16VectorFieldData).GetMemorySize()))
case schemapb.DataType_SparseFloatVector:
err = eventWriter.AddSparseFloatVectorToPayload(singleData.(*SparseFloatVectorFieldData))
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*SparseFloatVectorFieldData).GetMemorySize()))
default:
return nil, fmt.Errorf("undefined data type %d", field.DataType)
writer.SetEventTimeStamp(startTs, endTs)
}
if err != nil {
return nil, err
}
writer.SetEventTimeStamp(startTs, endTs)
err = writer.Finish()
if err != nil {
@ -422,7 +444,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
Key: blobKey,
Value: buffer,
RowNum: rowNum,
MemorySize: int64(singleData.GetMemorySize()),
MemorySize: memorySize,
})
eventWriter.Close()
writer.Close()

@ -50,20 +50,24 @@ type InsertData struct {
}
func NewInsertData(schema *schemapb.CollectionSchema) (*InsertData, error) {
return NewInsertDataWithCap(schema, 0)
}
func NewInsertDataWithCap(schema *schemapb.CollectionSchema, cap int) (*InsertData, error) {
if schema == nil {
return nil, fmt.Errorf("Nil input schema")
return nil, merr.WrapErrParameterMissing("collection schema")
}
idata := &InsertData{
Data: make(map[FieldID]FieldData),
}
for _, fSchema := range schema.Fields {
fieldData, err := NewFieldData(fSchema.DataType, fSchema)
for _, field := range schema.GetFields() {
fieldData, err := NewFieldData(field.DataType, field, cap)
if err != nil {
return nil, err
}
idata.Data[fSchema.FieldID] = fieldData
idata.Data[field.FieldID] = fieldData
}
return idata, nil
}
@ -147,7 +151,7 @@ type FieldData interface {
GetDataType() schemapb.DataType
}
func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema) (FieldData, error) {
func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema, cap int) (FieldData, error) {
typeParams := fieldSchema.GetTypeParams()
switch dataType {
case schemapb.DataType_Float16Vector:
@ -156,7 +160,7 @@ func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema)
return nil, err
}
return &Float16VectorFieldData{
Data: make([]byte, 0),
Data: make([]byte, 0, cap),
Dim: dim,
}, nil
case schemapb.DataType_BFloat16Vector:
@ -165,7 +169,7 @@ func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema)
return nil, err
}
return &BFloat16VectorFieldData{
Data: make([]byte, 0),
Data: make([]byte, 0, cap),
Dim: dim,
}, nil
case schemapb.DataType_FloatVector:
@ -174,7 +178,7 @@ func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema)
return nil, err
}
return &FloatVectorFieldData{
Data: make([]float32, 0),
Data: make([]float32, 0, cap),
Dim: dim,
}, nil
case schemapb.DataType_BinaryVector:
@ -183,56 +187,56 @@ func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema)
return nil, err
}
return &BinaryVectorFieldData{
Data: make([]byte, 0),
Data: make([]byte, 0, cap),
Dim: dim,
}, nil
case schemapb.DataType_SparseFloatVector:
return &SparseFloatVectorFieldData{}, nil
case schemapb.DataType_Bool:
return &BoolFieldData{
Data: make([]bool, 0),
Data: make([]bool, 0, cap),
}, nil
case schemapb.DataType_Int8:
return &Int8FieldData{
Data: make([]int8, 0),
Data: make([]int8, 0, cap),
}, nil
case schemapb.DataType_Int16:
return &Int16FieldData{
Data: make([]int16, 0),
Data: make([]int16, 0, cap),
}, nil
case schemapb.DataType_Int32:
return &Int32FieldData{
Data: make([]int32, 0),
Data: make([]int32, 0, cap),
}, nil
case schemapb.DataType_Int64:
return &Int64FieldData{
Data: make([]int64, 0),
Data: make([]int64, 0, cap),
}, nil
case schemapb.DataType_Float:
return &FloatFieldData{
Data: make([]float32, 0),
Data: make([]float32, 0, cap),
}, nil
case schemapb.DataType_Double:
return &DoubleFieldData{
Data: make([]float64, 0),
Data: make([]float64, 0, cap),
}, nil
case schemapb.DataType_JSON:
return &JSONFieldData{
Data: make([][]byte, 0),
Data: make([][]byte, 0, cap),
}, nil
case schemapb.DataType_Array:
return &ArrayFieldData{
Data: make([]*schemapb.ScalarField, 0),
Data: make([]*schemapb.ScalarField, 0, cap),
ElementType: fieldSchema.GetElementType(),
}, nil
case schemapb.DataType_String, schemapb.DataType_VarChar:
return &StringFieldData{
Data: make([]string, 0),
Data: make([]string, 0, cap),
DataType: dataType,
}, nil
default:

@ -47,6 +47,7 @@ type PayloadWriterInterface interface {
GetPayloadBufferFromWriter() ([]byte, error)
GetPayloadLengthFromWriter() (int, error)
ReleasePayloadWriter()
Reserve(size int)
Close()
}

@ -535,6 +535,10 @@ func (w *NativePayloadWriter) FinishPayloadWriter() error {
)
}
func (w *NativePayloadWriter) Reserve(size int) {
w.builder.Reserve(size)
}
func (w *NativePayloadWriter) GetPayloadBufferFromWriter() ([]byte, error) {
data := w.output.Bytes()

@ -169,6 +169,9 @@ func TestPrintBinlogFiles(t *testing.T) {
IsPrimaryKey: false,
Description: "description_10",
DataType: schemapb.DataType_BinaryVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "8"},
},
},
{
FieldID: 109,
@ -176,6 +179,9 @@ func TestPrintBinlogFiles(t *testing.T) {
IsPrimaryKey: false,
Description: "description_11",
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "8"},
},
},
{
FieldID: 110,
@ -190,6 +196,9 @@ func TestPrintBinlogFiles(t *testing.T) {
IsPrimaryKey: false,
Description: "description_13",
DataType: schemapb.DataType_BFloat16Vector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "4"},
},
},
{
FieldID: 112,
@ -197,6 +206,9 @@ func TestPrintBinlogFiles(t *testing.T) {
IsPrimaryKey: false,
Description: "description_14",
DataType: schemapb.DataType_Float16Vector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "4"},
},
},
},
},

@ -40,7 +40,7 @@ func newFieldReader(ctx context.Context, cm storage.ChunkManager, fieldSchema *s
}
func (r *fieldReader) Next() (storage.FieldData, error) {
fieldData, err := storage.NewFieldData(r.fieldSchema.GetDataType(), r.fieldSchema)
fieldData, err := storage.NewFieldData(r.fieldSchema.GetDataType(), r.fieldSchema, 0)
if err != nil {
return nil, err
}

@ -282,7 +282,7 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data
expectInsertData, err := storage.NewInsertData(schema)
suite.NoError(err)
for _, field := range schema.GetFields() {
expectInsertData.Data[field.GetFieldID()], err = storage.NewFieldData(field.GetDataType(), field)
expectInsertData.Data[field.GetFieldID()], err = storage.NewFieldData(field.GetDataType(), field, suite.numRows)
suite.NoError(err)
}
OUTER: