mirror of https://github.com/milvus-io/milvus.git
[Feature] Add benchmark and retention configuration for nmq (#25768)
Signed-off-by: chyezh <ye.zhen@zilliz.com>pull/25972/head
parent
302897f866
commit
e24a8b3606
|
@ -143,6 +143,10 @@ natsmq:
|
|||
logTime: true # true by default, If set to false, log without timestamps.
|
||||
logFile: # no log file by default, Log file path relative to.. .
|
||||
logSizeLimit: 0 # (B) 0, unlimited by default, Size in bytes after the log file rolls over to a new one.
|
||||
retention:
|
||||
maxAge: 4320 # (min) 3 days by default, Maximum age of any message in the P-channel.
|
||||
maxBytes: # (B) None by default, How many bytes the single P-channel may contain. Removing oldest messages if the P-channel exceeds this size.
|
||||
maxMsgs: # None by default, How many message the single P-channel may contain. Removing oldest messages if the P-channel exceeds this limit.
|
||||
|
||||
# Related configuration of rootCoord, used to handle data definition language (DDL) and data control language (DCL) requests
|
||||
rootCoord:
|
||||
|
|
|
@ -19,14 +19,14 @@ package nmq
|
|||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/nats-io/nats.go"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/metrics"
|
||||
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/util/timerecord"
|
||||
)
|
||||
|
||||
|
@ -42,7 +42,6 @@ type nmqClient struct {
|
|||
// It retrieves the NMQ client URL from the server configuration.
|
||||
func NewClientWithDefaultOptions() (mqwrapper.Client, error) {
|
||||
url := Nmq.ClientURL()
|
||||
log.Info("123123 ", zap.String("url", url))
|
||||
return NewClient(url)
|
||||
}
|
||||
|
||||
|
@ -68,9 +67,13 @@ func (nc *nmqClient) CreateProducer(options mqwrapper.ProducerOptions) (mqwrappe
|
|||
}
|
||||
// TODO: (1) investigate on performance of multiple streams vs multiple topics.
|
||||
// (2) investigate if we should have topics under the same stream.
|
||||
|
||||
_, err = js.AddStream(&nats.StreamConfig{
|
||||
Name: options.Topic,
|
||||
Subjects: []string{options.Topic},
|
||||
MaxAge: paramtable.Get().NatsmqCfg.ServerRetentionMaxAge.GetAsDuration(time.Minute),
|
||||
MaxBytes: paramtable.Get().NatsmqCfg.ServerRetentionMaxBytes.GetAsInt64(),
|
||||
MaxMsgs: paramtable.Get().NatsmqCfg.ServerRetentionMaxMsgs.GetAsInt64(),
|
||||
})
|
||||
if err != nil {
|
||||
metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateProducerLabel, metrics.FailLabel).Inc()
|
||||
|
@ -110,6 +113,9 @@ func (nc *nmqClient) Subscribe(options mqwrapper.ConsumerOptions) (mqwrapper.Con
|
|||
_, err = js.AddStream(&nats.StreamConfig{
|
||||
Name: options.Topic,
|
||||
Subjects: []string{options.Topic},
|
||||
MaxAge: paramtable.Get().NatsmqCfg.ServerRetentionMaxAge.GetAsDuration(time.Minute),
|
||||
MaxBytes: paramtable.Get().NatsmqCfg.ServerRetentionMaxBytes.GetAsInt64(),
|
||||
MaxMsgs: paramtable.Get().NatsmqCfg.ServerRetentionMaxMsgs.GetAsInt64(),
|
||||
})
|
||||
if err != nil {
|
||||
metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.FailLabel).Inc()
|
||||
|
|
|
@ -23,7 +23,6 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
|
||||
|
@ -238,33 +237,15 @@ func TestCheckTopicValid(t *testing.T) {
|
|||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
func newTestConsumer(topic string, fromEarliest bool, fromLatest bool) (mqwrapper.Consumer, error) {
|
||||
conn, err := nats.Connect(natsServerAddress)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
js, err := conn.JetStream()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
groupName := topic
|
||||
_, err = js.AddStream(&nats.StreamConfig{
|
||||
Name: groupName,
|
||||
Subjects: []string{topic},
|
||||
func newTestConsumer(t *testing.T, topic string, position mqwrapper.SubscriptionInitialPosition) (mqwrapper.Consumer, error) {
|
||||
client, err := createNmqClient()
|
||||
assert.NoError(t, err)
|
||||
return client.Subscribe(mqwrapper.ConsumerOptions{
|
||||
Topic: topic,
|
||||
SubscriptionName: topic,
|
||||
SubscriptionInitialPosition: position,
|
||||
BufSize: 1024,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
natsChan := make(chan *nats.Msg, 8192)
|
||||
closeChan := make(chan struct{})
|
||||
return &Consumer{
|
||||
js: js,
|
||||
topic: topic,
|
||||
groupName: groupName,
|
||||
natsChan: natsChan,
|
||||
closeChan: closeChan,
|
||||
skip: false,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func newProducer(t *testing.T, topic string) (*nmqClient, mqwrapper.Producer) {
|
||||
|
@ -401,7 +382,7 @@ func TestNatsConsumer_SeekExclusive(t *testing.T) {
|
|||
process(t, msgs, p)
|
||||
|
||||
msgID := &nmqID{messageID: 2}
|
||||
consumer, err := newTestConsumer(topic, false, false)
|
||||
consumer, err := newTestConsumer(t, topic, mqwrapper.SubscriptionPositionUnknown)
|
||||
assert.NoError(t, err)
|
||||
defer consumer.Close()
|
||||
err = consumer.Seek(msgID, false)
|
||||
|
@ -420,10 +401,11 @@ func TestNatsConsumer_SeekInclusive(t *testing.T) {
|
|||
defer p.Close()
|
||||
|
||||
msgs := []string{"111", "222", "333", "444", "555"}
|
||||
|
||||
process(t, msgs, p)
|
||||
|
||||
msgID := &nmqID{messageID: 2}
|
||||
consumer, err := newTestConsumer(topic, false, false)
|
||||
consumer, err := newTestConsumer(t, topic, mqwrapper.SubscriptionPositionUnknown)
|
||||
assert.NoError(t, err)
|
||||
defer consumer.Close()
|
||||
err = consumer.Seek(msgID, true)
|
||||
|
@ -442,7 +424,7 @@ func TestNatsConsumer_NoDoubleSeek(t *testing.T) {
|
|||
defer p.Close()
|
||||
|
||||
msgID := &nmqID{messageID: 2}
|
||||
consumer, err := newTestConsumer(topic, false, false)
|
||||
consumer, err := newTestConsumer(t, topic, mqwrapper.SubscriptionPositionUnknown)
|
||||
assert.NoError(t, err)
|
||||
defer consumer.Close()
|
||||
err = consumer.Seek(msgID, true)
|
||||
|
@ -460,7 +442,7 @@ func TestNatsConsumer_ChanWithNoAssign(t *testing.T) {
|
|||
msgs := []string{"111", "222", "333", "444", "555"}
|
||||
process(t, msgs, p)
|
||||
|
||||
consumer, err := newTestConsumer(topic, false, false)
|
||||
consumer, err := newTestConsumer(t, topic, mqwrapper.SubscriptionPositionUnknown)
|
||||
assert.NoError(t, err)
|
||||
defer consumer.Close()
|
||||
|
||||
|
|
|
@ -50,13 +50,13 @@ func MustInitNatsMQ(cfg *NatsMQConfig) {
|
|||
log.Fatal("fail to initailize nmq", zap.Error(err))
|
||||
}
|
||||
|
||||
log.Info("initialize nmq finished", zap.Error(err))
|
||||
// Start Nmq in background and wait until it's ready for connection.
|
||||
go Nmq.Start()
|
||||
// Wait for server to be ready for connections
|
||||
if !Nmq.ReadyForConnections(cfg.InitializeTimeout) {
|
||||
log.Fatal("nmq is not ready")
|
||||
log.Fatal("nmq is not ready within timeout")
|
||||
}
|
||||
log.Info("initialize nmq finished", zap.String("client-url", Nmq.ClientURL()), zap.Error(err))
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,95 @@
|
|||
package msgstream
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"os"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
|
||||
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper/nmq"
|
||||
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func BenchmarkProduceAndConsumeNatsMQ(b *testing.B) {
|
||||
storeDir, err := os.MkdirTemp("", "milvus_mq_nmq")
|
||||
assert.NoError(b, err)
|
||||
defer os.RemoveAll(storeDir)
|
||||
|
||||
paramtable.Init()
|
||||
cfg := nmq.ParseServerOption(paramtable.Get())
|
||||
cfg.Opts.StoreDir = storeDir
|
||||
nmq.MustInitNatsMQ(cfg)
|
||||
|
||||
client, err := nmq.NewClientWithDefaultOptions()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
cases := generateRandBytes(64*1024, 10000)
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
benchmarkProduceAndConsume(b, client, cases)
|
||||
}
|
||||
}
|
||||
|
||||
func benchmarkProduceAndConsume(b *testing.B, mqClient mqwrapper.Client, cases [][]byte) {
|
||||
topic := fmt.Sprintf("test_produce_and_consume_topic_%d", rand.Int31n(100000))
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(2)
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
p, err := mqClient.CreateProducer(mqwrapper.ProducerOptions{
|
||||
Topic: topic,
|
||||
})
|
||||
assert.NoError(b, err)
|
||||
defer p.Close()
|
||||
benchmarkMQProduce(b, p, cases)
|
||||
}()
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
c, _ := mqClient.Subscribe(mqwrapper.ConsumerOptions{
|
||||
Topic: topic,
|
||||
SubscriptionName: topic,
|
||||
SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest,
|
||||
BufSize: 1024,
|
||||
})
|
||||
defer c.Close()
|
||||
benchmarkMQConsume(b, c, cases)
|
||||
}()
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func benchmarkMQConsume(b *testing.B, c mqwrapper.Consumer, cases [][]byte) {
|
||||
ch := c.Chan()
|
||||
for range cases {
|
||||
msg := <-ch
|
||||
c.Ack(msg)
|
||||
}
|
||||
c.Close()
|
||||
}
|
||||
|
||||
func benchmarkMQProduce(b *testing.B, p mqwrapper.Producer, cases [][]byte) {
|
||||
for _, c := range cases {
|
||||
p.Send(context.Background(), &mqwrapper.ProducerMessage{
|
||||
Payload: c,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func generateRandBytes(m int, n int) [][]byte {
|
||||
letterRunes := funcutil.RandomBytes(2 * m)
|
||||
cases := make([][]byte, 0, n)
|
||||
for i := 0; i < n; i++ {
|
||||
startOffset := rand.Intn(m)
|
||||
endOffset := startOffset + m
|
||||
|
||||
cases = append(cases, letterRunes[startOffset:endOffset])
|
||||
}
|
||||
return cases
|
||||
}
|
|
@ -677,6 +677,9 @@ type NatsmqConfig struct {
|
|||
ServerMonitorLogTime ParamItem `refreshable:"false"`
|
||||
ServerMonitorLogFile ParamItem `refreshable:"false"`
|
||||
ServerMonitorLogSizeLimit ParamItem `refreshable:"false"`
|
||||
ServerRetentionMaxAge ParamItem `refreshable:"true"`
|
||||
ServerRetentionMaxBytes ParamItem `refreshable:"true"`
|
||||
ServerRetentionMaxMsgs ParamItem `refreshable:"true"`
|
||||
}
|
||||
|
||||
// Init sets up a new NatsmqConfig instance using the provided BaseTable
|
||||
|
@ -761,6 +764,31 @@ func (r *NatsmqConfig) Init(base *BaseTable) {
|
|||
Export: true,
|
||||
}
|
||||
r.ServerMonitorLogSizeLimit.Init(base.mgr)
|
||||
|
||||
r.ServerRetentionMaxAge = ParamItem{
|
||||
Key: "natsmq.server.retention.maxAge",
|
||||
Version: "2.3.0",
|
||||
DefaultValue: "4320",
|
||||
Doc: `Maximum age of any message in the P-channel`,
|
||||
Export: true,
|
||||
}
|
||||
r.ServerRetentionMaxAge.Init(base.mgr)
|
||||
r.ServerRetentionMaxBytes = ParamItem{
|
||||
Key: "natsmq.server.retention.maxBytes",
|
||||
Version: "2.3.0",
|
||||
DefaultValue: "",
|
||||
Doc: `How many bytes the single P-channel may contain. Removing oldest messages if the P-channel exceeds this size`,
|
||||
Export: true,
|
||||
}
|
||||
r.ServerRetentionMaxBytes.Init(base.mgr)
|
||||
r.ServerRetentionMaxMsgs = ParamItem{
|
||||
Key: "natsmq.server.retention.maxMsgs",
|
||||
Version: "2.3.0",
|
||||
DefaultValue: "",
|
||||
Doc: `How many message the single P-channel may contain. Removing oldest messages if the P-channel exceeds this limit`,
|
||||
Export: true,
|
||||
}
|
||||
r.ServerRetentionMaxMsgs.Init(base.mgr)
|
||||
}
|
||||
|
||||
// /////////////////////////////////////////////////////////////////////////////
|
||||
|
|
Loading…
Reference in New Issue