mirror of https://github.com/milvus-io/milvus.git
FixBug: Incorrect handling of time synchronization (#6814)
* FixBug: proxy sent a wrong sync timetick and DataCoord did not filter by channel

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
parent 3cc8ee298e
commit 8a1a841011
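This commit fixes two coupled time-synchronization bugs. In DataCoord, GetFlushableSegments, ExpireAllocations, and tryToSealSegment were triggered by a per-channel DataNode timetick yet scanned every segment, so a tick on one channel could flush, expire, or seal segments that belong to another. In the proxy, channelsTimeTickerImpl reported a minimum timestamp that was looser than necessary for the DML tasks in flight on a channel, and channels were registered with the ticker during task execution rather than at enqueue time. The hunks below address all of these; the commented-out test file is also adjusted so its random channel count cannot exceed the number of channels supplied.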
@@ -376,14 +376,14 @@ func (s *SegmentManager) GetFlushableSegments(ctx context.Context, channel strin
 	defer s.mu.Unlock()
 	sp, _ := trace.StartSpanFromContext(ctx)
 	defer sp.Finish()
-	if err := s.tryToSealSegment(t); err != nil {
+	if err := s.tryToSealSegment(t, channel); err != nil {
 		return nil, err
 	}

 	ret := make([]UniqueID, 0, len(s.segments))
 	for _, id := range s.segments {
 		info := s.meta.GetSegment(id)
-		if info == nil {
+		if info == nil || info.InsertChannel != channel {
 			continue
 		}
 		if s.flushPolicy(info, t) {
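The shape of this fix is easy to see in isolation. The following standalone sketch (simplified, hypothetical types; DataCoord's real SegmentInfo carries far more state) shows why the InsertChannel check matters: a DataNode timetick arrives per channel, so only segments bound to that channel may be reported as flushable.

package main

import "fmt"

// Illustrative stand-ins for DataCoord's types, not the real definitions.
type UniqueID = int64

type SegmentInfo struct {
	ID            UniqueID
	InsertChannel string
}

// flushableOn mirrors the fixed lookup: skip segments that are missing or
// bound to a different channel. Without the InsertChannel check, a tick on
// one channel could flush segments still receiving data on another.
func flushableOn(segments []*SegmentInfo, channel string) []UniqueID {
	ret := make([]UniqueID, 0, len(segments))
	for _, info := range segments {
		if info == nil || info.InsertChannel != channel {
			continue
		}
		ret = append(ret, info.ID)
	}
	return ret
}

func main() {
	segs := []*SegmentInfo{
		{ID: 1, InsertChannel: "by-dev-dml-0"},
		{ID: 2, InsertChannel: "by-dev-dml-1"},
	}
	fmt.Println(flushableOn(segs, "by-dev-dml-0")) // [1]
}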
@@ -400,7 +400,7 @@ func (s *SegmentManager) ExpireAllocations(channel string, ts Timestamp) error {
 	defer s.mu.Unlock()
 	for _, id := range s.segments {
 		segment := s.meta.GetSegment(id)
-		if segment == nil {
+		if segment == nil || segment.InsertChannel != channel {
 			continue
 		}
 		for i := 0; i < len(segment.allocations); i++ {
@@ -416,12 +416,11 @@ func (s *SegmentManager) ExpireAllocations(channel string, ts Timestamp) error {
 }

 // tryToSealSegment applies segment & channel seal policies
-func (s *SegmentManager) tryToSealSegment(ts Timestamp) error {
+func (s *SegmentManager) tryToSealSegment(ts Timestamp, channel string) error {
 	channelInfo := make(map[string][]*SegmentInfo)
 	for _, id := range s.segments {
 		info := s.meta.GetSegment(id)
-		if info == nil {
-			log.Warn("Failed to get seg info from meta", zap.Int64("id", id))
+		if info == nil || info.InsertChannel != channel {
 			continue
 		}
 		channelInfo[info.InsertChannel] = append(channelInfo[info.InsertChannel], info)
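The same per-channel filter is applied when sealing, and the log.Warn for a nil segment info disappears along with it: once segments from other channels are skipped by design, a nil entry is treated as just another segment to pass over rather than a condition worth warning about.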
@@ -335,8 +335,6 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {

 			ch := ttMsg.ChannelName
 			ts := ttMsg.Timestamp
-			// log.Debug("Receive datanode timetick msg", zap.String("channel", ch),
-			// 	zap.Any("ts", ts))
 			segments, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
 			if err != nil {
 				log.Warn("get flushable segments failed", zap.Error(err))
@@ -96,7 +96,6 @@ func (ticker *channelsTimeTickerImpl) tick() error {
 		log.Warn("Proxy channelsTimeTickerImpl failed to get ts from tso", zap.Error(err))
 		return err
 	}
-	//nowPTime, _ := tsoutil.ParseTS(now)

 	ticker.statisticsMtx.Lock()
 	defer ticker.statisticsMtx.Unlock()
@@ -111,36 +110,19 @@
 	for pchan := range ticker.currents {
 		current := ticker.currents[pchan]
-		//currentPTime, _ := tsoutil.ParseTS(current)
 		stat, ok := stats[pchan]
-		//log.Debug("Proxy channelsTimeTickerImpl", zap.Any("pchan", pchan),
-		//	zap.Any("TaskInQueue", ok),
-		//	zap.Any("current", currentPTime),
-		//	zap.Any("now", nowPTime))

 		if !ok {
 			ticker.minTsStatistics[pchan] = current
 			ticker.currents[pchan] = now
 		} else {
-			//minPTime, _ := tsoutil.ParseTS(stat.minTs)
-			//maxPTime, _ := tsoutil.ParseTS(stat.maxTs)

 			if stat.minTs > current {
-				cur := current
-				if stat.minTs > now {
-					cur = now
-				}
-				ticker.minTsStatistics[pchan] = cur
+				ticker.minTsStatistics[pchan] = stat.minTs - 1
 				next := now + Timestamp(sendTimeTickMsgInterval)
 				if next > stat.maxTs {
 					next = stat.maxTs
 				}
 				ticker.currents[pchan] = next
-				//nextPTime, _ := tsoutil.ParseTS(next)
-				//log.Debug("Proxy channelsTimeTickerImpl",
-				//	zap.Any("pchan", pchan),
-				//	zap.Any("minPTime", minPTime),
-				//	zap.Any("maxPTime", maxPTime),
-				//	zap.Any("nextPTime", nextPTime))
 			}
 		}
 	}
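This is the heart of the proxy-side fix: the value chosen for minTsStatistics. A sync timetick on a channel is a promise that no DML message with a timestamp at or below the tick will follow, so with tasks in flight, stat.minTs - 1 is the largest value that keeps that promise, while the old clamp to current (or now) could lag arbitrarily far behind the real lower bound. A minimal sketch of the rule, with illustrative names rather than the proxy's actual API:

package main

import "fmt"

// Timestamp mirrors the hybrid timestamps used by the ticker; pChanStatistics
// and pickMinTs are illustrative stand-ins, not Milvus APIs.
type Timestamp = uint64

type pChanStatistics struct {
	minTs Timestamp // smallest timestamp among in-flight DML tasks on the channel
	maxTs Timestamp // largest timestamp among in-flight DML tasks on the channel
}

// pickMinTs returns the timetick to report for a channel: once the in-flight
// statistics have moved past the previously reported value, minTs-1 is the
// tightest timestamp that is still strictly below every pending DML message.
func pickMinTs(stat pChanStatistics, current Timestamp) Timestamp {
	if stat.minTs > current {
		return stat.minTs - 1
	}
	return current
}

func main() {
	stat := pChanStatistics{minTs: 105, maxTs: 120}
	fmt.Println(pickMinTs(stat, 100)) // 104: safe, and as fresh as possible
	fmt.Println(pickMinTs(stat, 110)) // 110: statistics have not advanced past current
}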
@@ -190,7 +172,6 @@ func (ticker *channelsTimeTickerImpl) close() error {

 func (ticker *channelsTimeTickerImpl) addPChan(pchan pChan) error {
 	ticker.statisticsMtx.Lock()
-
 	if _, ok := ticker.minTsStatistics[pchan]; ok {
 		ticker.statisticsMtx.Unlock()
 		return fmt.Errorf("pChan %v already exist in minTsStatistics", pchan)
@@ -11,7 +11,6 @@
-
 package proxy

 /*
 import (
 	"context"
 	"math/rand"
@@ -38,7 +37,8 @@ func newMockTsoAllocator() *mockTsoAllocator {
 }

 func newGetStatisticsFunc(pchans []pChan) getPChanStatisticsFuncType {
-	pchanNum := rand.Uint64()%5 + 1
+	totalPchan := len(pchans)
+	pchanNum := rand.Uint64()%(uint64(totalPchan)) + 1
 	pchans2 := make([]pChan, 0, pchanNum)
 	for i := 0; uint64(i) < pchanNum; i++ {
 		pchans2 = append(pchans2, pchans[i])
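This test-helper change fixes a latent panic rather than the sync bug itself: rand.Uint64()%5 + 1 yields 1 to 5 regardless of how many channels the caller passed in, so the copy loop could index past the end of pchans. A hypothetical reduction of the bug and its fix:

package main

import (
	"fmt"
	"math/rand"
)

// pickChannels is a hypothetical reduction of newGetStatisticsFunc's setup.
// The old bound, rand.Uint64()%5+1, was independent of len(pchans), so with
// fewer than five channels the append loop panicked with index out of range.
func pickChannels(pchans []string) []string {
	pchanNum := rand.Uint64()%uint64(len(pchans)) + 1 // fixed: never exceeds len(pchans)
	picked := make([]string, 0, pchanNum)
	for i := 0; uint64(i) < pchanNum; i++ {
		picked = append(picked, pchans[i])
	}
	return picked
}

func main() {
	fmt.Println(pickChannels([]string{"ch-0", "ch-1"})) // 1 or 2 channels, never a panic
}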
@@ -223,4 +223,3 @@ func TestChannelsTimeTickerImpl_getMinTsStatistics(t *testing.T) {
-
 	time.Sleep(time.Second)
 }
 */
@@ -99,6 +99,7 @@ type dmlTask interface {
 	task
 	getChannels() ([]vChan, error)
 	getPChanStats() (map[pChan]pChanStatistics, error)
+	getChannelsTimerTicker() channelsTimeTicker
 }

 type BaseInsertTask = msgstream.InsertMsg
@@ -153,6 +154,10 @@ func (it *InsertTask) EndTs() Timestamp {
 	return it.EndTimestamp
 }

+func (it *InsertTask) getChannelsTimerTicker() channelsTimeTicker {
+	return it.chTicker
+}
+
 func (it *InsertTask) getPChanStats() (map[pChan]pChanStatistics, error) {
 	ret := make(map[pChan]pChanStatistics)
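These two task.go hunks thread the ticker through the DML path: dmlTask gains getChannelsTimerTicker, and InsertTask satisfies it by exposing its chTicker, so the task queue can register channels with the ticker without holding its own reference to it.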
@@ -1025,15 +1030,6 @@ func (it *InsertTask) Execute(ctx context.Context) error {
 		}
 	}

-	pchans, err := it.chMgr.getChannels(collID)
-	if err != nil {
-		return err
-	}
-	for _, pchan := range pchans {
-		log.Debug("Proxy InsertTask add pchan", zap.Any("pchan", pchan))
-		_ = it.chTicker.addPChan(pchan)
-	}
-
 	// Assign SegmentID
 	var pack *msgstream.MsgPack
 	pack, err = it._assignSegmentID(stream, &msgPack)
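With this hunk, Execute no longer registers pchans with the ticker. Registration moves into DmTaskQueue.addPChanStats (next hunks), which Enqueue already calls, so a channel becomes known to the ticker as soon as a task carrying it is enqueued, rather than at some later point during execution while its timestamps are already being counted.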
@@ -224,7 +224,6 @@ func (queue *DmTaskQueue) Enqueue(t task) error {
 	if err != nil {
 		return err
 	}
-
 	_ = queue.addPChanStats(t)

 	return nil
@@ -248,10 +247,10 @@ func (queue *DmTaskQueue) addPChanStats(t task) error {
 	if dmT, ok := t.(dmlTask); ok {
 		stats, err := dmT.getPChanStats()
 		if err != nil {
-			log.Debug("Proxy DmTaskQueue addPChanStats", zap.Any("tID", t.ID()),
-				zap.Any("stats", stats), zap.Error(err))
 			return err
 		}
+		log.Debug("Proxy DmTaskQueue addPChanStats", zap.Any("tID", t.ID()),
+			zap.Any("stats", stats))
 		queue.statsLock.Lock()
 		for cName, stat := range stats {
 			info, ok := queue.pChanStatisticsInfos[cName]
@@ -263,6 +262,7 @@ func (queue *DmTaskQueue) addPChanStats(t task) error {
 				},
 			}
 			queue.pChanStatisticsInfos[cName] = info
+			dmT.getChannelsTimerTicker().addPChan(cName)
 		} else {
 			if info.minTs > stat.minTs {
 				queue.pChanStatisticsInfos[cName].minTs = stat.minTs
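Note where the added call sits: addPChan fires only in the branch where a channel appears in pChanStatisticsInfos for the first time, and under statsLock, so ticker registration and the channel's first minTs/maxTs record are installed together.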
@@ -275,7 +275,7 @@ func (queue *DmTaskQueue) addPChanStats(t task) error {
 		}
 		queue.statsLock.Unlock()
 	} else {
-		return fmt.Errorf("Proxy addUnissuedTask reflect to dmlTask failed, tID:%v", t.ID())
+		return fmt.Errorf("proxy addUnissuedTask reflect to dmlTask failed, tID:%v", t.ID())
 	}
 	return nil
 }