Refactor proxy ticker to fix guarantee_time not work (#11248)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/11287/head
bigsheeper 2021-11-05 09:14:02 +08:00 committed by GitHub
parent e26d4a72b4
commit c99d1a56bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 41 additions and 148 deletions

View File

@ -13,14 +13,12 @@ package proxy
import (
"context"
"fmt"
"sync"
"time"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
)
// ticker can update ts only when the minTs greater than the ts of ticker, we can use maxTs to update current later
@ -30,10 +28,8 @@ type getPChanStatisticsFuncType func() (map[pChan]*pChanStatistics, error)
type channelsTimeTicker interface {
start() error
close() error
addPChan(pchan pChan) error
removePChan(pchan pChan) error
getLastTick(pchan pChan) (Timestamp, error)
getMinTsStatistics() (map[pChan]Timestamp, error)
getMinTsStatistics() (map[pChan]Timestamp, Timestamp, error)
getMinTick() Timestamp
}
@ -44,13 +40,14 @@ type channelsTimeTickerImpl struct {
getStatisticsFunc getPChanStatisticsFuncType
tso tsoAllocator
currents map[pChan]Timestamp
currentsMtx sync.RWMutex
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
defaultTimestamp Timestamp
minTimestamp Timestamp
}
func (ticker *channelsTimeTickerImpl) getMinTsStatistics() (map[pChan]Timestamp, error) {
func (ticker *channelsTimeTickerImpl) getMinTsStatistics() (map[pChan]Timestamp, Timestamp, error) {
ticker.statisticsMtx.RLock()
defer ticker.statisticsMtx.RUnlock()
@ -60,7 +57,7 @@ func (ticker *channelsTimeTickerImpl) getMinTsStatistics() (map[pChan]Timestamp,
ret[k] = v
}
}
return ret, nil
return ret, ticker.defaultTimestamp, nil
}
func (ticker *channelsTimeTickerImpl) initStatistics() {
@ -73,9 +70,6 @@ func (ticker *channelsTimeTickerImpl) initStatistics() {
}
func (ticker *channelsTimeTickerImpl) initCurrents(current Timestamp) {
ticker.currentsMtx.Lock()
defer ticker.currentsMtx.Unlock()
for pchan := range ticker.currents {
ticker.currents[pchan] = current
}
@ -97,16 +91,16 @@ func (ticker *channelsTimeTickerImpl) tick() error {
ticker.statisticsMtx.Lock()
defer ticker.statisticsMtx.Unlock()
ticker.currentsMtx.Lock()
defer ticker.currentsMtx.Unlock()
ticker.defaultTimestamp = now
minTs := now
for pchan := range ticker.currents {
current := ticker.currents[pchan]
stat, ok := stats[pchan]
if !ok {
ticker.minTsStatistics[pchan] = current
ticker.currents[pchan] = now
delete(ticker.minTsStatistics, pchan)
delete(ticker.currents, pchan)
} else {
if stat.minTs > current {
ticker.minTsStatistics[pchan] = stat.minTs - 1
@ -116,9 +110,25 @@ func (ticker *channelsTimeTickerImpl) tick() error {
}
ticker.currents[pchan] = next
}
lastMin := ticker.minTsStatistics[pchan]
if minTs > lastMin {
minTs = lastMin
}
}
}
for pchan, value := range stats {
_, ok := ticker.currents[pchan]
if !ok {
ticker.minTsStatistics[pchan] = value.minTs - 1
ticker.currents[pchan] = now
}
if minTs > value.minTs-1 {
minTs = value.minTs - 1
}
}
ticker.minTimestamp = minTs
return nil
}
@ -162,53 +172,13 @@ func (ticker *channelsTimeTickerImpl) close() error {
return nil
}
func (ticker *channelsTimeTickerImpl) addPChan(pchan pChan) error {
ticker.statisticsMtx.Lock()
if _, ok := ticker.minTsStatistics[pchan]; ok {
ticker.statisticsMtx.Unlock()
return fmt.Errorf("pChan %v already exist in minTsStatistics", pchan)
}
ticker.minTsStatistics[pchan] = 0
ticker.statisticsMtx.Unlock()
ticker.currentsMtx.Lock()
defer ticker.currentsMtx.Unlock()
if _, ok := ticker.currents[pchan]; ok {
return fmt.Errorf("pChan %v already exist in currents", pchan)
}
ticker.currents[pchan] = 0
return nil
}
func (ticker *channelsTimeTickerImpl) removePChan(pchan pChan) error {
ticker.statisticsMtx.Lock()
if _, ok := ticker.minTsStatistics[pchan]; !ok {
ticker.statisticsMtx.Unlock()
return fmt.Errorf("pChan %v don't exist in minTsStatistics", pchan)
}
delete(ticker.minTsStatistics, pchan)
ticker.statisticsMtx.Unlock()
ticker.currentsMtx.Lock()
defer ticker.currentsMtx.Unlock()
if _, ok := ticker.currents[pchan]; !ok {
return fmt.Errorf("pChan %v don't exist in currents", pchan)
}
delete(ticker.currents, pchan)
return nil
}
func (ticker *channelsTimeTickerImpl) getLastTick(pchan pChan) (Timestamp, error) {
ticker.statisticsMtx.RLock()
defer ticker.statisticsMtx.RUnlock()
ts, ok := ticker.minTsStatistics[pchan]
if !ok {
return 0, fmt.Errorf("pChan %v not found", pchan)
return ticker.defaultTimestamp, nil
}
return ts, nil
@ -217,16 +187,8 @@ func (ticker *channelsTimeTickerImpl) getLastTick(pchan pChan) (Timestamp, error
func (ticker *channelsTimeTickerImpl) getMinTick() Timestamp {
ticker.statisticsMtx.RLock()
defer ticker.statisticsMtx.RUnlock()
minTs := typeutil.ZeroTimestamp
for _, ts := range ticker.minTsStatistics {
if ts < minTs {
minTs = ts
}
}
return minTs
// may be zero
return ticker.minTimestamp
}
func newChannelsTimeTicker(

View File

@ -95,34 +95,6 @@ func TestChannelsTimeTickerImpl_close(t *testing.T) {
time.Sleep(100 * time.Millisecond)
}
func TestChannelsTimeTickerImpl_addPChan(t *testing.T) {
interval := time.Millisecond * 10
pchanNum := rand.Uint64()%10 + 1
pchans := make([]pChan, 0, pchanNum)
for i := 0; uint64(i) < pchanNum; i++ {
pchans = append(pchans, funcutil.GenRandomStr())
}
tso := newMockTsoAllocator()
ctx := context.Background()
ticker := newChannelsTimeTicker(ctx, interval, pchans, newGetStatisticsFunc(pchans), tso)
err := ticker.start()
assert.Equal(t, nil, err)
newPChanNum := rand.Uint64()%10 + 1
for i := 0; uint64(i) < newPChanNum; i++ {
err = ticker.addPChan(funcutil.GenRandomStr())
assert.Equal(t, nil, err)
}
defer func() {
err := ticker.close()
assert.Equal(t, nil, err)
}()
time.Sleep(100 * time.Millisecond)
}
func TestChannelsTimeTickerImpl_getLastTick(t *testing.T) {
interval := time.Millisecond * 10
pchanNum := rand.Uint64()%10 + 1
@ -195,7 +167,7 @@ func TestChannelsTimeTickerImpl_getMinTsStatistics(t *testing.T) {
case <-b:
return
case <-timer.C:
stats, err := ticker.getMinTsStatistics()
stats, _, err := ticker.getMinTsStatistics()
assert.Equal(t, nil, err)
for pchan, ts := range stats {
log.Debug("TestChannelsTimeTickerImpl_getLastTick",

View File

@ -234,15 +234,14 @@ func (node *Proxy) sendChannelsTimeTickLoop() {
case <-node.ctx.Done():
return
case <-timer.C:
ts, err := node.tsoAllocator.AllocOne()
stats, ts, err := node.chTicker.getMinTsStatistics()
if err != nil {
log.Warn("Failed to get timestamp from tso", zap.Error(err))
log.Warn("sendChannelsTimeTickLoop.getMinTsStatistics", zap.Error(err))
continue
}
stats, err := node.chTicker.getMinTsStatistics()
if err != nil {
log.Warn("sendChannelsTimeTickLoop.getMinTsStatistics", zap.Error(err))
if ts == 0 {
log.Warn("sendChannelsTimeTickLoop.getMinTsStatistics default timestamp equal 0")
continue
}

View File

@ -194,17 +194,6 @@ func (it *insertTask) getChannels() ([]pChan, error) {
return nil, err
}
channels, err = it.chMgr.getChannels(collID)
if err == nil {
for _, pchan := range channels {
err := it.chTicker.addPChan(pchan)
if err != nil {
log.Warn("failed to add pchan to channels time ticker",
zap.Error(err),
zap.Int64("collection id", collID),
zap.String("pchan", pchan))
}
}
}
}
return channels, err
}
@ -729,7 +718,7 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
IDs: &schemapb.IDs{
IdField: nil,
},
Timestamp: it.BeginTs(),
Timestamp: it.EndTs(),
}
collectionName := it.BaseInsertTask.CollectionName
@ -1025,17 +1014,6 @@ func (it *insertTask) Execute(ctx context.Context) error {
it.result.Status.Reason = err.Error()
return err
}
channels, err := it.chMgr.getChannels(collID)
if err == nil {
for _, pchan := range channels {
err := it.chTicker.addPChan(pchan)
if err != nil {
log.Warn("failed to add pchan to channels time ticker",
zap.Error(err),
zap.String("pchan", pchan))
}
}
}
stream, err = it.chMgr.getDMLStream(collID)
if err != nil {
it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
@ -1268,11 +1246,6 @@ func (dct *dropCollectionTask) Execute(ctx context.Context) error {
return err
}
pchans, _ := dct.chMgr.getChannels(collID)
for _, pchan := range pchans {
_ = dct.chTicker.removePChan(pchan)
}
_ = dct.chMgr.removeDMLStream(collID)
_ = dct.chMgr.removeDQLStream(collID)
@ -4643,17 +4616,6 @@ func (dt *deleteTask) Execute(ctx context.Context) (err error) {
dt.result.Status.Reason = err.Error()
return err
}
channels, err := dt.chMgr.getChannels(collID)
if err == nil {
for _, pchan := range channels {
err := dt.chTicker.addPChan(pchan)
if err != nil {
log.Warn("failed to add pchan to channels time ticker",
zap.Error(err),
zap.String("pchan", pchan))
}
}
}
stream, err = dt.chMgr.getDMLStream(collID)
if err != nil {
dt.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError

View File

@ -226,9 +226,8 @@ type dmTaskQueue struct {
}
func (queue *dmTaskQueue) Enqueue(t task) error {
queue.lock.Lock()
defer queue.lock.Unlock()
queue.statsLock.Lock()
defer queue.statsLock.Unlock()
err := queue.baseTaskQueue.Enqueue(t)
if err != nil {
return err
@ -243,6 +242,9 @@ func (queue *dmTaskQueue) PopActiveTask(tID UniqueID) task {
defer queue.atLock.Unlock()
t, ok := queue.activeTasks[tID]
if ok {
queue.statsLock.Lock()
defer queue.statsLock.Unlock()
delete(queue.activeTasks, tID)
log.Debug("Proxy dmTaskQueue popPChanStats", zap.Any("tID", t.ID()))
queue.popPChanStats(t)
@ -260,7 +262,6 @@ func (queue *dmTaskQueue) addPChanStats(t task) error {
zap.Any("stats", stats), zap.Error(err))
return err
}
queue.statsLock.Lock()
for cName, stat := range stats {
info, ok := queue.pChanStatisticsInfos[cName]
if !ok {
@ -281,7 +282,6 @@ func (queue *dmTaskQueue) addPChanStats(t task) error {
queue.pChanStatisticsInfos[cName].tsSet[info.minTs] = struct{}{}
}
}
queue.statsLock.Unlock()
} else {
return fmt.Errorf("proxy addUnissuedTask reflect to dmlTask failed, tID:%v", t.ID())
}
@ -294,7 +294,6 @@ func (queue *dmTaskQueue) popPChanStats(t task) error {
if err != nil {
return err
}
queue.statsLock.Lock()
for _, cName := range channels {
info, ok := queue.pChanStatisticsInfos[cName]
if ok {
@ -312,7 +311,6 @@ func (queue *dmTaskQueue) popPChanStats(t task) error {
}
}
}
queue.statsLock.Unlock()
} else {
return fmt.Errorf("Proxy dmTaskQueue popPChanStats reflect to dmlTask failed, tID:%v", t.ID())
}

View File

@ -17,7 +17,7 @@ import (
)
// MaxTimestamp is the max timestamp.
const MaxTimestamp = math.MaxUint64
const MaxTimestamp = Timestamp(math.MaxUint64)
// ZeroTime is a zero time.
var ZeroTime = time.Time{}