From 7a4de9897d48b1db2e78a14389081477df533585 Mon Sep 17 00:00:00 2001 From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com> Date: Fri, 24 Dec 2021 14:14:15 +0800 Subject: [PATCH] Fix Rocksmq ignored error (#14011) Signed-off-by: xiaofan-luan --- internal/proxy/impl.go | 4 +++ .../rocksmq/server/rocksmq/rocksmq_impl.go | 31 +++++++++++++------ .../server/rocksmq/rocksmq_impl_test.go | 23 ++++++++++++++ 3 files changed, 49 insertions(+), 9 deletions(-) diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 7e3c4c7d71..ed7d8da1a1 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -3572,6 +3572,10 @@ func (node *Proxy) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStat } resp, err = node.dataCoord.GetFlushState(ctx, req) + if err != nil { + log.Info("failed to get flush state response", zap.Error(err)) + return nil, err + } log.Info("received get flush state response", zap.Any("response", resp)) return resp, err } diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go index c6d6b8bc24..c2b670a532 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go @@ -262,6 +262,7 @@ func (rmq *rocksmq) Close() { defer rmq.storeMu.Unlock() rmq.kv.Close() rmq.store.Close() + log.Info("Successfully close rocksmq") } func (rmq *rocksmq) stopRetention() { @@ -565,7 +566,6 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni if err != nil { return []UniqueID{}, err } - updatePageTime := time.Since(start).Milliseconds() // TODO add this to monitor metrics getProduceTime := time.Since(start).Milliseconds() @@ -574,7 +574,7 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni zap.Int64("get lock elapse", getLockTime), zap.Int64("alloc elapse", allocTime), zap.Int64("write elapse", writeTime), - zap.Int64("updatePage elapse", updatePageTime)) + zap.Int64("updatePage elapse", getProduceTime)) } return msgIDs, nil } @@ -699,11 +699,17 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum consumedIDs = append(consumedIDs, msg.MsgID) } newID := consumedIDs[len(consumedIDs)-1] + err := rmq.updateAckedInfo(topicName, groupName, consumedIDs) + if err != nil { + log.Warn("failed to update acked info ", zap.String("topic", topicName), + zap.String("groupName", groupName), zap.Error(err)) + return nil, err + } rmq.moveConsumePos(topicName, groupName, newID+1) - rmq.updateAckedInfo(topicName, groupName, consumedIDs) + // TODO add this to monitor metrics getConsumeTime := time.Since(start).Milliseconds() - if getLockTime > 200 || getConsumeTime > 200 { + if getConsumeTime > 200 { log.Warn("rocksmq consume too slowly", zap.String("topic", topicName), zap.Int64("get lock elapse", getLockTime), zap.Int64("consume elapse", getConsumeTime)) } @@ -764,7 +770,13 @@ func (rmq *rocksmq) Seek(topicName string, groupName string, msgID UniqueID) err lock.Lock() defer lock.Unlock() - return rmq.seek(topicName, groupName, msgID) + err := rmq.seek(topicName, groupName, msgID) + if err != nil { + log.Debug("failed to seek", zap.String("topic", topicName), zap.String("group", groupName), zap.Uint64("msgId", uint64(msgID)), zap.Error(err)) + return err + } + log.Debug("successfully seek", zap.String("topic", topicName), zap.String("group", groupName), zap.Uint64("msgId", uint64(msgID))) + return nil } // SeekToLatest updates current id to the msg id of latest message + 1 @@ -786,8 +798,9 @@ func (rmq *rocksmq) SeekToLatest(topicName, groupName string) error { iter := rmq.store.NewIterator(readOpts) defer iter.Close() + prefix := topicName + "/" // seek to the last message of thie topic - iter.SeekForPrev([]byte(typeutil.AddOne(topicName + "/"))) + iter.SeekForPrev([]byte(typeutil.AddOne(prefix))) // if iterate fail if err := iter.Err(); err != nil { @@ -799,15 +812,13 @@ func (rmq *rocksmq) SeekToLatest(topicName, groupName string) error { return nil } - fixTopicName := topicName + "/" - iKey := iter.Key() seekMsgID := string(iKey.Data()) if iKey != nil { iKey.Free() } // if find message is not belong to current channel, start from 0 - if !strings.Contains(seekMsgID, fixTopicName) { + if !strings.Contains(seekMsgID, prefix) { return nil } @@ -817,6 +828,8 @@ func (rmq *rocksmq) SeekToLatest(topicName, groupName string) error { } // current msgID should not be included rmq.moveConsumePos(topicName, groupName, msgID+1) + log.Debug("successfully seek to latest", zap.String("topic", topicName), + zap.String("group", groupName), zap.Uint64("latest", uint64(msgID+1))) return nil } diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go index 25082bfc4d..2a4721c14c 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go @@ -861,3 +861,26 @@ func TestRocksmq_Close(t *testing.T) { rmq.HasNext("", "") rmq.CloseReader("", "") } + +func TestRocksmq_SeekWithNoConsumerError(t *testing.T) { + ep := etcdEndpoints() + etcdKV, err := etcdkv.NewEtcdKV(ep, "/etcd/test/root") + assert.Nil(t, err) + defer etcdKV.Close() + idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV) + _ = idAllocator.Initialize() + + name := "/tmp/rocksmq_seekerror" + defer os.RemoveAll(name) + kvName := name + "_meta_kv" + _ = os.RemoveAll(kvName) + defer os.RemoveAll(kvName) + rmq, err := NewRocksMQ(name, idAllocator) + assert.Nil(t, err) + defer rmq.Close() + + rmq.CreateTopic("test") + err = rmq.Seek("test", "", 0) + fmt.Println(err) + assert.Error(t, err) +}