enhance: Append drop partition msg to wal (#35326)

issue: https://github.com/milvus-io/milvus/issues/33285

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/35353/head
yihao.dai 2024-08-07 17:28:16 +08:00 committed by GitHub
parent 838f06323f
commit 72a175478f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 295 additions and 76 deletions

View File

@ -5,6 +5,9 @@ dir: 'internal/mocks/{{trimPrefix .PackagePath "github.com/milvus-io/milvus/inte
mockname: "Mock{{.InterfaceName}}"
outpkg: "mock_{{.PackageName}}"
packages:
github.com/milvus-io/milvus/internal/distributed/streaming:
interfaces:
WALAccesser:
github.com/milvus-io/milvus/internal/streamingcoord/server/balancer:
interfaces:
Balancer:

View File

@ -9,7 +9,11 @@ import (
"github.com/milvus-io/milvus/pkg/streaming/util/options"
)
var singleton *walAccesserImpl = nil
var singleton WALAccesser = nil
func SetWAL(w WALAccesser) {
singleton = w
}
// Init initializes the wal accesser with the given etcd client.
// should be called before any other operations.
@ -19,8 +23,8 @@ func Init(c *clientv3.Client) {
// Release releases the resources of the wal accesser.
func Release() {
if singleton != nil {
singleton.Close()
if w, ok := singleton.(*walAccesserImpl); ok && w != nil {
w.Close()
}
}

View File

@ -0,0 +1,141 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
package mock_streaming
import (
context "context"
message "github.com/milvus-io/milvus/pkg/streaming/util/message"
mock "github.com/stretchr/testify/mock"
streaming "github.com/milvus-io/milvus/internal/distributed/streaming"
)
// MockWALAccesser is an autogenerated mock type for the WALAccesser type
type MockWALAccesser struct {
mock.Mock
}
type MockWALAccesser_Expecter struct {
mock *mock.Mock
}
func (_m *MockWALAccesser) EXPECT() *MockWALAccesser_Expecter {
return &MockWALAccesser_Expecter{mock: &_m.Mock}
}
// Append provides a mock function with given fields: ctx, msgs
func (_m *MockWALAccesser) Append(ctx context.Context, msgs ...message.MutableMessage) streaming.AppendResponses {
_va := make([]interface{}, len(msgs))
for _i := range msgs {
_va[_i] = msgs[_i]
}
var _ca []interface{}
_ca = append(_ca, ctx)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 streaming.AppendResponses
if rf, ok := ret.Get(0).(func(context.Context, ...message.MutableMessage) streaming.AppendResponses); ok {
r0 = rf(ctx, msgs...)
} else {
r0 = ret.Get(0).(streaming.AppendResponses)
}
return r0
}
// MockWALAccesser_Append_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Append'
type MockWALAccesser_Append_Call struct {
*mock.Call
}
// Append is a helper method to define mock.On call
// - ctx context.Context
// - msgs ...message.MutableMessage
func (_e *MockWALAccesser_Expecter) Append(ctx interface{}, msgs ...interface{}) *MockWALAccesser_Append_Call {
return &MockWALAccesser_Append_Call{Call: _e.mock.On("Append",
append([]interface{}{ctx}, msgs...)...)}
}
func (_c *MockWALAccesser_Append_Call) Run(run func(ctx context.Context, msgs ...message.MutableMessage)) *MockWALAccesser_Append_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]message.MutableMessage, len(args)-1)
for i, a := range args[1:] {
if a != nil {
variadicArgs[i] = a.(message.MutableMessage)
}
}
run(args[0].(context.Context), variadicArgs...)
})
return _c
}
func (_c *MockWALAccesser_Append_Call) Return(_a0 streaming.AppendResponses) *MockWALAccesser_Append_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockWALAccesser_Append_Call) RunAndReturn(run func(context.Context, ...message.MutableMessage) streaming.AppendResponses) *MockWALAccesser_Append_Call {
_c.Call.Return(run)
return _c
}
// Read provides a mock function with given fields: ctx, opts
func (_m *MockWALAccesser) Read(ctx context.Context, opts streaming.ReadOption) streaming.Scanner {
ret := _m.Called(ctx, opts)
var r0 streaming.Scanner
if rf, ok := ret.Get(0).(func(context.Context, streaming.ReadOption) streaming.Scanner); ok {
r0 = rf(ctx, opts)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(streaming.Scanner)
}
}
return r0
}
// MockWALAccesser_Read_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Read'
type MockWALAccesser_Read_Call struct {
*mock.Call
}
// Read is a helper method to define mock.On call
// - ctx context.Context
// - opts streaming.ReadOption
func (_e *MockWALAccesser_Expecter) Read(ctx interface{}, opts interface{}) *MockWALAccesser_Read_Call {
return &MockWALAccesser_Read_Call{Call: _e.mock.On("Read", ctx, opts)}
}
func (_c *MockWALAccesser_Read_Call) Run(run func(ctx context.Context, opts streaming.ReadOption)) *MockWALAccesser_Read_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(streaming.ReadOption))
})
return _c
}
func (_c *MockWALAccesser_Read_Call) Return(_a0 streaming.Scanner) *MockWALAccesser_Read_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockWALAccesser_Read_Call) RunAndReturn(run func(context.Context, streaming.ReadOption) streaming.Scanner) *MockWALAccesser_Read_Call {
_c.Call.Return(run)
return _c
}
// NewMockWALAccesser creates a new instance of MockWALAccesser. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockWALAccesser(t interface {
mock.TestingT
Cleanup(func())
}) *MockWALAccesser {
mock := &MockWALAccesser{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -240,14 +240,14 @@ func Test_dropCollectionTask_Execute(t *testing.T) {
return true
}
gc := newMockGarbageCollector()
gc := mockrootcoord.NewGarbageCollector(t)
deleteCollectionCalled := false
deleteCollectionChan := make(chan struct{}, 1)
gc.GcCollectionDataFunc = func(ctx context.Context, coll *model.Collection) (Timestamp, error) {
gc.EXPECT().GcCollectionData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, coll *model.Collection) (Timestamp, error) {
deleteCollectionCalled = true
deleteCollectionChan <- struct{}{}
return 0, nil
}
})
core := newTestCore(
withValidProxyManager(),

View File

@ -89,6 +89,7 @@ func (t *dropPartitionTask) Execute(ctx context.Context) error {
redoTask.AddAsyncStep(&deletePartitionDataStep{
baseStep: baseStep{core: t.core},
pchans: t.collMeta.PhysicalChannelNames,
vchans: t.collMeta.VirtualChannelNames,
partition: &model.Partition{
PartitionID: partID,
PartitionName: t.Req.GetPartitionName(),

View File

@ -174,15 +174,15 @@ func Test_dropPartitionTask_Execute(t *testing.T) {
return nil
})
gc := newMockGarbageCollector()
gc := mockrootcoord.NewGarbageCollector(t)
deletePartitionCalled := false
deletePartitionChan := make(chan struct{}, 1)
gc.GcPartitionDataFunc = func(ctx context.Context, pChannels []string, coll *model.Partition) (Timestamp, error) {
gc.EXPECT().GcPartitionData(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, pChannels, vchannel []string, coll *model.Partition) (Timestamp, error) {
deletePartitionChan <- struct{}{}
deletePartitionCalled = true
time.Sleep(confirmGCInterval)
return 0, nil
}
})
broker := newMockBroker()
broker.GCConfirmFunc = func(ctx context.Context, collectionID, partitionID UniqueID) bool {

View File

@ -21,8 +21,11 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/distributed/streaming"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/util/streamingutil"
ms "github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
)
@ -30,10 +33,10 @@ import (
type GarbageCollector interface {
ReDropCollection(collMeta *model.Collection, ts Timestamp)
RemoveCreatingCollection(collMeta *model.Collection)
ReDropPartition(dbID int64, pChannels []string, partition *model.Partition, ts Timestamp)
ReDropPartition(dbID int64, pChannels, vchannels []string, partition *model.Partition, ts Timestamp)
RemoveCreatingPartition(dbID int64, partition *model.Partition, ts Timestamp)
GcCollectionData(ctx context.Context, coll *model.Collection) (ddlTs Timestamp, err error)
GcPartitionData(ctx context.Context, pChannels []string, partition *model.Partition) (ddlTs Timestamp, err error)
GcPartitionData(ctx context.Context, pChannels, vchannels []string, partition *model.Partition) (ddlTs Timestamp, err error)
}
type bgGarbageCollector struct {
@ -110,7 +113,7 @@ func (c *bgGarbageCollector) RemoveCreatingCollection(collMeta *model.Collection
_ = redo.Execute(context.Background())
}
func (c *bgGarbageCollector) ReDropPartition(dbID int64, pChannels []string, partition *model.Partition, ts Timestamp) {
func (c *bgGarbageCollector) ReDropPartition(dbID int64, pChannels, vchannels []string, partition *model.Partition, ts Timestamp) {
// TODO: remove this after data gc can be notified by rpc.
c.s.chanTimeTick.addDmlChannels(pChannels...)
@ -118,6 +121,7 @@ func (c *bgGarbageCollector) ReDropPartition(dbID int64, pChannels []string, par
redo.AddAsyncStep(&deletePartitionDataStep{
baseStep: baseStep{core: c.s},
pchans: pChannels,
vchans: vchannels,
partition: partition,
isSkip: !Params.CommonCfg.TTMsgEnabled.GetAsBool(),
})
@ -227,6 +231,41 @@ func (c *bgGarbageCollector) notifyPartitionGc(ctx context.Context, pChannels []
return ts, nil
}
func (c *bgGarbageCollector) notifyPartitionGcByStreamingService(ctx context.Context, vchannels []string, partition *model.Partition) (uint64, error) {
req := &msgpb.DropPartitionRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_DropPartition),
commonpbutil.WithTimeStamp(0), // ts is given by streamingnode.
commonpbutil.WithSourceID(c.s.session.ServerID),
),
PartitionName: partition.PartitionName,
CollectionID: partition.CollectionID,
PartitionID: partition.PartitionID,
}
msgs := make([]message.MutableMessage, 0, len(vchannels))
for _, vchannel := range vchannels {
msg, err := message.NewDropPartitionMessageBuilderV1().
WithVChannel(vchannel).
WithHeader(&message.DropPartitionMessageHeader{
CollectionId: partition.CollectionID,
PartitionId: partition.PartitionID,
}).
WithBody(req).
BuildMutable()
if err != nil {
return 0, err
}
msgs = append(msgs, msg)
}
resp := streaming.WAL().Append(ctx, msgs...)
if err := resp.IsAnyError(); err != nil {
return 0, err
}
// TODO: sheep, return resp.MaxTimeTick(), nil
return c.s.tsoAllocator.GenerateTSO(1)
}
func (c *bgGarbageCollector) GcCollectionData(ctx context.Context, coll *model.Collection) (ddlTs Timestamp, err error) {
c.s.ddlTsLockManager.Lock()
c.s.ddlTsLockManager.AddRefCnt(1)
@ -241,13 +280,17 @@ func (c *bgGarbageCollector) GcCollectionData(ctx context.Context, coll *model.C
return ddlTs, nil
}
func (c *bgGarbageCollector) GcPartitionData(ctx context.Context, pChannels []string, partition *model.Partition) (ddlTs Timestamp, err error) {
func (c *bgGarbageCollector) GcPartitionData(ctx context.Context, pChannels, vchannels []string, partition *model.Partition) (ddlTs Timestamp, err error) {
c.s.ddlTsLockManager.Lock()
c.s.ddlTsLockManager.AddRefCnt(1)
defer c.s.ddlTsLockManager.AddRefCnt(-1)
defer c.s.ddlTsLockManager.Unlock()
ddlTs, err = c.notifyPartitionGc(ctx, pChannels, partition)
if streamingutil.IsStreamingServiceEnabled() {
ddlTs, err = c.notifyPartitionGcByStreamingService(ctx, vchannels, partition)
} else {
ddlTs, err = c.notifyPartitionGc(ctx, pChannels, partition)
}
if err != nil {
return 0, err
}

View File

@ -26,11 +26,14 @@ import (
"github.com/stretchr/testify/mock"
"google.golang.org/grpc"
"github.com/milvus-io/milvus/internal/distributed/streaming"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/mocks/distributed/mock_streaming"
"github.com/milvus-io/milvus/internal/proto/querypb"
mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks"
mocktso "github.com/milvus-io/milvus/internal/tso/mocks"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/merr"
)
@ -353,7 +356,7 @@ func TestGarbageCollectorCtx_ReDropPartition(t *testing.T) {
core.ddlTsLockManager = newDdlTsLockManager(core.tsoAllocator)
gc := newBgGarbageCollector(core)
core.garbageCollector = gc
gc.ReDropPartition(0, pchans, &model.Partition{}, 100000)
gc.ReDropPartition(0, pchans, nil, &model.Partition{}, 100000)
})
t.Run("failed to RemovePartition", func(t *testing.T) {
@ -393,7 +396,7 @@ func TestGarbageCollectorCtx_ReDropPartition(t *testing.T) {
core.ddlTsLockManager = newDdlTsLockManager(core.tsoAllocator)
gc := newBgGarbageCollector(core)
core.garbageCollector = gc
gc.ReDropPartition(0, pchans, &model.Partition{}, 100000)
gc.ReDropPartition(0, pchans, nil, &model.Partition{}, 100000)
<-gcConfirmChan
assert.True(t, gcConfirmCalled)
<-removePartitionChan
@ -438,7 +441,7 @@ func TestGarbageCollectorCtx_ReDropPartition(t *testing.T) {
core.ddlTsLockManager = newDdlTsLockManager(core.tsoAllocator)
gc := newBgGarbageCollector(core)
core.garbageCollector = gc
gc.ReDropPartition(0, pchans, &model.Partition{}, 100000)
gc.ReDropPartition(0, pchans, nil, &model.Partition{}, 100000)
<-gcConfirmChan
assert.True(t, gcConfirmCalled)
<-removePartitionChan
@ -536,3 +539,24 @@ func TestGarbageCollector_RemoveCreatingPartition(t *testing.T) {
<-signal
})
}
func TestGcPartitionData(t *testing.T) {
defer cleanTestEnv()
streamingutil.SetStreamingServiceEnabled()
defer streamingutil.UnsetStreamingServiceEnabled()
wal := mock_streaming.NewMockWALAccesser(t)
wal.EXPECT().Append(mock.Anything, mock.Anything, mock.Anything).Return(streaming.AppendResponses{})
streaming.SetWAL(wal)
tsoAllocator := mocktso.NewAllocator(t)
tsoAllocator.EXPECT().GenerateTSO(mock.Anything).Return(1000, nil)
core := newTestCore(withTsoAllocator(tsoAllocator))
gc := newBgGarbageCollector(core)
core.ddlTsLockManager = newDdlTsLockManager(tsoAllocator)
_, err := gc.GcPartitionData(context.Background(), nil, []string{"ch-0", "ch-1"}, &model.Partition{})
assert.NoError(t, err)
}

View File

@ -941,24 +941,6 @@ func withBroker(b Broker) Opt {
}
}
type mockGarbageCollector struct {
GarbageCollector
GcCollectionDataFunc func(ctx context.Context, coll *model.Collection) (Timestamp, error)
GcPartitionDataFunc func(ctx context.Context, pChannels []string, partition *model.Partition) (Timestamp, error)
}
func (m mockGarbageCollector) GcCollectionData(ctx context.Context, coll *model.Collection) (Timestamp, error) {
return m.GcCollectionDataFunc(ctx, coll)
}
func (m mockGarbageCollector) GcPartitionData(ctx context.Context, pChannels []string, partition *model.Partition) (Timestamp, error) {
return m.GcPartitionDataFunc(ctx, pChannels, partition)
}
func newMockGarbageCollector() *mockGarbageCollector {
return &mockGarbageCollector{}
}
func withGarbageCollector(gc GarbageCollector) Opt {
return func(c *Core) {
c.garbageCollector = gc

View File

@ -52,8 +52,8 @@ type GarbageCollector_GcCollectionData_Call struct {
}
// GcCollectionData is a helper method to define mock.On call
// - ctx context.Context
// - coll *model.Collection
// - ctx context.Context
// - coll *model.Collection
func (_e *GarbageCollector_Expecter) GcCollectionData(ctx interface{}, coll interface{}) *GarbageCollector_GcCollectionData_Call {
return &GarbageCollector_GcCollectionData_Call{Call: _e.mock.On("GcCollectionData", ctx, coll)}
}
@ -75,23 +75,23 @@ func (_c *GarbageCollector_GcCollectionData_Call) RunAndReturn(run func(context.
return _c
}
// GcPartitionData provides a mock function with given fields: ctx, pChannels, partition
func (_m *GarbageCollector) GcPartitionData(ctx context.Context, pChannels []string, partition *model.Partition) (uint64, error) {
ret := _m.Called(ctx, pChannels, partition)
// GcPartitionData provides a mock function with given fields: ctx, pChannels, vchannels, partition
func (_m *GarbageCollector) GcPartitionData(ctx context.Context, pChannels []string, vchannels []string, partition *model.Partition) (uint64, error) {
ret := _m.Called(ctx, pChannels, vchannels, partition)
var r0 uint64
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, []string, *model.Partition) (uint64, error)); ok {
return rf(ctx, pChannels, partition)
if rf, ok := ret.Get(0).(func(context.Context, []string, []string, *model.Partition) (uint64, error)); ok {
return rf(ctx, pChannels, vchannels, partition)
}
if rf, ok := ret.Get(0).(func(context.Context, []string, *model.Partition) uint64); ok {
r0 = rf(ctx, pChannels, partition)
if rf, ok := ret.Get(0).(func(context.Context, []string, []string, *model.Partition) uint64); ok {
r0 = rf(ctx, pChannels, vchannels, partition)
} else {
r0 = ret.Get(0).(uint64)
}
if rf, ok := ret.Get(1).(func(context.Context, []string, *model.Partition) error); ok {
r1 = rf(ctx, pChannels, partition)
if rf, ok := ret.Get(1).(func(context.Context, []string, []string, *model.Partition) error); ok {
r1 = rf(ctx, pChannels, vchannels, partition)
} else {
r1 = ret.Error(1)
}
@ -105,16 +105,17 @@ type GarbageCollector_GcPartitionData_Call struct {
}
// GcPartitionData is a helper method to define mock.On call
// - ctx context.Context
// - pChannels []string
// - partition *model.Partition
func (_e *GarbageCollector_Expecter) GcPartitionData(ctx interface{}, pChannels interface{}, partition interface{}) *GarbageCollector_GcPartitionData_Call {
return &GarbageCollector_GcPartitionData_Call{Call: _e.mock.On("GcPartitionData", ctx, pChannels, partition)}
// - ctx context.Context
// - pChannels []string
// - vchannels []string
// - partition *model.Partition
func (_e *GarbageCollector_Expecter) GcPartitionData(ctx interface{}, pChannels interface{}, vchannels interface{}, partition interface{}) *GarbageCollector_GcPartitionData_Call {
return &GarbageCollector_GcPartitionData_Call{Call: _e.mock.On("GcPartitionData", ctx, pChannels, vchannels, partition)}
}
func (_c *GarbageCollector_GcPartitionData_Call) Run(run func(ctx context.Context, pChannels []string, partition *model.Partition)) *GarbageCollector_GcPartitionData_Call {
func (_c *GarbageCollector_GcPartitionData_Call) Run(run func(ctx context.Context, pChannels []string, vchannels []string, partition *model.Partition)) *GarbageCollector_GcPartitionData_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].([]string), args[2].(*model.Partition))
run(args[0].(context.Context), args[1].([]string), args[2].([]string), args[3].(*model.Partition))
})
return _c
}
@ -124,7 +125,7 @@ func (_c *GarbageCollector_GcPartitionData_Call) Return(ddlTs uint64, err error)
return _c
}
func (_c *GarbageCollector_GcPartitionData_Call) RunAndReturn(run func(context.Context, []string, *model.Partition) (uint64, error)) *GarbageCollector_GcPartitionData_Call {
func (_c *GarbageCollector_GcPartitionData_Call) RunAndReturn(run func(context.Context, []string, []string, *model.Partition) (uint64, error)) *GarbageCollector_GcPartitionData_Call {
_c.Call.Return(run)
return _c
}
@ -140,8 +141,8 @@ type GarbageCollector_ReDropCollection_Call struct {
}
// ReDropCollection is a helper method to define mock.On call
// - collMeta *model.Collection
// - ts uint64
// - collMeta *model.Collection
// - ts uint64
func (_e *GarbageCollector_Expecter) ReDropCollection(collMeta interface{}, ts interface{}) *GarbageCollector_ReDropCollection_Call {
return &GarbageCollector_ReDropCollection_Call{Call: _e.mock.On("ReDropCollection", collMeta, ts)}
}
@ -163,9 +164,9 @@ func (_c *GarbageCollector_ReDropCollection_Call) RunAndReturn(run func(*model.C
return _c
}
// ReDropPartition provides a mock function with given fields: dbID, pChannels, partition, ts
func (_m *GarbageCollector) ReDropPartition(dbID int64, pChannels []string, partition *model.Partition, ts uint64) {
_m.Called(dbID, pChannels, partition, ts)
// ReDropPartition provides a mock function with given fields: dbID, pChannels, vchannels, partition, ts
func (_m *GarbageCollector) ReDropPartition(dbID int64, pChannels []string, vchannels []string, partition *model.Partition, ts uint64) {
_m.Called(dbID, pChannels, vchannels, partition, ts)
}
// GarbageCollector_ReDropPartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReDropPartition'
@ -174,17 +175,18 @@ type GarbageCollector_ReDropPartition_Call struct {
}
// ReDropPartition is a helper method to define mock.On call
// - dbID int64
// - pChannels []string
// - partition *model.Partition
// - ts uint64
func (_e *GarbageCollector_Expecter) ReDropPartition(dbID interface{}, pChannels interface{}, partition interface{}, ts interface{}) *GarbageCollector_ReDropPartition_Call {
return &GarbageCollector_ReDropPartition_Call{Call: _e.mock.On("ReDropPartition", dbID, pChannels, partition, ts)}
// - dbID int64
// - pChannels []string
// - vchannels []string
// - partition *model.Partition
// - ts uint64
func (_e *GarbageCollector_Expecter) ReDropPartition(dbID interface{}, pChannels interface{}, vchannels interface{}, partition interface{}, ts interface{}) *GarbageCollector_ReDropPartition_Call {
return &GarbageCollector_ReDropPartition_Call{Call: _e.mock.On("ReDropPartition", dbID, pChannels, vchannels, partition, ts)}
}
func (_c *GarbageCollector_ReDropPartition_Call) Run(run func(dbID int64, pChannels []string, partition *model.Partition, ts uint64)) *GarbageCollector_ReDropPartition_Call {
func (_c *GarbageCollector_ReDropPartition_Call) Run(run func(dbID int64, pChannels []string, vchannels []string, partition *model.Partition, ts uint64)) *GarbageCollector_ReDropPartition_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].([]string), args[2].(*model.Partition), args[3].(uint64))
run(args[0].(int64), args[1].([]string), args[2].([]string), args[3].(*model.Partition), args[4].(uint64))
})
return _c
}
@ -194,7 +196,7 @@ func (_c *GarbageCollector_ReDropPartition_Call) Return() *GarbageCollector_ReDr
return _c
}
func (_c *GarbageCollector_ReDropPartition_Call) RunAndReturn(run func(int64, []string, *model.Partition, uint64)) *GarbageCollector_ReDropPartition_Call {
func (_c *GarbageCollector_ReDropPartition_Call) RunAndReturn(run func(int64, []string, []string, *model.Partition, uint64)) *GarbageCollector_ReDropPartition_Call {
_c.Call.Return(run)
return _c
}
@ -210,7 +212,7 @@ type GarbageCollector_RemoveCreatingCollection_Call struct {
}
// RemoveCreatingCollection is a helper method to define mock.On call
// - collMeta *model.Collection
// - collMeta *model.Collection
func (_e *GarbageCollector_Expecter) RemoveCreatingCollection(collMeta interface{}) *GarbageCollector_RemoveCreatingCollection_Call {
return &GarbageCollector_RemoveCreatingCollection_Call{Call: _e.mock.On("RemoveCreatingCollection", collMeta)}
}
@ -243,9 +245,9 @@ type GarbageCollector_RemoveCreatingPartition_Call struct {
}
// RemoveCreatingPartition is a helper method to define mock.On call
// - dbID int64
// - partition *model.Partition
// - ts uint64
// - dbID int64
// - partition *model.Partition
// - ts uint64
func (_e *GarbageCollector_Expecter) RemoveCreatingPartition(dbID interface{}, partition interface{}, ts interface{}) *GarbageCollector_RemoveCreatingPartition_Call {
return &GarbageCollector_RemoveCreatingPartition_Call{Call: _e.mock.On("RemoveCreatingPartition", dbID, partition, ts)}
}

View File

@ -655,7 +655,7 @@ func (c *Core) restore(ctx context.Context) error {
for _, part := range coll.Partitions {
switch part.State {
case pb.PartitionState_PartitionDropping:
go c.garbageCollector.ReDropPartition(coll.DBID, coll.PhysicalChannelNames, part.Clone(), ts)
go c.garbageCollector.ReDropPartition(coll.DBID, coll.PhysicalChannelNames, coll.VirtualChannelNames, part.Clone(), ts)
case pb.PartitionState_PartitionCreating:
go c.garbageCollector.RemoveCreatingPartition(coll.DBID, part.Clone(), ts)
default:

View File

@ -1980,7 +1980,7 @@ func (s *RootCoordSuite) TestRestore() {
gc := mockrootcoord.NewGarbageCollector(s.T())
finishCh := make(chan struct{}, 4)
gc.EXPECT().ReDropPartition(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Once().
gc.EXPECT().ReDropPartition(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Once().
Run(func(args mock.Arguments) {
finishCh <- struct{}{}
})

View File

@ -263,6 +263,7 @@ func (s *waitForTsSyncedStep) Weight() stepPriority {
type deletePartitionDataStep struct {
baseStep
pchans []string
vchans []string
partition *model.Partition
isSkip bool
@ -272,7 +273,7 @@ func (s *deletePartitionDataStep) Execute(ctx context.Context) ([]nestedStep, er
if s.isSkip {
return nil, nil
}
_, err := s.core.garbageCollector.GcPartitionData(ctx, s.pchans, s.partition)
_, err := s.core.garbageCollector.GcPartitionData(ctx, s.pchans, s.vchans, s.partition)
return nil, err
}

View File

@ -2,10 +2,28 @@ package streamingutil
import "os"
const MilvusStreamingServiceEnabled = "MILVUS_STREAMING_SERVICE_ENABLED"
// IsStreamingServiceEnabled returns whether the streaming service is enabled.
func IsStreamingServiceEnabled() bool {
// TODO: check if the environment variable MILVUS_STREAMING_SERVICE_ENABLED is set
return os.Getenv("MILVUS_STREAMING_SERVICE_ENABLED") == "1"
return os.Getenv(MilvusStreamingServiceEnabled) == "1"
}
// SetStreamingServiceEnabled sets the env that indicates whether the streaming service is enabled.
func SetStreamingServiceEnabled() {
err := os.Setenv(MilvusStreamingServiceEnabled, "1")
if err != nil {
panic(err)
}
}
// UnsetStreamingServiceEnabled unsets the env that indicates whether the streaming service is enabled.
func UnsetStreamingServiceEnabled() {
err := os.Setenv(MilvusStreamingServiceEnabled, "0")
if err != nil {
panic(err)
}
}
// MustEnableStreamingService panics if the streaming service is not enabled.