mirror of https://github.com/milvus-io/milvus.git
Fix creating partition is not atomic (#23823)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/23930/head
parent
240c5625cd
commit
f1cc31ceee
|
@ -77,26 +77,35 @@ func (t *createPartitionTask) Execute(ctx context.Context) error {
|
|||
}
|
||||
|
||||
undoTask := newBaseUndoTask(t.core.stepExecutor)
|
||||
|
||||
undoTask.AddStep(&expireCacheStep{
|
||||
baseStep: baseStep{core: t.core},
|
||||
collectionNames: []string{t.collMeta.Name},
|
||||
collectionID: t.collMeta.CollectionID,
|
||||
ts: t.GetTs(),
|
||||
}, &nullStep{})
|
||||
|
||||
undoTask.AddStep(&addPartitionMetaStep{
|
||||
baseStep: baseStep{core: t.core},
|
||||
partition: partition,
|
||||
}, &nullStep{}) // adding partition is atomic enough.
|
||||
}, &removePartitionMetaStep{
|
||||
baseStep: baseStep{core: t.core},
|
||||
collectionID: partition.CollectionID,
|
||||
partitionID: partition.PartitionID,
|
||||
ts: t.GetTs(),
|
||||
})
|
||||
|
||||
undoTask.AddStep(&nullStep{}, &releasePartitionsStep{
|
||||
baseStep: baseStep{core: t.core},
|
||||
collectionID: t.collMeta.CollectionID,
|
||||
partitionIDs: []int64{partID},
|
||||
})
|
||||
|
||||
undoTask.AddStep(&syncNewCreatedPartitionStep{
|
||||
baseStep: baseStep{core: t.core},
|
||||
collectionID: t.collMeta.CollectionID,
|
||||
partitionID: partID,
|
||||
}, &releasePartitionsStep{
|
||||
baseStep: baseStep{core: t.core},
|
||||
collectionID: t.collMeta.CollectionID,
|
||||
partitionIDs: []int64{partID},
|
||||
})
|
||||
}, &nullStep{})
|
||||
|
||||
undoTask.AddStep(&changePartitionStateStep{
|
||||
baseStep: baseStep{core: t.core},
|
||||
|
|
|
@ -26,11 +26,12 @@ import (
|
|||
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
|
||||
)
|
||||
|
||||
//go:generate mockery --name=GarbageCollector --outpkg=mockrootcoord
|
||||
//go:generate mockery --name=GarbageCollector --outpkg=mockrootcoord --filename=garbage_collector.go --with-expecter --testonly
|
||||
type GarbageCollector interface {
|
||||
ReDropCollection(collMeta *model.Collection, ts Timestamp)
|
||||
RemoveCreatingCollection(collMeta *model.Collection)
|
||||
ReDropPartition(pChannels []string, partition *model.Partition, ts Timestamp)
|
||||
RemoveCreatingPartition(partition *model.Partition, ts Timestamp)
|
||||
GcCollectionData(ctx context.Context, coll *model.Collection) (ddlTs Timestamp, err error)
|
||||
GcPartitionData(ctx context.Context, pChannels []string, partition *model.Partition) (ddlTs Timestamp, err error)
|
||||
}
|
||||
|
@ -147,6 +148,26 @@ func (c *bgGarbageCollector) ReDropPartition(pChannels []string, partition *mode
|
|||
_ = redo.Execute(context.Background())
|
||||
}
|
||||
|
||||
func (c *bgGarbageCollector) RemoveCreatingPartition(partition *model.Partition, ts Timestamp) {
|
||||
redoTask := newBaseRedoTask(c.s.stepExecutor)
|
||||
|
||||
redoTask.AddAsyncStep(&releasePartitionsStep{
|
||||
baseStep: baseStep{core: c.s},
|
||||
collectionID: partition.CollectionID,
|
||||
partitionIDs: []int64{partition.PartitionID},
|
||||
})
|
||||
|
||||
redoTask.AddAsyncStep(&removePartitionMetaStep{
|
||||
baseStep: baseStep{core: c.s},
|
||||
collectionID: partition.CollectionID,
|
||||
partitionID: partition.PartitionID,
|
||||
ts: ts,
|
||||
})
|
||||
|
||||
// err is ignored since no sync steps will be executed.
|
||||
_ = redoTask.Execute(context.Background())
|
||||
}
|
||||
|
||||
func (c *bgGarbageCollector) notifyCollectionGc(ctx context.Context, coll *model.Collection) (ddlTs Timestamp, err error) {
|
||||
ts, err := c.s.tsoAllocator.GenerateTSO(1)
|
||||
if err != nil {
|
||||
|
|
|
@ -22,13 +22,16 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/metastore/model"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks"
|
||||
mocktso "github.com/milvus-io/milvus/internal/tso/mocks"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
)
|
||||
|
||||
func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) {
|
||||
|
@ -391,3 +394,94 @@ func TestGarbageCollectorCtx_ReDropPartition(t *testing.T) {
|
|||
assert.True(t, removePartitionCalled)
|
||||
})
|
||||
}
|
||||
|
||||
func TestGarbageCollector_RemoveCreatingPartition(t *testing.T) {
|
||||
t.Run("test normal", func(t *testing.T) {
|
||||
defer cleanTestEnv()
|
||||
|
||||
ticker := newTickerWithMockNormalStream()
|
||||
tsoAllocator := mocktso.NewAllocator(t)
|
||||
|
||||
signal := make(chan struct{}, 1)
|
||||
meta := mockrootcoord.NewIMetaTable(t)
|
||||
meta.EXPECT().RemovePartition(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
|
||||
Return(nil).
|
||||
Run(func(ctx context.Context, collectionID int64, partitionID int64, ts uint64) {
|
||||
signal <- struct{}{}
|
||||
})
|
||||
|
||||
qc := types.NewMockQueryCoord(t)
|
||||
qc.EXPECT().ReleasePartitions(mock.Anything, mock.Anything).Return(merr.Status(nil), nil)
|
||||
|
||||
core := newTestCore(withTtSynchronizer(ticker),
|
||||
withMeta(meta),
|
||||
withTsoAllocator(tsoAllocator),
|
||||
withQueryCoord(qc))
|
||||
gc := newBgGarbageCollector(core)
|
||||
core.ddlTsLockManager = newDdlTsLockManager(tsoAllocator)
|
||||
core.garbageCollector = gc
|
||||
core.broker = newServerBroker(core)
|
||||
|
||||
gc.RemoveCreatingPartition(&model.Partition{}, 0)
|
||||
<-signal
|
||||
})
|
||||
|
||||
t.Run("test ReleasePartitions failed", func(t *testing.T) {
|
||||
defer cleanTestEnv()
|
||||
|
||||
ticker := newTickerWithMockNormalStream()
|
||||
tsoAllocator := mocktso.NewAllocator(t)
|
||||
|
||||
signal := make(chan struct{}, 1)
|
||||
meta := mockrootcoord.NewIMetaTable(t)
|
||||
|
||||
qc := types.NewMockQueryCoord(t)
|
||||
qc.EXPECT().ReleasePartitions(mock.Anything, mock.Anything).
|
||||
Return(merr.Status(nil), fmt.Errorf("mock err")).
|
||||
Run(func(ctx context.Context, req *querypb.ReleasePartitionsRequest) {
|
||||
signal <- struct{}{}
|
||||
})
|
||||
|
||||
core := newTestCore(withTtSynchronizer(ticker),
|
||||
withMeta(meta),
|
||||
withTsoAllocator(tsoAllocator),
|
||||
withQueryCoord(qc))
|
||||
gc := newBgGarbageCollector(core)
|
||||
core.ddlTsLockManager = newDdlTsLockManager(tsoAllocator)
|
||||
core.garbageCollector = gc
|
||||
core.broker = newServerBroker(core)
|
||||
|
||||
gc.RemoveCreatingPartition(&model.Partition{}, 0)
|
||||
<-signal
|
||||
})
|
||||
|
||||
t.Run("test RemovePartition failed", func(t *testing.T) {
|
||||
defer cleanTestEnv()
|
||||
|
||||
ticker := newTickerWithMockNormalStream()
|
||||
tsoAllocator := mocktso.NewAllocator(t)
|
||||
|
||||
signal := make(chan struct{}, 1)
|
||||
meta := mockrootcoord.NewIMetaTable(t)
|
||||
meta.EXPECT().RemovePartition(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
|
||||
Return(fmt.Errorf("mock err")).
|
||||
Run(func(ctx context.Context, collectionID int64, partitionID int64, ts uint64) {
|
||||
signal <- struct{}{}
|
||||
})
|
||||
|
||||
qc := types.NewMockQueryCoord(t)
|
||||
qc.EXPECT().ReleasePartitions(mock.Anything, mock.Anything).Return(merr.Status(nil), nil)
|
||||
|
||||
core := newTestCore(withTtSynchronizer(ticker),
|
||||
withMeta(meta),
|
||||
withTsoAllocator(tsoAllocator),
|
||||
withQueryCoord(qc))
|
||||
gc := newBgGarbageCollector(core)
|
||||
core.ddlTsLockManager = newDdlTsLockManager(tsoAllocator)
|
||||
core.garbageCollector = gc
|
||||
core.broker = newServerBroker(core)
|
||||
|
||||
gc.RemoveCreatingPartition(&model.Partition{}, 0)
|
||||
<-signal
|
||||
})
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
// Code generated by mockery v2.16.0. DO NOT EDIT.
|
||||
// Code generated by mockery v2.14.0. DO NOT EDIT.
|
||||
|
||||
package mockrootcoord
|
||||
|
||||
|
@ -200,6 +200,35 @@ func (_c *GarbageCollector_RemoveCreatingCollection_Call) Return() *GarbageColle
|
|||
return _c
|
||||
}
|
||||
|
||||
// RemoveCreatingPartition provides a mock function with given fields: partition, ts.
// It forwards both arguments to _m.Called and returns nothing.
|
||||
func (_m *GarbageCollector) RemoveCreatingPartition(partition *model.Partition, ts uint64) {
|
||||
_m.Called(partition, ts)
|
||||
}
|
||||
|
||||
// GarbageCollector_RemoveCreatingPartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveCreatingPartition'.
// It embeds *mock.Call, so the underlying call is reachable through the Call field.
|
||||
type GarbageCollector_RemoveCreatingPartition_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// RemoveCreatingPartition is a helper method to define mock.On call.
// The arguments are passed to _e.mock.On as given, and the resulting call is
// returned wrapped in the typed GarbageCollector_RemoveCreatingPartition_Call.
|
||||
// - partition *model.Partition
|
||||
// - ts uint64
|
||||
func (_e *GarbageCollector_Expecter) RemoveCreatingPartition(partition interface{}, ts interface{}) *GarbageCollector_RemoveCreatingPartition_Call {
|
||||
return &GarbageCollector_RemoveCreatingPartition_Call{Call: _e.mock.On("RemoveCreatingPartition", partition, ts)}
|
||||
}
|
||||
|
||||
// Run registers a typed callback on the underlying call. The recorded
// arguments are type-asserted to *model.Partition and uint64 before run is
// invoked; the receiver is returned so calls can be chained.
func (_c *GarbageCollector_RemoveCreatingPartition_Call) Run(run func(partition *model.Partition, ts uint64)) *GarbageCollector_RemoveCreatingPartition_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
// Unchecked type assertions: a mismatching argument type panics here.
run(args[0].(*model.Partition), args[1].(uint64))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
// Return calls Return on the underlying call with no values, matching the
// mocked method's lack of results, and returns the receiver for chaining.
func (_c *GarbageCollector_RemoveCreatingPartition_Call) Return() *GarbageCollector_RemoveCreatingPartition_Call {
|
||||
_c.Call.Return()
|
||||
return _c
|
||||
}
|
||||
|
||||
type mockConstructorTestingTNewGarbageCollector interface {
|
||||
mock.TestingT
|
||||
Cleanup(func())
|
||||
|
|
|
@ -598,6 +598,8 @@ func (c *Core) restore(ctx context.Context) error {
|
|||
switch part.State {
|
||||
case pb.PartitionState_PartitionDropping:
|
||||
go c.garbageCollector.ReDropPartition(coll.PhysicalChannelNames, part.Clone(), ts)
|
||||
case pb.PartitionState_PartitionCreating:
|
||||
go c.garbageCollector.RemoveCreatingPartition(part.Clone(), ts)
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue