Remove collection in segment manager when dropping collection

Signed-off-by: sunby <bingyi.sun@zilliz.com>
pull/4973/head^2
sunby 2021-01-06 19:23:31 +08:00 committed by yefu.chen
parent 8715cd1f0c
commit bf3a6dc3c6
7 changed files with 100 additions and 29 deletions

View File

@@ -19,7 +19,8 @@ type createCollectionTask struct {
type dropCollectionTask struct {
baseTask
req *internalpb.DropCollectionRequest
req *internalpb.DropCollectionRequest
segManager SegmentManager
}
type hasCollectionTask struct {
@@ -163,6 +164,11 @@ func (t *dropCollectionTask) Execute() error {
return err
}
// Before the collection is dropped in the segment manager, the segment manager may still receive
// a time tick from a write node and fail to find this collection in the meta table.
if err = t.segManager.DropCollection(collectionID); err != nil {
return err
}
ts, err := t.Ts()
if err != nil {
return err
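For orientation, a minimal sketch of the ordering the new comment describes; the meta-table call below is a hypothetical name, and only the segManager.DropCollection call is taken from this hunk:

// Hedged sketch, not the actual Execute body: the collection leaves the meta
// table first, then the segment manager. A write-node time tick that lands
// between the two steps may already fail to find the collection in the meta
// table, so time-tick handling has to tolerate lookup failures.
func dropCollectionSketch(mt *metaTable, segManager SegmentManager, collectionID UniqueID) error {
	if err := mt.DeleteCollection(collectionID); err != nil { // hypothetical meta-table method
		return err
	}
	return segManager.DropCollection(collectionID)
}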

View File

@@ -50,6 +50,7 @@ func (s *Master) DropCollection(ctx context.Context, in *internalpb.DropCollecti
mt: s.metaTable,
cv: make(chan error),
},
segManager: s.segmentManager,
}
response := &commonpb.Status{

View File

@@ -12,7 +12,7 @@ type createIndexTask struct {
req *internalpb.CreateIndexRequest
indexBuildScheduler *IndexBuildScheduler
indexLoadScheduler *IndexLoadScheduler
segManager *SegmentManager
segManager SegmentManager
}
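A note on the *SegmentManager to SegmentManager changes in this and the following files: with SegmentManager now an interface, holders take the interface value directly; a pointer to an interface is almost never wanted in Go. Generic illustration (names invented):

// An interface value already refers to its concrete implementation, so fields
// and parameters declare the interface type itself rather than a pointer to it.
type exampleHolder struct {
	segManager SegmentManager // was *SegmentManager while SegmentManager was a struct
}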
func (task *createIndexTask) Type() internalpb.MsgType {

View File

@@ -63,7 +63,7 @@ type Master struct {
startCallbacks []func()
closeCallbacks []func()
segmentManager *SegmentManager
segmentManager SegmentManager
segmentAssigner *SegmentAssigner
statProcessor *StatsProcessor
segmentStatusMsg ms.MsgStream

View File

@@ -130,6 +130,7 @@ func TestMaster_Scheduler_Collection(t *testing.T) {
mt: meta,
cv: make(chan error),
},
segManager: NewMockSegmentManager(),
}
err = scheduler.Enqueue(dropCollectionTask)

View File

@@ -31,7 +31,16 @@ type channelRange struct {
channelStart int32
channelEnd int32
}
type SegmentManager struct {
type SegmentManager interface {
Start()
Close()
AssignSegment(segIDReq []*internalpb.SegIDRequest) ([]*internalpb.SegIDAssignment, error)
ForceClose(collID UniqueID) error
DropCollection(collID UniqueID) error
}
type SegmentManagerImpl struct {
metaTable *metaTable
channelRanges []*channelRange
collStatus map[UniqueID]*collectionStatus // collection id to collection status
@@ -54,7 +63,7 @@ type SegmentManager struct {
waitGroup sync.WaitGroup
}
func (manager *SegmentManager) AssignSegment(segIDReq []*internalpb.SegIDRequest) ([]*internalpb.SegIDAssignment, error) {
func (manager *SegmentManagerImpl) AssignSegment(segIDReq []*internalpb.SegIDRequest) ([]*internalpb.SegIDAssignment, error) {
manager.mu.Lock()
defer manager.mu.Unlock()
@@ -97,7 +106,7 @@ func (manager *SegmentManager) AssignSegment(segIDReq []*internalpb.SegIDRequest
return res, nil
}
func (manager *SegmentManager) assignSegment(
func (manager *SegmentManagerImpl) assignSegment(
collName string,
collID UniqueID,
partitionTag string,
@@ -180,7 +189,7 @@ func (manager *SegmentManager) assignSegment(
}, nil
}
func (manager *SegmentManager) isMatch(segmentID UniqueID, partitionTag string, channelID int32) (bool, error) {
func (manager *SegmentManagerImpl) isMatch(segmentID UniqueID, partitionTag string, channelID int32) (bool, error) {
segMeta, err := manager.metaTable.GetSegmentByID(segmentID)
if err != nil {
return false, err
@@ -193,7 +202,7 @@ func (manager *SegmentManager) isMatch(segmentID UniqueID, partitionTag string,
return true, nil
}
func (manager *SegmentManager) estimateTotalRows(collName string) (int, error) {
func (manager *SegmentManagerImpl) estimateTotalRows(collName string) (int, error) {
collMeta, err := manager.metaTable.GetCollectionByName(collName)
if err != nil {
return -1, err
@@ -205,7 +214,7 @@ func (manager *SegmentManager) estimateTotalRows(collName string) (int, error) {
return int(manager.segmentThreshold / float64(sizePerRecord)), nil
}
func (manager *SegmentManager) openNewSegment(channelID int32, collID UniqueID, partitionTag string, numRows int) (UniqueID, error) {
func (manager *SegmentManagerImpl) openNewSegment(channelID int32, collID UniqueID, partitionTag string, numRows int) (UniqueID, error) {
// find the channel range
channelStart, channelEnd := int32(-1), int32(-1)
for _, r := range manager.channelRanges {
@@ -259,17 +268,17 @@ func (manager *SegmentManager) openNewSegment(channelID int32, collID UniqueID,
return newID, nil
}
func (manager *SegmentManager) Start() {
func (manager *SegmentManagerImpl) Start() {
manager.waitGroup.Add(1)
go manager.startWriteNodeTimeSync()
}
func (manager *SegmentManager) Close() {
func (manager *SegmentManagerImpl) Close() {
manager.cancel()
manager.waitGroup.Wait()
}
func (manager *SegmentManager) startWriteNodeTimeSync() {
func (manager *SegmentManagerImpl) startWriteNodeTimeSync() {
defer manager.waitGroup.Done()
for {
select {
@@ -284,7 +293,7 @@ func (manager *SegmentManager) startWriteNodeTimeSync() {
}
}
func (manager *SegmentManager) syncWriteNodeTimestamp(timeTick Timestamp) error {
func (manager *SegmentManagerImpl) syncWriteNodeTimestamp(timeTick Timestamp) error {
manager.mu.Lock()
defer manager.mu.Unlock()
for _, status := range manager.collStatus {
@@ -292,7 +301,8 @@ func (manager *SegmentManager) syncWriteNodeTimestamp(timeTick Timestamp) error
if !segStatus.closable {
closable, err := manager.judgeSegmentClosable(segStatus)
if err != nil {
return err
log.Printf("check segment closable error: %s", err.Error())
continue
}
segStatus.closable = closable
if !segStatus.closable {
@@ -310,16 +320,20 @@ func (manager *SegmentManager) syncWriteNodeTimestamp(timeTick Timestamp) error
status.segments = append(status.segments[:i], status.segments[i+1:]...)
ts, err := manager.globalTSOAllocator()
if err != nil {
return err
log.Printf("allocate tso error: %s", err.Error())
continue
}
if err = manager.metaTable.CloseSegment(segStatus.segmentID, ts); err != nil {
return err
log.Printf("meta table close segment error: %s", err.Error())
continue
}
if err = manager.assigner.CloseSegment(segStatus.segmentID); err != nil {
return err
log.Printf("assigner close segment error: %s", err.Error())
continue
}
if err = manager.flushScheduler.Enqueue(segStatus.segmentID); err != nil {
return err
log.Printf("flush scheduler enqueue error: %s", err.Error())
continue
}
}
}
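The hunk above switches per-segment failures from hard returns to log-and-continue, so one broken or already-dropped segment no longer aborts the whole time-tick sweep. A standalone sketch of that pattern (not Milvus code; names invented for illustration):

package main

import (
	"errors"
	"fmt"
	"log"
)

// closeAll mirrors the log-and-continue handling adopted above: a failing
// segment is logged and skipped, and the remaining segments are still processed.
func closeAll(segmentIDs []int64, closeFn func(int64) error) {
	for _, id := range segmentIDs {
		if err := closeFn(id); err != nil {
			log.Printf("close segment %d error: %s", id, err.Error())
			continue
		}
		fmt.Printf("segment %d closed\n", id)
	}
}

func main() {
	closeAll([]int64{1, 2, 3}, func(id int64) error {
		if id == 2 {
			return errors.New("collection not found in meta table")
		}
		return nil
	})
}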
@@ -327,7 +341,7 @@ func (manager *SegmentManager) syncWriteNodeTimestamp(timeTick Timestamp) error
return nil
}
func (manager *SegmentManager) judgeSegmentClosable(status *segmentStatus) (bool, error) {
func (manager *SegmentManagerImpl) judgeSegmentClosable(status *segmentStatus) (bool, error) {
segMeta, err := manager.metaTable.GetSegmentByID(status.segmentID)
if err != nil {
return false, err
@@ -339,7 +353,7 @@ func (manager *SegmentManager) judgeSegmentClosable(status *segmentStatus) (bool
return false, nil
}
func (manager *SegmentManager) initChannelRanges() error {
func (manager *SegmentManagerImpl) initChannelRanges() error {
div, rem := manager.numOfChannels/manager.numOfQueryNodes, manager.numOfChannels%manager.numOfQueryNodes
for i, j := 0, 0; i < manager.numOfChannels; j++ {
if j < rem {
@@ -360,7 +374,9 @@ func (manager *SegmentManager) initChannelRanges() error {
}
// ForceClose marks the segments of the collection identified by collID as closable; each segment is closed once its assignments have expired
func (manager *SegmentManager) ForceClose(collID UniqueID) error {
func (manager *SegmentManagerImpl) ForceClose(collID UniqueID) error {
manager.mu.Lock()
defer manager.mu.Unlock()
status, ok := manager.collStatus[collID]
if !ok {
return nil
@@ -372,16 +388,35 @@ func (manager *SegmentManager) ForceClose(collID UniqueID) error {
return nil
}
func (manager *SegmentManagerImpl) DropCollection(collID UniqueID) error {
manager.mu.Lock()
defer manager.mu.Unlock()
status, ok := manager.collStatus[collID]
if !ok {
return nil
}
for _, segStatus := range status.segments {
if err := manager.assigner.CloseSegment(segStatus.segmentID); err != nil {
return err
}
}
delete(manager.collStatus, collID)
return nil
}
func NewSegmentManager(ctx context.Context,
meta *metaTable,
globalIDAllocator func() (UniqueID, error),
globalTSOAllocator func() (Timestamp, error),
syncWriteNodeChan chan *ms.TimeTickMsg,
scheduler persistenceScheduler,
assigner *SegmentAssigner) (*SegmentManager, error) {
assigner *SegmentAssigner) (*SegmentManagerImpl, error) {
assignerCtx, cancel := context.WithCancel(ctx)
segAssigner := &SegmentManager{
segManager := &SegmentManagerImpl{
metaTable: meta,
channelRanges: make([]*channelRange, 0),
collStatus: make(map[UniqueID]*collectionStatus),
@@ -401,9 +436,35 @@ func NewSegmentManager(ctx context.Context,
cancel: cancel,
}
if err := segAssigner.initChannelRanges(); err != nil {
if err := segManager.initChannelRanges(); err != nil {
return nil, err
}
return segAssigner, nil
return segManager, nil
}
type mockSegmentManager struct {
}
func (manager *mockSegmentManager) Start() {
}
func (manager *mockSegmentManager) Close() {
}
func (manager *mockSegmentManager) AssignSegment(segIDReq []*internalpb.SegIDRequest) ([]*internalpb.SegIDAssignment, error) {
return nil, nil
}
func (manager *mockSegmentManager) ForceClose(collID UniqueID) error {
return nil
}
func (manager *mockSegmentManager) DropCollection(collID UniqueID) error {
return nil
}
// NewMockSegmentManager returns a no-op SegmentManager; it is only used in unit tests
func NewMockSegmentManager() SegmentManager {
return &mockSegmentManager{}
}
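Since SegmentManager is now an interface with two implementations, a compile-time assertion (a common Go convention, not part of this commit) can guard that both keep satisfying it:

// Hedged addition, not in this diff: these blank-identifier declarations fail
// to compile if either type stops implementing SegmentManager.
var _ SegmentManager = (*SegmentManagerImpl)(nil)
var _ SegmentManager = (*mockSegmentManager)(nil)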

View File

@@ -46,8 +46,9 @@ func TestMaster_DropCollectionTask(t *testing.T) {
CollectionName: nil,
}
var collectionTask task = &dropCollectionTask{
req: &req,
baseTask: baseTask{},
req: &req,
baseTask: baseTask{},
segManager: NewMockSegmentManager(),
}
assert.Equal(t, internalpb.MsgType_kDropPartition, collectionTask.Type())
ts, err := collectionTask.Ts()
@@ -55,8 +56,9 @@ func TestMaster_DropCollectionTask(t *testing.T) {
assert.Nil(t, err)
collectionTask = &dropCollectionTask{
req: nil,
baseTask: baseTask{},
req: nil,
baseTask: baseTask{},
segManager: NewMockSegmentManager(),
}
assert.Equal(t, internalpb.MsgType_kNone, collectionTask.Type())