mirror of https://github.com/milvus-io/milvus.git
parent
7aa0942b11
commit
f0bedc7046
|
@ -515,7 +515,7 @@ func (node *DataNode) ReadyToFlush() error {
|
|||
// If DataNode receives a valid segment to flush, new flush message for the segment should be ignored.
|
||||
// So if receiving calls to flush segment A, DataNode should guarantee the segment to be flushed.
|
||||
//
|
||||
// There are 1 precondition: The segmentID in req is in ascending order.
|
||||
// One precondition: The segmentID in req is in ascending order.
|
||||
func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsRequest) (*commonpb.Status, error) {
|
||||
metrics.DataNodeFlushSegmentsCounter.WithLabelValues(MetricRequestsTotal).Inc()
|
||||
status := &commonpb.Status{
|
||||
|
@ -528,23 +528,25 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
|
|||
}
|
||||
|
||||
log.Debug("Receive FlushSegments req",
|
||||
zap.Int("num", len(req.SegmentIDs)),
|
||||
zap.Int64("collectionID", req.GetCollectionID()), zap.Int("num", len(req.SegmentIDs)),
|
||||
zap.Int64s("segments", req.SegmentIDs),
|
||||
)
|
||||
|
||||
for _, id := range req.SegmentIDs {
|
||||
chanName := node.getChannelNamebySegmentID(id)
|
||||
log.Debug("vchannel", zap.String("name", chanName), zap.Int64("SegmentID", id))
|
||||
|
||||
if len(chanName) == 0 {
|
||||
log.Warn("DataNode not find segment", zap.Int64("ID", id))
|
||||
status.Reason = fmt.Sprintf("DataNode not find segment %d!", id)
|
||||
log.Warn("FlushSegments failed, cannot find segment in DataNode replica",
|
||||
zap.Int64("collectionID", req.GetCollectionID()), zap.Int64("segmentID", id))
|
||||
|
||||
status.Reason = fmt.Sprintf("DataNode replica not find segment %d!", id)
|
||||
return status, nil
|
||||
}
|
||||
|
||||
if node.segmentCache.checkIfCached(id) {
|
||||
// Segment in flushing or flushed, ignore
|
||||
log.Info("Segment in flushing, ignore it", zap.Int64("ID", id))
|
||||
// Segment in flushing, ignore
|
||||
log.Info("Segment flushing, ignore the flush request until flush is done.",
|
||||
zap.Int64("collectionID", req.GetCollectionID()), zap.Int64("segmentID", id))
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -555,6 +557,7 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
|
|||
node.chanMut.RUnlock()
|
||||
if !ok {
|
||||
status.Reason = "DataNode abnormal, restarting"
|
||||
log.Error("DataNode abnormal, no flushCh for a vchannel")
|
||||
return status, nil
|
||||
}
|
||||
|
||||
|
@ -572,8 +575,8 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
|
|||
flushChs.deleteBufferCh <- &deleteFlushMsg
|
||||
}
|
||||
|
||||
log.Debug("FlushSegments tasks triggered",
|
||||
zap.Int64s("segments", req.SegmentIDs))
|
||||
log.Debug("Flowgraph flushSegment tasks triggered",
|
||||
zap.Int64("collectionID", req.GetCollectionID()), zap.Int64s("segments", req.GetSegmentIDs()))
|
||||
|
||||
status.ErrorCode = commonpb.ErrorCode_Success
|
||||
metrics.DataNodeFlushSegmentsCounter.WithLabelValues(MetricRequestsSuccess).Inc()
|
||||
|
|
|
@ -25,6 +25,7 @@ import (
|
|||
|
||||
"github.com/milvus-io/milvus/internal/msgstream"
|
||||
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/metricsinfo"
|
||||
|
||||
|
@ -164,6 +165,52 @@ func TestDataNode(t *testing.T) {
|
|||
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("Test SetRootCoord", func(t *testing.T) {
|
||||
emptyDN := &DataNode{}
|
||||
tests := []struct {
|
||||
inrc types.RootCoord
|
||||
isvalid bool
|
||||
description string
|
||||
}{
|
||||
{nil, false, "nil input"},
|
||||
{&RootCoordFactory{}, true, "valid input"},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.description, func(t *testing.T) {
|
||||
err := emptyDN.SetRootCoord(test.inrc)
|
||||
if test.isvalid {
|
||||
assert.NoError(t, err)
|
||||
} else {
|
||||
assert.Error(t, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Test SetDataCoord", func(t *testing.T) {
|
||||
emptyDN := &DataNode{}
|
||||
tests := []struct {
|
||||
inrc types.DataCoord
|
||||
isvalid bool
|
||||
description string
|
||||
}{
|
||||
{nil, false, "nil input"},
|
||||
{&DataCoordFactory{}, true, "valid input"},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.description, func(t *testing.T) {
|
||||
err := emptyDN.SetDataCoord(test.inrc)
|
||||
if test.isvalid {
|
||||
assert.NoError(t, err)
|
||||
} else {
|
||||
assert.Error(t, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Test GetComponentStates", func(t *testing.T) {
|
||||
stat, err := node.GetComponentStates(node.ctx)
|
||||
assert.NoError(t, err)
|
||||
|
|
Loading…
Reference in New Issue