mirror of https://github.com/milvus-io/milvus.git
Fix datanode panic due to concurrent compaction and delete processing (#27167)
Co-authored-by: xiaofan-luan <xiaofan.luan@zilliz.com>
pull/27161/head
parent 0459a662e4
commit 4b2802033d
@@ -76,7 +76,7 @@ type Channel interface {
listNewSegmentsStartPositions() []*datapb.SegmentStartPosition
transferNewSegments(segmentIDs []UniqueID)
updateSegmentPKRange(segID UniqueID, ids storage.FieldData)
mergeFlushedSegments(ctx context.Context, seg *Segment, planID UniqueID, compactedFrom []UniqueID) error
mergeFlushedSegments(ctx context.Context, seg *Segment, planID UniqueID, compactedFrom []UniqueID)
listCompactedSegmentIDs() map[UniqueID][]UniqueID
listSegmentIDsToSync(ts Timestamp) []UniqueID
@@ -678,7 +678,7 @@ func (c *ChannelMeta) getCollectionSchema(collID UniqueID, ts Timestamp) (*schem
return c.collSchema, nil
}

func (c *ChannelMeta) mergeFlushedSegments(ctx context.Context, seg *Segment, planID UniqueID, compactedFrom []UniqueID) error {
func (c *ChannelMeta) mergeFlushedSegments(ctx context.Context, seg *Segment, planID UniqueID, compactedFrom []UniqueID) {
log := log.Ctx(ctx).With(
zap.Int64("segmentID", seg.segmentID),
zap.Int64("collectionID", seg.collectionID),

@@ -687,13 +687,6 @@ func (c *ChannelMeta) mergeFlushedSegments(ctx context.Context, seg *Segment, pl
zap.Int64("planID", planID),
zap.String("channelName", c.channelName))

if seg.collectionID != c.collectionID {
log.Warn("failed to mergeFlushedSegments, collection mismatch",
zap.Int64("current collection ID", seg.collectionID),
zap.Int64("expected collection ID", c.collectionID))
return merr.WrapErrParameterInvalid(c.collectionID, seg.collectionID, "collection not match")
}

var inValidSegments []UniqueID
for _, ID := range compactedFrom {
// no such segments in channel or the segments are unflushed.

@@ -711,12 +704,6 @@ func (c *ChannelMeta) mergeFlushedSegments(ctx context.Context, seg *Segment, pl
log.Info("merge flushed segments")
c.segMu.Lock()
defer c.segMu.Unlock()
select {
case <-ctx.Done():
log.Warn("the context has been closed", zap.Error(ctx.Err()))
return errors.New("invalid context")
default:
}

for _, ID := range compactedFrom {
// the existent of the segments are already checked

@@ -733,8 +720,6 @@ func (c *ChannelMeta) mergeFlushedSegments(ctx context.Context, seg *Segment, pl
seg.setType(datapb.SegmentType_Flushed)
c.segments[seg.segmentID] = seg
}

return nil
}

// for tests only
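The net effect of the hunks above is that mergeFlushedSegments no longer reports failures to its caller: mismatched or unknown compacted-from segments are logged and skipped, and the merge is applied under the segment mutex. A much-simplified sketch of that shape follows; the types, fields, and helper names are illustrative stand-ins, not the real ChannelMeta.

    package sketch

    import (
        "log"
        "sync"
    )

    type segment struct {
        id          int64
        flushed     bool
        compactedTo int64
    }

    type channelMeta struct {
        mu       sync.Mutex
        segments map[int64]*segment
    }

    // mergeFlushedSegments is best-effort: invalid inputs are logged and skipped,
    // nothing is reported back to the caller.
    func (c *channelMeta) mergeFlushedSegments(target *segment, compactedFrom []int64) {
        c.mu.Lock()
        defer c.mu.Unlock()

        for _, id := range compactedFrom {
            src, ok := c.segments[id]
            if !ok || !src.flushed {
                log.Printf("skip invalid compacted-from segment %d", id)
                continue
            }
            src.compactedTo = target.id // mark the source segment as compacted away
        }

        target.flushed = true
        c.segments[target.id] = target // register the compacted-to segment
    }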
@@ -676,12 +676,7 @@ func TestChannelMeta_InterfaceMethod(t *testing.T) {
require.False(t, channel.hasSegment(3, true))

// tests start
err := channel.mergeFlushedSegments(context.Background(), test.inSeg, 100, test.inCompactedFrom)
if test.isValid {
assert.NoError(t, err)
} else {
assert.Error(t, err)
}
channel.mergeFlushedSegments(context.Background(), test.inSeg, 100, test.inCompactedFrom)

if test.stored {
assert.True(t, channel.hasSegment(3, true))

@@ -60,15 +60,8 @@ import (
)

const (
// RPCConnectionTimeout is used to set the timeout for rpc request
RPCConnectionTimeout = 30 * time.Second

// ConnectEtcdMaxRetryTime is used to limit the max retry time for connection etcd
ConnectEtcdMaxRetryTime = 100

// ImportCallTimeout is the timeout used in Import() method calls
// This value is equal to RootCoord's task expire time
ImportCallTimeout = 15 * 60 * time.Second
)

var getFlowGraphServiceAttempts = uint(50)

@@ -100,6 +100,7 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
log.Debug("Buffer delete request in DataNode", zap.String("traceID", traceID))
tmpSegIDs, err := dn.bufferDeleteMsg(msg, fgMsg.timeRange, fgMsg.startPositions[0], fgMsg.endPositions[0])
if err != nil {
// should not happen
// error occurs only when deleteMsg is misaligned, should not happen
log.Fatal("failed to buffer delete msg", zap.String("traceID", traceID), zap.Error(err))
}
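The rewritten comment narrows the failure mode: bufferDeleteMsg can only fail when the delete message itself is malformed ("misaligned"), which is why the node treats the error as fatal rather than recoverable. As a purely hypothetical illustration of what such a misalignment check might look like (field and function names are invented for the sketch):

    package sketch

    import "fmt"

    // validateDeleteMsg is a hypothetical guard illustrating what "misaligned"
    // means: the primary keys and timestamps of a delete message must pair up.
    func validateDeleteMsg(pks []int64, timestamps []uint64) error {
        if len(pks) != len(timestamps) {
            return fmt.Errorf("misaligned delete message: %d pks vs %d timestamps",
                len(pks), len(timestamps))
        }
        return nil
    }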
@@ -22,6 +22,7 @@ import (
"math"
"reflect"

"github.com/cockroachdb/errors"
"github.com/golang/protobuf/proto"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"

@@ -485,6 +486,9 @@ func (ibNode *insertBufferNode) Sync(fgMsg *flowGraphMsg, seg2Upload []UniqueID,
task.dropped,
endPosition)
if err != nil {
if errors.Is(err, merr.ErrSegmentNotFound) {
return retry.Unrecoverable(err)
}
return err
}
return nil
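The flush closure above evidently runs under the retry helper: returning retry.Unrecoverable tells that loop to stop immediately once the segment has been compacted away, instead of retrying a flush that can never succeed. A minimal sketch of the pattern, reusing the retry and merr helpers that appear in this hunk; the import paths and the flushOnce callback are assumptions for the sketch.

    package sketch

    import (
        "context"

        "github.com/cockroachdb/errors"

        "github.com/milvus-io/milvus/pkg/util/merr"
        "github.com/milvus-io/milvus/pkg/util/retry"
    )

    // syncWithRetry keeps retrying a flush, but gives up permanently once the
    // segment is known to be gone (e.g. compacted away while data was buffered).
    func syncWithRetry(ctx context.Context, flushOnce func() error) error {
        return retry.Do(ctx, func() error {
            if err := flushOnce(); err != nil {
                if errors.Is(err, merr.ErrSegmentNotFound) {
                    // retrying cannot succeed: stop the retry loop right away
                    return retry.Unrecoverable(err)
                }
                return err // transient failure: let the next attempt run
            }
            return nil
        })
    }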
@@ -497,6 +501,15 @@ func (ibNode *insertBufferNode) Sync(fgMsg *flowGraphMsg, seg2Upload []UniqueID,
metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.TotalLabel).Inc()
}

if errors.Is(err, merr.ErrSegmentNotFound) {
if !segment.isValid() {
log.Info("try to flush a compacted segment, ignore..",
zap.Int64("segmentID", task.segmentID),
zap.Error(err))
}
continue
}

if merr.IsCanceledOrTimeout(err) {
log.Warn("skip syncing buffer data for context done",
zap.Int64("segmentID", task.segmentID),

@@ -504,6 +517,7 @@ func (ibNode *insertBufferNode) Sync(fgMsg *flowGraphMsg, seg2Upload []UniqueID,
)
continue
}

log.Fatal("insertBufferNode failed to flushBufferData",
zap.Int64("segmentID", task.segmentID),
zap.Error(err),
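Taken together, the two hunks above give the sync loop three explicit outcomes for a failed task: skip segments that were compacted away, skip (and re-sync later) when the context is canceled or the RPC timed out, and treat anything else as fatal. A condensed sketch of that triage; the helper is a simplified stand-in for the per-task branch in Sync.

    package sketch

    import (
        "log"

        "github.com/cockroachdb/errors"

        "github.com/milvus-io/milvus/pkg/util/merr"
    )

    // triageFlushError mirrors the branching above. It returns true when the
    // failed task should simply be skipped; log.Fatalf stands in for the
    // zap-based log.Fatal used by the real node.
    func triageFlushError(err error) (skip bool) {
        switch {
        case err == nil:
            return false // flushed successfully
        case errors.Is(err, merr.ErrSegmentNotFound):
            // the segment was compacted away while its buffer was in flight
            return true
        case merr.IsCanceledOrTimeout(err):
            // shutdown or RPC deadline: the checkpoint has not advanced, so the
            // buffered data will be re-synced after recovery
            return true
        default:
            log.Fatalf("insertBufferNode failed to flushBufferData: %v", err)
            return false
        }
    }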
@@ -21,8 +21,10 @@ import (
"fmt"
"math"
"reflect"
"sync"
"time"

"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"

@@ -48,8 +50,17 @@ type ttNode struct {
BaseNode
vChannelName string
channel Channel
lastUpdateTime time.Time
lastUpdateTime *atomic.Time
dataCoord types.DataCoord

updateCPLock sync.Mutex
notifyChannel chan checkPoint
closeChannel chan struct{}
}

type checkPoint struct {
curTs time.Time
pos *msgpb.MsgPosition
}

// Name returns node name, implementing flowgraph.Node
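lastUpdateTime changes from a plain time.Time to *atomic.Time because it is now touched from two goroutines: the flowgraph's Operate path and the new checkpoint-updater goroutine. go.uber.org/atomic wraps the value so both sides can read and write it without a data race; a minimal sketch of the Load/Store usage:

    package main

    import (
        "fmt"
        "time"

        "go.uber.org/atomic"
    )

    func main() {
        // zero value, so the very first check triggers an immediate update
        lastUpdate := atomic.NewTime(time.Time{})

        // reader side, e.g. the Operate path deciding whether to notify
        if time.Since(lastUpdate.Load()) >= 10*time.Second {
            fmt.Println("time to update the channel checkpoint")
        }

        // writer side, e.g. the updater goroutine after a successful RPC
        lastUpdate.Store(time.Now())
    }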
@@ -72,43 +83,45 @@ func (ttn *ttNode) IsValidInMsg(in []Msg) bool {
// Operate handles input messages, implementing flowgraph.Node
func (ttn *ttNode) Operate(in []Msg) []Msg {
fgMsg := in[0].(*flowGraphMsg)
curTs, _ := tsoutil.ParseTS(fgMsg.timeRange.timestampMax)
if fgMsg.IsCloseMsg() {
if len(fgMsg.endPositions) > 0 {
close(ttn.closeChannel)
channelPos := ttn.channel.getChannelCheckpoint(fgMsg.endPositions[0])
log.Info("flowgraph is closing, force update channel CP",
zap.Time("cpTs", tsoutil.PhysicalTime(channelPos.GetTimestamp())),
zap.String("channel", channelPos.GetChannelName()))
ttn.updateChannelCP(channelPos)
ttn.updateChannelCP(channelPos, curTs)
}
return in
}

curTs, _ := tsoutil.ParseTS(fgMsg.timeRange.timestampMax)
// Do not block and async updateCheckPoint
channelPos := ttn.channel.getChannelCheckpoint(fgMsg.endPositions[0])
log := log.With(zap.String("channel", ttn.vChannelName),
zap.Time("cpTs", tsoutil.PhysicalTime(channelPos.GetTimestamp())))
if curTs.Sub(ttn.lastUpdateTime) >= updateChanCPInterval {
if err := ttn.updateChannelCP(channelPos); err == nil {
ttn.lastUpdateTime = curTs
log.Info("update channel cp periodically")
return []Msg{}
nonBlockingNotify := func() {
select {
case ttn.notifyChannel <- checkPoint{curTs, channelPos}:
default:
}
}

if curTs.Sub(ttn.lastUpdateTime.Load()) >= updateChanCPInterval {
nonBlockingNotify()
return []Msg{}
}

if channelPos.GetTimestamp() >= ttn.channel.getFlushTs() {
if err := ttn.updateChannelCP(channelPos); err == nil {
ttn.lastUpdateTime = curTs
log.Info("update channel cp at updateTs", zap.Time("updateTs", tsoutil.PhysicalTime(ttn.channel.getFlushTs())))
ttn.channel.setFlushTs(math.MaxUint64)
}
nonBlockingNotify()
}

return []Msg{}
}

func (ttn *ttNode) updateChannelCP(channelPos *msgpb.MsgPosition) error {
channelCPTs, _ := tsoutil.ParseTS(channelPos.GetTimestamp())
func (ttn *ttNode) updateChannelCP(channelPos *msgpb.MsgPosition, curTs time.Time) error {
ttn.updateCPLock.Lock()
defer ttn.updateCPLock.Unlock()

channelCPTs, _ := tsoutil.ParseTS(channelPos.GetTimestamp())
// TODO, change to ETCD operation, avoid datacoord operation
ctx, cancel := context.WithTimeout(context.Background(), updateChanCPTimeout)
defer cancel()
resp, err := ttn.dataCoord.UpdateChannelCheckpoint(ctx, &datapb.UpdateChannelCheckpointRequest{
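nonBlockingNotify is what keeps Operate fast: notifyChannel has capacity 1, and the select with a default branch drops a notification when one is already queued, so a slow UpdateChannelCheckpoint RPC can never stall the timetick path. The same pattern in isolation (names are illustrative):

    package main

    import "fmt"

    type checkPoint struct{ seq int }

    func main() {
        notify := make(chan checkPoint, 1) // single slot: at most one pending update

        send := func(cp checkPoint) {
            select {
            case notify <- cp: // slot was free, notification queued
            default: // an update is already pending; skip, the next tick notifies again
            }
        }

        send(checkPoint{seq: 1})
        send(checkPoint{seq: 2}) // dropped: the consumer has not drained the slot yet

        fmt.Println(<-notify) // prints {1}
    }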
@@ -124,6 +137,11 @@ func (ttn *ttNode) updateChannelCP(channelPos *msgpb.MsgPosition) error {
return err
}

ttn.lastUpdateTime.Store(curTs)
// channelPos ts > flushTs means we could stop flush.
if channelPos.GetTimestamp() >= ttn.channel.getFlushTs() {
ttn.channel.setFlushTs(math.MaxUint64)
}
log.Info("UpdateChannelCheckpoint success",
zap.String("channel", ttn.vChannelName),
zap.Uint64("cpTs", channelPos.GetTimestamp()),

@@ -140,9 +158,23 @@ func newTTNode(config *nodeConfig, dc types.DataCoord) (*ttNode, error) {
BaseNode: baseNode,
vChannelName: config.vChannelName,
channel: config.channel,
lastUpdateTime: time.Time{}, // set to Zero to update channel checkpoint immediately after fg started
lastUpdateTime: atomic.NewTime(time.Time{}), // set to Zero to update channel checkpoint immediately after fg started
dataCoord: dc,
notifyChannel: make(chan checkPoint, 1),
closeChannel: make(chan struct{}),
}

// check point updater
go func() {
for {
select {
case <-tt.closeChannel:
return
case cp := <-tt.notifyChannel:
tt.updateChannelCP(cp.pos, cp.curTs)
}
}
}()

return tt, nil
}
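The constructor now starts a dedicated checkpoint-updater goroutine: it blocks on notifyChannel, performs the (possibly slow) checkpoint update, and exits once closeChannel is closed by the close path in Operate. A standalone sketch of that consumer loop and its shutdown, with updateCheckpoint standing in for updateChannelCP:

    package main

    import (
        "fmt"
        "time"
    )

    type checkPoint struct{ ts time.Time }

    func main() {
        notify := make(chan checkPoint, 1)
        done := make(chan struct{})

        updateCheckpoint := func(cp checkPoint) { // stand-in for the DataCoord RPC
            fmt.Println("checkpoint updated at", cp.ts)
        }

        // background updater, mirroring the goroutine started in newTTNode
        go func() {
            for {
                select {
                case <-done: // closed when the flowgraph receives its close message
                    return
                case cp := <-notify:
                    updateCheckpoint(cp)
                }
            }
        }()

        notify <- checkPoint{ts: time.Now()}
        time.Sleep(50 * time.Millisecond) // give the updater a chance to run
        close(done)                       // stop the background goroutine
    }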
@@ -578,7 +578,7 @@ func (m *rendezvousFlushManager) injectFlush(injection *taskInjection, segments
// fetch meta info for segment
func (m *rendezvousFlushManager) getSegmentMeta(segmentID UniqueID, pos *msgpb.MsgPosition) (UniqueID, UniqueID, *etcdpb.CollectionMeta, error) {
if !m.hasSegment(segmentID, true) {
return -1, -1, nil, fmt.Errorf("no such segment %d in the channel", segmentID)
return -1, -1, nil, merr.WrapErrSegmentNotFound(segmentID, "segment not found during flush")
}

// fetch meta information of segment
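Swapping fmt.Errorf for merr.WrapErrSegmentNotFound is what makes the earlier sync-path branch possible: callers can now test the failure with errors.Is instead of matching message text. A small sketch, assuming the merr import path; the hasSegment callback is a stand-in.

    package main

    import (
        "fmt"

        "github.com/cockroachdb/errors"

        "github.com/milvus-io/milvus/pkg/util/merr"
    )

    func getSegmentMeta(segmentID int64, hasSegment func(int64) bool) error {
        if !hasSegment(segmentID) {
            // typed error: callers can branch on it with errors.Is
            return merr.WrapErrSegmentNotFound(segmentID, "segment not found during flush")
        }
        return nil
    }

    func main() {
        err := getSegmentMeta(42, func(int64) bool { return false })
        if errors.Is(err, merr.ErrSegmentNotFound) {
            fmt.Println("segment is gone (likely compacted), skip instead of failing hard")
        }
    }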
@@ -362,6 +362,8 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
oneSegment int64
channel Channel
err error
ds *dataSyncService
ok bool
)

for _, fromSegment := range req.GetCompactedFrom() {

@@ -370,6 +372,11 @@
log.Ctx(ctx).Warn("fail to get the channel", zap.Int64("segment", fromSegment), zap.Error(err))
continue
}
ds, ok = node.flowgraphManager.getFlowgraphService(channel.getChannelName(fromSegment))
if !ok {
log.Ctx(ctx).Warn("fail to find flow graph service", zap.Int64("segment", fromSegment))
continue
}
oneSegment = fromSegment
break
}

@@ -392,9 +399,9 @@
return merr.Status(err), nil
}

if err := channel.mergeFlushedSegments(ctx, targetSeg, req.GetPlanID(), req.GetCompactedFrom()); err != nil {
return merr.Status(err), nil
}
ds.fg.Blockall()
defer ds.fg.Unblock()
channel.mergeFlushedSegments(ctx, targetSeg, req.GetPlanID(), req.GetCompactedFrom())
node.compactionExecutor.injectDone(req.GetPlanID(), true)
return merr.Status(nil), nil
}
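This is the heart of the panic fix: SyncSegments now blocks the whole flowgraph before swapping segment metadata, so the delete and insert nodes cannot be mid-Operate on segments that the compaction merge is rewriting, and it unblocks only after the merge and injectDone complete. The same idea in miniature, using a plain sync.RWMutex to show why blocking waits for in-flight work; this is a sketch, not the actual flowgraph implementation.

    package main

    import (
        "fmt"
        "sync"
    )

    type channelState struct {
        pipeline sync.RWMutex     // stands in for the flowgraph's per-node block mutex
        segments map[int64]string // stands in for the channel's segment metadata
    }

    // operate models a flowgraph node processing one message: it holds a read
    // lock for the duration, so metadata cannot change underneath it.
    func (c *channelState) operate(segID int64) {
        c.pipeline.RLock()
        defer c.pipeline.RUnlock()
        fmt.Println("processing deletes against", c.segments[segID])
    }

    // syncSegments models the fixed SyncSegments: the write lock only succeeds
    // once every in-flight operate() has finished, and keeps new ones out until
    // the compacted segments have been merged.
    func (c *channelState) syncSegments(compactedTo int64, compactedFrom []int64) {
        c.pipeline.Lock()         // ds.fg.Blockall()
        defer c.pipeline.Unlock() // defer ds.fg.Unblock()

        for _, id := range compactedFrom {
            delete(c.segments, id)
        }
        c.segments[compactedTo] = "merged segment"
    }

    func main() {
        c := &channelState{segments: map[int64]string{1: "seg-1", 2: "seg-2"}}
        c.operate(1)
        c.syncSegments(102, []int64{1, 2})
        c.operate(102)
    }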
@@ -701,13 +701,7 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
CompactedTo: 102,
NumOfRows: 100,
}
cancelCtx, cancel := context.WithCancel(context.Background())
cancel()
status, err := s.node.SyncSegments(cancelCtx, req)
s.Assert().NoError(err)
s.Assert().False(merr.Ok(status))

status, err = s.node.SyncSegments(s.ctx, req)
status, err := s.node.SyncSegments(s.ctx, req)
s.Assert().NoError(err)
s.Assert().True(merr.Ok(status))

@@ -31,6 +31,8 @@ const (
// TODO: better to be configured
nodeCtxTtInterval = 2 * time.Minute
enableTtChecker = true
// blockAll should wait no more than 10 seconds
blockAllWait = 10 * time.Second
)

// Node is the interface defines the behavior of flowgraph

@@ -74,7 +76,13 @@ func (nodeCtx *nodeCtx) Start() {
func (nodeCtx *nodeCtx) Block() {
// input node operate function will be blocking
if !nodeCtx.node.IsInputNode() {
startTs := time.Now()
nodeCtx.blockMutex.Lock()
if time.Since(startTs) >= blockAllWait {
log.Warn("flow graph wait for long time",
zap.String("name", nodeCtx.node.Name()),
zap.Duration("wait time", time.Since(startTs)))
}
}
}
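Because Blockall now sits on the SyncSegments path, the flowgraph hunk adds a blockAllWait budget and logs a warning when acquiring the block takes longer than that, making a stuck or slow node visible. A tiny sketch of the measurement; the mutex, node name, and logger here are stand-ins for the real nodeCtx fields.

    package main

    import (
        "log"
        "sync"
        "time"
    )

    const blockAllWait = 10 * time.Second // budget: blocking should not take longer

    func block(mu *sync.Mutex, name string) {
        start := time.Now()
        mu.Lock() // waits for any in-flight Operate() to release the node
        if wait := time.Since(start); wait >= blockAllWait {
            log.Printf("flow graph %s waited %v to block, longer than expected", name, wait)
        }
    }

    func main() {
        var mu sync.Mutex
        block(&mu, "ddNode")
        mu.Unlock()
    }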
@@ -134,3 +134,13 @@ func TestContextCancel(t *testing.T) {
assert.True(t, merr.IsCanceledOrTimeout(err))
t.Log(err)
}

func TestWrap(t *testing.T) {
err := merr.WrapErrSegmentNotFound(1, "failed to get Segment")
assert.True(t, errors.Is(err, merr.ErrSegmentNotFound))
assert.True(t, IsRecoverable(err))
err2 := Unrecoverable(err)
fmt.Println(err2)
assert.True(t, errors.Is(err2, merr.ErrSegmentNotFound))
assert.False(t, IsRecoverable(err2))
}