mirror of https://github.com/milvus-io/milvus.git
Move Rocksmq page info log to Rokcsmq monitor (#20022)
Signed-off-by: aoiasd <zhicheng.yue@zilliz.com> Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>pull/20022/merge
parent
98237b7cf7
commit
e470cd3dfa
|
@ -275,27 +275,54 @@ func (rmq *rocksmq) Close() {
|
|||
}
|
||||
|
||||
//print rmq consumer Info
|
||||
func (rmq *rocksmq) Info() {
|
||||
func (rmq *rocksmq) Info() bool {
|
||||
rtn := true
|
||||
rmq.consumers.Range(func(key, vals interface{}) bool {
|
||||
topic, _ := key.(string)
|
||||
consumers, _ := vals.([]*Consumer)
|
||||
consumerList, _ := vals.([]*Consumer)
|
||||
|
||||
consumersPosition := make([]UniqueID, len(consumers))
|
||||
consumersName := make([]string, len(consumers))
|
||||
for id, consumer := range consumers {
|
||||
groupKey := constructCurrentID(consumer.Topic, consumer.GroupName)
|
||||
groupPosition, ok := rmq.consumersID.Load(groupKey)
|
||||
minConsumerPosition := UniqueID(-1)
|
||||
minConsumerGroupName := ""
|
||||
for _, consumer := range consumerList {
|
||||
consumerKey := constructCurrentID(consumer.Topic, consumer.GroupName)
|
||||
consumerPosition, ok := rmq.consumersID.Load(consumerKey)
|
||||
if !ok {
|
||||
log.Error("some group not regist", zap.String("topic", consumer.Topic), zap.String("groupName", consumer.GroupName))
|
||||
continue
|
||||
}
|
||||
consumersPosition[id] = groupPosition.(UniqueID)
|
||||
consumersName[id] = consumer.GroupName
|
||||
if minConsumerPosition == UniqueID(-1) || consumerPosition.(UniqueID) < minConsumerPosition {
|
||||
minConsumerPosition = consumerPosition.(UniqueID)
|
||||
minConsumerGroupName = consumer.GroupName
|
||||
}
|
||||
}
|
||||
|
||||
log.Info("Rocksmq Info", zap.String("topic", topic), zap.Int("consumer num", len(consumers)), zap.Any("group names", consumersName), zap.Any("group positions", consumersPosition))
|
||||
pageTsSizeKey := constructKey(PageTsTitle, topic)
|
||||
pages, _, err := rmq.kv.LoadWithPrefix(pageTsSizeKey)
|
||||
if err != nil {
|
||||
log.Error("Rocksmq get page num failed", zap.String("topic", topic))
|
||||
rtn = false
|
||||
return false
|
||||
}
|
||||
|
||||
msgSizeKey := MessageSizeTitle + topic
|
||||
msgSizeVal, err := rmq.kv.Load(msgSizeKey)
|
||||
if err != nil {
|
||||
log.Error("Rocksmq get last page size failed", zap.String("topic", topic))
|
||||
rtn = false
|
||||
return false
|
||||
}
|
||||
|
||||
log.Info("Rocksmq Info",
|
||||
zap.String("topic", topic),
|
||||
zap.Int("consumer num", len(consumerList)),
|
||||
zap.String("min position group names", minConsumerGroupName),
|
||||
zap.Int64("min positions", minConsumerPosition),
|
||||
zap.Int("page sum", len(pages)),
|
||||
zap.String("last page size", msgSizeVal),
|
||||
)
|
||||
return true
|
||||
})
|
||||
return rtn
|
||||
}
|
||||
|
||||
func (rmq *rocksmq) stopRetention() {
|
||||
|
@ -629,7 +656,6 @@ func (rmq *rocksmq) updatePageInfo(topicName string, msgIDs []UniqueID, msgSizes
|
|||
// Current page is full
|
||||
newPageSize := curMsgSize + msgSize
|
||||
pageEndID := id
|
||||
log.Info("new page", zap.String("topic", topicName), zap.Int64("pageId", pageEndID))
|
||||
// Update page message size for current page. key is page end ID
|
||||
pageMsgSizeKey := fixedPageSizeKey + "/" + strconv.FormatInt(pageEndID, 10)
|
||||
mutateBuffer[pageMsgSizeKey] = strconv.FormatInt(newPageSize, 10)
|
||||
|
|
|
@ -1057,3 +1057,48 @@ func TestRocksmq_updateAckedInfoErr(t *testing.T) {
|
|||
// update acked for all page in rmq but some consumer not in rmq.consumers
|
||||
assert.Error(t, rmq.updateAckedInfo(topicName, groupName, 0, ids[len(ids)-1]))
|
||||
}
|
||||
|
||||
func TestRocksmq_Info(t *testing.T) {
|
||||
ep := etcdEndpoints()
|
||||
etcdCli, err := etcd.GetRemoteEtcdClient(ep)
|
||||
assert.Nil(t, err)
|
||||
etcdKV := etcdkv.NewEtcdKV(etcdCli, "/etcd/test/root")
|
||||
defer etcdKV.Close()
|
||||
idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV)
|
||||
_ = idAllocator.Initialize()
|
||||
|
||||
name := "/tmp/rocksmq_testinfo"
|
||||
defer os.RemoveAll(name)
|
||||
kvName := name + "_meta_kv"
|
||||
_ = os.RemoveAll(kvName)
|
||||
defer os.RemoveAll(kvName)
|
||||
var params paramtable.BaseTable
|
||||
params.Init()
|
||||
atomic.StoreInt64(&RocksmqPageSize, 10)
|
||||
rmq, err := NewRocksMQ(params, name, idAllocator)
|
||||
assert.Nil(t, err)
|
||||
defer rmq.Close()
|
||||
|
||||
topicName := "test_testinfo"
|
||||
groupName := "test"
|
||||
rmq.CreateTopic(topicName)
|
||||
defer rmq.DestroyTopic(topicName)
|
||||
|
||||
consumer := &Consumer{
|
||||
Topic: topicName,
|
||||
GroupName: groupName,
|
||||
}
|
||||
|
||||
_ = rmq.DestroyConsumerGroup(topicName, groupName)
|
||||
err = rmq.CreateConsumerGroup(topicName, groupName)
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = rmq.RegisterConsumer(consumer)
|
||||
assert.Nil(t, err)
|
||||
|
||||
assert.True(t, rmq.Info())
|
||||
|
||||
//test error
|
||||
rmq.kv = &rocksdbkv.RocksdbKV{}
|
||||
assert.False(t, rmq.Info())
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue