Remove not needed ctx from rocksmq client (#10416)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/10442/head
congqixia 2021-10-22 17:25:12 +08:00 committed by GitHub
parent f97690fcff
commit 76b64a9d01
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 13 additions and 29 deletions

View File

@ -12,8 +12,6 @@
package rocksmq package rocksmq
import ( import (
"context"
server "github.com/milvus-io/milvus/internal/util/rocksmq/server/rocksmq" server "github.com/milvus-io/milvus/internal/util/rocksmq/server/rocksmq"
) )
@ -31,8 +29,6 @@ func NewClient(options ClientOptions) (Client, error) {
// ClientOptions is the options of a client // ClientOptions is the options of a client
type ClientOptions struct { type ClientOptions struct {
Server RocksMQ Server RocksMQ
Ctx context.Context
Cancel context.CancelFunc
} }
// Client is the interface rocksmq client // Client is the interface rocksmq client

View File

@ -12,7 +12,6 @@
package rocksmq package rocksmq
import ( import (
"context"
"reflect" "reflect"
"sync" "sync"
@ -25,9 +24,9 @@ type client struct {
server RocksMQ server RocksMQ
producerOptions []ProducerOptions producerOptions []ProducerOptions
consumerOptions []ConsumerOptions consumerOptions []ConsumerOptions
ctx context.Context
cancel context.CancelFunc
wg *sync.WaitGroup wg *sync.WaitGroup
closeCh chan struct{}
closeOnce sync.Once
} }
func newClient(options ClientOptions) (*client, error) { func newClient(options ClientOptions) (*client, error) {
@ -35,16 +34,11 @@ func newClient(options ClientOptions) (*client, error) {
return nil, newError(InvalidConfiguration, "options.Server is nil") return nil, newError(InvalidConfiguration, "options.Server is nil")
} }
if options.Ctx == nil {
options.Ctx, options.Cancel = context.WithCancel(context.Background())
}
c := &client{ c := &client{
server: options.Server, server: options.Server,
producerOptions: []ProducerOptions{}, producerOptions: []ProducerOptions{},
ctx: options.Ctx,
cancel: options.Cancel,
wg: &sync.WaitGroup{}, wg: &sync.WaitGroup{},
closeCh: make(chan struct{}),
} }
return c, nil return c, nil
} }
@ -128,7 +122,7 @@ func (c *client) consume(consumer *consumer) {
defer c.wg.Done() defer c.wg.Done()
for { for {
select { select {
case <-c.ctx.Done(): case <-c.closeCh:
return return
case _, ok := <-consumer.MsgMutex(): case _, ok := <-consumer.MsgMutex():
if !ok { if !ok {
@ -164,11 +158,13 @@ func (c *client) consume(consumer *consumer) {
func (c *client) Close() { func (c *client) Close() {
// TODO(yukun): Should call server.close() here? // TODO(yukun): Should call server.close() here?
c.cancel() c.closeOnce.Do(func() {
// Wait all consume goroutines exit close(c.closeCh)
c.wg.Wait() c.wg.Wait()
if c.server != nil { if c.server != nil {
c.server.Close() c.server.Close()
} }
c.consumerOptions = nil // Wait all consume goroutines exit
c.consumerOptions = nil
})
} }

View File

@ -12,7 +12,6 @@
package rocksmq package rocksmq
import ( import (
"context"
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -68,11 +67,8 @@ func TestClient_CreateProducer(t *testing.T) {
} }
func TestClient_Subscribe(t *testing.T) { func TestClient_Subscribe(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
client, err := NewClient(ClientOptions{ client, err := NewClient(ClientOptions{
Server: newMockRocksMQ(), Server: newMockRocksMQ(),
Ctx: ctx,
Cancel: cancel,
}) })
assert.NoError(t, err) assert.NoError(t, err)
@ -128,11 +124,8 @@ func TestClient_consume(t *testing.T) {
rmqPath := "/tmp/milvus/test_client3" rmqPath := "/tmp/milvus/test_client3"
rmq := newRocksMQ(rmqPath) rmq := newRocksMQ(rmqPath)
defer removePath(rmqPath) defer removePath(rmqPath)
ctx, cancel := context.WithCancel(context.Background())
client, err := NewClient(ClientOptions{ client, err := NewClient(ClientOptions{
Server: rmq, Server: rmq,
Ctx: ctx,
Cancel: cancel,
}) })
assert.NoError(t, err) assert.NoError(t, err)
defer client.Close() defer client.Close()
@ -159,5 +152,4 @@ func TestClient_consume(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
<-consumer.Chan() <-consumer.Chan()
} }