mirror of https://github.com/milvus-io/milvus.git
This PR primarily picks up the SyncSegments functionality, including the following commits:

- main functionality: https://github.com/milvus-io/milvus/pull/33420
- related fixes:
  - https://github.com/milvus-io/milvus/pull/33664
  - https://github.com/milvus-io/milvus/pull/33829
  - https://github.com/milvus-io/milvus/pull/34056
  - https://github.com/milvus-io/milvus/pull/34156

issue: #32809
master pr: #33420, #33664, #33829, #34056, #34156

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
parent 42096a2337
commit 1c6e850f73
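For orientation, here is the reconciliation contract these commits establish, condensed from the diffs below into a self-contained sketch (the names in this sketch are illustrative, not code from the commit): DataCoord's new SyncSegmentsScheduler periodically pushes the full set of flushed segments for each (collection, partition, channel) to the DataNode watching that channel, and the DataNode adds the segments it is missing and drops flushed segments that no longer appear in that view (see DetectMissingSegments and UpdateSegmentView further down).

package main

import "fmt"

// reconcile returns the IDs the node must add (present in the coordinator
// view but absent locally) and the IDs it must drop (present locally but
// absent from the coordinator view). The real UpdateSegmentView only drops
// flushed/flushing segments of the given partition; that filter is omitted
// here for brevity.
func reconcile(coordView, localFlushed map[int64]struct{}) (toAdd, toDrop []int64) {
    for id := range coordView {
        if _, ok := localFlushed[id]; !ok {
            toAdd = append(toAdd, id)
        }
    }
    for id := range localFlushed {
        if _, ok := coordView[id]; !ok {
            toDrop = append(toDrop, id)
        }
    }
    return toAdd, toDrop
}

func main() {
    coordView := map[int64]struct{}{103: {}, 104: {}}
    localFlushed := map[int64]struct{}{101: {}, 102: {}, 103: {}}
    toAdd, toDrop := reconcile(coordView, localFlushed)
    fmt.Println(toAdd, toDrop) // [104] [101 102] (map iteration order may vary)
}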
@@ -489,6 +489,7 @@ dataCoord:
  serverMaxRecvSize: 268435456
  clientMaxSendSize: 268435456
  clientMaxRecvSize: 536870912
  syncSegmentsInterval: 300

dataNode:
  dataSync:
@@ -487,18 +487,7 @@ func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.Compact
        // Apply metrics after successful meta update.
        metricMutation.commit()
    }

    nodeID := c.plans[plan.GetPlanID()].dataNodeID
    req := &datapb.SyncSegmentsRequest{
        PlanID: plan.PlanID,
    }

    log.Info("handleCompactionResult: syncing segments with node", zap.Int64("nodeID", nodeID))
    if err := c.sessions.SyncSegments(nodeID, req); err != nil {
        log.Warn("handleCompactionResult: fail to sync segments with node",
            zap.Int64("nodeID", nodeID), zap.Error(err))
        return err
    }
    // TODO @xiaocai2333: drop compaction plan on datanode

    log.Info("handleCompactionResult: success to handle merge compaction result")
    return nil

@@ -546,13 +535,8 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
        // task.dataNodeID not match with channel
        // Mark this compaction as failure and skip processing the meta
        if !c.chManager.Match(task.dataNodeID, task.plan.GetChannel()) {
            // Sync segments without CompactionFrom segmentsIDs to make sure DN clear the task
            // without changing the meta
            // TODO @xiaocai2333: drop compaction plan on datanode
            log.Warn("compaction failed for channel nodeID not match")
            if err := c.sessions.SyncSegments(task.dataNodeID, &datapb.SyncSegmentsRequest{PlanID: planID}); err != nil {
                log.Warn("compaction failed to sync segments with node", zap.Error(err))
                continue
            }
            c.plans[planID] = c.plans[planID].shadowClone(setState(failed), endSpan())
            c.setSegmentsCompacting(task.plan, false)
            c.scheduler.Finish(task.dataNodeID, task.plan)

@@ -616,16 +600,8 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
        if nodeUnkonwnPlan, ok := completedPlans[planID]; ok {
            nodeID, plan := nodeUnkonwnPlan.A, nodeUnkonwnPlan.B
            log := log.With(zap.Int64("planID", planID), zap.Int64("nodeID", nodeID), zap.String("channel", plan.GetChannel()))

            // Sync segments without CompactionFrom segmentsIDs to make sure DN clear the task
            // without changing the meta
            log.Info("compaction syncing unknown plan with node")
            if err := c.sessions.SyncSegments(nodeID, &datapb.SyncSegmentsRequest{
                PlanID: planID,
            }); err != nil {
                log.Warn("compaction failed to sync segments with node", zap.Error(err))
                return err
            }
            // TODO @xiaocai2333: drop compaction plan on datanode
            log.Info("drop unknown plan with node")
        }
    }
|
@ -85,7 +85,6 @@ func (s *CompactionPlanHandlerSuite) TestCheckResult() {
|
|||
4: {A: 100, B: &datapb.CompactionPlanResult{PlanID: 4, State: commonpb.CompactionState_Executing}},
|
||||
}, nil)
|
||||
|
||||
s.mockSessMgr.EXPECT().SyncSegments(int64(100), mock.Anything).Return(nil).Once()
|
||||
{
|
||||
s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(0, errors.New("mock")).Once()
|
||||
handler := newCompactionPlanHandler(nil, s.mockSessMgr, nil, nil, s.mockAlloc)
|
||||
|
@ -476,7 +475,6 @@ func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() {
|
|||
}
|
||||
return nil
|
||||
}).Once()
|
||||
s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once()
|
||||
|
||||
handler := newCompactionPlanHandler(nil, s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc)
|
||||
handler.plans[plan.PlanID] = &compactionTask{dataNodeID: 111, plan: plan}
|
||||
|
@ -518,7 +516,6 @@ func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() {
|
|||
s.mockMeta.EXPECT().CompleteCompactionMutation(mock.Anything, mock.Anything).Return(
|
||||
[]*SegmentInfo{segment},
|
||||
&segMetricMutation{}, nil).Once()
|
||||
s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(errors.New("mock error")).Once()
|
||||
|
||||
handler := newCompactionPlanHandler(nil, s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc)
|
||||
handler.plans[plan.PlanID] = &compactionTask{dataNodeID: 111, plan: plan}
|
||||
|
@ -530,7 +527,7 @@ func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() {
|
|||
}
|
||||
|
||||
err := handler.handleMergeCompactionResult(plan, compactionResult)
|
||||
s.Error(err)
|
||||
s.NoError(err)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -550,7 +547,6 @@ func (s *CompactionPlanHandlerSuite) TestCompleteCompaction() {
|
|||
})
|
||||
|
||||
s.Run("test complete merge compaction task", func() {
|
||||
s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once()
|
||||
// mock for handleMergeCompactionResult
|
||||
s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).Return(nil).Once()
|
||||
segment := NewSegmentInfo(&datapb.SegmentInfo{ID: 100})
|
||||
|
@ -703,14 +699,6 @@ func (s *CompactionPlanHandlerSuite) TestUpdateCompaction() {
|
|||
},
|
||||
}
|
||||
|
||||
s.mockSessMgr.EXPECT().SyncSegments(int64(222), mock.Anything).RunAndReturn(func(nodeID int64, req *datapb.SyncSegmentsRequest) error {
|
||||
s.EqualValues(nodeID, 222)
|
||||
s.NotNil(req)
|
||||
s.Empty(req.GetCompactedFrom())
|
||||
s.EqualValues(5, req.GetPlanID())
|
||||
return nil
|
||||
}).Once()
|
||||
s.mockSessMgr.EXPECT().SyncSegments(int64(111), mock.Anything).Return(nil)
|
||||
s.mockCm.EXPECT().Match(int64(111), "ch-1").Return(true)
|
||||
s.mockCm.EXPECT().Match(int64(111), "ch-2").Return(false).Once()
|
||||
|
||||
|
|
|
@@ -1572,3 +1572,10 @@ func updateSegStateAndPrepareMetrics(segToUpdate *SegmentInfo, targetState commo
        segToUpdate.DroppedAt = uint64(time.Now().UnixNano())
    }
}

func (m *meta) ListCollections() []int64 {
    m.RLock()
    defer m.RUnlock()

    return lo.Keys(m.collections)
}
@@ -127,6 +127,7 @@ type Server struct {
    compactionTrigger     trigger
    compactionHandler     compactionPlanContext
    compactionViewManager *CompactionViewManager
    syncSegmentsScheduler *SyncSegmentsScheduler

    metricsCacheManager *metricsinfo.MetricsCacheManager

@@ -393,6 +394,8 @@ func (s *Server) initDataCoord() error {
    s.importScheduler = NewImportScheduler(s.meta, s.cluster, s.allocator, s.importMeta, s.buildIndexCh)
    s.importChecker = NewImportChecker(s.meta, s.broker, s.cluster, s.allocator, s.segmentManager, s.importMeta)

    s.syncSegmentsScheduler = newSyncSegmentsScheduler(s.meta, s.channelManager, s.sessionManager)

    s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)

    log.Info("init datacoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", s.address))

@@ -712,6 +715,7 @@ func (s *Server) startServerLoop() {
    go s.importScheduler.Start()
    go s.importChecker.Start()
    s.garbageCollector.start()
    s.syncSegmentsScheduler.Start()
}

// startDataNodeTtLoop start a goroutine to recv data node tt msg from msgstream

@@ -1102,6 +1106,7 @@ func (s *Server) Stop() error {

    s.importScheduler.Close()
    s.importChecker.Close()
    s.syncSegmentsScheduler.Stop()

    if Params.DataCoordCfg.EnableCompaction.GetAsBool() {
        s.stopCompactionTrigger()
@@ -0,0 +1,150 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datacoord

import (
    "sync"
    "time"

    "github.com/samber/lo"
    "go.uber.org/zap"

    "github.com/milvus-io/milvus/internal/proto/datapb"
    "github.com/milvus-io/milvus/pkg/log"
    "github.com/milvus-io/milvus/pkg/util/logutil"
    "github.com/milvus-io/milvus/pkg/util/typeutil"
)

type SyncSegmentsScheduler struct {
    quit chan struct{}
    wg   sync.WaitGroup

    meta           *meta
    channelManager ChannelManager
    sessions       SessionManager
}

func newSyncSegmentsScheduler(m *meta, channelManager ChannelManager, sessions SessionManager) *SyncSegmentsScheduler {
    return &SyncSegmentsScheduler{
        quit:           make(chan struct{}),
        wg:             sync.WaitGroup{},
        meta:           m,
        channelManager: channelManager,
        sessions:       sessions,
    }
}

func (sss *SyncSegmentsScheduler) Start() {
    sss.quit = make(chan struct{})
    sss.wg.Add(1)

    go func() {
        defer logutil.LogPanic()
        ticker := time.NewTicker(Params.DataCoordCfg.SyncSegmentsInterval.GetAsDuration(time.Second))
        defer sss.wg.Done()

        for {
            select {
            case <-sss.quit:
                log.Info("sync segments scheduler quit")
                ticker.Stop()
                return
            case <-ticker.C:
                sss.SyncSegmentsForCollections()
            }
        }
    }()
    log.Info("SyncSegmentsScheduler started...")
}

func (sss *SyncSegmentsScheduler) Stop() {
    close(sss.quit)
    sss.wg.Wait()
}

func (sss *SyncSegmentsScheduler) SyncSegmentsForCollections() {
    collIDs := sss.meta.ListCollections()
    for _, collID := range collIDs {
        collInfo := sss.meta.GetCollection(collID)
        if collInfo == nil {
            log.Warn("collection info is nil, skip it", zap.Int64("collectionID", collID))
            continue
        }
        pkField, err := typeutil.GetPrimaryFieldSchema(collInfo.Schema)
        if err != nil {
            log.Warn("get primary field from schema failed", zap.Int64("collectionID", collID),
                zap.Error(err))
            continue
        }
        for _, channelName := range collInfo.VChannelNames {
            nodeID, err := sss.channelManager.FindWatcher(channelName)
            if err != nil {
                log.Warn("find watcher for channel failed", zap.Int64("collectionID", collID),
                    zap.String("channelName", channelName), zap.Error(err))
                continue
            }
            for _, partitionID := range collInfo.Partitions {
                if err := sss.SyncFlushedSegments(collID, partitionID, channelName, nodeID, pkField.GetFieldID()); err != nil {
                    log.Warn("sync segment with channel failed, retry next ticker",
                        zap.Int64("collectionID", collID),
                        zap.Int64("partitionID", partitionID),
                        zap.String("channel", channelName),
                        zap.Error(err))
                    continue
                }
            }
        }
    }
}

func (sss *SyncSegmentsScheduler) SyncFlushedSegments(collectionID, partitionID int64, channelName string, nodeID, pkFieldID int64) error {
    log := log.With(zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID),
        zap.String("channelName", channelName), zap.Int64("nodeID", nodeID))
    segments := sss.meta.SelectSegments(WithCollection(collectionID), WithChannel(channelName), SegmentFilterFunc(func(info *SegmentInfo) bool {
        return info.GetPartitionID() == partitionID && isFlush(info)
    }))
    req := &datapb.SyncSegmentsRequest{
        ChannelName:  channelName,
        PartitionId:  partitionID,
        CollectionId: collectionID,
        SegmentInfos: make(map[int64]*datapb.SyncSegmentInfo),
    }

    for _, seg := range segments {
        req.SegmentInfos[seg.ID] = &datapb.SyncSegmentInfo{
            SegmentId: seg.GetID(),
            State:     seg.GetState(),
            Level:     seg.GetLevel(),
            NumOfRows: seg.GetNumOfRows(),
        }
        for _, statsLog := range seg.GetStatslogs() {
            if statsLog.GetFieldID() == pkFieldID {
                req.SegmentInfos[seg.ID].PkStatsLog = statsLog
                break
            }
        }
    }

    if err := sss.sessions.SyncSegments(nodeID, req); err != nil {
        log.Warn("fail to sync segments with node", zap.Error(err))
        return err
    }
    log.Info("sync segments success", zap.Int64s("segments", lo.Map(segments, func(t *SegmentInfo, i int) int64 {
        return t.GetID()
    })))
    return nil
}
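The scheduler above is wired into the DataCoord server in the server.go hunks earlier: constructed in initDataCoord with the meta, channel manager and session manager, started in startServerLoop, and stopped in Server.Stop. As a side note on the design choice, the quit-channel plus WaitGroup pattern lets Stop block until the ticker goroutine has fully exited; the following is a minimal, self-contained illustration of that same idiom, not code from the commit.

package main

import (
    "fmt"
    "sync"
    "time"
)

type loop struct {
    quit chan struct{}
    wg   sync.WaitGroup
}

// start launches a ticker goroutine; it re-creates the quit channel so the
// loop can be started again after a stop, mirroring the scheduler above.
func (l *loop) start(interval time.Duration, tick func()) {
    l.quit = make(chan struct{})
    l.wg.Add(1)
    go func() {
        defer l.wg.Done()
        t := time.NewTicker(interval)
        defer t.Stop()
        for {
            select {
            case <-l.quit:
                return
            case <-t.C:
                tick()
            }
        }
    }()
}

// stop signals the goroutine and waits for it to exit.
func (l *loop) stop() {
    close(l.quit)
    l.wg.Wait()
}

func main() {
    l := &loop{}
    l.start(10*time.Millisecond, func() { fmt.Println("tick") })
    time.Sleep(35 * time.Millisecond)
    l.stop()
}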
@ -0,0 +1,369 @@
|
|||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package datacoord
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
)
|
||||
|
||||
type SyncSegmentsSchedulerSuite struct {
|
||||
suite.Suite
|
||||
|
||||
m *meta
|
||||
new atomic.Int64
|
||||
old atomic.Int64
|
||||
}
|
||||
|
||||
func Test_SyncSegmentsSchedulerSuite(t *testing.T) {
|
||||
suite.Run(t, new(SyncSegmentsSchedulerSuite))
|
||||
}
|
||||
|
||||
func (s *SyncSegmentsSchedulerSuite) initParams() {
|
||||
s.m = &meta{
|
||||
RWMutex: sync.RWMutex{},
|
||||
collections: map[UniqueID]*collectionInfo{
|
||||
1: {
|
||||
ID: 1,
|
||||
Schema: &schemapb.CollectionSchema{
|
||||
Name: "coll1",
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
FieldID: 100,
|
||||
Name: "pk",
|
||||
IsPrimaryKey: true,
|
||||
Description: "",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
},
|
||||
{
|
||||
FieldID: 101,
|
||||
Name: "vec",
|
||||
IsPrimaryKey: false,
|
||||
Description: "",
|
||||
DataType: schemapb.DataType_FloatVector,
|
||||
},
|
||||
},
|
||||
},
|
||||
Partitions: []int64{2, 3},
|
||||
VChannelNames: []string{"channel1", "channel2"},
|
||||
},
|
||||
2: nil,
|
||||
},
|
||||
segments: &SegmentsInfo{
|
||||
collSegments: map[UniqueID]*CollectionSegments{
|
||||
1: {
|
||||
segments: map[UniqueID]*SegmentInfo{
|
||||
5: {
|
||||
SegmentInfo: &datapb.SegmentInfo{
|
||||
ID: 5,
|
||||
CollectionID: 1,
|
||||
PartitionID: 2,
|
||||
InsertChannel: "channel1",
|
||||
NumOfRows: 3000,
|
||||
State: commonpb.SegmentState_Dropped,
|
||||
Statslogs: []*datapb.FieldBinlog{
|
||||
{
|
||||
FieldID: 100,
|
||||
Binlogs: []*datapb.Binlog{
|
||||
{
|
||||
LogID: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
FieldID: 101,
|
||||
Binlogs: []*datapb.Binlog{
|
||||
{
|
||||
LogID: 2,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
6: {
|
||||
SegmentInfo: &datapb.SegmentInfo{
|
||||
ID: 6,
|
||||
CollectionID: 1,
|
||||
PartitionID: 3,
|
||||
InsertChannel: "channel1",
|
||||
NumOfRows: 3000,
|
||||
State: commonpb.SegmentState_Dropped,
|
||||
Statslogs: []*datapb.FieldBinlog{
|
||||
{
|
||||
FieldID: 100,
|
||||
Binlogs: []*datapb.Binlog{
|
||||
{
|
||||
LogID: 3,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
FieldID: 101,
|
||||
Binlogs: []*datapb.Binlog{
|
||||
{
|
||||
LogID: 4,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
9: {
|
||||
SegmentInfo: &datapb.SegmentInfo{
|
||||
ID: 9,
|
||||
CollectionID: 1,
|
||||
PartitionID: 2,
|
||||
InsertChannel: "channel1",
|
||||
NumOfRows: 3000,
|
||||
State: commonpb.SegmentState_Flushed,
|
||||
Statslogs: []*datapb.FieldBinlog{
|
||||
{
|
||||
FieldID: 100,
|
||||
Binlogs: []*datapb.Binlog{
|
||||
{
|
||||
LogID: 9,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
FieldID: 101,
|
||||
Binlogs: []*datapb.Binlog{
|
||||
{
|
||||
LogID: 10,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
CompactionFrom: []int64{5},
|
||||
},
|
||||
},
|
||||
10: {
|
||||
SegmentInfo: &datapb.SegmentInfo{
|
||||
ID: 10,
|
||||
CollectionID: 1,
|
||||
PartitionID: 3,
|
||||
InsertChannel: "channel1",
|
||||
NumOfRows: 3000,
|
||||
State: commonpb.SegmentState_Flushed,
|
||||
Statslogs: []*datapb.FieldBinlog{
|
||||
{
|
||||
FieldID: 100,
|
||||
Binlogs: []*datapb.Binlog{
|
||||
{
|
||||
LogID: 7,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
FieldID: 101,
|
||||
Binlogs: []*datapb.Binlog{
|
||||
{
|
||||
LogID: 8,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
CompactionFrom: []int64{6},
|
||||
},
|
||||
},
|
||||
7: {
|
||||
SegmentInfo: &datapb.SegmentInfo{
|
||||
ID: 7,
|
||||
CollectionID: 1,
|
||||
PartitionID: 2,
|
||||
InsertChannel: "channel2",
|
||||
NumOfRows: 3000,
|
||||
State: commonpb.SegmentState_Dropped,
|
||||
Statslogs: []*datapb.FieldBinlog{
|
||||
{
|
||||
FieldID: 100,
|
||||
Binlogs: []*datapb.Binlog{
|
||||
{
|
||||
LogID: 5,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
FieldID: 101,
|
||||
Binlogs: []*datapb.Binlog{
|
||||
{
|
||||
LogID: 6,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
8: {
|
||||
SegmentInfo: &datapb.SegmentInfo{
|
||||
ID: 8,
|
||||
CollectionID: 1,
|
||||
PartitionID: 3,
|
||||
InsertChannel: "channel2",
|
||||
NumOfRows: 3000,
|
||||
State: commonpb.SegmentState_Dropped,
|
||||
Statslogs: []*datapb.FieldBinlog{
|
||||
{
|
||||
FieldID: 100,
|
||||
Binlogs: []*datapb.Binlog{
|
||||
{
|
||||
LogID: 7,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
FieldID: 101,
|
||||
Binlogs: []*datapb.Binlog{
|
||||
{
|
||||
LogID: 8,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
11: {
|
||||
SegmentInfo: &datapb.SegmentInfo{
|
||||
ID: 11,
|
||||
CollectionID: 1,
|
||||
PartitionID: 2,
|
||||
InsertChannel: "channel2",
|
||||
NumOfRows: 3000,
|
||||
State: commonpb.SegmentState_Flushed,
|
||||
Statslogs: []*datapb.FieldBinlog{
|
||||
{
|
||||
FieldID: 100,
|
||||
Binlogs: []*datapb.Binlog{
|
||||
{
|
||||
LogID: 5,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
FieldID: 101,
|
||||
Binlogs: []*datapb.Binlog{
|
||||
{
|
||||
LogID: 6,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
CompactionFrom: []int64{7},
|
||||
},
|
||||
},
|
||||
12: {
|
||||
SegmentInfo: &datapb.SegmentInfo{
|
||||
ID: 12,
|
||||
CollectionID: 1,
|
||||
PartitionID: 3,
|
||||
InsertChannel: "channel2",
|
||||
NumOfRows: 3000,
|
||||
State: commonpb.SegmentState_Flushed,
|
||||
Statslogs: []*datapb.FieldBinlog{
|
||||
{
|
||||
FieldID: 100,
|
||||
Binlogs: []*datapb.Binlog{
|
||||
{
|
||||
LogID: 7,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
FieldID: 101,
|
||||
Binlogs: []*datapb.Binlog{
|
||||
{
|
||||
LogID: 8,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
CompactionFrom: []int64{8},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SyncSegmentsSchedulerSuite) SetupTest() {
|
||||
s.initParams()
|
||||
}
|
||||
|
||||
func (s *SyncSegmentsSchedulerSuite) Test_newSyncSegmentsScheduler() {
|
||||
cm := NewMockChannelManager(s.T())
|
||||
cm.EXPECT().FindWatcher(mock.Anything).Return(100, nil)
|
||||
|
||||
sm := NewMockSessionManager(s.T())
|
||||
sm.EXPECT().SyncSegments(mock.Anything, mock.Anything).RunAndReturn(func(i int64, request *datapb.SyncSegmentsRequest) error {
|
||||
for _, seg := range request.GetSegmentInfos() {
|
||||
if seg.GetState() == commonpb.SegmentState_Flushed {
|
||||
s.new.Add(1)
|
||||
}
|
||||
if seg.GetState() == commonpb.SegmentState_Dropped {
|
||||
s.old.Add(1)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
Params.DataCoordCfg.SyncSegmentsInterval.SwapTempValue("1")
|
||||
defer Params.DataCoordCfg.SyncSegmentsInterval.SwapTempValue("600")
|
||||
sss := newSyncSegmentsScheduler(s.m, cm, sm)
|
||||
sss.Start()
|
||||
|
||||
// 2 channels, 2 partitions, 2 segments
|
||||
// no longer sync dropped segments
|
||||
for s.new.Load() < 4 {
|
||||
}
|
||||
sss.Stop()
|
||||
}
|
||||
|
||||
func (s *SyncSegmentsSchedulerSuite) Test_SyncSegmentsFail() {
|
||||
cm := NewMockChannelManager(s.T())
|
||||
sm := NewMockSessionManager(s.T())
|
||||
|
||||
sss := newSyncSegmentsScheduler(s.m, cm, sm)
|
||||
|
||||
s.Run("pk not found", func() {
|
||||
sss.meta.collections[1].Schema.Fields[0].IsPrimaryKey = false
|
||||
sss.SyncSegmentsForCollections()
|
||||
sss.meta.collections[1].Schema.Fields[0].IsPrimaryKey = true
|
||||
})
|
||||
|
||||
s.Run("find watcher failed", func() {
|
||||
cm.EXPECT().FindWatcher(mock.Anything).Return(0, errors.New("mock error")).Twice()
|
||||
sss.SyncSegmentsForCollections()
|
||||
})
|
||||
|
||||
s.Run("sync segment failed", func() {
|
||||
cm.EXPECT().FindWatcher(mock.Anything).Return(100, nil)
|
||||
sm.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(errors.New("mock error"))
|
||||
sss.SyncSegmentsForCollections()
|
||||
})
|
||||
}
|
|
@ -228,38 +228,6 @@ func (_c *MockCompactor_GetPlanID_Call) RunAndReturn(run func() int64) *MockComp
|
|||
return _c
|
||||
}
|
||||
|
||||
// InjectDone provides a mock function with given fields:
|
||||
func (_m *MockCompactor) InjectDone() {
|
||||
_m.Called()
|
||||
}
|
||||
|
||||
// MockCompactor_InjectDone_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'InjectDone'
|
||||
type MockCompactor_InjectDone_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// InjectDone is a helper method to define mock.On call
|
||||
func (_e *MockCompactor_Expecter) InjectDone() *MockCompactor_InjectDone_Call {
|
||||
return &MockCompactor_InjectDone_Call{Call: _e.mock.On("InjectDone")}
|
||||
}
|
||||
|
||||
func (_c *MockCompactor_InjectDone_Call) Run(run func()) *MockCompactor_InjectDone_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run()
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockCompactor_InjectDone_Call) Return() *MockCompactor_InjectDone_Call {
|
||||
_c.Call.Return()
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockCompactor_InjectDone_Call) RunAndReturn(run func()) *MockCompactor_InjectDone_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// Stop provides a mock function with given fields:
|
||||
func (_m *MockCompactor) Stop() {
|
||||
_m.Called()
|
||||
|
|
|
@ -80,7 +80,6 @@ func TestCompactionExecutor(t *testing.T) {
|
|||
ex.executeWithState(mockC)
|
||||
<-signal
|
||||
} else {
|
||||
mockC.EXPECT().InjectDone().Return().Maybe()
|
||||
mockC.EXPECT().Compact().RunAndReturn(
|
||||
func() (*datapb.CompactionPlanResult, error) {
|
||||
signal <- struct{}{}
|
||||
|
|
|
@ -297,6 +297,7 @@ func (node *DataNode) Init() error {
|
|||
} else {
|
||||
node.eventManager = NewEventManager()
|
||||
}
|
||||
|
||||
log.Info("init datanode done", zap.String("Address", node.address))
|
||||
})
|
||||
return initError
|
||||
|
|
|
@@ -50,6 +50,10 @@ type MetaCache interface {
    GetSegmentIDsBy(filters ...SegmentFilter) []int64
    // PredictSegments returns the segment ids which may contain the provided primary key.
    PredictSegments(pk storage.PrimaryKey, filters ...SegmentFilter) ([]int64, bool)
    // DetectMissingSegments returns the segment ids which is missing in datanode.
    DetectMissingSegments(segments map[int64]struct{}) []int64
    // UpdateSegmentView updates the segments BF from datacoord view.
    UpdateSegmentView(partitionID int64, newSegments []*datapb.SyncSegmentInfo, newSegmentsBF []*BloomFilterSet, allSegments map[int64]struct{})
}

var _ MetaCache = (*metaCacheImpl)(nil)

@@ -222,3 +226,55 @@ func (c *metaCacheImpl) rangeWithFilter(fn func(id int64, info *SegmentInfo), fi
            }
        }
    }

func (c *metaCacheImpl) DetectMissingSegments(segments map[int64]struct{}) []int64 {
    c.mu.RLock()
    defer c.mu.RUnlock()

    missingSegments := make([]int64, 0)

    for segID := range segments {
        if _, ok := c.segmentInfos[segID]; !ok {
            missingSegments = append(missingSegments, segID)
        }
    }

    return missingSegments
}

func (c *metaCacheImpl) UpdateSegmentView(partitionID int64,
    newSegments []*datapb.SyncSegmentInfo,
    newSegmentsBF []*BloomFilterSet,
    allSegments map[int64]struct{},
) {
    c.mu.Lock()
    defer c.mu.Unlock()

    for i, info := range newSegments {
        // check again
        if _, ok := c.segmentInfos[info.GetSegmentId()]; !ok {
            segInfo := &SegmentInfo{
                segmentID:        info.GetSegmentId(),
                partitionID:      partitionID,
                state:            info.GetState(),
                level:            info.GetLevel(),
                flushedRows:      info.GetNumOfRows(),
                startPosRecorded: true,
                bfs:              newSegmentsBF[i],
            }
            c.segmentInfos[info.GetSegmentId()] = segInfo
            log.Info("metacache does not have segment, add it", zap.Int64("segmentID", info.GetSegmentId()))
        }
    }

    for segID, info := range c.segmentInfos {
        if info.partitionID != partitionID ||
            (info.state != commonpb.SegmentState_Flushed && info.state != commonpb.SegmentState_Flushing) {
            continue
        }
        if _, ok := allSegments[segID]; !ok {
            log.Info("remove dropped segment", zap.Int64("segmentID", segID))
            delete(c.segmentInfos, segID)
        }
    }
}
|
@ -189,6 +189,50 @@ func (s *MetaCacheSuite) TestPredictSegments() {
|
|||
s.EqualValues(1, predict[0])
|
||||
}
|
||||
|
||||
func (s *MetaCacheSuite) Test_DetectMissingSegments() {
|
||||
segments := map[int64]struct{}{
|
||||
1: {}, 2: {}, 3: {}, 4: {}, 5: {}, 6: {}, 7: {}, 8: {}, 9: {}, 10: {},
|
||||
}
|
||||
|
||||
missingSegments := s.cache.DetectMissingSegments(segments)
|
||||
s.ElementsMatch(missingSegments, []int64{9, 10})
|
||||
}
|
||||
|
||||
func (s *MetaCacheSuite) Test_UpdateSegmentView() {
|
||||
addSegments := []*datapb.SyncSegmentInfo{
|
||||
{
|
||||
SegmentId: 100,
|
||||
PkStatsLog: nil,
|
||||
State: commonpb.SegmentState_Flushed,
|
||||
Level: datapb.SegmentLevel_L1,
|
||||
NumOfRows: 10240,
|
||||
},
|
||||
}
|
||||
addSegmentsBF := []*BloomFilterSet{
|
||||
NewBloomFilterSet(),
|
||||
}
|
||||
segments := map[int64]struct{}{
|
||||
1: {}, 2: {}, 3: {}, 4: {}, 5: {}, 6: {}, 7: {}, 8: {}, 100: {},
|
||||
}
|
||||
|
||||
s.cache.UpdateSegmentView(1, addSegments, addSegmentsBF, segments)
|
||||
|
||||
addSegments = []*datapb.SyncSegmentInfo{
|
||||
{
|
||||
SegmentId: 101,
|
||||
PkStatsLog: nil,
|
||||
State: commonpb.SegmentState_Flushed,
|
||||
Level: datapb.SegmentLevel_L1,
|
||||
NumOfRows: 10240,
|
||||
},
|
||||
}
|
||||
|
||||
segments = map[int64]struct{}{
|
||||
1: {}, 2: {}, 3: {}, 4: {}, 5: {}, 6: {}, 7: {}, 8: {}, 101: {},
|
||||
}
|
||||
s.cache.UpdateSegmentView(1, addSegments, addSegmentsBF, segments)
|
||||
}
|
||||
|
||||
func TestMetaCacheSuite(t *testing.T) {
|
||||
suite.Run(t, new(MetaCacheSuite))
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
// Code generated by mockery v2.30.1. DO NOT EDIT.
|
||||
// Code generated by mockery v2.32.4. DO NOT EDIT.
|
||||
|
||||
package metacache
|
||||
|
||||
|
@ -42,9 +42,9 @@ type MockMetaCache_AddSegment_Call struct {
|
|||
}
|
||||
|
||||
// AddSegment is a helper method to define mock.On call
|
||||
// - segInfo *datapb.SegmentInfo
|
||||
// - factory PkStatsFactory
|
||||
// - actions ...SegmentAction
|
||||
// - segInfo *datapb.SegmentInfo
|
||||
// - factory PkStatsFactory
|
||||
// - actions ...SegmentAction
|
||||
func (_e *MockMetaCache_Expecter) AddSegment(segInfo interface{}, factory interface{}, actions ...interface{}) *MockMetaCache_AddSegment_Call {
|
||||
return &MockMetaCache_AddSegment_Call{Call: _e.mock.On("AddSegment",
|
||||
append([]interface{}{segInfo, factory}, actions...)...)}
|
||||
|
@ -114,6 +114,50 @@ func (_c *MockMetaCache_Collection_Call) RunAndReturn(run func() int64) *MockMet
|
|||
return _c
|
||||
}
|
||||
|
||||
// DetectMissingSegments provides a mock function with given fields: segments
|
||||
func (_m *MockMetaCache) DetectMissingSegments(segments map[int64]struct{}) []int64 {
|
||||
ret := _m.Called(segments)
|
||||
|
||||
var r0 []int64
|
||||
if rf, ok := ret.Get(0).(func(map[int64]struct{}) []int64); ok {
|
||||
r0 = rf(segments)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).([]int64)
|
||||
}
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// MockMetaCache_DetectMissingSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DetectMissingSegments'
|
||||
type MockMetaCache_DetectMissingSegments_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// DetectMissingSegments is a helper method to define mock.On call
|
||||
// - segments map[int64]struct{}
|
||||
func (_e *MockMetaCache_Expecter) DetectMissingSegments(segments interface{}) *MockMetaCache_DetectMissingSegments_Call {
|
||||
return &MockMetaCache_DetectMissingSegments_Call{Call: _e.mock.On("DetectMissingSegments", segments)}
|
||||
}
|
||||
|
||||
func (_c *MockMetaCache_DetectMissingSegments_Call) Run(run func(segments map[int64]struct{})) *MockMetaCache_DetectMissingSegments_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(map[int64]struct{}))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockMetaCache_DetectMissingSegments_Call) Return(_a0 []int64) *MockMetaCache_DetectMissingSegments_Call {
|
||||
_c.Call.Return(_a0)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockMetaCache_DetectMissingSegments_Call) RunAndReturn(run func(map[int64]struct{}) []int64) *MockMetaCache_DetectMissingSegments_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// GetSegmentByID provides a mock function with given fields: id, filters
|
||||
func (_m *MockMetaCache) GetSegmentByID(id int64, filters ...SegmentFilter) (*SegmentInfo, bool) {
|
||||
_va := make([]interface{}, len(filters))
|
||||
|
@ -153,8 +197,8 @@ type MockMetaCache_GetSegmentByID_Call struct {
|
|||
}
|
||||
|
||||
// GetSegmentByID is a helper method to define mock.On call
|
||||
// - id int64
|
||||
// - filters ...SegmentFilter
|
||||
// - id int64
|
||||
// - filters ...SegmentFilter
|
||||
func (_e *MockMetaCache_Expecter) GetSegmentByID(id interface{}, filters ...interface{}) *MockMetaCache_GetSegmentByID_Call {
|
||||
return &MockMetaCache_GetSegmentByID_Call{Call: _e.mock.On("GetSegmentByID",
|
||||
append([]interface{}{id}, filters...)...)}
|
||||
|
@ -211,7 +255,7 @@ type MockMetaCache_GetSegmentIDsBy_Call struct {
|
|||
}
|
||||
|
||||
// GetSegmentIDsBy is a helper method to define mock.On call
|
||||
// - filters ...SegmentFilter
|
||||
// - filters ...SegmentFilter
|
||||
func (_e *MockMetaCache_Expecter) GetSegmentIDsBy(filters ...interface{}) *MockMetaCache_GetSegmentIDsBy_Call {
|
||||
return &MockMetaCache_GetSegmentIDsBy_Call{Call: _e.mock.On("GetSegmentIDsBy",
|
||||
append([]interface{}{}, filters...)...)}
|
||||
|
@ -268,7 +312,7 @@ type MockMetaCache_GetSegmentsBy_Call struct {
|
|||
}
|
||||
|
||||
// GetSegmentsBy is a helper method to define mock.On call
|
||||
// - filters ...SegmentFilter
|
||||
// - filters ...SegmentFilter
|
||||
func (_e *MockMetaCache_Expecter) GetSegmentsBy(filters ...interface{}) *MockMetaCache_GetSegmentsBy_Call {
|
||||
return &MockMetaCache_GetSegmentsBy_Call{Call: _e.mock.On("GetSegmentsBy",
|
||||
append([]interface{}{}, filters...)...)}
|
||||
|
@ -336,8 +380,8 @@ type MockMetaCache_PredictSegments_Call struct {
|
|||
}
|
||||
|
||||
// PredictSegments is a helper method to define mock.On call
|
||||
// - pk storage.PrimaryKey
|
||||
// - filters ...SegmentFilter
|
||||
// - pk storage.PrimaryKey
|
||||
// - filters ...SegmentFilter
|
||||
func (_e *MockMetaCache_Expecter) PredictSegments(pk interface{}, filters ...interface{}) *MockMetaCache_PredictSegments_Call {
|
||||
return &MockMetaCache_PredictSegments_Call{Call: _e.mock.On("PredictSegments",
|
||||
append([]interface{}{pk}, filters...)...)}
|
||||
|
@ -394,7 +438,7 @@ type MockMetaCache_RemoveSegments_Call struct {
|
|||
}
|
||||
|
||||
// RemoveSegments is a helper method to define mock.On call
|
||||
// - filters ...SegmentFilter
|
||||
// - filters ...SegmentFilter
|
||||
func (_e *MockMetaCache_Expecter) RemoveSegments(filters ...interface{}) *MockMetaCache_RemoveSegments_Call {
|
||||
return &MockMetaCache_RemoveSegments_Call{Call: _e.mock.On("RemoveSegments",
|
||||
append([]interface{}{}, filters...)...)}
|
||||
|
@ -466,6 +510,42 @@ func (_c *MockMetaCache_Schema_Call) RunAndReturn(run func() *schemapb.Collectio
|
|||
return _c
|
||||
}
|
||||
|
||||
// UpdateSegmentView provides a mock function with given fields: partitionID, newSegments, newSegmentsBF, allSegments
|
||||
func (_m *MockMetaCache) UpdateSegmentView(partitionID int64, newSegments []*datapb.SyncSegmentInfo, newSegmentsBF []*BloomFilterSet, allSegments map[int64]struct{}) {
|
||||
_m.Called(partitionID, newSegments, newSegmentsBF, allSegments)
|
||||
}
|
||||
|
||||
// MockMetaCache_UpdateSegmentView_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateSegmentView'
|
||||
type MockMetaCache_UpdateSegmentView_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// UpdateSegmentView is a helper method to define mock.On call
|
||||
// - partitionID int64
|
||||
// - newSegments []*datapb.SyncSegmentInfo
|
||||
// - newSegmentsBF []*BloomFilterSet
|
||||
// - allSegments map[int64]struct{}
|
||||
func (_e *MockMetaCache_Expecter) UpdateSegmentView(partitionID interface{}, newSegments interface{}, newSegmentsBF interface{}, allSegments interface{}) *MockMetaCache_UpdateSegmentView_Call {
|
||||
return &MockMetaCache_UpdateSegmentView_Call{Call: _e.mock.On("UpdateSegmentView", partitionID, newSegments, newSegmentsBF, allSegments)}
|
||||
}
|
||||
|
||||
func (_c *MockMetaCache_UpdateSegmentView_Call) Run(run func(partitionID int64, newSegments []*datapb.SyncSegmentInfo, newSegmentsBF []*BloomFilterSet, allSegments map[int64]struct{})) *MockMetaCache_UpdateSegmentView_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(int64), args[1].([]*datapb.SyncSegmentInfo), args[2].([]*BloomFilterSet), args[3].(map[int64]struct{}))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockMetaCache_UpdateSegmentView_Call) Return() *MockMetaCache_UpdateSegmentView_Call {
|
||||
_c.Call.Return()
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockMetaCache_UpdateSegmentView_Call) RunAndReturn(run func(int64, []*datapb.SyncSegmentInfo, []*BloomFilterSet, map[int64]struct{})) *MockMetaCache_UpdateSegmentView_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// UpdateSegments provides a mock function with given fields: action, filters
|
||||
func (_m *MockMetaCache) UpdateSegments(action SegmentAction, filters ...SegmentFilter) {
|
||||
_va := make([]interface{}, len(filters))
|
||||
|
@ -484,8 +564,8 @@ type MockMetaCache_UpdateSegments_Call struct {
|
|||
}
|
||||
|
||||
// UpdateSegments is a helper method to define mock.On call
|
||||
// - action SegmentAction
|
||||
// - filters ...SegmentFilter
|
||||
// - action SegmentAction
|
||||
// - filters ...SegmentFilter
|
||||
func (_e *MockMetaCache_Expecter) UpdateSegments(action interface{}, filters ...interface{}) *MockMetaCache_UpdateSegments_Call {
|
||||
return &MockMetaCache_UpdateSegments_Call{Call: _e.mock.On("UpdateSegments",
|
||||
append([]interface{}{action}, filters...)...)}
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/samber/lo"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
|
@ -30,13 +31,18 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/datanode/compaction"
|
||||
"github.com/milvus-io/milvus/internal/datanode/importv2"
|
||||
"github.com/milvus-io/milvus/internal/datanode/io"
|
||||
"github.com/milvus-io/milvus/internal/datanode/metacache"
|
||||
"github.com/milvus-io/milvus/internal/datanode/util"
|
||||
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
"github.com/milvus-io/milvus/internal/util/importutilv2"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/metrics"
|
||||
"github.com/milvus-io/milvus/pkg/tracer"
|
||||
"github.com/milvus-io/milvus/pkg/util/conc"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
|
||||
"github.com/milvus-io/milvus/pkg/util/tsoutil"
|
||||
|
@ -262,6 +268,9 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
|
|||
log := log.Ctx(ctx).With(
|
||||
zap.Int64("planID", req.GetPlanID()),
|
||||
zap.Int64("nodeID", node.GetNodeID()),
|
||||
zap.Int64("collectionID", req.GetCollectionId()),
|
||||
zap.Int64("partitionID", req.GetPartitionId()),
|
||||
zap.String("channel", req.GetChannelName()),
|
||||
)
|
||||
|
||||
log.Info("DataNode receives SyncSegments")
|
||||
|
@ -271,8 +280,62 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
|
|||
return merr.Status(err), nil
|
||||
}
|
||||
|
||||
// TODO: sheep, add a new DropCompaction interface, deprecate SyncSegments
|
||||
node.compactionExecutor.removeTask(req.GetPlanID())
|
||||
if len(req.GetSegmentInfos()) <= 0 {
|
||||
log.Info("sync segments is empty, skip it")
|
||||
return merr.Success(), nil
|
||||
}
|
||||
|
||||
ds, ok := node.flowgraphManager.GetFlowgraphService(req.GetChannelName())
|
||||
if !ok {
|
||||
node.compactionExecutor.discardPlan(req.GetChannelName())
|
||||
err := merr.WrapErrChannelNotFound(req.GetChannelName())
|
||||
log.Warn("failed to get flow graph service", zap.Error(err))
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
|
||||
allSegments := make(map[int64]struct{})
|
||||
for segID := range req.GetSegmentInfos() {
|
||||
allSegments[segID] = struct{}{}
|
||||
}
|
||||
|
||||
missingSegments := ds.metacache.DetectMissingSegments(allSegments)
|
||||
|
||||
newSegments := make([]*datapb.SyncSegmentInfo, 0, len(missingSegments))
|
||||
futures := make([]*conc.Future[any], 0, len(missingSegments))
|
||||
|
||||
for _, segID := range missingSegments {
|
||||
segID := segID
|
||||
newSeg := req.GetSegmentInfos()[segID]
|
||||
newSegments = append(newSegments, newSeg)
|
||||
future := io.GetOrCreateStatsPool().Submit(func() (any, error) {
|
||||
var val *metacache.BloomFilterSet
|
||||
var err error
|
||||
err = binlog.DecompressBinLog(storage.StatsBinlog, req.GetCollectionId(), req.GetPartitionId(), newSeg.GetSegmentId(), []*datapb.FieldBinlog{newSeg.GetPkStatsLog()})
|
||||
if err != nil {
|
||||
log.Warn("failed to DecompressBinLog", zap.Error(err))
|
||||
return val, err
|
||||
}
|
||||
pks, err := util.LoadStats(ctx, node.chunkManager, ds.metacache.Schema(), newSeg.GetSegmentId(), []*datapb.FieldBinlog{newSeg.GetPkStatsLog()})
|
||||
if err != nil {
|
||||
log.Warn("failed to load segment stats log", zap.Error(err))
|
||||
return val, err
|
||||
}
|
||||
val = metacache.NewBloomFilterSet(pks...)
|
||||
return val, nil
|
||||
})
|
||||
futures = append(futures, future)
|
||||
}
|
||||
|
||||
err := conc.AwaitAll(futures...)
|
||||
if err != nil {
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
|
||||
newSegmentsBF := lo.Map(futures, func(future *conc.Future[any], _ int) *metacache.BloomFilterSet {
|
||||
return future.Value().(*metacache.BloomFilterSet)
|
||||
})
|
||||
|
||||
ds.metacache.UpdateSegmentView(req.GetPartitionId(), newSegments, newSegmentsBF, allSegments)
|
||||
return merr.Success(), nil
|
||||
}
|
||||
|
||||
|
|
|
@ -492,3 +492,163 @@ func (s *DataNodeServicesSuite) TestQuerySlot() {
|
|||
s.NoError(merr.Error(resp.GetStatus()))
|
||||
})
|
||||
}
|
||||
|
||||
func (s *DataNodeServicesSuite) TestSyncSegments() {
|
||||
s.Run("node not healthy", func() {
|
||||
s.SetupTest()
|
||||
s.node.UpdateStateCode(commonpb.StateCode_Abnormal)
|
||||
|
||||
ctx := context.Background()
|
||||
status, err := s.node.SyncSegments(ctx, nil)
|
||||
s.NoError(err)
|
||||
s.False(merr.Ok(status))
|
||||
s.ErrorIs(merr.Error(status), merr.ErrServiceNotReady)
|
||||
})
|
||||
|
||||
s.Run("dataSyncService not exist", func() {
|
||||
s.SetupTest()
|
||||
ctx := context.Background()
|
||||
req := &datapb.SyncSegmentsRequest{
|
||||
ChannelName: "channel1",
|
||||
PartitionId: 2,
|
||||
CollectionId: 1,
|
||||
SegmentInfos: map[int64]*datapb.SyncSegmentInfo{
|
||||
102: {
|
||||
SegmentId: 102,
|
||||
PkStatsLog: &datapb.FieldBinlog{
|
||||
FieldID: 100,
|
||||
Binlogs: nil,
|
||||
},
|
||||
State: commonpb.SegmentState_Flushed,
|
||||
Level: 2,
|
||||
NumOfRows: 1024,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
status, err := s.node.SyncSegments(ctx, req)
|
||||
s.NoError(err)
|
||||
s.False(merr.Ok(status))
|
||||
})
|
||||
|
||||
s.Run("normal case", func() {
|
||||
s.SetupTest()
|
||||
cache := metacache.NewMetaCache(&datapb.ChannelWatchInfo{
|
||||
Schema: &schemapb.CollectionSchema{
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
FieldID: 100,
|
||||
Name: "pk",
|
||||
IsPrimaryKey: true,
|
||||
Description: "",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
},
|
||||
},
|
||||
},
|
||||
Vchan: &datapb.VchannelInfo{},
|
||||
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
|
||||
return metacache.NewBloomFilterSet()
|
||||
})
|
||||
cache.AddSegment(&datapb.SegmentInfo{
|
||||
ID: 100,
|
||||
CollectionID: 1,
|
||||
PartitionID: 2,
|
||||
InsertChannel: "111",
|
||||
NumOfRows: 0,
|
||||
State: commonpb.SegmentState_Growing,
|
||||
Level: datapb.SegmentLevel_L0,
|
||||
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
|
||||
return metacache.NewBloomFilterSet()
|
||||
})
|
||||
cache.AddSegment(&datapb.SegmentInfo{
|
||||
ID: 101,
|
||||
CollectionID: 1,
|
||||
PartitionID: 2,
|
||||
InsertChannel: "111",
|
||||
NumOfRows: 0,
|
||||
State: commonpb.SegmentState_Flushed,
|
||||
Level: datapb.SegmentLevel_L1,
|
||||
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
|
||||
return metacache.NewBloomFilterSet()
|
||||
})
|
||||
cache.AddSegment(&datapb.SegmentInfo{
|
||||
ID: 102,
|
||||
CollectionID: 1,
|
||||
PartitionID: 2,
|
||||
InsertChannel: "111",
|
||||
NumOfRows: 0,
|
||||
State: commonpb.SegmentState_Flushed,
|
||||
Level: datapb.SegmentLevel_L0,
|
||||
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
|
||||
return metacache.NewBloomFilterSet()
|
||||
})
|
||||
cache.AddSegment(&datapb.SegmentInfo{
|
||||
ID: 103,
|
||||
CollectionID: 1,
|
||||
PartitionID: 2,
|
||||
InsertChannel: "111",
|
||||
NumOfRows: 0,
|
||||
State: commonpb.SegmentState_Flushed,
|
||||
Level: datapb.SegmentLevel_L0,
|
||||
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
|
||||
return metacache.NewBloomFilterSet()
|
||||
})
|
||||
mockFlowgraphManager := NewMockFlowgraphManager(s.T())
|
||||
mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).Return(&dataSyncService{
|
||||
metacache: cache,
|
||||
}, true)
|
||||
s.node.flowgraphManager = mockFlowgraphManager
|
||||
ctx := context.Background()
|
||||
req := &datapb.SyncSegmentsRequest{
|
||||
ChannelName: "channel1",
|
||||
PartitionId: 2,
|
||||
CollectionId: 1,
|
||||
SegmentInfos: map[int64]*datapb.SyncSegmentInfo{
|
||||
103: {
|
||||
SegmentId: 103,
|
||||
PkStatsLog: &datapb.FieldBinlog{
|
||||
FieldID: 100,
|
||||
Binlogs: nil,
|
||||
},
|
||||
State: commonpb.SegmentState_Flushed,
|
||||
Level: datapb.SegmentLevel_L0,
|
||||
NumOfRows: 1024,
|
||||
},
|
||||
104: {
|
||||
SegmentId: 104,
|
||||
PkStatsLog: &datapb.FieldBinlog{
|
||||
FieldID: 100,
|
||||
Binlogs: nil,
|
||||
},
|
||||
State: commonpb.SegmentState_Flushed,
|
||||
Level: datapb.SegmentLevel_L1,
|
||||
NumOfRows: 1024,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
status, err := s.node.SyncSegments(ctx, req)
|
||||
s.NoError(err)
|
||||
s.True(merr.Ok(status))
|
||||
|
||||
info, exist := cache.GetSegmentByID(100)
|
||||
s.True(exist)
|
||||
s.NotNil(info)
|
||||
|
||||
info, exist = cache.GetSegmentByID(101)
|
||||
s.False(exist)
|
||||
s.Nil(info)
|
||||
|
||||
info, exist = cache.GetSegmentByID(102)
|
||||
s.False(exist)
|
||||
s.Nil(info)
|
||||
|
||||
info, exist = cache.GetSegmentByID(103)
|
||||
s.True(exist)
|
||||
s.NotNil(info)
|
||||
|
||||
info, exist = cache.GetSegmentByID(104)
|
||||
s.True(exist)
|
||||
s.NotNil(info)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
// Code generated by mockery v2.30.1. DO NOT EDIT.
|
||||
// Code generated by mockery v2.32.4. DO NOT EDIT.
|
||||
|
||||
package syncmgr
|
||||
|
||||
|
@ -57,7 +57,7 @@ type MockSyncManager_GetEarliestPosition_Call struct {
|
|||
}
|
||||
|
||||
// GetEarliestPosition is a helper method to define mock.On call
|
||||
// - channel string
|
||||
// - channel string
|
||||
func (_e *MockSyncManager_Expecter) GetEarliestPosition(channel interface{}) *MockSyncManager_GetEarliestPosition_Call {
|
||||
return &MockSyncManager_GetEarliestPosition_Call{Call: _e.mock.On("GetEarliestPosition", channel)}
|
||||
}
|
||||
|
@ -101,8 +101,8 @@ type MockSyncManager_SyncData_Call struct {
|
|||
}
|
||||
|
||||
// SyncData is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - task Task
|
||||
// - ctx context.Context
|
||||
// - task Task
|
||||
func (_e *MockSyncManager_Expecter) SyncData(ctx interface{}, task interface{}) *MockSyncManager_SyncData_Call {
|
||||
return &MockSyncManager_SyncData_Call{Call: _e.mock.On("SyncData", ctx, task)}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
// Code generated by mockery v2.30.1. DO NOT EDIT.
|
||||
// Code generated by mockery v2.32.4. DO NOT EDIT.
|
||||
|
||||
package syncmgr
|
||||
|
||||
|
@ -115,7 +115,7 @@ type MockTask_HandleError_Call struct {
|
|||
}
|
||||
|
||||
// HandleError is a helper method to define mock.On call
|
||||
// - _a0 error
|
||||
// - _a0 error
|
||||
func (_e *MockTask_Expecter) HandleError(_a0 interface{}) *MockTask_HandleError_Call {
|
||||
return &MockTask_HandleError_Call{Call: _e.mock.On("HandleError", _a0)}
|
||||
}
|
||||
|
|
|
@@ -496,15 +496,29 @@ message CompactionStateRequest {
  common.MsgBase base = 1;
}

message SyncSegmentInfo {
  int64 segment_id = 1;
  FieldBinlog pk_stats_log = 2;
  common.SegmentState state = 3;
  SegmentLevel level = 4;
  int64 num_of_rows = 5;
}

message SyncSegmentsRequest {
  // Deprecated, after v2.4.3
  int64 planID = 1;
  // Deprecated, after v2.4.3
  int64 compacted_to = 2;
  // Deprecated, after v2.4.3
  int64 num_of_rows = 3;
  // Deprecated, after v2.4.3
  repeated int64 compacted_from = 4;
  // Deprecated, after v2.4.3
  repeated FieldBinlog stats_logs = 5;
  string channel_name = 6;
  int64 partition_id = 7;
  int64 collection_id = 8;
  map<int64, SyncSegmentInfo> segment_infos = 9;
}

message CompactionSegmentBinlogs {
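For reference, this is how the new fields end up being populated on the DataCoord side (compare SyncFlushedSegments above). It is a hedged fragment, not code from the commit: it assumes the generated Go types for this proto (datapb, commonpb) as used throughout the diff, and the channel name and IDs are placeholders.

// Sketch only: a new-style request keyed by segment ID; the planID /
// compacted_from family (fields 1-5) is left unset because those fields
// are deprecated after v2.4.3.
req := &datapb.SyncSegmentsRequest{
    ChannelName:  "dml-channel-0", // placeholder channel name
    CollectionId: 1,
    PartitionId:  2,
    SegmentInfos: map[int64]*datapb.SyncSegmentInfo{
        103: {
            SegmentId:  103,
            State:      commonpb.SegmentState_Flushed,
            Level:      datapb.SegmentLevel_L1,
            NumOfRows:  1024,
            PkStatsLog: &datapb.FieldBinlog{FieldID: 100}, // stats binlog of the primary key field
        },
    },
}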
@@ -2806,6 +2806,7 @@ type dataCoordConfig struct {
    SingleCompactionDeltalogMaxNum ParamItem `refreshable:"true"`
    GlobalCompactionInterval       ParamItem `refreshable:"false"`
    ChannelCheckpointMaxLag        ParamItem `refreshable:"true"`
    SyncSegmentsInterval           ParamItem `refreshable:"false"`

    // LevelZero Segment
    EnableLevelZeroSegment ParamItem `refreshable:"false"`

@@ -3146,6 +3147,14 @@ During compaction, the size of segment # of rows is able to exceed segment max #
    }
    p.ChannelCheckpointMaxLag.Init(base.mgr)

    p.SyncSegmentsInterval = ParamItem{
        Key:          "dataCoord.sync.interval",
        Version:      "2.4.3",
        Doc:          "The time interval for regularly syncing segments",
        DefaultValue: "600", // 10 * 60 seconds
    }
    p.SyncSegmentsInterval.Init(base.mgr)

    // LevelZeroCompaction
    p.EnableLevelZeroSegment = ParamItem{
        Key: "dataCoord.segment.enableLevelZero",
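The new setting is registered under the key dataCoord.sync.interval with a default of "600" and is marked refreshable:"false", so it is read once when the scheduler starts; the milvus.yaml hunk at the top of this diff sets syncSegmentsInterval: 300. The fragment below shows how the scheduler consumes it (it mirrors the Start() goroutine earlier in this diff and assumes the repo's Params globals):

// The stored value is a count of seconds, so "600" becomes 10 minutes.
interval := Params.DataCoordCfg.SyncSegmentsInterval.GetAsDuration(time.Second)
ticker := time.NewTicker(interval)
defer ticker.Stop()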