diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go
index 5b11341b73..44e292bc76 100644
--- a/internal/datanode/compactor.go
+++ b/internal/datanode/compactor.go
@@ -486,22 +486,23 @@ func (t *compactionTask) compact() (*datapb.CompactionPlanResult, error) {
 	dblobs := make(map[UniqueID][]*Blob)
 	allPath := make([][]string, 0)
+
 	for _, s := range t.plan.GetSegmentBinlogs() {
-		// Get the number of field binlog files from non-empty segment
-		var binlogNum int
+		log := log.With(zap.Int64("segmentID", s.GetSegmentID()))
+		// Get the batch count of field binlog files
+		var binlogBatch int
 		for _, b := range s.GetFieldBinlogs() {
 			if b != nil {
-				binlogNum = len(b.GetBinlogs())
+				binlogBatch = len(b.GetBinlogs())
 				break
 			}
 		}

-		// Unable to deal with all empty segments cases, so return error
-		if binlogNum == 0 {
-			log.Warn("compact wrong, all segments' binlogs are empty")
-			return nil, errIllegalCompactionPlan
+		if binlogBatch == 0 {
+			log.Warn("compacting empty segment")
+			continue
 		}

-		for idx := 0; idx < binlogNum; idx++ {
+		for idx := 0; idx < binlogBatch; idx++ {
 			var ps []string
 			for _, f := range s.GetFieldBinlogs() {
 				ps = append(ps, f.GetBinlogs()[idx].GetLogPath())
@@ -509,7 +510,6 @@ func (t *compactionTask) compact() (*datapb.CompactionPlanResult, error) {
 			allPath = append(allPath, ps)
 		}

-		segID := s.GetSegmentID()
 		paths := make([]string, 0)
 		for _, d := range s.GetDeltalogs() {
 			for _, l := range d.GetBinlogs() {
@@ -521,13 +521,25 @@ func (t *compactionTask) compact() (*datapb.CompactionPlanResult, error) {
 			if len(paths) != 0 {
 				bs, err := downloadBlobs(ctxTimeout, t.binlogIO, paths)
 				if err != nil {
-					log.Warn("compact wrong, fail to download deltalogs", zap.Int64("segment", segID), zap.Strings("path", paths), zap.Error(err))
+					log.Warn("compact wrong, fail to download deltalogs", zap.Strings("path", paths), zap.Error(err))
 					return nil, err
 				}
-				dblobs[segID] = append(dblobs[segID], bs...)
+				dblobs[s.GetSegmentID()] = append(dblobs[s.GetSegmentID()], bs...)
 			}
 		}

-	log.Info("compact download deltalogs done", zap.Duration("elapse", t.tr.RecordSpan()))
+
+	// Unable to deal with all empty segments cases, so return error
+	if len(allPath) == 0 {
+		log.Warn("compact wrong, all segments are empty")
+		return nil, errIllegalCompactionPlan
+	}
+
+	log.Info("compact download deltalogs elapse", zap.Duration("elapse", t.tr.RecordSpan()))
+
+	if err != nil {
+		log.Warn("compact IO wrong", zap.Error(err))
+		return nil, err
+	}

 	deltaPk2Ts, err := t.mergeDeltalogs(dblobs)
 	if err != nil {
diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go
index 331b9594d8..e3b329335c 100644
--- a/internal/datanode/compactor_test.go
+++ b/internal/datanode/compactor_test.go
@@ -778,6 +778,31 @@ func TestCompactorInterfaceMethods(t *testing.T) {
 		Deltalogs: nil,
 	}}
 	paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0") // Turn off auto expiration
+	t.Run("Test compact with all segment empty", func(t *testing.T) {
+		alloc := allocator.NewMockAllocator(t)
+		alloc.EXPECT().AllocOne().Call.Return(int64(11111), nil)
+		ctx, cancel := context.WithCancel(context.TODO())
+
+		mockSyncmgr := syncmgr.NewMockSyncManager(t)
+		mockSyncmgr.EXPECT().Block(mock.Anything).Return()
+		task := &compactionTask{
+			ctx:       ctx,
+			cancel:    cancel,
+			Allocator: alloc,
+			done:      make(chan struct{}, 1),
+			tr:        timerecord.NewTimeRecorder("test"),
+			syncMgr:   mockSyncmgr,
+			plan: &datapb.CompactionPlan{
+				PlanID:           999,
+				SegmentBinlogs:   []*datapb.CompactionSegmentBinlogs{{SegmentID: 100}},
+				TimeoutInSeconds: 10,
+				Type:             datapb.CompactionType_MixCompaction,
+			},
+		}
+
+		_, err := task.compact()
+		assert.ErrorIs(t, err, errIllegalCompactionPlan)
+	})

 	t.Run("Test compact invalid empty segment binlogs", func(t *testing.T) {
 		plan := &datapb.CompactionPlan{
@@ -894,16 +919,17 @@ func TestCompactorInterfaceMethods(t *testing.T) {
 			NumOfRows:    2,
 		}, bfs)

-		metaCache.EXPECT().GetSegmentByID(mock.Anything).RunAndReturn(func(id int64, filters ...metacache.SegmentFilter) (*metacache.SegmentInfo, bool) {
-			switch id {
-			case c.segID1:
-				return seg1, true
-			case c.segID2:
-				return seg2, true
-			default:
-				return nil, false
-			}
-		})
+		bfs = metacache.NewBloomFilterSet()
+		seg3 := metacache.NewSegmentInfo(&datapb.SegmentInfo{
+			CollectionID: c.colID,
+			PartitionID:  c.parID,
+			ID:           99999,
+		}, bfs)
+
+		metaCache.EXPECT().GetSegmentByID(c.segID1).Return(seg1, true)
+		metaCache.EXPECT().GetSegmentByID(c.segID2).Return(seg2, true)
+		metaCache.EXPECT().GetSegmentByID(seg3.SegmentID()).Return(seg3, true)
+		metaCache.EXPECT().GetSegmentByID(mock.Anything).Return(nil, false)

 		iData1 := genInsertDataWithPKs(c.pks1, c.pkType)
 		dData1 := &DeleteData{
@@ -953,6 +979,9 @@ func TestCompactorInterfaceMethods(t *testing.T) {
 				Field2StatslogPaths: lo.Values(sPaths2),
 				Deltalogs:           dPaths2,
 			},
+			{
+				SegmentID: seg3.SegmentID(), // empty segment
+			},
 		},
 		StartTime:        0,
 		TimeoutInSeconds: 10,
diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go
index 54d92f8ab1..ea7d2e815d 100644
--- a/internal/datanode/flow_graph_dd_node.go
+++ b/internal/datanode/flow_graph_dd_node.go
@@ -91,6 +91,7 @@ func (ddn *ddNode) IsValidInMsg(in []Msg) bool {

 // Operate handles input messages, implementing flowgrpah.Node
 func (ddn *ddNode) Operate(in []Msg) []Msg {
+	log := log.With(zap.String("channel", ddn.vChannelName))
 	msMsg, ok := in[0].(*MsgStreamMsg)
 	if !ok {
 		log.Warn("type assertion failed for MsgStreamMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
@@ -109,14 +110,12 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
 			endPositions:   msMsg.EndPositions(),
 			dropCollection: false,
 		}
-		log.Warn("MsgStream closed", zap.Any("ddNode node", ddn.Name()), zap.Int64("collection", ddn.collectionID), zap.String("channel", ddn.vChannelName))
+		log.Warn("MsgStream closed", zap.Any("ddNode node", ddn.Name()), zap.Int64("collection", ddn.collectionID))
 		return []Msg{&fgMsg}
 	}

 	if load := ddn.dropMode.Load(); load != nil && load.(bool) {
-		log.RatedInfo(1.0, "ddNode in dropMode",
-			zap.String("vChannelName", ddn.vChannelName),
-			zap.Int64("collectionID", ddn.collectionID))
+		log.RatedInfo(1.0, "ddNode in dropMode")
 		return []Msg{}
 	}

@@ -147,12 +146,10 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
 		switch msg.Type() {
 		case commonpb.MsgType_DropCollection:
 			if msg.(*msgstream.DropCollectionMsg).GetCollectionID() == ddn.collectionID {
-				log.Info("Receiving DropCollection msg",
-					zap.Int64("collectionID", ddn.collectionID),
-					zap.String("vChannelName", ddn.vChannelName))
+				log.Info("Receiving DropCollection msg")
 				ddn.dropMode.Store(true)

-				log.Info("Stop compaction of vChannel", zap.String("vChannelName", ddn.vChannelName))
+				log.Info("Stop compaction for dropped channel")
 				ddn.compactionExecutor.discardByDroppedChannel(ddn.vChannelName)
 				fgMsg.dropCollection = true
 			}
@@ -160,10 +157,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {

 		case commonpb.MsgType_DropPartition:
 			dpMsg := msg.(*msgstream.DropPartitionMsg)
 			if dpMsg.GetCollectionID() == ddn.collectionID {
-				log.Info("drop partition msg received",
-					zap.Int64("collectionID", dpMsg.GetCollectionID()),
-					zap.Int64("partitionID", dpMsg.GetPartitionID()),
-					zap.String("vChanneName", ddn.vChannelName))
+				log.Info("drop partition msg received", zap.Int64("partitionID", dpMsg.GetPartitionID()))
 				fgMsg.dropPartitions = append(fgMsg.dropPartitions, dpMsg.PartitionID)
 			}
@@ -180,8 +174,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
 				log.Debug("filter insert messages",
 					zap.Int64("filter segmentID", imsg.GetSegmentID()),
 					zap.Uint64("message timestamp", msg.EndTs()),
-					zap.String("segment's vChannel", imsg.GetShardName()),
-					zap.String("current vChannel", ddn.vChannelName))
+				)
 				continue
 			}

@@ -200,15 +193,12 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
 				Add(float64(imsg.GetNumRows()))

 			log.Debug("DDNode receive insert messages",
-				zap.Int("numRows", len(imsg.GetRowIDs())),
-				zap.String("vChannelName", ddn.vChannelName))
+				zap.Int64("segmentID", imsg.GetSegmentID()),
+				zap.Int("numRows", len(imsg.GetRowIDs())))
 			fgMsg.insertMessages = append(fgMsg.insertMessages, imsg)

 		case commonpb.MsgType_Delete:
 			dmsg := msg.(*msgstream.DeleteMsg)
-			log.Debug("DDNode receive delete messages",
-				zap.Int64("numRows", dmsg.NumRows),
-				zap.String("vChannelName", ddn.vChannelName))

 			if dmsg.CollectionID != ddn.collectionID {
 				log.Warn("filter invalid DeleteMsg, collection mis-match",
@@ -216,6 +206,8 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
 					zap.Int64("Expected collID", ddn.collectionID))
 				continue
 			}
+
+			log.Debug("DDNode receive delete messages", zap.Int64("numRows", dmsg.NumRows))
 			rateCol.Add(metricsinfo.DeleteConsumeThroughput, float64(proto.Size(&dmsg.DeleteRequest)))
 			metrics.DataNodeConsumeBytesCount.
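---

For context, the behavioral change in compact() boils down to: skip each empty segment individually, and reject the plan with errIllegalCompactionPlan only when every segment turned out to be empty (the new len(allPath) == 0 check). Below is a minimal standalone sketch of that control flow. It is illustrative only: segment and collectBinlogPaths are simplified stand-ins invented for this sketch, not the datapb types or the actual compactor API; only errIllegalCompactionPlan and the skip-vs-fail logic mirror the diff.

// sketch.go — hedged illustration of the skip-empty / fail-only-if-all-empty pattern.
package main

import (
	"errors"
	"fmt"
)

var errIllegalCompactionPlan = errors.New("compaction plan illegal")

// segment is a hypothetical stand-in for datapb.CompactionSegmentBinlogs.
type segment struct {
	id      int64
	binlogs [][]string // one batch of binlog paths per flush; empty for an empty segment
}

// collectBinlogPaths skips empty segments with a warning and fails only
// when nothing is left to compact, as the patched compact() now does.
func collectBinlogPaths(segments []segment) ([][]string, error) {
	allPath := make([][]string, 0)
	for _, s := range segments {
		if len(s.binlogs) == 0 {
			fmt.Printf("compacting empty segment %d, skipped\n", s.id)
			continue // previously this case aborted the whole plan
		}
		allPath = append(allPath, s.binlogs...)
	}
	// Only an all-empty plan is illegal, matching the new len(allPath) check.
	if len(allPath) == 0 {
		return nil, errIllegalCompactionPlan
	}
	return allPath, nil
}

func main() {
	paths, err := collectBinlogPaths([]segment{
		{id: 100},                                 // empty — skipped
		{id: 101, binlogs: [][]string{{"log/1"}}}, // kept
	})
	fmt.Println(paths, err) // [[log/1]] <nil>

	_, err = collectBinlogPaths([]segment{{id: 100}})
	fmt.Println(err) // compaction plan illegal
}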