Fix compaction ut datarace (#15267)

Fixes: #15241

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/15288/head
XuanYang-cn 2022-01-18 17:49:39 +08:00 committed by GitHub
parent e48b0fb5a8
commit 87394e29b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 57 additions and 20 deletions

View File

@ -59,13 +59,29 @@ func (c *compactionExecutor) execute(task compactor) {
c.taskCh <- task
}
func (c *compactionExecutor) toExecutingState(task compactor) {
task.start()
c.executing.Store(task.getPlanID(), task)
}
func (c *compactionExecutor) toCompleteState(task compactor) {
task.complete()
c.executing.Delete(task.getPlanID())
}
// These two func are bounded for waitGroup
func (c *compactionExecutor) executeWithState(task compactor) {
c.toExecutingState(task)
go c.executeTask(task)
}
func (c *compactionExecutor) start(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case task := <-c.taskCh:
go c.executeTask(task)
c.executeWithState(task)
}
}
}
@ -73,10 +89,10 @@ func (c *compactionExecutor) start(ctx context.Context) {
func (c *compactionExecutor) executeTask(task compactor) {
c.parallelCh <- struct{}{}
defer func() {
c.toCompleteState(task)
<-c.parallelCh
}()
c.executing.Store(task.getPlanID(), task)
log.Info("start to execute compaction", zap.Int64("planID", task.getPlanID()))
err := task.compact()
@ -87,7 +103,6 @@ func (c *compactionExecutor) executeTask(task compactor) {
)
}
c.executing.Delete(task.getPlanID())
log.Info("end to execute compaction", zap.Int64("planID", task.getPlanID()))
}

View File

@ -34,7 +34,7 @@ func TestCompactionExecutor(t *testing.T) {
t.Run("Test stopTask", func(t *testing.T) {
ex := newCompactionExecutor()
mc := newMockCompactor(true)
ex.executing.Store(UniqueID(1), mc)
ex.executeWithState(mc)
ex.stopTask(UniqueID(1))
})
@ -45,7 +45,7 @@ func TestCompactionExecutor(t *testing.T) {
go ex.start(ctx)
})
t.Run("Test excuteTask", func(t *testing.T) {
t.Run("Test executeTask", func(t *testing.T) {
tests := []struct {
isvalid bool
@ -59,9 +59,11 @@ func TestCompactionExecutor(t *testing.T) {
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
if test.isvalid {
ex.executeTask(newMockCompactor(true))
validTask := newMockCompactor(true)
ex.executeWithState(validTask)
} else {
ex.executeTask(newMockCompactor(false))
invalidTask := newMockCompactor(false)
ex.executeWithState(invalidTask)
}
})
}
@ -98,10 +100,7 @@ func TestCompactionExecutor(t *testing.T) {
// wait for task enqueued
found := false
for !found {
ex.executing.Range(func(key, value interface{}) bool {
found = true
return true
})
_, found = ex.executing.Load(mc.getPlanID())
}
ex.stopExecutingtaskByVChannelName("mock")
@ -125,18 +124,25 @@ func newMockCompactor(isvalid bool) *mockCompactor {
}
type mockCompactor struct {
sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
isvalid bool
alwaysWorking bool
wg sync.WaitGroup
}
var _ compactor = (*mockCompactor)(nil)
func (mc *mockCompactor) start() {
mc.wg.Add(1)
}
func (mc *mockCompactor) complete() {
mc.wg.Done()
}
func (mc *mockCompactor) compact() error {
mc.Add(1)
defer mc.Done()
if !mc.isvalid {
return errStart
}
@ -154,7 +160,7 @@ func (mc *mockCompactor) getPlanID() UniqueID {
func (mc *mockCompactor) stop() {
if mc.cancel != nil {
mc.cancel()
mc.Wait()
mc.wg.Wait()
}
}

View File

@ -32,6 +32,8 @@ import (
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
@ -41,11 +43,14 @@ var (
errIllegalCompactionPlan = errors.New("compaction plan illegal")
errTransferType = errors.New("transfer intferface to type wrong")
errUnknownDataType = errors.New("unknown shema DataType")
errContext = errors.New("context done or timeout")
)
type iterator = storage.Iterator
type compactor interface {
start()
complete()
compact() error
stop()
getPlanID() UniqueID
@ -101,6 +106,14 @@ func newCompactionTask(
}
}
func (t *compactionTask) start() {
t.wg.Add(1)
}
func (t *compactionTask) complete() {
t.wg.Done()
}
func (t *compactionTask) stop() {
t.cancel()
t.wg.Wait()
@ -265,8 +278,11 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[UniqueID]Timestamp,
}
func (t *compactionTask) compact() error {
t.wg.Add(1)
defer t.wg.Done()
if ok := funcutil.CheckCtxValid(t.ctx); !ok {
log.Error("compact wrong, task context done or timeout")
return errContext
}
ctxTimeout, cancelAll := context.WithTimeout(t.ctx, time.Duration(t.plan.GetTimeoutInSeconds())*time.Second)
defer cancelAll()

View File

@ -44,17 +44,17 @@ func newDmInputNode(ctx context.Context, seekPos *internalpb.MsgPosition, dmNode
// is virtual channel name, so we need to convert vchannel name into pchannel neme here.
pchannelName := rootcoord.ToPhysicalChannel(dmNodeConfig.vChannelName)
insertStream.AsConsumer([]string{pchannelName}, consumeSubName)
log.Debug("datanode AsConsumer", zap.String("physical channel", pchannelName), zap.String("subName", consumeSubName))
log.Debug("datanode AsConsumer", zap.String("physical channel", pchannelName), zap.String("subName", consumeSubName), zap.Int64("collection ID", dmNodeConfig.collectionID))
if seekPos != nil {
seekPos.ChannelName = pchannelName
start := time.Now()
log.Debug("datanode begin to seek", zap.String("Channel Name", seekPos.GetChannelName()))
log.Debug("datanode begin to seek", zap.String("physical channel", seekPos.GetChannelName()), zap.Int64("collection ID", dmNodeConfig.collectionID))
err = insertStream.Seek([]*internalpb.MsgPosition{seekPos})
if err != nil {
return nil, err
}
log.Debug("datanode Seek successfully", zap.String("Channel Name", seekPos.GetChannelName()), zap.Duration("elapse", time.Since(start)))
log.Debug("datanode seek successfully", zap.String("physical channel", seekPos.GetChannelName()), zap.Int64("collection ID", dmNodeConfig.collectionID), zap.Duration("elapse", time.Since(start)))
}
name := fmt.Sprintf("dmInputNode-%d-%s", dmNodeConfig.collectionID, dmNodeConfig.vChannelName)