Apply flush manager logic in datanode (#10142)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia 2021-10-19 11:04:34 +08:00 committed by GitHub
parent f467260208
commit 5737e0075f
12 changed files with 524 additions and 536 deletions


@ -21,6 +21,7 @@ import (
"errors"
"fmt"
miniokv "github.com/milvus-io/milvus/internal/kv/minio"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
@ -45,8 +46,7 @@ type dataSyncService struct {
clearSignal chan<- UniqueID
flushingSegCache *Cache
saveBinlog func(fu *segmentFlushUnit) error
flushManager flushManager
}
func newDataSyncService(ctx context.Context,
@ -128,7 +128,6 @@ func (dsService *dataSyncService) close() {
// initNodes inits a TimeTickedFlowGraph
func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) error {
// TODO: add delete pipeline support
dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx)
m := map[string]interface{}{
@ -142,24 +141,43 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
return err
}
saveBinlog := func(fu *segmentFlushUnit) error {
id2path := []*datapb.FieldBinlog{}
// MinIO
option := &miniokv.Option{
Address: Params.MinioAddress,
AccessKeyID: Params.MinioAccessKeyID,
SecretAccessKeyID: Params.MinioSecretAccessKey,
UseSSL: Params.MinioUseSSL,
CreateBucket: true,
BucketName: Params.MinioBucketName,
}
minIOKV, err := miniokv.NewMinIOKV(dsService.ctx, option)
if err != nil {
return err
}
dsService.flushManager = NewRendezvousFlushManager(dsService.idAllocator, minIOKV, dsService.replica, func(pack *segmentFlushPack) error {
fieldInsert := []*datapb.FieldBinlog{}
fieldStats := []*datapb.FieldBinlog{}
checkPoints := []*datapb.CheckPoint{}
for k, v := range fu.field2Path {
id2path = append(id2path, &datapb.FieldBinlog{FieldID: k, Binlogs: []string{v}})
for k, v := range pack.insertLogs {
fieldInsert = append(fieldInsert, &datapb.FieldBinlog{FieldID: k, Binlogs: []string{v}})
}
for k, v := range fu.checkPoint {
v := v
checkPoints = append(checkPoints, &datapb.CheckPoint{
SegmentID: k,
NumOfRows: v.numRows,
Position: &v.pos,
})
for k, v := range pack.statsLogs {
fieldStats = append(fieldStats, &datapb.FieldBinlog{FieldID: k, Binlogs: []string{v}})
}
// only report checkpoint info for the current segment
updates, _ := dsService.replica.getSegmentStatisticsUpdates(pack.segmentID)
checkPoints = append(checkPoints, &datapb.CheckPoint{
SegmentID: pack.segmentID,
NumOfRows: updates.GetNumRows(),
Position: pack.pos,
})
log.Debug("SaveBinlogPath",
zap.Int64("SegmentID", fu.segID),
zap.Int64("CollectionID", fu.collID),
zap.Int("Length of Field2BinlogPaths", len(id2path)),
zap.Int64("SegmentID", pack.segmentID),
zap.Int64("CollectionID", dsService.collectionID),
zap.Int("Length of Field2BinlogPaths", len(fieldInsert)),
)
req := &datapb.SaveBinlogPathsRequest{
@ -169,12 +187,14 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
Timestamp: 0, //TODO time stamp
SourceID: Params.NodeID,
},
SegmentID: fu.segID,
CollectionID: fu.collID,
Field2BinlogPaths: id2path,
CheckPoints: checkPoints,
StartPositions: fu.startPositions,
Flushed: fu.flushed,
SegmentID: pack.segmentID,
CollectionID: dsService.collectionID,
Field2BinlogPaths: fieldInsert,
//TODO WIP add statslog and deltalog
CheckPoints: checkPoints,
StartPositions: dsService.replica.listNewSegmentsStartPositions(),
Flushed: pack.flushed,
}
rsp, err := dsService.dataCoord.SaveBinlogPaths(dsService.ctx, req)
if err != nil {
@ -184,9 +204,8 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
return fmt.Errorf("data service save bin log path failed, reason = %s", rsp.Reason)
}
return nil
}
dsService.saveBinlog = saveBinlog
})
c := &nodeConfig{
msFactory: dsService.msFactory,
@ -209,7 +228,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
insertBufferNode, err = newInsertBufferNode(
dsService.ctx,
dsService.flushCh,
saveBinlog,
dsService.flushManager,
dsService.flushingSegCache,
c,
)
@ -218,7 +237,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
}
var deleteNode Node
deleteNode, err = newDeleteNode(dsService.ctx, c)
deleteNode, err = newDeleteNode(dsService.ctx, dsService.flushManager, c)
if err != nil {
return err
}
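
Side note on the callback above: the flush task runner retries the registered notifyMetaFunc until it returns nil (see flush_task.go further down), so the callback should only return an error for failures that are worth retrying. A minimal sketch of the expected shape, with illustrative names only:

// Sketch only: a stripped-down notifyMetaFunc mirroring the callback above.
// Returning a non-nil error makes flushTaskRunner retry the notification.
var notify notifyMetaFunc = func(pack *segmentFlushPack) error {
	binlogs := make([]*datapb.FieldBinlog, 0, len(pack.insertLogs))
	for fieldID, logPath := range pack.insertLogs {
		binlogs = append(binlogs, &datapb.FieldBinlog{FieldID: fieldID, Binlogs: []string{logPath}})
	}
	_ = binlogs // the real callback wraps these into a SaveBinlogPathsRequest for dataCoord
	return nil
}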


@ -154,28 +154,6 @@ func TestDataSyncService_newDataSyncService(te *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, ds)
// save binlog
fu := &segmentFlushUnit{
collID: 1,
segID: 100,
field2Path: map[UniqueID]string{100: "path1"},
checkPoint: map[UniqueID]segmentCheckPoint{100: {100, internalpb.MsgPosition{}}},
}
df.SaveBinlogPathError = true
err := ds.saveBinlog(fu)
assert.Error(t, err)
df.SaveBinlogPathError = false
df.SaveBinlogPathNotSucess = true
err = ds.saveBinlog(fu)
assert.Error(t, err)
df.SaveBinlogPathError = false
df.SaveBinlogPathNotSucess = false
err = ds.saveBinlog(fu)
assert.NoError(t, err)
// start
ds.fg = nil
ds.start()


@ -20,17 +20,13 @@ import (
"context"
"encoding/binary"
"math"
"path"
"strconv"
"sync"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/kv"
miniokv "github.com/milvus-io/milvus/internal/kv/minio"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/opentracing/opentracing-go"
@ -44,11 +40,11 @@ type (
// DeleteNode is to process delete msg, flush delete info into storage.
type deleteNode struct {
BaseNode
channelName string
delBuf sync.Map // map[segmentID]*DelDataBuf
replica Replica
idAllocator allocatorInterface
minIOKV kv.BaseKV
channelName string
delBuf sync.Map // map[segmentID]*DelDataBuf
replica Replica
idAllocator allocatorInterface
flushManager flushManager
}
// DelDataBuf buffers delete data, monitoring buffer size and limit
@ -187,7 +183,22 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
// handle flush
if len(fgMsg.segmentsToFlush) > 0 {
log.Debug("DeleteNode receives flush message", zap.Int64s("segIDs", fgMsg.segmentsToFlush))
dn.flushDelData(fgMsg.segmentsToFlush, fgMsg.timeRange)
for _, segmentToFlush := range fgMsg.segmentsToFlush {
buf, ok := dn.delBuf.Load(segmentToFlush)
if !ok {
// send signal
dn.flushManager.flushDelData(nil, segmentToFlush, fgMsg.endPositions[0])
} else {
err := dn.flushManager.flushDelData(buf.(*DelDataBuf), segmentToFlush, fgMsg.endPositions[0])
if err != nil {
log.Warn("Failed to flush delete data", zap.Error(err))
} else {
// clean up
dn.delBuf.Delete(segmentToFlush)
}
}
}
}
for _, sp := range spans {
@ -215,97 +226,18 @@ func (dn *deleteNode) filterSegmentByPK(partID UniqueID, pks []int64) map[int64]
return result
}
func (dn *deleteNode) flushDelData(segIDs []UniqueID, timeRange TimeRange) {
segsToFlush := make(map[UniqueID]struct{}, len(segIDs))
for _, segID := range segIDs {
segsToFlush[segID] = struct{}{}
}
collID := dn.replica.getCollectionID()
schema, err := dn.replica.getCollectionSchema(collID, timeRange.timestampMax)
if err != nil {
log.Error("failed to get collection schema", zap.Error(err))
return
}
delCodec := storage.NewDeleteCodec(&etcdpb.CollectionMeta{
ID: collID,
Schema: schema,
})
kvs := make(map[string]string)
// buffer data to binlogs
dn.delBuf.Range(func(k, v interface{}) bool {
segID := k.(int64)
if _, has := segsToFlush[segID]; !has {
return true
}
delDataBuf := v.(*DelDataBuf)
collID, partID, err := dn.replica.getCollectionAndPartitionID(segID)
if err != nil {
log.Error("failed to get collection ID and partition ID", zap.Error(err))
return false
}
blob, err := delCodec.Serialize(partID, segID, delDataBuf.delData)
if err != nil {
log.Error("failed to serialize delete data", zap.Error(err))
return false
}
// write insert binlog
logID, err := dn.idAllocator.allocID()
if err != nil {
log.Error("failed to alloc ID", zap.Error(err))
return false
}
blobKey, _ := dn.idAllocator.genKey(false, collID, partID, segID, logID)
blobPath := path.Join(Params.DeleteBinlogRootPath, blobKey)
kvs[blobPath] = string(blob.Value[:])
log.Debug("delete blob path", zap.String("path", blobPath))
return true
})
if len(kvs) > 0 {
err = dn.minIOKV.MultiSave(kvs)
if err != nil {
log.Error("failed to save minIO ..", zap.Error(err))
}
log.Debug("save delete blobs to minIO successfully")
}
// only after success
for _, segID := range segIDs {
dn.delBuf.Delete(segID)
}
}
func newDeleteNode(ctx context.Context, config *nodeConfig) (*deleteNode, error) {
func newDeleteNode(ctx context.Context, fm flushManager, config *nodeConfig) (*deleteNode, error) {
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(config.maxQueueLength)
baseNode.SetMaxParallelism(config.maxParallelism)
// MinIO
option := &miniokv.Option{
Address: Params.MinioAddress,
AccessKeyID: Params.MinioAccessKeyID,
SecretAccessKeyID: Params.MinioSecretAccessKey,
UseSSL: Params.MinioUseSSL,
CreateBucket: true,
BucketName: Params.MinioBucketName,
}
minIOKV, err := miniokv.NewMinIOKV(ctx, option)
if err != nil {
return nil, err
}
return &deleteNode{
BaseNode: baseNode,
delBuf: sync.Map{},
minIOKV: minIOKV,
replica: config.replica,
idAllocator: config.allocator,
channelName: config.vChannelName,
replica: config.replica,
idAllocator: config.allocator,
channelName: config.vChannelName,
flushManager: fm,
}, nil
}
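
Worth calling out the contract this creates between the two nodes: for every segment the insert buffer node reports in segmentsToFlush, the delete node must enqueue a delete flush at the same position, even when its delete buffer is empty, otherwise the rendezvous for that segment never completes and meta is never notified. A condensed sketch of the pairing, as a hypothetical helper over the package types:

// Sketch only: both halves of the flush rendezvous for a single segment.
func flushSegmentPair(ibNode *insertBufferNode, dn *deleteNode, buf *BufferData, delBuf *DelDataBuf, segID UniqueID, pos *internalpb.MsgPosition) error {
	// insert half: nil data is allowed and produces an empty insert flush
	if err := ibNode.flushManager.flushBufferData(buf, segID, true, pos); err != nil {
		return err
	}
	// delete half: nil data still acts as the "delete done" signal
	return dn.flushManager.flushDelData(delBuf, segID, pos)
}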


@ -19,10 +19,13 @@ package datanode
import (
"context"
"encoding/binary"
"errors"
"testing"
"time"
"github.com/bits-and-blooms/bloom/v3"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/stretchr/testify/assert"
@ -63,13 +66,35 @@ func (replica *mockReplica) getCollectionID() UniqueID {
}
func (replica *mockReplica) getCollectionSchema(collectionID UniqueID, ts Timestamp) (*schemapb.CollectionSchema, error) {
if ts == 0 {
return nil, errors.New("mocked error")
}
return &schemapb.CollectionSchema{}, nil
}
func (replica *mockReplica) getCollectionAndPartitionID(segID UniqueID) (collID, partitionID UniqueID, err error) {
if segID == -1 {
return -1, -1, errors.New("mocked error")
}
return 0, 1, nil
}
func (replica *mockReplica) hasSegment(segID UniqueID, countFlushed bool) bool {
_, has := replica.newSegments[segID]
if has {
return true
}
_, has = replica.normalSegments[segID]
if has {
return true
}
if !countFlushed {
return false
}
_, has = replica.flushedSegments[segID]
return has
}
func TestFlowGraphDeleteNode_newDeleteNode(te *testing.T) {
tests := []struct {
ctx context.Context
@ -82,7 +107,7 @@ func TestFlowGraphDeleteNode_newDeleteNode(te *testing.T) {
for _, test := range tests {
te.Run(test.description, func(t *testing.T) {
dn, err := newDeleteNode(test.ctx, test.config)
dn, err := newDeleteNode(test.ctx, nil, test.config)
assert.Nil(t, err)
assert.NotNil(t, dn)
@ -179,6 +204,10 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
pks = []int64{3, 17, 44, 190, 425}
)
replica := genMockReplica(segIDs, pks, chanName)
kv := memkv.NewMemoryKV()
fm := NewRendezvousFlushManager(NewAllocatorFactory(), kv, replica, func(*segmentFlushPack) error {
return nil
})
t.Run("Test get segment by primary keys", func(te *testing.T) {
c := &nodeConfig{
replica: replica,
@ -186,7 +215,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
vChannelName: chanName,
}
dn, err := newDeleteNode(context.Background(), c)
dn, err := newDeleteNode(context.Background(), fm, c)
assert.Nil(t, err)
results := dn.filterSegmentByPK(0, pks)
@ -202,7 +231,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
}
})
t.Run("Test deleteNode Operate valid Msg", func(te *testing.T) {
t.Run("Test deleteNode Operate valid Msg with failure", func(te *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
@ -217,12 +246,42 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
allocator: NewAllocatorFactory(),
vChannelName: chanName,
}
delNode, err := newDeleteNode(ctx, c)
delNode, err := newDeleteNode(ctx, fm, c)
assert.Nil(te, err)
msg := GenFlowGraphDeleteMsg(pks, chanName)
msg.segmentsToFlush = segIDs
// this will fail since ts = 0 will trigger mocked error
var fgMsg flowgraph.Msg = &msg
delNode.Operate([]flowgraph.Msg{fgMsg})
})
t.Run("Test deleteNode Operate valid Msg with failure", func(te *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
chanName := "datanode-test-FlowGraphDeletenode-operate"
testPath := "/test/datanode/root/meta"
assert.NoError(t, clearEtcd(testPath))
Params.MetaRootPath = testPath
Params.DeleteBinlogRootPath = testPath
c := &nodeConfig{
replica: replica,
allocator: NewAllocatorFactory(),
vChannelName: chanName,
}
delNode, err := newDeleteNode(ctx, fm, c)
assert.Nil(te, err)
msg := GenFlowGraphDeleteMsg(pks, chanName)
msg.segmentsToFlush = segIDs
msg.endPositions[0].Timestamp = 100 // set to normal timestamp
var fgMsg flowgraph.Msg = &msg
delNode.Operate([]flowgraph.Msg{fgMsg})
msg.deleteMessages = []*msgstream.DeleteMsg{}
// send again shall trigger empty buffer flush
delNode.Operate([]flowgraph.Msg{fgMsg})
})
}


@ -23,7 +23,6 @@ import (
"errors"
"fmt"
"io"
"path"
"strconv"
"sync"
@ -31,8 +30,6 @@ import (
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/kv"
miniokv "github.com/milvus-io/milvus/internal/kv/minio"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/storage"
@ -63,13 +60,10 @@ type insertBufferNode struct {
flushMap sync.Map
flushChan <-chan flushMsg
flushingSegCache *Cache
minIOKV kv.BaseKV
flushManager flushManager
timeTickStream msgstream.MsgStream
segmentStatisticsStream msgstream.MsgStream
dsSaveBinlog func(fu *segmentFlushUnit) error
}
type segmentCheckPoint struct {
@ -225,48 +219,18 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
segmentsToFlush := make([]UniqueID, 0, len(seg2Upload)+1) //auto flush number + possible manual flush
// Auto Flush
finishCh := make(chan segmentFlushUnit, len(seg2Upload))
finishCnt := sync.WaitGroup{}
for _, segToFlush := range seg2Upload {
// If full, auto flush
if bd, ok := ibNode.insertBuffer.Load(segToFlush); ok && bd.(*BufferData).effectiveCap() <= 0 {
// Move data from insertBuffer to flushBuffer
log.Warn("Auto flush", zap.Int64("segment id", segToFlush))
ibuffer := bd.(*BufferData)
ibNode.flushMap.Store(segToFlush, ibuffer.buffer)
ibNode.insertBuffer.Delete(segToFlush)
log.Debug(". Insert Buffer full, auto flushing ", zap.Int64("num of rows", ibuffer.size))
collMeta, err := ibNode.getCollMetabySegID(segToFlush, fgMsg.timeRange.timestampMax)
err := ibNode.flushManager.flushBufferData(ibuffer, segToFlush, false, endPositions[0])
if err != nil {
log.Error("Auto flush failed .. cannot get collection meta ..", zap.Error(err))
continue
log.Warn("Failed to flushBufferData", zap.Error(err))
} else {
segmentsToFlush = append(segmentsToFlush, segToFlush)
ibNode.insertBuffer.Delete(segToFlush)
}
collID, partitionID, err := ibNode.getCollectionandPartitionIDbySegID(segToFlush)
if err != nil {
log.Error("Auto flush failed .. cannot get collection ID or partition ID..", zap.Error(err))
continue
}
finishCnt.Add(1)
segmentsToFlush = append(segmentsToFlush, segToFlush)
go flushSegment(collMeta, segToFlush, partitionID, collID,
&ibNode.flushMap, ibNode.minIOKV, finishCh, &finishCnt, ibNode, ibNode.idAllocator)
}
}
finishCnt.Wait()
close(finishCh)
for fu := range finishCh {
if fu.field2Path == nil {
log.Debug("segment is empty")
continue
}
fu.checkPoint = ibNode.replica.listSegmentsCheckPoints()
fu.flushed = false
if err := ibNode.dsSaveBinlog(&fu); err != nil {
log.Debug("data service save bin log path failed", zap.Error(err))
}
}
@ -281,67 +245,20 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
)
segmentsToFlush = append(segmentsToFlush, currentSegID)
bd, ok := ibNode.insertBuffer.Load(currentSegID)
if !ok || bd.(*BufferData).size <= 0 { // Buffer empty
var err error
buf := bd.(*BufferData)
if !ok || buf.size <= 0 { // Buffer empty
log.Debug(".. Buffer empty ...")
err = ibNode.dsSaveBinlog(&segmentFlushUnit{
collID: fmsg.collectionID,
segID: currentSegID,
field2Path: map[UniqueID]string{},
checkPoint: ibNode.replica.listSegmentsCheckPoints(),
flushed: true,
})
if err != nil {
log.Debug("insert buffer node save binlog failed", zap.Error(err))
break
}
ibNode.replica.segmentFlushed(currentSegID)
err = ibNode.flushManager.flushBufferData(nil, currentSegID, true, endPositions[0])
} else { // Buffer not empty
log.Debug(".. Buffer not empty, flushing ..")
finishCh := make(chan segmentFlushUnit, 1)
ibNode.flushMap.Store(currentSegID, bd.(*BufferData).buffer)
clearFn := func() {
finishCh <- segmentFlushUnit{field2Path: nil}
log.Debug(".. Clearing flush Buffer ..")
ibNode.flushMap.Delete(currentSegID)
close(finishCh)
}
collID, partitionID, err := ibNode.getCollectionandPartitionIDbySegID(currentSegID)
if err != nil {
log.Error("Flush failed .. cannot get segment ..", zap.Error(err))
clearFn()
break
// TODO add error handling
}
collMeta, err := ibNode.getCollMetabySegID(currentSegID, fgMsg.timeRange.timestampMax)
if err != nil {
log.Error("Flush failed .. cannot get collection schema ..", zap.Error(err))
clearFn()
break
// TODO add error handling
}
flushSegment(collMeta, currentSegID, partitionID, collID,
&ibNode.flushMap, ibNode.minIOKV, finishCh, nil, ibNode, ibNode.idAllocator)
fu := <-finishCh
close(finishCh)
if fu.field2Path != nil {
fu.checkPoint = ibNode.replica.listSegmentsCheckPoints()
fu.flushed = true
if err := ibNode.dsSaveBinlog(&fu); err != nil {
log.Debug("Data service save binlog path failed", zap.Error(err))
} else {
ibNode.replica.segmentFlushed(fu.segID)
ibNode.insertBuffer.Delete(fu.segID)
}
}
//always remove from flushing seg cache
ibNode.flushingSegCache.Remove(fu.segID)
err = ibNode.flushManager.flushBufferData(buf, currentSegID, true, endPositions[0])
}
if err != nil {
log.Warn("failed to manual invoke flushBufferData", zap.Error(err))
} else {
ibNode.replica.segmentFlushed(currentSegID)
ibNode.insertBuffer.Delete(currentSegID)
}
default:
}
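
Both the auto-flush and the manual-flush branches above now funnel into flushManager.flushBufferData; the behavioural difference is the flushed flag and the bookkeeping done on success. Condensed from the two branches, with variable names taken from the surrounding code:

// auto flush: buffer is full but the segment keeps growing, so flushed=false
if err := ibNode.flushManager.flushBufferData(ibuffer, segToFlush, false, endPositions[0]); err == nil {
	ibNode.insertBuffer.Delete(segToFlush)
}

// manual flush: requested via flushChan, the segment is sealed, so flushed=true
if err := ibNode.flushManager.flushBufferData(buf, currentSegID, true, endPositions[0]); err == nil {
	ibNode.replica.segmentFlushed(currentSegID)
	ibNode.insertBuffer.Delete(currentSegID)
}
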
@ -693,112 +610,6 @@ func readBinary(reader io.Reader, receiver interface{}, dataType schemapb.DataTy
}
}
func flushSegment(
collMeta *etcdpb.CollectionMeta,
segID, partitionID, collID UniqueID,
insertData *sync.Map,
kv kv.BaseKV,
flushUnit chan<- segmentFlushUnit,
wgFinish *sync.WaitGroup,
ibNode *insertBufferNode,
idAllocator allocatorInterface) {
if wgFinish != nil {
defer wgFinish.Done()
}
clearFn := func(isSuccess bool) {
if !isSuccess {
flushUnit <- segmentFlushUnit{field2Path: nil}
}
log.Debug(".. Clearing flush Buffer ..")
insertData.Delete(segID)
}
inCodec := storage.NewInsertCodec(collMeta)
// buffer data to binlogs
data, ok := insertData.Load(segID)
if !ok {
log.Error("Flush failed ... cannot load insertData ..")
clearFn(false)
return
}
binLogs, statsBinlogs, err := inCodec.Serialize(partitionID, segID, data.(*InsertData))
if err != nil {
log.Error("Flush failed ... cannot generate binlog ..", zap.Error(err))
clearFn(false)
return
}
log.Debug(".. Saving binlogs to MinIO ..", zap.Int("number", len(binLogs)))
field2Path := make(map[UniqueID]string, len(binLogs))
kvs := make(map[string]string, len(binLogs))
paths := make([]string, 0, len(binLogs))
field2Logidx := make(map[UniqueID]UniqueID, len(binLogs))
// write insert binlog
for _, blob := range binLogs {
fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64)
if err != nil {
log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err))
clearFn(false)
return
}
log.Debug("save binlog", zap.Int64("fieldID", fieldID))
logidx, err := idAllocator.allocID()
if err != nil {
log.Error("Flush failed ... cannot alloc ID ..", zap.Error(err))
clearFn(false)
return
}
// no error raise if alloc=false
k, _ := idAllocator.genKey(false, collID, partitionID, segID, fieldID, logidx)
key := path.Join(Params.InsertBinlogRootPath, k)
paths = append(paths, key)
kvs[key] = string(blob.Value[:])
field2Path[fieldID] = key
field2Logidx[fieldID] = logidx
}
// write stats binlog
for _, blob := range statsBinlogs {
fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64)
if err != nil {
log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err))
clearFn(false)
return
}
logidx := field2Logidx[fieldID]
// no error raise if alloc=false
k, _ := idAllocator.genKey(false, collID, partitionID, segID, fieldID, logidx)
key := path.Join(Params.StatsBinlogRootPath, k)
kvs[key] = string(blob.Value[:])
}
log.Debug("save binlog file to MinIO/S3")
err = kv.MultiSave(kvs)
if err != nil {
log.Error("Flush failed ... cannot save to MinIO ..", zap.Error(err))
_ = kv.MultiRemove(paths)
clearFn(false)
return
}
ibNode.replica.updateSegmentCheckPoint(segID)
startPos := ibNode.replica.listNewSegmentsStartPositions()
flushUnit <- segmentFlushUnit{collID: collID, segID: segID, field2Path: field2Path, startPositions: startPos}
clearFn(true)
}
// writeHardTimeTick writes timetick once insertBufferNode operates.
func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error {
msgPack := msgstream.MsgPack{}
@ -889,28 +700,13 @@ func (ibNode *insertBufferNode) getCollectionandPartitionIDbySegID(segmentID Uni
return ibNode.replica.getCollectionAndPartitionID(segmentID)
}
func newInsertBufferNode(ctx context.Context, flushCh <-chan flushMsg, saveBinlog func(*segmentFlushUnit) error,
func newInsertBufferNode(ctx context.Context, flushCh <-chan flushMsg, fm flushManager,
flushingSegCache *Cache, config *nodeConfig) (*insertBufferNode, error) {
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(config.maxQueueLength)
baseNode.SetMaxParallelism(config.maxParallelism)
// MinIO
option := &miniokv.Option{
Address: Params.MinioAddress,
AccessKeyID: Params.MinioAccessKeyID,
SecretAccessKeyID: Params.MinioSecretAccessKey,
UseSSL: Params.MinioUseSSL,
CreateBucket: true,
BucketName: Params.MinioBucketName,
}
minIOKV, err := miniokv.NewMinIOKV(ctx, option)
if err != nil {
return nil, err
}
//input stream, data node time tick
wTt, err := config.msFactory.NewMsgStream(ctx)
if err != nil {
@ -934,15 +730,14 @@ func newInsertBufferNode(ctx context.Context, flushCh <-chan flushMsg, saveBinlo
return &insertBufferNode{
BaseNode: baseNode,
insertBuffer: sync.Map{},
minIOKV: minIOKV,
timeTickStream: wTtMsgStream,
segmentStatisticsStream: segStatisticsMsgStream,
flushMap: sync.Map{},
flushChan: flushCh,
dsSaveBinlog: saveBinlog,
flushingSegCache: flushingSegCache,
flushManager: fm,
replica: config.replica,
idAllocator: config.allocator,


@ -19,8 +19,6 @@ package datanode
import (
"context"
"errors"
"fmt"
"path"
"sync"
"testing"
"time"
@ -30,7 +28,6 @@ import (
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/flowgraph"
@ -84,10 +81,11 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) {
err = msFactory.SetParams(m)
assert.Nil(t, err)
saveBinlog := func(fu *segmentFlushUnit) error {
t.Log(fu)
memkv := memkv.NewMemoryKV()
fm := NewRendezvousFlushManager(&allocator{}, memkv, replica, func(*segmentFlushPack) error {
return nil
}
})
flushChan := make(chan flushMsg, 100)
@ -98,28 +96,28 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) {
vChannelName: "string",
}
iBNode, err := newInsertBufferNode(ctx, flushChan, saveBinlog, newCache(), c)
iBNode, err := newInsertBufferNode(ctx, flushChan, fm, newCache(), c)
assert.NotNil(t, iBNode)
require.NoError(t, err)
ctxDone, cancel := context.WithCancel(ctx)
/*ctxDone, cancel := context.WithCancel(ctx)
cancel() // cancel now to make context done
_, err = newInsertBufferNode(ctxDone, flushChan, saveBinlog, newCache(), c)
assert.Error(t, err)
_, err = newInsertBufferNode(ctxDone, flushChan, fm, newCache(), c)
assert.Error(t, err)*/
c.msFactory = &CDFMsFactory{
Factory: msFactory,
cd: 0,
}
_, err = newInsertBufferNode(ctx, flushChan, saveBinlog, newCache(), c)
_, err = newInsertBufferNode(ctx, flushChan, fm, newCache(), c)
assert.Error(t, err)
c.msFactory = &CDFMsFactory{
Factory: msFactory,
cd: 1,
}
_, err = newInsertBufferNode(ctx, flushChan, saveBinlog, newCache(), c)
_, err = newInsertBufferNode(ctx, flushChan, fm, newCache(), c)
assert.Error(t, err)
}
@ -180,10 +178,11 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
err = msFactory.SetParams(m)
assert.Nil(t, err)
saveBinlog := func(fu *segmentFlushUnit) error {
t.Log(fu)
memkv := memkv.NewMemoryKV()
fm := NewRendezvousFlushManager(NewAllocatorFactory(), memkv, replica, func(*segmentFlushPack) error {
return nil
}
})
flushChan := make(chan flushMsg, 100)
c := &nodeConfig{
@ -193,7 +192,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
vChannelName: "string",
}
iBNode, err := newInsertBufferNode(ctx, flushChan, saveBinlog, newCache(), c)
iBNode, err := newInsertBufferNode(ctx, flushChan, fm, newCache(), c)
require.NoError(t, err)
flushChan <- flushMsg{
@ -208,6 +207,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
iBNode.Operate([]flowgraph.Msg{fgMsg})
}
/*
func TestFlushSegment(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
@ -259,9 +259,12 @@ func TestFlushSegment(t *testing.T) {
err = msFactory.SetParams(m)
assert.Nil(t, err)
flushChan := make(chan flushMsg, 100)
saveBinlog := func(*segmentFlushUnit) error {
memkv := memkv.NewMemoryKV()
fm := NewRendezvousFlushManager(&allocator{}, memkv, replica, func(*segmentFlushPack) error {
return nil
}
})
c := &nodeConfig{
replica: replica,
@ -269,7 +272,7 @@ func TestFlushSegment(t *testing.T) {
allocator: NewAllocatorFactory(),
vChannelName: "string",
}
ibNode, err := newInsertBufferNode(ctx, flushChan, saveBinlog, newCache(), c)
ibNode, err := newInsertBufferNode(ctx, flushChan, fm, newCache(), c)
require.NoError(t, err)
flushSegment(collMeta,
@ -291,7 +294,7 @@ func TestFlushSegment(t *testing.T) {
key := path.Join(Params.StatsBinlogRootPath, k)
_, values, _ := mockMinIO.LoadWithPrefix(key)
assert.Equal(t, len(values), 1)
}
}*/
func genCollectionMeta(collectionID UniqueID, collectionName string) *etcdpb.CollectionMeta {
sch := schemapb.CollectionSchema{
@ -376,11 +379,17 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
err = msFactory.SetParams(m)
assert.Nil(t, err)
flushUnit := []segmentFlushUnit{}
saveBinlog := func(fu *segmentFlushUnit) error {
flushUnit = append(flushUnit, *fu)
flushPacks := []*segmentFlushPack{}
memkv := memkv.NewMemoryKV()
wg := sync.WaitGroup{}
fm := NewRendezvousFlushManager(NewAllocatorFactory(), memkv, colRep, func(pack *segmentFlushPack) error {
flushPacks = append(flushPacks, pack)
colRep.listNewSegmentsStartPositions()
colRep.listSegmentsCheckPoints()
wg.Done()
return nil
}
})
flushChan := make(chan flushMsg, 100)
c := &nodeConfig{
@ -389,7 +398,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
allocator: NewAllocatorFactory(),
vChannelName: "string",
}
iBNode, err := newInsertBufferNode(ctx, flushChan, saveBinlog, newCache(), c)
iBNode, err := newInsertBufferNode(ctx, flushChan, fm, newCache(), c)
require.NoError(t, err)
// Auto flush number of rows set to 2
@ -426,11 +435,11 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
{1, 1, 100, 123, 0, 100},
{2, 1, 100, 123, 0, 100},
}
iBNode.Operate([]flowgraph.Msg{iMsg})
require.Equal(t, 2, len(colRep.newSegments))
require.Equal(t, 0, len(colRep.normalSegments))
assert.Equal(t, 0, len(flushUnit))
assert.Equal(t, 0, len(flushPacks))
for i, test := range beforeAutoFlushTests {
seg, ok := colRep.newSegments[UniqueID(i+1)]
@ -450,14 +459,23 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
iMsg = &inMsg
// Trigger auto flush
iBNode.Operate([]flowgraph.Msg{iMsg})
output := iBNode.Operate([]flowgraph.Msg{iMsg})
fgm := output[0].(*flowGraphMsg)
wg.Add(len(fgm.segmentsToFlush))
t.Log("segments to flush", fgm.segmentsToFlush)
for _, im := range fgm.segmentsToFlush {
// send del done signal
fm.flushDelData(nil, im, fgm.endPositions[0])
}
wg.Wait()
require.Equal(t, 0, len(colRep.newSegments))
require.Equal(t, 3, len(colRep.normalSegments))
assert.Equal(t, 1, len(flushUnit))
assert.Equal(t, 3, len(flushUnit[0].checkPoint))
assert.Less(t, 0, len(flushUnit[0].field2Path))
assert.False(t, flushUnit[0].flushed)
assert.Equal(t, 1, len(flushPacks))
// assert.Equal(t, 3, len(flushUnit[0].checkPoint))
assert.Less(t, 0, len(flushPacks[0].insertLogs))
assert.False(t, flushPacks[0].flushed)
afterAutoFlushTests := []Test{
// segID, numOfRow, startTs, endTs, cp.numOfRow, cp.Ts
@ -475,11 +493,11 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
assert.Equal(t, test.expectedCpNumOfRows, seg.checkPoint.numRows)
assert.Equal(t, test.expectedCpPosTs, seg.checkPoint.pos.GetTimestamp())
assert.Equal(t, test.expectedCpNumOfRows, flushUnit[0].checkPoint[UniqueID(i+1)].numRows)
assert.Equal(t, test.expectedCpPosTs, flushUnit[0].checkPoint[UniqueID(i+1)].pos.Timestamp)
// assert.Equal(t, test.expectedCpNumOfRows, flushPacks[0].checkPoint[UniqueID(i+1)].numRows)
// assert.Equal(t, test.expectedCpPosTs, flushPacks[0].checkPoint[UniqueID(i+1)].pos.Timestamp)
if i == 1 {
assert.Equal(t, test.expectedSegID, flushUnit[0].segID)
assert.Equal(t, test.expectedSegID, flushPacks[0].segmentID)
// assert.Equal(t, int64(0), iBNode.insertBuffer.size(UniqueID(i+1)))
}
// else {
@ -491,75 +509,76 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
t.Run("Auto with manual flush", func(t *testing.T) {
t.Skipf("Skip, fix later")
for i := range inMsg.insertMessages {
inMsg.insertMessages[i].SegmentID = 1
}
/*
for i := range inMsg.insertMessages {
inMsg.insertMessages[i].SegmentID = 1
}
inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 234}}
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 345}}
iBNode.Operate([]flowgraph.Msg{iMsg})
inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 234}}
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 345}}
iBNode.Operate([]flowgraph.Msg{iMsg})
assert.Equal(t, len(flushUnit), 2)
assert.Equal(t, flushUnit[1].segID, int64(1))
assert.Equal(t, len(flushUnit[1].checkPoint), 3)
assert.Equal(t, flushUnit[1].checkPoint[1].numRows, int64(50+16000+100+32000))
assert.Equal(t, flushUnit[1].checkPoint[2].numRows, int64(100+32000))
assert.Equal(t, flushUnit[1].checkPoint[3].numRows, int64(0))
assert.Equal(t, flushUnit[1].checkPoint[1].pos.Timestamp, Timestamp(345))
assert.Equal(t, flushUnit[1].checkPoint[2].pos.Timestamp, Timestamp(234))
assert.Equal(t, flushUnit[1].checkPoint[3].pos.Timestamp, Timestamp(123))
assert.False(t, flushUnit[1].flushed)
assert.Greater(t, len(flushUnit[1].field2Path), 0)
// assert.Equal(t, len(iBNode.insertBuffer.insertData), 1)
// assert.Equal(t, iBNode.insertBuffer.size(3), int32(50+16000))
assert.Equal(t, len(flushUnit), 2)
assert.Equal(t, flushUnit[1].segID, int64(1))
assert.Equal(t, len(flushUnit[1].checkPoint), 3)
assert.Equal(t, flushUnit[1].checkPoint[1].numRows, int64(50+16000+100+32000))
assert.Equal(t, flushUnit[1].checkPoint[2].numRows, int64(100+32000))
assert.Equal(t, flushUnit[1].checkPoint[3].numRows, int64(0))
assert.Equal(t, flushUnit[1].checkPoint[1].pos.Timestamp, Timestamp(345))
assert.Equal(t, flushUnit[1].checkPoint[2].pos.Timestamp, Timestamp(234))
assert.Equal(t, flushUnit[1].checkPoint[3].pos.Timestamp, Timestamp(123))
assert.False(t, flushUnit[1].flushed)
assert.Greater(t, len(flushUnit[1].field2Path), 0)
// assert.Equal(t, len(iBNode.insertBuffer.insertData), 1)
// assert.Equal(t, iBNode.insertBuffer.size(3), int32(50+16000))
flushChan <- flushMsg{
msgID: 3,
timestamp: 456,
segmentID: UniqueID(1),
collectionID: UniqueID(1),
}
flushChan <- flushMsg{
msgID: 3,
timestamp: 456,
segmentID: UniqueID(1),
collectionID: UniqueID(1),
}
inMsg.insertMessages = []*msgstream.InsertMsg{}
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 345}}
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 456}}
iBNode.Operate([]flowgraph.Msg{iMsg})
inMsg.insertMessages = []*msgstream.InsertMsg{}
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 345}}
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 456}}
iBNode.Operate([]flowgraph.Msg{iMsg})
assert.Equal(t, len(flushUnit), 3)
assert.Equal(t, flushUnit[2].segID, int64(1))
assert.Equal(t, len(flushUnit[2].checkPoint), 3)
assert.Equal(t, flushUnit[2].checkPoint[1].numRows, int64(50+16000+100+32000))
assert.Equal(t, flushUnit[2].checkPoint[2].numRows, int64(100+32000))
assert.Equal(t, flushUnit[2].checkPoint[3].numRows, int64(0))
assert.Equal(t, flushUnit[2].checkPoint[1].pos.Timestamp, Timestamp(345))
assert.Equal(t, flushUnit[2].checkPoint[2].pos.Timestamp, Timestamp(234))
assert.Equal(t, flushUnit[2].checkPoint[3].pos.Timestamp, Timestamp(123))
assert.Equal(t, len(flushUnit[2].field2Path), 0)
assert.NotNil(t, flushUnit[2].field2Path)
assert.True(t, flushUnit[2].flushed)
// assert.Equal(t, len(iBNode.insertBuffer.insertData), 1)
// assert.Equal(t, iBNode.insertBuffer.size(3), int32(50+16000))
assert.Equal(t, len(flushUnit), 3)
assert.Equal(t, flushUnit[2].segID, int64(1))
assert.Equal(t, len(flushUnit[2].checkPoint), 3)
assert.Equal(t, flushUnit[2].checkPoint[1].numRows, int64(50+16000+100+32000))
assert.Equal(t, flushUnit[2].checkPoint[2].numRows, int64(100+32000))
assert.Equal(t, flushUnit[2].checkPoint[3].numRows, int64(0))
assert.Equal(t, flushUnit[2].checkPoint[1].pos.Timestamp, Timestamp(345))
assert.Equal(t, flushUnit[2].checkPoint[2].pos.Timestamp, Timestamp(234))
assert.Equal(t, flushUnit[2].checkPoint[3].pos.Timestamp, Timestamp(123))
assert.Equal(t, len(flushUnit[2].field2Path), 0)
assert.NotNil(t, flushUnit[2].field2Path)
assert.True(t, flushUnit[2].flushed)
// assert.Equal(t, len(iBNode.insertBuffer.insertData), 1)
// assert.Equal(t, iBNode.insertBuffer.size(3), int32(50+16000))
flushChan <- flushMsg{
msgID: 4,
timestamp: 567,
segmentID: UniqueID(3),
collectionID: UniqueID(3),
}
iBNode.Operate([]flowgraph.Msg{iMsg})
assert.Equal(t, len(flushUnit), 4)
assert.Equal(t, flushUnit[3].segID, int64(3))
assert.Equal(t, len(flushUnit[3].checkPoint), 2)
assert.Equal(t, flushUnit[3].checkPoint[3].numRows, int64(50+16000))
assert.Equal(t, flushUnit[3].checkPoint[2].numRows, int64(100+32000))
assert.Equal(t, flushUnit[3].checkPoint[3].pos.Timestamp, Timestamp(234))
assert.Equal(t, flushUnit[3].checkPoint[2].pos.Timestamp, Timestamp(234))
assert.Greater(t, len(flushUnit[3].field2Path), 0)
assert.NotNil(t, flushUnit[3].field2Path)
assert.True(t, flushUnit[3].flushed)
// assert.Equal(t, len(iBNode.insertBuffer.insertData), 0)
flushChan <- flushMsg{
msgID: 4,
timestamp: 567,
segmentID: UniqueID(3),
collectionID: UniqueID(3),
}
iBNode.Operate([]flowgraph.Msg{iMsg})
assert.Equal(t, len(flushUnit), 4)
assert.Equal(t, flushUnit[3].segID, int64(3))
assert.Equal(t, len(flushUnit[3].checkPoint), 2)
assert.Equal(t, flushUnit[3].checkPoint[3].numRows, int64(50+16000))
assert.Equal(t, flushUnit[3].checkPoint[2].numRows, int64(100+32000))
assert.Equal(t, flushUnit[3].checkPoint[3].pos.Timestamp, Timestamp(234))
assert.Equal(t, flushUnit[3].checkPoint[2].pos.Timestamp, Timestamp(234))
assert.Greater(t, len(flushUnit[3].field2Path), 0)
assert.NotNil(t, flushUnit[3].field2Path)
assert.True(t, flushUnit[3].flushed)
// assert.Equal(t, len(iBNode.insertBuffer.insertData), 0)
*/
})
}
@ -615,10 +634,11 @@ func TestInsertBufferNode_getCollMetaBySegID(t *testing.T) {
err = msFactory.SetParams(m)
assert.Nil(t, err)
saveBinlog := func(fu *segmentFlushUnit) error {
t.Log(fu)
memkv := memkv.NewMemoryKV()
fm := NewRendezvousFlushManager(&allocator{}, memkv, replica, func(*segmentFlushPack) error {
return nil
}
})
flushChan := make(chan flushMsg, 100)
c := &nodeConfig{
@ -627,7 +647,7 @@ func TestInsertBufferNode_getCollMetaBySegID(t *testing.T) {
allocator: NewAllocatorFactory(),
vChannelName: "string",
}
iBNode, err := newInsertBufferNode(ctx, flushChan, saveBinlog, newCache(), c)
iBNode, err := newInsertBufferNode(ctx, flushChan, fm, newCache(), c)
require.NoError(t, err)
meta, err := iBNode.getCollMetabySegID(1, 101)
@ -675,10 +695,11 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) {
err = msFactory.SetParams(m)
assert.Nil(t, err)
saveBinlog := func(fu *segmentFlushUnit) error {
t.Log(fu)
memkv := memkv.NewMemoryKV()
fm := NewRendezvousFlushManager(&allocator{}, memkv, replica, func(*segmentFlushPack) error {
return nil
}
})
flushChan := make(chan flushMsg, 100)
c := &nodeConfig{
@ -687,7 +708,7 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) {
allocator: NewAllocatorFactory(),
vChannelName: "string",
}
iBNode, err := newInsertBufferNode(ctx, flushChan, saveBinlog, newCache(), c)
iBNode, err := newInsertBufferNode(ctx, flushChan, fm, newCache(), c)
require.NoError(t, err)
inMsg := GenFlowGraphInsertMsg(insertChannelName)


@ -17,27 +17,35 @@
package datanode
import (
"fmt"
"path"
"strconv"
"sync"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage"
"go.uber.org/zap"
)
// flushManager defines a flush manager signature
type flushManager interface {
// notify flush manager insert buffer data
flushBufferData(data *BufferData, segmentID UniqueID, pos *internalpb.MsgPosition)
flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, pos *internalpb.MsgPosition) error
// notify flush manager del buffer data
flushDelData(data *DelDataBuf, segmentID UniqueID, pos *internalpb.MsgPosition)
flushDelData(data *DelDataBuf, segmentID UniqueID, pos *internalpb.MsgPosition) error
}
// segmentFlushPack contains result to save into meta
type segmentFlushPack struct {
segmentID UniqueID
insertLogs []string
statsLogs []string
deltaLogs []string
insertLogs map[UniqueID]string
statsLogs map[UniqueID]string
deltaLogs []*DelDataBuf
pos *internalpb.MsgPosition
flushed bool
}
// notifyMetaFunc is the callback that notifies meta to persist the flush result
@ -51,6 +59,7 @@ var _ flushManager = (*rendezvousFlushManager)(nil)
type orderFlushQueue struct {
sync.Once
segmentID UniqueID
// MsgID => flushTask
working sync.Map
notifyFunc notifyMetaFunc
@ -60,8 +69,9 @@ type orderFlushQueue struct {
}
// newOrderFlushQueue creates an orderFlushQueue
func newOrderFlushQueue(f notifyMetaFunc) *orderFlushQueue {
func newOrderFlushQueue(segID UniqueID, f notifyMetaFunc) *orderFlushQueue {
return &orderFlushQueue{
segmentID: segID,
notifyFunc: f,
}
}
@ -76,14 +86,13 @@ func (q *orderFlushQueue) init() {
}
func (q *orderFlushQueue) getFlushTaskRunner(pos *internalpb.MsgPosition) *flushTaskRunner {
actual, loaded := q.working.LoadOrStore(string(pos.MsgID), newFlushTaskRunner())
actual, loaded := q.working.LoadOrStore(string(pos.MsgID), newFlushTaskRunner(q.segmentID))
t := actual.(*flushTaskRunner)
if !loaded {
q.tailMut.Lock()
t.init(q.notifyFunc, func() {
q.working.Delete(string(pos.MsgID))
}, q.tailCh)
t.pos = pos
q.tailCh = t.finishSignal
q.tailMut.Unlock()
}
@ -91,19 +100,20 @@ func (q *orderFlushQueue) getFlushTaskRunner(pos *internalpb.MsgPosition) *flush
}
// enqueueInsertFlush puts insert buffer data into the queue
func (q *orderFlushQueue) enqueueInsertFlush(task flushInsertTask, pos *internalpb.MsgPosition) {
q.getFlushTaskRunner(pos).runFlushInsert(task)
func (q *orderFlushQueue) enqueueInsertFlush(task flushInsertTask, binlogs, statslogs map[UniqueID]string, flushed bool, pos *internalpb.MsgPosition) {
q.getFlushTaskRunner(pos).runFlushInsert(task, binlogs, statslogs, flushed, pos)
}
// enqueueDelFlush puts delete buffer data into the queue
func (q *orderFlushQueue) enqueueDelFlush(task flushDeleteTask, pos *internalpb.MsgPosition) {
q.getFlushTaskRunner(pos).runFlushDel(task)
func (q *orderFlushQueue) enqueueDelFlush(task flushDeleteTask, deltaLogs *DelDataBuf, pos *internalpb.MsgPosition) {
q.getFlushTaskRunner(pos).runFlushDel(task, deltaLogs)
}
// rendezvousFlushManager makes sure insert & del buf are both flushed before meta is notified
type rendezvousFlushManager struct {
allocatorInterface
kv.BaseKV
Replica
// segment id => flush queue
dispatcher sync.Map
@ -112,7 +122,7 @@ type rendezvousFlushManager struct {
// getFlushQueue
func (m *rendezvousFlushManager) getFlushQueue(segmentID UniqueID) *orderFlushQueue {
actual, loaded := m.dispatcher.LoadOrStore(segmentID, newOrderFlushQueue(m.notifyFunc))
actual, loaded := m.dispatcher.LoadOrStore(segmentID, newOrderFlushQueue(segmentID, m.notifyFunc))
// all operation on dispatcher is private, assertion ok guaranteed
queue := actual.(*orderFlushQueue)
if !loaded {
@ -122,54 +132,182 @@ func (m *rendezvousFlushManager) getFlushQueue(segmentID UniqueID) *orderFlushQu
}
// notify flush manager insert buffer data
func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID UniqueID,
pos *internalpb.MsgPosition) {
func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID UniqueID, flushed bool,
pos *internalpb.MsgPosition) error {
// empty flush
if data == nil || data.buffer == nil {
m.getFlushQueue(segmentID).enqueueInsertFlush(&flushBufferInsertTask{},
map[UniqueID]string{}, map[UniqueID]string{}, flushed, pos)
return nil
}
collID, partID, meta, err := m.getSegmentMeta(segmentID, pos)
if err != nil {
return err
}
// encode data and convert output data
inCodec := storage.NewInsertCodec(meta)
binLogs, statsBinlogs, err := inCodec.Serialize(partID, segmentID, data.buffer)
if err != nil {
return err
}
start, _, err := m.allocIDBatch(uint32(len(binLogs)))
if err != nil {
return err
}
field2Insert := make(map[UniqueID]string, len(binLogs))
kvs := make(map[string]string, len(binLogs))
paths := make([]string, 0, len(binLogs))
field2Logidx := make(map[UniqueID]UniqueID, len(binLogs))
for idx, blob := range binLogs {
fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64)
if err != nil {
log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err))
return err
}
logidx := start + int64(idx)
// no error raise if alloc=false
k, _ := m.genKey(false, collID, partID, segmentID, fieldID, logidx)
key := path.Join(Params.InsertBinlogRootPath, k)
paths = append(paths, key)
kvs[key] = string(blob.Value[:])
field2Insert[fieldID] = key
field2Logidx[fieldID] = logidx
}
field2Stats := make(map[UniqueID]string)
// write stats binlog
for _, blob := range statsBinlogs {
fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64)
if err != nil {
log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err))
return err
}
logidx := field2Logidx[fieldID]
// no error raise if alloc=false
k, _ := m.genKey(false, collID, partID, segmentID, fieldID, logidx)
key := path.Join(Params.StatsBinlogRootPath, k)
kvs[key] = string(blob.Value[:])
field2Stats[fieldID] = key
}
m.updateSegmentCheckPoint(segmentID)
m.getFlushQueue(segmentID).enqueueInsertFlush(&flushBufferInsertTask{
BaseKV: m.BaseKV,
allocatorInterface: m.allocatorInterface,
data: data,
}, pos)
BaseKV: m.BaseKV,
data: kvs,
}, field2Insert, field2Stats, flushed, pos)
return nil
}
// notify flush manager del buffer data
func (m *rendezvousFlushManager) flushDelData(data *DelDataBuf, segmentID UniqueID,
pos *internalpb.MsgPosition) {
pos *internalpb.MsgPosition) error {
// del signal with empty data
if data == nil || data.delData == nil {
m.getFlushQueue(segmentID).enqueueDelFlush(&flushBufferDeleteTask{}, nil, pos)
return nil
}
collID, partID, meta, err := m.getSegmentMeta(segmentID, pos)
if err != nil {
return err
}
delCodec := storage.NewDeleteCodec(&etcdpb.CollectionMeta{
ID: collID,
Schema: meta.GetSchema(),
})
blob, err := delCodec.Serialize(partID, segmentID, data.delData)
if err != nil {
return err
}
logID, err := m.allocID()
if err != nil {
log.Error("failed to alloc ID", zap.Error(err))
return err
}
blobKey, _ := m.genKey(false, collID, partID, segmentID, logID)
blobPath := path.Join(Params.DeleteBinlogRootPath, blobKey)
kvs := map[string]string{blobPath: string(blob.Value[:])}
log.Debug("delete blob path", zap.String("path", blobPath))
m.getFlushQueue(segmentID).enqueueDelFlush(&flushBufferDeleteTask{
BaseKV: m.BaseKV,
allocatorInterface: m.allocatorInterface,
data: data,
}, pos)
BaseKV: m.BaseKV,
data: kvs,
}, data, pos)
return nil
}
// fetch meta info for segment
func (m *rendezvousFlushManager) getSegmentMeta(segmentID UniqueID, pos *internalpb.MsgPosition) (UniqueID, UniqueID, *etcdpb.CollectionMeta, error) {
if !m.hasSegment(segmentID, true) {
return -1, -1, nil, fmt.Errorf("No such segment %d in the replica", segmentID)
}
// fetch meta information of segment
collID, partID, err := m.getCollectionAndPartitionID(segmentID)
if err != nil {
return -1, -1, nil, err
}
sch, err := m.getCollectionSchema(collID, pos.GetTimestamp())
if err != nil {
return -1, -1, nil, err
}
meta := &etcdpb.CollectionMeta{
ID: collID,
Schema: sch,
}
return collID, partID, meta, nil
}
type flushBufferInsertTask struct {
kv.BaseKV
allocatorInterface
data *BufferData
data map[string]string
}
// flushInsertData implements flushInsertTask
func (t *flushBufferInsertTask) flushInsertData() error {
//TODO implement
if t.BaseKV != nil && len(t.data) > 0 {
return t.MultiSave(t.data)
}
return nil
}
type flushBufferDeleteTask struct {
kv.BaseKV
allocatorInterface
data *DelDataBuf
data map[string]string
}
// flushDeleteData implements flushDeleteTask
func (t *flushBufferDeleteTask) flushDeleteData() error {
//TODO implement
if len(t.data) > 0 && t.BaseKV != nil {
return t.MultiSave(t.data)
}
return nil
}
// NewRendezvousFlushManager creates a rendezvousFlushManager with the provided allocator, kv and replica
func NewRendezvousFlushManager(allocator allocatorInterface, kv kv.BaseKV, f notifyMetaFunc) *rendezvousFlushManager {
func NewRendezvousFlushManager(allocator allocatorInterface, kv kv.BaseKV, replica Replica, f notifyMetaFunc) *rendezvousFlushManager {
return &rendezvousFlushManager{
allocatorInterface: allocator,
BaseKV: kv,
notifyFunc: f,
Replica: replica,
}
}
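
For reference, a minimal usage sketch of the new manager, assuming an in-memory KV, an allocatorInterface implementation (alloc), a Replica (replica) and a segment ID (segmentID), similar to how the tests below wire it up:

// Sketch only: construct one rendezvousFlushManager and flush an empty segment.
kv := memkv.NewMemoryKV()
fm := NewRendezvousFlushManager(alloc, kv, replica, func(pack *segmentFlushPack) error {
	// invoked once per position, only after both the insert and the delete flush finish
	log.Debug("flush done", zap.Int64("segmentID", pack.segmentID))
	return nil
})
pos := &internalpb.MsgPosition{MsgID: []byte("msg-1")}
_ = fm.flushBufferData(nil, segmentID, true, pos) // empty insert flush
_ = fm.flushDelData(nil, segmentID, pos)          // empty delete flush completes the rendezvous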


@ -44,7 +44,7 @@ func TestOrderFlushQueue_Execute(t *testing.T) {
size := 1000
finish.Add(size)
q := newOrderFlushQueue(func(*segmentFlushPack) error {
q := newOrderFlushQueue(1, func(*segmentFlushPack) error {
counter.Inc()
finish.Done()
return nil
@ -62,13 +62,13 @@ func TestOrderFlushQueue_Execute(t *testing.T) {
wg.Add(2 * size)
for i := 0; i < size; i++ {
go func(id []byte) {
q.enqueueDelFlush(&emptyFlushTask{}, &internalpb.MsgPosition{
q.enqueueDelFlush(&emptyFlushTask{}, &DelDataBuf{}, &internalpb.MsgPosition{
MsgID: id,
})
wg.Done()
}(ids[i])
go func(id []byte) {
q.enqueueInsertFlush(&emptyFlushTask{}, &internalpb.MsgPosition{
q.enqueueInsertFlush(&emptyFlushTask{}, map[UniqueID]string{}, map[UniqueID]string{}, false, &internalpb.MsgPosition{
MsgID: id,
})
wg.Done()
@ -86,7 +86,7 @@ func TestOrderFlushQueue_Order(t *testing.T) {
size := 1000
finish.Add(size)
resultList := make([][]byte, 0, size)
q := newOrderFlushQueue(func(pack *segmentFlushPack) error {
q := newOrderFlushQueue(1, func(pack *segmentFlushPack) error {
counter.Inc()
resultList = append(resultList, pack.pos.MsgID)
finish.Done()
@ -104,10 +104,10 @@ func TestOrderFlushQueue_Order(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(size)
for i := 0; i < size; i++ {
q.enqueueDelFlush(&emptyFlushTask{}, &internalpb.MsgPosition{
q.enqueueDelFlush(&emptyFlushTask{}, &DelDataBuf{}, &internalpb.MsgPosition{
MsgID: ids[i],
})
q.enqueueInsertFlush(&emptyFlushTask{}, &internalpb.MsgPosition{
q.enqueueInsertFlush(&emptyFlushTask{}, map[UniqueID]string{}, map[UniqueID]string{}, false, &internalpb.MsgPosition{
MsgID: ids[i],
})
wg.Done()
@ -130,7 +130,7 @@ func TestRendezvousFlushManager(t *testing.T) {
var counter atomic.Int64
finish := sync.WaitGroup{}
finish.Add(size)
m := NewRendezvousFlushManager(&allocator{}, kv, func(pack *segmentFlushPack) error {
m := NewRendezvousFlushManager(&allocator{}, kv, newMockReplica(), func(pack *segmentFlushPack) error {
counter.Inc()
finish.Done()
return nil
@ -149,7 +149,7 @@ func TestRendezvousFlushManager(t *testing.T) {
m.flushDelData(nil, 1, &internalpb.MsgPosition{
MsgID: ids[i],
})
m.flushBufferData(nil, 1, &internalpb.MsgPosition{
m.flushBufferData(nil, 1, true, &internalpb.MsgPosition{
MsgID: ids[i],
})
wg.Done()
@ -160,3 +160,25 @@ func TestRendezvousFlushManager(t *testing.T) {
assert.EqualValues(t, size, counter.Load())
}
func TestRendezvousFlushManager_getSegmentMeta(t *testing.T) {
memkv := memkv.NewMemoryKV()
replica := newMockReplica()
fm := NewRendezvousFlushManager(NewAllocatorFactory(), memkv, replica, func(*segmentFlushPack) error {
return nil
})
// non exists segment
_, _, _, err := fm.getSegmentMeta(-1, &internalpb.MsgPosition{})
assert.Error(t, err)
replica.newSegments[-1] = &Segment{}
replica.newSegments[1] = &Segment{}
// injected get part/coll id error
_, _, _, err = fm.getSegmentMeta(-1, &internalpb.MsgPosition{})
assert.Error(t, err)
// injected get schema error
_, _, _, err = fm.getSegmentMeta(1, &internalpb.MsgPosition{})
assert.Error(t, err)
}


@ -52,10 +52,11 @@ type flushTaskRunner struct {
finishSignal chan struct{}
segmentID UniqueID
insertLogs []string
statsLogs []string
deltaLogs []string
insertLogs map[UniqueID]string
statsLogs map[UniqueID]string
deltaLogs []*DelDataBuf
pos *internalpb.MsgPosition
flushed bool
}
// init initializes flushTaskRunner with provided actions and signal
@ -68,8 +69,12 @@ func (t *flushTaskRunner) init(f notifyMetaFunc, postFunc taskPostFunc, signal <
}
// runFlushInsert executes the flush insert task with sync.Once and retries until success
func (t *flushTaskRunner) runFlushInsert(task flushInsertTask) {
func (t *flushTaskRunner) runFlushInsert(task flushInsertTask, binlogs, statslogs map[UniqueID]string, flushed bool, pos *internalpb.MsgPosition) {
t.insertOnce.Do(func() {
t.insertLogs = binlogs
t.statsLogs = statslogs
t.flushed = flushed
t.pos = pos
go func() {
err := errStart
for err != nil {
@ -81,8 +86,9 @@ func (t *flushTaskRunner) runFlushInsert(task flushInsertTask) {
}
// runFlushDel executes the flush delete task with sync.Once and retries until success
func (t *flushTaskRunner) runFlushDel(task flushDeleteTask) {
func (t *flushTaskRunner) runFlushDel(task flushDeleteTask, deltaLogs *DelDataBuf) {
t.deleteOnce.Do(func() {
t.deltaLogs = []*DelDataBuf{deltaLogs}
go func() {
err := errStart
for err != nil {
@ -99,24 +105,35 @@ func (t *flushTaskRunner) waitFinish(notifyFunc notifyMetaFunc, postFunc taskPos
t.Wait()
// wait previous task done
<-t.startSignal
notifyFunc(&segmentFlushPack{
segmentID: t.segmentID,
insertLogs: t.insertLogs,
statsLogs: t.statsLogs,
deltaLogs: t.deltaLogs,
pos: t.pos,
})
pack := t.getFlushPack()
err := errStart
for err != nil {
err = notifyFunc(pack)
}
// notify next task
close(t.finishSignal)
postFunc()
}
func (t *flushTaskRunner) getFlushPack() *segmentFlushPack {
pack := &segmentFlushPack{
segmentID: t.segmentID,
insertLogs: t.insertLogs,
statsLogs: t.statsLogs,
pos: t.pos,
deltaLogs: t.deltaLogs,
flushed: t.flushed,
}
return pack
}
// newFlushTaskRunner creates a usable task runner
func newFlushTaskRunner() *flushTaskRunner {
func newFlushTaskRunner(segmentID UniqueID) *flushTaskRunner {
t := &flushTaskRunner{
WaitGroup: sync.WaitGroup{},
segmentID: segmentID,
}
// insert & del
t.Add(2)
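
The ordering guarantee comes from chaining finish signals: each runner waits for both of its own halves, then for the previous runner's finishSignal (handed in as startSignal), and only then notifies meta and closes its own finishSignal. A self-contained sketch of the same pattern, independent of the datanode types:

package main

import (
	"fmt"
	"sync"
)

// rendezvous waits for both halves of a task, then for the previous task, then notifies.
func rendezvous(name string, prev <-chan struct{}, notify func(string)) (done chan struct{}, insert, del func()) {
	var wg sync.WaitGroup
	wg.Add(2) // one insert half + one delete half
	done = make(chan struct{})
	go func() {
		wg.Wait() // both halves arrived
		<-prev    // previous task has already notified
		notify(name)
		close(done) // unblock the next task
	}()
	return done, wg.Done, wg.Done
}

func main() {
	first := make(chan struct{})
	close(first) // the first task has no predecessor
	d1, ins1, del1 := rendezvous("task-1", first, func(n string) { fmt.Println("notified", n) })
	d2, ins2, del2 := rendezvous("task-2", d1, func(n string) { fmt.Println("notified", n) })
	del2() // halves may complete in any order across tasks
	ins1()
	del1()
	ins2()
	<-d2 // prints "notified task-1" then "notified task-2", always in that order
}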


@ -23,7 +23,7 @@ import (
)
func TestFlushTaskRunner(t *testing.T) {
task := newFlushTaskRunner()
task := newFlushTaskRunner(1)
signal := make(chan struct{})
saveFlag := false
@ -44,8 +44,8 @@ func TestFlushTaskRunner(t *testing.T) {
assert.False(t, saveFlag)
assert.False(t, nextFlag)
task.runFlushInsert(&emptyFlushTask{})
task.runFlushDel(&emptyFlushTask{})
task.runFlushInsert(&emptyFlushTask{}, nil, nil, false, nil)
task.runFlushDel(&emptyFlushTask{}, &DelDataBuf{})
assert.False(t, saveFlag)
assert.False(t, nextFlag)


@ -559,6 +559,11 @@ func (replica *SegmentReplica) getSegmentStatisticsUpdates(segID UniqueID) (*int
return updates, nil
}
if seg, ok := replica.flushedSegments[segID]; ok {
updates.NumRows = seg.numRows
return updates, nil
}
return nil, fmt.Errorf("Error, there's no segment %v", segID)
}


@ -441,7 +441,7 @@ func TestSegmentReplica_InterfaceMethod(te *testing.T) {
description: "input seg 200 in normalSegments with numRows 200"},
{isvalidCase: false, normalSegID: 200, inSegID: 201, inNumRows: 200,
description: "input seg 201 not in normalSegments with numRows 200"},
{isvalidCase: false, flushedSegID: 300, inSegID: 300, inNumRows: 300,
{isvalidCase: true, flushedSegID: 300, inSegID: 300, inNumRows: 300,
description: "input seg 300 in flushedSegments"},
{isvalidCase: false, flushedSegID: 300, inSegID: 301, inNumRows: 300,
description: "input seg 301 not in flushedSegments"},
@ -460,8 +460,10 @@ func TestSegmentReplica_InterfaceMethod(te *testing.T) {
if test.normalSegID != 0 {
sr.normalSegments[test.normalSegID] = &Segment{}
}
if test.flushedSegID != 0 {
sr.flushedSegments[test.flushedSegID] = &Segment{}
if test.flushedSegID != 0 { // updateStatistics does not update numRows of flushed segments
sr.flushedSegments[test.flushedSegID] = &Segment{
numRows: test.inNumRows,
}
}
sr.updateStatistics(test.inSegID, test.inNumRows)