Add segment drop logic (#11621)

issue: #11558
Signed-off-by: sunby <bingyi.sun@zilliz.com>

Co-authored-by: sunby <bingyi.sun@zilliz.com>
pull/11695/head
Bingyi Sun 2021-11-12 00:22:42 +08:00 committed by GitHub
parent 002b854590
commit 54b40da4c5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 333 additions and 81 deletions

View File

@ -317,3 +317,33 @@ func (c *ChannelManager) FindWatcher(channel string) (int64, error) {
}
return 0, errChannelNotWatched
}
// RemoveChannel removes the channel from channel manager
func (c *ChannelManager) RemoveChannel(channelName string) error {
c.mu.Lock()
defer c.mu.Unlock()
nodeID, ch := c.findChannel(channelName)
if ch == nil {
return nil
}
var op ChannelOpSet
op.Delete(nodeID, []*channel{ch})
if err := c.store.Update(op); err != nil {
return err
}
return nil
}
func (c *ChannelManager) findChannel(channelName string) (int64, *channel) {
infos := c.store.GetNodesChannels()
for _, info := range infos {
for _, channelInfo := range info.Channels {
if channelInfo.Name == channelName {
return info.NodeID, channelInfo
}
}
}
return 0, nil
}

View File

@ -45,3 +45,50 @@ func TestReload(t *testing.T) {
assert.True(t, cm2.Match(3, "channel2"))
})
}
func TestChannelManager_RemoveChannel(t *testing.T) {
type fields struct {
store RWChannelStore
}
type args struct {
channelName string
}
tests := []struct {
name string
fields fields
args args
wantErr bool
}{
{
"test remove existed channel",
fields{
store: &ChannelStore{
store: memkv.NewMemoryKV(),
channelsInfo: map[int64]*NodeChannelInfo{
1: {
NodeID: 1,
Channels: []*channel{
{"ch1", 1},
},
},
},
},
},
args{
"ch1",
},
false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &ChannelManager{
store: tt.fields.store,
}
err := c.RemoveChannel(tt.args.channelName)
assert.Equal(t, tt.wantErr, err != nil)
_, ch := c.findChannel(tt.args.channelName)
assert.Nil(t, ch)
})
}
}

View File

@ -294,6 +294,7 @@ func (t *compactionTrigger) globalMergeCompaction(signal *compactionSignal, isFo
m := t.meta.GetSegmentsChanPart(func(segment *SegmentInfo) bool {
_, has := colls[segment.GetCollectionID()]
return (has || len(collections) == 0) && // if filters collection
isSegmentHealthy(segment) &&
segment.State == commonpb.SegmentState_Flushed && // flushed only
!segment.isCompacting // not compacting now
}) // m is list of chanPartSegments, which is channel-partition organized segments

View File

@ -73,9 +73,6 @@ func (m *meta) reloadFromKV() error {
if err != nil {
return fmt.Errorf("DataCoord reloadFromKV UnMarshal datapb.SegmentInfo err:%w", err)
}
if segmentInfo.State == commonpb.SegmentState_NotExist {
continue
}
m.segments.SetSegment(segmentInfo.GetID(), NewSegmentInfo(segmentInfo))
}
@ -145,7 +142,7 @@ func (m *meta) GetNumRowsOfCollection(collectionID UniqueID) int64 {
var ret int64 = 0
segments := m.segments.GetSegments()
for _, segment := range segments {
if segment.GetCollectionID() == collectionID {
if isSegmentHealthy(segment) && segment.GetCollectionID() == collectionID {
ret += segment.GetNumOfRows()
}
}
@ -163,6 +160,7 @@ func (m *meta) AddSegment(segment *SegmentInfo) error {
return nil
}
// Deprecated
// DropSegment remove segment with provided id, etcd persistence also removed
func (m *meta) DropSegment(segmentID UniqueID) error {
m.Lock()
@ -183,7 +181,11 @@ func (m *meta) DropSegment(segmentID UniqueID) error {
func (m *meta) GetSegment(segID UniqueID) *SegmentInfo {
m.RLock()
defer m.RUnlock()
return m.segments.GetSegment(segID)
segment := m.segments.GetSegment(segID)
if segment != nil && isSegmentHealthy(segment) {
return segment
}
return nil
}
// SetState setting segment with provided ID state
@ -191,7 +193,7 @@ func (m *meta) SetState(segmentID UniqueID, state commonpb.SegmentState) error {
m.Lock()
defer m.Unlock()
m.segments.SetState(segmentID, state)
if segInfo := m.segments.GetSegment(segmentID); segInfo != nil {
if segInfo := m.segments.GetSegment(segmentID); segInfo != nil && isSegmentHealthy(segInfo) {
return m.saveSegmentInfo(segInfo)
}
return nil
@ -200,14 +202,20 @@ func (m *meta) SetState(segmentID UniqueID, state commonpb.SegmentState) error {
// UpdateFlushSegmentsInfo update segment partial/completed flush info
// `flushed` parameter indicating whether segment is flushed completely or partially
// `binlogs`, `checkpoints` and `statPositions` are persistence data for segment
func (m *meta) UpdateFlushSegmentsInfo(segmentID UniqueID, flushed bool,
binlogs, statslogs []*datapb.FieldBinlog, deltalogs []*datapb.DeltaLogInfo, checkpoints []*datapb.CheckPoint,
startPositions []*datapb.SegmentStartPosition) error {
func (m *meta) UpdateFlushSegmentsInfo(
segmentID UniqueID,
flushed bool,
dropped bool,
binlogs, statslogs []*datapb.FieldBinlog,
deltalogs []*datapb.DeltaLogInfo,
checkpoints []*datapb.CheckPoint,
startPositions []*datapb.SegmentStartPosition,
) error {
m.Lock()
defer m.Unlock()
segment := m.segments.GetSegment(segmentID)
if segment == nil {
if segment == nil || !isSegmentHealthy(segment) {
return nil
}
@ -221,6 +229,11 @@ func (m *meta) UpdateFlushSegmentsInfo(segmentID UniqueID, flushed bool,
modSegments[segmentID] = clonedSegment
}
if dropped {
clonedSegment.State = commonpb.SegmentState_Dropped
modSegments[segmentID] = clonedSegment
}
currBinlogs := clonedSegment.GetBinlogs()
var getFieldBinlogs = func(id UniqueID, binlogs []*datapb.FieldBinlog) *datapb.FieldBinlog {
@ -261,7 +274,7 @@ func (m *meta) UpdateFlushSegmentsInfo(segmentID UniqueID, flushed bool,
if s, ok := modSegments[segmentID]; ok {
return s
}
if s := m.segments.GetSegment(segmentID); s != nil {
if s := m.segments.GetSegment(segmentID); s != nil && isSegmentHealthy(s) {
return s.Clone()
}
return nil
@ -320,20 +333,6 @@ func (m *meta) UpdateFlushSegmentsInfo(segmentID UniqueID, flushed bool,
return nil
}
// ListSegmentIDs list all segment ids stored in meta (no collection filter)
func (m *meta) ListSegmentIDs() []UniqueID {
m.RLock()
defer m.RUnlock()
infos := make([]UniqueID, 0)
segments := m.segments.GetSegments()
for _, segment := range segments {
infos = append(infos, segment.GetID())
}
return infos
}
// ListSegmentFiles lists all segment related file paths in valid & dropped list
func (m *meta) ListSegmentFiles() ([]string, []string) {
m.RLock()
@ -378,7 +377,7 @@ func (m *meta) GetSegmentsByChannel(dmlCh string) []*SegmentInfo {
infos := make([]*SegmentInfo, 0)
segments := m.segments.GetSegments()
for _, segment := range segments {
if segment.InsertChannel != dmlCh {
if !isSegmentHealthy(segment) || segment.InsertChannel != dmlCh {
continue
}
infos = append(infos, segment)
@ -394,7 +393,7 @@ func (m *meta) GetSegmentsOfCollection(collectionID UniqueID) []*SegmentInfo {
ret := make([]*SegmentInfo, 0)
segments := m.segments.GetSegments()
for _, segment := range segments {
if segment.GetCollectionID() == collectionID {
if isSegmentHealthy(segment) && segment.GetCollectionID() == collectionID {
ret = append(ret, segment)
}
}
@ -407,9 +406,9 @@ func (m *meta) GetSegmentsIDOfCollection(collectionID UniqueID) []UniqueID {
defer m.RUnlock()
ret := make([]UniqueID, 0)
segments := m.segments.GetSegments()
for _, info := range segments {
if info.CollectionID == collectionID {
ret = append(ret, info.ID)
for _, segment := range segments {
if isSegmentHealthy(segment) && segment.CollectionID == collectionID {
ret = append(ret, segment.ID)
}
}
return ret
@ -421,9 +420,9 @@ func (m *meta) GetSegmentsIDOfPartition(collectionID, partitionID UniqueID) []Un
defer m.RUnlock()
ret := make([]UniqueID, 0)
segments := m.segments.GetSegments()
for _, info := range segments {
if info.CollectionID == collectionID && info.PartitionID == partitionID {
ret = append(ret, info.ID)
for _, segment := range segments {
if isSegmentHealthy(segment) && segment.CollectionID == collectionID && segment.PartitionID == partitionID {
ret = append(ret, segment.ID)
}
}
return ret
@ -435,9 +434,9 @@ func (m *meta) GetNumRowsOfPartition(collectionID UniqueID, partitionID UniqueID
defer m.RUnlock()
var ret int64 = 0
segments := m.segments.GetSegments()
for _, info := range segments {
if info.CollectionID == collectionID && info.PartitionID == partitionID {
ret += info.NumOfRows
for _, segment := range segments {
if isSegmentHealthy(segment) && segment.CollectionID == collectionID && segment.PartitionID == partitionID {
ret += segment.NumOfRows
}
}
return ret
@ -449,9 +448,9 @@ func (m *meta) GetUnFlushedSegments() []*SegmentInfo {
defer m.RUnlock()
ret := make([]*SegmentInfo, 0)
segments := m.segments.GetSegments()
for _, info := range segments {
if info.State != commonpb.SegmentState_Flushing && info.State != commonpb.SegmentState_Flushed {
ret = append(ret, info)
for _, segment := range segments {
if segment.State == commonpb.SegmentState_Growing || segment.State == commonpb.SegmentState_Sealed {
ret = append(ret, segment)
}
}
return ret
@ -536,7 +535,7 @@ func (m *meta) CompleteMergeCompaction(compactionLogs []*datapb.CompactionSegmen
for _, cl := range compactionLogs {
if segment := m.segments.GetSegment(cl.GetSegmentID()); segment != nil {
cloned := segment.Clone()
cloned.State = commonpb.SegmentState_NotExist
cloned.State = commonpb.SegmentState_Dropped
segments = append(segments, cloned)
}
}
@ -549,12 +548,12 @@ func (m *meta) CompleteMergeCompaction(compactionLogs []*datapb.CompactionSegmen
}
// find new added delta logs when executing compaction
originDeltalogs := make([]*datapb.DeltaLogInfo, 0)
var originDeltalogs []*datapb.DeltaLogInfo
for _, s := range segments {
originDeltalogs = append(originDeltalogs, s.GetDeltalogs()...)
}
deletedDeltalogs := make([]*datapb.DeltaLogInfo, 0)
var deletedDeltalogs []*datapb.DeltaLogInfo
for _, l := range compactionLogs {
deletedDeltalogs = append(deletedDeltalogs, l.GetDeltalogs()...)
}
@ -775,3 +774,8 @@ func buildSegment(collectionID UniqueID, partitionID UniqueID, segmentID UniqueI
}
return NewSegmentInfo(info)
}
func isSegmentHealthy(segment *SegmentInfo) bool {
return segment.GetState() != commonpb.SegmentState_NotExist &&
segment.GetState() != commonpb.SegmentState_Dropped
}

View File

@ -241,7 +241,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) {
err = meta.AddSegment(segment1)
assert.Nil(t, err)
err = meta.UpdateFlushSegmentsInfo(1, true, []*datapb.FieldBinlog{{FieldID: 1, Binlogs: []string{"binlog1"}}},
err = meta.UpdateFlushSegmentsInfo(1, true, false, []*datapb.FieldBinlog{{FieldID: 1, Binlogs: []string{"binlog1"}}},
[]*datapb.FieldBinlog{{FieldID: 1, Binlogs: []string{"statslog1"}}},
[]*datapb.DeltaLogInfo{{RecordEntries: 1, TimestampFrom: 100, TimestampTo: 200, DeltaLogSize: 1000}},
[]*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}, []*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &internalpb.MsgPosition{MsgID: []byte{1, 2, 3}}}})
@ -262,7 +262,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) {
meta, err := newMeta(memkv.NewMemoryKV())
assert.Nil(t, err)
err = meta.UpdateFlushSegmentsInfo(1, false, nil, nil, nil, nil, nil)
err = meta.UpdateFlushSegmentsInfo(1, false, false, nil, nil, nil, nil, nil)
assert.Nil(t, err)
})
@ -274,7 +274,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) {
err = meta.AddSegment(segment1)
assert.Nil(t, err)
err = meta.UpdateFlushSegmentsInfo(1, false, nil, nil, nil, []*datapb.CheckPoint{{SegmentID: 2, NumOfRows: 10}},
err = meta.UpdateFlushSegmentsInfo(1, false, false, nil, nil, nil, []*datapb.CheckPoint{{SegmentID: 2, NumOfRows: 10}},
[]*datapb.SegmentStartPosition{{SegmentID: 2, StartPosition: &internalpb.MsgPosition{MsgID: []byte{1, 2, 3}}}})
assert.Nil(t, err)
@ -296,7 +296,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) {
}
meta.segments.SetSegment(1, segmentInfo)
err = meta.UpdateFlushSegmentsInfo(1, true, []*datapb.FieldBinlog{{FieldID: 1, Binlogs: []string{"binlog"}}},
err = meta.UpdateFlushSegmentsInfo(1, true, false, []*datapb.FieldBinlog{{FieldID: 1, Binlogs: []string{"binlog"}}},
[]*datapb.FieldBinlog{{FieldID: 1, Binlogs: []string{"statslog"}}},
[]*datapb.DeltaLogInfo{{RecordEntries: 1, TimestampFrom: 100, TimestampTo: 200, DeltaLogSize: 1000}},
[]*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}, []*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &internalpb.MsgPosition{MsgID: []byte{1, 2, 3}}}})

View File

@ -83,6 +83,8 @@ type Manager interface {
GetFlushableSegments(ctx context.Context, channel string, ts Timestamp) ([]UniqueID, error)
// ExpireAllocations notifies segment status to expire old allocations
ExpireAllocations(channel string, ts Timestamp) error
// DropSegmentsOfChannel drops all segments in a channel
DropSegmentsOfChannel(ctx context.Context, channel string)
}
// Allocation records the allocation info
@ -482,3 +484,23 @@ func (s *SegmentManager) tryToSealSegment(ts Timestamp, channel string) error {
}
return nil
}
// DropSegmentsOfChannel drops all segments in a channel
func (s *SegmentManager) DropSegmentsOfChannel(ctx context.Context, channel string) {
s.mu.Lock()
defer s.mu.Unlock()
validSegments := make([]int64, 0, len(s.segments))
for _, sid := range s.segments {
segment := s.meta.GetSegment(sid)
if segment != nil && segment.GetInsertChannel() != channel {
validSegments = append(validSegments, sid)
}
s.meta.SetAllocations(sid, nil)
for _, allocation := range segment.allocations {
putAllocation(allocation)
}
}
s.segments = validSegments
}

View File

@ -27,7 +27,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/stretchr/testify/assert"
)
@ -496,3 +495,58 @@ func TestAllocationPool(t *testing.T) {
})
}
func TestSegmentManager_DropSegmentsOfChannel(t *testing.T) {
type fields struct {
meta *meta
segments []UniqueID
}
type args struct {
channel string
}
tests := []struct {
name string
fields fields
args args
want []UniqueID
}{
{
"test drop segments",
fields{
meta: &meta{
segments: &SegmentsInfo{
segments: map[int64]*SegmentInfo{
1: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
InsertChannel: "ch1",
},
},
2: {
SegmentInfo: &datapb.SegmentInfo{
ID: 2,
InsertChannel: "ch2",
},
},
},
},
},
segments: []UniqueID{1, 2},
},
args{
"ch1",
},
[]UniqueID{2},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &SegmentManager{
meta: tt.fields.meta,
segments: tt.fields.segments,
}
s.DropSegmentsOfChannel(context.TODO(), tt.args.channel)
assert.ElementsMatch(t, tt.want, s.segments)
})
}
}

View File

@ -167,6 +167,13 @@ func SetDataNodeCreator(creator dataNodeCreatorFunc) Option {
}
}
// SetSegmentManager returns an Option to set SegmentManager
func SetSegmentManager(manager Manager) Option {
return func(svr *Server) {
svr.segmentManager = manager
}
}
// CreateServer create `Server` instance
func CreateServer(ctx context.Context, factory msgstream.Factory, opts ...Option) (*Server, error) {
rand.Seed(time.Now().UnixNano())
@ -359,7 +366,9 @@ func (s *Server) initServiceDiscovery() error {
}
func (s *Server) startSegmentManager() {
s.segmentManager = newSegmentManager(s.meta, s.allocator)
if s.segmentManager == nil {
s.segmentManager = newSegmentManager(s.meta, s.allocator)
}
}
func (s *Server) initMeta() error {
@ -499,7 +508,8 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
}
staleSegments := s.meta.SelectSegments(func(info *SegmentInfo) bool {
return info.GetInsertChannel() == ch &&
return isSegmentHealthy(info) &&
info.GetInsertChannel() == ch &&
!info.lastFlushTime.IsZero() &&
time.Since(info.lastFlushTime).Minutes() >= segmentTimedFlushDuration
})

View File

@ -734,53 +734,75 @@ func TestChannel(t *testing.T) {
})
}
type spySegmentManager struct {
spyCh chan struct{}
}
// AllocSegment allocates rows and record the allocation.
func (s *spySegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string, requestRows int64) ([]*Allocation, error) {
panic("not implemented") // TODO: Implement
}
// DropSegment drops the segment from manager.
func (s *spySegmentManager) DropSegment(ctx context.Context, segmentID UniqueID) {
panic("not implemented") // TODO: Implement
}
// SealAllSegments seals all segments of collection with collectionID and return sealed segments
func (s *spySegmentManager) SealAllSegments(ctx context.Context, collectionID UniqueID) ([]UniqueID, error) {
panic("not implemented") // TODO: Implement
}
// GetFlushableSegments returns flushable segment ids
func (s *spySegmentManager) GetFlushableSegments(ctx context.Context, channel string, ts Timestamp) ([]UniqueID, error) {
panic("not implemented") // TODO: Implement
}
// ExpireAllocations notifies segment status to expire old allocations
func (s *spySegmentManager) ExpireAllocations(channel string, ts Timestamp) error {
panic("not implemented") // TODO: Implement
}
// DropSegmentsOfChannel drops all segments in a channel
func (s *spySegmentManager) DropSegmentsOfChannel(ctx context.Context, channel string) {
s.spyCh <- struct{}{}
}
func TestSaveBinlogPaths(t *testing.T) {
t.Run("Normal SaveRequest", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
collections := []struct {
ID UniqueID
Partitions []int64
}{
{0, []int64{0, 1}},
{1, []int64{0, 1}},
}
for _, collection := range collections {
svr.meta.AddCollection(&datapb.CollectionInfo{
ID: collection.ID,
Schema: nil,
Partitions: collection.Partitions,
})
}
svr.meta.AddCollection(&datapb.CollectionInfo{ID: 0})
segments := []struct {
id UniqueID
collectionID UniqueID
partitionID UniqueID
}{
{0, 0, 0},
{1, 0, 0},
{2, 0, 1},
{3, 1, 1},
{0, 0},
{1, 0},
}
for _, segment := range segments {
s := &datapb.SegmentInfo{
ID: segment.id,
CollectionID: segment.collectionID,
PartitionID: segment.partitionID,
ID: segment.id,
CollectionID: segment.collectionID,
InsertChannel: "ch1",
}
err := svr.meta.AddSegment(NewSegmentInfo(s))
assert.Nil(t, err)
}
err := svr.channelManager.AddNode(0)
assert.Nil(t, err)
err = svr.channelManager.Watch(&channel{"ch1", 0})
assert.Nil(t, err)
ctx := context.Background()
resp, err := svr.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{
Base: &commonpb.MsgBase{
Timestamp: uint64(time.Now().Unix()),
},
SegmentID: 2,
SegmentID: 1,
CollectionID: 0,
Field2BinlogPaths: []*datapb.FieldBinlog{
{
@ -808,7 +830,7 @@ func TestSaveBinlogPaths(t *testing.T) {
assert.Nil(t, err)
assert.EqualValues(t, resp.ErrorCode, commonpb.ErrorCode_Success)
segment := svr.meta.GetSegment(2)
segment := svr.meta.GetSegment(1)
assert.NotNil(t, segment)
binlogs := segment.GetBinlogs()
assert.EqualValues(t, 1, len(binlogs))
@ -834,6 +856,34 @@ func TestSaveBinlogPaths(t *testing.T) {
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetErrorCode())
assert.Equal(t, serverNotServingErrMsg, resp.GetReason())
})
t.Run("test save dropped segment and remove channel", func(t *testing.T) {
spyCh := make(chan struct{}, 1)
svr := newTestServer(t, nil, SetSegmentManager(&spySegmentManager{spyCh: spyCh}))
defer closeTestServer(t, svr)
svr.meta.AddCollection(&datapb.CollectionInfo{ID: 1})
err := svr.meta.AddSegment(&SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 1,
InsertChannel: "ch1",
},
})
assert.Nil(t, err)
err = svr.channelManager.AddNode(0)
assert.Nil(t, err)
err = svr.channelManager.Watch(&channel{"ch1", 1})
assert.Nil(t, err)
_, err = svr.SaveBinlogPaths(context.TODO(), &datapb.SaveBinlogPathsRequest{
SegmentID: 1,
Dropped: true,
})
assert.Nil(t, err)
<-spyCh
})
}
func TestDataNodeTtChannel(t *testing.T) {
@ -1276,6 +1326,12 @@ func TestGetRecoveryInfo(t *testing.T) {
segment := createSegment(0, 0, 0, 100, 10, "ch1", commonpb.SegmentState_Flushed)
err := svr.meta.AddSegment(NewSegmentInfo(segment))
assert.Nil(t, err)
err = svr.channelManager.AddNode(0)
assert.Nil(t, err)
err = svr.channelManager.Watch(&channel{"ch1", 0})
assert.Nil(t, err)
sResp, err := svr.SaveBinlogPaths(context.TODO(), binlogReq)
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, sResp.ErrorCode)

View File

@ -289,9 +289,7 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR
// SaveBinlogPaths update segment related binlog path
// works for Checkpoints and Flush
func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) {
resp := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
}
resp := &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}
if s.isClosed() {
resp.Reason = serverNotServingErrMsg
@ -319,12 +317,19 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
if !s.channelManager.Match(nodeID, channel) {
FailResponse(resp, fmt.Sprintf("channel %s is not watched on node %d", channel, nodeID))
log.Warn("node is not matched with channel", zap.String("channel", channel), zap.Int64("nodeID", nodeID))
return resp, nil
}
// set segment to SegmentState_Flushing and save binlogs and checkpoints
err := s.meta.UpdateFlushSegmentsInfo(req.GetSegmentID(), req.GetFlushed(),
req.GetField2BinlogPaths(), req.GetField2StatslogPaths(), req.GetDeltalogs(),
req.GetCheckPoints(), req.GetStartPositions())
err := s.meta.UpdateFlushSegmentsInfo(
req.GetSegmentID(),
req.GetFlushed(),
req.GetDropped(),
req.GetField2BinlogPaths(),
req.GetField2StatslogPaths(),
req.GetDeltalogs(),
req.GetCheckPoints(),
req.GetStartPositions())
if err != nil {
log.Error("save binlog and checkpoints failed",
zap.Int64("segmentID", req.GetSegmentID()),
@ -336,7 +341,15 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
log.Debug("flush segment with meta", zap.Int64("id", req.SegmentID),
zap.Any("meta", req.GetField2BinlogPaths()))
if req.Flushed {
if req.GetDropped() && s.checkShouldDropChannel(channel) {
err = s.channelManager.RemoveChannel(channel)
if err != nil {
log.Warn("failed to remove channel", zap.String("channel", channel), zap.Error(err))
}
s.segmentManager.DropSegmentsOfChannel(ctx, channel)
}
if req.GetFlushed() {
s.segmentManager.DropSegment(ctx, req.SegmentID)
s.flushCh <- req.SegmentID
@ -357,6 +370,21 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
return resp, nil
}
func (s *Server) checkShouldDropChannel(channel string) bool {
segments := s.meta.GetSegmentsByChannel(channel)
for _, segment := range segments {
if segment.GetStartPosition() != nil && // fitler empty segment
// FIXME: we filter compaction generated segments
// because datanode may not know the segment due to the network lag or
// datacoord crash when handling CompleteCompaction.
len(segment.CompactionFrom) != 0 &&
segment.GetState() != commonpb.SegmentState_Dropped {
return false
}
}
return true
}
// GetComponentStates returns DataCoord's current state
func (s *Server) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
resp := &internalpb.ComponentStates{