enhance: enable stream writer in compactions (#32612)

See #31679
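This change switches the compaction merge path from buffering rows in a storage.InsertData and serializing them with an InsertCodec to streaming rows through a storage.SerializeWriter backed by one binlog stream writer per field; uploadInsertLog and genInsertBlobs now accept pre-serialized []*Blob. A condensed, non-authoritative sketch of the new write path, using only identifiers that appear in this diff; placeholder variables (values, ctx, binlogIO, alloc, collectionID, partitionID, segmentID, schema) stand in for the compaction task's state, and the real code's error handling and tracing are trimmed:

writer, finalizers, err := newBinlogWriter(collectionID, partitionID, segmentID, schema)
if err != nil {
    return err
}
for _, v := range values { // surviving *storage.Value rows produced by the merge
    if err := writer.Write(v); err != nil {
        return err
    }
    if writer.WrittenMemorySize() > paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64() {
        writer.Close() // push buffered rows into the per-field stream writers
        blobs := make([]*Blob, 0, len(finalizers))
        for _, finalize := range finalizers {
            blob, err := finalize() // one serialized binlog per field; Blob.RowNum carries the row count
            if err != nil {
                return err
            }
            blobs = append(blobs, blob)
        }
        if _, err := uploadInsertLog(ctx, binlogIO, alloc, collectionID, partitionID, segmentID, blobs); err != nil {
            return err
        }
        if writer, finalizers, err = newBinlogWriter(collectionID, partitionID, segmentID, schema); err != nil {
            return err
        }
    }
}

After the loop, a final flush covers any remaining unflushed rows, and uploadStatsLog (still codec-based) uploads the primary-key stats for the merged segment.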

---------

Signed-off-by: Ted Xu <ted.xu@zilliz.com>
pull/33126/head
Ted Xu 2024-05-17 15:05:37 +08:00 committed by GitHub
parent b560602885
commit a9c7ce72b8
7 changed files with 253 additions and 182 deletions


@ -80,22 +80,18 @@ func genDeltaBlobs(b io.BinlogIO, allocator allocator.Allocator, data *DeleteDat
}
// genInsertBlobs returns insert-paths and save blob to kvs
func genInsertBlobs(b io.BinlogIO, allocator allocator.Allocator, data *InsertData, collectionID, partID, segID UniqueID, iCodec *storage.InsertCodec, kvs map[string][]byte) (map[UniqueID]*datapb.FieldBinlog, error) {
inlogs, err := iCodec.Serialize(partID, segID, data)
if err != nil {
return nil, err
}
func genInsertBlobs(b io.BinlogIO, allocator allocator.Allocator, data []*Blob, collectionID, partID, segID UniqueID, kvs map[string][]byte,
) (map[UniqueID]*datapb.FieldBinlog, error) {
inpaths := make(map[UniqueID]*datapb.FieldBinlog)
notifyGenIdx := make(chan struct{})
defer close(notifyGenIdx)
generator, err := allocator.GetGenerator(len(inlogs), notifyGenIdx)
generator, err := allocator.GetGenerator(len(data), notifyGenIdx)
if err != nil {
return nil, err
}
for _, blob := range inlogs {
for _, blob := range data {
// Blob Key is generated by Serialize from int64 fieldID in collection schema, which won't raise error in ParseInt
fID, _ := strconv.ParseInt(blob.GetKey(), 10, 64)
k := metautil.JoinIDPath(collectionID, partID, segID, fID, <-generator)
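Serialization now happens before genInsertBlobs is called; the function only allocates log IDs, assembles storage paths, and fills kvs with the blob payloads it is handed. The updated calling convention, as exercised by the tests later in this diff (meta, partID, segID, insertData, binlogIO, alloc are placeholders):

iCodec := storage.NewInsertCodecWithSchema(meta)
blobs, err := iCodec.Serialize(partID, segID, insertData)
if err != nil {
    return err
}
kvs := make(map[string][]byte) // keyed payloads that the caller uploads afterwards
inPaths, err := genInsertBlobs(binlogIO, alloc, blobs, meta.GetID(), partID, segID, kvs)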
@ -177,22 +173,21 @@ func uploadInsertLog(
collectionID UniqueID,
partID UniqueID,
segID UniqueID,
iData *InsertData,
iCodec *storage.InsertCodec,
data []*Blob,
) (map[UniqueID]*datapb.FieldBinlog, error) {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "UploadInsertLog")
defer span.End()
kvs := make(map[string][]byte)
if iData.IsEmpty() {
if len(data) <= 0 || data[0].RowNum <= 0 {
log.Warn("binlog io uploading empty insert data",
zap.Int64("segmentID", segID),
zap.Int64("collectionID", iCodec.Schema.GetID()),
zap.Int64("collectionID", collectionID),
)
return nil, nil
}
inpaths, err := genInsertBlobs(b, allocator, iData, collectionID, partID, segID, iCodec, kvs)
inpaths, err := genInsertBlobs(b, allocator, data, collectionID, partID, segID, kvs)
if err != nil {
return nil, err
}


@ -124,21 +124,17 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
f := &MetaFactory{}
meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_blobs", schemapb.DataType_Int64)
t.Run("empty insert", func(t *testing.T) {
alloc := allocator.NewMockAllocator(t)
binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
iCodec := storage.NewInsertCodecWithSchema(meta)
paths, err := uploadInsertLog(context.Background(), binlogIO, alloc, meta.GetID(), 10, 1, genEmptyInsertData(), iCodec)
assert.NoError(t, err)
assert.Nil(t, paths)
})
t.Run("gen insert blob failed", func(t *testing.T) {
alloc := allocator.NewMockAllocator(t)
binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
iCodec := storage.NewInsertCodecWithSchema(meta)
var partId int64 = 10
var segId int64 = 1
iData := genInsertData(2)
blobs, err := iCodec.Serialize(10, 1, iData)
assert.NoError(t, err)
alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(nil, fmt.Errorf("mock err"))
_, err := uploadInsertLog(context.Background(), binlogIO, alloc, meta.GetID(), 10, 1, genInsertData(2), iCodec)
_, err = uploadInsertLog(context.Background(), binlogIO, alloc, meta.GetID(), partId, segId, blobs)
assert.Error(t, err)
})
@ -147,13 +143,18 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
alloc := allocator.NewMockAllocator(t)
binlogIO := io.NewBinlogIO(mkc, getOrCreateIOPool())
iCodec := storage.NewInsertCodecWithSchema(meta)
var partId int64 = 1
var segId int64 = 10
iData := genInsertData(2)
blobs, err := iCodec.Serialize(10, 1, iData)
assert.NoError(t, err)
alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil)
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
_, err := uploadInsertLog(ctx, binlogIO, alloc, meta.GetID(), 1, 10, genInsertData(2), iCodec)
_, err = uploadInsertLog(ctx, binlogIO, alloc, meta.GetID(), partId, segId, blobs)
assert.Error(t, err)
})
})
@ -256,9 +257,13 @@ func TestBinlogIOInnerMethods(t *testing.T) {
t.Run(test.description, func(t *testing.T) {
meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_blobs", test.pkType)
iCodec := storage.NewInsertCodecWithSchema(meta)
var partId int64 = 10
var segId int64 = 1
iData := genInsertData(2)
blobs, err := iCodec.Serialize(10, 1, iData)
assert.NoError(t, err)
kvs := make(map[string][]byte)
pin, err := genInsertBlobs(binlogIO, alloc, genInsertData(2), meta.GetID(), 10, 1, iCodec, kvs)
pin, err := genInsertBlobs(binlogIO, alloc, blobs, meta.GetID(), partId, segId, kvs)
assert.NoError(t, err)
assert.Equal(t, 12, len(pin))
@ -277,30 +282,22 @@ func TestBinlogIOInnerMethods(t *testing.T) {
cm := storage.NewLocalChunkManager(storage.RootPath(binlogTestDir))
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
t.Run("serialize error", func(t *testing.T) {
iCodec := storage.NewInsertCodecWithSchema(nil)
alloc := allocator.NewMockAllocator(t)
binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
kvs := make(map[string][]byte)
pin, err := genInsertBlobs(binlogIO, alloc, genEmptyInsertData(), 0, 10, 1, iCodec, kvs)
assert.Error(t, err)
assert.Empty(t, kvs)
assert.Empty(t, pin)
})
t.Run("GetGenerator error", func(t *testing.T) {
f := &MetaFactory{}
meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_blobs", schemapb.DataType_Int64)
iCodec := storage.NewInsertCodecWithSchema(meta)
var partId int64 = 10
var segId int64 = 1
iData := genInsertData(2)
blobs, err := iCodec.Serialize(partId, segId, iData)
assert.NoError(t, err)
alloc := allocator.NewMockAllocator(t)
alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock GetGenerator error"))
binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
kvs := make(map[string][]byte)
pin, err := genInsertBlobs(binlogIO, alloc, genInsertData(2), meta.GetID(), 10, 1, iCodec, kvs)
pin, err := genInsertBlobs(binlogIO, alloc, blobs, meta.GetID(), partId, segId, kvs)
assert.Error(t, err)
assert.Empty(t, kvs)


@ -55,8 +55,6 @@ var (
errContext = errors.New("context done or timeout")
)
type iterator = storage.Iterator
type compactor interface {
complete()
compact() (*datapb.CompactionPlanResult, error)
@ -174,48 +172,15 @@ func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob) (map[interf
return pk2ts, nil
}
func (t *compactionTask) uploadRemainLog(
ctxTimeout context.Context,
targetSegID UniqueID,
partID UniqueID,
meta *etcdpb.CollectionMeta,
stats *storage.PrimaryKeyStats,
totRows int64,
writeBuffer *storage.InsertData,
) (map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error) {
iCodec := storage.NewInsertCodecWithSchema(meta)
inPaths := make(map[int64]*datapb.FieldBinlog, 0)
var err error
if !writeBuffer.IsEmpty() {
inPaths, err = uploadInsertLog(ctxTimeout, t.binlogIO, t.Allocator, meta.GetID(), partID, targetSegID, writeBuffer, iCodec)
if err != nil {
return nil, nil, err
}
func newBinlogWriter(collectionId, partitionId, segmentId UniqueID, schema *schemapb.CollectionSchema,
) (writer *storage.SerializeWriter[*storage.Value], closers []func() (*Blob, error), err error) {
fieldWriters := storage.NewBinlogStreamWriters(collectionId, partitionId, segmentId, schema.Fields)
closers = make([]func() (*Blob, error), 0, len(fieldWriters))
for _, w := range fieldWriters {
closers = append(closers, w.Finalize)
}
statPaths, err := uploadStatsLog(ctxTimeout, t.binlogIO, t.Allocator, meta.GetID(), partID, targetSegID, stats, totRows, iCodec)
if err != nil {
return nil, nil, err
}
return inPaths, statPaths, nil
}
func (t *compactionTask) uploadSingleInsertLog(
ctxTimeout context.Context,
targetSegID UniqueID,
partID UniqueID,
meta *etcdpb.CollectionMeta,
writeBuffer *storage.InsertData,
) (map[UniqueID]*datapb.FieldBinlog, error) {
iCodec := storage.NewInsertCodecWithSchema(meta)
inPaths, err := uploadInsertLog(ctxTimeout, t.binlogIO, t.Allocator, meta.GetID(), partID, targetSegID, writeBuffer, iCodec)
if err != nil {
return nil, err
}
return inPaths, nil
writer, err = storage.NewBinlogSerializeWriter(schema, partitionId, segmentId, fieldWriters, 1024)
return
}
func (t *compactionTask) merge(
@ -231,10 +196,15 @@ func (t *compactionTask) merge(
log := log.With(zap.Int64("planID", t.getPlanID()))
mergeStart := time.Now()
writer, finalizers, err := newBinlogWriter(meta.GetID(), partID, targetSegID, meta.GetSchema())
if err != nil {
return nil, nil, 0, err
}
var (
numBinlogs int // binlog number
numRows int64 // the number of rows uploaded
expired int64 // the number of expired entity
numBinlogs int // binlog number
numRows uint64 // the number of rows uploaded
expired int64 // the number of expired entity
insertField2Path = make(map[UniqueID]*datapb.FieldBinlog)
insertPaths = make([]*datapb.FieldBinlog, 0)
@ -242,10 +212,6 @@ func (t *compactionTask) merge(
statField2Path = make(map[UniqueID]*datapb.FieldBinlog)
statPaths = make([]*datapb.FieldBinlog, 0)
)
writeBuffer, err := storage.NewInsertData(meta.GetSchema())
if err != nil {
return nil, nil, -1, err
}
isDeletedValue := func(v *storage.Value) bool {
ts, ok := delta[v.PK.GetValue()]
@ -306,7 +272,7 @@ func (t *compactionTask) merge(
numRows = 0
numBinlogs = 0
currentTs := t.GetCurrentTime()
currentRows := 0
unflushedRows := 0
downloadTimeCost := time.Duration(0)
uploadInsertTimeCost := time.Duration(0)
@ -325,6 +291,30 @@ func (t *compactionTask) merge(
timestampFrom int64 = -1
)
flush := func() error {
uploadInsertStart := time.Now()
writer.Close()
fieldData := make([]*Blob, len(finalizers))
for i, f := range finalizers {
blob, err := f()
if err != nil {
return err
}
fieldData[i] = blob
}
inPaths, err := uploadInsertLog(ctx, t.binlogIO, t.Allocator, meta.ID, partID, targetSegID, fieldData)
if err != nil {
log.Warn("failed to upload single insert log", zap.Error(err))
return err
}
numBinlogs += len(inPaths)
uploadInsertTimeCost += time.Since(uploadInsertStart)
addInsertFieldPath(inPaths, timestampFrom, timestampTo)
unflushedRows = 0
return nil
}
for _, path := range unMergedInsertlogs {
downloadStart := time.Now()
data, err := downloadBlobs(ctx, t.binlogIO, path)
@ -370,55 +360,50 @@ func (t *compactionTask) merge(
timestampTo = v.Timestamp
}
row, ok := v.Value.(map[UniqueID]interface{})
if !ok {
log.Warn("transfer interface to map wrong", zap.Strings("path", path))
return nil, nil, 0, errors.New("unexpected error")
}
err = writeBuffer.Append(row)
err = writer.Write(v)
if err != nil {
return nil, nil, 0, err
}
numRows++
unflushedRows++
currentRows++
stats.Update(v.PK)
// check size every 100 rows in case of too many `GetMemorySize` call
if (currentRows+1)%100 == 0 && writeBuffer.GetMemorySize() > paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsInt() {
numRows += int64(writeBuffer.GetRowNum())
uploadInsertStart := time.Now()
inPaths, err := t.uploadSingleInsertLog(ctx, targetSegID, partID, meta, writeBuffer)
if err != nil {
log.Warn("failed to upload single insert log", zap.Error(err))
return nil, nil, 0, err
}
uploadInsertTimeCost += time.Since(uploadInsertStart)
addInsertFieldPath(inPaths, timestampFrom, timestampTo)
timestampFrom = -1
timestampTo = -1
if (unflushedRows+1)%100 == 0 {
writer.Flush() // Flush to update memory size
writeBuffer, _ = storage.NewInsertData(meta.GetSchema())
currentRows = 0
numBinlogs++
if writer.WrittenMemorySize() > paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64() {
if err := flush(); err != nil {
return nil, nil, 0, err
}
timestampFrom = -1
timestampTo = -1
writer, finalizers, err = newBinlogWriter(meta.ID, targetSegID, partID, meta.Schema)
if err != nil {
return nil, nil, 0, err
}
}
}
}
}
// upload stats log and remain insert rows
if writeBuffer.GetRowNum() > 0 || numRows > 0 {
numRows += int64(writeBuffer.GetRowNum())
uploadStart := time.Now()
inPaths, statsPaths, err := t.uploadRemainLog(ctx, targetSegID, partID, meta,
stats, numRows+int64(currentRows), writeBuffer)
// final flush if there is unflushed rows
if unflushedRows > 0 {
if err := flush(); err != nil {
return nil, nil, 0, err
}
}
// upload stats log
if numRows > 0 {
iCodec := storage.NewInsertCodecWithSchema(meta)
statsPaths, err := uploadStatsLog(ctx, t.binlogIO, t.Allocator, meta.GetID(), partID, targetSegID, stats, int64(numRows), iCodec)
if err != nil {
return nil, nil, 0, err
}
uploadInsertTimeCost += time.Since(uploadStart)
addInsertFieldPath(inPaths, timestampFrom, timestampTo)
addStatFieldPath(statsPaths)
numBinlogs += len(inPaths)
}
for _, path := range insertField2Path {
@ -430,14 +415,14 @@ func (t *compactionTask) merge(
}
log.Info("compact merge end",
zap.Int64("remaining insert numRows", numRows),
zap.Uint64("remaining insert numRows", numRows),
zap.Int64("expired entities", expired),
zap.Int("binlog file number", numBinlogs),
zap.Duration("download insert log elapse", downloadTimeCost),
zap.Duration("upload insert log elapse", uploadInsertTimeCost),
zap.Duration("merge elapse", time.Since(mergeStart)))
return insertPaths, statPaths, numRows, nil
return insertPaths, statPaths, int64(numRows), nil
}
func (t *compactionTask) compact() (*datapb.CompactionPlanResult, error) {
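Since the hunk above interleaves removed and added lines, the new size check reads more clearly laid out on its own: WrittenMemorySize only reflects rows already serialized into the field writers, so the loop calls writer.Flush() every 100 rows before comparing against BinLogMaxSize, mirroring the old every-100-rows check on InsertData.GetMemorySize. Condensed:

if (unflushedRows+1)%100 == 0 {
    writer.Flush() // serialize the pending batch so WrittenMemorySize is current
    if writer.WrittenMemorySize() > paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64() {
        // upload the accumulated binlogs via flush() and rotate to a fresh newBinlogWriter
    }
}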


@ -21,7 +21,6 @@ import (
"fmt"
"math"
"testing"
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
@ -296,8 +295,12 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0")
iData := genInsertDataWithExpiredTS()
iCodec := storage.NewInsertCodecWithSchema(meta)
var partId int64 = 0
var segmentId int64 = 1
blobs, err := iCodec.Serialize(partId, 0, iData)
assert.NoError(t, err)
var allPaths [][]string
inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), 0, 1, iData, iCodec)
inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), partId, segmentId, blobs)
assert.NoError(t, err)
assert.Equal(t, 12, len(inpath))
binlogNum := len(inpath[0].GetBinlogs())
@ -336,18 +339,22 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
})
t.Run("Merge without expiration2", func(t *testing.T) {
mockbIO := io.NewBinlogIO(cm, getOrCreateIOPool())
iData := genInsertDataWithExpiredTS()
iCodec := storage.NewInsertCodecWithSchema(meta)
var partId int64 = 0
var segmentId int64 = 1
blobs, err := iCodec.Serialize(partId, 0, iData)
assert.NoError(t, err)
paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0")
BinLogMaxSize := Params.DataNodeCfg.BinLogMaxSize.GetValue()
defer func() {
Params.Save(Params.DataNodeCfg.BinLogMaxSize.Key, BinLogMaxSize)
}()
paramtable.Get().Save(Params.DataNodeCfg.BinLogMaxSize.Key, "64")
iData := genInsertDataWithExpiredTS()
meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64)
var allPaths [][]string
inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), 0, 1, iData, iCodec)
inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), partId, segmentId, blobs)
assert.NoError(t, err)
assert.Equal(t, 12, len(inpath))
binlogNum := len(inpath[0].GetBinlogs())
@ -394,9 +401,13 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
}()
paramtable.Get().Save(Params.DataNodeCfg.BinLogMaxSize.Key, "1")
iData := genInsertData(101)
var partId int64 = 0
var segmentId int64 = 1
blobs, err := iCodec.Serialize(partId, segmentId, iData)
assert.NoError(t, err)
var allPaths [][]string
inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), 0, 1, iData, iCodec)
inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), partId, segmentId, blobs)
assert.NoError(t, err)
assert.Equal(t, 12, len(inpath))
binlogNum := len(inpath[0].GetBinlogs())
@ -440,10 +451,14 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
mockbIO := io.NewBinlogIO(cm, getOrCreateIOPool())
iCodec := storage.NewInsertCodecWithSchema(meta)
iData := genInsertDataWithExpiredTS()
var partId int64 = 0
var segmentId int64 = 1
blobs, err := iCodec.Serialize(partId, 0, iData)
assert.NoError(t, err)
meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64)
var allPaths [][]string
inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), 0, 1, iData, iCodec)
inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), partId, segmentId, blobs)
assert.NoError(t, err)
assert.Equal(t, 12, len(inpath))
binlogNum := len(inpath[0].GetBinlogs())
@ -485,6 +500,10 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
mockbIO := io.NewBinlogIO(cm, getOrCreateIOPool())
iData := genInsertDataWithExpiredTS()
iCodec := storage.NewInsertCodecWithSchema(meta)
var partId int64 = 0
var segmentId int64 = 1
blobs, err := iCodec.Serialize(partId, 0, iData)
assert.NoError(t, err)
meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64)
metaCache := metacache.NewMockMetaCache(t)
metaCache.EXPECT().Schema().Return(meta.GetSchema()).Maybe()
@ -499,7 +518,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
})
var allPaths [][]string
inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), 0, 1, iData, iCodec)
inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), partId, segmentId, blobs)
assert.NoError(t, err)
assert.Equal(t, 12, len(inpath))
binlogNum := len(inpath[0].GetBinlogs())
@ -539,10 +558,14 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
iCodec := storage.NewInsertCodecWithSchema(meta)
paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0")
iData := genInsertDataWithExpiredTS()
var partId int64 = 0
var segmentId int64 = 1
blobs, err := iCodec.Serialize(partId, 0, iData)
assert.NoError(t, err)
meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64)
var allPaths [][]string
inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), 0, 1, iData, iCodec)
inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), partId, segmentId, blobs)
assert.NoError(t, err)
assert.Equal(t, 12, len(inpath))
binlogNum := len(inpath[0].GetBinlogs())
@ -586,10 +609,14 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
iCodec := storage.NewInsertCodecWithSchema(meta)
paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0")
iData := genInsertDataWithExpiredTS()
var partId int64 = 0
var segmentId int64 = 1
blobs, err := iCodec.Serialize(partId, 0, iData)
assert.NoError(t, err)
meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64)
var allPaths [][]string
inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), 0, 1, iData, iCodec)
inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), partId, segmentId, blobs)
assert.NoError(t, err)
assert.Equal(t, 12, len(inpath))
binlogNum := len(inpath[0].GetBinlogs())
@ -714,32 +741,6 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
_, err := ct.getNumRows()
assert.Error(t, err, "segment not found")
})
t.Run("Test uploadRemainLog error", func(t *testing.T) {
f := &MetaFactory{}
t.Run("upload failed", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
alloc := allocator.NewMockAllocator(t)
alloc.EXPECT().AllocOne().Call.Return(int64(11111), nil)
meta := f.GetCollectionMeta(UniqueID(10001), "test_upload_remain_log", schemapb.DataType_Int64)
stats, err := storage.NewPrimaryKeyStats(106, int64(schemapb.DataType_Int64), 10)
require.NoError(t, err)
ct := &compactionTask{
binlogIO: io.NewBinlogIO(&mockCm{errSave: true}, getOrCreateIOPool()),
Allocator: alloc,
done: make(chan struct{}, 1),
}
_, _, err = ct.uploadRemainLog(ctx, 1, 2, meta, stats, 10, nil)
assert.Error(t, err)
})
})
}
func getInt64DeltaBlobs(segID UniqueID, pks []UniqueID, tss []Timestamp) ([]*Blob, error) {
@ -924,12 +925,16 @@ func TestCompactorInterfaceMethods(t *testing.T) {
metaCache.EXPECT().GetSegmentByID(mock.Anything).Return(nil, false)
iData1 := genInsertDataWithPKs(c.pks1, c.pkType)
iblobs1, err := iCodec.Serialize(c.parID, 0, iData1)
assert.NoError(t, err)
dData1 := &DeleteData{
Pks: []storage.PrimaryKey{c.pks1[0]},
Tss: []Timestamp{20000},
RowCount: 1,
}
iData2 := genInsertDataWithPKs(c.pks2, c.pkType)
iblobs2, err := iCodec.Serialize(c.parID, 3, iData2)
assert.NoError(t, err)
dData2 := &DeleteData{
Pks: []storage.PrimaryKey{c.pks2[0]},
Tss: []Timestamp{30000},
@ -938,7 +943,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
stats1, err := storage.NewPrimaryKeyStats(1, int64(c.pkType), 1)
require.NoError(t, err)
iPaths1, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), c.parID, c.segID1, iData1, iCodec)
iPaths1, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), c.parID, c.segID1, iblobs1)
require.NoError(t, err)
sPaths1, err := uploadStatsLog(context.Background(), mockbIO, alloc, meta.GetID(), c.parID, c.segID1, stats1, 2, iCodec)
require.NoError(t, err)
@ -948,7 +953,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
stats2, err := storage.NewPrimaryKeyStats(1, int64(c.pkType), 1)
require.NoError(t, err)
iPaths2, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), c.parID, c.segID2, iData2, iCodec)
iPaths2, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), c.parID, c.segID2, iblobs2)
require.NoError(t, err)
sPaths2, err := uploadStatsLog(context.Background(), mockbIO, alloc, meta.GetID(), c.parID, c.segID2, stats2, 2, iCodec)
require.NoError(t, err)
@ -1067,7 +1072,11 @@ func TestCompactorInterfaceMethods(t *testing.T) {
// the same pk for segmentI and segmentII
pks := [2]storage.PrimaryKey{storage.NewInt64PrimaryKey(1), storage.NewInt64PrimaryKey(2)}
iData1 := genInsertDataWithPKs(pks, schemapb.DataType_Int64)
iblobs1, err := iCodec.Serialize(partID, 0, iData1)
assert.NoError(t, err)
iData2 := genInsertDataWithPKs(pks, schemapb.DataType_Int64)
iblobs2, err := iCodec.Serialize(partID, 1, iData2)
assert.NoError(t, err)
pk1 := storage.NewInt64PrimaryKey(1)
dData1 := &DeleteData{
@ -1084,7 +1093,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
stats1, err := storage.NewPrimaryKeyStats(1, int64(schemapb.DataType_Int64), 1)
require.NoError(t, err)
iPaths1, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), partID, segID1, iData1, iCodec)
iPaths1, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), partID, segID1, iblobs1)
require.NoError(t, err)
sPaths1, err := uploadStatsLog(context.Background(), mockbIO, alloc, meta.GetID(), partID, segID1, stats1, 1, iCodec)
require.NoError(t, err)
@ -1094,7 +1103,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
stats2, err := storage.NewPrimaryKeyStats(1, int64(schemapb.DataType_Int64), 1)
require.NoError(t, err)
iPaths2, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), partID, segID2, iData2, iCodec)
iPaths2, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), partID, segID2, iblobs2)
require.NoError(t, err)
sPaths2, err := uploadStatsLog(context.Background(), mockbIO, alloc, meta.GetID(), partID, segID2, stats2, 1, iCodec)
require.NoError(t, err)
@ -1160,3 +1169,78 @@ func TestInjectDone(t *testing.T) {
task.injectDone()
task.injectDone()
}
func BenchmarkCompaction(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cm := storage.NewLocalChunkManager(storage.RootPath(compactTestDir))
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
collectionID := int64(1)
meta := NewMetaFactory().GetCollectionMeta(collectionID, "test", schemapb.DataType_Int64)
mockbIO := io.NewBinlogIO(cm, getOrCreateIOPool())
paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0")
iData := genInsertDataWithExpiredTS()
iCodec := storage.NewInsertCodecWithSchema(meta)
var partId int64 = 0
var segmentId int64 = 1
blobs, err := iCodec.Serialize(partId, 0, iData)
assert.NoError(b, err)
var allPaths [][]string
alloc := allocator.NewMockAllocator(b)
alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil)
alloc.EXPECT().AllocOne().Call.Return(int64(19530), nil)
inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), partId, segmentId, blobs)
assert.NoError(b, err)
assert.Equal(b, 12, len(inpath))
binlogNum := len(inpath[0].GetBinlogs())
assert.Equal(b, 1, binlogNum)
for idx := 0; idx < binlogNum; idx++ {
var ps []string
for _, path := range inpath {
ps = append(ps, path.GetBinlogs()[idx].GetLogPath())
}
allPaths = append(allPaths, ps)
}
dm := map[interface{}]Timestamp{
1: 10000,
}
metaCache := metacache.NewMockMetaCache(b)
metaCache.EXPECT().Schema().Return(meta.GetSchema()).Maybe()
metaCache.EXPECT().GetSegmentByID(mock.Anything).RunAndReturn(func(id int64, filters ...metacache.SegmentFilter) (*metacache.SegmentInfo, bool) {
segment := metacache.NewSegmentInfo(&datapb.SegmentInfo{
CollectionID: 1,
PartitionID: 0,
ID: id,
NumOfRows: 10,
}, nil)
return segment, true
})
ct := &compactionTask{
metaCache: metaCache,
binlogIO: mockbIO,
Allocator: alloc,
done: make(chan struct{}, 1),
plan: &datapb.CompactionPlan{
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{SegmentID: 1},
},
},
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
inPaths, statsPaths, numOfRow, err := ct.merge(context.Background(), allPaths, 2, 0, meta, dm)
assert.NoError(b, err)
assert.Equal(b, int64(2), numOfRow)
assert.Equal(b, 1, len(inPaths[0].GetBinlogs()))
assert.Equal(b, 1, len(statsPaths))
assert.NotEqual(b, -1, inPaths[0].GetBinlogs()[0].GetTimestampFrom())
assert.NotEqual(b, -1, inPaths[0].GetBinlogs()[0].GetTimestampTo())
}
}


@ -99,7 +99,6 @@ func (b *BinlogIoImpl) Upload(ctx context.Context, kvs map[string][]byte) error
}
return err
})
return struct{}{}, err
})


@ -28,6 +28,7 @@ import (
"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/apache/arrow/go/v12/arrow/memory"
"github.com/apache/arrow/go/v12/parquet"
"github.com/apache/arrow/go/v12/parquet/pqarrow"
"github.com/cockroachdb/errors"
"github.com/golang/protobuf/proto"
@ -749,18 +750,17 @@ var _ RecordWriter = (*singleFieldRecordWriter)(nil)
type singleFieldRecordWriter struct {
fw *pqarrow.FileWriter
fieldId FieldID
schema *arrow.Schema
grouped bool
numRows int
}
func (sfw *singleFieldRecordWriter) Write(r Record) error {
if !sfw.grouped {
sfw.grouped = true
sfw.fw.NewRowGroup()
}
// TODO: adding row group support by calling fw.NewRowGroup()
sfw.numRows += r.Len()
a := r.Column(sfw.fieldId)
return sfw.fw.WriteColumnData(a)
rec := array.NewRecord(sfw.schema, []arrow.Array{a}, int64(r.Len()))
defer rec.Release()
return sfw.fw.WriteBuffered(rec)
}
func (sfw *singleFieldRecordWriter) Close() {
@ -769,13 +769,16 @@ func (sfw *singleFieldRecordWriter) Close() {
func newSingleFieldRecordWriter(fieldId FieldID, field arrow.Field, writer io.Writer) (*singleFieldRecordWriter, error) {
schema := arrow.NewSchema([]arrow.Field{field}, nil)
fw, err := pqarrow.NewFileWriter(schema, writer, nil, pqarrow.DefaultWriterProps())
fw, err := pqarrow.NewFileWriter(schema, writer,
parquet.NewWriterProperties(parquet.WithMaxRowGroupLength(math.MaxInt64)), // No additional grouping for now.
pqarrow.DefaultWriterProps())
if err != nil {
return nil, err
}
return &singleFieldRecordWriter{
fw: fw,
fieldId: fieldId,
schema: schema,
}, nil
}
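singleFieldRecordWriter no longer opens a new parquet row group per Write: the writer is configured with WithMaxRowGroupLength(math.MaxInt64) and each Write wraps the column in an arrow record passed to WriteBuffered, so all rows accumulate in a single buffered row group until the file is closed (finer-grained row groups are left as a TODO in the diff). For reference, a minimal self-contained illustration of that buffered-write pattern with arrow-go v12 (plain arrow/parquet code, not milvus types; column name f0 is made up):

package main

import (
	"bytes"
	"fmt"
	"math"

	"github.com/apache/arrow/go/v12/arrow"
	"github.com/apache/arrow/go/v12/arrow/array"
	"github.com/apache/arrow/go/v12/arrow/memory"
	"github.com/apache/arrow/go/v12/parquet"
	"github.com/apache/arrow/go/v12/parquet/pqarrow"
)

func main() {
	schema := arrow.NewSchema([]arrow.Field{{Name: "f0", Type: arrow.PrimitiveTypes.Int64}}, nil)
	var buf bytes.Buffer
	// One (practically unbounded) row group, matching the writer properties used above.
	fw, err := pqarrow.NewFileWriter(schema, &buf,
		parquet.NewWriterProperties(parquet.WithMaxRowGroupLength(math.MaxInt64)),
		pqarrow.DefaultWriterProps())
	if err != nil {
		panic(err)
	}
	b := array.NewInt64Builder(memory.DefaultAllocator)
	b.AppendValues([]int64{1, 2, 3}, nil)
	col := b.NewArray()
	rec := array.NewRecord(schema, []arrow.Array{col}, int64(col.Len()))
	if err := fw.WriteBuffered(rec); err != nil { // buffered write: rows join the open row group
		panic(err)
	}
	rec.Release()
	if err := fw.Close(); err != nil { // finishes the row group and the parquet footer
		panic(err)
	}
	fmt.Println("parquet bytes:", buf.Len())
}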
@ -790,15 +793,18 @@ type SerializeWriter[T any] struct {
}
func (sw *SerializeWriter[T]) Flush() error {
if sw.pos == 0 {
return nil
}
buf := sw.buffer[:sw.pos]
r, size, err := sw.serializer(buf)
if err != nil {
return err
}
defer r.Release()
if err := sw.rw.Write(r); err != nil {
return err
}
r.Release()
sw.pos = 0
sw.writtenMemorySize += size
return nil
@ -823,8 +829,11 @@ func (sw *SerializeWriter[T]) WrittenMemorySize() uint64 {
}
func (sw *SerializeWriter[T]) Close() error {
if err := sw.Flush(); err != nil {
return err
}
sw.rw.Close()
return sw.Flush()
return nil
}
func NewSerializeRecordWriter[T any](rw RecordWriter, serializer Serializer[T], batchSize int) *SerializeWriter[T] {
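Two small semantic fixes to SerializeWriter land here: Flush is now a no-op when the buffer is empty, and Close flushes the partial batch before closing the underlying record writer (previously it closed first and flushed afterwards). A sketch of the intended lifecycle, where schema, partitionID, segmentID, values, and fieldWriters (from NewBinlogStreamWriters) are placeholders:

writer, err := NewBinlogSerializeWriter(schema, partitionID, segmentID, fieldWriters, 1024)
if err != nil {
    return err
}
for _, v := range values {
    if err := writer.Write(v); err != nil { // buffered write
        return err
    }
}
if err := writer.Close(); err != nil { // flushes any partial batch, then closes the record writer
    return err
}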
@ -881,7 +890,7 @@ type BinlogStreamWriter struct {
memorySize int // To be updated on the fly
buf bytes.Buffer
rw RecordWriter
rw *singleFieldRecordWriter
}
func (bsw *BinlogStreamWriter) GetRecordWriter() (RecordWriter, error) {
@ -916,8 +925,9 @@ func (bsw *BinlogStreamWriter) Finalize() (*Blob, error) {
return nil, err
}
return &Blob{
Key: strconv.Itoa(int(bsw.fieldSchema.FieldID)),
Value: b.Bytes(),
Key: strconv.Itoa(int(bsw.fieldSchema.FieldID)),
Value: b.Bytes(),
RowNum: int64(bsw.rw.numRows),
}, nil
}


@ -124,7 +124,7 @@ func TestBinlogSerializeWriter(t *testing.T) {
})
t.Run("test serialize", func(t *testing.T) {
size := 3
size := 16
blobs, err := generateTestData(size)
assert.NoError(t, err)
reader, err := NewBinlogDeserializeReader(blobs, common.RowIDField)
@ -134,7 +134,7 @@ func TestBinlogSerializeWriter(t *testing.T) {
schema := generateTestSchema()
// Copy write the generated data
writers := NewBinlogStreamWriters(0, 0, 0, schema.Fields)
writer, err := NewBinlogSerializeWriter(schema, 0, 0, writers, 1024)
writer, err := NewBinlogSerializeWriter(schema, 0, 0, writers, 7)
assert.NoError(t, err)
for i := 1; i <= size; i++ {
@ -143,7 +143,8 @@ func TestBinlogSerializeWriter(t *testing.T) {
value := reader.Value()
assertTestData(t, i, value)
writer.Write(value)
err := writer.Write(value)
assert.NoError(t, err)
}
err = reader.Next()