Refactor proxy channelTimeTick and fix search timeout (#5760)

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
pull/5779/head
zhenshan.cao 2021-06-15 10:19:38 +08:00
parent 0f4bd50ce3
commit 5c69c6902d
14 changed files with 329 additions and 212 deletions


@ -54,6 +54,7 @@ func getIndexServiceaddr(sess *sessionutil.Session) (string, error) {
log.Debug("IndexServiceClient GetSessions failed", zap.Any("key", key), zap.Error(err))
return "", err
}
log.Debug("IndexServiceClient GetSessions success", zap.Any("key", key), zap.Any("msess", msess))
ms, ok := msess[key]
if !ok {
log.Debug("IndexServiceClient msess key not existed", zap.Any("key", key), zap.Any("len of msess", len(msess)))


@ -178,6 +178,7 @@ func (s *Server) startGrpcLoop(grpcPort int) {
if err := s.grpcServer.Serve(lis); err != nil {
s.grpcErrChan <- err
}
log.Debug("IndexService grpcServer loop exit")
}
func NewServer(ctx context.Context) (*Server, error) {


@ -206,11 +206,10 @@ func (i *IndexNode) CreateIndex(ctx context.Context, request *indexpb.CreateInde
ctx: ctx,
done: make(chan error),
},
req: request,
kv: i.kv,
etcdKV: i.etcdKV,
serviceClient: i.serviceClient,
nodeID: Params.NodeID,
req: request,
kv: i.kv,
etcdKV: i.etcdKV,
nodeID: Params.NodeID,
}
ret := &commonpb.Status{


@ -29,7 +29,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
)
@ -86,13 +85,12 @@ func (bt *BaseTask) Notify(err error) {
type IndexBuildTask struct {
BaseTask
index Index
kv kv.BaseKV
etcdKV *etcdkv.EtcdKV
savePaths []string
req *indexpb.CreateIndexRequest
serviceClient types.IndexService
nodeID UniqueID
index Index
kv kv.BaseKV
etcdKV *etcdkv.EtcdKV
savePaths []string
req *indexpb.CreateIndexRequest
nodeID UniqueID
}
func (it *IndexBuildTask) Ctx() context.Context {
@ -174,18 +172,6 @@ func (it *IndexBuildTask) PreExecute(ctx context.Context) error {
func (it *IndexBuildTask) PostExecute(ctx context.Context) error {
log.Debug("IndexNode IndexBuildTask PostExecute...")
defer func() {
if it.err != nil {
it.Rollback()
}
}()
if it.serviceClient == nil {
err := errors.New("IndexNode IndexBuildTask PostExecute, serviceClient is nil")
log.Error("", zap.Error(err))
return err
}
return it.checkIndexMeta(false)
}
@ -395,16 +381,3 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
// }
return nil
}
func (it *IndexBuildTask) Rollback() error {
if it.savePaths == nil {
return nil
}
err := it.kv.MultiRemove(it.savePaths)
if err != nil {
log.Warn("IndexNode IndexBuildTask Rollback Failed", zap.Error(err))
return err
}
return nil
}


@ -757,6 +757,7 @@ func (t *CreateIndexReqTask) Execute(ctx context.Context) error {
}
indexName := Params.DefaultIndexName //TODO, get name from request
indexID, _, err := t.core.IDAllocator(1)
log.Debug("MasterService CreateIndexReqTask", zap.Any("indexID", indexID), zap.Error(err))
if err != nil {
return err
}
@ -766,6 +767,7 @@ func (t *CreateIndexReqTask) Execute(ctx context.Context) error {
IndexParams: t.Req.ExtraParams,
}
segIDs, field, err := t.core.MetaTable.GetNotIndexedSegments(t.Req.CollectionName, t.Req.FieldName, idxInfo)
log.Debug("MasterService CreateIndexReqTask metaTable.GetNotIndexedSegments", zap.Error(err))
if err != nil {
return err
}


@ -134,6 +134,8 @@ func (t *timetickSync) StartWatch() {
}
}
}
log.Debug("MasterService timeticksync",
zap.Any("chanName2TimeTickMap", chanName2TimeTickMap))
// send timetick msg to msg stream
for chanName, chanTs := range chanName2TimeTickMap {
if err := t.SendChannelTimeTick(chanName, chanTs); err != nil {


@ -6,22 +6,17 @@ import (
"sync"
"time"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
)
type pChanStatistics struct {
minTs Timestamp
maxTs Timestamp
invalid bool // invalid is true when there is no task in queue
minTs Timestamp
maxTs Timestamp
}
// channelsTimeTickerCheckFunc(pchan, ts) return true only when all timestamp of tasks who use the pchan is greater than ts
type channelsTimeTickerCheckFunc func(string, Timestamp) bool
// ticker can update ts only when the minTs greater than the ts of ticker, we can use maxTs to update current later
type getPChanStatisticsFuncType func(pChan) (pChanStatistics, error)
type getPChanStatisticsFuncType func() (map[pChan]*pChanStatistics, error)
// use interface tsoAllocator to keep channelsTimeTickerImpl testable
type tsoAllocator interface {
@ -59,9 +54,10 @@ func (ticker *channelsTimeTickerImpl) getMinTsStatistics() (map[pChan]Timestamp,
ret := make(map[pChan]Timestamp)
for k, v := range ticker.minTsStatistics {
ret[k] = v
if v > 0 {
ret[k] = v
}
}
return ret, nil
}
@ -83,47 +79,58 @@ func (ticker *channelsTimeTickerImpl) initCurrents(current Timestamp) {
}
}
// What if golang support generic? interface{} is not comparable now!
func getTs(ts1, ts2 Timestamp, comp func(ts1, ts2 Timestamp) bool) Timestamp {
if comp(ts1, ts2) {
return ts1
}
return ts2
}
func (ticker *channelsTimeTickerImpl) tick() error {
now, err := ticker.tso.AllocOne()
if err != nil {
log.Warn("ProxyNode channelsTimeTickerImpl failed to get ts from tso", zap.Error(err))
return err
}
//nowPTime, _ := tsoutil.ParseTS(now)
ticker.statisticsMtx.Lock()
defer ticker.statisticsMtx.Unlock()
ticker.currentsMtx.Lock()
defer ticker.currentsMtx.Unlock()
stats, err := ticker.getStatisticsFunc()
if err != nil {
log.Debug("ProxyNode channelsTimeTickerImpl failed to getStatistics", zap.Error(err))
}
for pchan := range ticker.currents {
current := ticker.currents[pchan]
//currentPTime, _ := tsoutil.ParseTS(current)
stat, ok := stats[pchan]
//log.Debug("ProxyNode channelsTimeTickerImpl", zap.Any("pchan", pchan),
// zap.Any("TaskInQueue", ok),
// zap.Any("current", currentPTime),
// zap.Any("now", nowPTime))
if !ok {
ticker.minTsStatistics[pchan] = current
ticker.currents[pchan] = now
} else {
//minPTime, _ := tsoutil.ParseTS(stat.minTs)
//maxPTime, _ := tsoutil.ParseTS(stat.maxTs)
stats, err := ticker.getStatisticsFunc(pchan)
if err != nil {
log.Warn("failed to get statistics from scheduler", zap.Error(err))
continue
}
log.Debug("ProxyNode channelsTimeTickerImpl", zap.Any("invalid", stats.invalid),
zap.Any("stats.minTs", stats.minTs), zap.Any("current", current))
if !stats.invalid && stats.minTs > current {
ticker.minTsStatistics[pchan] = current
ticker.currents[pchan] = getTs(current+Timestamp(ticker.interval), stats.maxTs, func(ts1, ts2 Timestamp) bool {
return ts1 > ts2
})
} else if stats.invalid {
ticker.minTsStatistics[pchan] = current
// ticker.currents[pchan] = current + Timestamp(ticker.interval)
t, err := ticker.tso.AllocOne()
if err != nil {
log.Warn("failed to get ts from tso", zap.Error(err))
continue
if stat.minTs > current {
cur := current
if stat.minTs > now {
cur = now
}
ticker.minTsStatistics[pchan] = cur
next := now + Timestamp(sendTimeTickMsgInterval)
if next > stat.maxTs {
next = stat.maxTs
}
ticker.currents[pchan] = next
//nextPTime, _ := tsoutil.ParseTS(next)
//log.Debug("ProxyNode channelsTimeTickerImpl",
// zap.Any("pchan", pchan),
// zap.Any("minPTime", minPTime),
// zap.Any("maxPTime", maxPTime),
// zap.Any("nextPTime", nextPTime))
}
log.Debug("ProxyNode channelsTimeTickerImpl, update currents", zap.Any("pchan", pchan),
zap.Any("t", t))
ticker.currents[pchan] = t
}
}
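
For context: the refactored tick() now takes one statistics snapshot for all physical channels per round (the getStatisticsFunc type changes from per-pchan to map-returning) instead of asking the scheduler once per channel. Below is a minimal sketch of the per-channel update rule as it reads from this hunk, using simplified stand-in types rather than the real proxynode ones.

```go
package main

import (
	"fmt"
	"time"
)

// Simplified stand-ins; field names follow the diff, but this is an
// illustration, not the real implementation.
type Timestamp = uint64

type pChanStatistics struct {
	minTs Timestamp // smallest BeginTs among queued tasks using the channel
	maxTs Timestamp // largest EndTs among queued tasks using the channel
}

const sendTimeTickMsgInterval = 200 * time.Millisecond

// tickOnce applies the update rule to a single channel. current is the
// candidate tick from the previous round, now is a fresh TSO timestamp, and
// stat is nil when no queued task touches the channel. It returns the value
// to report (minTsStatistics) and the candidate for the next round; a zero
// report means "leave the reported value unchanged".
func tickOnce(current, now Timestamp, stat *pChanStatistics) (report, next Timestamp) {
	if stat == nil {
		// Channel is idle: report the old candidate and advance to now.
		return current, now
	}
	if stat.minTs > current {
		// Every queued task is newer than the candidate, so the candidate is
		// safe to report -- but never report past now.
		report = current
		if stat.minTs > now {
			report = now
		}
		// Advance the candidate by one send interval, capped by the largest
		// timestamp still in flight on this channel.
		next = now + Timestamp(sendTimeTickMsgInterval)
		if next > stat.maxTs {
			next = stat.maxTs
		}
		return report, next
	}
	// A queued task is at or below the candidate: hold everything.
	return 0, current
}

func main() {
	now := Timestamp(time.Now().UnixNano())
	report, next := tickOnce(now-100, now, &pChanStatistics{minTs: now + 50, maxTs: now + 500})
	fmt.Println("report:", report, "next:", next)
}
```

The effect is that the time tick reported for a channel never moves past the oldest timestamp still pending on it, which is what lets downstream consumers treat the tick as a safe watermark.
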
@ -172,12 +179,13 @@ func (ticker *channelsTimeTickerImpl) close() error {
func (ticker *channelsTimeTickerImpl) addPChan(pchan pChan) error {
ticker.statisticsMtx.Lock()
defer ticker.statisticsMtx.Unlock()
if _, ok := ticker.minTsStatistics[pchan]; ok {
ticker.statisticsMtx.Unlock()
return fmt.Errorf("pChan %v already exist in minTsStatistics", pchan)
}
ticker.minTsStatistics[pchan] = 0
ticker.statisticsMtx.Unlock()
ticker.currentsMtx.Lock()
defer ticker.currentsMtx.Unlock()
@ -191,13 +199,13 @@ func (ticker *channelsTimeTickerImpl) addPChan(pchan pChan) error {
func (ticker *channelsTimeTickerImpl) removePChan(pchan pChan) error {
ticker.statisticsMtx.Lock()
defer ticker.statisticsMtx.Unlock()
if _, ok := ticker.minTsStatistics[pchan]; !ok {
ticker.statisticsMtx.Unlock()
return fmt.Errorf("pChan %v don't exist in minTsStatistics", pchan)
}
delete(ticker.minTsStatistics, pchan)
ticker.statisticsMtx.Unlock()
ticker.currentsMtx.Lock()
defer ticker.currentsMtx.Unlock()
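
A side note on the locking change in addPChan/removePChan: the explicit statisticsMtx.Unlock() calls before taking currentsMtx are replaced by defer. One plausible reading (an interpretation, not a stated goal of the commit) is that keeping the statistics lock held for the whole call closes the window where tick(), which takes both locks, could see a channel present in one map but not the other. The pattern, sketched with generic names:

```go
package sketch

import "sync"

// registry is a hypothetical stand-in for channelsTimeTickerImpl: two maps
// guarded by two mutexes, as in the diff.
type registry struct {
	statisticsMtx sync.Mutex
	currentsMtx   sync.Mutex
	minTs         map[string]uint64
	currents      map[string]uint64
}

func newRegistry() *registry {
	return &registry{
		minTs:    make(map[string]uint64),
		currents: make(map[string]uint64),
	}
}

// add registers a channel in both maps. Deferring the first unlock keeps the
// statistics map locked for the whole call, so a concurrent reader that takes
// both locks never observes the channel in only one of the two maps.
func (r *registry) add(name string, now uint64) {
	r.statisticsMtx.Lock()
	defer r.statisticsMtx.Unlock()
	r.minTs[name] = 0

	r.currentsMtx.Lock()
	defer r.currentsMtx.Unlock()
	r.currents[name] = now
}
```
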


@ -25,13 +25,25 @@ func newMockTsoAllocator() *mockTsoAllocator {
return &mockTsoAllocator{}
}
func getStatistics(pchan pChan) (pChanStatistics, error) {
stats := pChanStatistics{
minTs: Timestamp(time.Now().UnixNano()),
invalid: false,
func newGetStatisticsFunc(pchans []pChan) getPChanStatisticsFuncType {
pchanNum := rand.Uint64()%5 + 1
pchans2 := make([]pChan, 0, pchanNum)
for i := 0; uint64(i) < pchanNum; i++ {
pchans2 = append(pchans2, pchans[i])
}
stats.maxTs = stats.minTs + Timestamp(time.Millisecond*10)
return stats, nil
retFunc := func() (map[pChan]*pChanStatistics, error) {
ret := make(map[pChan]*pChanStatistics)
for _, pchannel := range pchans2 {
minTs := Timestamp(time.Now().UnixNano())
ret[pchannel] = &pChanStatistics{
minTs: minTs,
maxTs: minTs + Timestamp(time.Millisecond*10),
}
}
return ret, nil
}
return retFunc
}
func TestChannelsTimeTickerImpl_start(t *testing.T) {
@ -44,7 +56,7 @@ func TestChannelsTimeTickerImpl_start(t *testing.T) {
tso := newMockTsoAllocator()
ctx := context.Background()
ticker := newChannelsTimeTicker(ctx, interval, pchans, getStatistics, tso)
ticker := newChannelsTimeTicker(ctx, interval, pchans, newGetStatisticsFunc(pchans), tso)
err := ticker.start()
assert.Equal(t, nil, err)
@ -66,7 +78,7 @@ func TestChannelsTimeTickerImpl_close(t *testing.T) {
tso := newMockTsoAllocator()
ctx := context.Background()
ticker := newChannelsTimeTicker(ctx, interval, pchans, getStatistics, tso)
ticker := newChannelsTimeTicker(ctx, interval, pchans, newGetStatisticsFunc(pchans), tso)
err := ticker.start()
assert.Equal(t, nil, err)
@ -88,7 +100,7 @@ func TestChannelsTimeTickerImpl_addPChan(t *testing.T) {
tso := newMockTsoAllocator()
ctx := context.Background()
ticker := newChannelsTimeTicker(ctx, interval, pchans, getStatistics, tso)
ticker := newChannelsTimeTicker(ctx, interval, pchans, newGetStatisticsFunc(pchans), tso)
err := ticker.start()
assert.Equal(t, nil, err)
@ -116,7 +128,7 @@ func TestChannelsTimeTickerImpl_getLastTick(t *testing.T) {
tso := newMockTsoAllocator()
ctx := context.Background()
ticker := newChannelsTimeTicker(ctx, interval, pchans, getStatistics, tso)
ticker := newChannelsTimeTicker(ctx, interval, pchans, newGetStatisticsFunc(pchans), tso)
err := ticker.start()
assert.Equal(t, nil, err)
@ -163,7 +175,7 @@ func TestChannelsTimeTickerImpl_getMinTsStatistics(t *testing.T) {
tso := newMockTsoAllocator()
ctx := context.Background()
ticker := newChannelsTimeTicker(ctx, interval, pchans, getStatistics, tso)
ticker := newChannelsTimeTicker(ctx, interval, pchans, newGetStatisticsFunc(pchans), tso)
err := ticker.start()
assert.Equal(t, nil, err)


@ -38,6 +38,9 @@ import (
type UniqueID = typeutil.UniqueID
type Timestamp = typeutil.Timestamp
const sendTimeTickMsgInterval = 200 * time.Millisecond
const channelMgrTickerInterval = 100 * time.Millisecond
type ProxyNode struct {
ctx context.Context
cancel func()
@ -256,9 +259,7 @@ func (node *ProxyNode) Init() error {
node.tick = newTimeTick(node.ctx, node.tsoAllocator, time.Millisecond*200, node.sched.TaskDoneTest, node.msFactory)
// TODO(dragondriver): read this from config
interval := time.Millisecond * 200
node.chTicker = newChannelsTimeTicker(node.ctx, interval, []string{}, node.sched.getPChanStatistics, tsoAllocator)
node.chTicker = newChannelsTimeTicker(node.ctx, channelMgrTickerInterval, []string{}, node.sched.getPChanStatistics, tsoAllocator)
return nil
}
@ -269,8 +270,7 @@ func (node *ProxyNode) sendChannelsTimeTickLoop() {
defer node.wg.Done()
// TODO(dragondriver): read this from config
interval := time.Millisecond * 200
timer := time.NewTicker(interval)
timer := time.NewTicker(sendTimeTickMsgInterval)
for {
select {
@ -283,8 +283,6 @@ func (node *ProxyNode) sendChannelsTimeTickLoop() {
continue
}
log.Debug("send timestamp statistics of pchan", zap.Any("statistics", stats))
channels := make([]pChan, 0, len(stats))
tss := make([]Timestamp, 0, len(stats))
@ -292,6 +290,7 @@ func (node *ProxyNode) sendChannelsTimeTickLoop() {
channels = append(channels, channel)
tss = append(tss, ts)
}
log.Debug("send timestamp statistics of pchan", zap.Any("channels", channels), zap.Any("tss", tss))
req := &internalpb.ChannelTimeTickMsg{
Base: &commonpb.MsgBase{


@ -95,18 +95,10 @@ type task interface {
Notify(err error)
}
type ddlTask interface {
task
}
type dmlTask interface {
task
getStatistics(pchan pChan) (pChanStatistics, error)
}
type dqlTask interface {
task
getVChannels() ([]vChan, error)
getChannels() ([]vChan, error)
getPChanStats() (map[pChan]pChanStatistics, error)
}
type BaseInsertTask = msgstream.InsertMsg
@ -122,6 +114,8 @@ type InsertTask struct {
segIDAssigner *SegIDAssigner
chMgr channelsMgr
chTicker channelsTimeTicker
vChannels []vChan
pChannels []pChan
}
func (it *InsertTask) TraceCtx() context.Context {
@ -157,35 +151,41 @@ func (it *InsertTask) EndTs() Timestamp {
return it.EndTimestamp
}
func (it *InsertTask) getStatistics(pchan pChan) (pChanStatistics, error) {
func (it *InsertTask) getPChanStats() (map[pChan]pChanStatistics, error) {
ret := make(map[pChan]pChanStatistics)
channels, err := it.getChannels()
if err != nil {
return ret, err
}
beginTs := it.BeginTs()
endTs := it.EndTs()
for _, channel := range channels {
ret[channel] = pChanStatistics{
minTs: beginTs,
maxTs: endTs,
}
}
return ret, nil
}
func (it *InsertTask) getChannels() ([]pChan, error) {
collID, err := globalMetaCache.GetCollectionID(it.ctx, it.CollectionName)
if err != nil {
return pChanStatistics{invalid: true}, err
return nil, err
}
_, err = it.chMgr.getChannels(collID)
var channels []pChan
channels, err = it.chMgr.getChannels(collID)
if err != nil {
err := it.chMgr.createDMLMsgStream(collID)
err = it.chMgr.createDMLMsgStream(collID)
if err != nil {
return pChanStatistics{invalid: true}, err
return nil, err
}
channels, err = it.chMgr.getChannels(collID)
}
pchans, err := it.chMgr.getChannels(collID)
if err != nil {
return pChanStatistics{invalid: true}, err
}
for _, ch := range pchans {
if pchan == ch {
return pChanStatistics{
minTs: it.BeginTimestamp,
maxTs: it.EndTimestamp,
invalid: false,
}, nil
}
}
return pChanStatistics{invalid: true}, nil
return channels, err
}
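
The per-channel statistics now come straight from the task's own timestamps: getPChanStats resolves the collection's physical channels and reports the same [BeginTs, EndTs] window for each of them. A rough sketch of that shape, with resolveChannels as a hypothetical stand-in for the GetCollectionID and chMgr.getChannels lookups in the diff:

```go
package sketch

type Timestamp = uint64
type pChan = string

type pChanStatistics struct {
	minTs Timestamp
	maxTs Timestamp
}

type dmlTask interface {
	BeginTs() Timestamp
	EndTs() Timestamp
}

func getPChanStats(t dmlTask, resolveChannels func() ([]pChan, error)) (map[pChan]pChanStatistics, error) {
	ret := make(map[pChan]pChanStatistics)
	channels, err := resolveChannels()
	if err != nil {
		return ret, err
	}
	// Every physical channel the insert writes to sees the same window:
	// nothing from this task on that channel is older than BeginTs or newer
	// than EndTs.
	for _, ch := range channels {
		ret[ch] = pChanStatistics{minTs: t.BeginTs(), maxTs: t.EndTs()}
	}
	return ret, nil
}
```
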
func (it *InsertTask) OnEnqueue() error {
@ -735,14 +735,16 @@ func (it *InsertTask) Execute(ctx context.Context) error {
if err != nil {
err = it.chMgr.createDMLMsgStream(collID)
if err != nil {
it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
it.result.Status.Reason = err.Error()
return err
}
stream, err = it.chMgr.getDMLStream(collID)
if err != nil {
it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
it.result.Status.Reason = err.Error()
return err
}
}
stream, err = it.chMgr.getDMLStream(collID)
if err != nil {
it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
it.result.Status.Reason = err.Error()
return err
}
pchans, err := it.chMgr.getChannels(collID)
@ -750,7 +752,7 @@ func (it *InsertTask) Execute(ctx context.Context) error {
return err
}
for _, pchan := range pchans {
log.Debug("add pchan to time ticker", zap.Any("pchan", pchan))
log.Debug("ProxyNode InsertTask add pchan", zap.Any("pchan", pchan))
_ = it.chTicker.addPChan(pchan)
}
@ -1027,7 +1029,7 @@ func (st *SearchTask) OnEnqueue() error {
return nil
}
func (st *SearchTask) getChannels() ([]vChan, error) {
func (st *SearchTask) getChannels() ([]pChan, error) {
collID, err := globalMetaCache.GetCollectionID(st.ctx, st.query.CollectionName)
if err != nil {
return nil, err


@ -24,6 +24,7 @@ import (
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/opentracing/opentracing-go"
@ -38,7 +39,7 @@ type TaskQueue interface {
FrontUnissuedTask() task
PopUnissuedTask() task
AddActiveTask(t task)
PopActiveTask(ts Timestamp) task
PopActiveTask(tID UniqueID) task
getTaskByReqID(reqID UniqueID) task
TaskDoneTest(ts Timestamp) bool
Enqueue(t task) error
@ -46,7 +47,7 @@ type TaskQueue interface {
type BaseTaskQueue struct {
unissuedTasks *list.List
activeTasks map[Timestamp]task
activeTasks map[UniqueID]task
utLock sync.RWMutex
atLock sync.RWMutex
@ -114,28 +115,26 @@ func (queue *BaseTaskQueue) PopUnissuedTask() task {
func (queue *BaseTaskQueue) AddActiveTask(t task) {
queue.atLock.Lock()
defer queue.atLock.Unlock()
ts := t.EndTs()
_, ok := queue.activeTasks[ts]
tID := t.ID()
_, ok := queue.activeTasks[tID]
if ok {
log.Debug("proxynode", zap.Uint64("task with timestamp ts already in active task list! ts:", ts))
log.Debug("ProxyNode task with tID already in active task list!", zap.Any("ID", tID))
}
queue.activeTasks[ts] = t
queue.activeTasks[tID] = t
}
func (queue *BaseTaskQueue) PopActiveTask(ts Timestamp) task {
func (queue *BaseTaskQueue) PopActiveTask(tID UniqueID) task {
queue.atLock.Lock()
defer queue.atLock.Unlock()
t, ok := queue.activeTasks[ts]
t, ok := queue.activeTasks[tID]
if ok {
delete(queue.activeTasks, ts)
delete(queue.activeTasks, tID)
return t
}
log.Debug("proxynode", zap.Uint64("task with timestamp ts already in active task list! ts:", ts))
return nil
log.Debug("ProxyNode task not in active task list! ts", zap.Any("tID", tID))
return t
}
func (queue *BaseTaskQueue) getTaskByReqID(reqID UniqueID) task {
@ -149,9 +148,9 @@ func (queue *BaseTaskQueue) getTaskByReqID(reqID UniqueID) task {
queue.atLock.RLock()
defer queue.atLock.RUnlock()
for ats := range queue.activeTasks {
if queue.activeTasks[ats].ID() == reqID {
return queue.activeTasks[ats]
for tID := range queue.activeTasks {
if tID == reqID {
return queue.activeTasks[tID]
}
}
@ -169,8 +168,8 @@ func (queue *BaseTaskQueue) TaskDoneTest(ts Timestamp) bool {
queue.atLock.RLock()
defer queue.atLock.RUnlock()
for ats := range queue.activeTasks {
if ats < ts {
for _, task := range queue.activeTasks {
if task.BeginTs() < ts {
return false
}
}
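
Keying activeTasks by UniqueID rather than by end timestamp is a likely fix for key collisions: two tasks can share a timestamp but never an ID, so neither silently overwrites the other, and the timestamp comparison in TaskDoneTest moves to the task values instead of the map keys. A condensed sketch of the resulting queue shape (simplified, not the real BaseTaskQueue):

```go
package sketch

import "sync"

type UniqueID = int64
type Timestamp = uint64

type task interface {
	ID() UniqueID
	BeginTs() Timestamp
}

type activeTaskSet struct {
	mu    sync.RWMutex
	tasks map[UniqueID]task // keyed by task ID, not by timestamp
}

func (s *activeTaskSet) add(t task) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.tasks[t.ID()] = t // two tasks can share a timestamp, never an ID
}

func (s *activeTaskSet) pop(id UniqueID) task {
	s.mu.Lock()
	defer s.mu.Unlock()
	t := s.tasks[id]
	delete(s.tasks, id)
	return t
}

// doneBefore reports whether no active task began before ts, mirroring the
// reworked TaskDoneTest which now inspects each task's BeginTs.
func (s *activeTaskSet) doneBefore(ts Timestamp) bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	for _, t := range s.tasks {
		if t.BeginTs() < ts {
			return false
		}
	}
	return true
}
```
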
@ -204,54 +203,134 @@ type DdTaskQueue struct {
lock sync.Mutex
}
type DmTaskQueue struct {
BaseTaskQueue
type pChanStatInfo struct {
pChanStatistics
refCnt int
}
func (queue *DmTaskQueue) getPChanStatistics(pchan pChan) (pChanStatistics, error) {
stats := pChanStatistics{
minTs: 0,
maxTs: ^uint64(0),
invalid: true,
type DmTaskQueue struct {
BaseTaskQueue
statsLock sync.RWMutex
pChanStatisticsInfos map[pChan]*pChanStatInfo
}
func (queue *DmTaskQueue) Enqueue(t task) error {
err := t.OnEnqueue()
if err != nil {
return err
}
queue.utLock.RLock()
defer queue.utLock.RUnlock()
ts, err := queue.sched.tsoAllocator.AllocOne()
if err != nil {
return err
}
t.SetTs(ts)
for e := queue.unissuedTasks.Front(); e != nil; e = e.Next() {
dmlT := e.Value.(task).(dmlTask)
stat, err := dmlT.getStatistics(pchan)
reqID, err := queue.sched.idAllocator.AllocOne()
if err != nil {
return err
}
t.SetID(reqID)
return queue.addUnissuedTask(t)
}
func (queue *DmTaskQueue) addUnissuedTask(t task) error {
queue.utLock.Lock()
defer queue.utLock.Unlock()
if queue.utFull() {
return errors.New("task queue is full")
}
queue.unissuedTasks.PushBack(t)
queue.addPChanStats(t)
queue.utBufChan <- 1
return nil
}
func (queue *DmTaskQueue) PopActiveTask(tID UniqueID) task {
queue.atLock.Lock()
defer queue.atLock.Unlock()
t, ok := queue.activeTasks[tID]
if ok {
delete(queue.activeTasks, tID)
log.Debug("ProxyNode DmTaskQueue popPChanStats", zap.Any("tID", t.ID()))
queue.popPChanStats(t)
} else {
log.Debug("ProxyNode task not in active task list!", zap.Any("tID", tID))
}
return t
}
func (queue *DmTaskQueue) addPChanStats(t task) error {
if dmT, ok := t.(dmlTask); ok {
stats, err := dmT.getPChanStats()
if err != nil {
return pChanStatistics{invalid: true}, nil
return err
}
if stat.minTs < stats.minTs {
stats.minTs = stat.minTs
log.Debug("ProxyNode DmTaskQueue addPChanStats", zap.Any("tID", t.ID()),
zap.Any("stats", stats))
queue.statsLock.Lock()
for cName, stat := range stats {
info, ok := queue.pChanStatisticsInfos[cName]
if !ok {
info = &pChanStatInfo{
pChanStatistics: stat,
refCnt: 1,
}
queue.pChanStatisticsInfos[cName] = info
} else {
if info.minTs > stat.minTs {
info.minTs = stat.minTs
}
if info.maxTs < stat.maxTs {
info.maxTs = stat.maxTs
}
info.refCnt++
}
}
if stat.maxTs > stats.maxTs {
stats.maxTs = stat.maxTs
}
stats.invalid = false
queue.statsLock.Unlock()
} else {
return fmt.Errorf("ProxyNode addUnissuedTask reflect to dmlTask failed, tID:%v", t.ID())
}
return nil
}
queue.atLock.RLock()
defer queue.atLock.RUnlock()
for _, t := range queue.activeTasks {
dmlT, _ := t.(dmlTask)
stat, err := dmlT.getStatistics(pchan)
func (queue *DmTaskQueue) popPChanStats(t task) error {
if dmT, ok := t.(dmlTask); ok {
channels, err := dmT.getChannels()
if err != nil {
return pChanStatistics{invalid: true}, nil
return err
}
if stat.minTs < stats.minTs {
stats.minTs = stat.minTs
queue.statsLock.Lock()
for _, cName := range channels {
info, ok := queue.pChanStatisticsInfos[cName]
if ok {
info.refCnt--
if info.refCnt <= 0 {
delete(queue.pChanStatisticsInfos, cName)
}
}
}
if stat.maxTs > stats.maxTs {
stats.maxTs = stat.maxTs
}
stats.invalid = false
queue.statsLock.Unlock()
} else {
return fmt.Errorf("ProxyNode DmTaskQueue popPChanStats reflect to dmlTask failed, tID:%v", t.ID())
}
return nil
}
return stats, nil
func (queue *DmTaskQueue) getPChanStatsInfo() (map[pChan]*pChanStatistics, error) {
ret := make(map[pChan]*pChanStatistics)
queue.statsLock.RLock()
defer queue.statsLock.RUnlock()
for cName, info := range queue.pChanStatisticsInfos {
ret[cName] = &pChanStatistics{
minTs: info.minTs,
maxTs: info.maxTs,
}
}
return ret, nil
}
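
Instead of scanning every queued and active DML task each time the ticker asks for statistics, DmTaskQueue now keeps an incremental pChan -> {minTs, maxTs, refCnt} index: addPChanStats merges a task's window in on enqueue, popPChanStats decrements the count when the task leaves the active set, and getPChanStatsInfo simply copies the index. A minimal sketch of that bookkeeping with simplified types:

```go
package sketch

import "sync"

type Timestamp = uint64
type pChan = string

type pChanStatistics struct {
	minTs Timestamp
	maxTs Timestamp
}

type pChanStatInfo struct {
	pChanStatistics
	refCnt int
}

type statIndex struct {
	mu    sync.RWMutex
	infos map[pChan]*pChanStatInfo
}

// add merges one task's [minTs, maxTs] window per channel into the index.
func (s *statIndex) add(stats map[pChan]pChanStatistics) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for ch, stat := range stats {
		info, ok := s.infos[ch]
		if !ok {
			s.infos[ch] = &pChanStatInfo{pChanStatistics: stat, refCnt: 1}
			continue
		}
		if stat.minTs < info.minTs {
			info.minTs = stat.minTs
		}
		if stat.maxTs > info.maxTs {
			info.maxTs = stat.maxTs
		}
		info.refCnt++
	}
}

// remove drops one reference per channel; the entry disappears once no task
// that contributed to it is still pending.
func (s *statIndex) remove(channels []pChan) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, ch := range channels {
		if info, ok := s.infos[ch]; ok {
			if info.refCnt--; info.refCnt <= 0 {
				delete(s.infos, ch)
			}
		}
	}
}
```

Note that, as in the diff, the min/max bounds are only ever widened while tasks come and go; removal just drops the reference count, so a channel's window stays conservative until its entry is deleted outright.
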
type DqTaskQueue struct {
@ -268,7 +347,7 @@ func NewDdTaskQueue(sched *TaskScheduler) *DdTaskQueue {
return &DdTaskQueue{
BaseTaskQueue: BaseTaskQueue{
unissuedTasks: list.New(),
activeTasks: make(map[Timestamp]task),
activeTasks: make(map[UniqueID]task),
maxTaskNum: 1024,
utBufChan: make(chan int, 1024),
sched: sched,
@ -280,11 +359,12 @@ func NewDmTaskQueue(sched *TaskScheduler) *DmTaskQueue {
return &DmTaskQueue{
BaseTaskQueue: BaseTaskQueue{
unissuedTasks: list.New(),
activeTasks: make(map[Timestamp]task),
activeTasks: make(map[UniqueID]task),
maxTaskNum: 1024,
utBufChan: make(chan int, 1024),
sched: sched,
},
pChanStatisticsInfos: make(map[pChan]*pChanStatInfo),
}
}
@ -292,7 +372,7 @@ func NewDqTaskQueue(sched *TaskScheduler) *DqTaskQueue {
return &DqTaskQueue{
BaseTaskQueue: BaseTaskQueue{
unissuedTasks: list.New(),
activeTasks: make(map[Timestamp]task),
activeTasks: make(map[UniqueID]task),
maxTaskNum: 1024,
utBufChan: make(chan int, 1024),
sched: sched,
@ -382,7 +462,7 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
defer func() {
span.LogFields(oplog.Int64("scheduler process PopActiveTask", t.ID()))
q.PopActiveTask(t.EndTs())
q.PopActiveTask(t.ID())
}()
span.LogFields(oplog.Int64("scheduler process Execute", t.ID()))
err = t.Execute(ctx)
@ -449,6 +529,7 @@ type searchResultBuf struct {
receivedSealedSegmentIDsSet map[interface{}]struct{} // set of UniqueID
receivedGlobalSegmentIDsSet map[interface{}]struct{} // set of UniqueID
resultBuf []*internalpb.SearchResults
haveError bool
}
func newSearchResultBuf() *searchResultBuf {
@ -459,6 +540,7 @@ func newSearchResultBuf() *searchResultBuf {
receivedSealedSegmentIDsSet: make(map[interface{}]struct{}),
receivedGlobalSegmentIDsSet: make(map[interface{}]struct{}),
resultBuf: make([]*internalpb.SearchResults, 0),
haveError: false,
}
}
@ -482,6 +564,10 @@ func setContain(m1, m2 map[interface{}]struct{}) bool {
}
func (sr *searchResultBuf) readyToReduce() bool {
if sr.haveError {
log.Debug("ProxyNode searchResultBuf readyToReduce", zap.Any("haveError", true))
return true
}
usedChansSetStrMap := make(map[string]int)
for x := range sr.usedChans {
@ -529,6 +615,10 @@ func (sr *searchResultBuf) readyToReduce() bool {
func (sr *searchResultBuf) addPartialResult(result *internalpb.SearchResults) {
sr.resultBuf = append(sr.resultBuf, result)
if result.Status.ErrorCode != commonpb.ErrorCode_Success {
sr.haveError = true
return
}
for _, vchan := range result.ChannelIDsSearched {
sr.receivedVChansSet[vchan] = struct{}{}
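
This is one half of the search-timeout fix: as soon as any partial result arrives with a non-success status, the buffer marks itself errored and readyToReduce() short-circuits to true, so the SearchTask is unblocked instead of waiting for channel coverage that may never arrive. A sketch of the pattern with hypothetical stand-in types (partialResult plays the role of internalpb.SearchResults):

```go
package sketch

type partialResult struct {
	err      bool
	channels []string
}

type resultBuf struct {
	haveError bool
	needed    map[string]struct{} // channels we expect an answer from
	received  map[string]struct{} // channels answered so far
	results   []*partialResult
}

func (b *resultBuf) addPartial(r *partialResult) {
	b.results = append(b.results, r)
	if r.err {
		// Remember the failure; coverage no longer matters.
		b.haveError = true
		return
	}
	for _, ch := range r.channels {
		b.received[ch] = struct{}{}
	}
}

// readyToReduce returns true when an error was seen or every expected channel
// has reported, mirroring the short-circuit added in the diff.
func (b *resultBuf) readyToReduce() bool {
	if b.haveError {
		return true
	}
	for ch := range b.needed {
		if _, ok := b.received[ch]; !ok {
			return false
		}
	}
	return true
}
```
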
@ -557,6 +647,7 @@ func (sched *TaskScheduler) queryResultLoop() {
defer queryResultMsgStream.Close()
queryResultBuf := make(map[UniqueID]*searchResultBuf)
queryResultBufFlag := make(map[UniqueID]bool) // if value is true, we can ignore queryResult
retrieveResultBuf := make(map[UniqueID][]*internalpb.RetrieveResults)
for {
@ -575,11 +666,21 @@ func (sched *TaskScheduler) queryResultLoop() {
if searchResultMsg, srOk := tsMsg.(*msgstream.SearchResultMsg); srOk {
reqID := searchResultMsg.Base.MsgID
reqIDStr := strconv.FormatInt(reqID, 10)
ignoreThisResult, ok := queryResultBufFlag[reqID]
if !ok {
queryResultBufFlag[reqID] = false
ignoreThisResult = false
}
if ignoreThisResult {
log.Debug("ProxyNode queryResultLoop Got a SearchResultMsg, but we should ignore", zap.Any("ReqID", reqID))
continue
}
t := sched.getTaskByReqID(reqID)
log.Debug("ProxyNode queryResultLoop Got a SearchResultMsg", zap.Any("ReqID", reqID), zap.Any("t", t))
if t == nil {
log.Debug("ProxyNode queryResultLoop GetTaskByReqID failed", zap.String("reqID", reqIDStr))
delete(queryResultBuf, reqID)
queryResultBufFlag[reqID] = true
continue
}
@ -587,12 +688,13 @@ func (sched *TaskScheduler) queryResultLoop() {
if !ok {
log.Debug("ProxyNode queryResultLoop type assert t as SearchTask failed", zap.Any("t", t))
delete(queryResultBuf, reqID)
queryResultBufFlag[reqID] = true
continue
}
_, ok = queryResultBuf[reqID]
resultBuf, ok := queryResultBuf[reqID]
if !ok {
queryResultBuf[reqID] = newSearchResultBuf()
resultBuf = newSearchResultBuf()
vchans, err := st.getVChannels()
log.Debug("ProxyNode queryResultLoop, first receive", zap.Any("reqID", reqID), zap.Any("vchans", vchans),
zap.Error(err))
@ -601,7 +703,7 @@ func (sched *TaskScheduler) queryResultLoop() {
continue
}
for _, vchan := range vchans {
queryResultBuf[reqID].usedVChans[vchan] = struct{}{}
resultBuf.usedVChans[vchan] = struct{}{}
}
pchans, err := st.getChannels()
log.Debug("ProxyNode queryResultLoop, first receive", zap.Any("reqID", reqID), zap.Any("pchans", pchans),
@ -611,10 +713,11 @@ func (sched *TaskScheduler) queryResultLoop() {
continue
}
for _, pchan := range pchans {
queryResultBuf[reqID].usedChans[pchan] = struct{}{}
resultBuf.usedChans[pchan] = struct{}{}
}
queryResultBuf[reqID] = resultBuf
}
queryResultBuf[reqID].addPartialResult(&searchResultMsg.SearchResults)
resultBuf.addPartialResult(&searchResultMsg.SearchResults)
//t := sched.getTaskByReqID(reqID)
{
@ -622,9 +725,11 @@ func (sched *TaskScheduler) queryResultLoop() {
log.Debug("ProxyNode queryResultLoop", zap.String("collection name", colName), zap.String("reqID", reqIDStr), zap.Int("answer cnt", len(queryResultBuf[reqID].resultBuf)))
}
if queryResultBuf[reqID].readyToReduce() {
if resultBuf.readyToReduce() {
log.Debug("ProxyNode queryResultLoop readyToReduce and assign to reduce")
st.resultBuf <- queryResultBuf[reqID].resultBuf
queryResultBufFlag[reqID] = true
st.resultBuf <- resultBuf.resultBuf
delete(queryResultBuf, reqID)
}
sp.Finish()
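
The other half of the fix lives in queryResultLoop: a per-request flag, queryResultBufFlag, records that a request has already been reduced (or that its task could not be found), so late or duplicate SearchResultMsg arrivals are dropped instead of re-creating a buffer that would never drain. In outline, as a hedged sketch rather than the real loop, with hypothetical stand-in types:

```go
package sketch

type UniqueID = int64

type resultMsg struct {
	reqID UniqueID
}

type buffer struct{ msgs []*resultMsg }

// loopOnce shows the gating added in the diff: once a request is flagged,
// every further message for it is ignored.
func loopOnce(msg *resultMsg, bufs map[UniqueID]*buffer, ignore map[UniqueID]bool,
	ready func(*buffer) bool, reduce func(*buffer)) {
	if ignore[msg.reqID] {
		return // already reduced (or task missing): drop late results
	}
	buf, ok := bufs[msg.reqID]
	if !ok {
		buf = &buffer{}
		bufs[msg.reqID] = buf
	}
	buf.msgs = append(buf.msgs, msg)
	if ready(buf) {
		reduce(buf)
		// Flag the request so anything arriving afterwards is discarded.
		ignore[msg.reqID] = true
		delete(bufs, msg.reqID)
	}
}
```
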
@ -694,10 +799,9 @@ func (sched *TaskScheduler) Close() {
func (sched *TaskScheduler) TaskDoneTest(ts Timestamp) bool {
ddTaskDone := sched.DdQueue.TaskDoneTest(ts)
dmTaskDone := sched.DmQueue.TaskDoneTest(ts)
//dqTaskDone := sched.DqQueue.TaskDoneTest(ts)
return ddTaskDone && dmTaskDone && true
return ddTaskDone && dmTaskDone
}
func (sched *TaskScheduler) getPChanStatistics(pchan pChan) (pChanStatistics, error) {
return sched.DmQueue.getPChanStatistics(pchan)
func (sched *TaskScheduler) getPChanStatistics() (map[pChan]*pChanStatistics, error) {
return sched.DmQueue.getPChanStatsInfo()
}


@ -493,6 +493,15 @@ func (s *searchCollection) search(searchMsg *msgstream.SearchMsg) error {
}
nilHits[i] = bs
}
fakedDmChannels := collection.getWatchedDmChannels()
var realDmChannels []string
for _, dmChan := range fakedDmChannels {
parts := strings.Split(dmChan, "#")
realDmChannels = append(realDmChannels, parts[0])
}
log.Debug("QueryNode searchCollection search, realDmChannels", zap.Any("fakedDmChannels", fakedDmChannels),
zap.Any("realDmChannels", realDmChannels), zap.Any("collectionID", collection.ID()),
zap.Any("sealedSegmentSearched", sealedSegmentSearched))
resultChannelInt := 0
searchResultMsg := &msgstream.SearchResultMsg{
BaseMsg: msgstream.BaseMsg{Ctx: searchMsg.Ctx, HashValues: []uint32{uint32(resultChannelInt)}},
@ -503,10 +512,14 @@ func (s *searchCollection) search(searchMsg *msgstream.SearchMsg) error {
Timestamp: searchTimestamp,
SourceID: searchMsg.Base.SourceID,
},
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
ResultChannelID: searchMsg.ResultChannelID,
Hits: nilHits,
MetricType: plan.getMetricType(),
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
ResultChannelID: searchMsg.ResultChannelID,
Hits: nilHits,
MetricType: plan.getMetricType(),
SealedSegmentIDsSearched: sealedSegmentSearched,
ChannelIDsSearched: realDmChannels,
//TODO:: get global sealed segment from etcd
GlobalSealedSegmentIDs: sealedSegmentSearched,
},
}
err = s.publishSearchResult(searchResultMsg, searchMsg.CollectionID)
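
On the query-node side, the result message now actually fills SealedSegmentIDsSearched, ChannelIDsSearched, and GlobalSealedSegmentIDs, which is what the proxy's coverage check in readyToReduce relies on. The watched DM channel names appear to carry a "#"-delimited suffix (inferred from the diff, not documented here), so only the prefix is reported back. A short sketch of that conversion:

```go
package sketch

import "strings"

// realChannelNames strips the "#"-delimited suffix from watched DM channel
// names, as the diff does before filling ChannelIDsSearched, so the names
// reported back line up with the physical channel names the proxy tracks.
func realChannelNames(watched []string) []string {
	real := make([]string, 0, len(watched))
	for _, ch := range watched {
		real = append(real, strings.Split(ch, "#")[0])
	}
	return real
}
```
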


@ -247,9 +247,10 @@ func (s *Session) GetSessions(prefix string) (map[string]*Session, int64, error)
if err != nil {
return nil, 0, err
}
log.Debug("SessionUtil GetSessions", zap.Any("prefix", prefix), zap.Any("resp", resp))
for _, kv := range resp.Kvs {
session := &Session{}
err = json.Unmarshal([]byte(kv.Value), session)
err = json.Unmarshal(kv.Value, session)
if err != nil {
return nil, 0, err
}


@ -198,7 +198,7 @@ class TestLoadCollection:
message = getattr(e, 'message', "The exception does not contain the field of message.")
assert message == "describe collection failed: can't find collection: %s" % collection
@pytest.mark.tags(CaseLabel.tags_smoke)
# @pytest.mark.tags(CaseLabel.tags_smoke)
def test_load_collection_without_flush(self, connect, collection):
"""
target: test load collection without flush