fix compact task data race (#22378)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/22369/head
wei liu 2023-02-24 18:49:46 +08:00 committed by GitHub
parent e89ee8d547
commit 8175220019
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 25 additions and 23 deletions

View File

@ -60,7 +60,6 @@ func (c *compactionExecutor) toCompleteState(task compactor) {
// These two func are bounded for waitGroup
func (c *compactionExecutor) executeWithState(task compactor) {
task.start()
go c.executeTask(task)
}

View File

@ -18,7 +18,6 @@ package datanode
import (
"context"
"sync"
"testing"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -124,6 +123,7 @@ func newMockCompactor(isvalid bool) *mockCompactor {
ctx: ctx,
cancel: cancel,
isvalid: isvalid,
done: make(chan struct{}, 1),
}
}
@ -133,17 +133,13 @@ type mockCompactor struct {
isvalid bool
alwaysWorking bool
wg sync.WaitGroup
done chan struct{}
}
var _ compactor = (*mockCompactor)(nil)
func (mc *mockCompactor) start() {
mc.wg.Add(1)
}
func (mc *mockCompactor) complete() {
mc.wg.Done()
mc.done <- struct{}{}
}
func (mc *mockCompactor) compact() (*datapb.CompactionResult, error) {
@ -164,7 +160,7 @@ func (mc *mockCompactor) getPlanID() UniqueID {
func (mc *mockCompactor) stop() {
if mc.cancel != nil {
mc.cancel()
mc.wg.Wait()
<-mc.done
}
}

View File

@ -55,7 +55,6 @@ var (
type iterator = storage.Iterator
type compactor interface {
start()
complete()
compact() (*datapb.CompactionResult, error)
stop()
@ -80,7 +79,7 @@ type compactionTask struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
done chan struct{}
tr *timerecord.TimeRecorder
chunkManager storage.ChunkManager
}
@ -111,20 +110,17 @@ func newCompactionTask(
plan: plan,
tr: timerecord.NewTimeRecorder("compactionTask"),
chunkManager: chunkManager,
done: make(chan struct{}, 1),
}
}
func (t *compactionTask) start() {
t.wg.Add(1)
}
func (t *compactionTask) complete() {
t.wg.Done()
t.done <- struct{}{}
}
func (t *compactionTask) stop() {
t.cancel()
t.wg.Wait()
<-t.done
}
func (t *compactionTask) getPlanID() UniqueID {

View File

@ -56,6 +56,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
task := &compactionTask{
Channel: channel,
done: make(chan struct{}, 1),
}
_, _, _, err = task.getSegmentMeta(100)
@ -174,7 +175,9 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
}
for _, test := range tests {
task := &compactionTask{}
task := &compactionTask{
done: make(chan struct{}, 1),
}
t.Run(test.description, func(t *testing.T) {
if test.isvalid {
pk2ts, db, err := task.mergeDeltalogs(test.dBlobs, test.timetravel)
@ -250,7 +253,9 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
dBlobs[test.segIDC] = d
}
task := &compactionTask{}
task := &compactionTask{
done: make(chan struct{}, 1),
}
pk2ts, db, err := task.mergeDeltalogs(dBlobs, test.timetravel)
assert.NoError(t, err)
assert.Equal(t, test.expectedpk2ts, len(pk2ts))
@ -296,7 +301,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
1: 10000,
}
ct := &compactionTask{Channel: channel, downloader: mockbIO, uploader: mockbIO}
ct := &compactionTask{Channel: channel, downloader: mockbIO, uploader: mockbIO, done: make(chan struct{}, 1)}
inPaths, statsPaths, numOfRow, err := ct.merge(context.Background(), allPaths, 2, 0, meta, dm)
assert.NoError(t, err)
assert.Equal(t, int64(2), numOfRow)
@ -332,7 +337,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
dm := map[interface{}]Timestamp{}
ct := &compactionTask{Channel: channel, downloader: mockbIO, uploader: mockbIO}
ct := &compactionTask{Channel: channel, downloader: mockbIO, uploader: mockbIO, done: make(chan struct{}, 1)}
inPaths, statsPaths, numOfRow, err := ct.merge(context.Background(), allPaths, 2, 0, meta, dm)
assert.NoError(t, err)
assert.Equal(t, int64(2), numOfRow)
@ -375,6 +380,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
plan: &datapb.CompactionPlan{
CollectionTtl: 864000,
},
done: make(chan struct{}, 1),
}
inPaths, statsPaths, numOfRow, err := ct.merge(context.Background(), allPaths, 2, 0, meta, dm)
assert.NoError(t, err)
@ -409,7 +415,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
1: 10000,
}
ct := &compactionTask{Channel: channel, downloader: mockbIO, uploader: mockbIO}
ct := &compactionTask{Channel: channel, downloader: mockbIO, uploader: mockbIO, done: make(chan struct{}, 1)}
_, _, _, err = ct.merge(context.Background(), allPaths, 2, 0, &etcdpb.CollectionMeta{
Schema: &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
{DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{
@ -446,7 +452,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
1: 10000,
}
ct := &compactionTask{Channel: channel, downloader: mockbIO, uploader: mockbIO}
ct := &compactionTask{Channel: channel, downloader: mockbIO, uploader: mockbIO, done: make(chan struct{}, 1)}
_, _, _, err = ct.merge(context.Background(), allPaths, 2, 0, &etcdpb.CollectionMeta{
Schema: &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
@ -464,6 +470,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
plan: &datapb.CompactionPlan{
CollectionTtl: math.MaxInt64,
},
done: make(chan struct{}, 1),
}
res := ct.isExpiredEntity(0, genTimestamp())
@ -487,6 +494,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
plan: &datapb.CompactionPlan{
CollectionTtl: 0,
},
done: make(chan struct{}, 1),
}
res := ct.isExpiredEntity(0, genTimestamp())
assert.Equal(t, false, res)
@ -509,6 +517,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
plan: &datapb.CompactionPlan{
CollectionTtl: 864000,
},
done: make(chan struct{}, 1),
}
res := ct.isExpiredEntity(0, genTimestamp())
assert.Equal(t, true, res)
@ -571,6 +580,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
ctx: ctx,
cancel: cancel,
tr: timerecord.NewTimeRecorder("test"),
done: make(chan struct{}, 1),
}
plan := &datapb.CompactionPlan{
@ -592,6 +602,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
_, err = emptyTask.compact()
assert.Error(t, err)
emptyTask.complete()
emptyTask.stop()
})