mirror of https://github.com/milvus-io/milvus.git

Fix the `segment not found` error (#22739)

Signed-off-by: SimFG <bang.fu@zilliz.com>

Branch: pull/22756/head
Parent: bca02d90c0
Commit: 1e8c8f9f1c

@@ -28,7 +28,6 @@ import (
     "github.com/milvus-io/milvus/internal/metrics"
     "github.com/milvus-io/milvus/internal/proto/datapb"
     "github.com/milvus-io/milvus/internal/util/tsoutil"

     "go.uber.org/zap"
 )

@@ -255,21 +254,15 @@ func (c *compactionPlanHandler) completeCompaction(result *datapb.CompactionResu

 func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.CompactionPlan, result *datapb.CompactionResult) error {
     // Also prepare metric updates.
-    oldSegments, modSegments, newSegment, metricMutation, err := c.meta.PrepareCompleteCompactionMutation(plan.GetSegmentBinlogs(), result)
+    _, modSegments, newSegment, metricMutation, err := c.meta.PrepareCompleteCompactionMutation(plan.GetSegmentBinlogs(), result)
     if err != nil {
         return err
     }
     log := log.With(zap.Int64("planID", plan.GetPlanID()))

-    modInfos := make([]*datapb.SegmentInfo, len(modSegments))
-    for i := range modSegments {
-        modInfos[i] = modSegments[i].SegmentInfo
-    }
-
-    log.Info("handleCompactionResult: altering metastore after compaction")
-    if err := c.meta.alterMetaStoreAfterCompaction(modInfos, newSegment.SegmentInfo); err != nil {
-        log.Warn("handleCompactionResult: fail to alter metastore after compaction", zap.Error(err))
-        return fmt.Errorf("fail to alter metastore after compaction, err=%w", err)
+    if err := c.meta.alterMetaStoreAfterCompaction(newSegment, modSegments); err != nil {
+        log.Warn("fail to alert meta store", zap.Error(err))
+        return err
     }

     var nodeID = c.plans[plan.GetPlanID()].dataNodeID

@@ -284,13 +277,11 @@ func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.Compact
     log.Info("handleCompactionResult: syncing segments with node", zap.Int64("nodeID", nodeID))
     if err := c.sessions.SyncSegments(nodeID, req); err != nil {
         log.Warn("handleCompactionResult: fail to sync segments with node, reverting metastore",
-            zap.Int64("nodeID", nodeID), zap.String("reason", err.Error()))
-        return c.meta.revertAlterMetaStoreAfterCompaction(oldSegments, newSegment.SegmentInfo)
+            zap.Int64("nodeID", nodeID), zap.Error(err))
+        return err
     }
     // Apply metrics after successful meta update.
     metricMutation.commit()
-
-    c.meta.alterInMemoryMetaAfterCompaction(newSegment, modSegments)
     log.Info("handleCompactionResult: success to handle merge compaction result")
     return nil
 }
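
In the datacoord hunks above, handleMergeCompactionResult now writes the combined segment mutation once and, if the follow-up SyncSegments call to the DataNode fails, simply returns the error instead of reverting the metastore; the retried request is expected to be handled idempotently on the node side. The sketch below condenses that flow under stated assumptions: metaStore, session and segment are simplified stand-ins, not the real datacoord types.

// Minimal sketch of the "alter once, retry sync on failure" flow.
package main

import (
	"errors"
	"fmt"
)

type segment struct{ id int64 }

type metaStore struct{ altered bool }

// alterAfterCompaction persists the compact-to segment and the updated
// compact-from segments in a single call.
func (m *metaStore) alterAfterCompaction(to segment, from []segment) error {
	m.altered = true
	return nil
}

type session struct{ failures int }

// syncSegments stands in for DataNode.SyncSegments; it may fail transiently.
func (s *session) syncSegments(to segment, from []segment) error {
	if s.failures > 0 {
		s.failures--
		return errors.New("rpc timeout")
	}
	return nil
}

// handleMergeResult alters the metastore first and, if the node sync fails,
// returns the error: the metastore is not reverted, and the caller retries
// SyncSegments, which is expected to tolerate already-applied requests.
func handleMergeResult(m *metaStore, sess *session, to segment, from []segment) error {
	if err := m.alterAfterCompaction(to, from); err != nil {
		return err
	}
	return sess.syncSegments(to, from)
}

func main() {
	m := &metaStore{}
	sess := &session{failures: 1}
	to, from := segment{id: 101}, []segment{{id: 100}, {id: 200}}

	if err := handleMergeResult(m, sess, to, from); err != nil {
		fmt.Println("first attempt failed, retrying:", err)
	}
	// A retry only repeats the node sync; the meta change is already durable.
	fmt.Println("retry error:", handleMergeResult(m, sess, to, from))
}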

@@ -283,7 +283,7 @@ func getDeltaLogPath(rootPath string, segmentID typeutil.UniqueID) string {

 func TestCompactionPlanHandler_handleMergeCompactionResult(t *testing.T) {
     mockDataNode := &mocks.DataNode{}
-    mockDataNode.EXPECT().SyncSegments(mock.Anything, mock.Anything).Run(func(ctx context.Context, req *datapb.SyncSegmentsRequest) {}).Return(&commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil)
+    call := mockDataNode.EXPECT().SyncSegments(mock.Anything, mock.Anything).Run(func(ctx context.Context, req *datapb.SyncSegmentsRequest) {}).Return(&commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil)

     dataNodeID := UniqueID(111)

@@ -415,6 +415,11 @@ func TestCompactionPlanHandler_handleMergeCompactionResult(t *testing.T) {
     has, err = c.meta.HasSegments([]UniqueID{1, 2, 3})
     require.NoError(t, err)
     require.True(t, has)
+
+    call.Unset()
+    call = mockDataNode.EXPECT().SyncSegments(mock.Anything, mock.Anything).Run(func(ctx context.Context, req *datapb.SyncSegmentsRequest) {}).Return(&commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}, nil)
+    err = c.handleMergeCompactionResult(plan, compactionResult2)
+    assert.Error(t, err)
 }

 func TestCompactionPlanHandler_completeCompaction(t *testing.T) {
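
The updated test above keeps a handle to the mocked SyncSegments expectation so it can swap the stubbed return value between the success and failure cases via call.Unset(). A minimal sketch of the same testify pattern, assuming a testify release that provides (*mock.Call).Unset and using a hand-rolled mock in place of the generated mocks.DataNode:

package main

import (
	"fmt"

	"github.com/stretchr/testify/mock"
)

// fakeNode is a hand-rolled testify mock standing in for the generated mock.
type fakeNode struct{ mock.Mock }

func (f *fakeNode) SyncSegments(planID int64) error {
	args := f.Called(planID)
	return args.Error(0)
}

func main() {
	n := &fakeNode{}

	// First stub: succeed.
	call := n.On("SyncSegments", mock.Anything).Return(nil)
	fmt.Println(n.SyncSegments(1)) // <nil>

	// Unset removes the old expectation so a new return value can be
	// installed for the failure path, mirroring the datacoord test.
	call.Unset()
	n.On("SyncSegments", mock.Anything).Return(fmt.Errorf("unexpected error"))
	fmt.Println(n.SyncSegments(1)) // unexpected error
}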

@@ -1052,8 +1052,14 @@ func (m *meta) copyDeltaFiles(binlogs []*datapb.FieldBinlog, collectionID, parti
     return ret, nil
 }

-func (m *meta) alterMetaStoreAfterCompaction(modSegments []*datapb.SegmentInfo, newSegment *datapb.SegmentInfo) error {
-    modSegIDs := lo.Map(modSegments, func(segment *datapb.SegmentInfo, _ int) int64 { return segment.GetID() })
+func (m *meta) alterMetaStoreAfterCompaction(segmentCompactTo *SegmentInfo, segmentsCompactFrom []*SegmentInfo) error {
+    modInfos := make([]*datapb.SegmentInfo, len(segmentsCompactFrom))
+    for i := range segmentsCompactFrom {
+        modInfos[i] = segmentsCompactFrom[i].SegmentInfo
+    }
+    newSegment := segmentCompactTo.SegmentInfo
+
+    modSegIDs := lo.Map(modInfos, func(segment *datapb.SegmentInfo, _ int) int64 { return segment.GetID() })
     if newSegment.GetNumOfRows() == 0 {
         newSegment.State = commonpb.SegmentState_Dropped
     }

@@ -1065,20 +1071,11 @@ func (m *meta) alterMetaStoreAfterCompaction(modSegments []*datapb.SegmentInfo,
         zap.Int("stats log", len(newSegment.GetStatslogs())),
         zap.Int("delta logs", len(newSegment.GetDeltalogs())),
         zap.Int64("compact to segment", newSegment.GetID()))
-    return m.catalog.AlterSegmentsAndAddNewSegment(m.ctx, modSegments, newSegment)
-}
-
-func (m *meta) revertAlterMetaStoreAfterCompaction(oldSegments []*datapb.SegmentInfo, removalSegment *datapb.SegmentInfo) error {
-    log.Info("meta update: revert metastore after compaction failure",
-        zap.Int64("collectionID", removalSegment.CollectionID),
-        zap.Int64("partitionID", removalSegment.PartitionID),
-        zap.Int64("compactedTo (segment to remove)", removalSegment.ID),
-        zap.Int64s("compactedFrom (segments to add back)", removalSegment.GetCompactionFrom()),
-    )
-    return m.catalog.RevertAlterSegmentsAndAddNewSegment(m.ctx, oldSegments, removalSegment)
-}
-
-func (m *meta) alterInMemoryMetaAfterCompaction(segmentCompactTo *SegmentInfo, segmentsCompactFrom []*SegmentInfo) {
+    err := m.catalog.AlterSegmentsAndAddNewSegment(m.ctx, modInfos, newSegment)
+    if err != nil {
+        log.Warn("fail to alter segments and new segment", zap.Error(err))
+        return err
+    }
     var compactFromIDs []int64
     for _, v := range segmentsCompactFrom {
         compactFromIDs = append(compactFromIDs, v.GetID())

@@ -1097,6 +1094,7 @@ func (m *meta) alterInMemoryMetaAfterCompaction(segmentCompactTo *SegmentInfo, s
     log.Info("meta update: alter in memory meta after compaction - complete",
         zap.Int64("compact to segment ID", segmentCompactTo.GetID()),
         zap.Int64s("compact from segment IDs", compactFromIDs))
+    return nil
 }

 func (m *meta) updateBinlogs(origin []*datapb.FieldBinlog, removes []*datapb.FieldBinlog, adds []*datapb.FieldBinlog) []*datapb.FieldBinlog {
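
The reworked alterMetaStoreAfterCompaction above now accepts the datacoord-level *SegmentInfo wrappers, unwraps them for the catalog call, and finishes the in-memory update itself, which is why the separate revert and alter-in-memory helpers disappear. A small sketch of the lo.Map unwrapping step; protoSegmentInfo and segmentInfo are hypothetical stand-ins for the real proto and wrapper types.

package main

import (
	"fmt"

	"github.com/samber/lo"
)

type protoSegmentInfo struct{ ID int64 }

// segmentInfo mirrors the "wrapper around the proto struct" shape.
type segmentInfo struct{ SegmentInfo *protoSegmentInfo }

func main() {
	wrapped := []*segmentInfo{
		{SegmentInfo: &protoSegmentInfo{ID: 100}},
		{SegmentInfo: &protoSegmentInfo{ID: 200}},
	}

	// Unwrap to the proto-level type before handing the slice to the catalog,
	// as the reshaped meta method does.
	infos := lo.Map(wrapped, func(s *segmentInfo, _ int) *protoSegmentInfo {
		return s.SegmentInfo
	})

	// Collect just the IDs for logging, mirroring the modSegIDs line.
	ids := lo.Map(infos, func(s *protoSegmentInfo, _ int) int64 { return s.ID })
	fmt.Println(ids) // [100 200]
}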

@@ -25,7 +25,6 @@ import (
     "testing"

     "github.com/golang/protobuf/proto"

     "github.com/milvus-io/milvus-proto/go-api/commonpb"
     "github.com/milvus-io/milvus/internal/common"
     "github.com/milvus-io/milvus/internal/kv"

@@ -34,6 +33,7 @@ import (
     "github.com/milvus-io/milvus/internal/proto/datapb"
     "github.com/milvus-io/milvus/internal/proto/internalpb"
     "github.com/milvus-io/milvus/internal/util"
+    "github.com/samber/lo"
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/mock"
     "github.com/stretchr/testify/require"

@@ -545,48 +545,10 @@ func TestMeta_alterMetaStore(t *testing.T) {
         }},
     }

-    err := m.alterMetaStoreAfterCompaction(toAlter, newSeg)
+    err := m.alterMetaStoreAfterCompaction(&SegmentInfo{SegmentInfo: newSeg}, lo.Map(toAlter, func(t *datapb.SegmentInfo, _ int) *SegmentInfo {
+        return &SegmentInfo{SegmentInfo: t}
+    }))
     assert.NoError(t, err)
-
-    err = m.revertAlterMetaStoreAfterCompaction(toAlter, newSeg)
-    assert.NoError(t, err)
 }

-func TestMeta_alterInMemoryMetaAfterCompaction(t *testing.T) {
-    m := &meta{
-        catalog:  &datacoord.Catalog{MetaKv: NewMetaMemoryKV()},
-        segments: &SegmentsInfo{make(map[UniqueID]*SegmentInfo)},
-    }
-
-    tests := []struct {
-        description  string
-        compactToSeg *SegmentInfo
-    }{
-        {
-            "numRows>0", &SegmentInfo{
-                SegmentInfo: &datapb.SegmentInfo{
-                    ID:        1,
-                    NumOfRows: 10,
-                },
-            },
-        },
-        {
-            "numRows=0", &SegmentInfo{
-                SegmentInfo: &datapb.SegmentInfo{
-                    ID: 1,
-                },
-            },
-        },
-    }
-
-    compactFrom := []*SegmentInfo{{}, {}}
-
-    for _, test := range tests {
-        t.Run(test.description, func(t *testing.T) {
-            m.alterInMemoryMetaAfterCompaction(test.compactToSeg, compactFrom)
-        })
-    }
-
-}
-
 func TestMeta_PrepareCompleteCompactionMutation(t *testing.T) {

@@ -18,6 +18,7 @@ package datanode

 import (
     "context"
     "errors"
     "fmt"
     "math"
     "sync"

@@ -32,6 +33,7 @@ import (
     "github.com/milvus-io/milvus/internal/types"
     "github.com/milvus-io/milvus/internal/util/tsoutil"
     "github.com/milvus-io/milvus/internal/util/typeutil"
+    "github.com/samber/lo"
     "go.uber.org/zap"
 )

@@ -61,7 +63,7 @@ type Channel interface {
     listNewSegmentsStartPositions() []*datapb.SegmentStartPosition
     transferNewSegments(segmentIDs []UniqueID)
     updateSegmentPKRange(segID UniqueID, ids storage.FieldData)
-    mergeFlushedSegments(seg *Segment, planID UniqueID, compactedFrom []UniqueID) error
+    mergeFlushedSegments(ctx context.Context, seg *Segment, planID UniqueID, compactedFrom []UniqueID) error
     hasSegment(segID UniqueID, countFlushed bool) bool
     removeSegments(segID ...UniqueID)
     listCompactedSegmentIDs() map[UniqueID][]UniqueID

@@ -541,8 +543,8 @@ func (c *ChannelMeta) getCollectionSchema(collID UniqueID, ts Timestamp) (*schem
     return c.collSchema, nil
 }

-func (c *ChannelMeta) mergeFlushedSegments(seg *Segment, planID UniqueID, compactedFrom []UniqueID) error {
-    log := log.With(
+func (c *ChannelMeta) mergeFlushedSegments(ctx context.Context, seg *Segment, planID UniqueID, compactedFrom []UniqueID) error {
+    log := log.Ctx(ctx).With(
         zap.Int64("segment ID", seg.segmentID),
         zap.Int64("collection ID", seg.collectionID),
         zap.Int64("partition ID", seg.partitionID),

@@ -567,12 +569,18 @@ func (c *ChannelMeta) mergeFlushedSegments(seg *Segment, planID UniqueID, compac

     if len(inValidSegments) > 0 {
         log.Warn("no match flushed segments to merge from", zap.Int64s("invalid segmentIDs", inValidSegments))
-        return fmt.Errorf("invalid compactedFrom segments: %v", inValidSegments)
+        compactedFrom = lo.Without(compactedFrom, inValidSegments...)
     }

     log.Info("merge flushed segments")
     c.segMu.Lock()
     defer c.segMu.Unlock()
+    select {
+    case <-ctx.Done():
+        log.Warn("the context has been closed", zap.Error(ctx.Err()))
+        return errors.New("invalid context")
+    default:
+    }
     for _, ID := range compactedFrom {
         // the existent of the segments are already checked
         s := c.segments[ID]
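
On the DataNode side, mergeFlushedSegments above becomes tolerant of compacted-from IDs that are no longer flushed (they are dropped with lo.Without rather than failing the whole call), and it refuses to touch channel state once the request context is cancelled. A condensed sketch of those two checks, with plain int64 IDs standing in for the channel's segment bookkeeping:

package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/samber/lo"
)

// mergeIDs sketches the tolerant merge: unknown IDs are skipped with
// lo.Without rather than failing the request, and a cancelled context
// aborts before any state is touched.
func mergeIDs(ctx context.Context, compactedFrom, invalid []int64) ([]int64, error) {
	if len(invalid) > 0 {
		compactedFrom = lo.Without(compactedFrom, invalid...)
	}
	select {
	case <-ctx.Done():
		return nil, errors.New("invalid context")
	default:
	}
	return compactedFrom, nil
}

func main() {
	ids, err := mergeIDs(context.Background(), []int64{1, 6}, []int64{6})
	fmt.Println(ids, err) // [1] <nil>

	cancelled, cancel := context.WithCancel(context.Background())
	cancel()
	_, err = mergeIDs(cancelled, []int64{1, 2}, nil)
	fmt.Println(err) // invalid context
}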

@@ -611,31 +611,42 @@ func TestChannelMeta_InterfaceMethod(t *testing.T) {
         stored      bool

         inCompactedFrom []UniqueID
+        outFrom         []UniqueID
         inSeg           *Segment
     }{
-        {"mismatch collection", false, false, []UniqueID{1, 2}, &Segment{
-            segmentID:    3,
-            collectionID: -1,
-        }},
-        {"no match flushed segment", false, false, []UniqueID{1, 6}, &Segment{
-            segmentID:    3,
-            collectionID: 1,
-        }},
-        {"numRows==0", true, false, []UniqueID{1, 2}, &Segment{
-            segmentID:    3,
-            collectionID: 1,
-            numRows:      0,
-        }},
-        {"numRows>0", true, true, []UniqueID{1, 2}, &Segment{
-            segmentID:    3,
-            collectionID: 1,
-            numRows:      15,
-        }},
-        {"segment exists but not flushed", false, false, []UniqueID{1, 4}, &Segment{
-            segmentID:    3,
-            collectionID: 1,
-            numRows:      15,
-        }},
+        {"mismatch collection", false, false,
+            []UniqueID{1, 2}, []UniqueID{},
+            &Segment{
+                segmentID:    3,
+                collectionID: -1,
+            }},
+        {"no match flushed segment", true, false,
+            []UniqueID{1, 6}, []UniqueID{1},
+            &Segment{
+                segmentID:    3,
+                collectionID: 1,
+            }},
+        {"numRows==0", true, false,
+            []UniqueID{1, 2}, []UniqueID{},
+            &Segment{
+                segmentID:    3,
+                collectionID: 1,
+                numRows:      0,
+            }},
+        {"numRows>0", true, true,
+            []UniqueID{1, 2}, []UniqueID{1, 2},
+            &Segment{
+                segmentID:    3,
+                collectionID: 1,
+                numRows:      15,
+            }},
+        {"segment exists but not flushed", true, true,
+            []UniqueID{1, 4}, []UniqueID{1},
+            &Segment{
+                segmentID:    3,
+                collectionID: 1,
+                numRows:      15,
+            }},
     }

     for _, test := range tests {

@@ -669,7 +680,7 @@ func TestChannelMeta_InterfaceMethod(t *testing.T) {
     require.False(t, channel.hasSegment(3, true))

     // tests start
-    err := channel.mergeFlushedSegments(test.inSeg, 100, test.inCompactedFrom)
+    err := channel.mergeFlushedSegments(context.Background(), test.inSeg, 100, test.inCompactedFrom)
     if test.isValid {
         assert.NoError(t, err)
     } else {

@@ -684,7 +695,7 @@ func TestChannelMeta_InterfaceMethod(t *testing.T) {

         from, ok := to2from[3]
         assert.True(t, ok)
-        assert.ElementsMatch(t, []UniqueID{1, 2}, from)
+        assert.ElementsMatch(t, test.outFrom, from)
     } else {
         assert.False(t, channel.hasSegment(3, true))
     }

@@ -31,10 +31,11 @@ const (
 )

 type compactionExecutor struct {
-    executing sync.Map // planID to compactor
-    completed sync.Map // planID to CompactionResult
-    taskCh    chan compactor
-    dropped   sync.Map // vchannel dropped
+    executing          sync.Map // planID to compactor
+    completedCompactor sync.Map // planID to compactor
+    completed          sync.Map // planID to CompactionResult
+    taskCh             chan compactor
+    dropped            sync.Map // vchannel dropped
 }

 func newCompactionExecutor() *compactionExecutor {

@@ -58,6 +59,14 @@ func (c *compactionExecutor) toCompleteState(task compactor) {
     c.executing.Delete(task.getPlanID())
 }

+func (c *compactionExecutor) injectDone(planID UniqueID) {
+    c.completed.Delete(planID)
+    task, loaded := c.completedCompactor.LoadAndDelete(planID)
+    if loaded {
+        task.(compactor).injectDone()
+    }
+}
+
 // These two func are bounded for waitGroup
 func (c *compactionExecutor) executeWithState(task compactor) {
     go c.executeTask(task)

@@ -89,6 +98,7 @@ func (c *compactionExecutor) executeTask(task compactor) {
         )
     } else {
         c.completed.Store(task.getPlanID(), result)
+        c.completedCompactor.Store(task.getPlanID(), task)
     }

     log.Info("end to execute compaction", zap.Int64("planID", task.getPlanID()))

@@ -119,7 +129,7 @@ func (c *compactionExecutor) stopExecutingtaskByVChannelName(vChannelName string
     // remove all completed plans for vChannelName
     c.completed.Range(func(key interface{}, value interface{}) bool {
         if value.(*datapb.CompactionResult).GetChannel() == vChannelName {
-            c.completed.Delete(key.(UniqueID))
+            c.injectDone(key.(UniqueID))
             log.Info("remove compaction results for dropped channel",
                 zap.String("channel", vChannelName),
                 zap.Int64("planID", key.(UniqueID)))
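
The executor hunks above add a second sync.Map that remembers finished compactors so the flush injection can be released exactly once, either after DataCoord confirms the result through SyncSegments or when the channel is dropped. A minimal sketch of that LoadAndDelete pattern; executor and fakeCompactor are stand-ins for the real datanode types.

package main

import (
	"fmt"
	"sync"
)

// compactor is a cut-down stand-in for the datanode compactor interface.
type compactor interface{ injectDone() }

type fakeCompactor struct{ planID int64 }

func (f *fakeCompactor) injectDone() { fmt.Println("inject released for plan", f.planID) }

type executor struct {
	completed          sync.Map // planID -> result
	completedCompactor sync.Map // planID -> compactor
}

// injectDone mirrors the new cleanup path: remove the cached result and,
// if a finished compactor is still tracked, release its injection once.
func (e *executor) injectDone(planID int64) {
	e.completed.Delete(planID)
	if task, loaded := e.completedCompactor.LoadAndDelete(planID); loaded {
		task.(compactor).injectDone()
	}
}

func main() {
	e := &executor{}
	e.completed.Store(int64(7), "result")
	e.completedCompactor.Store(int64(7), &fakeCompactor{planID: 7})

	e.injectDone(7) // prints once
	e.injectDone(7) // second call is a no-op thanks to LoadAndDelete
}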

@@ -153,6 +153,10 @@ func (mc *mockCompactor) compact() (*datapb.CompactionResult, error) {
     return nil, nil
 }

+func (mc *mockCompactor) injectDone() {
+
+}
+
 func (mc *mockCompactor) getPlanID() UniqueID {
     return 1
 }

@@ -57,6 +57,7 @@ type iterator = storage.Iterator
 type compactor interface {
     complete()
     compact() (*datapb.CompactionResult, error)
+    injectDone()
     stop()
     getPlanID() UniqueID
     getCollection() UniqueID

@@ -82,6 +83,7 @@ type compactionTask struct {
     done         chan struct{}
     tr           *timerecord.TimeRecorder
     chunkManager storage.ChunkManager
+    inject       *taskInjection
 }

 // check if compactionTask implements compactor

@@ -121,6 +123,7 @@ func (t *compactionTask) complete() {
 func (t *compactionTask) stop() {
     t.cancel()
     <-t.done
+    t.injectDone()
 }

 func (t *compactionTask) getPlanID() UniqueID {

@@ -549,7 +552,12 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
             statsLog.LogPath = blobPath
         }
     })
-    defer close(ti.injectOver)
+    defer func() {
+        // the injection will be closed if fail to compact
+        if t.inject == nil {
+            close(ti.injectOver)
+        }
+    }()

     t.injectFlush(ti, segIDs...)
     <-ti.Injected()

@@ -662,12 +670,7 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
         Channel: t.plan.GetChannel(),
     }

-    uninjectStart := time.Now()
-    ti.injectDone(true)
-    uninjectEnd := time.Now()
-    defer func() {
-        log.Info("uninject elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(uninjectEnd.Sub(uninjectStart))))
-    }()
+    t.inject = ti

     log.Info("compaction done",
         zap.Int64("planID", t.plan.GetPlanID()),

@@ -685,6 +688,15 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
     return pack, nil
 }

+func (t *compactionTask) injectDone() {
+    if t.inject != nil {
+        uninjectStart := time.Now()
+        t.inject.injectDone(true)
+        uninjectEnd := time.Now()
+        log.Info("uninject elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(uninjectEnd.Sub(uninjectStart))))
+    }
+}
+
 // TODO copy maybe expensive, but this seems to be the only convinent way.
 func interface2FieldData(schemaDataType schemapb.DataType, content []interface{}, numRows int64) (storage.FieldData, error) {
     var rst storage.FieldData
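
compactionTask above no longer closes the flush injection as soon as compact() returns; it stores the taskInjection handle and releases it later through the new injectDone(), which stop() also calls. A rough sketch of holding and releasing such a handle; taskInjection here is reduced to a bare channel and is not the real flush-manager type.

package main

import "fmt"

// taskInjection is a stand-in for the flush-manager injection handle: the
// real one blocks flushes until the injection is marked done.
type taskInjection struct{ injectOver chan struct{} }

type task struct{ inject *taskInjection }

// injectDone releases the injection if one is still held; setting inject to
// nil makes a second call a no-op in this simplified version.
func (t *task) injectDone() {
	if t.inject != nil {
		close(t.inject.injectOver)
		t.inject = nil
	}
}

func main() {
	ti := &taskInjection{injectOver: make(chan struct{})}
	t := &task{inject: ti}

	done := make(chan struct{})
	go func() { <-ti.injectOver; close(done) }() // e.g. a flush waiting on the injection

	t.injectDone()
	<-done
	fmt.Println("injection released after the segment meta was synced")
}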

@@ -20,6 +20,7 @@ import (
     "context"
     "fmt"
     "math"
     "sync"

     // "math"
     "testing"

@@ -762,6 +763,11 @@ func TestCompactorInterfaceMethods(t *testing.T) {
         assert.Equal(t, int64(4), result.GetNumOfRows())
         assert.NotEmpty(t, result.InsertLogs)
         assert.NotEmpty(t, result.Field2StatslogPaths)
+
+        assert.Equal(t, 0, mockfm.injectCount())
+        task.injectDone()
+        time.Sleep(500 * time.Millisecond)
+        assert.Equal(t, 1, mockfm.injectCount())
     }
 })

@@ -845,6 +851,11 @@ func TestCompactorInterfaceMethods(t *testing.T) {
         assert.Equal(t, int64(2), result.GetNumOfRows())
         assert.NotEmpty(t, result.InsertLogs)
         assert.NotEmpty(t, result.Field2StatslogPaths)
+
+        assert.Equal(t, 0, mockfm.injectCount())
+        task.injectDone()
+        time.Sleep(500 * time.Millisecond)
+        assert.Equal(t, 1, mockfm.injectCount())
     })
 }

@@ -853,6 +864,10 @@ type mockFlushManager struct {
     returnError      bool
     recordFlushedSeg bool
     flushedSegIDs    []UniqueID
+    injectOverCount  struct {
+        sync.RWMutex
+        value int
+    }
 }

 var _ flushManager = (*mockFlushManager)(nil)

@@ -880,9 +895,18 @@ func (mfm *mockFlushManager) injectFlush(injection *taskInjection, segments ...U
         //injection.injected <- struct{}{}
         close(injection.injected)
         <-injection.injectOver
+        mfm.injectOverCount.Lock()
+        defer mfm.injectOverCount.Unlock()
+        mfm.injectOverCount.value++
     }()
 }

+func (mfm *mockFlushManager) injectCount() int {
+    mfm.injectOverCount.RLock()
+    defer mfm.injectOverCount.RUnlock()
+    return mfm.injectOverCount.value
+}
+
 func (mfm *mockFlushManager) notifyAllFlushed() {}

 func (mfm *mockFlushManager) startDropping() {}

@@ -871,7 +871,6 @@ func (node *DataNode) GetCompactionState(ctx context.Context, req *datapb.Compac
             PlanID: k.(UniqueID),
             Result: v.(*datapb.CompactionResult),
         })
-        node.compactionExecutor.completed.Delete(k)
         return true
     })

@@ -904,28 +903,31 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
         return status, nil
     }

-    oneSegment := req.GetCompactedFrom()[0]
-    channel, err := node.flowgraphManager.getChannel(oneSegment)
-    if err != nil {
-        status.Reason = fmt.Sprintf("invalid request, err=%s", err.Error())
-        return status, nil
-    }
-
-    ds, ok := node.flowgraphManager.getFlowgraphService(channel.getChannelName(oneSegment))
-    if !ok {
-        status.Reason = fmt.Sprintf("failed to find flow graph service, err=%s", err.Error())
-        return status, nil
-    }
-
-    // check if all compactedFrom segments are valid
-    var invalidSegIDs []UniqueID
-    for _, segID := range req.GetCompactedFrom() {
-        if !channel.hasSegment(segID, true) {
-            invalidSegIDs = append(invalidSegIDs, segID)
-        }
+    var (
+        oneSegment int64
+        channel    Channel
+        err        error
+        ds         *dataSyncService
+        ok         bool
+    )
+
+    for _, fromSegment := range req.GetCompactedFrom() {
+        channel, err = node.flowgraphManager.getChannel(fromSegment)
+        if err != nil {
+            log.Ctx(ctx).Warn("fail to get the channel", zap.Int64("segment", fromSegment), zap.Error(err))
+            continue
+        }
+        ds, ok = node.flowgraphManager.getFlowgraphService(channel.getChannelName(fromSegment))
+        if !ok {
+            log.Ctx(ctx).Warn("fail to find flow graph service", zap.Int64("segment", fromSegment))
+            continue
+        }
+        oneSegment = fromSegment
+        break
     }
-    if len(invalidSegIDs) > 0 {
-        status.Reason = fmt.Sprintf("invalid request, some segments are not in the same channel: %v", invalidSegIDs)
+    if oneSegment == 0 {
+        log.Ctx(ctx).Warn("no valid segment, maybe the request is a retry")
+        status.ErrorCode = commonpb.ErrorCode_Success
         return status, nil
     }

@@ -940,6 +942,7 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments

     err = channel.InitPKstats(ctx, targetSeg, req.GetStatsLogs(), tsoutil.GetCurrentTime())
     if err != nil {
+        log.Ctx(ctx).Warn("init pk stats fail", zap.Error(err))
         status.Reason = fmt.Sprintf("init pk stats fail, err=%s", err.Error())
         return status, nil
     }

@@ -947,11 +950,12 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
     // block all flow graph so it's safe to remove segment
     ds.fg.Blockall()
     defer ds.fg.Unblock()
-    if err := channel.mergeFlushedSegments(targetSeg, req.GetPlanID(), req.GetCompactedFrom()); err != nil {
+    if err = channel.mergeFlushedSegments(ctx, targetSeg, req.GetPlanID(), req.GetCompactedFrom()); err != nil {
+        log.Ctx(ctx).Warn("fail to merge flushed segments", zap.Error(err))
         status.Reason = err.Error()
         return status, nil
     }
-
+    node.compactionExecutor.injectDone(req.GetPlanID())
     status.ErrorCode = commonpb.ErrorCode_Success
     return status, nil
 }
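
SyncSegments above now walks all compacted-from segments until one still resolves to a channel and flow graph, and it reports Success when none does, treating the request as a retry whose effect was already applied. A small sketch of that selection loop; channelLookup, the segment IDs, and the channel name are hypothetical.

package main

import (
	"errors"
	"fmt"
)

// channelLookup is a hypothetical stand-in for flowgraphManager.getChannel.
type channelLookup func(segID int64) (string, error)

// pickSegment mirrors the new SyncSegments loop: take the first compacted-from
// segment that still resolves to a channel, and report ok=false (treated as a
// successful no-op by the handler) when none of them do.
func pickSegment(compactedFrom []int64, lookup channelLookup) (segID int64, channel string, ok bool) {
	for _, id := range compactedFrom {
		ch, err := lookup(id)
		if err != nil {
			continue // segment already merged away; a retry may hit this
		}
		return id, ch, true
	}
	return 0, "", false
}

func main() {
	known := map[int64]string{200: "by-dev-dml-0"}
	lookup := func(id int64) (string, error) {
		if ch, found := known[id]; found {
			return ch, nil
		}
		return "", errors.New("segment not found")
	}

	if id, ch, ok := pickSegment([]int64{100, 200}, lookup); ok {
		fmt.Println("sync via", ch, "using segment", id)
	}

	if _, _, ok := pickSegment([]int64{100}, lookup); !ok {
		fmt.Println("nothing to do, likely a retried SyncSegments request")
	}
}

Answering Success for an already-applied request is what lets DataCoord retry SyncSegments safely after the metastore change shown earlier.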

@@ -187,7 +187,7 @@ func TestDataNode(t *testing.T) {
             cnt++
             return true
         })
-        assert.Equal(t, 0, cnt)
+        assert.Equal(t, 1, cnt)
     })

     t.Run("Test GetCompactionState unhealthy", func(t *testing.T) {

@@ -699,7 +699,6 @@ func TestDataNode(t *testing.T) {
     t.Run("invalid compacted from", func(t *testing.T) {
         invalidCompactedFroms := [][]UniqueID{
             {},
-            {101, 201},
         }
         req := &datapb.SyncSegmentsRequest{}

@@ -711,19 +710,43 @@ func TestDataNode(t *testing.T) {
         }
     })

+    t.Run("invalid compacted from", func(t *testing.T) {
+        invalidCompactedFroms := [][]UniqueID{
+            {101, 201},
+        }
+        req := &datapb.SyncSegmentsRequest{}
+
+        for _, invalid := range invalidCompactedFroms {
+            req.CompactedFrom = invalid
+            status, err := node.SyncSegments(ctx, req)
+            assert.NoError(t, err)
+            assert.Equal(t, commonpb.ErrorCode_Success, status.GetErrorCode())
+        }
+    })
+
     t.Run("valid request numRows>0", func(t *testing.T) {
         req := &datapb.SyncSegmentsRequest{
             CompactedFrom: []int64{100, 200},
             CompactedTo:   101,
             NumOfRows:     100,
         }
-        status, err := node.SyncSegments(ctx, req)
+        cancelCtx, cancel := context.WithCancel(context.Background())
+        cancel()
+        status, err := node.SyncSegments(cancelCtx, req)
         assert.NoError(t, err)
+        assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.GetErrorCode())
+
+        status, err = node.SyncSegments(ctx, req)
+        assert.NoError(t, err)
         assert.Equal(t, commonpb.ErrorCode_Success, status.GetErrorCode())

         assert.True(t, fg.channel.hasSegment(req.CompactedTo, true))
         assert.False(t, fg.channel.hasSegment(req.CompactedFrom[0], true))
         assert.False(t, fg.channel.hasSegment(req.CompactedFrom[1], true))
+
+        status, err = node.SyncSegments(ctx, req)
+        assert.NoError(t, err)
+        assert.Equal(t, commonpb.ErrorCode_Success, status.GetErrorCode())
     })

     t.Run("valid request numRows=0", func(t *testing.T) {

@@ -20,9 +20,8 @@ import (
     "sync"
     "time"

-    "go.uber.org/zap"
-
     "github.com/milvus-io/milvus/internal/log"
+    "go.uber.org/zap"
 )

 type sendTimeTick func(Timestamp, []int64) error

@@ -96,6 +95,8 @@ func (mt *mergedTimeTickerSender) isClosed() bool {

 func (mt *mergedTimeTickerSender) work() {
     defer mt.wg.Done()
+    var sids []int64
+    var isDiffTs bool
     lastTs := uint64(0)
     for {
         mt.cond.L.Lock()

@@ -107,26 +108,24 @@ func (mt *mergedTimeTickerSender) work() {
         mt.cond.L.Unlock()

         mt.mu.Lock()
-        if mt.ts != lastTs {
-            var sids []int64
+        isDiffTs = mt.ts != lastTs
+        if isDiffTs {
             for sid := range mt.segmentIDs {
                 sids = append(sids, sid)
             }

+            // we will reset the timer but not the segmentIDs, since if we sent the timetick fail we may block forever due to flush stuck
             lastTs = mt.ts
            mt.lastSent = time.Now()
-
-            if err := mt.send(mt.ts, sids); err != nil {
-                log.Error("send hard time tick failed", zap.Error(err))
-                mt.mu.Unlock()
-                continue
-            }
-
             mt.segmentIDs = make(map[int64]struct{})
-
         }
         mt.mu.Unlock()

+        if isDiffTs {
+            if err := mt.send(lastTs, sids); err != nil {
+                log.Error("send hard time tick failed", zap.Error(err))
+            }
+        }
     }
 }
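
The time-tick sender hunks above move the actual send out of the critical section: state is snapshotted and updated while holding the mutex, and the possibly slow or failing RPC runs after Unlock, so it no longer runs under the lock. A stripped-down sketch of that snapshot-then-send shape; the ticker type below is a stand-in, not the real mergedTimeTickerSender.

package main

import (
	"fmt"
	"sync"
	"time"
)

// ticker copies its state under the lock and performs the (possibly slow)
// send after Unlock.
type ticker struct {
	mu         sync.Mutex
	ts         uint64
	segmentIDs map[int64]struct{}
	send       func(ts uint64, sids []int64) error
}

func (t *ticker) flushOnce(lastTs uint64) uint64 {
	var sids []int64

	t.mu.Lock()
	isDiffTs := t.ts != lastTs
	if isDiffTs {
		for sid := range t.segmentIDs {
			sids = append(sids, sid)
		}
		lastTs = t.ts
		t.segmentIDs = make(map[int64]struct{})
	}
	t.mu.Unlock()

	// The RPC happens outside the critical section, so a slow or failing send
	// cannot stall writers that are blocked on t.mu.
	if isDiffTs {
		if err := t.send(lastTs, sids); err != nil {
			fmt.Println("send hard time tick failed:", err)
		}
	}
	return lastTs
}

func main() {
	tk := &ticker{
		segmentIDs: map[int64]struct{}{1: {}, 2: {}},
		send: func(ts uint64, sids []int64) error {
			time.Sleep(10 * time.Millisecond) // pretend this is an RPC
			fmt.Println("sent tick", ts, "for segments", sids)
			return nil
		},
	}
	tk.ts = 42
	_ = tk.flushOnce(0)
}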

@@ -109,7 +109,6 @@ type DataCoordCatalog interface {
     AlterSegment(ctx context.Context, newSegment *datapb.SegmentInfo, oldSegment *datapb.SegmentInfo) error
     SaveDroppedSegmentsInBatch(ctx context.Context, segments []*datapb.SegmentInfo) error
     DropSegment(ctx context.Context, segment *datapb.SegmentInfo) error
-    RevertAlterSegmentsAndAddNewSegment(ctx context.Context, segments []*datapb.SegmentInfo, removalSegment *datapb.SegmentInfo) error

     MarkChannelAdded(ctx context.Context, channel string) error
     MarkChannelDeleted(ctx context.Context, channel string) error