Apply retry for flush manager execution (#11808)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia 2021-11-15 17:19:10 +08:00 committed by GitHub
parent b0054f1b48
commit 579416e3d3
7 changed files with 230 additions and 114 deletions
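In short: the flush path previously retried by hand (looping until flushInsertData/flushDeleteData/notify succeeded) and its notify callback returned an error. This commit routes those retries through the shared util/retry helper, changes notifyMetaFunc to return nothing, and surfaces unrecoverable task errors through segmentFlushPack.err, which flushNotifyFunc currently answers with a panic until graceful shutdown lands. A minimal standalone sketch of the retry.Do calling pattern used throughout this diff follows; only retry.Do and retry.Attempts appear in the commit, so any other options or default behaviour are assumptions.

package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/milvus-io/milvus/internal/util/retry"
)

func main() {
	calls := 0
	// retry.Do re-invokes the closure until it returns nil or the supplied
	// options (here an assumed cap of five attempts) give up, in which case
	// a non-nil error is returned to the caller.
	err := retry.Do(context.Background(), func() error {
		calls++
		if calls < 3 {
			return errors.New("transient failure")
		}
		return nil
	}, retry.Attempts(5))
	fmt.Println("calls:", calls, "err:", err)
}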


@@ -19,12 +19,10 @@ package datanode
import (
"context"
"errors"
"fmt"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/flowgraph"
@@ -145,76 +143,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
}
// initialize flush manager for DataSync Service
dsService.flushManager = NewRendezvousFlushManager(dsService.idAllocator, dsService.blobKV, dsService.replica, func(pack *segmentFlushPack) error {
fieldInsert := []*datapb.FieldBinlog{}
fieldStats := []*datapb.FieldBinlog{}
deltaInfos := []*datapb.DeltaLogInfo{}
checkPoints := []*datapb.CheckPoint{}
for k, v := range pack.insertLogs {
fieldInsert = append(fieldInsert, &datapb.FieldBinlog{FieldID: k, Binlogs: []string{v}})
}
for k, v := range pack.statsLogs {
fieldStats = append(fieldStats, &datapb.FieldBinlog{FieldID: k, Binlogs: []string{v}})
}
for _, delData := range pack.deltaLogs {
deltaInfos = append(deltaInfos, &datapb.DeltaLogInfo{RecordEntries: uint64(delData.size), TimestampFrom: delData.tsFrom, TimestampTo: delData.tsTo, DeltaLogPath: delData.filePath, DeltaLogSize: delData.fileSize})
}
// only current segment checkpoint info,
updates, _ := dsService.replica.getSegmentStatisticsUpdates(pack.segmentID)
checkPoints = append(checkPoints, &datapb.CheckPoint{
SegmentID: pack.segmentID,
NumOfRows: updates.GetNumRows(),
Position: pack.pos,
})
startPos := dsService.replica.listNewSegmentsStartPositions()
log.Debug("SaveBinlogPath",
zap.Int64("SegmentID", pack.segmentID),
zap.Int64("CollectionID", dsService.collectionID),
zap.Bool("IsFlushed", pack.flushed),
zap.Bool("IsDropped", pack.dropped),
zap.Int("Length of Field2BinlogPaths", len(fieldInsert)),
zap.Int("Length of Field2Stats", len(fieldStats)),
zap.Int("Length of Field2Deltalogs", len(deltaInfos)),
zap.Any("Listed start positions", startPos),
)
req := &datapb.SaveBinlogPathsRequest{
Base: &commonpb.MsgBase{
MsgType: 0, //TODO msg type
MsgID: 0, //TODO msg id
Timestamp: 0, //TODO time stamp
SourceID: Params.NodeID,
},
SegmentID: pack.segmentID,
CollectionID: dsService.collectionID,
Field2BinlogPaths: fieldInsert,
Field2StatslogPaths: fieldStats,
Deltalogs: deltaInfos,
CheckPoints: checkPoints,
StartPositions: startPos,
Flushed: pack.flushed,
Dropped: pack.dropped,
}
rsp, err := dsService.dataCoord.SaveBinlogPaths(context.Background(), req)
if err != nil {
log.Warn(err.Error())
return fmt.Errorf(err.Error())
}
if rsp.ErrorCode != commonpb.ErrorCode_Success {
return fmt.Errorf("data service save bin log path failed, reason = %s", rsp.Reason)
}
if pack.flushed || pack.dropped {
dsService.replica.segmentFlushed(pack.segmentID)
}
dsService.flushingSegCache.Remove(req.GetSegmentID())
return nil
})
dsService.flushManager = NewRendezvousFlushManager(dsService.idAllocator, dsService.blobKV, dsService.replica, flushNotifyFunc(dsService))
// recover segment checkpoints
for _, us := range vchanInfo.GetUnflushedSegments() {


@@ -205,9 +205,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
)
replica := genMockReplica(segIDs, pks, chanName)
kv := memkv.NewMemoryKV()
fm := NewRendezvousFlushManager(NewAllocatorFactory(), kv, replica, func(*segmentFlushPack) error {
return nil
})
fm := NewRendezvousFlushManager(NewAllocatorFactory(), kv, replica, func(*segmentFlushPack) {})
t.Run("Test get segment by primary keys", func(te *testing.T) {
c := &nodeConfig{
replica: replica,


@@ -83,9 +83,7 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) {
memkv := memkv.NewMemoryKV()
fm := NewRendezvousFlushManager(&allocator{}, memkv, replica, func(*segmentFlushPack) error {
return nil
})
fm := NewRendezvousFlushManager(&allocator{}, memkv, replica, func(*segmentFlushPack) {})
flushChan := make(chan flushMsg, 100)
@@ -182,9 +180,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
memkv := memkv.NewMemoryKV()
fm := NewRendezvousFlushManager(NewAllocatorFactory(), memkv, replica, func(*segmentFlushPack) error {
return nil
})
fm := NewRendezvousFlushManager(NewAllocatorFactory(), memkv, replica, func(*segmentFlushPack) {})
flushChan := make(chan flushMsg, 100)
c := &nodeConfig{
@@ -389,7 +385,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
memkv := memkv.NewMemoryKV()
wg := sync.WaitGroup{}
fm := NewRendezvousFlushManager(NewAllocatorFactory(), memkv, colRep, func(pack *segmentFlushPack) error {
fm := NewRendezvousFlushManager(NewAllocatorFactory(), memkv, colRep, func(pack *segmentFlushPack) {
fpMut.Lock()
flushPacks = append(flushPacks, pack)
fpMut.Unlock()
@@ -399,7 +395,6 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
colRep.segmentFlushed(pack.segmentID)
}
wg.Done()
return nil
})
flushChan := make(chan flushMsg, 100)
@@ -661,9 +656,7 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) {
memkv := memkv.NewMemoryKV()
fm := NewRendezvousFlushManager(&allocator{}, memkv, replica, func(*segmentFlushPack) error {
return nil
})
fm := NewRendezvousFlushManager(&allocator{}, memkv, replica, func(*segmentFlushPack) {})
flushChan := make(chan flushMsg, 100)
c := &nodeConfig{


@@ -17,6 +17,7 @@
package datanode
import (
"context"
"fmt"
"path"
"strconv"
@@ -24,9 +25,12 @@ import (
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/retry"
"go.uber.org/zap"
)
@@ -49,10 +53,11 @@ type segmentFlushPack struct {
pos *internalpb.MsgPosition
flushed bool
dropped bool
err error // task execution error, if not nil, notify func should stop datanode
}
// notifyMetaFunc notifies meta to persist the flush result
type notifyMetaFunc func(*segmentFlushPack) error
type notifyMetaFunc func(*segmentFlushPack)
// taskPostFunc clean up function after single flush task done
type taskPostFunc func(pack *segmentFlushPack, postInjection postInjectionFunc)
@@ -404,3 +409,85 @@ func NewRendezvousFlushManager(allocator allocatorInterface, kv kv.BaseKV, repli
Replica: replica,
}
}
func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMetaFunc {
return func(pack *segmentFlushPack) {
if pack.err != nil {
log.Warn("flush pack with error, data node quit now")
// TODO silverxia change to graceful stop datanode
panic(pack.err)
}
fieldInsert := []*datapb.FieldBinlog{}
fieldStats := []*datapb.FieldBinlog{}
deltaInfos := []*datapb.DeltaLogInfo{}
checkPoints := []*datapb.CheckPoint{}
for k, v := range pack.insertLogs {
fieldInsert = append(fieldInsert, &datapb.FieldBinlog{FieldID: k, Binlogs: []string{v}})
}
for k, v := range pack.statsLogs {
fieldStats = append(fieldStats, &datapb.FieldBinlog{FieldID: k, Binlogs: []string{v}})
}
for _, delData := range pack.deltaLogs {
deltaInfos = append(deltaInfos, &datapb.DeltaLogInfo{RecordEntries: uint64(delData.size), TimestampFrom: delData.tsFrom, TimestampTo: delData.tsTo, DeltaLogPath: delData.filePath, DeltaLogSize: delData.fileSize})
}
// only the checkpoint info of the current segment is included
updates, _ := dsService.replica.getSegmentStatisticsUpdates(pack.segmentID)
checkPoints = append(checkPoints, &datapb.CheckPoint{
SegmentID: pack.segmentID,
NumOfRows: updates.GetNumRows(),
Position: pack.pos,
})
log.Debug("SaveBinlogPath",
zap.Int64("SegmentID", pack.segmentID),
zap.Int64("CollectionID", dsService.collectionID),
zap.Int("Length of Field2BinlogPaths", len(fieldInsert)),
zap.Int("Length of Field2Stats", len(fieldStats)),
zap.Int("Length of Field2Deltalogs", len(deltaInfos)),
)
req := &datapb.SaveBinlogPathsRequest{
Base: &commonpb.MsgBase{
MsgType: 0, //TODO msg type
MsgID: 0, //TODO msg id
Timestamp: 0, //TODO time stamp
SourceID: Params.NodeID,
},
SegmentID: pack.segmentID,
CollectionID: dsService.collectionID,
Field2BinlogPaths: fieldInsert,
Field2StatslogPaths: fieldStats,
Deltalogs: deltaInfos,
CheckPoints: checkPoints,
StartPositions: dsService.replica.listNewSegmentsStartPositions(),
Flushed: pack.flushed,
Dropped: pack.dropped,
}
err := retry.Do(context.Background(), func() error {
rsp, err := dsService.dataCoord.SaveBinlogPaths(context.Background(), req)
// should be network issue, return error and retry
if err != nil {
return fmt.Errorf(err.Error())
}
// TODO should retry only when datacoord status is unhealthy
if rsp.ErrorCode != commonpb.ErrorCode_Success {
return fmt.Errorf("data service save bin log path failed, reason = %s", rsp.Reason)
}
return nil
}, opts...)
if err != nil {
log.Warn("failed to SaveBinlogPaths", zap.Error(err))
// TODO change to graceful stop
panic(err)
}
if pack.flushed || pack.dropped {
dsService.replica.segmentFlushed(pack.segmentID)
}
dsService.flushingSegCache.Remove(req.GetSegmentID())
}
}
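flushNotifyFunc above is what data_sync_service.go now passes to NewRendezvousFlushManager (first hunk), replacing the former inline closure. As a sketch of the new notifyMetaFunc contract, written as it would look inside package datanode (the persistence step is only a placeholder comment, not code from this commit): callbacks no longer return an error, so they must either retry internally, as flushNotifyFunc does with retry.Do, or escalate.

// illustrative only: a conforming notifyMetaFunc after this change
var exampleNotify notifyMetaFunc = func(pack *segmentFlushPack) {
	if pack.err != nil {
		// the flush task already failed after its own retries; nothing sane to persist
		panic(pack.err) // TODO in the source: replace with a graceful datanode stop
	}
	// persist pack.insertLogs / pack.statsLogs / pack.deltaLogs here,
	// wrapping any RPC in retry.Do so transient failures are retried
}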


@@ -17,12 +17,15 @@
package datanode
import (
"context"
"crypto/rand"
"errors"
"sync"
"testing"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
@@ -38,16 +41,25 @@ func (t *emptyFlushTask) flushDeleteData() error {
return nil
}
type errFlushTask struct{}
func (t *errFlushTask) flushInsertData() error {
return errors.New("mocked error")
}
func (t *errFlushTask) flushDeleteData() error {
return errors.New("mocked error")
}
func TestOrderFlushQueue_Execute(t *testing.T) {
counter := atomic.Int64{}
finish := sync.WaitGroup{}
size := 1000
finish.Add(size)
q := newOrderFlushQueue(1, func(*segmentFlushPack) error {
q := newOrderFlushQueue(1, func(*segmentFlushPack) {
counter.Inc()
finish.Done()
return nil
})
q.init()
@@ -86,11 +98,10 @@ func TestOrderFlushQueue_Order(t *testing.T) {
size := 1000
finish.Add(size)
resultList := make([][]byte, 0, size)
q := newOrderFlushQueue(1, func(pack *segmentFlushPack) error {
q := newOrderFlushQueue(1, func(pack *segmentFlushPack) {
counter.Inc()
resultList = append(resultList, pack.pos.MsgID)
finish.Done()
return nil
})
q.init()
@@ -130,10 +141,9 @@ func TestRendezvousFlushManager(t *testing.T) {
var counter atomic.Int64
finish := sync.WaitGroup{}
finish.Add(size)
m := NewRendezvousFlushManager(&allocator{}, kv, newMockReplica(), func(pack *segmentFlushPack) error {
m := NewRendezvousFlushManager(&allocator{}, kv, newMockReplica(), func(pack *segmentFlushPack) {
counter.Inc()
finish.Done()
return nil
})
ids := make([][]byte, 0, size)
@@ -168,11 +178,10 @@ func TestRendezvousFlushManager_Inject(t *testing.T) {
finish := sync.WaitGroup{}
finish.Add(size)
packs := make([]*segmentFlushPack, 0, size+1)
m := NewRendezvousFlushManager(&allocator{}, kv, newMockReplica(), func(pack *segmentFlushPack) error {
m := NewRendezvousFlushManager(&allocator{}, kv, newMockReplica(), func(pack *segmentFlushPack) {
packs = append(packs, pack)
counter.Inc()
finish.Done()
return nil
})
injected := make(chan struct{})
@@ -253,8 +262,7 @@ func TestRendezvousFlushManager_Inject(t *testing.T) {
func TestRendezvousFlushManager_getSegmentMeta(t *testing.T) {
memkv := memkv.NewMemoryKV()
replica := newMockReplica()
fm := NewRendezvousFlushManager(NewAllocatorFactory(), memkv, replica, func(*segmentFlushPack) error {
return nil
fm := NewRendezvousFlushManager(NewAllocatorFactory(), memkv, replica, func(*segmentFlushPack) {
})
// non-existent segment
@@ -271,3 +279,56 @@ func TestRendezvousFlushManager_getSegmentMeta(t *testing.T) {
_, _, _, err = fm.getSegmentMeta(1, &internalpb.MsgPosition{})
assert.Error(t, err)
}
func TestFlushNotifyFunc(t *testing.T) {
// replica :=
// rcf := &RootCoordFactory{}
ctx := context.Background()
rcf := &RootCoordFactory{}
replica, err := newReplica(ctx, rcf, 1)
require.NoError(t, err)
dataCoord := &DataCoordFactory{}
flushingCache := newCache()
dsService := &dataSyncService{
collectionID: 1,
replica: replica,
dataCoord: dataCoord,
flushingSegCache: flushingCache,
}
notifyFunc := flushNotifyFunc(dsService, retry.Attempts(1))
t.Run("normal run", func(t *testing.T) {
assert.NotPanics(t, func() {
notifyFunc(&segmentFlushPack{
insertLogs: map[UniqueID]string{1: "/dev/test/id"},
statsLogs: map[UniqueID]string{1: "/dev/test/id-stats"},
deltaLogs: []*DelDataBuf{{filePath: "/dev/test/del"}},
flushed: true,
})
})
})
t.Run("pack has error", func(t *testing.T) {
assert.Panics(t, func() {
notifyFunc(&segmentFlushPack{
err: errors.New("mocked pack error"),
})
})
})
t.Run("datacoord Save fails", func(t *testing.T) {
dataCoord.SaveBinlogPathNotSuccess = true
assert.Panics(t, func() {
notifyFunc(&segmentFlushPack{})
})
})
t.Run("datacoord call error", func(t *testing.T) {
dataCoord.SaveBinlogPathError = true
assert.Panics(t, func() {
notifyFunc(&segmentFlushPack{})
})
})
}
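The DataCoordFactory mock driving the last two sub-tests is not part of this diff. Presumably its SaveBinlogPaths honours the two flags roughly as sketched below (the flag and type names are taken from the test and the production code above; the body itself is an assumption), which is how the "Save fails" and "call error" cases reach flushNotifyFunc's panic once retry gives up:

// hypothetical mock body, consistent with how the flags are used in the test
func (f *DataCoordFactory) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) {
	if f.SaveBinlogPathError {
		// transport-level failure: retry.Do in flushNotifyFunc sees a non-nil error
		return nil, errors.New("mocked SaveBinlogPaths error")
	}
	if f.SaveBinlogPathNotSuccess {
		// the RPC succeeds but DataCoord reports failure: triggers the ErrorCode check
		return &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}, nil
	}
	return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
}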


@@ -17,11 +17,15 @@
package datanode
import (
"context"
"errors"
"sync"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/retry"
"go.uber.org/zap"
)
// errStart used for retry start
@@ -59,6 +63,9 @@ type flushTaskRunner struct {
pos *internalpb.MsgPosition
flushed bool
dropped bool
insertErr error // task execution error
deleteErr error // task execution error
}
type taskInjection struct {
@@ -77,7 +84,8 @@ func (t *flushTaskRunner) init(f notifyMetaFunc, postFunc taskPostFunc, signal <
}
// runFlushInsert executes the flush insert task, guarded by sync.Once and with retry
func (t *flushTaskRunner) runFlushInsert(task flushInsertTask, binlogs, statslogs map[UniqueID]string, flushed bool, dropped bool, pos *internalpb.MsgPosition) {
func (t *flushTaskRunner) runFlushInsert(task flushInsertTask,
binlogs, statslogs map[UniqueID]string, flushed bool, dropped bool, pos *internalpb.MsgPosition, opts ...retry.Option) {
t.insertOnce.Do(func() {
t.insertLogs = binlogs
t.statsLogs = statslogs
@@ -85,9 +93,11 @@ func (t *flushTaskRunner) runFlushInsert(task flushInsertTask, binlogs, statslog
t.pos = pos
t.dropped = dropped
go func() {
err := errStart
for err != nil {
err = task.flushInsertData()
err := retry.Do(context.Background(), func() error {
return task.flushInsertData()
}, opts...)
if err != nil {
t.insertErr = err
}
t.Done()
}()
@@ -95,7 +105,7 @@ func (t *flushTaskRunner) runFlushInsert(task flushInsertTask, binlogs, statslog
}
// runFlushDel executes the flush delete task, guarded by sync.Once and with retry
func (t *flushTaskRunner) runFlushDel(task flushDeleteTask, deltaLogs *DelDataBuf) {
func (t *flushTaskRunner) runFlushDel(task flushDeleteTask, deltaLogs *DelDataBuf, opts ...retry.Option) {
t.deleteOnce.Do(func() {
if deltaLogs == nil {
t.deltaLogs = []*DelDataBuf{}
@@ -103,9 +113,11 @@ func (t *flushTaskRunner) runFlushDel(task flushDeleteTask, deltaLogs *DelDataBu
t.deltaLogs = []*DelDataBuf{deltaLogs}
}
go func() {
err := errStart
for err != nil {
err = task.flushDeleteData()
err := retry.Do(context.Background(), func() error {
return task.flushDeleteData()
}, opts...)
if err != nil {
t.deleteErr = err
}
t.Done()
}()
@@ -135,10 +147,7 @@ func (t *flushTaskRunner) waitFinish(notifyFunc notifyMetaFunc, postFunc taskPos
postFunc(pack, postInjection)
// execution done, dequeue and make count --
err := errStart
for err != nil {
err = notifyFunc(pack)
}
notifyFunc(pack)
// notify next task
close(t.finishSignal)
@@ -154,6 +163,10 @@ func (t *flushTaskRunner) getFlushPack() *segmentFlushPack {
flushed: t.flushed,
dropped: t.dropped,
}
if t.insertErr != nil || t.deleteErr != nil {
log.Warn("flush task error detected", zap.Error(t.insertErr), zap.Error(t.deleteErr))
pack.err = errors.New("execution failed")
}
return pack
}
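Condensing the flush_task.go hunks above into a before/after view (both fragments are lifted from the diff; errStart, task, opts and t come from the surrounding file):

// before: spin until the flush call eventually succeeds, with no way to give up
err := errStart
for err != nil {
	err = task.flushInsertData()
}

// after: retries are delegated to retry.Do, and a terminal failure is recorded
// on the runner; getFlushPack then sets pack.err, which flushNotifyFunc treats as fatal
err := retry.Do(context.Background(), func() error {
	return task.flushInsertData()
}, opts...)
if err != nil {
	t.insertErr = err
}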


@@ -19,6 +19,7 @@ package datanode
import (
"testing"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/stretchr/testify/assert"
)
@@ -30,9 +31,8 @@ func TestFlushTaskRunner(t *testing.T) {
nextFlag := false
processed := make(chan struct{})
task.init(func(*segmentFlushPack) error {
task.init(func(*segmentFlushPack) {
saveFlag = true
return nil
}, func(pack *segmentFlushPack, i postInjectionFunc) {}, signal)
go func() {
@@ -57,6 +57,43 @@
assert.True(t, nextFlag)
}
func TestFlushTaskRunner_FailError(t *testing.T) {
task := newFlushTaskRunner(1, nil)
signal := make(chan struct{})
errFlag := false
nextFlag := false
processed := make(chan struct{})
task.init(func(pack *segmentFlushPack) {
if pack.err != nil {
errFlag = true
}
}, func(pack *segmentFlushPack, i postInjectionFunc) {}, signal)
go func() {
<-task.finishSignal
nextFlag = true
processed <- struct{}{}
}()
assert.False(t, errFlag)
assert.False(t, nextFlag)
task.runFlushInsert(&errFlushTask{}, nil, nil, false, false, nil, retry.Attempts(1))
task.runFlushDel(&errFlushTask{}, &DelDataBuf{}, retry.Attempts(1))
assert.False(t, errFlag)
assert.False(t, nextFlag)
close(signal)
<-processed
assert.True(t, errFlag)
assert.True(t, nextFlag)
}
func TestFlushTaskRunner_Injection(t *testing.T) {
injectCh := make(chan taskInjection, 1)
task := newFlushTaskRunner(1, injectCh)
@@ -83,10 +120,9 @@ func TestFlushTaskRunner_Injection(t *testing.T) {
injectOver <- true
}()
task.init(func(pack *segmentFlushPack) error {
task.init(func(pack *segmentFlushPack) {
assert.EqualValues(t, 2, pack.segmentID)
saveFlag = true
return nil
}, func(pack *segmentFlushPack, i postInjectionFunc) {
if i != nil {
i(pack)
@@ -113,5 +149,4 @@ func TestFlushTaskRunner_Injection(t *testing.T) {
assert.True(t, saveFlag)
assert.True(t, nextFlag)
}