mirror of https://github.com/milvus-io/milvus.git
Refine `NewNmqClient` logic with nats.ConsumeDialer option (#26987)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/26859/head
parent
9e0977622d
commit
1aa3a1e067
|
@ -19,6 +19,7 @@ package nmq
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
|
@ -39,20 +40,33 @@ type nmqClient struct {
|
|||
conn *nats.Conn
|
||||
}
|
||||
|
||||
type nmqDialer struct {
|
||||
ctx func() context.Context
|
||||
}
|
||||
|
||||
func (d *nmqDialer) Dial(network, address string) (net.Conn, error) {
|
||||
ctx := d.ctx()
|
||||
|
||||
dial := &net.Dialer{}
|
||||
|
||||
// keep default 2s timeout
|
||||
if _, ok := ctx.Deadline(); !ok {
|
||||
dial.Timeout = 2 * time.Second
|
||||
}
|
||||
|
||||
return dial.DialContext(ctx, network, address)
|
||||
}
|
||||
|
||||
// NewClientWithDefaultOptions returns a new NMQ client with default options.
|
||||
// It retrieves the NMQ client URL from the server configuration.
|
||||
func NewClientWithDefaultOptions(ctx context.Context) (mqwrapper.Client, error) {
|
||||
url := Nmq.ClientURL()
|
||||
|
||||
if deadline, ok := ctx.Deadline(); ok {
|
||||
if deadline.Before(time.Now()) {
|
||||
return nil, errors.New("context timeout when new nmq client")
|
||||
}
|
||||
timeoutOption := nats.Timeout(time.Until(deadline))
|
||||
return NewClient(url, timeoutOption)
|
||||
}
|
||||
opt := nats.SetCustomDialer(&nmqDialer{
|
||||
ctx: func() context.Context { return ctx },
|
||||
})
|
||||
|
||||
return NewClient(url)
|
||||
return NewClient(url, opt)
|
||||
}
|
||||
|
||||
// NewClient returns a new nmqClient object
|
||||
|
|
|
@ -55,13 +55,14 @@ func Test_NewNmqClient(t *testing.T) {
|
|||
ctx := context.Background()
|
||||
var cancel context.CancelFunc
|
||||
if test.withTimeout {
|
||||
ctx, cancel = context.WithTimeout(ctx, time.Millisecond)
|
||||
defer cancel()
|
||||
ctx, cancel = context.WithTimeout(ctx, time.Second)
|
||||
if test.ctxTimeouted {
|
||||
cancel()
|
||||
} else {
|
||||
defer cancel()
|
||||
}
|
||||
}
|
||||
|
||||
if test.ctxTimeouted {
|
||||
<-time.After(time.Millisecond)
|
||||
}
|
||||
client, err := NewClientWithDefaultOptions(ctx)
|
||||
|
||||
if test.expectErr {
|
||||
|
|
Loading…
Reference in New Issue