mirror of https://github.com/milvus-io/milvus.git
Add more logs for checkpoint of channel (#19885)
Signed-off-by: yah01 <yang.cen@zilliz.com> Signed-off-by: yah01 <yang.cen@zilliz.com>pull/19906/head
parent
da6be52185
commit
7cf26011e6
|
@ -18,10 +18,12 @@ package meta
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus/internal/log"
|
"github.com/milvus-io/milvus/internal/log"
|
||||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||||
|
"github.com/milvus-io/milvus/internal/util/tsoutil"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -146,8 +148,12 @@ func (mgr *TargetManager) AddDmChannel(channels ...*DmChannel) {
|
||||||
defer mgr.rwmutex.Unlock()
|
defer mgr.rwmutex.Unlock()
|
||||||
|
|
||||||
for _, channel := range channels {
|
for _, channel := range channels {
|
||||||
|
ts := channel.GetSeekPosition().GetTimestamp()
|
||||||
log.Info("add channel into targets",
|
log.Info("add channel into targets",
|
||||||
zap.String("channel", channel.GetChannelName()))
|
zap.String("channel", channel.GetChannelName()),
|
||||||
|
zap.Uint64("checkpoint", ts),
|
||||||
|
zap.Duration("sinceCheckpoint", time.Since(tsoutil.PhysicalTime(ts))),
|
||||||
|
)
|
||||||
mgr.dmChannels[channel.ChannelName] = channel
|
mgr.dmChannels[channel.ChannelName] = channel
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@ package task
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus-proto/go-api/commonpb"
|
"github.com/milvus-io/milvus-proto/go-api/commonpb"
|
||||||
"github.com/milvus-io/milvus/internal/log"
|
"github.com/milvus-io/milvus/internal/log"
|
||||||
|
@ -26,6 +27,7 @@ import (
|
||||||
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
||||||
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
||||||
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
|
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
|
||||||
|
"github.com/milvus-io/milvus/internal/util/tsoutil"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -389,7 +391,12 @@ func (ex *Executor) subDmChannel(task *ChannelTask, step int) error {
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
log.Info("subscribe channel...")
|
|
||||||
|
ts := dmChannel.GetSeekPosition().GetTimestamp()
|
||||||
|
log.Info("subscribe channel...",
|
||||||
|
zap.Uint64("checkpoint", ts),
|
||||||
|
zap.Duration("sinceCheckpoint", time.Since(tsoutil.PhysicalTime(ts))),
|
||||||
|
)
|
||||||
status, err := ex.cluster.WatchDmChannels(ctx, action.Node(), req)
|
status, err := ex.cluster.WatchDmChannels(ctx, action.Node(), req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("failed to subscribe DmChannel, it may be a false failure", zap.Error(err))
|
log.Warn("failed to subscribe DmChannel, it may be a false failure", zap.Error(err))
|
||||||
|
|
|
@ -53,6 +53,11 @@ func ParseTS(ts uint64) (time.Time, uint64) {
|
||||||
return physicalTime, logical
|
return physicalTime, logical
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func PhysicalTime(ts uint64) time.Time {
|
||||||
|
physicalTime, _ := ParseTS(ts)
|
||||||
|
return physicalTime
|
||||||
|
}
|
||||||
|
|
||||||
// ParseHybridTs parses the ts to (physical, logical), physical part is of utc-timestamp format.
|
// ParseHybridTs parses the ts to (physical, logical), physical part is of utc-timestamp format.
|
||||||
func ParseHybridTs(ts uint64) (int64, int64) {
|
func ParseHybridTs(ts uint64) (int64, int64) {
|
||||||
logical := ts & logicalBitsMask
|
logical := ts & logicalBitsMask
|
||||||
|
|
Loading…
Reference in New Issue