enhance: replace binlogIO with io.BinlogIO in datanode (#29725)

#30633

Signed-off-by: wayblink <anyang.wang@zilliz.com>
pull/30689/head
wayblink 2024-02-20 14:38:51 +08:00 committed by GitHub
parent ed754dc58c
commit f976385421
6 changed files with 198 additions and 283 deletions


@ -18,21 +18,18 @@ package datanode
import (
"context"
"path"
"strconv"
"time"
"github.com/cockroachdb/errors"
"go.opentelemetry.io/otel"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/datanode/io"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -44,121 +41,27 @@ var (
errStart = errors.New("start")
)
type downloader interface {
// download downloads insert-binlogs, stats-binlogs, and delta-binlogs from blob storage for the given paths.
// The paths are one group of binlog paths generated by one `Serialize` call.
//
// errDownloadFromBlobStorage is returned if ctx is canceled from outside while a download is in progress.
// Beware of the ctx here: if no timeout or cancel is applied to this ctx, the download may retry forever.
download(ctx context.Context, paths []string) ([]*Blob, error)
}
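The comment above is the key caveat for callers: the retry loop only exits once the context is done. A minimal, self-contained sketch of that retry-until-ctx-done pattern (stdlib only; readWithRetry and readOnce are hypothetical stand-ins for a single ChunkManager.Read attempt, not part of this PR):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errStart = errors.New("start")

// readWithRetry keeps retrying readOnce until it succeeds or ctx is done,
// sleeping 50ms between attempts, mirroring the loop used in this file.
func readWithRetry(ctx context.Context, readOnce func() ([]byte, error)) ([]byte, error) {
	var vs []byte
	err := errStart
	for err != nil {
		select {
		case <-ctx.Done():
			return nil, fmt.Errorf("ctx done while downloading: %w", ctx.Err())
		default:
			if err != errStart {
				time.Sleep(50 * time.Millisecond) // back off before the next attempt
			}
			vs, err = readOnce()
		}
	}
	return vs, nil
}

func main() {
	// Without this timeout the loop below would spin forever on a failing
	// reader, which is exactly the warning in the interface comment.
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	_, err := readWithRetry(ctx, func() ([]byte, error) { return nil, errors.New("transient") })
	fmt.Println(err)
}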
type uploader interface {
// upload saves InsertData and DeleteData into blob storage; stats binlogs are generated from the InsertData.
//
// errUploadToBlobStorage is returned if ctx is canceled from outside while an upload is in progress.
// Beware of the ctx here: if no timeout or cancel is applied to this ctx, the upload may retry forever.
uploadInsertLog(ctx context.Context, segID, partID UniqueID, iData *InsertData, meta *etcdpb.CollectionMeta) (map[UniqueID]*datapb.FieldBinlog, error)
uploadStatsLog(ctx context.Context, segID, partID UniqueID, iData *InsertData, stats *storage.PrimaryKeyStats, totRows int64, meta *etcdpb.CollectionMeta) (map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error)
uploadDeltaLog(ctx context.Context, segID, partID UniqueID, dData *DeleteData, meta *etcdpb.CollectionMeta) ([]*datapb.FieldBinlog, error)
}
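This PR dissolves both interfaces: the methods on binlogIO become package-level functions that take an io.BinlogIO, an allocator.Allocator, an explicit collectionID, and a pre-built InsertCodec. A hedged before/after sketch of one call site, with argument order taken from the hunks and tests below (variable names are illustrative, error handling elided):

// before: one binlogIO method uploads insert and stats logs together
inPaths, statPaths, err := b.uploadStatsLog(ctx, segID, partID, iData, stats, totRows, meta)

// after: the caller builds the codec once and calls two free functions
iCodec := storage.NewInsertCodecWithSchema(meta)
inPaths, err := uploadInsertLog(ctx, binlogIO, alloc, meta.GetID(), partID, segID, iData, iCodec)
statPaths, err := uploadStatsLog(ctx, binlogIO, alloc, meta.GetID(), partID, segID, stats, totRows, iCodec)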
type binlogIO struct {
storage.ChunkManager
allocator.Allocator
}
var (
_ downloader = (*binlogIO)(nil)
_ uploader = (*binlogIO)(nil)
)
func (b *binlogIO) download(ctx context.Context, paths []string) ([]*Blob, error) {
func downloadBlobs(ctx context.Context, b io.BinlogIO, paths []string) ([]*Blob, error) {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "download")
defer span.End()
log.Debug("down load", zap.Strings("path", paths))
bytes, err := b.Download(ctx, paths)
if err != nil {
log.Warn("ctx done when downloading kvs from blob storage", zap.Strings("paths", paths))
return nil, errDownloadFromBlobStorage
}
resp := make([]*Blob, len(paths))
if len(paths) == 0 {
return resp, nil
}
futures := make([]*conc.Future[any], len(paths))
for i, path := range paths {
localPath := path
future := getMultiReadPool().Submit(func() (any, error) {
var vs []byte
err := errStart
for err != nil {
select {
case <-ctx.Done():
log.Warn("ctx done when downloading kvs from blob storage", zap.Strings("paths", paths))
return nil, errDownloadFromBlobStorage
default:
if err != errStart {
time.Sleep(50 * time.Millisecond)
}
vs, err = b.Read(ctx, localPath)
}
}
return vs, nil
})
futures[i] = future
for i := range bytes {
resp[i] = &Blob{Value: bytes[i]}
}
for i := range futures {
if !futures[i].OK() {
return nil, futures[i].Err()
}
resp[i] = &Blob{Value: futures[i].Value().([]byte)}
}
return resp, nil
}
func (b *binlogIO) uploadSegmentFiles(
ctx context.Context,
CollectionID UniqueID,
segID UniqueID,
kvs map[string][]byte,
) error {
log.Debug("update", zap.Int64("collectionID", CollectionID), zap.Int64("segmentID", segID))
if len(kvs) == 0 {
return nil
}
futures := make([]*conc.Future[any], 0)
for key, val := range kvs {
localPath := key
localVal := val
future := getMultiReadPool().Submit(func() (any, error) {
err := errStart
for err != nil {
select {
case <-ctx.Done():
log.Warn("ctx done when saving kvs to blob storage",
zap.Int64("collectionID", CollectionID),
zap.Int64("segmentID", segID),
zap.Int("number of kvs", len(kvs)))
return nil, errUploadToBlobStorage
default:
if err != errStart {
time.Sleep(50 * time.Millisecond)
}
err = b.Write(ctx, localPath, localVal)
}
}
return nil, nil
})
futures = append(futures, future)
}
err := conc.AwaitAll(futures...)
if err != nil {
return err
}
return nil
}
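uploadSegmentFiles fans each key/value write out to the shared io pool and then awaits all futures. A stdlib-only sketch of that fan-out/await shape (conc.Future and getMultiReadPool are replaced here by goroutines and a WaitGroup; an analogue of the pattern, not the real pool):

package main

import (
	"context"
	"fmt"
	"sync"
)

// uploadAll writes every kv concurrently and returns the first error seen,
// mirroring conc.AwaitAll over the submitted futures above.
func uploadAll(ctx context.Context, kvs map[string][]byte, write func(context.Context, string, []byte) error) error {
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		firstErr error
	)
	for key, val := range kvs {
		key, val := key, val // capture loop variables, like localPath/localVal above
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := write(ctx, key, val); err != nil {
				mu.Lock()
				if firstErr == nil {
					firstErr = err
				}
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return firstErr
}

func main() {
	kvs := map[string][]byte{"a/1": []byte("x"), "a/2": []byte("y")}
	err := uploadAll(context.Background(), kvs, func(_ context.Context, k string, v []byte) error {
		fmt.Printf("wrote %s (%d bytes)\n", k, len(v))
		return nil
	})
	fmt.Println("err:", err)
}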
// genDeltaBlobs returns key, value
func (b *binlogIO) genDeltaBlobs(data *DeleteData, collID, partID, segID UniqueID) (string, []byte, error) {
func genDeltaBlobs(b io.BinlogIO, allocator allocator.Allocator, data *DeleteData, collID, partID, segID UniqueID) (string, []byte, error) {
dCodec := storage.NewDeleteCodec()
blob, err := dCodec.Serialize(collID, partID, segID, data)
@ -166,19 +69,18 @@ func (b *binlogIO) genDeltaBlobs(data *DeleteData, collID, partID, segID UniqueI
return "", nil, err
}
idx, err := b.AllocOne()
idx, err := allocator.AllocOne()
if err != nil {
return "", nil, err
}
k := metautil.JoinIDPath(collID, partID, segID, idx)
key := path.Join(b.ChunkManager.RootPath(), common.SegmentDeltaLogPath, k)
key := b.JoinFullPath(common.SegmentDeltaLogPath, k)
return key, blob.GetValue(), nil
}
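The only change in genDeltaBlobs is how the final object key is assembled: path.Join(RootPath(), SegmentDeltaLogPath, ...) becomes b.JoinFullPath(SegmentDeltaLogPath, ...). A runnable sketch of the resulting key layout ("files" and "delta_log" are illustrative assumptions standing in for ChunkManager.RootPath() and common.SegmentDeltaLogPath):

package main

import (
	"fmt"
	"path"
	"strconv"
)

// joinIDPath is a stand-in for metautil.JoinIDPath: IDs joined with "/".
func joinIDPath(ids ...int64) string {
	parts := make([]string, 0, len(ids))
	for _, id := range ids {
		parts = append(parts, strconv.FormatInt(id, 10))
	}
	return path.Join(parts...)
}

func main() {
	collID, partID, segID, logIdx := int64(10002), int64(10), int64(1), int64(11111)
	k := joinIDPath(collID, partID, segID, logIdx)
	// before: path.Join(rootPath, common.SegmentDeltaLogPath, k)
	// after:  b.JoinFullPath(common.SegmentDeltaLogPath, k)
	key := path.Join("files", "delta_log", k)
	fmt.Println(key) // files/delta_log/10002/10/1/11111
}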
// genInsertBlobs returns insert log paths and saves the serialized blobs into kvs
func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, iCodec *storage.InsertCodec, kvs map[string][]byte) (map[UniqueID]*datapb.FieldBinlog, error) {
func genInsertBlobs(b io.BinlogIO, allocator allocator.Allocator, data *InsertData, collectionID, partID, segID UniqueID, iCodec *storage.InsertCodec, kvs map[string][]byte) (map[UniqueID]*datapb.FieldBinlog, error) {
inlogs, err := iCodec.Serialize(partID, segID, data)
if err != nil {
return nil, err
@ -188,7 +90,7 @@ func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, iCod
notifyGenIdx := make(chan struct{})
defer close(notifyGenIdx)
generator, err := b.GetGenerator(len(inlogs), notifyGenIdx)
generator, err := allocator.GetGenerator(len(inlogs), notifyGenIdx)
if err != nil {
return nil, err
}
@ -196,9 +98,8 @@ func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, iCod
for _, blob := range inlogs {
// Blob key is generated by Serialize from an int64 fieldID in the collection schema, so ParseInt won't raise an error
fID, _ := strconv.ParseInt(blob.GetKey(), 10, 64)
k := metautil.JoinIDPath(iCodec.Schema.GetID(), partID, segID, fID, <-generator)
key := path.Join(b.ChunkManager.RootPath(), common.SegmentInsertLogPath, k)
k := metautil.JoinIDPath(collectionID, partID, segID, fID, <-generator)
key := b.JoinFullPath(common.SegmentInsertLogPath, k)
value := blob.GetValue()
fileLen := len(value)
@ -213,22 +114,20 @@ func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, iCod
}
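genInsertBlobs pulls one pre-allocated log ID per serialized blob from a generator channel obtained via allocator.GetGenerator. A stdlib-only sketch of that ID-generator pattern (the buffered channel here is a hypothetical stand-in for the allocator, not its real implementation):

package main

import "fmt"

// getGenerator yields `count` consecutive IDs starting at `start`,
// stopping early if done is closed (the role notifyGenIdx plays above).
func getGenerator(count int, start int64, done <-chan struct{}) <-chan int64 {
	ch := make(chan int64, count)
	go func() {
		defer close(ch)
		for i := 0; i < count; i++ {
			select {
			case ch <- start + int64(i):
			case <-done:
				return
			}
		}
	}()
	return ch
}

func main() {
	done := make(chan struct{})
	defer close(done)

	gen := getGenerator(3, 42000, done)
	for _, fieldID := range []int64{100, 101, 102} {
		logID := <-gen // one fresh log ID per field binlog
		fmt.Printf("field %d -> log %d\n", fieldID, logID)
	}
}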
// genStatBlobs returns stats log paths and saves the stats blob into kvs
func (b *binlogIO) genStatBlobs(stats *storage.PrimaryKeyStats, partID, segID UniqueID, iCodec *storage.InsertCodec, kvs map[string][]byte, totRows int64) (map[UniqueID]*datapb.FieldBinlog, error) {
func genStatBlobs(b io.BinlogIO, allocator allocator.Allocator, stats *storage.PrimaryKeyStats, collectionID, partID, segID UniqueID, iCodec *storage.InsertCodec, kvs map[string][]byte, totRows int64) (map[UniqueID]*datapb.FieldBinlog, error) {
statBlob, err := iCodec.SerializePkStats(stats, totRows)
if err != nil {
return nil, err
}
statPaths := make(map[UniqueID]*datapb.FieldBinlog)
idx, err := b.AllocOne()
idx, err := allocator.AllocOne()
if err != nil {
return nil, err
}
fID, _ := strconv.ParseInt(statBlob.GetKey(), 10, 64)
k := metautil.JoinIDPath(iCodec.Schema.GetID(), partID, segID, fID, idx)
key := path.Join(b.ChunkManager.RootPath(), common.SegmentStatslogPath, k)
k := metautil.JoinIDPath(collectionID, partID, segID, fID, idx)
key := b.JoinFullPath(common.SegmentStatslogPath, k)
value := statBlob.GetValue()
fileLen := len(value)
@ -243,57 +142,46 @@ func (b *binlogIO) genStatBlobs(stats *storage.PrimaryKeyStats, partID, segID Un
// update stats log
// also update with insert data if not nil
func (b *binlogIO) uploadStatsLog(
func uploadStatsLog(
ctx context.Context,
segID UniqueID,
b io.BinlogIO,
allocator allocator.Allocator,
collectionID UniqueID,
partID UniqueID,
iData *InsertData,
segID UniqueID,
stats *storage.PrimaryKeyStats,
totRows int64,
meta *etcdpb.CollectionMeta,
) (map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error) {
iCodec *storage.InsertCodec,
) (map[UniqueID]*datapb.FieldBinlog, error) {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "UploadStatslog")
defer span.End()
var inPaths map[int64]*datapb.FieldBinlog
var err error
iCodec := storage.NewInsertCodecWithSchema(meta)
kvs := make(map[string][]byte)
if !iData.IsEmpty() {
inPaths, err = b.genInsertBlobs(iData, partID, segID, iCodec, kvs)
if err != nil {
log.Warn("generate insert blobs wrong",
zap.Int64("collectionID", iCodec.Schema.GetID()),
zap.Int64("segmentID", segID),
zap.Error(err))
return nil, nil, err
}
}
statPaths, err := b.genStatBlobs(stats, partID, segID, iCodec, kvs, totRows)
statPaths, err := genStatBlobs(b, allocator, stats, collectionID, partID, segID, iCodec, kvs, totRows)
if err != nil {
return nil, nil, err
return nil, err
}
err = b.uploadSegmentFiles(ctx, meta.GetID(), segID, kvs)
err = b.Upload(ctx, kvs)
if err != nil {
return nil, nil, err
return nil, err
}
return inPaths, statPaths, nil
return statPaths, nil
}
func (b *binlogIO) uploadInsertLog(
func uploadInsertLog(
ctx context.Context,
segID UniqueID,
b io.BinlogIO,
allocator allocator.Allocator,
collectionID UniqueID,
partID UniqueID,
segID UniqueID,
iData *InsertData,
meta *etcdpb.CollectionMeta,
iCodec *storage.InsertCodec,
) (map[UniqueID]*datapb.FieldBinlog, error) {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "UploadInsertLog")
defer span.End()
iCodec := storage.NewInsertCodecWithSchema(meta)
kvs := make(map[string][]byte)
if iData.IsEmpty() {
@ -304,12 +192,12 @@ func (b *binlogIO) uploadInsertLog(
return nil, nil
}
inpaths, err := b.genInsertBlobs(iData, partID, segID, iCodec, kvs)
inpaths, err := genInsertBlobs(b, allocator, iData, collectionID, partID, segID, iCodec, kvs)
if err != nil {
return nil, err
}
err = b.uploadSegmentFiles(ctx, meta.GetID(), segID, kvs)
err = b.Upload(ctx, kvs)
if err != nil {
return nil, err
}
@ -317,12 +205,14 @@ func (b *binlogIO) uploadInsertLog(
return inpaths, nil
}
func (b *binlogIO) uploadDeltaLog(
func uploadDeltaLog(
ctx context.Context,
segID UniqueID,
b io.BinlogIO,
allocator allocator.Allocator,
collectionID UniqueID,
partID UniqueID,
segID UniqueID,
dData *DeleteData,
meta *etcdpb.CollectionMeta,
) ([]*datapb.FieldBinlog, error) {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "UploadDeltaLog")
defer span.End()
@ -332,10 +222,10 @@ func (b *binlogIO) uploadDeltaLog(
)
if dData.RowCount > 0 {
k, v, err := b.genDeltaBlobs(dData, meta.GetID(), partID, segID)
k, v, err := genDeltaBlobs(b, allocator, dData, collectionID, partID, segID)
if err != nil {
log.Warn("generate delta blobs wrong",
zap.Int64("collectionID", meta.GetID()),
zap.Int64("collectionID", collectionID),
zap.Int64("segmentID", segID),
zap.Error(err))
return nil, err
@ -354,7 +244,7 @@ func (b *binlogIO) uploadDeltaLog(
return nil, nil
}
err := b.uploadSegmentFiles(ctx, meta.GetID(), segID, kvs)
err := b.Upload(ctx, kvs)
if err != nil {
return nil, err
}
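With the methods gone, a compaction-style caller drives all three uploads through the same io.BinlogIO. A hedged sketch mirroring how the tests further down wire it up (cm, alloc, meta, iData, dData, stats and the ID values come from the surrounding test setup; error handling elided):

binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
iCodec := storage.NewInsertCodecWithSchema(meta)

inPaths, err := uploadInsertLog(ctx, binlogIO, alloc, meta.GetID(), partID, segID, iData, iCodec)
statPaths, err := uploadStatsLog(ctx, binlogIO, alloc, meta.GetID(), partID, segID, stats, totalRows, iCodec)
deltaPaths, err := uploadDeltaLog(ctx, binlogIO, alloc, meta.GetID(), partID, segID, dData)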


@ -31,6 +31,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/datanode/io"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
@ -53,8 +54,7 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
t.Run("Test download", func(t *testing.T) {
alloc := allocator.NewMockAllocator(t)
b := &binlogIO{cm, alloc}
binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
tests := []struct {
isvalid bool
ks []string // for preparation
@ -77,19 +77,19 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
assert.NotEmpty(t, blob)
inkeys = append(inkeys, key)
loaded, err := b.download(test.inctx, []string{key})
loaded, err := downloadBlobs(test.inctx, binlogIO, []string{key})
assert.NoError(t, err)
assert.ElementsMatch(t, blob, loaded[0].GetValue())
}
loaded, err := b.download(test.inctx, inkeys)
loaded, err := downloadBlobs(test.inctx, binlogIO, inkeys)
assert.NoError(t, err)
assert.Equal(t, len(test.ks), len(loaded))
} else {
ctx, cancel := context.WithCancel(test.inctx)
cancel()
_, err := b.download(ctx, []string{"test"})
_, err := downloadBlobs(ctx, binlogIO, []string{"test"})
assert.EqualError(t, err, errDownloadFromBlobStorage.Error())
}
})
@ -97,12 +97,10 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
})
t.Run("Test download twice", func(t *testing.T) {
mkc := &mockCm{errRead: true}
alloc := allocator.NewMockAllocator(t)
b := &binlogIO{mkc, alloc}
binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
ctx, cancel := context.WithTimeout(context.TODO(), time.Millisecond*20)
blobs, err := b.download(ctx, []string{"a"})
blobs, err := downloadBlobs(ctx, binlogIO, []string{"a"})
assert.Error(t, err)
assert.Empty(t, blobs)
cancel()
@ -114,9 +112,10 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
t.Run("gen insert blob failed", func(t *testing.T) {
alloc := allocator.NewMockAllocator(t)
alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(nil, fmt.Errorf("mock err"))
b := binlogIO{cm, alloc}
_, _, err := b.uploadStatsLog(context.Background(), 1, 10, genInsertData(2), genTestStat(meta), 10, meta)
alloc.EXPECT().AllocOne().Call.Return(int64(0), fmt.Errorf("mock AllocOne error"))
binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
iCodec := storage.NewInsertCodecWithSchema(meta)
_, err := uploadStatsLog(context.Background(), binlogIO, alloc, meta.GetID(), 10, 1, genTestStat(meta), 10, iCodec)
assert.Error(t, err)
})
})
@ -127,34 +126,34 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
t.Run("empty insert", func(t *testing.T) {
alloc := allocator.NewMockAllocator(t)
b := binlogIO{cm, alloc}
paths, err := b.uploadInsertLog(context.Background(), 1, 10, genEmptyInsertData(), meta)
binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
iCodec := storage.NewInsertCodecWithSchema(meta)
paths, err := uploadInsertLog(context.Background(), binlogIO, alloc, meta.GetID(), 10, 1, genEmptyInsertData(), iCodec)
assert.NoError(t, err)
assert.Nil(t, paths)
})
t.Run("gen insert blob failed", func(t *testing.T) {
alloc := allocator.NewMockAllocator(t)
b := binlogIO{cm, alloc}
binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
iCodec := storage.NewInsertCodecWithSchema(meta)
alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(nil, fmt.Errorf("mock err"))
_, err := b.uploadInsertLog(context.Background(), 1, 10, genInsertData(2), meta)
_, err := uploadInsertLog(context.Background(), binlogIO, alloc, meta.GetID(), 10, 1, genInsertData(2), iCodec)
assert.Error(t, err)
})
t.Run("upload failed", func(t *testing.T) {
mkc := &mockCm{errRead: true, errSave: true}
alloc := allocator.NewMockAllocator(t)
b := binlogIO{mkc, alloc}
binlogIO := io.NewBinlogIO(mkc, getOrCreateIOPool())
iCodec := storage.NewInsertCodecWithSchema(meta)
alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil)
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
_, err := b.uploadInsertLog(ctx, 1, 10, genInsertData(2), meta)
_, err := uploadInsertLog(ctx, binlogIO, alloc, meta.GetID(), 1, 10, genInsertData(2), iCodec)
assert.Error(t, err)
})
})
@ -183,8 +182,7 @@ func TestBinlogIOInnerMethods(t *testing.T) {
t.Run("Test genDeltaBlobs", func(t *testing.T) {
alloc := allocator.NewMockAllocator(t)
alloc.EXPECT().AllocOne().Call.Return(int64(11111), nil)
b := &binlogIO{cm, alloc}
binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
f := &MetaFactory{}
meta := f.GetCollectionMeta(UniqueID(10002), "test_gen_blobs", schemapb.DataType_Int64)
@ -201,7 +199,7 @@ func TestBinlogIOInnerMethods(t *testing.T) {
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
if test.isvalid {
k, v, err := b.genDeltaBlobs(&DeleteData{
k, v, err := genDeltaBlobs(binlogIO, alloc, &DeleteData{
Pks: []storage.PrimaryKey{test.deletepk},
Tss: []uint64{test.ts},
}, meta.GetID(), 10, 1)
@ -221,8 +219,8 @@ func TestBinlogIOInnerMethods(t *testing.T) {
t.Run("Test serialize error", func(t *testing.T) {
alloc := allocator.NewMockAllocator(t)
b := &binlogIO{cm, alloc}
k, v, err := b.genDeltaBlobs(&DeleteData{Pks: []storage.PrimaryKey{pk}, Tss: []uint64{}}, 1, 1, 1)
binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
k, v, err := genDeltaBlobs(binlogIO, alloc, &DeleteData{Pks: []storage.PrimaryKey{pk}, Tss: []uint64{}}, 1, 1, 1)
assert.Error(t, err)
assert.Empty(t, k)
assert.Empty(t, v)
@ -231,8 +229,8 @@ func TestBinlogIOInnerMethods(t *testing.T) {
t.Run("Test AllocOne error", func(t *testing.T) {
alloc := allocator.NewMockAllocator(t)
alloc.EXPECT().AllocOne().Call.Return(int64(0), fmt.Errorf("mock AllocOne error"))
bin := binlogIO{cm, alloc}
k, v, err := bin.genDeltaBlobs(&DeleteData{Pks: []storage.PrimaryKey{pk}, Tss: []uint64{1}}, 1, 1, 1)
binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
k, v, err := genDeltaBlobs(binlogIO, alloc, &DeleteData{Pks: []storage.PrimaryKey{pk}, Tss: []uint64{1}}, 1, 1, 1)
assert.Error(t, err)
assert.Empty(t, k)
assert.Empty(t, v)
@ -243,7 +241,7 @@ func TestBinlogIOInnerMethods(t *testing.T) {
f := &MetaFactory{}
alloc := allocator.NewMockAllocator(t)
alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil)
b := binlogIO{cm, alloc}
binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
tests := []struct {
pkType schemapb.DataType
@ -260,7 +258,7 @@ func TestBinlogIOInnerMethods(t *testing.T) {
iCodec := storage.NewInsertCodecWithSchema(meta)
kvs := make(map[string][]byte)
pin, err := b.genInsertBlobs(genInsertData(2), 10, 1, iCodec, kvs)
pin, err := genInsertBlobs(binlogIO, alloc, genInsertData(2), meta.GetID(), 10, 1, iCodec, kvs)
assert.NoError(t, err)
assert.Equal(t, 12, len(pin))
@ -282,9 +280,10 @@ func TestBinlogIOInnerMethods(t *testing.T) {
t.Run("serialize error", func(t *testing.T) {
iCodec := storage.NewInsertCodecWithSchema(nil)
bin := &binlogIO{cm, allocator.NewMockAllocator(t)}
alloc := allocator.NewMockAllocator(t)
binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
kvs := make(map[string][]byte)
pin, err := bin.genInsertBlobs(genEmptyInsertData(), 10, 1, iCodec, kvs)
pin, err := genInsertBlobs(binlogIO, alloc, genEmptyInsertData(), 0, 10, 1, iCodec, kvs)
assert.Error(t, err)
assert.Empty(t, kvs)
@ -298,10 +297,10 @@ func TestBinlogIOInnerMethods(t *testing.T) {
alloc := allocator.NewMockAllocator(t)
alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock GetGenerator error"))
bin := &binlogIO{cm, alloc}
binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
kvs := make(map[string][]byte)
pin, err := bin.genInsertBlobs(genInsertData(2), 10, 1, iCodec, kvs)
pin, err := genInsertBlobs(binlogIO, alloc, genInsertData(2), meta.GetID(), 10, 1, iCodec, kvs)
assert.Error(t, err)
assert.Empty(t, kvs)
@ -314,7 +313,7 @@ func TestBinlogIOInnerMethods(t *testing.T) {
alloc := allocator.NewMockAllocator(t)
alloc.EXPECT().AllocOne().Return(0, nil)
b := binlogIO{cm, alloc}
binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
tests := []struct {
pkType schemapb.DataType
@ -331,7 +330,7 @@ func TestBinlogIOInnerMethods(t *testing.T) {
iCodec := storage.NewInsertCodecWithSchema(meta)
kvs := make(map[string][]byte)
stat, err := b.genStatBlobs(genTestStat(meta), 10, 1, iCodec, kvs, 0)
stat, err := genStatBlobs(binlogIO, alloc, genTestStat(meta), meta.GetID(), 10, 1, iCodec, kvs, 0)
assert.NoError(t, err)
assert.Equal(t, 1, len(stat))
@ -343,14 +342,14 @@ func TestBinlogIOInnerMethods(t *testing.T) {
t.Run("Test genStatsBlob error", func(t *testing.T) {
f := &MetaFactory{}
alloc := allocator.NewMockAllocator(t)
b := binlogIO{cm, alloc}
binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
t.Run("serialize error", func(t *testing.T) {
meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_stat_blobs_error", schemapb.DataType_Int64)
iCodec := storage.NewInsertCodecWithSchema(meta)
kvs := make(map[string][]byte)
_, err := b.genStatBlobs(nil, 10, 1, iCodec, kvs, 0)
_, err := genStatBlobs(binlogIO, alloc, nil, meta.GetID(), 10, 1, iCodec, kvs, 0)
assert.Error(t, err)
})
})
@ -378,6 +377,9 @@ func (mk *mockCm) Write(ctx context.Context, filePath string, content []byte) er
}
func (mk *mockCm) MultiWrite(ctx context.Context, contents map[string][]byte) error {
if mk.errSave {
return errors.New("mockKv save error")
}
return nil
}


@ -30,6 +30,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/datanode/io"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
@ -70,8 +71,7 @@ var _ compactor = (*compactionTask)(nil)
// for MixCompaction only
type compactionTask struct {
downloader
uploader
binlogIO io.BinlogIO
compactor
metaCache metacache.MetaCache
syncMgr syncmgr.SyncManager
@ -89,8 +89,7 @@ type compactionTask struct {
func newCompactionTask(
ctx context.Context,
dl downloader,
ul uploader,
binlogIO io.BinlogIO,
metaCache metacache.MetaCache,
syncMgr syncmgr.SyncManager,
alloc allocator.Allocator,
@ -98,17 +97,15 @@ func newCompactionTask(
) *compactionTask {
ctx1, cancel := context.WithCancel(ctx)
return &compactionTask{
ctx: ctx1,
cancel: cancel,
downloader: dl,
uploader: ul,
syncMgr: syncMgr,
metaCache: metaCache,
Allocator: alloc,
plan: plan,
tr: timerecord.NewTimeRecorder("levelone compaction"),
done: make(chan struct{}, 1),
ctx: ctx1,
cancel: cancel,
binlogIO: binlogIO,
syncMgr: syncMgr,
metaCache: metaCache,
Allocator: alloc,
plan: plan,
tr: timerecord.NewTimeRecorder("levelone compaction"),
done: make(chan struct{}, 1),
}
}
@ -184,9 +181,18 @@ func (t *compactionTask) uploadRemainLog(
stats *storage.PrimaryKeyStats,
totRows int64,
writeBuffer *storage.InsertData,
fID2Type map[UniqueID]schemapb.DataType,
) (map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error) {
inPaths, statPaths, err := t.uploadStatsLog(ctxTimeout, targetSegID, partID, writeBuffer, stats, totRows, meta)
iCodec := storage.NewInsertCodecWithSchema(meta)
inPaths := make(map[int64]*datapb.FieldBinlog, 0)
var err error
if !writeBuffer.IsEmpty() {
inPaths, err = uploadInsertLog(ctxTimeout, t.binlogIO, t.Allocator, meta.GetID(), partID, targetSegID, writeBuffer, iCodec)
if err != nil {
return nil, nil, err
}
}
statPaths, err := uploadStatsLog(ctxTimeout, t.binlogIO, t.Allocator, meta.GetID(), partID, targetSegID, stats, totRows, iCodec)
if err != nil {
return nil, nil, err
}
@ -200,9 +206,10 @@ func (t *compactionTask) uploadSingleInsertLog(
partID UniqueID,
meta *etcdpb.CollectionMeta,
writeBuffer *storage.InsertData,
fID2Type map[UniqueID]schemapb.DataType,
) (map[UniqueID]*datapb.FieldBinlog, error) {
inPaths, err := t.uploadInsertLog(ctxTimeout, targetSegID, partID, writeBuffer, meta)
iCodec := storage.NewInsertCodecWithSchema(meta)
inPaths, err := uploadInsertLog(ctxTimeout, t.binlogIO, t.Allocator, meta.GetID(), partID, targetSegID, writeBuffer, iCodec)
if err != nil {
return nil, err
}
@ -228,8 +235,6 @@ func (t *compactionTask) merge(
numRows int64 // the number of rows uploaded
expired int64 // the number of expired entity
fID2Type = make(map[UniqueID]schemapb.DataType)
insertField2Path = make(map[UniqueID]*datapb.FieldBinlog)
insertPaths = make([]*datapb.FieldBinlog, 0)
@ -283,7 +288,6 @@ func (t *compactionTask) merge(
// get pkID, pkType, dim
var pkField *schemapb.FieldSchema
for _, fs := range meta.GetSchema().GetFields() {
fID2Type[fs.GetFieldID()] = fs.GetDataType()
if fs.GetIsPrimaryKey() && fs.GetFieldID() >= 100 && typeutil.IsPrimaryFieldType(fs.GetDataType()) {
pkField = fs
}
@ -322,7 +326,7 @@ func (t *compactionTask) merge(
for _, path := range unMergedInsertlogs {
downloadStart := time.Now()
data, err := t.download(ctx, path)
data, err := downloadBlobs(ctx, t.binlogIO, path)
if err != nil {
log.Warn("download insertlogs wrong", zap.Strings("path", path), zap.Error(err))
return nil, nil, 0, err
@ -380,7 +384,7 @@ func (t *compactionTask) merge(
if (currentRows+1)%100 == 0 && writeBuffer.GetMemorySize() > paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsInt() {
numRows += int64(writeBuffer.GetRowNum())
uploadInsertStart := time.Now()
inPaths, err := t.uploadSingleInsertLog(ctx, targetSegID, partID, meta, writeBuffer, fID2Type)
inPaths, err := t.uploadSingleInsertLog(ctx, targetSegID, partID, meta, writeBuffer)
if err != nil {
log.Warn("failed to upload single insert log", zap.Error(err))
return nil, nil, 0, err
@ -402,7 +406,7 @@ func (t *compactionTask) merge(
numRows += int64(writeBuffer.GetRowNum())
uploadStart := time.Now()
inPaths, statsPaths, err := t.uploadRemainLog(ctx, targetSegID, partID, meta,
stats, numRows+int64(currentRows), writeBuffer, fID2Type)
stats, numRows+int64(currentRows), writeBuffer)
if err != nil {
return nil, nil, 0, err
}
@ -511,7 +515,7 @@ func (t *compactionTask) compact() (*datapb.CompactionPlanResult, error) {
}
if len(paths) != 0 {
bs, err := t.download(ctxTimeout, paths)
bs, err := downloadBlobs(ctxTimeout, t.binlogIO, paths)
if err != nil {
log.Warn("compact wrong, fail to download deltalogs", zap.Int64("segment", segID), zap.Strings("path", paths), zap.Error(err))
return nil, err


@ -34,6 +34,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/internal/datanode/io"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
@ -292,12 +293,12 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil)
alloc.EXPECT().AllocOne().Return(0, nil)
t.Run("Merge without expiration", func(t *testing.T) {
mockbIO := &binlogIO{cm, alloc}
mockbIO := io.NewBinlogIO(cm, getOrCreateIOPool())
paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0")
iData := genInsertDataWithExpiredTS()
iCodec := storage.NewInsertCodecWithSchema(meta)
var allPaths [][]string
inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), 0, 1, iData, iCodec)
assert.NoError(t, err)
assert.Equal(t, 12, len(inpath))
binlogNum := len(inpath[0].GetBinlogs())
@ -316,10 +317,10 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
}
ct := &compactionTask{
metaCache: metaCache,
downloader: mockbIO,
uploader: mockbIO,
done: make(chan struct{}, 1),
metaCache: metaCache,
binlogIO: mockbIO,
Allocator: alloc,
done: make(chan struct{}, 1),
plan: &datapb.CompactionPlan{
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{SegmentID: 1},
@ -335,7 +336,8 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
assert.NotEqual(t, -1, inPaths[0].GetBinlogs()[0].GetTimestampTo())
})
t.Run("Merge without expiration2", func(t *testing.T) {
mockbIO := &binlogIO{cm, alloc}
mockbIO := io.NewBinlogIO(cm, getOrCreateIOPool())
iCodec := storage.NewInsertCodecWithSchema(meta)
paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0")
BinLogMaxSize := Params.DataNodeCfg.BinLogMaxSize.GetValue()
defer func() {
@ -346,7 +348,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64)
var allPaths [][]string
inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), 0, 1, iData, iCodec)
assert.NoError(t, err)
assert.Equal(t, 12, len(inpath))
binlogNum := len(inpath[0].GetBinlogs())
@ -363,10 +365,10 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
dm := map[interface{}]Timestamp{}
ct := &compactionTask{
metaCache: metaCache,
downloader: mockbIO,
uploader: mockbIO,
done: make(chan struct{}, 1),
metaCache: metaCache,
binlogIO: mockbIO,
Allocator: alloc,
done: make(chan struct{}, 1),
plan: &datapb.CompactionPlan{
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{SegmentID: 1},
@ -384,7 +386,8 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
})
// set Params.DataNodeCfg.BinLogMaxSize.Key = 1 to generate multiple binlogs, each with only one row
t.Run("merge_with_more_than_100rows", func(t *testing.T) {
mockbIO := &binlogIO{cm, alloc}
mockbIO := io.NewBinlogIO(cm, getOrCreateIOPool())
iCodec := storage.NewInsertCodecWithSchema(meta)
paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0")
BinLogMaxSize := Params.DataNodeCfg.BinLogMaxSize.GetAsInt()
defer func() {
@ -394,7 +397,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
iData := genInsertData(101)
var allPaths [][]string
inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), 0, 1, iData, iCodec)
assert.NoError(t, err)
assert.Equal(t, 12, len(inpath))
binlogNum := len(inpath[0].GetBinlogs())
@ -413,10 +416,10 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
}
ct := &compactionTask{
metaCache: metaCache,
downloader: mockbIO,
uploader: mockbIO,
done: make(chan struct{}, 1),
metaCache: metaCache,
binlogIO: mockbIO,
Allocator: alloc,
done: make(chan struct{}, 1),
plan: &datapb.CompactionPlan{
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{SegmentID: 1},
@ -435,13 +438,13 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
})
t.Run("Merge with expiration", func(t *testing.T) {
mockbIO := &binlogIO{cm, alloc}
mockbIO := io.NewBinlogIO(cm, getOrCreateIOPool())
iCodec := storage.NewInsertCodecWithSchema(meta)
iData := genInsertDataWithExpiredTS()
meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64)
var allPaths [][]string
inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), 0, 1, iData, iCodec)
assert.NoError(t, err)
assert.Equal(t, 12, len(inpath))
binlogNum := len(inpath[0].GetBinlogs())
@ -461,9 +464,9 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
// 10 days in seconds
ct := &compactionTask{
metaCache: metaCache,
downloader: mockbIO,
uploader: mockbIO,
metaCache: metaCache,
binlogIO: mockbIO,
Allocator: alloc,
plan: &datapb.CompactionPlan{
CollectionTtl: 864000,
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
@ -480,8 +483,9 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
})
t.Run("merge_with_rownum_zero", func(t *testing.T) {
mockbIO := &binlogIO{cm, alloc}
mockbIO := io.NewBinlogIO(cm, getOrCreateIOPool())
iData := genInsertDataWithExpiredTS()
iCodec := storage.NewInsertCodecWithSchema(meta)
meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64)
metaCache := metacache.NewMockMetaCache(t)
metaCache.EXPECT().Schema().Return(meta.GetSchema()).Maybe()
@ -496,7 +500,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
})
var allPaths [][]string
inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), 0, 1, iData, iCodec)
assert.NoError(t, err)
assert.Equal(t, 12, len(inpath))
binlogNum := len(inpath[0].GetBinlogs())
@ -515,10 +519,10 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
}
ct := &compactionTask{
metaCache: metaCache,
downloader: mockbIO,
uploader: mockbIO,
done: make(chan struct{}, 1),
metaCache: metaCache,
binlogIO: mockbIO,
Allocator: alloc,
done: make(chan struct{}, 1),
plan: &datapb.CompactionPlan{
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{SegmentID: 1},
@ -533,13 +537,14 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
})
t.Run("Merge with meta error", func(t *testing.T) {
mockbIO := &binlogIO{cm, alloc}
mockbIO := io.NewBinlogIO(cm, getOrCreateIOPool())
iCodec := storage.NewInsertCodecWithSchema(meta)
paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0")
iData := genInsertDataWithExpiredTS()
meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64)
var allPaths [][]string
inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), 0, 1, iData, iCodec)
assert.NoError(t, err)
assert.Equal(t, 12, len(inpath))
binlogNum := len(inpath[0].GetBinlogs())
@ -558,10 +563,10 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
}
ct := &compactionTask{
metaCache: metaCache,
downloader: mockbIO,
uploader: mockbIO,
done: make(chan struct{}, 1),
metaCache: metaCache,
binlogIO: mockbIO,
Allocator: alloc,
done: make(chan struct{}, 1),
plan: &datapb.CompactionPlan{
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{SegmentID: 1},
@ -579,13 +584,14 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
})
t.Run("Merge with meta type param error", func(t *testing.T) {
mockbIO := &binlogIO{cm, alloc}
mockbIO := io.NewBinlogIO(cm, getOrCreateIOPool())
iCodec := storage.NewInsertCodecWithSchema(meta)
paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0")
iData := genInsertDataWithExpiredTS()
meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64)
var allPaths [][]string
inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), 0, 1, iData, iCodec)
assert.NoError(t, err)
assert.Equal(t, 12, len(inpath))
binlogNum := len(inpath[0].GetBinlogs())
@ -604,10 +610,10 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
}
ct := &compactionTask{
metaCache: metaCache,
downloader: mockbIO,
uploader: mockbIO,
done: make(chan struct{}, 1),
metaCache: metaCache,
binlogIO: mockbIO,
Allocator: alloc,
done: make(chan struct{}, 1),
}
_, _, _, err = ct.merge(context.Background(), allPaths, 2, 0, &etcdpb.CollectionMeta{
@ -727,11 +733,12 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
require.NoError(t, err)
ct := &compactionTask{
uploader: &binlogIO{&mockCm{errSave: true}, alloc},
done: make(chan struct{}, 1),
binlogIO: io.NewBinlogIO(&mockCm{errSave: true}, getOrCreateIOPool()),
Allocator: alloc,
done: make(chan struct{}, 1),
}
_, _, err = ct.uploadRemainLog(ctx, 1, 2, meta, stats, 10, nil, nil)
_, _, err = ct.uploadRemainLog(ctx, 1, 2, meta, stats, 10, nil)
assert.Error(t, err)
})
})
@ -855,7 +862,8 @@ func TestCompactorInterfaceMethods(t *testing.T) {
collName := "test_compact_coll_name"
meta := NewMetaFactory().GetCollectionMeta(c.colID, collName, c.pkType)
mockbIO := &binlogIO{cm, alloc}
mockbIO := io.NewBinlogIO(cm, getOrCreateIOPool())
iCodec := storage.NewInsertCodecWithSchema(meta)
mockKv := memkv.NewMemoryKV()
metaCache := metacache.NewMockMetaCache(t)
metaCache.EXPECT().Collection().Return(c.colID)
@ -906,17 +914,21 @@ func TestCompactorInterfaceMethods(t *testing.T) {
stats1, err := storage.NewPrimaryKeyStats(1, int64(c.pkType), 1)
require.NoError(t, err)
iPaths1, sPaths1, err := mockbIO.uploadStatsLog(context.TODO(), c.segID1, c.parID, iData1, stats1, 2, meta)
iPaths1, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), c.parID, c.segID1, iData1, iCodec)
require.NoError(t, err)
dPaths1, err := mockbIO.uploadDeltaLog(context.TODO(), c.segID1, c.parID, dData1, meta)
sPaths1, err := uploadStatsLog(context.Background(), mockbIO, alloc, meta.GetID(), c.parID, c.segID1, stats1, 2, iCodec)
require.NoError(t, err)
dPaths1, err := uploadDeltaLog(context.TODO(), mockbIO, alloc, meta.GetID(), c.parID, c.segID1, dData1)
require.NoError(t, err)
require.Equal(t, 12, len(iPaths1))
stats2, err := storage.NewPrimaryKeyStats(1, int64(c.pkType), 1)
require.NoError(t, err)
iPaths2, sPaths2, err := mockbIO.uploadStatsLog(context.TODO(), c.segID2, c.parID, iData2, stats2, 2, meta)
iPaths2, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), c.parID, c.segID2, iData2, iCodec)
require.NoError(t, err)
dPaths2, err := mockbIO.uploadDeltaLog(context.TODO(), c.segID2, c.parID, dData2, meta)
sPaths2, err := uploadStatsLog(context.Background(), mockbIO, alloc, meta.GetID(), c.parID, c.segID2, stats2, 2, iCodec)
require.NoError(t, err)
dPaths2, err := uploadDeltaLog(context.TODO(), mockbIO, alloc, meta.GetID(), c.parID, c.segID2, dData2)
require.NoError(t, err)
require.Equal(t, 12, len(iPaths2))
@ -942,7 +954,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
Channel: "channelname",
}
task := newCompactionTask(context.TODO(), mockbIO, mockbIO, metaCache, syncMgr, alloc, plan)
task := newCompactionTask(context.TODO(), mockbIO, metaCache, syncMgr, alloc, plan)
result, err := task.compact()
assert.NoError(t, err)
assert.NotNil(t, result)
@ -988,7 +1000,8 @@ func TestCompactorInterfaceMethods(t *testing.T) {
meta := NewMetaFactory().GetCollectionMeta(collID, "test_compact_coll_name", schemapb.DataType_Int64)
mockbIO := &binlogIO{cm, alloc}
mockbIO := io.NewBinlogIO(cm, getOrCreateIOPool())
iCodec := storage.NewInsertCodecWithSchema(meta)
metaCache := metacache.NewMockMetaCache(t)
metaCache.EXPECT().Collection().Return(collID)
@ -1044,17 +1057,21 @@ func TestCompactorInterfaceMethods(t *testing.T) {
stats1, err := storage.NewPrimaryKeyStats(1, int64(schemapb.DataType_Int64), 1)
require.NoError(t, err)
iPaths1, sPaths1, err := mockbIO.uploadStatsLog(context.TODO(), segID1, partID, iData1, stats1, 1, meta)
iPaths1, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), partID, segID1, iData1, iCodec)
require.NoError(t, err)
dPaths1, err := mockbIO.uploadDeltaLog(context.TODO(), segID1, partID, dData1, meta)
sPaths1, err := uploadStatsLog(context.Background(), mockbIO, alloc, meta.GetID(), partID, segID1, stats1, 1, iCodec)
require.NoError(t, err)
dPaths1, err := uploadDeltaLog(context.TODO(), mockbIO, alloc, meta.GetID(), partID, segID1, dData1)
require.NoError(t, err)
require.Equal(t, 12, len(iPaths1))
stats2, err := storage.NewPrimaryKeyStats(1, int64(schemapb.DataType_Int64), 1)
require.NoError(t, err)
iPaths2, sPaths2, err := mockbIO.uploadStatsLog(context.TODO(), segID2, partID, iData2, stats2, 1, meta)
iPaths2, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), partID, segID2, iData2, iCodec)
require.NoError(t, err)
dPaths2, err := mockbIO.uploadDeltaLog(context.TODO(), segID2, partID, dData2, meta)
sPaths2, err := uploadStatsLog(context.Background(), mockbIO, alloc, meta.GetID(), partID, segID2, stats2, 1, iCodec)
require.NoError(t, err)
dPaths2, err := uploadDeltaLog(context.TODO(), mockbIO, alloc, meta.GetID(), partID, segID2, dData2)
require.NoError(t, err)
require.Equal(t, 12, len(iPaths2))
@ -1080,7 +1097,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
Channel: "channelname",
}
task := newCompactionTask(context.TODO(), mockbIO, mockbIO, metaCache, syncMgr, alloc, plan)
task := newCompactionTask(context.TODO(), mockbIO, metaCache, syncMgr, alloc, plan)
result, err := task.compact()
assert.NoError(t, err)
assert.NotNil(t, result)


@ -285,11 +285,10 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan
req,
)
case datapb.CompactionType_MixCompaction:
// TODO, replace this binlogIO with io.BinlogIO
binlogIO := &binlogIO{node.chunkManager, ds.idAllocator}
binlogIO := io.NewBinlogIO(node.chunkManager, getOrCreateIOPool())
task = newCompactionTask(
taskCtx,
binlogIO, binlogIO,
binlogIO,
ds.metacache,
node.syncMgr,
node.allocator,


@ -205,6 +205,9 @@ func (insertCodec *InsertCodec) SerializePkStatsByData(data *InsertData) (*Blob,
func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID UniqueID, data *InsertData) ([]*Blob, error) {
blobs := make([]*Blob, 0)
var writer *InsertBinlogWriter
if insertCodec.Schema == nil {
return nil, fmt.Errorf("schema is not set")
}
timeFieldData, ok := data.Data[common.TimeStampField]
if !ok {
return nil, fmt.Errorf("data doesn't contains timestamp field")