enhance: make compactor use actual buffer size to decide when to sync (#29945)

See also: #29657

The DataNode compactor used an estimated per-row size derived from the schema to decide when
to sync a batch of data during compaction. This estimate can drift far
from the actual size when the schema contains variable-length fields
(e.g. VarChar, JSON).

This PR makes the compactor check the actual buffer data size, so it
syncs once the buffer truly exceeds the max binlog size.
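
Roughly, the flush condition moves from a precomputed row count to a periodic check of the real buffer size. Below is a minimal, standalone sketch of that idea; the names (`writeBuffer`, `binlogMaxSize`) are illustrative stand-ins, while the actual change lives in `compactionTask.merge` and uses `storage.InsertData.GetMemorySize()` and `DataNodeCfg.BinLogMaxSize`, as the diff shows.

```go
// Sketch only: size-based sync instead of an estimated rows-per-binlog limit.
package main

import "fmt"

type writeBuffer struct {
	rows int
	size int // accumulated byte size of buffered rows
}

func (b *writeBuffer) append(rowSize int) {
	b.rows++
	b.size += rowSize
}

func main() {
	const binlogMaxSize = 64 * 1024 * 1024 // stand-in for DataNodeCfg.BinLogMaxSize
	buf := &writeBuffer{}
	for i := 0; i < 1_000_000; i++ {
		// variable-length rows: a schema-based row-count estimate would be off here
		buf.append(512 + i%4096)
		// check the real buffer size every 100 rows instead of a row-count threshold
		if buf.rows%100 == 0 && buf.size > binlogMaxSize {
			fmt.Printf("sync binlog: %d rows, %d bytes\n", buf.rows, buf.size)
			*buf = writeBuffer{} // reset after flushing
		}
	}
	if buf.rows > 0 {
		fmt.Printf("sync remaining rows: %d rows, %d bytes\n", buf.rows, buf.size)
	}
}
```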

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia 2024-01-13 01:32:52 +08:00 committed by GitHub
parent c1b0562d21
commit ed89c6a2ee
4 changed files with 51 additions and 133 deletions


@ -116,7 +116,7 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
alloc := allocator.NewMockAllocator(t)
alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(nil, fmt.Errorf("mock err"))
b := binlogIO{cm, alloc}
_, _, err := b.uploadStatsLog(context.Background(), 1, 10, genInsertData(), genTestStat(meta), 10, meta)
_, _, err := b.uploadStatsLog(context.Background(), 1, 10, genInsertData(2), genTestStat(meta), 10, meta)
assert.Error(t, err)
})
})
@ -140,7 +140,7 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(nil, fmt.Errorf("mock err"))
_, err := b.uploadInsertLog(context.Background(), 1, 10, genInsertData(), meta)
_, err := b.uploadInsertLog(context.Background(), 1, 10, genInsertData(2), meta)
assert.Error(t, err)
})
@ -154,7 +154,7 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
_, err := b.uploadInsertLog(ctx, 1, 10, genInsertData(), meta)
_, err := b.uploadInsertLog(ctx, 1, 10, genInsertData(2), meta)
assert.Error(t, err)
})
})
@ -260,7 +260,7 @@ func TestBinlogIOInnerMethods(t *testing.T) {
iCodec := storage.NewInsertCodecWithSchema(meta)
kvs := make(map[string][]byte)
pin, err := b.genInsertBlobs(genInsertData(), 10, 1, iCodec, kvs)
pin, err := b.genInsertBlobs(genInsertData(2), 10, 1, iCodec, kvs)
assert.NoError(t, err)
assert.Equal(t, 12, len(pin))
@ -301,7 +301,7 @@ func TestBinlogIOInnerMethods(t *testing.T) {
bin := &binlogIO{cm, alloc}
kvs := make(map[string][]byte)
pin, err := bin.genInsertBlobs(genInsertData(), 10, 1, iCodec, kvs)
pin, err := bin.genInsertBlobs(genInsertData(2), 10, 1, iCodec, kvs)
assert.Error(t, err)
assert.Empty(t, kvs)


@ -176,31 +176,10 @@ func (t *compactionTask) uploadRemainLog(
meta *etcdpb.CollectionMeta,
stats *storage.PrimaryKeyStats,
totRows int64,
fID2Content map[UniqueID][]interface{},
writeBuffer *storage.InsertData,
fID2Type map[UniqueID]schemapb.DataType,
) (map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error) {
var iData *InsertData
// remain insert data
if len(fID2Content) != 0 {
iData = &InsertData{Data: make(map[storage.FieldID]storage.FieldData)}
for fID, content := range fID2Content {
tp, ok := fID2Type[fID]
if !ok {
log.Warn("no field ID in this schema", zap.Int64("fieldID", fID))
return nil, nil, errors.New("Unexpected error")
}
fData, err := interface2FieldData(tp, content, int64(len(content)))
if err != nil {
log.Warn("transfer interface to FieldData wrong", zap.Error(err))
return nil, nil, err
}
iData.Data[fID] = fData
}
}
inPaths, statPaths, err := t.uploadStatsLog(ctxTimeout, targetSegID, partID, iData, stats, totRows, meta)
inPaths, statPaths, err := t.uploadStatsLog(ctxTimeout, targetSegID, partID, writeBuffer, stats, totRows, meta)
if err != nil {
return nil, nil, err
}
@ -213,29 +192,10 @@ func (t *compactionTask) uploadSingleInsertLog(
targetSegID UniqueID,
partID UniqueID,
meta *etcdpb.CollectionMeta,
fID2Content map[UniqueID][]interface{},
writeBuffer *storage.InsertData,
fID2Type map[UniqueID]schemapb.DataType,
) (map[UniqueID]*datapb.FieldBinlog, error) {
iData := &InsertData{
Data: make(map[storage.FieldID]storage.FieldData),
}
for fID, content := range fID2Content {
tp, ok := fID2Type[fID]
if !ok {
log.Warn("no field ID in this schema", zap.Int64("fieldID", fID))
return nil, errors.New("Unexpected error")
}
fData, err := interface2FieldData(tp, content, int64(len(content)))
if err != nil {
log.Warn("transfer interface to FieldData wrong", zap.Error(err))
return nil, err
}
iData.Data[fID] = fData
}
inPaths, err := t.uploadInsertLog(ctxTimeout, targetSegID, partID, iData, meta)
inPaths, err := t.uploadInsertLog(ctxTimeout, targetSegID, partID, writeBuffer, meta)
if err != nil {
return nil, err
}
@ -255,13 +215,11 @@ func (t *compactionTask) merge(
mergeStart := time.Now()
var (
maxRowsPerBinlog int // maximum rows populating one binlog
numBinlogs int // binlog number
numRows int64 // the number of rows uploaded
expired int64 // the number of expired entity
numBinlogs int // binlog number
numRows int64 // the number of rows uploaded
expired int64 // the number of expired entity
fID2Type = make(map[UniqueID]schemapb.DataType)
fID2Content = make(map[UniqueID][]interface{})
fID2Type = make(map[UniqueID]schemapb.DataType)
insertField2Path = make(map[UniqueID]*datapb.FieldBinlog)
insertPaths = make([]*datapb.FieldBinlog, 0)
@ -269,6 +227,10 @@ func (t *compactionTask) merge(
statField2Path = make(map[UniqueID]*datapb.FieldBinlog)
statPaths = make([]*datapb.FieldBinlog, 0)
)
writeBuffer, err := storage.NewInsertData(meta.GetSchema())
if err != nil {
return nil, nil, -1, err
}
isDeletedValue := func(v *storage.Value) bool {
ts, ok := delta[v.PK.GetValue()]
@ -326,19 +288,6 @@ func (t *compactionTask) merge(
pkID := pkField.GetFieldID()
pkType := pkField.GetDataType()
// estimate Rows per binlog
// TODO should not convert size to row because we already know the size, this is especially important on varchar types.
size, err := typeutil.EstimateSizePerRecord(meta.GetSchema())
if err != nil {
log.Warn("failed to estimate size per record", zap.Error(err))
return nil, nil, 0, err
}
maxRowsPerBinlog = int(Params.DataNodeCfg.BinLogMaxSize.GetAsInt64() / int64(size))
if Params.DataNodeCfg.BinLogMaxSize.GetAsInt64()%int64(size) != 0 {
maxRowsPerBinlog++
}
expired = 0
numRows = 0
numBinlogs = 0
@ -410,19 +359,19 @@ func (t *compactionTask) merge(
return nil, nil, 0, errors.New("unexpected error")
}
for fID, vInter := range row {
if _, ok := fID2Content[fID]; !ok {
fID2Content[fID] = make([]interface{}, 0)
}
fID2Content[fID] = append(fID2Content[fID], vInter)
err = writeBuffer.Append(row)
if err != nil {
return nil, nil, 0, err
}
// update pk to new stats log
stats.Update(v.PK)
currentRows++
if currentRows >= maxRowsPerBinlog {
stats.Update(v.PK)
// check size every 100 rows in case of too many `GetMemorySize` call
if (currentRows+1)%100 == 0 && writeBuffer.GetMemorySize() > paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsInt() {
numRows += int64(writeBuffer.GetRowNum())
uploadInsertStart := time.Now()
inPaths, err := t.uploadSingleInsertLog(ctxTimeout, targetSegID, partID, meta, fID2Content, fID2Type)
inPaths, err := t.uploadSingleInsertLog(ctxTimeout, targetSegID, partID, meta, writeBuffer, fID2Type)
if err != nil {
log.Warn("failed to upload single insert log", zap.Error(err))
return nil, nil, 0, err
@ -432,19 +381,19 @@ func (t *compactionTask) merge(
timestampFrom = -1
timestampTo = -1
fID2Content = make(map[int64][]interface{})
writeBuffer, _ = storage.NewInsertData(meta.GetSchema())
currentRows = 0
numRows += int64(maxRowsPerBinlog)
numBinlogs++
}
}
}
// upload stats log and remain insert rows
if numRows != 0 || currentRows != 0 {
if writeBuffer.GetRowNum() > 0 || numRows > 0 {
numRows += int64(writeBuffer.GetRowNum())
uploadStart := time.Now()
inPaths, statsPaths, err := t.uploadRemainLog(ctxTimeout, targetSegID, partID, meta,
stats, numRows+int64(currentRows), fID2Content, fID2Type)
stats, numRows+int64(currentRows), writeBuffer, fID2Type)
if err != nil {
return nil, nil, 0, err
}
@ -452,7 +401,6 @@ func (t *compactionTask) merge(
uploadInsertTimeCost += time.Since(uploadStart)
addInsertFieldPath(inPaths, timestampFrom, timestampTo)
addStatFieldPath(statsPaths)
numRows += int64(currentRows)
numBinlogs += len(inPaths)
}


@ -367,7 +367,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
defer func() {
Params.Save(Params.DataNodeCfg.BinLogMaxSize.Key, BinLogMaxSize)
}()
paramtable.Get().Save(Params.DataNodeCfg.BinLogMaxSize.Key, "128")
paramtable.Get().Save(Params.DataNodeCfg.BinLogMaxSize.Key, "64")
iData := genInsertDataWithExpiredTS()
meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64)
@ -402,14 +402,14 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
inPaths, statsPaths, numOfRow, err := ct.merge(context.Background(), allPaths, 2, 0, meta, dm)
assert.NoError(t, err)
assert.Equal(t, int64(2), numOfRow)
assert.Equal(t, 2, len(inPaths[0].GetBinlogs()))
assert.Equal(t, 1, len(inPaths[0].GetBinlogs()))
assert.Equal(t, 1, len(statsPaths))
assert.Equal(t, 1, len(statsPaths[0].GetBinlogs()))
assert.NotEqual(t, -1, inPaths[0].GetBinlogs()[0].GetTimestampFrom())
assert.NotEqual(t, -1, inPaths[0].GetBinlogs()[0].GetTimestampTo())
})
// set Params.DataNodeCfg.BinLogMaxSize.Key = 1 to generate multi binlogs, each has only one row
t.Run("Merge without expiration3", func(t *testing.T) {
t.Run("merge_with_more_than_100rows", func(t *testing.T) {
mockbIO := &binlogIO{cm, alloc}
paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0")
BinLogMaxSize := Params.DataNodeCfg.BinLogMaxSize.GetAsInt()
@ -417,7 +417,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
paramtable.Get().Save(Params.DataNodeCfg.BinLogMaxSize.Key, fmt.Sprintf("%d", BinLogMaxSize))
}()
paramtable.Get().Save(Params.DataNodeCfg.BinLogMaxSize.Key, "1")
iData := genInsertDataWithExpiredTS()
iData := genInsertData(101)
var allPaths [][]string
inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
@ -451,14 +451,12 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
}
inPaths, statsPaths, numOfRow, err := ct.merge(context.Background(), allPaths, 2, 0, meta, dm)
assert.NoError(t, err)
assert.Equal(t, int64(2), numOfRow)
assert.Equal(t, int64(101), numOfRow)
assert.Equal(t, 2, len(inPaths[0].GetBinlogs()))
assert.Equal(t, 1, len(statsPaths))
for _, inpath := range inPaths {
assert.NotEqual(t, -1, inpath.GetBinlogs()[0].GetTimestampFrom())
assert.NotEqual(t, -1, inpath.GetBinlogs()[0].GetTimestampTo())
// as only one row for each binlog, timestampTo == timestampFrom
assert.Equal(t, inpath.GetBinlogs()[0].GetTimestampTo(), inpath.GetBinlogs()[0].GetTimestampFrom())
}
})
@ -742,36 +740,6 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
t.Run("Test uploadRemainLog error", func(t *testing.T) {
f := &MetaFactory{}
t.Run("field not in field to type", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
ct := &compactionTask{
done: make(chan struct{}, 1),
}
meta := f.GetCollectionMeta(UniqueID(10001), "test_upload_remain_log", schemapb.DataType_Int64)
fid2C := make(map[int64][]interface{})
fid2T := make(map[int64]schemapb.DataType)
fid2C[1] = nil
_, _, err := ct.uploadRemainLog(ctx, 1, 2, meta, nil, 0, fid2C, fid2T)
assert.Error(t, err)
})
t.Run("transfer interface wrong", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
ct := &compactionTask{
done: make(chan struct{}, 1),
}
meta := f.GetCollectionMeta(UniqueID(10001), "test_upload_remain_log", schemapb.DataType_Int64)
fid2C := make(map[int64][]interface{})
fid2T := make(map[int64]schemapb.DataType)
fid2C[1] = nil
_, _, err := ct.uploadRemainLog(ctx, 1, 2, meta, nil, 0, fid2C, fid2T)
assert.Error(t, err)
})
t.Run("upload failed", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()


@ -22,9 +22,11 @@ import (
"encoding/binary"
"fmt"
"math"
"math/rand"
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"go.uber.org/zap"
"google.golang.org/grpc"
@ -1089,7 +1091,7 @@ func (f *FailMessageStreamFactory) NewTtMsgStream(ctx context.Context) (msgstrea
}
func genInsertDataWithPKs(PKs [2]storage.PrimaryKey, dataType schemapb.DataType) *InsertData {
iD := genInsertData()
iD := genInsertData(2)
switch dataType {
case schemapb.DataType_Int64:
values := make([]int64, len(PKs))
@ -1121,46 +1123,46 @@ func genTestStat(meta *etcdpb.CollectionMeta) *storage.PrimaryKeyStats {
return stats
}
func genInsertData() *InsertData {
func genInsertData(rowNum int) *InsertData {
return &InsertData{
Data: map[int64]storage.FieldData{
0: &storage.Int64FieldData{
Data: []int64{1, 2},
Data: lo.RepeatBy(rowNum, func(i int) int64 { return int64(i + 1) }),
},
1: &storage.Int64FieldData{
Data: []int64{3, 4},
Data: lo.RepeatBy(rowNum, func(i int) int64 { return int64(i + 3) }),
},
100: &storage.FloatVectorFieldData{
Data: []float32{1.0, 6.0, 7.0, 8.0},
Data: lo.RepeatBy(rowNum*2, func(i int) float32 { return rand.Float32() }),
Dim: 2,
},
101: &storage.BinaryVectorFieldData{
Data: []byte{0, 255, 255, 255, 128, 128, 128, 0},
Data: lo.RepeatBy(rowNum*4, func(i int) byte { return byte(rand.Intn(256)) }),
Dim: 32,
},
102: &storage.BoolFieldData{
Data: []bool{true, false},
Data: lo.RepeatBy(rowNum, func(i int) bool { return i%2 == 0 }),
},
103: &storage.Int8FieldData{
Data: []int8{5, 6},
Data: lo.RepeatBy(rowNum, func(i int) int8 { return int8(i) }),
},
104: &storage.Int16FieldData{
Data: []int16{7, 8},
Data: lo.RepeatBy(rowNum, func(i int) int16 { return int16(i) }),
},
105: &storage.Int32FieldData{
Data: []int32{9, 10},
Data: lo.RepeatBy(rowNum, func(i int) int32 { return int32(i) }),
},
106: &storage.Int64FieldData{
Data: []int64{1, 2},
Data: lo.RepeatBy(rowNum, func(i int) int64 { return int64(i) }),
},
107: &storage.FloatFieldData{
Data: []float32{2.333, 2.334},
Data: lo.RepeatBy(rowNum, func(i int) float32 { return rand.Float32() }),
},
108: &storage.DoubleFieldData{
Data: []float64{3.333, 3.334},
Data: lo.RepeatBy(rowNum, func(i int) float64 { return rand.Float64() }),
},
109: &storage.StringFieldData{
Data: []string{"test1", "test2"},
Data: lo.RepeatBy(rowNum, func(i int) string { return fmt.Sprintf("test%d", i) }),
},
},
}